From 729fb6999359ec1f138c97cb3a4d2f652bd5b313 Mon Sep 17 00:00:00 2001 From: Linh Tran Tuan Date: Wed, 15 Jul 2020 20:41:53 +0900 Subject: [PATCH] Add dbpath and env options (#445) --- src/db.rs | 542 ---------------------------------------------- src/db_options.rs | 219 ++++++++++++++++++- src/lib.rs | 2 +- tests/test_db.rs | 391 ++++++++++++++++++++++++++++++++- 4 files changed, 600 insertions(+), 554 deletions(-) diff --git a/src/db.rs b/src/db.rs index 36353da..8840b82 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1288,545 +1288,3 @@ impl fmt::Debug for DB { write!(f, "RocksDB {{ path: {:?} }}", self.path()) } } - -#[test] -fn test_open_for_read_only() { - let path = "_rust_rocksdb_test_open_for_read_only"; - { - let db = DB::open_default(path).unwrap(); - db.put(b"k1", b"v1").unwrap(); - } - { - let opts = Options::default(); - let error_if_log_file_exist = false; - let db = DB::open_for_read_only(&opts, path, error_if_log_file_exist).unwrap(); - assert_eq!(db.get(b"k1").unwrap().unwrap(), b"v1"); - assert!(db.put(b"k2", b"v2").is_err()); - } - let opts = Options::default(); - assert!(DB::destroy(&opts, path).is_ok()); -} - -#[test] -fn test_open_cf_for_read_only() { - let path = "_rust_rocksdb_test_open_cf_for_read_only"; - let cfs = vec!["cf1"]; - { - let mut opts = Options::default(); - opts.create_if_missing(true); - opts.create_missing_column_families(true); - let db = DB::open_cf(&opts, path, cfs.clone()).unwrap(); - let cf1 = db.cf_handle("cf1").unwrap(); - db.put_cf(cf1, b"k1", b"v1").unwrap(); - } - { - let opts = Options::default(); - let error_if_log_file_exist = false; - let db = DB::open_cf_for_read_only(&opts, path, cfs, error_if_log_file_exist).unwrap(); - let cf1 = db.cf_handle("cf1").unwrap(); - assert_eq!(db.get_cf(cf1, b"k1").unwrap().unwrap(), b"v1"); - assert!(db.put_cf(cf1, b"k2", b"v2").is_err()); - } - let opts = Options::default(); - assert!(DB::destroy(&opts, path).is_ok()); -} - -#[test] -fn external() { - let path = "_rust_rocksdb_externaltest"; - { - let db = DB::open_default(path).unwrap(); - let p = db.put(b"k1", b"v1111"); - assert!(p.is_ok()); - let r: Result>, Error> = db.get(b"k1"); - assert_eq!(r.unwrap().unwrap(), b"v1111"); - assert!(db.delete(b"k1").is_ok()); - assert!(db.get(b"k1").unwrap().is_none()); - } - let opts = Options::default(); - let result = DB::destroy(&opts, path); - assert!(result.is_ok()); -} - -#[test] -fn errors_do_stuff() { - let path = "_rust_rocksdb_error"; - { - let _db = DB::open_default(path).unwrap(); - let opts = Options::default(); - // The DB will still be open when we try to destroy it and the lock should fail. - match DB::destroy(&opts, path) { - Err(s) => { - let message = s.to_string(); - assert!(message.find("IO error:").is_some()); - assert!(message.find("_rust_rocksdb_error/LOCK:").is_some()); - } - Ok(_) => panic!("should fail"), - } - } - let opts = Options::default(); - let result = DB::destroy(&opts, path); - assert!(result.is_ok()); -} - -#[test] -fn writebatch_works() { - let path = "_rust_rocksdb_writebacktest"; - { - let db = DB::open_default(path).unwrap(); - { - // test putx - let mut batch = WriteBatch::default(); - assert!(db.get(b"k1").unwrap().is_none()); - assert_eq!(batch.len(), 0); - assert!(batch.is_empty()); - batch.put(b"k1", b"v1111"); - batch.put(b"k2", b"v2222"); - batch.put(b"k3", b"v3333"); - assert_eq!(batch.len(), 3); - assert!(!batch.is_empty()); - assert!(db.get(b"k1").unwrap().is_none()); - let p = db.write(batch); - assert!(p.is_ok()); - let r: Result>, Error> = db.get(b"k1"); - assert_eq!(r.unwrap().unwrap(), b"v1111"); - } - { - // test delete - let mut batch = WriteBatch::default(); - batch.delete(b"k1"); - assert_eq!(batch.len(), 1); - assert!(!batch.is_empty()); - let p = db.write(batch); - assert!(p.is_ok()); - assert!(db.get(b"k1").unwrap().is_none()); - } - { - // test delete_range - let mut batch = WriteBatch::default(); - batch.delete_range(b"k2", b"k4"); - assert_eq!(batch.len(), 1); - assert!(!batch.is_empty()); - let p = db.write(batch); - assert!(p.is_ok()); - assert!(db.get(b"k2").unwrap().is_none()); - assert!(db.get(b"k3").unwrap().is_none()); - } - { - // test size_in_bytes - let mut batch = WriteBatch::default(); - let before = batch.size_in_bytes(); - batch.put(b"k1", b"v1234567890"); - let after = batch.size_in_bytes(); - assert!(before + 10 <= after); - } - } - let opts = Options::default(); - assert!(DB::destroy(&opts, path).is_ok()); -} - -#[test] -fn get_with_cache_and_bulkload_test() { - use crate::{BlockBasedOptions, Cache}; - - let path = "_rust_rocksdb_get_with_cache_and_bulkload_test"; - let log_path = "_rust_rocksdb_log_path_test"; - { - // create options - let mut opts = Options::default(); - opts.create_if_missing(true); - opts.create_missing_column_families(true); - opts.set_wal_bytes_per_sync(8 << 10); // 8KB - opts.set_writable_file_max_buffer_size(512 << 10); // 512KB - opts.set_enable_write_thread_adaptive_yield(true); - opts.set_unordered_write(true); - opts.set_max_subcompactions(2); - opts.set_max_background_jobs(4); - opts.set_use_adaptive_mutex(true); - - // set block based table and cache - let cache = Cache::new_lru_cache(64 << 10).unwrap(); - let mut block_based_opts = BlockBasedOptions::default(); - block_based_opts.set_block_cache(&cache); - block_based_opts.set_cache_index_and_filter_blocks(true); - opts.set_block_based_table_factory(&block_based_opts); - - // open db - let db = DB::open(&opts, path).unwrap(); - - // write a lot - let mut batch = WriteBatch::default(); - for i in 0..1_000 { - batch.put(format!("{:0>4}", i).as_bytes(), b"v"); - } - assert!(db.write(batch).is_ok()); - - // flush memory table to sst, trigger cache usage on `get` - assert!(db.flush().is_ok()); - - // get -> trigger caching - let _ = db.get(b"1"); - assert!(cache.get_usage() > 0); - } - - // bulk loading - { - // create new options - let mut opts = Options::default(); - opts.set_delete_obsolete_files_period_micros(100_000); - opts.prepare_for_bulk_load(); - opts.set_db_log_dir(log_path); - opts.set_max_sequential_skip_in_iterations(16); - - // open db - let db = DB::open(&opts, path).unwrap(); - - // try to get key - let iter = db.iterator(IteratorMode::Start); - for (expected, (k, _)) in iter.enumerate() { - assert_eq!(k.as_ref(), format!("{:0>4}", expected).as_bytes()); - } - } - - let opts = Options::default(); - assert!(DB::destroy(&opts, path).is_ok()); - assert!(DB::destroy(&opts, log_path).is_ok()); -} - -#[test] -fn iterator_test() { - let path = "_rust_rocksdb_iteratortest"; - { - let db = DB::open_default(path).unwrap(); - let p = db.put(b"k1", b"v1111"); - assert!(p.is_ok()); - let p = db.put(b"k2", b"v2222"); - assert!(p.is_ok()); - let p = db.put(b"k3", b"v3333"); - assert!(p.is_ok()); - let iter = db.iterator(IteratorMode::Start); - for (k, v) in iter { - println!( - "Hello {}: {}", - str::from_utf8(&*k).unwrap(), - str::from_utf8(&*v).unwrap() - ); - } - } - let opts = Options::default(); - assert!(DB::destroy(&opts, path).is_ok()); -} - -#[test] -fn iterator_test_past_end() { - let path = "_rust_rocksdb_iteratortest_past_end"; - { - let db = DB::open_default(path).unwrap(); - db.put(b"k1", b"v1111").unwrap(); - let mut iter = db.iterator(IteratorMode::Start); - assert!(iter.next().is_some()); - assert!(iter.next().is_none()); - assert!(iter.next().is_none()); - } - let opts = Options::default(); - DB::destroy(&opts, path).unwrap(); -} - -#[test] -fn iterator_test_upper_bound() { - let path = "_rust_rocksdb_iteratortest_upper_bound"; - { - let db = DB::open_default(path).unwrap(); - db.put(b"k1", b"v1").unwrap(); - db.put(b"k2", b"v2").unwrap(); - db.put(b"k3", b"v3").unwrap(); - db.put(b"k4", b"v4").unwrap(); - db.put(b"k5", b"v5").unwrap(); - - let mut readopts = ReadOptions::default(); - readopts.set_iterate_upper_bound(b"k4".to_vec()); - - let iter = db.iterator_opt(IteratorMode::Start, readopts); - let expected: Vec<_> = vec![(b"k1", b"v1"), (b"k2", b"v2"), (b"k3", b"v3")] - .into_iter() - .map(|(k, v)| (k.to_vec().into_boxed_slice(), v.to_vec().into_boxed_slice())) - .collect(); - assert_eq!(expected, iter.collect::>()); - } - let opts = Options::default(); - DB::destroy(&opts, path).unwrap(); -} - -#[test] -fn iterator_test_lower_bound() { - let path = "_rust_rocksdb_iteratortest_lower_bound"; - { - let db = DB::open_default(path).unwrap(); - db.put(b"k1", b"v1").unwrap(); - db.put(b"k2", b"v2").unwrap(); - db.put(b"k3", b"v3").unwrap(); - db.put(b"k4", b"v4").unwrap(); - db.put(b"k5", b"v5").unwrap(); - - let mut readopts = ReadOptions::default(); - readopts.set_iterate_lower_bound(b"k4".to_vec()); - - let iter = db.iterator_opt(IteratorMode::Start, readopts); - let expected: Vec<_> = vec![(b"k4", b"v4"), (b"k5", b"v5")] - .into_iter() - .map(|(k, v)| (k.to_vec().into_boxed_slice(), v.to_vec().into_boxed_slice())) - .collect(); - assert_eq!(expected, iter.collect::>()); - } - let opts = Options::default(); - DB::destroy(&opts, path).unwrap(); -} - -#[test] -fn prefix_extract_and_iterate_test() { - use crate::SliceTransform; - - let path = "_rust_rocksdb_prefix_extract_and_iterate"; - { - let mut opts = Options::default(); - opts.create_if_missing(true); - opts.create_missing_column_families(true); - opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(2)); - - let db = DB::open(&opts, path).unwrap(); - db.put(b"p1_k1", b"v1").unwrap(); - db.put(b"p2_k2", b"v2").unwrap(); - db.put(b"p1_k3", b"v3").unwrap(); - db.put(b"p1_k4", b"v4").unwrap(); - db.put(b"p2_k5", b"v5").unwrap(); - - let mut readopts = ReadOptions::default(); - readopts.set_prefix_same_as_start(true); - readopts.set_iterate_lower_bound(b"p1".to_vec()); - readopts.set_pin_data(true); - - let iter = db.iterator_opt(IteratorMode::Start, readopts); - let expected: Vec<_> = vec![(b"p1_k1", b"v1"), (b"p1_k3", b"v3"), (b"p1_k4", b"v4")] - .into_iter() - .map(|(k, v)| (k.to_vec().into_boxed_slice(), v.to_vec().into_boxed_slice())) - .collect(); - assert_eq!(expected, iter.collect::>()); - } - let opts = Options::default(); - DB::destroy(&opts, path).unwrap(); -} - -#[test] -fn iterator_test_tailing() { - let path = "_rust_rocksdb_iteratortest_tailing"; - { - let data = [(b"k1", b"v1"), (b"k2", b"v2"), (b"k3", b"v3")]; - let mut ro = ReadOptions::default(); - ro.set_tailing(true); - let db = DB::open_default(path).unwrap(); - - let mut data_iter = data.iter(); - let (k, v) = data_iter.next().unwrap(); - let r = db.put(k, v); - assert!(r.is_ok()); - - let tail_iter = db.iterator_opt(IteratorMode::Start, ro); - for (k, v) in data_iter { - let r = db.put(k, v); - assert!(r.is_ok()); - } - - let mut tot = 0; - for (i, (k, v)) in tail_iter.enumerate() { - assert_eq!( - (k.to_vec(), v.to_vec()), - (data[i].0.to_vec(), data[i].1.to_vec()) - ); - tot += 1; - } - assert_eq!(tot, data.len()); - } - let opts = Options::default(); - assert!(DB::destroy(&opts, path).is_ok()); -} - -#[test] -fn snapshot_test() { - let path = "_rust_rocksdb_snapshottest"; - { - let db = DB::open_default(path).unwrap(); - let p = db.put(b"k1", b"v1111"); - assert!(p.is_ok()); - - let snap = db.snapshot(); - let r: Result>, Error> = snap.get(b"k1"); - assert_eq!(r.unwrap().unwrap(), b"v1111"); - - let p = db.put(b"k2", b"v2222"); - assert!(p.is_ok()); - - assert!(db.get(b"k2").unwrap().is_some()); - assert!(snap.get(b"k2").unwrap().is_none()); - } - let opts = Options::default(); - assert!(DB::destroy(&opts, path).is_ok()); -} - -#[test] -fn set_option_test() { - let path = "_rust_rocksdb_set_optionstest"; - { - let db = DB::open_default(path).unwrap(); - // set an option to valid values - assert!(db - .set_options(&[("disable_auto_compactions", "true")]) - .is_ok()); - assert!(db - .set_options(&[("disable_auto_compactions", "false")]) - .is_ok()); - // invalid names/values should result in an error - assert!(db - .set_options(&[("disable_auto_compactions", "INVALID_VALUE")]) - .is_err()); - assert!(db - .set_options(&[("INVALID_NAME", "INVALID_VALUE")]) - .is_err()); - // option names/values must not contain NULLs - assert!(db - .set_options(&[("disable_auto_compactions", "true\0")]) - .is_err()); - assert!(db - .set_options(&[("disable_auto_compactions\0", "true")]) - .is_err()); - // empty options are not allowed - assert!(db.set_options(&[]).is_err()); - // multiple options can be set in a single API call - let multiple_options = [ - ("paranoid_file_checks", "true"), - ("report_bg_io_stats", "true"), - ]; - db.set_options(&multiple_options).unwrap(); - } - assert!(DB::destroy(&Options::default(), path).is_ok()); -} - -#[test] -fn delete_range_test() { - let path = "_rust_rocksdb_delete_range_test"; - { - let mut opts = Options::default(); - opts.create_if_missing(true); - opts.create_missing_column_families(true); - - let cfs = vec!["cf1"]; - let db = DB::open_cf(&opts, path, cfs).unwrap(); - - let cf1 = db.cf_handle("cf1").unwrap(); - db.put_cf(cf1, b"k1", b"v1").unwrap(); - db.put_cf(cf1, b"k2", b"v2").unwrap(); - db.put_cf(cf1, b"k3", b"v3").unwrap(); - db.put_cf(cf1, b"k4", b"v4").unwrap(); - db.put_cf(cf1, b"k5", b"v5").unwrap(); - - db.delete_range_cf(cf1, b"k2", b"k4").unwrap(); - assert_eq!(db.get_cf(cf1, b"k1").unwrap().unwrap(), b"v1"); - assert_eq!(db.get_cf(cf1, b"k4").unwrap().unwrap(), b"v4"); - assert_eq!(db.get_cf(cf1, b"k5").unwrap().unwrap(), b"v5"); - assert!(db.get_cf(cf1, b"k2").unwrap().is_none()); - assert!(db.get_cf(cf1, b"k3").unwrap().is_none()); - } - let opts = Options::default(); - DB::destroy(&opts, path).unwrap(); -} - -#[test] -fn compact_range_test() { - use crate::{ - BottommostLevelCompaction, DBCompactionStyle, UniversalCompactOptions, - UniversalCompactionStopStyle, - }; - - let path = "_rust_rocksdb_compact_range_test"; - { - let mut opts = Options::default(); - opts.create_if_missing(true); - opts.create_missing_column_families(true); - - // set compaction style - let mut uni_co_opts = UniversalCompactOptions::default(); - uni_co_opts.set_size_ratio(2); - uni_co_opts.set_stop_style(UniversalCompactionStopStyle::Total); - opts.set_compaction_style(DBCompactionStyle::Universal); - opts.set_universal_compaction_options(&uni_co_opts); - - // set compaction options - let mut compact_opts = CompactOptions::default(); - compact_opts.set_exclusive_manual_compaction(true); - compact_opts.set_target_level(1); - compact_opts.set_change_level(true); - compact_opts.set_bottommost_level_compaction(BottommostLevelCompaction::ForceOptimized); - - // put and compact column family cf1 - let cfs = vec!["cf1"]; - let db = DB::open_cf(&opts, path, cfs).unwrap(); - let cf1 = db.cf_handle("cf1").unwrap(); - db.put_cf(cf1, b"k1", b"v1").unwrap(); - db.put_cf(cf1, b"k2", b"v2").unwrap(); - db.put_cf(cf1, b"k3", b"v3").unwrap(); - db.put_cf(cf1, b"k4", b"v4").unwrap(); - db.put_cf(cf1, b"k5", b"v5").unwrap(); - db.compact_range_cf(cf1, Some(b"k2"), Some(b"k4")); - db.compact_range_cf_opt(cf1, Some(b"k1"), None::<&str>, &compact_opts); - - // put and compact default column family - db.put(b"k1", b"v1").unwrap(); - db.put(b"k2", b"v2").unwrap(); - db.put(b"k3", b"v3").unwrap(); - db.put(b"k4", b"v4").unwrap(); - db.put(b"k5", b"v5").unwrap(); - db.compact_range(Some(b"k3"), None::<&str>); - db.compact_range_opt(None::<&str>, Some(b"k5"), &compact_opts); - } - let opts = Options::default(); - DB::destroy(&opts, path).unwrap(); -} - -#[test] -fn fifo_compaction_test() { - use crate::{DBCompactionStyle, FifoCompactOptions, PerfContext, PerfMetric}; - - let path = "_rust_rocksdb_fifo_compaction_test"; - { - let mut opts = Options::default(); - opts.create_if_missing(true); - opts.create_missing_column_families(true); - - // set compaction style - let mut fifo_co_opts = FifoCompactOptions::default(); - fifo_co_opts.set_max_table_files_size(4 << 10); // 4KB - opts.set_compaction_style(DBCompactionStyle::Fifo); - opts.set_fifo_compaction_options(&fifo_co_opts); - - // put and compact column family cf1 - let cfs = vec!["cf1"]; - let db = DB::open_cf(&opts, path, cfs).unwrap(); - let cf1 = db.cf_handle("cf1").unwrap(); - db.put_cf(cf1, b"k1", b"v1").unwrap(); - db.put_cf(cf1, b"k2", b"v2").unwrap(); - db.put_cf(cf1, b"k3", b"v3").unwrap(); - db.put_cf(cf1, b"k4", b"v4").unwrap(); - db.put_cf(cf1, b"k5", b"v5").unwrap(); - db.compact_range_cf(cf1, Some(b"k2"), Some(b"k4")); - - // check stats - let ctx = PerfContext::default(); - - let block_cache_hit_count = ctx.metric(PerfMetric::BlockCacheHitCount); - assert!(block_cache_hit_count > 0); - - let expect = format!("block_cache_hit_count = {}", block_cache_hit_count); - assert!(ctx.report(true).contains(&expect)); - } - let opts = Options::default(); - DB::destroy(&opts, path).unwrap(); -} diff --git a/src/db_options.rs b/src/db_options.rs index 64f5b74..52e5c7a 100644 --- a/src/db_options.rs +++ b/src/db_options.rs @@ -74,6 +74,104 @@ impl Drop for Cache { } } +/// An Env is an interface used by the rocksdb implementation to access +/// operating system functionality like the filesystem etc. Callers +/// may wish to provide a custom Env object when opening a database to +/// get fine gain control; e.g., to rate limit file system operations. +/// +/// All Env implementations are safe for concurrent access from +/// multiple threads without any external synchronization. +/// +/// Note: currently, C API behinds C++ API for various settings. +/// See also: `rocksdb/include/env.h` +pub struct Env { + pub(crate) inner: *mut ffi::rocksdb_env_t, +} + +impl Drop for Env { + fn drop(&mut self) { + unsafe { + ffi::rocksdb_env_destroy(self.inner); + } + } +} + +impl Env { + /// Returns default env + pub fn default() -> Result { + let env = unsafe { ffi::rocksdb_create_default_env() }; + if env.is_null() { + Err(Error::new("Could not create mem env".to_owned())) + } else { + Ok(Env { inner: env }) + } + } + + /// Returns a new environment that stores its data in memory and delegates + /// all non-file-storage tasks to base_env. + pub fn mem_env() -> Result { + let env = unsafe { ffi::rocksdb_create_mem_env() }; + if env.is_null() { + Err(Error::new("Could not create mem env".to_owned())) + } else { + Ok(Env { inner: env }) + } + } + + /// Sets the number of background worker threads of a specific thread pool for this environment. + /// `LOW` is the default pool. + /// + /// Default: 1 + pub fn set_background_threads(&mut self, num_threads: c_int) { + unsafe { + ffi::rocksdb_env_set_background_threads(self.inner, num_threads); + } + } + + /// Sets the size of the high priority thread pool that can be used to + /// prevent compactions from stalling memtable flushes. + pub fn set_high_priority_background_threads(&mut self, n: c_int) { + unsafe { + ffi::rocksdb_env_set_high_priority_background_threads(self.inner, n); + } + } + + /// Wait for all threads started by StartThread to terminate. + pub fn join_all_threads(&mut self) { + unsafe { + ffi::rocksdb_env_join_all_threads(self.inner); + } + } + + /// Lowering IO priority for threads from the specified pool. + pub fn lower_thread_pool_io_priority(&mut self) { + unsafe { + ffi::rocksdb_env_lower_thread_pool_io_priority(self.inner); + } + } + + /// Lowering IO priority for high priority thread pool. + pub fn lower_high_priority_thread_pool_io_priority(&mut self) { + unsafe { + ffi::rocksdb_env_lower_high_priority_thread_pool_io_priority(self.inner); + } + } + + /// Lowering CPU priority for threads from the specified pool. + pub fn lower_thread_pool_cpu_priority(&mut self) { + unsafe { + ffi::rocksdb_env_lower_thread_pool_cpu_priority(self.inner); + } + } + + /// Lowering CPU priority for high priority thread pool. + pub fn lower_high_priority_thread_pool_cpu_priority(&mut self) { + unsafe { + ffi::rocksdb_env_lower_high_priority_thread_pool_cpu_priority(self.inner); + } + } +} + /// Database-wide options around performance and behavior. /// /// Please read the official tuning [guide](https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide) @@ -121,7 +219,7 @@ pub struct Options { /// ``` /// use rocksdb::{DB, Options, WriteBatch, WriteOptions}; /// -/// let path = "_path_for_rocksdb_storageY"; +/// let path = "_path_for_rocksdb_storageY1"; /// { /// let db = DB::open_default(path).unwrap(); /// let mut batch = WriteBatch::default(); @@ -150,7 +248,7 @@ pub struct WriteOptions { /// ``` /// use rocksdb::{DB, Options, FlushOptions}; /// -/// let path = "_path_for_rocksdb_storageY"; +/// let path = "_path_for_rocksdb_storageY2"; /// { /// let db = DB::open_default(path).unwrap(); /// @@ -191,11 +289,14 @@ pub struct ReadOptions { /// writer.put(b"k1", b"v1").unwrap(); /// writer.finish().unwrap(); /// -/// let path = "_path_for_rocksdb_storageY"; -/// let db = DB::open_default(&path).unwrap(); -/// let mut ingest_opts = IngestExternalFileOptions::default(); -/// ingest_opts.set_move_files(true); -/// db.ingest_external_file_opts(&ingest_opts, vec!["_path_for_sst_file"]).unwrap(); +/// let path = "_path_for_rocksdb_storageY3"; +/// { +/// let db = DB::open_default(&path).unwrap(); +/// let mut ingest_opts = IngestExternalFileOptions::default(); +/// ingest_opts.set_move_files(true); +/// db.ingest_external_file_opts(&ingest_opts, vec!["_path_for_sst_file"]).unwrap(); +/// } +/// let _ = DB::destroy(&Options::default(), path); /// ``` pub struct IngestExternalFileOptions { pub(crate) inner: *mut ffi::rocksdb_ingestexternalfileoptions_t, @@ -618,6 +719,81 @@ impl Options { } } + /// Specifies whether an error should be raised if the database already exists. + /// + /// Default: false + pub fn set_error_if_exists(&mut self, enabled: bool) { + unsafe { + ffi::rocksdb_options_set_error_if_exists(self.inner, enabled as c_uchar); + } + } + + /// Enable/disable paranoid checks. + /// + /// If true, the implementation will do aggressive checking of the + /// data it is processing and will stop early if it detects any + /// errors. This may have unforeseen ramifications: for example, a + /// corruption of one DB entry may cause a large number of entries to + /// become unreadable or for the entire DB to become unopenable. + /// If any of the writes to the database fails (Put, Delete, Merge, Write), + /// the database will switch to read-only mode and fail all other + /// Write operations. + /// + /// Default: false + pub fn set_paranoid_checks(&mut self, enabled: bool) { + unsafe { + ffi::rocksdb_options_set_paranoid_checks(self.inner, enabled as c_uchar); + } + } + + /// A list of paths where SST files can be put into, with its target size. + /// Newer data is placed into paths specified earlier in the vector while + /// older data gradually moves to paths specified later in the vector. + /// + /// For example, you have a flash device with 10GB allocated for the DB, + /// as well as a hard drive of 2TB, you should config it to be: + /// [{"/flash_path", 10GB}, {"/hard_drive", 2TB}] + /// + /// The system will try to guarantee data under each path is close to but + /// not larger than the target size. But current and future file sizes used + /// by determining where to place a file are based on best-effort estimation, + /// which means there is a chance that the actual size under the directory + /// is slightly more than target size under some workloads. User should give + /// some buffer room for those cases. + /// + /// If none of the paths has sufficient room to place a file, the file will + /// be placed to the last path anyway, despite to the target size. + /// + /// Placing newer data to earlier paths is also best-efforts. User should + /// expect user files to be placed in higher levels in some extreme cases. + /// + /// If left empty, only one path will be used, which is `path` passed when + /// opening the DB. + /// + /// Default: empty + pub fn set_db_paths(&mut self, paths: &[DBPath]) { + let mut paths: Vec<_> = paths + .iter() + .map(|path| path.inner as *const ffi::rocksdb_dbpath_t) + .collect(); + let num_paths = paths.len(); + unsafe { + ffi::rocksdb_options_set_db_paths(self.inner, paths.as_mut_ptr(), num_paths); + } + } + + /// Use the specified object to interact with the environment, + /// e.g. to read/write files, schedule background work, etc. In the near + /// future, support for doing storage operations such as read/write files + /// through env will be deprecated in favor of file_system. + /// + /// Default: Env::default() + pub fn set_env(&mut self, env: &Env) { + unsafe { + ffi::rocksdb_options_set_env(self.inner, env.inner); + } + } + /// Sets the compression algorithm that will be used for compressing blocks. /// /// Default: `DBCompressionType::Snappy` (`DBCompressionType::None` if @@ -3014,6 +3190,35 @@ impl CompactOptions { } } +/// Represents a path where sst files can be put into +pub struct DBPath { + pub(crate) inner: *mut ffi::rocksdb_dbpath_t, +} + +impl DBPath { + /// Create a new path + pub fn new>(path: P, target_size: u64) -> Result { + let p = CString::new(path.as_ref().to_string_lossy().as_bytes()).unwrap(); + let dbpath = unsafe { ffi::rocksdb_dbpath_create(p.as_ptr(), target_size) }; + if dbpath.is_null() { + Err(Error::new(format!( + "Could not create path for storing sst files at location: {}", + path.as_ref().to_string_lossy() + ))) + } else { + Ok(DBPath { inner: dbpath }) + } + } +} + +impl Drop for DBPath { + fn drop(&mut self) { + unsafe { + ffi::rocksdb_dbpath_destroy(self.inner); + } + } +} + #[cfg(test)] mod tests { use crate::{MemtableFactory, Options}; diff --git a/src/lib.rs b/src/lib.rs index 37a7b10..5d6c017 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -97,7 +97,7 @@ pub use crate::{ db_iterator::{DBIterator, DBRawIterator, DBWALIterator, Direction, IteratorMode}, db_options::{ BlockBasedIndexType, BlockBasedOptions, BottommostLevelCompaction, Cache, CompactOptions, - DBCompactionStyle, DBCompressionType, DBRecoveryMode, DataBlockIndexType, + DBCompactionStyle, DBCompressionType, DBPath, DBRecoveryMode, DataBlockIndexType, Env, FifoCompactOptions, FlushOptions, IngestExternalFileOptions, MemtableFactory, Options, PlainTableFactoryOptions, ReadOptions, UniversalCompactOptions, UniversalCompactionStopStyle, WriteOptions, diff --git a/tests/test_db.rs b/tests/test_db.rs index cae1b85..c97c2e5 100644 --- a/tests/test_db.rs +++ b/tests/test_db.rs @@ -14,7 +14,12 @@ mod util; -use rocksdb::{Error, IteratorMode, Options, Snapshot, WriteBatch, DB}; +use rocksdb::{ + BlockBasedOptions, BottommostLevelCompaction, Cache, CompactOptions, DBCompactionStyle, Env, + Error, FifoCompactOptions, IteratorMode, Options, PerfContext, PerfMetric, ReadOptions, + SliceTransform, Snapshot, UniversalCompactOptions, UniversalCompactionStopStyle, WriteBatch, + DB, +}; use std::sync::Arc; use std::time::Duration; use std::{mem, thread}; @@ -85,10 +90,13 @@ fn writebatch_works() { assert_eq!(batch.len(), 0); assert!(batch.is_empty()); batch.put(b"k1", b"v1111"); - assert_eq!(batch.len(), 1); + batch.put(b"k2", b"v2222"); + batch.put(b"k3", b"v3333"); + assert_eq!(batch.len(), 3); assert!(!batch.is_empty()); assert!(db.get(b"k1").unwrap().is_none()); - assert!(db.write(batch).is_ok()); + let p = db.write(batch); + assert!(p.is_ok()); let r: Result>, Error> = db.get(b"k1"); assert_eq!(r.unwrap().unwrap(), b"v1111"); } @@ -98,9 +106,21 @@ fn writebatch_works() { batch.delete(b"k1"); assert_eq!(batch.len(), 1); assert!(!batch.is_empty()); - assert!(db.write(batch).is_ok()); + let p = db.write(batch); + assert!(p.is_ok()); assert!(db.get(b"k1").unwrap().is_none()); } + { + // test delete_range + let mut batch = WriteBatch::default(); + batch.delete_range(b"k2", b"k4"); + assert_eq!(batch.len(), 1); + assert!(!batch.is_empty()); + let p = db.write(batch); + assert!(p.is_ok()); + assert!(db.get(b"k2").unwrap().is_none()); + assert!(db.get(b"k3").unwrap().is_none()); + } { // test size_in_bytes let mut batch = WriteBatch::default(); @@ -132,6 +152,97 @@ fn iterator_test() { } } +#[test] +fn iterator_test_past_end() { + let path = DBPath::new("_rust_rocksdb_iteratortest_past_end"); + { + let db = DB::open_default(&path).unwrap(); + db.put(b"k1", b"v1111").unwrap(); + let mut iter = db.iterator(IteratorMode::Start); + assert!(iter.next().is_some()); + assert!(iter.next().is_none()); + assert!(iter.next().is_none()); + } +} + +#[test] +fn iterator_test_tailing() { + let path = DBPath::new("_rust_rocksdb_iteratortest_tailing"); + { + let data = [(b"k1", b"v1"), (b"k2", b"v2"), (b"k3", b"v3")]; + let mut ro = ReadOptions::default(); + ro.set_tailing(true); + let db = DB::open_default(&path).unwrap(); + + let mut data_iter = data.iter(); + let (k, v) = data_iter.next().unwrap(); + let r = db.put(k, v); + assert!(r.is_ok()); + + let tail_iter = db.iterator_opt(IteratorMode::Start, ro); + for (k, v) in data_iter { + let r = db.put(k, v); + assert!(r.is_ok()); + } + + let mut tot = 0; + for (i, (k, v)) in tail_iter.enumerate() { + assert_eq!( + (k.to_vec(), v.to_vec()), + (data[i].0.to_vec(), data[i].1.to_vec()) + ); + tot += 1; + } + assert_eq!(tot, data.len()); + } +} + +#[test] +fn iterator_test_upper_bound() { + let path = DBPath::new("_rust_rocksdb_iteratortest_upper_bound"); + { + let db = DB::open_default(&path).unwrap(); + db.put(b"k1", b"v1").unwrap(); + db.put(b"k2", b"v2").unwrap(); + db.put(b"k3", b"v3").unwrap(); + db.put(b"k4", b"v4").unwrap(); + db.put(b"k5", b"v5").unwrap(); + + let mut readopts = ReadOptions::default(); + readopts.set_iterate_upper_bound(b"k4".to_vec()); + + let iter = db.iterator_opt(IteratorMode::Start, readopts); + let expected: Vec<_> = vec![(b"k1", b"v1"), (b"k2", b"v2"), (b"k3", b"v3")] + .into_iter() + .map(|(k, v)| (k.to_vec().into_boxed_slice(), v.to_vec().into_boxed_slice())) + .collect(); + assert_eq!(expected, iter.collect::>()); + } +} + +#[test] +fn iterator_test_lower_bound() { + let path = DBPath::new("_rust_rocksdb_iteratortest_lower_bound"); + { + let db = DB::open_default(&path).unwrap(); + db.put(b"k1", b"v1").unwrap(); + db.put(b"k2", b"v2").unwrap(); + db.put(b"k3", b"v3").unwrap(); + db.put(b"k4", b"v4").unwrap(); + db.put(b"k5", b"v5").unwrap(); + + let mut readopts = ReadOptions::default(); + readopts.set_iterate_lower_bound(b"k4".to_vec()); + + let iter = db.iterator_opt(IteratorMode::Start, readopts); + let expected: Vec<_> = vec![(b"k4", b"v4"), (b"k5", b"v5")] + .into_iter() + .map(|(k, v)| (k.to_vec().into_boxed_slice(), v.to_vec().into_boxed_slice())) + .collect(); + assert_eq!(expected, iter.collect::>()); + } +} + #[test] fn snapshot_test() { let path = DBPath::new("_rust_rocksdb_snapshottest"); @@ -374,3 +485,275 @@ fn test_open_with_ttl() { db.compact_range(None::<&[u8]>, None::<&[u8]>); assert!(db.get(b"key1").unwrap().is_none()); } + +#[test] +fn compact_range_test() { + let path = DBPath::new("_rust_rocksdb_compact_range_test"); + { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + + // set compaction style + let mut uni_co_opts = UniversalCompactOptions::default(); + uni_co_opts.set_size_ratio(2); + uni_co_opts.set_stop_style(UniversalCompactionStopStyle::Total); + opts.set_compaction_style(DBCompactionStyle::Universal); + opts.set_universal_compaction_options(&uni_co_opts); + + // set compaction options + let mut compact_opts = CompactOptions::default(); + compact_opts.set_exclusive_manual_compaction(true); + compact_opts.set_target_level(1); + compact_opts.set_change_level(true); + compact_opts.set_bottommost_level_compaction(BottommostLevelCompaction::ForceOptimized); + + // put and compact column family cf1 + let cfs = vec!["cf1"]; + let db = DB::open_cf(&opts, &path, cfs).unwrap(); + let cf1 = db.cf_handle("cf1").unwrap(); + db.put_cf(cf1, b"k1", b"v1").unwrap(); + db.put_cf(cf1, b"k2", b"v2").unwrap(); + db.put_cf(cf1, b"k3", b"v3").unwrap(); + db.put_cf(cf1, b"k4", b"v4").unwrap(); + db.put_cf(cf1, b"k5", b"v5").unwrap(); + db.compact_range_cf(cf1, Some(b"k2"), Some(b"k4")); + db.compact_range_cf_opt(cf1, Some(b"k1"), None::<&str>, &compact_opts); + + // put and compact default column family + db.put(b"k1", b"v1").unwrap(); + db.put(b"k2", b"v2").unwrap(); + db.put(b"k3", b"v3").unwrap(); + db.put(b"k4", b"v4").unwrap(); + db.put(b"k5", b"v5").unwrap(); + db.compact_range(Some(b"k3"), None::<&str>); + db.compact_range_opt(None::<&str>, Some(b"k5"), &compact_opts); + } +} + +#[test] +fn fifo_compaction_test() { + let path = DBPath::new("_rust_rocksdb_fifo_compaction_test"); + { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + + // set compaction style + let mut fifo_co_opts = FifoCompactOptions::default(); + fifo_co_opts.set_max_table_files_size(4 << 10); // 4KB + opts.set_compaction_style(DBCompactionStyle::Fifo); + opts.set_fifo_compaction_options(&fifo_co_opts); + + // put and compact column family cf1 + let cfs = vec!["cf1"]; + let db = DB::open_cf(&opts, &path, cfs).unwrap(); + let cf1 = db.cf_handle("cf1").unwrap(); + db.put_cf(cf1, b"k1", b"v1").unwrap(); + db.put_cf(cf1, b"k2", b"v2").unwrap(); + db.put_cf(cf1, b"k3", b"v3").unwrap(); + db.put_cf(cf1, b"k4", b"v4").unwrap(); + db.put_cf(cf1, b"k5", b"v5").unwrap(); + db.compact_range_cf(cf1, Some(b"k2"), Some(b"k4")); + + // check stats + let ctx = PerfContext::default(); + + let block_cache_hit_count = ctx.metric(PerfMetric::BlockCacheHitCount); + assert!(block_cache_hit_count > 0); + + let expect = format!("block_cache_hit_count = {}", block_cache_hit_count); + assert!(ctx.report(true).contains(&expect)); + } +} + +#[test] +fn env_and_dbpaths_test() { + let path = DBPath::new("_rust_rocksdb_dbpath_test"); + let path1 = DBPath::new("_rust_rocksdb_dbpath_test_1"); + let path2 = DBPath::new("_rust_rocksdb_dbpath_test_2"); + { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + + let mut env = Env::default().unwrap(); + env.lower_high_priority_thread_pool_cpu_priority(); + opts.set_env(&env); + + let mut paths = Vec::new(); + paths.push(rocksdb::DBPath::new(&path1, 20 << 20).unwrap()); + paths.push(rocksdb::DBPath::new(&path2, 30 << 20).unwrap()); + opts.set_db_paths(&paths); + + let db = DB::open(&opts, &path).unwrap(); + db.put(b"k1", b"v1").unwrap(); + assert_eq!(db.get(b"k1").unwrap().unwrap(), b"v1"); + } +} + +#[test] +fn prefix_extract_and_iterate_test() { + let path = DBPath::new("_rust_rocksdb_prefix_extract_and_iterate"); + { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(2)); + + let db = DB::open(&opts, &path).unwrap(); + db.put(b"p1_k1", b"v1").unwrap(); + db.put(b"p2_k2", b"v2").unwrap(); + db.put(b"p1_k3", b"v3").unwrap(); + db.put(b"p1_k4", b"v4").unwrap(); + db.put(b"p2_k5", b"v5").unwrap(); + + let mut readopts = ReadOptions::default(); + readopts.set_prefix_same_as_start(true); + readopts.set_iterate_lower_bound(b"p1".to_vec()); + readopts.set_pin_data(true); + + let iter = db.iterator_opt(IteratorMode::Start, readopts); + let expected: Vec<_> = vec![(b"p1_k1", b"v1"), (b"p1_k3", b"v3"), (b"p1_k4", b"v4")] + .into_iter() + .map(|(k, v)| (k.to_vec().into_boxed_slice(), v.to_vec().into_boxed_slice())) + .collect(); + assert_eq!(expected, iter.collect::>()); + } +} + +#[test] +fn get_with_cache_and_bulkload_test() { + let path = DBPath::new("_rust_rocksdb_get_with_cache_and_bulkload_test"); + let log_path = DBPath::new("_rust_rocksdb_log_path_test"); + { + // create options + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + opts.set_wal_bytes_per_sync(8 << 10); // 8KB + opts.set_writable_file_max_buffer_size(512 << 10); // 512KB + opts.set_enable_write_thread_adaptive_yield(true); + opts.set_unordered_write(true); + opts.set_max_subcompactions(2); + opts.set_max_background_jobs(4); + opts.set_use_adaptive_mutex(true); + + // set block based table and cache + let cache = Cache::new_lru_cache(64 << 10).unwrap(); + let mut block_based_opts = BlockBasedOptions::default(); + block_based_opts.set_block_cache(&cache); + block_based_opts.set_cache_index_and_filter_blocks(true); + opts.set_block_based_table_factory(&block_based_opts); + + // open db + let db = DB::open(&opts, &path).unwrap(); + + // write a lot + let mut batch = WriteBatch::default(); + for i in 0..1_000 { + batch.put(format!("{:0>4}", i).as_bytes(), b"v"); + } + assert!(db.write(batch).is_ok()); + + // flush memory table to sst, trigger cache usage on `get` + assert!(db.flush().is_ok()); + + // get -> trigger caching + let _ = db.get(b"1"); + assert!(cache.get_usage() > 0); + } + + // bulk loading + { + // create new options + let mut opts = Options::default(); + opts.set_delete_obsolete_files_period_micros(100_000); + opts.prepare_for_bulk_load(); + opts.set_db_log_dir(&log_path); + opts.set_max_sequential_skip_in_iterations(16); + opts.set_paranoid_checks(true); + + // open db + let db = DB::open(&opts, &path).unwrap(); + + // try to get key + let iter = db.iterator(IteratorMode::Start); + for (expected, (k, _)) in iter.enumerate() { + assert_eq!(k.as_ref(), format!("{:0>4}", expected).as_bytes()); + } + } + + // raise error when db exists + { + // create new options + let mut opts = Options::default(); + opts.set_error_if_exists(true); + assert!(DB::open(&opts, &path).is_err()); + } +} + +#[test] +fn test_open_for_read_only() { + let path = DBPath::new("_rust_rocksdb_test_open_for_read_only"); + { + let db = DB::open_default(&path).unwrap(); + db.put(b"k1", b"v1").unwrap(); + } + { + let opts = Options::default(); + let error_if_log_file_exist = false; + let db = DB::open_for_read_only(&opts, &path, error_if_log_file_exist).unwrap(); + assert_eq!(db.get(b"k1").unwrap().unwrap(), b"v1"); + assert!(db.put(b"k2", b"v2").is_err()); + } +} + +#[test] +fn test_open_cf_for_read_only() { + let path = DBPath::new("_rust_rocksdb_test_open_cf_for_read_only"); + let cfs = vec!["cf1"]; + { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + let db = DB::open_cf(&opts, &path, cfs.clone()).unwrap(); + let cf1 = db.cf_handle("cf1").unwrap(); + db.put_cf(cf1, b"k1", b"v1").unwrap(); + } + { + let opts = Options::default(); + let error_if_log_file_exist = false; + let db = DB::open_cf_for_read_only(&opts, &path, cfs, error_if_log_file_exist).unwrap(); + let cf1 = db.cf_handle("cf1").unwrap(); + assert_eq!(db.get_cf(cf1, b"k1").unwrap().unwrap(), b"v1"); + assert!(db.put_cf(cf1, b"k2", b"v2").is_err()); + } +} + +#[test] +fn delete_range_test() { + let path = DBPath::new("_rust_rocksdb_delete_range_test"); + { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + + let cfs = vec!["cf1"]; + let db = DB::open_cf(&opts, &path, cfs).unwrap(); + + let cf1 = db.cf_handle("cf1").unwrap(); + db.put_cf(cf1, b"k1", b"v1").unwrap(); + db.put_cf(cf1, b"k2", b"v2").unwrap(); + db.put_cf(cf1, b"k3", b"v3").unwrap(); + db.put_cf(cf1, b"k4", b"v4").unwrap(); + db.put_cf(cf1, b"k5", b"v5").unwrap(); + + db.delete_range_cf(cf1, b"k2", b"k4").unwrap(); + assert_eq!(db.get_cf(cf1, b"k1").unwrap().unwrap(), b"v1"); + assert_eq!(db.get_cf(cf1, b"k4").unwrap().unwrap(), b"v4"); + assert_eq!(db.get_cf(cf1, b"k5").unwrap().unwrap(), b"v5"); + assert!(db.get_cf(cf1, b"k2").unwrap().is_none()); + assert!(db.get_cf(cf1, b"k3").unwrap().is_none()); + } +}