From 12093252c6ece15fda869d4e13bd9d4c8c891550 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Sat, 10 Feb 2018 22:03:14 +0100 Subject: [PATCH] add slice transform support --- src/db.rs | 6 ++ src/db_options.rs | 29 +++++-- src/lib.rs | 3 + src/slice_transform.rs | 149 ++++++++++++++++++++++++++++++++++ tests/test_slice_transform.rs | 48 +++++++++++ 5 files changed, 226 insertions(+), 9 deletions(-) create mode 100644 src/slice_transform.rs create mode 100644 tests/test_slice_transform.rs diff --git a/src/db.rs b/src/db.rs index 0e8ed26..c292283 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1237,6 +1237,12 @@ impl ReadOptions { ffi::rocksdb_readoptions_set_prefix_same_as_start(self.inner, v as c_uchar) } } + + pub fn set_total_order_seek(&mut self, v:bool) { + unsafe { + ffi::rocksdb_readoptions_set_total_order_seek(self.inner, v as c_uchar) + } + } } impl Default for ReadOptions { diff --git a/src/db_options.rs b/src/db_options.rs index fcffce9..716b6f7 100644 --- a/src/db_options.rs +++ b/src/db_options.rs @@ -11,20 +11,20 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. 
-// +use std::ffi::{CStr, CString}; +use std::mem; + +use libc::{self, c_int, c_uchar, c_uint, c_void, size_t, uint64_t}; -use {BlockBasedOptions, DBCompactionStyle, DBCompressionType, DBRecoveryMode, Options, - WriteOptions}; +use ffi; +use {BlockBasedOptions, DBCompactionStyle, DBCompressionType, DBRecoveryMode, + Options, WriteOptions}; use compaction_filter::{self, CompactionFilterCallback, CompactionFilterFn, filter_callback}; use comparator::{self, ComparatorCallback, CompareFn}; -use ffi; - -use libc::{self, c_int, c_uchar, c_uint, c_void, size_t, uint64_t}; use merge_operator::{self, MergeFn, MergeOperatorCallback, full_merge_callback, partial_merge_callback}; -use std::ffi::{CStr, CString}; -use std::mem; +use slice_transform::SliceTransform; pub fn new_cache(capacity: size_t) -> *mut ffi::rocksdb_cache_t { unsafe { ffi::rocksdb_cache_create_lru(capacity) } @@ -221,7 +221,10 @@ impl Options { } } - pub fn set_merge_operator(&mut self, name: &str, full_merge_fn: MergeFn, partial_merge_fn: Option<MergeFn>) { + pub fn set_merge_operator(&mut self, name: &str, + full_merge_fn: MergeFn, + partial_merge_fn: Option<MergeFn>) { + let cb = Box::new(MergeOperatorCallback { name: CString::new(name.as_bytes()).unwrap(), full_merge_fn: full_merge_fn, @@ -300,6 +303,14 @@ impl Options { } } + pub fn set_prefix_extractor(&mut self, prefix_extractor: SliceTransform) { + unsafe { + ffi::rocksdb_options_set_prefix_extractor( + self.inner, prefix_extractor.inner + ) + } + } + #[deprecated(since = "0.5.0", note = "add_comparator has been renamed to set_comparator")] pub fn add_comparator(&mut self, name: &str, compare_fn: CompareFn) { self.set_comparator(name, compare_fn); diff --git a/src/lib.rs b/src/lib.rs index 6cdae3a..f982702 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,12 +58,15 @@ pub mod merge_operator; pub mod compaction_filter; mod db; mod db_options; +mod slice_transform; pub use compaction_filter::Decision as CompactionDecision; pub use db::{DBCompactionStyle, 
DBCompressionType, DBIterator, DBRawIterator, DBRecoveryMode, DBVector, ReadOptions, Direction, IteratorMode, Snapshot, WriteBatch, new_bloom_filter}; +pub use slice_transform::SliceTransform; + pub use merge_operator::MergeOperands; use std::collections::BTreeMap; use std::error; diff --git a/src/slice_transform.rs b/src/slice_transform.rs new file mode 100644 index 0000000..f834be3 --- /dev/null +++ b/src/slice_transform.rs @@ -0,0 +1,149 @@ +// Copyright 2018 Tyler Neely +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ffi::CString; +use std::mem; +use std::ptr; +use std::slice; + +use libc::{self, c_char, c_void, size_t}; + +use ffi; + +/// A SliceTransform is a generic pluggable way of transforming one string +/// to another. Its primary use-case is in configuring rocksdb +/// to store prefix blooms by setting prefix_extractor in +/// ColumnFamilyOptions. +pub struct SliceTransform { + pub inner: *mut ffi::rocksdb_slicetransform_t, +} + +// NB we intentionally don't implement a Drop that passes +// through to rocksdb_slicetransform_destroy because +// this is currently only used (to my knowledge) +// by people passing it as a prefix extractor when +// opening a DB. 
+ +impl SliceTransform { + pub fn create( + name: &str, + transform_fn: TransformFn, + in_domain_fn: Option<InDomainFn>, + ) -> SliceTransform{ + let cb = Box::new(TransformCallback { + name: CString::new(name.as_bytes()).unwrap(), + transform_fn: transform_fn, + in_domain_fn: in_domain_fn, + }); + + let st = unsafe { + ffi::rocksdb_slicetransform_create( + mem::transmute(cb), + Some(slice_transform_destructor_callback), + Some(transform_callback), + + // this is ugly, but I can't get the compiler + // not to barf with "expected fn pointer, found fn item" + // without this. sorry. + if let Some(_) = in_domain_fn { + Some(in_domain_callback) + } else { + None + }, + + // this None points to the deprecated InRange callback + None, + Some(slice_transform_name_callback), + ) + }; + + SliceTransform { + inner: st + } + } + + pub fn create_fixed_prefix(len: size_t) -> SliceTransform { + SliceTransform { + inner: unsafe { + ffi::rocksdb_slicetransform_create_fixed_prefix(len) + }, + } + } + + pub fn create_noop() -> SliceTransform { + SliceTransform { + inner: unsafe { + ffi::rocksdb_slicetransform_create_noop() + }, + } + } +} + +pub type TransformFn = fn(&[u8]) -> Vec<u8>; +pub type InDomainFn = fn(&[u8]) -> bool; + +pub struct TransformCallback { + pub name: CString, + pub transform_fn: TransformFn, + pub in_domain_fn: Option<InDomainFn>, +} + +pub unsafe extern "C" fn slice_transform_destructor_callback( + raw_cb: *mut c_void +) { + let transform: Box<TransformCallback> = mem::transmute(raw_cb); + drop(transform); +} + +pub unsafe extern "C" fn slice_transform_name_callback( + raw_cb: *mut c_void +) -> *const c_char { + let cb = &mut *(raw_cb as *mut TransformCallback); + cb.name.as_ptr() +} + +pub unsafe extern "C" fn transform_callback( + raw_cb: *mut c_void, + raw_key: *const c_char, + key_len: size_t, + dst_length: *mut size_t, +) -> *mut c_char { + let cb = &mut *(raw_cb as *mut TransformCallback); + let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize); + let mut result = 
(cb.transform_fn)(key); + result.shrink_to_fit(); + + // copy the result into a C++ destroyable buffer + let buf = libc::malloc(result.len() as size_t); + assert!(!buf.is_null()); + ptr::copy(result.as_ptr() as *mut c_void, &mut *buf, result.len()); + + *dst_length = result.len() as size_t; + buf as *mut c_char +} + +pub unsafe extern "C" fn in_domain_callback( + raw_cb: *mut c_void, + raw_key: *const c_char, + key_len: size_t, +) -> u8 { + let cb = &mut *(raw_cb as *mut TransformCallback); + let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize); + + if (cb.in_domain_fn.unwrap())(key) { + 1 + } else { + 0 + } +} diff --git a/tests/test_slice_transform.rs b/tests/test_slice_transform.rs new file mode 100644 index 0000000..832681a --- /dev/null +++ b/tests/test_slice_transform.rs @@ -0,0 +1,48 @@ +extern crate rocksdb; + +use rocksdb::{DB, Options, SliceTransform}; + +#[test] +pub fn test_slice_transform() { + + let path = "_rust_rocksdb_slicetransform_test"; + let a1: Box<[u8]> = key(b"aaa1"); + let a2: Box<[u8]> = key(b"aaa2"); + let b1: Box<[u8]> = key(b"bbb1"); + let b2: Box<[u8]> = key(b"bbb2"); + + fn first_three(k: &[u8]) -> Vec<u8> { + k.iter().take(3).cloned().collect() + } + + let prefix_extractor = SliceTransform::create("first_three", first_three, None); + + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.set_prefix_extractor(prefix_extractor); + + let db = DB::open(&opts, path).unwrap(); + + assert!(db.put(&*a1, &*a1).is_ok()); + assert!(db.put(&*a2, &*a2).is_ok()); + assert!(db.put(&*b1, &*b1).is_ok()); + assert!(db.put(&*b2, &*b2).is_ok()); + + fn cba(input: &Box<[u8]>) -> Box<[u8]> { + input.iter().cloned().collect::<Vec<_>>().into_boxed_slice() + } + + fn key(k: &[u8]) -> Box<[u8]> { k.to_vec().into_boxed_slice() } + + { + let expected = vec![(cba(&a1), cba(&a1)), (cba(&a2), cba(&a2))]; + let a_iterator = db.prefix_iterator(b"aaa"); + assert_eq!(a_iterator.collect::<Vec<_>>(), expected) + } + + { + let expected = 
vec![(cba(&b1), cba(&b1)), (cba(&b2), cba(&b2))]; + let b_iterator = db.prefix_iterator(b"bbb"); + assert_eq!(b_iterator.collect::<Vec<_>>(), expected) + } +}