1#[cfg(feature = "hydrate")]
2use ftml_ontology::utils::time::Timestamp;
3use ftml_uris::ModuleUri;
4use leptos::prelude::*;
5use std::{collections::BTreeMap, sync::atomic::AtomicUsize};
6
/// Path of the websocket endpoint that `TeXClient::provide` connects to.
const SERVER_ENDPOINT: &str = "/ws/mathjx";
8
9#[cfg(feature = "ssr")]
10use flams_router_base::ws::WSSocket;
11
/// Lazily created tracing span (level `info`, target and name `mathjx`,
/// explicitly without a parent). `TeXSocket` reports it from `span()` and
/// enters it around its blocking work.
#[cfg(feature = "ssr")]
pub(crate) static TEX_SPAN: std::sync::LazyLock<tracing::Span> =
    std::sync::LazyLock::new(|| tracing::info_span!(target:"mathjx",parent:None,"mathjx"));
15
/// Server-side state of one math websocket connection.
///
/// Cloning is cheap enough to hand a copy to a blocking task: `tex` is
/// reference-counted, so every clone drives the same engine instance.
#[cfg(feature = "ssr")]
#[derive(Clone)]
pub struct TeXSocket {
    /// Handle used to send `(request id, result)` replies back to the client.
    socket: WSSocket<(usize, TeXMath), (usize, Result<String, String>)>,
    /// The math engine that `handle` feeds module imports and formulas into.
    tex: std::sync::Arc<flams_stex::math::RusTeXMath>,
}
22
#[cfg(feature = "ssr")]
impl flams_router_base::ws::WSServerSocket<(usize, TeXMath), (usize, Result<String, String>)>
    for TeXSocket
{
    /// Everything this socket does is reported under the shared [`TEX_SPAN`].
    #[inline]
    fn span(&self) -> Option<&'static tracing::Span> {
        Some(&*TEX_SPAN)
    }

    /// Builds the per-connection state. The math engine is constructed on a
    /// blocking thread, inside [`TEX_SPAN`].
    ///
    /// # Panics
    /// Panics if the blocking task cannot be joined.
    async fn new(
        socket: flams_router_base::ws::WSSocket<(usize, TeXMath), (usize, Result<String, String>)>,
    ) -> Self {
        let build = move || {
            let tex = std::sync::Arc::new(flams_stex::math::RusTeXMath::default());
            Self { socket, tex }
        };
        tokio::task::spawn_blocking(move || TEX_SPAN.in_scope(build))
            .await
            .expect("this should not happen")
    }

    /// Processes one client message on a blocking thread and always yields `true`.
    ///
    /// * `UseModule` is forwarded to the engine and produces no reply.
    /// * `Inline` is rendered as `$…$`.
    /// * `Block` is rendered as `\[…\]`; a successful result is wrapped in a
    ///   `rustex-display` div.
    ///
    /// Replies are sent back tagged with the request id `i`.
    ///
    /// # Panics
    /// Panics if the blocking task cannot be joined.
    async fn handle(&self, (i, msg): (usize, TeXMath)) -> bool {
        let this = self.clone();
        let job = move || {
            tracing::info!("Received {:?}", msg);
            let reply = match msg {
                TeXMath::UseModule(uri) => {
                    this.tex.add_usemodule(&uri);
                    None
                }
                TeXMath::Inline(src) => Some((i, this.tex.run(&format!("${src}$")))),
                TeXMath::Block(src) => {
                    let rendered = this.tex.run(&format!("\\[{src}\\]")).map(|body| {
                        let mut html = String::from("<div class=\"rustex-display\">");
                        html.push_str(&body);
                        html.push_str("</div>");
                        html
                    });
                    Some((i, rendered))
                }
            };
            if let Some(reply) = reply {
                tracing::info!("Returning {:?}", reply);
                this.socket.send(reply);
            }
            true
        };
        tokio::task::spawn_blocking(move || TEX_SPAN.in_scope(job))
            .await
            .expect("this should not happen")
    }
}
77
/// Browser-side handle for requesting rendered math over the websocket.
///
/// All state sits behind `Arc`s, so clones share the same counter, caches and
/// outgoing queue.
#[cfg(feature = "hydrate")]
#[derive(Debug, Clone)]
pub struct TeXClient {
    /// The underlying websocket client; wrapped so the struct can be treated
    /// as `Send`.
    socket: send_wrapper::SendWrapper<
        flams_router_base::ws::WSClient<(usize, TeXMath), (usize, Result<String, String>)>,
    >,
    /// Source of request ids; incremented once per `run` call.
    counter: std::sync::Arc<AtomicUsize>,
    /// Signals of requests that are still waiting for a reply, keyed by
    /// request id. The receive callback removes an entry and fills its signal.
    values: std::sync::Arc<
        parking_lot::Mutex<BTreeMap<usize, RwSignal<Option<Result<String, String>>>>>,
    >,
    /// One signal per distinct request, so identical formulas share a result.
    cache: std::sync::Arc<
        dashmap::DashMap<
            TeXMath,
            RwSignal<Option<Result<String, String>>>,
            rustc_hash::FxBuildHasher,
        >,
    >,
    /// Timestamp of the most recent enqueue plus the requests not yet sent;
    /// the background task started in `provide` drains this queue.
    current: std::sync::Arc<parking_lot::Mutex<(Timestamp, Vec<(usize, TeXMath)>)>>,
    /// Flag polled by the background task to decide when to stop.
    drop: DropFlag,
}
98
/// Shared marker that lets an observer find out that every other handle to it
/// has been dropped.
///
/// Intended use: an owner keeps one handle, a long-running task keeps a clone
/// and polls [`DropFlag::get`] to learn when it should stop.
#[cfg(feature = "hydrate")]
#[derive(Clone, Debug, Default)]
struct DropFlag(std::sync::Arc<std::sync::atomic::AtomicBool>);

