Skip to main content

flams_math_archives/triple_store/
mod.rs

1pub mod sparql;
2
3use ftml_uris::{ArchiveUri, DocumentUri, FtmlUri, Language, SymbolUri, UriPath, UriWithPath};
4use std::{path::Path, str::FromStr};
5use ulo::rdf_types::{GraphNameRef, Quad, Triple};
6
7use crate::{
8    Archive, LocallyBuilt, MathArchive,
9    utils::{AsyncEngine, path_ext::PathExt},
10};
11
/// An RDF quad store wrapping an [`oxigraph::store::Store`].
///
/// The constructors visible in this file (`new` and `Default::default`)
/// preload the ULO base ontology quads (`ulo::ulo::QUADS`) into the store.
pub struct RDFStore {
    /// The underlying oxigraph store all operations are delegated to.
    store: oxigraph::store::Store,
}
15
/// Errors produced when running a SPARQL query given as text:
/// either the text fails to parse, or evaluating the parsed query fails.
#[derive(thiserror::Error, Debug)]
pub enum SparqlError {
    /// The query string could not be parsed.
    #[error("sparql syntax error: {0}")]
    Syntax(#[from] sparql::SparqlSyntaxError),
    /// The parsed query failed during evaluation.
    #[error("sparql query error: {0}")]
    Query(#[from] sparql::QueryError),
}
23
24impl RDFStore {
25    #[cfg(feature = "rocksdb")]
26    #[must_use]
27    pub fn new(path: &Path) -> Self {
28        let _ = std::fs::remove_dir_all(path);
29        let store = oxigraph::store::Store::open(path).expect("failed to open rdf database");
30        let _ = store.clear();
31        let mut loader = store.bulk_loader();
32        loader
33            .load_quads(ulo::ulo::QUADS.iter().copied())
34            .expect("error loading ulo base ontology; this is a bug.");
35        loader
36            .commit()
37            .expect("error loading ulo base ontology; this is a bug.");
38        Self { store }
39    }
40
    /// Clears the underlying store.
    ///
    /// An error reported by the store is deliberately ignored.
    // NOTE(review): this presumably also removes the ULO base ontology quads
    // that the constructors preload — confirm whether callers rely on that.
    #[inline]
    pub fn clear(&self) {
        let _ = self.store.clear();
    }
45    #[inline]
46    #[must_use]
47    pub fn num_relations(&self) -> usize {
48        self.store.len().unwrap_or_default()
49    }
50    pub fn add_quads(&self, iter: impl Iterator<Item = Quad>) {
51        let mut loader = self.store.bulk_loader();
52        let _ = loader.load_quads(iter);
53        let _ = loader.commit();
54    }
55
56    pub fn add_graph<'a>(
57        &self,
58        graph: impl Into<GraphNameRef<'a>>,
59        triples: impl Iterator<Item = Triple>,
60    ) {
61        let graph = graph.into();
62        let _ = self.store.clear_graph(graph);
63        self.add_quads(triples.map(|t| Quad {
64            subject: t.subject,
65            predicate: t.predicate,
66            object: t.object,
67            graph_name: graph.into_owned(),
68        }));
69    }
70
71    #[must_use]
72    pub fn los<E: AsyncEngine>(&self, s: &SymbolUri, problems: bool) -> Option<sparql::LOIter<'_>> {
73        let q = sparql::lo_query(s, problems);
74        self.query::<E>(q).ok().and_then(|s| {
75            if let sparql::QueryResults::Solutions(s) = s.0 {
76                Some(sparql::LOIter { inner: s })
77            } else {
78                None
79            }
80        })
81    }
82
83    pub fn export(&self, iter: impl Iterator<Item = Triple>, p: &Path, uri: &DocumentUri) {
84        if let Ok(file) = std::fs::File::create(p) {
85            let writer = std::io::BufWriter::new(file);
86            let iri = uri.path_uri().to_iri();
87            let ns = iri.as_str();
88            // SAFETY: all prefixes are valid iris
89            let mut writer = unsafe {
90                oxigraph::io::RdfSerializer::from_format(oxigraph::io::RdfFormat::Turtle)
91                    .with_prefix("ns", ns)
92                    .unwrap_unchecked()
93                    .with_prefix("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns")
94                    .unwrap_unchecked()
95                    .with_prefix("ulo", "http://mathhub.info/ulo")
96                    .unwrap_unchecked()
97                    .with_prefix("dc", "http://purl.org/dc/terms")
98                    .unwrap_unchecked()
99                    .for_writer(writer)
100            };
101            for t in iter {
102                if let Err(e) = writer.serialize_triple(&t) {
103                    tracing::warn!("Error serializing triple: {e:?}");
104                }
105            }
106            let _ = writer.finish();
107        }
108    }
109
110    /// ### Errors
111    /// ### Panics
112    pub fn query_str<E: AsyncEngine>(
113        &self,
114        s: impl AsRef<str>,
115    ) -> Result<sparql::QueryResult<'_>, SparqlError> {
116        /*let mut query_str = String::from(
117            r"PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
118          PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
119          PREFIX dc: <http://purl.org/dc/terms#>
120          PREFIX ulo: <http://mathhub.info/ulo#>
121      ",
122        );
123        query_str.push_str(s.as_ref());*/
124        let query = spargebra::SparqlParser::new()
125            .with_prefix("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
126            .expect("bug")
127            .with_prefix("rdfs", "http://www.w3.org/2000/01/rdf-schema#")
128            .expect("bug")
129            .with_prefix("dc", "http://purl.org/dc/terms#")
130            .expect("bug")
131            .with_prefix("ulo", "http://mathhub.info/ulo#")
132            .expect("bug")
133            .parse_query(s.as_ref())?;
134        //let query: oxigraph::sparql::Query = query_str.as_str().try_into()?;
135        self.query::<E>(query).map_err(Into::into)
136    }
137
138    /// ### Errors
139    pub fn query<E: AsyncEngine>(
140        &self,
141        q: spargebra::Query,
142    ) -> Result<sparql::QueryResult<'_>, sparql::QueryError> {
143        //normalize(&mut q);
144
145        let token = oxigraph::sparql::CancellationToken::new();
146        let tk = token.clone();
147        E::exec_after(std::time::Duration::from_secs(5), move || tk.cancel());
148        /*
149        let token = oxigraph::sparql::CancellationToken::new();
150        let tk = token.clone();
151        std::thread::spawn(move || {
152            std::thread::sleep(std::time::Duration::from_secs(5));
153            tk.cancel();
154        });
155         */
156        let mut q = oxigraph::sparql::SparqlEvaluator::new()
157            .with_cancellation_token(token)
158            .for_query(q);
159        q.dataset_mut().set_default_graph_as_union();
160        Ok(q.on_store(&self.store).execute().map(sparql::QueryResult)?)
161    }
162
163    pub(crate) fn load(&self, path: &Path, graph: ulo::rdf_types::NamedNode) {
164        let Ok(file) = std::fs::File::open(path) else {
165            tracing::error!("Failed to open file {}", path.display());
166            return;
167        };
168        let _ = self.store.clear_graph(&graph);
169
170        let buf = std::io::BufReader::new(file);
171        let mut loader = self.store.bulk_loader();
172        let reader = oxigraph::io::RdfParser::from_format(oxigraph::io::RdfFormat::Turtle)
173            .with_default_graph(graph)
174            .for_reader(buf);
175        let _ = loader.load_quads(reader.filter_map(Result::ok));
176        //println!("commiting new triples");
177        let _ = loader.commit();
178    }
179
180    pub fn load_archives(&self, archives: &[Archive]) {
181        use rayon::prelude::*;
182        tracing::info_span!(target:"relational","loading relational","Loading relational for {} archives...",archives.len()).in_scope(move || {
183            let old = self.store.len().unwrap_or_default();
184            let all_files = archives
185                .par_iter()
186                .filter_map(|a| match a {
187                    Archive::Local(a) => Some(a),
188                    Archive::Ext(..) => None,
189                })
190                .filter_map(|a| {
191                    let out = a.out_dir();
192                    if out.exists() && out.is_dir() {
193                        Some(
194                            walkdir::WalkDir::new(out)
195                                .into_iter()
196                                .filter_map(Result::ok)
197                                .filter(|entry| entry.file_name() == "index.ttl")
198                                .filter_map(|e| {
199                                    let Some(graph) = Self::get_iri(a.uri(), out, &e) else {
200                                        println!("wut! {}",e.path().display());
201                                        return None
202                                    };
203                                    //let graph = Self::get_iri(a.uri(), out, &e)?;
204                                    Some((e.into_path(), graph))
205                                })
206                                .collect::<Vec<_>>(),
207                        )
208                        /*for e in walkdir::WalkDir::new(out)
209                            .into_iter()
210                            .filter_map(Result::ok)
211                            .filter(|entry| entry.file_name() == "index.ttl")
212                        {
213                            let Some(graph) = Self::get_iri(a.uri(), out, &e) else {
214                                return None;
215                            };
216                            Some((e.into_path(),graph))
217                            //self.load(e.path(), graph);
218                        }*/
219                    } else {
220                        None
221                    }
222                })
223                .collect_vec_list();
224            for (i,path_graph) in all_files.into_iter().flatten().enumerate() {//.flatten().flatten().enumerate() {
225                //tracing::info!("Loading {}",i+1);
226                let mut loader = self.store.bulk_loader();
227                for (path,graph) in path_graph {
228                    let Ok(file) = std::fs::File::open(&path) else {
229                        tracing::error!("Failed to open file {}", path.display());
230                        continue;
231                    };
232                    let buf = std::io::BufReader::new(file);
233                    let reader = oxigraph::io::RdfParser::from_format(oxigraph::io::RdfFormat::Turtle)
234                        .with_default_graph(graph)
235                        .for_reader(buf);
236                    let _ = loader.load_quads(reader.filter_map(Result::ok));
237                }
238                let _ = loader.commit();
239            }
240
241            tracing::info!(target:"relational","Loaded {} relations", self.store.len().unwrap_or_default() - old);
242        });
243    }
244
245    fn get_iri(
246        a: &ArchiveUri,
247        out: &Path,
248        e: &walkdir::DirEntry,
249    ) -> Option<ulo::rdf_types::NamedNode> {
250        let parent = e.path().parent()?;
251        let parentname = parent.file_name()?.to_str()?;
252        //let parentname = parentname.rsplit_once('.').map_or(parentname, |(s, _)| s);
253        let language = Language::from_rel_path(parentname);
254        let parentname = parentname
255            .strip_suffix(&format!(".{language}"))
256            .unwrap_or(parentname);
257        let grandparent = parent.parent()?;
258        let path: Option<UriPath> = if grandparent == out {
259            None
260        } else {
261            Some(grandparent.relative_to(&out)?.parse().ok()?)
262        };
263        //let path: UriPath = parent.parent()?.relative_to(&out)?.parse().ok()?;
264        let doc: DocumentUri = (a.clone() / path) & (parentname.parse().ok()?, language);
265        Some(doc.to_iri())
266    }
267}
268
269impl std::fmt::Debug for RDFStore {
270    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271        f.debug_struct("RDFStore").finish()
272    }
273}
274impl Default for RDFStore {
275    fn default() -> Self {
276        let store = oxigraph::store::Store::new().unwrap_or_else(|_| unreachable!());
277        let mut loader = store.bulk_loader();
278        loader
279            .load_quads(ulo::ulo::QUADS.iter().copied())
280            .expect("error loading ulo base ontology; this is a bug.");
281        loader
282            .commit()
283            .expect("error loading ulo base ontology; this is a bug.");
284        Self { store }
285    }
286}
287
/// Rewrites every IRI in `query` that starts with `https:` so that it starts
/// with `http:` instead, in place.
///
/// The walk covers the dataset clauses, the base IRI, the graph pattern
/// (including nested expressions and property paths) and, for `CONSTRUCT`
/// queries, the template triples.
// NOTE(review): there is no active call to this function in the visible part
// of this file — confirm whether it is still needed.
fn normalize(query: &mut spargebra::Query) {
    /// Replaces a leading `https:` of a named node with `http:`.
    fn norm(n: &mut ulo::rdf_types::NamedNode) {
        if n.as_str().starts_with("https:") {
            // `[6..]` skips the six bytes of "https:"; the result is not re-validated.
            *n = ulo::rdf_types::NamedNode::new_unchecked(format!("http:{}", &n.as_str()[6..]));
        }
    }
    /// Same as `norm`, for a bare IRI (used for the query's base IRI).
    fn norm_i(n: &mut oxiri::Iri<String>) {
        if n.starts_with("https:") {
            *n = oxiri::Iri::parse_unchecked(format!("http:{}", &n.as_str()[6..]));
        }
    }
    /// Normalizes the default and (if present) named graphs of a dataset clause.
    fn dset(d: &mut ::spargebra::algebra::QueryDataset) {
        for n in d.default.iter_mut() {
            norm(n);
        }
        if let Some(n) = &mut d.named {
            for n in n.iter_mut() {
                norm(n);
            }
        }
    }
    /// Normalizes a term pattern; only named nodes and nested triples carry IRIs
    /// that are rewritten here.
    fn termpat(p: &mut spargebra::term::TermPattern) {
        use spargebra::term::TermPattern as TP;
        match p {
            TP::BlankNode(_) | TP::Literal(_) | TP::Variable(_) => (),
            TP::NamedNode(n) => norm(n),
            TP::Triple(t) => trip(t),
        }
    }
    /// Normalizes a named-node pattern if it is a concrete named node.
    fn nnpat(p: &mut spargebra::term::NamedNodePattern) {
        if let spargebra::term::NamedNodePattern::NamedNode(n) = p {
            norm(n);
        }
    }
    /// Normalizes subject, object and predicate of a triple pattern.
    fn trip(p: &mut spargebra::term::TriplePattern) {
        termpat(&mut p.subject);
        termpat(&mut p.object);
        nnpat(&mut p.predicate);
    }
    /// Recursively normalizes all named nodes inside an expression.
    fn expr(e: &mut spargebra::algebra::Expression) {
        use spargebra::algebra::Expression as Exp;
        match e {
            Exp::NamedNode(n) => norm(n),
            // All binary operators: recurse into both operands.
            Exp::Or(a, b)
            | Exp::Add(a, b)
            | Exp::And(a, b)
            | Exp::Divide(a, b)
            | Exp::Equal(a, b)
            | Exp::SameTerm(a, b)
            | Exp::Greater(a, b)
            | Exp::GreaterOrEqual(a, b)
            | Exp::Less(a, b)
            | Exp::Subtract(a, b)
            | Exp::Multiply(a, b)
            | Exp::LessOrEqual(a, b) => {
                expr(a);
                expr(b);
            }
            Exp::In(a, v) => {
                expr(a);
                for e in v {
                    expr(e);
                }
            }
            Exp::UnaryPlus(e) | Exp::UnaryMinus(e) | Exp::Not(e) => expr(e),
            Exp::Exists(p) => pat(p),
            Exp::If(a, b, c) => {
                expr(a);
                expr(b);
                expr(c);
            }
            // The function identifier of a `FunctionCall` is left untouched;
            // only its arguments are visited.
            Exp::Coalesce(e) | Exp::FunctionCall(_, e) => {
                for e in e {
                    expr(e)
                }
            }
            Exp::Bound(_) | Exp::Literal(_) | Exp::Variable(_) => (),
        }
    }
    /// Recursively normalizes all named nodes inside a property path expression.
    fn ppexpr(e: &mut spargebra::algebra::PropertyPathExpression) {
        use spargebra::algebra::PropertyPathExpression as Exp;
        match e {
            Exp::NamedNode(n) => norm(n),
            Exp::Reverse(n) | Exp::ZeroOrMore(n) | Exp::OneOrMore(n) | Exp::ZeroOrOne(n) => {
                ppexpr(n)
            }
            Exp::Sequence(a, b) | Exp::Alternative(a, b) => {
                ppexpr(a);
                ppexpr(b);
            }
            Exp::NegatedPropertySet(v) => {
                for e in v {
                    norm(e);
                }
            }
        }
    }
    /// Recursively normalizes a graph pattern and everything nested in it.
    fn pat(p: &mut spargebra::algebra::GraphPattern) {
        use spargebra::algebra::GraphPattern as GP;
        match p {
            GP::Bgp { patterns } => {
                for p in patterns {
                    trip(p)
                }
            }
            GP::Distinct { inner } => pat(inner),
            GP::Extend {
                inner, expression, ..
            } => {
                pat(inner);
                expr(expression);
            }
            GP::Filter { expr: exp, inner } => {
                expr(exp);
                pat(inner);
            }
            GP::Graph { name, inner } | GP::Service { name, inner, .. } => {
                nnpat(name);
                pat(inner);
            }
            GP::Group {
                inner, aggregates, ..
            } => {
                pat(inner);
                // Only aggregates of the `FunctionCall` form are visited;
                // other aggregate variants are skipped.
                for (_, e) in aggregates {
                    if let spargebra::algebra::AggregateExpression::FunctionCall {
                        expr: e, ..
                    } = e
                    {
                        expr(e);
                    }
                }
            }
            GP::Join { left, right }
            | GP::Lateral { left, right }
            | GP::Minus { left, right }
            | GP::Union { left, right } => {
                pat(left);
                pat(right);
            }
            GP::LeftJoin {
                left,
                right,
                expression,
            } => {
                pat(left);
                pat(right);
                if let Some(e) = expression {
                    expr(e);
                }
            }
            GP::OrderBy { inner, expression } => {
                pat(inner);
                for e in expression {
                    match e {
                        spargebra::algebra::OrderExpression::Asc(e)
                        | spargebra::algebra::OrderExpression::Desc(e) => expr(e),
                    }
                }
            }
            GP::Path {
                subject,
                path,
                object,
            } => {
                termpat(subject);
                termpat(object);
                ppexpr(path);
            }
            GP::Project { inner, .. } | GP::Reduced { inner } | GP::Slice { inner, .. } => {
                pat(inner);
            }
            // NOTE(review): the contents of VALUES blocks are not visited, so any
            // IRIs inside them stay unchanged — confirm this is intended.
            GP::Values { .. } => (),
        }
    }
    use spargebra::Query as Q;
    match query {
        // ASK, DESCRIBE and SELECT share the same shape: dataset, base IRI, pattern.
        Q::Ask {
            dataset,
            pattern,
            base_iri,
        }
        | Q::Describe {
            dataset,
            pattern,
            base_iri,
        }
        | Q::Select {
            dataset,
            pattern,
            base_iri,
        } => {
            if let Some(d) = dataset {
                dset(d);
            }
            if let Some(iri) = base_iri {
                norm_i(iri);
            }
            pat(pattern);
        }
        // CONSTRUCT additionally carries template triples that are normalized.
        Q::Construct {
            template,
            dataset,
            pattern,
            base_iri,
        } => {
            if let Some(d) = dataset {
                dset(d);
            }
            if let Some(iri) = base_iri {
                norm_i(iri);
            }
            for t in template {
                trip(t);
            }
            pat(pattern);
        }
    }
}
506}