From 952dce50d91a80c87331e33d1b8cb619463b727e Mon Sep 17 00:00:00 2001 From: Laurin Weger Date: Thu, 16 Oct 2025 18:28:39 +0200 Subject: [PATCH] fix nested fetching and processing of objects becoming valid --- .../verifier/src/orm/handle_backend_update.rs | 4 +- engine/verifier/src/orm/process_changes.rs | 164 +++++++++--------- 2 files changed, 87 insertions(+), 81 deletions(-) diff --git a/engine/verifier/src/orm/handle_backend_update.rs b/engine/verifier/src/orm/handle_backend_update.rs index ce35c19..d7e9653 100644 --- a/engine/verifier/src/orm/handle_backend_update.rs +++ b/engine/verifier/src/orm/handle_backend_update.rs @@ -109,8 +109,10 @@ impl Verifier { // Apply the changes to tracked subjects. for shape_arc in shapes { + let shape_iri = shape_arc.iri.clone(); let _ = self.process_changes_for_shape_and_session( &scope, + &shape_iri, shape_arc, session_id, &triple_inserts, @@ -352,7 +354,7 @@ fn check_should_create_parent_predicate_object( if is_child { let is_multi = pred_arc.maxCardinality > 1 || pred_arc.maxCardinality == -1; - + if is_multi { // Check if any siblings were previously valid let any_sibling_was_valid = tp.tracked_children.iter().any(|child| { diff --git a/engine/verifier/src/orm/process_changes.rs b/engine/verifier/src/orm/process_changes.rs index b3f698f..6445771 100644 --- a/engine/verifier/src/orm/process_changes.rs +++ b/engine/verifier/src/orm/process_changes.rs @@ -81,54 +81,15 @@ impl Verifier { Ok(merged) } - /// Helper to call process_changes_for_shape for all subscriptions on nuri's document. - fn process_changes_for_nuri_and_session( - self: &mut Self, - nuri: &NuriV0, - session_id: u64, - triples_added: &[Triple], - triples_removed: &[Triple], - data_already_fetched: bool, - ) -> Result { - let mut orm_changes = HashMap::new(); - - let shapes: Vec<_> = self - .orm_subscriptions - .get(nuri) - .unwrap() - .iter() - .map(|sub| { - sub.shape_type - .schema - .get(&sub.shape_type.shape) - .unwrap() - .clone() - }) - .collect(); - - for root_shape in shapes { - self.process_changes_for_shape_and_session( - nuri, - root_shape, - session_id, - triples_added, - triples_removed, - &mut orm_changes, - data_already_fetched, - )?; - } - - Ok(orm_changes) - } - /// Add and remove the triples from the tracked subjects, /// re-validate, and update `changes` containing the updated data. /// Works by queuing changes by shape and subjects on a stack. /// Nested objects are added to the stack pub(crate) fn process_changes_for_shape_and_session( - self: &mut Self, + &mut self, nuri: &NuriV0, - root_shape: Arc, + root_shape_iri: &String, + shape: Arc, session_id: u64, triples_added: &[Triple], triples_removed: &[Triple], @@ -140,8 +101,7 @@ impl Verifier { // Track (shape_iri, subject_iri) pairs currently being validated to prevent cycles and double evaluation. let mut currently_validating: HashSet<(String, String)> = HashSet::new(); // Add root shape for first validation run. - let root_shape_iri = root_shape.iri.clone(); - shape_validation_stack.push((root_shape, vec![])); + shape_validation_stack.push((shape, vec![])); // Process queue of shapes and subjects to validate. // For a given shape, we evaluate every subject against that shape. @@ -156,14 +116,6 @@ impl Verifier { .chain(removed_triples_by_subject.keys()) .collect(); - let mut orm_subscription = self - .orm_subscriptions - .get_mut(nuri) - .unwrap() - .iter_mut() - .find(|sub| sub.session_id == session_id && sub.shape_type.shape == root_shape_iri) - .unwrap(); - // Variable to collect nested objects that need validation. let mut nested_objects_to_eval: HashMap> = HashMap::new(); @@ -185,8 +137,13 @@ impl Verifier { subject_iri, shape.iri ); - // Mark as invalid due to cycle - // TODO: We could handle this by handling nested references as IRIs. + + // Find tracked and mark as invalid. + let orm_subscription = &mut self.get_first_orm_subscription_for( + nuri, + Some(&root_shape_iri), + Some(&session_id), + ); if let Some(tracked_shapes) = orm_subscription.tracked_subjects.get(*subject_iri) { @@ -231,12 +188,23 @@ impl Verifier { "Adding triples to change tracker for subject {}", subject_iri ); + + let orm_subscription = self + .orm_subscriptions + .get_mut(nuri) + .unwrap() + .iter_mut() + .find(|sub| { + sub.shape_type.shape == shape.iri && sub.session_id == session_id + }) + .unwrap(); + if let Err(e) = add_remove_triples( shape.clone(), subject_iri, triples_added_for_subj, triples_removed_for_subj, - &mut orm_subscription, + orm_subscription, change, ) { log_err!("apply_changes_from_triples add/remove error: {:?}", e); @@ -247,9 +215,18 @@ impl Verifier { log_debug!("not applying triples again for subject {subject_iri}"); } + let orm_subscription = self + .orm_subscriptions + .get_mut(nuri) + .unwrap() + .iter_mut() + .find(|sub| { + sub.shape_type.shape == shape.iri && sub.session_id == session_id + }) + .unwrap(); + // Validate the subject. - let need_eval = - Self::update_subject_validity(change, &shape, &mut orm_subscription); + let need_eval = Self::update_subject_validity(change, &shape, orm_subscription); // We add the need_eval to be processed next after loop. // Filter out subjects already in the validation stack to prevent double evaluation. @@ -268,13 +245,15 @@ impl Verifier { // Now, we queue all non-evaluated objects for (shape_iri, objects_to_eval) in &nested_objects_to_eval { - let orm_subscription = self.get_first_orm_subscription_for( - nuri, - Some(&root_shape_iri), - Some(&session_id), - ); - // Extract schema and shape Arc before mutable borrow - let schema = orm_subscription.shape_type.schema.clone(); + // Extract schema and shape Arc first (before any borrows) + let schema = { + let orm_sub = self.get_first_orm_subscription_for( + nuri, + Some(&root_shape_iri), + Some(&session_id), + ); + orm_sub.shape_type.schema.clone() + }; let shape_arc = schema.get(shape_iri).unwrap().clone(); // Data might need to be fetched (if it has not been during initialization or nested shape fetch). @@ -294,6 +273,7 @@ impl Verifier { // Recursively process nested objects. self.process_changes_for_shape_and_session( nuri, + &root_shape_iri, shape_arc.clone(), session_id, &new_triples, @@ -323,23 +303,47 @@ impl Verifier { Ok(()) } - /// Helper to get orm subscriptions for nuri, shapes and sessions. - pub fn get_orm_subscriptions_for( - &self, + /// Helper to call process_changes_for_shape for all subscriptions on nuri's document. + fn process_changes_for_nuri_and_session( + self: &mut Self, nuri: &NuriV0, - shape: Option<&ShapeIri>, - session_id: Option<&u64>, - ) -> Vec<&OrmSubscription> { - self.orm_subscriptions.get(nuri).unwrap(). - // Filter shapes, if present. - iter().filter(|s| match shape { - Some(sh) => *sh == s.shape_type.shape, - None => true - // Filter session ids if present. - }).filter(|s| match session_id { - Some(id) => *id == s.session_id, - None => true - }).collect() + session_id: u64, + triples_added: &[Triple], + triples_removed: &[Triple], + data_already_fetched: bool, + ) -> Result { + let mut orm_changes = HashMap::new(); + + let shapes: Vec<_> = self + .orm_subscriptions + .get(nuri) + .unwrap() + .iter() + .map(|sub| { + sub.shape_type + .schema + .get(&sub.shape_type.shape) + .unwrap() + .clone() + }) + .collect(); + + for root_shape in shapes { + let shape_iri = root_shape.iri.clone(); + // Now we can safely call the method with self + self.process_changes_for_shape_and_session( + nuri, + &shape_iri, + root_shape, + session_id, + triples_added, + triples_removed, + &mut orm_changes, + data_already_fetched, + )?; + } + + Ok(orm_changes) } pub fn get_first_orm_subscription_for(