|
|
|
@ -676,7 +676,11 @@ impl Verifier { |
|
|
|
|
|
|
|
|
|
// let mut updates = Vec::new();
|
|
|
|
|
|
|
|
|
|
let mut scopes = vec![]; |
|
|
|
|
for (scope, subs) in self.orm_subscriptions.iter_mut() { |
|
|
|
|
// Remove old subscriptions
|
|
|
|
|
subs.retain(|sub| !sub.sender.is_closed()); |
|
|
|
|
|
|
|
|
|
if scope.target == NuriTargetV0::UserSite |
|
|
|
|
|| scope |
|
|
|
|
.overlay |
|
|
|
@ -687,16 +691,26 @@ impl Verifier { |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let mut orm_changes: OrmChanges = HashMap::new(); |
|
|
|
|
// prepare to apply updates to tracked subjects and record the changes.
|
|
|
|
|
let shapes = subs |
|
|
|
|
.iter() |
|
|
|
|
.map(|sub| { |
|
|
|
|
sub.shape_type |
|
|
|
|
.schema |
|
|
|
|
.get(&sub.shape_type.shape) |
|
|
|
|
.unwrap() |
|
|
|
|
.clone() |
|
|
|
|
}) |
|
|
|
|
.collect::<Vec<_>>(); |
|
|
|
|
|
|
|
|
|
// Remove old subscriptions
|
|
|
|
|
subs.retain(|sub| !sub.sender.is_closed()); |
|
|
|
|
scopes.push((scope.clone(), shapes)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Apply updates to tracked subjects and record the changes.
|
|
|
|
|
let shape_types = subs.iter().map(|sub| &sub.shape_type); |
|
|
|
|
for shape_type in shape_types { |
|
|
|
|
let shape_arc = shape_type.schema.get(&shape_type.shape).unwrap().clone(); |
|
|
|
|
for (scope, shapes) in scopes { |
|
|
|
|
let mut orm_changes: OrmChanges = HashMap::new(); |
|
|
|
|
|
|
|
|
|
// actually applying updates to tracked subjects and record the changes.
|
|
|
|
|
for shape_arc in shapes { |
|
|
|
|
let _ = self.process_changes_for_shape_and_session( |
|
|
|
|
&scope, |
|
|
|
|
shape_arc, |
|
|
|
@ -708,7 +722,8 @@ impl Verifier { |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for sub in subs.iter_mut() { |
|
|
|
|
let subs = self.orm_subscriptions.get(&scope).unwrap(); |
|
|
|
|
for sub in subs.iter() { |
|
|
|
|
// TODO: This if-condition is wrong (intended to not re-apply changes coming from the same subscription).
|
|
|
|
|
if sub.session_id != session_id { |
|
|
|
|
// Create diff from changes & subscription.
|
|
|
|
@ -731,18 +746,18 @@ impl Verifier { |
|
|
|
|
let subject_shape = sub.shape_type.schema.get(shape_iri).unwrap(); |
|
|
|
|
|
|
|
|
|
// Iterate over every predicate change and create patches
|
|
|
|
|
for (pred_iri, pred_change) in change.predicates { |
|
|
|
|
for (pred_iri, pred_change) in change.predicates.iter() { |
|
|
|
|
let pred_shape = subject_shape |
|
|
|
|
.predicates |
|
|
|
|
.iter() |
|
|
|
|
.find(|p| p.iri == pred_iri) |
|
|
|
|
.find(|p| p.iri == *pred_iri) |
|
|
|
|
.unwrap(); |
|
|
|
|
|
|
|
|
|
let is_multi = |
|
|
|
|
pred_shape.maxCardinality > 1 || pred_shape.maxCardinality == -1; |
|
|
|
|
let is_object = |
|
|
|
|
pred_shape.dataTypes.iter().any(|dt| !dt.shape.is_none()); |
|
|
|
|
let pred_name = &pred_shape.readablePredicate; |
|
|
|
|
let pred_name = pred_shape.readablePredicate.clone(); |
|
|
|
|
path.push(pred_name); |
|
|
|
|
let path_str = path.join("/"); |
|
|
|
|
|
|
|
|
@ -799,8 +814,8 @@ impl Verifier { |
|
|
|
|
create_patches_for_changed_subj( |
|
|
|
|
orm_changes, |
|
|
|
|
patches, |
|
|
|
|
&pred_shape.dataTypes[0].shape.unwrap(), // TODO: We need to get to the information which of the object types was validated successfully.
|
|
|
|
|
match pred_change.values_added[0] { |
|
|
|
|
&pred_shape.dataTypes[0].shape.as_ref().unwrap(), // TODO: We need to get to the information which of the object types was validated successfully.
|
|
|
|
|
match &pred_change.values_added[0] { |
|
|
|
|
BasicType::Str(str) => &str, |
|
|
|
|
_ => panic!("Nested object should be a string."), |
|
|
|
|
}, |
|
|
|
@ -818,13 +833,15 @@ impl Verifier { |
|
|
|
|
} |
|
|
|
|
} else if is_multi && is_object { |
|
|
|
|
// Add every object added.
|
|
|
|
|
for object_iri_added in pred_change.values_added { |
|
|
|
|
for object_iri_added in pred_change.values_added.iter() { |
|
|
|
|
// TODO: As with single object, just that we add the IRI to the path.
|
|
|
|
|
// We also need to check if the object existed before.
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let tracked_predicate = |
|
|
|
|
pred_change.tracked_predicate.read().unwrap(); |
|
|
|
|
// Delete every object removed.
|
|
|
|
|
if tracked_predicate.children.len() == 0 { |
|
|
|
|
if tracked_predicate.tracked_children.len() == 0 { |
|
|
|
|
// Or the whole thing if no children remain
|
|
|
|
|
patches.push(OrmDiffOp { |
|
|
|
|
op: OrmDiffOpType::remove, |
|
|
|
@ -833,7 +850,7 @@ impl Verifier { |
|
|
|
|
value: None, |
|
|
|
|
}); |
|
|
|
|
} else { |
|
|
|
|
for object_iri_removed in pred_change.values_removed { |
|
|
|
|
for object_iri_removed in pred_change.values_removed.iter() { |
|
|
|
|
patches.push(OrmDiffOp { |
|
|
|
|
op: OrmDiffOpType::remove, |
|
|
|
|
valType: Some(OrmDiffType::object), |
|
|
|
@ -871,13 +888,14 @@ impl Verifier { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// For each tracked subject that has the subscription's shape, call fn above
|
|
|
|
|
for subject_iri in sub.tracked_subjects { // TODO
|
|
|
|
|
for subject_iri in sub.tracked_subjects.iter() { // TODO
|
|
|
|
|
} |
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
let orm_diff: OrmDiff = vec![]; |
|
|
|
|
let _ = sub |
|
|
|
|
.sender |
|
|
|
|
.clone() |
|
|
|
|
.send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff.to_vec()))) |
|
|
|
|
.await; |
|
|
|
|
} |
|
|
|
|