fix: allow multiple subscriptions per nuri

feat/orm
Laurin Weger 2 weeks ago
parent 6ed1359887
commit 3d8e4af2ee
No known key found for this signature in database
GPG Key ID: 9B372BB0B792770F
  1. 247
      ng-verifier/src/orm.rs
  2. 175
      ng-verifier/src/utils/orm_validation.rs
  3. 2
      ng-verifier/src/verifier.rs

@ -8,16 +8,18 @@
// according to those terms. // according to those terms.
use futures::channel::mpsc; use futures::channel::mpsc;
use ng_net::actors::app::session;
use std::collections::HashMap; use std::collections::HashMap;
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::Arc; use std::sync::Arc;
use std::u64;
use futures::SinkExt; use futures::SinkExt;
use lazy_static::lazy_static; use lazy_static::lazy_static;
pub use ng_net::orm::{OrmDiff, OrmShapeType};
use ng_net::utils::Receiver; use ng_net::utils::Receiver;
use ng_net::{app_protocol::*, orm::*}; use ng_net::{app_protocol::*, orm::*};
pub use ng_net::orm::{OrmDiff, OrmShapeType};
use ng_oxigraph::oxigraph::sparql::{Query, QueryResults}; use ng_oxigraph::oxigraph::sparql::{Query, QueryResults};
use ng_oxigraph::oxrdf::Triple; use ng_oxigraph::oxrdf::Triple;
use ng_repo::errors::NgError; use ng_repo::errors::NgError;
@ -77,36 +79,53 @@ impl Verifier {
} }
} }
fn process_changes_for_subscription( /// Helper to call process_changes_for_shape for all subscriptions on nuri's document.
fn process_changes_for_nuri_and_session(
self: &mut Self, self: &mut Self,
nuri: &NuriV0, nuri: &NuriV0,
session_id: u64,
triples_added: &[Triple], triples_added: &[Triple],
triples_removed: &[Triple] triples_removed: &[Triple],
) -> Result<OrmChanges, NgError> { ) -> Result<OrmChanges, NgError> {
let orm_subscription = self.orm_subscriptions.get(nuri).unwrap();
//let tracked_subjects = orm_subscription.tracked_subjects;
let schema = &orm_subscription.shape_type.schema;
let root_shape = schema
.get(&orm_subscription.shape_type.shape)
.ok_or(VerifierError::InvalidOrmSchema)?;
let mut orm_changes = HashMap::new(); let mut orm_changes = HashMap::new();
self.process_changes_for_subject_and_shape(
root_shape.clone(), let shapes: Vec<_> = self
triples_added, .orm_subscriptions
triples_removed, .get(nuri)
nuri, .unwrap()
&mut orm_changes, .iter()
)?; .map(|s| {
s.shape_type
.schema
.get(&s.shape_type.shape)
.unwrap()
.clone()
})
.collect();
for root_shape in shapes {
self.process_changes_for_shape_and_session(
nuri,
root_shape,
session_id,
triples_added,
triples_removed,
&mut orm_changes,
)?;
}
Ok(orm_changes) Ok(orm_changes)
} }
fn process_changes_for_subject_and_shape( /// Add and remove the triples from the tracked subjects,
/// re-validate, and update `changes` containing the updated data.
fn process_changes_for_shape_and_session(
self: &mut Self, self: &mut Self,
nuri: &NuriV0,
root_shape: Arc<OrmSchemaShape>, root_shape: Arc<OrmSchemaShape>,
session_id: u64,
triples_added: &[Triple], triples_added: &[Triple],
triples_removed: &[Triple], triples_removed: &[Triple],
nuri: &NuriV0,
orm_changes: &mut OrmChanges, orm_changes: &mut OrmChanges,
) -> Result<(), NgError> { ) -> Result<(), NgError> {
let nuri_repo = nuri.repo(); let nuri_repo = nuri.repo();
@ -161,7 +180,14 @@ impl Verifier {
// Apply all triples for that subject to the tracked (shape, subject) pair. // Apply all triples for that subject to the tracked (shape, subject) pair.
// Record the changes. // Record the changes.
{ {
let orm_subscription = self.orm_subscriptions.get_mut(nuri).unwrap(); let orm_subscription = self
.orm_subscriptions
.get_mut(nuri)
.unwrap()
.iter()
.find(|s| s.session_id == session_id && s.shape_type.shape == shape.iri)
.unwrap();
if let Err(e) = add_remove_triples_mut( if let Err(e) = add_remove_triples_mut(
shape.clone(), shape.clone(),
subject_iri, subject_iri,
@ -172,10 +198,10 @@ impl Verifier {
) { ) {
log_err!("apply_changes_from_triples add/remove error: {:?}", e); log_err!("apply_changes_from_triples add/remove error: {:?}", e);
} }
let validity = { let validity = {
let tracked_subject_opt = orm_subscription.tracked_subjects let tracked_subject_opt = orm_subscription
.tracked_subjects
.get(subject_iri) .get(subject_iri)
.and_then(|m| m.get(&shape.iri)); .and_then(|m| m.get(&shape.iri));
let Some(tracked_subject) = tracked_subject_opt else { let Some(tracked_subject) = tracked_subject_opt else {
@ -183,9 +209,14 @@ impl Verifier {
}; // skip if missing }; // skip if missing
tracked_subject.valid.clone() tracked_subject.valid.clone()
}; };
// Validate the subject. // Validate the subject.
let need_eval = let need_eval = update_subject_validity(
update_subject_validity(change, &shape, &mut orm_subscription.tracked_subjects, validity); change,
&shape,
&mut orm_subscription.tracked_subjects,
validity,
);
// We add the need_eval to be processed next after loop. // We add the need_eval to be processed next after loop.
for (iri, schema_shape, needs_refetch) in need_eval { for (iri, schema_shape, needs_refetch) in need_eval {
@ -198,9 +229,6 @@ impl Verifier {
} }
} }
let orm_subscription = self.orm_subscriptions.get(nuri).unwrap();
let schema = &orm_subscription.shape_type.schema;
// Now, we fetch all un-fetched subjects for re-evaluation. // Now, we fetch all un-fetched subjects for re-evaluation.
for (shape_iri, objects_to_eval) in &nested_objects_to_eval { for (shape_iri, objects_to_eval) in &nested_objects_to_eval {
let objects_to_fetch = objects_to_eval let objects_to_fetch = objects_to_eval
@ -208,25 +236,35 @@ impl Verifier {
.filter(|(_iri, needs_fetch)| *needs_fetch) .filter(|(_iri, needs_fetch)| *needs_fetch)
.map(|(s, _)| s.clone()) .map(|(s, _)| s.clone())
.collect(); .collect();
let shape = schema
let orm_subscription = self
.get_orm_subscriptions_for(nuri, Some(&shape.iri), Some(&session_id))
.get(0)
.unwrap();
let schema = &orm_subscription.shape_type.schema;
let shape: &Arc<OrmSchemaShape> = schema
.get(shape_iri) .get(shape_iri)
.ok_or(VerifierError::InvalidOrmSchema)?; .ok_or(VerifierError::InvalidOrmSchema)?;
// Create sparql query // Create sparql query
let shape_query = let shape_query =
shape_type_to_sparql(&schema, &shape_iri, Some(objects_to_fetch))?; shape_type_to_sparql(&schema, &shape_iri, Some(objects_to_fetch))?;
let new_triples = self.query_sparql_construct(shape_query, Some(nuri_repo.clone()))?; let new_triples =
// self.process_changes_for_subject_and_shape( self.query_sparql_construct(shape_query, Some(nuri_repo.clone()))?;
// shape.clone(),
// &new_triples, self.process_changes_for_shape_and_session(
// &vec![], nuri,
// nuri, shape.clone(),
// orm_changes, session_id,
// )?; &new_triples,
} &vec![],
// Now, add all subjects to the queue that did not need a fetch. orm_changes,
for (shape_iri, objects_to_eval) in &nested_objects_to_eval { )?;
let objects_to_fetch = objects_to_eval
// @Niko, if I put this in the same loop, I get borrow conflicts
let objects_not_to_fetch = objects_to_eval
.iter() .iter()
.filter(|(_iri, needs_fetch)| !*needs_fetch) .filter(|(_iri, needs_fetch)| !*needs_fetch)
.map(|(s, _)| s.clone()) .map(|(s, _)| s.clone())
@ -235,54 +273,70 @@ impl Verifier {
.get(shape_iri) .get(shape_iri)
.ok_or(VerifierError::InvalidOrmSchema)?; .ok_or(VerifierError::InvalidOrmSchema)?;
shape_validation_queue.push((shape.clone(), objects_to_fetch)); shape_validation_queue.push((shape.clone(), objects_not_to_fetch));
} }
} }
Ok(()) Ok(())
} }
/// Helper to get orm subscriptions for nuri, shapes and sessions.
fn get_orm_subscriptions_for(
&self,
nuri: &NuriV0,
shape: Option<&ShapeIri>,
session_id: Option<&u64>,
) -> Vec<&OrmSubscription> {
self.orm_subscriptions.get(nuri).unwrap().
// Filter shapes, if present.
iter().filter(|s| match shape {
Some(sh) => *sh == s.shape_type.shape,
None => true
// Filter session ids if present.
}).filter(|s| match session_id {
Some(id) => *id == s.session_id,
None => true
}).collect()
}
/// Apply triples to a nuri's document.
/// Updates tracked_subjects in orm_subscriptions.
fn apply_triple_changes( fn apply_triple_changes(
&mut self, &mut self,
triples_added: &[Triple], triples_added: &[Triple],
triples_removed: &[Triple], triples_removed: &[Triple],
only_for_nuri: Option<&NuriV0>, nuri: &NuriV0,
only_for_session_id: Option<u64>, only_for_session_id: Option<u64>,
) -> Result<OrmChanges, NgError> { ) -> Result<OrmChanges, NgError> {
// If we have a specific session, handle only that subscription. // If we have a specific session, handle only that subscription.
if let Some(session_id) = only_for_session_id { if let Some(session_id) = only_for_session_id {
if let Some((nuri, sub)) = self return self.process_changes_for_nuri_and_session(
.orm_subscriptions &nuri.clone(),
.iter() session_id,
.find(|(_, s)| s.session_id == session_id) triples_added,
{ triples_removed,
// TODO: Is repo correct? );
return self.process_changes_for_subscription(
&nuri.clone(),
triples_added,
triples_removed
);
} else {
return Ok(HashMap::new());
}
} }
// Otherwise, iterate all (optionally filter by nuri) and merge. // Otherwise, iterate all sessions.
let mut merged: OrmChanges = HashMap::new(); let mut merged: OrmChanges = HashMap::new();
for nuri in self.orm_subscriptions.iter().filter(|(nuri,_)|{
if let Some(filter_nuri) = only_for_nuri { let session_ids: Vec<_> = self
if *nuri != filter_nuri { .orm_subscriptions
return false; .get(nuri)
} .unwrap()
} .iter()
true .map(|s| s.session_id.clone())
}).map(|(nuri,_)| nuri.clone()).collect::<Vec<_>>() { .collect();
let changes = self.process_changes_for_subscription( for session_id in session_ids {
let changes = self.process_changes_for_nuri_and_session(
&nuri, &nuri,
session_id,
triples_added, triples_added,
triples_removed, triples_removed,
)?; )?;
for (shape_iri, subj_map) in changes { for (shape_iri, subj_map) in changes {
merged merged
.entry(shape_iri) .entry(shape_iri)
@ -293,6 +347,7 @@ impl Verifier {
Ok(merged) Ok(merged)
} }
/// Create ORM JSON object from OrmTrackedSubjectChange and shape.
fn materialize_orm_object( fn materialize_orm_object(
change: &OrmTrackedSubjectChange, change: &OrmTrackedSubjectChange,
changes: &OrmChanges, changes: &OrmChanges,
@ -414,23 +469,29 @@ impl Verifier {
return orm_obj; return orm_obj;
} }
fn create_orm_from_triples( /// For a nuri, session, and shape, create an ORM JSON object.
fn create_orm_object_for_shape(
&mut self, &mut self,
nuri: &NuriV0, nuri: &NuriV0,
triples: &[Triple], session_id: u64,
shape_type: &OrmShapeType,
) -> Result<Value, NgError> { ) -> Result<Value, NgError> {
let session_id = { // Query triples for this shape
let orm_subscription = self.orm_subscriptions.get(nuri).unwrap(); let shape_query = shape_type_to_sparql(&shape_type.schema, &shape_type.shape, None)?;
orm_subscription.session_id // TODO: How to stringify nuri correctly?
}; let shape_triples = self.query_sparql_construct(shape_query, Some(nuri.repo()))?;
let changes: OrmChanges = let changes: OrmChanges =
self.apply_triple_changes(triples, &[], None, Some(session_id))?; self.apply_triple_changes(&shape_triples, &[], nuri, Some(session_id.clone()))?;
let orm_subscription = *self
.get_orm_subscriptions_for(nuri, Some(&shape_type.shape), Some(&session_id))
.get(0)
.unwrap();
let orm_subscription = self.orm_subscriptions.get(nuri).unwrap();
let root_shape_iri = &orm_subscription.shape_type.shape;
let schema = &orm_subscription.shape_type.schema; let schema = &orm_subscription.shape_type.schema;
let root_shape = schema let root_shape = schema
.get(root_shape_iri) .get(&shape_type.shape)
.ok_or(VerifierError::InvalidOrmSchema)?; .ok_or(VerifierError::InvalidOrmSchema)?;
let Some(_root_changes) = changes.get(&root_shape.iri).map(|s| s.values()) else { let Some(_root_changes) = changes.get(&root_shape.iri).map(|s| s.values()) else {
return Ok(Value::Array(vec![])); return Ok(Value::Array(vec![]));
@ -442,11 +503,11 @@ impl Verifier {
// For each valid change struct, we build an orm object. // For each valid change struct, we build an orm object.
// The way we get the changes from the tracked subjects is a bit hacky, sorry. // The way we get the changes from the tracked subjects is a bit hacky, sorry.
for (subject_iri, tracked_subjects_by_shape) in &orm_subscription.tracked_subjects { for (subject_iri, tracked_subjects_by_shape) in &orm_subscription.tracked_subjects {
if let Some(tracked_subject) = tracked_subjects_by_shape.get(root_shape_iri) { if let Some(tracked_subject) = tracked_subjects_by_shape.get(&shape_type.shape) {
if tracked_subject.valid == OrmTrackedSubjectValidity::Valid { if tracked_subject.valid == OrmTrackedSubjectValidity::Valid {
if let Some(change) = changes if let Some(change) = changes
.get(root_shape_iri) .get(&shape_type.shape)
.and_then(|subject_iri_to_ts| subject_iri_to_ts.get(subject_iri)) .and_then(|subject_iri_to_ts| subject_iri_to_ts.get(subject_iri).clone())
{ {
let new_val = Self::materialize_orm_object( let new_val = Self::materialize_orm_object(
change, change,
@ -463,7 +524,7 @@ impl Verifier {
return Ok(return_vals); return Ok(return_vals);
} }
pub(crate) async fn orm_update(&mut self, scope: &NuriV0, patch: GraphQuadsPatch) {} pub(crate) async fn orm_update(&mut self, scope: &NuriV0, patch: GraphTransaction) {}
pub(crate) async fn orm_frontend_update( pub(crate) async fn orm_frontend_update(
&mut self, &mut self,
@ -495,12 +556,14 @@ impl Verifier {
} }
pub(crate) fn clean_orm_subscriptions(&mut self) { pub(crate) fn clean_orm_subscriptions(&mut self) {
self.orm_subscriptions.retain(|_, subscriptions| {
self.orm_subscriptions.retain(|_,subscription| subscriptions.retain(|sub| !sub.sender.is_closed());
!subscription.sender.is_closed() !subscriptions.is_empty()
); });
} }
/// Entry point to create a new orm subscription.
/// Triggers the creation of an orm object which is sent back to the receiver.
pub(crate) async fn start_orm( pub(crate) async fn start_orm(
&mut self, &mut self,
nuri: &NuriV0, nuri: &NuriV0,
@ -522,15 +585,11 @@ impl Verifier {
nuri: nuri.clone(), nuri: nuri.clone(),
}; };
self.orm_subscriptions self.orm_subscriptions
.insert(nuri.clone(), orm_subscription); .entry(nuri.clone())
.or_insert(vec![])
// Query triples for this shape .push(orm_subscription);
let shape_query = shape_type_to_sparql(&shape_type.schema, &shape_type.shape, None)?;
// TODO: How to stringify nuri correctly?
let shape_triples = self.query_sparql_construct(shape_query, Some(nuri.repo()))?;
// Create objects from queried triples. let orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type);
let _orm_objects = self.create_orm_from_triples(nuri, &shape_triples)?;
// TODO integrate response // TODO integrate response

@ -9,8 +9,8 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::Weak;
use std::sync::Arc; use std::sync::Arc;
use std::sync::Weak;
use ng_net::orm::*; use ng_net::orm::*;
use ng_oxigraph::oxrdf::Subject; use ng_oxigraph::oxrdf::Subject;
@ -23,11 +23,8 @@ pub fn group_by_subject_for_shape<'a>(
allowed_subjects: &[String], allowed_subjects: &[String],
) -> HashMap<String, Vec<&'a Triple>> { ) -> HashMap<String, Vec<&'a Triple>> {
let mut triples_by_subject: HashMap<String, Vec<&Triple>> = HashMap::new(); let mut triples_by_subject: HashMap<String, Vec<&Triple>> = HashMap::new();
let allowed_preds_set: HashSet<&str> = shape let allowed_preds_set: HashSet<&str> =
.predicates shape.predicates.iter().map(|p| p.iri.as_str()).collect();
.iter()
.map(|p| p.iri.as_str())
.collect();
let allowed_subject_set: HashSet<&str> = allowed_subjects.iter().map(|s| s.as_str()).collect(); let allowed_subject_set: HashSet<&str> = allowed_subjects.iter().map(|s| s.as_str()).collect();
for triple in triples { for triple in triples {
// triple.subject must be in allowed_subjects (or allowed_subjects empty) // triple.subject must be in allowed_subjects (or allowed_subjects empty)
@ -62,30 +59,32 @@ pub fn add_remove_triples_mut(
tracked_subjects: &mut HashMap<String, HashMap<String, Arc<OrmTrackedSubject>>>, tracked_subjects: &mut HashMap<String, HashMap<String, Arc<OrmTrackedSubject>>>,
subject_changes: &mut OrmTrackedSubjectChange, subject_changes: &mut OrmTrackedSubjectChange,
) -> Result<(), NgError> { ) -> Result<(), NgError> {
fn get_or_create_tracked_subject<'a> ( fn get_or_create_tracked_subject<'a>(
subject_iri: &str, subject_iri: &str,
shape: &Arc<OrmSchemaShape>, shape: &Arc<OrmSchemaShape>,
tracked_subjects: &'a mut HashMap<String, HashMap<String, Arc<OrmTrackedSubject>>>) tracked_subjects: &'a mut HashMap<String, HashMap<String, Arc<OrmTrackedSubject>>>,
-> (&'a mut OrmTrackedSubject, Weak<OrmTrackedSubject>) ) -> (&'a mut OrmTrackedSubject, Weak<OrmTrackedSubject>) {
{
let tracked_shapes_for_subject = tracked_subjects let tracked_shapes_for_subject = tracked_subjects
.entry(subject_iri.to_string()) .entry(subject_iri.to_string())
.or_insert_with(HashMap::new); .or_insert_with(HashMap::new);
let subject = tracked_shapes_for_subject let subject = tracked_shapes_for_subject
.entry(shape.iri.clone()) .entry(shape.iri.clone())
.or_insert_with(|| Arc::new(OrmTrackedSubject { .or_insert_with(|| {
tracked_predicates: HashMap::new(), Arc::new(OrmTrackedSubject {
parents: HashMap::new(), tracked_predicates: HashMap::new(),
valid: ng_net::orm::OrmTrackedSubjectValidity::Pending, parents: HashMap::new(),
subject_iri: subject_iri.to_string(), valid: ng_net::orm::OrmTrackedSubjectValidity::Pending,
shape: shape.clone(), subject_iri: subject_iri.to_string(),
})); shape: shape.clone(),
})
});
let weak = Arc::downgrade(&subject); let weak = Arc::downgrade(&subject);
(Arc::get_mut(subject).unwrap(), weak) (Arc::get_mut(subject).unwrap(), weak)
} }
let (_, tracked_subject_weak) = get_or_create_tracked_subject(subject_iri, &shape, tracked_subjects); let (_, tracked_subject_weak) =
get_or_create_tracked_subject(subject_iri, &shape, tracked_subjects);
// Process added triples. // Process added triples.
// For each triple, check if it matches the shape. // For each triple, check if it matches the shape.
@ -101,17 +100,19 @@ pub fn add_remove_triples_mut(
let mut upgraded = tracked_subject_weak.upgrade().unwrap(); let mut upgraded = tracked_subject_weak.upgrade().unwrap();
let tracked_subject = Arc::get_mut(&mut upgraded).unwrap(); let tracked_subject = Arc::get_mut(&mut upgraded).unwrap();
// Add tracked predicate or increase cardinality // Add tracked predicate or increase cardinality
let _tracked_predicate = tracked_subject let tracked_predicate_ = tracked_subject
.tracked_predicates .tracked_predicates
.entry(predicate_schema.iri.to_string()) .entry(predicate_schema.iri.to_string())
.or_insert_with(|| Arc::new(OrmTrackedPredicate { .or_insert_with(|| {
current_cardinality: 0, Arc::new(OrmTrackedPredicate {
schema: predicate_schema.clone(), current_cardinality: 0,
tracked_children: Vec::new(), schema: predicate_schema.clone(),
current_literals: None, tracked_children: Vec::new(),
})); current_literals: None,
let tracked_predicate_weak = Arc::downgrade(&_tracked_predicate); })
let tracked_predicate = Arc::get_mut(_tracked_predicate).unwrap(); });
let tracked_predicate_weak = Arc::downgrade(&tracked_predicate_);
let tracked_predicate = Arc::get_mut(tracked_predicate_).unwrap();
tracked_predicate.current_cardinality += 1; tracked_predicate.current_cardinality += 1;
let obj_term = oxrdf_term_to_orm_basic_type(&triple.object); let obj_term = oxrdf_term_to_orm_basic_type(&triple.object);
@ -146,25 +147,30 @@ pub fn add_remove_triples_mut(
// If predicate is of type shape, register (parent -> child) links so that // If predicate is of type shape, register (parent -> child) links so that
// nested subjects can later be (lazily) fetched / validated. // nested subjects can later be (lazily) fetched / validated.
// FIXME : shape_iri is never used // FIXME : shape_iri is never used
for shape_iri in predicate_schema for shape_iri in predicate_schema.dataTypes.iter().filter_map(|dt| {
.dataTypes if dt.valType == OrmSchemaLiteralType::shape {
.iter() dt.shape.clone()
.filter_map(|dt| if dt.valType == OrmSchemaLiteralType::shape {dt.shape.clone()} else {None} ) } else {
{ None
}
}) {
if let BasicType::Str(obj_iri) = &obj_term { if let BasicType::Str(obj_iri) = &obj_term {
// Get or create object's tracked subject struct. // Get or create object's tracked subject struct.
let (tracked_child, tracked_child_weak) = let (tracked_child, tracked_child_weak) = get_or_create_tracked_subject(
get_or_create_tracked_subject(triple.predicate.as_string(), &shape, tracked_subjects); triple.predicate.as_string(),
&shape,
tracked_subjects,
);
// Add self to parent (set tracked to true, preliminary). // Add self to parent (set tracked to true, preliminary).
tracked_child.parents.insert(obj_iri.clone(), tracked_child_weak.clone()); tracked_child
.parents
.insert(obj_iri.clone(), tracked_child_weak.clone());
// Add link to children // Add link to children
let mut upgraded = tracked_predicate_weak.upgrade().unwrap(); let mut upgraded = tracked_predicate_weak.upgrade().unwrap();
let tracked_predicate = Arc::get_mut(&mut upgraded).unwrap(); let tracked_predicate = Arc::get_mut(&mut upgraded).unwrap();
tracked_predicate tracked_predicate.tracked_children.push(tracked_child_weak);
.tracked_children
.push(tracked_child_weak);
} }
} }
} }
@ -177,7 +183,12 @@ pub fn add_remove_triples_mut(
let tracked_predicate_opt = tracked_subjects let tracked_predicate_opt = tracked_subjects
.get_mut(subject_iri) .get_mut(subject_iri)
.and_then(|tss| tss.get_mut(&shape.iri)) .and_then(|tss| tss.get_mut(&shape.iri))
.and_then(|ts| Arc::get_mut(ts).unwrap().tracked_predicates.get_mut(pred_iri)); .and_then(|ts| {
Arc::get_mut(ts)
.unwrap()
.tracked_predicates
.get_mut(pred_iri)
});
let Some(tracked_predicate_arc) = tracked_predicate_opt else { let Some(tracked_predicate_arc) = tracked_predicate_opt else {
continue; continue;
}; };
@ -214,12 +225,13 @@ pub fn add_remove_triples_mut(
.any(|dt| dt.valType == OrmSchemaLiteralType::shape) .any(|dt| dt.valType == OrmSchemaLiteralType::shape)
{ {
// Remove parent from child and child from tracked children. // Remove parent from child and child from tracked children.
for shape_iri in tracked_predicate for shape_iri in tracked_predicate.schema.dataTypes.iter().filter_map(|dt| {
.schema if dt.valType == OrmSchemaLiteralType::shape {
.dataTypes dt.shape.clone()
.iter() } else {
.filter_map(|dt| if dt.valType == OrmSchemaLiteralType::shape {dt.shape.clone()} else {None} ) None
{ }
}) {
// Nested shape removal logic disabled (see note above). // Nested shape removal logic disabled (see note above).
} }
} }
@ -248,14 +260,17 @@ pub fn update_subject_validity(
// Check 1) Check if we need to fetch this object or all parents are untracked. // Check 1) Check if we need to fetch this object or all parents are untracked.
if tracked_subject.parents.len() != 0 { if tracked_subject.parents.len() != 0 {
let no_parents_tracking = tracked_subject.parents.values().all(|parent| { let no_parents_tracking =
match parent.upgrade() { tracked_subject
Some(subject) => .parents
subject.valid == OrmTrackedSubjectValidity::Untracked .values()
|| subject.valid == OrmTrackedSubjectValidity::Invalid, .all(|parent| match parent.upgrade() {
None => true Some(subject) => {
} subject.valid == OrmTrackedSubjectValidity::Untracked
}); || subject.valid == OrmTrackedSubjectValidity::Invalid
}
None => true,
});
if no_parents_tracking { if no_parents_tracking {
// Remove tracked predicates and set untracked. // Remove tracked predicates and set untracked.
@ -300,8 +315,9 @@ pub fn update_subject_validity(
let p_change = s_change.predicates.get(&p_schema.iri); let p_change = s_change.predicates.get(&p_schema.iri);
let tracked_pred = p_change.and_then(|pc| pc.tracked_predicate.upgrade()); let tracked_pred = p_change.and_then(|pc| pc.tracked_predicate.upgrade());
let count = let count = tracked_pred
tracked_pred.as_ref().map_or_else(|| 0, |tp| tp.current_cardinality); .as_ref()
.map_or_else(|| 0, |tp| tp.current_cardinality);
// Check 4.1) Cardinality // Check 4.1) Cardinality
if count < p_schema.minCardinality { if count < p_schema.minCardinality {
@ -345,10 +361,11 @@ pub fn update_subject_validity(
// Check that each required literal is present. // Check that each required literal is present.
for required_literal in required_literals { for required_literal in required_literals {
// Is tracked predicate present? // Is tracked predicate present?
if !tracked_pred.as_ref().map_or(false, if !tracked_pred.as_ref().map_or(false, |t| {
|t|t.current_literals.as_ref().map_or(false, t.current_literals.as_ref().map_or(false, |tt| {
|tt|tt.iter().any(|literal| *literal == *required_literal))) tt.iter().any(|literal| *literal == *required_literal)
{ })
}) {
return false; return false;
} }
} }
@ -365,12 +382,17 @@ pub fn update_subject_validity(
.any(|dt| dt.valType == OrmSchemaLiteralType::shape) .any(|dt| dt.valType == OrmSchemaLiteralType::shape)
{ {
// If we have a nested shape, we need to check if the nested objects are tracked and valid. // If we have a nested shape, we need to check if the nested objects are tracked and valid.
let tracked_children = tracked_pred.as_ref().map(|tp| let tracked_children = tracked_pred.as_ref().map(|tp| {
tp.tracked_children.iter().filter_map(|weak_tc| weak_tc.upgrade()).collect::<Vec<_>>() tp.tracked_children
); .iter()
.filter_map(|weak_tc| weak_tc.upgrade())
.collect::<Vec<_>>()
});
// First, Count valid, invalid, unknowns, and untracked // First, Count valid, invalid, unknowns, and untracked
let counts = tracked_children.as_ref() let counts = tracked_children.as_ref().map_or((0, 0, 0, 0), |children| {
.map_or((0,0,0,0),|children| children.iter().map(|tc| { children
.iter()
.map(|tc| {
if tc.valid == OrmTrackedSubjectValidity::Valid { if tc.valid == OrmTrackedSubjectValidity::Valid {
(1, 0, 0, 0) (1, 0, 0, 0)
} else if tc.valid == OrmTrackedSubjectValidity::Invalid { } else if tc.valid == OrmTrackedSubjectValidity::Invalid {
@ -383,9 +405,10 @@ pub fn update_subject_validity(
(0, 0, 0, 0) (0, 0, 0, 0)
} }
}) })
.fold((0, 0, 0, 0), |(v1, i1, u1, ut1), o| { .fold((0, 0, 0, 0), |(v1, i1, u1, ut1), o| {
(v1 + o.0, i1 + o.1, u1 + o.2, ut1 + o.3) (v1 + o.0, i1 + o.1, u1 + o.2, ut1 + o.3)
})); })
});
if counts.1 > 0 && p_schema.extra != Some(true) { if counts.1 > 0 && p_schema.extra != Some(true) {
// If we have at least one invalid nested object and no extra allowed, invalid. // If we have at least one invalid nested object and no extra allowed, invalid.
@ -401,8 +424,8 @@ pub fn update_subject_validity(
// After that we need to reevaluate this (subject,shape) again. // After that we need to reevaluate this (subject,shape) again.
need_evaluation.push((s_change.subject_iri.to_string(), shape.iri.clone(), false)); need_evaluation.push((s_change.subject_iri.to_string(), shape.iri.clone(), false));
// Also schedule untracked children for fetching and validation. // Also schedule untracked children for fetching and validation.
tracked_children.as_ref().map(|children| tracked_children.as_ref().map(|children| {
for child in children { for child in children {
if child.valid == OrmTrackedSubjectValidity::Untracked { if child.valid == OrmTrackedSubjectValidity::Untracked {
need_evaluation.push(( need_evaluation.push((
child.subject_iri.clone(), child.subject_iri.clone(),
@ -411,13 +434,13 @@ pub fn update_subject_validity(
)); ));
} }
} }
); });
} else if counts.2 > 0 { } else if counts.2 > 0 {
// If we have unknown nested objects, we need to wait for their evaluation. // If we have unknown nested objects, we need to wait for their evaluation.
set_validity(&mut new_validity, OrmTrackedSubjectValidity::Pending); set_validity(&mut new_validity, OrmTrackedSubjectValidity::Pending);
// Schedule unknown children (NotEvaluated) for re-evaluation without fetch. // Schedule unknown children (NotEvaluated) for re-evaluation without fetch.
tracked_children.as_ref().map(|children| tracked_children.as_ref().map(|children| {
for child in children { for child in children {
if child.valid == OrmTrackedSubjectValidity::Pending { if child.valid == OrmTrackedSubjectValidity::Pending {
need_evaluation.push(( need_evaluation.push((
child.subject_iri.clone(), child.subject_iri.clone(),
@ -426,7 +449,7 @@ pub fn update_subject_validity(
)); ));
} }
} }
); });
} else { } else {
// All nested objects are valid and cardinality is correct. // All nested objects are valid and cardinality is correct.
// We are valid with this predicate. // We are valid with this predicate.
@ -487,7 +510,11 @@ pub fn update_subject_validity(
return tracked_subject return tracked_subject
.parents .parents
.values() .values()
.filter_map(|parent| parent.upgrade().map(|parent| {(parent.subject_iri.clone(), parent.shape.iri.clone(), false)}) ) .filter_map(|parent| {
parent
.upgrade()
.map(|parent| (parent.subject_iri.clone(), parent.shape.iri.clone(), false))
})
// Add `need_evaluation`. // Add `need_evaluation`.
.chain(need_evaluation) .chain(need_evaluation)
.collect(); .collect();

@ -112,7 +112,7 @@ pub struct Verifier {
in_memory_outbox: Vec<EventOutboxStorage>, in_memory_outbox: Vec<EventOutboxStorage>,
uploads: BTreeMap<u32, RandomAccessFile>, uploads: BTreeMap<u32, RandomAccessFile>,
branch_subscriptions: HashMap<BranchId, Sender<AppResponse>>, branch_subscriptions: HashMap<BranchId, Sender<AppResponse>>,
pub(crate) orm_subscriptions: HashMap<NuriV0, OrmSubscription>, pub(crate) orm_subscriptions: HashMap<NuriV0, Vec<OrmSubscription>>,
pub(crate) temporary_repo_certificates: HashMap<RepoId, ObjectRef>, pub(crate) temporary_repo_certificates: HashMap<RepoId, ObjectRef>,
} }

Loading…
Cancel
Save