Introduce ReadOptions::set_iterate_range and PrefixRange (#656)

master
Michal Nazarewicz 2 years ago committed by GitHub
parent 17471a6da7
commit 78dba2100b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 91
      src/db_options.rs
  2. 107
      src/iter_range.rs
  3. 2
      src/lib.rs
  4. 130
      tests/test_iterator.rs

@ -3302,38 +3302,83 @@ impl ReadOptions {
} }
} }
/// Sets the lower bound for an iterator.
pub fn set_iterate_lower_bound<K: Into<Vec<u8>>>(&mut self, key: K) {
self.set_lower_bound_impl(Some(key.into()));
}
/// Sets the upper bound for an iterator. /// Sets the upper bound for an iterator.
/// The upper bound itself is not included on the iteration result. /// The upper bound itself is not included on the iteration result.
pub fn set_iterate_upper_bound<K: Into<Vec<u8>>>(&mut self, key: K) { pub fn set_iterate_upper_bound<K: Into<Vec<u8>>>(&mut self, key: K) {
self.iterate_upper_bound = Some(key.into()); self.set_upper_bound_impl(Some(key.into()));
let upper_bound = self }
.iterate_upper_bound
.as_ref()
.expect("iterate_upper_bound must exist.");
/// Sets lower and upper bounds based on the provided range. This is
/// similar to setting lower and upper bounds separately except that it also
/// allows either bound to be reset.
///
/// The argument can be a regular Rust range, e.g. `lower..upper`. However,
/// since RocksDB upper bound is always excluded (i.e. range can never be
/// fully closed) inclusive ranges (`lower..=upper` and `..=upper`) are not
/// supported. For example:
///
/// ```
/// let mut options = rocksdb::ReadOptions::default();
/// options.set_iterate_range("xy".as_bytes().."xz".as_bytes());
/// ```
///
/// In addition, [`crate::PrefixRange`] can be used to specify a range of
/// keys with a given prefix. In particular, the above example is
/// equivalent to:
///
/// ```
/// let mut options = rocksdb::ReadOptions::default();
/// options.set_iterate_range(rocksdb::PrefixRange("xy".as_bytes()));
/// ```
///
/// Note that setting range using this method is separate to using prefix
/// iterators. Prefix iterators use prefix extractor configured for
/// a column family. Setting bounds via [`crate::PrefixRange`] is more akin
/// to using manual prefix.
///
/// Using this method clears any previously set bounds. In other words, the
/// bounds can be reset by setting the range to `..` as in:
///
/// ```
/// let mut options = rocksdb::ReadOptions::default();
/// options.set_iterate_range(..);
/// ```
pub fn set_iterate_range(&mut self, range: impl crate::IterateBounds) {
let (lower, upper) = range.into_bounds();
self.set_lower_bound_impl(lower);
self.set_upper_bound_impl(upper);
}
fn set_lower_bound_impl(&mut self, bound: Option<Vec<u8>>) {
let (ptr, len) = if let Some(ref bound) = bound {
(bound.as_ptr() as *const c_char, bound.len())
} else if self.iterate_lower_bound.is_some() {
(std::ptr::null(), 0)
} else {
return;
};
self.iterate_lower_bound = bound;
unsafe { unsafe {
ffi::rocksdb_readoptions_set_iterate_upper_bound( ffi::rocksdb_readoptions_set_iterate_lower_bound(self.inner, ptr, len);
self.inner,
upper_bound.as_ptr() as *const c_char,
upper_bound.len() as size_t,
);
} }
} }
/// Sets the lower bound for an iterator. fn set_upper_bound_impl(&mut self, bound: Option<Vec<u8>>) {
pub fn set_iterate_lower_bound<K: Into<Vec<u8>>>(&mut self, key: K) { let (ptr, len) = if let Some(ref bound) = bound {
self.iterate_lower_bound = Some(key.into()); (bound.as_ptr() as *const c_char, bound.len())
let lower_bound = self } else if self.iterate_upper_bound.is_some() {
.iterate_lower_bound (std::ptr::null(), 0)
.as_ref() } else {
.expect("iterate_lower_bound must exist."); return;
};
self.iterate_upper_bound = bound;
unsafe { unsafe {
ffi::rocksdb_readoptions_set_iterate_lower_bound( ffi::rocksdb_readoptions_set_iterate_upper_bound(self.inner, ptr, len);
self.inner,
lower_bound.as_ptr() as *const c_char,
lower_bound.len() as size_t,
);
} }
} }

