diff --git a/engine/verifier/src/orm/handle_backend_update.rs b/engine/verifier/src/orm/handle_backend_update.rs new file mode 100644 index 0000000..b8a2b0e --- /dev/null +++ b/engine/verifier/src/orm/handle_backend_update.rs @@ -0,0 +1,528 @@ +// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers +// All rights reserved. +// Licensed under the Apache License, Version 2.0 +// +// or the MIT license , +// at your option. All files in the project carrying such +// notice may not be copied, modified, or distributed except +// according to those terms. + +use std::collections::HashMap; +use std::u64; + +use futures::SinkExt; +pub use ng_net::orm::{OrmDiff, OrmShapeType}; +use ng_net::{app_protocol::*, orm::*}; +use ng_repo::log::*; + +use crate::orm::types::*; +use crate::orm::utils::*; +use crate::orm::OrmChanges; +use crate::types::*; +use crate::verifier::*; +use ng_net::types::OverlayLink; +use ng_oxigraph::oxrdf::Triple; +use ng_repo::types::OverlayId; +use ng_repo::types::RepoId; +use serde_json::json; +use serde_json::Value; +use std::collections::HashSet; +use std::sync::Arc; +use std::sync::RwLock; + +impl Verifier { + /// Generate and send JSON patches from GraphQuadsPatch (quad inserts and removes) to JS-land. + pub(crate) async fn orm_backend_update( + &mut self, + session_id: u64, + repo_id: RepoId, + overlay_id: OverlayId, + patch: GraphQuadsPatch, + ) { + let overlaylink: OverlayLink = overlay_id.into(); + + // We need to apply the patches to all subscriptions we have. We can use process_changes_for_* + // That updates the tracked subjects, validates them, and returns a set of changes structured + // by the respective schema. + + let triple_inserts: Vec = patch + .inserts + .iter() + .map(|quad| { + Triple::new( + quad.subject.clone(), + quad.predicate.clone(), + quad.object.clone(), + ) + }) + .collect(); + let triple_removes: Vec = patch + .removes + .iter() + .map(|quad| { + Triple::new( + quad.subject.clone(), + quad.predicate.clone(), + quad.object.clone(), + ) + }) + .collect(); + + // let mut updates = Vec::new(); + + let mut scopes = vec![]; + for (scope, subs) in self.orm_subscriptions.iter_mut() { + // Remove old subscriptions + subs.retain(|sub| !sub.sender.is_closed()); + + if !(scope.target == NuriTargetV0::UserSite + || scope + .overlay + .as_ref() + .map_or(false, |ol| overlaylink == *ol) + || scope.target == NuriTargetV0::Repo(repo_id)) + { + continue; + } + + // prepare to apply updates to tracked subjects and record the changes. + let root_shapes = subs + .iter() + .map(|sub| { + sub.shape_type + .schema + .get(&sub.shape_type.shape) + .unwrap() + .clone() + }) + .collect::>(); + + scopes.push((scope.clone(), root_shapes)); + } + + log_debug!( + "[orm_backend_update], creating patch objects for scopes:\n{}", + scopes.len() + ); + for (scope, shapes) in scopes { + let mut orm_changes: OrmChanges = HashMap::new(); + + // Apply the changes to tracked subjects. + for shape_arc in shapes { + let _ = self.process_changes_for_shape_and_session( + &scope, + shape_arc, + session_id, + &triple_inserts, + &triple_removes, + &mut orm_changes, + false, + ); + } + + let subs = self.orm_subscriptions.get(&scope).unwrap(); + for sub in subs.iter() { + log_debug!( + "Applying changes to subscription with nuri {} and shape {}", + sub.nuri.repo(), + sub.shape_type.shape + ); + + // The JSON patches to send to JS land. + let mut patches: Vec = vec![]; + + // Keep track of created objects by path and if they need an id. + // Later we created patches from them to ensure the objects exist. + let mut paths_of_objects_to_create: HashSet<(Vec, Option)> = + HashSet::new(); + + // We construct object patches from a change (which is associated with a shape type). {op: add, valType: object, value: Null, path: ...} + // For each change that has a subject tracked in this subscription, + // - Get change operation (calling diff_op_from_pred_change). + // - case not object, single --> either add or remove (must be one of each at max) + // - case not object, multi --> just add and or set patch + // - case object, multi --> create object patch + nested object patch (will be handled when recursing paths to add primitive values) + // - case object, single --> just object patch (will be handled when recursing paths to add primitive values) + // - Add patches for each change operation for the path of the change in the schema. + // We find the path by traversing the schema up to the parents (add_diff_ops_for_tracked_subject). + + // TODO: Special edge case: An object with parents changed and the parents' predicate schema has multiple allowed shapes. + // Now, there are multiple tracked subjects with the same subject IRI but different shapes of which some + // are valid or invalid. The first valid (subject, shape) pair must used for materialization. + // - if a higher-priority shape became invalid but a lower priority shape is valid, delete and new add. + // - if a higher-priority shape became valid, delete and add new valid. + // Problem: We might not have the triples present to materialize the newly valid object so we need to fetch them. + + // Process changes for this subscription + // Iterate over all changes and create patches + for (shape_iri, subject_changes) in &orm_changes { + for (subject_iri, change) in subject_changes { + // Get the tracked subject for this (subject, shape) pair + let tracked_subject = sub + .tracked_subjects + .get(subject_iri) + .and_then(|shapes| shapes.get(shape_iri)) + .map(|ts| ts.read().unwrap()) + .unwrap(); + + // Now we have the tracked predicate (containing the shape) and the change. + // Check validity changes + if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Invalid + && tracked_subject.valid == OrmTrackedSubjectValidity::Invalid + { + // Is the subject invalid and was it before? There is nothing we need to inform about. + continue; + } else if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Valid + && tracked_subject.valid == OrmTrackedSubjectValidity::Invalid + || tracked_subject.valid == OrmTrackedSubjectValidity::Untracked + { + // Has the subject become invalid or untracked? + // We add a patch, deleting the object at its root. + let mut path: Vec; + if tracked_subject.parents.is_empty() { + // If this is a root object, we need to add the object's id itself. + path = vec![tracked_subject.subject_iri.clone()]; + } else { + path = vec![]; + } + + build_path_to_root_and_create_patches( + &tracked_subject, + &sub.tracked_subjects, + &sub.shape_type.shape, + &mut path, + (OrmDiffOpType::remove, Some(OrmDiffType::object), None, None), + &mut patches, + &mut paths_of_objects_to_create, + ); + } else { + // The subject is valid or has become valid. + // Process each predicate change + for (_pred_iri, pred_change) in &change.predicates { + let tracked_predicate = + pred_change.tracked_predicate.read().unwrap(); + let pred_name = tracked_predicate.schema.readablePredicate.clone(); + + // Get the diff operations for this predicate change + let diff_ops = create_diff_ops_from_predicate_change(pred_change); + + // For each diff operation, traverse up to the root to build the path + for diff_op in diff_ops { + let mut path = vec![pred_name.clone()]; + + // Start recursion from this tracked subject + build_path_to_root_and_create_patches( + &tracked_subject, + &sub.tracked_subjects, + &sub.shape_type.shape, + &mut path, + diff_op, + &mut patches, + &mut paths_of_objects_to_create, + ); + } + } + } + } + } + + // Create patches for objects that need to be created + // These are patches with {op: add, valType: object, value: Null, path: ...} + // Sort by path length (shorter first) to ensure parent objects are created before children + let mut sorted_object_paths: Vec<_> = paths_of_objects_to_create.iter().collect(); + sorted_object_paths.sort_by_key(|(path_segments, _)| path_segments.len()); + + for (path_segments, maybe_iri) in sorted_object_paths { + let escaped_path: Vec = path_segments + .iter() + .map(|seg| escape_json_pointer(seg)) + .collect(); + let json_pointer = format!("/{}", escaped_path.join("/")); + + patches.push(OrmDiffOp { + op: OrmDiffOpType::add, + valType: Some(OrmDiffType::object), + path: json_pointer.clone(), + value: None, + }); + if let Some(iri) = maybe_iri { + patches.push(OrmDiffOp { + op: OrmDiffOpType::add, + valType: Some(OrmDiffType::object), + path: format!("{}/id", json_pointer), + value: Some(json!(iri)), + }); + } + } + + // Send response with patches. + let _ = sub + .sender + .clone() + .send(AppResponse::V0(AppResponseV0::OrmUpdate(patches.to_vec()))) + .await; + } + } + } +} + +/// Find the predicate schema linking a parent to a child tracked subject and build the path segment. +/// Returns the updated path if a linking predicate is found. +fn build_path_segment_for_parent( + tracked_subject: &OrmTrackedSubject, + parent_ts: &OrmTrackedSubject, + base_path: &[String], +) -> Option> { + // Find the predicate schema linking parent to this tracked subject + for pred_arc in &parent_ts.shape.predicates { + // Check if this predicate has our subject as a child + if let Some(tracked_pred) = parent_ts.tracked_predicates.get(&pred_arc.iri) { + let tp = tracked_pred.read().unwrap(); + + // Check if this tracked subject is in the children + let is_child = tp.tracked_children.iter().any(|child| { + let child_read = child.read().unwrap(); + child_read.subject_iri == tracked_subject.subject_iri + }); + + if is_child { + // Build the path segment + let mut new_path = base_path.to_vec(); + + let is_multi = pred_arc.maxCardinality > 1 || pred_arc.maxCardinality == -1; + + // For multi-valued predicates, add the object IRI as a key first + if is_multi { + new_path.insert(0, tracked_subject.subject_iri.clone()); + } + + // Add the readable predicate name + new_path.insert(0, pred_arc.readablePredicate.clone()); + + return Some(new_path); + } + } + } + None +} + +/// Recursively build the path from a tracked subject to the root and create diff operation patches. +/// The function recurses from child to parents down to a root tracked subject. +/// If multiple parents exist, it adds separate patches for each. +fn build_path_to_root_and_create_patches( + tracked_subject: &OrmTrackedSubject, + tracked_subjects: &HashMap>>>, + root_shape: &String, + path: &mut Vec, + diff_op: ( + OrmDiffOpType, + Option, + Option, // The value added / removed + Option, // The IRI, if change is an added / removed object. + ), + patches: &mut Vec, + paths_of_objects_to_create: &mut HashSet<(Vec, Option)>, +) { + // If the tracked subject is not valid, we don't create patches for it + if tracked_subject.valid != OrmTrackedSubjectValidity::Valid { + return; + } + + // If the tracked subject is newly valid (was not valid before but is now), + // we need to ensure the object is created with an "add object" patch + if tracked_subject.prev_valid != OrmTrackedSubjectValidity::Valid { + // Check if we're at a root subject or need to traverse to parents + if tracked_subject.parents.is_empty() || tracked_subject.shape.iri == *root_shape { + // At root: build the path with the subject IRI + let escaped_path: Vec = + path.iter().map(|seg| escape_json_pointer(seg)).collect(); + let json_pointer = format!( + "/{}/{}", + escape_json_pointer(&tracked_subject.subject_iri), + escaped_path.join("/") + ); + + // Create an "add object" patch to ensure the object exists + patches.push(OrmDiffOp { + op: OrmDiffOpType::add, + valType: Some(OrmDiffType::object), + path: json_pointer.clone(), + value: None, + }); + + // Also add the id field for the object + patches.push(OrmDiffOp { + op: OrmDiffOpType::add, + valType: None, + path: format!("{}/id", json_pointer), + value: Some(json!(tracked_subject.subject_iri)), + }); + } else { + // Not at root: traverse to parents and create object patches along the way + for (_parent_iri, parent_tracked_subject) in tracked_subject.parents.iter() { + let parent_ts = parent_tracked_subject.read().unwrap(); + + if let Some(new_path) = + build_path_segment_for_parent(tracked_subject, &parent_ts, path) + { + // Recurse to the parent first + build_path_to_root_and_create_patches( + &parent_ts, + tracked_subjects, + root_shape, + &mut new_path.clone(), + (OrmDiffOpType::add, Some(OrmDiffType::object), None, None), + patches, + paths_of_objects_to_create, + ); + } + } + } + } + + // If this subject has no parents or its shape matches the root shape, we've reached the root + if tracked_subject.parents.is_empty() || tracked_subject.shape.iri == *root_shape { + // Build the final JSON Pointer path + let escaped_path: Vec = path.iter().map(|seg| escape_json_pointer(seg)).collect(); + // Always add the root subject to the path. + let json_pointer = format!( + "/{}/{}", + escape_json_pointer(&tracked_subject.subject_iri), + escaped_path.join("/") + ); + + // Create the patch + patches.push(OrmDiffOp { + op: diff_op.0.clone(), + valType: diff_op.1.clone(), + path: json_pointer.clone(), + value: diff_op.2.clone(), + }); + + // // If a new object is created on a predicate where multiple ones are allowed, create IRI path too. + // if let Some(added_obj_iri) = diff_op.3 { + // patches.push(OrmDiffOp { + // op: diff_op.0.clone(), + // valType: diff_op.1.clone(), + // path: format!("{}/{}", json_pointer, escape_json_pointer(&added_obj_iri)), + // value: diff_op.2.clone(), + // }); + // } + + return; + } + + // Recurse to parents + for (_parent_iri, parent_tracked_subject) in tracked_subject.parents.iter() { + let parent_ts = parent_tracked_subject.read().unwrap(); + + // Build the path segment for this parent + if let Some(new_path) = build_path_segment_for_parent(tracked_subject, &parent_ts, path) { + // Recurse to the parent + build_path_to_root_and_create_patches( + &parent_ts, + tracked_subjects, + root_shape, + &mut new_path.clone(), + diff_op.clone(), + patches, + paths_of_objects_to_create, + ); + } + } +} + +/// Create diff operations from a predicate change. +/// Returns a list of (op_type, value_type, value, iri) tuples. +fn create_diff_ops_from_predicate_change( + pred_change: &OrmTrackedPredicateChanges, +) -> Vec<( + OrmDiffOpType, + Option, + Option, // The value added / removed + Option, // The IRI, if change is an added / removed object. +)> { + let tracked_predicate = pred_change.tracked_predicate.read().unwrap(); + + let is_multi = tracked_predicate.schema.maxCardinality > 1 + || tracked_predicate.schema.maxCardinality == -1; + let is_object = tracked_predicate + .schema + .dataTypes + .iter() + .any(|dt| dt.shape.is_some()); + + let mut ops = vec![]; + + if !is_multi && !is_object { + if pred_change.values_added.len() == 1 { + // A value was added. Another one might have been removed + // but the add patch overwrite previous values. + return [( + OrmDiffOpType::add, + None, + Some(json!(pred_change.values_added[0])), + None, + )] + .to_vec(); + } else { + // Since there is only one possible value, removing the path is enough. + return [(OrmDiffOpType::remove, None, None, None)].to_vec(); + } + } else if is_multi && !is_object { + if pred_change.values_added.len() > 0 { + ops.push(( + OrmDiffOpType::add, + Some(OrmDiffType::set), + Some(json!(pred_change.values_added)), + None, + )); + } + if pred_change.values_removed.len() > 0 { + ops.push(( + OrmDiffOpType::remove, + Some(OrmDiffType::set), + Some(json!(pred_change.values_removed)), + None, + )); + } + return ops; + } + // else if !is_multi && is_object { + // if pred_change.values_added.len() > 0 { + // ops.push((OrmDiffOpType::add, Some(OrmDiffType::object), None, None)); + // } else if pred_change.values_removed.len() > 0 { + // ops.push((OrmDiffOpType::remove, Some(OrmDiffType::object), None, None)); + // } + // } else if is_multi && is_object { + // for val_added in pred_change.values_added.iter() { + // let iri = match val_added { + // BasicType::Str(s) => s, + // _ => { + // continue; + // } + // }; + // ops.push(( + // OrmDiffOpType::add, + // Some(OrmDiffType::object), + // None, + // Some(iri.clone()), + // )); + // } + // for val_removed in pred_change.values_added.iter() { + // let iri = match val_removed { + // BasicType::Str(s) => s, + // _ => { + // continue; + // } + // }; + // ops.push(( + // OrmDiffOpType::remove, + // Some(OrmDiffType::object), + // None, + // Some(iri.clone()), + // )); + // } + // } + return ops; +} diff --git a/engine/verifier/src/orm/handle_frontend_update.rs b/engine/verifier/src/orm/handle_frontend_update.rs new file mode 100644 index 0000000..4d49be7 --- /dev/null +++ b/engine/verifier/src/orm/handle_frontend_update.rs @@ -0,0 +1,115 @@ +// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers +// All rights reserved. +// Licensed under the Apache License, Version 2.0 +// +// or the MIT license , +// at your option. All files in the project carrying such +// notice may not be copied, modified, or distributed except +// according to those terms. + +use ng_oxigraph::oxrdf::Quad; +use ng_repo::errors::VerifierError; + +use std::u64; + +use futures::SinkExt; +use ng_net::app_protocol::*; +pub use ng_net::orm::{OrmDiff, OrmShapeType}; +use ng_repo::log::*; + +use crate::orm::types::*; +use crate::verifier::*; + +impl Verifier { + /// After creating new objects (without an id) in JS-land, + /// we send the generated id for those back. + /// If something went wrong (revert_inserts / revert_removes not empty), + /// we send a JSON patch back to revert the made changes. + pub(crate) async fn orm_update_self( + &mut self, + scope: &NuriV0, + shape_iri: ShapeIri, + session_id: u64, + skolemnized_blank_nodes: Vec, + revert_inserts: Vec, + revert_removes: Vec, + ) -> Result<(), VerifierError> { + let (mut sender, orm_subscription) = + self.get_first_orm_subscription_sender_for(scope, Some(&shape_iri), Some(&session_id))?; + + // TODO prepare OrmUpdateBlankNodeIds with skolemnized_blank_nodes + // use orm_subscription if needed + // note(niko): I think skolemnized blank nodes can still be many, in case of multi-level nested sub-objects. + let orm_bnids = vec![]; + let _ = sender + .send(AppResponse::V0(AppResponseV0::OrmUpdateBlankNodeIds( + orm_bnids, + ))) + .await; + + // TODO (later) revert the inserts and removes + // let orm_diff = vec![]; + // let _ = sender.send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff))).await; + + Ok(()) + } + + /// Handles updates coming from JS-land (JSON patches). + pub(crate) async fn orm_frontend_update( + &mut self, + session_id: u64, + scope: &NuriV0, + shape_iri: ShapeIri, + diff: OrmDiff, + ) -> Result<(), String> { + log_info!( + "frontend_update_orm session={} scope={:?} shape={} diff={:?}", + session_id, + scope, + shape_iri, + diff + ); + + let (doc_nuri, sparql_update) = { + let orm_subscription = + self.get_first_orm_subscription_for(scope, Some(&shape_iri), Some(&session_id)); + + // use orm_subscription as needed + // do the magic, then, find the doc where the query should start and generate the sparql update + let doc_nuri = NuriV0::new_empty(); + let sparql_update: String = String::new(); + (doc_nuri, sparql_update) + }; + + match self + .process_sparql_update( + &doc_nuri, + &sparql_update, + &None, + self.get_peer_id_for_skolem(), + session_id, + ) + .await + { + Err(e) => Err(e), + Ok((_, revert_inserts, revert_removes, skolemnized_blank_nodes)) => { + if !revert_inserts.is_empty() + || !revert_removes.is_empty() + || !skolemnized_blank_nodes.is_empty() + { + self.orm_update_self( + scope, + shape_iri, + session_id, + skolemnized_blank_nodes, + revert_inserts, + revert_removes, + ) + .await + .map_err(|e| e.to_string())?; + } + Ok(()) + } + } + } +} diff --git a/engine/verifier/src/orm/materialize.rs b/engine/verifier/src/orm/materialize.rs new file mode 100644 index 0000000..00efe18 --- /dev/null +++ b/engine/verifier/src/orm/materialize.rs @@ -0,0 +1,269 @@ +// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers +// All rights reserved. +// Licensed under the Apache License, Version 2.0 +// +// or the MIT license , +// at your option. All files in the project carrying such +// notice may not be copied, modified, or distributed except +// according to those terms. + +use futures::SinkExt; +use ng_net::orm::*; +pub use ng_net::orm::{OrmDiff, OrmShapeType}; +use ng_net::utils::Receiver; +use serde_json::json; +use serde_json::Value; +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::RwLock; + +use crate::orm::query::shape_type_to_sparql; +use crate::orm::types::*; +use crate::orm::utils::nuri_to_string; +use crate::types::CancelFn; +use crate::verifier::Verifier; +use ng_net::app_protocol::{AppResponse, AppResponseV0, NuriV0}; +use ng_net::orm::OrmSchemaShape; +use ng_repo::errors::NgError; +use ng_repo::log::*; +use std::u64; + +use futures::channel::mpsc; + +use crate::orm::{types::OrmTrackedSubjectChange, OrmChanges}; + +impl Verifier { + /// Entry point to create a new orm subscription. + /// Triggers the creation of an orm object which is sent back to the receiver. + pub(crate) async fn start_orm( + &mut self, + nuri: &NuriV0, + shape_type: &OrmShapeType, + session_id: u64, + ) -> Result<(Receiver, CancelFn), NgError> { + let (mut tx, rx) = mpsc::unbounded::(); + + // TODO: Validate schema: + // If multiple data types are present for the same predicate, they must be of of the same type. + // All referenced shapes must be available. + + // Create new subscription and add to self.orm_subscriptions + let orm_subscription = OrmSubscription { + shape_type: shape_type.clone(), + session_id: session_id, + sender: tx.clone(), + tracked_subjects: HashMap::new(), + nuri: nuri.clone(), + }; + + self.orm_subscriptions + .entry(nuri.clone()) + .or_insert(vec![]) + .push(orm_subscription); + + let orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type)?; + // log_debug!("create_orm_object_for_shape return {:?}", orm_objects); + + let _ = tx + .send(AppResponse::V0(AppResponseV0::OrmInitial(orm_objects))) + .await; + + let close = Box::new(move || { + log_debug!("closing ORM subscription"); + if !tx.is_closed() { + tx.close_channel(); + } + }); + Ok((rx, close)) + } + + /// For a nuri, session, and shape, create an ORM JSON object. + fn create_orm_object_for_shape( + &mut self, + nuri: &NuriV0, + session_id: u64, + shape_type: &OrmShapeType, + ) -> Result { + // Query triples for this shape + let shape_query = shape_type_to_sparql(&shape_type.schema, &shape_type.shape, None)?; + let shape_triples = self.query_sparql_construct(shape_query, Some(nuri_to_string(nuri)))?; + + let changes: OrmChanges = + self.apply_triple_changes(&shape_triples, &[], nuri, Some(session_id.clone()), true)?; + + let orm_subscription = + self.get_first_orm_subscription_for(nuri, Some(&shape_type.shape), Some(&session_id)); + + let schema: &HashMap> = &orm_subscription.shape_type.schema; + let root_shape = schema.get(&shape_type.shape).unwrap(); + let Some(_root_changes) = changes.get(&root_shape.iri).map(|s| s.values()) else { + return Ok(Value::Array(vec![])); + }; + + let mut return_vals: Value = Value::Array(vec![]); + let return_val_vec = return_vals.as_array_mut().unwrap(); + + // log_debug!( + // "Tracked subjects:\n{:?}\n", + // orm_subscription.tracked_subjects, + // ); + // For each valid change struct, we build an orm object. + // The way we get the changes from the tracked subjects is a bit hacky, sorry. + for (subject_iri, tracked_subjects_by_shape) in &orm_subscription.tracked_subjects { + if let Some(tracked_subject) = tracked_subjects_by_shape.get(&shape_type.shape) { + let ts = tracked_subject.read().unwrap(); + log_info!("changes for: {:?} valid: {:?}\n", ts.subject_iri, ts.valid); + + if ts.valid == OrmTrackedSubjectValidity::Valid { + if let Some(change) = changes + .get(&shape_type.shape) + .and_then(|subject_iri_to_ts| subject_iri_to_ts.get(subject_iri).clone()) + { + let new_val = materialize_orm_object( + change, + &changes, + root_shape, + &orm_subscription.tracked_subjects, + ); + // TODO: For some reason, this log statement causes a panic. + // log_debug!("Materialized change:\n{:?}\ninto:\n{:?}", change, new_val); + return_val_vec.push(new_val); + } + } + } + } + + return Ok(return_vals); + } +} + +/// Create ORM JSON object from OrmTrackedSubjectChange and shape. +pub(crate) fn materialize_orm_object( + change: &OrmTrackedSubjectChange, + changes: &OrmChanges, + shape: &OrmSchemaShape, + tracked_subjects: &HashMap>>>, +) -> Value { + let mut orm_obj = json!({"id": change.subject_iri}); + let orm_obj_map = orm_obj.as_object_mut().unwrap(); + for pred_schema in &shape.predicates { + let property_name = &pred_schema.readablePredicate; + let is_multi = pred_schema.maxCardinality > 1 || pred_schema.maxCardinality == -1; + + let Some(pred_change) = change.predicates.get(&pred_schema.iri) else { + // No triples for this property. + + if pred_schema.minCardinality == 0 && is_multi { + // If this predicate schema is an array though, insert empty array. + orm_obj_map.insert(property_name.clone(), Value::Array(vec![])); + } + + continue; + }; + + if pred_schema + .dataTypes + .iter() + .any(|dt| dt.valType == OrmSchemaLiteralType::shape) + { + // We have a nested type. + + // Helper closure to create Value structs from a nested object_iri. + let get_nested_orm_obj = |object_iri: &SubjectIri| { + // Find allowed schemas for the predicate's datatype. + let shape_iris: Vec = pred_schema + .dataTypes + .iter() + .flat_map(|dt| dt.shape.clone()) + .collect(); + + // Find subject_change for this subject. There exists at least one (shape, subject) pair. + // If multiple allowed shapes exist, the first one is chosen. + let nested = shape_iris.iter().find_map(|shape_iri| { + changes + .get(shape_iri) + .and_then(|subject_changes| subject_changes.get(object_iri)) + .map(|ch| (shape_iri, ch)) + }); + + if let Some((matched_shape_iri, nested_subject_change)) = nested { + if let Some(nested_tracked_subject) = tracked_subjects + .get(&nested_subject_change.subject_iri) + .and_then(|shape_to_tracked_orm| { + shape_to_tracked_orm.get(matched_shape_iri) + }) + { + let nested_tracked_subject = nested_tracked_subject.read().unwrap(); + if nested_tracked_subject.valid == OrmTrackedSubjectValidity::Valid { + // Recurse + return Some(materialize_orm_object( + nested_subject_change, + changes, + &nested_tracked_subject.shape, + tracked_subjects, + )); + } + } + } + None + }; + + if is_multi { + // Represent nested objects with more than one child + // as a map/object of -> nested object, + // since there is no conceptual ordering of the children. + let mut nested_objects_map = serde_json::Map::new(); + + // Add each nested objects. + for new_val in &pred_change.values_added { + if let BasicType::Str(object_iri) = new_val { + if let Some(nested_orm_obj) = get_nested_orm_obj(object_iri) { + nested_objects_map.insert(object_iri.clone(), nested_orm_obj); + } + } + } + orm_obj_map.insert(property_name.clone(), Value::Object(nested_objects_map)); + } else { + if let Some(BasicType::Str(object_iri)) = pred_change.values_added.get(0) { + if let Some(nested_orm_obj) = get_nested_orm_obj(object_iri) { + orm_obj_map.insert(property_name.clone(), nested_orm_obj); + } + } + } + } else { + // We have a basic type (string, number, bool, literal). + + if is_multi { + // Add values as array. + orm_obj_map.insert( + property_name.clone(), + Value::Array( + pred_change + .values_added + .iter() + .map(|v| match v { + BasicType::Bool(b) => json!(*b), + BasicType::Num(n) => json!(*n), + BasicType::Str(s) => json!(s), + }) + .collect(), + ), + ); + } else { + // Add value as primitive, if present. + if let Some(val) = pred_change.values_added.get(0) { + orm_obj_map.insert( + property_name.clone(), + match val { + BasicType::Bool(b) => json!(*b), + BasicType::Num(n) => json!(*n), + BasicType::Str(s) => json!(s), + }, + ); + } + } + } + } + + return orm_obj; +} diff --git a/engine/verifier/src/orm/mod.rs b/engine/verifier/src/orm/mod.rs index 910e85b..9e921ec 100644 --- a/engine/verifier/src/orm/mod.rs +++ b/engine/verifier/src/orm/mod.rs @@ -8,1196 +8,28 @@ // according to those terms. pub mod add_remove_triples; +pub mod handle_backend_update; +pub mod handle_frontend_update; +pub mod materialize; +pub mod process_changes; +pub mod query; +pub mod shape_validation; pub mod types; pub mod utils; -pub mod validation; -use futures::channel::mpsc; -use futures::channel::mpsc::UnboundedSender; -use ng_net::types::OverlayLink; -use ng_oxigraph::oxrdf::Quad; -use ng_repo::errors::VerifierError; -use ng_repo::types::OverlayId; -use ng_repo::types::RepoId; - -use std::collections::HashMap; -use std::collections::HashSet; -use std::sync::Arc; -use std::sync::RwLock; -use std::u64; - -use futures::SinkExt; pub use ng_net::orm::{OrmDiff, OrmShapeType}; -use ng_net::utils::Receiver; -use ng_net::{app_protocol::*, orm::*}; -use ng_oxigraph::oxigraph::sparql::{Query, QueryResults}; -use ng_oxigraph::oxrdf::Triple; -use ng_repo::errors::NgError; -use ng_repo::log::*; -use serde_json::json; -use serde_json::Value; +use std::collections::HashMap; -use crate::orm::add_remove_triples::add_remove_triples; use crate::orm::types::*; -use crate::orm::utils::*; -use crate::types::*; use crate::verifier::*; -// Structure to store changes in. By shape iri > subject iri > OrmTrackedSubjectChange -// **NOTE**: In comparison to OrmSubscription.tracked_subjects, the outer hashmap's keys are shape IRIs. -// (shape IRI -> (subject IRI -> OrmTrackedSubjectChange)) -type OrmChanges = HashMap>; - impl Verifier { - pub fn query_sparql_construct( - &self, - query: String, - nuri: Option, - ) -> Result, NgError> { - let oxistore = self.graph_dataset.as_ref().unwrap(); - - // let graph_nuri = NuriV0::repo_graph_name( - // &update.repo_id, - // &update.overlay_id, - // ); - //let base = NuriV0::repo_id(&repo.id); - - let nuri_str = nuri.as_ref().map(|s| s.as_str()); - log_debug!("querying construct\n{}\n{}\n", nuri_str.unwrap(), query); - - let parsed = - Query::parse(&query, nuri_str).map_err(|e| NgError::OxiGraphError(e.to_string()))?; - let results = oxistore - .query(parsed, nuri) - .map_err(|e| NgError::OxiGraphError(e.to_string()))?; - match results { - QueryResults::Graph(triples) => { - let mut result_triples: Vec = vec![]; - for t in triples { - match t { - Err(e) => { - log_err!("{}", e.to_string()); - return Err(NgError::SparqlError(e.to_string())); - } - Ok(triple) => { - log_debug!("Triple fetched: {:?}", triple); - result_triples.push(triple); - } - } - } - Ok(result_triples) - } - _ => return Err(NgError::InvalidResponse), - } - } - - /// Helper to call process_changes_for_shape for all subscriptions on nuri's document. - fn process_changes_for_nuri_and_session( - self: &mut Self, - nuri: &NuriV0, - session_id: u64, - triples_added: &[Triple], - triples_removed: &[Triple], - data_already_fetched: bool, - ) -> Result { - let mut orm_changes = HashMap::new(); - - let shapes: Vec<_> = self - .orm_subscriptions - .get(nuri) - .unwrap() - .iter() - .map(|sub| { - sub.shape_type - .schema - .get(&sub.shape_type.shape) - .unwrap() - .clone() - }) - .collect(); - - for root_shape in shapes { - self.process_changes_for_shape_and_session( - nuri, - root_shape, - session_id, - triples_added, - triples_removed, - &mut orm_changes, - data_already_fetched, - )?; - } - - Ok(orm_changes) - } - - /// Add and remove the triples from the tracked subjects, - /// re-validate, and update `changes` containing the updated data. - /// Works by queuing changes by shape and subjects on a stack. - /// Nested objects are added to the stack - fn process_changes_for_shape_and_session( - self: &mut Self, - nuri: &NuriV0, - root_shape: Arc, - session_id: u64, - triples_added: &[Triple], - triples_removed: &[Triple], - orm_changes: &mut OrmChanges, - data_already_fetched: bool, - ) -> Result<(), NgError> { - // First in, last out stack to keep track of objects to validate (nested objects first). Strings are object IRIs. - let mut shape_validation_stack: Vec<(Arc, Vec)> = vec![]; - // Track (shape_iri, subject_iri) pairs currently being validated to prevent cycles and double evaluation. - let mut currently_validating: HashSet<(String, String)> = HashSet::new(); - // Add root shape for first validation run. - let root_shape_iri = root_shape.iri.clone(); - shape_validation_stack.push((root_shape, vec![])); - - // Process queue of shapes and subjects to validate. - // For a given shape, we evaluate every subject against that shape. - while let Some((shape, objects_to_validate)) = shape_validation_stack.pop() { - // Collect triples relevant for validation. - let added_triples_by_subject = - group_by_subject_for_shape(&shape, triples_added, &objects_to_validate); - let removed_triples_by_subject = - group_by_subject_for_shape(&shape, triples_removed, &objects_to_validate); - let modified_subject_iris: HashSet<&SubjectIri> = added_triples_by_subject - .keys() - .chain(removed_triples_by_subject.keys()) - .collect(); - - let mut orm_subscription = self - .orm_subscriptions - .get_mut(nuri) - .unwrap() - .iter_mut() - .find(|sub| sub.session_id == session_id && sub.shape_type.shape == root_shape_iri) - .unwrap(); - - // Variable to collect nested objects that need validation. - let mut nested_objects_to_eval: HashMap> = - HashMap::new(); - - // For each subject, add/remove triples and validate. - log_debug!( - "processing modified subjects: {:?} against shape: {}", - modified_subject_iris, - shape.iri - ); - - for subject_iri in &modified_subject_iris { - let validation_key = (shape.iri.clone(), subject_iri.to_string()); - - // Cycle detection: Check if this (shape, subject) pair is already being validated - if currently_validating.contains(&validation_key) { - log_warn!( - "Cycle detected: subject '{}' with shape '{}' is already being validated. Marking as invalid.", - subject_iri, - shape.iri - ); - // Mark as invalid due to cycle - // TODO: We could handle this by handling nested references as IRIs. - if let Some(tracked_shapes) = - orm_subscription.tracked_subjects.get(*subject_iri) - { - if let Some(tracked_subject) = tracked_shapes.get(&shape.iri) { - let mut ts = tracked_subject.write().unwrap(); - ts.valid = OrmTrackedSubjectValidity::Invalid; - ts.tracked_predicates.clear(); - } - } - continue; - } - - // Mark as currently validating - currently_validating.insert(validation_key.clone()); - - // Get triples of subject (added & removed). - let triples_added_for_subj = added_triples_by_subject - .get(*subject_iri) - .map(|v| v.as_slice()) - .unwrap_or(&[]); - let triples_removed_for_subj = removed_triples_by_subject - .get(*subject_iri) - .map(|v| v.as_slice()) - .unwrap_or(&[]); - - // Get or create change object for (shape, subject) pair. - let change = orm_changes - .entry(shape.iri.clone()) - .or_insert_with(HashMap::new) - .entry((*subject_iri).clone()) - .or_insert_with(|| OrmTrackedSubjectChange { - subject_iri: (*subject_iri).clone(), - predicates: HashMap::new(), - data_applied: false, - }); - - // Apply all triples for that subject to the tracked (shape, subject) pair. - // Record the changes. - { - if !change.data_applied { - log_debug!( - "Adding triples to change tracker for subject {}", - subject_iri - ); - if let Err(e) = add_remove_triples( - shape.clone(), - subject_iri, - triples_added_for_subj, - triples_removed_for_subj, - &mut orm_subscription, - change, - ) { - log_err!("apply_changes_from_triples add/remove error: {:?}", e); - panic!(); - } - change.data_applied = true; - } else { - log_debug!("not applying triples again for subject {subject_iri}"); - } - - // Validate the subject. - let need_eval = - Self::update_subject_validity(change, &shape, &mut orm_subscription); - - // We add the need_eval to be processed next after loop. - // Filter out subjects already in the validation stack to prevent double evaluation. - for (iri, schema_shape, needs_refetch) in need_eval { - let eval_key = (schema_shape.clone(), iri.clone()); - if !currently_validating.contains(&eval_key) { - // Only add if not currently being validated - nested_objects_to_eval - .entry(schema_shape) - .or_insert_with(Vec::new) - .push((iri.clone(), needs_refetch)); - } - } - } - } - - // Now, we queue all non-evaluated objects - for (shape_iri, objects_to_eval) in &nested_objects_to_eval { - let orm_subscription = self.get_first_orm_subscription_for( - nuri, - Some(&root_shape_iri), - Some(&session_id), - ); - // Extract schema and shape Arc before mutable borrow - let schema = orm_subscription.shape_type.schema.clone(); - let shape_arc = schema.get(shape_iri).unwrap().clone(); - - // Data might need to be fetched (if it has not been during initialization or nested shape fetch). - if !data_already_fetched { - let objects_to_fetch = objects_to_eval - .iter() - .filter(|(_iri, needs_fetch)| *needs_fetch) - .map(|(s, _)| s.clone()) - .collect(); - - // Create sparql query - let shape_query = - shape_type_to_sparql(&schema, &shape_iri, Some(objects_to_fetch))?; - let new_triples = - self.query_sparql_construct(shape_query, Some(nuri_to_string(nuri)))?; - - // Recursively process nested objects. - self.process_changes_for_shape_and_session( - nuri, - shape_arc.clone(), - session_id, - &new_triples, - &vec![], - orm_changes, - true, - )?; - } - - // Add objects - let objects_not_to_fetch: Vec = objects_to_eval - .iter() - .filter(|(_iri, needs_fetch)| !*needs_fetch) - .map(|(s, _)| s.clone()) - .collect(); - if objects_not_to_fetch.len() > 0 { - // Queue all objects that don't need fetching. - shape_validation_stack.push((shape_arc, objects_not_to_fetch)); - } - } - for subject_iri in modified_subject_iris { - let validation_key = (shape.iri.clone(), subject_iri.to_string()); - currently_validating.remove(&validation_key); - } - } - - Ok(()) - } - - /// Helper to get orm subscriptions for nuri, shapes and sessions. - pub fn get_orm_subscriptions_for( - &self, - nuri: &NuriV0, - shape: Option<&ShapeIri>, - session_id: Option<&u64>, - ) -> Vec<&OrmSubscription> { - self.orm_subscriptions.get(nuri).unwrap(). - // Filter shapes, if present. - iter().filter(|s| match shape { - Some(sh) => *sh == s.shape_type.shape, - None => true - // Filter session ids if present. - }).filter(|s| match session_id { - Some(id) => *id == s.session_id, - None => true - }).collect() - } - - pub fn get_first_orm_subscription_for( - &self, - nuri: &NuriV0, - shape: Option<&ShapeIri>, - session_id: Option<&u64>, - ) -> &OrmSubscription { - self.orm_subscriptions.get(nuri).unwrap(). - // Filter shapes, if present. - iter().filter(|s| match shape { - Some(sh) => *sh == s.shape_type.shape, - None => true - // Filter session ids if present. - }).filter(|s| match session_id { - Some(id) => *id == s.session_id, - None => true - }).next().unwrap() - } - - pub fn get_first_orm_subscription_sender_for( - &mut self, - nuri: &NuriV0, - shape: Option<&ShapeIri>, - session_id: Option<&u64>, - ) -> Result<(UnboundedSender, &OrmSubscription), VerifierError> { - let subs = self.orm_subscriptions.get_mut(nuri).unwrap(); - subs.retain(|sub| !sub.sender.is_closed()); - match subs // Filter shapes, if present. - .iter() - .filter(|s| match shape { - Some(sh) => *sh == s.shape_type.shape, - None => true, // Filter session ids if present. - }) - .filter(|s| match session_id { - Some(id) => *id == s.session_id, - None => true, - }) - .next() - { - None => Err(VerifierError::OrmSubscriptionNotFound), - Some(subscription) => Ok((subscription.sender.clone(), subscription)), - } - } - - /// Apply triples to a nuri's document. - /// Updates tracked_subjects in orm_subscriptions. - fn apply_triple_changes( - &mut self, - triples_added: &[Triple], - triples_removed: &[Triple], - nuri: &NuriV0, - only_for_session_id: Option, - data_already_fetched: bool, - ) -> Result { - log_debug!("apply_triple_changes {:?}", only_for_session_id); - // If we have a specific session, handle only that subscription. - if let Some(session_id) = only_for_session_id { - return self.process_changes_for_nuri_and_session( - &nuri.clone(), - session_id, - triples_added, - triples_removed, - data_already_fetched, - ); - } - - // Otherwise, iterate all sessions. - let mut merged: OrmChanges = HashMap::new(); - - let session_ids: Vec<_> = self - .orm_subscriptions - .get(nuri) - .unwrap() - .iter() - .map(|s| s.session_id.clone()) - .collect(); - - for session_id in session_ids { - let changes = self.process_changes_for_nuri_and_session( - &nuri, - session_id, - triples_added, - triples_removed, - data_already_fetched, - )?; - - for (shape_iri, subj_map) in changes { - merged - .entry(shape_iri) - .or_insert_with(HashMap::new) - .extend(subj_map); - } - } - Ok(merged) - } - - /// Create ORM JSON object from OrmTrackedSubjectChange and shape. - fn materialize_orm_object( - change: &OrmTrackedSubjectChange, - changes: &OrmChanges, - shape: &OrmSchemaShape, - tracked_subjects: &HashMap>>>, - ) -> Value { - let mut orm_obj = json!({"id": change.subject_iri}); - let orm_obj_map = orm_obj.as_object_mut().unwrap(); - for pred_schema in &shape.predicates { - let property_name = &pred_schema.readablePredicate; - let is_multi = pred_schema.maxCardinality > 1 || pred_schema.maxCardinality == -1; - - let Some(pred_change) = change.predicates.get(&pred_schema.iri) else { - // No triples for this property. - - if pred_schema.minCardinality == 0 && is_multi { - // If this predicate schema is an array though, insert empty array. - orm_obj_map.insert(property_name.clone(), Value::Array(vec![])); - } - - continue; - }; - - if pred_schema - .dataTypes - .iter() - .any(|dt| dt.valType == OrmSchemaLiteralType::shape) - { - // We have a nested type. - - // Helper closure to create Value structs from a nested object_iri. - let get_nested_orm_obj = |object_iri: &SubjectIri| { - // Find allowed schemas for the predicate's datatype. - let shape_iris: Vec = pred_schema - .dataTypes - .iter() - .flat_map(|dt| dt.shape.clone()) - .collect(); - - // Find subject_change for this subject. There exists at least one (shape, subject) pair. - // If multiple allowed shapes exist, the first one is chosen. - let nested = shape_iris.iter().find_map(|shape_iri| { - changes - .get(shape_iri) - .and_then(|subject_changes| subject_changes.get(object_iri)) - .map(|ch| (shape_iri, ch)) - }); - - if let Some((matched_shape_iri, nested_subject_change)) = nested { - if let Some(nested_tracked_subject) = tracked_subjects - .get(&nested_subject_change.subject_iri) - .and_then(|shape_to_tracked_orm| { - shape_to_tracked_orm.get(matched_shape_iri) - }) - { - let nested_tracked_subject = nested_tracked_subject.read().unwrap(); - if nested_tracked_subject.valid == OrmTrackedSubjectValidity::Valid { - // Recurse - return Some(Self::materialize_orm_object( - nested_subject_change, - changes, - &nested_tracked_subject.shape, - tracked_subjects, - )); - } - } - } - None - }; - - if is_multi { - // Represent nested objects with more than one child - // as a map/object of -> nested object, - // since there is no conceptual ordering of the children. - let mut nested_objects_map = serde_json::Map::new(); - - // Add each nested objects. - for new_val in &pred_change.values_added { - if let BasicType::Str(object_iri) = new_val { - if let Some(nested_orm_obj) = get_nested_orm_obj(object_iri) { - nested_objects_map.insert(object_iri.clone(), nested_orm_obj); - } - } - } - orm_obj_map.insert(property_name.clone(), Value::Object(nested_objects_map)); - } else { - if let Some(BasicType::Str(object_iri)) = pred_change.values_added.get(0) { - if let Some(nested_orm_obj) = get_nested_orm_obj(object_iri) { - orm_obj_map.insert(property_name.clone(), nested_orm_obj); - } - } - } - } else { - // We have a basic type (string, number, bool, literal). - - if is_multi { - // Add values as array. - orm_obj_map.insert( - property_name.clone(), - Value::Array( - pred_change - .values_added - .iter() - .map(|v| match v { - BasicType::Bool(b) => json!(*b), - BasicType::Num(n) => json!(*n), - BasicType::Str(s) => json!(s), - }) - .collect(), - ), - ); - } else { - // Add value as primitive, if present. - if let Some(val) = pred_change.values_added.get(0) { - orm_obj_map.insert( - property_name.clone(), - match val { - BasicType::Bool(b) => json!(*b), - BasicType::Num(n) => json!(*n), - BasicType::Str(s) => json!(s), - }, - ); - } - } - } - } - - return orm_obj; - } - - /// For a nuri, session, and shape, create an ORM JSON object. - fn create_orm_object_for_shape( - &mut self, - nuri: &NuriV0, - session_id: u64, - shape_type: &OrmShapeType, - ) -> Result { - // Query triples for this shape - let shape_query = shape_type_to_sparql(&shape_type.schema, &shape_type.shape, None)?; - let shape_triples = self.query_sparql_construct(shape_query, Some(nuri_to_string(nuri)))?; - - let changes: OrmChanges = - self.apply_triple_changes(&shape_triples, &[], nuri, Some(session_id.clone()), true)?; - - let orm_subscription = - self.get_first_orm_subscription_for(nuri, Some(&shape_type.shape), Some(&session_id)); - - let schema: &HashMap> = &orm_subscription.shape_type.schema; - let root_shape = schema.get(&shape_type.shape).unwrap(); - let Some(_root_changes) = changes.get(&root_shape.iri).map(|s| s.values()) else { - return Ok(Value::Array(vec![])); - }; - - let mut return_vals: Value = Value::Array(vec![]); - let return_val_vec = return_vals.as_array_mut().unwrap(); - - // log_debug!( - // "Tracked subjects:\n{:?}\n", - // orm_subscription.tracked_subjects, - // ); - // For each valid change struct, we build an orm object. - // The way we get the changes from the tracked subjects is a bit hacky, sorry. - for (subject_iri, tracked_subjects_by_shape) in &orm_subscription.tracked_subjects { - if let Some(tracked_subject) = tracked_subjects_by_shape.get(&shape_type.shape) { - let ts = tracked_subject.read().unwrap(); - log_info!("changes for: {:?} valid: {:?}\n", ts.subject_iri, ts.valid); - - if ts.valid == OrmTrackedSubjectValidity::Valid { - if let Some(change) = changes - .get(&shape_type.shape) - .and_then(|subject_iri_to_ts| subject_iri_to_ts.get(subject_iri).clone()) - { - let new_val = Self::materialize_orm_object( - change, - &changes, - root_shape, - &orm_subscription.tracked_subjects, - ); - // TODO: For some reason, this log statement causes a panic. - // log_debug!("Materialized change:\n{:?}\ninto:\n{:?}", change, new_val); - return_val_vec.push(new_val); - } - } - } - } - - return Ok(return_vals); - } - - /// Generate and send JSON patches from GraphQuadsPatch (quad inserts and removes) to JS-land. - pub(crate) async fn orm_backend_update( - &mut self, - session_id: u64, - repo_id: RepoId, - overlay_id: OverlayId, - patch: GraphQuadsPatch, - ) { - let overlaylink: OverlayLink = overlay_id.into(); - - // We need to apply the patches to all subscriptions we have. We can use process_changes_for_* - // That updates the tracked subjects, validates them, and returns a set of changes structured - // by the respective schema. - - let triple_inserts: Vec = patch - .inserts - .iter() - .map(|quad| { - Triple::new( - quad.subject.clone(), - quad.predicate.clone(), - quad.object.clone(), - ) - }) - .collect(); - let triple_removes: Vec = patch - .removes - .iter() - .map(|quad| { - Triple::new( - quad.subject.clone(), - quad.predicate.clone(), - quad.object.clone(), - ) - }) - .collect(); - - // let mut updates = Vec::new(); - - let mut scopes = vec![]; - for (scope, subs) in self.orm_subscriptions.iter_mut() { - // Remove old subscriptions - subs.retain(|sub| !sub.sender.is_closed()); - - if scope.target == NuriTargetV0::UserSite - || scope - .overlay - .as_ref() - .map_or(false, |ol| overlaylink == *ol) - || scope.target == NuriTargetV0::Repo(repo_id) - { - continue; - } - - // prepare to apply updates to tracked subjects and record the changes. - let shapes = subs - .iter() - .map(|sub| { - sub.shape_type - .schema - .get(&sub.shape_type.shape) - .unwrap() - .clone() - }) - .collect::>(); - - scopes.push((scope.clone(), shapes)); - } - - for (scope, shapes) in scopes { - let mut orm_changes: OrmChanges = HashMap::new(); - - // actually applying updates to tracked subjects and record the changes. - for shape_arc in shapes { - let _ = self.process_changes_for_shape_and_session( - &scope, - shape_arc, - session_id, - &triple_inserts, - &triple_removes, - &mut orm_changes, - false, - ); - } - - let subs = self.orm_subscriptions.get(&scope).unwrap(); - for sub in subs.iter() { - // TODO: This if-condition is wrong (intended to not re-apply changes coming from the same subscription). - if sub.session_id == session_id { - continue; - } - // Create diff from changes & subscription. - - fn create_patches_for_nested_object( - pred_shape: &OrmSchemaPredicate, - tracked_subjects: &HashMap< - String, - HashMap>>, - >, - patches: &mut Vec, - path: &mut Vec, - object_iri: &String, - orm_changes: &OrmChanges, - sub: &OrmSubscription, - ) { - // Object was added. That means, we need to add a basic object with no value, - // Then add further predicates to it in a recursive call. - patches.push(OrmDiffOp { - op: OrmDiffOpType::add, - valType: Some(OrmDiffType::object), - path: format!("/{}", path.join("/")), - value: None, - }); - - // Get the shape IRI for a nested object that is valid. - let object_shape_iri = { - // Get the tracked subject for this object IRI - let tracked_subjects_for_obj = tracked_subjects - .get(object_iri) - .expect("Object should be tracked"); - - // Find the first valid shape for this object from the allowed shapes - let allowed_shape_iris: Vec<&String> = pred_shape - .dataTypes - .iter() - .filter_map(|dt| dt.shape.as_ref()) - .collect(); - - allowed_shape_iris - .iter() - .find(|shape_iri| { - tracked_subjects_for_obj - .get(**shape_iri) - .map(|ts| { - ts.read().unwrap().valid == OrmTrackedSubjectValidity::Valid - }) - .unwrap_or(false) - }) - .unwrap() - .to_string() - }; - - // Apply changes for nested object. - create_patches_for_changed_subj( - orm_changes, - patches, - &object_shape_iri, - &object_iri, - sub, - path, - tracked_subjects, - ); - } - - fn create_patches_for_changed_subj( - orm_changes: &OrmChanges, - patches: &mut OrmDiff, - shape_iri: &String, - subject_iri: &String, - sub: &OrmSubscription, - path: &mut Vec, - tracked_subjects: &HashMap< - SubjectIri, - HashMap>>, - >, - ) { - let change = orm_changes - .get(shape_iri) - .unwrap() - .get(subject_iri) - .unwrap(); - let subject_shape = sub.shape_type.schema.get(shape_iri).unwrap(); - - // @Niko, is it safe to do this? - let tracked_subject = tracked_subjects - .get(subject_iri) - .unwrap() - .get(shape_iri) - .unwrap() - .read() - .unwrap(); - - // Check validity changes - if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Invalid - && tracked_subject.valid == OrmTrackedSubjectValidity::Invalid - { - // Is the subject invalid and was it before? There is nothing we need to inform about. - return; - } else if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Valid - && tracked_subject.valid == OrmTrackedSubjectValidity::Invalid - || tracked_subject.valid == OrmTrackedSubjectValidity::Untracked - { - // Has the subject become invalid or untracked? - // We add a patch, deleting the object at its root. - patches.push(OrmDiffOp { - op: OrmDiffOpType::remove, - valType: Some(OrmDiffType::object), - path: format!("/{}", path.join("/")), - value: None, - }); - return; - } else { - // The subject is valid or has become valid. - // In both cases, all information necessary to send patches are available in the orm_changes. - - // In case the subject was not valid before, we create the object at the current path as an empty object though. - if tracked_subject.prev_valid != OrmTrackedSubjectValidity::Valid { - patches.push(OrmDiffOp { - op: OrmDiffOpType::add, - valType: Some(OrmDiffType::object), - path: format!("/{}", path.join("/")), - value: None, - }); - // And add the id field. - patches.push(OrmDiffOp { - op: OrmDiffOpType::add, - valType: None, - path: format!("/{}/{}", path.join("/"), subject_iri), - value: None, - }); - } - } - - // Iterate over every predicate change and create patches - for (pred_iri, pred_change) in change.predicates.iter() { - let pred_shape = subject_shape - .predicates - .iter() - .find(|p| p.iri == *pred_iri) - .unwrap(); - - let is_multi = - pred_shape.maxCardinality > 1 || pred_shape.maxCardinality == -1; - let is_object = pred_shape.dataTypes.iter().any(|dt| !dt.shape.is_none()); - let pred_name = pred_shape.readablePredicate.clone(); - path.push(pred_name); - let path_str = format!("/{}", path.join("/")); - - // Depending on the predicate type (multi / single, object / not object), - // add the respective diff operation. - - // Single primitive value - if !is_multi && !is_object { - if pred_change.values_added.len() > 0 { - patches.push(OrmDiffOp { - op: OrmDiffOpType::add, - valType: None, - path: path_str.clone(), - value: Some(json!(pred_change.values_added[0])), - }); - } - if pred_change.values_removed.len() > 0 { - patches.push(OrmDiffOp { - op: OrmDiffOpType::remove, - valType: None, - path: path_str, - value: Some(json!(pred_change.values_added[0])), - }); - } - } else if is_multi && !is_object { - // Set of primitive values - if pred_change.values_added.len() > 0 { - patches.push(OrmDiffOp { - op: OrmDiffOpType::add, - valType: Some(OrmDiffType::set), - path: path_str.clone(), - value: Some(json!(pred_change.values_added)), - }); - } - if pred_change.values_removed.len() > 0 { - patches.push(OrmDiffOp { - op: OrmDiffOpType::remove, - valType: Some(OrmDiffType::set), - path: path_str, - value: Some(json!(pred_change.values_removed)), - }); - } - } else if is_object { - // Change in single object property. - if !is_multi { - let object_iri = match &pred_change.values_added[0] { - BasicType::Str(iri) => iri, - _ => panic!("Object no IRI"), - }; - // Single object. - if pred_change.values_added.len() > 0 { - create_patches_for_nested_object( - pred_shape, - tracked_subjects, - patches, - path, - object_iri, - orm_changes, - sub, - ); - } - if pred_change.values_removed.len() > 0 { - // Object is removed. - patches.push(OrmDiffOp { - op: OrmDiffOpType::remove, - valType: Some(OrmDiffType::object), - path: path_str, - value: None, - }); - } - } else { - // Change(s) in multi object property. - - // Add every new object. - for obj_iri_bt in pred_change.values_added.iter() { - let obj_iri = match obj_iri_bt { - BasicType::Str(iri) => iri, - _ => panic!("Object no IRI"), - }; - - // First, we create a root object (if the object existed before, this has no effect). - patches.push(OrmDiffOp { - op: OrmDiffOpType::add, - valType: Some(OrmDiffType::object), - path: path_str.clone(), - value: None, - }); - - // Add escaped object IRI to path. - path.push(escape_json_pointer(obj_iri)); - - create_patches_for_nested_object( - pred_shape, - tracked_subjects, - patches, - path, - obj_iri, - orm_changes, - sub, - ); - - // Remove object IRI from stack again. - path.pop(); - } - - // Delete objects. - // If there are no more predicates, delete the whole object. - if pred_change - .tracked_predicate - .read() - .unwrap() - .tracked_children - .len() - == 0 - { - // Or the whole thing if no children remain - patches.push(OrmDiffOp { - op: OrmDiffOpType::remove, - valType: Some(OrmDiffType::object), - path: path_str, - value: None, - }); - } else { - for object_iri_removed in pred_change.values_removed.iter() { - patches.push(OrmDiffOp { - op: OrmDiffOpType::remove, - valType: Some(OrmDiffType::object), - path: format!( - "/{}/{}", - path_str, - match object_iri_removed { - BasicType::Str(iri) => iri, - _ => panic!("Object IRI must be string"), - } - ), - value: None, - }); - } - } - } - } - - // Remove this predicate name from the path again. - path.pop(); - } - } - - let mut patches: OrmDiff = vec![]; - let mut path: Vec = Vec::with_capacity(4); - - // Iterate over each root subject with the right shape - // For each tracked subject that has the subscription's shape, call fn above - for (subject_iri, tracked_subjects_by_shape) in sub.tracked_subjects.iter() { - for (shape_iri, tracked_subject) in tracked_subjects_by_shape.iter() { - if *shape_iri != sub.shape_type.shape { - continue; - } - // Found a root subject for this shape. - - // Add subject IRI as first part of path pointer. - path.push(escape_json_pointer(subject_iri)); - create_patches_for_changed_subj( - &orm_changes, - &mut patches, - shape_iri, - subject_iri, - sub, - &mut path, - &sub.tracked_subjects, - ); - path.pop(); - } - } - - // Send response with patches. - let _ = sub - .sender - .clone() - .send(AppResponse::V0(AppResponseV0::OrmUpdate(patches.to_vec()))) - .await; - } - } - } - - /// After creating new objects (without an id) in JS-land, - /// we send the generated id for those back. - /// If something went wrong (revert_inserts / revert_removes not empty), - /// we send a JSON patch back to revert the made changes. - pub(crate) async fn orm_update_self( - &mut self, - scope: &NuriV0, - shape_iri: ShapeIri, - session_id: u64, - skolemnized_blank_nodes: Vec, - revert_inserts: Vec, - revert_removes: Vec, - ) -> Result<(), VerifierError> { - let (mut sender, orm_subscription) = - self.get_first_orm_subscription_sender_for(scope, Some(&shape_iri), Some(&session_id))?; - - // TODO prepare OrmUpdateBlankNodeIds with skolemnized_blank_nodes - // use orm_subscription if needed - // note(niko): I think skolemnized blank nodes can still be many, in case of multi-level nested sub-objects. - let orm_bnids = vec![]; - let _ = sender - .send(AppResponse::V0(AppResponseV0::OrmUpdateBlankNodeIds( - orm_bnids, - ))) - .await; - - // TODO (later) revert the inserts and removes - // let orm_diff = vec![]; - // let _ = sender.send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff))).await; - - Ok(()) - } - - /// Handles updates coming from JS-land (JSON patches). - pub(crate) async fn orm_frontend_update( - &mut self, - session_id: u64, - scope: &NuriV0, - shape_iri: ShapeIri, - diff: OrmDiff, - ) -> Result<(), String> { - log_info!( - "frontend_update_orm session={} scope={:?} shape={} diff={:?}", - session_id, - scope, - shape_iri, - diff - ); - - let (doc_nuri, sparql_update) = { - let orm_subscription = - self.get_first_orm_subscription_for(scope, Some(&shape_iri), Some(&session_id)); - - // use orm_subscription as needed - // do the magic, then, find the doc where the query should start and generate the sparql update - let doc_nuri = NuriV0::new_empty(); - let sparql_update: String = String::new(); - (doc_nuri, sparql_update) - }; - - match self - .process_sparql_update( - &doc_nuri, - &sparql_update, - &None, - self.get_peer_id_for_skolem(), - session_id, - ) - .await - { - Err(e) => Err(e), - Ok((_, revert_inserts, revert_removes, skolemnized_blank_nodes)) => { - if !revert_inserts.is_empty() - || !revert_removes.is_empty() - || !skolemnized_blank_nodes.is_empty() - { - self.orm_update_self( - scope, - shape_iri, - session_id, - skolemnized_blank_nodes, - revert_inserts, - revert_removes, - ) - .await - .map_err(|e| e.to_string())?; - } - Ok(()) - } - } - } - pub(crate) fn clean_orm_subscriptions(&mut self) { self.orm_subscriptions.retain(|_, subscriptions| { subscriptions.retain(|sub| !sub.sender.is_closed()); !subscriptions.is_empty() }); } - - /// Entry point to create a new orm subscription. - /// Triggers the creation of an orm object which is sent back to the receiver. - pub(crate) async fn start_orm( - &mut self, - nuri: &NuriV0, - shape_type: &OrmShapeType, - session_id: u64, - ) -> Result<(Receiver, CancelFn), NgError> { - let (mut tx, rx) = mpsc::unbounded::(); - - // TODO: Validate schema: - // If multiple data types are present for the same predicate, they must be of of the same type. - // All referenced shapes must be available. - - // Create new subscription and add to self.orm_subscriptions - let orm_subscription = OrmSubscription { - shape_type: shape_type.clone(), - session_id: session_id, - sender: tx.clone(), - tracked_subjects: HashMap::new(), - nuri: nuri.clone(), - }; - - self.orm_subscriptions - .entry(nuri.clone()) - .or_insert(vec![]) - .push(orm_subscription); - - let orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type)?; - // log_debug!("create_orm_object_for_shape return {:?}", orm_objects); - - let _ = tx - .send(AppResponse::V0(AppResponseV0::OrmInitial(orm_objects))) - .await; - - let close = Box::new(move || { - log_debug!("closing ORM subscription"); - if !tx.is_closed() { - tx.close_channel(); - } - }); - Ok((rx, close)) - } } // Btw, orm/mod.rs is exceeding 1200 lines again. Is that a good practice? I have the feeling, we could separate a couple of things.. diff --git a/engine/verifier/src/orm/process_changes.rs b/engine/verifier/src/orm/process_changes.rs new file mode 100644 index 0000000..b3f698f --- /dev/null +++ b/engine/verifier/src/orm/process_changes.rs @@ -0,0 +1,387 @@ +// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers +// All rights reserved. +// Licensed under the Apache License, Version 2.0 +// +// or the MIT license , +// at your option. All files in the project carrying such +// notice may not be copied, modified, or distributed except +// according to those terms. + +use futures::channel::mpsc::UnboundedSender; +use ng_repo::errors::VerifierError; + +use std::collections::HashMap; +use std::collections::HashSet; +use std::sync::Arc; +use std::u64; + +pub use ng_net::orm::{OrmDiff, OrmShapeType}; +use ng_net::{app_protocol::*, orm::*}; +use ng_oxigraph::oxrdf::Triple; +use ng_repo::errors::NgError; +use ng_repo::log::*; + +use crate::orm::add_remove_triples::add_remove_triples; +use crate::orm::query::shape_type_to_sparql; +use crate::orm::types::*; +use crate::orm::utils::*; +use crate::orm::OrmChanges; +use crate::verifier::*; + +impl Verifier { + /// Apply triples to a nuri's document. + /// Updates tracked_subjects in orm_subscriptions. + pub(crate) fn apply_triple_changes( + &mut self, + triples_added: &[Triple], + triples_removed: &[Triple], + nuri: &NuriV0, + only_for_session_id: Option, + data_already_fetched: bool, + ) -> Result { + log_debug!("apply_triple_changes {:?}", only_for_session_id); + // If we have a specific session, handle only that subscription. + if let Some(session_id) = only_for_session_id { + return self.process_changes_for_nuri_and_session( + &nuri.clone(), + session_id, + triples_added, + triples_removed, + data_already_fetched, + ); + } + + // Otherwise, iterate all sessions. + let mut merged: OrmChanges = HashMap::new(); + + let session_ids: Vec<_> = self + .orm_subscriptions + .get(nuri) + .unwrap() + .iter() + .map(|s| s.session_id.clone()) + .collect(); + + for session_id in session_ids { + let changes = self.process_changes_for_nuri_and_session( + &nuri, + session_id, + triples_added, + triples_removed, + data_already_fetched, + )?; + + for (shape_iri, subj_map) in changes { + merged + .entry(shape_iri) + .or_insert_with(HashMap::new) + .extend(subj_map); + } + } + Ok(merged) + } + + /// Helper to call process_changes_for_shape for all subscriptions on nuri's document. + fn process_changes_for_nuri_and_session( + self: &mut Self, + nuri: &NuriV0, + session_id: u64, + triples_added: &[Triple], + triples_removed: &[Triple], + data_already_fetched: bool, + ) -> Result { + let mut orm_changes = HashMap::new(); + + let shapes: Vec<_> = self + .orm_subscriptions + .get(nuri) + .unwrap() + .iter() + .map(|sub| { + sub.shape_type + .schema + .get(&sub.shape_type.shape) + .unwrap() + .clone() + }) + .collect(); + + for root_shape in shapes { + self.process_changes_for_shape_and_session( + nuri, + root_shape, + session_id, + triples_added, + triples_removed, + &mut orm_changes, + data_already_fetched, + )?; + } + + Ok(orm_changes) + } + + /// Add and remove the triples from the tracked subjects, + /// re-validate, and update `changes` containing the updated data. + /// Works by queuing changes by shape and subjects on a stack. + /// Nested objects are added to the stack + pub(crate) fn process_changes_for_shape_and_session( + self: &mut Self, + nuri: &NuriV0, + root_shape: Arc, + session_id: u64, + triples_added: &[Triple], + triples_removed: &[Triple], + orm_changes: &mut OrmChanges, + data_already_fetched: bool, + ) -> Result<(), NgError> { + // First in, last out stack to keep track of objects to validate (nested objects first). Strings are object IRIs. + let mut shape_validation_stack: Vec<(Arc, Vec)> = vec![]; + // Track (shape_iri, subject_iri) pairs currently being validated to prevent cycles and double evaluation. + let mut currently_validating: HashSet<(String, String)> = HashSet::new(); + // Add root shape for first validation run. + let root_shape_iri = root_shape.iri.clone(); + shape_validation_stack.push((root_shape, vec![])); + + // Process queue of shapes and subjects to validate. + // For a given shape, we evaluate every subject against that shape. + while let Some((shape, objects_to_validate)) = shape_validation_stack.pop() { + // Collect triples relevant for validation. + let added_triples_by_subject = + group_by_subject_for_shape(&shape, triples_added, &objects_to_validate); + let removed_triples_by_subject = + group_by_subject_for_shape(&shape, triples_removed, &objects_to_validate); + let modified_subject_iris: HashSet<&SubjectIri> = added_triples_by_subject + .keys() + .chain(removed_triples_by_subject.keys()) + .collect(); + + let mut orm_subscription = self + .orm_subscriptions + .get_mut(nuri) + .unwrap() + .iter_mut() + .find(|sub| sub.session_id == session_id && sub.shape_type.shape == root_shape_iri) + .unwrap(); + + // Variable to collect nested objects that need validation. + let mut nested_objects_to_eval: HashMap> = + HashMap::new(); + + // For each subject, add/remove triples and validate. + log_debug!( + "processing modified subjects: {:?} against shape: {}", + modified_subject_iris, + shape.iri + ); + + for subject_iri in &modified_subject_iris { + let validation_key = (shape.iri.clone(), subject_iri.to_string()); + + // Cycle detection: Check if this (shape, subject) pair is already being validated + if currently_validating.contains(&validation_key) { + log_warn!( + "Cycle detected: subject '{}' with shape '{}' is already being validated. Marking as invalid.", + subject_iri, + shape.iri + ); + // Mark as invalid due to cycle + // TODO: We could handle this by handling nested references as IRIs. + if let Some(tracked_shapes) = + orm_subscription.tracked_subjects.get(*subject_iri) + { + if let Some(tracked_subject) = tracked_shapes.get(&shape.iri) { + let mut ts = tracked_subject.write().unwrap(); + ts.valid = OrmTrackedSubjectValidity::Invalid; + ts.tracked_predicates.clear(); + } + } + continue; + } + + // Mark as currently validating + currently_validating.insert(validation_key.clone()); + + // Get triples of subject (added & removed). + let triples_added_for_subj = added_triples_by_subject + .get(*subject_iri) + .map(|v| v.as_slice()) + .unwrap_or(&[]); + let triples_removed_for_subj = removed_triples_by_subject + .get(*subject_iri) + .map(|v| v.as_slice()) + .unwrap_or(&[]); + + // Get or create change object for (shape, subject) pair. + let change = orm_changes + .entry(shape.iri.clone()) + .or_insert_with(HashMap::new) + .entry((*subject_iri).clone()) + .or_insert_with(|| OrmTrackedSubjectChange { + subject_iri: (*subject_iri).clone(), + predicates: HashMap::new(), + data_applied: false, + }); + + // Apply all triples for that subject to the tracked (shape, subject) pair. + // Record the changes. + { + if !change.data_applied { + log_debug!( + "Adding triples to change tracker for subject {}", + subject_iri + ); + if let Err(e) = add_remove_triples( + shape.clone(), + subject_iri, + triples_added_for_subj, + triples_removed_for_subj, + &mut orm_subscription, + change, + ) { + log_err!("apply_changes_from_triples add/remove error: {:?}", e); + panic!(); + } + change.data_applied = true; + } else { + log_debug!("not applying triples again for subject {subject_iri}"); + } + + // Validate the subject. + let need_eval = + Self::update_subject_validity(change, &shape, &mut orm_subscription); + + // We add the need_eval to be processed next after loop. + // Filter out subjects already in the validation stack to prevent double evaluation. + for (iri, schema_shape, needs_refetch) in need_eval { + let eval_key = (schema_shape.clone(), iri.clone()); + if !currently_validating.contains(&eval_key) { + // Only add if not currently being validated + nested_objects_to_eval + .entry(schema_shape) + .or_insert_with(Vec::new) + .push((iri.clone(), needs_refetch)); + } + } + } + } + + // Now, we queue all non-evaluated objects + for (shape_iri, objects_to_eval) in &nested_objects_to_eval { + let orm_subscription = self.get_first_orm_subscription_for( + nuri, + Some(&root_shape_iri), + Some(&session_id), + ); + // Extract schema and shape Arc before mutable borrow + let schema = orm_subscription.shape_type.schema.clone(); + let shape_arc = schema.get(shape_iri).unwrap().clone(); + + // Data might need to be fetched (if it has not been during initialization or nested shape fetch). + if !data_already_fetched { + let objects_to_fetch = objects_to_eval + .iter() + .filter(|(_iri, needs_fetch)| *needs_fetch) + .map(|(s, _)| s.clone()) + .collect(); + + // Create sparql query + let shape_query = + shape_type_to_sparql(&schema, &shape_iri, Some(objects_to_fetch))?; + let new_triples = + self.query_sparql_construct(shape_query, Some(nuri_to_string(nuri)))?; + + // Recursively process nested objects. + self.process_changes_for_shape_and_session( + nuri, + shape_arc.clone(), + session_id, + &new_triples, + &vec![], + orm_changes, + true, + )?; + } + + // Add objects + let objects_not_to_fetch: Vec = objects_to_eval + .iter() + .filter(|(_iri, needs_fetch)| !*needs_fetch) + .map(|(s, _)| s.clone()) + .collect(); + if objects_not_to_fetch.len() > 0 { + // Queue all objects that don't need fetching. + shape_validation_stack.push((shape_arc, objects_not_to_fetch)); + } + } + for subject_iri in modified_subject_iris { + let validation_key = (shape.iri.clone(), subject_iri.to_string()); + currently_validating.remove(&validation_key); + } + } + + Ok(()) + } + + /// Helper to get orm subscriptions for nuri, shapes and sessions. + pub fn get_orm_subscriptions_for( + &self, + nuri: &NuriV0, + shape: Option<&ShapeIri>, + session_id: Option<&u64>, + ) -> Vec<&OrmSubscription> { + self.orm_subscriptions.get(nuri).unwrap(). + // Filter shapes, if present. + iter().filter(|s| match shape { + Some(sh) => *sh == s.shape_type.shape, + None => true + // Filter session ids if present. + }).filter(|s| match session_id { + Some(id) => *id == s.session_id, + None => true + }).collect() + } + + pub fn get_first_orm_subscription_for( + &self, + nuri: &NuriV0, + shape: Option<&ShapeIri>, + session_id: Option<&u64>, + ) -> &OrmSubscription { + self.orm_subscriptions.get(nuri).unwrap(). + // Filter shapes, if present. + iter().filter(|s| match shape { + Some(sh) => *sh == s.shape_type.shape, + None => true + // Filter session ids if present. + }).filter(|s| match session_id { + Some(id) => *id == s.session_id, + None => true + }).next().unwrap() + } + + pub fn get_first_orm_subscription_sender_for( + &mut self, + nuri: &NuriV0, + shape: Option<&ShapeIri>, + session_id: Option<&u64>, + ) -> Result<(UnboundedSender, &OrmSubscription), VerifierError> { + let subs = self.orm_subscriptions.get_mut(nuri).unwrap(); + subs.retain(|sub| !sub.sender.is_closed()); + match subs // Filter shapes, if present. + .iter() + .filter(|s| match shape { + Some(sh) => *sh == s.shape_type.shape, + None => true, // Filter session ids if present. + }) + .filter(|s| match session_id { + Some(id) => *id == s.session_id, + None => true, + }) + .next() + { + None => Err(VerifierError::OrmSubscriptionNotFound), + Some(subscription) => Ok((subscription.sender.clone(), subscription)), + } + } +} diff --git a/engine/verifier/src/orm/query.rs b/engine/verifier/src/orm/query.rs new file mode 100644 index 0000000..46aec7c --- /dev/null +++ b/engine/verifier/src/orm/query.rs @@ -0,0 +1,322 @@ +// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers +// All rights reserved. +// Licensed under the Apache License, Version 2.0 +// +// or the MIT license , +// at your option. All files in the project carrying such +// notice may not be copied, modified, or distributed except +// according to those terms. + +use lazy_static::lazy_static; +use ng_repo::errors::VerifierError; +use regex::Regex; + +use std::collections::HashSet; + +pub use ng_net::orm::{OrmDiff, OrmShapeType}; + +use crate::orm::types::*; +use crate::verifier::*; +use ng_net::orm::*; +use ng_oxigraph::oxigraph::sparql::{Query, QueryResults}; +use ng_oxigraph::oxrdf::Triple; +use ng_repo::errors::NgError; +use ng_repo::log::*; + +impl Verifier { + pub fn query_sparql_construct( + &self, + query: String, + nuri: Option, + ) -> Result, NgError> { + let oxistore = self.graph_dataset.as_ref().unwrap(); + + // let graph_nuri = NuriV0::repo_graph_name( + // &update.repo_id, + // &update.overlay_id, + // ); + //let base = NuriV0::repo_id(&repo.id); + + let nuri_str = nuri.as_ref().map(|s| s.as_str()); + log_debug!("querying construct\n{}\n{}\n", nuri_str.unwrap(), query); + + let parsed = + Query::parse(&query, nuri_str).map_err(|e| NgError::OxiGraphError(e.to_string()))?; + let results = oxistore + .query(parsed, nuri) + .map_err(|e| NgError::OxiGraphError(e.to_string()))?; + match results { + QueryResults::Graph(triples) => { + let mut result_triples: Vec = vec![]; + for t in triples { + match t { + Err(e) => { + log_err!("{}", e.to_string()); + return Err(NgError::SparqlError(e.to_string())); + } + Ok(triple) => { + log_debug!("Triple fetched: {:?}", triple); + result_triples.push(triple); + } + } + } + Ok(result_triples) + } + _ => return Err(NgError::InvalidResponse), + } + } +} + +/// Heuristic: +/// Consider a string an IRI if it contains alphanumeric characters and then a colon within the first 13 characters +pub fn is_iri(s: &str) -> bool { + lazy_static! { + static ref IRI_REGEX: Regex = Regex::new(r"^[A-Za-z][A-Za-z0-9+\.\-]{1,12}:").unwrap(); + } + IRI_REGEX.is_match(s) +} + +pub fn literal_to_sparql_str(var: OrmSchemaDataType) -> Vec { + match var.literals { + None => [].to_vec(), + Some(literals) => literals + .iter() + .map(|literal| match literal { + BasicType::Bool(val) => { + if *val { + "true".to_string() + } else { + "false".to_string() + } + } + BasicType::Num(number) => number.to_string(), + BasicType::Str(sting) => { + if is_iri(sting) { + format!("<{}>", sting) + } else { + format!("\"{}\"", escape_literal(sting)) + } + } + }) + .collect(), + } +} + +pub fn shape_type_to_sparql( + schema: &OrmSchema, + shape: &ShapeIri, + filter_subjects: Option>, +) -> Result { + // Use a counter to generate unique variable names. + let mut var_counter = 0; + fn get_new_var_name(counter: &mut i32) -> String { + let name = format!("v{}", counter); + *counter += 1; + name + } + + // Collect all statements to be added to the construct and where bodies. + let mut construct_statements = Vec::new(); + let mut where_statements = Vec::new(); + + // Keep track of visited shapes while recursing to prevent infinite loops. + let mut visited_shapes: HashSet = HashSet::new(); + + // Recursive function to call for (nested) shapes. + // Returns nested WHERE statements that should be included with this shape's binding. + fn process_shape( + schema: &OrmSchema, + shape: &OrmSchemaShape, + subject_var_name: &str, + construct_statements: &mut Vec, + where_statements: &mut Vec, + var_counter: &mut i32, + visited_shapes: &mut HashSet, + in_recursion: bool, + ) -> Vec { + // Prevent infinite recursion on cyclic schemas. + // TODO: We could handle this as IRI string reference. + if visited_shapes.contains(&shape.iri) { + return vec![]; + } + + let mut new_where_statements: Vec = vec![]; + let mut new_construct_statements: Vec = vec![]; + + visited_shapes.insert(shape.iri.clone()); + + // Add statements for each predicate. + // If we are in recursion, we want to get all triples. + // That's why we add a " ?p ?o" statement afterwards + // and the extra construct statements are skipped. + for predicate in &shape.predicates { + let mut union_branches = Vec::new(); + let mut nested_where_statements = Vec::new(); + + // Predicate constraints might have more than one acceptable nested shape. Traverse each. + for datatype in &predicate.dataTypes { + if datatype.valType == OrmSchemaLiteralType::shape { + let shape_iri = &datatype.shape.clone().unwrap(); + let nested_shape = schema.get(shape_iri).unwrap(); + + // For the current acceptable shape, add CONSTRUCT, WHERE, and recurse. + + // Each shape option gets its own var. + let obj_var_name = get_new_var_name(var_counter); + + if !in_recursion { + new_construct_statements.push(format!( + " ?{} <{}> ?{}", + subject_var_name, predicate.iri, obj_var_name + )); + } + // Those are later added to a UNION, if there is more than one shape. + union_branches.push(format!( + " ?{} <{}> ?{}", + subject_var_name, predicate.iri, obj_var_name + )); + + // Recurse to add statements for nested object. + // Collect nested WHERE statements to include within this predicate's scope. + let nested_stmts = process_shape( + schema, + nested_shape, + &obj_var_name, + construct_statements, + where_statements, + var_counter, + visited_shapes, + true, + ); + nested_where_statements.extend(nested_stmts); + } + } + + // The where statement (which may be wrapped in OPTIONAL). + let where_body: String; + + if !union_branches.is_empty() { + // We have nested shape(s) which were already added to CONSTRUCT above. + // Join them with UNION and include nested WHERE statements. + + let union_body = union_branches + .into_iter() + .map(|b| format!("{{\n{}\n}}", b)) + .collect::>() + .join(" UNION "); + + // Combine the parent binding with nested statements + if !nested_where_statements.is_empty() { + let nested_joined = nested_where_statements.join(" .\n"); + where_body = format!("{} .\n{}", union_body, nested_joined); + } else { + where_body = union_body; + } + } else { + // Regular predicate data type. Just add basic CONSTRUCT and WHERE statements. + + let obj_var_name = get_new_var_name(var_counter); + if !in_recursion { + // Only add construct, if we don't have catch-all statement already. + new_construct_statements.push(format!( + " ?{} <{}> ?{}", + subject_var_name, predicate.iri, obj_var_name + )); + } + where_body = format!( + " ?{} <{}> ?{}", + subject_var_name, predicate.iri, obj_var_name + ); + } + + // Wrap in optional, if predicate is optional + if predicate.minCardinality < 1 { + new_where_statements.push(format!(" OPTIONAL {{\n{}\n }}", where_body)); + } else { + new_where_statements.push(where_body); + }; + } + + if in_recursion { + // All statements in recursive objects need to be optional + // because we want to fetch _all_ nested objects, + // invalid ones too, for later validation. + let pred_var_name = get_new_var_name(var_counter); + let obj_var_name = get_new_var_name(var_counter); + + // The "catch any triple in subject" construct statement + construct_statements.push(format!( + " ?{} ?{} ?{}", + subject_var_name, pred_var_name, obj_var_name + )); + + let joined_where_statements = new_where_statements.join(" .\n"); + + // Return nested statements to be included in parent's scope + // Combine catch-all with specific predicates in a UNION + let nested_block = format!( + " {{\n {{?{} ?{} ?{}}}\n UNION {{\n {}\n }}\n }}", + subject_var_name, pred_var_name, obj_var_name, joined_where_statements + ); + visited_shapes.remove(&shape.iri); + return vec![nested_block]; + } else { + where_statements.append(&mut new_where_statements); + construct_statements.append(&mut new_construct_statements); + } + visited_shapes.remove(&shape.iri); + vec![] + } + + let root_shape = schema.get(shape).ok_or(VerifierError::InvalidOrmSchema)?; + + // Root subject variable name + let root_var_name = get_new_var_name(&mut var_counter); + + process_shape( + schema, + root_shape, + &root_var_name, + &mut construct_statements, + &mut where_statements, + &mut var_counter, + &mut visited_shapes, + false, + ); + + // Filter subjects, if present. + if let Some(subjects) = filter_subjects { + // log_debug!("filter_subjects: {:?}", subjects); + let subjects_str = subjects + .iter() + .map(|s| format!("<{}>", s)) + .collect::>() + .join(", "); + where_statements.push(format!(" FILTER(?v0 IN ({}))", subjects_str)); + } + + // Create query from statements. + let construct_body = construct_statements.join(" .\n"); + + let where_body = where_statements.join(" .\n"); + + Ok(format!( + "CONSTRUCT {{\n{}\n}}\nWHERE {{\n{}\n}}", + construct_body, where_body + )) +} +/// SPARQL literal escape: backslash, quotes, newlines, tabs. +fn escape_literal(lit: &str) -> String { + let mut out = String::with_capacity(lit.len() + 4); + for c in lit.chars() { + match c { + '\\' => out.push_str("\\\\"), + '\"' => out.push_str("\\\""), + '\n' => out.push_str("\\n"), + '\r' => out.push_str("\\r"), + '\t' => out.push_str("\\t"), + _ => out.push(c), + } + } + return out; +} diff --git a/engine/verifier/src/orm/validation.rs b/engine/verifier/src/orm/shape_validation.rs similarity index 98% rename from engine/verifier/src/orm/validation.rs rename to engine/verifier/src/orm/shape_validation.rs index fc41cc2..041cf58 100644 --- a/engine/verifier/src/orm/validation.rs +++ b/engine/verifier/src/orm/shape_validation.rs @@ -105,7 +105,10 @@ impl Verifier { // Check 3) Validate subject against each predicate in shape. for p_schema in shape.predicates.iter() { let p_change = s_change.predicates.get(&p_schema.iri); - let tracked_pred = p_change.map(|pc| pc.tracked_predicate.read().unwrap()); + let tracked_pred = tracked_subject + .tracked_predicates + .get(&p_schema.iri) + .map(|tp_write_lock| tp_write_lock.read().unwrap()); let count = tracked_pred .as_ref() @@ -124,6 +127,8 @@ impl Verifier { set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); if count <= 0 { // If cardinality is 0, we can remove the tracked predicate. + // Drop the guard to release the immutable borrow + drop(tracked_pred); tracked_subject.tracked_predicates.remove(&p_schema.iri); } break; diff --git a/engine/verifier/src/orm/types.rs b/engine/verifier/src/orm/types.rs index 8d156c7..5021934 100644 --- a/engine/verifier/src/orm/types.rs +++ b/engine/verifier/src/orm/types.rs @@ -91,3 +91,8 @@ pub struct OrmSubscription { } pub type ShapeIri = String; pub type SubjectIri = String; + +// Structure to store changes in. By shape iri > subject iri > OrmTrackedSubjectChange +// **NOTE**: In comparison to OrmSubscription.tracked_subjects, the outer hashmap's keys are shape IRIs. +// (shape IRI -> (subject IRI -> OrmTrackedSubjectChange)) +pub type OrmChanges = HashMap>; diff --git a/engine/verifier/src/orm/utils.rs b/engine/verifier/src/orm/utils.rs index b45a289..80b804d 100644 --- a/engine/verifier/src/orm/utils.rs +++ b/engine/verifier/src/orm/utils.rs @@ -8,276 +8,14 @@ // according to those terms. use ng_oxigraph::oxrdf::Subject; -use ng_repo::log::*; use ng_repo::types::OverlayId; -use yrs::types::PathSegment; use std::collections::HashMap; use std::collections::HashSet; -use lazy_static::lazy_static; pub use ng_net::orm::{OrmDiff, OrmShapeType}; use ng_net::{app_protocol::*, orm::*}; use ng_oxigraph::oxrdf::Triple; -use ng_repo::errors::NgError; -use ng_repo::errors::VerifierError; -use regex::Regex; - -use crate::orm::types::*; - -/// Heuristic: -/// Consider a string an IRI if it contains alphanumeric characters and then a colon within the first 13 characters -pub fn is_iri(s: &str) -> bool { - lazy_static! { - static ref IRI_REGEX: Regex = Regex::new(r"^[A-Za-z][A-Za-z0-9+\.\-]{1,12}:").unwrap(); - } - IRI_REGEX.is_match(s) -} - -pub fn literal_to_sparql_str(var: OrmSchemaDataType) -> Vec { - match var.literals { - None => [].to_vec(), - Some(literals) => literals - .iter() - .map(|literal| match literal { - BasicType::Bool(val) => { - if *val { - "true".to_string() - } else { - "false".to_string() - } - } - BasicType::Num(number) => number.to_string(), - BasicType::Str(sting) => { - if is_iri(sting) { - format!("<{}>", sting) - } else { - format!("\"{}\"", escape_literal(sting)) - } - } - }) - .collect(), - } -} - -pub fn shape_type_to_sparql( - schema: &OrmSchema, - shape: &ShapeIri, - filter_subjects: Option>, -) -> Result { - // Use a counter to generate unique variable names. - let mut var_counter = 0; - fn get_new_var_name(counter: &mut i32) -> String { - let name = format!("v{}", counter); - *counter += 1; - name - } - - // Collect all statements to be added to the construct and where bodies. - let mut construct_statements = Vec::new(); - let mut where_statements = Vec::new(); - - // Keep track of visited shapes while recursing to prevent infinite loops. - let mut visited_shapes: HashSet = HashSet::new(); - - // Recursive function to call for (nested) shapes. - // Returns nested WHERE statements that should be included with this shape's binding. - fn process_shape( - schema: &OrmSchema, - shape: &OrmSchemaShape, - subject_var_name: &str, - construct_statements: &mut Vec, - where_statements: &mut Vec, - var_counter: &mut i32, - visited_shapes: &mut HashSet, - in_recursion: bool, - ) -> Vec { - // Prevent infinite recursion on cyclic schemas. - // TODO: We could handle this as IRI string reference. - if visited_shapes.contains(&shape.iri) { - return vec![]; - } - - let mut new_where_statements: Vec = vec![]; - let mut new_construct_statements: Vec = vec![]; - - visited_shapes.insert(shape.iri.clone()); - - // Add statements for each predicate. - // If we are in recursion, we want to get all triples. - // That's why we add a " ?p ?o" statement afterwards - // and the extra construct statements are skipped. - for predicate in &shape.predicates { - let mut union_branches = Vec::new(); - let mut nested_where_statements = Vec::new(); - - // Predicate constraints might have more than one acceptable nested shape. Traverse each. - for datatype in &predicate.dataTypes { - if datatype.valType == OrmSchemaLiteralType::shape { - let shape_iri = &datatype.shape.clone().unwrap(); - let nested_shape = schema.get(shape_iri).unwrap(); - - // For the current acceptable shape, add CONSTRUCT, WHERE, and recurse. - - // Each shape option gets its own var. - let obj_var_name = get_new_var_name(var_counter); - - if !in_recursion { - new_construct_statements.push(format!( - " ?{} <{}> ?{}", - subject_var_name, predicate.iri, obj_var_name - )); - } - // Those are later added to a UNION, if there is more than one shape. - union_branches.push(format!( - " ?{} <{}> ?{}", - subject_var_name, predicate.iri, obj_var_name - )); - - // Recurse to add statements for nested object. - // Collect nested WHERE statements to include within this predicate's scope. - let nested_stmts = process_shape( - schema, - nested_shape, - &obj_var_name, - construct_statements, - where_statements, - var_counter, - visited_shapes, - true, - ); - nested_where_statements.extend(nested_stmts); - } - } - - // The where statement (which may be wrapped in OPTIONAL). - let where_body: String; - - if !union_branches.is_empty() { - // We have nested shape(s) which were already added to CONSTRUCT above. - // Join them with UNION and include nested WHERE statements. - - let union_body = union_branches - .into_iter() - .map(|b| format!("{{\n{}\n}}", b)) - .collect::>() - .join(" UNION "); - - // Combine the parent binding with nested statements - if !nested_where_statements.is_empty() { - let nested_joined = nested_where_statements.join(" .\n"); - where_body = format!("{} .\n{}", union_body, nested_joined); - } else { - where_body = union_body; - } - } else { - // Regular predicate data type. Just add basic CONSTRUCT and WHERE statements. - - let obj_var_name = get_new_var_name(var_counter); - if !in_recursion { - // Only add construct, if we don't have catch-all statement already. - new_construct_statements.push(format!( - " ?{} <{}> ?{}", - subject_var_name, predicate.iri, obj_var_name - )); - } - where_body = format!( - " ?{} <{}> ?{}", - subject_var_name, predicate.iri, obj_var_name - ); - } - - // Wrap in optional, if predicate is optional - if predicate.minCardinality < 1 { - new_where_statements.push(format!(" OPTIONAL {{\n{}\n }}", where_body)); - } else { - new_where_statements.push(where_body); - }; - } - - if in_recursion { - // All statements in recursive objects need to be optional - // because we want to fetch _all_ nested objects, - // invalid ones too, for later validation. - let pred_var_name = get_new_var_name(var_counter); - let obj_var_name = get_new_var_name(var_counter); - - // The "catch any triple in subject" construct statement - construct_statements.push(format!( - " ?{} ?{} ?{}", - subject_var_name, pred_var_name, obj_var_name - )); - - let joined_where_statements = new_where_statements.join(" .\n"); - - // Return nested statements to be included in parent's scope - // Combine catch-all with specific predicates in a UNION - let nested_block = format!( - " {{\n {{?{} ?{} ?{}}}\n UNION {{\n {}\n }}\n }}", - subject_var_name, pred_var_name, obj_var_name, joined_where_statements - ); - visited_shapes.remove(&shape.iri); - return vec![nested_block]; - } else { - where_statements.append(&mut new_where_statements); - construct_statements.append(&mut new_construct_statements); - } - visited_shapes.remove(&shape.iri); - vec![] - } - - let root_shape = schema.get(shape).ok_or(VerifierError::InvalidOrmSchema)?; - - // Root subject variable name - let root_var_name = get_new_var_name(&mut var_counter); - - process_shape( - schema, - root_shape, - &root_var_name, - &mut construct_statements, - &mut where_statements, - &mut var_counter, - &mut visited_shapes, - false, - ); - - // Filter subjects, if present. - if let Some(subjects) = filter_subjects { - // log_debug!("filter_subjects: {:?}", subjects); - let subjects_str = subjects - .iter() - .map(|s| format!("<{}>", s)) - .collect::>() - .join(", "); - where_statements.push(format!(" FILTER(?v0 IN ({}))", subjects_str)); - } - - // Create query from statements. - let construct_body = construct_statements.join(" .\n"); - - let where_body = where_statements.join(" .\n"); - - Ok(format!( - "CONSTRUCT {{\n{}\n}}\nWHERE {{\n{}\n}}", - construct_body, where_body - )) -} -/// SPARQL literal escape: backslash, quotes, newlines, tabs. -fn escape_literal(lit: &str) -> String { - let mut out = String::with_capacity(lit.len() + 4); - for c in lit.chars() { - match c { - '\\' => out.push_str("\\\\"), - '\"' => out.push_str("\\\""), - '\n' => out.push_str("\\n"), - '\r' => out.push_str("\\r"), - '\t' => out.push_str("\\t"), - _ => out.push(c), - } - } - return out; -} pub fn group_by_subject_for_shape<'a>( shape: &OrmSchemaShape, diff --git a/sdk/js/signals/src/connector/applyDiff.ts b/sdk/js/signals/src/connector/applyDiff.ts index f33c695..f7da730 100644 --- a/sdk/js/signals/src/connector/applyDiff.ts +++ b/sdk/js/signals/src/connector/applyDiff.ts @@ -67,6 +67,7 @@ function isPrimitive(v: unknown): v is string | number | boolean { ); } +// TODO: Escape slashes and tildes (~1, ~0) /** * Apply a diff to an object. * diff --git a/sdk/rust/src/tests/mod.rs b/sdk/rust/src/tests/mod.rs index 1abaa70..caac2a7 100644 --- a/sdk/rust/src/tests/mod.rs +++ b/sdk/rust/src/tests/mod.rs @@ -7,8 +7,105 @@ // notice may not be copied, modified, or distributed except // according to those terms. +use ng_repo::log_err; +use serde_json::Value; + +use crate::local_broker::{doc_create, doc_sparql_update}; + +#[doc(hidden)] +pub mod orm_creation; + #[doc(hidden)] -pub mod orm; +pub mod orm_patches; #[doc(hidden)] pub mod create_or_open_wallet; + +pub(crate) async fn create_doc_with_data(session_id: u64, sparql_insert: String) -> String { + let doc_nuri = doc_create( + session_id, + "Graph".to_string(), + "test_orm_query".to_string(), + "store".to_string(), + None, + None, + ) + .await + .expect("error creating doc"); + + // Insert data + doc_sparql_update(session_id, sparql_insert, Some(doc_nuri.clone())) + .await + .expect("SPARQL update failed"); + + return doc_nuri; +} + +pub(crate) fn assert_json_eq(expected: &mut Value, actual: &mut Value) { + remove_id_fields(expected); + remove_id_fields(actual); + + sort_arrays(expected); + sort_arrays(actual); + + let diff = serde_json_diff::values(expected.clone(), actual.clone()); + if let Some(diff_) = diff { + log_err!( + "Expected and actual ORM JSON mismatch.\nDiff: {:?}\nExpected: {}\nActual: {}", + diff_, + expected, + actual + ); + assert!(false); + } +} + +/// Helper to recursively sort all arrays in nested objects into a stable ordering. +/// Arrays are sorted by their JSON string representation. +fn sort_arrays(value: &mut Value) { + match value { + Value::Object(map) => { + for v in map.values_mut() { + sort_arrays(v); + } + } + Value::Array(arr) => { + // First, recursively sort nested structures + for v in arr.iter_mut() { + sort_arrays(v); + } + // Then sort the array itself by JSON string representation + arr.sort_by(|a, b| { + let a_str = canonical_json::ser::to_string(a).unwrap_or_default(); + let b_str = canonical_json::ser::to_string(b).unwrap_or_default(); + a_str.cmp(&b_str) + }); + } + _ => {} + } +} + +/// Helper to recursively remove nested "id" fields from nested objects, +/// but only if they are not at the root level. +fn remove_id_fields(value: &mut Value) { + fn remove_id_fields_inner(value: &mut Value, is_root: bool) { + match value { + Value::Object(map) => { + if !is_root { + map.remove("id"); + } + for v in map.values_mut() { + remove_id_fields_inner(v, false); + } + } + Value::Array(arr) => { + for v in arr { + remove_id_fields_inner(v, false); + } + } + _ => {} + } + } + + remove_id_fields_inner(value, true); +} diff --git a/sdk/rust/src/tests/orm.rs b/sdk/rust/src/tests/orm_creation.rs similarity index 95% rename from sdk/rust/src/tests/orm.rs rename to sdk/rust/src/tests/orm_creation.rs index 8881fbd..7e29b96 100644 --- a/sdk/rust/src/tests/orm.rs +++ b/sdk/rust/src/tests/orm_creation.rs @@ -9,17 +9,17 @@ use crate::local_broker::{doc_create, doc_sparql_construct, doc_sparql_update, orm_start}; use crate::tests::create_or_open_wallet::create_or_open_wallet; +use crate::tests::{assert_json_eq, create_doc_with_data}; use async_std::stream::StreamExt; use ng_net::app_protocol::{AppResponse, AppResponseV0, NuriV0}; use ng_net::orm::{ - self, BasicType, OrmSchema, OrmSchemaDataType, OrmSchemaLiteralType, OrmSchemaPredicate, + BasicType, OrmSchema, OrmSchemaDataType, OrmSchemaLiteralType, OrmSchemaPredicate, OrmSchemaShape, OrmShapeType, }; -use ng_verifier::orm::utils::shape_type_to_sparql; -use ng_repo::{log_err, log_info}; +use ng_repo::log_info; +use ng_verifier::orm::query::shape_type_to_sparql; use serde_json::json; -use serde_json::Value; use std::collections::HashMap; use std::sync::Arc; @@ -1788,87 +1788,3 @@ fn create_big_schema() -> OrmSchema { return schema; } - -async fn create_doc_with_data(session_id: u64, sparql_insert: String) -> String { - let doc_nuri = doc_create( - session_id, - "Graph".to_string(), - "test_orm_query".to_string(), - "store".to_string(), - None, - None, - ) - .await - .expect("error creating doc"); - - // Insert data - doc_sparql_update(session_id, sparql_insert, Some(doc_nuri.clone())) - .await - .expect("SPARQL update failed"); - - return doc_nuri; -} - -fn assert_json_eq(expected: &mut Value, actual: &mut Value) { - remove_id_fields(expected); - remove_id_fields(actual); - - sort_arrays(expected); - sort_arrays(actual); - - let diff = serde_json_diff::values(expected.clone(), actual.clone()); - if let Some(diff_) = diff { - log_err!("Expected and actual ORM JSON mismatch.\nDiff: {:?}", diff_); - assert!(false); - } -} - -/// Helper to recursively sort all arrays in nested objects into a stable ordering. -/// Arrays are sorted by their JSON string representation. -fn sort_arrays(value: &mut Value) { - match value { - Value::Object(map) => { - for v in map.values_mut() { - sort_arrays(v); - } - } - Value::Array(arr) => { - // First, recursively sort nested structures - for v in arr.iter_mut() { - sort_arrays(v); - } - // Then sort the array itself by JSON string representation - arr.sort_by(|a, b| { - let a_str = canonical_json::ser::to_string(a).unwrap_or_default(); - let b_str = canonical_json::ser::to_string(b).unwrap_or_default(); - a_str.cmp(&b_str) - }); - } - _ => {} - } -} - -/// Helper to recursively remove nested "id" fields from nested objects, -/// but only if they are not at the root level. -fn remove_id_fields(value: &mut Value) { - fn remove_id_fields_inner(value: &mut Value, is_root: bool) { - match value { - Value::Object(map) => { - if !is_root { - map.remove("id"); - } - for v in map.values_mut() { - remove_id_fields_inner(v, false); - } - } - Value::Array(arr) => { - for v in arr { - remove_id_fields_inner(v, false); - } - } - _ => {} - } - } - - remove_id_fields_inner(value, true); -} diff --git a/sdk/rust/src/tests/orm_patches.rs b/sdk/rust/src/tests/orm_patches.rs new file mode 100644 index 0000000..1cb7938 --- /dev/null +++ b/sdk/rust/src/tests/orm_patches.rs @@ -0,0 +1,1059 @@ +// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers +// All rights reserved. +// Licensed under the Apache License, Version 2.0 +// +// or the MIT license , +// at your option. All files in the project carrying such +// notice may not be copied, modified, or distributed except +// according to those terms. + +use crate::local_broker::{doc_create, doc_sparql_construct, doc_sparql_update, orm_start}; +use crate::tests::create_or_open_wallet::create_or_open_wallet; +use crate::tests::{assert_json_eq, create_doc_with_data}; +use async_std::stream::StreamExt; +use ng_net::app_protocol::{AppResponse, AppResponseV0, NuriV0}; +use ng_net::orm::{ + BasicType, OrmSchemaDataType, OrmSchemaLiteralType, OrmSchemaPredicate, OrmSchemaShape, + OrmShapeType, +}; + +use ng_repo::log_info; +use serde_json::json; +use serde_json::Value; +use std::collections::HashMap; + +#[async_std::test] +async fn test_orm_path_creation() { + // Setup wallet and document + let (_wallet, session_id) = create_or_open_wallet().await; + + // Tests below all in this test, to prevent waiting times through wallet creation. + + // === + test_patch_add_array(session_id).await; + test_patch_remove_array(session_id).await; + + // // === + // test_orm_with_optional(session_id).await; + + // // === + // test_orm_literal(session_id).await; + + // // === + // test_orm_multi_type(session_id).await; + + // // === + // test_orm_nested_1(session_id).await; + + // // // === + // // test_orm_nested_2(session_id).await; + + // // // === + // // test_orm_nested_3(session_id).await; + + // // === + // test_orm_nested_4(session_id).await; +} + +async fn test_patch_add_array(session_id: u64) { + let doc_nuri = create_doc_with_data( + session_id, + r#" +PREFIX ex: +INSERT DATA { + a ex:TestObject ; + ex:arr 1, 2, 3 . + + a ex:TestObject . + + a ex:TestObject ; + ex:unrelated ex:TestObject ; + ex:arr 1, 2 . +} +"# + .to_string(), + ) + .await; + + // Define the ORM schema + let mut schema = HashMap::new(); + schema.insert( + "http://example.org/TestShape".to_string(), + OrmSchemaShape { + iri: "http://example.org/TestShape".to_string(), + predicates: vec![ + OrmSchemaPredicate { + iri: "http://www.w3.org/1999/02/22-rdf-syntax-ns#type".to_string(), + extra: Some(false), + maxCardinality: 1, + minCardinality: 1, + readablePredicate: "type".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::literal, + literals: Some(vec![BasicType::Str( + "http://example.org/TestObject".to_string(), + )]), + shape: None, + }], + } + .into(), + OrmSchemaPredicate { + iri: "http://example.org/arr".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::number, + literals: None, + shape: None, + }], + extra: Some(false), + maxCardinality: -1, + minCardinality: 0, + readablePredicate: "numArray".to_string(), + } + .into(), + ], + } + .into(), + ); + + let shape_type = OrmShapeType { + schema, + shape: "http://example.org/TestShape".to_string(), + }; + + let nuri = NuriV0::new_from(&doc_nuri).expect("parse nuri"); + let (mut receiver, cancel_fn) = orm_start(nuri, shape_type, session_id) + .await + .expect("orm_start"); + + while let Some(app_response) = receiver.next().await { + let _ = match app_response { + AppResponse::V0(v) => match v { + AppResponseV0::OrmInitial(json) => Some(json), + _ => None, + }, + } + .unwrap(); + + break; + } + + // Add more data, remove some + doc_sparql_update( + session_id, + r#" +PREFIX ex: +INSERT DATA { + + ex:arr 4 . + + + ex:arr 1, 2 . + + + ex:arr 3 . + + + ex:arr 0 . +} +"# + .to_string(), + Some(doc_nuri.clone()), + ) + .await + .expect("2nd SPARQL update failed"); + + while let Some(app_response) = receiver.next().await { + let patches = match app_response { + AppResponse::V0(v) => match v { + AppResponseV0::OrmUpdate(json) => Some(json), + _ => None, + }, + } + .unwrap(); + + log_info!("Diff ops arrived:\n"); + for patch in patches.iter() { + log_info!("{:?}", patch); + } + + let mut expected = json!([ + { + "op": "add", + "valType": "set", + "value": [4.0], + "path": "/urn:test:numArrayObj1/numArray", + + }, + { + "op": "add", + "valType": "set", + "value": [1.0,2.0], + "path": "/urn:test:numArrayObj2/numArray", + }, + { + "op": "add", + "valType": "set", + "value": [3.0], + "path": "/urn:test:numArrayObj3/numArray", + }, + // TODO: The two below are not added. + { + "op": "add", + "valType": "object", + "path": "/urn:test:numArrayObj4", + "value": Value::Null + }, + { + "op": "add", + "value": "urn:test:numArrayObj4", + "path": "/urn:test:numArrayObj4/id", + "valType": Value::Null, + }, + { + "op": "add", + "valType": "set", + "value": [0.0], + "path": "/urn:test:numArrayObj4/numArray", + }, + ]); + + let mut actual = json!(patches); + assert_json_eq(&mut expected, &mut actual); + + break; + } +} + +async fn test_patch_remove_array(session_id: u64) { + let doc_nuri = create_doc_with_data( + session_id, + r#" +PREFIX ex: +INSERT DATA { + a ex:TestObject ; + ex:arr 1, 2, 3 . + + a ex:TestObject . + + a ex:TestObject ; + ex:unrelated ex:TestObject ; + ex:arr 1, 2 . +} +"# + .to_string(), + ) + .await; + + // Define the ORM schema + let mut schema = HashMap::new(); + schema.insert( + "http://example.org/TestShape".to_string(), + OrmSchemaShape { + iri: "http://example.org/TestShape".to_string(), + predicates: vec![ + OrmSchemaPredicate { + iri: "http://www.w3.org/1999/02/22-rdf-syntax-ns#type".to_string(), + extra: Some(false), + maxCardinality: 1, + minCardinality: 1, + readablePredicate: "type".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::literal, + literals: Some(vec![BasicType::Str( + "http://example.org/TestObject".to_string(), + )]), + shape: None, + }], + } + .into(), + OrmSchemaPredicate { + iri: "http://example.org/arr".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::number, + literals: None, + shape: None, + }], + extra: Some(false), + maxCardinality: -1, + minCardinality: 0, + readablePredicate: "numArray".to_string(), + } + .into(), + ], + } + .into(), + ); + + let shape_type = OrmShapeType { + schema, + shape: "http://example.org/TestShape".to_string(), + }; + + let nuri = NuriV0::new_from(&doc_nuri).expect("parse nuri"); + let (mut receiver, cancel_fn) = orm_start(nuri, shape_type, session_id) + .await + .expect("orm_start"); + + while let Some(app_response) = receiver.next().await { + let _ = match app_response { + AppResponse::V0(v) => match v { + AppResponseV0::OrmInitial(json) => Some(json), + _ => None, + }, + } + .unwrap(); + + break; + } + + // Add more data, remove some + doc_sparql_update( + session_id, + r#" +PREFIX ex: +DELETE DATA { + + ex:arr 1 . +} +"# + .to_string(), + Some(doc_nuri.clone()), + ) + .await + .expect("2nd SPARQL update failed"); + + while let Some(app_response) = receiver.next().await { + let patches = match app_response { + AppResponse::V0(v) => match v { + AppResponseV0::OrmUpdate(json) => Some(json), + _ => None, + }, + } + .unwrap(); + + log_info!("Diff ops arrived:\n"); + for patch in patches.iter() { + log_info!("{:?}", patch); + } + + let mut expected = json!([ + { + "op": "remove", + "valType": "set", + "value": [1.0], + "path": "/urn:test:numArrayObj1/numArray", + + } + ]); + + let mut actual = json!(patches); + assert_json_eq(&mut expected, &mut actual); + + break; + } +} + +async fn test_patch_add_nested_1(session_id: u64) { + let doc_nuri = create_doc_with_data( + session_id, + r#" +PREFIX ex: +INSERT DATA { + + ex:multiNest , ; + ex:singleNest . + + + ex:multiNest1Str "a multi 1 string" . + + + ex:multiNest2Str "a multi 2 string" . + + + ex:singleNestStr "a single nest string" . +} +"# + .to_string(), + ) + .await; + + // Define the ORM schema + let mut schema = HashMap::new(); + schema.insert( + "http://example.org/RootShape".to_string(), + OrmSchemaShape { + iri: "http://example.org/RootShape".to_string(), + predicates: vec![ + OrmSchemaPredicate { + iri: "http://example.org/multiNest".to_string(), + extra: None, + maxCardinality: 6, + minCardinality: 1, + readablePredicate: "multiNest".to_string(), + dataTypes: vec![ + OrmSchemaDataType { + valType: OrmSchemaLiteralType::shape, + literals: None, + shape: Some("http://example.org/MultiNestShape1".to_string()), + }, + OrmSchemaDataType { + valType: OrmSchemaLiteralType::shape, + literals: None, + shape: Some("http://example.org/MultiNestShape2".to_string()), + }, + ], + } + .into(), + OrmSchemaPredicate { + iri: "http://example.org/singleNest".to_string(), + extra: Some(true), + maxCardinality: 1, + minCardinality: 1, + readablePredicate: "singleNest".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::shape, + literals: None, + shape: Some("http://example.org/SingleNestShape".to_string()), + }], + } + .into(), + ], + } + .into(), + ); + schema.insert( + "http://example.org/SingleNestShape".to_string(), + OrmSchemaShape { + iri: "http://example.org/SingleNestShape".to_string(), + predicates: vec![OrmSchemaPredicate { + iri: "http://example.org/singleNestStr".to_string(), + extra: None, + readablePredicate: "str".to_string(), + maxCardinality: 1, + minCardinality: 1, + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::string, + literals: None, + shape: None, + }], + } + .into()], + } + .into(), + ); + schema.insert( + "http://example.org/MultiNestShape1".to_string(), + OrmSchemaShape { + iri: "http://example.org/MultiNestShape1".to_string(), + predicates: vec![OrmSchemaPredicate { + iri: "http://example.org/multiNest1Str".to_string(), + extra: None, + readablePredicate: "string1".to_string(), + maxCardinality: 1, + minCardinality: 1, + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::string, + literals: None, + shape: None, + }], + } + .into()], + } + .into(), + ); + schema.insert( + "http://example.org/MultiNestShape2".to_string(), + OrmSchemaShape { + iri: "http://example.org/MultiNestShape2".to_string(), + predicates: vec![OrmSchemaPredicate { + iri: "http://example.org/multiNest2Str".to_string(), + extra: None, + readablePredicate: "string2".to_string(), + maxCardinality: 1, + minCardinality: 1, + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::string, + literals: None, + shape: None, + }], + } + .into()], + } + .into(), + ); + + let shape_type = OrmShapeType { + schema, + shape: "http://example.org/RootShape".to_string(), + }; + + let nuri = NuriV0::new_from(&doc_nuri).expect("parse nuri"); + let (mut receiver, cancel_fn) = orm_start(nuri, shape_type, session_id) + .await + .expect("orm_start"); + + while let Some(app_response) = receiver.next().await { + let orm_json = match app_response { + AppResponse::V0(v) => match v { + AppResponseV0::OrmInitial(json) => Some(json), + _ => None, + }, + } + .unwrap(); + + break; + } + + // Add more data, remove some + doc_sparql_update( + session_id, + r#" +PREFIX ex: +INSERT DATA { + + ex:multiNest1Str "replacing object shape view" . + + + ex:multiNest2Str "multi 4 added" . + + + ex:singleNestStr "Different nested val" . +} +"# + .to_string(), + Some(doc_nuri.clone()), + ) + .await + .expect("2nd SPARQL update failed"); + + while let Some(app_response) = receiver.next().await { + let patches = match app_response { + AppResponse::V0(v) => match v { + AppResponseV0::OrmUpdate(json) => Some(json), + _ => None, + }, + } + .unwrap(); + + log_info!("Diff ops arrived:\n"); + for patch in patches.iter() { + log_info!("{:?}", patch); + } + + let mut expected = json!([ + { + "op": "remove", + "path": "/urn:test:oj1/multiNest/urn:test:multiNested2/string2", + // "valType": None, + // "value": None, + }, + { + "op": "add", + // "valType": None, + "value": "replacing object shape view", + "path": "/urn:test:oj1/multiNest/urn:test:multiNested2/string1", + }, + { + "op": "add", + "valType": "object", + // "value": None, + "path": "/urn:test:oj1/multiNest/urn:test:multiNested4", + }, + { + "op": "add", + // "valType": None, + "value": "urn:test:multiNested4", + "path": "/urn:test:oj1/multiNest/urn:test:multiNested4/id", + }, + { + "op": "add", + // "valType": None, + "value": "multi 4 added", + "path": "/urn:test:oj1/multiNest/urn:test:multiNested4/string2", + }, + { + "op": "remove", + // "valType": None, + // "value": None, + "path": "/urn:test:oj1/singleNest/str", + }, + { + "op": "add", + // "valType": None, + "value": "Different nested val", + "path": "/urn:test:oj1/singleNest/str", + }, + ]); + + let mut actual = json!(patches); + assert_json_eq(&mut expected, &mut actual); + + break; + } +} + +/* + + +Old things + +*/ +async fn test_orm_nested_2(session_id: u64) { + let doc_nuri = create_doc_with_data( + session_id, + r#" +PREFIX ex: +INSERT DATA { + # Valid + + ex:knows , ; + ex:name "Alice" . + + ex:knows ; + ex:name "Bob" . + + ex:name "Claire" . + + # Invalid because claire2 is invalid + + ex:knows , ; + ex:name "Alice" . + # Invalid because claire2 is invalid + + ex:knows ; + ex:name "Bob" . + # Invalid because name is missing. + + ex:missingName "Claire missing" . +} +"# + .to_string(), + ) + .await; + + // Define the ORM schema + let mut schema = HashMap::new(); + schema.insert( + "http://example.org/PersonShape".to_string(), + OrmSchemaShape { + iri: "http://example.org/PersonShape".to_string(), + predicates: vec![ + OrmSchemaPredicate { + iri: "http://example.org/name".to_string(), + extra: None, + maxCardinality: 1, + minCardinality: 1, + readablePredicate: "name".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::string, + literals: None, + shape: None, + }], + } + .into(), + OrmSchemaPredicate { + iri: "http://example.org/knows".to_string(), + extra: Some(false), + maxCardinality: -1, + minCardinality: 0, + readablePredicate: "knows".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::shape, + literals: None, + shape: Some("http://example.org/PersonShape".to_string()), + }], + } + .into(), + ], + } + .into(), + ); + + let shape_type = OrmShapeType { + schema, + shape: "http://example.org/PersonShape".to_string(), + }; + + let nuri = NuriV0::new_from(&doc_nuri).expect("parse nuri"); + let (mut receiver, cancel_fn) = orm_start(nuri, shape_type, session_id) + .await + .expect("orm_start"); + + while let Some(app_response) = receiver.next().await { + let orm_json = match app_response { + AppResponse::V0(v) => match v { + AppResponseV0::OrmInitial(json) => Some(json), + _ => None, + }, + } + .unwrap(); + + log_info!( + "ORM JSON arrived for nested2 (person) test\n: {:?}", + orm_json + ); + + // Expected: alice and bob with their nested knows relationships + // claire2 is invalid (missing name), so alice2's knows chain is incomplete + let mut expected = json!([ + { + "id": "urn:test:alice", + "name": "Alice", + "knows": { + "urn:test:bob": { + "name": "Bob", + "knows": { + "urn:test:claire": { + "name": "Claire", + "knows": {} + } + } + }, + "urn:test:claire": { + "name": "Claire", + "knows": {} + } + } + }, + { + "id": "urn:test:bob", + "name": "Bob", + "knows": { + "urn:test:claire": { + "name": "Claire", + "knows": {} + } + } + }, + { + "id": "urn:test:claire", + "name": "Claire", + "knows": {} + } + ]); + + let mut actual_mut = orm_json.clone(); + log_info!( + "JSON for nested2\n{}", + serde_json::to_string(&actual_mut).unwrap() + ); + assert_json_eq(&mut expected, &mut actual_mut); + + break; + } + cancel_fn(); +} + +async fn test_orm_nested_3(session_id: u64) { + let doc_nuri = create_doc_with_data( + session_id, + r#" +PREFIX ex: +INSERT DATA { + # Valid + + a ex:Alice ; + ex:knows , . + + a ex:Bob ; + ex:knows . + + a ex:Claire . + + # Invalid because claire is invalid + + a ex:Alice ; + ex:knows , . + # Invalid because claire is invalid + + a ex:Bob ; + ex:knows . + # Invalid, wrong type. + + a ex:Claire2 . +} +"# + .to_string(), + ) + .await; + + // Define the ORM schema + let mut schema = HashMap::new(); + schema.insert( + "http://example.org/AliceShape".to_string(), + OrmSchemaShape { + iri: "http://example.org/AliceShape".to_string(), + predicates: vec![ + OrmSchemaPredicate { + iri: "http://www.w3.org/1999/02/22-rdf-syntax-ns#type".to_string(), + extra: None, + maxCardinality: 1, + minCardinality: 1, + readablePredicate: "type".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::literal, + literals: Some(vec![BasicType::Str( + "http://example.org/Alice".to_string(), + )]), + shape: None, + }], + } + .into(), + OrmSchemaPredicate { + iri: "http://example.org/knows".to_string(), + extra: Some(false), + maxCardinality: -1, + minCardinality: 0, + readablePredicate: "knows".to_string(), + dataTypes: vec![ + OrmSchemaDataType { + valType: OrmSchemaLiteralType::shape, + literals: None, + shape: Some("http://example.org/BobShape".to_string()), + }, + OrmSchemaDataType { + valType: OrmSchemaLiteralType::shape, + literals: None, + shape: Some("http://example.org/ClaireShape".to_string()), + }, + ], + } + .into(), + ], + } + .into(), + ); + schema.insert( + "http://example.org/BobShape".to_string(), + OrmSchemaShape { + iri: "http://example.org/BobShape".to_string(), + predicates: vec![ + OrmSchemaPredicate { + iri: "http://www.w3.org/1999/02/22-rdf-syntax-ns#type".to_string(), + extra: Some(true), + maxCardinality: 1, + minCardinality: 1, + readablePredicate: "type".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::literal, + literals: Some(vec![BasicType::Str("http://example.org/Bob".to_string())]), + shape: None, + }], + } + .into(), + OrmSchemaPredicate { + iri: "http://example.org/knows".to_string(), + extra: Some(false), + maxCardinality: -1, + minCardinality: 0, + readablePredicate: "knows".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::shape, + literals: None, + shape: Some("http://example.org/ClaireShape".to_string()), + }], + } + .into(), + ], + } + .into(), + ); + schema.insert( + "http://example.org/ClaireShape".to_string(), + OrmSchemaShape { + iri: "http://example.org/ClaireShape".to_string(), + predicates: vec![OrmSchemaPredicate { + iri: "http://www.w3.org/1999/02/22-rdf-syntax-ns#type".to_string(), + extra: None, + maxCardinality: 1, + minCardinality: 1, + readablePredicate: "type".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::literal, + literals: Some(vec![BasicType::Str( + "http://example.org/Claire".to_string(), + )]), + shape: None, + }], + } + .into()], + } + .into(), + ); + + let shape_type = OrmShapeType { + schema, + shape: "http://example.org/AliceShape".to_string(), + }; + + let nuri = NuriV0::new_from(&doc_nuri).expect("parse nuri"); + let (mut receiver, cancel_fn) = orm_start(nuri, shape_type, session_id) + .await + .expect("orm_start"); + + while let Some(app_response) = receiver.next().await { + let orm_json = match app_response { + AppResponse::V0(v) => match v { + AppResponseV0::OrmInitial(json) => Some(json), + _ => None, + }, + } + .unwrap(); + + log_info!( + "ORM JSON arrived for nested3 (person) test\n: {:?}", + serde_json::to_string(&orm_json).unwrap() + ); + + // Expected: alice with knows relationships to bob and claire + // alice2 is incomplete because claire2 has wrong type + let mut expected = json!([ + { + "id": "urn:test:alice", + "type": "http://example.org/Alice", + "knows": { + "urn:test:bob": { + "type": "http://example.org/Bob", + "knows": { + "urn:test:claire": { + "type": "http://example.org/Claire" + } + } + }, + "urn:test:claire": { + "type": "http://example.org/Claire" + } + } + } + ]); + + let mut actual_mut = orm_json.clone(); + assert_json_eq(&mut expected, &mut actual_mut); + + break; + } + cancel_fn(); +} + +async fn test_orm_nested_4(session_id: u64) { + let doc_nuri = create_doc_with_data( + session_id, + r#" +PREFIX ex: +INSERT DATA { + # Valid + + a ex:Person ; + ex:hasCat , . + + a ex:Cat . + + a ex:Cat . +} +"# + .to_string(), + ) + .await; + + // Define the ORM schema + let mut schema = HashMap::new(); + schema.insert( + "http://example.org/PersonShape".to_string(), + OrmSchemaShape { + iri: "http://example.org/PersonShape".to_string(), + predicates: vec![ + OrmSchemaPredicate { + iri: "http://www.w3.org/1999/02/22-rdf-syntax-ns#type".to_string(), + extra: None, + maxCardinality: 1, + minCardinality: 1, + readablePredicate: "type".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::literal, + literals: Some(vec![BasicType::Str( + "http://example.org/Person".to_string(), + )]), + shape: None, + }], + } + .into(), + OrmSchemaPredicate { + iri: "http://example.org/hasCat".to_string(), + extra: Some(false), + maxCardinality: -1, + minCardinality: 0, + readablePredicate: "cats".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::shape, + literals: None, + shape: Some("http://example.org/CatShape".to_string()), + }], + } + .into(), + ], + } + .into(), + ); + schema.insert( + "http://example.org/CatShape".to_string(), + OrmSchemaShape { + iri: "http://example.org/CatShape".to_string(), + predicates: vec![OrmSchemaPredicate { + iri: "http://www.w3.org/1999/02/22-rdf-syntax-ns#type".to_string(), + extra: Some(true), + maxCardinality: 1, + minCardinality: 1, + readablePredicate: "type".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::literal, + literals: Some(vec![BasicType::Str("http://example.org/Cat".to_string())]), + shape: None, + }], + } + .into()], + } + .into(), + ); + + let shape_type = OrmShapeType { + schema, + shape: "http://example.org/PersonShape".to_string(), + }; + + let nuri = NuriV0::new_from(&doc_nuri).expect("parse nuri"); + let (mut receiver, cancel_fn) = orm_start(nuri, shape_type, session_id) + .await + .expect("orm_start"); + + while let Some(app_response) = receiver.next().await { + let orm_json = match app_response { + AppResponse::V0(v) => match v { + AppResponseV0::OrmInitial(json) => Some(json), + _ => None, + }, + } + .unwrap(); + + let mut expected = json!([ + { + "id": "urn:test:alice", + "type": "http://example.org/Person", + "cats": { + "urn:test:kitten1": { + "type": "http://example.org/Cat" + }, + "urn:test:kitten2": { + "type": "http://example.org/Cat" + } + }, + } + ]); + + let mut actual_mut = orm_json.clone(); + + assert_json_eq(&mut expected, &mut actual_mut); + + break; + } + cancel_fn(); +}