#[cfg(feature = "hydrate")]
impl Drop for DropFlag {
    fn drop(&mut self) {
        // `strong_count` still includes `self` here, so 1 means this is the
        // very last handle.
        if std::sync::Arc::strong_count(&self.0) == 1 {
            self.0.store(true, std::sync::atomic::Ordering::Release);
        }
    }
}

#[cfg(feature = "hydrate")]
impl DropFlag {
    /// Returns `true` once the flag has been raised or the caller holds the
    /// only remaining handle.
    ///
    /// The second condition is what makes the flag observable at all: the
    /// `Drop` impl only stores `true` when the *last* handle goes away, and an
    /// observer that is still alive is itself a handle. Without the count
    /// check a polling task would therefore never see `true` and never stop.
    pub fn get(&self) -> bool {
        self.0.load(std::sync::atomic::Ordering::Acquire)
            || std::sync::Arc::strong_count(&self.0) == 1
    }
}
116
#[cfg(feature = "hydrate")]
impl TeXClient {
    /// Error text stored by [`Self::reset`] in signals whose request was
    /// cancelled. [`Self::run`] recognises it and re-issues the request.
    const ABORTED: &'static str = "ABORTED";

    /// Opens the websocket, starts the background sender task and registers
    /// the client as a leptos context value.
    ///
    /// Incoming `(id, result)` messages resolve the signal registered under
    /// `id`. Outgoing requests are batched: the background task wakes every
    /// 500 ms and flushes the queue once `since_now()` on the last-enqueue
    /// timestamp reports at least 0.5 seconds.
    ///
    /// # Panics
    /// Panics if the websocket client cannot be created.
    pub fn provide() {
        let counter = std::sync::Arc::default();
        let values: std::sync::Arc<
            parking_lot::Mutex<BTreeMap<usize, RwSignal<Option<Result<String, String>>>>>,
        > = std::sync::Arc::default();
        let cache: std::sync::Arc<
            dashmap::DashMap<
                TeXMath,
                RwSignal<Option<Result<String, String>>>,
                rustc_hash::FxBuildHasher,
            >,
        > = std::sync::Arc::default();
        let drop = DropFlag::default();
        let current: std::sync::Arc<parking_lot::Mutex<(Timestamp, Vec<(usize, TeXMath)>)>> =
            std::sync::Arc::default();

        let valuecl = values.clone();
        let socket = flams_router_base::ws::WSClient::<
            (usize, TeXMath),
            (usize, Result<String, String>),
        >::new(SERVER_ENDPOINT, move |(i, res)| {
            // Each pending entry is resolved exactly once and then forgotten.
            let Some(e) = valuecl.lock().remove(&i) else {
                tracing::warn!("Signal {i} disappeared!");
                return;
            };
            e.set(Some(res));
        })
        .expect("should be impossible");

        let dropcl = drop.clone();
        let currentcl = current.clone();
        let socket_cl = socket.clone();
        wasm_bindgen_futures::spawn_local(async move {
            tracing::info!("Running wasm future");
            loop {
                if dropcl.get() {
                    tracing::info!("Dropping future");
                    break;
                }
                gloo_timers::future::TimeoutFuture::new(500).await;
                // The guard is taken after the await and released at the end
                // of this iteration, so it is never held across a suspension.
                let (ts, v) = &mut *currentcl.lock();
                tracing::debug!("Checking {ts} (now: {})", Timestamp::now());
                if ts.since_now().seconds() >= 0.5 && !v.is_empty() {
                    tracing::debug!("Sending things");
                    for msg in v.drain(..) {
                        socket_cl.send(&msg);
                    }
                }
            }
        });

        let slf = Self {
            socket: send_wrapper::SendWrapper::new(socket),
            counter,
            cache,
            values,
            current,
            drop,
        };
        provide_context(slf);
    }

