Merge pull request #8 from spacejam/nightly_6.28.15

Nightly 6.28.15 update.
master
Tyler Neely 10 years ago
commit 1f3471ca13
  1. 31
      src/ffi.rs
  2. 5
      src/lib.rs
  3. 136
      src/main.rs
  4. 12
      src/merge_operator.rs
  5. 7
      src/rocksdb.rs
  6. 3
      src/rocksdb_options.rs

@ -15,48 +15,45 @@
*/
extern crate libc;
use self::libc::{c_char, c_int, c_void, size_t};
use std::ffi::CString;
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBOptions(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBInstance(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBWriteOptions(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBReadOptions(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBMergeOperator(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBBlockBasedTableOptions(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBCache(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBFilterPolicy(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBSnapshot(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBIterator(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBCFHandle(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBWriteBatch(pub *const c_void);
impl Copy for RocksDBOptions {}
impl Copy for RocksDBInstance {}
impl Copy for RocksDBWriteOptions {}
impl Copy for RocksDBReadOptions {}
impl Copy for RocksDBMergeOperator {}
impl Copy for RocksDBBlockBasedTableOptions {}
impl Copy for RocksDBCache {}
impl Copy for RocksDBFilterPolicy {}
impl Copy for RocksDBCompactionStyle {}
impl Copy for RocksDBCompressionType {}
impl Copy for RocksDBUniversalCompactionStyle {}
impl Copy for RocksDBSnapshot {}
impl Copy for RocksDBIterator {}
impl Copy for RocksDBCFHandle {}
impl Copy for RocksDBWriteBatch {}
pub fn new_bloom_filter(bits: c_int) -> RocksDBFilterPolicy {
unsafe {
rocksdb_filterpolicy_create_bloom(bits)
@ -329,7 +326,7 @@ fn internal() {
rocksdb_options_set_create_if_missing(opts, true);
let rustpath = "_rust_rocksdb_internaltest";
let cpath = CString::from_slice(rustpath.as_bytes());
let cpath = CString::new(rustpath).unwrap();
let cpath_ptr = cpath.as_ptr();
let err = 0 as *mut i8;

@ -15,12 +15,11 @@
*/
#![crate_id = "rocksdb"]
#![crate_type = "lib"]
#![feature(collections)]
#![feature(core)]
#![feature(libc)]
#![feature(unique)]
#![feature(path_ext)]
#![feature(raw)]
#![feature(vec_push_all)]
pub use ffi as rocksdb_ffi;
pub use ffi::{

@ -13,16 +13,14 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
#![feature(collections)]
#![feature(test)]
#![feature(vec_push_all)]
extern crate rocksdb;
extern crate test;
use rocksdb::{RocksDBOptions, RocksDB, MergeOperands, new_bloom_filter};
use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction;
use test::Bencher;
#[allow(dead_code)]
fn main() {
let path = "/tmp/rust-rocksdb";
let db = RocksDB::open_default(path).unwrap();
@ -44,7 +42,6 @@ fn main() {
custom_merge();
}
#[allow(dead_code)]
fn concat_merge(new_key: &[u8], existing_val: Option<&[u8]>,
mut operands: &mut MergeOperands) -> Vec<u8> {
let mut result: Vec<u8> = Vec::with_capacity(operands.size_hint().0);
@ -58,78 +55,95 @@ fn concat_merge(new_key: &[u8], existing_val: Option<&[u8]>,
result
}
#[allow(dead_code)]
fn custom_merge() {
let path = "_rust_rocksdb_mergetest";
let opts = RocksDBOptions::new();
opts.create_if_missing(true);
opts.add_merge_operator("test operator", concat_merge);
let db = RocksDB::open(opts, path).unwrap();
let p = db.put(b"k1", b"a");
db.put(b"k1", b"a");
db.merge(b"k1", b"b");
db.merge(b"k1", b"c");
db.merge(b"k1", b"d");
db.merge(b"k1", b"efg");
let m = db.merge(b"k1", b"h");
let r = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh");
db.merge(b"k1", b"h");
db.get(b"k1").map( |value| {
match value.to_utf8() {
Some(v) =>
println!("retrieved utf8 value: {}", v),
None =>
println!("did not read valid utf-8 out of the db"),
}
})
.on_absent( || { println!("value not found") })
.on_error( |e| { println!("error retrieving value: {}", e) });
db.close();
RocksDB::destroy(opts, path).is_ok();
}
#[allow(dead_code)]
fn tuned_for_somebody_elses_disk() -> RocksDB {
let path = "_rust_rocksdb_optimizetest";
let opts = RocksDBOptions::new();
opts.create_if_missing(true);
opts.set_block_size(524288);
opts.set_max_open_files(10000);
opts.set_use_fsync(false);
opts.set_bytes_per_sync(8388608);
opts.set_disable_data_sync(false);
opts.set_block_cache_size_mb(1024);
opts.set_table_cache_num_shard_bits(6);
opts.set_max_write_buffer_number(32);
opts.set_write_buffer_size(536870912);
opts.set_target_file_size_base(1073741824);
opts.set_min_write_buffer_number_to_merge(4);
opts.set_level_zero_stop_writes_trigger(2000);
opts.set_level_zero_slowdown_writes_trigger(0);
opts.set_compaction_style(RocksDBUniversalCompaction);
opts.set_max_background_compactions(4);
opts.set_max_background_flushes(4);
opts.set_filter_deletes(false);
opts.set_disable_auto_compactions(true);
let filter = new_bloom_filter(10);
opts.set_filter(filter);
RocksDB::open(opts, path).unwrap()
}
#[allow(dead_code)]
#[bench]
fn writes(b: &mut Bencher) {
let db = tuned_for_somebody_elses_disk();
let mut i = 0 as u64;
b.iter(|| {
db.put(i.to_string().as_bytes(), b"v1111");
i += 1;
});
db.close();
}
#[cfg(test)]
mod tests {
use test::Bencher;
use std::thread::sleep_ms;
use rocksdb::{RocksDBOptions, RocksDB, MergeOperands, new_bloom_filter};
use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction;
fn tuned_for_somebody_elses_disk() -> RocksDB {
let path = "_rust_rocksdb_optimizetest";
let opts = RocksDBOptions::new();
opts.create_if_missing(true);
opts.set_block_size(524288);
opts.set_max_open_files(10000);
opts.set_use_fsync(false);
opts.set_bytes_per_sync(8388608);
opts.set_disable_data_sync(false);
opts.set_block_cache_size_mb(1024);
opts.set_table_cache_num_shard_bits(6);
opts.set_max_write_buffer_number(32);
opts.set_write_buffer_size(536870912);
opts.set_target_file_size_base(1073741824);
opts.set_min_write_buffer_number_to_merge(4);
opts.set_level_zero_stop_writes_trigger(2000);
opts.set_level_zero_slowdown_writes_trigger(0);
opts.set_compaction_style(RocksDBUniversalCompaction);
opts.set_max_background_compactions(4);
opts.set_max_background_flushes(4);
opts.set_filter_deletes(false);
opts.set_disable_auto_compactions(true);
let filter = new_bloom_filter(10);
opts.set_filter(filter);
RocksDB::open(opts, path).unwrap()
}
#[allow(dead_code)]
#[bench]
fn reads(b: &mut Bencher) {
let db = tuned_for_somebody_elses_disk();
let mut i = 0 as u64;
b.iter(|| {
db.get(i.to_string().as_bytes()).on_error( |e| {
println!("error: {}", e);
e
#[bench]
fn writes(b: &mut Bencher) {
// dirty hack due to parallel tests causing contention.
sleep_ms(1000);
let db = tuned_for_somebody_elses_disk();
let mut i = 0 as u64;
b.iter(|| {
db.put(i.to_string().as_bytes(), b"v1111");
i += 1;
});
i += 1;
});
db.close();
db.close();
}
#[bench]
fn reads(b: &mut Bencher) {
let db = tuned_for_somebody_elses_disk();
let mut i = 0 as u64;
b.iter(|| {
db.get(i.to_string().as_bytes()).on_error( |e| {
println!("error: {}", e);
e
});
i += 1;
});
db.close();
}
}

@ -56,9 +56,9 @@ pub extern "C" fn full_merge_callback(
&mut MergeOperands::new(operands_list,
operands_list_len,
num_operands);
let key: &[u8] = mem::transmute(slice::from_raw_buf(&raw_key, key_len as usize));
let oldval: &[u8] = mem::transmute(slice::from_raw_buf(&existing_value,
existing_value_len as usize));
let key: &[u8] = slice::from_raw_parts(raw_key as *const u8, key_len as usize);
let oldval: &[u8] = slice::from_raw_parts(existing_value as *const u8,
existing_value_len as usize);
let mut result =
(cb.merge_fn)(key, Some(oldval), operands);
result.shrink_to_fit();
@ -67,7 +67,7 @@ pub extern "C" fn full_merge_callback(
assert!(!buf.is_null());
*new_value_length = result.len() as size_t;
*success = 1 as u8;
ptr::copy(&mut *buf, result.as_ptr() as *const c_void, result.len());
ptr::copy(result.as_ptr() as *mut c_void, &mut *buf, result.len());
buf as *const c_char
}
}
@ -83,7 +83,7 @@ pub extern "C" fn partial_merge_callback(
let operands = &mut MergeOperands::new(operands_list,
operands_list_len,
num_operands);
let key: &[u8] = mem::transmute(slice::from_raw_buf(&raw_key, key_len as usize));
let key: &[u8] = slice::from_raw_parts(raw_key as *const u8, key_len as usize);
let mut result = (cb.merge_fn)(key, None, operands);
result.shrink_to_fit();
//TODO(tan) investigate zero-copy techniques to improve performance
@ -91,7 +91,7 @@ pub extern "C" fn partial_merge_callback(
assert!(!buf.is_null());
*new_value_length = 1 as size_t;
*success = 1 as u8;
ptr::copy(&mut *buf, result.as_ptr() as *const c_void, result.len());
ptr::copy(result.as_ptr() as *mut c_void, &mut *buf, result.len());
buf as *const c_char
}
}

@ -52,7 +52,7 @@ impl RocksDB {
let ospath = Path::new(path);
if !ospath.exists() {
match fs::create_dir_all(&ospath) {
Err(_) => return Err(""),
Err(e) => return Err("Failed to create rocksdb directory."),
Ok(_) => (),
}
}
@ -198,7 +198,7 @@ pub struct RocksDBVector {
impl Deref for RocksDBVector {
type Target = [u8];
fn deref(&self) -> &[u8] {
unsafe { slice::from_raw_mut_buf(self.base.deref(), self.len) }
unsafe { slice::from_raw_parts(self.base.get(), self.len) }
}
}
@ -213,9 +213,8 @@ impl Drop for RocksDBVector {
impl RocksDBVector {
pub fn from_c(val: *mut u8, val_len: size_t) -> RocksDBVector {
unsafe {
let base = Unique::new(val);
RocksDBVector {
base: base,
base: Unique::new(val),
len: val_len as usize,
}
}

@ -22,13 +22,12 @@ use rocksdb_ffi;
use merge_operator::{MergeOperatorCallback, MergeOperands, destructor_callback, full_merge_callback,
partial_merge_callback, name_callback};
#[derive(Copy, Clone)]
pub struct RocksDBOptions {
pub inner: rocksdb_ffi::RocksDBOptions,
block_options: rocksdb_ffi::RocksDBBlockBasedTableOptions,
}
impl Copy for RocksDBOptions {}
impl RocksDBOptions {
pub fn new() -> RocksDBOptions {
unsafe {

Loading…
Cancel
Save