Add iterator API

master
David Greenberg 9 years ago
parent 909bc8819c
commit 86ab20ffd6
  1. 25
      src/comparator.rs
  2. 33
      src/ffi.rs
  3. 3
      src/lib.rs
  4. 124
      src/main.rs
  5. 47
      src/merge_operator.rs
  6. 252
      src/rocksdb.rs

@ -65,15 +65,16 @@ fn test_reverse_compare(a: &[u8], b: &[u8]) -> c_int {
}
}
#[allow(dead_code)]
#[test]
fn compare_works() {
let path = "_rust_rocksdb_comparetest";
let mut opts = Options::new();
opts.create_if_missing(true);
opts.add_comparator("test comparator", test_reverse_compare);
let db = RocksDB::open(&opts, path).unwrap();
// TODO add interesting test
db.close();
assert!(RocksDB::destroy(&opts, path).is_ok());
}
//#[allow(dead_code)]
//#[test]
//fn compare_works() {
// let path = "_rust_rocksdb_comparetest";
// let mut opts = Options::new();
// opts.create_if_missing(true);
// opts.add_comparator("test comparator", test_reverse_compare);
// {
// let db = RocksDB::open(&opts, path).unwrap();
// // TODO add interesting test
// }
// assert!(RocksDB::destroy(&opts, path).is_ok());
//}

