improve OrmUpdate logic for self update

feat/orm
Niko PLP 6 days ago
parent e6d3cbc337
commit d54290d993
  1. 3
      ng-net/src/app_protocol.rs
  2. 8
      ng-net/src/orm.rs
  3. 9
      ng-verifier/src/commits/transaction.rs
  4. 10
      ng-verifier/src/inbox_processor.rs
  5. 38
      ng-verifier/src/orm/mod.rs
  6. 13
      ng-verifier/src/request_processor.rs
  7. 5
      ng-verifier/src/verifier.rs

@ -20,7 +20,7 @@ use ng_repo::utils::{decode_digest, decode_key, decode_sym_key};
use ng_repo::utils::{decode_overlayid, display_timestamp_local}; use ng_repo::utils::{decode_overlayid, display_timestamp_local};
use serde_json::Value; use serde_json::Value;
use crate::orm::{OrmDiff, OrmShapeType}; use crate::orm::{OrmDiff, OrmShapeType, OrmUpdateBlankNodeIds};
use crate::types::*; use crate::types::*;
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
@ -1309,6 +1309,7 @@ pub enum AppResponseV0 {
Commits(Vec<String>), Commits(Vec<String>),
OrmInitial(Value), OrmInitial(Value),
OrmUpdate(OrmDiff), OrmUpdate(OrmDiff),
OrmUpdateBlankNodeIds(OrmUpdateBlankNodeIds),
OrmError(String), OrmError(String),
} }

@ -48,6 +48,14 @@ pub struct OrmDiffOp {
pub type OrmDiff = Vec<OrmDiffOp>; pub type OrmDiff = Vec<OrmDiffOp>;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OrmUpdateBlankNodeId {
pub path: String,
pub nuri: String,
}
pub type OrmUpdateBlankNodeIds = Vec<OrmUpdateBlankNodeId>;
pub type OrmSchema = HashMap<String, Arc<OrmSchemaShape>>; pub type OrmSchema = HashMap<String, Arc<OrmSchemaShape>>;
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]

