Makes the term not dependant of the OS

pull/10/head
Tpt 6 years ago
parent 6725fc6dc5
commit a495af1b75
  1. 1
      Cargo.toml
  2. 1
      src/lib.rs
  3. 178
      src/store/numeric_encoder.rs
  4. 58
      src/store/rocksdb/storage.rs
  5. 8
      src/utils.rs

@ -20,6 +20,7 @@ lazy_static = "1"
rocksdb = "0.10"
url = "1"
uuid = { version = "0.6", features = ["v4"] }
byteorder = "1"
[build-dependencies]
peg = "0.5"

@ -1,3 +1,4 @@
extern crate byteorder;
#[macro_use]
extern crate error_chain;
#[macro_use]

@ -1,19 +1,21 @@
use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt};
use errors::*;
use model::*;
use std::io::Cursor;
use std::io::Read;
use std::io::Write;
use std::mem::size_of;
use std::ops::Deref;
use std::str;
use std::str::FromStr;
use url::Url;
use utils::from_bytes_slice;
use utils::to_bytes;
use uuid::Uuid;
pub trait BytesStore {
type BytesOutput: Deref<Target = [u8]>;
fn put(&self, value: &[u8]) -> Result<usize>;
fn get(&self, id: usize) -> Result<Option<Self::BytesOutput>>;
fn put(&self, value: &[u8]) -> Result<u64>;
fn get(&self, id: u64) -> Result<Option<Self::BytesOutput>>;
}
const TYPE_NAMED_NODE_ID: u8 = 1;
@ -23,40 +25,31 @@ const TYPE_TYPED_LITERAL_ID: u8 = 4;
#[derive(Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Hash)]
pub enum EncodedTerm {
NamedNode { iri_id: usize },
NamedNode { iri_id: u64 },
BlankNode(Uuid),
LangStringLiteral { value_id: usize, language_id: usize },
TypedLiteral { value_id: usize, datatype_id: usize },
LangStringLiteral { value_id: u64, language_id: u64 },
TypedLiteral { value_id: u64, datatype_id: u64 },
}
impl EncodedTerm {
pub fn new_from_buffer(buffer: &[u8]) -> Result<Self> {
if buffer.is_empty() {
return Err("the term buffer is empty.".into());
}
if buffer.len() < Self::type_length(buffer[0])? {
return Err(format!(
"the term buffer with id {} do not have at least {} bytes.",
buffer[0],
buffer.len()
).into());
}
match buffer[0] {
pub fn read(reader: &mut impl Read) -> Result<Self> {
let type_id = reader.read_u8()?;
match type_id {
TYPE_NAMED_NODE_ID => Ok(EncodedTerm::NamedNode {
iri_id: from_bytes_slice(&buffer[1..1 + size_of::<usize>()]),
iri_id: reader.read_u64::<NetworkEndian>()?,
}),
TYPE_BLANK_NODE_ID => Ok(EncodedTerm::BlankNode(Uuid::from_bytes(&buffer[1..17])?)),
TYPE_BLANK_NODE_ID => {
let mut uuid_buffer = [0 as u8; 16];
reader.read_exact(&mut uuid_buffer)?;
Ok(EncodedTerm::BlankNode(Uuid::from_bytes(&uuid_buffer)?))
}
TYPE_LANG_STRING_LITERAL_ID => Ok(EncodedTerm::LangStringLiteral {
language_id: from_bytes_slice(&buffer[1..1 + size_of::<usize>()]),
value_id: from_bytes_slice(
&buffer[1 + size_of::<usize>()..1 + 2 * size_of::<usize>()],
),
language_id: reader.read_u64::<NetworkEndian>()?,
value_id: reader.read_u64::<NetworkEndian>()?,
}),
TYPE_TYPED_LITERAL_ID => Ok(EncodedTerm::TypedLiteral {
datatype_id: from_bytes_slice(&buffer[1..1 + size_of::<usize>()]),
value_id: from_bytes_slice(
&buffer[1 + size_of::<usize>()..1 + 2 * size_of::<usize>()],
),
datatype_id: reader.read_u64::<NetworkEndian>()?,
value_id: reader.read_u64::<NetworkEndian>()?,
}),
_ => Err("the term buffer has an invalid type id".into()),
}
@ -76,35 +69,37 @@ impl EncodedTerm {
}
fn type_length(type_id: u8) -> Result<usize> {
//TODO: useful
match type_id {
TYPE_NAMED_NODE_ID => Ok(1 + size_of::<usize>()),
TYPE_NAMED_NODE_ID => Ok(1 + size_of::<u64>()),
TYPE_BLANK_NODE_ID => Ok(17), //TODO: guess
TYPE_LANG_STRING_LITERAL_ID => Ok(1 + 2 * size_of::<usize>()),
TYPE_TYPED_LITERAL_ID => Ok(1 + 2 * size_of::<usize>()),
TYPE_LANG_STRING_LITERAL_ID => Ok(1 + 2 * size_of::<u64>()),
TYPE_TYPED_LITERAL_ID => Ok(1 + 2 * size_of::<u64>()),
_ => Err(format!("{} is not a known type id", type_id).into()),
}
}
pub fn add_to_vec(&self, vec: &mut Vec<u8>) {
vec.push(self.type_id());
pub fn write(&self, writer: &mut impl Write) -> Result<()> {
writer.write_u8(self.type_id())?;
match self {
EncodedTerm::NamedNode { iri_id } => vec.extend_from_slice(&to_bytes(*iri_id)),
EncodedTerm::BlankNode(id) => vec.extend_from_slice(id.as_bytes()),
EncodedTerm::NamedNode { iri_id } => writer.write_u64::<NetworkEndian>(*iri_id)?,
EncodedTerm::BlankNode(id) => writer.write_all(id.as_bytes())?,
EncodedTerm::LangStringLiteral {
value_id,
language_id,
} => {
vec.extend_from_slice(&to_bytes(*language_id));
vec.extend_from_slice(&to_bytes(*value_id));
writer.write_u64::<NetworkEndian>(*language_id)?;
writer.write_u64::<NetworkEndian>(*value_id)?;
}
EncodedTerm::TypedLiteral {
value_id,
datatype_id,
} => {
vec.extend_from_slice(&to_bytes(*datatype_id));
vec.extend_from_slice(&to_bytes(*value_id));
writer.write_u64::<NetworkEndian>(*datatype_id)?;
writer.write_u64::<NetworkEndian>(*value_id)?;
}
}
Ok(())
}
}
@ -118,15 +113,12 @@ pub struct EncodedQuad {
impl EncodedQuad {
pub fn new_from_spog_buffer(buffer: &[u8]) -> Result<Self> {
let mut start = 0 as usize;
let subject = EncodedTerm::new_from_buffer(&buffer[start..])?;
start += subject.encoding_size();
let predicate = EncodedTerm::new_from_buffer(&buffer[start..])?;
start += predicate.encoding_size();
let object = EncodedTerm::new_from_buffer(&buffer[start..])?;
start += object.encoding_size();
let graph_name = if start < buffer.len() {
Some(EncodedTerm::new_from_buffer(&buffer[start..])?)
let mut cursor = Cursor::new(buffer);
let subject = EncodedTerm::read(&mut cursor)?;
let predicate = EncodedTerm::read(&mut cursor)?;
let object = EncodedTerm::read(&mut cursor)?;
let graph_name = if cursor.position() < buffer.len() as u64 {
Some(EncodedTerm::read(&mut cursor)?)
} else {
None
};
@ -139,15 +131,12 @@ impl EncodedQuad {
}
pub fn new_from_posg_buffer(buffer: &[u8]) -> Result<Self> {
let mut start = 0 as usize;
let predicate = EncodedTerm::new_from_buffer(&buffer[start..])?;
start += predicate.encoding_size();
let object = EncodedTerm::new_from_buffer(&buffer[start..])?;
start += object.encoding_size();
let subject = EncodedTerm::new_from_buffer(&buffer[start..])?;
start += subject.encoding_size();
let graph_name = if start < buffer.len() {
Some(EncodedTerm::new_from_buffer(&buffer[start..])?)
let mut cursor = Cursor::new(buffer);
let predicate = EncodedTerm::read(&mut cursor)?;
let object = EncodedTerm::read(&mut cursor)?;
let subject = EncodedTerm::read(&mut cursor)?;
let graph_name = if cursor.position() < buffer.len() as u64 {
Some(EncodedTerm::read(&mut cursor)?)
} else {
None
};
@ -160,15 +149,12 @@ impl EncodedQuad {
}
pub fn new_from_ospg_buffer(buffer: &[u8]) -> Result<Self> {
let mut start = 0 as usize;
let object = EncodedTerm::new_from_buffer(&buffer[start..])?;
start += object.encoding_size();
let subject = EncodedTerm::new_from_buffer(&buffer[start..])?;
start += subject.encoding_size();
let predicate = EncodedTerm::new_from_buffer(&buffer[start..])?;
start += predicate.encoding_size();
let graph_name = if start < buffer.len() {
Some(EncodedTerm::new_from_buffer(&buffer[start..])?)
let mut cursor = Cursor::new(buffer);
let object = EncodedTerm::read(&mut cursor)?;
let subject = EncodedTerm::read(&mut cursor)?;
let predicate = EncodedTerm::read(&mut cursor)?;
let graph_name = if cursor.position() < buffer.len() as u64 {
Some(EncodedTerm::read(&mut cursor)?)
} else {
None
};
@ -180,37 +166,37 @@ impl EncodedQuad {
})
}
pub fn spog(&self) -> Vec<u8> {
pub fn spog(&self) -> Result<Vec<u8>> {
let mut spog = Vec::with_capacity(self.encoding_size());
self.subject.add_to_vec(&mut spog);
self.predicate.add_to_vec(&mut spog);
self.object.add_to_vec(&mut spog);
self.subject.write(&mut spog)?;
self.predicate.write(&mut spog)?;
self.object.write(&mut spog)?;
if let Some(ref graph_name) = self.graph_name {
graph_name.add_to_vec(&mut spog);
graph_name.write(&mut spog)?;
}
spog
Ok(spog)
}
pub fn posg(&self) -> Vec<u8> {
pub fn posg(&self) -> Result<Vec<u8>> {
let mut posg = Vec::with_capacity(self.encoding_size());
self.predicate.add_to_vec(&mut posg);
self.object.add_to_vec(&mut posg);
self.subject.add_to_vec(&mut posg);
self.predicate.write(&mut posg)?;
self.object.write(&mut posg)?;
self.subject.write(&mut posg)?;
if let Some(ref graph_name) = self.graph_name {
graph_name.add_to_vec(&mut posg);
graph_name.write(&mut posg)?;
}
posg
Ok(posg)
}
pub fn ospg(&self) -> Vec<u8> {
pub fn ospg(&self) -> Result<Vec<u8>> {
let mut ospg = Vec::with_capacity(self.encoding_size());
self.object.add_to_vec(&mut ospg);
self.subject.add_to_vec(&mut ospg);
self.predicate.add_to_vec(&mut ospg);
self.object.write(&mut ospg)?;
self.subject.write(&mut ospg)?;
self.predicate.write(&mut ospg)?;
if let Some(ref graph_name) = self.graph_name {
graph_name.add_to_vec(&mut ospg);
graph_name.write(&mut ospg)?;
}
ospg
Ok(ospg)
}
fn encoding_size(&self) -> usize {
@ -333,21 +319,21 @@ impl<S: BytesStore> Encoder<S> {
))
}
fn encode_str_value(&self, text: &str) -> Result<usize> {
fn encode_str_value(&self, text: &str) -> Result<u64> {
self.string_store.put(text.as_bytes())
}
fn decode_url_value(&self, id: usize) -> Result<Url> {
fn decode_url_value(&self, id: u64) -> Result<Url> {
let bytes = self.decode_value(id)?;
Ok(Url::from_str(str::from_utf8(&bytes)?)?)
}
fn decode_str_value(&self, id: usize) -> Result<String> {
fn decode_str_value(&self, id: u64) -> Result<String> {
let bytes = self.decode_value(id)?;
Ok(str::from_utf8(&bytes)?.to_owned())
}
fn decode_value(&self, id: usize) -> Result<S::BytesOutput> {
fn decode_value(&self, id: u64) -> Result<S::BytesOutput> {
self.string_store
.get(id)?
.ok_or("value not found in the dictionary".into())
@ -363,39 +349,39 @@ impl<S: BytesStore + Default> Default for Encoder<S> {
}
mod test {
use model::*;
use std::cell::RefCell;
use std::collections::BTreeMap;
use store::numeric_encoder::*;
use utils::to_bytes;
#[derive(Default)]
struct MemoryBytesStore {
id2str: RefCell<BTreeMap<usize, Vec<u8>>>,
str2id: RefCell<BTreeMap<Vec<u8>, usize>>,
id2str: RefCell<BTreeMap<u64, Vec<u8>>>,
str2id: RefCell<BTreeMap<Vec<u8>, u64>>,
}
impl BytesStore for MemoryBytesStore {
type BytesOutput = Vec<u8>;
fn put(&self, value: &[u8]) -> Result<usize> {
fn put(&self, value: &[u8]) -> Result<u64> {
let mut str2id = self.str2id.borrow_mut();
let mut id2str = self.id2str.borrow_mut();
let id = str2id.entry(value.to_vec()).or_insert_with(|| {
let id = id2str.len();
let id = id2str.len() as u64;
id2str.insert(id, value.to_vec());
id
});
Ok(*id)
}
fn get(&self, id: usize) -> Result<Option<Vec<u8>>> {
fn get(&self, id: u64) -> Result<Option<Vec<u8>>> {
Ok(self.id2str.borrow().get(&id).map(|s| s.to_owned()))
}
}
#[test]
fn test_encoding() {
use model::*;
let encoder: Encoder<MemoryBytesStore> = Encoder::default();
let terms: Vec<Term> = vec![
NamedNode::from_str("http://foo.com").unwrap().into(),

@ -75,7 +75,7 @@ impl RocksDbStore {
subject: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(self.spog_cf)?;
iter.seek(&encode_term(&subject));
iter.seek(&encode_term(&subject)?);
Ok(FilteringEncodedQuadsIterator {
iter: SPOGIndexIterator { iter },
filter: EncodedQuadPattern::new(Some(subject), None, None, None),
@ -88,7 +88,7 @@ impl RocksDbStore {
predicate: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(self.spog_cf)?;
iter.seek(&encode_term_pair(&subject, &predicate));
iter.seek(&encode_term_pair(&subject, &predicate)?);
Ok(FilteringEncodedQuadsIterator {
iter: SPOGIndexIterator { iter },
filter: EncodedQuadPattern::new(Some(subject), Some(predicate), None, None),
@ -102,7 +102,7 @@ impl RocksDbStore {
object: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(self.spog_cf)?;
iter.seek(&encode_term_triple(&subject, &predicate, &object));
iter.seek(&encode_term_triple(&subject, &predicate, &object)?);
Ok(FilteringEncodedQuadsIterator {
iter: SPOGIndexIterator { iter },
filter: EncodedQuadPattern::new(Some(subject), Some(predicate), Some(object), None),
@ -114,7 +114,7 @@ impl RocksDbStore {
predicate: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<POSGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(self.posg_cf)?;
iter.seek(&encode_term(&predicate));
iter.seek(&encode_term(&predicate)?);
Ok(FilteringEncodedQuadsIterator {
iter: POSGIndexIterator { iter },
filter: EncodedQuadPattern::new(None, Some(predicate), None, None),
@ -127,7 +127,7 @@ impl RocksDbStore {
object: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<POSGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(self.spog_cf)?;
iter.seek(&encode_term_pair(&predicate, &object));
iter.seek(&encode_term_pair(&predicate, &object)?);
Ok(FilteringEncodedQuadsIterator {
iter: POSGIndexIterator { iter },
filter: EncodedQuadPattern::new(None, Some(predicate), Some(object), None),
@ -139,7 +139,7 @@ impl RocksDbStore {
object: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<OSPGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(self.ospg_cf)?;
iter.seek(&encode_term(&object));
iter.seek(&encode_term(&object)?);
Ok(FilteringEncodedQuadsIterator {
iter: OSPGIndexIterator { iter },
filter: EncodedQuadPattern::new(None, None, Some(object), None),
@ -147,22 +147,22 @@ impl RocksDbStore {
}
pub fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
Ok(self.db.get_cf(self.spog_cf, &quad.spog())?.is_some())
Ok(self.db.get_cf(self.spog_cf, &quad.spog()?)?.is_some())
}
pub fn insert(&self, quad: &EncodedQuad) -> Result<()> {
let mut batch = WriteBatch::default();
batch.put_cf(self.spog_cf, &quad.spog(), &EMPTY_BUF)?;
batch.put_cf(self.posg_cf, &quad.posg(), &EMPTY_BUF)?;
batch.put_cf(self.ospg_cf, &quad.ospg(), &EMPTY_BUF)?;
batch.put_cf(self.spog_cf, &quad.spog()?, &EMPTY_BUF)?;
batch.put_cf(self.posg_cf, &quad.posg()?, &EMPTY_BUF)?;
batch.put_cf(self.ospg_cf, &quad.ospg()?, &EMPTY_BUF)?;
Ok(self.db.write(batch)?) //TODO: check what's going on if the key already exists
}
pub fn remove(&self, quad: &EncodedQuad) -> Result<()> {
let mut batch = WriteBatch::default();
batch.delete_cf(self.spog_cf, &quad.spog())?;
batch.delete_cf(self.posg_cf, &quad.posg())?;
batch.delete_cf(self.ospg_cf, &quad.ospg())?;
batch.delete_cf(self.spog_cf, &quad.spog()?)?;
batch.delete_cf(self.posg_cf, &quad.posg()?)?;
batch.delete_cf(self.ospg_cf, &quad.ospg()?)?;
Ok(self.db.write(batch)?)
}
}
@ -177,7 +177,7 @@ pub struct RocksDbBytesStore<'a>(&'a RocksDbStore);
impl<'a> BytesStore for RocksDbBytesStore<'a> {
type BytesOutput = DBVector;
fn put(&self, value: &[u8]) -> Result<usize> {
fn put(&self, value: &[u8]) -> Result<u64> {
Ok(match self.0.db.get_cf(self.0.str2id_cf, value)? {
Some(id) => from_bytes_slice(&id),
None => {
@ -186,7 +186,7 @@ impl<'a> BytesStore for RocksDbBytesStore<'a> {
.str_id_counter
.lock()
.unwrap()
.get_and_increment(&self.0.db)?;
.get_and_increment(&self.0.db)? as u64;
let id_bytes = to_bytes(id);
let mut batch = WriteBatch::default();
batch.put_cf(self.0.id2str_cf, &id_bytes, value)?;
@ -197,7 +197,7 @@ impl<'a> BytesStore for RocksDbBytesStore<'a> {
})
}
fn get(&self, id: usize) -> Result<Option<DBVector>> {
fn get(&self, id: u64) -> Result<Option<DBVector>> {
Ok(self.0.db.get_cf(self.0.id2str_cf, &to_bytes(id))?)
}
}
@ -211,7 +211,7 @@ impl RocksDBCounter {
Self { name }
}
fn get_and_increment(&self, db: &DB) -> Result<usize> {
fn get_and_increment(&self, db: &DB) -> Result<u64> {
let value = db
.get(self.name.as_bytes())?
.map(|b| {
@ -272,25 +272,25 @@ impl EncodedQuadPattern {
}
}
fn encode_term(t: &EncodedTerm) -> Vec<u8> {
fn encode_term(t: &EncodedTerm) -> Result<Vec<u8>> {
let mut vec = Vec::with_capacity(t.encoding_size());
t.add_to_vec(&mut vec);
vec
t.write(&mut vec)?;
Ok(vec)
}
fn encode_term_pair(t1: &EncodedTerm, t2: &EncodedTerm) -> Vec<u8> {
fn encode_term_pair(t1: &EncodedTerm, t2: &EncodedTerm) -> Result<Vec<u8>> {
let mut vec = Vec::with_capacity(t1.encoding_size() + t2.encoding_size());
t1.add_to_vec(&mut vec);
t2.add_to_vec(&mut vec);
vec
t1.write(&mut vec)?;
t2.write(&mut vec)?;
Ok(vec)
}
fn encode_term_triple(t1: &EncodedTerm, t2: &EncodedTerm, t3: &EncodedTerm) -> Vec<u8> {
fn encode_term_triple(t1: &EncodedTerm, t2: &EncodedTerm, t3: &EncodedTerm) -> Result<Vec<u8>> {
let mut vec = Vec::with_capacity(t1.encoding_size() + t2.encoding_size() + t3.encoding_size());
t1.add_to_vec(&mut vec);
t2.add_to_vec(&mut vec);
t3.add_to_vec(&mut vec);
vec
t1.write(&mut vec)?;
t2.write(&mut vec)?;
t3.write(&mut vec)?;
Ok(vec)
}
pub struct SPOGIndexIterator {

@ -81,18 +81,18 @@ impl ExactSizeIterator for EscapeRDF {
}
}
pub fn to_bytes(int: usize) -> [u8; size_of::<usize>()] {
pub fn to_bytes(int: u64) -> [u8; size_of::<u64>()] {
//TODO: remove when next rust version stabilize this method
unsafe { transmute(int) }
}
pub fn from_bytes(bytes: [u8; size_of::<usize>()]) -> usize {
pub fn from_bytes(bytes: [u8; size_of::<u64>()]) -> u64 {
//TODO: remove when next rust version stabilize this method
unsafe { transmute(bytes) }
}
pub fn from_bytes_slice(bytes: &[u8]) -> usize {
let mut buf = [0 as u8; size_of::<usize>()];
pub fn from_bytes_slice(bytes: &[u8]) -> u64 {
let mut buf = [0 as u8; size_of::<u64>()];
buf.copy_from_slice(bytes);
from_bytes(buf)
}

Loading…
Cancel
Save