diff --git a/CHANGELOG.txt b/CHANGELOG.txt index 45d5725..ea2bd2d 100644 --- a/CHANGELOG.txt +++ b/CHANGELOG.txt @@ -1,7 +1,42 @@ Changelog ========= -0.8.2 (2017-12-28 +0.10.0 (2018-03-17) +~~~~~~~~~~~~~~~~~~~~ + * Bump rocksdb to 5.11.3 (spacejam) + + **New features** + + * Link with system rocksdb and snappy libs through envvars (ozkriff) + + **Breaking Changes** + + * Fix reverse iteration from a given key (ongardie) + +0.9.1 (2018-02-10) +~~~~~~~~~~~~~~~~~~~~ + **New features** + + * SliceTransform support (spacejam) + +0.9.0 (2018-02-10) +~~~~~~~~~~~~~~~~~~~~ + **New features** + + * Allow creating iterators over prefixes (glittershark) + + **Breaking Changes** + + * Open cfs with options (garyttierney, rrichardson) + * Non-Associative merge ops (rrichardson) + +0.8.3 (2018-02-10) +~~~~~~~~~~~~~~~~~~~~ + * Bump rocksdb to 5.10.2 (ongardie) + * Add Send marker to Options (iSynaptic) + * Expose advise_random_on_open option (ongardie) + +0.8.2 (2017-12-28) ~~~~~~~~~~~~~~~~~~~~ * Bump rocksdb to 5.7.1 (jquesnelle) @@ -24,33 +59,39 @@ Changelog 0.7 (2017-07-26) ~~~~~~~~~~~~~~~~~~~~ -**Breaking Changes** + **Breaking Changes** + * Bumped rocksdb to 5.4.6 (derekdreery) * Remove `use_direct_writes` now that `use_direct_io_for_flush_and_compaction` exists (derekdreery) -**New features** + **New features** + * ReadOptions is now public (rschmukler) * Implement Clone and AsRef for Error (daboross) * Support for `seek_for_prev` (kaedroho) * Support for DirectIO (kaedroho) -**Internal cleanups** + **Internal cleanups** + * Fixed race condition in tests (debris) * Move tests to the default `tests` directory (vmx) 0.6.1 (2017-03-13) ~~~~~~~~~~~~~~~~~~ -**New features** + **New features** + * Support for raw iterator access (kaedroho) 0.6 (2016-12-18) ~~~~~~~~~~~~~~~~~~~~ **Breaking changes** + * Comparator function now returns an Ordering (alexreg) **New features** + * Compaction filter (tmccombs) * Support for backups (alexreg) @@ -64,9 +105,9 @@ Changelog * All errors changed to use a new `rocksdb::Error` type (kaedroho, alexreg) * Removed `Options.set_filter_deletes` as it was removed in RocksDB (kaedroho) * Renamed `add_merge_operator` to `set_merge_operator` and `add_comparator` to `set_comparator` (kaedroho) - + **New features** - + * Windows support (development by jsgf and arkpar. ported by kaedroho) * The RocksDB library is now built at crate compile-time and statically linked with the resulting binary (development by jsgf and arkpar. ported by kaedroho) * Cleaned up and improved coverage and tests of the ffi module (alexreg) diff --git a/Cargo.toml b/Cargo.toml index 4314f8e..8ebecae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "rocksdb" description = "Rust wrapper for Facebook's RocksDB embeddable database" -version = "0.8.2" +version = "0.10.0" authors = ["Tyler Neely ", "David Greenberg "] license = "Apache-2.0" keywords = ["database", "embedded", "LSM-tree", "persistence"] @@ -19,4 +19,4 @@ valgrind = [] [dependencies] libc = "0.2" -librocksdb-sys = { path = "librocksdb-sys", version = "5.7.1" } +librocksdb-sys = { path = "librocksdb-sys", version = "5.11.3" } diff --git a/LICENSE b/LICENSE index 08f07f5..f047845 100644 --- a/LICENSE +++ b/LICENSE @@ -187,6 +187,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. + Copyright 2014 Tyler Neely Copyright 2015 Tyler Neely Copyright 2016 Tyler Neely Copyright 2017 Tyler Neely diff --git a/librocksdb-sys/Cargo.toml b/librocksdb-sys/Cargo.toml index 6cf468e..d28b889 100644 --- a/librocksdb-sys/Cargo.toml +++ b/librocksdb-sys/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "librocksdb-sys" -version = "5.7.1" +version = "5.11.3" authors = ["Karl Hobley ", "Arkadiy Paronyan "] license = "MIT/Apache-2.0/BSD-3-Clause" description = "Native bindings to librocksdb" @@ -22,6 +22,6 @@ libc = "0.2" const-cstr = "0.2" [build-dependencies] -cc = { version = "1.0", features = ["parallel"] } +cc = { version = "^1.0", features = ["parallel"] } make-cmd = "0.1" bindgen = "0.29" diff --git a/librocksdb-sys/build.rs b/librocksdb-sys/build.rs index 3ac2dcc..1f19287 100644 --- a/librocksdb-sys/build.rs +++ b/librocksdb-sys/build.rs @@ -29,10 +29,7 @@ fn fail_on_empty_directory(name: &str) { } } -fn build_rocksdb() { - println!("cargo:rerun-if-changed=build.rs"); - println!("cargo:rerun-if-changed=rocksdb/"); - +fn bindgen_rocksdb() { let bindings = bindgen::Builder::default() .header("rocksdb/include/rocksdb/c.h") .hide_type("max_align_t") // https://github.com/rust-lang-nursery/rust-bindgen/issues/550 @@ -44,7 +41,9 @@ fn build_rocksdb() { bindings .write_to_file(out_path.join("bindings.rs")) .expect("unable to write rocksdb bindings"); +} +fn build_rocksdb() { let mut config = cc::Build::new(); config.include("rocksdb/include/"); config.include("rocksdb/"); @@ -148,9 +147,32 @@ fn build_snappy() { config.compile("libsnappy.a"); } +fn try_to_find_and_link_lib(lib_name: &str) -> bool { + if let Ok(lib_dir) = env::var(&format!("{}_LIB_DIR", lib_name)) { + println!("cargo:rustc-link-search=native={}", lib_dir); + let mode = match env::var_os(&format!("{}_STATIC", lib_name)) { + Some(_) => "static", + None => "dylib", + }; + println!("cargo:rustc-link-lib={}={}", mode, lib_name.to_lowercase()); + return true; + } + false +} + fn main() { + println!("cargo:rerun-if-changed=build.rs"); + println!("cargo:rerun-if-changed=rocksdb/"); + println!("cargo:rerun-if-changed=snappy/"); + fail_on_empty_directory("rocksdb"); fail_on_empty_directory("snappy"); - build_rocksdb(); - build_snappy(); + bindgen_rocksdb(); + + if !try_to_find_and_link_lib("ROCKSDB") { + build_rocksdb(); + } + if !try_to_find_and_link_lib("SNAPPY") { + build_snappy(); + } } diff --git a/librocksdb-sys/rocksdb b/librocksdb-sys/rocksdb index acf935e..dbd8fa0 160000 --- a/librocksdb-sys/rocksdb +++ b/librocksdb-sys/rocksdb @@ -1 +1 @@ -Subproject commit acf935e40f9d6f4c3d13c7d310def7064c1f1c95 +Subproject commit dbd8fa09b823826dd2a30bc119dad7a6fa9a4c6d diff --git a/librocksdb-sys/rocksdb_lib_sources.txt b/librocksdb-sys/rocksdb_lib_sources.txt index 66a2999..886b530 100644 --- a/librocksdb-sys/rocksdb_lib_sources.txt +++ b/librocksdb-sys/rocksdb_lib_sources.txt @@ -1 +1 @@ -cache/clock_cache.cc cache/lru_cache.cc cache/sharded_cache.cc db/builder.cc db/c.cc db/column_family.cc db/compacted_db_impl.cc db/compaction.cc db/compaction_iterator.cc db/compaction_job.cc db/compaction_picker.cc db/compaction_picker_universal.cc db/convenience.cc db/db_filesnapshot.cc db/db_impl.cc db/db_impl_write.cc db/db_impl_compaction_flush.cc db/db_impl_files.cc db/db_impl_open.cc db/db_impl_debug.cc db/db_impl_experimental.cc db/db_impl_readonly.cc db/db_info_dumper.cc db/db_iter.cc db/dbformat.cc db/event_helpers.cc db/experimental.cc db/external_sst_file_ingestion_job.cc db/file_indexer.cc db/flush_job.cc db/flush_scheduler.cc db/forward_iterator.cc db/internal_stats.cc db/log_reader.cc db/log_writer.cc db/malloc_stats.cc db/managed_iterator.cc db/memtable.cc db/memtable_list.cc db/merge_helper.cc db/merge_operator.cc db/range_del_aggregator.cc db/repair.cc db/snapshot_impl.cc db/table_cache.cc db/table_properties_collector.cc db/transaction_log_impl.cc db/version_builder.cc db/version_edit.cc db/version_set.cc db/wal_manager.cc db/write_batch.cc db/write_batch_base.cc db/write_controller.cc db/write_thread.cc env/env.cc env/env_chroot.cc env/env_encryption.cc env/env_hdfs.cc env/env_posix.cc env/io_posix.cc env/mock_env.cc memtable/alloc_tracker.cc memtable/hash_cuckoo_rep.cc memtable/hash_linklist_rep.cc memtable/hash_skiplist_rep.cc memtable/skiplistrep.cc memtable/vectorrep.cc memtable/write_buffer_manager.cc monitoring/histogram.cc monitoring/histogram_windowing.cc monitoring/instrumented_mutex.cc monitoring/iostats_context.cc monitoring/perf_context.cc monitoring/perf_level.cc monitoring/statistics.cc monitoring/thread_status_impl.cc monitoring/thread_status_updater.cc monitoring/thread_status_updater_debug.cc monitoring/thread_status_util.cc monitoring/thread_status_util_debug.cc options/cf_options.cc options/db_options.cc options/options.cc options/options_helper.cc options/options_parser.cc options/options_sanity_check.cc port/port_posix.cc port/stack_trace.cc table/adaptive_table_factory.cc table/block.cc table/block_based_filter_block.cc table/block_based_table_builder.cc table/block_based_table_factory.cc table/block_based_table_reader.cc table/block_builder.cc table/block_prefix_index.cc table/bloom_block.cc table/cuckoo_table_builder.cc table/cuckoo_table_factory.cc table/cuckoo_table_reader.cc table/flush_block_policy.cc table/format.cc table/full_filter_block.cc table/get_context.cc table/index_builder.cc table/iterator.cc table/merging_iterator.cc table/meta_blocks.cc table/partitioned_filter_block.cc table/persistent_cache_helper.cc table/plain_table_builder.cc table/plain_table_factory.cc table/plain_table_index.cc table/plain_table_key_coding.cc table/plain_table_reader.cc table/sst_file_writer.cc table/table_properties.cc table/two_level_iterator.cc tools/dump/db_dump_tool.cc util/arena.cc util/auto_roll_logger.cc util/bloom.cc util/build_version.cc util/coding.cc util/compaction_job_stats_impl.cc util/comparator.cc util/concurrent_arena.cc util/crc32c.cc util/delete_scheduler.cc util/dynamic_bloom.cc util/event_logger.cc util/file_reader_writer.cc util/file_util.cc util/filename.cc util/filter_policy.cc util/hash.cc util/log_buffer.cc util/murmurhash.cc util/random.cc util/rate_limiter.cc util/slice.cc util/sst_file_manager_impl.cc util/status.cc util/status_message.cc util/string_util.cc util/sync_point.cc util/thread_local.cc util/threadpool_imp.cc util/transaction_test_util.cc util/xxhash.cc utilities/backupable/backupable_db.cc utilities/blob_db/blob_db.cc utilities/blob_db/blob_db_impl.cc utilities/blob_db/blob_file.cc utilities/blob_db/blob_log_reader.cc utilities/blob_db/blob_log_writer.cc utilities/blob_db/blob_log_format.cc utilities/blob_db/ttl_extractor.cc utilities/cassandra/cassandra_compaction_filter.cc utilities/cassandra/format.cc utilities/cassandra/merge_operator.cc utilities/checkpoint/checkpoint_impl.cc utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc utilities/convenience/info_log_finder.cc utilities/date_tiered/date_tiered_db_impl.cc utilities/debug.cc utilities/document/document_db.cc utilities/document/json_document.cc utilities/document/json_document_builder.cc utilities/env_mirror.cc utilities/env_timed.cc utilities/geodb/geodb_impl.cc utilities/leveldb_options/leveldb_options.cc utilities/lua/rocks_lua_compaction_filter.cc utilities/memory/memory_util.cc utilities/merge_operators/max.cc utilities/merge_operators/put.cc utilities/merge_operators/string_append/stringappend.cc utilities/merge_operators/string_append/stringappend2.cc utilities/merge_operators/uint64add.cc utilities/option_change_migration/option_change_migration.cc utilities/options/options_util.cc utilities/persistent_cache/block_cache_tier.cc utilities/persistent_cache/block_cache_tier_file.cc utilities/persistent_cache/block_cache_tier_metadata.cc utilities/persistent_cache/persistent_cache_tier.cc utilities/persistent_cache/volatile_tier_impl.cc utilities/redis/redis_lists.cc utilities/simulator_cache/sim_cache.cc utilities/spatialdb/spatial_db.cc utilities/table_properties_collectors/compact_on_deletion_collector.cc utilities/transactions/optimistic_transaction_db_impl.cc utilities/transactions/optimistic_transaction.cc utilities/transactions/transaction_base.cc utilities/transactions/pessimistic_transaction_db.cc utilities/transactions/transaction_db_mutex_impl.cc utilities/transactions/pessimistic_transaction.cc utilities/transactions/transaction_lock_mgr.cc utilities/transactions/transaction_util.cc utilities/transactions/write_prepared_txn.cc utilities/ttl/db_ttl_impl.cc utilities/write_batch_with_index/write_batch_with_index.cc utilities/write_batch_with_index/write_batch_with_index_internal.cc \ No newline at end of file +cache/clock_cache.cc cache/lru_cache.cc cache/sharded_cache.cc db/builder.cc db/c.cc db/column_family.cc db/compacted_db_impl.cc db/compaction.cc db/compaction_iterator.cc db/compaction_job.cc db/compaction_picker.cc db/compaction_picker_universal.cc db/convenience.cc db/db_filesnapshot.cc db/db_impl.cc db/db_impl_compaction_flush.cc db/db_impl_debug.cc db/db_impl_experimental.cc db/db_impl_files.cc db/db_impl_open.cc db/db_impl_readonly.cc db/db_impl_write.cc db/db_info_dumper.cc db/db_iter.cc db/dbformat.cc db/event_helpers.cc db/experimental.cc db/external_sst_file_ingestion_job.cc db/file_indexer.cc db/flush_job.cc db/flush_scheduler.cc db/forward_iterator.cc db/internal_stats.cc db/log_reader.cc db/log_writer.cc db/malloc_stats.cc db/managed_iterator.cc db/memtable.cc db/memtable_list.cc db/merge_helper.cc db/merge_operator.cc db/range_del_aggregator.cc db/repair.cc db/snapshot_impl.cc db/table_cache.cc db/table_properties_collector.cc db/transaction_log_impl.cc db/version_builder.cc db/version_edit.cc db/version_set.cc db/wal_manager.cc db/write_batch.cc db/write_batch_base.cc db/write_controller.cc db/write_thread.cc env/env.cc env/env_chroot.cc env/env_encryption.cc env/env_hdfs.cc env/env_posix.cc env/io_posix.cc env/mock_env.cc memtable/alloc_tracker.cc memtable/hash_cuckoo_rep.cc memtable/hash_linklist_rep.cc memtable/hash_skiplist_rep.cc memtable/skiplistrep.cc memtable/vectorrep.cc memtable/write_buffer_manager.cc monitoring/histogram.cc monitoring/histogram_windowing.cc monitoring/instrumented_mutex.cc monitoring/iostats_context.cc monitoring/perf_context.cc monitoring/perf_level.cc monitoring/statistics.cc monitoring/thread_status_impl.cc monitoring/thread_status_updater.cc monitoring/thread_status_updater_debug.cc monitoring/thread_status_util.cc monitoring/thread_status_util_debug.cc options/cf_options.cc options/db_options.cc options/options.cc options/options_helper.cc options/options_parser.cc options/options_sanity_check.cc port/port_posix.cc port/stack_trace.cc table/adaptive_table_factory.cc table/block.cc table/block_based_filter_block.cc table/block_based_table_builder.cc table/block_based_table_factory.cc table/block_based_table_reader.cc table/block_builder.cc table/block_fetcher.cc table/block_prefix_index.cc table/bloom_block.cc table/cuckoo_table_builder.cc table/cuckoo_table_factory.cc table/cuckoo_table_reader.cc table/flush_block_policy.cc table/format.cc table/full_filter_block.cc table/get_context.cc table/index_builder.cc table/iterator.cc table/merging_iterator.cc table/meta_blocks.cc table/partitioned_filter_block.cc table/persistent_cache_helper.cc table/plain_table_builder.cc table/plain_table_factory.cc table/plain_table_index.cc table/plain_table_key_coding.cc table/plain_table_reader.cc table/sst_file_writer.cc table/table_properties.cc table/two_level_iterator.cc tools/dump/db_dump_tool.cc util/arena.cc util/auto_roll_logger.cc util/bloom.cc util/build_version.cc util/coding.cc util/compaction_job_stats_impl.cc util/comparator.cc util/concurrent_arena.cc util/crc32c.cc util/delete_scheduler.cc util/dynamic_bloom.cc util/event_logger.cc util/file_reader_writer.cc util/file_util.cc util/filename.cc util/filter_policy.cc util/hash.cc util/log_buffer.cc util/murmurhash.cc util/random.cc util/rate_limiter.cc util/slice.cc util/sst_file_manager_impl.cc util/status.cc util/status_message.cc util/string_util.cc util/sync_point.cc util/thread_local.cc util/threadpool_imp.cc util/transaction_test_util.cc util/xxhash.cc utilities/backupable/backupable_db.cc utilities/blob_db/blob_db.cc utilities/blob_db/blob_db_impl.cc utilities/blob_db/blob_file.cc utilities/blob_db/blob_log_format.cc utilities/blob_db/blob_log_reader.cc utilities/blob_db/blob_log_writer.cc utilities/blob_db/ttl_extractor.cc utilities/cassandra/cassandra_compaction_filter.cc utilities/cassandra/format.cc utilities/cassandra/merge_operator.cc utilities/checkpoint/checkpoint_impl.cc utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc utilities/convenience/info_log_finder.cc utilities/date_tiered/date_tiered_db_impl.cc utilities/debug.cc utilities/document/document_db.cc utilities/document/json_document.cc utilities/document/json_document_builder.cc utilities/env_mirror.cc utilities/env_timed.cc utilities/geodb/geodb_impl.cc utilities/leveldb_options/leveldb_options.cc utilities/lua/rocks_lua_compaction_filter.cc utilities/memory/memory_util.cc utilities/merge_operators/max.cc utilities/merge_operators/put.cc utilities/merge_operators/string_append/stringappend.cc utilities/merge_operators/string_append/stringappend2.cc utilities/merge_operators/uint64add.cc utilities/merge_operators/bytesxor.cc utilities/option_change_migration/option_change_migration.cc utilities/options/options_util.cc utilities/persistent_cache/block_cache_tier.cc utilities/persistent_cache/block_cache_tier_file.cc utilities/persistent_cache/block_cache_tier_metadata.cc utilities/persistent_cache/persistent_cache_tier.cc utilities/persistent_cache/volatile_tier_impl.cc utilities/redis/redis_lists.cc utilities/simulator_cache/sim_cache.cc utilities/spatialdb/spatial_db.cc utilities/table_properties_collectors/compact_on_deletion_collector.cc utilities/transactions/optimistic_transaction.cc utilities/transactions/optimistic_transaction_db_impl.cc utilities/transactions/pessimistic_transaction.cc utilities/transactions/pessimistic_transaction_db.cc utilities/transactions/snapshot_checker.cc utilities/transactions/transaction_base.cc utilities/transactions/transaction_db_mutex_impl.cc utilities/transactions/transaction_lock_mgr.cc utilities/transactions/transaction_util.cc utilities/transactions/write_prepared_txn.cc utilities/transactions/write_prepared_txn_db.cc utilities/ttl/db_ttl_impl.cc utilities/write_batch_with_index/write_batch_with_index.cc utilities/write_batch_with_index/write_batch_with_index_internal.cc \ No newline at end of file diff --git a/src/db.rs b/src/db.rs index 256f12f..a26fae9 100644 --- a/src/db.rs +++ b/src/db.rs @@ -14,7 +14,7 @@ // -use {DB, Error, Options, WriteOptions, ColumnFamily}; +use {DB, Error, Options, WriteOptions, ColumnFamily, ColumnFamilyDescriptor}; use ffi; use ffi_util::opt_bytes_to_ptr; @@ -466,10 +466,13 @@ impl DBIterator { self.raw.seek_to_last(); self.direction = Direction::Reverse; } - IteratorMode::From(key, dir) => { - // TODO: Should use seek_for_prev when reversing + IteratorMode::From(key, Direction::Forward) => { self.raw.seek(key); - self.direction = dir; + self.direction = Direction::Forward; + } + IteratorMode::From(key, Direction::Reverse) => { + self.raw.seek_for_prev(key); + self.direction = Direction::Reverse; } }; @@ -572,6 +575,16 @@ impl<'a> Drop for Snapshot<'a> { } } +impl ColumnFamilyDescriptor { + // Create a new column family descriptor with the specified name and options. + pub fn new(name: S, options: Options) -> Self where S: Into { + ColumnFamilyDescriptor { + name: name.into(), + options + } + } +} + impl DB { /// Open a database with default options. pub fn open_default>(path: P) -> Result { @@ -585,14 +598,17 @@ impl DB { DB::open_cf(opts, path, &[]) } - /// Open a database with specified options and column family. - /// - /// A column family must be created first by calling `DB::create_cf`. + /// Open a database with the given database options and column family names. /// - /// # Panics - /// - /// * Panics if the column family doesn't exist. + /// Column families opened using this function will be created with default `Options`. pub fn open_cf>(opts: &Options, path: P, cfs: &[&str]) -> Result { + let cfs_v = cfs.to_vec().iter().map(|name| ColumnFamilyDescriptor::new(*name, Options::default())).collect(); + + DB::open_cf_descriptors(opts, path, cfs_v) + } + + /// Open a database with the given database options and column family names/options. + pub fn open_cf_descriptors>(opts: &Options, path: P, cfs: Vec) -> Result { let path = path.as_ref(); let cpath = match CString::new(path.to_string_lossy().as_bytes()) { Ok(c) => c, @@ -621,17 +637,19 @@ impl DB { db = ffi_try!(ffi::rocksdb_open(opts.inner, cpath.as_ptr() as *const _,)); } } else { - let mut cfs_v = cfs.to_vec(); + let mut cfs_v = cfs; // Always open the default column family. - if !cfs_v.contains(&"default") { - cfs_v.push("default"); + if !cfs_v.iter().any(|cf| cf.name == "default") { + cfs_v.push(ColumnFamilyDescriptor { + name: String::from("default"), + options: Options::default() + }); } - // We need to store our CStrings in an intermediate vector // so that their pointers remain valid. let c_cfs: Vec = cfs_v .iter() - .map(|cf| CString::new(cf.as_bytes()).unwrap()) + .map(|cf| CString::new(cf.name.as_bytes()).unwrap()) .collect(); let mut cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect(); @@ -639,10 +657,8 @@ impl DB { // These handles will be populated by DB. let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect(); - // TODO(tyler) allow options to be passed in. - let mut cfopts: Vec<_> = cfs_v - .iter() - .map(|_| unsafe { ffi::rocksdb_options_create() as *const _ }) + let mut cfopts: Vec<_> = cfs_v.iter() + .map(|cf| cf.options.inner as *const _) .collect(); unsafe { @@ -666,7 +682,7 @@ impl DB { } for (n, h) in cfs_v.iter().zip(cfhandles) { - cf_map.insert(n.to_string(), ColumnFamily { inner: h }); + cf_map.insert(n.name.clone(), ColumnFamily { inner: h }); } } @@ -872,6 +888,12 @@ impl DB { DBIterator::new(self, &opts, mode) } + pub fn prefix_iterator<'a>(&self, prefix: &'a [u8]) -> DBIterator { + let mut opts = ReadOptions::default(); + opts.set_prefix_same_as_start(true); + DBIterator::new(self, &opts, IteratorMode::From(prefix, Direction::Forward)) + } + pub fn iterator_cf( &self, cf_handle: ColumnFamily, @@ -881,6 +903,16 @@ impl DB { DBIterator::new_cf(self, cf_handle, &opts, mode) } + pub fn prefix_iterator_cf<'a>( + &self, + cf_handle: ColumnFamily, + prefix: &'a [u8] + ) -> Result { + let mut opts = ReadOptions::default(); + opts.set_prefix_same_as_start(true); + DBIterator::new_cf(self, cf_handle, &opts, IteratorMode::From(prefix, Direction::Forward)) + } + pub fn raw_iterator(&self) -> DBRawIterator { let opts = ReadOptions::default(); DBRawIterator::new(self, &opts) @@ -1202,6 +1234,18 @@ impl ReadOptions { ); } } + + pub fn set_prefix_same_as_start(&mut self, v: bool) { + unsafe { + ffi::rocksdb_readoptions_set_prefix_same_as_start(self.inner, v as c_uchar) + } + } + + pub fn set_total_order_seek(&mut self, v:bool) { + unsafe { + ffi::rocksdb_readoptions_set_total_order_seek(self.inner, v as c_uchar) + } + } } impl Default for ReadOptions { diff --git a/src/db_options.rs b/src/db_options.rs index 0e13e99..716b6f7 100644 --- a/src/db_options.rs +++ b/src/db_options.rs @@ -11,20 +11,20 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -// +use std::ffi::{CStr, CString}; +use std::mem; + +use libc::{self, c_int, c_uchar, c_uint, c_void, size_t, uint64_t}; -use {BlockBasedOptions, DBCompactionStyle, DBCompressionType, DBRecoveryMode, Options, - WriteOptions}; +use ffi; +use {BlockBasedOptions, DBCompactionStyle, DBCompressionType, DBRecoveryMode, + Options, WriteOptions}; use compaction_filter::{self, CompactionFilterCallback, CompactionFilterFn, filter_callback}; use comparator::{self, ComparatorCallback, CompareFn}; -use ffi; - -use libc::{self, c_int, c_uchar, c_uint, c_void, size_t, uint64_t}; use merge_operator::{self, MergeFn, MergeOperatorCallback, full_merge_callback, partial_merge_callback}; -use std::ffi::{CStr, CString}; -use std::mem; +use slice_transform::SliceTransform; pub fn new_cache(capacity: size_t) -> *mut ffi::rocksdb_cache_t { unsafe { ffi::rocksdb_cache_create_lru(capacity) } @@ -149,6 +149,25 @@ impl Options { } } + /// If true, any column families that didn't exist when opening the database + /// will be created. + /// + /// Default: `false` + /// + /// # Example + /// + /// ``` + /// use rocksdb::Options; + /// + /// let mut opts = Options::default(); + /// opts.create_missing_column_families(true); + /// ``` + pub fn create_missing_column_families(&mut self, create_missing_cfs: bool) { + unsafe { + ffi::rocksdb_options_set_create_missing_column_families(self.inner, create_missing_cfs as c_uchar); + } + } + /// Sets the compression algorithm that will be used for the bottommost level that /// contain files. If level-compaction is used, this option will only affect /// levels after base level. @@ -202,10 +221,14 @@ impl Options { } } - pub fn set_merge_operator(&mut self, name: &str, merge_fn: MergeFn) { + pub fn set_merge_operator(&mut self, name: &str, + full_merge_fn: MergeFn, + partial_merge_fn: Option) { + let cb = Box::new(MergeOperatorCallback { name: CString::new(name.as_bytes()).unwrap(), - merge_fn: merge_fn, + full_merge_fn: full_merge_fn, + partial_merge_fn: partial_merge_fn.unwrap_or(full_merge_fn), }); unsafe { @@ -224,7 +247,7 @@ impl Options { #[deprecated(since = "0.5.0", note = "add_merge_operator has been renamed to set_merge_operator")] pub fn add_merge_operator(&mut self, name: &str, merge_fn: MergeFn) { - self.set_merge_operator(name, merge_fn); + self.set_merge_operator(name, merge_fn, None); } /// Sets a compaction filter used to determine if entries should be kept, changed, @@ -280,6 +303,14 @@ impl Options { } } + pub fn set_prefix_extractor(&mut self, prefix_extractor: SliceTransform) { + unsafe { + ffi::rocksdb_options_set_prefix_extractor( + self.inner, prefix_extractor.inner + ) + } + } + #[deprecated(since = "0.5.0", note = "add_comparator has been renamed to set_comparator")] pub fn add_comparator(&mut self, name: &str, compare_fn: CompareFn) { self.set_comparator(name, compare_fn); @@ -932,6 +963,17 @@ impl Options { } } + /// When set to true, reading SST files will opt out of the filesystem's + /// readahead. Setting this to false may improve sequential iteration + /// performance. + /// + /// Default: `true` + pub fn set_advise_random_on_open(&mut self, advise: bool) { + unsafe { + ffi::rocksdb_options_set_advise_random_on_open(self.inner, advise as c_uchar) + } + } + /// Sets the number of levels for this database. pub fn set_num_levels(&mut self, n: c_int) { unsafe { diff --git a/src/lib.rs b/src/lib.rs index d2f7394..f982702 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,6 +30,21 @@ //! db.delete(b"my key").unwrap(); //! ``` //! +//! Opening a database and a single column family with custom options: +//! +//! ``` +//! use rocksdb::{DB, ColumnFamilyDescriptor, Options}; +//! let mut cf_opts = Options::default(); +//! cf_opts.set_max_write_buffer_number(16); +//! let cf = ColumnFamilyDescriptor::new("cf1", cf_opts); +//! +//! let mut db_opts = Options::default(); +//! db_opts.create_missing_column_families(true); +//! db_opts.create_if_missing(true); +//! +//! let db = DB::open_cf_descriptors(&db_opts, "path/for/rocksdb/storage_with_cfs", vec![cf]).unwrap(); +//! ``` +//! extern crate libc; extern crate librocksdb_sys as ffi; @@ -43,12 +58,15 @@ pub mod merge_operator; pub mod compaction_filter; mod db; mod db_options; +mod slice_transform; pub use compaction_filter::Decision as CompactionDecision; pub use db::{DBCompactionStyle, DBCompressionType, DBIterator, DBRawIterator, DBRecoveryMode, DBVector, ReadOptions, Direction, IteratorMode, Snapshot, WriteBatch, new_bloom_filter}; +pub use slice_transform::SliceTransform; + pub use merge_operator::MergeOperands; use std::collections::BTreeMap; use std::error; @@ -64,6 +82,14 @@ pub struct DB { path: PathBuf, } +/// A descriptor for a RocksDB column family. +/// +/// A description of the column family, containing the name and `Options`. +pub struct ColumnFamilyDescriptor { + name: String, + options: Options, +} + /// A simple wrapper round a string, used for errors reported from /// ffi calls. #[derive(Debug, Clone, PartialEq)] @@ -173,6 +199,7 @@ pub struct WriteOptions { inner: *mut ffi::rocksdb_writeoptions_t, } + /// An opaque type used to represent a column family. Returned from some functions, and used /// in others #[derive(Copy, Clone)] diff --git a/src/merge_operator.rs b/src/merge_operator.rs index 6085a1e..ef86323 100644 --- a/src/merge_operator.rs +++ b/src/merge_operator.rs @@ -21,7 +21,7 @@ //! fn concat_merge(new_key: &[u8], //! existing_val: Option<&[u8]>, //! operands: &mut MergeOperands) -//! -> Vec { +//! -> Option> { //! //! let mut result: Vec = Vec::with_capacity(operands.size_hint().0); //! existing_val.map(|v| { @@ -34,14 +34,14 @@ //! result.push(*e) //! } //! } -//! result +//! Some(result) //! } //! //! fn main() { //! let path = "path/to/rocksdb"; //! let mut opts = Options::default(); //! opts.create_if_missing(true); -//! opts.add_merge_operator("test operator", concat_merge); +//! opts.set_merge_operator("test operator", concat_merge, None); //! let db = DB::open(&opts, path).unwrap(); //! let p = db.put(b"k1", b"a"); //! db.merge(b"k1", b"b"); @@ -60,183 +60,397 @@ use std::mem; use std::ptr; use std::slice; -pub type MergeFn = fn(&[u8], Option<&[u8]>, &mut MergeOperands) -> Vec; +pub type MergeFn = fn(&[u8], Option<&[u8]>, &mut MergeOperands) -> Option>; pub struct MergeOperatorCallback { - pub name: CString, - pub merge_fn: MergeFn, + pub name: CString, + pub full_merge_fn: MergeFn, + pub partial_merge_fn: MergeFn, } pub unsafe extern "C" fn destructor_callback(raw_cb: *mut c_void) { - let _: Box = mem::transmute(raw_cb); + let _: Box = mem::transmute(raw_cb); } pub unsafe extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char { - let cb = &mut *(raw_cb as *mut MergeOperatorCallback); - cb.name.as_ptr() + let cb = &mut *(raw_cb as *mut MergeOperatorCallback); + cb.name.as_ptr() } pub unsafe extern "C" fn full_merge_callback( - raw_cb: *mut c_void, - raw_key: *const c_char, - key_len: size_t, - existing_value: *const c_char, - existing_value_len: size_t, - operands_list: *const *const c_char, - operands_list_len: *const size_t, - num_operands: c_int, - success: *mut u8, - new_value_length: *mut size_t, -) -> *mut c_char { - let cb = &mut *(raw_cb as *mut MergeOperatorCallback); - let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); - let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize); - let oldval = slice::from_raw_parts(existing_value as *const u8, existing_value_len as usize); - let mut result = (cb.merge_fn)(key, Some(oldval), operands); - result.shrink_to_fit(); - // TODO(tan) investigate zero-copy techniques to improve performance - let buf = libc::malloc(result.len() as size_t); - assert!(!buf.is_null()); - *new_value_length = result.len() as size_t; - *success = 1 as u8; - ptr::copy(result.as_ptr() as *mut c_void, &mut *buf, result.len()); - buf as *mut c_char + raw_cb: *mut c_void, + raw_key: *const c_char, + key_len: size_t, + existing_value: *const c_char, + existing_value_len: size_t, + operands_list: *const *const c_char, + operands_list_len: *const size_t, + num_operands: c_int, + success: *mut u8, + new_value_length: *mut size_t, + ) -> *mut c_char { + let cb = &mut *(raw_cb as *mut MergeOperatorCallback); + let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); + let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize); + let oldval = + if existing_value == ptr::null() { + None + } else { + Some(slice::from_raw_parts(existing_value as *const u8, existing_value_len as usize)) + }; + if let Some(mut result) = (cb.full_merge_fn)(key, oldval, operands) { + result.shrink_to_fit(); + // TODO(tan) investigate zero-copy techniques to improve performance + let buf = libc::malloc(result.len() as size_t); + assert!(!buf.is_null()); + *new_value_length = result.len() as size_t; + *success = 1 as u8; + ptr::copy(result.as_ptr() as *mut c_void, &mut *buf, result.len()); + buf as *mut c_char + } else { + *success = 0 as u8; + ptr::null_mut() as *mut c_char + } } pub unsafe extern "C" fn partial_merge_callback( - raw_cb: *mut c_void, - raw_key: *const c_char, - key_len: size_t, - operands_list: *const *const c_char, - operands_list_len: *const size_t, - num_operands: c_int, - success: *mut u8, - new_value_length: *mut size_t, -) -> *mut c_char { - let cb = &mut *(raw_cb as *mut MergeOperatorCallback); - let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); - let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize); - let mut result = (cb.merge_fn)(key, None, operands); - result.shrink_to_fit(); - // TODO(tan) investigate zero-copy techniques to improve performance - let buf = libc::malloc(result.len() as size_t); - assert!(!buf.is_null()); - *new_value_length = result.len() as size_t; - *success = 1 as u8; - ptr::copy(result.as_ptr() as *mut c_void, &mut *buf, result.len()); - buf as *mut c_char + raw_cb: *mut c_void, + raw_key: *const c_char, + key_len: size_t, + operands_list: *const *const c_char, + operands_list_len: *const size_t, + num_operands: c_int, + success: *mut u8, + new_value_length: *mut size_t, + ) -> *mut c_char { + let cb = &mut *(raw_cb as *mut MergeOperatorCallback); + let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); + let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize); + if let Some(mut result) = (cb.partial_merge_fn)(key, None, operands) { + result.shrink_to_fit(); + // TODO(tan) investigate zero-copy techniques to improve performance + let buf = libc::malloc(result.len() as size_t); + assert!(!buf.is_null()); + *new_value_length = result.len() as size_t; + *success = 1 as u8; + ptr::copy(result.as_ptr() as *mut c_void, &mut *buf, result.len()); + buf as *mut c_char + } else { + *success = 0 as u8; + ptr::null_mut::() + } } pub struct MergeOperands { - operands_list: *const *const c_char, - operands_list_len: *const size_t, - num_operands: usize, - cursor: usize, + operands_list: *const *const c_char, + operands_list_len: *const size_t, + num_operands: usize, + cursor: usize, } impl MergeOperands { - fn new( - operands_list: *const *const c_char, - operands_list_len: *const size_t, - num_operands: c_int, - ) -> MergeOperands { - assert!(num_operands >= 0); - MergeOperands { - operands_list: operands_list, - operands_list_len: operands_list_len, - num_operands: num_operands as usize, - cursor: 0, - } - } + fn new( + operands_list: *const *const c_char, + operands_list_len: *const size_t, + num_operands: c_int, + ) -> MergeOperands { + assert!(num_operands >= 0); + MergeOperands { + operands_list: operands_list, + operands_list_len: operands_list_len, + num_operands: num_operands as usize, + cursor: 0, + } + } } impl<'a> Iterator for &'a mut MergeOperands { - type Item = &'a [u8]; - - fn next(&mut self) -> Option<&'a [u8]> { - if self.cursor == self.num_operands { - None - } else { - unsafe { - let base = self.operands_list as usize; - let base_len = self.operands_list_len as usize; - let spacing = mem::size_of::<*const *const u8>(); - let spacing_len = mem::size_of::<*const size_t>(); - let len_ptr = (base_len + (spacing_len * self.cursor)) as *const size_t; - let len = *len_ptr as usize; - let ptr = base + (spacing * self.cursor); - self.cursor += 1; - Some(mem::transmute(slice::from_raw_parts( - *(ptr as *const *const u8) as *const u8, - len, - ))) - } - } - } - - fn size_hint(&self) -> (usize, Option) { - let remaining = self.num_operands - self.cursor; - (remaining, Some(remaining)) - } + type Item = &'a [u8]; + + fn next(&mut self) -> Option<&'a [u8]> { + if self.cursor == self.num_operands { + None + } else { + unsafe { + let base = self.operands_list as usize; + let base_len = self.operands_list_len as usize; + let spacing = mem::size_of::<*const *const u8>(); + let spacing_len = mem::size_of::<*const size_t>(); + let len_ptr = (base_len + (spacing_len * self.cursor)) as *const size_t; + let len = *len_ptr as usize; + let ptr = base + (spacing * self.cursor); + self.cursor += 1; + Some(mem::transmute(slice::from_raw_parts( + *(ptr as *const *const u8) as *const u8, + len, + ))) + } + } + } + + fn size_hint(&self) -> (usize, Option) { + let remaining = self.num_operands - self.cursor; + (remaining, Some(remaining)) + } } #[cfg(test)] -#[allow(unused_variables)] -fn test_provided_merge( - new_key: &[u8], - existing_val: Option<&[u8]>, - operands: &mut MergeOperands, -) -> Vec { - let nops = operands.size_hint().0; - let mut result: Vec = Vec::with_capacity(nops); - if let Some(v) = existing_val { - for e in v { - result.push(*e); - } - } - for op in operands { - for e in op { - result.push(*e); - } - } - result -} +mod test { + + use super::*; + + fn test_provided_merge( + _new_key: &[u8], + existing_val: Option<&[u8]>, + operands: &mut MergeOperands, + ) -> Option> { + let nops = operands.size_hint().0; + let mut result: Vec = Vec::with_capacity(nops); + if let Some(v) = existing_val { + for e in v { + result.push(*e); + } + } + for op in operands { + for e in op { + result.push(*e); + } + } + Some(result) + } #[test] -fn mergetest() { - use {DB, Options}; - - let path = "_rust_rocksdb_mergetest"; - let mut opts = Options::default(); - opts.create_if_missing(true); - opts.set_merge_operator("test operator", test_provided_merge); - { - let db = DB::open(&opts, path).unwrap(); - let p = db.put(b"k1", b"a"); - assert!(p.is_ok()); - let _ = db.merge(b"k1", b"b"); - let _ = db.merge(b"k1", b"c"); - let _ = db.merge(b"k1", b"d"); - let _ = db.merge(b"k1", b"efg"); - let m = db.merge(b"k1", b"h"); - assert!(m.is_ok()); - match db.get(b"k1") { - Ok(Some(value)) => { - match value.to_utf8() { - Some(v) => println!("retrieved utf8 value: {}", v), - None => println!("did not read valid utf-8 out of the db"), - } - } - Err(_) => println!("error reading value"), - _ => panic!("value not present"), - } - - assert!(m.is_ok()); - let r = db.get(b"k1"); - assert!(r.unwrap().unwrap().to_utf8().unwrap() == "abcdefgh"); - assert!(db.delete(b"k1").is_ok()); - assert!(db.get(b"k1").unwrap().is_none()); - } - assert!(DB::destroy(&opts, path).is_ok()); + fn mergetest() { + use {DB, Options}; + + let path = "_rust_rocksdb_mergetest"; + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.set_merge_operator("test operator", test_provided_merge, None); + { + let db = DB::open(&opts, path).unwrap(); + let p = db.put(b"k1", b"a"); + assert!(p.is_ok()); + let _ = db.merge(b"k1", b"b"); + let _ = db.merge(b"k1", b"c"); + let _ = db.merge(b"k1", b"d"); + let _ = db.merge(b"k1", b"efg"); + let m = db.merge(b"k1", b"h"); + assert!(m.is_ok()); + match db.get(b"k1") { + Ok(Some(value)) => { + match value.to_utf8() { + Some(v) => println!("retrieved utf8 value: {}", v), + None => println!("did not read valid utf-8 out of the db"), + } + } + Err(_) => println!("error reading value"), + _ => panic!("value not present"), + } + + assert!(m.is_ok()); + let r = db.get(b"k1"); + assert!(r.unwrap().unwrap().to_utf8().unwrap() == "abcdefgh"); + assert!(db.delete(b"k1").is_ok()); + assert!(db.get(b"k1").unwrap().is_none()); + } + assert!(DB::destroy(&opts, path).is_ok()); + } + + unsafe fn to_slice(p: &T) -> &[u8] { + ::std::slice::from_raw_parts( + (p as *const T) as *const u8, + ::std::mem::size_of::(), + ) + } + + fn from_slice(s: &[u8]) -> Option<&T> { + if ::std::mem::size_of::() != s.len() { + println!("slice {:?} is len {}, but T is size {}", s, s.len(), ::std::mem::size_of::()); + None + } else { + unsafe { + Some(::std::mem::transmute(s.as_ptr())) + } + } + } + +#[repr(packed)] + + #[derive(Clone, Debug)] + struct ValueCounts { + num_a: u32, + num_b: u32, + num_c: u32, + num_d: u32, + } + + fn test_counting_partial_merge( + _new_key: &[u8], + _existing_val: Option<&[u8]>, + operands: &mut MergeOperands, + ) -> Option> { + let nops = operands.size_hint().0; + let mut result: Vec = Vec::with_capacity(nops); + for op in operands { + for e in op { + result.push(*e); + } + } + Some(result) + } + + fn test_counting_full_merge( + _new_key: &[u8], + existing_val: Option<&[u8]>, + operands: &mut MergeOperands, + ) -> Option> { + + let mut counts : ValueCounts = + if let Some(v) = existing_val { + from_slice::(v).unwrap().clone() + } else { + ValueCounts { + num_a: 0, + num_b: 0, + num_c: 0, + num_d: 0 } + }; + + for op in operands { + for e in op { + match *e { + b'a' => counts.num_a += 1, + b'b' => counts.num_b += 1, + b'c' => counts.num_c += 1, + b'd' => counts.num_d += 1, + _ => {} + } + } + } + let slc = unsafe { to_slice(&counts) }; + Some(slc.to_vec()) + } + +#[test] + fn counting_mergetest() { + use std::thread; + use std::sync::Arc; + use {DB, Options, DBCompactionStyle}; + + let path = "_rust_rocksdb_partial_mergetest"; + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.set_compaction_style(DBCompactionStyle::Universal); + opts.set_min_write_buffer_number_to_merge(10); + + opts.set_merge_operator("sort operator", test_counting_full_merge, Some(test_counting_partial_merge)); + { + let db = Arc::new(DB::open(&opts, path).unwrap()); + let _ = db.delete(b"k1"); + let _ = db.delete(b"k2"); + let _ = db.merge(b"k1", b"a"); + let _ = db.merge(b"k1", b"b"); + let _ = db.merge(b"k1", b"d"); + let _ = db.merge(b"k1", b"a"); + let _ = db.merge(b"k1", b"a"); + let _ = db.merge(b"k1", b"efg"); + for i in 0..500 { + let _ = db.merge(b"k2", b"c"); + if i % 20 == 0 { + let _ = db.get(b"k2"); + } + } + for i in 0..500 { + let _ = db.merge(b"k2", b"c"); + if i % 20 == 0 { + let _ = db.get(b"k2"); + } + } + db.compact_range(None, None); + let d1 = db.clone(); + let d2 = db.clone(); + let d3 = db.clone(); + let h1 = thread::spawn(move || { + for i in 0..500 { + let _ = d1.merge(b"k2", b"c"); + if i % 20 == 0 { + let _ = d1.get(b"k2"); + } + } + for i in 0..500 { + let _ = d1.merge(b"k2", b"a"); + if i % 20 == 0 { + let _ = d1.get(b"k2"); + } + } + }); + let h2 = thread::spawn(move || { + for i in 0..500 { + let _ = d2.merge(b"k2", b"b"); + if i % 20 == 0 { + let _ = d2.get(b"k2"); + } + } + for i in 0..500 { + let _ = d2.merge(b"k2", b"d"); + if i % 20 == 0 { + let _ = d2.get(b"k2"); + } + } + d2.compact_range(None, None); + }); + h2.join().unwrap(); + let h3 = thread::spawn(move || { + for i in 0..500 { + let _ = d3.merge(b"k2", b"a"); + if i % 20 == 0 { + let _ = d3.get(b"k2"); + } + } + for i in 0..500 { + let _ = d3.merge(b"k2", b"c"); + if i % 20 == 0 { + let _ = d3.get(b"k2"); + } + } + }); + let m = db.merge(b"k1", b"b"); + assert!(m.is_ok()); + h3.join().unwrap(); + h1.join().unwrap(); + match db.get(b"k2") { + Ok(Some(value)) => { + match from_slice::(&*value) { + Some(v) => { + assert_eq!(v.num_a, 1000); + assert_eq!(v.num_b, 500); + assert_eq!(v.num_c, 2000); + assert_eq!(v.num_d, 500); + }, + None => panic!("Failed to get ValueCounts from db"), + } + } + Err(e) => panic!("error reading value {:?}", e), + _ => panic!("value not present"), + } + match db.get(b"k1") { + Ok(Some(value)) => { + match from_slice::(&*value) { + Some(v) => { + assert_eq!(v.num_a, 3); + assert_eq!(v.num_b, 2); + assert_eq!(v.num_c, 0); + assert_eq!(v.num_d, 1); + }, + None => panic!("Failed to get ValueCounts from db"), + } + } + Err(e) => panic!("error reading value {:?}", e), + _ => panic!("value not present"), + } + } + assert!(DB::destroy(&opts, path).is_ok()); + } } diff --git a/src/slice_transform.rs b/src/slice_transform.rs new file mode 100644 index 0000000..f834be3 --- /dev/null +++ b/src/slice_transform.rs @@ -0,0 +1,149 @@ +// Copyright 2018 Tyler Neely +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ffi::CString; +use std::mem; +use std::ptr; +use std::slice; + +use libc::{self, c_char, c_void, size_t}; + +use ffi; + +/// A SliceTranform is a generic pluggable way of transforming one string +/// to another. Its primary use-case is in configuring rocksdb +/// to store prefix blooms by setting prefix_extractor in +/// ColumnFamilyOptions. +pub struct SliceTransform { + pub inner: *mut ffi::rocksdb_slicetransform_t, +} + +// NB we intentionally don't implement a Drop that passes +// through to rocksdb_slicetransform_destroy because +// this is currently only used (to my knowledge) +// by people passing it as a prefix extractor when +// opening a DB. + +impl SliceTransform { + pub fn create( + name: &str, + transform_fn: TransformFn, + in_domain_fn: Option, + ) -> SliceTransform{ + let cb = Box::new(TransformCallback { + name: CString::new(name.as_bytes()).unwrap(), + transform_fn: transform_fn, + in_domain_fn: in_domain_fn, + }); + + let st = unsafe { + ffi::rocksdb_slicetransform_create( + mem::transmute(cb), + Some(slice_transform_destructor_callback), + Some(transform_callback), + + // this is ugly, but I can't get the compiler + // not to barf with "expected fn pointer, found fn item" + // without this. sorry. + if let Some(_) = in_domain_fn { + Some(in_domain_callback) + } else { + None + }, + + // this None points to the deprecated InRange callback + None, + Some(slice_transform_name_callback), + ) + }; + + SliceTransform { + inner: st + } + } + + pub fn create_fixed_prefix(len: size_t) -> SliceTransform { + SliceTransform { + inner: unsafe { + ffi::rocksdb_slicetransform_create_fixed_prefix(len) + }, + } + } + + pub fn create_noop() -> SliceTransform { + SliceTransform { + inner: unsafe { + ffi::rocksdb_slicetransform_create_noop() + }, + } + } +} + +pub type TransformFn = fn(&[u8]) -> Vec; +pub type InDomainFn = fn(&[u8]) -> bool; + +pub struct TransformCallback { + pub name: CString, + pub transform_fn: TransformFn, + pub in_domain_fn: Option, +} + +pub unsafe extern "C" fn slice_transform_destructor_callback( + raw_cb: *mut c_void +) { + let transform: Box = mem::transmute(raw_cb); + drop(transform); +} + +pub unsafe extern "C" fn slice_transform_name_callback( + raw_cb: *mut c_void +) -> *const c_char { + let cb = &mut *(raw_cb as *mut TransformCallback); + cb.name.as_ptr() +} + +pub unsafe extern "C" fn transform_callback( + raw_cb: *mut c_void, + raw_key: *const c_char, + key_len: size_t, + dst_length: *mut size_t, +) -> *mut c_char { + let cb = &mut *(raw_cb as *mut TransformCallback); + let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize); + let mut result = (cb.transform_fn)(key); + result.shrink_to_fit(); + + // copy the result into a C++ destroyable buffer + let buf = libc::malloc(result.len() as size_t); + assert!(!buf.is_null()); + ptr::copy(result.as_ptr() as *mut c_void, &mut *buf, result.len()); + + *dst_length = result.len() as size_t; + buf as *mut c_char +} + +pub unsafe extern "C" fn in_domain_callback( + raw_cb: *mut c_void, + raw_key: *const c_char, + key_len: size_t, +) -> u8 { + let cb = &mut *(raw_cb as *mut TransformCallback); + let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize); + + if (cb.in_domain_fn.unwrap())(key) { + 1 + } else { + 0 + } +} diff --git a/tests/test_column_family.rs b/tests/test_column_family.rs index 9795b78..97ac54f 100644 --- a/tests/test_column_family.rs +++ b/tests/test_column_family.rs @@ -14,7 +14,7 @@ // extern crate rocksdb; -use rocksdb::{DB, MergeOperands, Options}; +use rocksdb::{DB, MergeOperands, Options, ColumnFamilyDescriptor}; #[test] pub fn test_column_family() { @@ -24,7 +24,7 @@ pub fn test_column_family() { { let mut opts = Options::default(); opts.create_if_missing(true); - opts.set_merge_operator("test operator", test_provided_merge); + opts.set_merge_operator("test operator", test_provided_merge, None); let mut db = DB::open(&opts, path).unwrap(); let opts = Options::default(); match db.create_cf("cf1", &opts) { @@ -38,7 +38,7 @@ pub fn test_column_family() { // should fail to open db without specifying same column families { let mut opts = Options::default(); - opts.set_merge_operator("test operator", test_provided_merge); + opts.set_merge_operator("test operator", test_provided_merge, None); match DB::open(&opts, path) { Ok(_) => { panic!("should not have opened DB successfully without \ @@ -56,7 +56,7 @@ pub fn test_column_family() { // should properly open db when specyfing all column families { let mut opts = Options::default(); - opts.set_merge_operator("test operator", test_provided_merge); + opts.set_merge_operator("test operator", test_provided_merge, None); match DB::open_cf(&opts, path, &["cf1"]) { Ok(_) => println!("successfully opened db with column family"), Err(e) => panic!("failed to open db with column family: {}", e), @@ -91,6 +91,25 @@ pub fn test_column_family() { assert!(DB::destroy(&Options::default(), path).is_ok()); } +#[test] +fn test_create_missing_column_family() { + let path = "_rust_rocksdb_missing_cftest"; + + // should be able to create new column families when opening a new database + { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + + match DB::open_cf(&opts, path, &["cf1"]) { + Ok(_) => println!("successfully created new column family"), + Err(e) => panic!("failed to create new column family: {}", e), + } + } + + assert!(DB::destroy(&Options::default(), path).is_ok()); +} + #[test] #[ignore] fn test_merge_operator() { @@ -98,7 +117,7 @@ fn test_merge_operator() { // TODO should be able to write, read, merge, batch, and iterate over a cf { let mut opts = Options::default(); - opts.set_merge_operator("test operator", test_provided_merge); + opts.set_merge_operator("test operator", test_provided_merge, None); let db = match DB::open_cf(&opts, path, &["cf1"]) { Ok(db) => { println!("successfully opened db with column family"); @@ -140,7 +159,7 @@ fn test_merge_operator() { fn test_provided_merge(_: &[u8], existing_val: Option<&[u8]>, operands: &mut MergeOperands) - -> Vec { + -> Option> { let nops = operands.size_hint().0; let mut result: Vec = Vec::with_capacity(nops); match existing_val { @@ -156,5 +175,45 @@ fn test_provided_merge(_: &[u8], result.push(*e); } } - result + Some(result) } + +#[test] +pub fn test_column_family_with_options() { + let path = "_rust_rocksdb_cf_with_optionstest"; + { + let mut cfopts = Options::default(); + cfopts.set_max_write_buffer_number(16); + let cf_descriptor = ColumnFamilyDescriptor::new("cf1", cfopts); + + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + + let cfs = vec![cf_descriptor]; + match DB::open_cf_descriptors(&opts, path, cfs) { + Ok(_) => println!("created db with column family descriptors succesfully"), + Err(e) => { + panic!("could not create new database with column family descriptors: {}", e); + } + } + } + + { + let mut cfopts = Options::default(); + cfopts.set_max_write_buffer_number(16); + let cf_descriptor = ColumnFamilyDescriptor::new("cf1", cfopts); + + let opts = Options::default(); + let cfs = vec![cf_descriptor]; + + match DB::open_cf_descriptors(&opts, path, cfs) { + Ok(_) => println!("succesfully re-opened database with column family descriptorrs"), + Err(e) => { + panic!("unable to re-open database with column family descriptors: {}", e); + } + } + } + + assert!(DB::destroy(&Options::default(), path).is_ok()); +} \ No newline at end of file diff --git a/tests/test_iterator.rs b/tests/test_iterator.rs index 4b8a5b2..3cb368f 100644 --- a/tests/test_iterator.rs +++ b/tests/test_iterator.rs @@ -122,6 +122,11 @@ pub fn test_iterator() { let expected = vec![(cba(&k2), cba(&v2)), (cba(&k1), cba(&v1))]; assert_eq!(iterator1.collect::>(), expected); } + { + let iterator1 = db.iterator(IteratorMode::From(b"zz", Direction::Reverse)); + let expected = vec![(cba(&k4), cba(&v4)), (cba(&k3), cba(&v3))]; + assert_eq!(iterator1.take(2).collect::>(), expected); + } { let iterator1 = db.iterator(IteratorMode::From(b"k0", Direction::Forward)); assert!(iterator1.valid()); @@ -132,13 +137,13 @@ pub fn test_iterator() { let iterator4 = db.iterator(IteratorMode::From(b"k5", Direction::Forward)); assert!(!iterator4.valid()); let iterator5 = db.iterator(IteratorMode::From(b"k0", Direction::Reverse)); - assert!(iterator5.valid()); + assert!(!iterator5.valid()); let iterator6 = db.iterator(IteratorMode::From(b"k1", Direction::Reverse)); assert!(iterator6.valid()); let iterator7 = db.iterator(IteratorMode::From(b"k11", Direction::Reverse)); assert!(iterator7.valid()); let iterator8 = db.iterator(IteratorMode::From(b"k5", Direction::Reverse)); - assert!(!iterator8.valid()); + assert!(iterator8.valid()); } { let mut iterator1 = db.iterator(IteratorMode::From(b"k4", Direction::Forward)); @@ -151,3 +156,41 @@ pub fn test_iterator() { let opts = Options::default(); assert!(DB::destroy(&opts, path).is_ok()); } + +fn key(k: &[u8]) -> Box<[u8]> { k.to_vec().into_boxed_slice() } + +#[test] +pub fn test_prefix_iterator() { + let path = "_rust_rocksdb_prefixiteratortest"; + { + let a1: Box<[u8]> = key(b"aaa1"); + let a2: Box<[u8]> = key(b"aaa2"); + let b1: Box<[u8]> = key(b"bbb1"); + let b2: Box<[u8]> = key(b"bbb2"); + + let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(3); + + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.set_prefix_extractor(prefix_extractor); + + let db = DB::open(&opts, path).unwrap(); + + assert!(db.put(&*a1, &*a1).is_ok()); + assert!(db.put(&*a2, &*a2).is_ok()); + assert!(db.put(&*b1, &*b1).is_ok()); + assert!(db.put(&*b2, &*b2).is_ok()); + + { + let expected = vec![(cba(&a1), cba(&a1)), (cba(&a2), cba(&a2))]; + let a_iterator = db.prefix_iterator(b"aaa"); + assert_eq!(a_iterator.collect::>(), expected) + } + + { + let expected = vec![(cba(&b1), cba(&b1)), (cba(&b2), cba(&b2))]; + let b_iterator = db.prefix_iterator(b"bbb"); + assert_eq!(b_iterator.collect::>(), expected) + } + } +} diff --git a/tests/test_slice_transform.rs b/tests/test_slice_transform.rs new file mode 100644 index 0000000..832681a --- /dev/null +++ b/tests/test_slice_transform.rs @@ -0,0 +1,48 @@ +extern crate rocksdb; + +use rocksdb::{DB, Options, SliceTransform}; + +#[test] +pub fn test_slice_transform() { + + let path = "_rust_rocksdb_slicetransform_test"; + let a1: Box<[u8]> = key(b"aaa1"); + let a2: Box<[u8]> = key(b"aaa2"); + let b1: Box<[u8]> = key(b"bbb1"); + let b2: Box<[u8]> = key(b"bbb2"); + + fn first_three(k: &[u8]) -> Vec { + k.iter().take(3).cloned().collect() + } + + let prefix_extractor = SliceTransform::create("first_three", first_three, None); + + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.set_prefix_extractor(prefix_extractor); + + let db = DB::open(&opts, path).unwrap(); + + assert!(db.put(&*a1, &*a1).is_ok()); + assert!(db.put(&*a2, &*a2).is_ok()); + assert!(db.put(&*b1, &*b1).is_ok()); + assert!(db.put(&*b2, &*b2).is_ok()); + + fn cba(input: &Box<[u8]>) -> Box<[u8]> { + input.iter().cloned().collect::>().into_boxed_slice() + } + + fn key(k: &[u8]) -> Box<[u8]> { k.to_vec().into_boxed_slice() } + + { + let expected = vec![(cba(&a1), cba(&a1)), (cba(&a2), cba(&a2))]; + let a_iterator = db.prefix_iterator(b"aaa"); + assert_eq!(a_iterator.collect::>(), expected) + } + + { + let expected = vec![(cba(&b1), cba(&b1)), (cba(&b2), cba(&b2))]; + let b_iterator = db.prefix_iterator(b"bbb"); + assert_eq!(b_iterator.collect::>(), expected) + } +}