From dd20bfde5be83c816f0c5f1d09678338aefc72ce Mon Sep 17 00:00:00 2001 From: Niko PLP Date: Wed, 12 Jun 2024 03:09:06 +0300 Subject: [PATCH] SPARQL Query CRDT --- Cargo.lock | 2 + ng-oxigraph/Cargo.toml | 4 +- ng-oxigraph/src/oxigraph/sparql/dataset.rs | 68 +- .../src/oxigraph/storage/backend/fallback.rs | 2 + .../src/oxigraph/storage/binary_encoder.rs | 37 +- ng-oxigraph/src/oxigraph/storage/mod.rs | 856 +++++++++++++++--- ng-oxigraph/src/oxigraph/store.rs | 24 +- ng-verifier/src/commits/transaction.rs | 7 +- 8 files changed, 870 insertions(+), 130 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 846e161..b680dae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3387,12 +3387,14 @@ dependencies = [ name = "ng-oxigraph" version = "0.4.0-alpha.7-ngpreview6" dependencies = [ + "base64-url", "codspeed-criterion-compat", "digest 0.10.7", "getrandom 0.2.10", "hex", "js-sys", "json-event-parser", + "lazy_static", "libc", "md-5", "memchr", diff --git a/ng-oxigraph/Cargo.toml b/ng-oxigraph/Cargo.toml index 16336cd..427b7fc 100644 --- a/ng-oxigraph/Cargo.toml +++ b/ng-oxigraph/Cargo.toml @@ -28,6 +28,7 @@ sep-0006 = [] oxsdatatypes = [] [dependencies] +lazy_static = "1.4.0" digest = "0.10" hex = "0.4" json-event-parser = "0.2.0-alpha.2" @@ -35,7 +36,7 @@ md-5 = "0.10" oxilangtag = "0.1" oxiri = "0.2.3" rand = "0.8" -regex = "1.7" +regex = "1.8.4" serde = { version = "1.0.142", features = ["derive"] } sha1 = "0.10" sha2 = "0.10" @@ -44,6 +45,7 @@ thiserror = "1.0.50" quick-xml = ">=0.29, <0.32" memchr = "2.5" peg = "0.8" +base64-url = "2.0.0" [target.'cfg(all(not(target_family = "wasm"),not(docsrs)))'.dependencies] libc = "0.2" diff --git a/ng-oxigraph/src/oxigraph/sparql/dataset.rs b/ng-oxigraph/src/oxigraph/sparql/dataset.rs index ddd8816..e60a455 100644 --- a/ng-oxigraph/src/oxigraph/sparql/dataset.rs +++ b/ng-oxigraph/src/oxigraph/sparql/dataset.rs @@ -4,18 +4,30 @@ use crate::oxigraph::sparql::EvaluationError; use crate::oxigraph::storage::numeric_encoder::{ insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup, }; -use crate::oxigraph::storage::{StorageError, StorageReader}; +use crate::oxigraph::storage::{MatchBy, StorageError, StorageReader}; +use crate::oxigraph::store::CorruptionError; + use std::cell::RefCell; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::iter::empty; - pub struct DatasetView { reader: StorageReader, extra: RefCell>, dataset: EncodedDatasetSpec, } +struct ErrorIterator { + err: Option>, +} + +impl Iterator for ErrorIterator { + type Item = Result; + fn next(&mut self) -> Option { + self.err.take() + } +} + impl DatasetView { pub fn new(reader: StorageReader, dataset: &QueryDataset) -> Self { let dataset = EncodedDatasetSpec { @@ -33,16 +45,47 @@ impl DatasetView { } } - fn store_encoded_quads_for_pattern( - &self, - subject: Option<&EncodedTerm>, - predicate: Option<&EncodedTerm>, - object: Option<&EncodedTerm>, - graph_name: Option<&EncodedTerm>, - ) -> impl Iterator> + 'static { - self.reader - .quads_for_pattern(subject, predicate, object, graph_name) - .map(|t| t.map_err(Into::into)) + fn parse_graph_name(&self, graph_name: &EncodedTerm) -> Result { + match graph_name { + EncodedTerm::NamedNode { iri_id } => { + let graph_name_string = self.get_str(iri_id)?.ok_or::( + CorruptionError::msg("graph_name not found in parse_graph_name").into(), + )?; + self.reader + .parse_graph_name(&graph_name_string, Some(*iri_id)) + } + _ => Err(CorruptionError::msg( + "Invalid graph_name (not a NamedNode) in parse_graph_name", + ) + .into()), + } + } + + fn store_encoded_quads_for_pattern<'a>( + &'a self, + subject: Option<&'a EncodedTerm>, + predicate: Option<&'a EncodedTerm>, + object: Option<&'a EncodedTerm>, + graph_name: Option<&'a EncodedTerm>, + ) -> Box>> { + let graph = if let Some(g) = graph_name { + match self.parse_graph_name(g) { + Ok(match_by) => Some(match_by), + Err(e) => { + return Box::new(ErrorIterator { + err: Some(Err(e.into())), + }) + } + } + } else { + None + }; + + Box::new( + self.reader + .quads_for_pattern(subject, predicate, object, graph) + .map(|t| t.map_err(Into::into)), + ) } #[allow(clippy::needless_collect)] @@ -141,6 +184,7 @@ impl DatasetView { Box::new(iters.into_iter().flatten()) } else { Box::new( + // TODO: filter could be removed here as we never return quads with defaultGraph as graph self.store_encoded_quads_for_pattern(subject, predicate, object, None) .filter(|quad| match quad { Err(_) => true, diff --git a/ng-oxigraph/src/oxigraph/storage/backend/fallback.rs b/ng-oxigraph/src/oxigraph/storage/backend/fallback.rs index aff1973..5863940 100644 --- a/ng-oxigraph/src/oxigraph/storage/backend/fallback.rs +++ b/ng-oxigraph/src/oxigraph/storage/backend/fallback.rs @@ -85,8 +85,10 @@ impl Db { #[derive(Clone, Ord, PartialOrd, Eq, PartialEq, Hash)] pub struct ColumnFamily(&'static str); +#[derive(Clone)] pub struct Reader(InnerReader); +#[derive(Clone)] enum InnerReader { Simple(Arc, Vec>>>>), Transaction( diff --git a/ng-oxigraph/src/oxigraph/storage/binary_encoder.rs b/ng-oxigraph/src/oxigraph/storage/binary_encoder.rs index 427bb7b..2e7b0cb 100644 --- a/ng-oxigraph/src/oxigraph/storage/binary_encoder.rs +++ b/ng-oxigraph/src/oxigraph/storage/binary_encoder.rs @@ -5,7 +5,7 @@ use crate::oxsdatatypes::*; use std::io::Read; use std::mem::size_of; -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] pub const LATEST_STORAGE_VERSION: u64 = 1; pub const WRITTEN_TERM_MAX_SIZE: usize = size_of::() + 2 * size_of::(); @@ -465,6 +465,12 @@ pub fn encode_term(t: &EncodedTerm) -> Vec { vec } +pub fn encode_graph(t1: StrHash) -> Vec { + let mut vec = Vec::with_capacity(17); + write_term(&mut vec, &EncodedTerm::NamedNode { iri_id: t1 }); + vec +} + pub fn encode_term_pair(t1: &EncodedTerm, t2: &EncodedTerm) -> Vec { let mut vec = Vec::with_capacity(2 * WRITTEN_TERM_MAX_SIZE); write_term(&mut vec, t1); @@ -472,6 +478,13 @@ pub fn encode_term_pair(t1: &EncodedTerm, t2: &EncodedTerm) -> Vec { vec } +pub fn encode_graph_term(t1: StrHash, t2: EncodedTerm) -> Vec { + let mut vec = Vec::with_capacity(WRITTEN_TERM_MAX_SIZE + 17); + write_term(&mut vec, &EncodedTerm::NamedNode { iri_id: t1 }); + write_term(&mut vec, &t2); + vec +} + pub fn encode_term_triple(t1: &EncodedTerm, t2: &EncodedTerm, t3: &EncodedTerm) -> Vec { let mut vec = Vec::with_capacity(3 * WRITTEN_TERM_MAX_SIZE); write_term(&mut vec, t1); @@ -480,6 +493,14 @@ pub fn encode_term_triple(t1: &EncodedTerm, t2: &EncodedTerm, t3: &EncodedTerm) vec } +pub fn encode_term_graph_pair(t1: StrHash, t2: EncodedTerm, t3: EncodedTerm) -> Vec { + let mut vec = Vec::with_capacity(2 * WRITTEN_TERM_MAX_SIZE + 17); + write_term(&mut vec, &EncodedTerm::NamedNode { iri_id: t1 }); + write_term(&mut vec, &t2); + write_term(&mut vec, &t3); + vec +} + pub fn encode_term_quad( t1: &EncodedTerm, t2: &EncodedTerm, @@ -494,6 +515,20 @@ pub fn encode_term_quad( vec } +pub fn encode_term_graph_triple( + t1: StrHash, + t2: EncodedTerm, + t3: EncodedTerm, + t4: EncodedTerm, +) -> Vec { + let mut vec = Vec::with_capacity(3 * WRITTEN_TERM_MAX_SIZE + 17); + write_term(&mut vec, &EncodedTerm::NamedNode { iri_id: t1 }); + write_term(&mut vec, &t2); + write_term(&mut vec, &t3); + write_term(&mut vec, &t4); + vec +} + pub fn write_term(sink: &mut Vec, term: &EncodedTerm) { match term { EncodedTerm::DefaultGraph => (), diff --git a/ng-oxigraph/src/oxigraph/storage/mod.rs b/ng-oxigraph/src/oxigraph/storage/mod.rs index 1a167bd..78f4f04 100644 --- a/ng-oxigraph/src/oxigraph/storage/mod.rs +++ b/ng-oxigraph/src/oxigraph/storage/mod.rs @@ -5,7 +5,8 @@ use crate::oxigraph::storage::backend::{Reader, Transaction}; #[cfg(all(not(target_family = "wasm"), not(docsrs)))] use crate::oxigraph::storage::binary_encoder::LATEST_STORAGE_VERSION; use crate::oxigraph::storage::binary_encoder::{ - decode_term, encode_term, encode_term_pair, encode_term_quad, encode_term_triple, + decode_term, encode_graph, encode_graph_term, encode_term, encode_term_graph_pair, + encode_term_graph_triple, encode_term_pair, encode_term_quad, encode_term_triple, write_gosp_quad, write_gpos_quad, write_gspo_quad, write_osp_quad, write_ospg_quad, write_pos_quad, write_posg_quad, write_spo_quad, write_spog_quad, write_term, QuadEncoding, WRITTEN_TERM_MAX_SIZE, @@ -20,11 +21,15 @@ use crate::oxigraph::storage::numeric_encoder::{ }; use crate::oxrdf::NamedNodeRef; use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter}; +use lazy_static::lazy_static; +use regex::Regex; + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] use std::collections::VecDeque; use std::collections::{HashMap, HashSet}; use std::error::Error; use std::io::Read; +use std::iter::Peekable; #[cfg(all(not(target_family = "wasm"), not(docsrs)))] use std::mem::{swap, take}; #[cfg(all(not(target_family = "wasm"), not(docsrs)))] @@ -35,6 +40,8 @@ use std::sync::Mutex; #[cfg(all(not(target_family = "wasm"), not(docsrs)))] use std::{io, thread}; +use self::numeric_encoder::EncodedTriple; + mod backend; mod binary_encoder; mod error; @@ -63,6 +70,32 @@ const DEFAULT_CF: &str = "default"; #[cfg(all(not(target_family = "wasm"), not(docsrs)))] const DEFAULT_BULK_LOAD_BATCH_SIZE: usize = 1_000_000; +const DID_PREFIX: &str = "did:ng"; + +lazy_static! { + #[doc(hidden)] + static ref RE_REPO: Regex = + Regex::new(r"^did:ng:o:([A-Za-z0-9-_%.]*):v:([A-Za-z0-9-_%.]*)$").unwrap(); + #[doc(hidden)] + static ref RE_BRANCH: Regex = + Regex::new(r"^:b:([A-Za-z0-9-_%.]*)$").unwrap(); + #[doc(hidden)] + static ref RE_COMMITS: Regex = + Regex::new(r":c:([A-Za-z0-9-_%.]*)").unwrap(); + #[doc(hidden)] + static ref RE_NAMED_BRANCH_OR_COMMIT: Regex = + Regex::new(r"^:a:([A-Za-z0-9-_%.]*)$").unwrap(); //TODO: allow international chars. disallow digit as first char +} + +pub(crate) enum MatchBy { + Repos(RepoIterator), + Commits { + heads: HashSet, + at_current_heads: bool, + original_graph_name: StrHash, + }, +} + /// Low level storage primitives #[derive(Clone)] pub struct Storage { @@ -421,7 +454,368 @@ fn encode_three_hashes(sh1: &StrHash, sh2: &StrHash, sh3: &StrHash) -> Vec { } } */ +pub(crate) struct RepoIterator { + pub iter: Option, + pub single: Option, +} + +impl Iterator for RepoIterator { + type Item = StrHash; + + fn next(&mut self) -> Option { + if let Some(iter) = self.iter.as_mut() { + if let Err(e) = iter.status() { + return None; + } + let key = iter.key()?; + let mut buffer = [0; 16]; + (&key[16..32]).read_exact(&mut buffer).ok()?; + let res = StrHash::from_be_bytes(buffer); + iter.next(); + Some(res) + } else { + self.single.take() + } + } +} + +impl From for Box>> { + fn from(e: StorageError) -> Self { + Box::new(std::iter::once(Err(e))) + } +} + +struct NgRepoQuadIterator { + reader: StorageReader, + iter: RepoIterator, +} + +impl NgRepoQuadIterator { + fn new(storage_reader: &StorageReader, iter: RepoIterator) -> Self { + NgRepoQuadIterator { + reader: StorageReader { + reader: storage_reader.reader.clone(), + storage: storage_reader.storage.clone(), + }, + iter, + } + } + fn into_iter( + self, + f: impl Fn(&StorageReader, StrHash) -> DecodingQuadIterator + 'static, + ) -> Box>> { + Box::new(self.iter.map(move |repo| f(&self.reader, repo)).flatten()) + } +} + +struct NgPastQuadIterator<'a> { + reader: StorageReader, + iter: std::collections::hash_set::IntoIter, + current: Option, + f: Box DecodingNgQuadIterator + 'a>, +} + +impl<'a> Iterator for NgPastQuadIterator<'a> { + type Item = Result<(EncodedQuad, u8), StorageError>; + + fn next(&mut self) -> Option { + loop { + let next = self.find_next(); + if next.is_ok() { + return next.unwrap(); + } + } + } +} + +impl<'a> NgPastQuadIterator<'a> { + fn find_next(&mut self) -> Result>, bool> { + if self.current.is_none() { + let i = self.iter.next(); + if i.is_none() { + return Ok(None); + } + self.current = Some((self.f)(&self.reader, i.unwrap())); + } + let next = self.current.as_mut().unwrap().next(); + if next.is_none() { + self.current = None; + return Err(false); + } + Ok(next) + } + + fn new( + storage_reader: &StorageReader, + past: Arc>, + f: impl Fn(&StorageReader, StrHash) -> DecodingNgQuadIterator + 'a, + ) -> NgPastQuadIterator<'a> { + NgPastQuadIterator { + current: None, + reader: StorageReader { + reader: storage_reader.reader.clone(), + storage: storage_reader.storage.clone(), + }, + f: Box::new(f), + //TODO: avoid the copy of past (store a Vec instead of HashSet in cache) + iter: past.as_ref().clone().into_iter(), + } + } +} + +struct NgCommitQuadIterator { + iter: Peekable>>>, + at_current_heads: bool, + original_graph_name: StrHash, + current: Option, + current_add_is_removed: Option>, + current_is_added: bool, + reader: StorageReader, + past: Arc>, + skip_check_is_in_past: bool, +} +impl NgCommitQuadIterator { + fn end_of_triple(&mut self) -> Option> { + let mut ret = None; + if let Some(cur) = self.current.as_ref() { + if !self.current_is_added && !self.at_current_heads { + if let Some(removed) = &self.current_add_is_removed { + let removed_in = HashSet::from_iter( + removed + .iter() + .map(|c| { + if let Ok(res) = self.reader.ng_get_removed( + &cur.subject, + &cur.predicate, + &cur.object, + c, + ) { + res.into_iter() + } else { + HashSet::new().into_iter() + } + }) + .flatten(), + ); + // if at least one of removed_in is not in past, then we add it. + if !removed_in.is_subset(&self.past) { + let cur = self.current.take().unwrap(); + ret = Some(Ok(EncodedQuad::new( + cur.subject, + cur.predicate, + cur.object, + EncodedTerm::NamedNode { + iri_id: self.original_graph_name, + }, + ))); + } + } + } + } + self.current_is_added = false; + self.current_add_is_removed = None; + self.current = None; + + ret + } + + fn find_next(&mut self) -> Result>, bool> { + match self.iter.peek() { + None | Some(Err(_)) => match self.end_of_triple() { + Some(res) => Ok(Some(res)), + None => match self.iter.next() { + None => Ok(None), + Some(Err(e)) => Ok(Some(Err(e))), + _ => { + panic!(""); + } + }, + }, + Some(Ok((quad, value))) => { + if let EncodedTerm::NamedNode { iri_id } = quad.graph_name { + if self.skip_check_is_in_past || self.past.contains(&iri_id) { + let triple = EncodedTriple::new( + quad.subject.clone(), + quad.predicate.clone(), + quad.object.clone(), + ); + if let Some(cur) = self.current.as_ref() { + if &triple != cur { + match self.end_of_triple() { + Some(res) => return Ok(Some(res)), + None => { + self.current = Some(triple); + } + } + } + } else { + self.current = Some(triple); + } + let (q, value) = self.iter.next().unwrap().unwrap(); + if is_added(value) { + self.current_is_added = true; + self.current_add_is_removed = None; + return Ok(Some(Ok(EncodedQuad::new( + q.subject, + q.predicate, + q.object, + EncodedTerm::NamedNode { + iri_id: self.original_graph_name, + }, + )))); + } else if is_removed(value) { + if !self.at_current_heads && !self.current_is_added { + if self.current_add_is_removed.is_none() { + self.current_add_is_removed = Some(HashSet::new()); + } + self.current_add_is_removed.as_mut().unwrap().insert(iri_id); + } + return Err(false); + } + } else { + return Err(false); + } + } + Ok(None) + } + } + } +} + +impl Iterator for NgCommitQuadIterator { + type Item = Result; + + fn next(&mut self) -> Option { + loop { + let res = self.find_next(); + if let Ok(found) = res { + return found; + } + } + } +} + impl StorageReader { + pub(crate) fn parse_graph_name( + &self, + graph_name_string: &String, + iri_id: Option, + ) -> Result { + let graph_name_string_len = graph_name_string.len(); + // TODO: deal with <:v> and <:v:n> + if graph_name_string_len < 100 { + return Err( + CorruptionError::msg("Invalid graph_name (too short) in parse_graph_name").into(), + ); + } + let (repo_part, other_part) = graph_name_string.split_at(100); + + let c = RE_REPO.captures(repo_part); + let (_repo, overlay) = if c.is_some() + && c.as_ref().unwrap().get(1).is_some() + && c.as_ref().unwrap().get(2).is_some() + { + let cap = c.unwrap(); + let o = cap.get(1).unwrap().as_str(); + let v = cap.get(2).unwrap().as_str(); + (o, v) + } else { + return Err(CorruptionError::msg( + "Invalid graph_name (does not start with did:ng:o:v) in parse_graph_name", + ) + .into()); + }; + + let ov_hash = StrHash::new(repo_part); + + let overlay_hash = StrHash::new(&format!("{DID_PREFIX}:v:{overlay}")); + + // we check that did:ng:o:v is present in dataset + self.get_str(&ov_hash)?.ok_or::( + CorruptionError::msg( + "Invalid graph_name (did:ng:o:v part not found) in parse_graph_name", + ) + .into(), + )?; + + Ok(if graph_name_string_len > 100 && iri_id.is_some() { + if graph_name_string_len < 104 { + return Err(CorruptionError::msg( + "Invalid graph_name (second part is too short) in parse_graph_name", + ) + .into()); + } + let original_graph_name = iri_id.unwrap(); + + match &other_part[0..3] { + ":b:" => { + RE_BRANCH.find(other_part).ok_or::( + CorruptionError::msg( + "Invalid graph_name (invalid branch part) in parse_graph_name", + ) + .into(), + )?; + let branch_hash = StrHash::new(&format!("{DID_PREFIX}{other_part}")); + let topic_hash = self.ng_get_branch_topic(&branch_hash, &overlay_hash)?; + let heads = self.ng_get_heads(&topic_hash, &overlay_hash)?; + MatchBy::Commits { + heads, + at_current_heads: true, + original_graph_name, + } + } + ":a:" => { + RE_NAMED_BRANCH_OR_COMMIT + .find(other_part) + .ok_or::( + CorruptionError::msg( + "Invalid graph_name (invalid named part) in parse_graph_name", + ) + .into(), + )?; + unimplemented!(); + } + ":c:" => { + let commits: Vec<&str> = RE_COMMITS + .find_iter(other_part) + .map(|m| m.as_str()) + .collect(); + if commits.is_empty() { + return Err(CorruptionError::msg( + "Invalid graph_name (invalid commit IDs) in parse_graph_name", + ) + .into()); + } + // TODO: check that all the commits are from the same branch + // TODO: if commits are exactly like current heads of branch, set at_current_heads = true + MatchBy::Commits { + heads: HashSet::from_iter( + commits + .into_iter() + .map(|c| StrHash::new(&format!("{DID_PREFIX}:c:{c}:v:{overlay}"))), + ), + at_current_heads: false, + original_graph_name, + } + } + ":n:" => { + unimplemented!() + } + _ => { + return Err(CorruptionError::msg( + "Invalid graph_name (unknown second part) in parse_graph_name", + ) + .into()) + } + } + } else { + MatchBy::Repos(RepoIterator { + iter: None, + single: Some(ov_hash), + }) + }) + } + pub fn ng_get_heads( &self, topic: &StrHash, @@ -442,6 +836,31 @@ impl StorageReader { Ok(set) } + pub fn ng_get_branch_topic( + &self, + branch: &StrHash, + overlay: &StrHash, + ) -> Result { + let mut key = Vec::with_capacity(33); + key.push(BRANCH_PREFIX); + key.extend_from_slice(&branch.to_be_bytes()); + key.extend_from_slice(&overlay.to_be_bytes()); + + let val = self + .reader + .get(&self.storage.branches_cf, &key)? + .ok_or(CorruptionError::msg("Branch not found"))?; + + if val[0] == 1 { + return Err(CorruptionError::msg("This is a store branch").into()); + } + + let mut buffer = [0; 16]; + (&val[1..17]).read_exact(&mut buffer)?; + + Ok(StrHash::from_be_bytes(buffer)) + } + pub fn ng_get_removed( &self, subject: &EncodedTerm, @@ -590,13 +1009,13 @@ impl StorageReader { } } - pub fn quads_for_pattern( + pub(crate) fn quads_for_pattern( &self, subject: Option<&EncodedTerm>, predicate: Option<&EncodedTerm>, object: Option<&EncodedTerm>, - graph_name: Option<&EncodedTerm>, - ) -> ChainedDecodingQuadIterator { + graph_name: Option, + ) -> Box>> { match subject { Some(subject) => match predicate { Some(predicate) => match object { @@ -653,30 +1072,29 @@ impl StorageReader { } } - pub fn quads(&self) -> ChainedDecodingQuadIterator { - ChainedDecodingQuadIterator::pair(self.dspo_quads(&[]), self.gspo_quads(&[])) + pub fn quads(&self) -> Box>> { + self.ng_spog_quads(&[]).into_dataset_iter() } fn quads_in_named_graph(&self) -> DecodingQuadIterator { self.gspo_quads(&[]) } - fn quads_for_subject(&self, subject: &EncodedTerm) -> ChainedDecodingQuadIterator { - ChainedDecodingQuadIterator::pair( - self.dspo_quads(&encode_term(subject)), - self.spog_quads(&encode_term(subject)), - ) + fn quads_for_subject( + &self, + subject: &EncodedTerm, + ) -> Box>> { + self.ng_spog_quads(&encode_term(subject)) + .into_dataset_iter() } fn quads_for_subject_predicate( &self, subject: &EncodedTerm, predicate: &EncodedTerm, - ) -> ChainedDecodingQuadIterator { - ChainedDecodingQuadIterator::pair( - self.dspo_quads(&encode_term_pair(subject, predicate)), - self.spog_quads(&encode_term_pair(subject, predicate)), - ) + ) -> Box>> { + self.ng_spog_quads(&encode_term_pair(subject, predicate)) + .into_dataset_iter() } fn quads_for_subject_predicate_object( @@ -684,80 +1102,132 @@ impl StorageReader { subject: &EncodedTerm, predicate: &EncodedTerm, object: &EncodedTerm, - ) -> ChainedDecodingQuadIterator { - ChainedDecodingQuadIterator::pair( - self.dspo_quads(&encode_term_triple(subject, predicate, object)), - self.spog_quads(&encode_term_triple(subject, predicate, object)), - ) + ) -> Box>> { + self.ng_spog_quads(&encode_term_triple(subject, predicate, object)) + .into_dataset_iter() } fn quads_for_subject_object( &self, subject: &EncodedTerm, object: &EncodedTerm, - ) -> ChainedDecodingQuadIterator { - ChainedDecodingQuadIterator::pair( - self.dosp_quads(&encode_term_pair(object, subject)), - self.ospg_quads(&encode_term_pair(object, subject)), - ) + ) -> Box>> { + self.ng_ospg_quads(&encode_term_pair(object, subject)) + .into_dataset_iter() } - fn quads_for_predicate(&self, predicate: &EncodedTerm) -> ChainedDecodingQuadIterator { - ChainedDecodingQuadIterator::pair( - self.dpos_quads(&encode_term(predicate)), - self.posg_quads(&encode_term(predicate)), - ) + fn quads_for_predicate( + &self, + predicate: &EncodedTerm, + ) -> Box>> { + self.ng_posg_quads(&encode_term(predicate)) + .into_dataset_iter() } fn quads_for_predicate_object( &self, predicate: &EncodedTerm, object: &EncodedTerm, - ) -> ChainedDecodingQuadIterator { - ChainedDecodingQuadIterator::pair( - self.dpos_quads(&encode_term_pair(predicate, object)), - self.posg_quads(&encode_term_pair(predicate, object)), - ) + ) -> Box>> { + self.ng_posg_quads(&encode_term_pair(predicate, object)) + .into_dataset_iter() } - fn quads_for_object(&self, object: &EncodedTerm) -> ChainedDecodingQuadIterator { - ChainedDecodingQuadIterator::pair( - self.dosp_quads(&encode_term(object)), - self.ospg_quads(&encode_term(object)), - ) + fn quads_for_object( + &self, + object: &EncodedTerm, + ) -> Box>> { + self.ng_ospg_quads(&encode_term(object)).into_dataset_iter() } - fn quads_for_graph(&self, graph_name: &EncodedTerm) -> ChainedDecodingQuadIterator { - ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() { - self.dspo_quads(&Vec::default()) - } else { - self.gspo_quads(&encode_term(graph_name)) - }) + fn quads_for_graph( + &self, + graph_name: MatchBy, + ) -> Box>> { + match graph_name { + MatchBy::Repos(repo_iter) => NgRepoQuadIterator::new(self, repo_iter) + .into_iter(|reader, repo| reader.gspo_quads(&encode_graph(repo))), + MatchBy::Commits { + heads, + at_current_heads, + original_graph_name, + } => self.quads_for_ng_iter_graph_heads(heads, at_current_heads, original_graph_name), + } } fn quads_for_subject_graph( &self, subject: &EncodedTerm, - graph_name: &EncodedTerm, - ) -> ChainedDecodingQuadIterator { - ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() { - self.dspo_quads(&encode_term(subject)) - } else { - self.gspo_quads(&encode_term_pair(graph_name, subject)) - }) + graph_name: MatchBy, + ) -> Box>> { + match graph_name { + MatchBy::Repos(repo_iter) => { + let subject = subject.clone(); + NgRepoQuadIterator::new(self, repo_iter).into_iter(move |reader, repo| { + reader.gspo_quads(&encode_graph_term(repo, subject.clone())) + }) + } + + MatchBy::Commits { + heads, + at_current_heads, + original_graph_name, + } => self.quads_for_ng_iter_heads( + self.ng_spog_quads(&encode_term(subject)), + heads, + at_current_heads, + original_graph_name, + ), + } } fn quads_for_subject_predicate_graph( &self, subject: &EncodedTerm, predicate: &EncodedTerm, - graph_name: &EncodedTerm, - ) -> ChainedDecodingQuadIterator { - ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() { - self.dspo_quads(&encode_term_pair(subject, predicate)) - } else { - self.gspo_quads(&encode_term_triple(graph_name, subject, predicate)) - }) + graph_name: MatchBy, + ) -> Box>> { + match graph_name { + MatchBy::Repos(repo_iter) => { + let subject = subject.clone(); + let predicate = predicate.clone(); + NgRepoQuadIterator::new(self, repo_iter).into_iter(move |reader, repo| { + reader.gspo_quads(&encode_term_graph_pair( + repo, + subject.clone(), + predicate.clone(), + )) + }) + } + MatchBy::Commits { + heads, + at_current_heads, + original_graph_name, + } => self.quads_for_ng_iter_heads( + self.ng_spog_quads(&encode_term_pair(subject, predicate)), + heads, + at_current_heads, + original_graph_name, + ), + } + } + + fn repos_in_store(&self, store: &StrHash) -> RepoIterator { + RepoIterator { + single: None, + iter: Some( + self.reader + .scan_prefix(&self.storage.stores_cf, &store.to_be_bytes()) + .unwrap(), + ), // TODO: propagate error? + } + } + + fn single_repo(&self, repo: StrHash) -> RepoIterator { + RepoIterator { + single: Some(repo), + iter: None, + } } fn quads_for_subject_predicate_object_graph( @@ -765,12 +1235,94 @@ impl StorageReader { subject: &EncodedTerm, predicate: &EncodedTerm, object: &EncodedTerm, - graph_name: &EncodedTerm, - ) -> ChainedDecodingQuadIterator { - ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() { - self.dspo_quads(&encode_term_triple(subject, predicate, object)) - } else { - self.gspo_quads(&encode_term_quad(graph_name, subject, predicate, object)) + graph_name: MatchBy, + ) -> Box>> { + match graph_name { + MatchBy::Repos(repo_iter) => { + let subject = subject.clone(); + let predicate = predicate.clone(); + let object = object.clone(); + NgRepoQuadIterator::new(self, repo_iter).into_iter(move |reader, repo| { + reader.gspo_quads(&encode_term_graph_triple( + repo, + subject.clone(), + predicate.clone(), + object.clone(), + )) + }) + } + MatchBy::Commits { + heads, + at_current_heads, + original_graph_name, + } => self.quads_for_ng_iter_heads( + self.ng_spog_quads(&encode_term_triple(subject, predicate, object)), + heads, + at_current_heads, + original_graph_name, + ), + } + } + + pub fn quads_for_ng_iter_graph_heads( + &self, + heads: HashSet, + at_current_heads: bool, + original_graph_name: StrHash, + ) -> Box>> { + let past = self.past_for_heads(&heads); + if past.is_err() { + return past.unwrap_err().into(); + } + let past = past.unwrap(); + + let j = NgPastQuadIterator::new(self, Arc::clone(&past), |reader, c| { + reader.ng_gspo_quads(&encode_graph(c)) + }); + let i: Box>> = Box::new(j); + + Box::new(NgCommitQuadIterator { + iter: i.peekable(), + at_current_heads, + original_graph_name, + current: None, + current_is_added: false, + current_add_is_removed: None, + reader: StorageReader { + reader: self.reader.clone(), + storage: self.storage.clone(), + }, + past: Arc::clone(&past), + skip_check_is_in_past: true, + }) + } + + pub fn quads_for_ng_iter_heads( + &self, + iter: DecodingNgQuadIterator, + heads: HashSet, + at_current_heads: bool, + original_graph_name: StrHash, + ) -> Box>> { + let past = self.past_for_heads(&heads); + if past.is_err() { + return past.unwrap_err().into(); + } + let past = past.unwrap(); + let i: Box>> = Box::new(iter); + Box::new(NgCommitQuadIterator { + iter: i.peekable(), + at_current_heads, + original_graph_name, + current: None, + current_is_added: false, + current_add_is_removed: None, + reader: StorageReader { + reader: self.reader.clone(), + storage: self.storage.clone(), + }, + past, + skip_check_is_in_past: false, }) } @@ -812,50 +1364,114 @@ impl StorageReader { &self, subject: &EncodedTerm, object: &EncodedTerm, - graph_name: &EncodedTerm, - ) -> ChainedDecodingQuadIterator { - ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() { - self.dosp_quads(&encode_term_pair(object, subject)) - } else { - self.gosp_quads(&encode_term_triple(graph_name, object, subject)) - }) + graph_name: MatchBy, + ) -> Box>> { + match graph_name { + MatchBy::Repos(repo_iter) => { + let subject = subject.clone(); + let object = object.clone(); + NgRepoQuadIterator::new(self, repo_iter).into_iter(move |reader, repo| { + reader.gosp_quads(&encode_term_graph_pair( + repo, + object.clone(), + subject.clone(), + )) + }) + } + MatchBy::Commits { + heads, + at_current_heads, + original_graph_name, + } => self.quads_for_ng_iter_heads( + self.ng_ospg_quads(&encode_term_pair(object, subject)), + heads, + at_current_heads, + original_graph_name, + ), + } } fn quads_for_predicate_graph( &self, predicate: &EncodedTerm, - graph_name: &EncodedTerm, - ) -> ChainedDecodingQuadIterator { - ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() { - self.dpos_quads(&encode_term(predicate)) - } else { - self.gpos_quads(&encode_term_pair(graph_name, predicate)) - }) + graph_name: MatchBy, + ) -> Box>> { + match graph_name { + MatchBy::Repos(repo_iter) => { + let predicate = predicate.clone(); + + NgRepoQuadIterator::new(self, repo_iter).into_iter(move |reader, repo| { + reader.gpos_quads(&encode_graph_term(repo, predicate.clone())) + }) + } + MatchBy::Commits { + heads, + at_current_heads, + original_graph_name, + } => self.quads_for_ng_iter_heads( + self.ng_posg_quads(&encode_term(predicate)), + heads, + at_current_heads, + original_graph_name, + ), + } } fn quads_for_predicate_object_graph( &self, predicate: &EncodedTerm, object: &EncodedTerm, - graph_name: &EncodedTerm, - ) -> ChainedDecodingQuadIterator { - ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() { - self.dpos_quads(&encode_term_pair(predicate, object)) - } else { - self.gpos_quads(&encode_term_triple(graph_name, predicate, object)) - }) + graph_name: MatchBy, + ) -> Box>> { + match graph_name { + MatchBy::Repos(repo_iter) => { + let predicate = predicate.clone(); + let object = object.clone(); + NgRepoQuadIterator::new(self, repo_iter).into_iter(move |reader, repo| { + reader.gpos_quads(&encode_term_graph_pair( + repo, + predicate.clone(), + object.clone(), + )) + }) + } + + MatchBy::Commits { + heads, + at_current_heads, + original_graph_name, + } => self.quads_for_ng_iter_heads( + self.ng_posg_quads(&encode_term_pair(predicate, object)), + heads, + at_current_heads, + original_graph_name, + ), + } } fn quads_for_object_graph( &self, object: &EncodedTerm, - graph_name: &EncodedTerm, - ) -> ChainedDecodingQuadIterator { - ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() { - self.dosp_quads(&encode_term(object)) - } else { - self.gosp_quads(&encode_term_pair(graph_name, object)) - }) + graph_name: MatchBy, + ) -> Box>> { + match graph_name { + MatchBy::Repos(repo_iter) => { + let object = object.clone(); + NgRepoQuadIterator::new(self, repo_iter).into_iter(move |reader, repo| { + reader.gosp_quads(&encode_graph_term(repo, object.clone())) + }) + } + MatchBy::Commits { + heads, + at_current_heads, + original_graph_name, + } => self.quads_for_ng_iter_heads( + self.ng_ospg_quads(&encode_term(object)), + heads, + at_current_heads, + original_graph_name, + ), + } } // pub fn named_graphs(&self) -> DecodingGraphIterator { @@ -1187,6 +1803,21 @@ impl Iterator for DecodingNgQuadIterator { } } +impl DecodingNgQuadIterator { + fn into_dataset_iter(self) -> Box>> { + Box::new(self.filter_map(|q| match q { + Err(e) => Some(Err(e)), + Ok((q, val)) => { + if val == REPO_IN_MAIN { + Some(Ok(q)) + } else { + None + } + } + })) + } +} + pub struct DecodingGraphIterator { iter: Iter, } @@ -1406,11 +2037,13 @@ impl<'a> StorageWriter<'a> { key.extend_from_slice(&branch_encoded.to_be_bytes()); key.extend_from_slice(&overlay_encoded.to_be_bytes()); - let topic = topic_encoded.to_be_bytes(); + let mut topic = Vec::with_capacity(17); + topic.push(0); // 1 would mean there is no topic hash andthis is the "store" branch of a store. + topic.extend_from_slice(&topic_encoded.to_be_bytes()); let reader = self.transaction.reader(); if match reader.get(&self.storage.branches_cf, &key)? { - Some(val) => val.to_vec() != topic.to_vec(), + Some(val) => val.to_vec() != topic, None => true, } { self.transaction @@ -1501,9 +2134,14 @@ impl<'a> StorageWriter<'a> { } } - pub fn ng_insert(&mut self, quad: QuadRef<'_>, value: u8) -> Result<(), StorageError> { + pub fn ng_insert( + &mut self, + quad: QuadRef<'_>, + value: u8, + cv: bool, + ) -> Result<(), StorageError> { let encoded = quad.into(); - if self.ng_insert_encoded(&encoded, value)? { + if self.ng_insert_encoded(&encoded, value, cv)? { self.insert_term(quad.subject.into(), &encoded.subject)?; self.insert_term(quad.predicate.into(), &encoded.predicate)?; self.insert_term(quad.object, &encoded.object)?; @@ -1526,6 +2164,7 @@ impl<'a> StorageWriter<'a> { &mut self, encoded: &EncodedQuad, value: u8, + cv: bool, ) -> Result { let value = [value]; self.buffer.clear(); @@ -1554,16 +2193,17 @@ impl<'a> StorageWriter<'a> { self.transaction .insert(&self.storage.gspo_cf, &self.buffer, &value)?; - self.buffer.clear(); - write_gpos_quad(&mut self.buffer, encoded); - self.transaction - .insert(&self.storage.gpos_cf, &self.buffer, &value)?; - - self.buffer.clear(); - write_gosp_quad(&mut self.buffer, encoded); - self.transaction - .insert(&self.storage.gosp_cf, &self.buffer, &value)?; + if !cv { + self.buffer.clear(); + write_gpos_quad(&mut self.buffer, encoded); + self.transaction + .insert(&self.storage.gpos_cf, &self.buffer, &value)?; + self.buffer.clear(); + write_gosp_quad(&mut self.buffer, encoded); + self.transaction + .insert(&self.storage.gosp_cf, &self.buffer, &value)?; + } true }; Ok(result) diff --git a/ng-oxigraph/src/oxigraph/store.rs b/ng-oxigraph/src/oxigraph/store.rs index e915516..86096ab 100644 --- a/ng-oxigraph/src/oxigraph/store.rs +++ b/ng-oxigraph/src/oxigraph/store.rs @@ -294,7 +294,13 @@ impl Store { subject.map(EncodedTerm::from).as_ref(), predicate.map(EncodedTerm::from).as_ref(), object.map(EncodedTerm::from).as_ref(), - graph_name.map(EncodedTerm::from).as_ref(), + graph_name.map(|graph_name_ref| { + if let GraphName::NamedNode(nn) = graph_name_ref.into_owned() { + reader.parse_graph_name(nn.as_string(), None).unwrap() //TODO improve error mng (remove unwrap) + } else { + panic!("invalid graph name"); + } + }), ), reader, } @@ -1187,7 +1193,13 @@ impl<'a> Transaction<'a> { subject.map(EncodedTerm::from).as_ref(), predicate.map(EncodedTerm::from).as_ref(), object.map(EncodedTerm::from).as_ref(), - graph_name.map(EncodedTerm::from).as_ref(), + graph_name.map(|graph_name_ref| { + if let GraphName::NamedNode(nn) = graph_name_ref.into_owned() { + reader.parse_graph_name(nn.as_string(), None).unwrap() //TODO improve error mng (remove unwrap) + } else { + panic!("invalid graph name"); + } + }), ), reader, } @@ -1483,16 +1495,18 @@ impl<'a> Transaction<'a> { &mut self, quad: impl Into>, value: u8, + cv: bool, ) -> Result<(), StorageError> { - self.writer.ng_insert(quad.into(), value) + self.writer.ng_insert(quad.into(), value, cv) } pub fn insert_encoded( &mut self, encoded: &EncodedQuad, value: u8, + cv: bool, ) -> Result { - self.writer.ng_insert_encoded(encoded, value) + self.writer.ng_insert_encoded(encoded, value, cv) } pub fn ng_remove(&mut self, quad: &EncodedQuad, commit: &StrHash) -> Result<(), StorageError> { @@ -1662,7 +1676,7 @@ impl IntoIterator for &Transaction<'_> { /// An iterator returning the quads contained in a [`Store`]. pub struct QuadIter { - iter: ChainedDecodingQuadIterator, + iter: Box>>, reader: StorageReader, } diff --git a/ng-verifier/src/commits/transaction.rs b/ng-verifier/src/commits/transaction.rs index 002820e..b422a57 100644 --- a/ng-verifier/src/commits/transaction.rs +++ b/ng-verifier/src/commits/transaction.rs @@ -262,6 +262,7 @@ impl Verifier { b.read_cap.tokenize(), ) } + // TODO: implement TargetBranchV0::Named _ => unimplemented!(), }; let _ = branches.entry(branch_id).or_insert(( @@ -404,12 +405,12 @@ impl Verifier { for triple in update.transaction.inserts.iter() { let triple_ref: TripleRef = triple.into(); let quad_ref = triple_ref.in_graph(cv_graphname_ref); - transaction.insert(quad_ref, value)?; + transaction.insert(quad_ref, value, true)?; if let Some(ov_graphname) = ov_main.as_ref() { let ov_graphname_ref = GraphNameRef::NamedNode(ov_graphname.into()); let triple_ref: TripleRef = triple.into(); let quad_ref = triple_ref.in_graph(ov_graphname_ref); - transaction.insert(quad_ref, REPO_IN_MAIN)?; + transaction.insert(quad_ref, REPO_IN_MAIN, false)?; } } @@ -493,7 +494,7 @@ impl Verifier { encoded_object.clone(), graph_encoded, ); - transaction.insert_encoded(&quad_encoded, value)?; + transaction.insert_encoded(&quad_encoded, value, true)?; transaction.ng_remove(&quad_encoded, &commit_encoded)?; } if let Some(ov_graphname) = ov_main.as_ref() {