diff --git a/engine/verifier/src/orm/handle_backend_update.rs b/engine/verifier/src/orm/handle_backend_update.rs index 683c5c7..b8a2b0e 100644 --- a/engine/verifier/src/orm/handle_backend_update.rs +++ b/engine/verifier/src/orm/handle_backend_update.rs @@ -136,400 +136,6 @@ impl Verifier { let mut paths_of_objects_to_create: HashSet<(Vec, Option)> = HashSet::new(); - // Function to create diff objects from a given change. - // The function recurses from child to parents down to a root tracked subject. - // If multiple parents exist, it adds separate patches for each. - fn add_diff_ops_for_tracked_subject( - tracked_subject: &OrmTrackedSubject, - tracked_subjects: &HashMap< - String, - HashMap>>, - >, - root_shape: &String, - path: &mut Vec, - diff_op: ( - OrmDiffOpType, - Option, - Option, // The value added / removed - Option, // The IRI, if change is an added / removed object. - ), - patches: &mut Vec, - paths_of_objects_to_create: &mut HashSet<(Vec, Option)>, - ) { - // If this subject has no parents or its shape matches the root shape, we've reached the root - if tracked_subject.parents.is_empty() - || tracked_subject.shape.iri == *root_shape - { - // Build the final JSON Pointer path - let escaped_path: Vec = - path.iter().map(|seg| escape_json_pointer(seg)).collect(); - let json_pointer = format!("/{}", escaped_path.join("/")); - - // Create the patch - let patch = OrmDiffOp { - op: diff_op.0.clone(), - valType: diff_op.1.clone(), - path: json_pointer, - value: diff_op.2.clone(), - }; - patches.push(patch); - - return; - } - - // Recurse to parents - for (parent_iri, parent_tracked_subject) in tracked_subject.parents.iter() { - // Get predicate schema linking parent with tracked_subject - - // Use predicate schema readable_predicate to add to path. - // If predicate schema is multi, add our own subject iri to path first. - - // If parent is root, we don't need to recurse. 
- // Instead we add new patches based on the path (we need to escape segments before) - // and the diff_op content - - let parent_ts = parent_tracked_subject.read().unwrap(); - - // Find the predicate schema linking parent to this tracked subject - for pred_arc in &parent_ts.shape.predicates { - // Check if this predicate has our subject as a child - if let Some(tracked_pred) = - parent_ts.tracked_predicates.get(&pred_arc.iri) - { - let tp = tracked_pred.read().unwrap(); - - // Check if this tracked subject is in the children - let is_child = tp.tracked_children.iter().any(|child| { - let child_read = child.read().unwrap(); - child_read.subject_iri == tracked_subject.subject_iri - }); - - if is_child { - // Build the path segment - let mut new_path = path.clone(); - - let is_multi = pred_arc.maxCardinality > 1 - || pred_arc.maxCardinality == -1; - - // For multi-valued predicates, add the object IRI as a key first - if is_multi { - new_path.insert(0, tracked_subject.subject_iri.clone()); - } - - // Add the readable predicate name - new_path.insert(0, pred_arc.readablePredicate.clone()); - - // Recurse to the parent - add_diff_ops_for_tracked_subject( - &parent_ts, - tracked_subjects, - root_shape, - &mut new_path, - diff_op.clone(), - patches, - paths_of_objects_to_create, - ); - - break; - } - } - } - } - } - - fn diff_op_from_pred_change( - pred_change: &OrmTrackedPredicateChanges, - ) -> Vec<( - OrmDiffOpType, - Option, - Option, // The value added / removed - Option, // The IRI, if change is an added / removed object. - )> { - let tracked_predicate = pred_change.tracked_predicate.read().unwrap(); - - let is_multi = tracked_predicate.schema.maxCardinality > 1 - || tracked_predicate.schema.maxCardinality == -1; - let is_object = tracked_predicate - .schema - .dataTypes - .iter() - .any(|dt| dt.shape.is_some()); - - if !is_multi && !is_object { - if pred_change.values_added.len() == 1 { - // A value was added. 
Another one might have been removed - // but the add patch overwrite previous values. - return [( - OrmDiffOpType::add, - None, - Some(json!(pred_change.values_added[0])), - None, - )] - .to_vec(); - } else { - // Since there is only one possible value, removing the path is enough. - return [(OrmDiffOpType::remove, None, None, None)].to_vec(); - } - } else if is_multi && !is_object { - let mut ops = vec![]; - if pred_change.values_added.len() > 0 { - ops.push(( - OrmDiffOpType::add, - Some(OrmDiffType::set), - Some(json!(pred_change.values_added)), - None, - )); - } - if pred_change.values_removed.len() > 0 { - ops.push(( - OrmDiffOpType::remove, - Some(OrmDiffType::set), - Some(json!(pred_change.values_removed)), - None, - )); - } - return ops; - } - // objects are not handled here because objects to create - // are registered during path traversal. - return vec![]; - } - - // Helper function to determine the highest-priority valid shape for a subject - // given the allowed shapes in a predicate's dataTypes. 
- // Returns (current_valid_shape, previous_valid_shape) - #[allow(dead_code)] - fn get_highest_priority_valid_shapes( - subject_iri: &SubjectIri, - allowed_shapes: &[OrmSchemaDataType], // From predicate.dataTypes (in priority order) - tracked_subjects: &HashMap< - String, - HashMap>>, - >, - ) -> (Option, Option) { - let Some(shapes_for_subject) = tracked_subjects.get(subject_iri) else { - return (None, None); - }; - - // Find current highest-priority valid shape - let current_valid = allowed_shapes - .iter() - .filter_map(|dt| dt.shape.as_ref()) - .find_map(|shape_iri| { - shapes_for_subject.get(shape_iri).and_then(|ts| { - let tracked = ts.read().unwrap(); - if tracked.valid == OrmTrackedSubjectValidity::Valid { - Some(shape_iri.clone()) - } else { - None - } - }) - }); - - // Find previous highest-priority valid shape - let previous_valid = allowed_shapes - .iter() - .filter_map(|dt| dt.shape.as_ref()) - .find_map(|shape_iri| { - shapes_for_subject.get(shape_iri).and_then(|ts| { - let tracked = ts.read().unwrap(); - if tracked.prev_valid == OrmTrackedSubjectValidity::Valid { - Some(shape_iri.clone()) - } else { - None - } - }) - }); - - (current_valid, previous_valid) - } - - // Helper function to handle validity changes when highest-priority shape changes - #[allow(dead_code)] - fn handle_shape_priority_change( - subject_iri: &SubjectIri, - shape_iri: &ShapeIri, - tracked_subjects: &HashMap< - String, - HashMap>>, - >, - root_shape: &String, - orm_changes: &OrmChanges, - patches: &mut Vec, - paths_of_objects_to_create: &mut HashSet<(Vec, Option)>, - ) { - // Step 1: Check if this subject has multiple tracked shapes - let Some(shapes_for_subject) = tracked_subjects.get(subject_iri) else { - return; - }; - - if shapes_for_subject.len() <= 1 { - // Only one shape, no priority conflict possible - return; - } - - // Step 2: Get the current tracked subject - let Some(tracked_subject_arc) = shapes_for_subject.get(shape_iri) else { - return; - }; - let 
tracked_subject = tracked_subject_arc.read().unwrap(); - - // Step 3: For each parent, check if the highest-priority valid shape changed - for (parent_iri, parent_tracked_subject_arc) in tracked_subject.parents.iter() { - let parent_ts = parent_tracked_subject_arc.read().unwrap(); - - // Find the predicate linking parent to this subject - for pred_arc in &parent_ts.shape.predicates { - if let Some(tracked_pred) = - parent_ts.tracked_predicates.get(&pred_arc.iri) - { - let tp = tracked_pred.read().unwrap(); - - // Check if this tracked subject is a child of this predicate - let is_child = tp.tracked_children.iter().any(|child| { - let child_read = child.read().unwrap(); - child_read.subject_iri == *subject_iri - }); - - if !is_child { - continue; - } - - // Get the allowed shapes for this predicate (in priority order) - let allowed_shapes: Vec<_> = pred_arc - .dataTypes - .iter() - .filter(|dt| dt.shape.is_some()) - .collect(); - - if allowed_shapes.len() <= 1 { - // No priority conflict possible with single shape - continue; - } - - // Determine current and previous highest-priority valid shapes - let (current_valid, previous_valid) = - get_highest_priority_valid_shapes( - subject_iri, - &pred_arc.dataTypes, - tracked_subjects, - ); - - // Step 4: Create patches based on what changed - if current_valid != previous_valid { - let is_multi = pred_arc.maxCardinality > 1 - || pred_arc.maxCardinality == -1; - - // Case A: Shape switch (ShapeA -> ShapeB) - if let (Some(new_shape), Some(old_shape)) = - (&current_valid, &previous_valid) - { - // Remove the old object - if let Some(old_ts) = shapes_for_subject.get(old_shape) { - let old_tracked = old_ts.read().unwrap(); - let mut path = vec![]; - let diff_op = ( - OrmDiffOpType::remove, - Some(OrmDiffType::object), - None, - Some(subject_iri.clone()), - ); - - add_diff_ops_for_tracked_subject( - &old_tracked, - tracked_subjects, - root_shape, - &mut path, - diff_op, - patches, - paths_of_objects_to_create, - ); - } - - // Add the 
new object (need to materialize it) - if let Some(new_ts) = shapes_for_subject.get(new_shape) { - let new_tracked = new_ts.read().unwrap(); - - // TODO: Materialize the object with current triples - // This requires access to the change data or re-querying - // For now, we'll just create an object placeholder patch - let mut path = vec![]; - let diff_op = ( - OrmDiffOpType::add, - Some(OrmDiffType::object), - Some(Value::Null), - Some(subject_iri.clone()), - ); - - add_diff_ops_for_tracked_subject( - &new_tracked, - tracked_subjects, - root_shape, - &mut path, - diff_op, - patches, - paths_of_objects_to_create, - ); - } - } - // Case B: Object became valid (None -> ShapeX) - else if let (Some(new_shape), None) = - (&current_valid, &previous_valid) - { - if let Some(new_ts) = shapes_for_subject.get(new_shape) { - let new_tracked = new_ts.read().unwrap(); - let mut path = vec![]; - let diff_op = ( - OrmDiffOpType::add, - Some(OrmDiffType::object), - Some(Value::Null), - Some(subject_iri.clone()), - ); - - add_diff_ops_for_tracked_subject( - &new_tracked, - tracked_subjects, - root_shape, - &mut path, - diff_op, - patches, - paths_of_objects_to_create, - ); - } - } - // Case C: Object became invalid (ShapeX -> None) - else if let (None, Some(old_shape)) = - (&current_valid, &previous_valid) - { - if let Some(old_ts) = shapes_for_subject.get(old_shape) { - let old_tracked = old_ts.read().unwrap(); - let mut path = vec![]; - let diff_op = ( - OrmDiffOpType::remove, - Some(OrmDiffType::object), - None, - Some(subject_iri.clone()), - ); - - add_diff_ops_for_tracked_subject( - &old_tracked, - tracked_subjects, - root_shape, - &mut path, - diff_op, - patches, - paths_of_objects_to_create, - ); - } - } - } - - break; // Found the predicate, no need to check others - } - } - } - } - // We construct object patches from a change (which is associated with a shape type). 
{op: add, valType: object, value: Null, path: ...} // For each change that has a subject tracked in this subscription, // - Get change operation (calling diff_op_from_pred_change). @@ -548,59 +154,64 @@ impl Verifier { // Problem: We might not have the triples present to materialize the newly valid object so we need to fetch them. // Process changes for this subscription - // Iterate through all changes and create patches + // Iterate over all changes and create patches for (shape_iri, subject_changes) in &orm_changes { for (subject_iri, change) in subject_changes { // Get the tracked subject for this (subject, shape) pair - let tracked_subject_opt = sub + let tracked_subject = sub .tracked_subjects .get(subject_iri) .and_then(|shapes| shapes.get(shape_iri)) - .map(|ts| ts.read().unwrap()); - - let Some(tracked_subject) = tracked_subject_opt else { + .map(|ts| ts.read().unwrap()) + .unwrap(); + + // Now we have the tracked predicate (containing the shape) and the change. + // Check validity changes + if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Invalid + && tracked_subject.valid == OrmTrackedSubjectValidity::Invalid + { + // Is the subject invalid and was it before? There is nothing we need to inform about. continue; - }; - - // Process each predicate change - for (pred_iri, pred_change) in &change.predicates { - let tracked_predicate = pred_change.tracked_predicate.read().unwrap(); - let pred_name = tracked_predicate.schema.readablePredicate.clone(); - // Check validity changes - if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Invalid - && tracked_subject.valid == OrmTrackedSubjectValidity::Invalid - { - // Is the subject invalid and was it before? There is nothing we need to inform about. 
- return; - } else if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Valid - && tracked_subject.valid == OrmTrackedSubjectValidity::Invalid - || tracked_subject.valid == OrmTrackedSubjectValidity::Untracked - { - // Has the subject become invalid or untracked? - // We add a patch, deleting the object at its root. - let mut path: Vec = - vec![subject_iri.clone(), pred_name.clone()]; - add_diff_ops_for_tracked_subject( - &tracked_subject, - &sub.tracked_subjects, - &sub.shape_type.shape, - &mut path, - (OrmDiffOpType::remove, Some(OrmDiffType::object), None, None), - &mut patches, - &mut paths_of_objects_to_create, - ); + } else if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Valid + && tracked_subject.valid == OrmTrackedSubjectValidity::Invalid + || tracked_subject.valid == OrmTrackedSubjectValidity::Untracked + { + // Has the subject become invalid or untracked? + // We add a patch, deleting the object at its root. + let mut path: Vec; + if tracked_subject.parents.is_empty() { + // If this is a root object, we need to add the object's id itself. + path = vec![tracked_subject.subject_iri.clone()]; } else { - // The subject is valid or has become valid. + path = vec![]; + } + + build_path_to_root_and_create_patches( + &tracked_subject, + &sub.tracked_subjects, + &sub.shape_type.shape, + &mut path, + (OrmDiffOpType::remove, Some(OrmDiffType::object), None, None), + &mut patches, + &mut paths_of_objects_to_create, + ); + } else { + // The subject is valid or has become valid. 
+ // Process each predicate change + for (_pred_iri, pred_change) in &change.predicates { + let tracked_predicate = + pred_change.tracked_predicate.read().unwrap(); + let pred_name = tracked_predicate.schema.readablePredicate.clone(); // Get the diff operations for this predicate change - let diff_ops = diff_op_from_pred_change(pred_change); + let diff_ops = create_diff_ops_from_predicate_change(pred_change); // For each diff operation, traverse up to the root to build the path for diff_op in diff_ops { - let mut path = vec![subject_iri.clone(), pred_name.clone()]; + let mut path = vec![pred_name.clone()]; // Start recursion from this tracked subject - add_diff_ops_for_tracked_subject( + build_path_to_root_and_create_patches( &tracked_subject, &sub.tracked_subjects, &sub.shape_type.shape, @@ -654,3 +265,264 @@ impl Verifier { } } } + +/// Find the predicate schema linking a parent to a child tracked subject and build the path segment. +/// Returns the updated path if a linking predicate is found. 
+fn build_path_segment_for_parent( + tracked_subject: &OrmTrackedSubject, + parent_ts: &OrmTrackedSubject, + base_path: &[String], +) -> Option> { + // Find the predicate schema linking parent to this tracked subject + for pred_arc in &parent_ts.shape.predicates { + // Check if this predicate has our subject as a child + if let Some(tracked_pred) = parent_ts.tracked_predicates.get(&pred_arc.iri) { + let tp = tracked_pred.read().unwrap(); + + // Check if this tracked subject is in the children + let is_child = tp.tracked_children.iter().any(|child| { + let child_read = child.read().unwrap(); + child_read.subject_iri == tracked_subject.subject_iri + }); + + if is_child { + // Build the path segment + let mut new_path = base_path.to_vec(); + + let is_multi = pred_arc.maxCardinality > 1 || pred_arc.maxCardinality == -1; + + // For multi-valued predicates, add the object IRI as a key first + if is_multi { + new_path.insert(0, tracked_subject.subject_iri.clone()); + } + + // Add the readable predicate name + new_path.insert(0, pred_arc.readablePredicate.clone()); + + return Some(new_path); + } + } + } + None +} + +/// Recursively build the path from a tracked subject to the root and create diff operation patches. +/// The function recurses from child to parents down to a root tracked subject. +/// If multiple parents exist, it adds separate patches for each. +fn build_path_to_root_and_create_patches( + tracked_subject: &OrmTrackedSubject, + tracked_subjects: &HashMap>>>, + root_shape: &String, + path: &mut Vec, + diff_op: ( + OrmDiffOpType, + Option, + Option, // The value added / removed + Option, // The IRI, if change is an added / removed object. 
+ ), + patches: &mut Vec, + paths_of_objects_to_create: &mut HashSet<(Vec, Option)>, +) { + // If the tracked subject is not valid, we don't create patches for it + if tracked_subject.valid != OrmTrackedSubjectValidity::Valid { + return; + } + + // If the tracked subject is newly valid (was not valid before but is now), + // we need to ensure the object is created with an "add object" patch + if tracked_subject.prev_valid != OrmTrackedSubjectValidity::Valid { + // Check if we're at a root subject or need to traverse to parents + if tracked_subject.parents.is_empty() || tracked_subject.shape.iri == *root_shape { + // At root: build the path with the subject IRI + let escaped_path: Vec = + path.iter().map(|seg| escape_json_pointer(seg)).collect(); + let json_pointer = format!( + "/{}/{}", + escape_json_pointer(&tracked_subject.subject_iri), + escaped_path.join("/") + ); + + // Create an "add object" patch to ensure the object exists + patches.push(OrmDiffOp { + op: OrmDiffOpType::add, + valType: Some(OrmDiffType::object), + path: json_pointer.clone(), + value: None, + }); + + // Also add the id field for the object + patches.push(OrmDiffOp { + op: OrmDiffOpType::add, + valType: None, + path: format!("{}/id", json_pointer), + value: Some(json!(tracked_subject.subject_iri)), + }); + } else { + // Not at root: traverse to parents and create object patches along the way + for (_parent_iri, parent_tracked_subject) in tracked_subject.parents.iter() { + let parent_ts = parent_tracked_subject.read().unwrap(); + + if let Some(new_path) = + build_path_segment_for_parent(tracked_subject, &parent_ts, path) + { + // Recurse to the parent first + build_path_to_root_and_create_patches( + &parent_ts, + tracked_subjects, + root_shape, + &mut new_path.clone(), + (OrmDiffOpType::add, Some(OrmDiffType::object), None, None), + patches, + paths_of_objects_to_create, + ); + } + } + } + } + + // If this subject has no parents or its shape matches the root shape, we've reached the root + if 
tracked_subject.parents.is_empty() || tracked_subject.shape.iri == *root_shape { + // Build the final JSON Pointer path + let escaped_path: Vec = path.iter().map(|seg| escape_json_pointer(seg)).collect(); + // Always add the root subject to the path. + let json_pointer = format!( + "/{}/{}", + escape_json_pointer(&tracked_subject.subject_iri), + escaped_path.join("/") + ); + + // Create the patch + patches.push(OrmDiffOp { + op: diff_op.0.clone(), + valType: diff_op.1.clone(), + path: json_pointer.clone(), + value: diff_op.2.clone(), + }); + + // // If a new object is created on a predicate where multiple ones are allowed, create IRI path too. + // if let Some(added_obj_iri) = diff_op.3 { + // patches.push(OrmDiffOp { + // op: diff_op.0.clone(), + // valType: diff_op.1.clone(), + // path: format!("{}/{}", json_pointer, escape_json_pointer(&added_obj_iri)), + // value: diff_op.2.clone(), + // }); + // } + + return; + } + + // Recurse to parents + for (_parent_iri, parent_tracked_subject) in tracked_subject.parents.iter() { + let parent_ts = parent_tracked_subject.read().unwrap(); + + // Build the path segment for this parent + if let Some(new_path) = build_path_segment_for_parent(tracked_subject, &parent_ts, path) { + // Recurse to the parent + build_path_to_root_and_create_patches( + &parent_ts, + tracked_subjects, + root_shape, + &mut new_path.clone(), + diff_op.clone(), + patches, + paths_of_objects_to_create, + ); + } + } +} + +/// Create diff operations from a predicate change. +/// Returns a list of (op_type, value_type, value, iri) tuples. +fn create_diff_ops_from_predicate_change( + pred_change: &OrmTrackedPredicateChanges, +) -> Vec<( + OrmDiffOpType, + Option, + Option, // The value added / removed + Option, // The IRI, if change is an added / removed object. 
+)> { + let tracked_predicate = pred_change.tracked_predicate.read().unwrap(); + + let is_multi = tracked_predicate.schema.maxCardinality > 1 + || tracked_predicate.schema.maxCardinality == -1; + let is_object = tracked_predicate + .schema + .dataTypes + .iter() + .any(|dt| dt.shape.is_some()); + + let mut ops = vec![]; + + if !is_multi && !is_object { + if pred_change.values_added.len() == 1 { + // A value was added. Another one might have been removed + // but the add patch overwrite previous values. + return [( + OrmDiffOpType::add, + None, + Some(json!(pred_change.values_added[0])), + None, + )] + .to_vec(); + } else { + // Since there is only one possible value, removing the path is enough. + return [(OrmDiffOpType::remove, None, None, None)].to_vec(); + } + } else if is_multi && !is_object { + if pred_change.values_added.len() > 0 { + ops.push(( + OrmDiffOpType::add, + Some(OrmDiffType::set), + Some(json!(pred_change.values_added)), + None, + )); + } + if pred_change.values_removed.len() > 0 { + ops.push(( + OrmDiffOpType::remove, + Some(OrmDiffType::set), + Some(json!(pred_change.values_removed)), + None, + )); + } + return ops; + } + // else if !is_multi && is_object { + // if pred_change.values_added.len() > 0 { + // ops.push((OrmDiffOpType::add, Some(OrmDiffType::object), None, None)); + // } else if pred_change.values_removed.len() > 0 { + // ops.push((OrmDiffOpType::remove, Some(OrmDiffType::object), None, None)); + // } + // } else if is_multi && is_object { + // for val_added in pred_change.values_added.iter() { + // let iri = match val_added { + // BasicType::Str(s) => s, + // _ => { + // continue; + // } + // }; + // ops.push(( + // OrmDiffOpType::add, + // Some(OrmDiffType::object), + // None, + // Some(iri.clone()), + // )); + // } + // for val_removed in pred_change.values_added.iter() { + // let iri = match val_removed { + // BasicType::Str(s) => s, + // _ => { + // continue; + // } + // }; + // ops.push(( + // OrmDiffOpType::remove, + // 
Some(OrmDiffType::object), + // None, + // Some(iri.clone()), + // )); + // } + // } + return ops; +}