From 90ef2333c3bcb262be6ce5c2acbd5357cb58e7e8 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Sun, 30 Nov 2014 02:41:08 -0500 Subject: [PATCH 1/8] initial implementation fragments for supporting merge operators --- src/ffi.rs | 19 +++++----- src/rocksdb.rs | 97 +++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 105 insertions(+), 11 deletions(-) diff --git a/src/ffi.rs b/src/ffi.rs index d727556..23345cd 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -113,25 +113,28 @@ extern { pub fn rocksdb_repair_db( options: RocksDBOptions, path: *const i8, err: *mut i8); - // Merge Operator + // Merge + pub fn rocksdb_merge(db: RocksDBInstance, writeopts: RocksDBWriteOptions, + k: *const u8, kLen: size_t, + v: *const u8, vLen: size_t, err: *mut i8); pub fn rocksdb_mergeoperator_create( state: *mut c_void, destroy: extern fn(*mut c_void) -> (), full_merge: extern fn ( arg: *mut c_void, key: *const c_char, key_len: *mut size_t, existing_value: *const c_char, existing_value_len: *mut size_t, - operands_list: &[*const c_char], operands_list_len: *const size_t, + operands_list: &[*const c_char], operands_list_len: &[size_t], num_operands: c_int, success: *mut u8, new_value_length: *mut size_t ) -> *const c_char, partial_merge: extern fn( - *mut c_void, key: *const c_char, key_len: *mut size_t, - operands_list: &[*const c_char], operands_list_len: *const size_t, + arg: *mut c_void, key: *const c_char, key_len: *mut size_t, + operands_list: &[*const c_char], operands_list_len: &[size_t], num_operands: c_int, success: *mut u8, new_value_length: *mut size_t ) -> *const c_char, - delete_value: extern fn(*mut c_void, value: *const c_char, - value_len: *mut size_t) -> (), + delete_value: Option ()>, name_fn: extern fn(*mut c_void) -> *const c_char, ) -> RocksDBMergeOperator; pub fn rocksdb_mergeoperator_destroy(mo: RocksDBMergeOperator); @@ -159,7 +162,6 @@ fn internal() { let err = 0 as *mut i8; let db = rocksdb_open(opts, cpath_ptr, err); assert!(err.is_null()); - libc::free(err as *mut c_void); let writeopts = rocksdb_writeoptions_create(); let RocksDBWriteOptions(write_opt_ptr) = writeopts; @@ -170,18 +172,15 @@ fn internal() { rocksdb_put(db, writeopts, key.as_ptr(), 4, val.as_ptr(), 8, err); assert!(err.is_null()); - libc::free(err as *mut c_void); let readopts = rocksdb_readoptions_create(); let RocksDBReadOptions(read_opts_ptr) = readopts; assert!(read_opts_ptr.is_not_null()); - libc::free(err as *mut c_void); let val_len: size_t = 0; let val_len_ptr = &val_len as *const size_t; rocksdb_get(db, readopts, key.as_ptr(), 4, val_len_ptr, err); assert!(err.is_null()); - libc::free(err as *mut c_void); rocksdb_close(db); } } diff --git a/src/rocksdb.rs b/src/rocksdb.rs index fb3006b..606a0c9 100644 --- a/src/rocksdb.rs +++ b/src/rocksdb.rs @@ -1,9 +1,10 @@ extern crate libc; -use self::libc::{c_void, size_t}; +use self::libc::{c_char, c_int, c_void, size_t}; use std::io::{IoError}; use std::c_vec::CVec; use std::c_str::CString; use std::str::from_utf8; +use std::string::raw::from_buf_len; use rocksdb_ffi; @@ -121,6 +122,33 @@ impl RocksDB { } } + pub fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> { + unsafe { + let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); + let err = 0 as *mut i8; + rocksdb_ffi::rocksdb_merge(self.inner, writeopts, key.as_ptr(), + key.len() as size_t, value.as_ptr(), + value.len() as size_t, err); + if err.is_not_null() { + let cs = CString::new(err as *const i8, true); + match cs.as_str() { + Some(error_string) => + return Err(error_string.to_string()), + None => { + let ie = IoError::last_error(); + return Err(format!( + "ERROR: desc:{}, details:{}", + ie.desc, + ie.detail.unwrap_or_else( + || {"none provided by OS".to_string()}))) + } + } + } + return Ok(()) + } + } + + pub fn get<'a>(&self, key: &[u8]) -> RocksDBResult<'a, RocksDBVector, String> { unsafe { @@ -297,3 +325,70 @@ fn external() { assert!(db.get(b"k1").is_none()); db.close(); } + +extern "C" fn null_destructor(args: *mut c_void) { + println!("in null_destructor now"); +} +extern "C" fn mo_name(args: *mut c_void) -> *const c_char { + "test_mo".to_c_str().as_ptr() +} +extern "C" fn full_merge( + arg: *mut c_void, key: *const c_char, key_len: *mut size_t, + existing_value: *const c_char, existing_value_len: *mut size_t, + operands_list: &[*const c_char], operands_list_len: &[size_t], + num_operands: c_int, + success: *mut u8, new_value_length: *mut size_t) -> *const c_char { + unsafe { + println!("in the FULL merge operator right now!"); + //println!("first opt len: {}", operands_list_len[0]); + let oldkey = from_buf_len(key as *const u8, key_len as uint); + let oldval = from_buf_len(existing_value as *const u8, existing_value_len as uint); + *new_value_length = 1; + *success = 1 as u8; + } + "2".to_c_str().as_ptr() +} +extern "C" fn partial_merge( + arg: *mut c_void, key: *const c_char, key_len: *mut size_t, + operands_list: &[*const c_char], operands_list_len: &[size_t], + num_operands: c_int, + success: *mut u8, new_value_length: *mut size_t) -> *const c_char { + unsafe { + println!("in the PARTIAL merge operator right now!"); + *new_value_length = 2; + *success = 1 as u8; + } + "3".to_c_str().as_ptr() +} + + +#[allow(dead_code)] +#[zest] +fn mergetest() { + unsafe { + let opts = RocksDBOptions::new(); + let mo = rocksdb_ffi::rocksdb_mergeoperator_create( + 0 as *mut c_void, + null_destructor, + full_merge, + partial_merge, + None, + mo_name); + opts.create_if_missing(true); + opts.set_merge_operator(mo); + let db = RocksDB::open(opts, "externaltest").unwrap(); + let p = db.put(b"k1", b"1"); + //assert!(p.is_ok()); + let m = db.merge(b"k1", b"1"); + println!("after merge"); + println!("k1's value is: {}", db.get(b"k1").unwrap().to_utf8().unwrap()); + /* + assert!(m.is_ok()); + let r: RocksDBResult = db.get(b"k1"); + assert!(r.unwrap().to_utf8().unwrap() == "2"); + assert!(db.delete(b"k1").is_ok()); + assert!(db.get(b"k1").is_none()); + */ + db.close(); + } +} From 64a298ff700ff6e2fce709e215330aab4afdb942 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Sun, 30 Nov 2014 17:41:21 -0500 Subject: [PATCH 2/8] add debugging info, some proper malloc'ing --- src/ffi.rs | 2 +- src/rocksdb.rs | 55 ++++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 45 insertions(+), 12 deletions(-) diff --git a/src/ffi.rs b/src/ffi.rs index 23345cd..66e06fd 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -144,7 +144,7 @@ extern { } #[allow(dead_code)] -#[test] +#[zest] fn internal() { unsafe { let opts = rocksdb_options_create(); diff --git a/src/rocksdb.rs b/src/rocksdb.rs index 606a0c9..e3e7170 100644 --- a/src/rocksdb.rs +++ b/src/rocksdb.rs @@ -5,6 +5,7 @@ use std::c_vec::CVec; use std::c_str::CString; use std::str::from_utf8; use std::string::raw::from_buf_len; +use std::ptr; use rocksdb_ffi; @@ -164,8 +165,10 @@ impl RocksDB { let val_len: size_t = 0; let val_len_ptr = &val_len as *const size_t; let err = 0 as *mut i8; + println!("above ffi get"); let val = rocksdb_ffi::rocksdb_get(self.inner, readopts, key.as_ptr(), key.len() as size_t, val_len_ptr, err) as *mut u8; + println!("below ffi get"); if err.is_not_null() { let cs = CString::new(err as *const i8, true); match cs.as_str() { @@ -314,7 +317,7 @@ impl <'a,T,E> RocksDBResult<'a,T,E> { } #[allow(dead_code)] -#[test] +#[zest] fn external() { let db = RocksDB::open_default("externaltest").unwrap(); let p = db.put(b"k1", b"v1111"); @@ -327,10 +330,17 @@ fn external() { } extern "C" fn null_destructor(args: *mut c_void) { - println!("in null_destructor now"); + println!("in null_destructor"); } extern "C" fn mo_name(args: *mut c_void) -> *const c_char { - "test_mo".to_c_str().as_ptr() + println!("in mo_name"); + let name = "test_mo".to_c_str(); + unsafe { + let buf = libc::malloc(8 as size_t); + ptr::copy_memory(&mut *buf, name.as_ptr() as *const c_void, 8); + println!("returning from mo_name"); + buf as *const c_char + } } extern "C" fn full_merge( arg: *mut c_void, key: *const c_char, key_len: *mut size_t, @@ -339,14 +349,25 @@ extern "C" fn full_merge( num_operands: c_int, success: *mut u8, new_value_length: *mut size_t) -> *const c_char { unsafe { - println!("in the FULL merge operator right now!"); + println!("in the FULL merge operator"); //println!("first opt len: {}", operands_list_len[0]); let oldkey = from_buf_len(key as *const u8, key_len as uint); let oldval = from_buf_len(existing_value as *const u8, existing_value_len as uint); - *new_value_length = 1; - *success = 1 as u8; + let buf = libc::malloc(1 as size_t); + match buf.is_null() { + false => { + *new_value_length = 1; + *success = 1 as u8; + let newval = "2"; + ptr::copy_memory(&mut *buf, newval.as_ptr() as *const c_void, 1); + println!("returning from full_merge"); + buf as *const c_char + }, + true => { + return 0 as *const c_char; + } + } } - "2".to_c_str().as_ptr() } extern "C" fn partial_merge( arg: *mut c_void, key: *const c_char, key_len: *mut size_t, @@ -354,7 +375,7 @@ extern "C" fn partial_merge( num_operands: c_int, success: *mut u8, new_value_length: *mut size_t) -> *const c_char { unsafe { - println!("in the PARTIAL merge operator right now!"); + println!("in the PARTIAL merge operator"); *new_value_length = 2; *success = 1 as u8; } @@ -363,7 +384,7 @@ extern "C" fn partial_merge( #[allow(dead_code)] -#[zest] +#[test] fn mergetest() { unsafe { let opts = RocksDBOptions::new(); @@ -377,11 +398,23 @@ fn mergetest() { opts.create_if_missing(true); opts.set_merge_operator(mo); let db = RocksDB::open(opts, "externaltest").unwrap(); + println!("after open"); let p = db.put(b"k1", b"1"); - //assert!(p.is_ok()); + assert!(p.is_ok()); + println!("before merge"); let m = db.merge(b"k1", b"1"); + println!("m is {}", m); println!("after merge"); - println!("k1's value is: {}", db.get(b"k1").unwrap().to_utf8().unwrap()); + db.get(b"k1").map( |value| { + match value.to_utf8() { + Some(v) => + println!("retrieved utf8 value: {}", v), + None => + println!("did not read valid utf-8 out of the db"), + } + }).on_absent( || { println!("value not present!") }) + .on_error( |e| { println!("error reading value: {}", e) }); + /* assert!(m.is_ok()); let r: RocksDBResult = db.get(b"k1"); From 423f90a45d83e438d3ddb31db9079378356af828 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Sat, 6 Dec 2014 09:20:37 -0500 Subject: [PATCH 3/8] correct signature alignment for merge operator --- src/ffi.rs | 12 +++--- src/rocksdb.rs | 108 ++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 94 insertions(+), 26 deletions(-) diff --git a/src/ffi.rs b/src/ffi.rs index 66e06fd..1219276 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -121,15 +121,15 @@ extern { state: *mut c_void, destroy: extern fn(*mut c_void) -> (), full_merge: extern fn ( - arg: *mut c_void, key: *const c_char, key_len: *mut size_t, - existing_value: *const c_char, existing_value_len: *mut size_t, - operands_list: &[*const c_char], operands_list_len: &[size_t], + arg: *mut c_void, key: *const c_char, key_len: size_t, + existing_value: *const c_char, existing_value_len: size_t, + operands_list: *const *const c_char, operands_list_len: *const size_t, num_operands: c_int, success: *mut u8, new_value_length: *mut size_t ) -> *const c_char, partial_merge: extern fn( - arg: *mut c_void, key: *const c_char, key_len: *mut size_t, - operands_list: &[*const c_char], operands_list_len: &[size_t], + arg: *mut c_void, key: *const c_char, key_len: size_t, + operands_list: *const c_void, operands_list_len: *const c_void, num_operands: c_int, success: *mut u8, new_value_length: *mut size_t ) -> *const c_char, @@ -144,7 +144,7 @@ extern { } #[allow(dead_code)] -#[zest] +#[test] fn internal() { unsafe { let opts = rocksdb_options_create(); diff --git a/src/rocksdb.rs b/src/rocksdb.rs index e3e7170..63b5f85 100644 --- a/src/rocksdb.rs +++ b/src/rocksdb.rs @@ -6,6 +6,9 @@ use std::c_str::CString; use std::str::from_utf8; use std::string::raw::from_buf_len; use std::ptr; +use std::mem; +use std::num; +use std::slice; use rocksdb_ffi; @@ -165,10 +168,8 @@ impl RocksDB { let val_len: size_t = 0; let val_len_ptr = &val_len as *const size_t; let err = 0 as *mut i8; - println!("above ffi get"); let val = rocksdb_ffi::rocksdb_get(self.inner, readopts, key.as_ptr(), key.len() as size_t, val_len_ptr, err) as *mut u8; - println!("below ffi get"); if err.is_not_null() { let cs = CString::new(err as *const i8, true); match cs.as_str() { @@ -317,7 +318,7 @@ impl <'a,T,E> RocksDBResult<'a,T,E> { } #[allow(dead_code)] -#[zest] +#[test] fn external() { let db = RocksDB::open_default("externaltest").unwrap(); let p = db.put(b"k1", b"v1111"); @@ -342,21 +343,71 @@ extern "C" fn mo_name(args: *mut c_void) -> *const c_char { buf as *const c_char } } + +struct MergeOperands { + operands_list: *const *const c_char, + operands_list_len: *const size_t, + num_operands: uint, + cursor: uint, +} + +impl MergeOperands { + fn new(operands_list: *const *const c_char, operands_list_len: *const size_t, + num_operands: c_int) -> MergeOperands { + assert!(num_operands >= 0); + MergeOperands { + operands_list: operands_list, + operands_list_len: operands_list_len, + num_operands: num_operands as uint, + cursor: 0, + } + } +} + +impl <'a> Iterator<&'a [u8]> for MergeOperands { + fn next(&mut self) -> Option<&'a [u8]> { + match self.cursor == self.num_operands { + true => None, + false => { + unsafe { + let base = self.operands_list as uint; + let base_len = self.operands_list_len as uint; + let spacing = mem::size_of::<*const *const u8>(); + let spacing_len = mem::size_of::<*const size_t>(); + let len_ptr = (base_len + (spacing_len * self.cursor)) as *const size_t; + let len = *len_ptr; + println!("len: {}", len); + let ptr = base + (spacing * self.cursor); + let op = slice::from_raw_buf(*(ptr as *const &*const u8), len as uint); + self.cursor += 1; + println!("returning: {}", from_utf8(op)); + Some(op) + } + } + } + } +} + extern "C" fn full_merge( - arg: *mut c_void, key: *const c_char, key_len: *mut size_t, - existing_value: *const c_char, existing_value_len: *mut size_t, - operands_list: &[*const c_char], operands_list_len: &[size_t], + arg: *mut c_void, key: *const c_char, key_len: size_t, + existing_value: *const c_char, existing_value_len: size_t, + operands_list: *const *const c_char, operands_list_len: *const size_t, num_operands: c_int, success: *mut u8, new_value_length: *mut size_t) -> *const c_char { unsafe { println!("in the FULL merge operator"); - //println!("first opt len: {}", operands_list_len[0]); + /* + for mo in MergeOperands::new(operands_list, operands_list_len, num_operands) { + println!("buf: {}", mo); + } + */ let oldkey = from_buf_len(key as *const u8, key_len as uint); let oldval = from_buf_len(existing_value as *const u8, existing_value_len as uint); + println!("old key: {}", oldval); let buf = libc::malloc(1 as size_t); match buf.is_null() { false => { - *new_value_length = 1; + *new_value_length = 1 as size_t; *success = 1 as u8; let newval = "2"; ptr::copy_memory(&mut *buf, newval.as_ptr() as *const c_void, 1); @@ -364,27 +415,44 @@ extern "C" fn full_merge( buf as *const c_char }, true => { - return 0 as *const c_char; + println!("returning from full_merge"); + 0 as *const c_char } } } } extern "C" fn partial_merge( - arg: *mut c_void, key: *const c_char, key_len: *mut size_t, - operands_list: &[*const c_char], operands_list_len: &[size_t], + arg: *mut c_void, key: *const c_char, key_len: size_t, + operands_list: *const c_void, operands_list_len: *const c_void, num_operands: c_int, success: *mut u8, new_value_length: *mut size_t) -> *const c_char { unsafe { println!("in the PARTIAL merge operator"); *new_value_length = 2; *success = 1 as u8; + let buf = libc::malloc(1 as size_t); + match buf.is_null() { + false => { + println!("number of operands: {}", num_operands); + println!("first operand: {}", from_buf_len(operands_list as *const u8, 1)); + *new_value_length = 1 as size_t; + *success = 1 as u8; + let newval = "2"; + ptr::copy_memory(&mut *buf, newval.as_ptr() as *const c_void, 1); + println!("returning from partial_merge"); + buf as *const c_char + }, + true => { + println!("returning from partial_merge"); + 0 as *const c_char + } + } } - "3".to_c_str().as_ptr() } #[allow(dead_code)] -#[test] +#[zest] fn mergetest() { unsafe { let opts = RocksDBOptions::new(); @@ -398,12 +466,14 @@ fn mergetest() { opts.create_if_missing(true); opts.set_merge_operator(mo); let db = RocksDB::open(opts, "externaltest").unwrap(); - println!("after open"); let p = db.put(b"k1", b"1"); assert!(p.is_ok()); - println!("before merge"); - let m = db.merge(b"k1", b"1"); - println!("m is {}", m); + db.merge(b"k1", b"10"); + db.merge(b"k1", b"2"); + db.merge(b"k1", b"3"); + db.merge(b"k1", b"4"); + let m = db.merge(b"k1", b"5"); + assert!(m.is_ok()); println!("after merge"); db.get(b"k1").map( |value| { match value.to_utf8() { @@ -413,15 +483,13 @@ fn mergetest() { println!("did not read valid utf-8 out of the db"), } }).on_absent( || { println!("value not present!") }) - .on_error( |e| { println!("error reading value: {}", e) }); + .on_error( |e| { println!("error reading value")}); //: {}", e) }); - /* assert!(m.is_ok()); let r: RocksDBResult = db.get(b"k1"); assert!(r.unwrap().to_utf8().unwrap() == "2"); assert!(db.delete(b"k1").is_ok()); assert!(db.get(b"k1").is_none()); - */ db.close(); } } From 2d35e4de8d4f96066a2b86038ba1da35e190dee5 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Sat, 6 Dec 2014 18:27:42 -0500 Subject: [PATCH 4/8] add destroy db so that we can clean up after tests --- src/ffi.rs | 5 +++-- src/rocksdb.rs | 35 ++++++++++++++++++++++++++++++++--- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/src/ffi.rs b/src/ffi.rs index 1219276..d44fd9c 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -112,7 +112,6 @@ extern { options: RocksDBOptions, path: *const i8, err: *mut i8); pub fn rocksdb_repair_db( options: RocksDBOptions, path: *const i8, err: *mut i8); - // Merge pub fn rocksdb_merge(db: RocksDBInstance, writeopts: RocksDBWriteOptions, k: *const u8, kLen: size_t, @@ -155,7 +154,7 @@ fn internal() { rocksdb_options_optimize_level_style_compaction(opts, 0); rocksdb_options_set_create_if_missing(opts, 1); - let rustpath = "internaltest"; + let rustpath = "_rust_rocksdb_internaltest"; let cpath = rustpath.to_c_str(); let cpath_ptr = cpath.as_ptr(); @@ -182,5 +181,7 @@ fn internal() { rocksdb_get(db, readopts, key.as_ptr(), 4, val_len_ptr, err); assert!(err.is_null()); rocksdb_close(db); + rocksdb_destroy_db(opts, cpath_ptr, err); + assert!(err.is_null()); } } diff --git a/src/rocksdb.rs b/src/rocksdb.rs index 63b5f85..9857872 100644 --- a/src/rocksdb.rs +++ b/src/rocksdb.rs @@ -99,6 +99,29 @@ impl RocksDB { Ok(RocksDB{inner: db}) } } + + pub fn destroy(opts: RocksDBOptions, path: &str) -> Result<(), String> { + unsafe { + let cpath = path.to_c_str(); + let cpath_ptr = cpath.as_ptr(); + + //TODO test path here, as if rocksdb fails it will just crash the + // process currently + + let err = 0 as *mut i8; + let result = rocksdb_ffi::rocksdb_destroy_db(opts.inner, cpath_ptr, err); + if err.is_not_null() { + let cs = CString::new(err as *const i8, true); + match cs.as_str() { + Some(error_string) => + return Err(error_string.to_string()), + None => + return Err("Could not initialize database.".to_string()), + } + } + Ok(()) + } + } pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> { unsafe { @@ -320,7 +343,8 @@ impl <'a,T,E> RocksDBResult<'a,T,E> { #[allow(dead_code)] #[test] fn external() { - let db = RocksDB::open_default("externaltest").unwrap(); + let path = "_rust_rocksdb_externaltest"; + let db = RocksDB::open_default(path).unwrap(); let p = db.put(b"k1", b"v1111"); assert!(p.is_ok()); let r: RocksDBResult = db.get(b"k1"); @@ -328,6 +352,8 @@ fn external() { assert!(db.delete(b"k1").is_ok()); assert!(db.get(b"k1").is_none()); db.close(); + let opts = RocksDBOptions::new(); + assert!(RocksDB::destroy(opts, path).is_ok()); } extern "C" fn null_destructor(args: *mut c_void) { @@ -452,8 +478,9 @@ extern "C" fn partial_merge( #[allow(dead_code)] -#[zest] +#[test] fn mergetest() { + let path = "_rust_rocksdb_mergetest"; unsafe { let opts = RocksDBOptions::new(); let mo = rocksdb_ffi::rocksdb_mergeoperator_create( @@ -465,7 +492,8 @@ fn mergetest() { mo_name); opts.create_if_missing(true); opts.set_merge_operator(mo); - let db = RocksDB::open(opts, "externaltest").unwrap(); + let db = RocksDB::open(opts, path).unwrap(); + println!("here!"); let p = db.put(b"k1", b"1"); assert!(p.is_ok()); db.merge(b"k1", b"10"); @@ -491,5 +519,6 @@ fn mergetest() { assert!(db.delete(b"k1").is_ok()); assert!(db.get(b"k1").is_none()); db.close(); + assert!(RocksDB::destroy(opts, path).is_ok()); } } From 12c7afad9ec2701e768ca12db019d36c8f7fc7f2 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Sun, 7 Dec 2014 16:56:39 -0500 Subject: [PATCH 5/8] added operand iterator, beginnings of rustic merge operator --- src/rocksdb.rs | 57 +++++++++++++++++++++++++++++--------------------- 1 file changed, 33 insertions(+), 24 deletions(-) diff --git a/src/rocksdb.rs b/src/rocksdb.rs index 9857872..0047017 100644 --- a/src/rocksdb.rs +++ b/src/rocksdb.rs @@ -7,7 +7,6 @@ use std::str::from_utf8; use std::string::raw::from_buf_len; use std::ptr; use std::mem; -use std::num; use std::slice; use rocksdb_ffi; @@ -359,27 +358,27 @@ fn external() { extern "C" fn null_destructor(args: *mut c_void) { println!("in null_destructor"); } -extern "C" fn mo_name(args: *mut c_void) -> *const c_char { - println!("in mo_name"); +extern "C" fn mergeoperator_name(args: *mut c_void) -> *const c_char { + println!("in mergeoperator_name"); let name = "test_mo".to_c_str(); unsafe { let buf = libc::malloc(8 as size_t); ptr::copy_memory(&mut *buf, name.as_ptr() as *const c_void, 8); - println!("returning from mo_name"); + println!("returning from mergeoperator_name"); buf as *const c_char } } -struct MergeOperands { +struct MergeOperands<'a> { operands_list: *const *const c_char, operands_list_len: *const size_t, num_operands: uint, cursor: uint, } -impl MergeOperands { - fn new(operands_list: *const *const c_char, operands_list_len: *const size_t, - num_operands: c_int) -> MergeOperands { +impl <'a> MergeOperands<'a> { + fn new<'a>(operands_list: *const *const c_char, operands_list_len: *const size_t, + num_operands: c_int) -> MergeOperands<'a> { assert!(num_operands >= 0); MergeOperands { operands_list: operands_list, @@ -390,8 +389,9 @@ impl MergeOperands { } } -impl <'a> Iterator<&'a [u8]> for MergeOperands { +impl <'a> Iterator<&'a [u8]> for MergeOperands<'a> { fn next(&mut self) -> Option<&'a [u8]> { + use std::raw::Slice; match self.cursor == self.num_operands { true => None, false => { @@ -401,17 +401,21 @@ impl <'a> Iterator<&'a [u8]> for MergeOperands { let spacing = mem::size_of::<*const *const u8>(); let spacing_len = mem::size_of::<*const size_t>(); let len_ptr = (base_len + (spacing_len * self.cursor)) as *const size_t; - let len = *len_ptr; - println!("len: {}", len); + let len = *len_ptr as uint; let ptr = base + (spacing * self.cursor); - let op = slice::from_raw_buf(*(ptr as *const &*const u8), len as uint); + let op = from_buf_len(*(ptr as *const *const u8), len); + let des: Option = from_str(op.as_slice()); self.cursor += 1; - println!("returning: {}", from_utf8(op)); - Some(op) + Some(mem::transmute(Slice{data:*(ptr as *const *const u8) as *const u8, len: len})) } } } } + + fn size_hint(&self) -> (uint, Option) { + let remaining = self.num_operands - self.cursor; + (remaining, Some(remaining)) + } } extern "C" fn full_merge( @@ -422,14 +426,14 @@ extern "C" fn full_merge( success: *mut u8, new_value_length: *mut size_t) -> *const c_char { unsafe { println!("in the FULL merge operator"); - /* - for mo in MergeOperands::new(operands_list, operands_list_len, num_operands) { - println!("buf: {}", mo); - } - */ - let oldkey = from_buf_len(key as *const u8, key_len as uint); + let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); + let key = from_buf_len(key as *const u8, key_len as uint); let oldval = from_buf_len(existing_value as *const u8, existing_value_len as uint); - println!("old key: {}", oldval); + + println!("returning from FULL merge"); + //TODO rust will "free" this when it goes out of scope, copy this to a non-gc'd buffer to return + merge(key, Some(oldval), operands); + let buf = libc::malloc(1 as size_t); match buf.is_null() { false => { @@ -476,6 +480,13 @@ extern "C" fn partial_merge( } } +fn merge<'a>(new_key: String, existing_val: Option, operands: &mut MergeOperands) -> &'a [u8] { + for op in *operands { + println!("op: {}", from_utf8(op)); + } + + "yoyo".as_bytes() +} #[allow(dead_code)] #[test] @@ -489,11 +500,10 @@ fn mergetest() { full_merge, partial_merge, None, - mo_name); + mergeoperator_name); opts.create_if_missing(true); opts.set_merge_operator(mo); let db = RocksDB::open(opts, path).unwrap(); - println!("here!"); let p = db.put(b"k1", b"1"); assert!(p.is_ok()); db.merge(b"k1", b"10"); @@ -502,7 +512,6 @@ fn mergetest() { db.merge(b"k1", b"4"); let m = db.merge(b"k1", b"5"); assert!(m.is_ok()); - println!("after merge"); db.get(b"k1").map( |value| { match value.to_utf8() { Some(v) => From 5f0b895cca2ff5c0e584f09db21b1c1257cd19a5 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Mon, 8 Dec 2014 01:31:27 -0500 Subject: [PATCH 6/8] more thrashing about for merge operator support --- src/ffi.rs | 2 +- src/rocksdb.rs | 186 ++++++++++++++++++++++++++++--------------------- 2 files changed, 106 insertions(+), 82 deletions(-) diff --git a/src/ffi.rs b/src/ffi.rs index d44fd9c..dfca528 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -128,7 +128,7 @@ extern { ) -> *const c_char, partial_merge: extern fn( arg: *mut c_void, key: *const c_char, key_len: size_t, - operands_list: *const c_void, operands_list_len: *const c_void, + operands_list: *const *const c_char, operands_list_len: *const size_t, num_operands: c_int, success: *mut u8, new_value_length: *mut size_t ) -> *const c_char, diff --git a/src/rocksdb.rs b/src/rocksdb.rs index 0047017..c855918 100644 --- a/src/rocksdb.rs +++ b/src/rocksdb.rs @@ -54,9 +54,10 @@ impl RocksDBOptions { } } - pub fn set_merge_operator(&self, mo: rocksdb_ffi::RocksDBMergeOperator) { + pub fn add_merge_operator(&self, name: &[str], merge_fn: for <'b> fn (String, Option, &mut MergeOperands) -> &'b [u8]) { unsafe { - rocksdb_ffi::rocksdb_options_set_merge_operator(self.inner, mo); + let mo = MergeOperator::new(name, merge_fn); + rocksdb_ffi::rocksdb_options_set_merge_operator(self.inner, mo.mo); } } } @@ -355,20 +356,6 @@ fn external() { assert!(RocksDB::destroy(opts, path).is_ok()); } -extern "C" fn null_destructor(args: *mut c_void) { - println!("in null_destructor"); -} -extern "C" fn mergeoperator_name(args: *mut c_void) -> *const c_char { - println!("in mergeoperator_name"); - let name = "test_mo".to_c_str(); - unsafe { - let buf = libc::malloc(8 as size_t); - ptr::copy_memory(&mut *buf, name.as_ptr() as *const c_void, 8); - println!("returning from mergeoperator_name"); - buf as *const c_char - } -} - struct MergeOperands<'a> { operands_list: *const *const c_char, operands_list_len: *const size_t, @@ -389,7 +376,7 @@ impl <'a> MergeOperands<'a> { } } -impl <'a> Iterator<&'a [u8]> for MergeOperands<'a> { +impl <'a> Iterator<&'a [u8]> for &'a mut MergeOperands<'a> { fn next(&mut self) -> Option<&'a [u8]> { use std::raw::Slice; match self.cursor == self.num_operands { @@ -418,70 +405,114 @@ impl <'a> Iterator<&'a [u8]> for MergeOperands<'a> { } } -extern "C" fn full_merge( - arg: *mut c_void, key: *const c_char, key_len: size_t, - existing_value: *const c_char, existing_value_len: size_t, - operands_list: *const *const c_char, operands_list_len: *const size_t, - num_operands: c_int, - success: *mut u8, new_value_length: *mut size_t) -> *const c_char { - unsafe { - println!("in the FULL merge operator"); - let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); - let key = from_buf_len(key as *const u8, key_len as uint); - let oldval = from_buf_len(existing_value as *const u8, existing_value_len as uint); +struct MergeOperatorState<'a> { + name: &'a [str], + merge_fn: for <'b> fn (String, Option, &mut MergeOperands) -> &'b [u8], +} - println!("returning from FULL merge"); - //TODO rust will "free" this when it goes out of scope, copy this to a non-gc'd buffer to return - merge(key, Some(oldval), operands); +struct MergeOperator<'a> { + mo: rocksdb_ffi::RocksDBMergeOperator, + state: MergeOperatorState<'a>, +} - let buf = libc::malloc(1 as size_t); - match buf.is_null() { - false => { - *new_value_length = 1 as size_t; - *success = 1 as u8; - let newval = "2"; - ptr::copy_memory(&mut *buf, newval.as_ptr() as *const c_void, 1); - println!("returning from full_merge"); - buf as *const c_char - }, - true => { - println!("returning from full_merge"); - 0 as *const c_char - } +impl <'a> MergeOperator<'a> { + pub fn new<'a>(name: &'a [str], merge_fn: for <'b> fn (String, Option, &mut MergeOperands) -> &'b [u8]) -> &'a MergeOperator<'a> { + let state = &MergeOperatorState { + name: name, + merge_fn: merge_fn, + }; + + let ffi_operator = rocksdb_ffi::rocksdb_mergeoperator_create( + state as *mut c_void, + state.null_destructor, + state.full_merge, + state.partial_merge, + None, + state.mergeoperator_name); + + &MergeOperator { + mo: ffi_operator, + state: state, } } } -extern "C" fn partial_merge( - arg: *mut c_void, key: *const c_char, key_len: size_t, - operands_list: *const c_void, operands_list_len: *const c_void, - num_operands: c_int, - success: *mut u8, new_value_length: *mut size_t) -> *const c_char { - unsafe { - println!("in the PARTIAL merge operator"); - *new_value_length = 2; - *success = 1 as u8; - let buf = libc::malloc(1 as size_t); - match buf.is_null() { - false => { - println!("number of operands: {}", num_operands); - println!("first operand: {}", from_buf_len(operands_list as *const u8, 1)); - *new_value_length = 1 as size_t; - *success = 1 as u8; - let newval = "2"; - ptr::copy_memory(&mut *buf, newval.as_ptr() as *const c_void, 1); - println!("returning from partial_merge"); - buf as *const c_char - }, - true => { - println!("returning from partial_merge"); - 0 as *const c_char - } + +impl <'a> MergeOperatorState<'a> { + + extern "C" fn null_destructor(&self) { + println!("in null_destructor"); + } + + extern "C" fn mergeoperator_name(&self) -> *const c_char { + println!("in mergeoperator_name"); + let name = self.name.to_c_str(); + unsafe { + let buf = libc::malloc(8 as size_t); + ptr::copy_memory(&mut *buf, name.as_ptr() as *const c_void, 8); + println!("returning from mergeoperator_name"); + buf as *const c_char + } + } + + extern "C" fn full_merge( + &self, key: *const c_char, key_len: size_t, + existing_value: *const c_char, existing_value_len: size_t, + operands_list: *const *const c_char, operands_list_len: *const size_t, + num_operands: c_int, + success: *mut u8, new_value_length: *mut size_t) -> *const c_char { + unsafe { + println!("in the FULL merge operator"); + let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); + let key = from_buf_len(key as *const u8, key_len as uint); + let oldval = from_buf_len(existing_value as *const u8, existing_value_len as uint); + let result = self.merge_fn(key, Some(oldval), operands); + + let buf = libc::malloc(result.len() as size_t); + assert!(buf.is_not_null()); + *new_value_length = 1 as size_t; + *success = 1 as u8; + let newval = "2"; + ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len()); + println!("returning from full_merge"); + buf as *const c_char + } + } + + extern "C" fn partial_merge( + &self, key: *const c_char, key_len: size_t, + operands_list: *const *const c_char, operands_list_len: *const size_t, + num_operands: c_int, + success: *mut u8, new_value_length: *mut size_t) -> *const c_char { + unsafe { + println!("in the PARTIAL merge operator"); + let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); + let key = from_buf_len(key as *const u8, key_len as uint); + let result = self.merge_fn(key, None, operands); + + let buf = libc::malloc(result.len() as size_t); + assert!(buf.is_not_null()); + *new_value_length = 1 as size_t; + *success = 1 as u8; + let newval = "2"; + ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len()); + buf as *const c_char } } } -fn merge<'a>(new_key: String, existing_val: Option, operands: &mut MergeOperands) -> &'a [u8] { - for op in *operands { +fn create_full_merge(provided_merge: for<'a> fn (new_key: String, existing_val: Option, + mut operands: &mut MergeOperands) -> &'a [u8]) { + +} + +fn create_partial_merge(provided_merge: for<'a> fn (new_key: String, existing_val: Option, + mut operands: &mut MergeOperands) -> &'a [u8]) { + +} + +fn test_provided_merge<'a>(new_key: String, existing_val: Option, + mut operands: &mut MergeOperands) -> &'a [u8] { + for op in operands { println!("op: {}", from_utf8(op)); } @@ -494,15 +525,8 @@ fn mergetest() { let path = "_rust_rocksdb_mergetest"; unsafe { let opts = RocksDBOptions::new(); - let mo = rocksdb_ffi::rocksdb_mergeoperator_create( - 0 as *mut c_void, - null_destructor, - full_merge, - partial_merge, - None, - mergeoperator_name); opts.create_if_missing(true); - opts.set_merge_operator(mo); + opts.add_merge_operator("test operator", test_provided_merge); let db = RocksDB::open(opts, path).unwrap(); let p = db.put(b"k1", b"1"); assert!(p.is_ok()); @@ -524,7 +548,7 @@ fn mergetest() { assert!(m.is_ok()); let r: RocksDBResult = db.get(b"k1"); - assert!(r.unwrap().to_utf8().unwrap() == "2"); + //assert!(r.unwrap().to_utf8().unwrap() == "yoyo"); assert!(db.delete(b"k1").is_ok()); assert!(db.get(b"k1").is_none()); db.close(); From 6eb01eb07c20cd758719e5c3561804771074f24c Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Sat, 13 Dec 2014 15:58:23 -0500 Subject: [PATCH 7/8] basic rustic merge operator --- src/rocksdb.rs | 189 +++++++++++++++++++++---------------------------- 1 file changed, 81 insertions(+), 108 deletions(-) diff --git a/src/rocksdb.rs b/src/rocksdb.rs index c855918..02db25e 100644 --- a/src/rocksdb.rs +++ b/src/rocksdb.rs @@ -54,10 +54,21 @@ impl RocksDBOptions { } } - pub fn add_merge_operator(&self, name: &[str], merge_fn: for <'b> fn (String, Option, &mut MergeOperands) -> &'b [u8]) { + pub fn add_merge_operator<'a>(&self, name: &str, merge_fn: for <'a> fn (String, Option, &mut MergeOperands) -> Vec) { + let cb = box MergeOperatorCallback { + name: name.to_c_str(), + merge_fn: merge_fn, + }; + unsafe { - let mo = MergeOperator::new(name, merge_fn); - rocksdb_ffi::rocksdb_options_set_merge_operator(self.inner, mo.mo); + let mo = rocksdb_ffi::rocksdb_mergeoperator_create( + mem::transmute(cb), + destructor_callback, + full_merge_callback, + partial_merge_callback, + None, + name_callback); + rocksdb_ffi::rocksdb_options_set_merge_operator(self.inner, mo); } } } @@ -356,7 +367,7 @@ fn external() { assert!(RocksDB::destroy(opts, path).is_ok()); } -struct MergeOperands<'a> { +pub struct MergeOperands<'a> { operands_list: *const *const c_char, operands_list_len: *const size_t, num_operands: uint, @@ -405,118 +416,80 @@ impl <'a> Iterator<&'a [u8]> for &'a mut MergeOperands<'a> { } } -struct MergeOperatorState<'a> { - name: &'a [str], - merge_fn: for <'b> fn (String, Option, &mut MergeOperands) -> &'b [u8], +struct MergeOperatorCallback { + name: CString, + merge_fn: for <'b> fn (String, Option, &mut MergeOperands) -> Vec, } -struct MergeOperator<'a> { - mo: rocksdb_ffi::RocksDBMergeOperator, - state: MergeOperatorState<'a>, +extern "C" fn destructor_callback(raw_cb: *mut c_void) { + // turn this back into a local variable so rust will reclaim it + let _: Box = unsafe {mem::transmute(raw_cb)}; + } -impl <'a> MergeOperator<'a> { - pub fn new<'a>(name: &'a [str], merge_fn: for <'b> fn (String, Option, &mut MergeOperands) -> &'b [u8]) -> &'a MergeOperator<'a> { - let state = &MergeOperatorState { - name: name, - merge_fn: merge_fn, - }; - - let ffi_operator = rocksdb_ffi::rocksdb_mergeoperator_create( - state as *mut c_void, - state.null_destructor, - state.full_merge, - state.partial_merge, - None, - state.mergeoperator_name); - - &MergeOperator { - mo: ffi_operator, - state: state, - } +extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char { + unsafe { + let cb: &mut MergeOperatorCallback = &mut *(raw_cb as *mut MergeOperatorCallback); + let ptr = cb.name.as_ptr(); + ptr as *const c_char } } -impl <'a> MergeOperatorState<'a> { - - extern "C" fn null_destructor(&self) { - println!("in null_destructor"); - } - - extern "C" fn mergeoperator_name(&self) -> *const c_char { - println!("in mergeoperator_name"); - let name = self.name.to_c_str(); - unsafe { - let buf = libc::malloc(8 as size_t); - ptr::copy_memory(&mut *buf, name.as_ptr() as *const c_void, 8); - println!("returning from mergeoperator_name"); - buf as *const c_char - } - } - - extern "C" fn full_merge( - &self, key: *const c_char, key_len: size_t, - existing_value: *const c_char, existing_value_len: size_t, - operands_list: *const *const c_char, operands_list_len: *const size_t, - num_operands: c_int, - success: *mut u8, new_value_length: *mut size_t) -> *const c_char { - unsafe { - println!("in the FULL merge operator"); - let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); - let key = from_buf_len(key as *const u8, key_len as uint); - let oldval = from_buf_len(existing_value as *const u8, existing_value_len as uint); - let result = self.merge_fn(key, Some(oldval), operands); - - let buf = libc::malloc(result.len() as size_t); - assert!(buf.is_not_null()); - *new_value_length = 1 as size_t; - *success = 1 as u8; - let newval = "2"; - ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len()); - println!("returning from full_merge"); - buf as *const c_char - } - } - - extern "C" fn partial_merge( - &self, key: *const c_char, key_len: size_t, - operands_list: *const *const c_char, operands_list_len: *const size_t, - num_operands: c_int, - success: *mut u8, new_value_length: *mut size_t) -> *const c_char { - unsafe { - println!("in the PARTIAL merge operator"); - let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); - let key = from_buf_len(key as *const u8, key_len as uint); - let result = self.merge_fn(key, None, operands); - - let buf = libc::malloc(result.len() as size_t); - assert!(buf.is_not_null()); - *new_value_length = 1 as size_t; - *success = 1 as u8; - let newval = "2"; - ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len()); - buf as *const c_char - } +extern "C" fn full_merge_callback( + raw_cb: *mut c_void, key: *const c_char, key_len: size_t, + existing_value: *const c_char, existing_value_len: size_t, + operands_list: *const *const c_char, operands_list_len: *const size_t, + num_operands: c_int, + success: *mut u8, new_value_length: *mut size_t) -> *const c_char { + unsafe { + let cb: &mut MergeOperatorCallback = &mut *(raw_cb as *mut MergeOperatorCallback); + let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); + let key = from_buf_len(key as *const u8, key_len as uint); + let oldval = from_buf_len(existing_value as *const u8, existing_value_len as uint); + let mut result = (cb.merge_fn)(key, Some(oldval), operands); + result.shrink_to_fit(); + //TODO(tan) investigate zero-copy techniques to improve performance + let buf = libc::malloc(result.len() as size_t); + assert!(buf.is_not_null()); + *new_value_length = result.len() as size_t; + *success = 1 as u8; + ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len()); + buf as *const c_char } } -fn create_full_merge(provided_merge: for<'a> fn (new_key: String, existing_val: Option, - mut operands: &mut MergeOperands) -> &'a [u8]) { - -} - -fn create_partial_merge(provided_merge: for<'a> fn (new_key: String, existing_val: Option, - mut operands: &mut MergeOperands) -> &'a [u8]) { - +extern "C" fn partial_merge_callback( + raw_cb: *mut c_void, key: *const c_char, key_len: size_t, + operands_list: *const *const c_char, operands_list_len: *const size_t, + num_operands: c_int, + success: *mut u8, new_value_length: *mut size_t) -> *const c_char { + unsafe { + let cb: &mut MergeOperatorCallback = &mut *(raw_cb as *mut MergeOperatorCallback); + let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); + let key = from_buf_len(key as *const u8, key_len as uint); + let mut result = (cb.merge_fn)(key, None, operands); + result.shrink_to_fit(); + //TODO(tan) investigate zero-copy techniques to improve performance + let buf = libc::malloc(result.len() as size_t); + assert!(buf.is_not_null()); + *new_value_length = 1 as size_t; + *success = 1 as u8; + ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len()); + buf as *const c_char + } } fn test_provided_merge<'a>(new_key: String, existing_val: Option, - mut operands: &mut MergeOperands) -> &'a [u8] { + mut operands: &mut MergeOperands) -> Vec { + let mut result: Vec = Vec::with_capacity(operands.size_hint().val0()); + match existing_val { + Some(v) => result.push_all(v.as_bytes()), + None => (), + } for op in operands { - println!("op: {}", from_utf8(op)); + result.push_all(op); } - - "yoyo".as_bytes() + result } #[allow(dead_code)] @@ -528,13 +501,13 @@ fn mergetest() { opts.create_if_missing(true); opts.add_merge_operator("test operator", test_provided_merge); let db = RocksDB::open(opts, path).unwrap(); - let p = db.put(b"k1", b"1"); + let p = db.put(b"k1", b"a"); assert!(p.is_ok()); - db.merge(b"k1", b"10"); - db.merge(b"k1", b"2"); - db.merge(b"k1", b"3"); - db.merge(b"k1", b"4"); - let m = db.merge(b"k1", b"5"); + db.merge(b"k1", b"b"); + db.merge(b"k1", b"c"); + db.merge(b"k1", b"d"); + db.merge(b"k1", b"efg"); + let m = db.merge(b"k1", b"h"); assert!(m.is_ok()); db.get(b"k1").map( |value| { match value.to_utf8() { @@ -548,7 +521,7 @@ fn mergetest() { assert!(m.is_ok()); let r: RocksDBResult = db.get(b"k1"); - //assert!(r.unwrap().to_utf8().unwrap() == "yoyo"); + assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh"); assert!(db.delete(b"k1").is_ok()); assert!(db.get(b"k1").is_none()); db.close(); From 06e5e44b67741f1c7df3b2c43c206f0c0fb1c180 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Sat, 13 Dec 2014 16:04:22 -0500 Subject: [PATCH 8/8] training wheels off. &[u8] not String --- src/rocksdb.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/rocksdb.rs b/src/rocksdb.rs index 02db25e..b732955 100644 --- a/src/rocksdb.rs +++ b/src/rocksdb.rs @@ -54,7 +54,7 @@ impl RocksDBOptions { } } - pub fn add_merge_operator<'a>(&self, name: &str, merge_fn: for <'a> fn (String, Option, &mut MergeOperands) -> Vec) { + pub fn add_merge_operator<'a>(&self, name: &str, merge_fn: for <'a> fn (&[u8], Option<&[u8]>, &mut MergeOperands) -> Vec) { let cb = box MergeOperatorCallback { name: name.to_c_str(), merge_fn: merge_fn, @@ -418,7 +418,7 @@ impl <'a> Iterator<&'a [u8]> for &'a mut MergeOperands<'a> { struct MergeOperatorCallback { name: CString, - merge_fn: for <'b> fn (String, Option, &mut MergeOperands) -> Vec, + merge_fn: for <'b> fn (&[u8], Option<&[u8]>, &mut MergeOperands) -> Vec, } extern "C" fn destructor_callback(raw_cb: *mut c_void) { @@ -446,7 +446,7 @@ extern "C" fn full_merge_callback( let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); let key = from_buf_len(key as *const u8, key_len as uint); let oldval = from_buf_len(existing_value as *const u8, existing_value_len as uint); - let mut result = (cb.merge_fn)(key, Some(oldval), operands); + let mut result = (cb.merge_fn)(key.as_bytes(), Some(oldval.as_bytes()), operands); result.shrink_to_fit(); //TODO(tan) investigate zero-copy techniques to improve performance let buf = libc::malloc(result.len() as size_t); @@ -467,7 +467,7 @@ extern "C" fn partial_merge_callback( let cb: &mut MergeOperatorCallback = &mut *(raw_cb as *mut MergeOperatorCallback); let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); let key = from_buf_len(key as *const u8, key_len as uint); - let mut result = (cb.merge_fn)(key, None, operands); + let mut result = (cb.merge_fn)(key.as_bytes(), None, operands); result.shrink_to_fit(); //TODO(tan) investigate zero-copy techniques to improve performance let buf = libc::malloc(result.len() as size_t); @@ -479,11 +479,11 @@ extern "C" fn partial_merge_callback( } } -fn test_provided_merge<'a>(new_key: String, existing_val: Option, +fn test_provided_merge<'a>(new_key: &[u8], existing_val: Option<&[u8]>, mut operands: &mut MergeOperands) -> Vec { let mut result: Vec = Vec::with_capacity(operands.size_hint().val0()); match existing_val { - Some(v) => result.push_all(v.as_bytes()), + Some(v) => result.push_all(v), None => (), } for op in operands {