fix nested fetching and processing of objects becoming valid

feat/orm-diffs
Laurin Weger 3 days ago
parent 5bab0073c0
commit 952dce50d9
No known key found for this signature in database
GPG Key ID: 9B372BB0B792770F
  1. 4
      engine/verifier/src/orm/handle_backend_update.rs
  2. 164
      engine/verifier/src/orm/process_changes.rs

@ -109,8 +109,10 @@ impl Verifier {
// Apply the changes to tracked subjects. // Apply the changes to tracked subjects.
for shape_arc in shapes { for shape_arc in shapes {
let shape_iri = shape_arc.iri.clone();
let _ = self.process_changes_for_shape_and_session( let _ = self.process_changes_for_shape_and_session(
&scope, &scope,
&shape_iri,
shape_arc, shape_arc,
session_id, session_id,
&triple_inserts, &triple_inserts,
@ -352,7 +354,7 @@ fn check_should_create_parent_predicate_object(
if is_child { if is_child {
let is_multi = pred_arc.maxCardinality > 1 || pred_arc.maxCardinality == -1; let is_multi = pred_arc.maxCardinality > 1 || pred_arc.maxCardinality == -1;
if is_multi { if is_multi {
// Check if any siblings were previously valid // Check if any siblings were previously valid
let any_sibling_was_valid = tp.tracked_children.iter().any(|child| { let any_sibling_was_valid = tp.tracked_children.iter().any(|child| {

@ -81,54 +81,15 @@ impl Verifier {
Ok(merged) Ok(merged)
} }
/// Helper to call process_changes_for_shape for all subscriptions on nuri's document.
fn process_changes_for_nuri_and_session(
self: &mut Self,
nuri: &NuriV0,
session_id: u64,
triples_added: &[Triple],
triples_removed: &[Triple],
data_already_fetched: bool,
) -> Result<OrmChanges, NgError> {
let mut orm_changes = HashMap::new();
let shapes: Vec<_> = self
.orm_subscriptions
.get(nuri)
.unwrap()
.iter()
.map(|sub| {
sub.shape_type
.schema
.get(&sub.shape_type.shape)
.unwrap()
.clone()
})
.collect();
for root_shape in shapes {
self.process_changes_for_shape_and_session(
nuri,
root_shape,
session_id,
triples_added,
triples_removed,
&mut orm_changes,
data_already_fetched,
)?;
}
Ok(orm_changes)
}
/// Add and remove the triples from the tracked subjects, /// Add and remove the triples from the tracked subjects,
/// re-validate, and update `changes` containing the updated data. /// re-validate, and update `changes` containing the updated data.
/// Works by queuing changes by shape and subjects on a stack. /// Works by queuing changes by shape and subjects on a stack.
/// Nested objects are added to the stack /// Nested objects are added to the stack
pub(crate) fn process_changes_for_shape_and_session( pub(crate) fn process_changes_for_shape_and_session(
self: &mut Self, &mut self,
nuri: &NuriV0, nuri: &NuriV0,
root_shape: Arc<OrmSchemaShape>, root_shape_iri: &String,
shape: Arc<OrmSchemaShape>,
session_id: u64, session_id: u64,
triples_added: &[Triple], triples_added: &[Triple],
triples_removed: &[Triple], triples_removed: &[Triple],
@ -140,8 +101,7 @@ impl Verifier {
// Track (shape_iri, subject_iri) pairs currently being validated to prevent cycles and double evaluation. // Track (shape_iri, subject_iri) pairs currently being validated to prevent cycles and double evaluation.
let mut currently_validating: HashSet<(String, String)> = HashSet::new(); let mut currently_validating: HashSet<(String, String)> = HashSet::new();
// Add root shape for first validation run. // Add root shape for first validation run.
let root_shape_iri = root_shape.iri.clone(); shape_validation_stack.push((shape, vec![]));
shape_validation_stack.push((root_shape, vec![]));
// Process queue of shapes and subjects to validate. // Process queue of shapes and subjects to validate.
// For a given shape, we evaluate every subject against that shape. // For a given shape, we evaluate every subject against that shape.
@ -156,14 +116,6 @@ impl Verifier {
.chain(removed_triples_by_subject.keys()) .chain(removed_triples_by_subject.keys())
.collect(); .collect();
let mut orm_subscription = self
.orm_subscriptions
.get_mut(nuri)
.unwrap()
.iter_mut()
.find(|sub| sub.session_id == session_id && sub.shape_type.shape == root_shape_iri)
.unwrap();
// Variable to collect nested objects that need validation. // Variable to collect nested objects that need validation.
let mut nested_objects_to_eval: HashMap<ShapeIri, Vec<(SubjectIri, bool)>> = let mut nested_objects_to_eval: HashMap<ShapeIri, Vec<(SubjectIri, bool)>> =
HashMap::new(); HashMap::new();
@ -185,8 +137,13 @@ impl Verifier {
subject_iri, subject_iri,
shape.iri shape.iri
); );
// Mark as invalid due to cycle
// TODO: We could handle this by handling nested references as IRIs. // Find tracked and mark as invalid.
let orm_subscription = &mut self.get_first_orm_subscription_for(
nuri,
Some(&root_shape_iri),
Some(&session_id),
);
if let Some(tracked_shapes) = if let Some(tracked_shapes) =
orm_subscription.tracked_subjects.get(*subject_iri) orm_subscription.tracked_subjects.get(*subject_iri)
{ {
@ -231,12 +188,23 @@ impl Verifier {
"Adding triples to change tracker for subject {}", "Adding triples to change tracker for subject {}",
subject_iri subject_iri
); );
let orm_subscription = self
.orm_subscriptions
.get_mut(nuri)
.unwrap()
.iter_mut()
.find(|sub| {
sub.shape_type.shape == shape.iri && sub.session_id == session_id
})
.unwrap();
if let Err(e) = add_remove_triples( if let Err(e) = add_remove_triples(
shape.clone(), shape.clone(),
subject_iri, subject_iri,
triples_added_for_subj, triples_added_for_subj,
triples_removed_for_subj, triples_removed_for_subj,
&mut orm_subscription, orm_subscription,
change, change,
) { ) {
log_err!("apply_changes_from_triples add/remove error: {:?}", e); log_err!("apply_changes_from_triples add/remove error: {:?}", e);
@ -247,9 +215,18 @@ impl Verifier {
log_debug!("not applying triples again for subject {subject_iri}"); log_debug!("not applying triples again for subject {subject_iri}");
} }
let orm_subscription = self
.orm_subscriptions
.get_mut(nuri)
.unwrap()
.iter_mut()
.find(|sub| {
sub.shape_type.shape == shape.iri && sub.session_id == session_id
})
.unwrap();
// Validate the subject. // Validate the subject.
let need_eval = let need_eval = Self::update_subject_validity(change, &shape, orm_subscription);
Self::update_subject_validity(change, &shape, &mut orm_subscription);
// We add the need_eval to be processed next after loop. // We add the need_eval to be processed next after loop.
// Filter out subjects already in the validation stack to prevent double evaluation. // Filter out subjects already in the validation stack to prevent double evaluation.
@ -268,13 +245,15 @@ impl Verifier {
// Now, we queue all non-evaluated objects // Now, we queue all non-evaluated objects
for (shape_iri, objects_to_eval) in &nested_objects_to_eval { for (shape_iri, objects_to_eval) in &nested_objects_to_eval {
let orm_subscription = self.get_first_orm_subscription_for( // Extract schema and shape Arc first (before any borrows)
nuri, let schema = {
Some(&root_shape_iri), let orm_sub = self.get_first_orm_subscription_for(
Some(&session_id), nuri,
); Some(&root_shape_iri),
// Extract schema and shape Arc before mutable borrow Some(&session_id),
let schema = orm_subscription.shape_type.schema.clone(); );
orm_sub.shape_type.schema.clone()
};
let shape_arc = schema.get(shape_iri).unwrap().clone(); let shape_arc = schema.get(shape_iri).unwrap().clone();
// Data might need to be fetched (if it has not been during initialization or nested shape fetch). // Data might need to be fetched (if it has not been during initialization or nested shape fetch).
@ -294,6 +273,7 @@ impl Verifier {
// Recursively process nested objects. // Recursively process nested objects.
self.process_changes_for_shape_and_session( self.process_changes_for_shape_and_session(
nuri, nuri,
&root_shape_iri,
shape_arc.clone(), shape_arc.clone(),
session_id, session_id,
&new_triples, &new_triples,
@ -323,23 +303,47 @@ impl Verifier {
Ok(()) Ok(())
} }
/// Helper to get orm subscriptions for nuri, shapes and sessions. /// Helper to call process_changes_for_shape for all subscriptions on nuri's document.
pub fn get_orm_subscriptions_for( fn process_changes_for_nuri_and_session(
&self, self: &mut Self,
nuri: &NuriV0, nuri: &NuriV0,
shape: Option<&ShapeIri>, session_id: u64,
session_id: Option<&u64>, triples_added: &[Triple],
) -> Vec<&OrmSubscription> { triples_removed: &[Triple],
self.orm_subscriptions.get(nuri).unwrap(). data_already_fetched: bool,
// Filter shapes, if present. ) -> Result<OrmChanges, NgError> {
iter().filter(|s| match shape { let mut orm_changes = HashMap::new();
Some(sh) => *sh == s.shape_type.shape,
None => true let shapes: Vec<_> = self
// Filter session ids if present. .orm_subscriptions
}).filter(|s| match session_id { .get(nuri)
Some(id) => *id == s.session_id, .unwrap()
None => true .iter()
}).collect() .map(|sub| {
sub.shape_type
.schema
.get(&sub.shape_type.shape)
.unwrap()
.clone()
})
.collect();
for root_shape in shapes {
let shape_iri = root_shape.iri.clone();
// Now we can safely call the method with self
self.process_changes_for_shape_and_session(
nuri,
&shape_iri,
root_shape,
session_id,
triples_added,
triples_removed,
&mut orm_changes,
data_already_fetched,
)?;
}
Ok(orm_changes)
} }
pub fn get_first_orm_subscription_for( pub fn get_first_orm_subscription_for(

Loading…
Cancel
Save