From 7ace14916cc2b14101719f15938032f6d810a3cd Mon Sep 17 00:00:00 2001 From: Tpt Date: Fri, 7 Sep 2018 14:46:18 +0200 Subject: [PATCH] Refactor codes Allows to write easily binary dumps with our encoding --- src/store/numeric_encoder.rs | 242 +++++++++++++++++------------------ src/store/rocksdb/storage.rs | 60 ++++++--- 2 files changed, 157 insertions(+), 145 deletions(-) diff --git a/src/store/numeric_encoder.rs b/src/store/numeric_encoder.rs index 7ff4ae93..26ccadcd 100644 --- a/src/store/numeric_encoder.rs +++ b/src/store/numeric_encoder.rs @@ -1,10 +1,8 @@ use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt}; use errors::*; use model::*; -use std::io::Cursor; use std::io::Read; use std::io::Write; -use std::mem::size_of; use std::ops::Deref; use std::str; use std::str::FromStr; @@ -18,6 +16,7 @@ pub trait BytesStore { fn get(&self, id: u64) -> Result>; } +const TYPE_NOTHING_ID: u8 = 0; const TYPE_NAMED_NODE_ID: u8 = 1; const TYPE_BLANK_NODE_ID: u8 = 2; const TYPE_LANG_STRING_LITERAL_ID: u8 = 3; @@ -32,33 +31,6 @@ pub enum EncodedTerm { } impl EncodedTerm { - pub fn read(reader: &mut impl Read) -> Result { - let type_id = reader.read_u8()?; - match type_id { - TYPE_NAMED_NODE_ID => Ok(EncodedTerm::NamedNode { - iri_id: reader.read_u64::()?, - }), - TYPE_BLANK_NODE_ID => { - let mut uuid_buffer = [0 as u8; 16]; - reader.read_exact(&mut uuid_buffer)?; - Ok(EncodedTerm::BlankNode(Uuid::from_bytes(&uuid_buffer)?)) - } - TYPE_LANG_STRING_LITERAL_ID => Ok(EncodedTerm::LangStringLiteral { - language_id: reader.read_u64::()?, - value_id: reader.read_u64::()?, - }), - TYPE_TYPED_LITERAL_ID => Ok(EncodedTerm::TypedLiteral { - datatype_id: reader.read_u64::()?, - value_id: reader.read_u64::()?, - }), - _ => Err("the term buffer has an invalid type id".into()), - } - } - - pub fn encoding_size(&self) -> usize { - Self::type_length(self.type_id()).unwrap() //It is not possible to fail here - } - fn type_id(&self) -> u8 { match self { EncodedTerm::NamedNode { .. } => TYPE_NAMED_NODE_ID, @@ -67,40 +39,6 @@ impl EncodedTerm { EncodedTerm::TypedLiteral { .. } => TYPE_TYPED_LITERAL_ID, } } - - fn type_length(type_id: u8) -> Result { - //TODO: useful - match type_id { - TYPE_NAMED_NODE_ID => Ok(1 + size_of::()), - TYPE_BLANK_NODE_ID => Ok(17), //TODO: guess - TYPE_LANG_STRING_LITERAL_ID => Ok(1 + 2 * size_of::()), - TYPE_TYPED_LITERAL_ID => Ok(1 + 2 * size_of::()), - _ => Err(format!("{} is not a known type id", type_id).into()), - } - } - - pub fn write(&self, writer: &mut impl Write) -> Result<()> { - writer.write_u8(self.type_id())?; - match self { - EncodedTerm::NamedNode { iri_id } => writer.write_u64::(*iri_id)?, - EncodedTerm::BlankNode(id) => writer.write_all(id.as_bytes())?, - EncodedTerm::LangStringLiteral { - value_id, - language_id, - } => { - writer.write_u64::(*language_id)?; - writer.write_u64::(*value_id)?; - } - EncodedTerm::TypedLiteral { - value_id, - datatype_id, - } => { - writer.write_u64::(*datatype_id)?; - writer.write_u64::(*value_id)?; - } - } - Ok(()) - } } #[derive(Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Hash)] @@ -111,18 +49,35 @@ pub struct EncodedQuad { pub graph_name: Option, } -impl EncodedQuad { - pub fn new_from_spog_buffer(buffer: &[u8]) -> Result { - let mut cursor = Cursor::new(buffer); - let subject = EncodedTerm::read(&mut cursor)?; - let predicate = EncodedTerm::read(&mut cursor)?; - let object = EncodedTerm::read(&mut cursor)?; - let graph_name = if cursor.position() < buffer.len() as u64 { - Some(EncodedTerm::read(&mut cursor)?) +pub trait TermReader { + fn read_term(&mut self) -> Result; + fn read_optional_term(&mut self) -> Result>; + fn read_spog_quad(&mut self) -> Result; + fn read_posg_quad(&mut self) -> Result; + fn read_ospg_quad(&mut self) -> Result; +} + +impl TermReader for R { + fn read_term(&mut self) -> Result { + let type_id = self.read_u8()?; + read_term_after_type(self, type_id) + } + + fn read_optional_term(&mut self) -> Result> { + let type_id = self.read_u8()?; + if type_id == 0 { + Ok(None) } else { - None - }; - Ok(Self { + Ok(Some(read_term_after_type(self, type_id)?)) + } + } + + fn read_spog_quad(&mut self) -> Result { + let subject = self.read_term()?; + let predicate = self.read_term()?; + let object = self.read_term()?; + let graph_name = self.read_optional_term()?; + Ok(EncodedQuad { subject, predicate, object, @@ -130,17 +85,12 @@ impl EncodedQuad { }) } - pub fn new_from_posg_buffer(buffer: &[u8]) -> Result { - let mut cursor = Cursor::new(buffer); - let predicate = EncodedTerm::read(&mut cursor)?; - let object = EncodedTerm::read(&mut cursor)?; - let subject = EncodedTerm::read(&mut cursor)?; - let graph_name = if cursor.position() < buffer.len() as u64 { - Some(EncodedTerm::read(&mut cursor)?) - } else { - None - }; - Ok(Self { + fn read_posg_quad(&mut self) -> Result { + let predicate = self.read_term()?; + let object = self.read_term()?; + let subject = self.read_term()?; + let graph_name = self.read_optional_term()?; + Ok(EncodedQuad { subject, predicate, object, @@ -148,63 +98,103 @@ impl EncodedQuad { }) } - pub fn new_from_ospg_buffer(buffer: &[u8]) -> Result { - let mut cursor = Cursor::new(buffer); - let object = EncodedTerm::read(&mut cursor)?; - let subject = EncodedTerm::read(&mut cursor)?; - let predicate = EncodedTerm::read(&mut cursor)?; - let graph_name = if cursor.position() < buffer.len() as u64 { - Some(EncodedTerm::read(&mut cursor)?) - } else { - None - }; - Ok(Self { + fn read_ospg_quad(&mut self) -> Result { + let object = self.read_term()?; + let subject = self.read_term()?; + let predicate = self.read_term()?; + let graph_name = self.read_optional_term()?; + Ok(EncodedQuad { subject, predicate, object, graph_name, }) } +} - pub fn spog(&self) -> Result> { - let mut spog = Vec::with_capacity(self.encoding_size()); - self.subject.write(&mut spog)?; - self.predicate.write(&mut spog)?; - self.object.write(&mut spog)?; - if let Some(ref graph_name) = self.graph_name { - graph_name.write(&mut spog)?; +fn read_term_after_type(reader: &mut impl Read, type_id: u8) -> Result { + match type_id { + TYPE_NAMED_NODE_ID => Ok(EncodedTerm::NamedNode { + iri_id: reader.read_u64::()?, + }), + TYPE_BLANK_NODE_ID => { + let mut uuid_buffer = [0 as u8; 16]; + reader.read_exact(&mut uuid_buffer)?; + Ok(EncodedTerm::BlankNode(Uuid::from_bytes(&uuid_buffer)?)) } - Ok(spog) + TYPE_LANG_STRING_LITERAL_ID => Ok(EncodedTerm::LangStringLiteral { + language_id: reader.read_u64::()?, + value_id: reader.read_u64::()?, + }), + TYPE_TYPED_LITERAL_ID => Ok(EncodedTerm::TypedLiteral { + datatype_id: reader.read_u64::()?, + value_id: reader.read_u64::()?, + }), + _ => Err("the term buffer has an invalid type id".into()), } +} - pub fn posg(&self) -> Result> { - let mut posg = Vec::with_capacity(self.encoding_size()); - self.predicate.write(&mut posg)?; - self.object.write(&mut posg)?; - self.subject.write(&mut posg)?; - if let Some(ref graph_name) = self.graph_name { - graph_name.write(&mut posg)?; +pub trait TermWriter { + fn write_term(&mut self, term: &EncodedTerm) -> Result<()>; + fn write_optional_term(&mut self, term: &Option) -> Result<()>; + fn write_spog_quad(&mut self, quad: &EncodedQuad) -> Result<()>; + fn write_posg_quad(&mut self, quad: &EncodedQuad) -> Result<()>; + fn write_ospg_quad(&mut self, quad: &EncodedQuad) -> Result<()>; +} + +impl TermWriter for R { + fn write_term(&mut self, term: &EncodedTerm) -> Result<()> { + self.write_u8(term.type_id())?; + match term { + EncodedTerm::NamedNode { iri_id } => self.write_u64::(*iri_id)?, + EncodedTerm::BlankNode(id) => self.write_all(id.as_bytes())?, + EncodedTerm::LangStringLiteral { + value_id, + language_id, + } => { + self.write_u64::(*language_id)?; + self.write_u64::(*value_id)?; + } + EncodedTerm::TypedLiteral { + value_id, + datatype_id, + } => { + self.write_u64::(*datatype_id)?; + self.write_u64::(*value_id)?; + } } - Ok(posg) + Ok(()) } - pub fn ospg(&self) -> Result> { - let mut ospg = Vec::with_capacity(self.encoding_size()); - self.object.write(&mut ospg)?; - self.subject.write(&mut ospg)?; - self.predicate.write(&mut ospg)?; - if let Some(ref graph_name) = self.graph_name { - graph_name.write(&mut ospg)?; + fn write_optional_term(&mut self, term: &Option) -> Result<()> { + match term { + Some(term) => self.write_term(term), + None => Ok(self.write_u8(TYPE_NOTHING_ID)?), } - Ok(ospg) } - fn encoding_size(&self) -> usize { - self.subject.encoding_size() + self.predicate.encoding_size() + self.object.encoding_size() - + match self.graph_name { - Some(ref graph_name) => graph_name.encoding_size(), - None => 0, - } + fn write_spog_quad(&mut self, quad: &EncodedQuad) -> Result<()> { + self.write_term(&quad.subject)?; + self.write_term(&quad.predicate)?; + self.write_term(&quad.object)?; + self.write_optional_term(&quad.graph_name)?; + Ok(()) + } + + fn write_posg_quad(&mut self, quad: &EncodedQuad) -> Result<()> { + self.write_term(&quad.predicate)?; + self.write_term(&quad.object)?; + self.write_term(&quad.subject)?; + self.write_optional_term(&quad.graph_name)?; + Ok(()) + } + + fn write_ospg_quad(&mut self, quad: &EncodedQuad) -> Result<()> { + self.write_term(&quad.object)?; + self.write_term(&quad.subject)?; + self.write_term(&quad.predicate)?; + self.write_optional_term(&quad.graph_name)?; + Ok(()) } } diff --git a/src/store/rocksdb/storage.rs b/src/store/rocksdb/storage.rs index bab1c4c6..1b2e98ab 100644 --- a/src/store/rocksdb/storage.rs +++ b/src/store/rocksdb/storage.rs @@ -5,6 +5,7 @@ use rocksdb::DBVector; use rocksdb::Options; use rocksdb::WriteBatch; use rocksdb::DB; +use std::io::Cursor; use std::mem::size_of; use std::path::Path; use std::str; @@ -147,22 +148,25 @@ impl RocksDbStore { } pub fn contains(&self, quad: &EncodedQuad) -> Result { - Ok(self.db.get_cf(self.spog_cf, &quad.spog()?)?.is_some()) + Ok(self + .db + .get_cf(self.spog_cf, &encode_spog_quad(quad)?)? + .is_some()) } pub fn insert(&self, quad: &EncodedQuad) -> Result<()> { let mut batch = WriteBatch::default(); - batch.put_cf(self.spog_cf, &quad.spog()?, &EMPTY_BUF)?; - batch.put_cf(self.posg_cf, &quad.posg()?, &EMPTY_BUF)?; - batch.put_cf(self.ospg_cf, &quad.ospg()?, &EMPTY_BUF)?; + batch.put_cf(self.spog_cf, &encode_spog_quad(quad)?, &EMPTY_BUF)?; + batch.put_cf(self.posg_cf, &encode_posg_quad(quad)?, &EMPTY_BUF)?; + batch.put_cf(self.ospg_cf, &encode_ospg_quad(quad)?, &EMPTY_BUF)?; Ok(self.db.write(batch)?) //TODO: check what's going on if the key already exists } pub fn remove(&self, quad: &EncodedQuad) -> Result<()> { let mut batch = WriteBatch::default(); - batch.delete_cf(self.spog_cf, &quad.spog()?)?; - batch.delete_cf(self.posg_cf, &quad.posg()?)?; - batch.delete_cf(self.ospg_cf, &quad.ospg()?)?; + batch.delete_cf(self.spog_cf, &encode_spog_quad(quad)?)?; + batch.delete_cf(self.posg_cf, &encode_posg_quad(quad)?)?; + batch.delete_cf(self.ospg_cf, &encode_ospg_quad(quad)?)?; Ok(self.db.write(batch)?) } } @@ -273,23 +277,41 @@ impl EncodedQuadPattern { } fn encode_term(t: &EncodedTerm) -> Result> { - let mut vec = Vec::with_capacity(t.encoding_size()); - t.write(&mut vec)?; + let mut vec = Vec::default(); + vec.write_term(&t)?; Ok(vec) } fn encode_term_pair(t1: &EncodedTerm, t2: &EncodedTerm) -> Result> { - let mut vec = Vec::with_capacity(t1.encoding_size() + t2.encoding_size()); - t1.write(&mut vec)?; - t2.write(&mut vec)?; + let mut vec = Vec::default(); + vec.write_term(&t1)?; + vec.write_term(&t2)?; Ok(vec) } fn encode_term_triple(t1: &EncodedTerm, t2: &EncodedTerm, t3: &EncodedTerm) -> Result> { - let mut vec = Vec::with_capacity(t1.encoding_size() + t2.encoding_size() + t3.encoding_size()); - t1.write(&mut vec)?; - t2.write(&mut vec)?; - t3.write(&mut vec)?; + let mut vec = Vec::default(); + vec.write_term(&t1)?; + vec.write_term(&t2)?; + vec.write_term(&t3)?; + Ok(vec) +} + +fn encode_spog_quad(quad: &EncodedQuad) -> Result> { + let mut vec = Vec::default(); + vec.write_spog_quad(quad)?; + Ok(vec) +} + +fn encode_posg_quad(quad: &EncodedQuad) -> Result> { + let mut vec = Vec::default(); + vec.write_posg_quad(quad)?; + Ok(vec) +} + +fn encode_ospg_quad(quad: &EncodedQuad) -> Result> { + let mut vec = Vec::default(); + vec.write_ospg_quad(quad)?; Ok(vec) } @@ -304,7 +326,7 @@ impl Iterator for SPOGIndexIterator { self.iter.next(); self.iter .key() - .map(|buffer| EncodedQuad::new_from_spog_buffer(&buffer)) + .map(|buffer| Cursor::new(buffer).read_spog_quad()) } } @@ -319,7 +341,7 @@ impl Iterator for POSGIndexIterator { self.iter.next(); self.iter .key() - .map(|buffer| EncodedQuad::new_from_posg_buffer(&buffer)) + .map(|buffer| Cursor::new(buffer).read_posg_quad()) } } @@ -334,7 +356,7 @@ impl Iterator for OSPGIndexIterator { self.iter.next(); self.iter .key() - .map(|buffer| EncodedQuad::new_from_ospg_buffer(&buffer)) + .map(|buffer| Cursor::new(buffer).read_ospg_quad()) } }