@ -181,7 +181,7 @@ extern {
bits_per_key: c_int) -> RocksDBFilterPolicy;
pub fn rocksdb_open(options: RocksDBOptions,
path: *const i8,
err: *mut i8
err: *mut *const i8
) -> RocksDBInstance;
pub fn rocksdb_writeoptions_create() -> RocksDBWriteOptions;
pub fn rocksdb_writeoptions_destroy(writeopts: RocksDBWriteOptions);
@ -189,7 +189,7 @@ extern {
writeopts: RocksDBWriteOptions,
k: *const u8, kLen: size_t,
v: *const u8, vLen: size_t,
err: *mut i8);
err: *mut *const i8);
pub fn rocksdb_readoptions_create() -> RocksDBReadOptions;
pub fn rocksdb_readoptions_destroy(readopts: RocksDBReadOptions);
pub fn rocksdb_readoptions_set_verify_checksums(
@ -216,14 +216,14 @@ extern {
readopts: RocksDBReadOptions,
k: *const u8, kLen: size_t,
valLen: *const size_t,
err: *mut i8
err: *mut *const i8
) -> *mut c_void;
pub fn rocksdb_get_cf(db: RocksDBInstance,
readopts: RocksDBReadOptions,
cf_handle: RocksDBCFHandle,
k: *const u8, kLen: size_t,
valLen: *const size_t,
err: *mut i8
err: *mut *const i8
) -> *mut c_void;
pub fn rocksdb_create_iterator(db: RocksDBInstance,
readopts: RocksDBReadOptions
@ -239,19 +239,19 @@ extern {
pub fn rocksdb_delete(db: RocksDBInstance,
writeopts: RocksDBWriteOptions,
k: *const u8, kLen: size_t,
err: *mut i8
err: *mut *const i8
) -> *mut c_void;
pub fn rocksdb_close(db: RocksDBInstance);
pub fn rocksdb_destroy_db(options: RocksDBOptions,
path: *const i8, err: *mut i8);
path: *const i8, err: *mut *const i8);
pub fn rocksdb_repair_db(options: RocksDBOptions,
path: *const i8, err: *mut i8);
path: *const i8, err: *mut *const i8);
// Merge
pub fn rocksdb_merge(db: RocksDBInstance,
writeopts: RocksDBWriteOptions,
k: *const u8, kLen: size_t,
v: *const u8, vLen: size_t,
err: *mut i8);
err: *mut *const i8);
pub fn rocksdb_mergeoperator_create(
state: *mut c_void,
destroy: extern fn(*mut c_void) -> (),
@ -286,7 +286,7 @@ extern {
pub fn rocksdb_iter_seek_to_first(iter: RocksDBIterator);
pub fn rocksdb_iter_seek_to_last(iter: RocksDBIterator);
pub fn rocksdb_iter_seek(iter: RocksDBIterator,
key: *mut u8, klen: size_t);
key: *const u8, klen: size_t);
pub fn rocksdb_iter_next(iter: RocksDBIterator);
pub fn rocksdb_iter_prev(iter: RocksDBIterator);
pub fn rocksdb_iter_key(iter: RocksDBIterator,
@ -294,12 +294,12 @@ extern {
pub fn rocksdb_iter_value(iter: RocksDBIterator,
vlen: *mut size_t) -> *mut u8;
pub fn rocksdb_iter_get_error(iter: RocksDBIterator,
err: *const *const u8);
err: *mut *const u8);
// Write batch
pub fn rocksdb_write(db: RocksDBInstance,
writeopts: RocksDBWriteOptions,
batch : RocksDBWriteBatch,
err: *mut i8);
err: *mut *const i8);
pub fn rocksdb_writebatch_create() -> RocksDBWriteBatch;
pub fn rocksdb_writebatch_create_from(rep: *const u8,
size: size_t) -> RocksDBWriteBatch;
@ -370,8 +370,9 @@ fn internal() {
let cpath = CString::new(rustpath).unwrap();
let cpath_ptr = cpath.as_ptr();
let err = 0 as *mut i8;
let db = rocksdb_open(opts, cpath_ptr, err);
let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
let db = rocksdb_open(opts, cpath_ptr, err_ptr);
assert!(err.is_null());
let writeopts = rocksdb_writeoptions_create();
@ -379,7 +380,7 @@ fn internal() {
let key = b"name\x00";
let val = b"spacejam\x00";
rocksdb_put(db, writeopts.clone(), key.as_ptr(), 4, val.as_ptr(), 8, err);
rocksdb_put(db, writeopts.clone(), key.as_ptr(), 4, val.as_ptr(), 8, err_ptr);
rocksdb_writeoptions_destroy(writeopts);
assert!(err.is_null());
@ -388,11 +389,11 @@ fn internal() {
let val_len: size_t = 0;
let val_len_ptr = &val_len as *const size_t;
rocksdb_get(db, readopts.clone(), key.as_ptr(), 4, val_len_ptr, err);
rocksdb_get(db, readopts.clone(), key.as_ptr(), 4, val_len_ptr, err_ptr);
rocksdb_readoptions_destroy(readopts);
assert!(err.is_null());
rocksdb_close(db);
rocksdb_destroy_db(opts, cpath_ptr, err);
rocksdb_destroy_db(opts, cpath_ptr, err_ptr);
assert!(err.is_null());
}
}

@ -36,6 +36,9 @@ pub use rocksdb::{
RocksDBVector,
WriteBatch,
Writable,
DBIterator,
SubDBIterator,
Direction,
};
pub use rocksdb_options::{
Options,

@ -17,27 +17,74 @@
extern crate rocksdb;
extern crate test;
use rocksdb::{Options, RocksDB, MergeOperands, new_bloom_filter, Writable};
use rocksdb::{Options, RocksDB, MergeOperands, new_bloom_filter, Writable, DBIterator, SubDBIterator };
use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction;
fn iterator_test() {
let path = "_rust_rocksdb_iteratortest";
{
let mut db = RocksDB::open_default(path).unwrap();
let p = db.put(b"k1", b"v1111");
assert!(p.is_ok());
let p = db.put(b"k2", b"v2222");
assert!(p.is_ok());
let p = db.put(b"k3", b"v3333");
assert!(p.is_ok());
{
let mut view1 = db.iterator();
println!("See the output of the first iter");
for (k,v) in view1.from_start() {
println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap());
};
for (k,v) in view1.from_start() {
println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap());
};
for (k,v) in view1.from_end() {
println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap());
};
}
let mut view2 = db.iterator();
let p = db.put(b"k4", b"v4444");
assert!(p.is_ok());
let mut view3 = db.iterator();
println!("See the output of the second iter");
for (k,v) in view2.from_start() {
println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap());
}
println!("See the output of the third iter");
for (k,v) in view3.from_start() {
println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap());
}
println!("now the 3rd iter from k2 fwd");
for (k,v) in view3.from(b"k2", rocksdb::Direction::forward) {
println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap());
}
println!("now the 3rd iter from k2 and back");
for (k,v) in view3.from(b"k2", rocksdb::Direction::reverse) {
println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap());
}
}
let opts = Options::new();
assert!(RocksDB::destroy(&opts, path).is_ok());
}
#[cfg(not(feature = "valgrind"))]
fn main() {
iterator_test();
let path = "/tmp/rust-rocksdb";
let db = RocksDB::open_default(path).unwrap();
let mut db = RocksDB::open_default(path).unwrap();
assert!(db.put(b"my key", b"my value").is_ok());
db.get(b"my key").map( |value| {
match value.to_utf8() {
match value.to_utf8() {
Some(v) =>
println!("retrieved utf8 value: {}", v),
println!("retrieved utf8 value: {}", v),
None =>
println!("did not read valid utf-8 out of the db"),
}
})
.on_absent( || { println!("value not found") })
println!("did not read valid utf-8 out of the db"),
}
})
.on_absent( || { println!("value not found") })
.on_error( |e| { println!("error retrieving value: {}", e) });
assert!(db.delete(b"my key").is_ok());
db.close();
custom_merge();
}
@ -60,25 +107,26 @@ fn custom_merge() {
let mut opts = Options::new();
opts.create_if_missing(true);
opts.add_merge_operator("test operator", concat_merge);
let db = RocksDB::open(&opts, path).unwrap();
db.put(b"k1", b"a");
db.merge(b"k1", b"b");
db.merge(b"k1", b"c");
db.merge(b"k1", b"d");
db.merge(b"k1", b"efg");
db.merge(b"k1", b"h");
db.get(b"k1").map( |value| {
match value.to_utf8() {
Some(v) =>
{
let mut db = RocksDB::open(&opts, path).unwrap();
db.put(b"k1", b"a");
db.merge(b"k1", b"b");
db.merge(b"k1", b"c");
db.merge(b"k1", b"d");
db.merge(b"k1", b"efg");
db.merge(b"k1", b"h");
db.get(b"k1").map( |value| {
match value.to_utf8() {
Some(v) =>
println!("retrieved utf8 value: {}", v),
None =>
None =>
println!("did not read valid utf-8 out of the db"),
}
})
}
})
.on_absent( || { println!("value not found") })
.on_error( |e| { println!("error retrieving value: {}", e) });
.on_error( |e| { println!("error retrieving value: {}", e) });
db.close();
}
RocksDB::destroy(&opts, path).is_ok();
}
@ -114,10 +162,10 @@ mod tests {
use test::Bencher;
use std::thread::sleep_ms;
use rocksdb::{BlockBasedOptions, Options, RocksDB, MergeOperands, new_bloom_filter, Writable};
use rocksdb::{BlockBasedOptions, Options, RocksDB, MergeOperands, new_bloom_filter, Writable };
use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction;
fn tuned_for_somebody_elses_disk(path: &str, opts: &mut Options, blockopts: &mut BlockBasedOptions) -> RocksDB {
fn tuned_for_somebody_elses_disk(path: &str, opts: & mut Options, blockopts: &mut BlockBasedOptions) -> RocksDB {
opts.create_if_missing(true);
opts.set_max_open_files(10000);
opts.set_use_fsync(false);
@ -152,13 +200,12 @@ mod tests {
let path = "_rust_rocksdb_optimizetest";
let mut opts = Options::new();
let mut blockopts = BlockBasedOptions::new();
let db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts);
let mut db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts);
let mut i = 0 as u64;
b.iter(|| {
db.put(i.to_string().as_bytes(), b"v1111");
i += 1;
});
db.close();
}
#[bench]
@ -166,16 +213,17 @@ mod tests {
let path = "_rust_rocksdb_optimizetest";
let mut opts = Options::new();
let mut blockopts = BlockBasedOptions::new();
let db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts);
let mut i = 0 as u64;
b.iter(|| {
db.get(i.to_string().as_bytes()).on_error( |e| {
println!("error: {}", e);
e
});
i += 1;
});
db.close();
{
let db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts);
let mut i = 0 as u64;
b.iter(|| {
db.get(i.to_string().as_bytes()).on_error( |e| {
println!("error: {}", e);
e
});
i += 1;
});
}
RocksDB::destroy(&opts, path).is_ok();
}
}

