From 6d76e92d45ea86da5bd541ebdae4334f6ac5b0d4 Mon Sep 17 00:00:00 2001 From: Niko PLP Date: Mon, 6 Oct 2025 23:49:13 +0300 Subject: [PATCH] switching to RwLock instead of Arc --- nextgraph/src/tests/orm.rs | 7 + ng-repo/src/errors.rs | 1 + ng-verifier/src/orm/add_remove_triples.rs | 210 +++++++++++----------- ng-verifier/src/orm/mod.rs | 41 +++-- ng-verifier/src/orm/types.rs | 20 +-- ng-verifier/src/orm/validation.rs | 47 ++--- ng-verifier/src/verifier.rs | 2 +- 7 files changed, 165 insertions(+), 163 deletions(-) diff --git a/nextgraph/src/tests/orm.rs b/nextgraph/src/tests/orm.rs index 5286e89..dc55808 100644 --- a/nextgraph/src/tests/orm.rs +++ b/nextgraph/src/tests/orm.rs @@ -818,6 +818,11 @@ INSERT DATA { log_info!("orm_start called"); + // TODO: remove this call to cancel_fn() + cancel_fn(); + + // TODO : change the data with sparql_query + while let Some(app_response) = receiver.next().await { let orm_json = match app_response { AppResponse::V0(v) => match v { @@ -828,6 +833,8 @@ INSERT DATA { .unwrap(); log_info!("ORM JSON arrived\n: {:?}", orm_json); + + // TODO: after we got what we wanted, call cancel_fn, otherwise the test never ends. } // } diff --git a/ng-repo/src/errors.rs b/ng-repo/src/errors.rs index 186e3af..0b460b8 100644 --- a/ng-repo/src/errors.rs +++ b/ng-repo/src/errors.rs @@ -399,6 +399,7 @@ pub enum VerifierError { InvalidInboxPost, InvalidOrmSchema, OrmSubjectNotFound, + OrmPredicateNotFound, } impl Error for VerifierError {} diff --git a/ng-verifier/src/orm/add_remove_triples.rs b/ng-verifier/src/orm/add_remove_triples.rs index 090afd2..2f61b35 100644 --- a/ng-verifier/src/orm/add_remove_triples.rs +++ b/ng-verifier/src/orm/add_remove_triples.rs @@ -8,11 +8,9 @@ // according to those terms. use ng_oxigraph::oxrdf::Triple; -use ng_repo::errors::NgError; use ng_repo::errors::VerifierError; use std::collections::HashMap; -use std::sync::Arc; -use std::sync::Weak; +use std::sync::{Arc, RwLock}; use crate::orm::types::*; use ng_net::orm::*; @@ -26,29 +24,36 @@ pub fn add_remove_triples( subject_iri: &str, triples_added: &[&Triple], triples_removed: &[&Triple], - orm_subscription: &mut Arc, + orm_subscription: &mut OrmSubscription, subject_changes: &mut OrmTrackedSubjectChange, ) -> Result<(), VerifierError> { - fn get_tracked_subject( - subject_iri: &str, - shape: &Arc, - tracked_subjects: &HashMap>>, - ) -> Result, VerifierError> { - let tracked_shapes_for_subject = tracked_subjects - .get(&subject_iri.to_string()) - .ok_or(VerifierError::OrmSubjectNotFound)?; - let subject = tracked_shapes_for_subject - .get(&shape.iri) - .ok_or(VerifierError::OrmSubjectNotFound)?; - Ok(Arc::::downgrade(&subject)) - } + // fn get_tracked_predicate<'a>( + // subject_iri: &str, + // shape: &Arc, + // predicate_schema_iri: &String, + // tracked_subjects: &'a mut HashMap>>>, + // ) -> Result>, VerifierError> { + // let tracked_shapes_for_subject = tracked_subjects + // .get_mut(&subject_iri.to_string()) + // .ok_or(VerifierError::OrmSubjectNotFound)?; + // let subject = tracked_shapes_for_subject + // .get_mut(&shape.iri) + // .ok_or(VerifierError::OrmSubjectNotFound)?; + // Ok(subject + // .read() + // .unwrap() + // .tracked_predicates + // .get(predicate_schema_iri) + // .ok_or(VerifierError::OrmPredicateNotFound)? + // .clone()) + // } // Helper to get/create tracked subjects fn get_or_create_tracked_subject<'a>( subject_iri: &str, shape: &Arc, - tracked_subjects: &'a mut HashMap>>, - ) -> &'a mut Arc { + tracked_subjects: &'a mut HashMap>>>, + ) -> Arc> { let tracked_shapes_for_subject = tracked_subjects .entry(subject_iri.to_string()) .or_insert_with(HashMap::new); @@ -56,94 +61,80 @@ pub fn add_remove_triples( let subject = tracked_shapes_for_subject .entry(shape.iri.clone()) .or_insert_with(|| { - Arc::new(OrmTrackedSubject { + Arc::new(RwLock::new(OrmTrackedSubject { tracked_predicates: HashMap::new(), parents: HashMap::new(), valid: OrmTrackedSubjectValidity::Pending, subject_iri: subject_iri.to_string(), shape: shape.clone(), - }) + })) }); - //let strong = Arc::get_mut(subject).unwrap(); - // log_info!( - // "strong {} weak {}", - // Arc::::strong_count(&subject), - // Arc::::weak_count(&subject) - // ); - subject + subject.clone() } - // Destructure to get separate references and avoid borrowing conflicts - let orm_sub = Arc::get_mut(orm_subscription).unwrap(); - let schema = &orm_sub.shape_type.schema; - let tracked_subjects = &mut orm_sub.tracked_subjects; - - // log_info!( - // "strong {} weak {}", - // Arc::::strong_count(&tracked_subject_strong), - // Arc::::weak_count(&tracked_subject_strong) - // ); - // let tracked_subject_weak = Arc::::downgrade(&tracked_subject_strong); + let schema = &orm_subscription.shape_type.schema; + let tracked_subjects = &mut orm_subscription.tracked_subjects; // Process added triples. // For each triple, check if it matches the shape. // In parallel, we record the values added and removed (tracked_changes) for triple in triples_added { + let obj_term = oxrdf_term_to_orm_basic_type(&triple.object); + log_debug!("processing triple {triple}"); for predicate_schema in &shape.predicates { if predicate_schema.iri != triple.predicate.as_str() { // Triple does not match predicate. continue; } // Predicate schema constraint matches this triple. - - let mut tracked_subject_upgraded = + let tracked_subject_lock = get_or_create_tracked_subject(subject_iri, &shape, tracked_subjects); - let tracked_subject = Arc::get_mut(&mut tracked_subject_upgraded).unwrap(); + let mut tracked_subject = tracked_subject_lock.write().unwrap(); + log_debug!("lock acquired on tracked_subject"); // Add tracked predicate or increase cardinality - let tracked_predicate_ = tracked_subject + let tracked_predicate_lock = tracked_subject .tracked_predicates - .entry(predicate_schema.iri.to_string()) + .entry(predicate_schema.iri.clone()) .or_insert_with(|| { - Arc::new(OrmTrackedPredicate { + Arc::new(RwLock::new(OrmTrackedPredicate { current_cardinality: 0, schema: predicate_schema.clone(), tracked_children: Vec::new(), current_literals: None, - }) + })) }); - let tracked_predicate_weak = Arc::downgrade(&tracked_predicate_); - let tracked_predicate = Arc::get_mut(tracked_predicate_).unwrap(); - tracked_predicate.current_cardinality += 1; - - let obj_term = oxrdf_term_to_orm_basic_type(&triple.object); + { + let mut tracked_predicate = tracked_predicate_lock.write().unwrap(); + log_debug!("lock acquired on tracked_predicate"); + tracked_predicate.current_cardinality += 1; - // Keep track of the changed values too. - let pred_changes: &mut OrmTrackedPredicateChanges = subject_changes - .predicates - .entry(predicate_schema.iri.clone()) - .or_insert_with(|| OrmTrackedPredicateChanges { - tracked_predicate: tracked_predicate_weak.clone(), // reference remains inside lifetime of this call - values_added: Vec::new(), - values_removed: Vec::new(), - }); + // Keep track of the changed values too. + let pred_changes: &mut OrmTrackedPredicateChanges = subject_changes + .predicates + .entry(predicate_schema.iri.clone()) + .or_insert_with(|| OrmTrackedPredicateChanges { + tracked_predicate: tracked_predicate_lock.clone(), + values_added: Vec::new(), + values_removed: Vec::new(), + }); - pred_changes.values_added.push(obj_term.clone()); + pred_changes.values_added.push(obj_term.clone()); - // If value type is literal, we need to add the current value to the tracked predicate. - if tracked_predicate - .schema - .dataTypes - .iter() - .any(|dt| dt.valType == OrmSchemaLiteralType::literal) - { - match &mut tracked_predicate.current_literals { - Some(lits) => lits.push(obj_term.clone()), - None => { - tracked_predicate.current_literals = Some(vec![obj_term.clone()]); + // If value type is literal, we need to add the current value to the tracked predicate. + if tracked_predicate + .schema + .dataTypes + .iter() + .any(|dt| dt.valType == OrmSchemaLiteralType::literal) + { + match &mut tracked_predicate.current_literals { + Some(lits) => lits.push(obj_term.clone()), + None => { + tracked_predicate.current_literals = Some(vec![obj_term.clone()]); + } } } } - // If predicate is of type shape, register // "parent (predicate) -> child subject" and `child_subject.parents`. for shape_iri in predicate_schema.dataTypes.iter().filter_map(|dt| { @@ -153,30 +144,39 @@ pub fn add_remove_triples( None } }) { + log_debug!("dealing with nesting for {shape_iri}"); if let BasicType::Str(obj_iri) = &obj_term { - // Get or create object's tracked subject struct. - let child_shape = schema.get(&shape_iri).unwrap(); - // find the parent - let parent = get_tracked_subject(subject_iri, &shape, tracked_subjects)?; + let tracked_child_arc = { + // Get or create object's tracked subject struct. + let child_shape = schema.get(&shape_iri).unwrap(); + // find the parent + let parent = + get_or_create_tracked_subject(subject_iri, &shape, tracked_subjects); - // If this actually created a new tracked subject, that's fine and will be removed during validation. - let tracked_child = - get_or_create_tracked_subject(obj_iri, child_shape, tracked_subjects); + // If this actually created a new tracked subject, that's fine and will be removed during validation. + let tracked_child = + get_or_create_tracked_subject(obj_iri, child_shape, tracked_subjects); - // Add self to parent. - Arc::get_mut(tracked_child) - .unwrap() - .parents - .insert(subject_iri.to_string(), parent); + // Add self to parent. + tracked_child + .write() + .unwrap() + .parents + .insert(subject_iri.to_string(), parent); + log_debug!("lock acquired on tracked_child {obj_iri}"); + tracked_child + }; // Add link to children - let mut upgraded = tracked_predicate_weak.upgrade().unwrap(); - let tracked_predicate = Arc::get_mut(&mut upgraded).unwrap(); - tracked_predicate - .tracked_children - .push(Arc::::downgrade(&tracked_child)); + let mut tracked_predicate = tracked_predicate_lock.write().unwrap(); + log_debug!( + "for children, lock acquired on tracked_predicate {}", + predicate_schema.iri + ); + tracked_predicate.tracked_children.push(tracked_child_arc); } } + log_debug!("end of dealing with nesting"); } } // Process removed triples. @@ -185,18 +185,13 @@ pub fn add_remove_triples( // Only adjust if we had tracked state. let tracked_predicate_opt = tracked_subjects - .get_mut(subject_iri) - .and_then(|tss| tss.get_mut(&shape.iri)) - .and_then(|ts| { - Arc::get_mut(ts) - .unwrap() - .tracked_predicates - .get_mut(pred_iri) - }); - let Some(tracked_predicate_arc) = tracked_predicate_opt else { + .get(subject_iri) + .and_then(|tss| tss.get(&shape.iri)) + .and_then(|ts| ts.read().unwrap().tracked_predicates.get(pred_iri).cloned()); + let Some(tracked_predicate_rc) = tracked_predicate_opt else { continue; }; - let tracked_predicate = Arc::get_mut(tracked_predicate_arc).unwrap(); + let mut tracked_predicate = tracked_predicate_rc.write().unwrap(); // The cardinality might become -1 or 0. We will remove them from the tracked predicates during validation. tracked_predicate.current_cardinality = @@ -248,21 +243,18 @@ pub fn add_remove_triples( // Remove link to children tracked_predicate .tracked_children - .retain(|c| *obj_iri != c.upgrade().unwrap().subject_iri); + .retain(|c| *obj_iri != c.read().unwrap().subject_iri); for shape_iri in shapes_to_process { // Get or create object's tracked subject struct. let child_shape = schema.get(&shape_iri).unwrap(); - let tracked_child = Arc::get_mut(get_or_create_tracked_subject( - &obj_iri, - child_shape, - tracked_subjects, - )) - .unwrap(); - // Remove self from parent - tracked_child.parents.remove(obj_iri); + get_or_create_tracked_subject(&obj_iri, child_shape, tracked_subjects) + .write() + .unwrap() + .parents + .remove(obj_iri); } } } diff --git a/ng-verifier/src/orm/mod.rs b/ng-verifier/src/orm/mod.rs index 4e60b55..9f9f8c2 100644 --- a/ng-verifier/src/orm/mod.rs +++ b/ng-verifier/src/orm/mod.rs @@ -18,6 +18,7 @@ use ng_repo::types::OverlayId; use std::collections::HashMap; use std::collections::HashSet; use std::sync::Arc; +use std::sync::RwLock; use std::u64; use futures::SinkExt; @@ -102,15 +103,16 @@ impl Verifier { .get(nuri) .unwrap() .iter() - .map(|s| { - s.shape_type + .map(|sub| { + sub.shape_type .schema - .get(&s.shape_type.shape) + .get(&sub.shape_type.shape) .unwrap() .clone() }) .collect(); + log_debug!("process_changes_for_nuri_and_session {:?}", shapes); for root_shape in shapes { self.process_changes_for_shape_and_session( nuri, @@ -158,6 +160,10 @@ impl Verifier { let mut nested_objects_to_eval: HashMap> = HashMap::new(); + log_debug!( + "processing_changes_for_shape_and_session for shape {:?}", + shape + ); // For each subject, add/remove triples and validate. for subject_iri in all_modified_subjects { @@ -188,9 +194,12 @@ impl Verifier { .get_mut(nuri) .unwrap() .iter_mut() - .find(|s| s.session_id == session_id && s.shape_type.shape == shape.iri) + .find(|sub| { + sub.session_id == session_id && sub.shape_type.shape == shape.iri + }) .unwrap(); + log_debug!("add_remove_triples for subject {subject_iri}"); if let Err(e) = add_remove_triples( shape.clone(), subject_iri, @@ -211,7 +220,7 @@ impl Verifier { let Some(tracked_subject) = tracked_subject_opt else { continue; }; // skip if missing - tracked_subject.valid.clone() + tracked_subject.read().unwrap().valid.clone() }; // Validate the subject. @@ -281,7 +290,7 @@ impl Verifier { nuri: &NuriV0, shape: Option<&ShapeIri>, session_id: Option<&u64>, - ) -> Vec<&Arc> { + ) -> Vec<&OrmSubscription> { self.orm_subscriptions.get(nuri).unwrap(). // Filter shapes, if present. iter().filter(|s| match shape { @@ -299,7 +308,7 @@ impl Verifier { nuri: &NuriV0, shape: Option<&ShapeIri>, session_id: Option<&u64>, - ) -> &Arc { + ) -> &OrmSubscription { self.orm_subscriptions.get(nuri).unwrap(). // Filter shapes, if present. iter().filter(|s| match shape { @@ -321,6 +330,7 @@ impl Verifier { nuri: &NuriV0, only_for_session_id: Option, ) -> Result { + log_debug!("apply_triple_changes {:?}", only_for_session_id); // If we have a specific session, handle only that subscription. if let Some(session_id) = only_for_session_id { return self.process_changes_for_nuri_and_session( @@ -365,7 +375,7 @@ impl Verifier { change: &OrmTrackedSubjectChange, changes: &OrmChanges, shape: &OrmSchemaShape, - tracked_subjects: &HashMap>>, + tracked_subjects: &HashMap>>>, ) -> Value { let mut orm_obj = json!({"id": change.subject_iri}); let orm_obj_map = orm_obj.as_object_mut().unwrap(); @@ -408,6 +418,7 @@ impl Verifier { shape_to_tracked_orm.get(matched_shape_iri) }) { + let nested_tracked_subject = nested_tracked_subject.read().unwrap(); if nested_tracked_subject.valid == OrmTrackedSubjectValidity::Valid { // Recurse return Some(Self::materialize_orm_object( @@ -489,17 +500,19 @@ impl Verifier { session_id: u64, shape_type: &OrmShapeType, ) -> Result { + log_debug!("create_orm_object_for_shape {:?}", shape_type); // Query triples for this shape let shape_query = shape_type_to_sparql(&shape_type.schema, &shape_type.shape, None)?; let shape_triples = self.query_sparql_construct(shape_query, Some(nuri_to_string(nuri)))?; - + log_debug!("query_sparql_construct done {:?}", shape_triples); let changes: OrmChanges = self.apply_triple_changes(&shape_triples, &[], nuri, Some(session_id.clone()))?; + log_debug!("apply_triple_changes done {:?}", changes); let orm_subscription = self.get_first_orm_subscription_for(nuri, Some(&shape_type.shape), Some(&session_id)); - let schema = &orm_subscription.shape_type.schema; + let schema: &HashMap> = &orm_subscription.shape_type.schema; let root_shape = schema.get(&shape_type.shape).unwrap(); let Some(_root_changes) = changes.get(&root_shape.iri).map(|s| s.values()) else { return Ok(Value::Array(vec![])); @@ -512,7 +525,7 @@ impl Verifier { // The way we get the changes from the tracked subjects is a bit hacky, sorry. for (subject_iri, tracked_subjects_by_shape) in &orm_subscription.tracked_subjects { if let Some(tracked_subject) = tracked_subjects_by_shape.get(&shape_type.shape) { - if tracked_subject.valid == OrmTrackedSubjectValidity::Valid { + if tracked_subject.read().unwrap().valid == OrmTrackedSubjectValidity::Valid { if let Some(change) = changes .get(&shape_type.shape) .and_then(|subject_iri_to_ts| subject_iri_to_ts.get(subject_iri).clone()) @@ -585,20 +598,20 @@ impl Verifier { // All referenced shapes must be available. // Create new subscription and add to self.orm_subscriptions - let orm_subscription = Arc::new(OrmSubscription { + let orm_subscription = OrmSubscription { shape_type: shape_type.clone(), session_id: session_id, sender: tx.clone(), tracked_subjects: HashMap::new(), nuri: nuri.clone(), - }); + }; self.orm_subscriptions .entry(nuri.clone()) .or_insert(vec![]) .push(orm_subscription); let _orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type); - + log_debug!("create_orm_object_for_shape return {:?}", _orm_objects); // TODO integrate response //self.push_orm_response().await; (only for requester, not all sessions) diff --git a/ng-verifier/src/orm/types.rs b/ng-verifier/src/orm/types.rs index a843924..8dd3916 100644 --- a/ng-verifier/src/orm/types.rs +++ b/ng-verifier/src/orm/types.rs @@ -7,13 +7,11 @@ // notice may not be copied, modified, or distributed except // according to those terms. -use std::{ - collections::HashMap, - sync::{Arc, Weak}, -}; +use std::{collections::HashMap, sync::Arc}; use ng_net::app_protocol::{AppResponse, NuriV0}; use ng_net::{orm::*, utils::Sender}; +use std::sync::RwLock; /// A struct for recording the state of subjects and its predicates /// relevant to its shape. @@ -21,10 +19,10 @@ use ng_net::{orm::*, utils::Sender}; pub struct OrmTrackedSubject { /// The known predicates (only those relevant to the shape). /// If there are no triples with a predicate, they are discarded - pub tracked_predicates: HashMap>, + pub tracked_predicates: HashMap>>, /// If this is a nested subject, this records the parents /// and if they are currently tracking this subject. - pub parents: HashMap>, + pub parents: HashMap>>, /// Validity. When untracked, triple updates are not processed here. pub valid: OrmTrackedSubjectValidity, pub subject_iri: String, @@ -45,7 +43,7 @@ pub struct OrmTrackedPredicate { /// The predicate schema pub schema: Arc, /// If the schema is a nested object, the children. - pub tracked_children: Vec>, + pub tracked_children: Vec>>, /// The count of triples for this subject and predicate. pub current_cardinality: i32, /// If schema is of type literal, the currently present ones. @@ -54,14 +52,16 @@ pub struct OrmTrackedPredicate { // Used only for tracking construction of new objects and diffs // in parallel to modifying the tracked objects and predicates. +#[derive(Debug)] pub struct OrmTrackedSubjectChange { pub subject_iri: String, /// Predicates that were changed. pub predicates: HashMap, } +#[derive(Debug)] pub struct OrmTrackedPredicateChanges { /// The tracked predicate for which those changes were recorded. - pub tracked_predicate: Weak, + pub tracked_predicate: Arc>, pub values_added: Vec, pub values_removed: Vec, } @@ -74,13 +74,13 @@ pub enum Term { Ref(String), } -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct OrmSubscription { pub shape_type: OrmShapeType, pub session_id: u64, pub nuri: NuriV0, pub sender: Sender, - pub tracked_subjects: HashMap>>, + pub tracked_subjects: HashMap>>>, } type ShapeIri = String; type SubjectIri = String; diff --git a/ng-verifier/src/orm/validation.rs b/ng-verifier/src/orm/validation.rs index dc18910..9e25ab7 100644 --- a/ng-verifier/src/orm/validation.rs +++ b/ng-verifier/src/orm/validation.rs @@ -9,7 +9,6 @@ use std::collections::HashMap; use std::collections::HashSet; -use std::sync::Arc; use crate::orm::types::*; use crate::verifier::*; @@ -22,35 +21,28 @@ impl Verifier { pub fn update_subject_validity( s_change: &OrmTrackedSubjectChange, shape: &OrmSchemaShape, - orm_subscription: &mut Arc, + orm_subscription: &mut OrmSubscription, previous_validity: OrmTrackedSubjectValidity, ) -> Vec<(String, String, bool)> { - let orm_sub = Arc::get_mut(orm_subscription).unwrap(); - let tracked_subjects = &mut orm_sub.tracked_subjects; + let tracked_subjects = &mut orm_subscription.tracked_subjects; - let Some(tracked_shapes) = tracked_subjects.get_mut(&s_change.subject_iri) else { + let Some(tracked_shapes) = tracked_subjects.get(&s_change.subject_iri) else { return vec![]; }; - let Some(tracked_subject) = tracked_shapes.get_mut(&shape.iri) else { + let Some(tracked_subject) = tracked_shapes.get(&shape.iri) else { return vec![]; }; - let tracked_subject = Arc::get_mut(tracked_subject).unwrap(); + let mut tracked_subject = tracked_subject.write().unwrap(); // Keep track of objects that need to be validated against a shape to fetch and validate. let mut need_evaluation: Vec<(String, String, bool)> = vec![]; // Check 1) Check if we need to fetch this object or all parents are untracked. if tracked_subject.parents.len() != 0 { - let no_parents_tracking = - tracked_subject - .parents - .values() - .all(|parent| match parent.upgrade() { - Some(subject) => { - subject.valid == OrmTrackedSubjectValidity::Untracked - || subject.valid == OrmTrackedSubjectValidity::Invalid - } - None => true, - }); + let no_parents_tracking = tracked_subject.parents.values().all(|parent| { + let subject = parent.read().unwrap(); + subject.valid == OrmTrackedSubjectValidity::Untracked + || subject.valid == OrmTrackedSubjectValidity::Invalid + }); if no_parents_tracking { // Remove tracked predicates and set untracked. @@ -87,7 +79,7 @@ impl Verifier { // Check 3) If there is an infinite loop of parents pointing back to us, return invalid. // Create a set of visited parents to detect cycles. - if has_cycle(tracked_subject, &mut HashSet::new()) { + if has_cycle(&tracked_subject, &mut HashSet::new()) { // Remove tracked predicates and set invalid. tracked_subject.tracked_predicates = HashMap::new(); tracked_subject.valid = OrmTrackedSubjectValidity::Invalid; @@ -97,7 +89,7 @@ impl Verifier { // Check 4) Validate subject against each predicate in shape. for p_schema in shape.predicates.iter() { let p_change = s_change.predicates.get(&p_schema.iri); - let tracked_pred = p_change.and_then(|pc| pc.tracked_predicate.upgrade()); + let tracked_pred = p_change.map(|pc| pc.tracked_predicate.read().unwrap()); let count = tracked_pred .as_ref() @@ -166,7 +158,7 @@ impl Verifier { let tracked_children = tracked_pred.as_ref().map(|tp| { tp.tracked_children .iter() - .filter_map(|weak_tc| weak_tc.upgrade()) + .map(|tc| tc.read().unwrap()) .collect::>() }); // First, Count valid, invalid, unknowns, and untracked @@ -295,10 +287,9 @@ impl Verifier { return tracked_subject .parents .values() - .filter_map(|parent| { - parent - .upgrade() - .map(|parent| (parent.subject_iri.clone(), parent.shape.iri.clone(), false)) + .map(|parent| { + let p = parent.read().unwrap(); + (p.subject_iri.clone(), p.shape.iri.clone(), false) }) // Add `need_evaluation`. .chain(need_evaluation) @@ -317,10 +308,8 @@ fn has_cycle(subject: &OrmTrackedSubject, visited: &mut HashSet) -> bool } visited.insert(subject.subject_iri.clone()); for (_parent_iri, parent_subject) in &subject.parents { - if let Some(parent_subject) = parent_subject.upgrade() { - if has_cycle(&parent_subject, visited) { - return true; - } + if has_cycle(&parent_subject.read().unwrap(), visited) { + return true; } } visited.remove(&subject.subject_iri); diff --git a/ng-verifier/src/verifier.rs b/ng-verifier/src/verifier.rs index 67319d2..05de65c 100644 --- a/ng-verifier/src/verifier.rs +++ b/ng-verifier/src/verifier.rs @@ -112,7 +112,7 @@ pub struct Verifier { in_memory_outbox: Vec, uploads: BTreeMap, branch_subscriptions: HashMap>, - pub(crate) orm_subscriptions: HashMap>>, + pub(crate) orm_subscriptions: HashMap>, pub(crate) temporary_repo_certificates: HashMap, }