Merge pull request #15 from dgrnbrg/iterator-dev

Add Iterators and Snapshots
master
Tyler Neely 10 years ago
commit b6cf467a00
  1. 62
      README.md
  2. 25
      src/comparator.rs
  3. 33
      src/ffi.rs
  4. 5
      src/lib.rs
  5. 153
      src/main.rs
  6. 47
      src/merge_operator.rs
  7. 291
      src/rocksdb.rs

@ -11,9 +11,9 @@ This library has been tested against RocksDB 3.8.1 on linux and OSX. The 0.0.6
- [x] compaction filter, style - [x] compaction filter, style
- [x] LRU cache - [x] LRU cache
- [x] destroy/repair - [x] destroy/repair
- [ ] iterator - [x] iterator
- [ ] comparator - [ ] comparator
- [ ] snapshot - [x] snapshot
- [ ] column family operations - [ ] column family operations
- [ ] slicetransform - [ ] slicetransform
- [ ] windows support - [ ] windows support
@ -39,7 +39,7 @@ extern crate rocksdb;
use rocksdb::RocksDB; use rocksdb::RocksDB;
fn main() { fn main() {
let db = RocksDB::open_default("/path/for/rocksdb/storage").unwrap(); let mut db = RocksDB::open_default("/path/for/rocksdb/storage").unwrap();
db.put(b"my key", b"my value"); db.put(b"my key", b"my value");
db.get(b"my key") db.get(b"my key")
.map( |value| { .map( |value| {
@ -49,7 +49,58 @@ fn main() {
.on_error( |e| { println!("operational problem encountered: {}", e) }); .on_error( |e| { println!("operational problem encountered: {}", e) });
db.delete(b"my key"); db.delete(b"my key");
db.close(); }
```
###### Doing an atomic commit of several writes
```rust
extern crate rocksdb;
use rocksdb::{RocksDB, WriteBatch, Writable};
fn main() {
// NB: db is automatically freed at end of lifetime
let mut db = RocksDB::open_default("/path/for/rocksdb/storage").unwrap();
{
let mut batch = WriteBatch::new(); // WriteBatch and db both have trait Writable
batch.put(b"my key", b"my value");
batch.put(b"key2", b"value2");
batch.put(b"key3", b"value3");
db.write(batch); // Atomically commits the batch
}
}
```
###### Getting an Iterator
```rust
extern crate rocksdb;
use rocksdb::{RocksDB, Direction};
fn main() {
// NB: db is automatically freed at end of lifetime
let mut db = RocksDB::open_default("/path/for/rocksdb/storage").unwrap();
let mut iter = db.iterator();
for (key, value) in iter.from_start() { // Always iterates forward
println!("Saw {} {}", key, value); //actually, need to convert [u8] keys into Strings
}
for (key, value) in iter.from_end() { //Always iterates backward
println!("Saw {} {}", key, value);
}
for (key, value) in iter.from(b"my key", Direction::forward) { // From a key in Direction::{forward,reverse}
println!("Saw {} {}", key, value);
}
}
```
###### Getting an Iterator
```rust
extern crate rocksdb;
use rocksdb::{RocksDB, Direction};
fn main() {
// NB: db is automatically freed at end of lifetime
let mut db = RocksDB::open_default("/path/for/rocksdb/storage").unwrap();
let snapshot = db.snapshot(); // Creates a longer-term snapshot of the DB, but freed when goes out of scope
let mut iter = snapshot.iterator(); // Make as many iterators as you'd like from one snapshot
} }
``` ```
@ -73,7 +124,7 @@ fn main() {
let opts = RocksDBOptions::new(); let opts = RocksDBOptions::new();
opts.create_if_missing(true); opts.create_if_missing(true);
opts.add_merge_operator("test operator", concat_merge); opts.add_merge_operator("test operator", concat_merge);
let db = RocksDB::open(opts, path).unwrap(); let mut db = RocksDB::open(opts, path).unwrap();
let p = db.put(b"k1", b"a"); let p = db.put(b"k1", b"a");
db.merge(b"k1", b"b"); db.merge(b"k1", b"b");
db.merge(b"k1", b"c"); db.merge(b"k1", b"c");
@ -81,7 +132,6 @@ fn main() {
db.merge(b"k1", b"efg"); db.merge(b"k1", b"efg");
let r = db.get(b"k1"); let r = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "abcdefg"); assert!(r.unwrap().to_utf8().unwrap() == "abcdefg");
db.close();
} }
``` ```

@ -65,15 +65,16 @@ fn test_reverse_compare(a: &[u8], b: &[u8]) -> c_int {
} }
} }
#[allow(dead_code)] //#[allow(dead_code)]
#[test] //#[test]
fn compare_works() { //fn compare_works() {
let path = "_rust_rocksdb_comparetest"; // let path = "_rust_rocksdb_comparetest";
let mut opts = Options::new(); // let mut opts = Options::new();
opts.create_if_missing(true); // opts.create_if_missing(true);
opts.add_comparator("test comparator", test_reverse_compare); // opts.add_comparator("test comparator", test_reverse_compare);
let db = RocksDB::open(&opts, path).unwrap(); // {
// TODO add interesting test // let db = RocksDB::open(&opts, path).unwrap();
db.close(); // // TODO add interesting test
assert!(RocksDB::destroy(&opts, path).is_ok()); // }
} // assert!(RocksDB::destroy(&opts, path).is_ok());
//}

@ -181,7 +181,7 @@ extern {
bits_per_key: c_int) -> RocksDBFilterPolicy; bits_per_key: c_int) -> RocksDBFilterPolicy;
pub fn rocksdb_open(options: RocksDBOptions, pub fn rocksdb_open(options: RocksDBOptions,
path: *const i8, path: *const i8,
err: *mut i8 err: *mut *const i8
) -> RocksDBInstance; ) -> RocksDBInstance;
pub fn rocksdb_writeoptions_create() -> RocksDBWriteOptions; pub fn rocksdb_writeoptions_create() -> RocksDBWriteOptions;
pub fn rocksdb_writeoptions_destroy(writeopts: RocksDBWriteOptions); pub fn rocksdb_writeoptions_destroy(writeopts: RocksDBWriteOptions);
@ -189,7 +189,7 @@ extern {
writeopts: RocksDBWriteOptions, writeopts: RocksDBWriteOptions,
k: *const u8, kLen: size_t, k: *const u8, kLen: size_t,
v: *const u8, vLen: size_t, v: *const u8, vLen: size_t,
err: *mut i8); err: *mut *const i8);
pub fn rocksdb_readoptions_create() -> RocksDBReadOptions; pub fn rocksdb_readoptions_create() -> RocksDBReadOptions;
pub fn rocksdb_readoptions_destroy(readopts: RocksDBReadOptions); pub fn rocksdb_readoptions_destroy(readopts: RocksDBReadOptions);
pub fn rocksdb_readoptions_set_verify_checksums( pub fn rocksdb_readoptions_set_verify_checksums(
@ -216,14 +216,14 @@ extern {
readopts: RocksDBReadOptions, readopts: RocksDBReadOptions,
k: *const u8, kLen: size_t, k: *const u8, kLen: size_t,
valLen: *const size_t, valLen: *const size_t,
err: *mut i8 err: *mut *const i8
) -> *mut c_void; ) -> *mut c_void;
pub fn rocksdb_get_cf(db: RocksDBInstance, pub fn rocksdb_get_cf(db: RocksDBInstance,
readopts: RocksDBReadOptions, readopts: RocksDBReadOptions,
cf_handle: RocksDBCFHandle, cf_handle: RocksDBCFHandle,
k: *const u8, kLen: size_t, k: *const u8, kLen: size_t,
valLen: *const size_t, valLen: *const size_t,
err: *mut i8 err: *mut *const i8
) -> *mut c_void; ) -> *mut c_void;
pub fn rocksdb_create_iterator(db: RocksDBInstance, pub fn rocksdb_create_iterator(db: RocksDBInstance,
readopts: RocksDBReadOptions readopts: RocksDBReadOptions
@ -239,19 +239,19 @@ extern {
pub fn rocksdb_delete(db: RocksDBInstance, pub fn rocksdb_delete(db: RocksDBInstance,
writeopts: RocksDBWriteOptions, writeopts: RocksDBWriteOptions,
k: *const u8, kLen: size_t, k: *const u8, kLen: size_t,
err: *mut i8 err: *mut *const i8
) -> *mut c_void; ) -> *mut c_void;
pub fn rocksdb_close(db: RocksDBInstance); pub fn rocksdb_close(db: RocksDBInstance);
pub fn rocksdb_destroy_db(options: RocksDBOptions, pub fn rocksdb_destroy_db(options: RocksDBOptions,
path: *const i8, err: *mut i8); path: *const i8, err: *mut *const i8);
pub fn rocksdb_repair_db(options: RocksDBOptions, pub fn rocksdb_repair_db(options: RocksDBOptions,
path: *const i8, err: *mut i8); path: *const i8, err: *mut *const i8);
// Merge // Merge
pub fn rocksdb_merge(db: RocksDBInstance, pub fn rocksdb_merge(db: RocksDBInstance,
writeopts: RocksDBWriteOptions, writeopts: RocksDBWriteOptions,
k: *const u8, kLen: size_t, k: *const u8, kLen: size_t,
v: *const u8, vLen: size_t, v: *const u8, vLen: size_t,
err: *mut i8); err: *mut *const i8);
pub fn rocksdb_mergeoperator_create( pub fn rocksdb_mergeoperator_create(
state: *mut c_void, state: *mut c_void,
destroy: extern fn(*mut c_void) -> (), destroy: extern fn(*mut c_void) -> (),
@ -286,7 +286,7 @@ extern {
pub fn rocksdb_iter_seek_to_first(iter: RocksDBIterator); pub fn rocksdb_iter_seek_to_first(iter: RocksDBIterator);
pub fn rocksdb_iter_seek_to_last(iter: RocksDBIterator); pub fn rocksdb_iter_seek_to_last(iter: RocksDBIterator);
pub fn rocksdb_iter_seek(iter: RocksDBIterator, pub fn rocksdb_iter_seek(iter: RocksDBIterator,
key: *mut u8, klen: size_t); key: *const u8, klen: size_t);
pub fn rocksdb_iter_next(iter: RocksDBIterator); pub fn rocksdb_iter_next(iter: RocksDBIterator);
pub fn rocksdb_iter_prev(iter: RocksDBIterator); pub fn rocksdb_iter_prev(iter: RocksDBIterator);
pub fn rocksdb_iter_key(iter: RocksDBIterator, pub fn rocksdb_iter_key(iter: RocksDBIterator,
@ -294,12 +294,12 @@ extern {
pub fn rocksdb_iter_value(iter: RocksDBIterator, pub fn rocksdb_iter_value(iter: RocksDBIterator,
vlen: *mut size_t) -> *mut u8; vlen: *mut size_t) -> *mut u8;
pub fn rocksdb_iter_get_error(iter: RocksDBIterator, pub fn rocksdb_iter_get_error(iter: RocksDBIterator,
err: *const *const u8); err: *mut *const u8);
// Write batch // Write batch
pub fn rocksdb_write(db: RocksDBInstance, pub fn rocksdb_write(db: RocksDBInstance,
writeopts: RocksDBWriteOptions, writeopts: RocksDBWriteOptions,
batch : RocksDBWriteBatch, batch : RocksDBWriteBatch,
err: *mut i8); err: *mut *const i8);
pub fn rocksdb_writebatch_create() -> RocksDBWriteBatch; pub fn rocksdb_writebatch_create() -> RocksDBWriteBatch;
pub fn rocksdb_writebatch_create_from(rep: *const u8, pub fn rocksdb_writebatch_create_from(rep: *const u8,
size: size_t) -> RocksDBWriteBatch; size: size_t) -> RocksDBWriteBatch;
@ -370,8 +370,9 @@ fn internal() {
let cpath = CString::new(rustpath).unwrap(); let cpath = CString::new(rustpath).unwrap();
let cpath_ptr = cpath.as_ptr(); let cpath_ptr = cpath.as_ptr();
let err = 0 as *mut i8; let mut err: *const i8 = 0 as *const i8;
let db = rocksdb_open(opts, cpath_ptr, err); let err_ptr: *mut *const i8 = &mut err;
let db = rocksdb_open(opts, cpath_ptr, err_ptr);
assert!(err.is_null()); assert!(err.is_null());
let writeopts = rocksdb_writeoptions_create(); let writeopts = rocksdb_writeoptions_create();
@ -379,7 +380,7 @@ fn internal() {
let key = b"name\x00"; let key = b"name\x00";
let val = b"spacejam\x00"; let val = b"spacejam\x00";
rocksdb_put(db, writeopts.clone(), key.as_ptr(), 4, val.as_ptr(), 8, err); rocksdb_put(db, writeopts.clone(), key.as_ptr(), 4, val.as_ptr(), 8, err_ptr);
rocksdb_writeoptions_destroy(writeopts); rocksdb_writeoptions_destroy(writeopts);
assert!(err.is_null()); assert!(err.is_null());
@ -388,11 +389,11 @@ fn internal() {
let val_len: size_t = 0; let val_len: size_t = 0;
let val_len_ptr = &val_len as *const size_t; let val_len_ptr = &val_len as *const size_t;
rocksdb_get(db, readopts.clone(), key.as_ptr(), 4, val_len_ptr, err); rocksdb_get(db, readopts.clone(), key.as_ptr(), 4, val_len_ptr, err_ptr);
rocksdb_readoptions_destroy(readopts); rocksdb_readoptions_destroy(readopts);
assert!(err.is_null()); assert!(err.is_null());
rocksdb_close(db); rocksdb_close(db);
rocksdb_destroy_db(opts, cpath_ptr, err); rocksdb_destroy_db(opts, cpath_ptr, err_ptr);
assert!(err.is_null()); assert!(err.is_null());
} }
} }

@ -23,11 +23,7 @@
pub use ffi as rocksdb_ffi; pub use ffi as rocksdb_ffi;
pub use ffi::{ pub use ffi::{
new_bloom_filter, new_bloom_filter,
new_cache,
RocksDBUniversalCompactionStyle,
RocksDBCompactionStyle, RocksDBCompactionStyle,
RocksDBCompressionType,
RocksDBSnapshot,
RocksDBComparator, RocksDBComparator,
}; };
pub use rocksdb::{ pub use rocksdb::{
@ -36,6 +32,7 @@ pub use rocksdb::{
RocksDBVector, RocksDBVector,
WriteBatch, WriteBatch,
Writable, Writable,
Direction,
}; };
pub use rocksdb_options::{ pub use rocksdb_options::{
Options, Options,

@ -17,27 +17,103 @@
extern crate rocksdb; extern crate rocksdb;
extern crate test; extern crate test;
use rocksdb::{Options, RocksDB, MergeOperands, new_bloom_filter, Writable}; use rocksdb::{Options, RocksDB, MergeOperands, new_bloom_filter, Writable, };
use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction; use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction;
fn snapshot_test() {
let path = "_rust_rocksdb_iteratortest";
{
let mut db = RocksDB::open_default(path).unwrap();
let p = db.put(b"k1", b"v1111");
assert!(p.is_ok());
let p = db.put(b"k2", b"v2222");
assert!(p.is_ok());
let p = db.put(b"k3", b"v3333");
assert!(p.is_ok());
let mut snap = db.snapshot();
let mut view1 = snap.iterator();
println!("See the output of the first iter");
for (k,v) in view1.from_start() {
println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap());
};
for (k,v) in view1.from_start() {
println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap());
};
for (k,v) in view1.from_end() {
println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap());
};
}
let opts = Options::new();
assert!(RocksDB::destroy(&opts, path).is_ok());
}
fn iterator_test() {
let path = "_rust_rocksdb_iteratortest";
{
let mut db = RocksDB::open_default(path).unwrap();
let p = db.put(b"k1", b"v1111");
assert!(p.is_ok());
let p = db.put(b"k2", b"v2222");
assert!(p.is_ok());
let p = db.put(b"k3", b"v3333");
assert!(p.is_ok());
{
let mut view1 = db.iterator();
println!("See the output of the first iter");
for (k,v) in view1.from_start() {
println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap());
};
for (k,v) in view1.from_start() {
println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap());
};
for (k,v) in view1.from_end() {
println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap());
};
}
let mut view2 = db.iterator();
let p = db.put(b"k4", b"v4444");
assert!(p.is_ok());
let mut view3 = db.iterator();
println!("See the output of the second iter");
for (k,v) in view2.from_start() {
println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap());
}
println!("See the output of the third iter");
for (k,v) in view3.from_start() {
println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap());
}
println!("now the 3rd iter from k2 fwd");
for (k,v) in view3.from(b"k2", rocksdb::Direction::forward) {
println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap());
}
println!("now the 3rd iter from k2 and back");
for (k,v) in view3.from(b"k2", rocksdb::Direction::reverse) {
println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap());
}
}
let opts = Options::new();
assert!(RocksDB::destroy(&opts, path).is_ok());
}
#[cfg(not(feature = "valgrind"))] #[cfg(not(feature = "valgrind"))]
fn main() { fn main() {
snapshot_test();
iterator_test();
let path = "/tmp/rust-rocksdb"; let path = "/tmp/rust-rocksdb";
let db = RocksDB::open_default(path).unwrap(); let mut db = RocksDB::open_default(path).unwrap();
assert!(db.put(b"my key", b"my value").is_ok()); assert!(db.put(b"my key", b"my value").is_ok());
db.get(b"my key").map( |value| { db.get(b"my key").map( |value| {
match value.to_utf8() { match value.to_utf8() {
Some(v) => Some(v) =>
println!("retrieved utf8 value: {}", v), println!("retrieved utf8 value: {}", v),
None => None =>
println!("did not read valid utf-8 out of the db"), println!("did not read valid utf-8 out of the db"),
} }
}) })
.on_absent( || { println!("value not found") }) .on_absent( || { println!("value not found") })
.on_error( |e| { println!("error retrieving value: {}", e) }); .on_error( |e| { println!("error retrieving value: {}", e) });
assert!(db.delete(b"my key").is_ok()); assert!(db.delete(b"my key").is_ok());
db.close();
custom_merge(); custom_merge();
} }
@ -60,25 +136,26 @@ fn custom_merge() {
let mut opts = Options::new(); let mut opts = Options::new();
opts.create_if_missing(true); opts.create_if_missing(true);
opts.add_merge_operator("test operator", concat_merge); opts.add_merge_operator("test operator", concat_merge);
let db = RocksDB::open(&opts, path).unwrap(); {
db.put(b"k1", b"a"); let mut db = RocksDB::open(&opts, path).unwrap();
db.merge(b"k1", b"b"); db.put(b"k1", b"a");
db.merge(b"k1", b"c"); db.merge(b"k1", b"b");
db.merge(b"k1", b"d"); db.merge(b"k1", b"c");
db.merge(b"k1", b"efg"); db.merge(b"k1", b"d");
db.merge(b"k1", b"h"); db.merge(b"k1", b"efg");
db.get(b"k1").map( |value| { db.merge(b"k1", b"h");
match value.to_utf8() { db.get(b"k1").map( |value| {
Some(v) => match value.to_utf8() {
Some(v) =>
println!("retrieved utf8 value: {}", v), println!("retrieved utf8 value: {}", v),
None => None =>
println!("did not read valid utf-8 out of the db"), println!("did not read valid utf-8 out of the db"),
} }
}) })
.on_absent( || { println!("value not found") }) .on_absent( || { println!("value not found") })
.on_error( |e| { println!("error retrieving value: {}", e) }); .on_error( |e| { println!("error retrieving value: {}", e) });
db.close(); }
RocksDB::destroy(&opts, path).is_ok(); RocksDB::destroy(&opts, path).is_ok();
} }
@ -114,10 +191,10 @@ mod tests {
use test::Bencher; use test::Bencher;
use std::thread::sleep_ms; use std::thread::sleep_ms;
use rocksdb::{BlockBasedOptions, Options, RocksDB, MergeOperands, new_bloom_filter, Writable}; use rocksdb::{BlockBasedOptions, Options, RocksDB, MergeOperands, new_bloom_filter, Writable };
use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction; use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction;
fn tuned_for_somebody_elses_disk(path: &str, opts: &mut Options, blockopts: &mut BlockBasedOptions) -> RocksDB { fn tuned_for_somebody_elses_disk(path: &str, opts: & mut Options, blockopts: &mut BlockBasedOptions) -> RocksDB {
opts.create_if_missing(true); opts.create_if_missing(true);
opts.set_max_open_files(10000); opts.set_max_open_files(10000);
opts.set_use_fsync(false); opts.set_use_fsync(false);
@ -152,13 +229,12 @@ mod tests {
let path = "_rust_rocksdb_optimizetest"; let path = "_rust_rocksdb_optimizetest";
let mut opts = Options::new(); let mut opts = Options::new();
let mut blockopts = BlockBasedOptions::new(); let mut blockopts = BlockBasedOptions::new();
let db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts); let mut db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts);
let mut i = 0 as u64; let mut i = 0 as u64;
b.iter(|| { b.iter(|| {
db.put(i.to_string().as_bytes(), b"v1111"); db.put(i.to_string().as_bytes(), b"v1111");
i += 1; i += 1;
}); });
db.close();
} }
#[bench] #[bench]
@ -166,16 +242,17 @@ mod tests {
let path = "_rust_rocksdb_optimizetest"; let path = "_rust_rocksdb_optimizetest";
let mut opts = Options::new(); let mut opts = Options::new();
let mut blockopts = BlockBasedOptions::new(); let mut blockopts = BlockBasedOptions::new();
let db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts); {
let mut i = 0 as u64; let db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts);
b.iter(|| { let mut i = 0 as u64;
db.get(i.to_string().as_bytes()).on_error( |e| { b.iter(|| {
println!("error: {}", e); db.get(i.to_string().as_bytes()).on_error( |e| {
e println!("error: {}", e);
}); e
i += 1; });
}); i += 1;
db.close(); });
}
RocksDB::destroy(&opts, path).is_ok(); RocksDB::destroy(&opts, path).is_ok();
} }
} }

