add more configuration capabilities

master
Tyler Neely 10 years ago
parent acba178132
commit 286bd66be3
  1. 35
      README.md
  2. 54
      src/ffi.rs
  3. 1
      src/lib.rs
  4. 31
      src/main.rs
  5. 161
      src/rocksdb.rs

@ -67,6 +67,41 @@ fn main() {
}
```
###### Apply Some Tunings
Please read [the official tuning guide](https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide), and most importantly, measure performance under realistic workloads with realistic hardware.
```rust
use rocksdb::{RocksDBOptions, RocksDB, new_bloom_filter};
use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction;
fn tuned_for_somebody_elses_disk() -> RocksDB {
let path = "_rust_rocksdb_optimizetest";
let opts = RocksDBOptions::new();
opts.create_if_missing(true);
opts.set_block_size(524288);
opts.set_max_open_files(10000);
opts.set_use_fsync(false);
opts.set_bytes_per_sync(8388608);
opts.set_disable_data_sync(false);
opts.set_block_cache_size_mb(1024);
opts.set_table_cache_num_shard_bits(6);
opts.set_max_write_buffer_number(32);
opts.set_write_buffer_size(536870912);
opts.set_target_file_size_base(1073741824);
opts.set_min_write_buffer_number_to_merge(4);
opts.set_level_zero_stop_writes_trigger(2000);
opts.set_level_zero_slowdown_writes_trigger(0);
opts.set_compaction_style(RocksDBUniversalCompaction);
opts.set_max_background_compactions(4);
opts.set_max_background_flushes(4);
opts.set_filter_deletes(false);
opts.set_disable_auto_compactions(true);
let filter = new_bloom_filter(10);
opts.set_filter(filter);
RocksDB::open(opts, path).unwrap()
}
```
### status
- [x] basic open/put/get/delete/close

@ -10,12 +10,20 @@ pub struct RocksDBWriteOptions(pub *const c_void);
#[repr(C)]
pub struct RocksDBReadOptions(pub *const c_void);
#[repr(C)]
pub struct RocksDBCompactionFilter(pub *const c_void);
#[repr(C)]
pub struct RocksDBMergeOperator(pub *const c_void);
#[repr(C)]
pub struct RocksDBBlockBasedTableOptions(pub *const c_void);
#[repr(C)]
pub struct RocksDBCache(pub *const c_void);
#[repr(C)]
pub struct RocksDBFilterPolicy(pub *const c_void);
pub fn new_bloom_filter(bits: c_int) -> RocksDBFilterPolicy {
unsafe {
rocksdb_filterpolicy_create_bloom(bits)
}
}
#[repr(C)]
pub enum RocksDBCompressionType {
RocksDBNoCompression = 0,
@ -42,12 +50,41 @@ pub enum RocksDBUniversalCompactionStyle {
#[link(name = "rocksdb")]
extern {
pub fn rocksdb_options_create() -> RocksDBOptions;
pub fn rocksdb_cache_create_lru(capacity: size_t) -> RocksDBCache;
pub fn rocksdb_cache_destroy(cache: RocksDBCache);
pub fn rocksdb_block_based_options_create() -> RocksDBBlockBasedTableOptions;
pub fn rocksdb_block_based_options_destroy(
block_options: RocksDBBlockBasedTableOptions);
pub fn rocksdb_block_based_options_set_block_size(
block_options: RocksDBBlockBasedTableOptions,
block_size: size_t);
pub fn rocksdb_block_based_options_set_block_size_deviation(
block_options: RocksDBBlockBasedTableOptions,
block_size_deviation: c_int);
pub fn rocksdb_block_based_options_set_block_restart_interval(
block_options: RocksDBBlockBasedTableOptions,
block_restart_interval: c_int);
pub fn rocksdb_block_based_options_set_filter_policy(
block_options: RocksDBBlockBasedTableOptions,
filter_policy: RocksDBFilterPolicy);
pub fn rocksdb_block_based_options_set_no_block_cache(
block_options: RocksDBBlockBasedTableOptions, no_block_cache: bool);
pub fn rocksdb_block_based_options_set_block_cache(
block_options: RocksDBBlockBasedTableOptions, block_cache: RocksDBCache);
pub fn rocksdb_block_based_options_set_block_cache_compressed(
block_options: RocksDBBlockBasedTableOptions,
block_cache_compressed: RocksDBCache);
pub fn rocksdb_block_based_options_set_whole_key_filtering(
ck_options: RocksDBBlockBasedTableOptions, doit: bool);
pub fn rocksdb_options_set_block_based_table_factory(
options: RocksDBOptions,
block_options: RocksDBBlockBasedTableOptions);
pub fn rocksdb_options_increase_parallelism(
options: RocksDBOptions, threads: c_int);
pub fn rocksdb_options_optimize_level_style_compaction(
options: RocksDBOptions, memtable_memory_budget: c_int);
pub fn rocksdb_options_set_create_if_missing(
options: RocksDBOptions, v: c_int);
options: RocksDBOptions, v: bool);
pub fn rocksdb_options_set_max_open_files(
options: RocksDBOptions, files: c_int);
pub fn rocksdb_options_set_use_fsync(
@ -59,11 +96,9 @@ extern {
pub fn rocksdb_options_optimize_for_point_lookup(
options: RocksDBOptions, block_cache_size_mb: u64);
pub fn rocksdb_options_set_table_cache_numshardbits(
options: RocksDBOptions, bits: u64);
options: RocksDBOptions, bits: c_int);
pub fn rocksdb_options_set_max_write_buffer_number(
options: RocksDBOptions, bufno: c_int);
pub fn rocksdb_options_set_max_write_buffer_number_to_merge(
options: RocksDBOptions, bufno: c_int);
pub fn rocksdb_options_set_min_write_buffer_number_to_merge(
options: RocksDBOptions, bufno: c_int);
pub fn rocksdb_options_set_level0_file_num_compaction_trigger(
@ -93,10 +128,9 @@ extern {
pub fn rocksdb_options_set_max_background_flushes(
options: RocksDBOptions, max_bg_flushes: c_int);
pub fn rocksdb_options_set_filter_deletes(
options: RocksDBOptions, v: u8);
options: RocksDBOptions, v: bool);
pub fn rocksdb_options_set_disable_auto_compactions(
options: RocksDBOptions, v: u8);
//pub fn rocksdb_compactionfilter_create() -> RocksDBCompactionFilter;
options: RocksDBOptions, v: c_int);
pub fn rocksdb_filterpolicy_create_bloom(
bits_per_key: c_int) -> RocksDBFilterPolicy;
pub fn rocksdb_open(options: RocksDBOptions,
@ -156,7 +190,7 @@ fn internal() {
rocksdb_options_increase_parallelism(opts, 0);
rocksdb_options_optimize_level_style_compaction(opts, 0);
rocksdb_options_set_create_if_missing(opts, 1);
rocksdb_options_set_create_if_missing(opts, true);
let rustpath = "_rust_rocksdb_internaltest";
let cpath = rustpath.to_c_str();

@ -4,6 +4,7 @@
pub use ffi as rocksdb_ffi;
pub use ffi::{
new_bloom_filter,
RocksDBUniversalCompactionStyle,
RocksDBCompactionStyle,
RocksDBCompressionType,

@ -1,6 +1,6 @@
extern crate rocksdb;
extern crate test;
use rocksdb::{RocksDBOptions, RocksDB, MergeOperands};
use rocksdb::{RocksDBOptions, RocksDB, MergeOperands, new_bloom_filter};
use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction;
use test::Bencher;
@ -24,7 +24,6 @@ fn main() {
db.close();
custom_merge();
optimized();
}
#[allow(dead_code)]
@ -61,7 +60,7 @@ fn custom_merge() {
}
#[allow(dead_code)]
fn optimized() {
fn tuned_for_somebody_elses_disk() -> RocksDB {
let path = "_rust_rocksdb_optimizetest";
let opts = RocksDBOptions::new();
opts.create_if_missing(true);
@ -70,7 +69,7 @@ fn optimized() {
opts.set_use_fsync(false);
opts.set_bytes_per_sync(8388608);
opts.set_disable_data_sync(false);
opts.set_cache_size(8589934592);
opts.set_block_cache_size_mb(1024);
opts.set_table_cache_num_shard_bits(6);
opts.set_max_write_buffer_number(32);
opts.set_write_buffer_size(536870912);
@ -78,32 +77,22 @@ fn optimized() {
opts.set_min_write_buffer_number_to_merge(4);
opts.set_level_zero_stop_writes_trigger(2000);
opts.set_level_zero_slowdown_writes_trigger(0);
//opts.set_memtable_config(newSkipListMemTableConfig());
opts.set_compaction_style(RocksDBUniversalCompaction);
opts.set_max_background_compactions(4);
opts.set_max_background_flushes(4);
opts.set_filter_deletes(false);
opts.set_disable_auto_compaction(true);
//opts.set_filter(filter);
opts.set_disable_auto_compactions(true);
opts.add_merge_operator("test operator", concat_merge);
let db = RocksDB::open(opts, path).unwrap();
let p = db.put(b"k1", b"a");
db.merge(b"k1", b"b");
db.merge(b"k1", b"c");
db.merge(b"k1", b"d");
db.merge(b"k1", b"efg");
let m = db.merge(b"k1", b"h");
let r = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh");
db.close();
RocksDB::destroy(opts, path).is_ok();
let filter = new_bloom_filter(10);
opts.set_filter(filter);
RocksDB::open(opts, path).unwrap()
}
#[allow(dead_code)]
#[bench]
fn writes(b: &mut Bencher) {
let db = RocksDB::open_default("testdb").unwrap();
let db = tuned_for_somebody_elses_disk();
let mut i = 0 as u64;
b.iter(|| {
db.put(i.to_string().as_bytes(), b"v1111");
@ -115,7 +104,7 @@ fn writes(b: &mut Bencher) {
#[allow(dead_code)]
#[bench]
fn reads(b: &mut Bencher) {
let db = RocksDB::open_default("testdb").unwrap();
let db = tuned_for_somebody_elses_disk();
let mut i = 0 as u64;
b.iter(|| {
db.get(i.to_string().as_bytes()).on_error( |e| {

@ -13,6 +13,7 @@ use rocksdb_ffi;
pub struct RocksDBOptions {
inner: rocksdb_ffi::RocksDBOptions,
block_options: rocksdb_ffi::RocksDBBlockBasedTableOptions,
}
impl RocksDBOptions {
@ -23,8 +24,12 @@ impl RocksDBOptions {
if opt_ptr.is_null() {
panic!("Could not create rocksdb options".to_string());
}
let block_opts = rocksdb_ffi::rocksdb_block_based_options_create();
RocksDBOptions{inner: opts}
RocksDBOptions{
inner: opts,
block_options: block_opts,
}
}
}
@ -45,16 +50,13 @@ impl RocksDBOptions {
pub fn create_if_missing(&self, create_if_missing: bool) {
unsafe {
match create_if_missing {
true => rocksdb_ffi::rocksdb_options_set_create_if_missing(
self.inner, 1),
false => rocksdb_ffi::rocksdb_options_set_create_if_missing(
self.inner, 0),
}
rocksdb_ffi::rocksdb_options_set_create_if_missing(
self.inner, create_if_missing);
}
}
pub fn add_merge_operator<'a>(&self, name: &str, merge_fn: fn (&[u8], Option<&[u8]>, &mut MergeOperands) -> Vec<u8>) {
pub fn add_merge_operator<'a>( &self, name: &str,
merge_fn: fn (&[u8], Option<&[u8]>, &mut MergeOperands) -> Vec<u8>) {
let cb = box MergeOperatorCallback {
name: name.to_c_str(),
merge_fn: merge_fn,
@ -72,33 +74,33 @@ impl RocksDBOptions {
}
}
/* block based table options
pub fn set_block_size(&self, size: u64) {
unsafe {
rocksdb_ffi::rocksdb_block_based_options_set_block_size(self.table_options, size);
}
}
pub fn set_cache_size(&self, cache_size: u64) {
unsafe {
rocksdb_ffi::rocksdb_options_set(self.inner, );
rocksdb_ffi::rocksdb_block_based_options_set_block_size(
self.block_options, size);
rocksdb_ffi::rocksdb_options_set_block_based_table_factory(
self.inner,
self.block_options);
}
}
pub fn set_memtable_config(&self,newSkipListMemTableConfig()) {
pub fn set_block_cache_size_mb(&self, cache_size: u64) {
unsafe {
rocksdb_ffi::rocksdb_options_set(self.inner, );
rocksdb_ffi::rocksdb_options_optimize_for_point_lookup(
self.inner, cache_size);
}
}
pub fn set_filter(&self, filter: RocksDBFilterPolicy) {
pub fn set_filter(&self, filter: rocksdb_ffi::RocksDBFilterPolicy) {
unsafe {
rocksdb_ffi::rocksdb_block_based_options_set_filter_policy(self.inner, filter);
rocksdb_ffi::rocksdb_block_based_options_set_filter_policy(
self.block_options, filter);
rocksdb_ffi::rocksdb_options_set_block_based_table_factory(
self.inner,
self.block_options);
}
}
*/
pub fn set_max_open_files(&self, nfiles: c_int) {
unsafe {
rocksdb_ffi::rocksdb_options_set_max_open_files(self.inner, nfiles);
@ -108,17 +110,18 @@ impl RocksDBOptions {
pub fn set_use_fsync(&self, useit: bool) {
unsafe {
match useit {
true => rocksdb_ffi::rocksdb_options_set_use_fsync(
self.inner, 1),
false => rocksdb_ffi::rocksdb_options_set_use_fsync(
self.inner, 0),
true =>
rocksdb_ffi::rocksdb_options_set_use_fsync(self.inner, 1),
false =>
rocksdb_ffi::rocksdb_options_set_use_fsync(self.inner, 0),
}
}
}
pub fn set_bytes_per_sync(&self, nbytes: u64) {
unsafe {
rocksdb_ffi::rocksdb_options_set_bytes_per_sync(self.inner, nbytes);
rocksdb_ffi::rocksdb_options_set_bytes_per_sync(
self.inner, nbytes);
}
}
@ -126,88 +129,110 @@ impl RocksDBOptions {
unsafe {
match disable {
true =>
rocksdb_ffi::rocksdb_options_set_disable_data_sync(self.inner, 1),
rocksdb_ffi::rocksdb_options_set_disable_data_sync(
self.inner, 1),
false =>
rocksdb_ffi::rocksdb_options_set_disable_data_sync(self.inner, 0),
rocksdb_ffi::rocksdb_options_set_disable_data_sync(
self.inner, 0),
}
}
}
pub fn set_table_cache_num_shard_bits(&self, nbits: c_int) {
unsafe {
rocksdb_ffi::rocksdb_options_set_table_cache_numshardbits(self.inner, nbits);
rocksdb_ffi::rocksdb_options_set_table_cache_numshardbits(
self.inner, nbits);
}
}
pub fn set_min_write_buffer_number(&self, nbuf: c_int) {
unsafe {
rocksdb_ffi::rocksdb_options_set_min_write_buffer_number_to_merge(self.inner, nbuf);
rocksdb_ffi::rocksdb_options_set_min_write_buffer_number_to_merge(
self.inner, nbuf);
}
}
pub fn set_max_write_buffer_number(&self, nbuf: c_int) {
unsafe {
rocksdb_ffi::rocksdb_options_set_max_write_buffer_number_to_merge(self.inner, nbuf);
rocksdb_ffi::rocksdb_options_set_max_write_buffer_number(
self.inner, nbuf);
}
}
pub fn set_write_buffer_size(&self, size: size_t) {
unsafe {
rocksdb_ffi::rocksdb_options_set_write_buffer_size(self.inner, size);
rocksdb_ffi::rocksdb_options_set_write_buffer_size(
self.inner, size);
}
}
pub fn set_target_file_size_base(&self, size: u64) {
unsafe {
rocksdb_ffi::rocksdb_options_set_target_file_size_base(self.inner, size);
rocksdb_ffi::rocksdb_options_set_target_file_size_base(
self.inner, size);
}
}
pub fn set_min_write_buffer_number_to_merge(&self, to_merge: c_int) {
unsafe {
rocksdb_ffi::rocksdb_options_set_min_write_buffer_number_to_merge(self.inner, to_merge);
rocksdb_ffi::rocksdb_options_set_min_write_buffer_number_to_merge(
self.inner, to_merge);
}
}
pub fn set_level_zero_slowdown_writes_trigger(&self, n: c_int) {
unsafe {
rocksdb_ffi::rocksdb_options_set_level0_slowdown_writes_trigger(self.inner, n);
rocksdb_ffi::rocksdb_options_set_level0_slowdown_writes_trigger(
self.inner, n);
}
}
pub fn set_level_zero_stop_writes_trigger(&self, n: c_int) {
unsafe {
rocksdb_ffi::rocksdb_options_set_level0_stop_writes_trigger(self.inner, n);
rocksdb_ffi::rocksdb_options_set_level0_stop_writes_trigger(
self.inner, n);
}
}
pub fn set_compaction_style(&self, style: rocksdb_ffi::RocksDBCompactionStyle) {
pub fn set_compaction_style(&self, style:
rocksdb_ffi::RocksDBCompactionStyle) {
unsafe {
rocksdb_ffi::rocksdb_options_set_compaction_style(self.inner, style);
rocksdb_ffi::rocksdb_options_set_compaction_style(
self.inner, style);
}
}
pub fn set_max_background_compactions(&self, n: c_int) {
unsafe {
rocksdb_ffi::rocksdb_options_set_max_background_compactions(self.inner, n);
rocksdb_ffi::rocksdb_options_set_max_background_compactions(
self.inner, n);
}
}
pub fn set_max_background_flushes(&self, n: c_int) {
unsafe {
rocksdb_ffi::rocksdb_options_set_max_background_flushes(self.inner, n);
rocksdb_ffi::rocksdb_options_set_max_background_flushes(
self.inner, n);
}
}
pub fn set_filter_deletes(&self, filter: bool) { // to u8
pub fn set_filter_deletes(&self, filter: bool) {
unsafe {
rocksdb_ffi::rocksdb_options_set_filter_deletes(self.inner, filter);
rocksdb_ffi::rocksdb_options_set_filter_deletes(
self.inner, filter);
}
}
pub fn set_disable_auto_compactions(&self, disable: bool) {
unsafe {
rocksdb_ffi::rocksdb_options_set_disable_auto_compactions(self.inner, disable);
match disable {
true =>
rocksdb_ffi::rocksdb_options_set_disable_auto_compactions(
self.inner, 1),
false =>
rocksdb_ffi::rocksdb_options_set_disable_auto_compactions(
self.inner, 0),
}
}
}
}
@ -240,7 +265,8 @@ impl RocksDB {
Some(error_string) =>
return Err(error_string.to_string()),
None =>
return Err("Could not initialize database.".to_string()),
return Err(
"Could not initialize database.".to_string()),
}
}
if db_ptr.is_null() {
@ -259,14 +285,16 @@ impl RocksDB {
// process currently
let err = 0 as *mut i8;
let result = rocksdb_ffi::rocksdb_destroy_db(opts.inner, cpath_ptr, err);
let result = rocksdb_ffi::rocksdb_destroy_db(
opts.inner, cpath_ptr, err);
if err.is_not_null() {
let cs = CString::new(err as *const i8, true);
match cs.as_str() {
Some(error_string) =>
return Err(error_string.to_string()),
None =>
return Err("Could not initialize database.".to_string()),
return Err(
"Could not initialize database.".to_string()),
}
}
Ok(())
@ -514,7 +542,8 @@ pub struct MergeOperands<'a> {
}
impl <'a> MergeOperands<'a> {
fn new<'a>(operands_list: *const *const c_char, operands_list_len: *const size_t,
fn new<'a>(operands_list: *const *const c_char,
operands_list_len: *const size_t,
num_operands: c_int) -> MergeOperands<'a> {
assert!(num_operands >= 0);
MergeOperands {
@ -537,13 +566,15 @@ impl <'a> Iterator<&'a [u8]> for &'a mut MergeOperands<'a> {
let base_len = self.operands_list_len as uint;
let spacing = mem::size_of::<*const *const u8>();
let spacing_len = mem::size_of::<*const size_t>();
let len_ptr = (base_len + (spacing_len * self.cursor)) as *const size_t;
let len_ptr = (base_len + (spacing_len * self.cursor))
as *const size_t;
let len = *len_ptr as uint;
let ptr = base + (spacing * self.cursor);
let op = from_buf_len(*(ptr as *const *const u8), len);
let des: Option<uint> = from_str(op.as_slice());
self.cursor += 1;
Some(mem::transmute(Slice{data:*(ptr as *const *const u8) as *const u8, len: len}))
Some(mem::transmute(Slice{data:*(ptr as *const *const u8)
as *const u8, len: len}))
}
}
}
@ -568,7 +599,8 @@ extern "C" fn destructor_callback(raw_cb: *mut c_void) {
extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char {
unsafe {
let cb: &mut MergeOperatorCallback = &mut *(raw_cb as *mut MergeOperatorCallback);
let cb: &mut MergeOperatorCallback =
&mut *(raw_cb as *mut MergeOperatorCallback);
let ptr = cb.name.as_ptr();
ptr as *const c_char
}
@ -581,11 +613,17 @@ extern "C" fn full_merge_callback(
num_operands: c_int,
success: *mut u8, new_value_length: *mut size_t) -> *const c_char {
unsafe {
let cb: &mut MergeOperatorCallback = &mut *(raw_cb as *mut MergeOperatorCallback);
let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands);
let cb: &mut MergeOperatorCallback =
&mut *(raw_cb as *mut MergeOperatorCallback);
let operands =
&mut MergeOperands::new(operands_list,
operands_list_len,
num_operands);
let key = from_buf_len(key as *const u8, key_len as uint);
let oldval = from_buf_len(existing_value as *const u8, existing_value_len as uint);
let mut result = (cb.merge_fn)(key.as_bytes(), Some(oldval.as_bytes()), operands);
let oldval = from_buf_len(existing_value as *const u8,
existing_value_len as uint);
let mut result =
(cb.merge_fn)(key.as_bytes(), Some(oldval.as_bytes()), operands);
result.shrink_to_fit();
/*
let ptr = result.as_ptr();
@ -597,7 +635,8 @@ extern "C" fn full_merge_callback(
assert!(buf.is_not_null());
*new_value_length = result.len() as size_t;
*success = 1 as u8;
ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len());
ptr::copy_memory(&mut *buf, result.as_ptr()
as *const c_void, result.len());
buf as *const c_char
}
}
@ -608,8 +647,11 @@ extern "C" fn partial_merge_callback(
num_operands: c_int,
success: *mut u8, new_value_length: *mut size_t) -> *const c_char {
unsafe {
let cb: &mut MergeOperatorCallback = &mut *(raw_cb as *mut MergeOperatorCallback);
let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands);
let cb: &mut MergeOperatorCallback =
&mut *(raw_cb as *mut MergeOperatorCallback);
let operands = &mut MergeOperands::new(operands_list,
operands_list_len,
num_operands);
let key = from_buf_len(key as *const u8, key_len as uint);
let mut result = (cb.merge_fn)(key.as_bytes(), None, operands);
result.shrink_to_fit();
@ -618,7 +660,8 @@ extern "C" fn partial_merge_callback(
assert!(buf.is_not_null());
*new_value_length = 1 as size_t;
*success = 1 as u8;
ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len());
ptr::copy_memory(&mut *buf, result.as_ptr()
as *const c_void, result.len());
buf as *const c_char
}
}

Loading…
Cancel
Save