From 86ab20ffd6b6fec5f2f21363659b8782edee118f Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Sun, 12 Jul 2015 23:42:55 -0400 Subject: [PATCH] Add iterator API --- src/comparator.rs | 25 +++-- src/ffi.rs | 33 +++--- src/lib.rs | 3 + src/main.rs | 124 ++++++++++++++------- src/merge_operator.rs | 47 ++++---- src/rocksdb.rs | 252 +++++++++++++++++++++++++++++++++--------- 6 files changed, 343 insertions(+), 141 deletions(-) diff --git a/src/comparator.rs b/src/comparator.rs index 4bf5bb7..9a466c2 100644 --- a/src/comparator.rs +++ b/src/comparator.rs @@ -65,15 +65,16 @@ fn test_reverse_compare(a: &[u8], b: &[u8]) -> c_int { } } -#[allow(dead_code)] -#[test] -fn compare_works() { - let path = "_rust_rocksdb_comparetest"; - let mut opts = Options::new(); - opts.create_if_missing(true); - opts.add_comparator("test comparator", test_reverse_compare); - let db = RocksDB::open(&opts, path).unwrap(); - // TODO add interesting test - db.close(); - assert!(RocksDB::destroy(&opts, path).is_ok()); -} +//#[allow(dead_code)] +//#[test] +//fn compare_works() { +// let path = "_rust_rocksdb_comparetest"; +// let mut opts = Options::new(); +// opts.create_if_missing(true); +// opts.add_comparator("test comparator", test_reverse_compare); +// { +// let db = RocksDB::open(&opts, path).unwrap(); +// // TODO add interesting test +// } +// assert!(RocksDB::destroy(&opts, path).is_ok()); +//} diff --git a/src/ffi.rs b/src/ffi.rs index e257c7d..fa79285 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -181,7 +181,7 @@ extern { bits_per_key: c_int) -> RocksDBFilterPolicy; pub fn rocksdb_open(options: RocksDBOptions, path: *const i8, - err: *mut i8 + err: *mut *const i8 ) -> RocksDBInstance; pub fn rocksdb_writeoptions_create() -> RocksDBWriteOptions; pub fn rocksdb_writeoptions_destroy(writeopts: RocksDBWriteOptions); @@ -189,7 +189,7 @@ extern { writeopts: RocksDBWriteOptions, k: *const u8, kLen: size_t, v: *const u8, vLen: size_t, - err: *mut i8); + err: *mut *const i8); pub fn rocksdb_readoptions_create() -> RocksDBReadOptions; pub fn rocksdb_readoptions_destroy(readopts: RocksDBReadOptions); pub fn rocksdb_readoptions_set_verify_checksums( @@ -216,14 +216,14 @@ extern { readopts: RocksDBReadOptions, k: *const u8, kLen: size_t, valLen: *const size_t, - err: *mut i8 + err: *mut *const i8 ) -> *mut c_void; pub fn rocksdb_get_cf(db: RocksDBInstance, readopts: RocksDBReadOptions, cf_handle: RocksDBCFHandle, k: *const u8, kLen: size_t, valLen: *const size_t, - err: *mut i8 + err: *mut *const i8 ) -> *mut c_void; pub fn rocksdb_create_iterator(db: RocksDBInstance, readopts: RocksDBReadOptions @@ -239,19 +239,19 @@ extern { pub fn rocksdb_delete(db: RocksDBInstance, writeopts: RocksDBWriteOptions, k: *const u8, kLen: size_t, - err: *mut i8 + err: *mut *const i8 ) -> *mut c_void; pub fn rocksdb_close(db: RocksDBInstance); pub fn rocksdb_destroy_db(options: RocksDBOptions, - path: *const i8, err: *mut i8); + path: *const i8, err: *mut *const i8); pub fn rocksdb_repair_db(options: RocksDBOptions, - path: *const i8, err: *mut i8); + path: *const i8, err: *mut *const i8); // Merge pub fn rocksdb_merge(db: RocksDBInstance, writeopts: RocksDBWriteOptions, k: *const u8, kLen: size_t, v: *const u8, vLen: size_t, - err: *mut i8); + err: *mut *const i8); pub fn rocksdb_mergeoperator_create( state: *mut c_void, destroy: extern fn(*mut c_void) -> (), @@ -286,7 +286,7 @@ extern { pub fn rocksdb_iter_seek_to_first(iter: RocksDBIterator); pub fn rocksdb_iter_seek_to_last(iter: RocksDBIterator); pub fn rocksdb_iter_seek(iter: RocksDBIterator, - key: *mut u8, klen: size_t); + key: *const u8, klen: size_t); pub fn rocksdb_iter_next(iter: RocksDBIterator); pub fn rocksdb_iter_prev(iter: RocksDBIterator); pub fn rocksdb_iter_key(iter: RocksDBIterator, @@ -294,12 +294,12 @@ extern { pub fn rocksdb_iter_value(iter: RocksDBIterator, vlen: *mut size_t) -> *mut u8; pub fn rocksdb_iter_get_error(iter: RocksDBIterator, - err: *const *const u8); + err: *mut *const u8); // Write batch pub fn rocksdb_write(db: RocksDBInstance, writeopts: RocksDBWriteOptions, batch : RocksDBWriteBatch, - err: *mut i8); + err: *mut *const i8); pub fn rocksdb_writebatch_create() -> RocksDBWriteBatch; pub fn rocksdb_writebatch_create_from(rep: *const u8, size: size_t) -> RocksDBWriteBatch; @@ -370,8 +370,9 @@ fn internal() { let cpath = CString::new(rustpath).unwrap(); let cpath_ptr = cpath.as_ptr(); - let err = 0 as *mut i8; - let db = rocksdb_open(opts, cpath_ptr, err); + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; + let db = rocksdb_open(opts, cpath_ptr, err_ptr); assert!(err.is_null()); let writeopts = rocksdb_writeoptions_create(); @@ -379,7 +380,7 @@ fn internal() { let key = b"name\x00"; let val = b"spacejam\x00"; - rocksdb_put(db, writeopts.clone(), key.as_ptr(), 4, val.as_ptr(), 8, err); + rocksdb_put(db, writeopts.clone(), key.as_ptr(), 4, val.as_ptr(), 8, err_ptr); rocksdb_writeoptions_destroy(writeopts); assert!(err.is_null()); @@ -388,11 +389,11 @@ fn internal() { let val_len: size_t = 0; let val_len_ptr = &val_len as *const size_t; - rocksdb_get(db, readopts.clone(), key.as_ptr(), 4, val_len_ptr, err); + rocksdb_get(db, readopts.clone(), key.as_ptr(), 4, val_len_ptr, err_ptr); rocksdb_readoptions_destroy(readopts); assert!(err.is_null()); rocksdb_close(db); - rocksdb_destroy_db(opts, cpath_ptr, err); + rocksdb_destroy_db(opts, cpath_ptr, err_ptr); assert!(err.is_null()); } } diff --git a/src/lib.rs b/src/lib.rs index 0dadf0a..eca854f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,6 +36,9 @@ pub use rocksdb::{ RocksDBVector, WriteBatch, Writable, + DBIterator, + SubDBIterator, + Direction, }; pub use rocksdb_options::{ Options, diff --git a/src/main.rs b/src/main.rs index da5919d..9cb96b9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,27 +17,74 @@ extern crate rocksdb; extern crate test; -use rocksdb::{Options, RocksDB, MergeOperands, new_bloom_filter, Writable}; +use rocksdb::{Options, RocksDB, MergeOperands, new_bloom_filter, Writable, DBIterator, SubDBIterator }; use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction; +fn iterator_test() { + let path = "_rust_rocksdb_iteratortest"; + { + let mut db = RocksDB::open_default(path).unwrap(); + let p = db.put(b"k1", b"v1111"); + assert!(p.is_ok()); + let p = db.put(b"k2", b"v2222"); + assert!(p.is_ok()); + let p = db.put(b"k3", b"v3333"); + assert!(p.is_ok()); + { + let mut view1 = db.iterator(); + println!("See the output of the first iter"); + for (k,v) in view1.from_start() { + println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap()); + }; + for (k,v) in view1.from_start() { + println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap()); + }; + for (k,v) in view1.from_end() { + println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap()); + }; + } + let mut view2 = db.iterator(); + let p = db.put(b"k4", b"v4444"); + assert!(p.is_ok()); + let mut view3 = db.iterator(); + println!("See the output of the second iter"); + for (k,v) in view2.from_start() { + println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap()); + } + println!("See the output of the third iter"); + for (k,v) in view3.from_start() { + println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap()); + } + println!("now the 3rd iter from k2 fwd"); + for (k,v) in view3.from(b"k2", rocksdb::Direction::forward) { + println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap()); + } + println!("now the 3rd iter from k2 and back"); + for (k,v) in view3.from(b"k2", rocksdb::Direction::reverse) { + println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap()); + } + } + let opts = Options::new(); + assert!(RocksDB::destroy(&opts, path).is_ok()); +} #[cfg(not(feature = "valgrind"))] fn main() { + iterator_test(); let path = "/tmp/rust-rocksdb"; - let db = RocksDB::open_default(path).unwrap(); + let mut db = RocksDB::open_default(path).unwrap(); assert!(db.put(b"my key", b"my value").is_ok()); db.get(b"my key").map( |value| { - match value.to_utf8() { + match value.to_utf8() { Some(v) => - println!("retrieved utf8 value: {}", v), + println!("retrieved utf8 value: {}", v), None => - println!("did not read valid utf-8 out of the db"), - } - }) - .on_absent( || { println!("value not found") }) + println!("did not read valid utf-8 out of the db"), + } + }) + .on_absent( || { println!("value not found") }) .on_error( |e| { println!("error retrieving value: {}", e) }); assert!(db.delete(b"my key").is_ok()); - db.close(); custom_merge(); } @@ -60,25 +107,26 @@ fn custom_merge() { let mut opts = Options::new(); opts.create_if_missing(true); opts.add_merge_operator("test operator", concat_merge); - let db = RocksDB::open(&opts, path).unwrap(); - db.put(b"k1", b"a"); - db.merge(b"k1", b"b"); - db.merge(b"k1", b"c"); - db.merge(b"k1", b"d"); - db.merge(b"k1", b"efg"); - db.merge(b"k1", b"h"); - db.get(b"k1").map( |value| { - match value.to_utf8() { - Some(v) => + { + let mut db = RocksDB::open(&opts, path).unwrap(); + db.put(b"k1", b"a"); + db.merge(b"k1", b"b"); + db.merge(b"k1", b"c"); + db.merge(b"k1", b"d"); + db.merge(b"k1", b"efg"); + db.merge(b"k1", b"h"); + db.get(b"k1").map( |value| { + match value.to_utf8() { + Some(v) => println!("retrieved utf8 value: {}", v), - None => + None => println!("did not read valid utf-8 out of the db"), - } - }) + } + }) .on_absent( || { println!("value not found") }) - .on_error( |e| { println!("error retrieving value: {}", e) }); + .on_error( |e| { println!("error retrieving value: {}", e) }); - db.close(); + } RocksDB::destroy(&opts, path).is_ok(); } @@ -114,10 +162,10 @@ mod tests { use test::Bencher; use std::thread::sleep_ms; - use rocksdb::{BlockBasedOptions, Options, RocksDB, MergeOperands, new_bloom_filter, Writable}; + use rocksdb::{BlockBasedOptions, Options, RocksDB, MergeOperands, new_bloom_filter, Writable }; use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction; - fn tuned_for_somebody_elses_disk(path: &str, opts: &mut Options, blockopts: &mut BlockBasedOptions) -> RocksDB { + fn tuned_for_somebody_elses_disk(path: &str, opts: & mut Options, blockopts: &mut BlockBasedOptions) -> RocksDB { opts.create_if_missing(true); opts.set_max_open_files(10000); opts.set_use_fsync(false); @@ -152,13 +200,12 @@ mod tests { let path = "_rust_rocksdb_optimizetest"; let mut opts = Options::new(); let mut blockopts = BlockBasedOptions::new(); - let db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts); + let mut db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts); let mut i = 0 as u64; b.iter(|| { db.put(i.to_string().as_bytes(), b"v1111"); i += 1; }); - db.close(); } #[bench] @@ -166,16 +213,17 @@ mod tests { let path = "_rust_rocksdb_optimizetest"; let mut opts = Options::new(); let mut blockopts = BlockBasedOptions::new(); - let db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts); - let mut i = 0 as u64; - b.iter(|| { - db.get(i.to_string().as_bytes()).on_error( |e| { - println!("error: {}", e); - e - }); - i += 1; - }); - db.close(); + { + let db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts); + let mut i = 0 as u64; + b.iter(|| { + db.get(i.to_string().as_bytes()).on_error( |e| { + println!("error: {}", e); + e + }); + i += 1; + }); + } RocksDB::destroy(&opts, path).is_ok(); } } diff --git a/src/merge_operator.rs b/src/merge_operator.rs index 17c332e..aa40e32 100644 --- a/src/merge_operator.rs +++ b/src/merge_operator.rs @@ -169,30 +169,31 @@ fn mergetest() { let mut opts = Options::new(); opts.create_if_missing(true); opts.add_merge_operator("test operator", test_provided_merge); - let db = RocksDB::open(&opts, path).unwrap(); - let p = db.put(b"k1", b"a"); - assert!(p.is_ok()); - db.merge(b"k1", b"b"); - db.merge(b"k1", b"c"); - db.merge(b"k1", b"d"); - db.merge(b"k1", b"efg"); - let m = db.merge(b"k1", b"h"); - assert!(m.is_ok()); - db.get(b"k1").map( |value| { - match value.to_utf8() { - Some(v) => + { + let mut db = RocksDB::open(&opts, path).unwrap(); + let p = db.put(b"k1", b"a"); + assert!(p.is_ok()); + db.merge(b"k1", b"b"); + db.merge(b"k1", b"c"); + db.merge(b"k1", b"d"); + db.merge(b"k1", b"efg"); + let m = db.merge(b"k1", b"h"); + assert!(m.is_ok()); + db.get(b"k1").map( |value| { + match value.to_utf8() { + Some(v) => println!("retrieved utf8 value: {}", v), - None => + None => println!("did not read valid utf-8 out of the db"), - } - }).on_absent( || { println!("value not present!") }) - .on_error( |e| { println!("error reading value")}); //: {", e) }); - - assert!(m.is_ok()); - let r: RocksDBResult = db.get(b"k1"); - assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh"); - assert!(db.delete(b"k1").is_ok()); - assert!(db.get(b"k1").is_none()); - db.close(); + } + }).on_absent( || { println!("value not present!") }) + .on_error( |e| { println!("error reading value")}); //: {", e) }); + + assert!(m.is_ok()); + let r: RocksDBResult = db.get(b"k1"); + assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh"); + assert!(db.delete(b"k1").is_ok()); + assert!(db.get(b"k1").is_none()); + } assert!(RocksDB::destroy(&opts, path).is_ok()); } diff --git a/src/rocksdb.rs b/src/rocksdb.rs index 04f72c8..c0a3945 100644 --- a/src/rocksdb.rs +++ b/src/rocksdb.rs @@ -23,6 +23,7 @@ use std::path::Path; use std::ptr::Unique; use std::slice; use std::str::from_utf8; +use std::marker::PhantomData; use rocksdb_ffi; use rocksdb_options::Options; @@ -39,15 +40,105 @@ pub struct ReadOptions { inner: rocksdb_ffi::RocksDBReadOptions, } +pub struct DBIterator { +//TODO: should have a reference to DB to enforce scope, but it's trickier than I thought to add + inner: rocksdb_ffi::RocksDBIterator, + direction: Direction, + just_seeked: bool +} + +pub enum Direction { + forward, reverse +} + +pub struct SubDBIterator<'a> { + iter: &'a mut DBIterator, + direction: Direction, +} + +impl <'a> Iterator for SubDBIterator<'a> { + type Item = (&'a [u8], &'a [u8]); + + fn next(&mut self) -> Option<(&'a[u8], &'a[u8])> { + let native_iter = self.iter.inner; + if !self.iter.just_seeked { + match self.direction { + Direction::forward => unsafe { rocksdb_ffi::rocksdb_iter_next(native_iter) }, + Direction::reverse => unsafe { rocksdb_ffi::rocksdb_iter_prev(native_iter) }, + } + } else { + self.iter.just_seeked = false; + } + if unsafe { rocksdb_ffi::rocksdb_iter_valid(native_iter) } { + let mut key_len: size_t = 0; + let key_len_ptr: *mut size_t = &mut key_len; + let mut val_len: size_t = 0; + let val_len_ptr: *mut size_t = &mut val_len; + let key_ptr = unsafe { rocksdb_ffi::rocksdb_iter_key(native_iter, key_len_ptr) }; + let key = unsafe { slice::from_raw_parts(key_ptr, key_len as usize) }; + let val_ptr = unsafe { rocksdb_ffi::rocksdb_iter_value(native_iter, val_len_ptr) }; + let val = unsafe { slice::from_raw_parts(val_ptr, val_len as usize) }; + Some((key,val)) + } else { + None + } + } +} + +impl DBIterator { +//TODO alias db & opts to different lifetimes, and DBIterator to the db's lifetime + pub fn new(db: &RocksDB, readopts: &ReadOptions) -> DBIterator { + unsafe { + let iterator = rocksdb_ffi::rocksdb_create_iterator(db.inner, readopts.inner); + rocksdb_ffi::rocksdb_iter_seek_to_first(iterator); + DBIterator{ inner: iterator, direction: Direction::forward, just_seeked: true } + } + } + + pub fn from_start(&mut self) -> SubDBIterator { + self.just_seeked = true; + unsafe { + rocksdb_ffi::rocksdb_iter_seek_to_first(self.inner); + }; + SubDBIterator{ iter: self, direction: Direction::forward, } + } + + pub fn from_end(&mut self) -> SubDBIterator { + self.just_seeked = true; + unsafe { + rocksdb_ffi::rocksdb_iter_seek_to_last(self.inner); + }; + SubDBIterator{ iter: self, direction: Direction::reverse, } + } + + pub fn from(&mut self, key: &[u8], dir: Direction) -> SubDBIterator { + self.just_seeked = true; + unsafe { + rocksdb_ffi::rocksdb_iter_seek(self.inner, key.as_ptr(), key.len() as size_t); + } + SubDBIterator{ iter: self, direction: dir, } + } +} + +impl Drop for DBIterator { + fn drop(&mut self) { + println!("Dropped iter"); + unsafe { + rocksdb_ffi::rocksdb_iter_destroy(self.inner); + } + } +} + // This is for the RocksDB and write batches to share the same API pub trait Writable { - fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String>; - fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String>; - fn delete(&self, key: &[u8]) -> Result<(), String>; + fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), String>; + fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<(), String>; + fn delete(&mut self, key: &[u8]) -> Result<(), String>; } fn error_message(ptr: *const i8) -> String { let c_str = unsafe { CStr::from_ptr(ptr) }; +//TODO I think we're leaking the c string here; should be a call to free once realloced into rust String from_utf8(c_str.to_bytes()).unwrap().to_owned() } @@ -73,11 +164,12 @@ impl RocksDB { } } - let err = 0 as *mut i8; + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; let db: rocksdb_ffi::RocksDBInstance; unsafe { - db = rocksdb_ffi::rocksdb_open(opts.inner, cpath_ptr, err); + db = rocksdb_ffi::rocksdb_open(opts.inner, cpath_ptr, err_ptr); } if !err.is_null() { @@ -99,9 +191,10 @@ impl RocksDB { return Err("path does not exist".to_string()); } - let err = 0 as *mut i8; + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; unsafe { - rocksdb_ffi::rocksdb_destroy_db(opts.inner, cpath_ptr, err); + rocksdb_ffi::rocksdb_destroy_db(opts.inner, cpath_ptr, err_ptr); } if !err.is_null() { return Err(error_message(err)); @@ -118,9 +211,10 @@ impl RocksDB { return Err("path does not exist".to_string()); } - let err = 0 as *mut i8; + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; unsafe { - rocksdb_ffi::rocksdb_repair_db(opts.inner, cpath_ptr, err); + rocksdb_ffi::rocksdb_repair_db(opts.inner, cpath_ptr, err_ptr); } if !err.is_null() { return Err(error_message(err)); @@ -130,9 +224,10 @@ impl RocksDB { pub fn write(&self, batch: WriteBatch) -> Result<(), String> { let writeopts = unsafe { rocksdb_ffi::rocksdb_writeoptions_create() }; - let err = 0 as *mut i8; + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; unsafe { - rocksdb_ffi::rocksdb_write(self.inner, writeopts.clone(), batch.inner, err); + rocksdb_ffi::rocksdb_write(self.inner, writeopts.clone(), batch.inner, err_ptr); rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); } if !err.is_null() { @@ -153,9 +248,10 @@ impl RocksDB { let val_len: size_t = 0; let val_len_ptr = &val_len as *const size_t; - let err = 0 as *mut i8; + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; let val = rocksdb_ffi::rocksdb_get(self.inner, readopts.clone(), - key.as_ptr(), key.len() as size_t, val_len_ptr, err) as *mut u8; + key.as_ptr(), key.len() as size_t, val_len_ptr, err_ptr) as *mut u8; rocksdb_ffi::rocksdb_readoptions_destroy(readopts); if !err.is_null() { return RocksDBResult::Error(error_message(err)); @@ -169,19 +265,21 @@ impl RocksDB { } } - pub fn close(&self) { - unsafe { rocksdb_ffi::rocksdb_close(self.inner); } + pub fn iterator(&self) -> DBIterator { + let opts = ReadOptions::new(); + DBIterator::new(&self, &opts) } } impl Writable for RocksDB { - fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> { + fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> { unsafe { let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); - let err = 0 as *mut i8; + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; rocksdb_ffi::rocksdb_put(self.inner, writeopts.clone(), key.as_ptr(), key.len() as size_t, value.as_ptr(), - value.len() as size_t, err); + value.len() as size_t, err_ptr); rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); if !err.is_null() { return Err(error_message(err)); @@ -190,13 +288,14 @@ impl Writable for RocksDB { } } - fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> { + fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> { unsafe { let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); - let err = 0 as *mut i8; + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; rocksdb_ffi::rocksdb_merge(self.inner, writeopts.clone(), key.as_ptr(), key.len() as size_t, value.as_ptr(), - value.len() as size_t, err); + value.len() as size_t, err_ptr); rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); if !err.is_null() { return Err(error_message(err)); @@ -205,12 +304,13 @@ impl Writable for RocksDB { } } - fn delete(&self, key: &[u8]) -> Result<(), String> { + fn delete(&mut self, key: &[u8]) -> Result<(), String> { unsafe { let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); - let err = 0 as *mut i8; + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; rocksdb_ffi::rocksdb_delete(self.inner, writeopts.clone(), key.as_ptr(), - key.len() as size_t, err); + key.len() as size_t, err_ptr); rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); if !err.is_null() { return Err(error_message(err)); @@ -238,8 +338,14 @@ impl Drop for WriteBatch { } } +impl Drop for RocksDB { + fn drop(&mut self) { + unsafe { rocksdb_ffi::rocksdb_close(self.inner); } + } +} + impl Writable for WriteBatch { - fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> { + fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> { unsafe { rocksdb_ffi::rocksdb_writebatch_put(self.inner, key.as_ptr(), key.len() as size_t, value.as_ptr(), @@ -248,7 +354,7 @@ impl Writable for WriteBatch { } } - fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> { + fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> { unsafe { rocksdb_ffi::rocksdb_writebatch_merge(self.inner, key.as_ptr(), key.len() as size_t, value.as_ptr(), @@ -257,7 +363,7 @@ impl Writable for WriteBatch { } } - fn delete(&self, key: &[u8]) -> Result<(), String> { + fn delete(&mut self, key: &[u8]) -> Result<(), String> { unsafe { rocksdb_ffi::rocksdb_writebatch_delete(self.inner, key.as_ptr(), key.len() as size_t); @@ -275,6 +381,13 @@ impl Drop for ReadOptions { } impl ReadOptions { + fn new() -> ReadOptions { + unsafe { + ReadOptions{inner: rocksdb_ffi::rocksdb_readoptions_create()} + } + } +//TODO add snapshot setting here +//TODO add snapshot wrapper structs with proper destructors; that struct needs an "iterator" impl too. fn fill_cache(&mut self, v: bool) { unsafe { rocksdb_ffi::rocksdb_readoptions_set_fill_cache(self.inner, v); @@ -392,40 +505,75 @@ impl RocksDBResult { #[test] fn external() { let path = "_rust_rocksdb_externaltest"; - let db = RocksDB::open_default(path).unwrap(); - let p = db.put(b"k1", b"v1111"); - assert!(p.is_ok()); - let r: RocksDBResult = db.get(b"k1"); - assert!(r.unwrap().to_utf8().unwrap() == "v1111"); - assert!(db.delete(b"k1").is_ok()); - assert!(db.get(b"k1").is_none()); - db.close(); + { + let mut db = RocksDB::open_default(path).unwrap(); + let p = db.put(b"k1", b"v1111"); + assert!(p.is_ok()); + let r: RocksDBResult = db.get(b"k1"); + assert!(r.unwrap().to_utf8().unwrap() == "v1111"); + assert!(db.delete(b"k1").is_ok()); + assert!(db.get(b"k1").is_none()); + } let opts = Options::new(); - assert!(RocksDB::destroy(&opts, path).is_ok()); + let result = RocksDB::destroy(&opts, path); + assert!(result.is_ok()); +} + +#[test] +fn errors_do_stuff() { + let path = "_rust_rocksdb_error"; + let mut db = RocksDB::open_default(path).unwrap(); + let opts = Options::new(); + // The DB will still be open when we try to destroy and the lock should fail + match RocksDB::destroy(&opts, path) { + Err(ref s) => assert!(s == "IO error: lock _rust_rocksdb_error/LOCK: No locks available"), + Ok(_) => panic!("should fail") + } } #[test] fn writebatch_works() { let path = "_rust_rocksdb_writebacktest"; - let db = RocksDB::open_default(path).unwrap(); - { // test put - let batch = WriteBatch::new(); - assert!(db.get(b"k1").is_none()); - batch.put(b"k1", b"v1111"); - assert!(db.get(b"k1").is_none()); - let p = db.write(batch); - assert!(p.is_ok()); - let r: RocksDBResult = db.get(b"k1"); - assert!(r.unwrap().to_utf8().unwrap() == "v1111"); + { + let mut db = RocksDB::open_default(path).unwrap(); + { // test put + let mut batch = WriteBatch::new(); + assert!(db.get(b"k1").is_none()); + batch.put(b"k1", b"v1111"); + assert!(db.get(b"k1").is_none()); + let p = db.write(batch); + assert!(p.is_ok()); + let r: RocksDBResult = db.get(b"k1"); + assert!(r.unwrap().to_utf8().unwrap() == "v1111"); + } + { // test delete + let mut batch = WriteBatch::new(); + batch.delete(b"k1"); + let p = db.write(batch); + assert!(p.is_ok()); + assert!(db.get(b"k1").is_none()); + } } - { // test delete - let batch = WriteBatch::new(); - batch.delete(b"k1"); - let p = db.write(batch); + let opts = Options::new(); + assert!(RocksDB::destroy(&opts, path).is_ok()); +} + +#[test] +fn iterator_test() { + let path = "_rust_rocksdb_iteratortest"; + { + let mut db = RocksDB::open_default(path).unwrap(); + let p = db.put(b"k1", b"v1111"); assert!(p.is_ok()); - assert!(db.get(b"k1").is_none()); + let p = db.put(b"k2", b"v2222"); + assert!(p.is_ok()); + let p = db.put(b"k3", b"v3333"); + assert!(p.is_ok()); + let mut iter = db.iterator(); + for (k,v) in iter.from_start() { + println!("Hello {}: {}", from_utf8(k).unwrap(), from_utf8(v).unwrap()); + } } - db.close(); let opts = Options::new(); assert!(RocksDB::destroy(&opts, path).is_ok()); }