@ -169,30 +169,31 @@ fn mergetest() {
let mut opts = Options::new(); let mut opts = Options::new();
opts.create_if_missing(true); opts.create_if_missing(true);
opts.add_merge_operator("test operator", test_provided_merge); opts.add_merge_operator("test operator", test_provided_merge);
let db = RocksDB::open(&opts, path).unwrap(); {
let p = db.put(b"k1", b"a"); let mut db = RocksDB::open(&opts, path).unwrap();
assert!(p.is_ok()); let p = db.put(b"k1", b"a");
db.merge(b"k1", b"b"); assert!(p.is_ok());
db.merge(b"k1", b"c"); db.merge(b"k1", b"b");
db.merge(b"k1", b"d"); db.merge(b"k1", b"c");
db.merge(b"k1", b"efg"); db.merge(b"k1", b"d");
let m = db.merge(b"k1", b"h"); db.merge(b"k1", b"efg");
assert!(m.is_ok()); let m = db.merge(b"k1", b"h");
db.get(b"k1").map( |value| { assert!(m.is_ok());
match value.to_utf8() { db.get(b"k1").map( |value| {
Some(v) => match value.to_utf8() {
Some(v) =>
println!("retrieved utf8 value: {}", v), println!("retrieved utf8 value: {}", v),
None => None =>
println!("did not read valid utf-8 out of the db"), println!("did not read valid utf-8 out of the db"),
} }
}).on_absent( || { println!("value not present!") }) }).on_absent( || { println!("value not present!") })
.on_error( |e| { println!("error reading value")}); //: {", e) }); .on_error( |e| { println!("error reading value")}); //: {", e) });
assert!(m.is_ok()); assert!(m.is_ok());
let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1"); let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh"); assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh");
assert!(db.delete(b"k1").is_ok()); assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").is_none()); assert!(db.get(b"k1").is_none());
db.close(); }
assert!(RocksDB::destroy(&opts, path).is_ok()); assert!(RocksDB::destroy(&opts, path).is_ok());
} }

