diff --git a/nextgraph/src/tests/orm.rs b/nextgraph/src/tests/orm.rs index 45692e2..190aa45 100644 --- a/nextgraph/src/tests/orm.rs +++ b/nextgraph/src/tests/orm.rs @@ -10,15 +10,15 @@ use crate::local_broker::{doc_create, doc_sparql_construct, doc_sparql_update}; use crate::tests::create_or_open_wallet::create_or_open_wallet; use ng_net::orm::{ - OrmSchemaDataType, OrmSchemaLiteralType, OrmSchemaLiterals, OrmSchemaPredicate, OrmSchemaShape, - OrmShapeType, + BasicType, OrmSchemaDataType, OrmSchemaLiteralType, OrmSchemaPredicate, OrmSchemaShape, OrmShapeType }; use ng_repo::log_info; use ng_verifier::orm::shape_type_to_sparql; use std::collections::HashMap; +use std::sync::Arc; #[async_std::test] -#async fn test_create_sparql_from_schema() { +async fn test_create_sparql_from_schema() { // Setup wallet and document let (_wallet, session_id) = create_or_open_wallet().await; let doc_nuri = doc_create( @@ -70,10 +70,10 @@ INSERT DATA { schema.insert( "http://example.org/TestObject||http://example.org/anotherObject".to_string(), - OrmSchemaShape { + Arc::new(OrmSchemaShape { iri: "http://example.org/TestObject||http://example.org/anotherObject".to_string(), predicates: vec![ - OrmSchemaPredicate { + Arc::new(OrmSchemaPredicate { dataTypes: vec![OrmSchemaDataType { valType: OrmSchemaLiteralType::string, literals: None, @@ -84,8 +84,8 @@ INSERT DATA { maxCardinality: 1, minCardinality: 1, extra: None, - }, - OrmSchemaPredicate { + }), + Arc::new(OrmSchemaPredicate { dataTypes: vec![OrmSchemaDataType { valType: OrmSchemaLiteralType::number, literals: None, @@ -96,22 +96,22 @@ INSERT DATA { maxCardinality: 1, minCardinality: 1, extra: None, - }, + }), ], - }, + }), ); schema.insert( "http://example.org/TestObject".to_string(), - OrmSchemaShape { + Arc::new(OrmSchemaShape { iri: "http://example.org/TestObject".to_string(), predicates: vec![ - OrmSchemaPredicate { + Arc::new(OrmSchemaPredicate { dataTypes: vec![OrmSchemaDataType { valType: OrmSchemaLiteralType::literal, - literals: Some(OrmSchemaLiterals::StrArray(vec![ - "http://example.org/TestObject".to_string(), - ])), + literals: Some(vec![ + BasicType::Str("http://example.org/TestObject".to_string()), + ]), shape: None, }], iri: "http://www.w3.org/1999/02/22-rdf-syntax-ns#type".to_string(), @@ -119,8 +119,8 @@ INSERT DATA { maxCardinality: 1, minCardinality: 1, extra: Some(true), - }, - OrmSchemaPredicate { + }), + Arc::new(OrmSchemaPredicate { dataTypes: vec![OrmSchemaDataType { valType: OrmSchemaLiteralType::string, literals: None, @@ -131,8 +131,8 @@ INSERT DATA { maxCardinality: 1, minCardinality: 1, extra: None, - }, - OrmSchemaPredicate { + }), + Arc::new(OrmSchemaPredicate { dataTypes: vec![OrmSchemaDataType { valType: OrmSchemaLiteralType::number, literals: None, @@ -143,8 +143,8 @@ INSERT DATA { maxCardinality: 1, minCardinality: 1, extra: None, - }, - OrmSchemaPredicate { + }), + Arc::new(OrmSchemaPredicate { dataTypes: vec![OrmSchemaDataType { valType: OrmSchemaLiteralType::boolean, literals: None, @@ -155,8 +155,8 @@ INSERT DATA { maxCardinality: 1, minCardinality: 1, extra: None, - }, - OrmSchemaPredicate { + }), + Arc::new(OrmSchemaPredicate { dataTypes: vec![OrmSchemaDataType { valType: OrmSchemaLiteralType::number, literals: None, @@ -167,8 +167,8 @@ INSERT DATA { maxCardinality: -1, minCardinality: 0, extra: None, - }, - OrmSchemaPredicate { + }), + Arc::new(OrmSchemaPredicate { dataTypes: vec![OrmSchemaDataType { valType: OrmSchemaLiteralType::shape, literals: None, @@ -182,8 +182,8 @@ INSERT DATA { maxCardinality: 1, minCardinality: 1, extra: None, - }, - OrmSchemaPredicate { + }), + Arc::new(OrmSchemaPredicate { dataTypes: vec![OrmSchemaDataType { valType: OrmSchemaLiteralType::shape, literals: None, @@ -197,8 +197,8 @@ INSERT DATA { maxCardinality: -1, minCardinality: 0, extra: None, - }, - OrmSchemaPredicate { + }), + Arc::new(OrmSchemaPredicate { dataTypes: vec![ OrmSchemaDataType { valType: OrmSchemaLiteralType::string, @@ -216,14 +216,14 @@ INSERT DATA { maxCardinality: 1, minCardinality: 1, extra: None, - }, - OrmSchemaPredicate { + }), + Arc::new(OrmSchemaPredicate { dataTypes: vec![OrmSchemaDataType { valType: OrmSchemaLiteralType::literal, - literals: Some(OrmSchemaLiterals::StrArray(vec![ - "lit1".to_string(), - "lit2".to_string(), - ])), + literals: Some(vec![ + BasicType::Str("lit1".to_string()), + BasicType::Str("lit2".to_string()), + ]), shape: None, }], iri: "http://example.org/lit1Or2".to_string(), @@ -231,17 +231,17 @@ INSERT DATA { maxCardinality: 1, minCardinality: 1, extra: None, - }, + }), ], - }, + }), ); schema.insert( "http://example.org/TestObject||http://example.org/objectValue".to_string(), - OrmSchemaShape { + Arc::new(OrmSchemaShape { iri: "http://example.org/TestObject||http://example.org/objectValue".to_string(), predicates: vec![ - OrmSchemaPredicate { + Arc::new(OrmSchemaPredicate { dataTypes: vec![OrmSchemaDataType { valType: OrmSchemaLiteralType::string, literals: None, @@ -252,8 +252,8 @@ INSERT DATA { maxCardinality: 1, minCardinality: 1, extra: None, - }, - OrmSchemaPredicate { + }), + Arc::new(OrmSchemaPredicate { dataTypes: vec![OrmSchemaDataType { valType: OrmSchemaLiteralType::number, literals: None, @@ -264,8 +264,8 @@ INSERT DATA { maxCardinality: 1, minCardinality: 1, extra: None, - }, - OrmSchemaPredicate { + }), + Arc::new(OrmSchemaPredicate { dataTypes: vec![OrmSchemaDataType { valType: OrmSchemaLiteralType::number, literals: None, @@ -276,9 +276,9 @@ INSERT DATA { maxCardinality: -1, minCardinality: 0, extra: None, - }, + }), ], - }, + }), ); let shape_type = OrmShapeType { @@ -368,21 +368,21 @@ INSERT DATA { let mut schema = HashMap::new(); schema.insert( "http://example.org/TestObject".to_string(), - OrmSchemaShape { + Arc::new(OrmSchemaShape { iri: "http://example.org/TestObject".to_string(), predicates: vec![ - OrmSchemaPredicate { + Arc::new(OrmSchemaPredicate { iri: "http://example.org/prop1".to_string(), minCardinality: 1, ..Default::default() - }, - OrmSchemaPredicate { + }), + Arc::new(OrmSchemaPredicate { iri: "http://example.org/prop2".to_string(), minCardinality: 1, ..Default::default() - }, + }), ], - }, + }), ); let shape_type = OrmShapeType { schema, @@ -431,21 +431,21 @@ INSERT DATA { let mut schema = HashMap::new(); schema.insert( "http://example.org/TestObject".to_string(), - OrmSchemaShape { + Arc::new(OrmSchemaShape { iri: "http://example.org/TestObject".to_string(), predicates: vec![ - OrmSchemaPredicate { + Arc::new(OrmSchemaPredicate { iri: "http://example.org/prop1".to_string(), minCardinality: 1, ..Default::default() - }, - OrmSchemaPredicate { + }), + Arc::new(OrmSchemaPredicate { iri: "http://example.org/prop2".to_string(), minCardinality: 0, // Optional ..Default::default() - }, + }), ], - }, + }), ); let shape_type = OrmShapeType { schema, @@ -495,20 +495,20 @@ INSERT DATA { let mut schema = HashMap::new(); schema.insert( "http://example.org/Person".to_string(), - OrmSchemaShape { + Arc::new(OrmSchemaShape { iri: "http://example.org/Person".to_string(), predicates: vec![ - OrmSchemaPredicate { + Arc::new(OrmSchemaPredicate { iri: "http://www.w3.org/1999/02/22-rdf-syntax-ns#type".to_string(), minCardinality: 1, ..Default::default() - }, - OrmSchemaPredicate { + }), + Arc::new(OrmSchemaPredicate { iri: "http://example.org/name".to_string(), minCardinality: 1, ..Default::default() - }, - OrmSchemaPredicate { + }), + Arc::new(OrmSchemaPredicate { iri: "http://example.org/knows".to_string(), minCardinality: 0, maxCardinality: -1, @@ -518,9 +518,9 @@ INSERT DATA { literals: None, }], ..Default::default() - }, + }), ], - }, + }), ); let shape_type = OrmShapeType { schema, diff --git a/ng-net/src/orm.rs b/ng-net/src/orm.rs index 7884a7a..0764d91 100644 --- a/ng-net/src/orm.rs +++ b/ng-net/src/orm.rs @@ -54,12 +54,12 @@ pub struct OrmDiffOp { pub type OrmDiff = Vec; -pub type OrmSchema = HashMap; +pub type OrmSchema = HashMap>; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct OrmSchemaShape { pub iri: String, - pub predicates: Vec, + pub predicates: Vec>, } #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] @@ -105,10 +105,10 @@ pub struct OrmSchemaPredicate { pub struct OrmTrackedSubject { /// The known predicates (only those relevant to the shape). /// If there are no triples with a predicate, they are discarded - pub tracked_predicates: HashMap, + pub tracked_predicates: HashMap>, /// If this is a nested subject, this records the parents /// and if they are currently tracking this subject. - pub parents: HashMap, + pub parents: HashMap>, /// Validity. When untracked, triple updates are not processed here. pub valid: OrmTrackedSubjectValidity, pub subject_iri: String, @@ -138,14 +138,14 @@ pub struct OrmTrackedPredicate { // Used only for tracking construction of new objects and diffs // in parallel to modifying the tracked objects and predicates. -pub struct OrmTrackedSubjectChange<'a> { +pub struct OrmTrackedSubjectChange { pub subject_iri: String, /// Predicates that were changed. - pub predicates: HashMap>, + pub predicates: HashMap, } -pub struct OrmTrackedPredicateChanges<'a> { +pub struct OrmTrackedPredicateChanges { /// The tracked predicate for which those changes were recorded. - pub tracked_predicate: &'a OrmTrackedPredicate, + pub tracked_predicate: Weak, pub values_added: Vec, pub values_removed: Vec, } @@ -164,7 +164,7 @@ pub struct OrmSubscription { pub session_id: u64, pub nuri: NuriV0, pub sender: Sender, - pub tracked_subjects: HashMap>, + pub tracked_subjects: HashMap>>, } type ShapeIri = String; type SubjectIri = String; diff --git a/ng-verifier/src/commits/mod.rs b/ng-verifier/src/commits/mod.rs index 4be9f9e..068e580 100644 --- a/ng-verifier/src/commits/mod.rs +++ b/ng-verifier/src/commits/mod.rs @@ -29,7 +29,7 @@ use ng_net::app_protocol::*; use crate::verifier::Verifier; -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] pub trait CommitVerifier { async fn verify( &self, @@ -288,7 +288,7 @@ impl CommitVerifier for AddBranch { Ok(()) } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for Repository { async fn verify( &self, @@ -302,7 +302,7 @@ impl CommitVerifier for Repository { Ok(()) } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for StoreUpdate { async fn verify( &self, @@ -315,7 +315,7 @@ impl CommitVerifier for StoreUpdate { verifier.new_store_from_update(self) } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for AddInboxCap { async fn verify( &self, @@ -330,7 +330,7 @@ impl CommitVerifier for AddInboxCap { } } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for AddSignerCap { async fn verify( &self, @@ -345,7 +345,7 @@ impl CommitVerifier for AddSignerCap { } } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for AddMember { #[allow(unused_variables)] async fn verify( @@ -359,7 +359,7 @@ impl CommitVerifier for AddMember { Ok(()) } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for RemoveMember { #[allow(unused_variables)] async fn verify( @@ -373,7 +373,7 @@ impl CommitVerifier for RemoveMember { Ok(()) } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for AddPermission { #[allow(unused_variables)] async fn verify( @@ -387,7 +387,7 @@ impl CommitVerifier for AddPermission { Ok(()) } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for RemovePermission { #[allow(unused_variables)] async fn verify( @@ -401,7 +401,7 @@ impl CommitVerifier for RemovePermission { Ok(()) } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for RemoveBranch { #[allow(unused_variables)] async fn verify( @@ -415,7 +415,7 @@ impl CommitVerifier for RemoveBranch { Ok(()) } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for AddName { #[allow(unused_variables)] async fn verify( @@ -429,7 +429,7 @@ impl CommitVerifier for AddName { Ok(()) } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for RemoveName { #[allow(unused_variables)] async fn verify( @@ -443,7 +443,7 @@ impl CommitVerifier for RemoveName { Ok(()) } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for () { #[allow(unused_variables)] async fn verify( @@ -457,7 +457,7 @@ impl CommitVerifier for () { Ok(()) } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for Snapshot { #[allow(unused_variables)] async fn verify( @@ -484,7 +484,7 @@ impl CommitVerifier for Snapshot { Ok(()) } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for AddFile { #[allow(unused_variables)] async fn verify( @@ -529,7 +529,7 @@ impl CommitVerifier for AddFile { } } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for RemoveFile { #[allow(unused_variables)] async fn verify( @@ -543,7 +543,7 @@ impl CommitVerifier for RemoveFile { Ok(()) } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for Compact { #[allow(unused_variables)] async fn verify( @@ -557,7 +557,7 @@ impl CommitVerifier for Compact { Ok(()) } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for AsyncSignature { #[allow(unused_variables)] async fn verify( @@ -605,7 +605,7 @@ impl CommitVerifier for AsyncSignature { } } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for RootCapRefresh { #[allow(unused_variables)] async fn verify( @@ -619,7 +619,7 @@ impl CommitVerifier for RootCapRefresh { Ok(()) } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for BranchCapRefresh { #[allow(unused_variables)] async fn verify( @@ -633,7 +633,7 @@ impl CommitVerifier for BranchCapRefresh { Ok(()) } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for AddRepo { #[allow(unused_variables)] async fn verify( @@ -656,7 +656,7 @@ impl CommitVerifier for AddRepo { Ok(()) } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for RemoveRepo { #[allow(unused_variables)] async fn verify( @@ -670,7 +670,7 @@ impl CommitVerifier for RemoveRepo { Ok(()) } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for AddLink { #[allow(unused_variables)] async fn verify( @@ -684,7 +684,7 @@ impl CommitVerifier for AddLink { Ok(()) } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for RemoveLink { #[allow(unused_variables)] async fn verify( @@ -698,7 +698,7 @@ impl CommitVerifier for RemoveLink { Ok(()) } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for RemoveSignerCap { #[allow(unused_variables)] async fn verify( @@ -712,7 +712,7 @@ impl CommitVerifier for RemoveSignerCap { Ok(()) } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl CommitVerifier for WalletUpdate { #[allow(unused_variables)] async fn verify( diff --git a/ng-verifier/src/orm.rs b/ng-verifier/src/orm.rs index 6a9317a..37e89f3 100644 --- a/ng-verifier/src/orm.rs +++ b/ng-verifier/src/orm.rs @@ -8,22 +8,16 @@ // according to those terms. use futures::channel::mpsc; -use ng_net::orm::OrmSubscription; + use std::collections::HashMap; use std::collections::HashSet; +use std::sync::Arc; use futures::SinkExt; use lazy_static::lazy_static; -use ng_net::orm::BasicType; -pub use ng_net::orm::OrmDiff; -use ng_net::orm::OrmSchemaLiteralType; -pub use ng_net::orm::OrmShapeType; -use ng_net::orm::OrmTrackedSubject; -use ng_net::orm::OrmTrackedSubjectChange; -use ng_net::orm::OrmTrackedSubjectValidity; -use ng_net::orm::{OrmSchemaDataType, OrmSchemaShape}; use ng_net::utils::Receiver; -use ng_net::{app_protocol::*, orm::OrmSchema}; +use ng_net::{app_protocol::*, orm::*}; +pub use ng_net::orm::{OrmDiff, OrmShapeType}; use ng_oxigraph::oxigraph::sparql::{Query, QueryResults}; use ng_oxigraph::oxrdf::Triple; use ng_repo::errors::NgError; @@ -42,7 +36,7 @@ type SubjectIri = String; // Structure to store changes in. By shape iri > subject iri > OrmTrackedSubjectChange // **NOTE**: In comparison to OrmSubscription.tracked_subjects, the outer hashmap's keys are shape IRIs. // (shape IRI -> (subject IRI -> OrmTrackedSubjectChange)) -type OrmChanges<'a> = HashMap>>; +type OrmChanges = HashMap>; impl Verifier { pub fn query_sparql_construct( @@ -83,44 +77,45 @@ impl Verifier { } } - fn process_changes_for_subscription<'a>( + fn process_changes_for_subscription( self: &mut Self, - orm_subscription: &'a mut OrmSubscription, - triples_added: &'a [Triple], - triples_removed: &'a [Triple], - nuri: &String, - ) -> Result, NgError> { - let tracked_subjects = &mut orm_subscription.tracked_subjects; + nuri: &NuriV0, + triples_added: &[Triple], + triples_removed: &[Triple] + ) -> Result { + let orm_subscription = self.orm_subscriptions.get(nuri).unwrap(); + //let tracked_subjects = orm_subscription.tracked_subjects; let schema = &orm_subscription.shape_type.schema; let root_shape = schema .get(&orm_subscription.shape_type.shape) .ok_or(VerifierError::InvalidOrmSchema)?; - return self.process_changes_for_subject_and_shape( - root_shape, - tracked_subjects, - schema, + let mut orm_changes = HashMap::new(); + self.process_changes_for_subject_and_shape( + root_shape.clone(), triples_added, triples_removed, nuri, - None, - ); + &mut orm_changes, + )?; + Ok(orm_changes) } - fn process_changes_for_subject_and_shape<'a>( + fn process_changes_for_subject_and_shape( self: &mut Self, - root_shape: &OrmSchemaShape, - tracked_subjects: &mut HashMap>, - schema: &OrmSchema, - triples_added: &'a [Triple], - triples_removed: &'a [Triple], - nuri: &String, - mut orm_changes: Option>, - ) -> Result, NgError> { - let mut orm_changes: OrmChanges<'a> = orm_changes.take().unwrap_or_else(HashMap::new); + root_shape: Arc, + triples_added: &[Triple], + triples_removed: &[Triple], + nuri: &NuriV0, + orm_changes: &mut OrmChanges, + ) -> Result<(), NgError> { + let nuri_repo = nuri.repo(); + //let orm_subscription = self.orm_subscriptions.get(nuri).unwrap(); + //let tracked_subjects = orm_subscription.tracked_subjects; + //let schema = &orm_subscription.shape_type.schema; // First in, last out stack to keep track of objects to validate (nested objects first). Strings are object IRIs. - let mut shape_validation_queue: Vec<(&OrmSchemaShape, Vec)> = vec![]; + let mut shape_validation_queue: Vec<(Arc, Vec)> = vec![]; // Add root shape for first validation run. shape_validation_queue.push((root_shape, vec![])); @@ -129,9 +124,9 @@ impl Verifier { while let Some((shape, objects_to_validate)) = shape_validation_queue.pop() { // Collect triples relevant for validation. let added_triples_by_subject = - group_by_subject_for_shape(shape, triples_added, &objects_to_validate); + group_by_subject_for_shape(&shape, triples_added, &objects_to_validate); let removed_triples_by_subject = - group_by_subject_for_shape(shape, triples_removed, &objects_to_validate); + group_by_subject_for_shape(&shape, triples_removed, &objects_to_validate); let all_modified_subjects: HashSet<&SubjectIri> = added_triples_by_subject .keys() .chain(removed_triples_by_subject.keys()) @@ -165,38 +160,47 @@ impl Verifier { // Apply all triples for that subject to the tracked (shape, subject) pair. // Record the changes. - if let Err(e) = add_remove_triples_mut( - shape, - subject_iri, - triples_added_for_subj, - triples_removed_for_subj, - tracked_subjects, - change, - ) { - log_err!("apply_changes_from_triples add/remove error: {:?}", e); - } - - let tracked_subject_opt = tracked_subjects - .get_mut(subject_iri) - .and_then(|m| m.get_mut(&shape.iri)); - let Some(tracked_subject) = tracked_subject_opt else { - continue; - }; // skip if missing - - // Validate the subject. - let need_eval = - update_subject_validity(change, shape, tracked_subjects, tracked_subject.valid); - - // We add the need_eval to be processed next after loop. - for (iri, schema_shape, needs_refetch) in need_eval { - // Add to nested_objects_to_validate. - nested_objects_to_eval - .entry(schema_shape) - .or_insert_with(Vec::new) - .push((iri.clone(), needs_refetch)); + { + let orm_subscription = self.orm_subscriptions.get_mut(nuri).unwrap(); + if let Err(e) = add_remove_triples_mut( + shape.clone(), + subject_iri, + triples_added_for_subj, + triples_removed_for_subj, + &mut orm_subscription.tracked_subjects, + change, + ) { + log_err!("apply_changes_from_triples add/remove error: {:?}", e); + } + + + let validity = { + let tracked_subject_opt = orm_subscription.tracked_subjects + .get(subject_iri) + .and_then(|m| m.get(&shape.iri)); + let Some(tracked_subject) = tracked_subject_opt else { + continue; + }; // skip if missing + tracked_subject.valid.clone() + }; + // Validate the subject. + let need_eval = + update_subject_validity(change, &shape, &mut orm_subscription.tracked_subjects, validity); + + // We add the need_eval to be processed next after loop. + for (iri, schema_shape, needs_refetch) in need_eval { + // Add to nested_objects_to_validate. + nested_objects_to_eval + .entry(schema_shape) + .or_insert_with(Vec::new) + .push((iri.clone(), needs_refetch)); + } } } + let orm_subscription = self.orm_subscriptions.get(nuri).unwrap(); + let schema = &orm_subscription.shape_type.schema; + // Now, we fetch all un-fetched subjects for re-evaluation. for (shape_iri, objects_to_eval) in &nested_objects_to_eval { let objects_to_fetch = objects_to_eval @@ -211,16 +215,14 @@ impl Verifier { // Create sparql query let shape_query = shape_type_to_sparql(&schema, &shape_iri, Some(objects_to_fetch))?; - let new_triples = self.query_sparql_construct(shape_query, Some(nuri.clone()))?; - orm_changes = self.process_changes_for_subject_and_shape( - shape, - tracked_subjects, - schema, - new_triples, - &vec![], - nuri, - Some(orm_changes), - )?; + let new_triples = self.query_sparql_construct(shape_query, Some(nuri_repo.clone()))?; + // self.process_changes_for_subject_and_shape( + // shape.clone(), + // &new_triples, + // &vec![], + // nuri, + // orm_changes, + // )?; } // Now, add all subjects to the queue that did not need a fetch. for (shape_iri, objects_to_eval) in &nested_objects_to_eval { @@ -233,33 +235,32 @@ impl Verifier { .get(shape_iri) .ok_or(VerifierError::InvalidOrmSchema)?; - shape_validation_queue.push((shape, objects_to_fetch)); + shape_validation_queue.push((shape.clone(), objects_to_fetch)); } } - Ok(orm_changes) + Ok(()) } - fn apply_triple_changes<'a>( - &'a mut self, - triples_added: &'a [Triple], - triples_removed: &'a [Triple], + fn apply_triple_changes( + &mut self, + triples_added: &[Triple], + triples_removed: &[Triple], only_for_nuri: Option<&NuriV0>, only_for_session_id: Option, - ) -> Result, NgError> { + ) -> Result { // If we have a specific session, handle only that subscription. if let Some(session_id) = only_for_session_id { if let Some((nuri, sub)) = self .orm_subscriptions - .iter_mut() + .iter() .find(|(_, s)| s.session_id == session_id) { // TODO: Is repo correct? return self.process_changes_for_subscription( - sub, + &nuri.clone(), triples_added, - triples_removed, - &nuri.repo(), + triples_removed ); } else { return Ok(HashMap::new()); @@ -268,17 +269,19 @@ impl Verifier { // Otherwise, iterate all (optionally filter by nuri) and merge. let mut merged: OrmChanges = HashMap::new(); - for (nuri, sub) in self.orm_subscriptions.iter_mut() { + for nuri in self.orm_subscriptions.iter().filter(|(nuri,_)|{ if let Some(filter_nuri) = only_for_nuri { - if nuri != filter_nuri { - continue; + if *nuri != filter_nuri { + return false; } } + true + }).map(|(nuri,_)| nuri.clone()).collect::>() { + let changes = self.process_changes_for_subscription( - sub, + &nuri, triples_added, triples_removed, - &nuri.repo(), // TODO: Is this correct? )?; for (shape_iri, subj_map) in changes { merged @@ -294,7 +297,7 @@ impl Verifier { change: &OrmTrackedSubjectChange, changes: &OrmChanges, shape: &OrmSchemaShape, - tracked_subjects: &HashMap>, + tracked_subjects: &HashMap>>, ) -> Value { let mut orm_obj = json!({"id": change.subject_iri}); let orm_obj_map = orm_obj.as_object_mut().unwrap(); @@ -413,17 +416,22 @@ impl Verifier { fn create_orm_from_triples( &mut self, - orm_subscription: &OrmSubscription, + nuri: &NuriV0, triples: &[Triple], ) -> Result { + let session_id = { + let orm_subscription = self.orm_subscriptions.get(nuri).unwrap(); + orm_subscription.session_id + }; + let changes: OrmChanges = + self.apply_triple_changes(triples, &[], None, Some(session_id))?; + + let orm_subscription = self.orm_subscriptions.get(nuri).unwrap(); let root_shape_iri = &orm_subscription.shape_type.shape; let schema = &orm_subscription.shape_type.schema; let root_shape = schema .get(root_shape_iri) .ok_or(VerifierError::InvalidOrmSchema)?; - let changes: OrmChanges = - self.apply_triple_changes(triples, &[], None, Some(orm_subscription.session_id))?; - let Some(_root_changes) = changes.get(&root_shape.iri).map(|s| s.values()) else { return Ok(Value::Array(vec![])); }; @@ -455,7 +463,7 @@ impl Verifier { return Ok(return_vals); } - pub(crate) async fn orm_update(&mut self, scope: &NuriV0, patch: GraphTransaction) {} + pub(crate) async fn orm_update(&mut self, scope: &NuriV0, patch: GraphQuadsPatch) {} pub(crate) async fn orm_frontend_update( &mut self, @@ -487,14 +495,10 @@ impl Verifier { } pub(crate) fn clean_orm_subscriptions(&mut self) { - let subscriptions = self.orm_subscriptions; - - // TODO: Ownership issues. - // for (nuri, subscription) in subscriptions { - // if subscription.sender.is_closed() { - // subscriptions.remove(nuri); - // } - // } + + self.orm_subscriptions.retain(|_,subscription| + !subscription.sender.is_closed() + ); } pub(crate) async fn start_orm( @@ -526,8 +530,7 @@ impl Verifier { let shape_triples = self.query_sparql_construct(shape_query, Some(nuri.repo()))?; // Create objects from queried triples. - let subscription_ref = self.orm_subscriptions.get(nuri).unwrap(); - let _orm_objects = self.create_orm_from_triples(subscription_ref, &shape_triples)?; + let _orm_objects = self.create_orm_from_triples(nuri, &shape_triples)?; // TODO integrate response diff --git a/ng-verifier/src/utils/orm_validation.rs b/ng-verifier/src/utils/orm_validation.rs index 45a13e5..adfa648 100644 --- a/ng-verifier/src/utils/orm_validation.rs +++ b/ng-verifier/src/utils/orm_validation.rs @@ -9,20 +9,16 @@ use std::collections::HashMap; use std::collections::HashSet; +use std::sync::Weak; +use std::sync::Arc; -use ng_net::orm::BasicType; -use ng_net::orm::OrmSchemaLiteralType; -use ng_net::orm::OrmSchemaShape; -use ng_net::orm::OrmTrackedPredicate; -use ng_net::orm::OrmTrackedSubject; -use ng_net::orm::OrmTrackedSubjectChange; -use ng_net::orm::OrmTrackedSubjectValidity; +use ng_net::orm::*; use ng_oxigraph::oxrdf::Subject; use ng_oxigraph::oxrdf::Triple; use ng_repo::errors::NgError; pub fn group_by_subject_for_shape<'a>( - shape: &'a OrmSchemaShape, + shape: &OrmSchemaShape, triples: &'a [Triple], allowed_subjects: &[String], ) -> HashMap> { @@ -30,7 +26,7 @@ pub fn group_by_subject_for_shape<'a>( let allowed_preds_set: HashSet<&str> = shape .predicates .iter() - .map(|OrmSchemaDataType, OrmSchemaPredicatep| p.iri.as_str()) + .map(|p| p.iri.as_str()) .collect(); let allowed_subject_set: HashSet<&str> = allowed_subjects.iter().map(|s| s.as_str()).collect(); for triple in triples { @@ -59,33 +55,37 @@ pub fn group_by_subject_for_shape<'a>( /// Returns predicates to nested objects that were touched and need processing. /// Assumes all triples have same subject. pub fn add_remove_triples_mut( - shape: &OrmSchemaShape, + shape: Arc, subject_iri: &str, triples_added: &[&Triple], triples_removed: &[&Triple], - tracked_subjects: &mut HashMap>, + tracked_subjects: &mut HashMap>>, subject_changes: &mut OrmTrackedSubjectChange, ) -> Result<(), NgError> { - let get_or_create_tracked_subject = - |subject_iri: &str, - shape_iri: &str, - tracked_subjects: &mut HashMap>| { - let tracked_shapes_for_subject = tracked_subjects - .entry(subject_iri.to_string()) - .or_insert_with(HashMap::new); - - tracked_shapes_for_subject - .entry(shape_iri.to_string()) - .or_insert_with(|| OrmTrackedSubject { - tracked_predicates: HashMap::new(), - parents: HashMap::new(), - valid: ng_net::orm::OrmTrackedSubjectValidity::Pending, - subject_iri: subject_iri.to_string(), - shape, - }) - }; + fn get_or_create_tracked_subject<'a> ( + subject_iri: &str, + shape: &Arc, + tracked_subjects: &'a mut HashMap>>) + -> (&'a mut OrmTrackedSubject, Weak) + { + let tracked_shapes_for_subject = tracked_subjects + .entry(subject_iri.to_string()) + .or_insert_with(HashMap::new); + + let subject = tracked_shapes_for_subject + .entry(shape.iri.clone()) + .or_insert_with(|| Arc::new(OrmTrackedSubject { + tracked_predicates: HashMap::new(), + parents: HashMap::new(), + valid: ng_net::orm::OrmTrackedSubjectValidity::Pending, + subject_iri: subject_iri.to_string(), + shape: shape.clone(), + })); + let weak = Arc::downgrade(&subject); + (Arc::get_mut(subject).unwrap(), weak) + } - let tracked_subject = get_or_create_tracked_subject(subject_iri, &shape.iri, tracked_subjects); + let (_, tracked_subject_weak) = get_or_create_tracked_subject(subject_iri, &shape, tracked_subjects); // Process added triples. // For each triple, check if it matches the shape. @@ -98,26 +98,30 @@ pub fn add_remove_triples_mut( } // Predicate schema constraint matches this triple. + let mut upgraded = tracked_subject_weak.upgrade().unwrap(); + let tracked_subject = Arc::get_mut(&mut upgraded).unwrap(); // Add tracked predicate or increase cardinality - let tracked_predicate = tracked_subject + let _tracked_predicate = tracked_subject .tracked_predicates .entry(predicate_schema.iri.to_string()) - .or_insert_with(|| OrmTrackedPredicate { + .or_insert_with(|| Arc::new(OrmTrackedPredicate { current_cardinality: 0, - schema: predicate_schema, + schema: predicate_schema.clone(), tracked_children: Vec::new(), current_literals: None, - }); + })); + let tracked_predicate_weak = Arc::downgrade(&_tracked_predicate); + let tracked_predicate = Arc::get_mut(_tracked_predicate).unwrap(); tracked_predicate.current_cardinality += 1; let obj_term = oxrdf_term_to_orm_basic_type(&triple.object); // Keep track of the changed values too. - let pred_changes = subject_changes + let pred_changes: &mut OrmTrackedPredicateChanges = subject_changes .predicates .entry(predicate_schema.iri.clone()) .or_insert_with(|| OrmTrackedPredicateChanges { - tracked_predicate: tracked_predicate, // reference remains inside lifetime of this call + tracked_predicate: tracked_predicate_weak.clone(), // reference remains inside lifetime of this call values_added: Vec::new(), values_removed: Vec::new(), }); @@ -132,33 +136,35 @@ pub fn add_remove_triples_mut( .any(|dt| dt.valType == OrmSchemaLiteralType::literal) { match &mut tracked_predicate.current_literals { - Some(lits) => lits.push(obj_term), + Some(lits) => lits.push(obj_term.clone()), None => { - tracked_predicate.current_literals = Some(vec![obj_term]); + tracked_predicate.current_literals = Some(vec![obj_term.clone()]); } } } // If predicate is of type shape, register (parent -> child) links so that // nested subjects can later be (lazily) fetched / validated. + // FIXME : shape_iri is never used for shape_iri in predicate_schema .dataTypes .iter() - .filter(|dt| dt.valType == OrmSchemaLiteralType::shape) - .flat_map(|dt| dt.shape) + .filter_map(|dt| if dt.valType == OrmSchemaLiteralType::shape {dt.shape.clone()} else {None} ) { - if let BasicType::Str(obj_iri) = obj_term { + if let BasicType::Str(obj_iri) = &obj_term { // Get or create object's tracked subject struct. - let tracked_child = - get_or_create_tracked_subject(triple.predicate.as_string(), &shape_iri); + let (tracked_child, tracked_child_weak) = + get_or_create_tracked_subject(triple.predicate.as_string(), &shape, tracked_subjects); // Add self to parent (set tracked to true, preliminary). - tracked_child.parents.insert(obj_iri, tracked_child); + tracked_child.parents.insert(obj_iri.clone(), tracked_child_weak.clone()); // Add link to children + let mut upgraded = tracked_predicate_weak.upgrade().unwrap(); + let tracked_predicate = Arc::get_mut(&mut upgraded).unwrap(); tracked_predicate .tracked_children - .push(unsafe { Weak::from_raw(tracked_child) }); + .push(tracked_child_weak); } } } @@ -171,10 +177,11 @@ pub fn add_remove_triples_mut( let tracked_predicate_opt = tracked_subjects .get_mut(subject_iri) .and_then(|tss| tss.get_mut(&shape.iri)) - .and_then(|ts| ts.tracked_predicates.get_mut(pred_iri)); - let Some(tracked_predicate) = tracked_predicate_opt else { + .and_then(|ts| Arc::get_mut(ts).unwrap().tracked_predicates.get_mut(pred_iri)); + let Some(tracked_predicate_arc) = tracked_predicate_opt else { continue; }; + let tracked_predicate = Arc::get_mut(tracked_predicate_arc).unwrap(); // The cardinality might become -1 or 0. We will remove them from the tracked predicates during validation. tracked_predicate.current_cardinality = @@ -211,8 +218,7 @@ pub fn add_remove_triples_mut( .schema .dataTypes .iter() - .filter(|dt| dt.valType == OrmSchemaLiteralType::shape) - .flat_map(|dt| dt.shape) + .filter_map(|dt| if dt.valType == OrmSchemaLiteralType::shape {dt.shape.clone()} else {None} ) { // Nested shape removal logic disabled (see note above). } @@ -224,26 +230,31 @@ pub fn add_remove_triples_mut( /// Check the validity of a subject and update affecting tracked subjects' validity. /// Might return nested objects that need to be validated. /// Assumes all triples to be of same subject. -pub fn update_subject_validity<'a>( - s_change: &'a OrmTrackedSubjectChange<'a>, +pub fn update_subject_validity( + s_change: &OrmTrackedSubjectChange, shape: &OrmSchemaShape, - tracked_subjects: &HashMap>>, + tracked_subjects: &mut HashMap>>, previous_validity: OrmTrackedSubjectValidity, ) -> Vec<(String, String, bool)> { - let Some(tracked_shapes) = tracked_subjects.get_mut(subject_iri) else { + let Some(tracked_shapes) = tracked_subjects.get_mut(&s_change.subject_iri) else { return vec![]; }; let Some(tracked_subject) = tracked_shapes.get_mut(&shape.iri) else { return vec![]; }; + let tracked_subject = Arc::get_mut(tracked_subject).unwrap(); // Keep track of objects that need to be validated against a shape to fetch and validate. let mut need_evaluation: Vec<(String, String, bool)> = vec![]; // Check 1) Check if we need to fetch this object or all parents are untracked. if tracked_subject.parents.len() != 0 { let no_parents_tracking = tracked_subject.parents.values().all(|parent| { - parent.valid == OrmTrackedSubjectValidity::Untracked - || parent.valid == OrmTrackedSubjectValidity::Invalid + match parent.upgrade() { + Some(subject) => + subject.valid == OrmTrackedSubjectValidity::Untracked + || subject.valid == OrmTrackedSubjectValidity::Invalid, + None => true + } }); if no_parents_tracking { @@ -256,7 +267,7 @@ pub fn update_subject_validity<'a>( // We need to fetch the subject's current state: // We have new parents but were previously not recording changes. - return vec![(s_change.subject_iri,)]; + return vec![(s_change.subject_iri.clone(), shape.iri.clone(), true)]; // TODO } } @@ -287,10 +298,10 @@ pub fn update_subject_validity<'a>( // Check 4) Validate subject against each predicate in shape. for p_schema in shape.predicates.iter() { let p_change = s_change.predicates.get(&p_schema.iri); - let tracked_pred = p_change.map(|pc| pc.tracked_predicate); + let tracked_pred = p_change.and_then(|pc| pc.tracked_predicate.upgrade()); let count = - tracked_pred.map_or_else(|| 0, |tp: &OrmTrackedPredicate| tp.current_cardinality); + tracked_pred.as_ref().map_or_else(|| 0, |tp| tp.current_cardinality); // Check 4.1) Cardinality if count < p_schema.minCardinality { @@ -326,7 +337,7 @@ pub fn update_subject_validity<'a>( // between required and given values mismatches. if !p_schema.extra.unwrap_or(false) && ((required_literals.len() as i32) - != tracked_pred.map_or(0, |p| p.current_cardinality)) + != tracked_pred.as_ref().map_or(0, |p| p.current_cardinality)) { return false; } @@ -334,11 +345,9 @@ pub fn update_subject_validity<'a>( // Check that each required literal is present. for required_literal in required_literals { // Is tracked predicate present? - if !tracked_pred - .iter() - .flat_map(|tp| &tp.current_literals) - .flatten() - .any(|literal| *literal == *required_literal) + if !tracked_pred.as_ref().map_or(false, + |t|t.current_literals.as_ref().map_or(false, + |tt|tt.iter().any(|literal| *literal == *required_literal))) { return false; } @@ -356,13 +365,12 @@ pub fn update_subject_validity<'a>( .any(|dt| dt.valType == OrmSchemaLiteralType::shape) { // If we have a nested shape, we need to check if the nested objects are tracked and valid. - + let tracked_children = tracked_pred.as_ref().map(|tp| + tp.tracked_children.iter().filter_map(|weak_tc| weak_tc.upgrade()).collect::>() + ); // First, Count valid, invalid, unknowns, and untracked - let counts = tracked_pred - .iter() - .flat_map(|tp| tp.tracked_children) - .map(|tc| { - tc.upgrade().map(|tc| { + let counts = tracked_children.as_ref() + .map_or((0,0,0,0),|children| children.iter().map(|tc| { if tc.valid == OrmTrackedSubjectValidity::Valid { (1, 0, 0, 0) } else if tc.valid == OrmTrackedSubjectValidity::Invalid { @@ -375,11 +383,9 @@ pub fn update_subject_validity<'a>( (0, 0, 0, 0) } }) - }) - .flatten() .fold((0, 0, 0, 0), |(v1, i1, u1, ut1), o| { (v1 + o.0, i1 + o.1, u1 + o.2, ut1 + o.3) - }); + })); if counts.1 > 0 && p_schema.extra != Some(true) { // If we have at least one invalid nested object and no extra allowed, invalid. @@ -393,38 +399,34 @@ pub fn update_subject_validity<'a>( // If we have untracked nested objects, we need to fetch them and validate. set_validity(&mut new_validity, OrmTrackedSubjectValidity::Pending); // After that we need to reevaluate this (subject,shape) again. - need_evaluation.push((subject_iri.to_string(), shape.iri.clone(), false)); + need_evaluation.push((s_change.subject_iri.to_string(), shape.iri.clone(), false)); // Also schedule untracked children for fetching and validation. - if let Some(tp) = tracked_pred { - for weak_child in &tp.tracked_children { - if let Some(child) = weak_child.upgrade() { - if child.valid == OrmTrackedSubjectValidity::Untracked { - need_evaluation.push(( - child.subject_iri.clone(), - child.shape.iri.clone(), - true, - )); - } + tracked_children.as_ref().map(|children| + for child in children { + if child.valid == OrmTrackedSubjectValidity::Untracked { + need_evaluation.push(( + child.subject_iri.clone(), + child.shape.iri.clone(), + true, + )); } } - } + ); } else if counts.2 > 0 { // If we have unknown nested objects, we need to wait for their evaluation. set_validity(&mut new_validity, OrmTrackedSubjectValidity::Pending); // Schedule unknown children (NotEvaluated) for re-evaluation without fetch. - if let Some(tp) = tracked_pred { - for weak_child in &tp.tracked_children { - if let Some(child) = weak_child.upgrade() { - if child.valid == OrmTrackedSubjectValidity::Pending { - need_evaluation.push(( - child.subject_iri.clone(), - child.shape.iri.clone(), - false, - )); - } + tracked_children.as_ref().map(|children| + for child in children { + if child.valid == OrmTrackedSubjectValidity::Pending { + need_evaluation.push(( + child.subject_iri.clone(), + child.shape.iri.clone(), + false, + )); } } - } + ); } else { // All nested objects are valid and cardinality is correct. // We are valid with this predicate. @@ -435,7 +437,7 @@ pub fn update_subject_validity<'a>( let allowed_types: Vec<&OrmSchemaLiteralType> = p_schema.dataTypes.iter().map(|dt| &dt.valType).collect(); // For each new value, check that data type is in allowed_types. - for val_added in p_change.iter().map(|pc| pc.values_added).flatten() { + for val_added in p_change.iter().map(|pc| &pc.values_added).flatten() { let matches = match val_added { BasicType::Bool(_) => allowed_types .iter() @@ -475,7 +477,7 @@ pub fn update_subject_validity<'a>( { // If this subject became valid, we need to refetch this subject; // We fetch - need_evaluation.insert(0, (s_change.subject_iri, shape.iri.clone(), true)); + need_evaluation.insert(0, (s_change.subject_iri.clone(), shape.iri.clone(), true)); } // If validity changed, parents need to be re-evaluated. @@ -485,7 +487,7 @@ pub fn update_subject_validity<'a>( return tracked_subject .parents .values() - .map(|parent| (&parent.subject_iri, parent.shape, false)) + .filter_map(|parent| parent.upgrade().map(|parent| {(parent.subject_iri.clone(), parent.shape.iri.clone(), false)}) ) // Add `need_evaluation`. .chain(need_evaluation) .collect(); @@ -510,9 +512,11 @@ fn has_cycle(subject: &OrmTrackedSubject, visited: &mut HashSet) -> bool return true; } visited.insert(subject.subject_iri.clone()); - for (_parent_iri, (parent_subject, _)) in &subject.parents { - if has_cycle(parent_subject, visited) { - return true; + for (_parent_iri, parent_subject) in &subject.parents { + if let Some(parent_subject) = parent_subject.upgrade() { + if has_cycle(&parent_subject, visited) { + return true; + } } } visited.remove(&subject.subject_iri);