Merge pull request #14 from dgrnbrg/master

Improve safety of borrow checker with ffi objects
master
Tyler Neely 9 years ago
commit 90b5421cf9
  1. 8
      src/comparator.rs
  2. 27
      src/ffi.rs
  3. 3
      src/lib.rs
  4. 35
      src/main.rs
  5. 10
      src/merge_operator.rs
  6. 115
      src/rocksdb.rs
  7. 168
      src/rocksdb_options.rs

@ -20,7 +20,7 @@ use std::mem;
use std::ptr; use std::ptr;
use std::slice; use std::slice;
use rocksdb_options::RocksDBOptions; use rocksdb_options::Options;
use rocksdb::RocksDB; use rocksdb::RocksDB;
pub struct ComparatorCallback { pub struct ComparatorCallback {
@ -69,11 +69,11 @@ fn test_reverse_compare(a: &[u8], b: &[u8]) -> c_int {
#[test] #[test]
fn compare_works() { fn compare_works() {
let path = "_rust_rocksdb_comparetest"; let path = "_rust_rocksdb_comparetest";
let opts = RocksDBOptions::new(); let mut opts = Options::new();
opts.create_if_missing(true); opts.create_if_missing(true);
opts.add_comparator("test comparator", test_reverse_compare); opts.add_comparator("test comparator", test_reverse_compare);
let db = RocksDB::open(opts, path).unwrap(); let db = RocksDB::open(&opts, path).unwrap();
// TODO add interesting test // TODO add interesting test
db.close(); db.close();
assert!(RocksDB::destroy(opts, path).is_ok()); assert!(RocksDB::destroy(&opts, path).is_ok());
} }

@ -92,14 +92,15 @@ pub enum RocksDBUniversalCompactionStyle {
rocksdb_total_size_compaction_stop_style = 1 rocksdb_total_size_compaction_stop_style = 1
} }
//TODO audit the use of boolean arguments, b/c I think they need to be u8 instead...
#[link(name = "rocksdb")] #[link(name = "rocksdb")]
extern { extern {
pub fn rocksdb_options_create() -> RocksDBOptions; pub fn rocksdb_options_create() -> RocksDBOptions;
pub fn rocksdb_options_destroy(opts: RocksDBOptions);
pub fn rocksdb_cache_create_lru(capacity: size_t) -> RocksDBCache; pub fn rocksdb_cache_create_lru(capacity: size_t) -> RocksDBCache;
pub fn rocksdb_cache_destroy(cache: RocksDBCache); pub fn rocksdb_cache_destroy(cache: RocksDBCache);
pub fn rocksdb_block_based_options_create() -> RocksDBBlockBasedTableOptions; pub fn rocksdb_block_based_options_create() -> RocksDBBlockBasedTableOptions;
pub fn rocksdb_block_based_options_destroy( pub fn rocksdb_block_based_options_destroy(opts: RocksDBBlockBasedTableOptions);
block_options: RocksDBBlockBasedTableOptions);
pub fn rocksdb_block_based_options_set_block_size( pub fn rocksdb_block_based_options_set_block_size(
block_options: RocksDBBlockBasedTableOptions, block_options: RocksDBBlockBasedTableOptions,
block_size: size_t); block_size: size_t);
@ -191,8 +192,26 @@ extern {
err: *mut i8); err: *mut i8);
pub fn rocksdb_readoptions_create() -> RocksDBReadOptions; pub fn rocksdb_readoptions_create() -> RocksDBReadOptions;
pub fn rocksdb_readoptions_destroy(readopts: RocksDBReadOptions); pub fn rocksdb_readoptions_destroy(readopts: RocksDBReadOptions);
pub fn rocksdb_readoptions_set_snapshot(read_opts: RocksDBReadOptions, pub fn rocksdb_readoptions_set_verify_checksums(
snapshot: RocksDBSnapshot); readopts: RocksDBReadOptions,
v: bool);
pub fn rocksdb_readoptions_set_fill_cache(
readopts: RocksDBReadOptions,
v: bool);
pub fn rocksdb_readoptions_set_snapshot(
readopts: RocksDBReadOptions,
snapshot: RocksDBSnapshot); //TODO how do I make this a const ref?
pub fn rocksdb_readoptions_set_iterate_upper_bound(
readopts: RocksDBReadOptions,
k: *const u8,
kLen: size_t);
pub fn rocksdb_readoptions_set_read_tier(
readopts: RocksDBReadOptions,
tier: c_int);
pub fn rocksdb_readoptions_set_tailing(
readopts: RocksDBReadOptions,
v: bool);
pub fn rocksdb_get(db: RocksDBInstance, pub fn rocksdb_get(db: RocksDBInstance,
readopts: RocksDBReadOptions, readopts: RocksDBReadOptions,
k: *const u8, kLen: size_t, k: *const u8, kLen: size_t,

@ -38,7 +38,8 @@ pub use rocksdb::{
Writable, Writable,
}; };
pub use rocksdb_options::{ pub use rocksdb_options::{
RocksDBOptions, Options,
BlockBasedOptions,
}; };
pub use merge_operator::{ pub use merge_operator::{
MergeOperands, MergeOperands,

@ -17,7 +17,7 @@
extern crate rocksdb; extern crate rocksdb;
extern crate test; extern crate test;
use rocksdb::{RocksDBOptions, RocksDB, MergeOperands, new_bloom_filter, Writable}; use rocksdb::{Options, RocksDB, MergeOperands, new_bloom_filter, Writable};
use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction; use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction;
#[cfg(not(feature = "valgrind"))] #[cfg(not(feature = "valgrind"))]
@ -57,10 +57,10 @@ fn concat_merge(new_key: &[u8], existing_val: Option<&[u8]>,
fn custom_merge() { fn custom_merge() {
let path = "_rust_rocksdb_mergetest"; let path = "_rust_rocksdb_mergetest";
let opts = RocksDBOptions::new(); let mut opts = Options::new();
opts.create_if_missing(true); opts.create_if_missing(true);
opts.add_merge_operator("test operator", concat_merge); opts.add_merge_operator("test operator", concat_merge);
let db = RocksDB::open(opts, path).unwrap(); let db = RocksDB::open(&opts, path).unwrap();
db.put(b"k1", b"a"); db.put(b"k1", b"a");
db.merge(b"k1", b"b"); db.merge(b"k1", b"b");
db.merge(b"k1", b"c"); db.merge(b"k1", b"c");
@ -79,16 +79,16 @@ fn custom_merge() {
.on_error( |e| { println!("error retrieving value: {}", e) }); .on_error( |e| { println!("error retrieving value: {}", e) });
db.close(); db.close();
RocksDB::destroy(opts, path).is_ok(); RocksDB::destroy(&opts, path).is_ok();
} }
#[cfg(feature = "valgrind")] #[cfg(feature = "valgrind")]
fn main() { fn main() {
let path = "_rust_rocksdb_valgrind"; let path = "_rust_rocksdb_valgrind";
let opts = RocksDBOptions::new(); let mut opts = Options::new();
opts.create_if_missing(true); opts.create_if_missing(true);
opts.add_merge_operator("test operator", concat_merge); opts.add_merge_operator("test operator", concat_merge);
let db = RocksDB::open(opts, path).unwrap(); let db = RocksDB::open(&opts, path).unwrap();
loop { loop {
db.put(b"k1", b"a"); db.put(b"k1", b"a");
db.merge(b"k1", b"b"); db.merge(b"k1", b"b");
@ -114,12 +114,11 @@ mod tests {
use test::Bencher; use test::Bencher;
use std::thread::sleep_ms; use std::thread::sleep_ms;
use rocksdb::{RocksDBOptions, RocksDB, MergeOperands, new_bloom_filter, Writable}; use rocksdb::{BlockBasedOptions, Options, RocksDB, MergeOperands, new_bloom_filter, Writable};
use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction; use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction;
fn tuned_for_somebody_elses_disk(path: &str, opts: RocksDBOptions) -> RocksDB { fn tuned_for_somebody_elses_disk(path: &str, opts: &mut Options, blockopts: &mut BlockBasedOptions) -> RocksDB {
opts.create_if_missing(true); opts.create_if_missing(true);
opts.set_block_size(524288);
opts.set_max_open_files(10000); opts.set_max_open_files(10000);
opts.set_use_fsync(false); opts.set_use_fsync(false);
opts.set_bytes_per_sync(8388608); opts.set_bytes_per_sync(8388608);
@ -136,12 +135,14 @@ mod tests {
opts.set_max_background_compactions(4); opts.set_max_background_compactions(4);
opts.set_max_background_flushes(4); opts.set_max_background_flushes(4);
opts.set_filter_deletes(false); opts.set_filter_deletes(false);
blockopts.set_block_size(524288);
opts.set_block_based_table_factory(blockopts);
opts.set_disable_auto_compactions(true); opts.set_disable_auto_compactions(true);
let filter = new_bloom_filter(10); let filter = new_bloom_filter(10);
opts.set_filter(filter); //opts.set_filter(filter);
RocksDB::open(opts, path).unwrap() RocksDB::open(&opts, path).unwrap()
} }
#[bench] #[bench]
@ -149,8 +150,9 @@ mod tests {
// dirty hack due to parallel tests causing contention. // dirty hack due to parallel tests causing contention.
sleep_ms(1000); sleep_ms(1000);
let path = "_rust_rocksdb_optimizetest"; let path = "_rust_rocksdb_optimizetest";
let opts = RocksDBOptions::new(); let mut opts = Options::new();
let db = tuned_for_somebody_elses_disk(path, opts); let mut blockopts = BlockBasedOptions::new();
let db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts);
let mut i = 0 as u64; let mut i = 0 as u64;
b.iter(|| { b.iter(|| {
db.put(i.to_string().as_bytes(), b"v1111"); db.put(i.to_string().as_bytes(), b"v1111");
@ -162,8 +164,9 @@ mod tests {
#[bench] #[bench]
fn b_reads(b: &mut Bencher) { fn b_reads(b: &mut Bencher) {
let path = "_rust_rocksdb_optimizetest"; let path = "_rust_rocksdb_optimizetest";
let opts = RocksDBOptions::new(); let mut opts = Options::new();
let db = tuned_for_somebody_elses_disk(path, opts); let mut blockopts = BlockBasedOptions::new();
let db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts);
let mut i = 0 as u64; let mut i = 0 as u64;
b.iter(|| { b.iter(|| {
db.get(i.to_string().as_bytes()).on_error( |e| { db.get(i.to_string().as_bytes()).on_error( |e| {
@ -173,6 +176,6 @@ mod tests {
i += 1; i += 1;
}); });
db.close(); db.close();
RocksDB::destroy(opts, path).is_ok(); RocksDB::destroy(&opts, path).is_ok();
} }
} }

@ -20,7 +20,7 @@ use std::mem;
use std::ptr; use std::ptr;
use std::slice; use std::slice;
use rocksdb_options::{RocksDBOptions}; use rocksdb_options::{Options};
use rocksdb::{RocksDB, RocksDBResult, RocksDBVector, Writable}; use rocksdb::{RocksDB, RocksDBResult, RocksDBVector, Writable};
pub struct MergeOperatorCallback { pub struct MergeOperatorCallback {
@ -166,10 +166,10 @@ fn test_provided_merge(new_key: &[u8], existing_val: Option<&[u8]>,
#[test] #[test]
fn mergetest() { fn mergetest() {
let path = "_rust_rocksdb_mergetest"; let path = "_rust_rocksdb_mergetest";
let opts = RocksDBOptions::new(); let mut opts = Options::new();
opts.create_if_missing(true); opts.create_if_missing(true);
opts.add_merge_operator("test operator", test_provided_merge); opts.add_merge_operator("test operator", test_provided_merge);
let db = RocksDB::open(opts, path).unwrap(); let db = RocksDB::open(&opts, path).unwrap();
let p = db.put(b"k1", b"a"); let p = db.put(b"k1", b"a");
assert!(p.is_ok()); assert!(p.is_ok());
db.merge(b"k1", b"b"); db.merge(b"k1", b"b");
@ -189,10 +189,10 @@ fn mergetest() {
.on_error( |e| { println!("error reading value")}); //: {", e) }); .on_error( |e| { println!("error reading value")}); //: {", e) });
assert!(m.is_ok()); assert!(m.is_ok());
let r: RocksDBResult<RocksDBVector, &str> = db.get(b"k1"); let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh"); assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh");
assert!(db.delete(b"k1").is_ok()); assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").is_none()); assert!(db.get(b"k1").is_none());
db.close(); db.close();
assert!(RocksDB::destroy(opts, path).is_ok()); assert!(RocksDB::destroy(&opts, path).is_ok());
} }

@ -25,45 +25,50 @@ use std::slice;
use std::str::from_utf8; use std::str::from_utf8;
use rocksdb_ffi; use rocksdb_ffi;
use rocksdb_ffi::RocksDBSnapshot; use rocksdb_options::Options;
use rocksdb_options::RocksDBOptions;
pub struct RocksDB { pub struct RocksDB {
inner: rocksdb_ffi::RocksDBInstance, inner: rocksdb_ffi::RocksDBInstance,
} }
pub struct WriteBatch { pub struct WriteBatch {
inner: rocksdb_ffi::RocksDBWriteBatch inner: rocksdb_ffi::RocksDBWriteBatch,
}
pub struct ReadOptions {
inner: rocksdb_ffi::RocksDBReadOptions,
} }
// This is for the RocksDB and write batches to share the same API // This is for the RocksDB and write batches to share the same API
pub trait Writable { pub trait Writable {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), &str>; fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String>;
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), &str>; fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String>;
fn delete(&self, key: &[u8]) -> Result<(),&str>; fn delete(&self, key: &[u8]) -> Result<(), String>;
} }
fn error_message<'a>(ptr: *const i8) -> &'a str { fn error_message(ptr: *const i8) -> String {
unsafe { let c_str = unsafe { CStr::from_ptr(ptr) };
return from_utf8(CStr::from_ptr(ptr).to_bytes()).unwrap(); from_utf8(c_str.to_bytes()).unwrap().to_owned()
}
} }
impl RocksDB { impl RocksDB {
pub fn open_default(path: &str) -> Result<RocksDB, &str> { pub fn open_default(path: &str) -> Result<RocksDB, String> {
let opts = RocksDBOptions::new(); let mut opts = Options::new();
opts.create_if_missing(true); opts.create_if_missing(true);
RocksDB::open(opts, path) RocksDB::open(&opts, path)
} }
pub fn open(opts: RocksDBOptions, path: &str) -> Result<RocksDB, &str> { pub fn open(opts: &Options, path: &str) -> Result<RocksDB, String> {
let cpath = CString::new(path.as_bytes()).unwrap(); let cpath = match CString::new(path.as_bytes()) {
Ok(c) => c,
Err(_) => return Err("Failed to convert path to CString when opening rocksdb".to_string()),
};
let cpath_ptr = cpath.as_ptr(); let cpath_ptr = cpath.as_ptr();
let ospath = Path::new(path); let ospath = Path::new(path);
if !ospath.exists() { if !ospath.exists() {
match fs::create_dir_all(&ospath) { match fs::create_dir_all(&ospath) {
Err(e) => return Err("Failed to create rocksdb directory."), Err(e) => return Err("Failed to create rocksdb directory.".to_string()),
Ok(_) => (), Ok(_) => (),
} }
} }
@ -78,19 +83,20 @@ impl RocksDB {
if !err.is_null() { if !err.is_null() {
return Err(error_message(err)); return Err(error_message(err));
} }
if db.0.is_null() { let rocksdb_ffi::RocksDBInstance(db_ptr) = db;
return Err("Could not initialize database."); if db_ptr.is_null() {
return Err("Could not initialize database.".to_string());
} }
Ok(RocksDB{inner: db}) Ok(RocksDB{inner: db})
} }
pub fn destroy(opts: RocksDBOptions, path: &str) -> Result<(), &str> { pub fn destroy(opts: &Options, path: &str) -> Result<(), String> {
let cpath = CString::new(path.as_bytes()).unwrap(); let cpath = CString::new(path.as_bytes()).unwrap();
let cpath_ptr = cpath.as_ptr(); let cpath_ptr = cpath.as_ptr();
let ospath = Path::new(path); let ospath = Path::new(path);
if !ospath.exists() { if !ospath.exists() {
return Err("path does not exist"); return Err("path does not exist".to_string());
} }
let err = 0 as *mut i8; let err = 0 as *mut i8;
@ -103,13 +109,13 @@ impl RocksDB {
Ok(()) Ok(())
} }
pub fn repair(opts: RocksDBOptions, path: &str) -> Result<(), &str> { pub fn repair(opts: Options, path: &str) -> Result<(), String> {
let cpath = CString::new(path.as_bytes()).unwrap(); let cpath = CString::new(path.as_bytes()).unwrap();
let cpath_ptr = cpath.as_ptr(); let cpath_ptr = cpath.as_ptr();
let ospath = Path::new(path); let ospath = Path::new(path);
if !ospath.exists() { if !ospath.exists() {
return Err("path does not exist"); return Err("path does not exist".to_string());
} }
let err = 0 as *mut i8; let err = 0 as *mut i8;
@ -122,33 +128,27 @@ impl RocksDB {
Ok(()) Ok(())
} }
pub fn create_snapshot(self) -> RocksDBSnapshot { pub fn write(&self, batch: WriteBatch) -> Result<(), String> {
unsafe { let writeopts = unsafe { rocksdb_ffi::rocksdb_writeoptions_create() };
rocksdb_ffi::rocksdb_create_snapshot(self.inner) let err = 0 as *mut i8;
}
}
pub fn write(&self, batch: WriteBatch) -> Result<(), &str> {
unsafe { unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8;
rocksdb_ffi::rocksdb_write(self.inner, writeopts.clone(), batch.inner, err); rocksdb_ffi::rocksdb_write(self.inner, writeopts.clone(), batch.inner, err);
rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts);
if !err.is_null() {
return Err(error_message(err));
}
return Ok(())
} }
if !err.is_null() {
return Err(error_message(err));
}
return Ok(())
} }
pub fn get(&self, key: &[u8]) -> RocksDBResult<RocksDBVector, &str> { pub fn get(&self, key: &[u8]) -> RocksDBResult<RocksDBVector, String> {
unsafe { unsafe {
let readopts = rocksdb_ffi::rocksdb_readoptions_create(); let readopts = rocksdb_ffi::rocksdb_readoptions_create();
if readopts.0.is_null() { if readopts.0.is_null() {
return RocksDBResult::Error("Unable to create rocksdb read \ return RocksDBResult::Error("Unable to create rocksdb read \
options. This is a fairly trivial call, and its failure \ options. This is a fairly trivial call, and its failure \
may be indicative of a mis-compiled or mis-loaded rocksdb \ may be indicative of a mis-compiled or mis-loaded rocksdb \
library."); library.".to_string());
} }
let val_len: size_t = 0; let val_len: size_t = 0;
@ -175,7 +175,7 @@ impl RocksDB {
} }
impl Writable for RocksDB { impl Writable for RocksDB {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), &str> { fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe { unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8; let err = 0 as *mut i8;
@ -190,7 +190,7 @@ impl Writable for RocksDB {
} }
} }
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), &str> { fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe { unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8; let err = 0 as *mut i8;
@ -205,7 +205,7 @@ impl Writable for RocksDB {
} }
} }
fn delete(&self, key: &[u8]) -> Result<(),&str> { fn delete(&self, key: &[u8]) -> Result<(), String> {
unsafe { unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8; let err = 0 as *mut i8;
@ -239,7 +239,7 @@ impl Drop for WriteBatch {
} }
impl Writable for WriteBatch { impl Writable for WriteBatch {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), &str> { fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe { unsafe {
rocksdb_ffi::rocksdb_writebatch_put(self.inner, key.as_ptr(), rocksdb_ffi::rocksdb_writebatch_put(self.inner, key.as_ptr(),
key.len() as size_t, value.as_ptr(), key.len() as size_t, value.as_ptr(),
@ -248,7 +248,7 @@ impl Writable for WriteBatch {
} }
} }
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), &str> { fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe { unsafe {
rocksdb_ffi::rocksdb_writebatch_merge(self.inner, key.as_ptr(), rocksdb_ffi::rocksdb_writebatch_merge(self.inner, key.as_ptr(),
key.len() as size_t, value.as_ptr(), key.len() as size_t, value.as_ptr(),
@ -257,7 +257,7 @@ impl Writable for WriteBatch {
} }
} }
fn delete(&self, key: &[u8]) -> Result<(),&str> { fn delete(&self, key: &[u8]) -> Result<(), String> {
unsafe { unsafe {
rocksdb_ffi::rocksdb_writebatch_delete(self.inner, key.as_ptr(), rocksdb_ffi::rocksdb_writebatch_delete(self.inner, key.as_ptr(),
key.len() as size_t); key.len() as size_t);
@ -266,6 +266,23 @@ impl Writable for WriteBatch {
} }
} }
impl Drop for ReadOptions {
fn drop(&mut self) {
unsafe {
rocksdb_ffi::rocksdb_readoptions_destroy(self.inner)
}
}
}
impl ReadOptions {
fn fill_cache(&mut self, v: bool) {
unsafe {
rocksdb_ffi::rocksdb_readoptions_set_fill_cache(self.inner, v);
}
}
}
pub struct RocksDBVector { pub struct RocksDBVector {
base: Unique<u8>, base: Unique<u8>,
len: usize, len: usize,
@ -378,13 +395,13 @@ fn external() {
let db = RocksDB::open_default(path).unwrap(); let db = RocksDB::open_default(path).unwrap();
let p = db.put(b"k1", b"v1111"); let p = db.put(b"k1", b"v1111");
assert!(p.is_ok()); assert!(p.is_ok());
let r: RocksDBResult<RocksDBVector, &str> = db.get(b"k1"); let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "v1111"); assert!(r.unwrap().to_utf8().unwrap() == "v1111");
assert!(db.delete(b"k1").is_ok()); assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").is_none()); assert!(db.get(b"k1").is_none());
db.close(); db.close();
let opts = RocksDBOptions::new(); let opts = Options::new();
assert!(RocksDB::destroy(opts, path).is_ok()); assert!(RocksDB::destroy(&opts, path).is_ok());
} }
#[test] #[test]
@ -398,7 +415,7 @@ fn writebatch_works() {
assert!(db.get(b"k1").is_none()); assert!(db.get(b"k1").is_none());
let p = db.write(batch); let p = db.write(batch);
assert!(p.is_ok()); assert!(p.is_ok());
let r: RocksDBResult<RocksDBVector, &str> = db.get(b"k1"); let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "v1111"); assert!(r.unwrap().to_utf8().unwrap() == "v1111");
} }
{ // test delete { // test delete
@ -409,6 +426,6 @@ fn writebatch_works() {
assert!(db.get(b"k1").is_none()); assert!(db.get(b"k1").is_none());
} }
db.close(); db.close();
let opts = RocksDBOptions::new(); let opts = Options::new();
assert!(RocksDB::destroy(opts, path).is_ok()); assert!(RocksDB::destroy(&opts, path).is_ok());
} }

@ -23,37 +23,91 @@ use merge_operator::{self, MergeOperatorCallback, MergeOperands, full_merge_call
partial_merge_callback}; partial_merge_callback};
use comparator::{self, ComparatorCallback, compare_callback}; use comparator::{self, ComparatorCallback, compare_callback};
#[derive(Copy, Clone)] pub struct BlockBasedOptions {
pub struct RocksDBOptions { inner: rocksdb_ffi::RocksDBBlockBasedTableOptions,
}
pub struct Options {
pub inner: rocksdb_ffi::RocksDBOptions, pub inner: rocksdb_ffi::RocksDBOptions,
block_options: rocksdb_ffi::RocksDBBlockBasedTableOptions,
} }
impl RocksDBOptions { impl Drop for Options {
pub fn new() -> RocksDBOptions { fn drop(&mut self) {
unsafe {
rocksdb_ffi::rocksdb_options_destroy(self.inner);
}
}
}
impl Drop for BlockBasedOptions {
fn drop(&mut self) {
unsafe {
rocksdb_ffi::rocksdb_block_based_options_destroy(self.inner);
}
}
}
impl BlockBasedOptions {
pub fn new() -> BlockBasedOptions {
let block_opts = unsafe {rocksdb_ffi::rocksdb_block_based_options_create() };
let rocksdb_ffi::RocksDBBlockBasedTableOptions(opt_ptr) = block_opts;
if opt_ptr.is_null() {
panic!("Could not create rocksdb block based options".to_string());
}
BlockBasedOptions{ inner: block_opts, }
}
pub fn set_block_size(&mut self, size: u64) {
unsafe {
rocksdb_ffi::rocksdb_block_based_options_set_block_size(
self.inner, size);
}
}
//TODO figure out how to create these in a Rusty way
////pub fn set_filter(&mut self, filter: rocksdb_ffi::RocksDBFilterPolicy) {
//// unsafe {
//// rocksdb_ffi::rocksdb_block_based_options_set_filter_policy(
//// self.inner, filter);
//// }
////}
////pub fn set_cache(&mut self, cache: rocksdb_ffi::RocksDBCache) {
//// unsafe {
//// rocksdb_ffi::rocksdb_block_based_options_set_block_cache(
//// self.inner, cache);
//// }
////}
////pub fn set_cache_compressed(&mut self, cache: rocksdb_ffi::RocksDBCache) {
//// unsafe {
//// rocksdb_ffi::rocksdb_block_based_options_set_block_cache_compressed(
//// self.inner, cache);
//// }
////}
}
impl Options {
pub fn new() -> Options {
unsafe { unsafe {
let opts = rocksdb_ffi::rocksdb_options_create(); let opts = rocksdb_ffi::rocksdb_options_create();
let rocksdb_ffi::RocksDBOptions(opt_ptr) = opts; let rocksdb_ffi::RocksDBOptions(opt_ptr) = opts;
if opt_ptr.is_null() { if opt_ptr.is_null() {
panic!("Could not create rocksdb options".to_string()); panic!("Could not create rocksdb options".to_string());
} }
let block_opts = rocksdb_ffi::rocksdb_block_based_options_create(); Options{ inner: opts, }
RocksDBOptions{
inner: opts,
block_options: block_opts,
}
} }
} }
pub fn increase_parallelism(&self, parallelism: i32) { pub fn increase_parallelism(&mut self, parallelism: i32) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_increase_parallelism( rocksdb_ffi::rocksdb_options_increase_parallelism(
self.inner, parallelism); self.inner, parallelism);
} }
} }
pub fn optimize_level_style_compaction(&self, pub fn optimize_level_style_compaction(&mut self,
memtable_memory_budget: i32) { memtable_memory_budget: i32) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_optimize_level_style_compaction( rocksdb_ffi::rocksdb_options_optimize_level_style_compaction(
@ -61,14 +115,14 @@ impl RocksDBOptions {
} }
} }
pub fn create_if_missing(&self, create_if_missing: bool) { pub fn create_if_missing(&mut self, create_if_missing: bool) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_create_if_missing( rocksdb_ffi::rocksdb_options_set_create_if_missing(
self.inner, create_if_missing); self.inner, create_if_missing);
} }
} }
pub fn add_merge_operator<'a>(&self, name: &str, pub fn add_merge_operator<'a>(&mut self, name: &str,
merge_fn: fn (&[u8], Option<&[u8]>, &mut MergeOperands) -> Vec<u8>) { merge_fn: fn (&[u8], Option<&[u8]>, &mut MergeOperands) -> Vec<u8>) {
let cb = Box::new(MergeOperatorCallback { let cb = Box::new(MergeOperatorCallback {
name: CString::new(name.as_bytes()).unwrap(), name: CString::new(name.as_bytes()).unwrap(),
@ -87,7 +141,7 @@ impl RocksDBOptions {
} }
} }
pub fn add_comparator<'a>(&self, name: &str, compare_fn: fn(&[u8], &[u8]) -> i32) { pub fn add_comparator<'a>(&mut self, name: &str, compare_fn: fn(&[u8], &[u8]) -> i32) {
let cb = Box::new(ComparatorCallback { let cb = Box::new(ComparatorCallback {
name: CString::new(name.as_bytes()).unwrap(), name: CString::new(name.as_bytes()).unwrap(),
f: compare_fn, f: compare_fn,
@ -104,60 +158,20 @@ impl RocksDBOptions {
} }
pub fn set_block_size(&self, size: u64) { pub fn set_block_cache_size_mb(&mut self, cache_size: u64) {
unsafe {
rocksdb_ffi::rocksdb_block_based_options_set_block_size(
self.block_options, size);
rocksdb_ffi::rocksdb_options_set_block_based_table_factory(
self.inner,
self.block_options);
}
}
pub fn set_block_cache_size_mb(&self, cache_size: u64) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_optimize_for_point_lookup( rocksdb_ffi::rocksdb_options_optimize_for_point_lookup(
self.inner, cache_size); self.inner, cache_size);
} }
} }
pub fn set_filter(&self, filter: rocksdb_ffi::RocksDBFilterPolicy) { pub fn set_max_open_files(&mut self, nfiles: c_int) {
unsafe {
rocksdb_ffi::rocksdb_block_based_options_set_filter_policy(
self.block_options, filter);
rocksdb_ffi::rocksdb_options_set_block_based_table_factory(
self.inner,
self.block_options);
}
}
pub fn set_cache(&self, cache: rocksdb_ffi::RocksDBCache) {
unsafe {
rocksdb_ffi::rocksdb_block_based_options_set_block_cache(
self.block_options, cache);
rocksdb_ffi::rocksdb_options_set_block_based_table_factory(
self.inner,
self.block_options);
}
}
pub fn set_cache_compressed(&self, cache: rocksdb_ffi::RocksDBCache) {
unsafe {
rocksdb_ffi::rocksdb_block_based_options_set_block_cache_compressed(
self.block_options, cache);
rocksdb_ffi::rocksdb_options_set_block_based_table_factory(
self.inner,
self.block_options);
}
}
pub fn set_max_open_files(&self, nfiles: c_int) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_max_open_files(self.inner, nfiles); rocksdb_ffi::rocksdb_options_set_max_open_files(self.inner, nfiles);
} }
} }
pub fn set_use_fsync(&self, useit: bool) { pub fn set_use_fsync(&mut self, useit: bool) {
unsafe { unsafe {
match useit { match useit {
true => true =>
@ -168,14 +182,14 @@ impl RocksDBOptions {
} }
} }
pub fn set_bytes_per_sync(&self, nbytes: u64) { pub fn set_bytes_per_sync(&mut self, nbytes: u64) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_bytes_per_sync( rocksdb_ffi::rocksdb_options_set_bytes_per_sync(
self.inner, nbytes); self.inner, nbytes);
} }
} }
pub fn set_disable_data_sync(&self, disable: bool) { pub fn set_disable_data_sync(&mut self, disable: bool) {
unsafe { unsafe {
match disable { match disable {
true => true =>
@ -188,63 +202,63 @@ impl RocksDBOptions {
} }
} }
pub fn set_table_cache_num_shard_bits(&self, nbits: c_int) { pub fn set_table_cache_num_shard_bits(&mut self, nbits: c_int) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_table_cache_numshardbits( rocksdb_ffi::rocksdb_options_set_table_cache_numshardbits(
self.inner, nbits); self.inner, nbits);
} }
} }
pub fn set_min_write_buffer_number(&self, nbuf: c_int) { pub fn set_min_write_buffer_number(&mut self, nbuf: c_int) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_min_write_buffer_number_to_merge( rocksdb_ffi::rocksdb_options_set_min_write_buffer_number_to_merge(
self.inner, nbuf); self.inner, nbuf);
} }
} }
pub fn set_max_write_buffer_number(&self, nbuf: c_int) { pub fn set_max_write_buffer_number(&mut self, nbuf: c_int) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_max_write_buffer_number( rocksdb_ffi::rocksdb_options_set_max_write_buffer_number(
self.inner, nbuf); self.inner, nbuf);
} }
} }
pub fn set_write_buffer_size(&self, size: size_t) { pub fn set_write_buffer_size(&mut self, size: size_t) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_write_buffer_size( rocksdb_ffi::rocksdb_options_set_write_buffer_size(
self.inner, size); self.inner, size);
} }
} }
pub fn set_target_file_size_base(&self, size: u64) { pub fn set_target_file_size_base(&mut self, size: u64) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_target_file_size_base( rocksdb_ffi::rocksdb_options_set_target_file_size_base(
self.inner, size); self.inner, size);
} }
} }
pub fn set_min_write_buffer_number_to_merge(&self, to_merge: c_int) { pub fn set_min_write_buffer_number_to_merge(&mut self, to_merge: c_int) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_min_write_buffer_number_to_merge( rocksdb_ffi::rocksdb_options_set_min_write_buffer_number_to_merge(
self.inner, to_merge); self.inner, to_merge);
} }
} }
pub fn set_level_zero_slowdown_writes_trigger(&self, n: c_int) { pub fn set_level_zero_slowdown_writes_trigger(&mut self, n: c_int) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_level0_slowdown_writes_trigger( rocksdb_ffi::rocksdb_options_set_level0_slowdown_writes_trigger(
self.inner, n); self.inner, n);
} }
} }
pub fn set_level_zero_stop_writes_trigger(&self, n: c_int) { pub fn set_level_zero_stop_writes_trigger(&mut self, n: c_int) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_level0_stop_writes_trigger( rocksdb_ffi::rocksdb_options_set_level0_stop_writes_trigger(
self.inner, n); self.inner, n);
} }
} }
pub fn set_compaction_style(&self, style: pub fn set_compaction_style(&mut self, style:
rocksdb_ffi::RocksDBCompactionStyle) { rocksdb_ffi::RocksDBCompactionStyle) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_compaction_style( rocksdb_ffi::rocksdb_options_set_compaction_style(
@ -252,28 +266,28 @@ impl RocksDBOptions {
} }
} }
pub fn set_max_background_compactions(&self, n: c_int) { pub fn set_max_background_compactions(&mut self, n: c_int) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_max_background_compactions( rocksdb_ffi::rocksdb_options_set_max_background_compactions(
self.inner, n); self.inner, n);
} }
} }
pub fn set_max_background_flushes(&self, n: c_int) { pub fn set_max_background_flushes(&mut self, n: c_int) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_max_background_flushes( rocksdb_ffi::rocksdb_options_set_max_background_flushes(
self.inner, n); self.inner, n);
} }
} }
pub fn set_filter_deletes(&self, filter: bool) { pub fn set_filter_deletes(&mut self, filter: bool) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_filter_deletes( rocksdb_ffi::rocksdb_options_set_filter_deletes(
self.inner, filter); self.inner, filter);
} }
} }
pub fn set_disable_auto_compactions(&self, disable: bool) { pub fn set_disable_auto_compactions(&mut self, disable: bool) {
unsafe { unsafe {
match disable { match disable {
true => true =>
@ -285,6 +299,12 @@ impl RocksDBOptions {
} }
} }
} }
pub fn set_block_based_table_factory(&mut self, factory: &BlockBasedOptions) {
unsafe {
rocksdb_ffi::rocksdb_options_set_block_based_table_factory(self.inner, factory.inner);
}
}
} }

Loading…
Cancel
Save