Merge pull request #93 from spacejam/0.5.0

Branches: 0.5.0, master
Tyler Neely committed 8 years ago (via GitHub)
commit a84012c3c8
26 changed files (lines changed in parentheses):

  1. .gitignore (1)
  2. .gitmodules (8)
  3. .travis.yml (2)
  4. CHANGELOG.txt (17)
  5. Cargo.toml (6)
  6. README.md (167)
  7. librocksdb-sys/.gitignore (0)
  8. librocksdb-sys/Cargo.toml (6)
  9. librocksdb-sys/Makefile (0)
  10. librocksdb-sys/README.md (0)
  11. librocksdb-sys/build.rs (0)
  12. librocksdb-sys/build_version.cc (0)
  13. librocksdb-sys/build_version.h (0)
  14. librocksdb-sys/rocksdb (0)
  15. librocksdb-sys/rocksdb_lib_sources.txt (0)
  16. librocksdb-sys/snappy (0)
  17. librocksdb-sys/snappy-stubs-public.h (0)
  18. librocksdb-sys/src/lib.rs (0)
  19. librocksdb-sys/src/test.rs (0)
  20. librocksdb-sys/tests/ffi.rs (301)
  21. src/lib.rs (82)
  22. src/merge_operator.rs (42)
  23. src/rocksdb.rs (233)
  24. test/test_column_family.rs (5)
  25. test/test_iterator.rs (43)
  26. test/test_multithreaded.rs (2)

.gitignore (vendored)

@ -7,3 +7,4 @@ Cargo.lock
_rust_rocksdb* _rust_rocksdb*
*rlib *rlib
tags tags
path

.gitmodules (vendored)

@ -1,6 +1,6 @@
[submodule "rocksdb-sys/snappy"] [submodule "rocksdb_sys/snappy"]
path = rocksdb-sys/snappy path = librocksdb-sys/snappy
url = https://github.com/google/snappy.git url = https://github.com/google/snappy.git
[submodule "rocksdb-sys/rocksdb"] [submodule "rocksdb_sys/rocksdb"]
path = rocksdb-sys/rocksdb path = librocksdb-sys/rocksdb
url = https://github.com/facebook/rocksdb.git url = https://github.com/facebook/rocksdb.git

.travis.yml

@ -14,7 +14,7 @@ addons:
- g++-5 - g++-5
script: script:
- cargo test --manifest-path=rocksdb-sys/Cargo.toml - cargo test --manifest-path=librocksdb-sys/Cargo.toml
- cargo test - cargo test
cache: cache:

CHANGELOG.txt

@ -4,6 +4,23 @@ Changelog
0.5 (in development) 0.5 (in development)
~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~
**Breaking changes**
* No more `Writable` trait, as `WriteBatch` is not thread-safe the way a `DB` is; `put`/`merge`/`delete` are now inherent methods on `DB` and `WriteBatch` (spacejam). See the migration sketch after this list.
* All imports of `rocksdb::rocksdb::*` should now be simply `rocksdb::*` (alexreg) * All imports of `rocksdb::rocksdb::*` should now be simply `rocksdb::*` (alexreg)
* All errors changed to use a new `rocksdb::Error` type (kaedroho, alexreg)
* Removed `Options.set_filter_deletes` as it was removed in RocksDB (kaedroho)
* Renamed `add_merge_operator` to `set_merge_operator` and `add_comparator` to `set_comparator` (kaedroho)
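
Taken together, these breaking changes mostly amount to import and method-name churn for callers. A minimal, hedged migration sketch follows (the storage path is a placeholder, and the commented lines only restate the rename noted above):

```rust
extern crate rocksdb;
use rocksdb::{DB, Options};

fn main() {
    // Placeholder path, used only for this illustration.
    let path = "_rust_rocksdb_migration_example";

    let mut opts = Options::default();
    opts.create_if_missing(true);
    // 0.4.x: opts.add_merge_operator("test operator", concat_merge);
    // 0.5.0: opts.set_merge_operator("test operator", concat_merge);

    // 0.4.x required `use rocksdb::{DB, Writable};` before calling put/delete;
    // in 0.5.0 these are inherent methods on DB (and on WriteBatch), and
    // failures are reported through the new rocksdb::Error type.
    let db = DB::open(&opts, path).unwrap();
    db.put(b"my key", b"my value").unwrap();
    db.delete(b"my key").unwrap();
}
```
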
**New features**
* Windows support (development by jsgf and arkpar. ported by kaedroho)
* The RocksDB library is now built at crate compile-time and statically linked with the resulting binary (development by jsgf and arkpar. ported by kaedroho)
* Cleaned up and improved coverage and tests of the ffi module (alexreg)
* Added many new methods to the `Options` type (development by ngaut, BusyJay, zhangjinpeng1987, siddontang and hhkbp2. ported by kaedroho)
* Added `len` and `is_empty` methods to `WriteBatch` (development by siddontang. ported by kaedroho)
* Added `path` method to `DB` (development by siddontang. ported by kaedroho)
* `DB::open` now accepts any type that implements `AsRef<Path>` as the path argument (kaedroho)
* `DB` now implements the `Debug` trait (kaedroho); see the usage sketch after this section
* Add iterator_cf to snapshot (jezell) * Add iterator_cf to snapshot (jezell)
* Changelog started * Changelog started
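
A short, hedged usage sketch of a few of the additions above (`Debug` on `DB`, the now-`&mut` `WriteBatch` methods, and `len`/`is_empty`); the storage path is a placeholder:

```rust
extern crate rocksdb;
use rocksdb::{DB, WriteBatch};

fn main() {
    // Placeholder storage path for this illustration.
    let db = DB::open_default("_rust_rocksdb_changelog_example").unwrap();

    // DB implements Debug in 0.5.0 (prints the path it was opened with).
    println!("{:?}", db);

    // WriteBatch mutation methods now take &mut self, and the new
    // len/is_empty helpers report how many mutations are queued.
    let mut batch = WriteBatch::default();
    assert!(batch.is_empty());
    batch.put(b"k1", b"v1").unwrap();
    batch.put(b"k2", b"v2").unwrap();
    assert_eq!(batch.len(), 2);

    db.write(batch).unwrap(); // atomically commits the whole batch
}
```
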

Cargo.toml

@ -1,7 +1,7 @@
[package] [package]
name = "rocksdb" name = "rocksdb"
description = "Rust wrapper for Facebook's RocksDB embeddable database" description = "Rust wrapper for Facebook's RocksDB embeddable database"
version = "0.4.1" version = "0.5.0-rc.1"
authors = ["Tyler Neely <t@jujit.su>", "David Greenberg <dsg123456789@gmail.com>"] authors = ["Tyler Neely <t@jujit.su>", "David Greenberg <dsg123456789@gmail.com>"]
license = "Apache-2.0" license = "Apache-2.0"
keywords = ["database", "embedded", "LSM-tree", "persistence"] keywords = ["database", "embedded", "LSM-tree", "persistence"]
@ -22,5 +22,5 @@ name = "test"
path = "test/test.rs" path = "test/test.rs"
[dependencies] [dependencies]
libc = "0.2.13" libc = "0.2"
rocksdb-sys = { path = "rocksdb-sys", version = "0.4.0" } librocksdb-sys = { path = "librocksdb-sys", version = "0.4.0" }

README.md

@ -3,174 +3,11 @@ rust-rocksdb
[![Build Status](https://travis-ci.org/spacejam/rust-rocksdb.svg?branch=master)](https://travis-ci.org/spacejam/rust-rocksdb) [![Build Status](https://travis-ci.org/spacejam/rust-rocksdb.svg?branch=master)](https://travis-ci.org/spacejam/rust-rocksdb)
[![crates.io](http://meritbadge.herokuapp.com/rocksdb)](https://crates.io/crates/rocksdb) [![crates.io](http://meritbadge.herokuapp.com/rocksdb)](https://crates.io/crates/rocksdb)
This library has been tested against RocksDB 3.13.1 on Linux and OS X. The 0.4.1 crate should work with the Rust 1.9 stable and nightly releases as of 7/1/16. [documentation](http://spacejam.github.io/docs/rocksdb/rocksdb/)
### Status
- [x] basic open/put/get/delete/close
- [x] rustic merge operator
- [x] write batch (thanks @dgrnbrg!)
- [x] compaction filter, style
- [x] LRU cache
- [x] destroy/repair
- [x] iterator
- [x] comparator
- [x] snapshot
- [x] column family operations
- [ ] prefix seek
- [ ] slicetransform
- [x] windows support
Feedback and pull requests welcome! If a particular feature of RocksDB is important to you, please let me know by opening an issue, and I'll prioritize it. Feedback and pull requests welcome! If a particular feature of RocksDB is important to you, please let me know by opening an issue, and I'll prioritize it.
### Running
###### Cargo.toml
```rust ```rust
[dependencies] [dependencies]
rocksdb = "0.4.1" rocksdb = "0.5.0"
```
###### Code
```rust
extern crate rocksdb;
use rocksdb::{DB, Writable};
fn main() {
let mut db = DB::open_default("/path/for/rocksdb/storage").unwrap();
db.put(b"my key", b"my value");
match db.get(b"my key") {
Ok(Some(value)) => println!("retrieved value {}", value.to_utf8().unwrap()),
Ok(None) => println!("value not found"),
Err(e) => println!("operational problem encountered: {}", e),
}
db.delete(b"my key");
}
```
###### Making an atomic commit of several writes
```rust
extern crate rocksdb;
use rocksdb::{DB, WriteBatch, Writable};
fn main() {
// NB: db is automatically freed at end of lifetime
let mut db = DB::open_default("/path/for/rocksdb/storage").unwrap();
{
let mut batch = WriteBatch::default(); // WriteBatch and db both have trait Writable
batch.put(b"my key", b"my value");
batch.put(b"key2", b"value2");
batch.put(b"key3", b"value3");
db.write(batch); // Atomically commits the batch
}
}
```
###### Getting an `Iterator`
```rust
extern crate rocksdb;
use rocksdb::{DB, Direction, IteratorMode};
fn main() {
// NB: db is automatically freed at end of lifetime
let mut db = DB::open_default("/path/for/rocksdb/storage").unwrap();
let mut iter = db.iterator(IteratorMode::Start); // Always iterates forward
for (key, value) in iter {
println!("Saw {} {}", key, value); //actually, need to convert [u8] keys into Strings
}
iter = db.iterator(IteratorMode::End); // Always iterates backward
for (key, value) in iter {
println!("Saw {} {}", key, value);
}
iter = db.iterator(IteratorMode::From(b"my key", Direction::forward)); // From a key in Direction::{forward,reverse}
for (key, value) in iter {
println!("Saw {} {}", key, value);
}
// You can seek with an existing Iterator instance, too
iter.set_mode(IteratorMode::From(b"another key", Direction::reverse));
for (key, value) in iter {
println!("Saw {} {}", key, value);
}
}
```
###### Getting an `Iterator` from a `Snapshot`
```rust
extern crate rocksdb;
use rocksdb::{DB, Direction};
fn main() {
// NB: db is automatically freed at end of lifetime
let mut db = DB::open_default("/path/for/rocksdb/storage").unwrap();
let snapshot = db.snapshot(); // Creates a longer-term snapshot of the DB, but freed when goes out of scope
let mut iter = snapshot.iterator(IteratorMode::Start); // Make as many iterators as you'd like from one snapshot
}
```
###### Rustic Merge Operator
```rust
extern crate rocksdb;
use rocksdb::{Options, DB, MergeOperands, Writable};
fn concat_merge(new_key: &[u8], existing_val: Option<&[u8]>,
operands: &mut MergeOperands) -> Vec<u8> {
let mut result: Vec<u8> = Vec::with_capacity(operands.size_hint().0);
existing_val.map(|v| {
for e in v {
result.push(*e)
}
});
for op in operands {
for e in op {
result.push(*e)
}
}
result
}
fn main() {
let path = "/path/to/rocksdb";
let mut opts = Options::default();
opts.create_if_missing(true);
opts.add_merge_operator("test operator", concat_merge);
let mut db = DB::open(&opts, path).unwrap();
let p = db.put(b"k1", b"a");
db.merge(b"k1", b"b");
db.merge(b"k1", b"c");
db.merge(b"k1", b"d");
db.merge(b"k1", b"efg");
let r = db.get(b"k1");
assert!(r.unwrap().unwrap().to_utf8().unwrap() == "abcdefg");
}
```
###### Apply Some Tunings
Please read [the official tuning guide](https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide), and most importantly, measure performance under realistic workloads with realistic hardware.
```rust
use rocksdb::{Options, DB};
use rocksdb::DBCompactionStyle::DBUniversalCompaction;
fn badly_tuned_for_somebody_elses_disk() -> DB {
let path = "_rust_rocksdb_optimizetest";
let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_max_open_files(10000);
opts.set_use_fsync(false);
opts.set_bytes_per_sync(8388608);
opts.set_disable_data_sync(false);
opts.set_block_cache_size_mb(1024);
opts.set_table_cache_num_shard_bits(6);
opts.set_max_write_buffer_number(32);
opts.set_write_buffer_size(536870912);
opts.set_target_file_size_base(1073741824);
opts.set_min_write_buffer_number_to_merge(4);
opts.set_level_zero_stop_writes_trigger(2000);
opts.set_level_zero_slowdown_writes_trigger(0);
opts.set_compaction_style(DBUniversalCompaction);
opts.set_max_background_compactions(4);
opts.set_max_background_flushes(4);
opts.set_filter_deletes(false);
opts.set_disable_auto_compactions(true);
DB::open(&opts, path).unwrap()
}
``` ```

librocksdb-sys/Cargo.toml

@ -1,11 +1,11 @@
[package] [package]
name = "rocksdb-sys" name = "librocksdb-sys"
version = "0.4.0" version = "0.4.0"
authors = ["Karl Hobley <karlhobley10@gmail.com>", "Arkadiy Paronyan <arkadiy@ethcore.io>"] authors = ["Karl Hobley <karlhobley10@gmail.com>", "Arkadiy Paronyan <arkadiy@ethcore.io>"]
license = "MIT/Apache-2.0/BSD" license = "MIT/Apache-2.0/BSD-3-Clause"
description = "Native bindings to librocksdb" description = "Native bindings to librocksdb"
readme = "README.md" readme = "README.md"
repository = "https://github.com/jsgf/rocksdb-sys.git" repository = "https://github.com/spacejam/rust-rocksdb.git"
keywords = [ "ffi", "rocksdb" ] keywords = [ "ffi", "rocksdb" ]
build = "build.rs" build = "build.rs"

librocksdb-sys/tests/ffi.rs

@ -20,7 +20,7 @@
#[macro_use] #[macro_use]
extern crate const_cstr; extern crate const_cstr;
extern crate libc; extern crate libc;
extern crate rocksdb_sys as ffi; extern crate librocksdb_sys as ffi;
use ::ffi::*; use ::ffi::*;
use ::libc::*; use ::libc::*;
@ -113,8 +113,7 @@ unsafe fn CheckGet(mut db: *mut rocksdb_t,
expected: *const c_char) { expected: *const c_char) {
let mut err: *mut c_char = ptr::null_mut(); let mut err: *mut c_char = ptr::null_mut();
let mut val_len: size_t = 0; let mut val_len: size_t = 0;
let mut val: *mut c_char = let mut val: *mut c_char = rocksdb_get(db, options, key, strlen(key), &mut val_len, &mut err);
rocksdb_get(db, options, key, strlen(key), &mut val_len, &mut err);
CheckNoError!(err); CheckNoError!(err);
CheckEqual(expected, val, val_len); CheckEqual(expected, val, val_len);
Free(&mut val); Free(&mut val);
@ -139,9 +138,7 @@ unsafe fn CheckGetCF(db: *mut rocksdb_t,
Free(&mut val); Free(&mut val);
} }
unsafe fn CheckIter(iter: *mut rocksdb_iterator_t, unsafe fn CheckIter(iter: *mut rocksdb_iterator_t, key: *const c_char, val: *const c_char) {
key: *const c_char,
val: *const c_char) {
let mut len: size_t = 0; let mut len: size_t = 0;
let mut str: *const c_char; let mut str: *const c_char;
str = rocksdb_iter_key(iter, &mut len); str = rocksdb_iter_key(iter, &mut len);
@ -173,9 +170,7 @@ unsafe extern "C" fn CheckPut(ptr: *mut c_void,
} }
// Callback from rocksdb_writebatch_iterate() // Callback from rocksdb_writebatch_iterate()
unsafe extern "C" fn CheckDel(ptr: *mut c_void, unsafe extern "C" fn CheckDel(ptr: *mut c_void, k: *const c_char, klen: size_t) {
k: *const c_char,
klen: size_t) {
let mut state: *mut c_int = ptr as *mut c_int; let mut state: *mut c_int = ptr as *mut c_int;
CheckCondition!(*state == 2); CheckCondition!(*state == 2);
CheckEqual(cstrp!("bar"), k, klen); CheckEqual(cstrp!("bar"), k, klen);
@ -286,7 +281,9 @@ unsafe extern "C" fn CFilterFactoryName(arg: *mut c_void) -> *const c_char {
cstrp!("foo") cstrp!("foo")
} }
unsafe extern "C" fn CFilterCreate(arg: *mut c_void, context: *mut rocksdb_compactionfiltercontext_t) -> *mut rocksdb_compactionfilter_t { unsafe extern "C" fn CFilterCreate(arg: *mut c_void,
context: *mut rocksdb_compactionfiltercontext_t)
-> *mut rocksdb_compactionfilter_t {
rocksdb_compactionfilter_create(ptr::null_mut(), rocksdb_compactionfilter_create(ptr::null_mut(),
Some(CFilterDestroy), Some(CFilterDestroy),
Some(CFilterFilter), Some(CFilterFilter),
@ -365,7 +362,15 @@ unsafe extern "C" fn MergeOperatorFullMerge(arg: *mut c_void,
result result
} }
unsafe extern "C" fn MergeOperatorPartialMerge(arg: *mut c_void, key: *const c_char, key_length: size_t, operands_list: *const *const c_char, operands_list_length: *const size_t, num_operands: c_int, success: *mut u8, new_value_length: *mut size_t) -> *mut c_char { unsafe extern "C" fn MergeOperatorPartialMerge(arg: *mut c_void,
key: *const c_char,
key_length: size_t,
operands_list: *const *const c_char,
operands_list_length: *const size_t,
num_operands: c_int,
success: *mut u8,
new_value_length: *mut size_t)
-> *mut c_char {
*new_value_length = 4; *new_value_length = 4;
*success = 1; *success = 1;
let result: *mut c_char = malloc(4) as *mut _; let result: *mut c_char = malloc(4) as *mut _;
@ -433,8 +438,8 @@ fn ffi() {
no_compression, no_compression,
]; ];
rocksdb_options_set_compression_per_level(options, rocksdb_options_set_compression_per_level(options,
mem::transmute(compression_levels.as_ptr()), mem::transmute(compression_levels.as_ptr()),
compression_levels.len() as size_t); compression_levels.len() as size_t);
roptions = rocksdb_readoptions_create(); roptions = rocksdb_readoptions_create();
rocksdb_readoptions_set_verify_checksums(roptions, 1); rocksdb_readoptions_set_verify_checksums(roptions, 1);
@ -459,13 +464,7 @@ fn ffi() {
CheckGet(db, roptions, cstrp!("foo") as *const _, ptr::null()); CheckGet(db, roptions, cstrp!("foo") as *const _, ptr::null());
StartPhase("put"); StartPhase("put");
rocksdb_put(db, rocksdb_put(db, woptions, cstrp!("foo"), 3, cstrp!("hello"), 5, &mut err);
woptions,
cstrp!("foo"),
3,
cstrp!("hello"),
5,
&mut err);
CheckNoError!(err); CheckNoError!(err);
CheckGet(db, roptions, cstrp!("foo"), cstrp!("hello")); CheckGet(db, roptions, cstrp!("foo"), cstrp!("hello"));
@ -474,19 +473,14 @@ fn ffi() {
rocksdb_destroy_db(options, dbbackupname, &mut err); rocksdb_destroy_db(options, dbbackupname, &mut err);
CheckNoError!(err); CheckNoError!(err);
let be = let be = rocksdb_backup_engine_open(options, dbbackupname, &mut err);
rocksdb_backup_engine_open(options, dbbackupname, &mut err);
CheckNoError!(err); CheckNoError!(err);
rocksdb_backup_engine_create_new_backup(be, db, &mut err); rocksdb_backup_engine_create_new_backup(be, db, &mut err);
CheckNoError!(err); CheckNoError!(err);
// need a change to trigger a new backup // need a change to trigger a new backup
rocksdb_delete(db, rocksdb_delete(db, woptions, cstrp!("does-not-exist"), 14, &mut err);
woptions,
cstrp!("does-not-exist"),
14,
&mut err);
CheckNoError!(err); CheckNoError!(err);
rocksdb_backup_engine_create_new_backup(be, db, &mut err); rocksdb_backup_engine_create_new_backup(be, db, &mut err);
@ -515,7 +509,11 @@ fn ffi() {
let restore_options = rocksdb_restore_options_create(); let restore_options = rocksdb_restore_options_create();
rocksdb_restore_options_set_keep_log_files(restore_options, 0); rocksdb_restore_options_set_keep_log_files(restore_options, 0);
rocksdb_backup_engine_restore_db_from_latest_backup(be, dbname, dbname, restore_options, &mut err); rocksdb_backup_engine_restore_db_from_latest_backup(be,
dbname,
dbname,
restore_options,
&mut err);
CheckNoError!(err); CheckNoError!(err);
rocksdb_restore_options_destroy(restore_options); rocksdb_restore_options_destroy(restore_options);
@ -564,8 +562,7 @@ fn ffi() {
let wb = rocksdb_writebatch_create(); let wb = rocksdb_writebatch_create();
let k_list: [*const c_char; 2] = [cstrp!("z"), cstrp!("ap")]; let k_list: [*const c_char; 2] = [cstrp!("z"), cstrp!("ap")];
let k_sizes: [size_t; 2] = [1, 2]; let k_sizes: [size_t; 2] = [1, 2];
let v_list: [*const c_char; 3] = let v_list: [*const c_char; 3] = [cstrp!("x"), cstrp!("y"), cstrp!("z")];
[cstrp!("x"), cstrp!("y"), cstrp!("z")];
let v_sizes: [size_t; 3] = [1, 1, 1]; let v_sizes: [size_t; 3] = [1, 1, 1];
rocksdb_writebatch_putv(wb, rocksdb_writebatch_putv(wb,
k_list.len() as c_int, k_list.len() as c_int,
@ -591,14 +588,14 @@ fn ffi() {
rocksdb_writebatch_put(wb1, cstrp!("quux"), 4, cstrp!("e"), 1); rocksdb_writebatch_put(wb1, cstrp!("quux"), 4, cstrp!("e"), 1);
rocksdb_writebatch_delete(wb1, cstrp!("quux"), 4); rocksdb_writebatch_delete(wb1, cstrp!("quux"), 4);
let mut repsize1: size_t = 0; let mut repsize1: size_t = 0;
let mut rep = let mut rep = rocksdb_writebatch_data(wb1, &mut repsize1) as *const c_void;
rocksdb_writebatch_data(wb1, &mut repsize1) as *const c_void; let mut wb2 = rocksdb_writebatch_create_from(rep as *const c_char, repsize1);
let mut wb2 = rocksdb_writebatch_create_from(rep as *const c_char, CheckCondition!(rocksdb_writebatch_count(wb1) == rocksdb_writebatch_count(wb2));
repsize1);
CheckCondition!(rocksdb_writebatch_count(wb1) ==
rocksdb_writebatch_count(wb2));
let mut repsize2: size_t = 0; let mut repsize2: size_t = 0;
CheckCondition!(memcmp(rep, rocksdb_writebatch_data(wb2, &mut repsize2) as *const c_void, repsize1) == 0); CheckCondition!(memcmp(rep,
rocksdb_writebatch_data(wb2, &mut repsize2) as *const c_void,
repsize1) ==
0);
rocksdb_writebatch_destroy(wb1); rocksdb_writebatch_destroy(wb1);
rocksdb_writebatch_destroy(wb2); rocksdb_writebatch_destroy(wb2);
} }
@ -627,14 +624,11 @@ fn ffi() {
StartPhase("multiget"); StartPhase("multiget");
{ {
let keys: [*const c_char; 3] = let keys: [*const c_char; 3] = [cstrp!("box"), cstrp!("foo"), cstrp!("notfound")];
[cstrp!("box"), cstrp!("foo"), cstrp!("notfound")];
let keys_sizes: [size_t; 3] = [3, 3, 8]; let keys_sizes: [size_t; 3] = [3, 3, 8];
let mut vals: [*mut c_char; 3] = let mut vals: [*mut c_char; 3] = [ptr::null_mut(), ptr::null_mut(), ptr::null_mut()];
[ptr::null_mut(), ptr::null_mut(), ptr::null_mut()];
let mut vals_sizes: [size_t; 3] = [0, 0, 0]; let mut vals_sizes: [size_t; 3] = [0, 0, 0];
let mut errs: [*mut c_char; 3] = let mut errs: [*mut c_char; 3] = [ptr::null_mut(), ptr::null_mut(), ptr::null_mut()];
[ptr::null_mut(), ptr::null_mut(), ptr::null_mut()];
rocksdb_multi_get(db, rocksdb_multi_get(db,
roptions, roptions,
3, 3,
@ -659,11 +653,9 @@ fn ffi() {
StartPhase("approximate_sizes"); StartPhase("approximate_sizes");
{ {
let mut sizes: [uint64_t; 2] = [0, 0]; let mut sizes: [uint64_t; 2] = [0, 0];
let start: [*const c_char; 2] = [cstrp!("a"), let start: [*const c_char; 2] = [cstrp!("a"), cstrp!("k00000000000000010000")];
cstrp!("k00000000000000010000")];
let start_len: [size_t; 2] = [1, 21]; let start_len: [size_t; 2] = [1, 21];
let limit: [*const c_char; 2] = [cstrp!("k00000000000000010000"), let limit: [*const c_char; 2] = [cstrp!("k00000000000000010000"), cstrp!("z")];
cstrp!("z")];
let limit_len: [size_t; 2] = [21, 1]; let limit_len: [size_t; 2] = [21, 1];
rocksdb_writeoptions_set_sync(woptions, 0); rocksdb_writeoptions_set_sync(woptions, 0);
for i in 0..20000 { for i in 0..20000 {
@ -747,14 +739,12 @@ fn ffi() {
rocksdb_filterpolicy_create_bloom(10) rocksdb_filterpolicy_create_bloom(10)
}; };
rocksdb_block_based_options_set_filter_policy(table_options, rocksdb_block_based_options_set_filter_policy(table_options, policy);
policy);
// Create new database // Create new database
rocksdb_close(db); rocksdb_close(db);
rocksdb_destroy_db(options, dbname, &mut err); rocksdb_destroy_db(options, dbname, &mut err);
rocksdb_options_set_block_based_table_factory(options, rocksdb_options_set_block_based_table_factory(options, table_options);
table_options);
db = rocksdb_open(options, dbname, &mut err); db = rocksdb_open(options, dbname, &mut err);
CheckNoError!(err); CheckNoError!(err);
rocksdb_put(db, rocksdb_put(db,
@ -789,10 +779,8 @@ fn ffi() {
CheckGet(db, roptions, cstrp!("bar"), cstrp!("barvalue")); CheckGet(db, roptions, cstrp!("bar"), cstrp!("barvalue"));
} }
// Reset the policy // Reset the policy
rocksdb_block_based_options_set_filter_policy(table_options, rocksdb_block_based_options_set_filter_policy(table_options, ptr::null_mut());
ptr::null_mut()); rocksdb_options_set_block_based_table_factory(options, table_options);
rocksdb_options_set_block_based_table_factory(options,
table_options);
} }
StartPhase("compaction_filter"); StartPhase("compaction_filter");
@ -807,14 +795,9 @@ fn ffi() {
rocksdb_close(db); rocksdb_close(db);
rocksdb_destroy_db(options_with_filter, dbname, &mut err); rocksdb_destroy_db(options_with_filter, dbname, &mut err);
rocksdb_options_set_compaction_filter(options_with_filter, cfilter); rocksdb_options_set_compaction_filter(options_with_filter, cfilter);
db = CheckCompaction(dbname, db = CheckCompaction(dbname, db, options_with_filter, roptions, woptions);
db,
options_with_filter,
roptions,
woptions);
rocksdb_options_set_compaction_filter(options_with_filter, rocksdb_options_set_compaction_filter(options_with_filter, ptr::null_mut());
ptr::null_mut());
rocksdb_compactionfilter_destroy(cfilter); rocksdb_compactionfilter_destroy(cfilter);
rocksdb_options_destroy(options_with_filter); rocksdb_options_destroy(options_with_filter);
} }
@ -822,32 +805,30 @@ fn ffi() {
StartPhase("compaction_filter_factory"); StartPhase("compaction_filter_factory");
{ {
let mut options_with_filter_factory = rocksdb_options_create(); let mut options_with_filter_factory = rocksdb_options_create();
rocksdb_options_set_create_if_missing(options_with_filter_factory, rocksdb_options_set_create_if_missing(options_with_filter_factory, 1);
1); let mut factory = rocksdb_compactionfilterfactory_create(ptr::null_mut(),
let mut factory = rocksdb_compactionfilterfactory_create(ptr::null_mut(), Some(CFilterFactoryDestroy), Some(CFilterCreate), Some(CFilterFactoryName)); Some(CFilterFactoryDestroy),
Some(CFilterCreate),
Some(CFilterFactoryName));
// Create new database // Create new database
rocksdb_close(db); rocksdb_close(db);
rocksdb_destroy_db(options_with_filter_factory, dbname, &mut err); rocksdb_destroy_db(options_with_filter_factory, dbname, &mut err);
rocksdb_options_set_compaction_filter_factory(options_with_filter_factory, factory); rocksdb_options_set_compaction_filter_factory(options_with_filter_factory, factory);
db = CheckCompaction(dbname, db = CheckCompaction(dbname, db, options_with_filter_factory, roptions, woptions);
db,
options_with_filter_factory,
roptions,
woptions);
rocksdb_options_set_compaction_filter_factory(options_with_filter_factory, ptr::null_mut()); rocksdb_options_set_compaction_filter_factory(options_with_filter_factory,
ptr::null_mut());
rocksdb_options_destroy(options_with_filter_factory); rocksdb_options_destroy(options_with_filter_factory);
} }
StartPhase("merge_operator"); StartPhase("merge_operator");
{ {
let mut merge_operator = let mut merge_operator = rocksdb_mergeoperator_create(ptr::null_mut(),
rocksdb_mergeoperator_create(ptr::null_mut(), Some(MergeOperatorDestroy),
Some(MergeOperatorDestroy), Some(MergeOperatorFullMerge),
Some(MergeOperatorFullMerge), Some(MergeOperatorPartialMerge),
Some(MergeOperatorPartialMerge), None,
None, Some(MergeOperatorName));
Some(MergeOperatorName));
// Create new database // Create new database
rocksdb_close(db); rocksdb_close(db);
rocksdb_destroy_db(options, dbname, &mut err); rocksdb_destroy_db(options, dbname, &mut err);
@ -895,21 +876,15 @@ fn ffi() {
rocksdb_options_set_create_if_missing(db_options, 1); rocksdb_options_set_create_if_missing(db_options, 1);
db = rocksdb_open(db_options, dbname, &mut err); db = rocksdb_open(db_options, dbname, &mut err);
CheckNoError!(err); CheckNoError!(err);
let mut cfh = rocksdb_create_column_family(db, let mut cfh = rocksdb_create_column_family(db, db_options, cstrp!("cf1"), &mut err);
db_options,
cstrp!("cf1"),
&mut err);
rocksdb_column_family_handle_destroy(cfh); rocksdb_column_family_handle_destroy(cfh);
CheckNoError!(err); CheckNoError!(err);
rocksdb_close(db); rocksdb_close(db);
let mut cflen: size_t = 0; let mut cflen: size_t = 0;
let column_fams_raw = rocksdb_list_column_families(db_options, let column_fams_raw =
dbname, rocksdb_list_column_families(db_options, dbname, &mut cflen, &mut err);
&mut cflen, let column_fams = slice::from_raw_parts(column_fams_raw, cflen as usize);
&mut err);
let column_fams = slice::from_raw_parts(column_fams_raw,
cflen as usize);
CheckEqual(cstrp!("default"), column_fams[0], 7); CheckEqual(cstrp!("default"), column_fams[0], 7);
CheckEqual(cstrp!("cf1"), column_fams[1], 3); CheckEqual(cstrp!("cf1"), column_fams[1], 3);
CheckCondition!(cflen == 2); CheckCondition!(cflen == 2);
@ -917,12 +892,10 @@ fn ffi() {
let mut cf_options = rocksdb_options_create(); let mut cf_options = rocksdb_options_create();
let cf_names: [*const c_char; 2] = [cstrp!("default"), let cf_names: [*const c_char; 2] = [cstrp!("default"), cstrp!("cf1")];
cstrp!("cf1")]; let cf_opts: [*const rocksdb_options_t; 2] = [cf_options, cf_options];
let cf_opts: [*const rocksdb_options_t; 2] = [cf_options, let mut handles: [*mut rocksdb_column_family_handle_t; 2] = [ptr::null_mut(),
cf_options]; ptr::null_mut()];
let mut handles: [*mut rocksdb_column_family_handle_t; 2] =
[ptr::null_mut(), ptr::null_mut()];
db = rocksdb_open_column_families(db_options, db = rocksdb_open_column_families(db_options,
dbname, dbname,
2, 2,
@ -942,42 +915,18 @@ fn ffi() {
&mut err); &mut err);
CheckNoError!(err); CheckNoError!(err);
CheckGetCF(db, CheckGetCF(db, roptions, handles[1], cstrp!("foo"), cstrp!("hello"));
roptions,
handles[1],
cstrp!("foo"),
cstrp!("hello"));
rocksdb_delete_cf(db, rocksdb_delete_cf(db, woptions, handles[1], cstrp!("foo"), 3, &mut err);
woptions,
handles[1],
cstrp!("foo"),
3,
&mut err);
CheckNoError!(err); CheckNoError!(err);
CheckGetCF(db, roptions, handles[1], cstrp!("foo"), ptr::null()); CheckGetCF(db, roptions, handles[1], cstrp!("foo"), ptr::null());
let mut wb = rocksdb_writebatch_create(); let mut wb = rocksdb_writebatch_create();
rocksdb_writebatch_put_cf(wb, rocksdb_writebatch_put_cf(wb, handles[1], cstrp!("baz"), 3, cstrp!("a"), 1);
handles[1],
cstrp!("baz"),
3,
cstrp!("a"),
1);
rocksdb_writebatch_clear(wb); rocksdb_writebatch_clear(wb);
rocksdb_writebatch_put_cf(wb, rocksdb_writebatch_put_cf(wb, handles[1], cstrp!("bar"), 3, cstrp!("b"), 1);
handles[1], rocksdb_writebatch_put_cf(wb, handles[1], cstrp!("box"), 3, cstrp!("c"), 1);
cstrp!("bar"),
3,
cstrp!("b"),
1);
rocksdb_writebatch_put_cf(wb,
handles[1],
cstrp!("box"),
3,
cstrp!("c"),
1);
rocksdb_writebatch_delete_cf(wb, handles[1], cstrp!("bar"), 3); rocksdb_writebatch_delete_cf(wb, handles[1], cstrp!("bar"), 3);
rocksdb_write(db, woptions, wb, &mut err); rocksdb_write(db, woptions, wb, &mut err);
CheckNoError!(err); CheckNoError!(err);
@ -986,16 +935,13 @@ fn ffi() {
CheckGetCF(db, roptions, handles[1], cstrp!("box"), cstrp!("c")); CheckGetCF(db, roptions, handles[1], cstrp!("box"), cstrp!("c"));
rocksdb_writebatch_destroy(wb); rocksdb_writebatch_destroy(wb);
let keys: [*const c_char; 3] = let keys: [*const c_char; 3] = [cstrp!("box"), cstrp!("box"), cstrp!("barfooxx")];
[cstrp!("box"), cstrp!("box"), cstrp!("barfooxx")]; let get_handles: [*const rocksdb_column_family_handle_t; 3] = [handles[0], handles[1],
let get_handles: [*const rocksdb_column_family_handle_t; 3] = handles[1]];
[handles[0], handles[1], handles[1]];
let keys_sizes: [size_t; 3] = [3, 3, 8]; let keys_sizes: [size_t; 3] = [3, 3, 8];
let mut vals: [*mut c_char; 3] = let mut vals: [*mut c_char; 3] = [ptr::null_mut(), ptr::null_mut(), ptr::null_mut()];
[ptr::null_mut(), ptr::null_mut(), ptr::null_mut()];
let mut vals_sizes: [size_t; 3] = [0, 0, 0]; let mut vals_sizes: [size_t; 3] = [0, 0, 0];
let mut errs: [*mut c_char; 3] = let mut errs: [*mut c_char; 3] = [ptr::null_mut(), ptr::null_mut(), ptr::null_mut()];
[ptr::null_mut(), ptr::null_mut(), ptr::null_mut()];
rocksdb_multi_get_cf(db, rocksdb_multi_get_cf(db,
roptions, roptions,
get_handles.as_ptr(), get_handles.as_ptr(),
@ -1032,9 +978,10 @@ fn ffi() {
CheckNoError!(err); CheckNoError!(err);
rocksdb_iter_destroy(iter); rocksdb_iter_destroy(iter);
let mut iters_cf_handles: [*mut rocksdb_column_family_handle_t; 2] = [handles[0], handles[1]]; let mut iters_cf_handles: [*mut rocksdb_column_family_handle_t; 2] = [handles[0],
let mut iters_handles: [*mut rocksdb_iterator_t; 2] = handles[1]];
[ptr::null_mut(), ptr::null_mut()]; let mut iters_handles: [*mut rocksdb_iterator_t; 2] = [ptr::null_mut(),
ptr::null_mut()];
rocksdb_create_iterators(db, rocksdb_create_iterators(db,
roptions, roptions,
iters_cf_handles.as_mut_ptr(), iters_cf_handles.as_mut_ptr(),
@ -1079,60 +1026,25 @@ fn ffi() {
{ {
// Create new database // Create new database
rocksdb_options_set_allow_mmap_reads(options, 1); rocksdb_options_set_allow_mmap_reads(options, 1);
rocksdb_options_set_prefix_extractor(options, rocksdb_slicetransform_create_fixed_prefix(3)); rocksdb_options_set_prefix_extractor(options,
rocksdb_slicetransform_create_fixed_prefix(3));
rocksdb_options_set_hash_skip_list_rep(options, 5000, 4, 4); rocksdb_options_set_hash_skip_list_rep(options, 5000, 4, 4);
rocksdb_options_set_plain_table_factory(options, 4, 10, 0.75, 16); rocksdb_options_set_plain_table_factory(options, 4, 10, 0.75, 16);
db = rocksdb_open(options, dbname, &mut err); db = rocksdb_open(options, dbname, &mut err);
CheckNoError!(err); CheckNoError!(err);
rocksdb_put(db, rocksdb_put(db, woptions, cstrp!("foo1"), 4, cstrp!("foo"), 3, &mut err);
woptions,
cstrp!("foo1"),
4,
cstrp!("foo"),
3,
&mut err);
CheckNoError!(err); CheckNoError!(err);
rocksdb_put(db, rocksdb_put(db, woptions, cstrp!("foo2"), 4, cstrp!("foo"), 3, &mut err);
woptions,
cstrp!("foo2"),
4,
cstrp!("foo"),
3,
&mut err);
CheckNoError!(err); CheckNoError!(err);
rocksdb_put(db, rocksdb_put(db, woptions, cstrp!("foo3"), 4, cstrp!("foo"), 3, &mut err);
woptions,
cstrp!("foo3"),
4,
cstrp!("foo"),
3,
&mut err);
CheckNoError!(err); CheckNoError!(err);
rocksdb_put(db, rocksdb_put(db, woptions, cstrp!("bar1"), 4, cstrp!("bar"), 3, &mut err);
woptions,
cstrp!("bar1"),
4,
cstrp!("bar"),
3,
&mut err);
CheckNoError!(err); CheckNoError!(err);
rocksdb_put(db, rocksdb_put(db, woptions, cstrp!("bar2"), 4, cstrp!("bar"), 3, &mut err);
woptions,
cstrp!("bar2"),
4,
cstrp!("bar"),
3,
&mut err);
CheckNoError!(err); CheckNoError!(err);
rocksdb_put(db, rocksdb_put(db, woptions, cstrp!("bar3"), 4, cstrp!("bar"), 3, &mut err);
woptions,
cstrp!("bar3"),
4,
cstrp!("bar"),
3,
&mut err);
CheckNoError!(err); CheckNoError!(err);
let mut iter = rocksdb_create_iterator(db, roptions); let mut iter = rocksdb_create_iterator(db, roptions);
@ -1162,8 +1074,7 @@ fn ffi() {
rocksdb_cuckoo_options_set_hash_ratio(cuckoo_options, 0.5); rocksdb_cuckoo_options_set_hash_ratio(cuckoo_options, 0.5);
rocksdb_cuckoo_options_set_max_search_depth(cuckoo_options, 200); rocksdb_cuckoo_options_set_max_search_depth(cuckoo_options, 200);
rocksdb_cuckoo_options_set_cuckoo_block_size(cuckoo_options, 10); rocksdb_cuckoo_options_set_cuckoo_block_size(cuckoo_options, 10);
rocksdb_cuckoo_options_set_identity_as_first_hash(cuckoo_options, rocksdb_cuckoo_options_set_identity_as_first_hash(cuckoo_options, 1);
1);
rocksdb_cuckoo_options_set_use_module_hash(cuckoo_options, 0); rocksdb_cuckoo_options_set_use_module_hash(cuckoo_options, 0);
rocksdb_options_set_cuckoo_table_factory(options, cuckoo_options); rocksdb_options_set_cuckoo_table_factory(options, cuckoo_options);
@ -1186,36 +1097,16 @@ fn ffi() {
rocksdb_put(db, woptions, cstrp!("a"), 1, cstrp!("0"), 1, &mut err); rocksdb_put(db, woptions, cstrp!("a"), 1, cstrp!("0"), 1, &mut err);
CheckNoError!(err); CheckNoError!(err);
rocksdb_put(db, rocksdb_put(db, woptions, cstrp!("foo"), 3, cstrp!("bar"), 3, &mut err);
woptions,
cstrp!("foo"),
3,
cstrp!("bar"),
3,
&mut err);
CheckNoError!(err); CheckNoError!(err);
rocksdb_put(db, rocksdb_put(db, woptions, cstrp!("foo1"), 4, cstrp!("bar1"), 4, &mut err);
woptions,
cstrp!("foo1"),
4,
cstrp!("bar1"),
4,
&mut err);
CheckNoError!(err); CheckNoError!(err);
rocksdb_put(db, rocksdb_put(db, woptions, cstrp!("g1"), 2, cstrp!("0"), 1, &mut err);
woptions,
cstrp!("g1"),
2,
cstrp!("0"),
1,
&mut err);
CheckNoError!(err); CheckNoError!(err);
// testing basic case with no iterate_upper_bound and no prefix_extractor // testing basic case with no iterate_upper_bound and no prefix_extractor
{ {
rocksdb_readoptions_set_iterate_upper_bound(roptions, rocksdb_readoptions_set_iterate_upper_bound(roptions, ptr::null(), 0);
ptr::null(),
0);
let mut iter = rocksdb_create_iterator(db, roptions); let mut iter = rocksdb_create_iterator(db, roptions);
rocksdb_iter_seek(iter, cstrp!("foo"), 3); rocksdb_iter_seek(iter, cstrp!("foo"), 3);
@ -1237,9 +1128,7 @@ fn ffi() {
// to make sure it stops at bound // to make sure it stops at bound
{ {
// iterate_upper_bound points beyond the last expected entry // iterate_upper_bound points beyond the last expected entry
rocksdb_readoptions_set_iterate_upper_bound(roptions, rocksdb_readoptions_set_iterate_upper_bound(roptions, cstrp!("foo2"), 4);
cstrp!("foo2"),
4);
let mut iter = rocksdb_create_iterator(db, roptions); let mut iter = rocksdb_create_iterator(db, roptions);

src/lib.rs

@ -13,29 +13,105 @@
// limitations under the License. // limitations under the License.
// //
//! Rust wrapper for RocksDB.
//!
//! # Examples
//!
//! ```
//! use rocksdb::DB;
//! // NB: db is automatically closed at end of lifetime
//! let db = DB::open_default("path/for/rocksdb/storage").unwrap();
//! db.put(b"my key", b"my value");
//! match db.get(b"my key") {
//! Ok(Some(value)) => println!("retrieved value {}", value.to_utf8().unwrap()),
//! Ok(None) => println!("value not found"),
//! Err(e) => println!("operational problem encountered: {}", e),
//! }
//! db.delete(b"my key").unwrap();
//! ```
//!
extern crate libc; extern crate libc;
extern crate rocksdb_sys as ffi; extern crate librocksdb_sys as ffi;
#[macro_use] #[macro_use]
mod ffi_util; mod ffi_util;
pub mod comparator; mod comparator;
pub mod merge_operator; pub mod merge_operator;
mod rocksdb; mod rocksdb;
mod rocksdb_options; mod rocksdb_options;
pub use rocksdb::{DB, DBCompactionStyle, DBCompressionType, DBIterator, DBRecoveryMode, DBVector, pub use rocksdb::{DB, DBCompactionStyle, DBCompressionType, DBIterator, DBRecoveryMode, DBVector,
Direction, Error, IteratorMode, Snapshot, Writable, WriteBatch, new_bloom_filter}; Direction, Error, IteratorMode, Snapshot, WriteBatch, new_bloom_filter};
pub use merge_operator::MergeOperands; pub use merge_operator::MergeOperands;
/// For configuring block-based file storage.
pub struct BlockBasedOptions { pub struct BlockBasedOptions {
inner: *mut ffi::rocksdb_block_based_table_options_t, inner: *mut ffi::rocksdb_block_based_table_options_t,
} }
/// Database-wide options around performance and behavior.
///
/// Please read [the official tuning guide](https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide), and most importantly, measure performance under realistic workloads with realistic hardware.
///
/// # Examples
///
/// ```
/// use rocksdb::{Options, DB};
/// use rocksdb::DBCompactionStyle;
///
/// fn badly_tuned_for_somebody_elses_disk() -> DB {
/// let path = "path/for/rocksdb/storage5";
/// let mut opts = Options::default();
/// opts.create_if_missing(true);
/// opts.set_max_open_files(10000);
/// opts.set_use_fsync(false);
/// opts.set_bytes_per_sync(8388608);
/// opts.set_disable_data_sync(false);
/// opts.optimize_for_point_lookup(1024);
/// opts.set_table_cache_num_shard_bits(6);
/// opts.set_max_write_buffer_number(32);
/// opts.set_write_buffer_size(536870912);
/// opts.set_target_file_size_base(1073741824);
/// opts.set_min_write_buffer_number_to_merge(4);
/// opts.set_level_zero_stop_writes_trigger(2000);
/// opts.set_level_zero_slowdown_writes_trigger(0);
/// opts.set_compaction_style(DBCompactionStyle::Universal);
/// opts.set_max_background_compactions(4);
/// opts.set_max_background_flushes(4);
/// opts.set_disable_auto_compactions(true);
///
/// DB::open(&opts, path).unwrap()
/// }
/// ```
pub struct Options { pub struct Options {
inner: *mut ffi::rocksdb_options_t, inner: *mut ffi::rocksdb_options_t,
} }
/// Optionally disable WAL or sync for this write.
///
/// # Examples
///
/// Making an unsafe write of a batch:
///
/// ```
/// use rocksdb::{DB, WriteBatch, WriteOptions};
///
/// let db = DB::open_default("path/for/rocksdb/storage6").unwrap();
///
/// let mut batch = WriteBatch::default();
/// batch.put(b"my key", b"my value");
/// batch.put(b"key2", b"value2");
/// batch.put(b"key3", b"value3");
///
/// let mut write_options = WriteOptions::default();
/// write_options.set_sync(false);
/// write_options.disable_wal(true);
///
/// db.write_opt(batch, &write_options);
/// ```
pub struct WriteOptions { pub struct WriteOptions {
inner: *mut ffi::rocksdb_writeoptions_t, inner: *mut ffi::rocksdb_writeoptions_t,
} }

src/merge_operator.rs

@ -13,6 +13,46 @@
// limitations under the License. // limitations under the License.
// //
//! Rustic merge operator
//!
//! ```
//! use rocksdb::{Options, DB, MergeOperands};
//!
//! fn concat_merge(new_key: &[u8],
//! existing_val: Option<&[u8]>,
//! operands: &mut MergeOperands)
//! -> Vec<u8> {
//!
//! let mut result: Vec<u8> = Vec::with_capacity(operands.size_hint().0);
//! existing_val.map(|v| {
//! for e in v {
//! result.push(*e)
//! }
//! });
//! for op in operands {
//! for e in op {
//! result.push(*e)
//! }
//! }
//! result
//! }
//!
//! fn main() {
//! let path = "path/to/rocksdb";
//! let mut opts = Options::default();
//! opts.create_if_missing(true);
//! opts.add_merge_operator("test operator", concat_merge);
//! let db = DB::open(&opts, path).unwrap();
//! let p = db.put(b"k1", b"a");
//! db.merge(b"k1", b"b");
//! db.merge(b"k1", b"c");
//! db.merge(b"k1", b"d");
//! db.merge(b"k1", b"efg");
//! let r = db.get(b"k1");
//! assert!(r.unwrap().unwrap().to_utf8().unwrap() == "abcdefg");
//! }
//! ```
use std::ffi::CString; use std::ffi::CString;
use std::mem; use std::mem;
use std::ptr; use std::ptr;
@ -160,7 +200,7 @@ fn test_provided_merge(new_key: &[u8],
#[test] #[test]
fn mergetest() { fn mergetest() {
use Options; use Options;
use rocksdb::{DB, Writable}; use rocksdb::DB;
let path = "_rust_rocksdb_mergetest"; let path = "_rust_rocksdb_mergetest";
let mut opts = Options::default(); let mut opts = Options::default();

src/rocksdb.rs

@ -37,6 +37,7 @@ pub fn new_cache(capacity: size_t) -> *mut ffi::rocksdb_cache_t {
unsafe { ffi::rocksdb_cache_create_lru(capacity) } unsafe { ffi::rocksdb_cache_create_lru(capacity) }
} }
/// RocksDB wrapper object.
pub struct DB { pub struct DB {
inner: *mut ffi::rocksdb_t, inner: *mut ffi::rocksdb_t,
cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>, cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
@ -71,6 +72,22 @@ pub enum DBRecoveryMode {
SkipAnyCorruptedRecords = 3, SkipAnyCorruptedRecords = 3,
} }
/// An atomic batch of mutations.
///
/// Making an atomic commit of several writes:
///
/// ```
/// use rocksdb::{DB, WriteBatch};
///
/// let db = DB::open_default("path/for/rocksdb/storage1").unwrap();
/// {
/// let mut batch = WriteBatch::default();
/// batch.put(b"my key", b"my value");
/// batch.put(b"key2", b"value2");
/// batch.put(b"key3", b"value3");
/// db.write(batch); // Atomically commits the batch
/// }
/// ```
pub struct WriteBatch { pub struct WriteBatch {
inner: *mut ffi::rocksdb_writebatch_t, inner: *mut ffi::rocksdb_writebatch_t,
} }
@ -79,11 +96,48 @@ pub struct ReadOptions {
inner: *mut ffi::rocksdb_readoptions_t, inner: *mut ffi::rocksdb_readoptions_t,
} }
/// A consistent view of the database at the point of creation.
///
/// ```
/// use rocksdb::{DB, IteratorMode};
///
/// let db = DB::open_default("path/for/rocksdb/storage3").unwrap();
/// let snapshot = db.snapshot(); // Creates a longer-term snapshot of the DB, but it is freed when it goes out of scope
/// let mut iter = snapshot.iterator(IteratorMode::Start); // Make as many iterators as you'd like from one snapshot
/// ```
///
pub struct Snapshot<'a> { pub struct Snapshot<'a> {
db: &'a DB, db: &'a DB,
inner: *const ffi::rocksdb_snapshot_t, inner: *const ffi::rocksdb_snapshot_t,
} }
/// An iterator over a database or column family, with specifiable
/// ranges and direction.
///
/// ```
/// use rocksdb::{DB, Direction, IteratorMode};
///
/// let mut db = DB::open_default("path/for/rocksdb/storage2").unwrap();
/// let mut iter = db.iterator(IteratorMode::Start); // Always iterates forward
/// for (key, value) in iter {
/// println!("Saw {:?} {:?}", key, value);
/// }
/// iter = db.iterator(IteratorMode::End); // Always iterates backward
/// for (key, value) in iter {
/// println!("Saw {:?} {:?}", key, value);
/// }
/// iter = db.iterator(IteratorMode::From(b"my key", Direction::Forward)); // From a key in Direction::{forward,reverse}
/// for (key, value) in iter {
/// println!("Saw {:?} {:?}", key, value);
/// }
///
/// // You can seek with an existing Iterator instance, too
/// iter = db.iterator(IteratorMode::Start);
/// iter.set_mode(IteratorMode::From(b"another key", Direction::Reverse));
/// for (key, value) in iter {
/// println!("Saw {:?} {:?}", key, value);
/// }
/// ```
pub struct DBIterator { pub struct DBIterator {
inner: *mut ffi::rocksdb_iterator_t, inner: *mut ffi::rocksdb_iterator_t,
direction: Direction, direction: Direction,
@ -284,27 +338,6 @@ impl<'a> Drop for Snapshot<'a> {
} }
} }
// This is for the DB and write batches to share the same API.
pub trait Writable {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error>;
fn put_cf(&self,
cf: *mut ffi::rocksdb_column_family_handle_t,
key: &[u8],
value: &[u8])
-> Result<(), Error>;
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), Error>;
fn merge_cf(&self,
cf: *mut ffi::rocksdb_column_family_handle_t,
key: &[u8],
value: &[u8])
-> Result<(), Error>;
fn delete(&self, key: &[u8]) -> Result<(), Error>;
fn delete_cf(&self,
cf: *mut ffi::rocksdb_column_family_handle_t,
key: &[u8])
-> Result<(), Error>;
}
impl DB { impl DB {
/// Open a database with default options. /// Open a database with default options.
pub fn open_default<P: AsRef<Path>>(path: P) -> Result<DB, Error> { pub fn open_default<P: AsRef<Path>>(path: P) -> Result<DB, Error> {
@ -611,12 +644,12 @@ impl DB {
} }
} }
fn merge_cf_opt(&self, pub fn merge_cf_opt(&self,
cf: *mut ffi::rocksdb_column_family_handle_t, cf: *mut ffi::rocksdb_column_family_handle_t,
key: &[u8], key: &[u8],
value: &[u8], value: &[u8],
writeopts: &WriteOptions) writeopts: &WriteOptions)
-> Result<(), Error> { -> Result<(), Error> {
unsafe { unsafe {
ffi_try!(ffi::rocksdb_merge_cf(self.inner, ffi_try!(ffi::rocksdb_merge_cf(self.inner,
writeopts.inner, writeopts.inner,
@ -629,7 +662,7 @@ impl DB {
} }
} }
fn delete_opt(&self, key: &[u8], writeopts: &WriteOptions) -> Result<(), Error> { pub fn delete_opt(&self, key: &[u8], writeopts: &WriteOptions) -> Result<(), Error> {
unsafe { unsafe {
ffi_try!(ffi::rocksdb_delete(self.inner, ffi_try!(ffi::rocksdb_delete(self.inner,
writeopts.inner, writeopts.inner,
@ -639,11 +672,11 @@ impl DB {
} }
} }
fn delete_cf_opt(&self, pub fn delete_cf_opt(&self,
cf: *mut ffi::rocksdb_column_family_handle_t, cf: *mut ffi::rocksdb_column_family_handle_t,
key: &[u8], key: &[u8],
writeopts: &WriteOptions) writeopts: &WriteOptions)
-> Result<(), Error> { -> Result<(), Error> {
unsafe { unsafe {
ffi_try!(ffi::rocksdb_delete_cf(self.inner, ffi_try!(ffi::rocksdb_delete_cf(self.inner,
writeopts.inner, writeopts.inner,
@ -653,41 +686,39 @@ impl DB {
Ok(()) Ok(())
} }
} }
}
impl Writable for DB { pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> {
self.put_opt(key, value, &WriteOptions::default()) self.put_opt(key, value, &WriteOptions::default())
} }
fn put_cf(&self, pub fn put_cf(&self,
cf: *mut ffi::rocksdb_column_family_handle_t, cf: *mut ffi::rocksdb_column_family_handle_t,
key: &[u8], key: &[u8],
value: &[u8]) value: &[u8])
-> Result<(), Error> { -> Result<(), Error> {
self.put_cf_opt(cf, key, value, &WriteOptions::default()) self.put_cf_opt(cf, key, value, &WriteOptions::default())
} }
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), Error> { pub fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), Error> {
self.merge_opt(key, value, &WriteOptions::default()) self.merge_opt(key, value, &WriteOptions::default())
} }
fn merge_cf(&self, pub fn merge_cf(&self,
cf: *mut ffi::rocksdb_column_family_handle_t, cf: *mut ffi::rocksdb_column_family_handle_t,
key: &[u8], key: &[u8],
value: &[u8]) value: &[u8])
-> Result<(), Error> { -> Result<(), Error> {
self.merge_cf_opt(cf, key, value, &WriteOptions::default()) self.merge_cf_opt(cf, key, value, &WriteOptions::default())
} }
fn delete(&self, key: &[u8]) -> Result<(), Error> { pub fn delete(&self, key: &[u8]) -> Result<(), Error> {
self.delete_opt(key, &WriteOptions::default()) self.delete_opt(key, &WriteOptions::default())
} }
fn delete_cf(&self, pub fn delete_cf(&self,
cf: *mut ffi::rocksdb_column_family_handle_t, cf: *mut ffi::rocksdb_column_family_handle_t,
key: &[u8]) key: &[u8])
-> Result<(), Error> { -> Result<(), Error> {
self.delete_cf_opt(cf, key, &WriteOptions::default()) self.delete_cf_opt(cf, key, &WriteOptions::default())
} }
} }
@ -700,40 +731,9 @@ impl WriteBatch {
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.len() == 0 self.len() == 0
} }
}
impl Default for WriteBatch {
fn default() -> WriteBatch {
WriteBatch { inner: unsafe { ffi::rocksdb_writebatch_create() } }
}
}
impl Drop for WriteBatch {
fn drop(&mut self) {
unsafe { ffi::rocksdb_writebatch_destroy(self.inner) }
}
}
impl Drop for DB {
fn drop(&mut self) {
unsafe {
for cf in self.cfs.values() {
ffi::rocksdb_column_family_handle_destroy(*cf);
}
ffi::rocksdb_close(self.inner);
}
}
}
impl fmt::Debug for DB {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "RocksDB {{ path: {:?} }}", self.path())
}
}
impl Writable for WriteBatch {
/// Insert a value into the database under the given key. /// Insert a value into the database under the given key.
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> { pub fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), Error> {
unsafe { unsafe {
ffi::rocksdb_writebatch_put(self.inner, ffi::rocksdb_writebatch_put(self.inner,
key.as_ptr() as *const i8, key.as_ptr() as *const i8,
@ -744,11 +744,11 @@ impl Writable for WriteBatch {
} }
} }
fn put_cf(&self, pub fn put_cf(&mut self,
cf: *mut ffi::rocksdb_column_family_handle_t, cf: *mut ffi::rocksdb_column_family_handle_t,
key: &[u8], key: &[u8],
value: &[u8]) value: &[u8])
-> Result<(), Error> { -> Result<(), Error> {
unsafe { unsafe {
ffi::rocksdb_writebatch_put_cf(self.inner, ffi::rocksdb_writebatch_put_cf(self.inner,
cf, cf,
@ -760,7 +760,7 @@ impl Writable for WriteBatch {
} }
} }
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), Error> { pub fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<(), Error> {
unsafe { unsafe {
ffi::rocksdb_writebatch_merge(self.inner, ffi::rocksdb_writebatch_merge(self.inner,
key.as_ptr() as *const i8, key.as_ptr() as *const i8,
@ -771,11 +771,11 @@ impl Writable for WriteBatch {
} }
} }
fn merge_cf(&self, pub fn merge_cf(&mut self,
cf: *mut ffi::rocksdb_column_family_handle_t, cf: *mut ffi::rocksdb_column_family_handle_t,
key: &[u8], key: &[u8],
value: &[u8]) value: &[u8])
-> Result<(), Error> { -> Result<(), Error> {
unsafe { unsafe {
ffi::rocksdb_writebatch_merge_cf(self.inner, ffi::rocksdb_writebatch_merge_cf(self.inner,
cf, cf,
@ -790,7 +790,7 @@ impl Writable for WriteBatch {
/// Remove the database entry for key. /// Remove the database entry for key.
/// ///
/// Returns Err if the key was not found /// Returns Err if the key was not found
fn delete(&self, key: &[u8]) -> Result<(), Error> { pub fn delete(&mut self, key: &[u8]) -> Result<(), Error> {
unsafe { unsafe {
ffi::rocksdb_writebatch_delete(self.inner, ffi::rocksdb_writebatch_delete(self.inner,
key.as_ptr() as *const i8, key.as_ptr() as *const i8,
@ -799,10 +799,10 @@ impl Writable for WriteBatch {
} }
} }
fn delete_cf(&self, pub fn delete_cf(&mut self,
cf: *mut ffi::rocksdb_column_family_handle_t, cf: *mut ffi::rocksdb_column_family_handle_t,
key: &[u8]) key: &[u8])
-> Result<(), Error> { -> Result<(), Error> {
unsafe { unsafe {
ffi::rocksdb_writebatch_delete_cf(self.inner, ffi::rocksdb_writebatch_delete_cf(self.inner,
cf, cf,
@ -813,6 +813,35 @@ impl Writable for WriteBatch {
} }
} }
impl Default for WriteBatch {
fn default() -> WriteBatch {
WriteBatch { inner: unsafe { ffi::rocksdb_writebatch_create() } }
}
}
impl Drop for WriteBatch {
fn drop(&mut self) {
unsafe { ffi::rocksdb_writebatch_destroy(self.inner) }
}
}
impl Drop for DB {
fn drop(&mut self) {
unsafe {
for cf in self.cfs.values() {
ffi::rocksdb_column_family_handle_destroy(*cf);
}
ffi::rocksdb_close(self.inner);
}
}
}
impl fmt::Debug for DB {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "RocksDB {{ path: {:?} }}", self.path())
}
}
impl Drop for ReadOptions { impl Drop for ReadOptions {
fn drop(&mut self) { fn drop(&mut self) {
unsafe { ffi::rocksdb_readoptions_destroy(self.inner) } unsafe { ffi::rocksdb_readoptions_destroy(self.inner) }
@ -926,7 +955,7 @@ fn writebatch_works() {
let db = DB::open_default(path).unwrap(); let db = DB::open_default(path).unwrap();
{ {
// test put // test put
let batch = WriteBatch::default(); let mut batch = WriteBatch::default();
assert!(db.get(b"k1").unwrap().is_none()); assert!(db.get(b"k1").unwrap().is_none());
assert_eq!(batch.len(), 0); assert_eq!(batch.len(), 0);
assert!(batch.is_empty()); assert!(batch.is_empty());
@ -941,7 +970,7 @@ fn writebatch_works() {
} }
{ {
// test delete // test delete
let batch = WriteBatch::default(); let mut batch = WriteBatch::default();
let _ = batch.delete(b"k1"); let _ = batch.delete(b"k1");
assert_eq!(batch.len(), 1); assert_eq!(batch.len(), 1);
assert!(!batch.is_empty()); assert!(!batch.is_empty());

test/test_column_family.rs

@ -13,7 +13,7 @@
// limitations under the License. // limitations under the License.
// //
use rocksdb::{DB, MergeOperands, Options, Writable}; use rocksdb::{DB, MergeOperands, Options};
#[test] #[test]
pub fn test_column_family() { pub fn test_column_family() {
@ -96,8 +96,7 @@ fn test_merge_operator() {
}; };
let cf1 = *db.cf_handle("cf1").unwrap(); let cf1 = *db.cf_handle("cf1").unwrap();
assert!(db.put_cf(cf1, b"k1", b"v1").is_ok()); assert!(db.put_cf(cf1, b"k1", b"v1").is_ok());
assert!(db.get_cf(cf1, b"k1").unwrap().unwrap().to_utf8().unwrap() == assert!(db.get_cf(cf1, b"k1").unwrap().unwrap().to_utf8().unwrap() == "v1");
"v1");
let p = db.put_cf(cf1, b"k1", b"a"); let p = db.put_cf(cf1, b"k1", b"a");
assert!(p.is_ok()); assert!(p.is_ok());
db.merge_cf(cf1, b"k1", b"b").unwrap(); db.merge_cf(cf1, b"k1", b"b").unwrap();

test/test_iterator.rs

@ -13,7 +13,7 @@
// limitations under the License. // limitations under the License.
// //
use rocksdb::{DB, Direction, IteratorMode, Options, Writable}; use rocksdb::{DB, Direction, IteratorMode, Options};
fn cba(input: &Box<[u8]>) -> Box<[u8]> { fn cba(input: &Box<[u8]>) -> Box<[u8]> {
input.iter().cloned().collect::<Vec<_>>().into_boxed_slice() input.iter().cloned().collect::<Vec<_>>().into_boxed_slice()
@ -38,9 +38,7 @@ pub fn test_iterator() {
assert!(p.is_ok()); assert!(p.is_ok());
let p = db.put(&*k3, &*v3); let p = db.put(&*k3, &*v3);
assert!(p.is_ok()); assert!(p.is_ok());
let expected = vec![(cba(&k1), cba(&v1)), let expected = vec![(cba(&k1), cba(&v1)), (cba(&k2), cba(&v2)), (cba(&k3), cba(&v3))];
(cba(&k2), cba(&v2)),
(cba(&k3), cba(&v3))];
{ {
let iterator1 = db.iterator(IteratorMode::Start); let iterator1 = db.iterator(IteratorMode::Start);
assert_eq!(iterator1.collect::<Vec<_>>(), expected); assert_eq!(iterator1.collect::<Vec<_>>(), expected);
@ -114,48 +112,35 @@ pub fn test_iterator() {
assert_eq!(iterator1.collect::<Vec<_>>(), expected2); assert_eq!(iterator1.collect::<Vec<_>>(), expected2);
} }
{ {
let iterator1 = let iterator1 = db.iterator(IteratorMode::From(b"k2", Direction::Forward));
db.iterator(IteratorMode::From(b"k2", Direction::Forward)); let expected = vec![(cba(&k2), cba(&v2)), (cba(&k3), cba(&v3)), (cba(&k4), cba(&v4))];
let expected = vec![(cba(&k2), cba(&v2)),
(cba(&k3), cba(&v3)),
(cba(&k4), cba(&v4))];
assert_eq!(iterator1.collect::<Vec<_>>(), expected); assert_eq!(iterator1.collect::<Vec<_>>(), expected);
} }
{ {
let iterator1 = let iterator1 = db.iterator(IteratorMode::From(b"k2", Direction::Reverse));
db.iterator(IteratorMode::From(b"k2", Direction::Reverse));
let expected = vec![(cba(&k2), cba(&v2)), (cba(&k1), cba(&v1))]; let expected = vec![(cba(&k2), cba(&v2)), (cba(&k1), cba(&v1))];
assert_eq!(iterator1.collect::<Vec<_>>(), expected); assert_eq!(iterator1.collect::<Vec<_>>(), expected);
} }
{ {
let iterator1 = let iterator1 = db.iterator(IteratorMode::From(b"k0", Direction::Forward));
db.iterator(IteratorMode::From(b"k0", Direction::Forward));
assert!(iterator1.valid()); assert!(iterator1.valid());
let iterator2 = let iterator2 = db.iterator(IteratorMode::From(b"k1", Direction::Forward));
db.iterator(IteratorMode::From(b"k1", Direction::Forward));
assert!(iterator2.valid()); assert!(iterator2.valid());
let iterator3 = let iterator3 = db.iterator(IteratorMode::From(b"k11", Direction::Forward));
db.iterator(IteratorMode::From(b"k11", Direction::Forward));
assert!(iterator3.valid()); assert!(iterator3.valid());
let iterator4 = let iterator4 = db.iterator(IteratorMode::From(b"k5", Direction::Forward));
db.iterator(IteratorMode::From(b"k5", Direction::Forward));
assert!(!iterator4.valid()); assert!(!iterator4.valid());
let iterator5 = let iterator5 = db.iterator(IteratorMode::From(b"k0", Direction::Reverse));
db.iterator(IteratorMode::From(b"k0", Direction::Reverse));
assert!(iterator5.valid()); assert!(iterator5.valid());
let iterator6 = let iterator6 = db.iterator(IteratorMode::From(b"k1", Direction::Reverse));
db.iterator(IteratorMode::From(b"k1", Direction::Reverse));
assert!(iterator6.valid()); assert!(iterator6.valid());
let iterator7 = let iterator7 = db.iterator(IteratorMode::From(b"k11", Direction::Reverse));
db.iterator(IteratorMode::From(b"k11", Direction::Reverse));
assert!(iterator7.valid()); assert!(iterator7.valid());
let iterator8 = let iterator8 = db.iterator(IteratorMode::From(b"k5", Direction::Reverse));
db.iterator(IteratorMode::From(b"k5", Direction::Reverse));
assert!(!iterator8.valid()); assert!(!iterator8.valid());
} }
{ {
let mut iterator1 = let mut iterator1 = db.iterator(IteratorMode::From(b"k4", Direction::Forward));
db.iterator(IteratorMode::From(b"k4", Direction::Forward));
iterator1.next(); iterator1.next();
assert!(iterator1.valid()); assert!(iterator1.valid());
iterator1.next(); iterator1.next();

test/test_multithreaded.rs

@ -13,7 +13,7 @@
// limitations under the License. // limitations under the License.
// //
use rocksdb::{DB, Options, Writable}; use rocksdb::{DB, Options};
use std::thread; use std::thread;
use std::sync::Arc; use std::sync::Arc;
