diff --git a/src/ffi.rs b/src/ffi.rs index d727556..dfca528 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -112,26 +112,28 @@ extern { options: RocksDBOptions, path: *const i8, err: *mut i8); pub fn rocksdb_repair_db( options: RocksDBOptions, path: *const i8, err: *mut i8); - - // Merge Operator + // Merge + pub fn rocksdb_merge(db: RocksDBInstance, writeopts: RocksDBWriteOptions, + k: *const u8, kLen: size_t, + v: *const u8, vLen: size_t, err: *mut i8); pub fn rocksdb_mergeoperator_create( state: *mut c_void, destroy: extern fn(*mut c_void) -> (), full_merge: extern fn ( - arg: *mut c_void, key: *const c_char, key_len: *mut size_t, - existing_value: *const c_char, existing_value_len: *mut size_t, - operands_list: &[*const c_char], operands_list_len: *const size_t, + arg: *mut c_void, key: *const c_char, key_len: size_t, + existing_value: *const c_char, existing_value_len: size_t, + operands_list: *const *const c_char, operands_list_len: *const size_t, num_operands: c_int, success: *mut u8, new_value_length: *mut size_t ) -> *const c_char, partial_merge: extern fn( - *mut c_void, key: *const c_char, key_len: *mut size_t, - operands_list: &[*const c_char], operands_list_len: *const size_t, + arg: *mut c_void, key: *const c_char, key_len: size_t, + operands_list: *const *const c_char, operands_list_len: *const size_t, num_operands: c_int, success: *mut u8, new_value_length: *mut size_t ) -> *const c_char, - delete_value: extern fn(*mut c_void, value: *const c_char, - value_len: *mut size_t) -> (), + delete_value: Option ()>, name_fn: extern fn(*mut c_void) -> *const c_char, ) -> RocksDBMergeOperator; pub fn rocksdb_mergeoperator_destroy(mo: RocksDBMergeOperator); @@ -152,14 +154,13 @@ fn internal() { rocksdb_options_optimize_level_style_compaction(opts, 0); rocksdb_options_set_create_if_missing(opts, 1); - let rustpath = "internaltest"; + let rustpath = "_rust_rocksdb_internaltest"; let cpath = rustpath.to_c_str(); let cpath_ptr = cpath.as_ptr(); let err = 0 as *mut i8; let db = rocksdb_open(opts, cpath_ptr, err); assert!(err.is_null()); - libc::free(err as *mut c_void); let writeopts = rocksdb_writeoptions_create(); let RocksDBWriteOptions(write_opt_ptr) = writeopts; @@ -170,18 +171,17 @@ fn internal() { rocksdb_put(db, writeopts, key.as_ptr(), 4, val.as_ptr(), 8, err); assert!(err.is_null()); - libc::free(err as *mut c_void); let readopts = rocksdb_readoptions_create(); let RocksDBReadOptions(read_opts_ptr) = readopts; assert!(read_opts_ptr.is_not_null()); - libc::free(err as *mut c_void); let val_len: size_t = 0; let val_len_ptr = &val_len as *const size_t; rocksdb_get(db, readopts, key.as_ptr(), 4, val_len_ptr, err); assert!(err.is_null()); - libc::free(err as *mut c_void); rocksdb_close(db); + rocksdb_destroy_db(opts, cpath_ptr, err); + assert!(err.is_null()); } } diff --git a/src/rocksdb.rs b/src/rocksdb.rs index fb3006b..b732955 100644 --- a/src/rocksdb.rs +++ b/src/rocksdb.rs @@ -1,9 +1,13 @@ extern crate libc; -use self::libc::{c_void, size_t}; +use self::libc::{c_char, c_int, c_void, size_t}; use std::io::{IoError}; use std::c_vec::CVec; use std::c_str::CString; use std::str::from_utf8; +use std::string::raw::from_buf_len; +use std::ptr; +use std::mem; +use std::slice; use rocksdb_ffi; @@ -50,8 +54,20 @@ impl RocksDBOptions { } } - pub fn set_merge_operator(&self, mo: rocksdb_ffi::RocksDBMergeOperator) { + pub fn add_merge_operator<'a>(&self, name: &str, merge_fn: for <'a> fn (&[u8], Option<&[u8]>, &mut MergeOperands) -> Vec) { + let cb = box MergeOperatorCallback { + name: name.to_c_str(), + merge_fn: merge_fn, + }; + unsafe { + let mo = rocksdb_ffi::rocksdb_mergeoperator_create( + mem::transmute(cb), + destructor_callback, + full_merge_callback, + partial_merge_callback, + None, + name_callback); rocksdb_ffi::rocksdb_options_set_merge_operator(self.inner, mo); } } @@ -94,6 +110,29 @@ impl RocksDB { Ok(RocksDB{inner: db}) } } + + pub fn destroy(opts: RocksDBOptions, path: &str) -> Result<(), String> { + unsafe { + let cpath = path.to_c_str(); + let cpath_ptr = cpath.as_ptr(); + + //TODO test path here, as if rocksdb fails it will just crash the + // process currently + + let err = 0 as *mut i8; + let result = rocksdb_ffi::rocksdb_destroy_db(opts.inner, cpath_ptr, err); + if err.is_not_null() { + let cs = CString::new(err as *const i8, true); + match cs.as_str() { + Some(error_string) => + return Err(error_string.to_string()), + None => + return Err("Could not initialize database.".to_string()), + } + } + Ok(()) + } + } pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> { unsafe { @@ -121,6 +160,33 @@ impl RocksDB { } } + pub fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> { + unsafe { + let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); + let err = 0 as *mut i8; + rocksdb_ffi::rocksdb_merge(self.inner, writeopts, key.as_ptr(), + key.len() as size_t, value.as_ptr(), + value.len() as size_t, err); + if err.is_not_null() { + let cs = CString::new(err as *const i8, true); + match cs.as_str() { + Some(error_string) => + return Err(error_string.to_string()), + None => { + let ie = IoError::last_error(); + return Err(format!( + "ERROR: desc:{}, details:{}", + ie.desc, + ie.detail.unwrap_or_else( + || {"none provided by OS".to_string()}))) + } + } + } + return Ok(()) + } + } + + pub fn get<'a>(&self, key: &[u8]) -> RocksDBResult<'a, RocksDBVector, String> { unsafe { @@ -288,7 +354,8 @@ impl <'a,T,E> RocksDBResult<'a,T,E> { #[allow(dead_code)] #[test] fn external() { - let db = RocksDB::open_default("externaltest").unwrap(); + let path = "_rust_rocksdb_externaltest"; + let db = RocksDB::open_default(path).unwrap(); let p = db.put(b"k1", b"v1111"); assert!(p.is_ok()); let r: RocksDBResult = db.get(b"k1"); @@ -296,4 +363,168 @@ fn external() { assert!(db.delete(b"k1").is_ok()); assert!(db.get(b"k1").is_none()); db.close(); + let opts = RocksDBOptions::new(); + assert!(RocksDB::destroy(opts, path).is_ok()); +} + +pub struct MergeOperands<'a> { + operands_list: *const *const c_char, + operands_list_len: *const size_t, + num_operands: uint, + cursor: uint, +} + +impl <'a> MergeOperands<'a> { + fn new<'a>(operands_list: *const *const c_char, operands_list_len: *const size_t, + num_operands: c_int) -> MergeOperands<'a> { + assert!(num_operands >= 0); + MergeOperands { + operands_list: operands_list, + operands_list_len: operands_list_len, + num_operands: num_operands as uint, + cursor: 0, + } + } +} + +impl <'a> Iterator<&'a [u8]> for &'a mut MergeOperands<'a> { + fn next(&mut self) -> Option<&'a [u8]> { + use std::raw::Slice; + match self.cursor == self.num_operands { + true => None, + false => { + unsafe { + let base = self.operands_list as uint; + let base_len = self.operands_list_len as uint; + let spacing = mem::size_of::<*const *const u8>(); + let spacing_len = mem::size_of::<*const size_t>(); + let len_ptr = (base_len + (spacing_len * self.cursor)) as *const size_t; + let len = *len_ptr as uint; + let ptr = base + (spacing * self.cursor); + let op = from_buf_len(*(ptr as *const *const u8), len); + let des: Option = from_str(op.as_slice()); + self.cursor += 1; + Some(mem::transmute(Slice{data:*(ptr as *const *const u8) as *const u8, len: len})) + } + } + } + } + + fn size_hint(&self) -> (uint, Option) { + let remaining = self.num_operands - self.cursor; + (remaining, Some(remaining)) + } +} + +struct MergeOperatorCallback { + name: CString, + merge_fn: for <'b> fn (&[u8], Option<&[u8]>, &mut MergeOperands) -> Vec, +} + +extern "C" fn destructor_callback(raw_cb: *mut c_void) { + // turn this back into a local variable so rust will reclaim it + let _: Box = unsafe {mem::transmute(raw_cb)}; + +} + +extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char { + unsafe { + let cb: &mut MergeOperatorCallback = &mut *(raw_cb as *mut MergeOperatorCallback); + let ptr = cb.name.as_ptr(); + ptr as *const c_char + } +} + +extern "C" fn full_merge_callback( + raw_cb: *mut c_void, key: *const c_char, key_len: size_t, + existing_value: *const c_char, existing_value_len: size_t, + operands_list: *const *const c_char, operands_list_len: *const size_t, + num_operands: c_int, + success: *mut u8, new_value_length: *mut size_t) -> *const c_char { + unsafe { + let cb: &mut MergeOperatorCallback = &mut *(raw_cb as *mut MergeOperatorCallback); + let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); + let key = from_buf_len(key as *const u8, key_len as uint); + let oldval = from_buf_len(existing_value as *const u8, existing_value_len as uint); + let mut result = (cb.merge_fn)(key.as_bytes(), Some(oldval.as_bytes()), operands); + result.shrink_to_fit(); + //TODO(tan) investigate zero-copy techniques to improve performance + let buf = libc::malloc(result.len() as size_t); + assert!(buf.is_not_null()); + *new_value_length = result.len() as size_t; + *success = 1 as u8; + ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len()); + buf as *const c_char + } +} + +extern "C" fn partial_merge_callback( + raw_cb: *mut c_void, key: *const c_char, key_len: size_t, + operands_list: *const *const c_char, operands_list_len: *const size_t, + num_operands: c_int, + success: *mut u8, new_value_length: *mut size_t) -> *const c_char { + unsafe { + let cb: &mut MergeOperatorCallback = &mut *(raw_cb as *mut MergeOperatorCallback); + let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); + let key = from_buf_len(key as *const u8, key_len as uint); + let mut result = (cb.merge_fn)(key.as_bytes(), None, operands); + result.shrink_to_fit(); + //TODO(tan) investigate zero-copy techniques to improve performance + let buf = libc::malloc(result.len() as size_t); + assert!(buf.is_not_null()); + *new_value_length = 1 as size_t; + *success = 1 as u8; + ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len()); + buf as *const c_char + } +} + +fn test_provided_merge<'a>(new_key: &[u8], existing_val: Option<&[u8]>, + mut operands: &mut MergeOperands) -> Vec { + let mut result: Vec = Vec::with_capacity(operands.size_hint().val0()); + match existing_val { + Some(v) => result.push_all(v), + None => (), + } + for op in operands { + result.push_all(op); + } + result +} + +#[allow(dead_code)] +#[test] +fn mergetest() { + let path = "_rust_rocksdb_mergetest"; + unsafe { + let opts = RocksDBOptions::new(); + opts.create_if_missing(true); + opts.add_merge_operator("test operator", test_provided_merge); + let db = RocksDB::open(opts, path).unwrap(); + let p = db.put(b"k1", b"a"); + assert!(p.is_ok()); + db.merge(b"k1", b"b"); + db.merge(b"k1", b"c"); + db.merge(b"k1", b"d"); + db.merge(b"k1", b"efg"); + let m = db.merge(b"k1", b"h"); + assert!(m.is_ok()); + db.get(b"k1").map( |value| { + match value.to_utf8() { + Some(v) => + println!("retrieved utf8 value: {}", v), + None => + println!("did not read valid utf-8 out of the db"), + } + }).on_absent( || { println!("value not present!") }) + .on_error( |e| { println!("error reading value")}); //: {}", e) }); + + assert!(m.is_ok()); + let r: RocksDBResult = db.get(b"k1"); + assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh"); + assert!(db.delete(b"k1").is_ok()); + assert!(db.get(b"k1").is_none()); + db.close(); + assert!(RocksDB::destroy(opts, path).is_ok()); + } }