Allows each store to use its own ID builder

pull/46/head
Tpt 5 years ago
parent fcb3a33606
commit 5038d95a00
  1. 173
      lib/src/sparql/dataset.rs
  2. 493
      lib/src/sparql/eval.rs
  3. 34
      lib/src/sparql/mod.rs
  4. 412
      lib/src/sparql/plan.rs
  5. 123
      lib/src/sparql/plan_builder.rs
  6. 98
      lib/src/store/memory.rs
  7. 72
      lib/src/store/mod.rs
  8. 1043
      lib/src/store/numeric_encoder.rs
  9. 328
      lib/src/store/rocksdb.rs
  10. 108
      lib/src/store/sled.rs
  11. 2
      wikibase/src/loader.rs

@ -0,0 +1,173 @@
use crate::sparql::EvaluationError;
use crate::store::numeric_encoder::{
EncodedQuad, EncodedTerm, MemoryStrStore, StrContainer, StrHash, StrId, StrLookup,
WithStoreError,
};
use crate::store::ReadableEncodedStore;
use std::cell::RefCell;
use std::iter::empty;
pub(crate) struct DatasetView<S: ReadableEncodedStore> {
store: S,
extra: RefCell<MemoryStrStore>,
default_graph_as_union: bool,
}
impl<S: ReadableEncodedStore> DatasetView<S> {
pub fn new(store: S, default_graph_as_union: bool) -> Self {
Self {
store,
extra: RefCell::new(MemoryStrStore::default()),
default_graph_as_union,
}
}
}
impl<S: ReadableEncodedStore> WithStoreError for DatasetView<S> {
type Error = EvaluationError;
type StrId = DatasetStrId<S::StrId>;
}
impl<S: ReadableEncodedStore> StrLookup for DatasetView<S> {
fn get_str(&self, id: DatasetStrId<S::StrId>) -> Result<Option<String>, EvaluationError> {
match id {
DatasetStrId::Store(id) => self.store.get_str(id).map_err(|e| e.into()),
DatasetStrId::Temporary(id) => Ok(self.extra.borrow().get_str(id)?),
}
}
fn get_str_id(&self, value: &str) -> Result<Option<DatasetStrId<S::StrId>>, EvaluationError> {
if let Some(id) = self.extra.borrow().get_str_id(value)? {
Ok(Some(DatasetStrId::Temporary(id)))
} else {
Ok(self
.store
.get_str_id(value)
.map_err(|e| e.into())?
.map(DatasetStrId::Store))
}
}
}
impl<S: ReadableEncodedStore> ReadableEncodedStore for DatasetView<S> {
type QuadsIter =
Box<dyn Iterator<Item = Result<EncodedQuad<DatasetStrId<S::StrId>>, EvaluationError>>>;
fn encoded_quads_for_pattern(
&self,
subject: Option<EncodedTerm<Self::StrId>>,
predicate: Option<EncodedTerm<Self::StrId>>,
object: Option<EncodedTerm<Self::StrId>>,
graph_name: Option<EncodedTerm<Self::StrId>>,
) -> Box<dyn Iterator<Item = Result<EncodedQuad<DatasetStrId<S::StrId>>, EvaluationError>>>
{
if let Some((subject, predicate, object, graph_name)) =
try_map_quad_pattern(subject, predicate, object, graph_name)
{
if graph_name == None {
Box::new(
map_iter(
self.store
.encoded_quads_for_pattern(subject, predicate, object, None),
)
.filter(|quad| match quad {
Err(_) => true,
Ok(quad) => quad.graph_name != EncodedTerm::DefaultGraph,
}),
)
} else if graph_name == Some(EncodedTerm::DefaultGraph) && self.default_graph_as_union {
Box::new(
map_iter(
self.store
.encoded_quads_for_pattern(subject, predicate, object, None),
)
.map(|quad| {
let quad = quad?;
Ok(EncodedQuad::new(
quad.subject,
quad.predicate,
quad.object,
EncodedTerm::DefaultGraph,
))
}),
)
} else {
Box::new(map_iter(self.store.encoded_quads_for_pattern(
subject, predicate, object, graph_name,
)))
}
} else {
Box::new(empty())
}
}
}
fn map_iter<'a, I: StrId>(
iter: impl Iterator<Item = Result<EncodedQuad<I>, impl Into<EvaluationError>>> + 'a,
) -> impl Iterator<Item = Result<EncodedQuad<DatasetStrId<I>>, EvaluationError>> + 'a {
iter.map(|t| {
t.map(|q| EncodedQuad {
subject: q.subject.map_id(DatasetStrId::Store),
predicate: q.predicate.map_id(DatasetStrId::Store),
object: q.object.map_id(DatasetStrId::Store),
graph_name: q.graph_name.map_id(DatasetStrId::Store),
})
.map_err(|e| e.into())
})
}
type QuadPattern<I> = (
Option<EncodedTerm<I>>,
Option<EncodedTerm<I>>,
Option<EncodedTerm<I>>,
Option<EncodedTerm<I>>,
);
fn try_map_quad_pattern<I: StrId>(
subject: Option<EncodedTerm<DatasetStrId<I>>>,
predicate: Option<EncodedTerm<DatasetStrId<I>>>,
object: Option<EncodedTerm<DatasetStrId<I>>>,
graph_name: Option<EncodedTerm<DatasetStrId<I>>>,
) -> Option<QuadPattern<I>> {
Some((
transpose(subject.map(|t| t.try_map_id(unwrap_store_id)))?,
transpose(predicate.map(|t| t.try_map_id(unwrap_store_id)))?,
transpose(object.map(|t| t.try_map_id(unwrap_store_id)))?,
transpose(graph_name.map(|t| t.try_map_id(unwrap_store_id)))?,
))
}
fn transpose<T>(o: Option<Option<T>>) -> Option<Option<T>> {
match o {
Some(Some(v)) => Some(Some(v)),
Some(None) => None,
None => Some(None),
}
}
fn unwrap_store_id<I: StrId>(id: DatasetStrId<I>) -> Option<I> {
match id {
DatasetStrId::Store(id) => Some(id),
DatasetStrId::Temporary(_) => None,
}
}
impl<'a, S: ReadableEncodedStore> StrContainer for &'a DatasetView<S> {
fn insert_str(&mut self, value: &str) -> Result<Self::StrId, EvaluationError> {
if let Some(id) = self.store.get_str_id(value).map_err(|e| e.into())? {
Ok(DatasetStrId::Store(id))
} else {
Ok(DatasetStrId::Temporary(
self.extra.borrow_mut().insert_str(value)?,
))
}
}
}
#[derive(Eq, PartialEq, Debug, Copy, Clone, Hash)]
pub enum DatasetStrId<I: StrId> {
Store(I),
Temporary(StrHash),
}
impl<I: StrId> StrId for DatasetStrId<I> {}

File diff suppressed because it is too large Load Diff

