diff --git a/src/compaction_filter.rs b/src/compaction_filter.rs new file mode 100644 index 0000000..6638c8a --- /dev/null +++ b/src/compaction_filter.rs @@ -0,0 +1,120 @@ +// Copyright 2016 Tyler Neely +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +use libc::{c_char, c_int, c_uchar, c_void, size_t}; +use std::ffi::CString; +use std::mem; +use std::slice; + +/// Decision about how to handle compacting an object +/// +/// This is returned by a compaction filter callback. Depending +/// on the value, the object may be kept, removed, or changed +/// in the database during a compaction. +pub enum Decision { + /// Keep the old value + Keep, + /// Remove the object from the database + Remove, + /// Change the value for the key + Change(&'static [u8]) +} + + +/// Function to filter compaction with. +/// +/// This function takes the level of compaction, the key, and the existing value +/// and returns the decision about how to handle the Key-Value pair. +/// +/// See [Options::set_compaction_filter][set_compaction_filter] for more details +/// +/// [set_compaction_filter]: ../struct.Options.html#method.set_compaction_filter +pub trait CompactionFilterFn: FnMut(u32, &[u8], &[u8]) -> Decision {} +impl CompactionFilterFn for F where F: FnMut(u32, &[u8], &[u8]) -> Decision, F: Send + 'static {} + +pub struct CompactionFilterCallback where F: CompactionFilterFn { + pub name: CString, + pub filter_fn: F +} + +pub unsafe extern "C" fn destructor_callback(raw_cb: *mut c_void) where F: CompactionFilterFn { + let _: Box> = mem::transmute(raw_cb); +} + +pub unsafe extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char where F: CompactionFilterFn { + let cb = &*(raw_cb as *mut CompactionFilterCallback); + cb.name.as_ptr() +} + +pub unsafe extern "C" fn filter_callback(raw_cb: *mut c_void, + level: c_int, + raw_key: *const c_char, + key_length: size_t, + existing_value: *const c_char, + value_length: size_t, + new_value: *mut *mut c_char, + new_value_length: *mut size_t, + value_changed: *mut c_uchar) -> c_uchar where F: CompactionFilterFn { + use self::Decision::*; + + let cb = &mut *(raw_cb as *mut CompactionFilterCallback); + let key = slice::from_raw_parts(raw_key as *const u8, key_length as usize); + let oldval = slice::from_raw_parts(existing_value as *const u8, value_length as usize); + let result = (cb.filter_fn)(level as u32, key, oldval); + match result { + Keep => 0, + Remove => 1, + Change(newval) => { + *new_value = newval.as_ptr() as *mut c_char; + *new_value_length = newval.len() as size_t; + *value_changed = 1 as c_uchar; + 0 + } + } +} + +#[cfg(test)] +#[allow(unused_variables)] +fn test_filter(level: u32, key: &[u8], value: &[u8]) -> Decision { + use self::Decision::*; + match key.first() { + Some(&b'_') => Remove, + Some(&b'%') => Change(b"secret"), + _ => Keep + } +} + +#[test] +fn compaction_filter_test() { + use {DB, Options}; + + let path = "_rust_rocksdb_filtertest"; + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.set_compaction_filter("test", test_filter); + { + let db = DB::open(&opts, path).unwrap(); + let _ = db.put(b"k1", b"a"); + let _ = db.put(b"_k", b"b"); + let _ = db.put(b"%k", b"c"); + db.compact_range(None, None); + assert_eq!(&*db.get(b"k1").unwrap().unwrap(), b"a"); + assert!(db.get(b"_k").unwrap().is_none()); + assert_eq!(&*db.get(b"%k").unwrap().unwrap(), b"secret"); + } + +} + + diff --git a/src/db_options.rs b/src/db_options.rs index 6ab3c96..18422cb 100644 --- a/src/db_options.rs +++ b/src/db_options.rs @@ -22,6 +22,8 @@ use ffi; use libc::{self, c_int, c_uchar, c_uint, c_void, size_t, uint64_t}; use merge_operator::{self, MergeFn, MergeOperatorCallback, full_merge_callback, partial_merge_callback}; +use compaction_filter::{self, CompactionFilterFn, CompactionFilterCallback, + filter_callback}; use std::ffi::{CStr, CString}; use std::mem; @@ -218,6 +220,31 @@ impl Options { self.set_merge_operator(name, merge_fn); } + /// Sets a compaction filter used to determine if entries should be kept, changed, + /// or removed during compaction. + /// + /// An example use case is to remove entries with an expired TTL. + /// + /// If you take a snapshot of the database, only values written since the last + /// snapshot will be passed through the compaction filter. + /// + /// If multi-threaded compaction is used, `filter_fn` may be called multiple times + /// simultaneously. + pub fn set_compaction_filter(&mut self, name: &str, filter_fn: F) where F: CompactionFilterFn + Send + 'static { + let cb = Box::new(CompactionFilterCallback { + name: CString::new(name.as_bytes()).unwrap(), + filter_fn: filter_fn, + }); + + unsafe { + let cf = ffi::rocksdb_compactionfilter_create(mem::transmute(cb), + Some(compaction_filter::destructor_callback::), + Some(filter_callback::), + Some(compaction_filter::name_callback::)); + ffi::rocksdb_options_set_compaction_filter(self.inner, cf); + } + } + /// Sets the comparator used to define the order of keys in the table. /// Default: a comparator that uses lexicographic byte-wise ordering /// diff --git a/src/lib.rs b/src/lib.rs index 5a54a83..102f95d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,6 +40,7 @@ mod ffi_util; pub mod backup; mod comparator; pub mod merge_operator; +pub mod compaction_filter; mod db; mod db_options; @@ -47,6 +48,7 @@ pub use db::{DBCompactionStyle, DBCompressionType, DBIterator, DBRecoveryMode, D Direction, IteratorMode, Snapshot, WriteBatch, new_bloom_filter}; pub use merge_operator::MergeOperands; +pub use compaction_filter::Decision as CompactionDecision; use std::collections::BTreeMap; use std::error; use std::fmt;