@ -169,30 +169,31 @@ fn mergetest() {
let mut opts = Options::new();
opts.create_if_missing(true);
opts.add_merge_operator("test operator", test_provided_merge);
let db = RocksDB::open(&opts, path).unwrap();
let p = db.put(b"k1", b"a");
assert!(p.is_ok());
db.merge(b"k1", b"b");
db.merge(b"k1", b"c");
db.merge(b"k1", b"d");
db.merge(b"k1", b"efg");
let m = db.merge(b"k1", b"h");
assert!(m.is_ok());
db.get(b"k1").map( |value| {
match value.to_utf8() {
Some(v) =>
{
let mut db = RocksDB::open(&opts, path).unwrap();
let p = db.put(b"k1", b"a");
assert!(p.is_ok());
db.merge(b"k1", b"b");
db.merge(b"k1", b"c");
db.merge(b"k1", b"d");
db.merge(b"k1", b"efg");
let m = db.merge(b"k1", b"h");
assert!(m.is_ok());
db.get(b"k1").map( |value| {
match value.to_utf8() {
Some(v) =>
println!("retrieved utf8 value: {}", v),
None =>
None =>
println!("did not read valid utf-8 out of the db"),
}
}).on_absent( || { println!("value not present!") })
.on_error( |e| { println!("error reading value")}); //: {", e) });
assert!(m.is_ok());
let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh");
assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").is_none());
db.close();
}
}).on_absent( || { println!("value not present!") })
.on_error( |e| { println!("error reading value")}); //: {", e) });
assert!(m.is_ok());
let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh");
assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").is_none());
}
assert!(RocksDB::destroy(&opts, path).is_ok());
}

