Avoids to put in Repository StringStore temporary SPARQL execution strings

pull/10/head
Tpt 5 years ago
parent 153eeb1033
commit 78a2d59e19
  1. 179
      lib/src/sparql/eval.rs
  2. 12
      lib/src/store/memory.rs
  3. 54
      lib/src/store/numeric_encoder.rs
  4. 56
      lib/src/store/rocksdb.rs

@ -2,6 +2,7 @@ use crate::model::BlankNode;
use crate::model::Triple;
use crate::sparql::model::*;
use crate::sparql::plan::*;
use crate::store::numeric_encoder::MemoryStringStore;
use crate::store::numeric_encoder::*;
use crate::store::StoreConnection;
use crate::Result;
@ -18,8 +19,10 @@ use std::collections::BTreeMap;
use std::collections::HashSet;
use std::iter::once;
use std::iter::Iterator;
use std::ops::Deref;
use std::sync::Arc;
use std::sync::Mutex;
use std::u64;
use uuid::Uuid;
const REGEX_SIZE_LIMIT: usize = 1_000_000;
@ -28,14 +31,14 @@ type EncodedTuplesIterator<'a> = Box<dyn Iterator<Item = Result<EncodedTuple>> +
#[derive(Clone)]
pub struct SimpleEvaluator<S: StoreConnection> {
store: S,
dataset: DatasetView<S>,
bnodes_map: Arc<Mutex<BTreeMap<u64, BlankNode>>>,
}
impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
pub fn new(store: S) -> Self {
pub fn new(dataset: S) -> Self {
Self {
store,
dataset: DatasetView::new(dataset),
bnodes_map: Arc::new(Mutex::new(BTreeMap::default())),
}
}
@ -74,7 +77,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
'a: 'b,
{
Ok(QueryResult::Graph(Box::new(ConstructIterator {
store: self.store.clone(),
dataset: self.dataset.clone(),
iter: self.eval_plan(plan, vec![]),
template: construct,
buffered_results: Vec::default(),
@ -87,7 +90,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
'a: 'b,
{
Ok(QueryResult::Graph(Box::new(DescribeIterator {
store: self.store.clone(),
dataset: self.dataset.clone(),
iter: self.eval_plan(plan, vec![]),
quads: Vec::default(),
})))
@ -110,7 +113,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
self.eval_plan(&*child, from)
.flat_map(move |tuple| match tuple {
Ok(tuple) => {
let mut iter = self.store.quads_for_pattern(
let mut iter = self.dataset.quads_for_pattern(
get_pattern_value(&subject, &tuple),
get_pattern_value(&predicate, &tuple),
get_pattern_value(&object, &tuple),
@ -528,13 +531,13 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
},
PlanExpression::UUID() => Some(EncodedTerm::NamedNode {
iri_id: self
.store
.dataset
.insert_str(&Uuid::new_v4().to_urn().to_string())
.ok()?,
}),
PlanExpression::StrUUID() => Some(EncodedTerm::StringLiteral {
value_id: self
.store
.dataset
.insert_str(&Uuid::new_v4().to_simple().to_string())
.ok()?,
}),
@ -638,7 +641,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
PlanExpression::BooleanCast(e) => match self.eval_expression(e, tuple)? {
EncodedTerm::BooleanLiteral(value) => Some(value.into()),
EncodedTerm::StringLiteral { value_id } => {
match &*self.store.get_str(value_id).ok()? {
match &*self.dataset.get_str(value_id).ok()?? {
"true" | "1" => Some(true.into()),
"false" | "0" => Some(false.into()),
_ => None,
@ -655,7 +658,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
Some(if value { 1. as f64 } else { 0. }.into())
}
EncodedTerm::StringLiteral { value_id } => Some(EncodedTerm::DoubleLiteral(
OrderedFloat(self.store.get_str(value_id).ok()?.parse().ok()?),
OrderedFloat(self.dataset.get_str(value_id).ok()??.parse().ok()?),
)),
_ => None,
},
@ -668,7 +671,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
Some(if value { 1. as f32 } else { 0. }.into())
}
EncodedTerm::StringLiteral { value_id } => Some(EncodedTerm::FloatLiteral(
OrderedFloat(self.store.get_str(value_id).ok()?.parse().ok()?),
OrderedFloat(self.dataset.get_str(value_id).ok()??.parse().ok()?),
)),
_ => None,
},
@ -679,7 +682,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
EncodedTerm::DecimalLiteral(value) => Some(value.to_i128()?.into()),
EncodedTerm::BooleanLiteral(value) => Some(if value { 1 } else { 0 }.into()),
EncodedTerm::StringLiteral { value_id } => Some(EncodedTerm::IntegerLiteral(
self.store.get_str(value_id).ok()?.parse().ok()?,
self.dataset.get_str(value_id).ok()??.parse().ok()?,
)),
_ => None,
},
@ -697,7 +700,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
.into(),
),
EncodedTerm::StringLiteral { value_id } => Some(EncodedTerm::DecimalLiteral(
self.store.get_str(value_id).ok()?.parse().ok()?,
self.dataset.get_str(value_id).ok()??.parse().ok()?,
)),
_ => None,
},
@ -706,7 +709,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
EncodedTerm::DateTime(value) => Some(value.date().naive_utc().into()), //TODO: use date with timezone
EncodedTerm::NaiveDateTime(value) => Some(value.date().into()),
EncodedTerm::StringLiteral { value_id } => {
let value = self.store.get_str(value_id).ok()?;
let value = self.dataset.get_str(value_id).ok()??;
Some(NaiveDate::parse_from_str(&value, "%Y-%m-%d").ok()?.into())
}
_ => None,
@ -716,7 +719,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
EncodedTerm::DateTime(value) => Some(value.time().into()),
EncodedTerm::NaiveDateTime(value) => Some(value.time().into()),
EncodedTerm::StringLiteral { value_id } => {
let value = self.store.get_str(value_id).ok()?;
let value = self.dataset.get_str(value_id).ok()??;
Some(NaiveTime::parse_from_str(&value, "%H:%M:%S").ok()?.into())
}
_ => None,
@ -725,7 +728,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
EncodedTerm::DateTime(value) => Some(value.into()),
EncodedTerm::NaiveDateTime(value) => Some(value.into()),
EncodedTerm::StringLiteral { value_id } => {
let value = self.store.get_str(value_id).ok()?;
let value = self.dataset.get_str(value_id).ok()??;
Some(match DateTime::parse_from_rfc3339(&value) {
Ok(value) => value.into(),
Err(_) => NaiveDateTime::parse_from_str(&value, "%Y-%m-%dT%H:%M:%S")
@ -762,24 +765,27 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
| EncodedTerm::LangStringLiteral { value_id, .. }
| EncodedTerm::TypedLiteral { value_id, .. } => Some(value_id),
EncodedTerm::BooleanLiteral(value) => self
.store
.dataset
.insert_str(if value { "true" } else { "false" })
.ok(),
EncodedTerm::FloatLiteral(value) => self.store.insert_str(&value.to_string()).ok(),
EncodedTerm::DoubleLiteral(value) => self.store.insert_str(&value.to_string()).ok(),
EncodedTerm::IntegerLiteral(value) => self.store.insert_str(&value.to_string()).ok(),
EncodedTerm::DecimalLiteral(value) => self.store.insert_str(&value.to_string()).ok(),
EncodedTerm::Date(value) => self.store.insert_str(&value.to_string()).ok(),
EncodedTerm::NaiveDate(value) => self.store.insert_str(&value.to_string()).ok(),
EncodedTerm::NaiveTime(value) => self.store.insert_str(&value.to_string()).ok(),
EncodedTerm::DateTime(value) => self.store.insert_str(&value.to_string()).ok(),
EncodedTerm::NaiveDateTime(value) => self.store.insert_str(&value.to_string()).ok(),
EncodedTerm::FloatLiteral(value) => self.dataset.insert_str(&value.to_string()).ok(),
EncodedTerm::DoubleLiteral(value) => self.dataset.insert_str(&value.to_string()).ok(),
EncodedTerm::IntegerLiteral(value) => self.dataset.insert_str(&value.to_string()).ok(),
EncodedTerm::DecimalLiteral(value) => self.dataset.insert_str(&value.to_string()).ok(),
EncodedTerm::Date(value) => self.dataset.insert_str(&value.to_string()).ok(),
EncodedTerm::NaiveDate(value) => self.dataset.insert_str(&value.to_string()).ok(),
EncodedTerm::NaiveTime(value) => self.dataset.insert_str(&value.to_string()).ok(),
EncodedTerm::DateTime(value) => self.dataset.insert_str(&value.to_string()).ok(),
EncodedTerm::NaiveDateTime(value) => self.dataset.insert_str(&value.to_string()).ok(),
}
}
fn to_simple_string(&self, term: EncodedTerm) -> Option<S::StringType> {
fn to_simple_string(
&self,
term: EncodedTerm,
) -> Option<<DatasetView<S> as StringStore>::StringType> {
if let EncodedTerm::StringLiteral { value_id } = term {
Some(self.store.get_str(value_id).ok()?)
self.dataset.get_str(value_id).ok()?
} else {
None
}
@ -793,11 +799,11 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
}
}
fn to_string(&self, term: EncodedTerm) -> Option<S::StringType> {
fn to_string(&self, term: EncodedTerm) -> Option<<DatasetView<S> as StringStore>::StringType> {
match term {
EncodedTerm::StringLiteral { value_id }
| EncodedTerm::LangStringLiteral { value_id, .. } => {
Some(self.store.get_str(value_id).ok()?)
self.dataset.get_str(value_id).ok()?
}
_ => None,
}
@ -873,11 +879,11 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
where
'a: 'b,
{
let store = self.store.clone();
let dataset = self.dataset.clone();
BindingsIterator::new(
variables,
Box::new(iter.map(move |values| {
let encoder = store.encoder();
let encoder = dataset.encoder();
values?
.into_iter()
.map(|value| {
@ -1108,10 +1114,105 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
}
fn compare_str_ids(&self, a: u64, b: u64) -> Option<Ordering> {
if let (Ok(a), Ok(b)) = (self.store.get_str(a), self.store.get_str(b)) {
Some(a.cmp(&b))
Some(
self.dataset
.get_str(a)
.ok()??
.cmp(&self.dataset.get_str(b).ok()??),
)
}
}
#[derive(Clone)]
struct DatasetView<S: StoreConnection> {
store: S,
extra: Arc<MemoryStringStore>,
}
impl<S: StoreConnection> DatasetView<S> {
fn new(store: S) -> Self {
Self {
store,
extra: Arc::new(MemoryStringStore::default()),
}
}
fn quads_for_pattern<'a>(
&'a self,
subject: Option<EncodedTerm>,
predicate: Option<EncodedTerm>,
object: Option<EncodedTerm>,
graph_name: Option<EncodedTerm>,
) -> Box<dyn Iterator<Item = Result<EncodedQuad>> + 'a> {
self.store
.quads_for_pattern(subject, predicate, object, graph_name)
}
fn encoder(&self) -> Encoder<&Self> {
Encoder::new(&self)
}
}
impl<S: StoreConnection> StringStore for DatasetView<S> {
type StringType = StringOrStoreString<S::StringType>;
fn get_str(&self, id: u64) -> Result<Option<StringOrStoreString<S::StringType>>> {
Ok(if let Some(value) = self.store.get_str(id)? {
Some(StringOrStoreString::Store(value))
} else if let Some(value) = self.extra.get_str(u64::MAX - id)? {
Some(StringOrStoreString::String(value))
} else {
None
})
}
fn get_str_id(&self, value: &str) -> Result<Option<u64>> {
Ok(if let Some(id) = self.store.get_str_id(value)? {
Some(id)
} else {
self.extra.get_str_id(value)?.map(|id| u64::MAX - id)
})
}
fn insert_str(&self, value: &str) -> Result<u64> {
Ok(if let Some(id) = self.store.get_str_id(value)? {
id
} else {
u64::MAX - self.extra.insert_str(value)?
})
}
}
pub enum StringOrStoreString<S: Deref<Target = str> + ToString + Into<String>> {
String(String),
Store(S),
}
impl<S: Deref<Target = str> + ToString + Into<String>> Deref for StringOrStoreString<S> {
type Target = str;
fn deref(&self) -> &str {
match self {
StringOrStoreString::String(s) => &*s,
StringOrStoreString::Store(s) => &*s,
}
}
}
impl<S: Deref<Target = str> + ToString + Into<String>> ToString for StringOrStoreString<S> {
fn to_string(&self) -> String {
match self {
StringOrStoreString::String(s) => s.to_string(),
StringOrStoreString::Store(s) => s.to_string(),
}
}
}
impl<S: Deref<Target = str> + ToString + Into<String>> From<StringOrStoreString<S>> for String {
fn from(string: StringOrStoreString<S>) -> Self {
match string {
StringOrStoreString::String(s) => s,
StringOrStoreString::Store(s) => s.into(),
}
}
}
@ -1360,7 +1461,7 @@ impl<'a> Iterator for HashDeduplicateIterator<'a> {
}
struct ConstructIterator<'a, S: StoreConnection> {
store: S,
dataset: DatasetView<S>,
iter: EncodedTuplesIterator<'a>,
template: &'a [TripleTemplate],
buffered_results: Vec<Result<Triple>>,
@ -1379,7 +1480,7 @@ impl<'a, S: StoreConnection> Iterator for ConstructIterator<'a, S> {
Ok(tuple) => tuple,
Err(error) => return Some(Err(error)),
};
let encoder = self.store.encoder();
let encoder = self.dataset.encoder();
for template in self.template {
if let (Some(subject), Some(predicate), Some(object)) = (
get_triple_template_value(&template.subject, &tuple, &mut self.bnodes),
@ -1431,7 +1532,7 @@ fn decode_triple<S: StringStore>(
}
struct DescribeIterator<'a, S: StoreConnection + 'a> {
store: S,
dataset: DatasetView<S>,
iter: EncodedTuplesIterator<'a>,
quads: Vec<Result<EncodedQuad>>,
}
@ -1443,7 +1544,7 @@ impl<'a, S: StoreConnection> Iterator for DescribeIterator<'a, S> {
if let Some(quad) = self.quads.pop() {
return Some(match quad {
Ok(quad) => self
.store
.dataset
.encoder()
.decode_quad(&quad)
.map(|q| q.into_triple()),
@ -1457,7 +1558,7 @@ impl<'a, S: StoreConnection> Iterator for DescribeIterator<'a, S> {
for subject in tuple {
if let Some(subject) = subject {
self.quads = self
.store
.dataset
.quads_for_pattern(Some(subject), None, None, None)
.collect();
}

@ -77,12 +77,16 @@ impl<'a> Store for &'a MemoryStore {
impl StringStore for MemoryStore {
type StringType = String;
fn insert_str(&self, value: &str) -> Result<u64> {
self.string_store.insert_str(value)
fn get_str(&self, id: u64) -> Result<Option<String>> {
self.string_store.get_str(id)
}
fn get_str(&self, id: u64) -> Result<String> {
self.string_store.get_str(id)
fn get_str_id(&self, value: &str) -> Result<Option<u64>> {
self.string_store.get_str_id(value)
}
fn insert_str(&self, value: &str) -> Result<u64> {
self.string_store.insert_str(value)
}
}

@ -34,8 +34,9 @@ const XSD_TIME_ID: u64 = 10;
pub trait StringStore {
type StringType: Deref<Target = str> + ToString + Into<String>;
fn get_str(&self, id: u64) -> Result<Option<Self::StringType>>;
fn get_str_id(&self, value: &str) -> Result<Option<u64>>;
fn insert_str(&self, value: &str) -> Result<u64>;
fn get_str(&self, id: u64) -> Result<Self::StringType>;
/// Should be called when the bytes store is created
fn set_first_strings(&self) -> Result<()> {
@ -67,9 +68,13 @@ impl<'a, S: StringStore> StringStore for &'a S {
(*self).insert_str(value)
}
fn get_str(&self, id: u64) -> Result<S::StringType> {
fn get_str(&self, id: u64) -> Result<Option<S::StringType>> {
(*self).get_str(id)
}
fn get_str_id(&self, value: &str) -> Result<Option<u64>> {
(*self).get_str_id(value)
}
}
pub struct MemoryStringStore {
@ -94,21 +99,29 @@ impl StringStore for MemoryStringStore {
fn insert_str(&self, value: &str) -> Result<u64> {
let mut id2str = self.id2str.write().map_err(MutexPoisonError::from)?;
let mut str2id = self.str2id.write().map_err(MutexPoisonError::from)?;
let id = str2id.entry(value.to_string()).or_insert_with(|| {
Ok(if let Some(id) = str2id.get(value) {
*id
} else {
let id = id2str.len() as u64;
id2str.push(value.to_string());
str2id.insert(value.to_string(), id);
id
});
Ok(*id)
})
}
fn get_str(&self, id: u64) -> Result<String> {
fn get_str(&self, id: u64) -> Result<Option<String>> {
//TODO: avoid copy by adding a lifetime limit to get_str
let id2str = self.id2str.read().map_err(MutexPoisonError::from)?;
if id2str.len() as u64 <= id {
Err(format_err!("value not found in the dictionary"))
Ok(if id2str.len() as u64 <= id {
None
} else {
Ok(id2str[id as usize].to_owned())
}
Some(id2str[id as usize].to_owned())
})
}
fn get_str_id(&self, value: &str) -> Result<Option<u64>> {
let str2id = self.str2id.read().map_err(MutexPoisonError::from)?;
Ok(str2id.get(value).cloned())
}
}
@ -806,26 +819,26 @@ impl<S: StringStore> Encoder<S> {
Err(format_err!("The default graph tag is not a valid term"))
}
EncodedTerm::NamedNode { iri_id } => {
Ok(NamedNode::new_from_string(self.string_store.get_str(iri_id)?).into())
Ok(NamedNode::new_from_string(self.get_str(iri_id)?).into())
}
EncodedTerm::BlankNode(id) => Ok(BlankNode::from(id).into()),
EncodedTerm::StringLiteral { value_id } => {
Ok(Literal::new_simple_literal(self.string_store.get_str(value_id)?).into())
Ok(Literal::new_simple_literal(self.get_str(value_id)?).into())
}
EncodedTerm::LangStringLiteral {
value_id,
language_id,
} => Ok(Literal::new_language_tagged_literal(
self.string_store.get_str(value_id)?,
self.string_store.get_str(language_id)?,
self.get_str(value_id)?,
self.get_str(language_id)?,
)
.into()),
EncodedTerm::TypedLiteral {
value_id,
datatype_id,
} => Ok(Literal::new_typed_literal(
self.string_store.get_str(value_id)?,
NamedNode::new_from_string(self.string_store.get_str(datatype_id)?),
self.get_str(value_id)?,
NamedNode::new_from_string(self.get_str(datatype_id)?),
)
.into()),
EncodedTerm::BooleanLiteral(value) => Ok(Literal::from(value).into()),
@ -841,6 +854,15 @@ impl<S: StringStore> Encoder<S> {
}
}
fn get_str(&self, id: u64) -> Result<S::StringType> {
self.string_store.get_str(id)?.ok_or_else(|| {
format_err!(
"Not able to find the string with id {} in the string store",
id
)
})
}
pub fn decode_named_or_blank_node(&self, encoded: EncodedTerm) -> Result<NamedOrBlankNode> {
match self.decode_term(encoded)? {
Term::NamedNode(named_node) => Ok(named_node.into()),

@ -132,34 +132,38 @@ impl StringStore for RocksDbStoreConnection<'_> {
type StringType = RocksString;
fn insert_str(&self, value: &str) -> Result<u64> {
let value = value.as_bytes();
Ok(
if let Some(id) = self.store.db.get_cf(self.str2id_cf, value)? {
LittleEndian::read_u64(&id)
} else {
let id = self
.store
.str_id_counter
.lock()
.map_err(MutexPoisonError::from)?
.get_and_increment(&self.store.db)? as u64;
let id_bytes = to_bytes(id);
let mut batch = WriteBatch::default();
batch.put_cf(self.id2str_cf, &id_bytes, value)?;
batch.put_cf(self.str2id_cf, value, &id_bytes)?;
self.store.db.write(batch)?;
id
},
)
Ok(if let Some(id) = self.get_str_id(value)? {
id
} else {
let id = self
.store
.str_id_counter
.lock()
.map_err(MutexPoisonError::from)?
.get_and_increment(&self.store.db)? as u64;
let id_bytes = to_bytes(id);
let mut batch = WriteBatch::default();
batch.put_cf(self.id2str_cf, &id_bytes, value)?;
batch.put_cf(self.str2id_cf, value, &id_bytes)?;
self.store.db.write(batch)?;
id
})
}
fn get_str(&self, id: u64) -> Result<RocksString> {
let value = self.store.db.get_cf(self.id2str_cf, &to_bytes(id))?;
if let Some(value) = value {
Ok(RocksString { vec: value })
} else {
Err(format_err!("value not found in the dictionary"))
}
fn get_str(&self, id: u64) -> Result<Option<RocksString>> {
Ok(self
.store
.db
.get_cf(self.id2str_cf, &to_bytes(id))?
.map(|v| RocksString { vec: v }))
}
fn get_str_id(&self, value: &str) -> Result<Option<u64>> {
Ok(self
.store
.db
.get_cf(self.str2id_cf, value.as_bytes())?
.map(|id| LittleEndian::read_u64(&id)))
}
}

Loading…
Cancel
Save