handle_backend_update generates valid patches

feat/orm-diffs
Laurin Weger 3 days ago
parent e63941054a
commit 197e010034
No known key found for this signature in database
GPG Key ID: 9B372BB0B792770F
  1. 90
      engine/verifier/src/orm/handle_backend_update.rs
  2. 7
      engine/verifier/src/orm/materialize.rs
  3. 13
      engine/verifier/src/orm/process_changes.rs
  4. 2
      sdk/rust/src/tests/mod.rs
  5. 10
      sdk/rust/src/tests/orm_patches.rs

@ -68,8 +68,6 @@ impl Verifier {
}) })
.collect(); .collect();
// let mut updates = Vec::new();
let mut scopes = vec![]; let mut scopes = vec![];
for (scope, subs) in self.orm_subscriptions.iter_mut() { for (scope, subs) in self.orm_subscriptions.iter_mut() {
// Remove old subscriptions // Remove old subscriptions
@ -86,34 +84,37 @@ impl Verifier {
} }
// prepare to apply updates to tracked subjects and record the changes. // prepare to apply updates to tracked subjects and record the changes.
let root_shapes = subs let root_shapes_and_tracked_subjects = subs
.iter() .iter()
.map(|sub| { .map(|sub| {
sub.shape_type (
.schema sub.shape_type
.get(&sub.shape_type.shape) .schema
.unwrap() .get(&sub.shape_type.shape)
.clone() .unwrap()
.clone(),
shapes_in_tracked_subjects(&sub.tracked_subjects),
)
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
scopes.push((scope.clone(), root_shapes)); scopes.push((scope.clone(), root_shapes_and_tracked_subjects));
} }
log_debug!( log_debug!(
"[orm_backend_update], creating patch objects for #scopes {}", "[orm_backend_update], creating patch objects for #scopes {}",
scopes.len() scopes.len()
); );
for (scope, shapes) in scopes { for (scope, shapes_zip) in scopes {
let mut orm_changes: OrmChanges = HashMap::new(); let mut orm_changes: OrmChanges = HashMap::new();
// Apply the changes to tracked subjects. // Apply the changes to tracked subjects.
for shape_arc in shapes { for (root_shape_arc, all_shapes) in shapes_zip {
let shape_iri = shape_arc.iri.clone(); let shape_iri = root_shape_arc.iri.clone();
let _ = self.process_changes_for_shape_and_session( let _ = self.process_changes_for_shape_and_session(
&scope, &scope,
&shape_iri, &shape_iri,
shape_arc, all_shapes,
session_id, session_id,
&triple_inserts, &triple_inserts,
&triple_removes, &triple_removes,
@ -159,6 +160,11 @@ impl Verifier {
// Iterate over all changes and create patches // Iterate over all changes and create patches
for (shape_iri, subject_changes) in &orm_changes { for (shape_iri, subject_changes) in &orm_changes {
for (subject_iri, change) in subject_changes { for (subject_iri, change) in subject_changes {
log_debug!(
"Patch creating for subject change {}. #changed preds: {}",
subject_iri,
change.predicates.len()
);
// Get the tracked subject for this (subject, shape) pair // Get the tracked subject for this (subject, shape) pair
let tracked_subject = sub let tracked_subject = sub
.tracked_subjects .tracked_subjects
@ -201,6 +207,13 @@ impl Verifier {
// The subject is valid or has become valid. // The subject is valid or has become valid.
// Process each predicate change // Process each predicate change
for (_pred_iri, pred_change) in &change.predicates { for (_pred_iri, pred_change) in &change.predicates {
log_debug!(
" - Predicate changes: {}; #Adds: {}; #Removes {}",
_pred_iri,
pred_change.values_added.len(),
pred_change.values_removed.len()
);
let tracked_predicate = let tracked_predicate =
pred_change.tracked_predicate.read().unwrap(); pred_change.tracked_predicate.read().unwrap();
let pred_name = tracked_predicate.schema.readablePredicate.clone(); let pred_name = tracked_predicate.schema.readablePredicate.clone();
@ -430,24 +443,16 @@ fn build_path_to_root_and_create_patches(
patches: &mut Vec<OrmDiffOp>, patches: &mut Vec<OrmDiffOp>,
objects_to_create: &mut HashSet<(Vec<String>, Option<SubjectIri>)>, objects_to_create: &mut HashSet<(Vec<String>, Option<SubjectIri>)>,
) { ) {
log_debug!(
" - build path, ts: {}, path {:?}",
tracked_subject.subject_iri,
path
);
// If the tracked subject is not valid, we don't create patches for it // If the tracked subject is not valid, we don't create patches for it
if tracked_subject.valid != OrmTrackedSubjectValidity::Valid { if tracked_subject.valid != OrmTrackedSubjectValidity::Valid {
return; return;
} }
// If the tracked subject is newly valid (was not valid before but is now),
// we need to ensure the object is created with an "add object" patch
if tracked_subject.prev_valid != OrmTrackedSubjectValidity::Valid {
queue_patches_for_newly_valid_subject(
tracked_subject,
tracked_subjects,
root_shape,
path,
patches,
objects_to_create,
);
}
// If this subject has no parents or its shape matches the root shape, we've reached the root // If this subject has no parents or its shape matches the root shape, we've reached the root
if tracked_subject.parents.is_empty() || tracked_subject.shape.iri == *root_shape { if tracked_subject.parents.is_empty() || tracked_subject.shape.iri == *root_shape {
// Build the final JSON Pointer path // Build the final JSON Pointer path
@ -459,7 +464,7 @@ fn build_path_to_root_and_create_patches(
escaped_path.join("/") escaped_path.join("/")
); );
// Create the patch // Create the patch for the actual value change
patches.push(OrmDiffOp { patches.push(OrmDiffOp {
op: diff_op.0.clone(), op: diff_op.0.clone(),
valType: diff_op.1.clone(), valType: diff_op.1.clone(),
@ -467,6 +472,20 @@ fn build_path_to_root_and_create_patches(
value: diff_op.2.clone(), value: diff_op.2.clone(),
}); });
// If the subject is newly valid, now we have the full path to queue its creation.
if tracked_subject.prev_valid != OrmTrackedSubjectValidity::Valid {
let mut final_path = vec![tracked_subject.subject_iri.clone()];
final_path.extend_from_slice(path);
queue_patches_for_newly_valid_subject(
tracked_subject,
tracked_subjects,
root_shape,
&final_path,
patches,
objects_to_create,
);
}
return; return;
} }
@ -475,13 +494,14 @@ fn build_path_to_root_and_create_patches(
let parent_ts = parent_tracked_subject.read().unwrap(); let parent_ts = parent_tracked_subject.read().unwrap();
// Build the path segment for this parent // Build the path segment for this parent
if let Some(new_path) = build_path_segment_for_parent(tracked_subject, &parent_ts, path) { if let Some(mut new_path) = build_path_segment_for_parent(tracked_subject, &parent_ts, path)
{
// Recurse to the parent // Recurse to the parent
build_path_to_root_and_create_patches( build_path_to_root_and_create_patches(
&parent_ts, &parent_ts,
tracked_subjects, tracked_subjects,
root_shape, root_shape,
&mut new_path.clone(), &mut new_path,
diff_op.clone(), diff_op.clone(),
patches, patches,
objects_to_create, objects_to_create,
@ -584,3 +604,15 @@ fn create_diff_ops_from_predicate_change(
// } // }
return ops; return ops;
} }
fn shapes_in_tracked_subjects(
tracked_subjects: &HashMap<String, HashMap<String, Arc<RwLock<OrmTrackedSubject>>>>,
) -> Vec<Arc<OrmSchemaShape>> {
let mut shapes = vec![];
for (_subject_iri, tss) in tracked_subjects.iter() {
for (_shape_iri, ts) in tss.iter() {
shapes.push(ts.read().unwrap().shape.clone());
}
}
shapes
}

@ -103,16 +103,13 @@ impl Verifier {
let mut return_vals: Value = Value::Array(vec![]); let mut return_vals: Value = Value::Array(vec![]);
let return_val_vec = return_vals.as_array_mut().unwrap(); let return_val_vec = return_vals.as_array_mut().unwrap();
// log_debug!( log_debug!("\nMaterializing: {}", shape_type.shape);
// "Tracked subjects:\n{:?}\n",
// orm_subscription.tracked_subjects,
// );
// For each valid change struct, we build an orm object. // For each valid change struct, we build an orm object.
// The way we get the changes from the tracked subjects is a bit hacky, sorry. // The way we get the changes from the tracked subjects is a bit hacky, sorry.
for (subject_iri, tracked_subjects_by_shape) in &orm_subscription.tracked_subjects { for (subject_iri, tracked_subjects_by_shape) in &orm_subscription.tracked_subjects {
if let Some(tracked_subject) = tracked_subjects_by_shape.get(&shape_type.shape) { if let Some(tracked_subject) = tracked_subjects_by_shape.get(&shape_type.shape) {
let ts = tracked_subject.read().unwrap(); let ts = tracked_subject.read().unwrap();
log_info!("changes for: {:?} valid: {:?}\n", ts.subject_iri, ts.valid); log_info!(" - changes for: {:?} valid: {:?}", ts.subject_iri, ts.valid);
if ts.valid == OrmTrackedSubjectValidity::Valid { if ts.valid == OrmTrackedSubjectValidity::Valid {
if let Some(change) = changes if let Some(change) = changes

@ -89,7 +89,7 @@ impl Verifier {
&mut self, &mut self,
nuri: &NuriV0, nuri: &NuriV0,
root_shape_iri: &String, root_shape_iri: &String,
shape: Arc<OrmSchemaShape>, shapes: Vec<Arc<OrmSchemaShape>>,
session_id: u64, session_id: u64,
triples_added: &[Triple], triples_added: &[Triple],
triples_removed: &[Triple], triples_removed: &[Triple],
@ -101,7 +101,9 @@ impl Verifier {
// Track (shape_iri, subject_iri) pairs currently being validated to prevent cycles and double evaluation. // Track (shape_iri, subject_iri) pairs currently being validated to prevent cycles and double evaluation.
let mut currently_validating: HashSet<(String, String)> = HashSet::new(); let mut currently_validating: HashSet<(String, String)> = HashSet::new();
// Add root shape for first validation run. // Add root shape for first validation run.
shape_validation_stack.push((shape, vec![])); for shape in shapes {
shape_validation_stack.push((shape, vec![]));
}
// Process queue of shapes and subjects to validate. // Process queue of shapes and subjects to validate.
// For a given shape, we evaluate every subject against that shape. // For a given shape, we evaluate every subject against that shape.
@ -127,6 +129,7 @@ impl Verifier {
shape.iri shape.iri
); );
// For each modified subject, apply changes to tracked subjects and validate.
for subject_iri in &modified_subject_iris { for subject_iri in &modified_subject_iris {
let validation_key = (shape.iri.clone(), subject_iri.to_string()); let validation_key = (shape.iri.clone(), subject_iri.to_string());
@ -248,6 +251,8 @@ impl Verifier {
} }
// Validate the subject. // Validate the subject.
// need_eval contains elements in reverse priority (last element to be validated first)
// TODO: Improve order by distinguishing between parents, children and self to be re-evaluated.
let need_eval = Self::update_subject_validity(change, &shape, orm_subscription); let need_eval = Self::update_subject_validity(change, &shape, orm_subscription);
// We add the need_eval to be processed next after loop. // We add the need_eval to be processed next after loop.
@ -296,7 +301,7 @@ impl Verifier {
self.process_changes_for_shape_and_session( self.process_changes_for_shape_and_session(
nuri, nuri,
&root_shape_iri, &root_shape_iri,
shape_arc.clone(), [shape_arc.clone()].to_vec(),
session_id, session_id,
&new_triples, &new_triples,
&vec![], &vec![],
@ -356,7 +361,7 @@ impl Verifier {
self.process_changes_for_shape_and_session( self.process_changes_for_shape_and_session(
nuri, nuri,
&shape_iri, &shape_iri,
root_shape, [root_shape].to_vec(),
session_id, session_id,
triples_added, triples_added,
triples_removed, triples_removed,

@ -51,7 +51,7 @@ pub(crate) fn assert_json_eq(expected: &mut Value, actual: &mut Value) {
let diff = serde_json_diff::values(expected.clone(), actual.clone()); let diff = serde_json_diff::values(expected.clone(), actual.clone());
if let Some(diff_) = diff { if let Some(diff_) = diff {
log_err!( log_err!(
"Expected and actual ORM JSON mismatch.\nDiff: {:?}\nExpected: {}\nActual: {}", "Expected and actual JSON mismatch.\nDiff: {:?}\nExpected: {}\nActual: {}",
diff_, diff_,
expected, expected,
actual actual

@ -7,7 +7,7 @@
// notice may not be copied, modified, or distributed except // notice may not be copied, modified, or distributed except
// according to those terms. // according to those terms.
use crate::local_broker::{doc_create, doc_sparql_construct, doc_sparql_update, orm_start}; use crate::local_broker::{doc_sparql_update, orm_start};
use crate::tests::create_or_open_wallet::create_or_open_wallet; use crate::tests::create_or_open_wallet::create_or_open_wallet;
use crate::tests::{assert_json_eq, create_doc_with_data}; use crate::tests::{assert_json_eq, create_doc_with_data};
use async_std::stream::StreamExt; use async_std::stream::StreamExt;
@ -864,20 +864,12 @@ INSERT DATA {
let mut expected = json!([ let mut expected = json!([
// Modified house color // Modified house color
{
"op": "remove",
"path": "/urn:test:house1/rootColor",
},
{ {
"op": "add", "op": "add",
"value": "red", "value": "red",
"path": "/urn:test:house1/rootColor", "path": "/urn:test:house1/rootColor",
}, },
// Modified Alice's name // Modified Alice's name
{
"op": "remove",
"path": "/urn:test:house1/inhabitants/urn:test:person1/name",
},
{ {
"op": "add", "op": "add",
"value": "Alicia", "value": "Alicia",

Loading…
Cancel
Save