From d31e2bb88e08b0dfcf85252113c098b93821ec6c Mon Sep 17 00:00:00 2001 From: Rick Richardson Date: Sun, 22 Oct 2017 12:36:39 -0700 Subject: [PATCH] updated merge_operator framework to support merge types other than associative (support a partial merge and full merges that may or may not have an existing value) --- librocksdb-sys/Cargo.toml | 2 +- src/db_options.rs | 12 +- src/lib.rs | 1 + src/merge_operator.rs | 528 +++++++++++++++++++++++++----------- tests/test_column_family.rs | 12 +- 5 files changed, 392 insertions(+), 163 deletions(-) diff --git a/librocksdb-sys/Cargo.toml b/librocksdb-sys/Cargo.toml index 4f5adc0..53f508d 100644 --- a/librocksdb-sys/Cargo.toml +++ b/librocksdb-sys/Cargo.toml @@ -22,6 +22,6 @@ libc = "0.2" const-cstr = "0.2" [build-dependencies] -cc = { version = "^1.0", features = ["parallel"] } +cc = { git = "https://github.com/alexcrichton/cc-rs", features = ["parallel"] } make-cmd = "0.1" bindgen = "0.29" diff --git a/src/db_options.rs b/src/db_options.rs index 8f3a1b2..05a68c3 100644 --- a/src/db_options.rs +++ b/src/db_options.rs @@ -200,10 +200,16 @@ impl Options { } } - pub fn set_merge_operator(&mut self, name: &str, merge_fn: MergeFn) { + pub fn set_merge_operator(&mut self, name: &str, full_merge_fn: MergeFn, partial_merge_fn: Option) { + if partial_merge_fn.is_none() { + println!("partial_merge not supplied, defaulting partial merge to full"); + } else { + println!("using supplied partial_merge function"); + } let cb = Box::new(MergeOperatorCallback { name: CString::new(name.as_bytes()).unwrap(), - merge_fn: merge_fn, + full_merge_fn: full_merge_fn, + partial_merge_fn: partial_merge_fn.unwrap_or(full_merge_fn), }); unsafe { @@ -222,7 +228,7 @@ impl Options { #[deprecated(since = "0.5.0", note = "add_merge_operator has been renamed to set_merge_operator")] pub fn add_merge_operator(&mut self, name: &str, merge_fn: MergeFn) { - self.set_merge_operator(name, merge_fn); + self.set_merge_operator(name, merge_fn, None); } /// Sets a compaction filter used to determine if entries should be kept, changed, diff --git a/src/lib.rs b/src/lib.rs index d2f7394..3180264 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -173,6 +173,7 @@ pub struct WriteOptions { inner: *mut ffi::rocksdb_writeoptions_t, } + /// An opaque type used to represent a column family. Returned from some functions, and used /// in others #[derive(Copy, Clone)] diff --git a/src/merge_operator.rs b/src/merge_operator.rs index 6085a1e..da09aee 100644 --- a/src/merge_operator.rs +++ b/src/merge_operator.rs @@ -21,7 +21,7 @@ //! fn concat_merge(new_key: &[u8], //! existing_val: Option<&[u8]>, //! operands: &mut MergeOperands) -//! -> Vec { +//! -> Option> { //! //! let mut result: Vec = Vec::with_capacity(operands.size_hint().0); //! existing_val.map(|v| { @@ -34,14 +34,14 @@ //! result.push(*e) //! } //! } -//! result +//! Some(result) //! } //! //! fn main() { //! let path = "path/to/rocksdb"; //! let mut opts = Options::default(); //! opts.create_if_missing(true); -//! opts.add_merge_operator("test operator", concat_merge); +//! opts.set_merge_operator("test operator", concat_merge, None); //! let db = DB::open(&opts, path).unwrap(); //! let p = db.put(b"k1", b"a"); //! db.merge(b"k1", b"b"); @@ -60,183 +60,405 @@ use std::mem; use std::ptr; use std::slice; -pub type MergeFn = fn(&[u8], Option<&[u8]>, &mut MergeOperands) -> Vec; +pub type MergeFn = fn(&[u8], Option<&[u8]>, &mut MergeOperands) -> Option>; pub struct MergeOperatorCallback { - pub name: CString, - pub merge_fn: MergeFn, + pub name: CString, + pub full_merge_fn: MergeFn, + pub partial_merge_fn: MergeFn, } pub unsafe extern "C" fn destructor_callback(raw_cb: *mut c_void) { - let _: Box = mem::transmute(raw_cb); + let _: Box = mem::transmute(raw_cb); } pub unsafe extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char { - let cb = &mut *(raw_cb as *mut MergeOperatorCallback); - cb.name.as_ptr() + let cb = &mut *(raw_cb as *mut MergeOperatorCallback); + cb.name.as_ptr() } pub unsafe extern "C" fn full_merge_callback( - raw_cb: *mut c_void, - raw_key: *const c_char, - key_len: size_t, - existing_value: *const c_char, - existing_value_len: size_t, - operands_list: *const *const c_char, - operands_list_len: *const size_t, - num_operands: c_int, - success: *mut u8, - new_value_length: *mut size_t, -) -> *mut c_char { - let cb = &mut *(raw_cb as *mut MergeOperatorCallback); - let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); - let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize); - let oldval = slice::from_raw_parts(existing_value as *const u8, existing_value_len as usize); - let mut result = (cb.merge_fn)(key, Some(oldval), operands); - result.shrink_to_fit(); - // TODO(tan) investigate zero-copy techniques to improve performance - let buf = libc::malloc(result.len() as size_t); - assert!(!buf.is_null()); - *new_value_length = result.len() as size_t; - *success = 1 as u8; - ptr::copy(result.as_ptr() as *mut c_void, &mut *buf, result.len()); - buf as *mut c_char + raw_cb: *mut c_void, + raw_key: *const c_char, + key_len: size_t, + existing_value: *const c_char, + existing_value_len: size_t, + operands_list: *const *const c_char, + operands_list_len: *const size_t, + num_operands: c_int, + success: *mut u8, + new_value_length: *mut size_t, + ) -> *mut c_char { + let cb = &mut *(raw_cb as *mut MergeOperatorCallback); + let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); + let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize); + let oldval = + if existing_value == ptr::null() { + None + } else { + Some(slice::from_raw_parts(existing_value as *const u8, existing_value_len as usize)) + }; + if let Some(mut result) = (cb.full_merge_fn)(key, oldval, operands) { + result.shrink_to_fit(); + // TODO(tan) investigate zero-copy techniques to improve performance + let buf = libc::malloc(result.len() as size_t); + assert!(!buf.is_null()); + *new_value_length = result.len() as size_t; + *success = 1 as u8; + ptr::copy(result.as_ptr() as *mut c_void, &mut *buf, result.len()); + buf as *mut c_char + } else { + *success = 0 as u8; + ptr::null_mut() as *mut c_char + } } pub unsafe extern "C" fn partial_merge_callback( - raw_cb: *mut c_void, - raw_key: *const c_char, - key_len: size_t, - operands_list: *const *const c_char, - operands_list_len: *const size_t, - num_operands: c_int, - success: *mut u8, - new_value_length: *mut size_t, -) -> *mut c_char { - let cb = &mut *(raw_cb as *mut MergeOperatorCallback); - let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); - let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize); - let mut result = (cb.merge_fn)(key, None, operands); - result.shrink_to_fit(); - // TODO(tan) investigate zero-copy techniques to improve performance - let buf = libc::malloc(result.len() as size_t); - assert!(!buf.is_null()); - *new_value_length = result.len() as size_t; - *success = 1 as u8; - ptr::copy(result.as_ptr() as *mut c_void, &mut *buf, result.len()); - buf as *mut c_char + raw_cb: *mut c_void, + raw_key: *const c_char, + key_len: size_t, + operands_list: *const *const c_char, + operands_list_len: *const size_t, + num_operands: c_int, + success: *mut u8, + new_value_length: *mut size_t, + ) -> *mut c_char { + println!("In partial_merge_callback"); + let cb = &mut *(raw_cb as *mut MergeOperatorCallback); + let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); + let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize); + if let Some(mut result) = (cb.partial_merge_fn)(key, None, operands) { + result.shrink_to_fit(); + // TODO(tan) investigate zero-copy techniques to improve performance + let buf = libc::malloc(result.len() as size_t); + assert!(!buf.is_null()); + *new_value_length = result.len() as size_t; + *success = 1 as u8; + ptr::copy(result.as_ptr() as *mut c_void, &mut *buf, result.len()); + buf as *mut c_char + } else { + *success = 0 as u8; + ptr::null_mut::() + } } pub struct MergeOperands { - operands_list: *const *const c_char, - operands_list_len: *const size_t, - num_operands: usize, - cursor: usize, + operands_list: *const *const c_char, + operands_list_len: *const size_t, + num_operands: usize, + cursor: usize, } impl MergeOperands { - fn new( - operands_list: *const *const c_char, - operands_list_len: *const size_t, - num_operands: c_int, - ) -> MergeOperands { - assert!(num_operands >= 0); - MergeOperands { - operands_list: operands_list, - operands_list_len: operands_list_len, - num_operands: num_operands as usize, - cursor: 0, - } - } + fn new( + operands_list: *const *const c_char, + operands_list_len: *const size_t, + num_operands: c_int, + ) -> MergeOperands { + assert!(num_operands >= 0); + MergeOperands { + operands_list: operands_list, + operands_list_len: operands_list_len, + num_operands: num_operands as usize, + cursor: 0, + } + } } impl<'a> Iterator for &'a mut MergeOperands { - type Item = &'a [u8]; - - fn next(&mut self) -> Option<&'a [u8]> { - if self.cursor == self.num_operands { - None - } else { - unsafe { - let base = self.operands_list as usize; - let base_len = self.operands_list_len as usize; - let spacing = mem::size_of::<*const *const u8>(); - let spacing_len = mem::size_of::<*const size_t>(); - let len_ptr = (base_len + (spacing_len * self.cursor)) as *const size_t; - let len = *len_ptr as usize; - let ptr = base + (spacing * self.cursor); - self.cursor += 1; - Some(mem::transmute(slice::from_raw_parts( - *(ptr as *const *const u8) as *const u8, - len, - ))) - } - } - } - - fn size_hint(&self) -> (usize, Option) { - let remaining = self.num_operands - self.cursor; - (remaining, Some(remaining)) - } + type Item = &'a [u8]; + + fn next(&mut self) -> Option<&'a [u8]> { + if self.cursor == self.num_operands { + None + } else { + unsafe { + let base = self.operands_list as usize; + let base_len = self.operands_list_len as usize; + let spacing = mem::size_of::<*const *const u8>(); + let spacing_len = mem::size_of::<*const size_t>(); + let len_ptr = (base_len + (spacing_len * self.cursor)) as *const size_t; + let len = *len_ptr as usize; + let ptr = base + (spacing * self.cursor); + self.cursor += 1; + Some(mem::transmute(slice::from_raw_parts( + *(ptr as *const *const u8) as *const u8, + len, + ))) + } + } + } + + fn size_hint(&self) -> (usize, Option) { + let remaining = self.num_operands - self.cursor; + (remaining, Some(remaining)) + } } #[cfg(test)] -#[allow(unused_variables)] -fn test_provided_merge( - new_key: &[u8], - existing_val: Option<&[u8]>, - operands: &mut MergeOperands, -) -> Vec { - let nops = operands.size_hint().0; - let mut result: Vec = Vec::with_capacity(nops); - if let Some(v) = existing_val { - for e in v { - result.push(*e); - } - } - for op in operands { - for e in op { - result.push(*e); - } - } - result -} +mod test { + + use super::*; + + fn test_provided_merge( + _new_key: &[u8], + existing_val: Option<&[u8]>, + operands: &mut MergeOperands, + ) -> Option> { + let nops = operands.size_hint().0; + let mut result: Vec = Vec::with_capacity(nops); + if let Some(v) = existing_val { + for e in v { + result.push(*e); + } + } + for op in operands { + for e in op { + result.push(*e); + } + } + Some(result) + } +#[ignore] #[test] -fn mergetest() { - use {DB, Options}; - - let path = "_rust_rocksdb_mergetest"; - let mut opts = Options::default(); - opts.create_if_missing(true); - opts.set_merge_operator("test operator", test_provided_merge); - { - let db = DB::open(&opts, path).unwrap(); - let p = db.put(b"k1", b"a"); - assert!(p.is_ok()); - let _ = db.merge(b"k1", b"b"); - let _ = db.merge(b"k1", b"c"); - let _ = db.merge(b"k1", b"d"); - let _ = db.merge(b"k1", b"efg"); - let m = db.merge(b"k1", b"h"); - assert!(m.is_ok()); - match db.get(b"k1") { - Ok(Some(value)) => { - match value.to_utf8() { - Some(v) => println!("retrieved utf8 value: {}", v), - None => println!("did not read valid utf-8 out of the db"), - } - } - Err(_) => println!("error reading value"), - _ => panic!("value not present"), - } - - assert!(m.is_ok()); - let r = db.get(b"k1"); - assert!(r.unwrap().unwrap().to_utf8().unwrap() == "abcdefgh"); - assert!(db.delete(b"k1").is_ok()); - assert!(db.get(b"k1").unwrap().is_none()); - } - assert!(DB::destroy(&opts, path).is_ok()); + fn mergetest() { + use {DB, Options}; + + let path = "_rust_rocksdb_mergetest"; + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.set_merge_operator("test operator", test_provided_merge, None); + { + let db = DB::open(&opts, path).unwrap(); + let p = db.put(b"k1", b"a"); + assert!(p.is_ok()); + let _ = db.merge(b"k1", b"b"); + let _ = db.merge(b"k1", b"c"); + let _ = db.merge(b"k1", b"d"); + let _ = db.merge(b"k1", b"efg"); + let m = db.merge(b"k1", b"h"); + assert!(m.is_ok()); + match db.get(b"k1") { + Ok(Some(value)) => { + match value.to_utf8() { + Some(v) => println!("retrieved utf8 value: {}", v), + None => println!("did not read valid utf-8 out of the db"), + } + } + Err(_) => println!("error reading value"), + _ => panic!("value not present"), + } + + assert!(m.is_ok()); + let r = db.get(b"k1"); + assert!(r.unwrap().unwrap().to_utf8().unwrap() == "abcdefgh"); + assert!(db.delete(b"k1").is_ok()); + assert!(db.get(b"k1").unwrap().is_none()); + } + assert!(DB::destroy(&opts, path).is_ok()); + } + + unsafe fn to_slice(p: &T) -> &[u8] { + ::std::slice::from_raw_parts( + (p as *const T) as *const u8, + ::std::mem::size_of::(), + ) + } + + fn from_slice(s: &[u8]) -> Option<&T> { + if ::std::mem::size_of::() != s.len() { + println!("slice {:?} is len {}, but T is size {}", s, s.len(), ::std::mem::size_of::()); + None + } else { + unsafe { + Some(::std::mem::transmute(s.as_ptr())) + } + } + } + +#[repr(packed)] + + #[derive(Clone, Debug)] + struct ValueCounts { + num_a: u32, + num_b: u32, + num_c: u32, + num_d: u32, + } + + fn test_counting_partial_merge( + _new_key: &[u8], + existing_val: Option<&[u8]>, + operands: &mut MergeOperands, + ) -> Option> { + let nops = operands.size_hint().0; + let mut result: Vec = Vec::with_capacity(nops); + println!("Partial merge operands size hint {}", nops); + for op in operands { + for e in op { + result.push(*e); + } + } + Some(result) + } + + fn test_counting_full_merge( + _new_key: &[u8], + existing_val: Option<&[u8]>, + operands: &mut MergeOperands, + ) -> Option> { + + let nops = operands.size_hint().0; + println!("Full merge operands size hint {}", nops); + let mut counts : ValueCounts = + if let Some(v) = existing_val { + println!("Full merge unpacking ValueCounts from {:?}", v); + from_slice::(v).unwrap().clone() + } else { + println!("Full merge creating new ValueCounts"); + ValueCounts { + num_a: 0, + num_b: 0, + num_c: 0, + num_d: 0 } + }; + + for op in operands { + for e in op { + match *e { + b'a' => counts.num_a += 1, + b'b' => counts.num_b += 1, + b'c' => counts.num_c += 1, + b'd' => counts.num_d += 1, + _ => {} + } + } + } + let slc = unsafe { to_slice(&counts) }; + Some(slc.to_vec()) + } + +#[test] + fn counting_mergetest() { + use std::thread; + use std::time::Duration; + use std::sync::Arc; + use {DB, Options, DBCompactionStyle}; + + let path = "_rust_rocksdb_partial_mergetest"; + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.set_compaction_style(DBCompactionStyle::Universal); + opts.set_min_write_buffer_number_to_merge(10); + + opts.set_merge_operator("sort operator", test_counting_full_merge, Some(test_counting_partial_merge)); + { + let db = Arc::new(DB::open(&opts, path).unwrap()); + let _ = db.delete(b"k1"); + let _ = db.delete(b"k2"); + let _ = db.merge(b"k1", b"a"); + let _ = db.merge(b"k1", b"b"); + let _ = db.merge(b"k1", b"d"); + let _ = db.merge(b"k1", b"a"); + let _ = db.merge(b"k1", b"a"); + let _ = db.merge(b"k1", b"efg"); + for i in 0..500 { + let _ = db.merge(b"k2", b"c"); + if i % 20 == 0 { + let _ = db.get(b"k2"); + } + } + for i in 0..500 { + let _ = db.merge(b"k2", b"c"); + if i % 20 == 0 { + let _ = db.get(b"k2"); + } + } + db.compact_range(None, None); + let d1 = db.clone(); + let d2 = db.clone(); + let d3 = db.clone(); + let h1 = thread::spawn(move || { + for i in 0..500 { + let _ = d1.merge(b"k2", b"c"); + if i % 20 == 0 { + let _ = d1.get(b"k2"); + } + } + for i in 0..500 { + let _ = d1.merge(b"k2", b"a"); + if i % 20 == 0 { + let _ = d1.get(b"k2"); + } + } + }); + let h2 = thread::spawn(move || { + for i in 0..500 { + let _ = d2.merge(b"k2", b"b"); + if i % 20 == 0 { + let _ = d2.get(b"k2"); + } + } + for i in 0..500 { + let _ = d2.merge(b"k2", b"d"); + if i % 20 == 0 { + let _ = d2.get(b"k2"); + } + } + d2.compact_range(None, None); + }); + h2.join(); + let h3 = thread::spawn(move || { + for i in 0..500 { + let _ = d3.merge(b"k2", b"a"); + if i % 20 == 0 { + let _ = d3.get(b"k2"); + } + } + for i in 0..500 { + let _ = d3.merge(b"k2", b"c"); + if i % 20 == 0 { + let _ = d3.get(b"k2"); + } + } + }); + let m = db.merge(b"k1", b"b"); + assert!(m.is_ok()); + h3.join(); + h1.join(); + match db.get(b"k2") { + Ok(Some(value)) => { + match from_slice::(&*value) { + Some(v) => { + assert_eq!(v.num_a, 1000); + assert_eq!(v.num_b, 500); + assert_eq!(v.num_c, 2000); + assert_eq!(v.num_d, 500); + }, + None => panic!("Failed to get ValueCounts from db"), + } + } + Err(_) => println!("error reading value"), + _ => panic!("value not present"), + } + match db.get(b"k1") { + Ok(Some(value)) => { + match from_slice::(&*value) { + Some(v) => { + assert_eq!(v.num_a, 3); + assert_eq!(v.num_b, 2); + assert_eq!(v.num_c, 0); + assert_eq!(v.num_d, 1); + }, + None => panic!("Failed to get ValueCounts from db"), + } + } + Err(_) => println!("error reading value"), + _ => panic!("value not present"), + } + } + assert!(DB::destroy(&opts, path).is_ok()); + } } diff --git a/tests/test_column_family.rs b/tests/test_column_family.rs index 9795b78..85342a9 100644 --- a/tests/test_column_family.rs +++ b/tests/test_column_family.rs @@ -24,7 +24,7 @@ pub fn test_column_family() { { let mut opts = Options::default(); opts.create_if_missing(true); - opts.set_merge_operator("test operator", test_provided_merge); + opts.set_merge_operator("test operator", test_provided_merge, None); let mut db = DB::open(&opts, path).unwrap(); let opts = Options::default(); match db.create_cf("cf1", &opts) { @@ -38,7 +38,7 @@ pub fn test_column_family() { // should fail to open db without specifying same column families { let mut opts = Options::default(); - opts.set_merge_operator("test operator", test_provided_merge); + opts.set_merge_operator("test operator", test_provided_merge, None); match DB::open(&opts, path) { Ok(_) => { panic!("should not have opened DB successfully without \ @@ -56,7 +56,7 @@ pub fn test_column_family() { // should properly open db when specyfing all column families { let mut opts = Options::default(); - opts.set_merge_operator("test operator", test_provided_merge); + opts.set_merge_operator("test operator", test_provided_merge, None); match DB::open_cf(&opts, path, &["cf1"]) { Ok(_) => println!("successfully opened db with column family"), Err(e) => panic!("failed to open db with column family: {}", e), @@ -98,7 +98,7 @@ fn test_merge_operator() { // TODO should be able to write, read, merge, batch, and iterate over a cf { let mut opts = Options::default(); - opts.set_merge_operator("test operator", test_provided_merge); + opts.set_merge_operator("test operator", test_provided_merge, None); let db = match DB::open_cf(&opts, path, &["cf1"]) { Ok(db) => { println!("successfully opened db with column family"); @@ -140,7 +140,7 @@ fn test_merge_operator() { fn test_provided_merge(_: &[u8], existing_val: Option<&[u8]>, operands: &mut MergeOperands) - -> Vec { + -> Option> { let nops = operands.size_hint().0; let mut result: Vec = Vec::with_capacity(nops); match existing_val { @@ -156,5 +156,5 @@ fn test_provided_merge(_: &[u8], result.push(*e); } } - result + Some(result) }