flams_math_archives/triple_store/
mod.rs

1pub mod sparql;
2
3use ftml_uris::{ArchiveUri, DocumentUri, FtmlUri, Language, SymbolUri, UriPath, UriWithPath};
4use std::{path::Path, str::FromStr};
5use ulo::rdf_types::{Quad, Triple};
6
7use crate::{
8    Archive, LocallyBuilt, MathArchive,
9    utils::{AsyncEngine, path_ext::PathExt},
10};
11
/// Handle to the RDF quad store that holds the relational data.
///
/// Thin wrapper around an [`oxigraph::store::Store`]. Both ways of
/// constructing it in this file preload the store with the ULO base
/// ontology quads.
pub struct RDFStore {
    /// The underlying oxigraph store all operations are delegated to.
    store: oxigraph::store::Store,
}
15
/// Error returned when running a SPARQL query given as text.
///
/// Both variants are created through `From`, so `?` can be used on the
/// underlying error types directly.
#[derive(thiserror::Error, Debug)]
pub enum SparqlError {
    /// The query text could not be parsed.
    #[error("sparql syntax error: {0}")]
    Syntax(#[from] sparql::SparqlSyntaxError),
    /// The parsed query failed while being run.
    #[error("sparql query error: {0}")]
    Query(#[from] sparql::QueryError),
}
23
24impl RDFStore {
25    #[cfg(feature = "rocksdb")]
26    pub fn new(path: &Path) -> Self {
27        let _ = std::fs::remove_dir_all(path);
28        let store = oxigraph::store::Store::open(path).expect("failed to open rdf database");
29        store.clear();
30        let mut loader = store.bulk_loader();
31        loader
32            .load_quads(ulo::ulo::QUADS.iter().copied())
33            .expect("error loading ulo base ontology; this is a bug.");
34        loader
35            .commit()
36            .expect("error loading ulo base ontology; this is a bug.");
37        Self { store }
38    }
39
40    #[inline]
41    pub fn clear(&self) {
42        let _ = self.store.clear();
43    }
44    #[inline]
45    #[must_use]
46    pub fn num_relations(&self) -> usize {
47        self.store.len().unwrap_or_default()
48    }
49    pub fn add_quads(&self, iter: impl Iterator<Item = Quad>) {
50        let mut loader = self.store.bulk_loader();
51        let _ = loader.load_quads(iter);
52        let _ = loader.commit();
53    }
54
55    #[must_use]
56    pub fn los<E: AsyncEngine>(&self, s: &SymbolUri, problems: bool) -> Option<sparql::LOIter<'_>> {
57        let q = sparql::lo_query(s, problems);
58        self.query::<E>(q).ok().and_then(|s| {
59            if let sparql::QueryResults::Solutions(s) = s.0 {
60                Some(sparql::LOIter { inner: s })
61            } else {
62                None
63            }
64        })
65    }
66
67    pub fn export(&self, iter: impl Iterator<Item = Triple>, p: &Path, uri: &DocumentUri) {
68        if let Ok(file) = std::fs::File::create(p) {
69            let writer = std::io::BufWriter::new(file);
70            let iri = uri.path_uri().to_iri();
71            let ns = iri.as_str();
72            // SAFETY: all prefixes are valid iris
73            let mut writer = unsafe {
74                oxigraph::io::RdfSerializer::from_format(oxigraph::io::RdfFormat::Turtle)
75                    .with_prefix("ns", ns)
76                    .unwrap_unchecked()
77                    .with_prefix("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns")
78                    .unwrap_unchecked()
79                    .with_prefix("ulo", "http://mathhub.info/ulo")
80                    .unwrap_unchecked()
81                    .with_prefix("dc", "http://purl.org/dc/terms")
82                    .unwrap_unchecked()
83                    .for_writer(writer)
84            };
85            for t in iter {
86                if let Err(e) = writer.serialize_triple(&t) {
87                    tracing::warn!("Error serializing triple: {e:?}");
88                }
89            }
90            let _ = writer.finish();
91        }
92    }
93
94    /// ### Errors
95    pub fn query_str<E: AsyncEngine>(
96        &self,
97        s: impl AsRef<str>,
98    ) -> Result<sparql::QueryResult<'_>, SparqlError> {
99        /*let mut query_str = String::from(
100            r"PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
101          PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
102          PREFIX dc: <http://purl.org/dc/terms#>
103          PREFIX ulo: <http://mathhub.info/ulo#>
104      ",
105        );
106        query_str.push_str(s.as_ref());*/
107        let query = spargebra::SparqlParser::new()
108            .with_prefix("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
109            .expect("bug")
110            .with_prefix("rdfs", "http://www.w3.org/2000/01/rdf-schema#")
111            .expect("bug")
112            .with_prefix("dc", "http://purl.org/dc/terms#")
113            .expect("bug")
114            .with_prefix("ulo", "http://mathhub.info/ulo#")
115            .expect("bug")
116            .parse_query(s.as_ref())?;
117        //let query: oxigraph::sparql::Query = query_str.as_str().try_into()?;
118        self.query::<E>(query).map_err(Into::into)
119    }
120
121    /// ### Errors
122    pub fn query<E: AsyncEngine>(
123        &self,
124        mut q: spargebra::Query,
125    ) -> Result<sparql::QueryResult<'_>, sparql::QueryError> {
126        normalize(&mut q);
127
128        let token = oxigraph::sparql::CancellationToken::new();
129        let tk = token.clone();
130        E::exec_after(std::time::Duration::from_secs(5), move || tk.cancel());
131        /*
132        let token = oxigraph::sparql::CancellationToken::new();
133        let tk = token.clone();
134        std::thread::spawn(move || {
135            std::thread::sleep(std::time::Duration::from_secs(5));
136            tk.cancel();
137        });
138         */
139        let mut q = oxigraph::sparql::SparqlEvaluator::new()
140            .with_cancellation_token(token)
141            .for_query(q);
142        q.dataset_mut().set_default_graph_as_union();
143        Ok(q.on_store(&self.store).execute().map(sparql::QueryResult)?)
144    }
145
146    pub(crate) fn load(&self, path: &Path, graph: ulo::rdf_types::NamedNode) {
147        let Ok(file) = std::fs::File::open(path) else {
148            tracing::error!("Failed to open file {}", path.display());
149            return;
150        };
151        let buf = std::io::BufReader::new(file);
152        let mut loader = self.store.bulk_loader();
153        let reader = oxigraph::io::RdfParser::from_format(oxigraph::io::RdfFormat::Turtle)
154            .with_default_graph(graph)
155            .for_reader(buf);
156        let _ = loader.load_quads(reader.filter_map(Result::ok));
157        let _ = loader.commit();
158    }
159
160    pub fn load_archives(&self, archives: &[Archive]) {
161        use rayon::prelude::*;
162        tracing::info_span!(target:"relational","loading relational","Loading relational for {} archives...",archives.len()).in_scope(move || {
163            let old = self.store.len().unwrap_or_default();
164            let all_files = archives
165                .par_iter()
166                .filter_map(|a| match a {
167                    Archive::Local(a) => Some(a),
168                    Archive::Ext(..) => None,
169                })
170                .filter_map(|a| {
171                    let out = a.out_dir();
172                    if out.exists() && out.is_dir() {
173                        Some(
174                            walkdir::WalkDir::new(out)
175                                .into_iter()
176                                .filter_map(Result::ok)
177                                .filter(|entry| entry.file_name() == "index.ttl")
178                                .filter_map(|e| {
179                                    let graph = Self::get_iri(a.uri(), out, &e)?;
180                                    Some((e.into_path(), graph))
181                                })
182                                .collect::<Vec<_>>(),
183                        )
184                        /*for e in walkdir::WalkDir::new(out)
185                            .into_iter()
186                            .filter_map(Result::ok)
187                            .filter(|entry| entry.file_name() == "index.ttl")
188                        {
189                            let Some(graph) = Self::get_iri(a.uri(), out, &e) else {
190                                return None;
191                            };
192                            Some((e.into_path(),graph))
193                            //self.load(e.path(), graph);
194                        }*/
195                    } else {
196                        None
197                    }
198                })
199                .collect_vec_list();
200            for (i,path_graph) in all_files.into_iter().flatten().enumerate() {//.flatten().flatten().enumerate() {
201                tracing::info!("Loading {}",i+1);
202                let mut loader = self.store.bulk_loader();
203                for (path,graph) in path_graph {
204                    let Ok(file) = std::fs::File::open(&path) else {
205                        tracing::error!("Failed to open file {}", path.display());
206                        continue;
207                    };
208                    let buf = std::io::BufReader::new(file);
209                    let reader = oxigraph::io::RdfParser::from_format(oxigraph::io::RdfFormat::Turtle)
210                        .with_default_graph(graph)
211                        .for_reader(buf);
212                    let _ = loader.load_quads(reader.filter_map(Result::ok));
213                }
214                let _ = loader.commit();
215            }
216
217            tracing::info!(target:"relational","Loaded {} relations", self.store.len().unwrap_or_default() - old);
218        });
219    }
220
221    fn get_iri(
222        a: &ArchiveUri,
223        out: &Path,
224        e: &walkdir::DirEntry,
225    ) -> Option<ulo::rdf_types::NamedNode> {
226        let parent = e.path().parent()?;
227        let parentname = parent.file_name()?.to_str()?;
228        let parentname = parentname.rsplit_once('.').map_or(parentname, |(s, _)| s);
229        let language = Language::from_rel_path(parentname);
230        let parentname = parentname
231            .strip_suffix(&format!(".{language}"))
232            .unwrap_or(parentname);
233        let path: UriPath = parent.parent()?.relative_to(&out)?.parse().ok()?;
234        let doc: DocumentUri = (a.clone() / path) & (parentname.parse().ok()?, language);
235        Some(doc.to_iri())
236    }
237}
238
239impl std::fmt::Debug for RDFStore {
240    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241        f.debug_struct("RDFStore").finish()
242    }
243}
244impl Default for RDFStore {
245    fn default() -> Self {
246        let store = oxigraph::store::Store::new().unwrap_or_else(|_| unreachable!());
247        let mut loader = store.bulk_loader();
248        loader
249            .load_quads(ulo::ulo::QUADS.iter().copied())
250            .expect("error loading ulo base ontology; this is a bug.");
251        loader
252            .commit()
253            .expect("error loading ulo base ontology; this is a bug.");
254        Self { store }
255    }
256}
257
258fn normalize(query: &mut spargebra::Query) {
259    fn norm(n: &mut ulo::rdf_types::NamedNode) {
260        if n.as_str().starts_with("https:") {
261            *n = ulo::rdf_types::NamedNode::new_unchecked(format!("http:{}", &n.as_str()[6..]));
262        }
263    }
264    fn norm_i(n: &mut oxiri::Iri<String>) {
265        if n.starts_with("https:") {
266            *n = oxiri::Iri::parse_unchecked(format!("http:{}", &n.as_str()[6..]));
267        }
268    }
269    fn dset(d: &mut ::spargebra::algebra::QueryDataset) {
270        for n in d.default.iter_mut() {
271            norm(n);
272        }
273        if let Some(n) = &mut d.named {
274            for n in n.iter_mut() {
275                norm(n);
276            }
277        }
278    }
279    fn termpat(p: &mut spargebra::term::TermPattern) {
280        use spargebra::term::TermPattern as TP;
281        match p {
282            TP::BlankNode(_) | TP::Literal(_) | TP::Variable(_) => (),
283            TP::NamedNode(n) => norm(n),
284            TP::Triple(t) => trip(t),
285        }
286    }
287    fn nnpat(p: &mut spargebra::term::NamedNodePattern) {
288        if let spargebra::term::NamedNodePattern::NamedNode(n) = p {
289            norm(n);
290        }
291    }
292    fn trip(p: &mut spargebra::term::TriplePattern) {
293        termpat(&mut p.subject);
294        termpat(&mut p.object);
295        nnpat(&mut p.predicate);
296    }
297    fn expr(e: &mut spargebra::algebra::Expression) {
298        use spargebra::algebra::Expression as Exp;
299        match e {
300            Exp::NamedNode(n) => norm(n),
301            Exp::Or(a, b)
302            | Exp::Add(a, b)
303            | Exp::And(a, b)
304            | Exp::Divide(a, b)
305            | Exp::Equal(a, b)
306            | Exp::SameTerm(a, b)
307            | Exp::Greater(a, b)
308            | Exp::GreaterOrEqual(a, b)
309            | Exp::Less(a, b)
310            | Exp::Subtract(a, b)
311            | Exp::Multiply(a, b)
312            | Exp::LessOrEqual(a, b) => {
313                expr(a);
314                expr(b);
315            }
316            Exp::In(a, v) => {
317                expr(a);
318                for e in v {
319                    expr(e);
320                }
321            }
322            Exp::UnaryPlus(e) | Exp::UnaryMinus(e) | Exp::Not(e) => expr(e),
323            Exp::Exists(p) => pat(p),
324            Exp::If(a, b, c) => {
325                expr(a);
326                expr(b);
327                expr(c);
328            }
329            Exp::Coalesce(e) | Exp::FunctionCall(_, e) => {
330                for e in e {
331                    expr(e)
332                }
333            }
334            Exp::Bound(_) | Exp::Literal(_) | Exp::Variable(_) => (),
335        }
336    }
337    fn ppexpr(e: &mut spargebra::algebra::PropertyPathExpression) {
338        use spargebra::algebra::PropertyPathExpression as Exp;
339        match e {
340            Exp::NamedNode(n) => norm(n),
341            Exp::Reverse(n) | Exp::ZeroOrMore(n) | Exp::OneOrMore(n) | Exp::ZeroOrOne(n) => {
342                ppexpr(n)
343            }
344            Exp::Sequence(a, b) | Exp::Alternative(a, b) => {
345                ppexpr(a);
346                ppexpr(b);
347            }
348            Exp::NegatedPropertySet(v) => {
349                for e in v {
350                    norm(e);
351                }
352            }
353        }
354    }
355    fn pat(p: &mut spargebra::algebra::GraphPattern) {
356        use spargebra::algebra::GraphPattern as GP;
357        match p {
358            GP::Bgp { patterns } => {
359                for p in patterns {
360                    trip(p)
361                }
362            }
363            GP::Distinct { inner } => pat(inner),
364            GP::Extend {
365                inner, expression, ..
366            } => {
367                pat(inner);
368                expr(expression);
369            }
370            GP::Filter { expr: exp, inner } => {
371                expr(exp);
372                pat(inner);
373            }
374            GP::Graph { name, inner } | GP::Service { name, inner, .. } => {
375                nnpat(name);
376                pat(inner);
377            }
378            GP::Group {
379                inner, aggregates, ..
380            } => {
381                pat(inner);
382                for (_, e) in aggregates {
383                    if let spargebra::algebra::AggregateExpression::FunctionCall {
384                        expr: e, ..
385                    } = e
386                    {
387                        expr(e);
388                    }
389                }
390            }
391            GP::Join { left, right }
392            | GP::Lateral { left, right }
393            | GP::Minus { left, right }
394            | GP::Union { left, right } => {
395                pat(left);
396                pat(right);
397            }
398            GP::LeftJoin {
399                left,
400                right,
401                expression,
402            } => {
403                pat(left);
404                pat(right);
405                if let Some(e) = expression {
406                    expr(e);
407                }
408            }
409            GP::OrderBy { inner, expression } => {
410                pat(inner);
411                for e in expression {
412                    match e {
413                        spargebra::algebra::OrderExpression::Asc(e)
414                        | spargebra::algebra::OrderExpression::Desc(e) => expr(e),
415                    }
416                }
417            }
418            GP::Path {
419                subject,
420                path,
421                object,
422            } => {
423                termpat(subject);
424                termpat(object);
425                ppexpr(path);
426            }
427            GP::Project { inner, .. } | GP::Reduced { inner } | GP::Slice { inner, .. } => {
428                pat(inner);
429            }
430            GP::Values { .. } => (),
431        }
432    }
433    use spargebra::Query as Q;
434    match query {
435        Q::Ask {
436            dataset,
437            pattern,
438            base_iri,
439        }
440        | Q::Describe {
441            dataset,
442            pattern,
443            base_iri,
444        }
445        | Q::Select {
446            dataset,
447            pattern,
448            base_iri,
449        } => {
450            if let Some(d) = dataset {
451                dset(d);
452            }
453            if let Some(iri) = base_iri {
454                norm_i(iri);
455            }
456            pat(pattern);
457        }
458        Q::Construct {
459            template,
460            dataset,
461            pattern,
462            base_iri,
463        } => {
464            if let Some(d) = dataset {
465                dset(d);
466            }
467            if let Some(iri) = base_iri {
468                norm_i(iri);
469            }
470            for t in template {
471                trip(t);
472            }
473            pat(pattern);
474        }
475    }
476}