[RocksDB] Splits front facing API and storage

pull/10/head
Tpt 6 years ago
parent 49eda33d0a
commit c865cb3ee9
  1. 102
      src/store/rocksdb/mod.rs
  2. 158
      src/store/rocksdb/storage.rs

@ -0,0 +1,102 @@
mod storage;
use errors::*;
use model::*;
use std::ops::Deref;
use std::path::Path;
use std::slice;
use std::str;
use store::numeric_encoder::BytesStore;
use store::numeric_encoder::EncodedQuad;
use store::numeric_encoder::EncodedTerm;
use store::numeric_encoder::Encoder;
use store::numeric_encoder::STRING_KEY_SIZE;
use store::numeric_encoder::TERM_ENCODING_SIZE;
use store::rocksdb::storage::*;
use utils::to_bytes;
pub struct RocksDbDataset {
store: RocksDbStore,
}
impl RocksDbDataset {
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
Ok(Self {
store: RocksDbStore::open(path)?,
})
}
fn graph(&self, name: &NamedOrBlankNode) -> RocksDbGraph {
RocksDbGraph {
store: &self.store,
name: name.clone(),
}
}
fn default_graph(&self) -> RocksDbDefaultGraph {
RocksDbDefaultGraph { store: &self.store }
}
fn union_graph(&self) -> RocksDbUnionGraph {
RocksDbUnionGraph { store: &self.store }
}
fn iter(&self) -> Result<QuadsIterator<SPOGIndexIterator>> {
Ok(QuadsIterator {
iter: self.store.quads()?,
encoder: self.store.encoder(),
})
}
fn quads_for_subject(
&self,
subject: &NamedOrBlankNode,
) -> Result<QuadsIterator<FilteringEncodedQuadsIterator<SPOGIndexIterator>>> {
Ok(QuadsIterator {
iter: self.store
.quads_for_subject(self.store.encoder().encode_named_or_blank_node(subject)?)?,
encoder: self.store.encoder(),
})
}
fn contains(&self, quad: &Quad) -> Result<bool> {
self.store
.contains(&self.store.encoder().encode_quad(quad)?)
}
fn insert(&self, quad: &Quad) -> Result<()> {
self.store.insert(&self.store.encoder().encode_quad(quad)?)
}
fn remove(&self, quad: &Quad) -> Result<()> {
self.store.remove(&self.store.encoder().encode_quad(quad)?)
}
}
struct RocksDbGraph<'a> {
store: &'a RocksDbStore,
name: NamedOrBlankNode, //TODO: better storage
}
struct RocksDbDefaultGraph<'a> {
store: &'a RocksDbStore,
}
struct RocksDbUnionGraph<'a> {
store: &'a RocksDbStore,
}
struct QuadsIterator<'a, I: Iterator<Item = Result<EncodedQuad>>> {
iter: I,
encoder: Encoder<RocksDbBytesStore<'a>>,
}
impl<'a, I: Iterator<Item = Result<EncodedQuad>>> Iterator for QuadsIterator<'a, I> {
type Item = Result<Quad>;
fn next(&mut self) -> Option<Result<Quad>> {
self.iter
.next()
.map(|k| k.and_then(|quad| self.encoder.decode_quad(quad)))
}
}

