wip, more tests

feat/orm
Laurin Weger 1 week ago
parent 2dc6136d51
commit 8db51578be
No known key found for this signature in database
GPG Key ID: 9B372BB0B792770F
  1. 1
      nextgraph/src/local_broker.rs
  2. 315
      nextgraph/src/tests/orm.rs
  3. 27
      ng-verifier/src/orm/mod.rs

@ -13,7 +13,6 @@ use std::fs::{read, remove_file, write};
use std::path::PathBuf; use std::path::PathBuf;
use async_once_cell::OnceCell; use async_once_cell::OnceCell;
use async_std::prelude::FutureExt;
use async_std::sync::{Arc, Condvar, Mutex, RwLock}; use async_std::sync::{Arc, Condvar, Mutex, RwLock};
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};

@ -330,19 +330,9 @@ INSERT DATA {
async fn test_orm_creation() { async fn test_orm_creation() {
// Setup wallet and document // Setup wallet and document
let (_wallet, session_id) = create_or_open_wallet().await; let (_wallet, session_id) = create_or_open_wallet().await;
let doc_nuri = doc_create( let doc_nuri = create_doc_with_data(
session_id, session_id,
"Graph".to_string(), r#"
"test_orm_query".to_string(),
"store".to_string(),
None,
None,
)
.await
.expect("error creating doc");
// Insert data with unrelated predicates
let insert_sparql = r#"
PREFIX ex: <http://example.org/> PREFIX ex: <http://example.org/>
INSERT DATA { INSERT DATA {
<urn:test:obj1> a ex:TestObject ; <urn:test:obj1> a ex:TestObject ;
@ -391,11 +381,9 @@ INSERT DATA {
ex:anotherUnrelated 42422 . ex:anotherUnrelated 42422 .
} }
"# "#
.to_string(); .to_string(),
)
doc_sparql_update(session_id, insert_sparql, Some(doc_nuri.clone())) .await;
.await
.expect("SPARQL update failed");
// Define the ORM schema // Define the ORM schema
let schema = create_big_schema(); let schema = create_big_schema();
@ -411,12 +399,115 @@ INSERT DATA {
.await .await
.expect("orm_start"); .expect("orm_start");
log_info!("orm_start called"); log_info!("orm_start call ended");
// TODO: remove this call to cancel_fn() while let Some(app_response) = receiver.next().await {
match app_response {
AppResponse::V0(v) => match v {
AppResponseV0::OrmInitial(json) => {
log_info!("ORM JSON arrived\n: {:?}", json);
break;
}
_ => (),
},
}
}
cancel_fn(); cancel_fn();
//
// ===
test_orm_root_array(session_id).await;
// ===
test_orm_with_optional(session_id).await;
// ===
test_orm_literal(session_id).await;
}
async fn test_orm_root_array(session_id: u64) {
let doc_nuri = create_doc_with_data(
session_id,
r#"
PREFIX ex: <http://example.org/>
INSERT DATA {
<urn:test:numArrayObj1> a ex:TestObject ;
ex:arr 1, 2, 3 .
<urn:test:numArrayObj2> a ex:TestObject .
<urn:test:numArrayObj3> a ex:TestObject ;
ex:unrelated ex:TestObject ;
ex:arr 1, 2 .
# Invalid
<urn:test:otherObj> a ex:Other ;
ex:arr 1, 2 .
# Invalid
<urn:test:numStringArrayObj1> a ex:TestObject ;
ex:unrelated ex:TestObject ;
ex:arr 1, "2" .
# Invalid
<urn:test:stringArrayObj1> a ex:TestObject ;
ex:unrelated ex:TestObject ;
ex:arr "1", "2" .
}
"#
.to_string(),
)
.await;
// Define the ORM schema
let mut schema = HashMap::new();
schema.insert(
"http://example.org/TestShape".to_string(),
OrmSchemaShape {
iri: "http://example.org/TestShape".to_string(),
predicates: vec![
OrmSchemaPredicate {
iri: "http://www.w3.org/1999/02/22-rdf-syntax-ns#type".to_string(),
extra: Some(false),
maxCardinality: 1,
minCardinality: 1,
readablePredicate: "type".to_string(),
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::literal,
literals: Some(vec![BasicType::Str(
"http://example.org/TestObject".to_string(),
)]),
shape: None,
}],
}
.into(),
OrmSchemaPredicate {
iri: "http://example.org/arr".to_string(),
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::number,
literals: None,
shape: None,
}],
extra: Some(false),
maxCardinality: -1,
minCardinality: 0,
readablePredicate: "numArray".to_string(),
}
.into(),
],
}
.into(),
);
// TODO : change the data with sparql_query let shape_type = OrmShapeType {
schema,
shape: "http://example.org/TestShape".to_string(),
};
let nuri = NuriV0::new_from(&doc_nuri).expect("parse nuri");
let (mut receiver, cancel_fn) = orm_start(nuri, shape_type, session_id)
.await
.expect("orm_start");
while let Some(app_response) = receiver.next().await { while let Some(app_response) = receiver.next().await {
let orm_json = match app_response { let orm_json = match app_response {
@ -429,11 +520,171 @@ INSERT DATA {
log_info!("ORM JSON arrived\n: {:?}", orm_json); log_info!("ORM JSON arrived\n: {:?}", orm_json);
// TODO: after we got what we wanted, call cancel_fn, otherwise the test never ends. break;
} }
// cancel_fn();
}
async fn test_orm_with_optional(session_id: u64) {
let doc_nuri = create_doc_with_data(
session_id,
r#"
PREFIX ex: <http://example.org/>
INSERT DATA {
<urn:test:oj1>
ex:opt true ;
ex:str "s1" .
<urn:test:oj2>
ex:str "s2" .
}
"#
.to_string(),
)
.await;
// Define the ORM schema
let mut schema = HashMap::new();
schema.insert(
"http://example.org/OptionShape".to_string(),
OrmSchemaShape {
iri: "http://example.org/OptionShape".to_string(),
predicates: vec![OrmSchemaPredicate {
iri: "http://example.org/opt".to_string(),
extra: Some(false),
maxCardinality: 1,
minCardinality: 1,
readablePredicate: "opt".to_string(),
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::boolean,
literals: None,
shape: None,
}],
}
.into()],
}
.into(),
);
let shape_type = OrmShapeType {
schema,
shape: "http://example.org/OptionShape".to_string(),
};
let nuri = NuriV0::new_from(&doc_nuri).expect("parse nuri");
let (mut receiver, cancel_fn) = orm_start(nuri, shape_type, session_id)
.await
.expect("orm_start");
while let Some(app_response) = receiver.next().await {
let orm_json = match app_response {
AppResponse::V0(v) => match v {
AppResponseV0::OrmInitial(json) => Some(json),
_ => None,
},
}
.unwrap();
log_info!("ORM JSON arrived for optional test\n: {:?}", orm_json);
break;
}
cancel_fn();
} }
async fn test_orm_literal(session_id: u64) {
let doc_nuri = create_doc_with_data(
session_id,
r#"
PREFIX ex: <http://example.org/>
INSERT DATA {
<urn:test:oj1>
ex:lit1 "lit 1" ;
ex:lit2 "lit 2" .
# Valid because ex:lit1 allows extra.
<urn:test:obj2>
ex:lit1 "lit 1", "lit 1 extra" ;
ex:lit2 "lit 2" .
# Invalid because ex:lit2 does not allow extra.
<urn:test:obj3>
ex:lit1 "lit 1" ;
ex:lit2 "lit 2", "lit 2 extra" .
}
"#
.to_string(),
)
.await;
// Define the ORM schema
let mut schema = HashMap::new();
schema.insert(
"http://example.org/OptionShape".to_string(),
OrmSchemaShape {
iri: "http://example.org/OptionShape".to_string(),
predicates: vec![
OrmSchemaPredicate {
iri: "http://example.org/lit1".to_string(),
extra: Some(true),
maxCardinality: -1,
minCardinality: 1,
readablePredicate: "lit1".to_string(),
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::literal,
literals: Some(vec![BasicType::Str("lit 1".to_string())]),
shape: None,
}],
}
.into(),
OrmSchemaPredicate {
iri: "http://example.org/lit2".to_string(),
extra: Some(false),
maxCardinality: 1,
minCardinality: 1,
readablePredicate: "lit2".to_string(),
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::literal,
literals: Some(vec![BasicType::Str("lit 2".to_string())]),
shape: None,
}],
}
.into(),
],
}
.into(),
);
// TODO =======
// obj3 valid even though it should not.
let shape_type = OrmShapeType {
schema,
shape: "http://example.org/OptionShape".to_string(),
};
let nuri = NuriV0::new_from(&doc_nuri).expect("parse nuri");
let (mut receiver, cancel_fn) = orm_start(nuri, shape_type, session_id)
.await
.expect("orm_start");
while let Some(app_response) = receiver.next().await {
let orm_json = match app_response {
AppResponse::V0(v) => match v {
AppResponseV0::OrmInitial(json) => Some(json),
_ => None,
},
}
.unwrap();
log_info!("ORM JSON arrived for literal test\n: {:?}", orm_json);
break;
}
cancel_fn();
}
//
// Helpers
fn create_big_schema() -> OrmSchema { fn create_big_schema() -> OrmSchema {
// Define the ORM schema // Define the ORM schema
let mut schema: OrmSchema = HashMap::new(); let mut schema: OrmSchema = HashMap::new();
@ -660,3 +911,23 @@ fn create_big_schema() -> OrmSchema {
return schema; return schema;
} }
async fn create_doc_with_data(session_id: u64, sparql_insert: String) -> String {
let doc_nuri = doc_create(
session_id,
"Graph".to_string(),
"test_orm_query".to_string(),
"store".to_string(),
None,
None,
)
.await
.expect("error creating doc");
// Insert data
doc_sparql_update(session_id, sparql_insert, Some(doc_nuri.clone()))
.await
.expect("SPARQL update failed");
return doc_nuri;
}

@ -282,6 +282,9 @@ impl Verifier {
currently_validating.remove(&validation_key); currently_validating.remove(&validation_key);
} }
// TODO: Currently, all shape <-> nested subject combinations are queued for re-evaluation.
// Is that okay?
// Now, we queue all non-evaluated objects // Now, we queue all non-evaluated objects
for (shape_iri, objects_to_eval) in &nested_objects_to_eval { for (shape_iri, objects_to_eval) in &nested_objects_to_eval {
let orm_subscription = self.get_first_orm_subscription_for( let orm_subscription = self.get_first_orm_subscription_for(
@ -434,11 +437,19 @@ impl Verifier {
let mut orm_obj = json!({"id": change.subject_iri}); let mut orm_obj = json!({"id": change.subject_iri});
let orm_obj_map = orm_obj.as_object_mut().unwrap(); let orm_obj_map = orm_obj.as_object_mut().unwrap();
for pred_schema in &shape.predicates { for pred_schema in &shape.predicates {
let property_name = &pred_schema.readablePredicate;
let is_multi = pred_schema.maxCardinality > 1 || pred_schema.maxCardinality == -1;
let Some(pred_change) = change.predicates.get(&pred_schema.iri) else { let Some(pred_change) = change.predicates.get(&pred_schema.iri) else {
// No triples for this property.
if pred_schema.minCardinality == 0 && is_multi {
// If this predicate schema is an array though, insert empty array.
orm_obj_map.insert(property_name.clone(), Value::Array(vec![]));
}
continue; continue;
}; };
let property_name = &pred_schema.readablePredicate;
let is_multi = pred_schema.maxCardinality > 1 || pred_schema.maxCardinality == -1;
if pred_schema if pred_schema
.dataTypes .dataTypes
@ -624,18 +635,14 @@ impl Verifier {
sender: UnboundedSender<AppResponse>, sender: UnboundedSender<AppResponse>,
response: AppResponse, response: AppResponse,
) { ) {
log_debug!( log_debug!("sending orm response for session {}:", session_id);
"sending orm response for session {}:\n{:?}",
session_id,
&response
);
if sender.is_closed() { if sender.is_closed() {
log_debug!("closed so removing session {}", session_id); log_debug!("closed so removing session {}", session_id);
self.orm_subscriptions.remove(&nuri); self.orm_subscriptions.remove(&nuri);
} else { } else {
sender.clone().send(response); let _ = sender.clone().send(response).await;
} }
} }
@ -675,7 +682,7 @@ impl Verifier {
.push(orm_subscription); .push(orm_subscription);
let _orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type)?; let _orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type)?;
log_debug!("create_orm_object_for_shape return {:?}", _orm_objects); // log_debug!("create_orm_object_for_shape return {:?}", _orm_objects);
self.push_orm_response( self.push_orm_response(
&nuri.clone(), &nuri.clone(),
@ -686,7 +693,7 @@ impl Verifier {
.await; .await;
let close = Box::new(move || { let close = Box::new(move || {
//log_debug!("CLOSE_CHANNEL of subscription for branch {}", branch_id); log_debug!("CLOSE_CHANNEL of subscription");
if !tx.is_closed() { if !tx.is_closed() {
tx.close_channel(); tx.close_channel();
} }

Loading…
Cancel
Save