Merge remote-tracking branch 'origin/feat/orm-diffs' into refactor

feat/orm-diffs
Niko PLP 2 days ago
commit 1753b6d5fb
  1. 528
      engine/verifier/src/orm/handle_backend_update.rs
  2. 115
      engine/verifier/src/orm/handle_frontend_update.rs
  3. 269
      engine/verifier/src/orm/materialize.rs
  4. 1182
      engine/verifier/src/orm/mod.rs
  5. 387
      engine/verifier/src/orm/process_changes.rs
  6. 322
      engine/verifier/src/orm/query.rs
  7. 7
      engine/verifier/src/orm/shape_validation.rs
  8. 5
      engine/verifier/src/orm/types.rs
  9. 262
      engine/verifier/src/orm/utils.rs
  10. 1
      sdk/js/signals/src/connector/applyDiff.ts
  11. 99
      sdk/rust/src/tests/mod.rs
  12. 92
      sdk/rust/src/tests/orm_creation.rs
  13. 1059
      sdk/rust/src/tests/orm_patches.rs

@ -0,0 +1,528 @@
// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.
use std::collections::HashMap;
use std::u64;
use futures::SinkExt;
pub use ng_net::orm::{OrmDiff, OrmShapeType};
use ng_net::{app_protocol::*, orm::*};
use ng_repo::log::*;
use crate::orm::types::*;
use crate::orm::utils::*;
use crate::orm::OrmChanges;
use crate::types::*;
use crate::verifier::*;
use ng_net::types::OverlayLink;
use ng_oxigraph::oxrdf::Triple;
use ng_repo::types::OverlayId;
use ng_repo::types::RepoId;
use serde_json::json;
use serde_json::Value;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::RwLock;
impl Verifier {
/// Generate and send JSON patches from GraphQuadsPatch (quad inserts and removes) to JS-land.
pub(crate) async fn orm_backend_update(
&mut self,
session_id: u64,
repo_id: RepoId,
overlay_id: OverlayId,
patch: GraphQuadsPatch,
) {
let overlaylink: OverlayLink = overlay_id.into();
// We need to apply the patches to all subscriptions we have. We can use process_changes_for_*
// That updates the tracked subjects, validates them, and returns a set of changes structured
// by the respective schema.
let triple_inserts: Vec<Triple> = patch
.inserts
.iter()
.map(|quad| {
Triple::new(
quad.subject.clone(),
quad.predicate.clone(),
quad.object.clone(),
)
})
.collect();
let triple_removes: Vec<Triple> = patch
.removes
.iter()
.map(|quad| {
Triple::new(
quad.subject.clone(),
quad.predicate.clone(),
quad.object.clone(),
)
})
.collect();
// let mut updates = Vec::new();
let mut scopes = vec![];
for (scope, subs) in self.orm_subscriptions.iter_mut() {
// Remove old subscriptions
subs.retain(|sub| !sub.sender.is_closed());
if !(scope.target == NuriTargetV0::UserSite
|| scope
.overlay
.as_ref()
.map_or(false, |ol| overlaylink == *ol)
|| scope.target == NuriTargetV0::Repo(repo_id))
{
continue;
}
// prepare to apply updates to tracked subjects and record the changes.
let root_shapes = subs
.iter()
.map(|sub| {
sub.shape_type
.schema
.get(&sub.shape_type.shape)
.unwrap()
.clone()
})
.collect::<Vec<_>>();
scopes.push((scope.clone(), root_shapes));
}
log_debug!(
"[orm_backend_update], creating patch objects for scopes:\n{}",
scopes.len()
);
for (scope, shapes) in scopes {
let mut orm_changes: OrmChanges = HashMap::new();
// Apply the changes to tracked subjects.
for shape_arc in shapes {
let _ = self.process_changes_for_shape_and_session(
&scope,
shape_arc,
session_id,
&triple_inserts,
&triple_removes,
&mut orm_changes,
false,
);
}
let subs = self.orm_subscriptions.get(&scope).unwrap();
for sub in subs.iter() {
log_debug!(
"Applying changes to subscription with nuri {} and shape {}",
sub.nuri.repo(),
sub.shape_type.shape
);
// The JSON patches to send to JS land.
let mut patches: Vec<OrmDiffOp> = vec![];
// Keep track of created objects by path and if they need an id.
// Later we created patches from them to ensure the objects exist.
let mut paths_of_objects_to_create: HashSet<(Vec<String>, Option<SubjectIri>)> =
HashSet::new();
// We construct object patches from a change (which is associated with a shape type). {op: add, valType: object, value: Null, path: ...}
// For each change that has a subject tracked in this subscription,
// - Get change operation (calling diff_op_from_pred_change).
// - case not object, single --> either add or remove (must be one of each at max)
// - case not object, multi --> just add and or set patch
// - case object, multi --> create object patch + nested object patch (will be handled when recursing paths to add primitive values)
// - case object, single --> just object patch (will be handled when recursing paths to add primitive values)
// - Add patches for each change operation for the path of the change in the schema.
// We find the path by traversing the schema up to the parents (add_diff_ops_for_tracked_subject).
// TODO: Special edge case: An object with parents changed and the parents' predicate schema has multiple allowed shapes.
// Now, there are multiple tracked subjects with the same subject IRI but different shapes of which some
// are valid or invalid. The first valid (subject, shape) pair must used for materialization.
// - if a higher-priority shape became invalid but a lower priority shape is valid, delete and new add.
// - if a higher-priority shape became valid, delete and add new valid.
// Problem: We might not have the triples present to materialize the newly valid object so we need to fetch them.
// Process changes for this subscription
// Iterate over all changes and create patches
for (shape_iri, subject_changes) in &orm_changes {
for (subject_iri, change) in subject_changes {
// Get the tracked subject for this (subject, shape) pair
let tracked_subject = sub
.tracked_subjects
.get(subject_iri)
.and_then(|shapes| shapes.get(shape_iri))
.map(|ts| ts.read().unwrap())
.unwrap();
// Now we have the tracked predicate (containing the shape) and the change.
// Check validity changes
if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Invalid
&& tracked_subject.valid == OrmTrackedSubjectValidity::Invalid
{
// Is the subject invalid and was it before? There is nothing we need to inform about.
continue;
} else if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Valid
&& tracked_subject.valid == OrmTrackedSubjectValidity::Invalid
|| tracked_subject.valid == OrmTrackedSubjectValidity::Untracked
{
// Has the subject become invalid or untracked?
// We add a patch, deleting the object at its root.
let mut path: Vec<String>;
if tracked_subject.parents.is_empty() {
// If this is a root object, we need to add the object's id itself.
path = vec![tracked_subject.subject_iri.clone()];
} else {
path = vec![];
}
build_path_to_root_and_create_patches(
&tracked_subject,
&sub.tracked_subjects,
&sub.shape_type.shape,
&mut path,
(OrmDiffOpType::remove, Some(OrmDiffType::object), None, None),
&mut patches,
&mut paths_of_objects_to_create,
);
} else {
// The subject is valid or has become valid.
// Process each predicate change
for (_pred_iri, pred_change) in &change.predicates {
let tracked_predicate =
pred_change.tracked_predicate.read().unwrap();
let pred_name = tracked_predicate.schema.readablePredicate.clone();
// Get the diff operations for this predicate change
let diff_ops = create_diff_ops_from_predicate_change(pred_change);
// For each diff operation, traverse up to the root to build the path
for diff_op in diff_ops {
let mut path = vec![pred_name.clone()];
// Start recursion from this tracked subject
build_path_to_root_and_create_patches(
&tracked_subject,
&sub.tracked_subjects,
&sub.shape_type.shape,
&mut path,
diff_op,
&mut patches,
&mut paths_of_objects_to_create,
);
}
}
}
}
}
// Create patches for objects that need to be created
// These are patches with {op: add, valType: object, value: Null, path: ...}
// Sort by path length (shorter first) to ensure parent objects are created before children
let mut sorted_object_paths: Vec<_> = paths_of_objects_to_create.iter().collect();
sorted_object_paths.sort_by_key(|(path_segments, _)| path_segments.len());
for (path_segments, maybe_iri) in sorted_object_paths {
let escaped_path: Vec<String> = path_segments
.iter()
.map(|seg| escape_json_pointer(seg))
.collect();
let json_pointer = format!("/{}", escaped_path.join("/"));
patches.push(OrmDiffOp {
op: OrmDiffOpType::add,
valType: Some(OrmDiffType::object),
path: json_pointer.clone(),
value: None,
});
if let Some(iri) = maybe_iri {
patches.push(OrmDiffOp {
op: OrmDiffOpType::add,
valType: Some(OrmDiffType::object),
path: format!("{}/id", json_pointer),
value: Some(json!(iri)),
});
}
}
// Send response with patches.
let _ = sub
.sender
.clone()
.send(AppResponse::V0(AppResponseV0::OrmUpdate(patches.to_vec())))
.await;
}
}
}
}
/// Find the predicate schema linking a parent to a child tracked subject and build the path segment.
/// Returns the updated path if a linking predicate is found.
fn build_path_segment_for_parent(
tracked_subject: &OrmTrackedSubject,
parent_ts: &OrmTrackedSubject,
base_path: &[String],
) -> Option<Vec<String>> {
// Find the predicate schema linking parent to this tracked subject
for pred_arc in &parent_ts.shape.predicates {
// Check if this predicate has our subject as a child
if let Some(tracked_pred) = parent_ts.tracked_predicates.get(&pred_arc.iri) {
let tp = tracked_pred.read().unwrap();
// Check if this tracked subject is in the children
let is_child = tp.tracked_children.iter().any(|child| {
let child_read = child.read().unwrap();
child_read.subject_iri == tracked_subject.subject_iri
});
if is_child {
// Build the path segment
let mut new_path = base_path.to_vec();
let is_multi = pred_arc.maxCardinality > 1 || pred_arc.maxCardinality == -1;
// For multi-valued predicates, add the object IRI as a key first
if is_multi {
new_path.insert(0, tracked_subject.subject_iri.clone());
}
// Add the readable predicate name
new_path.insert(0, pred_arc.readablePredicate.clone());
return Some(new_path);
}
}
}
None
}
/// Recursively build the path from a tracked subject to the root and create diff operation patches.
/// The function recurses from child to parents down to a root tracked subject.
/// If multiple parents exist, it adds separate patches for each.
fn build_path_to_root_and_create_patches(
tracked_subject: &OrmTrackedSubject,
tracked_subjects: &HashMap<String, HashMap<String, Arc<RwLock<OrmTrackedSubject>>>>,
root_shape: &String,
path: &mut Vec<String>,
diff_op: (
OrmDiffOpType,
Option<OrmDiffType>,
Option<Value>, // The value added / removed
Option<String>, // The IRI, if change is an added / removed object.
),
patches: &mut Vec<OrmDiffOp>,
paths_of_objects_to_create: &mut HashSet<(Vec<String>, Option<SubjectIri>)>,
) {
// If the tracked subject is not valid, we don't create patches for it
if tracked_subject.valid != OrmTrackedSubjectValidity::Valid {
return;
}
// If the tracked subject is newly valid (was not valid before but is now),
// we need to ensure the object is created with an "add object" patch
if tracked_subject.prev_valid != OrmTrackedSubjectValidity::Valid {
// Check if we're at a root subject or need to traverse to parents
if tracked_subject.parents.is_empty() || tracked_subject.shape.iri == *root_shape {
// At root: build the path with the subject IRI
let escaped_path: Vec<String> =
path.iter().map(|seg| escape_json_pointer(seg)).collect();
let json_pointer = format!(
"/{}/{}",
escape_json_pointer(&tracked_subject.subject_iri),
escaped_path.join("/")
);
// Create an "add object" patch to ensure the object exists
patches.push(OrmDiffOp {
op: OrmDiffOpType::add,
valType: Some(OrmDiffType::object),
path: json_pointer.clone(),
value: None,
});
// Also add the id field for the object
patches.push(OrmDiffOp {
op: OrmDiffOpType::add,
valType: None,
path: format!("{}/id", json_pointer),
value: Some(json!(tracked_subject.subject_iri)),
});
} else {
// Not at root: traverse to parents and create object patches along the way
for (_parent_iri, parent_tracked_subject) in tracked_subject.parents.iter() {
let parent_ts = parent_tracked_subject.read().unwrap();
if let Some(new_path) =
build_path_segment_for_parent(tracked_subject, &parent_ts, path)
{
// Recurse to the parent first
build_path_to_root_and_create_patches(
&parent_ts,
tracked_subjects,
root_shape,
&mut new_path.clone(),
(OrmDiffOpType::add, Some(OrmDiffType::object), None, None),
patches,
paths_of_objects_to_create,
);
}
}
}
}
// If this subject has no parents or its shape matches the root shape, we've reached the root
if tracked_subject.parents.is_empty() || tracked_subject.shape.iri == *root_shape {
// Build the final JSON Pointer path
let escaped_path: Vec<String> = path.iter().map(|seg| escape_json_pointer(seg)).collect();
// Always add the root subject to the path.
let json_pointer = format!(
"/{}/{}",
escape_json_pointer(&tracked_subject.subject_iri),
escaped_path.join("/")
);
// Create the patch
patches.push(OrmDiffOp {
op: diff_op.0.clone(),
valType: diff_op.1.clone(),
path: json_pointer.clone(),
value: diff_op.2.clone(),
});
// // If a new object is created on a predicate where multiple ones are allowed, create IRI path too.
// if let Some(added_obj_iri) = diff_op.3 {
// patches.push(OrmDiffOp {
// op: diff_op.0.clone(),
// valType: diff_op.1.clone(),
// path: format!("{}/{}", json_pointer, escape_json_pointer(&added_obj_iri)),
// value: diff_op.2.clone(),
// });
// }
return;
}
// Recurse to parents
for (_parent_iri, parent_tracked_subject) in tracked_subject.parents.iter() {
let parent_ts = parent_tracked_subject.read().unwrap();
// Build the path segment for this parent
if let Some(new_path) = build_path_segment_for_parent(tracked_subject, &parent_ts, path) {
// Recurse to the parent
build_path_to_root_and_create_patches(
&parent_ts,
tracked_subjects,
root_shape,
&mut new_path.clone(),
diff_op.clone(),
patches,
paths_of_objects_to_create,
);
}
}
}
/// Create diff operations from a predicate change.
/// Returns a list of (op_type, value_type, value, iri) tuples.
fn create_diff_ops_from_predicate_change(
pred_change: &OrmTrackedPredicateChanges,
) -> Vec<(
OrmDiffOpType,
Option<OrmDiffType>,
Option<Value>, // The value added / removed
Option<String>, // The IRI, if change is an added / removed object.
)> {
let tracked_predicate = pred_change.tracked_predicate.read().unwrap();
let is_multi = tracked_predicate.schema.maxCardinality > 1
|| tracked_predicate.schema.maxCardinality == -1;
let is_object = tracked_predicate
.schema
.dataTypes
.iter()
.any(|dt| dt.shape.is_some());
let mut ops = vec![];
if !is_multi && !is_object {
if pred_change.values_added.len() == 1 {
// A value was added. Another one might have been removed
// but the add patch overwrite previous values.
return [(
OrmDiffOpType::add,
None,
Some(json!(pred_change.values_added[0])),
None,
)]
.to_vec();
} else {
// Since there is only one possible value, removing the path is enough.
return [(OrmDiffOpType::remove, None, None, None)].to_vec();
}
} else if is_multi && !is_object {
if pred_change.values_added.len() > 0 {
ops.push((
OrmDiffOpType::add,
Some(OrmDiffType::set),
Some(json!(pred_change.values_added)),
None,
));
}
if pred_change.values_removed.len() > 0 {
ops.push((
OrmDiffOpType::remove,
Some(OrmDiffType::set),
Some(json!(pred_change.values_removed)),
None,
));
}
return ops;
}
// else if !is_multi && is_object {
// if pred_change.values_added.len() > 0 {
// ops.push((OrmDiffOpType::add, Some(OrmDiffType::object), None, None));
// } else if pred_change.values_removed.len() > 0 {
// ops.push((OrmDiffOpType::remove, Some(OrmDiffType::object), None, None));
// }
// } else if is_multi && is_object {
// for val_added in pred_change.values_added.iter() {
// let iri = match val_added {
// BasicType::Str(s) => s,
// _ => {
// continue;
// }
// };
// ops.push((
// OrmDiffOpType::add,
// Some(OrmDiffType::object),
// None,
// Some(iri.clone()),
// ));
// }
// for val_removed in pred_change.values_added.iter() {
// let iri = match val_removed {
// BasicType::Str(s) => s,
// _ => {
// continue;
// }
// };
// ops.push((
// OrmDiffOpType::remove,
// Some(OrmDiffType::object),
// None,
// Some(iri.clone()),
// ));
// }
// }
return ops;
}

