diff --git a/engine/verifier/src/orm/mod.rs b/engine/verifier/src/orm/mod.rs index d660625..910e85b 100644 --- a/engine/verifier/src/orm/mod.rs +++ b/engine/verifier/src/orm/mod.rs @@ -725,277 +725,333 @@ impl Verifier { let subs = self.orm_subscriptions.get(&scope).unwrap(); for sub in subs.iter() { // TODO: This if-condition is wrong (intended to not re-apply changes coming from the same subscription). - if sub.session_id != session_id { - // Create diff from changes & subscription. - - let mut patches: OrmDiff = vec![]; - let mut path: Vec = vec![]; - fn create_patches_for_changed_subj( - orm_changes: &OrmChanges, - patches: &mut OrmDiff, - shape_iri: &String, - subject_iri: &String, - sub: &OrmSubscription, - path: &mut Vec, - tracked_subjects: &HashMap< - SubjectIri, - HashMap>>, - >, - ) { - let change = orm_changes - .get(shape_iri) - .unwrap() - .get(subject_iri) - .unwrap(); - let subject_shape = sub.shape_type.schema.get(shape_iri).unwrap(); + if sub.session_id == session_id { + continue; + } + // Create diff from changes & subscription. + + fn create_patches_for_nested_object( + pred_shape: &OrmSchemaPredicate, + tracked_subjects: &HashMap< + String, + HashMap>>, + >, + patches: &mut Vec, + path: &mut Vec, + object_iri: &String, + orm_changes: &OrmChanges, + sub: &OrmSubscription, + ) { + // Object was added. That means, we need to add a basic object with no value, + // Then add further predicates to it in a recursive call. + patches.push(OrmDiffOp { + op: OrmDiffOpType::add, + valType: Some(OrmDiffType::object), + path: format!("/{}", path.join("/")), + value: None, + }); - // @Niko, is it safe to do this? - let tracked_subject = tracked_subjects - .get(subject_iri) - .unwrap() - .get(shape_iri) + // Get the shape IRI for a nested object that is valid. + let object_shape_iri = { + // Get the tracked subject for this object IRI + let tracked_subjects_for_obj = tracked_subjects + .get(object_iri) + .expect("Object should be tracked"); + + // Find the first valid shape for this object from the allowed shapes + let allowed_shape_iris: Vec<&String> = pred_shape + .dataTypes + .iter() + .filter_map(|dt| dt.shape.as_ref()) + .collect(); + + allowed_shape_iris + .iter() + .find(|shape_iri| { + tracked_subjects_for_obj + .get(**shape_iri) + .map(|ts| { + ts.read().unwrap().valid == OrmTrackedSubjectValidity::Valid + }) + .unwrap_or(false) + }) .unwrap() - .read() - .unwrap(); + .to_string() + }; - // Check validity changes - if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Invalid - && tracked_subject.valid == OrmTrackedSubjectValidity::Invalid - { - // Is the subject invalid and was it before? There is nothing we need to inform about. - return; - } else if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Valid - && tracked_subject.valid == OrmTrackedSubjectValidity::Invalid - || tracked_subject.valid == OrmTrackedSubjectValidity::Untracked - { - // Has the subject become invalid or untracked? - // We add a patch, deleting the object at its root. + // Apply changes for nested object. + create_patches_for_changed_subj( + orm_changes, + patches, + &object_shape_iri, + &object_iri, + sub, + path, + tracked_subjects, + ); + } + + fn create_patches_for_changed_subj( + orm_changes: &OrmChanges, + patches: &mut OrmDiff, + shape_iri: &String, + subject_iri: &String, + sub: &OrmSubscription, + path: &mut Vec, + tracked_subjects: &HashMap< + SubjectIri, + HashMap>>, + >, + ) { + let change = orm_changes + .get(shape_iri) + .unwrap() + .get(subject_iri) + .unwrap(); + let subject_shape = sub.shape_type.schema.get(shape_iri).unwrap(); + + // @Niko, is it safe to do this? + let tracked_subject = tracked_subjects + .get(subject_iri) + .unwrap() + .get(shape_iri) + .unwrap() + .read() + .unwrap(); + + // Check validity changes + if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Invalid + && tracked_subject.valid == OrmTrackedSubjectValidity::Invalid + { + // Is the subject invalid and was it before? There is nothing we need to inform about. + return; + } else if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Valid + && tracked_subject.valid == OrmTrackedSubjectValidity::Invalid + || tracked_subject.valid == OrmTrackedSubjectValidity::Untracked + { + // Has the subject become invalid or untracked? + // We add a patch, deleting the object at its root. + patches.push(OrmDiffOp { + op: OrmDiffOpType::remove, + valType: Some(OrmDiffType::object), + path: format!("/{}", path.join("/")), + value: None, + }); + return; + } else { + // The subject is valid or has become valid. + // In both cases, all information necessary to send patches are available in the orm_changes. + + // In case the subject was not valid before, we create the object at the current path as an empty object though. + if tracked_subject.prev_valid != OrmTrackedSubjectValidity::Valid { patches.push(OrmDiffOp { - op: OrmDiffOpType::remove, + op: OrmDiffOpType::add, valType: Some(OrmDiffType::object), - path: path.join("/"), + path: format!("/{}", path.join("/")), value: None, }); - return; - } else { - // The subject is valid or has become valid. - // In both cases, all information necessary to send patches are available in the orm_changes. + // And add the id field. + patches.push(OrmDiffOp { + op: OrmDiffOpType::add, + valType: None, + path: format!("/{}/{}", path.join("/"), subject_iri), + value: None, + }); + } + } + + // Iterate over every predicate change and create patches + for (pred_iri, pred_change) in change.predicates.iter() { + let pred_shape = subject_shape + .predicates + .iter() + .find(|p| p.iri == *pred_iri) + .unwrap(); + + let is_multi = + pred_shape.maxCardinality > 1 || pred_shape.maxCardinality == -1; + let is_object = pred_shape.dataTypes.iter().any(|dt| !dt.shape.is_none()); + let pred_name = pred_shape.readablePredicate.clone(); + path.push(pred_name); + let path_str = format!("/{}", path.join("/")); - // In case the subject was not valid before, we create the object at the current path as an empty object though. - if tracked_subject.prev_valid != OrmTrackedSubjectValidity::Valid { + // Depending on the predicate type (multi / single, object / not object), + // add the respective diff operation. + + // Single primitive value + if !is_multi && !is_object { + if pred_change.values_added.len() > 0 { patches.push(OrmDiffOp { op: OrmDiffOpType::add, - valType: Some(OrmDiffType::object), - path: path.join("/"), - value: None, - }) + valType: None, + path: path_str.clone(), + value: Some(json!(pred_change.values_added[0])), + }); } - } - - // Iterate over every predicate change and create patches - for (pred_iri, pred_change) in change.predicates.iter() { - let pred_shape = subject_shape - .predicates - .iter() - .find(|p| p.iri == *pred_iri) - .unwrap(); - - let is_multi = - pred_shape.maxCardinality > 1 || pred_shape.maxCardinality == -1; - let is_object = - pred_shape.dataTypes.iter().any(|dt| !dt.shape.is_none()); - let pred_name = pred_shape.readablePredicate.clone(); - path.push(pred_name); - let path_str = path.join("/"); - - // Depending on the predicate type (multi / single, object / not object), - // add the respective diff operation. - - // Single primitive value - if !is_multi && !is_object { - if pred_change.values_added.len() > 0 { - patches.push(OrmDiffOp { - op: OrmDiffOpType::add, - valType: None, - path: path_str.clone(), - value: Some(json!(pred_change.values_added[0])), - }); - } - if pred_change.values_removed.len() > 0 { - patches.push(OrmDiffOp { - op: OrmDiffOpType::remove, - valType: None, - path: path_str, - value: Some(json!(pred_change.values_added[0])), - }); - } - } else if is_multi && !is_object { - // Set of primitive values + if pred_change.values_removed.len() > 0 { + patches.push(OrmDiffOp { + op: OrmDiffOpType::remove, + valType: None, + path: path_str, + value: Some(json!(pred_change.values_added[0])), + }); + } + } else if is_multi && !is_object { + // Set of primitive values + if pred_change.values_added.len() > 0 { + patches.push(OrmDiffOp { + op: OrmDiffOpType::add, + valType: Some(OrmDiffType::set), + path: path_str.clone(), + value: Some(json!(pred_change.values_added)), + }); + } + if pred_change.values_removed.len() > 0 { + patches.push(OrmDiffOp { + op: OrmDiffOpType::remove, + valType: Some(OrmDiffType::set), + path: path_str, + value: Some(json!(pred_change.values_removed)), + }); + } + } else if is_object { + // Change in single object property. + if !is_multi { + let object_iri = match &pred_change.values_added[0] { + BasicType::Str(iri) => iri, + _ => panic!("Object no IRI"), + }; + // Single object. if pred_change.values_added.len() > 0 { - patches.push(OrmDiffOp { - op: OrmDiffOpType::add, - valType: Some(OrmDiffType::set), - path: path_str.clone(), - value: Some(json!(pred_change.values_added)), - }); + create_patches_for_nested_object( + pred_shape, + tracked_subjects, + patches, + path, + object_iri, + orm_changes, + sub, + ); } if pred_change.values_removed.len() > 0 { + // Object is removed. patches.push(OrmDiffOp { op: OrmDiffOpType::remove, - valType: Some(OrmDiffType::set), + valType: Some(OrmDiffType::object), path: path_str, - value: Some(json!(pred_change.values_removed)), + value: None, }); } - } else if is_object { - fn create_patches_for_nested_object( - object_iri: &String, - pred_shape: &OrmSchemaPredicate, - tracked_subjects: &HashMap< - String, - HashMap>>, - >, - patches: &mut Vec, - path: &mut Vec, - pred_change: &OrmTrackedPredicateChanges, - orm_changes: &mut OrmChanges, - sub: &OrmSubscription, - ) { - // Object was added. That means, we need to add a basic object with no value, - // Then add further predicates to it in a recursive call. + } else { + // Change(s) in multi object property. + + // Add every new object. + for obj_iri_bt in pred_change.values_added.iter() { + let obj_iri = match obj_iri_bt { + BasicType::Str(iri) => iri, + _ => panic!("Object no IRI"), + }; + + // First, we create a root object (if the object existed before, this has no effect). patches.push(OrmDiffOp { op: OrmDiffOpType::add, valType: Some(OrmDiffType::object), - path: path.join("/"), + path: path_str.clone(), value: None, }); - let object_iri = match &pred_change.values_added[0] { - BasicType::Str(iri) => iri, - _ => panic!("object not an IRI"), - }; - - /// Get the shape IRI for a nested object that is valid. - let object_shape_iri = { - // Get the tracked subject for this object IRI - let tracked_subjects_for_obj = tracked_subjects - .get(object_iri) - .expect("Object should be tracked"); - - // Find the first valid shape for this object from the allowed shapes - let allowed_shape_iris: Vec<&String> = pred_shape - .dataTypes - .iter() - .filter_map(|dt| dt.shape.as_ref()) - .collect(); - - allowed_shape_iris - .iter() - .find(|shape_iri| { - tracked_subjects_for_obj - .get(**shape_iri) - .map(|ts| { - ts.read().unwrap().valid - == OrmTrackedSubjectValidity::Valid - }) - .unwrap_or(false) - }) - .unwrap() - .to_string() - }; + // Add escaped object IRI to path. + path.push(escape_json_pointer(obj_iri)); - // Apply changes for nested object. - create_patches_for_changed_subj( - orm_changes, + create_patches_for_nested_object( + pred_shape, + tracked_subjects, patches, - &object_shape_iri, - &object_iri, - sub, path, - tracked_subjects, + obj_iri, + orm_changes, + sub, ); + + // Remove object IRI from stack again. + path.pop(); } - if !is_multi { - // Single object. - if pred_change.values_added.len() > 0 { - } else { - // Object is removed. - patches.push(OrmDiffOp { - op: OrmDiffOpType::remove, - valType: Some(OrmDiffType::object), - path: path_str, - value: None, - }); - } + // Delete objects. + // If there are no more predicates, delete the whole object. + if pred_change + .tracked_predicate + .read() + .unwrap() + .tracked_children + .len() + == 0 + { + // Or the whole thing if no children remain + patches.push(OrmDiffOp { + op: OrmDiffOpType::remove, + valType: Some(OrmDiffType::object), + path: path_str, + value: None, + }); } else { - // is_multi && is_object - - // Add every new object. - for object_iri_added in pred_change.values_added.iter() { - // If object did not exist before, we create a root patch. - - // We also need to check if the object existed before. - } - - // Delete every object removed. - // If there are no more predicates, delete the whole object. - if pred_change - .tracked_predicate - .read() - .unwrap() - .tracked_children - .len() - == 0 - { - // Or the whole thing if no children remain + for object_iri_removed in pred_change.values_removed.iter() { patches.push(OrmDiffOp { op: OrmDiffOpType::remove, valType: Some(OrmDiffType::object), - path: path_str, + path: format!( + "/{}/{}", + path_str, + match object_iri_removed { + BasicType::Str(iri) => iri, + _ => panic!("Object IRI must be string"), + } + ), value: None, }); - } else { - for object_iri_removed in pred_change.values_removed.iter() - { - patches.push(OrmDiffOp { - op: OrmDiffOpType::remove, - valType: Some(OrmDiffType::object), - path: format!( - "{}/{}", - path_str, - match object_iri_removed { - BasicType::Str(iri) => iri, - _ => panic!("Object IRI must be string"), - } - ), - value: None, - }); - } } } } - - // Remove to this predicate name from the path again. - path.pop(); } - } - // For each tracked subject that has the subscription's shape, call fn above - for subject_iri in sub.tracked_subjects.iter() { - // Path.push - // Create changes - // path.pop + // Remove this predicate name from the path again. + path.pop(); } - // + } - let orm_diff: OrmDiff = vec![]; - let _ = sub - .sender - .clone() - .send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff.to_vec()))) - .await; + let mut patches: OrmDiff = vec![]; + let mut path: Vec = Vec::with_capacity(4); + + // Iterate over each root subject with the right shape + // For each tracked subject that has the subscription's shape, call fn above + for (subject_iri, tracked_subjects_by_shape) in sub.tracked_subjects.iter() { + for (shape_iri, tracked_subject) in tracked_subjects_by_shape.iter() { + if *shape_iri != sub.shape_type.shape { + continue; + } + // Found a root subject for this shape. + + // Add subject IRI as first part of path pointer. + path.push(escape_json_pointer(subject_iri)); + create_patches_for_changed_subj( + &orm_changes, + &mut patches, + shape_iri, + subject_iri, + sub, + &mut path, + &sub.tracked_subjects, + ); + path.pop(); + } } + + // Send response with patches. + let _ = sub + .sender + .clone() + .send(AppResponse::V0(AppResponseV0::OrmUpdate(patches.to_vec()))) + .await; } } } @@ -1143,3 +1199,5 @@ impl Verifier { Ok((rx, close)) } } + +// Btw, orm/mod.rs is exceeding 1200 lines again. Is that a good practice? I have the feeling, we could separate a couple of things.. diff --git a/engine/verifier/src/orm/utils.rs b/engine/verifier/src/orm/utils.rs index 8df48c2..b45a289 100644 --- a/engine/verifier/src/orm/utils.rs +++ b/engine/verifier/src/orm/utils.rs @@ -10,6 +10,7 @@ use ng_oxigraph::oxrdf::Subject; use ng_repo::log::*; use ng_repo::types::OverlayId; +use yrs::types::PathSegment; use std::collections::HashMap; use std::collections::HashSet; @@ -321,3 +322,11 @@ pub fn nuri_to_string(nuri: &NuriV0) -> String { let graph_name = NuriV0::repo_graph_name(repo_id, &overlay_id); graph_name } + +pub fn escape_json_pointer(path_segment: &String) -> String { + path_segment.replace("~", "~0").replace("/", "~1") +} + +pub fn decode_join_pointer(path: &String) -> String { + path.replace("~1", "/").replace("~0", "~") +}