From 8472562913a0343bf979dcdc76e6bf1a2c5be32b Mon Sep 17 00:00:00 2001 From: Niko PLP Date: Mon, 13 Oct 2025 15:52:02 +0300 Subject: [PATCH] orm_update_self --- ng-verifier/src/commits/transaction.rs | 44 ++++++++++++++----- ng-verifier/src/orm/mod.rs | 59 +++++++++++++++++++------- 2 files changed, 76 insertions(+), 27 deletions(-) diff --git a/ng-verifier/src/commits/transaction.rs b/ng-verifier/src/commits/transaction.rs index 0afe73c..b17ba30 100644 --- a/ng-verifier/src/commits/transaction.rs +++ b/ng-verifier/src/commits/transaction.rs @@ -393,14 +393,16 @@ impl Verifier { // TODO: implement TargetBranchV0::Named _ => unimplemented!(), }; - let _ = branches.entry(branch_id).or_insert(( - store.get_store_repo().clone(), - repo.id, - branch_type, - topic_id, - token, - store.overlay_id, - )); + if is_publisher { + let _ = branches.entry(branch_id).or_insert(( + store.get_store_repo().clone(), + repo.id, + branch_type, + topic_id, + token, + store.overlay_id, + )); + } let _ = nuri_branches.entry(graph_name.clone()).or_insert(( repo.id, branch_id, @@ -424,6 +426,9 @@ impl Verifier { // - TODO: abort (the whole transaction) // - TODO: inbox (sent to inbox of document for a suggested update) // for now we just do skip, without giving option to user + let mut revert_inserts: Vec = vec![]; + let mut revert_removes: Vec = vec![]; + let mut skolemnized_blank_nodes: Vec = vec![]; let mut inserts_map: HashMap> = HashMap::with_capacity(1); let mut removes_map: HashMap> = HashMap::with_capacity(1); let mut branches: HashMap< @@ -438,6 +443,7 @@ impl Verifier { let (repo_id, branch_id, is_publisher) = self.find_branch_and_repo_for_quad(&insert, &mut branches, &mut nuri_branches)?; if !is_publisher { + revert_inserts.push(insert); continue; } let set = inserts_map.entry(branch_id).or_insert_with(|| { @@ -463,6 +469,7 @@ impl Verifier { let iri = NuriV0::repo_skolem(&repo_id, &peer_id, b.as_ref().unique_id().unwrap())?; insert.object = Term::NamedNode(NamedNode::new_unchecked(iri)); + skolemnized_blank_nodes.push(insert.clone()); } } // TODO deal with triples in subject and object (RDF-STAR) @@ -473,6 +480,7 @@ impl Verifier { let (repo_id, branch_id, is_publisher) = self.find_branch_and_repo_for_quad(&remove, &mut branches, &mut nuri_branches)?; if !is_publisher { + revert_removes.push(remove); continue; } let set = removes_map.entry(branch_id).or_insert_with(|| { @@ -532,7 +540,22 @@ impl Verifier { }; updates.push(info); } - self.update_graph(updates, session_id).await + match self.update_graph(updates, session_id).await { + Ok(commits) => { + if session_id != 0 { + self.orm_update_self( + &NuriV0::new_empty(), + session_id, + skolemnized_blank_nodes, + revert_inserts, + revert_removes, + ) + .await; + } + Ok(commits) + } + Err(e) => Err(e), + } } async fn update_graph( @@ -743,7 +766,7 @@ impl Verifier { } else { let graph_patch = update.transaction.as_patch(); let nuri = NuriV0::commit(&update.repo_id, &update.commit_id); - commit_nuris.push(nuri.clone()); + commit_nuris.push(nuri); self.push_app_response( &update.branch_id, AppResponse::V0(AppResponseV0::Patch(AppPatch { @@ -759,7 +782,6 @@ impl Verifier { NuriV0::repo_graph_name(&update.repo_id, &update.overlay_id); self.orm_update( &NuriV0::new_empty(), - nuri, session_id, update.transaction.as_quads_patch(graph_nuri), ) diff --git a/ng-verifier/src/orm/mod.rs b/ng-verifier/src/orm/mod.rs index 3b83a64..3f495e0 100644 --- a/ng-verifier/src/orm/mod.rs +++ b/ng-verifier/src/orm/mod.rs @@ -14,6 +14,7 @@ pub mod validation; use futures::channel::mpsc; use futures::channel::mpsc::UnboundedSender; +use ng_oxigraph::oxrdf::Quad; use std::collections::HashMap; use std::collections::HashSet; @@ -625,33 +626,57 @@ impl Verifier { pub(crate) async fn orm_update( &mut self, scope: &NuriV0, - commit_nuri: String, session_id: u64, patch: GraphQuadsPatch, ) { let mut responses = Vec::with_capacity(1); if let Some(subs) = self.orm_subscriptions.get(scope) { - let mut orm_diff: Option = None; + // TODO: implement this, generate orm_diff using the patch + let orm_diff: OrmDiff = vec![]; for sub in subs { - if sub.session_id == session_id { - //TODO prepare OrmUpdateBlankNodeIds - let orm_bnids = vec![]; - + if sub.session_id != session_id { responses.push(( sub.session_id, sub.sender.clone(), - AppResponseV0::OrmUpdateBlankNodeIds(orm_bnids), + AppResponseV0::OrmUpdate(orm_diff.to_vec()), )); - } else { - if orm_diff.is_none() { - //orm_diff = Some(??) - //TODO implement this - } + } + } + } + for (session_id, sender, res) in responses { + self.push_orm_response(scope, session_id, sender, AppResponse::V0(res)) + .await; + } + } + + pub(crate) async fn orm_update_self( + &mut self, + scope: &NuriV0, + session_id: u64, + skolemnized_blank_nodes: Vec, + revert_inserts: Vec, + revert_removes: Vec, + ) { + let mut responses = Vec::with_capacity(1); + if let Some(subs) = self.orm_subscriptions.get(scope) { + for sub in subs { + if sub.session_id == session_id { + // TODO prepare OrmUpdateBlankNodeIds with skolemnized_blank_nodes + // note(niko): I think skolemnized blank nodes can still be many, in case of multi-level nested sub-objects. + let orm_bnids = vec![]; responses.push(( - sub.session_id, + session_id, sub.sender.clone(), - AppResponseV0::OrmUpdate(orm_diff.as_ref().unwrap().to_vec()), + AppResponseV0::OrmUpdateBlankNodeIds(orm_bnids), )); + // TODO (later) revert the inserts and removes + // let orm_diff = vec![]; + // responses.push(( + // session_id, + // sub.sender.clone(), + // AppResponseV0::OrmUpdate(orm_diff), + // )); + break; } } } @@ -672,7 +697,7 @@ impl Verifier { pub(crate) async fn push_orm_response( &mut self, - nuri: &NuriV0, + scope: &NuriV0, session_id: u64, sender: UnboundedSender, response: AppResponse, @@ -682,7 +707,9 @@ impl Verifier { if sender.is_closed() { log_debug!("closed so removing session {}", session_id); - self.orm_subscriptions.remove(&nuri); + if let Some(subs) = self.orm_subscriptions.get_mut(&scope) { + subs.retain(|sub| sub.session_id != session_id); + } } else { let _ = sender.clone().send(response).await; }