Adding read/write/db/compaction options (#442)

master
Linh Tran Tuan 4 years ago committed by GitHub
parent bc07d0cd28
commit 890ef81fc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 275
      src/db.rs
  2. 912
      src/db_options.rs
  3. 10
      src/lib.rs
  4. 177
      src/perf.rs

@ -16,9 +16,10 @@
use crate::{ use crate::{
ffi, ffi,
ffi_util::{opt_bytes_to_ptr, to_cpath}, ffi_util::{opt_bytes_to_ptr, to_cpath},
ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBPinnableSlice, DBRawIterator, ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIterator, DBPinnableSlice,
DBWALIterator, Direction, Error, FlushOptions, IngestExternalFileOptions, IteratorMode, DBRawIterator, DBWALIterator, Direction, Error, FlushOptions, IngestExternalFileOptions,
Options, ReadOptions, Snapshot, WriteBatch, WriteOptions, DEFAULT_COLUMN_FAMILY_NAME, IteratorMode, Options, ReadOptions, Snapshot, WriteBatch, WriteOptions,
DEFAULT_COLUMN_FAMILY_NAME,
}; };
use libc::{self, c_char, c_int, c_uchar, c_void, size_t}; use libc::{self, c_char, c_int, c_uchar, c_void, size_t};
@ -918,6 +919,7 @@ impl DB {
self.delete_range_cf_opt(cf, from, to, &WriteOptions::default()) self.delete_range_cf_opt(cf, from, to, &WriteOptions::default())
} }
/// Runs a manual compaction on the Range of keys given. This is not likely to be needed for typical usage.
pub fn compact_range<S: AsRef<[u8]>, E: AsRef<[u8]>>(&self, start: Option<S>, end: Option<E>) { pub fn compact_range<S: AsRef<[u8]>, E: AsRef<[u8]>>(&self, start: Option<S>, end: Option<E>) {
unsafe { unsafe {
let start = start.as_ref().map(AsRef::as_ref); let start = start.as_ref().map(AsRef::as_ref);
@ -933,6 +935,30 @@ impl DB {
} }
} }
/// Same as `compact_range` but with custom options.
pub fn compact_range_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
    &self,
    start: Option<S>,
    end: Option<E>,
    opts: &CompactOptions,
) {
    // Borrow the optional bounds as plain byte slices; `None` means an
    // open-ended bound on that side of the range.
    let lower = start.as_ref().map(AsRef::as_ref);
    let upper = end.as_ref().map(AsRef::as_ref);
    // SAFETY: each pointer/length pair below is derived from the same
    // optional slice, so a null pointer always travels with a zero length.
    unsafe {
        ffi::rocksdb_compact_range_opt(
            self.inner,
            opts.inner,
            opt_bytes_to_ptr(lower),
            lower.map_or(0, |s| s.len()) as size_t,
            opt_bytes_to_ptr(upper),
            upper.map_or(0, |s| s.len()) as size_t,
        );
    }
}
/// Runs a manual compaction on the Range of keys given on the
/// given column family. This is not likely to be needed for typical usage.
pub fn compact_range_cf<S: AsRef<[u8]>, E: AsRef<[u8]>>( pub fn compact_range_cf<S: AsRef<[u8]>, E: AsRef<[u8]>>(
&self, &self,
cf: &ColumnFamily, cf: &ColumnFamily,
@ -954,6 +980,30 @@ impl DB {
} }
} }
/// Same as `compact_range_cf` but with custom options.
pub fn compact_range_cf_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
    &self,
    cf: &ColumnFamily,
    start: Option<S>,
    end: Option<E>,
    opts: &CompactOptions,
) {
    // Borrow the optional bounds as plain byte slices; `None` means an
    // open-ended bound on that side of the range.
    let lower = start.as_ref().map(AsRef::as_ref);
    let upper = end.as_ref().map(AsRef::as_ref);
    // SAFETY: each pointer/length pair below is derived from the same
    // optional slice, so a null pointer always travels with a zero length.
    unsafe {
        ffi::rocksdb_compact_range_cf_opt(
            self.inner,
            cf.inner,
            opts.inner,
            opt_bytes_to_ptr(lower),
            lower.map_or(0, |s| s.len()) as size_t,
            opt_bytes_to_ptr(upper),
            upper.map_or(0, |s| s.len()) as size_t,
        );
    }
}
pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), Error> { pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), Error> {
let copts = opts let copts = opts
.iter() .iter()
@ -1375,6 +1425,74 @@ fn writebatch_works() {
assert!(DB::destroy(&opts, path).is_ok()); assert!(DB::destroy(&opts, path).is_ok());
} }
// Verifies that a read after a flush populates the shared block cache, and
// that a DB written with one set of options can be reopened in bulk-load
// mode and iterated in key order.
#[test]
fn get_with_cache_and_bulkload_test() {
    use crate::{BlockBasedOptions, Cache};

    let path = "_rust_rocksdb_get_with_cache_and_bulkload_test";
    let log_path = "_rust_rocksdb_log_path_test";

    // Phase 1: write through a block-cache-backed DB. The inner scope makes
    // sure `db` is dropped (and the DB closed) before the second open below.
    {
        // create options
        let mut opts = Options::default();
        opts.create_if_missing(true);
        opts.create_missing_column_families(true);
        opts.set_wal_bytes_per_sync(8 << 10); // 8KB
        opts.set_writable_file_max_buffer_size(512 << 10); // 512KB
        opts.set_enable_write_thread_adaptive_yield(true);
        opts.set_unordered_write(true);
        opts.set_max_subcompactions(2);
        opts.set_max_background_jobs(4);
        opts.set_use_adaptive_mutex(true);

        // set block based table and cache
        let cache = Cache::new_lru_cache(64 << 10).unwrap();
        let mut block_based_opts = BlockBasedOptions::default();
        block_based_opts.set_block_cache(&cache);
        block_based_opts.set_cache_index_and_filter_blocks(true);
        opts.set_block_based_table_factory(&block_based_opts);

        // open db
        let db = DB::open(&opts, path).unwrap();

        // write a lot — zero-padded keys ("0000".."0999") so that the
        // lexicographic iteration order below matches numeric order
        let mut batch = WriteBatch::default();
        for i in 0..1_000 {
            batch.put(format!("{:0>4}", i).as_bytes(), b"v");
        }
        assert!(db.write(batch).is_ok());

        // flush memory table to sst, trigger cache usage on `get`
        assert!(db.flush().is_ok());

        // get -> trigger caching; index/filter blocks are cached too, so
        // usage should be non-zero even for this single lookup
        let _ = db.get(b"1");
        assert!(cache.get_usage() > 0);
    }

    // Phase 2: bulk loading — reopen the same path with bulk-load-tuned
    // options and check every key is still present, in order.
    {
        // create new options
        let mut opts = Options::default();
        opts.set_delete_obsolete_files_period_micros(100_000);
        opts.prepare_for_bulk_load();
        opts.set_db_log_dir(log_path);
        opts.set_max_sequential_skip_in_iterations(16);

        // open db
        let db = DB::open(&opts, path).unwrap();

        // try to get key — enumerate() yields the expected numeric index
        let iter = db.iterator(IteratorMode::Start);
        for (expected, (k, _)) in iter.enumerate() {
            assert_eq!(k.as_ref(), format!("{:0>4}", expected).as_bytes());
        }
    }

    // Clean up both the DB directory and the separate log directory.
    let opts = Options::default();
    assert!(DB::destroy(&opts, path).is_ok());
    assert!(DB::destroy(&opts, log_path).is_ok());
}
#[test] #[test]
fn iterator_test() { fn iterator_test() {
let path = "_rust_rocksdb_iteratortest"; let path = "_rust_rocksdb_iteratortest";
@ -1439,6 +1557,65 @@ fn iterator_test_upper_bound() {
DB::destroy(&opts, path).unwrap(); DB::destroy(&opts, path).unwrap();
} }
// Checks that `set_iterate_lower_bound` hides every key below the bound.
#[test]
fn iterator_test_lower_bound() {
    let path = "_rust_rocksdb_iteratortest_lower_bound";
    {
        let db = DB::open_default(path).unwrap();
        // Seed five ordered key/value pairs.
        for (key, value) in [
            (b"k1", b"v1"),
            (b"k2", b"v2"),
            (b"k3", b"v3"),
            (b"k4", b"v4"),
            (b"k5", b"v5"),
        ]
        .iter()
        {
            db.put(*key, *value).unwrap();
        }

        let mut readopts = ReadOptions::default();
        readopts.set_iterate_lower_bound(b"k4".to_vec());

        // Only "k4" and "k5" should survive the lower bound.
        let found: Vec<_> = db.iterator_opt(IteratorMode::Start, readopts).collect();
        let wanted: Vec<_> = [(&b"k4"[..], &b"v4"[..]), (&b"k5"[..], &b"v5"[..])]
            .iter()
            .map(|&(k, v)| (k.to_vec().into_boxed_slice(), v.to_vec().into_boxed_slice()))
            .collect();
        assert_eq!(wanted, found);
    }
    let opts = Options::default();
    DB::destroy(&opts, path).unwrap();
}
// Verifies prefix iteration: with a fixed 2-byte prefix extractor and
// `prefix_same_as_start`, iteration started inside prefix "p1" must yield
// only "p1"-prefixed keys, in order.
#[test]
fn prefix_extract_and_iterate_test() {
    use crate::SliceTransform;

    let path = "_rust_rocksdb_prefix_extract_and_iterate";
    // Inner scope drops (closes) the DB before DB::destroy below.
    {
        let mut opts = Options::default();
        opts.create_if_missing(true);
        opts.create_missing_column_families(true);
        // Prefix = first 2 bytes, i.e. "p1" / "p2" for the keys below.
        opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(2));

        let db = DB::open(&opts, path).unwrap();
        db.put(b"p1_k1", b"v1").unwrap();
        db.put(b"p2_k2", b"v2").unwrap();
        db.put(b"p1_k3", b"v3").unwrap();
        db.put(b"p1_k4", b"v4").unwrap();
        db.put(b"p2_k5", b"v5").unwrap();

        let mut readopts = ReadOptions::default();
        // Stop the iterator as soon as it leaves the starting prefix.
        readopts.set_prefix_same_as_start(true);
        // Start iteration at the "p1" prefix.
        readopts.set_iterate_lower_bound(b"p1".to_vec());
        readopts.set_pin_data(true);

        let iter = db.iterator_opt(IteratorMode::Start, readopts);
        // Only the three "p1"-prefixed keys are expected; "p2" keys are
        // excluded by prefix_same_as_start.
        let expected: Vec<_> = vec![(b"p1_k1", b"v1"), (b"p1_k3", b"v3"), (b"p1_k4", b"v4")]
            .into_iter()
            .map(|(k, v)| (k.to_vec().into_boxed_slice(), v.to_vec().into_boxed_slice()))
            .collect();
        assert_eq!(expected, iter.collect::<Vec<_>>());
    }
    let opts = Options::default();
    DB::destroy(&opts, path).unwrap();
}
#[test] #[test]
fn iterator_test_tailing() { fn iterator_test_tailing() {
let path = "_rust_rocksdb_iteratortest_tailing"; let path = "_rust_rocksdb_iteratortest_tailing";
@ -1561,3 +1738,95 @@ fn delete_range_test() {
let opts = Options::default(); let opts = Options::default();
DB::destroy(&opts, path).unwrap(); DB::destroy(&opts, path).unwrap();
} }
// Smoke-tests the four manual-compaction entry points (`compact_range`,
// `compact_range_opt`, `compact_range_cf`, `compact_range_cf_opt`) under
// universal compaction, on both a named column family and the default one.
#[test]
fn compact_range_test() {
    use crate::{
        BottommostLevelCompaction, DBCompactionStyle, UniversalCompactOptions,
        UniversalCompactionStopStyle,
    };

    let path = "_rust_rocksdb_compact_range_test";
    // Inner scope drops (closes) the DB before DB::destroy below.
    {
        let mut opts = Options::default();
        opts.create_if_missing(true);
        opts.create_missing_column_families(true);

        // set compaction style
        let mut uni_co_opts = UniversalCompactOptions::default();
        uni_co_opts.set_size_ratio(2);
        uni_co_opts.set_stop_style(UniversalCompactionStopStyle::Total);
        opts.set_compaction_style(DBCompactionStyle::Universal);
        opts.set_universal_compaction_options(&uni_co_opts);

        // set compaction options used by the *_opt variants below
        let mut compact_opts = CompactOptions::default();
        compact_opts.set_exclusive_manual_compaction(true);
        compact_opts.set_target_level(1);
        compact_opts.set_change_level(true);
        compact_opts.set_bottommost_level_compaction(BottommostLevelCompaction::ForceOptimized);

        // put and compact column family cf1
        let cfs = vec!["cf1"];
        let db = DB::open_cf(&opts, path, cfs).unwrap();
        let cf1 = db.cf_handle("cf1").unwrap();
        db.put_cf(cf1, b"k1", b"v1").unwrap();
        db.put_cf(cf1, b"k2", b"v2").unwrap();
        db.put_cf(cf1, b"k3", b"v3").unwrap();
        db.put_cf(cf1, b"k4", b"v4").unwrap();
        db.put_cf(cf1, b"k5", b"v5").unwrap();
        // Bounded range, then an open-ended range (`None` = unbounded end).
        db.compact_range_cf(cf1, Some(b"k2"), Some(b"k4"));
        db.compact_range_cf_opt(cf1, Some(b"k1"), None::<&str>, &compact_opts);

        // put and compact default column family
        db.put(b"k1", b"v1").unwrap();
        db.put(b"k2", b"v2").unwrap();
        db.put(b"k3", b"v3").unwrap();
        db.put(b"k4", b"v4").unwrap();
        db.put(b"k5", b"v5").unwrap();
        db.compact_range(Some(b"k3"), None::<&str>);
        db.compact_range_opt(None::<&str>, Some(b"k5"), &compact_opts);
    }
    let opts = Options::default();
    DB::destroy(&opts, path).unwrap();
}
// Exercises FIFO compaction options on a column family, then inspects the
// thread-local PerfContext counters and the textual report.
#[test]
fn fifo_compaction_test() {
    use crate::{DBCompactionStyle, FifoCompactOptions, PerfContext, PerfMetric};

    let path = "_rust_rocksdb_fifo_compaction_test";
    // Inner scope drops (closes) the DB before DB::destroy below.
    {
        let mut opts = Options::default();
        opts.create_if_missing(true);
        opts.create_missing_column_families(true);

        // set compaction style
        let mut fifo_co_opts = FifoCompactOptions::default();
        fifo_co_opts.set_max_table_files_size(4 << 10); // 4KB
        opts.set_compaction_style(DBCompactionStyle::Fifo);
        opts.set_fifo_compaction_options(&fifo_co_opts);

        // put and compact column family cf1
        let cfs = vec!["cf1"];
        let db = DB::open_cf(&opts, path, cfs).unwrap();
        let cf1 = db.cf_handle("cf1").unwrap();
        db.put_cf(cf1, b"k1", b"v1").unwrap();
        db.put_cf(cf1, b"k2", b"v2").unwrap();
        db.put_cf(cf1, b"k3", b"v3").unwrap();
        db.put_cf(cf1, b"k4", b"v4").unwrap();
        db.put_cf(cf1, b"k5", b"v5").unwrap();
        db.compact_range_cf(cf1, Some(b"k2"), Some(b"k4"));

        // check stats
        // NOTE(review): no `set_perf_stats` call and no explicit read happen
        // before this assertion — it relies on the process-default perf
        // level already collecting block-cache counts. Confirm this isn't
        // flaky across RocksDB versions.
        let ctx = PerfContext::default();
        let block_cache_hit_count = ctx.metric(PerfMetric::BlockCacheHitCount);
        assert!(block_cache_hit_count > 0);
        // The report must mention the same counter value it returned above.
        let expect = format!("block_cache_hit_count = {}", block_cache_hit_count);
        assert!(ctx.report(true).contains(&expect));
    }
    let opts = Options::default();
    DB::destroy(&opts, path).unwrap();
}

File diff suppressed because it is too large Load Diff

@ -84,6 +84,7 @@ mod db_iterator;
mod db_options; mod db_options;
mod db_pinnable_slice; mod db_pinnable_slice;
pub mod merge_operator; pub mod merge_operator;
pub mod perf;
mod slice_transform; mod slice_transform;
mod snapshot; mod snapshot;
mod sst_file_writer; mod sst_file_writer;
@ -95,12 +96,15 @@ pub use crate::{
db::DB, db::DB,
db_iterator::{DBIterator, DBRawIterator, DBWALIterator, Direction, IteratorMode}, db_iterator::{DBIterator, DBRawIterator, DBWALIterator, Direction, IteratorMode},
db_options::{ db_options::{
BlockBasedIndexType, BlockBasedOptions, DBCompactionStyle, DBCompressionType, BlockBasedIndexType, BlockBasedOptions, BottommostLevelCompaction, Cache, CompactOptions,
DBRecoveryMode, DataBlockIndexType, FlushOptions, IngestExternalFileOptions, DBCompactionStyle, DBCompressionType, DBRecoveryMode, DataBlockIndexType,
MemtableFactory, Options, PlainTableFactoryOptions, ReadOptions, WriteOptions, FifoCompactOptions, FlushOptions, IngestExternalFileOptions, MemtableFactory, Options,
PlainTableFactoryOptions, ReadOptions, UniversalCompactOptions,
UniversalCompactionStopStyle, WriteOptions,
}, },
db_pinnable_slice::DBPinnableSlice, db_pinnable_slice::DBPinnableSlice,
merge_operator::MergeOperands, merge_operator::MergeOperands,
perf::{PerfContext, PerfMetric, PerfStatsLevel},
slice_transform::SliceTransform, slice_transform::SliceTransform,
snapshot::Snapshot, snapshot::Snapshot,
sst_file_writer::SstFileWriter, sst_file_writer::SstFileWriter,

@ -0,0 +1,177 @@
// Copyright 2020 Tran Tuan Linh
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use libc::{c_char, c_int, c_uchar, c_void};
use std::ffi::CStr;
use crate::ffi;
/// Amount of perf stats gathered by the current thread's perf context.
///
/// Mirrors RocksDB's `PerfLevel`; pass a value to `set_perf_stats` to
/// change the level for the calling thread. The enum is `#[repr(i32)]`
/// so variants can be cast directly to the C API's integer level.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[repr(i32)]
pub enum PerfStatsLevel {
    /// Unknown settings
    Uninitialized = 0,
    /// Disable perf stats
    Disable,
    /// Enables only count stats
    EnableCount,
    /// Count stats and enable time stats except for mutexes
    EnableTimeExceptForMutex,
    /// Other than time, also measure CPU time counters. Still don't measure
    /// time (neither wall time nor CPU time) for mutexes
    EnableTimeAndCPUTimeExceptForMutex,
    /// Enables count and time stats
    EnableTime,
    /// N.B must always be the last value!
    OutOfBound,
}
/// Individually addressable perf counters of a `PerfContext`.
///
/// Each variant's discriminant matches the index used by the RocksDB C
/// API's `rocksdb_perfcontext_metric`, which is why every value is spelled
/// out explicitly and the enum is `#[repr(i32)]`. Marked `#[non_exhaustive]`
/// because upstream RocksDB adds counters over time.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[non_exhaustive]
#[repr(i32)]
pub enum PerfMetric {
    UserKeyComparisonCount = 0,
    BlockCacheHitCount = 1,
    BlockReadCount = 2,
    BlockReadByte = 3,
    BlockReadTime = 4,
    BlockChecksumTime = 5,
    BlockDecompressTime = 6,
    GetReadBytes = 7,
    MultigetReadBytes = 8,
    IterReadBytes = 9,
    InternalKeySkippedCount = 10,
    InternalDeleteSkippedCount = 11,
    InternalRecentSkippedCount = 12,
    InternalMergeCount = 13,
    GetSnapshotTime = 14,
    GetFromMemtableTime = 15,
    GetFromMemtableCount = 16,
    GetPostProcessTime = 17,
    GetFromOutputFilesTime = 18,
    SeekOnMemtableTime = 19,
    SeekOnMemtableCount = 20,
    NextOnMemtableCount = 21,
    PrevOnMemtableCount = 22,
    SeekChildSeekTime = 23,
    SeekChildSeekCount = 24,
    SeekMinHeapTime = 25,
    SeekMaxHeapTime = 26,
    SeekInternalSeekTime = 27,
    FindNextUserEntryTime = 28,
    WriteWalTime = 29,
    WriteMemtableTime = 30,
    WriteDelayTime = 31,
    WritePreAndPostProcessTime = 32,
    DbMutexLockNanos = 33,
    DbConditionWaitNanos = 34,
    MergeOperatorTimeNanos = 35,
    ReadIndexBlockNanos = 36,
    ReadFilterBlockNanos = 37,
    NewTableBlockIterNanos = 38,
    NewTableIteratorNanos = 39,
    BlockSeekNanos = 40,
    FindTableNanos = 41,
    BloomMemtableHitCount = 42,
    BloomMemtableMissCount = 43,
    BloomSstHitCount = 44,
    BloomSstMissCount = 45,
    KeyLockWaitTime = 46,
    KeyLockWaitCount = 47,
    EnvNewSequentialFileNanos = 48,
    EnvNewRandomAccessFileNanos = 49,
    EnvNewWritableFileNanos = 50,
    EnvReuseWritableFileNanos = 51,
    EnvNewRandomRwFileNanos = 52,
    EnvNewDirectoryNanos = 53,
    EnvFileExistsNanos = 54,
    EnvGetChildrenNanos = 55,
    EnvGetChildrenFileAttributesNanos = 56,
    EnvDeleteFileNanos = 57,
    EnvCreateDirNanos = 58,
    EnvCreateDirIfMissingNanos = 59,
    EnvDeleteDirNanos = 60,
    EnvGetFileSizeNanos = 61,
    EnvGetFileModificationTimeNanos = 62,
    EnvRenameFileNanos = 63,
    EnvLinkFileNanos = 64,
    EnvLockFileNanos = 65,
    EnvUnlockFileNanos = 66,
    EnvNewLoggerNanos = 67,
    // Sentinel: total number of metrics, not a real counter.
    TotalMetricCount = 68,
}
/// Sets the perf stats level for current thread.
///
/// Only affects the calling thread; other threads keep their own level.
pub fn set_perf_stats(lvl: PerfStatsLevel) {
    // `PerfStatsLevel` is #[repr(i32)], so the cast yields the exact
    // integer level the C API expects.
    unsafe {
        ffi::rocksdb_set_perf_level(lvl as c_int);
    }
}
/// Thread local context for gathering performance counter efficiently
/// and transparently.
///
/// Owns a raw `rocksdb_perfcontext_t` pointer; the allocation is released
/// in the `Drop` impl.
pub struct PerfContext {
    // Raw handle from rocksdb_perfcontext_create; never null after Default.
    pub(crate) inner: *mut ffi::rocksdb_perfcontext_t,
}
impl Default for PerfContext {
    /// Creates a new perf context via the RocksDB C API.
    ///
    /// # Panics
    /// Panics if the C library returns a null pointer (allocation failure).
    fn default() -> PerfContext {
        let ctx = unsafe { ffi::rocksdb_perfcontext_create() };
        if ctx.is_null() {
            panic!("Could not create Perf Context");
        }
        PerfContext { inner: ctx }
    }
}
impl Drop for PerfContext {
    fn drop(&mut self) {
        // SAFETY: `inner` was obtained from rocksdb_perfcontext_create and
        // is destroyed exactly once, here.
        unsafe {
            ffi::rocksdb_perfcontext_destroy(self.inner);
        }
    }
}
/// Copies a NUL-terminated C string into an owned Rust `String` (invalid
/// UTF-8 is replaced lossily) and frees the C allocation.
fn convert(ptr: *const c_char) -> String {
    // SAFETY: callers pass a NUL-terminated string allocated by the C
    // library; ownership transfers to us and it is freed exactly once below.
    let owned = unsafe { CStr::from_ptr(ptr as *const _) }
        .to_string_lossy()
        .into_owned();
    unsafe {
        libc::free(ptr as *mut c_void);
    }
    owned
}
impl PerfContext {
    /// Reset context
    ///
    /// Zeroes all counters in this context.
    pub fn reset(&mut self) {
        unsafe {
            ffi::rocksdb_perfcontext_reset(self.inner);
        }
    }

    /// Get the report on perf
    ///
    /// Returns the C API's textual report of all counters; the flag
    /// presumably omits zero-valued counters when true — mirrors the C
    /// parameter name, confirm against the RocksDB C API if relying on it.
    pub fn report(&self, exclude_zero_counters: bool) -> String {
        unsafe {
            // `convert` takes ownership of the C string and frees it.
            convert(ffi::rocksdb_perfcontext_report(
                self.inner,
                exclude_zero_counters as c_uchar,
            ))
        }
    }

    /// Returns value of a metric
    pub fn metric(&self, id: PerfMetric) -> u64 {
        // PerfMetric discriminants match the C API's metric indices.
        unsafe { ffi::rocksdb_perfcontext_metric(self.inner, id as c_int) }
    }
}
Loading…
Cancel
Save