making Rust compiler happy

feat/orm
Niko PLP 2 weeks ago
parent d641a93f1f
commit c71df72e92
  1. 128
      nextgraph/src/tests/orm.rs
  2. 18
      ng-net/src/orm.rs
  3. 52
      ng-verifier/src/commits/mod.rs
  4. 173
      ng-verifier/src/orm.rs
  5. 166
      ng-verifier/src/utils/orm_validation.rs

@ -10,15 +10,15 @@
use crate::local_broker::{doc_create, doc_sparql_construct, doc_sparql_update};
use crate::tests::create_or_open_wallet::create_or_open_wallet;
use ng_net::orm::{
OrmSchemaDataType, OrmSchemaLiteralType, OrmSchemaLiterals, OrmSchemaPredicate, OrmSchemaShape,
OrmShapeType,
BasicType, OrmSchemaDataType, OrmSchemaLiteralType, OrmSchemaPredicate, OrmSchemaShape, OrmShapeType
};
use ng_repo::log_info;
use ng_verifier::orm::shape_type_to_sparql;
use std::collections::HashMap;
use std::sync::Arc;
#[async_std::test]
#async fn test_create_sparql_from_schema() {
async fn test_create_sparql_from_schema() {
// Setup wallet and document
let (_wallet, session_id) = create_or_open_wallet().await;
let doc_nuri = doc_create(
@ -70,10 +70,10 @@ INSERT DATA {
schema.insert(
"http://example.org/TestObject||http://example.org/anotherObject".to_string(),
OrmSchemaShape {
Arc::new(OrmSchemaShape {
iri: "http://example.org/TestObject||http://example.org/anotherObject".to_string(),
predicates: vec![
OrmSchemaPredicate {
Arc::new(OrmSchemaPredicate {
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::string,
literals: None,
@ -84,8 +84,8 @@ INSERT DATA {
maxCardinality: 1,
minCardinality: 1,
extra: None,
},
OrmSchemaPredicate {
}),
Arc::new(OrmSchemaPredicate {
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::number,
literals: None,
@ -96,22 +96,22 @@ INSERT DATA {
maxCardinality: 1,
minCardinality: 1,
extra: None,
},
}),
],
},
}),
);
schema.insert(
"http://example.org/TestObject".to_string(),
OrmSchemaShape {
Arc::new(OrmSchemaShape {
iri: "http://example.org/TestObject".to_string(),
predicates: vec![
OrmSchemaPredicate {
Arc::new(OrmSchemaPredicate {
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::literal,
literals: Some(OrmSchemaLiterals::StrArray(vec![
"http://example.org/TestObject".to_string(),
])),
literals: Some(vec![
BasicType::Str("http://example.org/TestObject".to_string()),
]),
shape: None,
}],
iri: "http://www.w3.org/1999/02/22-rdf-syntax-ns#type".to_string(),
@ -119,8 +119,8 @@ INSERT DATA {
maxCardinality: 1,
minCardinality: 1,
extra: Some(true),
},
OrmSchemaPredicate {
}),
Arc::new(OrmSchemaPredicate {
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::string,
literals: None,
@ -131,8 +131,8 @@ INSERT DATA {
maxCardinality: 1,
minCardinality: 1,
extra: None,
},
OrmSchemaPredicate {
}),
Arc::new(OrmSchemaPredicate {
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::number,
literals: None,
@ -143,8 +143,8 @@ INSERT DATA {
maxCardinality: 1,
minCardinality: 1,
extra: None,
},
OrmSchemaPredicate {
}),
Arc::new(OrmSchemaPredicate {
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::boolean,
literals: None,
@ -155,8 +155,8 @@ INSERT DATA {
maxCardinality: 1,
minCardinality: 1,
extra: None,
},
OrmSchemaPredicate {
}),
Arc::new(OrmSchemaPredicate {
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::number,
literals: None,
@ -167,8 +167,8 @@ INSERT DATA {
maxCardinality: -1,
minCardinality: 0,
extra: None,
},
OrmSchemaPredicate {
}),
Arc::new(OrmSchemaPredicate {
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::shape,
literals: None,
@ -182,8 +182,8 @@ INSERT DATA {
maxCardinality: 1,
minCardinality: 1,
extra: None,
},
OrmSchemaPredicate {
}),
Arc::new(OrmSchemaPredicate {
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::shape,
literals: None,
@ -197,8 +197,8 @@ INSERT DATA {
maxCardinality: -1,
minCardinality: 0,
extra: None,
},
OrmSchemaPredicate {
}),
Arc::new(OrmSchemaPredicate {
dataTypes: vec![
OrmSchemaDataType {
valType: OrmSchemaLiteralType::string,
@ -216,14 +216,14 @@ INSERT DATA {
maxCardinality: 1,
minCardinality: 1,
extra: None,
},
OrmSchemaPredicate {
}),
Arc::new(OrmSchemaPredicate {
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::literal,
literals: Some(OrmSchemaLiterals::StrArray(vec![
"lit1".to_string(),
"lit2".to_string(),
])),
literals: Some(vec![
BasicType::Str("lit1".to_string()),
BasicType::Str("lit2".to_string()),
]),
shape: None,
}],
iri: "http://example.org/lit1Or2".to_string(),
@ -231,17 +231,17 @@ INSERT DATA {
maxCardinality: 1,
minCardinality: 1,
extra: None,
},
}),
],
},
}),
);
schema.insert(
"http://example.org/TestObject||http://example.org/objectValue".to_string(),
OrmSchemaShape {
Arc::new(OrmSchemaShape {
iri: "http://example.org/TestObject||http://example.org/objectValue".to_string(),
predicates: vec![
OrmSchemaPredicate {
Arc::new(OrmSchemaPredicate {
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::string,
literals: None,
@ -252,8 +252,8 @@ INSERT DATA {
maxCardinality: 1,
minCardinality: 1,
extra: None,
},
OrmSchemaPredicate {
}),
Arc::new(OrmSchemaPredicate {
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::number,
literals: None,
@ -264,8 +264,8 @@ INSERT DATA {
maxCardinality: 1,
minCardinality: 1,
extra: None,
},
OrmSchemaPredicate {
}),
Arc::new(OrmSchemaPredicate {
dataTypes: vec![OrmSchemaDataType {
valType: OrmSchemaLiteralType::number,
literals: None,
@ -276,9 +276,9 @@ INSERT DATA {
maxCardinality: -1,
minCardinality: 0,
extra: None,
},
}),
],
},
}),
);
let shape_type = OrmShapeType {
@ -368,21 +368,21 @@ INSERT DATA {
let mut schema = HashMap::new();
schema.insert(
"http://example.org/TestObject".to_string(),
OrmSchemaShape {
Arc::new(OrmSchemaShape {
iri: "http://example.org/TestObject".to_string(),
predicates: vec![
OrmSchemaPredicate {
Arc::new(OrmSchemaPredicate {
iri: "http://example.org/prop1".to_string(),
minCardinality: 1,
..Default::default()
},
OrmSchemaPredicate {
}),
Arc::new(OrmSchemaPredicate {
iri: "http://example.org/prop2".to_string(),
minCardinality: 1,
..Default::default()
},
}),
],
},
}),
);
let shape_type = OrmShapeType {
schema,
@ -431,21 +431,21 @@ INSERT DATA {
let mut schema = HashMap::new();
schema.insert(
"http://example.org/TestObject".to_string(),
OrmSchemaShape {
Arc::new(OrmSchemaShape {
iri: "http://example.org/TestObject".to_string(),
predicates: vec![
OrmSchemaPredicate {
Arc::new(OrmSchemaPredicate {
iri: "http://example.org/prop1".to_string(),
minCardinality: 1,
..Default::default()
},
OrmSchemaPredicate {
}),
Arc::new(OrmSchemaPredicate {
iri: "http://example.org/prop2".to_string(),
minCardinality: 0, // Optional
..Default::default()
},
}),
],
},
}),
);
let shape_type = OrmShapeType {
schema,
@ -495,20 +495,20 @@ INSERT DATA {
let mut schema = HashMap::new();
schema.insert(
"http://example.org/Person".to_string(),
OrmSchemaShape {
Arc::new(OrmSchemaShape {
iri: "http://example.org/Person".to_string(),
predicates: vec![
OrmSchemaPredicate {
Arc::new(OrmSchemaPredicate {
iri: "http://www.w3.org/1999/02/22-rdf-syntax-ns#type".to_string(),
minCardinality: 1,
..Default::default()
},
OrmSchemaPredicate {
}),
Arc::new(OrmSchemaPredicate {
iri: "http://example.org/name".to_string(),
minCardinality: 1,
..Default::default()
},
OrmSchemaPredicate {
}),
Arc::new(OrmSchemaPredicate {
iri: "http://example.org/knows".to_string(),
minCardinality: 0,
maxCardinality: -1,
@ -518,9 +518,9 @@ INSERT DATA {
literals: None,
}],
..Default::default()
},
}),
],
},
}),
);
let shape_type = OrmShapeType {
schema,

@ -54,12 +54,12 @@ pub struct OrmDiffOp {
pub type OrmDiff = Vec<OrmDiffOp>;
pub type OrmSchema = HashMap<String, OrmSchemaShape>;
pub type OrmSchema = HashMap<String, Arc<OrmSchemaShape>>;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OrmSchemaShape {
pub iri: String,
pub predicates: Vec<OrmSchemaPredicate>,
pub predicates: Vec<Arc<OrmSchemaPredicate>>,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
@ -105,10 +105,10 @@ pub struct OrmSchemaPredicate {
pub struct OrmTrackedSubject {
/// The known predicates (only those relevant to the shape).
/// If there are no triples with a predicate, they are discarded
pub tracked_predicates: HashMap<String, OrmTrackedPredicate>,
pub tracked_predicates: HashMap<String, Arc<OrmTrackedPredicate>>,
/// If this is a nested subject, this records the parents
/// and if they are currently tracking this subject.
pub parents: HashMap<String, OrmTrackedSubject>,
pub parents: HashMap<String, Weak<OrmTrackedSubject>>,
/// Validity. When untracked, triple updates are not processed here.
pub valid: OrmTrackedSubjectValidity,
pub subject_iri: String,
@ -138,14 +138,14 @@ pub struct OrmTrackedPredicate {
// Used only for tracking construction of new objects and diffs
// in parallel to modifying the tracked objects and predicates.
pub struct OrmTrackedSubjectChange<'a> {
pub struct OrmTrackedSubjectChange {
pub subject_iri: String,
/// Predicates that were changed.
pub predicates: HashMap<String, OrmTrackedPredicateChanges<'a>>,
pub predicates: HashMap<String, OrmTrackedPredicateChanges>,
}
pub struct OrmTrackedPredicateChanges<'a> {
pub struct OrmTrackedPredicateChanges {
/// The tracked predicate for which those changes were recorded.
pub tracked_predicate: &'a OrmTrackedPredicate,
pub tracked_predicate: Weak<OrmTrackedPredicate>,
pub values_added: Vec<BasicType>,
pub values_removed: Vec<BasicType>,
}
@ -164,7 +164,7 @@ pub struct OrmSubscription {
pub session_id: u64,
pub nuri: NuriV0,
pub sender: Sender<AppResponse>,
pub tracked_subjects: HashMap<SubjectIri, HashMap<ShapeIri, OrmTrackedSubject>>,
pub tracked_subjects: HashMap<SubjectIri, HashMap<ShapeIri, Arc<OrmTrackedSubject>>>,
}
type ShapeIri = String;
type SubjectIri = String;

@ -29,7 +29,7 @@ use ng_net::app_protocol::*;
use crate::verifier::Verifier;
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
pub trait CommitVerifier {
async fn verify(
&self,
@ -288,7 +288,7 @@ impl CommitVerifier for AddBranch {
Ok(())
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for Repository {
async fn verify(
&self,
@ -302,7 +302,7 @@ impl CommitVerifier for Repository {
Ok(())
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for StoreUpdate {
async fn verify(
&self,
@ -315,7 +315,7 @@ impl CommitVerifier for StoreUpdate {
verifier.new_store_from_update(self)
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for AddInboxCap {
async fn verify(
&self,
@ -330,7 +330,7 @@ impl CommitVerifier for AddInboxCap {
}
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for AddSignerCap {
async fn verify(
&self,
@ -345,7 +345,7 @@ impl CommitVerifier for AddSignerCap {
}
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for AddMember {
#[allow(unused_variables)]
async fn verify(
@ -359,7 +359,7 @@ impl CommitVerifier for AddMember {
Ok(())
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for RemoveMember {
#[allow(unused_variables)]
async fn verify(
@ -373,7 +373,7 @@ impl CommitVerifier for RemoveMember {
Ok(())
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for AddPermission {
#[allow(unused_variables)]
async fn verify(
@ -387,7 +387,7 @@ impl CommitVerifier for AddPermission {
Ok(())
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for RemovePermission {
#[allow(unused_variables)]
async fn verify(
@ -401,7 +401,7 @@ impl CommitVerifier for RemovePermission {
Ok(())
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for RemoveBranch {
#[allow(unused_variables)]
async fn verify(
@ -415,7 +415,7 @@ impl CommitVerifier for RemoveBranch {
Ok(())
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for AddName {
#[allow(unused_variables)]
async fn verify(
@ -429,7 +429,7 @@ impl CommitVerifier for AddName {
Ok(())
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for RemoveName {
#[allow(unused_variables)]
async fn verify(
@ -443,7 +443,7 @@ impl CommitVerifier for RemoveName {
Ok(())
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for () {
#[allow(unused_variables)]
async fn verify(
@ -457,7 +457,7 @@ impl CommitVerifier for () {
Ok(())
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for Snapshot {
#[allow(unused_variables)]
async fn verify(
@ -484,7 +484,7 @@ impl CommitVerifier for Snapshot {
Ok(())
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for AddFile {
#[allow(unused_variables)]
async fn verify(
@ -529,7 +529,7 @@ impl CommitVerifier for AddFile {
}
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for RemoveFile {
#[allow(unused_variables)]
async fn verify(
@ -543,7 +543,7 @@ impl CommitVerifier for RemoveFile {
Ok(())
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for Compact {
#[allow(unused_variables)]
async fn verify(
@ -557,7 +557,7 @@ impl CommitVerifier for Compact {
Ok(())
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for AsyncSignature {
#[allow(unused_variables)]
async fn verify(
@ -605,7 +605,7 @@ impl CommitVerifier for AsyncSignature {
}
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for RootCapRefresh {
#[allow(unused_variables)]
async fn verify(
@ -619,7 +619,7 @@ impl CommitVerifier for RootCapRefresh {
Ok(())
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for BranchCapRefresh {
#[allow(unused_variables)]
async fn verify(
@ -633,7 +633,7 @@ impl CommitVerifier for BranchCapRefresh {
Ok(())
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for AddRepo {
#[allow(unused_variables)]
async fn verify(
@ -656,7 +656,7 @@ impl CommitVerifier for AddRepo {
Ok(())
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for RemoveRepo {
#[allow(unused_variables)]
async fn verify(
@ -670,7 +670,7 @@ impl CommitVerifier for RemoveRepo {
Ok(())
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for AddLink {
#[allow(unused_variables)]
async fn verify(
@ -684,7 +684,7 @@ impl CommitVerifier for AddLink {
Ok(())
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for RemoveLink {
#[allow(unused_variables)]
async fn verify(
@ -698,7 +698,7 @@ impl CommitVerifier for RemoveLink {
Ok(())
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for RemoveSignerCap {
#[allow(unused_variables)]
async fn verify(
@ -712,7 +712,7 @@ impl CommitVerifier for RemoveSignerCap {
Ok(())
}
}
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl CommitVerifier for WalletUpdate {
#[allow(unused_variables)]
async fn verify(

@ -8,22 +8,16 @@
// according to those terms.
use futures::channel::mpsc;
use ng_net::orm::OrmSubscription;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use futures::SinkExt;
use lazy_static::lazy_static;
use ng_net::orm::BasicType;
pub use ng_net::orm::OrmDiff;
use ng_net::orm::OrmSchemaLiteralType;
pub use ng_net::orm::OrmShapeType;
use ng_net::orm::OrmTrackedSubject;
use ng_net::orm::OrmTrackedSubjectChange;
use ng_net::orm::OrmTrackedSubjectValidity;
use ng_net::orm::{OrmSchemaDataType, OrmSchemaShape};
use ng_net::utils::Receiver;
use ng_net::{app_protocol::*, orm::OrmSchema};
use ng_net::{app_protocol::*, orm::*};
pub use ng_net::orm::{OrmDiff, OrmShapeType};
use ng_oxigraph::oxigraph::sparql::{Query, QueryResults};
use ng_oxigraph::oxrdf::Triple;
use ng_repo::errors::NgError;
@ -42,7 +36,7 @@ type SubjectIri = String;
// Structure to store changes in. By shape iri > subject iri > OrmTrackedSubjectChange
// **NOTE**: In comparison to OrmSubscription.tracked_subjects, the outer hashmap's keys are shape IRIs.
// (shape IRI -> (subject IRI -> OrmTrackedSubjectChange))
type OrmChanges<'a> = HashMap<ShapeIri, HashMap<SubjectIri, OrmTrackedSubjectChange<'a>>>;
type OrmChanges = HashMap<ShapeIri, HashMap<SubjectIri, OrmTrackedSubjectChange>>;
impl Verifier {
pub fn query_sparql_construct(
@ -83,44 +77,45 @@ impl Verifier {
}
}
fn process_changes_for_subscription<'a>(
fn process_changes_for_subscription(
self: &mut Self,
orm_subscription: &'a mut OrmSubscription,
triples_added: &'a [Triple],
triples_removed: &'a [Triple],
nuri: &String,
) -> Result<OrmChanges<'a>, NgError> {
let tracked_subjects = &mut orm_subscription.tracked_subjects;
nuri: &NuriV0,
triples_added: &[Triple],
triples_removed: &[Triple]
) -> Result<OrmChanges, NgError> {
let orm_subscription = self.orm_subscriptions.get(nuri).unwrap();
//let tracked_subjects = orm_subscription.tracked_subjects;
let schema = &orm_subscription.shape_type.schema;
let root_shape = schema
.get(&orm_subscription.shape_type.shape)
.ok_or(VerifierError::InvalidOrmSchema)?;
return self.process_changes_for_subject_and_shape(
root_shape,
tracked_subjects,
schema,
let mut orm_changes = HashMap::new();
self.process_changes_for_subject_and_shape(
root_shape.clone(),
triples_added,
triples_removed,
nuri,
None,
);
&mut orm_changes,
)?;
Ok(orm_changes)
}
fn process_changes_for_subject_and_shape<'a>(
fn process_changes_for_subject_and_shape(
self: &mut Self,
root_shape: &OrmSchemaShape,
tracked_subjects: &mut HashMap<String, HashMap<String, OrmTrackedSubject>>,
schema: &OrmSchema,
triples_added: &'a [Triple],
triples_removed: &'a [Triple],
nuri: &String,
mut orm_changes: Option<OrmChanges<'a>>,
) -> Result<OrmChanges<'a>, NgError> {
let mut orm_changes: OrmChanges<'a> = orm_changes.take().unwrap_or_else(HashMap::new);
root_shape: Arc<OrmSchemaShape>,
triples_added: &[Triple],
triples_removed: &[Triple],
nuri: &NuriV0,
orm_changes: &mut OrmChanges,
) -> Result<(), NgError> {
let nuri_repo = nuri.repo();
//let orm_subscription = self.orm_subscriptions.get(nuri).unwrap();
//let tracked_subjects = orm_subscription.tracked_subjects;
//let schema = &orm_subscription.shape_type.schema;
// First in, last out stack to keep track of objects to validate (nested objects first). Strings are object IRIs.
let mut shape_validation_queue: Vec<(&OrmSchemaShape, Vec<String>)> = vec![];
let mut shape_validation_queue: Vec<(Arc<OrmSchemaShape>, Vec<String>)> = vec![];
// Add root shape for first validation run.
shape_validation_queue.push((root_shape, vec![]));
@ -129,9 +124,9 @@ impl Verifier {
while let Some((shape, objects_to_validate)) = shape_validation_queue.pop() {
// Collect triples relevant for validation.
let added_triples_by_subject =
group_by_subject_for_shape(shape, triples_added, &objects_to_validate);
group_by_subject_for_shape(&shape, triples_added, &objects_to_validate);
let removed_triples_by_subject =
group_by_subject_for_shape(shape, triples_removed, &objects_to_validate);
group_by_subject_for_shape(&shape, triples_removed, &objects_to_validate);
let all_modified_subjects: HashSet<&SubjectIri> = added_triples_by_subject
.keys()
.chain(removed_triples_by_subject.keys())
@ -165,27 +160,32 @@ impl Verifier {
// Apply all triples for that subject to the tracked (shape, subject) pair.
// Record the changes.
{
let orm_subscription = self.orm_subscriptions.get_mut(nuri).unwrap();
if let Err(e) = add_remove_triples_mut(
shape,
shape.clone(),
subject_iri,
triples_added_for_subj,
triples_removed_for_subj,
tracked_subjects,
&mut orm_subscription.tracked_subjects,
change,
) {
log_err!("apply_changes_from_triples add/remove error: {:?}", e);
}
let tracked_subject_opt = tracked_subjects
.get_mut(subject_iri)
.and_then(|m| m.get_mut(&shape.iri));
let validity = {
let tracked_subject_opt = orm_subscription.tracked_subjects
.get(subject_iri)
.and_then(|m| m.get(&shape.iri));
let Some(tracked_subject) = tracked_subject_opt else {
continue;
}; // skip if missing
tracked_subject.valid.clone()
};
// Validate the subject.
let need_eval =
update_subject_validity(change, shape, tracked_subjects, tracked_subject.valid);
update_subject_validity(change, &shape, &mut orm_subscription.tracked_subjects, validity);
// We add the need_eval to be processed next after loop.
for (iri, schema_shape, needs_refetch) in need_eval {
@ -196,6 +196,10 @@ impl Verifier {
.push((iri.clone(), needs_refetch));
}
}
}
let orm_subscription = self.orm_subscriptions.get(nuri).unwrap();
let schema = &orm_subscription.shape_type.schema;
// Now, we fetch all un-fetched subjects for re-evaluation.
for (shape_iri, objects_to_eval) in &nested_objects_to_eval {
@ -211,16 +215,14 @@ impl Verifier {
// Create sparql query
let shape_query =
shape_type_to_sparql(&schema, &shape_iri, Some(objects_to_fetch))?;
let new_triples = self.query_sparql_construct(shape_query, Some(nuri.clone()))?;
orm_changes = self.process_changes_for_subject_and_shape(
shape,
tracked_subjects,
schema,
new_triples,
&vec![],
nuri,
Some(orm_changes),
)?;
let new_triples = self.query_sparql_construct(shape_query, Some(nuri_repo.clone()))?;
// self.process_changes_for_subject_and_shape(
// shape.clone(),
// &new_triples,
// &vec![],
// nuri,
// orm_changes,
// )?;
}
// Now, add all subjects to the queue that did not need a fetch.
for (shape_iri, objects_to_eval) in &nested_objects_to_eval {
@ -233,33 +235,32 @@ impl Verifier {
.get(shape_iri)
.ok_or(VerifierError::InvalidOrmSchema)?;
shape_validation_queue.push((shape, objects_to_fetch));
shape_validation_queue.push((shape.clone(), objects_to_fetch));
}
}
Ok(orm_changes)
Ok(())
}
fn apply_triple_changes<'a>(
&'a mut self,
triples_added: &'a [Triple],
triples_removed: &'a [Triple],
fn apply_triple_changes(
&mut self,
triples_added: &[Triple],
triples_removed: &[Triple],
only_for_nuri: Option<&NuriV0>,
only_for_session_id: Option<u64>,
) -> Result<OrmChanges<'a>, NgError> {
) -> Result<OrmChanges, NgError> {
// If we have a specific session, handle only that subscription.
if let Some(session_id) = only_for_session_id {
if let Some((nuri, sub)) = self
.orm_subscriptions
.iter_mut()
.iter()
.find(|(_, s)| s.session_id == session_id)
{
// TODO: Is repo correct?
return self.process_changes_for_subscription(
sub,
&nuri.clone(),
triples_added,
triples_removed,
&nuri.repo(),
triples_removed
);
} else {
return Ok(HashMap::new());
@ -268,17 +269,19 @@ impl Verifier {
// Otherwise, iterate all (optionally filter by nuri) and merge.
let mut merged: OrmChanges = HashMap::new();
for (nuri, sub) in self.orm_subscriptions.iter_mut() {
for nuri in self.orm_subscriptions.iter().filter(|(nuri,_)|{
if let Some(filter_nuri) = only_for_nuri {
if nuri != filter_nuri {
continue;
if *nuri != filter_nuri {
return false;
}
}
true
}).map(|(nuri,_)| nuri.clone()).collect::<Vec<_>>() {
let changes = self.process_changes_for_subscription(
sub,
&nuri,
triples_added,
triples_removed,
&nuri.repo(), // TODO: Is this correct?
)?;
for (shape_iri, subj_map) in changes {
merged
@ -294,7 +297,7 @@ impl Verifier {
change: &OrmTrackedSubjectChange,
changes: &OrmChanges,
shape: &OrmSchemaShape,
tracked_subjects: &HashMap<String, HashMap<String, OrmTrackedSubject>>,
tracked_subjects: &HashMap<String, HashMap<String, Arc<OrmTrackedSubject>>>,
) -> Value {
let mut orm_obj = json!({"id": change.subject_iri});
let orm_obj_map = orm_obj.as_object_mut().unwrap();
@ -413,17 +416,22 @@ impl Verifier {
fn create_orm_from_triples(
&mut self,
orm_subscription: &OrmSubscription,
nuri: &NuriV0,
triples: &[Triple],
) -> Result<Value, NgError> {
let session_id = {
let orm_subscription = self.orm_subscriptions.get(nuri).unwrap();
orm_subscription.session_id
};
let changes: OrmChanges =
self.apply_triple_changes(triples, &[], None, Some(session_id))?;
let orm_subscription = self.orm_subscriptions.get(nuri).unwrap();
let root_shape_iri = &orm_subscription.shape_type.shape;
let schema = &orm_subscription.shape_type.schema;
let root_shape = schema
.get(root_shape_iri)
.ok_or(VerifierError::InvalidOrmSchema)?;
let changes: OrmChanges =
self.apply_triple_changes(triples, &[], None, Some(orm_subscription.session_id))?;
let Some(_root_changes) = changes.get(&root_shape.iri).map(|s| s.values()) else {
return Ok(Value::Array(vec![]));
};
@ -455,7 +463,7 @@ impl Verifier {
return Ok(return_vals);
}
pub(crate) async fn orm_update(&mut self, scope: &NuriV0, patch: GraphTransaction) {}
pub(crate) async fn orm_update(&mut self, scope: &NuriV0, patch: GraphQuadsPatch) {}
pub(crate) async fn orm_frontend_update(
&mut self,
@ -487,14 +495,10 @@ impl Verifier {
}
pub(crate) fn clean_orm_subscriptions(&mut self) {
let subscriptions = self.orm_subscriptions;
// TODO: Ownership issues.
// for (nuri, subscription) in subscriptions {
// if subscription.sender.is_closed() {
// subscriptions.remove(nuri);
// }
// }
self.orm_subscriptions.retain(|_,subscription|
!subscription.sender.is_closed()
);
}
pub(crate) async fn start_orm(
@ -526,8 +530,7 @@ impl Verifier {
let shape_triples = self.query_sparql_construct(shape_query, Some(nuri.repo()))?;
// Create objects from queried triples.
let subscription_ref = self.orm_subscriptions.get(nuri).unwrap();
let _orm_objects = self.create_orm_from_triples(subscription_ref, &shape_triples)?;
let _orm_objects = self.create_orm_from_triples(nuri, &shape_triples)?;
// TODO integrate response

@ -9,20 +9,16 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Weak;
use std::sync::Arc;
use ng_net::orm::BasicType;
use ng_net::orm::OrmSchemaLiteralType;
use ng_net::orm::OrmSchemaShape;
use ng_net::orm::OrmTrackedPredicate;
use ng_net::orm::OrmTrackedSubject;
use ng_net::orm::OrmTrackedSubjectChange;
use ng_net::orm::OrmTrackedSubjectValidity;
use ng_net::orm::*;
use ng_oxigraph::oxrdf::Subject;
use ng_oxigraph::oxrdf::Triple;
use ng_repo::errors::NgError;
pub fn group_by_subject_for_shape<'a>(
shape: &'a OrmSchemaShape,
shape: &OrmSchemaShape,
triples: &'a [Triple],
allowed_subjects: &[String],
) -> HashMap<String, Vec<&'a Triple>> {
@ -30,7 +26,7 @@ pub fn group_by_subject_for_shape<'a>(
let allowed_preds_set: HashSet<&str> = shape
.predicates
.iter()
.map(|OrmSchemaDataType, OrmSchemaPredicatep| p.iri.as_str())
.map(|p| p.iri.as_str())
.collect();
let allowed_subject_set: HashSet<&str> = allowed_subjects.iter().map(|s| s.as_str()).collect();
for triple in triples {
@ -59,33 +55,37 @@ pub fn group_by_subject_for_shape<'a>(
/// Returns predicates to nested objects that were touched and need processing.
/// Assumes all triples have same subject.
pub fn add_remove_triples_mut(
shape: &OrmSchemaShape,
shape: Arc<OrmSchemaShape>,
subject_iri: &str,
triples_added: &[&Triple],
triples_removed: &[&Triple],
tracked_subjects: &mut HashMap<String, HashMap<String, OrmTrackedSubject>>,
tracked_subjects: &mut HashMap<String, HashMap<String, Arc<OrmTrackedSubject>>>,
subject_changes: &mut OrmTrackedSubjectChange,
) -> Result<(), NgError> {
let get_or_create_tracked_subject =
|subject_iri: &str,
shape_iri: &str,
tracked_subjects: &mut HashMap<String, HashMap<String, OrmTrackedSubject>>| {
fn get_or_create_tracked_subject<'a> (
subject_iri: &str,
shape: &Arc<OrmSchemaShape>,
tracked_subjects: &'a mut HashMap<String, HashMap<String, Arc<OrmTrackedSubject>>>)
-> (&'a mut OrmTrackedSubject, Weak<OrmTrackedSubject>)
{
let tracked_shapes_for_subject = tracked_subjects
.entry(subject_iri.to_string())
.or_insert_with(HashMap::new);
tracked_shapes_for_subject
.entry(shape_iri.to_string())
.or_insert_with(|| OrmTrackedSubject {
let subject = tracked_shapes_for_subject
.entry(shape.iri.clone())
.or_insert_with(|| Arc::new(OrmTrackedSubject {
tracked_predicates: HashMap::new(),
parents: HashMap::new(),
valid: ng_net::orm::OrmTrackedSubjectValidity::Pending,
subject_iri: subject_iri.to_string(),
shape,
})
};
shape: shape.clone(),
}));
let weak = Arc::downgrade(&subject);
(Arc::get_mut(subject).unwrap(), weak)
}
let tracked_subject = get_or_create_tracked_subject(subject_iri, &shape.iri, tracked_subjects);
let (_, tracked_subject_weak) = get_or_create_tracked_subject(subject_iri, &shape, tracked_subjects);
// Process added triples.
// For each triple, check if it matches the shape.
@ -98,26 +98,30 @@ pub fn add_remove_triples_mut(
}
// Predicate schema constraint matches this triple.
let mut upgraded = tracked_subject_weak.upgrade().unwrap();
let tracked_subject = Arc::get_mut(&mut upgraded).unwrap();
// Add tracked predicate or increase cardinality
let tracked_predicate = tracked_subject
let _tracked_predicate = tracked_subject
.tracked_predicates
.entry(predicate_schema.iri.to_string())
.or_insert_with(|| OrmTrackedPredicate {
.or_insert_with(|| Arc::new(OrmTrackedPredicate {
current_cardinality: 0,
schema: predicate_schema,
schema: predicate_schema.clone(),
tracked_children: Vec::new(),
current_literals: None,
});
}));
let tracked_predicate_weak = Arc::downgrade(&_tracked_predicate);
let tracked_predicate = Arc::get_mut(_tracked_predicate).unwrap();
tracked_predicate.current_cardinality += 1;
let obj_term = oxrdf_term_to_orm_basic_type(&triple.object);
// Keep track of the changed values too.
let pred_changes = subject_changes
let pred_changes: &mut OrmTrackedPredicateChanges = subject_changes
.predicates
.entry(predicate_schema.iri.clone())
.or_insert_with(|| OrmTrackedPredicateChanges {
tracked_predicate: tracked_predicate, // reference remains inside lifetime of this call
tracked_predicate: tracked_predicate_weak.clone(), // reference remains inside lifetime of this call
values_added: Vec::new(),
values_removed: Vec::new(),
});
@ -132,33 +136,35 @@ pub fn add_remove_triples_mut(
.any(|dt| dt.valType == OrmSchemaLiteralType::literal)
{
match &mut tracked_predicate.current_literals {
Some(lits) => lits.push(obj_term),
Some(lits) => lits.push(obj_term.clone()),
None => {
tracked_predicate.current_literals = Some(vec![obj_term]);
tracked_predicate.current_literals = Some(vec![obj_term.clone()]);
}
}
}
// If predicate is of type shape, register (parent -> child) links so that
// nested subjects can later be (lazily) fetched / validated.
// FIXME : shape_iri is never used
for shape_iri in predicate_schema
.dataTypes
.iter()
.filter(|dt| dt.valType == OrmSchemaLiteralType::shape)
.flat_map(|dt| dt.shape)
.filter_map(|dt| if dt.valType == OrmSchemaLiteralType::shape {dt.shape.clone()} else {None} )
{
if let BasicType::Str(obj_iri) = obj_term {
if let BasicType::Str(obj_iri) = &obj_term {
// Get or create object's tracked subject struct.
let tracked_child =
get_or_create_tracked_subject(triple.predicate.as_string(), &shape_iri);
let (tracked_child, tracked_child_weak) =
get_or_create_tracked_subject(triple.predicate.as_string(), &shape, tracked_subjects);
// Add self to parent (set tracked to true, preliminary).
tracked_child.parents.insert(obj_iri, tracked_child);
tracked_child.parents.insert(obj_iri.clone(), tracked_child_weak.clone());
// Add link to children
let mut upgraded = tracked_predicate_weak.upgrade().unwrap();
let tracked_predicate = Arc::get_mut(&mut upgraded).unwrap();
tracked_predicate
.tracked_children
.push(unsafe { Weak::from_raw(tracked_child) });
.push(tracked_child_weak);
}
}
}
@ -171,10 +177,11 @@ pub fn add_remove_triples_mut(
let tracked_predicate_opt = tracked_subjects
.get_mut(subject_iri)
.and_then(|tss| tss.get_mut(&shape.iri))
.and_then(|ts| ts.tracked_predicates.get_mut(pred_iri));
let Some(tracked_predicate) = tracked_predicate_opt else {
.and_then(|ts| Arc::get_mut(ts).unwrap().tracked_predicates.get_mut(pred_iri));
let Some(tracked_predicate_arc) = tracked_predicate_opt else {
continue;
};
let tracked_predicate = Arc::get_mut(tracked_predicate_arc).unwrap();
// The cardinality might become -1 or 0. We will remove them from the tracked predicates during validation.
tracked_predicate.current_cardinality =
@ -211,8 +218,7 @@ pub fn add_remove_triples_mut(
.schema
.dataTypes
.iter()
.filter(|dt| dt.valType == OrmSchemaLiteralType::shape)
.flat_map(|dt| dt.shape)
.filter_map(|dt| if dt.valType == OrmSchemaLiteralType::shape {dt.shape.clone()} else {None} )
{
// Nested shape removal logic disabled (see note above).
}
@ -224,26 +230,31 @@ pub fn add_remove_triples_mut(
/// Check the validity of a subject and update affecting tracked subjects' validity.
/// Might return nested objects that need to be validated.
/// Assumes all triples to be of same subject.
pub fn update_subject_validity<'a>(
s_change: &'a OrmTrackedSubjectChange<'a>,
pub fn update_subject_validity(
s_change: &OrmTrackedSubjectChange,
shape: &OrmSchemaShape,
tracked_subjects: &HashMap<String, HashMap<String, OrmTrackedSubject<'a>>>,
tracked_subjects: &mut HashMap<String, HashMap<String, Arc<OrmTrackedSubject>>>,
previous_validity: OrmTrackedSubjectValidity,
) -> Vec<(String, String, bool)> {
let Some(tracked_shapes) = tracked_subjects.get_mut(subject_iri) else {
let Some(tracked_shapes) = tracked_subjects.get_mut(&s_change.subject_iri) else {
return vec![];
};
let Some(tracked_subject) = tracked_shapes.get_mut(&shape.iri) else {
return vec![];
};
let tracked_subject = Arc::get_mut(tracked_subject).unwrap();
// Keep track of objects that need to be validated against a shape to fetch and validate.
let mut need_evaluation: Vec<(String, String, bool)> = vec![];
// Check 1) Check if we need to fetch this object or all parents are untracked.
if tracked_subject.parents.len() != 0 {
let no_parents_tracking = tracked_subject.parents.values().all(|parent| {
parent.valid == OrmTrackedSubjectValidity::Untracked
|| parent.valid == OrmTrackedSubjectValidity::Invalid
match parent.upgrade() {
Some(subject) =>
subject.valid == OrmTrackedSubjectValidity::Untracked
|| subject.valid == OrmTrackedSubjectValidity::Invalid,
None => true
}
});
if no_parents_tracking {
@ -256,7 +267,7 @@ pub fn update_subject_validity<'a>(
// We need to fetch the subject's current state:
// We have new parents but were previously not recording changes.
return vec![(s_change.subject_iri,)];
return vec![(s_change.subject_iri.clone(), shape.iri.clone(), true)];
// TODO
}
}
@ -287,10 +298,10 @@ pub fn update_subject_validity<'a>(
// Check 4) Validate subject against each predicate in shape.
for p_schema in shape.predicates.iter() {
let p_change = s_change.predicates.get(&p_schema.iri);
let tracked_pred = p_change.map(|pc| pc.tracked_predicate);
let tracked_pred = p_change.and_then(|pc| pc.tracked_predicate.upgrade());
let count =
tracked_pred.map_or_else(|| 0, |tp: &OrmTrackedPredicate| tp.current_cardinality);
tracked_pred.as_ref().map_or_else(|| 0, |tp| tp.current_cardinality);
// Check 4.1) Cardinality
if count < p_schema.minCardinality {
@ -326,7 +337,7 @@ pub fn update_subject_validity<'a>(
// between required and given values mismatches.
if !p_schema.extra.unwrap_or(false)
&& ((required_literals.len() as i32)
!= tracked_pred.map_or(0, |p| p.current_cardinality))
!= tracked_pred.as_ref().map_or(0, |p| p.current_cardinality))
{
return false;
}
@ -334,11 +345,9 @@ pub fn update_subject_validity<'a>(
// Check that each required literal is present.
for required_literal in required_literals {
// Is tracked predicate present?
if !tracked_pred
.iter()
.flat_map(|tp| &tp.current_literals)
.flatten()
.any(|literal| *literal == *required_literal)
if !tracked_pred.as_ref().map_or(false,
|t|t.current_literals.as_ref().map_or(false,
|tt|tt.iter().any(|literal| *literal == *required_literal)))
{
return false;
}
@ -356,13 +365,12 @@ pub fn update_subject_validity<'a>(
.any(|dt| dt.valType == OrmSchemaLiteralType::shape)
{
// If we have a nested shape, we need to check if the nested objects are tracked and valid.
let tracked_children = tracked_pred.as_ref().map(|tp|
tp.tracked_children.iter().filter_map(|weak_tc| weak_tc.upgrade()).collect::<Vec<_>>()
);
// First, Count valid, invalid, unknowns, and untracked
let counts = tracked_pred
.iter()
.flat_map(|tp| tp.tracked_children)
.map(|tc| {
tc.upgrade().map(|tc| {
let counts = tracked_children.as_ref()
.map_or((0,0,0,0),|children| children.iter().map(|tc| {
if tc.valid == OrmTrackedSubjectValidity::Valid {
(1, 0, 0, 0)
} else if tc.valid == OrmTrackedSubjectValidity::Invalid {
@ -375,11 +383,9 @@ pub fn update_subject_validity<'a>(
(0, 0, 0, 0)
}
})
})
.flatten()
.fold((0, 0, 0, 0), |(v1, i1, u1, ut1), o| {
(v1 + o.0, i1 + o.1, u1 + o.2, ut1 + o.3)
});
}));
if counts.1 > 0 && p_schema.extra != Some(true) {
// If we have at least one invalid nested object and no extra allowed, invalid.
@ -393,11 +399,10 @@ pub fn update_subject_validity<'a>(
// If we have untracked nested objects, we need to fetch them and validate.
set_validity(&mut new_validity, OrmTrackedSubjectValidity::Pending);
// After that we need to reevaluate this (subject,shape) again.
need_evaluation.push((subject_iri.to_string(), shape.iri.clone(), false));
need_evaluation.push((s_change.subject_iri.to_string(), shape.iri.clone(), false));
// Also schedule untracked children for fetching and validation.
if let Some(tp) = tracked_pred {
for weak_child in &tp.tracked_children {
if let Some(child) = weak_child.upgrade() {
tracked_children.as_ref().map(|children|
for child in children {
if child.valid == OrmTrackedSubjectValidity::Untracked {
need_evaluation.push((
child.subject_iri.clone(),
@ -406,15 +411,13 @@ pub fn update_subject_validity<'a>(
));
}
}
}
}
);
} else if counts.2 > 0 {
// If we have unknown nested objects, we need to wait for their evaluation.
set_validity(&mut new_validity, OrmTrackedSubjectValidity::Pending);
// Schedule unknown children (NotEvaluated) for re-evaluation without fetch.
if let Some(tp) = tracked_pred {
for weak_child in &tp.tracked_children {
if let Some(child) = weak_child.upgrade() {
tracked_children.as_ref().map(|children|
for child in children {
if child.valid == OrmTrackedSubjectValidity::Pending {
need_evaluation.push((
child.subject_iri.clone(),
@ -423,8 +426,7 @@ pub fn update_subject_validity<'a>(
));
}
}
}
}
);
} else {
// All nested objects are valid and cardinality is correct.
// We are valid with this predicate.
@ -435,7 +437,7 @@ pub fn update_subject_validity<'a>(
let allowed_types: Vec<&OrmSchemaLiteralType> =
p_schema.dataTypes.iter().map(|dt| &dt.valType).collect();
// For each new value, check that data type is in allowed_types.
for val_added in p_change.iter().map(|pc| pc.values_added).flatten() {
for val_added in p_change.iter().map(|pc| &pc.values_added).flatten() {
let matches = match val_added {
BasicType::Bool(_) => allowed_types
.iter()
@ -475,7 +477,7 @@ pub fn update_subject_validity<'a>(
{
// If this subject became valid, we need to refetch this subject;
// We fetch
need_evaluation.insert(0, (s_change.subject_iri, shape.iri.clone(), true));
need_evaluation.insert(0, (s_change.subject_iri.clone(), shape.iri.clone(), true));
}
// If validity changed, parents need to be re-evaluated.
@ -485,7 +487,7 @@ pub fn update_subject_validity<'a>(
return tracked_subject
.parents
.values()
.map(|parent| (&parent.subject_iri, parent.shape, false))
.filter_map(|parent| parent.upgrade().map(|parent| {(parent.subject_iri.clone(), parent.shape.iri.clone(), false)}) )
// Add `need_evaluation`.
.chain(need_evaluation)
.collect();
@ -510,11 +512,13 @@ fn has_cycle(subject: &OrmTrackedSubject, visited: &mut HashSet<String>) -> bool
return true;
}
visited.insert(subject.subject_iri.clone());
for (_parent_iri, (parent_subject, _)) in &subject.parents {
if has_cycle(parent_subject, visited) {
for (_parent_iri, parent_subject) in &subject.parents {
if let Some(parent_subject) = parent_subject.upgrade() {
if has_cycle(&parent_subject, visited) {
return true;
}
}
}
visited.remove(&subject.subject_iri);
false
}

Loading…
Cancel
Save