flams_flodown/
math.rs

1#[cfg(feature = "hydrate")]
2use ftml_ontology::utils::time::Timestamp;
3use ftml_uris::ModuleUri;
4use leptos::prelude::*;
5use std::{collections::BTreeMap, sync::atomic::AtomicUsize};
6
/// Path of the WebSocket endpoint that the TeX client in this file connects to.
const SERVER_ENDPOINT: &str = "/ws/mathjx";
8
9#[cfg(feature = "ssr")]
10use flams_router_base::ws::WSSocket;
11
/// Lazily created tracing span (target `mathjx`, no parent) that [`TeXSocket`]
/// reports as its span and enters around its blocking work.
#[cfg(feature = "ssr")]
pub(crate) static TEX_SPAN: std::sync::LazyLock<tracing::Span> =
    std::sync::LazyLock::new(|| tracing::info_span!(target:"mathjx",parent:None,"mathjx"));
15
/// Server-side handler for one math WebSocket connection.
///
/// Cloning is cheap: the TeX engine is shared behind an `Arc`, and `handle`
/// clones the whole handler to move it onto a blocking task.
#[cfg(feature = "ssr")]
#[derive(Clone)]
pub struct TeXSocket {
    /// Connection on which `(request id, result)` pairs are sent back to the client.
    socket: WSSocket<(usize, TeXMath), (usize, Result<String, String>)>,
    /// TeX engine used to render the incoming math snippets.
    tex: std::sync::Arc<flams_stex::math::RusTeXMath>,
}
22
#[cfg(feature = "ssr")]
impl flams_router_base::ws::WSServerSocket<(usize, TeXMath), (usize, Result<String, String>)>
    for TeXSocket
{
    /// Every connection is traced inside the shared [`TEX_SPAN`].
    #[inline]
    fn span(&self) -> Option<&'static tracing::Span> {
        Some(&*TEX_SPAN)
    }

    /// Creates the handler for a new connection.
    ///
    /// The TeX engine is constructed on a blocking task (inside [`TEX_SPAN`]) rather
    /// than on the async executor. Panics if that task itself panicked.
    async fn new(
        socket: flams_router_base::ws::WSSocket<(usize, TeXMath), (usize, Result<String, String>)>,
    ) -> Self {
        let build = move || {
            let tex = std::sync::Arc::new(flams_stex::math::RusTeXMath::default());
            Self { socket, tex }
        };
        tokio::task::spawn_blocking(move || TEX_SPAN.in_scope(build))
            .await
            .expect("this should not happen")
    }

    /// Processes one client message on a blocking task.
    ///
    /// * `UseModule` registers the module with the engine and produces no reply.
    /// * `Inline` renders the snippet wrapped in `$…$`.
    /// * `Block` renders the snippet wrapped in `\[…\]`; a successful result is
    ///   enclosed in a `<div class="rustex-display">` element.
    ///
    /// Replies are sent back tagged with the request id `i`. Always returns `true`;
    /// panics if the blocking task panicked.
    async fn handle(&self, (i, msg): (usize, TeXMath)) -> bool {
        let this = self.clone();
        let job = move || {
            tracing::info!("Received {:?}", msg);
            let rendered = match msg {
                TeXMath::UseModule(uri) => {
                    this.tex.add_usemodule(&uri);
                    None
                }
                TeXMath::Inline(src) => Some(this.tex.run(&format!("${src}$"))),
                TeXMath::Block(src) => {
                    let wrapped = this
                        .tex
                        .run(&format!("\\[{src}\\]"))
                        .map(|inner| format!("<div class=\"rustex-display\">{inner}</div>"));
                    Some(wrapped)
                }
            };
            if let Some(result) = rendered {
                let reply = (i, result);
                tracing::info!("Returning {:?}", reply);
                this.socket.send(reply);
            }
            true
        };
        tokio::task::spawn_blocking(move || TEX_SPAN.in_scope(job))
            .await
            .expect("this should not happen")
    }
}
77
/// Browser-side handle for requesting rendered TeX math over a WebSocket.
///
/// All state sits behind `Arc`s, so clones share the same connection, id counter,
/// caches and outgoing queue.
#[cfg(feature = "hydrate")]
#[derive(Debug, Clone)]
pub struct TeXClient {
    /// The underlying WebSocket client.
    socket: send_wrapper::SendWrapper<
        flams_router_base::ws::WSClient<(usize, TeXMath), (usize, Result<String, String>)>,
    >,
    /// Source of request ids; incremented for every call to `run`.
    counter: std::sync::Arc<AtomicUsize>,
    /// Signals still waiting for a server reply, keyed by request id.
    /// An entry is removed when its reply arrives or when `reset` is called.
    values: std::sync::Arc<
        parking_lot::Mutex<BTreeMap<usize, RwSignal<Option<Result<String, String>>>>>,
    >,
    /// One signal per distinct request, so identical requests share a result.
    cache: std::sync::Arc<
        dashmap::DashMap<
            TeXMath,
            RwSignal<Option<Result<String, String>>>,
            rustc_hash::FxBuildHasher,
        >,
    >,
    /// Time of the most recently queued request, together with the requests
    /// that have been queued but not yet sent.
    current: std::sync::Arc<parking_lot::Mutex<(Timestamp, Vec<(usize, TeXMath)>)>>,
    /// Flag polled by the background flush task to decide when to stop.
    drop: DropFlag,
}
98
/// Shared boolean flag (initially `false`) whose `Drop` implementation may raise it
/// depending on how many clones are still alive; see the `Drop` impl.
#[cfg(feature = "hydrate")]
#[derive(Clone, Debug, Default)]
struct DropFlag(std::sync::Arc<std::sync::atomic::AtomicBool>);
102#[cfg(feature = "hydrate")]
103impl Drop for DropFlag {
104    fn drop(&mut self) {
105        if std::sync::Arc::strong_count(&self.0) == 1 {
106            self.0.store(true, std::sync::atomic::Ordering::Release);
107        }
108    }
109}
110#[cfg(feature = "hydrate")]
111impl DropFlag {
112    pub fn get(&self) -> bool {
113        self.0.load(std::sync::atomic::Ordering::Acquire)
114    }
115}
116
#[cfg(feature = "hydrate")]
impl TeXClient {
    /// Error text stored by [`Self::reset`] in signals whose request was cancelled,
    /// and recognised by [`Self::run`] so that such requests are issued again.
    const ABORTED: &'static str = "ABORTED";

