From c3a1b48d25df9b5f74c4430a9ab6150f3246ebfa Mon Sep 17 00:00:00 2001 From: Laurin Weger Date: Tue, 14 Oct 2025 14:41:54 +0200 Subject: [PATCH] wip tripple changes to json patch --- engine/verifier/src/commits/transaction.rs | 2 +- engine/verifier/src/orm/mod.rs | 274 +++++++++++++++++++-- 2 files changed, 250 insertions(+), 26 deletions(-) diff --git a/engine/verifier/src/commits/transaction.rs b/engine/verifier/src/commits/transaction.rs index 8303fe7..db4b652 100644 --- a/engine/verifier/src/commits/transaction.rs +++ b/engine/verifier/src/commits/transaction.rs @@ -778,7 +778,7 @@ impl Verifier { .await; let graph_nuri = NuriV0::repo_graph_name(&update.repo_id, &update.overlay_id); - self.orm_update( + self.orm_backend_update( session_id, update.repo_id.clone(), update.overlay_id, diff --git a/engine/verifier/src/orm/mod.rs b/engine/verifier/src/orm/mod.rs index aea2f6a..4ca3e27 100644 --- a/engine/verifier/src/orm/mod.rs +++ b/engine/verifier/src/orm/mod.rs @@ -391,14 +391,10 @@ impl Verifier { shape: Option<&ShapeIri>, session_id: Option<&u64>, ) -> Result<(UnboundedSender, &OrmSubscription), VerifierError> { - let subs = self - .orm_subscriptions - .get_mut(nuri) - .unwrap(); + let subs = self.orm_subscriptions.get_mut(nuri).unwrap(); subs.retain(|sub| !sub.sender.is_closed()); - match - // Filter shapes, if present. - subs.iter() + match subs // Filter shapes, if present. + .iter() .filter(|s| match shape { Some(sh) => *sh == s.shape_type.shape, None => true, // Filter session ids if present. @@ -410,9 +406,7 @@ impl Verifier { .next() { None => Err(VerifierError::OrmSubscriptionNotFound), - Some(subscription) => { - Ok((subscription.sender.clone(), subscription)) - } + Some(subscription) => Ok((subscription.sender.clone(), subscription)), } } @@ -658,34 +652,258 @@ impl Verifier { return Ok(return_vals); } - pub(crate) async fn orm_update( + /// Generate and send JSON patches from GraphQuadsPatch (quad inserts and removes) to JS-land. + pub(crate) async fn orm_backend_update( &mut self, session_id: u64, repo_id: RepoId, overlay_id: OverlayId, patch: GraphQuadsPatch, ) { - let overlaylink: OverlayLink = overlay_id.into(); + let overlaylink: OverlayLink = overlay_id.into(); + + // We need to apply the patches to all subscriptions we have. We can use process_changes_for_* + // That updates the tracked subjects, validates them, and returns a set of changes structured + // by the respective schema. + + let triple_inserts: Vec = patch + .inserts + .iter() + .map(|quad| { + Triple::new( + quad.subject.clone(), + quad.predicate.clone(), + quad.object.clone(), + ) + }) + .collect(); + let triple_removes: Vec = patch + .removes + .iter() + .map(|quad| { + Triple::new( + quad.subject.clone(), + quad.predicate.clone(), + quad.object.clone(), + ) + }) + .collect(); + + // let mut updates = Vec::new(); + for (scope, subs) in self.orm_subscriptions.iter_mut() { - subs.retain(|sub| !sub.sender.is_closed()); if scope.target == NuriTargetV0::UserSite - || scope.overlay.as_ref().map_or(false, |ol| overlaylink == *ol) + || scope + .overlay + .as_ref() + .map_or(false, |ol| overlaylink == *ol) || scope.target == NuriTargetV0::Repo(repo_id) { - for sub in subs { - if sub.session_id != session_id { // this is incorrect. we are excluding all the subscriptions from the originating session, - // while we should only exclude the one with exact same shape_type. but we don't have access to that here + continue; + } + + let mut orm_changes: OrmChanges = HashMap::new(); - // TODO: implement this, generate orm_diff using the patch and the sub.shape_type - let orm_diff: OrmDiff = vec![]; + // Remove old subscriptions + subs.retain(|sub| !sub.sender.is_closed()); + + // Apply updates to tracked subjects and record the changes. + let shape_types = subs.iter().map(|sub| &sub.shape_type); + for shape_type in shape_types { + let shape_arc = shape_type.schema.get(&shape_type.shape).unwrap().clone(); + + let _ = self.process_changes_for_shape_and_session( + &scope, + shape_arc, + session_id, + &triple_inserts, + &triple_removes, + &mut orm_changes, + false, + ); + } + + for sub in subs.iter_mut() { + // TODO: This if-condition is wrong (intended to not re-apply changes coming from the same subscription). + if sub.session_id != session_id { + // Create diff from changes & subscription. + + let mut patches: OrmDiff = vec![]; + let mut path: Vec = vec![]; + fn create_patches_for_changed_subj( + orm_changes: &OrmChanges, + patches: &mut OrmDiff, + shape_iri: &String, + subject_iri: &String, + sub: &OrmSubscription, + path: &mut Vec, + ) { + let change = orm_changes + .get(shape_iri) + .unwrap() + .get(subject_iri) + .unwrap(); + let subject_shape = sub.shape_type.schema.get(shape_iri).unwrap(); + + // Iterate over every predicate change and create patches + for (pred_iri, pred_change) in change.predicates { + let pred_shape = subject_shape + .predicates + .iter() + .find(|p| p.iri == pred_iri) + .unwrap(); + + let is_multi = + pred_shape.maxCardinality > 1 || pred_shape.maxCardinality == -1; + let is_object = + pred_shape.dataTypes.iter().any(|dt| !dt.shape.is_none()); + let pred_name = &pred_shape.readablePredicate; + path.push(pred_name); + let path_str = path.join("/"); + + // Depending on the predicate type, add the respective diff operation. + + // Single primitive value + if !is_multi && !is_object { + if pred_change.values_added.len() > 0 { + patches.push(OrmDiffOp { + op: OrmDiffOpType::add, + valType: None, + path: path_str.clone(), + value: Some(json!(pred_change.values_added[0])), + }); + } + if pred_change.values_removed.len() > 0 { + patches.push(OrmDiffOp { + op: OrmDiffOpType::remove, + valType: None, + path: path_str, + value: Some(json!(pred_change.values_added[0])), + }); + } + } else if is_multi && !is_object { + // Set of primitive values + if pred_change.values_added.len() > 0 { + patches.push(OrmDiffOp { + op: OrmDiffOpType::add, + valType: Some(OrmDiffType::set), + path: path_str.clone(), + value: Some(json!(pred_change.values_added)), + }); + } + if pred_change.values_removed.len() > 0 { + patches.push(OrmDiffOp { + op: OrmDiffOpType::remove, + valType: Some(OrmDiffType::set), + path: path_str, + value: Some(json!(pred_change.values_removed)), + }); + } + } else if !is_multi && is_object { + // Single object. + if pred_change.values_added.len() > 0 { + // Object was added. That means, we need to add a basic object with no value, + // Then add further predicates to it in a recursive call. + patches.push(OrmDiffOp { + op: OrmDiffOpType::add, + valType: Some(OrmDiffType::object), + path: path_str.clone(), + value: None, + }); + // Apply changes for nested object. + create_patches_for_changed_subj( + orm_changes, + patches, + &pred_shape.dataTypes[0].shape.unwrap(), // TODO: We need to get to the information which of the object types was validated successfully. + match pred_change.values_added[0] { + BasicType::Str(str) => &str, + _ => panic!("Nested object should be a string."), + }, + sub, + path, + ); + } else { + // Object is removed. + patches.push(OrmDiffOp { + op: OrmDiffOpType::remove, + valType: Some(OrmDiffType::object), + path: path_str, + value: None, + }); + } + } else if is_multi && is_object { + // Add every object added. + for object_iri_added in pred_change.values_added { + // TODO: As with single object, just that we add the IRI to the path. + // We also need to check if the object existed before. + } + + // Delete every object removed. + if tracked_predicate.children.len() == 0 { + // Or the whole thing if no children remain + patches.push(OrmDiffOp { + op: OrmDiffOpType::remove, + valType: Some(OrmDiffType::object), + path: path_str, + value: None, + }); + } else { + for object_iri_removed in pred_change.values_removed { + patches.push(OrmDiffOp { + op: OrmDiffOpType::remove, + valType: Some(OrmDiffType::object), + path: format!( + "{}/{}", + path_str, + match object_iri_removed { + BasicType::Str(iri) => iri, + _ => panic!("Object IRI must be string"), + } + ), + value: None, + }); + } + } + } + + // Is the tracked subject valid and was it before? + // Just regular patch for each predicate. + // If the predicate is a nested object... + // We need to recurse. Do the same as here but append to the path. - _ = sub.sender.send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff.to_vec()))).await; + // Is the tracked subject invalid and was valid before? + // Delete of root + + // Did the subject become valid but was invalid before? + // We need to compose a orm object (based on the data we luckily have already). + + // Was the subject invalid and is still invalid? + // Nothing to do. + + // Remove to this predicate name from the path again. + path.pop(); + } } + + // For each tracked subject that has the subscription's shape, call fn above + for subject_iri in sub.tracked_subjects { // TODO + } + // + + let orm_diff: OrmDiff = vec![]; + let _ = sub + .sender + .send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff.to_vec()))) + .await; } } } } + /// After creating new objects (without an id) in JS-land, + /// we send the generated id for those back. + /// If something went wrong (revert_inserts / revert_removes not empty), + /// we send a JSON patch back to revert the made changes. pub(crate) async fn orm_update_self( &mut self, scope: &NuriV0, @@ -695,7 +913,6 @@ impl Verifier { revert_inserts: Vec, revert_removes: Vec, ) -> Result<(), VerifierError> { - let (mut sender, orm_subscription) = self.get_first_orm_subscription_sender_for(scope, Some(&shape_iri), Some(&session_id))?; @@ -703,7 +920,11 @@ impl Verifier { // use orm_subscription if needed // note(niko): I think skolemnized blank nodes can still be many, in case of multi-level nested sub-objects. let orm_bnids = vec![]; - let _ = sender.send(AppResponse::V0(AppResponseV0::OrmUpdateBlankNodeIds(orm_bnids))).await; + let _ = sender + .send(AppResponse::V0(AppResponseV0::OrmUpdateBlankNodeIds( + orm_bnids, + ))) + .await; // TODO (later) revert the inserts and removes // let orm_diff = vec![]; @@ -712,6 +933,7 @@ impl Verifier { Ok(()) } + /// Handles updates coming from JS-land (JSON patches). pub(crate) async fn orm_frontend_update( &mut self, session_id: u64, @@ -727,7 +949,6 @@ impl Verifier { diff ); - // find OrmSubscription let (doc_nuri, sparql_update) = { let orm_subscription = self.get_first_orm_subscription_for(scope, Some(&shape_iri), Some(&session_id)); @@ -763,7 +984,8 @@ impl Verifier { revert_inserts, revert_removes, ) - .await.map_err(|e|e.to_string())?; + .await + .map_err(|e| e.to_string())?; } Ok(()) } @@ -808,7 +1030,9 @@ impl Verifier { let orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type)?; // log_debug!("create_orm_object_for_shape return {:?}", orm_objects); - let _ = tx.send(AppResponse::V0(AppResponseV0::OrmInitial(orm_objects))).await; + let _ = tx + .send(AppResponse::V0(AppResponseV0::OrmInitial(orm_objects))) + .await; let close = Box::new(move || { log_debug!("closing ORM subscription");