diff --git a/ng-net/src/orm.rs b/ng-net/src/orm.rs index e346e34..7884a7a 100644 --- a/ng-net/src/orm.rs +++ b/ng-net/src/orm.rs @@ -11,14 +11,16 @@ #![allow(non_snake_case)] -use std::collections::HashSet; -use std::{collections::HashMap, rc::Weak}; +use std::{ + collections::HashMap, + sync::{Arc, Weak}, +}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use crate::app_protocol::AppResponse; +use crate::app_protocol::{AppResponse, NuriV0}; use crate::utils::Sender; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -52,7 +54,6 @@ pub struct OrmDiffOp { pub type OrmDiff = Vec; -/* == ORM Schema == */ pub type OrmSchema = HashMap; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -98,44 +99,37 @@ pub struct OrmSchemaPredicate { pub extra: Option, } -#[derive(Clone, Debug)] -pub struct OrmSubscription<'a> { - pub sender: Sender, - pub tracked_objects: HashMap>, -} - /// A struct for recording the state of subjects and its predicates /// relevant to its shape. #[derive(Clone, Debug)] -pub struct OrmTrackedSubjectAndShape<'a> { +pub struct OrmTrackedSubject { /// The known predicates (only those relevant to the shape). /// If there are no triples with a predicate, they are discarded - pub tracked_predicates: HashMap>, + pub tracked_predicates: HashMap, /// If this is a nested subject, this records the parents /// and if they are currently tracking this subject. - pub parents: HashMap, bool)>, + pub parents: HashMap, /// Validity. When untracked, triple updates are not processed here. pub valid: OrmTrackedSubjectValidity, pub subject_iri: String, /// The shape for which the predicates are tracked. - pub shape: &'a OrmSchemaShape, + pub shape: Arc, } #[derive(Clone, Debug, Eq, PartialEq)] pub enum OrmTrackedSubjectValidity { Valid, Invalid, - NotEvaluated, + Pending, Untracked, - NeedsFetch, } #[derive(Clone, Debug)] -pub struct OrmTrackedPredicate<'a> { +pub struct OrmTrackedPredicate { /// The predicate schema - pub schema: &'a OrmSchemaPredicate, + pub schema: Arc, /// If the schema is a nested object, the children. - pub tracked_children: Vec>>, + pub tracked_children: Vec>, /// The count of triples for this subject and predicate. pub current_cardinality: i32, /// If schema is of type literal, the currently present ones. @@ -148,12 +142,10 @@ pub struct OrmTrackedSubjectChange<'a> { pub subject_iri: String, /// Predicates that were changed. pub predicates: HashMap>, - /// During validation, the current state of validity (can be subject to change). - pub valid: OrmTrackedSubjectValidity, } pub struct OrmTrackedPredicateChanges<'a> { /// The tracked predicate for which those changes were recorded. - pub tracked_predicate: &'a OrmTrackedPredicate<'a>, + pub tracked_predicate: &'a OrmTrackedPredicate, pub values_added: Vec, pub values_removed: Vec, } @@ -166,6 +158,17 @@ pub enum Term { Ref(String), } +#[derive(Clone, Debug)] +pub struct OrmSubscription { + pub shape_type: OrmShapeType, + pub session_id: u64, + pub nuri: NuriV0, + pub sender: Sender, + pub tracked_subjects: HashMap>, +} +type ShapeIri = String; +type SubjectIri = String; + impl Default for OrmSchemaDataType { fn default() -> Self { Self { @@ -188,10 +191,3 @@ impl Default for OrmSchemaPredicate { } } } - -/** == Internal data types == */ -#[derive(Clone, Debug)] -pub struct OrmShapeTypeRef { - pub ref_count: u64, - pub shape_type: OrmShapeType, -} diff --git a/ng-verifier/src/commits/mod.rs b/ng-verifier/src/commits/mod.rs index 068e580..4be9f9e 100644 --- a/ng-verifier/src/commits/mod.rs +++ b/ng-verifier/src/commits/mod.rs @@ -29,7 +29,7 @@ use ng_net::app_protocol::*; use crate::verifier::Verifier; -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] pub trait CommitVerifier { async fn verify( &self, @@ -288,7 +288,7 @@ impl CommitVerifier for AddBranch { Ok(()) } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for Repository { async fn verify( &self, @@ -302,7 +302,7 @@ impl CommitVerifier for Repository { Ok(()) } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for StoreUpdate { async fn verify( &self, @@ -315,7 +315,7 @@ impl CommitVerifier for StoreUpdate { verifier.new_store_from_update(self) } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for AddInboxCap { async fn verify( &self, @@ -330,7 +330,7 @@ impl CommitVerifier for AddInboxCap { } } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for AddSignerCap { async fn verify( &self, @@ -345,7 +345,7 @@ impl CommitVerifier for AddSignerCap { } } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for AddMember { #[allow(unused_variables)] async fn verify( @@ -359,7 +359,7 @@ impl CommitVerifier for AddMember { Ok(()) } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for RemoveMember { #[allow(unused_variables)] async fn verify( @@ -373,7 +373,7 @@ impl CommitVerifier for RemoveMember { Ok(()) } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for AddPermission { #[allow(unused_variables)] async fn verify( @@ -387,7 +387,7 @@ impl CommitVerifier for AddPermission { Ok(()) } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for RemovePermission { #[allow(unused_variables)] async fn verify( @@ -401,7 +401,7 @@ impl CommitVerifier for RemovePermission { Ok(()) } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for RemoveBranch { #[allow(unused_variables)] async fn verify( @@ -415,7 +415,7 @@ impl CommitVerifier for RemoveBranch { Ok(()) } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for AddName { #[allow(unused_variables)] async fn verify( @@ -429,7 +429,7 @@ impl CommitVerifier for AddName { Ok(()) } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for RemoveName { #[allow(unused_variables)] async fn verify( @@ -443,7 +443,7 @@ impl CommitVerifier for RemoveName { Ok(()) } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for () { #[allow(unused_variables)] async fn verify( @@ -457,7 +457,7 @@ impl CommitVerifier for () { Ok(()) } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for Snapshot { #[allow(unused_variables)] async fn verify( @@ -484,7 +484,7 @@ impl CommitVerifier for Snapshot { Ok(()) } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for AddFile { #[allow(unused_variables)] async fn verify( @@ -529,7 +529,7 @@ impl CommitVerifier for AddFile { } } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for RemoveFile { #[allow(unused_variables)] async fn verify( @@ -543,7 +543,7 @@ impl CommitVerifier for RemoveFile { Ok(()) } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for Compact { #[allow(unused_variables)] async fn verify( @@ -557,7 +557,7 @@ impl CommitVerifier for Compact { Ok(()) } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for AsyncSignature { #[allow(unused_variables)] async fn verify( @@ -605,7 +605,7 @@ impl CommitVerifier for AsyncSignature { } } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for RootCapRefresh { #[allow(unused_variables)] async fn verify( @@ -619,7 +619,7 @@ impl CommitVerifier for RootCapRefresh { Ok(()) } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for BranchCapRefresh { #[allow(unused_variables)] async fn verify( @@ -633,7 +633,7 @@ impl CommitVerifier for BranchCapRefresh { Ok(()) } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for AddRepo { #[allow(unused_variables)] async fn verify( @@ -656,7 +656,7 @@ impl CommitVerifier for AddRepo { Ok(()) } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for RemoveRepo { #[allow(unused_variables)] async fn verify( @@ -670,7 +670,7 @@ impl CommitVerifier for RemoveRepo { Ok(()) } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for AddLink { #[allow(unused_variables)] async fn verify( @@ -684,7 +684,7 @@ impl CommitVerifier for AddLink { Ok(()) } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for RemoveLink { #[allow(unused_variables)] async fn verify( @@ -698,7 +698,7 @@ impl CommitVerifier for RemoveLink { Ok(()) } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for RemoveSignerCap { #[allow(unused_variables)] async fn verify( @@ -712,7 +712,7 @@ impl CommitVerifier for RemoveSignerCap { Ok(()) } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl CommitVerifier for WalletUpdate { #[allow(unused_variables)] async fn verify( diff --git a/ng-verifier/src/lib.rs b/ng-verifier/src/lib.rs index ba1ee55..f2120d4 100644 --- a/ng-verifier/src/lib.rs +++ b/ng-verifier/src/lib.rs @@ -26,6 +26,7 @@ mod inbox_processor; #[cfg(all(not(target_family = "wasm"), not(docsrs)))] mod rocksdb_user_storage; +pub(crate) mod utils; use ng_net::app_protocol::*; use ng_oxigraph::oxrdf::Triple; diff --git a/ng-verifier/src/orm.rs b/ng-verifier/src/orm.rs index a63a2ee..b4d71ed 100644 --- a/ng-verifier/src/orm.rs +++ b/ng-verifier/src/orm.rs @@ -8,9 +8,9 @@ // according to those terms. use futures::channel::mpsc; +use ng_net::orm::OrmSubscription; use std::collections::HashMap; use std::collections::HashSet; -use std::rc::Weak; use futures::SinkExt; use lazy_static::lazy_static; @@ -18,18 +18,13 @@ use ng_net::orm::BasicType; pub use ng_net::orm::OrmDiff; use ng_net::orm::OrmSchemaLiteralType; pub use ng_net::orm::OrmShapeType; -use ng_net::orm::OrmShapeTypeRef; -use ng_net::orm::OrmSubscription; -use ng_net::orm::OrmTrackedPredicate; -use ng_net::orm::OrmTrackedPredicateChanges; -use ng_net::orm::OrmTrackedSubjectAndShape; +use ng_net::orm::OrmTrackedSubject; use ng_net::orm::OrmTrackedSubjectChange; use ng_net::orm::OrmTrackedSubjectValidity; use ng_net::orm::{OrmSchemaDataType, OrmSchemaShape}; use ng_net::utils::Receiver; use ng_net::{app_protocol::*, orm::OrmSchema}; use ng_oxigraph::oxigraph::sparql::{Query, QueryResults}; -use ng_oxigraph::oxrdf::Subject; use ng_oxigraph::oxrdf::Triple; use ng_repo::errors::NgError; use ng_repo::errors::VerifierError; @@ -39,8 +34,16 @@ use serde_json::json; use serde_json::Value; use crate::types::*; +use crate::utils::orm_validation::*; use crate::verifier::*; +type ShapeIri = String; +type SubjectIri = String; +// Structure to store changes in. By shape iri > subject iri > OrmTrackedSubjectChange +// **NOTE**: In comparison to OrmSubscription.tracked_subjects, the outer hashmap's keys are shape IRIs. +// (shape IRI -> (subject IRI -> OrmTrackedSubjectChange)) +type OrmChanges<'a> = HashMap>>; + impl Verifier { pub fn sparql_construct( &self, @@ -80,754 +83,324 @@ impl Verifier { } } - fn apply_changes_from_triples( - &mut self, - scope: &NuriV0, - schema: &OrmSchema, - root_shape: &OrmSchemaShape, - triples_added: &Vec, - triples_removed: &Vec, - ) -> HashMap> { - let tracked_subjects = self.orm_tracked_subjects; - - // === Helper functions === - - fn group_by_subject_for_shape<'a>( - shape: &'a OrmSchemaShape, - triples: &'a Vec, - allowed_subjects: &Vec, - ) -> HashMap> { - let mut triples_by_pred: HashMap> = HashMap::new(); - let allowed_preds: HashSet<&str> = - shape.predicates.iter().map(|p| p.iri.as_str()).collect(); - let allowed_objs: HashSet<&String> = allowed_subjects.iter().collect(); - for triple in triples { - // triple.subject must be in allowed_subjects (or allowed_subjects empty) - // and triple.predicate must be in allowed_preds. - if (allowed_objs.is_empty() || allowed_objs.contains(&triple.subject.to_string())) - && allowed_preds.contains(triple.predicate.as_str()) - { - triples_by_pred - .entry(triple.predicate.as_str().to_string()) - .or_insert_with(|| vec![]) - .push(triple); - } - } - // Based on those triples, group by subject. - let mut triples_by_subject: HashMap> = HashMap::new(); - for triple in triples { - let subject_iri = match &triple.subject { - Subject::NamedNode(node) => node.as_str(), - _ => continue, // Won't happen. - }; - triples_by_subject - .entry(subject_iri.to_string()) - .or_insert_with(|| vec![]) - .push(&triple); - } - return triples_by_subject; - } - - /// Add all triples to `changes` - /// Returns predicates to nested objects that were touched and need processing. - /// Assumes all triples have same subject. - fn add_remove_triples( - shape: &OrmSchemaShape, - subject_iri: &String, - triples_added: &Vec<&Triple>, - triples_removed: &Vec<&Triple>, - tracked_subjects: &HashMap>, - subject_changes: &OrmTrackedSubjectChange, - ) { - let get_or_create_tracked_subject = |subject_iri: &String, shape_iri: &String| { - let tracked_shapes_for_subject = tracked_subjects - .entry(subject_iri.clone()) - .or_insert_with(|| HashMap::new()); - - let tracked_subject = tracked_shapes_for_subject - .entry(shape_iri.clone()) - .or_insert_with(|| OrmTrackedSubjectAndShape { - tracked_predicates: HashMap::new(), - parents: HashMap::new(), - valid: ng_net::orm::OrmTrackedSubjectValidity::NotEvaluated, - subject_iri: subject_iri.clone(), - shape, - }); - - tracked_subject - }; - - let tracked_subject = get_or_create_tracked_subject(subject_iri, &shape.iri); - - // Process added triples. - // For each triple, check matching predicates in shape. - // keeping track of value count (for later validations). - // In parallel, we keep track of the values added (tracked_changes) - for triple in triples_added { - for predicate_schema in &shape.predicates { - if predicate_schema.iri != triple.predicate.as_str() { - // Triple does not match predicate. - continue; - } - // Predicate schema constraint matches this triple. - - // Add tracked predicate or increase cardinality - let tracked_predicate = tracked_subject - .tracked_predicates - .entry(predicate_schema.iri.to_string()) - .or_insert_with(|| OrmTrackedPredicate { - current_cardinality: 0, - schema: predicate_schema, - tracked_children: Vec::new(), - current_literals: None, - }); - tracked_predicate.current_cardinality += 1; - - let obj_term = oxrdf_term_to_orm_basic_type(&triple.object); - - // Keep track of the changed values too. - let pred_changes = subject_changes - .predicates - .entry(predicate_schema.iri.clone()) - .or_insert_with(|| OrmTrackedPredicateChanges { - tracked_predicate: &tracked_predicate, - values_added: Vec::new(), - values_removed: Vec::new(), - }); - - pred_changes.values_added.push(obj_term.clone()); - - // If value type is literal, we need to add the current value to the tracked predicate. - if tracked_predicate - .schema - .dataTypes - .iter() - .any(|dt| dt.valType == OrmSchemaLiteralType::literal) - { - if let Some(current_literals) = tracked_predicate.current_literals { - current_literals.push(obj_term); - } else { - tracked_predicate.current_literals.insert(vec![obj_term]); - } - } - - // If predicate is of type shape, register (parent -> child) links so that - // nested subjects can later be (lazily) fetched / validated. - for shape_iri in predicate_schema - .dataTypes - .iter() - .filter(|dt| dt.valType == OrmSchemaLiteralType::shape) - .flat_map(|dt| dt.shape) - { - if let BasicType::Str(obj_iri) = obj_term { - // Get or create object's tracked subject struct. - let tracked_child = get_or_create_tracked_subject( - triple.predicate.as_string(), - &shape_iri, - ); - - // Add self to parent (set tracked to true, preliminary). - tracked_child.parents.insert(obj_iri, (tracked_child, true)); - - // Add link to children - tracked_predicate - .tracked_children - .push(unsafe { Weak::from_raw(tracked_child) }); - } - } - } - } - - // Process removed triples. - for triple in triples_removed { - let pred_iri = triple.predicate.as_str(); - - // Only adjust if we had tracked state. - let Some(tracked_predicate) = tracked_subjects - .get_mut(subject_iri) - .map(|tss| tss.get(&shape.iri)) - .flatten() - .map(|ts| ts.tracked_predicates.get(pred_iri)) - .flatten() - else { - continue; - }; - - // The cardinality might become -1 or 0. We will remove them from the tracked predicates during validation. - tracked_predicate.current_cardinality -= 1; - - let Some(pred_changes) = subject_changes.predicates.get(pred_iri) else { - continue; - }; - - let val_removed = oxrdf_term_to_orm_basic_type(&triple.object); - pred_changes.values_removed.push(val_removed.clone()); - - // If value type is literal, we need to remove the current value from the tracked predicate. - if tracked_predicate - .schema - .dataTypes - .iter() - .any(|dt| dt.valType == OrmSchemaLiteralType::literal) - { - if let Some(current_literals) = &mut tracked_predicate.current_literals { - // Remove obj_val from current_literals in-place - current_literals.retain(|val| *val != val_removed); - } else { - tracked_predicate.current_literals = Some(vec![val_removed]); - } - } else if tracked_predicate - .schema - .dataTypes - .iter() - .any(|dt| dt.valType == OrmSchemaLiteralType::shape) - { - // Remove parent from child and child from tracked children. - for shape_iri in tracked_predicate - .schema - .dataTypes - .iter() - .filter(|dt| dt.valType == OrmSchemaLiteralType::shape) - .flat_map(|dt| dt.shape) - { - if let BasicType::Str(object_iri) = val_removed { - // Get child. - let nested_tracked_subject = get_or_create_tracked_subject( - triple.predicate.as_string(), - &shape_iri, - ); - - // Add self to parent (set tracked to true, preliminary). - nested_tracked_subject.parents.remove(&object_iri); - - // Remove from tracked_children - tracked_predicate.tracked_children.retain_mut(|el| { - el.upgrade().map(|el| el.subject_iri) == Some(object_iri) - }); - } - } - } - } - } - - /// Check the validity of a subject. - /// Might return nested objects that need to be validated. - /// Assumes all triples to be of same subject. - fn check_subject_validity<'a>( - s_change: &'a OrmTrackedSubjectChange<'a>, - shape: &OrmSchemaShape, - schema: &'a OrmSchema, - tracked_subjects: &HashMap>>, - previous_validity: OrmTrackedSubjectValidity, - ) -> ( - OrmTrackedSubjectValidity, - // Vec - Vec<(&'a String, &'a OrmSchemaShape, bool)>, - ) { - // Check 1) If there are no changes, there is nothing to do. - if s_change.predicates.is_empty() { - return (previous_validity, vec![]); - } - - let previous_validity = s_change.valid; - let mut new_validity = OrmTrackedSubjectValidity::Valid; - // Helper to set own validity which does not overwrite worse invalids. - let mut set_validity = |new_val: OrmTrackedSubjectValidity| { - if new_val == OrmTrackedSubjectValidity::Invalid { - new_validity = OrmTrackedSubjectValidity::Invalid; - // Remove all tracked predicates - s_change.tracked_subjects.tracked_predicates = HashMap::new(); - } else if new_val == OrmTrackedSubjectValidity::NotEvaluated - && new_validity != OrmTrackedSubjectValidity::Invalid - { - new_validity = OrmTrackedSubjectValidity::NotEvaluated; - } - }; - - let tracked_subject = tracked_subjects - .get(&s_change.subject_iri) - .unwrap() - .get(&shape.iri) - .unwrap(); - - // Check 2) If all parents are untracked, return untracked. - if tracked_subject - .parents - .values() - .all(|(parent, tracked)| !tracked) - { - // Remove tracked predicates and set untracked. - tracked_subject.tracked_predicates = HashMap::new(); - tracked_subject.valid = OrmTrackedSubjectValidity::Untracked; - return (OrmTrackedSubjectValidity::Untracked, vec![]); - } - - // Check 3) If there is an infinite loop of parents pointing back to us, return invalid. - // Create a set of visited parents to detect cycles. - if has_cycle(tracked_subject, &mut HashSet::new()) { - // Remove tracked predicates and set invalid. - tracked_subject.tracked_predicates = HashMap::new(); - tracked_subject.valid = OrmTrackedSubjectValidity::Invalid; - return (OrmTrackedSubjectValidity::Invalid, vec![]); - } - - // Keep track of objects that need to be validated against a shape to fetch and validate. - let mut need_evaluation: Vec<(&String, &OrmSchemaShape, bool)> = vec![]; - - // Check 4) Validate subject against each predicate in shape. - for p_schema in shape.predicates.iter() { - let p_change = s_change.predicates.get(&p_schema.iri); - let tracked_pred = p_change.map(|pc| pc.tracked_predicate); - - let count = tracked_pred - .map_or_else(|| 0, |tp: &OrmTrackedPredicate<'_>| tp.current_cardinality); - - // Check 4.1) Cardinality - if count < p_schema.minCardinality { - set_validity(OrmTrackedSubjectValidity::Invalid); - if count <= 0 { - // If cardinality is 0, we can remove the tracked predicate. - tracked_subject.tracked_predicates.remove(&p_schema.iri); - } - break; - // Check 4.2) Cardinality too high and extra values not allowed. - } else if count > p_schema.maxCardinality - && p_schema.maxCardinality != -1 - && p_schema.extra != Some(true) - { - // If cardinality is too high and no extra allowed, invalid. - set_validity(OrmTrackedSubjectValidity::Invalid); - break; - // Check 4.3) Literal present types and valid. - } else if p_schema - .dataTypes - .iter() - .any(|dt| dt.valType == OrmSchemaLiteralType::literal) - { - // If we have literals, check if all required literals are present. - let required_literals: Vec = p_schema - .dataTypes - .iter() - .flat_map(|dt| dt.literals) - .flatten() - .collect(); - - // Early stop: If no extra values allowed but the sizes - // between required and given values mismatches. - if !p_schema.extra.unwrap_or(false) - && (required_literals.len().into() - != tracked_pred.map_or(0, |p| p.current_cardinality)) - { - set_validity(OrmTrackedSubjectValidity::Invalid); - break; - } - - // Check that each required literal is present. - for required_literal in required_literals { - // Is tracked predicate present? - if !tracked_pred - .iter() - .flat_map(|tp| tp.current_literals) - .flatten() - .any(|literal| literal == required_literal) - { - set_validity(OrmTrackedSubjectValidity::Invalid); - break; - } - } - // Check 4.4) Nested shape correct. - } else if p_schema - .dataTypes - .iter() - .any(|dt| dt.valType == OrmSchemaLiteralType::shape) - { - // If we have a nested shape, we need to check if the nested object is tracked and valid. - - // First, Count valid, invalid, unknowns, and untracked - let counts = tracked_pred - .iter() - .flat_map(|tp| tp.tracked_children) - .map(|tc| { - tc.upgrade().map(|tc| { - if tc.valid == OrmTrackedSubjectValidity::Valid { - (1, 0, 0, 0) - } else if tc.valid == OrmTrackedSubjectValidity::Invalid { - (0, 1, 0, 0) - } else if tc.valid == OrmTrackedSubjectValidity::NotEvaluated { - (0, 0, 1, 0) - } else if tc.valid == OrmTrackedSubjectValidity::Untracked { - (0, 0, 0, 1) - } else { - (0, 0, 0, 0) - } - }) - }) - .flatten() - .fold((0, 0, 0, 0), |(v1, i1, u1, ut1), o| { - (v1 + o.0, i1 + o.1, u1 + o.2, ut1 + o.3) - }); - - if counts.1 > 0 && p_schema.extra != Some(true) { - // If we have at least one invalid nested object and no extra allowed, invalid. - set_validity(OrmTrackedSubjectValidity::Invalid); - break; - } else if counts.0 < p_schema.minCardinality { - // If we have not enough valid nested objects, invalid. - set_validity(OrmTrackedSubjectValidity::Invalid); - break; - } else if counts.3 > 0 { - // If we have untracked nested objects, we need to fetch them and validate. - set_validity(OrmTrackedSubjectValidity::NotEvaluated); - // Add them to the list of unknowns to fetch and validate. - for o in tracked_pred - .iter() - .flat_map(|tp| tp.tracked_children.iter()) - { - if let Some(tc) = o.upgrade() { - if tc.valid == OrmTrackedSubjectValidity::Untracked { - need_evaluation.push((&tc.subject_iri, tc.shape, true)); - } - } - } - } else if counts.2 > 0 { - // If we have unknown nested objects, we need to wait for their evaluation. - set_validity(OrmTrackedSubjectValidity::NotEvaluated); - } else { - // All nested objects are valid and cardinality is correct. - // We are valid with this predicate. - } - // Check 4.5) Data types correct. - } else { - // Check if the data type is correct. - let allowed_types: Vec = - p_schema.dataTypes.iter().map(|dt| dt.valType).collect(); - // For each new value, check that data type is in allowed_types. - for val_added in p_change.iter().map(|pc| pc.values_added).flatten() { - let matches = match val_added { - BasicType::Bool(_) => allowed_types - .iter() - .any(|t| *t == OrmSchemaLiteralType::boolean), - BasicType::Num(_) => allowed_types - .iter() - .any(|t| *t == OrmSchemaLiteralType::number), - BasicType::Str(_) => allowed_types.iter().any(|t| { - *t == OrmSchemaLiteralType::string - || *t == OrmSchemaLiteralType::iri - }), - }; - if !matches { - set_validity(OrmTrackedSubjectValidity::Invalid); - break; - } - } - // Break if validity has become invalid. - if new_validity == OrmTrackedSubjectValidity::Invalid { - break; - } - }; - } - - if new_validity == OrmTrackedSubjectValidity::Invalid { - // If we are invalid, we can discard new unknowns again - they won't be kept in memory. - // We need to remove ourself from child objects parents field and - // remove them if no other is tracking. - for child_subject_weak in tracked_subject - .tracked_predicates - .iter() - .flat_map(|(tp_iri, tp)| tp.tracked_children) - { - if let Some(child_subject) = child_subject_weak.upgrade() { - child_subject.parents.remove(&s_change.subject_iri); - }; - } - - // Remove tracked predicates and set untracked. - tracked_subject.tracked_predicates = HashMap::new(); - tracked_subject.valid = OrmTrackedSubjectValidity::Invalid; - s_change.valid = OrmTrackedSubjectValidity::Invalid; - - // Empty list of children that need evaluation. - need_evaluation.retain(|_| false); - } else if new_validity == OrmTrackedSubjectValidity::Valid - && previous_validity != OrmTrackedSubjectValidity::Valid - { - // If this subject became valid, we need to refetch this subject; - // For that we prepend self to needs_fetch. - s_change.valid = OrmTrackedSubjectValidity::Valid; - tracked_subject.valid = OrmTrackedSubjectValidity::Valid; - need_evaluation.insert(0, (&s_change.subject_iri, shape, true)); - } - - // If validity changed, parents need to be re-evaluated. - if new_validity != previous_validity { - // We return the tracking parents which need re-evaluation. - // Remember that the last elements (i.e. children or needs_fetch) are evaluated first. - return ( - new_validity, - // Add parents and objects in `need_evaluation`. - tracked_subject - .parents - .values() - // Inform tracking parents only. - .filter(|(parent, is_tracked)| *is_tracked) - .map(|(parent, is_tracked)| (&parent.subject_iri, parent.shape, false)) - // Add `need_evaluation`. - .chain(need_evaluation) - .collect(), - ); - } - - return (new_validity, need_evaluation); - } + fn build_changes_for_subscription<'a>( + orm_subscription: &'a mut OrmSubscription, + triples_added: &'a [Triple], + triples_removed: &'a [Triple], + ) -> Result, NgError> { + let tracked_subjects = &mut orm_subscription.tracked_subjects; + let schema = &orm_subscription.shape_type.schema; + let root_shape = schema + .get(&orm_subscription.shape_type.shape) + .ok_or(VerifierError::InvalidOrmSchema)?; - // === Validation === + let mut orm_changes: OrmChanges = HashMap::new(); - // FILO queue: To validate object changes (nested objects first). Strings are object IRIs. + // First in, last out stack to keep track of objects to validate (nested objects first). Strings are object IRIs. let mut shape_validation_queue: Vec<(&OrmSchemaShape, Vec)> = vec![]; // Add root shape for first validation run. - shape_validation_queue.push((&root_shape, vec![])); - - // Structure to store changes in. By shape iri > subject iri > OrmTrackedSubjectChange - let mut shape_and_subject_changes: HashMap< - String, - HashMap, - > = HashMap::new(); + shape_validation_queue.push((root_shape, vec![])); // Process queue of shapes and subjects to validate. + // For a given shape, we evaluate every subject against that shape. while let Some((shape, objects_to_validate)) = shape_validation_queue.pop() { - // For a given shape, we evaluate every subject against that shape. - // Collect triples relevant for validation. let added_triples_by_subject = group_by_subject_for_shape(shape, triples_added, &objects_to_validate); let removed_triples_by_subject = group_by_subject_for_shape(shape, triples_removed, &objects_to_validate); - let all_modified_subjects: HashSet<&String> = added_triples_by_subject + let all_modified_subjects: HashSet<&SubjectIri> = added_triples_by_subject .keys() .chain(removed_triples_by_subject.keys()) .collect(); - // Use to collect nested objects that need validation. + // Variable to collect nested objects that need validation. // First string is shape IRI, second are object IRIs. - let nested_objects_to_validate: HashMap> = HashMap::new(); + let mut nested_objects_to_validate: HashMap> = HashMap::new(); // For each subject, add/remove triples and validate. for subject_iri in all_modified_subjects { let triples_added_for_subj = added_triples_by_subject .get(subject_iri) - .unwrap_or(&vec![]) - .to_vec(); + .map(|v| v.as_slice()) + .unwrap_or(&[]); let triples_removed_for_subj = removed_triples_by_subject .get(subject_iri) - .unwrap_or(&vec![]) - .to_vec(); + .map(|v| v.as_slice()) + .unwrap_or(&[]); // Get or create change object for (shape, subject) pair. - let change = shape_and_subject_changes + let change = orm_changes .entry(shape.iri.clone()) - .or_insert_with(|| HashMap::new()) + .or_insert_with(HashMap::new) .entry(subject_iri.clone()) .or_insert_with(|| OrmTrackedSubjectChange { subject_iri: subject_iri.clone(), predicates: HashMap::new(), - valid: OrmTrackedSubjectValidity::NeedsFetch, }); // Apply all triples for that subject to the tracked (shape, subject) pair. // Record the changes. - add_remove_triples( + if let Err(e) = add_remove_triples( shape, - &subject_iri, - &triples_added_for_subj, - &triples_removed_for_subj, - &tracked_subjects, - &change, - ); + subject_iri, + triples_added_for_subj, + triples_removed_for_subj, + tracked_subjects, + change, + ) { + log_err!("apply_changes_from_triples add/remove error: {:?}", e); + } + + let tracked_subject_opt = tracked_subjects + .get_mut(subject_iri) + .and_then(|m| m.get_mut(&shape.iri)); + let Some(tracked_subject) = tracked_subject_opt else { + continue; + }; // skip if missing - let tracked_subject = tracked_subjects.get(subject_iri).unwrap(); // Validate the subject. - let (new_validity, new_unknowns) = check_subject_validity( - &change, - shape, - schema, - tracked_subjects, - tracked_subject, - ); + let (_new_validity, new_unknowns) = + update_subject_validity(change, shape, tracked_subjects, tracked_subject.valid); // TODO: Add logic to fetch un-fetched objects after validation. - // and return logic to add unprocessed nested objects after validation. // We add the new_unknowns to be processed next - for (iri, schema, needs_fetch) in new_unknowns { + for (iri, schema_shape) in new_unknowns { // Add to nested_objects_to_validate. nested_objects_to_validate - .entry(schema.iri.clone()) - .or_insert_with(|| vec![]) + .entry(schema_shape) + .or_insert_with(Vec::new) .push(iri.clone()); } } // Now, we add all objects that need re-evaluation to the queue. for (shape_iri, objects) in nested_objects_to_validate { - shape_validation_queue.push((schema.get(&shape_iri).unwrap(), objects)); + if let Some(next_shape) = schema.get(&shape_iri) { + shape_validation_queue.push((next_shape, objects)); + } else { + log_err!("Schema missing for nested shape {}", shape_iri); + } } } + Ok(orm_changes) + } - return shape_and_subject_changes; + fn apply_triple_changes<'a>( + &'a mut self, + triples_added: &'a [Triple], + triples_removed: &'a [Triple], + only_for_nuri: Option<&NuriV0>, + only_for_session_id: Option, + ) -> Result, NgError> { + // If we have a specific session, handle only that subscription. + if let Some(session_id) = only_for_session_id { + if let Some((_, sub)) = self + .orm_subscriptions + .iter_mut() + .find(|(_, s)| s.session_id == session_id) + { + return Self::build_changes_for_subscription(sub, triples_added, triples_removed); + } else { + return Ok(HashMap::new()); + } + } + + // Otherwise, iterate all (optionally filter by nuri) and merge. + let mut merged: OrmChanges = HashMap::new(); + for (nuri, sub) in self.orm_subscriptions.iter_mut() { + if let Some(filter_nuri) = only_for_nuri { + if nuri != filter_nuri { + continue; + } + } + let changes = + Self::build_changes_for_subscription(sub, triples_added, triples_removed)?; + for (shape_iri, subj_map) in changes { + merged + .entry(shape_iri) + .or_insert_with(HashMap::new) + .extend(subj_map); + } + } + Ok(merged) } - fn create_orm_from_triples( - &mut self, - scope: &NuriV0, - schema: &OrmSchema, + fn materialize_orm_object( + change: &OrmTrackedSubjectChange, + changes: &OrmChanges, shape: &OrmSchemaShape, - triples: &Vec, - ) -> Result { - let changes = self.apply_changes_from_triples(scope, schema, shape, triples, &vec![]); - - let Some(root_changes) = changes.get(&shape.iri).map(|s| s.values()) else { - return Ok(Value::Array(vec![])); - }; + tracked_subjects: &HashMap>, + ) -> Value { + let mut orm_obj = json!({"id": change.subject_iri}); + let orm_obj_map = orm_obj.as_object_mut().unwrap(); + for pred_schema in &shape.predicates { + let Some(pred_change) = change.predicates.get(&pred_schema.iri) else { + continue; + }; + let property_name = &pred_schema.readablePredicate; + let is_multi = pred_schema.maxCardinality > 1 || pred_schema.maxCardinality == -1; - let valid_roots = root_changes.filter(|v| v.valid == OrmTrackedSubjectValidity::Valid); + if pred_schema + .dataTypes + .iter() + .any(|dt| dt.valType == OrmSchemaLiteralType::shape) + { + // We have a nested type. - let mut return_vals: Value = Value::Array(vec![]); - let return_val_vec = return_vals.as_array_mut().unwrap(); + // Helper closure to create Value structs from a nested object_iri. + let get_nested_orm_obj = |object_iri: &SubjectIri| { + // Find allowed schemas for the predicate's datatype. + let shape_iris: Vec = pred_schema + .dataTypes + .iter() + .flat_map(|dt| dt.shape.clone()) + .collect(); - fn create_value_from_change( - change: &OrmTrackedSubjectChange, - changes: &HashMap>>, - shape: &OrmSchemaShape, - schema: &OrmSchema, - tracked_subjects: &HashMap>, - ) -> Value { - let mut new_val = json!({"id": change.subject_iri}); - let new_val_map = new_val.as_object_mut().unwrap(); - for pred_schema in &shape.predicates { - let Some(pred_change) = change.predicates.get(&pred_schema.iri) else { - continue; - }; - let property_name = pred_schema.readablePredicate.clone(); - let is_multi = pred_schema.maxCardinality > 1 || pred_schema.maxCardinality == -1; - - if pred_schema - .dataTypes - .iter() - .any(|dt| dt.valType == OrmSchemaLiteralType::shape) - { - // We have a nested type. - - // Helper to create Value structs from a nested object_iri. - let get_nested_value = |object_iri: &String| { - // Find allowed schemas for the predicate's datatype. - let shape_iris: Vec = pred_schema - .dataTypes - .iter() - .flat_map(|dt| dt.shape.clone()) - .collect(); - - // Find subject_change for this subject. There exists at least one (shape, subject) pair. - // If multiple exist, we take the first one. _Which one is chosen is undefined behavior_ - let nested_subject_change = shape_iris - .iter() - .find_map(|shape_iri| { - changes - .get(shape_iri) - .and_then(|subject_changes| subject_changes.get(object_iri)) - }) - .unwrap(); + // Find subject_change for this subject. There exists at least one (shape, subject) pair. + // If multiple allowed shapes exist, the first one is chosen. + let nested = shape_iris.iter().find_map(|shape_iri| { + changes + .get(shape_iri) + .and_then(|subject_changes| subject_changes.get(object_iri)) + .map(|ch| (shape_iri, ch)) + }); - if let Some(nested_schema) = tracked_subjects + if let Some((matched_shape_iri, nested_subject_change)) = nested { + if let Some(nested_tracked_subject) = tracked_subjects .get(&nested_subject_change.subject_iri) - .and_then(|tracked_subjects| tracked_subjects.get(&shape.iri)) + .and_then(|shape_to_tracked_orm| { + shape_to_tracked_orm.get(matched_shape_iri) + }) { - // Recurse - return create_value_from_change( - nested_subject_change, - changes, - nested_schema.shape, - schema, - tracked_subjects, - ); + if nested_tracked_subject.valid == OrmTrackedSubjectValidity::Valid { + // Recurse + return Some(Self::materialize_orm_object( + nested_subject_change, + changes, + &nested_tracked_subject.shape, + tracked_subjects, + )); + } } - return json!({}); - }; - - if is_multi { - // Add each value to a new object (predicates being object IRIs) like {iri: }. - let mut nested_objects = json!({"id": change.subject_iri}); - let nested_objects_map = nested_objects.as_object_mut().unwrap(); - - // Add (object iri, nested object) pairs to object. - for new_val in &pred_change.values_added { - if let BasicType::Str(object_iri) = new_val { - new_val_map - .insert(object_iri.clone(), get_nested_value(&object_iri)); + } + None + }; + + if is_multi { + // Represent nested objects with more than one child + // as a map/object of -> nested object, + // since there is no conceptual ordering of the children. + let mut nested_objects_map = serde_json::Map::new(); + + // Add each nested objects. + for new_val in &pred_change.values_added { + if let BasicType::Str(object_iri) = new_val { + if let Some(nested_orm_obj) = get_nested_orm_obj(object_iri) { + nested_objects_map.insert(object_iri.clone(), nested_orm_obj); } } - new_val_map.insert(property_name.clone(), nested_objects); - } else { - if let Some(BasicType::Str(object_iri)) = pred_change.values_added.get(0) { - new_val_map.insert(property_name.clone(), get_nested_value(object_iri)); + } + orm_obj_map.insert(property_name.clone(), Value::Object(nested_objects_map)); + } else { + if let Some(BasicType::Str(object_iri)) = pred_change.values_added.get(0) { + if let Some(nested_orm_obj) = get_nested_orm_obj(object_iri) { + orm_obj_map.insert(property_name.clone(), nested_orm_obj); } } + } + } else { + // We have a basic type (string, number, bool, literal). + + if is_multi { + // Add values as array. + orm_obj_map.insert( + property_name.clone(), + Value::Array( + pred_change + .values_added + .iter() + .map(|v| match v { + BasicType::Bool(b) => json!(*b), + BasicType::Num(n) => json!(*n), + BasicType::Str(s) => json!(s), + }) + .collect(), + ), + ); } else { - // We have a basic type (string, number, bool, literal). - - if is_multi { - // Add values as array. - new_val_map.insert( - property_name, - Value::Array( - pred_change.values_added.iter().map(|v| json!(v)).collect(), - ), + // Add value as primitive, if present. + if let Some(val) = pred_change.values_added.get(0) { + orm_obj_map.insert( + property_name.clone(), + match val { + BasicType::Bool(b) => json!(*b), + BasicType::Num(n) => json!(*n), + BasicType::Str(s) => json!(s), + }, ); - } else { - // Add value as primitive, if present. - if let Some(val) = pred_change.values_added.get(0) { - new_val_map.insert( - property_name, - match val { - BasicType::Bool(b) => json!(b), - BasicType::Num(n) => json!(n), - BasicType::Str(s) => json!(s), - }, - ); - } } } } - - return new_val; } + return orm_obj; + } + + fn create_orm_from_triples( + &mut self, + orm_subscription: &OrmSubscription, + triples: &[Triple], + ) -> Result { + let root_shape_iri = &orm_subscription.shape_type.shape; + let schema = &orm_subscription.shape_type.schema; + let root_shape = schema + .get(root_shape_iri) + .ok_or(VerifierError::InvalidOrmSchema)?; + let changes: OrmChanges = + self.apply_triple_changes(triples, &[], None, Some(orm_subscription.session_id))?; + + let Some(_root_changes) = changes.get(&root_shape.iri).map(|s| s.values()) else { + return Ok(Value::Array(vec![])); + }; + + let mut return_vals: Value = Value::Array(vec![]); + let return_val_vec = return_vals.as_array_mut().unwrap(); + // For each valid change struct, we build an orm object. - for root_change in valid_roots { - let new_val = create_value_from_change( - root_change, - &changes, - shape, - schema, - self.orm_tracked_subjects, - ); - return_val_vec.push(new_val); + // The way we get the changes from the tracked subjects is a bit hacky, sorry. + for (subject_iri, tracked_subjects_by_shape) in &orm_subscription.tracked_subjects { + if let Some(tracked_subject) = tracked_subjects_by_shape.get(root_shape_iri) { + if tracked_subject.valid == OrmTrackedSubjectValidity::Valid { + if let Some(change) = changes + .get(root_shape_iri) + .and_then(|subject_iri_to_ts| subject_iri_to_ts.get(subject_iri)) + { + let new_val = Self::materialize_orm_object( + change, + &changes, + root_shape, + &orm_subscription.tracked_subjects, + ); + return_val_vec.push(new_val); + } + } + } } return Ok(return_vals); } - // Collect result - // For all valid tracked_subjects, build an object from the tracked_subject_changes. - pub(crate) async fn orm_update(&mut self, scope: &NuriV0, patch: GraphTransaction) {} pub(crate) async fn orm_frontend_update( &mut self, scope: &NuriV0, - shape_id: String, + shape_iri: ShapeIri, diff: OrmDiff, ) { - log_info!("frontend_update_orm {:?} {} {:?}", scope, shape_id, diff); + log_info!("frontend_update_orm {:?} {} {:?}", scope, shape_iri, diff); } pub(crate) async fn push_orm_response( @@ -840,9 +413,9 @@ impl Verifier { "push_orm_response {:?} {} {:?}", scope, schema_iri, - self.orm_tracked_subjects + self.orm_subscriptions ); - if let Some(shapes) = self.orm_tracked_subjects.get_mut(scope) { + if let Some(shapes) = self.orm_subscriptions.get_mut(scope) { if let Some(sessions) = shapes.get_mut(schema_iri) { let mut sessions_to_close: Vec = vec![]; for (session_id, subscription) in sessions.iter_mut() { @@ -863,7 +436,7 @@ impl Verifier { pub(crate) async fn start_orm( &mut self, nuri: &NuriV0, - shape_type: OrmShapeType, + shape_type: &OrmShapeType, session_id: u64, ) -> Result<(Receiver, CancelFn), NgError> { let (tx, rx) = mpsc::unbounded::(); @@ -872,43 +445,26 @@ impl Verifier { // If multiple data types are present for the same predicate, they must be of of the same type. // All referenced shapes must be available. - // Keep track of connections here. - self.orm_tracked_subjects.insert( - nuri.clone(), - HashMap::from([( - shape_type.shape.clone(), - HashMap::from([( - session_id, - OrmSubscription { - sender: tx.clone(), - tracked_objects: HashMap::new(), - }, - )]), - )]), - ); - - // Add shape to registry or increase ref count. - if let Some(shape_ref) = self.orm_shape_types.get_mut(&shape_type.shape) { - shape_ref.ref_count += 1; - } else { - self.orm_shape_types.insert( - shape_type.shape.clone(), - OrmShapeTypeRef { - ref_count: 1, - shape_type, - }, - ); - } + // Create new subscription and add to self.orm_subscriptions + let orm_subscription = OrmSubscription { + shape_type: shape_type.clone(), + session_id: session_id, + sender: tx.clone(), + tracked_subjects: HashMap::new(), + nuri: nuri.clone(), + }; + self.orm_subscriptions + .insert(nuri.clone(), orm_subscription); + // Query triples for this shape let shape_query = - sparql_construct_from_orm_shape_type(&shape_type.schema, &shape_type.shape, None)?; - let shape_triples = self.sparql_construct(shape_query, Some(nuri))?; - let orm_object = self.create_orm_from_triples( - nuri, - &shape_type.schema, - &shape_type.shape, - &shape_triples, - ); + sparql_construct_from_orm_shape_type(&shape_type.schema, &shape_type.shape)?; + // TODO: How to stringify nuri correctly? + let shape_triples = self.sparql_construct(shape_query, Some(nuri.repo()))?; + + // Create objects from queried triples. + let subscription_ref = self.orm_subscriptions.get(nuri).unwrap(); + let _orm_objects = self.create_orm_from_triples(subscription_ref, &shape_triples)?; // placeholder call; TODO integrate response //self.push_orm_response().await; (only for requester, not all sessions) @@ -947,7 +503,7 @@ fn literal_to_sparql_str(var: OrmSchemaDataType) -> Vec { BasicType::Num(number) => number.to_string(), BasicType::Str(sting) => { if is_iri(sting) { - format!("<{}>", escape_iri(sting)) + format!("<{}>", sting) } else { format!("\"{}\"", escape_literal(sting)) } @@ -959,9 +515,7 @@ fn literal_to_sparql_str(var: OrmSchemaDataType) -> Vec { pub fn sparql_construct_from_orm_shape_type( schema: &OrmSchema, - shape: &String, - // TODO: Remove max_recursion - max_recursion: Option, + shape: &ShapeIri, ) -> Result { // Use a counter to generate unique variable names. let mut var_counter = 0; @@ -977,7 +531,7 @@ pub fn sparql_construct_from_orm_shape_type( // Keep track of visited shapes while recursing to prevent infinite loops. // TODO: Update type - let mut visited_shapes: HashMap = HashMap::new(); + let mut visited_shapes: HashSet = HashSet::new(); // Recursive function to call for (nested) shapes. fn process_shape( @@ -987,18 +541,14 @@ pub fn sparql_construct_from_orm_shape_type( construct_statements: &mut Vec, where_statements: &mut Vec, var_counter: &mut i32, - visited_shapes: &mut HashMap, - max_recursion: u8, + visited_shapes: &mut HashSet, ) { // Prevent infinite recursion on cyclic schemas. - // Keep track of the number of shape occurrences and return if it's larger than max_recursion. - // For the last recursion, we could use by-reference queries but that could be for the future. - let current_self_recursion_depth = visited_shapes.get(&shape.iri).unwrap_or(&0); - if *current_self_recursion_depth > max_recursion { + // TODO: We could handle this as IRI string reference. + if visited_shapes.contains(&shape.iri) { return; - } else { - visited_shapes.insert(shape.iri.clone(), current_self_recursion_depth + 1); } + visited_shapes.insert(shape.iri.clone()); // Add statements for each predicate. for predicate in &shape.predicates { @@ -1039,7 +589,6 @@ pub fn sparql_construct_from_orm_shape_type( where_statements, var_counter, visited_shapes, - max_recursion, ); } } @@ -1115,7 +664,6 @@ pub fn sparql_construct_from_orm_shape_type( &mut where_statements, &mut var_counter, &mut visited_shapes, - max_recursion.unwrap_or(1), ); // Create query from statements. @@ -1127,11 +675,6 @@ pub fn sparql_construct_from_orm_shape_type( )) } -/// Escape an IRI fragment if needed (very conservative, only wrap with <...>). Assumes input already a full IRI. -fn escape_iri(iri: &str) -> String { - format!("<{}>", iri) -} - /// SPARQL literal escape: backslash, quotes, newlines, tabs. fn escape_literal(lit: &str) -> String { let mut out = String::with_capacity(lit.len() + 4); @@ -1147,75 +690,3 @@ fn escape_literal(lit: &str) -> String { } return out; } - -/// Converts an oxrdf::Term to an orm::Term -fn oxrdf_term_to_orm_term(term: &ng_oxigraph::oxrdf::Term) -> ng_net::orm::Term { - match term { - ng_oxigraph::oxrdf::Term::NamedNode(node) => { - ng_net::orm::Term::Ref(node.as_str().to_string()) - } - ng_oxigraph::oxrdf::Term::BlankNode(node) => { - ng_net::orm::Term::Ref(node.as_str().to_string()) - } - ng_oxigraph::oxrdf::Term::Literal(literal) => { - // Check the datatype to determine how to convert - match literal.datatype().as_str() { - // Check for string first, this is the most common. - "http://www.w3.org/2001/XMLSchema#string" => { - ng_net::orm::Term::Str(literal.value().to_string()) - } - "http://www.w3.org/2001/XMLSchema#boolean" => { - match literal.value().parse::() { - Ok(b) => ng_net::orm::Term::Bool(b), - Err(_) => ng_net::orm::Term::Str(literal.value().to_string()), - } - } - "http://www.w3.org/2001/XMLSchema#integer" - | "http://www.w3.org/2001/XMLSchema#decimal" - | "http://www.w3.org/2001/XMLSchema#double" - | "http://www.w3.org/2001/XMLSchema#float" - | "http://www.w3.org/2001/XMLSchema#int" - | "http://www.w3.org/2001/XMLSchema#long" - | "http://www.w3.org/2001/XMLSchema#short" - | "http://www.w3.org/2001/XMLSchema#byte" - | "http://www.w3.org/2001/XMLSchema#unsignedInt" - | "http://www.w3.org/2001/XMLSchema#unsignedLong" - | "http://www.w3.org/2001/XMLSchema#unsignedShort" - | "http://www.w3.org/2001/XMLSchema#unsignedByte" => { - match literal.value().parse::() { - Ok(n) => ng_net::orm::Term::Num(n), - Err(_) => ng_net::orm::Term::Str(literal.value().to_string()), - } - } - _ => ng_net::orm::Term::Str(literal.value().to_string()), - } - } - ng_oxigraph::oxrdf::Term::Triple(triple) => { - // For RDF-star triples, convert to string representation - ng_net::orm::Term::Str(triple.to_string()) - } - } -} - -fn oxrdf_term_to_orm_basic_type(term: &ng_oxigraph::oxrdf::Term) -> BasicType { - match oxrdf_term_to_orm_term(term) { - ng_net::orm::Term::Str(s) => BasicType::Str(s), - ng_net::orm::Term::Num(n) => BasicType::Num(n), - ng_net::orm::Term::Bool(b) => BasicType::Bool(b), - ng_net::orm::Term::Ref(b) => BasicType::Str(b), // Treat IRIs as strings - } -} - -fn has_cycle(subject: &OrmTrackedSubjectAndShape, visited: &mut HashSet) -> bool { - if visited.contains(subject.subject_iri) { - return true; - } - visited.insert(subject.subject_iri.clone()); - for (_parent_iri, (parent_subject, _)) in &subject.parents { - if has_cycle(parent_subject, visited) { - return true; - } - } - visited.remove(subject.subject_iri); - false -} diff --git a/ng-verifier/src/request_processor.rs b/ng-verifier/src/request_processor.rs index 0438567..e76f959 100644 --- a/ng-verifier/src/request_processor.rs +++ b/ng-verifier/src/request_processor.rs @@ -54,7 +54,7 @@ impl Verifier { match command { AppRequestCommandV0::OrmStart => match _payload { Some(AppRequestPayload::V0(AppRequestPayloadV0::OrmStart(shape_type))) => { - self.start_orm(nuri, shape_type, session_id).await + self.start_orm(nuri, &shape_type, session_id).await } _ => return Err(NgError::InvalidArgument), }, diff --git a/ng-verifier/src/utils/mod.rs b/ng-verifier/src/utils/mod.rs new file mode 100644 index 0000000..d2039c7 --- /dev/null +++ b/ng-verifier/src/utils/mod.rs @@ -0,0 +1,10 @@ +// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers +// All rights reserved. +// Licensed under the Apache License, Version 2.0 +// +// or the MIT license , +// at your option. All files in the project carrying such +// notice may not be copied, modified, or distributed except +// according to those terms. + +pub mod orm_validation; diff --git a/ng-verifier/src/utils/orm_validation.rs b/ng-verifier/src/utils/orm_validation.rs new file mode 100644 index 0000000..efaa327 --- /dev/null +++ b/ng-verifier/src/utils/orm_validation.rs @@ -0,0 +1,585 @@ +// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers +// All rights reserved. +// Licensed under the Apache License, Version 2.0 +// +// or the MIT license , +// at your option. All files in the project carrying such +// notice may not be copied, modified, or distributed except +// according to those terms. + +use std::collections::HashMap; +use std::collections::HashSet; + +use ng_net::orm::BasicType; +use ng_net::orm::OrmSchemaLiteralType; +use ng_net::orm::OrmSchemaShape; +use ng_net::orm::OrmTrackedPredicate; +use ng_net::orm::OrmTrackedSubject; +use ng_net::orm::OrmTrackedSubjectChange; +use ng_net::orm::OrmTrackedSubjectValidity; +use ng_oxigraph::oxrdf::Subject; +use ng_oxigraph::oxrdf::Triple; +use ng_repo::errors::NgError; + +pub fn group_by_subject_for_shape<'a>( + shape: &'a OrmSchemaShape, + triples: &'a [Triple], + allowed_subjects: &[String], +) -> HashMap> { + let mut triples_by_subject: HashMap> = HashMap::new(); + let allowed_preds_set: HashSet<&str> = shape + .predicates + .iter() + .map(|OrmSchemaDataType, OrmSchemaPredicatep| p.iri.as_str()) + .collect(); + let allowed_subject_set: HashSet<&str> = allowed_subjects.iter().map(|s| s.as_str()).collect(); + for triple in triples { + // triple.subject must be in allowed_subjects (or allowed_subjects empty) + // and triple.predicate must be in allowed_preds. + if allowed_preds_set.contains(triple.predicate.as_str()) { + // filter subjects if list provided + let subj = match &triple.subject { + Subject::NamedNode(n) => n.as_ref(), + _ => continue, + }; + // Subject must be in allowed subjects (or allowed_subjects is empty). + if allowed_subject_set.is_empty() || allowed_subject_set.contains(subj.as_str()) { + triples_by_subject + .entry(subj.to_string()) + .or_insert_with(Vec::new) + .push(triple); + } + } + } + + return triples_by_subject; +} + +/// Add all triples to `subject_changes` +/// Returns predicates to nested objects that were touched and need processing. +/// Assumes all triples have same subject. +use std::sync::Arc; + +pub fn add_remove_triples_mut( + shape: &Arc, + subject_iri: &str, + triples_added: &[&Triple], + triples_removed: &[&Triple], + tracked_subjects: &mut HashMap>, + subject_changes: &mut OrmTrackedSubjectChange, +) -> Result<(), NgError> { + let get_or_create_tracked_subject = + |subject_iri: &str, + shape_iri: &str, + tracked_subjects: &mut HashMap>| { + let tracked_shapes_for_subject = tracked_subjects + .entry(subject_iri.to_string()) + .or_insert_with(HashMap::new); + + tracked_shapes_for_subject + .entry(shape_iri.to_string()) + .or_insert_with(|| OrmTrackedSubject { + tracked_predicates: HashMap::new(), + parents: HashMap::new(), + valid: ng_net::orm::OrmTrackedSubjectValidity::Pending, + subject_iri: subject_iri.to_string(), + shape, + }) + }; + + let tracked_subject = get_or_create_tracked_subject(subject_iri, &shape.iri, tracked_subjects); + + // Process added triples. + // For each triple, check matching predicates in shape. + // keeping track of value count (for later validations). + // In parallel, we keep track of the values added (tracked_changes) + for triple in triples_added { + for predicate_schema in &shape.predicates { + if predicate_schema.iri != triple.predicate.as_str() { + // Triple does not match predicate. + continue; + } + // Predicate schema constraint matches this triple. + + // Add tracked predicate or increase cardinality + let tracked_predicate = tracked_subject + .tracked_predicates + .entry(predicate_schema.iri.to_string()) + .or_insert_with(|| OrmTrackedPredicate { + current_cardinality: 0, + schema: predicate_schema, + tracked_children: Vec::new(), + current_literals: None, + }); + tracked_predicate.current_cardinality += 1; + + let obj_term = oxrdf_term_to_orm_basic_type(&triple.object); + + // Keep track of the changed values too. + let pred_changes = subject_changes + .predicates + .entry(predicate_schema.iri.clone()) + .or_insert_with(|| OrmTrackedPredicateChanges { + tracked_predicate: tracked_predicate, // reference remains inside lifetime of this call + values_added: Vec::new(), + values_removed: Vec::new(), + }); + + pred_changes.values_added.push(obj_term.clone()); + + // If value type is literal, we need to add the current value to the tracked predicate. + if tracked_predicate + .schema + .dataTypes + .iter() + .any(|dt| dt.valType == OrmSchemaLiteralType::literal) + { + match &mut tracked_predicate.current_literals { + Some(lits) => lits.push(obj_term), + None => { + tracked_predicate.current_literals = Some(vec![obj_term]); + } + } + } + + // If predicate is of type shape, register (parent -> child) links so that + // nested subjects can later be (lazily) fetched / validated. + for shape_iri in predicate_schema + .dataTypes + .iter() + .filter(|dt| dt.valType == OrmSchemaLiteralType::shape) + .flat_map(|dt| dt.shape) + { + if let BasicType::Str(obj_iri) = obj_term { + // Get or create object's tracked subject struct. + let tracked_child = + get_or_create_tracked_subject(triple.predicate.as_string(), &shape_iri); + + // Add self to parent (set tracked to true, preliminary). + tracked_child.parents.insert(obj_iri, tracked_child); + + // Add link to children + tracked_predicate + .tracked_children + .push(unsafe { Weak::from_raw(tracked_child) }); + } + } + } + } + // Process removed triples. + for triple in triples_removed { + let pred_iri = triple.predicate.as_str(); + + // Only adjust if we had tracked state. + let tracked_predicate_opt = tracked_subjects + .get_mut(subject_iri) + .and_then(|tss| tss.get_mut(&shape.iri)) + .and_then(|ts| ts.tracked_predicates.get_mut(pred_iri)); + let Some(tracked_predicate) = tracked_predicate_opt else { + continue; + }; + + // The cardinality might become -1 or 0. We will remove them from the tracked predicates during validation. + tracked_predicate.current_cardinality = + tracked_predicate.current_cardinality.saturating_sub(1); + + let Some(pred_changes) = subject_changes.predicates.get_mut(pred_iri) else { + continue; + }; + + let val_removed = oxrdf_term_to_orm_basic_type(&triple.object); + pred_changes.values_removed.push(val_removed.clone()); + + // If value type is literal, we need to remove the current value from the tracked predicate. + if tracked_predicate + .schema + .dataTypes + .iter() + .any(|dt| dt.valType == OrmSchemaLiteralType::literal) + { + if let Some(current_literals) = &mut tracked_predicate.current_literals { + // Remove obj_val from current_literals in-place + current_literals.retain(|val| *val != val_removed); + } else { + tracked_predicate.current_literals = Some(vec![val_removed]); + } + } else if tracked_predicate + .schema + .dataTypes + .iter() + .any(|dt| dt.valType == OrmSchemaLiteralType::shape) + { + // Remove parent from child and child from tracked children. + for shape_iri in tracked_predicate + .schema + .dataTypes + .iter() + .filter(|dt| dt.valType == OrmSchemaLiteralType::shape) + .flat_map(|dt| dt.shape) + { + // Nested shape removal logic disabled (see note above). + } + } + } + Ok(()) +} + +/// Check the validity of a subject and update affecting tracked subjects' validity. +/// Might return nested objects that need to be validated. +/// Assumes all triples to be of same subject. +pub fn update_subject_validity<'a>( + s_change: &'a OrmTrackedSubjectChange<'a>, + shape: &OrmSchemaShape, + tracked_subjects: &HashMap>>, + previous_validity: OrmTrackedSubjectValidity, +) -> ( + OrmTrackedSubjectValidity, + // Vec + Vec<(String, String)>, +) { + let Some(tracked_shapes) = tracked_subjects.get_mut(subject_iri) else { + return (previous_validity, vec![]); + }; + let Some(tracked_subject) = tracked_shapes.get_mut(&shape.iri) else { + return (previous_validity, vec![]); + }; + + // Check 1) Check if we need to fetch this object. + // If this subject has not been monitored but parents are now valid or need evaluation, we fetch. + + // Check 2) If all parents are untracked, return untracked. + if tracked_subject.parents.len() != 0 { + let no_parents_tracking = tracked_subject.parents.values().all(|parent| { + parent.valid == OrmTrackedSubjectValidity::Untracked + || parent.valid == OrmTrackedSubjectValidity::Invalid + }); + + if no_parents_tracking { + // Remove tracked predicates and set untracked. + tracked_subject.tracked_predicates = HashMap::new(); + tracked_subject.valid = OrmTrackedSubjectValidity::Untracked; + return (OrmTrackedSubjectValidity::Untracked, vec![]); + } else if !no_parents_tracking && previous_validity == OrmTrackedSubjectValidity::Untracked + { + // We need to fetch the subject's current state: + // We have new parents but were previously not recording changes. + + // TODO + } + } + + // Check 2) If there are no changes, there is nothing to do. + if s_change.predicates.is_empty() { + return (previous_validity, vec![]); + } + + let mut new_validity = OrmTrackedSubjectValidity::Valid; + fn set_validity(current: &mut OrmTrackedSubjectValidity, new_val: OrmTrackedSubjectValidity) { + if new_val == OrmTrackedSubjectValidity::Invalid { + *current = OrmTrackedSubjectValidity::Invalid; + } else { + *current = new_val; + } + } + + // Check 3) If there is an infinite loop of parents pointing back to us, return invalid. + // Create a set of visited parents to detect cycles. + if has_cycle(tracked_subject, &mut HashSet::new()) { + // Remove tracked predicates and set invalid. + tracked_subject.tracked_predicates = HashMap::new(); + tracked_subject.valid = OrmTrackedSubjectValidity::Invalid; + return (OrmTrackedSubjectValidity::Invalid, vec![]); + } + + // Keep track of objects that need to be validated against a shape to fetch and validate. + let mut need_evaluation: Vec<(String, String)> = vec![]; + + // Check 4) Validate subject against each predicate in shape. + for p_schema in shape.predicates.iter() { + let p_change = s_change.predicates.get(&p_schema.iri); + let tracked_pred = p_change.map(|pc| pc.tracked_predicate); + + let count = + tracked_pred.map_or_else(|| 0, |tp: &OrmTrackedPredicate| tp.current_cardinality); + + // Check 4.1) Cardinality + if count < p_schema.minCardinality { + set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); + if count <= 0 { + // If cardinality is 0, we can remove the tracked predicate. + tracked_subject.tracked_predicates.remove(&p_schema.iri); + } + break; + // Check 4.2) Cardinality too high and extra values not allowed. + } else if count > p_schema.maxCardinality + && p_schema.maxCardinality != -1 + && p_schema.extra != Some(true) + { + // If cardinality is too high and no extra allowed, invalid. + set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); + break; + // Check 4.3) Required literals present. + } else if p_schema + .dataTypes + .iter() + .any(|dt| dt.valType == OrmSchemaLiteralType::literal) + { + // If we have literals, check if all required literals are present. + // At least one datatype must match. + let some_valid = + p_schema + .dataTypes + .iter() + .flat_map(|dt| &dt.literals) + .any(|required_literals| { + // Early stop: If no extra values allowed but the sizes + // between required and given values mismatches. + if !p_schema.extra.unwrap_or(false) + && ((required_literals.len() as i32) + != tracked_pred.map_or(0, |p| p.current_cardinality)) + { + return false; + } + + // Check that each required literal is present. + for required_literal in required_literals { + // Is tracked predicate present? + if !tracked_pred + .iter() + .flat_map(|tp| &tp.current_literals) + .flatten() + .any(|literal| *literal == *required_literal) + { + return false; + } + } + // All required literals present. + return true; + }); + if !some_valid { + set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); + } + // Check 4.4) Nested shape correct. + } else if p_schema + .dataTypes + .iter() + .any(|dt| dt.valType == OrmSchemaLiteralType::shape) + { + // If we have a nested shape, we need to check if the nested objects are tracked and valid. + + // First, Count valid, invalid, unknowns, and untracked + let counts = tracked_pred + .iter() + .flat_map(|tp| tp.tracked_children) + .map(|tc| { + tc.upgrade().map(|tc| { + if tc.valid == OrmTrackedSubjectValidity::Valid { + (1, 0, 0, 0) + } else if tc.valid == OrmTrackedSubjectValidity::Invalid { + (0, 1, 0, 0) + } else if tc.valid == OrmTrackedSubjectValidity::Pending { + (0, 0, 1, 0) + } else if tc.valid == OrmTrackedSubjectValidity::Untracked { + (0, 0, 0, 1) + } else { + (0, 0, 0, 0) + } + }) + }) + .flatten() + .fold((0, 0, 0, 0), |(v1, i1, u1, ut1), o| { + (v1 + o.0, i1 + o.1, u1 + o.2, ut1 + o.3) + }); + + if counts.1 > 0 && p_schema.extra != Some(true) { + // If we have at least one invalid nested object and no extra allowed, invalid. + set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); + break; + } else if counts.0 < p_schema.minCardinality { + // If we have not enough valid nested objects, invalid. + set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); + break; + } else if counts.3 > 0 { + // If we have untracked nested objects, we need to fetch them and validate. + set_validity(&mut new_validity, OrmTrackedSubjectValidity::Pending); + // After that we need to reevaluate this (subject,shape) again. + need_evaluation.push((subject_iri.to_string(), shape.iri.clone(), false)); + // Also schedule untracked children for fetching and validation. + if let Some(tp) = tracked_pred { + for weak_child in &tp.tracked_children { + if let Some(child) = weak_child.upgrade() { + if child.valid == OrmTrackedSubjectValidity::Untracked { + need_evaluation.push(( + child.subject_iri.clone(), + child.shape.iri.clone(), + true, + )); + } + } + } + } + } else if counts.2 > 0 { + // If we have unknown nested objects, we need to wait for their evaluation. + set_validity(&mut new_validity, OrmTrackedSubjectValidity::Pending); + // Schedule unknown children (NotEvaluated) for re-evaluation without fetch. + if let Some(tp) = tracked_pred { + for weak_child in &tp.tracked_children { + if let Some(child) = weak_child.upgrade() { + if child.valid == OrmTrackedSubjectValidity::Pending { + need_evaluation.push(( + child.subject_iri.clone(), + child.shape.iri.clone(), + false, + )); + } + } + } + } + } else { + // All nested objects are valid and cardinality is correct. + // We are valid with this predicate. + } + // Check 4.5) Data types correct. + } else { + // Check if the data type is correct. + let allowed_types: Vec<&OrmSchemaLiteralType> = + p_schema.dataTypes.iter().map(|dt| &dt.valType).collect(); + // For each new value, check that data type is in allowed_types. + for val_added in p_change.iter().map(|pc| pc.values_added).flatten() { + let matches = match val_added { + BasicType::Bool(_) => allowed_types + .iter() + .any(|t| **t == OrmSchemaLiteralType::boolean), + BasicType::Num(_) => allowed_types + .iter() + .any(|t| **t == OrmSchemaLiteralType::number), + BasicType::Str(_) => allowed_types.iter().any(|t| { + **t == OrmSchemaLiteralType::string || **t == OrmSchemaLiteralType::iri + }), + }; + if !matches { + set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); + break; + } + } + // Break again if validity has become invalid. + if new_validity == OrmTrackedSubjectValidity::Invalid { + break; + } + }; + } + + if new_validity == OrmTrackedSubjectValidity::Invalid { + // If we are invalid, we can discard new unknowns again - they won't be kept in memory. + // We need to remove ourself from child objects parents field and + // remove them if no other is tracking. + // Child relationship cleanup disabled (nested tracking disabled in this refactor step) + + // Remove tracked predicates and set untracked. + tracked_subject.tracked_predicates = HashMap::new(); + + // Empty list of children that need evaluation. + need_evaluation.retain(|_| false); + } else if new_validity == OrmTrackedSubjectValidity::Valid + && previous_validity != OrmTrackedSubjectValidity::Valid + { + // If this subject became valid, we need to refetch this subject; + // For that we prepend self to needs_fetch. + need_evaluation.insert(0, (s_change.subject_iri, shape.iri.clone())); + } + + // If validity changed, parents need to be re-evaluated. + if new_validity != previous_validity { + // We return the tracking parents which need re-evaluation. + // Remember that the last elements (i.e. children or needs_fetch) are evaluated first. + return ( + new_validity, + // Add parents and objects in `need_evaluation`. + tracked_subject + .parents + .values() + // Inform tracking parents only. + .filter(|(parent, is_tracked)| *is_tracked) + .map(|(parent, is_tracked)| (&parent.subject_iri, parent.shape)) + // Add `need_evaluation`. + .chain(need_evaluation) + .collect(), + ); + } + + tracked_subject.valid = new_validity; + + return (new_validity, need_evaluation); +} + +fn oxrdf_term_to_orm_basic_type(term: &ng_oxigraph::oxrdf::Term) -> BasicType { + match oxrdf_term_to_orm_term(term) { + ng_net::orm::Term::Str(s) => BasicType::Str(s), + ng_net::orm::Term::Num(n) => BasicType::Num(n), + ng_net::orm::Term::Bool(b) => BasicType::Bool(b), + ng_net::orm::Term::Ref(b) => BasicType::Str(b), // Treat IRIs as strings + } +} + +fn has_cycle(subject: &OrmTrackedSubject, visited: &mut HashSet) -> bool { + if visited.contains(&subject.subject_iri) { + return true; + } + visited.insert(subject.subject_iri.clone()); + for (_parent_iri, (parent_subject, _)) in &subject.parents { + if has_cycle(parent_subject, visited) { + return true; + } + } + visited.remove(&subject.subject_iri); + false +} + +/// Converts an oxrdf::Term to an orm::Term +fn oxrdf_term_to_orm_term(term: &ng_oxigraph::oxrdf::Term) -> ng_net::orm::Term { + match term { + ng_oxigraph::oxrdf::Term::NamedNode(node) => { + ng_net::orm::Term::Ref(node.as_str().to_string()) + } + ng_oxigraph::oxrdf::Term::BlankNode(node) => { + ng_net::orm::Term::Ref(node.as_str().to_string()) + } + ng_oxigraph::oxrdf::Term::Literal(literal) => { + // Check the datatype to determine how to convert + match literal.datatype().as_str() { + // Check for string first, this is the most common. + "http://www.w3.org/2001/XMLSchema#string" => { + ng_net::orm::Term::Str(literal.value().to_string()) + } + "http://www.w3.org/2001/XMLSchema#boolean" => { + match literal.value().parse::() { + Ok(b) => ng_net::orm::Term::Bool(b), + Err(_) => ng_net::orm::Term::Str(literal.value().to_string()), + } + } + "http://www.w3.org/2001/XMLSchema#integer" + | "http://www.w3.org/2001/XMLSchema#decimal" + | "http://www.w3.org/2001/XMLSchema#double" + | "http://www.w3.org/2001/XMLSchema#float" + | "http://www.w3.org/2001/XMLSchema#int" + | "http://www.w3.org/2001/XMLSchema#long" + | "http://www.w3.org/2001/XMLSchema#short" + | "http://www.w3.org/2001/XMLSchema#byte" + | "http://www.w3.org/2001/XMLSchema#unsignedInt" + | "http://www.w3.org/2001/XMLSchema#unsignedLong" + | "http://www.w3.org/2001/XMLSchema#unsignedShort" + | "http://www.w3.org/2001/XMLSchema#unsignedByte" => { + match literal.value().parse::() { + Ok(n) => ng_net::orm::Term::Num(n), + Err(_) => ng_net::orm::Term::Str(literal.value().to_string()), + } + } + _ => ng_net::orm::Term::Str(literal.value().to_string()), + } + } + ng_oxigraph::oxrdf::Term::Triple(triple) => { + // For RDF-star triples, convert to string representation + ng_net::orm::Term::Str(triple.to_string()) + } + } +} diff --git a/ng-verifier/src/verifier.rs b/ng-verifier/src/verifier.rs index d5b36fc..0cdfe38 100644 --- a/ng-verifier/src/verifier.rs +++ b/ng-verifier/src/verifier.rs @@ -19,15 +19,12 @@ use std::fs::create_dir_all; use std::fs::{read, File, OpenOptions}; #[cfg(all(not(target_family = "wasm"), not(docsrs)))] use std::io::Write; -use std::rc::Weak; use std::{collections::HashMap, sync::Arc}; use async_std::stream::StreamExt; use async_std::sync::{Mutex, RwLockReadGuard}; use futures::channel::mpsc; use futures::SinkExt; -use ng_net::orm::OrmSchemaPredicate; -use ng_net::orm::OrmShapeTypeRef; use ng_net::orm::OrmSubscription; use ng_oxigraph::oxigraph::sparql::Query; use ng_oxigraph::oxigraph::sparql::QueryResults; @@ -115,12 +112,12 @@ pub struct Verifier { in_memory_outbox: Vec, uploads: BTreeMap, branch_subscriptions: HashMap>, - pub(crate) orm_tracked_subjects: - HashMap>>, - pub(crate) orm_shape_types: HashMap, + pub(crate) orm_subscriptions: HashMap, pub(crate) temporary_repo_certificates: HashMap, } +type SessionId = u64; + impl fmt::Debug for Verifier { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { writeln!(f, "Verifier\nconfig: {:?}", self.config)?; @@ -523,8 +520,7 @@ impl Verifier { inner_to_outer: HashMap::new(), uploads: BTreeMap::new(), branch_subscriptions: HashMap::new(), - orm_tracked_subjects: HashMap::new(), - orm_shape_types: HashMap::new(), + orm_subscriptions: HashMap::new(), temporary_repo_certificates: HashMap::new(), } } @@ -2816,8 +2812,7 @@ impl Verifier { inner_to_outer: HashMap::new(), uploads: BTreeMap::new(), branch_subscriptions: HashMap::new(), - orm_tracked_subjects: HashMap::new(), - orm_shape_types: HashMap::new(), + orm_subscriptions: HashMap::new(), temporary_repo_certificates: HashMap::new(), }; // this is important as it will load the last seq from storage