From 0290a2ce4bdb79a004ef7e824779e9eb54fee840 Mon Sep 17 00:00:00 2001 From: Tpt Date: Wed, 14 Nov 2018 14:20:17 +0100 Subject: [PATCH] Makes RocksDbStore Send and Sync --- lib/src/store/rocksdb.rs | 73 ++++++++++++++++++++++++---------------- 1 file changed, 44 insertions(+), 29 deletions(-) diff --git a/lib/src/store/rocksdb.rs b/lib/src/store/rocksdb.rs index e895eb66..2fd0f732 100644 --- a/lib/src/store/rocksdb.rs +++ b/lib/src/store/rocksdb.rs @@ -8,6 +8,7 @@ use rocksdb::Options; use rocksdb::WriteBatch; use rocksdb::DB; use std::io::Cursor; +use std::ops::Deref; use std::path::Path; use std::str; use std::sync::Mutex; @@ -47,11 +48,11 @@ const COLUMN_FAMILIES: [&str; 5] = [ID2STR_CF, STR2ID_CF, SPOG_CF, POSG_CF, OSPG pub struct RocksDbStore { db: DB, str_id_counter: Mutex, - id2str_cf: ColumnFamily, - str2id_cf: ColumnFamily, - spog_cf: ColumnFamily, - posg_cf: ColumnFamily, - ospg_cf: ColumnFamily, + id2str_cf: SendColumnFamily, + str2id_cf: SendColumnFamily, + spog_cf: SendColumnFamily, + posg_cf: SendColumnFamily, + ospg_cf: SendColumnFamily, } impl RocksDbStore { @@ -61,11 +62,11 @@ impl RocksDbStore { options.create_missing_column_families(true); let db = DB::open_cf(&options, path, &COLUMN_FAMILIES)?; - let id2str_cf = get_cf(&db, STR2ID_CF)?; - let str2id_cf = get_cf(&db, ID2STR_CF)?; - let spog_cf = get_cf(&db, SPOG_CF)?; - let posg_cf = get_cf(&db, POSG_CF)?; - let ospg_cf = get_cf(&db, OSPG_CF)?; + let id2str_cf = SendColumnFamily(get_cf(&db, STR2ID_CF)?); + let str2id_cf = SendColumnFamily(get_cf(&db, ID2STR_CF)?); + let spog_cf = SendColumnFamily(get_cf(&db, SPOG_CF)?); + let posg_cf = SendColumnFamily(get_cf(&db, POSG_CF)?); + let ospg_cf = SendColumnFamily(get_cf(&db, OSPG_CF)?); let new = Self { db, @@ -85,7 +86,7 @@ impl BytesStore for RocksDbStore { type BytesOutput = DBVector; fn insert_bytes(&self, value: &[u8]) -> Result { - Ok(if let Some(id) = self.db.get_cf(self.str2id_cf, value)? { + Ok(if let Some(id) = self.db.get_cf(*self.str2id_cf, value)? { LittleEndian::read_u64(&id) } else { let id = self @@ -95,15 +96,15 @@ impl BytesStore for RocksDbStore { .get_and_increment(&self.db)? as u64; let id_bytes = to_bytes(id); let mut batch = WriteBatch::default(); - batch.put_cf(self.id2str_cf, &id_bytes, value)?; - batch.put_cf(self.str2id_cf, value, &id_bytes)?; + batch.put_cf(*self.id2str_cf, &id_bytes, value)?; + batch.put_cf(*self.str2id_cf, value, &id_bytes)?; self.db.write(batch)?; id }) } fn get_bytes(&self, id: u64) -> Result> { - Ok(self.db.get_cf(self.id2str_cf, &to_bytes(id))?) + Ok(self.db.get_cf(*self.id2str_cf, &to_bytes(id))?) } } @@ -131,7 +132,7 @@ impl EncodedQuadsStore for RocksDbStore { InGraphQuadsIterator>; fn quads(&self) -> Result { - let mut iter = self.db.raw_iterator_cf(self.spog_cf)?; + let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; iter.seek_to_first(); Ok(SPOGIndexIterator { iter }) } @@ -140,7 +141,7 @@ impl EncodedQuadsStore for RocksDbStore { &self, subject: EncodedTerm, ) -> Result> { - let mut iter = self.db.raw_iterator_cf(self.spog_cf)?; + let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; iter.seek(&encode_term(subject)?); Ok(FilteringEncodedQuadsIterator { iter: SPOGIndexIterator { iter }, @@ -153,7 +154,7 @@ impl EncodedQuadsStore for RocksDbStore { subject: EncodedTerm, predicate: EncodedTerm, ) -> Result> { - let mut iter = self.db.raw_iterator_cf(self.spog_cf)?; + let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; iter.seek(&encode_term_pair(subject, predicate)?); Ok(FilteringEncodedQuadsIterator { iter: SPOGIndexIterator { iter }, @@ -167,7 +168,7 @@ impl EncodedQuadsStore for RocksDbStore { predicate: EncodedTerm, object: EncodedTerm, ) -> Result> { - let mut iter = self.db.raw_iterator_cf(self.spog_cf)?; + let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; iter.seek(&encode_term_triple(subject, predicate, object)?); Ok(FilteringEncodedQuadsIterator { iter: SPOGIndexIterator { iter }, @@ -180,7 +181,7 @@ impl EncodedQuadsStore for RocksDbStore { subject: EncodedTerm, object: EncodedTerm, ) -> Result> { - let mut iter = self.db.raw_iterator_cf(self.spog_cf)?; + let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; iter.seek(&encode_term_pair(object, subject)?); Ok(FilteringEncodedQuadsIterator { iter: OSPGIndexIterator { iter }, @@ -192,7 +193,7 @@ impl EncodedQuadsStore for RocksDbStore { &self, predicate: EncodedTerm, ) -> Result> { - let mut iter = self.db.raw_iterator_cf(self.posg_cf)?; + let mut iter = self.db.raw_iterator_cf(*self.posg_cf)?; iter.seek(&encode_term(predicate)?); Ok(FilteringEncodedQuadsIterator { iter: POSGIndexIterator { iter }, @@ -205,7 +206,7 @@ impl EncodedQuadsStore for RocksDbStore { predicate: EncodedTerm, object: EncodedTerm, ) -> Result> { - let mut iter = self.db.raw_iterator_cf(self.spog_cf)?; + let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; iter.seek(&encode_term_pair(predicate, object)?); Ok(FilteringEncodedQuadsIterator { iter: POSGIndexIterator { iter }, @@ -217,7 +218,7 @@ impl EncodedQuadsStore for RocksDbStore { &self, object: EncodedTerm, ) -> Result> { - let mut iter = self.db.raw_iterator_cf(self.ospg_cf)?; + let mut iter = self.db.raw_iterator_cf(*self.ospg_cf)?; iter.seek(&encode_term(object)?); Ok(FilteringEncodedQuadsIterator { iter: OSPGIndexIterator { iter }, @@ -307,24 +308,24 @@ impl EncodedQuadsStore for RocksDbStore { fn contains(&self, quad: &EncodedQuad) -> Result { Ok(self .db - .get_cf(self.spog_cf, &encode_spog_quad(quad)?)? + .get_cf(*self.spog_cf, &encode_spog_quad(quad)?)? .is_some()) } fn insert(&self, quad: &EncodedQuad) -> Result<()> { let mut batch = WriteBatch::default(); - batch.put_cf(self.spog_cf, &encode_spog_quad(quad)?, &EMPTY_BUF)?; - batch.put_cf(self.posg_cf, &encode_posg_quad(quad)?, &EMPTY_BUF)?; - batch.put_cf(self.ospg_cf, &encode_ospg_quad(quad)?, &EMPTY_BUF)?; + batch.put_cf(*self.spog_cf, &encode_spog_quad(quad)?, &EMPTY_BUF)?; + batch.put_cf(*self.posg_cf, &encode_posg_quad(quad)?, &EMPTY_BUF)?; + batch.put_cf(*self.ospg_cf, &encode_ospg_quad(quad)?, &EMPTY_BUF)?; self.db.write(batch)?; //TODO: check what's going on if the key already exists Ok(()) } fn remove(&self, quad: &EncodedQuad) -> Result<()> { let mut batch = WriteBatch::default(); - batch.delete_cf(self.spog_cf, &encode_spog_quad(quad)?)?; - batch.delete_cf(self.posg_cf, &encode_posg_quad(quad)?)?; - batch.delete_cf(self.ospg_cf, &encode_ospg_quad(quad)?)?; + batch.delete_cf(*self.spog_cf, &encode_spog_quad(quad)?)?; + batch.delete_cf(*self.posg_cf, &encode_posg_quad(quad)?)?; + batch.delete_cf(*self.ospg_cf, &encode_ospg_quad(quad)?)?; self.db.write(batch)?; Ok(()) } @@ -536,3 +537,17 @@ impl From> for RocksDBCounterMutexPoisonError { } } } + +// TODO: very bad but I believe it is fine +#[derive(Clone, Copy)] +struct SendColumnFamily(ColumnFamily); +unsafe impl Send for SendColumnFamily {} +unsafe impl Sync for SendColumnFamily {} + +impl Deref for SendColumnFamily { + type Target = ColumnFamily; + + fn deref(&self) -> &ColumnFamily { + &self.0 + } +}