diff --git a/ng-verifier/src/orm/add_remove_triples.rs b/ng-verifier/src/orm/add_remove_triples.rs index 2e94255..cfc7c94 100644 --- a/ng-verifier/src/orm/add_remove_triples.rs +++ b/ng-verifier/src/orm/add_remove_triples.rs @@ -124,7 +124,7 @@ pub fn add_remove_triples( None } }) { - log_debug!("dealing with nesting for {shape_iri}"); + // log_debug!("dealing with nesting for {shape_iri}"); if let BasicType::Str(obj_iri) = &obj_term { let tracked_child_arc = { // Get or create object's tracked subject struct. @@ -143,19 +143,19 @@ pub fn add_remove_triples( .unwrap() .parents .insert(subject_iri.to_string(), parent); - log_debug!("lock acquired on tracked_child {obj_iri}"); + // log_debug!("lock acquired on tracked_child {obj_iri}"); tracked_child }; // Add link to children let mut tracked_predicate = tracked_predicate_lock.write().unwrap(); - log_debug!( - "for children, lock acquired on tracked_predicate {}", - predicate_schema.iri - ); + // log_debug!( + // "for children, lock acquired on tracked_predicate {}", + // predicate_schema.iri + // ); tracked_predicate.tracked_children.push(tracked_child_arc); } - log_debug!("end of dealing with nesting"); + // log_debug!("end of dealing with nesting"); } } } diff --git a/ng-verifier/src/orm/mod.rs b/ng-verifier/src/orm/mod.rs index 475306f..0c742a7 100644 --- a/ng-verifier/src/orm/mod.rs +++ b/ng-verifier/src/orm/mod.rs @@ -127,6 +127,8 @@ impl Verifier { /// Add and remove the triples from the tracked subjects, /// re-validate, and update `changes` containing the updated data. + /// Works by queuing changes by shape and subjects on a stack. + /// Nested objects are added to the stack fn process_changes_for_shape_and_session( self: &mut Self, nuri: &NuriV0, @@ -138,14 +140,16 @@ impl Verifier { data_already_fetched: bool, ) -> Result<(), NgError> { // First in, last out stack to keep track of objects to validate (nested objects first). Strings are object IRIs. 
- let mut shape_validation_queue: Vec<(Arc, Vec)> = vec![]; + let mut shape_validation_stack: Vec<(Arc, Vec)> = vec![]; + // Track (shape_iri, subject_iri) pairs currently being validated to prevent cycles and double evaluation. + let mut currently_validating: HashSet<(String, String)> = HashSet::new(); // Add root shape for first validation run. let root_shape_iri = root_shape.iri.clone(); - shape_validation_queue.push((root_shape, vec![])); + shape_validation_stack.push((root_shape, vec![])); // Process queue of shapes and subjects to validate. // For a given shape, we evaluate every subject against that shape. - while let Some((shape, objects_to_validate)) = shape_validation_queue.pop() { + while let Some((shape, objects_to_validate)) = shape_validation_stack.pop() { // Collect triples relevant for validation. let added_triples_by_subject = group_by_subject_for_shape(&shape, triples_added, &objects_to_validate); @@ -172,6 +176,31 @@ impl Verifier { log_debug!("all_modified_subjects: {:?}", modified_subject_iris); for subject_iri in modified_subject_iris { + let validation_key = (shape.iri.clone(), subject_iri.to_string()); + + // Cycle detection: Check if this (shape, subject) pair is already being validated + if currently_validating.contains(&validation_key) { + log_warn!( + "Cycle detected: subject '{}' with shape '{}' is already being validated. Marking as invalid.", + subject_iri, + shape.iri + ); + // Mark as invalid due to cycle + // TODO: We could handle this by handling nested references as IRIs. 
+ if let Some(tracked_shapes) = orm_subscription.tracked_subjects.get(subject_iri) + { + if let Some(tracked_subject) = tracked_shapes.get(&shape.iri) { + let mut ts = tracked_subject.write().unwrap(); + ts.valid = OrmTrackedSubjectValidity::Invalid; + ts.tracked_predicates.clear(); + } + } + continue; + } + + // Mark as currently validating + currently_validating.insert(validation_key.clone()); + let triples_added_for_subj = added_triples_by_subject .get(subject_iri) .map(|v| v.as_slice()) @@ -236,14 +265,21 @@ impl Verifier { ); // We add the need_eval to be processed next after loop. + // Filter out subjects already in the validation stack to prevent double evaluation. for (iri, schema_shape, needs_refetch) in need_eval { - // Add to nested_objects_to_validate. - nested_objects_to_eval - .entry(schema_shape) - .or_insert_with(Vec::new) - .push((iri.clone(), needs_refetch)); + let eval_key = (schema_shape.clone(), iri.clone()); + if !currently_validating.contains(&eval_key) { + // Only add if not currently being validated + nested_objects_to_eval + .entry(schema_shape) + .or_insert_with(Vec::new) + .push((iri.clone(), needs_refetch)); + } } } + + // Remove from validation stack after processing this subject + currently_validating.remove(&validation_key); } // Now, we queue all non-evaluated objects @@ -291,7 +327,7 @@ impl Verifier { .collect(); if objects_not_to_fetch.len() > 0 { // Queue all objects that don't need fetching. - shape_validation_queue.push((shape_arc, objects_not_to_fetch)); + shape_validation_stack.push((shape_arc, objects_not_to_fetch)); } } } diff --git a/ng-verifier/src/orm/validation.rs b/ng-verifier/src/orm/validation.rs index 35e47c1..be44f18 100644 --- a/ng-verifier/src/orm/validation.rs +++ b/ng-verifier/src/orm/validation.rs @@ -7,14 +7,13 @@ // notice may not be copied, modified, or distributed except // according to those terms. 
-use std::collections::HashMap; -use std::collections::HashSet; - use crate::orm::types::*; use crate::verifier::*; use ng_net::orm::*; use ng_repo::log::*; +type NeedsFetchBool = bool; + impl Verifier { /// Check the validity of a subject and update affecting tracked subjects' validity. /// Might return nested objects that need to be validated. @@ -24,7 +23,7 @@ impl Verifier { shape: &OrmSchemaShape, orm_subscription: &mut OrmSubscription, previous_validity: OrmTrackedSubjectValidity, - ) -> Vec<(String, String, bool)> { + ) -> Vec<(SubjectIri, ShapeIri, NeedsFetchBool)> { let tracked_subjects = &mut orm_subscription.tracked_subjects; let Some(tracked_shapes) = tracked_subjects.get(&s_change.subject_iri) else { @@ -37,27 +36,45 @@ impl Verifier { // Keep track of objects that need to be validated against a shape to fetch and validate. let mut need_evaluation: Vec<(String, String, bool)> = vec![]; - // Check 1) Check if we need to fetch this object or all parents are untracked. - if tracked_subject.parents.len() != 0 { - let no_parents_tracking = tracked_subject.parents.values().all(|parent| { - let subject = parent.read().unwrap(); - subject.valid == OrmTrackedSubjectValidity::Untracked - || subject.valid == OrmTrackedSubjectValidity::Invalid - }); + // Check 1) Check if this object is untracked and we need to remove children and ourselves. + if previous_validity == OrmTrackedSubjectValidity::Untracked { + // 1.1) Schedule children for deletion + // 1.1.1) Set all children to `untracked` that don't have other parents. 
+ for tracked_predicate in tracked_subject.tracked_predicates.values() { + for child in &tracked_predicate.write().unwrap().tracked_children { + let mut tracked_child = child.write().unwrap(); + if tracked_child.parents.is_empty() + || (tracked_child.parents.len() == 1 + && tracked_child + .parents + .contains_key(&tracked_subject.subject_iri)) + { + tracked_child.valid = OrmTrackedSubjectValidity::Untracked; + } + } + } - if no_parents_tracking { - // Remove tracked predicates and set untracked. - tracked_subject.tracked_predicates = HashMap::new(); - tracked_subject.valid = OrmTrackedSubjectValidity::Untracked; - return vec![]; - } else if !no_parents_tracking - && previous_validity == OrmTrackedSubjectValidity::Untracked - { - // We need to fetch the subject's current state: - // We have new parents but were previously not recording changes. - // Return the subject_iri with `needs_fetch` set to true. - return vec![(s_change.subject_iri.clone(), shape.iri.clone(), true)]; + // 1.1.2) Add all children to need_evaluation for their cleanup. + for tracked_predicate in tracked_subject.tracked_predicates.values() { + for child in &tracked_predicate.write().unwrap().tracked_children { + let child = child.read().unwrap(); + need_evaluation.push(( + child.subject_iri.clone(), + child.shape.iri.clone(), + false, + )); + } } + + // 1.2) If we don't have parents, we need to remove ourself too. + if tracked_subject.parents.is_empty() { + // Drop the guard to release the immutable borrow + drop(tracked_subject); + + tracked_subjects.remove(&s_change.subject_iri); + } + + return need_evaluation; } // Check 2) If there are no changes, there is nothing to do. @@ -77,16 +94,7 @@ impl Verifier { } } - // Check 3) If there is an infinite loop of parents pointing back to us, return invalid. - // Create a set of visited parents to detect cycles. - if has_cycle(&tracked_subject, &mut HashSet::new()) { - // Remove tracked predicates and set invalid. 
- tracked_subject.tracked_predicates = HashMap::new(); - tracked_subject.valid = OrmTrackedSubjectValidity::Invalid; - return vec![]; - } - - // Check 4) Validate subject against each predicate in shape. + // Check 3) Validate subject against each predicate in shape. for p_schema in shape.predicates.iter() { let p_change = s_change.predicates.get(&p_schema.iri); let tracked_pred = p_change.map(|pc| pc.tracked_predicate.read().unwrap()); @@ -95,7 +103,7 @@ impl Verifier { .as_ref() .map_or_else(|| 0, |tp| tp.current_cardinality); - // Check 4.1) Cardinality + // Check 3.1) Cardinality if count < p_schema.minCardinality { log_debug!( "[VALIDATION] Invalid: minCardinality not met | predicate: {:?} | count: {} | min: {} | schema: {:?} | changed: {:?}", @@ -111,7 +119,7 @@ impl Verifier { tracked_subject.tracked_predicates.remove(&p_schema.iri); } break; - // Check 4.2) Cardinality too high and extra values not allowed. + // Check 3.2) Cardinality too high and extra values not allowed. } else if count > p_schema.maxCardinality && p_schema.maxCardinality != -1 && p_schema.extra != Some(true) @@ -127,7 +135,7 @@ impl Verifier { // If cardinality is too high and no extra allowed, invalid. set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); break; - // Check 4.3) Required literals present. + // Check 3.3) Required literals present. } else if p_schema .dataTypes .iter() @@ -170,7 +178,7 @@ impl Verifier { ); set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); } - // Check 4.4) Nested shape correct. + // Check 3.4) Nested shape correct. } else if p_schema .dataTypes .iter() @@ -279,7 +287,7 @@ impl Verifier { // All nested objects are valid and cardinality is correct. // We are valid with this predicate. } - // Check 4.5) Data types correct. + // Check 3.5) Data types correct. } else { // Check if the data type is correct. 
+ let allowed_types: Vec<&OrmSchemaLiteralType> = @@ -320,29 +328,68 @@ impl Verifier { tracked_subject.valid = new_validity.clone(); if new_validity == OrmTrackedSubjectValidity::Invalid { - // If we are invalid, we can discard new unknowns again - they won't be kept in memory. - // We need to remove ourself from child objects' parents field and - // remove them if no other is tracking. + // For invalid subjects, we need to clean up. - // TODO: Child relationship cleanup disabled (nested tracking disabled in this refactor step) + let has_parents = !tracked_subject.parents.is_empty(); + if has_parents { + // This object is not a root object. Tracked child objects can be dropped. + // We therefore delete the child -> parent links. + // Untracked objects (with no parents) will be deleted in the subsequent child validation. + for tracked_predicate in tracked_subject.tracked_predicates.values() { + for child in &tracked_predicate.write().unwrap().tracked_children { + child + .write() + .unwrap() + .parents + .remove(&tracked_subject.subject_iri); + } + } + } else { + // This is a root object; we will set the children to untracked + // but don't delete the child -> parent relationship. + } - // Remove tracked predicates and set untracked. - tracked_subject.tracked_predicates = HashMap::new(); + // Set all children to `untracked` that don't have other parents. + for tracked_predicate in tracked_subject.tracked_predicates.values() { + for child in &tracked_predicate.write().unwrap().tracked_children { + let mut tracked_child = child.write().unwrap(); + if tracked_child.parents.is_empty() + || (tracked_child.parents.len() == 1 + && tracked_child + .parents + .contains_key(&tracked_subject.subject_iri)) + { + tracked_child.valid = OrmTrackedSubjectValidity::Untracked; + } + } + } - // Empty list of children that need evaluation. - need_evaluation.retain(|_| false); + // Add all children to need_evaluation for their cleanup. 
+ for tracked_predicate in tracked_subject.tracked_predicates.values() { + for child in &tracked_predicate.write().unwrap().tracked_children { + let child = child.read().unwrap(); + need_evaluation.push(( + child.subject_iri.clone(), + child.shape.iri.clone(), + false, + )); + } + } + + // Remove all tracked_predicates. + tracked_subject.tracked_predicates.clear(); } else if new_validity == OrmTrackedSubjectValidity::Valid && previous_validity != OrmTrackedSubjectValidity::Valid { // If this subject became valid, we need to refetch this subject. - // If the data has already been fetched, the parent function will prevent the fetch. + // If the data has already been fetched, the parent function will prevent the refetch. need_evaluation.insert(0, (s_change.subject_iri.clone(), shape.iri.clone(), true)); } // If validity changed, parents need to be re-evaluated. if new_validity != previous_validity { - // We return the tracking parents which need re-evaluation. - // Remember that the last elements (i.e. children or needs_fetch) are evaluated first. + // Parents that are not tracking this subject, don't need to be added. + // Remember that the last elements are evaluated first. return tracked_subject .parents .values() @@ -358,17 +405,3 @@ impl Verifier { return need_evaluation; } } - -fn has_cycle(subject: &OrmTrackedSubject, visited: &mut HashSet) -> bool { - if visited.contains(&subject.subject_iri) { - return true; - } - visited.insert(subject.subject_iri.clone()); - for (_parent_iri, parent_subject) in &subject.parents { - if has_cycle(&parent_subject.read().unwrap(), visited) { - return true; - } - } - visited.remove(&subject.subject_iri); - false -}