From 18aa0d37beff6a69a2a33055e5f4005a044575ae Mon Sep 17 00:00:00 2001 From: Michael Neumann Date: Sat, 18 Jul 2015 15:46:48 +0200 Subject: [PATCH] Allow RocksDB to be shared across threads --- src/rocksdb.rs | 21 ++++++++++------- test/test.rs | 1 + test/test_multithreaded.rs | 48 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 9 deletions(-) create mode 100644 test/test_multithreaded.rs diff --git a/src/rocksdb.rs b/src/rocksdb.rs index dba957a..f0f1e78 100644 --- a/src/rocksdb.rs +++ b/src/rocksdb.rs @@ -32,6 +32,9 @@ pub struct RocksDB { inner: rocksdb_ffi::RocksDBInstance, } +unsafe impl Send for RocksDB {} +unsafe impl Sync for RocksDB {} + pub struct WriteBatch { inner: rocksdb_ffi::RocksDBWriteBatch, } @@ -159,9 +162,9 @@ impl <'a> Drop for Snapshot<'a> { // This is for the RocksDB and write batches to share the same API pub trait Writable { - fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), String>; - fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<(), String>; - fn delete(&mut self, key: &[u8]) -> Result<(), String>; + fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String>; + fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String>; + fn delete(&self, key: &[u8]) -> Result<(), String>; } fn error_message(ptr: *const i8) -> String { @@ -308,7 +311,7 @@ impl RocksDB { } impl Writable for RocksDB { - fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> { + fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> { unsafe { let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let mut err: *const i8 = 0 as *const i8; @@ -324,7 +327,7 @@ impl Writable for RocksDB { } } - fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> { + fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> { unsafe { let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let mut err: *const i8 = 0 as *const i8; @@ -340,7 +343,7 @@ impl Writable for RocksDB { } } - fn delete(&mut self, key: &[u8]) -> Result<(), String> { + fn delete(&self, key: &[u8]) -> Result<(), String> { unsafe { let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let mut err: *const i8 = 0 as *const i8; @@ -381,7 +384,7 @@ impl Drop for RocksDB { } impl Writable for WriteBatch { - fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> { + fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> { unsafe { rocksdb_ffi::rocksdb_writebatch_put(self.inner, key.as_ptr(), key.len() as size_t, value.as_ptr(), @@ -390,7 +393,7 @@ impl Writable for WriteBatch { } } - fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> { + fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> { unsafe { rocksdb_ffi::rocksdb_writebatch_merge(self.inner, key.as_ptr(), key.len() as size_t, value.as_ptr(), @@ -399,7 +402,7 @@ impl Writable for WriteBatch { } } - fn delete(&mut self, key: &[u8]) -> Result<(), String> { + fn delete(&self, key: &[u8]) -> Result<(), String> { unsafe { rocksdb_ffi::rocksdb_writebatch_delete(self.inner, key.as_ptr(), key.len() as size_t); diff --git a/test/test.rs b/test/test.rs index d10d9ae..5a6aca5 100644 --- a/test/test.rs +++ b/test/test.rs @@ -1,3 +1,4 @@ extern crate rocksdb; mod test_iterator; +mod test_multithreaded; diff --git a/test/test_multithreaded.rs b/test/test_multithreaded.rs new file mode 100644 index 0000000..0d240e0 --- /dev/null +++ b/test/test_multithreaded.rs @@ -0,0 +1,48 @@ +use rocksdb::{Options, RocksDB, Writable, Direction, RocksDBResult}; +use std::thread::{self, Builder}; +use std::sync::Arc; + +const N: usize = 1000_000; + +#[test] +pub fn test_multithreaded() { + let path = "_rust_rocksdb_multithreadtest"; + let db = RocksDB::open_default(path).unwrap(); + let db = Arc::new(db); + + db.put(b"key", b"value1"); + + let db1 = db.clone(); + let j1 = thread::spawn(move|| { + for i in 1..N { + db1.put(b"key", b"value1"); + } + }); + + let db2 = db.clone(); + let j2 = thread::spawn(move|| { + for i in 1..N { + db2.put(b"key", b"value2"); + } + }); + + let db3 = db.clone(); + let j3 = thread::spawn(move|| { + for i in 1..N { + match db3.get(b"key") { + RocksDBResult::Some(v) => { + if &v[..] != b"value1" && &v[..] != b"value2" { + assert!(false); + } + } + _ => { + assert!(false); + } + } + } + }); + + j1.join(); + j2.join(); + j3.join(); +}