diff --git a/nextgraph/src/local_broker.rs b/nextgraph/src/local_broker.rs index 6624b33..d43a849 100644 --- a/nextgraph/src/local_broker.rs +++ b/nextgraph/src/local_broker.rs @@ -13,7 +13,6 @@ use std::fs::{read, remove_file, write}; use std::path::PathBuf; use async_once_cell::OnceCell; -use async_std::prelude::FutureExt; use async_std::sync::{Arc, Condvar, Mutex, RwLock}; use futures::channel::mpsc; use futures::{SinkExt, StreamExt}; diff --git a/nextgraph/src/tests/orm.rs b/nextgraph/src/tests/orm.rs index c172c2e..b829893 100644 --- a/nextgraph/src/tests/orm.rs +++ b/nextgraph/src/tests/orm.rs @@ -330,19 +330,9 @@ INSERT DATA { async fn test_orm_creation() { // Setup wallet and document let (_wallet, session_id) = create_or_open_wallet().await; - let doc_nuri = doc_create( + let doc_nuri = create_doc_with_data( session_id, - "Graph".to_string(), - "test_orm_query".to_string(), - "store".to_string(), - None, - None, - ) - .await - .expect("error creating doc"); - - // Insert data with unrelated predicates - let insert_sparql = r#" + r#" PREFIX ex: INSERT DATA { a ex:TestObject ; @@ -391,11 +381,9 @@ INSERT DATA { ex:anotherUnrelated 42422 . } "# - .to_string(); - - doc_sparql_update(session_id, insert_sparql, Some(doc_nuri.clone())) - .await - .expect("SPARQL update failed"); + .to_string(), + ) + .await; // Define the ORM schema let schema = create_big_schema(); @@ -411,12 +399,115 @@ INSERT DATA { .await .expect("orm_start"); - log_info!("orm_start called"); + log_info!("orm_start call ended"); - // TODO: remove this call to cancel_fn() + while let Some(app_response) = receiver.next().await { + match app_response { + AppResponse::V0(v) => match v { + AppResponseV0::OrmInitial(json) => { + log_info!("ORM JSON arrived\n: {:?}", json); + break; + } + _ => (), + }, + } + } cancel_fn(); + // + + // === + test_orm_root_array(session_id).await; + + // === + test_orm_with_optional(session_id).await; + + // === + test_orm_literal(session_id).await; +} + +async fn test_orm_root_array(session_id: u64) { + let doc_nuri = create_doc_with_data( + session_id, + r#" +PREFIX ex: +INSERT DATA { + a ex:TestObject ; + ex:arr 1, 2, 3 . + + a ex:TestObject . + + a ex:TestObject ; + ex:unrelated ex:TestObject ; + ex:arr 1, 2 . + + # Invalid + a ex:Other ; + ex:arr 1, 2 . + + # Invalid + a ex:TestObject ; + ex:unrelated ex:TestObject ; + ex:arr 1, "2" . + + # Invalid + a ex:TestObject ; + ex:unrelated ex:TestObject ; + ex:arr "1", "2" . +} +"# + .to_string(), + ) + .await; + + // Define the ORM schema + let mut schema = HashMap::new(); + schema.insert( + "http://example.org/TestShape".to_string(), + OrmSchemaShape { + iri: "http://example.org/TestShape".to_string(), + predicates: vec![ + OrmSchemaPredicate { + iri: "http://www.w3.org/1999/02/22-rdf-syntax-ns#type".to_string(), + extra: Some(false), + maxCardinality: 1, + minCardinality: 1, + readablePredicate: "type".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::literal, + literals: Some(vec![BasicType::Str( + "http://example.org/TestObject".to_string(), + )]), + shape: None, + }], + } + .into(), + OrmSchemaPredicate { + iri: "http://example.org/arr".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::number, + literals: None, + shape: None, + }], + extra: Some(false), + maxCardinality: -1, + minCardinality: 0, + readablePredicate: "numArray".to_string(), + } + .into(), + ], + } + .into(), + ); - // TODO : change the data with sparql_query + let shape_type = OrmShapeType { + schema, + shape: "http://example.org/TestShape".to_string(), + }; + + let nuri = NuriV0::new_from(&doc_nuri).expect("parse nuri"); + let (mut receiver, cancel_fn) = orm_start(nuri, shape_type, session_id) + .await + .expect("orm_start"); while let Some(app_response) = receiver.next().await { let orm_json = match app_response { @@ -429,11 +520,171 @@ INSERT DATA { log_info!("ORM JSON arrived\n: {:?}", orm_json); - // TODO: after we got what we wanted, call cancel_fn, otherwise the test never ends. + break; } - // + cancel_fn(); +} + +async fn test_orm_with_optional(session_id: u64) { + let doc_nuri = create_doc_with_data( + session_id, + r#" +PREFIX ex: +INSERT DATA { + + ex:opt true ; + ex:str "s1" . + + + ex:str "s2" . } +"# + .to_string(), + ) + .await; + // Define the ORM schema + let mut schema = HashMap::new(); + schema.insert( + "http://example.org/OptionShape".to_string(), + OrmSchemaShape { + iri: "http://example.org/OptionShape".to_string(), + predicates: vec![OrmSchemaPredicate { + iri: "http://example.org/opt".to_string(), + extra: Some(false), + maxCardinality: 1, + minCardinality: 1, + readablePredicate: "opt".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::boolean, + literals: None, + shape: None, + }], + } + .into()], + } + .into(), + ); + + let shape_type = OrmShapeType { + schema, + shape: "http://example.org/OptionShape".to_string(), + }; + + let nuri = NuriV0::new_from(&doc_nuri).expect("parse nuri"); + let (mut receiver, cancel_fn) = orm_start(nuri, shape_type, session_id) + .await + .expect("orm_start"); + + while let Some(app_response) = receiver.next().await { + let orm_json = match app_response { + AppResponse::V0(v) => match v { + AppResponseV0::OrmInitial(json) => Some(json), + _ => None, + }, + } + .unwrap(); + + log_info!("ORM JSON arrived for optional test\n: {:?}", orm_json); + + break; + } + cancel_fn(); +} + +async fn test_orm_literal(session_id: u64) { + let doc_nuri = create_doc_with_data( + session_id, + r#" +PREFIX ex: +INSERT DATA { + + ex:lit1 "lit 1" ; + ex:lit2 "lit 2" . + + # Valid because ex:lit1 allows extra. + + ex:lit1 "lit 1", "lit 1 extra" ; + ex:lit2 "lit 2" . + + # Invalid because ex:lit2 does not allow extra. + + ex:lit1 "lit 1" ; + ex:lit2 "lit 2", "lit 2 extra" . +} +"# + .to_string(), + ) + .await; + + // Define the ORM schema + let mut schema = HashMap::new(); + schema.insert( + "http://example.org/OptionShape".to_string(), + OrmSchemaShape { + iri: "http://example.org/OptionShape".to_string(), + predicates: vec![ + OrmSchemaPredicate { + iri: "http://example.org/lit1".to_string(), + extra: Some(true), + maxCardinality: -1, + minCardinality: 1, + readablePredicate: "lit1".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::literal, + literals: Some(vec![BasicType::Str("lit 1".to_string())]), + shape: None, + }], + } + .into(), + OrmSchemaPredicate { + iri: "http://example.org/lit2".to_string(), + extra: Some(false), + maxCardinality: 1, + minCardinality: 1, + readablePredicate: "lit2".to_string(), + dataTypes: vec![OrmSchemaDataType { + valType: OrmSchemaLiteralType::literal, + literals: Some(vec![BasicType::Str("lit 2".to_string())]), + shape: None, + }], + } + .into(), + ], + } + .into(), + ); + + // TODO ======= + // obj3 valid even though it should not. + + let shape_type = OrmShapeType { + schema, + shape: "http://example.org/OptionShape".to_string(), + }; + + let nuri = NuriV0::new_from(&doc_nuri).expect("parse nuri"); + let (mut receiver, cancel_fn) = orm_start(nuri, shape_type, session_id) + .await + .expect("orm_start"); + + while let Some(app_response) = receiver.next().await { + let orm_json = match app_response { + AppResponse::V0(v) => match v { + AppResponseV0::OrmInitial(json) => Some(json), + _ => None, + }, + } + .unwrap(); + + log_info!("ORM JSON arrived for literal test\n: {:?}", orm_json); + + break; + } + cancel_fn(); +} +// +// Helpers fn create_big_schema() -> OrmSchema { // Define the ORM schema let mut schema: OrmSchema = HashMap::new(); @@ -660,3 +911,23 @@ fn create_big_schema() -> OrmSchema { return schema; } + +async fn create_doc_with_data(session_id: u64, sparql_insert: String) -> String { + let doc_nuri = doc_create( + session_id, + "Graph".to_string(), + "test_orm_query".to_string(), + "store".to_string(), + None, + None, + ) + .await + .expect("error creating doc"); + + // Insert data + doc_sparql_update(session_id, sparql_insert, Some(doc_nuri.clone())) + .await + .expect("SPARQL update failed"); + + return doc_nuri; +} diff --git a/ng-verifier/src/orm/mod.rs b/ng-verifier/src/orm/mod.rs index 0c742a7..8a835d6 100644 --- a/ng-verifier/src/orm/mod.rs +++ b/ng-verifier/src/orm/mod.rs @@ -282,6 +282,9 @@ impl Verifier { currently_validating.remove(&validation_key); } + // TODO: Currently, all shape <-> nested subject combinations are queued for re-evaluation. + // Is that okay? + // Now, we queue all non-evaluated objects for (shape_iri, objects_to_eval) in &nested_objects_to_eval { let orm_subscription = self.get_first_orm_subscription_for( @@ -434,11 +437,19 @@ impl Verifier { let mut orm_obj = json!({"id": change.subject_iri}); let orm_obj_map = orm_obj.as_object_mut().unwrap(); for pred_schema in &shape.predicates { + let property_name = &pred_schema.readablePredicate; + let is_multi = pred_schema.maxCardinality > 1 || pred_schema.maxCardinality == -1; + let Some(pred_change) = change.predicates.get(&pred_schema.iri) else { + // No triples for this property. + + if pred_schema.minCardinality == 0 && is_multi { + // If this predicate schema is an array though, insert empty array. + orm_obj_map.insert(property_name.clone(), Value::Array(vec![])); + } + continue; }; - let property_name = &pred_schema.readablePredicate; - let is_multi = pred_schema.maxCardinality > 1 || pred_schema.maxCardinality == -1; if pred_schema .dataTypes @@ -624,18 +635,14 @@ impl Verifier { sender: UnboundedSender, response: AppResponse, ) { - log_debug!( - "sending orm response for session {}:\n{:?}", - session_id, - &response - ); + log_debug!("sending orm response for session {}:", session_id); if sender.is_closed() { log_debug!("closed so removing session {}", session_id); self.orm_subscriptions.remove(&nuri); } else { - sender.clone().send(response); + let _ = sender.clone().send(response).await; } } @@ -675,7 +682,7 @@ impl Verifier { .push(orm_subscription); let _orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type)?; - log_debug!("create_orm_object_for_shape return {:?}", _orm_objects); + // log_debug!("create_orm_object_for_shape return {:?}", _orm_objects); self.push_orm_response( &nuri.clone(), @@ -686,7 +693,7 @@ impl Verifier { .await; let close = Box::new(move || { - //log_debug!("CLOSE_CHANNEL of subscription for branch {}", branch_id); + log_debug!("CLOSE_CHANNEL of subscription"); if !tx.is_closed() { tx.close_channel(); }