feat/orm
Laurin Weger 2 weeks ago
parent 04b843e0fa
commit d641a93f1f
No known key found for this signature in database
GPG Key ID: 9B372BB0B792770F
  1. 2
      nextgraph/src/local_broker.rs
  2. 10
      nextgraph/src/tests/orm.rs
  3. 181
      ng-verifier/src/orm.rs
  4. 62
      ng-verifier/src/utils/orm_validation.rs

@ -2761,7 +2761,7 @@ pub async fn doc_sparql_construct(
) -> Result<Vec<Triple>, NgError> {
let broker = get_broker().await?;
let session = broker.get_session(session_id)?;
session.verifier.sparql_construct(sparql, nuri)
session.verifier.query_sparql_construct(sparql, nuri)
}
pub async fn doc_create(

@ -14,7 +14,7 @@ use ng_net::orm::{
OrmShapeType,
};
use ng_repo::log_info;
use ng_verifier::orm::sparql_construct_from_orm_shape_type;
use ng_verifier::orm::shape_type_to_sparql;
use std::collections::HashMap;
#[async_std::test]
@ -287,7 +287,7 @@ INSERT DATA {
};
// Generate and execute the CONSTRUCT query
let query = sparql_construct_from_orm_shape_type(&shape_type, Some(1)).unwrap();
let query = shape_type_to_sparql(&shape_type, Some(1)).unwrap();
let triples = doc_sparql_construct(session_id, query, Some(doc_nuri.clone()))
.await
@ -390,7 +390,7 @@ INSERT DATA {
};
// Generate and run query
let query = sparql_construct_from_orm_shape_type(&shape_type, Some(1)).unwrap();
let query = shape_type_to_sparql(&shape_type, Some(1)).unwrap();
let triples = doc_sparql_construct(session_id, query, Some(doc_nuri.clone()))
.await
.unwrap();
@ -453,7 +453,7 @@ INSERT DATA {
};
// Generate and run query
let query = sparql_construct_from_orm_shape_type(&shape_type, Some(1)).unwrap();
let query = shape_type_to_sparql(&shape_type, Some(1)).unwrap();
let triples = doc_sparql_construct(session_id, query, Some(doc_nuri.clone()))
.await
.unwrap();
@ -528,7 +528,7 @@ INSERT DATA {
};
// Generate and run query. This must not infinite loop.
let query = sparql_construct_from_orm_shape_type(&shape_type, Some(1)).unwrap();
let query = shape_type_to_sparql(&shape_type, Some(1)).unwrap();
log_info!("cyclic query result:\n{}", query);
let triples = doc_sparql_construct(session_id, query, Some(doc_nuri.clone()))
.await

@ -45,7 +45,7 @@ type SubjectIri = String;
type OrmChanges<'a> = HashMap<ShapeIri, HashMap<SubjectIri, OrmTrackedSubjectChange<'a>>>;
impl Verifier {
pub fn sparql_construct(
pub fn query_sparql_construct(
&self,
query: String,
nuri: Option<String>,
@ -83,10 +83,12 @@ impl Verifier {
}
}
fn build_changes_for_subscription<'a>(
fn process_changes_for_subscription<'a>(
self: &mut Self,
orm_subscription: &'a mut OrmSubscription,
triples_added: &'a [Triple],
triples_removed: &'a [Triple],
nuri: &String,
) -> Result<OrmChanges<'a>, NgError> {
let tracked_subjects = &mut orm_subscription.tracked_subjects;
let schema = &orm_subscription.shape_type.schema;
@ -94,7 +96,28 @@ impl Verifier {
.get(&orm_subscription.shape_type.shape)
.ok_or(VerifierError::InvalidOrmSchema)?;
let mut orm_changes: OrmChanges = HashMap::new();
return self.process_changes_for_subject_and_shape(
root_shape,
tracked_subjects,
schema,
triples_added,
triples_removed,
nuri,
None,
);
}
fn process_changes_for_subject_and_shape<'a>(
self: &mut Self,
root_shape: &OrmSchemaShape,
tracked_subjects: &mut HashMap<String, HashMap<String, OrmTrackedSubject>>,
schema: &OrmSchema,
triples_added: &'a [Triple],
triples_removed: &'a [Triple],
nuri: &String,
mut orm_changes: Option<OrmChanges<'a>>,
) -> Result<OrmChanges<'a>, NgError> {
let mut orm_changes: OrmChanges<'a> = orm_changes.take().unwrap_or_else(HashMap::new);
// First in, last out stack to keep track of objects to validate (nested objects first). Strings are object IRIs.
let mut shape_validation_queue: Vec<(&OrmSchemaShape, Vec<String>)> = vec![];
@ -115,10 +138,11 @@ impl Verifier {
.collect();
// Variable to collect nested objects that need validation.
// First string is shape IRI, second are object IRIs.
let mut nested_objects_to_validate: HashMap<String, Vec<String>> = HashMap::new();
let mut nested_objects_to_eval: HashMap<ShapeIri, Vec<(SubjectIri, bool)>> =
HashMap::new();
// For each subject, add/remove triples and validate.
for subject_iri in all_modified_subjects {
let triples_added_for_subj = added_triples_by_subject
.get(subject_iri)
@ -141,7 +165,7 @@ impl Verifier {
// Apply all triples for that subject to the tracked (shape, subject) pair.
// Record the changes.
if let Err(e) = add_remove_triples(
if let Err(e) = add_remove_triples_mut(
shape,
subject_iri,
triples_added_for_subj,
@ -160,30 +184,59 @@ impl Verifier {
}; // skip if missing
// Validate the subject.
let (_new_validity, new_unknowns) =
let need_eval =
update_subject_validity(change, shape, tracked_subjects, tracked_subject.valid);
// TODO: Add logic to fetch un-fetched objects after validation.
// We add the new_unknowns to be processed next
for (iri, schema_shape) in new_unknowns {
// We add the need_eval to be processed next after loop.
for (iri, schema_shape, needs_refetch) in need_eval {
// Add to nested_objects_to_validate.
nested_objects_to_validate
nested_objects_to_eval
.entry(schema_shape)
.or_insert_with(Vec::new)
.push(iri.clone());
.push((iri.clone(), needs_refetch));
}
}
// Now, we add all objects that need re-evaluation to the queue.
for (shape_iri, objects) in nested_objects_to_validate {
if let Some(next_shape) = schema.get(&shape_iri) {
shape_validation_queue.push((next_shape, objects));
} else {
log_err!("Schema missing for nested shape {}", shape_iri);
}
// Now, we fetch all un-fetched subjects for re-evaluation.
for (shape_iri, objects_to_eval) in &nested_objects_to_eval {
let objects_to_fetch = objects_to_eval
.iter()
.filter(|(_iri, needs_fetch)| *needs_fetch)
.map(|(s, _)| s.clone())
.collect();
let shape = schema
.get(shape_iri)
.ok_or(VerifierError::InvalidOrmSchema)?;
// Create sparql query
let shape_query =
shape_type_to_sparql(&schema, &shape_iri, Some(objects_to_fetch))?;
let new_triples = self.query_sparql_construct(shape_query, Some(nuri.clone()))?;
orm_changes = self.process_changes_for_subject_and_shape(
shape,
tracked_subjects,
schema,
new_triples,
&vec![],
nuri,
Some(orm_changes),
)?;
}
// Now, add all subjects to the queue that did not need a fetch.
for (shape_iri, objects_to_eval) in &nested_objects_to_eval {
let objects_to_fetch = objects_to_eval
.iter()
.filter(|(_iri, needs_fetch)| !*needs_fetch)
.map(|(s, _)| s.clone())
.collect();
let shape = schema
.get(shape_iri)
.ok_or(VerifierError::InvalidOrmSchema)?;
shape_validation_queue.push((shape, objects_to_fetch));
}
}
Ok(orm_changes)
}
@ -196,12 +249,18 @@ impl Verifier {
) -> Result<OrmChanges<'a>, NgError> {
// If we have a specific session, handle only that subscription.
if let Some(session_id) = only_for_session_id {
if let Some((_, sub)) = self
if let Some((nuri, sub)) = self
.orm_subscriptions
.iter_mut()
.find(|(_, s)| s.session_id == session_id)
{
return Self::build_changes_for_subscription(sub, triples_added, triples_removed);
// TODO: Is repo correct?
return self.process_changes_for_subscription(
sub,
triples_added,
triples_removed,
&nuri.repo(),
);
} else {
return Ok(HashMap::new());
}
@ -215,8 +274,12 @@ impl Verifier {
continue;
}
}
let changes =
Self::build_changes_for_subscription(sub, triples_added, triples_removed)?;
let changes = self.process_changes_for_subscription(
sub,
triples_added,
triples_removed,
&nuri.repo(), // TODO: Is this correct?
)?;
for (shape_iri, subj_map) in changes {
merged
.entry(shape_iri)
@ -405,34 +468,35 @@ impl Verifier {
pub(crate) async fn push_orm_response(
&mut self,
scope: &NuriV0,
schema_iri: &String,
subscription: &OrmSubscription,
response: AppResponse,
) {
log_info!(
"push_orm_response {:?} {} {:?}",
scope,
schema_iri,
self.orm_subscriptions
log_debug!(
"sending orm response for session {}:\n{:?}",
subscription.session_id,
&response
);
if let Some(shapes) = self.orm_subscriptions.get_mut(scope) {
if let Some(sessions) = shapes.get_mut(schema_iri) {
let mut sessions_to_close: Vec<u64> = vec![];
for (session_id, subscription) in sessions.iter_mut() {
if subscription.sender.is_closed() {
log_debug!("closed so removing session {}", session_id);
sessions_to_close.push(*session_id);
} else {
let _ = subscription.sender.send(response.clone()).await;
}
}
for session_id in sessions_to_close.iter() {
sessions.remove(session_id);
}
}
if subscription.sender.is_closed() {
    // The receiving side is gone: drop the subscription instead of sending.
    log_debug!("closed so removing session {}", subscription.session_id);
    self.orm_subscriptions.remove(&subscription.nuri);
} else {
    // `send` only builds a future; nothing is delivered until it is awaited.
    // The sender is cloned because only a shared reference to the subscription
    // is available here. The send result is ignored: delivery is best effort.
    let _ = subscription.sender.clone().send(response).await;
}
}
/// Removes every ORM subscription whose response channel has been closed.
///
/// A subscription is kept only while `subscription.sender.is_closed()` returns
/// `false`; all other entries are dropped from `self.orm_subscriptions`.
// NOTE(review): assumes `orm_subscriptions` is a std map (`HashMap`/`BTreeMap`)
// whose values are `OrmSubscription` — confirm against the `Verifier` definition.
pub(crate) fn clean_orm_subscriptions(&mut self) {
    // Filter in place: the map cannot be moved out of `&mut self`, and entries
    // cannot be removed while the same map is being iterated.
    self.orm_subscriptions
        .retain(|_nuri, subscription| !subscription.sender.is_closed());
}
pub(crate) async fn start_orm(
&mut self,
nuri: &NuriV0,
@ -457,14 +521,15 @@ impl Verifier {
.insert(nuri.clone(), orm_subscription);
// Query triples for this shape
let shape_query =
sparql_construct_from_orm_shape_type(&shape_type.schema, &shape_type.shape)?;
let shape_query = shape_type_to_sparql(&shape_type.schema, &shape_type.shape, None)?;
// TODO: How to stringify nuri correctly?
let shape_triples = self.sparql_construct(shape_query, Some(nuri.repo()))?;
let shape_triples = self.query_sparql_construct(shape_query, Some(nuri.repo()))?;
// Create objects from queried triples.
let subscription_ref = self.orm_subscriptions.get(nuri).unwrap();
let _orm_objects = self.create_orm_from_triples(subscription_ref, &shape_triples)?; // placeholder call; TODO integrate response
let _orm_objects = self.create_orm_from_triples(subscription_ref, &shape_triples)?;
// TODO integrate response
//self.push_orm_response().await; (only for requester, not all sessions)
@ -513,9 +578,10 @@ fn literal_to_sparql_str(var: OrmSchemaDataType) -> Vec<String> {
}
}
pub fn sparql_construct_from_orm_shape_type(
pub fn shape_type_to_sparql(
schema: &OrmSchema,
shape: &ShapeIri,
filter_subjects: Option<Vec<String>>,
) -> Result<String, NgError> {
// Use a counter to generate unique variable names.
let mut var_counter = 0;
@ -530,7 +596,6 @@ pub fn sparql_construct_from_orm_shape_type(
let mut where_statements = Vec::new();
// Keep track of visited shapes while recursing to prevent infinite loops.
// TODO: Update type
let mut visited_shapes: HashSet<ShapeIri> = HashSet::new();
// Recursive function to call for (nested) shapes.
@ -666,9 +731,21 @@ pub fn sparql_construct_from_orm_shape_type(
&mut visited_shapes,
);
// Filter subjects, if present: restrict `?s0` to the given IRIs.
if let Some(subjects) = filter_subjects {
    // Render each IRI as `<iri>` and join them into a comma-separated list.
    let subjects_str = subjects
        .iter()
        .map(|s| format!("<{}>", s))
        .collect::<Vec<_>>()
        .join(", ");
    // Both the `IN (...)` list and the enclosing `FILTER (...)` have to be
    // closed; with only one `)` the generated query is syntactically invalid.
    where_statements.push(format!(" FILTER (?s0 IN ({}))", subjects_str));
}
// Create query from statements.
let construct_body = construct_statements.join(" .\n");
let where_body = where_statements.join(" .\n");
Ok(format!(
"CONSTRUCT {{\n{}\n}}\nWHERE {{\n{}\n}}",
construct_body, where_body

@ -58,10 +58,8 @@ pub fn group_by_subject_for_shape<'a>(
/// Add all triples to `subject_changes`
/// Returns predicates to nested objects that were touched and need processing.
/// Assumes all triples have same subject.
use std::sync::Arc;
pub fn add_remove_triples_mut(
shape: &Arc<OrmSchemaShape>,
shape: &OrmSchemaShape,
subject_iri: &str,
triples_added: &[&Triple],
triples_removed: &[&Triple],
@ -90,9 +88,8 @@ pub fn add_remove_triples_mut(
let tracked_subject = get_or_create_tracked_subject(subject_iri, &shape.iri, tracked_subjects);
// Process added triples.
// For each triple, check matching predicates in shape.
// keeping track of value count (for later validations).
// In parallel, we keep track of the values added (tracked_changes)
// For each triple, check if it matches the shape.
// In parallel, we record the values added and removed (tracked_changes)
for triple in triples_added {
for predicate_schema in &shape.predicates {
if predicate_schema.iri != triple.predicate.as_str() {
@ -232,22 +229,17 @@ pub fn update_subject_validity<'a>(
shape: &OrmSchemaShape,
tracked_subjects: &HashMap<String, HashMap<String, OrmTrackedSubject<'a>>>,
previous_validity: OrmTrackedSubjectValidity,
) -> (
OrmTrackedSubjectValidity,
// Vec<subject_iri, shape_iri>
Vec<(String, String)>,
) {
) -> Vec<(String, String, bool)> {
let Some(tracked_shapes) = tracked_subjects.get_mut(subject_iri) else {
return (previous_validity, vec![]);
return vec![];
};
let Some(tracked_subject) = tracked_shapes.get_mut(&shape.iri) else {
return (previous_validity, vec![]);
return vec![];
};
// Keep track of objects that need to be validated against a shape to fetch and validate.
let mut need_evaluation: Vec<(String, String, bool)> = vec![];
// Check 1) Check if we need to fetch this object.
// If this subject has not been monitored but parents are now valid or need evaluation, we fetch.
// Check 2) If all parents are untracked, return untracked.
// Check 1) Check if we need to fetch this object or all parents are untracked.
if tracked_subject.parents.len() != 0 {
let no_parents_tracking = tracked_subject.parents.values().all(|parent| {
parent.valid == OrmTrackedSubjectValidity::Untracked
@ -258,19 +250,20 @@ pub fn update_subject_validity<'a>(
// Remove tracked predicates and set untracked.
tracked_subject.tracked_predicates = HashMap::new();
tracked_subject.valid = OrmTrackedSubjectValidity::Untracked;
return (OrmTrackedSubjectValidity::Untracked, vec![]);
return vec![];
} else if !no_parents_tracking && previous_validity == OrmTrackedSubjectValidity::Untracked
{
// We need to fetch the subject's current state:
// We have new parents but were previously not recording changes.
return vec![(s_change.subject_iri,)];
// TODO
}
}
// Check 2) If there are no changes, there is nothing to do.
if s_change.predicates.is_empty() {
return (previous_validity, vec![]);
return vec![];
}
let mut new_validity = OrmTrackedSubjectValidity::Valid;
@ -288,12 +281,9 @@ pub fn update_subject_validity<'a>(
// Remove tracked predicates and set invalid.
tracked_subject.tracked_predicates = HashMap::new();
tracked_subject.valid = OrmTrackedSubjectValidity::Invalid;
return (OrmTrackedSubjectValidity::Invalid, vec![]);
return vec![];
}
// Keep track of objects that need to be validated against a shape to fetch and validate.
let mut need_evaluation: Vec<(String, String)> = vec![];
// Check 4) Validate subject against each predicate in shape.
for p_schema in shape.predicates.iter() {
let p_change = s_change.predicates.get(&p_schema.iri);
@ -484,32 +474,26 @@ pub fn update_subject_validity<'a>(
&& previous_validity != OrmTrackedSubjectValidity::Valid
{
// If this subject became valid, we need to refetch this subject;
// For that we prepend self to needs_fetch.
need_evaluation.insert(0, (s_change.subject_iri, shape.iri.clone()));
// We fetch
need_evaluation.insert(0, (s_change.subject_iri, shape.iri.clone(), true));
}
// If validity changed, parents need to be re-evaluated.
if new_validity != previous_validity {
// We return the tracking parents which need re-evaluation.
// Remember that the last elements (i.e. children or needs_fetch) are evaluated first.
return (
new_validity,
// Add parents and objects in `need_evaluation`.
tracked_subject
.parents
.values()
// Inform tracking parents only.
.filter(|(parent, is_tracked)| *is_tracked)
.map(|(parent, is_tracked)| (&parent.subject_iri, parent.shape))
// Add `need_evaluation`.
.chain(need_evaluation)
.collect(),
);
return tracked_subject
.parents
.values()
.map(|parent| (&parent.subject_iri, parent.shape, false))
// Add `need_evaluation`.
.chain(need_evaluation)
.collect();
}
tracked_subject.valid = new_validity;
return (new_validity, need_evaluation);
return need_evaluation;
}
fn oxrdf_term_to_orm_basic_type(term: &ng_oxigraph::oxrdf::Term) -> BasicType {

Loading…
Cancel
Save