diff --git a/nextgraph/src/local_broker.rs b/nextgraph/src/local_broker.rs index 7d2db65..54a41fc 100644 --- a/nextgraph/src/local_broker.rs +++ b/nextgraph/src/local_broker.rs @@ -2761,7 +2761,7 @@ pub async fn doc_sparql_construct( ) -> Result, NgError> { let broker = get_broker().await?; let session = broker.get_session(session_id)?; - session.verifier.sparql_construct(sparql, nuri) + session.verifier.query_sparql_construct(sparql, nuri) } pub async fn doc_create( diff --git a/nextgraph/src/tests/orm.rs b/nextgraph/src/tests/orm.rs index d989985..45692e2 100644 --- a/nextgraph/src/tests/orm.rs +++ b/nextgraph/src/tests/orm.rs @@ -14,7 +14,7 @@ use ng_net::orm::{ OrmShapeType, }; use ng_repo::log_info; -use ng_verifier::orm::sparql_construct_from_orm_shape_type; +use ng_verifier::orm::shape_type_to_sparql; use std::collections::HashMap; #[async_std::test] @@ -287,7 +287,7 @@ INSERT DATA { }; // Generate and execute the CONSTRUCT query - let query = sparql_construct_from_orm_shape_type(&shape_type, Some(1)).unwrap(); + let query = shape_type_to_sparql(&shape_type, Some(1)).unwrap(); let triples = doc_sparql_construct(session_id, query, Some(doc_nuri.clone())) .await @@ -390,7 +390,7 @@ INSERT DATA { }; // Generate and run query - let query = sparql_construct_from_orm_shape_type(&shape_type, Some(1)).unwrap(); + let query = shape_type_to_sparql(&shape_type, Some(1)).unwrap(); let triples = doc_sparql_construct(session_id, query, Some(doc_nuri.clone())) .await .unwrap(); @@ -453,7 +453,7 @@ INSERT DATA { }; // Generate and run query - let query = sparql_construct_from_orm_shape_type(&shape_type, Some(1)).unwrap(); + let query = shape_type_to_sparql(&shape_type, Some(1)).unwrap(); let triples = doc_sparql_construct(session_id, query, Some(doc_nuri.clone())) .await .unwrap(); @@ -528,7 +528,7 @@ INSERT DATA { }; // Generate and run query. This must not infinite loop. - let query = sparql_construct_from_orm_shape_type(&shape_type, Some(1)).unwrap(); + let query = shape_type_to_sparql(&shape_type, Some(1)).unwrap(); log_info!("cyclic query result:\n{}", query); let triples = doc_sparql_construct(session_id, query, Some(doc_nuri.clone())) .await diff --git a/ng-verifier/src/orm.rs b/ng-verifier/src/orm.rs index b4d71ed..6a9317a 100644 --- a/ng-verifier/src/orm.rs +++ b/ng-verifier/src/orm.rs @@ -45,7 +45,7 @@ type SubjectIri = String; type OrmChanges<'a> = HashMap>>; impl Verifier { - pub fn sparql_construct( + pub fn query_sparql_construct( &self, query: String, nuri: Option, @@ -83,10 +83,12 @@ impl Verifier { } } - fn build_changes_for_subscription<'a>( + fn process_changes_for_subscription<'a>( + self: &mut Self, orm_subscription: &'a mut OrmSubscription, triples_added: &'a [Triple], triples_removed: &'a [Triple], + nuri: &String, ) -> Result, NgError> { let tracked_subjects = &mut orm_subscription.tracked_subjects; let schema = &orm_subscription.shape_type.schema; @@ -94,7 +96,28 @@ impl Verifier { .get(&orm_subscription.shape_type.shape) .ok_or(VerifierError::InvalidOrmSchema)?; - let mut orm_changes: OrmChanges = HashMap::new(); + return self.process_changes_for_subject_and_shape( + root_shape, + tracked_subjects, + schema, + triples_added, + triples_removed, + nuri, + None, + ); + } + + fn process_changes_for_subject_and_shape<'a>( + self: &mut Self, + root_shape: &OrmSchemaShape, + tracked_subjects: &mut HashMap>, + schema: &OrmSchema, + triples_added: &'a [Triple], + triples_removed: &'a [Triple], + nuri: &String, + mut orm_changes: Option>, + ) -> Result, NgError> { + let mut orm_changes: OrmChanges<'a> = orm_changes.take().unwrap_or_else(HashMap::new); // First in, last out stack to keep track of objects to validate (nested objects first). Strings are object IRIs. let mut shape_validation_queue: Vec<(&OrmSchemaShape, Vec)> = vec![]; @@ -115,10 +138,11 @@ impl Verifier { .collect(); // Variable to collect nested objects that need validation. - // First string is shape IRI, second are object IRIs. - let mut nested_objects_to_validate: HashMap> = HashMap::new(); + let mut nested_objects_to_eval: HashMap> = + HashMap::new(); // For each subject, add/remove triples and validate. + for subject_iri in all_modified_subjects { let triples_added_for_subj = added_triples_by_subject .get(subject_iri) @@ -141,7 +165,7 @@ impl Verifier { // Apply all triples for that subject to the tracked (shape, subject) pair. // Record the changes. - if let Err(e) = add_remove_triples( + if let Err(e) = add_remove_triples_mut( shape, subject_iri, triples_added_for_subj, @@ -160,30 +184,59 @@ impl Verifier { }; // skip if missing // Validate the subject. - let (_new_validity, new_unknowns) = + let need_eval = update_subject_validity(change, shape, tracked_subjects, tracked_subject.valid); - // TODO: Add logic to fetch un-fetched objects after validation. - - // We add the new_unknowns to be processed next - for (iri, schema_shape) in new_unknowns { + // We add the need_eval to be processed next after loop. + for (iri, schema_shape, needs_refetch) in need_eval { // Add to nested_objects_to_validate. - nested_objects_to_validate + nested_objects_to_eval .entry(schema_shape) .or_insert_with(Vec::new) - .push(iri.clone()); + .push((iri.clone(), needs_refetch)); } } - // Now, we add all objects that need re-evaluation to the queue. - for (shape_iri, objects) in nested_objects_to_validate { - if let Some(next_shape) = schema.get(&shape_iri) { - shape_validation_queue.push((next_shape, objects)); - } else { - log_err!("Schema missing for nested shape {}", shape_iri); - } + // Now, we fetch all un-fetched subjects for re-evaluation. + for (shape_iri, objects_to_eval) in &nested_objects_to_eval { + let objects_to_fetch = objects_to_eval + .iter() + .filter(|(_iri, needs_fetch)| *needs_fetch) + .map(|(s, _)| s.clone()) + .collect(); + let shape = schema + .get(shape_iri) + .ok_or(VerifierError::InvalidOrmSchema)?; + + // Create sparql query + let shape_query = + shape_type_to_sparql(&schema, &shape_iri, Some(objects_to_fetch))?; + let new_triples = self.query_sparql_construct(shape_query, Some(nuri.clone()))?; + orm_changes = self.process_changes_for_subject_and_shape( + shape, + tracked_subjects, + schema, + new_triples, + &vec![], + nuri, + Some(orm_changes), + )?; + } + // Now, add all subjects to the queue that did not need a fetch. + for (shape_iri, objects_to_eval) in &nested_objects_to_eval { + let objects_to_fetch = objects_to_eval + .iter() + .filter(|(_iri, needs_fetch)| !*needs_fetch) + .map(|(s, _)| s.clone()) + .collect(); + let shape = schema + .get(shape_iri) + .ok_or(VerifierError::InvalidOrmSchema)?; + + shape_validation_queue.push((shape, objects_to_fetch)); } } + Ok(orm_changes) } @@ -196,12 +249,18 @@ impl Verifier { ) -> Result, NgError> { // If we have a specific session, handle only that subscription. if let Some(session_id) = only_for_session_id { - if let Some((_, sub)) = self + if let Some((nuri, sub)) = self .orm_subscriptions .iter_mut() .find(|(_, s)| s.session_id == session_id) { - return Self::build_changes_for_subscription(sub, triples_added, triples_removed); + // TODO: Is repo correct? + return self.process_changes_for_subscription( + sub, + triples_added, + triples_removed, + &nuri.repo(), + ); } else { return Ok(HashMap::new()); } @@ -215,8 +274,12 @@ impl Verifier { continue; } } - let changes = - Self::build_changes_for_subscription(sub, triples_added, triples_removed)?; + let changes = self.process_changes_for_subscription( + sub, + triples_added, + triples_removed, + &nuri.repo(), // TODO: Is this correct? + )?; for (shape_iri, subj_map) in changes { merged .entry(shape_iri) @@ -405,34 +468,35 @@ impl Verifier { pub(crate) async fn push_orm_response( &mut self, - scope: &NuriV0, - schema_iri: &String, + subscription: &OrmSubscription, response: AppResponse, ) { - log_info!( - "push_orm_response {:?} {} {:?}", - scope, - schema_iri, - self.orm_subscriptions + log_debug!( + "sending orm response for session {}:\n{:?}", + subscription.session_id, + &response ); - if let Some(shapes) = self.orm_subscriptions.get_mut(scope) { - if let Some(sessions) = shapes.get_mut(schema_iri) { - let mut sessions_to_close: Vec = vec![]; - for (session_id, subscription) in sessions.iter_mut() { - if subscription.sender.is_closed() { - log_debug!("closed so removing session {}", session_id); - sessions_to_close.push(*session_id); - } else { - let _ = subscription.sender.send(response.clone()).await; - } - } - for session_id in sessions_to_close.iter() { - sessions.remove(session_id); - } - } + + if subscription.sender.is_closed() { + log_debug!("closed so removing session {}", subscription.session_id); + + self.orm_subscriptions.remove(&subscription.nuri); + } else { + subscription.sender.clone().send(response); } } + pub(crate) fn clean_orm_subscriptions(&mut self) { + let subscriptions = self.orm_subscriptions; + + // TODO: Ownership issues. + // for (nuri, subscription) in subscriptions { + // if subscription.sender.is_closed() { + // subscriptions.remove(nuri); + // } + // } + } + pub(crate) async fn start_orm( &mut self, nuri: &NuriV0, @@ -457,14 +521,15 @@ impl Verifier { .insert(nuri.clone(), orm_subscription); // Query triples for this shape - let shape_query = - sparql_construct_from_orm_shape_type(&shape_type.schema, &shape_type.shape)?; + let shape_query = shape_type_to_sparql(&shape_type.schema, &shape_type.shape, None)?; // TODO: How to stringify nuri correctly? - let shape_triples = self.sparql_construct(shape_query, Some(nuri.repo()))?; + let shape_triples = self.query_sparql_construct(shape_query, Some(nuri.repo()))?; // Create objects from queried triples. let subscription_ref = self.orm_subscriptions.get(nuri).unwrap(); - let _orm_objects = self.create_orm_from_triples(subscription_ref, &shape_triples)?; // placeholder call; TODO integrate response + let _orm_objects = self.create_orm_from_triples(subscription_ref, &shape_triples)?; + + // TODO integrate response //self.push_orm_response().await; (only for requester, not all sessions) @@ -513,9 +578,10 @@ fn literal_to_sparql_str(var: OrmSchemaDataType) -> Vec { } } -pub fn sparql_construct_from_orm_shape_type( +pub fn shape_type_to_sparql( schema: &OrmSchema, shape: &ShapeIri, + filter_subjects: Option>, ) -> Result { // Use a counter to generate unique variable names. let mut var_counter = 0; @@ -530,7 +596,6 @@ pub fn sparql_construct_from_orm_shape_type( let mut where_statements = Vec::new(); // Keep track of visited shapes while recursing to prevent infinite loops. - // TODO: Update type let mut visited_shapes: HashSet = HashSet::new(); // Recursive function to call for (nested) shapes. @@ -666,9 +731,21 @@ pub fn sparql_construct_from_orm_shape_type( &mut visited_shapes, ); + // Filter subjects, if present. + if let Some(subjects) = filter_subjects { + let subjects_str = subjects + .iter() + .map(|s| format!("<{}>", s)) + .collect::>() + .join(", "); + where_statements.push(format!(" FILTER (?s0 IN ({})", subjects_str)); + } + // Create query from statements. let construct_body = construct_statements.join(" .\n"); + let where_body = where_statements.join(" .\n"); + Ok(format!( "CONSTRUCT {{\n{}\n}}\nWHERE {{\n{}\n}}", construct_body, where_body diff --git a/ng-verifier/src/utils/orm_validation.rs b/ng-verifier/src/utils/orm_validation.rs index efaa327..45a13e5 100644 --- a/ng-verifier/src/utils/orm_validation.rs +++ b/ng-verifier/src/utils/orm_validation.rs @@ -58,10 +58,8 @@ pub fn group_by_subject_for_shape<'a>( /// Add all triples to `subject_changes` /// Returns predicates to nested objects that were touched and need processing. /// Assumes all triples have same subject. -use std::sync::Arc; - pub fn add_remove_triples_mut( - shape: &Arc, + shape: &OrmSchemaShape, subject_iri: &str, triples_added: &[&Triple], triples_removed: &[&Triple], @@ -90,9 +88,8 @@ pub fn add_remove_triples_mut( let tracked_subject = get_or_create_tracked_subject(subject_iri, &shape.iri, tracked_subjects); // Process added triples. - // For each triple, check matching predicates in shape. - // keeping track of value count (for later validations). - // In parallel, we keep track of the values added (tracked_changes) + // For each triple, check if it matches the shape. + // In parallel, we record the values added and removed (tracked_changes) for triple in triples_added { for predicate_schema in &shape.predicates { if predicate_schema.iri != triple.predicate.as_str() { @@ -232,22 +229,17 @@ pub fn update_subject_validity<'a>( shape: &OrmSchemaShape, tracked_subjects: &HashMap>>, previous_validity: OrmTrackedSubjectValidity, -) -> ( - OrmTrackedSubjectValidity, - // Vec - Vec<(String, String)>, -) { +) -> Vec<(String, String, bool)> { let Some(tracked_shapes) = tracked_subjects.get_mut(subject_iri) else { - return (previous_validity, vec![]); + return vec![]; }; let Some(tracked_subject) = tracked_shapes.get_mut(&shape.iri) else { - return (previous_validity, vec![]); + return vec![]; }; + // Keep track of objects that need to be validated against a shape to fetch and validate. + let mut need_evaluation: Vec<(String, String, bool)> = vec![]; - // Check 1) Check if we need to fetch this object. - // If this subject has not been monitored but parents are now valid or need evaluation, we fetch. - - // Check 2) If all parents are untracked, return untracked. + // Check 1) Check if we need to fetch this object or all parents are untracked. if tracked_subject.parents.len() != 0 { let no_parents_tracking = tracked_subject.parents.values().all(|parent| { parent.valid == OrmTrackedSubjectValidity::Untracked @@ -258,19 +250,20 @@ pub fn update_subject_validity<'a>( // Remove tracked predicates and set untracked. tracked_subject.tracked_predicates = HashMap::new(); tracked_subject.valid = OrmTrackedSubjectValidity::Untracked; - return (OrmTrackedSubjectValidity::Untracked, vec![]); + return vec![]; } else if !no_parents_tracking && previous_validity == OrmTrackedSubjectValidity::Untracked { // We need to fetch the subject's current state: // We have new parents but were previously not recording changes. + return vec![(s_change.subject_iri,)]; // TODO } } // Check 2) If there are no changes, there is nothing to do. if s_change.predicates.is_empty() { - return (previous_validity, vec![]); + return vec![]; } let mut new_validity = OrmTrackedSubjectValidity::Valid; @@ -288,12 +281,9 @@ pub fn update_subject_validity<'a>( // Remove tracked predicates and set invalid. tracked_subject.tracked_predicates = HashMap::new(); tracked_subject.valid = OrmTrackedSubjectValidity::Invalid; - return (OrmTrackedSubjectValidity::Invalid, vec![]); + return vec![]; } - // Keep track of objects that need to be validated against a shape to fetch and validate. - let mut need_evaluation: Vec<(String, String)> = vec![]; - // Check 4) Validate subject against each predicate in shape. for p_schema in shape.predicates.iter() { let p_change = s_change.predicates.get(&p_schema.iri); @@ -484,32 +474,26 @@ pub fn update_subject_validity<'a>( && previous_validity != OrmTrackedSubjectValidity::Valid { // If this subject became valid, we need to refetch this subject; - // For that we prepend self to needs_fetch. - need_evaluation.insert(0, (s_change.subject_iri, shape.iri.clone())); + // We fetch + need_evaluation.insert(0, (s_change.subject_iri, shape.iri.clone(), true)); } // If validity changed, parents need to be re-evaluated. if new_validity != previous_validity { // We return the tracking parents which need re-evaluation. // Remember that the last elements (i.e. children or needs_fetch) are evaluated first. - return ( - new_validity, - // Add parents and objects in `need_evaluation`. - tracked_subject - .parents - .values() - // Inform tracking parents only. - .filter(|(parent, is_tracked)| *is_tracked) - .map(|(parent, is_tracked)| (&parent.subject_iri, parent.shape)) - // Add `need_evaluation`. - .chain(need_evaluation) - .collect(), - ); + return tracked_subject + .parents + .values() + .map(|parent| (&parent.subject_iri, parent.shape, false)) + // Add `need_evaluation`. + .chain(need_evaluation) + .collect(); } tracked_subject.valid = new_validity; - return (new_validity, need_evaluation); + return need_evaluation; } fn oxrdf_term_to_orm_basic_type(term: &ng_oxigraph::oxrdf::Term) -> BasicType {