Makes RocksDbStore Send and Sync

pull/10/head
Tpt 6 years ago
parent fa1f48d0bf
commit 0290a2ce4b
  1. 73
      lib/src/store/rocksdb.rs

@ -8,6 +8,7 @@ use rocksdb::Options;
use rocksdb::WriteBatch; use rocksdb::WriteBatch;
use rocksdb::DB; use rocksdb::DB;
use std::io::Cursor; use std::io::Cursor;
use std::ops::Deref;
use std::path::Path; use std::path::Path;
use std::str; use std::str;
use std::sync::Mutex; use std::sync::Mutex;
@ -47,11 +48,11 @@ const COLUMN_FAMILIES: [&str; 5] = [ID2STR_CF, STR2ID_CF, SPOG_CF, POSG_CF, OSPG
pub struct RocksDbStore { pub struct RocksDbStore {
db: DB, db: DB,
str_id_counter: Mutex<RocksDBCounter>, str_id_counter: Mutex<RocksDBCounter>,
id2str_cf: ColumnFamily, id2str_cf: SendColumnFamily,
str2id_cf: ColumnFamily, str2id_cf: SendColumnFamily,
spog_cf: ColumnFamily, spog_cf: SendColumnFamily,
posg_cf: ColumnFamily, posg_cf: SendColumnFamily,
ospg_cf: ColumnFamily, ospg_cf: SendColumnFamily,
} }
impl RocksDbStore { impl RocksDbStore {
@ -61,11 +62,11 @@ impl RocksDbStore {
options.create_missing_column_families(true); options.create_missing_column_families(true);
let db = DB::open_cf(&options, path, &COLUMN_FAMILIES)?; let db = DB::open_cf(&options, path, &COLUMN_FAMILIES)?;
let id2str_cf = get_cf(&db, STR2ID_CF)?; let id2str_cf = SendColumnFamily(get_cf(&db, STR2ID_CF)?);
let str2id_cf = get_cf(&db, ID2STR_CF)?; let str2id_cf = SendColumnFamily(get_cf(&db, ID2STR_CF)?);
let spog_cf = get_cf(&db, SPOG_CF)?; let spog_cf = SendColumnFamily(get_cf(&db, SPOG_CF)?);
let posg_cf = get_cf(&db, POSG_CF)?; let posg_cf = SendColumnFamily(get_cf(&db, POSG_CF)?);
let ospg_cf = get_cf(&db, OSPG_CF)?; let ospg_cf = SendColumnFamily(get_cf(&db, OSPG_CF)?);
let new = Self { let new = Self {
db, db,
@ -85,7 +86,7 @@ impl BytesStore for RocksDbStore {
type BytesOutput = DBVector; type BytesOutput = DBVector;
fn insert_bytes(&self, value: &[u8]) -> Result<u64> { fn insert_bytes(&self, value: &[u8]) -> Result<u64> {
Ok(if let Some(id) = self.db.get_cf(self.str2id_cf, value)? { Ok(if let Some(id) = self.db.get_cf(*self.str2id_cf, value)? {
LittleEndian::read_u64(&id) LittleEndian::read_u64(&id)
} else { } else {
let id = self let id = self
@ -95,15 +96,15 @@ impl BytesStore for RocksDbStore {
.get_and_increment(&self.db)? as u64; .get_and_increment(&self.db)? as u64;
let id_bytes = to_bytes(id); let id_bytes = to_bytes(id);
let mut batch = WriteBatch::default(); let mut batch = WriteBatch::default();
batch.put_cf(self.id2str_cf, &id_bytes, value)?; batch.put_cf(*self.id2str_cf, &id_bytes, value)?;
batch.put_cf(self.str2id_cf, value, &id_bytes)?; batch.put_cf(*self.str2id_cf, value, &id_bytes)?;
self.db.write(batch)?; self.db.write(batch)?;
id id
}) })
} }
fn get_bytes(&self, id: u64) -> Result<Option<DBVector>> { fn get_bytes(&self, id: u64) -> Result<Option<DBVector>> {
Ok(self.db.get_cf(self.id2str_cf, &to_bytes(id))?) Ok(self.db.get_cf(*self.id2str_cf, &to_bytes(id))?)
} }
} }
@ -131,7 +132,7 @@ impl EncodedQuadsStore for RocksDbStore {
InGraphQuadsIterator<FilteringEncodedQuadsIterator<OSPGIndexIterator>>; InGraphQuadsIterator<FilteringEncodedQuadsIterator<OSPGIndexIterator>>;
fn quads(&self) -> Result<SPOGIndexIterator> { fn quads(&self) -> Result<SPOGIndexIterator> {
let mut iter = self.db.raw_iterator_cf(self.spog_cf)?; let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?;
iter.seek_to_first(); iter.seek_to_first();
Ok(SPOGIndexIterator { iter }) Ok(SPOGIndexIterator { iter })
} }
@ -140,7 +141,7 @@ impl EncodedQuadsStore for RocksDbStore {
&self, &self,
subject: EncodedTerm, subject: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> { ) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(self.spog_cf)?; let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?;
iter.seek(&encode_term(subject)?); iter.seek(&encode_term(subject)?);
Ok(FilteringEncodedQuadsIterator { Ok(FilteringEncodedQuadsIterator {
iter: SPOGIndexIterator { iter }, iter: SPOGIndexIterator { iter },
@ -153,7 +154,7 @@ impl EncodedQuadsStore for RocksDbStore {
subject: EncodedTerm, subject: EncodedTerm,
predicate: EncodedTerm, predicate: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> { ) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(self.spog_cf)?; let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?;
iter.seek(&encode_term_pair(subject, predicate)?); iter.seek(&encode_term_pair(subject, predicate)?);
Ok(FilteringEncodedQuadsIterator { Ok(FilteringEncodedQuadsIterator {
iter: SPOGIndexIterator { iter }, iter: SPOGIndexIterator { iter },
@ -167,7 +168,7 @@ impl EncodedQuadsStore for RocksDbStore {
predicate: EncodedTerm, predicate: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> { ) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(self.spog_cf)?; let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?;
iter.seek(&encode_term_triple(subject, predicate, object)?); iter.seek(&encode_term_triple(subject, predicate, object)?);
Ok(FilteringEncodedQuadsIterator { Ok(FilteringEncodedQuadsIterator {
iter: SPOGIndexIterator { iter }, iter: SPOGIndexIterator { iter },
@ -180,7 +181,7 @@ impl EncodedQuadsStore for RocksDbStore {
subject: EncodedTerm, subject: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<OSPGIndexIterator>> { ) -> Result<FilteringEncodedQuadsIterator<OSPGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(self.spog_cf)?; let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?;
iter.seek(&encode_term_pair(object, subject)?); iter.seek(&encode_term_pair(object, subject)?);
Ok(FilteringEncodedQuadsIterator { Ok(FilteringEncodedQuadsIterator {
iter: OSPGIndexIterator { iter }, iter: OSPGIndexIterator { iter },
@ -192,7 +193,7 @@ impl EncodedQuadsStore for RocksDbStore {
&self, &self,
predicate: EncodedTerm, predicate: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<POSGIndexIterator>> { ) -> Result<FilteringEncodedQuadsIterator<POSGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(self.posg_cf)?; let mut iter = self.db.raw_iterator_cf(*self.posg_cf)?;
iter.seek(&encode_term(predicate)?); iter.seek(&encode_term(predicate)?);
Ok(FilteringEncodedQuadsIterator { Ok(FilteringEncodedQuadsIterator {
iter: POSGIndexIterator { iter }, iter: POSGIndexIterator { iter },
@ -205,7 +206,7 @@ impl EncodedQuadsStore for RocksDbStore {
predicate: EncodedTerm, predicate: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<POSGIndexIterator>> { ) -> Result<FilteringEncodedQuadsIterator<POSGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(self.spog_cf)?; let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?;
iter.seek(&encode_term_pair(predicate, object)?); iter.seek(&encode_term_pair(predicate, object)?);
Ok(FilteringEncodedQuadsIterator { Ok(FilteringEncodedQuadsIterator {
iter: POSGIndexIterator { iter }, iter: POSGIndexIterator { iter },
@ -217,7 +218,7 @@ impl EncodedQuadsStore for RocksDbStore {
&self, &self,
object: EncodedTerm, object: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<OSPGIndexIterator>> { ) -> Result<FilteringEncodedQuadsIterator<OSPGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(self.ospg_cf)?; let mut iter = self.db.raw_iterator_cf(*self.ospg_cf)?;
iter.seek(&encode_term(object)?); iter.seek(&encode_term(object)?);
Ok(FilteringEncodedQuadsIterator { Ok(FilteringEncodedQuadsIterator {
iter: OSPGIndexIterator { iter }, iter: OSPGIndexIterator { iter },
@ -307,24 +308,24 @@ impl EncodedQuadsStore for RocksDbStore {
fn contains(&self, quad: &EncodedQuad) -> Result<bool> { fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
Ok(self Ok(self
.db .db
.get_cf(self.spog_cf, &encode_spog_quad(quad)?)? .get_cf(*self.spog_cf, &encode_spog_quad(quad)?)?
.is_some()) .is_some())
} }
fn insert(&self, quad: &EncodedQuad) -> Result<()> { fn insert(&self, quad: &EncodedQuad) -> Result<()> {
let mut batch = WriteBatch::default(); let mut batch = WriteBatch::default();
batch.put_cf(self.spog_cf, &encode_spog_quad(quad)?, &EMPTY_BUF)?; batch.put_cf(*self.spog_cf, &encode_spog_quad(quad)?, &EMPTY_BUF)?;
batch.put_cf(self.posg_cf, &encode_posg_quad(quad)?, &EMPTY_BUF)?; batch.put_cf(*self.posg_cf, &encode_posg_quad(quad)?, &EMPTY_BUF)?;
batch.put_cf(self.ospg_cf, &encode_ospg_quad(quad)?, &EMPTY_BUF)?; batch.put_cf(*self.ospg_cf, &encode_ospg_quad(quad)?, &EMPTY_BUF)?;
self.db.write(batch)?; //TODO: check what's going on if the key already exists self.db.write(batch)?; //TODO: check what's going on if the key already exists
Ok(()) Ok(())
} }
fn remove(&self, quad: &EncodedQuad) -> Result<()> { fn remove(&self, quad: &EncodedQuad) -> Result<()> {
let mut batch = WriteBatch::default(); let mut batch = WriteBatch::default();
batch.delete_cf(self.spog_cf, &encode_spog_quad(quad)?)?; batch.delete_cf(*self.spog_cf, &encode_spog_quad(quad)?)?;
batch.delete_cf(self.posg_cf, &encode_posg_quad(quad)?)?; batch.delete_cf(*self.posg_cf, &encode_posg_quad(quad)?)?;
batch.delete_cf(self.ospg_cf, &encode_ospg_quad(quad)?)?; batch.delete_cf(*self.ospg_cf, &encode_ospg_quad(quad)?)?;
self.db.write(batch)?; self.db.write(batch)?;
Ok(()) Ok(())
} }
@ -536,3 +537,17 @@ impl<T> From<PoisonError<T>> for RocksDBCounterMutexPoisonError {
} }
} }
} }
// TODO: very bad but I believe it is fine
#[derive(Clone, Copy)]
struct SendColumnFamily(ColumnFamily);
unsafe impl Send for SendColumnFamily {}
unsafe impl Sync for SendColumnFamily {}
impl Deref for SendColumnFamily {
type Target = ColumnFamily;
fn deref(&self) -> &ColumnFamily {
&self.0
}
}

Loading…
Cancel
Save