Compare commits

...

8 Commits

  1. 25
      Cargo.lock
  2. 4
      nextgraph/Cargo.toml
  3. 1211
      nextgraph/src/tests/orm.rs
  4. 3
      ng-net/src/app_protocol.rs
  5. 8
      ng-net/src/orm.rs
  6. 1
      ng-repo/src/errors.rs
  7. 58
      ng-verifier/src/commits/transaction.rs
  8. 10
      ng-verifier/src/inbox_processor.rs
  9. 9
      ng-verifier/src/orm/add_remove_triples.rs
  10. 202
      ng-verifier/src/orm/mod.rs
  11. 127
      ng-verifier/src/orm/utils.rs
  12. 26
      ng-verifier/src/orm/validation.rs
  13. 36
      ng-verifier/src/request_processor.rs
  14. 5
      ng-verifier/src/verifier.rs
  15. 3
      package.json
  16. 3
      sdk/ng-sdk-js/examples/multi-framework-signals/package.json
  17. 20
      sdk/ng-sdk-js/examples/multi-framework-signals/src/app/pages/index.astro
  18. 28
      sdk/ng-sdk-js/ng-signals/src/connector/createSignalObjectForShape.ts

25
Cargo.lock generated

@ -596,6 +596,19 @@ dependencies = [
"pkg-config", "pkg-config",
] ]
[[package]]
name = "canonical_json"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f89083fd014d71c47a718d7f4ac050864dac8587668dbe90baf9e261064c5710"
dependencies = [
"hex",
"regex",
"serde",
"serde_json",
"thiserror 1.0.69",
]
[[package]] [[package]]
name = "cast" name = "cast"
version = "0.3.0" version = "0.3.0"
@ -2156,6 +2169,7 @@ dependencies = [
"async-std", "async-std",
"async-trait", "async-trait",
"base64-url", "base64-url",
"canonical_json",
"futures", "futures",
"lazy_static", "lazy_static",
"ng-client-ws", "ng-client-ws",
@ -2171,6 +2185,7 @@ dependencies = [
"serde_bare", "serde_bare",
"serde_bytes", "serde_bytes",
"serde_json", "serde_json",
"serde_json_diff",
"svg2pdf", "svg2pdf",
"web-time", "web-time",
"whoami", "whoami",
@ -3497,6 +3512,16 @@ dependencies = [
"serde_core", "serde_core",
] ]
[[package]]
name = "serde_json_diff"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac615f2de9556d78ec9d5924abae441d1764f833fbd6db24bb56d2b6b5200ed"
dependencies = [
"serde",
"serde_json",
]
[[package]] [[package]]
name = "serde_urlencoded" name = "serde_urlencoded"
version = "0.7.1" version = "0.7.1"

@ -45,6 +45,10 @@ ng-client-ws = { path = "../ng-client-ws", version = "0.1.2" }
ng-verifier = { path = "../ng-verifier", version = "0.1.2" } ng-verifier = { path = "../ng-verifier", version = "0.1.2" }
ng-oxigraph = { path = "../ng-oxigraph", version = "0.4.0-alpha.8-ngalpha" } ng-oxigraph = { path = "../ng-oxigraph", version = "0.4.0-alpha.8-ngalpha" }
[dev-dependencies]
serde_json_diff = "0.2.0"
canonical_json = "0.5.0"
[target.'cfg(all(not(target_family = "wasm"),not(docsrs)))'.dependencies] [target.'cfg(all(not(target_family = "wasm"),not(docsrs)))'.dependencies]
ng-storage-rocksdb = { path = "../ng-storage-rocksdb", version = "0.1.2" } ng-storage-rocksdb = { path = "../ng-storage-rocksdb", version = "0.1.2" }

File diff suppressed because it is too large Load Diff

@ -20,7 +20,7 @@ use ng_repo::utils::{decode_digest, decode_key, decode_sym_key};
use ng_repo::utils::{decode_overlayid, display_timestamp_local}; use ng_repo::utils::{decode_overlayid, display_timestamp_local};
use serde_json::Value; use serde_json::Value;
use crate::orm::{OrmDiff, OrmShapeType}; use crate::orm::{OrmDiff, OrmShapeType, OrmUpdateBlankNodeIds};
use crate::types::*; use crate::types::*;
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
@ -1309,6 +1309,7 @@ pub enum AppResponseV0 {
Commits(Vec<String>), Commits(Vec<String>),
OrmInitial(Value), OrmInitial(Value),
OrmUpdate(OrmDiff), OrmUpdate(OrmDiff),
OrmUpdateBlankNodeIds(OrmUpdateBlankNodeIds),
OrmError(String), OrmError(String),
} }

@ -48,6 +48,14 @@ pub struct OrmDiffOp {
pub type OrmDiff = Vec<OrmDiffOp>; pub type OrmDiff = Vec<OrmDiffOp>;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OrmUpdateBlankNodeId {
pub path: String,
pub nuri: String,
}
pub type OrmUpdateBlankNodeIds = Vec<OrmUpdateBlankNodeId>;
pub type OrmSchema = HashMap<String, Arc<OrmSchemaShape>>; pub type OrmSchema = HashMap<String, Arc<OrmSchemaShape>>;
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]

@ -400,6 +400,7 @@ pub enum VerifierError {
InvalidOrmSchema, InvalidOrmSchema,
OrmSubjectNotFound, OrmSubjectNotFound,
OrmPredicateNotFound, OrmPredicateNotFound,
OrmSubscriptionNotFound,
} }
impl Error for VerifierError {} impl Error for VerifierError {}

