prevent double evaluation, improve cycle detection, proper child deletion

feat/orm
Laurin Weger 1 week ago
parent 35dc5e2c93
commit 43c3356942
No known key found for this signature in database
GPG Key ID: 9B372BB0B792770F
  1. 14
      ng-verifier/src/orm/add_remove_triples.rs
  2. 46
      ng-verifier/src/orm/mod.rs
  3. 159
      ng-verifier/src/orm/validation.rs

@ -124,7 +124,7 @@ pub fn add_remove_triples(
None None
} }
}) { }) {
log_debug!("dealing with nesting for {shape_iri}"); // log_debug!("dealing with nesting for {shape_iri}");
if let BasicType::Str(obj_iri) = &obj_term { if let BasicType::Str(obj_iri) = &obj_term {
let tracked_child_arc = { let tracked_child_arc = {
// Get or create object's tracked subject struct. // Get or create object's tracked subject struct.
@ -143,19 +143,19 @@ pub fn add_remove_triples(
.unwrap() .unwrap()
.parents .parents
.insert(subject_iri.to_string(), parent); .insert(subject_iri.to_string(), parent);
log_debug!("lock acquired on tracked_child {obj_iri}"); // log_debug!("lock acquired on tracked_child {obj_iri}");
tracked_child tracked_child
}; };
// Add link to children // Add link to children
let mut tracked_predicate = tracked_predicate_lock.write().unwrap(); let mut tracked_predicate = tracked_predicate_lock.write().unwrap();
log_debug!( // log_debug!(
"for children, lock acquired on tracked_predicate {}", // "for children, lock acquired on tracked_predicate {}",
predicate_schema.iri // predicate_schema.iri
); // );
tracked_predicate.tracked_children.push(tracked_child_arc); tracked_predicate.tracked_children.push(tracked_child_arc);
} }
log_debug!("end of dealing with nesting"); // log_debug!("end of dealing with nesting");
} }
} }
} }

