From 27ec9302a6d22defb3059e0938410234eb47d87c Mon Sep 17 00:00:00 2001 From: Laurin Weger Date: Wed, 1 Oct 2025 02:34:06 +0200 Subject: [PATCH] wip: apply_changes_from_triples --- nextgraph/src/tests/orm.rs | 4 +- ng-net/src/orm.rs | 78 +++- ng-verifier/src/orm.rs | 708 ++++++++++++++++++++++++++++++++---- ng-verifier/src/verifier.rs | 11 +- 4 files changed, 716 insertions(+), 85 deletions(-) diff --git a/nextgraph/src/tests/orm.rs b/nextgraph/src/tests/orm.rs index 499d7c5..d989985 100644 --- a/nextgraph/src/tests/orm.rs +++ b/nextgraph/src/tests/orm.rs @@ -18,7 +18,7 @@ use ng_verifier::orm::sparql_construct_from_orm_shape_type; use std::collections::HashMap; #[async_std::test] -async fn test_create_sparql_from_schema() { +#async fn test_create_sparql_from_schema() { // Setup wallet and document let (_wallet, session_id) = create_or_open_wallet().await; let doc_nuri = doc_create( @@ -536,5 +536,5 @@ INSERT DATA { // Assert: All 6 triples (3 per person) should be returned. log_info!("Triples:\n{:?}", triples); - assert_eq!(triples.len(), 24); + assert_eq!(triples.len(), 6); } diff --git a/ng-net/src/orm.rs b/ng-net/src/orm.rs index 362bd16..2d6a6de 100644 --- a/ng-net/src/orm.rs +++ b/ng-net/src/orm.rs @@ -11,12 +11,15 @@ #![allow(non_snake_case)] -use std::collections::HashMap; +use std::{collections::HashMap, rc::Weak}; use serde::{Deserialize, Serialize}; use serde_json::Value; +use crate::app_protocol::AppResponse; +use crate::utils::Sender; + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct OrmShapeType { pub schema: OrmSchema, @@ -68,18 +71,18 @@ pub enum OrmSchemaLiteralType { shape, } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[serde(untagged)] -pub enum OrmSchemaLiterals { +pub enum BasicType { Bool(bool), - NumArray(Vec), - StrArray(Vec), + Num(f64), + Str(String), } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct OrmSchemaDataType { pub valType: OrmSchemaLiteralType, - pub literals: Option, + pub literals: Option>, pub shape: Option, } @@ -89,11 +92,66 @@ pub struct OrmSchemaPredicate { pub iri: String, pub readablePredicate: String, /// `-1` for infinity - pub maxCardinality: i64, - pub minCardinality: i64, + pub maxCardinality: i32, + pub minCardinality: i32, pub extra: Option, } +#[derive(Clone, Debug)] +pub struct OrmSubscription<'a> { + pub sender: Sender, + pub tracked_objects: HashMap>, +} + +#[derive(Clone, Debug)] +pub struct OrmTrackedSubject<'a> { + pub tracked_predicates: HashMap>, + // Parents and if they are currently tracking us. + pub parents: HashMap, bool)>, + pub valid: OrmTrackedSubjectValidity, + pub subj_iri: &'a String, + pub shape: &'a OrmSchemaShape, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum OrmTrackedSubjectValidity { + Valid, + Invalid, + Unknown, + Untracked, +} + +#[derive(Clone, Debug)] +pub struct OrmTrackedPredicate<'a> { + pub schema: &'a OrmSchemaPredicate, + pub tracked_children: Vec>>, + pub current_cardinality: i32, + pub current_literals: Option>, +} + +// Used only for tracking construction of new objects and diffs +// in parallel to modifying the tracked objects and predicates. 
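// Reviewer sketch (illustrative, not from this patch): roughly how the change-staging
// types below are expected to be filled while triples are applied. Field names come
// from the structs in this hunk; the bindings `tracked_alice` and `tracked_name_pred`
// and the example IRIs are hypothetical.
//
//   let mut change = OrmTrackedSubjectChange {
//       subject_iri: "did:ng:example#alice".to_string(),
//       predicates: HashMap::new(),
//       valid: OrmTrackedSubjectValidity::Unknown,
//       tracked_subject: &tracked_alice,
//   };
//   change
//       .predicates
//       .entry("http://schema.org/name".to_string())
//       .or_insert_with(|| OrmTrackedPredicateChanges {
//           tracked_predicate: &tracked_name_pred,
//           values_added: vec![BasicType::Str("Alice".to_string())],
//           values_removed: vec![],
//           validity: OrmTrackedSubjectValidity::Unknown,
//       });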
+pub struct OrmTrackedSubjectChange<'a> { + pub subject_iri: String, + pub predicates: HashMap>, + pub valid: OrmTrackedSubjectValidity, + pub tracked_subject: &'a OrmTrackedSubject<'a>, +} +pub struct OrmTrackedPredicateChanges<'a> { + pub tracked_predicate: &'a OrmTrackedPredicate<'a>, + pub values_added: Vec, + pub values_removed: Vec, + pub validity: OrmTrackedSubjectValidity, +} + +#[derive(Clone, Debug)] +pub enum Term { + Str(String), + Num(f64), + Bool(bool), + Ref(String), +} + impl Default for OrmSchemaDataType { fn default() -> Self { Self { @@ -120,6 +178,6 @@ impl Default for OrmSchemaPredicate { /** == Internal data types == */ #[derive(Clone, Debug)] pub struct OrmShapeTypeRef { - ref_count: u64, - shape_type: OrmShapeType, + pub ref_count: u64, + pub shape_type: OrmShapeType, } diff --git a/ng-verifier/src/orm.rs b/ng-verifier/src/orm.rs index 183c7e8..c57bc03 100644 --- a/ng-verifier/src/orm.rs +++ b/ng-verifier/src/orm.rs @@ -8,21 +8,37 @@ // according to those terms. use std::collections::HashMap; +use std::collections::HashSet; +use std::hash::Hash; +use std::rc::Weak; +use async_std::task::current; use futures::channel::mpsc; use futures::SinkExt; use lazy_static::lazy_static; +use ng_net::orm::BasicType; pub use ng_net::orm::OrmDiff; +use ng_net::orm::OrmSchemaLiteralType; pub use ng_net::orm::OrmShapeType; +use ng_net::orm::OrmShapeTypeRef; +use ng_net::orm::OrmSubscription; +use ng_net::orm::OrmTrackedPredicate; +use ng_net::orm::OrmTrackedPredicateChanges; +use ng_net::orm::OrmTrackedSubject; +use ng_net::orm::OrmTrackedSubjectChange; +use ng_net::orm::OrmTrackedSubjectValidity; +use ng_net::orm::Term; use ng_net::orm::{OrmSchemaDataType, OrmSchemaShape}; -use ng_net::orm::{OrmSchemaLiteralType, OrmSchemaLiterals}; use ng_net::{app_protocol::*, orm::OrmSchema}; use ng_net::{ types::*, utils::{Receiver, Sender}, }; use ng_oxigraph::oxigraph::sparql::{results::*, Query, QueryResults}; +use ng_oxigraph::oxrdf::LiteralRef; +use ng_oxigraph::oxrdf::NamedNode; +use ng_oxigraph::oxrdf::Subject; use ng_oxigraph::oxrdf::Term; use ng_oxigraph::oxrdf::Triple; use ng_repo::errors::NgError; @@ -72,48 +88,514 @@ impl Verifier { } } - pub fn sparql_select( - &self, - query: String, - nuri: Option, - ) -> Result>>, NgError> { - let oxistore = self.graph_dataset.as_ref().unwrap(); + fn apply_changes_from_triples( + &mut self, + scope: &NuriV0, + schema: &OrmSchema, + shape: &String, + triples_added: &Vec, + triples_removed: &Vec, + ) { + let tracked_subjects: HashMap> = + self.orm_tracked_subjects; + // Structure to store changes in. + let mut subject_changes: HashMap = HashMap::new(); + + // Group triples by predicate (only keep predicates defined in the shape). Drop others. 
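// Reviewer sketch (illustrative, not from this patch): the grouping step below follows
// this pattern, shown with plain (subject, predicate, object) string triples instead of
// oxrdf::Triple; the helper name is hypothetical.
fn group_by_allowed_predicate<'a>(
    triples: &'a [(String, String, String)],
    allowed: &std::collections::HashSet<&str>,
) -> std::collections::HashMap<String, Vec<&'a (String, String, String)>> {
    let mut grouped: std::collections::HashMap<String, Vec<&(String, String, String)>> =
        std::collections::HashMap::new();
    for t in triples {
        // Keep only predicates that the shape declares; drop everything else.
        if allowed.contains(t.1.as_str()) {
            grouped.entry(t.1.clone()).or_default().push(t);
        }
    }
    grouped
}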
+ let mut added_triples_by_pred: HashMap> = HashMap::new(); + let Some(shape_def) = schema.get(shape) else { + log_err!( + "Shape {} not found in schema when grouping triples by predicate", + shape + ); + return; + }; - // let graph_nuri = NuriV0::repo_graph_name( - // &update.repo_id, - // &update.overlay_id, - // ); + // Collect allowed predicate IRIs for this shape + let allowed: std::collections::HashSet<&str> = shape_def + .predicates + .iter() + .map(|p| p.iri.as_str()) + .collect(); + + for triple in triples_added { + if allowed.contains(triple.predicate.as_str()) { + added_triples_by_pred + .entry(triple.predicate.as_str().to_string()) + .or_insert_with(|| vec![]) + .push(*triple); + } + } - //let base = NuriV0::repo_id(&repo.id); - let nuri_str = nuri.as_ref().map(|s| s.as_str()); + // Based on those triples, group by subject. + let mut added_triples_by_subject: HashMap> = HashMap::new(); + for triple in triples_added { + let subject_iri = match &triple.subject { + Subject::NamedNode(node) => node.as_str(), + _ => continue, // Won't happen. + }; + added_triples_by_subject + .entry(subject_iri.to_string()) + .or_insert_with(|| vec![]) + .push(triple.clone()); + } - let parsed = - Query::parse(&query, nuri_str).map_err(|e| NgError::OxiGraphError(e.to_string()))?; - let results = oxistore - .query(parsed, None) - .map_err(|e| NgError::OxiGraphError(e.to_string()))?; - let sols = match results { - QueryResults::Solutions(sols) => { - let mut results = vec![]; - for t in sols { - match t { - Err(e) => { - log_err!("{}", e.to_string()); - return Err(NgError::SparqlError(e.to_string())); + // Do the same for removed ones. + let mut removed_triples_by_pred: HashMap> = HashMap::new(); + // Collect allowed predicate IRIs for this shape + let allowed: std::collections::HashSet<&str> = shape_def + .predicates + .iter() + .map(|p| p.iri.as_str()) + .collect(); + + for triple in triples_removed { + if allowed.contains(triple.predicate.as_str()) { + removed_triples_by_pred + .entry(triple.predicate.as_str().to_string()) + .or_insert_with(|| vec![]) + .push(*triple); + } + } + let mut removed_triples_by_subject: HashMap> = HashMap::new(); + for triple in triples_removed { + let subject_iri = match &triple.subject { + Subject::NamedNode(node) => node.as_str(), + _ => continue, // Won't happen. + }; + removed_triples_by_subject + .entry(subject_iri.to_string()) + .or_insert_with(|| vec![]) + .push(triple.clone()); + } + + // Assumes all triples have same subject. + fn orm_from_triple_for_level<'a>( + shape: &OrmSchemaShape, + subject_iri: &String, + triples_added: &Vec, + triples_removed: &Vec, + tracked_subjects: &HashMap>>, + changes: &HashMap, + ) -> ( + Vec<&'a OrmTrackedPredicateChanges<'a>>, + Vec<&'a OrmTrackedPredicateChanges<'a>>, + ) { + let tracked_shapes_for_subject = tracked_subjects + .entry(subject_iri.clone()) + .or_insert_with(|| HashMap::new()); + + let tracked_subject = tracked_shapes_for_subject + .entry(subject_iri.clone()) + .or_insert_with(|| OrmTrackedSubject { + tracked_predicates: HashMap::new(), + parents: HashMap::new(), + valid: ng_net::orm::OrmTrackedSubjectValidity::Unknown, + subj_iri: subject_iri, + shape, + }); + + let subject_changes = + changes + .entry(subject_iri.clone()) + .or_insert_with(|| OrmTrackedSubjectChange { + subject_iri: subject_iri.clone(), + predicates: HashMap::new(), + tracked_subject, + valid: OrmTrackedSubjectValidity::Unknown, + }); + + // Keep track of all children that were spotted or removed. 
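// Reviewer sketch (illustrative, not from this patch): the added-triples loop below boils
// down to this bookkeeping per matching predicate; the map stands in for the per-predicate
// `current_cardinality` and staged `values_added` kept on the tracked structures, and the
// helper name is hypothetical.
fn record_added(
    state: &mut std::collections::HashMap<String, (i32, Vec<String>)>,
    pred_iri: &str,
    value: &str,
) {
    // One new occurrence of this predicate for the current subject, plus the staged
    // value for the diff.
    let entry = state.entry(pred_iri.to_string()).or_insert((0, Vec::new()));
    entry.0 += 1;
    entry.1.push(value.to_string());
}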
+ let mut children_removed: Vec<&OrmTrackedPredicateChanges> = vec![]; + let mut children_added: Vec<&OrmTrackedPredicateChanges> = vec![]; + + // For each triple, check matching predicates in shape. + // keeping track of value count (for later validations). + // In parallel, we keep track of the values added (tracked_changes) + for triple in triples_added { + for schema_predicate in &shape.predicates { + if schema_predicate.iri != triple.predicate.as_str() { + // Triple does not match predicate. + continue; + } + // Predicate schema constraint matches this triple. + + // Add tracked predicate or increase cardinality + let tp = tracked_subject + .tracked_predicates + .entry(schema_predicate.iri.to_string()) + .or_insert_with(|| OrmTrackedPredicate { + current_cardinality: 0, + schema: schema_predicate, + tracked_children: Vec::new(), + current_literals: None, + }); + tp.current_cardinality += 1; + + let obj_term = oxrdf_term_to_orm_basic_type(&triple.object); + + // Keep track of the changed values too. + let pred_changes = subject_changes + .predicates + .entry(schema_predicate.iri.clone()) + .or_insert_with(|| OrmTrackedPredicateChanges { + tracked_predicate: &tp, + values_added: Vec::new(), + values_removed: Vec::new(), + validity: OrmTrackedSubjectValidity::Unknown, + }); + + pred_changes.values_added.push(obj_term.clone()); + + // If value type is literal, we need to add the current value to the tracked predicate. + if tp + .schema + .dataTypes + .iter() + .any(|dt| dt.valType == OrmSchemaLiteralType::literal) + { + if let Some(current_literals) = &mut tp.current_literals { + current_literals.push(obj_term); + } else { + tp.current_literals = Some(vec![obj_term]); } - Ok(querysol) => results.push(querysol.values().to_vec()), + } else if tp + .schema + .dataTypes + .iter() + .any(|dt| dt.valType == OrmSchemaLiteralType::shape) + { + // For nested, add object to tracked predicates and add self as parent. + children_added.push(&pred_changes); } } - Ok(results) } - _ => return Err(NgError::InvalidResponse), - }; - sols + + // Removed triples + for triple in triples_removed { + let pred_iri = triple.predicate.as_str(); + + // Only adjust if we had tracked state. + let Some(tp) = tracked_subjects + .get_mut(subject_iri) + .map(|tss| tss.get(&shape.iri)) + .flatten() + .map(|ts| ts.tracked_predicates.get(pred_iri)) + .flatten() + else { + continue; + }; + + // The cardinality might become -1 or 0. We will remove them from the tracked predicates during validation. + tp.current_cardinality -= 1; + + let Some(pred_changes) = subject_changes.predicates.get(pred_iri) else { + continue; + }; + + let val_removed = oxrdf_term_to_orm_basic_type(&triple.object); + pred_changes.values_removed.push(val_removed.clone()); + + // If value type is literal, we need to remove the current value from the tracked predicate. + if tp + .schema + .dataTypes + .iter() + .any(|dt| dt.valType == OrmSchemaLiteralType::literal) + { + if let Some(current_literals) = &mut tp.current_literals { + // Remove obj_val from current_literals in-place + current_literals.retain(|val| *val != val_removed); + } else { + tp.current_literals = Some(vec![val_removed]); + } + } else if tp + .schema + .dataTypes + .iter() + .any(|dt| dt.valType == OrmSchemaLiteralType::shape) + { + // For nested, add object to tracked predicates and add self as parent. 
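// Reviewer sketch (illustrative, not from this patch): the removal path mirrors the add
// path, decrementing the tracked cardinality and dropping the value from the current
// literals; simplified stand-in types and a hypothetical helper name.
fn record_removed(
    state: &mut std::collections::HashMap<String, (i32, Vec<String>)>,
    pred_iri: &str,
    value: &str,
) {
    if let Some(entry) = state.get_mut(pred_iri) {
        // Cardinality may drop to 0 or below here; validation cleans this up later.
        entry.0 -= 1;
        entry.1.retain(|v| v != value);
    }
}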
+ children_removed.push(&pred_changes); + } + } + + return (children_added, children_removed); + } + + fn check_subject_validity<'a>( + s_change: &'a OrmTrackedSubjectChange<'a>, + shape: &String, + schema: &'a OrmSchema, + previous_validity: OrmTrackedSubjectValidity, + ) -> ( + OrmTrackedSubjectValidity, + HashMap>>, + ) { + if s_change.predicates.is_empty() { + // There has not been any changes. There is nothing to do. + return (previous_validity, HashMap::new()); + } + + let previous_validity = s_change.valid; + let mut new_validity = OrmTrackedSubjectValidity::Valid; + // Helper to set own validity which does not overwrite worse invalids. + let mut set_validity = |new_val: OrmTrackedSubjectValidity| { + if new_val == OrmTrackedSubjectValidity::Invalid { + new_validity = OrmTrackedSubjectValidity::Invalid; + // Remove all tracked predicates + s_change.tracked_subject.tracked_predicates = HashMap::new(); + } else if new_val == OrmTrackedSubjectValidity::Unknown + && new_validity != OrmTrackedSubjectValidity::Invalid + { + new_validity = OrmTrackedSubjectValidity::Unknown; + } + }; + + let tracked_subject = s_change.tracked_subject; + let shape = schema.get(shape).expect("Shape not available"); + + // TODO: Check parent validities: + // If no parent is tracking us, we are untracked. + // If there is an infinite loop of parents pointing back to use, return invalid. + + // Keep track of objects that need to be validated against a shape to fetch and validate. + let mut new_unknowns: Vec<(&String, &OrmSchemaShape)> = vec![]; + + for p_schema in shape.predicates.iter() { + let p_change = s_change.predicates.get(&p_schema.iri); + let tracked_pred = p_change.map(|pc| pc.tracked_predicate); + + let count = tracked_pred + .map_or_else(|| 0, |tp: &OrmTrackedPredicate<'_>| tp.current_cardinality); + + if count < p_schema.minCardinality { + set_validity(OrmTrackedSubjectValidity::Invalid); + if count <= 0 { + // If no other parent is tracking, remove all tracked predicates. + tracked_subject.tracked_predicates.remove(&p_schema.iri); + } + } else if count > p_schema.maxCardinality + && p_schema.maxCardinality != -1 + && p_schema.extra != Some(true) + { + // If cardinality is too high and no extra allowed, invalid. + set_validity(OrmTrackedSubjectValidity::Invalid); + break; + } else if p_schema + .dataTypes + .iter() + .any(|dt| dt.valType == OrmSchemaLiteralType::literal) + { + // If we have literals, check if all required literals are present. + let required_literals: Vec = p_schema + .dataTypes + .iter() + .flat_map(|dt| dt.literals) + .flatten() + .collect(); + + // Early stop: If no extra values allowed but the sizes + // between required and given values mismatches. + if !p_schema.extra.unwrap_or(false) + && (required_literals.len().into() + != tracked_pred.map_or(0, |p| p.current_cardinality)) + { + set_validity(OrmTrackedSubjectValidity::Invalid); + break; + } + + // Check that each required literal is present. + for required_literal in required_literals { + // Is tracked predicate present? + if !tracked_pred + .iter() + .flat_map(|tp| tp.current_literals) + .flatten() + .any(|literal| literal == required_literal) + { + set_validity(OrmTrackedSubjectValidity::Invalid); + break; + } + } + } else if p_schema + .dataTypes + .iter() + .any(|dt| dt.valType == OrmSchemaLiteralType::shape) + { + // If we have a nested shape, we need to check if the nested object is tracked and valid. 
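// Reviewer sketch (illustrative, not from this patch): the child tally computed below can
// be read as a fold over a simplified validity enum (a stand-in for
// OrmTrackedSubjectValidity); the counts are (valid, invalid, unknown, untracked).
enum Validity {
    Valid,
    Invalid,
    Unknown,
    Untracked,
}

fn tally_children(children: &[Validity]) -> (u32, u32, u32, u32) {
    children.iter().fold((0, 0, 0, 0), |(v, i, u, ut), c| match c {
        Validity::Valid => (v + 1, i, u, ut),
        Validity::Invalid => (v, i + 1, u, ut),
        Validity::Unknown => (v, i, u + 1, ut),
        Validity::Untracked => (v, i, u, ut + 1),
    })
}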
+ + // First, Count valid, invalid, unknowns, and untracked + let counts = tracked_pred + .iter() + .flat_map(|tp| tp.tracked_children) + .map(|tc| { + tc.upgrade().map(|tc| { + if tc.valid == OrmTrackedSubjectValidity::Valid { + (1, 0, 0, 0) + } else if tc.valid == OrmTrackedSubjectValidity::Invalid { + (0, 1, 0, 0) + } else if tc.valid == OrmTrackedSubjectValidity::Unknown { + (0, 0, 1, 0) + } else if tc.valid == OrmTrackedSubjectValidity::Untracked { + (0, 0, 0, 1) + } else { + (0, 0, 0, 0) + } + }) + }) + .flatten() + .fold((0, 0, 0, 0), |(v1, i1, u1, ut1), o| { + (v1 + o.0, i1 + o.1, u1 + o.2, ut1 + o.3) + }); + + if counts.1 > 0 && p_schema.extra != Some(true) { + // If we have at least one invalid nested object and no extra allowed, invalid. + set_validity(OrmTrackedSubjectValidity::Invalid); + break; + } else if counts.0 < p_schema.minCardinality { + // If we have not enough valid nested objects, invalid. + set_validity(OrmTrackedSubjectValidity::Invalid); + break; + } else if counts.3 > 0 { + // If we have untracked nested objects, we need to fetch them and validate. + set_validity(OrmTrackedSubjectValidity::Unknown); + // Add them to the list of unknowns to fetch and validate. + for o in tracked_pred + .iter() + .flat_map(|tp| tp.tracked_children.iter()) + { + if let Some(tc) = o.upgrade() { + if tc.valid == OrmTrackedSubjectValidity::Untracked { + new_unknowns.push((tc.subj_iri, tc.shape)); + } + } + } + } else if counts.2 > 0 { + // If we have unknown nested objects, we need to wait for their evaluation. + set_validity(OrmTrackedSubjectValidity::Unknown); + } else { + // All nested objects are valid and cardinality is correct. + // We are valid with this predicate. + } + } else { + // Check if the data type is correct. + let allowed_types: Vec = + p_schema.dataTypes.iter().map(|dt| dt.valType).collect(); + // For each new value, check that data type is in allowed_types. + for val_added in p_change.iter().map(|pc| pc.values_added).flatten() { + let matches = match val_added { + BasicType::Bool(_) => allowed_types + .iter() + .any(|t| *t == OrmSchemaLiteralType::boolean), + BasicType::Num(_) => allowed_types + .iter() + .any(|t| *t == OrmSchemaLiteralType::number), + BasicType::Str(_) => allowed_types.iter().any(|t| { + *t == OrmSchemaLiteralType::string + || *t == OrmSchemaLiteralType::iri + }), + }; + if !matches { + set_validity(OrmTrackedSubjectValidity::Invalid); + break; + } + } + // Break if validity has become invalid. + if new_validity == OrmTrackedSubjectValidity::Invalid { + break; + } + }; + } + + // TODO + // If we are invalid, we can discard new unknowns again - they won't be kept in memory. + if new_validity == OrmTrackedSubjectValidity::Invalid { + return (OrmTrackedSubjectValidity::Invalid, HashMap::new()); + } else if (new_validity == OrmTrackedSubjectValidity::Valid + && previous_validity != OrmTrackedSubjectValidity::Valid) + { + // If the validity is newly valid, we need to refetch this subject. + // TODO + } + // If validity changed, inform parents (add to new_unknowns). + if new_validity != previous_validity { + // TODO + } + + // TODO + return (new_validity, new_unknowns); + } + + let all_subjects: HashSet<&String> = added_triples_by_subject + .keys() + .chain(removed_triples_by_subject.keys()) + .collect(); + + // Process added/removed triples for each subject. 
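// Reviewer sketch (illustrative, not from this patch): the per-subject pass below walks
// the union of subjects touched by either added or removed triples, using an empty batch
// for whichever side is missing; simplified value types and a hypothetical helper name.
fn subjects_to_process<'a>(
    added: &'a std::collections::HashMap<String, Vec<String>>,
    removed: &'a std::collections::HashMap<String, Vec<String>>,
) -> std::collections::HashSet<&'a String> {
    added.keys().chain(removed.keys()).collect()
}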
+ for subject_iri in all_subjects { + let triples_added_for_subj = added_triples_by_subject + .get(subject_iri) + .unwrap_or(&vec![]) + .to_vec(); + let triples_removed_for_subj = removed_triples_by_subject + .get(subject_iri) + .unwrap_or(&vec![]) + .to_vec(); + + let _ = orm_from_triple_for_level( + &shape_def, + &subject_iri, + &triples_added_for_subj, + &triples_removed_for_subj, + &tracked_subjects, + &subject_changes, + ); + } + + // TODO ==== + + // To process validation, we collect all subject changes in one of the buckets. + // Subjects for which we did not apply triples. + let un_processed: HashSet = HashSet::new(); + // Subjects that are invalid. No further processing needed. + let invalids: HashSet<&OrmTrackedSubjectChange> = HashSet::new(); + // Subjects that are valid. Fetch might still be required if newly valid. + let valids: HashSet<&OrmTrackedSubjectChange> = HashSet::new(); + // Will need re-evaluation + let unknowns: HashSet<&OrmTrackedSubjectChange> = HashSet::new(); + // Either because it became tracked again or because it's newly valid. + let needs_fetch: HashSet<&OrmTrackedSubjectChange> = HashSet::new(); + + while !unknowns.is_empty() || !needs_fetch.is_empty() { + // Process buckets by priority and nesting + // First unknown, then needs fetch (the latter could still become invalid). + // Start from from the end because nested objects will less likely need further nested eval. + + // Check validity for each modified subject. + for sc in un_processed { + let tracked_subject = tracked_subjects + .get(sc.tracked_subject.subj_iri) + .unwrap() + .get(shape) + .unwrap(); + let (is_valid, new_unknowns) = + check_subject_validity(s_change, &shape, schema, tracked_subject.valid); + } + if !unknowns.is_empty() { + continue; + } + for sc in needs_fetch { + // TODO: fetch and evaluate. + } + } + // === } - fn create_orm_from_triples(&mut self, scope: &NuriV0, shape_type: &OrmShapeType) {} + // Collect result + // For all valid tracked_subjects, build an object from the tracked_subject_changes. - pub(crate) async fn orm_update(&mut self, scope: &NuriV0, patch: GraphQuadsPatch) {} + pub(crate) async fn orm_update(&mut self, scope: &NuriV0, patch: GraphTransaction) {} pub(crate) async fn orm_frontend_update( &mut self, @@ -134,17 +616,17 @@ impl Verifier { "push_orm_response {:?} {} {:?}", scope, schema_iri, - self.orm_subscriptions + self.orm_tracked_subjects ); - if let Some(shapes) = self.orm_subscriptions.get_mut(scope) { + if let Some(shapes) = self.orm_tracked_subjects.get_mut(scope) { if let Some(sessions) = shapes.get_mut(schema_iri) { let mut sessions_to_close: Vec = vec![]; - for (session_id, sender) in sessions.iter_mut() { - if sender.is_closed() { + for (session_id, subscription) in sessions.iter_mut() { + if subscription.sender.is_closed() { log_debug!("closed so removing session {}", session_id); sessions_to_close.push(*session_id); } else { - let _ = sender.send(response.clone()).await; + let _ = subscription.sender.send(response.clone()).await; } } for session_id in sessions_to_close.iter() { @@ -157,19 +639,53 @@ impl Verifier { pub(crate) async fn start_orm( &mut self, nuri: &NuriV0, - shape_type: &OrmShapeType, + shape_type: OrmShapeType, session_id: u64, ) -> Result<(Receiver, CancelFn), NgError> { let (tx, rx) = mpsc::unbounded::(); - self.orm_subscriptions.insert( + // TODO: Validate schema: + // If multiple data types are present for the same predicate, they must be of of the same type. + // All referenced shapes must be available. 
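// Reviewer sketch (illustrative, not from this patch): one way to phrase the "all
// referenced shapes must be available" part of the TODO above, over a simplified view of
// the schema as shape IRI -> referenced shape IRIs; the helper name is hypothetical.
fn referenced_shapes_available(
    schema: &std::collections::HashMap<String, Vec<String>>,
) -> bool {
    schema
        .values()
        .flatten()
        .all(|referenced| schema.contains_key(referenced))
}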
+ + // Keep track of connections here. + self.orm_tracked_subjects.insert( nuri.clone(), HashMap::from([( shape_type.shape.clone(), - HashMap::from([(session_id, tx.clone())]), + HashMap::from([( + session_id, + OrmSubscription { + sender: tx.clone(), + tracked_objects: HashMap::new(), + }, + )]), )]), ); + // Add shape to registry or increase ref count. + if let Some(shape_ref) = self.orm_shape_types.get_mut(&shape_type.shape) { + shape_ref.ref_count += 1; + } else { + self.orm_shape_types.insert( + shape_type.shape.clone(), + OrmShapeTypeRef { + ref_count: 1, + shape_type, + }, + ); + } + + let shape_query = + sparql_construct_from_orm_shape_type(&shape_type.schema, &shape_type.shape, None)?; + let shape_triples = self.sparql_construct(shape_query, Some(nuri))?; + let orm_object = self.create_orm_from_triples( + nuri, + &shape_type.schema, + &shape_type.shape, + &shape_triples, + ); + //self.push_orm_response().await; (only for requester, not all sessions) let close = Box::new(move || { @@ -192,34 +708,33 @@ fn is_iri(s: &str) -> bool { fn literal_to_sparql_str(var: OrmSchemaDataType) -> Vec { match var.literals { None => [].to_vec(), - Some(literals) => match literals { - OrmSchemaLiterals::Bool(val) => { - if val == true { - ["true".to_string()].to_vec() - } else { - ["false".to_string()].to_vec() + Some(literals) => literals + .iter() + .map(|literal| match literal { + BasicType::Bool(val) => { + if *val { + "true".to_string() + } else { + "false".to_string() + } } - } - OrmSchemaLiterals::NumArray(numbers) => { - numbers.iter().map(|num| num.to_string()).collect() - } - OrmSchemaLiterals::StrArray(stings) => stings - .iter() - .map(|str| { - // We assume that strings can be IRIs (currently no support for typed literals). - if is_iri(str) { - format!("<{}>", escape_iri(str)) + BasicType::Num(number) => number.to_string(), + BasicType::Str(sting) => { + if is_iri(sting) { + format!("<{}>", escape_iri(sting)) } else { - format!("\"{}\"", escape_literal(str)) + format!("\"{}\"", escape_literal(sting)) } - }) - .collect(), - }, + } + }) + .collect(), } } pub fn sparql_construct_from_orm_shape_type( - shape_type: &OrmShapeType, + schema: &OrmSchema, + shape: &String, + // TODO: Remove max_recursion max_recursion: Option, ) -> Result { // Use a counter to generate unique variable names. @@ -361,16 +876,13 @@ pub fn sparql_construct_from_orm_shape_type( visited_shapes.remove(&shape.iri); } - let root_shape = shape_type - .schema - .get(&shape_type.shape) - .ok_or(VerifierError::InvalidOrmSchema)?; + let root_shape = schema.get(shape).ok_or(VerifierError::InvalidOrmSchema)?; // Root subject variable name let root_var_name = get_new_var_name(&mut var_counter); process_shape( - &shape_type.schema, + schema, root_shape, &root_var_name, &mut construct_statements, @@ -389,12 +901,12 @@ pub fn sparql_construct_from_orm_shape_type( )) } -// Escape an IRI fragment if needed (very conservative, only wrap with <...>). Assumes input already a full IRI. +/// Escape an IRI fragment if needed (very conservative, only wrap with <...>). Assumes input already a full IRI. fn escape_iri(iri: &str) -> String { format!("<{}>", iri) } -// SPARQL literal escape: backslash, quotes, newlines, tabs. +/// SPARQL literal escape: backslash, quotes, newlines, tabs. 
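// Reviewer sketch (illustrative, not from this patch): a possible unit test for the
// escaping implemented just below, assuming the usual SPARQL escapes for backslash,
// double quote, newline and tab, with all other characters passed through unchanged.
#[test]
fn escape_literal_escapes_special_chars() {
    assert_eq!(escape_literal("say \"hi\"\n"), "say \\\"hi\\\"\\n");
    assert_eq!(escape_literal("a\\b\tc"), "a\\\\b\\tc");
    assert_eq!(escape_literal("plain"), "plain");
}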
fn escape_literal(lit: &str) -> String { let mut out = String::with_capacity(lit.len() + 4); for c in lit.chars() { @@ -409,3 +921,61 @@ fn escape_literal(lit: &str) -> String { } return out; } + +/// Converts an oxrdf::Term to an orm::Term +fn oxrdf_term_to_orm_term(term: &ng_oxigraph::oxrdf::Term) -> ng_net::orm::Term { + match term { + ng_oxigraph::oxrdf::Term::NamedNode(node) => { + ng_net::orm::Term::Ref(node.as_str().to_string()) + } + ng_oxigraph::oxrdf::Term::BlankNode(node) => { + ng_net::orm::Term::Ref(node.as_str().to_string()) + } + ng_oxigraph::oxrdf::Term::Literal(literal) => { + // Check the datatype to determine how to convert + match literal.datatype().as_str() { + // Check for string first, this is the most common. + "http://www.w3.org/2001/XMLSchema#string" => { + ng_net::orm::Term::Str(literal.value().to_string()) + } + "http://www.w3.org/2001/XMLSchema#boolean" => { + match literal.value().parse::() { + Ok(b) => ng_net::orm::Term::Bool(b), + Err(_) => ng_net::orm::Term::Str(literal.value().to_string()), + } + } + "http://www.w3.org/2001/XMLSchema#integer" + | "http://www.w3.org/2001/XMLSchema#decimal" + | "http://www.w3.org/2001/XMLSchema#double" + | "http://www.w3.org/2001/XMLSchema#float" + | "http://www.w3.org/2001/XMLSchema#int" + | "http://www.w3.org/2001/XMLSchema#long" + | "http://www.w3.org/2001/XMLSchema#short" + | "http://www.w3.org/2001/XMLSchema#byte" + | "http://www.w3.org/2001/XMLSchema#unsignedInt" + | "http://www.w3.org/2001/XMLSchema#unsignedLong" + | "http://www.w3.org/2001/XMLSchema#unsignedShort" + | "http://www.w3.org/2001/XMLSchema#unsignedByte" => { + match literal.value().parse::() { + Ok(n) => ng_net::orm::Term::Num(n), + Err(_) => ng_net::orm::Term::Str(literal.value().to_string()), + } + } + _ => ng_net::orm::Term::Str(literal.value().to_string()), + } + } + ng_oxigraph::oxrdf::Term::Triple(triple) => { + // For RDF-star triples, convert to string representation + ng_net::orm::Term::Str(triple.to_string()) + } + } +} + +fn oxrdf_term_to_orm_basic_type(term: &ng_oxigraph::oxrdf::Term) -> BasicType { + match oxrdf_term_to_orm_term(term) { + ng_net::orm::Term::Str(s) => BasicType::Str(s), + ng_net::orm::Term::Num(n) => BasicType::Num(n), + ng_net::orm::Term::Bool(b) => BasicType::Bool(b), + ng_net::orm::Term::Ref(b) => BasicType::Str(b), // Treat IRIs as strings + } +} diff --git a/ng-verifier/src/verifier.rs b/ng-verifier/src/verifier.rs index e9da845..d5b36fc 100644 --- a/ng-verifier/src/verifier.rs +++ b/ng-verifier/src/verifier.rs @@ -19,13 +19,16 @@ use std::fs::create_dir_all; use std::fs::{read, File, OpenOptions}; #[cfg(all(not(target_family = "wasm"), not(docsrs)))] use std::io::Write; +use std::rc::Weak; use std::{collections::HashMap, sync::Arc}; use async_std::stream::StreamExt; use async_std::sync::{Mutex, RwLockReadGuard}; use futures::channel::mpsc; use futures::SinkExt; +use ng_net::orm::OrmSchemaPredicate; use ng_net::orm::OrmShapeTypeRef; +use ng_net::orm::OrmSubscription; use ng_oxigraph::oxigraph::sparql::Query; use ng_oxigraph::oxigraph::sparql::QueryResults; use ng_oxigraph::oxrdf::Term; @@ -112,8 +115,8 @@ pub struct Verifier { in_memory_outbox: Vec, uploads: BTreeMap, branch_subscriptions: HashMap>, - pub(crate) orm_subscriptions: - HashMap>>>, + pub(crate) orm_tracked_subjects: + HashMap>>, pub(crate) orm_shape_types: HashMap, pub(crate) temporary_repo_certificates: HashMap, } @@ -520,7 +523,7 @@ impl Verifier { inner_to_outer: HashMap::new(), uploads: BTreeMap::new(), branch_subscriptions: HashMap::new(), - 
orm_subscriptions: HashMap::new(), + orm_tracked_subjects: HashMap::new(), orm_shape_types: HashMap::new(), temporary_repo_certificates: HashMap::new(), } @@ -2813,7 +2816,7 @@ impl Verifier { inner_to_outer: HashMap::new(), uploads: BTreeMap::new(), branch_subscriptions: HashMap::new(), - orm_subscriptions: HashMap::new(), + orm_tracked_subjects: HashMap::new(), orm_shape_types: HashMap::new(), temporary_repo_certificates: HashMap::new(), };
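// Reviewer sketch (illustrative, not from this patch): the renamed `orm_tracked_subjects`
// field nests three maps, roughly scope (NuriV0) -> shape IRI -> session id ->
// OrmSubscription, which is how push_orm_response walks it. A minimal lookup over
// simplified stand-in key/value types, with a hypothetical helper name:
fn sender_for<'a>(
    subs: &'a std::collections::HashMap<
        String,
        std::collections::HashMap<String, std::collections::HashMap<u64, String>>,
    >,
    scope: &str,
    shape_iri: &str,
    session_id: u64,
) -> Option<&'a String> {
    subs.get(scope)?.get(shape_iri)?.get(&session_id)
}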