@ -21,7 +21,7 @@
//! fn concat_merge(new_key: &[u8],
//! fn concat_merge(new_key: &[u8],
//! existing_val: Option<&[u8]>,
//! existing_val: Option<&[u8]>,
//! operands: &mut MergeOperands)
//! operands: &mut MergeOperands)
//! -> Vec<u8> {
//! -> Option< Vec<u8> > {
//!
//!
//! let mut result: Vec<u8> = Vec::with_capacity(operands.size_hint().0);
//! let mut result: Vec<u8> = Vec::with_capacity(operands.size_hint().0);
//! existing_val.map(|v| {
//! existing_val.map(|v| {
@ -34,14 +34,14 @@
//! result.push(*e)
//! result.push(*e)
//! }
//! }
//! }
//! }
//! result
//! Some( result)
//! }
//! }
//!
//!
//! fn main() {
//! fn main() {
//! let path = "path/to/rocksdb";
//! let path = "path/to/rocksdb";
//! let mut opts = Options::default();
//! let mut opts = Options::default();
//! opts.create_if_missing(true);
//! opts.create_if_missing(true);
//! opts.add _merge_operator("test operator", concat_merge);
//! opts.set _merge_operator("test operator", concat_merge, Non e);
//! let db = DB::open(&opts, path).unwrap();
//! let db = DB::open(&opts, path).unwrap();
//! let p = db.put(b"k1", b"a");
//! let p = db.put(b"k1", b"a");
//! db.merge(b"k1", b"b");
//! db.merge(b"k1", b"b");
@ -60,11 +60,12 @@ use std::mem;
use std ::ptr ;
use std ::ptr ;
use std ::slice ;
use std ::slice ;
pub type MergeFn = fn ( & [ u8 ] , Option < & [ u8 ] > , & mut MergeOperands ) -> Vec < u8 > ;
pub type MergeFn = fn ( & [ u8 ] , Option < & [ u8 ] > , & mut MergeOperands ) -> Option < Vec < u8 > > ;
pub struct MergeOperatorCallback {
pub struct MergeOperatorCallback {
pub name : CString ,
pub name : CString ,
pub merge_fn : MergeFn ,
pub full_merge_fn : MergeFn ,
pub partial_merge_fn : MergeFn ,
}
}
pub unsafe extern "C" fn destructor_callback ( raw_cb : * mut c_void ) {
pub unsafe extern "C" fn destructor_callback ( raw_cb : * mut c_void ) {
@ -87,12 +88,17 @@ pub unsafe extern "C" fn full_merge_callback(
num_operands : c_int ,
num_operands : c_int ,
success : * mut u8 ,
success : * mut u8 ,
new_value_length : * mut size_t ,
new_value_length : * mut size_t ,
) -> * mut c_char {
) -> * mut c_char {
let cb = & mut * ( raw_cb as * mut MergeOperatorCallback ) ;
let cb = & mut * ( raw_cb as * mut MergeOperatorCallback ) ;
let operands = & mut MergeOperands ::new ( operands_list , operands_list_len , num_operands ) ;
let operands = & mut MergeOperands ::new ( operands_list , operands_list_len , num_operands ) ;
let key = slice ::from_raw_parts ( raw_key as * const u8 , key_len as usize ) ;
let key = slice ::from_raw_parts ( raw_key as * const u8 , key_len as usize ) ;
let oldval = slice ::from_raw_parts ( existing_value as * const u8 , existing_value_len as usize ) ;
let oldval =
let mut result = ( cb . merge_fn ) ( key , Some ( oldval ) , operands ) ;
if existing_value = = ptr ::null ( ) {
None
} else {
Some ( slice ::from_raw_parts ( existing_value as * const u8 , existing_value_len as usize ) )
} ;
if let Some ( mut result ) = ( cb . full_merge_fn ) ( key , oldval , operands ) {
result . shrink_to_fit ( ) ;
result . shrink_to_fit ( ) ;
// TODO(tan) investigate zero-copy techniques to improve performance
// TODO(tan) investigate zero-copy techniques to improve performance
let buf = libc ::malloc ( result . len ( ) as size_t ) ;
let buf = libc ::malloc ( result . len ( ) as size_t ) ;
@ -101,6 +107,10 @@ pub unsafe extern "C" fn full_merge_callback(
* success = 1 as u8 ;
* success = 1 as u8 ;
ptr ::copy ( result . as_ptr ( ) as * mut c_void , & mut * buf , result . len ( ) ) ;
ptr ::copy ( result . as_ptr ( ) as * mut c_void , & mut * buf , result . len ( ) ) ;
buf as * mut c_char
buf as * mut c_char
} else {
* success = 0 as u8 ;
ptr ::null_mut ( ) as * mut c_char
}
}
}
pub unsafe extern "C" fn partial_merge_callback (
pub unsafe extern "C" fn partial_merge_callback (
@ -112,11 +122,12 @@ pub unsafe extern "C" fn partial_merge_callback(
num_operands : c_int ,
num_operands : c_int ,
success : * mut u8 ,
success : * mut u8 ,
new_value_length : * mut size_t ,
new_value_length : * mut size_t ,
) -> * mut c_char {
) -> * mut c_char {
println! ( "In partial_merge_callback" ) ;
let cb = & mut * ( raw_cb as * mut MergeOperatorCallback ) ;
let cb = & mut * ( raw_cb as * mut MergeOperatorCallback ) ;
let operands = & mut MergeOperands ::new ( operands_list , operands_list_len , num_operands ) ;
let operands = & mut MergeOperands ::new ( operands_list , operands_list_len , num_operands ) ;
let key = slice ::from_raw_parts ( raw_key as * const u8 , key_len as usize ) ;
let key = slice ::from_raw_parts ( raw_key as * const u8 , key_len as usize ) ;
let mut result = ( cb . merge_fn ) ( key , None , operands ) ;
if let Some ( mut result ) = ( cb . partial_ merge_fn) ( key , None , operands ) {
result . shrink_to_fit ( ) ;
result . shrink_to_fit ( ) ;
// TODO(tan) investigate zero-copy techniques to improve performance
// TODO(tan) investigate zero-copy techniques to improve performance
let buf = libc ::malloc ( result . len ( ) as size_t ) ;
let buf = libc ::malloc ( result . len ( ) as size_t ) ;
@ -125,6 +136,10 @@ pub unsafe extern "C" fn partial_merge_callback(
* success = 1 as u8 ;
* success = 1 as u8 ;
ptr ::copy ( result . as_ptr ( ) as * mut c_void , & mut * buf , result . len ( ) ) ;
ptr ::copy ( result . as_ptr ( ) as * mut c_void , & mut * buf , result . len ( ) ) ;
buf as * mut c_char
buf as * mut c_char
} else {
* success = 0 as u8 ;
ptr ::null_mut ::< c_char > ( )
}
}
}
@ -182,12 +197,15 @@ impl<'a> Iterator for &'a mut MergeOperands {
}
}
#[ cfg(test) ]
#[ cfg(test) ]
#[ allow(unused_variables) ]
mod test {
fn test_provided_merge (
new_key : & [ u8 ] ,
use super ::* ;
fn test_provided_merge (
_new_key : & [ u8 ] ,
existing_val : Option < & [ u8 ] > ,
existing_val : Option < & [ u8 ] > ,
operands : & mut MergeOperands ,
operands : & mut MergeOperands ,
) -> Vec < u8 > {
) -> Option < Vec < u8 > > {
let nops = operands . size_hint ( ) . 0 ;
let nops = operands . size_hint ( ) . 0 ;
let mut result : Vec < u8 > = Vec ::with_capacity ( nops ) ;
let mut result : Vec < u8 > = Vec ::with_capacity ( nops ) ;
if let Some ( v ) = existing_val {
if let Some ( v ) = existing_val {
@ -200,17 +218,18 @@ fn test_provided_merge(
result . push ( * e ) ;
result . push ( * e ) ;
}
}
}
}
result
Some ( result )
}
}
#[ ignore ]
#[ test ]
#[ test ]
fn mergetest ( ) {
fn mergetest ( ) {
use { DB , Options } ;
use { DB , Options } ;
let path = "_rust_rocksdb_mergetest" ;
let path = "_rust_rocksdb_mergetest" ;
let mut opts = Options ::default ( ) ;
let mut opts = Options ::default ( ) ;
opts . create_if_missing ( true ) ;
opts . create_if_missing ( true ) ;
opts . set_merge_operator ( "test operator" , test_provided_merge ) ;
opts . set_merge_operator ( "test operator" , test_provided_merge , Non e ) ;
{
{
let db = DB ::open ( & opts , path ) . unwrap ( ) ;
let db = DB ::open ( & opts , path ) . unwrap ( ) ;
let p = db . put ( b" k1 " , b" a " ) ;
let p = db . put ( b" k1 " , b" a " ) ;
@ -239,4 +258,207 @@ fn mergetest() {
assert! ( db . get ( b" k1 " ) . unwrap ( ) . is_none ( ) ) ;
assert! ( db . get ( b" k1 " ) . unwrap ( ) . is_none ( ) ) ;
}
}
assert! ( DB ::destroy ( & opts , path ) . is_ok ( ) ) ;
assert! ( DB ::destroy ( & opts , path ) . is_ok ( ) ) ;
}
unsafe fn to_slice < T : Sized > ( p : & T ) -> & [ u8 ] {
::std ::slice ::from_raw_parts (
( p as * const T ) as * const u8 ,
::std ::mem ::size_of ::< T > ( ) ,
)
}
fn from_slice < T : Sized > ( s : & [ u8 ] ) -> Option < & T > {
if ::std ::mem ::size_of ::< T > ( ) ! = s . len ( ) {
println! ( "slice {:?} is len {}, but T is size {}" , s , s . len ( ) , ::std ::mem ::size_of ::< T > ( ) ) ;
None
} else {
unsafe {
Some ( ::std ::mem ::transmute ( s . as_ptr ( ) ) )
}
}
}
#[ repr(packed) ]
#[ derive(Clone, Debug) ]
struct ValueCounts {
num_a : u32 ,
num_b : u32 ,
num_c : u32 ,
num_d : u32 ,
}
fn test_counting_partial_merge (
_new_key : & [ u8 ] ,
existing_val : Option < & [ u8 ] > ,
operands : & mut MergeOperands ,
) -> Option < Vec < u8 > > {
let nops = operands . size_hint ( ) . 0 ;
let mut result : Vec < u8 > = Vec ::with_capacity ( nops ) ;
println! ( "Partial merge operands size hint {}" , nops ) ;
for op in operands {
for e in op {
result . push ( * e ) ;
}
}
Some ( result )
}
fn test_counting_full_merge (
_new_key : & [ u8 ] ,
existing_val : Option < & [ u8 ] > ,
operands : & mut MergeOperands ,
) -> Option < Vec < u8 > > {
let nops = operands . size_hint ( ) . 0 ;
println! ( "Full merge operands size hint {}" , nops ) ;
let mut counts : ValueCounts =
if let Some ( v ) = existing_val {
println! ( "Full merge unpacking ValueCounts from {:?}" , v ) ;
from_slice ::< ValueCounts > ( v ) . unwrap ( ) . clone ( )
} else {
println! ( "Full merge creating new ValueCounts" ) ;
ValueCounts {
num_a : 0 ,
num_b : 0 ,
num_c : 0 ,
num_d : 0 }
} ;
for op in operands {
for e in op {
match * e {
b'a' = > counts . num_a + = 1 ,
b'b' = > counts . num_b + = 1 ,
b'c' = > counts . num_c + = 1 ,
b'd' = > counts . num_d + = 1 ,
_ = > { }
}
}
}
let slc = unsafe { to_slice ( & counts ) } ;
Some ( slc . to_vec ( ) )
}
#[ test ]
fn counting_mergetest ( ) {
use std ::thread ;
use std ::time ::Duration ;
use std ::sync ::Arc ;
use { DB , Options , DBCompactionStyle } ;
let path = "_rust_rocksdb_partial_mergetest" ;
let mut opts = Options ::default ( ) ;
opts . create_if_missing ( true ) ;
opts . set_compaction_style ( DBCompactionStyle ::Universal ) ;
opts . set_min_write_buffer_number_to_merge ( 10 ) ;
opts . set_merge_operator ( "sort operator" , test_counting_full_merge , Some ( test_counting_partial_merge ) ) ;
{
let db = Arc ::new ( DB ::open ( & opts , path ) . unwrap ( ) ) ;
let _ = db . delete ( b" k1 " ) ;
let _ = db . delete ( b" k2 " ) ;
let _ = db . merge ( b" k1 " , b" a " ) ;
let _ = db . merge ( b" k1 " , b" b " ) ;
let _ = db . merge ( b" k1 " , b" d " ) ;
let _ = db . merge ( b" k1 " , b" a " ) ;
let _ = db . merge ( b" k1 " , b" a " ) ;
let _ = db . merge ( b" k1 " , b" efg " ) ;
for i in 0 .. 500 {
let _ = db . merge ( b" k2 " , b" c " ) ;
if i % 20 = = 0 {
let _ = db . get ( b" k2 " ) ;
}
}
for i in 0 .. 500 {
let _ = db . merge ( b" k2 " , b" c " ) ;
if i % 20 = = 0 {
let _ = db . get ( b" k2 " ) ;
}
}
db . compact_range ( None , None ) ;
let d1 = db . clone ( ) ;
let d2 = db . clone ( ) ;
let d3 = db . clone ( ) ;
let h1 = thread ::spawn ( move | | {
for i in 0 .. 500 {
let _ = d1 . merge ( b" k2 " , b" c " ) ;
if i % 20 = = 0 {
let _ = d1 . get ( b" k2 " ) ;
}
}
for i in 0 .. 500 {
let _ = d1 . merge ( b" k2 " , b" a " ) ;
if i % 20 = = 0 {
let _ = d1 . get ( b" k2 " ) ;
}
}
} ) ;
let h2 = thread ::spawn ( move | | {
for i in 0 .. 500 {
let _ = d2 . merge ( b" k2 " , b" b " ) ;
if i % 20 = = 0 {
let _ = d2 . get ( b" k2 " ) ;
}
}
for i in 0 .. 500 {
let _ = d2 . merge ( b" k2 " , b" d " ) ;
if i % 20 = = 0 {
let _ = d2 . get ( b" k2 " ) ;
}
}
d2 . compact_range ( None , None ) ;
} ) ;
h2 . join ( ) ;
let h3 = thread ::spawn ( move | | {
for i in 0 .. 500 {
let _ = d3 . merge ( b" k2 " , b" a " ) ;
if i % 20 = = 0 {
let _ = d3 . get ( b" k2 " ) ;
}
}
for i in 0 .. 500 {
let _ = d3 . merge ( b" k2 " , b" c " ) ;
if i % 20 = = 0 {
let _ = d3 . get ( b" k2 " ) ;
}
}
} ) ;
let m = db . merge ( b" k1 " , b" b " ) ;
assert! ( m . is_ok ( ) ) ;
h3 . join ( ) ;
h1 . join ( ) ;
match db . get ( b" k2 " ) {
Ok ( Some ( value ) ) = > {
match from_slice ::< ValueCounts > ( & * value ) {
Some ( v ) = > {
assert_eq! ( v . num_a , 1000 ) ;
assert_eq! ( v . num_b , 500 ) ;
assert_eq! ( v . num_c , 2000 ) ;
assert_eq! ( v . num_d , 500 ) ;
} ,
None = > panic! ( "Failed to get ValueCounts from db" ) ,
}
}
Err ( _ ) = > println! ( "error reading value" ) ,
_ = > panic! ( "value not present" ) ,
}
match db . get ( b" k1 " ) {
Ok ( Some ( value ) ) = > {
match from_slice ::< ValueCounts > ( & * value ) {
Some ( v ) = > {
assert_eq! ( v . num_a , 3 ) ;
assert_eq! ( v . num_b , 2 ) ;
assert_eq! ( v . num_c , 0 ) ;
assert_eq! ( v . num_d , 1 ) ;
} ,
None = > panic! ( "Failed to get ValueCounts from db" ) ,
}
}
Err ( _ ) = > println! ( "error reading value" ) ,
_ = > panic! ( "value not present" ) ,
}
}
assert! ( DB ::destroy ( & opts , path ) . is_ok ( ) ) ;
}
}
}