From 39152ad1ce9719f9403fbda6e37e7e745f88596f Mon Sep 17 00:00:00 2001 From: Laurin Weger Date: Sun, 5 Oct 2025 16:55:12 +0200 Subject: [PATCH] restructure code --- ng-verifier/src/commits/transaction.rs | 26 +- ng-verifier/src/lib.rs | 3 +- ng-verifier/src/orm/mod.rs | 13 + ng-verifier/src/{ => orm}/orm.rs | 160 ++--- ng-verifier/src/orm/orm_add_remove_triples.rs | 293 +++++++++ ng-verifier/src/orm/orm_validation.rs | 327 ++++++++++ .../src/{utils/mod.rs => orm/types.rs} | 2 - ng-verifier/src/utils/orm_validation.rs | 600 ------------------ ng-verifier/src/verifier.rs | 2 +- 9 files changed, 740 insertions(+), 686 deletions(-) create mode 100644 ng-verifier/src/orm/mod.rs rename ng-verifier/src/{ => orm}/orm.rs (87%) create mode 100644 ng-verifier/src/orm/orm_add_remove_triples.rs create mode 100644 ng-verifier/src/orm/orm_validation.rs rename ng-verifier/src/{utils/mod.rs => orm/types.rs} (94%) delete mode 100644 ng-verifier/src/utils/orm_validation.rs diff --git a/ng-verifier/src/commits/transaction.rs b/ng-verifier/src/commits/transaction.rs index adcbff7..bab25f3 100644 --- a/ng-verifier/src/commits/transaction.rs +++ b/ng-verifier/src/commits/transaction.rs @@ -248,14 +248,16 @@ impl Verifier { Ok(()) } - pub(crate) fn get_triples_from_transaction(commit_body: &CommitBody) -> Result, VerifierError> { + pub(crate) fn get_triples_from_transaction( + commit_body: &CommitBody, + ) -> Result, VerifierError> { match commit_body { CommitBody::V0(CommitBodyV0::AsyncTransaction(Transaction::V0(v0))) => { let transac: TransactionBody = serde_bare::from_slice(v0)?; if let Some(graph_transac) = transac.graph { return Ok(graph_transac.inserts); } - }, + } _ => {} } Err(VerifierError::InvalidCommit) @@ -705,7 +707,9 @@ impl Verifier { let mut tab_doc_info = AppTabDocInfo::new(); for removed in update.transaction.removes { match removed.predicate.as_str() { - NG_ONTOLOGY_ABOUT => tab_doc_info.description = Some("".to_string()), + NG_ONTOLOGY_ABOUT => { + tab_doc_info.description = Some("".to_string()) + } NG_ONTOLOGY_TITLE => tab_doc_info.title = Some("".to_string()), _ => {} } @@ -748,16 +752,18 @@ impl Verifier { })), ) .await; - let graph_nuri = NuriV0::repo_graph_name( - &update.repo_id, - &update.overlay_id, - ); - self.orm_update(&NuriV0::new_empty(), update.transaction.as_quads_patch(graph_nuri)).await; + let graph_nuri = + NuriV0::repo_graph_name(&update.repo_id, &update.overlay_id); + self.orm_update( + &NuriV0::new_empty(), + update.transaction.as_quads_patch(graph_nuri), + ) + .await; } } Ok(commit_nuris) - }, - Err(e) => Err(e) + } + Err(e) => Err(e), } } diff --git a/ng-verifier/src/lib.rs b/ng-verifier/src/lib.rs index f2120d4..65587dc 100644 --- a/ng-verifier/src/lib.rs +++ b/ng-verifier/src/lib.rs @@ -18,7 +18,7 @@ mod user_storage; mod commits; -pub mod orm; +pub(crate) mod orm; mod request_processor; @@ -26,7 +26,6 @@ mod inbox_processor; #[cfg(all(not(target_family = "wasm"), not(docsrs)))] mod rocksdb_user_storage; -pub(crate) mod utils; use ng_net::app_protocol::*; use ng_oxigraph::oxrdf::Triple; diff --git a/ng-verifier/src/orm/mod.rs b/ng-verifier/src/orm/mod.rs new file mode 100644 index 0000000..e38dd1b --- /dev/null +++ b/ng-verifier/src/orm/mod.rs @@ -0,0 +1,13 @@ +// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers +// All rights reserved. +// Licensed under the Apache License, Version 2.0 +// +// or the MIT license , +// at your option. All files in the project carrying such +// notice may not be copied, modified, or distributed except +// according to those terms. + +pub mod orm; +pub mod orm_add_remove_triples; +pub mod orm_validation; +pub mod types; diff --git a/ng-verifier/src/orm.rs b/ng-verifier/src/orm/orm.rs similarity index 87% rename from ng-verifier/src/orm.rs rename to ng-verifier/src/orm/orm.rs index 68dd074..90bbdaa 100644 --- a/ng-verifier/src/orm.rs +++ b/ng-verifier/src/orm/orm.rs @@ -8,7 +8,7 @@ // according to those terms. use futures::channel::mpsc; -use ng_net::actors::app::session; +use ng_oxigraph::oxrdf::Subject; use std::collections::HashMap; use std::collections::HashSet; @@ -29,8 +29,8 @@ use regex::Regex; use serde_json::json; use serde_json::Value; +use crate::orm::orm_add_remove_triples::add_remove_triples; use crate::types::*; -use crate::utils::orm_validation::*; use crate::verifier::*; type ShapeIri = String; @@ -129,9 +129,6 @@ impl Verifier { orm_changes: &mut OrmChanges, ) -> Result<(), NgError> { let nuri_repo = nuri.repo(); - //let orm_subscription = self.orm_subscriptions.get(nuri).unwrap(); - //let tracked_subjects = orm_subscription.tracked_subjects; - //let schema = &orm_subscription.shape_type.schema; // First in, last out stack to keep track of objects to validate (nested objects first). Strings are object IRIs. let mut shape_validation_queue: Vec<(Arc, Vec)> = vec![]; @@ -180,23 +177,25 @@ impl Verifier { // Apply all triples for that subject to the tracked (shape, subject) pair. // Record the changes. { - let orm_subscription = self + let mut orm_subscription = self .orm_subscriptions .get_mut(nuri) .unwrap() - .iter() + .iter_mut() .find(|s| s.session_id == session_id && s.shape_type.shape == shape.iri) - .unwrap(); + .unwrap() + .clone(); - if let Err(e) = add_remove_triples_mut( + if let Err(e) = add_remove_triples( shape.clone(), subject_iri, triples_added_for_subj, triples_removed_for_subj, - &mut orm_subscription.tracked_subjects, + &mut orm_subscription, change, ) { log_err!("apply_changes_from_triples add/remove error: {:?}", e); + panic!(); } let validity = { @@ -211,10 +210,10 @@ impl Verifier { }; // Validate the subject. - let need_eval = update_subject_validity( + let need_eval = Self::update_subject_validity( change, &shape, - &mut orm_subscription.tracked_subjects, + &mut orm_subscription, validity, ); @@ -237,16 +236,13 @@ impl Verifier { .map(|(s, _)| s.clone()) .collect(); - let orm_subscription = self - .get_orm_subscriptions_for(nuri, Some(&shape.iri), Some(&session_id)) - .get(0) - .unwrap(); + let orm_subscriptions_vec = + self.get_orm_subscriptions_for(nuri, Some(&shape.iri), Some(&session_id)); + let orm_subscription = orm_subscriptions_vec.get(0).unwrap(); - let schema = &orm_subscription.shape_type.schema; - - let shape: &Arc = schema - .get(shape_iri) - .ok_or(VerifierError::InvalidOrmSchema)?; + // Extract schema and shape Arc before mutable borrow + let schema = orm_subscription.shape_type.schema.clone(); + let shape_arc = schema.get(shape_iri).unwrap().clone(); // Create sparql query let shape_query = @@ -256,24 +252,19 @@ impl Verifier { self.process_changes_for_shape_and_session( nuri, - shape.clone(), + shape_arc.clone(), session_id, &new_triples, &vec![], orm_changes, )?; - // @Niko, if I put this in the same loop, I get borrow conflicts let objects_not_to_fetch = objects_to_eval .iter() .filter(|(_iri, needs_fetch)| !*needs_fetch) .map(|(s, _)| s.clone()) .collect(); - let shape = schema - .get(shape_iri) - .ok_or(VerifierError::InvalidOrmSchema)?; - - shape_validation_queue.push((shape.clone(), objects_not_to_fetch)); + shape_validation_queue.push((shape_arc, objects_not_to_fetch)); } } @@ -281,12 +272,12 @@ impl Verifier { } /// Helper to get orm subscriptions for nuri, shapes and sessions. - fn get_orm_subscriptions_for( + pub fn get_orm_subscriptions_for( &self, nuri: &NuriV0, shape: Option<&ShapeIri>, session_id: Option<&u64>, - ) -> Vec<&OrmSubscription> { + ) -> Vec<&Arc> { self.orm_subscriptions.get(nuri).unwrap(). // Filter shapes, if present. iter().filter(|s| match shape { @@ -484,15 +475,12 @@ impl Verifier { let changes: OrmChanges = self.apply_triple_changes(&shape_triples, &[], nuri, Some(session_id.clone()))?; - let orm_subscription = *self - .get_orm_subscriptions_for(nuri, Some(&shape_type.shape), Some(&session_id)) - .get(0) - .unwrap(); + let orm_subscriptions_vec = + self.get_orm_subscriptions_for(nuri, Some(&shape_type.shape), Some(&session_id)); + let orm_subscription = orm_subscriptions_vec.get(0).unwrap(); let schema = &orm_subscription.shape_type.schema; - let root_shape = schema - .get(&shape_type.shape) - .ok_or(VerifierError::InvalidOrmSchema)?; + let root_shape = schema.get(&shape_type.shape).unwrap(); let Some(_root_changes) = changes.get(&root_shape.iri).map(|s| s.values()) else { return Ok(Value::Array(vec![])); }; @@ -524,7 +512,7 @@ impl Verifier { return Ok(return_vals); } - pub(crate) async fn orm_update(&mut self, scope: &NuriV0, patch: GraphTransaction) {} + pub(crate) async fn orm_update(&mut self, scope: &NuriV0, patch: GraphQuadsPatch) {} pub(crate) async fn orm_frontend_update( &mut self, @@ -537,7 +525,7 @@ impl Verifier { pub(crate) async fn push_orm_response( &mut self, - subscription: &OrmSubscription, + subscription: &Arc, response: AppResponse, ) { log_debug!( @@ -577,19 +565,19 @@ impl Verifier { // All referenced shapes must be available. // Create new subscription and add to self.orm_subscriptions - let orm_subscription = OrmSubscription { + let orm_subscription = Arc::new(OrmSubscription { shape_type: shape_type.clone(), session_id: session_id, sender: tx.clone(), tracked_subjects: HashMap::new(), nuri: nuri.clone(), - }; + }); self.orm_subscriptions .entry(nuri.clone()) .or_insert(vec![]) .push(orm_subscription); - let orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type); + let _orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type); // TODO integrate response @@ -690,35 +678,34 @@ pub fn shape_type_to_sparql( // (already in SPARQL-format, e.g. `"a astring"`, ``, `true`, or `42`). allowed_literals.extend(literal_to_sparql_str(datatype.clone())); } else if datatype.valType == OrmSchemaLiteralType::shape { - if let Some(shape_id) = &datatype.shape { - if let Some(nested_shape) = schema.get(shape_id) { - // For the current acceptable shape, add CONSTRUCT, WHERE, and recurse. - - // Each shape option gets its own var. - let obj_var_name = get_new_var_name(var_counter); - - construct_statements.push(format!( - " ?{} <{}> ?{}", - subject_var_name, predicate.iri, obj_var_name - )); - // Those are later added to a UNION, if there is more than one shape. - union_branches.push(format!( - " ?{} <{}> ?{}", - subject_var_name, predicate.iri, obj_var_name - )); - - // Recurse to add statements for nested object. - process_shape( - schema, - nested_shape, - &obj_var_name, - construct_statements, - where_statements, - var_counter, - visited_shapes, - ); - } - } + let shape_iri = &datatype.shape.clone().unwrap(); + let nested_shape = schema.get(shape_iri).unwrap(); + + // For the current acceptable shape, add CONSTRUCT, WHERE, and recurse. + + // Each shape option gets its own var. + let obj_var_name = get_new_var_name(var_counter); + + construct_statements.push(format!( + " ?{} <{}> ?{}", + subject_var_name, predicate.iri, obj_var_name + )); + // Those are later added to a UNION, if there is more than one shape. + union_branches.push(format!( + " ?{} <{}> ?{}", + subject_var_name, predicate.iri, obj_var_name + )); + + // Recurse to add statements for nested object. + process_shape( + schema, + nested_shape, + &obj_var_name, + construct_statements, + where_statements, + var_counter, + visited_shapes, + ); } } @@ -829,3 +816,34 @@ fn escape_literal(lit: &str) -> String { } return out; } + +pub fn group_by_subject_for_shape<'a>( + shape: &OrmSchemaShape, + triples: &'a [Triple], + allowed_subjects: &[String], +) -> HashMap> { + let mut triples_by_subject: HashMap> = HashMap::new(); + let allowed_preds_set: HashSet<&str> = + shape.predicates.iter().map(|p| p.iri.as_str()).collect(); + let allowed_subject_set: HashSet<&str> = allowed_subjects.iter().map(|s| s.as_str()).collect(); + for triple in triples { + // triple.subject must be in allowed_subjects (or allowed_subjects empty) + // and triple.predicate must be in allowed_preds. + if allowed_preds_set.contains(triple.predicate.as_str()) { + // filter subjects if list provided + let subj = match &triple.subject { + Subject::NamedNode(n) => n.as_ref(), + _ => continue, + }; + // Subject must be in allowed subjects (or allowed_subjects is empty). + if allowed_subject_set.is_empty() || allowed_subject_set.contains(subj.as_str()) { + triples_by_subject + .entry(subj.to_string()) + .or_insert_with(Vec::new) + .push(triple); + } + } + } + + return triples_by_subject; +} diff --git a/ng-verifier/src/orm/orm_add_remove_triples.rs b/ng-verifier/src/orm/orm_add_remove_triples.rs new file mode 100644 index 0000000..e1576d7 --- /dev/null +++ b/ng-verifier/src/orm/orm_add_remove_triples.rs @@ -0,0 +1,293 @@ +// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers +// All rights reserved. +// Licensed under the Apache License, Version 2.0 +// +// or the MIT license , +// at your option. All files in the project carrying such +// notice may not be copied, modified, or distributed except +// according to those terms. + +use ng_oxigraph::oxrdf::Triple; +use ng_repo::errors::NgError; +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::Weak; + +use ng_net::orm::*; + +/// Add all triples to `subject_changes` +/// Returns predicates to nested objects that were touched and need processing. +/// Assumes all triples have same subject. +pub fn add_remove_triples( + shape: Arc, + subject_iri: &str, + triples_added: &[&Triple], + triples_removed: &[&Triple], + orm_subscription: &mut Arc, + subject_changes: &mut OrmTrackedSubjectChange, +) -> Result<(), NgError> { + // Helper to get/create tracked subjects + fn get_or_create_tracked_subject<'a>( + subject_iri: &str, + shape: &Arc, + tracked_subjects: &'a mut HashMap>>, + ) -> (&'a mut OrmTrackedSubject, Weak) { + let tracked_shapes_for_subject = tracked_subjects + .entry(subject_iri.to_string()) + .or_insert_with(HashMap::new); + + let subject = tracked_shapes_for_subject + .entry(shape.iri.clone()) + .or_insert_with(|| { + Arc::new(OrmTrackedSubject { + tracked_predicates: HashMap::new(), + parents: HashMap::new(), + valid: ng_net::orm::OrmTrackedSubjectValidity::Pending, + subject_iri: subject_iri.to_string(), + shape: shape.clone(), + }) + }); + let weak = Arc::downgrade(&subject); + (Arc::get_mut(subject).unwrap(), weak) + } + + // Destructure to get separate references and avoid borrowing conflicts + let orm_sub = Arc::get_mut(orm_subscription).unwrap(); + let schema = &orm_sub.shape_type.schema; + let tracked_subjects = &mut orm_sub.tracked_subjects; + + let (_, tracked_subject_weak) = + get_or_create_tracked_subject(subject_iri, &shape, tracked_subjects); + + // Process added triples. + // For each triple, check if it matches the shape. + // In parallel, we record the values added and removed (tracked_changes) + for triple in triples_added { + for predicate_schema in &shape.predicates { + if predicate_schema.iri != triple.predicate.as_str() { + // Triple does not match predicate. + continue; + } + // Predicate schema constraint matches this triple. + + let mut tracked_subject_upgraded = tracked_subject_weak.upgrade().unwrap(); + let tracked_subject = Arc::get_mut(&mut tracked_subject_upgraded).unwrap(); + // Add tracked predicate or increase cardinality + let tracked_predicate_ = tracked_subject + .tracked_predicates + .entry(predicate_schema.iri.to_string()) + .or_insert_with(|| { + Arc::new(OrmTrackedPredicate { + current_cardinality: 0, + schema: predicate_schema.clone(), + tracked_children: Vec::new(), + current_literals: None, + }) + }); + let tracked_predicate_weak = Arc::downgrade(&tracked_predicate_); + let tracked_predicate = Arc::get_mut(tracked_predicate_).unwrap(); + tracked_predicate.current_cardinality += 1; + + let obj_term = oxrdf_term_to_orm_basic_type(&triple.object); + + // Keep track of the changed values too. + let pred_changes: &mut OrmTrackedPredicateChanges = subject_changes + .predicates + .entry(predicate_schema.iri.clone()) + .or_insert_with(|| OrmTrackedPredicateChanges { + tracked_predicate: tracked_predicate_weak.clone(), // reference remains inside lifetime of this call + values_added: Vec::new(), + values_removed: Vec::new(), + }); + + pred_changes.values_added.push(obj_term.clone()); + + // If value type is literal, we need to add the current value to the tracked predicate. + if tracked_predicate + .schema + .dataTypes + .iter() + .any(|dt| dt.valType == OrmSchemaLiteralType::literal) + { + match &mut tracked_predicate.current_literals { + Some(lits) => lits.push(obj_term.clone()), + None => { + tracked_predicate.current_literals = Some(vec![obj_term.clone()]); + } + } + } + + // If predicate is of type shape, register (parent -> child) links so that + // nested subjects can later be (lazily) fetched / validated. + for shape_iri in predicate_schema.dataTypes.iter().filter_map(|dt| { + if dt.valType == OrmSchemaLiteralType::shape { + dt.shape.clone() + } else { + None + } + }) { + if let BasicType::Str(obj_iri) = &obj_term { + // Get or create object's tracked subject struct. + let child_shape = schema.get(&shape_iri).unwrap(); + + // If this actually created a new tracked subject, that's fine and will be removed during validation. + let (tracked_child, tracked_child_weak) = + get_or_create_tracked_subject(obj_iri, child_shape, tracked_subjects); + + // Add self to parent. + tracked_child + .parents + .insert(obj_iri.clone(), tracked_child_weak.clone()); + + // Add link to children + let mut upgraded = tracked_predicate_weak.upgrade().unwrap(); + let tracked_predicate = Arc::get_mut(&mut upgraded).unwrap(); + tracked_predicate.tracked_children.push(tracked_child_weak); + } + } + } + } + // Process removed triples. + for triple in triples_removed { + let pred_iri = triple.predicate.as_str(); + + // Only adjust if we had tracked state. + let tracked_predicate_opt = tracked_subjects + .get_mut(subject_iri) + .and_then(|tss| tss.get_mut(&shape.iri)) + .and_then(|ts| { + Arc::get_mut(ts) + .unwrap() + .tracked_predicates + .get_mut(pred_iri) + }); + let Some(tracked_predicate_arc) = tracked_predicate_opt else { + continue; + }; + let tracked_predicate = Arc::get_mut(tracked_predicate_arc).unwrap(); + + // The cardinality might become -1 or 0. We will remove them from the tracked predicates during validation. + tracked_predicate.current_cardinality = + tracked_predicate.current_cardinality.saturating_sub(1); + + let Some(pred_changes) = subject_changes.predicates.get_mut(pred_iri) else { + continue; + }; + + let val_removed = oxrdf_term_to_orm_basic_type(&triple.object); + pred_changes.values_removed.push(val_removed.clone()); + + // If value type is literal, we need to remove the current value from the tracked predicate. + if tracked_predicate + .schema + .dataTypes + .iter() + .any(|dt| dt.valType == OrmSchemaLiteralType::literal) + { + if let Some(current_literals) = &mut tracked_predicate.current_literals { + // Remove obj_val from current_literals in-place + current_literals.retain(|val| *val != val_removed); + } else { + tracked_predicate.current_literals = Some(vec![val_removed]); + } + } else if tracked_predicate + .schema + .dataTypes + .iter() + .any(|dt| dt.valType == OrmSchemaLiteralType::shape) + { + // Remove parent from child and child from tracked children. + // If predicate is of type shape, register (parent -> child) links so that + // nested subjects can later be (lazily) fetched / validated. + let shapes_to_process: Vec<_> = tracked_predicate + .schema + .dataTypes + .iter() + .filter_map(|dt| { + if dt.valType == OrmSchemaLiteralType::shape { + dt.shape.clone() + } else { + None + } + }) + .collect(); + + if let BasicType::Str(obj_iri) = &val_removed { + // Remove link to children + tracked_predicate + .tracked_children + .retain(|c| *obj_iri != c.upgrade().unwrap().subject_iri); + + for shape_iri in shapes_to_process { + // Get or create object's tracked subject struct. + let child_shape = schema.get(&shape_iri).unwrap(); + + let (tracked_child, _) = + get_or_create_tracked_subject(&obj_iri, child_shape, tracked_subjects); + + // Remove self from parent + tracked_child.parents.remove(obj_iri); + } + } + } + } + Ok(()) +} + +fn oxrdf_term_to_orm_basic_type(term: &ng_oxigraph::oxrdf::Term) -> BasicType { + match oxrdf_term_to_orm_term(term) { + ng_net::orm::Term::Str(s) => BasicType::Str(s), + ng_net::orm::Term::Num(n) => BasicType::Num(n), + ng_net::orm::Term::Bool(b) => BasicType::Bool(b), + ng_net::orm::Term::Ref(b) => BasicType::Str(b), // Treat IRIs as strings + } +} + +/// Converts an oxrdf::Term to an orm::Term +fn oxrdf_term_to_orm_term(term: &ng_oxigraph::oxrdf::Term) -> ng_net::orm::Term { + match term { + ng_oxigraph::oxrdf::Term::NamedNode(node) => { + ng_net::orm::Term::Ref(node.as_str().to_string()) + } + ng_oxigraph::oxrdf::Term::BlankNode(node) => { + ng_net::orm::Term::Ref(node.as_str().to_string()) + } + ng_oxigraph::oxrdf::Term::Literal(literal) => { + // Check the datatype to determine how to convert + match literal.datatype().as_str() { + // Check for string first, this is the most common. + "http://www.w3.org/2001/XMLSchema#string" => { + ng_net::orm::Term::Str(literal.value().to_string()) + } + "http://www.w3.org/2001/XMLSchema#boolean" => { + match literal.value().parse::() { + Ok(b) => ng_net::orm::Term::Bool(b), + Err(_) => ng_net::orm::Term::Str(literal.value().to_string()), + } + } + "http://www.w3.org/2001/XMLSchema#integer" + | "http://www.w3.org/2001/XMLSchema#decimal" + | "http://www.w3.org/2001/XMLSchema#double" + | "http://www.w3.org/2001/XMLSchema#float" + | "http://www.w3.org/2001/XMLSchema#int" + | "http://www.w3.org/2001/XMLSchema#long" + | "http://www.w3.org/2001/XMLSchema#short" + | "http://www.w3.org/2001/XMLSchema#byte" + | "http://www.w3.org/2001/XMLSchema#unsignedInt" + | "http://www.w3.org/2001/XMLSchema#unsignedLong" + | "http://www.w3.org/2001/XMLSchema#unsignedShort" + | "http://www.w3.org/2001/XMLSchema#unsignedByte" => { + match literal.value().parse::() { + Ok(n) => ng_net::orm::Term::Num(n), + Err(_) => ng_net::orm::Term::Str(literal.value().to_string()), + } + } + _ => ng_net::orm::Term::Str(literal.value().to_string()), + } + } + ng_oxigraph::oxrdf::Term::Triple(triple) => { + // For RDF-star triples, convert to string representation + ng_net::orm::Term::Str(triple.to_string()) + } + } +} diff --git a/ng-verifier/src/orm/orm_validation.rs b/ng-verifier/src/orm/orm_validation.rs new file mode 100644 index 0000000..fc3af88 --- /dev/null +++ b/ng-verifier/src/orm/orm_validation.rs @@ -0,0 +1,327 @@ +// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers +// All rights reserved. +// Licensed under the Apache License, Version 2.0 +// +// or the MIT license , +// at your option. All files in the project carrying such +// notice may not be copied, modified, or distributed except +// according to those terms. + +use std::collections::HashMap; +use std::collections::HashSet; +use std::sync::Arc; + +use crate::verifier::*; +use ng_net::orm::*; + +impl Verifier { + /// Check the validity of a subject and update affecting tracked subjects' validity. + /// Might return nested objects that need to be validated. + /// Assumes all triples to be of same subject. + pub fn update_subject_validity( + s_change: &OrmTrackedSubjectChange, + shape: &OrmSchemaShape, + orm_subscription: &mut Arc, + previous_validity: OrmTrackedSubjectValidity, + ) -> Vec<(String, String, bool)> { + let orm_sub = Arc::get_mut(orm_subscription).unwrap(); + let tracked_subjects = &mut orm_sub.tracked_subjects; + + let Some(tracked_shapes) = tracked_subjects.get_mut(&s_change.subject_iri) else { + return vec![]; + }; + let Some(tracked_subject) = tracked_shapes.get_mut(&shape.iri) else { + return vec![]; + }; + let tracked_subject = Arc::get_mut(tracked_subject).unwrap(); + // Keep track of objects that need to be validated against a shape to fetch and validate. + let mut need_evaluation: Vec<(String, String, bool)> = vec![]; + + // Check 1) Check if we need to fetch this object or all parents are untracked. + if tracked_subject.parents.len() != 0 { + let no_parents_tracking = + tracked_subject + .parents + .values() + .all(|parent| match parent.upgrade() { + Some(subject) => { + subject.valid == OrmTrackedSubjectValidity::Untracked + || subject.valid == OrmTrackedSubjectValidity::Invalid + } + None => true, + }); + + if no_parents_tracking { + // Remove tracked predicates and set untracked. + tracked_subject.tracked_predicates = HashMap::new(); + tracked_subject.valid = OrmTrackedSubjectValidity::Untracked; + return vec![]; + } else if !no_parents_tracking + && previous_validity == OrmTrackedSubjectValidity::Untracked + { + // We need to fetch the subject's current state: + // We have new parents but were previously not recording changes. + + return vec![(s_change.subject_iri.clone(), shape.iri.clone(), true)]; + // TODO + } + } + + // Check 2) If there are no changes, there is nothing to do. + if s_change.predicates.is_empty() { + return vec![]; + } + + let mut new_validity = OrmTrackedSubjectValidity::Valid; + fn set_validity( + current: &mut OrmTrackedSubjectValidity, + new_val: OrmTrackedSubjectValidity, + ) { + if new_val == OrmTrackedSubjectValidity::Invalid { + *current = OrmTrackedSubjectValidity::Invalid; + } else { + *current = new_val; + } + } + + // Check 3) If there is an infinite loop of parents pointing back to us, return invalid. + // Create a set of visited parents to detect cycles. + if has_cycle(tracked_subject, &mut HashSet::new()) { + // Remove tracked predicates and set invalid. + tracked_subject.tracked_predicates = HashMap::new(); + tracked_subject.valid = OrmTrackedSubjectValidity::Invalid; + return vec![]; + } + + // Check 4) Validate subject against each predicate in shape. + for p_schema in shape.predicates.iter() { + let p_change = s_change.predicates.get(&p_schema.iri); + let tracked_pred = p_change.and_then(|pc| pc.tracked_predicate.upgrade()); + + let count = tracked_pred + .as_ref() + .map_or_else(|| 0, |tp| tp.current_cardinality); + + // Check 4.1) Cardinality + if count < p_schema.minCardinality { + set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); + if count <= 0 { + // If cardinality is 0, we can remove the tracked predicate. + tracked_subject.tracked_predicates.remove(&p_schema.iri); + } + break; + // Check 4.2) Cardinality too high and extra values not allowed. + } else if count > p_schema.maxCardinality + && p_schema.maxCardinality != -1 + && p_schema.extra != Some(true) + { + // If cardinality is too high and no extra allowed, invalid. + set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); + break; + // Check 4.3) Required literals present. + } else if p_schema + .dataTypes + .iter() + .any(|dt| dt.valType == OrmSchemaLiteralType::literal) + { + // If we have literals, check if all required literals are present. + // At least one datatype must match. + let some_valid = p_schema.dataTypes.iter().flat_map(|dt| &dt.literals).any( + |required_literals| { + // Early stop: If no extra values allowed but the sizes + // between required and given values mismatches. + if !p_schema.extra.unwrap_or(false) + && ((required_literals.len() as i32) + != tracked_pred.as_ref().map_or(0, |p| p.current_cardinality)) + { + return false; + } + + // Check that each required literal is present. + for required_literal in required_literals { + // Is tracked predicate present? + if !tracked_pred.as_ref().map_or(false, |t| { + t.current_literals.as_ref().map_or(false, |tt| { + tt.iter().any(|literal| *literal == *required_literal) + }) + }) { + return false; + } + } + // All required literals present. + return true; + }, + ); + if !some_valid { + set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); + } + // Check 4.4) Nested shape correct. + } else if p_schema + .dataTypes + .iter() + .any(|dt| dt.valType == OrmSchemaLiteralType::shape) + { + // If we have a nested shape, we need to check if the nested objects are tracked and valid. + let tracked_children = tracked_pred.as_ref().map(|tp| { + tp.tracked_children + .iter() + .filter_map(|weak_tc| weak_tc.upgrade()) + .collect::>() + }); + // First, Count valid, invalid, unknowns, and untracked + let counts = tracked_children.as_ref().map_or((0, 0, 0, 0), |children| { + children + .iter() + .map(|tc| { + if tc.valid == OrmTrackedSubjectValidity::Valid { + (1, 0, 0, 0) + } else if tc.valid == OrmTrackedSubjectValidity::Invalid { + (0, 1, 0, 0) + } else if tc.valid == OrmTrackedSubjectValidity::Pending { + (0, 0, 1, 0) + } else if tc.valid == OrmTrackedSubjectValidity::Untracked { + (0, 0, 0, 1) + } else { + (0, 0, 0, 0) + } + }) + .fold((0, 0, 0, 0), |(v1, i1, u1, ut1), o| { + (v1 + o.0, i1 + o.1, u1 + o.2, ut1 + o.3) + }) + }); + + if counts.1 > 0 && p_schema.extra != Some(true) { + // If we have at least one invalid nested object and no extra allowed, invalid. + set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); + break; + } else if counts.0 < p_schema.minCardinality { + // If we have not enough valid nested objects, invalid. + set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); + break; + } else if counts.3 > 0 { + // If we have untracked nested objects, we need to fetch them and validate. + set_validity(&mut new_validity, OrmTrackedSubjectValidity::Pending); + // After that we need to reevaluate this (subject,shape) again. + need_evaluation.push(( + s_change.subject_iri.to_string(), + shape.iri.clone(), + false, + )); + // Also schedule untracked children for fetching and validation. + tracked_children.as_ref().map(|children| { + for child in children { + if child.valid == OrmTrackedSubjectValidity::Untracked { + need_evaluation.push(( + child.subject_iri.clone(), + child.shape.iri.clone(), + true, + )); + } + } + }); + } else if counts.2 > 0 { + // If we have unknown nested objects, we need to wait for their evaluation. + set_validity(&mut new_validity, OrmTrackedSubjectValidity::Pending); + // Schedule unknown children (NotEvaluated) for re-evaluation without fetch. + tracked_children.as_ref().map(|children| { + for child in children { + if child.valid == OrmTrackedSubjectValidity::Pending { + need_evaluation.push(( + child.subject_iri.clone(), + child.shape.iri.clone(), + false, + )); + } + } + }); + } else { + // All nested objects are valid and cardinality is correct. + // We are valid with this predicate. + } + // Check 4.5) Data types correct. + } else { + // Check if the data type is correct. + let allowed_types: Vec<&OrmSchemaLiteralType> = + p_schema.dataTypes.iter().map(|dt| &dt.valType).collect(); + // For each new value, check that data type is in allowed_types. + for val_added in p_change.iter().map(|pc| &pc.values_added).flatten() { + let matches = match val_added { + BasicType::Bool(_) => allowed_types + .iter() + .any(|t| **t == OrmSchemaLiteralType::boolean), + BasicType::Num(_) => allowed_types + .iter() + .any(|t| **t == OrmSchemaLiteralType::number), + BasicType::Str(_) => allowed_types.iter().any(|t| { + **t == OrmSchemaLiteralType::string || **t == OrmSchemaLiteralType::iri + }), + }; + if !matches { + set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); + break; + } + } + // Break again if validity has become invalid. + if new_validity == OrmTrackedSubjectValidity::Invalid { + break; + } + }; + } + + if new_validity == OrmTrackedSubjectValidity::Invalid { + // If we are invalid, we can discard new unknowns again - they won't be kept in memory. + // We need to remove ourself from child objects parents field and + // remove them if no other is tracking. + // Child relationship cleanup disabled (nested tracking disabled in this refactor step) + + // Remove tracked predicates and set untracked. + tracked_subject.tracked_predicates = HashMap::new(); + + // Empty list of children that need evaluation. + need_evaluation.retain(|_| false); + } else if new_validity == OrmTrackedSubjectValidity::Valid + && previous_validity != OrmTrackedSubjectValidity::Valid + { + // If this subject became valid, we need to refetch this subject; + // We fetch + need_evaluation.insert(0, (s_change.subject_iri.clone(), shape.iri.clone(), true)); + } + + // If validity changed, parents need to be re-evaluated. + if new_validity != previous_validity { + // We return the tracking parents which need re-evaluation. + // Remember that the last elements (i.e. children or needs_fetch) are evaluated first. + return tracked_subject + .parents + .values() + .filter_map(|parent| { + parent + .upgrade() + .map(|parent| (parent.subject_iri.clone(), parent.shape.iri.clone(), false)) + }) + // Add `need_evaluation`. + .chain(need_evaluation) + .collect(); + } + + tracked_subject.valid = new_validity; + + return need_evaluation; + } +} + +fn has_cycle(subject: &OrmTrackedSubject, visited: &mut HashSet) -> bool { + if visited.contains(&subject.subject_iri) { + return true; + } + visited.insert(subject.subject_iri.clone()); + for (_parent_iri, parent_subject) in &subject.parents { + if let Some(parent_subject) = parent_subject.upgrade() { + if has_cycle(&parent_subject, visited) { + return true; + } + } + } + visited.remove(&subject.subject_iri); + false +} diff --git a/ng-verifier/src/utils/mod.rs b/ng-verifier/src/orm/types.rs similarity index 94% rename from ng-verifier/src/utils/mod.rs rename to ng-verifier/src/orm/types.rs index d2039c7..2e4f5f3 100644 --- a/ng-verifier/src/utils/mod.rs +++ b/ng-verifier/src/orm/types.rs @@ -6,5 +6,3 @@ // at your option. All files in the project carrying such // notice may not be copied, modified, or distributed except // according to those terms. - -pub mod orm_validation; diff --git a/ng-verifier/src/utils/orm_validation.rs b/ng-verifier/src/utils/orm_validation.rs deleted file mode 100644 index 77575cf..0000000 --- a/ng-verifier/src/utils/orm_validation.rs +++ /dev/null @@ -1,600 +0,0 @@ -// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers -// All rights reserved. -// Licensed under the Apache License, Version 2.0 -// -// or the MIT license , -// at your option. All files in the project carrying such -// notice may not be copied, modified, or distributed except -// according to those terms. - -use std::collections::HashMap; -use std::collections::HashSet; -use std::sync::Arc; -use std::sync::Weak; - -use ng_net::orm::*; -use ng_oxigraph::oxrdf::Subject; -use ng_oxigraph::oxrdf::Triple; -use ng_repo::errors::NgError; - -pub fn group_by_subject_for_shape<'a>( - shape: &OrmSchemaShape, - triples: &'a [Triple], - allowed_subjects: &[String], -) -> HashMap> { - let mut triples_by_subject: HashMap> = HashMap::new(); - let allowed_preds_set: HashSet<&str> = - shape.predicates.iter().map(|p| p.iri.as_str()).collect(); - let allowed_subject_set: HashSet<&str> = allowed_subjects.iter().map(|s| s.as_str()).collect(); - for triple in triples { - // triple.subject must be in allowed_subjects (or allowed_subjects empty) - // and triple.predicate must be in allowed_preds. - if allowed_preds_set.contains(triple.predicate.as_str()) { - // filter subjects if list provided - let subj = match &triple.subject { - Subject::NamedNode(n) => n.as_ref(), - _ => continue, - }; - // Subject must be in allowed subjects (or allowed_subjects is empty). - if allowed_subject_set.is_empty() || allowed_subject_set.contains(subj.as_str()) { - triples_by_subject - .entry(subj.to_string()) - .or_insert_with(Vec::new) - .push(triple); - } - } - } - - return triples_by_subject; -} - -/// Add all triples to `subject_changes` -/// Returns predicates to nested objects that were touched and need processing. -/// Assumes all triples have same subject. -pub fn add_remove_triples_mut( - shape: Arc, - subject_iri: &str, - triples_added: &[&Triple], - triples_removed: &[&Triple], - tracked_subjects: &mut HashMap>>, - subject_changes: &mut OrmTrackedSubjectChange, -) -> Result<(), NgError> { - fn get_or_create_tracked_subject<'a>( - subject_iri: &str, - shape: &Arc, - tracked_subjects: &'a mut HashMap>>, - ) -> (&'a mut OrmTrackedSubject, Weak) { - let tracked_shapes_for_subject = tracked_subjects - .entry(subject_iri.to_string()) - .or_insert_with(HashMap::new); - - let subject = tracked_shapes_for_subject - .entry(shape.iri.clone()) - .or_insert_with(|| { - Arc::new(OrmTrackedSubject { - tracked_predicates: HashMap::new(), - parents: HashMap::new(), - valid: ng_net::orm::OrmTrackedSubjectValidity::Pending, - subject_iri: subject_iri.to_string(), - shape: shape.clone(), - }) - }); - let weak = Arc::downgrade(&subject); - (Arc::get_mut(subject).unwrap(), weak) - } - - let (_, tracked_subject_weak) = - get_or_create_tracked_subject(subject_iri, &shape, tracked_subjects); - - // Process added triples. - // For each triple, check if it matches the shape. - // In parallel, we record the values added and removed (tracked_changes) - for triple in triples_added { - for predicate_schema in &shape.predicates { - if predicate_schema.iri != triple.predicate.as_str() { - // Triple does not match predicate. - continue; - } - // Predicate schema constraint matches this triple. - - let mut upgraded = tracked_subject_weak.upgrade().unwrap(); - let tracked_subject = Arc::get_mut(&mut upgraded).unwrap(); - // Add tracked predicate or increase cardinality - let tracked_predicate_ = tracked_subject - .tracked_predicates - .entry(predicate_schema.iri.to_string()) - .or_insert_with(|| { - Arc::new(OrmTrackedPredicate { - current_cardinality: 0, - schema: predicate_schema.clone(), - tracked_children: Vec::new(), - current_literals: None, - }) - }); - let tracked_predicate_weak = Arc::downgrade(&tracked_predicate_); - let tracked_predicate = Arc::get_mut(tracked_predicate_).unwrap(); - tracked_predicate.current_cardinality += 1; - - let obj_term = oxrdf_term_to_orm_basic_type(&triple.object); - - // Keep track of the changed values too. - let pred_changes: &mut OrmTrackedPredicateChanges = subject_changes - .predicates - .entry(predicate_schema.iri.clone()) - .or_insert_with(|| OrmTrackedPredicateChanges { - tracked_predicate: tracked_predicate_weak.clone(), // reference remains inside lifetime of this call - values_added: Vec::new(), - values_removed: Vec::new(), - }); - - pred_changes.values_added.push(obj_term.clone()); - - // If value type is literal, we need to add the current value to the tracked predicate. - if tracked_predicate - .schema - .dataTypes - .iter() - .any(|dt| dt.valType == OrmSchemaLiteralType::literal) - { - match &mut tracked_predicate.current_literals { - Some(lits) => lits.push(obj_term.clone()), - None => { - tracked_predicate.current_literals = Some(vec![obj_term.clone()]); - } - } - } - - // If predicate is of type shape, register (parent -> child) links so that - // nested subjects can later be (lazily) fetched / validated. - // FIXME : shape_iri is never used - for shape_iri in predicate_schema.dataTypes.iter().filter_map(|dt| { - if dt.valType == OrmSchemaLiteralType::shape { - dt.shape.clone() - } else { - None - } - }) { - if let BasicType::Str(obj_iri) = &obj_term { - // Get or create object's tracked subject struct. - let (tracked_child, tracked_child_weak) = get_or_create_tracked_subject( - triple.predicate.as_string(), - &shape, - tracked_subjects, - ); - - // Add self to parent (set tracked to true, preliminary). - tracked_child - .parents - .insert(obj_iri.clone(), tracked_child_weak.clone()); - - // Add link to children - let mut upgraded = tracked_predicate_weak.upgrade().unwrap(); - let tracked_predicate = Arc::get_mut(&mut upgraded).unwrap(); - tracked_predicate.tracked_children.push(tracked_child_weak); - } - } - } - } - // Process removed triples. - for triple in triples_removed { - let pred_iri = triple.predicate.as_str(); - - // Only adjust if we had tracked state. - let tracked_predicate_opt = tracked_subjects - .get_mut(subject_iri) - .and_then(|tss| tss.get_mut(&shape.iri)) - .and_then(|ts| { - Arc::get_mut(ts) - .unwrap() - .tracked_predicates - .get_mut(pred_iri) - }); - let Some(tracked_predicate_arc) = tracked_predicate_opt else { - continue; - }; - let tracked_predicate = Arc::get_mut(tracked_predicate_arc).unwrap(); - - // The cardinality might become -1 or 0. We will remove them from the tracked predicates during validation. - tracked_predicate.current_cardinality = - tracked_predicate.current_cardinality.saturating_sub(1); - - let Some(pred_changes) = subject_changes.predicates.get_mut(pred_iri) else { - continue; - }; - - let val_removed = oxrdf_term_to_orm_basic_type(&triple.object); - pred_changes.values_removed.push(val_removed.clone()); - - // If value type is literal, we need to remove the current value from the tracked predicate. - if tracked_predicate - .schema - .dataTypes - .iter() - .any(|dt| dt.valType == OrmSchemaLiteralType::literal) - { - if let Some(current_literals) = &mut tracked_predicate.current_literals { - // Remove obj_val from current_literals in-place - current_literals.retain(|val| *val != val_removed); - } else { - tracked_predicate.current_literals = Some(vec![val_removed]); - } - } else if tracked_predicate - .schema - .dataTypes - .iter() - .any(|dt| dt.valType == OrmSchemaLiteralType::shape) - { - // Remove parent from child and child from tracked children. - for shape_iri in tracked_predicate.schema.dataTypes.iter().filter_map(|dt| { - if dt.valType == OrmSchemaLiteralType::shape { - dt.shape.clone() - } else { - None - } - }) { - // Nested shape removal logic disabled (see note above). - } - } - } - Ok(()) -} - -/// Check the validity of a subject and update affecting tracked subjects' validity. -/// Might return nested objects that need to be validated. -/// Assumes all triples to be of same subject. -pub fn update_subject_validity( - s_change: &OrmTrackedSubjectChange, - shape: &OrmSchemaShape, - tracked_subjects: &mut HashMap>>, - previous_validity: OrmTrackedSubjectValidity, -) -> Vec<(String, String, bool)> { - let Some(tracked_shapes) = tracked_subjects.get_mut(&s_change.subject_iri) else { - return vec![]; - }; - let Some(tracked_subject) = tracked_shapes.get_mut(&shape.iri) else { - return vec![]; - }; - let tracked_subject = Arc::get_mut(tracked_subject).unwrap(); - // Keep track of objects that need to be validated against a shape to fetch and validate. - let mut need_evaluation: Vec<(String, String, bool)> = vec![]; - - // Check 1) Check if we need to fetch this object or all parents are untracked. - if tracked_subject.parents.len() != 0 { - let no_parents_tracking = - tracked_subject - .parents - .values() - .all(|parent| match parent.upgrade() { - Some(subject) => { - subject.valid == OrmTrackedSubjectValidity::Untracked - || subject.valid == OrmTrackedSubjectValidity::Invalid - } - None => true, - }); - - if no_parents_tracking { - // Remove tracked predicates and set untracked. - tracked_subject.tracked_predicates = HashMap::new(); - tracked_subject.valid = OrmTrackedSubjectValidity::Untracked; - return vec![]; - } else if !no_parents_tracking && previous_validity == OrmTrackedSubjectValidity::Untracked - { - // We need to fetch the subject's current state: - // We have new parents but were previously not recording changes. - - return vec![(s_change.subject_iri.clone(), shape.iri.clone(), true)]; - // TODO - } - } - - // Check 2) If there are no changes, there is nothing to do. - if s_change.predicates.is_empty() { - return vec![]; - } - - let mut new_validity = OrmTrackedSubjectValidity::Valid; - fn set_validity(current: &mut OrmTrackedSubjectValidity, new_val: OrmTrackedSubjectValidity) { - if new_val == OrmTrackedSubjectValidity::Invalid { - *current = OrmTrackedSubjectValidity::Invalid; - } else { - *current = new_val; - } - } - - // Check 3) If there is an infinite loop of parents pointing back to us, return invalid. - // Create a set of visited parents to detect cycles. - if has_cycle(tracked_subject, &mut HashSet::new()) { - // Remove tracked predicates and set invalid. - tracked_subject.tracked_predicates = HashMap::new(); - tracked_subject.valid = OrmTrackedSubjectValidity::Invalid; - return vec![]; - } - - // Check 4) Validate subject against each predicate in shape. - for p_schema in shape.predicates.iter() { - let p_change = s_change.predicates.get(&p_schema.iri); - let tracked_pred = p_change.and_then(|pc| pc.tracked_predicate.upgrade()); - - let count = tracked_pred - .as_ref() - .map_or_else(|| 0, |tp| tp.current_cardinality); - - // Check 4.1) Cardinality - if count < p_schema.minCardinality { - set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); - if count <= 0 { - // If cardinality is 0, we can remove the tracked predicate. - tracked_subject.tracked_predicates.remove(&p_schema.iri); - } - break; - // Check 4.2) Cardinality too high and extra values not allowed. - } else if count > p_schema.maxCardinality - && p_schema.maxCardinality != -1 - && p_schema.extra != Some(true) - { - // If cardinality is too high and no extra allowed, invalid. - set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); - break; - // Check 4.3) Required literals present. - } else if p_schema - .dataTypes - .iter() - .any(|dt| dt.valType == OrmSchemaLiteralType::literal) - { - // If we have literals, check if all required literals are present. - // At least one datatype must match. - let some_valid = - p_schema - .dataTypes - .iter() - .flat_map(|dt| &dt.literals) - .any(|required_literals| { - // Early stop: If no extra values allowed but the sizes - // between required and given values mismatches. - if !p_schema.extra.unwrap_or(false) - && ((required_literals.len() as i32) - != tracked_pred.as_ref().map_or(0, |p| p.current_cardinality)) - { - return false; - } - - // Check that each required literal is present. - for required_literal in required_literals { - // Is tracked predicate present? - if !tracked_pred.as_ref().map_or(false, |t| { - t.current_literals.as_ref().map_or(false, |tt| { - tt.iter().any(|literal| *literal == *required_literal) - }) - }) { - return false; - } - } - // All required literals present. - return true; - }); - if !some_valid { - set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); - } - // Check 4.4) Nested shape correct. - } else if p_schema - .dataTypes - .iter() - .any(|dt| dt.valType == OrmSchemaLiteralType::shape) - { - // If we have a nested shape, we need to check if the nested objects are tracked and valid. - let tracked_children = tracked_pred.as_ref().map(|tp| { - tp.tracked_children - .iter() - .filter_map(|weak_tc| weak_tc.upgrade()) - .collect::>() - }); - // First, Count valid, invalid, unknowns, and untracked - let counts = tracked_children.as_ref().map_or((0, 0, 0, 0), |children| { - children - .iter() - .map(|tc| { - if tc.valid == OrmTrackedSubjectValidity::Valid { - (1, 0, 0, 0) - } else if tc.valid == OrmTrackedSubjectValidity::Invalid { - (0, 1, 0, 0) - } else if tc.valid == OrmTrackedSubjectValidity::Pending { - (0, 0, 1, 0) - } else if tc.valid == OrmTrackedSubjectValidity::Untracked { - (0, 0, 0, 1) - } else { - (0, 0, 0, 0) - } - }) - .fold((0, 0, 0, 0), |(v1, i1, u1, ut1), o| { - (v1 + o.0, i1 + o.1, u1 + o.2, ut1 + o.3) - }) - }); - - if counts.1 > 0 && p_schema.extra != Some(true) { - // If we have at least one invalid nested object and no extra allowed, invalid. - set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); - break; - } else if counts.0 < p_schema.minCardinality { - // If we have not enough valid nested objects, invalid. - set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); - break; - } else if counts.3 > 0 { - // If we have untracked nested objects, we need to fetch them and validate. - set_validity(&mut new_validity, OrmTrackedSubjectValidity::Pending); - // After that we need to reevaluate this (subject,shape) again. - need_evaluation.push((s_change.subject_iri.to_string(), shape.iri.clone(), false)); - // Also schedule untracked children for fetching and validation. - tracked_children.as_ref().map(|children| { - for child in children { - if child.valid == OrmTrackedSubjectValidity::Untracked { - need_evaluation.push(( - child.subject_iri.clone(), - child.shape.iri.clone(), - true, - )); - } - } - }); - } else if counts.2 > 0 { - // If we have unknown nested objects, we need to wait for their evaluation. - set_validity(&mut new_validity, OrmTrackedSubjectValidity::Pending); - // Schedule unknown children (NotEvaluated) for re-evaluation without fetch. - tracked_children.as_ref().map(|children| { - for child in children { - if child.valid == OrmTrackedSubjectValidity::Pending { - need_evaluation.push(( - child.subject_iri.clone(), - child.shape.iri.clone(), - false, - )); - } - } - }); - } else { - // All nested objects are valid and cardinality is correct. - // We are valid with this predicate. - } - // Check 4.5) Data types correct. - } else { - // Check if the data type is correct. - let allowed_types: Vec<&OrmSchemaLiteralType> = - p_schema.dataTypes.iter().map(|dt| &dt.valType).collect(); - // For each new value, check that data type is in allowed_types. - for val_added in p_change.iter().map(|pc| &pc.values_added).flatten() { - let matches = match val_added { - BasicType::Bool(_) => allowed_types - .iter() - .any(|t| **t == OrmSchemaLiteralType::boolean), - BasicType::Num(_) => allowed_types - .iter() - .any(|t| **t == OrmSchemaLiteralType::number), - BasicType::Str(_) => allowed_types.iter().any(|t| { - **t == OrmSchemaLiteralType::string || **t == OrmSchemaLiteralType::iri - }), - }; - if !matches { - set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); - break; - } - } - // Break again if validity has become invalid. - if new_validity == OrmTrackedSubjectValidity::Invalid { - break; - } - }; - } - - if new_validity == OrmTrackedSubjectValidity::Invalid { - // If we are invalid, we can discard new unknowns again - they won't be kept in memory. - // We need to remove ourself from child objects parents field and - // remove them if no other is tracking. - // Child relationship cleanup disabled (nested tracking disabled in this refactor step) - - // Remove tracked predicates and set untracked. - tracked_subject.tracked_predicates = HashMap::new(); - - // Empty list of children that need evaluation. - need_evaluation.retain(|_| false); - } else if new_validity == OrmTrackedSubjectValidity::Valid - && previous_validity != OrmTrackedSubjectValidity::Valid - { - // If this subject became valid, we need to refetch this subject; - // We fetch - need_evaluation.insert(0, (s_change.subject_iri.clone(), shape.iri.clone(), true)); - } - - // If validity changed, parents need to be re-evaluated. - if new_validity != previous_validity { - // We return the tracking parents which need re-evaluation. - // Remember that the last elements (i.e. children or needs_fetch) are evaluated first. - return tracked_subject - .parents - .values() - .filter_map(|parent| { - parent - .upgrade() - .map(|parent| (parent.subject_iri.clone(), parent.shape.iri.clone(), false)) - }) - // Add `need_evaluation`. - .chain(need_evaluation) - .collect(); - } - - tracked_subject.valid = new_validity; - - return need_evaluation; -} - -fn oxrdf_term_to_orm_basic_type(term: &ng_oxigraph::oxrdf::Term) -> BasicType { - match oxrdf_term_to_orm_term(term) { - ng_net::orm::Term::Str(s) => BasicType::Str(s), - ng_net::orm::Term::Num(n) => BasicType::Num(n), - ng_net::orm::Term::Bool(b) => BasicType::Bool(b), - ng_net::orm::Term::Ref(b) => BasicType::Str(b), // Treat IRIs as strings - } -} - -fn has_cycle(subject: &OrmTrackedSubject, visited: &mut HashSet) -> bool { - if visited.contains(&subject.subject_iri) { - return true; - } - visited.insert(subject.subject_iri.clone()); - for (_parent_iri, parent_subject) in &subject.parents { - if let Some(parent_subject) = parent_subject.upgrade() { - if has_cycle(&parent_subject, visited) { - return true; - } - } - } - visited.remove(&subject.subject_iri); - false -} - -/// Converts an oxrdf::Term to an orm::Term -fn oxrdf_term_to_orm_term(term: &ng_oxigraph::oxrdf::Term) -> ng_net::orm::Term { - match term { - ng_oxigraph::oxrdf::Term::NamedNode(node) => { - ng_net::orm::Term::Ref(node.as_str().to_string()) - } - ng_oxigraph::oxrdf::Term::BlankNode(node) => { - ng_net::orm::Term::Ref(node.as_str().to_string()) - } - ng_oxigraph::oxrdf::Term::Literal(literal) => { - // Check the datatype to determine how to convert - match literal.datatype().as_str() { - // Check for string first, this is the most common. - "http://www.w3.org/2001/XMLSchema#string" => { - ng_net::orm::Term::Str(literal.value().to_string()) - } - "http://www.w3.org/2001/XMLSchema#boolean" => { - match literal.value().parse::() { - Ok(b) => ng_net::orm::Term::Bool(b), - Err(_) => ng_net::orm::Term::Str(literal.value().to_string()), - } - } - "http://www.w3.org/2001/XMLSchema#integer" - | "http://www.w3.org/2001/XMLSchema#decimal" - | "http://www.w3.org/2001/XMLSchema#double" - | "http://www.w3.org/2001/XMLSchema#float" - | "http://www.w3.org/2001/XMLSchema#int" - | "http://www.w3.org/2001/XMLSchema#long" - | "http://www.w3.org/2001/XMLSchema#short" - | "http://www.w3.org/2001/XMLSchema#byte" - | "http://www.w3.org/2001/XMLSchema#unsignedInt" - | "http://www.w3.org/2001/XMLSchema#unsignedLong" - | "http://www.w3.org/2001/XMLSchema#unsignedShort" - | "http://www.w3.org/2001/XMLSchema#unsignedByte" => { - match literal.value().parse::() { - Ok(n) => ng_net::orm::Term::Num(n), - Err(_) => ng_net::orm::Term::Str(literal.value().to_string()), - } - } - _ => ng_net::orm::Term::Str(literal.value().to_string()), - } - } - ng_oxigraph::oxrdf::Term::Triple(triple) => { - // For RDF-star triples, convert to string representation - ng_net::orm::Term::Str(triple.to_string()) - } - } -} diff --git a/ng-verifier/src/verifier.rs b/ng-verifier/src/verifier.rs index fe85862..164c8a6 100644 --- a/ng-verifier/src/verifier.rs +++ b/ng-verifier/src/verifier.rs @@ -112,7 +112,7 @@ pub struct Verifier { in_memory_outbox: Vec, uploads: BTreeMap, branch_subscriptions: HashMap>, - pub(crate) orm_subscriptions: HashMap>, + pub(crate) orm_subscriptions: HashMap>>, pub(crate) temporary_repo_certificates: HashMap, }