@ -295,7 +295,7 @@ impl Verifier {
transaction, transaction,
commit_info, commit_info,
}; };
self.update_graph(vec![info]).await?; self.update_graph(vec![info], 0).await?;
} else } else
//TODO: change the logic here. transaction commits can have both a discrete and graph update. Only one AppResponse should be sent in this case, containing both updates. //TODO: change the logic here. transaction commits can have both a discrete and graph update. Only one AppResponse should be sent in this case, containing both updates.
if body.discrete.is_some() { if body.discrete.is_some() {
@ -393,14 +393,16 @@ impl Verifier {
// TODO: implement TargetBranchV0::Named // TODO: implement TargetBranchV0::Named
_ => unimplemented!(), _ => unimplemented!(),
}; };
let _ = branches.entry(branch_id).or_insert(( if is_publisher {
store.get_store_repo().clone(), let _ = branches.entry(branch_id).or_insert((
repo.id, store.get_store_repo().clone(),
branch_type, repo.id,
topic_id, branch_type,
token, topic_id,
store.overlay_id, token,
)); store.overlay_id,
));
}
let _ = nuri_branches.entry(graph_name.clone()).or_insert(( let _ = nuri_branches.entry(graph_name.clone()).or_insert((
repo.id, repo.id,
branch_id, branch_id,
@ -412,17 +414,26 @@ impl Verifier {
} }
} }
/// returns
/// - list of commit Nuris
/// - optional list of revert_inserts
/// - optional list of revert_removes
/// - optional list of skolemnized_blank_nodes
pub(crate) async fn prepare_sparql_update( pub(crate) async fn prepare_sparql_update(
&mut self, &mut self,
inserts: Vec<Quad>, inserts: Vec<Quad>,
removes: Vec<Quad>, removes: Vec<Quad>,
peer_id: Vec<u8>, peer_id: Vec<u8>,
) -> Result<Vec<String>, VerifierError> { session_id: u64,
) -> Result<(Vec<String>, Vec<Quad>, Vec<Quad>, Vec<Quad>), VerifierError> {
// options when not a publisher on the repo: // options when not a publisher on the repo:
// - skip // - skip
// - TODO: abort (the whole transaction) // - TODO: abort (the whole transaction)
// - TODO: inbox (sent to inbox of document for a suggested update) // - TODO: inbox (sent to inbox of document for a suggested update)
// for now we just do skip, without giving option to user // for now we just do skip, without giving option to user
let mut revert_inserts: Vec<Quad> = vec![];
let mut revert_removes: Vec<Quad> = vec![];
let mut skolemnized_blank_nodes: Vec<Quad> = vec![];
let mut inserts_map: HashMap<BranchId, HashSet<Triple>> = HashMap::with_capacity(1); let mut inserts_map: HashMap<BranchId, HashSet<Triple>> = HashMap::with_capacity(1);
let mut removes_map: HashMap<BranchId, HashSet<Triple>> = HashMap::with_capacity(1); let mut removes_map: HashMap<BranchId, HashSet<Triple>> = HashMap::with_capacity(1);
let mut branches: HashMap< let mut branches: HashMap<
@ -437,6 +448,7 @@ impl Verifier {
let (repo_id, branch_id, is_publisher) = let (repo_id, branch_id, is_publisher) =
self.find_branch_and_repo_for_quad(&insert, &mut branches, &mut nuri_branches)?; self.find_branch_and_repo_for_quad(&insert, &mut branches, &mut nuri_branches)?;
if !is_publisher { if !is_publisher {
revert_inserts.push(insert);
continue; continue;
} }
let set = inserts_map.entry(branch_id).or_insert_with(|| { let set = inserts_map.entry(branch_id).or_insert_with(|| {
@ -462,6 +474,7 @@ impl Verifier {
let iri = let iri =
NuriV0::repo_skolem(&repo_id, &peer_id, b.as_ref().unique_id().unwrap())?; NuriV0::repo_skolem(&repo_id, &peer_id, b.as_ref().unique_id().unwrap())?;
insert.object = Term::NamedNode(NamedNode::new_unchecked(iri)); insert.object = Term::NamedNode(NamedNode::new_unchecked(iri));
skolemnized_blank_nodes.push(insert.clone());
} }
} }
// TODO deal with triples in subject and object (RDF-STAR) // TODO deal with triples in subject and object (RDF-STAR)
@ -472,6 +485,7 @@ impl Verifier {
let (repo_id, branch_id, is_publisher) = let (repo_id, branch_id, is_publisher) =
self.find_branch_and_repo_for_quad(&remove, &mut branches, &mut nuri_branches)?; self.find_branch_and_repo_for_quad(&remove, &mut branches, &mut nuri_branches)?;
if !is_publisher { if !is_publisher {
revert_removes.push(remove);
continue; continue;
} }
let set = removes_map.entry(branch_id).or_insert_with(|| { let set = removes_map.entry(branch_id).or_insert_with(|| {
@ -531,12 +545,21 @@ impl Verifier {
}; };
updates.push(info); updates.push(info);
} }
self.update_graph(updates).await match self.update_graph(updates, session_id).await {
Ok(commits) => Ok((
commits,
revert_inserts,
revert_removes,
skolemnized_blank_nodes,
)),
Err(e) => Err(e),
}
} }
async fn update_graph( async fn update_graph(
&mut self, &mut self,
mut updates: Vec<BranchUpdateInfo>, mut updates: Vec<BranchUpdateInfo>,
session_id: u64,
) -> Result<Vec<String>, VerifierError> { ) -> Result<Vec<String>, VerifierError> {
let updates_ref = &mut updates; let updates_ref = &mut updates;
let res = self let res = self
@ -740,7 +763,8 @@ impl Verifier {
.await; .await;
} else { } else {
let graph_patch = update.transaction.as_patch(); let graph_patch = update.transaction.as_patch();
commit_nuris.push(NuriV0::commit(&update.repo_id, &update.commit_id)); let nuri = NuriV0::commit(&update.repo_id, &update.commit_id);
commit_nuris.push(nuri);
self.push_app_response( self.push_app_response(
&update.branch_id, &update.branch_id,
AppResponse::V0(AppResponseV0::Patch(AppPatch { AppResponse::V0(AppResponseV0::Patch(AppPatch {
@ -755,7 +779,9 @@ impl Verifier {
let graph_nuri = let graph_nuri =
NuriV0::repo_graph_name(&update.repo_id, &update.overlay_id); NuriV0::repo_graph_name(&update.repo_id, &update.overlay_id);
self.orm_update( self.orm_update(
&NuriV0::new_empty(), session_id,
update.repo_id.clone(),
update.overlay_id,
update.transaction.as_quads_patch(graph_nuri), update.transaction.as_quads_patch(graph_nuri),
) )
.await; .await;
@ -773,7 +799,8 @@ impl Verifier {
query: &String, query: &String,
base: &Option<String>, base: &Option<String>,
peer_id: Vec<u8>, peer_id: Vec<u8>,
) -> Result<Vec<String>, String> { session_id: u64,
) -> Result<(Vec<String>, Vec<Quad>, Vec<Quad>, Vec<Quad>), String> {
let store = self.graph_dataset.as_ref().unwrap(); let store = self.graph_dataset.as_ref().unwrap();
let update = ng_oxigraph::oxigraph::sparql::Update::parse(query, base.as_deref()) let update = ng_oxigraph::oxigraph::sparql::Update::parse(query, base.as_deref())
@ -788,12 +815,13 @@ impl Verifier {
Err(e) => Err(e.to_string()), Err(e) => Err(e.to_string()),
Ok((inserts, removes)) => { Ok((inserts, removes)) => {
if inserts.is_empty() && removes.is_empty() { if inserts.is_empty() && removes.is_empty() {
Ok(vec![]) Ok((vec![], vec![], vec![], vec![]))
} else { } else {
self.prepare_sparql_update( self.prepare_sparql_update(
Vec::from_iter(inserts), Vec::from_iter(inserts),
Vec::from_iter(removes), Vec::from_iter(removes),
peer_id, peer_id,
session_id,
) )
.await .await
.map_err(|e| e.to_string()) .map_err(|e| e.to_string())

@ -65,7 +65,7 @@ impl Verifier {
<> ng:social_query_from_profile <{from_profile_nuri_string}>. <> ng:social_query_from_profile <{from_profile_nuri_string}>.
<> ng:social_query_started \"{}\"^^xsd:dateTime . }}",DateTime::now()); <> ng:social_query_started \"{}\"^^xsd:dateTime . }}",DateTime::now());
let ret = self let ret = self
.process_sparql_update(&forwarder_nuri, &sparql_update, &Some(forwarder_nuri_string.clone()), vec![]) .process_sparql_update(&forwarder_nuri, &sparql_update, &Some(forwarder_nuri_string.clone()), vec![],0)
.await; .await;
if let Err(e) = ret { if let Err(e) = ret {
return Err(VerifierError::SparqlError(e)); return Err(VerifierError::SparqlError(e));
@ -78,7 +78,7 @@ impl Verifier {
// adding triples in forwarder doc : ng:social_query_id // adding triples in forwarder doc : ng:social_query_id
let sparql_update = format!("INSERT DATA {{ <{forwarder_nuri_string}> <did:ng:x:ng#{predicate}> \"{}\"^^<http://www.w3.org/2001/XMLSchema#dateTime> . }}",DateTime::now()); let sparql_update = format!("INSERT DATA {{ <{forwarder_nuri_string}> <did:ng:x:ng#{predicate}> \"{}\"^^<http://www.w3.org/2001/XMLSchema#dateTime> . }}",DateTime::now());
let ret = self let ret = self
.process_sparql_update(forwarder_nuri, &sparql_update, &None, vec![]) .process_sparql_update(forwarder_nuri, &sparql_update, &None, vec![],0)
.await; .await;
if let Err(e) = ret { if let Err(e) = ret {
return Err(VerifierError::SparqlError(e)); return Err(VerifierError::SparqlError(e));
@ -153,7 +153,7 @@ impl Verifier {
<did:ng:_> ng:social_query_forwarded_to_inbox <{to_inbox_nuri}> . <did:ng:_> ng:social_query_forwarded_to_inbox <{to_inbox_nuri}> .
}}"); }}");
let ret = self let ret = self
.process_sparql_update(&forwarder_nuri, &sparql_update, &None, vec![]) .process_sparql_update(&forwarder_nuri, &sparql_update, &None, vec![],0)
.await; .await;
if let Err(e) = ret { if let Err(e) = ret {
return Err(VerifierError::SparqlError(e)); return Err(VerifierError::SparqlError(e));
@ -593,7 +593,7 @@ impl Verifier {
let nuri_ov = NuriV0::repo_graph_name(&response.query_id, &overlay_id); let nuri_ov = NuriV0::repo_graph_name(&response.query_id, &overlay_id);
let graph_name = NamedNode::new_unchecked(&nuri_ov); let graph_name = NamedNode::new_unchecked(&nuri_ov);
let quads = triples.into_iter().map(|t| t.in_graph(graph_name.clone()) ).collect(); let quads = triples.into_iter().map(|t| t.in_graph(graph_name.clone()) ).collect();
let commits = self.prepare_sparql_update(quads, vec![], self.get_peer_id_for_skolem()).await?; let _ = self.prepare_sparql_update(quads, vec![], self.get_peer_id_for_skolem(), 0).await?;
} else { } else {
@ -665,7 +665,7 @@ impl Verifier {
{has_email} }}", details.name); {has_email} }}", details.name);
let ret = self let ret = self
.process_sparql_update(&contact_nuri, &sparql_update, &Some(contact_nuri_string), vec![]) .process_sparql_update(&contact_nuri, &sparql_update, &Some(contact_nuri_string), vec![],0)
.await; .await;
if let Err(e) = ret { if let Err(e) = ret {
return Err(VerifierError::SparqlError(e)); return Err(VerifierError::SparqlError(e));

@ -57,15 +57,18 @@ pub fn add_remove_triples(
// Process added triples. // Process added triples.
// For each triple, check if it matches the shape. // For each triple, check if it matches the shape.
// In parallel, we record the values added and removed (tracked_changes) // In parallel, we record the values added and removed (tracked_changes)
log_debug!("Processing # triples: {}", triples_added.len());
for triple in triples_added { for triple in triples_added {
let obj_term = oxrdf_term_to_orm_basic_type(&triple.object); let obj_term = oxrdf_term_to_orm_basic_type(&triple.object);
log_debug!("processing triple {triple}"); log_debug!(" - processing triple {triple}");
for predicate_schema in &shape.predicates { for predicate_schema in &shape.predicates {
if predicate_schema.iri != triple.predicate.as_str() { if predicate_schema.iri != triple.predicate.as_str() {
// Triple does not match predicate. // Triple does not match predicate.
continue; continue;
} }
log_debug!(
" - Matched triple for datatypes {:?}",
predicate_schema.dataTypes
);
// Predicate schema constraint matches this triple. // Predicate schema constraint matches this triple.
let tracked_subject_lock = let tracked_subject_lock =
get_or_create_tracked_subject(subject_iri, &shape, tracked_subjects); get_or_create_tracked_subject(subject_iri, &shape, tracked_subjects);
@ -124,7 +127,7 @@ pub fn add_remove_triples(
None None
} }
}) { }) {
// log_debug!("dealing with nesting for {shape_iri}"); log_debug!(" - dealing with nested type {shape_iri}");
if let BasicType::Str(obj_iri) = &obj_term { if let BasicType::Str(obj_iri) = &obj_term {
let tracked_child_arc = { let tracked_child_arc = {
// Get or create object's tracked subject struct. // Get or create object's tracked subject struct.

@ -14,6 +14,11 @@ pub mod validation;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::channel::mpsc::UnboundedSender; use futures::channel::mpsc::UnboundedSender;
use ng_net::types::OverlayLink;
use ng_oxigraph::oxrdf::Quad;
use ng_repo::errors::VerifierError;
use ng_repo::types::OverlayId;
use ng_repo::types::RepoId;
use std::collections::HashMap; use std::collections::HashMap;
use std::collections::HashSet; use std::collections::HashSet;
@ -75,6 +80,7 @@ impl Verifier {
return Err(NgError::SparqlError(e.to_string())); return Err(NgError::SparqlError(e.to_string()));
} }
Ok(triple) => { Ok(triple) => {
log_debug!("Triple fetched: {:?}", triple);
result_triples.push(triple); result_triples.push(triple);
} }
} }
@ -173,9 +179,13 @@ impl Verifier {
HashMap::new(); HashMap::new();
// For each subject, add/remove triples and validate. // For each subject, add/remove triples and validate.
log_debug!("all_modified_subjects: {:?}", modified_subject_iris); log_debug!(
"processing modified subjects: {:?} against shape: {}",
modified_subject_iris,
shape.iri
);
for subject_iri in modified_subject_iris { for subject_iri in &modified_subject_iris {
let validation_key = (shape.iri.clone(), subject_iri.to_string()); let validation_key = (shape.iri.clone(), subject_iri.to_string());
// Cycle detection: Check if this (shape, subject) pair is already being validated // Cycle detection: Check if this (shape, subject) pair is already being validated
@ -187,7 +197,8 @@ impl Verifier {
); );
// Mark as invalid due to cycle // Mark as invalid due to cycle
// TODO: We could handle this by handling nested references as IRIs. // TODO: We could handle this by handling nested references as IRIs.
if let Some(tracked_shapes) = orm_subscription.tracked_subjects.get(subject_iri) if let Some(tracked_shapes) =
orm_subscription.tracked_subjects.get(*subject_iri)
{ {
if let Some(tracked_subject) = tracked_shapes.get(&shape.iri) { if let Some(tracked_subject) = tracked_shapes.get(&shape.iri) {
let mut ts = tracked_subject.write().unwrap(); let mut ts = tracked_subject.write().unwrap();
@ -201,12 +212,13 @@ impl Verifier {
// Mark as currently validating // Mark as currently validating
currently_validating.insert(validation_key.clone()); currently_validating.insert(validation_key.clone());
// Get triples of subject (added & removed).
let triples_added_for_subj = added_triples_by_subject let triples_added_for_subj = added_triples_by_subject
.get(subject_iri) .get(*subject_iri)
.map(|v| v.as_slice()) .map(|v| v.as_slice())
.unwrap_or(&[]); .unwrap_or(&[]);
let triples_removed_for_subj = removed_triples_by_subject let triples_removed_for_subj = removed_triples_by_subject
.get(subject_iri) .get(*subject_iri)
.map(|v| v.as_slice()) .map(|v| v.as_slice())
.unwrap_or(&[]); .unwrap_or(&[]);
@ -214,9 +226,9 @@ impl Verifier {
let change = orm_changes let change = orm_changes
.entry(shape.iri.clone()) .entry(shape.iri.clone())
.or_insert_with(HashMap::new) .or_insert_with(HashMap::new)
.entry(subject_iri.clone()) .entry((*subject_iri).clone())
.or_insert_with(|| OrmTrackedSubjectChange { .or_insert_with(|| OrmTrackedSubjectChange {
subject_iri: subject_iri.clone(), subject_iri: (*subject_iri).clone(),
predicates: HashMap::new(), predicates: HashMap::new(),
data_applied: false, data_applied: false,
}); });
@ -248,7 +260,7 @@ impl Verifier {
let validity = { let validity = {
let tracked_subject_opt = orm_subscription let tracked_subject_opt = orm_subscription
.tracked_subjects .tracked_subjects
.get(subject_iri) .get(*subject_iri)
.and_then(|m| m.get(&shape.iri)); .and_then(|m| m.get(&shape.iri));
let Some(tracked_subject) = tracked_subject_opt else { let Some(tracked_subject) = tracked_subject_opt else {
continue; continue;
@ -277,14 +289,8 @@ impl Verifier {
} }
} }
} }
// Remove from validation stack after processing this subject
currently_validating.remove(&validation_key);
} }
// TODO: Currently, all shape <-> nested subject combinations are queued for re-evaluation.
// Is that okay?
// Now, we queue all non-evaluated objects // Now, we queue all non-evaluated objects
for (shape_iri, objects_to_eval) in &nested_objects_to_eval { for (shape_iri, objects_to_eval) in &nested_objects_to_eval {
let orm_subscription = self.get_first_orm_subscription_for( let orm_subscription = self.get_first_orm_subscription_for(
@ -333,6 +339,10 @@ impl Verifier {
shape_validation_stack.push((shape_arc, objects_not_to_fetch)); shape_validation_stack.push((shape_arc, objects_not_to_fetch));
} }
} }
for subject_iri in modified_subject_iris {
let validation_key = (shape.iri.clone(), subject_iri.to_string());
currently_validating.remove(&validation_key);
}
} }
Ok(()) Ok(())
@ -375,6 +385,37 @@ impl Verifier {
}).next().unwrap() }).next().unwrap()
} }
pub fn get_first_orm_subscription_sender_for(
&mut self,
nuri: &NuriV0,
shape: Option<&ShapeIri>,
session_id: Option<&u64>,
) -> Result<(UnboundedSender<AppResponse>, &OrmSubscription), VerifierError> {
let subs = self
.orm_subscriptions
.get_mut(nuri)
.unwrap();
subs.retain(|sub| !sub.sender.is_closed());
match
// Filter shapes, if present.
subs.iter()
.filter(|s| match shape {
Some(sh) => *sh == s.shape_type.shape,
None => true, // Filter session ids if present.
})
.filter(|s| match session_id {
Some(id) => *id == s.session_id,
None => true,
})
.next()
{
None => Err(VerifierError::OrmSubscriptionNotFound),
Some(subscription) => {
Ok((subscription.sender.clone(), subscription))
}
}
}
/// Apply triples to a nuri's document. /// Apply triples to a nuri's document.
/// Updates tracked_subjects in orm_subscriptions. /// Updates tracked_subjects in orm_subscriptions.
fn apply_triple_changes( fn apply_triple_changes(
@ -617,32 +658,115 @@ impl Verifier {
return Ok(return_vals); return Ok(return_vals);
} }
pub(crate) async fn orm_update(&mut self, scope: &NuriV0, patch: GraphQuadsPatch) {} pub(crate) async fn orm_update(
&mut self,
session_id: u64,
repo_id: RepoId,
overlay_id: OverlayId,
patch: GraphQuadsPatch,
) {
let overlaylink: OverlayLink = overlay_id.into();
for (scope, subs) in self.orm_subscriptions.iter_mut() {
subs.retain(|sub| !sub.sender.is_closed());
if scope.entire_store
|| scope.overlay.as_ref().map_or(false, |ol| overlaylink == *ol)
|| scope.target == NuriTargetV0::Repo(repo_id)
{
for sub in subs {
if sub.session_id != session_id { // this is incorrect. we are excluding all the subscriptions from the originating session,
// while we should only exclude the one with exact same shape_type. but we don't have access to that here
// TODO: implement this, generate orm_diff using the patch and the sub.shape_type
let orm_diff: OrmDiff = vec![];
pub(crate) async fn orm_frontend_update( _ = sub.sender.send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff.to_vec()))).await;
}
}
}
}
}
pub(crate) async fn orm_update_self(
&mut self, &mut self,
scope: &NuriV0, scope: &NuriV0,
shape_iri: ShapeIri, shape_iri: ShapeIri,
diff: OrmDiff, session_id: u64,
) { skolemnized_blank_nodes: Vec<Quad>,
log_info!("frontend_update_orm {:?} {} {:?}", scope, shape_iri, diff); revert_inserts: Vec<Quad>,
revert_removes: Vec<Quad>,
) -> Result<(), VerifierError> {
let (mut sender, orm_subscription) =
self.get_first_orm_subscription_sender_for(scope, Some(&shape_iri), Some(&session_id))?;
// TODO prepare OrmUpdateBlankNodeIds with skolemnized_blank_nodes
// use orm_subscription if needed
// note(niko): I think skolemnized blank nodes can still be many, in case of multi-level nested sub-objects.
let orm_bnids = vec![];
let _ = sender.send(AppResponse::V0(AppResponseV0::OrmUpdateBlankNodeIds(orm_bnids))).await;
// TODO (later) revert the inserts and removes
// let orm_diff = vec![];
// let _ = sender.send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff))).await;
Ok(())
} }
pub(crate) async fn push_orm_response( pub(crate) async fn orm_frontend_update(
&mut self, &mut self,
nuri: &NuriV0,
session_id: u64, session_id: u64,
sender: UnboundedSender<AppResponse>, scope: &NuriV0,
response: AppResponse, shape_iri: ShapeIri,
) { diff: OrmDiff,
log_debug!("sending orm response for session {}:", session_id); ) -> Result<(), String> {
log_info!(
if sender.is_closed() { "frontend_update_orm session={} scope={:?} shape={} diff={:?}",
log_debug!("closed so removing session {}", session_id); session_id,
scope,
shape_iri,
diff
);
// find OrmSubscription
let (doc_nuri, sparql_update) = {
let orm_subscription =
self.get_first_orm_subscription_for(scope, Some(&shape_iri), Some(&session_id));
// use orm_subscription as needed
// do the magic, then, find the doc where the query should start and generate the sparql update
let doc_nuri = NuriV0::new_empty();
let sparql_update: String = String::new();
(doc_nuri, sparql_update)
};
self.orm_subscriptions.remove(&nuri); match self
} else { .process_sparql_update(
let _ = sender.clone().send(response).await; &doc_nuri,
&sparql_update,
&None,
self.get_peer_id_for_skolem(),
session_id,
)
.await
{
Err(e) => Err(e),
Ok((_, revert_inserts, revert_removes, skolemnized_blank_nodes)) => {
if !revert_inserts.is_empty()
|| !revert_removes.is_empty()
|| !skolemnized_blank_nodes.is_empty()
{
self.orm_update_self(
scope,
shape_iri,
session_id,
skolemnized_blank_nodes,
revert_inserts,
revert_removes,
)
.await.map_err(|e|e.to_string())?;
}
Ok(())
}
} }
} }
@ -661,7 +785,7 @@ impl Verifier {
shape_type: &OrmShapeType, shape_type: &OrmShapeType,
session_id: u64, session_id: u64,
) -> Result<(Receiver<AppResponse>, CancelFn), NgError> { ) -> Result<(Receiver<AppResponse>, CancelFn), NgError> {
let (tx, rx) = mpsc::unbounded::<AppResponse>(); let (mut tx, rx) = mpsc::unbounded::<AppResponse>();
// TODO: Validate schema: // TODO: Validate schema:
// If multiple data types are present for the same predicate, they must be of of the same type. // If multiple data types are present for the same predicate, they must be of of the same type.
@ -681,19 +805,13 @@ impl Verifier {
.or_insert(vec![]) .or_insert(vec![])
.push(orm_subscription); .push(orm_subscription);
let _orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type)?; let orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type)?;
// log_debug!("create_orm_object_for_shape return {:?}", _orm_objects); // log_debug!("create_orm_object_for_shape return {:?}", orm_objects);
self.push_orm_response( let _ = tx.send(AppResponse::V0(AppResponseV0::OrmInitial(orm_objects))).await;
&nuri.clone(),
session_id,
tx.clone(),
AppResponse::V0(AppResponseV0::OrmInitial(_orm_objects)),
)
.await;
let close = Box::new(move || { let close = Box::new(move || {
log_debug!("CLOSE_CHANNEL of subscription"); log_debug!("closing ORM subscription");
if !tx.is_closed() { if !tx.is_closed() {
tx.close_channel(); tx.close_channel();
} }

@ -80,6 +80,7 @@ pub fn shape_type_to_sparql(
let mut visited_shapes: HashSet<ShapeIri> = HashSet::new(); let mut visited_shapes: HashSet<ShapeIri> = HashSet::new();
// Recursive function to call for (nested) shapes. // Recursive function to call for (nested) shapes.
// Returns nested WHERE statements that should be included with this shape's binding.
fn process_shape( fn process_shape(
schema: &OrmSchema, schema: &OrmSchema,
shape: &OrmSchemaShape, shape: &OrmSchemaShape,
@ -88,27 +89,30 @@ pub fn shape_type_to_sparql(
where_statements: &mut Vec<String>, where_statements: &mut Vec<String>,
var_counter: &mut i32, var_counter: &mut i32,
visited_shapes: &mut HashSet<String>, visited_shapes: &mut HashSet<String>,
) { in_recursion: bool,
) -> Vec<String> {
// Prevent infinite recursion on cyclic schemas. // Prevent infinite recursion on cyclic schemas.
// TODO: We could handle this as IRI string reference. // TODO: We could handle this as IRI string reference.
if visited_shapes.contains(&shape.iri) { if visited_shapes.contains(&shape.iri) {
return; return vec![];
} }
let mut new_where_statements: Vec<String> = vec![];
let mut new_construct_statements: Vec<String> = vec![];
visited_shapes.insert(shape.iri.clone()); visited_shapes.insert(shape.iri.clone());
// Add statements for each predicate. // Add statements for each predicate.
// If we are in recursion, we want to get all triples.
// That's why we add a "<subject> ?p ?o" statement afterwards
// and the extra construct statements are skipped.
for predicate in &shape.predicates { for predicate in &shape.predicates {
let mut union_branches = Vec::new(); let mut union_branches = Vec::new();
let mut allowed_literals = Vec::new(); let mut nested_where_statements = Vec::new();
// Predicate constraints might have more than one acceptable data type. Traverse each. // Predicate constraints might have more than one acceptable nested shape. Traverse each.
// It is assumed that constant literals, nested shapes and regular types are not mixed.
for datatype in &predicate.dataTypes { for datatype in &predicate.dataTypes {
if datatype.valType == OrmSchemaLiteralType::literal { if datatype.valType == OrmSchemaLiteralType::shape {
// Collect allowed literals and as strings
// (already in SPARQL-format, e.g. `"a astring"`, `<http:ex.co/>`, `true`, or `42`).
allowed_literals.extend(literal_to_sparql_str(datatype.clone()));
} else if datatype.valType == OrmSchemaLiteralType::shape {
let shape_iri = &datatype.shape.clone().unwrap(); let shape_iri = &datatype.shape.clone().unwrap();
let nested_shape = schema.get(shape_iri).unwrap(); let nested_shape = schema.get(shape_iri).unwrap();
@ -117,10 +121,12 @@ pub fn shape_type_to_sparql(
// Each shape option gets its own var. // Each shape option gets its own var.
let obj_var_name = get_new_var_name(var_counter); let obj_var_name = get_new_var_name(var_counter);
construct_statements.push(format!( if !in_recursion {
" ?{} <{}> ?{}", new_construct_statements.push(format!(
subject_var_name, predicate.iri, obj_var_name " ?{} <{}> ?{}",
)); subject_var_name, predicate.iri, obj_var_name
));
}
// Those are later added to a UNION, if there is more than one shape. // Those are later added to a UNION, if there is more than one shape.
union_branches.push(format!( union_branches.push(format!(
" ?{} <{}> ?{}", " ?{} <{}> ?{}",
@ -128,7 +134,8 @@ pub fn shape_type_to_sparql(
)); ));
// Recurse to add statements for nested object. // Recurse to add statements for nested object.
process_shape( // Collect nested WHERE statements to include within this predicate's scope.
let nested_stmts = process_shape(
schema, schema,
nested_shape, nested_shape,
&obj_var_name, &obj_var_name,
@ -136,64 +143,86 @@ pub fn shape_type_to_sparql(
where_statements, where_statements,
var_counter, var_counter,
visited_shapes, visited_shapes,
true,
); );
nested_where_statements.extend(nested_stmts);
} }
} }
// The where statement which might be wrapped in OPTIONAL. // The where statement (which may be wrapped in OPTIONAL).
let where_body: String; let where_body: String;
if !allowed_literals.is_empty() if !union_branches.is_empty() {
&& !predicate.extra.unwrap_or(false)
&& predicate.minCardinality > 0
{
// If we have literal requirements and they are not optional ("extra"),
// Add CONSTRUCT, WHERE, and FILTER.
let pred_var_name = get_new_var_name(var_counter);
construct_statements.push(format!(
" ?{} <{}> ?{}",
subject_var_name, predicate.iri, pred_var_name
));
where_body = format!(
" ?{s} <{p}> ?{o} . \n FILTER(?{o} IN ({lits}))",
s = subject_var_name,
p = predicate.iri,
o = pred_var_name,
lits = allowed_literals.join(", ")
);
} else if !union_branches.is_empty() {
// We have nested shape(s) which were already added to CONSTRUCT above. // We have nested shape(s) which were already added to CONSTRUCT above.
// Join them with UNION. // Join them with UNION and include nested WHERE statements.
where_body = union_branches let union_body = union_branches
.into_iter() .into_iter()
.map(|b| format!("{{\n{}\n}}", b)) .map(|b| format!("{{\n{}\n}}", b))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join(" UNION "); .join(" UNION ");
// Combine the parent binding with nested statements
if !nested_where_statements.is_empty() {
let nested_joined = nested_where_statements.join(" .\n");
where_body = format!("{} .\n{}", union_body, nested_joined);
} else {
where_body = union_body;
}
} else { } else {
// Regular predicate data type. Just add basic CONSTRUCT and WHERE statements. // Regular predicate data type. Just add basic CONSTRUCT and WHERE statements.
let pred_var_name = get_new_var_name(var_counter); let obj_var_name = get_new_var_name(var_counter);
construct_statements.push(format!( if !in_recursion {
" ?{} <{}> ?{}", // Only add construct, if we don't have catch-all statement already.
subject_var_name, predicate.iri, pred_var_name new_construct_statements.push(format!(
)); " ?{} <{}> ?{}",
subject_var_name, predicate.iri, obj_var_name
));
}
where_body = format!( where_body = format!(
" ?{} <{}> ?{}", " ?{} <{}> ?{}",
subject_var_name, predicate.iri, pred_var_name subject_var_name, predicate.iri, obj_var_name
); );
} }
// Wrap in optional, if necessary. // Wrap in optional, if predicate is optional
if predicate.minCardinality < 1 { if predicate.minCardinality < 1 {
where_statements.push(format!(" OPTIONAL {{\n{}\n }}", where_body)); new_where_statements.push(format!(" OPTIONAL {{\n{}\n }}", where_body));
} else { } else {
where_statements.push(where_body); new_where_statements.push(where_body);
}; };
} }
if in_recursion {
// All statements in recursive objects need to be optional
// because we want to fetch _all_ nested objects,
// invalid ones too, for later validation.
let pred_var_name = get_new_var_name(var_counter);
let obj_var_name = get_new_var_name(var_counter);
// The "catch any triple in subject" construct statement
construct_statements.push(format!(
" ?{} ?{} ?{}",
subject_var_name, pred_var_name, obj_var_name
));
let joined_where_statements = new_where_statements.join(" .\n");
// Return nested statements to be included in parent's scope
// Combine catch-all with specific predicates in a UNION
let nested_block = format!(
" {{\n {{?{} ?{} ?{}}}\n UNION {{\n {}\n }}\n }}",
subject_var_name, pred_var_name, obj_var_name, joined_where_statements
);
visited_shapes.remove(&shape.iri);
return vec![nested_block];
} else {
where_statements.append(&mut new_where_statements);
construct_statements.append(&mut new_construct_statements);
}
visited_shapes.remove(&shape.iri); visited_shapes.remove(&shape.iri);
vec![]
} }
let root_shape = schema.get(shape).ok_or(VerifierError::InvalidOrmSchema)?; let root_shape = schema.get(shape).ok_or(VerifierError::InvalidOrmSchema)?;
@ -209,11 +238,12 @@ pub fn shape_type_to_sparql(
&mut where_statements, &mut where_statements,
&mut var_counter, &mut var_counter,
&mut visited_shapes, &mut visited_shapes,
false,
); );
// Filter subjects, if present. // Filter subjects, if present.
if let Some(subjects) = filter_subjects { if let Some(subjects) = filter_subjects {
log_debug!("filter_subjects: {:?}", subjects); // log_debug!("filter_subjects: {:?}", subjects);
let subjects_str = subjects let subjects_str = subjects
.iter() .iter()
.map(|s| format!("<{}>", s)) .map(|s| format!("<{}>", s))
@ -232,7 +262,6 @@ pub fn shape_type_to_sparql(
construct_body, where_body construct_body, where_body
)) ))
} }
/// SPARQL literal escape: backslash, quotes, newlines, tabs. /// SPARQL literal escape: backslash, quotes, newlines, tabs.
fn escape_literal(lit: &str) -> String { fn escape_literal(lit: &str) -> String {
let mut out = String::with_capacity(lit.len() + 4); let mut out = String::with_capacity(lit.len() + 4);

@ -36,6 +36,12 @@ impl Verifier {
// Keep track of objects that need to be validated against a shape to fetch and validate. // Keep track of objects that need to be validated against a shape to fetch and validate.
let mut need_evaluation: Vec<(String, String, bool)> = vec![]; let mut need_evaluation: Vec<(String, String, bool)> = vec![];
log_debug!(
"[Validation] for shape {} and subject {}",
shape.iri,
s_change.subject_iri
);
// Check 1) Check if this object is untracked and we need to remove children and ourselves. // Check 1) Check if this object is untracked and we need to remove children and ourselves.
if previous_validity == OrmTrackedSubjectValidity::Untracked { if previous_validity == OrmTrackedSubjectValidity::Untracked {
// 1.1) Schedule children for deletion // 1.1) Schedule children for deletion
@ -106,7 +112,7 @@ impl Verifier {
// Check 3.1) Cardinality // Check 3.1) Cardinality
if count < p_schema.minCardinality { if count < p_schema.minCardinality {
log_debug!( log_debug!(
"[VALIDATION] Invalid: minCardinality not met | predicate: {:?} | count: {} | min: {} | schema: {:?} | changed: {:?}", " - Invalid: minCardinality not met | predicate: {:?} | count: {} | min: {} | schema: {:?} | changed: {:?}",
p_schema.iri, p_schema.iri,
count, count,
p_schema.minCardinality, p_schema.minCardinality,
@ -125,7 +131,7 @@ impl Verifier {
&& p_schema.extra != Some(true) && p_schema.extra != Some(true)
{ {
log_debug!( log_debug!(
"[VALIDATION] Invalid: maxCardinality exceeded | predicate: {:?} | count: {} | max: {} | schema: {:?} | changed: {:?}", " - Invalid: maxCardinality exceeded | predicate: {:?} | count: {} | max: {} | schema: {:?} | changed: {:?}",
p_schema.iri, p_schema.iri,
count, count,
p_schema.maxCardinality, p_schema.maxCardinality,
@ -171,12 +177,13 @@ impl Verifier {
); );
if !some_valid { if !some_valid {
log_debug!( log_debug!(
"[VALIDATION] Invalid: required literals missing | predicate: {:?} | schema: {:?} | changed: {:?}", " - Invalid: required literals missing | predicate: {:?} | schema: {:?} | changed: {:?}",
p_schema.iri, p_schema.iri,
shape.iri, shape.iri,
p_change p_change
); );
set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid);
break;
} }
// Check 3.4) Nested shape correct. // Check 3.4) Nested shape correct.
} else if p_schema } else if p_schema
@ -191,6 +198,7 @@ impl Verifier {
.map(|tc| tc.read().unwrap()) .map(|tc| tc.read().unwrap())
.collect::<Vec<_>>() .collect::<Vec<_>>()
}); });
// First, Count valid, invalid, unknowns, and untracked // First, Count valid, invalid, unknowns, and untracked
let counts = tracked_children.as_ref().map_or((0, 0, 0, 0), |children| { let counts = tracked_children.as_ref().map_or((0, 0, 0, 0), |children| {
children children
@ -213,9 +221,11 @@ impl Verifier {
}) })
}); });
log_debug!(" - checking nested - Counts: {:?}", counts);
if counts.1 > 0 && p_schema.extra != Some(true) { if counts.1 > 0 && p_schema.extra != Some(true) {
log_debug!( log_debug!(
"[VALIDATION] Invalid: nested invalid child | predicate: {:?} | schema: {:?} | changed: {:?}", " - Invalid: nested invalid child | predicate: {:?} | schema: {:?} | changed: {:?}",
p_schema.iri, p_schema.iri,
shape.iri, shape.iri,
p_change p_change
@ -226,7 +236,7 @@ impl Verifier {
break; break;
} else if counts.0 > p_schema.maxCardinality && p_schema.maxCardinality != -1 { } else if counts.0 > p_schema.maxCardinality && p_schema.maxCardinality != -1 {
log_debug!( log_debug!(
"[VALIDATION] Too many valid children: | predicate: {:?} | schema: {:?} | changed: {:?}", " - Invalid: Too many valid children: | predicate: {:?} | schema: {:?} | changed: {:?}",
p_schema.iri, p_schema.iri,
shape.iri, shape.iri,
p_change p_change
@ -236,7 +246,7 @@ impl Verifier {
break; break;
} else if counts.0 + counts.2 + counts.3 < p_schema.minCardinality { } else if counts.0 + counts.2 + counts.3 < p_schema.minCardinality {
log_debug!( log_debug!(
"[VALIDATION] Invalid: not enough nested children | predicate: {:?} | valid_count: {} | min: {} | schema: {:?} | changed: {:?}", " - Invalid: not enough nested children | predicate: {:?} | valid_count: {} | min: {} | schema: {:?} | changed: {:?}",
p_schema.iri, p_schema.iri,
counts.0, counts.0,
p_schema.minCardinality, p_schema.minCardinality,
@ -269,7 +279,7 @@ impl Verifier {
} }
}); });
} else if counts.2 > 0 { } else if counts.2 > 0 {
// If we have pending nested objects, we need to wait for their evaluation. // If we have pending children, we need to wait for their evaluation.
set_validity(&mut new_validity, OrmTrackedSubjectValidity::Pending); set_validity(&mut new_validity, OrmTrackedSubjectValidity::Pending);
// Schedule pending children for re-evaluation without fetch. // Schedule pending children for re-evaluation without fetch.
tracked_children.as_ref().map(|children| { tracked_children.as_ref().map(|children| {
@ -307,7 +317,7 @@ impl Verifier {
}; };
if !matches { if !matches {
log_debug!( log_debug!(
"[VALIDATION] Invalid: value type mismatch | predicate: {:?} | value: {:?} | allowed_types: {:?} | schema: {:?} | changed: {:?}", " - Invalid: value type mismatch | predicate: {:?} | value: {:?} | allowed_types: {:?} | schema: {:?} | changed: {:?}",
p_schema.iri, p_schema.iri,
val_added, val_added,
allowed_types, allowed_types,

@ -212,12 +212,14 @@ impl Verifier {
if inserts.is_empty() && removes.is_empty() { if inserts.is_empty() && removes.is_empty() {
Ok(()) Ok(())
} else { } else {
self.prepare_sparql_update( let _ = self
Vec::from_iter(inserts), .prepare_sparql_update(
Vec::from_iter(removes), Vec::from_iter(inserts),
self.get_peer_id_for_skolem(), Vec::from_iter(removes),
) self.get_peer_id_for_skolem(),
.await?; 0,
)
.await?;
Ok(()) Ok(())
} }
} }
@ -696,7 +698,7 @@ impl Verifier {
); );
let ret = self let ret = self
.process_sparql_update(&store_nuri, &query, &None, vec![]) .process_sparql_update(&store_nuri, &query, &None, vec![], 0)
.await; .await;
if let Err(e) = ret { if let Err(e) = ret {
return Err(NgError::SparqlError(e)); return Err(NgError::SparqlError(e));
@ -712,7 +714,9 @@ impl Verifier {
object: Literal::new_simple_literal(primary_class).into(), object: Literal::new_simple_literal(primary_class).into(),
graph_name: NamedNode::new_unchecked(&header_branch_nuri).into(), graph_name: NamedNode::new_unchecked(&header_branch_nuri).into(),
}; };
let ret = self.prepare_sparql_update(vec![quad], vec![], vec![]).await; let ret = self
.prepare_sparql_update(vec![quad], vec![], vec![], 0)
.await;
if let Err(e) = ret { if let Err(e) = ret {
return Err(NgError::SparqlError(e.to_string())); return Err(NgError::SparqlError(e.to_string()));
} }
@ -814,6 +818,7 @@ impl Verifier {
&sparql_update, &sparql_update,
&Some(contact_doc_nuri_string), &Some(contact_doc_nuri_string),
vec![], vec![],
0,
) )
.await; .await;
if let Err(e) = ret { if let Err(e) = ret {
@ -894,11 +899,18 @@ impl Verifier {
command: &AppRequestCommandV0, command: &AppRequestCommandV0,
nuri: NuriV0, nuri: NuriV0,
payload: Option<AppRequestPayload>, payload: Option<AppRequestPayload>,
session_id: u64,
) -> Result<AppResponse, NgError> { ) -> Result<AppResponse, NgError> {
match command { match command {
AppRequestCommandV0::OrmUpdate => match payload { AppRequestCommandV0::OrmUpdate => match payload {
Some(AppRequestPayload::V0(AppRequestPayloadV0::OrmUpdate((diff, shape_id)))) => { Some(AppRequestPayload::V0(AppRequestPayloadV0::OrmUpdate((diff, shape_id)))) => {
self.orm_frontend_update(&nuri, shape_id, diff).await return match self
.orm_frontend_update(session_id, &nuri, shape_id, diff)
.await
{
Err(e) => Ok(AppResponse::error(e)),
Ok(()) => Ok(AppResponse::ok()),
}
} }
_ => return Err(NgError::InvalidArgument), _ => return Err(NgError::InvalidArgument),
}, },
@ -993,7 +1005,7 @@ impl Verifier {
let social_query_doc_nuri_string = NuriV0::repo_id(query_id); let social_query_doc_nuri_string = NuriV0::repo_id(query_id);
let sparql_update = format!("INSERT DATA {{ <{social_query_doc_nuri_string}> <did:ng:x:ng#social_query_forwarder> <{forwarder_nuri_string}>. }}"); let sparql_update = format!("INSERT DATA {{ <{social_query_doc_nuri_string}> <did:ng:x:ng#social_query_forwarder> <{forwarder_nuri_string}>. }}");
let ret = self let ret = self
.process_sparql_update(&nuri, &sparql_update, &None, vec![]) .process_sparql_update(&nuri, &sparql_update, &None, vec![], 0)
.await; .await;
if let Err(e) = ret { if let Err(e) = ret {
return Err(NgError::SparqlError(e)); return Err(NgError::SparqlError(e));
@ -1008,6 +1020,7 @@ impl Verifier {
&sparql_update, &sparql_update,
&Some(forwarder_nuri_string), &Some(forwarder_nuri_string),
vec![], vec![],
0,
) )
.await; .await;
if let Err(e) = ret { if let Err(e) = ret {
@ -1217,11 +1230,12 @@ impl Verifier {
&sparql, &sparql,
&base, &base,
self.get_peer_id_for_skolem(), self.get_peer_id_for_skolem(),
session_id,
) )
.await .await
{ {
Err(e) => AppResponse::error(e), Err(e) => AppResponse::error(e),
Ok(commits) => AppResponse::commits(commits), Ok((commits, ..)) => AppResponse::commits(commits),
}, },
) )
} else { } else {

@ -2836,7 +2836,10 @@ impl Verifier {
pub async fn app_request(&mut self, req: AppRequest) -> Result<AppResponse, NgError> { pub async fn app_request(&mut self, req: AppRequest) -> Result<AppResponse, NgError> {
match req { match req {
AppRequest::V0(v0) => self.process(&v0.command, v0.nuri, v0.payload).await, AppRequest::V0(v0) => {
self.process(&v0.command, v0.nuri, v0.payload, v0.session_id)
.await
}
} }
} }

@ -15,5 +15,8 @@
"prettier": "^3.6.2", "prettier": "^3.6.2",
"prettier-plugin-svelte": "^3.4.0" "prettier-plugin-svelte": "^3.4.0"
}, },
"engines": {
"node": ">=22.18"
},
"packageManager": "pnpm@10.15.0+sha512.486ebc259d3e999a4e8691ce03b5cac4a71cbeca39372a9b762cb500cfdf0873e2cb16abe3d951b1ee2cf012503f027b98b6584e4df22524e0c7450d9ec7aa7b" "packageManager": "pnpm@10.15.0+sha512.486ebc259d3e999a4e8691ce03b5cac4a71cbeca39372a9b762cb500cfdf0873e2cb16abe3d951b1ee2cf012503f027b98b6584e4df22524e0c7450d9ec7aa7b"
} }

@ -43,5 +43,8 @@
"@types/react-dom": "19.1.7", "@types/react-dom": "19.1.7",
"vite": "7.1.3", "vite": "7.1.3",
"vitest": "^3.2.4" "vitest": "^3.2.4"
},
"engines": {
"node": ">=22.18"
} }
} }

@ -6,21 +6,21 @@ import VueRoot from "../components/VueRoot.vue";
import ReactRoot from "../components/ReactRoot"; import ReactRoot from "../components/ReactRoot";
import SvelteRoot from "../components/SvelteRoot.svelte"; import SvelteRoot from "../components/SvelteRoot.svelte";
// Hack to get mock backend started // Hack to get mock backend started
import { mockTestObject } from "../../ng-mock/wasm-land/shapeHandler"; //import { mockTestObject } from "../../ng-mock/wasm-land/shapeHandler";
const title = "Multi-framework app"; const title = "Multi-framework app";
--- ---
<Layout title={title}> <Layout title={title}>
<Highlight vue> <Highlight vue>
<VueRoot client:only /> <VueRoot client:only />
</Highlight> </Highlight>
<Highlight react> <Highlight react>
<ReactRoot client:only="react" /> <ReactRoot client:only="react" />
</Highlight> </Highlight>
<Highlight svelte> <Highlight svelte>
<SvelteRoot client:only /> <SvelteRoot client:only />
</Highlight> </Highlight>
</Layout> </Layout>

@ -1,6 +1,8 @@
import type { Diff, Scope } from "../types.ts"; import type { Diff, Scope } from "../types.ts";
import { applyDiff } from "./applyDiff.ts"; import { applyDiff } from "./applyDiff.ts";
import ng from "@nextgraph-monorepo/ng-sdk-js";
import { import {
deepSignal, deepSignal,
watch, watch,
@ -111,7 +113,7 @@ const setUpConnection = (entry: PoolEntry<any>, wasmMessage: WasmMessage) => {
}; };
// Handler for messages from wasm land. // Handler for messages from wasm land.
const onWasmMessage = (event: MessageEvent<WasmMessage>) => { const onMessage = (event: MessageEvent<WasmMessage>) => {
console.debug("[JsLand] onWasmMessage", event); console.debug("[JsLand] onWasmMessage", event);
const { diff, connectionId, type } = event.data; const { diff, connectionId, type } = event.data;
@ -121,7 +123,12 @@ const onWasmMessage = (event: MessageEvent<WasmMessage>) => {
// And only process messages that are addressed to js-land. // And only process messages that are addressed to js-land.
if (type === "FrontendUpdate") return; if (type === "FrontendUpdate") return;
if (type === "Request") return; if (type === "Request") {
// TODO: Handle message from wasm land and js land
// in different functions
return;
}
if (type === "Stop") return; if (type === "Stop") return;
if (type === "InitialResponse") { if (type === "InitialResponse") {
@ -137,7 +144,7 @@ const keyToEntry = new Map<string, PoolEntry<any>>();
const connectionIdToEntry = new Map<string, PoolEntry<any>>(); const connectionIdToEntry = new Map<string, PoolEntry<any>>();
const communicationChannel = new BroadcastChannel("shape-manager"); const communicationChannel = new BroadcastChannel("shape-manager");
communicationChannel.addEventListener("message", onWasmMessage); communicationChannel.addEventListener("message", onMessage);
// FinalizationRegistry to clean up connections when signal objects are GC'd. // FinalizationRegistry to clean up connections when signal objects are GC'd.
const cleanupSignalRegistry = const cleanupSignalRegistry =
@ -210,16 +217,11 @@ export function createSignalObjectForShape<T extends BaseType>(
keyToEntry.set(key, entry); keyToEntry.set(key, entry);
connectionIdToEntry.set(entry.connectionId, entry); connectionIdToEntry.set(entry.connectionId, entry);
// TODO: Just a hack since the channel is not set up in mock-mode communicationChannel.postMessage({
setTimeout( type: "Request",
() => connectionId: entry.connectionId,
communicationChannel.postMessage({ shapeType,
type: "Request", } as WasmMessage);
connectionId: entry.connectionId,
shapeType,
} as WasmMessage),
100
);
function buildReturn(entry: PoolEntry<T>) { function buildReturn(entry: PoolEntry<T>) {
return { return {

Loading…
Cancel
Save