@ -23,6 +23,7 @@ use std::path::Path;
use std::ptr::Unique; use std::ptr::Unique;
use std::slice; use std::slice;
use std::str::from_utf8; use std::str::from_utf8;
use std::marker::PhantomData;
use rocksdb_ffi; use rocksdb_ffi;
use rocksdb_options::Options; use rocksdb_options::Options;
@ -39,16 +40,134 @@ pub struct ReadOptions {
inner: rocksdb_ffi::RocksDBReadOptions, inner: rocksdb_ffi::RocksDBReadOptions,
} }
pub struct Snapshot<'a> {
db: &'a RocksDB,
inner: rocksdb_ffi::RocksDBSnapshot,
}
pub struct DBIterator {
//TODO: should have a reference to DB to enforce scope, but it's trickier than I thought to add
inner: rocksdb_ffi::RocksDBIterator,
direction: Direction,
just_seeked: bool
}
pub enum Direction {
forward, reverse
}
pub struct SubDBIterator<'a> {
iter: &'a mut DBIterator,
direction: Direction,
}
impl <'a> Iterator for SubDBIterator<'a> {
type Item = (&'a [u8], &'a [u8]);
fn next(&mut self) -> Option<(&'a[u8], &'a[u8])> {
let native_iter = self.iter.inner;
if !self.iter.just_seeked {
match self.direction {
Direction::forward => unsafe { rocksdb_ffi::rocksdb_iter_next(native_iter) },
Direction::reverse => unsafe { rocksdb_ffi::rocksdb_iter_prev(native_iter) },
}
} else {
self.iter.just_seeked = false;
}
if unsafe { rocksdb_ffi::rocksdb_iter_valid(native_iter) } {
let mut key_len: size_t = 0;
let key_len_ptr: *mut size_t = &mut key_len;
let mut val_len: size_t = 0;
let val_len_ptr: *mut size_t = &mut val_len;
let key_ptr = unsafe { rocksdb_ffi::rocksdb_iter_key(native_iter, key_len_ptr) };
let key = unsafe { slice::from_raw_parts(key_ptr, key_len as usize) };
let val_ptr = unsafe { rocksdb_ffi::rocksdb_iter_value(native_iter, val_len_ptr) };
let val = unsafe { slice::from_raw_parts(val_ptr, val_len as usize) };
Some((key,val))
} else {
None
}
}
}
impl DBIterator {
//TODO alias db & opts to different lifetimes, and DBIterator to the db's lifetime
fn new(db: &RocksDB, readopts: &ReadOptions) -> DBIterator {
unsafe {
let iterator = rocksdb_ffi::rocksdb_create_iterator(db.inner, readopts.inner);
rocksdb_ffi::rocksdb_iter_seek_to_first(iterator);
DBIterator{ inner: iterator, direction: Direction::forward, just_seeked: true }
}
}
pub fn from_start(&mut self) -> SubDBIterator {
self.just_seeked = true;
unsafe {
rocksdb_ffi::rocksdb_iter_seek_to_first(self.inner);
};
SubDBIterator{ iter: self, direction: Direction::forward, }
}
pub fn from_end(&mut self) -> SubDBIterator {
self.just_seeked = true;
unsafe {
rocksdb_ffi::rocksdb_iter_seek_to_last(self.inner);
};
SubDBIterator{ iter: self, direction: Direction::reverse, }
}
pub fn from(&mut self, key: &[u8], dir: Direction) -> SubDBIterator {
self.just_seeked = true;
unsafe {
rocksdb_ffi::rocksdb_iter_seek(self.inner, key.as_ptr(), key.len() as size_t);
}
SubDBIterator{ iter: self, direction: dir, }
}
}
impl Drop for DBIterator {
fn drop(&mut self) {
unsafe {
rocksdb_ffi::rocksdb_iter_destroy(self.inner);
}
}
}
impl <'a> Snapshot<'a> {
pub fn new(db: &RocksDB) -> Snapshot {
let snapshot = unsafe { rocksdb_ffi::rocksdb_create_snapshot(db.inner) };
Snapshot{db: db, inner: snapshot}
}
pub fn iterator(&self) -> DBIterator {
let mut readopts = ReadOptions::new();
readopts.set_snapshot(self);
DBIterator::new(self.db, &readopts)
}
}
impl <'a> Drop for Snapshot<'a> {
fn drop(&mut self) {
unsafe {
rocksdb_ffi::rocksdb_release_snapshot(self.db.inner, self.inner);
}
}
}
// This is for the RocksDB and write batches to share the same API // This is for the RocksDB and write batches to share the same API
pub trait Writable { pub trait Writable {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String>; fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), String>;
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String>; fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<(), String>;
fn delete(&self, key: &[u8]) -> Result<(), String>; fn delete(&mut self, key: &[u8]) -> Result<(), String>;
} }
fn error_message(ptr: *const i8) -> String { fn error_message(ptr: *const i8) -> String {
let c_str = unsafe { CStr::from_ptr(ptr) }; let c_str = unsafe { CStr::from_ptr(ptr) };
from_utf8(c_str.to_bytes()).unwrap().to_owned() let s = from_utf8(c_str.to_bytes()).unwrap().to_owned();
unsafe{
libc::free(ptr as *mut libc::c_void);
}
s
} }
impl RocksDB { impl RocksDB {
@ -73,11 +192,12 @@ impl RocksDB {
} }
} }
let err = 0 as *mut i8; let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
let db: rocksdb_ffi::RocksDBInstance; let db: rocksdb_ffi::RocksDBInstance;
unsafe { unsafe {
db = rocksdb_ffi::rocksdb_open(opts.inner, cpath_ptr, err); db = rocksdb_ffi::rocksdb_open(opts.inner, cpath_ptr, err_ptr);
} }
if !err.is_null() { if !err.is_null() {
@ -99,9 +219,10 @@ impl RocksDB {
return Err("path does not exist".to_string()); return Err("path does not exist".to_string());
} }
let err = 0 as *mut i8; let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
unsafe { unsafe {
rocksdb_ffi::rocksdb_destroy_db(opts.inner, cpath_ptr, err); rocksdb_ffi::rocksdb_destroy_db(opts.inner, cpath_ptr, err_ptr);
} }
if !err.is_null() { if !err.is_null() {
return Err(error_message(err)); return Err(error_message(err));
@ -118,9 +239,10 @@ impl RocksDB {
return Err("path does not exist".to_string()); return Err("path does not exist".to_string());
} }
let err = 0 as *mut i8; let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
unsafe { unsafe {
rocksdb_ffi::rocksdb_repair_db(opts.inner, cpath_ptr, err); rocksdb_ffi::rocksdb_repair_db(opts.inner, cpath_ptr, err_ptr);
} }
if !err.is_null() { if !err.is_null() {
return Err(error_message(err)); return Err(error_message(err));
@ -130,9 +252,10 @@ impl RocksDB {
pub fn write(&self, batch: WriteBatch) -> Result<(), String> { pub fn write(&self, batch: WriteBatch) -> Result<(), String> {
let writeopts = unsafe { rocksdb_ffi::rocksdb_writeoptions_create() }; let writeopts = unsafe { rocksdb_ffi::rocksdb_writeoptions_create() };
let err = 0 as *mut i8; let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
unsafe { unsafe {
rocksdb_ffi::rocksdb_write(self.inner, writeopts.clone(), batch.inner, err); rocksdb_ffi::rocksdb_write(self.inner, writeopts.clone(), batch.inner, err_ptr);
rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts);
} }
if !err.is_null() { if !err.is_null() {
@ -153,9 +276,10 @@ impl RocksDB {
let val_len: size_t = 0; let val_len: size_t = 0;
let val_len_ptr = &val_len as *const size_t; let val_len_ptr = &val_len as *const size_t;
let err = 0 as *mut i8; let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
let val = rocksdb_ffi::rocksdb_get(self.inner, readopts.clone(), let val = rocksdb_ffi::rocksdb_get(self.inner, readopts.clone(),
key.as_ptr(), key.len() as size_t, val_len_ptr, err) as *mut u8; key.as_ptr(), key.len() as size_t, val_len_ptr, err_ptr) as *mut u8;
rocksdb_ffi::rocksdb_readoptions_destroy(readopts); rocksdb_ffi::rocksdb_readoptions_destroy(readopts);
if !err.is_null() { if !err.is_null() {
return RocksDBResult::Error(error_message(err)); return RocksDBResult::Error(error_message(err));
@ -169,19 +293,25 @@ impl RocksDB {
} }
} }
pub fn close(&self) { pub fn iterator(&self) -> DBIterator {
unsafe { rocksdb_ffi::rocksdb_close(self.inner); } let opts = ReadOptions::new();
DBIterator::new(&self, &opts)
}
pub fn snapshot(&self) -> Snapshot {
Snapshot::new(self)
} }
} }
impl Writable for RocksDB { impl Writable for RocksDB {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> { fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe { unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8; let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
rocksdb_ffi::rocksdb_put(self.inner, writeopts.clone(), key.as_ptr(), rocksdb_ffi::rocksdb_put(self.inner, writeopts.clone(), key.as_ptr(),
key.len() as size_t, value.as_ptr(), key.len() as size_t, value.as_ptr(),
value.len() as size_t, err); value.len() as size_t, err_ptr);
rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts);
if !err.is_null() { if !err.is_null() {
return Err(error_message(err)); return Err(error_message(err));
@ -190,13 +320,14 @@ impl Writable for RocksDB {
} }
} }
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> { fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe { unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8; let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
rocksdb_ffi::rocksdb_merge(self.inner, writeopts.clone(), key.as_ptr(), rocksdb_ffi::rocksdb_merge(self.inner, writeopts.clone(), key.as_ptr(),
key.len() as size_t, value.as_ptr(), key.len() as size_t, value.as_ptr(),
value.len() as size_t, err); value.len() as size_t, err_ptr);
rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts);
if !err.is_null() { if !err.is_null() {
return Err(error_message(err)); return Err(error_message(err));
@ -205,12 +336,13 @@ impl Writable for RocksDB {
} }
} }
fn delete(&self, key: &[u8]) -> Result<(), String> { fn delete(&mut self, key: &[u8]) -> Result<(), String> {
unsafe { unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8; let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
rocksdb_ffi::rocksdb_delete(self.inner, writeopts.clone(), key.as_ptr(), rocksdb_ffi::rocksdb_delete(self.inner, writeopts.clone(), key.as_ptr(),
key.len() as size_t, err); key.len() as size_t, err_ptr);
rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts);
if !err.is_null() { if !err.is_null() {
return Err(error_message(err)); return Err(error_message(err));
@ -238,8 +370,14 @@ impl Drop for WriteBatch {
} }
} }
impl Drop for RocksDB {
fn drop(&mut self) {
unsafe { rocksdb_ffi::rocksdb_close(self.inner); }
}
}
impl Writable for WriteBatch { impl Writable for WriteBatch {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> { fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe { unsafe {
rocksdb_ffi::rocksdb_writebatch_put(self.inner, key.as_ptr(), rocksdb_ffi::rocksdb_writebatch_put(self.inner, key.as_ptr(),
key.len() as size_t, value.as_ptr(), key.len() as size_t, value.as_ptr(),
@ -248,7 +386,7 @@ impl Writable for WriteBatch {
} }
} }
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> { fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe { unsafe {
rocksdb_ffi::rocksdb_writebatch_merge(self.inner, key.as_ptr(), rocksdb_ffi::rocksdb_writebatch_merge(self.inner, key.as_ptr(),
key.len() as size_t, value.as_ptr(), key.len() as size_t, value.as_ptr(),
@ -257,7 +395,7 @@ impl Writable for WriteBatch {
} }
} }
fn delete(&self, key: &[u8]) -> Result<(), String> { fn delete(&mut self, key: &[u8]) -> Result<(), String> {
unsafe { unsafe {
rocksdb_ffi::rocksdb_writebatch_delete(self.inner, key.as_ptr(), rocksdb_ffi::rocksdb_writebatch_delete(self.inner, key.as_ptr(),
key.len() as size_t); key.len() as size_t);
@ -275,12 +413,24 @@ impl Drop for ReadOptions {
} }
impl ReadOptions { impl ReadOptions {
fn new() -> ReadOptions {
unsafe {
ReadOptions{inner: rocksdb_ffi::rocksdb_readoptions_create()}
}
}
//TODO add snapshot setting here
//TODO add snapshot wrapper structs with proper destructors; that struct needs an "iterator" impl too.
fn fill_cache(&mut self, v: bool) { fn fill_cache(&mut self, v: bool) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_readoptions_set_fill_cache(self.inner, v); rocksdb_ffi::rocksdb_readoptions_set_fill_cache(self.inner, v);
} }
} }
fn set_snapshot(&mut self, snapshot: &Snapshot) {
unsafe {
rocksdb_ffi::rocksdb_readoptions_set_snapshot(self.inner, snapshot.inner);
}
}
} }
pub struct RocksDBVector { pub struct RocksDBVector {
@ -392,40 +542,75 @@ impl <T, E> RocksDBResult<T, E> {
#[test] #[test]
fn external() { fn external() {
let path = "_rust_rocksdb_externaltest"; let path = "_rust_rocksdb_externaltest";
let db = RocksDB::open_default(path).unwrap(); {
let p = db.put(b"k1", b"v1111"); let mut db = RocksDB::open_default(path).unwrap();
assert!(p.is_ok()); let p = db.put(b"k1", b"v1111");
let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1"); assert!(p.is_ok());
assert!(r.unwrap().to_utf8().unwrap() == "v1111"); let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1");
assert!(db.delete(b"k1").is_ok()); assert!(r.unwrap().to_utf8().unwrap() == "v1111");
assert!(db.get(b"k1").is_none()); assert!(db.delete(b"k1").is_ok());
db.close(); assert!(db.get(b"k1").is_none());
}
let opts = Options::new(); let opts = Options::new();
assert!(RocksDB::destroy(&opts, path).is_ok()); let result = RocksDB::destroy(&opts, path);
assert!(result.is_ok());
}
#[test]
fn errors_do_stuff() {
let path = "_rust_rocksdb_error";
let mut db = RocksDB::open_default(path).unwrap();
let opts = Options::new();
// The DB will still be open when we try to destroy and the lock should fail
match RocksDB::destroy(&opts, path) {
Err(ref s) => assert!(s == "IO error: lock _rust_rocksdb_error/LOCK: No locks available"),
Ok(_) => panic!("should fail")
}
} }
#[test] #[test]
fn writebatch_works() { fn writebatch_works() {
let path = "_rust_rocksdb_writebacktest"; let path = "_rust_rocksdb_writebacktest";
let db = RocksDB::open_default(path).unwrap(); {
{ // test put let mut db = RocksDB::open_default(path).unwrap();
let batch = WriteBatch::new(); { // test put
assert!(db.get(b"k1").is_none()); let mut batch = WriteBatch::new();
batch.put(b"k1", b"v1111"); assert!(db.get(b"k1").is_none());
assert!(db.get(b"k1").is_none()); batch.put(b"k1", b"v1111");
let p = db.write(batch); assert!(db.get(b"k1").is_none());
assert!(p.is_ok()); let p = db.write(batch);
let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1"); assert!(p.is_ok());
assert!(r.unwrap().to_utf8().unwrap() == "v1111"); let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "v1111");
}
{ // test delete
let mut batch = WriteBatch::new();
batch.delete(b"k1");
let p = db.write(batch);
assert!(p.is_ok());
assert!(db.get(b"k1").is_none());
}
} }
{ // test delete let opts = Options::new();
let batch = WriteBatch::new(); assert!(RocksDB::destroy(&opts, path).is_ok());
batch.delete(b"k1"); }
let p = db.write(batch);
#[test]
fn iterator_test() {
let path = "_rust_rocksdb_iteratortest";
{
let mut db = RocksDB::open_default(path).unwrap();
let p = db.put(b"k1", b"v1111");
assert!(p.is_ok()); assert!(p.is_ok());
assert!(db.get(b"k1").is_none()); let p = db.put(b"k2", b"v2222");
assert!(p.is_ok());
let p = db.put(b"k3", b"v3333");
assert!(p.is_ok());
let mut iter = db.iterator();
for (k,v) in iter.from_start() {
println!("Hello {}: {}", from_utf8(k).unwrap(), from_utf8(v).unwrap());
}
} }
db.close();
let opts = Options::new(); let opts = Options::new();
assert!(RocksDB::destroy(&opts, path).is_ok()); assert!(RocksDB::destroy(&opts, path).is_ok());
} }

Loading…
Cancel
Save