diff --git a/nextgraph/src/tests/orm.rs b/nextgraph/src/tests/orm.rs index dc55808..5678973 100644 --- a/nextgraph/src/tests/orm.rs +++ b/nextgraph/src/tests/orm.rs @@ -9,13 +9,13 @@ use crate::local_broker::{doc_create, doc_sparql_construct, doc_sparql_update, orm_start}; use crate::tests::create_or_open_wallet::create_or_open_wallet; -use async_std::stream::{IntoStream, StreamExt}; +use async_std::stream::StreamExt; use ng_net::app_protocol::{AppResponse, AppResponseV0, NuriV0}; use ng_net::orm::{ BasicType, OrmSchemaDataType, OrmSchemaLiteralType, OrmSchemaPredicate, OrmSchemaShape, OrmShapeType, }; -use ng_verifier::orm::shape_type_to_sparql; +use ng_verifier::orm::utils::shape_type_to_sparql; use ng_repo::log_info; use std::collections::HashMap; @@ -592,39 +592,6 @@ INSERT DATA { // Define the ORM schema let mut schema = HashMap::new(); - schema.insert( - "http://example.org/TestObject||http://example.org/anotherObject".to_string(), - Arc::new(OrmSchemaShape { - iri: "http://example.org/TestObject||http://example.org/anotherObject".to_string(), - predicates: vec![ - Arc::new(OrmSchemaPredicate { - dataTypes: vec![OrmSchemaDataType { - valType: OrmSchemaLiteralType::string, - literals: None, - shape: None, - }], - iri: "http://example.org/prop1".to_string(), - readablePredicate: "prop1".to_string(), - maxCardinality: 1, - minCardinality: 1, - extra: None, - }), - Arc::new(OrmSchemaPredicate { - dataTypes: vec![OrmSchemaDataType { - valType: OrmSchemaLiteralType::number, - literals: None, - shape: None, - }], - iri: "http://example.org/prop2".to_string(), - readablePredicate: "prop2".to_string(), - maxCardinality: 1, - minCardinality: 1, - extra: None, - }), - ], - }), - ); - schema.insert( "http://example.org/TestObject".to_string(), Arc::new(OrmSchemaShape { @@ -692,36 +659,6 @@ INSERT DATA { minCardinality: 0, extra: None, }), - Arc::new(OrmSchemaPredicate { - dataTypes: vec![OrmSchemaDataType { - valType: OrmSchemaLiteralType::shape, - literals: None, - shape: Some( - "http://example.org/TestObject||http://example.org/objectValue" - .to_string(), - ), - }], - iri: "http://example.org/objectValue".to_string(), - readablePredicate: "objectValue".to_string(), - maxCardinality: 1, - minCardinality: 1, - extra: None, - }), - Arc::new(OrmSchemaPredicate { - dataTypes: vec![OrmSchemaDataType { - valType: OrmSchemaLiteralType::shape, - literals: None, - shape: Some( - "http://example.org/TestObject||http://example.org/anotherObject" - .to_string(), - ), - }], - iri: "http://example.org/anotherObject".to_string(), - readablePredicate: "anotherObject".to_string(), - maxCardinality: -1, - minCardinality: 0, - extra: None, - }), Arc::new(OrmSchemaPredicate { dataTypes: vec![ OrmSchemaDataType { @@ -760,51 +697,6 @@ INSERT DATA { }), ); - schema.insert( - "http://example.org/TestObject||http://example.org/objectValue".to_string(), - Arc::new(OrmSchemaShape { - iri: "http://example.org/TestObject||http://example.org/objectValue".to_string(), - predicates: vec![ - Arc::new(OrmSchemaPredicate { - dataTypes: vec![OrmSchemaDataType { - valType: OrmSchemaLiteralType::string, - literals: None, - shape: None, - }], - iri: "http://example.org/nestedString".to_string(), - readablePredicate: "nestedString".to_string(), - maxCardinality: 1, - minCardinality: 1, - extra: None, - }), - Arc::new(OrmSchemaPredicate { - dataTypes: vec![OrmSchemaDataType { - valType: OrmSchemaLiteralType::number, - literals: None, - shape: None, - }], - iri: "http://example.org/nestedNum".to_string(), - readablePredicate: "nestedNum".to_string(), - maxCardinality: 1, - minCardinality: 1, - extra: None, - }), - Arc::new(OrmSchemaPredicate { - dataTypes: vec![OrmSchemaDataType { - valType: OrmSchemaLiteralType::number, - literals: None, - shape: None, - }], - iri: "http://example.org/nestedArray".to_string(), - readablePredicate: "nestedArray".to_string(), - maxCardinality: -1, - minCardinality: 0, - extra: None, - }), - ], - }), - ); - let shape_type = OrmShapeType { schema, shape: "http://example.org/TestObject".to_string(), diff --git a/ng-verifier/src/orm/add_remove_triples.rs b/ng-verifier/src/orm/add_remove_triples.rs index 2f61b35..1b374c1 100644 --- a/ng-verifier/src/orm/add_remove_triples.rs +++ b/ng-verifier/src/orm/add_remove_triples.rs @@ -78,6 +78,7 @@ pub fn add_remove_triples( // Process added triples. // For each triple, check if it matches the shape. // In parallel, we record the values added and removed (tracked_changes) + log_debug!("Processing # triples: {}", triples_added.len()); for triple in triples_added { let obj_term = oxrdf_term_to_orm_basic_type(&triple.object); log_debug!("processing triple {triple}"); @@ -175,8 +176,8 @@ pub fn add_remove_triples( ); tracked_predicate.tracked_children.push(tracked_child_arc); } + log_debug!("end of dealing with nesting"); } - log_debug!("end of dealing with nesting"); } } // Process removed triples. diff --git a/ng-verifier/src/orm/mod.rs b/ng-verifier/src/orm/mod.rs index dbb90c3..7d90c6d 100644 --- a/ng-verifier/src/orm/mod.rs +++ b/ng-verifier/src/orm/mod.rs @@ -9,11 +9,11 @@ pub mod add_remove_triples; pub mod types; +pub mod utils; pub mod validation; use futures::channel::mpsc; -use ng_oxigraph::oxrdf::Subject; -use ng_repo::types::OverlayId; +use futures::channel::mpsc::UnboundedSender; use std::collections::HashMap; use std::collections::HashSet; @@ -22,26 +22,22 @@ use std::sync::RwLock; use std::u64; use futures::SinkExt; -use lazy_static::lazy_static; pub use ng_net::orm::{OrmDiff, OrmShapeType}; use ng_net::utils::Receiver; use ng_net::{app_protocol::*, orm::*}; use ng_oxigraph::oxigraph::sparql::{Query, QueryResults}; use ng_oxigraph::oxrdf::Triple; use ng_repo::errors::NgError; -use ng_repo::errors::VerifierError; use ng_repo::log::*; -use regex::Regex; use serde_json::json; use serde_json::Value; use crate::orm::add_remove_triples::add_remove_triples; use crate::orm::types::*; +use crate::orm::utils::*; use crate::types::*; use crate::verifier::*; -type ShapeIri = String; -type SubjectIri = String; // Structure to store changes in. By shape iri > subject iri > OrmTrackedSubjectChange // **NOTE**: In comparison to OrmSubscription.tracked_subjects, the outer hashmap's keys are shape IRIs. // (shape IRI -> (subject IRI -> OrmTrackedSubjectChange)) @@ -61,9 +57,8 @@ impl Verifier { // ); //let base = NuriV0::repo_id(&repo.id); - log_debug!("querying construct\n{}\n\n", query); - let nuri_str = nuri.as_ref().map(|s| s.as_str()); + log_debug!("querying construct\n{}\n{}\n\n", nuri_str.unwrap(), query); let parsed = Query::parse(&query, nuri_str).map_err(|e| NgError::OxiGraphError(e.to_string()))?; @@ -95,6 +90,7 @@ impl Verifier { session_id: u64, triples_added: &[Triple], triples_removed: &[Triple], + data_already_fetched: bool, ) -> Result { let mut orm_changes = HashMap::new(); @@ -112,7 +108,6 @@ impl Verifier { }) .collect(); - log_debug!("process_changes_for_nuri_and_session {:?}", shapes); for root_shape in shapes { self.process_changes_for_shape_and_session( nuri, @@ -121,6 +116,7 @@ impl Verifier { triples_added, triples_removed, &mut orm_changes, + data_already_fetched, )?; } @@ -137,6 +133,7 @@ impl Verifier { triples_added: &[Triple], triples_removed: &[Triple], orm_changes: &mut OrmChanges, + data_already_fetched: bool, ) -> Result<(), NgError> { // First in, last out stack to keep track of objects to validate (nested objects first). Strings are object IRIs. let mut shape_validation_queue: Vec<(Arc, Vec)> = vec![]; @@ -147,6 +144,10 @@ impl Verifier { // For a given shape, we evaluate every subject against that shape. while let Some((shape, objects_to_validate)) = shape_validation_queue.pop() { // Collect triples relevant for validation. + log_debug!( + "process_changes_for_shape_and_session triples_added: {:?}", + triples_added + ); let added_triples_by_subject = group_by_subject_for_shape(&shape, triples_added, &objects_to_validate); let removed_triples_by_subject = @@ -168,11 +169,8 @@ impl Verifier { let mut nested_objects_to_eval: HashMap> = HashMap::new(); - log_debug!( - "processing_changes_for_shape_and_session for shape {:?}", - shape - ); // For each subject, add/remove triples and validate. + log_debug!("all_modified_subjects: {:?}", all_modified_subjects); for subject_iri in all_modified_subjects { let triples_added_for_subj = added_triples_by_subject @@ -240,42 +238,50 @@ impl Verifier { } } - // Now, we fetch all un-fetched subjects for re-evaluation. + // Now, we queue all non-evaluated objects for (shape_iri, objects_to_eval) in &nested_objects_to_eval { - let objects_to_fetch = objects_to_eval - .iter() - .filter(|(_iri, needs_fetch)| *needs_fetch) - .map(|(s, _)| s.clone()) - .collect(); - let orm_subscription = self.get_first_orm_subscription_for(nuri, Some(&shape.iri), Some(&session_id)); - // Extract schema and shape Arc before mutable borrow let schema = orm_subscription.shape_type.schema.clone(); let shape_arc = schema.get(shape_iri).unwrap().clone(); - // Create sparql query - let shape_query = - shape_type_to_sparql(&schema, &shape_iri, Some(objects_to_fetch))?; - let new_triples = - self.query_sparql_construct(shape_query, Some(nuri_to_string(nuri)))?; - - self.process_changes_for_shape_and_session( - nuri, - shape_arc.clone(), - session_id, - &new_triples, - &vec![], - orm_changes, - )?; - - let objects_not_to_fetch = objects_to_eval + // Data might need to be fetched (if it has not been during initialization or nested shape fetch). + if !data_already_fetched { + let objects_to_fetch = objects_to_eval + .iter() + .filter(|(_iri, needs_fetch)| *needs_fetch) + .map(|(s, _)| s.clone()) + .collect(); + + // Create sparql query + let shape_query = + shape_type_to_sparql(&schema, &shape_iri, Some(objects_to_fetch))?; + let new_triples = + self.query_sparql_construct(shape_query, Some(nuri.repo()))?; + + // Recursively process nested objects. + self.process_changes_for_shape_and_session( + nuri, + shape_arc.clone(), + session_id, + &new_triples, + &vec![], + orm_changes, + true, + )?; + } + + // Add objects + let objects_not_to_fetch: Vec = objects_to_eval .iter() .filter(|(_iri, needs_fetch)| !*needs_fetch) .map(|(s, _)| s.clone()) .collect(); - shape_validation_queue.push((shape_arc, objects_not_to_fetch)); + if objects_not_to_fetch.len() > 0 { + // Queue all objects that don't need fetching. + shape_validation_queue.push((shape_arc, objects_not_to_fetch)); + } } } @@ -327,6 +333,7 @@ impl Verifier { triples_removed: &[Triple], nuri: &NuriV0, only_for_session_id: Option, + data_already_fetched: bool, ) -> Result { log_debug!("apply_triple_changes {:?}", only_for_session_id); // If we have a specific session, handle only that subscription. @@ -336,6 +343,7 @@ impl Verifier { session_id, triples_added, triples_removed, + data_already_fetched, ); } @@ -356,6 +364,7 @@ impl Verifier { session_id, triples_added, triples_removed, + data_already_fetched, )?; for (shape_iri, subj_map) in changes { @@ -498,14 +507,12 @@ impl Verifier { session_id: u64, shape_type: &OrmShapeType, ) -> Result { - log_debug!("create_orm_object_for_shape {:?}", shape_type); // Query triples for this shape let shape_query = shape_type_to_sparql(&shape_type.schema, &shape_type.shape, None)?; - let shape_triples = self.query_sparql_construct(shape_query, Some(nuri_to_string(nuri)))?; - log_debug!("query_sparql_construct done {:?}", shape_triples); + let shape_triples = self.query_sparql_construct(shape_query, Some(nuri.repo()))?; + let changes: OrmChanges = - self.apply_triple_changes(&shape_triples, &[], nuri, Some(session_id.clone()))?; - log_debug!("apply_triple_changes done {:?}", changes); + self.apply_triple_changes(&shape_triples, &[], nuri, Some(session_id.clone()), true)?; let orm_subscription = self.get_first_orm_subscription_for(nuri, Some(&shape_type.shape), Some(&session_id)); @@ -519,10 +526,15 @@ impl Verifier { let mut return_vals: Value = Value::Array(vec![]); let return_val_vec = return_vals.as_array_mut().unwrap(); + log_debug!( + "Tracked subjects:\n{:?}\n", + orm_subscription.tracked_subjects, + ); // For each valid change struct, we build an orm object. // The way we get the changes from the tracked subjects is a bit hacky, sorry. for (subject_iri, tracked_subjects_by_shape) in &orm_subscription.tracked_subjects { if let Some(tracked_subject) = tracked_subjects_by_shape.get(&shape_type.shape) { + log_info!("changes for : {:?}\n{:?}", tracked_subject, changes); if tracked_subject.read().unwrap().valid == OrmTrackedSubjectValidity::Valid { if let Some(change) = changes .get(&shape_type.shape) @@ -534,6 +546,7 @@ impl Verifier { root_shape, &orm_subscription.tracked_subjects, ); + log_debug!("Materialized change:\n{:?}\ninto:\n{:?}", change, new_val); return_val_vec.push(new_val); } } @@ -556,21 +569,23 @@ impl Verifier { pub(crate) async fn push_orm_response( &mut self, - subscription: &Arc, + nuri: &NuriV0, + session_id: u64, + sender: UnboundedSender, response: AppResponse, ) { log_debug!( "sending orm response for session {}:\n{:?}", - subscription.session_id, + session_id, &response ); - if subscription.sender.is_closed() { - log_debug!("closed so removing session {}", subscription.session_id); + if sender.is_closed() { + log_debug!("closed so removing session {}", session_id); - self.orm_subscriptions.remove(&subscription.nuri); + self.orm_subscriptions.remove(&nuri); } else { - subscription.sender.clone().send(response); + sender.clone().send(response); } } @@ -603,16 +618,22 @@ impl Verifier { tracked_subjects: HashMap::new(), nuri: nuri.clone(), }; + self.orm_subscriptions .entry(nuri.clone()) .or_insert(vec![]) .push(orm_subscription); - let _orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type); + let _orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type)?; log_debug!("create_orm_object_for_shape return {:?}", _orm_objects); - // TODO integrate response - //self.push_orm_response().await; (only for requester, not all sessions) + self.push_orm_response( + &nuri.clone(), + session_id, + tx.clone(), + AppResponse::V0(AppResponseV0::OrmInitial(_orm_objects)), + ) + .await; let close = Box::new(move || { //log_debug!("CLOSE_CHANNEL of subscription for branch {}", branch_id); @@ -623,271 +644,3 @@ impl Verifier { Ok((rx, close)) } } - -/// Heuristic: -/// Consider a string an IRI if it contains alphanumeric characters and then a colon within the first 13 characters -fn is_iri(s: &str) -> bool { - lazy_static! { - static ref IRI_REGEX: Regex = Regex::new(r"^[A-Za-z][A-Za-z0-9+\.\-]{1,12}:").unwrap(); - } - IRI_REGEX.is_match(s) -} - -fn literal_to_sparql_str(var: OrmSchemaDataType) -> Vec { - match var.literals { - None => [].to_vec(), - Some(literals) => literals - .iter() - .map(|literal| match literal { - BasicType::Bool(val) => { - if *val { - "true".to_string() - } else { - "false".to_string() - } - } - BasicType::Num(number) => number.to_string(), - BasicType::Str(sting) => { - if is_iri(sting) { - format!("<{}>", sting) - } else { - format!("\"{}\"", escape_literal(sting)) - } - } - }) - .collect(), - } -} - -pub fn shape_type_to_sparql( - schema: &OrmSchema, - shape: &ShapeIri, - filter_subjects: Option>, -) -> Result { - // Use a counter to generate unique variable names. - let mut var_counter = 0; - fn get_new_var_name(counter: &mut i32) -> String { - let name = format!("v{}", counter); - *counter += 1; - name - } - - // Collect all statements to be added to the construct and where bodies. - let mut construct_statements = Vec::new(); - let mut where_statements = Vec::new(); - - // Keep track of visited shapes while recursing to prevent infinite loops. - let mut visited_shapes: HashSet = HashSet::new(); - - // Recursive function to call for (nested) shapes. - fn process_shape( - schema: &OrmSchema, - shape: &OrmSchemaShape, - subject_var_name: &str, - construct_statements: &mut Vec, - where_statements: &mut Vec, - var_counter: &mut i32, - visited_shapes: &mut HashSet, - ) { - // Prevent infinite recursion on cyclic schemas. - // TODO: We could handle this as IRI string reference. - if visited_shapes.contains(&shape.iri) { - return; - } - visited_shapes.insert(shape.iri.clone()); - - // Add statements for each predicate. - for predicate in &shape.predicates { - let mut union_branches = Vec::new(); - let mut allowed_literals = Vec::new(); - - // Predicate constraints might have more than one acceptable data type. Traverse each. - // It is assumed that constant literals, nested shapes and regular types are not mixed. - for datatype in &predicate.dataTypes { - if datatype.valType == OrmSchemaLiteralType::literal { - // Collect allowed literals and as strings - // (already in SPARQL-format, e.g. `"a astring"`, ``, `true`, or `42`). - allowed_literals.extend(literal_to_sparql_str(datatype.clone())); - } else if datatype.valType == OrmSchemaLiteralType::shape { - let shape_iri = &datatype.shape.clone().unwrap(); - let nested_shape = schema.get(shape_iri).unwrap(); - - // For the current acceptable shape, add CONSTRUCT, WHERE, and recurse. - - // Each shape option gets its own var. - let obj_var_name = get_new_var_name(var_counter); - - construct_statements.push(format!( - " ?{} <{}> ?{}", - subject_var_name, predicate.iri, obj_var_name - )); - // Those are later added to a UNION, if there is more than one shape. - union_branches.push(format!( - " ?{} <{}> ?{}", - subject_var_name, predicate.iri, obj_var_name - )); - - // Recurse to add statements for nested object. - process_shape( - schema, - nested_shape, - &obj_var_name, - construct_statements, - where_statements, - var_counter, - visited_shapes, - ); - } - } - - // The where statement which might be wrapped in OPTIONAL. - let where_body: String; - - if !allowed_literals.is_empty() - && !predicate.extra.unwrap_or(false) - && predicate.minCardinality > 0 - { - // If we have literal requirements and they are not optional ("extra"), - // Add CONSTRUCT, WHERE, and FILTER. - - let pred_var_name = get_new_var_name(var_counter); - construct_statements.push(format!( - " ?{} <{}> ?{}", - subject_var_name, predicate.iri, pred_var_name - )); - where_body = format!( - " ?{s} <{p}> ?{o} . \n FILTER (?{o} IN ({lits}))", - s = subject_var_name, - p = predicate.iri, - o = pred_var_name, - lits = allowed_literals.join(", ") - ); - } else if !union_branches.is_empty() { - // We have nested shape(s) which were already added to CONSTRUCT above. - // Join them with UNION. - - where_body = union_branches - .into_iter() - .map(|b| format!("{{\n{}\n}}", b)) - .collect::>() - .join(" UNION "); - } else { - // Regular predicate data type. Just add basic CONSTRUCT and WHERE statements. - - let pred_var_name = get_new_var_name(var_counter); - construct_statements.push(format!( - " ?{} <{}> ?{}", - subject_var_name, predicate.iri, pred_var_name - )); - where_body = format!( - " ?{} <{}> ?{}", - subject_var_name, predicate.iri, pred_var_name - ); - } - - // Wrap in optional, if necessary. - if predicate.minCardinality < 1 { - where_statements.push(format!(" OPTIONAL {{\n{}\n }}", where_body)); - } else { - where_statements.push(where_body); - }; - } - - visited_shapes.remove(&shape.iri); - } - - let root_shape = schema.get(shape).ok_or(VerifierError::InvalidOrmSchema)?; - - // Root subject variable name - let root_var_name = get_new_var_name(&mut var_counter); - - process_shape( - schema, - root_shape, - &root_var_name, - &mut construct_statements, - &mut where_statements, - &mut var_counter, - &mut visited_shapes, - ); - - // Filter subjects, if present. - if let Some(subjects) = filter_subjects { - let subjects_str = subjects - .iter() - .map(|s| format!("<{}>", s)) - .collect::>() - .join(", "); - where_statements.push(format!(" FILTER (?s0 IN ({})", subjects_str)); - } - - // Create query from statements. - let construct_body = construct_statements.join(" .\n"); - - let where_body = where_statements.join(" .\n"); - - Ok(format!( - "CONSTRUCT {{\n{}\n}}\nWHERE {{\n{}\n}}", - construct_body, where_body - )) -} - -/// SPARQL literal escape: backslash, quotes, newlines, tabs. -fn escape_literal(lit: &str) -> String { - let mut out = String::with_capacity(lit.len() + 4); - for c in lit.chars() { - match c { - '\\' => out.push_str("\\\\"), - '\"' => out.push_str("\\\""), - '\n' => out.push_str("\\n"), - '\r' => out.push_str("\\r"), - '\t' => out.push_str("\\t"), - _ => out.push(c), - } - } - return out; -} - -pub fn group_by_subject_for_shape<'a>( - shape: &OrmSchemaShape, - triples: &'a [Triple], - allowed_subjects: &[String], -) -> HashMap> { - let mut triples_by_subject: HashMap> = HashMap::new(); - let allowed_preds_set: HashSet<&str> = - shape.predicates.iter().map(|p| p.iri.as_str()).collect(); - let allowed_subject_set: HashSet<&str> = allowed_subjects.iter().map(|s| s.as_str()).collect(); - for triple in triples { - // triple.subject must be in allowed_subjects (or allowed_subjects empty) - // and triple.predicate must be in allowed_preds. - if allowed_preds_set.contains(triple.predicate.as_str()) { - // filter subjects if list provided - let subj = match &triple.subject { - Subject::NamedNode(n) => n.as_ref(), - _ => continue, - }; - // Subject must be in allowed subjects (or allowed_subjects is empty). - if allowed_subject_set.is_empty() || allowed_subject_set.contains(subj.as_str()) { - triples_by_subject - .entry(subj.to_string()) - .or_insert_with(Vec::new) - .push(triple); - } - } - } - - return triples_by_subject; -} - -fn nuri_to_string(nuri: &NuriV0) -> String { - // Get repo_id and overlay_id from the nuri - let repo_id = nuri.target.repo_id(); - let overlay_id = if let Some(overlay_link) = &nuri.overlay { - overlay_link.clone().try_into().unwrap() - } else { - // Default overlay for the repo - OverlayId::outer(repo_id) - }; - let graph_name = NuriV0::repo_graph_name(repo_id, &overlay_id); - graph_name -} diff --git a/ng-verifier/src/orm/types.rs b/ng-verifier/src/orm/types.rs index 8dd3916..23fdaf5 100644 --- a/ng-verifier/src/orm/types.rs +++ b/ng-verifier/src/orm/types.rs @@ -82,5 +82,5 @@ pub struct OrmSubscription { pub sender: Sender, pub tracked_subjects: HashMap>>>, } -type ShapeIri = String; -type SubjectIri = String; +pub type ShapeIri = String; +pub type SubjectIri = String; diff --git a/ng-verifier/src/orm/utils.rs b/ng-verifier/src/orm/utils.rs new file mode 100644 index 0000000..54aad45 --- /dev/null +++ b/ng-verifier/src/orm/utils.rs @@ -0,0 +1,294 @@ +// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers +// All rights reserved. +// Licensed under the Apache License, Version 2.0 +// +// or the MIT license , +// at your option. All files in the project carrying such +// notice may not be copied, modified, or distributed except +// according to those terms. + +use ng_oxigraph::oxrdf::Subject; +use ng_repo::log::*; +use ng_repo::types::OverlayId; + +use std::collections::HashMap; +use std::collections::HashSet; + +use lazy_static::lazy_static; +pub use ng_net::orm::{OrmDiff, OrmShapeType}; +use ng_net::{app_protocol::*, orm::*}; +use ng_oxigraph::oxrdf::Triple; +use ng_repo::errors::NgError; +use ng_repo::errors::VerifierError; +use regex::Regex; + +use crate::orm::types::*; + +/// Heuristic: +/// Consider a string an IRI if it contains alphanumeric characters and then a colon within the first 13 characters +pub fn is_iri(s: &str) -> bool { + lazy_static! { + static ref IRI_REGEX: Regex = Regex::new(r"^[A-Za-z][A-Za-z0-9+\.\-]{1,12}:").unwrap(); + } + IRI_REGEX.is_match(s) +} + +pub fn literal_to_sparql_str(var: OrmSchemaDataType) -> Vec { + match var.literals { + None => [].to_vec(), + Some(literals) => literals + .iter() + .map(|literal| match literal { + BasicType::Bool(val) => { + if *val { + "true".to_string() + } else { + "false".to_string() + } + } + BasicType::Num(number) => number.to_string(), + BasicType::Str(sting) => { + if is_iri(sting) { + format!("<{}>", sting) + } else { + format!("\"{}\"", escape_literal(sting)) + } + } + }) + .collect(), + } +} + +pub fn shape_type_to_sparql( + schema: &OrmSchema, + shape: &ShapeIri, + filter_subjects: Option>, +) -> Result { + // Use a counter to generate unique variable names. + let mut var_counter = 0; + fn get_new_var_name(counter: &mut i32) -> String { + let name = format!("v{}", counter); + *counter += 1; + name + } + + // Collect all statements to be added to the construct and where bodies. + let mut construct_statements = Vec::new(); + let mut where_statements = Vec::new(); + + // Keep track of visited shapes while recursing to prevent infinite loops. + let mut visited_shapes: HashSet = HashSet::new(); + + // Recursive function to call for (nested) shapes. + fn process_shape( + schema: &OrmSchema, + shape: &OrmSchemaShape, + subject_var_name: &str, + construct_statements: &mut Vec, + where_statements: &mut Vec, + var_counter: &mut i32, + visited_shapes: &mut HashSet, + ) { + // Prevent infinite recursion on cyclic schemas. + // TODO: We could handle this as IRI string reference. + if visited_shapes.contains(&shape.iri) { + return; + } + visited_shapes.insert(shape.iri.clone()); + + // Add statements for each predicate. + for predicate in &shape.predicates { + let mut union_branches = Vec::new(); + let mut allowed_literals = Vec::new(); + + // Predicate constraints might have more than one acceptable data type. Traverse each. + // It is assumed that constant literals, nested shapes and regular types are not mixed. + for datatype in &predicate.dataTypes { + if datatype.valType == OrmSchemaLiteralType::literal { + // Collect allowed literals and as strings + // (already in SPARQL-format, e.g. `"a astring"`, ``, `true`, or `42`). + allowed_literals.extend(literal_to_sparql_str(datatype.clone())); + } else if datatype.valType == OrmSchemaLiteralType::shape { + let shape_iri = &datatype.shape.clone().unwrap(); + let nested_shape = schema.get(shape_iri).unwrap(); + + // For the current acceptable shape, add CONSTRUCT, WHERE, and recurse. + + // Each shape option gets its own var. + let obj_var_name = get_new_var_name(var_counter); + + construct_statements.push(format!( + " ?{} <{}> ?{}", + subject_var_name, predicate.iri, obj_var_name + )); + // Those are later added to a UNION, if there is more than one shape. + union_branches.push(format!( + " ?{} <{}> ?{}", + subject_var_name, predicate.iri, obj_var_name + )); + + // Recurse to add statements for nested object. + process_shape( + schema, + nested_shape, + &obj_var_name, + construct_statements, + where_statements, + var_counter, + visited_shapes, + ); + } + } + + // The where statement which might be wrapped in OPTIONAL. + let where_body: String; + + if !allowed_literals.is_empty() + && !predicate.extra.unwrap_or(false) + && predicate.minCardinality > 0 + { + // If we have literal requirements and they are not optional ("extra"), + // Add CONSTRUCT, WHERE, and FILTER. + + let pred_var_name = get_new_var_name(var_counter); + construct_statements.push(format!( + " ?{} <{}> ?{}", + subject_var_name, predicate.iri, pred_var_name + )); + where_body = format!( + " ?{s} <{p}> ?{o} . \n FILTER(?{o} IN ({lits}))", + s = subject_var_name, + p = predicate.iri, + o = pred_var_name, + lits = allowed_literals.join(", ") + ); + } else if !union_branches.is_empty() { + // We have nested shape(s) which were already added to CONSTRUCT above. + // Join them with UNION. + + where_body = union_branches + .into_iter() + .map(|b| format!("{{\n{}\n}}", b)) + .collect::>() + .join(" UNION "); + } else { + // Regular predicate data type. Just add basic CONSTRUCT and WHERE statements. + + let pred_var_name = get_new_var_name(var_counter); + construct_statements.push(format!( + " ?{} <{}> ?{}", + subject_var_name, predicate.iri, pred_var_name + )); + where_body = format!( + " ?{} <{}> ?{}", + subject_var_name, predicate.iri, pred_var_name + ); + } + + // Wrap in optional, if necessary. + if predicate.minCardinality < 1 { + where_statements.push(format!(" OPTIONAL {{\n{}\n }}", where_body)); + } else { + where_statements.push(where_body); + }; + } + + visited_shapes.remove(&shape.iri); + } + + let root_shape = schema.get(shape).ok_or(VerifierError::InvalidOrmSchema)?; + + // Root subject variable name + let root_var_name = get_new_var_name(&mut var_counter); + + process_shape( + schema, + root_shape, + &root_var_name, + &mut construct_statements, + &mut where_statements, + &mut var_counter, + &mut visited_shapes, + ); + + // Filter subjects, if present. + if let Some(subjects) = filter_subjects { + log_debug!("filter_subjects: {:?}", subjects); + let subjects_str = subjects + .iter() + .map(|s| format!("<{}>", s)) + .collect::>() + .join(", "); + where_statements.push(format!(" FILTER(?v0 IN ({}))", subjects_str)); + } + + // Create query from statements. + let construct_body = construct_statements.join(" .\n"); + + let where_body = where_statements.join(" .\n"); + + Ok(format!( + "CONSTRUCT {{\n{}\n}}\nWHERE {{\n{}\n}}", + construct_body, where_body + )) +} + +/// SPARQL literal escape: backslash, quotes, newlines, tabs. +fn escape_literal(lit: &str) -> String { + let mut out = String::with_capacity(lit.len() + 4); + for c in lit.chars() { + match c { + '\\' => out.push_str("\\\\"), + '\"' => out.push_str("\\\""), + '\n' => out.push_str("\\n"), + '\r' => out.push_str("\\r"), + '\t' => out.push_str("\\t"), + _ => out.push(c), + } + } + return out; +} + +pub fn group_by_subject_for_shape<'a>( + shape: &OrmSchemaShape, + triples: &'a [Triple], + allowed_subjects: &[String], +) -> HashMap> { + let mut triples_by_subject: HashMap> = HashMap::new(); + let allowed_preds_set: HashSet<&str> = + shape.predicates.iter().map(|p| p.iri.as_str()).collect(); + let allowed_subject_set: HashSet<&str> = allowed_subjects.iter().map(|s| s.as_str()).collect(); + for triple in triples { + // triple.subject must be in allowed_subjects (or allowed_subjects empty) + // and triple.predicate must be in allowed_preds. + if allowed_preds_set.contains(triple.predicate.as_str()) { + // filter subjects if list provided + let subj = match &triple.subject { + Subject::NamedNode(n) => n.clone().into_string(), + _ => continue, + }; + // Subject must be in allowed subjects (or allowed_subjects is empty). + if allowed_subject_set.is_empty() || allowed_subject_set.contains(&subj.as_str()) { + triples_by_subject + .entry(subj) + .or_insert_with(Vec::new) + .push(triple); + } + } + } + + return triples_by_subject; +} + +pub fn nuri_to_string(nuri: &NuriV0) -> String { + // Get repo_id and overlay_id from the nuri + let repo_id = nuri.target.repo_id(); + let overlay_id = if let Some(overlay_link) = &nuri.overlay { + overlay_link.clone().try_into().unwrap() + } else { + // Default overlay for the repo + OverlayId::outer(repo_id) + }; + let graph_name = NuriV0::repo_graph_name(repo_id, &overlay_id); + graph_name +} diff --git a/ng-verifier/src/orm/validation.rs b/ng-verifier/src/orm/validation.rs index 13804bb..6129743 100644 --- a/ng-verifier/src/orm/validation.rs +++ b/ng-verifier/src/orm/validation.rs @@ -262,11 +262,14 @@ impl Verifier { }; } + tracked_subject.valid = new_validity.clone(); + if new_validity == OrmTrackedSubjectValidity::Invalid { // If we are invalid, we can discard new unknowns again - they won't be kept in memory. - // We need to remove ourself from child objects parents field and + // We need to remove ourself from child objects' parents field and // remove them if no other is tracking. - // Child relationship cleanup disabled (nested tracking disabled in this refactor step) + + // TODO: Child relationship cleanup disabled (nested tracking disabled in this refactor step) // Remove tracked predicates and set untracked. tracked_subject.tracked_predicates = HashMap::new(); @@ -276,8 +279,8 @@ impl Verifier { } else if new_validity == OrmTrackedSubjectValidity::Valid && previous_validity != OrmTrackedSubjectValidity::Valid { - // If this subject became valid, we need to refetch this subject; - // We fetch + // If this subject became valid, we need to refetch this subject. + // If the data has already been fetched, the parent function will prevent the fetch. need_evaluation.insert(0, (s_change.subject_iri.clone(), shape.iri.clone(), true)); } @@ -297,8 +300,6 @@ impl Verifier { .collect(); } - tracked_subject.valid = new_validity; - return need_evaluation; } }