diff --git a/lib/src/sparql/eval.rs b/lib/src/sparql/eval.rs index 2d545d8a..294fd7d2 100644 --- a/lib/src/sparql/eval.rs +++ b/lib/src/sparql/eval.rs @@ -2,6 +2,7 @@ use crate::model::BlankNode; use crate::model::Triple; use crate::sparql::model::*; use crate::sparql::plan::*; +use crate::store::numeric_encoder::MemoryStringStore; use crate::store::numeric_encoder::*; use crate::store::StoreConnection; use crate::Result; @@ -18,8 +19,10 @@ use std::collections::BTreeMap; use std::collections::HashSet; use std::iter::once; use std::iter::Iterator; +use std::ops::Deref; use std::sync::Arc; use std::sync::Mutex; +use std::u64; use uuid::Uuid; const REGEX_SIZE_LIMIT: usize = 1_000_000; @@ -28,14 +31,14 @@ type EncodedTuplesIterator<'a> = Box> + #[derive(Clone)] pub struct SimpleEvaluator { - store: S, + dataset: DatasetView, bnodes_map: Arc>>, } impl<'a, S: StoreConnection + 'a> SimpleEvaluator { - pub fn new(store: S) -> Self { + pub fn new(dataset: S) -> Self { Self { - store, + dataset: DatasetView::new(dataset), bnodes_map: Arc::new(Mutex::new(BTreeMap::default())), } } @@ -74,7 +77,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator { 'a: 'b, { Ok(QueryResult::Graph(Box::new(ConstructIterator { - store: self.store.clone(), + dataset: self.dataset.clone(), iter: self.eval_plan(plan, vec![]), template: construct, buffered_results: Vec::default(), @@ -87,7 +90,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator { 'a: 'b, { Ok(QueryResult::Graph(Box::new(DescribeIterator { - store: self.store.clone(), + dataset: self.dataset.clone(), iter: self.eval_plan(plan, vec![]), quads: Vec::default(), }))) @@ -110,7 +113,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator { self.eval_plan(&*child, from) .flat_map(move |tuple| match tuple { Ok(tuple) => { - let mut iter = self.store.quads_for_pattern( + let mut iter = self.dataset.quads_for_pattern( get_pattern_value(&subject, &tuple), get_pattern_value(&predicate, &tuple), get_pattern_value(&object, &tuple), @@ -528,13 +531,13 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator { }, PlanExpression::UUID() => Some(EncodedTerm::NamedNode { iri_id: self - .store + .dataset .insert_str(&Uuid::new_v4().to_urn().to_string()) .ok()?, }), PlanExpression::StrUUID() => Some(EncodedTerm::StringLiteral { value_id: self - .store + .dataset .insert_str(&Uuid::new_v4().to_simple().to_string()) .ok()?, }), @@ -638,7 +641,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator { PlanExpression::BooleanCast(e) => match self.eval_expression(e, tuple)? { EncodedTerm::BooleanLiteral(value) => Some(value.into()), EncodedTerm::StringLiteral { value_id } => { - match &*self.store.get_str(value_id).ok()? { + match &*self.dataset.get_str(value_id).ok()?? { "true" | "1" => Some(true.into()), "false" | "0" => Some(false.into()), _ => None, @@ -655,7 +658,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator { Some(if value { 1. as f64 } else { 0. }.into()) } EncodedTerm::StringLiteral { value_id } => Some(EncodedTerm::DoubleLiteral( - OrderedFloat(self.store.get_str(value_id).ok()?.parse().ok()?), + OrderedFloat(self.dataset.get_str(value_id).ok()??.parse().ok()?), )), _ => None, }, @@ -668,7 +671,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator { Some(if value { 1. as f32 } else { 0. }.into()) } EncodedTerm::StringLiteral { value_id } => Some(EncodedTerm::FloatLiteral( - OrderedFloat(self.store.get_str(value_id).ok()?.parse().ok()?), + OrderedFloat(self.dataset.get_str(value_id).ok()??.parse().ok()?), )), _ => None, }, @@ -679,7 +682,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator { EncodedTerm::DecimalLiteral(value) => Some(value.to_i128()?.into()), EncodedTerm::BooleanLiteral(value) => Some(if value { 1 } else { 0 }.into()), EncodedTerm::StringLiteral { value_id } => Some(EncodedTerm::IntegerLiteral( - self.store.get_str(value_id).ok()?.parse().ok()?, + self.dataset.get_str(value_id).ok()??.parse().ok()?, )), _ => None, }, @@ -697,7 +700,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator { .into(), ), EncodedTerm::StringLiteral { value_id } => Some(EncodedTerm::DecimalLiteral( - self.store.get_str(value_id).ok()?.parse().ok()?, + self.dataset.get_str(value_id).ok()??.parse().ok()?, )), _ => None, }, @@ -706,7 +709,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator { EncodedTerm::DateTime(value) => Some(value.date().naive_utc().into()), //TODO: use date with timezone EncodedTerm::NaiveDateTime(value) => Some(value.date().into()), EncodedTerm::StringLiteral { value_id } => { - let value = self.store.get_str(value_id).ok()?; + let value = self.dataset.get_str(value_id).ok()??; Some(NaiveDate::parse_from_str(&value, "%Y-%m-%d").ok()?.into()) } _ => None, @@ -716,7 +719,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator { EncodedTerm::DateTime(value) => Some(value.time().into()), EncodedTerm::NaiveDateTime(value) => Some(value.time().into()), EncodedTerm::StringLiteral { value_id } => { - let value = self.store.get_str(value_id).ok()?; + let value = self.dataset.get_str(value_id).ok()??; Some(NaiveTime::parse_from_str(&value, "%H:%M:%S").ok()?.into()) } _ => None, @@ -725,7 +728,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator { EncodedTerm::DateTime(value) => Some(value.into()), EncodedTerm::NaiveDateTime(value) => Some(value.into()), EncodedTerm::StringLiteral { value_id } => { - let value = self.store.get_str(value_id).ok()?; + let value = self.dataset.get_str(value_id).ok()??; Some(match DateTime::parse_from_rfc3339(&value) { Ok(value) => value.into(), Err(_) => NaiveDateTime::parse_from_str(&value, "%Y-%m-%dT%H:%M:%S") @@ -762,24 +765,27 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator { | EncodedTerm::LangStringLiteral { value_id, .. } | EncodedTerm::TypedLiteral { value_id, .. } => Some(value_id), EncodedTerm::BooleanLiteral(value) => self - .store + .dataset .insert_str(if value { "true" } else { "false" }) .ok(), - EncodedTerm::FloatLiteral(value) => self.store.insert_str(&value.to_string()).ok(), - EncodedTerm::DoubleLiteral(value) => self.store.insert_str(&value.to_string()).ok(), - EncodedTerm::IntegerLiteral(value) => self.store.insert_str(&value.to_string()).ok(), - EncodedTerm::DecimalLiteral(value) => self.store.insert_str(&value.to_string()).ok(), - EncodedTerm::Date(value) => self.store.insert_str(&value.to_string()).ok(), - EncodedTerm::NaiveDate(value) => self.store.insert_str(&value.to_string()).ok(), - EncodedTerm::NaiveTime(value) => self.store.insert_str(&value.to_string()).ok(), - EncodedTerm::DateTime(value) => self.store.insert_str(&value.to_string()).ok(), - EncodedTerm::NaiveDateTime(value) => self.store.insert_str(&value.to_string()).ok(), + EncodedTerm::FloatLiteral(value) => self.dataset.insert_str(&value.to_string()).ok(), + EncodedTerm::DoubleLiteral(value) => self.dataset.insert_str(&value.to_string()).ok(), + EncodedTerm::IntegerLiteral(value) => self.dataset.insert_str(&value.to_string()).ok(), + EncodedTerm::DecimalLiteral(value) => self.dataset.insert_str(&value.to_string()).ok(), + EncodedTerm::Date(value) => self.dataset.insert_str(&value.to_string()).ok(), + EncodedTerm::NaiveDate(value) => self.dataset.insert_str(&value.to_string()).ok(), + EncodedTerm::NaiveTime(value) => self.dataset.insert_str(&value.to_string()).ok(), + EncodedTerm::DateTime(value) => self.dataset.insert_str(&value.to_string()).ok(), + EncodedTerm::NaiveDateTime(value) => self.dataset.insert_str(&value.to_string()).ok(), } } - fn to_simple_string(&self, term: EncodedTerm) -> Option { + fn to_simple_string( + &self, + term: EncodedTerm, + ) -> Option< as StringStore>::StringType> { if let EncodedTerm::StringLiteral { value_id } = term { - Some(self.store.get_str(value_id).ok()?) + self.dataset.get_str(value_id).ok()? } else { None } @@ -793,11 +799,11 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator { } } - fn to_string(&self, term: EncodedTerm) -> Option { + fn to_string(&self, term: EncodedTerm) -> Option< as StringStore>::StringType> { match term { EncodedTerm::StringLiteral { value_id } | EncodedTerm::LangStringLiteral { value_id, .. } => { - Some(self.store.get_str(value_id).ok()?) + self.dataset.get_str(value_id).ok()? } _ => None, } @@ -873,11 +879,11 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator { where 'a: 'b, { - let store = self.store.clone(); + let dataset = self.dataset.clone(); BindingsIterator::new( variables, Box::new(iter.map(move |values| { - let encoder = store.encoder(); + let encoder = dataset.encoder(); values? .into_iter() .map(|value| { @@ -1108,10 +1114,105 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator { } fn compare_str_ids(&self, a: u64, b: u64) -> Option { - if let (Ok(a), Ok(b)) = (self.store.get_str(a), self.store.get_str(b)) { - Some(a.cmp(&b)) + Some( + self.dataset + .get_str(a) + .ok()?? + .cmp(&self.dataset.get_str(b).ok()??), + ) + } +} + +#[derive(Clone)] +struct DatasetView { + store: S, + extra: Arc, +} + +impl DatasetView { + fn new(store: S) -> Self { + Self { + store, + extra: Arc::new(MemoryStringStore::default()), + } + } + + fn quads_for_pattern<'a>( + &'a self, + subject: Option, + predicate: Option, + object: Option, + graph_name: Option, + ) -> Box> + 'a> { + self.store + .quads_for_pattern(subject, predicate, object, graph_name) + } + + fn encoder(&self) -> Encoder<&Self> { + Encoder::new(&self) + } +} + +impl StringStore for DatasetView { + type StringType = StringOrStoreString; + + fn get_str(&self, id: u64) -> Result>> { + Ok(if let Some(value) = self.store.get_str(id)? { + Some(StringOrStoreString::Store(value)) + } else if let Some(value) = self.extra.get_str(u64::MAX - id)? { + Some(StringOrStoreString::String(value)) } else { None + }) + } + + fn get_str_id(&self, value: &str) -> Result> { + Ok(if let Some(id) = self.store.get_str_id(value)? { + Some(id) + } else { + self.extra.get_str_id(value)?.map(|id| u64::MAX - id) + }) + } + + fn insert_str(&self, value: &str) -> Result { + Ok(if let Some(id) = self.store.get_str_id(value)? { + id + } else { + u64::MAX - self.extra.insert_str(value)? + }) + } +} + +pub enum StringOrStoreString + ToString + Into> { + String(String), + Store(S), +} + +impl + ToString + Into> Deref for StringOrStoreString { + type Target = str; + + fn deref(&self) -> &str { + match self { + StringOrStoreString::String(s) => &*s, + StringOrStoreString::Store(s) => &*s, + } + } +} + +impl + ToString + Into> ToString for StringOrStoreString { + fn to_string(&self) -> String { + match self { + StringOrStoreString::String(s) => s.to_string(), + StringOrStoreString::Store(s) => s.to_string(), + } + } +} + +impl + ToString + Into> From> for String { + fn from(string: StringOrStoreString) -> Self { + match string { + StringOrStoreString::String(s) => s, + StringOrStoreString::Store(s) => s.into(), } } } @@ -1360,7 +1461,7 @@ impl<'a> Iterator for HashDeduplicateIterator<'a> { } struct ConstructIterator<'a, S: StoreConnection> { - store: S, + dataset: DatasetView, iter: EncodedTuplesIterator<'a>, template: &'a [TripleTemplate], buffered_results: Vec>, @@ -1379,7 +1480,7 @@ impl<'a, S: StoreConnection> Iterator for ConstructIterator<'a, S> { Ok(tuple) => tuple, Err(error) => return Some(Err(error)), }; - let encoder = self.store.encoder(); + let encoder = self.dataset.encoder(); for template in self.template { if let (Some(subject), Some(predicate), Some(object)) = ( get_triple_template_value(&template.subject, &tuple, &mut self.bnodes), @@ -1431,7 +1532,7 @@ fn decode_triple( } struct DescribeIterator<'a, S: StoreConnection + 'a> { - store: S, + dataset: DatasetView, iter: EncodedTuplesIterator<'a>, quads: Vec>, } @@ -1443,7 +1544,7 @@ impl<'a, S: StoreConnection> Iterator for DescribeIterator<'a, S> { if let Some(quad) = self.quads.pop() { return Some(match quad { Ok(quad) => self - .store + .dataset .encoder() .decode_quad(&quad) .map(|q| q.into_triple()), @@ -1457,7 +1558,7 @@ impl<'a, S: StoreConnection> Iterator for DescribeIterator<'a, S> { for subject in tuple { if let Some(subject) = subject { self.quads = self - .store + .dataset .quads_for_pattern(Some(subject), None, None, None) .collect(); } diff --git a/lib/src/store/memory.rs b/lib/src/store/memory.rs index e0ee8129..4de91cbd 100644 --- a/lib/src/store/memory.rs +++ b/lib/src/store/memory.rs @@ -77,12 +77,16 @@ impl<'a> Store for &'a MemoryStore { impl StringStore for MemoryStore { type StringType = String; - fn insert_str(&self, value: &str) -> Result { - self.string_store.insert_str(value) + fn get_str(&self, id: u64) -> Result> { + self.string_store.get_str(id) } - fn get_str(&self, id: u64) -> Result { - self.string_store.get_str(id) + fn get_str_id(&self, value: &str) -> Result> { + self.string_store.get_str_id(value) + } + + fn insert_str(&self, value: &str) -> Result { + self.string_store.insert_str(value) } } diff --git a/lib/src/store/numeric_encoder.rs b/lib/src/store/numeric_encoder.rs index fca8289f..893aa751 100644 --- a/lib/src/store/numeric_encoder.rs +++ b/lib/src/store/numeric_encoder.rs @@ -34,8 +34,9 @@ const XSD_TIME_ID: u64 = 10; pub trait StringStore { type StringType: Deref + ToString + Into; + fn get_str(&self, id: u64) -> Result>; + fn get_str_id(&self, value: &str) -> Result>; fn insert_str(&self, value: &str) -> Result; - fn get_str(&self, id: u64) -> Result; /// Should be called when the bytes store is created fn set_first_strings(&self) -> Result<()> { @@ -67,9 +68,13 @@ impl<'a, S: StringStore> StringStore for &'a S { (*self).insert_str(value) } - fn get_str(&self, id: u64) -> Result { + fn get_str(&self, id: u64) -> Result> { (*self).get_str(id) } + + fn get_str_id(&self, value: &str) -> Result> { + (*self).get_str_id(value) + } } pub struct MemoryStringStore { @@ -94,21 +99,29 @@ impl StringStore for MemoryStringStore { fn insert_str(&self, value: &str) -> Result { let mut id2str = self.id2str.write().map_err(MutexPoisonError::from)?; let mut str2id = self.str2id.write().map_err(MutexPoisonError::from)?; - let id = str2id.entry(value.to_string()).or_insert_with(|| { + Ok(if let Some(id) = str2id.get(value) { + *id + } else { let id = id2str.len() as u64; id2str.push(value.to_string()); + str2id.insert(value.to_string(), id); id - }); - Ok(*id) + }) } - fn get_str(&self, id: u64) -> Result { + fn get_str(&self, id: u64) -> Result> { + //TODO: avoid copy by adding a lifetime limit to get_str let id2str = self.id2str.read().map_err(MutexPoisonError::from)?; - if id2str.len() as u64 <= id { - Err(format_err!("value not found in the dictionary")) + Ok(if id2str.len() as u64 <= id { + None } else { - Ok(id2str[id as usize].to_owned()) - } + Some(id2str[id as usize].to_owned()) + }) + } + + fn get_str_id(&self, value: &str) -> Result> { + let str2id = self.str2id.read().map_err(MutexPoisonError::from)?; + Ok(str2id.get(value).cloned()) } } @@ -806,26 +819,26 @@ impl Encoder { Err(format_err!("The default graph tag is not a valid term")) } EncodedTerm::NamedNode { iri_id } => { - Ok(NamedNode::new_from_string(self.string_store.get_str(iri_id)?).into()) + Ok(NamedNode::new_from_string(self.get_str(iri_id)?).into()) } EncodedTerm::BlankNode(id) => Ok(BlankNode::from(id).into()), EncodedTerm::StringLiteral { value_id } => { - Ok(Literal::new_simple_literal(self.string_store.get_str(value_id)?).into()) + Ok(Literal::new_simple_literal(self.get_str(value_id)?).into()) } EncodedTerm::LangStringLiteral { value_id, language_id, } => Ok(Literal::new_language_tagged_literal( - self.string_store.get_str(value_id)?, - self.string_store.get_str(language_id)?, + self.get_str(value_id)?, + self.get_str(language_id)?, ) .into()), EncodedTerm::TypedLiteral { value_id, datatype_id, } => Ok(Literal::new_typed_literal( - self.string_store.get_str(value_id)?, - NamedNode::new_from_string(self.string_store.get_str(datatype_id)?), + self.get_str(value_id)?, + NamedNode::new_from_string(self.get_str(datatype_id)?), ) .into()), EncodedTerm::BooleanLiteral(value) => Ok(Literal::from(value).into()), @@ -841,6 +854,15 @@ impl Encoder { } } + fn get_str(&self, id: u64) -> Result { + self.string_store.get_str(id)?.ok_or_else(|| { + format_err!( + "Not able to find the string with id {} in the string store", + id + ) + }) + } + pub fn decode_named_or_blank_node(&self, encoded: EncodedTerm) -> Result { match self.decode_term(encoded)? { Term::NamedNode(named_node) => Ok(named_node.into()), diff --git a/lib/src/store/rocksdb.rs b/lib/src/store/rocksdb.rs index 383975ba..e8ebe70f 100644 --- a/lib/src/store/rocksdb.rs +++ b/lib/src/store/rocksdb.rs @@ -132,34 +132,38 @@ impl StringStore for RocksDbStoreConnection<'_> { type StringType = RocksString; fn insert_str(&self, value: &str) -> Result { - let value = value.as_bytes(); - Ok( - if let Some(id) = self.store.db.get_cf(self.str2id_cf, value)? { - LittleEndian::read_u64(&id) - } else { - let id = self - .store - .str_id_counter - .lock() - .map_err(MutexPoisonError::from)? - .get_and_increment(&self.store.db)? as u64; - let id_bytes = to_bytes(id); - let mut batch = WriteBatch::default(); - batch.put_cf(self.id2str_cf, &id_bytes, value)?; - batch.put_cf(self.str2id_cf, value, &id_bytes)?; - self.store.db.write(batch)?; - id - }, - ) + Ok(if let Some(id) = self.get_str_id(value)? { + id + } else { + let id = self + .store + .str_id_counter + .lock() + .map_err(MutexPoisonError::from)? + .get_and_increment(&self.store.db)? as u64; + let id_bytes = to_bytes(id); + let mut batch = WriteBatch::default(); + batch.put_cf(self.id2str_cf, &id_bytes, value)?; + batch.put_cf(self.str2id_cf, value, &id_bytes)?; + self.store.db.write(batch)?; + id + }) } - fn get_str(&self, id: u64) -> Result { - let value = self.store.db.get_cf(self.id2str_cf, &to_bytes(id))?; - if let Some(value) = value { - Ok(RocksString { vec: value }) - } else { - Err(format_err!("value not found in the dictionary")) - } + fn get_str(&self, id: u64) -> Result> { + Ok(self + .store + .db + .get_cf(self.id2str_cf, &to_bytes(id))? + .map(|v| RocksString { vec: v })) + } + + fn get_str_id(&self, value: &str) -> Result> { + Ok(self + .store + .db + .get_cf(self.str2id_cf, value.as_bytes())? + .map(|id| LittleEndian::read_u64(&id))) } }