diff --git a/src/rocksdb.rs b/src/rocksdb.rs index c855918..02db25e 100644 --- a/src/rocksdb.rs +++ b/src/rocksdb.rs @@ -54,10 +54,21 @@ impl RocksDBOptions { } } - pub fn add_merge_operator(&self, name: &[str], merge_fn: for <'b> fn (String, Option, &mut MergeOperands) -> &'b [u8]) { + pub fn add_merge_operator<'a>(&self, name: &str, merge_fn: for <'a> fn (String, Option, &mut MergeOperands) -> Vec) { + let cb = box MergeOperatorCallback { + name: name.to_c_str(), + merge_fn: merge_fn, + }; + unsafe { - let mo = MergeOperator::new(name, merge_fn); - rocksdb_ffi::rocksdb_options_set_merge_operator(self.inner, mo.mo); + let mo = rocksdb_ffi::rocksdb_mergeoperator_create( + mem::transmute(cb), + destructor_callback, + full_merge_callback, + partial_merge_callback, + None, + name_callback); + rocksdb_ffi::rocksdb_options_set_merge_operator(self.inner, mo); } } } @@ -356,7 +367,7 @@ fn external() { assert!(RocksDB::destroy(opts, path).is_ok()); } -struct MergeOperands<'a> { +pub struct MergeOperands<'a> { operands_list: *const *const c_char, operands_list_len: *const size_t, num_operands: uint, @@ -405,118 +416,80 @@ impl <'a> Iterator<&'a [u8]> for &'a mut MergeOperands<'a> { } } -struct MergeOperatorState<'a> { - name: &'a [str], - merge_fn: for <'b> fn (String, Option, &mut MergeOperands) -> &'b [u8], +struct MergeOperatorCallback { + name: CString, + merge_fn: for <'b> fn (String, Option, &mut MergeOperands) -> Vec, } -struct MergeOperator<'a> { - mo: rocksdb_ffi::RocksDBMergeOperator, - state: MergeOperatorState<'a>, +extern "C" fn destructor_callback(raw_cb: *mut c_void) { + // turn this back into a local variable so rust will reclaim it + let _: Box = unsafe {mem::transmute(raw_cb)}; + } -impl <'a> MergeOperator<'a> { - pub fn new<'a>(name: &'a [str], merge_fn: for <'b> fn (String, Option, &mut MergeOperands) -> &'b [u8]) -> &'a MergeOperator<'a> { - let state = &MergeOperatorState { - name: name, - merge_fn: merge_fn, - }; - - let ffi_operator = rocksdb_ffi::rocksdb_mergeoperator_create( - state as *mut c_void, - state.null_destructor, - state.full_merge, - state.partial_merge, - None, - state.mergeoperator_name); - - &MergeOperator { - mo: ffi_operator, - state: state, - } +extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char { + unsafe { + let cb: &mut MergeOperatorCallback = &mut *(raw_cb as *mut MergeOperatorCallback); + let ptr = cb.name.as_ptr(); + ptr as *const c_char } } -impl <'a> MergeOperatorState<'a> { - - extern "C" fn null_destructor(&self) { - println!("in null_destructor"); - } - - extern "C" fn mergeoperator_name(&self) -> *const c_char { - println!("in mergeoperator_name"); - let name = self.name.to_c_str(); - unsafe { - let buf = libc::malloc(8 as size_t); - ptr::copy_memory(&mut *buf, name.as_ptr() as *const c_void, 8); - println!("returning from mergeoperator_name"); - buf as *const c_char - } - } - - extern "C" fn full_merge( - &self, key: *const c_char, key_len: size_t, - existing_value: *const c_char, existing_value_len: size_t, - operands_list: *const *const c_char, operands_list_len: *const size_t, - num_operands: c_int, - success: *mut u8, new_value_length: *mut size_t) -> *const c_char { - unsafe { - println!("in the FULL merge operator"); - let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); - let key = from_buf_len(key as *const u8, key_len as uint); - let oldval = from_buf_len(existing_value as *const u8, existing_value_len as uint); - let result = self.merge_fn(key, Some(oldval), operands); - - let buf = libc::malloc(result.len() as size_t); - assert!(buf.is_not_null()); - *new_value_length = 1 as size_t; - *success = 1 as u8; - let newval = "2"; - ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len()); - println!("returning from full_merge"); - buf as *const c_char - } - } - - extern "C" fn partial_merge( - &self, key: *const c_char, key_len: size_t, - operands_list: *const *const c_char, operands_list_len: *const size_t, - num_operands: c_int, - success: *mut u8, new_value_length: *mut size_t) -> *const c_char { - unsafe { - println!("in the PARTIAL merge operator"); - let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); - let key = from_buf_len(key as *const u8, key_len as uint); - let result = self.merge_fn(key, None, operands); - - let buf = libc::malloc(result.len() as size_t); - assert!(buf.is_not_null()); - *new_value_length = 1 as size_t; - *success = 1 as u8; - let newval = "2"; - ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len()); - buf as *const c_char - } +extern "C" fn full_merge_callback( + raw_cb: *mut c_void, key: *const c_char, key_len: size_t, + existing_value: *const c_char, existing_value_len: size_t, + operands_list: *const *const c_char, operands_list_len: *const size_t, + num_operands: c_int, + success: *mut u8, new_value_length: *mut size_t) -> *const c_char { + unsafe { + let cb: &mut MergeOperatorCallback = &mut *(raw_cb as *mut MergeOperatorCallback); + let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); + let key = from_buf_len(key as *const u8, key_len as uint); + let oldval = from_buf_len(existing_value as *const u8, existing_value_len as uint); + let mut result = (cb.merge_fn)(key, Some(oldval), operands); + result.shrink_to_fit(); + //TODO(tan) investigate zero-copy techniques to improve performance + let buf = libc::malloc(result.len() as size_t); + assert!(buf.is_not_null()); + *new_value_length = result.len() as size_t; + *success = 1 as u8; + ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len()); + buf as *const c_char } } -fn create_full_merge(provided_merge: for<'a> fn (new_key: String, existing_val: Option, - mut operands: &mut MergeOperands) -> &'a [u8]) { - -} - -fn create_partial_merge(provided_merge: for<'a> fn (new_key: String, existing_val: Option, - mut operands: &mut MergeOperands) -> &'a [u8]) { - +extern "C" fn partial_merge_callback( + raw_cb: *mut c_void, key: *const c_char, key_len: size_t, + operands_list: *const *const c_char, operands_list_len: *const size_t, + num_operands: c_int, + success: *mut u8, new_value_length: *mut size_t) -> *const c_char { + unsafe { + let cb: &mut MergeOperatorCallback = &mut *(raw_cb as *mut MergeOperatorCallback); + let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); + let key = from_buf_len(key as *const u8, key_len as uint); + let mut result = (cb.merge_fn)(key, None, operands); + result.shrink_to_fit(); + //TODO(tan) investigate zero-copy techniques to improve performance + let buf = libc::malloc(result.len() as size_t); + assert!(buf.is_not_null()); + *new_value_length = 1 as size_t; + *success = 1 as u8; + ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len()); + buf as *const c_char + } } fn test_provided_merge<'a>(new_key: String, existing_val: Option, - mut operands: &mut MergeOperands) -> &'a [u8] { + mut operands: &mut MergeOperands) -> Vec { + let mut result: Vec = Vec::with_capacity(operands.size_hint().val0()); + match existing_val { + Some(v) => result.push_all(v.as_bytes()), + None => (), + } for op in operands { - println!("op: {}", from_utf8(op)); + result.push_all(op); } - - "yoyo".as_bytes() + result } #[allow(dead_code)] @@ -528,13 +501,13 @@ fn mergetest() { opts.create_if_missing(true); opts.add_merge_operator("test operator", test_provided_merge); let db = RocksDB::open(opts, path).unwrap(); - let p = db.put(b"k1", b"1"); + let p = db.put(b"k1", b"a"); assert!(p.is_ok()); - db.merge(b"k1", b"10"); - db.merge(b"k1", b"2"); - db.merge(b"k1", b"3"); - db.merge(b"k1", b"4"); - let m = db.merge(b"k1", b"5"); + db.merge(b"k1", b"b"); + db.merge(b"k1", b"c"); + db.merge(b"k1", b"d"); + db.merge(b"k1", b"efg"); + let m = db.merge(b"k1", b"h"); assert!(m.is_ok()); db.get(b"k1").map( |value| { match value.to_utf8() { @@ -548,7 +521,7 @@ fn mergetest() { assert!(m.is_ok()); let r: RocksDBResult = db.get(b"k1"); - //assert!(r.unwrap().to_utf8().unwrap() == "yoyo"); + assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh"); assert!(db.delete(b"k1").is_ok()); assert!(db.get(b"k1").is_none()); db.close();