@ -0,0 +1,115 @@
// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.
use ng_oxigraph::oxrdf::Quad;
use ng_repo::errors::VerifierError;
use std::u64;
use futures::SinkExt;
use ng_net::app_protocol::*;
pub use ng_net::orm::{OrmDiff, OrmShapeType};
use ng_repo::log::*;
use crate::orm::types::*;
use crate::verifier::*;
impl Verifier {
/// After creating new objects (without an id) in JS-land,
/// we send the generated id for those back.
/// If something went wrong (revert_inserts / revert_removes not empty),
/// we send a JSON patch back to revert the made changes.
pub(crate) async fn orm_update_self(
&mut self,
scope: &NuriV0,
shape_iri: ShapeIri,
session_id: u64,
skolemnized_blank_nodes: Vec<Quad>,
revert_inserts: Vec<Quad>,
revert_removes: Vec<Quad>,
) -> Result<(), VerifierError> {
let (mut sender, orm_subscription) =
self.get_first_orm_subscription_sender_for(scope, Some(&shape_iri), Some(&session_id))?;
// TODO prepare OrmUpdateBlankNodeIds with skolemnized_blank_nodes
// use orm_subscription if needed
// note(niko): I think skolemnized blank nodes can still be many, in case of multi-level nested sub-objects.
let orm_bnids = vec![];
let _ = sender
.send(AppResponse::V0(AppResponseV0::OrmUpdateBlankNodeIds(
orm_bnids,
)))
.await;
// TODO (later) revert the inserts and removes
// let orm_diff = vec![];
// let _ = sender.send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff))).await;
Ok(())
}
/// Handles updates coming from JS-land (JSON patches).
pub(crate) async fn orm_frontend_update(
&mut self,
session_id: u64,
scope: &NuriV0,
shape_iri: ShapeIri,
diff: OrmDiff,
) -> Result<(), String> {
log_info!(
"frontend_update_orm session={} scope={:?} shape={} diff={:?}",
session_id,
scope,
shape_iri,
diff
);
let (doc_nuri, sparql_update) = {
let orm_subscription =
self.get_first_orm_subscription_for(scope, Some(&shape_iri), Some(&session_id));
// use orm_subscription as needed
// do the magic, then, find the doc where the query should start and generate the sparql update
let doc_nuri = NuriV0::new_empty();
let sparql_update: String = String::new();
(doc_nuri, sparql_update)
};
match self
.process_sparql_update(
&doc_nuri,
&sparql_update,
&None,
self.get_peer_id_for_skolem(),
session_id,
)
.await
{
Err(e) => Err(e),
Ok((_, revert_inserts, revert_removes, skolemnized_blank_nodes)) => {
if !revert_inserts.is_empty()
|| !revert_removes.is_empty()
|| !skolemnized_blank_nodes.is_empty()
{
self.orm_update_self(
scope,
shape_iri,
session_id,
skolemnized_blank_nodes,
revert_inserts,
revert_removes,
)
.await
.map_err(|e| e.to_string())?;
}
Ok(())
}
}
}
}

