switching to RwLock instead of Arc

feat/orm
Niko PLP 2 weeks ago
parent 0d37d89ffd
commit 6d76e92d45
  1. 7
      nextgraph/src/tests/orm.rs
  2. 1
      ng-repo/src/errors.rs
  3. 210
      ng-verifier/src/orm/add_remove_triples.rs
  4. 41
      ng-verifier/src/orm/mod.rs
  5. 20
      ng-verifier/src/orm/types.rs
  6. 47
      ng-verifier/src/orm/validation.rs
  7. 2
      ng-verifier/src/verifier.rs

@ -818,6 +818,11 @@ INSERT DATA {
log_info!("orm_start called");
// TODO: remove this call to cancel_fn()
cancel_fn();
// TODO : change the data with sparql_query
while let Some(app_response) = receiver.next().await {
let orm_json = match app_response {
AppResponse::V0(v) => match v {
@ -828,6 +833,8 @@ INSERT DATA {
.unwrap();
log_info!("ORM JSON arrived\n: {:?}", orm_json);
// TODO: after we got what we wanted, call cancel_fn, otherwise the test never ends.
}
//
}

@ -399,6 +399,7 @@ pub enum VerifierError {
InvalidInboxPost,
InvalidOrmSchema,
OrmSubjectNotFound,
OrmPredicateNotFound,
}
impl Error for VerifierError {}

@ -8,11 +8,9 @@
// according to those terms.
use ng_oxigraph::oxrdf::Triple;
use ng_repo::errors::NgError;
use ng_repo::errors::VerifierError;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Weak;
use std::sync::{Arc, RwLock};
use crate::orm::types::*;
use ng_net::orm::*;
@ -26,29 +24,36 @@ pub fn add_remove_triples(
subject_iri: &str,
triples_added: &[&Triple],
triples_removed: &[&Triple],
orm_subscription: &mut Arc<OrmSubscription>,
orm_subscription: &mut OrmSubscription,
subject_changes: &mut OrmTrackedSubjectChange,
) -> Result<(), VerifierError> {
fn get_tracked_subject(
subject_iri: &str,
shape: &Arc<OrmSchemaShape>,
tracked_subjects: &HashMap<String, HashMap<String, Arc<OrmTrackedSubject>>>,
) -> Result<Weak<OrmTrackedSubject>, VerifierError> {
let tracked_shapes_for_subject = tracked_subjects
.get(&subject_iri.to_string())
.ok_or(VerifierError::OrmSubjectNotFound)?;
let subject = tracked_shapes_for_subject
.get(&shape.iri)
.ok_or(VerifierError::OrmSubjectNotFound)?;
Ok(Arc::<OrmTrackedSubject>::downgrade(&subject))
}
// fn get_tracked_predicate<'a>(
// subject_iri: &str,
// shape: &Arc<OrmSchemaShape>,
// predicate_schema_iri: &String,
// tracked_subjects: &'a mut HashMap<String, HashMap<String, Arc<RwLock<OrmTrackedSubject>>>>,
// ) -> Result<Arc<RwLock<OrmTrackedPredicate>>, VerifierError> {
// let tracked_shapes_for_subject = tracked_subjects
// .get_mut(&subject_iri.to_string())
// .ok_or(VerifierError::OrmSubjectNotFound)?;
// let subject = tracked_shapes_for_subject
// .get_mut(&shape.iri)
// .ok_or(VerifierError::OrmSubjectNotFound)?;
// Ok(subject
// .read()
// .unwrap()
// .tracked_predicates
// .get(predicate_schema_iri)
// .ok_or(VerifierError::OrmPredicateNotFound)?
// .clone())
// }
// Helper to get/create tracked subjects
fn get_or_create_tracked_subject<'a>(
subject_iri: &str,
shape: &Arc<OrmSchemaShape>,
tracked_subjects: &'a mut HashMap<String, HashMap<String, Arc<OrmTrackedSubject>>>,
) -> &'a mut Arc<OrmTrackedSubject> {
tracked_subjects: &'a mut HashMap<String, HashMap<String, Arc<RwLock<OrmTrackedSubject>>>>,
) -> Arc<RwLock<OrmTrackedSubject>> {
let tracked_shapes_for_subject = tracked_subjects
.entry(subject_iri.to_string())
.or_insert_with(HashMap::new);
@ -56,94 +61,80 @@ pub fn add_remove_triples(
let subject = tracked_shapes_for_subject
.entry(shape.iri.clone())
.or_insert_with(|| {
Arc::new(OrmTrackedSubject {
Arc::new(RwLock::new(OrmTrackedSubject {
tracked_predicates: HashMap::new(),
parents: HashMap::new(),
valid: OrmTrackedSubjectValidity::Pending,
subject_iri: subject_iri.to_string(),
shape: shape.clone(),
})
}))
});
//let strong = Arc::get_mut(subject).unwrap();
// log_info!(
// "strong {} weak {}",
// Arc::<OrmTrackedSubject>::strong_count(&subject),
// Arc::<OrmTrackedSubject>::weak_count(&subject)
// );
subject
subject.clone()
}
// Destructure to get separate references and avoid borrowing conflicts
let orm_sub = Arc::get_mut(orm_subscription).unwrap();
let schema = &orm_sub.shape_type.schema;
let tracked_subjects = &mut orm_sub.tracked_subjects;
// log_info!(
// "strong {} weak {}",
// Arc::<OrmTrackedSubject>::strong_count(&tracked_subject_strong),
// Arc::<OrmTrackedSubject>::weak_count(&tracked_subject_strong)
// );
// let tracked_subject_weak = Arc::<OrmTrackedSubject>::downgrade(&tracked_subject_strong);
let schema = &orm_subscription.shape_type.schema;
let tracked_subjects = &mut orm_subscription.tracked_subjects;
// Process added triples.
// For each triple, check if it matches the shape.
// In parallel, we record the values added and removed (tracked_changes)
for triple in triples_added {
let obj_term = oxrdf_term_to_orm_basic_type(&triple.object);
log_debug!("processing triple {triple}");
for predicate_schema in &shape.predicates {
if predicate_schema.iri != triple.predicate.as_str() {
// Triple does not match predicate.
continue;
}
// Predicate schema constraint matches this triple.
let mut tracked_subject_upgraded =
let tracked_subject_lock =
get_or_create_tracked_subject(subject_iri, &shape, tracked_subjects);
let tracked_subject = Arc::get_mut(&mut tracked_subject_upgraded).unwrap();
let mut tracked_subject = tracked_subject_lock.write().unwrap();
log_debug!("lock acquired on tracked_subject");
// Add tracked predicate or increase cardinality
let tracked_predicate_ = tracked_subject
let tracked_predicate_lock = tracked_subject
.tracked_predicates
.entry(predicate_schema.iri.to_string())
.entry(predicate_schema.iri.clone())
.or_insert_with(|| {
Arc::new(OrmTrackedPredicate {
Arc::new(RwLock::new(OrmTrackedPredicate {
current_cardinality: 0,
schema: predicate_schema.clone(),
tracked_children: Vec::new(),
current_literals: None,
})
}))
});
let tracked_predicate_weak = Arc::downgrade(&tracked_predicate_);
let tracked_predicate = Arc::get_mut(tracked_predicate_).unwrap();
tracked_predicate.current_cardinality += 1;
let obj_term = oxrdf_term_to_orm_basic_type(&triple.object);
{
let mut tracked_predicate = tracked_predicate_lock.write().unwrap();
log_debug!("lock acquired on tracked_predicate");
tracked_predicate.current_cardinality += 1;
// Keep track of the changed values too.
let pred_changes: &mut OrmTrackedPredicateChanges = subject_changes
.predicates
.entry(predicate_schema.iri.clone())
.or_insert_with(|| OrmTrackedPredicateChanges {
tracked_predicate: tracked_predicate_weak.clone(), // reference remains inside lifetime of this call
values_added: Vec::new(),
values_removed: Vec::new(),
});
// Keep track of the changed values too.
let pred_changes: &mut OrmTrackedPredicateChanges = subject_changes
.predicates
.entry(predicate_schema.iri.clone())
.or_insert_with(|| OrmTrackedPredicateChanges {
tracked_predicate: tracked_predicate_lock.clone(),
values_added: Vec::new(),
values_removed: Vec::new(),
});
pred_changes.values_added.push(obj_term.clone());
pred_changes.values_added.push(obj_term.clone());
// If value type is literal, we need to add the current value to the tracked predicate.
if tracked_predicate
.schema
.dataTypes
.iter()
.any(|dt| dt.valType == OrmSchemaLiteralType::literal)
{
match &mut tracked_predicate.current_literals {
Some(lits) => lits.push(obj_term.clone()),
None => {
tracked_predicate.current_literals = Some(vec![obj_term.clone()]);
// If value type is literal, we need to add the current value to the tracked predicate.
if tracked_predicate
.schema
.dataTypes
.iter()
.any(|dt| dt.valType == OrmSchemaLiteralType::literal)
{
match &mut tracked_predicate.current_literals {
Some(lits) => lits.push(obj_term.clone()),
None => {
tracked_predicate.current_literals = Some(vec![obj_term.clone()]);
}
}
}
}
// If predicate is of type shape, register
// "parent (predicate) -> child subject" and `child_subject.parents`.
for shape_iri in predicate_schema.dataTypes.iter().filter_map(|dt| {
@ -153,30 +144,39 @@ pub fn add_remove_triples(
None
}
}) {
log_debug!("dealing with nesting for {shape_iri}");
if let BasicType::Str(obj_iri) = &obj_term {
// Get or create object's tracked subject struct.
let child_shape = schema.get(&shape_iri).unwrap();
// find the parent
let parent = get_tracked_subject(subject_iri, &shape, tracked_subjects)?;
let tracked_child_arc = {
// Get or create object's tracked subject struct.
let child_shape = schema.get(&shape_iri).unwrap();
// find the parent
let parent =
get_or_create_tracked_subject(subject_iri, &shape, tracked_subjects);
// If this actually created a new tracked subject, that's fine and will be removed during validation.
let tracked_child =
get_or_create_tracked_subject(obj_iri, child_shape, tracked_subjects);
// If this actually created a new tracked subject, that's fine and will be removed during validation.
let tracked_child =
get_or_create_tracked_subject(obj_iri, child_shape, tracked_subjects);
// Add self to parent.
Arc::get_mut(tracked_child)
.unwrap()
.parents
.insert(subject_iri.to_string(), parent);
// Add self to parent.
tracked_child
.write()
.unwrap()
.parents
.insert(subject_iri.to_string(), parent);
log_debug!("lock acquired on tracked_child {obj_iri}");
tracked_child
};
// Add link to children
let mut upgraded = tracked_predicate_weak.upgrade().unwrap();
let tracked_predicate = Arc::get_mut(&mut upgraded).unwrap();
tracked_predicate
.tracked_children
.push(Arc::<OrmTrackedSubject>::downgrade(&tracked_child));
let mut tracked_predicate = tracked_predicate_lock.write().unwrap();
log_debug!(
"for children, lock acquired on tracked_predicate {}",
predicate_schema.iri
);
tracked_predicate.tracked_children.push(tracked_child_arc);
}
}
log_debug!("end of dealing with nesting");
}
}
// Process removed triples.
@ -185,18 +185,13 @@ pub fn add_remove_triples(
// Only adjust if we had tracked state.
let tracked_predicate_opt = tracked_subjects
.get_mut(subject_iri)
.and_then(|tss| tss.get_mut(&shape.iri))
.and_then(|ts| {
Arc::get_mut(ts)
.unwrap()
.tracked_predicates
.get_mut(pred_iri)
});
let Some(tracked_predicate_arc) = tracked_predicate_opt else {
.get(subject_iri)
.and_then(|tss| tss.get(&shape.iri))
.and_then(|ts| ts.read().unwrap().tracked_predicates.get(pred_iri).cloned());
let Some(tracked_predicate_rc) = tracked_predicate_opt else {
continue;
};
let tracked_predicate = Arc::get_mut(tracked_predicate_arc).unwrap();
let mut tracked_predicate = tracked_predicate_rc.write().unwrap();
// The cardinality might become -1 or 0. We will remove them from the tracked predicates during validation.
tracked_predicate.current_cardinality =
@ -248,21 +243,18 @@ pub fn add_remove_triples(
// Remove link to children
tracked_predicate
.tracked_children
.retain(|c| *obj_iri != c.upgrade().unwrap().subject_iri);
.retain(|c| *obj_iri != c.read().unwrap().subject_iri);
for shape_iri in shapes_to_process {
// Get or create object's tracked subject struct.
let child_shape = schema.get(&shape_iri).unwrap();
let tracked_child = Arc::get_mut(get_or_create_tracked_subject(
&obj_iri,
child_shape,
tracked_subjects,
))
.unwrap();
// Remove self from parent
tracked_child.parents.remove(obj_iri);
get_or_create_tracked_subject(&obj_iri, child_shape, tracked_subjects)
.write()
.unwrap()
.parents
.remove(obj_iri);
}
}
}

@ -18,6 +18,7 @@ use ng_repo::types::OverlayId;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::RwLock;
use std::u64;
use futures::SinkExt;
@ -102,15 +103,16 @@ impl Verifier {
.get(nuri)
.unwrap()
.iter()
.map(|s| {
s.shape_type
.map(|sub| {
sub.shape_type
.schema
.get(&s.shape_type.shape)
.get(&sub.shape_type.shape)
.unwrap()
.clone()
})
.collect();
log_debug!("process_changes_for_nuri_and_session {:?}", shapes);
for root_shape in shapes {
self.process_changes_for_shape_and_session(
nuri,
@ -158,6 +160,10 @@ impl Verifier {
let mut nested_objects_to_eval: HashMap<ShapeIri, Vec<(SubjectIri, bool)>> =
HashMap::new();
log_debug!(
"processing_changes_for_shape_and_session for shape {:?}",
shape
);
// For each subject, add/remove triples and validate.
for subject_iri in all_modified_subjects {
@ -188,9 +194,12 @@ impl Verifier {
.get_mut(nuri)
.unwrap()
.iter_mut()
.find(|s| s.session_id == session_id && s.shape_type.shape == shape.iri)
.find(|sub| {
sub.session_id == session_id && sub.shape_type.shape == shape.iri
})
.unwrap();
log_debug!("add_remove_triples for subject {subject_iri}");
if let Err(e) = add_remove_triples(
shape.clone(),
subject_iri,
@ -211,7 +220,7 @@ impl Verifier {
let Some(tracked_subject) = tracked_subject_opt else {
continue;
}; // skip if missing
tracked_subject.valid.clone()
tracked_subject.read().unwrap().valid.clone()
};
// Validate the subject.
@ -281,7 +290,7 @@ impl Verifier {
nuri: &NuriV0,
shape: Option<&ShapeIri>,
session_id: Option<&u64>,
) -> Vec<&Arc<OrmSubscription>> {
) -> Vec<&OrmSubscription> {
self.orm_subscriptions.get(nuri).unwrap().
// Filter shapes, if present.
iter().filter(|s| match shape {
@ -299,7 +308,7 @@ impl Verifier {
nuri: &NuriV0,
shape: Option<&ShapeIri>,
session_id: Option<&u64>,
) -> &Arc<OrmSubscription> {
) -> &OrmSubscription {
self.orm_subscriptions.get(nuri).unwrap().
// Filter shapes, if present.
iter().filter(|s| match shape {
@ -321,6 +330,7 @@ impl Verifier {
nuri: &NuriV0,
only_for_session_id: Option<u64>,
) -> Result<OrmChanges, NgError> {
log_debug!("apply_triple_changes {:?}", only_for_session_id);
// If we have a specific session, handle only that subscription.
if let Some(session_id) = only_for_session_id {
return self.process_changes_for_nuri_and_session(
@ -365,7 +375,7 @@ impl Verifier {
change: &OrmTrackedSubjectChange,
changes: &OrmChanges,
shape: &OrmSchemaShape,
tracked_subjects: &HashMap<String, HashMap<String, Arc<OrmTrackedSubject>>>,
tracked_subjects: &HashMap<String, HashMap<String, Arc<RwLock<OrmTrackedSubject>>>>,
) -> Value {
let mut orm_obj = json!({"id": change.subject_iri});
let orm_obj_map = orm_obj.as_object_mut().unwrap();
@ -408,6 +418,7 @@ impl Verifier {
shape_to_tracked_orm.get(matched_shape_iri)
})
{
let nested_tracked_subject = nested_tracked_subject.read().unwrap();
if nested_tracked_subject.valid == OrmTrackedSubjectValidity::Valid {
// Recurse
return Some(Self::materialize_orm_object(
@ -489,17 +500,19 @@ impl Verifier {
session_id: u64,
shape_type: &OrmShapeType,
) -> Result<Value, NgError> {
log_debug!("create_orm_object_for_shape {:?}", shape_type);
// Query triples for this shape
let shape_query = shape_type_to_sparql(&shape_type.schema, &shape_type.shape, None)?;
let shape_triples = self.query_sparql_construct(shape_query, Some(nuri_to_string(nuri)))?;
log_debug!("query_sparql_construct done {:?}", shape_triples);
let changes: OrmChanges =
self.apply_triple_changes(&shape_triples, &[], nuri, Some(session_id.clone()))?;
log_debug!("apply_triple_changes done {:?}", changes);
let orm_subscription =
self.get_first_orm_subscription_for(nuri, Some(&shape_type.shape), Some(&session_id));
let schema = &orm_subscription.shape_type.schema;
let schema: &HashMap<String, Arc<OrmSchemaShape>> = &orm_subscription.shape_type.schema;
let root_shape = schema.get(&shape_type.shape).unwrap();
let Some(_root_changes) = changes.get(&root_shape.iri).map(|s| s.values()) else {
return Ok(Value::Array(vec![]));
@ -512,7 +525,7 @@ impl Verifier {
// The way we get the changes from the tracked subjects is a bit hacky, sorry.
for (subject_iri, tracked_subjects_by_shape) in &orm_subscription.tracked_subjects {
if let Some(tracked_subject) = tracked_subjects_by_shape.get(&shape_type.shape) {
if tracked_subject.valid == OrmTrackedSubjectValidity::Valid {
if tracked_subject.read().unwrap().valid == OrmTrackedSubjectValidity::Valid {
if let Some(change) = changes
.get(&shape_type.shape)
.and_then(|subject_iri_to_ts| subject_iri_to_ts.get(subject_iri).clone())
@ -585,20 +598,20 @@ impl Verifier {
// All referenced shapes must be available.
// Create new subscription and add to self.orm_subscriptions
let orm_subscription = Arc::new(OrmSubscription {
let orm_subscription = OrmSubscription {
shape_type: shape_type.clone(),
session_id: session_id,
sender: tx.clone(),
tracked_subjects: HashMap::new(),
nuri: nuri.clone(),
});
};
self.orm_subscriptions
.entry(nuri.clone())
.or_insert(vec![])
.push(orm_subscription);
let _orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type);
log_debug!("create_orm_object_for_shape return {:?}", _orm_objects);
// TODO integrate response
//self.push_orm_response().await; (only for requester, not all sessions)

@ -7,13 +7,11 @@
// notice may not be copied, modified, or distributed except
// according to those terms.
use std::{
collections::HashMap,
sync::{Arc, Weak},
};
use std::{collections::HashMap, sync::Arc};
use ng_net::app_protocol::{AppResponse, NuriV0};
use ng_net::{orm::*, utils::Sender};
use std::sync::RwLock;
/// A struct for recording the state of subjects and its predicates
/// relevant to its shape.
@ -21,10 +19,10 @@ use ng_net::{orm::*, utils::Sender};
pub struct OrmTrackedSubject {
/// The known predicates (only those relevant to the shape).
/// If there are no triples with a predicate, they are discarded
pub tracked_predicates: HashMap<String, Arc<OrmTrackedPredicate>>,
pub tracked_predicates: HashMap<String, Arc<RwLock<OrmTrackedPredicate>>>,
/// If this is a nested subject, this records the parents
/// and if they are currently tracking this subject.
pub parents: HashMap<String, Weak<OrmTrackedSubject>>,
pub parents: HashMap<String, Arc<RwLock<OrmTrackedSubject>>>,
/// Validity. When untracked, triple updates are not processed here.
pub valid: OrmTrackedSubjectValidity,
pub subject_iri: String,
@ -45,7 +43,7 @@ pub struct OrmTrackedPredicate {
/// The predicate schema
pub schema: Arc<OrmSchemaPredicate>,
/// If the schema is a nested object, the children.
pub tracked_children: Vec<Weak<OrmTrackedSubject>>,
pub tracked_children: Vec<Arc<RwLock<OrmTrackedSubject>>>,
/// The count of triples for this subject and predicate.
pub current_cardinality: i32,
/// If schema is of type literal, the currently present ones.
@ -54,14 +52,16 @@ pub struct OrmTrackedPredicate {
// Used only for tracking construction of new objects and diffs
// in parallel to modifying the tracked objects and predicates.
#[derive(Debug)]
pub struct OrmTrackedSubjectChange {
pub subject_iri: String,
/// Predicates that were changed.
pub predicates: HashMap<String, OrmTrackedPredicateChanges>,
}
#[derive(Debug)]
pub struct OrmTrackedPredicateChanges {
/// The tracked predicate for which those changes were recorded.
pub tracked_predicate: Weak<OrmTrackedPredicate>,
pub tracked_predicate: Arc<RwLock<OrmTrackedPredicate>>,
pub values_added: Vec<BasicType>,
pub values_removed: Vec<BasicType>,
}
@ -74,13 +74,13 @@ pub enum Term {
Ref(String),
}
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct OrmSubscription {
pub shape_type: OrmShapeType,
pub session_id: u64,
pub nuri: NuriV0,
pub sender: Sender<AppResponse>,
pub tracked_subjects: HashMap<SubjectIri, HashMap<ShapeIri, Arc<OrmTrackedSubject>>>,
pub tracked_subjects: HashMap<SubjectIri, HashMap<ShapeIri, Arc<RwLock<OrmTrackedSubject>>>>,
}
type ShapeIri = String;
type SubjectIri = String;

@ -9,7 +9,6 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use crate::orm::types::*;
use crate::verifier::*;
@ -22,35 +21,28 @@ impl Verifier {
pub fn update_subject_validity(
s_change: &OrmTrackedSubjectChange,
shape: &OrmSchemaShape,
orm_subscription: &mut Arc<OrmSubscription>,
orm_subscription: &mut OrmSubscription,
previous_validity: OrmTrackedSubjectValidity,
) -> Vec<(String, String, bool)> {
let orm_sub = Arc::get_mut(orm_subscription).unwrap();
let tracked_subjects = &mut orm_sub.tracked_subjects;
let tracked_subjects = &mut orm_subscription.tracked_subjects;
let Some(tracked_shapes) = tracked_subjects.get_mut(&s_change.subject_iri) else {
let Some(tracked_shapes) = tracked_subjects.get(&s_change.subject_iri) else {
return vec![];
};
let Some(tracked_subject) = tracked_shapes.get_mut(&shape.iri) else {
let Some(tracked_subject) = tracked_shapes.get(&shape.iri) else {
return vec![];
};
let tracked_subject = Arc::get_mut(tracked_subject).unwrap();
let mut tracked_subject = tracked_subject.write().unwrap();
// Keep track of objects that need to be validated against a shape to fetch and validate.
let mut need_evaluation: Vec<(String, String, bool)> = vec![];
// Check 1) Check if we need to fetch this object or all parents are untracked.
if tracked_subject.parents.len() != 0 {
let no_parents_tracking =
tracked_subject
.parents
.values()
.all(|parent| match parent.upgrade() {
Some(subject) => {
subject.valid == OrmTrackedSubjectValidity::Untracked
|| subject.valid == OrmTrackedSubjectValidity::Invalid
}
None => true,
});
let no_parents_tracking = tracked_subject.parents.values().all(|parent| {
let subject = parent.read().unwrap();
subject.valid == OrmTrackedSubjectValidity::Untracked
|| subject.valid == OrmTrackedSubjectValidity::Invalid
});
if no_parents_tracking {
// Remove tracked predicates and set untracked.
@ -87,7 +79,7 @@ impl Verifier {
// Check 3) If there is an infinite loop of parents pointing back to us, return invalid.
// Create a set of visited parents to detect cycles.
if has_cycle(tracked_subject, &mut HashSet::new()) {
if has_cycle(&tracked_subject, &mut HashSet::new()) {
// Remove tracked predicates and set invalid.
tracked_subject.tracked_predicates = HashMap::new();
tracked_subject.valid = OrmTrackedSubjectValidity::Invalid;
@ -97,7 +89,7 @@ impl Verifier {
// Check 4) Validate subject against each predicate in shape.
for p_schema in shape.predicates.iter() {
let p_change = s_change.predicates.get(&p_schema.iri);
let tracked_pred = p_change.and_then(|pc| pc.tracked_predicate.upgrade());
let tracked_pred = p_change.map(|pc| pc.tracked_predicate.read().unwrap());
let count = tracked_pred
.as_ref()
@ -166,7 +158,7 @@ impl Verifier {
let tracked_children = tracked_pred.as_ref().map(|tp| {
tp.tracked_children
.iter()
.filter_map(|weak_tc| weak_tc.upgrade())
.map(|tc| tc.read().unwrap())
.collect::<Vec<_>>()
});
// First, Count valid, invalid, unknowns, and untracked
@ -295,10 +287,9 @@ impl Verifier {
return tracked_subject
.parents
.values()
.filter_map(|parent| {
parent
.upgrade()
.map(|parent| (parent.subject_iri.clone(), parent.shape.iri.clone(), false))
.map(|parent| {
let p = parent.read().unwrap();
(p.subject_iri.clone(), p.shape.iri.clone(), false)
})
// Add `need_evaluation`.
.chain(need_evaluation)
@ -317,10 +308,8 @@ fn has_cycle(subject: &OrmTrackedSubject, visited: &mut HashSet<String>) -> bool
}
visited.insert(subject.subject_iri.clone());
for (_parent_iri, parent_subject) in &subject.parents {
if let Some(parent_subject) = parent_subject.upgrade() {
if has_cycle(&parent_subject, visited) {
return true;
}
if has_cycle(&parent_subject.read().unwrap(), visited) {
return true;
}
}
visited.remove(&subject.subject_iri);

@ -112,7 +112,7 @@ pub struct Verifier {
in_memory_outbox: Vec<EventOutboxStorage>,
uploads: BTreeMap<u32, RandomAccessFile>,
branch_subscriptions: HashMap<BranchId, Sender<AppResponse>>,
pub(crate) orm_subscriptions: HashMap<NuriV0, Vec<Arc<OrmSubscription>>>,
pub(crate) orm_subscriptions: HashMap<NuriV0, Vec<OrmSubscription>>,
pub(crate) temporary_repo_certificates: HashMap<RepoId, ObjectRef>,
}

Loading…
Cancel
Save