Add dbpath and env options (#445)

master
Linh Tran Tuan 4 years ago committed by GitHub
parent d6c779c179
commit 729fb69993
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 542
      src/db.rs
  2. 211
      src/db_options.rs
  3. 2
      src/lib.rs
  4. 391
      tests/test_db.rs

@ -1288,545 +1288,3 @@ impl fmt::Debug for DB {
write!(f, "RocksDB {{ path: {:?} }}", self.path())
}
}
#[test]
fn test_open_for_read_only() {
let path = "_rust_rocksdb_test_open_for_read_only";
{
let db = DB::open_default(path).unwrap();
db.put(b"k1", b"v1").unwrap();
}
{
let opts = Options::default();
let error_if_log_file_exist = false;
let db = DB::open_for_read_only(&opts, path, error_if_log_file_exist).unwrap();
assert_eq!(db.get(b"k1").unwrap().unwrap(), b"v1");
assert!(db.put(b"k2", b"v2").is_err());
}
let opts = Options::default();
assert!(DB::destroy(&opts, path).is_ok());
}
#[test]
fn test_open_cf_for_read_only() {
let path = "_rust_rocksdb_test_open_cf_for_read_only";
let cfs = vec!["cf1"];
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
let db = DB::open_cf(&opts, path, cfs.clone()).unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
db.put_cf(cf1, b"k1", b"v1").unwrap();
}
{
let opts = Options::default();
let error_if_log_file_exist = false;
let db = DB::open_cf_for_read_only(&opts, path, cfs, error_if_log_file_exist).unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
assert_eq!(db.get_cf(cf1, b"k1").unwrap().unwrap(), b"v1");
assert!(db.put_cf(cf1, b"k2", b"v2").is_err());
}
let opts = Options::default();
assert!(DB::destroy(&opts, path).is_ok());
}
#[test]
fn external() {
let path = "_rust_rocksdb_externaltest";
{
let db = DB::open_default(path).unwrap();
let p = db.put(b"k1", b"v1111");
assert!(p.is_ok());
let r: Result<Option<Vec<u8>>, Error> = db.get(b"k1");
assert_eq!(r.unwrap().unwrap(), b"v1111");
assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").unwrap().is_none());
}
let opts = Options::default();
let result = DB::destroy(&opts, path);
assert!(result.is_ok());
}
#[test]
fn errors_do_stuff() {
let path = "_rust_rocksdb_error";
{
let _db = DB::open_default(path).unwrap();
let opts = Options::default();
// The DB will still be open when we try to destroy it and the lock should fail.
match DB::destroy(&opts, path) {
Err(s) => {
let message = s.to_string();
assert!(message.find("IO error:").is_some());
assert!(message.find("_rust_rocksdb_error/LOCK:").is_some());
}
Ok(_) => panic!("should fail"),
}
}
let opts = Options::default();
let result = DB::destroy(&opts, path);
assert!(result.is_ok());
}
#[test]
fn writebatch_works() {
let path = "_rust_rocksdb_writebacktest";
{
let db = DB::open_default(path).unwrap();
{
// test putx
let mut batch = WriteBatch::default();
assert!(db.get(b"k1").unwrap().is_none());
assert_eq!(batch.len(), 0);
assert!(batch.is_empty());
batch.put(b"k1", b"v1111");
batch.put(b"k2", b"v2222");
batch.put(b"k3", b"v3333");
assert_eq!(batch.len(), 3);
assert!(!batch.is_empty());
assert!(db.get(b"k1").unwrap().is_none());
let p = db.write(batch);
assert!(p.is_ok());
let r: Result<Option<Vec<u8>>, Error> = db.get(b"k1");
assert_eq!(r.unwrap().unwrap(), b"v1111");
}
{
// test delete
let mut batch = WriteBatch::default();
batch.delete(b"k1");
assert_eq!(batch.len(), 1);
assert!(!batch.is_empty());
let p = db.write(batch);
assert!(p.is_ok());
assert!(db.get(b"k1").unwrap().is_none());
}
{
// test delete_range
let mut batch = WriteBatch::default();
batch.delete_range(b"k2", b"k4");
assert_eq!(batch.len(), 1);
assert!(!batch.is_empty());
let p = db.write(batch);
assert!(p.is_ok());
assert!(db.get(b"k2").unwrap().is_none());
assert!(db.get(b"k3").unwrap().is_none());
}
{
// test size_in_bytes
let mut batch = WriteBatch::default();
let before = batch.size_in_bytes();
batch.put(b"k1", b"v1234567890");
let after = batch.size_in_bytes();
assert!(before + 10 <= after);
}
}
let opts = Options::default();
assert!(DB::destroy(&opts, path).is_ok());
}
#[test]
fn get_with_cache_and_bulkload_test() {
use crate::{BlockBasedOptions, Cache};
let path = "_rust_rocksdb_get_with_cache_and_bulkload_test";
let log_path = "_rust_rocksdb_log_path_test";
{
// create options
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
opts.set_wal_bytes_per_sync(8 << 10); // 8KB
opts.set_writable_file_max_buffer_size(512 << 10); // 512KB
opts.set_enable_write_thread_adaptive_yield(true);
opts.set_unordered_write(true);
opts.set_max_subcompactions(2);
opts.set_max_background_jobs(4);
opts.set_use_adaptive_mutex(true);
// set block based table and cache
let cache = Cache::new_lru_cache(64 << 10).unwrap();
let mut block_based_opts = BlockBasedOptions::default();
block_based_opts.set_block_cache(&cache);
block_based_opts.set_cache_index_and_filter_blocks(true);
opts.set_block_based_table_factory(&block_based_opts);
// open db
let db = DB::open(&opts, path).unwrap();
// write a lot
let mut batch = WriteBatch::default();
for i in 0..1_000 {
batch.put(format!("{:0>4}", i).as_bytes(), b"v");
}
assert!(db.write(batch).is_ok());
// flush memory table to sst, trigger cache usage on `get`
assert!(db.flush().is_ok());
// get -> trigger caching
let _ = db.get(b"1");
assert!(cache.get_usage() > 0);
}
// bulk loading
{
// create new options
let mut opts = Options::default();
opts.set_delete_obsolete_files_period_micros(100_000);
opts.prepare_for_bulk_load();
opts.set_db_log_dir(log_path);
opts.set_max_sequential_skip_in_iterations(16);
// open db
let db = DB::open(&opts, path).unwrap();
// try to get key
let iter = db.iterator(IteratorMode::Start);
for (expected, (k, _)) in iter.enumerate() {
assert_eq!(k.as_ref(), format!("{:0>4}", expected).as_bytes());
}
}
let opts = Options::default();
assert!(DB::destroy(&opts, path).is_ok());
assert!(DB::destroy(&opts, log_path).is_ok());
}
#[test]
fn iterator_test() {
let path = "_rust_rocksdb_iteratortest";
{
let db = DB::open_default(path).unwrap();
let p = db.put(b"k1", b"v1111");
assert!(p.is_ok());
let p = db.put(b"k2", b"v2222");
assert!(p.is_ok());
let p = db.put(b"k3", b"v3333");
assert!(p.is_ok());
let iter = db.iterator(IteratorMode::Start);
for (k, v) in iter {
println!(
"Hello {}: {}",
str::from_utf8(&*k).unwrap(),
str::from_utf8(&*v).unwrap()
);
}
}
let opts = Options::default();
assert!(DB::destroy(&opts, path).is_ok());
}
#[test]
fn iterator_test_past_end() {
let path = "_rust_rocksdb_iteratortest_past_end";
{
let db = DB::open_default(path).unwrap();
db.put(b"k1", b"v1111").unwrap();
let mut iter = db.iterator(IteratorMode::Start);
assert!(iter.next().is_some());
assert!(iter.next().is_none());
assert!(iter.next().is_none());
}
let opts = Options::default();
DB::destroy(&opts, path).unwrap();
}
#[test]
fn iterator_test_upper_bound() {
let path = "_rust_rocksdb_iteratortest_upper_bound";
{
let db = DB::open_default(path).unwrap();
db.put(b"k1", b"v1").unwrap();
db.put(b"k2", b"v2").unwrap();
db.put(b"k3", b"v3").unwrap();
db.put(b"k4", b"v4").unwrap();
db.put(b"k5", b"v5").unwrap();
let mut readopts = ReadOptions::default();
readopts.set_iterate_upper_bound(b"k4".to_vec());
let iter = db.iterator_opt(IteratorMode::Start, readopts);
let expected: Vec<_> = vec![(b"k1", b"v1"), (b"k2", b"v2"), (b"k3", b"v3")]
.into_iter()
.map(|(k, v)| (k.to_vec().into_boxed_slice(), v.to_vec().into_boxed_slice()))
.collect();
assert_eq!(expected, iter.collect::<Vec<_>>());
}
let opts = Options::default();
DB::destroy(&opts, path).unwrap();
}
#[test]
fn iterator_test_lower_bound() {
let path = "_rust_rocksdb_iteratortest_lower_bound";
{
let db = DB::open_default(path).unwrap();
db.put(b"k1", b"v1").unwrap();
db.put(b"k2", b"v2").unwrap();
db.put(b"k3", b"v3").unwrap();
db.put(b"k4", b"v4").unwrap();
db.put(b"k5", b"v5").unwrap();
let mut readopts = ReadOptions::default();
readopts.set_iterate_lower_bound(b"k4".to_vec());
let iter = db.iterator_opt(IteratorMode::Start, readopts);
let expected: Vec<_> = vec![(b"k4", b"v4"), (b"k5", b"v5")]
.into_iter()
.map(|(k, v)| (k.to_vec().into_boxed_slice(), v.to_vec().into_boxed_slice()))
.collect();
assert_eq!(expected, iter.collect::<Vec<_>>());
}
let opts = Options::default();
DB::destroy(&opts, path).unwrap();
}
#[test]
fn prefix_extract_and_iterate_test() {
use crate::SliceTransform;
let path = "_rust_rocksdb_prefix_extract_and_iterate";
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(2));
let db = DB::open(&opts, path).unwrap();
db.put(b"p1_k1", b"v1").unwrap();
db.put(b"p2_k2", b"v2").unwrap();
db.put(b"p1_k3", b"v3").unwrap();
db.put(b"p1_k4", b"v4").unwrap();
db.put(b"p2_k5", b"v5").unwrap();
let mut readopts = ReadOptions::default();
readopts.set_prefix_same_as_start(true);
readopts.set_iterate_lower_bound(b"p1".to_vec());
readopts.set_pin_data(true);
let iter = db.iterator_opt(IteratorMode::Start, readopts);
let expected: Vec<_> = vec![(b"p1_k1", b"v1"), (b"p1_k3", b"v3"), (b"p1_k4", b"v4")]
.into_iter()
.map(|(k, v)| (k.to_vec().into_boxed_slice(), v.to_vec().into_boxed_slice()))
.collect();
assert_eq!(expected, iter.collect::<Vec<_>>());
}
let opts = Options::default();
DB::destroy(&opts, path).unwrap();
}
#[test]
fn iterator_test_tailing() {
let path = "_rust_rocksdb_iteratortest_tailing";
{
let data = [(b"k1", b"v1"), (b"k2", b"v2"), (b"k3", b"v3")];
let mut ro = ReadOptions::default();
ro.set_tailing(true);
let db = DB::open_default(path).unwrap();
let mut data_iter = data.iter();
let (k, v) = data_iter.next().unwrap();
let r = db.put(k, v);
assert!(r.is_ok());
let tail_iter = db.iterator_opt(IteratorMode::Start, ro);
for (k, v) in data_iter {
let r = db.put(k, v);
assert!(r.is_ok());
}
let mut tot = 0;
for (i, (k, v)) in tail_iter.enumerate() {
assert_eq!(
(k.to_vec(), v.to_vec()),
(data[i].0.to_vec(), data[i].1.to_vec())
);
tot += 1;
}
assert_eq!(tot, data.len());
}
let opts = Options::default();
assert!(DB::destroy(&opts, path).is_ok());
}
#[test]
fn snapshot_test() {
let path = "_rust_rocksdb_snapshottest";
{
let db = DB::open_default(path).unwrap();
let p = db.put(b"k1", b"v1111");
assert!(p.is_ok());
let snap = db.snapshot();
let r: Result<Option<Vec<u8>>, Error> = snap.get(b"k1");
assert_eq!(r.unwrap().unwrap(), b"v1111");
let p = db.put(b"k2", b"v2222");
assert!(p.is_ok());
assert!(db.get(b"k2").unwrap().is_some());
assert!(snap.get(b"k2").unwrap().is_none());
}
let opts = Options::default();
assert!(DB::destroy(&opts, path).is_ok());
}
#[test]
fn set_option_test() {
let path = "_rust_rocksdb_set_optionstest";
{
let db = DB::open_default(path).unwrap();
// set an option to valid values
assert!(db
.set_options(&[("disable_auto_compactions", "true")])
.is_ok());
assert!(db
.set_options(&[("disable_auto_compactions", "false")])
.is_ok());
// invalid names/values should result in an error
assert!(db
.set_options(&[("disable_auto_compactions", "INVALID_VALUE")])
.is_err());
assert!(db
.set_options(&[("INVALID_NAME", "INVALID_VALUE")])
.is_err());
// option names/values must not contain NULLs
assert!(db
.set_options(&[("disable_auto_compactions", "true\0")])
.is_err());
assert!(db
.set_options(&[("disable_auto_compactions\0", "true")])
.is_err());
// empty options are not allowed
assert!(db.set_options(&[]).is_err());
// multiple options can be set in a single API call
let multiple_options = [
("paranoid_file_checks", "true"),
("report_bg_io_stats", "true"),
];
db.set_options(&multiple_options).unwrap();
}
assert!(DB::destroy(&Options::default(), path).is_ok());
}
#[test]
fn delete_range_test() {
let path = "_rust_rocksdb_delete_range_test";
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
let cfs = vec!["cf1"];
let db = DB::open_cf(&opts, path, cfs).unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
db.put_cf(cf1, b"k1", b"v1").unwrap();
db.put_cf(cf1, b"k2", b"v2").unwrap();
db.put_cf(cf1, b"k3", b"v3").unwrap();
db.put_cf(cf1, b"k4", b"v4").unwrap();
db.put_cf(cf1, b"k5", b"v5").unwrap();
db.delete_range_cf(cf1, b"k2", b"k4").unwrap();
assert_eq!(db.get_cf(cf1, b"k1").unwrap().unwrap(), b"v1");
assert_eq!(db.get_cf(cf1, b"k4").unwrap().unwrap(), b"v4");
assert_eq!(db.get_cf(cf1, b"k5").unwrap().unwrap(), b"v5");
assert!(db.get_cf(cf1, b"k2").unwrap().is_none());
assert!(db.get_cf(cf1, b"k3").unwrap().is_none());
}
let opts = Options::default();
DB::destroy(&opts, path).unwrap();
}
#[test]
fn compact_range_test() {
use crate::{
BottommostLevelCompaction, DBCompactionStyle, UniversalCompactOptions,
UniversalCompactionStopStyle,
};
let path = "_rust_rocksdb_compact_range_test";
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
// set compaction style
let mut uni_co_opts = UniversalCompactOptions::default();
uni_co_opts.set_size_ratio(2);
uni_co_opts.set_stop_style(UniversalCompactionStopStyle::Total);
opts.set_compaction_style(DBCompactionStyle::Universal);
opts.set_universal_compaction_options(&uni_co_opts);
// set compaction options
let mut compact_opts = CompactOptions::default();
compact_opts.set_exclusive_manual_compaction(true);
compact_opts.set_target_level(1);
compact_opts.set_change_level(true);
compact_opts.set_bottommost_level_compaction(BottommostLevelCompaction::ForceOptimized);
// put and compact column family cf1
let cfs = vec!["cf1"];
let db = DB::open_cf(&opts, path, cfs).unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
db.put_cf(cf1, b"k1", b"v1").unwrap();
db.put_cf(cf1, b"k2", b"v2").unwrap();
db.put_cf(cf1, b"k3", b"v3").unwrap();
db.put_cf(cf1, b"k4", b"v4").unwrap();
db.put_cf(cf1, b"k5", b"v5").unwrap();
db.compact_range_cf(cf1, Some(b"k2"), Some(b"k4"));
db.compact_range_cf_opt(cf1, Some(b"k1"), None::<&str>, &compact_opts);
// put and compact default column family
db.put(b"k1", b"v1").unwrap();
db.put(b"k2", b"v2").unwrap();
db.put(b"k3", b"v3").unwrap();
db.put(b"k4", b"v4").unwrap();
db.put(b"k5", b"v5").unwrap();
db.compact_range(Some(b"k3"), None::<&str>);
db.compact_range_opt(None::<&str>, Some(b"k5"), &compact_opts);
}
let opts = Options::default();
DB::destroy(&opts, path).unwrap();
}
#[test]
fn fifo_compaction_test() {
use crate::{DBCompactionStyle, FifoCompactOptions, PerfContext, PerfMetric};
let path = "_rust_rocksdb_fifo_compaction_test";
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
// set compaction style
let mut fifo_co_opts = FifoCompactOptions::default();
fifo_co_opts.set_max_table_files_size(4 << 10); // 4KB
opts.set_compaction_style(DBCompactionStyle::Fifo);
opts.set_fifo_compaction_options(&fifo_co_opts);
// put and compact column family cf1
let cfs = vec!["cf1"];
let db = DB::open_cf(&opts, path, cfs).unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
db.put_cf(cf1, b"k1", b"v1").unwrap();
db.put_cf(cf1, b"k2", b"v2").unwrap();
db.put_cf(cf1, b"k3", b"v3").unwrap();
db.put_cf(cf1, b"k4", b"v4").unwrap();
db.put_cf(cf1, b"k5", b"v5").unwrap();
db.compact_range_cf(cf1, Some(b"k2"), Some(b"k4"));
// check stats
let ctx = PerfContext::default();
let block_cache_hit_count = ctx.metric(PerfMetric::BlockCacheHitCount);
assert!(block_cache_hit_count > 0);
let expect = format!("block_cache_hit_count = {}", block_cache_hit_count);
assert!(ctx.report(true).contains(&expect));
}
let opts = Options::default();
DB::destroy(&opts, path).unwrap();
}