@ -0,0 +1,269 @@
// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.
use futures::SinkExt;
use ng_net::orm::*;
pub use ng_net::orm::{OrmDiff, OrmShapeType};
use ng_net::utils::Receiver;
use serde_json::json;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
use crate::orm::query::shape_type_to_sparql;
use crate::orm::types::*;
use crate::orm::utils::nuri_to_string;
use crate::types::CancelFn;
use crate::verifier::Verifier;
use ng_net::app_protocol::{AppResponse, AppResponseV0, NuriV0};
use ng_net::orm::OrmSchemaShape;
use ng_repo::errors::NgError;
use ng_repo::log::*;
use std::u64;
use futures::channel::mpsc;
use crate::orm::{types::OrmTrackedSubjectChange, OrmChanges};
impl Verifier {
/// Entry point to create a new orm subscription.
/// Triggers the creation of an orm object which is sent back to the receiver.
pub(crate) async fn start_orm(
&mut self,
nuri: &NuriV0,
shape_type: &OrmShapeType,
session_id: u64,
) -> Result<(Receiver<AppResponse>, CancelFn), NgError> {
let (mut tx, rx) = mpsc::unbounded::<AppResponse>();
// TODO: Validate schema:
// If multiple data types are present for the same predicate, they must be of of the same type.
// All referenced shapes must be available.
// Create new subscription and add to self.orm_subscriptions
let orm_subscription = OrmSubscription {
shape_type: shape_type.clone(),
session_id: session_id,
sender: tx.clone(),
tracked_subjects: HashMap::new(),
nuri: nuri.clone(),
};
self.orm_subscriptions
.entry(nuri.clone())
.or_insert(vec![])
.push(orm_subscription);
let orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type)?;
// log_debug!("create_orm_object_for_shape return {:?}", orm_objects);
let _ = tx
.send(AppResponse::V0(AppResponseV0::OrmInitial(orm_objects)))
.await;
let close = Box::new(move || {
log_debug!("closing ORM subscription");
if !tx.is_closed() {
tx.close_channel();
}
});
Ok((rx, close))
}
/// For a nuri, session, and shape, create an ORM JSON object.
fn create_orm_object_for_shape(
&mut self,
nuri: &NuriV0,
session_id: u64,
shape_type: &OrmShapeType,
) -> Result<Value, NgError> {
// Query triples for this shape
let shape_query = shape_type_to_sparql(&shape_type.schema, &shape_type.shape, None)?;
let shape_triples = self.query_sparql_construct(shape_query, Some(nuri_to_string(nuri)))?;
let changes: OrmChanges =
self.apply_triple_changes(&shape_triples, &[], nuri, Some(session_id.clone()), true)?;
let orm_subscription =
self.get_first_orm_subscription_for(nuri, Some(&shape_type.shape), Some(&session_id));
let schema: &HashMap<String, Arc<OrmSchemaShape>> = &orm_subscription.shape_type.schema;
let root_shape = schema.get(&shape_type.shape).unwrap();
let Some(_root_changes) = changes.get(&root_shape.iri).map(|s| s.values()) else {
return Ok(Value::Array(vec![]));
};
let mut return_vals: Value = Value::Array(vec![]);
let return_val_vec = return_vals.as_array_mut().unwrap();
// log_debug!(
// "Tracked subjects:\n{:?}\n",
// orm_subscription.tracked_subjects,
// );
// For each valid change struct, we build an orm object.
// The way we get the changes from the tracked subjects is a bit hacky, sorry.
for (subject_iri, tracked_subjects_by_shape) in &orm_subscription.tracked_subjects {
if let Some(tracked_subject) = tracked_subjects_by_shape.get(&shape_type.shape) {
let ts = tracked_subject.read().unwrap();
log_info!("changes for: {:?} valid: {:?}\n", ts.subject_iri, ts.valid);
if ts.valid == OrmTrackedSubjectValidity::Valid {
if let Some(change) = changes
.get(&shape_type.shape)
.and_then(|subject_iri_to_ts| subject_iri_to_ts.get(subject_iri).clone())
{
let new_val = materialize_orm_object(
change,
&changes,
root_shape,
&orm_subscription.tracked_subjects,
);
// TODO: For some reason, this log statement causes a panic.
// log_debug!("Materialized change:\n{:?}\ninto:\n{:?}", change, new_val);
return_val_vec.push(new_val);
}
}
}
}
return Ok(return_vals);
}
}
/// Create ORM JSON object from OrmTrackedSubjectChange and shape.
pub(crate) fn materialize_orm_object(
change: &OrmTrackedSubjectChange,
changes: &OrmChanges,
shape: &OrmSchemaShape,
tracked_subjects: &HashMap<String, HashMap<String, Arc<RwLock<OrmTrackedSubject>>>>,
) -> Value {
let mut orm_obj = json!({"id": change.subject_iri});
let orm_obj_map = orm_obj.as_object_mut().unwrap();
for pred_schema in &shape.predicates {
let property_name = &pred_schema.readablePredicate;
let is_multi = pred_schema.maxCardinality > 1 || pred_schema.maxCardinality == -1;
let Some(pred_change) = change.predicates.get(&pred_schema.iri) else {
// No triples for this property.
if pred_schema.minCardinality == 0 && is_multi {
// If this predicate schema is an array though, insert empty array.
orm_obj_map.insert(property_name.clone(), Value::Array(vec![]));
}
continue;
};
if pred_schema
.dataTypes
.iter()
.any(|dt| dt.valType == OrmSchemaLiteralType::shape)
{
// We have a nested type.
// Helper closure to create Value structs from a nested object_iri.
let get_nested_orm_obj = |object_iri: &SubjectIri| {
// Find allowed schemas for the predicate's datatype.
let shape_iris: Vec<ShapeIri> = pred_schema
.dataTypes
.iter()
.flat_map(|dt| dt.shape.clone())
.collect();
// Find subject_change for this subject. There exists at least one (shape, subject) pair.
// If multiple allowed shapes exist, the first one is chosen.
let nested = shape_iris.iter().find_map(|shape_iri| {
changes
.get(shape_iri)
.and_then(|subject_changes| subject_changes.get(object_iri))
.map(|ch| (shape_iri, ch))
});
if let Some((matched_shape_iri, nested_subject_change)) = nested {
if let Some(nested_tracked_subject) = tracked_subjects
.get(&nested_subject_change.subject_iri)
.and_then(|shape_to_tracked_orm| {
shape_to_tracked_orm.get(matched_shape_iri)
})
{
let nested_tracked_subject = nested_tracked_subject.read().unwrap();
if nested_tracked_subject.valid == OrmTrackedSubjectValidity::Valid {
// Recurse
return Some(materialize_orm_object(
nested_subject_change,
changes,
&nested_tracked_subject.shape,
tracked_subjects,
));
}
}
}
None
};
if is_multi {
// Represent nested objects with more than one child
// as a map/object of <IRI of nested object> -> nested object,
// since there is no conceptual ordering of the children.
let mut nested_objects_map = serde_json::Map::new();
// Add each nested objects.
for new_val in &pred_change.values_added {
if let BasicType::Str(object_iri) = new_val {
if let Some(nested_orm_obj) = get_nested_orm_obj(object_iri) {
nested_objects_map.insert(object_iri.clone(), nested_orm_obj);
}
}
}
orm_obj_map.insert(property_name.clone(), Value::Object(nested_objects_map));
} else {
if let Some(BasicType::Str(object_iri)) = pred_change.values_added.get(0) {
if let Some(nested_orm_obj) = get_nested_orm_obj(object_iri) {
orm_obj_map.insert(property_name.clone(), nested_orm_obj);
}
}
}
} else {
// We have a basic type (string, number, bool, literal).
if is_multi {
// Add values as array.
orm_obj_map.insert(
property_name.clone(),
Value::Array(
pred_change
.values_added
.iter()
.map(|v| match v {
BasicType::Bool(b) => json!(*b),
BasicType::Num(n) => json!(*n),
BasicType::Str(s) => json!(s),
})
.collect(),
),
);
} else {
// Add value as primitive, if present.
if let Some(val) = pred_change.values_added.get(0) {
orm_obj_map.insert(
property_name.clone(),
match val {
BasicType::Bool(b) => json!(*b),
BasicType::Num(n) => json!(*n),
BasicType::Str(s) => json!(s),
},
);
}
}
}
}
return orm_obj;
}