@ -1,5 +1,4 @@
use errors::*;
use model::*;
use rocksdb::ColumnFamily;
use rocksdb::DBRawIterator;
use rocksdb::DBVector;
@ -19,77 +18,6 @@ use store::numeric_encoder::STRING_KEY_SIZE;
use store::numeric_encoder::TERM_ENCODING_SIZE;
use utils::to_bytes;
pub struct RocksDbDataset {
store: RocksDbStore,
}
impl RocksDbDataset {
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
Ok(Self {
store: RocksDbStore::open(path)?,
})
}
fn graph(&self, name: &NamedOrBlankNode) -> RocksDbGraph {
RocksDbGraph {
store: &self.store,
name: name.clone(),
}
}
fn default_graph(&self) -> RocksDbDefaultGraph {
RocksDbDefaultGraph { store: &self.store }
}
fn union_graph(&self) -> RocksDbUnionGraph {
RocksDbUnionGraph { store: &self.store }
}
fn iter(&self) -> Result<QuadsIterator<SPOGIndexIterator>> {
Ok(QuadsIterator {
iter: self.store.quads()?,
encoder: self.store.encoder(),
})
}
fn quads_for_subject(
&self,
subject: &NamedOrBlankNode,
) -> Result<QuadsIterator<FilteringEncodedQuadsIterator<SPOGIndexIterator>>> {
Ok(QuadsIterator {
iter: self.store
.quads_for_subject(self.store.encoder().encode_named_or_blank_node(subject)?)?,
encoder: self.store.encoder(),
})
}
fn contains(&self, quad: &Quad) -> Result<bool> {
self.store
.contains(&self.store.encoder().encode_quad(quad)?)
}
fn insert(&self, quad: &Quad) -> Result<()> {
self.store.insert(&self.store.encoder().encode_quad(quad)?)
}
fn remove(&self, quad: &Quad) -> Result<()> {
self.store.remove(&self.store.encoder().encode_quad(quad)?)
}
}
struct RocksDbGraph<'a> {
store: &'a RocksDbStore,
name: NamedOrBlankNode, //TODO: better storage
}
struct RocksDbDefaultGraph<'a> {
store: &'a RocksDbStore,
}
struct RocksDbUnionGraph<'a> {
store: &'a RocksDbStore,
}
const ID2STR_CF: &'static str = "id2str";
const STR2ID_CF: &'static str = "id2str";
const SPOG_CF: &'static str = "spog";
@ -102,7 +30,7 @@ const EMPTY_BUF: [u8; 0] = [0 as u8; 0];
const COLUMN_FAMILIES: [&'static str; 5] = [ID2STR_CF, STR2ID_CF, SPOG_CF, POSG_CF, OSPG_CF];
struct RocksDbStore {
pub struct RocksDbStore {
db: DB,
id2str_cf: ColumnFamily,
str2id_cf: ColumnFamily,
@ -112,7 +40,7 @@ struct RocksDbStore {
}
impl RocksDbStore {
fn open(path: impl AsRef<Path>) -> Result<Self> {
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let options = Options::default();
let db = DB::open_cf(&options, path, &COLUMN_FAMILIES)?;
@ -132,17 +60,17 @@ impl RocksDbStore {
})
}
fn encoder(&self) -> Encoder<RocksDbBytesStore> {
pub fn encoder(&self) -> Encoder<RocksDbBytesStore> {
Encoder::new(RocksDbBytesStore(&self))
}
fn quads(&self) -> Result<SPOGIndexIterator> {
pub fn quads(&self) -> Result<SPOGIndexIterator> {
let mut iter = self.db.raw_iterator_cf(self.spog_cf)?;
iter.seek_to_first();
Ok(SPOGIndexIterator { iter })
}
fn quads_for_subject(
pub fn quads_for_subject(
&self,
subject: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> {
@ -154,7 +82,7 @@ impl RocksDbStore {
})
}
fn quads_for_subject_predicate(
pub fn quads_for_subject_predicate(
&self,
subject: EncodedTerm,
predicate: EncodedTerm,
@ -167,7 +95,7 @@ impl RocksDbStore {
})
}
fn quads_for_subject_predicate_object(
pub fn quads_for_subject_predicate_object(
&self,
subject: EncodedTerm,
predicate: EncodedTerm,
@ -181,7 +109,7 @@ impl RocksDbStore {
})
}
fn quads_for_predicate(
pub fn quads_for_predicate(
&self,
predicate: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<POSGIndexIterator>> {
@ -193,7 +121,7 @@ impl RocksDbStore {
})
}
fn quads_for_predicate_object(
pub fn quads_for_predicate_object(
&self,
predicate: EncodedTerm,
object: EncodedTerm,
@ -206,7 +134,7 @@ impl RocksDbStore {
})
}
fn quads_for_object(
pub fn quads_for_object(
&self,
object: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<OSPGIndexIterator>> {
@ -218,11 +146,11 @@ impl RocksDbStore {
})
}
fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
pub fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
Ok(self.db.get_cf(self.spog_cf, &quad.spog())?.is_some())
}
fn insert(&self, quad: &EncodedQuad) -> Result<()> {
pub fn insert(&self, quad: &EncodedQuad) -> Result<()> {
let mut batch = WriteBatch::default();
batch.put_cf(self.spog_cf, &quad.spog(), &EMPTY_BUF)?;
batch.put_cf(self.posg_cf, &quad.posg(), &EMPTY_BUF)?;
@ -230,7 +158,7 @@ impl RocksDbStore {
Ok(self.db.write(batch)?) //TODO: check what's going on if the key already exists
}
fn remove(&self, quad: &EncodedQuad) -> Result<()> {
pub fn remove(&self, quad: &EncodedQuad) -> Result<()> {
let mut batch = WriteBatch::default();
batch.delete_cf(self.spog_cf, &quad.spog())?;
batch.delete_cf(self.posg_cf, &quad.posg())?;
@ -239,12 +167,12 @@ impl RocksDbStore {
}
}
fn get_cf(db: &DB, name: &str) -> Result<ColumnFamily> {
pub fn get_cf(db: &DB, name: &str) -> Result<ColumnFamily> {
db.cf_handle(name)
.ok_or_else(|| Error::from("column family not found"))
}
struct RocksDbBytesStore<'a>(&'a RocksDbStore);
pub struct RocksDbBytesStore<'a>(&'a RocksDbStore);
impl<'a> BytesStore for RocksDbBytesStore<'a> {
type BytesOutput = DBVector;
@ -336,7 +264,7 @@ fn encode_term_triple(
bytes
}
struct SPOGIndexIterator {
pub struct SPOGIndexIterator {
iter: DBRawIterator,
}
@ -351,7 +279,7 @@ impl Iterator for SPOGIndexIterator {
}
}
struct POSGIndexIterator {
pub struct POSGIndexIterator {
iter: DBRawIterator,
}
@ -366,7 +294,7 @@ impl Iterator for POSGIndexIterator {
}
}
struct OSPGIndexIterator {
pub struct OSPGIndexIterator {
iter: DBRawIterator,
}
@ -381,7 +309,7 @@ impl Iterator for OSPGIndexIterator {
}
}
struct FilteringEncodedQuadsIterator<I: Iterator<Item = Result<EncodedQuad>>> {
pub struct FilteringEncodedQuadsIterator<I: Iterator<Item = Result<EncodedQuad>>> {
iter: I,
filter: EncodedQuadPattern,
}
@ -396,51 +324,3 @@ impl<I: Iterator<Item = Result<EncodedQuad>>> Iterator for FilteringEncodedQuads
})
}
}
struct QuadsIterator<'a, I: Iterator<Item = Result<EncodedQuad>>> {
iter: I,
encoder: Encoder<RocksDbBytesStore<'a>>,
}
impl<'a, I: Iterator<Item = Result<EncodedQuad>>> Iterator for QuadsIterator<'a, I> {
type Item = Result<Quad>;
fn next(&mut self) -> Option<Result<Quad>> {
self.iter
.next()
.map(|k| k.and_then(|quad| self.encoder.decode_quad(quad)))
}
}
/*fn encode_sp(
encoder: &Encoder<RocksDbBytesStore>,
subject: &NamedOrBlankNode,
predicate: &NamedNode,
) -> Result<[u8; 2 * TERM_ENCODING_SIZE]> {
let mut sp = [0 as u8; 2 * TERM_ENCODING_SIZE];
encoder.encode_named_or_blank_node(subject, &mut sp)?;
encoder.encode_named_node(predicate, &mut sp)?;
Ok(sp)
}
fn encode_po(
encoder: &Encoder<RocksDbBytesStore>,
predicate: &NamedNode,
object: &Term,
) -> Result<[u8; 2 * TERM_ENCODING_SIZE]> {
let mut po = [0 as u8; 2 * TERM_ENCODING_SIZE];
encoder.encode_named_node(predicate, &mut po)?;
encoder.encode_term(object, &mut po)?;
Ok(po)
}
fn encode_os(
encoder: &Encoder<RocksDbBytesStore>,
object: &Term,
subject: &NamedOrBlankNode,
) -> Result<[u8; 2 * TERM_ENCODING_SIZE]> {
let mut po = [0 as u8; 2 * TERM_ENCODING_SIZE];
encoder.encode_term(object, &mut po)?;
encoder.encode_named_or_blank_node(subject, &mut po)?;
Ok(po)
}*/
Loading…
Cancel
Save