From 023dd07b1c3a9b6efda7dab194ee357cb667f79c Mon Sep 17 00:00:00 2001 From: BoOTheFurious <74402778+BoOTheFurious@users.noreply.github.com> Date: Tue, 8 Dec 2020 16:43:10 +0100 Subject: [PATCH] Fix logic related to merge operator settings (#481) --- src/db_options.rs | 48 +++++++++++++++++------ src/merge_operator.rs | 36 +++++++++++------ tests/test_column_family.rs | 8 ++-- tests/test_merge_operator.rs | 76 ++++++++++++++++++++++++++++++++++-- 4 files changed, 136 insertions(+), 32 deletions(-) diff --git a/src/db_options.rs b/src/db_options.rs index c92027e..6777d08 100644 --- a/src/db_options.rs +++ b/src/db_options.rs @@ -929,26 +929,50 @@ impl Options { } } - pub fn set_merge_operator( + pub fn set_merge_operator_associative( &mut self, name: &str, - full_merge_fn: MergeFn, - partial_merge_fn: Option, + full_merge_fn: F, + ) { + let cb = Box::new(MergeOperatorCallback { + name: CString::new(name.as_bytes()).unwrap(), + full_merge_fn: full_merge_fn.clone(), + partial_merge_fn: full_merge_fn, + }); + + unsafe { + let mo = ffi::rocksdb_mergeoperator_create( + Box::into_raw(cb) as _, + Some(merge_operator::destructor_callback::), + Some(full_merge_callback::), + Some(partial_merge_callback::), + None, + Some(merge_operator::name_callback::), + ); + ffi::rocksdb_options_set_merge_operator(self.inner, mo); + } + } + + pub fn set_merge_operator( + &mut self, + name: &str, + full_merge_fn: F, + partial_merge_fn: PF, ) { let cb = Box::new(MergeOperatorCallback { name: CString::new(name.as_bytes()).unwrap(), full_merge_fn, - partial_merge_fn: partial_merge_fn.unwrap_or(full_merge_fn), + partial_merge_fn, }); unsafe { let mo = ffi::rocksdb_mergeoperator_create( - mem::transmute(cb), - Some(merge_operator::destructor_callback), - Some(full_merge_callback), - Some(partial_merge_callback), - Some(merge_operator::delete_callback), - Some(merge_operator::name_callback), + Box::into_raw(cb) as _, + Some(merge_operator::destructor_callback::), + Some(full_merge_callback::), + Some(partial_merge_callback::), + None, + Some(merge_operator::name_callback::), ); ffi::rocksdb_options_set_merge_operator(self.inner, mo); } @@ -958,8 +982,8 @@ impl Options { since = "0.5.0", note = "add_merge_operator has been renamed to set_merge_operator" )] - pub fn add_merge_operator(&mut self, name: &str, merge_fn: MergeFn) { - self.set_merge_operator(name, merge_fn, None); + pub fn add_merge_operator(&mut self, name: &str, merge_fn: F) { + self.set_merge_operator_associative(name, merge_fn); } /// Sets a compaction filter used to determine if entries should be kept, changed, diff --git a/src/merge_operator.rs b/src/merge_operator.rs index bdee01a..e78a23b 100644 --- a/src/merge_operator.rs +++ b/src/merge_operator.rs @@ -41,7 +41,7 @@ //!let mut opts = Options::default(); //! //!opts.create_if_missing(true); -//!opts.set_merge_operator("test operator", concat_merge, None); +//!opts.set_merge_operator_associative("test operator", concat_merge); //!{ //! let db = DB::open(&opts, path).unwrap(); //! let p = db.put(b"k1", b"a"); @@ -61,16 +61,24 @@ use std::mem; use std::ptr; use std::slice; -pub type MergeFn = fn(&[u8], Option<&[u8]>, &mut MergeOperands) -> Option>; +pub trait MergeFn: + Fn(&[u8], Option<&[u8]>, &mut MergeOperands) -> Option> + Send + Sync + 'static +{ +} +impl MergeFn for F where + F: Fn(&[u8], Option<&[u8]>, &mut MergeOperands) -> Option> + Send + Sync + 'static +{ +} -pub struct MergeOperatorCallback { +pub struct MergeOperatorCallback { pub name: CString, - pub full_merge_fn: MergeFn, - pub partial_merge_fn: MergeFn, + pub full_merge_fn: F, + pub partial_merge_fn: PF, } -pub unsafe extern "C" fn destructor_callback(raw_cb: *mut c_void) { - let _: Box = mem::transmute(raw_cb); +pub unsafe extern "C" fn destructor_callback(raw_cb: *mut c_void) { + let _: Box> = + Box::from_raw(raw_cb as *mut MergeOperatorCallback); } pub unsafe extern "C" fn delete_callback( @@ -86,12 +94,14 @@ pub unsafe extern "C" fn delete_callback( } } -pub unsafe extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char { - let cb = &mut *(raw_cb as *mut MergeOperatorCallback); +pub unsafe extern "C" fn name_callback( + raw_cb: *mut c_void, +) -> *const c_char { + let cb = &mut *(raw_cb as *mut MergeOperatorCallback); cb.name.as_ptr() } -pub unsafe extern "C" fn full_merge_callback( +pub unsafe extern "C" fn full_merge_callback( raw_cb: *mut c_void, raw_key: *const c_char, key_len: size_t, @@ -103,7 +113,7 @@ pub unsafe extern "C" fn full_merge_callback( success: *mut u8, new_value_length: *mut size_t, ) -> *mut c_char { - let cb = &mut *(raw_cb as *mut MergeOperatorCallback); + let cb = &mut *(raw_cb as *mut MergeOperatorCallback); let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize); let oldval = if existing_value.is_null() { @@ -128,7 +138,7 @@ pub unsafe extern "C" fn full_merge_callback( ) } -pub unsafe extern "C" fn partial_merge_callback( +pub unsafe extern "C" fn partial_merge_callback( raw_cb: *mut c_void, raw_key: *const c_char, key_len: size_t, @@ -138,7 +148,7 @@ pub unsafe extern "C" fn partial_merge_callback( success: *mut u8, new_value_length: *mut size_t, ) -> *mut c_char { - let cb = &mut *(raw_cb as *mut MergeOperatorCallback); + let cb = &mut *(raw_cb as *mut MergeOperatorCallback); let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize); (cb.partial_merge_fn)(key, None, operands).map_or_else( diff --git a/tests/test_column_family.rs b/tests/test_column_family.rs index 16afef0..95bb138 100644 --- a/tests/test_column_family.rs +++ b/tests/test_column_family.rs @@ -27,7 +27,7 @@ fn test_column_family() { { let mut opts = Options::default(); opts.create_if_missing(true); - opts.set_merge_operator("test operator", test_provided_merge, None); + opts.set_merge_operator_associative("test operator", test_provided_merge); let mut db = DB::open(&opts, &n).unwrap(); let opts = Options::default(); match db.create_cf("cf1", &opts) { @@ -41,7 +41,7 @@ fn test_column_family() { // should fail to open db without specifying same column families { let mut opts = Options::default(); - opts.set_merge_operator("test operator", test_provided_merge, None); + opts.set_merge_operator_associative("test operator", test_provided_merge); match DB::open(&opts, &n) { Ok(_db) => panic!( "should not have opened DB successfully without \ @@ -58,7 +58,7 @@ fn test_column_family() { // should properly open db when specyfing all column families { let mut opts = Options::default(); - opts.set_merge_operator("test operator", test_provided_merge, None); + opts.set_merge_operator_associative("test operator", test_provided_merge); match DB::open_cf(&opts, &n, &["cf1"]) { Ok(_db) => println!("successfully opened db with column family"), Err(e) => panic!("failed to open db with column family: {}", e), @@ -137,7 +137,7 @@ fn test_merge_operator() { // TODO should be able to write, read, merge, batch, and iterate over a cf { let mut opts = Options::default(); - opts.set_merge_operator("test operator", test_provided_merge, None); + opts.set_merge_operator_associative("test operator", test_provided_merge); let db = match DB::open_cf(&opts, &n, &["cf1"]) { Ok(db) => { println!("successfully opened db with column family"); diff --git a/tests/test_merge_operator.rs b/tests/test_merge_operator.rs index 74c0424..021eb2a 100644 --- a/tests/test_merge_operator.rs +++ b/tests/test_merge_operator.rs @@ -16,6 +16,7 @@ mod util; use pretty_assertions::assert_eq; +use rocksdb::merge_operator::MergeFn; use rocksdb::{DBCompactionStyle, MergeOperands, Options, DB}; use util::DBPath; @@ -46,7 +47,7 @@ fn merge_test() { let db_path = DBPath::new("_rust_rocksdb_merge_test"); let mut opts = Options::default(); opts.create_if_missing(true); - opts.set_merge_operator("test operator", test_provided_merge, None); + opts.set_merge_operator_associative("test operator", test_provided_merge); let db = DB::open(&opts, &db_path).unwrap(); let p = db.put(b"k1", b"a"); @@ -159,7 +160,7 @@ fn counting_merge_test() { opts.set_merge_operator( "sort operator", test_counting_full_merge, - Some(test_counting_partial_merge), + test_counting_partial_merge, ); let db = Arc::new(DB::open(&opts, &db_path).unwrap()); @@ -278,7 +279,7 @@ fn failed_merge_test() { let db_path = DBPath::new("_rust_rocksdb_failed_merge_test"); let mut opts = Options::default(); opts.create_if_missing(true); - opts.set_merge_operator("test operator", test_failing_merge, None); + opts.set_merge_operator_associative("test operator", test_failing_merge); let db = DB::open(&opts, &db_path).expect("open with a merge operator"); db.put(b"key", b"value").expect("put_ok"); @@ -290,3 +291,72 @@ fn failed_merge_test() { } } } + +fn make_merge_max_with_limit(limit: u64) -> impl MergeFn + Clone { + move |_key: &[u8], first: Option<&[u8]>, rest: &mut MergeOperands| { + let max = first + .into_iter() + .chain(rest) + .map(|slice| { + let mut bytes: [u8; 8] = Default::default(); + bytes.clone_from_slice(slice); + u64::from_ne_bytes(bytes) + }) + .fold(0, u64::max); + let new_value = max.min(limit); + Some(Vec::from(new_value.to_ne_bytes().as_ref())) + } +} + +#[test] +fn test_merge_state() { + use {Options, DB}; + let path = "_rust_rocksdb_mergetest_state"; + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.set_merge_operator_associative("max-limit-12", make_merge_max_with_limit(12)); + { + let db = DB::open(&opts, path).unwrap(); + let p = db.put(b"k1", 1u64.to_ne_bytes()); + assert!(p.is_ok()); + let _ = db.merge(b"k1", 7u64.to_ne_bytes()); + let m = db.merge(b"k1", 64u64.to_ne_bytes()); + assert!(m.is_ok()); + match db.get(b"k1") { + Ok(Some(value)) => { + let mut bytes: [u8; 8] = Default::default(); + bytes.copy_from_slice(&value); + assert_eq!(u64::from_ne_bytes(bytes), 12); + } + Err(_) => println!("error reading value"), + _ => panic!("value not present"), + } + + assert!(db.delete(b"k1").is_ok()); + assert!(db.get(b"k1").unwrap().is_none()); + } + assert!(DB::destroy(&opts, path).is_ok()); + + opts.set_merge_operator_associative("max-limit-128", make_merge_max_with_limit(128)); + { + let db = DB::open(&opts, path).unwrap(); + let p = db.put(b"k1", 1u64.to_ne_bytes()); + assert!(p.is_ok()); + let _ = db.merge(b"k1", 7u64.to_ne_bytes()); + let m = db.merge(b"k1", 64u64.to_ne_bytes()); + assert!(m.is_ok()); + match db.get(b"k1") { + Ok(Some(value)) => { + let mut bytes: [u8; 8] = Default::default(); + bytes.copy_from_slice(&value); + assert_eq!(u64::from_ne_bytes(bytes), 64); + } + Err(_) => println!("error reading value"), + _ => panic!("value not present"), + } + + assert!(db.delete(b"k1").is_ok()); + assert!(db.get(b"k1").unwrap().is_none()); + } + assert!(DB::destroy(&opts, path).is_ok()); +}