    /// Opens the math WebSocket, starts the background flush task and provides the
    /// resulting client as a Leptos context.
    ///
    /// Incoming `(id, result)` replies are written into the signal registered for
    /// that id. The flush task wakes every 500 ms and sends all queued requests once
    /// the most recent one is at least half a second old; it exits when the drop
    /// flag is raised.
    ///
    /// # Panics
    /// Panics if the WebSocket client cannot be created.
    pub fn provide() {
        let counter = std::sync::Arc::default();
        let values: std::sync::Arc<
            parking_lot::Mutex<BTreeMap<usize, RwSignal<Option<Result<String, String>>>>>,
        > = std::sync::Arc::default();
        let cache: std::sync::Arc<
            dashmap::DashMap<
                TeXMath,
                RwSignal<Option<Result<String, String>>>,
                rustc_hash::FxBuildHasher,
            >,
        > = std::sync::Arc::default();
        let drop = DropFlag::default();
        let current: std::sync::Arc<parking_lot::Mutex<(Timestamp, Vec<(usize, TeXMath)>)>> =
            std::sync::Arc::default();

        let valuecl = values.clone();
        let socket = flams_router_base::ws::WSClient::<
            (usize, TeXMath),
            (usize, Result<String, String>),
        >::new(SERVER_ENDPOINT, move |(i, res)| {
            // A reply is only meaningful while its signal is still registered;
            // `reset` may have removed it in the meantime.
            let Some(e) = valuecl.lock().remove(&i) else {
                tracing::warn!("Signal {i} disappeared!");
                return;
            };
            e.set(Some(res));
        })
        .expect("should be impossible");

        let dropcl = drop.clone();
        let currentcl = current.clone();
        let socket_cl = socket.clone();
        wasm_bindgen_futures::spawn_local(async move {
            tracing::info!("Running wasm future");
            loop {
                if dropcl.get() {
                    tracing::info!("Dropping future");
                    break;
                }
                gloo_timers::future::TimeoutFuture::new(500).await;
                // The guard is taken after the await and released at the end of this
                // iteration, so the lock is never held across a suspension point.
                let (ts, v) = &mut *currentcl.lock();
                tracing::debug!("Checking {ts} (now: {})", Timestamp::now());
                if ts.since_now().seconds() >= 0.5 && !v.is_empty() {
                    tracing::debug!("Sending things");
                    for msg in v.drain(..) {
                        socket_cl.send(&msg);
                    }
                }
            }
        });

        let slf = Self {
            socket: send_wrapper::SendWrapper::new(socket),
            counter,
            cache,
            values,
            current,
            drop,
        };
        provide_context(slf);
    }