@ -1,6 +1,7 @@
//! [SPARQL](https://www.w3.org/TR/sparql11-overview/) implementation. //! [SPARQL](https://www.w3.org/TR/sparql11-overview/) implementation.
mod algebra; mod algebra;
mod dataset;
mod error; mod error;
mod eval; mod eval;
mod json_results; mod json_results;
@ -16,8 +17,7 @@ use crate::sparql::eval::SimpleEvaluator;
pub use crate::sparql::model::QuerySolution; pub use crate::sparql::model::QuerySolution;
pub use crate::sparql::model::QuerySolutionsIterator; pub use crate::sparql::model::QuerySolutionsIterator;
pub use crate::sparql::model::QueryTriplesIterator; pub use crate::sparql::model::QueryTriplesIterator;
use crate::sparql::plan::TripleTemplate; use crate::sparql::plan::{PlanNode, TripleTemplate};
use crate::sparql::plan::{DatasetView, PlanNode};
use crate::sparql::plan_builder::PlanBuilder; use crate::sparql::plan_builder::PlanBuilder;
use crate::store::ReadableEncodedStore; use crate::store::ReadableEncodedStore;
use std::convert::TryInto; use std::convert::TryInto;
@ -28,10 +28,12 @@ pub use crate::sparql::model::QueryResult;
pub use crate::sparql::model::QueryResultFormat; pub use crate::sparql::model::QueryResultFormat;
#[deprecated(note = "Use QueryResultFormat instead")] #[deprecated(note = "Use QueryResultFormat instead")]
pub type QueryResultSyntax = QueryResultFormat; pub type QueryResultSyntax = QueryResultFormat;
use crate::sparql::dataset::DatasetView;
pub use crate::sparql::error::EvaluationError; pub use crate::sparql::error::EvaluationError;
pub use crate::sparql::model::Variable; pub use crate::sparql::model::Variable;
pub use crate::sparql::parser::ParseError; pub use crate::sparql::parser::ParseError;
pub use crate::sparql::parser::Query; pub use crate::sparql::parser::Query;
use crate::store::numeric_encoder::WithStoreError;
use std::error::Error; use std::error::Error;
/// A prepared [SPARQL query](https://www.w3.org/TR/sparql11-query/) /// A prepared [SPARQL query](https://www.w3.org/TR/sparql11-query/)
@ -48,22 +50,22 @@ pub(crate) struct SimplePreparedQuery<S: ReadableEncodedStore + 'static>(
#[derive(Clone)] #[derive(Clone)]
enum SimplePreparedQueryAction<S: ReadableEncodedStore + 'static> { enum SimplePreparedQueryAction<S: ReadableEncodedStore + 'static> {
Select { Select {
plan: Rc<PlanNode>, plan: Rc<PlanNode<<DatasetView<S> as WithStoreError>::StrId>>,
variables: Rc<Vec<Variable>>, variables: Rc<Vec<Variable>>,
evaluator: SimpleEvaluator<S>, evaluator: SimpleEvaluator<DatasetView<S>>,
}, },
Ask { Ask {
plan: Rc<PlanNode>, plan: Rc<PlanNode<<DatasetView<S> as WithStoreError>::StrId>>,
evaluator: SimpleEvaluator<S>, evaluator: SimpleEvaluator<DatasetView<S>>,
}, },
Construct { Construct {
plan: Rc<PlanNode>, plan: Rc<PlanNode<<DatasetView<S> as WithStoreError>::StrId>>,
construct: Rc<Vec<TripleTemplate>>, construct: Rc<Vec<TripleTemplate<<DatasetView<S> as WithStoreError>::StrId>>>,
evaluator: SimpleEvaluator<S>, evaluator: SimpleEvaluator<DatasetView<S>>,
}, },
Describe { Describe {
plan: Rc<PlanNode>, plan: Rc<PlanNode<<DatasetView<S> as WithStoreError>::StrId>>,
evaluator: SimpleEvaluator<S>, evaluator: SimpleEvaluator<DatasetView<S>>,
}, },
} }
@ -78,7 +80,7 @@ impl<S: ReadableEncodedStore + 'static> SimplePreparedQuery<S> {
QueryVariants::Select { QueryVariants::Select {
algebra, base_iri, .. algebra, base_iri, ..
} => { } => {
let (plan, variables) = PlanBuilder::build(dataset.encoder(), &algebra)?; let (plan, variables) = PlanBuilder::build(dataset.as_ref(), &algebra)?;
SimplePreparedQueryAction::Select { SimplePreparedQueryAction::Select {
plan: Rc::new(plan), plan: Rc::new(plan),
variables: Rc::new(variables), variables: Rc::new(variables),
@ -88,7 +90,7 @@ impl<S: ReadableEncodedStore + 'static> SimplePreparedQuery<S> {
QueryVariants::Ask { QueryVariants::Ask {
algebra, base_iri, .. algebra, base_iri, ..
} => { } => {
let (plan, _) = PlanBuilder::build(dataset.encoder(), &algebra)?; let (plan, _) = PlanBuilder::build(dataset.as_ref(), &algebra)?;
SimplePreparedQueryAction::Ask { SimplePreparedQueryAction::Ask {
plan: Rc::new(plan), plan: Rc::new(plan),
evaluator: SimpleEvaluator::new(dataset, base_iri, options.service_handler), evaluator: SimpleEvaluator::new(dataset, base_iri, options.service_handler),
@ -100,11 +102,11 @@ impl<S: ReadableEncodedStore + 'static> SimplePreparedQuery<S> {
base_iri, base_iri,
.. ..
} => { } => {
let (plan, variables) = PlanBuilder::build(dataset.encoder(), &algebra)?; let (plan, variables) = PlanBuilder::build(dataset.as_ref(), &algebra)?;
SimplePreparedQueryAction::Construct { SimplePreparedQueryAction::Construct {
plan: Rc::new(plan), plan: Rc::new(plan),
construct: Rc::new(PlanBuilder::build_graph_template( construct: Rc::new(PlanBuilder::build_graph_template(
dataset.encoder(), dataset.as_ref(),
&construct, &construct,
variables, variables,
)?), )?),
@ -114,7 +116,7 @@ impl<S: ReadableEncodedStore + 'static> SimplePreparedQuery<S> {
QueryVariants::Describe { QueryVariants::Describe {
algebra, base_iri, .. algebra, base_iri, ..
} => { } => {
let (plan, _) = PlanBuilder::build(dataset.encoder(), &algebra)?; let (plan, _) = PlanBuilder::build(dataset.as_ref(), &algebra)?;
SimplePreparedQueryAction::Describe { SimplePreparedQueryAction::Describe {
plan: Rc::new(plan), plan: Rc::new(plan),
evaluator: SimpleEvaluator::new(dataset, base_iri, options.service_handler), evaluator: SimpleEvaluator::new(dataset, base_iri, options.service_handler),

@ -1,96 +1,89 @@
use crate::error::UnwrapInfallible;
use crate::sparql::algebra::GraphPattern; use crate::sparql::algebra::GraphPattern;
use crate::sparql::error::EvaluationError;
use crate::sparql::model::Variable; use crate::sparql::model::Variable;
use crate::store::numeric_encoder::{ use crate::store::numeric_encoder::{EncodedTerm, StrId};
EncodedQuad, EncodedTerm, Encoder, MemoryStrStore, StrContainer, StrHash, StrLookup,
WithStoreError,
};
use crate::store::ReadableEncodedStore;
use std::cell::{RefCell, RefMut};
use std::collections::BTreeSet; use std::collections::BTreeSet;
use std::rc::Rc; use std::rc::Rc;
#[derive(Eq, PartialEq, Debug, Clone, Hash)] #[derive(Eq, PartialEq, Debug, Clone, Hash)]
pub enum PlanNode { pub enum PlanNode<I: StrId> {
Init, Init,
StaticBindings { StaticBindings {
tuples: Vec<EncodedTuple>, tuples: Vec<EncodedTuple<I>>,
}, },
Service { Service {
service_name: PatternValue, service_name: PatternValue<I>,
variables: Rc<Vec<Variable>>, variables: Rc<Vec<Variable>>,
child: Rc<PlanNode>, child: Rc<PlanNode<I>>,
graph_pattern: Rc<GraphPattern>, graph_pattern: Rc<GraphPattern>,
silent: bool, silent: bool,
}, },
QuadPatternJoin { QuadPatternJoin {
child: Rc<PlanNode>, child: Rc<PlanNode<I>>,
subject: PatternValue, subject: PatternValue<I>,
predicate: PatternValue, predicate: PatternValue<I>,
object: PatternValue, object: PatternValue<I>,
graph_name: PatternValue, graph_name: PatternValue<I>,
}, },
PathPatternJoin { PathPatternJoin {
child: Rc<PlanNode>, child: Rc<PlanNode<I>>,
subject: PatternValue, subject: PatternValue<I>,
path: Rc<PlanPropertyPath>, path: Rc<PlanPropertyPath<I>>,
object: PatternValue, object: PatternValue<I>,
graph_name: PatternValue, graph_name: PatternValue<I>,
}, },
Join { Join {
left: Rc<PlanNode>, left: Rc<PlanNode<I>>,
right: Rc<PlanNode>, right: Rc<PlanNode<I>>,
}, },
AntiJoin { AntiJoin {
left: Rc<PlanNode>, left: Rc<PlanNode<I>>,
right: Rc<PlanNode>, right: Rc<PlanNode<I>>,
}, },
Filter { Filter {
child: Rc<PlanNode>, child: Rc<PlanNode<I>>,
expression: Rc<PlanExpression>, expression: Rc<PlanExpression<I>>,
}, },
Union { Union {
children: Vec<Rc<PlanNode>>, children: Vec<Rc<PlanNode<I>>>,
}, },
LeftJoin { LeftJoin {
left: Rc<PlanNode>, left: Rc<PlanNode<I>>,
right: Rc<PlanNode>, right: Rc<PlanNode<I>>,
possible_problem_vars: Rc<Vec<usize>>, //Variables that should not be part of the entry of the left join possible_problem_vars: Rc<Vec<usize>>, //Variables that should not be part of the entry of the left join
}, },
Extend { Extend {
child: Rc<PlanNode>, child: Rc<PlanNode<I>>,
position: usize, position: usize,
expression: Rc<PlanExpression>, expression: Rc<PlanExpression<I>>,
}, },
Sort { Sort {
child: Rc<PlanNode>, child: Rc<PlanNode<I>>,
by: Vec<Comparator>, by: Vec<Comparator<I>>,
}, },
HashDeduplicate { HashDeduplicate {
child: Rc<PlanNode>, child: Rc<PlanNode<I>>,
}, },
Skip { Skip {
child: Rc<PlanNode>, child: Rc<PlanNode<I>>,
count: usize, count: usize,
}, },
Limit { Limit {
child: Rc<PlanNode>, child: Rc<PlanNode<I>>,
count: usize, count: usize,
}, },
Project { Project {
child: Rc<PlanNode>, child: Rc<PlanNode<I>>,
mapping: Rc<Vec<(usize, usize)>>, // pairs of (variable key in child, variable key in output) mapping: Rc<Vec<(usize, usize)>>, // pairs of (variable key in child, variable key in output)
}, },
Aggregate { Aggregate {
// By definition the group by key are the range 0..key_mapping.len() // By definition the group by key are the range 0..key_mapping.len()
child: Rc<PlanNode>, child: Rc<PlanNode<I>>,
key_mapping: Rc<Vec<(usize, usize)>>, // aggregate key pairs of (variable key in child, variable key in output) key_mapping: Rc<Vec<(usize, usize)>>, // aggregate key pairs of (variable key in child, variable key in output)
aggregates: Rc<Vec<(PlanAggregation, usize)>>, aggregates: Rc<Vec<(PlanAggregation<I>, usize)>>,
}, },
} }
impl PlanNode { impl<I: StrId> PlanNode<I> {
/// Returns variables that might be bound in the result set /// Returns variables that might be bound in the result set
pub fn maybe_bound_variables(&self) -> BTreeSet<usize> { pub fn maybe_bound_variables(&self) -> BTreeSet<usize> {
let mut set = BTreeSet::default(); let mut set = BTreeSet::default();
@ -201,12 +194,12 @@ impl PlanNode {
} }
#[derive(Eq, PartialEq, Debug, Clone, Copy, Hash)] #[derive(Eq, PartialEq, Debug, Clone, Copy, Hash)]
pub enum PatternValue { pub enum PatternValue<I: StrId> {
Constant(EncodedTerm), Constant(EncodedTerm<I>),
Variable(usize), Variable(usize),
} }
impl PatternValue { impl<I: StrId> PatternValue<I> {
pub fn is_var(&self) -> bool { pub fn is_var(&self) -> bool {
match self { match self {
PatternValue::Constant(_) => false, PatternValue::Constant(_) => false,
@ -216,108 +209,108 @@ impl PatternValue {
} }
#[derive(Eq, PartialEq, Debug, Clone, Hash)] #[derive(Eq, PartialEq, Debug, Clone, Hash)]
pub enum PlanExpression { pub enum PlanExpression<I: StrId> {
Constant(EncodedTerm), Constant(EncodedTerm<I>),
Variable(usize), Variable(usize),
Exists(Rc<PlanNode>), Exists(Rc<PlanNode<I>>),
Or(Box<PlanExpression>, Box<PlanExpression>), Or(Box<PlanExpression<I>>, Box<PlanExpression<I>>),
And(Box<PlanExpression>, Box<PlanExpression>), And(Box<PlanExpression<I>>, Box<PlanExpression<I>>),
Equal(Box<PlanExpression>, Box<PlanExpression>), Equal(Box<PlanExpression<I>>, Box<PlanExpression<I>>),
NotEqual(Box<PlanExpression>, Box<PlanExpression>), NotEqual(Box<PlanExpression<I>>, Box<PlanExpression<I>>),
Greater(Box<PlanExpression>, Box<PlanExpression>), Greater(Box<PlanExpression<I>>, Box<PlanExpression<I>>),
GreaterOrEq(Box<PlanExpression>, Box<PlanExpression>), GreaterOrEq(Box<PlanExpression<I>>, Box<PlanExpression<I>>),
Lower(Box<PlanExpression>, Box<PlanExpression>), Lower(Box<PlanExpression<I>>, Box<PlanExpression<I>>),
LowerOrEq(Box<PlanExpression>, Box<PlanExpression>), LowerOrEq(Box<PlanExpression<I>>, Box<PlanExpression<I>>),
In(Box<PlanExpression>, Vec<PlanExpression>), In(Box<PlanExpression<I>>, Vec<PlanExpression<I>>),
Add(Box<PlanExpression>, Box<PlanExpression>), Add(Box<PlanExpression<I>>, Box<PlanExpression<I>>),
Sub(Box<PlanExpression>, Box<PlanExpression>), Sub(Box<PlanExpression<I>>, Box<PlanExpression<I>>),
Mul(Box<PlanExpression>, Box<PlanExpression>), Mul(Box<PlanExpression<I>>, Box<PlanExpression<I>>),
Div(Box<PlanExpression>, Box<PlanExpression>), Div(Box<PlanExpression<I>>, Box<PlanExpression<I>>),
UnaryPlus(Box<PlanExpression>), UnaryPlus(Box<PlanExpression<I>>),
UnaryMinus(Box<PlanExpression>), UnaryMinus(Box<PlanExpression<I>>),
UnaryNot(Box<PlanExpression>), UnaryNot(Box<PlanExpression<I>>),
Str(Box<PlanExpression>), Str(Box<PlanExpression<I>>),
Lang(Box<PlanExpression>), Lang(Box<PlanExpression<I>>),
LangMatches(Box<PlanExpression>, Box<PlanExpression>), LangMatches(Box<PlanExpression<I>>, Box<PlanExpression<I>>),
Datatype(Box<PlanExpression>), Datatype(Box<PlanExpression<I>>),
Bound(usize), Bound(usize),
IRI(Box<PlanExpression>), IRI(Box<PlanExpression<I>>),
BNode(Option<Box<PlanExpression>>), BNode(Option<Box<PlanExpression<I>>>),
Rand, Rand,
Abs(Box<PlanExpression>), Abs(Box<PlanExpression<I>>),
Ceil(Box<PlanExpression>), Ceil(Box<PlanExpression<I>>),
Floor(Box<PlanExpression>), Floor(Box<PlanExpression<I>>),
Round(Box<PlanExpression>), Round(Box<PlanExpression<I>>),
Concat(Vec<PlanExpression>), Concat(Vec<PlanExpression<I>>),
SubStr( SubStr(
Box<PlanExpression>, Box<PlanExpression<I>>,
Box<PlanExpression>, Box<PlanExpression<I>>,
Option<Box<PlanExpression>>, Option<Box<PlanExpression<I>>>,
), ),
StrLen(Box<PlanExpression>), StrLen(Box<PlanExpression<I>>),
Replace( Replace(
Box<PlanExpression>, Box<PlanExpression<I>>,
Box<PlanExpression>, Box<PlanExpression<I>>,
Box<PlanExpression>, Box<PlanExpression<I>>,
Option<Box<PlanExpression>>, Option<Box<PlanExpression<I>>>,
), ),
UCase(Box<PlanExpression>), UCase(Box<PlanExpression<I>>),
LCase(Box<PlanExpression>), LCase(Box<PlanExpression<I>>),
EncodeForURI(Box<PlanExpression>), EncodeForURI(Box<PlanExpression<I>>),
Contains(Box<PlanExpression>, Box<PlanExpression>), Contains(Box<PlanExpression<I>>, Box<PlanExpression<I>>),
StrStarts(Box<PlanExpression>, Box<PlanExpression>), StrStarts(Box<PlanExpression<I>>, Box<PlanExpression<I>>),
StrEnds(Box<PlanExpression>, Box<PlanExpression>), StrEnds(Box<PlanExpression<I>>, Box<PlanExpression<I>>),
StrBefore(Box<PlanExpression>, Box<PlanExpression>), StrBefore(Box<PlanExpression<I>>, Box<PlanExpression<I>>),
StrAfter(Box<PlanExpression>, Box<PlanExpression>), StrAfter(Box<PlanExpression<I>>, Box<PlanExpression<I>>),
Year(Box<PlanExpression>), Year(Box<PlanExpression<I>>),
Month(Box<PlanExpression>), Month(Box<PlanExpression<I>>),
Day(Box<PlanExpression>), Day(Box<PlanExpression<I>>),
Hours(Box<PlanExpression>), Hours(Box<PlanExpression<I>>),
Minutes(Box<PlanExpression>), Minutes(Box<PlanExpression<I>>),
Seconds(Box<PlanExpression>), Seconds(Box<PlanExpression<I>>),
Timezone(Box<PlanExpression>), Timezone(Box<PlanExpression<I>>),
Tz(Box<PlanExpression>), Tz(Box<PlanExpression<I>>),
Now, Now,
UUID, UUID,
StrUUID, StrUUID,
MD5(Box<PlanExpression>), MD5(Box<PlanExpression<I>>),
SHA1(Box<PlanExpression>), SHA1(Box<PlanExpression<I>>),
SHA256(Box<PlanExpression>), SHA256(Box<PlanExpression<I>>),
SHA384(Box<PlanExpression>), SHA384(Box<PlanExpression<I>>),
SHA512(Box<PlanExpression>), SHA512(Box<PlanExpression<I>>),
Coalesce(Vec<PlanExpression>), Coalesce(Vec<PlanExpression<I>>),
If( If(
Box<PlanExpression>, Box<PlanExpression<I>>,
Box<PlanExpression>, Box<PlanExpression<I>>,
Box<PlanExpression>, Box<PlanExpression<I>>,
), ),
StrLang(Box<PlanExpression>, Box<PlanExpression>), StrLang(Box<PlanExpression<I>>, Box<PlanExpression<I>>),
StrDT(Box<PlanExpression>, Box<PlanExpression>), StrDT(Box<PlanExpression<I>>, Box<PlanExpression<I>>),
SameTerm(Box<PlanExpression>, Box<PlanExpression>), SameTerm(Box<PlanExpression<I>>, Box<PlanExpression<I>>),
IsIRI(Box<PlanExpression>), IsIRI(Box<PlanExpression<I>>),
IsBlank(Box<PlanExpression>), IsBlank(Box<PlanExpression<I>>),
IsLiteral(Box<PlanExpression>), IsLiteral(Box<PlanExpression<I>>),
IsNumeric(Box<PlanExpression>), IsNumeric(Box<PlanExpression<I>>),
Regex( Regex(
Box<PlanExpression>, Box<PlanExpression<I>>,
Box<PlanExpression>, Box<PlanExpression<I>>,
Option<Box<PlanExpression>>, Option<Box<PlanExpression<I>>>,
), ),
BooleanCast(Box<PlanExpression>), BooleanCast(Box<PlanExpression<I>>),
DoubleCast(Box<PlanExpression>), DoubleCast(Box<PlanExpression<I>>),
FloatCast(Box<PlanExpression>), FloatCast(Box<PlanExpression<I>>),
DecimalCast(Box<PlanExpression>), DecimalCast(Box<PlanExpression<I>>),
IntegerCast(Box<PlanExpression>), IntegerCast(Box<PlanExpression<I>>),
DateCast(Box<PlanExpression>), DateCast(Box<PlanExpression<I>>),
TimeCast(Box<PlanExpression>), TimeCast(Box<PlanExpression<I>>),
DateTimeCast(Box<PlanExpression>), DateTimeCast(Box<PlanExpression<I>>),
DurationCast(Box<PlanExpression>), DurationCast(Box<PlanExpression<I>>),
YearMonthDurationCast(Box<PlanExpression>), YearMonthDurationCast(Box<PlanExpression<I>>),
DayTimeDurationCast(Box<PlanExpression>), DayTimeDurationCast(Box<PlanExpression<I>>),
StringCast(Box<PlanExpression>), StringCast(Box<PlanExpression<I>>),
} }
impl PlanExpression { impl<I: StrId> PlanExpression<I> {
pub fn add_maybe_bound_variables(&self, set: &mut BTreeSet<usize>) { pub fn add_maybe_bound_variables(&self, set: &mut BTreeSet<usize>) {
match self { match self {
PlanExpression::Variable(v) | PlanExpression::Bound(v) => { PlanExpression::Variable(v) | PlanExpression::Bound(v) => {
@ -434,9 +427,9 @@ impl PlanExpression {
} }
#[derive(Eq, PartialEq, Debug, Clone, Hash)] #[derive(Eq, PartialEq, Debug, Clone, Hash)]
pub struct PlanAggregation { pub struct PlanAggregation<I: StrId> {
pub function: PlanAggregationFunction, pub function: PlanAggregationFunction,
pub parameter: Option<PlanExpression>, pub parameter: Option<PlanExpression<I>>,
pub distinct: bool, pub distinct: bool,
} }
@ -452,43 +445,43 @@ pub enum PlanAggregationFunction {
} }
#[derive(Eq, PartialEq, Debug, Clone, Hash)] #[derive(Eq, PartialEq, Debug, Clone, Hash)]
pub enum PlanPropertyPath { pub enum PlanPropertyPath<I: StrId> {
PredicatePath(EncodedTerm), PredicatePath(EncodedTerm<I>),
InversePath(Rc<PlanPropertyPath>), InversePath(Rc<PlanPropertyPath<I>>),
SequencePath(Rc<PlanPropertyPath>, Rc<PlanPropertyPath>), SequencePath(Rc<PlanPropertyPath<I>>, Rc<PlanPropertyPath<I>>),
AlternativePath(Rc<PlanPropertyPath>, Rc<PlanPropertyPath>), AlternativePath(Rc<PlanPropertyPath<I>>, Rc<PlanPropertyPath<I>>),
ZeroOrMorePath(Rc<PlanPropertyPath>), ZeroOrMorePath(Rc<PlanPropertyPath<I>>),
OneOrMorePath(Rc<PlanPropertyPath>), OneOrMorePath(Rc<PlanPropertyPath<I>>),
ZeroOrOnePath(Rc<PlanPropertyPath>), ZeroOrOnePath(Rc<PlanPropertyPath<I>>),
NegatedPropertySet(Rc<Vec<EncodedTerm>>), NegatedPropertySet(Rc<Vec<EncodedTerm<I>>>),
} }
#[derive(Eq, PartialEq, Debug, Clone, Hash)] #[derive(Eq, PartialEq, Debug, Clone, Hash)]
pub enum Comparator { pub enum Comparator<I: StrId> {
Asc(PlanExpression), Asc(PlanExpression<I>),
Desc(PlanExpression), Desc(PlanExpression<I>),
} }
#[derive(Eq, PartialEq, Debug, Clone, Copy, Hash)] #[derive(Eq, PartialEq, Debug, Clone, Copy, Hash)]
pub struct TripleTemplate { pub struct TripleTemplate<I: StrId> {
pub subject: TripleTemplateValue, pub subject: TripleTemplateValue<I>,
pub predicate: TripleTemplateValue, pub predicate: TripleTemplateValue<I>,
pub object: TripleTemplateValue, pub object: TripleTemplateValue<I>,
} }
#[derive(Eq, PartialEq, Debug, Clone, Copy, Hash)] #[derive(Eq, PartialEq, Debug, Clone, Copy, Hash)]
pub enum TripleTemplateValue { pub enum TripleTemplateValue<I: StrId> {
Constant(EncodedTerm), Constant(EncodedTerm<I>),
BlankNode(usize), BlankNode(usize),
Variable(usize), Variable(usize),
} }
#[derive(Eq, PartialEq, Debug, Clone, Hash)] #[derive(Eq, PartialEq, Debug, Clone, Hash)]
pub struct EncodedTuple { pub struct EncodedTuple<I: StrId> {
inner: Vec<Option<EncodedTerm>>, inner: Vec<Option<EncodedTerm<I>>>,
} }
impl EncodedTuple { impl<I: StrId> EncodedTuple<I> {
pub fn with_capacity(capacity: usize) -> Self { pub fn with_capacity(capacity: usize) -> Self {
Self { Self {
inner: Vec::with_capacity(capacity), inner: Vec::with_capacity(capacity),
@ -503,15 +496,15 @@ impl EncodedTuple {
self.inner.get(index).map_or(false, Option::is_some) self.inner.get(index).map_or(false, Option::is_some)
} }
pub fn get(&self, index: usize) -> Option<EncodedTerm> { pub fn get(&self, index: usize) -> Option<EncodedTerm<I>> {
self.inner.get(index).cloned().unwrap_or(None) self.inner.get(index).cloned().unwrap_or(None)
} }
pub fn iter<'a>(&'a self) -> impl Iterator<Item = Option<EncodedTerm>> + 'a { pub fn iter<'a>(&'a self) -> impl Iterator<Item = Option<EncodedTerm<I>>> + 'a {
self.inner.iter().cloned() self.inner.iter().cloned()
} }
pub fn set(&mut self, index: usize, value: EncodedTerm) { pub fn set(&mut self, index: usize, value: EncodedTerm<I>) {
if self.inner.len() <= index { if self.inner.len() <= index {
self.inner.resize(index + 1, None); self.inner.resize(index + 1, None);
} }
@ -524,7 +517,7 @@ impl EncodedTuple {
} }
} }
pub fn combine_with(&self, other: &EncodedTuple) -> Option<Self> { pub fn combine_with(&self, other: &EncodedTuple<I>) -> Option<Self> {
if self.inner.len() < other.inner.len() { if self.inner.len() < other.inner.len() {
let mut result = other.inner.to_owned(); let mut result = other.inner.to_owned();
for (key, self_value) in self.inner.iter().enumerate() { for (key, self_value) in self.inner.iter().enumerate() {
@ -558,106 +551,3 @@ impl EncodedTuple {
} }
} }
} }
pub(crate) struct DatasetView<S: ReadableEncodedStore> {
store: S,
extra: RefCell<MemoryStrStore>,
default_graph_as_union: bool,
}
impl<S: ReadableEncodedStore> DatasetView<S> {
pub fn new(store: S, default_graph_as_union: bool) -> Self {
Self {
store,
extra: RefCell::new(MemoryStrStore::default()),
default_graph_as_union,
}
}
pub fn quads_for_pattern(
&self,
subject: Option<EncodedTerm>,
predicate: Option<EncodedTerm>,
object: Option<EncodedTerm>,
graph_name: Option<EncodedTerm>,
) -> Box<dyn Iterator<Item = Result<EncodedQuad, EvaluationError>>> {
if graph_name == None {
Box::new(
map_iter_err(
self.store
.encoded_quads_for_pattern(subject, predicate, object, None),
)
.filter(|quad| match quad {
Err(_) => true,
Ok(quad) => quad.graph_name != EncodedTerm::DefaultGraph,
}),
)
} else if graph_name == Some(EncodedTerm::DefaultGraph) && self.default_graph_as_union {
Box::new(
map_iter_err(
self.store
.encoded_quads_for_pattern(subject, predicate, object, None),
)
.map(|quad| {
let quad = quad?;
Ok(EncodedQuad::new(
quad.subject,
quad.predicate,
quad.object,
EncodedTerm::DefaultGraph,
))
}),
)
} else {
Box::new(map_iter_err(self.store.encoded_quads_for_pattern(
subject, predicate, object, graph_name,
)))
}
}
pub fn encoder<'a>(&'a self) -> impl Encoder + StrContainer + 'a {
DatasetViewStrContainer {
store: &self.store,
extra: self.extra.borrow_mut(),
}
}
}
fn map_iter_err<'a, T>(
iter: impl Iterator<Item = Result<T, impl Into<EvaluationError>>> + 'a,
) -> impl Iterator<Item = Result<T, EvaluationError>> + 'a {
iter.map(|e| e.map_err(|e| e.into()))
}
impl<S: ReadableEncodedStore> WithStoreError for DatasetView<S> {
type Error = S::Error;
}
impl<S: ReadableEncodedStore> StrLookup for DatasetView<S> {
fn get_str(&self, id: StrHash) -> Result<Option<String>, Self::Error> {
if let Some(value) = self.extra.borrow().get_str(id).unwrap_infallible() {
Ok(Some(value))
} else {
self.store.get_str(id)
}
}
}
struct DatasetViewStrContainer<'a, S: ReadableEncodedStore> {
store: &'a S,
extra: RefMut<'a, MemoryStrStore>,
}
impl<'a, S: ReadableEncodedStore> WithStoreError for DatasetViewStrContainer<'a, S> {
type Error = S::Error;
}
impl<'a, S: ReadableEncodedStore> StrContainer for DatasetViewStrContainer<'a, S> {
fn insert_str(&mut self, value: &str) -> Result<StrHash, Self::Error> {
let key = StrHash::new(value);
if self.store.get_str(key)?.is_none() {
Ok(self.extra.insert_str(value).unwrap_infallible())
} else {
Ok(key)
}
}
}

@ -1,21 +1,21 @@
use crate::model::{BlankNode, Term}; use crate::model::{BlankNode, Literal, NamedNode, Term};
use crate::sparql::algebra::*; use crate::sparql::algebra::*;
use crate::sparql::error::EvaluationError; use crate::sparql::error::EvaluationError;
use crate::sparql::model::*; use crate::sparql::model::*;
use crate::sparql::plan::*; use crate::sparql::plan::*;
use crate::store::numeric_encoder::{EncodedTerm, Encoder}; use crate::store::numeric_encoder::{EncodedTerm, WriteEncoder};
use std::collections::{BTreeSet, HashSet}; use std::collections::{BTreeSet, HashSet};
use std::rc::Rc; use std::rc::Rc;
pub(crate) struct PlanBuilder<E: Encoder> { pub(crate) struct PlanBuilder<E: WriteEncoder> {
encoder: E, encoder: E,
} }
impl<E: Encoder> PlanBuilder<E> { impl<E: WriteEncoder<Error = EvaluationError>> PlanBuilder<E> {
pub fn build( pub fn build(
encoder: E, encoder: E,
pattern: &GraphPattern, pattern: &GraphPattern,
) -> Result<(PlanNode, Vec<Variable>), EvaluationError> { ) -> Result<(PlanNode<E::StrId>, Vec<Variable>), EvaluationError> {
let mut variables = Vec::default(); let mut variables = Vec::default();
let plan = PlanBuilder { encoder }.build_for_graph_pattern( let plan = PlanBuilder { encoder }.build_for_graph_pattern(
pattern, pattern,
@ -29,7 +29,7 @@ impl<E: Encoder> PlanBuilder<E> {
encoder: E, encoder: E,
template: &[TriplePattern], template: &[TriplePattern],
mut variables: Vec<Variable>, mut variables: Vec<Variable>,
) -> Result<Vec<TripleTemplate>, EvaluationError> { ) -> Result<Vec<TripleTemplate<E::StrId>>, EvaluationError> {
PlanBuilder { encoder }.build_for_graph_template(template, &mut variables) PlanBuilder { encoder }.build_for_graph_template(template, &mut variables)
} }
@ -37,8 +37,8 @@ impl<E: Encoder> PlanBuilder<E> {
&mut self, &mut self,
pattern: &GraphPattern, pattern: &GraphPattern,
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
graph_name: PatternValue, graph_name: PatternValue<E::StrId>,
) -> Result<PlanNode, EvaluationError> { ) -> Result<PlanNode<E::StrId>, EvaluationError> {
Ok(match pattern { Ok(match pattern {
GraphPattern::BGP(p) => self.build_for_bgp(p, variables, graph_name)?, GraphPattern::BGP(p) => self.build_for_bgp(p, variables, graph_name)?,
GraphPattern::Join(a, b) => PlanNode::Join { GraphPattern::Join(a, b) => PlanNode::Join {
@ -217,8 +217,8 @@ impl<E: Encoder> PlanBuilder<E> {
&mut self, &mut self,
p: &[TripleOrPathPattern], p: &[TripleOrPathPattern],
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
graph_name: PatternValue, graph_name: PatternValue<E::StrId>,
) -> Result<PlanNode, EvaluationError> { ) -> Result<PlanNode<E::StrId>, EvaluationError> {
let mut plan = PlanNode::Init; let mut plan = PlanNode::Init;
for pattern in sort_bgp(p) { for pattern in sort_bgp(p) {
plan = match pattern { plan = match pattern {
@ -244,11 +244,14 @@ impl<E: Encoder> PlanBuilder<E> {
Ok(plan) Ok(plan)
} }
fn build_for_path(&mut self, path: &PropertyPath) -> Result<PlanPropertyPath, EvaluationError> { fn build_for_path(
&mut self,
path: &PropertyPath,
) -> Result<PlanPropertyPath<E::StrId>, EvaluationError> {
Ok(match path { Ok(match path {
PropertyPath::PredicatePath(p) => PlanPropertyPath::PredicatePath( PropertyPath::PredicatePath(p) => {
self.encoder.encode_named_node(p).map_err(|e| e.into())?, PlanPropertyPath::PredicatePath(self.build_named_node(p)?)
), }
PropertyPath::InversePath(p) => { PropertyPath::InversePath(p) => {
PlanPropertyPath::InversePath(Rc::new(self.build_for_path(p)?)) PlanPropertyPath::InversePath(Rc::new(self.build_for_path(p)?))
} }
@ -271,7 +274,7 @@ impl<E: Encoder> PlanBuilder<E> {
} }
PropertyPath::NegatedPropertySet(p) => PlanPropertyPath::NegatedPropertySet(Rc::new( PropertyPath::NegatedPropertySet(p) => PlanPropertyPath::NegatedPropertySet(Rc::new(
p.iter() p.iter()
.map(|p| self.encoder.encode_named_node(p).map_err(|e| e.into())) .map(|p| self.build_named_node(p))
.collect::<Result<Vec<_>, _>>()?, .collect::<Result<Vec<_>, _>>()?,
)), )),
}) })
@ -281,15 +284,11 @@ impl<E: Encoder> PlanBuilder<E> {
&mut self, &mut self,
expression: &Expression, expression: &Expression,
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
graph_name: PatternValue, graph_name: PatternValue<E::StrId>,
) -> Result<PlanExpression, EvaluationError> { ) -> Result<PlanExpression<E::StrId>, EvaluationError> {
Ok(match expression { Ok(match expression {
Expression::NamedNode(node) => PlanExpression::Constant( Expression::NamedNode(node) => PlanExpression::Constant(self.build_named_node(node)?),
self.encoder.encode_named_node(node).map_err(|e| e.into())?, Expression::Literal(l) => PlanExpression::Constant(self.build_literal(l)?),
),
Expression::Literal(l) => {
PlanExpression::Constant(self.encoder.encode_literal(l).map_err(|e| e.into())?)
}
Expression::Variable(v) => PlanExpression::Variable(variable_key(variables, v)), Expression::Variable(v) => PlanExpression::Variable(variable_key(variables, v)),
Expression::Or(a, b) => PlanExpression::Or( Expression::Or(a, b) => PlanExpression::Or(
Box::new(self.build_for_expression(a, variables, graph_name)?), Box::new(self.build_for_expression(a, variables, graph_name)?),
@ -695,11 +694,11 @@ impl<E: Encoder> PlanBuilder<E> {
fn build_cast( fn build_cast(
&mut self, &mut self,
parameters: &[Expression], parameters: &[Expression],
constructor: impl Fn(Box<PlanExpression>) -> PlanExpression, constructor: impl Fn(Box<PlanExpression<E::StrId>>) -> PlanExpression<E::StrId>,
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
graph_name: PatternValue, graph_name: PatternValue<E::StrId>,
name: &'static str, name: &'static str,
) -> Result<PlanExpression, EvaluationError> { ) -> Result<PlanExpression<E::StrId>, EvaluationError> {
if parameters.len() == 1 { if parameters.len() == 1 {
Ok(constructor(Box::new(self.build_for_expression( Ok(constructor(Box::new(self.build_for_expression(
&parameters[0], &parameters[0],
@ -718,8 +717,8 @@ impl<E: Encoder> PlanBuilder<E> {
&mut self, &mut self,
l: &[Expression], l: &[Expression],
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
graph_name: PatternValue, graph_name: PatternValue<E::StrId>,
) -> Result<Vec<PlanExpression>, EvaluationError> { ) -> Result<Vec<PlanExpression<E::StrId>>, EvaluationError> {
l.iter() l.iter()
.map(|e| self.build_for_expression(e, variables, graph_name)) .map(|e| self.build_for_expression(e, variables, graph_name))
.collect() .collect()
@ -729,7 +728,7 @@ impl<E: Encoder> PlanBuilder<E> {
&mut self, &mut self,
term_or_variable: &TermOrVariable, term_or_variable: &TermOrVariable,
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
) -> Result<PatternValue, EvaluationError> { ) -> Result<PatternValue<E::StrId>, EvaluationError> {
Ok(match term_or_variable { Ok(match term_or_variable {
TermOrVariable::Variable(variable) => { TermOrVariable::Variable(variable) => {
PatternValue::Variable(variable_key(variables, variable)) PatternValue::Variable(variable_key(variables, variable))
@ -738,9 +737,7 @@ impl<E: Encoder> PlanBuilder<E> {
PatternValue::Variable(variable_key(variables, &Variable::new(bnode.as_str()))) PatternValue::Variable(variable_key(variables, &Variable::new(bnode.as_str())))
//TODO: very bad hack to convert bnode to variable //TODO: very bad hack to convert bnode to variable
} }
TermOrVariable::Term(term) => { TermOrVariable::Term(term) => PatternValue::Constant(self.build_term(term)?),
PatternValue::Constant(self.encoder.encode_term(term).map_err(|e| e.into())?)
}
}) })
} }
@ -748,13 +745,11 @@ impl<E: Encoder> PlanBuilder<E> {
&mut self, &mut self,
named_node_or_variable: &NamedNodeOrVariable, named_node_or_variable: &NamedNodeOrVariable,
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
) -> Result<PatternValue, EvaluationError> { ) -> Result<PatternValue<E::StrId>, EvaluationError> {
Ok(match named_node_or_variable { Ok(match named_node_or_variable {
NamedNodeOrVariable::NamedNode(named_node) => PatternValue::Constant( NamedNodeOrVariable::NamedNode(named_node) => {
self.encoder PatternValue::Constant(self.build_named_node(named_node)?)
.encode_named_node(named_node) }
.map_err(|e| e.into())?,
),
NamedNodeOrVariable::Variable(variable) => { NamedNodeOrVariable::Variable(variable) => {
PatternValue::Variable(variable_key(variables, variable)) PatternValue::Variable(variable_key(variables, variable))
} }
@ -765,7 +760,7 @@ impl<E: Encoder> PlanBuilder<E> {
&mut self, &mut self,
bindings: &StaticBindings, bindings: &StaticBindings,
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
) -> Result<Vec<EncodedTuple>, EvaluationError> { ) -> Result<Vec<EncodedTuple<E::StrId>>, EvaluationError> {
let bindings_variables_keys = bindings let bindings_variables_keys = bindings
.variables() .variables()
.iter() .iter()
@ -777,10 +772,7 @@ impl<E: Encoder> PlanBuilder<E> {
let mut result = EncodedTuple::with_capacity(variables.len()); let mut result = EncodedTuple::with_capacity(variables.len());
for (key, value) in values.iter().enumerate() { for (key, value) in values.iter().enumerate() {
if let Some(term) = value { if let Some(term) = value {
result.set( result.set(bindings_variables_keys[key], self.build_term(term)?);
bindings_variables_keys[key],
self.encoder.encode_term(term).map_err(|e| e.into())?,
);
} }
} }
Ok(result) Ok(result)
@ -792,8 +784,8 @@ impl<E: Encoder> PlanBuilder<E> {
&mut self, &mut self,
aggregate: &Aggregation, aggregate: &Aggregation,
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
graph_name: PatternValue, graph_name: PatternValue<E::StrId>,
) -> Result<PlanAggregation, EvaluationError> { ) -> Result<PlanAggregation<E::StrId>, EvaluationError> {
Ok(match aggregate { Ok(match aggregate {
Aggregation::Count(e, distinct) => PlanAggregation { Aggregation::Count(e, distinct) => PlanAggregation {
function: PlanAggregationFunction::Count, function: PlanAggregationFunction::Count,
@ -842,7 +834,7 @@ impl<E: Encoder> PlanBuilder<E> {
&mut self, &mut self,
template: &[TriplePattern], template: &[TriplePattern],
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
) -> Result<Vec<TripleTemplate>, EvaluationError> { ) -> Result<Vec<TripleTemplate<E::StrId>>, EvaluationError> {
let mut bnodes = Vec::default(); let mut bnodes = Vec::default();
template template
.iter() .iter()
@ -870,7 +862,7 @@ impl<E: Encoder> PlanBuilder<E> {
term_or_variable: &TermOrVariable, term_or_variable: &TermOrVariable,
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
bnodes: &mut Vec<BlankNode>, bnodes: &mut Vec<BlankNode>,
) -> Result<TripleTemplateValue, EvaluationError> { ) -> Result<TripleTemplateValue<E::StrId>, EvaluationError> {
Ok(match term_or_variable { Ok(match term_or_variable {
TermOrVariable::Variable(variable) => { TermOrVariable::Variable(variable) => {
TripleTemplateValue::Variable(variable_key(variables, variable)) TripleTemplateValue::Variable(variable_key(variables, variable))
@ -878,9 +870,7 @@ impl<E: Encoder> PlanBuilder<E> {
TermOrVariable::Term(Term::BlankNode(bnode)) => { TermOrVariable::Term(Term::BlankNode(bnode)) => {
TripleTemplateValue::BlankNode(bnode_key(bnodes, bnode)) TripleTemplateValue::BlankNode(bnode_key(bnodes, bnode))
} }
TermOrVariable::Term(term) => { TermOrVariable::Term(term) => TripleTemplateValue::Constant(self.build_term(term)?),
TripleTemplateValue::Constant(self.encoder.encode_term(term).map_err(|e| e.into())?)
}
}) })
} }
@ -888,23 +878,23 @@ impl<E: Encoder> PlanBuilder<E> {
&mut self, &mut self,
named_node_or_variable: &NamedNodeOrVariable, named_node_or_variable: &NamedNodeOrVariable,
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
) -> Result<TripleTemplateValue, EvaluationError> { ) -> Result<TripleTemplateValue<E::StrId>, EvaluationError> {
Ok(match named_node_or_variable { Ok(match named_node_or_variable {
NamedNodeOrVariable::Variable(variable) => { NamedNodeOrVariable::Variable(variable) => {
TripleTemplateValue::Variable(variable_key(variables, variable)) TripleTemplateValue::Variable(variable_key(variables, variable))
} }
NamedNodeOrVariable::NamedNode(term) => TripleTemplateValue::Constant( NamedNodeOrVariable::NamedNode(term) => {
self.encoder.encode_named_node(term).map_err(|e| e.into())?, TripleTemplateValue::Constant(self.build_named_node(term)?)
), }
}) })
} }
fn convert_pattern_value_id( fn convert_pattern_value_id(
&self, &self,
from_value: PatternValue, from_value: PatternValue<E::StrId>,
from: &[Variable], from: &[Variable],
to: &mut Vec<Variable>, to: &mut Vec<Variable>,
) -> PatternValue { ) -> PatternValue<E::StrId> {
match from_value { match from_value {
PatternValue::Constant(v) => PatternValue::Constant(v), PatternValue::Constant(v) => PatternValue::Constant(v),
PatternValue::Variable(from_id) => { PatternValue::Variable(from_id) => {
@ -933,7 +923,11 @@ impl<E: Encoder> PlanBuilder<E> {
} }
} }
fn add_left_join_problematic_variables(&self, node: &PlanNode, set: &mut BTreeSet<usize>) { fn add_left_join_problematic_variables(
&self,
node: &PlanNode<E::StrId>,
set: &mut BTreeSet<usize>,
) {
match node { match node {
PlanNode::Init PlanNode::Init
| PlanNode::StaticBindings { .. } | PlanNode::StaticBindings { .. }
@ -995,6 +989,21 @@ impl<E: Encoder> PlanBuilder<E> {
} }
} }
} }
fn build_named_node(
&mut self,
term: &NamedNode,
) -> Result<EncodedTerm<E::StrId>, EvaluationError> {
Ok(self.encoder.encode_named_node(term)?)
}
fn build_literal(&mut self, term: &Literal) -> Result<EncodedTerm<E::StrId>, EvaluationError> {
Ok(self.encoder.encode_literal(term)?)
}
fn build_term(&mut self, term: &Term) -> Result<EncodedTerm<E::StrId>, EvaluationError> {
Ok(self.encoder.encode_term(term)?)
}
} }
fn variable_key(variables: &mut Vec<Variable>, variable: &Variable) -> usize { fn variable_key(variables: &mut Vec<Variable>, variable: &Variable) -> usize {

@ -4,9 +4,13 @@ use crate::error::UnwrapInfallible;
use crate::io::{DatasetFormat, GraphFormat}; use crate::io::{DatasetFormat, GraphFormat};
use crate::model::*; use crate::model::*;
use crate::sparql::{EvaluationError, Query, QueryOptions, QueryResult, SimplePreparedQuery}; use crate::sparql::{EvaluationError, Query, QueryOptions, QueryResult, SimplePreparedQuery};
use crate::store::numeric_encoder::*; use crate::store::numeric_encoder::{
write_term, Decoder, ReadEncoder, StrContainer, StrHash, StrLookup, WithStoreError,
WriteEncoder, WRITTEN_TERM_MAX_SIZE,
};
use crate::store::{ use crate::store::{
dump_dataset, dump_graph, load_dataset, load_graph, ReadableEncodedStore, WritableEncodedStore, dump_dataset, dump_graph, get_encoded_quad_pattern, load_dataset, load_graph,
ReadableEncodedStore, WritableEncodedStore,
}; };
use std::collections::hash_map::DefaultHasher; use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
@ -55,6 +59,8 @@ type TrivialHashMap<K, V> = HashMap<K, V, BuildHasherDefault<TrivialHasher>>;
type TrivialHashSet<T> = HashSet<T, BuildHasherDefault<TrivialHasher>>; type TrivialHashSet<T> = HashSet<T, BuildHasherDefault<TrivialHasher>>;
type TripleMap<T> = TrivialHashMap<T, TrivialHashMap<T, TrivialHashSet<T>>>; type TripleMap<T> = TrivialHashMap<T, TrivialHashMap<T, TrivialHashSet<T>>>;
type QuadMap<T> = TrivialHashMap<T, TripleMap<T>>; type QuadMap<T> = TrivialHashMap<T, TripleMap<T>>;
type EncodedTerm = crate::store::numeric_encoder::EncodedTerm<StrHash>;
type EncodedQuad = crate::store::numeric_encoder::EncodedQuad<StrHash>;
#[derive(Default)] #[derive(Default)]
struct MemoryStoreIndexes { struct MemoryStoreIndexes {
@ -169,22 +175,25 @@ impl MemoryStore {
object: Option<&Term>, object: Option<&Term>,
graph_name: Option<&GraphName>, graph_name: Option<&GraphName>,
) -> impl Iterator<Item = Quad> { ) -> impl Iterator<Item = Quad> {
let subject = subject.map(|s| s.into()); let quads = if let Some((subject, predicate, object, graph_name)) =
let predicate = predicate.map(|p| p.into()); get_encoded_quad_pattern(self, subject, predicate, object, graph_name)
let object = object.map(|o| o.into()); .unwrap_infallible()
let graph_name = graph_name.map(|g| g.into()); {
self.encoded_quads_for_pattern_inner(subject, predicate, object, graph_name)
} else {
Vec::new()
};
let this = self.clone(); let this = self.clone();
self.encoded_quads_for_pattern_inner(subject, predicate, object, graph_name) quads.into_iter().map(
.into_iter() move |quad| this.decode_quad(&quad).unwrap(), // Could not fail
.map( )
move |quad| this.decode_quad(&quad).unwrap(), // Could not fail
)
} }
/// Checks if this store contains a given quad /// Checks if this store contains a given quad
pub fn contains(&self, quad: &Quad) -> bool { pub fn contains(&self, quad: &Quad) -> bool {
let quad = quad.into(); self.get_encoded_quad(quad)
self.contains_encoded(&quad) .unwrap_infallible()
.map_or(false, |q| self.contains_encoded(&q))
} }
/// Returns the number of quads in the store /// Returns the number of quads in the store
@ -238,7 +247,7 @@ impl MemoryStore {
let mut transaction = MemoryTransaction { let mut transaction = MemoryTransaction {
store: self, store: self,
ops: Vec::new(), ops: Vec::new(),
strings: Vec::new(), strings: TrivialHashMap::default(),
}; };
f(&mut transaction)?; f(&mut transaction)?;
transaction.commit(); transaction.commit();
@ -317,16 +326,17 @@ impl MemoryStore {
/// Adds a quad to this store. /// Adds a quad to this store.
#[allow(clippy::needless_pass_by_value)] #[allow(clippy::needless_pass_by_value)]
pub fn insert(&self, quad: Quad) { pub fn insert(&self, quad: Quad) {
let mut store = self; let mut indexes = self.indexes_mut();
let quad = store.encode_quad(&quad).unwrap_infallible(); let quad = indexes.encode_quad(&quad).unwrap_infallible();
store.insert_encoded(&quad).unwrap_infallible(); indexes.insert_encoded(&quad).unwrap_infallible();
} }
/// Removes a quad from this store. /// Removes a quad from this store.
pub fn remove(&self, quad: &Quad) { pub fn remove(&self, quad: &Quad) {
let mut store = self; let mut indexes = self.indexes_mut();
let quad = quad.into(); if let Some(quad) = indexes.get_encoded_quad(quad).unwrap_infallible() {
store.remove_encoded(&quad).unwrap_infallible(); indexes.remove_encoded(&quad).unwrap_infallible();
}
} }
/// Returns if the current dataset is [isomorphic](https://www.w3.org/TR/rdf11-concepts/#dfn-dataset-isomorphism) with another one. /// Returns if the current dataset is [isomorphic](https://www.w3.org/TR/rdf11-concepts/#dfn-dataset-isomorphism) with another one.
@ -689,16 +699,17 @@ impl MemoryStore {
impl WithStoreError for MemoryStore { impl WithStoreError for MemoryStore {
type Error = Infallible; type Error = Infallible;
} type StrId = StrHash;
impl<'a> WithStoreError for &'a MemoryStore {
type Error = Infallible;
} }
impl StrLookup for MemoryStore { impl StrLookup for MemoryStore {
fn get_str(&self, id: StrHash) -> Result<Option<String>, Infallible> { fn get_str(&self, id: StrHash) -> Result<Option<String>, Infallible> {
self.indexes().get_str(id) self.indexes().get_str(id)
} }
fn get_str_id(&self, value: &str) -> Result<Option<StrHash>, Infallible> {
self.indexes().get_str_id(value)
}
} }
impl<'a> StrContainer for &'a MemoryStore { impl<'a> StrContainer for &'a MemoryStore {
@ -737,6 +748,7 @@ impl<'a> WritableEncodedStore for &'a MemoryStore {
impl WithStoreError for MemoryStoreIndexes { impl WithStoreError for MemoryStoreIndexes {
type Error = Infallible; type Error = Infallible;
type StrId = StrHash;
} }
impl StrLookup for MemoryStoreIndexes { impl StrLookup for MemoryStoreIndexes {
@ -744,6 +756,15 @@ impl StrLookup for MemoryStoreIndexes {
//TODO: avoid copy by adding a lifetime limit to get_str //TODO: avoid copy by adding a lifetime limit to get_str
Ok(self.id2str.get(&id).cloned()) Ok(self.id2str.get(&id).cloned())
} }
fn get_str_id(&self, value: &str) -> Result<Option<StrHash>, Infallible> {
let id = StrHash::new(value);
Ok(if self.id2str.contains_key(&id) {
Some(id)
} else {
None
})
}
} }
impl StrContainer for MemoryStoreIndexes { impl StrContainer for MemoryStoreIndexes {
@ -941,7 +962,7 @@ impl MemoryPreparedQuery {
pub struct MemoryTransaction<'a> { pub struct MemoryTransaction<'a> {
store: &'a MemoryStore, store: &'a MemoryStore,
ops: Vec<TransactionOp>, ops: Vec<TransactionOp>,
strings: Vec<(StrHash, String)>, strings: TrivialHashMap<StrHash, String>,
} }
enum TransactionOp { enum TransactionOp {
@ -1022,8 +1043,9 @@ impl<'a> MemoryTransaction<'a> {
/// Removes a quad from this store during the transaction. /// Removes a quad from this store during the transaction.
pub fn remove(&mut self, quad: &Quad) { pub fn remove(&mut self, quad: &Quad) {
let quad = quad.into(); if let Some(quad) = self.get_encoded_quad(quad).unwrap_infallible() {
self.remove_encoded(&quad).unwrap_infallible(); self.remove_encoded(&quad).unwrap_infallible();
}
} }
fn commit(self) { fn commit(self) {
@ -1038,14 +1060,34 @@ impl<'a> MemoryTransaction<'a> {
} }
} }
impl StrLookup for MemoryTransaction<'_> {
fn get_str(&self, id: StrHash) -> Result<Option<String>, Infallible> {
if let Some(str) = self.strings.get(&id) {
Ok(Some(str.clone()))
} else {
self.store.get_str(id)
}
}
fn get_str_id(&self, value: &str) -> Result<Option<StrHash>, Infallible> {
let id = StrHash::new(value);
if self.strings.contains_key(&id) {
Ok(Some(id))
} else {
self.store.get_str_id(value)
}
}
}
impl WithStoreError for MemoryTransaction<'_> { impl WithStoreError for MemoryTransaction<'_> {
type Error = Infallible; type Error = Infallible;
type StrId = StrHash;
} }
impl StrContainer for MemoryTransaction<'_> { impl StrContainer for MemoryTransaction<'_> {
fn insert_str(&mut self, value: &str) -> Result<StrHash, Infallible> { fn insert_str(&mut self, value: &str) -> Result<StrHash, Infallible> {
let key = StrHash::new(value); let key = StrHash::new(value);
self.strings.push((key, value.to_owned())); self.strings.insert(key, value.to_owned());
Ok(key) Ok(key)
} }
} }

@ -34,21 +34,21 @@ use std::io::{BufRead, Write};
use std::iter::Iterator; use std::iter::Iterator;
pub(crate) trait ReadableEncodedStore: StrLookup { pub(crate) trait ReadableEncodedStore: StrLookup {
type QuadsIter: Iterator<Item = Result<EncodedQuad, Self::Error>> + 'static; type QuadsIter: Iterator<Item = Result<EncodedQuad<Self::StrId>, Self::Error>> + 'static;
fn encoded_quads_for_pattern( fn encoded_quads_for_pattern(
&self, &self,
subject: Option<EncodedTerm>, subject: Option<EncodedTerm<Self::StrId>>,
predicate: Option<EncodedTerm>, predicate: Option<EncodedTerm<Self::StrId>>,
object: Option<EncodedTerm>, object: Option<EncodedTerm<Self::StrId>>,
graph_name: Option<EncodedTerm>, graph_name: Option<EncodedTerm<Self::StrId>>,
) -> Self::QuadsIter; ) -> Self::QuadsIter;
} }
pub(crate) trait WritableEncodedStore: StrContainer { pub(crate) trait WritableEncodedStore: StrContainer {
fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<(), Self::Error>; fn insert_encoded(&mut self, quad: &EncodedQuad<Self::StrId>) -> Result<(), Self::Error>;
fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<(), Self::Error>; fn remove_encoded(&mut self, quad: &EncodedQuad<Self::StrId>) -> Result<(), Self::Error>;
} }
fn load_graph<S: WritableEncodedStore>( fn load_graph<S: WritableEncodedStore>(
@ -239,3 +239,61 @@ impl<P: Into<io::Error>> From<StoreOrParseError<Infallible, P>> for io::Error {
} }
} }
} }
type QuadPattern<I> = (
Option<EncodedTerm<I>>,
Option<EncodedTerm<I>>,
Option<EncodedTerm<I>>,
Option<EncodedTerm<I>>,
);
fn get_encoded_quad_pattern<E: ReadEncoder>(
encoder: &E,
subject: Option<&NamedOrBlankNode>,
predicate: Option<&NamedNode>,
object: Option<&Term>,
graph_name: Option<&GraphName>,
) -> Result<Option<QuadPattern<E::StrId>>, E::Error> {
Ok(Some((
if let Some(subject) = transpose(
subject
.map(|t| encoder.get_encoded_named_or_blank_node(t))
.transpose()?,
) {
subject
} else {
return Ok(None);
},
if let Some(predicate) = transpose(
predicate
.map(|t| encoder.get_encoded_named_node(t))
.transpose()?,
) {
predicate
} else {
return Ok(None);
},
if let Some(object) = transpose(object.map(|t| encoder.get_encoded_term(t)).transpose()?) {
object
} else {
return Ok(None);
},
if let Some(graph_name) = transpose(
graph_name
.map(|t| encoder.get_encoded_graph_name(t))
.transpose()?,
) {
graph_name
} else {
return Ok(None);
},
)))
}
fn transpose<T>(o: Option<Option<T>>) -> Option<Option<T>> {
match o {
Some(Some(v)) => Some(Some(v)),
Some(None) => None,
None => Some(None),
}
}

File diff suppressed because it is too large Load Diff

@ -1,17 +1,23 @@
//! Store based on the [RocksDB](https://rocksdb.org/) key-value database. //! Store based on the [RocksDB](https://rocksdb.org/) key-value database.
use crate::error::{invalid_data_error, UnwrapInfallible}; use crate::error::invalid_data_error;
use crate::io::{DatasetFormat, GraphFormat}; use crate::io::{DatasetFormat, GraphFormat};
use crate::model::*; use crate::model::*;
use crate::sparql::{EvaluationError, Query, QueryOptions, QueryResult, SimplePreparedQuery}; use crate::sparql::{EvaluationError, Query, QueryOptions, QueryResult, SimplePreparedQuery};
use crate::store::numeric_encoder::*; use crate::store::numeric_encoder::{
write_term, Decoder, ReadEncoder, StrContainer, StrHash, StrLookup, TermReader, WithStoreError,
WriteEncoder, WRITTEN_TERM_MAX_SIZE,
};
use crate::store::{ use crate::store::{
dump_dataset, dump_graph, load_dataset, load_graph, ReadableEncodedStore, WritableEncodedStore, dump_dataset, dump_graph, get_encoded_quad_pattern, load_dataset, load_graph,
ReadableEncodedStore, WritableEncodedStore,
}; };
use rocksdb::*; use rocksdb::*;
use std::convert::{Infallible, TryInto}; use std::collections::HashMap;
use std::convert::TryInto;
use std::io; use std::io;
use std::io::{BufRead, Cursor, Write}; use std::io::{BufRead, Cursor, Write};
use std::iter::{once, Once};
use std::mem::{take, transmute}; use std::mem::{take, transmute};
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
@ -55,6 +61,9 @@ pub struct RocksDbStore {
db: Arc<DB>, db: Arc<DB>,
} }
type EncodedTerm = crate::store::numeric_encoder::EncodedTerm<StrHash>;
type EncodedQuad = crate::store::numeric_encoder::EncodedQuad<StrHash>;
const ID2STR_CF: &str = "id2str"; const ID2STR_CF: &str = "id2str";
const SPOG_CF: &str = "spog"; const SPOG_CF: &str = "spog";
const POSG_CF: &str = "posg"; const POSG_CF: &str = "posg";
@ -121,19 +130,23 @@ impl RocksDbStore {
object: Option<&Term>, object: Option<&Term>,
graph_name: Option<&GraphName>, graph_name: Option<&GraphName>,
) -> impl Iterator<Item = Result<Quad, io::Error>> { ) -> impl Iterator<Item = Result<Quad, io::Error>> {
let subject = subject.map(|s| s.into()); match get_encoded_quad_pattern(self, subject, predicate, object, graph_name) {
let predicate = predicate.map(|p| p.into()); Ok(Some((subject, predicate, object, graph_name))) => QuadsIter::Quads {
let object = object.map(|o| o.into()); iter: self.encoded_quads_for_pattern(subject, predicate, object, graph_name),
let graph_name = graph_name.map(|g| g.into()); store: self.clone(),
let store = self.clone(); },
self.encoded_quads_for_pattern(subject, predicate, object, graph_name) Ok(None) => QuadsIter::Empty,
.map(move |quad| Ok(store.decode_quad(&quad?)?)) Err(error) => QuadsIter::Error(once(error)),
}
} }
/// Checks if this store contains a given quad /// Checks if this store contains a given quad
pub fn contains(&self, quad: &Quad) -> Result<bool, io::Error> { pub fn contains(&self, quad: &Quad) -> Result<bool, io::Error> {
let quad = quad.into(); if let Some(quad) = self.get_encoded_quad(quad)? {
self.contains_encoded(&quad) self.contains_encoded(&quad)
} else {
Ok(false)
}
} }
/// Returns the number of quads in the store /// Returns the number of quads in the store
@ -162,14 +175,13 @@ impl RocksDbStore {
f: impl FnOnce(&mut RocksDbTransaction<'a>) -> Result<(), E>, f: impl FnOnce(&mut RocksDbTransaction<'a>) -> Result<(), E>,
) -> Result<(), E> { ) -> Result<(), E> {
let mut transaction = RocksDbTransaction { let mut transaction = RocksDbTransaction {
inner: BatchWriter { store: self,
store: self, batch: WriteBatch::default(),
batch: WriteBatch::default(), buffer: Vec::new(),
buffer: Vec::default(), new_strings: HashMap::new(),
},
}; };
f(&mut transaction)?; f(&mut transaction)?;
Ok(transaction.inner.apply()?) Ok(transaction.apply()?)
} }
/// Loads a graph file (i.e. triples) into the store /// Loads a graph file (i.e. triples) into the store
@ -225,10 +237,13 @@ impl RocksDbStore {
/// Removes a quad from this store. /// Removes a quad from this store.
pub fn remove(&self, quad: &Quad) -> Result<(), io::Error> { pub fn remove(&self, quad: &Quad) -> Result<(), io::Error> {
let mut transaction = self.auto_batch_writer(); if let Some(quad) = self.get_encoded_quad(quad)? {
let quad = quad.into(); let mut transaction = self.auto_batch_writer();
transaction.remove_encoded(&quad)?; transaction.remove_encoded(&quad)?;
transaction.apply() transaction.apply()
} else {
Ok(())
}
} }
/// Dumps a store graph into a file. /// Dumps a store graph into a file.
@ -289,11 +304,9 @@ impl RocksDbStore {
fn auto_batch_writer(&self) -> AutoBatchWriter<'_> { fn auto_batch_writer(&self) -> AutoBatchWriter<'_> {
AutoBatchWriter { AutoBatchWriter {
inner: BatchWriter { store: self,
store: self, batch: WriteBatch::default(),
batch: WriteBatch::default(), buffer: Vec::default(),
buffer: Vec::default(),
},
} }
} }
@ -463,6 +476,7 @@ impl fmt::Display for RocksDbStore {
impl WithStoreError for RocksDbStore { impl WithStoreError for RocksDbStore {
type Error = io::Error; type Error = io::Error;
type StrId = StrHash;
} }
impl StrLookup for RocksDbStore { impl StrLookup for RocksDbStore {
@ -474,6 +488,22 @@ impl StrLookup for RocksDbStore {
.transpose() .transpose()
.map_err(invalid_data_error) .map_err(invalid_data_error)
} }
fn get_str_id(&self, value: &str) -> Result<Option<StrHash>, io::Error> {
let id = StrHash::new(value);
Ok(
if self
.db
.get_cf(self.id2str_cf(), &id.to_be_bytes())
.map_err(map_err)?
.is_some()
{
Some(id)
} else {
None
},
)
}
} }
impl ReadableEncodedStore for RocksDbStore { impl ReadableEncodedStore for RocksDbStore {
@ -553,9 +583,106 @@ impl RocksDbPreparedQuery {
} }
} }
struct AutoBatchWriter<'a> {
store: &'a RocksDbStore,
batch: WriteBatch,
buffer: Vec<u8>,
}
impl AutoBatchWriter<'_> {
fn apply(self) -> Result<(), io::Error> {
self.store.db.write(self.batch).map_err(map_err)
}
fn apply_if_big(&mut self) -> Result<(), io::Error> {
if self.batch.len() > MAX_TRANSACTION_SIZE {
self.store
.db
.write(take(&mut self.batch))
.map_err(map_err)?;
}
Ok(())
}
}
impl WithStoreError for AutoBatchWriter<'_> {
type Error = io::Error;
type StrId = StrHash;
}
impl StrContainer for AutoBatchWriter<'_> {
fn insert_str(&mut self, value: &str) -> Result<StrHash, io::Error> {
let key = StrHash::new(value);
self.batch
.put_cf(self.store.id2str_cf(), &key.to_be_bytes(), value);
Ok(key)
}
}
impl WritableEncodedStore for AutoBatchWriter<'_> {
fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<(), io::Error> {
write_spog_quad(&mut self.buffer, quad);
self.batch.put_cf(self.store.spog_cf(), &self.buffer, &[]);
self.buffer.clear();
write_posg_quad(&mut self.buffer, quad);
self.batch.put_cf(self.store.posg_cf(), &self.buffer, &[]);
self.buffer.clear();
write_ospg_quad(&mut self.buffer, quad);
self.batch.put_cf(self.store.ospg_cf(), &self.buffer, &[]);
self.buffer.clear();
write_gspo_quad(&mut self.buffer, quad);
self.batch.put_cf(self.store.gspo_cf(), &self.buffer, &[]);
self.buffer.clear();
write_gpos_quad(&mut self.buffer, quad);
self.batch.put_cf(self.store.gpos_cf(), &self.buffer, &[]);
self.buffer.clear();
write_gosp_quad(&mut self.buffer, quad);
self.batch.put_cf(self.store.gosp_cf(), &self.buffer, &[]);
self.buffer.clear();
self.apply_if_big()
}
fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<(), io::Error> {
write_spog_quad(&mut self.buffer, quad);
self.batch.delete_cf(self.store.spog_cf(), &self.buffer);
self.buffer.clear();
write_posg_quad(&mut self.buffer, quad);
self.batch.delete_cf(self.store.posg_cf(), &self.buffer);
self.buffer.clear();
write_ospg_quad(&mut self.buffer, quad);
self.batch.delete_cf(self.store.ospg_cf(), &self.buffer);
self.buffer.clear();
write_gspo_quad(&mut self.buffer, quad);
self.batch.delete_cf(self.store.gspo_cf(), &self.buffer);
self.buffer.clear();
write_gpos_quad(&mut self.buffer, quad);
self.batch.delete_cf(self.store.gpos_cf(), &self.buffer);
self.buffer.clear();
write_gosp_quad(&mut self.buffer, quad);
self.batch.delete_cf(self.store.gosp_cf(), &self.buffer);
self.buffer.clear();
self.apply_if_big()
}
}
/// Allows inserting and deleting quads during a transaction with the `RocksDbStore`. /// Allows inserting and deleting quads during a transaction with the `RocksDbStore`.
pub struct RocksDbTransaction<'a> { pub struct RocksDbTransaction<'a> {
inner: BatchWriter<'a>, store: &'a RocksDbStore,
batch: WriteBatch,
buffer: Vec<u8>,
new_strings: HashMap<StrHash, String>,
} }
impl RocksDbTransaction<'_> { impl RocksDbTransaction<'_> {
@ -576,7 +703,7 @@ impl RocksDbTransaction<'_> {
to_graph_name: &GraphName, to_graph_name: &GraphName,
base_iri: Option<&str>, base_iri: Option<&str>,
) -> Result<(), io::Error> { ) -> Result<(), io::Error> {
load_graph(&mut self.inner, reader, syntax, to_graph_name, base_iri)?; load_graph(self, reader, syntax, to_graph_name, base_iri)?;
Ok(()) Ok(())
} }
@ -596,44 +723,76 @@ impl RocksDbTransaction<'_> {
format: DatasetFormat, format: DatasetFormat,
base_iri: Option<&str>, base_iri: Option<&str>,
) -> Result<(), io::Error> { ) -> Result<(), io::Error> {
load_dataset(&mut self.inner, reader, format, base_iri)?; load_dataset(self, reader, format, base_iri)?;
Ok(()) Ok(())
} }
/// Adds a quad to this store during the transaction. /// Adds a quad to this store during the transaction.
pub fn insert(&mut self, quad: &Quad) { pub fn insert(&mut self, quad: &Quad) -> Result<(), io::Error> {
let quad = self.inner.encode_quad(quad).unwrap_infallible(); let quad = self.encode_quad(quad)?;
self.inner.insert_encoded(&quad).unwrap_infallible() self.insert_encoded(&quad)
} }
/// Removes a quad from this store during the transaction. /// Removes a quad from this store during the transaction.
pub fn remove(&mut self, quad: &Quad) { pub fn remove(&mut self, quad: &Quad) -> Result<(), io::Error> {
let quad = quad.into(); // Works because all strings could be encoded
self.inner.remove_encoded(&quad).unwrap_infallible() if let Some(quad) = self.get_encoded_quad(quad).unwrap() {
self.remove_encoded(&quad)
} else {
Ok(())
}
}
fn apply(self) -> Result<(), io::Error> {
self.store.db.write(self.batch).map_err(map_err)
} }
} }
struct BatchWriter<'a> { impl WithStoreError for RocksDbTransaction<'_> {
store: &'a RocksDbStore, type Error = io::Error;
batch: WriteBatch, type StrId = StrHash;
buffer: Vec<u8>,
} }
impl WithStoreError for BatchWriter<'_> { impl StrLookup for RocksDbTransaction<'_> {
type Error = Infallible; fn get_str(&self, id: StrHash) -> Result<Option<String>, io::Error> {
if let Some(str) = self.new_strings.get(&id) {
Ok(Some(str.clone()))
} else {
self.store.get_str(id)
}
}
fn get_str_id(&self, value: &str) -> Result<Option<StrHash>, io::Error> {
let id = StrHash::new(value);
Ok(
if self.new_strings.contains_key(&id)
|| self
.store
.db
.get_cf(self.store.id2str_cf(), &id.to_be_bytes())
.map_err(map_err)?
.is_some()
{
Some(id)
} else {
None
},
)
}
} }
impl StrContainer for BatchWriter<'_> { impl StrContainer for RocksDbTransaction<'_> {
fn insert_str(&mut self, value: &str) -> Result<StrHash, Infallible> { fn insert_str(&mut self, value: &str) -> Result<StrHash, io::Error> {
let key = StrHash::new(value); let key = StrHash::new(value);
self.batch self.batch
.put_cf(self.store.id2str_cf(), &key.to_be_bytes(), value); .put_cf(self.store.id2str_cf(), &key.to_be_bytes(), value);
self.new_strings.insert(key, value.to_owned());
Ok(key) Ok(key)
} }
} }
impl WritableEncodedStore for BatchWriter<'_> { impl WritableEncodedStore for RocksDbTransaction<'_> {
fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<(), Infallible> { fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<(), io::Error> {
write_spog_quad(&mut self.buffer, quad); write_spog_quad(&mut self.buffer, quad);
self.batch.put_cf(self.store.spog_cf(), &self.buffer, &[]); self.batch.put_cf(self.store.spog_cf(), &self.buffer, &[]);
self.buffer.clear(); self.buffer.clear();
@ -661,7 +820,7 @@ impl WritableEncodedStore for BatchWriter<'_> {
Ok(()) Ok(())
} }
fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<(), Infallible> { fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<(), io::Error> {
write_spog_quad(&mut self.buffer, quad); write_spog_quad(&mut self.buffer, quad);
self.batch.delete_cf(self.store.spog_cf(), &self.buffer); self.batch.delete_cf(self.store.spog_cf(), &self.buffer);
self.buffer.clear(); self.buffer.clear();
@ -690,55 +849,6 @@ impl WritableEncodedStore for BatchWriter<'_> {
} }
} }
impl BatchWriter<'_> {
fn apply(self) -> Result<(), io::Error> {
self.store.db.write(self.batch).map_err(map_err)
}
}
struct AutoBatchWriter<'a> {
inner: BatchWriter<'a>,
}
impl WithStoreError for AutoBatchWriter<'_> {
type Error = io::Error;
}
impl StrContainer for AutoBatchWriter<'_> {
fn insert_str(&mut self, value: &str) -> Result<StrHash, io::Error> {
Ok(self.inner.insert_str(value).unwrap_infallible())
}
}
impl WritableEncodedStore for AutoBatchWriter<'_> {
fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<(), io::Error> {
self.inner.insert_encoded(quad).unwrap_infallible();
self.apply_if_big()
}
fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<(), io::Error> {
self.inner.remove_encoded(quad).unwrap_infallible();
self.apply_if_big()
}
}
impl AutoBatchWriter<'_> {
fn apply(self) -> Result<(), io::Error> {
self.inner.apply()
}
fn apply_if_big(&mut self) -> Result<(), io::Error> {
if self.inner.batch.len() > MAX_TRANSACTION_SIZE {
self.inner
.store
.db
.write(take(&mut self.inner.batch))
.map_err(map_err)?;
}
Ok(())
}
}
#[allow(clippy::expect_used)] #[allow(clippy::expect_used)]
fn get_cf<'a>(db: &'a DB, name: &str) -> &'a ColumnFamily { fn get_cf<'a>(db: &'a DB, name: &str) -> &'a ColumnFamily {
db.cf_handle(name) db.cf_handle(name)
@ -889,6 +999,30 @@ fn map_err(e: Error) -> io::Error {
io::Error::new(io::ErrorKind::Other, e) io::Error::new(io::ErrorKind::Other, e)
} }
enum QuadsIter {
Quads {
iter: DecodingIndexIterator,
store: RocksDbStore,
},
Error(Once<io::Error>),
Empty,
}
impl Iterator for QuadsIter {
type Item = Result<Quad, io::Error>;
fn next(&mut self) -> Option<Result<Quad, io::Error>> {
match self {
Self::Quads { iter, store } => Some(match iter.next()? {
Ok(quad) => store.decode_quad(&quad).map_err(|e| e.into()),
Err(error) => Err(error),
}),
Self::Error(iter) => iter.next().map(Err),
Self::Empty => None,
}
}
}
#[test] #[test]
fn store() -> Result<(), io::Error> { fn store() -> Result<(), io::Error> {
use crate::model::*; use crate::model::*;

@ -4,10 +4,13 @@ use crate::error::invalid_data_error;
use crate::io::{DatasetFormat, GraphFormat}; use crate::io::{DatasetFormat, GraphFormat};
use crate::model::*; use crate::model::*;
use crate::sparql::{EvaluationError, Query, QueryOptions, QueryResult, SimplePreparedQuery}; use crate::sparql::{EvaluationError, Query, QueryOptions, QueryResult, SimplePreparedQuery};
use crate::store::numeric_encoder::*; use crate::store::numeric_encoder::{
write_term, Decoder, ReadEncoder, StrContainer, StrHash, StrLookup, TermReader, WithStoreError,
WriteEncoder, WRITTEN_TERM_MAX_SIZE,
};
use crate::store::{ use crate::store::{
dump_dataset, dump_graph, load_dataset, load_graph, ReadableEncodedStore, StoreOrParseError, dump_dataset, dump_graph, get_encoded_quad_pattern, load_dataset, load_graph,
WritableEncodedStore, ReadableEncodedStore, StoreOrParseError, WritableEncodedStore,
}; };
use sled::transaction::{ use sled::transaction::{
ConflictableTransactionError, TransactionError, Transactional, TransactionalTree, ConflictableTransactionError, TransactionError, Transactional, TransactionalTree,
@ -17,6 +20,7 @@ use sled::{Config, Iter, Tree};
use std::convert::TryInto; use std::convert::TryInto;
use std::error::Error; use std::error::Error;
use std::io::{BufRead, Cursor, Write}; use std::io::{BufRead, Cursor, Write};
use std::iter::{once, Once};
use std::path::Path; use std::path::Path;
use std::{fmt, io, str}; use std::{fmt, io, str};
@ -67,6 +71,8 @@ const OSPG_PREFIX: u8 = 3;
const GSPO_PREFIX: u8 = 4; const GSPO_PREFIX: u8 = 4;
const GPOS_PREFIX: u8 = 5; const GPOS_PREFIX: u8 = 5;
const GOSP_PREFIX: u8 = 6; const GOSP_PREFIX: u8 = 6;
type EncodedTerm = crate::store::numeric_encoder::EncodedTerm<StrHash>;
type EncodedQuad = crate::store::numeric_encoder::EncodedQuad<StrHash>;
//TODO: indexes for the default graph and indexes for the named graphs (no more Optional and space saving) //TODO: indexes for the default graph and indexes for the named graphs (no more Optional and space saving)
@ -125,19 +131,23 @@ impl SledStore {
object: Option<&Term>, object: Option<&Term>,
graph_name: Option<&GraphName>, graph_name: Option<&GraphName>,
) -> impl Iterator<Item = Result<Quad, io::Error>> { ) -> impl Iterator<Item = Result<Quad, io::Error>> {
let subject = subject.map(|s| s.into()); match get_encoded_quad_pattern(self, subject, predicate, object, graph_name) {
let predicate = predicate.map(|p| p.into()); Ok(Some((subject, predicate, object, graph_name))) => QuadsIter::Quads {
let object = object.map(|o| o.into()); iter: self.encoded_quads_for_pattern(subject, predicate, object, graph_name),
let graph_name = graph_name.map(|g| g.into()); store: self.clone(),
let this = self.clone(); },
self.encoded_quads_for_pattern(subject, predicate, object, graph_name) Ok(None) => QuadsIter::Empty,
.map(move |quad| Ok(this.decode_quad(&quad?)?)) Err(error) => QuadsIter::Error(once(error)),
}
} }
/// Checks if this store contains a given quad /// Checks if this store contains a given quad
pub fn contains(&self, quad: &Quad) -> Result<bool, io::Error> { pub fn contains(&self, quad: &Quad) -> Result<bool, io::Error> {
let quad = quad.into(); if let Some(quad) = self.get_encoded_quad(quad)? {
self.contains_encoded(&quad) self.contains_encoded(&quad)
} else {
Ok(false)
}
} }
/// Returns the number of quads in the store /// Returns the number of quads in the store
@ -237,9 +247,12 @@ impl SledStore {
/// Removes a quad from this store. /// Removes a quad from this store.
pub fn remove(&self, quad: &Quad) -> Result<(), io::Error> { pub fn remove(&self, quad: &Quad) -> Result<(), io::Error> {
let mut this = self; if let Some(quad) = self.get_encoded_quad(quad)? {
let quad = quad.into(); let mut this = self;
this.remove_encoded(&quad) this.remove_encoded(&quad)
} else {
Ok(())
}
} }
/// Dumps a store graph into a file. /// Dumps a store graph into a file.
@ -408,6 +421,7 @@ impl fmt::Display for SledStore {
impl WithStoreError for SledStore { impl WithStoreError for SledStore {
type Error = io::Error; type Error = io::Error;
type StrId = StrHash;
} }
impl StrLookup for SledStore { impl StrLookup for SledStore {
@ -418,6 +432,15 @@ impl StrLookup for SledStore {
.transpose() .transpose()
.map_err(invalid_data_error) .map_err(invalid_data_error)
} }
fn get_str_id(&self, value: &str) -> Result<Option<StrHash>, io::Error> {
let id = StrHash::new(value);
Ok(if self.id2str.contains_key(&id.to_be_bytes())? {
Some(id)
} else {
None
})
}
} }
impl ReadableEncodedStore for SledStore { impl ReadableEncodedStore for SledStore {
@ -491,10 +514,6 @@ impl ReadableEncodedStore for SledStore {
} }
} }
impl<'a> WithStoreError for &'a SledStore {
type Error = io::Error;
}
impl<'a> StrContainer for &'a SledStore { impl<'a> StrContainer for &'a SledStore {
fn insert_str(&mut self, value: &str) -> Result<StrHash, io::Error> { fn insert_str(&mut self, value: &str) -> Result<StrHash, io::Error> {
let key = StrHash::new(value); let key = StrHash::new(value);
@ -625,13 +644,36 @@ impl SledTransaction<'_> {
/// Removes a quad from this store during the transaction. /// Removes a quad from this store during the transaction.
pub fn remove(&self, quad: &Quad) -> Result<(), SledUnabortableTransactionError> { pub fn remove(&self, quad: &Quad) -> Result<(), SledUnabortableTransactionError> {
let mut this = self; let mut this = self;
let quad = quad.into(); if let Some(quad) = this.get_encoded_quad(quad)? {
this.remove_encoded(&quad) this.remove_encoded(&quad)
} else {
Ok(())
}
} }
} }
impl<'a> WithStoreError for &'a SledTransaction<'a> { impl<'a> WithStoreError for &'a SledTransaction<'a> {
type Error = SledUnabortableTransactionError; type Error = SledUnabortableTransactionError;
type StrId = StrHash;
}
impl<'a> StrLookup for &'a SledTransaction<'a> {
fn get_str(&self, id: StrHash) -> Result<Option<String>, SledUnabortableTransactionError> {
self.id2str
.get(id.to_be_bytes())?
.map(|v| String::from_utf8(v.to_vec()))
.transpose()
.map_err(|e| SledUnabortableTransactionError::Storage(invalid_data_error(e)))
}
fn get_str_id(&self, value: &str) -> Result<Option<StrHash>, SledUnabortableTransactionError> {
let id = StrHash::new(value);
Ok(if self.id2str.get(&id.to_be_bytes())?.is_some() {
Some(id)
} else {
None
})
}
} }
impl<'a> StrContainer for &'a SledTransaction<'a> { impl<'a> StrContainer for &'a SledTransaction<'a> {
@ -984,6 +1026,30 @@ fn decode_quad(encoded: &[u8]) -> Result<EncodedQuad, io::Error> {
} }
} }
enum QuadsIter {
Quads {
iter: DecodingQuadIterator,
store: SledStore,
},
Error(Once<io::Error>),
Empty,
}
impl Iterator for QuadsIter {
type Item = Result<Quad, io::Error>;
fn next(&mut self) -> Option<Result<Quad, io::Error>> {
match self {
Self::Quads { iter, store } => Some(match iter.next()? {
Ok(quad) => store.decode_quad(&quad).map_err(|e| e.into()),
Err(error) => Err(error),
}),
Self::Error(iter) => iter.next().map(Err),
Self::Empty => None,
}
}
}
#[test] #[test]
fn store() -> Result<(), io::Error> { fn store() -> Result<(), io::Error> {
use crate::model::*; use crate::model::*;

@ -267,7 +267,7 @@ impl WikibaseLoader {
.quads_for_pattern(None, None, None, Some(&graph_name)) .quads_for_pattern(None, None, None, Some(&graph_name))
.collect::<std::result::Result<Vec<_>, _>>()?; .collect::<std::result::Result<Vec<_>, _>>()?;
for q in to_remove { for q in to_remove {
transaction.remove(&q); transaction.remove(&q)?;
} }
transaction.load_graph( transaction.load_graph(

Loading…
Cancel
Save