Compare commits


No commits in common. 'b6e1a68dbe9b9936b2f58304dadd153728eb4a76' and '058204f8bccd59a3046fb344019da339f551a475' have entirely different histories.

18 changed files, with the number of changed lines per file:

  1. Cargo.lock (25)
  2. nextgraph/Cargo.toml (4)
  3. nextgraph/src/tests/orm.rs (1227)
  4. ng-net/src/app_protocol.rs (3)
  5. ng-net/src/orm.rs (8)
  6. ng-repo/src/errors.rs (1)
  7. ng-verifier/src/commits/transaction.rs (58)
  8. ng-verifier/src/inbox_processor.rs (10)
  9. ng-verifier/src/orm/add_remove_triples.rs (9)
  10. ng-verifier/src/orm/mod.rs (202)
  11. ng-verifier/src/orm/utils.rs (127)
  12. ng-verifier/src/orm/validation.rs (26)
  13. ng-verifier/src/request_processor.rs (36)
  14. ng-verifier/src/verifier.rs (5)
  15. package.json (3)
  16. sdk/ng-sdk-js/examples/multi-framework-signals/package.json (3)
  17. sdk/ng-sdk-js/examples/multi-framework-signals/src/app/pages/index.astro (20)
  18. sdk/ng-sdk-js/ng-signals/src/connector/createSignalObjectForShape.ts (28)

Cargo.lock (generated, 25 changed lines)

@ -596,19 +596,6 @@ dependencies = [
"pkg-config",
]
[[package]]
name = "canonical_json"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f89083fd014d71c47a718d7f4ac050864dac8587668dbe90baf9e261064c5710"
dependencies = [
"hex",
"regex",
"serde",
"serde_json",
"thiserror 1.0.69",
]
[[package]]
name = "cast"
version = "0.3.0"
@ -2169,7 +2156,6 @@ dependencies = [
"async-std",
"async-trait",
"base64-url",
"canonical_json",
"futures",
"lazy_static",
"ng-client-ws",
@ -2185,7 +2171,6 @@ dependencies = [
"serde_bare",
"serde_bytes",
"serde_json",
"serde_json_diff",
"svg2pdf",
"web-time",
"whoami",
@ -3512,16 +3497,6 @@ dependencies = [
"serde_core",
]
[[package]]
name = "serde_json_diff"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac615f2de9556d78ec9d5924abae441d1764f833fbd6db24bb56d2b6b5200ed"
dependencies = [
"serde",
"serde_json",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"

nextgraph/Cargo.toml

@ -45,10 +45,6 @@ ng-client-ws = { path = "../ng-client-ws", version = "0.1.2" }
ng-verifier = { path = "../ng-verifier", version = "0.1.2" }
ng-oxigraph = { path = "../ng-oxigraph", version = "0.4.0-alpha.8-ngalpha" }
[dev-dependencies]
serde_json_diff = "0.2.0"
canonical_json = "0.5.0"
[target.'cfg(all(not(target_family = "wasm"),not(docsrs)))'.dependencies]
ng-storage-rocksdb = { path = "../ng-storage-rocksdb", version = "0.1.2" }

nextgraph/src/tests/orm.rs

File diff suppressed because it is too large.

ng-net/src/app_protocol.rs

@ -20,7 +20,7 @@ use ng_repo::utils::{decode_digest, decode_key, decode_sym_key};
use ng_repo::utils::{decode_overlayid, display_timestamp_local};
use serde_json::Value;
use crate::orm::{OrmDiff, OrmShapeType, OrmUpdateBlankNodeIds};
use crate::orm::{OrmDiff, OrmShapeType};
use crate::types::*;
#[derive(Clone, Debug, Serialize, Deserialize)]
@ -1309,7 +1309,6 @@ pub enum AppResponseV0 {
Commits(Vec<String>),
OrmInitial(Value),
OrmUpdate(OrmDiff),
OrmUpdateBlankNodeIds(OrmUpdateBlankNodeIds),
OrmError(String),
}
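
With the OrmUpdateBlankNodeIds variant removed, a consumer of AppResponseV0 only needs the remaining ORM variants. A minimal sketch, assuming the module path from the file location and using placeholder handler bodies:

    // Sketch only: the import path and the handler bodies are assumptions.
    use ng_net::app_protocol::AppResponseV0;

    fn handle_orm_response(response: AppResponseV0) {
        match response {
            AppResponseV0::OrmInitial(_value) => { /* hand the initial object tree to the frontend */ }
            AppResponseV0::OrmUpdate(_diff) => { /* apply the list of OrmDiffOp entries */ }
            AppResponseV0::OrmError(_e) => { /* surface the error to the caller */ }
            _ => { /* non-ORM variants are unchanged by this diff */ }
        }
    }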

ng-net/src/orm.rs

@ -48,14 +48,6 @@ pub struct OrmDiffOp {
pub type OrmDiff = Vec<OrmDiffOp>;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OrmUpdateBlankNodeId {
pub path: String,
pub nuri: String,
}
pub type OrmUpdateBlankNodeIds = Vec<OrmUpdateBlankNodeId>;
pub type OrmSchema = HashMap<String, Arc<OrmSchemaShape>>;
#[derive(Clone, Debug, Serialize, Deserialize)]

ng-repo/src/errors.rs

@ -400,7 +400,6 @@ pub enum VerifierError {
InvalidOrmSchema,
OrmSubjectNotFound,
OrmPredicateNotFound,
OrmSubscriptionNotFound,
}
impl Error for VerifierError {}

ng-verifier/src/commits/transaction.rs

@ -295,7 +295,7 @@ impl Verifier {
transaction,
commit_info,
};
self.update_graph(vec![info], 0).await?;
self.update_graph(vec![info]).await?;
} else
//TODO: change the logic here. transaction commits can have both a discrete and graph update. Only one AppResponse should be sent in this case, containing both updates.
if body.discrete.is_some() {
@ -393,16 +393,14 @@ impl Verifier {
// TODO: implement TargetBranchV0::Named
_ => unimplemented!(),
};
if is_publisher {
let _ = branches.entry(branch_id).or_insert((
store.get_store_repo().clone(),
repo.id,
branch_type,
topic_id,
token,
store.overlay_id,
));
}
let _ = branches.entry(branch_id).or_insert((
store.get_store_repo().clone(),
repo.id,
branch_type,
topic_id,
token,
store.overlay_id,
));
let _ = nuri_branches.entry(graph_name.clone()).or_insert((
repo.id,
branch_id,
@ -414,26 +412,17 @@ impl Verifier {
}
}
/// returns
/// - list of commit Nuris
/// - optional list of revert_inserts
/// - optional list of revert_removes
/// - optional list of skolemnized_blank_nodes
pub(crate) async fn prepare_sparql_update(
&mut self,
inserts: Vec<Quad>,
removes: Vec<Quad>,
peer_id: Vec<u8>,
session_id: u64,
) -> Result<(Vec<String>, Vec<Quad>, Vec<Quad>, Vec<Quad>), VerifierError> {
) -> Result<Vec<String>, VerifierError> {
// options when not a publisher on the repo:
// - skip
// - TODO: abort (the whole transaction)
// - TODO: inbox (sent to inbox of document for a suggested update)
// for now we just do skip, without giving option to user
let mut revert_inserts: Vec<Quad> = vec![];
let mut revert_removes: Vec<Quad> = vec![];
let mut skolemnized_blank_nodes: Vec<Quad> = vec![];
let mut inserts_map: HashMap<BranchId, HashSet<Triple>> = HashMap::with_capacity(1);
let mut removes_map: HashMap<BranchId, HashSet<Triple>> = HashMap::with_capacity(1);
let mut branches: HashMap<
@ -448,7 +437,6 @@ impl Verifier {
let (repo_id, branch_id, is_publisher) =
self.find_branch_and_repo_for_quad(&insert, &mut branches, &mut nuri_branches)?;
if !is_publisher {
revert_inserts.push(insert);
continue;
}
let set = inserts_map.entry(branch_id).or_insert_with(|| {
@ -474,7 +462,6 @@ impl Verifier {
let iri =
NuriV0::repo_skolem(&repo_id, &peer_id, b.as_ref().unique_id().unwrap())?;
insert.object = Term::NamedNode(NamedNode::new_unchecked(iri));
skolemnized_blank_nodes.push(insert.clone());
}
}
// TODO deal with triples in subject and object (RDF-STAR)
@ -485,7 +472,6 @@ impl Verifier {
let (repo_id, branch_id, is_publisher) =
self.find_branch_and_repo_for_quad(&remove, &mut branches, &mut nuri_branches)?;
if !is_publisher {
revert_removes.push(remove);
continue;
}
let set = removes_map.entry(branch_id).or_insert_with(|| {
@ -545,21 +531,12 @@ impl Verifier {
};
updates.push(info);
}
match self.update_graph(updates, session_id).await {
Ok(commits) => Ok((
commits,
revert_inserts,
revert_removes,
skolemnized_blank_nodes,
)),
Err(e) => Err(e),
}
self.update_graph(updates).await
}
async fn update_graph(
&mut self,
mut updates: Vec<BranchUpdateInfo>,
session_id: u64,
) -> Result<Vec<String>, VerifierError> {
let updates_ref = &mut updates;
let res = self
@ -763,8 +740,7 @@ impl Verifier {
.await;
} else {
let graph_patch = update.transaction.as_patch();
let nuri = NuriV0::commit(&update.repo_id, &update.commit_id);
commit_nuris.push(nuri);
commit_nuris.push(NuriV0::commit(&update.repo_id, &update.commit_id));
self.push_app_response(
&update.branch_id,
AppResponse::V0(AppResponseV0::Patch(AppPatch {
@ -779,9 +755,7 @@ impl Verifier {
let graph_nuri =
NuriV0::repo_graph_name(&update.repo_id, &update.overlay_id);
self.orm_update(
session_id,
update.repo_id.clone(),
update.overlay_id,
&NuriV0::new_empty(),
update.transaction.as_quads_patch(graph_nuri),
)
.await;
@ -799,8 +773,7 @@ impl Verifier {
query: &String,
base: &Option<String>,
peer_id: Vec<u8>,
session_id: u64,
) -> Result<(Vec<String>, Vec<Quad>, Vec<Quad>, Vec<Quad>), String> {
) -> Result<Vec<String>, String> {
let store = self.graph_dataset.as_ref().unwrap();
let update = ng_oxigraph::oxigraph::sparql::Update::parse(query, base.as_deref())
@ -815,13 +788,12 @@ impl Verifier {
Err(e) => Err(e.to_string()),
Ok((inserts, removes)) => {
if inserts.is_empty() && removes.is_empty() {
Ok((vec![], vec![], vec![], vec![]))
Ok(vec![])
} else {
self.prepare_sparql_update(
Vec::from_iter(inserts),
Vec::from_iter(removes),
peer_id,
session_id,
)
.await
.map_err(|e| e.to_string())
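
The hunks above drop the bookkeeping of revert_inserts, revert_removes, and skolemnized_blank_nodes, so prepare_sparql_update and process_sparql_update now return only the list of commit Nuris and no longer take a session_id. The skolemization of blank-node objects itself is kept. A minimal sketch of that retained step, with a made-up IRI scheme standing in for NuriV0::repo_skolem:

    // Sketch only: the IRI format below is an assumption for illustration;
    // the real code derives it with NuriV0::repo_skolem(&repo_id, &peer_id, id).
    use ng_oxigraph::oxrdf::{NamedNode, Quad, Term};

    fn skolemize_object(mut quad: Quad, peer_id_hex: &str) -> Quad {
        let new_object = match &quad.object {
            // Blank nodes created by oxrdf carry a unique numeric id.
            Term::BlankNode(b) => b.as_ref().unique_id().map(|id| {
                Term::NamedNode(NamedNode::new_unchecked(format!(
                    "did:ng:example:{peer_id_hex}:{id:x}"
                )))
            }),
            _ => None,
        };
        if let Some(object) = new_object {
            quad.object = object;
        }
        quad
    }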

ng-verifier/src/inbox_processor.rs

@ -65,7 +65,7 @@ impl Verifier {
<> ng:social_query_from_profile <{from_profile_nuri_string}>.
<> ng:social_query_started \"{}\"^^xsd:dateTime . }}",DateTime::now());
let ret = self
.process_sparql_update(&forwarder_nuri, &sparql_update, &Some(forwarder_nuri_string.clone()), vec![],0)
.process_sparql_update(&forwarder_nuri, &sparql_update, &Some(forwarder_nuri_string.clone()), vec![])
.await;
if let Err(e) = ret {
return Err(VerifierError::SparqlError(e));
@ -78,7 +78,7 @@ impl Verifier {
// adding triples in forwarder doc : ng:social_query_id
let sparql_update = format!("INSERT DATA {{ <{forwarder_nuri_string}> <did:ng:x:ng#{predicate}> \"{}\"^^<http://www.w3.org/2001/XMLSchema#dateTime> . }}",DateTime::now());
let ret = self
.process_sparql_update(forwarder_nuri, &sparql_update, &None, vec![],0)
.process_sparql_update(forwarder_nuri, &sparql_update, &None, vec![])
.await;
if let Err(e) = ret {
return Err(VerifierError::SparqlError(e));
@ -153,7 +153,7 @@ impl Verifier {
<did:ng:_> ng:social_query_forwarded_to_inbox <{to_inbox_nuri}> .
}}");
let ret = self
.process_sparql_update(&forwarder_nuri, &sparql_update, &None, vec![],0)
.process_sparql_update(&forwarder_nuri, &sparql_update, &None, vec![])
.await;
if let Err(e) = ret {
return Err(VerifierError::SparqlError(e));
@ -593,7 +593,7 @@ impl Verifier {
let nuri_ov = NuriV0::repo_graph_name(&response.query_id, &overlay_id);
let graph_name = NamedNode::new_unchecked(&nuri_ov);
let quads = triples.into_iter().map(|t| t.in_graph(graph_name.clone()) ).collect();
let _ = self.prepare_sparql_update(quads, vec![], self.get_peer_id_for_skolem(), 0).await?;
let commits = self.prepare_sparql_update(quads, vec![], self.get_peer_id_for_skolem()).await?;
} else {
@ -665,7 +665,7 @@ impl Verifier {
{has_email} }}", details.name);
let ret = self
.process_sparql_update(&contact_nuri, &sparql_update, &Some(contact_nuri_string), vec![],0)
.process_sparql_update(&contact_nuri, &sparql_update, &Some(contact_nuri_string), vec![])
.await;
if let Err(e) = ret {
return Err(VerifierError::SparqlError(e));

ng-verifier/src/orm/add_remove_triples.rs

@ -57,18 +57,15 @@ pub fn add_remove_triples(
// Process added triples.
// For each triple, check if it matches the shape.
// In parallel, we record the values added and removed (tracked_changes)
log_debug!("Processing # triples: {}", triples_added.len());
for triple in triples_added {
let obj_term = oxrdf_term_to_orm_basic_type(&triple.object);
log_debug!(" - processing triple {triple}");
log_debug!("processing triple {triple}");
for predicate_schema in &shape.predicates {
if predicate_schema.iri != triple.predicate.as_str() {
// Triple does not match predicate.
continue;
}
log_debug!(
" - Matched triple for datatypes {:?}",
predicate_schema.dataTypes
);
// Predicate schema constraint matches this triple.
let tracked_subject_lock =
get_or_create_tracked_subject(subject_iri, &shape, tracked_subjects);
@ -127,7 +124,7 @@ pub fn add_remove_triples(
None
}
}) {
log_debug!(" - dealing with nested type {shape_iri}");
// log_debug!("dealing with nesting for {shape_iri}");
if let BasicType::Str(obj_iri) = &obj_term {
let tracked_child_arc = {
// Get or create object's tracked subject struct.

ng-verifier/src/orm/mod.rs

@ -14,11 +14,6 @@ pub mod validation;
use futures::channel::mpsc;
use futures::channel::mpsc::UnboundedSender;
use ng_net::types::OverlayLink;
use ng_oxigraph::oxrdf::Quad;
use ng_repo::errors::VerifierError;
use ng_repo::types::OverlayId;
use ng_repo::types::RepoId;
use std::collections::HashMap;
use std::collections::HashSet;
@ -80,7 +75,6 @@ impl Verifier {
return Err(NgError::SparqlError(e.to_string()));
}
Ok(triple) => {
log_debug!("Triple fetched: {:?}", triple);
result_triples.push(triple);
}
}
@ -179,13 +173,9 @@ impl Verifier {
HashMap::new();
// For each subject, add/remove triples and validate.
log_debug!(
"processing modified subjects: {:?} against shape: {}",
modified_subject_iris,
shape.iri
);
log_debug!("all_modified_subjects: {:?}", modified_subject_iris);
for subject_iri in &modified_subject_iris {
for subject_iri in modified_subject_iris {
let validation_key = (shape.iri.clone(), subject_iri.to_string());
// Cycle detection: Check if this (shape, subject) pair is already being validated
@ -197,8 +187,7 @@ impl Verifier {
);
// Mark as invalid due to cycle
// TODO: We could handle this by handling nested references as IRIs.
if let Some(tracked_shapes) =
orm_subscription.tracked_subjects.get(*subject_iri)
if let Some(tracked_shapes) = orm_subscription.tracked_subjects.get(subject_iri)
{
if let Some(tracked_subject) = tracked_shapes.get(&shape.iri) {
let mut ts = tracked_subject.write().unwrap();
@ -212,13 +201,12 @@ impl Verifier {
// Mark as currently validating
currently_validating.insert(validation_key.clone());
// Get triples of subject (added & removed).
let triples_added_for_subj = added_triples_by_subject
.get(*subject_iri)
.get(subject_iri)
.map(|v| v.as_slice())
.unwrap_or(&[]);
let triples_removed_for_subj = removed_triples_by_subject
.get(*subject_iri)
.get(subject_iri)
.map(|v| v.as_slice())
.unwrap_or(&[]);
@ -226,9 +214,9 @@ impl Verifier {
let change = orm_changes
.entry(shape.iri.clone())
.or_insert_with(HashMap::new)
.entry((*subject_iri).clone())
.entry(subject_iri.clone())
.or_insert_with(|| OrmTrackedSubjectChange {
subject_iri: (*subject_iri).clone(),
subject_iri: subject_iri.clone(),
predicates: HashMap::new(),
data_applied: false,
});
@ -260,7 +248,7 @@ impl Verifier {
let validity = {
let tracked_subject_opt = orm_subscription
.tracked_subjects
.get(*subject_iri)
.get(subject_iri)
.and_then(|m| m.get(&shape.iri));
let Some(tracked_subject) = tracked_subject_opt else {
continue;
@ -289,8 +277,14 @@ impl Verifier {
}
}
}
// Remove from validation stack after processing this subject
currently_validating.remove(&validation_key);
}
// TODO: Currently, all shape <-> nested subject combinations are queued for re-evaluation.
// Is that okay?
// Now, we queue all non-evaluated objects
for (shape_iri, objects_to_eval) in &nested_objects_to_eval {
let orm_subscription = self.get_first_orm_subscription_for(
@ -339,10 +333,6 @@ impl Verifier {
shape_validation_stack.push((shape_arc, objects_not_to_fetch));
}
}
for subject_iri in modified_subject_iris {
let validation_key = (shape.iri.clone(), subject_iri.to_string());
currently_validating.remove(&validation_key);
}
}
Ok(())
@ -385,37 +375,6 @@ impl Verifier {
}).next().unwrap()
}
pub fn get_first_orm_subscription_sender_for(
&mut self,
nuri: &NuriV0,
shape: Option<&ShapeIri>,
session_id: Option<&u64>,
) -> Result<(UnboundedSender<AppResponse>, &OrmSubscription), VerifierError> {
let subs = self
.orm_subscriptions
.get_mut(nuri)
.unwrap();
subs.retain(|sub| !sub.sender.is_closed());
match
// Filter shapes, if present.
subs.iter()
.filter(|s| match shape {
Some(sh) => *sh == s.shape_type.shape,
None => true, // Filter session ids if present.
})
.filter(|s| match session_id {
Some(id) => *id == s.session_id,
None => true,
})
.next()
{
None => Err(VerifierError::OrmSubscriptionNotFound),
Some(subscription) => {
Ok((subscription.sender.clone(), subscription))
}
}
}
/// Apply triples to a nuri's document.
/// Updates tracked_subjects in orm_subscriptions.
fn apply_triple_changes(
@ -658,115 +617,32 @@ impl Verifier {
return Ok(return_vals);
}
pub(crate) async fn orm_update(
&mut self,
session_id: u64,
repo_id: RepoId,
overlay_id: OverlayId,
patch: GraphQuadsPatch,
) {
let overlaylink: OverlayLink = overlay_id.into();
for (scope, subs) in self.orm_subscriptions.iter_mut() {
subs.retain(|sub| !sub.sender.is_closed());
if scope.entire_store
|| scope.overlay.as_ref().map_or(false, |ol| overlaylink == *ol)
|| scope.target == NuriTargetV0::Repo(repo_id)
{
for sub in subs {
if sub.session_id != session_id { // this is incorrect. we are excluding all the subscriptions from the originating session,
// while we should only exclude the one with exact same shape_type. but we don't have access to that here
// TODO: implement this, generate orm_diff using the patch and the sub.shape_type
let orm_diff: OrmDiff = vec![];
pub(crate) async fn orm_update(&mut self, scope: &NuriV0, patch: GraphQuadsPatch) {}
_ = sub.sender.send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff.to_vec()))).await;
}
}
}
}
}
pub(crate) async fn orm_update_self(
pub(crate) async fn orm_frontend_update(
&mut self,
scope: &NuriV0,
shape_iri: ShapeIri,
session_id: u64,
skolemnized_blank_nodes: Vec<Quad>,
revert_inserts: Vec<Quad>,
revert_removes: Vec<Quad>,
) -> Result<(), VerifierError> {
let (mut sender, orm_subscription) =
self.get_first_orm_subscription_sender_for(scope, Some(&shape_iri), Some(&session_id))?;
// TODO prepare OrmUpdateBlankNodeIds with skolemnized_blank_nodes
// use orm_subscription if needed
// note(niko): I think skolemnized blank nodes can still be many, in case of multi-level nested sub-objects.
let orm_bnids = vec![];
let _ = sender.send(AppResponse::V0(AppResponseV0::OrmUpdateBlankNodeIds(orm_bnids))).await;
// TODO (later) revert the inserts and removes
// let orm_diff = vec![];
// let _ = sender.send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff))).await;
Ok(())
diff: OrmDiff,
) {
log_info!("frontend_update_orm {:?} {} {:?}", scope, shape_iri, diff);
}
pub(crate) async fn orm_frontend_update(
pub(crate) async fn push_orm_response(
&mut self,
nuri: &NuriV0,
session_id: u64,
scope: &NuriV0,
shape_iri: ShapeIri,
diff: OrmDiff,
) -> Result<(), String> {
log_info!(
"frontend_update_orm session={} scope={:?} shape={} diff={:?}",
session_id,
scope,
shape_iri,
diff
);
// find OrmSubscription
let (doc_nuri, sparql_update) = {
let orm_subscription =
self.get_first_orm_subscription_for(scope, Some(&shape_iri), Some(&session_id));
// use orm_subscription as needed
// do the magic, then, find the doc where the query should start and generate the sparql update
let doc_nuri = NuriV0::new_empty();
let sparql_update: String = String::new();
(doc_nuri, sparql_update)
};
sender: UnboundedSender<AppResponse>,
response: AppResponse,
) {
log_debug!("sending orm response for session {}:", session_id);
match self
.process_sparql_update(
&doc_nuri,
&sparql_update,
&None,
self.get_peer_id_for_skolem(),
session_id,
)
.await
{
Err(e) => Err(e),
Ok((_, revert_inserts, revert_removes, skolemnized_blank_nodes)) => {
if !revert_inserts.is_empty()
|| !revert_removes.is_empty()
|| !skolemnized_blank_nodes.is_empty()
{
self.orm_update_self(
scope,
shape_iri,
session_id,
skolemnized_blank_nodes,
revert_inserts,
revert_removes,
)
.await.map_err(|e|e.to_string())?;
}
Ok(())
}
if sender.is_closed() {
log_debug!("closed so removing session {}", session_id);
self.orm_subscriptions.remove(&nuri);
} else {
let _ = sender.clone().send(response).await;
}
}
@ -785,7 +661,7 @@ impl Verifier {
shape_type: &OrmShapeType,
session_id: u64,
) -> Result<(Receiver<AppResponse>, CancelFn), NgError> {
let (mut tx, rx) = mpsc::unbounded::<AppResponse>();
let (tx, rx) = mpsc::unbounded::<AppResponse>();
// TODO: Validate schema:
// If multiple data types are present for the same predicate, they must be of the same type.
@ -805,13 +681,19 @@ impl Verifier {
.or_insert(vec![])
.push(orm_subscription);
let orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type)?;
// log_debug!("create_orm_object_for_shape return {:?}", orm_objects);
let _orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type)?;
// log_debug!("create_orm_object_for_shape return {:?}", _orm_objects);
let _ = tx.send(AppResponse::V0(AppResponseV0::OrmInitial(orm_objects))).await;
self.push_orm_response(
&nuri.clone(),
session_id,
tx.clone(),
AppResponse::V0(AppResponseV0::OrmInitial(_orm_objects)),
)
.await;
let close = Box::new(move || {
log_debug!("closing ORM subscription");
log_debug!("CLOSE_CHANNEL of subscription");
if !tx.is_closed() {
tx.close_channel();
}
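
push_orm_response centralizes the send-or-drop logic introduced here: when the subscriber's channel is already closed, the subscription entry for that Nuri is removed instead of sending. A self-contained sketch of the underlying futures channel pattern, with a plain String standing in for AppResponse:

    // Self-contained sketch; String stands in for AppResponse.
    use futures::channel::mpsc;
    use futures::{SinkExt, StreamExt};

    async fn demo() {
        let (mut tx, mut rx) = mpsc::unbounded::<String>();

        // Verifier side: only push while the subscriber is still listening.
        if !tx.is_closed() {
            let _ = tx.send("OrmInitial(...)".to_string()).await;
        }
        // What the CancelFn returned by the subscription does on teardown.
        tx.close_channel();

        // Subscriber side: drain responses until the channel is closed.
        while let Some(response) = rx.next().await {
            println!("received {response}");
        }
    }

    fn main() {
        futures::executor::block_on(demo());
    }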

ng-verifier/src/orm/utils.rs

@ -80,7 +80,6 @@ pub fn shape_type_to_sparql(
let mut visited_shapes: HashSet<ShapeIri> = HashSet::new();
// Recursive function to call for (nested) shapes.
// Returns nested WHERE statements that should be included with this shape's binding.
fn process_shape(
schema: &OrmSchema,
shape: &OrmSchemaShape,
@ -89,30 +88,27 @@ pub fn shape_type_to_sparql(
where_statements: &mut Vec<String>,
var_counter: &mut i32,
visited_shapes: &mut HashSet<String>,
in_recursion: bool,
) -> Vec<String> {
) {
// Prevent infinite recursion on cyclic schemas.
// TODO: We could handle this as IRI string reference.
if visited_shapes.contains(&shape.iri) {
return vec![];
return;
}
let mut new_where_statements: Vec<String> = vec![];
let mut new_construct_statements: Vec<String> = vec![];
visited_shapes.insert(shape.iri.clone());
// Add statements for each predicate.
// If we are in recursion, we want to get all triples.
// That's why we add a "<subject> ?p ?o" statement afterwards
// and the extra construct statements are skipped.
for predicate in &shape.predicates {
let mut union_branches = Vec::new();
let mut nested_where_statements = Vec::new();
let mut allowed_literals = Vec::new();
// Predicate constraints might have more than one acceptable nested shape. Traverse each.
// Predicate constraints might have more than one acceptable data type. Traverse each.
// It is assumed that constant literals, nested shapes and regular types are not mixed.
for datatype in &predicate.dataTypes {
if datatype.valType == OrmSchemaLiteralType::shape {
if datatype.valType == OrmSchemaLiteralType::literal {
// Collect allowed literals as strings
// (already in SPARQL-format, e.g. `"a astring"`, `<http:ex.co/>`, `true`, or `42`).
allowed_literals.extend(literal_to_sparql_str(datatype.clone()));
} else if datatype.valType == OrmSchemaLiteralType::shape {
let shape_iri = &datatype.shape.clone().unwrap();
let nested_shape = schema.get(shape_iri).unwrap();
@ -121,12 +117,10 @@ pub fn shape_type_to_sparql(
// Each shape option gets its own var.
let obj_var_name = get_new_var_name(var_counter);
if !in_recursion {
new_construct_statements.push(format!(
" ?{} <{}> ?{}",
subject_var_name, predicate.iri, obj_var_name
));
}
construct_statements.push(format!(
" ?{} <{}> ?{}",
subject_var_name, predicate.iri, obj_var_name
));
// Those are later added to a UNION, if there is more than one shape.
union_branches.push(format!(
" ?{} <{}> ?{}",
@ -134,8 +128,7 @@ pub fn shape_type_to_sparql(
));
// Recurse to add statements for nested object.
// Collect nested WHERE statements to include within this predicate's scope.
let nested_stmts = process_shape(
process_shape(
schema,
nested_shape,
&obj_var_name,
@ -143,86 +136,64 @@ pub fn shape_type_to_sparql(
where_statements,
var_counter,
visited_shapes,
true,
);
nested_where_statements.extend(nested_stmts);
}
}
// The where statement (which may be wrapped in OPTIONAL).
// The where statement which might be wrapped in OPTIONAL.
let where_body: String;
if !union_branches.is_empty() {
if !allowed_literals.is_empty()
&& !predicate.extra.unwrap_or(false)
&& predicate.minCardinality > 0
{
// If we have literal requirements and they are not optional ("extra"),
// Add CONSTRUCT, WHERE, and FILTER.
let pred_var_name = get_new_var_name(var_counter);
construct_statements.push(format!(
" ?{} <{}> ?{}",
subject_var_name, predicate.iri, pred_var_name
));
where_body = format!(
" ?{s} <{p}> ?{o} . \n FILTER(?{o} IN ({lits}))",
s = subject_var_name,
p = predicate.iri,
o = pred_var_name,
lits = allowed_literals.join(", ")
);
} else if !union_branches.is_empty() {
// We have nested shape(s) which were already added to CONSTRUCT above.
// Join them with UNION and include nested WHERE statements.
// Join them with UNION.
let union_body = union_branches
where_body = union_branches
.into_iter()
.map(|b| format!("{{\n{}\n}}", b))
.collect::<Vec<_>>()
.join(" UNION ");
// Combine the parent binding with nested statements
if !nested_where_statements.is_empty() {
let nested_joined = nested_where_statements.join(" .\n");
where_body = format!("{} .\n{}", union_body, nested_joined);
} else {
where_body = union_body;
}
} else {
// Regular predicate data type. Just add basic CONSTRUCT and WHERE statements.
let obj_var_name = get_new_var_name(var_counter);
if !in_recursion {
// Only add construct, if we don't have catch-all statement already.
new_construct_statements.push(format!(
" ?{} <{}> ?{}",
subject_var_name, predicate.iri, obj_var_name
));
}
let pred_var_name = get_new_var_name(var_counter);
construct_statements.push(format!(
" ?{} <{}> ?{}",
subject_var_name, predicate.iri, pred_var_name
));
where_body = format!(
" ?{} <{}> ?{}",
subject_var_name, predicate.iri, obj_var_name
subject_var_name, predicate.iri, pred_var_name
);
}
// Wrap in optional, if predicate is optional
// Wrap in optional, if necessary.
if predicate.minCardinality < 1 {
new_where_statements.push(format!(" OPTIONAL {{\n{}\n }}", where_body));
where_statements.push(format!(" OPTIONAL {{\n{}\n }}", where_body));
} else {
new_where_statements.push(where_body);
where_statements.push(where_body);
};
}
if in_recursion {
// All statements in recursive objects need to be optional
// because we want to fetch _all_ nested objects,
// invalid ones too, for later validation.
let pred_var_name = get_new_var_name(var_counter);
let obj_var_name = get_new_var_name(var_counter);
// The "catch any triple in subject" construct statement
construct_statements.push(format!(
" ?{} ?{} ?{}",
subject_var_name, pred_var_name, obj_var_name
));
let joined_where_statements = new_where_statements.join(" .\n");
// Return nested statements to be included in parent's scope
// Combine catch-all with specific predicates in a UNION
let nested_block = format!(
" {{\n {{?{} ?{} ?{}}}\n UNION {{\n {}\n }}\n }}",
subject_var_name, pred_var_name, obj_var_name, joined_where_statements
);
visited_shapes.remove(&shape.iri);
return vec![nested_block];
} else {
where_statements.append(&mut new_where_statements);
construct_statements.append(&mut new_construct_statements);
}
visited_shapes.remove(&shape.iri);
vec![]
}
let root_shape = schema.get(shape).ok_or(VerifierError::InvalidOrmSchema)?;
@ -238,12 +209,11 @@ pub fn shape_type_to_sparql(
&mut where_statements,
&mut var_counter,
&mut visited_shapes,
false,
);
// Filter subjects, if present.
if let Some(subjects) = filter_subjects {
// log_debug!("filter_subjects: {:?}", subjects);
log_debug!("filter_subjects: {:?}", subjects);
let subjects_str = subjects
.iter()
.map(|s| format!("<{}>", s))
@ -262,6 +232,7 @@ pub fn shape_type_to_sparql(
construct_body, where_body
))
}
/// SPARQL literal escape: backslash, quotes, newlines, tabs.
fn escape_literal(lit: &str) -> String {
let mut out = String::with_capacity(lit.len() + 4);
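
For orientation, the query builder above emits a single CONSTRUCT query per shape: required predicates constrained to fixed literals become a triple pattern plus FILTER(... IN ...), predicates with minCardinality < 1 are wrapped in OPTIONAL, nested shapes contribute UNION branches, and every matched predicate is mirrored in the CONSTRUCT template. A rough, hand-written example of such a query for a shape with one literal-constrained required predicate and one optional predicate; the IRIs, variable numbering, and whitespace are illustrative assumptions, not actual output:

    // Illustrative only, not generated by the code in this diff.
    const EXAMPLE_QUERY: &str = r#"
    CONSTRUCT {
      ?v0 <http://example.org/status> ?v1 .
      ?v0 <http://example.org/label> ?v2
    }
    WHERE {
      ?v0 <http://example.org/status> ?v1 .
      FILTER(?v1 IN ("open", "closed"))
      OPTIONAL {
        ?v0 <http://example.org/label> ?v2
      }
    }
    "#;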

ng-verifier/src/orm/validation.rs

@ -36,12 +36,6 @@ impl Verifier {
// Keep track of objects that need to be validated against a shape to fetch and validate.
let mut need_evaluation: Vec<(String, String, bool)> = vec![];
log_debug!(
"[Validation] for shape {} and subject {}",
shape.iri,
s_change.subject_iri
);
// Check 1) Check if this object is untracked and we need to remove children and ourselves.
if previous_validity == OrmTrackedSubjectValidity::Untracked {
// 1.1) Schedule children for deletion
@ -112,7 +106,7 @@ impl Verifier {
// Check 3.1) Cardinality
if count < p_schema.minCardinality {
log_debug!(
" - Invalid: minCardinality not met | predicate: {:?} | count: {} | min: {} | schema: {:?} | changed: {:?}",
"[VALIDATION] Invalid: minCardinality not met | predicate: {:?} | count: {} | min: {} | schema: {:?} | changed: {:?}",
p_schema.iri,
count,
p_schema.minCardinality,
@ -131,7 +125,7 @@ impl Verifier {
&& p_schema.extra != Some(true)
{
log_debug!(
" - Invalid: maxCardinality exceeded | predicate: {:?} | count: {} | max: {} | schema: {:?} | changed: {:?}",
"[VALIDATION] Invalid: maxCardinality exceeded | predicate: {:?} | count: {} | max: {} | schema: {:?} | changed: {:?}",
p_schema.iri,
count,
p_schema.maxCardinality,
@ -177,13 +171,12 @@ impl Verifier {
);
if !some_valid {
log_debug!(
" - Invalid: required literals missing | predicate: {:?} | schema: {:?} | changed: {:?}",
"[VALIDATION] Invalid: required literals missing | predicate: {:?} | schema: {:?} | changed: {:?}",
p_schema.iri,
shape.iri,
p_change
);
set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid);
break;
}
// Check 3.4) Nested shape correct.
} else if p_schema
@ -198,7 +191,6 @@ impl Verifier {
.map(|tc| tc.read().unwrap())
.collect::<Vec<_>>()
});
// First, Count valid, invalid, unknowns, and untracked
let counts = tracked_children.as_ref().map_or((0, 0, 0, 0), |children| {
children
@ -221,11 +213,9 @@ impl Verifier {
})
});
log_debug!(" - checking nested - Counts: {:?}", counts);
if counts.1 > 0 && p_schema.extra != Some(true) {
log_debug!(
" - Invalid: nested invalid child | predicate: {:?} | schema: {:?} | changed: {:?}",
"[VALIDATION] Invalid: nested invalid child | predicate: {:?} | schema: {:?} | changed: {:?}",
p_schema.iri,
shape.iri,
p_change
@ -236,7 +226,7 @@ impl Verifier {
break;
} else if counts.0 > p_schema.maxCardinality && p_schema.maxCardinality != -1 {
log_debug!(
" - Invalid: Too many valid children: | predicate: {:?} | schema: {:?} | changed: {:?}",
"[VALIDATION] Too many valid children: | predicate: {:?} | schema: {:?} | changed: {:?}",
p_schema.iri,
shape.iri,
p_change
@ -246,7 +236,7 @@ impl Verifier {
break;
} else if counts.0 + counts.2 + counts.3 < p_schema.minCardinality {
log_debug!(
" - Invalid: not enough nested children | predicate: {:?} | valid_count: {} | min: {} | schema: {:?} | changed: {:?}",
"[VALIDATION] Invalid: not enough nested children | predicate: {:?} | valid_count: {} | min: {} | schema: {:?} | changed: {:?}",
p_schema.iri,
counts.0,
p_schema.minCardinality,
@ -279,7 +269,7 @@ impl Verifier {
}
});
} else if counts.2 > 0 {
// If we have pending children, we need to wait for their evaluation.
// If we have pending nested objects, we need to wait for their evaluation.
set_validity(&mut new_validity, OrmTrackedSubjectValidity::Pending);
// Schedule pending children for re-evaluation without fetch.
tracked_children.as_ref().map(|children| {
@ -317,7 +307,7 @@ impl Verifier {
};
if !matches {
log_debug!(
" - Invalid: value type mismatch | predicate: {:?} | value: {:?} | allowed_types: {:?} | schema: {:?} | changed: {:?}",
"[VALIDATION] Invalid: value type mismatch | predicate: {:?} | value: {:?} | allowed_types: {:?} | schema: {:?} | changed: {:?}",
p_schema.iri,
val_added,
allowed_types,
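
The log-message changes above all belong to the same validation pass, which checks per-predicate cardinality, required literals, nested-shape validity, and value types. A compact sketch of just the cardinality rule, using a stand-in struct rather than the real predicate schema (the -1 "unbounded" convention and the extra flag tolerating surplus values are taken from this diff):

    // Stand-in sketch, not the real OrmSchemaPredicate.
    struct CardinalityRule {
        min_cardinality: i64,
        max_cardinality: i64, // -1 means unbounded
        extra: bool,          // surplus values are tolerated when true
    }

    fn cardinality_ok(count: i64, rule: &CardinalityRule) -> bool {
        if count < rule.min_cardinality {
            return false; // "minCardinality not met"
        }
        if rule.max_cardinality != -1 && count > rule.max_cardinality && !rule.extra {
            return false; // "maxCardinality exceeded"
        }
        true
    }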

ng-verifier/src/request_processor.rs

@ -212,14 +212,12 @@ impl Verifier {
if inserts.is_empty() && removes.is_empty() {
Ok(())
} else {
let _ = self
.prepare_sparql_update(
Vec::from_iter(inserts),
Vec::from_iter(removes),
self.get_peer_id_for_skolem(),
0,
)
.await?;
self.prepare_sparql_update(
Vec::from_iter(inserts),
Vec::from_iter(removes),
self.get_peer_id_for_skolem(),
)
.await?;
Ok(())
}
}
@ -698,7 +696,7 @@ impl Verifier {
);
let ret = self
.process_sparql_update(&store_nuri, &query, &None, vec![], 0)
.process_sparql_update(&store_nuri, &query, &None, vec![])
.await;
if let Err(e) = ret {
return Err(NgError::SparqlError(e));
@ -714,9 +712,7 @@ impl Verifier {
object: Literal::new_simple_literal(primary_class).into(),
graph_name: NamedNode::new_unchecked(&header_branch_nuri).into(),
};
let ret = self
.prepare_sparql_update(vec![quad], vec![], vec![], 0)
.await;
let ret = self.prepare_sparql_update(vec![quad], vec![], vec![]).await;
if let Err(e) = ret {
return Err(NgError::SparqlError(e.to_string()));
}
@ -818,7 +814,6 @@ impl Verifier {
&sparql_update,
&Some(contact_doc_nuri_string),
vec![],
0,
)
.await;
if let Err(e) = ret {
@ -899,18 +894,11 @@ impl Verifier {
command: &AppRequestCommandV0,
nuri: NuriV0,
payload: Option<AppRequestPayload>,
session_id: u64,
) -> Result<AppResponse, NgError> {
match command {
AppRequestCommandV0::OrmUpdate => match payload {
Some(AppRequestPayload::V0(AppRequestPayloadV0::OrmUpdate((diff, shape_id)))) => {
return match self
.orm_frontend_update(session_id, &nuri, shape_id, diff)
.await
{
Err(e) => Ok(AppResponse::error(e)),
Ok(()) => Ok(AppResponse::ok()),
}
self.orm_frontend_update(&nuri, shape_id, diff).await
}
_ => return Err(NgError::InvalidArgument),
},
@ -1005,7 +993,7 @@ impl Verifier {
let social_query_doc_nuri_string = NuriV0::repo_id(query_id);
let sparql_update = format!("INSERT DATA {{ <{social_query_doc_nuri_string}> <did:ng:x:ng#social_query_forwarder> <{forwarder_nuri_string}>. }}");
let ret = self
.process_sparql_update(&nuri, &sparql_update, &None, vec![], 0)
.process_sparql_update(&nuri, &sparql_update, &None, vec![])
.await;
if let Err(e) = ret {
return Err(NgError::SparqlError(e));
@ -1020,7 +1008,6 @@ impl Verifier {
&sparql_update,
&Some(forwarder_nuri_string),
vec![],
0,
)
.await;
if let Err(e) = ret {
@ -1230,12 +1217,11 @@ impl Verifier {
&sparql,
&base,
self.get_peer_id_for_skolem(),
session_id,
)
.await
{
Err(e) => AppResponse::error(e),
Ok((commits, ..)) => AppResponse::commits(commits),
Ok(commits) => AppResponse::commits(commits),
},
)
} else {

ng-verifier/src/verifier.rs

@ -2836,10 +2836,7 @@ impl Verifier {
pub async fn app_request(&mut self, req: AppRequest) -> Result<AppResponse, NgError> {
match req {
AppRequest::V0(v0) => {
self.process(&v0.command, v0.nuri, v0.payload, v0.session_id)
.await
}
AppRequest::V0(v0) => self.process(&v0.command, v0.nuri, v0.payload).await,
}
}

package.json

@ -15,8 +15,5 @@
"prettier": "^3.6.2",
"prettier-plugin-svelte": "^3.4.0"
},
"engines": {
"node": ">=22.18"
},
"packageManager": "pnpm@10.15.0+sha512.486ebc259d3e999a4e8691ce03b5cac4a71cbeca39372a9b762cb500cfdf0873e2cb16abe3d951b1ee2cf012503f027b98b6584e4df22524e0c7450d9ec7aa7b"
}

sdk/ng-sdk-js/examples/multi-framework-signals/package.json

@ -43,8 +43,5 @@
"@types/react-dom": "19.1.7",
"vite": "7.1.3",
"vitest": "^3.2.4"
},
"engines": {
"node": ">=22.18"
}
}

sdk/ng-sdk-js/examples/multi-framework-signals/src/app/pages/index.astro

@ -6,21 +6,21 @@ import VueRoot from "../components/VueRoot.vue";
import ReactRoot from "../components/ReactRoot";
import SvelteRoot from "../components/SvelteRoot.svelte";
// Hack to get mock backend started
//import { mockTestObject } from "../../ng-mock/wasm-land/shapeHandler";
import { mockTestObject } from "../../ng-mock/wasm-land/shapeHandler";
const title = "Multi-framework app";
---
<Layout title={title}>
<Highlight vue>
<VueRoot client:only />
</Highlight>
<Highlight vue>
<VueRoot client:only />
</Highlight>
<Highlight react>
<ReactRoot client:only="react" />
</Highlight>
<Highlight react>
<ReactRoot client:only="react" />
</Highlight>
<Highlight svelte>
<SvelteRoot client:only />
</Highlight>
<Highlight svelte>
<SvelteRoot client:only />
</Highlight>
</Layout>

sdk/ng-sdk-js/ng-signals/src/connector/createSignalObjectForShape.ts

@ -1,8 +1,6 @@
import type { Diff, Scope } from "../types.ts";
import { applyDiff } from "./applyDiff.ts";
import ng from "@nextgraph-monorepo/ng-sdk-js";
import {
deepSignal,
watch,
@ -113,7 +111,7 @@ const setUpConnection = (entry: PoolEntry<any>, wasmMessage: WasmMessage) => {
};
// Handler for messages from wasm land.
const onMessage = (event: MessageEvent<WasmMessage>) => {
const onWasmMessage = (event: MessageEvent<WasmMessage>) => {
console.debug("[JsLand] onWasmMessage", event);
const { diff, connectionId, type } = event.data;
@ -123,12 +121,7 @@ const onMessage = (event: MessageEvent<WasmMessage>) => {
// And only process messages that are addressed to js-land.
if (type === "FrontendUpdate") return;
if (type === "Request") {
// TODO: Handle message from wasm land and js land
// in different functions
return;
}
if (type === "Request") return;
if (type === "Stop") return;
if (type === "InitialResponse") {
@ -144,7 +137,7 @@ const keyToEntry = new Map<string, PoolEntry<any>>();
const connectionIdToEntry = new Map<string, PoolEntry<any>>();
const communicationChannel = new BroadcastChannel("shape-manager");
communicationChannel.addEventListener("message", onMessage);
communicationChannel.addEventListener("message", onWasmMessage);
// FinalizationRegistry to clean up connections when signal objects are GC'd.
const cleanupSignalRegistry =
@ -217,11 +210,16 @@ export function createSignalObjectForShape<T extends BaseType>(
keyToEntry.set(key, entry);
connectionIdToEntry.set(entry.connectionId, entry);
communicationChannel.postMessage({
type: "Request",
connectionId: entry.connectionId,
shapeType,
} as WasmMessage);
// TODO: Just a hack since the channel is not set up in mock-mode
setTimeout(
() =>
communicationChannel.postMessage({
type: "Request",
connectionId: entry.connectionId,
shapeType,
} as WasmMessage),
100
);
function buildReturn(entry: PoolEntry<T>) {
return {