    /// Immediately sends a `UseModule` message for `uri` (with request id 0).
    pub fn add_import(uri: ModuleUri) {
        let slf = expect_context::<Self>();
        slf.socket.send(&(0, TeXMath::UseModule(uri)));
    }

    /// Discards all queued-but-unsent requests and forgets every pending signal.
    ///
    /// Signals that have not received a value yet are set (untracked) to
    /// `Err("ABORTED")`, so a later identical request is recognised as cancelled.
    pub fn reset() {
        let slf = expect_context::<Self>();
        slf.current.lock().1.clear();
        // Returning `false` for every entry empties the map after marking it.
        slf.values.lock().retain(|_, v| {
            v.update_untracked(|v| {
                if v.is_none() {
                    *v = Some(Err(Self::ABORTED.to_string()));
                }
            });
            false
        });
    }

    /// Requests rendering of `s` (trimmed) as inline math.
    ///
    /// Returns the request id and the signal that will hold the result.
    pub fn inline_math(s: &str) -> (usize, RwSignal<Option<Result<String, String>>>) {
        Self::run(TeXMath::Inline(s.trim().to_string()))
    }

    /// Requests rendering of `s` (trimmed) as display math.
    ///
    /// Returns the request id and the signal that will hold the result.
    pub fn block_math(s: &str) -> (usize, RwSignal<Option<Result<String, String>>>) {
        Self::run(TeXMath::Block(s.trim().to_string()))
    }

