Refactor codes

Allows to write easily binary dumps with our encoding
pull/10/head
Tpt 6 years ago
parent a495af1b75
commit 7ace14916c
  1. 242
      src/store/numeric_encoder.rs
  2. 60
      src/store/rocksdb/storage.rs

@ -1,10 +1,8 @@
use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt}; use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt};
use errors::*; use errors::*;
use model::*; use model::*;
use std::io::Cursor;
use std::io::Read; use std::io::Read;
use std::io::Write; use std::io::Write;
use std::mem::size_of;
use std::ops::Deref; use std::ops::Deref;
use std::str; use std::str;
use std::str::FromStr; use std::str::FromStr;
@ -18,6 +16,7 @@ pub trait BytesStore {
fn get(&self, id: u64) -> Result<Option<Self::BytesOutput>>; fn get(&self, id: u64) -> Result<Option<Self::BytesOutput>>;
} }
const TYPE_NOTHING_ID: u8 = 0;
const TYPE_NAMED_NODE_ID: u8 = 1; const TYPE_NAMED_NODE_ID: u8 = 1;
const TYPE_BLANK_NODE_ID: u8 = 2; const TYPE_BLANK_NODE_ID: u8 = 2;
const TYPE_LANG_STRING_LITERAL_ID: u8 = 3; const TYPE_LANG_STRING_LITERAL_ID: u8 = 3;
@ -32,33 +31,6 @@ pub enum EncodedTerm {
} }
impl EncodedTerm { impl EncodedTerm {
pub fn read(reader: &mut impl Read) -> Result<Self> {
let type_id = reader.read_u8()?;
match type_id {
TYPE_NAMED_NODE_ID => Ok(EncodedTerm::NamedNode {
iri_id: reader.read_u64::<NetworkEndian>()?,
}),
TYPE_BLANK_NODE_ID => {
let mut uuid_buffer = [0 as u8; 16];
reader.read_exact(&mut uuid_buffer)?;
Ok(EncodedTerm::BlankNode(Uuid::from_bytes(&uuid_buffer)?))
}
TYPE_LANG_STRING_LITERAL_ID => Ok(EncodedTerm::LangStringLiteral {
language_id: reader.read_u64::<NetworkEndian>()?,
value_id: reader.read_u64::<NetworkEndian>()?,
}),
TYPE_TYPED_LITERAL_ID => Ok(EncodedTerm::TypedLiteral {
datatype_id: reader.read_u64::<NetworkEndian>()?,
value_id: reader.read_u64::<NetworkEndian>()?,
}),
_ => Err("the term buffer has an invalid type id".into()),
}
}
pub fn encoding_size(&self) -> usize {
Self::type_length(self.type_id()).unwrap() //It is not possible to fail here
}
fn type_id(&self) -> u8 { fn type_id(&self) -> u8 {
match self { match self {
EncodedTerm::NamedNode { .. } => TYPE_NAMED_NODE_ID, EncodedTerm::NamedNode { .. } => TYPE_NAMED_NODE_ID,
@ -67,40 +39,6 @@ impl EncodedTerm {
EncodedTerm::TypedLiteral { .. } => TYPE_TYPED_LITERAL_ID, EncodedTerm::TypedLiteral { .. } => TYPE_TYPED_LITERAL_ID,
} }
} }
fn type_length(type_id: u8) -> Result<usize> {
//TODO: useful
match type_id {
TYPE_NAMED_NODE_ID => Ok(1 + size_of::<u64>()),
TYPE_BLANK_NODE_ID => Ok(17), //TODO: guess
TYPE_LANG_STRING_LITERAL_ID => Ok(1 + 2 * size_of::<u64>()),
TYPE_TYPED_LITERAL_ID => Ok(1 + 2 * size_of::<u64>()),
_ => Err(format!("{} is not a known type id", type_id).into()),
}
}
pub fn write(&self, writer: &mut impl Write) -> Result<()> {
writer.write_u8(self.type_id())?;
match self {
EncodedTerm::NamedNode { iri_id } => writer.write_u64::<NetworkEndian>(*iri_id)?,
EncodedTerm::BlankNode(id) => writer.write_all(id.as_bytes())?,
EncodedTerm::LangStringLiteral {
value_id,
language_id,
} => {
writer.write_u64::<NetworkEndian>(*language_id)?;
writer.write_u64::<NetworkEndian>(*value_id)?;
}
EncodedTerm::TypedLiteral {
value_id,
datatype_id,
} => {
writer.write_u64::<NetworkEndian>(*datatype_id)?;
writer.write_u64::<NetworkEndian>(*value_id)?;
}
}
Ok(())
}
} }
#[derive(Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Hash)] #[derive(Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Hash)]
@ -111,18 +49,35 @@ pub struct EncodedQuad {
pub graph_name: Option<EncodedTerm>, pub graph_name: Option<EncodedTerm>,
} }
impl EncodedQuad { pub trait TermReader {
pub fn new_from_spog_buffer(buffer: &[u8]) -> Result<Self> { fn read_term(&mut self) -> Result<EncodedTerm>;
let mut cursor = Cursor::new(buffer); fn read_optional_term(&mut self) -> Result<Option<EncodedTerm>>;
let subject = EncodedTerm::read(&mut cursor)?; fn read_spog_quad(&mut self) -> Result<EncodedQuad>;
let predicate = EncodedTerm::read(&mut cursor)?; fn read_posg_quad(&mut self) -> Result<EncodedQuad>;
let object = EncodedTerm::read(&mut cursor)?; fn read_ospg_quad(&mut self) -> Result<EncodedQuad>;
let graph_name = if cursor.position() < buffer.len() as u64 { }
Some(EncodedTerm::read(&mut cursor)?)
impl<R: Read> TermReader for R {
fn read_term(&mut self) -> Result<EncodedTerm> {
let type_id = self.read_u8()?;
read_term_after_type(self, type_id)
}
fn read_optional_term(&mut self) -> Result<Option<EncodedTerm>> {
let type_id = self.read_u8()?;
if type_id == 0 {
Ok(None)
} else { } else {
None Ok(Some(read_term_after_type(self, type_id)?))
}; }
Ok(Self { }
fn read_spog_quad(&mut self) -> Result<EncodedQuad> {
let subject = self.read_term()?;
let predicate = self.read_term()?;
let object = self.read_term()?;
let graph_name = self.read_optional_term()?;
Ok(EncodedQuad {
subject, subject,
predicate, predicate,
object, object,
@ -130,17 +85,12 @@ impl EncodedQuad {
}) })
} }
pub fn new_from_posg_buffer(buffer: &[u8]) -> Result<Self> { fn read_posg_quad(&mut self) -> Result<EncodedQuad> {
let mut cursor = Cursor::new(buffer); let predicate = self.read_term()?;
let predicate = EncodedTerm::read(&mut cursor)?; let object = self.read_term()?;
let object = EncodedTerm::read(&mut cursor)?; let subject = self.read_term()?;
let subject = EncodedTerm::read(&mut cursor)?; let graph_name = self.read_optional_term()?;
let graph_name = if cursor.position() < buffer.len() as u64 { Ok(EncodedQuad {
Some(EncodedTerm::read(&mut cursor)?)
} else {
None
};
Ok(Self {
subject, subject,
predicate, predicate,
object, object,
@ -148,63 +98,103 @@ impl EncodedQuad {
}) })
} }
pub fn new_from_ospg_buffer(buffer: &[u8]) -> Result<Self> { fn read_ospg_quad(&mut self) -> Result<EncodedQuad> {
let mut cursor = Cursor::new(buffer); let object = self.read_term()?;
let object = EncodedTerm::read(&mut cursor)?; let subject = self.read_term()?;
let subject = EncodedTerm::read(&mut cursor)?; let predicate = self.read_term()?;
let predicate = EncodedTerm::read(&mut cursor)?; let graph_name = self.read_optional_term()?;
let graph_name = if cursor.position() < buffer.len() as u64 { Ok(EncodedQuad {
Some(EncodedTerm::read(&mut cursor)?)
} else {
None
};
Ok(Self {
subject, subject,
predicate, predicate,
object, object,
graph_name, graph_name,
}) })
} }
}
pub fn spog(&self) -> Result<Vec<u8>> { fn read_term_after_type(reader: &mut impl Read, type_id: u8) -> Result<EncodedTerm> {
let mut spog = Vec::with_capacity(self.encoding_size()); match type_id {
self.subject.write(&mut spog)?; TYPE_NAMED_NODE_ID => Ok(EncodedTerm::NamedNode {
self.predicate.write(&mut spog)?; iri_id: reader.read_u64::<NetworkEndian>()?,
self.object.write(&mut spog)?; }),
if let Some(ref graph_name) = self.graph_name { TYPE_BLANK_NODE_ID => {
graph_name.write(&mut spog)?; let mut uuid_buffer = [0 as u8; 16];
reader.read_exact(&mut uuid_buffer)?;
Ok(EncodedTerm::BlankNode(Uuid::from_bytes(&uuid_buffer)?))
} }
Ok(spog) TYPE_LANG_STRING_LITERAL_ID => Ok(EncodedTerm::LangStringLiteral {
language_id: reader.read_u64::<NetworkEndian>()?,
value_id: reader.read_u64::<NetworkEndian>()?,
}),
TYPE_TYPED_LITERAL_ID => Ok(EncodedTerm::TypedLiteral {
datatype_id: reader.read_u64::<NetworkEndian>()?,
value_id: reader.read_u64::<NetworkEndian>()?,
}),
_ => Err("the term buffer has an invalid type id".into()),
} }
}
pub fn posg(&self) -> Result<Vec<u8>> { pub trait TermWriter {
let mut posg = Vec::with_capacity(self.encoding_size()); fn write_term(&mut self, term: &EncodedTerm) -> Result<()>;
self.predicate.write(&mut posg)?; fn write_optional_term(&mut self, term: &Option<EncodedTerm>) -> Result<()>;
self.object.write(&mut posg)?; fn write_spog_quad(&mut self, quad: &EncodedQuad) -> Result<()>;
self.subject.write(&mut posg)?; fn write_posg_quad(&mut self, quad: &EncodedQuad) -> Result<()>;
if let Some(ref graph_name) = self.graph_name { fn write_ospg_quad(&mut self, quad: &EncodedQuad) -> Result<()>;
graph_name.write(&mut posg)?; }
impl<R: Write> TermWriter for R {
fn write_term(&mut self, term: &EncodedTerm) -> Result<()> {
self.write_u8(term.type_id())?;
match term {
EncodedTerm::NamedNode { iri_id } => self.write_u64::<NetworkEndian>(*iri_id)?,
EncodedTerm::BlankNode(id) => self.write_all(id.as_bytes())?,
EncodedTerm::LangStringLiteral {
value_id,
language_id,
} => {
self.write_u64::<NetworkEndian>(*language_id)?;
self.write_u64::<NetworkEndian>(*value_id)?;
}
EncodedTerm::TypedLiteral {
value_id,
datatype_id,
} => {
self.write_u64::<NetworkEndian>(*datatype_id)?;
self.write_u64::<NetworkEndian>(*value_id)?;
}
} }
Ok(posg) Ok(())
} }
pub fn ospg(&self) -> Result<Vec<u8>> { fn write_optional_term(&mut self, term: &Option<EncodedTerm>) -> Result<()> {
let mut ospg = Vec::with_capacity(self.encoding_size()); match term {
self.object.write(&mut ospg)?; Some(term) => self.write_term(term),
self.subject.write(&mut ospg)?; None => Ok(self.write_u8(TYPE_NOTHING_ID)?),
self.predicate.write(&mut ospg)?;
if let Some(ref graph_name) = self.graph_name {
graph_name.write(&mut ospg)?;
} }
Ok(ospg)
} }
fn encoding_size(&self) -> usize { fn write_spog_quad(&mut self, quad: &EncodedQuad) -> Result<()> {
self.subject.encoding_size() + self.predicate.encoding_size() + self.object.encoding_size() self.write_term(&quad.subject)?;
+ match self.graph_name { self.write_term(&quad.predicate)?;
Some(ref graph_name) => graph_name.encoding_size(), self.write_term(&quad.object)?;
None => 0, self.write_optional_term(&quad.graph_name)?;
} Ok(())
}
fn write_posg_quad(&mut self, quad: &EncodedQuad) -> Result<()> {
self.write_term(&quad.predicate)?;
self.write_term(&quad.object)?;
self.write_term(&quad.subject)?;
self.write_optional_term(&quad.graph_name)?;
Ok(())
}
fn write_ospg_quad(&mut self, quad: &EncodedQuad) -> Result<()> {
self.write_term(&quad.object)?;
self.write_term(&quad.subject)?;
self.write_term(&quad.predicate)?;
self.write_optional_term(&quad.graph_name)?;
Ok(())
} }
} }

@ -5,6 +5,7 @@ use rocksdb::DBVector;
use rocksdb::Options; use rocksdb::Options;
use rocksdb::WriteBatch; use rocksdb::WriteBatch;
use rocksdb::DB; use rocksdb::DB;
use std::io::Cursor;
use std::mem::size_of; use std::mem::size_of;
use std::path::Path; use std::path::Path;
use std::str; use std::str;
@ -147,22 +148,25 @@ impl RocksDbStore {
} }
pub fn contains(&self, quad: &EncodedQuad) -> Result<bool> { pub fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
Ok(self.db.get_cf(self.spog_cf, &quad.spog()?)?.is_some()) Ok(self
.db
.get_cf(self.spog_cf, &encode_spog_quad(quad)?)?
.is_some())
} }
pub fn insert(&self, quad: &EncodedQuad) -> Result<()> { pub fn insert(&self, quad: &EncodedQuad) -> Result<()> {
let mut batch = WriteBatch::default(); let mut batch = WriteBatch::default();
batch.put_cf(self.spog_cf, &quad.spog()?, &EMPTY_BUF)?; batch.put_cf(self.spog_cf, &encode_spog_quad(quad)?, &EMPTY_BUF)?;
batch.put_cf(self.posg_cf, &quad.posg()?, &EMPTY_BUF)?; batch.put_cf(self.posg_cf, &encode_posg_quad(quad)?, &EMPTY_BUF)?;
batch.put_cf(self.ospg_cf, &quad.ospg()?, &EMPTY_BUF)?; batch.put_cf(self.ospg_cf, &encode_ospg_quad(quad)?, &EMPTY_BUF)?;
Ok(self.db.write(batch)?) //TODO: check what's going on if the key already exists Ok(self.db.write(batch)?) //TODO: check what's going on if the key already exists
} }
pub fn remove(&self, quad: &EncodedQuad) -> Result<()> { pub fn remove(&self, quad: &EncodedQuad) -> Result<()> {
let mut batch = WriteBatch::default(); let mut batch = WriteBatch::default();
batch.delete_cf(self.spog_cf, &quad.spog()?)?; batch.delete_cf(self.spog_cf, &encode_spog_quad(quad)?)?;
batch.delete_cf(self.posg_cf, &quad.posg()?)?; batch.delete_cf(self.posg_cf, &encode_posg_quad(quad)?)?;
batch.delete_cf(self.ospg_cf, &quad.ospg()?)?; batch.delete_cf(self.ospg_cf, &encode_ospg_quad(quad)?)?;
Ok(self.db.write(batch)?) Ok(self.db.write(batch)?)
} }
} }
@ -273,23 +277,41 @@ impl EncodedQuadPattern {
} }
fn encode_term(t: &EncodedTerm) -> Result<Vec<u8>> { fn encode_term(t: &EncodedTerm) -> Result<Vec<u8>> {
let mut vec = Vec::with_capacity(t.encoding_size()); let mut vec = Vec::default();
t.write(&mut vec)?; vec.write_term(&t)?;
Ok(vec) Ok(vec)
} }
fn encode_term_pair(t1: &EncodedTerm, t2: &EncodedTerm) -> Result<Vec<u8>> { fn encode_term_pair(t1: &EncodedTerm, t2: &EncodedTerm) -> Result<Vec<u8>> {
let mut vec = Vec::with_capacity(t1.encoding_size() + t2.encoding_size()); let mut vec = Vec::default();
t1.write(&mut vec)?; vec.write_term(&t1)?;
t2.write(&mut vec)?; vec.write_term(&t2)?;
Ok(vec) Ok(vec)
} }
fn encode_term_triple(t1: &EncodedTerm, t2: &EncodedTerm, t3: &EncodedTerm) -> Result<Vec<u8>> { fn encode_term_triple(t1: &EncodedTerm, t2: &EncodedTerm, t3: &EncodedTerm) -> Result<Vec<u8>> {
let mut vec = Vec::with_capacity(t1.encoding_size() + t2.encoding_size() + t3.encoding_size()); let mut vec = Vec::default();
t1.write(&mut vec)?; vec.write_term(&t1)?;
t2.write(&mut vec)?; vec.write_term(&t2)?;
t3.write(&mut vec)?; vec.write_term(&t3)?;
Ok(vec)
}
fn encode_spog_quad(quad: &EncodedQuad) -> Result<Vec<u8>> {
let mut vec = Vec::default();
vec.write_spog_quad(quad)?;
Ok(vec)
}
fn encode_posg_quad(quad: &EncodedQuad) -> Result<Vec<u8>> {
let mut vec = Vec::default();
vec.write_posg_quad(quad)?;
Ok(vec)
}
fn encode_ospg_quad(quad: &EncodedQuad) -> Result<Vec<u8>> {
let mut vec = Vec::default();
vec.write_ospg_quad(quad)?;
Ok(vec) Ok(vec)
} }
@ -304,7 +326,7 @@ impl Iterator for SPOGIndexIterator {
self.iter.next(); self.iter.next();
self.iter self.iter
.key() .key()
.map(|buffer| EncodedQuad::new_from_spog_buffer(&buffer)) .map(|buffer| Cursor::new(buffer).read_spog_quad())
} }
} }
@ -319,7 +341,7 @@ impl Iterator for POSGIndexIterator {
self.iter.next(); self.iter.next();
self.iter self.iter
.key() .key()
.map(|buffer| EncodedQuad::new_from_posg_buffer(&buffer)) .map(|buffer| Cursor::new(buffer).read_posg_quad())
} }
} }
@ -334,7 +356,7 @@ impl Iterator for OSPGIndexIterator {
self.iter.next(); self.iter.next();
self.iter self.iter
.key() .key()
.map(|buffer| EncodedQuad::new_from_ospg_buffer(&buffer)) .map(|buffer| Cursor::new(buffer).read_ospg_quad())
} }
} }

Loading…
Cancel
Save