@ -13,6 +13,7 @@ use rocksdb_ffi;
pub struct RocksDBOptions {
inner : rocksdb_ffi ::RocksDBOptions ,
block_options : rocksdb_ffi ::RocksDBBlockBasedTableOptions ,
}
impl RocksDBOptions {
@ -23,8 +24,12 @@ impl RocksDBOptions {
if opt_ptr . is_null ( ) {
panic! ( "Could not create rocksdb options" . to_string ( ) ) ;
}
let block_opts = rocksdb_ffi ::rocksdb_block_based_options_create ( ) ;
RocksDBOptions { inner : opts }
RocksDBOptions {
inner : opts ,
block_options : block_opts ,
}
}
}
@ -45,16 +50,13 @@ impl RocksDBOptions {
pub fn create_if_missing ( & self , create_if_missing : bool ) {
unsafe {
match create_if_missing {
true = > rocksdb_ffi ::rocksdb_options_set_create_if_missing (
self . inner , 1 ) ,
false = > rocksdb_ffi ::rocksdb_options_set_create_if_missing (
self . inner , 0 ) ,
}
rocksdb_ffi ::rocksdb_options_set_create_if_missing (
self . inner , create_if_missing ) ;
}
}
pub fn add_merge_operator < ' a > ( & self , name : & str , merge_fn : fn ( & [ u8 ] , Option < & [ u8 ] > , & mut MergeOperands ) -> Vec < u8 > ) {
pub fn add_merge_operator < ' a > ( & self , name : & str ,
merge_fn : fn ( & [ u8 ] , Option < & [ u8 ] > , & mut MergeOperands ) -> Vec < u8 > ) {
let cb = box MergeOperatorCallback {
name : name . to_c_str ( ) ,
merge_fn : merge_fn ,
@ -71,6 +73,168 @@ impl RocksDBOptions {
rocksdb_ffi ::rocksdb_options_set_merge_operator ( self . inner , mo ) ;
}
}
pub fn set_block_size ( & self , size : u64 ) {
unsafe {
rocksdb_ffi ::rocksdb_block_based_options_set_block_size (
self . block_options , size ) ;
rocksdb_ffi ::rocksdb_options_set_block_based_table_factory (
self . inner ,
self . block_options ) ;
}
}
pub fn set_block_cache_size_mb ( & self , cache_size : u64 ) {
unsafe {
rocksdb_ffi ::rocksdb_options_optimize_for_point_lookup (
self . inner , cache_size ) ;
}
}
pub fn set_filter ( & self , filter : rocksdb_ffi ::RocksDBFilterPolicy ) {
unsafe {
rocksdb_ffi ::rocksdb_block_based_options_set_filter_policy (
self . block_options , filter ) ;
rocksdb_ffi ::rocksdb_options_set_block_based_table_factory (
self . inner ,
self . block_options ) ;
}
}
pub fn set_max_open_files ( & self , nfiles : c_int ) {
unsafe {
rocksdb_ffi ::rocksdb_options_set_max_open_files ( self . inner , nfiles ) ;
}
}
pub fn set_use_fsync ( & self , useit : bool ) {
unsafe {
match useit {
true = >
rocksdb_ffi ::rocksdb_options_set_use_fsync ( self . inner , 1 ) ,
false = >
rocksdb_ffi ::rocksdb_options_set_use_fsync ( self . inner , 0 ) ,
}
}
}
pub fn set_bytes_per_sync ( & self , nbytes : u64 ) {
unsafe {
rocksdb_ffi ::rocksdb_options_set_bytes_per_sync (
self . inner , nbytes ) ;
}
}
pub fn set_disable_data_sync ( & self , disable : bool ) {
unsafe {
match disable {
true = >
rocksdb_ffi ::rocksdb_options_set_disable_data_sync (
self . inner , 1 ) ,
false = >
rocksdb_ffi ::rocksdb_options_set_disable_data_sync (
self . inner , 0 ) ,
}
}
}
pub fn set_table_cache_num_shard_bits ( & self , nbits : c_int ) {
unsafe {
rocksdb_ffi ::rocksdb_options_set_table_cache_numshardbits (
self . inner , nbits ) ;
}
}
pub fn set_min_write_buffer_number ( & self , nbuf : c_int ) {
unsafe {
rocksdb_ffi ::rocksdb_options_set_min_write_buffer_number_to_merge (
self . inner , nbuf ) ;
}
}
pub fn set_max_write_buffer_number ( & self , nbuf : c_int ) {
unsafe {
rocksdb_ffi ::rocksdb_options_set_max_write_buffer_number (
self . inner , nbuf ) ;
}
}
pub fn set_write_buffer_size ( & self , size : size_t ) {
unsafe {
rocksdb_ffi ::rocksdb_options_set_write_buffer_size (
self . inner , size ) ;
}
}
pub fn set_target_file_size_base ( & self , size : u64 ) {
unsafe {
rocksdb_ffi ::rocksdb_options_set_target_file_size_base (
self . inner , size ) ;
}
}
pub fn set_min_write_buffer_number_to_merge ( & self , to_merge : c_int ) {
unsafe {
rocksdb_ffi ::rocksdb_options_set_min_write_buffer_number_to_merge (
self . inner , to_merge ) ;
}
}
pub fn set_level_zero_slowdown_writes_trigger ( & self , n : c_int ) {
unsafe {
rocksdb_ffi ::rocksdb_options_set_level0_slowdown_writes_trigger (
self . inner , n ) ;
}
}
pub fn set_level_zero_stop_writes_trigger ( & self , n : c_int ) {
unsafe {
rocksdb_ffi ::rocksdb_options_set_level0_stop_writes_trigger (
self . inner , n ) ;
}
}
pub fn set_compaction_style ( & self , style :
rocksdb_ffi ::RocksDBCompactionStyle ) {
unsafe {
rocksdb_ffi ::rocksdb_options_set_compaction_style (
self . inner , style ) ;
}
}
pub fn set_max_background_compactions ( & self , n : c_int ) {
unsafe {
rocksdb_ffi ::rocksdb_options_set_max_background_compactions (
self . inner , n ) ;
}
}
pub fn set_max_background_flushes ( & self , n : c_int ) {
unsafe {
rocksdb_ffi ::rocksdb_options_set_max_background_flushes (
self . inner , n ) ;
}
}
pub fn set_filter_deletes ( & self , filter : bool ) {
unsafe {
rocksdb_ffi ::rocksdb_options_set_filter_deletes (
self . inner , filter ) ;
}
}
pub fn set_disable_auto_compactions ( & self , disable : bool ) {
unsafe {
match disable {
true = >
rocksdb_ffi ::rocksdb_options_set_disable_auto_compactions (
self . inner , 1 ) ,
false = >
rocksdb_ffi ::rocksdb_options_set_disable_auto_compactions (
self . inner , 0 ) ,
}
}
}
}
pub struct RocksDB {
@ -101,7 +265,8 @@ impl RocksDB {
Some ( error_string ) = >
return Err ( error_string . to_string ( ) ) ,
None = >
return Err ( "Could not initialize database." . to_string ( ) ) ,
return Err (
"Could not initialize database." . to_string ( ) ) ,
}
}
if db_ptr . is_null ( ) {
@ -110,7 +275,7 @@ impl RocksDB {
Ok ( RocksDB { inner : db } )
}
}
pub fn destroy ( opts : RocksDBOptions , path : & str ) -> Result < ( ) , String > {
unsafe {
let cpath = path . to_c_str ( ) ;
@ -120,14 +285,16 @@ impl RocksDB {
// process currently
let err = 0 as * mut i8 ;
let result = rocksdb_ffi ::rocksdb_destroy_db ( opts . inner , cpath_ptr , err ) ;
let result = rocksdb_ffi ::rocksdb_destroy_db (
opts . inner , cpath_ptr , err ) ;
if err . is_not_null ( ) {
let cs = CString ::new ( err as * const i8 , true ) ;
match cs . as_str ( ) {
Some ( error_string ) = >
return Err ( error_string . to_string ( ) ) ,
None = >
return Err ( "Could not initialize database." . to_string ( ) ) ,
return Err (
"Could not initialize database." . to_string ( ) ) ,
}
}
Ok ( ( ) )
@ -375,8 +542,9 @@ pub struct MergeOperands<'a> {
}
impl < ' a > MergeOperands < ' a > {
fn new < ' a > ( operands_list : * const * const c_char , operands_list_len : * const size_t ,
num_operands : c_int ) -> MergeOperands < ' a > {
fn new < ' a > ( operands_list : * const * const c_char ,
operands_list_len : * const size_t ,
num_operands : c_int ) -> MergeOperands < ' a > {
assert! ( num_operands > = 0 ) ;
MergeOperands {
operands_list : operands_list ,
@ -398,13 +566,15 @@ impl <'a> Iterator<&'a [u8]> for &'a mut MergeOperands<'a> {
let base_len = self . operands_list_len as uint ;
let spacing = mem ::size_of ::< * const * const u8 > ( ) ;
let spacing_len = mem ::size_of ::< * const size_t > ( ) ;
let len_ptr = ( base_len + ( spacing_len * self . cursor ) ) as * const size_t ;
let len_ptr = ( base_len + ( spacing_len * self . cursor ) )
as * const size_t ;
let len = * len_ptr as uint ;
let ptr = base + ( spacing * self . cursor ) ;
let op = from_buf_len ( * ( ptr as * const * const u8 ) , len ) ;
let des : Option < uint > = from_str ( op . as_slice ( ) ) ;
self . cursor + = 1 ;
Some ( mem ::transmute ( Slice { data :* ( ptr as * const * const u8 ) as * const u8 , len : len } ) )
Some ( mem ::transmute ( Slice { data :* ( ptr as * const * const u8 )
as * const u8 , len : len } ) )
}
}
}
@ -424,12 +594,13 @@ struct MergeOperatorCallback {
extern "C" fn destructor_callback ( raw_cb : * mut c_void ) {
// turn this back into a local variable so rust will reclaim it
let _ : Box < MergeOperatorCallback > = unsafe { mem ::transmute ( raw_cb ) } ;
}
extern "C" fn name_callback ( raw_cb : * mut c_void ) -> * const c_char {
unsafe {
let cb : & mut MergeOperatorCallback = & mut * ( raw_cb as * mut MergeOperatorCallback ) ;
let cb : & mut MergeOperatorCallback =
& mut * ( raw_cb as * mut MergeOperatorCallback ) ;
let ptr = cb . name . as_ptr ( ) ;
ptr as * const c_char
}
@ -442,18 +613,30 @@ extern "C" fn full_merge_callback(
num_operands : c_int ,
success : * mut u8 , new_value_length : * mut size_t ) -> * const c_char {
unsafe {
let cb : & mut MergeOperatorCallback = & mut * ( raw_cb as * mut MergeOperatorCallback ) ;
let operands = & mut MergeOperands ::new ( operands_list , operands_list_len , num_operands ) ;
let cb : & mut MergeOperatorCallback =
& mut * ( raw_cb as * mut MergeOperatorCallback ) ;
let operands =
& mut MergeOperands ::new ( operands_list ,
operands_list_len ,
num_operands ) ;
let key = from_buf_len ( key as * const u8 , key_len as uint ) ;
let oldval = from_buf_len ( existing_value as * const u8 , existing_value_len as uint ) ;
let mut result = ( cb . merge_fn ) ( key . as_bytes ( ) , Some ( oldval . as_bytes ( ) ) , operands ) ;
let oldval = from_buf_len ( existing_value as * const u8 ,
existing_value_len as uint ) ;
let mut result =
( cb . merge_fn ) ( key . as_bytes ( ) , Some ( oldval . as_bytes ( ) ) , operands ) ;
result . shrink_to_fit ( ) ;
/*
let ptr = result . as_ptr ( ) ;
mem ::forget ( result ) ;
ptr as * const c_char
* /
//TODO(tan) investigate zero-copy techniques to improve performance
let buf = libc ::malloc ( result . len ( ) as size_t ) ;
assert! ( buf . is_not_null ( ) ) ;
* new_value_length = result . len ( ) as size_t ;
* success = 1 as u8 ;
ptr ::copy_memory ( & mut * buf , result . as_ptr ( ) as * const c_void , result . len ( ) ) ;
ptr ::copy_memory ( & mut * buf , result . as_ptr ( )
as * const c_void , result . len ( ) ) ;
buf as * const c_char
}
}
@ -464,8 +647,11 @@ extern "C" fn partial_merge_callback(
num_operands : c_int ,
success : * mut u8 , new_value_length : * mut size_t ) -> * const c_char {
unsafe {
let cb : & mut MergeOperatorCallback = & mut * ( raw_cb as * mut MergeOperatorCallback ) ;
let operands = & mut MergeOperands ::new ( operands_list , operands_list_len , num_operands ) ;
let cb : & mut MergeOperatorCallback =
& mut * ( raw_cb as * mut MergeOperatorCallback ) ;
let operands = & mut MergeOperands ::new ( operands_list ,
operands_list_len ,
num_operands ) ;
let key = from_buf_len ( key as * const u8 , key_len as uint ) ;
let mut result = ( cb . merge_fn ) ( key . as_bytes ( ) , None , operands ) ;
result . shrink_to_fit ( ) ;
@ -474,7 +660,8 @@ extern "C" fn partial_merge_callback(
assert! ( buf . is_not_null ( ) ) ;
* new_value_length = 1 as size_t ;
* success = 1 as u8 ;
ptr ::copy_memory ( & mut * buf , result . as_ptr ( ) as * const c_void , result . len ( ) ) ;
ptr ::copy_memory ( & mut * buf , result . as_ptr ( )
as * const c_void , result . len ( ) ) ;
buf as * const c_char
}
}
@ -517,7 +704,7 @@ fn mergetest() {
println! ( "did not read valid utf-8 out of the db" ) ,
}
} ) . on_absent ( | | { println! ( "value not present!" ) } )
. on_error ( | e | { println! ( "error reading value" ) } ) ; //: {} ", e) });
. on_error ( | e | { println! ( "error reading value" ) } ) ; //: {", e) });
assert! ( m . is_ok ( ) ) ;
let r : RocksDBResult < RocksDBVector , String > = db . get ( b" k1 " ) ;