SPARQL Query CRDT

pull/19/head
Niko PLP 7 months ago
parent 74c49d43e6
commit dd20bfde5b
  1. 2
      Cargo.lock
  2. 4
      ng-oxigraph/Cargo.toml
  3. 68
      ng-oxigraph/src/oxigraph/sparql/dataset.rs
  4. 2
      ng-oxigraph/src/oxigraph/storage/backend/fallback.rs
  5. 37
      ng-oxigraph/src/oxigraph/storage/binary_encoder.rs
  6. 856
      ng-oxigraph/src/oxigraph/storage/mod.rs
  7. 24
      ng-oxigraph/src/oxigraph/store.rs
  8. 7
      ng-verifier/src/commits/transaction.rs

2
Cargo.lock generated

@ -3387,12 +3387,14 @@ dependencies = [
name = "ng-oxigraph"
version = "0.4.0-alpha.7-ngpreview6"
dependencies = [
"base64-url",
"codspeed-criterion-compat",
"digest 0.10.7",
"getrandom 0.2.10",
"hex",
"js-sys",
"json-event-parser",
"lazy_static",
"libc",
"md-5",
"memchr",

@ -28,6 +28,7 @@ sep-0006 = []
oxsdatatypes = []
[dependencies]
lazy_static = "1.4.0"
digest = "0.10"
hex = "0.4"
json-event-parser = "0.2.0-alpha.2"
@ -35,7 +36,7 @@ md-5 = "0.10"
oxilangtag = "0.1"
oxiri = "0.2.3"
rand = "0.8"
regex = "1.7"
regex = "1.8.4"
serde = { version = "1.0.142", features = ["derive"] }
sha1 = "0.10"
sha2 = "0.10"
@ -44,6 +45,7 @@ thiserror = "1.0.50"
quick-xml = ">=0.29, <0.32"
memchr = "2.5"
peg = "0.8"
base64-url = "2.0.0"
[target.'cfg(all(not(target_family = "wasm"),not(docsrs)))'.dependencies]
libc = "0.2"

@ -4,18 +4,30 @@ use crate::oxigraph::sparql::EvaluationError;
use crate::oxigraph::storage::numeric_encoder::{
insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup,
};
use crate::oxigraph::storage::{StorageError, StorageReader};
use crate::oxigraph::storage::{MatchBy, StorageError, StorageReader};
use crate::oxigraph::store::CorruptionError;
use std::cell::RefCell;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::iter::empty;
pub struct DatasetView {
reader: StorageReader,
extra: RefCell<HashMap<StrHash, String>>,
dataset: EncodedDatasetSpec,
}
struct ErrorIterator {
err: Option<Result<EncodedQuad, EvaluationError>>,
}
impl Iterator for ErrorIterator {
type Item = Result<EncodedQuad, EvaluationError>;
fn next(&mut self) -> Option<Self::Item> {
self.err.take()
}
}
impl DatasetView {
pub fn new(reader: StorageReader, dataset: &QueryDataset) -> Self {
let dataset = EncodedDatasetSpec {
@ -33,16 +45,47 @@ impl DatasetView {
}
}
fn store_encoded_quads_for_pattern(
&self,
subject: Option<&EncodedTerm>,
predicate: Option<&EncodedTerm>,
object: Option<&EncodedTerm>,
graph_name: Option<&EncodedTerm>,
) -> impl Iterator<Item = Result<EncodedQuad, EvaluationError>> + 'static {
self.reader
.quads_for_pattern(subject, predicate, object, graph_name)
.map(|t| t.map_err(Into::into))
fn parse_graph_name(&self, graph_name: &EncodedTerm) -> Result<MatchBy, StorageError> {
match graph_name {
EncodedTerm::NamedNode { iri_id } => {
let graph_name_string = self.get_str(iri_id)?.ok_or::<StorageError>(
CorruptionError::msg("graph_name not found in parse_graph_name").into(),
)?;
self.reader
.parse_graph_name(&graph_name_string, Some(*iri_id))
}
_ => Err(CorruptionError::msg(
"Invalid graph_name (not a NamedNode) in parse_graph_name",
)
.into()),
}
}
fn store_encoded_quads_for_pattern<'a>(
&'a self,
subject: Option<&'a EncodedTerm>,
predicate: Option<&'a EncodedTerm>,
object: Option<&'a EncodedTerm>,
graph_name: Option<&'a EncodedTerm>,
) -> Box<dyn Iterator<Item = Result<EncodedQuad, EvaluationError>>> {
let graph = if let Some(g) = graph_name {
match self.parse_graph_name(g) {
Ok(match_by) => Some(match_by),
Err(e) => {
return Box::new(ErrorIterator {
err: Some(Err(e.into())),
})
}
}
} else {
None
};
Box::new(
self.reader
.quads_for_pattern(subject, predicate, object, graph)
.map(|t| t.map_err(Into::into)),
)
}
#[allow(clippy::needless_collect)]
@ -141,6 +184,7 @@ impl DatasetView {
Box::new(iters.into_iter().flatten())
} else {
Box::new(
// TODO: filter could be removed here as we never return quads with defaultGraph as graph
self.store_encoded_quads_for_pattern(subject, predicate, object, None)
.filter(|quad| match quad {
Err(_) => true,

@ -85,8 +85,10 @@ impl Db {
#[derive(Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct ColumnFamily(&'static str);
#[derive(Clone)]
pub struct Reader(InnerReader);
#[derive(Clone)]
enum InnerReader {
Simple(Arc<RwLock<HashMap<ColumnFamily, BTreeMap<Vec<u8>, Vec<u8>>>>>),
Transaction(

@ -5,7 +5,7 @@ use crate::oxsdatatypes::*;
use std::io::Read;
use std::mem::size_of;
#[cfg(all(not(target_family = "wasm"),not(docsrs)))]
#[cfg(all(not(target_family = "wasm"), not(docsrs)))]
pub const LATEST_STORAGE_VERSION: u64 = 1;
pub const WRITTEN_TERM_MAX_SIZE: usize = size_of::<u8>() + 2 * size_of::<StrHash>();
@ -465,6 +465,12 @@ pub fn encode_term(t: &EncodedTerm) -> Vec<u8> {
vec
}
pub fn encode_graph(t1: StrHash) -> Vec<u8> {
let mut vec = Vec::with_capacity(17);
write_term(&mut vec, &EncodedTerm::NamedNode { iri_id: t1 });
vec
}
pub fn encode_term_pair(t1: &EncodedTerm, t2: &EncodedTerm) -> Vec<u8> {
let mut vec = Vec::with_capacity(2 * WRITTEN_TERM_MAX_SIZE);
write_term(&mut vec, t1);
@ -472,6 +478,13 @@ pub fn encode_term_pair(t1: &EncodedTerm, t2: &EncodedTerm) -> Vec<u8> {
vec
}
pub fn encode_graph_term(t1: StrHash, t2: EncodedTerm) -> Vec<u8> {
let mut vec = Vec::with_capacity(WRITTEN_TERM_MAX_SIZE + 17);
write_term(&mut vec, &EncodedTerm::NamedNode { iri_id: t1 });
write_term(&mut vec, &t2);
vec
}
pub fn encode_term_triple(t1: &EncodedTerm, t2: &EncodedTerm, t3: &EncodedTerm) -> Vec<u8> {
let mut vec = Vec::with_capacity(3 * WRITTEN_TERM_MAX_SIZE);
write_term(&mut vec, t1);
@ -480,6 +493,14 @@ pub fn encode_term_triple(t1: &EncodedTerm, t2: &EncodedTerm, t3: &EncodedTerm)
vec
}
pub fn encode_term_graph_pair(t1: StrHash, t2: EncodedTerm, t3: EncodedTerm) -> Vec<u8> {
let mut vec = Vec::with_capacity(2 * WRITTEN_TERM_MAX_SIZE + 17);
write_term(&mut vec, &EncodedTerm::NamedNode { iri_id: t1 });
write_term(&mut vec, &t2);
write_term(&mut vec, &t3);
vec
}
pub fn encode_term_quad(
t1: &EncodedTerm,
t2: &EncodedTerm,
@ -494,6 +515,20 @@ pub fn encode_term_quad(
vec
}
pub fn encode_term_graph_triple(
t1: StrHash,
t2: EncodedTerm,
t3: EncodedTerm,
t4: EncodedTerm,
) -> Vec<u8> {
let mut vec = Vec::with_capacity(3 * WRITTEN_TERM_MAX_SIZE + 17);
write_term(&mut vec, &EncodedTerm::NamedNode { iri_id: t1 });
write_term(&mut vec, &t2);
write_term(&mut vec, &t3);
write_term(&mut vec, &t4);
vec
}
pub fn write_term(sink: &mut Vec<u8>, term: &EncodedTerm) {
match term {
EncodedTerm::DefaultGraph => (),

File diff suppressed because it is too large Load Diff

@ -294,7 +294,13 @@ impl Store {
subject.map(EncodedTerm::from).as_ref(),
predicate.map(EncodedTerm::from).as_ref(),
object.map(EncodedTerm::from).as_ref(),
graph_name.map(EncodedTerm::from).as_ref(),
graph_name.map(|graph_name_ref| {
if let GraphName::NamedNode(nn) = graph_name_ref.into_owned() {
reader.parse_graph_name(nn.as_string(), None).unwrap() //TODO improve error mng (remove unwrap)
} else {
panic!("invalid graph name");
}
}),
),
reader,
}
@ -1187,7 +1193,13 @@ impl<'a> Transaction<'a> {
subject.map(EncodedTerm::from).as_ref(),
predicate.map(EncodedTerm::from).as_ref(),
object.map(EncodedTerm::from).as_ref(),
graph_name.map(EncodedTerm::from).as_ref(),
graph_name.map(|graph_name_ref| {
if let GraphName::NamedNode(nn) = graph_name_ref.into_owned() {
reader.parse_graph_name(nn.as_string(), None).unwrap() //TODO improve error mng (remove unwrap)
} else {
panic!("invalid graph name");
}
}),
),
reader,
}
@ -1483,16 +1495,18 @@ impl<'a> Transaction<'a> {
&mut self,
quad: impl Into<QuadRef<'b>>,
value: u8,
cv: bool,
) -> Result<(), StorageError> {
self.writer.ng_insert(quad.into(), value)
self.writer.ng_insert(quad.into(), value, cv)
}
pub fn insert_encoded(
&mut self,
encoded: &EncodedQuad,
value: u8,
cv: bool,
) -> Result<bool, StorageError> {
self.writer.ng_insert_encoded(encoded, value)
self.writer.ng_insert_encoded(encoded, value, cv)
}
pub fn ng_remove(&mut self, quad: &EncodedQuad, commit: &StrHash) -> Result<(), StorageError> {
@ -1662,7 +1676,7 @@ impl IntoIterator for &Transaction<'_> {
/// An iterator returning the quads contained in a [`Store`].
pub struct QuadIter {
iter: ChainedDecodingQuadIterator,
iter: Box<dyn Iterator<Item = Result<EncodedQuad, StorageError>>>,
reader: StorageReader,
}

@ -262,6 +262,7 @@ impl Verifier {
b.read_cap.tokenize(),
)
}
// TODO: implement TargetBranchV0::Named
_ => unimplemented!(),
};
let _ = branches.entry(branch_id).or_insert((
@ -404,12 +405,12 @@ impl Verifier {
for triple in update.transaction.inserts.iter() {
let triple_ref: TripleRef = triple.into();
let quad_ref = triple_ref.in_graph(cv_graphname_ref);
transaction.insert(quad_ref, value)?;
transaction.insert(quad_ref, value, true)?;
if let Some(ov_graphname) = ov_main.as_ref() {
let ov_graphname_ref = GraphNameRef::NamedNode(ov_graphname.into());
let triple_ref: TripleRef = triple.into();
let quad_ref = triple_ref.in_graph(ov_graphname_ref);
transaction.insert(quad_ref, REPO_IN_MAIN)?;
transaction.insert(quad_ref, REPO_IN_MAIN, false)?;
}
}
@ -493,7 +494,7 @@ impl Verifier {
encoded_object.clone(),
graph_encoded,
);
transaction.insert_encoded(&quad_encoded, value)?;
transaction.insert_encoded(&quad_encoded, value, true)?;
transaction.ng_remove(&quad_encoded, &commit_encoded)?;
}
if let Some(ov_graphname) = ov_main.as_ref() {

Loading…
Cancel
Save