@ -0,0 +1,107 @@
/// A range which can be set as iterate bounds on [`crate::ReadOptions`].
///
/// See [`crate::ReadOptions::set_iterate_range`] for documentation and
/// examples.
pub trait IterateBounds {
/// Converts object into lower and upper bounds pair.
///
/// If this object represents range with one of the bounds unset,
/// corresponding element is returned as `None`. For example, `..upper`
/// range would be converted into `(None, Some(upper))` pair.
fn into_bounds(self) -> (Option<Vec<u8>>, Option<Vec<u8>>);
}
impl IterateBounds for std::ops::RangeFull {
fn into_bounds(self) -> (Option<Vec<u8>>, Option<Vec<u8>>) {
(None, None)
}
}
impl<K: Into<Vec<u8>>> IterateBounds for std::ops::Range<K> {
fn into_bounds(self) -> (Option<Vec<u8>>, Option<Vec<u8>>) {
(Some(self.start.into()), Some(self.end.into()))
}
}
impl<K: Into<Vec<u8>>> IterateBounds for std::ops::RangeFrom<K> {
fn into_bounds(self) -> (Option<Vec<u8>>, Option<Vec<u8>>) {
(Some(self.start.into()), None)
}
}
impl<K: Into<Vec<u8>>> IterateBounds for std::ops::RangeTo<K> {
fn into_bounds(self) -> (Option<Vec<u8>>, Option<Vec<u8>>) {
(None, Some(self.end.into()))
}
}
/// Representation of a range of keys starting with given prefix.
///
/// Can be used as argument of [`crate::ReadOptions::set_iterate_range`] method
/// to set iterate bounds.
#[derive(Clone, Copy)]
pub struct PrefixRange<K>(pub K);
impl<K: Into<Vec<u8>>> IterateBounds for PrefixRange<K> {
/// Converts the prefix range representation into pair of bounds.
///
/// The conversion assumes lexicographical sorting on `u8` values. For
/// example, `PrefixRange("a")` is equivalent to `"a".."b"` range. Note
/// that for some prefixes, either of the bounds may be `None`. For
/// example, an empty prefix is equivalent to a full range (i.e. both bounds
/// being `None`).
fn into_bounds(self) -> (Option<Vec<u8>>, Option<Vec<u8>>) {
let start = self.0.into();
if start.is_empty() {
(None, None)
} else {
let end = next_prefix(&start);
(Some(start), end)
}
}
}
/// Returns lowest value following largest value with given prefix.
///
/// In other words, computes upper bound for a prefix scan over list of keys
/// sorted in lexicographical order. This means that a prefix scan can be
/// expressed as range scan over a right-open `[prefix, next_prefix(prefix))`
/// range.
///
/// For example, for prefix `foo` the function returns `fop`.
///
/// Returns `None` if there is no value which can follow value with given
/// prefix. This happens when prefix consists entirely of `'\xff'` bytes (or is
/// empty).
fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
let ffs = prefix
.iter()
.rev()
.take_while(|&&byte| byte == u8::MAX)
.count();
let next = &prefix[..(prefix.len() - ffs)];
if next.is_empty() {
// Prefix consisted of \xff bytes. There is no prefix that
// follows it.
None
} else {
let mut next = next.to_vec();
*next.last_mut().unwrap() += 1;
Some(next)
}
}
#[test]
fn test_prefix_range() {
fn test(start: &[u8], end: Option<&[u8]>) {
let got = PrefixRange(start).into_bounds();
assert_eq!((Some(start), end), (got.0.as_deref(), got.1.as_deref()));
}
let empty: &[u8] = &[];
assert_eq!((None, None), PrefixRange(empty).into_bounds());
test(b"\xff", None);
test(b"\xff\xff\xff\xff", None);
test(b"a", Some(b"b"));
test(b"a\xff\xff\xff", Some(b"b"));
}

@ -85,6 +85,7 @@ mod db;
mod db_iterator; mod db_iterator;
mod db_options; mod db_options;
mod db_pinnable_slice; mod db_pinnable_slice;
mod iter_range;
pub mod merge_operator; pub mod merge_operator;
pub mod perf; pub mod perf;
pub mod properties; pub mod properties;
@ -113,6 +114,7 @@ pub use crate::{
}, },
db_pinnable_slice::DBPinnableSlice, db_pinnable_slice::DBPinnableSlice,
ffi_util::CStrLike, ffi_util::CStrLike,
iter_range::{IterateBounds, PrefixRange},
merge_operator::MergeOperands, merge_operator::MergeOperands,
perf::{PerfContext, PerfMetric, PerfStatsLevel}, perf::{PerfContext, PerfMetric, PerfStatsLevel},
slice_transform::SliceTransform, slice_transform::SliceTransform,

