|
|
|
@ -391,14 +391,10 @@ impl Verifier { |
|
|
|
|
shape: Option<&ShapeIri>, |
|
|
|
|
session_id: Option<&u64>, |
|
|
|
|
) -> Result<(UnboundedSender<AppResponse>, &OrmSubscription), VerifierError> { |
|
|
|
|
let subs = self |
|
|
|
|
.orm_subscriptions |
|
|
|
|
.get_mut(nuri) |
|
|
|
|
.unwrap(); |
|
|
|
|
let subs = self.orm_subscriptions.get_mut(nuri).unwrap(); |
|
|
|
|
subs.retain(|sub| !sub.sender.is_closed()); |
|
|
|
|
match
|
|
|
|
|
// Filter shapes, if present.
|
|
|
|
|
subs.iter() |
|
|
|
|
match subs // Filter shapes, if present.
|
|
|
|
|
.iter() |
|
|
|
|
.filter(|s| match shape { |
|
|
|
|
Some(sh) => *sh == s.shape_type.shape, |
|
|
|
|
None => true, // Filter session ids if present.
|
|
|
|
@ -410,9 +406,7 @@ impl Verifier { |
|
|
|
|
.next() |
|
|
|
|
{ |
|
|
|
|
None => Err(VerifierError::OrmSubscriptionNotFound), |
|
|
|
|
Some(subscription) => { |
|
|
|
|
Ok((subscription.sender.clone(), subscription)) |
|
|
|
|
} |
|
|
|
|
Some(subscription) => Ok((subscription.sender.clone(), subscription)), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -658,7 +652,8 @@ impl Verifier { |
|
|
|
|
return Ok(return_vals); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub(crate) async fn orm_update( |
|
|
|
|
/// Generate and send JSON patches from GraphQuadsPatch (quad inserts and removes) to JS-land.
|
|
|
|
|
pub(crate) async fn orm_backend_update( |
|
|
|
|
&mut self, |
|
|
|
|
session_id: u64, |
|
|
|
|
repo_id: RepoId, |
|
|
|
@ -666,26 +661,249 @@ impl Verifier { |
|
|
|
|
patch: GraphQuadsPatch, |
|
|
|
|
) { |
|
|
|
|
let overlaylink: OverlayLink = overlay_id.into(); |
|
|
|
|
|
|
|
|
|
// We need to apply the patches to all subscriptions we have. We can use process_changes_for_*
|
|
|
|
|
// That updates the tracked subjects, validates them, and returns a set of changes structured
|
|
|
|
|
// by the respective schema.
|
|
|
|
|
|
|
|
|
|
let triple_inserts: Vec<Triple> = patch |
|
|
|
|
.inserts |
|
|
|
|
.iter() |
|
|
|
|
.map(|quad| { |
|
|
|
|
Triple::new( |
|
|
|
|
quad.subject.clone(), |
|
|
|
|
quad.predicate.clone(), |
|
|
|
|
quad.object.clone(), |
|
|
|
|
) |
|
|
|
|
}) |
|
|
|
|
.collect(); |
|
|
|
|
let triple_removes: Vec<Triple> = patch |
|
|
|
|
.removes |
|
|
|
|
.iter() |
|
|
|
|
.map(|quad| { |
|
|
|
|
Triple::new( |
|
|
|
|
quad.subject.clone(), |
|
|
|
|
quad.predicate.clone(), |
|
|
|
|
quad.object.clone(), |
|
|
|
|
) |
|
|
|
|
}) |
|
|
|
|
.collect(); |
|
|
|
|
|
|
|
|
|
// let mut updates = Vec::new();
|
|
|
|
|
|
|
|
|
|
for (scope, subs) in self.orm_subscriptions.iter_mut() { |
|
|
|
|
subs.retain(|sub| !sub.sender.is_closed()); |
|
|
|
|
if scope.target == NuriTargetV0::UserSite |
|
|
|
|
|| scope.overlay.as_ref().map_or(false, |ol| overlaylink == *ol) |
|
|
|
|
|| scope |
|
|
|
|
.overlay |
|
|
|
|
.as_ref() |
|
|
|
|
.map_or(false, |ol| overlaylink == *ol) |
|
|
|
|
|| scope.target == NuriTargetV0::Repo(repo_id) |
|
|
|
|
{ |
|
|
|
|
for sub in subs { |
|
|
|
|
if sub.session_id != session_id { // this is incorrect. we are excluding all the subscriptions from the originating session,
|
|
|
|
|
// while we should only exclude the one with exact same shape_type. but we don't have access to that here
|
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// TODO: implement this, generate orm_diff using the patch and the sub.shape_type
|
|
|
|
|
let orm_diff: OrmDiff = vec![]; |
|
|
|
|
let mut orm_changes: OrmChanges = HashMap::new(); |
|
|
|
|
|
|
|
|
|
// Remove old subscriptions
|
|
|
|
|
subs.retain(|sub| !sub.sender.is_closed()); |
|
|
|
|
|
|
|
|
|
// Apply updates to tracked subjects and record the changes.
|
|
|
|
|
let shape_types = subs.iter().map(|sub| &sub.shape_type); |
|
|
|
|
for shape_type in shape_types { |
|
|
|
|
let shape_arc = shape_type.schema.get(&shape_type.shape).unwrap().clone(); |
|
|
|
|
|
|
|
|
|
let _ = self.process_changes_for_shape_and_session( |
|
|
|
|
&scope, |
|
|
|
|
shape_arc, |
|
|
|
|
session_id, |
|
|
|
|
&triple_inserts, |
|
|
|
|
&triple_removes, |
|
|
|
|
&mut orm_changes, |
|
|
|
|
false, |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for sub in subs.iter_mut() { |
|
|
|
|
// TODO: This if-condition is wrong (intended to not re-apply changes coming from the same subscription).
|
|
|
|
|
if sub.session_id != session_id { |
|
|
|
|
// Create diff from changes & subscription.
|
|
|
|
|
|
|
|
|
|
let mut patches: OrmDiff = vec![]; |
|
|
|
|
let mut path: Vec<String> = vec![]; |
|
|
|
|
fn create_patches_for_changed_subj( |
|
|
|
|
orm_changes: &OrmChanges, |
|
|
|
|
patches: &mut OrmDiff, |
|
|
|
|
shape_iri: &String, |
|
|
|
|
subject_iri: &String, |
|
|
|
|
sub: &OrmSubscription, |
|
|
|
|
path: &mut Vec<String>, |
|
|
|
|
) { |
|
|
|
|
let change = orm_changes |
|
|
|
|
.get(shape_iri) |
|
|
|
|
.unwrap() |
|
|
|
|
.get(subject_iri) |
|
|
|
|
.unwrap(); |
|
|
|
|
let subject_shape = sub.shape_type.schema.get(shape_iri).unwrap(); |
|
|
|
|
|
|
|
|
|
// Iterate over every predicate change and create patches
|
|
|
|
|
for (pred_iri, pred_change) in change.predicates { |
|
|
|
|
let pred_shape = subject_shape |
|
|
|
|
.predicates |
|
|
|
|
.iter() |
|
|
|
|
.find(|p| p.iri == pred_iri) |
|
|
|
|
.unwrap(); |
|
|
|
|
|
|
|
|
|
_ = sub.sender.send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff.to_vec()))).await; |
|
|
|
|
let is_multi = |
|
|
|
|
pred_shape.maxCardinality > 1 || pred_shape.maxCardinality == -1; |
|
|
|
|
let is_object = |
|
|
|
|
pred_shape.dataTypes.iter().any(|dt| !dt.shape.is_none()); |
|
|
|
|
let pred_name = &pred_shape.readablePredicate; |
|
|
|
|
path.push(pred_name); |
|
|
|
|
let path_str = path.join("/"); |
|
|
|
|
|
|
|
|
|
// Depending on the predicate type, add the respective diff operation.
|
|
|
|
|
|
|
|
|
|
// Single primitive value
|
|
|
|
|
if !is_multi && !is_object { |
|
|
|
|
if pred_change.values_added.len() > 0 { |
|
|
|
|
patches.push(OrmDiffOp { |
|
|
|
|
op: OrmDiffOpType::add, |
|
|
|
|
valType: None, |
|
|
|
|
path: path_str.clone(), |
|
|
|
|
value: Some(json!(pred_change.values_added[0])), |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
if pred_change.values_removed.len() > 0 { |
|
|
|
|
patches.push(OrmDiffOp { |
|
|
|
|
op: OrmDiffOpType::remove, |
|
|
|
|
valType: None, |
|
|
|
|
path: path_str, |
|
|
|
|
value: Some(json!(pred_change.values_added[0])), |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
} else if is_multi && !is_object { |
|
|
|
|
// Set of primitive values
|
|
|
|
|
if pred_change.values_added.len() > 0 { |
|
|
|
|
patches.push(OrmDiffOp { |
|
|
|
|
op: OrmDiffOpType::add, |
|
|
|
|
valType: Some(OrmDiffType::set), |
|
|
|
|
path: path_str.clone(), |
|
|
|
|
value: Some(json!(pred_change.values_added)), |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
if pred_change.values_removed.len() > 0 { |
|
|
|
|
patches.push(OrmDiffOp { |
|
|
|
|
op: OrmDiffOpType::remove, |
|
|
|
|
valType: Some(OrmDiffType::set), |
|
|
|
|
path: path_str, |
|
|
|
|
value: Some(json!(pred_change.values_removed)), |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
} else if !is_multi && is_object { |
|
|
|
|
// Single object.
|
|
|
|
|
if pred_change.values_added.len() > 0 { |
|
|
|
|
// Object was added. That means, we need to add a basic object with no value,
|
|
|
|
|
// Then add further predicates to it in a recursive call.
|
|
|
|
|
patches.push(OrmDiffOp { |
|
|
|
|
op: OrmDiffOpType::add, |
|
|
|
|
valType: Some(OrmDiffType::object), |
|
|
|
|
path: path_str.clone(), |
|
|
|
|
value: None, |
|
|
|
|
}); |
|
|
|
|
// Apply changes for nested object.
|
|
|
|
|
create_patches_for_changed_subj( |
|
|
|
|
orm_changes, |
|
|
|
|
patches, |
|
|
|
|
&pred_shape.dataTypes[0].shape.unwrap(), // TODO: We need to get to the information which of the object types was validated successfully.
|
|
|
|
|
match pred_change.values_added[0] { |
|
|
|
|
BasicType::Str(str) => &str, |
|
|
|
|
_ => panic!("Nested object should be a string."), |
|
|
|
|
}, |
|
|
|
|
sub, |
|
|
|
|
path, |
|
|
|
|
); |
|
|
|
|
} else { |
|
|
|
|
// Object is removed.
|
|
|
|
|
patches.push(OrmDiffOp { |
|
|
|
|
op: OrmDiffOpType::remove, |
|
|
|
|
valType: Some(OrmDiffType::object), |
|
|
|
|
path: path_str, |
|
|
|
|
value: None, |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
} else if is_multi && is_object { |
|
|
|
|
// Add every object added.
|
|
|
|
|
for object_iri_added in pred_change.values_added { |
|
|
|
|
// TODO: As with single object, just that we add the IRI to the path.
|
|
|
|
|
// We also need to check if the object existed before.
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Delete every object removed.
|
|
|
|
|
if tracked_predicate.children.len() == 0 { |
|
|
|
|
// Or the whole thing if no children remain
|
|
|
|
|
patches.push(OrmDiffOp { |
|
|
|
|
op: OrmDiffOpType::remove, |
|
|
|
|
valType: Some(OrmDiffType::object), |
|
|
|
|
path: path_str, |
|
|
|
|
value: None, |
|
|
|
|
}); |
|
|
|
|
} else { |
|
|
|
|
for object_iri_removed in pred_change.values_removed { |
|
|
|
|
patches.push(OrmDiffOp { |
|
|
|
|
op: OrmDiffOpType::remove, |
|
|
|
|
valType: Some(OrmDiffType::object), |
|
|
|
|
path: format!( |
|
|
|
|
"{}/{}", |
|
|
|
|
path_str, |
|
|
|
|
match object_iri_removed { |
|
|
|
|
BasicType::Str(iri) => iri, |
|
|
|
|
_ => panic!("Object IRI must be string"), |
|
|
|
|
} |
|
|
|
|
), |
|
|
|
|
value: None, |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Is the tracked subject valid and was it before?
|
|
|
|
|
// Just regular patch for each predicate.
|
|
|
|
|
// If the predicate is a nested object...
|
|
|
|
|
// We need to recurse. Do the same as here but append to the path.
|
|
|
|
|
|
|
|
|
|
// Is the tracked subject invalid and was valid before?
|
|
|
|
|
// Delete of root
|
|
|
|
|
|
|
|
|
|
// Did the subject become valid but was invalid before?
|
|
|
|
|
// We need to compose a orm object (based on the data we luckily have already).
|
|
|
|
|
|
|
|
|
|
// Was the subject invalid and is still invalid?
|
|
|
|
|
// Nothing to do.
|
|
|
|
|
|
|
|
|
|
// Remove to this predicate name from the path again.
|
|
|
|
|
path.pop(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// For each tracked subject that has the subscription's shape, call fn above
|
|
|
|
|
for subject_iri in sub.tracked_subjects { // TODO
|
|
|
|
|
} |
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
let orm_diff: OrmDiff = vec![]; |
|
|
|
|
let _ = sub |
|
|
|
|
.sender |
|
|
|
|
.send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff.to_vec()))) |
|
|
|
|
.await; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// After creating new objects (without an id) in JS-land,
|
|
|
|
|
/// we send the generated id for those back.
|
|
|
|
|
/// If something went wrong (revert_inserts / revert_removes not empty),
|
|
|
|
|
/// we send a JSON patch back to revert the made changes.
|
|
|
|
|
pub(crate) async fn orm_update_self( |
|
|
|
|
&mut self, |
|
|
|
|
scope: &NuriV0, |
|
|
|
@ -695,7 +913,6 @@ impl Verifier { |
|
|
|
|
revert_inserts: Vec<Quad>, |
|
|
|
|
revert_removes: Vec<Quad>, |
|
|
|
|
) -> Result<(), VerifierError> { |
|
|
|
|
|
|
|
|
|
let (mut sender, orm_subscription) = |
|
|
|
|
self.get_first_orm_subscription_sender_for(scope, Some(&shape_iri), Some(&session_id))?; |
|
|
|
|
|
|
|
|
@ -703,7 +920,11 @@ impl Verifier { |
|
|
|
|
// use orm_subscription if needed
|
|
|
|
|
// note(niko): I think skolemnized blank nodes can still be many, in case of multi-level nested sub-objects.
|
|
|
|
|
let orm_bnids = vec![]; |
|
|
|
|
let _ = sender.send(AppResponse::V0(AppResponseV0::OrmUpdateBlankNodeIds(orm_bnids))).await; |
|
|
|
|
let _ = sender |
|
|
|
|
.send(AppResponse::V0(AppResponseV0::OrmUpdateBlankNodeIds( |
|
|
|
|
orm_bnids, |
|
|
|
|
))) |
|
|
|
|
.await; |
|
|
|
|
|
|
|
|
|
// TODO (later) revert the inserts and removes
|
|
|
|
|
// let orm_diff = vec![];
|
|
|
|
@ -712,6 +933,7 @@ impl Verifier { |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Handles updates coming from JS-land (JSON patches).
|
|
|
|
|
pub(crate) async fn orm_frontend_update( |
|
|
|
|
&mut self, |
|
|
|
|
session_id: u64, |
|
|
|
@ -727,7 +949,6 @@ impl Verifier { |
|
|
|
|
diff |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
// find OrmSubscription
|
|
|
|
|
let (doc_nuri, sparql_update) = { |
|
|
|
|
let orm_subscription = |
|
|
|
|
self.get_first_orm_subscription_for(scope, Some(&shape_iri), Some(&session_id)); |
|
|
|
@ -763,7 +984,8 @@ impl Verifier { |
|
|
|
|
revert_inserts, |
|
|
|
|
revert_removes, |
|
|
|
|
) |
|
|
|
|
.await.map_err(|e|e.to_string())?; |
|
|
|
|
.await |
|
|
|
|
.map_err(|e| e.to_string())?; |
|
|
|
|
} |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
@ -808,7 +1030,9 @@ impl Verifier { |
|
|
|
|
let orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type)?; |
|
|
|
|
// log_debug!("create_orm_object_for_shape return {:?}", orm_objects);
|
|
|
|
|
|
|
|
|
|
let _ = tx.send(AppResponse::V0(AppResponseV0::OrmInitial(orm_objects))).await; |
|
|
|
|
let _ = tx |
|
|
|
|
.send(AppResponse::V0(AppResponseV0::OrmInitial(orm_objects))) |
|
|
|
|
.await; |
|
|
|
|
|
|
|
|
|
let close = Box::new(move || { |
|
|
|
|
log_debug!("closing ORM subscription"); |
|
|
|
|