From 49eda33d0a3d59a64ebfc7e09d4528d68b0aa360 Mon Sep 17 00:00:00 2001 From: Tpt Date: Fri, 6 Jul 2018 16:53:28 +0200 Subject: [PATCH] Adds basic RocksDB store --- .travis.yml | 6 +- Cargo.toml | 4 +- src/errors.rs | 8 + src/lib.rs | 4 + src/model/blank_node.rs | 6 + src/store/mod.rs | 2 + src/store/numeric_encoder.rs | 386 ++++++++++++++++++++++++++++++ src/store/rocksdb.rs | 446 +++++++++++++++++++++++++++++++++++ src/utils.rs | 8 + 9 files changed, 868 insertions(+), 2 deletions(-) create mode 100644 src/errors.rs create mode 100644 src/store/numeric_encoder.rs create mode 100644 src/store/rocksdb.rs diff --git a/.travis.yml b/.travis.yml index a8ef108d..748e7518 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,4 +7,8 @@ matrix: allow_failures: - rust: nightly fast_finish: true -cache: cargo \ No newline at end of file +cache: cargo +addons: + apt: + packages: + - clang \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 4d91f657..2c977f53 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,8 +15,10 @@ build = "build.rs" travis-ci = { repository = "Tpt/rudf" } [dependencies] +error-chain = "0.12" lazy_static = "1.0" -url = "1.7" +rocksdb = "0.10" +url = "1" uuid = { version = "0.6", features = ["v4"] } [build-dependencies] diff --git a/src/errors.rs b/src/errors.rs new file mode 100644 index 00000000..db534266 --- /dev/null +++ b/src/errors.rs @@ -0,0 +1,8 @@ +error_chain! { + foreign_links { + Url(::url::ParseError); + Uuid(::uuid::ParseError); + RocksDB(::rocksdb::Error); + Utf8(::std::str::Utf8Error); + } +} diff --git a/src/lib.rs b/src/lib.rs index 08669af7..5aa94920 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,12 @@ #[macro_use] +extern crate error_chain; +#[macro_use] extern crate lazy_static; +extern crate rocksdb; extern crate url; extern crate uuid; +pub mod errors; pub mod model; pub mod rio; pub mod sparql; diff --git a/src/model/blank_node.rs b/src/model/blank_node.rs index fade3969..14633bcd 100644 --- a/src/model/blank_node.rs +++ b/src/model/blank_node.rs @@ -28,3 +28,9 @@ impl Default for BlankNode { BlankNode { id: Uuid::new_v4() } } } + +impl From for BlankNode { + fn from(id: Uuid) -> Self { + Self { id } + } +} diff --git a/src/store/mod.rs b/src/store/mod.rs index 959f8e6c..7c423a32 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -1,2 +1,4 @@ pub mod isomorphism; pub mod memory; +mod numeric_encoder; +pub mod rocksdb; diff --git a/src/store/numeric_encoder.rs b/src/store/numeric_encoder.rs new file mode 100644 index 00000000..59c408b4 --- /dev/null +++ b/src/store/numeric_encoder.rs @@ -0,0 +1,386 @@ +use errors::*; +use model::*; +use std::ops::Deref; +use std::str; +use std::str::FromStr; +use url::Url; +use uuid::Uuid; + +pub const STRING_KEY_SIZE: usize = 8; + +pub trait BytesStore { + type BytesOutput: Deref; + + fn put(&self, value: &[u8], id_buffer: &mut [u8]) -> Result<()>; + fn get(&self, id: &[u8]) -> Result>; +} + +const TYPE_KEY_SIZE: usize = 1; +const TYPE_NAMED_NODE_ID: u8 = 1; +const TYPE_BLANK_NODE_ID: u8 = 2; +const TYPE_LANG_STRING_LITERAL_ID: u8 = 3; +const TYPE_TYPED_LITERAL_ID: u8 = 4; +pub const TERM_ENCODING_SIZE: usize = TYPE_KEY_SIZE + 2 * STRING_KEY_SIZE; +const EMPTY_TERM: [u8; TERM_ENCODING_SIZE] = [0 as u8; TERM_ENCODING_SIZE]; + +#[derive(Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Hash)] +pub struct EncodedTerm([u8; TERM_ENCODING_SIZE]); + +impl EncodedTerm { + pub fn new_from_buffer(buffer: &[u8]) -> Result { + if buffer.len() != TERM_ENCODING_SIZE { + return Err("the term buffer has not the correct length".into()); + } + let mut buf = [0 as u8; TERM_ENCODING_SIZE]; + buf.copy_from_slice(buffer); + return Ok(EncodedTerm(buf)); + } +} + +impl AsRef<[u8]> for EncodedTerm { + fn as_ref(&self) -> &[u8] { + &self.0[..] + } +} + +#[derive(Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Hash)] +pub struct EncodedQuad { + pub subject: EncodedTerm, + pub predicate: EncodedTerm, + pub object: EncodedTerm, + pub graph_name: EncodedTerm, +} + +impl EncodedQuad { + pub fn new_from_spog_buffer(buffer: &[u8]) -> Result { + if buffer.len() != 4 * TERM_ENCODING_SIZE { + return Err("the spog buffer has not the correct length".into()); + } + Ok(Self { + subject: EncodedTerm::new_from_buffer(&buffer[0..TERM_ENCODING_SIZE])?, + predicate: EncodedTerm::new_from_buffer( + &buffer[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE], + )?, + object: EncodedTerm::new_from_buffer( + &buffer[2 * TERM_ENCODING_SIZE..3 * TERM_ENCODING_SIZE], + )?, + graph_name: EncodedTerm::new_from_buffer( + &buffer[3 * TERM_ENCODING_SIZE..4 * TERM_ENCODING_SIZE], + )?, + }) + } + + pub fn new_from_posg_buffer(buffer: &[u8]) -> Result { + if buffer.len() != 4 * TERM_ENCODING_SIZE { + return Err("the posg buffer has not the correct length".into()); + } + Ok(Self { + subject: EncodedTerm::new_from_buffer( + &buffer[2 * TERM_ENCODING_SIZE..3 * TERM_ENCODING_SIZE], + )?, + predicate: EncodedTerm::new_from_buffer(&buffer[0..TERM_ENCODING_SIZE])?, + object: EncodedTerm::new_from_buffer( + &buffer[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE], + )?, + graph_name: EncodedTerm::new_from_buffer( + &buffer[3 * TERM_ENCODING_SIZE..4 * TERM_ENCODING_SIZE], + )?, + }) + } + + pub fn new_from_ospg_buffer(buffer: &[u8]) -> Result { + if buffer.len() != 4 * TERM_ENCODING_SIZE { + return Err("the ospg buffer has not the correct length".into()); + } + Ok(Self { + subject: EncodedTerm::new_from_buffer( + &buffer[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE], + )?, + predicate: EncodedTerm::new_from_buffer( + &buffer[2 * TERM_ENCODING_SIZE..3 * TERM_ENCODING_SIZE], + )?, + object: EncodedTerm::new_from_buffer(&buffer[0..TERM_ENCODING_SIZE])?, + graph_name: EncodedTerm::new_from_buffer( + &buffer[3 * TERM_ENCODING_SIZE..4 * TERM_ENCODING_SIZE], + )?, + }) + } + + pub fn spog(&self) -> [u8; 4 * TERM_ENCODING_SIZE] { + let mut spog = [0 as u8; 4 * TERM_ENCODING_SIZE]; + spog[0..TERM_ENCODING_SIZE].copy_from_slice(self.subject.as_ref()); + spog[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE].copy_from_slice(self.predicate.as_ref()); + spog[2 * TERM_ENCODING_SIZE..3 * TERM_ENCODING_SIZE].copy_from_slice(self.object.as_ref()); + spog[3 * TERM_ENCODING_SIZE..4 * TERM_ENCODING_SIZE] + .copy_from_slice(self.graph_name.as_ref()); + spog + } + + pub fn posg(&self) -> [u8; 4 * TERM_ENCODING_SIZE] { + let mut posg = [0 as u8; 4 * TERM_ENCODING_SIZE]; + posg[0..TERM_ENCODING_SIZE].copy_from_slice(self.predicate.as_ref()); + posg[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE].copy_from_slice(self.object.as_ref()); + posg[2 * TERM_ENCODING_SIZE..3 * TERM_ENCODING_SIZE].copy_from_slice(self.subject.as_ref()); + posg[3 * TERM_ENCODING_SIZE..4 * TERM_ENCODING_SIZE] + .copy_from_slice(self.graph_name.as_ref()); + posg + } + + pub fn ospg(&self) -> [u8; 4 * TERM_ENCODING_SIZE] { + let mut ospg = [0 as u8; 4 * TERM_ENCODING_SIZE]; + ospg[0..TERM_ENCODING_SIZE].copy_from_slice(self.object.as_ref()); + ospg[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE].copy_from_slice(self.subject.as_ref()); + ospg[2 * TERM_ENCODING_SIZE..3 * TERM_ENCODING_SIZE] + .copy_from_slice(self.predicate.as_ref()); + ospg[3 * TERM_ENCODING_SIZE..4 * TERM_ENCODING_SIZE] + .copy_from_slice(self.graph_name.as_ref()); + ospg + } +} + +pub struct Encoder { + string_store: S, +} + +impl Encoder { + pub fn new(string_store: S) -> Self { + Self { string_store } + } + + pub fn encode_named_node(&self, named_node: &NamedNode) -> Result { + let mut bytes = [0 as u8; TERM_ENCODING_SIZE]; + bytes[0] = TYPE_NAMED_NODE_ID; + self.encode_str_value_to_lower_bytes(named_node.as_str(), &mut bytes)?; + Ok(EncodedTerm(bytes)) + } + + pub fn encode_blank_node(&self, blank_node: &BlankNode) -> Result { + let mut bytes = [0 as u8; TERM_ENCODING_SIZE]; + bytes[0] = TYPE_BLANK_NODE_ID; + bytes[TYPE_KEY_SIZE..TERM_ENCODING_SIZE].copy_from_slice(blank_node.as_bytes()); + Ok(EncodedTerm(bytes)) + } + + pub fn encode_literal(&self, literal: &Literal) -> Result { + let mut bytes = [0 as u8; TERM_ENCODING_SIZE]; + if let Some(language) = literal.language() { + bytes[0] = TYPE_LANG_STRING_LITERAL_ID; + self.encode_str_value_to_upper_bytes(language, &mut bytes)?; + } else { + bytes[0] = TYPE_TYPED_LITERAL_ID; + self.encode_str_value_to_upper_bytes(literal.datatype().as_str(), &mut bytes)?; + } + self.encode_str_value_to_lower_bytes(literal.value().as_str(), &mut bytes)?; + Ok(EncodedTerm(bytes)) + } + + pub fn encode_named_or_blank_node(&self, term: &NamedOrBlankNode) -> Result { + match term { + NamedOrBlankNode::NamedNode(named_node) => self.encode_named_node(named_node), + NamedOrBlankNode::BlankNode(blank_node) => self.encode_blank_node(blank_node), + } + } + + pub fn encode_optional_named_or_blank_node( + &self, + term: &Option, + ) -> Result { + match term { + Some(node) => self.encode_named_or_blank_node(node), + None => Ok(EncodedTerm(EMPTY_TERM)), + } + } + + pub fn encode_term(&self, term: &Term) -> Result { + match term { + Term::NamedNode(named_node) => self.encode_named_node(named_node), + Term::BlankNode(blank_node) => self.encode_blank_node(blank_node), + Term::Literal(literal) => self.encode_literal(literal), + } + } + + pub fn encode_quad(&self, quad: &Quad) -> Result { + Ok(EncodedQuad { + subject: self.encode_named_or_blank_node(quad.subject())?, + predicate: self.encode_named_node(quad.predicate())?, + object: self.encode_term(quad.object())?, + graph_name: self.encode_optional_named_or_blank_node(quad.graph_name())?, + }) + } + + pub fn decode_term(&self, encoded: impl AsRef<[u8]>) -> Result { + let encoding = encoded.as_ref(); + match encoding[0] { + TYPE_NAMED_NODE_ID => { + let iri = self.decode_url_value_from_lower_bytes(encoding)?; + Ok(NamedNode::from(iri).into()) + } + TYPE_BLANK_NODE_ID => Ok(BlankNode::from(Uuid::from_bytes(&encoding[1..])?).into()), + TYPE_LANG_STRING_LITERAL_ID => { + let value = self.decode_str_value_from_lower_bytes(encoding)?; + let language = self.decode_str_value_from_upper_bytes(encoding)?; + Ok(Literal::new_language_tagged_literal(value, language).into()) + } + TYPE_TYPED_LITERAL_ID => { + let value = self.decode_str_value_from_lower_bytes(encoding)?; + let datatype = NamedNode::from(self.decode_url_value_from_upper_bytes(encoding)?); + Ok(Literal::new_typed_literal(value, datatype).into()) + } + _ => Err("invalid term type encoding".into()), + } + } + + pub fn decode_named_or_blank_node( + &self, + encoded: impl AsRef<[u8]>, + ) -> Result { + let encoding = encoded.as_ref(); + match self.decode_term(encoding)? { + Term::NamedNode(named_node) => Ok(named_node.into()), + Term::BlankNode(blank_node) => Ok(blank_node.into()), + Term::Literal(_) => Err("A literal has ben found instead of a named node".into()), + } + } + + pub fn decode_optional_named_or_blank_node( + &self, + encoded: impl AsRef<[u8]>, + ) -> Result> { + let encoding = encoded.as_ref(); + if encoding == EMPTY_TERM { + Ok(None) + } else { + Ok(Some(self.decode_named_or_blank_node(encoding)?)) + } + } + + pub fn decode_named_node(&self, encoded: impl AsRef<[u8]>) -> Result { + let encoding = encoded.as_ref(); + match self.decode_term(encoding)? { + Term::NamedNode(named_node) => Ok(named_node), + Term::BlankNode(_) => Err("A blank node has been found instead of a named node".into()), + Term::Literal(_) => Err("A literal has ben found instead of a named node".into()), + } + } + + pub fn decode_quad(&self, encoded: EncodedQuad) -> Result { + Ok(Quad::new( + self.decode_named_or_blank_node(encoded.subject)?, + self.decode_named_node(encoded.predicate)?, + self.decode_term(encoded.object)?, + self.decode_optional_named_or_blank_node(encoded.graph_name)?, + )) + } + + fn encode_str_value_to_upper_bytes(&self, text: &str, bytes: &mut [u8]) -> Result<()> { + self.string_store.put( + text.as_bytes(), + &mut bytes[TYPE_KEY_SIZE..TYPE_KEY_SIZE + STRING_KEY_SIZE], + ) + } + fn encode_str_value_to_lower_bytes(&self, text: &str, bytes: &mut [u8]) -> Result<()> { + self.string_store.put( + text.as_bytes(), + &mut bytes[TYPE_KEY_SIZE + STRING_KEY_SIZE..TYPE_KEY_SIZE + 2 * STRING_KEY_SIZE], + ) + } + + fn decode_str_value_from_upper_bytes(&self, encoding: &[u8]) -> Result { + let bytes = self.decode_value_from_upper_bytes(encoding)?; + Ok(str::from_utf8(&bytes)?.to_string()) + } + + fn decode_url_value_from_upper_bytes(&self, encoding: &[u8]) -> Result { + let bytes = self.decode_value_from_upper_bytes(encoding)?; + Ok(Url::from_str(str::from_utf8(&bytes)?)?) + } + + fn decode_value_from_upper_bytes(&self, encoding: &[u8]) -> Result { + self.string_store + .get(&encoding[TYPE_KEY_SIZE..TYPE_KEY_SIZE + STRING_KEY_SIZE])? + .ok_or(Error::from("value not found in the dictionary")) + } + + fn decode_str_value_from_lower_bytes(&self, encoding: &[u8]) -> Result { + let bytes = self.decode_value_from_lower_bytes(encoding)?; + Ok(str::from_utf8(&bytes)?.to_string()) + } + + fn decode_url_value_from_lower_bytes(&self, encoding: &[u8]) -> Result { + let bytes = self.decode_value_from_lower_bytes(encoding)?; + Ok(Url::from_str(str::from_utf8(&bytes)?)?) + } + + fn decode_value_from_lower_bytes(&self, encoding: &[u8]) -> Result { + self.string_store + .get(&encoding[TYPE_KEY_SIZE + STRING_KEY_SIZE..TYPE_KEY_SIZE + 2 * STRING_KEY_SIZE])? + .ok_or(Error::from("value not found in the dictionary")) + } +} + +impl Default for Encoder { + fn default() -> Self { + Self { + string_store: S::default(), + } + } +} + +mod test { + use errors::*; + use model::*; + use std::cell::RefCell; + use std::collections::BTreeMap; + use std::str::FromStr; + use store::numeric_encoder::BytesStore; + use store::numeric_encoder::Encoder; + use store::numeric_encoder::STRING_KEY_SIZE; + use store::numeric_encoder::TERM_ENCODING_SIZE; + use utils::to_bytes; + + #[derive(Default)] + struct MemoryBytesStore { + id2str: RefCell>>, + str2id: RefCell, [u8; STRING_KEY_SIZE]>>, + } + + impl BytesStore for MemoryBytesStore { + type BytesOutput = Vec; + + fn put(&self, value: &[u8], id_buffer: &mut [u8]) -> Result<()> { + let mut str2id = self.str2id.borrow_mut(); + let mut id2str = self.id2str.borrow_mut(); + let id = str2id.entry(value.to_vec()).or_insert_with(|| { + let id = to_bytes(id2str.len()); + id2str.insert(id, value.to_vec()); + id + }); + id_buffer.copy_from_slice(id); + Ok(()) + } + + fn get(&self, id: &[u8]) -> Result>> { + Ok(self.id2str.borrow().get(id).map(|s| s.to_owned())) + } + } + + #[test] + fn test_encoding() { + let encoder: Encoder = Encoder::default(); + let terms: Vec = vec![ + NamedNode::from_str("http://foo.com").unwrap().into(), + NamedNode::from_str("http://bar.com").unwrap().into(), + NamedNode::from_str("http://foo.com").unwrap().into(), + BlankNode::default().into(), + Literal::from(true).into(), + Literal::from(1.2).into(), + Literal::from("foo").into(), + Literal::new_language_tagged_literal("foo", "fr").into(), + ]; + for term in terms { + let encoded = encoder.encode_term(&term).unwrap(); + assert_eq!(term, encoder.decode_term(encoded).unwrap()) + } + } + +} diff --git a/src/store/rocksdb.rs b/src/store/rocksdb.rs new file mode 100644 index 00000000..84b7c905 --- /dev/null +++ b/src/store/rocksdb.rs @@ -0,0 +1,446 @@ +use errors::*; +use model::*; +use rocksdb::ColumnFamily; +use rocksdb::DBRawIterator; +use rocksdb::DBVector; +use rocksdb::IteratorMode; +use rocksdb::Options; +use rocksdb::WriteBatch; +use rocksdb::DB; +use std::ops::Deref; +use std::path::Path; +use std::slice; +use std::str; +use store::numeric_encoder::BytesStore; +use store::numeric_encoder::EncodedQuad; +use store::numeric_encoder::EncodedTerm; +use store::numeric_encoder::Encoder; +use store::numeric_encoder::STRING_KEY_SIZE; +use store::numeric_encoder::TERM_ENCODING_SIZE; +use utils::to_bytes; + +pub struct RocksDbDataset { + store: RocksDbStore, +} + +impl RocksDbDataset { + pub fn open(path: impl AsRef) -> Result { + Ok(Self { + store: RocksDbStore::open(path)?, + }) + } + + fn graph(&self, name: &NamedOrBlankNode) -> RocksDbGraph { + RocksDbGraph { + store: &self.store, + name: name.clone(), + } + } + + fn default_graph(&self) -> RocksDbDefaultGraph { + RocksDbDefaultGraph { store: &self.store } + } + + fn union_graph(&self) -> RocksDbUnionGraph { + RocksDbUnionGraph { store: &self.store } + } + + fn iter(&self) -> Result> { + Ok(QuadsIterator { + iter: self.store.quads()?, + encoder: self.store.encoder(), + }) + } + + fn quads_for_subject( + &self, + subject: &NamedOrBlankNode, + ) -> Result>> { + Ok(QuadsIterator { + iter: self.store + .quads_for_subject(self.store.encoder().encode_named_or_blank_node(subject)?)?, + encoder: self.store.encoder(), + }) + } + + fn contains(&self, quad: &Quad) -> Result { + self.store + .contains(&self.store.encoder().encode_quad(quad)?) + } + + fn insert(&self, quad: &Quad) -> Result<()> { + self.store.insert(&self.store.encoder().encode_quad(quad)?) + } + + fn remove(&self, quad: &Quad) -> Result<()> { + self.store.remove(&self.store.encoder().encode_quad(quad)?) + } +} + +struct RocksDbGraph<'a> { + store: &'a RocksDbStore, + name: NamedOrBlankNode, //TODO: better storage +} + +struct RocksDbDefaultGraph<'a> { + store: &'a RocksDbStore, +} + +struct RocksDbUnionGraph<'a> { + store: &'a RocksDbStore, +} + +const ID2STR_CF: &'static str = "id2str"; +const STR2ID_CF: &'static str = "id2str"; +const SPOG_CF: &'static str = "spog"; +const POSG_CF: &'static str = "posg"; +const OSPG_CF: &'static str = "ospg"; + +const EMPTY_BUF: [u8; 0] = [0 as u8; 0]; + +//TODO: indexes for the default graph and indexes for the named graphs (no more Optional and space saving) + +const COLUMN_FAMILIES: [&'static str; 5] = [ID2STR_CF, STR2ID_CF, SPOG_CF, POSG_CF, OSPG_CF]; + +struct RocksDbStore { + db: DB, + id2str_cf: ColumnFamily, + str2id_cf: ColumnFamily, + spog_cf: ColumnFamily, + posg_cf: ColumnFamily, + ospg_cf: ColumnFamily, +} + +impl RocksDbStore { + fn open(path: impl AsRef) -> Result { + let options = Options::default(); + + let db = DB::open_cf(&options, path, &COLUMN_FAMILIES)?; + let id2str_cf = get_cf(&db, STR2ID_CF)?; + let str2id_cf = get_cf(&db, ID2STR_CF)?; + let spog_cf = get_cf(&db, SPOG_CF)?; + let posg_cf = get_cf(&db, POSG_CF)?; + let ospg_cf = get_cf(&db, OSPG_CF)?; + + Ok(Self { + db, + id2str_cf, + str2id_cf, + spog_cf, + posg_cf, + ospg_cf, + }) + } + + fn encoder(&self) -> Encoder { + Encoder::new(RocksDbBytesStore(&self)) + } + + fn quads(&self) -> Result { + let mut iter = self.db.raw_iterator_cf(self.spog_cf)?; + iter.seek_to_first(); + Ok(SPOGIndexIterator { iter }) + } + + fn quads_for_subject( + &self, + subject: EncodedTerm, + ) -> Result> { + let mut iter = self.db.raw_iterator_cf(self.spog_cf)?; + iter.seek(subject.as_ref()); + Ok(FilteringEncodedQuadsIterator { + iter: SPOGIndexIterator { iter }, + filter: EncodedQuadPattern::new(Some(subject), None, None, None), + }) + } + + fn quads_for_subject_predicate( + &self, + subject: EncodedTerm, + predicate: EncodedTerm, + ) -> Result> { + let mut iter = self.db.raw_iterator_cf(self.spog_cf)?; + iter.seek(&encode_term_pair(&subject, &predicate)); + Ok(FilteringEncodedQuadsIterator { + iter: SPOGIndexIterator { iter }, + filter: EncodedQuadPattern::new(Some(subject), Some(predicate), None, None), + }) + } + + fn quads_for_subject_predicate_object( + &self, + subject: EncodedTerm, + predicate: EncodedTerm, + object: EncodedTerm, + ) -> Result> { + let mut iter = self.db.raw_iterator_cf(self.spog_cf)?; + iter.seek(&encode_term_triple(&subject, &predicate, &object)); + Ok(FilteringEncodedQuadsIterator { + iter: SPOGIndexIterator { iter }, + filter: EncodedQuadPattern::new(Some(subject), Some(predicate), Some(object), None), + }) + } + + fn quads_for_predicate( + &self, + predicate: EncodedTerm, + ) -> Result> { + let mut iter = self.db.raw_iterator_cf(self.posg_cf)?; + iter.seek(predicate.as_ref()); + Ok(FilteringEncodedQuadsIterator { + iter: POSGIndexIterator { iter }, + filter: EncodedQuadPattern::new(None, Some(predicate), None, None), + }) + } + + fn quads_for_predicate_object( + &self, + predicate: EncodedTerm, + object: EncodedTerm, + ) -> Result> { + let mut iter = self.db.raw_iterator_cf(self.spog_cf)?; + iter.seek(&encode_term_pair(&predicate, &object)); + Ok(FilteringEncodedQuadsIterator { + iter: POSGIndexIterator { iter }, + filter: EncodedQuadPattern::new(None, Some(predicate), Some(object), None), + }) + } + + fn quads_for_object( + &self, + object: EncodedTerm, + ) -> Result> { + let mut iter = self.db.raw_iterator_cf(self.ospg_cf)?; + iter.seek(object.as_ref()); + Ok(FilteringEncodedQuadsIterator { + iter: OSPGIndexIterator { iter }, + filter: EncodedQuadPattern::new(None, None, Some(object), None), + }) + } + + fn contains(&self, quad: &EncodedQuad) -> Result { + Ok(self.db.get_cf(self.spog_cf, &quad.spog())?.is_some()) + } + + fn insert(&self, quad: &EncodedQuad) -> Result<()> { + let mut batch = WriteBatch::default(); + batch.put_cf(self.spog_cf, &quad.spog(), &EMPTY_BUF)?; + batch.put_cf(self.posg_cf, &quad.posg(), &EMPTY_BUF)?; + batch.put_cf(self.ospg_cf, &quad.ospg(), &EMPTY_BUF)?; + Ok(self.db.write(batch)?) //TODO: check what's going on if the key already exists + } + + fn remove(&self, quad: &EncodedQuad) -> Result<()> { + let mut batch = WriteBatch::default(); + batch.delete_cf(self.spog_cf, &quad.spog())?; + batch.delete_cf(self.posg_cf, &quad.posg())?; + batch.delete_cf(self.ospg_cf, &quad.ospg())?; + Ok(self.db.write(batch)?) + } +} + +fn get_cf(db: &DB, name: &str) -> Result { + db.cf_handle(name) + .ok_or_else(|| Error::from("column family not found")) +} + +struct RocksDbBytesStore<'a>(&'a RocksDbStore); + +impl<'a> BytesStore for RocksDbBytesStore<'a> { + type BytesOutput = DBVector; + + fn put(&self, value: &[u8], id_buffer: &mut [u8]) -> Result<()> { + match self.0.db.get_cf(self.0.str2id_cf, value)? { + Some(id) => id_buffer.copy_from_slice(&id), + None => { + let mut batch = WriteBatch::default(); + // TODO: id allocation + let id = [0 as u8; STRING_KEY_SIZE]; + batch.put_cf(self.0.id2str_cf, &id, value)?; + batch.put_cf(self.0.str2id_cf, value, &id)?; + self.0.db.write(batch)?; + id_buffer.copy_from_slice(&id) + } + } + Ok(()) + } + + fn get(&self, id: &[u8]) -> Result> { + Ok(self.0.db.get_cf(self.0.id2str_cf, id)?) + } +} + +struct EncodedQuadPattern { + subject: Option, + predicate: Option, + object: Option, + graph_name: Option, +} + +impl EncodedQuadPattern { + fn new( + subject: Option, + predicate: Option, + object: Option, + graph_name: Option, + ) -> Self { + Self { + subject, + predicate, + object, + graph_name, + } + } + + fn filter(&self, quad: &EncodedQuad) -> bool { + if let Some(ref subject) = self.subject { + if &quad.subject != subject { + return false; + } + } + if let Some(ref predicate) = self.predicate { + if &quad.predicate != predicate { + return false; + } + } + if let Some(ref object) = self.object { + if &quad.object != object { + return false; + } + } + if let Some(ref graph_name) = self.graph_name { + if &quad.graph_name != graph_name { + return false; + } + } + true + } +} + +fn encode_term_pair(t1: &EncodedTerm, t2: &EncodedTerm) -> [u8; 2 * TERM_ENCODING_SIZE] { + let mut bytes = [0 as u8; 2 * TERM_ENCODING_SIZE]; + bytes[0..TERM_ENCODING_SIZE].copy_from_slice(t1.as_ref()); + bytes[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE].copy_from_slice(t2.as_ref()); + bytes +} + +fn encode_term_triple( + t1: &EncodedTerm, + t2: &EncodedTerm, + t3: &EncodedTerm, +) -> [u8; 3 * TERM_ENCODING_SIZE] { + let mut bytes = [0 as u8; 3 * TERM_ENCODING_SIZE]; + bytes[0..TERM_ENCODING_SIZE].copy_from_slice(t1.as_ref()); + bytes[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE].copy_from_slice(t2.as_ref()); + bytes[2 * TERM_ENCODING_SIZE..3 * TERM_ENCODING_SIZE].copy_from_slice(t2.as_ref()); + bytes +} + +struct SPOGIndexIterator { + iter: DBRawIterator, +} + +impl Iterator for SPOGIndexIterator { + type Item = Result; + + fn next(&mut self) -> Option> { + self.iter.next(); + self.iter + .key() + .map(|buffer| EncodedQuad::new_from_spog_buffer(&buffer)) + } +} + +struct POSGIndexIterator { + iter: DBRawIterator, +} + +impl Iterator for POSGIndexIterator { + type Item = Result; + + fn next(&mut self) -> Option> { + self.iter.next(); + self.iter + .key() + .map(|buffer| EncodedQuad::new_from_posg_buffer(&buffer)) + } +} + +struct OSPGIndexIterator { + iter: DBRawIterator, +} + +impl Iterator for OSPGIndexIterator { + type Item = Result; + + fn next(&mut self) -> Option> { + self.iter.next(); + self.iter + .key() + .map(|buffer| EncodedQuad::new_from_ospg_buffer(&buffer)) + } +} + +struct FilteringEncodedQuadsIterator>> { + iter: I, + filter: EncodedQuadPattern, +} + +impl>> Iterator for FilteringEncodedQuadsIterator { + type Item = Result; + + fn next(&mut self) -> Option> { + self.iter.next().filter(|quad| match quad { + Ok(quad) => self.filter.filter(quad), + Err(e) => true, + }) + } +} + +struct QuadsIterator<'a, I: Iterator>> { + iter: I, + encoder: Encoder>, +} + +impl<'a, I: Iterator>> Iterator for QuadsIterator<'a, I> { + type Item = Result; + + fn next(&mut self) -> Option> { + self.iter + .next() + .map(|k| k.and_then(|quad| self.encoder.decode_quad(quad))) + } +} + +/*fn encode_sp( + encoder: &Encoder, + subject: &NamedOrBlankNode, + predicate: &NamedNode, +) -> Result<[u8; 2 * TERM_ENCODING_SIZE]> { + let mut sp = [0 as u8; 2 * TERM_ENCODING_SIZE]; + encoder.encode_named_or_blank_node(subject, &mut sp)?; + encoder.encode_named_node(predicate, &mut sp)?; + Ok(sp) +} + +fn encode_po( + encoder: &Encoder, + predicate: &NamedNode, + object: &Term, +) -> Result<[u8; 2 * TERM_ENCODING_SIZE]> { + let mut po = [0 as u8; 2 * TERM_ENCODING_SIZE]; + encoder.encode_named_node(predicate, &mut po)?; + encoder.encode_term(object, &mut po)?; + Ok(po) +} + +fn encode_os( + encoder: &Encoder, + object: &Term, + subject: &NamedOrBlankNode, +) -> Result<[u8; 2 * TERM_ENCODING_SIZE]> { + let mut po = [0 as u8; 2 * TERM_ENCODING_SIZE]; + encoder.encode_term(object, &mut po)?; + encoder.encode_named_or_blank_node(subject, &mut po)?; + Ok(po) +}*/ diff --git a/src/utils.rs b/src/utils.rs index 5f1a00fb..c0ac1c15 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,3 +1,6 @@ +use std::mem::size_of; +use std::mem::transmute; + pub trait Escaper { fn escape(&self) -> String; } @@ -77,3 +80,8 @@ impl ExactSizeIterator for EscapeRDF { } } } + +pub fn to_bytes(int: usize) -> [u8; size_of::()] { + //TODO: remove when next rust version stabilize this method + unsafe { transmute(int) } +}