flams_math_archives/triple_store/
mod.rs1pub mod sparql;
2
3use ftml_uris::{ArchiveUri, DocumentUri, FtmlUri, Language, SymbolUri, UriPath, UriWithPath};
4use std::{path::Path, str::FromStr};
5use ulo::rdf_types::{GraphNameRef, Quad, Triple};
6
7use crate::{
8 Archive, LocallyBuilt, MathArchive,
9 utils::{AsyncEngine, path_ext::PathExt},
10};
11
/// Thin wrapper around an [`oxigraph::store::Store`]; all methods of this type
/// read from or write to that single store.
pub struct RDFStore {
    /// The underlying oxigraph quad store.
    store: oxigraph::store::Store,
}
15
/// Error returned by [`RDFStore::query_str`]: either the query text failed to
/// parse, or the parsed query failed when it was run.
#[derive(thiserror::Error, Debug)]
pub enum SparqlError {
    /// The query string could not be parsed.
    #[error("sparql syntax error: {0}")]
    Syntax(#[from] sparql::SparqlSyntaxError),
    /// The parsed query was rejected or failed during execution.
    #[error("sparql query error: {0}")]
    Query(#[from] sparql::QueryError),
}
23
24impl RDFStore {
25 #[cfg(feature = "rocksdb")]
26 #[must_use]
27 pub fn new(path: &Path) -> Self {
28 let _ = std::fs::remove_dir_all(path);
29 let store = oxigraph::store::Store::open(path).expect("failed to open rdf database");
30 let _ = store.clear();
31 let mut loader = store.bulk_loader();
32 loader
33 .load_quads(ulo::ulo::QUADS.iter().copied())
34 .expect("error loading ulo base ontology; this is a bug.");
35 loader
36 .commit()
37 .expect("error loading ulo base ontology; this is a bug.");
38 Self { store }
39 }
40
41 #[inline]
42 pub fn clear(&self) {
43 let _ = self.store.clear();
44 }
45 #[inline]
46 #[must_use]
47 pub fn num_relations(&self) -> usize {
48 self.store.len().unwrap_or_default()
49 }
50 pub fn add_quads(&self, iter: impl Iterator<Item = Quad>) {
51 let mut loader = self.store.bulk_loader();
52 let _ = loader.load_quads(iter);
53 let _ = loader.commit();
54 }
55
56 pub fn add_graph<'a>(
57 &self,
58 graph: impl Into<GraphNameRef<'a>>,
59 triples: impl Iterator<Item = Triple>,
60 ) {
61 let graph = graph.into();
62 let _ = self.store.clear_graph(graph);
63 self.add_quads(triples.map(|t| Quad {
64 subject: t.subject,
65 predicate: t.predicate,
66 object: t.object,
67 graph_name: graph.into_owned(),
68 }));
69 }
70
71 #[must_use]
72 pub fn los<E: AsyncEngine>(&self, s: &SymbolUri, problems: bool) -> Option<sparql::LOIter<'_>> {
73 let q = sparql::lo_query(s, problems);
74 self.query::<E>(q).ok().and_then(|s| {
75 if let sparql::QueryResults::Solutions(s) = s.0 {
76 Some(sparql::LOIter { inner: s })
77 } else {
78 None
79 }
80 })
81 }
82
83 pub fn export(&self, iter: impl Iterator<Item = Triple>, p: &Path, uri: &DocumentUri) {
84 if let Ok(file) = std::fs::File::create(p) {
85 let writer = std::io::BufWriter::new(file);
86 let iri = uri.path_uri().to_iri();
87 let ns = iri.as_str();
88 let mut writer = unsafe {
90 oxigraph::io::RdfSerializer::from_format(oxigraph::io::RdfFormat::Turtle)
91 .with_prefix("ns", ns)
92 .unwrap_unchecked()
93 .with_prefix("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns")
94 .unwrap_unchecked()
95 .with_prefix("ulo", "http://mathhub.info/ulo")
96 .unwrap_unchecked()
97 .with_prefix("dc", "http://purl.org/dc/terms")
98 .unwrap_unchecked()
99 .for_writer(writer)
100 };
101 for t in iter {
102 if let Err(e) = writer.serialize_triple(&t) {
103 tracing::warn!("Error serializing triple: {e:?}");
104 }
105 }
106 let _ = writer.finish();
107 }
108 }
109
110 pub fn query_str<E: AsyncEngine>(
113 &self,
114 s: impl AsRef<str>,
115 ) -> Result<sparql::QueryResult<'_>, SparqlError> {
116 let query = spargebra::SparqlParser::new()
125 .with_prefix("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
126 .expect("bug")
127 .with_prefix("rdfs", "http://www.w3.org/2000/01/rdf-schema#")
128 .expect("bug")
129 .with_prefix("dc", "http://purl.org/dc/terms#")
130 .expect("bug")
131 .with_prefix("ulo", "http://mathhub.info/ulo#")
132 .expect("bug")
133 .parse_query(s.as_ref())?;
134 self.query::<E>(query).map_err(Into::into)
136 }
137
138 pub fn query<E: AsyncEngine>(
140 &self,
141 q: spargebra::Query,
142 ) -> Result<sparql::QueryResult<'_>, sparql::QueryError> {
143 let token = oxigraph::sparql::CancellationToken::new();
146 let tk = token.clone();
147 E::exec_after(std::time::Duration::from_secs(5), move || tk.cancel());
148 let mut q = oxigraph::sparql::SparqlEvaluator::new()
157 .with_cancellation_token(token)
158 .for_query(q);
159 q.dataset_mut().set_default_graph_as_union();
160 Ok(q.on_store(&self.store).execute().map(sparql::QueryResult)?)
161 }
162
163 pub(crate) fn load(&self, path: &Path, graph: ulo::rdf_types::NamedNode) {
164 let Ok(file) = std::fs::File::open(path) else {
165 tracing::error!("Failed to open file {}", path.display());
166 return;
167 };
168 let _ = self.store.clear_graph(&graph);
169
170 let buf = std::io::BufReader::new(file);
171 let mut loader = self.store.bulk_loader();
172 let reader = oxigraph::io::RdfParser::from_format(oxigraph::io::RdfFormat::Turtle)
173 .with_default_graph(graph)
174 .for_reader(buf);
175 let _ = loader.load_quads(reader.filter_map(Result::ok));
176 let _ = loader.commit();
178 }
179
180 pub fn load_archives(&self, archives: &[Archive]) {
181 use rayon::prelude::*;
182 tracing::info_span!(target:"relational","loading relational","Loading relational for {} archives...",archives.len()).in_scope(move || {
183 let old = self.store.len().unwrap_or_default();
184 let all_files = archives
185 .par_iter()
186 .filter_map(|a| match a {
187 Archive::Local(a) => Some(a),
188 Archive::Ext(..) => None,
189 })
190 .filter_map(|a| {
191 let out = a.out_dir();
192 if out.exists() && out.is_dir() {
193 Some(
194 walkdir::WalkDir::new(out)
195 .into_iter()
196 .filter_map(Result::ok)
197 .filter(|entry| entry.file_name() == "index.ttl")
198 .filter_map(|e| {
199 let Some(graph) = Self::get_iri(a.uri(), out, &e) else {
200 println!("wut! {}",e.path().display());
201 return None
202 };
203 Some((e.into_path(), graph))
205 })
206 .collect::<Vec<_>>(),
207 )
208 } else {
220 None
221 }
222 })
223 .collect_vec_list();
224 for (i,path_graph) in all_files.into_iter().flatten().enumerate() {let mut loader = self.store.bulk_loader();
227 for (path,graph) in path_graph {
228 let Ok(file) = std::fs::File::open(&path) else {
229 tracing::error!("Failed to open file {}", path.display());
230 continue;
231 };
232 let buf = std::io::BufReader::new(file);
233 let reader = oxigraph::io::RdfParser::from_format(oxigraph::io::RdfFormat::Turtle)
234 .with_default_graph(graph)
235 .for_reader(buf);
236 let _ = loader.load_quads(reader.filter_map(Result::ok));
237 }
238 let _ = loader.commit();
239 }
240
241 tracing::info!(target:"relational","Loaded {} relations", self.store.len().unwrap_or_default() - old);
242 });
243 }
244
245 fn get_iri(
246 a: &ArchiveUri,
247 out: &Path,
248 e: &walkdir::DirEntry,
249 ) -> Option<ulo::rdf_types::NamedNode> {
250 let parent = e.path().parent()?;
251 let parentname = parent.file_name()?.to_str()?;
252 let language = Language::from_rel_path(parentname);
254 let parentname = parentname
255 .strip_suffix(&format!(".{language}"))
256 .unwrap_or(parentname);
257 let grandparent = parent.parent()?;
258 let path: Option<UriPath> = if grandparent == out {
259 None
260 } else {
261 Some(grandparent.relative_to(&out)?.parse().ok()?)
262 };
263 let doc: DocumentUri = (a.clone() / path) & (parentname.parse().ok()?, language);
265 Some(doc.to_iri())
266 }
267}
268
269impl std::fmt::Debug for RDFStore {
270 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271 f.debug_struct("RDFStore").finish()
272 }
273}
274impl Default for RDFStore {
275 fn default() -> Self {
276 let store = oxigraph::store::Store::new().unwrap_or_else(|_| unreachable!());
277 let mut loader = store.bulk_loader();
278 loader
279 .load_quads(ulo::ulo::QUADS.iter().copied())
280 .expect("error loading ulo base ontology; this is a bug.");
281 loader
282 .commit()
283 .expect("error loading ulo base ontology; this is a bug.");
284 Self { store }
285 }
286}
287
288fn normalize(query: &mut spargebra::Query) {
289 fn norm(n: &mut ulo::rdf_types::NamedNode) {
290 if n.as_str().starts_with("https:") {
291 *n = ulo::rdf_types::NamedNode::new_unchecked(format!("http:{}", &n.as_str()[6..]));
292 }
293 }
294 fn norm_i(n: &mut oxiri::Iri<String>) {
295 if n.starts_with("https:") {
296 *n = oxiri::Iri::parse_unchecked(format!("http:{}", &n.as_str()[6..]));
297 }
298 }
299 fn dset(d: &mut ::spargebra::algebra::QueryDataset) {
300 for n in d.default.iter_mut() {
301 norm(n);
302 }
303 if let Some(n) = &mut d.named {
304 for n in n.iter_mut() {
305 norm(n);
306 }
307 }
308 }
309 fn termpat(p: &mut spargebra::term::TermPattern) {
310 use spargebra::term::TermPattern as TP;
311 match p {
312 TP::BlankNode(_) | TP::Literal(_) | TP::Variable(_) => (),
313 TP::NamedNode(n) => norm(n),
314 TP::Triple(t) => trip(t),
315 }
316 }
317 fn nnpat(p: &mut spargebra::term::NamedNodePattern) {
318 if let spargebra::term::NamedNodePattern::NamedNode(n) = p {
319 norm(n);
320 }
321 }
322 fn trip(p: &mut spargebra::term::TriplePattern) {
323 termpat(&mut p.subject);
324 termpat(&mut p.object);
325 nnpat(&mut p.predicate);
326 }
327 fn expr(e: &mut spargebra::algebra::Expression) {
328 use spargebra::algebra::Expression as Exp;
329 match e {
330 Exp::NamedNode(n) => norm(n),
331 Exp::Or(a, b)
332 | Exp::Add(a, b)
333 | Exp::And(a, b)
334 | Exp::Divide(a, b)
335 | Exp::Equal(a, b)
336 | Exp::SameTerm(a, b)
337 | Exp::Greater(a, b)
338 | Exp::GreaterOrEqual(a, b)
339 | Exp::Less(a, b)
340 | Exp::Subtract(a, b)
341 | Exp::Multiply(a, b)
342 | Exp::LessOrEqual(a, b) => {
343 expr(a);
344 expr(b);
345 }
346 Exp::In(a, v) => {
347 expr(a);
348 for e in v {
349 expr(e);
350 }
351 }
352 Exp::UnaryPlus(e) | Exp::UnaryMinus(e) | Exp::Not(e) => expr(e),
353 Exp::Exists(p) => pat(p),
354 Exp::If(a, b, c) => {
355 expr(a);
356 expr(b);
357 expr(c);
358 }
359 Exp::Coalesce(e) | Exp::FunctionCall(_, e) => {
360 for e in e {
361 expr(e)
362 }
363 }
364 Exp::Bound(_) | Exp::Literal(_) | Exp::Variable(_) => (),
365 }
366 }
367 fn ppexpr(e: &mut spargebra::algebra::PropertyPathExpression) {
368 use spargebra::algebra::PropertyPathExpression as Exp;
369 match e {
370 Exp::NamedNode(n) => norm(n),
371 Exp::Reverse(n) | Exp::ZeroOrMore(n) | Exp::OneOrMore(n) | Exp::ZeroOrOne(n) => {
372 ppexpr(n)
373 }
374 Exp::Sequence(a, b) | Exp::Alternative(a, b) => {
375 ppexpr(a);
376 ppexpr(b);
377 }
378 Exp::NegatedPropertySet(v) => {
379 for e in v {
380 norm(e);
381 }
382 }
383 }
384 }
385 fn pat(p: &mut spargebra::algebra::GraphPattern) {
386 use spargebra::algebra::GraphPattern as GP;
387 match p {
388 GP::Bgp { patterns } => {
389 for p in patterns {
390 trip(p)
391 }
392 }
393 GP::Distinct { inner } => pat(inner),
394 GP::Extend {
395 inner, expression, ..
396 } => {
397 pat(inner);
398 expr(expression);
399 }
400 GP::Filter { expr: exp, inner } => {
401 expr(exp);
402 pat(inner);
403 }
404 GP::Graph { name, inner } | GP::Service { name, inner, .. } => {
405 nnpat(name);
406 pat(inner);
407 }
408 GP::Group {
409 inner, aggregates, ..
410 } => {
411 pat(inner);
412 for (_, e) in aggregates {
413 if let spargebra::algebra::AggregateExpression::FunctionCall {
414 expr: e, ..
415 } = e
416 {
417 expr(e);
418 }
419 }
420 }
421 GP::Join { left, right }
422 | GP::Lateral { left, right }
423 | GP::Minus { left, right }
424 | GP::Union { left, right } => {
425 pat(left);
426 pat(right);
427 }
428 GP::LeftJoin {
429 left,
430 right,
431 expression,
432 } => {
433 pat(left);
434 pat(right);
435 if let Some(e) = expression {
436 expr(e);
437 }
438 }
439 GP::OrderBy { inner, expression } => {
440 pat(inner);
441 for e in expression {
442 match e {
443 spargebra::algebra::OrderExpression::Asc(e)
444 | spargebra::algebra::OrderExpression::Desc(e) => expr(e),
445 }
446 }
447 }
448 GP::Path {
449 subject,
450 path,
451 object,
452 } => {
453 termpat(subject);
454 termpat(object);
455 ppexpr(path);
456 }
457 GP::Project { inner, .. } | GP::Reduced { inner } | GP::Slice { inner, .. } => {
458 pat(inner);
459 }
460 GP::Values { .. } => (),
461 }
462 }
463 use spargebra::Query as Q;
464 match query {
465 Q::Ask {
466 dataset,
467 pattern,
468 base_iri,
469 }
470 | Q::Describe {
471 dataset,
472 pattern,
473 base_iri,
474 }
475 | Q::Select {
476 dataset,
477 pattern,
478 base_iri,
479 } => {
480 if let Some(d) = dataset {
481 dset(d);
482 }
483 if let Some(iri) = base_iri {
484 norm_i(iri);
485 }
486 pat(pattern);
487 }
488 Q::Construct {
489 template,
490 dataset,
491 pattern,
492 base_iri,
493 } => {
494 if let Some(d) = dataset {
495 dset(d);
496 }
497 if let Some(iri) = base_iri {
498 norm_i(iri);
499 }
500 for t in template {
501 trip(t);
502 }
503 pat(pattern);
504 }
505 }
506}