    /// Queues `msg` for sending unless an identical request is already cached.
    ///
    /// A cached signal is reused as-is, except when it carries the aborted marker
    /// written by [`Self::reset`]: then it is cleared and the request is queued again.
    fn run(msg: TeXMath) -> (usize, RwSignal<Option<Result<String, String>>>) {
        let slf = expect_context::<Self>();
        let counter = slf
            .counter
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let sig = if let Some(sig) = slf.cache.get(&msg) {
            // `reset` stores the marker as an `Err`, so the error side is inspected.
            let aborted = sig.with_untracked(|v| {
                v.as_ref()
                    .is_some_and(|r| r.as_ref().is_err_and(|e| e == Self::ABORTED))
            });
            if !aborted {
                return (counter, *sig);
            }
            sig.update_untracked(|v| *v = None);
            *sig
        } else {
            let sig = RwSignal::new(None);
            slf.cache.insert(msg.clone(), sig);
            sig
        };
        {
            slf.values.lock().insert(counter, sig);
            let (ts, curr) = &mut *slf.current.lock();
            // Refreshing the timestamp postpones the flush until requests stop arriving.
            *ts = Timestamp::now();
            curr.push((counter, msg));
        }
        (counter, sig)
    }
}
234
235// ---------------------------------------------------------------------------
236
/// State of the TeX engine behind a `MathSocket` connection.
#[cfg(feature = "ssr")]
#[derive(Debug)]
enum MaybeMath {
    /// The engine is available; messages are processed directly.
    Done(flams_stex::math::RusTeXMath),
    /// The engine is not available yet; messages received so far wait here.
    Pending(Vec<(usize, TeXMath)>),
}
243
/// WebSocket endpoint for TeX math, with feature-dependent fields for the server
/// (`ssr`) and browser (`hydrate`) sides.
#[derive(Debug, Clone)]
pub struct MathSocket {
    /// The browser WebSocket.
    #[cfg(all(feature = "hydrate", not(feature = "docs-only")))]
    socket: send_wrapper::SendWrapper<leptos::web_sys::WebSocket>,
    /// Placeholder when built with `docs-only`.
    #[cfg(all(feature = "hydrate", feature = "docs-only"))]
    socket: (),
    /// The TeX engine, or the messages waiting for it.
    #[cfg(feature = "ssr")]
    rustex: std::sync::Arc<parking_lot::Mutex<MaybeMath>>,
    /// Results produced by the background task, handed out by `next`.
    #[cfg(feature = "ssr")]
    queue: std::sync::Arc<parking_lot::Mutex<Vec<(usize, Result<String, String>)>>>,
    /// `Some(buffer)` while messages are being held back; taken (left as `None`)
    /// by the `on_open` callback, which sends the buffered messages.
    #[cfg(feature = "hydrate")]
    initialized: std::sync::Arc<parking_lot::Mutex<Option<Vec<(usize, TeXMath)>>>>,
}
257
/// Shared (client and server) WebSocket configuration for `MathSocket`.
impl flams_router_base::ws::WebSocket<(usize, TeXMath), (usize, Result<String, String>)>
    for MathSocket
{
    /// Same path as the module-level `SERVER_ENDPOINT` constant.
    const SERVER_ENDPOINT: &str = "/ws/mathjx";
    // NOTE(review): presumably a duration in seconds; confirm against the trait definition.
    const TIMEOUT: f32 = 10.0;
}
264
/// A request sent from the client to the TeX math server.
#[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TeXMath {
    /// Math source that the server renders wrapped in `$…$`.
    Inline(String),
    /// Math source that the server renders wrapped in `\[…\]`.
    Block(String),
    /// Module that the server registers with the engine; produces no reply.
    UseModule(ModuleUri),
}
271
#[cfg(feature = "ssr")]
impl MathSocket {
    /// Executes a single request against the TeX engine `slf`.
    ///
    /// * `UseModule` registers the module and yields `None` (no reply is sent).
    /// * `Inline` renders the snippet wrapped in `$…$`.
    /// * `Block` renders the snippet wrapped in `\[…\]`; a successful result is
    ///   enclosed in a `<div class="rustex-display">` element.
    ///
    /// Rendered results are returned tagged with the request id `i`.
    fn run_arg(
        slf: &flams_stex::math::RusTeXMath,
        i: usize,
        tm: TeXMath,
    ) -> Option<(usize, Result<String, String>)> {
        match tm {
            TeXMath::UseModule(m) => {
                slf.add_usemodule(&m);
                None
            }
            TeXMath::Inline(il) => Some((i, slf.run(&format!("${il}$")))),
            TeXMath::Block(bl) => {
                let r = slf.run(&format!("\\[{bl}\\]")).map(|mut s| {
                    s.insert_str(0, "<div class=\"rustex-display\">");
                    s.push_str("</div>");
                    s
                });
                Some((i, r))
            }
        }
    }
}
307
308#[cfg(feature = "ssr")]
309#[async_trait::async_trait]
310impl flams_router_base::ws::WebSocketServer<(usize, TeXMath), (usize, Result<String, String>)>
311    for MathSocket
312{
313    async fn new(
314        account: flams_router_base::LoginState,
315        _db: flams_database::DBBackend,
316    ) -> Option<Self> {
317        use flams_router_base::LoginState;
318        match account {
319            LoginState::Admin | LoginState::NoAccounts | LoginState::User { .. } => {
320                let rustex =
321                    std::sync::Arc::new(parking_lot::Mutex::new(MaybeMath::Pending(Vec::new())));
322                let queue = std::sync::Arc::new(parking_lot::Mutex::new(Vec::new()));
323                let rst = rustex.clone();
324                let q2 = queue.clone();
325                let _ = tokio::task::spawn_blocking(move || {
326                    let r = flams_stex::math::RusTeXMath::default();
327                    loop {
328                        let mut lock = rst.lock();
329                        //let mut queue = q2.lock();
330                        if let MaybeMath::Pending(v) = &mut *lock
331                            && !v.is_empty()
332                        {
333                            let (i, msg) = v.remove(0);
334                            drop(lock);
335                            if let Some(s) = Self::run_arg(&r, i, msg) {
336                                q2.lock().push(s);
337                            }
338                        } else {
339                            *lock = MaybeMath::Done(r);
340                            break;
341                        }
342                    }
343                });
344                Some(Self {
345                    rustex,
346                    queue,
347                    #[cfg(feature = "hydrate")]
348                    socket: unreachable!(),
349                    #[cfg(all(feature = "hydrate", feature = "docs-only"))]
350                    initialized: unreachable!(),
351                })
352            }
353            _ => None,
354        }
355    }
356
357    #[inline]
358    async fn next(&mut self) -> Option<(usize, Result<String, String>)> {
359        let v = {
360            let mut lock = self.queue.lock();
361            if !lock.is_empty() {
362                Some(lock.remove(0))
363            } else {
364                None
365            }
366        };
367        if let Some(v) = v {
368            Some(v)
369        } else {
370            // hack to avoid async channels - 3.5 > Self::TIMEOUT
371            tokio::time::sleep(std::time::Duration::from_secs_f32(3.5)).await;
372            None
373        }
374    }
375
376    async fn handle_message(
377        &mut self,
378        (i, msg): (usize, TeXMath),
379    ) -> Option<(usize, Result<String, String>)> {
380        let rs = self.rustex.clone();
381        tokio::task::spawn_blocking(move || {
382            let mut lock = rs.lock();
383            match &mut *lock {
384                MaybeMath::Done(rs) => {
385                    //let rs = rs.clone();
386                    //drop(lock);
387                    Self::run_arg(rs, i, msg)
388                }
389                MaybeMath::Pending(v) => {
390                    //println!("Deferring {msg:?}");
391                    v.push((i, msg));
392                    None
393                }
394            }
395        })
396        .await
397        .ok()
398        .flatten()
399    }
400}
401
#[cfg(feature = "hydrate")]
impl flams_router_base::ws::WebSocketClient<(usize, TeXMath), (usize, Result<String, String>)>
    for MathSocket
{
    /// Wraps the browser WebSocket and starts with an empty buffer of held-back messages.
    fn new(ws: leptos::web_sys::WebSocket) -> Self {
        let initialized = std::sync::Arc::new(parking_lot::Mutex::new(Some(Vec::new())));
        Self {
            #[cfg(not(feature = "docs-only"))]
            socket: send_wrapper::SendWrapper::new(ws),
            #[cfg(feature = "docs-only")]
            socket: (),
            initialized,
            #[cfg(feature = "ssr")]
            rustex: unreachable!(),
            #[cfg(feature = "ssr")]
            queue: unreachable!(),
        }
    }

    /// Gives access to the underlying browser WebSocket (unavailable with `docs-only`).
    fn socket(&mut self) -> &mut leptos::web_sys::WebSocket {
        #[cfg(not(feature = "docs-only"))]
        {
            &mut self.socket
        }
        #[cfg(feature = "docs-only")]
        {
            unreachable!()
        }
    }

    /// Callback for the socket's open event: takes the buffer of held-back messages
    /// (leaving `None` behind) and sends each of them in order.
    fn on_open(&self) -> Option<Box<dyn FnMut()>> {
        let mut this = self.clone();
        let flush = move || {
            // The guard is released at the end of this statement, before sending.
            let buffered = this.initialized.lock().take();
            for msg in buffered.into_iter().flatten() {
                this.send(&msg);
            }
        };
        Some(Box::new(flush) as _)
    }
}
442
/// Client-side bookkeeping for `MathSocket`, provided as a Leptos context by
/// `MathSocket::run`.
#[derive(Debug, Clone)]
struct MathState {
    /// Source of request ids.
    counter: std::sync::Arc<AtomicUsize>,
    /// Signals awaiting a server reply, keyed by request id; an entry is removed
    /// when its reply arrives.
    values: std::sync::Arc<
        parking_lot::Mutex<BTreeMap<usize, RwSignal<Option<Result<String, String>>>>>,
    >,
    /// One signal per distinct request that has already been issued, used to avoid
    /// sending the same request twice.
    hash: std::sync::Arc<
        dashmap::DashMap<
            TeXMath,
            RwSignal<Option<Result<String, String>>>,
            rustc_hash::FxBuildHasher,
        >,
    >,
}
457
#[cfg(feature = "hydrate")]
impl MathSocket {
    /// Tells the server to use module `uri`, unless that was already requested.
    ///
    /// The message is buffered while the held-back buffer still exists and sent
    /// directly otherwise. Not available with `docs-only`.
    pub fn add_module(uri: ModuleUri) {
        #[cfg(not(feature = "docs-only"))]
        {
            use flams_router_base::ws::WebSocketClient;
            let slf = expect_context::<leptos::prelude::RwSignal<Self>>();
            let state = expect_context::<MathState>();
            let msg = TeXMath::UseModule(uri);
            if state.hash.contains_key(&msg) {
                return;
            }
            // The signal is only a marker here: `UseModule` never receives a reply.
            state.hash.insert(msg.clone(), RwSignal::new(None));
            let counter = state
                .counter
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            slf.update_untracked(move |slf| {
                {
                    if let Some(v) = &mut *slf.initialized.lock() {
                        v.push((counter, msg));
                        return;
                    }
                }
                slf.send(&(counter, msg));
            })
        }
        #[cfg(feature = "docs-only")]
        {
            unreachable!()
        }
    }

