flams_math_archives/triple_store/
mod.rs1pub mod sparql;
2
3use ftml_uris::{ArchiveUri, DocumentUri, FtmlUri, Language, SymbolUri, UriPath, UriWithPath};
4use std::{path::Path, str::FromStr};
5use ulo::rdf_types::{Quad, Triple};
6
7use crate::{
8 Archive, LocallyBuilt, MathArchive,
9 utils::{AsyncEngine, path_ext::PathExt},
10};
11
/// An RDF triple store holding the ULO relational data for loaded math
/// archives, backed by an [`oxigraph::store::Store`].
pub struct RDFStore {
    // Underlying oxigraph store: in-memory via `Default`, or RocksDB-backed
    // via `new` when the `rocksdb` feature is enabled.
    store: oxigraph::store::Store,
}
15
/// Errors produced while parsing or evaluating a SPARQL query string.
#[derive(thiserror::Error, Debug)]
pub enum SparqlError {
    /// The query text failed to parse as SPARQL.
    #[error("sparql syntax error: {0}")]
    Syntax(#[from] sparql::SparqlSyntaxError),
    /// The parsed query failed during evaluation (including cancellation).
    #[error("sparql query error: {0}")]
    Query(#[from] sparql::QueryError),
}
23
24impl RDFStore {
25 #[cfg(feature = "rocksdb")]
26 pub fn new(path: &Path) -> Self {
27 let _ = std::fs::remove_dir_all(path);
28 let store = oxigraph::store::Store::open(path).expect("failed to open rdf database");
29 store.clear();
30 let mut loader = store.bulk_loader();
31 loader
32 .load_quads(ulo::ulo::QUADS.iter().copied())
33 .expect("error loading ulo base ontology; this is a bug.");
34 loader
35 .commit()
36 .expect("error loading ulo base ontology; this is a bug.");
37 Self { store }
38 }
39
40 #[inline]
41 pub fn clear(&self) {
42 let _ = self.store.clear();
43 }
44 #[inline]
45 #[must_use]
46 pub fn num_relations(&self) -> usize {
47 self.store.len().unwrap_or_default()
48 }
49 pub fn add_quads(&self, iter: impl Iterator<Item = Quad>) {
50 let mut loader = self.store.bulk_loader();
51 let _ = loader.load_quads(iter);
52 let _ = loader.commit();
53 }
54
55 #[must_use]
56 pub fn los<E: AsyncEngine>(&self, s: &SymbolUri, problems: bool) -> Option<sparql::LOIter<'_>> {
57 let q = sparql::lo_query(s, problems);
58 self.query::<E>(q).ok().and_then(|s| {
59 if let sparql::QueryResults::Solutions(s) = s.0 {
60 Some(sparql::LOIter { inner: s })
61 } else {
62 None
63 }
64 })
65 }
66
67 pub fn export(&self, iter: impl Iterator<Item = Triple>, p: &Path, uri: &DocumentUri) {
68 if let Ok(file) = std::fs::File::create(p) {
69 let writer = std::io::BufWriter::new(file);
70 let iri = uri.path_uri().to_iri();
71 let ns = iri.as_str();
72 let mut writer = unsafe {
74 oxigraph::io::RdfSerializer::from_format(oxigraph::io::RdfFormat::Turtle)
75 .with_prefix("ns", ns)
76 .unwrap_unchecked()
77 .with_prefix("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns")
78 .unwrap_unchecked()
79 .with_prefix("ulo", "http://mathhub.info/ulo")
80 .unwrap_unchecked()
81 .with_prefix("dc", "http://purl.org/dc/terms")
82 .unwrap_unchecked()
83 .for_writer(writer)
84 };
85 for t in iter {
86 if let Err(e) = writer.serialize_triple(&t) {
87 tracing::warn!("Error serializing triple: {e:?}");
88 }
89 }
90 let _ = writer.finish();
91 }
92 }
93
94 pub fn query_str<E: AsyncEngine>(
96 &self,
97 s: impl AsRef<str>,
98 ) -> Result<sparql::QueryResult<'_>, SparqlError> {
99 let query = spargebra::SparqlParser::new()
108 .with_prefix("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
109 .expect("bug")
110 .with_prefix("rdfs", "http://www.w3.org/2000/01/rdf-schema#")
111 .expect("bug")
112 .with_prefix("dc", "http://purl.org/dc/terms#")
113 .expect("bug")
114 .with_prefix("ulo", "http://mathhub.info/ulo#")
115 .expect("bug")
116 .parse_query(s.as_ref())?;
117 self.query::<E>(query).map_err(Into::into)
119 }
120
121 pub fn query<E: AsyncEngine>(
123 &self,
124 mut q: spargebra::Query,
125 ) -> Result<sparql::QueryResult<'_>, sparql::QueryError> {
126 normalize(&mut q);
127
128 let token = oxigraph::sparql::CancellationToken::new();
129 let tk = token.clone();
130 E::exec_after(std::time::Duration::from_secs(5), move || tk.cancel());
131 let mut q = oxigraph::sparql::SparqlEvaluator::new()
140 .with_cancellation_token(token)
141 .for_query(q);
142 q.dataset_mut().set_default_graph_as_union();
143 Ok(q.on_store(&self.store).execute().map(sparql::QueryResult)?)
144 }
145
146 pub(crate) fn load(&self, path: &Path, graph: ulo::rdf_types::NamedNode) {
147 let Ok(file) = std::fs::File::open(path) else {
148 tracing::error!("Failed to open file {}", path.display());
149 return;
150 };
151 let buf = std::io::BufReader::new(file);
152 let mut loader = self.store.bulk_loader();
153 let reader = oxigraph::io::RdfParser::from_format(oxigraph::io::RdfFormat::Turtle)
154 .with_default_graph(graph)
155 .for_reader(buf);
156 let _ = loader.load_quads(reader.filter_map(Result::ok));
157 let _ = loader.commit();
158 }
159
160 pub fn load_archives(&self, archives: &[Archive]) {
161 use rayon::prelude::*;
162 tracing::info_span!(target:"relational","loading relational","Loading relational for {} archives...",archives.len()).in_scope(move || {
163 let old = self.store.len().unwrap_or_default();
164 let all_files = archives
165 .par_iter()
166 .filter_map(|a| match a {
167 Archive::Local(a) => Some(a),
168 Archive::Ext(..) => None,
169 })
170 .filter_map(|a| {
171 let out = a.out_dir();
172 if out.exists() && out.is_dir() {
173 Some(
174 walkdir::WalkDir::new(out)
175 .into_iter()
176 .filter_map(Result::ok)
177 .filter(|entry| entry.file_name() == "index.ttl")
178 .filter_map(|e| {
179 let graph = Self::get_iri(a.uri(), out, &e)?;
180 Some((e.into_path(), graph))
181 })
182 .collect::<Vec<_>>(),
183 )
184 } else {
196 None
197 }
198 })
199 .collect_vec_list();
200 for (i,path_graph) in all_files.into_iter().flatten().enumerate() {tracing::info!("Loading {}",i+1);
202 let mut loader = self.store.bulk_loader();
203 for (path,graph) in path_graph {
204 let Ok(file) = std::fs::File::open(&path) else {
205 tracing::error!("Failed to open file {}", path.display());
206 continue;
207 };
208 let buf = std::io::BufReader::new(file);
209 let reader = oxigraph::io::RdfParser::from_format(oxigraph::io::RdfFormat::Turtle)
210 .with_default_graph(graph)
211 .for_reader(buf);
212 let _ = loader.load_quads(reader.filter_map(Result::ok));
213 }
214 let _ = loader.commit();
215 }
216
217 tracing::info!(target:"relational","Loaded {} relations", self.store.len().unwrap_or_default() - old);
218 });
219 }
220
221 fn get_iri(
222 a: &ArchiveUri,
223 out: &Path,
224 e: &walkdir::DirEntry,
225 ) -> Option<ulo::rdf_types::NamedNode> {
226 let parent = e.path().parent()?;
227 let parentname = parent.file_name()?.to_str()?;
228 let parentname = parentname.rsplit_once('.').map_or(parentname, |(s, _)| s);
229 let language = Language::from_rel_path(parentname);
230 let parentname = parentname
231 .strip_suffix(&format!(".{language}"))
232 .unwrap_or(parentname);
233 let path: UriPath = parent.parent()?.relative_to(&out)?.parse().ok()?;
234 let doc: DocumentUri = (a.clone() / path) & (parentname.parse().ok()?, language);
235 Some(doc.to_iri())
236 }
237}
238
239impl std::fmt::Debug for RDFStore {
240 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241 f.debug_struct("RDFStore").finish()
242 }
243}
244impl Default for RDFStore {
245 fn default() -> Self {
246 let store = oxigraph::store::Store::new().unwrap_or_else(|_| unreachable!());
247 let mut loader = store.bulk_loader();
248 loader
249 .load_quads(ulo::ulo::QUADS.iter().copied())
250 .expect("error loading ulo base ontology; this is a bug.");
251 loader
252 .commit()
253 .expect("error loading ulo base ontology; this is a bug.");
254 Self { store }
255 }
256}
257
258fn normalize(query: &mut spargebra::Query) {
259 fn norm(n: &mut ulo::rdf_types::NamedNode) {
260 if n.as_str().starts_with("https:") {
261 *n = ulo::rdf_types::NamedNode::new_unchecked(format!("http:{}", &n.as_str()[6..]));
262 }
263 }
264 fn norm_i(n: &mut oxiri::Iri<String>) {
265 if n.starts_with("https:") {
266 *n = oxiri::Iri::parse_unchecked(format!("http:{}", &n.as_str()[6..]));
267 }
268 }
269 fn dset(d: &mut ::spargebra::algebra::QueryDataset) {
270 for n in d.default.iter_mut() {
271 norm(n);
272 }
273 if let Some(n) = &mut d.named {
274 for n in n.iter_mut() {
275 norm(n);
276 }
277 }
278 }
279 fn termpat(p: &mut spargebra::term::TermPattern) {
280 use spargebra::term::TermPattern as TP;
281 match p {
282 TP::BlankNode(_) | TP::Literal(_) | TP::Variable(_) => (),
283 TP::NamedNode(n) => norm(n),
284 TP::Triple(t) => trip(t),
285 }
286 }
287 fn nnpat(p: &mut spargebra::term::NamedNodePattern) {
288 if let spargebra::term::NamedNodePattern::NamedNode(n) = p {
289 norm(n);
290 }
291 }
292 fn trip(p: &mut spargebra::term::TriplePattern) {
293 termpat(&mut p.subject);
294 termpat(&mut p.object);
295 nnpat(&mut p.predicate);
296 }
297 fn expr(e: &mut spargebra::algebra::Expression) {
298 use spargebra::algebra::Expression as Exp;
299 match e {
300 Exp::NamedNode(n) => norm(n),
301 Exp::Or(a, b)
302 | Exp::Add(a, b)
303 | Exp::And(a, b)
304 | Exp::Divide(a, b)
305 | Exp::Equal(a, b)
306 | Exp::SameTerm(a, b)
307 | Exp::Greater(a, b)
308 | Exp::GreaterOrEqual(a, b)
309 | Exp::Less(a, b)
310 | Exp::Subtract(a, b)
311 | Exp::Multiply(a, b)
312 | Exp::LessOrEqual(a, b) => {
313 expr(a);
314 expr(b);
315 }
316 Exp::In(a, v) => {
317 expr(a);
318 for e in v {
319 expr(e);
320 }
321 }
322 Exp::UnaryPlus(e) | Exp::UnaryMinus(e) | Exp::Not(e) => expr(e),
323 Exp::Exists(p) => pat(p),
324 Exp::If(a, b, c) => {
325 expr(a);
326 expr(b);
327 expr(c);
328 }
329 Exp::Coalesce(e) | Exp::FunctionCall(_, e) => {
330 for e in e {
331 expr(e)
332 }
333 }
334 Exp::Bound(_) | Exp::Literal(_) | Exp::Variable(_) => (),
335 }
336 }
337 fn ppexpr(e: &mut spargebra::algebra::PropertyPathExpression) {
338 use spargebra::algebra::PropertyPathExpression as Exp;
339 match e {
340 Exp::NamedNode(n) => norm(n),
341 Exp::Reverse(n) | Exp::ZeroOrMore(n) | Exp::OneOrMore(n) | Exp::ZeroOrOne(n) => {
342 ppexpr(n)
343 }
344 Exp::Sequence(a, b) | Exp::Alternative(a, b) => {
345 ppexpr(a);
346 ppexpr(b);
347 }
348 Exp::NegatedPropertySet(v) => {
349 for e in v {
350 norm(e);
351 }
352 }
353 }
354 }
355 fn pat(p: &mut spargebra::algebra::GraphPattern) {
356 use spargebra::algebra::GraphPattern as GP;
357 match p {
358 GP::Bgp { patterns } => {
359 for p in patterns {
360 trip(p)
361 }
362 }
363 GP::Distinct { inner } => pat(inner),
364 GP::Extend {
365 inner, expression, ..
366 } => {
367 pat(inner);
368 expr(expression);
369 }
370 GP::Filter { expr: exp, inner } => {
371 expr(exp);
372 pat(inner);
373 }
374 GP::Graph { name, inner } | GP::Service { name, inner, .. } => {
375 nnpat(name);
376 pat(inner);
377 }
378 GP::Group {
379 inner, aggregates, ..
380 } => {
381 pat(inner);
382 for (_, e) in aggregates {
383 if let spargebra::algebra::AggregateExpression::FunctionCall {
384 expr: e, ..
385 } = e
386 {
387 expr(e);
388 }
389 }
390 }
391 GP::Join { left, right }
392 | GP::Lateral { left, right }
393 | GP::Minus { left, right }
394 | GP::Union { left, right } => {
395 pat(left);
396 pat(right);
397 }
398 GP::LeftJoin {
399 left,
400 right,
401 expression,
402 } => {
403 pat(left);
404 pat(right);
405 if let Some(e) = expression {
406 expr(e);
407 }
408 }
409 GP::OrderBy { inner, expression } => {
410 pat(inner);
411 for e in expression {
412 match e {
413 spargebra::algebra::OrderExpression::Asc(e)
414 | spargebra::algebra::OrderExpression::Desc(e) => expr(e),
415 }
416 }
417 }
418 GP::Path {
419 subject,
420 path,
421 object,
422 } => {
423 termpat(subject);
424 termpat(object);
425 ppexpr(path);
426 }
427 GP::Project { inner, .. } | GP::Reduced { inner } | GP::Slice { inner, .. } => {
428 pat(inner);
429 }
430 GP::Values { .. } => (),
431 }
432 }
433 use spargebra::Query as Q;
434 match query {
435 Q::Ask {
436 dataset,
437 pattern,
438 base_iri,
439 }
440 | Q::Describe {
441 dataset,
442 pattern,
443 base_iri,
444 }
445 | Q::Select {
446 dataset,
447 pattern,
448 base_iri,
449 } => {
450 if let Some(d) = dataset {
451 dset(d);
452 }
453 if let Some(iri) = base_iri {
454 norm_i(iri);
455 }
456 pat(pattern);
457 }
458 Q::Construct {
459 template,
460 dataset,
461 pattern,
462 base_iri,
463 } => {
464 if let Some(d) = dataset {
465 dset(d);
466 }
467 if let Some(iri) = base_iri {
468 norm_i(iri);
469 }
470 for t in template {
471 trip(t);
472 }
473 pat(pattern);
474 }
475 }
476}