@ -127,6 +127,8 @@ impl Verifier {
/// Add and remove the triples from the tracked subjects, /// Add and remove the triples from the tracked subjects,
/// re-validate, and update `changes` containing the updated data. /// re-validate, and update `changes` containing the updated data.
/// Works by queuing changes by shape and subjects on a stack.
/// Nested objects are added to the stack.
fn process_changes_for_shape_and_session( fn process_changes_for_shape_and_session(
self: &mut Self, self: &mut Self,
nuri: &NuriV0, nuri: &NuriV0,
@ -138,14 +140,16 @@ impl Verifier {
data_already_fetched: bool, data_already_fetched: bool,
) -> Result<(), NgError> { ) -> Result<(), NgError> {
// First in, last out stack to keep track of objects to validate (nested objects first). Strings are object IRIs. // First in, last out stack to keep track of objects to validate (nested objects first). Strings are object IRIs.
let mut shape_validation_queue: Vec<(Arc<OrmSchemaShape>, Vec<String>)> = vec![]; let mut shape_validation_stack: Vec<(Arc<OrmSchemaShape>, Vec<String>)> = vec![];
// Track (shape_iri, subject_iri) pairs currently being validated to prevent cycles and double evaluation.
let mut currently_validating: HashSet<(String, String)> = HashSet::new();
// Add root shape for first validation run. // Add root shape for first validation run.
let root_shape_iri = root_shape.iri.clone(); let root_shape_iri = root_shape.iri.clone();
shape_validation_queue.push((root_shape, vec![])); shape_validation_stack.push((root_shape, vec![]));
// Process queue of shapes and subjects to validate. // Process queue of shapes and subjects to validate.
// For a given shape, we evaluate every subject against that shape. // For a given shape, we evaluate every subject against that shape.
while let Some((shape, objects_to_validate)) = shape_validation_queue.pop() { while let Some((shape, objects_to_validate)) = shape_validation_stack.pop() {
// Collect triples relevant for validation. // Collect triples relevant for validation.
let added_triples_by_subject = let added_triples_by_subject =
group_by_subject_for_shape(&shape, triples_added, &objects_to_validate); group_by_subject_for_shape(&shape, triples_added, &objects_to_validate);
@ -172,6 +176,31 @@ impl Verifier {
log_debug!("all_modified_subjects: {:?}", modified_subject_iris); log_debug!("all_modified_subjects: {:?}", modified_subject_iris);
for subject_iri in modified_subject_iris { for subject_iri in modified_subject_iris {
let validation_key = (shape.iri.clone(), subject_iri.to_string());
// Cycle detection: Check if this (shape, subject) pair is already being validated
if currently_validating.contains(&validation_key) {
log_warn!(
"Cycle detected: subject '{}' with shape '{}' is already being validated. Marking as invalid.",
subject_iri,
shape.iri
);
// Mark as invalid due to cycle
// TODO: We could handle this by handling nested references as IRIs.
if let Some(tracked_shapes) = orm_subscription.tracked_subjects.get(subject_iri)
{
if let Some(tracked_subject) = tracked_shapes.get(&shape.iri) {
let mut ts = tracked_subject.write().unwrap();
ts.valid = OrmTrackedSubjectValidity::Invalid;
ts.tracked_predicates.clear();
}
}
continue;
}
// Mark as currently validating
currently_validating.insert(validation_key.clone());
let triples_added_for_subj = added_triples_by_subject let triples_added_for_subj = added_triples_by_subject
.get(subject_iri) .get(subject_iri)
.map(|v| v.as_slice()) .map(|v| v.as_slice())
@ -236,8 +265,11 @@ impl Verifier {
); );
// We add the need_eval to be processed next after loop. // We add the need_eval to be processed next after loop.
// Filter out subjects already in the validation stack to prevent double evaluation.
for (iri, schema_shape, needs_refetch) in need_eval { for (iri, schema_shape, needs_refetch) in need_eval {
// Add to nested_objects_to_validate. let eval_key = (schema_shape.clone(), iri.clone());
if !currently_validating.contains(&eval_key) {
// Only add if not currently being validated
nested_objects_to_eval nested_objects_to_eval
.entry(schema_shape) .entry(schema_shape)
.or_insert_with(Vec::new) .or_insert_with(Vec::new)
@ -246,6 +278,10 @@ impl Verifier {
} }
} }
// Remove from validation stack after processing this subject
currently_validating.remove(&validation_key);
}
// Now, we queue all non-evaluated objects // Now, we queue all non-evaluated objects
for (shape_iri, objects_to_eval) in &nested_objects_to_eval { for (shape_iri, objects_to_eval) in &nested_objects_to_eval {
let orm_subscription = self.get_first_orm_subscription_for( let orm_subscription = self.get_first_orm_subscription_for(
@ -291,7 +327,7 @@ impl Verifier {
.collect(); .collect();
if objects_not_to_fetch.len() > 0 { if objects_not_to_fetch.len() > 0 {
// Queue all objects that don't need fetching. // Queue all objects that don't need fetching.
shape_validation_queue.push((shape_arc, objects_not_to_fetch)); shape_validation_stack.push((shape_arc, objects_not_to_fetch));
} }
} }
} }

@ -7,14 +7,13 @@
// notice may not be copied, modified, or distributed except // notice may not be copied, modified, or distributed except
// according to those terms. // according to those terms.
use std::collections::HashMap;
use std::collections::HashSet;
use crate::orm::types::*; use crate::orm::types::*;
use crate::verifier::*; use crate::verifier::*;
use ng_net::orm::*; use ng_net::orm::*;
use ng_repo::log::*; use ng_repo::log::*;
type NeedsFetchBool = bool;
impl Verifier { impl Verifier {
/// Check the validity of a subject and update affecting tracked subjects' validity. /// Check the validity of a subject and update affecting tracked subjects' validity.
/// Might return nested objects that need to be validated. /// Might return nested objects that need to be validated.
@ -24,7 +23,7 @@ impl Verifier {
shape: &OrmSchemaShape, shape: &OrmSchemaShape,
orm_subscription: &mut OrmSubscription, orm_subscription: &mut OrmSubscription,
previous_validity: OrmTrackedSubjectValidity, previous_validity: OrmTrackedSubjectValidity,
) -> Vec<(String, String, bool)> { ) -> Vec<(SubjectIri, ShapeIri, NeedsFetchBool)> {
let tracked_subjects = &mut orm_subscription.tracked_subjects; let tracked_subjects = &mut orm_subscription.tracked_subjects;
let Some(tracked_shapes) = tracked_subjects.get(&s_change.subject_iri) else { let Some(tracked_shapes) = tracked_subjects.get(&s_change.subject_iri) else {
@ -37,29 +36,47 @@ impl Verifier {
// Keep track of objects that need to be validated against a shape to fetch and validate. // Keep track of objects that need to be validated against a shape to fetch and validate.
let mut need_evaluation: Vec<(String, String, bool)> = vec![]; let mut need_evaluation: Vec<(String, String, bool)> = vec![];
// Check 1) Check if we need to fetch this object or all parents are untracked. // Check 1) Check if this object is untracked and we need to remove children and ourselves.
if tracked_subject.parents.len() != 0 { if previous_validity == OrmTrackedSubjectValidity::Untracked {
let no_parents_tracking = tracked_subject.parents.values().all(|parent| { // 1.1) Schedule children for deletion
let subject = parent.read().unwrap(); // 1.1.1) Set all children to `untracked` that don't have other parents.
subject.valid == OrmTrackedSubjectValidity::Untracked for tracked_predicate in tracked_subject.tracked_predicates.values() {
|| subject.valid == OrmTrackedSubjectValidity::Invalid for child in &tracked_predicate.write().unwrap().tracked_children {
}); let mut tracked_child = child.write().unwrap();
if tracked_child.parents.is_empty()
if no_parents_tracking { || (tracked_child.parents.len() == 1
// Remove tracked predicates and set untracked. && tracked_child
tracked_subject.tracked_predicates = HashMap::new(); .parents
tracked_subject.valid = OrmTrackedSubjectValidity::Untracked; .contains_key(&tracked_subject.subject_iri))
return vec![];
} else if !no_parents_tracking
&& previous_validity == OrmTrackedSubjectValidity::Untracked
{ {
// We need to fetch the subject's current state: tracked_child.valid = OrmTrackedSubjectValidity::Untracked;
// We have new parents but were previously not recording changes. }
// Return the subject_iri with `needs_fetch` set to true.
return vec![(s_change.subject_iri.clone(), shape.iri.clone(), true)];
} }
} }
// 1.1.2) Add all children to need_evaluation for their cleanup.
for tracked_predicate in tracked_subject.tracked_predicates.values() {
for child in &tracked_predicate.write().unwrap().tracked_children {
let child = child.read().unwrap();
need_evaluation.push((
child.subject_iri.clone(),
child.shape.iri.clone(),
false,
));
}
}
// 1.2) If we don't have parents, we need to remove ourself too.
if tracked_subject.parents.is_empty() {
// Drop the guard to release the immutable borrow
drop(tracked_subject);
tracked_subjects.remove(&s_change.subject_iri);
}
return need_evaluation;
}
// Check 2) If there are no changes, there is nothing to do. // Check 2) If there are no changes, there is nothing to do.
if s_change.predicates.is_empty() { if s_change.predicates.is_empty() {
return vec![]; return vec![];
@ -77,16 +94,7 @@ impl Verifier {
} }
} }
// Check 3) If there is an infinite loop of parents pointing back to us, return invalid. // Check 3) Validate subject against each predicate in shape.
// Create a set of visited parents to detect cycles.
if has_cycle(&tracked_subject, &mut HashSet::new()) {
// Remove tracked predicates and set invalid.
tracked_subject.tracked_predicates = HashMap::new();
tracked_subject.valid = OrmTrackedSubjectValidity::Invalid;
return vec![];
}
// Check 4) Validate subject against each predicate in shape.
for p_schema in shape.predicates.iter() { for p_schema in shape.predicates.iter() {
let p_change = s_change.predicates.get(&p_schema.iri); let p_change = s_change.predicates.get(&p_schema.iri);
let tracked_pred = p_change.map(|pc| pc.tracked_predicate.read().unwrap()); let tracked_pred = p_change.map(|pc| pc.tracked_predicate.read().unwrap());
@ -95,7 +103,7 @@ impl Verifier {
.as_ref() .as_ref()
.map_or_else(|| 0, |tp| tp.current_cardinality); .map_or_else(|| 0, |tp| tp.current_cardinality);
// Check 4.1) Cardinality // Check 3.1) Cardinality
if count < p_schema.minCardinality { if count < p_schema.minCardinality {
log_debug!( log_debug!(
"[VALIDATION] Invalid: minCardinality not met | predicate: {:?} | count: {} | min: {} | schema: {:?} | changed: {:?}", "[VALIDATION] Invalid: minCardinality not met | predicate: {:?} | count: {} | min: {} | schema: {:?} | changed: {:?}",
@ -111,7 +119,7 @@ impl Verifier {
tracked_subject.tracked_predicates.remove(&p_schema.iri); tracked_subject.tracked_predicates.remove(&p_schema.iri);
} }
break; break;
// Check 4.2) Cardinality too high and extra values not allowed. // Check 3.2) Cardinality too high and extra values not allowed.
} else if count > p_schema.maxCardinality } else if count > p_schema.maxCardinality
&& p_schema.maxCardinality != -1 && p_schema.maxCardinality != -1
&& p_schema.extra != Some(true) && p_schema.extra != Some(true)
@ -127,7 +135,7 @@ impl Verifier {
// If cardinality is too high and no extra allowed, invalid. // If cardinality is too high and no extra allowed, invalid.
set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid);
break; break;
// Check 4.3) Required literals present. // Check 3.3) Required literals present.
} else if p_schema } else if p_schema
.dataTypes .dataTypes
.iter() .iter()
@ -170,7 +178,7 @@ impl Verifier {
); );
set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid);
} }
// Check 4.4) Nested shape correct. // Check 3.4) Nested shape correct.
} else if p_schema } else if p_schema
.dataTypes .dataTypes
.iter() .iter()
@ -279,7 +287,7 @@ impl Verifier {
// All nested objects are valid and cardinality is correct. // All nested objects are valid and cardinality is correct.
// We are valid with this predicate. // We are valid with this predicate.
} }
// Check 4.5) Data types correct. // Check 3.5) Data types correct.
} else { } else {
// Check if the data type is correct. // Check if the data type is correct.
let allowed_types: Vec<&OrmSchemaLiteralType> = let allowed_types: Vec<&OrmSchemaLiteralType> =
@ -320,29 +328,68 @@ impl Verifier {
tracked_subject.valid = new_validity.clone(); tracked_subject.valid = new_validity.clone();
if new_validity == OrmTrackedSubjectValidity::Invalid { if new_validity == OrmTrackedSubjectValidity::Invalid {
// If we are invalid, we can discard new unknowns again - they won't be kept in memory. // For invalid subjects, we need to clean up.
// We need to remove ourselves from the child objects' `parents` field and
// remove the children if no other subject is tracking them.
// TODO: Child relationship cleanup disabled (nested tracking disabled in this refactor step) let has_parents = !tracked_subject.parents.is_empty();
if has_parents {
// This object is not a root object. Tracked child objects can be dropped.
// We therefore delete the child -> parent links.
// Untracked objects (with no parents) will be deleted in the subsequent child validation.
for tracked_predicate in tracked_subject.tracked_predicates.values() {
for child in &tracked_predicate.write().unwrap().tracked_children {
child
.write()
.unwrap()
.parents
.remove(&tracked_subject.subject_iri);
}
}
} else {
// This is a root object; we will set the children to untracked
// but don't delete the child -> parent relationship.
}
// Set all children to `untracked` that don't have other parents.
for tracked_predicate in tracked_subject.tracked_predicates.values() {
for child in &tracked_predicate.write().unwrap().tracked_children {
let mut tracked_child = child.write().unwrap();
if tracked_child.parents.is_empty()
|| (tracked_child.parents.len() == 1
&& tracked_child
.parents
.contains_key(&tracked_subject.subject_iri))
{
tracked_child.valid = OrmTrackedSubjectValidity::Untracked;
}
}
}
// Remove tracked predicates and set untracked. // Add all children to need_evaluation for their cleanup.
tracked_subject.tracked_predicates = HashMap::new(); for tracked_predicate in tracked_subject.tracked_predicates.values() {
for child in &tracked_predicate.write().unwrap().tracked_children {
let child = child.read().unwrap();
need_evaluation.push((
child.subject_iri.clone(),
child.shape.iri.clone(),
false,
));
}
}
// Empty list of children that need evaluation. // Remove all tracked_predicates.
need_evaluation.retain(|_| false); tracked_subject.tracked_predicates.clear();
} else if new_validity == OrmTrackedSubjectValidity::Valid } else if new_validity == OrmTrackedSubjectValidity::Valid
&& previous_validity != OrmTrackedSubjectValidity::Valid && previous_validity != OrmTrackedSubjectValidity::Valid
{ {
// If this subject became valid, we need to refetch this subject. // If this subject became valid, we need to refetch this subject.
// If the data has already been fetched, the parent function will prevent the fetch. // If the data has already been fetched, the parent function will prevent the refetch.
need_evaluation.insert(0, (s_change.subject_iri.clone(), shape.iri.clone(), true)); need_evaluation.insert(0, (s_change.subject_iri.clone(), shape.iri.clone(), true));
} }
// If validity changed, parents need to be re-evaluated. // If validity changed, parents need to be re-evaluated.
if new_validity != previous_validity { if new_validity != previous_validity {
// We return the tracking parents which need re-evaluation. // Parents that are not tracking this subject don't need to be added.
// Remember that the last elements (i.e. children or needs_fetch) are evaluated first. // Remember that the last elements are evaluated first.
return tracked_subject return tracked_subject
.parents .parents
.values() .values()
@ -358,17 +405,3 @@ impl Verifier {
return need_evaluation; return need_evaluation;
} }
} }
/// Reports whether walking the `parents` links upwards from `subject` arrives at
/// a subject IRI that is already on the path currently being explored.
///
/// `visited` holds the IRIs on the current path. An IRI is taken out again once
/// all of its parents were explored without finding a cycle; when a cycle is
/// found, the function returns immediately and leaves `visited` as it is.
fn has_cycle(subject: &OrmTrackedSubject, visited: &mut HashSet<String>) -> bool {
    // `insert` returns `false` when the IRI was already present, i.e. we came back to it.
    if !visited.insert(subject.subject_iri.clone()) {
        return true;
    }
    // Each parent is read-locked only for the duration of its own recursive check;
    // `unwrap` panics if acquiring that lock fails.
    let cycle_above = subject
        .parents
        .values()
        .any(|parent| has_cycle(&parent.read().unwrap(), visited));
    if cycle_above {
        return true;
    }
    visited.remove(&subject.subject_iri);
    false
}

Loading…
Cancel
Save