wip: fix bugs, restructure, some compilation errors left

feat/orm
Laurin Weger 2 weeks ago
parent b9ff05feee
commit 04b843e0fa
No known key found for this signature in database
GPG Key ID: 9B372BB0B792770F
  1. 54
      ng-net/src/orm.rs
  2. 52
      ng-verifier/src/commits/mod.rs
  3. 1
      ng-verifier/src/lib.rs
  4. 1067
      ng-verifier/src/orm.rs
  5. 2
      ng-verifier/src/request_processor.rs
  6. 10
      ng-verifier/src/utils/mod.rs
  7. 585
      ng-verifier/src/utils/orm_validation.rs
  8. 15
      ng-verifier/src/verifier.rs

@ -11,14 +11,16 @@
#![allow(non_snake_case)] #![allow(non_snake_case)]
use std::collections::HashSet; use std::{
use std::{collections::HashMap, rc::Weak}; collections::HashMap,
sync::{Arc, Weak},
};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use crate::app_protocol::AppResponse; use crate::app_protocol::{AppResponse, NuriV0};
use crate::utils::Sender; use crate::utils::Sender;
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
@ -52,7 +54,6 @@ pub struct OrmDiffOp {
pub type OrmDiff = Vec<OrmDiffOp>; pub type OrmDiff = Vec<OrmDiffOp>;
/* == ORM Schema == */
pub type OrmSchema = HashMap<String, OrmSchemaShape>; pub type OrmSchema = HashMap<String, OrmSchemaShape>;
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
@ -98,44 +99,37 @@ pub struct OrmSchemaPredicate {
pub extra: Option<bool>, pub extra: Option<bool>,
} }
#[derive(Clone, Debug)]
pub struct OrmSubscription<'a> {
pub sender: Sender<AppResponse>,
pub tracked_objects: HashMap<String, OrmTrackedSubjectAndShape<'a>>,
}
/// A struct for recording the state of subjects and its predicates /// A struct for recording the state of subjects and its predicates
/// relevant to its shape. /// relevant to its shape.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct OrmTrackedSubjectAndShape<'a> { pub struct OrmTrackedSubject {
/// The known predicates (only those relevant to the shape). /// The known predicates (only those relevant to the shape).
/// If there are no triples with a predicate, they are discarded /// If there are no triples with a predicate, they are discarded
pub tracked_predicates: HashMap<String, OrmTrackedPredicate<'a>>, pub tracked_predicates: HashMap<String, OrmTrackedPredicate>,
/// If this is a nested subject, this records the parents /// If this is a nested subject, this records the parents
/// and if they are currently tracking this subject. /// and if they are currently tracking this subject.
pub parents: HashMap<String, (OrmTrackedSubjectAndShape<'a>, bool)>, pub parents: HashMap<String, OrmTrackedSubject>,
/// Validity. When untracked, triple updates are not processed here. /// Validity. When untracked, triple updates are not processed here.
pub valid: OrmTrackedSubjectValidity, pub valid: OrmTrackedSubjectValidity,
pub subject_iri: String, pub subject_iri: String,
/// The shape for which the predicates are tracked. /// The shape for which the predicates are tracked.
pub shape: &'a OrmSchemaShape, pub shape: Arc<OrmSchemaShape>,
} }
#[derive(Clone, Debug, Eq, PartialEq)] #[derive(Clone, Debug, Eq, PartialEq)]
pub enum OrmTrackedSubjectValidity { pub enum OrmTrackedSubjectValidity {
Valid, Valid,
Invalid, Invalid,
NotEvaluated, Pending,
Untracked, Untracked,
NeedsFetch,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct OrmTrackedPredicate<'a> { pub struct OrmTrackedPredicate {
/// The predicate schema /// The predicate schema
pub schema: &'a OrmSchemaPredicate, pub schema: Arc<OrmSchemaPredicate>,
/// If the schema is a nested object, the children. /// If the schema is a nested object, the children.
pub tracked_children: Vec<Weak<OrmTrackedSubjectAndShape<'a>>>, pub tracked_children: Vec<Weak<OrmTrackedSubject>>,
/// The count of triples for this subject and predicate. /// The count of triples for this subject and predicate.
pub current_cardinality: i32, pub current_cardinality: i32,
/// If schema is of type literal, the currently present ones. /// If schema is of type literal, the currently present ones.
@ -148,12 +142,10 @@ pub struct OrmTrackedSubjectChange<'a> {
pub subject_iri: String, pub subject_iri: String,
/// Predicates that were changed. /// Predicates that were changed.
pub predicates: HashMap<String, OrmTrackedPredicateChanges<'a>>, pub predicates: HashMap<String, OrmTrackedPredicateChanges<'a>>,
/// During validation, the current state of validity (can be subject to change).
pub valid: OrmTrackedSubjectValidity,
} }
pub struct OrmTrackedPredicateChanges<'a> { pub struct OrmTrackedPredicateChanges<'a> {
/// The tracked predicate for which those changes were recorded. /// The tracked predicate for which those changes were recorded.
pub tracked_predicate: &'a OrmTrackedPredicate<'a>, pub tracked_predicate: &'a OrmTrackedPredicate,
pub values_added: Vec<BasicType>, pub values_added: Vec<BasicType>,
pub values_removed: Vec<BasicType>, pub values_removed: Vec<BasicType>,
} }
@ -166,6 +158,17 @@ pub enum Term {
Ref(String), Ref(String),
} }
#[derive(Clone, Debug)]
pub struct OrmSubscription {
pub shape_type: OrmShapeType,
pub session_id: u64,
pub nuri: NuriV0,
pub sender: Sender<AppResponse>,
pub tracked_subjects: HashMap<SubjectIri, HashMap<ShapeIri, OrmTrackedSubject>>,
}
type ShapeIri = String;
type SubjectIri = String;
impl Default for OrmSchemaDataType { impl Default for OrmSchemaDataType {
fn default() -> Self { fn default() -> Self {
Self { Self {
@ -188,10 +191,3 @@ impl Default for OrmSchemaPredicate {
} }
} }
} }
/** == Internal data types == */
#[derive(Clone, Debug)]
pub struct OrmShapeTypeRef {
pub ref_count: u64,
pub shape_type: OrmShapeType,
}

@ -29,7 +29,7 @@ use ng_net::app_protocol::*;
use crate::verifier::Verifier; use crate::verifier::Verifier;
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
pub trait CommitVerifier { pub trait CommitVerifier {
async fn verify( async fn verify(
&self, &self,
@ -288,7 +288,7 @@ impl CommitVerifier for AddBranch {
Ok(()) Ok(())
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for Repository { impl CommitVerifier for Repository {
async fn verify( async fn verify(
&self, &self,
@ -302,7 +302,7 @@ impl CommitVerifier for Repository {
Ok(()) Ok(())
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for StoreUpdate { impl CommitVerifier for StoreUpdate {
async fn verify( async fn verify(
&self, &self,
@ -315,7 +315,7 @@ impl CommitVerifier for StoreUpdate {
verifier.new_store_from_update(self) verifier.new_store_from_update(self)
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for AddInboxCap { impl CommitVerifier for AddInboxCap {
async fn verify( async fn verify(
&self, &self,
@ -330,7 +330,7 @@ impl CommitVerifier for AddInboxCap {
} }
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for AddSignerCap { impl CommitVerifier for AddSignerCap {
async fn verify( async fn verify(
&self, &self,
@ -345,7 +345,7 @@ impl CommitVerifier for AddSignerCap {
} }
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for AddMember { impl CommitVerifier for AddMember {
#[allow(unused_variables)] #[allow(unused_variables)]
async fn verify( async fn verify(
@ -359,7 +359,7 @@ impl CommitVerifier for AddMember {
Ok(()) Ok(())
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for RemoveMember { impl CommitVerifier for RemoveMember {
#[allow(unused_variables)] #[allow(unused_variables)]
async fn verify( async fn verify(
@ -373,7 +373,7 @@ impl CommitVerifier for RemoveMember {
Ok(()) Ok(())
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for AddPermission { impl CommitVerifier for AddPermission {
#[allow(unused_variables)] #[allow(unused_variables)]
async fn verify( async fn verify(
@ -387,7 +387,7 @@ impl CommitVerifier for AddPermission {
Ok(()) Ok(())
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for RemovePermission { impl CommitVerifier for RemovePermission {
#[allow(unused_variables)] #[allow(unused_variables)]
async fn verify( async fn verify(
@ -401,7 +401,7 @@ impl CommitVerifier for RemovePermission {
Ok(()) Ok(())
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for RemoveBranch { impl CommitVerifier for RemoveBranch {
#[allow(unused_variables)] #[allow(unused_variables)]
async fn verify( async fn verify(
@ -415,7 +415,7 @@ impl CommitVerifier for RemoveBranch {
Ok(()) Ok(())
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for AddName { impl CommitVerifier for AddName {
#[allow(unused_variables)] #[allow(unused_variables)]
async fn verify( async fn verify(
@ -429,7 +429,7 @@ impl CommitVerifier for AddName {
Ok(()) Ok(())
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for RemoveName { impl CommitVerifier for RemoveName {
#[allow(unused_variables)] #[allow(unused_variables)]
async fn verify( async fn verify(
@ -443,7 +443,7 @@ impl CommitVerifier for RemoveName {
Ok(()) Ok(())
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for () { impl CommitVerifier for () {
#[allow(unused_variables)] #[allow(unused_variables)]
async fn verify( async fn verify(
@ -457,7 +457,7 @@ impl CommitVerifier for () {
Ok(()) Ok(())
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for Snapshot { impl CommitVerifier for Snapshot {
#[allow(unused_variables)] #[allow(unused_variables)]
async fn verify( async fn verify(
@ -484,7 +484,7 @@ impl CommitVerifier for Snapshot {
Ok(()) Ok(())
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for AddFile { impl CommitVerifier for AddFile {
#[allow(unused_variables)] #[allow(unused_variables)]
async fn verify( async fn verify(
@ -529,7 +529,7 @@ impl CommitVerifier for AddFile {
} }
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for RemoveFile { impl CommitVerifier for RemoveFile {
#[allow(unused_variables)] #[allow(unused_variables)]
async fn verify( async fn verify(
@ -543,7 +543,7 @@ impl CommitVerifier for RemoveFile {
Ok(()) Ok(())
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for Compact { impl CommitVerifier for Compact {
#[allow(unused_variables)] #[allow(unused_variables)]
async fn verify( async fn verify(
@ -557,7 +557,7 @@ impl CommitVerifier for Compact {
Ok(()) Ok(())
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for AsyncSignature { impl CommitVerifier for AsyncSignature {
#[allow(unused_variables)] #[allow(unused_variables)]
async fn verify( async fn verify(
@ -605,7 +605,7 @@ impl CommitVerifier for AsyncSignature {
} }
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for RootCapRefresh { impl CommitVerifier for RootCapRefresh {
#[allow(unused_variables)] #[allow(unused_variables)]
async fn verify( async fn verify(
@ -619,7 +619,7 @@ impl CommitVerifier for RootCapRefresh {
Ok(()) Ok(())
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for BranchCapRefresh { impl CommitVerifier for BranchCapRefresh {
#[allow(unused_variables)] #[allow(unused_variables)]
async fn verify( async fn verify(
@ -633,7 +633,7 @@ impl CommitVerifier for BranchCapRefresh {
Ok(()) Ok(())
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for AddRepo { impl CommitVerifier for AddRepo {
#[allow(unused_variables)] #[allow(unused_variables)]
async fn verify( async fn verify(
@ -656,7 +656,7 @@ impl CommitVerifier for AddRepo {
Ok(()) Ok(())
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for RemoveRepo { impl CommitVerifier for RemoveRepo {
#[allow(unused_variables)] #[allow(unused_variables)]
async fn verify( async fn verify(
@ -670,7 +670,7 @@ impl CommitVerifier for RemoveRepo {
Ok(()) Ok(())
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for AddLink { impl CommitVerifier for AddLink {
#[allow(unused_variables)] #[allow(unused_variables)]
async fn verify( async fn verify(
@ -684,7 +684,7 @@ impl CommitVerifier for AddLink {
Ok(()) Ok(())
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for RemoveLink { impl CommitVerifier for RemoveLink {
#[allow(unused_variables)] #[allow(unused_variables)]
async fn verify( async fn verify(
@ -698,7 +698,7 @@ impl CommitVerifier for RemoveLink {
Ok(()) Ok(())
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for RemoveSignerCap { impl CommitVerifier for RemoveSignerCap {
#[allow(unused_variables)] #[allow(unused_variables)]
async fn verify( async fn verify(
@ -712,7 +712,7 @@ impl CommitVerifier for RemoveSignerCap {
Ok(()) Ok(())
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl CommitVerifier for WalletUpdate { impl CommitVerifier for WalletUpdate {
#[allow(unused_variables)] #[allow(unused_variables)]
async fn verify( async fn verify(

@ -26,6 +26,7 @@ mod inbox_processor;
#[cfg(all(not(target_family = "wasm"), not(docsrs)))] #[cfg(all(not(target_family = "wasm"), not(docsrs)))]
mod rocksdb_user_storage; mod rocksdb_user_storage;
pub(crate) mod utils;
use ng_net::app_protocol::*; use ng_net::app_protocol::*;
use ng_oxigraph::oxrdf::Triple; use ng_oxigraph::oxrdf::Triple;

File diff suppressed because it is too large Load Diff

@ -54,7 +54,7 @@ impl Verifier {
match command { match command {
AppRequestCommandV0::OrmStart => match _payload { AppRequestCommandV0::OrmStart => match _payload {
Some(AppRequestPayload::V0(AppRequestPayloadV0::OrmStart(shape_type))) => { Some(AppRequestPayload::V0(AppRequestPayloadV0::OrmStart(shape_type))) => {
self.start_orm(nuri, shape_type, session_id).await self.start_orm(nuri, &shape_type, session_id).await
} }
_ => return Err(NgError::InvalidArgument), _ => return Err(NgError::InvalidArgument),
}, },

@ -0,0 +1,10 @@
// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.
pub mod orm_validation;

@ -0,0 +1,585 @@
// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.
use std::collections::HashMap;
use std::collections::HashSet;
use ng_net::orm::BasicType;
use ng_net::orm::OrmSchemaLiteralType;
use ng_net::orm::OrmSchemaShape;
use ng_net::orm::OrmTrackedPredicate;
use ng_net::orm::OrmTrackedSubject;
use ng_net::orm::OrmTrackedSubjectChange;
use ng_net::orm::OrmTrackedSubjectValidity;
use ng_oxigraph::oxrdf::Subject;
use ng_oxigraph::oxrdf::Triple;
use ng_repo::errors::NgError;
/// Groups the triples that are relevant to `shape` by their subject IRI.
///
/// A triple is kept when
/// * its predicate IRI equals the `iri` of one of `shape.predicates`,
/// * its subject is a named node (other subject kinds are skipped), and
/// * `allowed_subjects` is empty or contains the subject IRI.
///
/// Returns a map from subject IRI to the matching triples, in input order.
/// The key is the plain IRI string (`as_str()`), i.e. the same form that is
/// compared against `allowed_subjects`.
pub fn group_by_subject_for_shape<'a>(
    shape: &'a OrmSchemaShape,
    triples: &'a [Triple],
    allowed_subjects: &[String],
) -> HashMap<String, Vec<&'a Triple>> {
    let allowed_preds_set: HashSet<&str> =
        shape.predicates.iter().map(|p| p.iri.as_str()).collect();
    let allowed_subject_set: HashSet<&str> =
        allowed_subjects.iter().map(String::as_str).collect();

    let mut triples_by_subject: HashMap<String, Vec<&'a Triple>> = HashMap::new();
    for triple in triples {
        // The predicate must be one the shape knows about.
        if !allowed_preds_set.contains(triple.predicate.as_str()) {
            continue;
        }
        // Only named-node subjects are grouped.
        let Subject::NamedNode(node) = &triple.subject else {
            continue;
        };
        let subject_iri = node.as_str();
        // An empty allow-list means "no subject filter".
        if allowed_subject_set.is_empty() || allowed_subject_set.contains(subject_iri) {
            triples_by_subject
                .entry(subject_iri.to_owned())
                .or_default()
                .push(triple);
        }
    }
    triples_by_subject
}
use std::sync::Arc;
/// Applies the added and removed triples of one subject to the tracking state.
///
/// For every added triple whose predicate IRI equals the `iri` of a predicate
/// in `shape.predicates`, the matching `OrmTrackedPredicate` of the tracked
/// subject is created on demand, its `current_cardinality` is incremented and
/// the object value is pushed to `values_added` of the corresponding entry in
/// `subject_changes.predicates`.
/// For every removed triple with an already tracked predicate, the
/// cardinality is decremented; the value is pushed to `values_removed` only
/// if `subject_changes.predicates` already holds an entry for that predicate.
///
/// Assumes all triples have the same subject (`subject_iri`).
///
/// Returns `Ok(())`; no error path is visible in this body.
pub fn add_remove_triples_mut(
shape: &Arc<OrmSchemaShape>,
subject_iri: &str,
triples_added: &[&Triple],
triples_removed: &[&Triple],
tracked_subjects: &mut HashMap<String, HashMap<String, OrmTrackedSubject>>,
subject_changes: &mut OrmTrackedSubjectChange,
) -> Result<(), NgError> {
// Looks up the tracked subject for (subject_iri, shape_iri) and inserts a
// fresh one in state `Pending` with no predicates and no parents if missing.
let get_or_create_tracked_subject =
|subject_iri: &str,
shape_iri: &str,
tracked_subjects: &mut HashMap<String, HashMap<String, OrmTrackedSubject>>| {
let tracked_shapes_for_subject = tracked_subjects
.entry(subject_iri.to_string())
.or_insert_with(HashMap::new);
tracked_shapes_for_subject
.entry(shape_iri.to_string())
.or_insert_with(|| OrmTrackedSubject {
tracked_predicates: HashMap::new(),
parents: HashMap::new(),
valid: ng_net::orm::OrmTrackedSubjectValidity::Pending,
subject_iri: subject_iri.to_string(),
// NOTE(review): this is the outer `shape: &Arc<OrmSchemaShape>`, not the
// `shape_iri` argument; the field is declared as `Arc<OrmSchemaShape>`,
// so this presumably needs `Arc::clone(shape)` — TODO confirm.
shape,
})
};
let tracked_subject = get_or_create_tracked_subject(subject_iri, &shape.iri, tracked_subjects);
// Process added triples.
// For each triple, check matching predicates in shape.
// keeping track of value count (for later validations).
// In parallel, we keep track of the values added (tracked_changes)
for triple in triples_added {
for predicate_schema in &shape.predicates {
if predicate_schema.iri != triple.predicate.as_str() {
// Triple does not match predicate.
continue;
}
// Predicate schema constraint matches this triple.
// Add tracked predicate or increase cardinality
let tracked_predicate = tracked_subject
.tracked_predicates
.entry(predicate_schema.iri.to_string())
.or_insert_with(|| OrmTrackedPredicate {
current_cardinality: 0,
schema: predicate_schema,
tracked_children: Vec::new(),
current_literals: None,
});
tracked_predicate.current_cardinality += 1;
let obj_term = oxrdf_term_to_orm_basic_type(&triple.object);
// Keep track of the changed values too.
// NOTE(review): `OrmTrackedPredicateChanges` is not among the `use` lines
// visible at the top of this file — TODO confirm it is imported.
let pred_changes = subject_changes
.predicates
.entry(predicate_schema.iri.clone())
.or_insert_with(|| OrmTrackedPredicateChanges {
tracked_predicate: tracked_predicate, // reference remains inside lifetime of this call
values_added: Vec::new(),
values_removed: Vec::new(),
});
pred_changes.values_added.push(obj_term.clone());
// If value type is literal, we need to add the current value to the tracked predicate.
if tracked_predicate
.schema
.dataTypes
.iter()
.any(|dt| dt.valType == OrmSchemaLiteralType::literal)
{
match &mut tracked_predicate.current_literals {
Some(lits) => lits.push(obj_term),
None => {
tracked_predicate.current_literals = Some(vec![obj_term]);
}
}
}
// If predicate is of type shape, register (parent -> child) links so that
// nested subjects can later be (lazily) fetched / validated.
for shape_iri in predicate_schema
.dataTypes
.iter()
.filter(|dt| dt.valType == OrmSchemaLiteralType::shape)
.flat_map(|dt| dt.shape)
{
if let BasicType::Str(obj_iri) = obj_term {
// Get or create object's tracked subject struct.
// NOTE(review): the closure above takes three arguments but is called with
// two here, and the first one is the triple's predicate rather than the
// object IRI (`obj_iri`) — presumably unfinished; TODO confirm intent.
let tracked_child =
get_or_create_tracked_subject(triple.predicate.as_string(), &shape_iri);
// Add self to parent (set tracked to true, preliminary).
// NOTE(review): as written this inserts the child into its own `parents`
// map, keyed by the object IRI — verify against the intended
// parent/child direction.
tracked_child.parents.insert(obj_iri, tracked_child);
// Add link to children
// NOTE(review): `Weak::from_raw` is only sound for a pointer obtained from
// `Weak::into_raw`; `tracked_child` comes from the closure above (a map
// entry), and `Weak` is not in the visible imports — needs a redesign.
tracked_predicate
.tracked_children
.push(unsafe { Weak::from_raw(tracked_child) });
}
}
}
}
// Process removed triples.
for triple in triples_removed {
let pred_iri = triple.predicate.as_str();
// Only adjust if we had tracked state.
let tracked_predicate_opt = tracked_subjects
.get_mut(subject_iri)
.and_then(|tss| tss.get_mut(&shape.iri))
.and_then(|ts| ts.tracked_predicates.get_mut(pred_iri));
let Some(tracked_predicate) = tracked_predicate_opt else {
continue;
};
// The cardinality might become -1 or 0. We will remove them from the tracked predicates during validation.
// (`saturating_sub` only clamps at the integer type's minimum, so negative values stay possible.)
tracked_predicate.current_cardinality =
tracked_predicate.current_cardinality.saturating_sub(1);
// NOTE(review): if no change entry exists for this predicate (nothing was
// added for it in this call), the removed value is neither recorded in
// `values_removed` nor removed from `current_literals` below.
let Some(pred_changes) = subject_changes.predicates.get_mut(pred_iri) else {
continue;
};
let val_removed = oxrdf_term_to_orm_basic_type(&triple.object);
pred_changes.values_removed.push(val_removed.clone());
// If value type is literal, we need to remove the current value from the tracked predicate.
if tracked_predicate
.schema
.dataTypes
.iter()
.any(|dt| dt.valType == OrmSchemaLiteralType::literal)
{
if let Some(current_literals) = &mut tracked_predicate.current_literals {
// Remove obj_val from current_literals in-place
current_literals.retain(|val| *val != val_removed);
} else {
// NOTE(review): this stores the *removed* value as the only current
// literal when none were tracked — looks inverted; TODO confirm.
tracked_predicate.current_literals = Some(vec![val_removed]);
}
} else if tracked_predicate
.schema
.dataTypes
.iter()
.any(|dt| dt.valType == OrmSchemaLiteralType::shape)
{
// Remove parent from child and child from tracked children.
for shape_iri in tracked_predicate
.schema
.dataTypes
.iter()
.filter(|dt| dt.valType == OrmSchemaLiteralType::shape)
.flat_map(|dt| dt.shape)
{
// Nested shape removal logic is currently disabled: the loop body is empty,
// so parent/child links are not cleaned up on removal.
}
}
}
Ok(())
}
/// Evaluates one subject against `shape` and derives its new validity.
///
/// The checks run in this order: parent tracking state, "no changes"
/// short-circuit, parent cycle detection, then per-predicate validation
/// (cardinality, required literals, nested shapes, value data types).
///
/// Returns the new validity and a list of `(subject_iri, shape_iri)` pairs
/// that need to be (re-)evaluated. If the subject or its shape entry is not
/// tracked, or if `s_change` has no predicate changes, `previous_validity`
/// is returned with an empty list.
///
/// Assumes all recorded changes belong to the same subject.
///
/// NOTE(review): this body does not compile as shown — `tracked_subjects` is
/// a shared reference but `get_mut` is called on it, `subject_iri` is not
/// defined in this function (presumably `s_change.subject_iri`), and
/// `OrmTrackedSubject` is given a lifetime argument here.
pub fn update_subject_validity<'a>(
s_change: &'a OrmTrackedSubjectChange<'a>,
shape: &OrmSchemaShape,
tracked_subjects: &HashMap<String, HashMap<String, OrmTrackedSubject<'a>>>,
previous_validity: OrmTrackedSubjectValidity,
) -> (
OrmTrackedSubjectValidity,
// Vec<subject_iri, shape_iri>
Vec<(String, String)>,
) {
// Nothing to evaluate if the (subject, shape) pair is not tracked.
let Some(tracked_shapes) = tracked_subjects.get_mut(subject_iri) else {
return (previous_validity, vec![]);
};
let Some(tracked_subject) = tracked_shapes.get_mut(&shape.iri) else {
return (previous_validity, vec![]);
};
// Check 1) Check if we need to fetch this object.
// If this subject has not been monitored but parents are now valid or need evaluation, we fetch.
// Check 2) If all parents are untracked (or invalid), return untracked.
if tracked_subject.parents.len() != 0 {
let no_parents_tracking = tracked_subject.parents.values().all(|parent| {
parent.valid == OrmTrackedSubjectValidity::Untracked
|| parent.valid == OrmTrackedSubjectValidity::Invalid
});
if no_parents_tracking {
// Remove tracked predicates and set untracked.
tracked_subject.tracked_predicates = HashMap::new();
tracked_subject.valid = OrmTrackedSubjectValidity::Untracked;
return (OrmTrackedSubjectValidity::Untracked, vec![]);
} else if !no_parents_tracking && previous_validity == OrmTrackedSubjectValidity::Untracked
{
// We need to fetch the subject's current state:
// We have new parents but were previously not recording changes.
// TODO
}
}
// Check 3) If there are no changes, there is nothing to do.
if s_change.predicates.is_empty() {
return (previous_validity, vec![]);
}
let mut new_validity = OrmTrackedSubjectValidity::Valid;
// Overwrites `current` with `new_val`; both branches assign the same value,
// so a later non-invalid value can replace an earlier `Invalid`.
fn set_validity(current: &mut OrmTrackedSubjectValidity, new_val: OrmTrackedSubjectValidity) {
if new_val == OrmTrackedSubjectValidity::Invalid {
*current = OrmTrackedSubjectValidity::Invalid;
} else {
*current = new_val;
}
}
// Check 4) If there is an infinite loop of parents pointing back to us, return invalid.
// Create a set of visited parents to detect cycles.
if has_cycle(tracked_subject, &mut HashSet::new()) {
// Remove tracked predicates and set invalid.
tracked_subject.tracked_predicates = HashMap::new();
tracked_subject.valid = OrmTrackedSubjectValidity::Invalid;
return (OrmTrackedSubjectValidity::Invalid, vec![]);
}
// Keep track of objects that need to be validated against a shape to fetch and validate.
// NOTE(review): declared as pairs, but the pushes below add a third `bool`
// element (fetch flag?) — the element type needs to be settled.
let mut need_evaluation: Vec<(String, String)> = vec![];
// Check 5) Validate subject against each predicate in shape.
for p_schema in shape.predicates.iter() {
// Only predicates with a recorded change carry a tracked predicate here;
// for all others `count` falls back to 0.
let p_change = s_change.predicates.get(&p_schema.iri);
let tracked_pred = p_change.map(|pc| pc.tracked_predicate);
let count =
tracked_pred.map_or_else(|| 0, |tp: &OrmTrackedPredicate| tp.current_cardinality);
// Check 5.1) Cardinality too low.
if count < p_schema.minCardinality {
set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid);
if count <= 0 {
// If cardinality is 0, we can remove the tracked predicate.
tracked_subject.tracked_predicates.remove(&p_schema.iri);
}
break;
// Check 5.2) Cardinality too high and extra values not allowed.
// A `maxCardinality` of -1 is treated as "no upper bound" by this condition.
} else if count > p_schema.maxCardinality
&& p_schema.maxCardinality != -1
&& p_schema.extra != Some(true)
{
// If cardinality is too high and no extra allowed, invalid.
set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid);
break;
// Check 5.3) Required literals present.
} else if p_schema
.dataTypes
.iter()
.any(|dt| dt.valType == OrmSchemaLiteralType::literal)
{
// If we have literals, check if all required literals are present.
// At least one datatype must match.
let some_valid =
p_schema
.dataTypes
.iter()
.flat_map(|dt| &dt.literals)
.any(|required_literals| {
// Early stop: If no extra values allowed but the sizes
// between required and given values mismatches.
if !p_schema.extra.unwrap_or(false)
&& ((required_literals.len() as i32)
!= tracked_pred.map_or(0, |p| p.current_cardinality))
{
return false;
}
// Check that each required literal is present.
for required_literal in required_literals {
// Is tracked predicate present?
if !tracked_pred
.iter()
.flat_map(|tp| &tp.current_literals)
.flatten()
.any(|literal| *literal == *required_literal)
{
return false;
}
}
// All required literals present.
return true;
});
// NOTE(review): unlike the other invalid paths there is no `break` here, so
// a later predicate may overwrite `Invalid` via `set_validity`.
if !some_valid {
set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid);
}
// Check 5.4) Nested shape correct.
} else if p_schema
.dataTypes
.iter()
.any(|dt| dt.valType == OrmSchemaLiteralType::shape)
{
// If we have a nested shape, we need to check if the nested objects are tracked and valid.
// First, count children per state as (valid, invalid, pending, untracked);
// children whose weak link can no longer be upgraded are not counted.
let counts = tracked_pred
.iter()
.flat_map(|tp| tp.tracked_children)
.map(|tc| {
tc.upgrade().map(|tc| {
if tc.valid == OrmTrackedSubjectValidity::Valid {
(1, 0, 0, 0)
} else if tc.valid == OrmTrackedSubjectValidity::Invalid {
(0, 1, 0, 0)
} else if tc.valid == OrmTrackedSubjectValidity::Pending {
(0, 0, 1, 0)
} else if tc.valid == OrmTrackedSubjectValidity::Untracked {
(0, 0, 0, 1)
} else {
(0, 0, 0, 0)
}
})
})
.flatten()
.fold((0, 0, 0, 0), |(v1, i1, u1, ut1), o| {
(v1 + o.0, i1 + o.1, u1 + o.2, ut1 + o.3)
});
if counts.1 > 0 && p_schema.extra != Some(true) {
// If we have at least one invalid nested object and no extra allowed, invalid.
set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid);
break;
} else if counts.0 < p_schema.minCardinality {
// If we have not enough valid nested objects, invalid.
set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid);
break;
} else if counts.3 > 0 {
// If we have untracked nested objects, we need to fetch them and validate.
set_validity(&mut new_validity, OrmTrackedSubjectValidity::Pending);
// After that we need to reevaluate this (subject,shape) again.
need_evaluation.push((subject_iri.to_string(), shape.iri.clone(), false));
// Also schedule untracked children for fetching and validation.
if let Some(tp) = tracked_pred {
for weak_child in &tp.tracked_children {
if let Some(child) = weak_child.upgrade() {
if child.valid == OrmTrackedSubjectValidity::Untracked {
need_evaluation.push((
child.subject_iri.clone(),
child.shape.iri.clone(),
true,
));
}
}
}
}
} else if counts.2 > 0 {
// If we have pending nested objects, we need to wait for their evaluation.
set_validity(&mut new_validity, OrmTrackedSubjectValidity::Pending);
// Schedule pending children for re-evaluation without fetch.
if let Some(tp) = tracked_pred {
for weak_child in &tp.tracked_children {
if let Some(child) = weak_child.upgrade() {
if child.valid == OrmTrackedSubjectValidity::Pending {
need_evaluation.push((
child.subject_iri.clone(),
child.shape.iri.clone(),
false,
));
}
}
}
}
} else {
// All nested objects are valid and cardinality is correct.
// We are valid with this predicate.
}
// Check 5.5) Data types correct.
} else {
// Check if the data type is correct.
let allowed_types: Vec<&OrmSchemaLiteralType> =
p_schema.dataTypes.iter().map(|dt| &dt.valType).collect();
// For each new value, check that data type is in allowed_types.
// Strings are accepted for both `string` and `iri` value types.
for val_added in p_change.iter().map(|pc| pc.values_added).flatten() {
let matches = match val_added {
BasicType::Bool(_) => allowed_types
.iter()
.any(|t| **t == OrmSchemaLiteralType::boolean),
BasicType::Num(_) => allowed_types
.iter()
.any(|t| **t == OrmSchemaLiteralType::number),
BasicType::Str(_) => allowed_types.iter().any(|t| {
**t == OrmSchemaLiteralType::string || **t == OrmSchemaLiteralType::iri
}),
};
if !matches {
set_validity(&mut new_validity, OrmTrackedSubjectValidity::Invalid);
break;
}
}
// Break again if validity has become invalid.
if new_validity == OrmTrackedSubjectValidity::Invalid {
break;
}
};
}
if new_validity == OrmTrackedSubjectValidity::Invalid {
// If we are invalid, we can discard new unknowns again - they won't be kept in memory.
// We need to remove ourself from child objects parents field and
// remove them if no other is tracking.
// Child relationship cleanup disabled (nested tracking disabled in this refactor step)
// Remove tracked predicates and set untracked.
tracked_subject.tracked_predicates = HashMap::new();
// Empty list of children that need evaluation.
need_evaluation.retain(|_| false);
} else if new_validity == OrmTrackedSubjectValidity::Valid
&& previous_validity != OrmTrackedSubjectValidity::Valid
{
// If this subject became valid, we need to refetch this subject;
// For that we prepend self to needs_fetch.
// NOTE(review): `s_change` is borrowed, so moving `subject_iri` out of it
// presumably needs a `.clone()` — TODO confirm.
need_evaluation.insert(0, (s_change.subject_iri, shape.iri.clone()));
}
// If validity changed, parents need to be re-evaluated.
// NOTE(review): this branch returns before `tracked_subject.valid` is
// assigned below, so the stored validity is only written when it did not
// change — verify that this is intended.
if new_validity != previous_validity {
// We return the tracking parents which need re-evaluation.
// Remember that the last elements (i.e. children or needs_fetch) are evaluated first.
return (
new_validity,
// Add parents and objects in `need_evaluation`.
// NOTE(review): the `(parent, is_tracked)` patterns match the former
// tuple-valued `parents` map; elsewhere in this function the values are
// used as plain tracked subjects — the two usages disagree.
tracked_subject
.parents
.values()
// Inform tracking parents only.
.filter(|(parent, is_tracked)| *is_tracked)
.map(|(parent, is_tracked)| (&parent.subject_iri, parent.shape))
// Add `need_evaluation`.
.chain(need_evaluation)
.collect(),
);
}
tracked_subject.valid = new_validity;
return (new_validity, need_evaluation);
}
/// Converts an oxrdf term into an ORM `BasicType`.
///
/// The term is first mapped with `oxrdf_term_to_orm_term`; numbers and
/// booleans keep their kind, while references are folded into strings.
fn oxrdf_term_to_orm_basic_type(term: &ng_oxigraph::oxrdf::Term) -> BasicType {
    use ng_net::orm::Term as OrmTerm;

    let converted = oxrdf_term_to_orm_term(term);
    match converted {
        OrmTerm::Bool(flag) => BasicType::Bool(flag),
        OrmTerm::Num(number) => BasicType::Num(number),
        // Treat IRIs as strings, exactly like plain string terms.
        OrmTerm::Str(text) | OrmTerm::Ref(text) => BasicType::Str(text),
    }
}
/// Returns `true` if following `parents` links upwards from `subject` leads
/// back to a subject IRI that is already on the current path.
///
/// `visited` holds the subject IRIs on the path walked so far; pass an empty
/// set for the initial call. An IRI is removed again when its subtree turned
/// out to be cycle-free, so a subject reachable through two different parent
/// paths is not reported as a cycle.
fn has_cycle(subject: &OrmTrackedSubject, visited: &mut HashSet<String>) -> bool {
    // `insert` returns `false` when the IRI is already on the current path.
    if !visited.insert(subject.subject_iri.clone()) {
        return true;
    }
    // `parents` maps parent IRI -> tracked parent subject.
    let cycle_found = subject
        .parents
        .values()
        .any(|parent_subject| has_cycle(parent_subject, visited));
    if !cycle_found {
        // Backtrack: this subject is no longer part of the current path.
        visited.remove(&subject.subject_iri);
    }
    cycle_found
}
/// Converts an oxrdf::Term to an orm::Term
///
/// * Named nodes and blank nodes become `Term::Ref` holding their `as_str()` value.
/// * Literals are mapped by XSD datatype: `boolean` to `Term::Bool`
///   (lexical forms `true`, `false`, `1`, `0`), the numeric datatypes listed
///   below to `Term::Num` (parsed as `f64`), everything else to `Term::Str`.
///   A literal whose lexical value cannot be parsed for its datatype falls
///   back to `Term::Str` with the raw lexical value.
/// * RDF-star triples become `Term::Str` with their string representation.
fn oxrdf_term_to_orm_term(term: &ng_oxigraph::oxrdf::Term) -> ng_net::orm::Term {
    use ng_net::orm::Term as OrmTerm;
    use ng_oxigraph::oxrdf::Term as RdfTerm;

    const XSD_NS: &str = "http://www.w3.org/2001/XMLSchema#";

    match term {
        RdfTerm::NamedNode(node) => OrmTerm::Ref(node.as_str().to_string()),
        RdfTerm::BlankNode(node) => OrmTerm::Ref(node.as_str().to_string()),
        RdfTerm::Literal(literal) => {
            let lexical = literal.value();
            let datatype = literal.datatype();
            // `None` means "no typed interpretation"; handled as a string below.
            let typed = match datatype.as_str().strip_prefix(XSD_NS) {
                // xsd:boolean allows the lexical forms true/false and 1/0.
                Some("boolean") => match lexical {
                    "true" | "1" => Some(OrmTerm::Bool(true)),
                    "false" | "0" => Some(OrmTerm::Bool(false)),
                    _ => None,
                },
                Some(
                    "integer"
                    | "decimal"
                    | "double"
                    | "float"
                    | "int"
                    | "long"
                    | "short"
                    | "byte"
                    | "unsignedInt"
                    | "unsignedLong"
                    | "unsignedShort"
                    | "unsignedByte"
                    | "nonNegativeInteger"
                    | "positiveInteger"
                    | "nonPositiveInteger"
                    | "negativeInteger",
                ) => lexical.parse::<f64>().ok().map(OrmTerm::Num),
                // xsd:string and every other datatype are kept as strings.
                _ => None,
            };
            typed.unwrap_or_else(|| OrmTerm::Str(lexical.to_string()))
        }
        RdfTerm::Triple(triple) => {
            // For RDF-star triples, convert to string representation
            OrmTerm::Str(triple.to_string())
        }
    }
}

@ -19,15 +19,12 @@ use std::fs::create_dir_all;
use std::fs::{read, File, OpenOptions}; use std::fs::{read, File, OpenOptions};
#[cfg(all(not(target_family = "wasm"), not(docsrs)))] #[cfg(all(not(target_family = "wasm"), not(docsrs)))]
use std::io::Write; use std::io::Write;
use std::rc::Weak;
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use async_std::stream::StreamExt; use async_std::stream::StreamExt;
use async_std::sync::{Mutex, RwLockReadGuard}; use async_std::sync::{Mutex, RwLockReadGuard};
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::SinkExt; use futures::SinkExt;
use ng_net::orm::OrmSchemaPredicate;
use ng_net::orm::OrmShapeTypeRef;
use ng_net::orm::OrmSubscription; use ng_net::orm::OrmSubscription;
use ng_oxigraph::oxigraph::sparql::Query; use ng_oxigraph::oxigraph::sparql::Query;
use ng_oxigraph::oxigraph::sparql::QueryResults; use ng_oxigraph::oxigraph::sparql::QueryResults;
@ -115,12 +112,12 @@ pub struct Verifier {
in_memory_outbox: Vec<EventOutboxStorage>, in_memory_outbox: Vec<EventOutboxStorage>,
uploads: BTreeMap<u32, RandomAccessFile>, uploads: BTreeMap<u32, RandomAccessFile>,
branch_subscriptions: HashMap<BranchId, Sender<AppResponse>>, branch_subscriptions: HashMap<BranchId, Sender<AppResponse>>,
pub(crate) orm_tracked_subjects: pub(crate) orm_subscriptions: HashMap<NuriV0, OrmSubscription>,
HashMap<NuriV0, HashMap<String, HashMap<u64, OrmSubscription>>>,
pub(crate) orm_shape_types: HashMap<String, OrmShapeTypeRef>,
pub(crate) temporary_repo_certificates: HashMap<RepoId, ObjectRef>, pub(crate) temporary_repo_certificates: HashMap<RepoId, ObjectRef>,
} }
type SessionId = u64;
impl fmt::Debug for Verifier { impl fmt::Debug for Verifier {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "Verifier\nconfig: {:?}", self.config)?; writeln!(f, "Verifier\nconfig: {:?}", self.config)?;
@ -523,8 +520,7 @@ impl Verifier {
inner_to_outer: HashMap::new(), inner_to_outer: HashMap::new(),
uploads: BTreeMap::new(), uploads: BTreeMap::new(),
branch_subscriptions: HashMap::new(), branch_subscriptions: HashMap::new(),
orm_tracked_subjects: HashMap::new(), orm_subscriptions: HashMap::new(),
orm_shape_types: HashMap::new(),
temporary_repo_certificates: HashMap::new(), temporary_repo_certificates: HashMap::new(),
} }
} }
@ -2816,8 +2812,7 @@ impl Verifier {
inner_to_outer: HashMap::new(), inner_to_outer: HashMap::new(),
uploads: BTreeMap::new(), uploads: BTreeMap::new(),
branch_subscriptions: HashMap::new(), branch_subscriptions: HashMap::new(),
orm_tracked_subjects: HashMap::new(), orm_subscriptions: HashMap::new(),
orm_shape_types: HashMap::new(),
temporary_repo_certificates: HashMap::new(), temporary_repo_certificates: HashMap::new(),
}; };
// this is important as it will load the last seq from storage // this is important as it will load the last seq from storage

Loading…
Cancel
Save