From c865cb3ee9e5a2c33893dfe592fef44b27043ea3 Mon Sep 17 00:00:00 2001 From: Tpt Date: Tue, 31 Jul 2018 17:16:13 +0200 Subject: [PATCH] [RocksDB] Splits front facing API and storage --- src/store/rocksdb/mod.rs | 102 ++++++++++++ src/store/{rocksdb.rs => rocksdb/storage.rs} | 158 +++---------------- 2 files changed, 121 insertions(+), 139 deletions(-) create mode 100644 src/store/rocksdb/mod.rs rename src/store/{rocksdb.rs => rocksdb/storage.rs} (70%) diff --git a/src/store/rocksdb/mod.rs b/src/store/rocksdb/mod.rs new file mode 100644 index 00000000..5d2c0b5c --- /dev/null +++ b/src/store/rocksdb/mod.rs @@ -0,0 +1,102 @@ +mod storage; + +use errors::*; +use model::*; +use std::ops::Deref; +use std::path::Path; +use std::slice; +use std::str; +use store::numeric_encoder::BytesStore; +use store::numeric_encoder::EncodedQuad; +use store::numeric_encoder::EncodedTerm; +use store::numeric_encoder::Encoder; +use store::numeric_encoder::STRING_KEY_SIZE; +use store::numeric_encoder::TERM_ENCODING_SIZE; +use store::rocksdb::storage::*; +use utils::to_bytes; + +pub struct RocksDbDataset { + store: RocksDbStore, +} + +impl RocksDbDataset { + pub fn open(path: impl AsRef) -> Result { + Ok(Self { + store: RocksDbStore::open(path)?, + }) + } + + fn graph(&self, name: &NamedOrBlankNode) -> RocksDbGraph { + RocksDbGraph { + store: &self.store, + name: name.clone(), + } + } + + fn default_graph(&self) -> RocksDbDefaultGraph { + RocksDbDefaultGraph { store: &self.store } + } + + fn union_graph(&self) -> RocksDbUnionGraph { + RocksDbUnionGraph { store: &self.store } + } + + fn iter(&self) -> Result> { + Ok(QuadsIterator { + iter: self.store.quads()?, + encoder: self.store.encoder(), + }) + } + + fn quads_for_subject( + &self, + subject: &NamedOrBlankNode, + ) -> Result>> { + Ok(QuadsIterator { + iter: self.store + .quads_for_subject(self.store.encoder().encode_named_or_blank_node(subject)?)?, + encoder: self.store.encoder(), + }) + } + + fn contains(&self, quad: &Quad) -> Result { + self.store + .contains(&self.store.encoder().encode_quad(quad)?) + } + + fn insert(&self, quad: &Quad) -> Result<()> { + self.store.insert(&self.store.encoder().encode_quad(quad)?) + } + + fn remove(&self, quad: &Quad) -> Result<()> { + self.store.remove(&self.store.encoder().encode_quad(quad)?) + } +} + +struct RocksDbGraph<'a> { + store: &'a RocksDbStore, + name: NamedOrBlankNode, //TODO: better storage +} + +struct RocksDbDefaultGraph<'a> { + store: &'a RocksDbStore, +} + +struct RocksDbUnionGraph<'a> { + store: &'a RocksDbStore, +} + +struct QuadsIterator<'a, I: Iterator>> { + iter: I, + encoder: Encoder>, +} + +impl<'a, I: Iterator>> Iterator for QuadsIterator<'a, I> { + type Item = Result; + + fn next(&mut self) -> Option> { + self.iter + .next() + .map(|k| k.and_then(|quad| self.encoder.decode_quad(quad))) + } +} diff --git a/src/store/rocksdb.rs b/src/store/rocksdb/storage.rs similarity index 70% rename from src/store/rocksdb.rs rename to src/store/rocksdb/storage.rs index 84b7c905..5a8e71e6 100644 --- a/src/store/rocksdb.rs +++ b/src/store/rocksdb/storage.rs @@ -1,5 +1,4 @@ use errors::*; -use model::*; use rocksdb::ColumnFamily; use rocksdb::DBRawIterator; use rocksdb::DBVector; @@ -19,77 +18,6 @@ use store::numeric_encoder::STRING_KEY_SIZE; use store::numeric_encoder::TERM_ENCODING_SIZE; use utils::to_bytes; -pub struct RocksDbDataset { - store: RocksDbStore, -} - -impl RocksDbDataset { - pub fn open(path: impl AsRef) -> Result { - Ok(Self { - store: RocksDbStore::open(path)?, - }) - } - - fn graph(&self, name: &NamedOrBlankNode) -> RocksDbGraph { - RocksDbGraph { - store: &self.store, - name: name.clone(), - } - } - - fn default_graph(&self) -> RocksDbDefaultGraph { - RocksDbDefaultGraph { store: &self.store } - } - - fn union_graph(&self) -> RocksDbUnionGraph { - RocksDbUnionGraph { store: &self.store } - } - - fn iter(&self) -> Result> { - Ok(QuadsIterator { - iter: self.store.quads()?, - encoder: self.store.encoder(), - }) - } - - fn quads_for_subject( - &self, - subject: &NamedOrBlankNode, - ) -> Result>> { - Ok(QuadsIterator { - iter: self.store - .quads_for_subject(self.store.encoder().encode_named_or_blank_node(subject)?)?, - encoder: self.store.encoder(), - }) - } - - fn contains(&self, quad: &Quad) -> Result { - self.store - .contains(&self.store.encoder().encode_quad(quad)?) - } - - fn insert(&self, quad: &Quad) -> Result<()> { - self.store.insert(&self.store.encoder().encode_quad(quad)?) - } - - fn remove(&self, quad: &Quad) -> Result<()> { - self.store.remove(&self.store.encoder().encode_quad(quad)?) - } -} - -struct RocksDbGraph<'a> { - store: &'a RocksDbStore, - name: NamedOrBlankNode, //TODO: better storage -} - -struct RocksDbDefaultGraph<'a> { - store: &'a RocksDbStore, -} - -struct RocksDbUnionGraph<'a> { - store: &'a RocksDbStore, -} - const ID2STR_CF: &'static str = "id2str"; const STR2ID_CF: &'static str = "id2str"; const SPOG_CF: &'static str = "spog"; @@ -102,7 +30,7 @@ const EMPTY_BUF: [u8; 0] = [0 as u8; 0]; const COLUMN_FAMILIES: [&'static str; 5] = [ID2STR_CF, STR2ID_CF, SPOG_CF, POSG_CF, OSPG_CF]; -struct RocksDbStore { +pub struct RocksDbStore { db: DB, id2str_cf: ColumnFamily, str2id_cf: ColumnFamily, @@ -112,7 +40,7 @@ struct RocksDbStore { } impl RocksDbStore { - fn open(path: impl AsRef) -> Result { + pub fn open(path: impl AsRef) -> Result { let options = Options::default(); let db = DB::open_cf(&options, path, &COLUMN_FAMILIES)?; @@ -132,17 +60,17 @@ impl RocksDbStore { }) } - fn encoder(&self) -> Encoder { + pub fn encoder(&self) -> Encoder { Encoder::new(RocksDbBytesStore(&self)) } - fn quads(&self) -> Result { + pub fn quads(&self) -> Result { let mut iter = self.db.raw_iterator_cf(self.spog_cf)?; iter.seek_to_first(); Ok(SPOGIndexIterator { iter }) } - fn quads_for_subject( + pub fn quads_for_subject( &self, subject: EncodedTerm, ) -> Result> { @@ -154,7 +82,7 @@ impl RocksDbStore { }) } - fn quads_for_subject_predicate( + pub fn quads_for_subject_predicate( &self, subject: EncodedTerm, predicate: EncodedTerm, @@ -167,7 +95,7 @@ impl RocksDbStore { }) } - fn quads_for_subject_predicate_object( + pub fn quads_for_subject_predicate_object( &self, subject: EncodedTerm, predicate: EncodedTerm, @@ -181,7 +109,7 @@ impl RocksDbStore { }) } - fn quads_for_predicate( + pub fn quads_for_predicate( &self, predicate: EncodedTerm, ) -> Result> { @@ -193,7 +121,7 @@ impl RocksDbStore { }) } - fn quads_for_predicate_object( + pub fn quads_for_predicate_object( &self, predicate: EncodedTerm, object: EncodedTerm, @@ -206,7 +134,7 @@ impl RocksDbStore { }) } - fn quads_for_object( + pub fn quads_for_object( &self, object: EncodedTerm, ) -> Result> { @@ -218,11 +146,11 @@ impl RocksDbStore { }) } - fn contains(&self, quad: &EncodedQuad) -> Result { + pub fn contains(&self, quad: &EncodedQuad) -> Result { Ok(self.db.get_cf(self.spog_cf, &quad.spog())?.is_some()) } - fn insert(&self, quad: &EncodedQuad) -> Result<()> { + pub fn insert(&self, quad: &EncodedQuad) -> Result<()> { let mut batch = WriteBatch::default(); batch.put_cf(self.spog_cf, &quad.spog(), &EMPTY_BUF)?; batch.put_cf(self.posg_cf, &quad.posg(), &EMPTY_BUF)?; @@ -230,7 +158,7 @@ impl RocksDbStore { Ok(self.db.write(batch)?) //TODO: check what's going on if the key already exists } - fn remove(&self, quad: &EncodedQuad) -> Result<()> { + pub fn remove(&self, quad: &EncodedQuad) -> Result<()> { let mut batch = WriteBatch::default(); batch.delete_cf(self.spog_cf, &quad.spog())?; batch.delete_cf(self.posg_cf, &quad.posg())?; @@ -239,12 +167,12 @@ impl RocksDbStore { } } -fn get_cf(db: &DB, name: &str) -> Result { +pub fn get_cf(db: &DB, name: &str) -> Result { db.cf_handle(name) .ok_or_else(|| Error::from("column family not found")) } -struct RocksDbBytesStore<'a>(&'a RocksDbStore); +pub struct RocksDbBytesStore<'a>(&'a RocksDbStore); impl<'a> BytesStore for RocksDbBytesStore<'a> { type BytesOutput = DBVector; @@ -336,7 +264,7 @@ fn encode_term_triple( bytes } -struct SPOGIndexIterator { +pub struct SPOGIndexIterator { iter: DBRawIterator, } @@ -351,7 +279,7 @@ impl Iterator for SPOGIndexIterator { } } -struct POSGIndexIterator { +pub struct POSGIndexIterator { iter: DBRawIterator, } @@ -366,7 +294,7 @@ impl Iterator for POSGIndexIterator { } } -struct OSPGIndexIterator { +pub struct OSPGIndexIterator { iter: DBRawIterator, } @@ -381,7 +309,7 @@ impl Iterator for OSPGIndexIterator { } } -struct FilteringEncodedQuadsIterator>> { +pub struct FilteringEncodedQuadsIterator>> { iter: I, filter: EncodedQuadPattern, } @@ -396,51 +324,3 @@ impl>> Iterator for FilteringEncodedQuads }) } } - -struct QuadsIterator<'a, I: Iterator>> { - iter: I, - encoder: Encoder>, -} - -impl<'a, I: Iterator>> Iterator for QuadsIterator<'a, I> { - type Item = Result; - - fn next(&mut self) -> Option> { - self.iter - .next() - .map(|k| k.and_then(|quad| self.encoder.decode_quad(quad))) - } -} - -/*fn encode_sp( - encoder: &Encoder, - subject: &NamedOrBlankNode, - predicate: &NamedNode, -) -> Result<[u8; 2 * TERM_ENCODING_SIZE]> { - let mut sp = [0 as u8; 2 * TERM_ENCODING_SIZE]; - encoder.encode_named_or_blank_node(subject, &mut sp)?; - encoder.encode_named_node(predicate, &mut sp)?; - Ok(sp) -} - -fn encode_po( - encoder: &Encoder, - predicate: &NamedNode, - object: &Term, -) -> Result<[u8; 2 * TERM_ENCODING_SIZE]> { - let mut po = [0 as u8; 2 * TERM_ENCODING_SIZE]; - encoder.encode_named_node(predicate, &mut po)?; - encoder.encode_term(object, &mut po)?; - Ok(po) -} - -fn encode_os( - encoder: &Encoder, - object: &Term, - subject: &NamedOrBlankNode, -) -> Result<[u8; 2 * TERM_ENCODING_SIZE]> { - let mut po = [0 as u8; 2 * TERM_ENCODING_SIZE]; - encoder.encode_term(object, &mut po)?; - encoder.encode_named_or_blank_node(subject, &mut po)?; - Ok(po) -}*/