Allows different terms to have various encoding size

pull/10/head
Tpt 6 years ago
parent 67623efa2f
commit 6725fc6dc5
  1. 423
      src/store/numeric_encoder.rs
  2. 2
      src/store/rocksdb/mod.rs
  3. 80
      src/store/rocksdb/storage.rs
  4. 6
      src/utils.rs

@ -1,45 +1,110 @@
use errors::*; use errors::*;
use model::*; use model::*;
use std::mem::size_of;
use std::ops::Deref; use std::ops::Deref;
use std::str; use std::str;
use std::str::FromStr; use std::str::FromStr;
use url::Url; use url::Url;
use utils::from_bytes_slice;
use utils::to_bytes;
use uuid::Uuid; use uuid::Uuid;
pub const STRING_KEY_SIZE: usize = 8;
pub trait BytesStore { pub trait BytesStore {
type BytesOutput: Deref<Target = [u8]>; type BytesOutput: Deref<Target = [u8]>;
fn put(&self, value: &[u8], id_buffer: &mut [u8]) -> Result<()>; fn put(&self, value: &[u8]) -> Result<usize>;
fn get(&self, id: &[u8]) -> Result<Option<Self::BytesOutput>>; fn get(&self, id: usize) -> Result<Option<Self::BytesOutput>>;
} }
const TYPE_KEY_SIZE: usize = 1;
const TYPE_NAMED_NODE_ID: u8 = 1; const TYPE_NAMED_NODE_ID: u8 = 1;
const TYPE_BLANK_NODE_ID: u8 = 2; const TYPE_BLANK_NODE_ID: u8 = 2;
const TYPE_LANG_STRING_LITERAL_ID: u8 = 3; const TYPE_LANG_STRING_LITERAL_ID: u8 = 3;
const TYPE_TYPED_LITERAL_ID: u8 = 4; const TYPE_TYPED_LITERAL_ID: u8 = 4;
pub const TERM_ENCODING_SIZE: usize = TYPE_KEY_SIZE + 2 * STRING_KEY_SIZE;
const EMPTY_TERM: [u8; TERM_ENCODING_SIZE] = [0 as u8; TERM_ENCODING_SIZE];
#[derive(Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Hash)] #[derive(Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Hash)]
pub struct EncodedTerm([u8; TERM_ENCODING_SIZE]); pub enum EncodedTerm {
NamedNode { iri_id: usize },
BlankNode(Uuid),
LangStringLiteral { value_id: usize, language_id: usize },
TypedLiteral { value_id: usize, datatype_id: usize },
}
impl EncodedTerm { impl EncodedTerm {
pub fn new_from_buffer(buffer: &[u8]) -> Result<Self> { pub fn new_from_buffer(buffer: &[u8]) -> Result<Self> {
if buffer.len() != TERM_ENCODING_SIZE { if buffer.is_empty() {
return Err("the term buffer has not the correct length".into()); return Err("the term buffer is empty.".into());
}
if buffer.len() < Self::type_length(buffer[0])? {
return Err(format!(
"the term buffer with id {} do not have at least {} bytes.",
buffer[0],
buffer.len()
).into());
}
match buffer[0] {
TYPE_NAMED_NODE_ID => Ok(EncodedTerm::NamedNode {
iri_id: from_bytes_slice(&buffer[1..1 + size_of::<usize>()]),
}),
TYPE_BLANK_NODE_ID => Ok(EncodedTerm::BlankNode(Uuid::from_bytes(&buffer[1..17])?)),
TYPE_LANG_STRING_LITERAL_ID => Ok(EncodedTerm::LangStringLiteral {
language_id: from_bytes_slice(&buffer[1..1 + size_of::<usize>()]),
value_id: from_bytes_slice(
&buffer[1 + size_of::<usize>()..1 + 2 * size_of::<usize>()],
),
}),
TYPE_TYPED_LITERAL_ID => Ok(EncodedTerm::TypedLiteral {
datatype_id: from_bytes_slice(&buffer[1..1 + size_of::<usize>()]),
value_id: from_bytes_slice(
&buffer[1 + size_of::<usize>()..1 + 2 * size_of::<usize>()],
),
}),
_ => Err("the term buffer has an invalid type id".into()),
}
}
pub fn encoding_size(&self) -> usize {
Self::type_length(self.type_id()).unwrap() //It is not possible to fail here
}
fn type_id(&self) -> u8 {
match self {
EncodedTerm::NamedNode { .. } => TYPE_NAMED_NODE_ID,
EncodedTerm::BlankNode(_) => TYPE_BLANK_NODE_ID,
EncodedTerm::LangStringLiteral { .. } => TYPE_LANG_STRING_LITERAL_ID,
EncodedTerm::TypedLiteral { .. } => TYPE_TYPED_LITERAL_ID,
}
}
fn type_length(type_id: u8) -> Result<usize> {
match type_id {
TYPE_NAMED_NODE_ID => Ok(1 + size_of::<usize>()),
TYPE_BLANK_NODE_ID => Ok(17), //TODO: guess
TYPE_LANG_STRING_LITERAL_ID => Ok(1 + 2 * size_of::<usize>()),
TYPE_TYPED_LITERAL_ID => Ok(1 + 2 * size_of::<usize>()),
_ => Err(format!("{} is not a known type id", type_id).into()),
} }
let mut buf = [0 as u8; TERM_ENCODING_SIZE];
buf.copy_from_slice(buffer);
return Ok(EncodedTerm(buf));
} }
}
impl AsRef<[u8]> for EncodedTerm { pub fn add_to_vec(&self, vec: &mut Vec<u8>) {
fn as_ref(&self) -> &[u8] { vec.push(self.type_id());
&self.0[..] match self {
EncodedTerm::NamedNode { iri_id } => vec.extend_from_slice(&to_bytes(*iri_id)),
EncodedTerm::BlankNode(id) => vec.extend_from_slice(id.as_bytes()),
EncodedTerm::LangStringLiteral {
value_id,
language_id,
} => {
vec.extend_from_slice(&to_bytes(*language_id));
vec.extend_from_slice(&to_bytes(*value_id));
}
EncodedTerm::TypedLiteral {
value_id,
datatype_id,
} => {
vec.extend_from_slice(&to_bytes(*datatype_id));
vec.extend_from_slice(&to_bytes(*value_id));
}
}
} }
} }
@ -48,94 +113,113 @@ pub struct EncodedQuad {
pub subject: EncodedTerm, pub subject: EncodedTerm,
pub predicate: EncodedTerm, pub predicate: EncodedTerm,
pub object: EncodedTerm, pub object: EncodedTerm,
pub graph_name: EncodedTerm, pub graph_name: Option<EncodedTerm>,
} }
impl EncodedQuad { impl EncodedQuad {
pub fn new_from_spog_buffer(buffer: &[u8]) -> Result<Self> { pub fn new_from_spog_buffer(buffer: &[u8]) -> Result<Self> {
if buffer.len() != 4 * TERM_ENCODING_SIZE { let mut start = 0 as usize;
return Err("the spog buffer has not the correct length".into()); let subject = EncodedTerm::new_from_buffer(&buffer[start..])?;
} start += subject.encoding_size();
let predicate = EncodedTerm::new_from_buffer(&buffer[start..])?;
start += predicate.encoding_size();
let object = EncodedTerm::new_from_buffer(&buffer[start..])?;
start += object.encoding_size();
let graph_name = if start < buffer.len() {
Some(EncodedTerm::new_from_buffer(&buffer[start..])?)
} else {
None
};
Ok(Self { Ok(Self {
subject: EncodedTerm::new_from_buffer(&buffer[0..TERM_ENCODING_SIZE])?, subject,
predicate: EncodedTerm::new_from_buffer( predicate,
&buffer[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE], object,
)?, graph_name,
object: EncodedTerm::new_from_buffer(
&buffer[2 * TERM_ENCODING_SIZE..3 * TERM_ENCODING_SIZE],
)?,
graph_name: EncodedTerm::new_from_buffer(
&buffer[3 * TERM_ENCODING_SIZE..4 * TERM_ENCODING_SIZE],
)?,
}) })
} }
pub fn new_from_posg_buffer(buffer: &[u8]) -> Result<Self> { pub fn new_from_posg_buffer(buffer: &[u8]) -> Result<Self> {
if buffer.len() != 4 * TERM_ENCODING_SIZE { let mut start = 0 as usize;
return Err("the posg buffer has not the correct length".into()); let predicate = EncodedTerm::new_from_buffer(&buffer[start..])?;
} start += predicate.encoding_size();
let object = EncodedTerm::new_from_buffer(&buffer[start..])?;
start += object.encoding_size();
let subject = EncodedTerm::new_from_buffer(&buffer[start..])?;
start += subject.encoding_size();
let graph_name = if start < buffer.len() {
Some(EncodedTerm::new_from_buffer(&buffer[start..])?)
} else {
None
};
Ok(Self { Ok(Self {
subject: EncodedTerm::new_from_buffer( subject,
&buffer[2 * TERM_ENCODING_SIZE..3 * TERM_ENCODING_SIZE], predicate,
)?, object,
predicate: EncodedTerm::new_from_buffer(&buffer[0..TERM_ENCODING_SIZE])?, graph_name,
object: EncodedTerm::new_from_buffer(
&buffer[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE],
)?,
graph_name: EncodedTerm::new_from_buffer(
&buffer[3 * TERM_ENCODING_SIZE..4 * TERM_ENCODING_SIZE],
)?,
}) })
} }
pub fn new_from_ospg_buffer(buffer: &[u8]) -> Result<Self> { pub fn new_from_ospg_buffer(buffer: &[u8]) -> Result<Self> {
if buffer.len() != 4 * TERM_ENCODING_SIZE { let mut start = 0 as usize;
return Err("the ospg buffer has not the correct length".into()); let object = EncodedTerm::new_from_buffer(&buffer[start..])?;
} start += object.encoding_size();
let subject = EncodedTerm::new_from_buffer(&buffer[start..])?;
start += subject.encoding_size();
let predicate = EncodedTerm::new_from_buffer(&buffer[start..])?;
start += predicate.encoding_size();
let graph_name = if start < buffer.len() {
Some(EncodedTerm::new_from_buffer(&buffer[start..])?)
} else {
None
};
Ok(Self { Ok(Self {
subject: EncodedTerm::new_from_buffer( subject,
&buffer[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE], predicate,
)?, object,
predicate: EncodedTerm::new_from_buffer( graph_name,
&buffer[2 * TERM_ENCODING_SIZE..3 * TERM_ENCODING_SIZE],
)?,
object: EncodedTerm::new_from_buffer(&buffer[0..TERM_ENCODING_SIZE])?,
graph_name: EncodedTerm::new_from_buffer(
&buffer[3 * TERM_ENCODING_SIZE..4 * TERM_ENCODING_SIZE],
)?,
}) })
} }
pub fn spog(&self) -> [u8; 4 * TERM_ENCODING_SIZE] { pub fn spog(&self) -> Vec<u8> {
let mut spog = [0 as u8; 4 * TERM_ENCODING_SIZE]; let mut spog = Vec::with_capacity(self.encoding_size());
spog[0..TERM_ENCODING_SIZE].copy_from_slice(self.subject.as_ref()); self.subject.add_to_vec(&mut spog);
spog[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE].copy_from_slice(self.predicate.as_ref()); self.predicate.add_to_vec(&mut spog);
spog[2 * TERM_ENCODING_SIZE..3 * TERM_ENCODING_SIZE].copy_from_slice(self.object.as_ref()); self.object.add_to_vec(&mut spog);
spog[3 * TERM_ENCODING_SIZE..4 * TERM_ENCODING_SIZE] if let Some(ref graph_name) = self.graph_name {
.copy_from_slice(self.graph_name.as_ref()); graph_name.add_to_vec(&mut spog);
}
spog spog
} }
pub fn posg(&self) -> [u8; 4 * TERM_ENCODING_SIZE] { pub fn posg(&self) -> Vec<u8> {
let mut posg = [0 as u8; 4 * TERM_ENCODING_SIZE]; let mut posg = Vec::with_capacity(self.encoding_size());
posg[0..TERM_ENCODING_SIZE].copy_from_slice(self.predicate.as_ref()); self.predicate.add_to_vec(&mut posg);
posg[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE].copy_from_slice(self.object.as_ref()); self.object.add_to_vec(&mut posg);
posg[2 * TERM_ENCODING_SIZE..3 * TERM_ENCODING_SIZE].copy_from_slice(self.subject.as_ref()); self.subject.add_to_vec(&mut posg);
posg[3 * TERM_ENCODING_SIZE..4 * TERM_ENCODING_SIZE] if let Some(ref graph_name) = self.graph_name {
.copy_from_slice(self.graph_name.as_ref()); graph_name.add_to_vec(&mut posg);
}
posg posg
} }
pub fn ospg(&self) -> [u8; 4 * TERM_ENCODING_SIZE] { pub fn ospg(&self) -> Vec<u8> {
let mut ospg = [0 as u8; 4 * TERM_ENCODING_SIZE]; let mut ospg = Vec::with_capacity(self.encoding_size());
ospg[0..TERM_ENCODING_SIZE].copy_from_slice(self.object.as_ref()); self.object.add_to_vec(&mut ospg);
ospg[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE].copy_from_slice(self.subject.as_ref()); self.subject.add_to_vec(&mut ospg);
ospg[2 * TERM_ENCODING_SIZE..3 * TERM_ENCODING_SIZE] self.predicate.add_to_vec(&mut ospg);
.copy_from_slice(self.predicate.as_ref()); if let Some(ref graph_name) = self.graph_name {
ospg[3 * TERM_ENCODING_SIZE..4 * TERM_ENCODING_SIZE] graph_name.add_to_vec(&mut ospg);
.copy_from_slice(self.graph_name.as_ref()); }
ospg ospg
} }
fn encoding_size(&self) -> usize {
self.subject.encoding_size() + self.predicate.encoding_size() + self.object.encoding_size()
+ match self.graph_name {
Some(ref graph_name) => graph_name.encoding_size(),
None => 0,
}
}
} }
pub struct Encoder<S: BytesStore> { pub struct Encoder<S: BytesStore> {
@ -148,30 +232,27 @@ impl<S: BytesStore> Encoder<S> {
} }
pub fn encode_named_node(&self, named_node: &NamedNode) -> Result<EncodedTerm> { pub fn encode_named_node(&self, named_node: &NamedNode) -> Result<EncodedTerm> {
let mut bytes = [0 as u8; TERM_ENCODING_SIZE]; Ok(EncodedTerm::NamedNode {
bytes[0] = TYPE_NAMED_NODE_ID; iri_id: self.encode_str_value(named_node.as_str())?,
self.encode_str_value_to_lower_bytes(named_node.as_str(), &mut bytes)?; })
Ok(EncodedTerm(bytes))
} }
pub fn encode_blank_node(&self, blank_node: &BlankNode) -> Result<EncodedTerm> { pub fn encode_blank_node(&self, blank_node: &BlankNode) -> Result<EncodedTerm> {
let mut bytes = [0 as u8; TERM_ENCODING_SIZE]; Ok(EncodedTerm::BlankNode(blank_node.deref().clone()))
bytes[0] = TYPE_BLANK_NODE_ID;
bytes[TYPE_KEY_SIZE..].copy_from_slice(blank_node.as_bytes());
Ok(EncodedTerm(bytes))
} }
pub fn encode_literal(&self, literal: &Literal) -> Result<EncodedTerm> { pub fn encode_literal(&self, literal: &Literal) -> Result<EncodedTerm> {
let mut bytes = [0 as u8; TERM_ENCODING_SIZE];
if let Some(language) = literal.language() { if let Some(language) = literal.language() {
bytes[0] = TYPE_LANG_STRING_LITERAL_ID; Ok(EncodedTerm::LangStringLiteral {
self.encode_str_value_to_upper_bytes(language, &mut bytes)?; value_id: self.encode_str_value(&literal.value())?,
language_id: self.encode_str_value(language)?,
})
} else { } else {
bytes[0] = TYPE_TYPED_LITERAL_ID; Ok(EncodedTerm::TypedLiteral {
self.encode_str_value_to_upper_bytes(literal.datatype().as_str(), &mut bytes)?; value_id: self.encode_str_value(&literal.value())?,
datatype_id: self.encode_str_value(literal.datatype().as_ref())?,
})
} }
self.encode_str_value_to_lower_bytes(literal.value().as_str(), &mut bytes)?;
Ok(EncodedTerm(bytes))
} }
pub fn encode_named_or_blank_node(&self, term: &NamedOrBlankNode) -> Result<EncodedTerm> { pub fn encode_named_or_blank_node(&self, term: &NamedOrBlankNode) -> Result<EncodedTerm> {
@ -181,16 +262,6 @@ impl<S: BytesStore> Encoder<S> {
} }
} }
pub fn encode_optional_named_or_blank_node(
&self,
term: &Option<NamedOrBlankNode>,
) -> Result<EncodedTerm> {
match term {
Some(node) => self.encode_named_or_blank_node(node),
None => Ok(EncodedTerm(EMPTY_TERM)),
}
}
pub fn encode_term(&self, term: &Term) -> Result<EncodedTerm> { pub fn encode_term(&self, term: &Term) -> Result<EncodedTerm> {
match term { match term {
Term::NamedNode(named_node) => self.encode_named_node(named_node), Term::NamedNode(named_node) => self.encode_named_node(named_node),
@ -204,117 +275,82 @@ impl<S: BytesStore> Encoder<S> {
subject: self.encode_named_or_blank_node(quad.subject())?, subject: self.encode_named_or_blank_node(quad.subject())?,
predicate: self.encode_named_node(quad.predicate())?, predicate: self.encode_named_node(quad.predicate())?,
object: self.encode_term(quad.object())?, object: self.encode_term(quad.object())?,
graph_name: self.encode_optional_named_or_blank_node(quad.graph_name())?, graph_name: match quad.graph_name() {
Some(graph_name) => Some(self.encode_named_or_blank_node(&graph_name)?),
None => None,
},
}) })
} }
pub fn decode_term(&self, encoded: impl AsRef<[u8]>) -> Result<Term> { pub fn decode_term(&self, encoded: &EncodedTerm) -> Result<Term> {
let encoding = encoded.as_ref(); match encoded {
match encoding[0] { EncodedTerm::NamedNode { iri_id } => {
TYPE_NAMED_NODE_ID => { Ok(NamedNode::from(self.decode_url_value(*iri_id)?).into())
let iri = self.decode_url_value_from_lower_bytes(encoding)?;
Ok(NamedNode::from(iri).into())
} }
TYPE_BLANK_NODE_ID => Ok(BlankNode::from(Uuid::from_bytes(&encoding[1..])?).into()), EncodedTerm::BlankNode(id) => Ok(BlankNode::from(*id).into()),
TYPE_LANG_STRING_LITERAL_ID => { EncodedTerm::LangStringLiteral {
let value = self.decode_str_value_from_lower_bytes(encoding)?; value_id,
let language = self.decode_str_value_from_upper_bytes(encoding)?; language_id,
Ok(Literal::new_language_tagged_literal(value, language).into()) } => Ok(Literal::new_language_tagged_literal(
} self.decode_str_value(*value_id)?,
TYPE_TYPED_LITERAL_ID => { self.decode_str_value(*language_id)?,
let value = self.decode_str_value_from_lower_bytes(encoding)?; ).into()),
let datatype = NamedNode::from(self.decode_url_value_from_upper_bytes(encoding)?); EncodedTerm::TypedLiteral {
Ok(Literal::new_typed_literal(value, datatype).into()) value_id,
} datatype_id,
_ => Err("invalid term type encoding".into()), } => Ok(Literal::new_typed_literal(
self.decode_str_value(*value_id)?,
NamedNode::from(self.decode_url_value(*datatype_id)?),
).into()),
} }
} }
pub fn decode_named_or_blank_node( pub fn decode_named_or_blank_node(&self, encoded: &EncodedTerm) -> Result<NamedOrBlankNode> {
&self, match self.decode_term(encoded)? {
encoded: impl AsRef<[u8]>,
) -> Result<NamedOrBlankNode> {
let encoding = encoded.as_ref();
match self.decode_term(encoding)? {
Term::NamedNode(named_node) => Ok(named_node.into()), Term::NamedNode(named_node) => Ok(named_node.into()),
Term::BlankNode(blank_node) => Ok(blank_node.into()), Term::BlankNode(blank_node) => Ok(blank_node.into()),
Term::Literal(_) => Err("A literal has ben found instead of a named node".into()), Term::Literal(_) => Err("A literal has ben found instead of a named node".into()),
} }
} }
pub fn decode_optional_named_or_blank_node( pub fn decode_named_node(&self, encoded: &EncodedTerm) -> Result<NamedNode> {
&self, match self.decode_term(encoded)? {
encoded: impl AsRef<[u8]>,
) -> Result<Option<NamedOrBlankNode>> {
let encoding = encoded.as_ref();
if encoding == EMPTY_TERM {
Ok(None)
} else {
Ok(Some(self.decode_named_or_blank_node(encoding)?))
}
}
pub fn decode_named_node(&self, encoded: impl AsRef<[u8]>) -> Result<NamedNode> {
let encoding = encoded.as_ref();
match self.decode_term(encoding)? {
Term::NamedNode(named_node) => Ok(named_node), Term::NamedNode(named_node) => Ok(named_node),
Term::BlankNode(_) => Err("A blank node has been found instead of a named node".into()), Term::BlankNode(_) => Err("A blank node has been found instead of a named node".into()),
Term::Literal(_) => Err("A literal has ben found instead of a named node".into()), Term::Literal(_) => Err("A literal has ben found instead of a named node".into()),
} }
} }
pub fn decode_quad(&self, encoded: EncodedQuad) -> Result<Quad> { pub fn decode_quad(&self, encoded: &EncodedQuad) -> Result<Quad> {
Ok(Quad::new( Ok(Quad::new(
self.decode_named_or_blank_node(encoded.subject)?, self.decode_named_or_blank_node(&encoded.subject)?,
self.decode_named_node(encoded.predicate)?, self.decode_named_node(&encoded.predicate)?,
self.decode_term(encoded.object)?, self.decode_term(&encoded.object)?,
self.decode_optional_named_or_blank_node(encoded.graph_name)?, match encoded.graph_name {
Some(ref graph_name) => Some(self.decode_named_or_blank_node(&graph_name)?),
None => None,
},
)) ))
} }
fn encode_str_value_to_upper_bytes(&self, text: &str, bytes: &mut [u8]) -> Result<()> { fn encode_str_value(&self, text: &str) -> Result<usize> {
self.string_store.put( self.string_store.put(text.as_bytes())
text.as_bytes(),
&mut bytes[TYPE_KEY_SIZE..TYPE_KEY_SIZE + STRING_KEY_SIZE],
)
}
fn encode_str_value_to_lower_bytes(&self, text: &str, bytes: &mut [u8]) -> Result<()> {
self.string_store.put(
text.as_bytes(),
&mut bytes[TYPE_KEY_SIZE + STRING_KEY_SIZE..TYPE_KEY_SIZE + 2 * STRING_KEY_SIZE],
)
}
fn decode_str_value_from_upper_bytes(&self, encoding: &[u8]) -> Result<String> {
let bytes = self.decode_value_from_upper_bytes(encoding)?;
Ok(str::from_utf8(&bytes)?.to_string())
} }
fn decode_url_value_from_upper_bytes(&self, encoding: &[u8]) -> Result<Url> { fn decode_url_value(&self, id: usize) -> Result<Url> {
let bytes = self.decode_value_from_upper_bytes(encoding)?; let bytes = self.decode_value(id)?;
Ok(Url::from_str(str::from_utf8(&bytes)?)?) Ok(Url::from_str(str::from_utf8(&bytes)?)?)
} }
fn decode_value_from_upper_bytes(&self, encoding: &[u8]) -> Result<S::BytesOutput> { fn decode_str_value(&self, id: usize) -> Result<String> {
self.string_store let bytes = self.decode_value(id)?;
.get(&encoding[TYPE_KEY_SIZE..TYPE_KEY_SIZE + STRING_KEY_SIZE])? Ok(str::from_utf8(&bytes)?.to_owned())
.ok_or(Error::from("value not found in the dictionary"))
}
fn decode_str_value_from_lower_bytes(&self, encoding: &[u8]) -> Result<String> {
let bytes = self.decode_value_from_lower_bytes(encoding)?;
Ok(str::from_utf8(&bytes)?.to_string())
}
fn decode_url_value_from_lower_bytes(&self, encoding: &[u8]) -> Result<Url> {
let bytes = self.decode_value_from_lower_bytes(encoding)?;
Ok(Url::from_str(str::from_utf8(&bytes)?)?)
} }
fn decode_value_from_lower_bytes(&self, encoding: &[u8]) -> Result<S::BytesOutput> { fn decode_value(&self, id: usize) -> Result<S::BytesOutput> {
self.string_store self.string_store
.get(&encoding[TYPE_KEY_SIZE + STRING_KEY_SIZE..TYPE_KEY_SIZE + 2 * STRING_KEY_SIZE])? .get(id)?
.ok_or(Error::from("value not found in the dictionary")) .ok_or("value not found in the dictionary".into())
} }
} }
@ -327,37 +363,34 @@ impl<S: BytesStore + Default> Default for Encoder<S> {
} }
mod test { mod test {
use errors::*;
use model::*; use model::*;
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::str::FromStr;
use store::numeric_encoder::*; use store::numeric_encoder::*;
use utils::to_bytes; use utils::to_bytes;
#[derive(Default)] #[derive(Default)]
struct MemoryBytesStore { struct MemoryBytesStore {
id2str: RefCell<BTreeMap<[u8; STRING_KEY_SIZE], Vec<u8>>>, id2str: RefCell<BTreeMap<usize, Vec<u8>>>,
str2id: RefCell<BTreeMap<Vec<u8>, [u8; STRING_KEY_SIZE]>>, str2id: RefCell<BTreeMap<Vec<u8>, usize>>,
} }
impl BytesStore for MemoryBytesStore { impl BytesStore for MemoryBytesStore {
type BytesOutput = Vec<u8>; type BytesOutput = Vec<u8>;
fn put(&self, value: &[u8], id_buffer: &mut [u8]) -> Result<()> { fn put(&self, value: &[u8]) -> Result<usize> {
let mut str2id = self.str2id.borrow_mut(); let mut str2id = self.str2id.borrow_mut();
let mut id2str = self.id2str.borrow_mut(); let mut id2str = self.id2str.borrow_mut();
let id = str2id.entry(value.to_vec()).or_insert_with(|| { let id = str2id.entry(value.to_vec()).or_insert_with(|| {
let id = to_bytes(id2str.len()); let id = id2str.len();
id2str.insert(id, value.to_vec()); id2str.insert(id, value.to_vec());
id id
}); });
id_buffer.copy_from_slice(id); Ok(*id)
Ok(())
} }
fn get(&self, id: &[u8]) -> Result<Option<Vec<u8>>> { fn get(&self, id: usize) -> Result<Option<Vec<u8>>> {
Ok(self.id2str.borrow().get(id).map(|s| s.to_owned())) Ok(self.id2str.borrow().get(&id).map(|s| s.to_owned()))
} }
} }
@ -376,7 +409,7 @@ mod test {
]; ];
for term in terms { for term in terms {
let encoded = encoder.encode_term(&term).unwrap(); let encoded = encoder.encode_term(&term).unwrap();
assert_eq!(term, encoder.decode_term(encoded).unwrap()) assert_eq!(term, encoder.decode_term(&encoded).unwrap())
} }
} }

