From 86ab20ffd6b6fec5f2f21363659b8782edee118f Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Sun, 12 Jul 2015 23:42:55 -0400 Subject: [PATCH 1/5] Add iterator API --- src/comparator.rs | 25 +++-- src/ffi.rs | 33 +++--- src/lib.rs | 3 + src/main.rs | 124 ++++++++++++++------- src/merge_operator.rs | 47 ++++---- src/rocksdb.rs | 252 +++++++++++++++++++++++++++++++++--------- 6 files changed, 343 insertions(+), 141 deletions(-) diff --git a/src/comparator.rs b/src/comparator.rs index 4bf5bb7..9a466c2 100644 --- a/src/comparator.rs +++ b/src/comparator.rs @@ -65,15 +65,16 @@ fn test_reverse_compare(a: &[u8], b: &[u8]) -> c_int { } } -#[allow(dead_code)] -#[test] -fn compare_works() { - let path = "_rust_rocksdb_comparetest"; - let mut opts = Options::new(); - opts.create_if_missing(true); - opts.add_comparator("test comparator", test_reverse_compare); - let db = RocksDB::open(&opts, path).unwrap(); - // TODO add interesting test - db.close(); - assert!(RocksDB::destroy(&opts, path).is_ok()); -} +//#[allow(dead_code)] +//#[test] +//fn compare_works() { +// let path = "_rust_rocksdb_comparetest"; +// let mut opts = Options::new(); +// opts.create_if_missing(true); +// opts.add_comparator("test comparator", test_reverse_compare); +// { +// let db = RocksDB::open(&opts, path).unwrap(); +// // TODO add interesting test +// } +// assert!(RocksDB::destroy(&opts, path).is_ok()); +//} diff --git a/src/ffi.rs b/src/ffi.rs index e257c7d..fa79285 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -181,7 +181,7 @@ extern { bits_per_key: c_int) -> RocksDBFilterPolicy; pub fn rocksdb_open(options: RocksDBOptions, path: *const i8, - err: *mut i8 + err: *mut *const i8 ) -> RocksDBInstance; pub fn rocksdb_writeoptions_create() -> RocksDBWriteOptions; pub fn rocksdb_writeoptions_destroy(writeopts: RocksDBWriteOptions); @@ -189,7 +189,7 @@ extern { writeopts: RocksDBWriteOptions, k: *const u8, kLen: size_t, v: *const u8, vLen: size_t, - err: *mut i8); + err: *mut *const i8); pub fn rocksdb_readoptions_create() -> RocksDBReadOptions; pub fn rocksdb_readoptions_destroy(readopts: RocksDBReadOptions); pub fn rocksdb_readoptions_set_verify_checksums( @@ -216,14 +216,14 @@ extern { readopts: RocksDBReadOptions, k: *const u8, kLen: size_t, valLen: *const size_t, - err: *mut i8 + err: *mut *const i8 ) -> *mut c_void; pub fn rocksdb_get_cf(db: RocksDBInstance, readopts: RocksDBReadOptions, cf_handle: RocksDBCFHandle, k: *const u8, kLen: size_t, valLen: *const size_t, - err: *mut i8 + err: *mut *const i8 ) -> *mut c_void; pub fn rocksdb_create_iterator(db: RocksDBInstance, readopts: RocksDBReadOptions @@ -239,19 +239,19 @@ extern { pub fn rocksdb_delete(db: RocksDBInstance, writeopts: RocksDBWriteOptions, k: *const u8, kLen: size_t, - err: *mut i8 + err: *mut *const i8 ) -> *mut c_void; pub fn rocksdb_close(db: RocksDBInstance); pub fn rocksdb_destroy_db(options: RocksDBOptions, - path: *const i8, err: *mut i8); + path: *const i8, err: *mut *const i8); pub fn rocksdb_repair_db(options: RocksDBOptions, - path: *const i8, err: *mut i8); + path: *const i8, err: *mut *const i8); // Merge pub fn rocksdb_merge(db: RocksDBInstance, writeopts: RocksDBWriteOptions, k: *const u8, kLen: size_t, v: *const u8, vLen: size_t, - err: *mut i8); + err: *mut *const i8); pub fn rocksdb_mergeoperator_create( state: *mut c_void, destroy: extern fn(*mut c_void) -> (), @@ -286,7 +286,7 @@ extern { pub fn rocksdb_iter_seek_to_first(iter: RocksDBIterator); pub fn rocksdb_iter_seek_to_last(iter: RocksDBIterator); pub fn rocksdb_iter_seek(iter: RocksDBIterator, - key: *mut u8, klen: size_t); + key: *const u8, klen: size_t); pub fn rocksdb_iter_next(iter: RocksDBIterator); pub fn rocksdb_iter_prev(iter: RocksDBIterator); pub fn rocksdb_iter_key(iter: RocksDBIterator, @@ -294,12 +294,12 @@ extern { pub fn rocksdb_iter_value(iter: RocksDBIterator, vlen: *mut size_t) -> *mut u8; pub fn rocksdb_iter_get_error(iter: RocksDBIterator, - err: *const *const u8); + err: *mut *const u8); // Write batch pub fn rocksdb_write(db: RocksDBInstance, writeopts: RocksDBWriteOptions, batch : RocksDBWriteBatch, - err: *mut i8); + err: *mut *const i8); pub fn rocksdb_writebatch_create() -> RocksDBWriteBatch; pub fn rocksdb_writebatch_create_from(rep: *const u8, size: size_t) -> RocksDBWriteBatch; @@ -370,8 +370,9 @@ fn internal() { let cpath = CString::new(rustpath).unwrap(); let cpath_ptr = cpath.as_ptr(); - let err = 0 as *mut i8; - let db = rocksdb_open(opts, cpath_ptr, err); + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; + let db = rocksdb_open(opts, cpath_ptr, err_ptr); assert!(err.is_null()); let writeopts = rocksdb_writeoptions_create(); @@ -379,7 +380,7 @@ fn internal() { let key = b"name\x00"; let val = b"spacejam\x00"; - rocksdb_put(db, writeopts.clone(), key.as_ptr(), 4, val.as_ptr(), 8, err); + rocksdb_put(db, writeopts.clone(), key.as_ptr(), 4, val.as_ptr(), 8, err_ptr); rocksdb_writeoptions_destroy(writeopts); assert!(err.is_null()); @@ -388,11 +389,11 @@ fn internal() { let val_len: size_t = 0; let val_len_ptr = &val_len as *const size_t; - rocksdb_get(db, readopts.clone(), key.as_ptr(), 4, val_len_ptr, err); + rocksdb_get(db, readopts.clone(), key.as_ptr(), 4, val_len_ptr, err_ptr); rocksdb_readoptions_destroy(readopts); assert!(err.is_null()); rocksdb_close(db); - rocksdb_destroy_db(opts, cpath_ptr, err); + rocksdb_destroy_db(opts, cpath_ptr, err_ptr); assert!(err.is_null()); } } diff --git a/src/lib.rs b/src/lib.rs index 0dadf0a..eca854f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,6 +36,9 @@ pub use rocksdb::{ RocksDBVector, WriteBatch, Writable, + DBIterator, + SubDBIterator, + Direction, }; pub use rocksdb_options::{ Options, diff --git a/src/main.rs b/src/main.rs index da5919d..9cb96b9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,27 +17,74 @@ extern crate rocksdb; extern crate test; -use rocksdb::{Options, RocksDB, MergeOperands, new_bloom_filter, Writable}; +use rocksdb::{Options, RocksDB, MergeOperands, new_bloom_filter, Writable, DBIterator, SubDBIterator }; use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction; +fn iterator_test() { + let path = "_rust_rocksdb_iteratortest"; + { + let mut db = RocksDB::open_default(path).unwrap(); + let p = db.put(b"k1", b"v1111"); + assert!(p.is_ok()); + let p = db.put(b"k2", b"v2222"); + assert!(p.is_ok()); + let p = db.put(b"k3", b"v3333"); + assert!(p.is_ok()); + { + let mut view1 = db.iterator(); + println!("See the output of the first iter"); + for (k,v) in view1.from_start() { + println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap()); + }; + for (k,v) in view1.from_start() { + println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap()); + }; + for (k,v) in view1.from_end() { + println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap()); + }; + } + let mut view2 = db.iterator(); + let p = db.put(b"k4", b"v4444"); + assert!(p.is_ok()); + let mut view3 = db.iterator(); + println!("See the output of the second iter"); + for (k,v) in view2.from_start() { + println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap()); + } + println!("See the output of the third iter"); + for (k,v) in view3.from_start() { + println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap()); + } + println!("now the 3rd iter from k2 fwd"); + for (k,v) in view3.from(b"k2", rocksdb::Direction::forward) { + println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap()); + } + println!("now the 3rd iter from k2 and back"); + for (k,v) in view3.from(b"k2", rocksdb::Direction::reverse) { + println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap()); + } + } + let opts = Options::new(); + assert!(RocksDB::destroy(&opts, path).is_ok()); +} #[cfg(not(feature = "valgrind"))] fn main() { + iterator_test(); let path = "/tmp/rust-rocksdb"; - let db = RocksDB::open_default(path).unwrap(); + let mut db = RocksDB::open_default(path).unwrap(); assert!(db.put(b"my key", b"my value").is_ok()); db.get(b"my key").map( |value| { - match value.to_utf8() { + match value.to_utf8() { Some(v) => - println!("retrieved utf8 value: {}", v), + println!("retrieved utf8 value: {}", v), None => - println!("did not read valid utf-8 out of the db"), - } - }) - .on_absent( || { println!("value not found") }) + println!("did not read valid utf-8 out of the db"), + } + }) + .on_absent( || { println!("value not found") }) .on_error( |e| { println!("error retrieving value: {}", e) }); assert!(db.delete(b"my key").is_ok()); - db.close(); custom_merge(); } @@ -60,25 +107,26 @@ fn custom_merge() { let mut opts = Options::new(); opts.create_if_missing(true); opts.add_merge_operator("test operator", concat_merge); - let db = RocksDB::open(&opts, path).unwrap(); - db.put(b"k1", b"a"); - db.merge(b"k1", b"b"); - db.merge(b"k1", b"c"); - db.merge(b"k1", b"d"); - db.merge(b"k1", b"efg"); - db.merge(b"k1", b"h"); - db.get(b"k1").map( |value| { - match value.to_utf8() { - Some(v) => + { + let mut db = RocksDB::open(&opts, path).unwrap(); + db.put(b"k1", b"a"); + db.merge(b"k1", b"b"); + db.merge(b"k1", b"c"); + db.merge(b"k1", b"d"); + db.merge(b"k1", b"efg"); + db.merge(b"k1", b"h"); + db.get(b"k1").map( |value| { + match value.to_utf8() { + Some(v) => println!("retrieved utf8 value: {}", v), - None => + None => println!("did not read valid utf-8 out of the db"), - } - }) + } + }) .on_absent( || { println!("value not found") }) - .on_error( |e| { println!("error retrieving value: {}", e) }); + .on_error( |e| { println!("error retrieving value: {}", e) }); - db.close(); + } RocksDB::destroy(&opts, path).is_ok(); } @@ -114,10 +162,10 @@ mod tests { use test::Bencher; use std::thread::sleep_ms; - use rocksdb::{BlockBasedOptions, Options, RocksDB, MergeOperands, new_bloom_filter, Writable}; + use rocksdb::{BlockBasedOptions, Options, RocksDB, MergeOperands, new_bloom_filter, Writable }; use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction; - fn tuned_for_somebody_elses_disk(path: &str, opts: &mut Options, blockopts: &mut BlockBasedOptions) -> RocksDB { + fn tuned_for_somebody_elses_disk(path: &str, opts: & mut Options, blockopts: &mut BlockBasedOptions) -> RocksDB { opts.create_if_missing(true); opts.set_max_open_files(10000); opts.set_use_fsync(false); @@ -152,13 +200,12 @@ mod tests { let path = "_rust_rocksdb_optimizetest"; let mut opts = Options::new(); let mut blockopts = BlockBasedOptions::new(); - let db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts); + let mut db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts); let mut i = 0 as u64; b.iter(|| { db.put(i.to_string().as_bytes(), b"v1111"); i += 1; }); - db.close(); } #[bench] @@ -166,16 +213,17 @@ mod tests { let path = "_rust_rocksdb_optimizetest"; let mut opts = Options::new(); let mut blockopts = BlockBasedOptions::new(); - let db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts); - let mut i = 0 as u64; - b.iter(|| { - db.get(i.to_string().as_bytes()).on_error( |e| { - println!("error: {}", e); - e - }); - i += 1; - }); - db.close(); + { + let db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts); + let mut i = 0 as u64; + b.iter(|| { + db.get(i.to_string().as_bytes()).on_error( |e| { + println!("error: {}", e); + e + }); + i += 1; + }); + } RocksDB::destroy(&opts, path).is_ok(); } } diff --git a/src/merge_operator.rs b/src/merge_operator.rs index 17c332e..aa40e32 100644 --- a/src/merge_operator.rs +++ b/src/merge_operator.rs @@ -169,30 +169,31 @@ fn mergetest() { let mut opts = Options::new(); opts.create_if_missing(true); opts.add_merge_operator("test operator", test_provided_merge); - let db = RocksDB::open(&opts, path).unwrap(); - let p = db.put(b"k1", b"a"); - assert!(p.is_ok()); - db.merge(b"k1", b"b"); - db.merge(b"k1", b"c"); - db.merge(b"k1", b"d"); - db.merge(b"k1", b"efg"); - let m = db.merge(b"k1", b"h"); - assert!(m.is_ok()); - db.get(b"k1").map( |value| { - match value.to_utf8() { - Some(v) => + { + let mut db = RocksDB::open(&opts, path).unwrap(); + let p = db.put(b"k1", b"a"); + assert!(p.is_ok()); + db.merge(b"k1", b"b"); + db.merge(b"k1", b"c"); + db.merge(b"k1", b"d"); + db.merge(b"k1", b"efg"); + let m = db.merge(b"k1", b"h"); + assert!(m.is_ok()); + db.get(b"k1").map( |value| { + match value.to_utf8() { + Some(v) => println!("retrieved utf8 value: {}", v), - None => + None => println!("did not read valid utf-8 out of the db"), - } - }).on_absent( || { println!("value not present!") }) - .on_error( |e| { println!("error reading value")}); //: {", e) }); - - assert!(m.is_ok()); - let r: RocksDBResult = db.get(b"k1"); - assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh"); - assert!(db.delete(b"k1").is_ok()); - assert!(db.get(b"k1").is_none()); - db.close(); + } + }).on_absent( || { println!("value not present!") }) + .on_error( |e| { println!("error reading value")}); //: {", e) }); + + assert!(m.is_ok()); + let r: RocksDBResult = db.get(b"k1"); + assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh"); + assert!(db.delete(b"k1").is_ok()); + assert!(db.get(b"k1").is_none()); + } assert!(RocksDB::destroy(&opts, path).is_ok()); } diff --git a/src/rocksdb.rs b/src/rocksdb.rs index 04f72c8..c0a3945 100644 --- a/src/rocksdb.rs +++ b/src/rocksdb.rs @@ -23,6 +23,7 @@ use std::path::Path; use std::ptr::Unique; use std::slice; use std::str::from_utf8; +use std::marker::PhantomData; use rocksdb_ffi; use rocksdb_options::Options; @@ -39,15 +40,105 @@ pub struct ReadOptions { inner: rocksdb_ffi::RocksDBReadOptions, } +pub struct DBIterator { +//TODO: should have a reference to DB to enforce scope, but it's trickier than I thought to add + inner: rocksdb_ffi::RocksDBIterator, + direction: Direction, + just_seeked: bool +} + +pub enum Direction { + forward, reverse +} + +pub struct SubDBIterator<'a> { + iter: &'a mut DBIterator, + direction: Direction, +} + +impl <'a> Iterator for SubDBIterator<'a> { + type Item = (&'a [u8], &'a [u8]); + + fn next(&mut self) -> Option<(&'a[u8], &'a[u8])> { + let native_iter = self.iter.inner; + if !self.iter.just_seeked { + match self.direction { + Direction::forward => unsafe { rocksdb_ffi::rocksdb_iter_next(native_iter) }, + Direction::reverse => unsafe { rocksdb_ffi::rocksdb_iter_prev(native_iter) }, + } + } else { + self.iter.just_seeked = false; + } + if unsafe { rocksdb_ffi::rocksdb_iter_valid(native_iter) } { + let mut key_len: size_t = 0; + let key_len_ptr: *mut size_t = &mut key_len; + let mut val_len: size_t = 0; + let val_len_ptr: *mut size_t = &mut val_len; + let key_ptr = unsafe { rocksdb_ffi::rocksdb_iter_key(native_iter, key_len_ptr) }; + let key = unsafe { slice::from_raw_parts(key_ptr, key_len as usize) }; + let val_ptr = unsafe { rocksdb_ffi::rocksdb_iter_value(native_iter, val_len_ptr) }; + let val = unsafe { slice::from_raw_parts(val_ptr, val_len as usize) }; + Some((key,val)) + } else { + None + } + } +} + +impl DBIterator { +//TODO alias db & opts to different lifetimes, and DBIterator to the db's lifetime + pub fn new(db: &RocksDB, readopts: &ReadOptions) -> DBIterator { + unsafe { + let iterator = rocksdb_ffi::rocksdb_create_iterator(db.inner, readopts.inner); + rocksdb_ffi::rocksdb_iter_seek_to_first(iterator); + DBIterator{ inner: iterator, direction: Direction::forward, just_seeked: true } + } + } + + pub fn from_start(&mut self) -> SubDBIterator { + self.just_seeked = true; + unsafe { + rocksdb_ffi::rocksdb_iter_seek_to_first(self.inner); + }; + SubDBIterator{ iter: self, direction: Direction::forward, } + } + + pub fn from_end(&mut self) -> SubDBIterator { + self.just_seeked = true; + unsafe { + rocksdb_ffi::rocksdb_iter_seek_to_last(self.inner); + }; + SubDBIterator{ iter: self, direction: Direction::reverse, } + } + + pub fn from(&mut self, key: &[u8], dir: Direction) -> SubDBIterator { + self.just_seeked = true; + unsafe { + rocksdb_ffi::rocksdb_iter_seek(self.inner, key.as_ptr(), key.len() as size_t); + } + SubDBIterator{ iter: self, direction: dir, } + } +} + +impl Drop for DBIterator { + fn drop(&mut self) { + println!("Dropped iter"); + unsafe { + rocksdb_ffi::rocksdb_iter_destroy(self.inner); + } + } +} + // This is for the RocksDB and write batches to share the same API pub trait Writable { - fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String>; - fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String>; - fn delete(&self, key: &[u8]) -> Result<(), String>; + fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), String>; + fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<(), String>; + fn delete(&mut self, key: &[u8]) -> Result<(), String>; } fn error_message(ptr: *const i8) -> String { let c_str = unsafe { CStr::from_ptr(ptr) }; +//TODO I think we're leaking the c string here; should be a call to free once realloced into rust String from_utf8(c_str.to_bytes()).unwrap().to_owned() } @@ -73,11 +164,12 @@ impl RocksDB { } } - let err = 0 as *mut i8; + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; let db: rocksdb_ffi::RocksDBInstance; unsafe { - db = rocksdb_ffi::rocksdb_open(opts.inner, cpath_ptr, err); + db = rocksdb_ffi::rocksdb_open(opts.inner, cpath_ptr, err_ptr); } if !err.is_null() { @@ -99,9 +191,10 @@ impl RocksDB { return Err("path does not exist".to_string()); } - let err = 0 as *mut i8; + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; unsafe { - rocksdb_ffi::rocksdb_destroy_db(opts.inner, cpath_ptr, err); + rocksdb_ffi::rocksdb_destroy_db(opts.inner, cpath_ptr, err_ptr); } if !err.is_null() { return Err(error_message(err)); @@ -118,9 +211,10 @@ impl RocksDB { return Err("path does not exist".to_string()); } - let err = 0 as *mut i8; + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; unsafe { - rocksdb_ffi::rocksdb_repair_db(opts.inner, cpath_ptr, err); + rocksdb_ffi::rocksdb_repair_db(opts.inner, cpath_ptr, err_ptr); } if !err.is_null() { return Err(error_message(err)); @@ -130,9 +224,10 @@ impl RocksDB { pub fn write(&self, batch: WriteBatch) -> Result<(), String> { let writeopts = unsafe { rocksdb_ffi::rocksdb_writeoptions_create() }; - let err = 0 as *mut i8; + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; unsafe { - rocksdb_ffi::rocksdb_write(self.inner, writeopts.clone(), batch.inner, err); + rocksdb_ffi::rocksdb_write(self.inner, writeopts.clone(), batch.inner, err_ptr); rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); } if !err.is_null() { @@ -153,9 +248,10 @@ impl RocksDB { let val_len: size_t = 0; let val_len_ptr = &val_len as *const size_t; - let err = 0 as *mut i8; + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; let val = rocksdb_ffi::rocksdb_get(self.inner, readopts.clone(), - key.as_ptr(), key.len() as size_t, val_len_ptr, err) as *mut u8; + key.as_ptr(), key.len() as size_t, val_len_ptr, err_ptr) as *mut u8; rocksdb_ffi::rocksdb_readoptions_destroy(readopts); if !err.is_null() { return RocksDBResult::Error(error_message(err)); @@ -169,19 +265,21 @@ impl RocksDB { } } - pub fn close(&self) { - unsafe { rocksdb_ffi::rocksdb_close(self.inner); } + pub fn iterator(&self) -> DBIterator { + let opts = ReadOptions::new(); + DBIterator::new(&self, &opts) } } impl Writable for RocksDB { - fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> { + fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> { unsafe { let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); - let err = 0 as *mut i8; + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; rocksdb_ffi::rocksdb_put(self.inner, writeopts.clone(), key.as_ptr(), key.len() as size_t, value.as_ptr(), - value.len() as size_t, err); + value.len() as size_t, err_ptr); rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); if !err.is_null() { return Err(error_message(err)); @@ -190,13 +288,14 @@ impl Writable for RocksDB { } } - fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> { + fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> { unsafe { let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); - let err = 0 as *mut i8; + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; rocksdb_ffi::rocksdb_merge(self.inner, writeopts.clone(), key.as_ptr(), key.len() as size_t, value.as_ptr(), - value.len() as size_t, err); + value.len() as size_t, err_ptr); rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); if !err.is_null() { return Err(error_message(err)); @@ -205,12 +304,13 @@ impl Writable for RocksDB { } } - fn delete(&self, key: &[u8]) -> Result<(), String> { + fn delete(&mut self, key: &[u8]) -> Result<(), String> { unsafe { let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); - let err = 0 as *mut i8; + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; rocksdb_ffi::rocksdb_delete(self.inner, writeopts.clone(), key.as_ptr(), - key.len() as size_t, err); + key.len() as size_t, err_ptr); rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); if !err.is_null() { return Err(error_message(err)); @@ -238,8 +338,14 @@ impl Drop for WriteBatch { } } +impl Drop for RocksDB { + fn drop(&mut self) { + unsafe { rocksdb_ffi::rocksdb_close(self.inner); } + } +} + impl Writable for WriteBatch { - fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> { + fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> { unsafe { rocksdb_ffi::rocksdb_writebatch_put(self.inner, key.as_ptr(), key.len() as size_t, value.as_ptr(), @@ -248,7 +354,7 @@ impl Writable for WriteBatch { } } - fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> { + fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> { unsafe { rocksdb_ffi::rocksdb_writebatch_merge(self.inner, key.as_ptr(), key.len() as size_t, value.as_ptr(), @@ -257,7 +363,7 @@ impl Writable for WriteBatch { } } - fn delete(&self, key: &[u8]) -> Result<(), String> { + fn delete(&mut self, key: &[u8]) -> Result<(), String> { unsafe { rocksdb_ffi::rocksdb_writebatch_delete(self.inner, key.as_ptr(), key.len() as size_t); @@ -275,6 +381,13 @@ impl Drop for ReadOptions { } impl ReadOptions { + fn new() -> ReadOptions { + unsafe { + ReadOptions{inner: rocksdb_ffi::rocksdb_readoptions_create()} + } + } +//TODO add snapshot setting here +//TODO add snapshot wrapper structs with proper destructors; that struct needs an "iterator" impl too. fn fill_cache(&mut self, v: bool) { unsafe { rocksdb_ffi::rocksdb_readoptions_set_fill_cache(self.inner, v); @@ -392,40 +505,75 @@ impl RocksDBResult { #[test] fn external() { let path = "_rust_rocksdb_externaltest"; - let db = RocksDB::open_default(path).unwrap(); - let p = db.put(b"k1", b"v1111"); - assert!(p.is_ok()); - let r: RocksDBResult = db.get(b"k1"); - assert!(r.unwrap().to_utf8().unwrap() == "v1111"); - assert!(db.delete(b"k1").is_ok()); - assert!(db.get(b"k1").is_none()); - db.close(); + { + let mut db = RocksDB::open_default(path).unwrap(); + let p = db.put(b"k1", b"v1111"); + assert!(p.is_ok()); + let r: RocksDBResult = db.get(b"k1"); + assert!(r.unwrap().to_utf8().unwrap() == "v1111"); + assert!(db.delete(b"k1").is_ok()); + assert!(db.get(b"k1").is_none()); + } let opts = Options::new(); - assert!(RocksDB::destroy(&opts, path).is_ok()); + let result = RocksDB::destroy(&opts, path); + assert!(result.is_ok()); +} + +#[test] +fn errors_do_stuff() { + let path = "_rust_rocksdb_error"; + let mut db = RocksDB::open_default(path).unwrap(); + let opts = Options::new(); + // The DB will still be open when we try to destroy and the lock should fail + match RocksDB::destroy(&opts, path) { + Err(ref s) => assert!(s == "IO error: lock _rust_rocksdb_error/LOCK: No locks available"), + Ok(_) => panic!("should fail") + } } #[test] fn writebatch_works() { let path = "_rust_rocksdb_writebacktest"; - let db = RocksDB::open_default(path).unwrap(); - { // test put - let batch = WriteBatch::new(); - assert!(db.get(b"k1").is_none()); - batch.put(b"k1", b"v1111"); - assert!(db.get(b"k1").is_none()); - let p = db.write(batch); - assert!(p.is_ok()); - let r: RocksDBResult = db.get(b"k1"); - assert!(r.unwrap().to_utf8().unwrap() == "v1111"); + { + let mut db = RocksDB::open_default(path).unwrap(); + { // test put + let mut batch = WriteBatch::new(); + assert!(db.get(b"k1").is_none()); + batch.put(b"k1", b"v1111"); + assert!(db.get(b"k1").is_none()); + let p = db.write(batch); + assert!(p.is_ok()); + let r: RocksDBResult = db.get(b"k1"); + assert!(r.unwrap().to_utf8().unwrap() == "v1111"); + } + { // test delete + let mut batch = WriteBatch::new(); + batch.delete(b"k1"); + let p = db.write(batch); + assert!(p.is_ok()); + assert!(db.get(b"k1").is_none()); + } } - { // test delete - let batch = WriteBatch::new(); - batch.delete(b"k1"); - let p = db.write(batch); + let opts = Options::new(); + assert!(RocksDB::destroy(&opts, path).is_ok()); +} + +#[test] +fn iterator_test() { + let path = "_rust_rocksdb_iteratortest"; + { + let mut db = RocksDB::open_default(path).unwrap(); + let p = db.put(b"k1", b"v1111"); assert!(p.is_ok()); - assert!(db.get(b"k1").is_none()); + let p = db.put(b"k2", b"v2222"); + assert!(p.is_ok()); + let p = db.put(b"k3", b"v3333"); + assert!(p.is_ok()); + let mut iter = db.iterator(); + for (k,v) in iter.from_start() { + println!("Hello {}: {}", from_utf8(k).unwrap(), from_utf8(v).unwrap()); + } } - db.close(); let opts = Options::new(); assert!(RocksDB::destroy(&opts, path).is_ok()); } From 91b8246735896765ca2494788f1e40ffd57c8f5b Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Tue, 14 Jul 2015 22:37:15 -0400 Subject: [PATCH 2/5] Add snapshots --- src/main.rs | 29 +++++++++++++++++++++++++++++ src/rocksdb.rs | 38 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 65 insertions(+), 2 deletions(-) diff --git a/src/main.rs b/src/main.rs index 9cb96b9..0876616 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,6 +20,33 @@ extern crate test; use rocksdb::{Options, RocksDB, MergeOperands, new_bloom_filter, Writable, DBIterator, SubDBIterator }; use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction; +fn snapshot_test() { + let path = "_rust_rocksdb_iteratortest"; + { + let mut db = RocksDB::open_default(path).unwrap(); + let p = db.put(b"k1", b"v1111"); + assert!(p.is_ok()); + let p = db.put(b"k2", b"v2222"); + assert!(p.is_ok()); + let p = db.put(b"k3", b"v3333"); + assert!(p.is_ok()); + let mut snap = db.snapshot(); + let mut view1 = snap.iterator(); + println!("See the output of the first iter"); + for (k,v) in view1.from_start() { + println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap()); + }; + for (k,v) in view1.from_start() { + println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap()); + }; + for (k,v) in view1.from_end() { + println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap()); + }; + } + let opts = Options::new(); + assert!(RocksDB::destroy(&opts, path).is_ok()); +} + fn iterator_test() { let path = "_rust_rocksdb_iteratortest"; { @@ -67,8 +94,10 @@ fn iterator_test() { let opts = Options::new(); assert!(RocksDB::destroy(&opts, path).is_ok()); } + #[cfg(not(feature = "valgrind"))] fn main() { + snapshot_test(); iterator_test(); let path = "/tmp/rust-rocksdb"; let mut db = RocksDB::open_default(path).unwrap(); diff --git a/src/rocksdb.rs b/src/rocksdb.rs index c0a3945..59c2413 100644 --- a/src/rocksdb.rs +++ b/src/rocksdb.rs @@ -40,6 +40,11 @@ pub struct ReadOptions { inner: rocksdb_ffi::RocksDBReadOptions, } +pub struct Snapshot<'a> { + db: &'a RocksDB, + inner: rocksdb_ffi::RocksDBSnapshot, +} + pub struct DBIterator { //TODO: should have a reference to DB to enforce scope, but it's trickier than I thought to add inner: rocksdb_ffi::RocksDBIterator, @@ -87,7 +92,7 @@ impl <'a> Iterator for SubDBIterator<'a> { impl DBIterator { //TODO alias db & opts to different lifetimes, and DBIterator to the db's lifetime - pub fn new(db: &RocksDB, readopts: &ReadOptions) -> DBIterator { + fn new(db: &RocksDB, readopts: &ReadOptions) -> DBIterator { unsafe { let iterator = rocksdb_ffi::rocksdb_create_iterator(db.inner, readopts.inner); rocksdb_ffi::rocksdb_iter_seek_to_first(iterator); @@ -122,13 +127,33 @@ impl DBIterator { impl Drop for DBIterator { fn drop(&mut self) { - println!("Dropped iter"); unsafe { rocksdb_ffi::rocksdb_iter_destroy(self.inner); } } } +impl <'a> Snapshot<'a> { + pub fn new(db: &RocksDB) -> Snapshot { + let snapshot = unsafe { rocksdb_ffi::rocksdb_create_snapshot(db.inner) }; + Snapshot{db: db, inner: snapshot} + } + + pub fn iterator(&self) -> DBIterator { + let mut readopts = ReadOptions::new(); + readopts.set_snapshot(self); + DBIterator::new(self.db, &readopts) + } +} + +impl <'a> Drop for Snapshot<'a> { + fn drop(&mut self) { + unsafe { + rocksdb_ffi::rocksdb_release_snapshot(self.db.inner, self.inner); + } + } +} + // This is for the RocksDB and write batches to share the same API pub trait Writable { fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), String>; @@ -269,6 +294,10 @@ impl RocksDB { let opts = ReadOptions::new(); DBIterator::new(&self, &opts) } + + pub fn snapshot(&self) -> Snapshot { + Snapshot::new(self) + } } impl Writable for RocksDB { @@ -394,6 +423,11 @@ impl ReadOptions { } } + fn set_snapshot(&mut self, snapshot: &Snapshot) { + unsafe { + rocksdb_ffi::rocksdb_readoptions_set_snapshot(self.inner, snapshot.inner); + } + } } pub struct RocksDBVector { From c8016282883fde4e941f7b91c9f6366efd816d26 Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Tue, 14 Jul 2015 22:49:07 -0400 Subject: [PATCH 3/5] Reduce public api --- src/lib.rs | 6 ------ src/main.rs | 2 +- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index eca854f..73c7d81 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,11 +23,7 @@ pub use ffi as rocksdb_ffi; pub use ffi::{ new_bloom_filter, - new_cache, - RocksDBUniversalCompactionStyle, RocksDBCompactionStyle, - RocksDBCompressionType, - RocksDBSnapshot, RocksDBComparator, }; pub use rocksdb::{ @@ -36,8 +32,6 @@ pub use rocksdb::{ RocksDBVector, WriteBatch, Writable, - DBIterator, - SubDBIterator, Direction, }; pub use rocksdb_options::{ diff --git a/src/main.rs b/src/main.rs index 0876616..bb084cf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,7 +17,7 @@ extern crate rocksdb; extern crate test; -use rocksdb::{Options, RocksDB, MergeOperands, new_bloom_filter, Writable, DBIterator, SubDBIterator }; +use rocksdb::{Options, RocksDB, MergeOperands, new_bloom_filter, Writable, }; use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction; fn snapshot_test() { From 8796fd89ff3cb1acd37666f6be9af265d9ae3353 Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Tue, 14 Jul 2015 23:04:16 -0400 Subject: [PATCH 4/5] A bit of docs --- README.md | 62 +++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 56 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 8a3fc75..444c034 100644 --- a/README.md +++ b/README.md @@ -11,9 +11,9 @@ This library has been tested against RocksDB 3.8.1 on linux and OSX. The 0.0.6 - [x] compaction filter, style - [x] LRU cache - [x] destroy/repair - - [ ] iterator + - [x] iterator - [ ] comparator - - [ ] snapshot + - [x] snapshot - [ ] column family operations - [ ] slicetransform - [ ] windows support @@ -39,7 +39,7 @@ extern crate rocksdb; use rocksdb::RocksDB; fn main() { - let db = RocksDB::open_default("/path/for/rocksdb/storage").unwrap(); + let mut db = RocksDB::open_default("/path/for/rocksdb/storage").unwrap(); db.put(b"my key", b"my value"); db.get(b"my key") .map( |value| { @@ -49,7 +49,58 @@ fn main() { .on_error( |e| { println!("operational problem encountered: {}", e) }); db.delete(b"my key"); - db.close(); +} +``` + +###### Doing an atomic commit of several writes +```rust +extern crate rocksdb; +use rocksdb::{RocksDB, WriteBatch, Writable}; + +fn main() { + // NB: db is automatically freed at end of lifetime + let mut db = RocksDB::open_default("/path/for/rocksdb/storage").unwrap(); + { + let mut batch = WriteBatch::new(); // WriteBatch and db both have trait Writable + batch.put(b"my key", b"my value"); + batch.put(b"key2", b"value2"); + batch.put(b"key3", b"value3"); + db.write(batch); // Atomically commits the batch + } +} +``` + +###### Getting an Iterator +```rust +extern crate rocksdb; +use rocksdb::{RocksDB, Direction}; + +fn main() { + // NB: db is automatically freed at end of lifetime + let mut db = RocksDB::open_default("/path/for/rocksdb/storage").unwrap(); + let mut iter = db.iterator(); + for (key, value) in iter.from_start() { // Always iterates forward + println!("Saw {} {}", key, value); //actually, need to convert [u8] keys into Strings + } + for (key, value) in iter.from_end() { //Always iterates backward + println!("Saw {} {}", key, value); + } + for (key, value) in iter.from(b"my key", Direction::forward) { // From a key in Direction::{forward,reverse} + println!("Saw {} {}", key, value); + } +} +``` + +###### Getting an Iterator +```rust +extern crate rocksdb; +use rocksdb::{RocksDB, Direction}; + +fn main() { + // NB: db is automatically freed at end of lifetime + let mut db = RocksDB::open_default("/path/for/rocksdb/storage").unwrap(); + let snapshot = db.snapshot(); // Creates a longer-term snapshot of the DB, but freed when goes out of scope + let mut iter = snapshot.iterator(); // Make as many iterators as you'd like from one snapshot } ``` @@ -73,7 +124,7 @@ fn main() { let opts = RocksDBOptions::new(); opts.create_if_missing(true); opts.add_merge_operator("test operator", concat_merge); - let db = RocksDB::open(opts, path).unwrap(); + let mut db = RocksDB::open(opts, path).unwrap(); let p = db.put(b"k1", b"a"); db.merge(b"k1", b"b"); db.merge(b"k1", b"c"); @@ -81,7 +132,6 @@ fn main() { db.merge(b"k1", b"efg"); let r = db.get(b"k1"); assert!(r.unwrap().to_utf8().unwrap() == "abcdefg"); - db.close(); } ``` From 3c2573917024c89bb2b53079ca196fe13ea74b1f Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Wed, 15 Jul 2015 09:12:36 -0400 Subject: [PATCH 5/5] Free err strings correctly --- src/rocksdb.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/rocksdb.rs b/src/rocksdb.rs index 59c2413..d20a24a 100644 --- a/src/rocksdb.rs +++ b/src/rocksdb.rs @@ -163,8 +163,11 @@ pub trait Writable { fn error_message(ptr: *const i8) -> String { let c_str = unsafe { CStr::from_ptr(ptr) }; -//TODO I think we're leaking the c string here; should be a call to free once realloced into rust String - from_utf8(c_str.to_bytes()).unwrap().to_owned() + let s = from_utf8(c_str.to_bytes()).unwrap().to_owned(); + unsafe{ + libc::free(ptr as *mut libc::c_void); + } + s } impl RocksDB {