|
|
|
@ -14,7 +14,11 @@ pub mod validation; |
|
|
|
|
|
|
|
|
|
use futures::channel::mpsc; |
|
|
|
|
use futures::channel::mpsc::UnboundedSender; |
|
|
|
|
use ng_net::types::OverlayLink; |
|
|
|
|
use ng_oxigraph::oxrdf::Quad; |
|
|
|
|
use ng_repo::errors::VerifierError; |
|
|
|
|
use ng_repo::types::OverlayId; |
|
|
|
|
use ng_repo::types::RepoId; |
|
|
|
|
|
|
|
|
|
use std::collections::HashMap; |
|
|
|
|
use std::collections::HashSet; |
|
|
|
@ -381,6 +385,37 @@ impl Verifier { |
|
|
|
|
}).next().unwrap() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn get_first_orm_subscription_sender_for( |
|
|
|
|
&mut self, |
|
|
|
|
nuri: &NuriV0, |
|
|
|
|
shape: Option<&ShapeIri>, |
|
|
|
|
session_id: Option<&u64>, |
|
|
|
|
) -> Result<(UnboundedSender<AppResponse>, &OrmSubscription), VerifierError> { |
|
|
|
|
let subs = self |
|
|
|
|
.orm_subscriptions |
|
|
|
|
.get_mut(nuri) |
|
|
|
|
.unwrap(); |
|
|
|
|
subs.retain(|sub| !sub.sender.is_closed()); |
|
|
|
|
match
|
|
|
|
|
// Filter shapes, if present.
|
|
|
|
|
subs.iter() |
|
|
|
|
.filter(|s| match shape { |
|
|
|
|
Some(sh) => *sh == s.shape_type.shape, |
|
|
|
|
None => true, // Filter session ids if present.
|
|
|
|
|
}) |
|
|
|
|
.filter(|s| match session_id { |
|
|
|
|
Some(id) => *id == s.session_id, |
|
|
|
|
None => true, |
|
|
|
|
}) |
|
|
|
|
.next() |
|
|
|
|
{ |
|
|
|
|
None => Err(VerifierError::OrmSubscriptionNotFound), |
|
|
|
|
Some(subscription) => { |
|
|
|
|
Ok((subscription.sender.clone(), subscription)) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Apply triples to a nuri's document.
|
|
|
|
|
/// Updates tracked_subjects in orm_subscriptions.
|
|
|
|
|
fn apply_triple_changes( |
|
|
|
@ -625,93 +660,113 @@ impl Verifier { |
|
|
|
|
|
|
|
|
|
pub(crate) async fn orm_update( |
|
|
|
|
&mut self, |
|
|
|
|
scope: &NuriV0, |
|
|
|
|
session_id: u64, |
|
|
|
|
repo_id: RepoId, |
|
|
|
|
overlay_id: OverlayId, |
|
|
|
|
patch: GraphQuadsPatch, |
|
|
|
|
) { |
|
|
|
|
let mut responses = Vec::with_capacity(1); |
|
|
|
|
if let Some(subs) = self.orm_subscriptions.get(scope) { |
|
|
|
|
// TODO: implement this, generate orm_diff using the patch
|
|
|
|
|
let orm_diff: OrmDiff = vec![]; |
|
|
|
|
for sub in subs { |
|
|
|
|
if sub.session_id != session_id { |
|
|
|
|
responses.push(( |
|
|
|
|
sub.session_id, |
|
|
|
|
sub.sender.clone(), |
|
|
|
|
AppResponseV0::OrmUpdate(orm_diff.to_vec()), |
|
|
|
|
)); |
|
|
|
|
let overlaylink: OverlayLink = overlay_id.into();
|
|
|
|
|
for (scope, subs) in self.orm_subscriptions.iter_mut() { |
|
|
|
|
subs.retain(|sub| !sub.sender.is_closed()); |
|
|
|
|
if scope.entire_store
|
|
|
|
|
|| scope.overlay.as_ref().map_or(false, |ol| overlaylink == *ol) |
|
|
|
|
|| scope.target == NuriTargetV0::Repo(repo_id) |
|
|
|
|
{ |
|
|
|
|
for sub in subs { |
|
|
|
|
if sub.session_id != session_id { // this is incorrect. we are excluding all the subscriptions from the originating session,
|
|
|
|
|
// while we should only exclude the one with exact same shape_type. but we don't have access to that here
|
|
|
|
|
|
|
|
|
|
// TODO: implement this, generate orm_diff using the patch and the sub.shape_type
|
|
|
|
|
let orm_diff: OrmDiff = vec![]; |
|
|
|
|
|
|
|
|
|
_ = sub.sender.send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff.to_vec()))).await; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
for (session_id, sender, res) in responses { |
|
|
|
|
self.push_orm_response(scope, session_id, sender, AppResponse::V0(res)) |
|
|
|
|
.await; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub(crate) async fn orm_update_self( |
|
|
|
|
&mut self, |
|
|
|
|
scope: &NuriV0, |
|
|
|
|
shape_iri: ShapeIri, |
|
|
|
|
session_id: u64, |
|
|
|
|
skolemnized_blank_nodes: Vec<Quad>, |
|
|
|
|
revert_inserts: Vec<Quad>, |
|
|
|
|
revert_removes: Vec<Quad>, |
|
|
|
|
) { |
|
|
|
|
let mut responses = Vec::with_capacity(1); |
|
|
|
|
if let Some(subs) = self.orm_subscriptions.get(scope) { |
|
|
|
|
for sub in subs { |
|
|
|
|
if sub.session_id == session_id { |
|
|
|
|
// TODO prepare OrmUpdateBlankNodeIds with skolemnized_blank_nodes
|
|
|
|
|
// note(niko): I think skolemnized blank nodes can still be many, in case of multi-level nested sub-objects.
|
|
|
|
|
let orm_bnids = vec![]; |
|
|
|
|
responses.push(( |
|
|
|
|
session_id, |
|
|
|
|
sub.sender.clone(), |
|
|
|
|
AppResponseV0::OrmUpdateBlankNodeIds(orm_bnids), |
|
|
|
|
)); |
|
|
|
|
// TODO (later) revert the inserts and removes
|
|
|
|
|
// let orm_diff = vec![];
|
|
|
|
|
// responses.push((
|
|
|
|
|
// session_id,
|
|
|
|
|
// sub.sender.clone(),
|
|
|
|
|
// AppResponseV0::OrmUpdate(orm_diff),
|
|
|
|
|
// ));
|
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
for (session_id, sender, res) in responses { |
|
|
|
|
self.push_orm_response(scope, session_id, sender, AppResponse::V0(res)) |
|
|
|
|
.await; |
|
|
|
|
} |
|
|
|
|
) -> Result<(), VerifierError> { |
|
|
|
|
|
|
|
|
|
let (mut sender, orm_subscription) = |
|
|
|
|
self.get_first_orm_subscription_sender_for(scope, Some(&shape_iri), Some(&session_id))?; |
|
|
|
|
|
|
|
|
|
// TODO prepare OrmUpdateBlankNodeIds with skolemnized_blank_nodes
|
|
|
|
|
// use orm_subscription if needed
|
|
|
|
|
// note(niko): I think skolemnized blank nodes can still be many, in case of multi-level nested sub-objects.
|
|
|
|
|
let orm_bnids = vec![]; |
|
|
|
|
let _ = sender.send(AppResponse::V0(AppResponseV0::OrmUpdateBlankNodeIds(orm_bnids))).await; |
|
|
|
|
|
|
|
|
|
// TODO (later) revert the inserts and removes
|
|
|
|
|
// let orm_diff = vec![];
|
|
|
|
|
// let _ = sender.send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff))).await;
|
|
|
|
|
|
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub(crate) async fn orm_frontend_update( |
|
|
|
|
&mut self, |
|
|
|
|
session_id: u64, |
|
|
|
|
scope: &NuriV0, |
|
|
|
|
shape_iri: ShapeIri, |
|
|
|
|
diff: OrmDiff, |
|
|
|
|
) { |
|
|
|
|
log_info!("frontend_update_orm {:?} {} {:?}", scope, shape_iri, diff); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub(crate) async fn push_orm_response( |
|
|
|
|
&mut self, |
|
|
|
|
scope: &NuriV0, |
|
|
|
|
session_id: u64, |
|
|
|
|
sender: UnboundedSender<AppResponse>, |
|
|
|
|
response: AppResponse, |
|
|
|
|
) { |
|
|
|
|
log_debug!("sending orm response for session {}:", session_id); |
|
|
|
|
|
|
|
|
|
if sender.is_closed() { |
|
|
|
|
log_debug!("closed so removing session {}", session_id); |
|
|
|
|
) -> Result<(), String> { |
|
|
|
|
log_info!( |
|
|
|
|
"frontend_update_orm session={} scope={:?} shape={} diff={:?}", |
|
|
|
|
session_id, |
|
|
|
|
scope, |
|
|
|
|
shape_iri, |
|
|
|
|
diff |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
// find OrmSubscription
|
|
|
|
|
let (doc_nuri, sparql_update) = { |
|
|
|
|
let orm_subscription = |
|
|
|
|
self.get_first_orm_subscription_for(scope, Some(&shape_iri), Some(&session_id)); |
|
|
|
|
|
|
|
|
|
// use orm_subscription as needed
|
|
|
|
|
// do the magic, then, find the doc where the query should start and generate the sparql update
|
|
|
|
|
let doc_nuri = NuriV0::new_empty(); |
|
|
|
|
let sparql_update: String = String::new(); |
|
|
|
|
(doc_nuri, sparql_update) |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
if let Some(subs) = self.orm_subscriptions.get_mut(&scope) { |
|
|
|
|
subs.retain(|sub| sub.session_id != session_id); |
|
|
|
|
match self |
|
|
|
|
.process_sparql_update( |
|
|
|
|
&doc_nuri, |
|
|
|
|
&sparql_update, |
|
|
|
|
&None, |
|
|
|
|
self.get_peer_id_for_skolem(), |
|
|
|
|
session_id, |
|
|
|
|
) |
|
|
|
|
.await |
|
|
|
|
{ |
|
|
|
|
Err(e) => Err(e), |
|
|
|
|
Ok((_, revert_inserts, revert_removes, skolemnized_blank_nodes)) => { |
|
|
|
|
if !revert_inserts.is_empty() |
|
|
|
|
|| !revert_removes.is_empty() |
|
|
|
|
|| !skolemnized_blank_nodes.is_empty() |
|
|
|
|
{ |
|
|
|
|
self.orm_update_self( |
|
|
|
|
scope, |
|
|
|
|
shape_iri, |
|
|
|
|
session_id, |
|
|
|
|
skolemnized_blank_nodes, |
|
|
|
|
revert_inserts, |
|
|
|
|
revert_removes, |
|
|
|
|
) |
|
|
|
|
.await.map_err(|e|e.to_string())?; |
|
|
|
|
} |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
let _ = sender.clone().send(response).await; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -730,7 +785,7 @@ impl Verifier { |
|
|
|
|
shape_type: &OrmShapeType, |
|
|
|
|
session_id: u64, |
|
|
|
|
) -> Result<(Receiver<AppResponse>, CancelFn), NgError> { |
|
|
|
|
let (tx, rx) = mpsc::unbounded::<AppResponse>(); |
|
|
|
|
let (mut tx, rx) = mpsc::unbounded::<AppResponse>(); |
|
|
|
|
|
|
|
|
|
// TODO: Validate schema:
|
|
|
|
|
// If multiple data types are present for the same predicate, they must be of of the same type.
|
|
|
|
@ -753,13 +808,7 @@ impl Verifier { |
|
|
|
|
let orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type)?; |
|
|
|
|
// log_debug!("create_orm_object_for_shape return {:?}", orm_objects);
|
|
|
|
|
|
|
|
|
|
self.push_orm_response( |
|
|
|
|
&nuri.clone(), |
|
|
|
|
session_id, |
|
|
|
|
tx.clone(), |
|
|
|
|
AppResponse::V0(AppResponseV0::OrmInitial(orm_objects)), |
|
|
|
|
) |
|
|
|
|
.await; |
|
|
|
|
let _ = tx.send(AppResponse::V0(AppResponseV0::OrmInitial(orm_objects))).await; |
|
|
|
|
|
|
|
|
|
let close = Box::new(move || { |
|
|
|
|
log_debug!("CLOSE_CHANNEL of subscription"); |
|
|
|
|