File diff suppressed because it is too large Load Diff

@ -0,0 +1,387 @@
// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.
use futures::channel::mpsc::UnboundedSender;
use ng_repo::errors::VerifierError;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::u64;
pub use ng_net::orm::{OrmDiff, OrmShapeType};
use ng_net::{app_protocol::*, orm::*};
use ng_oxigraph::oxrdf::Triple;
use ng_repo::errors::NgError;
use ng_repo::log::*;
use crate::orm::add_remove_triples::add_remove_triples;
use crate::orm::query::shape_type_to_sparql;
use crate::orm::types::*;
use crate::orm::utils::*;
use crate::orm::OrmChanges;
use crate::verifier::*;
impl Verifier {
/// Apply triples to a nuri's document.
/// Updates tracked_subjects in orm_subscriptions.
pub(crate) fn apply_triple_changes(
&mut self,
triples_added: &[Triple],
triples_removed: &[Triple],
nuri: &NuriV0,
only_for_session_id: Option<u64>,
data_already_fetched: bool,
) -> Result<OrmChanges, NgError> {
log_debug!("apply_triple_changes {:?}", only_for_session_id);
// If we have a specific session, handle only that subscription.
if let Some(session_id) = only_for_session_id {
return self.process_changes_for_nuri_and_session(
&nuri.clone(),
session_id,
triples_added,
triples_removed,
data_already_fetched,
);
}
// Otherwise, iterate all sessions.
let mut merged: OrmChanges = HashMap::new();
let session_ids: Vec<_> = self
.orm_subscriptions
.get(nuri)
.unwrap()
.iter()
.map(|s| s.session_id.clone())
.collect();
for session_id in session_ids {
let changes = self.process_changes_for_nuri_and_session(
&nuri,
session_id,
triples_added,
triples_removed,
data_already_fetched,
)?;
for (shape_iri, subj_map) in changes {
merged
.entry(shape_iri)
.or_insert_with(HashMap::new)
.extend(subj_map);
}
}
Ok(merged)
}
/// Helper to call process_changes_for_shape for all subscriptions on nuri's document.
fn process_changes_for_nuri_and_session(
self: &mut Self,
nuri: &NuriV0,
session_id: u64,
triples_added: &[Triple],
triples_removed: &[Triple],
data_already_fetched: bool,
) -> Result<OrmChanges, NgError> {
let mut orm_changes = HashMap::new();
let shapes: Vec<_> = self
.orm_subscriptions
.get(nuri)
.unwrap()
.iter()
.map(|sub| {
sub.shape_type
.schema
.get(&sub.shape_type.shape)
.unwrap()
.clone()
})
.collect();
for root_shape in shapes {
self.process_changes_for_shape_and_session(
nuri,
root_shape,
session_id,
triples_added,
triples_removed,
&mut orm_changes,
data_already_fetched,
)?;
}
Ok(orm_changes)
}
/// Add and remove the triples from the tracked subjects,
/// re-validate, and update `changes` containing the updated data.
/// Works by queuing changes by shape and subjects on a stack.
/// Nested objects are added to the stack
pub(crate) fn process_changes_for_shape_and_session(
self: &mut Self,
nuri: &NuriV0,
root_shape: Arc<OrmSchemaShape>,
session_id: u64,
triples_added: &[Triple],
triples_removed: &[Triple],
orm_changes: &mut OrmChanges,
data_already_fetched: bool,
) -> Result<(), NgError> {
// First in, last out stack to keep track of objects to validate (nested objects first). Strings are object IRIs.
let mut shape_validation_stack: Vec<(Arc<OrmSchemaShape>, Vec<String>)> = vec![];
// Track (shape_iri, subject_iri) pairs currently being validated to prevent cycles and double evaluation.
let mut currently_validating: HashSet<(String, String)> = HashSet::new();
// Add root shape for first validation run.
let root_shape_iri = root_shape.iri.clone();
shape_validation_stack.push((root_shape, vec![]));
// Process queue of shapes and subjects to validate.
// For a given shape, we evaluate every subject against that shape.
while let Some((shape, objects_to_validate)) = shape_validation_stack.pop() {
// Collect triples relevant for validation.
let added_triples_by_subject =
group_by_subject_for_shape(&shape, triples_added, &objects_to_validate);
let removed_triples_by_subject =
group_by_subject_for_shape(&shape, triples_removed, &objects_to_validate);
let modified_subject_iris: HashSet<&SubjectIri> = added_triples_by_subject
.keys()
.chain(removed_triples_by_subject.keys())
.collect();
let mut orm_subscription = self
.orm_subscriptions
.get_mut(nuri)
.unwrap()
.iter_mut()
.find(|sub| sub.session_id == session_id && sub.shape_type.shape == root_shape_iri)
.unwrap();
// Variable to collect nested objects that need validation.
let mut nested_objects_to_eval: HashMap<ShapeIri, Vec<(SubjectIri, bool)>> =
HashMap::new();
// For each subject, add/remove triples and validate.
log_debug!(
"processing modified subjects: {:?} against shape: {}",
modified_subject_iris,
shape.iri
);
for subject_iri in &modified_subject_iris {
let validation_key = (shape.iri.clone(), subject_iri.to_string());
// Cycle detection: Check if this (shape, subject) pair is already being validated
if currently_validating.contains(&validation_key) {
log_warn!(
"Cycle detected: subject '{}' with shape '{}' is already being validated. Marking as invalid.",
subject_iri,
shape.iri
);
// Mark as invalid due to cycle
// TODO: We could handle this by handling nested references as IRIs.
if let Some(tracked_shapes) =
orm_subscription.tracked_subjects.get(*subject_iri)
{
if let Some(tracked_subject) = tracked_shapes.get(&shape.iri) {
let mut ts = tracked_subject.write().unwrap();
ts.valid = OrmTrackedSubjectValidity::Invalid;
ts.tracked_predicates.clear();
}
}
continue;
}
// Mark as currently validating
currently_validating.insert(validation_key.clone());
// Get triples of subject (added & removed).
let triples_added_for_subj = added_triples_by_subject
.get(*subject_iri)
.map(|v| v.as_slice())
.unwrap_or(&[]);
let triples_removed_for_subj = removed_triples_by_subject
.get(*subject_iri)
.map(|v| v.as_slice())
.unwrap_or(&[]);
// Get or create change object for (shape, subject) pair.
let change = orm_changes
.entry(shape.iri.clone())
.or_insert_with(HashMap::new)
.entry((*subject_iri).clone())
.or_insert_with(|| OrmTrackedSubjectChange {
subject_iri: (*subject_iri).clone(),
predicates: HashMap::new(),
data_applied: false,
});
// Apply all triples for that subject to the tracked (shape, subject) pair.
// Record the changes.
{
if !change.data_applied {
log_debug!(
"Adding triples to change tracker for subject {}",
subject_iri
);
if let Err(e) = add_remove_triples(
shape.clone(),
subject_iri,
triples_added_for_subj,
triples_removed_for_subj,
&mut orm_subscription,
change,
) {
log_err!("apply_changes_from_triples add/remove error: {:?}", e);
panic!();
}
change.data_applied = true;
} else {
log_debug!("not applying triples again for subject {subject_iri}");
}
// Validate the subject.
let need_eval =
Self::update_subject_validity(change, &shape, &mut orm_subscription);
// We add the need_eval to be processed next after loop.
// Filter out subjects already in the validation stack to prevent double evaluation.
for (iri, schema_shape, needs_refetch) in need_eval {
let eval_key = (schema_shape.clone(), iri.clone());
if !currently_validating.contains(&eval_key) {
// Only add if not currently being validated
nested_objects_to_eval
.entry(schema_shape)
.or_insert_with(Vec::new)
.push((iri.clone(), needs_refetch));
}
}
}
}
// Now, we queue all non-evaluated objects
for (shape_iri, objects_to_eval) in &nested_objects_to_eval {
let orm_subscription = self.get_first_orm_subscription_for(
nuri,
Some(&root_shape_iri),
Some(&session_id),
);
// Extract schema and shape Arc before mutable borrow
let schema = orm_subscription.shape_type.schema.clone();
let shape_arc = schema.get(shape_iri).unwrap().clone();
// Data might need to be fetched (if it has not been during initialization or nested shape fetch).
if !data_already_fetched {
let objects_to_fetch = objects_to_eval
.iter()
.filter(|(_iri, needs_fetch)| *needs_fetch)
.map(|(s, _)| s.clone())
.collect();
// Create sparql query
let shape_query =
shape_type_to_sparql(&schema, &shape_iri, Some(objects_to_fetch))?;
let new_triples =
self.query_sparql_construct(shape_query, Some(nuri_to_string(nuri)))?;
// Recursively process nested objects.
self.process_changes_for_shape_and_session(
nuri,
shape_arc.clone(),
session_id,
&new_triples,
&vec![],
orm_changes,
true,
)?;
}
// Add objects
let objects_not_to_fetch: Vec<String> = objects_to_eval
.iter()
.filter(|(_iri, needs_fetch)| !*needs_fetch)
.map(|(s, _)| s.clone())
.collect();
if objects_not_to_fetch.len() > 0 {
// Queue all objects that don't need fetching.
shape_validation_stack.push((shape_arc, objects_not_to_fetch));
}
}
for subject_iri in modified_subject_iris {
let validation_key = (shape.iri.clone(), subject_iri.to_string());
currently_validating.remove(&validation_key);
}
}
Ok(())
}
/// Helper to get orm subscriptions for nuri, shapes and sessions.
pub fn get_orm_subscriptions_for(
&self,
nuri: &NuriV0,
shape: Option<&ShapeIri>,
session_id: Option<&u64>,
) -> Vec<&OrmSubscription> {
self.orm_subscriptions.get(nuri).unwrap().
// Filter shapes, if present.
iter().filter(|s| match shape {
Some(sh) => *sh == s.shape_type.shape,
None => true
// Filter session ids if present.
}).filter(|s| match session_id {
Some(id) => *id == s.session_id,
None => true
}).collect()
}
pub fn get_first_orm_subscription_for(
&self,
nuri: &NuriV0,
shape: Option<&ShapeIri>,
session_id: Option<&u64>,
) -> &OrmSubscription {
self.orm_subscriptions.get(nuri).unwrap().
// Filter shapes, if present.
iter().filter(|s| match shape {
Some(sh) => *sh == s.shape_type.shape,
None => true
// Filter session ids if present.
}).filter(|s| match session_id {
Some(id) => *id == s.session_id,
None => true
}).next().unwrap()
}
pub fn get_first_orm_subscription_sender_for(
&mut self,
nuri: &NuriV0,
shape: Option<&ShapeIri>,
session_id: Option<&u64>,
) -> Result<(UnboundedSender<AppResponse>, &OrmSubscription), VerifierError> {
let subs = self.orm_subscriptions.get_mut(nuri).unwrap();
subs.retain(|sub| !sub.sender.is_closed());
match subs // Filter shapes, if present.
.iter()
.filter(|s| match shape {
Some(sh) => *sh == s.shape_type.shape,
None => true, // Filter session ids if present.
})
.filter(|s| match session_id {
Some(id) => *id == s.session_id,
None => true,
})
.next()
{
None => Err(VerifierError::OrmSubscriptionNotFound),
Some(subscription) => Ok((subscription.sender.clone(), subscription)),
}
}
}

