From 5f0b895cca2ff5c0e584f09db21b1c1257cd19a5 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Mon, 8 Dec 2014 01:31:27 -0500 Subject: [PATCH] more thrashing about for merge operator support --- src/ffi.rs | 2 +- src/rocksdb.rs | 186 ++++++++++++++++++++++++++++--------------------- 2 files changed, 106 insertions(+), 82 deletions(-) diff --git a/src/ffi.rs b/src/ffi.rs index d44fd9c..dfca528 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -128,7 +128,7 @@ extern { ) -> *const c_char, partial_merge: extern fn( arg: *mut c_void, key: *const c_char, key_len: size_t, - operands_list: *const c_void, operands_list_len: *const c_void, + operands_list: *const *const c_char, operands_list_len: *const size_t, num_operands: c_int, success: *mut u8, new_value_length: *mut size_t ) -> *const c_char, diff --git a/src/rocksdb.rs b/src/rocksdb.rs index 0047017..c855918 100644 --- a/src/rocksdb.rs +++ b/src/rocksdb.rs @@ -54,9 +54,10 @@ impl RocksDBOptions { } } - pub fn set_merge_operator(&self, mo: rocksdb_ffi::RocksDBMergeOperator) { + pub fn add_merge_operator(&self, name: &[str], merge_fn: for <'b> fn (String, Option, &mut MergeOperands) -> &'b [u8]) { unsafe { - rocksdb_ffi::rocksdb_options_set_merge_operator(self.inner, mo); + let mo = MergeOperator::new(name, merge_fn); + rocksdb_ffi::rocksdb_options_set_merge_operator(self.inner, mo.mo); } } } @@ -355,20 +356,6 @@ fn external() { assert!(RocksDB::destroy(opts, path).is_ok()); } -extern "C" fn null_destructor(args: *mut c_void) { - println!("in null_destructor"); -} -extern "C" fn mergeoperator_name(args: *mut c_void) -> *const c_char { - println!("in mergeoperator_name"); - let name = "test_mo".to_c_str(); - unsafe { - let buf = libc::malloc(8 as size_t); - ptr::copy_memory(&mut *buf, name.as_ptr() as *const c_void, 8); - println!("returning from mergeoperator_name"); - buf as *const c_char - } -} - struct MergeOperands<'a> { operands_list: *const *const c_char, operands_list_len: *const size_t, @@ -389,7 +376,7 @@ impl <'a> MergeOperands<'a> { } } -impl <'a> Iterator<&'a [u8]> for MergeOperands<'a> { +impl <'a> Iterator<&'a [u8]> for &'a mut MergeOperands<'a> { fn next(&mut self) -> Option<&'a [u8]> { use std::raw::Slice; match self.cursor == self.num_operands { @@ -418,70 +405,114 @@ impl <'a> Iterator<&'a [u8]> for MergeOperands<'a> { } } -extern "C" fn full_merge( - arg: *mut c_void, key: *const c_char, key_len: size_t, - existing_value: *const c_char, existing_value_len: size_t, - operands_list: *const *const c_char, operands_list_len: *const size_t, - num_operands: c_int, - success: *mut u8, new_value_length: *mut size_t) -> *const c_char { - unsafe { - println!("in the FULL merge operator"); - let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); - let key = from_buf_len(key as *const u8, key_len as uint); - let oldval = from_buf_len(existing_value as *const u8, existing_value_len as uint); +struct MergeOperatorState<'a> { + name: &'a [str], + merge_fn: for <'b> fn (String, Option, &mut MergeOperands) -> &'b [u8], +} - println!("returning from FULL merge"); - //TODO rust will "free" this when it goes out of scope, copy this to a non-gc'd buffer to return - merge(key, Some(oldval), operands); +struct MergeOperator<'a> { + mo: rocksdb_ffi::RocksDBMergeOperator, + state: MergeOperatorState<'a>, +} - let buf = libc::malloc(1 as size_t); - match buf.is_null() { - false => { - *new_value_length = 1 as size_t; - *success = 1 as u8; - let newval = "2"; - ptr::copy_memory(&mut *buf, newval.as_ptr() as *const c_void, 1); - println!("returning from full_merge"); - buf as *const c_char - }, - true => { - println!("returning from full_merge"); - 0 as *const c_char - } +impl <'a> MergeOperator<'a> { + pub fn new<'a>(name: &'a [str], merge_fn: for <'b> fn (String, Option, &mut MergeOperands) -> &'b [u8]) -> &'a MergeOperator<'a> { + let state = &MergeOperatorState { + name: name, + merge_fn: merge_fn, + }; + + let ffi_operator = rocksdb_ffi::rocksdb_mergeoperator_create( + state as *mut c_void, + state.null_destructor, + state.full_merge, + state.partial_merge, + None, + state.mergeoperator_name); + + &MergeOperator { + mo: ffi_operator, + state: state, } } } -extern "C" fn partial_merge( - arg: *mut c_void, key: *const c_char, key_len: size_t, - operands_list: *const c_void, operands_list_len: *const c_void, - num_operands: c_int, - success: *mut u8, new_value_length: *mut size_t) -> *const c_char { - unsafe { - println!("in the PARTIAL merge operator"); - *new_value_length = 2; - *success = 1 as u8; - let buf = libc::malloc(1 as size_t); - match buf.is_null() { - false => { - println!("number of operands: {}", num_operands); - println!("first operand: {}", from_buf_len(operands_list as *const u8, 1)); - *new_value_length = 1 as size_t; - *success = 1 as u8; - let newval = "2"; - ptr::copy_memory(&mut *buf, newval.as_ptr() as *const c_void, 1); - println!("returning from partial_merge"); - buf as *const c_char - }, - true => { - println!("returning from partial_merge"); - 0 as *const c_char - } + +impl <'a> MergeOperatorState<'a> { + + extern "C" fn null_destructor(&self) { + println!("in null_destructor"); + } + + extern "C" fn mergeoperator_name(&self) -> *const c_char { + println!("in mergeoperator_name"); + let name = self.name.to_c_str(); + unsafe { + let buf = libc::malloc(8 as size_t); + ptr::copy_memory(&mut *buf, name.as_ptr() as *const c_void, 8); + println!("returning from mergeoperator_name"); + buf as *const c_char + } + } + + extern "C" fn full_merge( + &self, key: *const c_char, key_len: size_t, + existing_value: *const c_char, existing_value_len: size_t, + operands_list: *const *const c_char, operands_list_len: *const size_t, + num_operands: c_int, + success: *mut u8, new_value_length: *mut size_t) -> *const c_char { + unsafe { + println!("in the FULL merge operator"); + let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); + let key = from_buf_len(key as *const u8, key_len as uint); + let oldval = from_buf_len(existing_value as *const u8, existing_value_len as uint); + let result = self.merge_fn(key, Some(oldval), operands); + + let buf = libc::malloc(result.len() as size_t); + assert!(buf.is_not_null()); + *new_value_length = 1 as size_t; + *success = 1 as u8; + let newval = "2"; + ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len()); + println!("returning from full_merge"); + buf as *const c_char + } + } + + extern "C" fn partial_merge( + &self, key: *const c_char, key_len: size_t, + operands_list: *const *const c_char, operands_list_len: *const size_t, + num_operands: c_int, + success: *mut u8, new_value_length: *mut size_t) -> *const c_char { + unsafe { + println!("in the PARTIAL merge operator"); + let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); + let key = from_buf_len(key as *const u8, key_len as uint); + let result = self.merge_fn(key, None, operands); + + let buf = libc::malloc(result.len() as size_t); + assert!(buf.is_not_null()); + *new_value_length = 1 as size_t; + *success = 1 as u8; + let newval = "2"; + ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len()); + buf as *const c_char } } } -fn merge<'a>(new_key: String, existing_val: Option, operands: &mut MergeOperands) -> &'a [u8] { - for op in *operands { +fn create_full_merge(provided_merge: for<'a> fn (new_key: String, existing_val: Option, + mut operands: &mut MergeOperands) -> &'a [u8]) { + +} + +fn create_partial_merge(provided_merge: for<'a> fn (new_key: String, existing_val: Option, + mut operands: &mut MergeOperands) -> &'a [u8]) { + +} + +fn test_provided_merge<'a>(new_key: String, existing_val: Option, + mut operands: &mut MergeOperands) -> &'a [u8] { + for op in operands { println!("op: {}", from_utf8(op)); } @@ -494,15 +525,8 @@ fn mergetest() { let path = "_rust_rocksdb_mergetest"; unsafe { let opts = RocksDBOptions::new(); - let mo = rocksdb_ffi::rocksdb_mergeoperator_create( - 0 as *mut c_void, - null_destructor, - full_merge, - partial_merge, - None, - mergeoperator_name); opts.create_if_missing(true); - opts.set_merge_operator(mo); + opts.add_merge_operator("test operator", test_provided_merge); let db = RocksDB::open(opts, path).unwrap(); let p = db.put(b"k1", b"1"); assert!(p.is_ok()); @@ -524,7 +548,7 @@ fn mergetest() { assert!(m.is_ok()); let r: RocksDBResult = db.get(b"k1"); - assert!(r.unwrap().to_utf8().unwrap() == "2"); + //assert!(r.unwrap().to_utf8().unwrap() == "yoyo"); assert!(db.delete(b"k1").is_ok()); assert!(db.get(b"k1").is_none()); db.close();