From d54290d99308cbf3322379f83aea68076208da98 Mon Sep 17 00:00:00 2001 From: Niko PLP Date: Mon, 13 Oct 2025 13:07:57 +0300 Subject: [PATCH] improve OrmUpdate logic for self update --- ng-net/src/app_protocol.rs | 3 +- ng-net/src/orm.rs | 8 ++++++ ng-verifier/src/commits/transaction.rs | 9 ++++-- ng-verifier/src/inbox_processor.rs | 10 +++---- ng-verifier/src/orm/mod.rs | 38 ++++++++++++++++++-------- ng-verifier/src/request_processor.rs | 13 +++++++-- ng-verifier/src/verifier.rs | 5 +++- 7 files changed, 63 insertions(+), 23 deletions(-) diff --git a/ng-net/src/app_protocol.rs b/ng-net/src/app_protocol.rs index 993586a..2def62a 100644 --- a/ng-net/src/app_protocol.rs +++ b/ng-net/src/app_protocol.rs @@ -20,7 +20,7 @@ use ng_repo::utils::{decode_digest, decode_key, decode_sym_key}; use ng_repo::utils::{decode_overlayid, display_timestamp_local}; use serde_json::Value; -use crate::orm::{OrmDiff, OrmShapeType}; +use crate::orm::{OrmDiff, OrmShapeType, OrmUpdateBlankNodeIds}; use crate::types::*; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -1309,6 +1309,7 @@ pub enum AppResponseV0 { Commits(Vec), OrmInitial(Value), OrmUpdate(OrmDiff), + OrmUpdateBlankNodeIds(OrmUpdateBlankNodeIds), OrmError(String), } diff --git a/ng-net/src/orm.rs b/ng-net/src/orm.rs index 2d16b93..afbd2f2 100644 --- a/ng-net/src/orm.rs +++ b/ng-net/src/orm.rs @@ -48,6 +48,14 @@ pub struct OrmDiffOp { pub type OrmDiff = Vec; +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct OrmUpdateBlankNodeId { + pub path: String, + pub nuri: String, +} + +pub type OrmUpdateBlankNodeIds = Vec; + pub type OrmSchema = HashMap>; #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/ng-verifier/src/commits/transaction.rs b/ng-verifier/src/commits/transaction.rs index 4e88b6c..0afe73c 100644 --- a/ng-verifier/src/commits/transaction.rs +++ b/ng-verifier/src/commits/transaction.rs @@ -295,7 +295,7 @@ impl Verifier { transaction, commit_info, }; - self.update_graph(vec![info]).await?; + self.update_graph(vec![info], 0).await?; } else //TODO: change the logic here. transaction commits can have both a discrete and graph update. Only one AppResponse should be sent in this case, containing both updates. if body.discrete.is_some() { @@ -417,6 +417,7 @@ impl Verifier { inserts: Vec, removes: Vec, peer_id: Vec, + session_id: u64, ) -> Result, VerifierError> { // options when not a publisher on the repo: // - skip @@ -531,12 +532,13 @@ impl Verifier { }; updates.push(info); } - self.update_graph(updates).await + self.update_graph(updates, session_id).await } async fn update_graph( &mut self, mut updates: Vec, + session_id: u64, ) -> Result, VerifierError> { let updates_ref = &mut updates; let res = self @@ -758,6 +760,7 @@ impl Verifier { self.orm_update( &NuriV0::new_empty(), nuri, + session_id, update.transaction.as_quads_patch(graph_nuri), ) .await; @@ -775,6 +778,7 @@ impl Verifier { query: &String, base: &Option, peer_id: Vec, + session_id: u64, ) -> Result, String> { let store = self.graph_dataset.as_ref().unwrap(); @@ -796,6 +800,7 @@ impl Verifier { Vec::from_iter(inserts), Vec::from_iter(removes), peer_id, + session_id, ) .await .map_err(|e| e.to_string()) diff --git a/ng-verifier/src/inbox_processor.rs b/ng-verifier/src/inbox_processor.rs index 287bc58..64b756f 100644 --- a/ng-verifier/src/inbox_processor.rs +++ b/ng-verifier/src/inbox_processor.rs @@ -65,7 +65,7 @@ impl Verifier { <> ng:social_query_from_profile <{from_profile_nuri_string}>. <> ng:social_query_started \"{}\"^^xsd:dateTime . }}",DateTime::now()); let ret = self - .process_sparql_update(&forwarder_nuri, &sparql_update, &Some(forwarder_nuri_string.clone()), vec![]) + .process_sparql_update(&forwarder_nuri, &sparql_update, &Some(forwarder_nuri_string.clone()), vec![],0) .await; if let Err(e) = ret { return Err(VerifierError::SparqlError(e)); @@ -78,7 +78,7 @@ impl Verifier { // adding triples in forwarder doc : ng:social_query_id let sparql_update = format!("INSERT DATA {{ <{forwarder_nuri_string}> \"{}\"^^ . }}",DateTime::now()); let ret = self - .process_sparql_update(forwarder_nuri, &sparql_update, &None, vec![]) + .process_sparql_update(forwarder_nuri, &sparql_update, &None, vec![],0) .await; if let Err(e) = ret { return Err(VerifierError::SparqlError(e)); @@ -153,7 +153,7 @@ impl Verifier { ng:social_query_forwarded_to_inbox <{to_inbox_nuri}> . }}"); let ret = self - .process_sparql_update(&forwarder_nuri, &sparql_update, &None, vec![]) + .process_sparql_update(&forwarder_nuri, &sparql_update, &None, vec![],0) .await; if let Err(e) = ret { return Err(VerifierError::SparqlError(e)); @@ -593,7 +593,7 @@ impl Verifier { let nuri_ov = NuriV0::repo_graph_name(&response.query_id, &overlay_id); let graph_name = NamedNode::new_unchecked(&nuri_ov); let quads = triples.into_iter().map(|t| t.in_graph(graph_name.clone()) ).collect(); - let commits = self.prepare_sparql_update(quads, vec![], self.get_peer_id_for_skolem()).await?; + let commits = self.prepare_sparql_update(quads, vec![], self.get_peer_id_for_skolem(), 0).await?; } else { @@ -665,7 +665,7 @@ impl Verifier { {has_email} }}", details.name); let ret = self - .process_sparql_update(&contact_nuri, &sparql_update, &Some(contact_nuri_string), vec![]) + .process_sparql_update(&contact_nuri, &sparql_update, &Some(contact_nuri_string), vec![],0) .await; if let Err(e) = ret { return Err(VerifierError::SparqlError(e)); diff --git a/ng-verifier/src/orm/mod.rs b/ng-verifier/src/orm/mod.rs index cce5add..3b83a64 100644 --- a/ng-verifier/src/orm/mod.rs +++ b/ng-verifier/src/orm/mod.rs @@ -626,23 +626,39 @@ impl Verifier { &mut self, scope: &NuriV0, commit_nuri: String, + session_id: u64, patch: GraphQuadsPatch, ) { + let mut responses = Vec::with_capacity(1); if let Some(subs) = self.orm_subscriptions.get(scope) { + let mut orm_diff: Option = None; for sub in subs { - - // //TODO fix this - // let orm_diff = ??; - - // self.push_orm_response( - // scope, - // sub.session_id, - // sub.sender.clone(), - // AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff)), - // ) - // .await; + if sub.session_id == session_id { + //TODO prepare OrmUpdateBlankNodeIds + let orm_bnids = vec![]; + + responses.push(( + sub.session_id, + sub.sender.clone(), + AppResponseV0::OrmUpdateBlankNodeIds(orm_bnids), + )); + } else { + if orm_diff.is_none() { + //orm_diff = Some(??) + //TODO implement this + } + responses.push(( + sub.session_id, + sub.sender.clone(), + AppResponseV0::OrmUpdate(orm_diff.as_ref().unwrap().to_vec()), + )); + } } } + for (session_id, sender, res) in responses { + self.push_orm_response(scope, session_id, sender, AppResponse::V0(res)) + .await; + } } pub(crate) async fn orm_frontend_update( diff --git a/ng-verifier/src/request_processor.rs b/ng-verifier/src/request_processor.rs index e76f959..e1b8a5f 100644 --- a/ng-verifier/src/request_processor.rs +++ b/ng-verifier/src/request_processor.rs @@ -216,6 +216,7 @@ impl Verifier { Vec::from_iter(inserts), Vec::from_iter(removes), self.get_peer_id_for_skolem(), + 0, ) .await?; Ok(()) @@ -696,7 +697,7 @@ impl Verifier { ); let ret = self - .process_sparql_update(&store_nuri, &query, &None, vec![]) + .process_sparql_update(&store_nuri, &query, &None, vec![], 0) .await; if let Err(e) = ret { return Err(NgError::SparqlError(e)); @@ -712,7 +713,9 @@ impl Verifier { object: Literal::new_simple_literal(primary_class).into(), graph_name: NamedNode::new_unchecked(&header_branch_nuri).into(), }; - let ret = self.prepare_sparql_update(vec![quad], vec![], vec![]).await; + let ret = self + .prepare_sparql_update(vec![quad], vec![], vec![], 0) + .await; if let Err(e) = ret { return Err(NgError::SparqlError(e.to_string())); } @@ -814,6 +817,7 @@ impl Verifier { &sparql_update, &Some(contact_doc_nuri_string), vec![], + 0, ) .await; if let Err(e) = ret { @@ -894,6 +898,7 @@ impl Verifier { command: &AppRequestCommandV0, nuri: NuriV0, payload: Option, + session_id: u64, ) -> Result { match command { AppRequestCommandV0::OrmUpdate => match payload { @@ -993,7 +998,7 @@ impl Verifier { let social_query_doc_nuri_string = NuriV0::repo_id(query_id); let sparql_update = format!("INSERT DATA {{ <{social_query_doc_nuri_string}> <{forwarder_nuri_string}>. }}"); let ret = self - .process_sparql_update(&nuri, &sparql_update, &None, vec![]) + .process_sparql_update(&nuri, &sparql_update, &None, vec![], 0) .await; if let Err(e) = ret { return Err(NgError::SparqlError(e)); @@ -1008,6 +1013,7 @@ impl Verifier { &sparql_update, &Some(forwarder_nuri_string), vec![], + 0, ) .await; if let Err(e) = ret { @@ -1217,6 +1223,7 @@ impl Verifier { &sparql, &base, self.get_peer_id_for_skolem(), + session_id, ) .await { diff --git a/ng-verifier/src/verifier.rs b/ng-verifier/src/verifier.rs index 05de65c..c05298a 100644 --- a/ng-verifier/src/verifier.rs +++ b/ng-verifier/src/verifier.rs @@ -2836,7 +2836,10 @@ impl Verifier { pub async fn app_request(&mut self, req: AppRequest) -> Result { match req { - AppRequest::V0(v0) => self.process(&v0.command, v0.nuri, v0.payload).await, + AppRequest::V0(v0) => { + self.process(&v0.command, v0.nuri, v0.payload, v0.session_id) + .await + } } }