@ -74,6 +74,104 @@ impl Drop for Cache {
}
}
/// An Env is an interface used by the rocksdb implementation to access
/// operating system functionality like the filesystem etc. Callers
/// may wish to provide a custom Env object when opening a database to
/// get fine gain control; e.g., to rate limit file system operations.
///
/// All Env implementations are safe for concurrent access from
/// multiple threads without any external synchronization.
///
/// Note: currently, C API behinds C++ API for various settings.
/// See also: `rocksdb/include/env.h`
pub struct Env {
pub(crate) inner: *mut ffi::rocksdb_env_t,
}
impl Drop for Env {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_env_destroy(self.inner);
}
}
}
impl Env {
/// Returns default env
pub fn default() -> Result<Env, Error> {
let env = unsafe { ffi::rocksdb_create_default_env() };
if env.is_null() {
Err(Error::new("Could not create mem env".to_owned()))
} else {
Ok(Env { inner: env })
}
}
/// Returns a new environment that stores its data in memory and delegates
/// all non-file-storage tasks to base_env.
pub fn mem_env() -> Result<Env, Error> {
let env = unsafe { ffi::rocksdb_create_mem_env() };
if env.is_null() {
Err(Error::new("Could not create mem env".to_owned()))
} else {
Ok(Env { inner: env })
}
}
/// Sets the number of background worker threads of a specific thread pool for this environment.
/// `LOW` is the default pool.
///
/// Default: 1
pub fn set_background_threads(&mut self, num_threads: c_int) {
unsafe {
ffi::rocksdb_env_set_background_threads(self.inner, num_threads);
}
}
/// Sets the size of the high priority thread pool that can be used to
/// prevent compactions from stalling memtable flushes.
pub fn set_high_priority_background_threads(&mut self, n: c_int) {
unsafe {
ffi::rocksdb_env_set_high_priority_background_threads(self.inner, n);
}
}
/// Wait for all threads started by StartThread to terminate.
pub fn join_all_threads(&mut self) {
unsafe {
ffi::rocksdb_env_join_all_threads(self.inner);
}
}
/// Lowering IO priority for threads from the specified pool.
pub fn lower_thread_pool_io_priority(&mut self) {
unsafe {
ffi::rocksdb_env_lower_thread_pool_io_priority(self.inner);
}
}
/// Lowering IO priority for high priority thread pool.
pub fn lower_high_priority_thread_pool_io_priority(&mut self) {
unsafe {
ffi::rocksdb_env_lower_high_priority_thread_pool_io_priority(self.inner);
}
}
/// Lowering CPU priority for threads from the specified pool.
pub fn lower_thread_pool_cpu_priority(&mut self) {
unsafe {
ffi::rocksdb_env_lower_thread_pool_cpu_priority(self.inner);
}
}
/// Lowering CPU priority for high priority thread pool.
pub fn lower_high_priority_thread_pool_cpu_priority(&mut self) {
unsafe {
ffi::rocksdb_env_lower_high_priority_thread_pool_cpu_priority(self.inner);
}
}
}
/// Database-wide options around performance and behavior.
///
/// Please read the official tuning [guide](https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide)
@ -121,7 +219,7 @@ pub struct Options {
/// ```
/// use rocksdb::{DB, Options, WriteBatch, WriteOptions};
///
/// let path = "_path_for_rocksdb_storageY";
/// let path = "_path_for_rocksdb_storageY1";
/// {
/// let db = DB::open_default(path).unwrap();
/// let mut batch = WriteBatch::default();
@ -150,7 +248,7 @@ pub struct WriteOptions {
/// ```
/// use rocksdb::{DB, Options, FlushOptions};
///
/// let path = "_path_for_rocksdb_storageY";
/// let path = "_path_for_rocksdb_storageY2";
/// {
/// let db = DB::open_default(path).unwrap();
///
@ -191,11 +289,14 @@ pub struct ReadOptions {
/// writer.put(b"k1", b"v1").unwrap();
/// writer.finish().unwrap();
///
/// let path = "_path_for_rocksdb_storageY";
/// let path = "_path_for_rocksdb_storageY3";
/// {
/// let db = DB::open_default(&path).unwrap();
/// let mut ingest_opts = IngestExternalFileOptions::default();
/// ingest_opts.set_move_files(true);
/// db.ingest_external_file_opts(&ingest_opts, vec!["_path_for_sst_file"]).unwrap();
/// }
/// let _ = DB::destroy(&Options::default(), path);
/// ```
pub struct IngestExternalFileOptions {
pub(crate) inner: *mut ffi::rocksdb_ingestexternalfileoptions_t,
@ -618,6 +719,81 @@ impl Options {
}
}
/// Specifies whether an error should be raised if the database already exists.
///
/// Default: false
pub fn set_error_if_exists(&mut self, enabled: bool) {
unsafe {
ffi::rocksdb_options_set_error_if_exists(self.inner, enabled as c_uchar);
}
}
/// Enable/disable paranoid checks.
///
/// If true, the implementation will do aggressive checking of the
/// data it is processing and will stop early if it detects any
/// errors. This may have unforeseen ramifications: for example, a
/// corruption of one DB entry may cause a large number of entries to
/// become unreadable or for the entire DB to become unopenable.
/// If any of the writes to the database fails (Put, Delete, Merge, Write),
/// the database will switch to read-only mode and fail all other
/// Write operations.
///
/// Default: false
pub fn set_paranoid_checks(&mut self, enabled: bool) {
unsafe {
ffi::rocksdb_options_set_paranoid_checks(self.inner, enabled as c_uchar);
}
}
/// A list of paths where SST files can be put into, with its target size.
/// Newer data is placed into paths specified earlier in the vector while
/// older data gradually moves to paths specified later in the vector.
///
/// For example, you have a flash device with 10GB allocated for the DB,
/// as well as a hard drive of 2TB, you should config it to be:
/// [{"/flash_path", 10GB}, {"/hard_drive", 2TB}]
///
/// The system will try to guarantee data under each path is close to but
/// not larger than the target size. But current and future file sizes used
/// by determining where to place a file are based on best-effort estimation,
/// which means there is a chance that the actual size under the directory
/// is slightly more than target size under some workloads. User should give
/// some buffer room for those cases.
///
/// If none of the paths has sufficient room to place a file, the file will
/// be placed to the last path anyway, despite to the target size.
///
/// Placing newer data to earlier paths is also best-efforts. User should
/// expect user files to be placed in higher levels in some extreme cases.
///
/// If left empty, only one path will be used, which is `path` passed when
/// opening the DB.
///
/// Default: empty
pub fn set_db_paths(&mut self, paths: &[DBPath]) {
let mut paths: Vec<_> = paths
.iter()
.map(|path| path.inner as *const ffi::rocksdb_dbpath_t)
.collect();
let num_paths = paths.len();
unsafe {
ffi::rocksdb_options_set_db_paths(self.inner, paths.as_mut_ptr(), num_paths);
}
}
/// Use the specified object to interact with the environment,
/// e.g. to read/write files, schedule background work, etc. In the near
/// future, support for doing storage operations such as read/write files
/// through env will be deprecated in favor of file_system.
///
/// Default: Env::default()
pub fn set_env(&mut self, env: &Env) {
unsafe {
ffi::rocksdb_options_set_env(self.inner, env.inner);
}
}
/// Sets the compression algorithm that will be used for compressing blocks.
///
/// Default: `DBCompressionType::Snappy` (`DBCompressionType::None` if
@ -3014,6 +3190,35 @@ impl CompactOptions {
}
}
/// Represents a path where sst files can be put into
pub struct DBPath {
pub(crate) inner: *mut ffi::rocksdb_dbpath_t,
}
impl DBPath {
/// Create a new path
pub fn new<P: AsRef<Path>>(path: P, target_size: u64) -> Result<Self, Error> {
let p = CString::new(path.as_ref().to_string_lossy().as_bytes()).unwrap();
let dbpath = unsafe { ffi::rocksdb_dbpath_create(p.as_ptr(), target_size) };
if dbpath.is_null() {
Err(Error::new(format!(
"Could not create path for storing sst files at location: {}",
path.as_ref().to_string_lossy()
)))
} else {
Ok(DBPath { inner: dbpath })
}
}
}
impl Drop for DBPath {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_dbpath_destroy(self.inner);
}
}
}
#[cfg(test)]
mod tests {
use crate::{MemtableFactory, Options};

@ -97,7 +97,7 @@ pub use crate::{
db_iterator::{DBIterator, DBRawIterator, DBWALIterator, Direction, IteratorMode},
db_options::{
BlockBasedIndexType, BlockBasedOptions, BottommostLevelCompaction, Cache, CompactOptions,
DBCompactionStyle, DBCompressionType, DBRecoveryMode, DataBlockIndexType,
DBCompactionStyle, DBCompressionType, DBPath, DBRecoveryMode, DataBlockIndexType, Env,
FifoCompactOptions, FlushOptions, IngestExternalFileOptions, MemtableFactory, Options,
PlainTableFactoryOptions, ReadOptions, UniversalCompactOptions,
UniversalCompactionStopStyle, WriteOptions,

@ -14,7 +14,12 @@
mod util;
use rocksdb::{Error, IteratorMode, Options, Snapshot, WriteBatch, DB};
use rocksdb::{
BlockBasedOptions, BottommostLevelCompaction, Cache, CompactOptions, DBCompactionStyle, Env,
Error, FifoCompactOptions, IteratorMode, Options, PerfContext, PerfMetric, ReadOptions,
SliceTransform, Snapshot, UniversalCompactOptions, UniversalCompactionStopStyle, WriteBatch,
DB,
};
use std::sync::Arc;
use std::time::Duration;
use std::{mem, thread};
@ -85,10 +90,13 @@ fn writebatch_works() {
assert_eq!(batch.len(), 0);
assert!(batch.is_empty());
batch.put(b"k1", b"v1111");
assert_eq!(batch.len(), 1);
batch.put(b"k2", b"v2222");
batch.put(b"k3", b"v3333");
assert_eq!(batch.len(), 3);
assert!(!batch.is_empty());
assert!(db.get(b"k1").unwrap().is_none());
assert!(db.write(batch).is_ok());
let p = db.write(batch);
assert!(p.is_ok());
let r: Result<Option<Vec<u8>>, Error> = db.get(b"k1");
assert_eq!(r.unwrap().unwrap(), b"v1111");
}
@ -98,9 +106,21 @@ fn writebatch_works() {
batch.delete(b"k1");
assert_eq!(batch.len(), 1);
assert!(!batch.is_empty());
assert!(db.write(batch).is_ok());
let p = db.write(batch);
assert!(p.is_ok());
assert!(db.get(b"k1").unwrap().is_none());
}
{
// test delete_range
let mut batch = WriteBatch::default();
batch.delete_range(b"k2", b"k4");
assert_eq!(batch.len(), 1);
assert!(!batch.is_empty());
let p = db.write(batch);
assert!(p.is_ok());
assert!(db.get(b"k2").unwrap().is_none());
assert!(db.get(b"k3").unwrap().is_none());
}
{
// test size_in_bytes
let mut batch = WriteBatch::default();
@ -132,6 +152,97 @@ fn iterator_test() {
}
}
#[test]
fn iterator_test_past_end() {
let path = DBPath::new("_rust_rocksdb_iteratortest_past_end");
{
let db = DB::open_default(&path).unwrap();
db.put(b"k1", b"v1111").unwrap();
let mut iter = db.iterator(IteratorMode::Start);
assert!(iter.next().is_some());
assert!(iter.next().is_none());
assert!(iter.next().is_none());
}
}
#[test]
fn iterator_test_tailing() {
let path = DBPath::new("_rust_rocksdb_iteratortest_tailing");
{
let data = [(b"k1", b"v1"), (b"k2", b"v2"), (b"k3", b"v3")];
let mut ro = ReadOptions::default();
ro.set_tailing(true);
let db = DB::open_default(&path).unwrap();
let mut data_iter = data.iter();
let (k, v) = data_iter.next().unwrap();
let r = db.put(k, v);
assert!(r.is_ok());
let tail_iter = db.iterator_opt(IteratorMode::Start, ro);
for (k, v) in data_iter {
let r = db.put(k, v);
assert!(r.is_ok());
}
let mut tot = 0;
for (i, (k, v)) in tail_iter.enumerate() {
assert_eq!(
(k.to_vec(), v.to_vec()),
(data[i].0.to_vec(), data[i].1.to_vec())
);
tot += 1;
}
assert_eq!(tot, data.len());
}
}
#[test]
fn iterator_test_upper_bound() {
let path = DBPath::new("_rust_rocksdb_iteratortest_upper_bound");
{
let db = DB::open_default(&path).unwrap();
db.put(b"k1", b"v1").unwrap();
db.put(b"k2", b"v2").unwrap();
db.put(b"k3", b"v3").unwrap();
db.put(b"k4", b"v4").unwrap();
db.put(b"k5", b"v5").unwrap();
let mut readopts = ReadOptions::default();
readopts.set_iterate_upper_bound(b"k4".to_vec());
let iter = db.iterator_opt(IteratorMode::Start, readopts);
let expected: Vec<_> = vec![(b"k1", b"v1"), (b"k2", b"v2"), (b"k3", b"v3")]
.into_iter()
.map(|(k, v)| (k.to_vec().into_boxed_slice(), v.to_vec().into_boxed_slice()))
.collect();
assert_eq!(expected, iter.collect::<Vec<_>>());
}
}
#[test]
fn iterator_test_lower_bound() {
let path = DBPath::new("_rust_rocksdb_iteratortest_lower_bound");
{
let db = DB::open_default(&path).unwrap();
db.put(b"k1", b"v1").unwrap();
db.put(b"k2", b"v2").unwrap();
db.put(b"k3", b"v3").unwrap();
db.put(b"k4", b"v4").unwrap();
db.put(b"k5", b"v5").unwrap();
let mut readopts = ReadOptions::default();
readopts.set_iterate_lower_bound(b"k4".to_vec());
let iter = db.iterator_opt(IteratorMode::Start, readopts);
let expected: Vec<_> = vec![(b"k4", b"v4"), (b"k5", b"v5")]
.into_iter()
.map(|(k, v)| (k.to_vec().into_boxed_slice(), v.to_vec().into_boxed_slice()))
.collect();
assert_eq!(expected, iter.collect::<Vec<_>>());
}
}
#[test]
fn snapshot_test() {
let path = DBPath::new("_rust_rocksdb_snapshottest");
@ -374,3 +485,275 @@ fn test_open_with_ttl() {
db.compact_range(None::<&[u8]>, None::<&[u8]>);
assert!(db.get(b"key1").unwrap().is_none());
}
#[test]
fn compact_range_test() {
let path = DBPath::new("_rust_rocksdb_compact_range_test");
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
// set compaction style
let mut uni_co_opts = UniversalCompactOptions::default();
uni_co_opts.set_size_ratio(2);
uni_co_opts.set_stop_style(UniversalCompactionStopStyle::Total);
opts.set_compaction_style(DBCompactionStyle::Universal);
opts.set_universal_compaction_options(&uni_co_opts);
// set compaction options
let mut compact_opts = CompactOptions::default();
compact_opts.set_exclusive_manual_compaction(true);
compact_opts.set_target_level(1);
compact_opts.set_change_level(true);
compact_opts.set_bottommost_level_compaction(BottommostLevelCompaction::ForceOptimized);
// put and compact column family cf1
let cfs = vec!["cf1"];
let db = DB::open_cf(&opts, &path, cfs).unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
db.put_cf(cf1, b"k1", b"v1").unwrap();
db.put_cf(cf1, b"k2", b"v2").unwrap();
db.put_cf(cf1, b"k3", b"v3").unwrap();
db.put_cf(cf1, b"k4", b"v4").unwrap();
db.put_cf(cf1, b"k5", b"v5").unwrap();
db.compact_range_cf(cf1, Some(b"k2"), Some(b"k4"));
db.compact_range_cf_opt(cf1, Some(b"k1"), None::<&str>, &compact_opts);
// put and compact default column family
db.put(b"k1", b"v1").unwrap();
db.put(b"k2", b"v2").unwrap();
db.put(b"k3", b"v3").unwrap();
db.put(b"k4", b"v4").unwrap();
db.put(b"k5", b"v5").unwrap();
db.compact_range(Some(b"k3"), None::<&str>);
db.compact_range_opt(None::<&str>, Some(b"k5"), &compact_opts);
}
}
#[test]
fn fifo_compaction_test() {
let path = DBPath::new("_rust_rocksdb_fifo_compaction_test");
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
// set compaction style
let mut fifo_co_opts = FifoCompactOptions::default();
fifo_co_opts.set_max_table_files_size(4 << 10); // 4KB
opts.set_compaction_style(DBCompactionStyle::Fifo);
opts.set_fifo_compaction_options(&fifo_co_opts);
// put and compact column family cf1
let cfs = vec!["cf1"];
let db = DB::open_cf(&opts, &path, cfs).unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
db.put_cf(cf1, b"k1", b"v1").unwrap();
db.put_cf(cf1, b"k2", b"v2").unwrap();
db.put_cf(cf1, b"k3", b"v3").unwrap();
db.put_cf(cf1, b"k4", b"v4").unwrap();
db.put_cf(cf1, b"k5", b"v5").unwrap();
db.compact_range_cf(cf1, Some(b"k2"), Some(b"k4"));
// check stats
let ctx = PerfContext::default();
let block_cache_hit_count = ctx.metric(PerfMetric::BlockCacheHitCount);
assert!(block_cache_hit_count > 0);
let expect = format!("block_cache_hit_count = {}", block_cache_hit_count);
assert!(ctx.report(true).contains(&expect));
}
}
#[test]
fn env_and_dbpaths_test() {
let path = DBPath::new("_rust_rocksdb_dbpath_test");
let path1 = DBPath::new("_rust_rocksdb_dbpath_test_1");
let path2 = DBPath::new("_rust_rocksdb_dbpath_test_2");
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
let mut env = Env::default().unwrap();
env.lower_high_priority_thread_pool_cpu_priority();
opts.set_env(&env);
let mut paths = Vec::new();
paths.push(rocksdb::DBPath::new(&path1, 20 << 20).unwrap());
paths.push(rocksdb::DBPath::new(&path2, 30 << 20).unwrap());
opts.set_db_paths(&paths);
let db = DB::open(&opts, &path).unwrap();
db.put(b"k1", b"v1").unwrap();
assert_eq!(db.get(b"k1").unwrap().unwrap(), b"v1");
}
}
#[test]
fn prefix_extract_and_iterate_test() {
let path = DBPath::new("_rust_rocksdb_prefix_extract_and_iterate");
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(2));
let db = DB::open(&opts, &path).unwrap();
db.put(b"p1_k1", b"v1").unwrap();
db.put(b"p2_k2", b"v2").unwrap();
db.put(b"p1_k3", b"v3").unwrap();
db.put(b"p1_k4", b"v4").unwrap();
db.put(b"p2_k5", b"v5").unwrap();
let mut readopts = ReadOptions::default();
readopts.set_prefix_same_as_start(true);
readopts.set_iterate_lower_bound(b"p1".to_vec());
readopts.set_pin_data(true);
let iter = db.iterator_opt(IteratorMode::Start, readopts);
let expected: Vec<_> = vec![(b"p1_k1", b"v1"), (b"p1_k3", b"v3"), (b"p1_k4", b"v4")]
.into_iter()
.map(|(k, v)| (k.to_vec().into_boxed_slice(), v.to_vec().into_boxed_slice()))
.collect();
assert_eq!(expected, iter.collect::<Vec<_>>());
}
}
#[test]
fn get_with_cache_and_bulkload_test() {
let path = DBPath::new("_rust_rocksdb_get_with_cache_and_bulkload_test");
let log_path = DBPath::new("_rust_rocksdb_log_path_test");
{
// create options
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
opts.set_wal_bytes_per_sync(8 << 10); // 8KB
opts.set_writable_file_max_buffer_size(512 << 10); // 512KB
opts.set_enable_write_thread_adaptive_yield(true);
opts.set_unordered_write(true);
opts.set_max_subcompactions(2);
opts.set_max_background_jobs(4);
opts.set_use_adaptive_mutex(true);
// set block based table and cache
let cache = Cache::new_lru_cache(64 << 10).unwrap();
let mut block_based_opts = BlockBasedOptions::default();
block_based_opts.set_block_cache(&cache);
block_based_opts.set_cache_index_and_filter_blocks(true);
opts.set_block_based_table_factory(&block_based_opts);
// open db
let db = DB::open(&opts, &path).unwrap();
// write a lot
let mut batch = WriteBatch::default();
for i in 0..1_000 {
batch.put(format!("{:0>4}", i).as_bytes(), b"v");
}
assert!(db.write(batch).is_ok());
// flush memory table to sst, trigger cache usage on `get`
assert!(db.flush().is_ok());
// get -> trigger caching
let _ = db.get(b"1");
assert!(cache.get_usage() > 0);
}
// bulk loading
{
// create new options
let mut opts = Options::default();
opts.set_delete_obsolete_files_period_micros(100_000);
opts.prepare_for_bulk_load();
opts.set_db_log_dir(&log_path);
opts.set_max_sequential_skip_in_iterations(16);
opts.set_paranoid_checks(true);
// open db
let db = DB::open(&opts, &path).unwrap();
// try to get key
let iter = db.iterator(IteratorMode::Start);
for (expected, (k, _)) in iter.enumerate() {
assert_eq!(k.as_ref(), format!("{:0>4}", expected).as_bytes());
}
}
// raise error when db exists
{
// create new options
let mut opts = Options::default();
opts.set_error_if_exists(true);
assert!(DB::open(&opts, &path).is_err());
}
}
#[test]
fn test_open_for_read_only() {
let path = DBPath::new("_rust_rocksdb_test_open_for_read_only");
{
let db = DB::open_default(&path).unwrap();
db.put(b"k1", b"v1").unwrap();
}
{
let opts = Options::default();
let error_if_log_file_exist = false;
let db = DB::open_for_read_only(&opts, &path, error_if_log_file_exist).unwrap();
assert_eq!(db.get(b"k1").unwrap().unwrap(), b"v1");
assert!(db.put(b"k2", b"v2").is_err());
}
}
#[test]
fn test_open_cf_for_read_only() {
let path = DBPath::new("_rust_rocksdb_test_open_cf_for_read_only");
let cfs = vec!["cf1"];
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
let db = DB::open_cf(&opts, &path, cfs.clone()).unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
db.put_cf(cf1, b"k1", b"v1").unwrap();
}
{
let opts = Options::default();
let error_if_log_file_exist = false;
let db = DB::open_cf_for_read_only(&opts, &path, cfs, error_if_log_file_exist).unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
assert_eq!(db.get_cf(cf1, b"k1").unwrap().unwrap(), b"v1");
assert!(db.put_cf(cf1, b"k2", b"v2").is_err());
}
}
#[test]
fn delete_range_test() {
let path = DBPath::new("_rust_rocksdb_delete_range_test");
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
let cfs = vec!["cf1"];
let db = DB::open_cf(&opts, &path, cfs).unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
db.put_cf(cf1, b"k1", b"v1").unwrap();
db.put_cf(cf1, b"k2", b"v2").unwrap();
db.put_cf(cf1, b"k3", b"v3").unwrap();
db.put_cf(cf1, b"k4", b"v4").unwrap();
db.put_cf(cf1, b"k5", b"v5").unwrap();
db.delete_range_cf(cf1, b"k2", b"k4").unwrap();
assert_eq!(db.get_cf(cf1, b"k1").unwrap().unwrap(), b"v1");
assert_eq!(db.get_cf(cf1, b"k4").unwrap().unwrap(), b"v4");
assert_eq!(db.get_cf(cf1, b"k5").unwrap().unwrap(), b"v5");
assert!(db.get_cf(cf1, b"k2").unwrap().is_none());
assert!(db.get_cf(cf1, b"k3").unwrap().is_none());
}
}

Loading…
Cancel
Save