@ -0,0 +1,322 @@
// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.
use lazy_static::lazy_static;
use ng_repo::errors::VerifierError;
use regex::Regex;
use std::collections::HashSet;
pub use ng_net::orm::{OrmDiff, OrmShapeType};
use crate::orm::types::*;
use crate::verifier::*;
use ng_net::orm::*;
use ng_oxigraph::oxigraph::sparql::{Query, QueryResults};
use ng_oxigraph::oxrdf::Triple;
use ng_repo::errors::NgError;
use ng_repo::log::*;
impl Verifier {
pub fn query_sparql_construct(
&self,
query: String,
nuri: Option<String>,
) -> Result<Vec<Triple>, NgError> {
let oxistore = self.graph_dataset.as_ref().unwrap();
// let graph_nuri = NuriV0::repo_graph_name(
// &update.repo_id,
// &update.overlay_id,
// );
//let base = NuriV0::repo_id(&repo.id);
let nuri_str = nuri.as_ref().map(|s| s.as_str());
log_debug!("querying construct\n{}\n{}\n", nuri_str.unwrap(), query);
let parsed =
Query::parse(&query, nuri_str).map_err(|e| NgError::OxiGraphError(e.to_string()))?;
let results = oxistore
.query(parsed, nuri)
.map_err(|e| NgError::OxiGraphError(e.to_string()))?;
match results {
QueryResults::Graph(triples) => {
let mut result_triples: Vec<Triple> = vec![];
for t in triples {
match t {
Err(e) => {
log_err!("{}", e.to_string());
return Err(NgError::SparqlError(e.to_string()));
}
Ok(triple) => {
log_debug!("Triple fetched: {:?}", triple);
result_triples.push(triple);
}
}
}
Ok(result_triples)
}
_ => return Err(NgError::InvalidResponse),
}
}
}
/// Heuristic:
/// Consider a string an IRI if it contains alphanumeric characters and then a colon within the first 13 characters
pub fn is_iri(s: &str) -> bool {
lazy_static! {
static ref IRI_REGEX: Regex = Regex::new(r"^[A-Za-z][A-Za-z0-9+\.\-]{1,12}:").unwrap();
}
IRI_REGEX.is_match(s)
}
pub fn literal_to_sparql_str(var: OrmSchemaDataType) -> Vec<String> {
match var.literals {
None => [].to_vec(),
Some(literals) => literals
.iter()
.map(|literal| match literal {
BasicType::Bool(val) => {
if *val {
"true".to_string()
} else {
"false".to_string()
}
}
BasicType::Num(number) => number.to_string(),
BasicType::Str(sting) => {
if is_iri(sting) {
format!("<{}>", sting)
} else {
format!("\"{}\"", escape_literal(sting))
}
}
})
.collect(),
}
}
pub fn shape_type_to_sparql(
schema: &OrmSchema,
shape: &ShapeIri,
filter_subjects: Option<Vec<String>>,
) -> Result<String, NgError> {
// Use a counter to generate unique variable names.
let mut var_counter = 0;
fn get_new_var_name(counter: &mut i32) -> String {
let name = format!("v{}", counter);
*counter += 1;
name
}
// Collect all statements to be added to the construct and where bodies.
let mut construct_statements = Vec::new();
let mut where_statements = Vec::new();
// Keep track of visited shapes while recursing to prevent infinite loops.
let mut visited_shapes: HashSet<ShapeIri> = HashSet::new();
// Recursive function to call for (nested) shapes.
// Returns nested WHERE statements that should be included with this shape's binding.
fn process_shape(
schema: &OrmSchema,
shape: &OrmSchemaShape,
subject_var_name: &str,
construct_statements: &mut Vec<String>,
where_statements: &mut Vec<String>,
var_counter: &mut i32,
visited_shapes: &mut HashSet<String>,
in_recursion: bool,
) -> Vec<String> {
// Prevent infinite recursion on cyclic schemas.
// TODO: We could handle this as IRI string reference.
if visited_shapes.contains(&shape.iri) {
return vec![];
}
let mut new_where_statements: Vec<String> = vec![];
let mut new_construct_statements: Vec<String> = vec![];
visited_shapes.insert(shape.iri.clone());
// Add statements for each predicate.
// If we are in recursion, we want to get all triples.
// That's why we add a "<subject> ?p ?o" statement afterwards
// and the extra construct statements are skipped.
for predicate in &shape.predicates {
let mut union_branches = Vec::new();
let mut nested_where_statements = Vec::new();
// Predicate constraints might have more than one acceptable nested shape. Traverse each.
for datatype in &predicate.dataTypes {
if datatype.valType == OrmSchemaLiteralType::shape {
let shape_iri = &datatype.shape.clone().unwrap();
let nested_shape = schema.get(shape_iri).unwrap();
// For the current acceptable shape, add CONSTRUCT, WHERE, and recurse.
// Each shape option gets its own var.
let obj_var_name = get_new_var_name(var_counter);
if !in_recursion {
new_construct_statements.push(format!(
" ?{} <{}> ?{}",
subject_var_name, predicate.iri, obj_var_name
));
}
// Those are later added to a UNION, if there is more than one shape.
union_branches.push(format!(
" ?{} <{}> ?{}",
subject_var_name, predicate.iri, obj_var_name
));
// Recurse to add statements for nested object.
// Collect nested WHERE statements to include within this predicate's scope.
let nested_stmts = process_shape(
schema,
nested_shape,
&obj_var_name,
construct_statements,
where_statements,
var_counter,
visited_shapes,
true,
);
nested_where_statements.extend(nested_stmts);
}
}
// The where statement (which may be wrapped in OPTIONAL).
let where_body: String;
if !union_branches.is_empty() {
// We have nested shape(s) which were already added to CONSTRUCT above.
// Join them with UNION and include nested WHERE statements.
let union_body = union_branches
.into_iter()
.map(|b| format!("{{\n{}\n}}", b))
.collect::<Vec<_>>()
.join(" UNION ");
// Combine the parent binding with nested statements
if !nested_where_statements.is_empty() {
let nested_joined = nested_where_statements.join(" .\n");
where_body = format!("{} .\n{}", union_body, nested_joined);
} else {
where_body = union_body;
}
} else {
// Regular predicate data type. Just add basic CONSTRUCT and WHERE statements.
let obj_var_name = get_new_var_name(var_counter);
if !in_recursion {
// Only add construct, if we don't have catch-all statement already.
new_construct_statements.push(format!(
" ?{} <{}> ?{}",
subject_var_name, predicate.iri, obj_var_name
));
}
where_body = format!(
" ?{} <{}> ?{}",
subject_var_name, predicate.iri, obj_var_name
);
}
// Wrap in optional, if predicate is optional
if predicate.minCardinality < 1 {
new_where_statements.push(format!(" OPTIONAL {{\n{}\n }}", where_body));
} else {
new_where_statements.push(where_body);
};
}
if in_recursion {
// All statements in recursive objects need to be optional
// because we want to fetch _all_ nested objects,
// invalid ones too, for later validation.
let pred_var_name = get_new_var_name(var_counter);
let obj_var_name = get_new_var_name(var_counter);
// The "catch any triple in subject" construct statement
construct_statements.push(format!(
" ?{} ?{} ?{}",
subject_var_name, pred_var_name, obj_var_name
));
let joined_where_statements = new_where_statements.join(" .\n");
// Return nested statements to be included in parent's scope
// Combine catch-all with specific predicates in a UNION
let nested_block = format!(
" {{\n {{?{} ?{} ?{}}}\n UNION {{\n {}\n }}\n }}",
subject_var_name, pred_var_name, obj_var_name, joined_where_statements
);
visited_shapes.remove(&shape.iri);
return vec![nested_block];
} else {
where_statements.append(&mut new_where_statements);
construct_statements.append(&mut new_construct_statements);
}
visited_shapes.remove(&shape.iri);
vec![]
}
let root_shape = schema.get(shape).ok_or(VerifierError::InvalidOrmSchema)?;
// Root subject variable name
let root_var_name = get_new_var_name(&mut var_counter);
process_shape(
schema,
root_shape,
&root_var_name,
&mut construct_statements,
&mut where_statements,
&mut var_counter,
&mut visited_shapes,
false,
);
// Filter subjects, if present.
if let Some(subjects) = filter_subjects {
// log_debug!("filter_subjects: {:?}", subjects);
let subjects_str = subjects
.iter()
.map(|s| format!("<{}>", s))
.collect::<Vec<_>>()
.join(", ");
where_statements.push(format!(" FILTER(?v0 IN ({}))", subjects_str));
}
// Create query from statements.
let construct_body = construct_statements.join(" .\n");
let where_body = where_statements.join(" .\n");
Ok(format!(
"CONSTRUCT {{\n{}\n}}\nWHERE {{\n{}\n}}",
construct_body, where_body
))
}
/// SPARQL literal escape: backslash, quotes, newlines, tabs.
fn escape_literal(lit: &str) -> String {
let mut out = String::with_capacity(lit.len() + 4);
for c in lit.chars() {
match c {
'\\' => out.push_str("\\\\"),
'\"' => out.push_str("\\\""),
'\n' => out.push_str("\\n"),
'\r' => out.push_str("\\r"),
'\t' => out.push_str("\\t"),
_ => out.push(c),
}
}
return out;
}

