Add compaction filter factory API (#441)

master
unrealhoang 5 years ago committed by GitHub
parent 729fb69993
commit 22c4780c59
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 65
      src/compaction_filter.rs
  2. 141
      src/compaction_filter_factory.rs
  3. 123
      src/db_options.rs
  4. 1
      src/lib.rs

@ -14,8 +14,7 @@
//
use libc::{c_char, c_int, c_uchar, c_void, size_t};
use std::ffi::CString;
use std::mem;
use std::ffi::{CStr, CString};
use std::slice;
/// Decision about how to handle compacting an object
@ -32,6 +31,39 @@ pub enum Decision {
Change(&'static [u8]),
}
/// CompactionFilter allows an application to modify/delete a key-value at
/// the time of compaction.
pub trait CompactionFilter {
/// The compaction process invokes this
/// method for kv that is being compacted. The application can inspect
/// the existing value of the key and make decision based on it.
///
/// Key-Values that are results of merge operation during compaction are not
/// passed into this function. Currently, when you have a mix of Put()s and
/// Merge()s on a same key, we only guarantee to process the merge operands
/// through the compaction filters. Put()s might be processed, or might not.
///
/// When the value is to be preserved, the application has the option
/// to modify the existing_value and pass it back through new_value.
/// value_changed needs to be set to true in this case.
///
/// Note that RocksDB snapshots (i.e. call GetSnapshot() API on a
/// DB* object) will not guarantee to preserve the state of the DB with
/// CompactionFilter. Data seen from a snapshot might disppear after a
/// compaction finishes. If you use snapshots, think twice about whether you
/// want to use compaction filter and whether you are using it in a safe way.
///
/// If the CompactionFilter was created by a factory, then it will only ever
/// be used by a single thread that is doing the compaction run, and this
/// call does not need to be thread-safe. However, multiple filters may be
/// in existence and operating concurrently.
fn filter(&mut self, level: u32, key: &[u8], value: &[u8]) -> Decision;
/// Returns a name that identifies this compaction filter.
/// The name will be printed to LOG file on start up for diagnosis.
fn name(&self) -> &CStr;
}
/// Function to filter compaction with.
///
/// This function takes the level of compaction, the key, and the existing value
@ -51,19 +83,32 @@ where
pub filter_fn: F,
}
pub unsafe extern "C" fn destructor_callback<F>(raw_cb: *mut c_void)
impl<F> CompactionFilter for CompactionFilterCallback<F>
where
F: CompactionFilterFn,
{
let _: Box<CompactionFilterCallback<F>> = mem::transmute(raw_cb);
fn name(&self) -> &CStr {
self.name.as_c_str()
}
fn filter(&mut self, level: u32, key: &[u8], value: &[u8]) -> Decision {
(self.filter_fn)(level, key, value)
}
}
pub unsafe extern "C" fn destructor_callback<F>(raw_cb: *mut c_void)
where
F: CompactionFilter,
{
let _: Box<F> = Box::from_raw(raw_cb as *mut F);
}
pub unsafe extern "C" fn name_callback<F>(raw_cb: *mut c_void) -> *const c_char
where
F: CompactionFilterFn,
F: CompactionFilter,
{
let cb = &*(raw_cb as *mut CompactionFilterCallback<F>);
cb.name.as_ptr()
let cb = &*(raw_cb as *mut F);
cb.name().as_ptr()
}
pub unsafe extern "C" fn filter_callback<F>(
@ -78,14 +123,14 @@ pub unsafe extern "C" fn filter_callback<F>(
value_changed: *mut c_uchar,
) -> c_uchar
where
F: CompactionFilterFn,
F: CompactionFilter,
{
use self::Decision::{Change, Keep, Remove};
let cb = &mut *(raw_cb as *mut CompactionFilterCallback<F>);
let cb = &mut *(raw_cb as *mut F);
let key = slice::from_raw_parts(raw_key as *const u8, key_length as usize);
let oldval = slice::from_raw_parts(existing_value as *const u8, value_length as usize);
let result = (cb.filter_fn)(level as u32, key, oldval);
let result = cb.filter(level as u32, key, oldval);
match result {
Keep => 0,
Remove => 1,

@ -0,0 +1,141 @@
use std::ffi::CStr;
use libc::{self, c_char, c_void};
use crate::{
compaction_filter::{self, CompactionFilter},
ffi,
};
/// Each compaction will create a new CompactionFilter allowing the
/// application to know about different compactions.
///
/// See [compaction_filter::CompactionFilter][CompactionFilter] and
/// [Options::set_compaction_filter_factory][set_compaction_filter_factory]
/// for more details
///
/// [CompactionFilter]: ../compaction_filter/trait.CompactionFilter.html
/// [set_compaction_filter_factory]: ../struct.Options.html#method.set_compaction_filter_factory
pub trait CompactionFilterFactory {
type Filter: CompactionFilter;
/// Returns a CompactionFilter for the compaction process
fn create(&mut self, context: CompactionFilterContext) -> Self::Filter;
/// Returns a name that identifies this compaction filter factory.
fn name(&self) -> &CStr;
}
pub unsafe extern "C" fn destructor_callback<F>(raw_self: *mut c_void)
where
F: CompactionFilterFactory,
{
let _: Box<F> = Box::from_raw(raw_self as *mut F);
}
pub unsafe extern "C" fn name_callback<F>(raw_self: *mut c_void) -> *const c_char
where
F: CompactionFilterFactory,
{
let self_ = &*(raw_self as *const c_void as *const F);
self_.name().as_ptr()
}
/// Context information of a compaction run
pub struct CompactionFilterContext {
/// Does this compaction run include all data files
pub is_full_compaction: bool,
/// Is this compaction requested by the client (true),
/// or is it occurring as an automatic compaction process
pub is_manual_compaction: bool,
}
impl CompactionFilterContext {
unsafe fn from_raw(ptr: *mut ffi::rocksdb_compactionfiltercontext_t) -> Self {
let is_full_compaction = ffi::rocksdb_compactionfiltercontext_is_full_compaction(ptr) != 0;
let is_manual_compaction =
ffi::rocksdb_compactionfiltercontext_is_manual_compaction(ptr) != 0;
CompactionFilterContext {
is_full_compaction,
is_manual_compaction,
}
}
}
pub unsafe extern "C" fn create_compaction_filter_callback<F>(
raw_self: *mut c_void,
context: *mut ffi::rocksdb_compactionfiltercontext_t,
) -> *mut ffi::rocksdb_compactionfilter_t
where
F: CompactionFilterFactory,
{
let self_ = &mut *(raw_self as *mut F);
let context = CompactionFilterContext::from_raw(context);
let filter = Box::new(self_.create(context));
let filter_ptr = Box::into_raw(filter);
ffi::rocksdb_compactionfilter_create(
filter_ptr as *mut c_void,
Some(compaction_filter::destructor_callback::<F::Filter>),
Some(compaction_filter::filter_callback::<F::Filter>),
Some(compaction_filter::name_callback::<F::Filter>),
)
}
#[cfg(test)]
#[allow(clippy::wildcard_imports)]
mod tests {
use super::*;
use crate::compaction_filter::Decision;
use crate::{Options, DB};
use std::ffi::CString;
struct CountFilter(u16, CString);
impl CompactionFilter for CountFilter {
fn filter(&mut self, _level: u32, _key: &[u8], _value: &[u8]) -> crate::CompactionDecision {
self.0 += 1;
if self.0 > 2 {
Decision::Remove
} else {
Decision::Keep
}
}
fn name(&self) -> &CStr {
&self.1
}
}
struct TestFactory(CString);
impl CompactionFilterFactory for TestFactory {
type Filter = CountFilter;
fn create(&mut self, _context: CompactionFilterContext) -> Self::Filter {
CountFilter(0, CString::new("CountFilter").unwrap())
}
fn name(&self) -> &CStr {
&self.0
}
}
#[test]
fn compaction_filter_factory_test() {
let path = "_rust_rocksdb_filterfactorytest";
let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_compaction_filter_factory(TestFactory(CString::new("TestFactory").unwrap()));
{
let db = DB::open(&opts, path).unwrap();
let _ = db.put(b"k1", b"a");
let _ = db.put(b"_k", b"b");
let _ = db.put(b"%k", b"c");
db.compact_range(None::<&[u8]>, None::<&[u8]>);
assert_eq!(db.get(b"%k1").unwrap(), None);
}
let result = DB::destroy(&opts, path);
assert!(result.is_ok());
}
}

@ -19,7 +19,8 @@ use std::path::Path;
use libc::{self, c_char, c_int, c_uchar, c_uint, c_void, size_t};
use crate::{
compaction_filter::{self, filter_callback, CompactionFilterCallback, CompactionFilterFn},
compaction_filter::{self, CompactionFilterCallback, CompactionFilterFn},
compaction_filter_factory::{self, CompactionFilterFactory},
comparator::{self, ComparatorCallback, CompareFn},
ffi,
merge_operator::{
@ -972,14 +973,40 @@ impl Options {
unsafe {
let cf = ffi::rocksdb_compactionfilter_create(
mem::transmute(cb),
Some(compaction_filter::destructor_callback::<F>),
Some(filter_callback::<F>),
Some(compaction_filter::name_callback::<F>),
Some(compaction_filter::destructor_callback::<CompactionFilterCallback<F>>),
Some(compaction_filter::filter_callback::<CompactionFilterCallback<F>>),
Some(compaction_filter::name_callback::<CompactionFilterCallback<F>>),
);
ffi::rocksdb_options_set_compaction_filter(self.inner, cf);
}
}
/// This is a factory that provides compaction filter objects which allow
/// an application to modify/delete a key-value during background compaction.
///
/// A new filter will be created on each compaction run. If multithreaded
/// compaction is being used, each created CompactionFilter will only be used
/// from a single thread and so does not need to be thread-safe.
///
/// Default: nullptr
pub fn set_compaction_filter_factory<F>(&mut self, factory: F)
where
F: CompactionFilterFactory + 'static,
{
let factory = Box::new(factory);
unsafe {
let cff = ffi::rocksdb_compactionfilterfactory_create(
Box::into_raw(factory) as *mut c_void,
Some(compaction_filter_factory::destructor_callback::<F>),
Some(compaction_filter_factory::create_compaction_filter_callback::<F>),
Some(compaction_filter_factory::name_callback::<F>),
);
ffi::rocksdb_options_set_compaction_filter_factory(self.inner, cff);
}
}
/// Sets the comparator used to define the order of keys in the table.
/// Default: a comparator that uses lexicographic byte-wise ordering
///
@ -1088,6 +1115,15 @@ impl Options {
}
}
/// If max_open_files is -1, DB will open all files on DB::Open(). You can
/// use this option to increase the number of threads used to open the files.
/// Default: 16
pub fn set_max_file_opening_threads(&mut self, nthreads: c_int) {
unsafe {
ffi::rocksdb_options_set_max_file_opening_threads(self.inner, nthreads);
}
}
/// If true, then every store to stable storage will issue a fsync.
/// If false, then every store to stable storage will issue a fdatasync.
/// This parameter should be set to true while storing data to
@ -1353,6 +1389,16 @@ impl Options {
}
}
/// By default target_file_size_multiplier is 1, which means
/// by default files in different levels will have similar size.
///
/// Dynamically changeable through SetOptions() API
pub fn set_target_file_size_multiplier(&mut self, multiplier: i32) {
unsafe {
ffi::rocksdb_options_set_target_file_size_multiplier(self.inner, multiplier as c_int)
}
}
/// Sets the minimum number of write buffers that will be merged together
/// before writing to storage. If set to `1`, then
/// all write buffers are flushed to L0 as individual files and this increases
@ -1872,6 +1918,75 @@ impl Options {
}
}
/// Different max-size multipliers for different levels.
/// These are multiplied by max_bytes_for_level_multiplier to arrive
/// at the max-size of each level.
///
/// Default: 1
///
/// Dynamically changeable through SetOptions() API
pub fn set_max_bytes_for_level_multiplier_additional(&mut self, level_values: &[i32]) {
let count = level_values.len();
unsafe {
ffi::rocksdb_options_set_max_bytes_for_level_multiplier_additional(
self.inner,
level_values.as_ptr() as *mut c_int,
count,
)
}
}
/// If true, then DB::Open() will not fetch and check sizes of all sst files.
/// This may significantly speed up startup if there are many sst files,
/// especially when using non-default Env with expensive GetFileSize().
/// We'll still check that all required sst files exist.
/// If paranoid_checks is false, this option is ignored, and sst files are
/// not checked at all.
///
/// Default: false
pub fn set_skip_checking_sst_file_sizes_on_db_open(&mut self, value: bool) {
unsafe {
ffi::rocksdb_options_set_skip_checking_sst_file_sizes_on_db_open(
self.inner,
value as c_uchar,
)
}
}
/// The total maximum size(bytes) of write buffers to maintain in memory
/// including copies of buffers that have already been flushed. This parameter
/// only affects trimming of flushed buffers and does not affect flushing.
/// This controls the maximum amount of write history that will be available
/// in memory for conflict checking when Transactions are used. The actual
/// size of write history (flushed Memtables) might be higher than this limit
/// if further trimming will reduce write history total size below this
/// limit. For example, if max_write_buffer_size_to_maintain is set to 64MB,
/// and there are three flushed Memtables, with sizes of 32MB, 20MB, 20MB.
/// Because trimming the next Memtable of size 20MB will reduce total memory
/// usage to 52MB which is below the limit, RocksDB will stop trimming.
///
/// When using an OptimisticTransactionDB:
/// If this value is too low, some transactions may fail at commit time due
/// to not being able to determine whether there were any write conflicts.
///
/// When using a TransactionDB:
/// If Transaction::SetSnapshot is used, TransactionDB will read either
/// in-memory write buffers or SST files to do write-conflict checking.
/// Increasing this value can reduce the number of reads to SST files
/// done for conflict detection.
///
/// Setting this value to 0 will cause write buffers to be freed immediately
/// after they are flushed. If this value is set to -1,
/// 'max_write_buffer_number * write_buffer_size' will be used.
///
/// Default:
/// If using a TransactionDB/OptimisticTransactionDB, the default value will
/// be set to the value of 'max_write_buffer_number * write_buffer_size'
/// if it is not explicitly set by the user. Otherwise, the default is 0.
pub fn set_max_write_buffer_size_to_maintain(&mut self, size: i64) {
unsafe { ffi::rocksdb_options_set_max_write_buffer_size_to_maintain(self.inner, size) }
}
/// By default, a single write thread queue is maintained. The thread gets
/// to the head of the queue becomes write batch group leader and responsible
/// for writing to WAL and memtable for the batch group.

@ -78,6 +78,7 @@ pub mod backup;
pub mod checkpoint;
mod column_family;
pub mod compaction_filter;
pub mod compaction_filter_factory;
mod comparator;
mod db;
mod db_iterator;

Loading…
Cancel
Save