diff --git a/ng-net/src/orm.rs b/ng-net/src/orm.rs index 561a2f9..e346e34 100644 --- a/ng-net/src/orm.rs +++ b/ng-net/src/orm.rs @@ -116,7 +116,7 @@ pub struct OrmTrackedSubjectAndShape<'a> { pub parents: HashMap, bool)>, /// Validity. When untracked, triple updates are not processed here. pub valid: OrmTrackedSubjectValidity, - pub subject_iri: &'a String, + pub subject_iri: String, /// The shape for which the predicates are tracked. pub shape: &'a OrmSchemaShape, } @@ -134,7 +134,6 @@ pub enum OrmTrackedSubjectValidity { pub struct OrmTrackedPredicate<'a> { /// The predicate schema pub schema: &'a OrmSchemaPredicate, - /// TODO: This is not correctly implemented. /// If the schema is a nested object, the children. pub tracked_children: Vec>>, /// The count of triples for this subject and predicate. @@ -149,7 +148,7 @@ pub struct OrmTrackedSubjectChange<'a> { pub subject_iri: String, /// Predicates that were changed. pub predicates: HashMap>, - /// During validation, the current state of validity. + /// During validation, the current state of validity (can be subject to change). pub valid: OrmTrackedSubjectValidity, } pub struct OrmTrackedPredicateChanges<'a> { diff --git a/ng-verifier/src/orm.rs b/ng-verifier/src/orm.rs index 000e68a..a63a2ee 100644 --- a/ng-verifier/src/orm.rs +++ b/ng-verifier/src/orm.rs @@ -7,14 +7,11 @@ // notice may not be copied, modified, or distributed except // according to those terms. +use futures::channel::mpsc; use std::collections::HashMap; use std::collections::HashSet; -use std::hash::Hash; use std::rc::Weak; -use async_std::task::current; -use futures::channel::mpsc; - use futures::SinkExt; use lazy_static::lazy_static; use ng_net::orm::BasicType; @@ -29,21 +26,15 @@ use ng_net::orm::OrmTrackedSubjectAndShape; use ng_net::orm::OrmTrackedSubjectChange; use ng_net::orm::OrmTrackedSubjectValidity; use ng_net::orm::{OrmSchemaDataType, OrmSchemaShape}; +use ng_net::utils::Receiver; use ng_net::{app_protocol::*, orm::OrmSchema}; -use ng_net::{ - types::*, - utils::{Receiver, Sender}, -}; -use ng_oxigraph::oxigraph::sparql::{results::*, Query, QueryResults}; -use ng_oxigraph::oxrdf::LiteralRef; -use ng_oxigraph::oxrdf::NamedNode; +use ng_oxigraph::oxigraph::sparql::{Query, QueryResults}; use ng_oxigraph::oxrdf::Subject; use ng_oxigraph::oxrdf::Triple; use ng_repo::errors::NgError; use ng_repo::errors::VerifierError; use ng_repo::log::*; use regex::Regex; -use serde::de::IntoDeserializer; use serde_json::json; use serde_json::Value; @@ -148,52 +139,58 @@ impl Verifier { tracked_subjects: &HashMap>, subject_changes: &OrmTrackedSubjectChange, ) { - let tracked_shapes_for_subject = tracked_subjects - .entry(subject_iri.clone()) - .or_insert_with(|| HashMap::new()); - - let tracked_subject = tracked_shapes_for_subject - .entry(subject_iri.clone()) - .or_insert_with(|| OrmTrackedSubjectAndShape { - tracked_predicates: HashMap::new(), - parents: HashMap::new(), - valid: ng_net::orm::OrmTrackedSubjectValidity::NotEvaluated, - subject_iri, - shape, - }); + let get_or_create_tracked_subject = |subject_iri: &String, shape_iri: &String| { + let tracked_shapes_for_subject = tracked_subjects + .entry(subject_iri.clone()) + .or_insert_with(|| HashMap::new()); + + let tracked_subject = tracked_shapes_for_subject + .entry(shape_iri.clone()) + .or_insert_with(|| OrmTrackedSubjectAndShape { + tracked_predicates: HashMap::new(), + parents: HashMap::new(), + valid: ng_net::orm::OrmTrackedSubjectValidity::NotEvaluated, + subject_iri: subject_iri.clone(), + shape, + }); + + tracked_subject + }; + + let tracked_subject = get_or_create_tracked_subject(subject_iri, &shape.iri); // Process added triples. // For each triple, check matching predicates in shape. // keeping track of value count (for later validations). // In parallel, we keep track of the values added (tracked_changes) for triple in triples_added { - for schema_predicate in &shape.predicates { - if schema_predicate.iri != triple.predicate.as_str() { + for predicate_schema in &shape.predicates { + if predicate_schema.iri != triple.predicate.as_str() { // Triple does not match predicate. continue; } // Predicate schema constraint matches this triple. // Add tracked predicate or increase cardinality - let tp = tracked_subject + let tracked_predicate = tracked_subject .tracked_predicates - .entry(schema_predicate.iri.to_string()) + .entry(predicate_schema.iri.to_string()) .or_insert_with(|| OrmTrackedPredicate { current_cardinality: 0, - schema: schema_predicate, + schema: predicate_schema, tracked_children: Vec::new(), current_literals: None, }); - tp.current_cardinality += 1; + tracked_predicate.current_cardinality += 1; let obj_term = oxrdf_term_to_orm_basic_type(&triple.object); // Keep track of the changed values too. let pred_changes = subject_changes .predicates - .entry(schema_predicate.iri.clone()) + .entry(predicate_schema.iri.clone()) .or_insert_with(|| OrmTrackedPredicateChanges { - tracked_predicate: &tp, + tracked_predicate: &tracked_predicate, values_added: Vec::new(), values_removed: Vec::new(), }); @@ -201,16 +198,41 @@ impl Verifier { pred_changes.values_added.push(obj_term.clone()); // If value type is literal, we need to add the current value to the tracked predicate. - if tp + if tracked_predicate .schema .dataTypes .iter() .any(|dt| dt.valType == OrmSchemaLiteralType::literal) { - if let Some(current_literals) = tp.current_literals { + if let Some(current_literals) = tracked_predicate.current_literals { current_literals.push(obj_term); } else { - tp.current_literals.insert(vec![obj_term]); + tracked_predicate.current_literals.insert(vec![obj_term]); + } + } + + // If predicate is of type shape, register (parent -> child) links so that + // nested subjects can later be (lazily) fetched / validated. + for shape_iri in predicate_schema + .dataTypes + .iter() + .filter(|dt| dt.valType == OrmSchemaLiteralType::shape) + .flat_map(|dt| dt.shape) + { + if let BasicType::Str(obj_iri) = obj_term { + // Get or create object's tracked subject struct. + let tracked_child = get_or_create_tracked_subject( + triple.predicate.as_string(), + &shape_iri, + ); + + // Add self to parent (set tracked to true, preliminary). + tracked_child.parents.insert(obj_iri, (tracked_child, true)); + + // Add link to children + tracked_predicate + .tracked_children + .push(unsafe { Weak::from_raw(tracked_child) }); } } } @@ -221,7 +243,7 @@ impl Verifier { let pred_iri = triple.predicate.as_str(); // Only adjust if we had tracked state. - let Some(tp) = tracked_subjects + let Some(tracked_predicate) = tracked_subjects .get_mut(subject_iri) .map(|tss| tss.get(&shape.iri)) .flatten() @@ -232,7 +254,7 @@ impl Verifier { }; // The cardinality might become -1 or 0. We will remove them from the tracked predicates during validation. - tp.current_cardinality -= 1; + tracked_predicate.current_cardinality -= 1; let Some(pred_changes) = subject_changes.predicates.get(pred_iri) else { continue; @@ -242,17 +264,47 @@ impl Verifier { pred_changes.values_removed.push(val_removed.clone()); // If value type is literal, we need to remove the current value from the tracked predicate. - if tp + if tracked_predicate .schema .dataTypes .iter() .any(|dt| dt.valType == OrmSchemaLiteralType::literal) { - if let Some(current_literals) = &mut tp.current_literals { + if let Some(current_literals) = &mut tracked_predicate.current_literals { // Remove obj_val from current_literals in-place current_literals.retain(|val| *val != val_removed); } else { - tp.current_literals = Some(vec![val_removed]); + tracked_predicate.current_literals = Some(vec![val_removed]); + } + } else if tracked_predicate + .schema + .dataTypes + .iter() + .any(|dt| dt.valType == OrmSchemaLiteralType::shape) + { + // Remove parent from child and child from tracked children. + for shape_iri in tracked_predicate + .schema + .dataTypes + .iter() + .filter(|dt| dt.valType == OrmSchemaLiteralType::shape) + .flat_map(|dt| dt.shape) + { + if let BasicType::Str(object_iri) = val_removed { + // Get child. + let nested_tracked_subject = get_or_create_tracked_subject( + triple.predicate.as_string(), + &shape_iri, + ); + + // Add self to parent (set tracked to true, preliminary). + nested_tracked_subject.parents.remove(&object_iri); + + // Remove from tracked_children + tracked_predicate.tracked_children.retain_mut(|el| { + el.upgrade().map(|el| el.subject_iri) == Some(object_iri) + }); + } } } } @@ -320,7 +372,7 @@ impl Verifier { } // Keep track of objects that need to be validated against a shape to fetch and validate. - let mut new_unknowns: Vec<(&String, &OrmSchemaShape, bool)> = vec![]; + let mut need_evaluation: Vec<(&String, &OrmSchemaShape, bool)> = vec![]; // Check 4) Validate subject against each predicate in shape. for p_schema in shape.predicates.iter() { @@ -433,7 +485,7 @@ impl Verifier { { if let Some(tc) = o.upgrade() { if tc.valid == OrmTrackedSubjectValidity::Untracked { - new_unknowns.push((tc.subject_iri, tc.shape, true)); + need_evaluation.push((&tc.subject_iri, tc.shape, true)); } } } @@ -475,25 +527,57 @@ impl Verifier { }; } - // If we are invalid, we can discard new unknowns again - they won't be kept in memory. - // We need to inform all children (by returning them for later evaluation), to untrack them. - // TODO: Collect info about all children to untrack them. if new_validity == OrmTrackedSubjectValidity::Invalid { - return (OrmTrackedSubjectValidity::Invalid, vec![]); + // If we are invalid, we can discard new unknowns again - they won't be kept in memory. + // We need to remove ourself from child objects parents field and + // remove them if no other is tracking. + for child_subject_weak in tracked_subject + .tracked_predicates + .iter() + .flat_map(|(tp_iri, tp)| tp.tracked_children) + { + if let Some(child_subject) = child_subject_weak.upgrade() { + child_subject.parents.remove(&s_change.subject_iri); + }; + } + + // Remove tracked predicates and set untracked. + tracked_subject.tracked_predicates = HashMap::new(); + tracked_subject.valid = OrmTrackedSubjectValidity::Invalid; + s_change.valid = OrmTrackedSubjectValidity::Invalid; + + // Empty list of children that need evaluation. + need_evaluation.retain(|_| false); } else if new_validity == OrmTrackedSubjectValidity::Valid && previous_validity != OrmTrackedSubjectValidity::Valid { - // TODO - // If this subject became valid, we need to refetch this subject. + // If this subject became valid, we need to refetch this subject; + // For that we prepend self to needs_fetch. + s_change.valid = OrmTrackedSubjectValidity::Valid; + tracked_subject.valid = OrmTrackedSubjectValidity::Valid; + need_evaluation.insert(0, (&s_change.subject_iri, shape, true)); } - // TODO... + // If validity changed, parents need to be re-evaluated. if new_validity != previous_validity { - // TODO + // We return the tracking parents which need re-evaluation. + // Remember that the last elements (i.e. children or needs_fetch) are evaluated first. + return ( + new_validity, + // Add parents and objects in `need_evaluation`. + tracked_subject + .parents + .values() + // Inform tracking parents only. + .filter(|(parent, is_tracked)| *is_tracked) + .map(|(parent, is_tracked)| (&parent.subject_iri, parent.shape, false)) + // Add `need_evaluation`. + .chain(need_evaluation) + .collect(), + ); } - // TODO - return (new_validity, new_unknowns); + return (new_validity, need_evaluation); } // === Validation === @@ -574,7 +658,7 @@ impl Verifier { // and return logic to add unprocessed nested objects after validation. // We add the new_unknowns to be processed next - for (iri, schema) in new_unknowns { + for (iri, schema, needs_fetch) in new_unknowns { // Add to nested_objects_to_validate. nested_objects_to_validate .entry(schema.iri.clone()) @@ -601,7 +685,10 @@ impl Verifier { ) -> Result { let changes = self.apply_changes_from_triples(scope, schema, shape, triples, &vec![]); - let root_changes = changes.get(&shape.iri).unwrap().values(); + let Some(root_changes) = changes.get(&shape.iri).map(|s| s.values()) else { + return Ok(Value::Array(vec![])); + }; + let valid_roots = root_changes.filter(|v| v.valid == OrmTrackedSubjectValidity::Valid); let mut return_vals: Value = Value::Array(vec![]); @@ -612,28 +699,35 @@ impl Verifier { changes: &HashMap>>, shape: &OrmSchemaShape, schema: &OrmSchema, + tracked_subjects: &HashMap>, ) -> Value { let mut new_val = json!({"id": change.subject_iri}); let new_val_map = new_val.as_object_mut().unwrap(); for pred_schema in &shape.predicates { + let Some(pred_change) = change.predicates.get(&pred_schema.iri) else { + continue; + }; let property_name = pred_schema.readablePredicate.clone(); - let is_multi = pred_schema.maxCardinality > 1; - let pred_change = change.predicates.get(&pred_schema.iri).unwrap(); + let is_multi = pred_schema.maxCardinality > 1 || pred_schema.maxCardinality == -1; if pred_schema .dataTypes .iter() .any(|dt| dt.valType == OrmSchemaLiteralType::shape) { - // Helper to create nested objects. + // We have a nested type. + + // Helper to create Value structs from a nested object_iri. let get_nested_value = |object_iri: &String| { + // Find allowed schemas for the predicate's datatype. let shape_iris: Vec = pred_schema .dataTypes .iter() .flat_map(|dt| dt.shape.clone()) .collect(); - // Find subject_change for this subject. There exists at least one (shape, subject). + // Find subject_change for this subject. There exists at least one (shape, subject) pair. + // If multiple exist, we take the first one. _Which one is chosen is undefined behavior_ let nested_subject_change = shape_iris .iter() .find_map(|shape_iri| { @@ -643,32 +737,43 @@ impl Verifier { }) .unwrap(); - // Recurse - create_value_from_change( - nested_subject_change, - changes, - schema.get(&nested_subject_change.subject_iri).unwrap(), - schema, - ) + if let Some(nested_schema) = tracked_subjects + .get(&nested_subject_change.subject_iri) + .and_then(|tracked_subjects| tracked_subjects.get(&shape.iri)) + { + // Recurse + return create_value_from_change( + nested_subject_change, + changes, + nested_schema.shape, + schema, + tracked_subjects, + ); + } + return json!({}); }; if is_multi { - // Add each value to a new object (predicate being object IRIs). + // Add each value to a new object (predicates being object IRIs) like {iri: }. let mut nested_objects = json!({"id": change.subject_iri}); let nested_objects_map = nested_objects.as_object_mut().unwrap(); + // Add (object iri, nested object) pairs to object. for new_val in &pred_change.values_added { if let BasicType::Str(object_iri) = new_val { new_val_map .insert(object_iri.clone(), get_nested_value(&object_iri)); } } + new_val_map.insert(property_name.clone(), nested_objects); } else { if let Some(BasicType::Str(object_iri)) = pred_change.values_added.get(0) { new_val_map.insert(property_name.clone(), get_nested_value(object_iri)); } } } else { + // We have a basic type (string, number, bool, literal). + if is_multi { // Add values as array. new_val_map.insert( @@ -692,16 +797,23 @@ impl Verifier { } } } + return new_val; } + // For each valid change struct, we build an orm object. for root_change in valid_roots { - let new_val = create_value_from_change(root_change, &changes, shape, schema); + let new_val = create_value_from_change( + root_change, + &changes, + shape, + schema, + self.orm_tracked_subjects, + ); return_val_vec.push(new_val); } return Ok(return_vals); - // } // Collect result