@ -23,6 +23,7 @@ use std::path::Path;
use std::ptr::Unique;
use std::slice;
use std::str::from_utf8;
use std::marker::PhantomData;
use rocksdb_ffi;
use rocksdb_options::Options;
@ -39,15 +40,105 @@ pub struct ReadOptions {
inner: rocksdb_ffi::RocksDBReadOptions,
}
pub struct DBIterator {
//TODO: should have a reference to DB to enforce scope, but it's trickier than I thought to add
inner: rocksdb_ffi::RocksDBIterator,
direction: Direction,
just_seeked: bool
}
pub enum Direction {
forward, reverse
}
pub struct SubDBIterator<'a> {
iter: &'a mut DBIterator,
direction: Direction,
}
impl <'a> Iterator for SubDBIterator<'a> {
type Item = (&'a [u8], &'a [u8]);
fn next(&mut self) -> Option<(&'a[u8], &'a[u8])> {
let native_iter = self.iter.inner;
if !self.iter.just_seeked {
match self.direction {
Direction::forward => unsafe { rocksdb_ffi::rocksdb_iter_next(native_iter) },
Direction::reverse => unsafe { rocksdb_ffi::rocksdb_iter_prev(native_iter) },
}
} else {
self.iter.just_seeked = false;
}
if unsafe { rocksdb_ffi::rocksdb_iter_valid(native_iter) } {
let mut key_len: size_t = 0;
let key_len_ptr: *mut size_t = &mut key_len;
let mut val_len: size_t = 0;
let val_len_ptr: *mut size_t = &mut val_len;
let key_ptr = unsafe { rocksdb_ffi::rocksdb_iter_key(native_iter, key_len_ptr) };
let key = unsafe { slice::from_raw_parts(key_ptr, key_len as usize) };
let val_ptr = unsafe { rocksdb_ffi::rocksdb_iter_value(native_iter, val_len_ptr) };
let val = unsafe { slice::from_raw_parts(val_ptr, val_len as usize) };
Some((key,val))
} else {
None
}
}
}
impl DBIterator {
//TODO alias db & opts to different lifetimes, and DBIterator to the db's lifetime
pub fn new(db: &RocksDB, readopts: &ReadOptions) -> DBIterator {
unsafe {
let iterator = rocksdb_ffi::rocksdb_create_iterator(db.inner, readopts.inner);
rocksdb_ffi::rocksdb_iter_seek_to_first(iterator);
DBIterator{ inner: iterator, direction: Direction::forward, just_seeked: true }
}
}
pub fn from_start(&mut self) -> SubDBIterator {
self.just_seeked = true;
unsafe {
rocksdb_ffi::rocksdb_iter_seek_to_first(self.inner);
};
SubDBIterator{ iter: self, direction: Direction::forward, }
}
pub fn from_end(&mut self) -> SubDBIterator {
self.just_seeked = true;
unsafe {
rocksdb_ffi::rocksdb_iter_seek_to_last(self.inner);
};
SubDBIterator{ iter: self, direction: Direction::reverse, }
}
pub fn from(&mut self, key: &[u8], dir: Direction) -> SubDBIterator {
self.just_seeked = true;
unsafe {
rocksdb_ffi::rocksdb_iter_seek(self.inner, key.as_ptr(), key.len() as size_t);
}
SubDBIterator{ iter: self, direction: dir, }
}
}
impl Drop for DBIterator {
fn drop(&mut self) {
println!("Dropped iter");
unsafe {
rocksdb_ffi::rocksdb_iter_destroy(self.inner);
}
}
}
// This is for the RocksDB and write batches to share the same API
pub trait Writable {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String>;
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String>;
fn delete(&self, key: &[u8]) -> Result<(), String>;
fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), String>;
fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<(), String>;
fn delete(&mut self, key: &[u8]) -> Result<(), String>;
}
fn error_message(ptr: *const i8) -> String {
let c_str = unsafe { CStr::from_ptr(ptr) };
//TODO I think we're leaking the c string here; should be a call to free once realloced into rust String
from_utf8(c_str.to_bytes()).unwrap().to_owned()
}
@ -73,11 +164,12 @@ impl RocksDB {
}
}
let err = 0 as *mut i8;
let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
let db: rocksdb_ffi::RocksDBInstance;
unsafe {
db = rocksdb_ffi::rocksdb_open(opts.inner, cpath_ptr, err);
db = rocksdb_ffi::rocksdb_open(opts.inner, cpath_ptr, err_ptr);
}
if !err.is_null() {
@ -99,9 +191,10 @@ impl RocksDB {
return Err("path does not exist".to_string());
}
let err = 0 as *mut i8;
let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
unsafe {
rocksdb_ffi::rocksdb_destroy_db(opts.inner, cpath_ptr, err);
rocksdb_ffi::rocksdb_destroy_db(opts.inner, cpath_ptr, err_ptr);
}
if !err.is_null() {
return Err(error_message(err));
@ -118,9 +211,10 @@ impl RocksDB {
return Err("path does not exist".to_string());
}
let err = 0 as *mut i8;
let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
unsafe {
rocksdb_ffi::rocksdb_repair_db(opts.inner, cpath_ptr, err);
rocksdb_ffi::rocksdb_repair_db(opts.inner, cpath_ptr, err_ptr);
}
if !err.is_null() {
return Err(error_message(err));
@ -130,9 +224,10 @@ impl RocksDB {
pub fn write(&self, batch: WriteBatch) -> Result<(), String> {
let writeopts = unsafe { rocksdb_ffi::rocksdb_writeoptions_create() };
let err = 0 as *mut i8;
let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
unsafe {
rocksdb_ffi::rocksdb_write(self.inner, writeopts.clone(), batch.inner, err);
rocksdb_ffi::rocksdb_write(self.inner, writeopts.clone(), batch.inner, err_ptr);
rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts);
}
if !err.is_null() {
@ -153,9 +248,10 @@ impl RocksDB {
let val_len: size_t = 0;
let val_len_ptr = &val_len as *const size_t;
let err = 0 as *mut i8;
let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
let val = rocksdb_ffi::rocksdb_get(self.inner, readopts.clone(),
key.as_ptr(), key.len() as size_t, val_len_ptr, err) as *mut u8;
key.as_ptr(), key.len() as size_t, val_len_ptr, err_ptr) as *mut u8;
rocksdb_ffi::rocksdb_readoptions_destroy(readopts);
if !err.is_null() {
return RocksDBResult::Error(error_message(err));
@ -169,19 +265,21 @@ impl RocksDB {
}
}
pub fn close(&self) {
unsafe { rocksdb_ffi::rocksdb_close(self.inner); }
pub fn iterator(&self) -> DBIterator {
let opts = ReadOptions::new();
DBIterator::new(&self, &opts)
}
}
impl Writable for RocksDB {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8;
let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
rocksdb_ffi::rocksdb_put(self.inner, writeopts.clone(), key.as_ptr(),
key.len() as size_t, value.as_ptr(),
value.len() as size_t, err);
value.len() as size_t, err_ptr);
rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts);
if !err.is_null() {
return Err(error_message(err));
@ -190,13 +288,14 @@ impl Writable for RocksDB {
}
}
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8;
let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
rocksdb_ffi::rocksdb_merge(self.inner, writeopts.clone(), key.as_ptr(),
key.len() as size_t, value.as_ptr(),
value.len() as size_t, err);
value.len() as size_t, err_ptr);
rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts);
if !err.is_null() {
return Err(error_message(err));
@ -205,12 +304,13 @@ impl Writable for RocksDB {
}
}
fn delete(&self, key: &[u8]) -> Result<(), String> {
fn delete(&mut self, key: &[u8]) -> Result<(), String> {
unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8;
let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
rocksdb_ffi::rocksdb_delete(self.inner, writeopts.clone(), key.as_ptr(),
key.len() as size_t, err);
key.len() as size_t, err_ptr);
rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts);
if !err.is_null() {
return Err(error_message(err));
@ -238,8 +338,14 @@ impl Drop for WriteBatch {
}
}
impl Drop for RocksDB {
fn drop(&mut self) {
unsafe { rocksdb_ffi::rocksdb_close(self.inner); }
}
}
impl Writable for WriteBatch {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe {
rocksdb_ffi::rocksdb_writebatch_put(self.inner, key.as_ptr(),
key.len() as size_t, value.as_ptr(),
@ -248,7 +354,7 @@ impl Writable for WriteBatch {
}
}
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe {
rocksdb_ffi::rocksdb_writebatch_merge(self.inner, key.as_ptr(),
key.len() as size_t, value.as_ptr(),
@ -257,7 +363,7 @@ impl Writable for WriteBatch {
}
}
fn delete(&self, key: &[u8]) -> Result<(), String> {
fn delete(&mut self, key: &[u8]) -> Result<(), String> {
unsafe {
rocksdb_ffi::rocksdb_writebatch_delete(self.inner, key.as_ptr(),
key.len() as size_t);
@ -275,6 +381,13 @@ impl Drop for ReadOptions {
}
impl ReadOptions {
fn new() -> ReadOptions {
unsafe {
ReadOptions{inner: rocksdb_ffi::rocksdb_readoptions_create()}
}
}
//TODO add snapshot setting here
//TODO add snapshot wrapper structs with proper destructors; that struct needs an "iterator" impl too.
fn fill_cache(&mut self, v: bool) {
unsafe {
rocksdb_ffi::rocksdb_readoptions_set_fill_cache(self.inner, v);
@ -392,40 +505,75 @@ impl <T, E> RocksDBResult<T, E> {
#[test]
fn external() {
let path = "_rust_rocksdb_externaltest";
let db = RocksDB::open_default(path).unwrap();
let p = db.put(b"k1", b"v1111");
assert!(p.is_ok());
let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "v1111");
assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").is_none());
db.close();
{
let mut db = RocksDB::open_default(path).unwrap();
let p = db.put(b"k1", b"v1111");
assert!(p.is_ok());
let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "v1111");
assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").is_none());
}
let opts = Options::new();
assert!(RocksDB::destroy(&opts, path).is_ok());
let result = RocksDB::destroy(&opts, path);
assert!(result.is_ok());
}
#[test]
fn errors_do_stuff() {
let path = "_rust_rocksdb_error";
let mut db = RocksDB::open_default(path).unwrap();
let opts = Options::new();
// The DB will still be open when we try to destroy and the lock should fail
match RocksDB::destroy(&opts, path) {
Err(ref s) => assert!(s == "IO error: lock _rust_rocksdb_error/LOCK: No locks available"),
Ok(_) => panic!("should fail")
}
}
#[test]
fn writebatch_works() {
let path = "_rust_rocksdb_writebacktest";
let db = RocksDB::open_default(path).unwrap();
{ // test put
let batch = WriteBatch::new();
assert!(db.get(b"k1").is_none());
batch.put(b"k1", b"v1111");
assert!(db.get(b"k1").is_none());
let p = db.write(batch);
assert!(p.is_ok());
let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "v1111");
{
let mut db = RocksDB::open_default(path).unwrap();
{ // test put
let mut batch = WriteBatch::new();
assert!(db.get(b"k1").is_none());
batch.put(b"k1", b"v1111");
assert!(db.get(b"k1").is_none());
let p = db.write(batch);
assert!(p.is_ok());
let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "v1111");
}
{ // test delete
let mut batch = WriteBatch::new();
batch.delete(b"k1");
let p = db.write(batch);
assert!(p.is_ok());
assert!(db.get(b"k1").is_none());
}
}
{ // test delete
let batch = WriteBatch::new();
batch.delete(b"k1");
let p = db.write(batch);
let opts = Options::new();
assert!(RocksDB::destroy(&opts, path).is_ok());
}
#[test]
fn iterator_test() {
let path = "_rust_rocksdb_iteratortest";
{
let mut db = RocksDB::open_default(path).unwrap();
let p = db.put(b"k1", b"v1111");
assert!(p.is_ok());
assert!(db.get(b"k1").is_none());
let p = db.put(b"k2", b"v2222");
assert!(p.is_ok());
let p = db.put(b"k3", b"v3333");
assert!(p.is_ok());
let mut iter = db.iterator();
for (k,v) in iter.from_start() {
println!("Hello {}: {}", from_utf8(k).unwrap(), from_utf8(v).unwrap());
}
}
db.close();
let opts = Options::new();
assert!(RocksDB::destroy(&opts, path).is_ok());
}

Loading…
Cancel
Save