diff --git a/.gitignore b/.gitignore index b235e4d..8a4f43d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +*.swo *.swp target Cargo.lock diff --git a/.travis.yml b/.travis.yml index 1c4b5ba..e511cbc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,6 +3,7 @@ language: rust rust: - nightly - 1.1.0 + - 1.2.0 os: - linux diff --git a/Cargo.toml b/Cargo.toml index 7048181..62dd8cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "rocksdb" description = "A Rust wrapper for Facebook's RocksDB embeddable database." -version = "0.1.0" +version = "0.1.1" authors = ["Tyler Neely ", "David Greenberg "] license = "Apache-2.0" exclude = [ diff --git a/README.md b/README.md index 93e3f55..07c411e 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ rust-rocksdb [![Build Status](https://travis-ci.org/spacejam/rust-rocksdb.svg?branch=master)](https://travis-ci.org/spacejam/rust-rocksdb) [![crates.io](http://meritbadge.herokuapp.com/rocksdb)](https://crates.io/crates/rocksdb) -This library has been tested against RocksDB 3.8.1 on linux and OSX. The 0.1.0 crate should work with the Rust 1.1 stable and nightly releases as of 8/2/15. +This library has been tested against RocksDB 3.8.1 on linux and OSX. The 0.1.1 crate should work with the Rust 1.2 stable and nightly releases as of 9/7/15. ### status - [x] basic open/put/get/delete/close @@ -15,7 +15,7 @@ This library has been tested against RocksDB 3.8.1 on linux and OSX. The 0.1.0 - [x] iterator - [x] comparator - [x] snapshot - - [ ] column family operations + - [x] column family operations - [ ] slicetransform - [ ] windows support @@ -32,7 +32,7 @@ sudo make install ###### Cargo.toml ```rust [dependencies] -rocksdb = "~0.1.0" +rocksdb = "~0.1.1" ``` ###### Code ```rust diff --git a/src/comparator.rs b/src/comparator.rs index 6cf8d0d..fb41676 100644 --- a/src/comparator.rs +++ b/src/comparator.rs @@ -21,7 +21,7 @@ use std::ptr; use std::slice; use rocksdb_options::Options; -use rocksdb::RocksDB; +use rocksdb::DB; pub struct ComparatorCallback { pub name: CString, @@ -75,8 +75,8 @@ fn test_reverse_compare(a: &[u8], b: &[u8]) -> c_int { // opts.create_if_missing(true); // opts.add_comparator("test comparator", test_reverse_compare); // { -// let db = RocksDB::open(&opts, path).unwrap(); +// let db = DB::open(&opts, path).unwrap(); // // TODO add interesting test // } -// assert!(RocksDB::destroy(&opts, path).is_ok()); +// assert!(DB::destroy(&opts, path).is_ok()); //} diff --git a/src/ffi.rs b/src/ffi.rs index 10c8cc9..5dd20a2 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -19,75 +19,75 @@ use std::ffi::CString; #[derive(Copy, Clone)] #[repr(C)] -pub struct RocksDBOptions(pub *const c_void); +pub struct DBOptions(pub *const c_void); #[derive(Copy, Clone)] #[repr(C)] -pub struct RocksDBInstance(pub *const c_void); +pub struct DBInstance(pub *const c_void); #[derive(Copy, Clone)] #[repr(C)] -pub struct RocksDBWriteOptions(pub *const c_void); +pub struct DBWriteOptions(pub *const c_void); #[derive(Copy, Clone)] #[repr(C)] -pub struct RocksDBReadOptions(pub *const c_void); +pub struct DBReadOptions(pub *const c_void); #[derive(Copy, Clone)] #[repr(C)] -pub struct RocksDBMergeOperator(pub *const c_void); +pub struct DBMergeOperator(pub *const c_void); #[derive(Copy, Clone)] #[repr(C)] -pub struct RocksDBBlockBasedTableOptions(pub *const c_void); +pub struct DBBlockBasedTableOptions(pub *const c_void); #[derive(Copy, Clone)] #[repr(C)] -pub struct RocksDBCache(pub *const c_void); +pub struct DBCache(pub *const c_void); #[derive(Copy, Clone)] #[repr(C)] -pub struct RocksDBFilterPolicy(pub *const c_void); +pub struct DBFilterPolicy(pub *const c_void); #[derive(Copy, Clone)] #[repr(C)] -pub struct RocksDBSnapshot(pub *const c_void); +pub struct DBSnapshot(pub *const c_void); #[derive(Copy, Clone)] #[repr(C)] -pub struct RocksDBIterator(pub *const c_void); +pub struct DBIterator(pub *const c_void); #[derive(Copy, Clone)] #[repr(C)] -pub struct RocksDBCFHandle(pub *const c_void); +pub struct DBCFHandle(pub *const c_void); #[derive(Copy, Clone)] #[repr(C)] -pub struct RocksDBWriteBatch(pub *const c_void); +pub struct DBWriteBatch(pub *const c_void); #[derive(Copy, Clone)] #[repr(C)] -pub struct RocksDBComparator(pub *const c_void); +pub struct DBComparator(pub *const c_void); -pub fn new_bloom_filter(bits: c_int) -> RocksDBFilterPolicy { +pub fn new_bloom_filter(bits: c_int) -> DBFilterPolicy { unsafe { rocksdb_filterpolicy_create_bloom(bits) } } -pub fn new_cache(capacity: size_t) -> RocksDBCache { +pub fn new_cache(capacity: size_t) -> DBCache { unsafe { rocksdb_cache_create_lru(capacity) } } #[repr(C)] -pub enum RocksDBCompressionType { - RocksDBNoCompression = 0, - RocksDBSnappyCompression = 1, - RocksDBZlibCompression = 2, - RocksDBBz2Compression = 3, - RocksDBLz4Compression = 4, - RocksDBLz4hcCompression = 5, +pub enum DBCompressionType { + DBNoCompression = 0, + DBSnappyCompression = 1, + DBZlibCompression = 2, + DBBz2Compression = 3, + DBLz4Compression = 4, + DBLz4hcCompression = 5, } #[repr(C)] -pub enum RocksDBCompactionStyle { - RocksDBLevelCompaction = 0, - RocksDBUniversalCompaction = 1, - RocksDBFifoCompaction = 2, +pub enum DBCompactionStyle { + DBLevelCompaction = 0, + DBUniversalCompaction = 1, + DBFifoCompaction = 2, } #[repr(C)] -pub enum RocksDBUniversalCompactionStyle { +pub enum DBUniversalCompactionStyle { rocksdb_similar_size_compaction_stop_style = 0, rocksdb_total_size_compaction_stop_style = 1, } @@ -95,160 +95,178 @@ pub enum RocksDBUniversalCompactionStyle { //TODO audit the use of boolean arguments, b/c I think they need to be u8 instead... #[link(name = "rocksdb")] extern { - pub fn rocksdb_options_create() -> RocksDBOptions; - pub fn rocksdb_options_destroy(opts: RocksDBOptions); - pub fn rocksdb_cache_create_lru(capacity: size_t) -> RocksDBCache; - pub fn rocksdb_cache_destroy(cache: RocksDBCache); - pub fn rocksdb_block_based_options_create() -> RocksDBBlockBasedTableOptions; - pub fn rocksdb_block_based_options_destroy(opts: RocksDBBlockBasedTableOptions); + pub fn rocksdb_options_create() -> DBOptions; + pub fn rocksdb_options_destroy(opts: DBOptions); + pub fn rocksdb_cache_create_lru(capacity: size_t) -> DBCache; + pub fn rocksdb_cache_destroy(cache: DBCache); + pub fn rocksdb_block_based_options_create() -> DBBlockBasedTableOptions; + pub fn rocksdb_block_based_options_destroy(opts: DBBlockBasedTableOptions); pub fn rocksdb_block_based_options_set_block_size( - block_options: RocksDBBlockBasedTableOptions, + block_options: DBBlockBasedTableOptions, block_size: size_t); pub fn rocksdb_block_based_options_set_block_size_deviation( - block_options: RocksDBBlockBasedTableOptions, + block_options: DBBlockBasedTableOptions, block_size_deviation: c_int); pub fn rocksdb_block_based_options_set_block_restart_interval( - block_options: RocksDBBlockBasedTableOptions, + block_options: DBBlockBasedTableOptions, block_restart_interval: c_int); pub fn rocksdb_block_based_options_set_filter_policy( - block_options: RocksDBBlockBasedTableOptions, - filter_policy: RocksDBFilterPolicy); + block_options: DBBlockBasedTableOptions, + filter_policy: DBFilterPolicy); pub fn rocksdb_block_based_options_set_no_block_cache( - block_options: RocksDBBlockBasedTableOptions, no_block_cache: bool); + block_options: DBBlockBasedTableOptions, no_block_cache: bool); pub fn rocksdb_block_based_options_set_block_cache( - block_options: RocksDBBlockBasedTableOptions, block_cache: RocksDBCache); + block_options: DBBlockBasedTableOptions, block_cache: DBCache); pub fn rocksdb_block_based_options_set_block_cache_compressed( - block_options: RocksDBBlockBasedTableOptions, - block_cache_compressed: RocksDBCache); + block_options: DBBlockBasedTableOptions, + block_cache_compressed: DBCache); pub fn rocksdb_block_based_options_set_whole_key_filtering( - ck_options: RocksDBBlockBasedTableOptions, doit: bool); + ck_options: DBBlockBasedTableOptions, doit: bool); pub fn rocksdb_options_set_block_based_table_factory( - options: RocksDBOptions, - block_options: RocksDBBlockBasedTableOptions); + options: DBOptions, + block_options: DBBlockBasedTableOptions); pub fn rocksdb_options_increase_parallelism( - options: RocksDBOptions, threads: c_int); + options: DBOptions, threads: c_int); pub fn rocksdb_options_optimize_level_style_compaction( - options: RocksDBOptions, memtable_memory_budget: c_int); + options: DBOptions, memtable_memory_budget: c_int); pub fn rocksdb_options_set_create_if_missing( - options: RocksDBOptions, v: bool); + options: DBOptions, v: bool); pub fn rocksdb_options_set_max_open_files( - options: RocksDBOptions, files: c_int); + options: DBOptions, files: c_int); pub fn rocksdb_options_set_use_fsync( - options: RocksDBOptions, v: c_int); + options: DBOptions, v: c_int); pub fn rocksdb_options_set_bytes_per_sync( - options: RocksDBOptions, bytes: u64); + options: DBOptions, bytes: u64); pub fn rocksdb_options_set_disable_data_sync( - options: RocksDBOptions, v: c_int); + options: DBOptions, v: c_int); pub fn rocksdb_options_optimize_for_point_lookup( - options: RocksDBOptions, block_cache_size_mb: u64); + options: DBOptions, block_cache_size_mb: u64); pub fn rocksdb_options_set_table_cache_numshardbits( - options: RocksDBOptions, bits: c_int); + options: DBOptions, bits: c_int); pub fn rocksdb_options_set_max_write_buffer_number( - options: RocksDBOptions, bufno: c_int); + options: DBOptions, bufno: c_int); pub fn rocksdb_options_set_min_write_buffer_number_to_merge( - options: RocksDBOptions, bufno: c_int); + options: DBOptions, bufno: c_int); pub fn rocksdb_options_set_level0_file_num_compaction_trigger( - options: RocksDBOptions, no: c_int); + options: DBOptions, no: c_int); pub fn rocksdb_options_set_level0_slowdown_writes_trigger( - options: RocksDBOptions, no: c_int); + options: DBOptions, no: c_int); pub fn rocksdb_options_set_level0_stop_writes_trigger( - options: RocksDBOptions, no: c_int); + options: DBOptions, no: c_int); pub fn rocksdb_options_set_write_buffer_size( - options: RocksDBOptions, bytes: u64); + options: DBOptions, bytes: u64); pub fn rocksdb_options_set_target_file_size_base( - options: RocksDBOptions, bytes: u64); + options: DBOptions, bytes: u64); pub fn rocksdb_options_set_target_file_size_multiplier( - options: RocksDBOptions, mul: c_int); + options: DBOptions, mul: c_int); pub fn rocksdb_options_set_max_log_file_size( - options: RocksDBOptions, bytes: u64); + options: DBOptions, bytes: u64); pub fn rocksdb_options_set_max_manifest_file_size( - options: RocksDBOptions, bytes: u64); + options: DBOptions, bytes: u64); pub fn rocksdb_options_set_hash_skip_list_rep( - options: RocksDBOptions, bytes: u64, a1: i32, a2: i32); + options: DBOptions, bytes: u64, a1: i32, a2: i32); pub fn rocksdb_options_set_compaction_style( - options: RocksDBOptions, cs: RocksDBCompactionStyle); + options: DBOptions, cs: DBCompactionStyle); pub fn rocksdb_options_set_compression( - options: RocksDBOptions, compression_style_no: c_int); + options: DBOptions, compression_style_no: c_int); pub fn rocksdb_options_set_max_background_compactions( - options: RocksDBOptions, max_bg_compactions: c_int); + options: DBOptions, max_bg_compactions: c_int); pub fn rocksdb_options_set_max_background_flushes( - options: RocksDBOptions, max_bg_flushes: c_int); + options: DBOptions, max_bg_flushes: c_int); pub fn rocksdb_options_set_filter_deletes( - options: RocksDBOptions, v: bool); + options: DBOptions, v: bool); pub fn rocksdb_options_set_disable_auto_compactions( - options: RocksDBOptions, v: c_int); + options: DBOptions, v: c_int); pub fn rocksdb_filterpolicy_create_bloom( - bits_per_key: c_int) -> RocksDBFilterPolicy; - pub fn rocksdb_open(options: RocksDBOptions, + bits_per_key: c_int) -> DBFilterPolicy; + pub fn rocksdb_open(options: DBOptions, path: *const i8, err: *mut *const i8 - ) -> RocksDBInstance; - pub fn rocksdb_writeoptions_create() -> RocksDBWriteOptions; - pub fn rocksdb_writeoptions_destroy(writeopts: RocksDBWriteOptions); - pub fn rocksdb_put(db: RocksDBInstance, - writeopts: RocksDBWriteOptions, + ) -> DBInstance; + pub fn rocksdb_writeoptions_create() -> DBWriteOptions; + pub fn rocksdb_writeoptions_destroy(writeopts: DBWriteOptions); + pub fn rocksdb_put(db: DBInstance, + writeopts: DBWriteOptions, k: *const u8, kLen: size_t, v: *const u8, vLen: size_t, err: *mut *const i8); - pub fn rocksdb_readoptions_create() -> RocksDBReadOptions; - pub fn rocksdb_readoptions_destroy(readopts: RocksDBReadOptions); + pub fn rocksdb_put_cf(db: DBInstance, + writeopts: DBWriteOptions, + cf: DBCFHandle, + k: *const u8, kLen: size_t, + v: *const u8, vLen: size_t, + err: *mut *const i8); + pub fn rocksdb_readoptions_create() -> DBReadOptions; + pub fn rocksdb_readoptions_destroy(readopts: DBReadOptions); pub fn rocksdb_readoptions_set_verify_checksums( - readopts: RocksDBReadOptions, + readopts: DBReadOptions, v: bool); pub fn rocksdb_readoptions_set_fill_cache( - readopts: RocksDBReadOptions, + readopts: DBReadOptions, v: bool); pub fn rocksdb_readoptions_set_snapshot( - readopts: RocksDBReadOptions, - snapshot: RocksDBSnapshot); //TODO how do I make this a const ref? + readopts: DBReadOptions, + snapshot: DBSnapshot); //TODO how do I make this a const ref? pub fn rocksdb_readoptions_set_iterate_upper_bound( - readopts: RocksDBReadOptions, + readopts: DBReadOptions, k: *const u8, kLen: size_t); pub fn rocksdb_readoptions_set_read_tier( - readopts: RocksDBReadOptions, + readopts: DBReadOptions, tier: c_int); pub fn rocksdb_readoptions_set_tailing( - readopts: RocksDBReadOptions, + readopts: DBReadOptions, v: bool); - pub fn rocksdb_get(db: RocksDBInstance, - readopts: RocksDBReadOptions, + pub fn rocksdb_get(db: DBInstance, + readopts: DBReadOptions, k: *const u8, kLen: size_t, valLen: *const size_t, err: *mut *const i8 ) -> *mut c_void; - pub fn rocksdb_get_cf(db: RocksDBInstance, - readopts: RocksDBReadOptions, - cf_handle: RocksDBCFHandle, + pub fn rocksdb_get_cf(db: DBInstance, + readopts: DBReadOptions, + cf_handle: DBCFHandle, k: *const u8, kLen: size_t, valLen: *const size_t, err: *mut *const i8 ) -> *mut c_void; - pub fn rocksdb_create_iterator(db: RocksDBInstance, - readopts: RocksDBReadOptions - ) -> RocksDBIterator; - pub fn rocksdb_create_iterator_cf(db: RocksDBInstance, - readopts: RocksDBReadOptions, - cf_handle: RocksDBCFHandle - ) -> RocksDBIterator; - pub fn rocksdb_create_snapshot(db: RocksDBInstance) -> RocksDBSnapshot; - pub fn rocksdb_release_snapshot(db: RocksDBInstance, - snapshot: RocksDBSnapshot); + pub fn rocksdb_create_iterator(db: DBInstance, + readopts: DBReadOptions + ) -> DBIterator; + pub fn rocksdb_create_iterator_cf(db: DBInstance, + readopts: DBReadOptions, + cf_handle: DBCFHandle + ) -> DBIterator; + pub fn rocksdb_create_snapshot(db: DBInstance) -> DBSnapshot; + pub fn rocksdb_release_snapshot(db: DBInstance, + snapshot: DBSnapshot); - pub fn rocksdb_delete(db: RocksDBInstance, - writeopts: RocksDBWriteOptions, + pub fn rocksdb_delete(db: DBInstance, + writeopts: DBWriteOptions, k: *const u8, kLen: size_t, err: *mut *const i8 ) -> *mut c_void; - pub fn rocksdb_close(db: RocksDBInstance); - pub fn rocksdb_destroy_db(options: RocksDBOptions, + pub fn rocksdb_delete_cf(db: DBInstance, + writeopts: DBWriteOptions, + cf: DBCFHandle, + k: *const u8, kLen: size_t, + err: *mut *const i8 + ) -> *mut c_void; + pub fn rocksdb_close(db: DBInstance); + pub fn rocksdb_destroy_db(options: DBOptions, path: *const i8, err: *mut *const i8); - pub fn rocksdb_repair_db(options: RocksDBOptions, + pub fn rocksdb_repair_db(options: DBOptions, path: *const i8, err: *mut *const i8); // Merge - pub fn rocksdb_merge(db: RocksDBInstance, - writeopts: RocksDBWriteOptions, + pub fn rocksdb_merge(db: DBInstance, + writeopts: DBWriteOptions, + k: *const u8, kLen: size_t, + v: *const u8, vLen: size_t, + err: *mut *const i8); + pub fn rocksdb_merge_cf(db: DBInstance, + writeopts: DBWriteOptions, + cf: DBCFHandle, k: *const u8, kLen: size_t, v: *const u8, vLen: size_t, err: *mut *const i8); @@ -276,73 +294,73 @@ extern { value_len: *mut size_t ) -> ()>, name_fn: extern fn(*mut c_void) -> *const c_char, - ) -> RocksDBMergeOperator; - pub fn rocksdb_mergeoperator_destroy(mo: RocksDBMergeOperator); - pub fn rocksdb_options_set_merge_operator(options: RocksDBOptions, - mo: RocksDBMergeOperator); + ) -> DBMergeOperator; + pub fn rocksdb_mergeoperator_destroy(mo: DBMergeOperator); + pub fn rocksdb_options_set_merge_operator(options: DBOptions, + mo: DBMergeOperator); // Iterator - pub fn rocksdb_iter_destroy(iter: RocksDBIterator); - pub fn rocksdb_iter_valid(iter: RocksDBIterator) -> bool; - pub fn rocksdb_iter_seek_to_first(iter: RocksDBIterator); - pub fn rocksdb_iter_seek_to_last(iter: RocksDBIterator); - pub fn rocksdb_iter_seek(iter: RocksDBIterator, + pub fn rocksdb_iter_destroy(iter: DBIterator); + pub fn rocksdb_iter_valid(iter: DBIterator) -> bool; + pub fn rocksdb_iter_seek_to_first(iter: DBIterator); + pub fn rocksdb_iter_seek_to_last(iter: DBIterator); + pub fn rocksdb_iter_seek(iter: DBIterator, key: *const u8, klen: size_t); - pub fn rocksdb_iter_next(iter: RocksDBIterator); - pub fn rocksdb_iter_prev(iter: RocksDBIterator); - pub fn rocksdb_iter_key(iter: RocksDBIterator, + pub fn rocksdb_iter_next(iter: DBIterator); + pub fn rocksdb_iter_prev(iter: DBIterator); + pub fn rocksdb_iter_key(iter: DBIterator, klen: *mut size_t) -> *mut u8; - pub fn rocksdb_iter_value(iter: RocksDBIterator, + pub fn rocksdb_iter_value(iter: DBIterator, vlen: *mut size_t) -> *mut u8; - pub fn rocksdb_iter_get_error(iter: RocksDBIterator, + pub fn rocksdb_iter_get_error(iter: DBIterator, err: *mut *const u8); // Write batch - pub fn rocksdb_write(db: RocksDBInstance, - writeopts: RocksDBWriteOptions, - batch : RocksDBWriteBatch, + pub fn rocksdb_write(db: DBInstance, + writeopts: DBWriteOptions, + batch : DBWriteBatch, err: *mut *const i8); - pub fn rocksdb_writebatch_create() -> RocksDBWriteBatch; + pub fn rocksdb_writebatch_create() -> DBWriteBatch; pub fn rocksdb_writebatch_create_from(rep: *const u8, - size: size_t) -> RocksDBWriteBatch; - pub fn rocksdb_writebatch_destroy(batch: RocksDBWriteBatch); - pub fn rocksdb_writebatch_clear(batch: RocksDBWriteBatch); - pub fn rocksdb_writebatch_count(batch: RocksDBWriteBatch) -> c_int; - pub fn rocksdb_writebatch_put(batch: RocksDBWriteBatch, + size: size_t) -> DBWriteBatch; + pub fn rocksdb_writebatch_destroy(batch: DBWriteBatch); + pub fn rocksdb_writebatch_clear(batch: DBWriteBatch); + pub fn rocksdb_writebatch_count(batch: DBWriteBatch) -> c_int; + pub fn rocksdb_writebatch_put(batch: DBWriteBatch, key: *const u8, klen: size_t, val: *const u8, vlen: size_t); - pub fn rocksdb_writebatch_put_cf(batch: RocksDBWriteBatch, - cf: RocksDBCFHandle, + pub fn rocksdb_writebatch_put_cf(batch: DBWriteBatch, + cf: DBCFHandle, key: *const u8, klen: size_t, val: *const u8, vlen: size_t); pub fn rocksdb_writebatch_merge( - batch: RocksDBWriteBatch, + batch: DBWriteBatch, key: *const u8, klen: size_t, val: *const u8, vlen: size_t); pub fn rocksdb_writebatch_merge_cf( - batch: RocksDBWriteBatch, - cf: RocksDBCFHandle, + batch: DBWriteBatch, + cf: DBCFHandle, key: *const u8, klen: size_t, val: *const u8, vlen: size_t); pub fn rocksdb_writebatch_delete( - batch: RocksDBWriteBatch, + batch: DBWriteBatch, key: *const u8, klen: size_t); pub fn rocksdb_writebatch_delete_cf( - batch: RocksDBWriteBatch, - cf: RocksDBCFHandle, + batch: DBWriteBatch, + cf: DBCFHandle, key: *const u8, klen: size_t); pub fn rocksdb_writebatch_iterate( - batch: RocksDBWriteBatch, + batch: DBWriteBatch, state: *mut c_void, put_fn: extern fn(state: *mut c_void, k: *const u8, klen: size_t, v: *const u8, vlen: size_t), deleted_fn: extern fn(state: *mut c_void, k: *const u8, klen: size_t)); - pub fn rocksdb_writebatch_data(batch: RocksDBWriteBatch, + pub fn rocksdb_writebatch_data(batch: DBWriteBatch, size: *mut size_t) -> *const u8; // Comparator - pub fn rocksdb_options_set_comparator(options: RocksDBOptions, - cb: RocksDBComparator); + pub fn rocksdb_options_set_comparator(options: DBOptions, + cb: DBComparator); pub fn rocksdb_comparator_create( state: *mut c_void, destroy: extern fn(*mut c_void) -> (), @@ -351,11 +369,30 @@ extern { b: *const c_char, blen: size_t ) -> c_int, name_fn: extern fn(*mut c_void) -> *const c_char - ) -> RocksDBComparator; - pub fn rocksdb_comparator_destroy(cmp: RocksDBComparator); + ) -> DBComparator; + pub fn rocksdb_comparator_destroy(cmp: DBComparator); + + // Column Family + pub fn rocksdb_open_column_families(options: DBOptions, + path: *const i8, + num_column_families: c_int, + column_family_names: *const *const i8, + column_family_options: *const DBOptions, + column_family_handles: *const DBCFHandle, + err: *mut *const i8 + ) -> DBInstance; + pub fn rocksdb_create_column_family(db: DBInstance, + column_family_options: DBOptions, + column_family_name: *const i8, + err: *mut *const i8 + ) -> DBCFHandle; + pub fn rocksdb_drop_column_family(db: DBInstance, + column_family_handle: DBCFHandle, + err: *mut *const i8); + pub fn rocksdb_column_family_handle_destroy(column_family_handle: DBCFHandle); + } -#[allow(dead_code)] #[test] fn internal() { unsafe { diff --git a/src/lib.rs b/src/lib.rs index 7cee152..9ba104a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,8 +17,8 @@ #![crate_type = "lib"] pub use ffi as rocksdb_ffi; -pub use ffi::{new_bloom_filter, RocksDBCompactionStyle, RocksDBComparator}; -pub use rocksdb::{RocksDB, RocksDBResult, RocksDBVector, WriteBatch, Writable, Direction}; +pub use ffi::{new_bloom_filter, DBCompactionStyle, DBComparator}; +pub use rocksdb::{DB, DBResult, DBVector, WriteBatch, Writable, Direction}; pub use rocksdb_options::{Options, BlockBasedOptions}; pub use merge_operator::MergeOperands; pub mod rocksdb; diff --git a/src/main.rs b/src/main.rs index 0d134db..4c4eabb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,13 +14,13 @@ limitations under the License. */ extern crate rocksdb; -use rocksdb::{Options, RocksDB, MergeOperands, new_bloom_filter, Writable, }; -use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction; +use rocksdb::{Options, DB, MergeOperands, new_bloom_filter, Writable, }; +use rocksdb::DBCompactionStyle::DBUniversalCompaction; //fn snapshot_test() { // let path = "_rust_rocksdb_iteratortest"; // { -// let mut db = RocksDB::open_default(path).unwrap(); +// let mut db = DB::open_default(path).unwrap(); // let p = db.put(b"k1", b"v1111"); // assert!(p.is_ok()); // let p = db.put(b"k2", b"v2222"); @@ -41,13 +41,13 @@ use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction; // }; // } // let opts = Options::new(); -// assert!(RocksDB::destroy(&opts, path).is_ok()); +// assert!(DB::destroy(&opts, path).is_ok()); //} #[cfg(not(feature = "valgrind"))] fn main() { let path = "/tmp/rust-rocksdb"; - let mut db = RocksDB::open_default(path).unwrap(); + let mut db = DB::open_default(path).unwrap(); assert!(db.put(b"my key", b"my value").is_ok()); db.get(b"my key").map( |value| { match value.to_utf8() { @@ -88,7 +88,7 @@ fn custom_merge() { opts.create_if_missing(true); opts.add_merge_operator("test operator", concat_merge); { - let mut db = RocksDB::open(&opts, path).unwrap(); + let mut db = DB::open(&opts, path).unwrap(); db.put(b"k1", b"a"); db.merge(b"k1", b"b"); db.merge(b"k1", b"c"); @@ -107,7 +107,7 @@ fn custom_merge() { .on_error( |e| { println!("error retrieving value: {}", e) }); } - RocksDB::destroy(&opts, path).is_ok(); + DB::destroy(&opts, path).is_ok(); } #[cfg(feature = "valgrind")] @@ -116,7 +116,7 @@ fn main() { let mut opts = Options::new(); opts.create_if_missing(true); opts.add_merge_operator("test operator", concat_merge); - let db = RocksDB::open(&opts, path).unwrap(); + let db = DB::open(&opts, path).unwrap(); loop { db.put(b"k1", b"a"); db.merge(b"k1", b"b"); @@ -141,10 +141,10 @@ fn main() { mod tests { use std::thread::sleep_ms; - use rocksdb::{BlockBasedOptions, Options, RocksDB, MergeOperands, new_bloom_filter, Writable }; - use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction; + use rocksdb::{BlockBasedOptions, Options, DB, MergeOperands, new_bloom_filter, Writable }; + use rocksdb::DBCompactionStyle::DBUniversalCompaction; - fn tuned_for_somebody_elses_disk(path: &str, opts: & mut Options, blockopts: &mut BlockBasedOptions) -> RocksDB { + fn tuned_for_somebody_elses_disk(path: &str, opts: & mut Options, blockopts: &mut BlockBasedOptions) -> DB { opts.create_if_missing(true); opts.set_max_open_files(10000); opts.set_use_fsync(false); @@ -158,7 +158,7 @@ mod tests { opts.set_min_write_buffer_number_to_merge(4); opts.set_level_zero_stop_writes_trigger(2000); opts.set_level_zero_slowdown_writes_trigger(0); - opts.set_compaction_style(RocksDBUniversalCompaction); + opts.set_compaction_style(DBUniversalCompaction); opts.set_max_background_compactions(4); opts.set_max_background_flushes(4); opts.set_filter_deletes(false); @@ -169,7 +169,7 @@ mod tests { let filter = new_bloom_filter(10); //opts.set_filter(filter); - RocksDB::open(&opts, path).unwrap() + DB::open(&opts, path).unwrap() } /* TODO(tyler) unstable @@ -204,7 +204,7 @@ mod tests { i += 1; }); } - RocksDB::destroy(&opts, path).is_ok(); + DB::destroy(&opts, path).is_ok(); } */ } diff --git a/src/merge_operator.rs b/src/merge_operator.rs index 6783e49..4495380 100644 --- a/src/merge_operator.rs +++ b/src/merge_operator.rs @@ -21,7 +21,7 @@ use std::ptr; use std::slice; use rocksdb_options::Options; -use rocksdb::{RocksDB, RocksDBResult, RocksDBVector, Writable}; +use rocksdb::{DB, DBResult, DBVector, Writable}; pub struct MergeOperatorCallback { pub name: CString, @@ -187,7 +187,7 @@ fn mergetest() { opts.create_if_missing(true); opts.add_merge_operator("test operator", test_provided_merge); { - let mut db = RocksDB::open(&opts, path).unwrap(); + let mut db = DB::open(&opts, path).unwrap(); let p = db.put(b"k1", b"a"); assert!(p.is_ok()); db.merge(b"k1", b"b"); @@ -207,10 +207,10 @@ fn mergetest() { .on_error( |e| { println!("error reading value")}); //: {", e) }); assert!(m.is_ok()); - let r: RocksDBResult = db.get(b"k1"); + let r: DBResult = db.get(b"k1"); assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh"); assert!(db.delete(b"k1").is_ok()); assert!(db.get(b"k1").is_none()); } - assert!(RocksDB::destroy(&opts, path).is_ok()); + assert!(DB::destroy(&opts, path).is_ok()); } diff --git a/src/rocksdb.rs b/src/rocksdb.rs index 6ae0ab0..20e32b6 100644 --- a/src/rocksdb.rs +++ b/src/rocksdb.rs @@ -15,42 +15,46 @@ */ extern crate libc; -use self::libc::{c_void, size_t}; + +use std::collections::BTreeMap; use std::ffi::{CString, CStr}; use std::fs; +use std::io; use std::ops::Deref; use std::path::Path; use std::slice; use std::str::from_utf8; -use std::marker::PhantomData; -use rocksdb_ffi; +use self::libc::{c_void, size_t}; + +use rocksdb_ffi::{self, DBCFHandle}; use rocksdb_options::Options; -pub struct RocksDB { - inner: rocksdb_ffi::RocksDBInstance, +pub struct DB { + inner: rocksdb_ffi::DBInstance, + cfs: BTreeMap, } -unsafe impl Send for RocksDB {} -unsafe impl Sync for RocksDB {} +unsafe impl Send for DB {} +unsafe impl Sync for DB {} pub struct WriteBatch { - inner: rocksdb_ffi::RocksDBWriteBatch, + inner: rocksdb_ffi::DBWriteBatch, } pub struct ReadOptions { - inner: rocksdb_ffi::RocksDBReadOptions, + inner: rocksdb_ffi::DBReadOptions, } pub struct Snapshot<'a> { - db: &'a RocksDB, - inner: rocksdb_ffi::RocksDBSnapshot, + db: &'a DB, + inner: rocksdb_ffi::DBSnapshot, } pub struct DBIterator { // TODO: should have a reference to DB to enforce scope, but it's trickier than I // thought to add - inner: rocksdb_ffi::RocksDBIterator, + inner: rocksdb_ffi::DBIterator, direction: Direction, just_seeked: bool, } @@ -97,7 +101,7 @@ impl <'a> Iterator for SubDBIterator<'a> { impl DBIterator { //TODO alias db & opts to different lifetimes, and DBIterator to the db's lifetime - fn new(db: &RocksDB, readopts: &ReadOptions) -> DBIterator { + fn new(db: &DB, readopts: &ReadOptions) -> DBIterator { unsafe { let iterator = rocksdb_ffi::rocksdb_create_iterator(db.inner, readopts.inner); rocksdb_ffi::rocksdb_iter_seek_to_first(iterator); @@ -105,6 +109,20 @@ impl DBIterator { } } + fn new_cf(db: &DB, cf_name: &str, readopts: &ReadOptions) -> Result { + let cf = db.cfs.get(cf_name); + if cf.is_none() { + return Err(format!("Invalid column family: {}", cf_name).to_string()); + } + unsafe { + let iterator = rocksdb_ffi::rocksdb_create_iterator_cf(db.inner, + readopts.inner, + *cf.unwrap()); + rocksdb_ffi::rocksdb_iter_seek_to_first(iterator); + Ok(DBIterator{ inner: iterator, direction: Direction::forward, just_seeked: true }) + } + } + pub fn from_start(&mut self) -> SubDBIterator { self.just_seeked = true; unsafe { @@ -139,7 +157,7 @@ impl Drop for DBIterator { } impl <'a> Snapshot<'a> { - pub fn new(db: &RocksDB) -> Snapshot { + pub fn new(db: &DB) -> Snapshot { let snapshot = unsafe { rocksdb_ffi::rocksdb_create_snapshot(db.inner) }; Snapshot { db: db, inner: snapshot } } @@ -159,11 +177,14 @@ impl <'a> Drop for Snapshot<'a> { } } -// This is for the RocksDB and write batches to share the same API +// This is for the DB and write batches to share the same API pub trait Writable { fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String>; + fn put_cf(&self, cf: DBCFHandle, key: &[u8], value: &[u8]) -> Result<(), String>; fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String>; + fn merge_cf(&self, cf: DBCFHandle, key: &[u8], value: &[u8]) -> Result<(), String>; fn delete(&self, key: &[u8]) -> Result<(), String>; + fn delete_cf(&self, cf: DBCFHandle, key: &[u8]) -> Result<(), String>; } fn error_message(ptr: *const i8) -> String { @@ -175,14 +196,18 @@ fn error_message(ptr: *const i8) -> String { s } -impl RocksDB { - pub fn open_default(path: &str) -> Result { +impl DB { + pub fn open_default(path: &str) -> Result { let mut opts = Options::new(); opts.create_if_missing(true); - RocksDB::open(&opts, path) + DB::open(&opts, path) + } + + pub fn open(opts: &Options, path: &str) -> Result { + DB::open_cf(opts, path, &[]) } - pub fn open(opts: &Options, path: &str) -> Result { + pub fn open_cf(opts: &Options, path: &str, cfs: &[&str]) -> Result { let cpath = match CString::new(path.as_bytes()) { Ok(c) => c, Err(_) => @@ -198,20 +223,71 @@ impl RocksDB { let mut err: *const i8 = 0 as *const i8; let err_ptr: *mut *const i8 = &mut err; - let db: rocksdb_ffi::RocksDBInstance; + let db: rocksdb_ffi::DBInstance; + let mut cfMap = BTreeMap::new(); - unsafe { - db = rocksdb_ffi::rocksdb_open(opts.inner, cpath_ptr, err_ptr); + if cfs.len() == 0 { + unsafe { + db = rocksdb_ffi::rocksdb_open(opts.inner, cpath_ptr, err_ptr); + } + } else { + let mut cfs_v = cfs.to_vec(); + // Always open the default column family + if !cfs_v.contains(&"default") { + cfs_v.push("default"); + } + + // We need to store our CStrings in an intermediate vector + // so that their pointers remain valid. + let c_cfs: Vec = cfs_v.iter().map( |cf| { + CString::new(cf.as_bytes()).unwrap() + }).collect(); + + let cfnames: Vec<*const i8> = c_cfs.iter().map( |cf| { + cf.as_ptr() + }).collect(); + + // These handles will be populated by DB. + let mut cfhandles: Vec = + cfs_v.iter().map( |_| { + rocksdb_ffi::DBCFHandle(0 as *mut c_void) + }).collect(); + + // TODO(tyler) allow options to be passed in. + let cfopts: Vec = cfs_v.iter().map( |_| { + unsafe { rocksdb_ffi::rocksdb_options_create() } + }).collect(); + + // Prepare to ship to C. + let copts: *const rocksdb_ffi::DBOptions = cfopts.as_ptr(); + let handles: *const rocksdb_ffi::DBCFHandle = cfhandles.as_ptr(); + let nfam = cfs_v.len(); + unsafe { + db = rocksdb_ffi::rocksdb_open_column_families(opts.inner, cpath_ptr, + nfam as libc::c_int, + cfnames.as_ptr(), + copts, handles, err_ptr); + } + + for handle in cfhandles.iter() { + if handle.0.is_null() { + return Err("Received null column family handle from DB.".to_string()); + } + } + + for (n, h) in cfs_v.iter().zip(cfhandles) { + cfMap.insert(n.to_string(), h); + } } if !err.is_null() { return Err(error_message(err)); } - let rocksdb_ffi::RocksDBInstance(db_ptr) = db; - if db_ptr.is_null() { + if db.0.is_null() { return Err("Could not initialize database.".to_string()); } - Ok(RocksDB { inner: db }) + + Ok(DB { inner: db, cfs: cfMap }) } pub fn destroy(opts: &Options, path: &str) -> Result<(), String> { @@ -260,11 +336,11 @@ impl RocksDB { return Ok(()) } - pub fn get(&self, key: &[u8]) -> RocksDBResult { + pub fn get(&self, key: &[u8]) -> DBResult { unsafe { let readopts = rocksdb_ffi::rocksdb_readoptions_create(); if readopts.0.is_null() { - return RocksDBResult::Error("Unable to create rocksdb read \ + return DBResult::Error("Unable to create rocksdb read \ options. This is a fairly trivial call, and its failure \ may be indicative of a mis-compiled or mis-loaded rocksdb \ library.".to_string()); @@ -278,29 +354,107 @@ impl RocksDB { key.as_ptr(), key.len() as size_t, val_len_ptr, err_ptr) as *mut u8; rocksdb_ffi::rocksdb_readoptions_destroy(readopts); if !err.is_null() { - return RocksDBResult::Error(error_message(err)); + return DBResult::Error(error_message(err)); + } + match val.is_null() { + true => DBResult::None, + false => { + DBResult::Some(DBVector::from_c(val, val_len)) + } + } + } + } + + pub fn get_cf(&self, cf: DBCFHandle, key: &[u8]) -> DBResult { + unsafe { + let readopts = rocksdb_ffi::rocksdb_readoptions_create(); + if readopts.0.is_null() { + return DBResult::Error("Unable to create rocksdb read \ + options. This is a fairly trivial call, and its failure \ + may be indicative of a mis-compiled or mis-loaded rocksdb \ + library.".to_string()); + } + + let val_len: size_t = 0; + let val_len_ptr = &val_len as *const size_t; + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; + let val = rocksdb_ffi::rocksdb_get_cf(self.inner, readopts.clone(), + cf, key.as_ptr(), key.len() as size_t, val_len_ptr, + err_ptr) as *mut u8; + rocksdb_ffi::rocksdb_readoptions_destroy(readopts); + if !err.is_null() { + return DBResult::Error(error_message(err)); } match val.is_null() { - true => RocksDBResult::None, + true => DBResult::None, false => { - RocksDBResult::Some(RocksDBVector::from_c(val, val_len)) + DBResult::Some(DBVector::from_c(val, val_len)) } } } } + pub fn create_cf(&mut self, name: &str, opts: &Options) -> Result { + let cname = match CString::new(name.as_bytes()) { + Ok(c) => c, + Err(_) => + return Err("Failed to convert path to CString when opening rocksdb".to_string()), + }; + let cname_ptr = cname.as_ptr(); + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; + let cf_handler = unsafe { + let cf_handler = rocksdb_ffi::rocksdb_create_column_family( + self.inner, opts.inner, cname_ptr, err_ptr); + self.cfs.insert(name.to_string(), cf_handler); + cf_handler + }; + if !err.is_null() { + return Err(error_message(err)); + } + Ok(cf_handler) + } + + pub fn drop_cf(&mut self, name: &str) -> Result<(), String> { + let cf = self.cfs.get(name); + if cf.is_none() { + return Err(format!("Invalid column family: {}", name).to_string()); + } + + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; + unsafe { + rocksdb_ffi::rocksdb_drop_column_family(self.inner, *cf.unwrap(), err_ptr); + } + if !err.is_null() { + return Err(error_message(err)); + } + + Ok(()) + } + + pub fn cf_handle(&self, name: &str) -> Option<&DBCFHandle> { + self.cfs.get(name) + } + pub fn iterator(&self) -> DBIterator { let opts = ReadOptions::new(); DBIterator::new(&self, &opts) } + pub fn iterator_cf(&self, cf: &str) -> Result { + let opts = ReadOptions::new(); + DBIterator::new_cf(&self, cf, &opts) + } + pub fn snapshot(&self) -> Snapshot { Snapshot::new(self) } } -impl Writable for RocksDB { - fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> { +impl Writable for DB { + fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> { unsafe { let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let mut err: *const i8 = 0 as *const i8; @@ -312,7 +466,23 @@ impl Writable for RocksDB { if !err.is_null() { return Err(error_message(err)); } - return Ok(()) + Ok(()) + } + } + + fn put_cf(&self, cf: DBCFHandle, key: &[u8], value: &[u8]) -> Result<(), String> { + unsafe { + let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; + rocksdb_ffi::rocksdb_put_cf(self.inner, writeopts.clone(), cf, + key.as_ptr(), key.len() as size_t, value.as_ptr(), + value.len() as size_t, err_ptr); + rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); + if !err.is_null() { + return Err(error_message(err)); + } + Ok(()) } } @@ -328,7 +498,24 @@ impl Writable for RocksDB { if !err.is_null() { return Err(error_message(err)); } - return Ok(()) + Ok(()) + } + } + + fn merge_cf(&self, cf: DBCFHandle, key: &[u8], value: &[u8]) -> Result<(), String> { + unsafe { + let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; + rocksdb_ffi::rocksdb_merge_cf(self.inner, writeopts.clone(), + cf, key.as_ptr(), + key.len() as size_t, value.as_ptr(), + value.len() as size_t, err_ptr); + rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); + if !err.is_null() { + return Err(error_message(err)); + } + Ok(()) } } @@ -343,7 +530,23 @@ impl Writable for RocksDB { if !err.is_null() { return Err(error_message(err)); } - return Ok(()) + Ok(()) + } + } + + fn delete_cf(&self, cf: DBCFHandle, key: &[u8]) -> Result<(), String> { + unsafe { + let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; + rocksdb_ffi::rocksdb_delete_cf(self.inner, writeopts.clone(), + cf, key.as_ptr(), + key.len() as size_t, err_ptr); + rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); + if !err.is_null() { + return Err(error_message(err)); + } + Ok(()) } } } @@ -366,9 +569,14 @@ impl Drop for WriteBatch { } } -impl Drop for RocksDB { +impl Drop for DB { fn drop(&mut self) { - unsafe { rocksdb_ffi::rocksdb_close(self.inner); } + unsafe { + for (_, cf) in self.cfs.iter() { + rocksdb_ffi::rocksdb_column_family_handle_destroy(*cf); + } + rocksdb_ffi::rocksdb_close(self.inner); + } } } @@ -378,7 +586,16 @@ impl Writable for WriteBatch { rocksdb_ffi::rocksdb_writebatch_put(self.inner, key.as_ptr(), key.len() as size_t, value.as_ptr(), value.len() as size_t); - return Ok(()) + Ok(()) + } + } + + fn put_cf(&self, cf: DBCFHandle, key: &[u8], value: &[u8]) -> Result<(), String> { + unsafe { + rocksdb_ffi::rocksdb_writebatch_put_cf(self.inner, cf, key.as_ptr(), + key.len() as size_t, value.as_ptr(), + value.len() as size_t); + Ok(()) } } @@ -387,7 +604,16 @@ impl Writable for WriteBatch { rocksdb_ffi::rocksdb_writebatch_merge(self.inner, key.as_ptr(), key.len() as size_t, value.as_ptr(), value.len() as size_t); - return Ok(()) + Ok(()) + } + } + + fn merge_cf(&self, cf: DBCFHandle, key: &[u8], value: &[u8]) -> Result<(), String> { + unsafe { + rocksdb_ffi::rocksdb_writebatch_merge_cf(self.inner, cf, key.as_ptr(), + key.len() as size_t, value.as_ptr(), + value.len() as size_t); + Ok(()) } } @@ -395,7 +621,16 @@ impl Writable for WriteBatch { unsafe { rocksdb_ffi::rocksdb_writebatch_delete(self.inner, key.as_ptr(), key.len() as size_t); - return Ok(()) + Ok(()) + } + } + + fn delete_cf(&self, cf: DBCFHandle, key: &[u8]) -> Result<(), String> { + unsafe { + rocksdb_ffi::rocksdb_writebatch_delete_cf(self.inner, + cf, key.as_ptr(), + key.len() as size_t); + Ok(()) } } } @@ -430,19 +665,19 @@ impl ReadOptions { } } -pub struct RocksDBVector { +pub struct DBVector { base: *mut u8, len: usize, } -impl Deref for RocksDBVector { +impl Deref for DBVector { type Target = [u8]; fn deref(&self) -> &[u8] { unsafe { slice::from_raw_parts(self.base, self.len) } } } -impl Drop for RocksDBVector { +impl Drop for DBVector { fn drop(&mut self) { unsafe { libc::free(self.base as *mut libc::c_void); @@ -450,10 +685,10 @@ impl Drop for RocksDBVector { } } -impl RocksDBVector { - pub fn from_c(val: *mut u8, val_len: size_t) -> RocksDBVector { +impl DBVector { + pub fn from_c(val: *mut u8, val_len: size_t) -> DBVector { unsafe { - RocksDBVector { + DBVector { base: val, len: val_len as usize, } @@ -465,101 +700,100 @@ impl RocksDBVector { } } -// RocksDBResult exists because of the inherent difference between +// DBResult exists because of the inherent difference between // an operational failure and the absence of a possible result. #[derive(Clone, PartialEq, PartialOrd, Eq, Ord, Debug)] -pub enum RocksDBResult { +pub enum DBResult { Some(T), None, Error(E), } -impl RocksDBResult { - pub fn map U>(self, f: F) -> RocksDBResult { +impl DBResult { + pub fn map U>(self, f: F) -> DBResult { match self { - RocksDBResult::Some(x) => RocksDBResult::Some(f(x)), - RocksDBResult::None => RocksDBResult::None, - RocksDBResult::Error(e) => RocksDBResult::Error(e), + DBResult::Some(x) => DBResult::Some(f(x)), + DBResult::None => DBResult::None, + DBResult::Error(e) => DBResult::Error(e), } } pub fn unwrap(self) -> T { match self { - RocksDBResult::Some(x) => x, - RocksDBResult::None => - panic!("Attempted unwrap on RocksDBResult::None"), - RocksDBResult::Error(_) => - panic!("Attempted unwrap on RocksDBResult::Error"), + DBResult::Some(x) => x, + DBResult::None => + panic!("Attempted unwrap on DBResult::None"), + DBResult::Error(_) => + panic!("Attempted unwrap on DBResult::Error"), } } - pub fn on_error U>(self, f: F) -> RocksDBResult { + pub fn on_error U>(self, f: F) -> DBResult { match self { - RocksDBResult::Some(x) => RocksDBResult::Some(x), - RocksDBResult::None => RocksDBResult::None, - RocksDBResult::Error(e) => RocksDBResult::Error(f(e)), + DBResult::Some(x) => DBResult::Some(x), + DBResult::None => DBResult::None, + DBResult::Error(e) => DBResult::Error(f(e)), } } - pub fn on_absent ()>(self, f: F) -> RocksDBResult { + pub fn on_absent ()>(self, f: F) -> DBResult { match self { - RocksDBResult::Some(x) => RocksDBResult::Some(x), - RocksDBResult::None => { + DBResult::Some(x) => DBResult::Some(x), + DBResult::None => { f(); - RocksDBResult::None + DBResult::None }, - RocksDBResult::Error(e) => RocksDBResult::Error(e), + DBResult::Error(e) => DBResult::Error(e), } } pub fn is_some(self) -> bool { match self { - RocksDBResult::Some(_) => true, - RocksDBResult::None => false, - RocksDBResult::Error(_) => false, + DBResult::Some(_) => true, + DBResult::None => false, + DBResult::Error(_) => false, } } pub fn is_none(self) -> bool { match self { - RocksDBResult::Some(_) => false, - RocksDBResult::None => true, - RocksDBResult::Error(_) => false, + DBResult::Some(_) => false, + DBResult::None => true, + DBResult::Error(_) => false, } } pub fn is_error(self) -> bool { match self { - RocksDBResult::Some(_) => false, - RocksDBResult::None => false, - RocksDBResult::Error(_) => true, + DBResult::Some(_) => false, + DBResult::None => false, + DBResult::Error(_) => true, } } } -#[allow(dead_code)] #[test] fn external() { let path = "_rust_rocksdb_externaltest"; { - let mut db = RocksDB::open_default(path).unwrap(); + let mut db = DB::open_default(path).unwrap(); let p = db.put(b"k1", b"v1111"); assert!(p.is_ok()); - let r: RocksDBResult = db.get(b"k1"); + let r: DBResult = db.get(b"k1"); assert!(r.unwrap().to_utf8().unwrap() == "v1111"); assert!(db.delete(b"k1").is_ok()); assert!(db.get(b"k1").is_none()); } let opts = Options::new(); - let result = RocksDB::destroy(&opts, path); + let result = DB::destroy(&opts, path); assert!(result.is_ok()); } #[test] fn errors_do_stuff() { let path = "_rust_rocksdb_error"; - let mut db = RocksDB::open_default(path).unwrap(); + let mut db = DB::open_default(path).unwrap(); let opts = Options::new(); // The DB will still be open when we try to destroy and the lock should fail - match RocksDB::destroy(&opts, path) { + match DB::destroy(&opts, path) { Err(ref s) => assert!(s == "IO error: lock _rust_rocksdb_error/LOCK: No locks available"), Ok(_) => panic!("should fail") } @@ -569,7 +803,7 @@ fn errors_do_stuff() { fn writebatch_works() { let path = "_rust_rocksdb_writebacktest"; { - let mut db = RocksDB::open_default(path).unwrap(); + let mut db = DB::open_default(path).unwrap(); { // test put let mut batch = WriteBatch::new(); assert!(db.get(b"k1").is_none()); @@ -577,7 +811,7 @@ fn writebatch_works() { assert!(db.get(b"k1").is_none()); let p = db.write(batch); assert!(p.is_ok()); - let r: RocksDBResult = db.get(b"k1"); + let r: DBResult = db.get(b"k1"); assert!(r.unwrap().to_utf8().unwrap() == "v1111"); } { // test delete @@ -589,14 +823,14 @@ fn writebatch_works() { } } let opts = Options::new(); - assert!(RocksDB::destroy(&opts, path).is_ok()); + assert!(DB::destroy(&opts, path).is_ok()); } #[test] fn iterator_test() { let path = "_rust_rocksdb_iteratortest"; { - let mut db = RocksDB::open_default(path).unwrap(); + let mut db = DB::open_default(path).unwrap(); let p = db.put(b"k1", b"v1111"); assert!(p.is_ok()); let p = db.put(b"k2", b"v2222"); @@ -609,5 +843,5 @@ fn iterator_test() { } } let opts = Options::new(); - assert!(RocksDB::destroy(&opts, path).is_ok()); + assert!(DB::destroy(&opts, path).is_ok()); } diff --git a/src/rocksdb_options.rs b/src/rocksdb_options.rs index 08f290c..050c805 100644 --- a/src/rocksdb_options.rs +++ b/src/rocksdb_options.rs @@ -24,11 +24,11 @@ use merge_operator::{self, MergeOperatorCallback, MergeOperands, use comparator::{self, ComparatorCallback, compare_callback}; pub struct BlockBasedOptions { - inner: rocksdb_ffi::RocksDBBlockBasedTableOptions, + inner: rocksdb_ffi::DBBlockBasedTableOptions, } pub struct Options { - pub inner: rocksdb_ffi::RocksDBOptions, + pub inner: rocksdb_ffi::DBOptions, } impl Drop for Options { @@ -50,7 +50,7 @@ impl Drop for BlockBasedOptions { impl BlockBasedOptions { pub fn new() -> BlockBasedOptions { let block_opts = unsafe {rocksdb_ffi::rocksdb_block_based_options_create() }; - let rocksdb_ffi::RocksDBBlockBasedTableOptions(opt_ptr) = block_opts; + let rocksdb_ffi::DBBlockBasedTableOptions(opt_ptr) = block_opts; if opt_ptr.is_null() { panic!("Could not create rocksdb block based options".to_string()); } @@ -65,21 +65,21 @@ impl BlockBasedOptions { } //TODO figure out how to create these in a Rusty way - ////pub fn set_filter(&mut self, filter: rocksdb_ffi::RocksDBFilterPolicy) { + ////pub fn set_filter(&mut self, filter: rocksdb_ffi::DBFilterPolicy) { //// unsafe { //// rocksdb_ffi::rocksdb_block_based_options_set_filter_policy( //// self.inner, filter); //// } ////} - ////pub fn set_cache(&mut self, cache: rocksdb_ffi::RocksDBCache) { + ////pub fn set_cache(&mut self, cache: rocksdb_ffi::DBCache) { //// unsafe { //// rocksdb_ffi::rocksdb_block_based_options_set_block_cache( //// self.inner, cache); //// } ////} - ////pub fn set_cache_compressed(&mut self, cache: rocksdb_ffi::RocksDBCache) { + ////pub fn set_cache_compressed(&mut self, cache: rocksdb_ffi::DBCache) { //// unsafe { //// rocksdb_ffi::rocksdb_block_based_options_set_block_cache_compressed( //// self.inner, cache); @@ -92,7 +92,7 @@ impl Options { pub fn new() -> Options { unsafe { let opts = rocksdb_ffi::rocksdb_options_create(); - let rocksdb_ffi::RocksDBOptions(opt_ptr) = opts; + let rocksdb_ffi::DBOptions(opt_ptr) = opts; if opt_ptr.is_null() { panic!("Could not create rocksdb options".to_string()); } @@ -258,7 +258,7 @@ impl Options { } } - pub fn set_compaction_style(&mut self, style: rocksdb_ffi::RocksDBCompactionStyle) { + pub fn set_compaction_style(&mut self, style: rocksdb_ffi::DBCompactionStyle) { unsafe { rocksdb_ffi::rocksdb_options_set_compaction_style( self.inner, style); diff --git a/test/test.rs b/test/test.rs index 5a6aca5..20a1b69 100644 --- a/test/test.rs +++ b/test/test.rs @@ -2,3 +2,4 @@ extern crate rocksdb; mod test_iterator; mod test_multithreaded; +mod test_column_family; diff --git a/test/test_column_family.rs b/test/test_column_family.rs new file mode 100644 index 0000000..0a20354 --- /dev/null +++ b/test/test_column_family.rs @@ -0,0 +1,133 @@ +/* + Copyright 2014 Tyler Neely + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +use rocksdb::{Options, DB, DBResult, Writable, Direction, MergeOperands}; + +#[test] +pub fn test_column_family() { + let path = "_rust_rocksdb_cftest"; + + // should be able to create column families + { + let mut opts = Options::new(); + opts.create_if_missing(true); + opts.add_merge_operator("test operator", test_provided_merge); + let mut db = DB::open(&opts, path).unwrap(); + let opts = Options::new(); + match db.create_cf("cf1", &opts) { + Ok(_) => println!("cf1 created successfully"), + Err(e) => { + panic!("could not create column family: {}", e); + }, + } + } + + // should fail to open db without specifying same column families + { + let mut opts = Options::new(); + opts.add_merge_operator("test operator", test_provided_merge); + match DB::open(&opts, path) { + Ok(_) => panic!("should not have opened DB successfully without specifying column + families"), + Err(e) => assert!(e.starts_with("Invalid argument: You have to open all column families.")), + } + } + + // should properly open db when specyfing all column families + { + let mut opts = Options::new(); + opts.add_merge_operator("test operator", test_provided_merge); + match DB::open_cf(&opts, path, &["cf1"]) { + Ok(_) => println!("successfully opened db with column family"), + Err(e) => panic!("failed to open db with column family: {}", e), + } + } + // TODO should be able to write, read, merge, batch, and iterate over a cf + { + let mut opts = Options::new(); + opts.add_merge_operator("test operator", test_provided_merge); + let mut db = match DB::open_cf(&opts, path, &["cf1"]) { + Ok(db) => { + println!("successfully opened db with column family"); + db + }, + Err(e) => panic!("failed to open db with column family: {}", e), + }; + let cf1 = *db.cf_handle("cf1").unwrap(); + assert!(db.put_cf(cf1, b"k1", b"v1").is_ok()); + assert!(db.get_cf(cf1, b"k1").unwrap().to_utf8().unwrap() == "v1"); + let p = db.put_cf(cf1, b"k1", b"a"); + assert!(p.is_ok()); + db.merge_cf(cf1, b"k1", b"b"); + db.merge_cf(cf1, b"k1", b"c"); + db.merge_cf(cf1, b"k1", b"d"); + db.merge_cf(cf1, b"k1", b"efg"); + let m = db.merge_cf(cf1, b"k1", b"h"); + println!("m is {:?}", m); + // TODO assert!(m.is_ok()); + db.get(b"k1").map( |value| { + match value.to_utf8() { + Some(v) => + println!("retrieved utf8 value: {}", v), + None => + println!("did not read valid utf-8 out of the db"), + } + }).on_absent( || { println!("value not present!") }) + .on_error( |e| { println!("error reading value")}); //: {", e) }); + + let r = db.get_cf(cf1, b"k1"); + // TODO assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh"); + assert!(db.delete(b"k1").is_ok()); + assert!(db.get(b"k1").is_none()); + } + // TODO should be able to use writebatch ops with a cf + { + } + // TODO should be able to iterate over a cf + { + } + // should b able to drop a cf + { + let mut db = DB::open_cf(&Options::new(), path, &["cf1"]).unwrap(); + match db.drop_cf("cf1") { + Ok(_) => println!("cf1 successfully dropped."), + Err(e) => panic!("failed to drop column family: {}", e), + } + } + + assert!(DB::destroy(&Options::new(), path).is_ok()); +} + +fn test_provided_merge(new_key: &[u8], + existing_val: Option<&[u8]>, + mut operands: &mut MergeOperands) + -> Vec { + let nops = operands.size_hint().0; + let mut result: Vec = Vec::with_capacity(nops); + match existing_val { + Some(v) => { + for e in v { + result.push(*e); + } + }, + None => (), + } + for op in operands { + for e in op { + result.push(*e); + } + } + result +} diff --git a/test/test_iterator.rs b/test/test_iterator.rs index f5cb0c0..6cefa33 100644 --- a/test/test_iterator.rs +++ b/test/test_iterator.rs @@ -1,4 +1,4 @@ -use rocksdb::{Options, RocksDB, Writable, Direction}; +use rocksdb::{Options, DB, Writable, Direction}; use std; fn cba(input: &Box<[u8]>) -> Box<[u8]> { @@ -17,7 +17,7 @@ pub fn test_iterator() { let v2:Box<[u8]> = b"v2222".to_vec().into_boxed_slice(); let v3:Box<[u8]> = b"v3333".to_vec().into_boxed_slice(); let v4:Box<[u8]> = b"v4444".to_vec().into_boxed_slice(); - let mut db = RocksDB::open_default(path).unwrap(); + let mut db = DB::open_default(path).unwrap(); let p = db.put(&*k1, &*v1); assert!(p.is_ok()); let p = db.put(&*k2, &*v2); @@ -109,6 +109,6 @@ pub fn test_iterator() { } } let opts = Options::new(); - assert!(RocksDB::destroy(&opts, path).is_ok()); + assert!(DB::destroy(&opts, path).is_ok()); } diff --git a/test/test_multithreaded.rs b/test/test_multithreaded.rs index 0d240e0..674b29d 100644 --- a/test/test_multithreaded.rs +++ b/test/test_multithreaded.rs @@ -1,48 +1,51 @@ -use rocksdb::{Options, RocksDB, Writable, Direction, RocksDBResult}; +use rocksdb::{Options, DB, Writable, Direction, DBResult}; use std::thread::{self, Builder}; use std::sync::Arc; -const N: usize = 1000_000; +const N: usize = 100_000; #[test] pub fn test_multithreaded() { let path = "_rust_rocksdb_multithreadtest"; - let db = RocksDB::open_default(path).unwrap(); - let db = Arc::new(db); + { + let db = DB::open_default(path).unwrap(); + let db = Arc::new(db); - db.put(b"key", b"value1"); + db.put(b"key", b"value1"); - let db1 = db.clone(); - let j1 = thread::spawn(move|| { - for i in 1..N { - db1.put(b"key", b"value1"); - } - }); + let db1 = db.clone(); + let j1 = thread::spawn(move|| { + for i in 1..N { + db1.put(b"key", b"value1"); + } + }); - let db2 = db.clone(); - let j2 = thread::spawn(move|| { - for i in 1..N { - db2.put(b"key", b"value2"); - } - }); + let db2 = db.clone(); + let j2 = thread::spawn(move|| { + for i in 1..N { + db2.put(b"key", b"value2"); + } + }); - let db3 = db.clone(); - let j3 = thread::spawn(move|| { - for i in 1..N { - match db3.get(b"key") { - RocksDBResult::Some(v) => { - if &v[..] != b"value1" && &v[..] != b"value2" { + let db3 = db.clone(); + let j3 = thread::spawn(move|| { + for i in 1..N { + match db3.get(b"key") { + DBResult::Some(v) => { + if &v[..] != b"value1" && &v[..] != b"value2" { + assert!(false); + } + } + _ => { assert!(false); } } - _ => { - assert!(false); - } } - } - }); + }); - j1.join(); - j2.join(); - j3.join(); + j1.join(); + j2.join(); + j3.join(); + } + assert!(DB::destroy(&Options::new(), path).is_ok()); }