@ -14,6 +14,11 @@ pub mod validation;
use futures ::channel ::mpsc ;
use futures ::channel ::mpsc ;
use futures ::channel ::mpsc ::UnboundedSender ;
use futures ::channel ::mpsc ::UnboundedSender ;
use ng_net ::types ::OverlayLink ;
use ng_oxigraph ::oxrdf ::Quad ;
use ng_repo ::errors ::VerifierError ;
use ng_repo ::types ::OverlayId ;
use ng_repo ::types ::RepoId ;
use std ::collections ::HashMap ;
use std ::collections ::HashMap ;
use std ::collections ::HashSet ;
use std ::collections ::HashSet ;
@ -75,6 +80,7 @@ impl Verifier {
return Err ( NgError ::SparqlError ( e . to_string ( ) ) ) ;
return Err ( NgError ::SparqlError ( e . to_string ( ) ) ) ;
}
}
Ok ( triple ) = > {
Ok ( triple ) = > {
log_debug ! ( "Triple fetched: {:?}" , triple ) ;
result_triples . push ( triple ) ;
result_triples . push ( triple ) ;
}
}
}
}
@ -173,9 +179,13 @@ impl Verifier {
HashMap ::new ( ) ;
HashMap ::new ( ) ;
// For each subject, add/remove triples and validate.
// For each subject, add/remove triples and validate.
log_debug ! ( "all_modified_subjects: {:?}" , modified_subject_iris ) ;
log_debug ! (
"processing modified subjects: {:?} against shape: {}" ,
modified_subject_iris ,
shape . iri
) ;
for subject_iri in modified_subject_iris {
for subject_iri in & modified_subject_iris {
let validation_key = ( shape . iri . clone ( ) , subject_iri . to_string ( ) ) ;
let validation_key = ( shape . iri . clone ( ) , subject_iri . to_string ( ) ) ;
// Cycle detection: Check if this (shape, subject) pair is already being validated
// Cycle detection: Check if this (shape, subject) pair is already being validated
@ -187,7 +197,8 @@ impl Verifier {
) ;
) ;
// Mark as invalid due to cycle
// Mark as invalid due to cycle
// TODO: We could handle this by handling nested references as IRIs.
// TODO: We could handle this by handling nested references as IRIs.
if let Some ( tracked_shapes ) = orm_subscription . tracked_subjects . get ( subject_iri )
if let Some ( tracked_shapes ) =
orm_subscription . tracked_subjects . get ( * subject_iri )
{
{
if let Some ( tracked_subject ) = tracked_shapes . get ( & shape . iri ) {
if let Some ( tracked_subject ) = tracked_shapes . get ( & shape . iri ) {
let mut ts = tracked_subject . write ( ) . unwrap ( ) ;
let mut ts = tracked_subject . write ( ) . unwrap ( ) ;
@ -201,12 +212,13 @@ impl Verifier {
// Mark as currently validating
// Mark as currently validating
currently_validating . insert ( validation_key . clone ( ) ) ;
currently_validating . insert ( validation_key . clone ( ) ) ;
// Get triples of subject (added & removed).
let triples_added_for_subj = added_triples_by_subject
let triples_added_for_subj = added_triples_by_subject
. get ( subject_iri )
. get ( * subject_iri )
. map ( | v | v . as_slice ( ) )
. map ( | v | v . as_slice ( ) )
. unwrap_or ( & [ ] ) ;
. unwrap_or ( & [ ] ) ;
let triples_removed_for_subj = removed_triples_by_subject
let triples_removed_for_subj = removed_triples_by_subject
. get ( subject_iri )
. get ( * subject_iri )
. map ( | v | v . as_slice ( ) )
. map ( | v | v . as_slice ( ) )
. unwrap_or ( & [ ] ) ;
. unwrap_or ( & [ ] ) ;
@ -214,9 +226,9 @@ impl Verifier {
let change = orm_changes
let change = orm_changes
. entry ( shape . iri . clone ( ) )
. entry ( shape . iri . clone ( ) )
. or_insert_with ( HashMap ::new )
. or_insert_with ( HashMap ::new )
. entry ( subject_iri . clone ( ) )
. entry ( ( * subject_iri ) . clone ( ) )
. or_insert_with ( | | OrmTrackedSubjectChange {
. or_insert_with ( | | OrmTrackedSubjectChange {
subject_iri : subject_iri . clone ( ) ,
subject_iri : ( * subject_iri ) . clone ( ) ,
predicates : HashMap ::new ( ) ,
predicates : HashMap ::new ( ) ,
data_applied : false ,
data_applied : false ,
} ) ;
} ) ;
@ -248,7 +260,7 @@ impl Verifier {
let validity = {
let validity = {
let tracked_subject_opt = orm_subscription
let tracked_subject_opt = orm_subscription
. tracked_subjects
. tracked_subjects
. get ( subject_iri )
. get ( * subject_iri )
. and_then ( | m | m . get ( & shape . iri ) ) ;
. and_then ( | m | m . get ( & shape . iri ) ) ;
let Some ( tracked_subject ) = tracked_subject_opt else {
let Some ( tracked_subject ) = tracked_subject_opt else {
continue ;
continue ;
@ -277,14 +289,8 @@ impl Verifier {
}
}
}
}
}
}
// Remove from validation stack after processing this subject
currently_validating . remove ( & validation_key ) ;
}
}
// TODO: Currently, all shape <-> nested subject combinations are queued for re-evaluation.
// Is that okay?
// Now, we queue all non-evaluated objects
// Now, we queue all non-evaluated objects
for ( shape_iri , objects_to_eval ) in & nested_objects_to_eval {
for ( shape_iri , objects_to_eval ) in & nested_objects_to_eval {
let orm_subscription = self . get_first_orm_subscription_for (
let orm_subscription = self . get_first_orm_subscription_for (
@ -333,6 +339,10 @@ impl Verifier {
shape_validation_stack . push ( ( shape_arc , objects_not_to_fetch ) ) ;
shape_validation_stack . push ( ( shape_arc , objects_not_to_fetch ) ) ;
}
}
}
}
for subject_iri in modified_subject_iris {
let validation_key = ( shape . iri . clone ( ) , subject_iri . to_string ( ) ) ;
currently_validating . remove ( & validation_key ) ;
}
}
}
Ok ( ( ) )
Ok ( ( ) )
@ -375,6 +385,37 @@ impl Verifier {
} ) . next ( ) . unwrap ( )
} ) . next ( ) . unwrap ( )
}
}
pub fn get_first_orm_subscription_sender_for (
& mut self ,
nuri : & NuriV0 ,
shape : Option < & ShapeIri > ,
session_id : Option < & u64 > ,
) -> Result < ( UnboundedSender < AppResponse > , & OrmSubscription ) , VerifierError > {
let subs = self
. orm_subscriptions
. get_mut ( nuri )
. unwrap ( ) ;
subs . retain ( | sub | ! sub . sender . is_closed ( ) ) ;
match
// Filter shapes, if present.
subs . iter ( )
. filter ( | s | match shape {
Some ( sh ) = > * sh = = s . shape_type . shape ,
None = > true , // Filter session ids if present.
} )
. filter ( | s | match session_id {
Some ( id ) = > * id = = s . session_id ,
None = > true ,
} )
. next ( )
{
None = > Err ( VerifierError ::OrmSubscriptionNotFound ) ,
Some ( subscription ) = > {
Ok ( ( subscription . sender . clone ( ) , subscription ) )
}
}
}
/// Apply triples to a nuri's document.
/// Apply triples to a nuri's document.
/// Updates tracked_subjects in orm_subscriptions.
/// Updates tracked_subjects in orm_subscriptions.
fn apply_triple_changes (
fn apply_triple_changes (
@ -617,32 +658,115 @@ impl Verifier {
return Ok ( return_vals ) ;
return Ok ( return_vals ) ;
}
}
pub ( crate ) async fn orm_update ( & mut self , scope : & NuriV0 , patch : GraphQuadsPatch ) { }
pub ( crate ) async fn orm_update (
& mut self ,
session_id : u64 ,
repo_id : RepoId ,
overlay_id : OverlayId ,
patch : GraphQuadsPatch ,
) {
let overlaylink : OverlayLink = overlay_id . into ( ) ;
for ( scope , subs ) in self . orm_subscriptions . iter_mut ( ) {
subs . retain ( | sub | ! sub . sender . is_closed ( ) ) ;
if scope . entire_store
| | scope . overlay . as_ref ( ) . map_or ( false , | ol | overlaylink = = * ol )
| | scope . target = = NuriTargetV0 ::Repo ( repo_id )
{
for sub in subs {
if sub . session_id ! = session_id { // this is incorrect. we are excluding all the subscriptions from the originating session,
// while we should only exclude the one with exact same shape_type. but we don't have access to that here
// TODO: implement this, generate orm_diff using the patch and the sub.shape_type
let orm_diff : OrmDiff = vec! [ ] ;
pub ( crate ) async fn orm_frontend_update (
_ = sub . sender . send ( AppResponse ::V0 ( AppResponseV0 ::OrmUpdate ( orm_diff . to_vec ( ) ) ) ) . await ;
}
}
}
}
}
pub ( crate ) async fn orm_update_self (
& mut self ,
& mut self ,
scope : & NuriV0 ,
scope : & NuriV0 ,
shape_iri : ShapeIri ,
shape_iri : ShapeIri ,
diff : OrmDiff ,
session_id : u64 ,
) {
skolemnized_blank_nodes : Vec < Quad > ,
log_info ! ( "frontend_update_orm {:?} {} {:?}" , scope , shape_iri , diff ) ;
revert_inserts : Vec < Quad > ,
revert_removes : Vec < Quad > ,
) -> Result < ( ) , VerifierError > {
let ( mut sender , orm_subscription ) =
self . get_first_orm_subscription_sender_for ( scope , Some ( & shape_iri ) , Some ( & session_id ) ) ? ;
// TODO prepare OrmUpdateBlankNodeIds with skolemnized_blank_nodes
// use orm_subscription if needed
// note(niko): I think skolemnized blank nodes can still be many, in case of multi-level nested sub-objects.
let orm_bnids = vec! [ ] ;
let _ = sender . send ( AppResponse ::V0 ( AppResponseV0 ::OrmUpdateBlankNodeIds ( orm_bnids ) ) ) . await ;
// TODO (later) revert the inserts and removes
// let orm_diff = vec![];
// let _ = sender.send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff))).await;
Ok ( ( ) )
}
}
pub ( crate ) async fn push_orm_response (
pub ( crate ) async fn orm_frontend_updat e(
& mut self ,
& mut self ,
nuri : & NuriV0 ,
session_id : u64 ,
session_id : u64 ,
sender : UnboundedSender < AppResponse > ,
scope : & NuriV0 ,
response : AppResponse ,
shape_iri : ShapeIri ,
) {
diff : OrmDiff ,
log_debug ! ( "sending orm response for session {}:" , session_id ) ;
) -> Result < ( ) , String > {
log_info ! (
if sender . is_closed ( ) {
"frontend_update_orm session={} scope={:?} shape={} diff={:?}" ,
log_debug ! ( "closed so removing session {}" , session_id ) ;
session_id ,
scope ,
shape_iri ,
diff
) ;
// find OrmSubscription
let ( doc_nuri , sparql_update ) = {
let orm_subscription =
self . get_first_orm_subscription_for ( scope , Some ( & shape_iri ) , Some ( & session_id ) ) ;
// use orm_subscription as needed
// do the magic, then, find the doc where the query should start and generate the sparql update
let doc_nuri = NuriV0 ::new_empty ( ) ;
let sparql_update : String = String ::new ( ) ;
( doc_nuri , sparql_update )
} ;
self . orm_subscriptions . remove ( & nuri ) ;
match self
} else {
. process_sparql_update (
let _ = sender . clone ( ) . send ( response ) . await ;
& doc_nuri ,
& sparql_update ,
& None ,
self . get_peer_id_for_skolem ( ) ,
session_id ,
)
. await
{
Err ( e ) = > Err ( e ) ,
Ok ( ( _ , revert_inserts , revert_removes , skolemnized_blank_nodes ) ) = > {
if ! revert_inserts . is_empty ( )
| | ! revert_removes . is_empty ( )
| | ! skolemnized_blank_nodes . is_empty ( )
{
self . orm_update_self (
scope ,
shape_iri ,
session_id ,
skolemnized_blank_nodes ,
revert_inserts ,
revert_removes ,
)
. await . map_err ( | e | e . to_string ( ) ) ? ;
}
Ok ( ( ) )
}
}
}
}
}
@ -661,7 +785,7 @@ impl Verifier {
shape_type : & OrmShapeType ,
shape_type : & OrmShapeType ,
session_id : u64 ,
session_id : u64 ,
) -> Result < ( Receiver < AppResponse > , CancelFn ) , NgError > {
) -> Result < ( Receiver < AppResponse > , CancelFn ) , NgError > {
let ( tx , rx ) = mpsc ::unbounded ::< AppResponse > ( ) ;
let ( mut tx , rx ) = mpsc ::unbounded ::< AppResponse > ( ) ;
// TODO: Validate schema:
// TODO: Validate schema:
// If multiple data types are present for the same predicate, they must be of of the same type.
// If multiple data types are present for the same predicate, they must be of of the same type.
@ -681,19 +805,13 @@ impl Verifier {
. or_insert ( vec! [ ] )
. or_insert ( vec! [ ] )
. push ( orm_subscription ) ;
. push ( orm_subscription ) ;
let _ orm_objects = self . create_orm_object_for_shape ( nuri , session_id , & shape_type ) ? ;
let orm_objects = self . create_orm_object_for_shape ( nuri , session_id , & shape_type ) ? ;
// log_debug!("create_orm_object_for_shape return {:?}", _ orm_objects);
// log_debug!("create_orm_object_for_shape return {:?}", orm_objects);
self . push_orm_response (
let _ = tx . send ( AppResponse ::V0 ( AppResponseV0 ::OrmInitial ( orm_objects ) ) ) . await ;
& nuri . clone ( ) ,
session_id ,
tx . clone ( ) ,
AppResponse ::V0 ( AppResponseV0 ::OrmInitial ( _orm_objects ) ) ,
)
. await ;
let close = Box ::new ( move | | {
let close = Box ::new ( move | | {
log_debug ! ( "CLOSE_CHANNEL of subscription" ) ;
log_debug ! ( "closing ORM subscription" ) ;
if ! tx . is_closed ( ) {
if ! tx . is_closed ( ) {
tx . close_channel ( ) ;
tx . close_channel ( ) ;
}
}