From 286bd66be3ab99590fb5b28d142ad54a79a3e70c Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Sun, 14 Dec 2014 21:22:29 -0500 Subject: [PATCH] add more configuration capabilities --- README.md | 35 +++++++++++ src/ffi.rs | 54 +++++++++++++--- src/lib.rs | 1 + src/main.rs | 31 +++------- src/rocksdb.rs | 163 +++++++++++++++++++++++++++++++------------------ 5 files changed, 193 insertions(+), 91 deletions(-) diff --git a/README.md b/README.md index dd4736e..d0b871d 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,41 @@ fn main() { } ``` +###### Apply Some Tunings +Please read [the official tuning guide](https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide), and most importantly, measure performance under realistic workloads with realistic hardware. +```rust +use rocksdb::{RocksDBOptions, RocksDB, new_bloom_filter}; +use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction; + +fn tuned_for_somebody_elses_disk() -> RocksDB { + let path = "_rust_rocksdb_optimizetest"; + let opts = RocksDBOptions::new(); + opts.create_if_missing(true); + opts.set_block_size(524288); + opts.set_max_open_files(10000); + opts.set_use_fsync(false); + opts.set_bytes_per_sync(8388608); + opts.set_disable_data_sync(false); + opts.set_block_cache_size_mb(1024); + opts.set_table_cache_num_shard_bits(6); + opts.set_max_write_buffer_number(32); + opts.set_write_buffer_size(536870912); + opts.set_target_file_size_base(1073741824); + opts.set_min_write_buffer_number_to_merge(4); + opts.set_level_zero_stop_writes_trigger(2000); + opts.set_level_zero_slowdown_writes_trigger(0); + opts.set_compaction_style(RocksDBUniversalCompaction); + opts.set_max_background_compactions(4); + opts.set_max_background_flushes(4); + opts.set_filter_deletes(false); + opts.set_disable_auto_compactions(true); + + let filter = new_bloom_filter(10); + opts.set_filter(filter); + + RocksDB::open(opts, path).unwrap() +} +``` ### status - [x] basic open/put/get/delete/close diff --git a/src/ffi.rs b/src/ffi.rs index 81e778f..10e3d0e 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -10,12 +10,20 @@ pub struct RocksDBWriteOptions(pub *const c_void); #[repr(C)] pub struct RocksDBReadOptions(pub *const c_void); #[repr(C)] -pub struct RocksDBCompactionFilter(pub *const c_void); -#[repr(C)] pub struct RocksDBMergeOperator(pub *const c_void); #[repr(C)] +pub struct RocksDBBlockBasedTableOptions(pub *const c_void); +#[repr(C)] +pub struct RocksDBCache(pub *const c_void); +#[repr(C)] pub struct RocksDBFilterPolicy(pub *const c_void); +pub fn new_bloom_filter(bits: c_int) -> RocksDBFilterPolicy { + unsafe { + rocksdb_filterpolicy_create_bloom(bits) + } +} + #[repr(C)] pub enum RocksDBCompressionType { RocksDBNoCompression = 0, @@ -42,12 +50,41 @@ pub enum RocksDBUniversalCompactionStyle { #[link(name = "rocksdb")] extern { pub fn rocksdb_options_create() -> RocksDBOptions; + pub fn rocksdb_cache_create_lru(capacity: size_t) -> RocksDBCache; + pub fn rocksdb_cache_destroy(cache: RocksDBCache); + pub fn rocksdb_block_based_options_create() -> RocksDBBlockBasedTableOptions; + pub fn rocksdb_block_based_options_destroy( + block_options: RocksDBBlockBasedTableOptions); + pub fn rocksdb_block_based_options_set_block_size( + block_options: RocksDBBlockBasedTableOptions, + block_size: size_t); + pub fn rocksdb_block_based_options_set_block_size_deviation( + block_options: RocksDBBlockBasedTableOptions, + block_size_deviation: c_int); + pub fn rocksdb_block_based_options_set_block_restart_interval( + block_options: RocksDBBlockBasedTableOptions, + block_restart_interval: c_int); + pub fn rocksdb_block_based_options_set_filter_policy( + block_options: RocksDBBlockBasedTableOptions, + filter_policy: RocksDBFilterPolicy); + pub fn rocksdb_block_based_options_set_no_block_cache( + block_options: RocksDBBlockBasedTableOptions, no_block_cache: bool); + pub fn rocksdb_block_based_options_set_block_cache( + block_options: RocksDBBlockBasedTableOptions, block_cache: RocksDBCache); + pub fn rocksdb_block_based_options_set_block_cache_compressed( + block_options: RocksDBBlockBasedTableOptions, + block_cache_compressed: RocksDBCache); + pub fn rocksdb_block_based_options_set_whole_key_filtering( + ck_options: RocksDBBlockBasedTableOptions, doit: bool); + pub fn rocksdb_options_set_block_based_table_factory( + options: RocksDBOptions, + block_options: RocksDBBlockBasedTableOptions); pub fn rocksdb_options_increase_parallelism( options: RocksDBOptions, threads: c_int); pub fn rocksdb_options_optimize_level_style_compaction( options: RocksDBOptions, memtable_memory_budget: c_int); pub fn rocksdb_options_set_create_if_missing( - options: RocksDBOptions, v: c_int); + options: RocksDBOptions, v: bool); pub fn rocksdb_options_set_max_open_files( options: RocksDBOptions, files: c_int); pub fn rocksdb_options_set_use_fsync( @@ -59,11 +96,9 @@ extern { pub fn rocksdb_options_optimize_for_point_lookup( options: RocksDBOptions, block_cache_size_mb: u64); pub fn rocksdb_options_set_table_cache_numshardbits( - options: RocksDBOptions, bits: u64); + options: RocksDBOptions, bits: c_int); pub fn rocksdb_options_set_max_write_buffer_number( options: RocksDBOptions, bufno: c_int); - pub fn rocksdb_options_set_max_write_buffer_number_to_merge( - options: RocksDBOptions, bufno: c_int); pub fn rocksdb_options_set_min_write_buffer_number_to_merge( options: RocksDBOptions, bufno: c_int); pub fn rocksdb_options_set_level0_file_num_compaction_trigger( @@ -93,10 +128,9 @@ extern { pub fn rocksdb_options_set_max_background_flushes( options: RocksDBOptions, max_bg_flushes: c_int); pub fn rocksdb_options_set_filter_deletes( - options: RocksDBOptions, v: u8); + options: RocksDBOptions, v: bool); pub fn rocksdb_options_set_disable_auto_compactions( - options: RocksDBOptions, v: u8); - //pub fn rocksdb_compactionfilter_create() -> RocksDBCompactionFilter; + options: RocksDBOptions, v: c_int); pub fn rocksdb_filterpolicy_create_bloom( bits_per_key: c_int) -> RocksDBFilterPolicy; pub fn rocksdb_open(options: RocksDBOptions, @@ -156,7 +190,7 @@ fn internal() { rocksdb_options_increase_parallelism(opts, 0); rocksdb_options_optimize_level_style_compaction(opts, 0); - rocksdb_options_set_create_if_missing(opts, 1); + rocksdb_options_set_create_if_missing(opts, true); let rustpath = "_rust_rocksdb_internaltest"; let cpath = rustpath.to_c_str(); diff --git a/src/lib.rs b/src/lib.rs index daae330..e20020d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ pub use ffi as rocksdb_ffi; pub use ffi::{ + new_bloom_filter, RocksDBUniversalCompactionStyle, RocksDBCompactionStyle, RocksDBCompressionType, diff --git a/src/main.rs b/src/main.rs index eb8ee47..ce69e24 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ extern crate rocksdb; extern crate test; -use rocksdb::{RocksDBOptions, RocksDB, MergeOperands}; +use rocksdb::{RocksDBOptions, RocksDB, MergeOperands, new_bloom_filter}; use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction; use test::Bencher; @@ -24,7 +24,6 @@ fn main() { db.close(); custom_merge(); - optimized(); } #[allow(dead_code)] @@ -61,7 +60,7 @@ fn custom_merge() { } #[allow(dead_code)] -fn optimized() { +fn tuned_for_somebody_elses_disk() -> RocksDB { let path = "_rust_rocksdb_optimizetest"; let opts = RocksDBOptions::new(); opts.create_if_missing(true); @@ -70,7 +69,7 @@ fn optimized() { opts.set_use_fsync(false); opts.set_bytes_per_sync(8388608); opts.set_disable_data_sync(false); - opts.set_cache_size(8589934592); + opts.set_block_cache_size_mb(1024); opts.set_table_cache_num_shard_bits(6); opts.set_max_write_buffer_number(32); opts.set_write_buffer_size(536870912); @@ -78,32 +77,22 @@ fn optimized() { opts.set_min_write_buffer_number_to_merge(4); opts.set_level_zero_stop_writes_trigger(2000); opts.set_level_zero_slowdown_writes_trigger(0); - //opts.set_memtable_config(newSkipListMemTableConfig()); opts.set_compaction_style(RocksDBUniversalCompaction); opts.set_max_background_compactions(4); opts.set_max_background_flushes(4); opts.set_filter_deletes(false); - opts.set_disable_auto_compaction(true); - //opts.set_filter(filter); + opts.set_disable_auto_compactions(true); - opts.add_merge_operator("test operator", concat_merge); - let db = RocksDB::open(opts, path).unwrap(); - let p = db.put(b"k1", b"a"); - db.merge(b"k1", b"b"); - db.merge(b"k1", b"c"); - db.merge(b"k1", b"d"); - db.merge(b"k1", b"efg"); - let m = db.merge(b"k1", b"h"); - let r = db.get(b"k1"); - assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh"); - db.close(); - RocksDB::destroy(opts, path).is_ok(); + let filter = new_bloom_filter(10); + opts.set_filter(filter); + + RocksDB::open(opts, path).unwrap() } #[allow(dead_code)] #[bench] fn writes(b: &mut Bencher) { - let db = RocksDB::open_default("testdb").unwrap(); + let db = tuned_for_somebody_elses_disk(); let mut i = 0 as u64; b.iter(|| { db.put(i.to_string().as_bytes(), b"v1111"); @@ -115,7 +104,7 @@ fn writes(b: &mut Bencher) { #[allow(dead_code)] #[bench] fn reads(b: &mut Bencher) { - let db = RocksDB::open_default("testdb").unwrap(); + let db = tuned_for_somebody_elses_disk(); let mut i = 0 as u64; b.iter(|| { db.get(i.to_string().as_bytes()).on_error( |e| { diff --git a/src/rocksdb.rs b/src/rocksdb.rs index fd5fff6..e9131d9 100644 --- a/src/rocksdb.rs +++ b/src/rocksdb.rs @@ -13,6 +13,7 @@ use rocksdb_ffi; pub struct RocksDBOptions { inner: rocksdb_ffi::RocksDBOptions, + block_options: rocksdb_ffi::RocksDBBlockBasedTableOptions, } impl RocksDBOptions { @@ -23,8 +24,12 @@ impl RocksDBOptions { if opt_ptr.is_null() { panic!("Could not create rocksdb options".to_string()); } + let block_opts = rocksdb_ffi::rocksdb_block_based_options_create(); - RocksDBOptions{inner: opts} + RocksDBOptions{ + inner: opts, + block_options: block_opts, + } } } @@ -45,16 +50,13 @@ impl RocksDBOptions { pub fn create_if_missing(&self, create_if_missing: bool) { unsafe { - match create_if_missing { - true => rocksdb_ffi::rocksdb_options_set_create_if_missing( - self.inner, 1), - false => rocksdb_ffi::rocksdb_options_set_create_if_missing( - self.inner, 0), - } + rocksdb_ffi::rocksdb_options_set_create_if_missing( + self.inner, create_if_missing); } } - pub fn add_merge_operator<'a>(&self, name: &str, merge_fn: fn (&[u8], Option<&[u8]>, &mut MergeOperands) -> Vec) { + pub fn add_merge_operator<'a>( &self, name: &str, + merge_fn: fn (&[u8], Option<&[u8]>, &mut MergeOperands) -> Vec) { let cb = box MergeOperatorCallback { name: name.to_c_str(), merge_fn: merge_fn, @@ -72,33 +74,33 @@ impl RocksDBOptions { } } - /* block based table options pub fn set_block_size(&self, size: u64) { unsafe { - rocksdb_ffi::rocksdb_block_based_options_set_block_size(self.table_options, size); + rocksdb_ffi::rocksdb_block_based_options_set_block_size( + self.block_options, size); + rocksdb_ffi::rocksdb_options_set_block_based_table_factory( + self.inner, + self.block_options); } } - pub fn set_cache_size(&self, cache_size: u64) { + pub fn set_block_cache_size_mb(&self, cache_size: u64) { unsafe { - rocksdb_ffi::rocksdb_options_set(self.inner, ); + rocksdb_ffi::rocksdb_options_optimize_for_point_lookup( + self.inner, cache_size); } } - pub fn set_memtable_config(&self,newSkipListMemTableConfig()) { + pub fn set_filter(&self, filter: rocksdb_ffi::RocksDBFilterPolicy) { unsafe { - rocksdb_ffi::rocksdb_options_set(self.inner, ); + rocksdb_ffi::rocksdb_block_based_options_set_filter_policy( + self.block_options, filter); + rocksdb_ffi::rocksdb_options_set_block_based_table_factory( + self.inner, + self.block_options); } } - pub fn set_filter(&self, filter: RocksDBFilterPolicy) { - unsafe { - rocksdb_ffi::rocksdb_block_based_options_set_filter_policy(self.inner, filter); - } - } - - */ - pub fn set_max_open_files(&self, nfiles: c_int) { unsafe { rocksdb_ffi::rocksdb_options_set_max_open_files(self.inner, nfiles); @@ -108,17 +110,18 @@ impl RocksDBOptions { pub fn set_use_fsync(&self, useit: bool) { unsafe { match useit { - true => rocksdb_ffi::rocksdb_options_set_use_fsync( - self.inner, 1), - false => rocksdb_ffi::rocksdb_options_set_use_fsync( - self.inner, 0), + true => + rocksdb_ffi::rocksdb_options_set_use_fsync(self.inner, 1), + false => + rocksdb_ffi::rocksdb_options_set_use_fsync(self.inner, 0), } } } pub fn set_bytes_per_sync(&self, nbytes: u64) { unsafe { - rocksdb_ffi::rocksdb_options_set_bytes_per_sync(self.inner, nbytes); + rocksdb_ffi::rocksdb_options_set_bytes_per_sync( + self.inner, nbytes); } } @@ -126,88 +129,110 @@ impl RocksDBOptions { unsafe { match disable { true => - rocksdb_ffi::rocksdb_options_set_disable_data_sync(self.inner, 1), + rocksdb_ffi::rocksdb_options_set_disable_data_sync( + self.inner, 1), false => - rocksdb_ffi::rocksdb_options_set_disable_data_sync(self.inner, 0), + rocksdb_ffi::rocksdb_options_set_disable_data_sync( + self.inner, 0), } } } pub fn set_table_cache_num_shard_bits(&self, nbits: c_int) { unsafe { - rocksdb_ffi::rocksdb_options_set_table_cache_numshardbits(self.inner, nbits); + rocksdb_ffi::rocksdb_options_set_table_cache_numshardbits( + self.inner, nbits); } } pub fn set_min_write_buffer_number(&self, nbuf: c_int) { unsafe { - rocksdb_ffi::rocksdb_options_set_min_write_buffer_number_to_merge(self.inner, nbuf); + rocksdb_ffi::rocksdb_options_set_min_write_buffer_number_to_merge( + self.inner, nbuf); } } pub fn set_max_write_buffer_number(&self, nbuf: c_int) { unsafe { - rocksdb_ffi::rocksdb_options_set_max_write_buffer_number_to_merge(self.inner, nbuf); + rocksdb_ffi::rocksdb_options_set_max_write_buffer_number( + self.inner, nbuf); } } pub fn set_write_buffer_size(&self, size: size_t) { unsafe { - rocksdb_ffi::rocksdb_options_set_write_buffer_size(self.inner, size); + rocksdb_ffi::rocksdb_options_set_write_buffer_size( + self.inner, size); } } pub fn set_target_file_size_base(&self, size: u64) { unsafe { - rocksdb_ffi::rocksdb_options_set_target_file_size_base(self.inner, size); + rocksdb_ffi::rocksdb_options_set_target_file_size_base( + self.inner, size); } } pub fn set_min_write_buffer_number_to_merge(&self, to_merge: c_int) { unsafe { - rocksdb_ffi::rocksdb_options_set_min_write_buffer_number_to_merge(self.inner, to_merge); + rocksdb_ffi::rocksdb_options_set_min_write_buffer_number_to_merge( + self.inner, to_merge); } } pub fn set_level_zero_slowdown_writes_trigger(&self, n: c_int) { unsafe { - rocksdb_ffi::rocksdb_options_set_level0_slowdown_writes_trigger(self.inner, n); + rocksdb_ffi::rocksdb_options_set_level0_slowdown_writes_trigger( + self.inner, n); } } pub fn set_level_zero_stop_writes_trigger(&self, n: c_int) { unsafe { - rocksdb_ffi::rocksdb_options_set_level0_stop_writes_trigger(self.inner, n); + rocksdb_ffi::rocksdb_options_set_level0_stop_writes_trigger( + self.inner, n); } } - pub fn set_compaction_style(&self, style: rocksdb_ffi::RocksDBCompactionStyle) { + pub fn set_compaction_style(&self, style: + rocksdb_ffi::RocksDBCompactionStyle) { unsafe { - rocksdb_ffi::rocksdb_options_set_compaction_style(self.inner, style); + rocksdb_ffi::rocksdb_options_set_compaction_style( + self.inner, style); } } pub fn set_max_background_compactions(&self, n: c_int) { unsafe { - rocksdb_ffi::rocksdb_options_set_max_background_compactions(self.inner, n); + rocksdb_ffi::rocksdb_options_set_max_background_compactions( + self.inner, n); } } pub fn set_max_background_flushes(&self, n: c_int) { unsafe { - rocksdb_ffi::rocksdb_options_set_max_background_flushes(self.inner, n); + rocksdb_ffi::rocksdb_options_set_max_background_flushes( + self.inner, n); } } - pub fn set_filter_deletes(&self, filter: bool) { // to u8 + pub fn set_filter_deletes(&self, filter: bool) { unsafe { - rocksdb_ffi::rocksdb_options_set_filter_deletes(self.inner, filter); + rocksdb_ffi::rocksdb_options_set_filter_deletes( + self.inner, filter); } } pub fn set_disable_auto_compactions(&self, disable: bool) { unsafe { - rocksdb_ffi::rocksdb_options_set_disable_auto_compactions(self.inner, disable); + match disable { + true => + rocksdb_ffi::rocksdb_options_set_disable_auto_compactions( + self.inner, 1), + false => + rocksdb_ffi::rocksdb_options_set_disable_auto_compactions( + self.inner, 0), + } } } } @@ -240,7 +265,8 @@ impl RocksDB { Some(error_string) => return Err(error_string.to_string()), None => - return Err("Could not initialize database.".to_string()), + return Err( + "Could not initialize database.".to_string()), } } if db_ptr.is_null() { @@ -259,14 +285,16 @@ impl RocksDB { // process currently let err = 0 as *mut i8; - let result = rocksdb_ffi::rocksdb_destroy_db(opts.inner, cpath_ptr, err); + let result = rocksdb_ffi::rocksdb_destroy_db( + opts.inner, cpath_ptr, err); if err.is_not_null() { let cs = CString::new(err as *const i8, true); match cs.as_str() { Some(error_string) => return Err(error_string.to_string()), None => - return Err("Could not initialize database.".to_string()), + return Err( + "Could not initialize database.".to_string()), } } Ok(()) @@ -514,8 +542,9 @@ pub struct MergeOperands<'a> { } impl <'a> MergeOperands<'a> { - fn new<'a>(operands_list: *const *const c_char, operands_list_len: *const size_t, - num_operands: c_int) -> MergeOperands<'a> { + fn new<'a>(operands_list: *const *const c_char, + operands_list_len: *const size_t, + num_operands: c_int) -> MergeOperands<'a> { assert!(num_operands >= 0); MergeOperands { operands_list: operands_list, @@ -537,13 +566,15 @@ impl <'a> Iterator<&'a [u8]> for &'a mut MergeOperands<'a> { let base_len = self.operands_list_len as uint; let spacing = mem::size_of::<*const *const u8>(); let spacing_len = mem::size_of::<*const size_t>(); - let len_ptr = (base_len + (spacing_len * self.cursor)) as *const size_t; + let len_ptr = (base_len + (spacing_len * self.cursor)) + as *const size_t; let len = *len_ptr as uint; let ptr = base + (spacing * self.cursor); let op = from_buf_len(*(ptr as *const *const u8), len); let des: Option = from_str(op.as_slice()); self.cursor += 1; - Some(mem::transmute(Slice{data:*(ptr as *const *const u8) as *const u8, len: len})) + Some(mem::transmute(Slice{data:*(ptr as *const *const u8) + as *const u8, len: len})) } } } @@ -568,7 +599,8 @@ extern "C" fn destructor_callback(raw_cb: *mut c_void) { extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char { unsafe { - let cb: &mut MergeOperatorCallback = &mut *(raw_cb as *mut MergeOperatorCallback); + let cb: &mut MergeOperatorCallback = + &mut *(raw_cb as *mut MergeOperatorCallback); let ptr = cb.name.as_ptr(); ptr as *const c_char } @@ -581,11 +613,17 @@ extern "C" fn full_merge_callback( num_operands: c_int, success: *mut u8, new_value_length: *mut size_t) -> *const c_char { unsafe { - let cb: &mut MergeOperatorCallback = &mut *(raw_cb as *mut MergeOperatorCallback); - let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); + let cb: &mut MergeOperatorCallback = + &mut *(raw_cb as *mut MergeOperatorCallback); + let operands = + &mut MergeOperands::new(operands_list, + operands_list_len, + num_operands); let key = from_buf_len(key as *const u8, key_len as uint); - let oldval = from_buf_len(existing_value as *const u8, existing_value_len as uint); - let mut result = (cb.merge_fn)(key.as_bytes(), Some(oldval.as_bytes()), operands); + let oldval = from_buf_len(existing_value as *const u8, + existing_value_len as uint); + let mut result = + (cb.merge_fn)(key.as_bytes(), Some(oldval.as_bytes()), operands); result.shrink_to_fit(); /* let ptr = result.as_ptr(); @@ -597,7 +635,8 @@ extern "C" fn full_merge_callback( assert!(buf.is_not_null()); *new_value_length = result.len() as size_t; *success = 1 as u8; - ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len()); + ptr::copy_memory(&mut *buf, result.as_ptr() + as *const c_void, result.len()); buf as *const c_char } } @@ -608,8 +647,11 @@ extern "C" fn partial_merge_callback( num_operands: c_int, success: *mut u8, new_value_length: *mut size_t) -> *const c_char { unsafe { - let cb: &mut MergeOperatorCallback = &mut *(raw_cb as *mut MergeOperatorCallback); - let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); + let cb: &mut MergeOperatorCallback = + &mut *(raw_cb as *mut MergeOperatorCallback); + let operands = &mut MergeOperands::new(operands_list, + operands_list_len, + num_operands); let key = from_buf_len(key as *const u8, key_len as uint); let mut result = (cb.merge_fn)(key.as_bytes(), None, operands); result.shrink_to_fit(); @@ -618,7 +660,8 @@ extern "C" fn partial_merge_callback( assert!(buf.is_not_null()); *new_value_length = 1 as size_t; *success = 1 as u8; - ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len()); + ptr::copy_memory(&mut *buf, result.as_ptr() + as *const c_void, result.len()); buf as *const c_char } }