flams_math_archives/
manager.rs

1use std::path::{Path, PathBuf};
2
3use crate::{
4    Archive, MathArchive,
5    backend::LocalBackend,
6    document_file::DocumentFile,
7    formats::SourceFormatId,
8    mathhub::mathhubs,
9    source_files::FileStates,
10    utils::{
11        AsyncEngine,
12        errors::{BackendError, ManifestParseError, NewArchiveError},
13        path_ext::{PathExt, RelPath},
14    },
15};
16#[cfg(feature = "deepsize")]
17use flams_backend_types::ManagerCacheSize;
18use flams_backend_types::archive_json::{ArchiveIndex, Institution};
19use ftml_ontology::{
20    domain::modules::Module,
21    utils::{
22        RefTree, TreeChild,
23        awaitable::{AsyncCache, MaybeValue},
24    },
25};
26use ftml_uris::{ArchiveId, ArchiveUri, BaseUri, DocumentUri, ModuleUri, UriPath, UriWithArchive};
27
/// Registry of all loaded archives.
///
/// Holds the archive tree behind a read-write lock and, with the `rdf` feature,
/// the triple store into which archive/group metadata is written on insertion.
#[derive(Debug)]
pub struct ArchiveManager {
    /// All loaded archives, both as a sorted list and as a group hierarchy.
    pub(crate) tree: parking_lot::RwLock<ArchiveTree>,
    // Module/document caches are currently disabled:
    //pub(crate) modules: AsyncCache<ModuleUri, Module, BackendError>,
    //pub(crate) documents: AsyncCache<DocumentUri, triomphe::Arc<DocumentFile>, BackendError>,
    /// RDF store receiving the `ulo:library` / `ulo:library_group` / `ulo:contains`
    /// triples produced when archives are inserted.
    #[cfg(feature = "rdf")]
    triple_store: crate::triple_store::RDFStore,
}
36
#[cfg(feature = "rocksdb")]
impl ArchiveManager {
    /// Creates an empty manager (no archives loaded).
    ///
    /// With the `rdf` feature, the triple store is constructed from `rdf_path`;
    /// without it, `rdf_path` is ignored.
    #[must_use]
    #[cfg_attr(not(feature = "rdf"), allow(unused_variables))]
    pub fn new(rdf_path: &Path) -> Self {
        Self {
            tree: parking_lot::RwLock::new(ArchiveTree::default()),
            #[cfg(feature = "rdf")]
            triple_store: crate::triple_store::RDFStore::new(rdf_path),
        }
    }
}
49
50impl Default for ArchiveManager {
51    fn default() -> Self {
52        Self {
53            tree: parking_lot::RwLock::new(ArchiveTree::default()),
54            //modules: AsyncCache::new(2048),
55            //documents: AsyncCache::new(4096),
56            #[cfg(feature = "rdf")]
57            triple_store: crate::triple_store::RDFStore::default(),
58        }
59    }
60}
61
62impl ArchiveManager {
63    #[cfg(feature = "deepsize")]
64    pub fn memory(&self) -> ManagerCacheSize {
65        use deepsize::DeepSizeOf;
66        let relations = self.triple_store.num_relations();
67        let mut num_modules = 0;
68        let mut modules_bytes = 0;
69        /*
70        self.modules.all(|_, v| {
71            num_modules += 1;
72            if let MaybeValue::Done(Ok(m)) = &*v.read() {
73                modules_bytes += m.deep_size_of();
74            }
75        });
76        */
77        let mut num_documents = 0;
78        let mut documents_bytes = 0;
79        /*
80        self.documents.all(|_, v| {
81            num_documents += 1;
82            if let MaybeValue::Done(Ok(d)) = &*v.read() {
83                documents_bytes += d.deep_size_of();
84            }
85        });
86         */
87        ManagerCacheSize {
88            num_modules,
89            modules_bytes,
90            num_documents,
91            documents_bytes,
92            relations,
93        }
94    }
95
96    #[inline]
97    #[must_use]
98    pub fn all_archives(&self) -> impl std::ops::Deref<Target = [Archive]> + '_ {
99        parking_lot::RwLockReadGuard::map(self.tree.read(), |s| s.archives.as_slice())
100    }
101
102    #[cfg(feature = "rdf")]
103    #[inline]
104    #[must_use]
105    pub const fn triple_store(&self) -> &crate::triple_store::RDFStore {
106        &self.triple_store
107    }
108
109    #[inline]
110    pub fn with_tree<R>(&self, f: impl FnOnce(&ArchiveTree) -> R) -> R {
111        f(&self.tree.read())
112    }
113
114    pub fn reinit<R>(&self, f: impl FnOnce(&mut ArchiveTree) -> R, paths: &[&Path]) -> R {
115        let ls = self.tree.read().load(paths, false);
116        let mut tree = self.tree.write();
117        let r = f(&mut tree);
118        tree.archives.clear();
119        tree.top.clear();
120        *tree.index.write() = None;
121        //self.modules.clear();
122        //self.documents.clear();
123        #[cfg(feature = "rdf")]
124        self.triple_store.clear();
125        for a in ls.into_iter().flatten() {
126            tree.insert(
127                a,
128                #[cfg(feature = "rdf")]
129                &self.triple_store,
130            );
131        }
132        r
133    }
134    /*
135    pub(crate) fn load_document(
136        &self,
137        archive: &ArchiveUri,
138        path: Option<&UriPath>,
139        language: Language,
140        name: &SimpleUriName,
141    ) -> Option<UncheckedDocument> {
142        self.with_archive(archive.archive_id(), |a| {
143            let Some(a) = a else {
144                return Err(crate::BackendError::ArchiveNotFound);
145            };
146            a.load_document(path, name, language)
147        })
148    }
149     */
150
151    pub(crate) fn load_module(
152        &self,
153        archive: &ArchiveUri,
154        path: Option<&UriPath>,
155        name: &str,
156    ) -> Result<Module, crate::BackendError> {
157        self.with_archive(archive.archive_id(), |a| {
158            let Some(a) = a else {
159                return Err(crate::BackendError::ArchiveNotFound);
160            };
161            a.load_module(path, name)
162        })
163    }
164    pub(crate) fn load_module_async<A: AsyncEngine>(
165        &self,
166        archive: &ArchiveUri,
167        path: Option<&UriPath>,
168        name: &str,
169    ) -> impl Future<Output = Result<Module, crate::BackendError>> + 'static + use<A> {
170        self.with_archive(archive.archive_id(), |a| {
171            let Some(a) = a else {
172                return either::Left(std::future::ready(Err(
173                    crate::BackendError::ArchiveNotFound,
174                )));
175            };
176            either::Right(a.load_module_async::<A>(path, name))
177        })
178    }
179
180    /// # Errors
181    pub fn load_one(&self, manifest: &Path, rel_path: RelPath) -> Result<(), ManifestParseError> {
182        let a = crate::manifest::parse_manifest(manifest, rel_path)?;
183        if let Archive::Local(a) = &a {
184            a.update_sources();
185        }
186        let mut tree = self.tree.write();
187        *tree.index.write() = None;
188        tree.insert(
189            a,
190            #[cfg(feature = "rdf")]
191            &self.triple_store,
192        );
193        Ok(())
194    }
195
196    pub fn load(&self, paths: &[&Path]) {
197        let ls = self.tree.read().load(paths, true);
198        let mut lock = self.tree.write();
199        for a in ls.into_iter().flatten() {
200            lock.insert(
201                a,
202                #[cfg(feature = "rdf")]
203                &self.triple_store,
204            );
205        }
206    }
207
208    /// # Errors
209    /// # Panics
210    pub fn new_archive(
211        &self,
212        id: &ArchiveId,
213        base_uri: &BaseUri,
214        format: SourceFormatId,
215        default_file: &str,
216        content: &str,
217    ) -> Result<PathBuf, NewArchiveError> {
218        use std::io::Write;
219        let mh = *mathhubs().first().ok_or(NewArchiveError::NoMathHub)?;
220        let meta_inf = id
221            .steps()
222            .fold(mh.to_path_buf(), |p, s| p.join(s))
223            .join("META-INF");
224        // SAFETY: we constructed the path as a descendant of mh
225        let root = unsafe { meta_inf.parent().unwrap_unchecked() };
226        macro_rules! err {
227            ($p:pat = $expr:expr;$id:ident) => {
228                #[allow(clippy::let_unit_value)]
229                let $p = match $expr {
230                    Ok(v) => v,
231                    Err(e) => return Err(NewArchiveError::$id(root.to_path_buf(), e)),
232                };
233            };
234        }
235        macro_rules! dump {
236            ($f:expr; $($t:tt)*) => {
237                err!(f = std::fs::File::create(&$f);Write);
238                if let Err(e) = write!(std::io::BufWriter::new(f),$($t)*) {
239                    return Err(NewArchiveError::Write($f, e));
240                }
241            };
242        }
243        err!(() = std::fs::create_dir_all(&meta_inf);CreateDir);
244        let manifest = meta_inf.join("MANIFEST.MF");
245        err!(mf = std::fs::File::create_new(&manifest);Write);
246        if let Err(e) = write!(
247            std::io::BufWriter::new(mf),
248            "id: {id}\nurl-base: {base_uri}\nformat: {}",
249            format.name
250        ) {
251            return Err(NewArchiveError::Write(manifest, e));
252        }
253        dump!(root.join(".gitignore");"{}",include_str!("gitignore_template.txt"));
254
255        let lib = root.join("lib");
256        err!(() = std::fs::create_dir_all(&lib);CreateDir);
257        let preamble = lib.join("preamble.tex");
258        dump!(preamble;"% preamble code for stex");
259
260        let source = root.join("source");
261        err!(() = std::fs::create_dir_all(&source);CreateDir);
262        let default = source.join(default_file);
263        dump!(default;"{}",content);
264        self.load_one(&manifest, RelPath::from_id(id))
265            .expect("this is a bug");
266        Ok(root.to_path_buf())
267    }
268
269    pub fn index(&self, external_url: &str) -> (Vec<Institution>, Vec<ArchiveIndex>) {
270        let tree = self.tree.read();
271        if let Some(idx) = (*tree.index.read()).clone() {
272            return match idx {
273                either::Left(r) => r,
274                either::Right(r) => r.recv().expect("this is a bug"),
275            };
276        }
277        let (s, r) = flume::bounded(1);
278        *tree.index.write() = Some(either::Right(r));
279        let (is, ars) = tree.load_index(external_url);
280        *tree.index.write() = Some(either::Left((is.clone(), ars.clone())));
281        while s.receiver_count() > 0 {
282            let _ = s.send((is.clone(), ars.clone()));
283        }
284        (is, ars)
285    }
286
287    fn fut1(
288        r: flume::Receiver<(Vec<Institution>, Vec<ArchiveIndex>)>,
289    ) -> impl Future<Output = (Vec<Institution>, Vec<ArchiveIndex>)> + Send {
290        async move { r.recv_async().await.expect("this is a bug") }
291    }
292    fn ft(
293        v: either::Either<
294            (Vec<Institution>, Vec<ArchiveIndex>),
295            flume::Receiver<(Vec<Institution>, Vec<ArchiveIndex>)>,
296        >,
297    ) -> impl Future<Output = (Vec<Institution>, Vec<ArchiveIndex>)> + Send {
298        match v {
299            either::Left(r) => either::Left(std::future::ready(r)),
300            either::Right(r) => either::Right(Self::fut1(r)),
301        }
302    }
303
304    pub fn index_async<A: AsyncEngine>(
305        external_url: impl Fn() -> &'static str + Send + Sync + 'static,
306    ) -> impl Future<Output = (Vec<Institution>, Vec<ArchiveIndex>)> + Send {
307        let tree = crate::backend::GlobalBackend.tree.read();
308        let idx = (*tree.index.read()).clone();
309        if let Some(idx) = idx {
310            return either::Left(Self::ft(idx));
311        }
312        let (s, r) = flume::bounded(1);
313        *tree.index.write() = Some(either::Right(r));
314        drop(tree);
315        either::Right(async move {
316            let (is, ars) = A::block_on(move || {
317                let tree = crate::backend::GlobalBackend.tree.read();
318                let (is, ars) = tree.load_index(external_url());
319                *tree.index.write() = Some(either::Left((is.clone(), ars.clone())));
320                drop(tree);
321                (is, ars)
322            })
323            .await;
324            while s.receiver_count() > 0 {
325                let _ = s.send_async((is.clone(), ars.clone())).await;
326            }
327            (is, ars)
328        })
329    }
330}
331
/// The loaded archives, both as a flat list and as a hierarchy of groups.
#[derive(Debug, Default)]
pub struct ArchiveTree {
    /// All archives, kept sorted by id so they can be binary-searched.
    pub archives: Vec<Archive>,
    /// Top level of the hierarchy, kept sorted by the last segment of each entry's id.
    pub top: Vec<ArchiveOrGroup>,
    /// Cached archive index. `None`: not computed (or invalidated). `Left`: the
    /// finished (institutions, archive index) pair. `Right`: a receiver on which a
    /// computation in progress delivers that pair.
    index: parking_lot::RwLock<
        Option<
            either::Either<
                (Vec<Institution>, Vec<ArchiveIndex>),
                flume::Receiver<(Vec<Institution>, Vec<ArchiveIndex>)>,
            >,
        >,
    >, //pub index: (Vec<Institution>, Vec<ArchiveIndex>),
}
345
/// A node of the archive hierarchy.
#[derive(Debug)]
pub enum ArchiveOrGroup {
    /// A leaf: the id of an archive stored in `ArchiveTree::archives`.
    Archive(ArchiveId),
    /// An inner node grouping archives and subgroups that share an id prefix.
    Group(ArchiveGroup),
}
351
/// An inner node of the archive hierarchy.
#[derive(Debug)]
pub struct ArchiveGroup {
    /// Full id of the group: a proper prefix (at segment boundaries) of its members' ids.
    pub id: ArchiveId,
    /// Direct members, kept sorted by the last segment of their ids.
    pub children: Vec<ArchiveOrGroup>,
    /// Merged file states of the local archives inserted below this group.
    pub state: FileStates,
}
358
/// A sink that may receive RDF triples.
///
/// With the `rdf` feature, `add_triple` receives a closure that builds the triple,
/// so sinks that discard triples (such as `()`) never construct them. Without the
/// feature, the trait has no methods.
pub trait MaybeTriple: Send {
    #[cfg(feature = "rdf")]
    fn add_triple(&mut self, quad: impl FnOnce() -> ulo::rdf_types::Triple);
}
363impl MaybeTriple for () {
364    #[cfg(feature = "rdf")]
365    #[inline]
366    fn add_triple(&mut self, _: impl FnOnce() -> ulo::rdf_types::Triple) {}
367}
/// Any `FnMut(Triple)` closure is a sink: each triple is built and handed to it.
#[cfg(feature = "rdf")]
impl<F> MaybeTriple for F
where
    F: FnMut(ulo::rdf_types::Triple) + Send,
{
    #[inline]
    fn add_triple(&mut self, quad: impl FnOnce() -> ulo::rdf_types::Triple) {
        let triple = quad();
        (self)(triple);
    }
}
378
379impl ArchiveTree {
380    #[must_use]
381    pub fn state(&self) -> FileStates {
382        let mut r = FileStates::default();
383        for aog in &self.top {
384            match aog {
385                ArchiveOrGroup::Archive(a) => {
386                    if let Some(Archive::Local(a)) = self.get(a) {
387                        r.merge_all(&a.file_state.read().state);
388                    }
389                }
390                ArchiveOrGroup::Group(g) => r.merge_all(&g.state),
391            }
392        }
393        r
394    }
395
396    #[must_use]
397    pub fn get_group_or_archive(&self, id: &ArchiveId) -> Option<&ArchiveOrGroup> {
398        let mut steps = id.steps().peekable();
399        let mut curr = &self.top;
400        while let Some(step) = steps.next() {
401            let e = curr
402                .binary_search_by_key(&step, |e| e.id().last())
403                .ok()
404                .map(|i| &curr[i])?;
405            if steps.peek().is_none() {
406                return Some(e);
407            }
408            if let ArchiveOrGroup::Group(g) = e {
409                curr = &g.children;
410            } else {
411                return None;
412            }
413        }
414        None
415    }
416
417    #[must_use]
418    pub fn get(&self, id: &ArchiveId) -> Option<&Archive> {
419        self.archives
420            .binary_search_by_key(&id, |a: &Archive| a.id())
421            .ok()
422            .map(|i| &self.archives[i])
423    }
424
425    #[allow(clippy::linkedlist)]
426    fn load(
427        &self,
428        paths: &[&Path],
429        skip_existent: bool,
430    ) -> std::collections::LinkedList<Vec<Archive>> {
431        use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
432        use spliter::ParallelSpliterator;
433        paths
434            .par_iter()
435            .flat_map(|p| {
436                crate::archive_iter::ManifestIterator::new(p)
437                    .par_split()
438                    .into_par_iter()
439                    .map(move |r| (p, r))
440                    .filter_map(|(mh, p)| {
441                        // SAFETY: manifest file is grandchild of root directory of archive
442                        let top_dir =
443                            unsafe { p.parent().unwrap_unchecked().parent().unwrap_unchecked() };
444                        let rel_path = top_dir.relative_to(mh)?;
445                        let Some(id): Option<ArchiveId> = rel_path.parse().ok() else {
446                            tracing::warn!("invalid archive id: {rel_path}");
447                            return None;
448                        };
449                        if skip_existent && self.get(&id).is_some() {
450                            return None;
451                        }
452                        match crate::manifest::parse_manifest(&p, rel_path) {
453                            Ok(r) => Some(r),
454                            Err(e) => {
455                                tracing::warn!("{e} in {rel_path}");
456                                None
457                            }
458                        }
459                    })
460                    .map(|a| {
461                        if let Archive::Local(a) = &a {
462                            a.update_sources();
463                        }
464                        a
465                    })
466            })
467            .collect_vec_list()
468        /*for n in news.into_iter().flatten() {
469            self.insert(n, &mut f);
470        }*/
471    }
472
473    fn insert(
474        &mut self,
475        archive: Archive,
476        #[cfg(feature = "rdf")] triple_store: &crate::triple_store::RDFStore,
477    ) {
478        #[cfg(feature = "rdf")]
479        let mut triples = vec![{
480            use ftml_uris::FtmlUri;
481            ulo::triple!(<(archive.uri().to_iri())>: ulo:library)
482        }];
483
484        let id = archive.id().clone();
485        let rel_path = RelPath::from_id(&id);
486        let steps = if let Some((group, _)) = rel_path.split_last() {
487            group.steps()
488        } else {
489            match self
490                .archives
491                .binary_search_by_key(&&id, |a: &Archive| a.id())
492            {
493                Ok(i) => self.archives[i] = archive,
494                Err(i) => self.archives.insert(i, archive),
495            }
496            match self
497                .top
498                .binary_search_by_key(&id.as_ref(), |v| v.id().last())
499            {
500                Ok(i) => self.top[i] = ArchiveOrGroup::Archive(id),
501                Err(i) => self.top.insert(i, ArchiveOrGroup::Archive(id)),
502            }
503            return;
504        };
505        let mut curr = &mut self.top;
506        let mut curr_name_len = 0;
507        let mut group = &id;
508        for step in steps {
509            if curr_name_len == 0 {
510                curr_name_len += step.len();
511            } else {
512                curr_name_len += step.len() + 1;
513            }
514            let curr_name = &id.as_ref()[..curr_name_len];
515            match curr.binary_search_by_key(&step, |v| v.id().last()) {
516                Ok(i) => {
517                    let ArchiveOrGroup::Group(g) = &mut curr[i]
518                    // TODO maybe reachable?
519                    else {
520                        unreachable!()
521                    };
522                    if let Archive::Local(a) = &archive {
523                        g.state.merge_all(a.file_state.read().state());
524                    }
525                    group = &g.id;
526                    curr = &mut g.children;
527                }
528                Err(i) => {
529                    let mut state = FileStates::default();
530                    if let Archive::Local(a) = &archive {
531                        state.merge_all(a.file_state.read().state());
532                    }
533                    let g = ArchiveGroup {
534                        // SAFETY: known to be valid
535                        id: unsafe { curr_name.parse().unwrap_unchecked() },
536                        children: Vec::new(),
537                        state,
538                    };
539                    curr.insert(i, ArchiveOrGroup::Group(g));
540                    let ArchiveOrGroup::Group(g) = &mut curr[i] else {
541                        unreachable!()
542                    };
543                    #[cfg(feature = "rdf")]
544                    {
545                        use ftml_uris::FtmlUri;
546                        let iri = (archive.uri().base.clone() & g.id.clone()).to_iri();
547                        if *group != id {
548                            let parent = (archive.uri().base.clone() & group.clone()).to_iri();
549                            triples.push(ulo::triple!(<(parent)> ulo:contains <(iri.clone())>));
550                        }
551                        triples.push(ulo::triple!(<(iri)>: ulo:library_group));
552                    }
553                    curr = &mut g.children;
554                }
555            }
556        }
557
558        #[cfg(feature = "rdf")]
559        {
560            use ftml_uris::FtmlUri;
561            let parent = (archive.uri().base.clone() & group.clone()).to_iri();
562            triples.push(ulo::triple!(<(parent)> ulo:contains <(archive.uri().to_iri())>));
563            let global = ulo::rdf_types::NamedNodeRef::new_unchecked("flams://archives");
564            triple_store.add_quads(triples.into_iter().map(|t| t.in_graph(global)));
565        }
566
567        match self
568            .archives
569            .binary_search_by_key(&&id, |a: &Archive| a.id())
570        {
571            Ok(i) => self.archives[i] = archive,
572            Err(i) => self.archives.insert(i, archive),
573        }
574        match curr.binary_search_by_key(&id.last(), |v| v.id().last()) {
575            Ok(i) => curr[i] = ArchiveOrGroup::Archive(id),
576            Err(i) => curr.insert(i, ArchiveOrGroup::Archive(id)),
577        }
578    }
579
580    fn load_index(&self, external_url: &str) -> (Vec<Institution>, Vec<ArchiveIndex>) {
581        let mut is = Vec::new();
582        let mut ai = Vec::new();
583        for a in &self.archives {
584            let Some(p) = crate::LocalArchive::manifest_of(a.path()) else {
585                continue;
586            };
587            let Some(p) = p.parent().map(|p| p.join("archive.json")) else {
588                continue;
589            };
590            let (isi, ars) = crate::manifest::read_archive_json(a.uri(), &p, external_url);
591            for i in isi {
592                if !is.contains(&i) {
593                    is.push(i);
594                }
595            }
596            for a in ars {
597                if !ai.contains(&a) {
598                    ai.push(a);
599                }
600            }
601        }
602        (is, ai)
603    }
604}
605
606impl ArchiveOrGroup {
607    #[inline]
608    #[must_use]
609    pub const fn id(&self) -> &ArchiveId {
610        match self {
611            Self::Archive(id) => id,
612            Self::Group(g) => &g.id,
613        }
614    }
615}
616
617impl<'a> TreeChild<'a> for &'a ArchiveOrGroup {
618    fn tree_children(self) -> impl Iterator<Item = Self> {
619        match self {
620            ArchiveOrGroup::Archive(_) => either::Either::Left(std::iter::empty()),
621            ArchiveOrGroup::Group(g) => either::Either::Right(g.children.iter()),
622        }
623    }
624}
625
626impl RefTree for ArchiveTree {
627    type Child<'a>
628        = &'a ArchiveOrGroup
629    where
630        Self: 'a;
631    #[inline]
632    fn tree_children(&self) -> impl Iterator<Item = Self::Child<'_>> {
633        self.top.iter()
634    }
635}
636impl RefTree for ArchiveGroup {
637    type Child<'a>
638        = &'a ArchiveOrGroup
639    where
640        Self: 'a;
641    #[inline]
642    fn tree_children(&self) -> impl Iterator<Item = Self::Child<'_>> {
643        self.children.iter()
644    }
645}