@ -105,7 +105,10 @@ impl Verifier {
// Check 3) Validate subject against each predicate in shape.
for p_schema in shape.predicates.iter() {
let p_change = s_change.predicates.get(&p_schema.iri);
let tracked_pred = p_change.map(|pc| pc.tracked_predicate.read().unwrap());
let tracked_pred = tracked_subject
.tracked_predicates
.get(&p_schema.iri)
.map(|tp_write_lock| tp_write_lock.read().unwrap());
let count = tracked_pred
.as_ref()
@ -124,6 +127,8 @@ impl Verifier {
set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid);
if count <= 0 {
// If cardinality is 0, we can remove the tracked predicate.
// Drop the guard to release the immutable borrow
drop(tracked_pred);
tracked_subject.tracked_predicates.remove(&p_schema.iri);
}
break;

@ -91,3 +91,8 @@ pub struct OrmSubscription {
}
pub type ShapeIri = String;
pub type SubjectIri = String;
// Structure to store changes in. By shape iri > subject iri > OrmTrackedSubjectChange
// **NOTE**: In comparison to OrmSubscription.tracked_subjects, the outer hashmap's keys are shape IRIs.
// (shape IRI -> (subject IRI -> OrmTrackedSubjectChange))
pub type OrmChanges = HashMap<ShapeIri, HashMap<SubjectIri, OrmTrackedSubjectChange>>;

@ -8,276 +8,14 @@
// according to those terms.
use ng_oxigraph::oxrdf::Subject;
use ng_repo::log::*;
use ng_repo::types::OverlayId;
use yrs::types::PathSegment;
use std::collections::HashMap;
use std::collections::HashSet;
use lazy_static::lazy_static;
pub use ng_net::orm::{OrmDiff, OrmShapeType};
use ng_net::{app_protocol::*, orm::*};
use ng_oxigraph::oxrdf::Triple;
use ng_repo::errors::NgError;
use ng_repo::errors::VerifierError;
use regex::Regex;
use crate::orm::types::*;
/// Heuristic:
/// Consider a string an IRI if it contains alphanumeric characters and then a colon within the first 13 characters
pub fn is_iri(s: &str) -> bool {
lazy_static! {
static ref IRI_REGEX: Regex = Regex::new(r"^[A-Za-z][A-Za-z0-9+\.\-]{1,12}:").unwrap();
}
IRI_REGEX.is_match(s)
}
pub fn literal_to_sparql_str(var: OrmSchemaDataType) -> Vec<String> {
match var.literals {
None => [].to_vec(),
Some(literals) => literals
.iter()
.map(|literal| match literal {
BasicType::Bool(val) => {
if *val {
"true".to_string()
} else {
"false".to_string()
}
}
BasicType::Num(number) => number.to_string(),
BasicType::Str(sting) => {
if is_iri(sting) {
format!("<{}>", sting)
} else {
format!("\"{}\"", escape_literal(sting))
}
}
})
.collect(),
}
}
pub fn shape_type_to_sparql(
schema: &OrmSchema,
shape: &ShapeIri,
filter_subjects: Option<Vec<String>>,
) -> Result<String, NgError> {
// Use a counter to generate unique variable names.
let mut var_counter = 0;
fn get_new_var_name(counter: &mut i32) -> String {
let name = format!("v{}", counter);
*counter += 1;
name
}
// Collect all statements to be added to the construct and where bodies.
let mut construct_statements = Vec::new();
let mut where_statements = Vec::new();
// Keep track of visited shapes while recursing to prevent infinite loops.
let mut visited_shapes: HashSet<ShapeIri> = HashSet::new();
// Recursive function to call for (nested) shapes.
// Returns nested WHERE statements that should be included with this shape's binding.
fn process_shape(
schema: &OrmSchema,
shape: &OrmSchemaShape,
subject_var_name: &str,
construct_statements: &mut Vec<String>,
where_statements: &mut Vec<String>,
var_counter: &mut i32,
visited_shapes: &mut HashSet<String>,
in_recursion: bool,
) -> Vec<String> {
// Prevent infinite recursion on cyclic schemas.
// TODO: We could handle this as IRI string reference.
if visited_shapes.contains(&shape.iri) {
return vec![];
}
let mut new_where_statements: Vec<String> = vec![];
let mut new_construct_statements: Vec<String> = vec![];
visited_shapes.insert(shape.iri.clone());
// Add statements for each predicate.
// If we are in recursion, we want to get all triples.
// That's why we add a "<subject> ?p ?o" statement afterwards
// and the extra construct statements are skipped.
for predicate in &shape.predicates {
let mut union_branches = Vec::new();
let mut nested_where_statements = Vec::new();
// Predicate constraints might have more than one acceptable nested shape. Traverse each.
for datatype in &predicate.dataTypes {
if datatype.valType == OrmSchemaLiteralType::shape {
let shape_iri = &datatype.shape.clone().unwrap();
let nested_shape = schema.get(shape_iri).unwrap();
// For the current acceptable shape, add CONSTRUCT, WHERE, and recurse.
// Each shape option gets its own var.
let obj_var_name = get_new_var_name(var_counter);
if !in_recursion {
new_construct_statements.push(format!(
" ?{} <{}> ?{}",
subject_var_name, predicate.iri, obj_var_name
));
}
// Those are later added to a UNION, if there is more than one shape.
union_branches.push(format!(
" ?{} <{}> ?{}",
subject_var_name, predicate.iri, obj_var_name
));
// Recurse to add statements for nested object.
// Collect nested WHERE statements to include within this predicate's scope.
let nested_stmts = process_shape(
schema,
nested_shape,
&obj_var_name,
construct_statements,
where_statements,
var_counter,
visited_shapes,
true,
);
nested_where_statements.extend(nested_stmts);
}
}
// The where statement (which may be wrapped in OPTIONAL).
let where_body: String;
if !union_branches.is_empty() {
// We have nested shape(s) which were already added to CONSTRUCT above.
// Join them with UNION and include nested WHERE statements.
let union_body = union_branches
.into_iter()
.map(|b| format!("{{\n{}\n}}", b))
.collect::<Vec<_>>()
.join(" UNION ");
// Combine the parent binding with nested statements
if !nested_where_statements.is_empty() {
let nested_joined = nested_where_statements.join(" .\n");
where_body = format!("{} .\n{}", union_body, nested_joined);
} else {
where_body = union_body;
}
} else {
// Regular predicate data type. Just add basic CONSTRUCT and WHERE statements.
let obj_var_name = get_new_var_name(var_counter);
if !in_recursion {
// Only add construct, if we don't have catch-all statement already.
new_construct_statements.push(format!(
" ?{} <{}> ?{}",
subject_var_name, predicate.iri, obj_var_name
));
}
where_body = format!(
" ?{} <{}> ?{}",
subject_var_name, predicate.iri, obj_var_name
);
}
// Wrap in optional, if predicate is optional
if predicate.minCardinality < 1 {
new_where_statements.push(format!(" OPTIONAL {{\n{}\n }}", where_body));
} else {
new_where_statements.push(where_body);
};
}
if in_recursion {
// All statements in recursive objects need to be optional
// because we want to fetch _all_ nested objects,
// invalid ones too, for later validation.
let pred_var_name = get_new_var_name(var_counter);
let obj_var_name = get_new_var_name(var_counter);
// The "catch any triple in subject" construct statement
construct_statements.push(format!(
" ?{} ?{} ?{}",
subject_var_name, pred_var_name, obj_var_name
));
let joined_where_statements = new_where_statements.join(" .\n");
// Return nested statements to be included in parent's scope
// Combine catch-all with specific predicates in a UNION
let nested_block = format!(
" {{\n {{?{} ?{} ?{}}}\n UNION {{\n {}\n }}\n }}",
subject_var_name, pred_var_name, obj_var_name, joined_where_statements
);
visited_shapes.remove(&shape.iri);
return vec![nested_block];
} else {
where_statements.append(&mut new_where_statements);
construct_statements.append(&mut new_construct_statements);
}
visited_shapes.remove(&shape.iri);
vec![]
}
let root_shape = schema.get(shape).ok_or(VerifierError::InvalidOrmSchema)?;
// Root subject variable name
let root_var_name = get_new_var_name(&mut var_counter);
process_shape(
schema,
root_shape,
&root_var_name,
&mut construct_statements,
&mut where_statements,
&mut var_counter,
&mut visited_shapes,
false,
);
// Filter subjects, if present.
if let Some(subjects) = filter_subjects {
// log_debug!("filter_subjects: {:?}", subjects);
let subjects_str = subjects
.iter()
.map(|s| format!("<{}>", s))
.collect::<Vec<_>>()
.join(", ");
where_statements.push(format!(" FILTER(?v0 IN ({}))", subjects_str));
}
// Create query from statements.
let construct_body = construct_statements.join(" .\n");
let where_body = where_statements.join(" .\n");
Ok(format!(
"CONSTRUCT {{\n{}\n}}\nWHERE {{\n{}\n}}",
construct_body, where_body
))
}
/// SPARQL literal escape: backslash, quotes, newlines, tabs.
fn escape_literal(lit: &str) -> String {
let mut out = String::with_capacity(lit.len() + 4);
for c in lit.chars() {
match c {
'\\' => out.push_str("\\\\"),
'\"' => out.push_str("\\\""),
'\n' => out.push_str("\\n"),
'\r' => out.push_str("\\r"),
'\t' => out.push_str("\\t"),
_ => out.push(c),
}
}
return out;
}
pub fn group_by_subject_for_shape<'a>(
shape: &OrmSchemaShape,

@ -67,6 +67,7 @@ function isPrimitive(v: unknown): v is string | number | boolean {
);
}
// TODO: Escape slashes and tildes (~1, ~0)
/**
* Apply a diff to an object.
*

@ -7,8 +7,105 @@
// notice may not be copied, modified, or distributed except
// according to those terms.
use ng_repo::log_err;
use serde_json::Value;
use crate::local_broker::{doc_create, doc_sparql_update};
#[doc(hidden)]
pub mod orm_creation;
#[doc(hidden)]
pub mod orm;
pub mod orm_patches;
#[doc(hidden)]
pub mod create_or_open_wallet;
pub(crate) async fn create_doc_with_data(session_id: u64, sparql_insert: String) -> String {
let doc_nuri = doc_create(
session_id,
"Graph".to_string(),
"test_orm_query".to_string(),
"store".to_string(),
None,
None,
)
.await
.expect("error creating doc");
// Insert data
doc_sparql_update(session_id, sparql_insert, Some(doc_nuri.clone()))
.await
.expect("SPARQL update failed");
return doc_nuri;
}
pub(crate) fn assert_json_eq(expected: &mut Value, actual: &mut Value) {
remove_id_fields(expected);
remove_id_fields(actual);
sort_arrays(expected);
sort_arrays(actual);
let diff = serde_json_diff::values(expected.clone(), actual.clone());
if let Some(diff_) = diff {
log_err!(
"Expected and actual ORM JSON mismatch.\nDiff: {:?}\nExpected: {}\nActual: {}",
diff_,
expected,
actual
);
assert!(false);
}
}
/// Helper to recursively sort all arrays in nested objects into a stable ordering.
/// Arrays are sorted by their JSON string representation.
fn sort_arrays(value: &mut Value) {
match value {
Value::Object(map) => {
for v in map.values_mut() {
sort_arrays(v);
}
}
Value::Array(arr) => {
// First, recursively sort nested structures
for v in arr.iter_mut() {
sort_arrays(v);
}
// Then sort the array itself by JSON string representation
arr.sort_by(|a, b| {
let a_str = canonical_json::ser::to_string(a).unwrap_or_default();
let b_str = canonical_json::ser::to_string(b).unwrap_or_default();
a_str.cmp(&b_str)
});
}
_ => {}
}
}
/// Helper to recursively remove nested "id" fields from nested objects,
/// but only if they are not at the root level.
fn remove_id_fields(value: &mut Value) {
fn remove_id_fields_inner(value: &mut Value, is_root: bool) {
match value {
Value::Object(map) => {
if !is_root {
map.remove("id");
}
for v in map.values_mut() {
remove_id_fields_inner(v, false);
}
}
Value::Array(arr) => {
for v in arr {
remove_id_fields_inner(v, false);
}
}
_ => {}
}
}
remove_id_fields_inner(value, true);
}

@ -9,17 +9,17 @@
use crate::local_broker::{doc_create, doc_sparql_construct, doc_sparql_update, orm_start};
use crate::tests::create_or_open_wallet::create_or_open_wallet;
use crate::tests::{assert_json_eq, create_doc_with_data};
use async_std::stream::StreamExt;
use ng_net::app_protocol::{AppResponse, AppResponseV0, NuriV0};
use ng_net::orm::{
self, BasicType, OrmSchema, OrmSchemaDataType, OrmSchemaLiteralType, OrmSchemaPredicate,
BasicType, OrmSchema, OrmSchemaDataType, OrmSchemaLiteralType, OrmSchemaPredicate,
OrmSchemaShape, OrmShapeType,
};
use ng_verifier::orm::utils::shape_type_to_sparql;
use ng_repo::{log_err, log_info};
use ng_repo::log_info;
use ng_verifier::orm::query::shape_type_to_sparql;
use serde_json::json;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
@ -1788,87 +1788,3 @@ fn create_big_schema() -> OrmSchema {
return schema;
}
async fn create_doc_with_data(session_id: u64, sparql_insert: String) -> String {
let doc_nuri = doc_create(
session_id,
"Graph".to_string(),
"test_orm_query".to_string(),
"store".to_string(),
None,
None,
)
.await
.expect("error creating doc");
// Insert data
doc_sparql_update(session_id, sparql_insert, Some(doc_nuri.clone()))
.await
.expect("SPARQL update failed");
return doc_nuri;
}
fn assert_json_eq(expected: &mut Value, actual: &mut Value) {
remove_id_fields(expected);
remove_id_fields(actual);
sort_arrays(expected);
sort_arrays(actual);
let diff = serde_json_diff::values(expected.clone(), actual.clone());
if let Some(diff_) = diff {
log_err!("Expected and actual ORM JSON mismatch.\nDiff: {:?}", diff_);
assert!(false);
}
}
/// Helper to recursively sort all arrays in nested objects into a stable ordering.
/// Arrays are sorted by their JSON string representation.
fn sort_arrays(value: &mut Value) {
match value {
Value::Object(map) => {
for v in map.values_mut() {
sort_arrays(v);
}
}
Value::Array(arr) => {
// First, recursively sort nested structures
for v in arr.iter_mut() {
sort_arrays(v);
}
// Then sort the array itself by JSON string representation
arr.sort_by(|a, b| {
let a_str = canonical_json::ser::to_string(a).unwrap_or_default();
let b_str = canonical_json::ser::to_string(b).unwrap_or_default();
a_str.cmp(&b_str)
});
}
_ => {}
}
}
/// Helper to recursively remove nested "id" fields from nested objects,
/// but only if they are not at the root level.
fn remove_id_fields(value: &mut Value) {
fn remove_id_fields_inner(value: &mut Value, is_root: bool) {
match value {
Value::Object(map) => {
if !is_root {
map.remove("id");
}
for v in map.values_mut() {
remove_id_fields_inner(v, false);
}
}
Value::Array(arr) => {
for v in arr {
remove_id_fields_inner(v, false);
}
}
_ => {}
}
}
remove_id_fields_inner(value, true);
}

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save