diff --git a/engine/verifier/src/orm/handle_backend_update.rs b/engine/verifier/src/orm/handle_backend_update.rs index 0c9f5b6..1ac9b9f 100644 --- a/engine/verifier/src/orm/handle_backend_update.rs +++ b/engine/verifier/src/orm/handle_backend_update.rs @@ -68,8 +68,6 @@ impl Verifier { }) .collect(); - // let mut updates = Vec::new(); - let mut scopes = vec![]; for (scope, subs) in self.orm_subscriptions.iter_mut() { // Remove old subscriptions @@ -86,34 +84,37 @@ impl Verifier { } // prepare to apply updates to tracked subjects and record the changes. - let root_shapes = subs + let root_shapes_and_tracked_subjects = subs .iter() .map(|sub| { - sub.shape_type - .schema - .get(&sub.shape_type.shape) - .unwrap() - .clone() + ( + sub.shape_type + .schema + .get(&sub.shape_type.shape) + .unwrap() + .clone(), + shapes_in_tracked_subjects(&sub.tracked_subjects), + ) }) .collect::>(); - scopes.push((scope.clone(), root_shapes)); + scopes.push((scope.clone(), root_shapes_and_tracked_subjects)); } log_debug!( "[orm_backend_update], creating patch objects for #scopes {}", scopes.len() ); - for (scope, shapes) in scopes { + for (scope, shapes_zip) in scopes { let mut orm_changes: OrmChanges = HashMap::new(); // Apply the changes to tracked subjects. - for shape_arc in shapes { - let shape_iri = shape_arc.iri.clone(); + for (root_shape_arc, all_shapes) in shapes_zip { + let shape_iri = root_shape_arc.iri.clone(); let _ = self.process_changes_for_shape_and_session( &scope, &shape_iri, - shape_arc, + all_shapes, session_id, &triple_inserts, &triple_removes, @@ -159,6 +160,11 @@ impl Verifier { // Iterate over all changes and create patches for (shape_iri, subject_changes) in &orm_changes { for (subject_iri, change) in subject_changes { + log_debug!( + "Patch creating for subject change {}. #changed preds: {}", + subject_iri, + change.predicates.len() + ); // Get the tracked subject for this (subject, shape) pair let tracked_subject = sub .tracked_subjects @@ -201,6 +207,13 @@ impl Verifier { // The subject is valid or has become valid. // Process each predicate change for (_pred_iri, pred_change) in &change.predicates { + log_debug!( + " - Predicate changes: {}; #Adds: {}; #Removes {}", + _pred_iri, + pred_change.values_added.len(), + pred_change.values_removed.len() + ); + let tracked_predicate = pred_change.tracked_predicate.read().unwrap(); let pred_name = tracked_predicate.schema.readablePredicate.clone(); @@ -430,24 +443,16 @@ fn build_path_to_root_and_create_patches( patches: &mut Vec, objects_to_create: &mut HashSet<(Vec, Option)>, ) { + log_debug!( + " - build path, ts: {}, path {:?}", + tracked_subject.subject_iri, + path + ); // If the tracked subject is not valid, we don't create patches for it if tracked_subject.valid != OrmTrackedSubjectValidity::Valid { return; } - // If the tracked subject is newly valid (was not valid before but is now), - // we need to ensure the object is created with an "add object" patch - if tracked_subject.prev_valid != OrmTrackedSubjectValidity::Valid { - queue_patches_for_newly_valid_subject( - tracked_subject, - tracked_subjects, - root_shape, - path, - patches, - objects_to_create, - ); - } - // If this subject has no parents or its shape matches the root shape, we've reached the root if tracked_subject.parents.is_empty() || tracked_subject.shape.iri == *root_shape { // Build the final JSON Pointer path @@ -459,7 +464,7 @@ fn build_path_to_root_and_create_patches( escaped_path.join("/") ); - // Create the patch + // Create the patch for the actual value change patches.push(OrmDiffOp { op: diff_op.0.clone(), valType: diff_op.1.clone(), @@ -467,6 +472,20 @@ fn build_path_to_root_and_create_patches( value: diff_op.2.clone(), }); + // If the subject is newly valid, now we have the full path to queue its creation. + if tracked_subject.prev_valid != OrmTrackedSubjectValidity::Valid { + let mut final_path = vec![tracked_subject.subject_iri.clone()]; + final_path.extend_from_slice(path); + queue_patches_for_newly_valid_subject( + tracked_subject, + tracked_subjects, + root_shape, + &final_path, + patches, + objects_to_create, + ); + } + return; } @@ -475,13 +494,14 @@ fn build_path_to_root_and_create_patches( let parent_ts = parent_tracked_subject.read().unwrap(); // Build the path segment for this parent - if let Some(new_path) = build_path_segment_for_parent(tracked_subject, &parent_ts, path) { + if let Some(mut new_path) = build_path_segment_for_parent(tracked_subject, &parent_ts, path) + { // Recurse to the parent build_path_to_root_and_create_patches( &parent_ts, tracked_subjects, root_shape, - &mut new_path.clone(), + &mut new_path, diff_op.clone(), patches, objects_to_create, @@ -584,3 +604,15 @@ fn create_diff_ops_from_predicate_change( // } return ops; } + +fn shapes_in_tracked_subjects( + tracked_subjects: &HashMap>>>, +) -> Vec> { + let mut shapes = vec![]; + for (_subject_iri, tss) in tracked_subjects.iter() { + for (_shape_iri, ts) in tss.iter() { + shapes.push(ts.read().unwrap().shape.clone()); + } + } + shapes +} diff --git a/engine/verifier/src/orm/materialize.rs b/engine/verifier/src/orm/materialize.rs index 00efe18..d55017a 100644 --- a/engine/verifier/src/orm/materialize.rs +++ b/engine/verifier/src/orm/materialize.rs @@ -103,16 +103,13 @@ impl Verifier { let mut return_vals: Value = Value::Array(vec![]); let return_val_vec = return_vals.as_array_mut().unwrap(); - // log_debug!( - // "Tracked subjects:\n{:?}\n", - // orm_subscription.tracked_subjects, - // ); + log_debug!("\nMaterializing: {}", shape_type.shape); // For each valid change struct, we build an orm object. // The way we get the changes from the tracked subjects is a bit hacky, sorry. for (subject_iri, tracked_subjects_by_shape) in &orm_subscription.tracked_subjects { if let Some(tracked_subject) = tracked_subjects_by_shape.get(&shape_type.shape) { let ts = tracked_subject.read().unwrap(); - log_info!("changes for: {:?} valid: {:?}\n", ts.subject_iri, ts.valid); + log_info!(" - changes for: {:?} valid: {:?}", ts.subject_iri, ts.valid); if ts.valid == OrmTrackedSubjectValidity::Valid { if let Some(change) = changes diff --git a/engine/verifier/src/orm/process_changes.rs b/engine/verifier/src/orm/process_changes.rs index 011c228..aa36b8b 100644 --- a/engine/verifier/src/orm/process_changes.rs +++ b/engine/verifier/src/orm/process_changes.rs @@ -89,7 +89,7 @@ impl Verifier { &mut self, nuri: &NuriV0, root_shape_iri: &String, - shape: Arc, + shapes: Vec>, session_id: u64, triples_added: &[Triple], triples_removed: &[Triple], @@ -101,7 +101,9 @@ impl Verifier { // Track (shape_iri, subject_iri) pairs currently being validated to prevent cycles and double evaluation. let mut currently_validating: HashSet<(String, String)> = HashSet::new(); // Add root shape for first validation run. - shape_validation_stack.push((shape, vec![])); + for shape in shapes { + shape_validation_stack.push((shape, vec![])); + } // Process queue of shapes and subjects to validate. // For a given shape, we evaluate every subject against that shape. @@ -127,6 +129,7 @@ impl Verifier { shape.iri ); + // For each modified subject, apply changes to tracked subjects and validate. for subject_iri in &modified_subject_iris { let validation_key = (shape.iri.clone(), subject_iri.to_string()); @@ -248,6 +251,8 @@ impl Verifier { } // Validate the subject. + // need_eval contains elements in reverse priority (last element to be validated first) + // TODO: Improve order by distinguishing between parents, children and self to be re-evaluated. let need_eval = Self::update_subject_validity(change, &shape, orm_subscription); // We add the need_eval to be processed next after loop. @@ -296,7 +301,7 @@ impl Verifier { self.process_changes_for_shape_and_session( nuri, &root_shape_iri, - shape_arc.clone(), + [shape_arc.clone()].to_vec(), session_id, &new_triples, &vec![], @@ -356,7 +361,7 @@ impl Verifier { self.process_changes_for_shape_and_session( nuri, &shape_iri, - root_shape, + [root_shape].to_vec(), session_id, triples_added, triples_removed, diff --git a/sdk/rust/src/tests/mod.rs b/sdk/rust/src/tests/mod.rs index caac2a7..37b948a 100644 --- a/sdk/rust/src/tests/mod.rs +++ b/sdk/rust/src/tests/mod.rs @@ -51,7 +51,7 @@ pub(crate) fn assert_json_eq(expected: &mut Value, actual: &mut Value) { let diff = serde_json_diff::values(expected.clone(), actual.clone()); if let Some(diff_) = diff { log_err!( - "Expected and actual ORM JSON mismatch.\nDiff: {:?}\nExpected: {}\nActual: {}", + "Expected and actual JSON mismatch.\nDiff: {:?}\nExpected: {}\nActual: {}", diff_, expected, actual diff --git a/sdk/rust/src/tests/orm_patches.rs b/sdk/rust/src/tests/orm_patches.rs index 63438fb..5f66665 100644 --- a/sdk/rust/src/tests/orm_patches.rs +++ b/sdk/rust/src/tests/orm_patches.rs @@ -7,7 +7,7 @@ // notice may not be copied, modified, or distributed except // according to those terms. -use crate::local_broker::{doc_create, doc_sparql_construct, doc_sparql_update, orm_start}; +use crate::local_broker::{doc_sparql_update, orm_start}; use crate::tests::create_or_open_wallet::create_or_open_wallet; use crate::tests::{assert_json_eq, create_doc_with_data}; use async_std::stream::StreamExt; @@ -864,20 +864,12 @@ INSERT DATA { let mut expected = json!([ // Modified house color - { - "op": "remove", - "path": "/urn:test:house1/rootColor", - }, { "op": "add", "value": "red", "path": "/urn:test:house1/rootColor", }, // Modified Alice's name - { - "op": "remove", - "path": "/urn:test:house1/inhabitants/urn:test:person1/name", - }, { "op": "add", "value": "Alicia",