    /// Sends a `UseModule` message for `uri` immediately (request id 0),
    /// bypassing the batching queue.
    pub fn add_import(uri: ModuleUri) {
        let slf = expect_context::<Self>();
        slf.socket.send(&(0, TeXMath::UseModule(uri)));
    }

    /// Cancels all outstanding work: clears the unsent queue and marks every
    /// still-unresolved pending signal as `Err("ABORTED")` without notifying
    /// subscribers. The pending table is emptied.
    pub fn reset() {
        let slf = expect_context::<Self>();
        slf.current.lock().1.clear();
        slf.values.lock().retain(|_, v| {
            v.update_untracked(|v| {
                if v.is_none() {
                    *v = Some(Err(Self::ABORTED.to_string()));
                }
            });
            // Returning `false` drops every entry.
            false
        });
    }

    /// Requests rendering of `s` (trimmed) as inline math.
    ///
    /// Returns the request id and the signal that will hold the result.
    pub fn inline_math(s: &str) -> (usize, RwSignal<Option<Result<String, String>>>) {
        Self::run(TeXMath::Inline(s.trim().to_string()))
    }

    /// Requests rendering of `s` (trimmed) as display math.
    ///
    /// Returns the request id and the signal that will hold the result.
    pub fn block_math(s: &str) -> (usize, RwSignal<Option<Result<String, String>>>) {
        Self::run(TeXMath::Block(s.trim().to_string()))
    }

