feat/orm
Laurin Weger 2 weeks ago
parent bce06e5a05
commit 576ba213ee
No known key found for this signature in database
GPG Key ID: 9B372BB0B792770F
  1. 112
      nextgraph/src/tests/orm.rs
  2. 3
      ng-verifier/src/orm/add_remove_triples.rs
  3. 365
      ng-verifier/src/orm/mod.rs
  4. 4
      ng-verifier/src/orm/types.rs
  5. 294
      ng-verifier/src/orm/utils.rs
  6. 13
      ng-verifier/src/orm/validation.rs

@ -9,13 +9,13 @@
use crate::local_broker::{doc_create, doc_sparql_construct, doc_sparql_update, orm_start};
use crate::tests::create_or_open_wallet::create_or_open_wallet;
use async_std::stream::{IntoStream, StreamExt};
use async_std::stream::StreamExt;
use ng_net::app_protocol::{AppResponse, AppResponseV0, NuriV0};
use ng_net::orm::{
BasicType, OrmSchemaDataType, OrmSchemaLiteralType, OrmSchemaPredicate, OrmSchemaShape,
OrmShapeType,
};
use ng_verifier::orm::shape_type_to_sparql;
use ng_verifier::orm::utils::shape_type_to_sparql;
use ng_repo::log_info;
use std::collections::HashMap;
@ -592,39 +592,6 @@ INSERT DATA {
// Define the ORM schema
let mut schema = HashMap::new();
schema.insert(
"http://example.org/TestObject||http://example.org/anotherObject".to_string(),
Arc::new(OrmSchemaShape {
iri: "http://example.org/TestObject||http://example.org/anotherObject".to_string(),
predicates: vec![
Arc::new(OrmSchemaPredicate {
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::string,
literals: None,
shape: None,
}],
iri: "http://example.org/prop1".to_string(),
readablePredicate: "prop1".to_string(),
maxCardinality: 1,
minCardinality: 1,
extra: None,
}),
Arc::new(OrmSchemaPredicate {
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::number,
literals: None,
shape: None,
}],
iri: "http://example.org/prop2".to_string(),
readablePredicate: "prop2".to_string(),
maxCardinality: 1,
minCardinality: 1,
extra: None,
}),
],
}),
);
schema.insert(
"http://example.org/TestObject".to_string(),
Arc::new(OrmSchemaShape {
@ -692,36 +659,6 @@ INSERT DATA {
minCardinality: 0,
extra: None,
}),
Arc::new(OrmSchemaPredicate {
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::shape,
literals: None,
shape: Some(
"http://example.org/TestObject||http://example.org/objectValue"
.to_string(),
),
}],
iri: "http://example.org/objectValue".to_string(),
readablePredicate: "objectValue".to_string(),
maxCardinality: 1,
minCardinality: 1,
extra: None,
}),
Arc::new(OrmSchemaPredicate {
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::shape,
literals: None,
shape: Some(
"http://example.org/TestObject||http://example.org/anotherObject"
.to_string(),
),
}],
iri: "http://example.org/anotherObject".to_string(),
readablePredicate: "anotherObject".to_string(),
maxCardinality: -1,
minCardinality: 0,
extra: None,
}),
Arc::new(OrmSchemaPredicate {
dataTypes: vec![
OrmSchemaDataType {
@ -760,51 +697,6 @@ INSERT DATA {
}),
);
schema.insert(
"http://example.org/TestObject||http://example.org/objectValue".to_string(),
Arc::new(OrmSchemaShape {
iri: "http://example.org/TestObject||http://example.org/objectValue".to_string(),
predicates: vec![
Arc::new(OrmSchemaPredicate {
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::string,
literals: None,
shape: None,
}],
iri: "http://example.org/nestedString".to_string(),
readablePredicate: "nestedString".to_string(),
maxCardinality: 1,
minCardinality: 1,
extra: None,
}),
Arc::new(OrmSchemaPredicate {
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::number,
literals: None,
shape: None,
}],
iri: "http://example.org/nestedNum".to_string(),
readablePredicate: "nestedNum".to_string(),
maxCardinality: 1,
minCardinality: 1,
extra: None,
}),
Arc::new(OrmSchemaPredicate {
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::number,
literals: None,
shape: None,
}],
iri: "http://example.org/nestedArray".to_string(),
readablePredicate: "nestedArray".to_string(),
maxCardinality: -1,
minCardinality: 0,
extra: None,
}),
],
}),
);
let shape_type = OrmShapeType {
schema,
shape: "http://example.org/TestObject".to_string(),

@ -78,6 +78,7 @@ pub fn add_remove_triples(
// Process added triples.
// For each triple, check if it matches the shape.
// In parallel, we record the values added and removed (tracked_changes)
log_debug!("Processing # triples: {}", triples_added.len());
for triple in triples_added {
let obj_term = oxrdf_term_to_orm_basic_type(&triple.object);
log_debug!("processing triple {triple}");
@ -175,10 +176,10 @@ pub fn add_remove_triples(
);
tracked_predicate.tracked_children.push(tracked_child_arc);
}
}
log_debug!("end of dealing with nesting");
}
}
}
// Process removed triples.
for triple in triples_removed {
let pred_iri = triple.predicate.as_str();

@ -9,11 +9,11 @@
pub mod add_remove_triples;
pub mod types;
pub mod utils;
pub mod validation;
use futures::channel::mpsc;
use ng_oxigraph::oxrdf::Subject;
use ng_repo::types::OverlayId;
use futures::channel::mpsc::UnboundedSender;
use std::collections::HashMap;
use std::collections::HashSet;
@ -22,26 +22,22 @@ use std::sync::RwLock;
use std::u64;
use futures::SinkExt;
use lazy_static::lazy_static;
pub use ng_net::orm::{OrmDiff, OrmShapeType};
use ng_net::utils::Receiver;
use ng_net::{app_protocol::*, orm::*};
use ng_oxigraph::oxigraph::sparql::{Query, QueryResults};
use ng_oxigraph::oxrdf::Triple;
use ng_repo::errors::NgError;
use ng_repo::errors::VerifierError;
use ng_repo::log::*;
use regex::Regex;
use serde_json::json;
use serde_json::Value;
use crate::orm::add_remove_triples::add_remove_triples;
use crate::orm::types::*;
use crate::orm::utils::*;
use crate::types::*;
use crate::verifier::*;
type ShapeIri = String;
type SubjectIri = String;
// Structure to store changes in. By shape iri > subject iri > OrmTrackedSubjectChange
// **NOTE**: In comparison to OrmSubscription.tracked_subjects, the outer hashmap's keys are shape IRIs.
// (shape IRI -> (subject IRI -> OrmTrackedSubjectChange))
@ -61,9 +57,8 @@ impl Verifier {
// );
//let base = NuriV0::repo_id(&repo.id);
log_debug!("querying construct\n{}\n\n", query);
let nuri_str = nuri.as_ref().map(|s| s.as_str());
log_debug!("querying construct\n{}\n{}\n\n", nuri_str.unwrap(), query);
let parsed =
Query::parse(&query, nuri_str).map_err(|e| NgError::OxiGraphError(e.to_string()))?;
@ -95,6 +90,7 @@ impl Verifier {
session_id: u64,
triples_added: &[Triple],
triples_removed: &[Triple],
data_already_fetched: bool,
) -> Result<OrmChanges, NgError> {
let mut orm_changes = HashMap::new();
@ -112,7 +108,6 @@ impl Verifier {
})
.collect();
log_debug!("process_changes_for_nuri_and_session {:?}", shapes);
for root_shape in shapes {
self.process_changes_for_shape_and_session(
nuri,
@ -121,6 +116,7 @@ impl Verifier {
triples_added,
triples_removed,
&mut orm_changes,
data_already_fetched,
)?;
}
@ -137,6 +133,7 @@ impl Verifier {
triples_added: &[Triple],
triples_removed: &[Triple],
orm_changes: &mut OrmChanges,
data_already_fetched: bool,
) -> Result<(), NgError> {
// First in, last out stack to keep track of objects to validate (nested objects first). Strings are object IRIs.
let mut shape_validation_queue: Vec<(Arc<OrmSchemaShape>, Vec<String>)> = vec![];
@ -147,6 +144,10 @@ impl Verifier {
// For a given shape, we evaluate every subject against that shape.
while let Some((shape, objects_to_validate)) = shape_validation_queue.pop() {
// Collect triples relevant for validation.
log_debug!(
"process_changes_for_shape_and_session triples_added: {:?}",
triples_added
);
let added_triples_by_subject =
group_by_subject_for_shape(&shape, triples_added, &objects_to_validate);
let removed_triples_by_subject =
@ -168,11 +169,8 @@ impl Verifier {
let mut nested_objects_to_eval: HashMap<ShapeIri, Vec<(SubjectIri, bool)>> =
HashMap::new();
log_debug!(
"processing_changes_for_shape_and_session for shape {:?}",
shape
);
// For each subject, add/remove triples and validate.
log_debug!("all_modified_subjects: {:?}", all_modified_subjects);
for subject_iri in all_modified_subjects {
let triples_added_for_subj = added_triples_by_subject
@ -240,27 +238,29 @@ impl Verifier {
}
}
// Now, we fetch all un-fetched subjects for re-evaluation.
// Now, we queue all non-evaluated objects
for (shape_iri, objects_to_eval) in &nested_objects_to_eval {
let objects_to_fetch = objects_to_eval
.iter()
.filter(|(_iri, needs_fetch)| *needs_fetch)
.map(|(s, _)| s.clone())
.collect();
let orm_subscription =
self.get_first_orm_subscription_for(nuri, Some(&shape.iri), Some(&session_id));
// Extract schema and shape Arc before mutable borrow
let schema = orm_subscription.shape_type.schema.clone();
let shape_arc = schema.get(shape_iri).unwrap().clone();
// Data might need to be fetched (if it has not been during initialization or nested shape fetch).
if !data_already_fetched {
let objects_to_fetch = objects_to_eval
.iter()
.filter(|(_iri, needs_fetch)| *needs_fetch)
.map(|(s, _)| s.clone())
.collect();
// Create sparql query
let shape_query =
shape_type_to_sparql(&schema, &shape_iri, Some(objects_to_fetch))?;
let new_triples =
self.query_sparql_construct(shape_query, Some(nuri_to_string(nuri)))?;
self.query_sparql_construct(shape_query, Some(nuri.repo()))?;
// Recursively process nested objects.
self.process_changes_for_shape_and_session(
nuri,
shape_arc.clone(),
@ -268,16 +268,22 @@ impl Verifier {
&new_triples,
&vec![],
orm_changes,
true,
)?;
}
let objects_not_to_fetch = objects_to_eval
// Add objects
let objects_not_to_fetch: Vec<String> = objects_to_eval
.iter()
.filter(|(_iri, needs_fetch)| !*needs_fetch)
.map(|(s, _)| s.clone())
.collect();
if objects_not_to_fetch.len() > 0 {
// Queue all objects that don't need fetching.
shape_validation_queue.push((shape_arc, objects_not_to_fetch));
}
}
}
Ok(())
}
@ -327,6 +333,7 @@ impl Verifier {
triples_removed: &[Triple],
nuri: &NuriV0,
only_for_session_id: Option<u64>,
data_already_fetched: bool,
) -> Result<OrmChanges, NgError> {
log_debug!("apply_triple_changes {:?}", only_for_session_id);
// If we have a specific session, handle only that subscription.
@ -336,6 +343,7 @@ impl Verifier {
session_id,
triples_added,
triples_removed,
data_already_fetched,
);
}
@ -356,6 +364,7 @@ impl Verifier {
session_id,
triples_added,
triples_removed,
data_already_fetched,
)?;
for (shape_iri, subj_map) in changes {
@ -498,14 +507,12 @@ impl Verifier {
session_id: u64,
shape_type: &OrmShapeType,
) -> Result<Value, NgError> {
log_debug!("create_orm_object_for_shape {:?}", shape_type);
// Query triples for this shape
let shape_query = shape_type_to_sparql(&shape_type.schema, &shape_type.shape, None)?;
let shape_triples = self.query_sparql_construct(shape_query, Some(nuri_to_string(nuri)))?;
log_debug!("query_sparql_construct done {:?}", shape_triples);
let shape_triples = self.query_sparql_construct(shape_query, Some(nuri.repo()))?;
let changes: OrmChanges =
self.apply_triple_changes(&shape_triples, &[], nuri, Some(session_id.clone()))?;
log_debug!("apply_triple_changes done {:?}", changes);
self.apply_triple_changes(&shape_triples, &[], nuri, Some(session_id.clone()), true)?;
let orm_subscription =
self.get_first_orm_subscription_for(nuri, Some(&shape_type.shape), Some(&session_id));
@ -519,10 +526,15 @@ impl Verifier {
let mut return_vals: Value = Value::Array(vec![]);
let return_val_vec = return_vals.as_array_mut().unwrap();
log_debug!(
"Tracked subjects:\n{:?}\n",
orm_subscription.tracked_subjects,
);
// For each valid change struct, we build an orm object.
// The way we get the changes from the tracked subjects is a bit hacky, sorry.
for (subject_iri, tracked_subjects_by_shape) in &orm_subscription.tracked_subjects {
if let Some(tracked_subject) = tracked_subjects_by_shape.get(&shape_type.shape) {
log_info!("changes for : {:?}\n{:?}", tracked_subject, changes);
if tracked_subject.read().unwrap().valid == OrmTrackedSubjectValidity::Valid {
if let Some(change) = changes
.get(&shape_type.shape)
@ -534,6 +546,7 @@ impl Verifier {
root_shape,
&orm_subscription.tracked_subjects,
);
log_debug!("Materialized change:\n{:?}\ninto:\n{:?}", change, new_val);
return_val_vec.push(new_val);
}
}
@ -556,21 +569,23 @@ impl Verifier {
pub(crate) async fn push_orm_response(
&mut self,
subscription: &Arc<OrmSubscription>,
nuri: &NuriV0,
session_id: u64,
sender: UnboundedSender<AppResponse>,
response: AppResponse,
) {
log_debug!(
"sending orm response for session {}:\n{:?}",
subscription.session_id,
session_id,
&response
);
if subscription.sender.is_closed() {
log_debug!("closed so removing session {}", subscription.session_id);
if sender.is_closed() {
log_debug!("closed so removing session {}", session_id);
self.orm_subscriptions.remove(&subscription.nuri);
self.orm_subscriptions.remove(&nuri);
} else {
subscription.sender.clone().send(response);
sender.clone().send(response);
}
}
@ -603,16 +618,22 @@ impl Verifier {
tracked_subjects: HashMap::new(),
nuri: nuri.clone(),
};
self.orm_subscriptions
.entry(nuri.clone())
.or_insert(vec![])
.push(orm_subscription);
let _orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type);
let _orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type)?;
log_debug!("create_orm_object_for_shape return {:?}", _orm_objects);
// TODO integrate response
//self.push_orm_response().await; (only for requester, not all sessions)
self.push_orm_response(
&nuri.clone(),
session_id,
tx.clone(),
AppResponse::V0(AppResponseV0::OrmInitial(_orm_objects)),
)
.await;
let close = Box::new(move || {
//log_debug!("CLOSE_CHANNEL of subscription for branch {}", branch_id);
@ -623,271 +644,3 @@ impl Verifier {
Ok((rx, close))
}
}
/// Heuristically decides whether `s` looks like an absolute IRI.
///
/// A string is considered an IRI when it starts with a scheme-like prefix:
/// one ASCII letter, followed by 1 to 12 characters taken from ASCII letters,
/// digits, `+`, `.` or `-`, immediately followed by a colon. The scheme part is
/// therefore 2 to 13 characters long and nothing after the colon is inspected.
///
/// This is only a heuristic: a plain string such as `note: hello` also passes.
fn is_iri(s: &str) -> bool {
    let bytes = s.as_bytes();

    // The scheme has to start with an ASCII letter.
    if !bytes.first().map_or(false, |b| b.is_ascii_alphabetic()) {
        return false;
    }

    // Length of the run of scheme characters after the first letter.
    // Looking at 13 bytes at most is enough to tell "too long" apart from "fits".
    let tail_len = bytes[1..]
        .iter()
        .take(13)
        .take_while(|b| b.is_ascii_alphanumeric() || matches!(**b, b'+' | b'.' | b'-'))
        .count();

    // The run must be 1..=12 long and be terminated by the colon.
    (1..=12).contains(&tail_len) && bytes.get(1 + tail_len) == Some(&b':')
}
/// Renders the constant literals allowed by a schema data type as SPARQL terms.
///
/// Returns an empty vector when `var.literals` is `None`. Otherwise every literal
/// is converted, in order:
/// - booleans become `true` / `false`,
/// - numbers use their `to_string()` representation,
/// - strings accepted by `is_iri` are wrapped in `<...>`; any other string is
///   escaped with `escape_literal` and wrapped in double quotes.
fn literal_to_sparql_str(var: OrmSchemaDataType) -> Vec<String> {
    var.literals
        .map(|literals| {
            literals
                .iter()
                .map(|literal| match literal {
                    BasicType::Bool(flag) => flag.to_string(),
                    BasicType::Num(number) => number.to_string(),
                    BasicType::Str(text) if is_iri(text) => format!("<{}>", text),
                    BasicType::Str(text) => format!("\"{}\"", escape_literal(text)),
                })
                .collect()
        })
        .unwrap_or_default()
}
/// Builds a SPARQL `CONSTRUCT` query selecting the triples described by `shape`.
///
/// Starting at the shape registered under `shape` in `schema`, every predicate of
/// the shape contributes a triple pattern to the `CONSTRUCT` template and a pattern
/// to the `WHERE` clause. Nested shapes are followed recursively, each one getting
/// a fresh object variable. Variables are named `v0`, `v1`, ...; the root subject
/// is always `v0`.
///
/// * `schema` - all known shapes, keyed by shape IRI.
/// * `shape` - IRI of the root shape.
/// * `filter_subjects` - when `Some`, the root subject is restricted to the given
///   IRIs with a `FILTER (... IN (...))`.
///
/// # Errors
/// Returns `VerifierError::InvalidOrmSchema` (converted into `NgError`) when the
/// root shape or a referenced nested shape is missing from `schema`, or when a
/// data type of kind `shape` carries no shape IRI.
pub fn shape_type_to_sparql(
    schema: &OrmSchema,
    shape: &ShapeIri,
    filter_subjects: Option<Vec<String>>,
) -> Result<String, NgError> {
    // Use a counter to generate unique variable names.
    let mut var_counter = 0;
    fn get_new_var_name(counter: &mut i32) -> String {
        let name = format!("v{}", counter);
        *counter += 1;
        name
    }

    // Collect all statements to be added to the construct and where bodies.
    let mut construct_statements = Vec::new();
    let mut where_statements = Vec::new();

    // Keep track of visited shapes while recursing to prevent infinite loops.
    let mut visited_shapes: HashSet<ShapeIri> = HashSet::new();

    // Recursive function to call for (nested) shapes.
    fn process_shape(
        schema: &OrmSchema,
        shape: &OrmSchemaShape,
        subject_var_name: &str,
        construct_statements: &mut Vec<String>,
        where_statements: &mut Vec<String>,
        var_counter: &mut i32,
        visited_shapes: &mut HashSet<String>,
    ) -> Result<(), NgError> {
        // Prevent infinite recursion on cyclic schemas.
        // TODO: We could handle this as IRI string reference.
        if !visited_shapes.insert(shape.iri.clone()) {
            return Ok(());
        }

        // Add statements for each predicate.
        for predicate in &shape.predicates {
            let mut union_branches = Vec::new();
            let mut allowed_literals = Vec::new();

            // Predicate constraints might have more than one acceptable data type. Traverse each.
            // It is assumed that constant literals, nested shapes and regular types are not mixed.
            for datatype in &predicate.dataTypes {
                if datatype.valType == OrmSchemaLiteralType::literal {
                    // Collect allowed literals as strings
                    // (already in SPARQL-format, e.g. `"a string"`, `<http:ex.co/>`, `true`, or `42`).
                    allowed_literals.extend(literal_to_sparql_str(datatype.clone()));
                } else if datatype.valType == OrmSchemaLiteralType::shape {
                    // A dangling shape reference is a schema error, not a reason to panic.
                    let shape_iri = datatype
                        .shape
                        .as_ref()
                        .ok_or(VerifierError::InvalidOrmSchema)?;
                    let nested_shape = schema
                        .get(shape_iri)
                        .ok_or(VerifierError::InvalidOrmSchema)?;

                    // For the current acceptable shape, add CONSTRUCT, WHERE, and recurse.
                    // Each shape option gets its own var.
                    let obj_var_name = get_new_var_name(var_counter);
                    let link_pattern = format!(
                        " ?{} <{}> ?{}",
                        subject_var_name, predicate.iri, obj_var_name
                    );
                    construct_statements.push(link_pattern.clone());
                    // Those are later added to a UNION, if there is more than one shape.
                    union_branches.push(link_pattern);

                    // Recurse to add statements for nested object.
                    process_shape(
                        schema,
                        nested_shape,
                        &obj_var_name,
                        construct_statements,
                        where_statements,
                        var_counter,
                        visited_shapes,
                    )?;
                }
            }

            // The where statement which might be wrapped in OPTIONAL.
            let where_body: String;
            if !allowed_literals.is_empty()
                && !predicate.extra.unwrap_or(false)
                && predicate.minCardinality > 0
            {
                // If we have literal requirements and they are not optional ("extra"),
                // add CONSTRUCT, WHERE, and FILTER.
                let pred_var_name = get_new_var_name(var_counter);
                construct_statements.push(format!(
                    " ?{} <{}> ?{}",
                    subject_var_name, predicate.iri, pred_var_name
                ));
                where_body = format!(
                    " ?{s} <{p}> ?{o} . \n FILTER (?{o} IN ({lits}))",
                    s = subject_var_name,
                    p = predicate.iri,
                    o = pred_var_name,
                    lits = allowed_literals.join(", ")
                );
            } else if !union_branches.is_empty() {
                // We have nested shape(s) which were already added to CONSTRUCT above.
                // Join them with UNION.
                where_body = union_branches
                    .into_iter()
                    .map(|b| format!("{{\n{}\n}}", b))
                    .collect::<Vec<_>>()
                    .join(" UNION ");
            } else {
                // Regular predicate data type. Just add basic CONSTRUCT and WHERE statements.
                let pred_var_name = get_new_var_name(var_counter);
                construct_statements.push(format!(
                    " ?{} <{}> ?{}",
                    subject_var_name, predicate.iri, pred_var_name
                ));
                where_body = format!(
                    " ?{} <{}> ?{}",
                    subject_var_name, predicate.iri, pred_var_name
                );
            }

            // Wrap in optional, if necessary.
            if predicate.minCardinality < 1 {
                where_statements.push(format!(" OPTIONAL {{\n{}\n }}", where_body));
            } else {
                where_statements.push(where_body);
            }
        }

        // Leaving this shape: it may legitimately appear again on another branch.
        visited_shapes.remove(&shape.iri);
        Ok(())
    }

    let root_shape = schema.get(shape).ok_or(VerifierError::InvalidOrmSchema)?;

    // Root subject variable name
    let root_var_name = get_new_var_name(&mut var_counter);

    process_shape(
        schema,
        root_shape,
        &root_var_name,
        &mut construct_statements,
        &mut where_statements,
        &mut var_counter,
        &mut visited_shapes,
    )?;

    // Filter subjects, if present.
    if let Some(subjects) = filter_subjects {
        let subjects_str = subjects
            .iter()
            .map(|s| format!("<{}>", s))
            .collect::<Vec<_>>()
            .join(", ");
        // The filter must reference the root variable actually used in the patterns
        // and close both the `IN (...)` list and the `FILTER (...)` itself.
        where_statements.push(format!(
            " FILTER (?{} IN ({}))",
            root_var_name, subjects_str
        ));
    }

    // Create query from statements.
    let construct_body = construct_statements.join(" .\n");
    let where_body = where_statements.join(" .\n");
    Ok(format!(
        "CONSTRUCT {{\n{}\n}}\nWHERE {{\n{}\n}}",
        construct_body, where_body
    ))
}
/// Escapes `lit` for use inside a double-quoted SPARQL string literal.
///
/// Backslash, double quote, line feed, carriage return and tab are replaced by
/// their backslash escape sequences; every other character is copied unchanged.
fn escape_literal(lit: &str) -> String {
    lit.chars()
        .fold(String::with_capacity(lit.len() + 4), |mut escaped, ch| {
            match ch {
                '\\' => escaped.push_str("\\\\"),
                '"' => escaped.push_str("\\\""),
                '\n' => escaped.push_str("\\n"),
                '\r' => escaped.push_str("\\r"),
                '\t' => escaped.push_str("\\t"),
                other => escaped.push(other),
            }
            escaped
        })
}
/// Groups `triples` by subject IRI, keeping only those relevant for `shape`.
///
/// A triple is kept when
/// - its predicate IRI is one of the predicates declared by `shape`,
/// - its subject is a named node, and
/// - `allowed_subjects` is empty or contains the subject IRI.
///
/// The returned map is keyed by the bare subject IRI (no surrounding `<` `>`),
/// i.e. the same form that is compared against `allowed_subjects`.
pub fn group_by_subject_for_shape<'a>(
    shape: &OrmSchemaShape,
    triples: &'a [Triple],
    allowed_subjects: &[String],
) -> HashMap<String, Vec<&'a Triple>> {
    let allowed_preds_set: HashSet<&str> =
        shape.predicates.iter().map(|p| p.iri.as_str()).collect();
    let allowed_subject_set: HashSet<&str> =
        allowed_subjects.iter().map(|s| s.as_str()).collect();

    let mut triples_by_subject: HashMap<String, Vec<&'a Triple>> = HashMap::new();
    for triple in triples {
        // The predicate must be one the shape knows about.
        if !allowed_preds_set.contains(triple.predicate.as_str()) {
            continue;
        }
        // Only named-node subjects can be tracked.
        let subj = match &triple.subject {
            Subject::NamedNode(n) => n.as_str(),
            _ => continue,
        };
        // Subject must be in allowed subjects (or allowed_subjects is empty).
        if !allowed_subject_set.is_empty() && !allowed_subject_set.contains(subj) {
            continue;
        }
        // Key by the raw IRI string. Formatting the node via `to_string()` would
        // use its `Display` form, which wraps the IRI in angle brackets.
        triples_by_subject
            .entry(subj.to_owned())
            .or_insert_with(Vec::new)
            .push(triple);
    }
    triples_by_subject
}
/// Returns the graph name for the repo and overlay addressed by `nuri`.
///
/// The overlay comes from `nuri.overlay` when it is set; otherwise the repo's
/// outer overlay (`OverlayId::outer`) is used. The result is whatever
/// `NuriV0::repo_graph_name` yields for that repo/overlay pair.
///
/// # Panics
/// Panics if `nuri.overlay` is set but its conversion into an overlay id fails.
fn nuri_to_string(nuri: &NuriV0) -> String {
    let repo_id = nuri.target.repo_id();
    let overlay_id = match &nuri.overlay {
        // An explicit overlay link was supplied with the NURI.
        Some(overlay_link) => overlay_link.clone().try_into().unwrap(),
        // Default overlay for the repo.
        None => OverlayId::outer(repo_id),
    };
    NuriV0::repo_graph_name(repo_id, &overlay_id)
}

@ -82,5 +82,5 @@ pub struct OrmSubscription {
pub sender: Sender<AppResponse>,
pub tracked_subjects: HashMap<SubjectIri, HashMap<ShapeIri, Arc<RwLock<OrmTrackedSubject>>>>,
}
type ShapeIri = String;
type SubjectIri = String;
pub type ShapeIri = String;
pub type SubjectIri = String;

@ -0,0 +1,294 @@
// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.
use ng_oxigraph::oxrdf::Subject;
use ng_repo::log::*;
use ng_repo::types::OverlayId;
use std::collections::HashMap;
use std::collections::HashSet;
use lazy_static::lazy_static;
pub use ng_net::orm::{OrmDiff, OrmShapeType};
use ng_net::{app_protocol::*, orm::*};
use ng_oxigraph::oxrdf::Triple;
use ng_repo::errors::NgError;
use ng_repo::errors::VerifierError;
use regex::Regex;
use crate::orm::types::*;
/// Heuristically decides whether `s` looks like an absolute IRI.
///
/// A string is considered an IRI when it starts with a scheme-like prefix:
/// one ASCII letter, followed by 1 to 12 characters taken from ASCII letters,
/// digits, `+`, `.` or `-`, immediately followed by a colon. The scheme part is
/// therefore 2 to 13 characters long and nothing after the colon is inspected.
///
/// This is only a heuristic: a plain string such as `note: hello` also passes.
pub fn is_iri(s: &str) -> bool {
    let bytes = s.as_bytes();

    // The scheme has to start with an ASCII letter.
    if !bytes.first().map_or(false, |b| b.is_ascii_alphabetic()) {
        return false;
    }

    // Length of the run of scheme characters after the first letter.
    // Looking at 13 bytes at most is enough to tell "too long" apart from "fits".
    let tail_len = bytes[1..]
        .iter()
        .take(13)
        .take_while(|b| b.is_ascii_alphanumeric() || matches!(**b, b'+' | b'.' | b'-'))
        .count();

    // The run must be 1..=12 long and be terminated by the colon.
    (1..=12).contains(&tail_len) && bytes.get(1 + tail_len) == Some(&b':')
}
/// Renders the constant literals allowed by a schema data type as SPARQL terms.
///
/// Returns an empty vector when `var.literals` is `None`. Otherwise every literal
/// is converted, in order:
/// - booleans become `true` / `false`,
/// - numbers use their `to_string()` representation,
/// - strings accepted by [`is_iri`] are wrapped in `<...>`; any other string is
///   escaped with `escape_literal` and wrapped in double quotes.
pub fn literal_to_sparql_str(var: OrmSchemaDataType) -> Vec<String> {
    var.literals
        .map(|literals| {
            literals
                .iter()
                .map(|literal| match literal {
                    BasicType::Bool(flag) => flag.to_string(),
                    BasicType::Num(number) => number.to_string(),
                    BasicType::Str(text) if is_iri(text) => format!("<{}>", text),
                    BasicType::Str(text) => format!("\"{}\"", escape_literal(text)),
                })
                .collect()
        })
        .unwrap_or_default()
}
/// Builds a SPARQL `CONSTRUCT` query selecting the triples described by `shape`.
///
/// Starting at the shape registered under `shape` in `schema`, every predicate of
/// the shape contributes a triple pattern to the `CONSTRUCT` template and a pattern
/// to the `WHERE` clause. Nested shapes are followed recursively, each one getting
/// a fresh object variable. Variables are named `v0`, `v1`, ...; the root subject
/// is always `v0`.
///
/// * `schema` - all known shapes, keyed by shape IRI.
/// * `shape` - IRI of the root shape.
/// * `filter_subjects` - when `Some`, the root subject is restricted to the given
///   IRIs with a `FILTER(... IN (...))`.
///
/// # Errors
/// Returns `VerifierError::InvalidOrmSchema` (converted into `NgError`) when the
/// root shape or a referenced nested shape is missing from `schema`, or when a
/// data type of kind `shape` carries no shape IRI.
pub fn shape_type_to_sparql(
    schema: &OrmSchema,
    shape: &ShapeIri,
    filter_subjects: Option<Vec<String>>,
) -> Result<String, NgError> {
    // Use a counter to generate unique variable names.
    let mut var_counter = 0;
    fn get_new_var_name(counter: &mut i32) -> String {
        let name = format!("v{}", counter);
        *counter += 1;
        name
    }

    // Collect all statements to be added to the construct and where bodies.
    let mut construct_statements = Vec::new();
    let mut where_statements = Vec::new();

    // Keep track of visited shapes while recursing to prevent infinite loops.
    let mut visited_shapes: HashSet<ShapeIri> = HashSet::new();

    // Recursive function to call for (nested) shapes.
    fn process_shape(
        schema: &OrmSchema,
        shape: &OrmSchemaShape,
        subject_var_name: &str,
        construct_statements: &mut Vec<String>,
        where_statements: &mut Vec<String>,
        var_counter: &mut i32,
        visited_shapes: &mut HashSet<String>,
    ) -> Result<(), NgError> {
        // Prevent infinite recursion on cyclic schemas.
        // TODO: We could handle this as IRI string reference.
        if !visited_shapes.insert(shape.iri.clone()) {
            return Ok(());
        }

        // Add statements for each predicate.
        for predicate in &shape.predicates {
            let mut union_branches = Vec::new();
            let mut allowed_literals = Vec::new();

            // Predicate constraints might have more than one acceptable data type. Traverse each.
            // It is assumed that constant literals, nested shapes and regular types are not mixed.
            for datatype in &predicate.dataTypes {
                if datatype.valType == OrmSchemaLiteralType::literal {
                    // Collect allowed literals as strings
                    // (already in SPARQL-format, e.g. `"a string"`, `<http:ex.co/>`, `true`, or `42`).
                    allowed_literals.extend(literal_to_sparql_str(datatype.clone()));
                } else if datatype.valType == OrmSchemaLiteralType::shape {
                    // A dangling shape reference is a schema error, not a reason to panic.
                    let shape_iri = datatype
                        .shape
                        .as_ref()
                        .ok_or(VerifierError::InvalidOrmSchema)?;
                    let nested_shape = schema
                        .get(shape_iri)
                        .ok_or(VerifierError::InvalidOrmSchema)?;

                    // For the current acceptable shape, add CONSTRUCT, WHERE, and recurse.
                    // Each shape option gets its own var.
                    let obj_var_name = get_new_var_name(var_counter);
                    let link_pattern = format!(
                        " ?{} <{}> ?{}",
                        subject_var_name, predicate.iri, obj_var_name
                    );
                    construct_statements.push(link_pattern.clone());
                    // Those are later added to a UNION, if there is more than one shape.
                    union_branches.push(link_pattern);

                    // Recurse to add statements for nested object.
                    process_shape(
                        schema,
                        nested_shape,
                        &obj_var_name,
                        construct_statements,
                        where_statements,
                        var_counter,
                        visited_shapes,
                    )?;
                }
            }

            // The where statement which might be wrapped in OPTIONAL.
            let where_body: String;
            if !allowed_literals.is_empty()
                && !predicate.extra.unwrap_or(false)
                && predicate.minCardinality > 0
            {
                // If we have literal requirements and they are not optional ("extra"),
                // add CONSTRUCT, WHERE, and FILTER.
                let pred_var_name = get_new_var_name(var_counter);
                construct_statements.push(format!(
                    " ?{} <{}> ?{}",
                    subject_var_name, predicate.iri, pred_var_name
                ));
                where_body = format!(
                    " ?{s} <{p}> ?{o} . \n FILTER(?{o} IN ({lits}))",
                    s = subject_var_name,
                    p = predicate.iri,
                    o = pred_var_name,
                    lits = allowed_literals.join(", ")
                );
            } else if !union_branches.is_empty() {
                // We have nested shape(s) which were already added to CONSTRUCT above.
                // Join them with UNION.
                where_body = union_branches
                    .into_iter()
                    .map(|b| format!("{{\n{}\n}}", b))
                    .collect::<Vec<_>>()
                    .join(" UNION ");
            } else {
                // Regular predicate data type. Just add basic CONSTRUCT and WHERE statements.
                let pred_var_name = get_new_var_name(var_counter);
                construct_statements.push(format!(
                    " ?{} <{}> ?{}",
                    subject_var_name, predicate.iri, pred_var_name
                ));
                where_body = format!(
                    " ?{} <{}> ?{}",
                    subject_var_name, predicate.iri, pred_var_name
                );
            }

            // Wrap in optional, if necessary.
            if predicate.minCardinality < 1 {
                where_statements.push(format!(" OPTIONAL {{\n{}\n }}", where_body));
            } else {
                where_statements.push(where_body);
            }
        }

        // Leaving this shape: it may legitimately appear again on another branch.
        visited_shapes.remove(&shape.iri);
        Ok(())
    }

    let root_shape = schema.get(shape).ok_or(VerifierError::InvalidOrmSchema)?;

    // Root subject variable name
    let root_var_name = get_new_var_name(&mut var_counter);

    process_shape(
        schema,
        root_shape,
        &root_var_name,
        &mut construct_statements,
        &mut where_statements,
        &mut var_counter,
        &mut visited_shapes,
    )?;

    // Filter subjects, if present.
    if let Some(subjects) = filter_subjects {
        log_debug!("filter_subjects: {:?}", subjects);
        let subjects_str = subjects
            .iter()
            .map(|s| format!("<{}>", s))
            .collect::<Vec<_>>()
            .join(", ");
        // Restrict the root variable (the first one generated, i.e. `v0`).
        where_statements.push(format!(
            " FILTER(?{} IN ({}))",
            root_var_name, subjects_str
        ));
    }

    // Create query from statements.
    let construct_body = construct_statements.join(" .\n");
    let where_body = where_statements.join(" .\n");
    Ok(format!(
        "CONSTRUCT {{\n{}\n}}\nWHERE {{\n{}\n}}",
        construct_body, where_body
    ))
}
/// Escapes `lit` for use inside a double-quoted SPARQL string literal.
///
/// Backslash, double quote, line feed, carriage return and tab are replaced by
/// their backslash escape sequences; every other character is copied unchanged.
fn escape_literal(lit: &str) -> String {
    let mut escaped = String::with_capacity(lit.len() + 4);
    for ch in lit.chars() {
        let replacement = match ch {
            '\\' => "\\\\",
            '"' => "\\\"",
            '\n' => "\\n",
            '\r' => "\\r",
            '\t' => "\\t",
            other => {
                // Nothing to escape: copy the character through.
                escaped.push(other);
                continue;
            }
        };
        escaped.push_str(replacement);
    }
    escaped
}
/// Groups `triples` by subject IRI, keeping only those relevant for `shape`.
///
/// A triple is kept when
/// - its predicate IRI is one of the predicates declared by `shape`,
/// - its subject is a named node, and
/// - `allowed_subjects` is empty or contains the subject IRI.
///
/// The returned map is keyed by the subject IRI string; each value holds
/// references to the matching triples in their original order.
pub fn group_by_subject_for_shape<'a>(
    shape: &OrmSchemaShape,
    triples: &'a [Triple],
    allowed_subjects: &[String],
) -> HashMap<String, Vec<&'a Triple>> {
    let shape_predicates: HashSet<&str> = shape
        .predicates
        .iter()
        .map(|pred| pred.iri.as_str())
        .collect();
    let subject_filter: HashSet<&str> = allowed_subjects.iter().map(String::as_str).collect();

    let mut grouped: HashMap<String, Vec<&'a Triple>> = HashMap::new();
    for triple in triples {
        // The predicate must be one the shape knows about.
        if !shape_predicates.contains(triple.predicate.as_str()) {
            continue;
        }
        // Only named-node subjects are grouped; anything else is skipped.
        let subject_iri = match &triple.subject {
            Subject::NamedNode(node) => node.as_str(),
            _ => continue,
        };
        // An empty filter means "all subjects are allowed".
        if !subject_filter.is_empty() && !subject_filter.contains(subject_iri) {
            continue;
        }
        grouped
            .entry(subject_iri.to_owned())
            .or_default()
            .push(triple);
    }
    grouped
}
/// Returns the graph name for the repo and overlay addressed by `nuri`.
///
/// The overlay comes from `nuri.overlay` when it is set; otherwise the repo's
/// outer overlay (`OverlayId::outer`) is used. The result is whatever
/// `NuriV0::repo_graph_name` yields for that repo/overlay pair.
///
/// # Panics
/// Panics if `nuri.overlay` is set but its conversion into an overlay id fails.
pub fn nuri_to_string(nuri: &NuriV0) -> String {
    let repo_id = nuri.target.repo_id();
    let overlay_id = match &nuri.overlay {
        // An explicit overlay link was supplied with the NURI.
        Some(overlay_link) => overlay_link.clone().try_into().unwrap(),
        // Default overlay for the repo.
        None => OverlayId::outer(repo_id),
    };
    NuriV0::repo_graph_name(repo_id, &overlay_id)
}

@ -262,11 +262,14 @@ impl Verifier {
};
}
tracked_subject.valid = new_validity.clone();
if new_validity == OrmTrackedSubjectValidity::Invalid {
// If we are invalid, we can discard new unknowns again - they won't be kept in memory.
// We need to remove ourself from child objects parents field and
// We need to remove ourself from child objects' parents field and
// remove them if no other is tracking.
// Child relationship cleanup disabled (nested tracking disabled in this refactor step)
// TODO: Child relationship cleanup disabled (nested tracking disabled in this refactor step)
// Remove tracked predicates and set untracked.
tracked_subject.tracked_predicates = HashMap::new();
@ -276,8 +279,8 @@ impl Verifier {
} else if new_validity == OrmTrackedSubjectValidity::Valid
&& previous_validity != OrmTrackedSubjectValidity::Valid
{
// If this subject became valid, we need to refetch this subject;
// We fetch
// If this subject became valid, we need to refetch this subject.
// If the data has already been fetched, the parent function will prevent the fetch.
need_evaluation.insert(0, (s_change.subject_iri.clone(), shape.iri.clone(), true));
}
@ -297,8 +300,6 @@ impl Verifier {
.collect();
}
tracked_subject.valid = new_validity;
return need_evaluation;
}
}

Loading…
Cancel
Save