    /// Requests rendering of `s` (trimmed) as inline math.
    ///
    /// Returns the request id and the signal that will hold the result.
    pub fn inline_math(s: &str) -> (usize, RwSignal<Option<Result<String, String>>>) {
        Self::request(TeXMath::Inline(s.trim().to_string()))
    }

    /// Requests rendering of `s` (trimmed) as display math.
    ///
    /// Returns the request id and the signal that will hold the result.
    pub fn block_math(s: &str) -> (usize, RwSignal<Option<Result<String, String>>>) {
        Self::request(TeXMath::Block(s.trim().to_string()))
    }

    /// Shared implementation of [`Self::inline_math`] and [`Self::block_math`].
    ///
    /// Reuses the signal of an identical earlier request if there is one; otherwise
    /// registers a fresh signal under a new id and sends the request (or buffers it
    /// while the held-back buffer still exists). Not available with `docs-only`.
    fn request(msg: TeXMath) -> (usize, RwSignal<Option<Result<String, String>>>) {
        #[cfg(not(feature = "docs-only"))]
        {
            use flams_router_base::ws::WebSocketClient;
            let state = expect_context::<MathState>();
            let counter = state
                .counter
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            if let Some(sig) = state.hash.get(&msg) {
                return (counter, *sig);
            }
            let sig = RwSignal::new(None);
            state.hash.insert(msg.clone(), sig);
            let slf = expect_context::<leptos::prelude::RwSignal<Self>>();
            slf.update_untracked(move |slf| {
                state.values.lock().insert(counter, sig);
                {
                    // Scoped so the guard is gone before `send` is called.
                    if let Some(v) = &mut *slf.initialized.lock() {
                        v.push((counter, msg));
                        return;
                    }
                }
                slf.send(&(counter, msg));
            });
            (counter, sig)
        }
        #[cfg(feature = "docs-only")]
        {
            let _ = msg;
            unreachable!()
        }
    }