@ -236,3 +236,133 @@ fn test_iterator_outlive_db() {
let t = trybuild::TestCases::new(); let t = trybuild::TestCases::new();
t.compile_fail("tests/fail/iterator_outlive_db.rs"); t.compile_fail("tests/fail/iterator_outlive_db.rs");
} }
#[test]
fn test_iter_range() {
#[rustfmt::skip]
const ALL_KEYS: [&[u8]; 12] = [
/* 0 */ b"a0",
/* 1 */ b"a1",
/* 2 */ b"a11",
/* 3 */ b"a2",
/* 4 */ b"a\xff0",
/* 5 */ b"a\xff1",
/* 6 */ b"b0",
/* 7 */ b"b1",
/* 8 */ b"\xff",
/* 9 */ b"\xff0",
/* 10 */ b"\xff1",
/* 11 */ b"\xff2",
];
let path = DBPath::new("_rust_rocksdb_iter_range_test");
let db = DB::open_default(&path).unwrap();
for key in ALL_KEYS.iter() {
assert!(db.put(key, key).is_ok());
}
fn test(
db: &DB,
mode: IteratorMode,
range: impl rocksdb::IterateBounds,
want: std::ops::Range<usize>,
reverse: bool,
) {
let mut ro = rocksdb::ReadOptions::default();
// Set bounds to test that set_iterate_range clears old bounds.
ro.set_iterate_lower_bound(vec![b'z']);
ro.set_iterate_upper_bound(vec![b'z']);
ro.set_iterate_range(range);
let got = db
.iterator_opt(mode, ro)
.map(|(key, _value)| key)
.collect::<Vec<_>>();
let mut got = got.iter().map(Box::as_ref).collect::<Vec<_>>();
if reverse {
got.reverse();
}
assert_eq!(&ALL_KEYS[want], got);
}
fn prefix(key: &[u8]) -> rocksdb::PrefixRange<&[u8]> {
rocksdb::PrefixRange(key)
}
// Test Start and End modes
{
fn check<R>(db: &DB, range: R, want: std::ops::Range<usize>)
where
R: rocksdb::IterateBounds + Clone,
{
test(db, IteratorMode::Start, range.clone(), want.clone(), false);
test(db, IteratorMode::End, range, want, true);
}
check(&db, .., 0..12);
check(&db, "b1".as_bytes().., 7..12);
check(&db, .."b1".as_bytes(), 0..7);
check(&db, "a1".as_bytes().."b1".as_bytes(), 1..7);
check(&db, prefix(b""), 0..12);
check(&db, prefix(b"a"), 0..6);
check(&db, prefix(b"a1"), 1..3);
check(&db, prefix(b"a\xff"), 4..6);
check(&db, prefix(b"\xff"), 8..12);
}
// Test From mode with Forward direction
{
fn check<R>(db: &DB, from: &[u8], range: R, want: std::ops::Range<usize>)
where
R: rocksdb::IterateBounds + Clone,
{
let mode = IteratorMode::From(from, Direction::Forward);
test(db, mode, range, want, false);
}
check(&db, b"b0", .., 6..12);
check(&db, b"b0", "a2".as_bytes().., 6..12);
check(&db, b"b0", .."a1".as_bytes(), 0..0);
check(&db, b"b0", .."b0".as_bytes(), 0..0);
check(&db, b"b0", .."b1".as_bytes(), 6..7);
check(&db, b"b0", "a1".as_bytes().."b0".as_bytes(), 0..0);
check(&db, b"b0", "a1".as_bytes().."b1".as_bytes(), 6..7);
check(&db, b"b0", prefix(b""), 6..12);
check(&db, b"a1", prefix(b"a"), 1..6);
check(&db, b"b0", prefix(b"a"), 0..0);
check(&db, b"a1", prefix(b"a1"), 1..3);
check(&db, b"b0", prefix(b"a1"), 0..0);
check(&db, b"a1", prefix(b"a\xff"), 4..6);
check(&db, b"b0", prefix(b"a\xff"), 0..0);
check(&db, b"b0", prefix(b"\xff"), 8..12);
}
// Test From mode with Reverse direction
{
fn check<R>(db: &DB, from: &[u8], range: R, want: std::ops::Range<usize>)
where
R: rocksdb::IterateBounds + Clone,
{
let mode = IteratorMode::From(from, Direction::Reverse);
test(db, mode, range, want, true);
}
check(&db, b"b0", .., 0..7);
check(&db, b"b0", "a2".as_bytes().., 3..7);
check(&db, b"b0", .."a1".as_bytes(), 0..1);
check(&db, b"b0", .."b0".as_bytes(), 0..6);
check(&db, b"b0", .."b1".as_bytes(), 0..7);
check(&db, b"b0", "a1".as_bytes().."b0".as_bytes(), 1..6);
check(&db, b"b0", "a1".as_bytes().."b1".as_bytes(), 1..7);
check(&db, b"b0", prefix(b""), 0..7);
check(&db, b"a1", prefix(b"a"), 0..2);
check(&db, b"b0", prefix(b"a"), 0..6);
check(&db, b"a1", prefix(b"a1"), 1..2);
check(&db, b"b0", prefix(b"a1"), 1..3);
check(&db, b"a1", prefix(b"a\xff"), 0..0);
check(&db, b"b0", prefix(b"a\xff"), 4..6);
check(&db, b"b0", prefix(b"\xff"), 0..0);
}
}

Loading…
Cancel
Save