diff --git a/src/compaction_filter.rs b/src/compaction_filter.rs index 0b1818a..53bcba0 100644 --- a/src/compaction_filter.rs +++ b/src/compaction_filter.rs @@ -14,8 +14,7 @@ // use libc::{c_char, c_int, c_uchar, c_void, size_t}; -use std::ffi::CString; -use std::mem; +use std::ffi::{CStr, CString}; use std::slice; /// Decision about how to handle compacting an object @@ -32,6 +31,39 @@ pub enum Decision { Change(&'static [u8]), } +/// CompactionFilter allows an application to modify/delete a key-value at +/// the time of compaction. +pub trait CompactionFilter { + /// The compaction process invokes this + /// method for kv that is being compacted. The application can inspect + /// the existing value of the key and make decision based on it. + /// + /// Key-Values that are results of merge operation during compaction are not + /// passed into this function. Currently, when you have a mix of Put()s and + /// Merge()s on a same key, we only guarantee to process the merge operands + /// through the compaction filters. Put()s might be processed, or might not. + /// + /// When the value is to be preserved, the application has the option + /// to modify the existing_value and pass it back through new_value. + /// value_changed needs to be set to true in this case. + /// + /// Note that RocksDB snapshots (i.e. call GetSnapshot() API on a + /// DB* object) will not guarantee to preserve the state of the DB with + /// CompactionFilter. Data seen from a snapshot might disppear after a + /// compaction finishes. If you use snapshots, think twice about whether you + /// want to use compaction filter and whether you are using it in a safe way. + /// + /// If the CompactionFilter was created by a factory, then it will only ever + /// be used by a single thread that is doing the compaction run, and this + /// call does not need to be thread-safe. However, multiple filters may be + /// in existence and operating concurrently. + fn filter(&mut self, level: u32, key: &[u8], value: &[u8]) -> Decision; + + /// Returns a name that identifies this compaction filter. + /// The name will be printed to LOG file on start up for diagnosis. + fn name(&self) -> &CStr; +} + /// Function to filter compaction with. /// /// This function takes the level of compaction, the key, and the existing value @@ -51,19 +83,32 @@ where pub filter_fn: F, } -pub unsafe extern "C" fn destructor_callback(raw_cb: *mut c_void) +impl CompactionFilter for CompactionFilterCallback where F: CompactionFilterFn, { - let _: Box> = mem::transmute(raw_cb); + fn name(&self) -> &CStr { + self.name.as_c_str() + } + + fn filter(&mut self, level: u32, key: &[u8], value: &[u8]) -> Decision { + (self.filter_fn)(level, key, value) + } +} + +pub unsafe extern "C" fn destructor_callback(raw_cb: *mut c_void) +where + F: CompactionFilter, +{ + let _: Box = Box::from_raw(raw_cb as *mut F); } pub unsafe extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char where - F: CompactionFilterFn, + F: CompactionFilter, { - let cb = &*(raw_cb as *mut CompactionFilterCallback); - cb.name.as_ptr() + let cb = &*(raw_cb as *mut F); + cb.name().as_ptr() } pub unsafe extern "C" fn filter_callback( @@ -78,14 +123,14 @@ pub unsafe extern "C" fn filter_callback( value_changed: *mut c_uchar, ) -> c_uchar where - F: CompactionFilterFn, + F: CompactionFilter, { use self::Decision::{Change, Keep, Remove}; - let cb = &mut *(raw_cb as *mut CompactionFilterCallback); + let cb = &mut *(raw_cb as *mut F); let key = slice::from_raw_parts(raw_key as *const u8, key_length as usize); let oldval = slice::from_raw_parts(existing_value as *const u8, value_length as usize); - let result = (cb.filter_fn)(level as u32, key, oldval); + let result = cb.filter(level as u32, key, oldval); match result { Keep => 0, Remove => 1, diff --git a/src/compaction_filter_factory.rs b/src/compaction_filter_factory.rs new file mode 100644 index 0000000..596c96d --- /dev/null +++ b/src/compaction_filter_factory.rs @@ -0,0 +1,141 @@ +use std::ffi::CStr; + +use libc::{self, c_char, c_void}; + +use crate::{ + compaction_filter::{self, CompactionFilter}, + ffi, +}; + +/// Each compaction will create a new CompactionFilter allowing the +/// application to know about different compactions. +/// +/// See [compaction_filter::CompactionFilter][CompactionFilter] and +/// [Options::set_compaction_filter_factory][set_compaction_filter_factory] +/// for more details +/// +/// [CompactionFilter]: ../compaction_filter/trait.CompactionFilter.html +/// [set_compaction_filter_factory]: ../struct.Options.html#method.set_compaction_filter_factory +pub trait CompactionFilterFactory { + type Filter: CompactionFilter; + + /// Returns a CompactionFilter for the compaction process + fn create(&mut self, context: CompactionFilterContext) -> Self::Filter; + + /// Returns a name that identifies this compaction filter factory. + fn name(&self) -> &CStr; +} + +pub unsafe extern "C" fn destructor_callback(raw_self: *mut c_void) +where + F: CompactionFilterFactory, +{ + let _: Box = Box::from_raw(raw_self as *mut F); +} + +pub unsafe extern "C" fn name_callback(raw_self: *mut c_void) -> *const c_char +where + F: CompactionFilterFactory, +{ + let self_ = &*(raw_self as *const c_void as *const F); + self_.name().as_ptr() +} + +/// Context information of a compaction run +pub struct CompactionFilterContext { + /// Does this compaction run include all data files + pub is_full_compaction: bool, + /// Is this compaction requested by the client (true), + /// or is it occurring as an automatic compaction process + pub is_manual_compaction: bool, +} + +impl CompactionFilterContext { + unsafe fn from_raw(ptr: *mut ffi::rocksdb_compactionfiltercontext_t) -> Self { + let is_full_compaction = ffi::rocksdb_compactionfiltercontext_is_full_compaction(ptr) != 0; + let is_manual_compaction = + ffi::rocksdb_compactionfiltercontext_is_manual_compaction(ptr) != 0; + + CompactionFilterContext { + is_full_compaction, + is_manual_compaction, + } + } +} + +pub unsafe extern "C" fn create_compaction_filter_callback( + raw_self: *mut c_void, + context: *mut ffi::rocksdb_compactionfiltercontext_t, +) -> *mut ffi::rocksdb_compactionfilter_t +where + F: CompactionFilterFactory, +{ + let self_ = &mut *(raw_self as *mut F); + let context = CompactionFilterContext::from_raw(context); + let filter = Box::new(self_.create(context)); + + let filter_ptr = Box::into_raw(filter); + + ffi::rocksdb_compactionfilter_create( + filter_ptr as *mut c_void, + Some(compaction_filter::destructor_callback::), + Some(compaction_filter::filter_callback::), + Some(compaction_filter::name_callback::), + ) +} + +#[cfg(test)] +#[allow(clippy::wildcard_imports)] +mod tests { + use super::*; + use crate::compaction_filter::Decision; + use crate::{Options, DB}; + use std::ffi::CString; + + struct CountFilter(u16, CString); + impl CompactionFilter for CountFilter { + fn filter(&mut self, _level: u32, _key: &[u8], _value: &[u8]) -> crate::CompactionDecision { + self.0 += 1; + if self.0 > 2 { + Decision::Remove + } else { + Decision::Keep + } + } + + fn name(&self) -> &CStr { + &self.1 + } + } + + struct TestFactory(CString); + impl CompactionFilterFactory for TestFactory { + type Filter = CountFilter; + + fn create(&mut self, _context: CompactionFilterContext) -> Self::Filter { + CountFilter(0, CString::new("CountFilter").unwrap()) + } + + fn name(&self) -> &CStr { + &self.0 + } + } + + #[test] + fn compaction_filter_factory_test() { + let path = "_rust_rocksdb_filterfactorytest"; + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.set_compaction_filter_factory(TestFactory(CString::new("TestFactory").unwrap())); + { + let db = DB::open(&opts, path).unwrap(); + let _ = db.put(b"k1", b"a"); + let _ = db.put(b"_k", b"b"); + let _ = db.put(b"%k", b"c"); + db.compact_range(None::<&[u8]>, None::<&[u8]>); + assert_eq!(db.get(b"%k1").unwrap(), None); + } + let result = DB::destroy(&opts, path); + assert!(result.is_ok()); + } +} diff --git a/src/db_options.rs b/src/db_options.rs index 52e5c7a..c623996 100644 --- a/src/db_options.rs +++ b/src/db_options.rs @@ -19,7 +19,8 @@ use std::path::Path; use libc::{self, c_char, c_int, c_uchar, c_uint, c_void, size_t}; use crate::{ - compaction_filter::{self, filter_callback, CompactionFilterCallback, CompactionFilterFn}, + compaction_filter::{self, CompactionFilterCallback, CompactionFilterFn}, + compaction_filter_factory::{self, CompactionFilterFactory}, comparator::{self, ComparatorCallback, CompareFn}, ffi, merge_operator::{ @@ -972,14 +973,40 @@ impl Options { unsafe { let cf = ffi::rocksdb_compactionfilter_create( mem::transmute(cb), - Some(compaction_filter::destructor_callback::), - Some(filter_callback::), - Some(compaction_filter::name_callback::), + Some(compaction_filter::destructor_callback::>), + Some(compaction_filter::filter_callback::>), + Some(compaction_filter::name_callback::>), ); ffi::rocksdb_options_set_compaction_filter(self.inner, cf); } } + /// This is a factory that provides compaction filter objects which allow + /// an application to modify/delete a key-value during background compaction. + /// + /// A new filter will be created on each compaction run. If multithreaded + /// compaction is being used, each created CompactionFilter will only be used + /// from a single thread and so does not need to be thread-safe. + /// + /// Default: nullptr + pub fn set_compaction_filter_factory(&mut self, factory: F) + where + F: CompactionFilterFactory + 'static, + { + let factory = Box::new(factory); + + unsafe { + let cff = ffi::rocksdb_compactionfilterfactory_create( + Box::into_raw(factory) as *mut c_void, + Some(compaction_filter_factory::destructor_callback::), + Some(compaction_filter_factory::create_compaction_filter_callback::), + Some(compaction_filter_factory::name_callback::), + ); + + ffi::rocksdb_options_set_compaction_filter_factory(self.inner, cff); + } + } + /// Sets the comparator used to define the order of keys in the table. /// Default: a comparator that uses lexicographic byte-wise ordering /// @@ -1088,6 +1115,15 @@ impl Options { } } + /// If max_open_files is -1, DB will open all files on DB::Open(). You can + /// use this option to increase the number of threads used to open the files. + /// Default: 16 + pub fn set_max_file_opening_threads(&mut self, nthreads: c_int) { + unsafe { + ffi::rocksdb_options_set_max_file_opening_threads(self.inner, nthreads); + } + } + /// If true, then every store to stable storage will issue a fsync. /// If false, then every store to stable storage will issue a fdatasync. /// This parameter should be set to true while storing data to @@ -1353,6 +1389,16 @@ impl Options { } } + /// By default target_file_size_multiplier is 1, which means + /// by default files in different levels will have similar size. + /// + /// Dynamically changeable through SetOptions() API + pub fn set_target_file_size_multiplier(&mut self, multiplier: i32) { + unsafe { + ffi::rocksdb_options_set_target_file_size_multiplier(self.inner, multiplier as c_int) + } + } + /// Sets the minimum number of write buffers that will be merged together /// before writing to storage. If set to `1`, then /// all write buffers are flushed to L0 as individual files and this increases @@ -1872,6 +1918,75 @@ impl Options { } } + /// Different max-size multipliers for different levels. + /// These are multiplied by max_bytes_for_level_multiplier to arrive + /// at the max-size of each level. + /// + /// Default: 1 + /// + /// Dynamically changeable through SetOptions() API + pub fn set_max_bytes_for_level_multiplier_additional(&mut self, level_values: &[i32]) { + let count = level_values.len(); + unsafe { + ffi::rocksdb_options_set_max_bytes_for_level_multiplier_additional( + self.inner, + level_values.as_ptr() as *mut c_int, + count, + ) + } + } + + /// If true, then DB::Open() will not fetch and check sizes of all sst files. + /// This may significantly speed up startup if there are many sst files, + /// especially when using non-default Env with expensive GetFileSize(). + /// We'll still check that all required sst files exist. + /// If paranoid_checks is false, this option is ignored, and sst files are + /// not checked at all. + /// + /// Default: false + pub fn set_skip_checking_sst_file_sizes_on_db_open(&mut self, value: bool) { + unsafe { + ffi::rocksdb_options_set_skip_checking_sst_file_sizes_on_db_open( + self.inner, + value as c_uchar, + ) + } + } + + /// The total maximum size(bytes) of write buffers to maintain in memory + /// including copies of buffers that have already been flushed. This parameter + /// only affects trimming of flushed buffers and does not affect flushing. + /// This controls the maximum amount of write history that will be available + /// in memory for conflict checking when Transactions are used. The actual + /// size of write history (flushed Memtables) might be higher than this limit + /// if further trimming will reduce write history total size below this + /// limit. For example, if max_write_buffer_size_to_maintain is set to 64MB, + /// and there are three flushed Memtables, with sizes of 32MB, 20MB, 20MB. + /// Because trimming the next Memtable of size 20MB will reduce total memory + /// usage to 52MB which is below the limit, RocksDB will stop trimming. + /// + /// When using an OptimisticTransactionDB: + /// If this value is too low, some transactions may fail at commit time due + /// to not being able to determine whether there were any write conflicts. + /// + /// When using a TransactionDB: + /// If Transaction::SetSnapshot is used, TransactionDB will read either + /// in-memory write buffers or SST files to do write-conflict checking. + /// Increasing this value can reduce the number of reads to SST files + /// done for conflict detection. + /// + /// Setting this value to 0 will cause write buffers to be freed immediately + /// after they are flushed. If this value is set to -1, + /// 'max_write_buffer_number * write_buffer_size' will be used. + /// + /// Default: + /// If using a TransactionDB/OptimisticTransactionDB, the default value will + /// be set to the value of 'max_write_buffer_number * write_buffer_size' + /// if it is not explicitly set by the user. Otherwise, the default is 0. + pub fn set_max_write_buffer_size_to_maintain(&mut self, size: i64) { + unsafe { ffi::rocksdb_options_set_max_write_buffer_size_to_maintain(self.inner, size) } + } + /// By default, a single write thread queue is maintained. The thread gets /// to the head of the queue becomes write batch group leader and responsible /// for writing to WAL and memtable for the batch group. diff --git a/src/lib.rs b/src/lib.rs index 5d6c017..e4fa090 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,6 +78,7 @@ pub mod backup; pub mod checkpoint; mod column_family; pub mod compaction_filter; +pub mod compaction_filter_factory; mod comparator; mod db; mod db_iterator;