Fix logic related to merge operator settings (#481)

master
BoOTheFurious 4 years ago committed by GitHub
parent e8bb30aaa1
commit 023dd07b1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 48
      src/db_options.rs
  2. 36
      src/merge_operator.rs
  3. 8
      tests/test_column_family.rs
  4. 76
      tests/test_merge_operator.rs

@ -929,26 +929,50 @@ impl Options {
}
}
pub fn set_merge_operator(
pub fn set_merge_operator_associative<F: MergeFn + Clone>(
&mut self,
name: &str,
full_merge_fn: MergeFn,
partial_merge_fn: Option<MergeFn>,
full_merge_fn: F,
) {
let cb = Box::new(MergeOperatorCallback {
name: CString::new(name.as_bytes()).unwrap(),
full_merge_fn: full_merge_fn.clone(),
partial_merge_fn: full_merge_fn,
});
unsafe {
let mo = ffi::rocksdb_mergeoperator_create(
Box::into_raw(cb) as _,
Some(merge_operator::destructor_callback::<F, F>),
Some(full_merge_callback::<F, F>),
Some(partial_merge_callback::<F, F>),
None,
Some(merge_operator::name_callback::<F, F>),
);
ffi::rocksdb_options_set_merge_operator(self.inner, mo);
}
}
pub fn set_merge_operator<F: MergeFn, PF: MergeFn>(
&mut self,
name: &str,
full_merge_fn: F,
partial_merge_fn: PF,
) {
let cb = Box::new(MergeOperatorCallback {
name: CString::new(name.as_bytes()).unwrap(),
full_merge_fn,
partial_merge_fn: partial_merge_fn.unwrap_or(full_merge_fn),
partial_merge_fn,
});
unsafe {
let mo = ffi::rocksdb_mergeoperator_create(
mem::transmute(cb),
Some(merge_operator::destructor_callback),
Some(full_merge_callback),
Some(partial_merge_callback),
Some(merge_operator::delete_callback),
Some(merge_operator::name_callback),
Box::into_raw(cb) as _,
Some(merge_operator::destructor_callback::<F, PF>),
Some(full_merge_callback::<F, PF>),
Some(partial_merge_callback::<F, PF>),
None,
Some(merge_operator::name_callback::<F, PF>),
);
ffi::rocksdb_options_set_merge_operator(self.inner, mo);
}
@ -958,8 +982,8 @@ impl Options {
since = "0.5.0",
note = "add_merge_operator has been renamed to set_merge_operator"
)]
pub fn add_merge_operator(&mut self, name: &str, merge_fn: MergeFn) {
self.set_merge_operator(name, merge_fn, None);
pub fn add_merge_operator<F: MergeFn + Clone>(&mut self, name: &str, merge_fn: F) {
self.set_merge_operator_associative(name, merge_fn);
}
/// Sets a compaction filter used to determine if entries should be kept, changed,

@ -41,7 +41,7 @@
//!let mut opts = Options::default();
//!
//!opts.create_if_missing(true);
//!opts.set_merge_operator("test operator", concat_merge, None);
//!opts.set_merge_operator_associative("test operator", concat_merge);
//!{
//! let db = DB::open(&opts, path).unwrap();
//! let p = db.put(b"k1", b"a");
@ -61,16 +61,24 @@ use std::mem;
use std::ptr;
use std::slice;
pub type MergeFn = fn(&[u8], Option<&[u8]>, &mut MergeOperands) -> Option<Vec<u8>>;
pub trait MergeFn:
Fn(&[u8], Option<&[u8]>, &mut MergeOperands) -> Option<Vec<u8>> + Send + Sync + 'static
{
}
impl<F> MergeFn for F where
F: Fn(&[u8], Option<&[u8]>, &mut MergeOperands) -> Option<Vec<u8>> + Send + Sync + 'static
{
}
pub struct MergeOperatorCallback {
pub struct MergeOperatorCallback<F: MergeFn, PF: MergeFn> {
pub name: CString,
pub full_merge_fn: MergeFn,
pub partial_merge_fn: MergeFn,
pub full_merge_fn: F,
pub partial_merge_fn: PF,
}
pub unsafe extern "C" fn destructor_callback(raw_cb: *mut c_void) {
let _: Box<MergeOperatorCallback> = mem::transmute(raw_cb);
pub unsafe extern "C" fn destructor_callback<F: MergeFn, PF: MergeFn>(raw_cb: *mut c_void) {
let _: Box<MergeOperatorCallback<F, PF>> =
Box::from_raw(raw_cb as *mut MergeOperatorCallback<F, PF>);
}
pub unsafe extern "C" fn delete_callback(
@ -86,12 +94,14 @@ pub unsafe extern "C" fn delete_callback(
}
}
pub unsafe extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char {
let cb = &mut *(raw_cb as *mut MergeOperatorCallback);
pub unsafe extern "C" fn name_callback<F: MergeFn, PF: MergeFn>(
raw_cb: *mut c_void,
) -> *const c_char {
let cb = &mut *(raw_cb as *mut MergeOperatorCallback<F, PF>);
cb.name.as_ptr()
}
pub unsafe extern "C" fn full_merge_callback(
pub unsafe extern "C" fn full_merge_callback<F: MergeFn, PF: MergeFn>(
raw_cb: *mut c_void,
raw_key: *const c_char,
key_len: size_t,
@ -103,7 +113,7 @@ pub unsafe extern "C" fn full_merge_callback(
success: *mut u8,
new_value_length: *mut size_t,
) -> *mut c_char {
let cb = &mut *(raw_cb as *mut MergeOperatorCallback);
let cb = &mut *(raw_cb as *mut MergeOperatorCallback<F, PF>);
let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands);
let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize);
let oldval = if existing_value.is_null() {
@ -128,7 +138,7 @@ pub unsafe extern "C" fn full_merge_callback(
)
}
pub unsafe extern "C" fn partial_merge_callback(
pub unsafe extern "C" fn partial_merge_callback<F: MergeFn, PF: MergeFn>(
raw_cb: *mut c_void,
raw_key: *const c_char,
key_len: size_t,
@ -138,7 +148,7 @@ pub unsafe extern "C" fn partial_merge_callback(
success: *mut u8,
new_value_length: *mut size_t,
) -> *mut c_char {
let cb = &mut *(raw_cb as *mut MergeOperatorCallback);
let cb = &mut *(raw_cb as *mut MergeOperatorCallback<F, PF>);
let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands);
let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize);
(cb.partial_merge_fn)(key, None, operands).map_or_else(

@ -27,7 +27,7 @@ fn test_column_family() {
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_merge_operator("test operator", test_provided_merge, None);
opts.set_merge_operator_associative("test operator", test_provided_merge);
let mut db = DB::open(&opts, &n).unwrap();
let opts = Options::default();
match db.create_cf("cf1", &opts) {
@ -41,7 +41,7 @@ fn test_column_family() {
// should fail to open db without specifying same column families
{
let mut opts = Options::default();
opts.set_merge_operator("test operator", test_provided_merge, None);
opts.set_merge_operator_associative("test operator", test_provided_merge);
match DB::open(&opts, &n) {
Ok(_db) => panic!(
"should not have opened DB successfully without \
@ -58,7 +58,7 @@ fn test_column_family() {
// should properly open db when specyfing all column families
{
let mut opts = Options::default();
opts.set_merge_operator("test operator", test_provided_merge, None);
opts.set_merge_operator_associative("test operator", test_provided_merge);
match DB::open_cf(&opts, &n, &["cf1"]) {
Ok(_db) => println!("successfully opened db with column family"),
Err(e) => panic!("failed to open db with column family: {}", e),
@ -137,7 +137,7 @@ fn test_merge_operator() {
// TODO should be able to write, read, merge, batch, and iterate over a cf
{
let mut opts = Options::default();
opts.set_merge_operator("test operator", test_provided_merge, None);
opts.set_merge_operator_associative("test operator", test_provided_merge);
let db = match DB::open_cf(&opts, &n, &["cf1"]) {
Ok(db) => {
println!("successfully opened db with column family");

@ -16,6 +16,7 @@ mod util;
use pretty_assertions::assert_eq;
use rocksdb::merge_operator::MergeFn;
use rocksdb::{DBCompactionStyle, MergeOperands, Options, DB};
use util::DBPath;
@ -46,7 +47,7 @@ fn merge_test() {
let db_path = DBPath::new("_rust_rocksdb_merge_test");
let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_merge_operator("test operator", test_provided_merge, None);
opts.set_merge_operator_associative("test operator", test_provided_merge);
let db = DB::open(&opts, &db_path).unwrap();
let p = db.put(b"k1", b"a");
@ -159,7 +160,7 @@ fn counting_merge_test() {
opts.set_merge_operator(
"sort operator",
test_counting_full_merge,
Some(test_counting_partial_merge),
test_counting_partial_merge,
);
let db = Arc::new(DB::open(&opts, &db_path).unwrap());
@ -278,7 +279,7 @@ fn failed_merge_test() {
let db_path = DBPath::new("_rust_rocksdb_failed_merge_test");
let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_merge_operator("test operator", test_failing_merge, None);
opts.set_merge_operator_associative("test operator", test_failing_merge);
let db = DB::open(&opts, &db_path).expect("open with a merge operator");
db.put(b"key", b"value").expect("put_ok");
@ -290,3 +291,72 @@ fn failed_merge_test() {
}
}
}
fn make_merge_max_with_limit(limit: u64) -> impl MergeFn + Clone {
move |_key: &[u8], first: Option<&[u8]>, rest: &mut MergeOperands| {
let max = first
.into_iter()
.chain(rest)
.map(|slice| {
let mut bytes: [u8; 8] = Default::default();
bytes.clone_from_slice(slice);
u64::from_ne_bytes(bytes)
})
.fold(0, u64::max);
let new_value = max.min(limit);
Some(Vec::from(new_value.to_ne_bytes().as_ref()))
}
}
#[test]
fn test_merge_state() {
use {Options, DB};
let path = "_rust_rocksdb_mergetest_state";
let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_merge_operator_associative("max-limit-12", make_merge_max_with_limit(12));
{
let db = DB::open(&opts, path).unwrap();
let p = db.put(b"k1", 1u64.to_ne_bytes());
assert!(p.is_ok());
let _ = db.merge(b"k1", 7u64.to_ne_bytes());
let m = db.merge(b"k1", 64u64.to_ne_bytes());
assert!(m.is_ok());
match db.get(b"k1") {
Ok(Some(value)) => {
let mut bytes: [u8; 8] = Default::default();
bytes.copy_from_slice(&value);
assert_eq!(u64::from_ne_bytes(bytes), 12);
}
Err(_) => println!("error reading value"),
_ => panic!("value not present"),
}
assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").unwrap().is_none());
}
assert!(DB::destroy(&opts, path).is_ok());
opts.set_merge_operator_associative("max-limit-128", make_merge_max_with_limit(128));
{
let db = DB::open(&opts, path).unwrap();
let p = db.put(b"k1", 1u64.to_ne_bytes());
assert!(p.is_ok());
let _ = db.merge(b"k1", 7u64.to_ne_bytes());
let m = db.merge(b"k1", 64u64.to_ne_bytes());
assert!(m.is_ok());
match db.get(b"k1") {
Ok(Some(value)) => {
let mut bytes: [u8; 8] = Default::default();
bytes.copy_from_slice(&value);
assert_eq!(u64::from_ne_bytes(bytes), 64);
}
Err(_) => println!("error reading value"),
_ => panic!("value not present"),
}
assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").unwrap().is_none());
}
assert!(DB::destroy(&opts, path).is_ok());
}

Loading…
Cancel
Save