Transforms BytesStore into StringStore

pull/10/head
Tpt 6 years ago
parent b2d9218aee
commit 8faba13f5a
  1. 84
      lib/src/sparql/eval.rs
  2. 23
      lib/src/store/encoded.rs
  3. 36
      lib/src/store/memory.rs
  4. 120
      lib/src/store/numeric_encoder.rs
  5. 28
      lib/src/store/rocksdb.rs

@ -17,7 +17,6 @@ use std::collections::BTreeMap;
use std::collections::HashSet;
use std::iter::once;
use std::iter::Iterator;
use std::str;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::Mutex;
@ -540,9 +539,9 @@ impl<S: EncodedQuadsStore> SimpleEvaluator<S> {
EncodedTerm::BooleanLiteral(value) => Some(value.into()),
EncodedTerm::SimpleLiteral { value_id }
| EncodedTerm::StringLiteral { value_id } => {
match &*self.store.get_bytes(value_id).ok()?? {
b"true" | b"1" => Some(true.into()),
b"false" | b"0" => Some(false.into()),
match &*self.store.get_str(value_id).ok()? {
"true" | "1" => Some(true.into()),
"false" | "0" => Some(false.into()),
_ => None,
}
}
@ -557,14 +556,9 @@ impl<S: EncodedQuadsStore> SimpleEvaluator<S> {
Some(if value { 1. as f64 } else { 0. }.into())
}
EncodedTerm::SimpleLiteral { value_id }
| EncodedTerm::StringLiteral { value_id } => {
Some(EncodedTerm::DoubleLiteral(OrderedFloat(
str::from_utf8(&self.store.get_bytes(value_id).ok()??)
.ok()?
.parse()
.ok()?,
)))
}
| EncodedTerm::StringLiteral { value_id } => Some(EncodedTerm::DoubleLiteral(
OrderedFloat(self.store.get_str(value_id).ok()?.parse().ok()?),
)),
_ => None,
},
PlanExpression::FloatCast(e) => match self.eval_expression(e, tuple)? {
@ -576,14 +570,9 @@ impl<S: EncodedQuadsStore> SimpleEvaluator<S> {
Some(if value { 1. as f32 } else { 0. }.into())
}
EncodedTerm::SimpleLiteral { value_id }
| EncodedTerm::StringLiteral { value_id } => {
Some(EncodedTerm::FloatLiteral(OrderedFloat(
str::from_utf8(&self.store.get_bytes(value_id).ok()??)
.ok()?
.parse()
.ok()?,
)))
}
| EncodedTerm::StringLiteral { value_id } => Some(EncodedTerm::FloatLiteral(
OrderedFloat(self.store.get_str(value_id).ok()?.parse().ok()?),
)),
_ => None,
},
PlanExpression::IntegerCast(e) => match self.eval_expression(e, tuple)? {
@ -594,10 +583,7 @@ impl<S: EncodedQuadsStore> SimpleEvaluator<S> {
EncodedTerm::BooleanLiteral(value) => Some(if value { 1 } else { 0 }.into()),
EncodedTerm::SimpleLiteral { value_id }
| EncodedTerm::StringLiteral { value_id } => Some(EncodedTerm::IntegerLiteral(
str::from_utf8(&self.store.get_bytes(value_id).ok()??)
.ok()?
.parse()
.ok()?,
self.store.get_str(value_id).ok()?.parse().ok()?,
)),
_ => None,
},
@ -615,10 +601,7 @@ impl<S: EncodedQuadsStore> SimpleEvaluator<S> {
),
EncodedTerm::SimpleLiteral { value_id }
| EncodedTerm::StringLiteral { value_id } => Some(EncodedTerm::DecimalLiteral(
str::from_utf8(&self.store.get_bytes(value_id).ok()??)
.ok()?
.parse()
.ok()?,
self.store.get_str(value_id).ok()?.parse().ok()?,
)),
_ => None,
},
@ -627,8 +610,7 @@ impl<S: EncodedQuadsStore> SimpleEvaluator<S> {
EncodedTerm::NaiveDateTime(value) => Some(value.into()),
EncodedTerm::SimpleLiteral { value_id }
| EncodedTerm::StringLiteral { value_id } => {
let bytes = self.store.get_bytes(value_id).ok()??;
let value = str::from_utf8(&bytes).ok()?;
let value = self.store.get_str(value_id).ok()?;
Some(match DateTime::parse_from_rfc3339(&value) {
Ok(value) => value.into(),
Err(_) => NaiveDateTime::parse_from_str(&value, "%Y-%m-%dT%H:%M:%S")
@ -668,36 +650,20 @@ impl<S: EncodedQuadsStore> SimpleEvaluator<S> {
| EncodedTerm::TypedLiteral { value_id, .. } => Some(value_id),
EncodedTerm::BooleanLiteral(value) => self
.store
.insert_bytes(if value { b"true" } else { b"false" })
.insert_str(if value { "true" } else { "false" })
.ok(),
EncodedTerm::FloatLiteral(value) => {
self.store.insert_bytes(value.to_string().as_bytes()).ok()
}
EncodedTerm::DoubleLiteral(value) => {
self.store.insert_bytes(value.to_string().as_bytes()).ok()
}
EncodedTerm::IntegerLiteral(value) => {
self.store.insert_bytes(value.to_string().as_bytes()).ok()
}
EncodedTerm::DecimalLiteral(value) => {
self.store.insert_bytes(value.to_string().as_bytes()).ok()
}
EncodedTerm::DateTime(value) => {
self.store.insert_bytes(value.to_string().as_bytes()).ok()
}
EncodedTerm::NaiveDateTime(value) => {
self.store.insert_bytes(value.to_string().as_bytes()).ok()
}
EncodedTerm::FloatLiteral(value) => self.store.insert_str(&value.to_string()).ok(),
EncodedTerm::DoubleLiteral(value) => self.store.insert_str(&value.to_string()).ok(),
EncodedTerm::IntegerLiteral(value) => self.store.insert_str(&value.to_string()).ok(),
EncodedTerm::DecimalLiteral(value) => self.store.insert_str(&value.to_string()).ok(),
EncodedTerm::DateTime(value) => self.store.insert_str(&value.to_string()).ok(),
EncodedTerm::NaiveDateTime(value) => self.store.insert_str(&value.to_string()).ok(),
}
}
fn to_simple_string(&self, term: EncodedTerm) -> Option<String> {
if let EncodedTerm::SimpleLiteral { value_id } = term {
Some(
str::from_utf8(&self.store.get_bytes(value_id).ok()??)
.ok()?
.to_owned(),
)
self.store.get_str(value_id).ok()
} else {
None
}
@ -707,11 +673,7 @@ impl<S: EncodedQuadsStore> SimpleEvaluator<S> {
match term {
EncodedTerm::SimpleLiteral { value_id }
| EncodedTerm::StringLiteral { value_id }
| EncodedTerm::LangStringLiteral { value_id, .. } => Some(
str::from_utf8(&self.store.get_bytes(value_id).ok()??)
.ok()?
.to_owned(),
),
| EncodedTerm::LangStringLiteral { value_id, .. } => self.store.get_str(value_id).ok(),
_ => None,
}
}
@ -875,7 +837,7 @@ impl<S: EncodedQuadsStore> SimpleEvaluator<S> {
}
fn compare_str_ids(&self, a: u64, b: u64) -> Option<Ordering> {
if let (Ok(Some(a)), Ok(Some(b))) = (self.store.get_bytes(a), self.store.get_bytes(b)) {
if let (Ok(a), Ok(b)) = (self.store.get_str(a), self.store.get_str(b)) {
Some(a.cmp(&b))
} else {
None
@ -1181,7 +1143,7 @@ fn get_triple_template_value(
}
}
fn decode_triple<S: BytesStore>(
fn decode_triple<S: StringStore>(
encoder: &Encoder<S>,
subject: EncodedTerm,
predicate: EncodedTerm,

@ -6,11 +6,12 @@ use std::iter::FromIterator;
use std::iter::Iterator;
use std::sync::Arc;
use store::numeric_encoder::*;
use url::Url;
use Result;
/// Defines the Store traits that is used to have efficient binary storage
pub trait EncodedQuadsStore: BytesStore + Sized + 'static {
pub trait EncodedQuadsStore: StringStore + Sized + 'static {
type QuadsIterator: Iterator<Item = Result<EncodedQuad>> + 'static;
type QuadsForSubjectIterator: Iterator<Item = Result<EncodedQuad>> + 'static;
type QuadsForSubjectPredicateIterator: Iterator<Item = Result<EncodedQuad>> + 'static;
@ -27,8 +28,8 @@ pub trait EncodedQuadsStore: BytesStore + Sized + 'static {
type QuadsForPredicateObjectGraphIterator: Iterator<Item = Result<EncodedQuad>> + 'static;
type QuadsForObjectGraphIterator: Iterator<Item = Result<EncodedQuad>> + 'static;
fn encoder(&self) -> Encoder<DelegatingBytesStore<Self>> {
Encoder::new(DelegatingBytesStore(&self))
fn encoder(&self) -> Encoder<DelegatingStringStore<Self>> {
Encoder::new(DelegatingStringStore(&self))
}
fn quads(&self) -> Result<Self::QuadsIterator>;
@ -865,17 +866,19 @@ impl<S: EncodedQuadsStore> fmt::Display for StoreUnionGraph<S> {
}
}
pub struct DelegatingBytesStore<'a, S: 'a + BytesStore + Sized>(&'a S);
pub struct DelegatingStringStore<'a, S: 'a + StringStore + Sized>(&'a S);
impl<'a, S: BytesStore> BytesStore for DelegatingBytesStore<'a, S> {
type BytesOutput = S::BytesOutput;
impl<'a, S: StringStore> StringStore for DelegatingStringStore<'a, S> {
fn insert_str(&self, value: &str) -> Result<u64> {
self.0.insert_str(value)
}
fn insert_bytes(&self, value: &[u8]) -> Result<u64> {
self.0.insert_bytes(value)
fn get_str(&self, id: u64) -> Result<String> {
self.0.get_str(id)
}
fn get_bytes(&self, id: u64) -> Result<Option<S::BytesOutput>> {
self.0.get_bytes(id)
fn get_url(&self, id: u64) -> Result<Url> {
self.0.get_url(id)
}
}

@ -1,12 +1,14 @@
use failure::Backtrace;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::str::FromStr;
use std::sync::PoisonError;
use std::sync::RwLock;
use std::sync::RwLockReadGuard;
use std::sync::RwLockWriteGuard;
use store::encoded::*;
use store::numeric_encoder::*;
use url::Url;
use Result;
/// Memory based implementation of the `rudf::model::Dataset` trait.
@ -47,8 +49,8 @@ pub type MemoryDataset = StoreDataset<MemoryStore>;
pub type MemoryGraph = StoreDefaultGraph<MemoryStore>;
pub struct MemoryStore {
id2str: RwLock<Vec<Vec<u8>>>,
str2id: RwLock<BTreeMap<Vec<u8>, u64>>,
id2str: RwLock<Vec<String>>,
str2id: RwLock<BTreeMap<String, u64>>,
graph_indexes: RwLock<BTreeMap<EncodedTerm, MemoryGraphIndexes>>,
}
@ -71,28 +73,34 @@ struct MemoryGraphIndexes {
osp: BTreeMap<EncodedTerm, BTreeMap<EncodedTerm, BTreeSet<EncodedTerm>>>,
}
impl BytesStore for MemoryStore {
type BytesOutput = Vec<u8>;
fn insert_bytes(&self, value: &[u8]) -> Result<u64> {
impl StringStore for MemoryStore {
fn insert_str(&self, value: &str) -> Result<u64> {
let mut id2str = self.id2str.write().map_err(MemoryStorePoisonError::from)?;
let mut str2id = self.str2id.write().map_err(MemoryStorePoisonError::from)?;
let id = str2id.entry(value.to_vec()).or_insert_with(|| {
let id = str2id.entry(value.to_string()).or_insert_with(|| {
let id = id2str.len() as u64;
id2str.push(value.to_vec());
id2str.push(value.to_string());
id
});
Ok(*id)
}
fn get_bytes(&self, id: u64) -> Result<Option<Vec<u8>>> {
//TODO: use try_from when stable
fn get_str(&self, id: u64) -> Result<String> {
let id2str = self.id2str.read().map_err(MemoryStorePoisonError::from)?;
Ok(if id2str.len() as u64 <= id {
None
if id2str.len() as u64 <= id {
Err(format_err!("value not found in the dictionary"))
} else {
Some(id2str[id as usize].to_owned())
})
Ok(id2str[id as usize].to_owned())
}
}
fn get_url(&self, id: u64) -> Result<Url> {
let id2str = self.id2str.read().map_err(MemoryStorePoisonError::from)?;
if id2str.len() as u64 <= id {
Err(format_err!("value not found in the dictionary"))
} else {
Ok(Url::from_str(&id2str[id as usize])?)
}
}
}

@ -9,9 +9,7 @@ use ordered_float::OrderedFloat;
use rust_decimal::Decimal;
use std::io::Read;
use std::io::Write;
use std::ops::Deref;
use std::str;
use std::str::FromStr;
use url::Url;
use uuid::Uuid;
use Result;
@ -26,23 +24,22 @@ const XSD_INTEGER_ID: u64 = 6;
const XSD_DECIMAL_ID: u64 = 7;
const XSD_DATE_TIME_ID: u64 = 8;
pub trait BytesStore {
type BytesOutput: Deref<Target = [u8]>;
fn insert_bytes(&self, value: &[u8]) -> Result<u64>;
fn get_bytes(&self, id: u64) -> Result<Option<Self::BytesOutput>>;
pub trait StringStore {
fn insert_str(&self, value: &str) -> Result<u64>;
fn get_str(&self, id: u64) -> Result<String>;
fn get_url(&self, id: u64) -> Result<Url>;
/// Should be called when the bytes store is created
fn set_first_strings(&self) -> Result<()> {
if EMPTY_STRING_ID == self.insert_bytes(b"")?
&& RDF_LANG_STRING_ID == self.insert_bytes(rdf::LANG_STRING.as_str().as_bytes())?
&& XSD_STRING_ID == self.insert_bytes(xsd::STRING.as_str().as_bytes())?
&& XSD_BOOLEAN_ID == self.insert_bytes(xsd::BOOLEAN.as_str().as_bytes())?
&& XSD_FLOAT_ID == self.insert_bytes(xsd::FLOAT.as_str().as_bytes())?
&& XSD_DOUBLE_ID == self.insert_bytes(xsd::DOUBLE.as_str().as_bytes())?
&& XSD_INTEGER_ID == self.insert_bytes(xsd::INTEGER.as_str().as_bytes())?
&& XSD_DECIMAL_ID == self.insert_bytes(xsd::DECIMAL.as_str().as_bytes())?
&& XSD_DATE_TIME_ID == self.insert_bytes(xsd::DATE_TIME.as_str().as_bytes())?
if EMPTY_STRING_ID == self.insert_str("")?
&& RDF_LANG_STRING_ID == self.insert_str(rdf::LANG_STRING.as_str())?
&& XSD_STRING_ID == self.insert_str(xsd::STRING.as_str())?
&& XSD_BOOLEAN_ID == self.insert_str(xsd::BOOLEAN.as_str())?
&& XSD_FLOAT_ID == self.insert_str(xsd::FLOAT.as_str())?
&& XSD_DOUBLE_ID == self.insert_str(xsd::DOUBLE.as_str())?
&& XSD_INTEGER_ID == self.insert_str(xsd::INTEGER.as_str())?
&& XSD_DECIMAL_ID == self.insert_str(xsd::DECIMAL.as_str())?
&& XSD_DATE_TIME_ID == self.insert_str(xsd::DATE_TIME.as_str())?
{
Ok(())
} else {
@ -446,18 +443,18 @@ impl<R: Write> TermWriter for R {
}
}
pub struct Encoder<S: BytesStore> {
pub struct Encoder<S: StringStore> {
string_store: S,
}
impl<S: BytesStore> Encoder<S> {
impl<S: StringStore> Encoder<S> {
pub fn new(string_store: S) -> Self {
Self { string_store }
}
pub fn encode_named_node(&self, named_node: &NamedNode) -> Result<EncodedTerm> {
Ok(EncodedTerm::NamedNode {
iri_id: self.encode_str_value(named_node.as_str())?,
iri_id: self.string_store.insert_str(named_node.as_str())?,
})
}
@ -469,17 +466,17 @@ impl<S: BytesStore> Encoder<S> {
Ok(if literal.is_plain() {
if let Some(language) = literal.language() {
EncodedTerm::LangStringLiteral {
value_id: self.encode_str_value(&literal.value())?,
language_id: self.encode_str_value(language)?,
value_id: self.string_store.insert_str(&literal.value())?,
language_id: self.string_store.insert_str(language)?,
}
} else {
EncodedTerm::SimpleLiteral {
value_id: self.encode_str_value(&literal.value())?,
value_id: self.string_store.insert_str(&literal.value())?,
}
}
} else if literal.is_string() {
EncodedTerm::StringLiteral {
value_id: self.encode_str_value(&literal.value())?,
value_id: self.string_store.insert_str(&literal.value())?,
}
} else if literal.is_boolean() {
literal
@ -518,8 +515,8 @@ impl<S: BytesStore> Encoder<S> {
.into()
} else {
EncodedTerm::TypedLiteral {
value_id: self.encode_str_value(&literal.value())?,
datatype_id: self.encode_str_value(literal.datatype().as_str())?,
value_id: self.string_store.insert_str(&literal.value())?,
datatype_id: self.string_store.insert_str(literal.datatype().as_str())?,
}
})
}
@ -570,28 +567,28 @@ impl<S: BytesStore> Encoder<S> {
Err(format_err!("The default graph tag is not a valid term"))
}
EncodedTerm::NamedNode { iri_id } => {
Ok(NamedNode::from(self.decode_url_value(iri_id)?).into())
Ok(NamedNode::from(self.string_store.get_url(iri_id)?).into())
}
EncodedTerm::BlankNode(id) => Ok(BlankNode::from(id).into()),
EncodedTerm::SimpleLiteral { value_id } => {
Ok(Literal::new_simple_literal(self.decode_str_value(value_id)?).into())
Ok(Literal::new_simple_literal(self.string_store.get_str(value_id)?).into())
}
EncodedTerm::LangStringLiteral {
value_id,
language_id,
} => Ok(Literal::new_language_tagged_literal(
self.decode_str_value(value_id)?,
self.decode_str_value(language_id)?,
self.string_store.get_str(value_id)?,
self.string_store.get_str(language_id)?,
).into()),
EncodedTerm::TypedLiteral {
value_id,
datatype_id,
} => Ok(Literal::new_typed_literal(
self.decode_str_value(value_id)?,
NamedNode::from(self.decode_url_value(datatype_id)?),
self.string_store.get_str(value_id)?,
NamedNode::from(self.string_store.get_url(datatype_id)?),
).into()),
EncodedTerm::StringLiteral { value_id } => {
Ok(Literal::from(self.decode_str_value(value_id)?).into())
Ok(Literal::from(self.string_store.get_str(value_id)?).into())
}
EncodedTerm::BooleanLiteral(value) => Ok(Literal::from(value).into()),
EncodedTerm::FloatLiteral(value) => Ok(Literal::from(*value).into()),
@ -644,29 +641,9 @@ impl<S: BytesStore> Encoder<S> {
},
))
}
fn encode_str_value(&self, text: &str) -> Result<u64> {
self.string_store.insert_bytes(text.as_bytes())
}
fn decode_url_value(&self, id: u64) -> Result<Url> {
let bytes = self.decode_value(id)?;
Ok(Url::from_str(str::from_utf8(&bytes)?)?)
}
fn decode_str_value(&self, id: u64) -> Result<String> {
let bytes = self.decode_value(id)?;
Ok(str::from_utf8(&bytes)?.to_owned())
}
fn decode_value(&self, id: u64) -> Result<S::BytesOutput> {
self.string_store
.get_bytes(id)?
.ok_or_else(|| format_err!("value not found in the dictionary"))
}
}
impl<S: BytesStore + Default> Default for Encoder<S> {
impl<S: StringStore + Default> Default for Encoder<S> {
fn default() -> Self {
Self {
string_store: S::default(),
@ -677,30 +654,43 @@ impl<S: BytesStore + Default> Default for Encoder<S> {
mod test {
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::str::FromStr;
use store::numeric_encoder::*;
#[derive(Default)]
struct MemoryBytesStore {
id2str: RefCell<BTreeMap<u64, Vec<u8>>>,
str2id: RefCell<BTreeMap<Vec<u8>, u64>>,
struct MemoryStringStore {
id2str: RefCell<Vec<String>>,
str2id: RefCell<BTreeMap<String, u64>>,
}
impl BytesStore for MemoryBytesStore {
type BytesOutput = Vec<u8>;
fn insert_bytes(&self, value: &[u8]) -> Result<u64> {
impl StringStore for MemoryStringStore {
fn insert_str(&self, value: &str) -> Result<u64> {
let mut str2id = self.str2id.borrow_mut();
let mut id2str = self.id2str.borrow_mut();
let id = str2id.entry(value.to_vec()).or_insert_with(|| {
let id = str2id.entry(value.to_string()).or_insert_with(|| {
let id = id2str.len() as u64;
id2str.insert(id, value.to_vec());
id2str.push(value.to_string());
id
});
Ok(*id)
}
fn get_bytes(&self, id: u64) -> Result<Option<Vec<u8>>> {
Ok(self.id2str.borrow().get(&id).map(|s| s.to_owned()))
fn get_str(&self, id: u64) -> Result<String> {
let id2str = self.id2str.borrow();
if id2str.len() as u64 <= id {
Err(format_err!("value not found in the dictionary"))
} else {
Ok(id2str[id as usize].to_owned())
}
}
fn get_url(&self, id: u64) -> Result<Url> {
let id2str = self.id2str.borrow();
if id2str.len() as u64 <= id {
Err(format_err!("value not found in the dictionary"))
} else {
Ok(Url::from_str(&id2str[id as usize])?)
}
}
}
@ -708,7 +698,7 @@ mod test {
fn test_encoding() {
use model::*;
let encoder: Encoder<MemoryBytesStore> = Encoder::default();
let encoder: Encoder<MemoryStringStore> = Encoder::default();
let terms: Vec<Term> = vec![
NamedNode::from_str("http://foo.com").unwrap().into(),
NamedNode::from_str("http://bar.com").unwrap().into(),

@ -3,7 +3,6 @@ use byteorder::LittleEndian;
use failure::Backtrace;
use rocksdb::ColumnFamily;
use rocksdb::DBRawIterator;
use rocksdb::DBVector;
use rocksdb::Options;
use rocksdb::WriteBatch;
use rocksdb::DB;
@ -11,11 +10,13 @@ use std::io::Cursor;
use std::ops::Deref;
use std::path::Path;
use std::str;
use std::str::FromStr;
use std::sync::Mutex;
use std::sync::PoisonError;
use store::encoded::EncodedQuadsStore;
use store::encoded::StoreDataset;
use store::numeric_encoder::*;
use url::Url;
use Result;
/// `rudf::model::Dataset` trait implementation based on the [RocksDB](https://rocksdb.org/) key-value store
@ -84,10 +85,9 @@ impl RocksDbStore {
}
}
impl BytesStore for RocksDbStore {
type BytesOutput = DBVector;
fn insert_bytes(&self, value: &[u8]) -> Result<u64> {
impl StringStore for RocksDbStore {
fn insert_str(&self, value: &str) -> Result<u64> {
let value = value.as_bytes();
Ok(if let Some(id) = self.db.get_cf(*self.str2id_cf, value)? {
LittleEndian::read_u64(&id)
} else {
@ -105,8 +105,22 @@ impl BytesStore for RocksDbStore {
})
}
fn get_bytes(&self, id: u64) -> Result<Option<DBVector>> {
Ok(self.db.get_cf(*self.id2str_cf, &to_bytes(id))?)
fn get_str(&self, id: u64) -> Result<String> {
let value = self.db.get_cf(*self.id2str_cf, &to_bytes(id))?;
if let Some(value) = value {
Ok(str::from_utf8(&value)?.to_owned())
} else {
Err(format_err!("value not found in the dictionary"))
}
}
fn get_url(&self, id: u64) -> Result<Url> {
let value = self.db.get_cf(*self.id2str_cf, &to_bytes(id))?;
if let Some(value) = value {
Ok(Url::from_str(str::from_utf8(&value)?)?)
} else {
Err(format_err!("value not found in the dictionary"))
}
}
}

Loading…
Cancel
Save