Merge pull request #27 from spacejam/tyler_cf

basic cf support
master
Tyler Neely 9 years ago
commit 0dc5a4b0e7
  1. 1
      .gitignore
  2. 1
      .travis.yml
  3. 2
      Cargo.toml
  4. 6
      README.md
  5. 6
      src/comparator.rs
  6. 331
      src/ffi.rs
  7. 4
      src/lib.rs
  8. 28
      src/main.rs
  9. 8
      src/merge_operator.rs
  10. 408
      src/rocksdb.rs
  11. 16
      src/rocksdb_options.rs
  12. 1
      test/test.rs
  13. 133
      test/test_column_family.rs
  14. 6
      test/test_iterator.rs
  15. 65
      test/test_multithreaded.rs

1
.gitignore vendored

@ -1,3 +1,4 @@
*.swo
*.swp
target
Cargo.lock

@ -3,6 +3,7 @@ language: rust
rust:
- nightly
- 1.1.0
- 1.2.0
os:
- linux

@ -2,7 +2,7 @@
name = "rocksdb"
description = "A Rust wrapper for Facebook's RocksDB embeddable database."
version = "0.1.0"
version = "0.1.1"
authors = ["Tyler Neely <t@jujit.su>", "David Greenberg <dsg123456789@gmail.com>"]
license = "Apache-2.0"
exclude = [

@ -3,7 +3,7 @@ rust-rocksdb
[![Build Status](https://travis-ci.org/spacejam/rust-rocksdb.svg?branch=master)](https://travis-ci.org/spacejam/rust-rocksdb)
[![crates.io](http://meritbadge.herokuapp.com/rocksdb)](https://crates.io/crates/rocksdb)
This library has been tested against RocksDB 3.8.1 on linux and OSX. The 0.1.0 crate should work with the Rust 1.1 stable and nightly releases as of 8/2/15.
This library has been tested against RocksDB 3.8.1 on linux and OSX. The 0.1.1 crate should work with the Rust 1.2 stable and nightly releases as of 9/7/15.
### status
- [x] basic open/put/get/delete/close
@ -15,7 +15,7 @@ This library has been tested against RocksDB 3.8.1 on linux and OSX. The 0.1.0
- [x] iterator
- [x] comparator
- [x] snapshot
- [ ] column family operations
- [x] column family operations
- [ ] slicetransform
- [ ] windows support
@ -32,7 +32,7 @@ sudo make install
###### Cargo.toml
```rust
[dependencies]
rocksdb = "~0.1.0"
rocksdb = "~0.1.1"
```
###### Code
```rust

@ -21,7 +21,7 @@ use std::ptr;
use std::slice;
use rocksdb_options::Options;
use rocksdb::RocksDB;
use rocksdb::DB;
pub struct ComparatorCallback {
pub name: CString,
@ -75,8 +75,8 @@ fn test_reverse_compare(a: &[u8], b: &[u8]) -> c_int {
// opts.create_if_missing(true);
// opts.add_comparator("test comparator", test_reverse_compare);
// {
// let db = RocksDB::open(&opts, path).unwrap();
// let db = DB::open(&opts, path).unwrap();
// // TODO add interesting test
// }
// assert!(RocksDB::destroy(&opts, path).is_ok());
// assert!(DB::destroy(&opts, path).is_ok());
//}

@ -19,75 +19,75 @@ use std::ffi::CString;
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBOptions(pub *const c_void);
pub struct DBOptions(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBInstance(pub *const c_void);
pub struct DBInstance(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBWriteOptions(pub *const c_void);
pub struct DBWriteOptions(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBReadOptions(pub *const c_void);
pub struct DBReadOptions(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBMergeOperator(pub *const c_void);
pub struct DBMergeOperator(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBBlockBasedTableOptions(pub *const c_void);
pub struct DBBlockBasedTableOptions(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBCache(pub *const c_void);
pub struct DBCache(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBFilterPolicy(pub *const c_void);
pub struct DBFilterPolicy(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBSnapshot(pub *const c_void);
pub struct DBSnapshot(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBIterator(pub *const c_void);
pub struct DBIterator(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBCFHandle(pub *const c_void);
pub struct DBCFHandle(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBWriteBatch(pub *const c_void);
pub struct DBWriteBatch(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct RocksDBComparator(pub *const c_void);
pub struct DBComparator(pub *const c_void);
pub fn new_bloom_filter(bits: c_int) -> RocksDBFilterPolicy {
pub fn new_bloom_filter(bits: c_int) -> DBFilterPolicy {
unsafe {
rocksdb_filterpolicy_create_bloom(bits)
}
}
pub fn new_cache(capacity: size_t) -> RocksDBCache {
pub fn new_cache(capacity: size_t) -> DBCache {
unsafe {
rocksdb_cache_create_lru(capacity)
}
}
#[repr(C)]
pub enum RocksDBCompressionType {
RocksDBNoCompression = 0,
RocksDBSnappyCompression = 1,
RocksDBZlibCompression = 2,
RocksDBBz2Compression = 3,
RocksDBLz4Compression = 4,
RocksDBLz4hcCompression = 5,
pub enum DBCompressionType {
DBNoCompression = 0,
DBSnappyCompression = 1,
DBZlibCompression = 2,
DBBz2Compression = 3,
DBLz4Compression = 4,
DBLz4hcCompression = 5,
}
#[repr(C)]
pub enum RocksDBCompactionStyle {
RocksDBLevelCompaction = 0,
RocksDBUniversalCompaction = 1,
RocksDBFifoCompaction = 2,
pub enum DBCompactionStyle {
DBLevelCompaction = 0,
DBUniversalCompaction = 1,
DBFifoCompaction = 2,
}
#[repr(C)]
pub enum RocksDBUniversalCompactionStyle {
pub enum DBUniversalCompactionStyle {
rocksdb_similar_size_compaction_stop_style = 0,
rocksdb_total_size_compaction_stop_style = 1,
}
@ -95,160 +95,178 @@ pub enum RocksDBUniversalCompactionStyle {
//TODO audit the use of boolean arguments, b/c I think they need to be u8 instead...
#[link(name = "rocksdb")]
extern {
pub fn rocksdb_options_create() -> RocksDBOptions;
pub fn rocksdb_options_destroy(opts: RocksDBOptions);
pub fn rocksdb_cache_create_lru(capacity: size_t) -> RocksDBCache;
pub fn rocksdb_cache_destroy(cache: RocksDBCache);
pub fn rocksdb_block_based_options_create() -> RocksDBBlockBasedTableOptions;
pub fn rocksdb_block_based_options_destroy(opts: RocksDBBlockBasedTableOptions);
pub fn rocksdb_options_create() -> DBOptions;
pub fn rocksdb_options_destroy(opts: DBOptions);
pub fn rocksdb_cache_create_lru(capacity: size_t) -> DBCache;
pub fn rocksdb_cache_destroy(cache: DBCache);
pub fn rocksdb_block_based_options_create() -> DBBlockBasedTableOptions;
pub fn rocksdb_block_based_options_destroy(opts: DBBlockBasedTableOptions);
pub fn rocksdb_block_based_options_set_block_size(
block_options: RocksDBBlockBasedTableOptions,
block_options: DBBlockBasedTableOptions,
block_size: size_t);
pub fn rocksdb_block_based_options_set_block_size_deviation(
block_options: RocksDBBlockBasedTableOptions,
block_options: DBBlockBasedTableOptions,
block_size_deviation: c_int);
pub fn rocksdb_block_based_options_set_block_restart_interval(
block_options: RocksDBBlockBasedTableOptions,
block_options: DBBlockBasedTableOptions,
block_restart_interval: c_int);
pub fn rocksdb_block_based_options_set_filter_policy(
block_options: RocksDBBlockBasedTableOptions,
filter_policy: RocksDBFilterPolicy);
block_options: DBBlockBasedTableOptions,
filter_policy: DBFilterPolicy);
pub fn rocksdb_block_based_options_set_no_block_cache(
block_options: RocksDBBlockBasedTableOptions, no_block_cache: bool);
block_options: DBBlockBasedTableOptions, no_block_cache: bool);
pub fn rocksdb_block_based_options_set_block_cache(
block_options: RocksDBBlockBasedTableOptions, block_cache: RocksDBCache);
block_options: DBBlockBasedTableOptions, block_cache: DBCache);
pub fn rocksdb_block_based_options_set_block_cache_compressed(
block_options: RocksDBBlockBasedTableOptions,
block_cache_compressed: RocksDBCache);
block_options: DBBlockBasedTableOptions,
block_cache_compressed: DBCache);
pub fn rocksdb_block_based_options_set_whole_key_filtering(
ck_options: RocksDBBlockBasedTableOptions, doit: bool);
ck_options: DBBlockBasedTableOptions, doit: bool);
pub fn rocksdb_options_set_block_based_table_factory(
options: RocksDBOptions,
block_options: RocksDBBlockBasedTableOptions);
options: DBOptions,
block_options: DBBlockBasedTableOptions);
pub fn rocksdb_options_increase_parallelism(
options: RocksDBOptions, threads: c_int);
options: DBOptions, threads: c_int);
pub fn rocksdb_options_optimize_level_style_compaction(
options: RocksDBOptions, memtable_memory_budget: c_int);
options: DBOptions, memtable_memory_budget: c_int);
pub fn rocksdb_options_set_create_if_missing(
options: RocksDBOptions, v: bool);
options: DBOptions, v: bool);
pub fn rocksdb_options_set_max_open_files(
options: RocksDBOptions, files: c_int);
options: DBOptions, files: c_int);
pub fn rocksdb_options_set_use_fsync(
options: RocksDBOptions, v: c_int);
options: DBOptions, v: c_int);
pub fn rocksdb_options_set_bytes_per_sync(
options: RocksDBOptions, bytes: u64);
options: DBOptions, bytes: u64);
pub fn rocksdb_options_set_disable_data_sync(
options: RocksDBOptions, v: c_int);
options: DBOptions, v: c_int);
pub fn rocksdb_options_optimize_for_point_lookup(
options: RocksDBOptions, block_cache_size_mb: u64);
options: DBOptions, block_cache_size_mb: u64);
pub fn rocksdb_options_set_table_cache_numshardbits(
options: RocksDBOptions, bits: c_int);
options: DBOptions, bits: c_int);
pub fn rocksdb_options_set_max_write_buffer_number(
options: RocksDBOptions, bufno: c_int);
options: DBOptions, bufno: c_int);
pub fn rocksdb_options_set_min_write_buffer_number_to_merge(
options: RocksDBOptions, bufno: c_int);
options: DBOptions, bufno: c_int);
pub fn rocksdb_options_set_level0_file_num_compaction_trigger(
options: RocksDBOptions, no: c_int);
options: DBOptions, no: c_int);
pub fn rocksdb_options_set_level0_slowdown_writes_trigger(
options: RocksDBOptions, no: c_int);
options: DBOptions, no: c_int);
pub fn rocksdb_options_set_level0_stop_writes_trigger(
options: RocksDBOptions, no: c_int);
options: DBOptions, no: c_int);
pub fn rocksdb_options_set_write_buffer_size(
options: RocksDBOptions, bytes: u64);
options: DBOptions, bytes: u64);
pub fn rocksdb_options_set_target_file_size_base(
options: RocksDBOptions, bytes: u64);
options: DBOptions, bytes: u64);
pub fn rocksdb_options_set_target_file_size_multiplier(
options: RocksDBOptions, mul: c_int);
options: DBOptions, mul: c_int);
pub fn rocksdb_options_set_max_log_file_size(
options: RocksDBOptions, bytes: u64);
options: DBOptions, bytes: u64);
pub fn rocksdb_options_set_max_manifest_file_size(
options: RocksDBOptions, bytes: u64);
options: DBOptions, bytes: u64);
pub fn rocksdb_options_set_hash_skip_list_rep(
options: RocksDBOptions, bytes: u64, a1: i32, a2: i32);
options: DBOptions, bytes: u64, a1: i32, a2: i32);
pub fn rocksdb_options_set_compaction_style(
options: RocksDBOptions, cs: RocksDBCompactionStyle);
options: DBOptions, cs: DBCompactionStyle);
pub fn rocksdb_options_set_compression(
options: RocksDBOptions, compression_style_no: c_int);
options: DBOptions, compression_style_no: c_int);
pub fn rocksdb_options_set_max_background_compactions(
options: RocksDBOptions, max_bg_compactions: c_int);
options: DBOptions, max_bg_compactions: c_int);
pub fn rocksdb_options_set_max_background_flushes(
options: RocksDBOptions, max_bg_flushes: c_int);
options: DBOptions, max_bg_flushes: c_int);
pub fn rocksdb_options_set_filter_deletes(
options: RocksDBOptions, v: bool);
options: DBOptions, v: bool);
pub fn rocksdb_options_set_disable_auto_compactions(
options: RocksDBOptions, v: c_int);
options: DBOptions, v: c_int);
pub fn rocksdb_filterpolicy_create_bloom(
bits_per_key: c_int) -> RocksDBFilterPolicy;
pub fn rocksdb_open(options: RocksDBOptions,
bits_per_key: c_int) -> DBFilterPolicy;
pub fn rocksdb_open(options: DBOptions,
path: *const i8,
err: *mut *const i8
) -> RocksDBInstance;
pub fn rocksdb_writeoptions_create() -> RocksDBWriteOptions;
pub fn rocksdb_writeoptions_destroy(writeopts: RocksDBWriteOptions);
pub fn rocksdb_put(db: RocksDBInstance,
writeopts: RocksDBWriteOptions,
) -> DBInstance;
pub fn rocksdb_writeoptions_create() -> DBWriteOptions;
pub fn rocksdb_writeoptions_destroy(writeopts: DBWriteOptions);
pub fn rocksdb_put(db: DBInstance,
writeopts: DBWriteOptions,
k: *const u8, kLen: size_t,
v: *const u8, vLen: size_t,
err: *mut *const i8);
pub fn rocksdb_readoptions_create() -> RocksDBReadOptions;
pub fn rocksdb_readoptions_destroy(readopts: RocksDBReadOptions);
pub fn rocksdb_put_cf(db: DBInstance,
writeopts: DBWriteOptions,
cf: DBCFHandle,
k: *const u8, kLen: size_t,
v: *const u8, vLen: size_t,
err: *mut *const i8);
pub fn rocksdb_readoptions_create() -> DBReadOptions;
pub fn rocksdb_readoptions_destroy(readopts: DBReadOptions);
pub fn rocksdb_readoptions_set_verify_checksums(
readopts: RocksDBReadOptions,
readopts: DBReadOptions,
v: bool);
pub fn rocksdb_readoptions_set_fill_cache(
readopts: RocksDBReadOptions,
readopts: DBReadOptions,
v: bool);
pub fn rocksdb_readoptions_set_snapshot(
readopts: RocksDBReadOptions,
snapshot: RocksDBSnapshot); //TODO how do I make this a const ref?
readopts: DBReadOptions,
snapshot: DBSnapshot); //TODO how do I make this a const ref?
pub fn rocksdb_readoptions_set_iterate_upper_bound(
readopts: RocksDBReadOptions,
readopts: DBReadOptions,
k: *const u8,
kLen: size_t);
pub fn rocksdb_readoptions_set_read_tier(
readopts: RocksDBReadOptions,
readopts: DBReadOptions,
tier: c_int);
pub fn rocksdb_readoptions_set_tailing(
readopts: RocksDBReadOptions,
readopts: DBReadOptions,
v: bool);
pub fn rocksdb_get(db: RocksDBInstance,
readopts: RocksDBReadOptions,
pub fn rocksdb_get(db: DBInstance,
readopts: DBReadOptions,
k: *const u8, kLen: size_t,
valLen: *const size_t,
err: *mut *const i8
) -> *mut c_void;
pub fn rocksdb_get_cf(db: RocksDBInstance,
readopts: RocksDBReadOptions,
cf_handle: RocksDBCFHandle,
pub fn rocksdb_get_cf(db: DBInstance,
readopts: DBReadOptions,
cf_handle: DBCFHandle,
k: *const u8, kLen: size_t,
valLen: *const size_t,
err: *mut *const i8
) -> *mut c_void;
pub fn rocksdb_create_iterator(db: RocksDBInstance,
readopts: RocksDBReadOptions
) -> RocksDBIterator;
pub fn rocksdb_create_iterator_cf(db: RocksDBInstance,
readopts: RocksDBReadOptions,
cf_handle: RocksDBCFHandle
) -> RocksDBIterator;
pub fn rocksdb_create_snapshot(db: RocksDBInstance) -> RocksDBSnapshot;
pub fn rocksdb_release_snapshot(db: RocksDBInstance,
snapshot: RocksDBSnapshot);
pub fn rocksdb_create_iterator(db: DBInstance,
readopts: DBReadOptions
) -> DBIterator;
pub fn rocksdb_create_iterator_cf(db: DBInstance,
readopts: DBReadOptions,
cf_handle: DBCFHandle
) -> DBIterator;
pub fn rocksdb_create_snapshot(db: DBInstance) -> DBSnapshot;
pub fn rocksdb_release_snapshot(db: DBInstance,
snapshot: DBSnapshot);
pub fn rocksdb_delete(db: RocksDBInstance,
writeopts: RocksDBWriteOptions,
pub fn rocksdb_delete(db: DBInstance,
writeopts: DBWriteOptions,
k: *const u8, kLen: size_t,
err: *mut *const i8
) -> *mut c_void;
pub fn rocksdb_close(db: RocksDBInstance);
pub fn rocksdb_destroy_db(options: RocksDBOptions,
pub fn rocksdb_delete_cf(db: DBInstance,
writeopts: DBWriteOptions,
cf: DBCFHandle,
k: *const u8, kLen: size_t,
err: *mut *const i8
) -> *mut c_void;
pub fn rocksdb_close(db: DBInstance);
pub fn rocksdb_destroy_db(options: DBOptions,
path: *const i8, err: *mut *const i8);
pub fn rocksdb_repair_db(options: RocksDBOptions,
pub fn rocksdb_repair_db(options: DBOptions,
path: *const i8, err: *mut *const i8);
// Merge
pub fn rocksdb_merge(db: RocksDBInstance,
writeopts: RocksDBWriteOptions,
pub fn rocksdb_merge(db: DBInstance,
writeopts: DBWriteOptions,
k: *const u8, kLen: size_t,
v: *const u8, vLen: size_t,
err: *mut *const i8);
pub fn rocksdb_merge_cf(db: DBInstance,
writeopts: DBWriteOptions,
cf: DBCFHandle,
k: *const u8, kLen: size_t,
v: *const u8, vLen: size_t,
err: *mut *const i8);
@ -276,73 +294,73 @@ extern {
value_len: *mut size_t
) -> ()>,
name_fn: extern fn(*mut c_void) -> *const c_char,
) -> RocksDBMergeOperator;
pub fn rocksdb_mergeoperator_destroy(mo: RocksDBMergeOperator);
pub fn rocksdb_options_set_merge_operator(options: RocksDBOptions,
mo: RocksDBMergeOperator);
) -> DBMergeOperator;
pub fn rocksdb_mergeoperator_destroy(mo: DBMergeOperator);
pub fn rocksdb_options_set_merge_operator(options: DBOptions,
mo: DBMergeOperator);
// Iterator
pub fn rocksdb_iter_destroy(iter: RocksDBIterator);
pub fn rocksdb_iter_valid(iter: RocksDBIterator) -> bool;
pub fn rocksdb_iter_seek_to_first(iter: RocksDBIterator);
pub fn rocksdb_iter_seek_to_last(iter: RocksDBIterator);
pub fn rocksdb_iter_seek(iter: RocksDBIterator,
pub fn rocksdb_iter_destroy(iter: DBIterator);
pub fn rocksdb_iter_valid(iter: DBIterator) -> bool;
pub fn rocksdb_iter_seek_to_first(iter: DBIterator);
pub fn rocksdb_iter_seek_to_last(iter: DBIterator);
pub fn rocksdb_iter_seek(iter: DBIterator,
key: *const u8, klen: size_t);
pub fn rocksdb_iter_next(iter: RocksDBIterator);
pub fn rocksdb_iter_prev(iter: RocksDBIterator);
pub fn rocksdb_iter_key(iter: RocksDBIterator,
pub fn rocksdb_iter_next(iter: DBIterator);
pub fn rocksdb_iter_prev(iter: DBIterator);
pub fn rocksdb_iter_key(iter: DBIterator,
klen: *mut size_t) -> *mut u8;
pub fn rocksdb_iter_value(iter: RocksDBIterator,
pub fn rocksdb_iter_value(iter: DBIterator,
vlen: *mut size_t) -> *mut u8;
pub fn rocksdb_iter_get_error(iter: RocksDBIterator,
pub fn rocksdb_iter_get_error(iter: DBIterator,
err: *mut *const u8);
// Write batch
pub fn rocksdb_write(db: RocksDBInstance,
writeopts: RocksDBWriteOptions,
batch : RocksDBWriteBatch,
pub fn rocksdb_write(db: DBInstance,
writeopts: DBWriteOptions,
batch : DBWriteBatch,
err: *mut *const i8);
pub fn rocksdb_writebatch_create() -> RocksDBWriteBatch;
pub fn rocksdb_writebatch_create() -> DBWriteBatch;
pub fn rocksdb_writebatch_create_from(rep: *const u8,
size: size_t) -> RocksDBWriteBatch;
pub fn rocksdb_writebatch_destroy(batch: RocksDBWriteBatch);
pub fn rocksdb_writebatch_clear(batch: RocksDBWriteBatch);
pub fn rocksdb_writebatch_count(batch: RocksDBWriteBatch) -> c_int;
pub fn rocksdb_writebatch_put(batch: RocksDBWriteBatch,
size: size_t) -> DBWriteBatch;
pub fn rocksdb_writebatch_destroy(batch: DBWriteBatch);
pub fn rocksdb_writebatch_clear(batch: DBWriteBatch);
pub fn rocksdb_writebatch_count(batch: DBWriteBatch) -> c_int;
pub fn rocksdb_writebatch_put(batch: DBWriteBatch,
key: *const u8, klen: size_t,
val: *const u8, vlen: size_t);
pub fn rocksdb_writebatch_put_cf(batch: RocksDBWriteBatch,
cf: RocksDBCFHandle,
pub fn rocksdb_writebatch_put_cf(batch: DBWriteBatch,
cf: DBCFHandle,
key: *const u8, klen: size_t,
val: *const u8, vlen: size_t);
pub fn rocksdb_writebatch_merge(
batch: RocksDBWriteBatch,
batch: DBWriteBatch,
key: *const u8, klen: size_t,
val: *const u8, vlen: size_t);
pub fn rocksdb_writebatch_merge_cf(
batch: RocksDBWriteBatch,
cf: RocksDBCFHandle,
batch: DBWriteBatch,
cf: DBCFHandle,
key: *const u8, klen: size_t,
val: *const u8, vlen: size_t);
pub fn rocksdb_writebatch_delete(
batch: RocksDBWriteBatch,
batch: DBWriteBatch,
key: *const u8, klen: size_t);
pub fn rocksdb_writebatch_delete_cf(
batch: RocksDBWriteBatch,
cf: RocksDBCFHandle,
batch: DBWriteBatch,
cf: DBCFHandle,
key: *const u8, klen: size_t);
pub fn rocksdb_writebatch_iterate(
batch: RocksDBWriteBatch,
batch: DBWriteBatch,
state: *mut c_void,
put_fn: extern fn(state: *mut c_void,
k: *const u8, klen: size_t,
v: *const u8, vlen: size_t),
deleted_fn: extern fn(state: *mut c_void,
k: *const u8, klen: size_t));
pub fn rocksdb_writebatch_data(batch: RocksDBWriteBatch,
pub fn rocksdb_writebatch_data(batch: DBWriteBatch,
size: *mut size_t) -> *const u8;
// Comparator
pub fn rocksdb_options_set_comparator(options: RocksDBOptions,
cb: RocksDBComparator);
pub fn rocksdb_options_set_comparator(options: DBOptions,
cb: DBComparator);
pub fn rocksdb_comparator_create(
state: *mut c_void,
destroy: extern fn(*mut c_void) -> (),
@ -351,11 +369,30 @@ extern {
b: *const c_char, blen: size_t
) -> c_int,
name_fn: extern fn(*mut c_void) -> *const c_char
) -> RocksDBComparator;
pub fn rocksdb_comparator_destroy(cmp: RocksDBComparator);
) -> DBComparator;
pub fn rocksdb_comparator_destroy(cmp: DBComparator);
// Column Family
pub fn rocksdb_open_column_families(options: DBOptions,
path: *const i8,
num_column_families: c_int,
column_family_names: *const *const i8,
column_family_options: *const DBOptions,
column_family_handles: *const DBCFHandle,
err: *mut *const i8
) -> DBInstance;
pub fn rocksdb_create_column_family(db: DBInstance,
column_family_options: DBOptions,
column_family_name: *const i8,
err: *mut *const i8
) -> DBCFHandle;
pub fn rocksdb_drop_column_family(db: DBInstance,
column_family_handle: DBCFHandle,
err: *mut *const i8);
pub fn rocksdb_column_family_handle_destroy(column_family_handle: DBCFHandle);
}
#[allow(dead_code)]
#[test]
fn internal() {
unsafe {

@ -17,8 +17,8 @@
#![crate_type = "lib"]
pub use ffi as rocksdb_ffi;
pub use ffi::{new_bloom_filter, RocksDBCompactionStyle, RocksDBComparator};
pub use rocksdb::{RocksDB, RocksDBResult, RocksDBVector, WriteBatch, Writable, Direction};
pub use ffi::{new_bloom_filter, DBCompactionStyle, DBComparator};
pub use rocksdb::{DB, DBResult, DBVector, WriteBatch, Writable, Direction};
pub use rocksdb_options::{Options, BlockBasedOptions};
pub use merge_operator::MergeOperands;
pub mod rocksdb;

@ -14,13 +14,13 @@
limitations under the License.
*/
extern crate rocksdb;
use rocksdb::{Options, RocksDB, MergeOperands, new_bloom_filter, Writable, };
use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction;
use rocksdb::{Options, DB, MergeOperands, new_bloom_filter, Writable, };
use rocksdb::DBCompactionStyle::DBUniversalCompaction;
//fn snapshot_test() {
// let path = "_rust_rocksdb_iteratortest";
// {
// let mut db = RocksDB::open_default(path).unwrap();
// let mut db = DB::open_default(path).unwrap();
// let p = db.put(b"k1", b"v1111");
// assert!(p.is_ok());
// let p = db.put(b"k2", b"v2222");
@ -41,13 +41,13 @@ use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction;
// };
// }
// let opts = Options::new();
// assert!(RocksDB::destroy(&opts, path).is_ok());
// assert!(DB::destroy(&opts, path).is_ok());
//}
#[cfg(not(feature = "valgrind"))]
fn main() {
let path = "/tmp/rust-rocksdb";
let mut db = RocksDB::open_default(path).unwrap();
let mut db = DB::open_default(path).unwrap();
assert!(db.put(b"my key", b"my value").is_ok());
db.get(b"my key").map( |value| {
match value.to_utf8() {
@ -88,7 +88,7 @@ fn custom_merge() {
opts.create_if_missing(true);
opts.add_merge_operator("test operator", concat_merge);
{
let mut db = RocksDB::open(&opts, path).unwrap();
let mut db = DB::open(&opts, path).unwrap();
db.put(b"k1", b"a");
db.merge(b"k1", b"b");
db.merge(b"k1", b"c");
@ -107,7 +107,7 @@ fn custom_merge() {
.on_error( |e| { println!("error retrieving value: {}", e) });
}
RocksDB::destroy(&opts, path).is_ok();
DB::destroy(&opts, path).is_ok();
}
#[cfg(feature = "valgrind")]
@ -116,7 +116,7 @@ fn main() {
let mut opts = Options::new();
opts.create_if_missing(true);
opts.add_merge_operator("test operator", concat_merge);
let db = RocksDB::open(&opts, path).unwrap();
let db = DB::open(&opts, path).unwrap();
loop {
db.put(b"k1", b"a");
db.merge(b"k1", b"b");
@ -141,10 +141,10 @@ fn main() {
mod tests {
use std::thread::sleep_ms;
use rocksdb::{BlockBasedOptions, Options, RocksDB, MergeOperands, new_bloom_filter, Writable };
use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction;
use rocksdb::{BlockBasedOptions, Options, DB, MergeOperands, new_bloom_filter, Writable };
use rocksdb::DBCompactionStyle::DBUniversalCompaction;
fn tuned_for_somebody_elses_disk(path: &str, opts: & mut Options, blockopts: &mut BlockBasedOptions) -> RocksDB {
fn tuned_for_somebody_elses_disk(path: &str, opts: & mut Options, blockopts: &mut BlockBasedOptions) -> DB {
opts.create_if_missing(true);
opts.set_max_open_files(10000);
opts.set_use_fsync(false);
@ -158,7 +158,7 @@ mod tests {
opts.set_min_write_buffer_number_to_merge(4);
opts.set_level_zero_stop_writes_trigger(2000);
opts.set_level_zero_slowdown_writes_trigger(0);
opts.set_compaction_style(RocksDBUniversalCompaction);
opts.set_compaction_style(DBUniversalCompaction);
opts.set_max_background_compactions(4);
opts.set_max_background_flushes(4);
opts.set_filter_deletes(false);
@ -169,7 +169,7 @@ mod tests {
let filter = new_bloom_filter(10);
//opts.set_filter(filter);
RocksDB::open(&opts, path).unwrap()
DB::open(&opts, path).unwrap()
}
/* TODO(tyler) unstable
@ -204,7 +204,7 @@ mod tests {
i += 1;
});
}
RocksDB::destroy(&opts, path).is_ok();
DB::destroy(&opts, path).is_ok();
}
*/
}

@ -21,7 +21,7 @@ use std::ptr;
use std::slice;
use rocksdb_options::Options;
use rocksdb::{RocksDB, RocksDBResult, RocksDBVector, Writable};
use rocksdb::{DB, DBResult, DBVector, Writable};
pub struct MergeOperatorCallback {
pub name: CString,
@ -187,7 +187,7 @@ fn mergetest() {
opts.create_if_missing(true);
opts.add_merge_operator("test operator", test_provided_merge);
{
let mut db = RocksDB::open(&opts, path).unwrap();
let mut db = DB::open(&opts, path).unwrap();
let p = db.put(b"k1", b"a");
assert!(p.is_ok());
db.merge(b"k1", b"b");
@ -207,10 +207,10 @@ fn mergetest() {
.on_error( |e| { println!("error reading value")}); //: {", e) });
assert!(m.is_ok());
let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1");
let r: DBResult<DBVector, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh");
assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").is_none());
}
assert!(RocksDB::destroy(&opts, path).is_ok());
assert!(DB::destroy(&opts, path).is_ok());
}

@ -15,42 +15,46 @@
*/
extern crate libc;
use self::libc::{c_void, size_t};
use std::collections::BTreeMap;
use std::ffi::{CString, CStr};
use std::fs;
use std::io;
use std::ops::Deref;
use std::path::Path;
use std::slice;
use std::str::from_utf8;
use std::marker::PhantomData;
use rocksdb_ffi;
use self::libc::{c_void, size_t};
use rocksdb_ffi::{self, DBCFHandle};
use rocksdb_options::Options;
pub struct RocksDB {
inner: rocksdb_ffi::RocksDBInstance,
pub struct DB {
inner: rocksdb_ffi::DBInstance,
cfs: BTreeMap<String, DBCFHandle>,
}
unsafe impl Send for RocksDB {}
unsafe impl Sync for RocksDB {}
unsafe impl Send for DB {}
unsafe impl Sync for DB {}
pub struct WriteBatch {
inner: rocksdb_ffi::RocksDBWriteBatch,
inner: rocksdb_ffi::DBWriteBatch,
}
pub struct ReadOptions {
inner: rocksdb_ffi::RocksDBReadOptions,
inner: rocksdb_ffi::DBReadOptions,
}
pub struct Snapshot<'a> {
db: &'a RocksDB,
inner: rocksdb_ffi::RocksDBSnapshot,
db: &'a DB,
inner: rocksdb_ffi::DBSnapshot,
}
pub struct DBIterator {
// TODO: should have a reference to DB to enforce scope, but it's trickier than I
// thought to add
inner: rocksdb_ffi::RocksDBIterator,
inner: rocksdb_ffi::DBIterator,
direction: Direction,
just_seeked: bool,
}
@ -97,7 +101,7 @@ impl <'a> Iterator for SubDBIterator<'a> {
impl DBIterator {
//TODO alias db & opts to different lifetimes, and DBIterator to the db's lifetime
fn new(db: &RocksDB, readopts: &ReadOptions) -> DBIterator {
fn new(db: &DB, readopts: &ReadOptions) -> DBIterator {
unsafe {
let iterator = rocksdb_ffi::rocksdb_create_iterator(db.inner, readopts.inner);
rocksdb_ffi::rocksdb_iter_seek_to_first(iterator);
@ -105,6 +109,20 @@ impl DBIterator {
}
}
fn new_cf(db: &DB, cf_name: &str, readopts: &ReadOptions) -> Result<DBIterator, String> {
let cf = db.cfs.get(cf_name);
if cf.is_none() {
return Err(format!("Invalid column family: {}", cf_name).to_string());
}
unsafe {
let iterator = rocksdb_ffi::rocksdb_create_iterator_cf(db.inner,
readopts.inner,
*cf.unwrap());
rocksdb_ffi::rocksdb_iter_seek_to_first(iterator);
Ok(DBIterator{ inner: iterator, direction: Direction::forward, just_seeked: true })
}
}
pub fn from_start(&mut self) -> SubDBIterator {
self.just_seeked = true;
unsafe {
@ -139,7 +157,7 @@ impl Drop for DBIterator {
}
impl <'a> Snapshot<'a> {
pub fn new(db: &RocksDB) -> Snapshot {
pub fn new(db: &DB) -> Snapshot {
let snapshot = unsafe { rocksdb_ffi::rocksdb_create_snapshot(db.inner) };
Snapshot { db: db, inner: snapshot }
}
@ -159,11 +177,14 @@ impl <'a> Drop for Snapshot<'a> {
}
}
// This is for the RocksDB and write batches to share the same API
// This is for the DB and write batches to share the same API
pub trait Writable {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String>;
fn put_cf(&self, cf: DBCFHandle, key: &[u8], value: &[u8]) -> Result<(), String>;
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String>;
fn merge_cf(&self, cf: DBCFHandle, key: &[u8], value: &[u8]) -> Result<(), String>;
fn delete(&self, key: &[u8]) -> Result<(), String>;
fn delete_cf(&self, cf: DBCFHandle, key: &[u8]) -> Result<(), String>;
}
fn error_message(ptr: *const i8) -> String {
@ -175,14 +196,18 @@ fn error_message(ptr: *const i8) -> String {
s
}
impl RocksDB {
pub fn open_default(path: &str) -> Result<RocksDB, String> {
impl DB {
pub fn open_default(path: &str) -> Result<DB, String> {
let mut opts = Options::new();
opts.create_if_missing(true);
RocksDB::open(&opts, path)
DB::open(&opts, path)
}
pub fn open(opts: &Options, path: &str) -> Result<DB, String> {
DB::open_cf(opts, path, &[])
}
pub fn open(opts: &Options, path: &str) -> Result<RocksDB, String> {
pub fn open_cf(opts: &Options, path: &str, cfs: &[&str]) -> Result<DB, String> {
let cpath = match CString::new(path.as_bytes()) {
Ok(c) => c,
Err(_) =>
@ -198,20 +223,71 @@ impl RocksDB {
let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
let db: rocksdb_ffi::RocksDBInstance;
let db: rocksdb_ffi::DBInstance;
let mut cfMap = BTreeMap::new();
unsafe {
db = rocksdb_ffi::rocksdb_open(opts.inner, cpath_ptr, err_ptr);
if cfs.len() == 0 {
unsafe {
db = rocksdb_ffi::rocksdb_open(opts.inner, cpath_ptr, err_ptr);
}
} else {
let mut cfs_v = cfs.to_vec();
// Always open the default column family
if !cfs_v.contains(&"default") {
cfs_v.push("default");
}
// We need to store our CStrings in an intermediate vector
// so that their pointers remain valid.
let c_cfs: Vec<CString> = cfs_v.iter().map( |cf| {
CString::new(cf.as_bytes()).unwrap()
}).collect();
let cfnames: Vec<*const i8> = c_cfs.iter().map( |cf| {
cf.as_ptr()
}).collect();
// These handles will be populated by DB.
let mut cfhandles: Vec<rocksdb_ffi::DBCFHandle> =
cfs_v.iter().map( |_| {
rocksdb_ffi::DBCFHandle(0 as *mut c_void)
}).collect();
// TODO(tyler) allow options to be passed in.
let cfopts: Vec<rocksdb_ffi::DBOptions> = cfs_v.iter().map( |_| {
unsafe { rocksdb_ffi::rocksdb_options_create() }
}).collect();
// Prepare to ship to C.
let copts: *const rocksdb_ffi::DBOptions = cfopts.as_ptr();
let handles: *const rocksdb_ffi::DBCFHandle = cfhandles.as_ptr();
let nfam = cfs_v.len();
unsafe {
db = rocksdb_ffi::rocksdb_open_column_families(opts.inner, cpath_ptr,
nfam as libc::c_int,
cfnames.as_ptr(),
copts, handles, err_ptr);
}
for handle in cfhandles.iter() {
if handle.0.is_null() {
return Err("Received null column family handle from DB.".to_string());
}
}
for (n, h) in cfs_v.iter().zip(cfhandles) {
cfMap.insert(n.to_string(), h);
}
}
if !err.is_null() {
return Err(error_message(err));
}
let rocksdb_ffi::RocksDBInstance(db_ptr) = db;
if db_ptr.is_null() {
if db.0.is_null() {
return Err("Could not initialize database.".to_string());
}
Ok(RocksDB { inner: db })
Ok(DB { inner: db, cfs: cfMap })
}
pub fn destroy(opts: &Options, path: &str) -> Result<(), String> {
@ -260,11 +336,11 @@ impl RocksDB {
return Ok(())
}
pub fn get(&self, key: &[u8]) -> RocksDBResult<RocksDBVector, String> {
pub fn get(&self, key: &[u8]) -> DBResult<DBVector, String> {
unsafe {
let readopts = rocksdb_ffi::rocksdb_readoptions_create();
if readopts.0.is_null() {
return RocksDBResult::Error("Unable to create rocksdb read \
return DBResult::Error("Unable to create rocksdb read \
options. This is a fairly trivial call, and its failure \
may be indicative of a mis-compiled or mis-loaded rocksdb \
library.".to_string());
@ -278,29 +354,107 @@ impl RocksDB {
key.as_ptr(), key.len() as size_t, val_len_ptr, err_ptr) as *mut u8;
rocksdb_ffi::rocksdb_readoptions_destroy(readopts);
if !err.is_null() {
return RocksDBResult::Error(error_message(err));
return DBResult::Error(error_message(err));
}
match val.is_null() {
true => DBResult::None,
false => {
DBResult::Some(DBVector::from_c(val, val_len))
}
}
}
}
pub fn get_cf(&self, cf: DBCFHandle, key: &[u8]) -> DBResult<DBVector, String> {
unsafe {
let readopts = rocksdb_ffi::rocksdb_readoptions_create();
if readopts.0.is_null() {
return DBResult::Error("Unable to create rocksdb read \
options. This is a fairly trivial call, and its failure \
may be indicative of a mis-compiled or mis-loaded rocksdb \
library.".to_string());
}
let val_len: size_t = 0;
let val_len_ptr = &val_len as *const size_t;
let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
let val = rocksdb_ffi::rocksdb_get_cf(self.inner, readopts.clone(),
cf, key.as_ptr(), key.len() as size_t, val_len_ptr,
err_ptr) as *mut u8;
rocksdb_ffi::rocksdb_readoptions_destroy(readopts);
if !err.is_null() {
return DBResult::Error(error_message(err));
}
match val.is_null() {
true => RocksDBResult::None,
true => DBResult::None,
false => {
RocksDBResult::Some(RocksDBVector::from_c(val, val_len))
DBResult::Some(DBVector::from_c(val, val_len))
}
}
}
}
pub fn create_cf(&mut self, name: &str, opts: &Options) -> Result<DBCFHandle, String> {
let cname = match CString::new(name.as_bytes()) {
Ok(c) => c,
Err(_) =>
return Err("Failed to convert path to CString when opening rocksdb".to_string()),
};
let cname_ptr = cname.as_ptr();
let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
let cf_handler = unsafe {
let cf_handler = rocksdb_ffi::rocksdb_create_column_family(
self.inner, opts.inner, cname_ptr, err_ptr);
self.cfs.insert(name.to_string(), cf_handler);
cf_handler
};
if !err.is_null() {
return Err(error_message(err));
}
Ok(cf_handler)
}
pub fn drop_cf(&mut self, name: &str) -> Result<(), String> {
let cf = self.cfs.get(name);
if cf.is_none() {
return Err(format!("Invalid column family: {}", name).to_string());
}
let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
unsafe {
rocksdb_ffi::rocksdb_drop_column_family(self.inner, *cf.unwrap(), err_ptr);
}
if !err.is_null() {
return Err(error_message(err));
}
Ok(())
}
pub fn cf_handle(&self, name: &str) -> Option<&DBCFHandle> {
self.cfs.get(name)
}
pub fn iterator(&self) -> DBIterator {
let opts = ReadOptions::new();
DBIterator::new(&self, &opts)
}
pub fn iterator_cf(&self, cf: &str) -> Result<DBIterator, String> {
let opts = ReadOptions::new();
DBIterator::new_cf(&self, cf, &opts)
}
pub fn snapshot(&self) -> Snapshot {
Snapshot::new(self)
}
}
impl Writable for RocksDB {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
impl Writable for DB {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let mut err: *const i8 = 0 as *const i8;
@ -312,7 +466,23 @@ impl Writable for RocksDB {
if !err.is_null() {
return Err(error_message(err));
}
return Ok(())
Ok(())
}
}
fn put_cf(&self, cf: DBCFHandle, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
rocksdb_ffi::rocksdb_put_cf(self.inner, writeopts.clone(), cf,
key.as_ptr(), key.len() as size_t, value.as_ptr(),
value.len() as size_t, err_ptr);
rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts);
if !err.is_null() {
return Err(error_message(err));
}
Ok(())
}
}
@ -328,7 +498,24 @@ impl Writable for RocksDB {
if !err.is_null() {
return Err(error_message(err));
}
return Ok(())
Ok(())
}
}
fn merge_cf(&self, cf: DBCFHandle, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
rocksdb_ffi::rocksdb_merge_cf(self.inner, writeopts.clone(),
cf, key.as_ptr(),
key.len() as size_t, value.as_ptr(),
value.len() as size_t, err_ptr);
rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts);
if !err.is_null() {
return Err(error_message(err));
}
Ok(())
}
}
@ -343,7 +530,23 @@ impl Writable for RocksDB {
if !err.is_null() {
return Err(error_message(err));
}
return Ok(())
Ok(())
}
}
fn delete_cf(&self, cf: DBCFHandle, key: &[u8]) -> Result<(), String> {
unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
rocksdb_ffi::rocksdb_delete_cf(self.inner, writeopts.clone(),
cf, key.as_ptr(),
key.len() as size_t, err_ptr);
rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts);
if !err.is_null() {
return Err(error_message(err));
}
Ok(())
}
}
}
@ -366,9 +569,14 @@ impl Drop for WriteBatch {
}
}
impl Drop for RocksDB {
impl Drop for DB {
fn drop(&mut self) {
unsafe { rocksdb_ffi::rocksdb_close(self.inner); }
unsafe {
for (_, cf) in self.cfs.iter() {
rocksdb_ffi::rocksdb_column_family_handle_destroy(*cf);
}
rocksdb_ffi::rocksdb_close(self.inner);
}
}
}
@ -378,7 +586,16 @@ impl Writable for WriteBatch {
rocksdb_ffi::rocksdb_writebatch_put(self.inner, key.as_ptr(),
key.len() as size_t, value.as_ptr(),
value.len() as size_t);
return Ok(())
Ok(())
}
}
fn put_cf(&self, cf: DBCFHandle, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe {
rocksdb_ffi::rocksdb_writebatch_put_cf(self.inner, cf, key.as_ptr(),
key.len() as size_t, value.as_ptr(),
value.len() as size_t);
Ok(())
}
}
@ -387,7 +604,16 @@ impl Writable for WriteBatch {
rocksdb_ffi::rocksdb_writebatch_merge(self.inner, key.as_ptr(),
key.len() as size_t, value.as_ptr(),
value.len() as size_t);
return Ok(())
Ok(())
}
}
fn merge_cf(&self, cf: DBCFHandle, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe {
rocksdb_ffi::rocksdb_writebatch_merge_cf(self.inner, cf, key.as_ptr(),
key.len() as size_t, value.as_ptr(),
value.len() as size_t);
Ok(())
}
}
@ -395,7 +621,16 @@ impl Writable for WriteBatch {
unsafe {
rocksdb_ffi::rocksdb_writebatch_delete(self.inner, key.as_ptr(),
key.len() as size_t);
return Ok(())
Ok(())
}
}
fn delete_cf(&self, cf: DBCFHandle, key: &[u8]) -> Result<(), String> {
unsafe {
rocksdb_ffi::rocksdb_writebatch_delete_cf(self.inner,
cf, key.as_ptr(),
key.len() as size_t);
Ok(())
}
}
}
@ -430,19 +665,19 @@ impl ReadOptions {
}
}
pub struct RocksDBVector {
pub struct DBVector {
base: *mut u8,
len: usize,
}
impl Deref for RocksDBVector {
impl Deref for DBVector {
type Target = [u8];
fn deref(&self) -> &[u8] {
unsafe { slice::from_raw_parts(self.base, self.len) }
}
}
impl Drop for RocksDBVector {
impl Drop for DBVector {
fn drop(&mut self) {
unsafe {
libc::free(self.base as *mut libc::c_void);
@ -450,10 +685,10 @@ impl Drop for RocksDBVector {
}
}
impl RocksDBVector {
pub fn from_c(val: *mut u8, val_len: size_t) -> RocksDBVector {
impl DBVector {
pub fn from_c(val: *mut u8, val_len: size_t) -> DBVector {
unsafe {
RocksDBVector {
DBVector {
base: val,
len: val_len as usize,
}
@ -465,101 +700,100 @@ impl RocksDBVector {
}
}
// RocksDBResult exists because of the inherent difference between
// DBResult exists because of the inherent difference between
// an operational failure and the absence of a possible result.
#[derive(Clone, PartialEq, PartialOrd, Eq, Ord, Debug)]
pub enum RocksDBResult<T, E> {
pub enum DBResult<T, E> {
Some(T),
None,
Error(E),
}
impl <T, E> RocksDBResult<T, E> {
pub fn map<U, F: FnOnce(T) -> U>(self, f: F) -> RocksDBResult<U, E> {
impl <T, E> DBResult<T, E> {
pub fn map<U, F: FnOnce(T) -> U>(self, f: F) -> DBResult<U, E> {
match self {
RocksDBResult::Some(x) => RocksDBResult::Some(f(x)),
RocksDBResult::None => RocksDBResult::None,
RocksDBResult::Error(e) => RocksDBResult::Error(e),
DBResult::Some(x) => DBResult::Some(f(x)),
DBResult::None => DBResult::None,
DBResult::Error(e) => DBResult::Error(e),
}
}
pub fn unwrap(self) -> T {
match self {
RocksDBResult::Some(x) => x,
RocksDBResult::None =>
panic!("Attempted unwrap on RocksDBResult::None"),
RocksDBResult::Error(_) =>
panic!("Attempted unwrap on RocksDBResult::Error"),
DBResult::Some(x) => x,
DBResult::None =>
panic!("Attempted unwrap on DBResult::None"),
DBResult::Error(_) =>
panic!("Attempted unwrap on DBResult::Error"),
}
}
pub fn on_error<U, F: FnOnce(E) -> U>(self, f: F) -> RocksDBResult<T, U> {
pub fn on_error<U, F: FnOnce(E) -> U>(self, f: F) -> DBResult<T, U> {
match self {
RocksDBResult::Some(x) => RocksDBResult::Some(x),
RocksDBResult::None => RocksDBResult::None,
RocksDBResult::Error(e) => RocksDBResult::Error(f(e)),
DBResult::Some(x) => DBResult::Some(x),
DBResult::None => DBResult::None,
DBResult::Error(e) => DBResult::Error(f(e)),
}
}
pub fn on_absent<F: FnOnce() -> ()>(self, f: F) -> RocksDBResult<T, E> {
pub fn on_absent<F: FnOnce() -> ()>(self, f: F) -> DBResult<T, E> {
match self {
RocksDBResult::Some(x) => RocksDBResult::Some(x),
RocksDBResult::None => {
DBResult::Some(x) => DBResult::Some(x),
DBResult::None => {
f();
RocksDBResult::None
DBResult::None
},
RocksDBResult::Error(e) => RocksDBResult::Error(e),
DBResult::Error(e) => DBResult::Error(e),
}
}
pub fn is_some(self) -> bool {
match self {
RocksDBResult::Some(_) => true,
RocksDBResult::None => false,
RocksDBResult::Error(_) => false,
DBResult::Some(_) => true,
DBResult::None => false,
DBResult::Error(_) => false,
}
}
pub fn is_none(self) -> bool {
match self {
RocksDBResult::Some(_) => false,
RocksDBResult::None => true,
RocksDBResult::Error(_) => false,
DBResult::Some(_) => false,
DBResult::None => true,
DBResult::Error(_) => false,
}
}
pub fn is_error(self) -> bool {
match self {
RocksDBResult::Some(_) => false,
RocksDBResult::None => false,
RocksDBResult::Error(_) => true,
DBResult::Some(_) => false,
DBResult::None => false,
DBResult::Error(_) => true,
}
}
}
#[allow(dead_code)]
#[test]
fn external() {
let path = "_rust_rocksdb_externaltest";
{
let mut db = RocksDB::open_default(path).unwrap();
let mut db = DB::open_default(path).unwrap();
let p = db.put(b"k1", b"v1111");
assert!(p.is_ok());
let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1");
let r: DBResult<DBVector, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "v1111");
assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").is_none());
}
let opts = Options::new();
let result = RocksDB::destroy(&opts, path);
let result = DB::destroy(&opts, path);
assert!(result.is_ok());
}
#[test]
fn errors_do_stuff() {
let path = "_rust_rocksdb_error";
let mut db = RocksDB::open_default(path).unwrap();
let mut db = DB::open_default(path).unwrap();
let opts = Options::new();
// The DB will still be open when we try to destroy and the lock should fail
match RocksDB::destroy(&opts, path) {
match DB::destroy(&opts, path) {
Err(ref s) => assert!(s == "IO error: lock _rust_rocksdb_error/LOCK: No locks available"),
Ok(_) => panic!("should fail")
}
@ -569,7 +803,7 @@ fn errors_do_stuff() {
fn writebatch_works() {
let path = "_rust_rocksdb_writebacktest";
{
let mut db = RocksDB::open_default(path).unwrap();
let mut db = DB::open_default(path).unwrap();
{ // test put
let mut batch = WriteBatch::new();
assert!(db.get(b"k1").is_none());
@ -577,7 +811,7 @@ fn writebatch_works() {
assert!(db.get(b"k1").is_none());
let p = db.write(batch);
assert!(p.is_ok());
let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1");
let r: DBResult<DBVector, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "v1111");
}
{ // test delete
@ -589,14 +823,14 @@ fn writebatch_works() {
}
}
let opts = Options::new();
assert!(RocksDB::destroy(&opts, path).is_ok());
assert!(DB::destroy(&opts, path).is_ok());
}
#[test]
fn iterator_test() {
let path = "_rust_rocksdb_iteratortest";
{
let mut db = RocksDB::open_default(path).unwrap();
let mut db = DB::open_default(path).unwrap();
let p = db.put(b"k1", b"v1111");
assert!(p.is_ok());
let p = db.put(b"k2", b"v2222");
@ -609,5 +843,5 @@ fn iterator_test() {
}
}
let opts = Options::new();
assert!(RocksDB::destroy(&opts, path).is_ok());
assert!(DB::destroy(&opts, path).is_ok());
}

@ -24,11 +24,11 @@ use merge_operator::{self, MergeOperatorCallback, MergeOperands,
use comparator::{self, ComparatorCallback, compare_callback};
pub struct BlockBasedOptions {
inner: rocksdb_ffi::RocksDBBlockBasedTableOptions,
inner: rocksdb_ffi::DBBlockBasedTableOptions,
}
pub struct Options {
pub inner: rocksdb_ffi::RocksDBOptions,
pub inner: rocksdb_ffi::DBOptions,
}
impl Drop for Options {
@ -50,7 +50,7 @@ impl Drop for BlockBasedOptions {
impl BlockBasedOptions {
pub fn new() -> BlockBasedOptions {
let block_opts = unsafe {rocksdb_ffi::rocksdb_block_based_options_create() };
let rocksdb_ffi::RocksDBBlockBasedTableOptions(opt_ptr) = block_opts;
let rocksdb_ffi::DBBlockBasedTableOptions(opt_ptr) = block_opts;
if opt_ptr.is_null() {
panic!("Could not create rocksdb block based options".to_string());
}
@ -65,21 +65,21 @@ impl BlockBasedOptions {
}
//TODO figure out how to create these in a Rusty way
////pub fn set_filter(&mut self, filter: rocksdb_ffi::RocksDBFilterPolicy) {
////pub fn set_filter(&mut self, filter: rocksdb_ffi::DBFilterPolicy) {
//// unsafe {
//// rocksdb_ffi::rocksdb_block_based_options_set_filter_policy(
//// self.inner, filter);
//// }
////}
////pub fn set_cache(&mut self, cache: rocksdb_ffi::RocksDBCache) {
////pub fn set_cache(&mut self, cache: rocksdb_ffi::DBCache) {
//// unsafe {
//// rocksdb_ffi::rocksdb_block_based_options_set_block_cache(
//// self.inner, cache);
//// }
////}
////pub fn set_cache_compressed(&mut self, cache: rocksdb_ffi::RocksDBCache) {
////pub fn set_cache_compressed(&mut self, cache: rocksdb_ffi::DBCache) {
//// unsafe {
//// rocksdb_ffi::rocksdb_block_based_options_set_block_cache_compressed(
//// self.inner, cache);
@ -92,7 +92,7 @@ impl Options {
pub fn new() -> Options {
unsafe {
let opts = rocksdb_ffi::rocksdb_options_create();
let rocksdb_ffi::RocksDBOptions(opt_ptr) = opts;
let rocksdb_ffi::DBOptions(opt_ptr) = opts;
if opt_ptr.is_null() {
panic!("Could not create rocksdb options".to_string());
}
@ -258,7 +258,7 @@ impl Options {
}
}
pub fn set_compaction_style(&mut self, style: rocksdb_ffi::RocksDBCompactionStyle) {
pub fn set_compaction_style(&mut self, style: rocksdb_ffi::DBCompactionStyle) {
unsafe {
rocksdb_ffi::rocksdb_options_set_compaction_style(
self.inner, style);

@ -2,3 +2,4 @@ extern crate rocksdb;
mod test_iterator;
mod test_multithreaded;
mod test_column_family;

@ -0,0 +1,133 @@
/*
Copyright 2014 Tyler Neely
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
use rocksdb::{Options, DB, DBResult, Writable, Direction, MergeOperands};
#[test]
pub fn test_column_family() {
let path = "_rust_rocksdb_cftest";
// should be able to create column families
{
let mut opts = Options::new();
opts.create_if_missing(true);
opts.add_merge_operator("test operator", test_provided_merge);
let mut db = DB::open(&opts, path).unwrap();
let opts = Options::new();
match db.create_cf("cf1", &opts) {
Ok(_) => println!("cf1 created successfully"),
Err(e) => {
panic!("could not create column family: {}", e);
},
}
}
// should fail to open db without specifying same column families
{
let mut opts = Options::new();
opts.add_merge_operator("test operator", test_provided_merge);
match DB::open(&opts, path) {
Ok(_) => panic!("should not have opened DB successfully without specifying column
families"),
Err(e) => assert!(e.starts_with("Invalid argument: You have to open all column families.")),
}
}
// should properly open db when specyfing all column families
{
let mut opts = Options::new();
opts.add_merge_operator("test operator", test_provided_merge);
match DB::open_cf(&opts, path, &["cf1"]) {
Ok(_) => println!("successfully opened db with column family"),
Err(e) => panic!("failed to open db with column family: {}", e),
}
}
// TODO should be able to write, read, merge, batch, and iterate over a cf
{
let mut opts = Options::new();
opts.add_merge_operator("test operator", test_provided_merge);
let mut db = match DB::open_cf(&opts, path, &["cf1"]) {
Ok(db) => {
println!("successfully opened db with column family");
db
},
Err(e) => panic!("failed to open db with column family: {}", e),
};
let cf1 = *db.cf_handle("cf1").unwrap();
assert!(db.put_cf(cf1, b"k1", b"v1").is_ok());
assert!(db.get_cf(cf1, b"k1").unwrap().to_utf8().unwrap() == "v1");
let p = db.put_cf(cf1, b"k1", b"a");
assert!(p.is_ok());
db.merge_cf(cf1, b"k1", b"b");
db.merge_cf(cf1, b"k1", b"c");
db.merge_cf(cf1, b"k1", b"d");
db.merge_cf(cf1, b"k1", b"efg");
let m = db.merge_cf(cf1, b"k1", b"h");
println!("m is {:?}", m);
// TODO assert!(m.is_ok());
db.get(b"k1").map( |value| {
match value.to_utf8() {
Some(v) =>
println!("retrieved utf8 value: {}", v),
None =>
println!("did not read valid utf-8 out of the db"),
}
}).on_absent( || { println!("value not present!") })
.on_error( |e| { println!("error reading value")}); //: {", e) });
let r = db.get_cf(cf1, b"k1");
// TODO assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh");
assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").is_none());
}
// TODO should be able to use writebatch ops with a cf
{
}
// TODO should be able to iterate over a cf
{
}
// should b able to drop a cf
{
let mut db = DB::open_cf(&Options::new(), path, &["cf1"]).unwrap();
match db.drop_cf("cf1") {
Ok(_) => println!("cf1 successfully dropped."),
Err(e) => panic!("failed to drop column family: {}", e),
}
}
assert!(DB::destroy(&Options::new(), path).is_ok());
}
fn test_provided_merge(new_key: &[u8],
existing_val: Option<&[u8]>,
mut operands: &mut MergeOperands)
-> Vec<u8> {
let nops = operands.size_hint().0;
let mut result: Vec<u8> = Vec::with_capacity(nops);
match existing_val {
Some(v) => {
for e in v {
result.push(*e);
}
},
None => (),
}
for op in operands {
for e in op {
result.push(*e);
}
}
result
}

@ -1,4 +1,4 @@
use rocksdb::{Options, RocksDB, Writable, Direction};
use rocksdb::{Options, DB, Writable, Direction};
use std;
fn cba(input: &Box<[u8]>) -> Box<[u8]> {
@ -17,7 +17,7 @@ pub fn test_iterator() {
let v2:Box<[u8]> = b"v2222".to_vec().into_boxed_slice();
let v3:Box<[u8]> = b"v3333".to_vec().into_boxed_slice();
let v4:Box<[u8]> = b"v4444".to_vec().into_boxed_slice();
let mut db = RocksDB::open_default(path).unwrap();
let mut db = DB::open_default(path).unwrap();
let p = db.put(&*k1, &*v1);
assert!(p.is_ok());
let p = db.put(&*k2, &*v2);
@ -109,6 +109,6 @@ pub fn test_iterator() {
}
}
let opts = Options::new();
assert!(RocksDB::destroy(&opts, path).is_ok());
assert!(DB::destroy(&opts, path).is_ok());
}

@ -1,48 +1,51 @@
use rocksdb::{Options, RocksDB, Writable, Direction, RocksDBResult};
use rocksdb::{Options, DB, Writable, Direction, DBResult};
use std::thread::{self, Builder};
use std::sync::Arc;
const N: usize = 1000_000;
const N: usize = 100_000;
#[test]
pub fn test_multithreaded() {
let path = "_rust_rocksdb_multithreadtest";
let db = RocksDB::open_default(path).unwrap();
let db = Arc::new(db);
{
let db = DB::open_default(path).unwrap();
let db = Arc::new(db);
db.put(b"key", b"value1");
db.put(b"key", b"value1");
let db1 = db.clone();
let j1 = thread::spawn(move|| {
for i in 1..N {
db1.put(b"key", b"value1");
}
});
let db1 = db.clone();
let j1 = thread::spawn(move|| {
for i in 1..N {
db1.put(b"key", b"value1");
}
});
let db2 = db.clone();
let j2 = thread::spawn(move|| {
for i in 1..N {
db2.put(b"key", b"value2");
}
});
let db2 = db.clone();
let j2 = thread::spawn(move|| {
for i in 1..N {
db2.put(b"key", b"value2");
}
});
let db3 = db.clone();
let j3 = thread::spawn(move|| {
for i in 1..N {
match db3.get(b"key") {
RocksDBResult::Some(v) => {
if &v[..] != b"value1" && &v[..] != b"value2" {
let db3 = db.clone();
let j3 = thread::spawn(move|| {
for i in 1..N {
match db3.get(b"key") {
DBResult::Some(v) => {
if &v[..] != b"value1" && &v[..] != b"value2" {
assert!(false);
}
}
_ => {
assert!(false);
}
}
_ => {
assert!(false);
}
}
}
});
});
j1.join();
j2.join();
j3.join();
j1.join();
j2.join();
j3.join();
}
assert!(DB::destroy(&Options::new(), path).is_ok());
}

Loading…
Cancel
Save