orm_update_self

feat/orm
Niko PLP 6 days ago
parent d54290d993
commit 8472562913
  1. 44
      ng-verifier/src/commits/transaction.rs
  2. 59
      ng-verifier/src/orm/mod.rs

@ -393,14 +393,16 @@ impl Verifier {
// TODO: implement TargetBranchV0::Named // TODO: implement TargetBranchV0::Named
_ => unimplemented!(), _ => unimplemented!(),
}; };
let _ = branches.entry(branch_id).or_insert(( if is_publisher {
store.get_store_repo().clone(), let _ = branches.entry(branch_id).or_insert((
repo.id, store.get_store_repo().clone(),
branch_type, repo.id,
topic_id, branch_type,
token, topic_id,
store.overlay_id, token,
)); store.overlay_id,
));
}
let _ = nuri_branches.entry(graph_name.clone()).or_insert(( let _ = nuri_branches.entry(graph_name.clone()).or_insert((
repo.id, repo.id,
branch_id, branch_id,
@ -424,6 +426,9 @@ impl Verifier {
// - TODO: abort (the whole transaction) // - TODO: abort (the whole transaction)
// - TODO: inbox (sent to inbox of document for a suggested update) // - TODO: inbox (sent to inbox of document for a suggested update)
// for now we just do skip, without giving option to user // for now we just do skip, without giving option to user
let mut revert_inserts: Vec<Quad> = vec![];
let mut revert_removes: Vec<Quad> = vec![];
let mut skolemnized_blank_nodes: Vec<Quad> = vec![];
let mut inserts_map: HashMap<BranchId, HashSet<Triple>> = HashMap::with_capacity(1); let mut inserts_map: HashMap<BranchId, HashSet<Triple>> = HashMap::with_capacity(1);
let mut removes_map: HashMap<BranchId, HashSet<Triple>> = HashMap::with_capacity(1); let mut removes_map: HashMap<BranchId, HashSet<Triple>> = HashMap::with_capacity(1);
let mut branches: HashMap< let mut branches: HashMap<
@ -438,6 +443,7 @@ impl Verifier {
let (repo_id, branch_id, is_publisher) = let (repo_id, branch_id, is_publisher) =
self.find_branch_and_repo_for_quad(&insert, &mut branches, &mut nuri_branches)?; self.find_branch_and_repo_for_quad(&insert, &mut branches, &mut nuri_branches)?;
if !is_publisher { if !is_publisher {
revert_inserts.push(insert);
continue; continue;
} }
let set = inserts_map.entry(branch_id).or_insert_with(|| { let set = inserts_map.entry(branch_id).or_insert_with(|| {
@ -463,6 +469,7 @@ impl Verifier {
let iri = let iri =
NuriV0::repo_skolem(&repo_id, &peer_id, b.as_ref().unique_id().unwrap())?; NuriV0::repo_skolem(&repo_id, &peer_id, b.as_ref().unique_id().unwrap())?;
insert.object = Term::NamedNode(NamedNode::new_unchecked(iri)); insert.object = Term::NamedNode(NamedNode::new_unchecked(iri));
skolemnized_blank_nodes.push(insert.clone());
} }
} }
// TODO deal with triples in subject and object (RDF-STAR) // TODO deal with triples in subject and object (RDF-STAR)
@ -473,6 +480,7 @@ impl Verifier {
let (repo_id, branch_id, is_publisher) = let (repo_id, branch_id, is_publisher) =
self.find_branch_and_repo_for_quad(&remove, &mut branches, &mut nuri_branches)?; self.find_branch_and_repo_for_quad(&remove, &mut branches, &mut nuri_branches)?;
if !is_publisher { if !is_publisher {
revert_removes.push(remove);
continue; continue;
} }
let set = removes_map.entry(branch_id).or_insert_with(|| { let set = removes_map.entry(branch_id).or_insert_with(|| {
@ -532,7 +540,22 @@ impl Verifier {
}; };
updates.push(info); updates.push(info);
} }
self.update_graph(updates, session_id).await match self.update_graph(updates, session_id).await {
Ok(commits) => {
if session_id != 0 {
self.orm_update_self(
&NuriV0::new_empty(),
session_id,
skolemnized_blank_nodes,
revert_inserts,
revert_removes,
)
.await;
}
Ok(commits)
}
Err(e) => Err(e),
}
} }
async fn update_graph( async fn update_graph(
@ -743,7 +766,7 @@ impl Verifier {
} else { } else {
let graph_patch = update.transaction.as_patch(); let graph_patch = update.transaction.as_patch();
let nuri = NuriV0::commit(&update.repo_id, &update.commit_id); let nuri = NuriV0::commit(&update.repo_id, &update.commit_id);
commit_nuris.push(nuri.clone()); commit_nuris.push(nuri);
self.push_app_response( self.push_app_response(
&update.branch_id, &update.branch_id,
AppResponse::V0(AppResponseV0::Patch(AppPatch { AppResponse::V0(AppResponseV0::Patch(AppPatch {
@ -759,7 +782,6 @@ impl Verifier {
NuriV0::repo_graph_name(&update.repo_id, &update.overlay_id); NuriV0::repo_graph_name(&update.repo_id, &update.overlay_id);
self.orm_update( self.orm_update(
&NuriV0::new_empty(), &NuriV0::new_empty(),
nuri,
session_id, session_id,
update.transaction.as_quads_patch(graph_nuri), update.transaction.as_quads_patch(graph_nuri),
) )

@ -14,6 +14,7 @@ pub mod validation;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::channel::mpsc::UnboundedSender; use futures::channel::mpsc::UnboundedSender;
use ng_oxigraph::oxrdf::Quad;
use std::collections::HashMap; use std::collections::HashMap;
use std::collections::HashSet; use std::collections::HashSet;
@ -625,33 +626,57 @@ impl Verifier {
pub(crate) async fn orm_update( pub(crate) async fn orm_update(
&mut self, &mut self,
scope: &NuriV0, scope: &NuriV0,
commit_nuri: String,
session_id: u64, session_id: u64,
patch: GraphQuadsPatch, patch: GraphQuadsPatch,
) { ) {
let mut responses = Vec::with_capacity(1); let mut responses = Vec::with_capacity(1);
if let Some(subs) = self.orm_subscriptions.get(scope) { if let Some(subs) = self.orm_subscriptions.get(scope) {
let mut orm_diff: Option<OrmDiff> = None; // TODO: implement this, generate orm_diff using the patch
let orm_diff: OrmDiff = vec![];
for sub in subs { for sub in subs {
if sub.session_id == session_id { if sub.session_id != session_id {
//TODO prepare OrmUpdateBlankNodeIds
let orm_bnids = vec![];
responses.push(( responses.push((
sub.session_id, sub.session_id,
sub.sender.clone(), sub.sender.clone(),
AppResponseV0::OrmUpdateBlankNodeIds(orm_bnids), AppResponseV0::OrmUpdate(orm_diff.to_vec()),
)); ));
} else { }
if orm_diff.is_none() { }
//orm_diff = Some(??) }
//TODO implement this for (session_id, sender, res) in responses {
} self.push_orm_response(scope, session_id, sender, AppResponse::V0(res))
.await;
}
}
pub(crate) async fn orm_update_self(
&mut self,
scope: &NuriV0,
session_id: u64,
skolemnized_blank_nodes: Vec<Quad>,
revert_inserts: Vec<Quad>,
revert_removes: Vec<Quad>,
) {
let mut responses = Vec::with_capacity(1);
if let Some(subs) = self.orm_subscriptions.get(scope) {
for sub in subs {
if sub.session_id == session_id {
// TODO prepare OrmUpdateBlankNodeIds with skolemnized_blank_nodes
// note(niko): I think skolemnized blank nodes can still be many, in case of multi-level nested sub-objects.
let orm_bnids = vec![];
responses.push(( responses.push((
sub.session_id, session_id,
sub.sender.clone(), sub.sender.clone(),
AppResponseV0::OrmUpdate(orm_diff.as_ref().unwrap().to_vec()), AppResponseV0::OrmUpdateBlankNodeIds(orm_bnids),
)); ));
// TODO (later) revert the inserts and removes
// let orm_diff = vec![];
// responses.push((
// session_id,
// sub.sender.clone(),
// AppResponseV0::OrmUpdate(orm_diff),
// ));
break;
} }
} }
} }
@ -672,7 +697,7 @@ impl Verifier {
pub(crate) async fn push_orm_response( pub(crate) async fn push_orm_response(
&mut self, &mut self,
nuri: &NuriV0, scope: &NuriV0,
session_id: u64, session_id: u64,
sender: UnboundedSender<AppResponse>, sender: UnboundedSender<AppResponse>,
response: AppResponse, response: AppResponse,
@ -682,7 +707,9 @@ impl Verifier {
if sender.is_closed() { if sender.is_closed() {
log_debug!("closed so removing session {}", session_id); log_debug!("closed so removing session {}", session_id);
self.orm_subscriptions.remove(&nuri); if let Some(subs) = self.orm_subscriptions.get_mut(&scope) {
subs.retain(|sub| sub.session_id != session_id);
}
} else { } else {
let _ = sender.clone().send(response).await; let _ = sender.clone().send(response).await;
} }

Loading…
Cancel
Save