diff --git a/src/ffi.rs b/src/ffi.rs index d5cc415..5a15cfd 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -15,48 +15,45 @@ */ extern crate libc; use self::libc::{c_char, c_int, c_void, size_t}; +use std::ffi::CString; +#[derive(Copy, Clone)] #[repr(C)] pub struct RocksDBOptions(pub *const c_void); +#[derive(Copy, Clone)] #[repr(C)] pub struct RocksDBInstance(pub *const c_void); +#[derive(Copy, Clone)] #[repr(C)] pub struct RocksDBWriteOptions(pub *const c_void); +#[derive(Copy, Clone)] #[repr(C)] pub struct RocksDBReadOptions(pub *const c_void); +#[derive(Copy, Clone)] #[repr(C)] pub struct RocksDBMergeOperator(pub *const c_void); +#[derive(Copy, Clone)] #[repr(C)] pub struct RocksDBBlockBasedTableOptions(pub *const c_void); +#[derive(Copy, Clone)] #[repr(C)] pub struct RocksDBCache(pub *const c_void); +#[derive(Copy, Clone)] #[repr(C)] pub struct RocksDBFilterPolicy(pub *const c_void); +#[derive(Copy, Clone)] #[repr(C)] pub struct RocksDBSnapshot(pub *const c_void); +#[derive(Copy, Clone)] #[repr(C)] pub struct RocksDBIterator(pub *const c_void); +#[derive(Copy, Clone)] #[repr(C)] pub struct RocksDBCFHandle(pub *const c_void); +#[derive(Copy, Clone)] #[repr(C)] pub struct RocksDBWriteBatch(pub *const c_void); -impl Copy for RocksDBOptions {} -impl Copy for RocksDBInstance {} -impl Copy for RocksDBWriteOptions {} -impl Copy for RocksDBReadOptions {} -impl Copy for RocksDBMergeOperator {} -impl Copy for RocksDBBlockBasedTableOptions {} -impl Copy for RocksDBCache {} -impl Copy for RocksDBFilterPolicy {} -impl Copy for RocksDBCompactionStyle {} -impl Copy for RocksDBCompressionType {} -impl Copy for RocksDBUniversalCompactionStyle {} -impl Copy for RocksDBSnapshot {} -impl Copy for RocksDBIterator {} -impl Copy for RocksDBCFHandle {} -impl Copy for RocksDBWriteBatch {} - pub fn new_bloom_filter(bits: c_int) -> RocksDBFilterPolicy { unsafe { rocksdb_filterpolicy_create_bloom(bits) @@ -329,7 +326,7 @@ fn internal() { rocksdb_options_set_create_if_missing(opts, true); let rustpath = "_rust_rocksdb_internaltest"; - let cpath = CString::from_slice(rustpath.as_bytes()); + let cpath = CString::new(rustpath).unwrap(); let cpath_ptr = cpath.as_ptr(); let err = 0 as *mut i8; diff --git a/src/lib.rs b/src/lib.rs index 91c2ef1..5a760e4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,12 +15,11 @@ */ #![crate_id = "rocksdb"] #![crate_type = "lib"] -#![feature(collections)] -#![feature(core)] #![feature(libc)] #![feature(unique)] #![feature(path_ext)] - +#![feature(raw)] +#![feature(vec_push_all)] pub use ffi as rocksdb_ffi; pub use ffi::{ diff --git a/src/main.rs b/src/main.rs index 1c62bdf..dddeabe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,16 +13,14 @@ See the License for the specific language governing permissions and limitations under the License. */ -#![feature(collections)] #![feature(test)] +#![feature(vec_push_all)] extern crate rocksdb; extern crate test; use rocksdb::{RocksDBOptions, RocksDB, MergeOperands, new_bloom_filter}; use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction; -use test::Bencher; -#[allow(dead_code)] fn main() { let path = "/tmp/rust-rocksdb"; let db = RocksDB::open_default(path).unwrap(); @@ -44,7 +42,6 @@ fn main() { custom_merge(); } -#[allow(dead_code)] fn concat_merge(new_key: &[u8], existing_val: Option<&[u8]>, mut operands: &mut MergeOperands) -> Vec { let mut result: Vec = Vec::with_capacity(operands.size_hint().0); @@ -58,78 +55,95 @@ fn concat_merge(new_key: &[u8], existing_val: Option<&[u8]>, result } -#[allow(dead_code)] fn custom_merge() { let path = "_rust_rocksdb_mergetest"; let opts = RocksDBOptions::new(); opts.create_if_missing(true); opts.add_merge_operator("test operator", concat_merge); let db = RocksDB::open(opts, path).unwrap(); - let p = db.put(b"k1", b"a"); + db.put(b"k1", b"a"); db.merge(b"k1", b"b"); db.merge(b"k1", b"c"); db.merge(b"k1", b"d"); db.merge(b"k1", b"efg"); - let m = db.merge(b"k1", b"h"); - let r = db.get(b"k1"); - assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh"); + db.merge(b"k1", b"h"); + db.get(b"k1").map( |value| { + match value.to_utf8() { + Some(v) => + println!("retrieved utf8 value: {}", v), + None => + println!("did not read valid utf-8 out of the db"), + } + }) + .on_absent( || { println!("value not found") }) + .on_error( |e| { println!("error retrieving value: {}", e) }); + db.close(); RocksDB::destroy(opts, path).is_ok(); } -#[allow(dead_code)] -fn tuned_for_somebody_elses_disk() -> RocksDB { - let path = "_rust_rocksdb_optimizetest"; - let opts = RocksDBOptions::new(); - opts.create_if_missing(true); - opts.set_block_size(524288); - opts.set_max_open_files(10000); - opts.set_use_fsync(false); - opts.set_bytes_per_sync(8388608); - opts.set_disable_data_sync(false); - opts.set_block_cache_size_mb(1024); - opts.set_table_cache_num_shard_bits(6); - opts.set_max_write_buffer_number(32); - opts.set_write_buffer_size(536870912); - opts.set_target_file_size_base(1073741824); - opts.set_min_write_buffer_number_to_merge(4); - opts.set_level_zero_stop_writes_trigger(2000); - opts.set_level_zero_slowdown_writes_trigger(0); - opts.set_compaction_style(RocksDBUniversalCompaction); - opts.set_max_background_compactions(4); - opts.set_max_background_flushes(4); - opts.set_filter_deletes(false); - opts.set_disable_auto_compactions(true); - - let filter = new_bloom_filter(10); - opts.set_filter(filter); - - RocksDB::open(opts, path).unwrap() -} -#[allow(dead_code)] -#[bench] -fn writes(b: &mut Bencher) { - let db = tuned_for_somebody_elses_disk(); - let mut i = 0 as u64; - b.iter(|| { - db.put(i.to_string().as_bytes(), b"v1111"); - i += 1; - }); - db.close(); -} +#[cfg(test)] +mod tests { + use test::Bencher; + use std::thread::sleep_ms; + + use rocksdb::{RocksDBOptions, RocksDB, MergeOperands, new_bloom_filter}; + use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction; + + fn tuned_for_somebody_elses_disk() -> RocksDB { + let path = "_rust_rocksdb_optimizetest"; + let opts = RocksDBOptions::new(); + opts.create_if_missing(true); + opts.set_block_size(524288); + opts.set_max_open_files(10000); + opts.set_use_fsync(false); + opts.set_bytes_per_sync(8388608); + opts.set_disable_data_sync(false); + opts.set_block_cache_size_mb(1024); + opts.set_table_cache_num_shard_bits(6); + opts.set_max_write_buffer_number(32); + opts.set_write_buffer_size(536870912); + opts.set_target_file_size_base(1073741824); + opts.set_min_write_buffer_number_to_merge(4); + opts.set_level_zero_stop_writes_trigger(2000); + opts.set_level_zero_slowdown_writes_trigger(0); + opts.set_compaction_style(RocksDBUniversalCompaction); + opts.set_max_background_compactions(4); + opts.set_max_background_flushes(4); + opts.set_filter_deletes(false); + opts.set_disable_auto_compactions(true); + + let filter = new_bloom_filter(10); + opts.set_filter(filter); + + RocksDB::open(opts, path).unwrap() + } -#[allow(dead_code)] -#[bench] -fn reads(b: &mut Bencher) { - let db = tuned_for_somebody_elses_disk(); - let mut i = 0 as u64; - b.iter(|| { - db.get(i.to_string().as_bytes()).on_error( |e| { - println!("error: {}", e); - e + #[bench] + fn writes(b: &mut Bencher) { + // dirty hack due to parallel tests causing contention. + sleep_ms(1000); + let db = tuned_for_somebody_elses_disk(); + let mut i = 0 as u64; + b.iter(|| { + db.put(i.to_string().as_bytes(), b"v1111"); + i += 1; }); - i += 1; - }); - db.close(); + db.close(); + } + + #[bench] + fn reads(b: &mut Bencher) { + let db = tuned_for_somebody_elses_disk(); + let mut i = 0 as u64; + b.iter(|| { + db.get(i.to_string().as_bytes()).on_error( |e| { + println!("error: {}", e); + e + }); + i += 1; + }); + db.close(); + } } diff --git a/src/merge_operator.rs b/src/merge_operator.rs index 3621576..35006d6 100644 --- a/src/merge_operator.rs +++ b/src/merge_operator.rs @@ -56,9 +56,9 @@ pub extern "C" fn full_merge_callback( &mut MergeOperands::new(operands_list, operands_list_len, num_operands); - let key: &[u8] = mem::transmute(slice::from_raw_buf(&raw_key, key_len as usize)); - let oldval: &[u8] = mem::transmute(slice::from_raw_buf(&existing_value, - existing_value_len as usize)); + let key: &[u8] = slice::from_raw_parts(raw_key as *const u8, key_len as usize); + let oldval: &[u8] = slice::from_raw_parts(existing_value as *const u8, + existing_value_len as usize); let mut result = (cb.merge_fn)(key, Some(oldval), operands); result.shrink_to_fit(); @@ -67,7 +67,7 @@ pub extern "C" fn full_merge_callback( assert!(!buf.is_null()); *new_value_length = result.len() as size_t; *success = 1 as u8; - ptr::copy(&mut *buf, result.as_ptr() as *const c_void, result.len()); + ptr::copy(result.as_ptr() as *mut c_void, &mut *buf, result.len()); buf as *const c_char } } @@ -83,7 +83,7 @@ pub extern "C" fn partial_merge_callback( let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); - let key: &[u8] = mem::transmute(slice::from_raw_buf(&raw_key, key_len as usize)); + let key: &[u8] = slice::from_raw_parts(raw_key as *const u8, key_len as usize); let mut result = (cb.merge_fn)(key, None, operands); result.shrink_to_fit(); //TODO(tan) investigate zero-copy techniques to improve performance @@ -91,7 +91,7 @@ pub extern "C" fn partial_merge_callback( assert!(!buf.is_null()); *new_value_length = 1 as size_t; *success = 1 as u8; - ptr::copy(&mut *buf, result.as_ptr() as *const c_void, result.len()); + ptr::copy(result.as_ptr() as *mut c_void, &mut *buf, result.len()); buf as *const c_char } } diff --git a/src/rocksdb.rs b/src/rocksdb.rs index 942795d..f4e84f7 100644 --- a/src/rocksdb.rs +++ b/src/rocksdb.rs @@ -52,7 +52,7 @@ impl RocksDB { let ospath = Path::new(path); if !ospath.exists() { match fs::create_dir_all(&ospath) { - Err(_) => return Err(""), + Err(e) => return Err("Failed to create rocksdb directory."), Ok(_) => (), } } @@ -198,7 +198,7 @@ pub struct RocksDBVector { impl Deref for RocksDBVector { type Target = [u8]; fn deref(&self) -> &[u8] { - unsafe { slice::from_raw_mut_buf(self.base.deref(), self.len) } + unsafe { slice::from_raw_parts(self.base.get(), self.len) } } } @@ -213,9 +213,8 @@ impl Drop for RocksDBVector { impl RocksDBVector { pub fn from_c(val: *mut u8, val_len: size_t) -> RocksDBVector { unsafe { - let base = Unique::new(val); RocksDBVector { - base: base, + base: Unique::new(val), len: val_len as usize, } } diff --git a/src/rocksdb_options.rs b/src/rocksdb_options.rs index 16bbecf..34f8c84 100644 --- a/src/rocksdb_options.rs +++ b/src/rocksdb_options.rs @@ -22,13 +22,12 @@ use rocksdb_ffi; use merge_operator::{MergeOperatorCallback, MergeOperands, destructor_callback, full_merge_callback, partial_merge_callback, name_callback}; +#[derive(Copy, Clone)] pub struct RocksDBOptions { pub inner: rocksdb_ffi::RocksDBOptions, block_options: rocksdb_ffi::RocksDBBlockBasedTableOptions, } -impl Copy for RocksDBOptions {} - impl RocksDBOptions { pub fn new() -> RocksDBOptions { unsafe {