Compare commits

...

2 Commits

  1. 510
      engine/verifier/src/orm/mod.rs
  2. 9
      engine/verifier/src/orm/utils.rs

@ -725,277 +725,333 @@ impl Verifier {
let subs = self.orm_subscriptions.get(&scope).unwrap(); let subs = self.orm_subscriptions.get(&scope).unwrap();
for sub in subs.iter() { for sub in subs.iter() {
// TODO: This if-condition is wrong (intended to not re-apply changes coming from the same subscription). // TODO: This if-condition is wrong (intended to not re-apply changes coming from the same subscription).
if sub.session_id != session_id { if sub.session_id == session_id {
// Create diff from changes & subscription. continue;
}
let mut patches: OrmDiff = vec![]; // Create diff from changes & subscription.
let mut path: Vec<String> = vec![];
fn create_patches_for_changed_subj( fn create_patches_for_nested_object(
orm_changes: &OrmChanges, pred_shape: &OrmSchemaPredicate,
patches: &mut OrmDiff, tracked_subjects: &HashMap<
shape_iri: &String, String,
subject_iri: &String, HashMap<String, Arc<RwLock<OrmTrackedSubject>>>,
sub: &OrmSubscription, >,
path: &mut Vec<String>, patches: &mut Vec<OrmDiffOp>,
tracked_subjects: &HashMap< path: &mut Vec<String>,
SubjectIri, object_iri: &String,
HashMap<ShapeIri, Arc<RwLock<OrmTrackedSubject>>>, orm_changes: &OrmChanges,
>, sub: &OrmSubscription,
) { ) {
let change = orm_changes // Object was added. That means, we need to add a basic object with no value,
.get(shape_iri) // Then add further predicates to it in a recursive call.
.unwrap() patches.push(OrmDiffOp {
.get(subject_iri) op: OrmDiffOpType::add,
.unwrap(); valType: Some(OrmDiffType::object),
let subject_shape = sub.shape_type.schema.get(shape_iri).unwrap(); path: format!("/{}", path.join("/")),
value: None,
});
// @Niko, is it safe to do this? // Get the shape IRI for a nested object that is valid.
let tracked_subject = tracked_subjects let object_shape_iri = {
.get(subject_iri) // Get the tracked subject for this object IRI
.unwrap() let tracked_subjects_for_obj = tracked_subjects
.get(shape_iri) .get(object_iri)
.expect("Object should be tracked");
// Find the first valid shape for this object from the allowed shapes
let allowed_shape_iris: Vec<&String> = pred_shape
.dataTypes
.iter()
.filter_map(|dt| dt.shape.as_ref())
.collect();
allowed_shape_iris
.iter()
.find(|shape_iri| {
tracked_subjects_for_obj
.get(**shape_iri)
.map(|ts| {
ts.read().unwrap().valid == OrmTrackedSubjectValidity::Valid
})
.unwrap_or(false)
})
.unwrap() .unwrap()
.read() .to_string()
.unwrap(); };
// Check validity changes // Apply changes for nested object.
if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Invalid create_patches_for_changed_subj(
&& tracked_subject.valid == OrmTrackedSubjectValidity::Invalid orm_changes,
{ patches,
// Is the subject invalid and was it before? There is nothing we need to inform about. &object_shape_iri,
return; &object_iri,
} else if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Valid sub,
&& tracked_subject.valid == OrmTrackedSubjectValidity::Invalid path,
|| tracked_subject.valid == OrmTrackedSubjectValidity::Untracked tracked_subjects,
{ );
// Has the subject become invalid or untracked? }
// We add a patch, deleting the object at its root.
fn create_patches_for_changed_subj(
orm_changes: &OrmChanges,
patches: &mut OrmDiff,
shape_iri: &String,
subject_iri: &String,
sub: &OrmSubscription,
path: &mut Vec<String>,
tracked_subjects: &HashMap<
SubjectIri,
HashMap<ShapeIri, Arc<RwLock<OrmTrackedSubject>>>,
>,
) {
let change = orm_changes
.get(shape_iri)
.unwrap()
.get(subject_iri)
.unwrap();
let subject_shape = sub.shape_type.schema.get(shape_iri).unwrap();
// @Niko, is it safe to do this?
let tracked_subject = tracked_subjects
.get(subject_iri)
.unwrap()
.get(shape_iri)
.unwrap()
.read()
.unwrap();
// Check validity changes
if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Invalid
&& tracked_subject.valid == OrmTrackedSubjectValidity::Invalid
{
// Is the subject invalid and was it before? There is nothing we need to inform about.
return;
} else if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Valid
&& tracked_subject.valid == OrmTrackedSubjectValidity::Invalid
|| tracked_subject.valid == OrmTrackedSubjectValidity::Untracked
{
// Has the subject become invalid or untracked?
// We add a patch, deleting the object at its root.
patches.push(OrmDiffOp {
op: OrmDiffOpType::remove,
valType: Some(OrmDiffType::object),
path: format!("/{}", path.join("/")),
value: None,
});
return;
} else {
// The subject is valid or has become valid.
// In both cases, all information necessary to send patches are available in the orm_changes.
// In case the subject was not valid before, we create the object at the current path as an empty object though.
if tracked_subject.prev_valid != OrmTrackedSubjectValidity::Valid {
patches.push(OrmDiffOp { patches.push(OrmDiffOp {
op: OrmDiffOpType::remove, op: OrmDiffOpType::add,
valType: Some(OrmDiffType::object), valType: Some(OrmDiffType::object),
path: path.join("/"), path: format!("/{}", path.join("/")),
value: None, value: None,
}); });
return; // And add the id field.
} else { patches.push(OrmDiffOp {
// The subject is valid or has become valid. op: OrmDiffOpType::add,
// In both cases, all information necessary to send patches are available in the orm_changes. valType: None,
path: format!("/{}/{}", path.join("/"), subject_iri),
value: None,
});
}
}
// Iterate over every predicate change and create patches
for (pred_iri, pred_change) in change.predicates.iter() {
let pred_shape = subject_shape
.predicates
.iter()
.find(|p| p.iri == *pred_iri)
.unwrap();
let is_multi =
pred_shape.maxCardinality > 1 || pred_shape.maxCardinality == -1;
let is_object = pred_shape.dataTypes.iter().any(|dt| !dt.shape.is_none());
let pred_name = pred_shape.readablePredicate.clone();
path.push(pred_name);
let path_str = format!("/{}", path.join("/"));
// In case the subject was not valid before, we create the object at the current path as an empty object though. // Depending on the predicate type (multi / single, object / not object),
if tracked_subject.prev_valid != OrmTrackedSubjectValidity::Valid { // add the respective diff operation.
// Single primitive value
if !is_multi && !is_object {
if pred_change.values_added.len() > 0 {
patches.push(OrmDiffOp { patches.push(OrmDiffOp {
op: OrmDiffOpType::add, op: OrmDiffOpType::add,
valType: Some(OrmDiffType::object), valType: None,
path: path.join("/"), path: path_str.clone(),
value: None, value: Some(json!(pred_change.values_added[0])),
}) });
} }
} if pred_change.values_removed.len() > 0 {
patches.push(OrmDiffOp {
// Iterate over every predicate change and create patches op: OrmDiffOpType::remove,
for (pred_iri, pred_change) in change.predicates.iter() { valType: None,
let pred_shape = subject_shape path: path_str,
.predicates value: Some(json!(pred_change.values_added[0])),
.iter() });
.find(|p| p.iri == *pred_iri) }
.unwrap(); } else if is_multi && !is_object {
// Set of primitive values
let is_multi = if pred_change.values_added.len() > 0 {
pred_shape.maxCardinality > 1 || pred_shape.maxCardinality == -1; patches.push(OrmDiffOp {
let is_object = op: OrmDiffOpType::add,
pred_shape.dataTypes.iter().any(|dt| !dt.shape.is_none()); valType: Some(OrmDiffType::set),
let pred_name = pred_shape.readablePredicate.clone(); path: path_str.clone(),
path.push(pred_name); value: Some(json!(pred_change.values_added)),
let path_str = path.join("/"); });
}
// Depending on the predicate type (multi / single, object / not object), if pred_change.values_removed.len() > 0 {
// add the respective diff operation. patches.push(OrmDiffOp {
op: OrmDiffOpType::remove,
// Single primitive value valType: Some(OrmDiffType::set),
if !is_multi && !is_object { path: path_str,
if pred_change.values_added.len() > 0 { value: Some(json!(pred_change.values_removed)),
patches.push(OrmDiffOp { });
op: OrmDiffOpType::add, }
valType: None, } else if is_object {
path: path_str.clone(), // Change in single object property.
value: Some(json!(pred_change.values_added[0])), if !is_multi {
}); let object_iri = match &pred_change.values_added[0] {
} BasicType::Str(iri) => iri,
if pred_change.values_removed.len() > 0 { _ => panic!("Object no IRI"),
patches.push(OrmDiffOp { };
op: OrmDiffOpType::remove, // Single object.
valType: None,
path: path_str,
value: Some(json!(pred_change.values_added[0])),
});
}
} else if is_multi && !is_object {
// Set of primitive values
if pred_change.values_added.len() > 0 { if pred_change.values_added.len() > 0 {
patches.push(OrmDiffOp { create_patches_for_nested_object(
op: OrmDiffOpType::add, pred_shape,
valType: Some(OrmDiffType::set), tracked_subjects,
path: path_str.clone(), patches,
value: Some(json!(pred_change.values_added)), path,
}); object_iri,
orm_changes,
sub,
);
} }
if pred_change.values_removed.len() > 0 { if pred_change.values_removed.len() > 0 {
// Object is removed.
patches.push(OrmDiffOp { patches.push(OrmDiffOp {
op: OrmDiffOpType::remove, op: OrmDiffOpType::remove,
valType: Some(OrmDiffType::set), valType: Some(OrmDiffType::object),
path: path_str, path: path_str,
value: Some(json!(pred_change.values_removed)), value: None,
}); });
} }
} else if is_object { } else {
fn create_patches_for_nested_object( // Change(s) in multi object property.
object_iri: &String,
pred_shape: &OrmSchemaPredicate, // Add every new object.
tracked_subjects: &HashMap< for obj_iri_bt in pred_change.values_added.iter() {
String, let obj_iri = match obj_iri_bt {
HashMap<String, Arc<RwLock<OrmTrackedSubject>>>, BasicType::Str(iri) => iri,
>, _ => panic!("Object no IRI"),
patches: &mut Vec<OrmDiffOp>, };
path: &mut Vec<String>,
pred_change: &OrmTrackedPredicateChanges, // First, we create a root object (if the object existed before, this has no effect).
orm_changes: &mut OrmChanges,
sub: &OrmSubscription,
) {
// Object was added. That means, we need to add a basic object with no value,
// Then add further predicates to it in a recursive call.
patches.push(OrmDiffOp { patches.push(OrmDiffOp {
op: OrmDiffOpType::add, op: OrmDiffOpType::add,
valType: Some(OrmDiffType::object), valType: Some(OrmDiffType::object),
path: path.join("/"), path: path_str.clone(),
value: None, value: None,
}); });
let object_iri = match &pred_change.values_added[0] { // Add escaped object IRI to path.
BasicType::Str(iri) => iri, path.push(escape_json_pointer(obj_iri));
_ => panic!("object not an IRI"),
};
/// Get the shape IRI for a nested object that is valid.
let object_shape_iri = {
// Get the tracked subject for this object IRI
let tracked_subjects_for_obj = tracked_subjects
.get(object_iri)
.expect("Object should be tracked");
// Find the first valid shape for this object from the allowed shapes
let allowed_shape_iris: Vec<&String> = pred_shape
.dataTypes
.iter()
.filter_map(|dt| dt.shape.as_ref())
.collect();
allowed_shape_iris
.iter()
.find(|shape_iri| {
tracked_subjects_for_obj
.get(**shape_iri)
.map(|ts| {
ts.read().unwrap().valid
== OrmTrackedSubjectValidity::Valid
})
.unwrap_or(false)
})
.unwrap()
.to_string()
};
// Apply changes for nested object. create_patches_for_nested_object(
create_patches_for_changed_subj( pred_shape,
orm_changes, tracked_subjects,
patches, patches,
&object_shape_iri,
&object_iri,
sub,
path, path,
tracked_subjects, obj_iri,
orm_changes,
sub,
); );
// Remove object IRI from stack again.
path.pop();
} }
if !is_multi { // Delete objects.
// Single object. // If there are no more predicates, delete the whole object.
if pred_change.values_added.len() > 0 { if pred_change
} else { .tracked_predicate
// Object is removed. .read()
patches.push(OrmDiffOp { .unwrap()
op: OrmDiffOpType::remove, .tracked_children
valType: Some(OrmDiffType::object), .len()
path: path_str, == 0
value: None, {
}); // Or the whole thing if no children remain
} patches.push(OrmDiffOp {
op: OrmDiffOpType::remove,
valType: Some(OrmDiffType::object),
path: path_str,
value: None,
});
} else { } else {
// is_multi && is_object for object_iri_removed in pred_change.values_removed.iter() {
// Add every new object.
for object_iri_added in pred_change.values_added.iter() {
// If object did not exist before, we create a root patch.
// We also need to check if the object existed before.
}
// Delete every object removed.
// If there are no more predicates, delete the whole object.
if pred_change
.tracked_predicate
.read()
.unwrap()
.tracked_children
.len()
== 0
{
// Or the whole thing if no children remain
patches.push(OrmDiffOp { patches.push(OrmDiffOp {
op: OrmDiffOpType::remove, op: OrmDiffOpType::remove,
valType: Some(OrmDiffType::object), valType: Some(OrmDiffType::object),
path: path_str, path: format!(
"/{}/{}",
path_str,
match object_iri_removed {
BasicType::Str(iri) => iri,
_ => panic!("Object IRI must be string"),
}
),
value: None, value: None,
}); });
} else {
for object_iri_removed in pred_change.values_removed.iter()
{
patches.push(OrmDiffOp {
op: OrmDiffOpType::remove,
valType: Some(OrmDiffType::object),
path: format!(
"{}/{}",
path_str,
match object_iri_removed {
BasicType::Str(iri) => iri,
_ => panic!("Object IRI must be string"),
}
),
value: None,
});
}
} }
} }
} }
// Remove to this predicate name from the path again.
path.pop();
} }
}
// For each tracked subject that has the subscription's shape, call fn above // Remove this predicate name from the path again.
for subject_iri in sub.tracked_subjects.iter() { path.pop();
// Path.push
// Create changes
// path.pop
} }
// }
let orm_diff: OrmDiff = vec![]; let mut patches: OrmDiff = vec![];
let _ = sub let mut path: Vec<String> = Vec::with_capacity(4);
.sender
.clone() // Iterate over each root subject with the right shape
.send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff.to_vec()))) // For each tracked subject that has the subscription's shape, call fn above
.await; for (subject_iri, tracked_subjects_by_shape) in sub.tracked_subjects.iter() {
for (shape_iri, tracked_subject) in tracked_subjects_by_shape.iter() {
if *shape_iri != sub.shape_type.shape {
continue;
}
// Found a root subject for this shape.
// Add subject IRI as first part of path pointer.
path.push(escape_json_pointer(subject_iri));
create_patches_for_changed_subj(
&orm_changes,
&mut patches,
shape_iri,
subject_iri,
sub,
&mut path,
&sub.tracked_subjects,
);
path.pop();
}
} }
// Send response with patches.
let _ = sub
.sender
.clone()
.send(AppResponse::V0(AppResponseV0::OrmUpdate(patches.to_vec())))
.await;
} }
} }
} }
@ -1143,3 +1199,5 @@ impl Verifier {
Ok((rx, close)) Ok((rx, close))
} }
} }
// Btw, orm/mod.rs is exceeding 1200 lines again. Is that a good practice? I have the feeling, we could separate a couple of things..

@ -10,6 +10,7 @@
use ng_oxigraph::oxrdf::Subject; use ng_oxigraph::oxrdf::Subject;
use ng_repo::log::*; use ng_repo::log::*;
use ng_repo::types::OverlayId; use ng_repo::types::OverlayId;
use yrs::types::PathSegment;
use std::collections::HashMap; use std::collections::HashMap;
use std::collections::HashSet; use std::collections::HashSet;
@ -321,3 +322,11 @@ pub fn nuri_to_string(nuri: &NuriV0) -> String {
let graph_name = NuriV0::repo_graph_name(repo_id, &overlay_id); let graph_name = NuriV0::repo_graph_name(repo_id, &overlay_id);
graph_name graph_name
} }
pub fn escape_json_pointer(path_segment: &String) -> String {
path_segment.replace("~", "~0").replace("/", "~1")
}
pub fn decode_join_pointer(path: &String) -> String {
path.replace("~1", "/").replace("~0", "~")
}

Loading…
Cancel
Save