@ -295,7 +295,7 @@ impl Verifier {
transaction, transaction,
commit_info, commit_info,
}; };
self.update_graph(vec![info]).await?; self.update_graph(vec![info], 0).await?;
} else } else
//TODO: change the logic here. transaction commits can have both a discrete and graph update. Only one AppResponse should be sent in this case, containing both updates. //TODO: change the logic here. transaction commits can have both a discrete and graph update. Only one AppResponse should be sent in this case, containing both updates.
if body.discrete.is_some() { if body.discrete.is_some() {
@ -417,6 +417,7 @@ impl Verifier {
inserts: Vec<Quad>, inserts: Vec<Quad>,
removes: Vec<Quad>, removes: Vec<Quad>,
peer_id: Vec<u8>, peer_id: Vec<u8>,
session_id: u64,
) -> Result<Vec<String>, VerifierError> { ) -> Result<Vec<String>, VerifierError> {
// options when not a publisher on the repo: // options when not a publisher on the repo:
// - skip // - skip
@ -531,12 +532,13 @@ impl Verifier {
}; };
updates.push(info); updates.push(info);
} }
self.update_graph(updates).await self.update_graph(updates, session_id).await
} }
async fn update_graph( async fn update_graph(
&mut self, &mut self,
mut updates: Vec<BranchUpdateInfo>, mut updates: Vec<BranchUpdateInfo>,
session_id: u64,
) -> Result<Vec<String>, VerifierError> { ) -> Result<Vec<String>, VerifierError> {
let updates_ref = &mut updates; let updates_ref = &mut updates;
let res = self let res = self
@ -758,6 +760,7 @@ impl Verifier {
self.orm_update( self.orm_update(
&NuriV0::new_empty(), &NuriV0::new_empty(),
nuri, nuri,
session_id,
update.transaction.as_quads_patch(graph_nuri), update.transaction.as_quads_patch(graph_nuri),
) )
.await; .await;
@ -775,6 +778,7 @@ impl Verifier {
query: &String, query: &String,
base: &Option<String>, base: &Option<String>,
peer_id: Vec<u8>, peer_id: Vec<u8>,
session_id: u64,
) -> Result<Vec<String>, String> { ) -> Result<Vec<String>, String> {
let store = self.graph_dataset.as_ref().unwrap(); let store = self.graph_dataset.as_ref().unwrap();
@ -796,6 +800,7 @@ impl Verifier {
Vec::from_iter(inserts), Vec::from_iter(inserts),
Vec::from_iter(removes), Vec::from_iter(removes),
peer_id, peer_id,
session_id,
) )
.await .await
.map_err(|e| e.to_string()) .map_err(|e| e.to_string())

@ -65,7 +65,7 @@ impl Verifier {
<> ng:social_query_from_profile <{from_profile_nuri_string}>. <> ng:social_query_from_profile <{from_profile_nuri_string}>.
<> ng:social_query_started \"{}\"^^xsd:dateTime . }}",DateTime::now()); <> ng:social_query_started \"{}\"^^xsd:dateTime . }}",DateTime::now());
let ret = self let ret = self
.process_sparql_update(&forwarder_nuri, &sparql_update, &Some(forwarder_nuri_string.clone()), vec![]) .process_sparql_update(&forwarder_nuri, &sparql_update, &Some(forwarder_nuri_string.clone()), vec![],0)
.await; .await;
if let Err(e) = ret { if let Err(e) = ret {
return Err(VerifierError::SparqlError(e)); return Err(VerifierError::SparqlError(e));
@ -78,7 +78,7 @@ impl Verifier {
// adding triples in forwarder doc : ng:social_query_id // adding triples in forwarder doc : ng:social_query_id
let sparql_update = format!("INSERT DATA {{ <{forwarder_nuri_string}> <did:ng:x:ng#{predicate}> \"{}\"^^<http://www.w3.org/2001/XMLSchema#dateTime> . }}",DateTime::now()); let sparql_update = format!("INSERT DATA {{ <{forwarder_nuri_string}> <did:ng:x:ng#{predicate}> \"{}\"^^<http://www.w3.org/2001/XMLSchema#dateTime> . }}",DateTime::now());
let ret = self let ret = self
.process_sparql_update(forwarder_nuri, &sparql_update, &None, vec![]) .process_sparql_update(forwarder_nuri, &sparql_update, &None, vec![],0)
.await; .await;
if let Err(e) = ret { if let Err(e) = ret {
return Err(VerifierError::SparqlError(e)); return Err(VerifierError::SparqlError(e));
@ -153,7 +153,7 @@ impl Verifier {
<did:ng:_> ng:social_query_forwarded_to_inbox <{to_inbox_nuri}> . <did:ng:_> ng:social_query_forwarded_to_inbox <{to_inbox_nuri}> .
}}"); }}");
let ret = self let ret = self
.process_sparql_update(&forwarder_nuri, &sparql_update, &None, vec![]) .process_sparql_update(&forwarder_nuri, &sparql_update, &None, vec![],0)
.await; .await;
if let Err(e) = ret { if let Err(e) = ret {
return Err(VerifierError::SparqlError(e)); return Err(VerifierError::SparqlError(e));
@ -593,7 +593,7 @@ impl Verifier {
let nuri_ov = NuriV0::repo_graph_name(&response.query_id, &overlay_id); let nuri_ov = NuriV0::repo_graph_name(&response.query_id, &overlay_id);
let graph_name = NamedNode::new_unchecked(&nuri_ov); let graph_name = NamedNode::new_unchecked(&nuri_ov);
let quads = triples.into_iter().map(|t| t.in_graph(graph_name.clone()) ).collect(); let quads = triples.into_iter().map(|t| t.in_graph(graph_name.clone()) ).collect();
let commits = self.prepare_sparql_update(quads, vec![], self.get_peer_id_for_skolem()).await?; let commits = self.prepare_sparql_update(quads, vec![], self.get_peer_id_for_skolem(), 0).await?;
} else { } else {
@ -665,7 +665,7 @@ impl Verifier {
{has_email} }}", details.name); {has_email} }}", details.name);
let ret = self let ret = self
.process_sparql_update(&contact_nuri, &sparql_update, &Some(contact_nuri_string), vec![]) .process_sparql_update(&contact_nuri, &sparql_update, &Some(contact_nuri_string), vec![],0)
.await; .await;
if let Err(e) = ret { if let Err(e) = ret {
return Err(VerifierError::SparqlError(e)); return Err(VerifierError::SparqlError(e));

@ -626,23 +626,39 @@ impl Verifier {
&mut self, &mut self,
scope: &NuriV0, scope: &NuriV0,
commit_nuri: String, commit_nuri: String,
session_id: u64,
patch: GraphQuadsPatch, patch: GraphQuadsPatch,
) { ) {
let mut responses = Vec::with_capacity(1);
if let Some(subs) = self.orm_subscriptions.get(scope) { if let Some(subs) = self.orm_subscriptions.get(scope) {
let mut orm_diff: Option<OrmDiff> = None;
for sub in subs { for sub in subs {
if sub.session_id == session_id {
// //TODO fix this //TODO prepare OrmUpdateBlankNodeIds
// let orm_diff = ??; let orm_bnids = vec![];
// self.push_orm_response( responses.push((
// scope, sub.session_id,
// sub.session_id, sub.sender.clone(),
// sub.sender.clone(), AppResponseV0::OrmUpdateBlankNodeIds(orm_bnids),
// AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff)), ));
// ) } else {
// .await; if orm_diff.is_none() {
//orm_diff = Some(??)
//TODO implement this
}
responses.push((
sub.session_id,
sub.sender.clone(),
AppResponseV0::OrmUpdate(orm_diff.as_ref().unwrap().to_vec()),
));
}
} }
} }
for (session_id, sender, res) in responses {
self.push_orm_response(scope, session_id, sender, AppResponse::V0(res))
.await;
}
} }
pub(crate) async fn orm_frontend_update( pub(crate) async fn orm_frontend_update(

@ -216,6 +216,7 @@ impl Verifier {
Vec::from_iter(inserts), Vec::from_iter(inserts),
Vec::from_iter(removes), Vec::from_iter(removes),
self.get_peer_id_for_skolem(), self.get_peer_id_for_skolem(),
0,
) )
.await?; .await?;
Ok(()) Ok(())
@ -696,7 +697,7 @@ impl Verifier {
); );
let ret = self let ret = self
.process_sparql_update(&store_nuri, &query, &None, vec![]) .process_sparql_update(&store_nuri, &query, &None, vec![], 0)
.await; .await;
if let Err(e) = ret { if let Err(e) = ret {
return Err(NgError::SparqlError(e)); return Err(NgError::SparqlError(e));
@ -712,7 +713,9 @@ impl Verifier {
object: Literal::new_simple_literal(primary_class).into(), object: Literal::new_simple_literal(primary_class).into(),
graph_name: NamedNode::new_unchecked(&header_branch_nuri).into(), graph_name: NamedNode::new_unchecked(&header_branch_nuri).into(),
}; };
let ret = self.prepare_sparql_update(vec![quad], vec![], vec![]).await; let ret = self
.prepare_sparql_update(vec![quad], vec![], vec![], 0)
.await;
if let Err(e) = ret { if let Err(e) = ret {
return Err(NgError::SparqlError(e.to_string())); return Err(NgError::SparqlError(e.to_string()));
} }
@ -814,6 +817,7 @@ impl Verifier {
&sparql_update, &sparql_update,
&Some(contact_doc_nuri_string), &Some(contact_doc_nuri_string),
vec![], vec![],
0,
) )
.await; .await;
if let Err(e) = ret { if let Err(e) = ret {
@ -894,6 +898,7 @@ impl Verifier {
command: &AppRequestCommandV0, command: &AppRequestCommandV0,
nuri: NuriV0, nuri: NuriV0,
payload: Option<AppRequestPayload>, payload: Option<AppRequestPayload>,
session_id: u64,
) -> Result<AppResponse, NgError> { ) -> Result<AppResponse, NgError> {
match command { match command {
AppRequestCommandV0::OrmUpdate => match payload { AppRequestCommandV0::OrmUpdate => match payload {
@ -993,7 +998,7 @@ impl Verifier {
let social_query_doc_nuri_string = NuriV0::repo_id(query_id); let social_query_doc_nuri_string = NuriV0::repo_id(query_id);
let sparql_update = format!("INSERT DATA {{ <{social_query_doc_nuri_string}> <did:ng:x:ng#social_query_forwarder> <{forwarder_nuri_string}>. }}"); let sparql_update = format!("INSERT DATA {{ <{social_query_doc_nuri_string}> <did:ng:x:ng#social_query_forwarder> <{forwarder_nuri_string}>. }}");
let ret = self let ret = self
.process_sparql_update(&nuri, &sparql_update, &None, vec![]) .process_sparql_update(&nuri, &sparql_update, &None, vec![], 0)
.await; .await;
if let Err(e) = ret { if let Err(e) = ret {
return Err(NgError::SparqlError(e)); return Err(NgError::SparqlError(e));
@ -1008,6 +1013,7 @@ impl Verifier {
&sparql_update, &sparql_update,
&Some(forwarder_nuri_string), &Some(forwarder_nuri_string),
vec![], vec![],
0,
) )
.await; .await;
if let Err(e) = ret { if let Err(e) = ret {
@ -1217,6 +1223,7 @@ impl Verifier {
&sparql, &sparql,
&base, &base,
self.get_peer_id_for_skolem(), self.get_peer_id_for_skolem(),
session_id,
) )
.await .await
{ {

@ -2836,7 +2836,10 @@ impl Verifier {
pub async fn app_request(&mut self, req: AppRequest) -> Result<AppResponse, NgError> { pub async fn app_request(&mut self, req: AppRequest) -> Result<AppResponse, NgError> {
match req { match req {
AppRequest::V0(v0) => self.process(&v0.command, v0.nuri, v0.payload).await, AppRequest::V0(v0) => {
self.process(&v0.command, v0.nuri, v0.payload, v0.session_id)
.await
}
} }
} }

Loading…
Cancel
Save