Add merge operands iterator (#559)

master
Vladimir 3 years ago committed by GitHub
parent 0548acfd4d
commit 870e135ed6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 67
      src/merge_operator.rs
  2. 4
      tests/test_column_family.rs
  3. 14
      tests/test_merge_operator.rs

@ -20,10 +20,10 @@
//! //!
//! fn concat_merge(new_key: &[u8], //! fn concat_merge(new_key: &[u8],
//! existing_val: Option<&[u8]>, //! existing_val: Option<&[u8]>,
//! operands: &mut MergeOperands) //! operands: &MergeOperands)
//! -> Option<Vec<u8>> { //! -> Option<Vec<u8>> {
//! //!
//! let mut result: Vec<u8> = Vec::with_capacity(operands.size_hint().0); //! let mut result: Vec<u8> = Vec::with_capacity(operands.len());
//! existing_val.map(|v| { //! existing_val.map(|v| {
//! for e in v { //! for e in v {
//! result.push(*e) //! result.push(*e)
@ -62,11 +62,11 @@ use std::ptr;
use std::slice; use std::slice;
pub trait MergeFn: pub trait MergeFn:
Fn(&[u8], Option<&[u8]>, &mut MergeOperands) -> Option<Vec<u8>> + Send + Sync + 'static Fn(&[u8], Option<&[u8]>, &MergeOperands) -> Option<Vec<u8>> + Send + Sync + 'static
{ {
} }
impl<F> MergeFn for F where impl<F> MergeFn for F where
F: Fn(&[u8], Option<&[u8]>, &mut MergeOperands) -> Option<Vec<u8>> + Send + Sync + 'static F: Fn(&[u8], Option<&[u8]>, &MergeOperands) -> Option<Vec<u8>> + Send + Sync + 'static
{ {
} }
@ -113,7 +113,7 @@ pub unsafe extern "C" fn full_merge_callback<F: MergeFn, PF: MergeFn>(
new_value_length: *mut size_t, new_value_length: *mut size_t,
) -> *mut c_char { ) -> *mut c_char {
let cb = &mut *(raw_cb as *mut MergeOperatorCallback<F, PF>); let cb = &mut *(raw_cb as *mut MergeOperatorCallback<F, PF>);
let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); let operands = &MergeOperands::new(operands_list, operands_list_len, num_operands);
let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize); let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize);
let oldval = if existing_value.is_null() { let oldval = if existing_value.is_null() {
None None
@ -148,7 +148,7 @@ pub unsafe extern "C" fn partial_merge_callback<F: MergeFn, PF: MergeFn>(
new_value_length: *mut size_t, new_value_length: *mut size_t,
) -> *mut c_char { ) -> *mut c_char {
let cb = &mut *(raw_cb as *mut MergeOperatorCallback<F, PF>); let cb = &mut *(raw_cb as *mut MergeOperatorCallback<F, PF>);
let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands); let operands = &MergeOperands::new(operands_list, operands_list_len, num_operands);
let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize); let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize);
(cb.partial_merge_fn)(key, None, operands).map_or_else( (cb.partial_merge_fn)(key, None, operands).map_or_else(
|| { || {
@ -168,7 +168,6 @@ pub struct MergeOperands {
operands_list: *const *const c_char, operands_list: *const *const c_char,
operands_list_len: *const size_t, operands_list_len: *const size_t,
num_operands: usize, num_operands: usize,
cursor: usize,
} }
impl MergeOperands { impl MergeOperands {
@ -182,16 +181,26 @@ impl MergeOperands {
operands_list, operands_list,
operands_list_len, operands_list_len,
num_operands: num_operands as usize, num_operands: num_operands as usize,
cursor: 0,
} }
} }
pub fn len(&self) -> usize {
self.num_operands
} }
impl<'a> Iterator for &'a mut MergeOperands { pub fn is_empty(&self) -> bool {
type Item = &'a [u8]; self.num_operands == 0
}
fn next(&mut self) -> Option<&'a [u8]> { pub fn iter(&self) -> MergeOperandsIter {
if self.cursor == self.num_operands { MergeOperandsIter {
operands: self,
cursor: 0,
}
}
fn get_operand(&self, index: usize) -> Option<&[u8]> {
if index >= self.num_operands {
None None
} else { } else {
unsafe { unsafe {
@ -199,10 +208,9 @@ impl<'a> Iterator for &'a mut MergeOperands {
let base_len = self.operands_list_len as usize; let base_len = self.operands_list_len as usize;
let spacing = mem::size_of::<*const *const u8>(); let spacing = mem::size_of::<*const *const u8>();
let spacing_len = mem::size_of::<*const size_t>(); let spacing_len = mem::size_of::<*const size_t>();
let len_ptr = (base_len + (spacing_len * self.cursor)) as *const size_t; let len_ptr = (base_len + (spacing_len * index)) as *const size_t;
let len = *len_ptr as usize; let len = *len_ptr as usize;
let ptr = base + (spacing * self.cursor); let ptr = base + (spacing * index);
self.cursor += 1;
Some(mem::transmute(slice::from_raw_parts( Some(mem::transmute(slice::from_raw_parts(
*(ptr as *const *const u8) as *const u8, *(ptr as *const *const u8) as *const u8,
len, len,
@ -210,9 +218,36 @@ impl<'a> Iterator for &'a mut MergeOperands {
} }
} }
} }
}
pub struct MergeOperandsIter<'a> {
operands: &'a MergeOperands,
cursor: usize,
}
impl<'a> Iterator for MergeOperandsIter<'a> {
type Item = &'a [u8];
fn next(&mut self) -> Option<Self::Item> {
let operand = self.operands.get_operand(self.cursor)?;
self.cursor += 1;
Some(operand)
}
fn size_hint(&self) -> (usize, Option<usize>) { fn size_hint(&self) -> (usize, Option<usize>) {
let remaining = self.num_operands - self.cursor; let remaining = self.operands.num_operands - self.cursor;
(remaining, Some(remaining)) (remaining, Some(remaining))
} }
} }
impl<'a> IntoIterator for &'a MergeOperands {
type Item = &'a [u8];
type IntoIter = MergeOperandsIter<'a>;
fn into_iter(self) -> Self::IntoIter {
Self::IntoIter {
operands: self,
cursor: 0,
}
}
}

@ -192,9 +192,9 @@ fn test_merge_operator() {
fn test_provided_merge( fn test_provided_merge(
_: &[u8], _: &[u8],
existing_val: Option<&[u8]>, existing_val: Option<&[u8]>,
operands: &mut MergeOperands, operands: &MergeOperands,
) -> Option<Vec<u8>> { ) -> Option<Vec<u8>> {
let nops = operands.size_hint().0; let nops = operands.len();
let mut result: Vec<u8> = Vec::with_capacity(nops); let mut result: Vec<u8> = Vec::with_capacity(nops);
if let Some(v) = existing_val { if let Some(v) = existing_val {
for e in v { for e in v {

@ -22,9 +22,9 @@ use util::DBPath;
fn test_provided_merge( fn test_provided_merge(
_new_key: &[u8], _new_key: &[u8],
existing_val: Option<&[u8]>, existing_val: Option<&[u8]>,
operands: &mut MergeOperands, operands: &MergeOperands,
) -> Option<Vec<u8>> { ) -> Option<Vec<u8>> {
let nops = operands.size_hint().0; let nops = operands.len();
let mut result: Vec<u8> = Vec::with_capacity(nops); let mut result: Vec<u8> = Vec::with_capacity(nops);
if let Some(v) = existing_val { if let Some(v) = existing_val {
for e in v { for e in v {
@ -97,9 +97,9 @@ impl ValueCounts {
fn test_counting_partial_merge( fn test_counting_partial_merge(
_new_key: &[u8], _new_key: &[u8],
_existing_val: Option<&[u8]>, _existing_val: Option<&[u8]>,
operands: &mut MergeOperands, operands: &MergeOperands,
) -> Option<Vec<u8>> { ) -> Option<Vec<u8>> {
let nops = operands.size_hint().0; let nops = operands.len();
let mut result: Vec<u8> = Vec::with_capacity(nops); let mut result: Vec<u8> = Vec::with_capacity(nops);
for op in operands { for op in operands {
for e in op { for e in op {
@ -112,7 +112,7 @@ fn test_counting_partial_merge(
fn test_counting_full_merge( fn test_counting_full_merge(
_new_key: &[u8], _new_key: &[u8],
existing_val: Option<&[u8]>, existing_val: Option<&[u8]>,
operands: &mut MergeOperands, operands: &MergeOperands,
) -> Option<Vec<u8>> { ) -> Option<Vec<u8>> {
let mut counts = existing_val let mut counts = existing_val
.map(|v| ValueCounts::from_slice(v)) .map(|v| ValueCounts::from_slice(v))
@ -251,7 +251,7 @@ fn failed_merge_test() {
fn test_failing_merge( fn test_failing_merge(
_key: &[u8], _key: &[u8],
_val: Option<&[u8]>, _val: Option<&[u8]>,
_operands: &mut MergeOperands, _operands: &MergeOperands,
) -> Option<Vec<u8>> { ) -> Option<Vec<u8>> {
None None
} }
@ -274,7 +274,7 @@ fn failed_merge_test() {
} }
fn make_merge_max_with_limit(limit: u64) -> impl MergeFn + Clone { fn make_merge_max_with_limit(limit: u64) -> impl MergeFn + Clone {
move |_key: &[u8], first: Option<&[u8]>, rest: &mut MergeOperands| { move |_key: &[u8], first: Option<&[u8]>, rest: &MergeOperands| {
let max = first let max = first
.into_iter() .into_iter()
.chain(rest) .chain(rest)

Loading…
Cancel
Save