Support fetching sst files metadata, delete files in range, get mem usage (#446)

Branch: master
Author: Linh Tran Tuan, committed via GitHub 4 years ago
Parent: afe0f7dca0
Commit: 6d60d48ba2
Changed files:
  1. src/db.rs (112 changed lines)
  2. src/db_options.rs (34 changed lines)
  3. src/ffi_util.rs (21 changed lines)
  4. src/lib.rs (2 changed lines)
  5. src/perf.rs (140 changed lines)
  6. tests/test_db.rs (86 changed lines)

@@ -15,7 +15,7 @@
use crate::{
ffi,
ffi_util::{opt_bytes_to_ptr, to_cpath},
ffi_util::{from_cstr, opt_bytes_to_ptr, raw_data, to_cpath},
ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIterator, DBPinnableSlice,
DBRawIterator, DBWALIterator, Direction, Error, FlushOptions, IngestExternalFileOptions,
IteratorMode, Options, ReadOptions, Snapshot, WriteBatch, WriteOptions,
@@ -1270,6 +1270,97 @@ impl DB {
Ok(())
}
}
/// Returns a list of all table files with their level, start key
/// and end key
pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
unsafe {
let files = ffi::rocksdb_livefiles(self.inner);
if files.is_null() {
Err(Error::new("Could not get live files".to_owned()))
} else {
let n = ffi::rocksdb_livefiles_count(files);
let mut livefiles = Vec::with_capacity(n as usize);
let mut key_size: usize = 0;
for i in 0..n {
let name = from_cstr(ffi::rocksdb_livefiles_name(files, i));
let size = ffi::rocksdb_livefiles_size(files, i);
let level = ffi::rocksdb_livefiles_level(files, i) as i32;
// get smallest key inside file
let smallest_key = ffi::rocksdb_livefiles_smallestkey(files, i, &mut key_size);
let smallest_key = raw_data(smallest_key, key_size);
// get largest key inside file
let largest_key = ffi::rocksdb_livefiles_largestkey(files, i, &mut key_size);
let largest_key = raw_data(largest_key, key_size);
livefiles.push(LiveFile {
name,
size,
level,
start_key: smallest_key,
end_key: largest_key,
num_entries: ffi::rocksdb_livefiles_entries(files, i),
num_deletions: ffi::rocksdb_livefiles_deletions(files, i),
})
}
// destroy livefiles metadata(s)
ffi::rocksdb_livefiles_destroy(files);
// return
Ok(livefiles)
}
}
}
/// Delete SST files whose keys are entirely within the given range.
///
/// This may leave behind some keys in the range if they live in files
/// that are not entirely contained in the range.
///
/// Note: L0 files are left regardless of whether they're in the range.
///
/// Snapshots before the delete might not see the data in the given range.
pub fn delete_file_in_range<K: AsRef<[u8]>>(&self, from: K, to: K) -> Result<(), Error> {
let from = from.as_ref();
let to = to.as_ref();
unsafe {
ffi_try!(ffi::rocksdb_delete_file_in_range(
self.inner,
from.as_ptr() as *const c_char,
from.len() as size_t,
to.as_ptr() as *const c_char,
to.len() as size_t,
));
Ok(())
}
}
/// Same as `delete_file_in_range` but only for a specific column family
pub fn delete_file_in_range_cf<K: AsRef<[u8]>>(
&self,
cf: &ColumnFamily,
from: K,
to: K,
) -> Result<(), Error> {
let from = from.as_ref();
let to = to.as_ref();
unsafe {
ffi_try!(ffi::rocksdb_delete_file_in_range_cf(
self.inner,
cf.inner,
from.as_ptr() as *const c_char,
from.len() as size_t,
to.as_ptr() as *const c_char,
to.len() as size_t,
));
Ok(())
}
}
}
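A minimal usage sketch for the two new deletion methods (illustration only, not part of the diff; the path and the "cf1" column family name are assumptions):

use rocksdb::{Options, DB};

fn main() {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    let db = DB::open(&opts, "_delete_file_in_range_example").unwrap();

    // Drop whole SST files whose keys fall entirely inside ["0000", "9999"];
    // files that only partially overlap the range, and all L0 files, are kept.
    db.delete_file_in_range(b"0000", b"9999").unwrap();

    // Same operation restricted to a single column family, if it exists.
    if let Some(cf) = db.cf_handle("cf1") {
        db.delete_file_in_range_cf(cf, b"0000", b"9999").unwrap();
    }
}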
impl Drop for DB {
@@ -1288,3 +1379,22 @@ impl fmt::Debug for DB {
write!(f, "RocksDB {{ path: {:?} }}", self.path())
}
}
/// The metadata that describes an SST file
#[derive(Debug, Clone)]
pub struct LiveFile {
/// Name of the file
pub name: String,
/// Size of the file
pub size: usize,
/// Level at which this file resides
pub level: i32,
/// Smallest user defined key in the file
pub start_key: Option<Vec<u8>>,
/// Largest user defined key in the file
pub end_key: Option<Vec<u8>>,
/// Number of entries/live keys in the file
pub num_entries: u64,
/// Number of deletions/tombstone keys in the file
pub num_deletions: u64,
}
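For reference, a small sketch of how a caller can read this metadata through the new `live_files()` method (illustration only, not part of the diff; the path is an assumption):

use rocksdb::DB;

fn main() {
    let db = DB::open_default("_live_files_example").unwrap();
    db.put(b"key", b"value").unwrap();
    db.flush().unwrap(); // make sure at least one SST file exists on disk

    for f in db.live_files().unwrap() {
        println!(
            "{} at level {}: {} bytes, {} entries, {} deletions, keys {:?}..{:?}",
            f.name, f.level, f.size, f.num_entries, f.num_deletions, f.start_key, f.end_key
        );
    }
}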

@@ -433,8 +433,8 @@ impl BlockBasedOptions {
}
}
/// Sets the control over blocks (user data is stored in a set of blocks, and
/// a block is the unit of reading from disk).
/// Sets global cache for blocks (user data is stored in a set of blocks, and
/// a block is the unit of reading from disk). The cache must outlive the DB instance which uses it.
///
/// If set, use the specified cache for blocks.
/// By default, rocksdb will automatically create and use an 8MB internal cache.
@@ -444,7 +444,8 @@
}
}
/// Sets the cache for compressed blocks.
/// Sets global cache for compressed blocks. The cache must outlive the DB instance which uses it.
///
/// By default, rocksdb will not use a compressed block cache.
pub fn set_block_cache_compressed(&mut self, cache: &Cache) {
unsafe {
@@ -2428,7 +2429,7 @@ impl Options {
}
}
/// Sets global cache for table-level rows.
/// Sets global cache for table-level rows. The cache must outlive the DB instance which uses it.
///
/// Default: null (disabled)
/// Not supported in ROCKSDB_LITE mode!
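The doc tweaks in this file all make the same point: a `Cache` handle has to stay alive at least as long as any DB configured to use it. A minimal sketch of that pattern (illustration only; `set_block_based_table_factory` and `set_row_cache` are the crate's existing setters named from memory, not lines in this diff):

use rocksdb::{BlockBasedOptions, Cache, Options, DB};

fn main() {
    // Keep the cache handles in a scope that outlives the DB below.
    let block_cache = Cache::new_lru_cache(64 << 20).unwrap(); // 64 MiB
    let row_cache = Cache::new_lru_cache(8 << 20).unwrap(); // 8 MiB

    let mut block_opts = BlockBasedOptions::default();
    block_opts.set_block_cache(&block_cache);

    let mut opts = Options::default();
    opts.create_if_missing(true);
    opts.set_block_based_table_factory(&block_opts);
    opts.set_row_cache(&row_cache);

    let db = DB::open(&opts, "_cache_lifetime_example").unwrap();
    db.put(b"k", b"v").unwrap();
    drop(db); // the DB goes away while both caches are still alive
}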
@@ -2591,6 +2592,31 @@ impl Options {
ffi::rocksdb_options_set_arena_block_size(self.inner, size);
}
}
/// If true, then print malloc stats together with rocksdb.stats when printing to LOG.
///
/// Default: false
pub fn set_dump_malloc_stats(&mut self, enabled: bool) {
unsafe {
ffi::rocksdb_options_set_dump_malloc_stats(self.inner, enabled as c_uchar);
}
}
/// Enable whole key bloom filter in memtable. Note this will only take effect
/// if memtable_prefix_bloom_size_ratio is not 0. Enabling whole key filtering
/// can potentially reduce CPU usage for point-look-ups.
///
/// Default: false (disable)
///
/// Dynamically changeable through SetOptions() API
pub fn set_memtable_whole_key_filtering(&mut self, whole_key_filter: bool) {
unsafe {
ffi::rocksdb_options_set_memtable_whole_key_filtering(
self.inner,
whole_key_filter as c_uchar,
);
}
}
}
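A short sketch enabling the two new setters together (illustration only; `set_memtable_prefix_bloom_ratio` is assumed to be the companion setting the doc comment refers to and is not part of this diff):

use rocksdb::{Options, DB};

fn main() {
    let mut opts = Options::default();
    opts.create_if_missing(true);

    // Dump malloc stats alongside rocksdb.stats in the LOG.
    opts.set_dump_malloc_stats(true);

    // Whole-key memtable filtering only takes effect when the memtable
    // prefix bloom ratio is non-zero; the setter name below is an
    // assumption, not part of this diff.
    opts.set_memtable_prefix_bloom_ratio(0.1);
    opts.set_memtable_whole_key_filtering(true);

    let _db = DB::open(&opts, "_memtable_filter_example").unwrap();
}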
impl Default for Options {

@@ -19,14 +19,29 @@ use std::ffi::{CStr, CString};
use std::path::Path;
use std::ptr;
pub(crate) unsafe fn from_cstr(ptr: *const c_char) -> String {
let cstr = CStr::from_ptr(ptr as *const _);
String::from_utf8_lossy(cstr.to_bytes()).into_owned()
}
pub(crate) unsafe fn raw_data(ptr: *const c_char, size: usize) -> Option<Vec<u8>> {
if ptr.is_null() {
None
} else {
let mut dst = Vec::with_capacity(size);
dst.set_len(size);
ptr::copy(ptr as *const u8, dst.as_mut_ptr(), size);
Some(dst)
}
}
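`raw_data` copies `size` bytes out of a C-owned buffer into an owned `Vec<u8>`, returning `None` for a null pointer. An equivalent formulation, shown only to spell out what the helper does (the `raw_data_alt` name is hypothetical):

use libc::c_char;
use std::slice;

pub(crate) unsafe fn raw_data_alt(ptr: *const c_char, size: usize) -> Option<Vec<u8>> {
    if ptr.is_null() {
        None
    } else {
        // Borrow the C buffer as a byte slice, then copy it into a Vec we own.
        Some(slice::from_raw_parts(ptr as *const u8, size).to_vec())
    }
}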
pub fn error_message(ptr: *const c_char) -> String {
let cstr = unsafe { CStr::from_ptr(ptr as *const _) };
let s = String::from_utf8_lossy(cstr.to_bytes()).into_owned();
unsafe {
let s = from_cstr(ptr);
libc::free(ptr as *mut c_void);
}
s
}
}
pub fn opt_bytes_to_ptr<T: AsRef<[u8]>>(opt: Option<T>) -> *const c_char {
match opt {

@@ -94,7 +94,7 @@ mod write_batch;
pub use crate::{
column_family::{ColumnFamily, ColumnFamilyDescriptor, DEFAULT_COLUMN_FAMILY_NAME},
compaction_filter::Decision as CompactionDecision,
db::DB,
db::{LiveFile, DB},
db_iterator::{DBIterator, DBRawIterator, DBWALIterator, Direction, IteratorMode},
db_options::{
BlockBasedIndexType, BlockBasedOptions, BottommostLevelCompaction, Cache, CompactOptions,

@@ -12,10 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use libc::{c_char, c_int, c_uchar, c_void};
use std::ffi::CStr;
use libc::{c_int, c_uchar, c_void};
use crate::ffi;
use crate::{ffi, ffi_util::from_cstr, Cache, Error, DB};
#[derive(Debug, Copy, Clone, PartialEq)]
#[repr(i32)]
@@ -143,15 +142,6 @@ impl Drop for PerfContext {
}
}
fn convert(ptr: *const c_char) -> String {
let cstr = unsafe { CStr::from_ptr(ptr as *const _) };
let s = String::from_utf8_lossy(cstr.to_bytes()).into_owned();
unsafe {
libc::free(ptr as *mut c_void);
}
s
}
impl PerfContext {
/// Reset context
pub fn reset(&mut self) {
@@ -163,10 +153,10 @@ impl PerfContext {
/// Get the report on perf
pub fn report(&self, exclude_zero_counters: bool) -> String {
unsafe {
convert(ffi::rocksdb_perfcontext_report(
self.inner,
exclude_zero_counters as c_uchar,
))
let ptr = ffi::rocksdb_perfcontext_report(self.inner, exclude_zero_counters as c_uchar);
let report = from_cstr(ptr);
libc::free(ptr as *mut c_void);
report
}
}
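`report` now builds the `String` via `from_cstr` and frees the C buffer itself, so callers simply consume the returned value. A usage sketch mirroring the test below (illustration only; the path is an assumption, and counters stay at zero unless perf stats collection has been enabled for the thread):

use rocksdb::{PerfContext, PerfMetric, DB};

fn main() {
    let db = DB::open_default("_perf_report_example").unwrap();
    db.put(b"k", b"v").unwrap();
    let _ = db.get(b"k").unwrap();

    let ctx = PerfContext::default();
    let hits = ctx.metric(PerfMetric::BlockCacheHitCount);
    println!("block_cache_hit_count = {}", hits);
    println!("{}", ctx.report(true)); // true == exclude zero counters
}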
@@ -175,3 +165,121 @@ impl PerfContext {
unsafe { ffi::rocksdb_perfcontext_metric(self.inner, id as c_int) }
}
}
/// Memory usage stats
pub struct MemoryUsageStats {
/// Approximate memory usage of all the mem-tables
pub mem_table_total: u64,
/// Approximate memory usage of un-flushed mem-tables
pub mem_table_unflushed: u64,
/// Approximate memory usage of all the table readers
pub mem_table_readers_total: u64,
/// Approximate memory usage by cache
pub cache_total: u64,
}
/// Wrapper over memory_usage_t. Holds the current memory usage of the specified DB instances and caches
struct MemoryUsage {
inner: *mut ffi::rocksdb_memory_usage_t,
}
impl Drop for MemoryUsage {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_approximate_memory_usage_destroy(self.inner);
}
}
}
impl MemoryUsage {
/// Approximate memory usage of all the mem-tables
fn approximate_mem_table_total(&self) -> u64 {
unsafe { ffi::rocksdb_approximate_memory_usage_get_mem_table_total(self.inner) }
}
/// Approximate memory usage of un-flushed mem-tables
fn approximate_mem_table_unflushed(&self) -> u64 {
unsafe { ffi::rocksdb_approximate_memory_usage_get_mem_table_unflushed(self.inner) }
}
/// Approximate memory usage of all the table readers
fn approximate_mem_table_readers_total(&self) -> u64 {
unsafe { ffi::rocksdb_approximate_memory_usage_get_mem_table_readers_total(self.inner) }
}
/// Approximate memory usage by cache
fn approximate_cache_total(&self) -> u64 {
unsafe { ffi::rocksdb_approximate_memory_usage_get_cache_total(self.inner) }
}
}
/// Builder for MemoryUsage
struct MemoryUsageBuilder {
inner: *mut ffi::rocksdb_memory_consumers_t,
}
impl Drop for MemoryUsageBuilder {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_memory_consumers_destroy(self.inner);
}
}
}
impl MemoryUsageBuilder {
/// Create new instance
fn new() -> Result<Self, Error> {
let mc = unsafe { ffi::rocksdb_memory_consumers_create() };
if mc.is_null() {
Err(Error::new(
"Could not create MemoryUsage builder".to_owned(),
))
} else {
Ok(Self { inner: mc })
}
}
/// Add a DB instance to collect memory usage from it and add up in total stats
fn add_db(&mut self, db: &DB) {
unsafe {
ffi::rocksdb_memory_consumers_add_db(self.inner, db.inner);
}
}
/// Add a cache to collect memory usage from it and add up in total stats
fn add_cache(&mut self, cache: &Cache) {
unsafe {
ffi::rocksdb_memory_consumers_add_cache(self.inner, cache.inner);
}
}
/// Build up MemoryUsage
fn build(&self) -> Result<MemoryUsage, Error> {
unsafe {
let mu = ffi_try!(ffi::rocksdb_approximate_memory_usage_create(self.inner));
Ok(MemoryUsage { inner: mu })
}
}
}
/// Get memory usage stats from DB instances and Cache instances
pub fn get_memory_usage_stats(
dbs: Option<&[&DB]>,
caches: Option<&[&Cache]>,
) -> Result<MemoryUsageStats, Error> {
let mut builder = MemoryUsageBuilder::new()?;
if let Some(dbs_) = dbs {
dbs_.iter().for_each(|db| builder.add_db(db));
}
if let Some(caches_) = caches {
caches_.iter().for_each(|cache| builder.add_cache(cache));
}
let mu = builder.build()?;
Ok(MemoryUsageStats {
mem_table_total: mu.approximate_mem_table_total(),
mem_table_unflushed: mu.approximate_mem_table_unflushed(),
mem_table_readers_total: mu.approximate_mem_table_readers_total(),
cache_total: mu.approximate_cache_total(),
})
}
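A usage sketch for the new entry point, mirroring the test below (illustration only; the path is an assumption):

use rocksdb::{perf::get_memory_usage_stats, Cache, DB};

fn main() {
    let db = DB::open_default("_memory_usage_example").unwrap();
    db.put(b"k", b"v").unwrap();
    let cache = Cache::new_lru_cache(8 << 20).unwrap();

    // Aggregate approximate memory usage over the given DBs and caches;
    // pass None to skip either group.
    let stats = get_memory_usage_stats(Some(&[&db]), Some(&[&cache])).unwrap();
    println!(
        "memtables={} unflushed={} table-readers={} cache={}",
        stats.mem_table_total,
        stats.mem_table_unflushed,
        stats.mem_table_readers_total,
        stats.cache_total,
    );
}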

@@ -15,10 +15,10 @@
mod util;
use rocksdb::{
BlockBasedOptions, BottommostLevelCompaction, Cache, CompactOptions, DBCompactionStyle, Env,
Error, FifoCompactOptions, IteratorMode, Options, PerfContext, PerfMetric, ReadOptions,
SliceTransform, Snapshot, UniversalCompactOptions, UniversalCompactionStopStyle, WriteBatch,
DB,
perf::get_memory_usage_stats, BlockBasedOptions, BottommostLevelCompaction, Cache,
CompactOptions, DBCompactionStyle, Env, Error, FifoCompactOptions, IteratorMode, Options,
PerfContext, PerfMetric, ReadOptions, SliceTransform, Snapshot, UniversalCompactOptions,
UniversalCompactionStopStyle, WriteBatch, DB,
};
use std::sync::Arc;
use std::time::Duration;
@@ -495,11 +495,13 @@ fn compact_range_test() {
opts.create_missing_column_families(true);
// set compaction style
{
let mut uni_co_opts = UniversalCompactOptions::default();
uni_co_opts.set_size_ratio(2);
uni_co_opts.set_stop_style(UniversalCompactionStopStyle::Total);
opts.set_compaction_style(DBCompactionStyle::Universal);
opts.set_universal_compaction_options(&uni_co_opts);
}
// set compaction options
let mut compact_opts = CompactOptions::default();
@@ -540,10 +542,12 @@ fn fifo_compaction_test() {
opts.create_missing_column_families(true);
// set compaction style
{
let mut fifo_co_opts = FifoCompactOptions::default();
fifo_co_opts.set_max_table_files_size(4 << 10); // 4KB
opts.set_compaction_style(DBCompactionStyle::Fifo);
opts.set_fifo_compaction_options(&fifo_co_opts);
}
// put and compact column family cf1
let cfs = vec!["cf1"];
@@ -560,12 +564,12 @@
let ctx = PerfContext::default();
let block_cache_hit_count = ctx.metric(PerfMetric::BlockCacheHitCount);
assert!(block_cache_hit_count > 0);
if block_cache_hit_count > 0 {
let expect = format!("block_cache_hit_count = {}", block_cache_hit_count);
assert!(ctx.report(true).contains(&expect));
}
}
}
#[test]
fn env_and_dbpaths_test() {
@@ -577,14 +581,18 @@ fn env_and_dbpaths_test() {
opts.create_if_missing(true);
opts.create_missing_column_families(true);
{
let mut env = Env::default().unwrap();
env.lower_high_priority_thread_pool_cpu_priority();
opts.set_env(&env);
}
{
let mut paths = Vec::new();
paths.push(rocksdb::DBPath::new(&path1, 20 << 20).unwrap());
paths.push(rocksdb::DBPath::new(&path2, 30 << 20).unwrap());
opts.set_db_paths(&paths);
}
let db = DB::open(&opts, &path).unwrap();
db.put(b"k1", b"v1").unwrap();
@@ -626,7 +634,7 @@ fn prefix_extract_and_iterate_test() {
fn get_with_cache_and_bulkload_test() {
let path = DBPath::new("_rust_rocksdb_get_with_cache_and_bulkload_test");
let log_path = DBPath::new("_rust_rocksdb_log_path_test");
{
// create options
let mut opts = Options::default();
opts.create_if_missing(true);
@@ -638,9 +646,17 @@ fn get_with_cache_and_bulkload_test() {
opts.set_max_subcompactions(2);
opts.set_max_background_jobs(4);
opts.set_use_adaptive_mutex(true);
opts.set_db_log_dir(&log_path);
opts.set_memtable_whole_key_filtering(true);
opts.set_dump_malloc_stats(true);
// trigger all sst files in L1/2 instead of L0
opts.set_max_bytes_for_level_base(64 << 10); // 64KB
{
// set block based table and cache
let cache = Cache::new_lru_cache(64 << 10).unwrap();
let cache = Cache::new_lru_cache(512 << 10).unwrap();
assert_eq!(cache.get_usage(), 0);
let mut block_based_opts = BlockBasedOptions::default();
block_based_opts.set_block_cache(&cache);
block_based_opts.set_cache_index_and_filter_blocks(true);
@@ -651,29 +667,30 @@ fn get_with_cache_and_bulkload_test() {
// write a lot
let mut batch = WriteBatch::default();
for i in 0..1_000 {
for i in 0..10_000 {
batch.put(format!("{:0>4}", i).as_bytes(), b"v");
}
assert!(db.write(batch).is_ok());
// flush memory table to sst, trigger cache usage on `get`
// flush memory table to sst and manual compaction
assert!(db.flush().is_ok());
db.compact_range(Some(format!("{:0>4}", 0).as_bytes()), None::<Vec<u8>>);
// get -> trigger caching
let _ = db.get(b"1");
assert!(cache.get_usage() > 0);
// get approximated memory usage
let mem_usage = get_memory_usage_stats(Some(&[&db]), None).unwrap();
assert!(mem_usage.mem_table_total > 0);
// get approximated cache usage
let mem_usage = get_memory_usage_stats(None, Some(&[&cache])).unwrap();
assert!(mem_usage.cache_total > 0);
}
// bulk loading
{
// create new options
let mut opts = Options::default();
opts.set_delete_obsolete_files_period_micros(100_000);
opts.prepare_for_bulk_load();
opts.set_db_log_dir(&log_path);
opts.set_max_sequential_skip_in_iterations(16);
opts.set_paranoid_checks(true);
// open db
let db = DB::open(&opts, &path).unwrap();
@@ -682,12 +699,41 @@ fn get_with_cache_and_bulkload_test() {
for (expected, (k, _)) in iter.enumerate() {
assert_eq!(k.as_ref(), format!("{:0>4}", expected).as_bytes());
}
// check live files (sst files meta)
let livefiles = db.live_files().unwrap();
assert_eq!(livefiles.len(), 1);
livefiles.iter().for_each(|f| {
assert_eq!(f.level, 2);
assert!(!f.name.is_empty());
assert_eq!(
f.start_key.as_ref().unwrap().as_slice(),
format!("{:0>4}", 0).as_bytes()
);
assert_eq!(
f.end_key.as_ref().unwrap().as_slice(),
format!("{:0>4}", 9999).as_bytes()
);
assert_eq!(f.num_entries, 10000);
assert_eq!(f.num_deletions, 0);
});
// delete sst file in range (except L0)
assert!(db
.delete_file_in_range(
format!("{:0>4}", 0).as_bytes(),
format!("{:0>4}", 9999).as_bytes()
)
.is_ok());
let livefiles = db.live_files().unwrap();
assert_eq!(livefiles.len(), 0);
// try to get a deleted key
assert!(db.get(format!("{:0>4}", 123).as_bytes()).unwrap().is_none());
}
// raise error when db exists
{
// create new options
let mut opts = Options::default();
opts.set_error_if_exists(true);
assert!(DB::open(&opts, &path).is_err());
}