    /// Looks `msg` up in the cache and enqueues it for sending if needed.
    ///
    /// A cached signal is returned as-is unless it carries the abort marker
    /// written by [`Self::reset`]; in that case it is cleared and the request
    /// is queued again. A fresh request id is consumed on every call.
    fn run(msg: TeXMath) -> (usize, RwSignal<Option<Result<String, String>>>) {
        let slf = expect_context::<Self>();
        let counter = slf
            .counter
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let sig = if let Some(sig) = slf.cache.get(&msg) {
            // `reset` stores the marker as an `Err`, so that is what has to be
            // matched here; checking the `Ok` side would never trigger a retry.
            let aborted =
                sig.with_untracked(|v| matches!(v, Some(Err(e)) if e == Self::ABORTED));
            if !aborted {
                return (counter, *sig);
            }
            sig.update_untracked(|v| *v = None);
            *sig
        } else {
            let sig = RwSignal::new(None);
            slf.cache.insert(msg.clone(), sig);
            sig
        };
        {
            slf.values.lock().insert(counter, sig);
            let (ts, curr) = &mut *slf.current.lock();
            // Restart the quiet period used by the background sender.
            *ts = Timestamp::now();
            curr.push((counter, msg));
        }
        (counter, sig)
    }
}
234
/// Server-side engine slot of a `MathSocket`.
#[cfg(feature = "ssr")]
#[derive(Debug)]
enum MaybeMath {
    /// The engine is available; incoming requests are run against it directly.
    Done(flams_stex::math::RusTeXMath),
    /// The engine has not been stored yet; incoming requests are parked here
    /// and worked off by the start-up task.
    Pending(Vec<(usize, TeXMath)>),
}
243
/// Websocket for math rendering; the same type serves as client (`hydrate`)
/// and server (`ssr`), with the fields of each side selected by feature flags.
#[derive(Debug, Clone)]
pub struct MathSocket {
    /// Client: the browser websocket.
    #[cfg(all(feature = "hydrate", not(feature = "docs-only")))]
    socket: send_wrapper::SendWrapper<leptos::web_sys::WebSocket>,
    /// Client in `docs-only` builds: placeholder, no real socket exists.
    #[cfg(all(feature = "hydrate", feature = "docs-only"))]
    socket: (),
    /// Server: the math engine, or the backlog of requests received before it
    /// was stored.
    #[cfg(feature = "ssr")]
    rustex: std::sync::Arc<parking_lot::Mutex<MaybeMath>>,
    /// Server: results produced while working off the backlog; handed out one
    /// at a time by `next`.
    #[cfg(feature = "ssr")]
    queue: std::sync::Arc<parking_lot::Mutex<Vec<(usize, Result<String, String>)>>>,
    /// Client: `Some(backlog)` of messages to send once the socket opens;
    /// `None` after the open handler has taken and sent them.
    #[cfg(feature = "hydrate")]
    initialized: std::sync::Arc<parking_lot::Mutex<Option<Vec<(usize, TeXMath)>>>>,
}
257
/// Protocol constants shared by the client and server halves of `MathSocket`.
impl flams_router_base::ws::WebSocket<(usize, TeXMath), (usize, Result<String, String>)>
    for MathSocket
{
    /// Endpoint path of the socket.
    const SERVER_ENDPOINT: &str = "/ws/mathjx";
    // NOTE(review): presumably seconds; confirm against the `WebSocket` trait.
    const TIMEOUT: f32 = 10.0;
}
264
/// A request sent from the client to the math server. Also used as the cache
/// key on the client, hence `Hash`/`Eq`.
#[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TeXMath {
    /// Formula source to be rendered between `$…$`.
    Inline(String),
    /// Formula source to be rendered between `\[…\]`; the server wraps a
    /// successful result in a `rustex-display` div.
    Block(String),
    /// Module to make available to the engine; produces no reply.
    UseModule(ModuleUri),
}
271
#[cfg(feature = "ssr")]
impl MathSocket {
    /// Executes one request against the engine `slf`.
    ///
    /// * `UseModule` registers the module and yields `None` (no reply).
    /// * `Inline` renders `$…$` and yields `Some((i, result))`.
    /// * `Block` renders `\[…\]`, wraps a successful result in a
    ///   `rustex-display` div and yields `Some((i, result))`.
    fn run_arg(
        slf: &flams_stex::math::RusTeXMath,
        i: usize,
        tm: TeXMath,
    ) -> Option<(usize, Result<String, String>)> {
        match tm {
            TeXMath::UseModule(m) => {
                slf.add_usemodule(&m);
                None
            }
            TeXMath::Inline(il) => {
                let r = slf.run(&format!("${il}$"));
                Some((i, r))
            }
            TeXMath::Block(bl) => {
                let r = slf.run(&format!("\\[{bl}\\]")).map(|mut s| {
                    s.insert_str(0, "<div class=\"rustex-display\">");
                    s.push_str("</div>");
                    s
                });
                Some((i, r))
            }
        }
    }
}
307
308#[cfg(feature = "ssr")]
309#[async_trait::async_trait]
310impl flams_router_base::ws::WebSocketServer<(usize, TeXMath), (usize, Result<String, String>)>
311 for MathSocket
312{
313 async fn new(
314 account: flams_router_base::LoginState,
315 _db: flams_database::DBBackend,
316 ) -> Option<Self> {
317 use flams_router_base::LoginState;
318 match account {
319 LoginState::Admin | LoginState::NoAccounts | LoginState::User { .. } => {
320 let rustex =
321 std::sync::Arc::new(parking_lot::Mutex::new(MaybeMath::Pending(Vec::new())));
322 let queue = std::sync::Arc::new(parking_lot::Mutex::new(Vec::new()));
323 let rst = rustex.clone();
324 let q2 = queue.clone();
325 let _ = tokio::task::spawn_blocking(move || {
326 let r = flams_stex::math::RusTeXMath::default();
327 loop {
328 let mut lock = rst.lock();
329 if let MaybeMath::Pending(v) = &mut *lock
331 && !v.is_empty()
332 {
333 let (i, msg) = v.remove(0);
334 drop(lock);
335 if let Some(s) = Self::run_arg(&r, i, msg) {
336 q2.lock().push(s);
337 }
338 } else {
339 *lock = MaybeMath::Done(r);
340 break;
341 }
342 }
343 });
344 Some(Self {
345 rustex,
346 queue,
347 #[cfg(feature = "hydrate")]
348 socket: unreachable!(),
349 #[cfg(all(feature = "hydrate", feature = "docs-only"))]
350 initialized: unreachable!(),
351 })
352 }
353 _ => None,
354 }
355 }
356
357 #[inline]
358 async fn next(&mut self) -> Option<(usize, Result<String, String>)> {
359 let v = {
360 let mut lock = self.queue.lock();
361 if !lock.is_empty() {
362 Some(lock.remove(0))
363 } else {
364 None
365 }
366 };
367 if let Some(v) = v {
368 Some(v)
369 } else {
370 tokio::time::sleep(std::time::Duration::from_secs_f32(3.5)).await;
372 None
373 }
374 }
375
376 async fn handle_message(
377 &mut self,
378 (i, msg): (usize, TeXMath),
379 ) -> Option<(usize, Result<String, String>)> {
380 let rs = self.rustex.clone();
381 tokio::task::spawn_blocking(move || {
382 let mut lock = rs.lock();
383 match &mut *lock {
384 MaybeMath::Done(rs) => {
385 Self::run_arg(rs, i, msg)
388 }
389 MaybeMath::Pending(v) => {
390 v.push((i, msg));
392 None
393 }
394 }
395 })
396 .await
397 .ok()
398 .flatten()
399 }
400}
401
#[cfg(feature = "hydrate")]
impl flams_router_base::ws::WebSocketClient<(usize, TeXMath), (usize, Result<String, String>)>
    for MathSocket
{
    /// Wraps the browser websocket `ws`. The client starts out with an empty
    /// backlog of messages to send once the connection opens.
    fn new(ws: leptos::web_sys::WebSocket) -> Self {
        #[cfg(not(feature = "docs-only"))]
        let socket = send_wrapper::SendWrapper::new(ws);
        #[cfg(feature = "docs-only")]
        let socket = ();
        let initialized = std::sync::Arc::new(parking_lot::Mutex::new(Some(Vec::new())));
        Self {
            socket,
            initialized,
            #[cfg(feature = "ssr")]
            rustex: unreachable!(),
            #[cfg(feature = "ssr")]
            queue: unreachable!(),
        }
    }

    /// Mutable access to the wrapped websocket. Not available in `docs-only`
    /// builds, where calling this panics.
    fn socket(&mut self) -> &mut leptos::web_sys::WebSocket {
        #[cfg(not(feature = "docs-only"))]
        {
            &mut self.socket
        }
        #[cfg(feature = "docs-only")]
        {
            unreachable!()
        }
    }

    /// Open handler: sends every message that was queued before the
    /// connection was up and switches the client to direct sending.
    fn on_open(&self) -> Option<Box<dyn FnMut()>> {
        let mut this = self.clone();
        let flush = move || {
            // Take the backlog in its own statement so the lock is released
            // before anything is sent.
            let backlog = this.initialized.lock().take();
            for msg in backlog.into_iter().flatten() {
                this.send(&msg);
            }
        };
        Some(Box::new(flush) as _)
    }
}
442
/// Client-side bookkeeping for `MathSocket`, shared through the leptos context.
#[derive(Debug, Clone)]
struct MathState {
    /// Source of request ids.
    counter: std::sync::Arc<AtomicUsize>,
    /// Signals still waiting for a reply, keyed by request id; the receive
    /// callback removes an entry when its reply arrives.
    values: std::sync::Arc<
        parking_lot::Mutex<BTreeMap<usize, RwSignal<Option<Result<String, String>>>>>,
    >,
    /// One signal per distinct request, used to avoid sending the same
    /// request twice.
    hash: std::sync::Arc<
        dashmap::DashMap<
            TeXMath,
            RwSignal<Option<Result<String, String>>>,
            rustc_hash::FxBuildHasher,
        >,
    >,
}
457
458#[cfg(feature = "hydrate")]
459impl MathSocket {
460 pub fn add_module(uri: ModuleUri) {
461 #[cfg(not(feature = "docs-only"))]
462 {
463 use flams_router_base::ws::WebSocketClient;
464 let slf = expect_context::<leptos::prelude::RwSignal<Self>>();
465 let state = expect_context::<MathState>();
466 let msg = TeXMath::UseModule(uri);
467 if state.hash.contains_key(&msg) {
468 return;
469 }
470 state.hash.insert(msg.clone(), RwSignal::new(None));
471 let counter = state
472 .counter
473 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
474 slf.update_untracked(move |slf| {
475 {
476 if let Some(v) = &mut *slf.initialized.lock() {
477 v.push((counter, msg));
478 return;
479 }
480 }
481 slf.send(&(counter, msg));
483 })
484 }
485 #[cfg(feature = "docs-only")]
486 {
487 unreachable!()
488 }
489 }
490
491 pub fn inline_math(s: &str) -> (usize, RwSignal<Option<Result<String, String>>>) {
492 #[cfg(not(feature = "docs-only"))]
493 {
494 use flams_router_base::ws::WebSocketClient;
495 let state = expect_context::<MathState>();
496 let counter = state
497 .counter
498 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
499 let msg = TeXMath::Inline(s.trim().to_string());
500 if let Some(sig) = state.hash.get(&msg) {
501 return (counter, *sig);
502 }
503 let sig = RwSignal::new(None);
504 state.hash.insert(msg.clone(), sig);
505 let slf = expect_context::<leptos::prelude::RwSignal<Self>>();
506 slf.update_untracked(move |slf| {
507 state.values.lock().insert(counter, sig);
508 {
509 if let Some(v) = &mut *slf.initialized.lock() {
510 v.push((counter, msg));
511 return;
512 }
513 }
514 slf.send(&(counter, msg));
516 });
517 (counter, sig)
518 }
519 #[cfg(feature = "docs-only")]
520 {
521 unreachable!()
522 }
523 }
524
525 pub fn block_math(s: &str) -> (usize, RwSignal<Option<Result<String, String>>>) {
526 #[cfg(not(feature = "docs-only"))]
527 {
528 use flams_router_base::ws::WebSocketClient;
529 let state = expect_context::<MathState>();
530 let counter = state
531 .counter
532 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
533 let msg = TeXMath::Block(s.trim().to_string());
534 if let Some(sig) = state.hash.get(&msg) {
535 return (counter, *sig);
536 }
537 let sig = RwSignal::new(None);
538 state.hash.insert(msg.clone(), sig);
539 let slf = expect_context::<leptos::prelude::RwSignal<Self>>();
540 slf.update_untracked(move |slf| {
541 state.values.lock().insert(counter, sig);
542 state.values.lock().insert(counter, sig);
543 {
544 if let Some(v) = &mut *slf.initialized.lock() {
545 v.push((counter, msg));
546 return;
547 }
548 }
549 slf.send(&(counter, msg));
551 });
552 (counter, sig)
553 }
554 #[cfg(feature = "docs-only")]
555 {
556 unreachable!()
557 }
558 }
559
560 pub fn run() {
561 #[cfg(not(feature = "docs-only"))]
562 {
563 use flams_router_base::ws::WebSocketClient;
564 let values = std::sync::Arc::new(parking_lot::Mutex::default());
565 let hash = std::sync::Arc::new(dashmap::DashMap::default());
566 provide_context(MathState {
567 counter: std::sync::Arc::default(),
568 values: values.clone(),
569 hash: hash.clone(),
570 });
571 let slf = Self::start(move |(i, msg)| {
572 if let Some(v) = values.lock().remove(&i) {
574 let _ = v.try_set(Some(msg));
575 } else {
576 tracing::error!("Unexpected websocket message");
577 };
578 None
579 })
580 .expect("failed to open websocket");
581 leptos::prelude::provide_context(leptos::prelude::RwSignal::new(slf));
582 }
583 #[cfg(feature = "docs-only")]
584 {
585 unreachable!()
586 }
587 }
588}