From 32c17eb543cf71d6bd4977da3b1a2aeefe56af9b Mon Sep 17 00:00:00 2001 From: Laurin Weger Date: Sat, 11 Oct 2025 23:27:59 +0200 Subject: [PATCH 1/7] wip and more tests --- nextgraph/src/tests/orm.rs | 304 +++++++++++++++++- ng-verifier/src/orm/utils.rs | 83 ++--- package.json | 3 + .../multi-framework-signals/package.json | 3 + 4 files changed, 342 insertions(+), 51 deletions(-) diff --git a/nextgraph/src/tests/orm.rs b/nextgraph/src/tests/orm.rs index b829893..004051a 100644 --- a/nextgraph/src/tests/orm.rs +++ b/nextgraph/src/tests/orm.rs @@ -330,6 +330,29 @@ INSERT DATA { async fn test_orm_creation() { // Setup wallet and document let (_wallet, session_id) = create_or_open_wallet().await; + + // Tests below all in this test, to prevent waiting times through wallet creation. + + // === + test_orm_big_object(session_id).await; + + // === + test_orm_root_array(session_id).await; + + // === + test_orm_with_optional(session_id).await; + + // === + test_orm_literal(session_id).await; + + // === + test_orm_multi_type(session_id).await; + + // === + test_orm_nested(session_id).await; +} + +async fn test_orm_big_object(session_id: u64) { let doc_nuri = create_doc_with_data( session_id, r#" @@ -413,16 +436,6 @@ INSERT DATA { } } cancel_fn(); - // - - // === - test_orm_root_array(session_id).await; - - // === - test_orm_with_optional(session_id).await; - - // === - test_orm_literal(session_id).await; } async fn test_orm_root_array(session_id: u64) { @@ -655,9 +668,6 @@ INSERT DATA { .into(), ); - // TODO ======= - // obj3 valid even though it should not. - let shape_type = OrmShapeType { schema, shape: "http://example.org/OptionShape".to_string(), @@ -683,6 +693,274 @@ INSERT DATA { } cancel_fn(); } + +async fn test_orm_multi_type(session_id: u64) { + let doc_nuri = create_doc_with_data( + session_id, + r#" +PREFIX ex: +INSERT DATA { + + ex:strOrNum "a string" ; + ex:strOrNum "another string" ; + ex:strOrNum 2 . + + # Invalid because false is not string or number. + + ex:strOrNum "a string2" ; + ex:strOrNum 2 ; + ex:strOrNum false . +} +"# + .to_string(), + ) + .await; + + // Define the ORM schema + let mut schema = HashMap::new(); + schema.insert( + "http://example.org/MultiTypeShape".to_string(), + OrmSchemaShape { + iri: "http://example.org/MultiTypeShape".to_string(), + predicates: vec![OrmSchemaPredicate { + iri: "http://example.org/strOrNum".to_string(), + extra: Some(true), + maxCardinality: -1, + minCardinality: 1, + readablePredicate: "strOrNum".to_string(), + dataTypes: vec![ + OrmSchemaDataType { + valType: OrmSchemaLiteralType::string, + literals: None, + shape: None, + }, + OrmSchemaDataType { + valType: OrmSchemaLiteralType::number, + literals: None, + shape: None, + }, + ], + } + .into()], + } + .into(), + ); + + let shape_type = OrmShapeType { + schema, + shape: "http://example.org/MultiTypeShape".to_string(), + }; + + let nuri = NuriV0::new_from(&doc_nuri).expect("parse nuri"); + let (mut receiver, cancel_fn) = orm_start(nuri, shape_type, session_id) + .await + .expect("orm_start"); + + while let Some(app_response) = receiver.next().await { + let orm_json = match app_response { + AppResponse::V0(v) => match v { + AppResponseV0::OrmInitial(json) => Some(json), + _ => None, + }, + } + .unwrap(); + + log_info!("ORM JSON arrived for multi type test\n: {:?}", orm_json); + + break; + } + cancel_fn(); +} + +async fn test_orm_nested(session_id: u64) { + let doc_nuri = create_doc_with_data( + session_id, + r#" +PREFIX ex: +INSERT DATA { + # Valid + + ex:str "obj1 str" ; + ex:nestedWithExtra [ + ex:nestedStr "obj1 nested with extra valid" ; + ex:nestedNum 2 + ] , [ + # Invalid, nestedNum is missing but okay because extra. + ex:nestedStr "obj1 nested with extra invalid" + ] ; + ex:nestedWithoutExtra [ + ex:nestedStr "obj1 nested without extra valid" ; + ex:nestedNum 2 + ] . + + # Invalid because nestedWithoutExtra has an invalid child. + + ex:str "obj2 str" ; + ex:nestedWithExtra [ + ex:nestedStr "obj2: a nested string valid" ; + ex:nestedNum 2 + ] ; + ex:nestedWithoutExtra [ + ex:nestedStr "obj2 nested without extra valid" ; + ex:nestedNum 2 + ] , + # Invalid because nestedNum is missing. + [ + ex:nestedStr "obj2 nested without extra invalid" + ] . +} +"# + .to_string(), + ) + .await; + + // Define the ORM schema + let mut schema = HashMap::new(); + schema.insert( + "http://example.org/RootShape".to_string(), + OrmSchemaShape { + iri: "http://example.org/RootShape".to_string(), + predicates: vec![ + OrmSchemaPredicate { + iri: "http://example.org/str".to_string(), + extra: None, + maxCardinality: 1, + minCardinality: 1, + readablePredicate: "str".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::string, + literals: None, + shape: None, + }], + } + .into(), + OrmSchemaPredicate { + iri: "http://example.org/nestedWithExtra".to_string(), + extra: Some(true), + maxCardinality: 1, + minCardinality: 1, + readablePredicate: "nestedWithExtra".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::shape, + literals: None, + shape: Some("http://example.org/NestedShapeWithExtra".to_string()), + }], + } + .into(), + OrmSchemaPredicate { + iri: "http://example.org/nestedWithoutExtra".to_string(), + extra: Some(false), + maxCardinality: 1, + minCardinality: 1, + readablePredicate: "nestedWithoutExtra".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::shape, + literals: None, + shape: Some("http://example.org/NestedShapeWithoutExtra".to_string()), + }], + } + .into(), + ], + } + .into(), + ); + schema.insert( + "http://example.org/NestedShapeWithExtra".to_string(), + OrmSchemaShape { + iri: "http://example.org/NestedShapeWithExtra".to_string(), + predicates: vec![ + OrmSchemaPredicate { + iri: "http://example.org/nestedStr".to_string(), + extra: None, + readablePredicate: "nestedStr".to_string(), + maxCardinality: 1, + minCardinality: 1, + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::string, + literals: None, + shape: None, + }], + } + .into(), + OrmSchemaPredicate { + iri: "http://example.org/nestedNum".to_string(), + extra: None, + readablePredicate: "nestedNum".to_string(), + maxCardinality: 1, + minCardinality: 1, + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::number, + literals: None, + shape: None, + }], + } + .into(), + ], + } + .into(), + ); + schema.insert( + "http://example.org/NestedShapeWithoutExtra".to_string(), + OrmSchemaShape { + iri: "http://example.org/NestedShapeWithoutExtra".to_string(), + predicates: vec![ + OrmSchemaPredicate { + iri: "http://example.org/nestedStr".to_string(), + extra: None, + readablePredicate: "nestedStr".to_string(), + maxCardinality: 1, + minCardinality: 1, + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::string, + literals: None, + shape: None, + }], + } + .into(), + OrmSchemaPredicate { + iri: "http://example.org/nestedNum".to_string(), + extra: None, + readablePredicate: "nestedNum".to_string(), + maxCardinality: 1, + minCardinality: 1, + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::number, + literals: None, + shape: None, + }], + } + .into(), + ], + } + .into(), + ); + + let shape_type = OrmShapeType { + schema, + shape: "http://example.org/RootShape".to_string(), + }; + + let nuri = NuriV0::new_from(&doc_nuri).expect("parse nuri"); + let (mut receiver, cancel_fn) = orm_start(nuri, shape_type, session_id) + .await + .expect("orm_start"); + + while let Some(app_response) = receiver.next().await { + let orm_json = match app_response { + AppResponse::V0(v) => match v { + AppResponseV0::OrmInitial(json) => Some(json), + _ => None, + }, + } + .unwrap(); + + log_info!("ORM JSON arrived for nested test\n: {:?}", orm_json); + + break; + } + cancel_fn(); +} + // // Helpers fn create_big_schema() -> OrmSchema { diff --git a/ng-verifier/src/orm/utils.rs b/ng-verifier/src/orm/utils.rs index 54aad45..3a1662f 100644 --- a/ng-verifier/src/orm/utils.rs +++ b/ng-verifier/src/orm/utils.rs @@ -88,27 +88,26 @@ pub fn shape_type_to_sparql( where_statements: &mut Vec, var_counter: &mut i32, visited_shapes: &mut HashSet, + in_recursion: bool, ) { // Prevent infinite recursion on cyclic schemas. // TODO: We could handle this as IRI string reference. if visited_shapes.contains(&shape.iri) { return; } + + let mut new_where_statements: Vec = vec![]; + let mut new_construct_statements: Vec = vec![]; + visited_shapes.insert(shape.iri.clone()); // Add statements for each predicate. for predicate in &shape.predicates { let mut union_branches = Vec::new(); - let mut allowed_literals = Vec::new(); - // Predicate constraints might have more than one acceptable data type. Traverse each. - // It is assumed that constant literals, nested shapes and regular types are not mixed. + // Predicate constraints might have more than one acceptable nested shape. Traverse each. for datatype in &predicate.dataTypes { - if datatype.valType == OrmSchemaLiteralType::literal { - // Collect allowed literals and as strings - // (already in SPARQL-format, e.g. `"a astring"`, ``, `true`, or `42`). - allowed_literals.extend(literal_to_sparql_str(datatype.clone())); - } else if datatype.valType == OrmSchemaLiteralType::shape { + if datatype.valType == OrmSchemaLiteralType::shape { let shape_iri = &datatype.shape.clone().unwrap(); let nested_shape = schema.get(shape_iri).unwrap(); @@ -117,7 +116,7 @@ pub fn shape_type_to_sparql( // Each shape option gets its own var. let obj_var_name = get_new_var_name(var_counter); - construct_statements.push(format!( + new_construct_statements.push(format!( " ?{} <{}> ?{}", subject_var_name, predicate.iri, obj_var_name )); @@ -136,33 +135,15 @@ pub fn shape_type_to_sparql( where_statements, var_counter, visited_shapes, + true, ); } } - // The where statement which might be wrapped in OPTIONAL. + // The where statement (which may be wrapped in OPTIONAL). let where_body: String; - if !allowed_literals.is_empty() - && !predicate.extra.unwrap_or(false) - && predicate.minCardinality > 0 - { - // If we have literal requirements and they are not optional ("extra"), - // Add CONSTRUCT, WHERE, and FILTER. - - let pred_var_name = get_new_var_name(var_counter); - construct_statements.push(format!( - " ?{} <{}> ?{}", - subject_var_name, predicate.iri, pred_var_name - )); - where_body = format!( - " ?{s} <{p}> ?{o} . \n FILTER(?{o} IN ({lits}))", - s = subject_var_name, - p = predicate.iri, - o = pred_var_name, - lits = allowed_literals.join(", ") - ); - } else if !union_branches.is_empty() { + if !union_branches.is_empty() { // We have nested shape(s) which were already added to CONSTRUCT above. // Join them with UNION. @@ -174,25 +155,50 @@ pub fn shape_type_to_sparql( } else { // Regular predicate data type. Just add basic CONSTRUCT and WHERE statements. - let pred_var_name = get_new_var_name(var_counter); - construct_statements.push(format!( + let obj_var_name = get_new_var_name(var_counter); + new_construct_statements.push(format!( " ?{} <{}> ?{}", - subject_var_name, predicate.iri, pred_var_name + subject_var_name, predicate.iri, obj_var_name )); where_body = format!( " ?{} <{}> ?{}", - subject_var_name, predicate.iri, pred_var_name + subject_var_name, predicate.iri, obj_var_name ); } - // Wrap in optional, if necessary. + // Wrap in optional, if predicate is optional if predicate.minCardinality < 1 { - where_statements.push(format!(" OPTIONAL {{\n{}\n }}", where_body)); + new_where_statements.push(format!(" OPTIONAL {{\n{}\n }}", where_body)); } else { - where_statements.push(where_body); + new_where_statements.push(where_body); }; } + if in_recursion { + // All statements in recursive objects need to be optional + // because we want to fetch _all_ nested objects, + // invalid ones too, for later validation. + let pred_var_name = get_new_var_name(var_counter); + let obj_var_name = get_new_var_name(var_counter); + + // The "catch any triple in subject" where statement + construct_statements.push(format!( + " ?{} ?{} ?{}", + subject_var_name, pred_var_name, obj_var_name + )); + + let joined_where_statements = new_where_statements.join(" .\n"); + + // We do a join of the where statements (which will take care of querying further nested objects) + // and the "catch any triple in subject" where statement. + where_statements.push(format!( + " {{?{} ?{} ?{}}}\n UNION {{\n {}\n }}", + subject_var_name, pred_var_name, obj_var_name, joined_where_statements + )); + } else { + where_statements.append(&mut new_where_statements); + construct_statements.append(&mut new_construct_statements); + } visited_shapes.remove(&shape.iri); } @@ -209,11 +215,12 @@ pub fn shape_type_to_sparql( &mut where_statements, &mut var_counter, &mut visited_shapes, + false, ); // Filter subjects, if present. if let Some(subjects) = filter_subjects { - log_debug!("filter_subjects: {:?}", subjects); + // log_debug!("filter_subjects: {:?}", subjects); let subjects_str = subjects .iter() .map(|s| format!("<{}>", s)) diff --git a/package.json b/package.json index cd0f902..becc57e 100644 --- a/package.json +++ b/package.json @@ -15,5 +15,8 @@ "prettier": "^3.6.2", "prettier-plugin-svelte": "^3.4.0" }, + "engines": { + "node": ">=22.18" + }, "packageManager": "pnpm@10.15.0+sha512.486ebc259d3e999a4e8691ce03b5cac4a71cbeca39372a9b762cb500cfdf0873e2cb16abe3d951b1ee2cf012503f027b98b6584e4df22524e0c7450d9ec7aa7b" } diff --git a/sdk/ng-sdk-js/examples/multi-framework-signals/package.json b/sdk/ng-sdk-js/examples/multi-framework-signals/package.json index 3feb6d9..84b3371 100644 --- a/sdk/ng-sdk-js/examples/multi-framework-signals/package.json +++ b/sdk/ng-sdk-js/examples/multi-framework-signals/package.json @@ -43,5 +43,8 @@ "@types/react-dom": "19.1.7", "vite": "7.1.3", "vitest": "^3.2.4" + }, + "engines": { + "node": ">=22.18" } } From a48e0b00c8af16009c9ec42ec59fa37cc20facf7 Mon Sep 17 00:00:00 2001 From: Laurin Weger Date: Sun, 12 Oct 2025 21:10:56 +0200 Subject: [PATCH 2/7] orm creation: two tests still failing --- Cargo.lock | 25 + nextgraph/Cargo.toml | 4 + nextgraph/src/tests/orm.rs | 793 ++++++++++++++++-- ng-verifier/src/orm/add_remove_triples.rs | 9 +- ng-verifier/src/orm/mod.rs | 33 +- ng-verifier/src/orm/utils.rs | 62 +- ng-verifier/src/orm/validation.rs | 26 +- .../connector/createSignalObjectForShape.ts | 28 +- 8 files changed, 857 insertions(+), 123 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 524c76a..adf197b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -596,6 +596,19 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "canonical_json" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f89083fd014d71c47a718d7f4ac050864dac8587668dbe90baf9e261064c5710" +dependencies = [ + "hex", + "regex", + "serde", + "serde_json", + "thiserror 1.0.69", +] + [[package]] name = "cast" version = "0.3.0" @@ -2156,6 +2169,7 @@ dependencies = [ "async-std", "async-trait", "base64-url", + "canonical_json", "futures", "lazy_static", "ng-client-ws", @@ -2171,6 +2185,7 @@ dependencies = [ "serde_bare", "serde_bytes", "serde_json", + "serde_json_diff", "svg2pdf", "web-time", "whoami", @@ -3497,6 +3512,16 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_json_diff" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac615f2de9556d78ec9d5924abae441d1764f833fbd6db24bb56d2b6b5200ed" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" diff --git a/nextgraph/Cargo.toml b/nextgraph/Cargo.toml index 1b30f3a..96e0eac 100644 --- a/nextgraph/Cargo.toml +++ b/nextgraph/Cargo.toml @@ -45,6 +45,10 @@ ng-client-ws = { path = "../ng-client-ws", version = "0.1.2" } ng-verifier = { path = "../ng-verifier", version = "0.1.2" } ng-oxigraph = { path = "../ng-oxigraph", version = "0.4.0-alpha.8-ngalpha" } +[dev-dependencies] +serde_json_diff = "0.2.0" +canonical_json = "0.5.0" + [target.'cfg(all(not(target_family = "wasm"),not(docsrs)))'.dependencies] ng-storage-rocksdb = { path = "../ng-storage-rocksdb", version = "0.1.2" } diff --git a/nextgraph/src/tests/orm.rs b/nextgraph/src/tests/orm.rs index 004051a..8881fbd 100644 --- a/nextgraph/src/tests/orm.rs +++ b/nextgraph/src/tests/orm.rs @@ -12,12 +12,14 @@ use crate::tests::create_or_open_wallet::create_or_open_wallet; use async_std::stream::StreamExt; use ng_net::app_protocol::{AppResponse, AppResponseV0, NuriV0}; use ng_net::orm::{ - BasicType, OrmSchema, OrmSchemaDataType, OrmSchemaLiteralType, OrmSchemaPredicate, + self, BasicType, OrmSchema, OrmSchemaDataType, OrmSchemaLiteralType, OrmSchemaPredicate, OrmSchemaShape, OrmShapeType, }; use ng_verifier::orm::utils::shape_type_to_sparql; -use ng_repo::log_info; +use ng_repo::{log_err, log_info}; +use serde_json::json; +use serde_json::Value; use std::collections::HashMap; use std::sync::Arc; @@ -349,7 +351,16 @@ async fn test_orm_creation() { test_orm_multi_type(session_id).await; // === - test_orm_nested(session_id).await; + test_orm_nested_1(session_id).await; + + // // === + // test_orm_nested_2(session_id).await; + + // // === + // test_orm_nested_3(session_id).await; + + // === + test_orm_nested_4(session_id).await; } async fn test_orm_big_object(session_id: u64) { @@ -363,45 +374,50 @@ INSERT DATA { ex:numValue 42 ; ex:boolValue true ; ex:arrayValue 1,2,3 ; - ex:objectValue [ - ex:nestedString "nested" ; - ex:nestedNum 7 ; - ex:nestedArray 5,6 - ] ; - ex:anotherObject [ - ex:prop1 "one" ; - ex:prop2 1 - ], [ - ex:prop1 "two" ; - ex:prop2 2 - ] ; + ex:objectValue ; + ex:anotherObject , ; ex:numOrStr "either" ; ex:lit1Or2 "lit1" ; ex:unrelated "some value" ; ex:anotherUnrelated 4242 . + + ex:nestedString "nested" ; + ex:nestedNum 7 ; + ex:nestedArray 5,6 . + + + ex:prop1 "one" ; + ex:prop2 1 . + + + ex:prop1 "two" ; + ex:prop2 2 . a ex:TestObject ; ex:stringValue "hello world #2" ; ex:numValue 422 ; ex:boolValue false ; ex:arrayValue 4,5,6 ; - ex:objectValue [ - ex:nestedString "nested2" ; - ex:nestedNum 72 ; - ex:nestedArray 7,8,9 - ] ; - ex:anotherObject [ - ex:prop1 "one2" ; - ex:prop2 12 - ], [ - ex:prop1 "two2" ; - ex:prop2 22 - ] ; + ex:objectValue ; + ex:anotherObject , ; ex:numOrStr 4 ; ex:lit1Or2 "lit2" ; ex:unrelated "some value2" ; ex:anotherUnrelated 42422 . + + + ex:nestedString "nested2" ; + ex:nestedNum 72 ; + ex:nestedArray 7,8,9 . + + + ex:prop1 "one2" ; + ex:prop2 12 . + + + ex:prop1 "two2" ; + ex:prop2 22 . } "# .to_string(), @@ -416,24 +432,77 @@ INSERT DATA { shape: "http://example.org/TestObject".to_string(), }; - log_info!("starting orm test"); let nuri = NuriV0::new_from(&doc_nuri).expect("parse nuri"); let (mut receiver, cancel_fn) = orm_start(nuri, shape_type, session_id) .await .expect("orm_start"); - log_info!("orm_start call ended"); - while let Some(app_response) = receiver.next().await { - match app_response { + let orm_json = match app_response { AppResponse::V0(v) => match v { - AppResponseV0::OrmInitial(json) => { - log_info!("ORM JSON arrived\n: {:?}", json); - break; - } - _ => (), + AppResponseV0::OrmInitial(json) => Some(json), + _ => None, }, } + .unwrap(); + + let mut expected = json!([{ + "type":"http://example.org/TestObject", + "id":"urn:test:obj1", + "anotherObject":{ + "urn:test:id1":{ + "prop1":"one", + "prop2":1.0 + }, + "urn:test:id2":{ + "prop1":"two", + "prop2":2.0 + } + }, + "arrayValue":[3.0,2.0,1.0], + "boolValue":true, + "lit1Or2":"lit1", + "numOrStr":"either", + "numValue":42.0, + "objectValue":{ + "id":"urn:test:id3", + "nestedArray":[5.0,6.0], + "nestedNum":7.0, + "nestedString":"nested" + }, + "stringValue": "hello world", + }, + { + "id":"urn:test:obj2", + "type":"http://example.org/TestObject", + "anotherObject": { + "urn:test:id4":{ + "prop1":"one2", + "prop2":12.0 + }, + "urn:test:id5":{ + "prop1":"two2", + "prop2":22.0 + } + }, + "arrayValue":[6.0,5.0,4.0], + "boolValue":false, + "lit1Or2":"lit2", + "numOrStr":4.0, + "numValue":422.0, + "objectValue":{ + "id":"urn:test:id6", + "nestedArray": [7.0,8.0,9.0], + "nestedNum":72.0, + "nestedString":"nested2" + }, + "stringValue":"hello world #2", + }]); + + let mut actual_mut = orm_json.clone(); + assert_json_eq(&mut expected, &mut actual_mut); + + break; } cancel_fn(); } @@ -531,7 +600,26 @@ INSERT DATA { } .unwrap(); - log_info!("ORM JSON arrived\n: {:?}", orm_json); + let mut expected = json!([ + { + "id": "urn:test:numArrayObj1", + "type": "http://example.org/TestObject", + "numArray": [1.0, 2.0, 3.0] + }, + { + "id": "urn:test:numArrayObj2", + "type": "http://example.org/TestObject", + "numArray": [] + }, + { + "id": "urn:test:numArrayObj3", + "type": "http://example.org/TestObject", + "numArray": [1.0, 2.0] + } + ]); + + let mut actual_mut = orm_json.clone(); + assert_json_eq(&mut expected, &mut actual_mut); break; } @@ -548,6 +636,7 @@ INSERT DATA { ex:opt true ; ex:str "s1" . + # Contains no matching data ex:str "s2" . } @@ -598,7 +687,15 @@ INSERT DATA { } .unwrap(); - log_info!("ORM JSON arrived for optional test\n: {:?}", orm_json); + let mut expected = json!([ + { + "id": "urn:test:oj1", + "opt": true + } + ]); + + let mut actual_mut = orm_json.clone(); + assert_json_eq(&mut expected, &mut actual_mut); break; } @@ -687,7 +784,21 @@ INSERT DATA { } .unwrap(); - log_info!("ORM JSON arrived for literal test\n: {:?}", orm_json); + let mut expected = json!([ + { + "id": "urn:test:oj1", + "lit1": ["lit 1"], + "lit2": "lit 2" + }, + { + "id": "urn:test:obj2", + "lit1": ["lit 1", "lit 1 extra"], + "lit2": "lit 2" + } + ]); + + let mut actual_mut = orm_json.clone(); + assert_json_eq(&mut expected, &mut actual_mut); break; } @@ -765,14 +876,22 @@ INSERT DATA { } .unwrap(); - log_info!("ORM JSON arrived for multi type test\n: {:?}", orm_json); + let mut expected = json!([ + { + "id": "urn:test:oj1", + "strOrNum": ["a string", "another string", 2.0] + } + ]); + + let mut actual_mut = orm_json.clone(); + assert_json_eq(&mut expected, &mut actual_mut); break; } cancel_fn(); } -async fn test_orm_nested(session_id: u64) { +async fn test_orm_nested_1(session_id: u64) { let doc_nuri = create_doc_with_data( session_id, r#" @@ -781,33 +900,37 @@ INSERT DATA { # Valid ex:str "obj1 str" ; - ex:nestedWithExtra [ - ex:nestedStr "obj1 nested with extra valid" ; - ex:nestedNum 2 - ] , [ - # Invalid, nestedNum is missing but okay because extra. - ex:nestedStr "obj1 nested with extra invalid" - ] ; - ex:nestedWithoutExtra [ - ex:nestedStr "obj1 nested without extra valid" ; - ex:nestedNum 2 - ] . + ex:nestedWithExtra , ; + ex:nestedWithoutExtra . + + + ex:nestedStr "obj1 nested with extra valid" ; + ex:nestedNum 2 . + + + ex:nestedStr "obj1 nested with extra invalid" . + + + ex:nestedStr "obj1 nested without extra valid" ; + ex:nestedNum 2 . # Invalid because nestedWithoutExtra has an invalid child. ex:str "obj2 str" ; - ex:nestedWithExtra [ - ex:nestedStr "obj2: a nested string valid" ; - ex:nestedNum 2 - ] ; - ex:nestedWithoutExtra [ - ex:nestedStr "obj2 nested without extra valid" ; - ex:nestedNum 2 - ] , - # Invalid because nestedNum is missing. - [ - ex:nestedStr "obj2 nested without extra invalid" - ] . + ex:nestedWithExtra ; + ex:nestedWithoutExtra , . + + + ex:nestedStr "obj2: a nested string valid" ; + ex:nestedNum 2 . + + + ex:nestedStr "obj2 nested without extra valid" ; + ex:nestedNum 2 . + + # Invalid because nestedNum is missing. + + ex:nestedStr "obj2 nested without extra invalid" . } "# .to_string(), @@ -954,7 +1077,483 @@ INSERT DATA { } .unwrap(); - log_info!("ORM JSON arrived for nested test\n: {:?}", orm_json); + let mut expected = json!([ + { + "id": "urn:test:oj1", + "str": "obj1 str", + "nestedWithExtra": { + "nestedStr": "obj1 nested with extra valid", + "nestedNum": 2.0 + }, + "nestedWithoutExtra": { + "nestedStr": "obj1 nested without extra valid", + "nestedNum": 2.0 + } + } + ]); + + let mut actual_mut = orm_json.clone(); + assert_json_eq(&mut expected, &mut actual_mut); + + break; + } + cancel_fn(); +} + +async fn test_orm_nested_2(session_id: u64) { + let doc_nuri = create_doc_with_data( + session_id, + r#" +PREFIX ex: +INSERT DATA { + # Valid + + ex:knows , ; + ex:name "Alice" . + + ex:knows ; + ex:name "Bob" . + + ex:name "Claire" . + + # Invalid because claire2 is invalid + + ex:knows , ; + ex:name "Alice" . + # Invalid because claire2 is invalid + + ex:knows ; + ex:name "Bob" . + # Invalid because name is missing. + + ex:missingName "Claire missing" . +} +"# + .to_string(), + ) + .await; + + // Define the ORM schema + let mut schema = HashMap::new(); + schema.insert( + "http://example.org/PersonShape".to_string(), + OrmSchemaShape { + iri: "http://example.org/PersonShape".to_string(), + predicates: vec![ + OrmSchemaPredicate { + iri: "http://example.org/name".to_string(), + extra: None, + maxCardinality: 1, + minCardinality: 1, + readablePredicate: "name".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::string, + literals: None, + shape: None, + }], + } + .into(), + OrmSchemaPredicate { + iri: "http://example.org/knows".to_string(), + extra: Some(false), + maxCardinality: -1, + minCardinality: 0, + readablePredicate: "knows".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::shape, + literals: None, + shape: Some("http://example.org/PersonShape".to_string()), + }], + } + .into(), + ], + } + .into(), + ); + + let shape_type = OrmShapeType { + schema, + shape: "http://example.org/PersonShape".to_string(), + }; + + let nuri = NuriV0::new_from(&doc_nuri).expect("parse nuri"); + let (mut receiver, cancel_fn) = orm_start(nuri, shape_type, session_id) + .await + .expect("orm_start"); + + while let Some(app_response) = receiver.next().await { + let orm_json = match app_response { + AppResponse::V0(v) => match v { + AppResponseV0::OrmInitial(json) => Some(json), + _ => None, + }, + } + .unwrap(); + + log_info!( + "ORM JSON arrived for nested2 (person) test\n: {:?}", + orm_json + ); + + // Expected: alice and bob with their nested knows relationships + // claire2 is invalid (missing name), so alice2's knows chain is incomplete + let mut expected = json!([ + { + "id": "urn:test:alice", + "name": "Alice", + "knows": { + "urn:test:bob": { + "name": "Bob", + "knows": { + "urn:test:claire": { + "name": "Claire", + "knows": {} + } + } + }, + "urn:test:claire": { + "name": "Claire", + "knows": {} + } + } + }, + { + "id": "urn:test:bob", + "name": "Bob", + "knows": { + "urn:test:claire": { + "name": "Claire", + "knows": {} + } + } + }, + { + "id": "urn:test:claire", + "name": "Claire", + "knows": {} + } + ]); + + let mut actual_mut = orm_json.clone(); + log_info!( + "JSON for nested2\n{}", + serde_json::to_string(&actual_mut).unwrap() + ); + assert_json_eq(&mut expected, &mut actual_mut); + + break; + } + cancel_fn(); +} + +async fn test_orm_nested_3(session_id: u64) { + let doc_nuri = create_doc_with_data( + session_id, + r#" +PREFIX ex: +INSERT DATA { + # Valid + + a ex:Alice ; + ex:knows , . + + a ex:Bob ; + ex:knows . + + a ex:Claire . + + # Invalid because claire is invalid + + a ex:Alice ; + ex:knows , . + # Invalid because claire is invalid + + a ex:Bob ; + ex:knows . + # Invalid, wrong type. + + a ex:Claire2 . +} +"# + .to_string(), + ) + .await; + + // Define the ORM schema + let mut schema = HashMap::new(); + schema.insert( + "http://example.org/AliceShape".to_string(), + OrmSchemaShape { + iri: "http://example.org/AliceShape".to_string(), + predicates: vec![ + OrmSchemaPredicate { + iri: "http://www.w3.org/1999/02/22-rdf-syntax-ns#type".to_string(), + extra: None, + maxCardinality: 1, + minCardinality: 1, + readablePredicate: "type".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::literal, + literals: Some(vec![BasicType::Str( + "http://example.org/Alice".to_string(), + )]), + shape: None, + }], + } + .into(), + OrmSchemaPredicate { + iri: "http://example.org/knows".to_string(), + extra: Some(false), + maxCardinality: -1, + minCardinality: 0, + readablePredicate: "knows".to_string(), + dataTypes: vec![ + OrmSchemaDataType { + valType: OrmSchemaLiteralType::shape, + literals: None, + shape: Some("http://example.org/BobShape".to_string()), + }, + OrmSchemaDataType { + valType: OrmSchemaLiteralType::shape, + literals: None, + shape: Some("http://example.org/ClaireShape".to_string()), + }, + ], + } + .into(), + ], + } + .into(), + ); + schema.insert( + "http://example.org/BobShape".to_string(), + OrmSchemaShape { + iri: "http://example.org/BobShape".to_string(), + predicates: vec![ + OrmSchemaPredicate { + iri: "http://www.w3.org/1999/02/22-rdf-syntax-ns#type".to_string(), + extra: Some(true), + maxCardinality: 1, + minCardinality: 1, + readablePredicate: "type".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::literal, + literals: Some(vec![BasicType::Str("http://example.org/Bob".to_string())]), + shape: None, + }], + } + .into(), + OrmSchemaPredicate { + iri: "http://example.org/knows".to_string(), + extra: Some(false), + maxCardinality: -1, + minCardinality: 0, + readablePredicate: "knows".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::shape, + literals: None, + shape: Some("http://example.org/ClaireShape".to_string()), + }], + } + .into(), + ], + } + .into(), + ); + schema.insert( + "http://example.org/ClaireShape".to_string(), + OrmSchemaShape { + iri: "http://example.org/ClaireShape".to_string(), + predicates: vec![OrmSchemaPredicate { + iri: "http://www.w3.org/1999/02/22-rdf-syntax-ns#type".to_string(), + extra: None, + maxCardinality: 1, + minCardinality: 1, + readablePredicate: "type".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::literal, + literals: Some(vec![BasicType::Str( + "http://example.org/Claire".to_string(), + )]), + shape: None, + }], + } + .into()], + } + .into(), + ); + + let shape_type = OrmShapeType { + schema, + shape: "http://example.org/AliceShape".to_string(), + }; + + let nuri = NuriV0::new_from(&doc_nuri).expect("parse nuri"); + let (mut receiver, cancel_fn) = orm_start(nuri, shape_type, session_id) + .await + .expect("orm_start"); + + while let Some(app_response) = receiver.next().await { + let orm_json = match app_response { + AppResponse::V0(v) => match v { + AppResponseV0::OrmInitial(json) => Some(json), + _ => None, + }, + } + .unwrap(); + + log_info!( + "ORM JSON arrived for nested3 (person) test\n: {:?}", + serde_json::to_string(&orm_json).unwrap() + ); + + // Expected: alice with knows relationships to bob and claire + // alice2 is incomplete because claire2 has wrong type + let mut expected = json!([ + { + "id": "urn:test:alice", + "type": "http://example.org/Alice", + "knows": { + "urn:test:bob": { + "type": "http://example.org/Bob", + "knows": { + "urn:test:claire": { + "type": "http://example.org/Claire" + } + } + }, + "urn:test:claire": { + "type": "http://example.org/Claire" + } + } + } + ]); + + let mut actual_mut = orm_json.clone(); + assert_json_eq(&mut expected, &mut actual_mut); + + break; + } + cancel_fn(); +} + +async fn test_orm_nested_4(session_id: u64) { + let doc_nuri = create_doc_with_data( + session_id, + r#" +PREFIX ex: +INSERT DATA { + # Valid + + a ex:Person ; + ex:hasCat , . + + a ex:Cat . + + a ex:Cat . +} +"# + .to_string(), + ) + .await; + + // Define the ORM schema + let mut schema = HashMap::new(); + schema.insert( + "http://example.org/PersonShape".to_string(), + OrmSchemaShape { + iri: "http://example.org/PersonShape".to_string(), + predicates: vec![ + OrmSchemaPredicate { + iri: "http://www.w3.org/1999/02/22-rdf-syntax-ns#type".to_string(), + extra: None, + maxCardinality: 1, + minCardinality: 1, + readablePredicate: "type".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::literal, + literals: Some(vec![BasicType::Str( + "http://example.org/Person".to_string(), + )]), + shape: None, + }], + } + .into(), + OrmSchemaPredicate { + iri: "http://example.org/hasCat".to_string(), + extra: Some(false), + maxCardinality: -1, + minCardinality: 0, + readablePredicate: "cats".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::shape, + literals: None, + shape: Some("http://example.org/CatShape".to_string()), + }], + } + .into(), + ], + } + .into(), + ); + schema.insert( + "http://example.org/CatShape".to_string(), + OrmSchemaShape { + iri: "http://example.org/CatShape".to_string(), + predicates: vec![OrmSchemaPredicate { + iri: "http://www.w3.org/1999/02/22-rdf-syntax-ns#type".to_string(), + extra: Some(true), + maxCardinality: 1, + minCardinality: 1, + readablePredicate: "type".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::literal, + literals: Some(vec![BasicType::Str("http://example.org/Cat".to_string())]), + shape: None, + }], + } + .into()], + } + .into(), + ); + + let shape_type = OrmShapeType { + schema, + shape: "http://example.org/PersonShape".to_string(), + }; + + let nuri = NuriV0::new_from(&doc_nuri).expect("parse nuri"); + let (mut receiver, cancel_fn) = orm_start(nuri, shape_type, session_id) + .await + .expect("orm_start"); + + while let Some(app_response) = receiver.next().await { + let orm_json = match app_response { + AppResponse::V0(v) => match v { + AppResponseV0::OrmInitial(json) => Some(json), + _ => None, + }, + } + .unwrap(); + + let mut expected = json!([ + { + "id": "urn:test:alice", + "type": "http://example.org/Person", + "cats": { + "urn:test:kitten1": { + "type": "http://example.org/Cat" + }, + "urn:test:kitten2": { + "type": "http://example.org/Cat" + } + }, + } + ]); + + let mut actual_mut = orm_json.clone(); + + assert_json_eq(&mut expected, &mut actual_mut); break; } @@ -1209,3 +1808,67 @@ async fn create_doc_with_data(session_id: u64, sparql_insert: String) -> String return doc_nuri; } + +fn assert_json_eq(expected: &mut Value, actual: &mut Value) { + remove_id_fields(expected); + remove_id_fields(actual); + + sort_arrays(expected); + sort_arrays(actual); + + let diff = serde_json_diff::values(expected.clone(), actual.clone()); + if let Some(diff_) = diff { + log_err!("Expected and actual ORM JSON mismatch.\nDiff: {:?}", diff_); + assert!(false); + } +} + +/// Helper to recursively sort all arrays in nested objects into a stable ordering. +/// Arrays are sorted by their JSON string representation. +fn sort_arrays(value: &mut Value) { + match value { + Value::Object(map) => { + for v in map.values_mut() { + sort_arrays(v); + } + } + Value::Array(arr) => { + // First, recursively sort nested structures + for v in arr.iter_mut() { + sort_arrays(v); + } + // Then sort the array itself by JSON string representation + arr.sort_by(|a, b| { + let a_str = canonical_json::ser::to_string(a).unwrap_or_default(); + let b_str = canonical_json::ser::to_string(b).unwrap_or_default(); + a_str.cmp(&b_str) + }); + } + _ => {} + } +} + +/// Helper to recursively remove nested "id" fields from nested objects, +/// but only if they are not at the root level. +fn remove_id_fields(value: &mut Value) { + fn remove_id_fields_inner(value: &mut Value, is_root: bool) { + match value { + Value::Object(map) => { + if !is_root { + map.remove("id"); + } + for v in map.values_mut() { + remove_id_fields_inner(v, false); + } + } + Value::Array(arr) => { + for v in arr { + remove_id_fields_inner(v, false); + } + } + _ => {} + } + } + + remove_id_fields_inner(value, true); +} diff --git a/ng-verifier/src/orm/add_remove_triples.rs b/ng-verifier/src/orm/add_remove_triples.rs index cfc7c94..a30c8b1 100644 --- a/ng-verifier/src/orm/add_remove_triples.rs +++ b/ng-verifier/src/orm/add_remove_triples.rs @@ -57,15 +57,18 @@ pub fn add_remove_triples( // Process added triples. // For each triple, check if it matches the shape. // In parallel, we record the values added and removed (tracked_changes) - log_debug!("Processing # triples: {}", triples_added.len()); for triple in triples_added { let obj_term = oxrdf_term_to_orm_basic_type(&triple.object); - log_debug!("processing triple {triple}"); + log_debug!(" - processing triple {triple}"); for predicate_schema in &shape.predicates { if predicate_schema.iri != triple.predicate.as_str() { // Triple does not match predicate. continue; } + log_debug!( + " - Matched triple for datatypes {:?}", + predicate_schema.dataTypes + ); // Predicate schema constraint matches this triple. let tracked_subject_lock = get_or_create_tracked_subject(subject_iri, &shape, tracked_subjects); @@ -124,7 +127,7 @@ pub fn add_remove_triples( None } }) { - // log_debug!("dealing with nesting for {shape_iri}"); + log_debug!(" - dealing with nested type {shape_iri}"); if let BasicType::Str(obj_iri) = &obj_term { let tracked_child_arc = { // Get or create object's tracked subject struct. diff --git a/ng-verifier/src/orm/mod.rs b/ng-verifier/src/orm/mod.rs index 8a835d6..336b998 100644 --- a/ng-verifier/src/orm/mod.rs +++ b/ng-verifier/src/orm/mod.rs @@ -75,6 +75,7 @@ impl Verifier { return Err(NgError::SparqlError(e.to_string())); } Ok(triple) => { + log_debug!("Triple fetched: {:?}", triple); result_triples.push(triple); } } @@ -173,9 +174,13 @@ impl Verifier { HashMap::new(); // For each subject, add/remove triples and validate. - log_debug!("all_modified_subjects: {:?}", modified_subject_iris); + log_debug!( + "processing modified subjects: {:?} against shape: {}", + modified_subject_iris, + shape.iri + ); - for subject_iri in modified_subject_iris { + for subject_iri in &modified_subject_iris { let validation_key = (shape.iri.clone(), subject_iri.to_string()); // Cycle detection: Check if this (shape, subject) pair is already being validated @@ -187,7 +192,8 @@ impl Verifier { ); // Mark as invalid due to cycle // TODO: We could handle this by handling nested references as IRIs. - if let Some(tracked_shapes) = orm_subscription.tracked_subjects.get(subject_iri) + if let Some(tracked_shapes) = + orm_subscription.tracked_subjects.get(*subject_iri) { if let Some(tracked_subject) = tracked_shapes.get(&shape.iri) { let mut ts = tracked_subject.write().unwrap(); @@ -201,12 +207,13 @@ impl Verifier { // Mark as currently validating currently_validating.insert(validation_key.clone()); + // Get triples of subject (added & removed). let triples_added_for_subj = added_triples_by_subject - .get(subject_iri) + .get(*subject_iri) .map(|v| v.as_slice()) .unwrap_or(&[]); let triples_removed_for_subj = removed_triples_by_subject - .get(subject_iri) + .get(*subject_iri) .map(|v| v.as_slice()) .unwrap_or(&[]); @@ -214,9 +221,9 @@ impl Verifier { let change = orm_changes .entry(shape.iri.clone()) .or_insert_with(HashMap::new) - .entry(subject_iri.clone()) + .entry((*subject_iri).clone()) .or_insert_with(|| OrmTrackedSubjectChange { - subject_iri: subject_iri.clone(), + subject_iri: (*subject_iri).clone(), predicates: HashMap::new(), data_applied: false, }); @@ -248,7 +255,7 @@ impl Verifier { let validity = { let tracked_subject_opt = orm_subscription .tracked_subjects - .get(subject_iri) + .get(*subject_iri) .and_then(|m| m.get(&shape.iri)); let Some(tracked_subject) = tracked_subject_opt else { continue; @@ -277,14 +284,8 @@ impl Verifier { } } } - - // Remove from validation stack after processing this subject - currently_validating.remove(&validation_key); } - // TODO: Currently, all shape <-> nested subject combinations are queued for re-evaluation. - // Is that okay? - // Now, we queue all non-evaluated objects for (shape_iri, objects_to_eval) in &nested_objects_to_eval { let orm_subscription = self.get_first_orm_subscription_for( @@ -333,6 +334,10 @@ impl Verifier { shape_validation_stack.push((shape_arc, objects_not_to_fetch)); } } + for subject_iri in modified_subject_iris { + let validation_key = (shape.iri.clone(), subject_iri.to_string()); + currently_validating.remove(&validation_key); + } } Ok(()) diff --git a/ng-verifier/src/orm/utils.rs b/ng-verifier/src/orm/utils.rs index 3a1662f..8df48c2 100644 --- a/ng-verifier/src/orm/utils.rs +++ b/ng-verifier/src/orm/utils.rs @@ -80,6 +80,7 @@ pub fn shape_type_to_sparql( let mut visited_shapes: HashSet = HashSet::new(); // Recursive function to call for (nested) shapes. + // Returns nested WHERE statements that should be included with this shape's binding. fn process_shape( schema: &OrmSchema, shape: &OrmSchemaShape, @@ -89,11 +90,11 @@ pub fn shape_type_to_sparql( var_counter: &mut i32, visited_shapes: &mut HashSet, in_recursion: bool, - ) { + ) -> Vec { // Prevent infinite recursion on cyclic schemas. // TODO: We could handle this as IRI string reference. if visited_shapes.contains(&shape.iri) { - return; + return vec![]; } let mut new_where_statements: Vec = vec![]; @@ -102,8 +103,12 @@ pub fn shape_type_to_sparql( visited_shapes.insert(shape.iri.clone()); // Add statements for each predicate. + // If we are in recursion, we want to get all triples. + // That's why we add a " ?p ?o" statement afterwards + // and the extra construct statements are skipped. for predicate in &shape.predicates { let mut union_branches = Vec::new(); + let mut nested_where_statements = Vec::new(); // Predicate constraints might have more than one acceptable nested shape. Traverse each. for datatype in &predicate.dataTypes { @@ -116,10 +121,12 @@ pub fn shape_type_to_sparql( // Each shape option gets its own var. let obj_var_name = get_new_var_name(var_counter); - new_construct_statements.push(format!( - " ?{} <{}> ?{}", - subject_var_name, predicate.iri, obj_var_name - )); + if !in_recursion { + new_construct_statements.push(format!( + " ?{} <{}> ?{}", + subject_var_name, predicate.iri, obj_var_name + )); + } // Those are later added to a UNION, if there is more than one shape. union_branches.push(format!( " ?{} <{}> ?{}", @@ -127,7 +134,8 @@ pub fn shape_type_to_sparql( )); // Recurse to add statements for nested object. - process_shape( + // Collect nested WHERE statements to include within this predicate's scope. + let nested_stmts = process_shape( schema, nested_shape, &obj_var_name, @@ -137,6 +145,7 @@ pub fn shape_type_to_sparql( visited_shapes, true, ); + nested_where_statements.extend(nested_stmts); } } @@ -145,21 +154,32 @@ pub fn shape_type_to_sparql( if !union_branches.is_empty() { // We have nested shape(s) which were already added to CONSTRUCT above. - // Join them with UNION. + // Join them with UNION and include nested WHERE statements. - where_body = union_branches + let union_body = union_branches .into_iter() .map(|b| format!("{{\n{}\n}}", b)) .collect::>() .join(" UNION "); + + // Combine the parent binding with nested statements + if !nested_where_statements.is_empty() { + let nested_joined = nested_where_statements.join(" .\n"); + where_body = format!("{} .\n{}", union_body, nested_joined); + } else { + where_body = union_body; + } } else { // Regular predicate data type. Just add basic CONSTRUCT and WHERE statements. let obj_var_name = get_new_var_name(var_counter); - new_construct_statements.push(format!( - " ?{} <{}> ?{}", - subject_var_name, predicate.iri, obj_var_name - )); + if !in_recursion { + // Only add construct, if we don't have catch-all statement already. + new_construct_statements.push(format!( + " ?{} <{}> ?{}", + subject_var_name, predicate.iri, obj_var_name + )); + } where_body = format!( " ?{} <{}> ?{}", subject_var_name, predicate.iri, obj_var_name @@ -181,7 +201,7 @@ pub fn shape_type_to_sparql( let pred_var_name = get_new_var_name(var_counter); let obj_var_name = get_new_var_name(var_counter); - // The "catch any triple in subject" where statement + // The "catch any triple in subject" construct statement construct_statements.push(format!( " ?{} ?{} ?{}", subject_var_name, pred_var_name, obj_var_name @@ -189,17 +209,20 @@ pub fn shape_type_to_sparql( let joined_where_statements = new_where_statements.join(" .\n"); - // We do a join of the where statements (which will take care of querying further nested objects) - // and the "catch any triple in subject" where statement. - where_statements.push(format!( - " {{?{} ?{} ?{}}}\n UNION {{\n {}\n }}", + // Return nested statements to be included in parent's scope + // Combine catch-all with specific predicates in a UNION + let nested_block = format!( + " {{\n {{?{} ?{} ?{}}}\n UNION {{\n {}\n }}\n }}", subject_var_name, pred_var_name, obj_var_name, joined_where_statements - )); + ); + visited_shapes.remove(&shape.iri); + return vec![nested_block]; } else { where_statements.append(&mut new_where_statements); construct_statements.append(&mut new_construct_statements); } visited_shapes.remove(&shape.iri); + vec![] } let root_shape = schema.get(shape).ok_or(VerifierError::InvalidOrmSchema)?; @@ -239,7 +262,6 @@ pub fn shape_type_to_sparql( construct_body, where_body )) } - /// SPARQL literal escape: backslash, quotes, newlines, tabs. fn escape_literal(lit: &str) -> String { let mut out = String::with_capacity(lit.len() + 4); diff --git a/ng-verifier/src/orm/validation.rs b/ng-verifier/src/orm/validation.rs index be44f18..94754fd 100644 --- a/ng-verifier/src/orm/validation.rs +++ b/ng-verifier/src/orm/validation.rs @@ -36,6 +36,12 @@ impl Verifier { // Keep track of objects that need to be validated against a shape to fetch and validate. let mut need_evaluation: Vec<(String, String, bool)> = vec![]; + log_debug!( + "[Validation] for shape {} and subject {}", + shape.iri, + s_change.subject_iri + ); + // Check 1) Check if this object is untracked and we need to remove children and ourselves. if previous_validity == OrmTrackedSubjectValidity::Untracked { // 1.1) Schedule children for deletion @@ -106,7 +112,7 @@ impl Verifier { // Check 3.1) Cardinality if count < p_schema.minCardinality { log_debug!( - "[VALIDATION] Invalid: minCardinality not met | predicate: {:?} | count: {} | min: {} | schema: {:?} | changed: {:?}", + " - Invalid: minCardinality not met | predicate: {:?} | count: {} | min: {} | schema: {:?} | changed: {:?}", p_schema.iri, count, p_schema.minCardinality, @@ -125,7 +131,7 @@ impl Verifier { && p_schema.extra != Some(true) { log_debug!( - "[VALIDATION] Invalid: maxCardinality exceeded | predicate: {:?} | count: {} | max: {} | schema: {:?} | changed: {:?}", + " - Invalid: maxCardinality exceeded | predicate: {:?} | count: {} | max: {} | schema: {:?} | changed: {:?}", p_schema.iri, count, p_schema.maxCardinality, @@ -171,12 +177,13 @@ impl Verifier { ); if !some_valid { log_debug!( - "[VALIDATION] Invalid: required literals missing | predicate: {:?} | schema: {:?} | changed: {:?}", + " - Invalid: required literals missing | predicate: {:?} | schema: {:?} | changed: {:?}", p_schema.iri, shape.iri, p_change ); set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid); + break; } // Check 3.4) Nested shape correct. } else if p_schema @@ -191,6 +198,7 @@ impl Verifier { .map(|tc| tc.read().unwrap()) .collect::>() }); + // First, Count valid, invalid, unknowns, and untracked let counts = tracked_children.as_ref().map_or((0, 0, 0, 0), |children| { children @@ -213,9 +221,11 @@ impl Verifier { }) }); + log_debug!(" - checking nested - Counts: {:?}", counts); + if counts.1 > 0 && p_schema.extra != Some(true) { log_debug!( - "[VALIDATION] Invalid: nested invalid child | predicate: {:?} | schema: {:?} | changed: {:?}", + " - Invalid: nested invalid child | predicate: {:?} | schema: {:?} | changed: {:?}", p_schema.iri, shape.iri, p_change @@ -226,7 +236,7 @@ impl Verifier { break; } else if counts.0 > p_schema.maxCardinality && p_schema.maxCardinality != -1 { log_debug!( - "[VALIDATION] Too many valid children: | predicate: {:?} | schema: {:?} | changed: {:?}", + " - Invalid: Too many valid children: | predicate: {:?} | schema: {:?} | changed: {:?}", p_schema.iri, shape.iri, p_change @@ -236,7 +246,7 @@ impl Verifier { break; } else if counts.0 + counts.2 + counts.3 < p_schema.minCardinality { log_debug!( - "[VALIDATION] Invalid: not enough nested children | predicate: {:?} | valid_count: {} | min: {} | schema: {:?} | changed: {:?}", + " - Invalid: not enough nested children | predicate: {:?} | valid_count: {} | min: {} | schema: {:?} | changed: {:?}", p_schema.iri, counts.0, p_schema.minCardinality, @@ -269,7 +279,7 @@ impl Verifier { } }); } else if counts.2 > 0 { - // If we have pending nested objects, we need to wait for their evaluation. + // If we have pending children, we need to wait for their evaluation. set_validity(&mut new_validity, OrmTrackedSubjectValidity::Pending); // Schedule pending children for re-evaluation without fetch. tracked_children.as_ref().map(|children| { @@ -307,7 +317,7 @@ impl Verifier { }; if !matches { log_debug!( - "[VALIDATION] Invalid: value type mismatch | predicate: {:?} | value: {:?} | allowed_types: {:?} | schema: {:?} | changed: {:?}", + " - Invalid: value type mismatch | predicate: {:?} | value: {:?} | allowed_types: {:?} | schema: {:?} | changed: {:?}", p_schema.iri, val_added, allowed_types, diff --git a/sdk/ng-sdk-js/ng-signals/src/connector/createSignalObjectForShape.ts b/sdk/ng-sdk-js/ng-signals/src/connector/createSignalObjectForShape.ts index 964acf1..761590b 100644 --- a/sdk/ng-sdk-js/ng-signals/src/connector/createSignalObjectForShape.ts +++ b/sdk/ng-sdk-js/ng-signals/src/connector/createSignalObjectForShape.ts @@ -1,6 +1,8 @@ import type { Diff, Scope } from "../types.ts"; import { applyDiff } from "./applyDiff.ts"; +import ng from "@nextgraph-monorepo/ng-sdk-js"; + import { deepSignal, watch, @@ -111,7 +113,7 @@ const setUpConnection = (entry: PoolEntry, wasmMessage: WasmMessage) => { }; // Handler for messages from wasm land. -const onWasmMessage = (event: MessageEvent) => { +const onMessage = (event: MessageEvent) => { console.debug("[JsLand] onWasmMessage", event); const { diff, connectionId, type } = event.data; @@ -121,7 +123,12 @@ const onWasmMessage = (event: MessageEvent) => { // And only process messages that are addressed to js-land. if (type === "FrontendUpdate") return; - if (type === "Request") return; + if (type === "Request") { + // TODO: Handle message from wasm land and js land + // in different functions + + return; + } if (type === "Stop") return; if (type === "InitialResponse") { @@ -137,7 +144,7 @@ const keyToEntry = new Map>(); const connectionIdToEntry = new Map>(); const communicationChannel = new BroadcastChannel("shape-manager"); -communicationChannel.addEventListener("message", onWasmMessage); +communicationChannel.addEventListener("message", onMessage); // FinalizationRegistry to clean up connections when signal objects are GC'd. const cleanupSignalRegistry = @@ -210,16 +217,11 @@ export function createSignalObjectForShape( keyToEntry.set(key, entry); connectionIdToEntry.set(entry.connectionId, entry); - // TODO: Just a hack since the channel is not set up in mock-mode - setTimeout( - () => - communicationChannel.postMessage({ - type: "Request", - connectionId: entry.connectionId, - shapeType, - } as WasmMessage), - 100 - ); + communicationChannel.postMessage({ + type: "Request", + connectionId: entry.connectionId, + shapeType, + } as WasmMessage); function buildReturn(entry: PoolEntry) { return { From e6d3cbc33729bb79192a38ed2c8eb2d91ccf3e3d Mon Sep 17 00:00:00 2001 From: Niko PLP Date: Mon, 13 Oct 2025 11:56:35 +0300 Subject: [PATCH 3/7] pass commit_uri to orm_update --- ng-verifier/src/commits/transaction.rs | 4 +++- ng-verifier/src/orm/mod.rs | 29 ++++++++++++++++++++++---- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/ng-verifier/src/commits/transaction.rs b/ng-verifier/src/commits/transaction.rs index bab25f3..4e88b6c 100644 --- a/ng-verifier/src/commits/transaction.rs +++ b/ng-verifier/src/commits/transaction.rs @@ -740,7 +740,8 @@ impl Verifier { .await; } else { let graph_patch = update.transaction.as_patch(); - commit_nuris.push(NuriV0::commit(&update.repo_id, &update.commit_id)); + let nuri = NuriV0::commit(&update.repo_id, &update.commit_id); + commit_nuris.push(nuri.clone()); self.push_app_response( &update.branch_id, AppResponse::V0(AppResponseV0::Patch(AppPatch { @@ -756,6 +757,7 @@ impl Verifier { NuriV0::repo_graph_name(&update.repo_id, &update.overlay_id); self.orm_update( &NuriV0::new_empty(), + nuri, update.transaction.as_quads_patch(graph_nuri), ) .await; diff --git a/ng-verifier/src/orm/mod.rs b/ng-verifier/src/orm/mod.rs index 336b998..cce5add 100644 --- a/ng-verifier/src/orm/mod.rs +++ b/ng-verifier/src/orm/mod.rs @@ -622,7 +622,28 @@ impl Verifier { return Ok(return_vals); } - pub(crate) async fn orm_update(&mut self, scope: &NuriV0, patch: GraphQuadsPatch) {} + pub(crate) async fn orm_update( + &mut self, + scope: &NuriV0, + commit_nuri: String, + patch: GraphQuadsPatch, + ) { + if let Some(subs) = self.orm_subscriptions.get(scope) { + for sub in subs { + + // //TODO fix this + // let orm_diff = ??; + + // self.push_orm_response( + // scope, + // sub.session_id, + // sub.sender.clone(), + // AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff)), + // ) + // .await; + } + } + } pub(crate) async fn orm_frontend_update( &mut self, @@ -686,14 +707,14 @@ impl Verifier { .or_insert(vec![]) .push(orm_subscription); - let _orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type)?; - // log_debug!("create_orm_object_for_shape return {:?}", _orm_objects); + let orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type)?; + // log_debug!("create_orm_object_for_shape return {:?}", orm_objects); self.push_orm_response( &nuri.clone(), session_id, tx.clone(), - AppResponse::V0(AppResponseV0::OrmInitial(_orm_objects)), + AppResponse::V0(AppResponseV0::OrmInitial(orm_objects)), ) .await; From d54290d99308cbf3322379f83aea68076208da98 Mon Sep 17 00:00:00 2001 From: Niko PLP Date: Mon, 13 Oct 2025 13:07:57 +0300 Subject: [PATCH 4/7] improve OrmUpdate logic for self update --- ng-net/src/app_protocol.rs | 3 +- ng-net/src/orm.rs | 8 ++++++ ng-verifier/src/commits/transaction.rs | 9 ++++-- ng-verifier/src/inbox_processor.rs | 10 +++---- ng-verifier/src/orm/mod.rs | 38 ++++++++++++++++++-------- ng-verifier/src/request_processor.rs | 13 +++++++-- ng-verifier/src/verifier.rs | 5 +++- 7 files changed, 63 insertions(+), 23 deletions(-) diff --git a/ng-net/src/app_protocol.rs b/ng-net/src/app_protocol.rs index 993586a..2def62a 100644 --- a/ng-net/src/app_protocol.rs +++ b/ng-net/src/app_protocol.rs @@ -20,7 +20,7 @@ use ng_repo::utils::{decode_digest, decode_key, decode_sym_key}; use ng_repo::utils::{decode_overlayid, display_timestamp_local}; use serde_json::Value; -use crate::orm::{OrmDiff, OrmShapeType}; +use crate::orm::{OrmDiff, OrmShapeType, OrmUpdateBlankNodeIds}; use crate::types::*; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -1309,6 +1309,7 @@ pub enum AppResponseV0 { Commits(Vec), OrmInitial(Value), OrmUpdate(OrmDiff), + OrmUpdateBlankNodeIds(OrmUpdateBlankNodeIds), OrmError(String), } diff --git a/ng-net/src/orm.rs b/ng-net/src/orm.rs index 2d16b93..afbd2f2 100644 --- a/ng-net/src/orm.rs +++ b/ng-net/src/orm.rs @@ -48,6 +48,14 @@ pub struct OrmDiffOp { pub type OrmDiff = Vec; +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct OrmUpdateBlankNodeId { + pub path: String, + pub nuri: String, +} + +pub type OrmUpdateBlankNodeIds = Vec; + pub type OrmSchema = HashMap>; #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/ng-verifier/src/commits/transaction.rs b/ng-verifier/src/commits/transaction.rs index 4e88b6c..0afe73c 100644 --- a/ng-verifier/src/commits/transaction.rs +++ b/ng-verifier/src/commits/transaction.rs @@ -295,7 +295,7 @@ impl Verifier { transaction, commit_info, }; - self.update_graph(vec![info]).await?; + self.update_graph(vec![info], 0).await?; } else //TODO: change the logic here. transaction commits can have both a discrete and graph update. Only one AppResponse should be sent in this case, containing both updates. if body.discrete.is_some() { @@ -417,6 +417,7 @@ impl Verifier { inserts: Vec, removes: Vec, peer_id: Vec, + session_id: u64, ) -> Result, VerifierError> { // options when not a publisher on the repo: // - skip @@ -531,12 +532,13 @@ impl Verifier { }; updates.push(info); } - self.update_graph(updates).await + self.update_graph(updates, session_id).await } async fn update_graph( &mut self, mut updates: Vec, + session_id: u64, ) -> Result, VerifierError> { let updates_ref = &mut updates; let res = self @@ -758,6 +760,7 @@ impl Verifier { self.orm_update( &NuriV0::new_empty(), nuri, + session_id, update.transaction.as_quads_patch(graph_nuri), ) .await; @@ -775,6 +778,7 @@ impl Verifier { query: &String, base: &Option, peer_id: Vec, + session_id: u64, ) -> Result, String> { let store = self.graph_dataset.as_ref().unwrap(); @@ -796,6 +800,7 @@ impl Verifier { Vec::from_iter(inserts), Vec::from_iter(removes), peer_id, + session_id, ) .await .map_err(|e| e.to_string()) diff --git a/ng-verifier/src/inbox_processor.rs b/ng-verifier/src/inbox_processor.rs index 287bc58..64b756f 100644 --- a/ng-verifier/src/inbox_processor.rs +++ b/ng-verifier/src/inbox_processor.rs @@ -65,7 +65,7 @@ impl Verifier { <> ng:social_query_from_profile <{from_profile_nuri_string}>. <> ng:social_query_started \"{}\"^^xsd:dateTime . }}",DateTime::now()); let ret = self - .process_sparql_update(&forwarder_nuri, &sparql_update, &Some(forwarder_nuri_string.clone()), vec![]) + .process_sparql_update(&forwarder_nuri, &sparql_update, &Some(forwarder_nuri_string.clone()), vec![],0) .await; if let Err(e) = ret { return Err(VerifierError::SparqlError(e)); @@ -78,7 +78,7 @@ impl Verifier { // adding triples in forwarder doc : ng:social_query_id let sparql_update = format!("INSERT DATA {{ <{forwarder_nuri_string}> \"{}\"^^ . }}",DateTime::now()); let ret = self - .process_sparql_update(forwarder_nuri, &sparql_update, &None, vec![]) + .process_sparql_update(forwarder_nuri, &sparql_update, &None, vec![],0) .await; if let Err(e) = ret { return Err(VerifierError::SparqlError(e)); @@ -153,7 +153,7 @@ impl Verifier { ng:social_query_forwarded_to_inbox <{to_inbox_nuri}> . }}"); let ret = self - .process_sparql_update(&forwarder_nuri, &sparql_update, &None, vec![]) + .process_sparql_update(&forwarder_nuri, &sparql_update, &None, vec![],0) .await; if let Err(e) = ret { return Err(VerifierError::SparqlError(e)); @@ -593,7 +593,7 @@ impl Verifier { let nuri_ov = NuriV0::repo_graph_name(&response.query_id, &overlay_id); let graph_name = NamedNode::new_unchecked(&nuri_ov); let quads = triples.into_iter().map(|t| t.in_graph(graph_name.clone()) ).collect(); - let commits = self.prepare_sparql_update(quads, vec![], self.get_peer_id_for_skolem()).await?; + let commits = self.prepare_sparql_update(quads, vec![], self.get_peer_id_for_skolem(), 0).await?; } else { @@ -665,7 +665,7 @@ impl Verifier { {has_email} }}", details.name); let ret = self - .process_sparql_update(&contact_nuri, &sparql_update, &Some(contact_nuri_string), vec![]) + .process_sparql_update(&contact_nuri, &sparql_update, &Some(contact_nuri_string), vec![],0) .await; if let Err(e) = ret { return Err(VerifierError::SparqlError(e)); diff --git a/ng-verifier/src/orm/mod.rs b/ng-verifier/src/orm/mod.rs index cce5add..3b83a64 100644 --- a/ng-verifier/src/orm/mod.rs +++ b/ng-verifier/src/orm/mod.rs @@ -626,23 +626,39 @@ impl Verifier { &mut self, scope: &NuriV0, commit_nuri: String, + session_id: u64, patch: GraphQuadsPatch, ) { + let mut responses = Vec::with_capacity(1); if let Some(subs) = self.orm_subscriptions.get(scope) { + let mut orm_diff: Option = None; for sub in subs { - - // //TODO fix this - // let orm_diff = ??; - - // self.push_orm_response( - // scope, - // sub.session_id, - // sub.sender.clone(), - // AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff)), - // ) - // .await; + if sub.session_id == session_id { + //TODO prepare OrmUpdateBlankNodeIds + let orm_bnids = vec![]; + + responses.push(( + sub.session_id, + sub.sender.clone(), + AppResponseV0::OrmUpdateBlankNodeIds(orm_bnids), + )); + } else { + if orm_diff.is_none() { + //orm_diff = Some(??) + //TODO implement this + } + responses.push(( + sub.session_id, + sub.sender.clone(), + AppResponseV0::OrmUpdate(orm_diff.as_ref().unwrap().to_vec()), + )); + } } } + for (session_id, sender, res) in responses { + self.push_orm_response(scope, session_id, sender, AppResponse::V0(res)) + .await; + } } pub(crate) async fn orm_frontend_update( diff --git a/ng-verifier/src/request_processor.rs b/ng-verifier/src/request_processor.rs index e76f959..e1b8a5f 100644 --- a/ng-verifier/src/request_processor.rs +++ b/ng-verifier/src/request_processor.rs @@ -216,6 +216,7 @@ impl Verifier { Vec::from_iter(inserts), Vec::from_iter(removes), self.get_peer_id_for_skolem(), + 0, ) .await?; Ok(()) @@ -696,7 +697,7 @@ impl Verifier { ); let ret = self - .process_sparql_update(&store_nuri, &query, &None, vec![]) + .process_sparql_update(&store_nuri, &query, &None, vec![], 0) .await; if let Err(e) = ret { return Err(NgError::SparqlError(e)); @@ -712,7 +713,9 @@ impl Verifier { object: Literal::new_simple_literal(primary_class).into(), graph_name: NamedNode::new_unchecked(&header_branch_nuri).into(), }; - let ret = self.prepare_sparql_update(vec![quad], vec![], vec![]).await; + let ret = self + .prepare_sparql_update(vec![quad], vec![], vec![], 0) + .await; if let Err(e) = ret { return Err(NgError::SparqlError(e.to_string())); } @@ -814,6 +817,7 @@ impl Verifier { &sparql_update, &Some(contact_doc_nuri_string), vec![], + 0, ) .await; if let Err(e) = ret { @@ -894,6 +898,7 @@ impl Verifier { command: &AppRequestCommandV0, nuri: NuriV0, payload: Option, + session_id: u64, ) -> Result { match command { AppRequestCommandV0::OrmUpdate => match payload { @@ -993,7 +998,7 @@ impl Verifier { let social_query_doc_nuri_string = NuriV0::repo_id(query_id); let sparql_update = format!("INSERT DATA {{ <{social_query_doc_nuri_string}> <{forwarder_nuri_string}>. }}"); let ret = self - .process_sparql_update(&nuri, &sparql_update, &None, vec![]) + .process_sparql_update(&nuri, &sparql_update, &None, vec![], 0) .await; if let Err(e) = ret { return Err(NgError::SparqlError(e)); @@ -1008,6 +1013,7 @@ impl Verifier { &sparql_update, &Some(forwarder_nuri_string), vec![], + 0, ) .await; if let Err(e) = ret { @@ -1217,6 +1223,7 @@ impl Verifier { &sparql, &base, self.get_peer_id_for_skolem(), + session_id, ) .await { diff --git a/ng-verifier/src/verifier.rs b/ng-verifier/src/verifier.rs index 05de65c..c05298a 100644 --- a/ng-verifier/src/verifier.rs +++ b/ng-verifier/src/verifier.rs @@ -2836,7 +2836,10 @@ impl Verifier { pub async fn app_request(&mut self, req: AppRequest) -> Result { match req { - AppRequest::V0(v0) => self.process(&v0.command, v0.nuri, v0.payload).await, + AppRequest::V0(v0) => { + self.process(&v0.command, v0.nuri, v0.payload, v0.session_id) + .await + } } } From 8472562913a0343bf979dcdc76e6bf1a2c5be32b Mon Sep 17 00:00:00 2001 From: Niko PLP Date: Mon, 13 Oct 2025 15:52:02 +0300 Subject: [PATCH 5/7] orm_update_self --- ng-verifier/src/commits/transaction.rs | 44 ++++++++++++++----- ng-verifier/src/orm/mod.rs | 59 +++++++++++++++++++------- 2 files changed, 76 insertions(+), 27 deletions(-) diff --git a/ng-verifier/src/commits/transaction.rs b/ng-verifier/src/commits/transaction.rs index 0afe73c..b17ba30 100644 --- a/ng-verifier/src/commits/transaction.rs +++ b/ng-verifier/src/commits/transaction.rs @@ -393,14 +393,16 @@ impl Verifier { // TODO: implement TargetBranchV0::Named _ => unimplemented!(), }; - let _ = branches.entry(branch_id).or_insert(( - store.get_store_repo().clone(), - repo.id, - branch_type, - topic_id, - token, - store.overlay_id, - )); + if is_publisher { + let _ = branches.entry(branch_id).or_insert(( + store.get_store_repo().clone(), + repo.id, + branch_type, + topic_id, + token, + store.overlay_id, + )); + } let _ = nuri_branches.entry(graph_name.clone()).or_insert(( repo.id, branch_id, @@ -424,6 +426,9 @@ impl Verifier { // - TODO: abort (the whole transaction) // - TODO: inbox (sent to inbox of document for a suggested update) // for now we just do skip, without giving option to user + let mut revert_inserts: Vec = vec![]; + let mut revert_removes: Vec = vec![]; + let mut skolemnized_blank_nodes: Vec = vec![]; let mut inserts_map: HashMap> = HashMap::with_capacity(1); let mut removes_map: HashMap> = HashMap::with_capacity(1); let mut branches: HashMap< @@ -438,6 +443,7 @@ impl Verifier { let (repo_id, branch_id, is_publisher) = self.find_branch_and_repo_for_quad(&insert, &mut branches, &mut nuri_branches)?; if !is_publisher { + revert_inserts.push(insert); continue; } let set = inserts_map.entry(branch_id).or_insert_with(|| { @@ -463,6 +469,7 @@ impl Verifier { let iri = NuriV0::repo_skolem(&repo_id, &peer_id, b.as_ref().unique_id().unwrap())?; insert.object = Term::NamedNode(NamedNode::new_unchecked(iri)); + skolemnized_blank_nodes.push(insert.clone()); } } // TODO deal with triples in subject and object (RDF-STAR) @@ -473,6 +480,7 @@ impl Verifier { let (repo_id, branch_id, is_publisher) = self.find_branch_and_repo_for_quad(&remove, &mut branches, &mut nuri_branches)?; if !is_publisher { + revert_removes.push(remove); continue; } let set = removes_map.entry(branch_id).or_insert_with(|| { @@ -532,7 +540,22 @@ impl Verifier { }; updates.push(info); } - self.update_graph(updates, session_id).await + match self.update_graph(updates, session_id).await { + Ok(commits) => { + if session_id != 0 { + self.orm_update_self( + &NuriV0::new_empty(), + session_id, + skolemnized_blank_nodes, + revert_inserts, + revert_removes, + ) + .await; + } + Ok(commits) + } + Err(e) => Err(e), + } } async fn update_graph( @@ -743,7 +766,7 @@ impl Verifier { } else { let graph_patch = update.transaction.as_patch(); let nuri = NuriV0::commit(&update.repo_id, &update.commit_id); - commit_nuris.push(nuri.clone()); + commit_nuris.push(nuri); self.push_app_response( &update.branch_id, AppResponse::V0(AppResponseV0::Patch(AppPatch { @@ -759,7 +782,6 @@ impl Verifier { NuriV0::repo_graph_name(&update.repo_id, &update.overlay_id); self.orm_update( &NuriV0::new_empty(), - nuri, session_id, update.transaction.as_quads_patch(graph_nuri), ) diff --git a/ng-verifier/src/orm/mod.rs b/ng-verifier/src/orm/mod.rs index 3b83a64..3f495e0 100644 --- a/ng-verifier/src/orm/mod.rs +++ b/ng-verifier/src/orm/mod.rs @@ -14,6 +14,7 @@ pub mod validation; use futures::channel::mpsc; use futures::channel::mpsc::UnboundedSender; +use ng_oxigraph::oxrdf::Quad; use std::collections::HashMap; use std::collections::HashSet; @@ -625,33 +626,57 @@ impl Verifier { pub(crate) async fn orm_update( &mut self, scope: &NuriV0, - commit_nuri: String, session_id: u64, patch: GraphQuadsPatch, ) { let mut responses = Vec::with_capacity(1); if let Some(subs) = self.orm_subscriptions.get(scope) { - let mut orm_diff: Option = None; + // TODO: implement this, generate orm_diff using the patch + let orm_diff: OrmDiff = vec![]; for sub in subs { - if sub.session_id == session_id { - //TODO prepare OrmUpdateBlankNodeIds - let orm_bnids = vec![]; - + if sub.session_id != session_id { responses.push(( sub.session_id, sub.sender.clone(), - AppResponseV0::OrmUpdateBlankNodeIds(orm_bnids), + AppResponseV0::OrmUpdate(orm_diff.to_vec()), )); - } else { - if orm_diff.is_none() { - //orm_diff = Some(??) - //TODO implement this - } + } + } + } + for (session_id, sender, res) in responses { + self.push_orm_response(scope, session_id, sender, AppResponse::V0(res)) + .await; + } + } + + pub(crate) async fn orm_update_self( + &mut self, + scope: &NuriV0, + session_id: u64, + skolemnized_blank_nodes: Vec, + revert_inserts: Vec, + revert_removes: Vec, + ) { + let mut responses = Vec::with_capacity(1); + if let Some(subs) = self.orm_subscriptions.get(scope) { + for sub in subs { + if sub.session_id == session_id { + // TODO prepare OrmUpdateBlankNodeIds with skolemnized_blank_nodes + // note(niko): I think skolemnized blank nodes can still be many, in case of multi-level nested sub-objects. + let orm_bnids = vec![]; responses.push(( - sub.session_id, + session_id, sub.sender.clone(), - AppResponseV0::OrmUpdate(orm_diff.as_ref().unwrap().to_vec()), + AppResponseV0::OrmUpdateBlankNodeIds(orm_bnids), )); + // TODO (later) revert the inserts and removes + // let orm_diff = vec![]; + // responses.push(( + // session_id, + // sub.sender.clone(), + // AppResponseV0::OrmUpdate(orm_diff), + // )); + break; } } } @@ -672,7 +697,7 @@ impl Verifier { pub(crate) async fn push_orm_response( &mut self, - nuri: &NuriV0, + scope: &NuriV0, session_id: u64, sender: UnboundedSender, response: AppResponse, @@ -682,7 +707,9 @@ impl Verifier { if sender.is_closed() { log_debug!("closed so removing session {}", session_id); - self.orm_subscriptions.remove(&nuri); + if let Some(subs) = self.orm_subscriptions.get_mut(&scope) { + subs.retain(|sub| sub.session_id != session_id); + } } else { let _ = sender.clone().send(response).await; } From ca042c6cf70230a711ce059dc687a121009fa8bf Mon Sep 17 00:00:00 2001 From: Niko PLP Date: Mon, 13 Oct 2025 18:51:11 +0300 Subject: [PATCH 6/7] improved orm_frontend_update, orm_update_self and orm_update --- ng-repo/src/errors.rs | 1 + ng-verifier/src/commits/transaction.rs | 33 ++- ng-verifier/src/inbox_processor.rs | 2 +- ng-verifier/src/orm/mod.rs | 191 +++++++++++------- ng-verifier/src/request_processor.rs | 25 ++- .../src/app/pages/index.astro | 20 +- 6 files changed, 164 insertions(+), 108 deletions(-) diff --git a/ng-repo/src/errors.rs b/ng-repo/src/errors.rs index 0b460b8..87bebc5 100644 --- a/ng-repo/src/errors.rs +++ b/ng-repo/src/errors.rs @@ -400,6 +400,7 @@ pub enum VerifierError { InvalidOrmSchema, OrmSubjectNotFound, OrmPredicateNotFound, + OrmSubscriptionNotFound, } impl Error for VerifierError {} diff --git a/ng-verifier/src/commits/transaction.rs b/ng-verifier/src/commits/transaction.rs index b17ba30..8303fe7 100644 --- a/ng-verifier/src/commits/transaction.rs +++ b/ng-verifier/src/commits/transaction.rs @@ -414,13 +414,18 @@ impl Verifier { } } + /// returns + /// - list of commit Nuris + /// - optional list of revert_inserts + /// - optional list of revert_removes + /// - optional list of skolemnized_blank_nodes pub(crate) async fn prepare_sparql_update( &mut self, inserts: Vec, removes: Vec, peer_id: Vec, session_id: u64, - ) -> Result, VerifierError> { + ) -> Result<(Vec, Vec, Vec, Vec), VerifierError> { // options when not a publisher on the repo: // - skip // - TODO: abort (the whole transaction) @@ -541,19 +546,12 @@ impl Verifier { updates.push(info); } match self.update_graph(updates, session_id).await { - Ok(commits) => { - if session_id != 0 { - self.orm_update_self( - &NuriV0::new_empty(), - session_id, - skolemnized_blank_nodes, - revert_inserts, - revert_removes, - ) - .await; - } - Ok(commits) - } + Ok(commits) => Ok(( + commits, + revert_inserts, + revert_removes, + skolemnized_blank_nodes, + )), Err(e) => Err(e), } } @@ -781,8 +779,9 @@ impl Verifier { let graph_nuri = NuriV0::repo_graph_name(&update.repo_id, &update.overlay_id); self.orm_update( - &NuriV0::new_empty(), session_id, + update.repo_id.clone(), + update.overlay_id, update.transaction.as_quads_patch(graph_nuri), ) .await; @@ -801,7 +800,7 @@ impl Verifier { base: &Option, peer_id: Vec, session_id: u64, - ) -> Result, String> { + ) -> Result<(Vec, Vec, Vec, Vec), String> { let store = self.graph_dataset.as_ref().unwrap(); let update = ng_oxigraph::oxigraph::sparql::Update::parse(query, base.as_deref()) @@ -816,7 +815,7 @@ impl Verifier { Err(e) => Err(e.to_string()), Ok((inserts, removes)) => { if inserts.is_empty() && removes.is_empty() { - Ok(vec![]) + Ok((vec![], vec![], vec![], vec![])) } else { self.prepare_sparql_update( Vec::from_iter(inserts), diff --git a/ng-verifier/src/inbox_processor.rs b/ng-verifier/src/inbox_processor.rs index 64b756f..f6571f2 100644 --- a/ng-verifier/src/inbox_processor.rs +++ b/ng-verifier/src/inbox_processor.rs @@ -593,7 +593,7 @@ impl Verifier { let nuri_ov = NuriV0::repo_graph_name(&response.query_id, &overlay_id); let graph_name = NamedNode::new_unchecked(&nuri_ov); let quads = triples.into_iter().map(|t| t.in_graph(graph_name.clone()) ).collect(); - let commits = self.prepare_sparql_update(quads, vec![], self.get_peer_id_for_skolem(), 0).await?; + let _ = self.prepare_sparql_update(quads, vec![], self.get_peer_id_for_skolem(), 0).await?; } else { diff --git a/ng-verifier/src/orm/mod.rs b/ng-verifier/src/orm/mod.rs index 3f495e0..d9379c5 100644 --- a/ng-verifier/src/orm/mod.rs +++ b/ng-verifier/src/orm/mod.rs @@ -14,7 +14,11 @@ pub mod validation; use futures::channel::mpsc; use futures::channel::mpsc::UnboundedSender; +use ng_net::types::OverlayLink; use ng_oxigraph::oxrdf::Quad; +use ng_repo::errors::VerifierError; +use ng_repo::types::OverlayId; +use ng_repo::types::RepoId; use std::collections::HashMap; use std::collections::HashSet; @@ -381,6 +385,37 @@ impl Verifier { }).next().unwrap() } + pub fn get_first_orm_subscription_sender_for( + &mut self, + nuri: &NuriV0, + shape: Option<&ShapeIri>, + session_id: Option<&u64>, + ) -> Result<(UnboundedSender, &OrmSubscription), VerifierError> { + let subs = self + .orm_subscriptions + .get_mut(nuri) + .unwrap(); + subs.retain(|sub| !sub.sender.is_closed()); + match + // Filter shapes, if present. + subs.iter() + .filter(|s| match shape { + Some(sh) => *sh == s.shape_type.shape, + None => true, // Filter session ids if present. + }) + .filter(|s| match session_id { + Some(id) => *id == s.session_id, + None => true, + }) + .next() + { + None => Err(VerifierError::OrmSubscriptionNotFound), + Some(subscription) => { + Ok((subscription.sender.clone(), subscription)) + } + } + } + /// Apply triples to a nuri's document. /// Updates tracked_subjects in orm_subscriptions. fn apply_triple_changes( @@ -625,93 +660,113 @@ impl Verifier { pub(crate) async fn orm_update( &mut self, - scope: &NuriV0, session_id: u64, + repo_id: RepoId, + overlay_id: OverlayId, patch: GraphQuadsPatch, ) { - let mut responses = Vec::with_capacity(1); - if let Some(subs) = self.orm_subscriptions.get(scope) { - // TODO: implement this, generate orm_diff using the patch - let orm_diff: OrmDiff = vec![]; - for sub in subs { - if sub.session_id != session_id { - responses.push(( - sub.session_id, - sub.sender.clone(), - AppResponseV0::OrmUpdate(orm_diff.to_vec()), - )); + let overlaylink: OverlayLink = overlay_id.into(); + for (scope, subs) in self.orm_subscriptions.iter_mut() { + subs.retain(|sub| !sub.sender.is_closed()); + if scope.entire_store + || scope.overlay.as_ref().map_or(false, |ol| overlaylink == *ol) + || scope.target == NuriTargetV0::Repo(repo_id) + { + for sub in subs { + if sub.session_id != session_id { // this is incorrect. we are excluding all the subscriptions from the originating session, + // while we should only exclude the one with exact same shape_type. but we don't have access to that here + + // TODO: implement this, generate orm_diff using the patch and the sub.shape_type + let orm_diff: OrmDiff = vec![]; + + _ = sub.sender.send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff.to_vec()))).await; + } } } } - for (session_id, sender, res) in responses { - self.push_orm_response(scope, session_id, sender, AppResponse::V0(res)) - .await; - } } pub(crate) async fn orm_update_self( &mut self, scope: &NuriV0, + shape_iri: ShapeIri, session_id: u64, skolemnized_blank_nodes: Vec, revert_inserts: Vec, revert_removes: Vec, - ) { - let mut responses = Vec::with_capacity(1); - if let Some(subs) = self.orm_subscriptions.get(scope) { - for sub in subs { - if sub.session_id == session_id { - // TODO prepare OrmUpdateBlankNodeIds with skolemnized_blank_nodes - // note(niko): I think skolemnized blank nodes can still be many, in case of multi-level nested sub-objects. - let orm_bnids = vec![]; - responses.push(( - session_id, - sub.sender.clone(), - AppResponseV0::OrmUpdateBlankNodeIds(orm_bnids), - )); - // TODO (later) revert the inserts and removes - // let orm_diff = vec![]; - // responses.push(( - // session_id, - // sub.sender.clone(), - // AppResponseV0::OrmUpdate(orm_diff), - // )); - break; - } - } - } - for (session_id, sender, res) in responses { - self.push_orm_response(scope, session_id, sender, AppResponse::V0(res)) - .await; - } + ) -> Result<(), VerifierError> { + + let (mut sender, orm_subscription) = + self.get_first_orm_subscription_sender_for(scope, Some(&shape_iri), Some(&session_id))?; + + // TODO prepare OrmUpdateBlankNodeIds with skolemnized_blank_nodes + // use orm_subscription if needed + // note(niko): I think skolemnized blank nodes can still be many, in case of multi-level nested sub-objects. + let orm_bnids = vec![]; + let _ = sender.send(AppResponse::V0(AppResponseV0::OrmUpdateBlankNodeIds(orm_bnids))).await; + + // TODO (later) revert the inserts and removes + // let orm_diff = vec![]; + // let _ = sender.send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff))).await; + + Ok(()) } pub(crate) async fn orm_frontend_update( &mut self, + session_id: u64, scope: &NuriV0, shape_iri: ShapeIri, diff: OrmDiff, - ) { - log_info!("frontend_update_orm {:?} {} {:?}", scope, shape_iri, diff); - } - - pub(crate) async fn push_orm_response( - &mut self, - scope: &NuriV0, - session_id: u64, - sender: UnboundedSender, - response: AppResponse, - ) { - log_debug!("sending orm response for session {}:", session_id); - - if sender.is_closed() { - log_debug!("closed so removing session {}", session_id); + ) -> Result<(), String> { + log_info!( + "frontend_update_orm session={} scope={:?} shape={} diff={:?}", + session_id, + scope, + shape_iri, + diff + ); + + // find OrmSubscription + let (doc_nuri, sparql_update) = { + let orm_subscription = + self.get_first_orm_subscription_for(scope, Some(&shape_iri), Some(&session_id)); + + // use orm_subscription as needed + // do the magic, then, find the doc where the query should start and generate the sparql update + let doc_nuri = NuriV0::new_empty(); + let sparql_update: String = String::new(); + (doc_nuri, sparql_update) + }; - if let Some(subs) = self.orm_subscriptions.get_mut(&scope) { - subs.retain(|sub| sub.session_id != session_id); + match self + .process_sparql_update( + &doc_nuri, + &sparql_update, + &None, + self.get_peer_id_for_skolem(), + session_id, + ) + .await + { + Err(e) => Err(e), + Ok((_, revert_inserts, revert_removes, skolemnized_blank_nodes)) => { + if !revert_inserts.is_empty() + || !revert_removes.is_empty() + || !skolemnized_blank_nodes.is_empty() + { + self.orm_update_self( + scope, + shape_iri, + session_id, + skolemnized_blank_nodes, + revert_inserts, + revert_removes, + ) + .await.map_err(|e|e.to_string())?; + } + Ok(()) } - } else { - let _ = sender.clone().send(response).await; } } @@ -730,7 +785,7 @@ impl Verifier { shape_type: &OrmShapeType, session_id: u64, ) -> Result<(Receiver, CancelFn), NgError> { - let (tx, rx) = mpsc::unbounded::(); + let (mut tx, rx) = mpsc::unbounded::(); // TODO: Validate schema: // If multiple data types are present for the same predicate, they must be of of the same type. @@ -753,13 +808,7 @@ impl Verifier { let orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type)?; // log_debug!("create_orm_object_for_shape return {:?}", orm_objects); - self.push_orm_response( - &nuri.clone(), - session_id, - tx.clone(), - AppResponse::V0(AppResponseV0::OrmInitial(orm_objects)), - ) - .await; + let _ = tx.send(AppResponse::V0(AppResponseV0::OrmInitial(orm_objects))).await; let close = Box::new(move || { log_debug!("CLOSE_CHANNEL of subscription"); diff --git a/ng-verifier/src/request_processor.rs b/ng-verifier/src/request_processor.rs index e1b8a5f..ab24952 100644 --- a/ng-verifier/src/request_processor.rs +++ b/ng-verifier/src/request_processor.rs @@ -212,13 +212,14 @@ impl Verifier { if inserts.is_empty() && removes.is_empty() { Ok(()) } else { - self.prepare_sparql_update( - Vec::from_iter(inserts), - Vec::from_iter(removes), - self.get_peer_id_for_skolem(), - 0, - ) - .await?; + let _ = self + .prepare_sparql_update( + Vec::from_iter(inserts), + Vec::from_iter(removes), + self.get_peer_id_for_skolem(), + 0, + ) + .await?; Ok(()) } } @@ -903,7 +904,13 @@ impl Verifier { match command { AppRequestCommandV0::OrmUpdate => match payload { Some(AppRequestPayload::V0(AppRequestPayloadV0::OrmUpdate((diff, shape_id)))) => { - self.orm_frontend_update(&nuri, shape_id, diff).await + return match self + .orm_frontend_update(session_id, &nuri, shape_id, diff) + .await + { + Err(e) => Ok(AppResponse::error(e)), + Ok(()) => Ok(AppResponse::ok()), + } } _ => return Err(NgError::InvalidArgument), }, @@ -1228,7 +1235,7 @@ impl Verifier { .await { Err(e) => AppResponse::error(e), - Ok(commits) => AppResponse::commits(commits), + Ok((commits, ..)) => AppResponse::commits(commits), }, ) } else { diff --git a/sdk/ng-sdk-js/examples/multi-framework-signals/src/app/pages/index.astro b/sdk/ng-sdk-js/examples/multi-framework-signals/src/app/pages/index.astro index a868115..a5e636d 100644 --- a/sdk/ng-sdk-js/examples/multi-framework-signals/src/app/pages/index.astro +++ b/sdk/ng-sdk-js/examples/multi-framework-signals/src/app/pages/index.astro @@ -6,21 +6,21 @@ import VueRoot from "../components/VueRoot.vue"; import ReactRoot from "../components/ReactRoot"; import SvelteRoot from "../components/SvelteRoot.svelte"; // Hack to get mock backend started -import { mockTestObject } from "../../ng-mock/wasm-land/shapeHandler"; +//import { mockTestObject } from "../../ng-mock/wasm-land/shapeHandler"; const title = "Multi-framework app"; --- - - - + + + - - - + + + - - - + + + From 0267abb31cb673e224177333dd0d2f89d0a84619 Mon Sep 17 00:00:00 2001 From: Niko PLP Date: Mon, 13 Oct 2025 20:16:36 +0300 Subject: [PATCH 7/7] ... --- ng-verifier/src/orm/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ng-verifier/src/orm/mod.rs b/ng-verifier/src/orm/mod.rs index d9379c5..08ef07d 100644 --- a/ng-verifier/src/orm/mod.rs +++ b/ng-verifier/src/orm/mod.rs @@ -811,7 +811,7 @@ impl Verifier { let _ = tx.send(AppResponse::V0(AppResponseV0::OrmInitial(orm_objects))).await; let close = Box::new(move || { - log_debug!("CLOSE_CHANNEL of subscription"); + log_debug!("closing ORM subscription"); if !tx.is_closed() { tx.close_channel(); }