From eb3acf0991820174264b8103293c538a2d25b437 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 28 Nov 2014 15:56:30 -0500 Subject: [PATCH] merge operator skeleton functionality, proper options wrapper, proper constructor --- README.md | 3 +- rocksdb-sys/lib.rs | 28 +++++++++- src/lib.rs | 2 - src/main.rs | 4 +- src/rocksdb.rs | 124 ++++++++++++++++++++++++++++----------------- 5 files changed, 108 insertions(+), 53 deletions(-) diff --git a/README.md b/README.md index d220cd9..c4818d1 100644 --- a/README.md +++ b/README.md @@ -11,9 +11,10 @@ RocksDB 3.8.1 will be pulled in and compiled automatically. ###### Code ```rust extern crate rocksdb; +use rocksdb::Rocksdb; fn main() { - match rocksdb::create_or_open("/path/for/rocksdb/storage".to_string()) { + match Rocksdb::open_default("/path/for/rocksdb/storage".to_string()) { Ok(db) => { db.put(b"my key", b"my value"); diff --git a/rocksdb-sys/lib.rs b/rocksdb-sys/lib.rs index c29dcfc..2025bb6 100644 --- a/rocksdb-sys/lib.rs +++ b/rocksdb-sys/lib.rs @@ -1,5 +1,5 @@ extern crate libc; -use self::libc::{c_int, c_void, size_t}; +use self::libc::{c_char, c_int, c_void, size_t}; #[repr(C)] pub struct RocksdbOptions(pub *const c_void); @@ -93,7 +93,6 @@ extern { pub fn rocksdb_options_set_filter_deletes( options: RocksdbOptions, v: u8); //pub fn rocksdb_compactionfilter_create() -> RocksdbCompactionFilter; - //pub fn rocksdb_mergeoperator_create() -> RocksdbMergeOperator; pub fn rocksdb_filterpolicy_create_bloom( bits_per_key: c_int) -> RocksdbFilterPolicy; pub fn rocksdb_open(options: RocksdbOptions, @@ -113,6 +112,31 @@ extern { options: RocksdbOptions, path: *const i8, err: *mut i8); pub fn rocksdb_repair_db( options: RocksdbOptions, path: *const i8, err: *mut i8); + + // Merge Operator + pub fn rocksdb_mergeoperator_create( + state: *mut c_void, + destroy: extern fn(*mut c_void) -> (), + full_merge: extern fn ( + arg: *mut c_void, key: *const c_char, key_len: *mut size_t, + existing_value: *const c_char, existing_value_len: *mut size_t, + operands_list: &[*const c_char], operands_list_len: *const size_t, + num_operands: c_int, + success: *mut u8, new_value_length: *mut size_t + ) -> *const c_char, + partial_merge: extern fn( + *mut c_void, key: *const c_char, key_len: *mut size_t, + operands_list: &[*const c_char], operands_list_len: *const size_t, + num_operands: c_int, + success: *mut u8, new_value_length: *mut size_t + ) -> *const c_char, + delete_value: extern fn(*mut c_void, value: *const c_char, value_len: *mut size_t) -> (), + name_fn: extern fn(*mut c_void) -> *const c_char, + ) -> RocksdbMergeOperator; + pub fn rocksdb_mergeoperator_destroy(mo: RocksdbMergeOperator); + pub fn rocksdb_options_set_merge_operator( + options: RocksdbOptions, + mo: RocksdbMergeOperator); } #[allow(dead_code)] diff --git a/src/lib.rs b/src/lib.rs index bd9a1a1..5cc2739 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,8 +5,6 @@ extern crate "rocksdb-sys" as rocksdb_ffi; pub use rocksdb::{ - create_or_open, - open, Rocksdb, RocksdbResult, }; diff --git a/src/main.rs b/src/main.rs index 2c948bd..dc1180b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,11 @@ extern crate rocksdb; extern crate test; -use rocksdb::open; +use rocksdb::Rocksdb; use test::Bencher; #[allow(dead_code)] fn main() { - match rocksdb::create_or_open("/tmp/rust-rocksdb".to_string()) { + match Rocksdb::open_default("/tmp/rust-rocksdb".to_string()) { Ok(db) => { assert!(db.put(b"my key", b"my value").is_ok()); diff --git a/src/rocksdb.rs b/src/rocksdb.rs index 4271722..efb7e92 100644 --- a/src/rocksdb.rs +++ b/src/rocksdb.rs @@ -7,11 +7,89 @@ use std::str::from_utf8; use rocksdb_ffi; +pub struct RocksdbOptions { + inner: rocksdb_ffi::RocksdbOptions, +} + +impl RocksdbOptions { + pub fn new() -> RocksdbOptions { + unsafe { + let opts = rocksdb_ffi::rocksdb_options_create(); + let rocksdb_ffi::RocksdbOptions(opt_ptr) = opts; + if opt_ptr.is_null() { + panic!("Could not create rocksdb options".to_string()); + } + + RocksdbOptions{inner: opts} + } + } + + pub fn increase_parallelism(&self, parallelism: i32) { + unsafe { + rocksdb_ffi::rocksdb_options_increase_parallelism(self.inner, parallelism); + } + } + + pub fn optimize_level_style_compaction(&self, memtable_memory_budget: i32) { + unsafe { + rocksdb_ffi::rocksdb_options_optimize_level_style_compaction(self.inner, memtable_memory_budget); + } + } + + pub fn create_if_missing(&self, create_if_missing: bool) { + unsafe { + match create_if_missing { + true => rocksdb_ffi::rocksdb_options_set_create_if_missing(self.inner, 1), + false => rocksdb_ffi::rocksdb_options_set_create_if_missing(self.inner, 0), + } + } + } + + pub fn set_merge_operator(&self, mo: rocksdb_ffi::RocksdbMergeOperator) { + unsafe { + rocksdb_ffi::rocksdb_options_set_merge_operator(self.inner, mo); + } + } +} + pub struct Rocksdb { inner: rocksdb_ffi::RocksdbInstance, } impl Rocksdb { + pub fn open_default(path: String) -> Result { + let opts = RocksdbOptions::new(); + opts.create_if_missing(true); + Rocksdb::open(opts, path) + } + + pub fn open(opts: RocksdbOptions, path: String) -> Result { + unsafe { + let cpath = path.to_c_str(); + let cpath_ptr = cpath.as_ptr(); + + //TODO test path here, as if rocksdb fails it will just crash the + // process currently + + let err = 0 as *mut i8; + let db = rocksdb_ffi::rocksdb_open(opts.inner, cpath_ptr, err); + let rocksdb_ffi::RocksdbInstance(db_ptr) = db; + if err.is_not_null() { + let cs = CString::new(err as *const i8, true); + match cs.as_str() { + Some(error_string) => + return Err(error_string.to_string()), + None => + return Err("Could not initialize database.".to_string()), + } + } + if db_ptr.is_null() { + return Err("Could not initialize database.".to_string()); + } + Ok(Rocksdb{inner: db}) + } + } + pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> { unsafe { let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); @@ -102,7 +180,6 @@ impl Rocksdb { pub fn close(&self) { unsafe { rocksdb_ffi::rocksdb_close(self.inner); } } - } pub struct RocksdbVector { @@ -200,51 +277,6 @@ impl <'a,T,E> RocksdbResult<'a,T,E> { } } -pub fn create_or_open(path: String) -> Result { - open(path, true) -} - -pub fn open(path: String, create_if_missing: bool) -> Result { - unsafe { - let opts = rocksdb_ffi::rocksdb_options_create(); - let rocksdb_ffi::RocksdbOptions(opt_ptr) = opts; - if opt_ptr.is_null() { - return Err("Could not create options".to_string()); - } - - //rocksdb_ffi::rocksdb_options_increase_parallelism(opts, 2); - //rocksdb_ffi::rocksdb_options_optimize_level_style_compaction(opts, 0); - - match create_if_missing { - true => rocksdb_ffi::rocksdb_options_set_create_if_missing(opts, 1), - false => rocksdb_ffi::rocksdb_options_set_create_if_missing(opts, 0), - } - - let cpath = path.to_c_str(); - let cpath_ptr = cpath.as_ptr(); - - //TODO test path here, as if rocksdb fails it will just crash the - // process currently - - let err = 0 as *mut i8; - let db = rocksdb_ffi::rocksdb_open(opts, cpath_ptr, err); - let rocksdb_ffi::RocksdbInstance(db_ptr) = db; - if err.is_not_null() { - let cs = CString::new(err as *const i8, true); - match cs.as_str() { - Some(error_string) => - return Err(error_string.to_string()), - None => - return Err("Could not initialize database.".to_string()), - } - } - if db_ptr.is_null() { - return Err("Could not initialize database.".to_string()); - } - Ok(Rocksdb{inner: db}) - } -} - #[allow(dead_code)] #[test] fn external() {