Compare commits
6 Commits
bf8bdd94b9 ... 1753b6d5fb

SHA1 | Date
---|---
1753b6d5fb | 2 days ago
cc9e3c0940 | 3 days ago
94dc33d6bb | 3 days ago
4ab7b547c9 | 3 days ago
f817809a2a | 3 days ago
c0637ddaee | 4 days ago
@@ -0,0 +1,528 @@
// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.

use std::collections::HashMap;
use std::u64;

use futures::SinkExt;
pub use ng_net::orm::{OrmDiff, OrmShapeType};
use ng_net::{app_protocol::*, orm::*};
use ng_repo::log::*;

use crate::orm::types::*;
use crate::orm::utils::*;
use crate::orm::OrmChanges;
use crate::types::*;
use crate::verifier::*;
use ng_net::types::OverlayLink;
use ng_oxigraph::oxrdf::Triple;
use ng_repo::types::OverlayId;
use ng_repo::types::RepoId;
use serde_json::json;
use serde_json::Value;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::RwLock;

impl Verifier {
    /// Generate and send JSON patches from GraphQuadsPatch (quad inserts and removes) to JS-land.
    pub(crate) async fn orm_backend_update(
        &mut self,
        session_id: u64,
        repo_id: RepoId,
        overlay_id: OverlayId,
        patch: GraphQuadsPatch,
    ) {
        let overlaylink: OverlayLink = overlay_id.into();

        // We need to apply the patches to all subscriptions we have. We can use process_changes_for_*.
        // That updates the tracked subjects, validates them, and returns a set of changes structured
        // by the respective schema.

        let triple_inserts: Vec<Triple> = patch
            .inserts
            .iter()
            .map(|quad| {
                Triple::new(
                    quad.subject.clone(),
                    quad.predicate.clone(),
                    quad.object.clone(),
                )
            })
            .collect();
        let triple_removes: Vec<Triple> = patch
            .removes
            .iter()
            .map(|quad| {
                Triple::new(
                    quad.subject.clone(),
                    quad.predicate.clone(),
                    quad.object.clone(),
                )
            })
            .collect();

        // let mut updates = Vec::new();

        let mut scopes = vec![];
        for (scope, subs) in self.orm_subscriptions.iter_mut() {
            // Remove old subscriptions.
            subs.retain(|sub| !sub.sender.is_closed());

            if !(scope.target == NuriTargetV0::UserSite
                || scope
                    .overlay
                    .as_ref()
                    .map_or(false, |ol| overlaylink == *ol)
                || scope.target == NuriTargetV0::Repo(repo_id))
            {
                continue;
            }

            // Prepare to apply updates to tracked subjects and record the changes.
            let root_shapes = subs
                .iter()
                .map(|sub| {
                    sub.shape_type
                        .schema
                        .get(&sub.shape_type.shape)
                        .unwrap()
                        .clone()
                })
                .collect::<Vec<_>>();

            scopes.push((scope.clone(), root_shapes));
        }

        log_debug!(
            "[orm_backend_update] creating patch objects for {} scope(s)",
            scopes.len()
        );
        for (scope, shapes) in scopes {
            let mut orm_changes: OrmChanges = HashMap::new();

            // Apply the changes to tracked subjects.
            for shape_arc in shapes {
                let _ = self.process_changes_for_shape_and_session(
                    &scope,
                    shape_arc,
                    session_id,
                    &triple_inserts,
                    &triple_removes,
                    &mut orm_changes,
                    false,
                );
            }

            let subs = self.orm_subscriptions.get(&scope).unwrap();
            for sub in subs.iter() {
                log_debug!(
                    "Applying changes to subscription with nuri {} and shape {}",
                    sub.nuri.repo(),
                    sub.shape_type.shape
                );

                // The JSON patches to send to JS-land.
                let mut patches: Vec<OrmDiffOp> = vec![];

                // Keep track of created objects by path and whether they need an id.
                // Later we create patches from them to ensure the objects exist.
                let mut paths_of_objects_to_create: HashSet<(Vec<String>, Option<SubjectIri>)> =
                    HashSet::new();

                // We construct object patches from a change (which is associated with a shape type):
                // {op: add, valType: object, value: Null, path: ...}
                // For each change that has a subject tracked in this subscription,
                // - Get the change operations (calling create_diff_ops_from_predicate_change).
                //   - case not object, single --> either add or remove (must be one of each at max)
                //   - case not object, multi  --> just add and/or set patch
                //   - case object, multi      --> create object patch + nested object patch (handled when recursing paths to add primitive values)
                //   - case object, single     --> just object patch (handled when recursing paths to add primitive values)
                // - Add patches for each change operation for the path of the change in the schema.
                //   We find the path by traversing the schema up to the parents (build_path_to_root_and_create_patches).

                // TODO: Special edge case: An object with parents changed and the parents' predicate schema has multiple allowed shapes.
                // Now, there are multiple tracked subjects with the same subject IRI but different shapes, of which some
                // are valid or invalid. The first valid (subject, shape) pair must be used for materialization.
                // - if a higher-priority shape became invalid but a lower-priority shape is valid, delete and add new.
                // - if a higher-priority shape became valid, delete and add new valid.
                // Problem: We might not have the triples present to materialize the newly valid object, so we need to fetch them.

                // Process changes for this subscription:
                // iterate over all changes and create patches.
                for (shape_iri, subject_changes) in &orm_changes {
                    for (subject_iri, change) in subject_changes {
                        // Get the tracked subject for this (subject, shape) pair.
                        let tracked_subject = sub
                            .tracked_subjects
                            .get(subject_iri)
                            .and_then(|shapes| shapes.get(shape_iri))
                            .map(|ts| ts.read().unwrap())
                            .unwrap();

                        // Now we have the tracked subject (containing the shape) and the change.
                        // Check validity changes.
                        if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Invalid
                            && tracked_subject.valid == OrmTrackedSubjectValidity::Invalid
                        {
                            // The subject is invalid and was invalid before? There is nothing to inform about.
                            continue;
                        } else if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Valid
                            && tracked_subject.valid == OrmTrackedSubjectValidity::Invalid
                            || tracked_subject.valid == OrmTrackedSubjectValidity::Untracked
                        {
                            // Has the subject become invalid or untracked?
                            // We add a patch, deleting the object at its root.
                            let mut path: Vec<String>;
                            if tracked_subject.parents.is_empty() {
                                // If this is a root object, we need to add the object's id itself.
                                path = vec![tracked_subject.subject_iri.clone()];
                            } else {
                                path = vec![];
                            }

                            build_path_to_root_and_create_patches(
                                &tracked_subject,
                                &sub.tracked_subjects,
                                &sub.shape_type.shape,
                                &mut path,
                                (OrmDiffOpType::remove, Some(OrmDiffType::object), None, None),
                                &mut patches,
                                &mut paths_of_objects_to_create,
                            );
                        } else {
                            // The subject is valid or has become valid.
                            // Process each predicate change.
                            for (_pred_iri, pred_change) in &change.predicates {
                                let tracked_predicate =
                                    pred_change.tracked_predicate.read().unwrap();
                                let pred_name = tracked_predicate.schema.readablePredicate.clone();

                                // Get the diff operations for this predicate change.
                                let diff_ops = create_diff_ops_from_predicate_change(pred_change);

                                // For each diff operation, traverse up to the root to build the path.
                                for diff_op in diff_ops {
                                    let mut path = vec![pred_name.clone()];

                                    // Start recursion from this tracked subject.
                                    build_path_to_root_and_create_patches(
                                        &tracked_subject,
                                        &sub.tracked_subjects,
                                        &sub.shape_type.shape,
                                        &mut path,
                                        diff_op,
                                        &mut patches,
                                        &mut paths_of_objects_to_create,
                                    );
                                }
                            }
                        }
                    }
                }

                // Create patches for objects that need to be created.
                // These are patches with {op: add, valType: object, value: Null, path: ...}.
                // Sort by path length (shorter first) to ensure parent objects are created before children.
                let mut sorted_object_paths: Vec<_> = paths_of_objects_to_create.iter().collect();
                sorted_object_paths.sort_by_key(|(path_segments, _)| path_segments.len());

                for (path_segments, maybe_iri) in sorted_object_paths {
                    let escaped_path: Vec<String> = path_segments
                        .iter()
                        .map(|seg| escape_json_pointer(seg))
                        .collect();
                    let json_pointer = format!("/{}", escaped_path.join("/"));

                    patches.push(OrmDiffOp {
                        op: OrmDiffOpType::add,
                        valType: Some(OrmDiffType::object),
                        path: json_pointer.clone(),
                        value: None,
                    });
                    if let Some(iri) = maybe_iri {
                        patches.push(OrmDiffOp {
                            op: OrmDiffOpType::add,
                            valType: Some(OrmDiffType::object),
                            path: format!("{}/id", json_pointer),
                            value: Some(json!(iri)),
                        });
                    }
                }

                // Send the response with the patches.
                let _ = sub
                    .sender
                    .clone()
                    .send(AppResponse::V0(AppResponseV0::OrmUpdate(patches.to_vec())))
                    .await;
            }
        }
    }
}

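For orientation, here is a minimal sketch of the kind of patch list a subscriber ends up receiving from `orm_backend_update`, written out as plain JSON with `serde_json`. The subject IRI, the `name` property, and the value `"Alice"` are invented for illustration, and the exact wire format depends on the serde derives of `OrmDiffOp`; the field names (`op`, `valType`, `path`, `value`) simply mirror the struct as used above.

```rust
use serde_json::json;

fn main() {
    // A root object became valid: ensure it exists, set its id,
    // then add a single-valued property under its readable predicate name.
    let patches = json!([
        { "op": "add", "valType": "object", "path": "/did:ng:o:abc", "value": null },
        { "op": "add", "valType": null, "path": "/did:ng:o:abc/id", "value": "did:ng:o:abc" },
        { "op": "add", "valType": null, "path": "/did:ng:o:abc/name", "value": "Alice" }
    ]);
    println!("{}", serde_json::to_string_pretty(&patches).unwrap());
}
```
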
/// Find the predicate schema linking a parent to a child tracked subject and build the path segment.
/// Returns the updated path if a linking predicate is found.
fn build_path_segment_for_parent(
    tracked_subject: &OrmTrackedSubject,
    parent_ts: &OrmTrackedSubject,
    base_path: &[String],
) -> Option<Vec<String>> {
    // Find the predicate schema linking the parent to this tracked subject.
    for pred_arc in &parent_ts.shape.predicates {
        // Check if this predicate has our subject as a child.
        if let Some(tracked_pred) = parent_ts.tracked_predicates.get(&pred_arc.iri) {
            let tp = tracked_pred.read().unwrap();

            // Check if this tracked subject is among the children.
            let is_child = tp.tracked_children.iter().any(|child| {
                let child_read = child.read().unwrap();
                child_read.subject_iri == tracked_subject.subject_iri
            });

            if is_child {
                // Build the path segment.
                let mut new_path = base_path.to_vec();

                let is_multi = pred_arc.maxCardinality > 1 || pred_arc.maxCardinality == -1;

                // For multi-valued predicates, add the object IRI as a key first.
                if is_multi {
                    new_path.insert(0, tracked_subject.subject_iri.clone());
                }

                // Add the readable predicate name.
                new_path.insert(0, pred_arc.readablePredicate.clone());

                return Some(new_path);
            }
        }
    }
    None
}

/// Recursively build the path from a tracked subject to the root and create diff operation patches.
/// The function recurses from the child through its parents down to a root tracked subject.
/// If multiple parents exist, it adds separate patches for each.
fn build_path_to_root_and_create_patches(
    tracked_subject: &OrmTrackedSubject,
    tracked_subjects: &HashMap<String, HashMap<String, Arc<RwLock<OrmTrackedSubject>>>>,
    root_shape: &String,
    path: &mut Vec<String>,
    diff_op: (
        OrmDiffOpType,
        Option<OrmDiffType>,
        Option<Value>,  // The value added / removed.
        Option<String>, // The IRI, if the change is an added / removed object.
    ),
    patches: &mut Vec<OrmDiffOp>,
    paths_of_objects_to_create: &mut HashSet<(Vec<String>, Option<SubjectIri>)>,
) {
    // If the tracked subject is not valid, we don't create patches for it.
    if tracked_subject.valid != OrmTrackedSubjectValidity::Valid {
        return;
    }

    // If the tracked subject is newly valid (was not valid before but is now),
    // we need to ensure the object is created with an "add object" patch.
    if tracked_subject.prev_valid != OrmTrackedSubjectValidity::Valid {
        // Check if we're at a root subject or need to traverse to parents.
        if tracked_subject.parents.is_empty() || tracked_subject.shape.iri == *root_shape {
            // At root: build the path with the subject IRI.
            let escaped_path: Vec<String> =
                path.iter().map(|seg| escape_json_pointer(seg)).collect();
            let json_pointer = format!(
                "/{}/{}",
                escape_json_pointer(&tracked_subject.subject_iri),
                escaped_path.join("/")
            );

            // Create an "add object" patch to ensure the object exists.
            patches.push(OrmDiffOp {
                op: OrmDiffOpType::add,
                valType: Some(OrmDiffType::object),
                path: json_pointer.clone(),
                value: None,
            });

            // Also add the id field for the object.
            patches.push(OrmDiffOp {
                op: OrmDiffOpType::add,
                valType: None,
                path: format!("{}/id", json_pointer),
                value: Some(json!(tracked_subject.subject_iri)),
            });
        } else {
            // Not at root: traverse to parents and create object patches along the way.
            for (_parent_iri, parent_tracked_subject) in tracked_subject.parents.iter() {
                let parent_ts = parent_tracked_subject.read().unwrap();

                if let Some(new_path) =
                    build_path_segment_for_parent(tracked_subject, &parent_ts, path)
                {
                    // Recurse to the parent first.
                    build_path_to_root_and_create_patches(
                        &parent_ts,
                        tracked_subjects,
                        root_shape,
                        &mut new_path.clone(),
                        (OrmDiffOpType::add, Some(OrmDiffType::object), None, None),
                        patches,
                        paths_of_objects_to_create,
                    );
                }
            }
        }
    }

    // If this subject has no parents or its shape matches the root shape, we've reached the root.
    if tracked_subject.parents.is_empty() || tracked_subject.shape.iri == *root_shape {
        // Build the final JSON Pointer path.
        let escaped_path: Vec<String> = path.iter().map(|seg| escape_json_pointer(seg)).collect();
        // Always add the root subject to the path.
        let json_pointer = format!(
            "/{}/{}",
            escape_json_pointer(&tracked_subject.subject_iri),
            escaped_path.join("/")
        );

        // Create the patch.
        patches.push(OrmDiffOp {
            op: diff_op.0.clone(),
            valType: diff_op.1.clone(),
            path: json_pointer.clone(),
            value: diff_op.2.clone(),
        });

        // // If a new object is created on a predicate where multiple ones are allowed, create the IRI path too.
        // if let Some(added_obj_iri) = diff_op.3 {
        //     patches.push(OrmDiffOp {
        //         op: diff_op.0.clone(),
        //         valType: diff_op.1.clone(),
        //         path: format!("{}/{}", json_pointer, escape_json_pointer(&added_obj_iri)),
        //         value: diff_op.2.clone(),
        //     });
        // }

        return;
    }

    // Recurse to parents.
    for (_parent_iri, parent_tracked_subject) in tracked_subject.parents.iter() {
        let parent_ts = parent_tracked_subject.read().unwrap();

        // Build the path segment for this parent.
        if let Some(new_path) = build_path_segment_for_parent(tracked_subject, &parent_ts, path) {
            // Recurse to the parent.
            build_path_to_root_and_create_patches(
                &parent_ts,
                tracked_subjects,
                root_shape,
                &mut new_path.clone(),
                diff_op.clone(),
                patches,
                paths_of_objects_to_create,
            );
        }
    }
}

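The path segments above are joined into an RFC 6901 JSON Pointer. The `escape_json_pointer` helper comes from `crate::orm::utils` and is not shown in this diff; a minimal sketch of what such a helper typically does, assuming plain RFC 6901 escaping, looks like this:

```rust
/// Hypothetical stand-in for crate::orm::utils::escape_json_pointer (not shown in this diff).
/// RFC 6901 requires "~" to become "~0" and "/" to become "~1" inside a pointer segment.
fn escape_json_pointer(segment: &str) -> String {
    segment.replace('~', "~0").replace('/', "~1")
}

fn main() {
    // An IRI used as a path segment keeps its colons but must escape any slashes.
    assert_eq!(
        escape_json_pointer("http://example.org/a"),
        "http:~1~1example.org~1a"
    );
    // Order matters: escaping "~" first avoids double-escaping the "~1" we just produced.
    assert_eq!(escape_json_pointer("a~/b"), "a~0~1b");
}
```
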
/// Create diff operations from a predicate change.
/// Returns a list of (op_type, value_type, value, iri) tuples.
fn create_diff_ops_from_predicate_change(
    pred_change: &OrmTrackedPredicateChanges,
) -> Vec<(
    OrmDiffOpType,
    Option<OrmDiffType>,
    Option<Value>,  // The value added / removed.
    Option<String>, // The IRI, if the change is an added / removed object.
)> {
    let tracked_predicate = pred_change.tracked_predicate.read().unwrap();

    let is_multi = tracked_predicate.schema.maxCardinality > 1
        || tracked_predicate.schema.maxCardinality == -1;
    let is_object = tracked_predicate
        .schema
        .dataTypes
        .iter()
        .any(|dt| dt.shape.is_some());

    let mut ops = vec![];

    if !is_multi && !is_object {
        if pred_change.values_added.len() == 1 {
            // A value was added. Another one might have been removed,
            // but the add patch overwrites previous values.
            return [(
                OrmDiffOpType::add,
                None,
                Some(json!(pred_change.values_added[0])),
                None,
            )]
            .to_vec();
        } else {
            // Since there is only one possible value, removing the path is enough.
            return [(OrmDiffOpType::remove, None, None, None)].to_vec();
        }
    } else if is_multi && !is_object {
        if pred_change.values_added.len() > 0 {
            ops.push((
                OrmDiffOpType::add,
                Some(OrmDiffType::set),
                Some(json!(pred_change.values_added)),
                None,
            ));
        }
        if pred_change.values_removed.len() > 0 {
            ops.push((
                OrmDiffOpType::remove,
                Some(OrmDiffType::set),
                Some(json!(pred_change.values_removed)),
                None,
            ));
        }
        return ops;
    }
    // else if !is_multi && is_object {
    //     if pred_change.values_added.len() > 0 {
    //         ops.push((OrmDiffOpType::add, Some(OrmDiffType::object), None, None));
    //     } else if pred_change.values_removed.len() > 0 {
    //         ops.push((OrmDiffOpType::remove, Some(OrmDiffType::object), None, None));
    //     }
    // } else if is_multi && is_object {
    //     for val_added in pred_change.values_added.iter() {
    //         let iri = match val_added {
    //             BasicType::Str(s) => s,
    //             _ => {
    //                 continue;
    //             }
    //         };
    //         ops.push((
    //             OrmDiffOpType::add,
    //             Some(OrmDiffType::object),
    //             None,
    //             Some(iri.clone()),
    //         ));
    //     }
    //     for val_removed in pred_change.values_added.iter() {
    //         let iri = match val_removed {
    //             BasicType::Str(s) => s,
    //             _ => {
    //                 continue;
    //             }
    //         };
    //         ops.push((
    //             OrmDiffOpType::remove,
    //             Some(OrmDiffType::object),
    //             None,
    //             Some(iri.clone()),
    //         ));
    //     }
    // }
    return ops;
}
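The branching in `create_diff_ops_from_predicate_change` reduces to a small decision table on `(is_multi, is_object)`. A condensed restatement of that mapping, using local stand-in names rather than the crate's real types (the object cases remain commented out above and are covered by the path recursion):

```rust
// Condensed restatement of the dispatch in create_diff_ops_from_predicate_change.
// DiffKind is a local stand-in, not a type from the crate.
#[derive(Debug, PartialEq)]
enum DiffKind {
    ReplaceSingleValue, // !multi, !object: one "add" overwrites, or a bare "remove"
    SetAddRemove,       // multi, !object: "add set" and/or "remove set"
    ObjectPatches,      // object-valued: handled by path recursion (commented out above)
}

fn diff_kind(is_multi: bool, is_object: bool) -> DiffKind {
    match (is_multi, is_object) {
        (false, false) => DiffKind::ReplaceSingleValue,
        (true, false) => DiffKind::SetAddRemove,
        (_, true) => DiffKind::ObjectPatches,
    }
}

fn main() {
    assert_eq!(diff_kind(false, false), DiffKind::ReplaceSingleValue);
    assert_eq!(diff_kind(true, true), DiffKind::ObjectPatches);
}
```
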
@@ -0,0 +1,115 @@
// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.

use ng_oxigraph::oxrdf::Quad;
use ng_repo::errors::VerifierError;

use std::u64;

use futures::SinkExt;
use ng_net::app_protocol::*;
pub use ng_net::orm::{OrmDiff, OrmShapeType};
use ng_repo::log::*;

use crate::orm::types::*;
use crate::verifier::*;

impl Verifier {
    /// After creating new objects (without an id) in JS-land,
    /// we send the generated ids for those back.
    /// If something went wrong (revert_inserts / revert_removes not empty),
    /// we send a JSON patch back to revert the changes that were made.
    pub(crate) async fn orm_update_self(
        &mut self,
        scope: &NuriV0,
        shape_iri: ShapeIri,
        session_id: u64,
        skolemnized_blank_nodes: Vec<Quad>,
        revert_inserts: Vec<Quad>,
        revert_removes: Vec<Quad>,
    ) -> Result<(), VerifierError> {
        let (mut sender, orm_subscription) =
            self.get_first_orm_subscription_sender_for(scope, Some(&shape_iri), Some(&session_id))?;

        // TODO: prepare OrmUpdateBlankNodeIds with skolemnized_blank_nodes
        // (use orm_subscription if needed).
        // note(niko): I think skolemnized blank nodes can still be many, in case of multi-level nested sub-objects.
        let orm_bnids = vec![];
        let _ = sender
            .send(AppResponse::V0(AppResponseV0::OrmUpdateBlankNodeIds(
                orm_bnids,
            )))
            .await;

        // TODO (later): revert the inserts and removes.
        // let orm_diff = vec![];
        // let _ = sender.send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff))).await;

        Ok(())
    }

    /// Handles updates coming from JS-land (JSON patches).
    pub(crate) async fn orm_frontend_update(
        &mut self,
        session_id: u64,
        scope: &NuriV0,
        shape_iri: ShapeIri,
        diff: OrmDiff,
    ) -> Result<(), String> {
        log_info!(
            "frontend_update_orm session={} scope={:?} shape={} diff={:?}",
            session_id,
            scope,
            shape_iri,
            diff
        );

        let (doc_nuri, sparql_update) = {
            let orm_subscription =
                self.get_first_orm_subscription_for(scope, Some(&shape_iri), Some(&session_id));

            // Use orm_subscription as needed:
            // do the magic, then find the doc where the query should start and generate the SPARQL update.
            let doc_nuri = NuriV0::new_empty();
            let sparql_update: String = String::new();
            (doc_nuri, sparql_update)
        };

        match self
            .process_sparql_update(
                &doc_nuri,
                &sparql_update,
                &None,
                self.get_peer_id_for_skolem(),
                session_id,
            )
            .await
        {
            Err(e) => Err(e),
            Ok((_, revert_inserts, revert_removes, skolemnized_blank_nodes)) => {
                if !revert_inserts.is_empty()
                    || !revert_removes.is_empty()
                    || !skolemnized_blank_nodes.is_empty()
                {
                    self.orm_update_self(
                        scope,
                        shape_iri,
                        session_id,
                        skolemnized_blank_nodes,
                        revert_inserts,
                        revert_removes,
                    )
                    .await
                    .map_err(|e| e.to_string())?;
                }
                Ok(())
            }
        }
    }
}
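Going the other way, `orm_frontend_update` receives an `OrmDiff` from JS-land and is meant to turn it into a SPARQL update; in the code above this is still a stub (`sparql_update` stays empty). Purely as an illustration of the intended round trip: a frontend edit such as setting a `name` property on an existing object would arrive as a patch in the same op/path shape the backend emits, and would eventually be translated into an update along these lines. Both the patch and the query below are invented for illustration; the predicate IRI and literal are not taken from the source.

```rust
use serde_json::json;

fn main() {
    // Hypothetical frontend patch: set the (invented) "name" property on an existing subject.
    let frontend_diff = json!([
        { "op": "add", "path": "/did:ng:o:abc/name", "value": "Alice" }
    ]);

    // One plausible SPARQL update the verifier could derive from it (illustrative only).
    let sparql_update = r#"
DELETE { <did:ng:o:abc> <http://example.org/name> ?old }
INSERT { <did:ng:o:abc> <http://example.org/name> "Alice" }
WHERE  { OPTIONAL { <did:ng:o:abc> <http://example.org/name> ?old } }
"#;

    println!("{frontend_diff}\n{sparql_update}");
}
```
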
@@ -0,0 +1,269 @@
// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.

use futures::SinkExt;
use ng_net::orm::*;
pub use ng_net::orm::{OrmDiff, OrmShapeType};
use ng_net::utils::Receiver;
use serde_json::json;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;

use crate::orm::query::shape_type_to_sparql;
use crate::orm::types::*;
use crate::orm::utils::nuri_to_string;
use crate::types::CancelFn;
use crate::verifier::Verifier;
use ng_net::app_protocol::{AppResponse, AppResponseV0, NuriV0};
use ng_net::orm::OrmSchemaShape;
use ng_repo::errors::NgError;
use ng_repo::log::*;
use std::u64;

use futures::channel::mpsc;

use crate::orm::{types::OrmTrackedSubjectChange, OrmChanges};

impl Verifier {
    /// Entry point to create a new ORM subscription.
    /// Triggers the creation of an ORM object which is sent back to the receiver.
    pub(crate) async fn start_orm(
        &mut self,
        nuri: &NuriV0,
        shape_type: &OrmShapeType,
        session_id: u64,
    ) -> Result<(Receiver<AppResponse>, CancelFn), NgError> {
        let (mut tx, rx) = mpsc::unbounded::<AppResponse>();

        // TODO: Validate schema:
        // If multiple data types are present for the same predicate, they must be of the same type.
        // All referenced shapes must be available.

        // Create a new subscription and add it to self.orm_subscriptions.
        let orm_subscription = OrmSubscription {
            shape_type: shape_type.clone(),
            session_id,
            sender: tx.clone(),
            tracked_subjects: HashMap::new(),
            nuri: nuri.clone(),
        };

        self.orm_subscriptions
            .entry(nuri.clone())
            .or_insert(vec![])
            .push(orm_subscription);

        let orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type)?;
        // log_debug!("create_orm_object_for_shape return {:?}", orm_objects);

        let _ = tx
            .send(AppResponse::V0(AppResponseV0::OrmInitial(orm_objects)))
            .await;

        let close = Box::new(move || {
            log_debug!("closing ORM subscription");
            if !tx.is_closed() {
                tx.close_channel();
            }
        });
        Ok((rx, close))
    }

    /// For a nuri, session, and shape, create an ORM JSON object.
    fn create_orm_object_for_shape(
        &mut self,
        nuri: &NuriV0,
        session_id: u64,
        shape_type: &OrmShapeType,
    ) -> Result<Value, NgError> {
        // Query triples for this shape.
        let shape_query = shape_type_to_sparql(&shape_type.schema, &shape_type.shape, None)?;
        let shape_triples = self.query_sparql_construct(shape_query, Some(nuri_to_string(nuri)))?;

        let changes: OrmChanges =
            self.apply_triple_changes(&shape_triples, &[], nuri, Some(session_id.clone()), true)?;

        let orm_subscription =
            self.get_first_orm_subscription_for(nuri, Some(&shape_type.shape), Some(&session_id));

        let schema: &HashMap<String, Arc<OrmSchemaShape>> = &orm_subscription.shape_type.schema;
        let root_shape = schema.get(&shape_type.shape).unwrap();
        let Some(_root_changes) = changes.get(&root_shape.iri).map(|s| s.values()) else {
            return Ok(Value::Array(vec![]));
        };

        let mut return_vals: Value = Value::Array(vec![]);
        let return_val_vec = return_vals.as_array_mut().unwrap();

        // log_debug!(
        //     "Tracked subjects:\n{:?}\n",
        //     orm_subscription.tracked_subjects,
        // );

        // For each valid change struct, we build an ORM object.
        // The way we get the changes from the tracked subjects is a bit hacky, sorry.
        for (subject_iri, tracked_subjects_by_shape) in &orm_subscription.tracked_subjects {
            if let Some(tracked_subject) = tracked_subjects_by_shape.get(&shape_type.shape) {
                let ts = tracked_subject.read().unwrap();
                log_info!("changes for: {:?} valid: {:?}\n", ts.subject_iri, ts.valid);

                if ts.valid == OrmTrackedSubjectValidity::Valid {
                    if let Some(change) = changes
                        .get(&shape_type.shape)
                        .and_then(|subject_iri_to_ts| subject_iri_to_ts.get(subject_iri).clone())
                    {
                        let new_val = materialize_orm_object(
                            change,
                            &changes,
                            root_shape,
                            &orm_subscription.tracked_subjects,
                        );
                        // TODO: For some reason, this log statement causes a panic.
                        // log_debug!("Materialized change:\n{:?}\ninto:\n{:?}", change, new_val);
                        return_val_vec.push(new_val);
                    }
                }
            }
        }

        return Ok(return_vals);
    }
}

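`start_orm` hands back the receiving half of an unbounded futures channel plus a boxed closure that closes it. A caller-side sketch of that pattern in isolation, with a generic `String` payload instead of the real `AppResponse` type, could look like this:

```rust
use futures::channel::mpsc;
use futures::{executor::block_on, SinkExt, StreamExt};

fn main() {
    // Same shape as start_orm's return: a receiver plus a cancel closure that closes the channel.
    let (mut tx, mut rx) = mpsc::unbounded::<String>();
    let cancel = {
        let tx = tx.clone();
        Box::new(move || {
            if !tx.is_closed() {
                tx.close_channel();
            }
        })
    };

    block_on(async {
        tx.send("initial ORM object".to_string()).await.unwrap();
        // The subscriber decides to stop; further sends would fail, but queued
        // messages are still delivered before the stream ends with `None`.
        cancel();
        while let Some(msg) = rx.next().await {
            println!("received: {msg}");
        }
    });
}
```
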
/// Create an ORM JSON object from an OrmTrackedSubjectChange and a shape.
pub(crate) fn materialize_orm_object(
    change: &OrmTrackedSubjectChange,
    changes: &OrmChanges,
    shape: &OrmSchemaShape,
    tracked_subjects: &HashMap<String, HashMap<String, Arc<RwLock<OrmTrackedSubject>>>>,
) -> Value {
    let mut orm_obj = json!({"id": change.subject_iri});
    let orm_obj_map = orm_obj.as_object_mut().unwrap();
    for pred_schema in &shape.predicates {
        let property_name = &pred_schema.readablePredicate;
        let is_multi = pred_schema.maxCardinality > 1 || pred_schema.maxCardinality == -1;

        let Some(pred_change) = change.predicates.get(&pred_schema.iri) else {
            // No triples for this property.

            if pred_schema.minCardinality == 0 && is_multi {
                // If this predicate schema is an array though, insert an empty array.
                orm_obj_map.insert(property_name.clone(), Value::Array(vec![]));
            }

            continue;
        };

        if pred_schema
            .dataTypes
            .iter()
            .any(|dt| dt.valType == OrmSchemaLiteralType::shape)
        {
            // We have a nested type.

            // Helper closure to create Value structs from a nested object_iri.
            let get_nested_orm_obj = |object_iri: &SubjectIri| {
                // Find allowed schemas for the predicate's datatype.
                let shape_iris: Vec<ShapeIri> = pred_schema
                    .dataTypes
                    .iter()
                    .flat_map(|dt| dt.shape.clone())
                    .collect();

                // Find the subject_change for this subject. There exists at least one (shape, subject) pair.
                // If multiple allowed shapes exist, the first one is chosen.
                let nested = shape_iris.iter().find_map(|shape_iri| {
                    changes
                        .get(shape_iri)
                        .and_then(|subject_changes| subject_changes.get(object_iri))
                        .map(|ch| (shape_iri, ch))
                });

                if let Some((matched_shape_iri, nested_subject_change)) = nested {
                    if let Some(nested_tracked_subject) = tracked_subjects
                        .get(&nested_subject_change.subject_iri)
                        .and_then(|shape_to_tracked_orm| {
                            shape_to_tracked_orm.get(matched_shape_iri)
                        })
                    {
                        let nested_tracked_subject = nested_tracked_subject.read().unwrap();
                        if nested_tracked_subject.valid == OrmTrackedSubjectValidity::Valid {
                            // Recurse.
                            return Some(materialize_orm_object(
                                nested_subject_change,
                                changes,
                                &nested_tracked_subject.shape,
                                tracked_subjects,
                            ));
                        }
                    }
                }
                None
            };

            if is_multi {
                // Represent nested objects with more than one child
                // as a map/object of <IRI of nested object> -> nested object,
                // since there is no conceptual ordering of the children.
                let mut nested_objects_map = serde_json::Map::new();

                // Add each nested object.
                for new_val in &pred_change.values_added {
                    if let BasicType::Str(object_iri) = new_val {
                        if let Some(nested_orm_obj) = get_nested_orm_obj(object_iri) {
                            nested_objects_map.insert(object_iri.clone(), nested_orm_obj);
                        }
                    }
                }
                orm_obj_map.insert(property_name.clone(), Value::Object(nested_objects_map));
            } else {
                if let Some(BasicType::Str(object_iri)) = pred_change.values_added.get(0) {
                    if let Some(nested_orm_obj) = get_nested_orm_obj(object_iri) {
                        orm_obj_map.insert(property_name.clone(), nested_orm_obj);
                    }
                }
            }
        } else {
            // We have a basic type (string, number, bool, literal).

            if is_multi {
                // Add values as an array.
                orm_obj_map.insert(
                    property_name.clone(),
                    Value::Array(
                        pred_change
                            .values_added
                            .iter()
                            .map(|v| match v {
                                BasicType::Bool(b) => json!(*b),
                                BasicType::Num(n) => json!(*n),
                                BasicType::Str(s) => json!(s),
                            })
                            .collect(),
                    ),
                );
            } else {
                // Add the value as a primitive, if present.
                if let Some(val) = pred_change.values_added.get(0) {
                    orm_obj_map.insert(
                        property_name.clone(),
                        match val {
                            BasicType::Bool(b) => json!(*b),
                            BasicType::Num(n) => json!(*n),
                            BasicType::Str(s) => json!(s),
                        },
                    );
                }
            }
        }
    }

    return orm_obj;
}
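To make the output shape concrete: for a hypothetical shape with a single-valued `name`, a multi-valued `tags`, and a multi-valued nested `friends` shape, the materialized value would look roughly like the JSON below. Nested multi-valued objects are keyed by their IRI rather than ordered in an array, exactly as the `is_multi` branch above builds them; all IRIs and property names here are invented.

```rust
use serde_json::json;

fn main() {
    let materialized = json!({
        "id": "did:ng:o:alice",
        "name": "Alice",
        "tags": ["admin", "musician"],
        // Multi-valued nested objects: a map keyed by the nested subject's IRI.
        "friends": {
            "did:ng:o:bob": { "id": "did:ng:o:bob", "name": "Bob" }
        }
    });
    println!("{}", serde_json::to_string_pretty(&materialized).unwrap());
}
```
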
File diff suppressed because it is too large
@@ -0,0 +1,387 @@
// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.

use futures::channel::mpsc::UnboundedSender;
use ng_repo::errors::VerifierError;

use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::u64;

pub use ng_net::orm::{OrmDiff, OrmShapeType};
use ng_net::{app_protocol::*, orm::*};
use ng_oxigraph::oxrdf::Triple;
use ng_repo::errors::NgError;
use ng_repo::log::*;

use crate::orm::add_remove_triples::add_remove_triples;
use crate::orm::query::shape_type_to_sparql;
use crate::orm::types::*;
use crate::orm::utils::*;
use crate::orm::OrmChanges;
use crate::verifier::*;

impl Verifier {
    /// Apply triples to a nuri's document.
    /// Updates tracked_subjects in orm_subscriptions.
    pub(crate) fn apply_triple_changes(
        &mut self,
        triples_added: &[Triple],
        triples_removed: &[Triple],
        nuri: &NuriV0,
        only_for_session_id: Option<u64>,
        data_already_fetched: bool,
    ) -> Result<OrmChanges, NgError> {
        log_debug!("apply_triple_changes {:?}", only_for_session_id);
        // If we have a specific session, handle only that subscription.
        if let Some(session_id) = only_for_session_id {
            return self.process_changes_for_nuri_and_session(
                &nuri.clone(),
                session_id,
                triples_added,
                triples_removed,
                data_already_fetched,
            );
        }

        // Otherwise, iterate all sessions.
        let mut merged: OrmChanges = HashMap::new();

        let session_ids: Vec<_> = self
            .orm_subscriptions
            .get(nuri)
            .unwrap()
            .iter()
            .map(|s| s.session_id.clone())
            .collect();

        for session_id in session_ids {
            let changes = self.process_changes_for_nuri_and_session(
                &nuri,
                session_id,
                triples_added,
                triples_removed,
                data_already_fetched,
            )?;

            for (shape_iri, subj_map) in changes {
                merged
                    .entry(shape_iri)
                    .or_insert_with(HashMap::new)
                    .extend(subj_map);
            }
        }
        Ok(merged)
    }

    /// Helper to call process_changes_for_shape for all subscriptions on a nuri's document.
    fn process_changes_for_nuri_and_session(
        self: &mut Self,
        nuri: &NuriV0,
        session_id: u64,
        triples_added: &[Triple],
        triples_removed: &[Triple],
        data_already_fetched: bool,
    ) -> Result<OrmChanges, NgError> {
        let mut orm_changes = HashMap::new();

        let shapes: Vec<_> = self
            .orm_subscriptions
            .get(nuri)
            .unwrap()
            .iter()
            .map(|sub| {
                sub.shape_type
                    .schema
                    .get(&sub.shape_type.shape)
                    .unwrap()
                    .clone()
            })
            .collect();

        for root_shape in shapes {
            self.process_changes_for_shape_and_session(
                nuri,
                root_shape,
                session_id,
                triples_added,
                triples_removed,
                &mut orm_changes,
                data_already_fetched,
            )?;
        }

        Ok(orm_changes)
    }

    /// Add and remove the triples from the tracked subjects,
    /// re-validate, and update `changes` with the updated data.
    /// Works by queuing changes by shape and subject on a stack;
    /// nested objects are added to the stack as they are discovered.
    pub(crate) fn process_changes_for_shape_and_session(
        self: &mut Self,
        nuri: &NuriV0,
        root_shape: Arc<OrmSchemaShape>,
        session_id: u64,
        triples_added: &[Triple],
        triples_removed: &[Triple],
        orm_changes: &mut OrmChanges,
        data_already_fetched: bool,
    ) -> Result<(), NgError> {
        // First-in, last-out stack to keep track of objects to validate (nested objects first). Strings are object IRIs.
        let mut shape_validation_stack: Vec<(Arc<OrmSchemaShape>, Vec<String>)> = vec![];
        // Track (shape_iri, subject_iri) pairs currently being validated to prevent cycles and double evaluation.
        let mut currently_validating: HashSet<(String, String)> = HashSet::new();
        // Add the root shape for the first validation run.
        let root_shape_iri = root_shape.iri.clone();
        shape_validation_stack.push((root_shape, vec![]));

        // Process the queue of shapes and subjects to validate.
        // For a given shape, we evaluate every subject against that shape.
        while let Some((shape, objects_to_validate)) = shape_validation_stack.pop() {
            // Collect triples relevant for validation.
            let added_triples_by_subject =
                group_by_subject_for_shape(&shape, triples_added, &objects_to_validate);
            let removed_triples_by_subject =
                group_by_subject_for_shape(&shape, triples_removed, &objects_to_validate);
            let modified_subject_iris: HashSet<&SubjectIri> = added_triples_by_subject
                .keys()
                .chain(removed_triples_by_subject.keys())
                .collect();

            let mut orm_subscription = self
                .orm_subscriptions
                .get_mut(nuri)
                .unwrap()
                .iter_mut()
                .find(|sub| sub.session_id == session_id && sub.shape_type.shape == root_shape_iri)
                .unwrap();

            // Collect nested objects that need validation.
            let mut nested_objects_to_eval: HashMap<ShapeIri, Vec<(SubjectIri, bool)>> =
                HashMap::new();

            // For each subject, add/remove triples and validate.
            log_debug!(
                "processing modified subjects: {:?} against shape: {}",
                modified_subject_iris,
                shape.iri
            );

            for subject_iri in &modified_subject_iris {
                let validation_key = (shape.iri.clone(), subject_iri.to_string());

                // Cycle detection: check if this (shape, subject) pair is already being validated.
                if currently_validating.contains(&validation_key) {
                    log_warn!(
                        "Cycle detected: subject '{}' with shape '{}' is already being validated. Marking as invalid.",
                        subject_iri,
                        shape.iri
                    );
                    // Mark as invalid due to the cycle.
                    // TODO: We could handle this by treating nested references as IRIs.
                    if let Some(tracked_shapes) =
                        orm_subscription.tracked_subjects.get(*subject_iri)
                    {
                        if let Some(tracked_subject) = tracked_shapes.get(&shape.iri) {
                            let mut ts = tracked_subject.write().unwrap();
                            ts.valid = OrmTrackedSubjectValidity::Invalid;
                            ts.tracked_predicates.clear();
                        }
                    }
                    continue;
                }

                // Mark as currently validating.
                currently_validating.insert(validation_key.clone());

                // Get the triples of the subject (added & removed).
                let triples_added_for_subj = added_triples_by_subject
                    .get(*subject_iri)
                    .map(|v| v.as_slice())
                    .unwrap_or(&[]);
                let triples_removed_for_subj = removed_triples_by_subject
                    .get(*subject_iri)
                    .map(|v| v.as_slice())
                    .unwrap_or(&[]);

                // Get or create the change object for the (shape, subject) pair.
                let change = orm_changes
                    .entry(shape.iri.clone())
                    .or_insert_with(HashMap::new)
                    .entry((*subject_iri).clone())
                    .or_insert_with(|| OrmTrackedSubjectChange {
                        subject_iri: (*subject_iri).clone(),
                        predicates: HashMap::new(),
                        data_applied: false,
                    });

                // Apply all triples for that subject to the tracked (shape, subject) pair.
                // Record the changes.
                {
                    if !change.data_applied {
                        log_debug!(
                            "Adding triples to change tracker for subject {}",
                            subject_iri
                        );
                        if let Err(e) = add_remove_triples(
                            shape.clone(),
                            subject_iri,
                            triples_added_for_subj,
                            triples_removed_for_subj,
                            &mut orm_subscription,
                            change,
                        ) {
                            log_err!("apply_changes_from_triples add/remove error: {:?}", e);
                            panic!();
                        }
                        change.data_applied = true;
                    } else {
                        log_debug!("not applying triples again for subject {subject_iri}");
                    }

                    // Validate the subject.
                    let need_eval =
                        Self::update_subject_validity(change, &shape, &mut orm_subscription);

                    // We add need_eval to be processed after the loop.
                    // Filter out subjects already in the validation stack to prevent double evaluation.
                    for (iri, schema_shape, needs_refetch) in need_eval {
                        let eval_key = (schema_shape.clone(), iri.clone());
                        if !currently_validating.contains(&eval_key) {
                            // Only add if not currently being validated.
                            nested_objects_to_eval
                                .entry(schema_shape)
                                .or_insert_with(Vec::new)
                                .push((iri.clone(), needs_refetch));
                        }
                    }
                }
            }

            // Now, we queue all non-evaluated objects.
            for (shape_iri, objects_to_eval) in &nested_objects_to_eval {
                let orm_subscription = self.get_first_orm_subscription_for(
                    nuri,
                    Some(&root_shape_iri),
                    Some(&session_id),
                );
                // Extract the schema and shape Arc before the mutable borrow.
                let schema = orm_subscription.shape_type.schema.clone();
                let shape_arc = schema.get(shape_iri).unwrap().clone();

                // Data might need to be fetched (if it has not been during initialization or a nested shape fetch).
                if !data_already_fetched {
                    let objects_to_fetch = objects_to_eval
                        .iter()
                        .filter(|(_iri, needs_fetch)| *needs_fetch)
                        .map(|(s, _)| s.clone())
                        .collect();

                    // Create the SPARQL query.
                    let shape_query =
                        shape_type_to_sparql(&schema, &shape_iri, Some(objects_to_fetch))?;
                    let new_triples =
                        self.query_sparql_construct(shape_query, Some(nuri_to_string(nuri)))?;

                    // Recursively process nested objects.
                    self.process_changes_for_shape_and_session(
                        nuri,
                        shape_arc.clone(),
                        session_id,
                        &new_triples,
                        &vec![],
                        orm_changes,
                        true,
                    )?;
                }

                // Add objects.
                let objects_not_to_fetch: Vec<String> = objects_to_eval
                    .iter()
                    .filter(|(_iri, needs_fetch)| !*needs_fetch)
                    .map(|(s, _)| s.clone())
                    .collect();
                if objects_not_to_fetch.len() > 0 {
                    // Queue all objects that don't need fetching.
                    shape_validation_stack.push((shape_arc, objects_not_to_fetch));
                }
            }
            for subject_iri in modified_subject_iris {
                let validation_key = (shape.iri.clone(), subject_iri.to_string());
                currently_validating.remove(&validation_key);
            }
        }

        Ok(())
    }

    /// Helper to get the ORM subscriptions for a nuri, shape, and session.
    pub fn get_orm_subscriptions_for(
        &self,
        nuri: &NuriV0,
        shape: Option<&ShapeIri>,
        session_id: Option<&u64>,
    ) -> Vec<&OrmSubscription> {
        self.orm_subscriptions
            .get(nuri)
            .unwrap()
            .iter()
            // Filter by shape, if present.
            .filter(|s| match shape {
                Some(sh) => *sh == s.shape_type.shape,
                None => true,
            })
            // Filter by session id, if present.
            .filter(|s| match session_id {
                Some(id) => *id == s.session_id,
                None => true,
            })
            .collect()
    }

    pub fn get_first_orm_subscription_for(
        &self,
        nuri: &NuriV0,
        shape: Option<&ShapeIri>,
        session_id: Option<&u64>,
    ) -> &OrmSubscription {
        self.orm_subscriptions
            .get(nuri)
            .unwrap()
            .iter()
            // Filter by shape, if present.
            .filter(|s| match shape {
                Some(sh) => *sh == s.shape_type.shape,
                None => true,
            })
            // Filter by session id, if present.
            .filter(|s| match session_id {
                Some(id) => *id == s.session_id,
                None => true,
            })
            .next()
            .unwrap()
    }

    pub fn get_first_orm_subscription_sender_for(
        &mut self,
        nuri: &NuriV0,
        shape: Option<&ShapeIri>,
        session_id: Option<&u64>,
    ) -> Result<(UnboundedSender<AppResponse>, &OrmSubscription), VerifierError> {
        let subs = self.orm_subscriptions.get_mut(nuri).unwrap();
        subs.retain(|sub| !sub.sender.is_closed());
        match subs
            .iter()
            // Filter by shape, if present.
            .filter(|s| match shape {
                Some(sh) => *sh == s.shape_type.shape,
                None => true,
            })
            // Filter by session id, if present.
            .filter(|s| match session_id {
                Some(id) => *id == s.session_id,
                None => true,
            })
            .next()
        {
            None => Err(VerifierError::OrmSubscriptionNotFound),
            Some(subscription) => Ok((subscription.sender.clone(), subscription)),
        }
    }
}
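The validation loop in `process_changes_for_shape_and_session` is a worklist algorithm: a stack of (shape, subjects) work items plus a `currently_validating` set that breaks cycles when shapes reference each other. Stripped of the ORM specifics, the control flow follows the familiar pattern below (generic node graph, nothing NextGraph-specific; the guard is simplified here to a plain visited set):

```rust
use std::collections::{HashMap, HashSet};

/// Generic worklist traversal with a cycle guard: pop a work item, process it,
/// push the children it discovers, and skip anything already seen.
fn traverse(start: &str, children: &HashMap<&str, Vec<&str>>) -> Vec<String> {
    let mut stack = vec![start.to_string()];
    let mut guard: HashSet<String> = HashSet::new();
    let mut visited_order = Vec::new();

    while let Some(node) = stack.pop() {
        if !guard.insert(node.clone()) {
            // Cycle (or duplicate) detected: skip instead of recursing forever.
            continue;
        }
        visited_order.push(node.clone());
        for child in children.get(node.as_str()).into_iter().flatten() {
            if !guard.contains(*child) {
                stack.push((*child).to_string());
            }
        }
    }
    visited_order
}

fn main() {
    // a -> b -> a forms a cycle; the guard ensures each node is processed once.
    let graph = HashMap::from([("a", vec!["b"]), ("b", vec!["a"])]);
    assert_eq!(traverse("a", &graph), vec!["a".to_string(), "b".to_string()]);
}
```
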
@@ -0,0 +1,322 @@
// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.

use lazy_static::lazy_static;
use ng_repo::errors::VerifierError;
use regex::Regex;

use std::collections::HashSet;

pub use ng_net::orm::{OrmDiff, OrmShapeType};

use crate::orm::types::*;
use crate::verifier::*;
use ng_net::orm::*;
use ng_oxigraph::oxigraph::sparql::{Query, QueryResults};
use ng_oxigraph::oxrdf::Triple;
use ng_repo::errors::NgError;
use ng_repo::log::*;

impl Verifier {
    pub fn query_sparql_construct(
        &self,
        query: String,
        nuri: Option<String>,
    ) -> Result<Vec<Triple>, NgError> {
        let oxistore = self.graph_dataset.as_ref().unwrap();

        // let graph_nuri = NuriV0::repo_graph_name(
        //     &update.repo_id,
        //     &update.overlay_id,
        // );
        // let base = NuriV0::repo_id(&repo.id);

        let nuri_str = nuri.as_ref().map(|s| s.as_str());
        log_debug!(
            "querying construct\n{}\n{}\n",
            nuri_str.unwrap_or("(no base)"),
            query
        );

        let parsed =
            Query::parse(&query, nuri_str).map_err(|e| NgError::OxiGraphError(e.to_string()))?;
        let results = oxistore
            .query(parsed, nuri)
            .map_err(|e| NgError::OxiGraphError(e.to_string()))?;
        match results {
            QueryResults::Graph(triples) => {
                let mut result_triples: Vec<Triple> = vec![];
                for t in triples {
                    match t {
                        Err(e) => {
                            log_err!("{}", e.to_string());
                            return Err(NgError::SparqlError(e.to_string()));
                        }
                        Ok(triple) => {
                            log_debug!("Triple fetched: {:?}", triple);
                            result_triples.push(triple);
                        }
                    }
                }
                Ok(result_triples)
            }
            _ => return Err(NgError::InvalidResponse),
        }
    }
}

/// Heuristic: consider a string an IRI if it starts with a letter,
/// followed by up to 12 further scheme characters and a colon.
pub fn is_iri(s: &str) -> bool {
    lazy_static! {
        static ref IRI_REGEX: Regex = Regex::new(r"^[A-Za-z][A-Za-z0-9+\.\-]{1,12}:").unwrap();
    }
    IRI_REGEX.is_match(s)
}

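A quick check of what the heuristic accepts and rejects, assuming the `is_iri` shown above is in scope (it is `pub` in this module); the behaviour follows directly from the regex:

```rust
fn main() {
    assert!(is_iri("did:ng:o:abc"));
    assert!(is_iri("http://example.org/x"));
    assert!(!is_iri("just a plain string"));
    // A space breaks the scheme part, so a colon later in the string does not count.
    assert!(!is_iri("this string: has a late colon"));
}
```
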
pub fn literal_to_sparql_str(var: OrmSchemaDataType) -> Vec<String> {
    match var.literals {
        None => [].to_vec(),
        Some(literals) => literals
            .iter()
            .map(|literal| match literal {
                BasicType::Bool(val) => {
                    if *val {
                        "true".to_string()
                    } else {
                        "false".to_string()
                    }
                }
                BasicType::Num(number) => number.to_string(),
                BasicType::Str(string) => {
                    if is_iri(string) {
                        format!("<{}>", string)
                    } else {
                        format!("\"{}\"", escape_literal(string))
                    }
                }
            })
            .collect(),
    }
}

pub fn shape_type_to_sparql(
    schema: &OrmSchema,
    shape: &ShapeIri,
    filter_subjects: Option<Vec<String>>,
) -> Result<String, NgError> {
    // Use a counter to generate unique variable names.
    let mut var_counter = 0;
    fn get_new_var_name(counter: &mut i32) -> String {
        let name = format!("v{}", counter);
        *counter += 1;
        name
    }

    // Collect all statements to be added to the CONSTRUCT and WHERE bodies.
    let mut construct_statements = Vec::new();
    let mut where_statements = Vec::new();

    // Keep track of visited shapes while recursing to prevent infinite loops.
    let mut visited_shapes: HashSet<ShapeIri> = HashSet::new();

    // Recursive function to call for (nested) shapes.
    // Returns nested WHERE statements that should be included with this shape's binding.
    fn process_shape(
        schema: &OrmSchema,
        shape: &OrmSchemaShape,
        subject_var_name: &str,
        construct_statements: &mut Vec<String>,
        where_statements: &mut Vec<String>,
        var_counter: &mut i32,
        visited_shapes: &mut HashSet<String>,
        in_recursion: bool,
    ) -> Vec<String> {
        // Prevent infinite recursion on cyclic schemas.
        // TODO: We could handle this as an IRI string reference.
        if visited_shapes.contains(&shape.iri) {
            return vec![];
        }

        let mut new_where_statements: Vec<String> = vec![];
        let mut new_construct_statements: Vec<String> = vec![];

        visited_shapes.insert(shape.iri.clone());

        // Add statements for each predicate.
        // If we are in recursion, we want to get all triples.
        // That's why we add a "<subject> ?p ?o" statement afterwards
        // and the extra construct statements are skipped.
        for predicate in &shape.predicates {
            let mut union_branches = Vec::new();
            let mut nested_where_statements = Vec::new();

            // Predicate constraints might have more than one acceptable nested shape. Traverse each.
            for datatype in &predicate.dataTypes {
                if datatype.valType == OrmSchemaLiteralType::shape {
                    let shape_iri = &datatype.shape.clone().unwrap();
                    let nested_shape = schema.get(shape_iri).unwrap();

                    // For the current acceptable shape, add CONSTRUCT, WHERE, and recurse.

                    // Each shape option gets its own var.
                    let obj_var_name = get_new_var_name(var_counter);

                    if !in_recursion {
                        new_construct_statements.push(format!(
                            " ?{} <{}> ?{}",
                            subject_var_name, predicate.iri, obj_var_name
                        ));
                    }
                    // These are later added to a UNION, if there is more than one shape.
                    union_branches.push(format!(
                        " ?{} <{}> ?{}",
                        subject_var_name, predicate.iri, obj_var_name
                    ));

                    // Recurse to add statements for the nested object.
                    // Collect nested WHERE statements to include within this predicate's scope.
                    let nested_stmts = process_shape(
                        schema,
                        nested_shape,
                        &obj_var_name,
                        construct_statements,
                        where_statements,
                        var_counter,
                        visited_shapes,
                        true,
                    );
                    nested_where_statements.extend(nested_stmts);
                }
            }

            // The WHERE statement (which may be wrapped in OPTIONAL).
            let where_body: String;

            if !union_branches.is_empty() {
                // We have nested shape(s) which were already added to CONSTRUCT above.
                // Join them with UNION and include nested WHERE statements.

                let union_body = union_branches
                    .into_iter()
                    .map(|b| format!("{{\n{}\n}}", b))
                    .collect::<Vec<_>>()
                    .join(" UNION ");

                // Combine the parent binding with nested statements.
                if !nested_where_statements.is_empty() {
                    let nested_joined = nested_where_statements.join(" .\n");
                    where_body = format!("{} .\n{}", union_body, nested_joined);
                } else {
                    where_body = union_body;
                }
            } else {
                // Regular predicate data type. Just add basic CONSTRUCT and WHERE statements.

                let obj_var_name = get_new_var_name(var_counter);
                if !in_recursion {
                    // Only add the construct statement if we don't have a catch-all statement already.
                    new_construct_statements.push(format!(
                        " ?{} <{}> ?{}",
                        subject_var_name, predicate.iri, obj_var_name
                    ));
                }
                where_body = format!(
                    " ?{} <{}> ?{}",
                    subject_var_name, predicate.iri, obj_var_name
                );
            }

            // Wrap in OPTIONAL, if the predicate is optional.
            if predicate.minCardinality < 1 {
                new_where_statements.push(format!(" OPTIONAL {{\n{}\n }}", where_body));
            } else {
                new_where_statements.push(where_body);
            };
        }

        if in_recursion {
            // All statements in recursive objects need to be optional
            // because we want to fetch _all_ nested objects,
            // invalid ones too, for later validation.
            let pred_var_name = get_new_var_name(var_counter);
            let obj_var_name = get_new_var_name(var_counter);

            // The "catch any triple of the subject" construct statement.
            construct_statements.push(format!(
                " ?{} ?{} ?{}",
                subject_var_name, pred_var_name, obj_var_name
            ));

            let joined_where_statements = new_where_statements.join(" .\n");

            // Return nested statements to be included in the parent's scope.
            // Combine the catch-all with the specific predicates in a UNION.
            let nested_block = format!(
                " {{\n {{?{} ?{} ?{}}}\n UNION {{\n {}\n }}\n }}",
                subject_var_name, pred_var_name, obj_var_name, joined_where_statements
            );
            visited_shapes.remove(&shape.iri);
            return vec![nested_block];
        } else {
            where_statements.append(&mut new_where_statements);
            construct_statements.append(&mut new_construct_statements);
        }
        visited_shapes.remove(&shape.iri);
        vec![]
    }

    let root_shape = schema.get(shape).ok_or(VerifierError::InvalidOrmSchema)?;

    // Root subject variable name.
    let root_var_name = get_new_var_name(&mut var_counter);

    process_shape(
        schema,
        root_shape,
        &root_var_name,
        &mut construct_statements,
        &mut where_statements,
        &mut var_counter,
        &mut visited_shapes,
        false,
    );

    // Filter subjects, if present.
    if let Some(subjects) = filter_subjects {
        // log_debug!("filter_subjects: {:?}", subjects);
        let subjects_str = subjects
            .iter()
            .map(|s| format!("<{}>", s))
            .collect::<Vec<_>>()
            .join(", ");
        where_statements.push(format!(" FILTER(?v0 IN ({}))", subjects_str));
    }

    // Create the query from the statements.
    let construct_body = construct_statements.join(" .\n");

    let where_body = where_statements.join(" .\n");

    Ok(format!(
        "CONSTRUCT {{\n{}\n}}\nWHERE {{\n{}\n}}",
        construct_body, where_body
    ))
}

/// SPARQL literal escape: backslash, quotes, newlines, tabs.
fn escape_literal(lit: &str) -> String {
    let mut out = String::with_capacity(lit.len() + 4);
    for c in lit.chars() {
        match c {
            '\\' => out.push_str("\\\\"),
            '\"' => out.push_str("\\\""),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            _ => out.push(c),
        }
    }
    return out;
}
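For a single flat shape with one required predicate and one optional predicate (no nested shapes, no subject filter), `shape_type_to_sparql` produces a query of roughly the following form. The predicate IRIs are invented, and the exact whitespace depends on the format strings above, which lost their original spacing in this rendering.

```rust
fn main() {
    // Illustrative output of shape_type_to_sparql for a flat, two-predicate shape.
    let expected = r#"CONSTRUCT {
 ?v0 <http://example.org/name> ?v1 .
 ?v0 <http://example.org/age> ?v2
}
WHERE {
 ?v0 <http://example.org/name> ?v1 .
 OPTIONAL {
 ?v0 <http://example.org/age> ?v2
 }
}"#;
    println!("{expected}");
}
```
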
File diff suppressed because it is too large