wip: apply_changes_from_triples

feat/orm
Laurin Weger 3 weeks ago
parent 0b37d103c2
commit 27ec9302a6
No known key found for this signature in database
GPG Key ID: 9B372BB0B792770F
  1. 4
      nextgraph/src/tests/orm.rs
  2. 78
      ng-net/src/orm.rs
  3. 708
      ng-verifier/src/orm.rs
  4. 11
      ng-verifier/src/verifier.rs

@ -18,7 +18,7 @@ use ng_verifier::orm::sparql_construct_from_orm_shape_type;
use std::collections::HashMap; use std::collections::HashMap;
#[async_std::test] #[async_std::test]
async fn test_create_sparql_from_schema() { #async fn test_create_sparql_from_schema() {
// Setup wallet and document // Setup wallet and document
let (_wallet, session_id) = create_or_open_wallet().await; let (_wallet, session_id) = create_or_open_wallet().await;
let doc_nuri = doc_create( let doc_nuri = doc_create(
@ -536,5 +536,5 @@ INSERT DATA {
// Assert: All 6 triples (3 per person) should be returned. // Assert: All 6 triples (3 per person) should be returned.
log_info!("Triples:\n{:?}", triples); log_info!("Triples:\n{:?}", triples);
assert_eq!(triples.len(), 24); assert_eq!(triples.len(), 6);
} }

@ -11,12 +11,15 @@
#![allow(non_snake_case)] #![allow(non_snake_case)]
use std::collections::HashMap; use std::{collections::HashMap, rc::Weak};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use crate::app_protocol::AppResponse;
use crate::utils::Sender;
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OrmShapeType { pub struct OrmShapeType {
pub schema: OrmSchema, pub schema: OrmSchema,
@ -68,18 +71,18 @@ pub enum OrmSchemaLiteralType {
shape, shape,
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)] #[serde(untagged)]
pub enum OrmSchemaLiterals { pub enum BasicType {
Bool(bool), Bool(bool),
NumArray(Vec<f64>), Num(f64),
StrArray(Vec<String>), Str(String),
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OrmSchemaDataType { pub struct OrmSchemaDataType {
pub valType: OrmSchemaLiteralType, pub valType: OrmSchemaLiteralType,
pub literals: Option<OrmSchemaLiterals>, pub literals: Option<Vec<BasicType>>,
pub shape: Option<String>, pub shape: Option<String>,
} }
@ -89,11 +92,66 @@ pub struct OrmSchemaPredicate {
pub iri: String, pub iri: String,
pub readablePredicate: String, pub readablePredicate: String,
/// `-1` for infinity /// `-1` for infinity
pub maxCardinality: i64, pub maxCardinality: i32,
pub minCardinality: i64, pub minCardinality: i32,
pub extra: Option<bool>, pub extra: Option<bool>,
} }
#[derive(Clone, Debug)]
pub struct OrmSubscription<'a> {
pub sender: Sender<AppResponse>,
pub tracked_objects: HashMap<String, OrmTrackedSubject<'a>>,
}
#[derive(Clone, Debug)]
pub struct OrmTrackedSubject<'a> {
pub tracked_predicates: HashMap<String, OrmTrackedPredicate<'a>>,
// Parents and if they are currently tracking us.
pub parents: HashMap<String, (OrmTrackedSubject<'a>, bool)>,
pub valid: OrmTrackedSubjectValidity,
pub subj_iri: &'a String,
pub shape: &'a OrmSchemaShape,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum OrmTrackedSubjectValidity {
Valid,
Invalid,
Unknown,
Untracked,
}
#[derive(Clone, Debug)]
pub struct OrmTrackedPredicate<'a> {
pub schema: &'a OrmSchemaPredicate,
pub tracked_children: Vec<Weak<OrmTrackedSubject<'a>>>,
pub current_cardinality: i32,
pub current_literals: Option<Vec<BasicType>>,
}
// Used only for tracking construction of new objects and diffs
// in parallel to modifying the tracked objects and predicates.
pub struct OrmTrackedSubjectChange<'a> {
pub subject_iri: String,
pub predicates: HashMap<String, OrmTrackedPredicateChanges<'a>>,
pub valid: OrmTrackedSubjectValidity,
pub tracked_subject: &'a OrmTrackedSubject<'a>,
}
pub struct OrmTrackedPredicateChanges<'a> {
pub tracked_predicate: &'a OrmTrackedPredicate<'a>,
pub values_added: Vec<BasicType>,
pub values_removed: Vec<BasicType>,
pub validity: OrmTrackedSubjectValidity,
}
#[derive(Clone, Debug)]
pub enum Term {
Str(String),
Num(f64),
Bool(bool),
Ref(String),
}
impl Default for OrmSchemaDataType { impl Default for OrmSchemaDataType {
fn default() -> Self { fn default() -> Self {
Self { Self {
@ -120,6 +178,6 @@ impl Default for OrmSchemaPredicate {
/** == Internal data types == */ /** == Internal data types == */
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct OrmShapeTypeRef { pub struct OrmShapeTypeRef {
ref_count: u64, pub ref_count: u64,
shape_type: OrmShapeType, pub shape_type: OrmShapeType,
} }

@ -8,21 +8,37 @@
// according to those terms. // according to those terms.
use std::collections::HashMap; use std::collections::HashMap;
use std::collections::HashSet;
use std::hash::Hash;
use std::rc::Weak;
use async_std::task::current;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::SinkExt; use futures::SinkExt;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use ng_net::orm::BasicType;
pub use ng_net::orm::OrmDiff; pub use ng_net::orm::OrmDiff;
use ng_net::orm::OrmSchemaLiteralType;
pub use ng_net::orm::OrmShapeType; pub use ng_net::orm::OrmShapeType;
use ng_net::orm::OrmShapeTypeRef;
use ng_net::orm::OrmSubscription;
use ng_net::orm::OrmTrackedPredicate;
use ng_net::orm::OrmTrackedPredicateChanges;
use ng_net::orm::OrmTrackedSubject;
use ng_net::orm::OrmTrackedSubjectChange;
use ng_net::orm::OrmTrackedSubjectValidity;
use ng_net::orm::Term;
use ng_net::orm::{OrmSchemaDataType, OrmSchemaShape}; use ng_net::orm::{OrmSchemaDataType, OrmSchemaShape};
use ng_net::orm::{OrmSchemaLiteralType, OrmSchemaLiterals};
use ng_net::{app_protocol::*, orm::OrmSchema}; use ng_net::{app_protocol::*, orm::OrmSchema};
use ng_net::{ use ng_net::{
types::*, types::*,
utils::{Receiver, Sender}, utils::{Receiver, Sender},
}; };
use ng_oxigraph::oxigraph::sparql::{results::*, Query, QueryResults}; use ng_oxigraph::oxigraph::sparql::{results::*, Query, QueryResults};
use ng_oxigraph::oxrdf::LiteralRef;
use ng_oxigraph::oxrdf::NamedNode;
use ng_oxigraph::oxrdf::Subject;
use ng_oxigraph::oxrdf::Term; use ng_oxigraph::oxrdf::Term;
use ng_oxigraph::oxrdf::Triple; use ng_oxigraph::oxrdf::Triple;
use ng_repo::errors::NgError; use ng_repo::errors::NgError;
@ -72,48 +88,514 @@ impl Verifier {
} }
} }
pub fn sparql_select( fn apply_changes_from_triples(
&self, &mut self,
query: String, scope: &NuriV0,
nuri: Option<String>, schema: &OrmSchema,
) -> Result<Vec<Vec<Option<Term>>>, NgError> { shape: &String,
let oxistore = self.graph_dataset.as_ref().unwrap(); triples_added: &Vec<Triple>,
triples_removed: &Vec<Triple>,
) {
let tracked_subjects: HashMap<String, HashMap<String, OrmTrackedSubject>> =
self.orm_tracked_subjects;
// Structure to store changes in.
let mut subject_changes: HashMap<String, OrmTrackedSubjectChange> = HashMap::new();
// Group triples by predicate (only keep predicates defined in the shape). Drop others.
let mut added_triples_by_pred: HashMap<String, Vec<Triple>> = HashMap::new();
let Some(shape_def) = schema.get(shape) else {
log_err!(
"Shape {} not found in schema when grouping triples by predicate",
shape
);
return;
};
// let graph_nuri = NuriV0::repo_graph_name( // Collect allowed predicate IRIs for this shape
// &update.repo_id, let allowed: std::collections::HashSet<&str> = shape_def
// &update.overlay_id, .predicates
// ); .iter()
.map(|p| p.iri.as_str())
.collect();
for triple in triples_added {
if allowed.contains(triple.predicate.as_str()) {
added_triples_by_pred
.entry(triple.predicate.as_str().to_string())
.or_insert_with(|| vec![])
.push(*triple);
}
}
//let base = NuriV0::repo_id(&repo.id); // Based on those triples, group by subject.
let nuri_str = nuri.as_ref().map(|s| s.as_str()); let mut added_triples_by_subject: HashMap<String, Vec<Triple>> = HashMap::new();
for triple in triples_added {
let subject_iri = match &triple.subject {
Subject::NamedNode(node) => node.as_str(),
_ => continue, // Won't happen.
};
added_triples_by_subject
.entry(subject_iri.to_string())
.or_insert_with(|| vec![])
.push(triple.clone());
}
let parsed = // Do the same for removed ones.
Query::parse(&query, nuri_str).map_err(|e| NgError::OxiGraphError(e.to_string()))?; let mut removed_triples_by_pred: HashMap<String, Vec<Triple>> = HashMap::new();
let results = oxistore // Collect allowed predicate IRIs for this shape
.query(parsed, None) let allowed: std::collections::HashSet<&str> = shape_def
.map_err(|e| NgError::OxiGraphError(e.to_string()))?; .predicates
let sols = match results { .iter()
QueryResults::Solutions(sols) => { .map(|p| p.iri.as_str())
let mut results = vec![]; .collect();
for t in sols {
match t { for triple in triples_removed {
Err(e) => { if allowed.contains(triple.predicate.as_str()) {
log_err!("{}", e.to_string()); removed_triples_by_pred
return Err(NgError::SparqlError(e.to_string())); .entry(triple.predicate.as_str().to_string())
.or_insert_with(|| vec![])
.push(*triple);
}
}
let mut removed_triples_by_subject: HashMap<String, Vec<Triple>> = HashMap::new();
for triple in triples_removed {
let subject_iri = match &triple.subject {
Subject::NamedNode(node) => node.as_str(),
_ => continue, // Won't happen.
};
removed_triples_by_subject
.entry(subject_iri.to_string())
.or_insert_with(|| vec![])
.push(triple.clone());
}
// Assumes all triples have same subject.
fn orm_from_triple_for_level<'a>(
shape: &OrmSchemaShape,
subject_iri: &String,
triples_added: &Vec<Triple>,
triples_removed: &Vec<Triple>,
tracked_subjects: &HashMap<String, HashMap<String, OrmTrackedSubject<'a>>>,
changes: &HashMap<String, OrmTrackedSubjectChange>,
) -> (
Vec<&'a OrmTrackedPredicateChanges<'a>>,
Vec<&'a OrmTrackedPredicateChanges<'a>>,
) {
let tracked_shapes_for_subject = tracked_subjects
.entry(subject_iri.clone())
.or_insert_with(|| HashMap::new());
let tracked_subject = tracked_shapes_for_subject
.entry(subject_iri.clone())
.or_insert_with(|| OrmTrackedSubject {
tracked_predicates: HashMap::new(),
parents: HashMap::new(),
valid: ng_net::orm::OrmTrackedSubjectValidity::Unknown,
subj_iri: subject_iri,
shape,
});
let subject_changes =
changes
.entry(subject_iri.clone())
.or_insert_with(|| OrmTrackedSubjectChange {
subject_iri: subject_iri.clone(),
predicates: HashMap::new(),
tracked_subject,
valid: OrmTrackedSubjectValidity::Unknown,
});
// Keep track of all children that were spotted or removed.
let mut children_removed: Vec<&OrmTrackedPredicateChanges> = vec![];
let mut children_added: Vec<&OrmTrackedPredicateChanges> = vec![];
// For each triple, check matching predicates in shape.
// keeping track of value count (for later validations).
// In parallel, we keep track of the values added (tracked_changes)
for triple in triples_added {
for schema_predicate in &shape.predicates {
if schema_predicate.iri != triple.predicate.as_str() {
// Triple does not match predicate.
continue;
}
// Predicate schema constraint matches this triple.
// Add tracked predicate or increase cardinality
let tp = tracked_subject
.tracked_predicates
.entry(schema_predicate.iri.to_string())
.or_insert_with(|| OrmTrackedPredicate {
current_cardinality: 0,
schema: schema_predicate,
tracked_children: Vec::new(),
current_literals: None,
});
tp.current_cardinality += 1;
let obj_term = oxrdf_term_to_orm_basic_type(&triple.object);
// Keep track of the changed values too.
let pred_changes = subject_changes
.predicates
.entry(schema_predicate.iri.clone())
.or_insert_with(|| OrmTrackedPredicateChanges {
tracked_predicate: &tp,
values_added: Vec::new(),
values_removed: Vec::new(),
validity: OrmTrackedSubjectValidity::Unknown,
});
pred_changes.values_added.push(obj_term.clone());
// If value type is literal, we need to add the current value to the tracked predicate.
if tp
.schema
.dataTypes
.iter()
.any(|dt| dt.valType == OrmSchemaLiteralType::literal)
{
if let Some(current_literals) = &mut tp.current_literals {
current_literals.push(obj_term);
} else {
tp.current_literals = Some(vec![obj_term]);
} }
Ok(querysol) => results.push(querysol.values().to_vec()), } else if tp
.schema
.dataTypes
.iter()
.any(|dt| dt.valType == OrmSchemaLiteralType::shape)
{
// For nested, add object to tracked predicates and add self as parent.
children_added.push(&pred_changes);
} }
} }
Ok(results)
} }
_ => return Err(NgError::InvalidResponse),
}; // Removed triples
sols for triple in triples_removed {
let pred_iri = triple.predicate.as_str();
// Only adjust if we had tracked state.
let Some(tp) = tracked_subjects
.get_mut(subject_iri)
.map(|tss| tss.get(&shape.iri))
.flatten()
.map(|ts| ts.tracked_predicates.get(pred_iri))
.flatten()
else {
continue;
};
// The cardinality might become -1 or 0. We will remove them from the tracked predicates during validation.
tp.current_cardinality -= 1;
let Some(pred_changes) = subject_changes.predicates.get(pred_iri) else {
continue;
};
let val_removed = oxrdf_term_to_orm_basic_type(&triple.object);
pred_changes.values_removed.push(val_removed.clone());
// If value type is literal, we need to remove the current value from the tracked predicate.
if tp
.schema
.dataTypes
.iter()
.any(|dt| dt.valType == OrmSchemaLiteralType::literal)
{
if let Some(current_literals) = &mut tp.current_literals {
// Remove obj_val from current_literals in-place
current_literals.retain(|val| *val != val_removed);
} else {
tp.current_literals = Some(vec![val_removed]);
}
} else if tp
.schema
.dataTypes
.iter()
.any(|dt| dt.valType == OrmSchemaLiteralType::shape)
{
// For nested, add object to tracked predicates and add self as parent.
children_removed.push(&pred_changes);
}
}
return (children_added, children_removed);
}
fn check_subject_validity<'a>(
s_change: &'a OrmTrackedSubjectChange<'a>,
shape: &String,
schema: &'a OrmSchema,
previous_validity: OrmTrackedSubjectValidity,
) -> (
OrmTrackedSubjectValidity,
HashMap<String, HashMap<String, &'a OrmTrackedSubjectChange<'a>>>,
) {
if s_change.predicates.is_empty() {
// There has not been any changes. There is nothing to do.
return (previous_validity, HashMap::new());
}
let previous_validity = s_change.valid;
let mut new_validity = OrmTrackedSubjectValidity::Valid;
// Helper to set own validity which does not overwrite worse invalids.
let mut set_validity = |new_val: OrmTrackedSubjectValidity| {
if new_val == OrmTrackedSubjectValidity::Invalid {
new_validity = OrmTrackedSubjectValidity::Invalid;
// Remove all tracked predicates
s_change.tracked_subject.tracked_predicates = HashMap::new();
} else if new_val == OrmTrackedSubjectValidity::Unknown
&& new_validity != OrmTrackedSubjectValidity::Invalid
{
new_validity = OrmTrackedSubjectValidity::Unknown;
}
};
let tracked_subject = s_change.tracked_subject;
let shape = schema.get(shape).expect("Shape not available");
// TODO: Check parent validities:
// If no parent is tracking us, we are untracked.
// If there is an infinite loop of parents pointing back to use, return invalid.
// Keep track of objects that need to be validated against a shape to fetch and validate.
let mut new_unknowns: Vec<(&String, &OrmSchemaShape)> = vec![];
for p_schema in shape.predicates.iter() {
let p_change = s_change.predicates.get(&p_schema.iri);
let tracked_pred = p_change.map(|pc| pc.tracked_predicate);
let count = tracked_pred
.map_or_else(|| 0, |tp: &OrmTrackedPredicate<'_>| tp.current_cardinality);
if count < p_schema.minCardinality {
set_validity(OrmTrackedSubjectValidity::Invalid);
if count <= 0 {
// If no other parent is tracking, remove all tracked predicates.
tracked_subject.tracked_predicates.remove(&p_schema.iri);
}
} else if count > p_schema.maxCardinality
&& p_schema.maxCardinality != -1
&& p_schema.extra != Some(true)
{
// If cardinality is too high and no extra allowed, invalid.
set_validity(OrmTrackedSubjectValidity::Invalid);
break;
} else if p_schema
.dataTypes
.iter()
.any(|dt| dt.valType == OrmSchemaLiteralType::literal)
{
// If we have literals, check if all required literals are present.
let required_literals: Vec<BasicType> = p_schema
.dataTypes
.iter()
.flat_map(|dt| dt.literals)
.flatten()
.collect();
// Early stop: If no extra values allowed but the sizes
// between required and given values mismatches.
if !p_schema.extra.unwrap_or(false)
&& (required_literals.len().into()
!= tracked_pred.map_or(0, |p| p.current_cardinality))
{
set_validity(OrmTrackedSubjectValidity::Invalid);
break;
}
// Check that each required literal is present.
for required_literal in required_literals {
// Is tracked predicate present?
if !tracked_pred
.iter()
.flat_map(|tp| tp.current_literals)
.flatten()
.any(|literal| literal == required_literal)
{
set_validity(OrmTrackedSubjectValidity::Invalid);
break;
}
}
} else if p_schema
.dataTypes
.iter()
.any(|dt| dt.valType == OrmSchemaLiteralType::shape)
{
// If we have a nested shape, we need to check if the nested object is tracked and valid.
// First, Count valid, invalid, unknowns, and untracked
let counts = tracked_pred
.iter()
.flat_map(|tp| tp.tracked_children)
.map(|tc| {
tc.upgrade().map(|tc| {
if tc.valid == OrmTrackedSubjectValidity::Valid {
(1, 0, 0, 0)
} else if tc.valid == OrmTrackedSubjectValidity::Invalid {
(0, 1, 0, 0)
} else if tc.valid == OrmTrackedSubjectValidity::Unknown {
(0, 0, 1, 0)
} else if tc.valid == OrmTrackedSubjectValidity::Untracked {
(0, 0, 0, 1)
} else {
(0, 0, 0, 0)
}
})
})
.flatten()
.fold((0, 0, 0, 0), |(v1, i1, u1, ut1), o| {
(v1 + o.0, i1 + o.1, u1 + o.2, ut1 + o.3)
});
if counts.1 > 0 && p_schema.extra != Some(true) {
// If we have at least one invalid nested object and no extra allowed, invalid.
set_validity(OrmTrackedSubjectValidity::Invalid);
break;
} else if counts.0 < p_schema.minCardinality {
// If we have not enough valid nested objects, invalid.
set_validity(OrmTrackedSubjectValidity::Invalid);
break;
} else if counts.3 > 0 {
// If we have untracked nested objects, we need to fetch them and validate.
set_validity(OrmTrackedSubjectValidity::Unknown);
// Add them to the list of unknowns to fetch and validate.
for o in tracked_pred
.iter()
.flat_map(|tp| tp.tracked_children.iter())
{
if let Some(tc) = o.upgrade() {
if tc.valid == OrmTrackedSubjectValidity::Untracked {
new_unknowns.push((tc.subj_iri, tc.shape));
}
}
}
} else if counts.2 > 0 {
// If we have unknown nested objects, we need to wait for their evaluation.
set_validity(OrmTrackedSubjectValidity::Unknown);
} else {
// All nested objects are valid and cardinality is correct.
// We are valid with this predicate.
}
} else {
// Check if the data type is correct.
let allowed_types: Vec<OrmSchemaLiteralType> =
p_schema.dataTypes.iter().map(|dt| dt.valType).collect();
// For each new value, check that data type is in allowed_types.
for val_added in p_change.iter().map(|pc| pc.values_added).flatten() {
let matches = match val_added {
BasicType::Bool(_) => allowed_types
.iter()
.any(|t| *t == OrmSchemaLiteralType::boolean),
BasicType::Num(_) => allowed_types
.iter()
.any(|t| *t == OrmSchemaLiteralType::number),
BasicType::Str(_) => allowed_types.iter().any(|t| {
*t == OrmSchemaLiteralType::string
|| *t == OrmSchemaLiteralType::iri
}),
};
if !matches {
set_validity(OrmTrackedSubjectValidity::Invalid);
break;
}
}
// Break if validity has become invalid.
if new_validity == OrmTrackedSubjectValidity::Invalid {
break;
}
};
}
// TODO
// If we are invalid, we can discard new unknowns again - they won't be kept in memory.
if new_validity == OrmTrackedSubjectValidity::Invalid {
return (OrmTrackedSubjectValidity::Invalid, HashMap::new());
} else if (new_validity == OrmTrackedSubjectValidity::Valid
&& previous_validity != OrmTrackedSubjectValidity::Valid)
{
// If the validity is newly valid, we need to refetch this subject.
// TODO
}
// If validity changed, inform parents (add to new_unknowns).
if new_validity != previous_validity {
// TODO
}
// TODO
return (new_validity, new_unknowns);
}
let all_subjects: HashSet<&String> = added_triples_by_subject
.keys()
.chain(removed_triples_by_subject.keys())
.collect();
// Process added/removed triples for each subject.
for subject_iri in all_subjects {
let triples_added_for_subj = added_triples_by_subject
.get(subject_iri)
.unwrap_or(&vec![])
.to_vec();
let triples_removed_for_subj = removed_triples_by_subject
.get(subject_iri)
.unwrap_or(&vec![])
.to_vec();
let _ = orm_from_triple_for_level(
&shape_def,
&subject_iri,
&triples_added_for_subj,
&triples_removed_for_subj,
&tracked_subjects,
&subject_changes,
);
}
// TODO ====
// To process validation, we collect all subject changes in one of the buckets.
// Subjects for which we did not apply triples.
let un_processed: HashSet<String> = HashSet::new();
// Subjects that are invalid. No further processing needed.
let invalids: HashSet<&OrmTrackedSubjectChange> = HashSet::new();
// Subjects that are valid. Fetch might still be required if newly valid.
let valids: HashSet<&OrmTrackedSubjectChange> = HashSet::new();
// Will need re-evaluation
let unknowns: HashSet<&OrmTrackedSubjectChange> = HashSet::new();
// Either because it became tracked again or because it's newly valid.
let needs_fetch: HashSet<&OrmTrackedSubjectChange> = HashSet::new();
while !unknowns.is_empty() || !needs_fetch.is_empty() {
// Process buckets by priority and nesting
// First unknown, then needs fetch (the latter could still become invalid).
// Start from from the end because nested objects will less likely need further nested eval.
// Check validity for each modified subject.
for sc in un_processed {
let tracked_subject = tracked_subjects
.get(sc.tracked_subject.subj_iri)
.unwrap()
.get(shape)
.unwrap();
let (is_valid, new_unknowns) =
check_subject_validity(s_change, &shape, schema, tracked_subject.valid);
}
if !unknowns.is_empty() {
continue;
}
for sc in needs_fetch {
// TODO: fetch and evaluate.
}
}
// ===
} }
fn create_orm_from_triples(&mut self, scope: &NuriV0, shape_type: &OrmShapeType) {} // Collect result
// For all valid tracked_subjects, build an object from the tracked_subject_changes.
pub(crate) async fn orm_update(&mut self, scope: &NuriV0, patch: GraphQuadsPatch) {} pub(crate) async fn orm_update(&mut self, scope: &NuriV0, patch: GraphTransaction) {}
pub(crate) async fn orm_frontend_update( pub(crate) async fn orm_frontend_update(
&mut self, &mut self,
@ -134,17 +616,17 @@ impl Verifier {
"push_orm_response {:?} {} {:?}", "push_orm_response {:?} {} {:?}",
scope, scope,
schema_iri, schema_iri,
self.orm_subscriptions self.orm_tracked_subjects
); );
if let Some(shapes) = self.orm_subscriptions.get_mut(scope) { if let Some(shapes) = self.orm_tracked_subjects.get_mut(scope) {
if let Some(sessions) = shapes.get_mut(schema_iri) { if let Some(sessions) = shapes.get_mut(schema_iri) {
let mut sessions_to_close: Vec<u64> = vec![]; let mut sessions_to_close: Vec<u64> = vec![];
for (session_id, sender) in sessions.iter_mut() { for (session_id, subscription) in sessions.iter_mut() {
if sender.is_closed() { if subscription.sender.is_closed() {
log_debug!("closed so removing session {}", session_id); log_debug!("closed so removing session {}", session_id);
sessions_to_close.push(*session_id); sessions_to_close.push(*session_id);
} else { } else {
let _ = sender.send(response.clone()).await; let _ = subscription.sender.send(response.clone()).await;
} }
} }
for session_id in sessions_to_close.iter() { for session_id in sessions_to_close.iter() {
@ -157,19 +639,53 @@ impl Verifier {
pub(crate) async fn start_orm( pub(crate) async fn start_orm(
&mut self, &mut self,
nuri: &NuriV0, nuri: &NuriV0,
shape_type: &OrmShapeType, shape_type: OrmShapeType,
session_id: u64, session_id: u64,
) -> Result<(Receiver<AppResponse>, CancelFn), NgError> { ) -> Result<(Receiver<AppResponse>, CancelFn), NgError> {
let (tx, rx) = mpsc::unbounded::<AppResponse>(); let (tx, rx) = mpsc::unbounded::<AppResponse>();
self.orm_subscriptions.insert( // TODO: Validate schema:
// If multiple data types are present for the same predicate, they must be of of the same type.
// All referenced shapes must be available.
// Keep track of connections here.
self.orm_tracked_subjects.insert(
nuri.clone(), nuri.clone(),
HashMap::from([( HashMap::from([(
shape_type.shape.clone(), shape_type.shape.clone(),
HashMap::from([(session_id, tx.clone())]), HashMap::from([(
session_id,
OrmSubscription {
sender: tx.clone(),
tracked_objects: HashMap::new(),
},
)]),
)]), )]),
); );
// Add shape to registry or increase ref count.
if let Some(shape_ref) = self.orm_shape_types.get_mut(&shape_type.shape) {
shape_ref.ref_count += 1;
} else {
self.orm_shape_types.insert(
shape_type.shape.clone(),
OrmShapeTypeRef {
ref_count: 1,
shape_type,
},
);
}
let shape_query =
sparql_construct_from_orm_shape_type(&shape_type.schema, &shape_type.shape, None)?;
let shape_triples = self.sparql_construct(shape_query, Some(nuri))?;
let orm_object = self.create_orm_from_triples(
nuri,
&shape_type.schema,
&shape_type.shape,
&shape_triples,
);
//self.push_orm_response().await; (only for requester, not all sessions) //self.push_orm_response().await; (only for requester, not all sessions)
let close = Box::new(move || { let close = Box::new(move || {
@ -192,34 +708,33 @@ fn is_iri(s: &str) -> bool {
fn literal_to_sparql_str(var: OrmSchemaDataType) -> Vec<String> { fn literal_to_sparql_str(var: OrmSchemaDataType) -> Vec<String> {
match var.literals { match var.literals {
None => [].to_vec(), None => [].to_vec(),
Some(literals) => match literals { Some(literals) => literals
OrmSchemaLiterals::Bool(val) => { .iter()
if val == true { .map(|literal| match literal {
["true".to_string()].to_vec() BasicType::Bool(val) => {
} else { if *val {
["false".to_string()].to_vec() "true".to_string()
} else {
"false".to_string()
}
} }
} BasicType::Num(number) => number.to_string(),
OrmSchemaLiterals::NumArray(numbers) => { BasicType::Str(sting) => {
numbers.iter().map(|num| num.to_string()).collect() if is_iri(sting) {
} format!("<{}>", escape_iri(sting))
OrmSchemaLiterals::StrArray(stings) => stings
.iter()
.map(|str| {
// We assume that strings can be IRIs (currently no support for typed literals).
if is_iri(str) {
format!("<{}>", escape_iri(str))
} else { } else {
format!("\"{}\"", escape_literal(str)) format!("\"{}\"", escape_literal(sting))
} }
}) }
.collect(), })
}, .collect(),
} }
} }
pub fn sparql_construct_from_orm_shape_type( pub fn sparql_construct_from_orm_shape_type(
shape_type: &OrmShapeType, schema: &OrmSchema,
shape: &String,
// TODO: Remove max_recursion
max_recursion: Option<u8>, max_recursion: Option<u8>,
) -> Result<String, NgError> { ) -> Result<String, NgError> {
// Use a counter to generate unique variable names. // Use a counter to generate unique variable names.
@ -361,16 +876,13 @@ pub fn sparql_construct_from_orm_shape_type(
visited_shapes.remove(&shape.iri); visited_shapes.remove(&shape.iri);
} }
let root_shape = shape_type let root_shape = schema.get(shape).ok_or(VerifierError::InvalidOrmSchema)?;
.schema
.get(&shape_type.shape)
.ok_or(VerifierError::InvalidOrmSchema)?;
// Root subject variable name // Root subject variable name
let root_var_name = get_new_var_name(&mut var_counter); let root_var_name = get_new_var_name(&mut var_counter);
process_shape( process_shape(
&shape_type.schema, schema,
root_shape, root_shape,
&root_var_name, &root_var_name,
&mut construct_statements, &mut construct_statements,
@ -389,12 +901,12 @@ pub fn sparql_construct_from_orm_shape_type(
)) ))
} }
// Escape an IRI fragment if needed (very conservative, only wrap with <...>). Assumes input already a full IRI. /// Escape an IRI fragment if needed (very conservative, only wrap with <...>). Assumes input already a full IRI.
fn escape_iri(iri: &str) -> String { fn escape_iri(iri: &str) -> String {
format!("<{}>", iri) format!("<{}>", iri)
} }
// SPARQL literal escape: backslash, quotes, newlines, tabs. /// SPARQL literal escape: backslash, quotes, newlines, tabs.
fn escape_literal(lit: &str) -> String { fn escape_literal(lit: &str) -> String {
let mut out = String::with_capacity(lit.len() + 4); let mut out = String::with_capacity(lit.len() + 4);
for c in lit.chars() { for c in lit.chars() {
@ -409,3 +921,61 @@ fn escape_literal(lit: &str) -> String {
} }
return out; return out;
} }
/// Converts an oxrdf::Term to an orm::Term
fn oxrdf_term_to_orm_term(term: &ng_oxigraph::oxrdf::Term) -> ng_net::orm::Term {
match term {
ng_oxigraph::oxrdf::Term::NamedNode(node) => {
ng_net::orm::Term::Ref(node.as_str().to_string())
}
ng_oxigraph::oxrdf::Term::BlankNode(node) => {
ng_net::orm::Term::Ref(node.as_str().to_string())
}
ng_oxigraph::oxrdf::Term::Literal(literal) => {
// Check the datatype to determine how to convert
match literal.datatype().as_str() {
// Check for string first, this is the most common.
"http://www.w3.org/2001/XMLSchema#string" => {
ng_net::orm::Term::Str(literal.value().to_string())
}
"http://www.w3.org/2001/XMLSchema#boolean" => {
match literal.value().parse::<bool>() {
Ok(b) => ng_net::orm::Term::Bool(b),
Err(_) => ng_net::orm::Term::Str(literal.value().to_string()),
}
}
"http://www.w3.org/2001/XMLSchema#integer"
| "http://www.w3.org/2001/XMLSchema#decimal"
| "http://www.w3.org/2001/XMLSchema#double"
| "http://www.w3.org/2001/XMLSchema#float"
| "http://www.w3.org/2001/XMLSchema#int"
| "http://www.w3.org/2001/XMLSchema#long"
| "http://www.w3.org/2001/XMLSchema#short"
| "http://www.w3.org/2001/XMLSchema#byte"
| "http://www.w3.org/2001/XMLSchema#unsignedInt"
| "http://www.w3.org/2001/XMLSchema#unsignedLong"
| "http://www.w3.org/2001/XMLSchema#unsignedShort"
| "http://www.w3.org/2001/XMLSchema#unsignedByte" => {
match literal.value().parse::<f64>() {
Ok(n) => ng_net::orm::Term::Num(n),
Err(_) => ng_net::orm::Term::Str(literal.value().to_string()),
}
}
_ => ng_net::orm::Term::Str(literal.value().to_string()),
}
}
ng_oxigraph::oxrdf::Term::Triple(triple) => {
// For RDF-star triples, convert to string representation
ng_net::orm::Term::Str(triple.to_string())
}
}
}
fn oxrdf_term_to_orm_basic_type(term: &ng_oxigraph::oxrdf::Term) -> BasicType {
match oxrdf_term_to_orm_term(term) {
ng_net::orm::Term::Str(s) => BasicType::Str(s),
ng_net::orm::Term::Num(n) => BasicType::Num(n),
ng_net::orm::Term::Bool(b) => BasicType::Bool(b),
ng_net::orm::Term::Ref(b) => BasicType::Str(b), // Treat IRIs as strings
}
}

@ -19,13 +19,16 @@ use std::fs::create_dir_all;
use std::fs::{read, File, OpenOptions}; use std::fs::{read, File, OpenOptions};
#[cfg(all(not(target_family = "wasm"), not(docsrs)))] #[cfg(all(not(target_family = "wasm"), not(docsrs)))]
use std::io::Write; use std::io::Write;
use std::rc::Weak;
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use async_std::stream::StreamExt; use async_std::stream::StreamExt;
use async_std::sync::{Mutex, RwLockReadGuard}; use async_std::sync::{Mutex, RwLockReadGuard};
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::SinkExt; use futures::SinkExt;
use ng_net::orm::OrmSchemaPredicate;
use ng_net::orm::OrmShapeTypeRef; use ng_net::orm::OrmShapeTypeRef;
use ng_net::orm::OrmSubscription;
use ng_oxigraph::oxigraph::sparql::Query; use ng_oxigraph::oxigraph::sparql::Query;
use ng_oxigraph::oxigraph::sparql::QueryResults; use ng_oxigraph::oxigraph::sparql::QueryResults;
use ng_oxigraph::oxrdf::Term; use ng_oxigraph::oxrdf::Term;
@ -112,8 +115,8 @@ pub struct Verifier {
in_memory_outbox: Vec<EventOutboxStorage>, in_memory_outbox: Vec<EventOutboxStorage>,
uploads: BTreeMap<u32, RandomAccessFile>, uploads: BTreeMap<u32, RandomAccessFile>,
branch_subscriptions: HashMap<BranchId, Sender<AppResponse>>, branch_subscriptions: HashMap<BranchId, Sender<AppResponse>>,
pub(crate) orm_subscriptions: pub(crate) orm_tracked_subjects:
HashMap<NuriV0, HashMap<String, HashMap<u64, Sender<AppResponse>>>>, HashMap<NuriV0, HashMap<String, HashMap<u64, OrmSubscription>>>,
pub(crate) orm_shape_types: HashMap<String, OrmShapeTypeRef>, pub(crate) orm_shape_types: HashMap<String, OrmShapeTypeRef>,
pub(crate) temporary_repo_certificates: HashMap<RepoId, ObjectRef>, pub(crate) temporary_repo_certificates: HashMap<RepoId, ObjectRef>,
} }
@ -520,7 +523,7 @@ impl Verifier {
inner_to_outer: HashMap::new(), inner_to_outer: HashMap::new(),
uploads: BTreeMap::new(), uploads: BTreeMap::new(),
branch_subscriptions: HashMap::new(), branch_subscriptions: HashMap::new(),
orm_subscriptions: HashMap::new(), orm_tracked_subjects: HashMap::new(),
orm_shape_types: HashMap::new(), orm_shape_types: HashMap::new(),
temporary_repo_certificates: HashMap::new(), temporary_repo_certificates: HashMap::new(),
} }
@ -2813,7 +2816,7 @@ impl Verifier {
inner_to_outer: HashMap::new(), inner_to_outer: HashMap::new(),
uploads: BTreeMap::new(), uploads: BTreeMap::new(),
branch_subscriptions: HashMap::new(), branch_subscriptions: HashMap::new(),
orm_subscriptions: HashMap::new(), orm_tracked_subjects: HashMap::new(),
orm_shape_types: HashMap::new(), orm_shape_types: HashMap::new(),
temporary_repo_certificates: HashMap::new(), temporary_repo_certificates: HashMap::new(),
}; };

Loading…
Cancel
Save