basic rustic merge operator

master
Tyler Neely 10 years ago
parent 5f0b895cca
commit 6eb01eb07c
  1. 189
      src/rocksdb.rs

@ -54,10 +54,21 @@ impl RocksDBOptions {
} }
} }
pub fn add_merge_operator(&self, name: &[str], merge_fn: for <'b> fn (String, Option<String>, &mut MergeOperands) -> &'b [u8]) { pub fn add_merge_operator<'a>(&self, name: &str, merge_fn: for <'a> fn (String, Option<String>, &mut MergeOperands) -> Vec<u8>) {
let cb = box MergeOperatorCallback {
name: name.to_c_str(),
merge_fn: merge_fn,
};
unsafe { unsafe {
let mo = MergeOperator::new(name, merge_fn); let mo = rocksdb_ffi::rocksdb_mergeoperator_create(
rocksdb_ffi::rocksdb_options_set_merge_operator(self.inner, mo.mo); mem::transmute(cb),
destructor_callback,
full_merge_callback,
partial_merge_callback,
None,
name_callback);
rocksdb_ffi::rocksdb_options_set_merge_operator(self.inner, mo);
} }
} }
} }
@ -356,7 +367,7 @@ fn external() {
assert!(RocksDB::destroy(opts, path).is_ok()); assert!(RocksDB::destroy(opts, path).is_ok());
} }
struct MergeOperands<'a> { pub struct MergeOperands<'a> {
operands_list: *const *const c_char, operands_list: *const *const c_char,
operands_list_len: *const size_t, operands_list_len: *const size_t,
num_operands: uint, num_operands: uint,
@ -405,118 +416,80 @@ impl <'a> Iterator<&'a [u8]> for &'a mut MergeOperands<'a> {
} }
} }
struct MergeOperatorState<'a> { struct MergeOperatorCallback {
name: &'a [str], name: CString,
merge_fn: for <'b> fn (String, Option<String>, &mut MergeOperands) -> &'b [u8], merge_fn: for <'b> fn (String, Option<String>, &mut MergeOperands) -> Vec<u8>,
} }
struct MergeOperator<'a> { extern "C" fn destructor_callback(raw_cb: *mut c_void) {
mo: rocksdb_ffi::RocksDBMergeOperator, // turn this back into a local variable so rust will reclaim it
state: MergeOperatorState<'a>, let _: Box<MergeOperatorCallback> = unsafe {mem::transmute(raw_cb)};
} }
impl <'a> MergeOperator<'a> { extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char {
pub fn new<'a>(name: &'a [str], merge_fn: for <'b> fn (String, Option<String>, &mut MergeOperands) -> &'b [u8]) -> &'a MergeOperator<'a> { unsafe {
let state = &MergeOperatorState { let cb: &mut MergeOperatorCallback = &mut *(raw_cb as *mut MergeOperatorCallback);
name: name, let ptr = cb.name.as_ptr();
merge_fn: merge_fn, ptr as *const c_char
};
let ffi_operator = rocksdb_ffi::rocksdb_mergeoperator_create(
state as *mut c_void,
state.null_destructor,
state.full_merge,
state.partial_merge,
None,
state.mergeoperator_name);
&MergeOperator {
mo: ffi_operator,
state: state,
}
} }
} }
impl <'a> MergeOperatorState<'a> { extern "C" fn full_merge_callback(
raw_cb: *mut c_void, key: *const c_char, key_len: size_t,
extern "C" fn null_destructor(&self) { existing_value: *const c_char, existing_value_len: size_t,
println!("in null_destructor"); operands_list: *const *const c_char, operands_list_len: *const size_t,
} num_operands: c_int,
success: *mut u8, new_value_length: *mut size_t) -> *const c_char {
extern "C" fn mergeoperator_name(&self) -> *const c_char { unsafe {
println!("in mergeoperator_name"); let cb: &mut MergeOperatorCallback = &mut *(raw_cb as *mut MergeOperatorCallback);
let name = self.name.to_c_str(); let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands);
unsafe { let key = from_buf_len(key as *const u8, key_len as uint);
let buf = libc::malloc(8 as size_t); let oldval = from_buf_len(existing_value as *const u8, existing_value_len as uint);
ptr::copy_memory(&mut *buf, name.as_ptr() as *const c_void, 8); let mut result = (cb.merge_fn)(key, Some(oldval), operands);
println!("returning from mergeoperator_name"); result.shrink_to_fit();
buf as *const c_char //TODO(tan) investigate zero-copy techniques to improve performance
} let buf = libc::malloc(result.len() as size_t);
} assert!(buf.is_not_null());
*new_value_length = result.len() as size_t;
extern "C" fn full_merge( *success = 1 as u8;
&self, key: *const c_char, key_len: size_t, ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len());
existing_value: *const c_char, existing_value_len: size_t, buf as *const c_char
operands_list: *const *const c_char, operands_list_len: *const size_t,
num_operands: c_int,
success: *mut u8, new_value_length: *mut size_t) -> *const c_char {
unsafe {
println!("in the FULL merge operator");
let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands);
let key = from_buf_len(key as *const u8, key_len as uint);
let oldval = from_buf_len(existing_value as *const u8, existing_value_len as uint);
let result = self.merge_fn(key, Some(oldval), operands);
let buf = libc::malloc(result.len() as size_t);
assert!(buf.is_not_null());
*new_value_length = 1 as size_t;
*success = 1 as u8;
let newval = "2";
ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len());
println!("returning from full_merge");
buf as *const c_char
}
}
extern "C" fn partial_merge(
&self, key: *const c_char, key_len: size_t,
operands_list: *const *const c_char, operands_list_len: *const size_t,
num_operands: c_int,
success: *mut u8, new_value_length: *mut size_t) -> *const c_char {
unsafe {
println!("in the PARTIAL merge operator");
let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands);
let key = from_buf_len(key as *const u8, key_len as uint);
let result = self.merge_fn(key, None, operands);
let buf = libc::malloc(result.len() as size_t);
assert!(buf.is_not_null());
*new_value_length = 1 as size_t;
*success = 1 as u8;
let newval = "2";
ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len());
buf as *const c_char
}
} }
} }
fn create_full_merge(provided_merge: for<'a> fn (new_key: String, existing_val: Option<String>, extern "C" fn partial_merge_callback(
mut operands: &mut MergeOperands) -> &'a [u8]) { raw_cb: *mut c_void, key: *const c_char, key_len: size_t,
operands_list: *const *const c_char, operands_list_len: *const size_t,
} num_operands: c_int,
success: *mut u8, new_value_length: *mut size_t) -> *const c_char {
fn create_partial_merge(provided_merge: for<'a> fn (new_key: String, existing_val: Option<String>, unsafe {
mut operands: &mut MergeOperands) -> &'a [u8]) { let cb: &mut MergeOperatorCallback = &mut *(raw_cb as *mut MergeOperatorCallback);
let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands);
let key = from_buf_len(key as *const u8, key_len as uint);
let mut result = (cb.merge_fn)(key, None, operands);
result.shrink_to_fit();
//TODO(tan) investigate zero-copy techniques to improve performance
let buf = libc::malloc(result.len() as size_t);
assert!(buf.is_not_null());
*new_value_length = 1 as size_t;
*success = 1 as u8;
ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len());
buf as *const c_char
}
} }
fn test_provided_merge<'a>(new_key: String, existing_val: Option<String>, fn test_provided_merge<'a>(new_key: String, existing_val: Option<String>,
mut operands: &mut MergeOperands) -> &'a [u8] { mut operands: &mut MergeOperands) -> Vec<u8> {
let mut result: Vec<u8> = Vec::with_capacity(operands.size_hint().val0());
match existing_val {
Some(v) => result.push_all(v.as_bytes()),
None => (),
}
for op in operands { for op in operands {
println!("op: {}", from_utf8(op)); result.push_all(op);
} }
result
"yoyo".as_bytes()
} }
#[allow(dead_code)] #[allow(dead_code)]
@ -528,13 +501,13 @@ fn mergetest() {
opts.create_if_missing(true); opts.create_if_missing(true);
opts.add_merge_operator("test operator", test_provided_merge); opts.add_merge_operator("test operator", test_provided_merge);
let db = RocksDB::open(opts, path).unwrap(); let db = RocksDB::open(opts, path).unwrap();
let p = db.put(b"k1", b"1"); let p = db.put(b"k1", b"a");
assert!(p.is_ok()); assert!(p.is_ok());
db.merge(b"k1", b"10"); db.merge(b"k1", b"b");
db.merge(b"k1", b"2"); db.merge(b"k1", b"c");
db.merge(b"k1", b"3"); db.merge(b"k1", b"d");
db.merge(b"k1", b"4"); db.merge(b"k1", b"efg");
let m = db.merge(b"k1", b"5"); let m = db.merge(b"k1", b"h");
assert!(m.is_ok()); assert!(m.is_ok());
db.get(b"k1").map( |value| { db.get(b"k1").map( |value| {
match value.to_utf8() { match value.to_utf8() {
@ -548,7 +521,7 @@ fn mergetest() {
assert!(m.is_ok()); assert!(m.is_ok());
let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1"); let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1");
//assert!(r.unwrap().to_utf8().unwrap() == "yoyo"); assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh");
assert!(db.delete(b"k1").is_ok()); assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").is_none()); assert!(db.get(b"k1").is_none());
db.close(); db.close();

Loading…
Cancel
Save