@ -90,6 +90,6 @@ impl<'a, I: Iterator<Item = Result<EncodedQuad>>> Iterator for QuadsIterator<'a,
fn next(&mut self) -> Option<Result<Quad>> { fn next(&mut self) -> Option<Result<Quad>> {
self.iter self.iter
.next() .next()
.map(|k| k.and_then(|quad| self.encoder.decode_quad(quad))) .map(|k| k.and_then(|quad| self.encoder.decode_quad(&quad)))
} }
} }

@ -11,6 +11,7 @@ use std::str;
use std::sync::Mutex; use std::sync::Mutex;
use store::numeric_encoder::*; use store::numeric_encoder::*;
use utils::from_bytes; use utils::from_bytes;
use utils::from_bytes_slice;
use utils::to_bytes; use utils::to_bytes;
const ID2STR_CF: &'static str = "id2str"; const ID2STR_CF: &'static str = "id2str";
@ -74,7 +75,7 @@ impl RocksDbStore {
subject: EncodedTerm, subject: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> { ) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(self.spog_cf)?; let mut iter = self.db.raw_iterator_cf(self.spog_cf)?;
iter.seek(subject.as_ref()); iter.seek(&encode_term(&subject));
Ok(FilteringEncodedQuadsIterator { Ok(FilteringEncodedQuadsIterator {
iter: SPOGIndexIterator { iter }, iter: SPOGIndexIterator { iter },
filter: EncodedQuadPattern::new(Some(subject), None, None, None), filter: EncodedQuadPattern::new(Some(subject), None, None, None),
@ -113,7 +114,7 @@ impl RocksDbStore {
predicate: EncodedTerm, predicate: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<POSGIndexIterator>> { ) -> Result<FilteringEncodedQuadsIterator<POSGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(self.posg_cf)?; let mut iter = self.db.raw_iterator_cf(self.posg_cf)?;
iter.seek(predicate.as_ref()); iter.seek(&encode_term(&predicate));
Ok(FilteringEncodedQuadsIterator { Ok(FilteringEncodedQuadsIterator {
iter: POSGIndexIterator { iter }, iter: POSGIndexIterator { iter },
filter: EncodedQuadPattern::new(None, Some(predicate), None, None), filter: EncodedQuadPattern::new(None, Some(predicate), None, None),
@ -138,7 +139,7 @@ impl RocksDbStore {
object: EncodedTerm, object: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<OSPGIndexIterator>> { ) -> Result<FilteringEncodedQuadsIterator<OSPGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(self.ospg_cf)?; let mut iter = self.db.raw_iterator_cf(self.ospg_cf)?;
iter.seek(object.as_ref()); iter.seek(&encode_term(&object));
Ok(FilteringEncodedQuadsIterator { Ok(FilteringEncodedQuadsIterator {
iter: OSPGIndexIterator { iter }, iter: OSPGIndexIterator { iter },
filter: EncodedQuadPattern::new(None, None, Some(object), None), filter: EncodedQuadPattern::new(None, None, Some(object), None),
@ -168,7 +169,7 @@ impl RocksDbStore {
pub fn get_cf(db: &DB, name: &str) -> Result<ColumnFamily> { pub fn get_cf(db: &DB, name: &str) -> Result<ColumnFamily> {
db.cf_handle(name) db.cf_handle(name)
.ok_or_else(|| Error::from("column family not found")) .ok_or_else(|| "column family not found".into())
} }
pub struct RocksDbBytesStore<'a>(&'a RocksDbStore); pub struct RocksDbBytesStore<'a>(&'a RocksDbStore);
@ -176,29 +177,28 @@ pub struct RocksDbBytesStore<'a>(&'a RocksDbStore);
impl<'a> BytesStore for RocksDbBytesStore<'a> { impl<'a> BytesStore for RocksDbBytesStore<'a> {
type BytesOutput = DBVector; type BytesOutput = DBVector;
fn put(&self, value: &[u8], id_buffer: &mut [u8]) -> Result<()> { fn put(&self, value: &[u8]) -> Result<usize> {
match self.0.db.get_cf(self.0.str2id_cf, value)? { Ok(match self.0.db.get_cf(self.0.str2id_cf, value)? {
Some(id) => id_buffer.copy_from_slice(&id), Some(id) => from_bytes_slice(&id),
None => { None => {
let id = to_bytes( let id = self
self.0 .0
.str_id_counter .str_id_counter
.lock() .lock()
.unwrap() .unwrap()
.get_and_increment(&self.0.db)?, .get_and_increment(&self.0.db)?;
); let id_bytes = to_bytes(id);
let mut batch = WriteBatch::default(); let mut batch = WriteBatch::default();
batch.put_cf(self.0.id2str_cf, &id, value)?; batch.put_cf(self.0.id2str_cf, &id_bytes, value)?;
batch.put_cf(self.0.str2id_cf, value, &id)?; batch.put_cf(self.0.str2id_cf, value, &id_bytes)?;
self.0.db.write(batch)?; self.0.db.write(batch)?;
id_buffer.copy_from_slice(&id) id
} }
} })
Ok(())
} }
fn get(&self, id: &[u8]) -> Result<Option<DBVector>> { fn get(&self, id: usize) -> Result<Option<DBVector>> {
Ok(self.0.db.get_cf(self.0.id2str_cf, id)?) Ok(self.0.db.get_cf(self.0.id2str_cf, &to_bytes(id))?)
} }
} }
@ -229,7 +229,7 @@ struct EncodedQuadPattern {
subject: Option<EncodedTerm>, subject: Option<EncodedTerm>,
predicate: Option<EncodedTerm>, predicate: Option<EncodedTerm>,
object: Option<EncodedTerm>, object: Option<EncodedTerm>,
graph_name: Option<EncodedTerm>, graph_name: Option<Option<EncodedTerm>>,
} }
impl EncodedQuadPattern { impl EncodedQuadPattern {
@ -237,7 +237,7 @@ impl EncodedQuadPattern {
subject: Option<EncodedTerm>, subject: Option<EncodedTerm>,
predicate: Option<EncodedTerm>, predicate: Option<EncodedTerm>,
object: Option<EncodedTerm>, object: Option<EncodedTerm>,
graph_name: Option<EncodedTerm>, graph_name: Option<Option<EncodedTerm>>,
) -> Self { ) -> Self {
Self { Self {
subject, subject,
@ -272,23 +272,25 @@ impl EncodedQuadPattern {
} }
} }
fn encode_term_pair(t1: &EncodedTerm, t2: &EncodedTerm) -> [u8; 2 * TERM_ENCODING_SIZE] { fn encode_term(t: &EncodedTerm) -> Vec<u8> {
let mut bytes = [0 as u8; 2 * TERM_ENCODING_SIZE]; let mut vec = Vec::with_capacity(t.encoding_size());
bytes[0..TERM_ENCODING_SIZE].copy_from_slice(t1.as_ref()); t.add_to_vec(&mut vec);
bytes[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE].copy_from_slice(t2.as_ref()); vec
bytes }
fn encode_term_pair(t1: &EncodedTerm, t2: &EncodedTerm) -> Vec<u8> {
let mut vec = Vec::with_capacity(t1.encoding_size() + t2.encoding_size());
t1.add_to_vec(&mut vec);
t2.add_to_vec(&mut vec);
vec
} }
fn encode_term_triple( fn encode_term_triple(t1: &EncodedTerm, t2: &EncodedTerm, t3: &EncodedTerm) -> Vec<u8> {
t1: &EncodedTerm, let mut vec = Vec::with_capacity(t1.encoding_size() + t2.encoding_size() + t3.encoding_size());
t2: &EncodedTerm, t1.add_to_vec(&mut vec);
t3: &EncodedTerm, t2.add_to_vec(&mut vec);
) -> [u8; 3 * TERM_ENCODING_SIZE] { t3.add_to_vec(&mut vec);
let mut bytes = [0 as u8; 3 * TERM_ENCODING_SIZE]; vec
bytes[0..TERM_ENCODING_SIZE].copy_from_slice(t1.as_ref());
bytes[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE].copy_from_slice(t2.as_ref());
bytes[2 * TERM_ENCODING_SIZE..3 * TERM_ENCODING_SIZE].copy_from_slice(t3.as_ref());
bytes
} }
pub struct SPOGIndexIterator { pub struct SPOGIndexIterator {
@ -347,7 +349,7 @@ impl<I: Iterator<Item = Result<EncodedQuad>>> Iterator for FilteringEncodedQuads
fn next(&mut self) -> Option<Result<EncodedQuad>> { fn next(&mut self) -> Option<Result<EncodedQuad>> {
self.iter.next().filter(|quad| match quad { self.iter.next().filter(|quad| match quad {
Ok(quad) => self.filter.filter(quad), Ok(quad) => self.filter.filter(quad),
Err(e) => true, Err(_) => true,
}) })
} }
} }

@ -90,3 +90,9 @@ pub fn from_bytes(bytes: [u8; size_of::<usize>()]) -> usize {
//TODO: remove when next rust version stabilize this method //TODO: remove when next rust version stabilize this method
unsafe { transmute(bytes) } unsafe { transmute(bytes) }
} }
pub fn from_bytes_slice(bytes: &[u8]) -> usize {
let mut buf = [0 as u8; size_of::<usize>()];
buf.copy_from_slice(bytes);
from_bytes(buf)
}

Loading…
Cancel
Save