Make StrContainer.insert_str mutable

This makes it possible to avoid some unnecessary RwLock usage
pull/10/head
Tpt 5 years ago
parent df8e265f0f
commit 88a97aa904
  1. 5
      lib/src/sparql/eval.rs
  2. 14
      lib/src/sparql/mod.rs
  3. 25
      lib/src/sparql/plan.rs
  4. 26
      lib/src/sparql/plan_builder.rs
  5. 79
      lib/src/store/memory.rs
  6. 19
      lib/src/store/mod.rs
  7. 255
      lib/src/store/numeric_encoder.rs
  8. 2
      lib/src/store/rocksdb.rs

@ -1230,6 +1230,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
None None
}?; }?;
self.dataset self.dataset
.encoder()
.encode_rio_literal(rio::Literal::Typed { .encode_rio_literal(rio::Literal::Typed {
value: &value, value: &value,
datatype: rio::NamedNode { iri: &datatype }, datatype: rio::NamedNode { iri: &datatype },
@ -1474,7 +1475,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
fn build_string_id(&self, value: &str) -> Option<u128> { fn build_string_id(&self, value: &str) -> Option<u128> {
let value_id = get_str_id(value); let value_id = get_str_id(value);
self.dataset.insert_str(value_id, value).ok()?; self.dataset.encoder().insert_str(value_id, value).ok()?;
Some(value_id) Some(value_id)
} }
@ -2211,7 +2212,7 @@ fn get_triple_template_value(
} }
fn decode_triple( fn decode_triple(
decoder: impl Decoder, decoder: &impl Decoder,
subject: EncodedTerm, subject: EncodedTerm,
predicate: EncodedTerm, predicate: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,

@ -64,7 +64,7 @@ impl<S: StoreConnection> SimplePreparedQuery<S> {
dataset: _, dataset: _,
base_iri, base_iri,
} => { } => {
let (plan, variables) = PlanBuilder::build(&dataset, &algebra)?; let (plan, variables) = PlanBuilder::build(dataset.encoder(), &algebra)?;
SimplePreparedQueryOptions::Select { SimplePreparedQueryOptions::Select {
plan, plan,
variables, variables,
@ -76,7 +76,7 @@ impl<S: StoreConnection> SimplePreparedQuery<S> {
dataset: _, dataset: _,
base_iri, base_iri,
} => { } => {
let (plan, _) = PlanBuilder::build(&dataset, &algebra)?; let (plan, _) = PlanBuilder::build(dataset.encoder(), &algebra)?;
SimplePreparedQueryOptions::Ask { SimplePreparedQueryOptions::Ask {
plan, plan,
evaluator: SimpleEvaluator::new(dataset, base_iri), evaluator: SimpleEvaluator::new(dataset, base_iri),
@ -88,10 +88,14 @@ impl<S: StoreConnection> SimplePreparedQuery<S> {
dataset: _, dataset: _,
base_iri, base_iri,
} => { } => {
let (plan, variables) = PlanBuilder::build(&dataset, &algebra)?; let (plan, variables) = PlanBuilder::build(dataset.encoder(), &algebra)?;
SimplePreparedQueryOptions::Construct { SimplePreparedQueryOptions::Construct {
plan, plan,
construct: PlanBuilder::build_graph_template(&dataset, &construct, variables)?, construct: PlanBuilder::build_graph_template(
dataset.encoder(),
&construct,
variables,
)?,
evaluator: SimpleEvaluator::new(dataset, base_iri), evaluator: SimpleEvaluator::new(dataset, base_iri),
} }
} }
@ -100,7 +104,7 @@ impl<S: StoreConnection> SimplePreparedQuery<S> {
dataset: _, dataset: _,
base_iri, base_iri,
} => { } => {
let (plan, _) = PlanBuilder::build(&dataset, &algebra)?; let (plan, _) = PlanBuilder::build(dataset.encoder(), &algebra)?;
SimplePreparedQueryOptions::Describe { SimplePreparedQueryOptions::Describe {
plan, plan,
evaluator: SimpleEvaluator::new(dataset, base_iri), evaluator: SimpleEvaluator::new(dataset, base_iri),

@ -1,9 +1,10 @@
use crate::sparql::eval::StringOrStoreString; use crate::sparql::eval::StringOrStoreString;
use crate::store::numeric_encoder::{ use crate::store::numeric_encoder::{
EncodedQuad, EncodedTerm, MemoryStrStore, StrContainer, StrLookup, EncodedQuad, EncodedTerm, Encoder, MemoryStrStore, StrContainer, StrLookup,
}; };
use crate::store::StoreConnection; use crate::store::StoreConnection;
use crate::Result; use crate::Result;
use std::cell::{RefCell, RefMut};
use std::collections::BTreeSet; use std::collections::BTreeSet;
pub type EncodedTuple = Vec<Option<EncodedTerm>>; pub type EncodedTuple = Vec<Option<EncodedTerm>>;
@ -460,14 +461,14 @@ pub enum TripleTemplateValue {
pub struct DatasetView<S: StoreConnection> { pub struct DatasetView<S: StoreConnection> {
store: S, store: S,
extra: MemoryStrStore, extra: RefCell<MemoryStrStore>,
} }
impl<S: StoreConnection> DatasetView<S> { impl<S: StoreConnection> DatasetView<S> {
pub fn new(store: S) -> Self { pub fn new(store: S) -> Self {
Self { Self {
store, store,
extra: MemoryStrStore::default(), extra: RefCell::new(MemoryStrStore::default()),
} }
} }
@ -481,13 +482,20 @@ impl<S: StoreConnection> DatasetView<S> {
self.store self.store
.quads_for_pattern(subject, predicate, object, graph_name) .quads_for_pattern(subject, predicate, object, graph_name)
} }
pub fn encoder<'a>(&'a self) -> impl Encoder + StrContainer + 'a {
DatasetViewStrContainer {
store: &self.store,
extra: self.extra.borrow_mut(),
}
}
} }
impl<S: StoreConnection> StrLookup for DatasetView<S> { impl<S: StoreConnection> StrLookup for DatasetView<S> {
type StrType = StringOrStoreString<S::StrType>; type StrType = StringOrStoreString<S::StrType>;
fn get_str(&self, id: u128) -> Result<Option<StringOrStoreString<S::StrType>>> { fn get_str(&self, id: u128) -> Result<Option<StringOrStoreString<S::StrType>>> {
Ok(if let Some(value) = self.extra.get_str(id)? { Ok(if let Some(value) = self.extra.borrow().get_str(id)? {
Some(StringOrStoreString::String(value)) Some(StringOrStoreString::String(value))
} else if let Some(value) = self.store.get_str(id)? { } else if let Some(value) = self.store.get_str(id)? {
Some(StringOrStoreString::Store(value)) Some(StringOrStoreString::Store(value))
@ -497,8 +505,13 @@ impl<S: StoreConnection> StrLookup for DatasetView<S> {
} }
} }
impl<S: StoreConnection> StrContainer for DatasetView<S> { struct DatasetViewStrContainer<'a, S: StoreConnection> {
fn insert_str(&self, key: u128, value: &str) -> Result<()> { store: &'a S,
extra: RefMut<'a, MemoryStrStore>,
}
impl<'a, S: StoreConnection> StrContainer for DatasetViewStrContainer<'a, S> {
fn insert_str(&mut self, key: u128, value: &str) -> Result<()> {
if self.store.get_str(key)?.is_none() { if self.store.get_str(key)?.is_none() {
self.extra.insert_str(key, value) self.extra.insert_str(key, value)
} else { } else {

@ -33,7 +33,7 @@ impl<E: Encoder> PlanBuilder<E> {
} }
fn build_for_graph_pattern( fn build_for_graph_pattern(
&self, &mut self,
pattern: &GraphPattern, pattern: &GraphPattern,
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
graph_name: PatternValue, graph_name: PatternValue,
@ -197,7 +197,7 @@ impl<E: Encoder> PlanBuilder<E> {
} }
fn build_for_bgp( fn build_for_bgp(
&self, &mut self,
p: &[TripleOrPathPattern], p: &[TripleOrPathPattern],
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
graph_name: PatternValue, graph_name: PatternValue,
@ -227,7 +227,7 @@ impl<E: Encoder> PlanBuilder<E> {
Ok(plan) Ok(plan)
} }
fn build_for_path(&self, path: &PropertyPath) -> Result<PlanPropertyPath> { fn build_for_path(&mut self, path: &PropertyPath) -> Result<PlanPropertyPath> {
Ok(match path { Ok(match path {
PropertyPath::PredicatePath(p) => { PropertyPath::PredicatePath(p) => {
PlanPropertyPath::PredicatePath(self.encoder.encode_named_node(p)?) PlanPropertyPath::PredicatePath(self.encoder.encode_named_node(p)?)
@ -261,7 +261,7 @@ impl<E: Encoder> PlanBuilder<E> {
} }
fn build_for_expression( fn build_for_expression(
&self, &mut self,
expression: &Expression, expression: &Expression,
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
graph_name: PatternValue, graph_name: PatternValue,
@ -650,7 +650,7 @@ impl<E: Encoder> PlanBuilder<E> {
} }
fn build_cast( fn build_cast(
&self, &mut self,
parameters: &[Expression], parameters: &[Expression],
constructor: impl Fn(Box<PlanExpression>) -> PlanExpression, constructor: impl Fn(Box<PlanExpression>) -> PlanExpression,
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
@ -672,7 +672,7 @@ impl<E: Encoder> PlanBuilder<E> {
} }
fn expression_list( fn expression_list(
&self, &mut self,
l: &[Expression], l: &[Expression],
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
graph_name: PatternValue, graph_name: PatternValue,
@ -683,7 +683,7 @@ impl<E: Encoder> PlanBuilder<E> {
} }
fn pattern_value_from_term_or_variable( fn pattern_value_from_term_or_variable(
&self, &mut self,
term_or_variable: &TermOrVariable, term_or_variable: &TermOrVariable,
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
) -> Result<PatternValue> { ) -> Result<PatternValue> {
@ -696,7 +696,7 @@ impl<E: Encoder> PlanBuilder<E> {
} }
fn pattern_value_from_named_node_or_variable( fn pattern_value_from_named_node_or_variable(
&self, &mut self,
named_node_or_variable: &NamedNodeOrVariable, named_node_or_variable: &NamedNodeOrVariable,
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
) -> Result<PatternValue> { ) -> Result<PatternValue> {
@ -711,7 +711,7 @@ impl<E: Encoder> PlanBuilder<E> {
} }
fn encode_bindings( fn encode_bindings(
&self, &mut self,
bindings: &StaticBindings, bindings: &StaticBindings,
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
) -> Result<Vec<EncodedTuple>> { ) -> Result<Vec<EncodedTuple>> {
@ -736,7 +736,7 @@ impl<E: Encoder> PlanBuilder<E> {
} }
fn build_for_aggregate( fn build_for_aggregate(
&self, &mut self,
aggregate: &Aggregation, aggregate: &Aggregation,
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
graph_name: PatternValue, graph_name: PatternValue,
@ -786,7 +786,7 @@ impl<E: Encoder> PlanBuilder<E> {
} }
fn build_for_graph_template( fn build_for_graph_template(
&self, &mut self,
template: &[TriplePattern], template: &[TriplePattern],
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
) -> Result<Vec<TripleTemplate>> { ) -> Result<Vec<TripleTemplate>> {
@ -816,7 +816,7 @@ impl<E: Encoder> PlanBuilder<E> {
} }
fn template_value_from_term_or_variable( fn template_value_from_term_or_variable(
&self, &mut self,
term_or_variable: &TermOrVariable, term_or_variable: &TermOrVariable,
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
bnodes: &mut Vec<Variable>, bnodes: &mut Vec<Variable>,
@ -836,7 +836,7 @@ impl<E: Encoder> PlanBuilder<E> {
} }
fn template_value_from_named_node_or_variable( fn template_value_from_named_node_or_variable(
&self, &mut self,
named_node_or_variable: &NamedNodeOrVariable, named_node_or_variable: &NamedNodeOrVariable,
variables: &mut Vec<Variable>, variables: &mut Vec<Variable>,
bnodes: &mut Vec<Variable>, bnodes: &mut Vec<Variable>,

@ -1,9 +1,10 @@
use crate::store::numeric_encoder::*; use crate::store::numeric_encoder::*;
use crate::store::*; use crate::store::*;
use crate::{Repository, Result}; use crate::{Repository, Result};
use failure::{Backtrace, Fail};
use std::collections::{BTreeMap, BTreeSet}; use std::collections::{BTreeMap, BTreeSet};
use std::iter::{empty, once}; use std::iter::{empty, once};
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use std::sync::{PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard};
/// Memory based implementation of the `Repository` trait. /// Memory based implementation of the `Repository` trait.
/// They are cheap to build using the `MemoryRepository::default()` method. /// They are cheap to build using the `MemoryRepository::default()` method.
@ -45,8 +46,7 @@ type QuadMap<T> = BTreeMap<T, TripleMap<T>>;
#[derive(Default)] #[derive(Default)]
pub struct MemoryStore { pub struct MemoryStore {
str_store: MemoryStrStore, indexes: RwLock<MemoryStoreIndexes>,
quad_indexes: RwLock<MemoryStoreIndexes>,
} }
#[derive(Default)] #[derive(Default)]
@ -57,6 +57,7 @@ struct MemoryStoreIndexes {
gspo: QuadMap<EncodedTerm>, gspo: QuadMap<EncodedTerm>,
gpos: QuadMap<EncodedTerm>, gpos: QuadMap<EncodedTerm>,
gosp: QuadMap<EncodedTerm>, gosp: QuadMap<EncodedTerm>,
str_store: MemoryStrStore,
} }
impl<'a> Repository for &'a MemoryRepository { impl<'a> Repository for &'a MemoryRepository {
@ -75,24 +76,24 @@ impl<'a> Store for &'a MemoryStore {
} }
} }
impl StrLookup for MemoryStore { impl<'a> StrLookup for &'a MemoryStore {
type StrType = String; type StrType = String;
fn get_str(&self, id: u128) -> Result<Option<String>> { fn get_str(&self, id: u128) -> Result<Option<String>> {
self.str_store.get_str(id) self.indexes()?.str_store.get_str(id)
} }
} }
impl StrContainer for MemoryStore { impl<'a> StrContainer for &'a MemoryStore {
fn insert_str(&self, key: u128, value: &str) -> Result<()> { fn insert_str(&mut self, key: u128, value: &str) -> Result<()> {
self.str_store.insert_str(key, value) self.indexes_mut()?.str_store.insert_str(key, value)
} }
} }
impl<'a> StoreConnection for &'a MemoryStore { impl<'a> StoreConnection for &'a MemoryStore {
fn contains(&self, quad: &EncodedQuad) -> Result<bool> { fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
Ok(self Ok(self
.quad_indexes()? .indexes()?
.spog .spog
.get(&quad.subject) .get(&quad.subject)
.map_or(false, |pog| { .map_or(false, |pog| {
@ -104,7 +105,7 @@ impl<'a> StoreConnection for &'a MemoryStore {
} }
fn insert(&mut self, quad: &EncodedQuad) -> Result<()> { fn insert(&mut self, quad: &EncodedQuad) -> Result<()> {
let mut quad_indexes = self.quad_indexes_mut()?; let mut quad_indexes = self.indexes_mut()?;
insert_into_quad_map( insert_into_quad_map(
&mut quad_indexes.gosp, &mut quad_indexes.gosp,
quad.graph_name, quad.graph_name,
@ -151,7 +152,7 @@ impl<'a> StoreConnection for &'a MemoryStore {
} }
fn remove(&mut self, quad: &EncodedQuad) -> Result<()> { fn remove(&mut self, quad: &EncodedQuad) -> Result<()> {
let mut quad_indexes = self.quad_indexes_mut()?; let mut quad_indexes = self.indexes_mut()?;
remove_from_quad_map( remove_from_quad_map(
&mut quad_indexes.gosp, &mut quad_indexes.gosp,
&quad.graph_name, &quad.graph_name,
@ -275,16 +276,16 @@ impl<'a> StoreConnection for &'a MemoryStore {
} }
impl MemoryStore { impl MemoryStore {
fn quad_indexes(&self) -> Result<RwLockReadGuard<'_, MemoryStoreIndexes>> { fn indexes(&self) -> Result<RwLockReadGuard<'_, MemoryStoreIndexes>> {
Ok(self.quad_indexes.read().map_err(MutexPoisonError::from)?) Ok(self.indexes.read().map_err(MutexPoisonError::from)?)
} }
fn quad_indexes_mut(&self) -> Result<RwLockWriteGuard<'_, MemoryStoreIndexes>> { fn indexes_mut(&self) -> Result<RwLockWriteGuard<'_, MemoryStoreIndexes>> {
Ok(self.quad_indexes.write().map_err(MutexPoisonError::from)?) Ok(self.indexes.write().map_err(MutexPoisonError::from)?)
} }
fn quads<'a>(&'a self) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> { fn quads<'a>(&'a self) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
Ok(quad_map_flatten(&self.quad_indexes()?.gspo) Ok(quad_map_flatten(&self.indexes()?.gspo)
.map(|(g, s, p, o)| Ok(EncodedQuad::new(s, p, o, g))) .map(|(g, s, p, o)| Ok(EncodedQuad::new(s, p, o, g)))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.into_iter()) .into_iter())
@ -295,7 +296,7 @@ impl MemoryStore {
subject: EncodedTerm, subject: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok( Ok(
option_triple_map_flatten(self.quad_indexes()?.spog.get(&subject)) option_triple_map_flatten(self.indexes()?.spog.get(&subject))
.map(|(p, o, g)| Ok(EncodedQuad::new(subject, p, o, g))) .map(|(p, o, g)| Ok(EncodedQuad::new(subject, p, o, g)))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.into_iter(), .into_iter(),
@ -308,7 +309,7 @@ impl MemoryStore {
predicate: EncodedTerm, predicate: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok(option_pair_map_flatten( Ok(option_pair_map_flatten(
self.quad_indexes()? self.indexes()?
.spog .spog
.get(&subject) .get(&subject)
.and_then(|pog| pog.get(&predicate)), .and_then(|pog| pog.get(&predicate)),
@ -325,7 +326,7 @@ impl MemoryStore {
object: EncodedTerm, object: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok(option_set_flatten( Ok(option_set_flatten(
self.quad_indexes()? self.indexes()?
.spog .spog
.get(&subject) .get(&subject)
.and_then(|pog| pog.get(&predicate)) .and_then(|pog| pog.get(&predicate))
@ -342,7 +343,7 @@ impl MemoryStore {
object: EncodedTerm, object: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok(option_pair_map_flatten( Ok(option_pair_map_flatten(
self.quad_indexes()? self.indexes()?
.ospg .ospg
.get(&object) .get(&object)
.and_then(|spg| spg.get(&subject)), .and_then(|spg| spg.get(&subject)),
@ -357,7 +358,7 @@ impl MemoryStore {
predicate: EncodedTerm, predicate: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok( Ok(
option_triple_map_flatten(self.quad_indexes()?.posg.get(&predicate)) option_triple_map_flatten(self.indexes()?.posg.get(&predicate))
.map(|(o, s, g)| Ok(EncodedQuad::new(s, predicate, o, g))) .map(|(o, s, g)| Ok(EncodedQuad::new(s, predicate, o, g)))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.into_iter(), .into_iter(),
@ -370,7 +371,7 @@ impl MemoryStore {
object: EncodedTerm, object: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok(option_pair_map_flatten( Ok(option_pair_map_flatten(
self.quad_indexes()? self.indexes()?
.posg .posg
.get(&predicate) .get(&predicate)
.and_then(|osg| osg.get(&object)), .and_then(|osg| osg.get(&object)),
@ -384,12 +385,10 @@ impl MemoryStore {
&self, &self,
object: EncodedTerm, object: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok( Ok(option_triple_map_flatten(self.indexes()?.ospg.get(&object))
option_triple_map_flatten(self.quad_indexes()?.ospg.get(&object))
.map(|(s, p, g)| Ok(EncodedQuad::new(s, p, object, g))) .map(|(s, p, g)| Ok(EncodedQuad::new(s, p, object, g)))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.into_iter(), .into_iter())
)
} }
fn quads_for_graph( fn quads_for_graph(
@ -397,7 +396,7 @@ impl MemoryStore {
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok( Ok(
option_triple_map_flatten(self.quad_indexes()?.gspo.get(&graph_name)) option_triple_map_flatten(self.indexes()?.gspo.get(&graph_name))
.map(|(s, p, o)| Ok(EncodedQuad::new(s, p, o, graph_name))) .map(|(s, p, o)| Ok(EncodedQuad::new(s, p, o, graph_name)))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.into_iter(), .into_iter(),
@ -410,7 +409,7 @@ impl MemoryStore {
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok(option_pair_map_flatten( Ok(option_pair_map_flatten(
self.quad_indexes()? self.indexes()?
.gspo .gspo
.get(&graph_name) .get(&graph_name)
.and_then(|spo| spo.get(&subject)), .and_then(|spo| spo.get(&subject)),
@ -427,7 +426,7 @@ impl MemoryStore {
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok(option_set_flatten( Ok(option_set_flatten(
self.quad_indexes()? self.indexes()?
.gspo .gspo
.get(&graph_name) .get(&graph_name)
.and_then(|spo| spo.get(&subject)) .and_then(|spo| spo.get(&subject))
@ -445,7 +444,7 @@ impl MemoryStore {
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok(option_set_flatten( Ok(option_set_flatten(
self.quad_indexes()? self.indexes()?
.gosp .gosp
.get(&graph_name) .get(&graph_name)
.and_then(|osp| osp.get(&object)) .and_then(|osp| osp.get(&object))
@ -462,7 +461,7 @@ impl MemoryStore {
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok(option_pair_map_flatten( Ok(option_pair_map_flatten(
self.quad_indexes()? self.indexes()?
.gpos .gpos
.get(&graph_name) .get(&graph_name)
.and_then(|pos| pos.get(&predicate)), .and_then(|pos| pos.get(&predicate)),
@ -479,7 +478,7 @@ impl MemoryStore {
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok(option_set_flatten( Ok(option_set_flatten(
self.quad_indexes()? self.indexes()?
.gpos .gpos
.get(&graph_name) .get(&graph_name)
.and_then(|pos| pos.get(&predicate)) .and_then(|pos| pos.get(&predicate))
@ -496,7 +495,7 @@ impl MemoryStore {
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok(option_pair_map_flatten( Ok(option_pair_map_flatten(
self.quad_indexes()? self.indexes()?
.gosp .gosp
.get(&graph_name) .get(&graph_name)
.and_then(|osp| osp.get(&object)), .and_then(|osp| osp.get(&object)),
@ -592,3 +591,17 @@ fn quad_map_flatten<'a, T: Copy>(gspo: &'a QuadMap<T>) -> impl Iterator<Item = (
}) })
}) })
} }
#[derive(Debug, Fail)]
#[fail(display = "Mutex Mutex was poisoned")]
pub struct MutexPoisonError {
backtrace: Backtrace,
}
impl<T> From<PoisonError<T>> for MutexPoisonError {
fn from(_: PoisonError<T>) -> Self {
Self {
backtrace: Backtrace::new(),
}
}
}

@ -123,11 +123,13 @@ impl<S: StoreConnection> RepositoryConnection for StoreRepositoryConnection<S> {
} }
fn insert(&mut self, quad: &Quad) -> Result<()> { fn insert(&mut self, quad: &Quad) -> Result<()> {
self.inner.insert(&self.inner.encode_quad(quad)?) let quad = self.inner.encode_quad(quad)?;
self.inner.insert(&quad)
} }
fn remove(&mut self, quad: &Quad) -> Result<()> { fn remove(&mut self, quad: &Quad) -> Result<()> {
self.inner.remove(&self.inner.encode_quad(quad)?) let quad = self.inner.encode_quad(quad)?;
self.inner.remove(&quad)
} }
} }
@ -147,11 +149,10 @@ impl<S: StoreConnection> StoreRepositoryConnection<S> {
EncodedTerm::DefaultGraph EncodedTerm::DefaultGraph
}; };
parser.parse_all(&mut move |t| { parser.parse_all(&mut move |t| {
self.inner.insert(&self.inner.encode_rio_triple_in_graph( let quad = self
t, .inner
graph_name, .encode_rio_triple_in_graph(t, graph_name, &mut bnode_map)?;
&mut bnode_map, self.inner.insert(&quad)
)?)
})?; })?;
Ok(()) Ok(())
} }
@ -162,8 +163,8 @@ impl<S: StoreConnection> StoreRepositoryConnection<S> {
{ {
let mut bnode_map = HashMap::default(); let mut bnode_map = HashMap::default();
parser.parse_all(&mut move |q| { parser.parse_all(&mut move |q| {
self.inner let quad = self.inner.encode_rio_quad(q, &mut bnode_map)?;
.insert(&self.inner.encode_rio_quad(q, &mut bnode_map)?) self.inner.insert(&quad)
})?; })?;
Ok(()) Ok(())
} }

@ -6,8 +6,6 @@ use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use chrono::format::{parse, Parsed, StrftimeItems}; use chrono::format::{parse, Parsed, StrftimeItems};
use chrono::prelude::*; use chrono::prelude::*;
use failure::format_err; use failure::format_err;
use failure::Backtrace;
use failure::Fail;
use md5::digest::Digest; use md5::digest::Digest;
use md5::Md5; use md5::Md5;
use ordered_float::OrderedFloat; use ordered_float::OrderedFloat;
@ -20,8 +18,6 @@ use std::io::Write;
use std::mem::size_of; use std::mem::size_of;
use std::ops::Deref; use std::ops::Deref;
use std::str; use std::str;
use std::sync::PoisonError;
use std::sync::RwLock;
const EMPTY_STRING_ID: u128 = 167830467844043968176572005485231480276; const EMPTY_STRING_ID: u128 = 167830467844043968176572005485231480276;
const RDF_LANG_STRING_ID: u128 = 32982328051974780078994098831023510434; const RDF_LANG_STRING_ID: u128 = 32982328051974780078994098831023510434;
@ -703,10 +699,10 @@ pub trait StrLookup {
} }
pub trait StrContainer { pub trait StrContainer {
fn insert_str(&self, key: u128, value: &str) -> Result<()>; fn insert_str(&mut self, key: u128, value: &str) -> Result<()>;
/// Should be called when the bytes store is created /// Should be called when the bytes store is created
fn set_first_strings(&self) -> Result<()> { fn set_first_strings(&mut self) -> Result<()> {
self.insert_str(EMPTY_STRING_ID, "")?; self.insert_str(EMPTY_STRING_ID, "")?;
self.insert_str(RDF_LANG_STRING_ID, rdf::LANG_STRING.as_str())?; self.insert_str(RDF_LANG_STRING_ID, rdf::LANG_STRING.as_str())?;
self.insert_str(XSD_STRING_ID, xsd::STRING.as_str())?; self.insert_str(XSD_STRING_ID, xsd::STRING.as_str())?;
@ -722,28 +718,14 @@ pub trait StrContainer {
} }
} }
impl<'a, S: StrLookup + 'a> StrLookup for &'a S {
type StrType = S::StrType;
fn get_str(&self, id: u128) -> Result<Option<S::StrType>> {
(*self).get_str(id)
}
}
impl<'a, S: StrContainer + 'a> StrContainer for &'a S {
fn insert_str(&self, key: u128, value: &str) -> Result<()> {
(*self).insert_str(key, value)
}
}
pub struct MemoryStrStore { pub struct MemoryStrStore {
id2str: RwLock<HashMap<u128, String>>, id2str: HashMap<u128, String>,
} }
impl Default for MemoryStrStore { impl Default for MemoryStrStore {
fn default() -> Self { fn default() -> Self {
let new = Self { let mut new = Self {
id2str: RwLock::default(), id2str: HashMap::default(),
}; };
new.set_first_strings().unwrap(); new.set_first_strings().unwrap();
new new
@ -755,82 +737,19 @@ impl StrLookup for MemoryStrStore {
fn get_str(&self, id: u128) -> Result<Option<String>> { fn get_str(&self, id: u128) -> Result<Option<String>> {
//TODO: avoid copy by adding a lifetime limit to get_str //TODO: avoid copy by adding a lifetime limit to get_str
Ok(self Ok(self.id2str.get(&id).cloned())
.id2str
.read()
.map_err(MutexPoisonError::from)?
.get(&id)
.cloned())
} }
} }
impl StrContainer for MemoryStrStore { impl StrContainer for MemoryStrStore {
fn insert_str(&self, key: u128, value: &str) -> Result<()> { fn insert_str(&mut self, key: u128, value: &str) -> Result<()> {
let mut id2str = self.id2str.write().map_err(MutexPoisonError::from)?; self.id2str.entry(key).or_insert_with(|| value.to_owned());
if !id2str.contains_key(&key) {
id2str.insert(key, value.to_owned());
}
Ok(()) Ok(())
} }
} }
pub trait Encoder { pub trait Encoder {
fn encode_named_node(&self, named_node: &NamedNode) -> Result<EncodedTerm>; fn encode_named_node(&mut self, named_node: &NamedNode) -> Result<EncodedTerm> {
fn encode_blank_node(&self, blank_node: &BlankNode) -> Result<EncodedTerm>;
fn encode_literal(&self, literal: &Literal) -> Result<EncodedTerm>;
fn encode_named_or_blank_node(&self, term: &NamedOrBlankNode) -> Result<EncodedTerm>;
fn encode_term(&self, term: &Term) -> Result<EncodedTerm>;
fn encode_quad(&self, quad: &Quad) -> Result<EncodedQuad>;
fn encode_triple_in_graph(
&self,
triple: &Triple,
graph_name: EncodedTerm,
) -> Result<EncodedQuad>;
fn encode_rio_named_node(&self, named_node: rio::NamedNode) -> Result<EncodedTerm>;
fn encode_rio_blank_node(
&self,
blank_node: rio::BlankNode,
bnodes_map: &mut HashMap<String, u128>,
) -> Result<EncodedTerm>;
fn encode_rio_literal(&self, literal: rio::Literal) -> Result<EncodedTerm>;
fn encode_rio_named_or_blank_node(
&self,
term: rio::NamedOrBlankNode,
bnodes_map: &mut HashMap<String, u128>,
) -> Result<EncodedTerm>;
fn encode_rio_term(
&self,
term: rio::Term,
bnodes_map: &mut HashMap<String, u128>,
) -> Result<EncodedTerm>;
fn encode_rio_quad(
&self,
quad: rio::Quad,
bnodes_map: &mut HashMap<String, u128>,
) -> Result<EncodedQuad>;
fn encode_rio_triple_in_graph(
&self,
triple: rio::Triple,
graph_name: EncodedTerm,
bnodes_map: &mut HashMap<String, u128>,
) -> Result<EncodedQuad>;
}
impl<S: StrContainer> Encoder for S {
fn encode_named_node(&self, named_node: &NamedNode) -> Result<EncodedTerm> {
self.encode_rio_named_node(named_node.into()) self.encode_rio_named_node(named_node.into())
} }
@ -838,18 +757,18 @@ impl<S: StrContainer> Encoder for S {
Ok(blank_node.into()) Ok(blank_node.into())
} }
fn encode_literal(&self, literal: &Literal) -> Result<EncodedTerm> { fn encode_literal(&mut self, literal: &Literal) -> Result<EncodedTerm> {
self.encode_rio_literal(literal.into()) self.encode_rio_literal(literal.into())
} }
fn encode_named_or_blank_node(&self, term: &NamedOrBlankNode) -> Result<EncodedTerm> { fn encode_named_or_blank_node(&mut self, term: &NamedOrBlankNode) -> Result<EncodedTerm> {
match term { match term {
NamedOrBlankNode::NamedNode(named_node) => self.encode_named_node(named_node), NamedOrBlankNode::NamedNode(named_node) => self.encode_named_node(named_node),
NamedOrBlankNode::BlankNode(blank_node) => self.encode_blank_node(blank_node), NamedOrBlankNode::BlankNode(blank_node) => self.encode_blank_node(blank_node),
} }
} }
fn encode_term(&self, term: &Term) -> Result<EncodedTerm> { fn encode_term(&mut self, term: &Term) -> Result<EncodedTerm> {
match term { match term {
Term::NamedNode(named_node) => self.encode_named_node(named_node), Term::NamedNode(named_node) => self.encode_named_node(named_node),
Term::BlankNode(blank_node) => self.encode_blank_node(blank_node), Term::BlankNode(blank_node) => self.encode_blank_node(blank_node),
@ -857,7 +776,7 @@ impl<S: StrContainer> Encoder for S {
} }
} }
fn encode_quad(&self, quad: &Quad) -> Result<EncodedQuad> { fn encode_quad(&mut self, quad: &Quad) -> Result<EncodedQuad> {
Ok(EncodedQuad { Ok(EncodedQuad {
subject: self.encode_named_or_blank_node(quad.subject())?, subject: self.encode_named_or_blank_node(quad.subject())?,
predicate: self.encode_named_node(quad.predicate())?, predicate: self.encode_named_node(quad.predicate())?,
@ -870,7 +789,7 @@ impl<S: StrContainer> Encoder for S {
} }
fn encode_triple_in_graph( fn encode_triple_in_graph(
&self, &mut self,
triple: &Triple, triple: &Triple,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<EncodedQuad> { ) -> Result<EncodedQuad> {
@ -882,14 +801,81 @@ impl<S: StrContainer> Encoder for S {
}) })
} }
fn encode_rio_named_node(&self, named_node: rio::NamedNode) -> Result<EncodedTerm> { fn encode_rio_named_node(&mut self, named_node: rio::NamedNode) -> Result<EncodedTerm>;
fn encode_rio_blank_node(
&mut self,
blank_node: rio::BlankNode,
bnodes_map: &mut HashMap<String, u128>,
) -> Result<EncodedTerm>;
fn encode_rio_literal(&mut self, literal: rio::Literal) -> Result<EncodedTerm>;
fn encode_rio_named_or_blank_node(
&mut self,
term: rio::NamedOrBlankNode,
bnodes_map: &mut HashMap<String, u128>,
) -> Result<EncodedTerm> {
match term {
rio::NamedOrBlankNode::NamedNode(named_node) => self.encode_rio_named_node(named_node),
rio::NamedOrBlankNode::BlankNode(blank_node) => {
self.encode_rio_blank_node(blank_node, bnodes_map)
}
}
}
fn encode_rio_term(
&mut self,
term: rio::Term,
bnodes_map: &mut HashMap<String, u128>,
) -> Result<EncodedTerm> {
match term {
rio::Term::NamedNode(named_node) => self.encode_rio_named_node(named_node),
rio::Term::BlankNode(blank_node) => self.encode_rio_blank_node(blank_node, bnodes_map),
rio::Term::Literal(literal) => self.encode_rio_literal(literal),
}
}
fn encode_rio_quad(
&mut self,
quad: rio::Quad,
bnodes_map: &mut HashMap<String, u128>,
) -> Result<EncodedQuad> {
Ok(EncodedQuad {
subject: self.encode_rio_named_or_blank_node(quad.subject, bnodes_map)?,
predicate: self.encode_rio_named_node(quad.predicate)?,
object: self.encode_rio_term(quad.object, bnodes_map)?,
graph_name: match quad.graph_name {
Some(graph_name) => self.encode_rio_named_or_blank_node(graph_name, bnodes_map)?,
None => ENCODED_DEFAULT_GRAPH,
},
})
}
fn encode_rio_triple_in_graph(
&mut self,
triple: rio::Triple,
graph_name: EncodedTerm,
bnodes_map: &mut HashMap<String, u128>,
) -> Result<EncodedQuad> {
Ok(EncodedQuad {
subject: self.encode_rio_named_or_blank_node(triple.subject, bnodes_map)?,
predicate: self.encode_rio_named_node(triple.predicate)?,
object: self.encode_rio_term(triple.object, bnodes_map)?,
graph_name,
})
}
}
impl<S: StrContainer> Encoder for S {
fn encode_rio_named_node(&mut self, named_node: rio::NamedNode) -> Result<EncodedTerm> {
let iri_id = get_str_id(named_node.iri); let iri_id = get_str_id(named_node.iri);
self.insert_str(iri_id, named_node.iri)?; self.insert_str(iri_id, named_node.iri)?;
Ok(EncodedTerm::NamedNode { iri_id }) Ok(EncodedTerm::NamedNode { iri_id })
} }
fn encode_rio_blank_node( fn encode_rio_blank_node(
&self, &mut self,
blank_node: rio::BlankNode, blank_node: rio::BlankNode,
bnodes_map: &mut HashMap<String, u128>, bnodes_map: &mut HashMap<String, u128>,
) -> Result<EncodedTerm> { ) -> Result<EncodedTerm> {
@ -902,7 +888,7 @@ impl<S: StrContainer> Encoder for S {
}) })
} }
fn encode_rio_literal(&self, literal: rio::Literal) -> Result<EncodedTerm> { fn encode_rio_literal(&mut self, literal: rio::Literal) -> Result<EncodedTerm> {
Ok(match literal { Ok(match literal {
rio::Literal::Simple { value } => { rio::Literal::Simple { value } => {
let value_id = get_str_id(value); let value_id = get_str_id(value);
@ -978,61 +964,6 @@ impl<S: StrContainer> Encoder for S {
} }
}) })
} }
/// Encodes a term that is either a named node or a blank node.
///
/// Named nodes are forwarded to `encode_rio_named_node`; blank nodes are forwarded to
/// `encode_rio_blank_node` together with `bnodes_map`. The result of the delegate is
/// returned unchanged.
fn encode_rio_named_or_blank_node(
    &self,
    term: rio::NamedOrBlankNode,
    bnodes_map: &mut HashMap<String, u128>,
) -> Result<EncodedTerm> {
    match term {
        rio::NamedOrBlankNode::BlankNode(bnode) => self.encode_rio_blank_node(bnode, bnodes_map),
        rio::NamedOrBlankNode::NamedNode(iri) => self.encode_rio_named_node(iri),
    }
}
/// Encodes any rio term by dispatching on its variant.
///
/// Named nodes and literals are encoded on their own; blank nodes additionally receive
/// `bnodes_map`. The result of the delegate is returned unchanged.
fn encode_rio_term(
    &self,
    term: rio::Term,
    bnodes_map: &mut HashMap<String, u128>,
) -> Result<EncodedTerm> {
    match term {
        rio::Term::Literal(lit) => self.encode_rio_literal(lit),
        rio::Term::BlankNode(bnode) => self.encode_rio_blank_node(bnode, bnodes_map),
        rio::Term::NamedNode(iri) => self.encode_rio_named_node(iri),
    }
}
/// Encodes a rio quad into an `EncodedQuad`.
///
/// Components are encoded in the order subject, predicate, object, graph name, and the
/// first failure is returned immediately. A quad without a graph name is placed in
/// `ENCODED_DEFAULT_GRAPH`.
fn encode_rio_quad(
    &self,
    quad: rio::Quad,
    bnodes_map: &mut HashMap<String, u128>,
) -> Result<EncodedQuad> {
    // Keep this order: every step may touch the string store and `bnodes_map`.
    let subject = self.encode_rio_named_or_blank_node(quad.subject, bnodes_map)?;
    let predicate = self.encode_rio_named_node(quad.predicate)?;
    let object = self.encode_rio_term(quad.object, bnodes_map)?;
    let graph_name = if let Some(graph) = quad.graph_name {
        self.encode_rio_named_or_blank_node(graph, bnodes_map)?
    } else {
        ENCODED_DEFAULT_GRAPH
    };
    Ok(EncodedQuad {
        subject,
        predicate,
        object,
        graph_name,
    })
}
/// Encodes a rio triple as an `EncodedQuad` located in the already-encoded `graph_name`.
///
/// Subject, predicate and object are encoded in that order; the first failure is
/// returned immediately. `graph_name` is stored as given.
fn encode_rio_triple_in_graph(
    &self,
    triple: rio::Triple,
    graph_name: EncodedTerm,
    bnodes_map: &mut HashMap<String, u128>,
) -> Result<EncodedQuad> {
    let subject = self.encode_rio_named_or_blank_node(triple.subject, bnodes_map)?;
    let predicate = self.encode_rio_named_node(triple.predicate)?;
    let object = self.encode_rio_term(triple.object, bnodes_map)?;
    Ok(EncodedQuad {
        subject,
        predicate,
        object,
        graph_name,
    })
}
} }
pub fn parse_boolean_str(value: &str) -> Option<EncodedTerm> { pub fn parse_boolean_str(value: &str) -> Option<EncodedTerm> {
@ -1190,7 +1121,7 @@ impl<S: StrLookup> Decoder for S {
} }
} }
fn get_required_str<S: StrLookup>(lookup: S, id: u128) -> Result<S::StrType> { fn get_required_str<S: StrLookup>(lookup: &S, id: u128) -> Result<S::StrType> {
lookup.get_str(id)?.ok_or_else(|| { lookup.get_str(id)?.ok_or_else(|| {
format_err!( format_err!(
"Not able to find the string with id {} in the string store", "Not able to find the string with id {} in the string store",
@ -1199,23 +1130,9 @@ fn get_required_str<S: StrLookup>(lookup: S, id: u128) -> Result<S::StrType> {
}) })
} }
/// Error reported when a lock turns out to be poisoned.
///
/// It keeps only a backtrace; the poisoned guard itself is not retained.
#[derive(Debug, Fail)]
// The message previously read "Mutex Mutex was poisoned" (duplicated word).
#[fail(display = "Mutex was poisoned")]
pub struct MutexPoisonError {
    /// Backtrace stored with the error.
    backtrace: Backtrace,
}
impl<T> From<PoisonError<T>> for MutexPoisonError {
fn from(_: PoisonError<T>) -> Self {
Self {
backtrace: Backtrace::new(),
}
}
}
#[test] #[test]
fn test_encoding() { fn test_encoding() {
let store = MemoryStrStore::default(); let mut store = MemoryStrStore::default();
let terms: Vec<Term> = vec![ let terms: Vec<Term> = vec![
NamedNode::new_from_string("http://foo.com").into(), NamedNode::new_from_string("http://foo.com").into(),
NamedNode::new_from_string("http://bar.com").into(), NamedNode::new_from_string("http://bar.com").into(),

@ -146,7 +146,7 @@ impl StrLookup for RocksDbStoreConnection<'_> {
} }
impl StrContainer for RocksDbStoreConnection<'_> { impl StrContainer for RocksDbStoreConnection<'_> {
fn insert_str(&self, key: u128, value: &str) -> Result<()> { fn insert_str(&mut self, key: u128, value: &str) -> Result<()> {
self.store self.store
.db .db
.put_cf(self.id2str_cf, &key.to_le_bytes(), value)?; .put_cf(self.id2str_cf, &key.to_le_bytes(), value)?;

Loading…
Cancel
Save