From c9d26d87f2e07c8083cebb8e847e8c8ce6e21e02 Mon Sep 17 00:00:00 2001 From: Laurin Weger Date: Wed, 1 Oct 2025 16:25:10 +0200 Subject: [PATCH] wip --- ng-net/src/orm.rs | 13 +- ng-verifier/src/orm.rs | 450 +++++++++++++++++++++++------------------ 2 files changed, 255 insertions(+), 208 deletions(-) diff --git a/ng-net/src/orm.rs b/ng-net/src/orm.rs index 2d6a6de..d46f442 100644 --- a/ng-net/src/orm.rs +++ b/ng-net/src/orm.rs @@ -11,6 +11,7 @@ #![allow(non_snake_case)] +use std::collections::HashSet; use std::{collections::HashMap, rc::Weak}; use serde::{Deserialize, Serialize}; @@ -100,14 +101,14 @@ pub struct OrmSchemaPredicate { #[derive(Clone, Debug)] pub struct OrmSubscription<'a> { pub sender: Sender, - pub tracked_objects: HashMap>, + pub tracked_objects: HashMap>, } #[derive(Clone, Debug)] -pub struct OrmTrackedSubject<'a> { +pub struct OrmTrackedSubjectAndShape<'a> { pub tracked_predicates: HashMap>, // Parents and if they are currently tracking us. - pub parents: HashMap, bool)>, + pub parents: HashMap, bool)>, pub valid: OrmTrackedSubjectValidity, pub subj_iri: &'a String, pub shape: &'a OrmSchemaShape, @@ -117,14 +118,15 @@ pub struct OrmTrackedSubject<'a> { pub enum OrmTrackedSubjectValidity { Valid, Invalid, - Unknown, + NotEvaluated, Untracked, + NeedsFetch, } #[derive(Clone, Debug)] pub struct OrmTrackedPredicate<'a> { pub schema: &'a OrmSchemaPredicate, - pub tracked_children: Vec>>, + pub tracked_children: Vec>>, pub current_cardinality: i32, pub current_literals: Option>, } @@ -135,7 +137,6 @@ pub struct OrmTrackedSubjectChange<'a> { pub subject_iri: String, pub predicates: HashMap>, pub valid: OrmTrackedSubjectValidity, - pub tracked_subject: &'a OrmTrackedSubject<'a>, } pub struct OrmTrackedPredicateChanges<'a> { pub tracked_predicate: &'a OrmTrackedPredicate<'a>, diff --git a/ng-verifier/src/orm.rs b/ng-verifier/src/orm.rs index c57bc03..cfa5bc1 100644 --- a/ng-verifier/src/orm.rs +++ b/ng-verifier/src/orm.rs @@ -25,10 +25,9 @@ use ng_net::orm::OrmShapeTypeRef; use ng_net::orm::OrmSubscription; use ng_net::orm::OrmTrackedPredicate; use ng_net::orm::OrmTrackedPredicateChanges; -use ng_net::orm::OrmTrackedSubject; +use ng_net::orm::OrmTrackedSubjectAndShape; use ng_net::orm::OrmTrackedSubjectChange; use ng_net::orm::OrmTrackedSubjectValidity; -use ng_net::orm::Term; use ng_net::orm::{OrmSchemaDataType, OrmSchemaShape}; use ng_net::{app_protocol::*, orm::OrmSchema}; use ng_net::{ @@ -39,12 +38,13 @@ use ng_oxigraph::oxigraph::sparql::{results::*, Query, QueryResults}; use ng_oxigraph::oxrdf::LiteralRef; use ng_oxigraph::oxrdf::NamedNode; use ng_oxigraph::oxrdf::Subject; -use ng_oxigraph::oxrdf::Term; use ng_oxigraph::oxrdf::Triple; use ng_repo::errors::NgError; use ng_repo::errors::VerifierError; use ng_repo::log::*; use regex::Regex; +use serde_json::json; +use serde_json::Value; use crate::types::*; use crate::verifier::*; @@ -92,94 +92,60 @@ impl Verifier { &mut self, scope: &NuriV0, schema: &OrmSchema, - shape: &String, + root_shape: &OrmSchemaShape, triples_added: &Vec, triples_removed: &Vec, - ) { - let tracked_subjects: HashMap> = - self.orm_tracked_subjects; - // Structure to store changes in. - let mut subject_changes: HashMap = HashMap::new(); - - // Group triples by predicate (only keep predicates defined in the shape). Drop others. - let mut added_triples_by_pred: HashMap> = HashMap::new(); - let Some(shape_def) = schema.get(shape) else { - log_err!( - "Shape {} not found in schema when grouping triples by predicate", - shape - ); - return; - }; - - // Collect allowed predicate IRIs for this shape - let allowed: std::collections::HashSet<&str> = shape_def - .predicates - .iter() - .map(|p| p.iri.as_str()) - .collect(); - - for triple in triples_added { - if allowed.contains(triple.predicate.as_str()) { - added_triples_by_pred - .entry(triple.predicate.as_str().to_string()) - .or_insert_with(|| vec![]) - .push(*triple); + ) -> HashMap> { + let tracked_subjects = self.orm_tracked_subjects; + + // === Helper functions === + + fn group_by_subject_for_shape<'a>( + shape: &'a OrmSchemaShape, + triples: &'a Vec, + allowed_subjects: &Vec, + ) -> HashMap> { + let mut triples_by_pred: HashMap> = HashMap::new(); + let allowed_preds: HashSet<&str> = + shape.predicates.iter().map(|p| p.iri.as_str()).collect(); + let allowed_objs: HashSet<&String> = allowed_subjects.iter().collect(); + for triple in triples { + // triple.subject must be in allowed_subjects (or allowed_subjects empty) + // and triple.predicate must be in allowed_preds. + if (allowed_objs.is_empty() || allowed_objs.contains(&triple.subject.to_string())) + && allowed_preds.contains(triple.predicate.as_str()) + { + triples_by_pred + .entry(triple.predicate.as_str().to_string()) + .or_insert_with(|| vec![]) + .push(triple); + } } - } - - // Based on those triples, group by subject. - let mut added_triples_by_subject: HashMap> = HashMap::new(); - for triple in triples_added { - let subject_iri = match &triple.subject { - Subject::NamedNode(node) => node.as_str(), - _ => continue, // Won't happen. - }; - added_triples_by_subject - .entry(subject_iri.to_string()) - .or_insert_with(|| vec![]) - .push(triple.clone()); - } - - // Do the same for removed ones. - let mut removed_triples_by_pred: HashMap> = HashMap::new(); - // Collect allowed predicate IRIs for this shape - let allowed: std::collections::HashSet<&str> = shape_def - .predicates - .iter() - .map(|p| p.iri.as_str()) - .collect(); - - for triple in triples_removed { - if allowed.contains(triple.predicate.as_str()) { - removed_triples_by_pred - .entry(triple.predicate.as_str().to_string()) + // Based on those triples, group by subject. + let mut triples_by_subject: HashMap> = HashMap::new(); + for triple in triples { + let subject_iri = match &triple.subject { + Subject::NamedNode(node) => node.as_str(), + _ => continue, // Won't happen. + }; + triples_by_subject + .entry(subject_iri.to_string()) .or_insert_with(|| vec![]) - .push(*triple); + .push(&triple); } - } - let mut removed_triples_by_subject: HashMap> = HashMap::new(); - for triple in triples_removed { - let subject_iri = match &triple.subject { - Subject::NamedNode(node) => node.as_str(), - _ => continue, // Won't happen. - }; - removed_triples_by_subject - .entry(subject_iri.to_string()) - .or_insert_with(|| vec![]) - .push(triple.clone()); + return triples_by_subject; } - // Assumes all triples have same subject. - fn orm_from_triple_for_level<'a>( + /// Add all triples to `changes` + /// Returns predicates to nested objects that were touched and need processing. + /// Assumes all triples have same subject. + fn add_remove_triples( shape: &OrmSchemaShape, subject_iri: &String, - triples_added: &Vec, - triples_removed: &Vec, - tracked_subjects: &HashMap>>, - changes: &HashMap, - ) -> ( - Vec<&'a OrmTrackedPredicateChanges<'a>>, - Vec<&'a OrmTrackedPredicateChanges<'a>>, + triples_added: &Vec<&Triple>, + triples_removed: &Vec<&Triple>, + tracked_subjects: &HashMap>, + subject_changes: &OrmTrackedSubjectChange, ) { let tracked_shapes_for_subject = tracked_subjects .entry(subject_iri.clone()) @@ -187,28 +153,15 @@ impl Verifier { let tracked_subject = tracked_shapes_for_subject .entry(subject_iri.clone()) - .or_insert_with(|| OrmTrackedSubject { + .or_insert_with(|| OrmTrackedSubjectAndShape { tracked_predicates: HashMap::new(), parents: HashMap::new(), - valid: ng_net::orm::OrmTrackedSubjectValidity::Unknown, + valid: ng_net::orm::OrmTrackedSubjectValidity::NotEvaluated, subj_iri: subject_iri, shape, }); - let subject_changes = - changes - .entry(subject_iri.clone()) - .or_insert_with(|| OrmTrackedSubjectChange { - subject_iri: subject_iri.clone(), - predicates: HashMap::new(), - tracked_subject, - valid: OrmTrackedSubjectValidity::Unknown, - }); - - // Keep track of all children that were spotted or removed. - let mut children_removed: Vec<&OrmTrackedPredicateChanges> = vec![]; - let mut children_added: Vec<&OrmTrackedPredicateChanges> = vec![]; - + // Process added triples. // For each triple, check matching predicates in shape. // keeping track of value count (for later validations). // In parallel, we keep track of the values added (tracked_changes) @@ -242,7 +195,7 @@ impl Verifier { tracked_predicate: &tp, values_added: Vec::new(), values_removed: Vec::new(), - validity: OrmTrackedSubjectValidity::Unknown, + validity: OrmTrackedSubjectValidity::NotEvaluated, }); pred_changes.values_added.push(obj_term.clone()); @@ -254,24 +207,16 @@ impl Verifier { .iter() .any(|dt| dt.valType == OrmSchemaLiteralType::literal) { - if let Some(current_literals) = &mut tp.current_literals { + if let Some(current_literals) = tp.current_literals { current_literals.push(obj_term); } else { - tp.current_literals = Some(vec![obj_term]); + tp.current_literals.insert(vec![obj_term]); } - } else if tp - .schema - .dataTypes - .iter() - .any(|dt| dt.valType == OrmSchemaLiteralType::shape) - { - // For nested, add object to tracked predicates and add self as parent. - children_added.push(&pred_changes); } } } - // Removed triples + // Process removed triples. for triple in triples_removed { let pred_iri = triple.predicate.as_str(); @@ -309,32 +254,27 @@ impl Verifier { } else { tp.current_literals = Some(vec![val_removed]); } - } else if tp - .schema - .dataTypes - .iter() - .any(|dt| dt.valType == OrmSchemaLiteralType::shape) - { - // For nested, add object to tracked predicates and add self as parent. - children_removed.push(&pred_changes); } } - - return (children_added, children_removed); } + /// Check the validity of a subject. + /// Might return nested objects that need to be validated. + /// Assumes all triples to be of same subject. fn check_subject_validity<'a>( s_change: &'a OrmTrackedSubjectChange<'a>, - shape: &String, + shape: &OrmSchemaShape, schema: &'a OrmSchema, + tracked_subjects: &HashMap>>, previous_validity: OrmTrackedSubjectValidity, ) -> ( OrmTrackedSubjectValidity, - HashMap>>, + // Vec + Vec<(&'a String, &'a OrmSchemaShape, bool)>, ) { + // Check 1) If there are no changes, there is nothing to do. if s_change.predicates.is_empty() { - // There has not been any changes. There is nothing to do. - return (previous_validity, HashMap::new()); + return (previous_validity, vec![]); } let previous_validity = s_change.valid; @@ -344,24 +284,45 @@ impl Verifier { if new_val == OrmTrackedSubjectValidity::Invalid { new_validity = OrmTrackedSubjectValidity::Invalid; // Remove all tracked predicates - s_change.tracked_subject.tracked_predicates = HashMap::new(); - } else if new_val == OrmTrackedSubjectValidity::Unknown + s_change.tracked_subjects.tracked_predicates = HashMap::new(); + } else if new_val == OrmTrackedSubjectValidity::NotEvaluated && new_validity != OrmTrackedSubjectValidity::Invalid { - new_validity = OrmTrackedSubjectValidity::Unknown; + new_validity = OrmTrackedSubjectValidity::NotEvaluated; } }; - let tracked_subject = s_change.tracked_subject; - let shape = schema.get(shape).expect("Shape not available"); + let tracked_subject = tracked_subjects + .get(&s_change.subject_iri) + .unwrap() + .get(&shape.iri) + .unwrap(); + + // Check 2) If all parents are untracked, return untracked. + if tracked_subject + .parents + .values() + .all(|(parent, tracked)| !tracked) + { + // Remove tracked predicates and set untracked. + tracked_subject.tracked_predicates = HashMap::new(); + tracked_subject.valid = OrmTrackedSubjectValidity::Untracked; + return (OrmTrackedSubjectValidity::Untracked, vec![]); + } - // TODO: Check parent validities: - // If no parent is tracking us, we are untracked. - // If there is an infinite loop of parents pointing back to use, return invalid. + // Check 3) If there is an infinite loop of parents pointing back to us, return invalid. + // Create a set of visited parents to detect cycles. + if has_cycle(tracked_subject, &mut HashSet::new()) { + // Remove tracked predicates and set invalid. + tracked_subject.tracked_predicates = HashMap::new(); + tracked_subject.valid = OrmTrackedSubjectValidity::Invalid; + return (OrmTrackedSubjectValidity::Invalid, vec![]); + } // Keep track of objects that need to be validated against a shape to fetch and validate. - let mut new_unknowns: Vec<(&String, &OrmSchemaShape)> = vec![]; + let mut new_unknowns: Vec<(&String, &OrmSchemaShape, bool)> = vec![]; + // Check 4) Validate subject against each predicate in shape. for p_schema in shape.predicates.iter() { let p_change = s_change.predicates.get(&p_schema.iri); let tracked_pred = p_change.map(|pc| pc.tracked_predicate); @@ -369,12 +330,15 @@ impl Verifier { let count = tracked_pred .map_or_else(|| 0, |tp: &OrmTrackedPredicate<'_>| tp.current_cardinality); + // Check 4.1) Cardinality if count < p_schema.minCardinality { set_validity(OrmTrackedSubjectValidity::Invalid); if count <= 0 { - // If no other parent is tracking, remove all tracked predicates. + // If cardinality is 0, we can remove the tracked predicate. tracked_subject.tracked_predicates.remove(&p_schema.iri); } + break; + // Check 4.2) Cardinality too high and extra values not allowed. } else if count > p_schema.maxCardinality && p_schema.maxCardinality != -1 && p_schema.extra != Some(true) @@ -382,6 +346,7 @@ impl Verifier { // If cardinality is too high and no extra allowed, invalid. set_validity(OrmTrackedSubjectValidity::Invalid); break; + // Check 4.3) Literal present types and valid. } else if p_schema .dataTypes .iter() @@ -418,6 +383,7 @@ impl Verifier { break; } } + // Check 4.4) Nested shape correct. } else if p_schema .dataTypes .iter() @@ -435,7 +401,7 @@ impl Verifier { (1, 0, 0, 0) } else if tc.valid == OrmTrackedSubjectValidity::Invalid { (0, 1, 0, 0) - } else if tc.valid == OrmTrackedSubjectValidity::Unknown { + } else if tc.valid == OrmTrackedSubjectValidity::NotEvaluated { (0, 0, 1, 0) } else if tc.valid == OrmTrackedSubjectValidity::Untracked { (0, 0, 0, 1) @@ -459,7 +425,7 @@ impl Verifier { break; } else if counts.3 > 0 { // If we have untracked nested objects, we need to fetch them and validate. - set_validity(OrmTrackedSubjectValidity::Unknown); + set_validity(OrmTrackedSubjectValidity::NotEvaluated); // Add them to the list of unknowns to fetch and validate. for o in tracked_pred .iter() @@ -467,17 +433,18 @@ impl Verifier { { if let Some(tc) = o.upgrade() { if tc.valid == OrmTrackedSubjectValidity::Untracked { - new_unknowns.push((tc.subj_iri, tc.shape)); + new_unknowns.push((tc.subj_iri, tc.shape, true)); } } } } else if counts.2 > 0 { // If we have unknown nested objects, we need to wait for their evaluation. - set_validity(OrmTrackedSubjectValidity::Unknown); + set_validity(OrmTrackedSubjectValidity::NotEvaluated); } else { // All nested objects are valid and cardinality is correct. // We are valid with this predicate. } + // Check 4.5) Data types correct. } else { // Check if the data type is correct. let allowed_types: Vec = @@ -508,17 +475,19 @@ impl Verifier { }; } - // TODO // If we are invalid, we can discard new unknowns again - they won't be kept in memory. + // We need to inform all children (by returning them for later evaluation), to untrack them. + // TODO: Collect info about all children to untrack them. if new_validity == OrmTrackedSubjectValidity::Invalid { - return (OrmTrackedSubjectValidity::Invalid, HashMap::new()); - } else if (new_validity == OrmTrackedSubjectValidity::Valid - && previous_validity != OrmTrackedSubjectValidity::Valid) + return (OrmTrackedSubjectValidity::Invalid, vec![]); + } else if new_validity == OrmTrackedSubjectValidity::Valid + && previous_validity != OrmTrackedSubjectValidity::Valid { - // If the validity is newly valid, we need to refetch this subject. // TODO + // If this subject became valid, we need to refetch this subject. } - // If validity changed, inform parents (add to new_unknowns). + // TODO... + // If validity changed, parents need to be re-evaluated. if new_validity != previous_validity { // TODO } @@ -527,69 +496,130 @@ impl Verifier { return (new_validity, new_unknowns); } - let all_subjects: HashSet<&String> = added_triples_by_subject - .keys() - .chain(removed_triples_by_subject.keys()) - .collect(); - - // Process added/removed triples for each subject. - for subject_iri in all_subjects { - let triples_added_for_subj = added_triples_by_subject - .get(subject_iri) - .unwrap_or(&vec![]) - .to_vec(); - let triples_removed_for_subj = removed_triples_by_subject - .get(subject_iri) - .unwrap_or(&vec![]) - .to_vec(); - - let _ = orm_from_triple_for_level( - &shape_def, - &subject_iri, - &triples_added_for_subj, - &triples_removed_for_subj, - &tracked_subjects, - &subject_changes, - ); - } + // === Validation === + + // FILO queue: To validate object changes (nested objects first). Strings are object IRIs. + let mut shape_validation_queue: Vec<(&OrmSchemaShape, Vec)> = vec![]; + // Add root shape for first validation run. + shape_validation_queue.push((&root_shape, vec![])); + + // Structure to store changes in. By shape iri > subject iri > OrmTrackedSubjectChange + let mut shape_and_subject_changes: HashMap< + String, + HashMap, + > = HashMap::new(); + + // Process queue of shapes and subjects to validate. + while let Some((shape, objects_to_validate)) = shape_validation_queue.pop() { + // For a given shape, we evaluate every subject against that shape. + + // Collect triples relevant for validation. + let added_triples_by_subject = + group_by_subject_for_shape(shape, triples_added, &objects_to_validate); + let removed_triples_by_subject = + group_by_subject_for_shape(shape, triples_removed, &objects_to_validate); + let all_modified_subjects: HashSet<&String> = added_triples_by_subject + .keys() + .chain(removed_triples_by_subject.keys()) + .collect(); + + // Use to collect nested objects that need validation. + // First string is shape IRI, second are object IRIs. + let nested_objects_to_validate: HashMap> = HashMap::new(); + + // For each subject, add/remove triples and validate. + for subject_iri in all_modified_subjects { + let triples_added_for_subj = added_triples_by_subject + .get(subject_iri) + .unwrap_or(&vec![]) + .to_vec(); + let triples_removed_for_subj = removed_triples_by_subject + .get(subject_iri) + .unwrap_or(&vec![]) + .to_vec(); + + // Get or create change object for (shape, subject) pair. + let change = shape_and_subject_changes + .entry(shape.iri.clone()) + .or_insert_with(|| HashMap::new()) + .entry(subject_iri.clone()) + .or_insert_with(|| OrmTrackedSubjectChange { + subject_iri: subject_iri.clone(), + predicates: HashMap::new(), + valid: OrmTrackedSubjectValidity::NeedsFetch, + }); + + // Apply all triples for that subject to the tracked (shape, subject) pair. + // Record the changes. + add_remove_triples( + shape, + &subject_iri, + &triples_added_for_subj, + &triples_removed_for_subj, + &tracked_subjects, + &change, + ); + + let tracked_subject = tracked_subjects.get(subject_iri).unwrap(); + // Validate the subject. + let (new_validity, new_unknowns) = check_subject_validity( + &change, + shape, + schema, + tracked_subjects, + tracked_subject, + ); + + // TODO: Add logic to fetch un-fetched objects after validation. + // and return logic to add unprocessed nested objects after validation. - // TODO ==== - - // To process validation, we collect all subject changes in one of the buckets. - // Subjects for which we did not apply triples. - let un_processed: HashSet = HashSet::new(); - // Subjects that are invalid. No further processing needed. - let invalids: HashSet<&OrmTrackedSubjectChange> = HashSet::new(); - // Subjects that are valid. Fetch might still be required if newly valid. - let valids: HashSet<&OrmTrackedSubjectChange> = HashSet::new(); - // Will need re-evaluation - let unknowns: HashSet<&OrmTrackedSubjectChange> = HashSet::new(); - // Either because it became tracked again or because it's newly valid. - let needs_fetch: HashSet<&OrmTrackedSubjectChange> = HashSet::new(); - - while !unknowns.is_empty() || !needs_fetch.is_empty() { - // Process buckets by priority and nesting - // First unknown, then needs fetch (the latter could still become invalid). - // Start from from the end because nested objects will less likely need further nested eval. - - // Check validity for each modified subject. - for sc in un_processed { - let tracked_subject = tracked_subjects - .get(sc.tracked_subject.subj_iri) - .unwrap() - .get(shape) - .unwrap(); - let (is_valid, new_unknowns) = - check_subject_validity(s_change, &shape, schema, tracked_subject.valid); + // We add the new_unknowns to be processed next + for (iri, schema) in new_unknowns { + // Add to nested_objects_to_validate. + nested_objects_to_validate + .entry(schema.iri.clone()) + .or_insert_with(|| vec![]) + .push(iri.clone()); + } } - if !unknowns.is_empty() { - continue; + + // Now, we add all objects that need re-evaluation to the queue. + for (shape_iri, objects) in nested_objects_to_validate { + shape_validation_queue.push((schema.get(&shape_iri).unwrap(), objects)); } - for sc in needs_fetch { - // TODO: fetch and evaluate. + } + + return shape_and_subject_changes; + } + + fn create_orm_from_triples( + &mut self, + scope: &NuriV0, + schema: &OrmSchema, + shape: &OrmSchemaShape, + triples: &Vec, + ) -> Result { + let changes = self.apply_changes_from_triples(scope, schema, shape, triples, vec![]); + + let root_changes = changes.get(shape.iri).unwrap().values(); + let valid_roots = root_changes.filter(|v| v.valid == OrmTrackedSubjectValidity::Valid); + + let mut return_vals: Value = Value::Array(vec![]); + for root_change in root_changes { + let new_val = json!({"id": root_change.subject_iri}); + for (pred_iri, pred_change) in root_change.predicates { + // Add the readable predicate name + let property_name = pred_change.tracked_predicate.schema.readablePredicate; + + // For basic values, add value. + // For arrays of basic values, add array. + // For single nested object, add object. + // For multiple nested objects, create object with iri keys. } } - // === + + return Ok(return_vals); + // } // Collect result @@ -698,6 +728,8 @@ impl Verifier { } } +/// Heuristic: +/// Consider a string an IRI if it contains alphanumeric characters and then a colon within the first 13 characters fn is_iri(s: &str) -> bool { lazy_static! { static ref IRI_REGEX: Regex = Regex::new(r"^[A-Za-z][A-Za-z0-9+\.\-]{1,12}:").unwrap(); @@ -979,3 +1011,17 @@ fn oxrdf_term_to_orm_basic_type(term: &ng_oxigraph::oxrdf::Term) -> BasicType { ng_net::orm::Term::Ref(b) => BasicType::Str(b), // Treat IRIs as strings } } + +fn has_cycle(subject: &OrmTrackedSubjectAndShape, visited: &mut HashSet) -> bool { + if visited.contains(subject.subj_iri) { + return true; + } + visited.insert(subject.subj_iri.clone()); + for (_parent_iri, (parent_subject, _)) in &subject.parents { + if has_cycle(parent_subject, visited) { + return true; + } + } + visited.remove(subject.subj_iri); + false +}