    /// Opens the WebSocket and provides both the shared `MathState` and the socket
    /// (as an `RwSignal<Self>`) as Leptos contexts.
    ///
    /// Each incoming `(id, result)` reply is written into the signal registered for
    /// that id; replies for unknown ids are logged as errors. Not available with
    /// `docs-only`.
    ///
    /// # Panics
    /// Panics if the WebSocket cannot be opened.
    pub fn run() {
        #[cfg(not(feature = "docs-only"))]
        {
            use flams_router_base::ws::WebSocketClient;
            let values = std::sync::Arc::new(parking_lot::Mutex::default());
            let hash = std::sync::Arc::new(dashmap::DashMap::default());
            provide_context(MathState {
                counter: std::sync::Arc::default(),
                values: values.clone(),
                hash,
            });
            let slf = Self::start(move |(i, msg)| {
                if let Some(v) = values.lock().remove(&i) {
                    // The signal may already be disposed; that is not an error.
                    let _ = v.try_set(Some(msg));
                } else {
                    tracing::error!("Unexpected websocket message");
                };
                None
            })
            .expect("failed to open websocket");
            leptos::prelude::provide_context(leptos::prelude::RwSignal::new(slf));
        }
        #[cfg(feature = "docs-only")]
        {
            unreachable!()
        }
    }
}