restructure handle_backend_update

feat/orm-diffs
Laurin Weger 3 days ago
parent 94dc33d6bb
commit cc9e3c0940
No known key found for this signature in database
GPG Key ID: 9B372BB0B792770F
  1. 736
      engine/verifier/src/orm/handle_backend_update.rs

@ -136,400 +136,6 @@ impl Verifier {
let mut paths_of_objects_to_create: HashSet<(Vec<String>, Option<SubjectIri>)> = let mut paths_of_objects_to_create: HashSet<(Vec<String>, Option<SubjectIri>)> =
HashSet::new(); HashSet::new();
// Emits diff patches for one change by walking from `tracked_subject` up to the root.
//
// `path` holds the (unescaped) segments collected so far, relative to `tracked_subject`.
// At the root (no parents, or the subject's shape IRI equals `root_shape`) the segments
// are escaped, joined into a JSON Pointer and pushed to `patches` together with the
// operation, value type and value of `diff_op`. Otherwise every parent is visited; for
// each one, the first predicate that lists this subject as a tracked child supplies the
// next segments and the walk continues from that parent. Several parents therefore
// produce several patches.
//
// `tracked_subjects` and `paths_of_objects_to_create` are only forwarded to the
// recursive calls here.
fn add_diff_ops_for_tracked_subject(
    tracked_subject: &OrmTrackedSubject,
    tracked_subjects: &HashMap<
        String,
        HashMap<String, Arc<RwLock<OrmTrackedSubject>>>,
    >,
    root_shape: &String,
    path: &mut Vec<String>,
    diff_op: (
        OrmDiffOpType,
        Option<OrmDiffType>,
        Option<Value>,  // The value added / removed
        Option<String>, // The IRI, if change is an added / removed object.
    ),
    patches: &mut Vec<OrmDiffOp>,
    paths_of_objects_to_create: &mut HashSet<(Vec<String>, Option<SubjectIri>)>,
) {
    let reached_root =
        tracked_subject.parents.is_empty() || tracked_subject.shape.iri == *root_shape;

    if reached_root {
        // Every segment is escaped on its own before being joined into the pointer.
        let (op, val_type, value, _object_iri) = diff_op;
        let segments: Vec<String> = path.iter().map(|seg| escape_json_pointer(seg)).collect();
        patches.push(OrmDiffOp {
            op,
            valType: val_type,
            path: format!("/{}", segments.join("/")),
            value,
        });
        return;
    }

    for (_parent_iri, parent_lock) in tracked_subject.parents.iter() {
        let parent = parent_lock.read().unwrap();

        for schema in &parent.shape.predicates {
            let Some(tracked_pred) = parent.tracked_predicates.get(&schema.iri) else {
                continue;
            };
            let tracked_pred = tracked_pred.read().unwrap();

            // Only the predicate that actually links the parent to this subject is relevant.
            let links_subject = tracked_pred
                .tracked_children
                .iter()
                .any(|child| child.read().unwrap().subject_iri == tracked_subject.subject_iri);
            if !links_subject {
                continue;
            }

            // New leading segments: the readable predicate name, followed by this
            // subject's IRI as a key when the predicate may hold several values.
            let multi_valued = schema.maxCardinality > 1 || schema.maxCardinality == -1;
            let mut parent_path: Vec<String> = Vec::with_capacity(path.len() + 2);
            parent_path.push(schema.readablePredicate.clone());
            if multi_valued {
                parent_path.push(tracked_subject.subject_iri.clone());
            }
            parent_path.extend(path.iter().cloned());

            add_diff_ops_for_tracked_subject(
                &parent,
                tracked_subjects,
                root_shape,
                &mut parent_path,
                diff_op.clone(),
                patches,
                paths_of_objects_to_create,
            );
            // One linking predicate per parent is enough.
            break;
        }
    }
}
// Converts a tracked predicate change into diff-op tuples of the form
// (operation, value type, value, object IRI).
//
// - Single-valued literal predicate: exactly one added value yields an `add` carrying
//   that value; any other situation yields a bare `remove`.
// - Multi-valued literal predicate: an `add` with value type `set` for the added values
//   and/or a `remove` with value type `set` for the removed values, each only when the
//   respective list is non-empty.
// - Predicate with at least one shape-typed data type (an object): no ops.
fn diff_op_from_pred_change(
    pred_change: &OrmTrackedPredicateChanges,
) -> Vec<(
    OrmDiffOpType,
    Option<OrmDiffType>,
    Option<Value>,  // The value added / removed
    Option<String>, // The IRI, if change is an added / removed object.
)> {
    let predicate = pred_change.tracked_predicate.read().unwrap();
    let multi_valued =
        predicate.schema.maxCardinality > 1 || predicate.schema.maxCardinality == -1;
    let object_valued = predicate
        .schema
        .dataTypes
        .iter()
        .any(|dt| dt.shape.is_some());

    if object_valued {
        // objects are not handled here because objects to create
        // are registered during path traversal.
        Vec::new()
    } else if multi_valued {
        let mut ops = Vec::new();
        if !pred_change.values_added.is_empty() {
            ops.push((
                OrmDiffOpType::add,
                Some(OrmDiffType::set),
                Some(json!(pred_change.values_added)),
                None,
            ));
        }
        if !pred_change.values_removed.is_empty() {
            ops.push((
                OrmDiffOpType::remove,
                Some(OrmDiffType::set),
                Some(json!(pred_change.values_removed)),
                None,
            ));
        }
        ops
    } else if pred_change.values_added.len() == 1 {
        // A value was added. Another one might have been removed,
        // but the add patch overwrites previous values.
        vec![(
            OrmDiffOpType::add,
            None,
            Some(json!(pred_change.values_added[0])),
            None,
        )]
    } else {
        // Since there is only one possible value, removing the path is enough.
        vec![(OrmDiffOpType::remove, None, None, None)]
    }
}
// Looks up which of a predicate's allowed shapes a subject satisfies now and which one
// it satisfied before. `allowed_shapes` is scanned front to back (the caller passes the
// predicate's dataTypes, which are in priority order), so the first hit wins.
// Returns (current_valid_shape, previous_valid_shape). An entry is None when no allowed
// shape tracked for the subject is in the `Valid` state; both are None when the subject
// does not appear in `tracked_subjects` at all.
#[allow(dead_code)]
fn get_highest_priority_valid_shapes(
    subject_iri: &SubjectIri,
    allowed_shapes: &[OrmSchemaDataType], // From predicate.dataTypes (in priority order)
    tracked_subjects: &HashMap<
        String,
        HashMap<String, Arc<RwLock<OrmTrackedSubject>>>,
    >,
) -> (Option<String>, Option<String>) {
    // Returns the first allowed shape IRI whose tracked subject satisfies `is_match`.
    fn first_matching_shape(
        allowed_shapes: &[OrmSchemaDataType],
        shapes_for_subject: &HashMap<String, Arc<RwLock<OrmTrackedSubject>>>,
        is_match: fn(&OrmTrackedSubject) -> bool,
    ) -> Option<String> {
        for data_type in allowed_shapes {
            // Data types without a shape are literals and cannot match.
            let Some(shape_iri) = data_type.shape.as_ref() else {
                continue;
            };
            let Some(tracked_lock) = shapes_for_subject.get(shape_iri) else {
                continue;
            };
            if is_match(&tracked_lock.read().unwrap()) {
                return Some(shape_iri.clone());
            }
        }
        None
    }

    match tracked_subjects.get(subject_iri) {
        None => (None, None),
        Some(shapes_for_subject) => {
            // Shape that is valid after the update.
            let current_valid = first_matching_shape(allowed_shapes, shapes_for_subject, |ts| {
                ts.valid == OrmTrackedSubjectValidity::Valid
            });
            // Shape that was valid before the update.
            let previous_valid = first_matching_shape(allowed_shapes, shapes_for_subject, |ts| {
                ts.prev_valid == OrmTrackedSubjectValidity::Valid
            });
            (current_valid, previous_valid)
        }
    }
}
// Emits object patches when the highest-priority valid shape of a subject changes.
//
// Nothing happens unless the subject is tracked under more than one shape and is tracked
// under `shape_iri`. For every parent of that tracked subject, the first predicate that
// lists the subject as a tracked child and allows more than one shape is inspected:
// if the highest-priority valid shape differs from the previous one, the object tracked
// under the previous shape (if any) is removed and the object tracked under the new
// shape (if any) is added, in that order.
//
// `orm_changes` is accepted but not read by this function.
#[allow(dead_code)]
fn handle_shape_priority_change(
    subject_iri: &SubjectIri,
    shape_iri: &ShapeIri,
    tracked_subjects: &HashMap<
        String,
        HashMap<String, Arc<RwLock<OrmTrackedSubject>>>,
    >,
    root_shape: &String,
    orm_changes: &OrmChanges,
    patches: &mut Vec<OrmDiffOp>,
    paths_of_objects_to_create: &mut HashSet<(Vec<String>, Option<SubjectIri>)>,
) {
    // Step 1: a priority conflict needs at least two tracked shapes for the subject.
    let shapes_for_subject = match tracked_subjects.get(subject_iri) {
        Some(shapes) if shapes.len() > 1 => shapes,
        _ => return,
    };

    // Step 2: the tracked subject for the shape this call is about.
    let Some(subject_lock) = shapes_for_subject.get(shape_iri) else {
        return;
    };
    let tracked_subject = subject_lock.read().unwrap();

    // Step 3: inspect the predicate linking each parent to the subject.
    for (_parent_iri, parent_lock) in tracked_subject.parents.iter() {
        let parent = parent_lock.read().unwrap();

        for schema in &parent.shape.predicates {
            let Some(tracked_pred) = parent.tracked_predicates.get(&schema.iri) else {
                continue;
            };
            let tracked_pred = tracked_pred.read().unwrap();

            let links_subject = tracked_pred
                .tracked_children
                .iter()
                .any(|child| child.read().unwrap().subject_iri == *subject_iri);
            if !links_subject {
                continue;
            }

            // With a single allowed shape there is nothing to prioritise.
            let allowed_shape_count = schema
                .dataTypes
                .iter()
                .filter(|dt| dt.shape.is_some())
                .count();
            if allowed_shape_count <= 1 {
                continue;
            }

            let (current_valid, previous_valid) =
                get_highest_priority_valid_shapes(subject_iri, &schema.dataTypes, tracked_subjects);

            // Step 4: a shape switch is a removal followed by an addition; becoming
            // valid is only the addition, becoming invalid only the removal.
            if current_valid != previous_valid {
                let previous_lock = previous_valid
                    .as_ref()
                    .and_then(|old_shape| shapes_for_subject.get(old_shape));
                if let Some(old_lock) = previous_lock {
                    let old_tracked = old_lock.read().unwrap();
                    let mut path = vec![];
                    add_diff_ops_for_tracked_subject(
                        &old_tracked,
                        tracked_subjects,
                        root_shape,
                        &mut path,
                        (
                            OrmDiffOpType::remove,
                            Some(OrmDiffType::object),
                            None,
                            Some(subject_iri.clone()),
                        ),
                        patches,
                        paths_of_objects_to_create,
                    );
                }

                let current_lock = current_valid
                    .as_ref()
                    .and_then(|new_shape| shapes_for_subject.get(new_shape));
                if let Some(new_lock) = current_lock {
                    let new_tracked = new_lock.read().unwrap();
                    // TODO: Materialize the object with current triples.
                    // This requires access to the change data or re-querying.
                    // For now, only an object placeholder patch is created.
                    let mut path = vec![];
                    add_diff_ops_for_tracked_subject(
                        &new_tracked,
                        tracked_subjects,
                        root_shape,
                        &mut path,
                        (
                            OrmDiffOpType::add,
                            Some(OrmDiffType::object),
                            Some(Value::Null),
                            Some(subject_iri.clone()),
                        ),
                        patches,
                        paths_of_objects_to_create,
                    );
                }
            }

            // Found the linking predicate, no need to check the others.
            break;
        }
    }
}
// We construct object patches from a change (which is associated with a shape type). {op: add, valType: object, value: Null, path: ...} // We construct object patches from a change (which is associated with a shape type). {op: add, valType: object, value: Null, path: ...}
// For each change that has a subject tracked in this subscription, // For each change that has a subject tracked in this subscription,
// - Get change operation (calling diff_op_from_pred_change). // - Get change operation (calling diff_op_from_pred_change).
@ -548,59 +154,64 @@ impl Verifier {
// Problem: We might not have the triples present to materialize the newly valid object so we need to fetch them. // Problem: We might not have the triples present to materialize the newly valid object so we need to fetch them.
// Process changes for this subscription // Process changes for this subscription
// Iterate through all changes and create patches // Iterate over all changes and create patches
for (shape_iri, subject_changes) in &orm_changes { for (shape_iri, subject_changes) in &orm_changes {
for (subject_iri, change) in subject_changes { for (subject_iri, change) in subject_changes {
// Get the tracked subject for this (subject, shape) pair // Get the tracked subject for this (subject, shape) pair
let tracked_subject_opt = sub let tracked_subject = sub
.tracked_subjects .tracked_subjects
.get(subject_iri) .get(subject_iri)
.and_then(|shapes| shapes.get(shape_iri)) .and_then(|shapes| shapes.get(shape_iri))
.map(|ts| ts.read().unwrap()); .map(|ts| ts.read().unwrap())
.unwrap();
let Some(tracked_subject) = tracked_subject_opt else {
// Now we have the tracked predicate (containing the shape) and the change.
// Check validity changes
if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Invalid
&& tracked_subject.valid == OrmTrackedSubjectValidity::Invalid
{
// Is the subject invalid and was it before? There is nothing we need to inform about.
continue; continue;
}; } else if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Valid
&& tracked_subject.valid == OrmTrackedSubjectValidity::Invalid
// Process each predicate change || tracked_subject.valid == OrmTrackedSubjectValidity::Untracked
for (pred_iri, pred_change) in &change.predicates { {
let tracked_predicate = pred_change.tracked_predicate.read().unwrap(); // Has the subject become invalid or untracked?
let pred_name = tracked_predicate.schema.readablePredicate.clone(); // We add a patch, deleting the object at its root.
// Check validity changes let mut path: Vec<String>;
if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Invalid if tracked_subject.parents.is_empty() {
&& tracked_subject.valid == OrmTrackedSubjectValidity::Invalid // If this is a root object, we need to add the object's id itself.
{ path = vec![tracked_subject.subject_iri.clone()];
// Is the subject invalid and was it before? There is nothing we need to inform about.
return;
} else if tracked_subject.prev_valid == OrmTrackedSubjectValidity::Valid
&& tracked_subject.valid == OrmTrackedSubjectValidity::Invalid
|| tracked_subject.valid == OrmTrackedSubjectValidity::Untracked
{
// Has the subject become invalid or untracked?
// We add a patch, deleting the object at its root.
let mut path: Vec<String> =
vec![subject_iri.clone(), pred_name.clone()];
add_diff_ops_for_tracked_subject(
&tracked_subject,
&sub.tracked_subjects,
&sub.shape_type.shape,
&mut path,
(OrmDiffOpType::remove, Some(OrmDiffType::object), None, None),
&mut patches,
&mut paths_of_objects_to_create,
);
} else { } else {
// The subject is valid or has become valid. path = vec![];
}
build_path_to_root_and_create_patches(
&tracked_subject,
&sub.tracked_subjects,
&sub.shape_type.shape,
&mut path,
(OrmDiffOpType::remove, Some(OrmDiffType::object), None, None),
&mut patches,
&mut paths_of_objects_to_create,
);
} else {
// The subject is valid or has become valid.
// Process each predicate change
for (_pred_iri, pred_change) in &change.predicates {
let tracked_predicate =
pred_change.tracked_predicate.read().unwrap();
let pred_name = tracked_predicate.schema.readablePredicate.clone();
// Get the diff operations for this predicate change // Get the diff operations for this predicate change
let diff_ops = diff_op_from_pred_change(pred_change); let diff_ops = create_diff_ops_from_predicate_change(pred_change);
// For each diff operation, traverse up to the root to build the path // For each diff operation, traverse up to the root to build the path
for diff_op in diff_ops { for diff_op in diff_ops {
let mut path = vec![subject_iri.clone(), pred_name.clone()]; let mut path = vec![pred_name.clone()];
// Start recursion from this tracked subject // Start recursion from this tracked subject
add_diff_ops_for_tracked_subject( build_path_to_root_and_create_patches(
&tracked_subject, &tracked_subject,
&sub.tracked_subjects, &sub.tracked_subjects,
&sub.shape_type.shape, &sub.shape_type.shape,
@ -654,3 +265,264 @@ impl Verifier {
} }
} }
} }
/// Prefixes `base_path` with the segments that lead from `parent_ts` to `tracked_subject`.
///
/// The parent's shape predicates are scanned in order; the first one whose tracked
/// predicate lists a child with the same subject IRI as `tracked_subject` is used.
/// The resulting path starts with that predicate's readable name, followed by the
/// child's subject IRI when the predicate allows several values (`maxCardinality`
/// greater than 1, or -1), followed by the segments of `base_path`.
///
/// Returns `None` when no predicate of the parent links to the child.
fn build_path_segment_for_parent(
    tracked_subject: &OrmTrackedSubject,
    parent_ts: &OrmTrackedSubject,
    base_path: &[String],
) -> Option<Vec<String>> {
    parent_ts.shape.predicates.iter().find_map(|schema| {
        // Predicates the parent does not track cannot link to the child.
        let tracked_pred = parent_ts.tracked_predicates.get(&schema.iri)?;
        let tracked_pred = tracked_pred.read().unwrap();

        let links_child = tracked_pred
            .tracked_children
            .iter()
            .any(|child| child.read().unwrap().subject_iri == tracked_subject.subject_iri);
        if !links_child {
            return None;
        }

        let multi_valued = schema.maxCardinality > 1 || schema.maxCardinality == -1;

        let mut segments: Vec<String> = Vec::with_capacity(base_path.len() + 2);
        segments.push(schema.readablePredicate.clone());
        if multi_valued {
            // Multi-valued predicates key their objects by IRI.
            segments.push(tracked_subject.subject_iri.clone());
        }
        segments.extend_from_slice(base_path);
        Some(segments)
    })
}
/// Recursively builds the path from a tracked subject up to the root and pushes the
/// resulting diff patches.
///
/// * `tracked_subject` - subject the walk currently stands on.
/// * `tracked_subjects`, `paths_of_objects_to_create` - only forwarded to recursive calls.
/// * `root_shape` - shape IRI that marks a subject as root, in addition to having no parents.
/// * `path` - unescaped segments collected so far, relative to `tracked_subject`.
/// * `diff_op` - (operation, value type, value, object IRI); the object IRI is not used here.
/// * `patches` - output list the patches are appended to.
///
/// Behaviour:
/// 1. A subject that is not currently `Valid` produces nothing.
/// 2. A subject that was not `Valid` before additionally gets "add object" patches: at
///    the root an object patch plus an `id` patch, otherwise an "add object" operation
///    is propagated to every linked parent.
/// 3. At the root, `diff_op` is emitted at `/{subject IRI}/{path...}`; otherwise the walk
///    continues on every parent that links to this subject, so several parents yield
///    several patches.
///
/// The JSON Pointer is assembled segment by segment, so an empty `path` yields
/// `/{subject IRI}` rather than a pointer with a trailing `/` (which would address the
/// empty-named member of the object instead of the object itself).
///
/// NOTE(review): because of step 1, a call made for a subject that has just become
/// invalid or untracked (to emit a `remove`) returns without any patch - confirm whether
/// the validity guard is meant to apply to the starting subject as well.
/// NOTE(review): the subject IRI is always prepended at the root, so a caller that
/// already put the root IRI into `path` gets it twice - verify against the call sites.
fn build_path_to_root_and_create_patches(
    tracked_subject: &OrmTrackedSubject,
    tracked_subjects: &HashMap<String, HashMap<String, Arc<RwLock<OrmTrackedSubject>>>>,
    root_shape: &String,
    path: &mut Vec<String>,
    diff_op: (
        OrmDiffOpType,
        Option<OrmDiffType>,
        Option<Value>,  // The value added / removed
        Option<String>, // The IRI, if change is an added / removed object.
    ),
    patches: &mut Vec<OrmDiffOp>,
    paths_of_objects_to_create: &mut HashSet<(Vec<String>, Option<SubjectIri>)>,
) {
    /// Builds `/{escaped subject IRI}` followed by `/{escaped segment}` for each path segment.
    fn json_pointer_from_root(root: &OrmTrackedSubject, path: &[String]) -> String {
        let mut pointer = format!("/{}", escape_json_pointer(&root.subject_iri));
        for seg in path {
            pointer.push('/');
            pointer.push_str(&escape_json_pointer(seg));
        }
        pointer
    }

    // If the tracked subject is not valid, we don't create patches for it.
    if tracked_subject.valid != OrmTrackedSubjectValidity::Valid {
        return;
    }

    // No parents, or the shape matches the root shape: this subject is a root.
    let at_root = tracked_subject.parents.is_empty() || tracked_subject.shape.iri == *root_shape;

    // If the tracked subject is newly valid (was not valid before but is now),
    // make sure the object is created with an "add object" patch.
    if tracked_subject.prev_valid != OrmTrackedSubjectValidity::Valid {
        if at_root {
            let json_pointer = json_pointer_from_root(tracked_subject, path);
            // The `id` field lives directly below the object.
            let id_pointer = format!("{}/id", json_pointer);
            patches.push(OrmDiffOp {
                op: OrmDiffOpType::add,
                valType: Some(OrmDiffType::object),
                path: json_pointer,
                value: None,
            });
            patches.push(OrmDiffOp {
                op: OrmDiffOpType::add,
                valType: None,
                path: id_pointer,
                value: Some(json!(tracked_subject.subject_iri)),
            });
        } else {
            // Not at root: propagate an "add object" operation to every linked parent.
            for (_parent_iri, parent_tracked_subject) in tracked_subject.parents.iter() {
                let parent_ts = parent_tracked_subject.read().unwrap();
                if let Some(mut new_path) =
                    build_path_segment_for_parent(tracked_subject, &parent_ts, path)
                {
                    build_path_to_root_and_create_patches(
                        &parent_ts,
                        tracked_subjects,
                        root_shape,
                        &mut new_path,
                        (OrmDiffOpType::add, Some(OrmDiffType::object), None, None),
                        patches,
                        paths_of_objects_to_create,
                    );
                }
            }
        }
    }

    if at_root {
        // Always add the root subject to the path.
        let (op, val_type, value, _object_iri) = diff_op;
        patches.push(OrmDiffOp {
            op,
            valType: val_type,
            path: json_pointer_from_root(tracked_subject, path),
            value,
        });
        // Disabled for now: when a new object is created on a predicate that allows
        // several values, an extra patch at `{pointer}/{escaped object IRI}` (taken from
        // the fourth element of `diff_op`) was planned.
        return;
    }

    // Recurse to parents.
    for (_parent_iri, parent_tracked_subject) in tracked_subject.parents.iter() {
        let parent_ts = parent_tracked_subject.read().unwrap();
        // Parents without a predicate linking to this subject are skipped.
        if let Some(mut new_path) =
            build_path_segment_for_parent(tracked_subject, &parent_ts, path)
        {
            build_path_to_root_and_create_patches(
                &parent_ts,
                tracked_subjects,
                root_shape,
                &mut new_path,
                diff_op.clone(),
                patches,
                paths_of_objects_to_create,
            );
        }
    }
}
/// Creates diff operations from a predicate change.
///
/// Returns a list of `(op_type, value_type, value, iri)` tuples:
/// * single-valued literal predicate: exactly one added value yields an `add` carrying
///   that value, anything else yields a bare `remove`;
/// * multi-valued literal predicate: an `add` of type `set` with the added values and/or
///   a `remove` of type `set` with the removed values, each only when non-empty;
/// * predicate with a shape-typed data type (an object): an empty list.
fn create_diff_ops_from_predicate_change(
    pred_change: &OrmTrackedPredicateChanges,
) -> Vec<(
    OrmDiffOpType,
    Option<OrmDiffType>,
    Option<Value>,  // The value added / removed
    Option<String>, // The IRI, if change is an added / removed object.
)> {
    let predicate = pred_change.tracked_predicate.read().unwrap();
    let multi_valued =
        predicate.schema.maxCardinality > 1 || predicate.schema.maxCardinality == -1;
    let object_valued = predicate
        .schema
        .dataTypes
        .iter()
        .any(|dt| dt.shape.is_some());

    if object_valued {
        // Object-valued predicates currently produce no operations here.
        // A disabled draft emitted `add` / `remove` ops of type `object` (one per IRI for
        // multi-valued predicates, with the IRI in the fourth tuple element).
        // NOTE(review): that draft iterated `values_added` in its removal loop as well;
        // use `values_removed` there if it is ever re-enabled.
        Vec::new()
    } else if multi_valued {
        let mut ops = Vec::new();
        if !pred_change.values_added.is_empty() {
            ops.push((
                OrmDiffOpType::add,
                Some(OrmDiffType::set),
                Some(json!(pred_change.values_added)),
                None,
            ));
        }
        if !pred_change.values_removed.is_empty() {
            ops.push((
                OrmDiffOpType::remove,
                Some(OrmDiffType::set),
                Some(json!(pred_change.values_removed)),
                None,
            ));
        }
        ops
    } else if pred_change.values_added.len() == 1 {
        // A value was added. Another one might have been removed,
        // but the add patch overwrites previous values.
        vec![(
            OrmDiffOpType::add,
            None,
            Some(json!(pred_change.values_added[0])),
            None,
        )]
    } else {
        // Since there is only one possible value, removing the path is enough.
        vec![(OrmDiffOpType::remove, None, None, None)]
    }
}

Loading…
Cancel
Save