diff --git a/ng-repo/src/errors.rs b/ng-repo/src/errors.rs index 0b460b8..87bebc5 100644 --- a/ng-repo/src/errors.rs +++ b/ng-repo/src/errors.rs @@ -400,6 +400,7 @@ pub enum VerifierError { InvalidOrmSchema, OrmSubjectNotFound, OrmPredicateNotFound, + OrmSubscriptionNotFound, } impl Error for VerifierError {} diff --git a/ng-verifier/src/commits/transaction.rs b/ng-verifier/src/commits/transaction.rs index b17ba30..8303fe7 100644 --- a/ng-verifier/src/commits/transaction.rs +++ b/ng-verifier/src/commits/transaction.rs @@ -414,13 +414,18 @@ impl Verifier { } } + /// returns + /// - list of commit Nuris + /// - optional list of revert_inserts + /// - optional list of revert_removes + /// - optional list of skolemnized_blank_nodes pub(crate) async fn prepare_sparql_update( &mut self, inserts: Vec, removes: Vec, peer_id: Vec, session_id: u64, - ) -> Result, VerifierError> { + ) -> Result<(Vec, Vec, Vec, Vec), VerifierError> { // options when not a publisher on the repo: // - skip // - TODO: abort (the whole transaction) @@ -541,19 +546,12 @@ impl Verifier { updates.push(info); } match self.update_graph(updates, session_id).await { - Ok(commits) => { - if session_id != 0 { - self.orm_update_self( - &NuriV0::new_empty(), - session_id, - skolemnized_blank_nodes, - revert_inserts, - revert_removes, - ) - .await; - } - Ok(commits) - } + Ok(commits) => Ok(( + commits, + revert_inserts, + revert_removes, + skolemnized_blank_nodes, + )), Err(e) => Err(e), } } @@ -781,8 +779,9 @@ impl Verifier { let graph_nuri = NuriV0::repo_graph_name(&update.repo_id, &update.overlay_id); self.orm_update( - &NuriV0::new_empty(), session_id, + update.repo_id.clone(), + update.overlay_id, update.transaction.as_quads_patch(graph_nuri), ) .await; @@ -801,7 +800,7 @@ impl Verifier { base: &Option, peer_id: Vec, session_id: u64, - ) -> Result, String> { + ) -> Result<(Vec, Vec, Vec, Vec), String> { let store = self.graph_dataset.as_ref().unwrap(); let update = ng_oxigraph::oxigraph::sparql::Update::parse(query, base.as_deref()) @@ -816,7 +815,7 @@ impl Verifier { Err(e) => Err(e.to_string()), Ok((inserts, removes)) => { if inserts.is_empty() && removes.is_empty() { - Ok(vec![]) + Ok((vec![], vec![], vec![], vec![])) } else { self.prepare_sparql_update( Vec::from_iter(inserts), diff --git a/ng-verifier/src/inbox_processor.rs b/ng-verifier/src/inbox_processor.rs index 64b756f..f6571f2 100644 --- a/ng-verifier/src/inbox_processor.rs +++ b/ng-verifier/src/inbox_processor.rs @@ -593,7 +593,7 @@ impl Verifier { let nuri_ov = NuriV0::repo_graph_name(&response.query_id, &overlay_id); let graph_name = NamedNode::new_unchecked(&nuri_ov); let quads = triples.into_iter().map(|t| t.in_graph(graph_name.clone()) ).collect(); - let commits = self.prepare_sparql_update(quads, vec![], self.get_peer_id_for_skolem(), 0).await?; + let _ = self.prepare_sparql_update(quads, vec![], self.get_peer_id_for_skolem(), 0).await?; } else { diff --git a/ng-verifier/src/orm/mod.rs b/ng-verifier/src/orm/mod.rs index 3f495e0..d9379c5 100644 --- a/ng-verifier/src/orm/mod.rs +++ b/ng-verifier/src/orm/mod.rs @@ -14,7 +14,11 @@ pub mod validation; use futures::channel::mpsc; use futures::channel::mpsc::UnboundedSender; +use ng_net::types::OverlayLink; use ng_oxigraph::oxrdf::Quad; +use ng_repo::errors::VerifierError; +use ng_repo::types::OverlayId; +use ng_repo::types::RepoId; use std::collections::HashMap; use std::collections::HashSet; @@ -381,6 +385,37 @@ impl Verifier { }).next().unwrap() } + pub fn get_first_orm_subscription_sender_for( + &mut self, + nuri: &NuriV0, + shape: Option<&ShapeIri>, + session_id: Option<&u64>, + ) -> Result<(UnboundedSender, &OrmSubscription), VerifierError> { + let subs = self + .orm_subscriptions + .get_mut(nuri) + .unwrap(); + subs.retain(|sub| !sub.sender.is_closed()); + match + // Filter shapes, if present. + subs.iter() + .filter(|s| match shape { + Some(sh) => *sh == s.shape_type.shape, + None => true, // Filter session ids if present. + }) + .filter(|s| match session_id { + Some(id) => *id == s.session_id, + None => true, + }) + .next() + { + None => Err(VerifierError::OrmSubscriptionNotFound), + Some(subscription) => { + Ok((subscription.sender.clone(), subscription)) + } + } + } + /// Apply triples to a nuri's document. /// Updates tracked_subjects in orm_subscriptions. fn apply_triple_changes( @@ -625,93 +660,113 @@ impl Verifier { pub(crate) async fn orm_update( &mut self, - scope: &NuriV0, session_id: u64, + repo_id: RepoId, + overlay_id: OverlayId, patch: GraphQuadsPatch, ) { - let mut responses = Vec::with_capacity(1); - if let Some(subs) = self.orm_subscriptions.get(scope) { - // TODO: implement this, generate orm_diff using the patch - let orm_diff: OrmDiff = vec![]; - for sub in subs { - if sub.session_id != session_id { - responses.push(( - sub.session_id, - sub.sender.clone(), - AppResponseV0::OrmUpdate(orm_diff.to_vec()), - )); + let overlaylink: OverlayLink = overlay_id.into(); + for (scope, subs) in self.orm_subscriptions.iter_mut() { + subs.retain(|sub| !sub.sender.is_closed()); + if scope.entire_store + || scope.overlay.as_ref().map_or(false, |ol| overlaylink == *ol) + || scope.target == NuriTargetV0::Repo(repo_id) + { + for sub in subs { + if sub.session_id != session_id { // this is incorrect. we are excluding all the subscriptions from the originating session, + // while we should only exclude the one with exact same shape_type. but we don't have access to that here + + // TODO: implement this, generate orm_diff using the patch and the sub.shape_type + let orm_diff: OrmDiff = vec![]; + + _ = sub.sender.send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff.to_vec()))).await; + } } } } - for (session_id, sender, res) in responses { - self.push_orm_response(scope, session_id, sender, AppResponse::V0(res)) - .await; - } } pub(crate) async fn orm_update_self( &mut self, scope: &NuriV0, + shape_iri: ShapeIri, session_id: u64, skolemnized_blank_nodes: Vec, revert_inserts: Vec, revert_removes: Vec, - ) { - let mut responses = Vec::with_capacity(1); - if let Some(subs) = self.orm_subscriptions.get(scope) { - for sub in subs { - if sub.session_id == session_id { - // TODO prepare OrmUpdateBlankNodeIds with skolemnized_blank_nodes - // note(niko): I think skolemnized blank nodes can still be many, in case of multi-level nested sub-objects. - let orm_bnids = vec![]; - responses.push(( - session_id, - sub.sender.clone(), - AppResponseV0::OrmUpdateBlankNodeIds(orm_bnids), - )); - // TODO (later) revert the inserts and removes - // let orm_diff = vec![]; - // responses.push(( - // session_id, - // sub.sender.clone(), - // AppResponseV0::OrmUpdate(orm_diff), - // )); - break; - } - } - } - for (session_id, sender, res) in responses { - self.push_orm_response(scope, session_id, sender, AppResponse::V0(res)) - .await; - } + ) -> Result<(), VerifierError> { + + let (mut sender, orm_subscription) = + self.get_first_orm_subscription_sender_for(scope, Some(&shape_iri), Some(&session_id))?; + + // TODO prepare OrmUpdateBlankNodeIds with skolemnized_blank_nodes + // use orm_subscription if needed + // note(niko): I think skolemnized blank nodes can still be many, in case of multi-level nested sub-objects. + let orm_bnids = vec![]; + let _ = sender.send(AppResponse::V0(AppResponseV0::OrmUpdateBlankNodeIds(orm_bnids))).await; + + // TODO (later) revert the inserts and removes + // let orm_diff = vec![]; + // let _ = sender.send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff))).await; + + Ok(()) } pub(crate) async fn orm_frontend_update( &mut self, + session_id: u64, scope: &NuriV0, shape_iri: ShapeIri, diff: OrmDiff, - ) { - log_info!("frontend_update_orm {:?} {} {:?}", scope, shape_iri, diff); - } - - pub(crate) async fn push_orm_response( - &mut self, - scope: &NuriV0, - session_id: u64, - sender: UnboundedSender, - response: AppResponse, - ) { - log_debug!("sending orm response for session {}:", session_id); - - if sender.is_closed() { - log_debug!("closed so removing session {}", session_id); + ) -> Result<(), String> { + log_info!( + "frontend_update_orm session={} scope={:?} shape={} diff={:?}", + session_id, + scope, + shape_iri, + diff + ); + + // find OrmSubscription + let (doc_nuri, sparql_update) = { + let orm_subscription = + self.get_first_orm_subscription_for(scope, Some(&shape_iri), Some(&session_id)); + + // use orm_subscription as needed + // do the magic, then, find the doc where the query should start and generate the sparql update + let doc_nuri = NuriV0::new_empty(); + let sparql_update: String = String::new(); + (doc_nuri, sparql_update) + }; - if let Some(subs) = self.orm_subscriptions.get_mut(&scope) { - subs.retain(|sub| sub.session_id != session_id); + match self + .process_sparql_update( + &doc_nuri, + &sparql_update, + &None, + self.get_peer_id_for_skolem(), + session_id, + ) + .await + { + Err(e) => Err(e), + Ok((_, revert_inserts, revert_removes, skolemnized_blank_nodes)) => { + if !revert_inserts.is_empty() + || !revert_removes.is_empty() + || !skolemnized_blank_nodes.is_empty() + { + self.orm_update_self( + scope, + shape_iri, + session_id, + skolemnized_blank_nodes, + revert_inserts, + revert_removes, + ) + .await.map_err(|e|e.to_string())?; + } + Ok(()) } - } else { - let _ = sender.clone().send(response).await; } } @@ -730,7 +785,7 @@ impl Verifier { shape_type: &OrmShapeType, session_id: u64, ) -> Result<(Receiver, CancelFn), NgError> { - let (tx, rx) = mpsc::unbounded::(); + let (mut tx, rx) = mpsc::unbounded::(); // TODO: Validate schema: // If multiple data types are present for the same predicate, they must be of of the same type. @@ -753,13 +808,7 @@ impl Verifier { let orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type)?; // log_debug!("create_orm_object_for_shape return {:?}", orm_objects); - self.push_orm_response( - &nuri.clone(), - session_id, - tx.clone(), - AppResponse::V0(AppResponseV0::OrmInitial(orm_objects)), - ) - .await; + let _ = tx.send(AppResponse::V0(AppResponseV0::OrmInitial(orm_objects))).await; let close = Box::new(move || { log_debug!("CLOSE_CHANNEL of subscription"); diff --git a/ng-verifier/src/request_processor.rs b/ng-verifier/src/request_processor.rs index e1b8a5f..ab24952 100644 --- a/ng-verifier/src/request_processor.rs +++ b/ng-verifier/src/request_processor.rs @@ -212,13 +212,14 @@ impl Verifier { if inserts.is_empty() && removes.is_empty() { Ok(()) } else { - self.prepare_sparql_update( - Vec::from_iter(inserts), - Vec::from_iter(removes), - self.get_peer_id_for_skolem(), - 0, - ) - .await?; + let _ = self + .prepare_sparql_update( + Vec::from_iter(inserts), + Vec::from_iter(removes), + self.get_peer_id_for_skolem(), + 0, + ) + .await?; Ok(()) } } @@ -903,7 +904,13 @@ impl Verifier { match command { AppRequestCommandV0::OrmUpdate => match payload { Some(AppRequestPayload::V0(AppRequestPayloadV0::OrmUpdate((diff, shape_id)))) => { - self.orm_frontend_update(&nuri, shape_id, diff).await + return match self + .orm_frontend_update(session_id, &nuri, shape_id, diff) + .await + { + Err(e) => Ok(AppResponse::error(e)), + Ok(()) => Ok(AppResponse::ok()), + } } _ => return Err(NgError::InvalidArgument), }, @@ -1228,7 +1235,7 @@ impl Verifier { .await { Err(e) => AppResponse::error(e), - Ok(commits) => AppResponse::commits(commits), + Ok((commits, ..)) => AppResponse::commits(commits), }, ) } else { diff --git a/sdk/ng-sdk-js/examples/multi-framework-signals/src/app/pages/index.astro b/sdk/ng-sdk-js/examples/multi-framework-signals/src/app/pages/index.astro index a868115..a5e636d 100644 --- a/sdk/ng-sdk-js/examples/multi-framework-signals/src/app/pages/index.astro +++ b/sdk/ng-sdk-js/examples/multi-framework-signals/src/app/pages/index.astro @@ -6,21 +6,21 @@ import VueRoot from "../components/VueRoot.vue"; import ReactRoot from "../components/ReactRoot"; import SvelteRoot from "../components/SvelteRoot.svelte"; // Hack to get mock backend started -import { mockTestObject } from "../../ng-mock/wasm-land/shapeHandler"; +//import { mockTestObject } from "../../ng-mock/wasm-land/shapeHandler"; const title = "Multi-framework app"; --- - - - + + + - - - + + + - - - + + +