feat/orm
Laurin Weger 2 weeks ago
parent 3956c16a4e
commit b9ff05feee
No known key found for this signature in database
GPG Key ID: 9B372BB0B792770F
  1. 5
      ng-net/src/orm.rs
  2. 250
      ng-verifier/src/orm.rs

@ -116,7 +116,7 @@ pub struct OrmTrackedSubjectAndShape<'a> {
pub parents: HashMap<String, (OrmTrackedSubjectAndShape<'a>, bool)>, pub parents: HashMap<String, (OrmTrackedSubjectAndShape<'a>, bool)>,
/// Validity. When untracked, triple updates are not processed here. /// Validity. When untracked, triple updates are not processed here.
pub valid: OrmTrackedSubjectValidity, pub valid: OrmTrackedSubjectValidity,
pub subject_iri: &'a String, pub subject_iri: String,
/// The shape for which the predicates are tracked. /// The shape for which the predicates are tracked.
pub shape: &'a OrmSchemaShape, pub shape: &'a OrmSchemaShape,
} }
@ -134,7 +134,6 @@ pub enum OrmTrackedSubjectValidity {
pub struct OrmTrackedPredicate<'a> { pub struct OrmTrackedPredicate<'a> {
/// The predicate schema /// The predicate schema
pub schema: &'a OrmSchemaPredicate, pub schema: &'a OrmSchemaPredicate,
/// TODO: This is not correctly implemented.
/// If the schema is a nested object, the children. /// If the schema is a nested object, the children.
pub tracked_children: Vec<Weak<OrmTrackedSubjectAndShape<'a>>>, pub tracked_children: Vec<Weak<OrmTrackedSubjectAndShape<'a>>>,
/// The count of triples for this subject and predicate. /// The count of triples for this subject and predicate.
@ -149,7 +148,7 @@ pub struct OrmTrackedSubjectChange<'a> {
pub subject_iri: String, pub subject_iri: String,
/// Predicates that were changed. /// Predicates that were changed.
pub predicates: HashMap<String, OrmTrackedPredicateChanges<'a>>, pub predicates: HashMap<String, OrmTrackedPredicateChanges<'a>>,
/// During validation, the current state of validity. /// During validation, the current state of validity (can be subject to change).
pub valid: OrmTrackedSubjectValidity, pub valid: OrmTrackedSubjectValidity,
} }
pub struct OrmTrackedPredicateChanges<'a> { pub struct OrmTrackedPredicateChanges<'a> {

@ -7,14 +7,11 @@
// notice may not be copied, modified, or distributed except // notice may not be copied, modified, or distributed except
// according to those terms. // according to those terms.
use futures::channel::mpsc;
use std::collections::HashMap; use std::collections::HashMap;
use std::collections::HashSet; use std::collections::HashSet;
use std::hash::Hash;
use std::rc::Weak; use std::rc::Weak;
use async_std::task::current;
use futures::channel::mpsc;
use futures::SinkExt; use futures::SinkExt;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use ng_net::orm::BasicType; use ng_net::orm::BasicType;
@ -29,21 +26,15 @@ use ng_net::orm::OrmTrackedSubjectAndShape;
use ng_net::orm::OrmTrackedSubjectChange; use ng_net::orm::OrmTrackedSubjectChange;
use ng_net::orm::OrmTrackedSubjectValidity; use ng_net::orm::OrmTrackedSubjectValidity;
use ng_net::orm::{OrmSchemaDataType, OrmSchemaShape}; use ng_net::orm::{OrmSchemaDataType, OrmSchemaShape};
use ng_net::utils::Receiver;
use ng_net::{app_protocol::*, orm::OrmSchema}; use ng_net::{app_protocol::*, orm::OrmSchema};
use ng_net::{ use ng_oxigraph::oxigraph::sparql::{Query, QueryResults};
types::*,
utils::{Receiver, Sender},
};
use ng_oxigraph::oxigraph::sparql::{results::*, Query, QueryResults};
use ng_oxigraph::oxrdf::LiteralRef;
use ng_oxigraph::oxrdf::NamedNode;
use ng_oxigraph::oxrdf::Subject; use ng_oxigraph::oxrdf::Subject;
use ng_oxigraph::oxrdf::Triple; use ng_oxigraph::oxrdf::Triple;
use ng_repo::errors::NgError; use ng_repo::errors::NgError;
use ng_repo::errors::VerifierError; use ng_repo::errors::VerifierError;
use ng_repo::log::*; use ng_repo::log::*;
use regex::Regex; use regex::Regex;
use serde::de::IntoDeserializer;
use serde_json::json; use serde_json::json;
use serde_json::Value; use serde_json::Value;
@ -148,52 +139,58 @@ impl Verifier {
tracked_subjects: &HashMap<String, HashMap<String, OrmTrackedSubjectAndShape>>, tracked_subjects: &HashMap<String, HashMap<String, OrmTrackedSubjectAndShape>>,
subject_changes: &OrmTrackedSubjectChange, subject_changes: &OrmTrackedSubjectChange,
) { ) {
let tracked_shapes_for_subject = tracked_subjects let get_or_create_tracked_subject = |subject_iri: &String, shape_iri: &String| {
.entry(subject_iri.clone()) let tracked_shapes_for_subject = tracked_subjects
.or_insert_with(|| HashMap::new()); .entry(subject_iri.clone())
.or_insert_with(|| HashMap::new());
let tracked_subject = tracked_shapes_for_subject
.entry(subject_iri.clone()) let tracked_subject = tracked_shapes_for_subject
.or_insert_with(|| OrmTrackedSubjectAndShape { .entry(shape_iri.clone())
tracked_predicates: HashMap::new(), .or_insert_with(|| OrmTrackedSubjectAndShape {
parents: HashMap::new(), tracked_predicates: HashMap::new(),
valid: ng_net::orm::OrmTrackedSubjectValidity::NotEvaluated, parents: HashMap::new(),
subject_iri, valid: ng_net::orm::OrmTrackedSubjectValidity::NotEvaluated,
shape, subject_iri: subject_iri.clone(),
}); shape,
});
tracked_subject
};
let tracked_subject = get_or_create_tracked_subject(subject_iri, &shape.iri);
// Process added triples. // Process added triples.
// For each triple, check matching predicates in shape. // For each triple, check matching predicates in shape.
// keeping track of value count (for later validations). // keeping track of value count (for later validations).
// In parallel, we keep track of the values added (tracked_changes) // In parallel, we keep track of the values added (tracked_changes)
for triple in triples_added { for triple in triples_added {
for schema_predicate in &shape.predicates { for predicate_schema in &shape.predicates {
if schema_predicate.iri != triple.predicate.as_str() { if predicate_schema.iri != triple.predicate.as_str() {
// Triple does not match predicate. // Triple does not match predicate.
continue; continue;
} }
// Predicate schema constraint matches this triple. // Predicate schema constraint matches this triple.
// Add tracked predicate or increase cardinality // Add tracked predicate or increase cardinality
let tp = tracked_subject let tracked_predicate = tracked_subject
.tracked_predicates .tracked_predicates
.entry(schema_predicate.iri.to_string()) .entry(predicate_schema.iri.to_string())
.or_insert_with(|| OrmTrackedPredicate { .or_insert_with(|| OrmTrackedPredicate {
current_cardinality: 0, current_cardinality: 0,
schema: schema_predicate, schema: predicate_schema,
tracked_children: Vec::new(), tracked_children: Vec::new(),
current_literals: None, current_literals: None,
}); });
tp.current_cardinality += 1; tracked_predicate.current_cardinality += 1;
let obj_term = oxrdf_term_to_orm_basic_type(&triple.object); let obj_term = oxrdf_term_to_orm_basic_type(&triple.object);
// Keep track of the changed values too. // Keep track of the changed values too.
let pred_changes = subject_changes let pred_changes = subject_changes
.predicates .predicates
.entry(schema_predicate.iri.clone()) .entry(predicate_schema.iri.clone())
.or_insert_with(|| OrmTrackedPredicateChanges { .or_insert_with(|| OrmTrackedPredicateChanges {
tracked_predicate: &tp, tracked_predicate: &tracked_predicate,
values_added: Vec::new(), values_added: Vec::new(),
values_removed: Vec::new(), values_removed: Vec::new(),
}); });
@ -201,16 +198,41 @@ impl Verifier {
pred_changes.values_added.push(obj_term.clone()); pred_changes.values_added.push(obj_term.clone());
// If value type is literal, we need to add the current value to the tracked predicate. // If value type is literal, we need to add the current value to the tracked predicate.
if tp if tracked_predicate
.schema .schema
.dataTypes .dataTypes
.iter() .iter()
.any(|dt| dt.valType == OrmSchemaLiteralType::literal) .any(|dt| dt.valType == OrmSchemaLiteralType::literal)
{ {
if let Some(current_literals) = tp.current_literals { if let Some(current_literals) = tracked_predicate.current_literals {
current_literals.push(obj_term); current_literals.push(obj_term);
} else { } else {
tp.current_literals.insert(vec![obj_term]); tracked_predicate.current_literals.insert(vec![obj_term]);
}
}
// If predicate is of type shape, register (parent -> child) links so that
// nested subjects can later be (lazily) fetched / validated.
for shape_iri in predicate_schema
.dataTypes
.iter()
.filter(|dt| dt.valType == OrmSchemaLiteralType::shape)
.flat_map(|dt| dt.shape)
{
if let BasicType::Str(obj_iri) = obj_term {
// Get or create object's tracked subject struct.
let tracked_child = get_or_create_tracked_subject(
triple.predicate.as_string(),
&shape_iri,
);
// Add self to parent (set tracked to true, preliminary).
tracked_child.parents.insert(obj_iri, (tracked_child, true));
// Add link to children
tracked_predicate
.tracked_children
.push(unsafe { Weak::from_raw(tracked_child) });
} }
} }
} }
@ -221,7 +243,7 @@ impl Verifier {
let pred_iri = triple.predicate.as_str(); let pred_iri = triple.predicate.as_str();
// Only adjust if we had tracked state. // Only adjust if we had tracked state.
let Some(tp) = tracked_subjects let Some(tracked_predicate) = tracked_subjects
.get_mut(subject_iri) .get_mut(subject_iri)
.map(|tss| tss.get(&shape.iri)) .map(|tss| tss.get(&shape.iri))
.flatten() .flatten()
@ -232,7 +254,7 @@ impl Verifier {
}; };
// The cardinality might become -1 or 0. We will remove them from the tracked predicates during validation. // The cardinality might become -1 or 0. We will remove them from the tracked predicates during validation.
tp.current_cardinality -= 1; tracked_predicate.current_cardinality -= 1;
let Some(pred_changes) = subject_changes.predicates.get(pred_iri) else { let Some(pred_changes) = subject_changes.predicates.get(pred_iri) else {
continue; continue;
@ -242,17 +264,47 @@ impl Verifier {
pred_changes.values_removed.push(val_removed.clone()); pred_changes.values_removed.push(val_removed.clone());
// If value type is literal, we need to remove the current value from the tracked predicate. // If value type is literal, we need to remove the current value from the tracked predicate.
if tp if tracked_predicate
.schema .schema
.dataTypes .dataTypes
.iter() .iter()
.any(|dt| dt.valType == OrmSchemaLiteralType::literal) .any(|dt| dt.valType == OrmSchemaLiteralType::literal)
{ {
if let Some(current_literals) = &mut tp.current_literals { if let Some(current_literals) = &mut tracked_predicate.current_literals {
// Remove obj_val from current_literals in-place // Remove obj_val from current_literals in-place
current_literals.retain(|val| *val != val_removed); current_literals.retain(|val| *val != val_removed);
} else { } else {
tp.current_literals = Some(vec![val_removed]); tracked_predicate.current_literals = Some(vec![val_removed]);
}
} else if tracked_predicate
.schema
.dataTypes
.iter()
.any(|dt| dt.valType == OrmSchemaLiteralType::shape)
{
// Remove parent from child and child from tracked children.
for shape_iri in tracked_predicate
.schema
.dataTypes
.iter()
.filter(|dt| dt.valType == OrmSchemaLiteralType::shape)
.flat_map(|dt| dt.shape)
{
if let BasicType::Str(object_iri) = val_removed {
// Get child.
let nested_tracked_subject = get_or_create_tracked_subject(
triple.predicate.as_string(),
&shape_iri,
);
// Add self to parent (set tracked to true, preliminary).
nested_tracked_subject.parents.remove(&object_iri);
// Remove from tracked_children
tracked_predicate.tracked_children.retain_mut(|el| {
el.upgrade().map(|el| el.subject_iri) == Some(object_iri)
});
}
} }
} }
} }
@ -320,7 +372,7 @@ impl Verifier {
} }
// Keep track of objects that need to be validated against a shape to fetch and validate. // Keep track of objects that need to be validated against a shape to fetch and validate.
let mut new_unknowns: Vec<(&String, &OrmSchemaShape, bool)> = vec![]; let mut need_evaluation: Vec<(&String, &OrmSchemaShape, bool)> = vec![];
// Check 4) Validate subject against each predicate in shape. // Check 4) Validate subject against each predicate in shape.
for p_schema in shape.predicates.iter() { for p_schema in shape.predicates.iter() {
@ -433,7 +485,7 @@ impl Verifier {
{ {
if let Some(tc) = o.upgrade() { if let Some(tc) = o.upgrade() {
if tc.valid == OrmTrackedSubjectValidity::Untracked { if tc.valid == OrmTrackedSubjectValidity::Untracked {
new_unknowns.push((tc.subject_iri, tc.shape, true)); need_evaluation.push((&tc.subject_iri, tc.shape, true));
} }
} }
} }
@ -475,25 +527,57 @@ impl Verifier {
}; };
} }
// If we are invalid, we can discard new unknowns again - they won't be kept in memory.
// We need to inform all children (by returning them for later evaluation), to untrack them.
// TODO: Collect info about all children to untrack them.
if new_validity == OrmTrackedSubjectValidity::Invalid { if new_validity == OrmTrackedSubjectValidity::Invalid {
return (OrmTrackedSubjectValidity::Invalid, vec![]); // If we are invalid, we can discard new unknowns again - they won't be kept in memory.
// We need to remove ourself from child objects parents field and
// remove them if no other is tracking.
for child_subject_weak in tracked_subject
.tracked_predicates
.iter()
.flat_map(|(tp_iri, tp)| tp.tracked_children)
{
if let Some(child_subject) = child_subject_weak.upgrade() {
child_subject.parents.remove(&s_change.subject_iri);
};
}
// Remove tracked predicates and set untracked.
tracked_subject.tracked_predicates = HashMap::new();
tracked_subject.valid = OrmTrackedSubjectValidity::Invalid;
s_change.valid = OrmTrackedSubjectValidity::Invalid;
// Empty list of children that need evaluation.
need_evaluation.retain(|_| false);
} else if new_validity == OrmTrackedSubjectValidity::Valid } else if new_validity == OrmTrackedSubjectValidity::Valid
&& previous_validity != OrmTrackedSubjectValidity::Valid && previous_validity != OrmTrackedSubjectValidity::Valid
{ {
// TODO // If this subject became valid, we need to refetch this subject;
// If this subject became valid, we need to refetch this subject. // For that we prepend self to needs_fetch.
s_change.valid = OrmTrackedSubjectValidity::Valid;
tracked_subject.valid = OrmTrackedSubjectValidity::Valid;
need_evaluation.insert(0, (&s_change.subject_iri, shape, true));
} }
// TODO...
// If validity changed, parents need to be re-evaluated. // If validity changed, parents need to be re-evaluated.
if new_validity != previous_validity { if new_validity != previous_validity {
// TODO // We return the tracking parents which need re-evaluation.
// Remember that the last elements (i.e. children or needs_fetch) are evaluated first.
return (
new_validity,
// Add parents and objects in `need_evaluation`.
tracked_subject
.parents
.values()
// Inform tracking parents only.
.filter(|(parent, is_tracked)| *is_tracked)
.map(|(parent, is_tracked)| (&parent.subject_iri, parent.shape, false))
// Add `need_evaluation`.
.chain(need_evaluation)
.collect(),
);
} }
// TODO return (new_validity, need_evaluation);
return (new_validity, new_unknowns);
} }
// === Validation === // === Validation ===
@ -574,7 +658,7 @@ impl Verifier {
// and return logic to add unprocessed nested objects after validation. // and return logic to add unprocessed nested objects after validation.
// We add the new_unknowns to be processed next // We add the new_unknowns to be processed next
for (iri, schema) in new_unknowns { for (iri, schema, needs_fetch) in new_unknowns {
// Add to nested_objects_to_validate. // Add to nested_objects_to_validate.
nested_objects_to_validate nested_objects_to_validate
.entry(schema.iri.clone()) .entry(schema.iri.clone())
@ -601,7 +685,10 @@ impl Verifier {
) -> Result<Value, NgError> { ) -> Result<Value, NgError> {
let changes = self.apply_changes_from_triples(scope, schema, shape, triples, &vec![]); let changes = self.apply_changes_from_triples(scope, schema, shape, triples, &vec![]);
let root_changes = changes.get(&shape.iri).unwrap().values(); let Some(root_changes) = changes.get(&shape.iri).map(|s| s.values()) else {
return Ok(Value::Array(vec![]));
};
let valid_roots = root_changes.filter(|v| v.valid == OrmTrackedSubjectValidity::Valid); let valid_roots = root_changes.filter(|v| v.valid == OrmTrackedSubjectValidity::Valid);
let mut return_vals: Value = Value::Array(vec![]); let mut return_vals: Value = Value::Array(vec![]);
@ -612,28 +699,35 @@ impl Verifier {
changes: &HashMap<String, HashMap<String, OrmTrackedSubjectChange<'_>>>, changes: &HashMap<String, HashMap<String, OrmTrackedSubjectChange<'_>>>,
shape: &OrmSchemaShape, shape: &OrmSchemaShape,
schema: &OrmSchema, schema: &OrmSchema,
tracked_subjects: &HashMap<String, HashMap<String, OrmTrackedSubjectAndShape>>,
) -> Value { ) -> Value {
let mut new_val = json!({"id": change.subject_iri}); let mut new_val = json!({"id": change.subject_iri});
let new_val_map = new_val.as_object_mut().unwrap(); let new_val_map = new_val.as_object_mut().unwrap();
for pred_schema in &shape.predicates { for pred_schema in &shape.predicates {
let Some(pred_change) = change.predicates.get(&pred_schema.iri) else {
continue;
};
let property_name = pred_schema.readablePredicate.clone(); let property_name = pred_schema.readablePredicate.clone();
let is_multi = pred_schema.maxCardinality > 1; let is_multi = pred_schema.maxCardinality > 1 || pred_schema.maxCardinality == -1;
let pred_change = change.predicates.get(&pred_schema.iri).unwrap();
if pred_schema if pred_schema
.dataTypes .dataTypes
.iter() .iter()
.any(|dt| dt.valType == OrmSchemaLiteralType::shape) .any(|dt| dt.valType == OrmSchemaLiteralType::shape)
{ {
// Helper to create nested objects. // We have a nested type.
// Helper to create Value structs from a nested object_iri.
let get_nested_value = |object_iri: &String| { let get_nested_value = |object_iri: &String| {
// Find allowed schemas for the predicate's datatype.
let shape_iris: Vec<String> = pred_schema let shape_iris: Vec<String> = pred_schema
.dataTypes .dataTypes
.iter() .iter()
.flat_map(|dt| dt.shape.clone()) .flat_map(|dt| dt.shape.clone())
.collect(); .collect();
// Find subject_change for this subject. There exists at least one (shape, subject). // Find subject_change for this subject. There exists at least one (shape, subject) pair.
// If multiple exist, we take the first one. _Which one is chosen is undefined behavior_
let nested_subject_change = shape_iris let nested_subject_change = shape_iris
.iter() .iter()
.find_map(|shape_iri| { .find_map(|shape_iri| {
@ -643,32 +737,43 @@ impl Verifier {
}) })
.unwrap(); .unwrap();
// Recurse if let Some(nested_schema) = tracked_subjects
create_value_from_change( .get(&nested_subject_change.subject_iri)
nested_subject_change, .and_then(|tracked_subjects| tracked_subjects.get(&shape.iri))
changes, {
schema.get(&nested_subject_change.subject_iri).unwrap(), // Recurse
schema, return create_value_from_change(
) nested_subject_change,
changes,
nested_schema.shape,
schema,
tracked_subjects,
);
}
return json!({});
}; };
if is_multi { if is_multi {
// Add each value to a new object (predicate being object IRIs). // Add each value to a new object (predicates being object IRIs) like {iri: <nested object>}.
let mut nested_objects = json!({"id": change.subject_iri}); let mut nested_objects = json!({"id": change.subject_iri});
let nested_objects_map = nested_objects.as_object_mut().unwrap(); let nested_objects_map = nested_objects.as_object_mut().unwrap();
// Add (object iri, nested object) pairs to object.
for new_val in &pred_change.values_added { for new_val in &pred_change.values_added {
if let BasicType::Str(object_iri) = new_val { if let BasicType::Str(object_iri) = new_val {
new_val_map new_val_map
.insert(object_iri.clone(), get_nested_value(&object_iri)); .insert(object_iri.clone(), get_nested_value(&object_iri));
} }
} }
new_val_map.insert(property_name.clone(), nested_objects);
} else { } else {
if let Some(BasicType::Str(object_iri)) = pred_change.values_added.get(0) { if let Some(BasicType::Str(object_iri)) = pred_change.values_added.get(0) {
new_val_map.insert(property_name.clone(), get_nested_value(object_iri)); new_val_map.insert(property_name.clone(), get_nested_value(object_iri));
} }
} }
} else { } else {
// We have a basic type (string, number, bool, literal).
if is_multi { if is_multi {
// Add values as array. // Add values as array.
new_val_map.insert( new_val_map.insert(
@ -692,16 +797,23 @@ impl Verifier {
} }
} }
} }
return new_val; return new_val;
} }
// For each valid change struct, we build an orm object.
for root_change in valid_roots { for root_change in valid_roots {
let new_val = create_value_from_change(root_change, &changes, shape, schema); let new_val = create_value_from_change(
root_change,
&changes,
shape,
schema,
self.orm_tracked_subjects,
);
return_val_vec.push(new_val); return_val_vec.push(new_val);
} }
return Ok(return_vals); return Ok(return_vals);
//
} }
// Collect result // Collect result

Loading…
Cancel
Save