diff --git a/src/db_options.rs b/src/db_options.rs index 167e73e..b58a0b4 100644 --- a/src/db_options.rs +++ b/src/db_options.rs @@ -3302,38 +3302,83 @@ impl ReadOptions { } } + /// Sets the lower bound for an iterator. + pub fn set_iterate_lower_bound<K: Into<Vec<u8>>>(&mut self, key: K) { + self.set_lower_bound_impl(Some(key.into())); + } + /// Sets the upper bound for an iterator. /// The upper bound itself is not included on the iteration result. pub fn set_iterate_upper_bound<K: Into<Vec<u8>>>(&mut self, key: K) { - self.iterate_upper_bound = Some(key.into()); - let upper_bound = self - .iterate_upper_bound - .as_ref() - .expect("iterate_upper_bound must exist."); + self.set_upper_bound_impl(Some(key.into())); + } + /// Sets lower and upper bounds based on the provided range. This is + /// similar to setting lower and upper bounds separately except that it also + /// allows either bound to be reset. + /// + /// The argument can be a regular Rust range, e.g. `lower..upper`. However, + /// since RocksDB upper bound is always excluded (i.e. range can never be + /// fully closed) inclusive ranges (`lower..=upper` and `..=upper`) are not + /// supported. For example: + /// + /// ``` + /// let mut options = rocksdb::ReadOptions::default(); + /// options.set_iterate_range("xy".as_bytes().."xz".as_bytes()); + /// ``` + /// + /// In addition, [`crate::PrefixRange`] can be used to specify a range of + /// keys with a given prefix. In particular, the above example is + /// equivalent to: + /// + /// ``` + /// let mut options = rocksdb::ReadOptions::default(); + /// options.set_iterate_range(rocksdb::PrefixRange("xy".as_bytes())); + /// ``` + /// + /// Note that setting range using this method is separate to using prefix + /// iterators. Prefix iterators use prefix extractor configured for + /// a column family. Setting bounds via [`crate::PrefixRange`] is more akin + /// to using manual prefix. + /// + /// Using this method clears any previously set bounds. 
In other words, the + /// bounds can be reset by setting the range to `..` as in: + /// + /// ``` + /// let mut options = rocksdb::ReadOptions::default(); + /// options.set_iterate_range(..); + /// ``` + pub fn set_iterate_range(&mut self, range: impl crate::IterateBounds) { + let (lower, upper) = range.into_bounds(); + self.set_lower_bound_impl(lower); + self.set_upper_bound_impl(upper); + } + + fn set_lower_bound_impl(&mut self, bound: Option<Vec<u8>>) { + let (ptr, len) = if let Some(ref bound) = bound { + (bound.as_ptr() as *const c_char, bound.len()) + } else if self.iterate_lower_bound.is_some() { + (std::ptr::null(), 0) + } else { + return; + }; + self.iterate_lower_bound = bound; unsafe { - ffi::rocksdb_readoptions_set_iterate_upper_bound( - self.inner, - upper_bound.as_ptr() as *const c_char, - upper_bound.len() as size_t, - ); + ffi::rocksdb_readoptions_set_iterate_lower_bound(self.inner, ptr, len); } } - /// Sets the lower bound for an iterator. - pub fn set_iterate_lower_bound<K: Into<Vec<u8>>>(&mut self, key: K) { - self.iterate_lower_bound = Some(key.into()); - let lower_bound = self - .iterate_lower_bound - .as_ref() - .expect("iterate_lower_bound must exist."); - + fn set_upper_bound_impl(&mut self, bound: Option<Vec<u8>>) { + let (ptr, len) = if let Some(ref bound) = bound { + (bound.as_ptr() as *const c_char, bound.len()) + } else if self.iterate_upper_bound.is_some() { + (std::ptr::null(), 0) + } else { + return; + }; + self.iterate_upper_bound = bound; unsafe { - ffi::rocksdb_readoptions_set_iterate_lower_bound( - self.inner, - lower_bound.as_ptr() as *const c_char, - lower_bound.len() as size_t, - ); + ffi::rocksdb_readoptions_set_iterate_upper_bound(self.inner, ptr, len); } } diff --git a/src/iter_range.rs b/src/iter_range.rs new file mode 100644 index 0000000..2bbdfc3 --- /dev/null +++ b/src/iter_range.rs @@ -0,0 +1,107 @@ +/// A range which can be set as iterate bounds on [`crate::ReadOptions`]. 
+/// +/// See [`crate::ReadOptions::set_iterate_range`] for documentation and +/// examples. +pub trait IterateBounds { + /// Converts object into lower and upper bounds pair. + /// + /// If this object represents range with one of the bounds unset, + /// corresponding element is returned as `None`. For example, `..upper` + /// range would be converted into `(None, Some(upper))` pair. + fn into_bounds(self) -> (Option<Vec<u8>>, Option<Vec<u8>>); +} + +impl IterateBounds for std::ops::RangeFull { + fn into_bounds(self) -> (Option<Vec<u8>>, Option<Vec<u8>>) { + (None, None) + } +} + +impl<K: Into<Vec<u8>>> IterateBounds for std::ops::Range<K> { + fn into_bounds(self) -> (Option<Vec<u8>>, Option<Vec<u8>>) { + (Some(self.start.into()), Some(self.end.into())) + } +} + +impl<K: Into<Vec<u8>>> IterateBounds for std::ops::RangeFrom<K> { + fn into_bounds(self) -> (Option<Vec<u8>>, Option<Vec<u8>>) { + (Some(self.start.into()), None) + } +} + +impl<K: Into<Vec<u8>>> IterateBounds for std::ops::RangeTo<K> { + fn into_bounds(self) -> (Option<Vec<u8>>, Option<Vec<u8>>) { + (None, Some(self.end.into())) + } +} + +/// Representation of a range of keys starting with given prefix. +/// +/// Can be used as argument of [`crate::ReadOptions::set_iterate_range`] method +/// to set iterate bounds. +#[derive(Clone, Copy)] +pub struct PrefixRange<K>(pub K); + +impl<K: Into<Vec<u8>>> IterateBounds for PrefixRange<K> { + /// Converts the prefix range representation into pair of bounds. + /// + /// The conversion assumes lexicographical sorting on `u8` values. For + /// example, `PrefixRange("a")` is equivalent to `"a".."b"` range. Note + /// that for some prefixes, either of the bounds may be `None`. For + /// example, an empty prefix is equivalent to a full range (i.e. both bounds + /// being `None`). + fn into_bounds(self) -> (Option<Vec<u8>>, Option<Vec<u8>>) { + let start = self.0.into(); + if start.is_empty() { + (None, None) + } else { + let end = next_prefix(&start); + (Some(start), end) + } + } +} + +/// Returns lowest value following largest value with given prefix. 
+/// +/// In other words, computes upper bound for a prefix scan over list of keys +/// sorted in lexicographical order. This means that a prefix scan can be +/// expressed as range scan over a right-open `[prefix, next_prefix(prefix))` +/// range. +/// +/// For example, for prefix `foo` the function returns `fop`. +/// +/// Returns `None` if there is no value which can follow value with given +/// prefix. This happens when prefix consists entirely of `'\xff'` bytes (or is +/// empty). +fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> { + let ffs = prefix + .iter() + .rev() + .take_while(|&&byte| byte == u8::MAX) + .count(); + let next = &prefix[..(prefix.len() - ffs)]; + if next.is_empty() { + // Prefix consisted of \xff bytes. There is no prefix that + // follows it. + None + } else { + let mut next = next.to_vec(); + *next.last_mut().unwrap() += 1; + Some(next) + } +} + +#[test] +fn test_prefix_range() { + fn test(start: &[u8], end: Option<&[u8]>) { + let got = PrefixRange(start).into_bounds(); + assert_eq!((Some(start), end), (got.0.as_deref(), got.1.as_deref())); + } + + let empty: &[u8] = &[]; + assert_eq!((None, None), PrefixRange(empty).into_bounds()); + test(b"\xff", None); + test(b"\xff\xff\xff\xff", None); + test(b"a", Some(b"b")); + test(b"a\xff\xff\xff", Some(b"b")); +} diff --git a/src/lib.rs b/src/lib.rs index 3b239ac..a26b755 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -85,6 +85,7 @@ mod db; mod db_iterator; mod db_options; mod db_pinnable_slice; +mod iter_range; pub mod merge_operator; pub mod perf; pub mod properties; @@ -113,6 +114,7 @@ pub use crate::{ }, db_pinnable_slice::DBPinnableSlice, ffi_util::CStrLike, + iter_range::{IterateBounds, PrefixRange}, merge_operator::MergeOperands, perf::{PerfContext, PerfMetric, PerfStatsLevel}, slice_transform::SliceTransform, diff --git a/tests/test_iterator.rs b/tests/test_iterator.rs index 5f561fa..c49a233 100644 --- a/tests/test_iterator.rs +++ b/tests/test_iterator.rs @@ -236,3 +236,133 @@ fn 
test_iterator_outlive_db() { let t = trybuild::TestCases::new(); t.compile_fail("tests/fail/iterator_outlive_db.rs"); } + +#[test] +fn test_iter_range() { + #[rustfmt::skip] + const ALL_KEYS: [&[u8]; 12] = [ + /* 0 */ b"a0", + /* 1 */ b"a1", + /* 2 */ b"a11", + /* 3 */ b"a2", + /* 4 */ b"a\xff0", + /* 5 */ b"a\xff1", + /* 6 */ b"b0", + /* 7 */ b"b1", + /* 8 */ b"\xff", + /* 9 */ b"\xff0", + /* 10 */ b"\xff1", + /* 11 */ b"\xff2", + ]; + + let path = DBPath::new("_rust_rocksdb_iter_range_test"); + let db = DB::open_default(&path).unwrap(); + for key in ALL_KEYS.iter() { + assert!(db.put(key, key).is_ok()); + } + + fn test( + db: &DB, + mode: IteratorMode, + range: impl rocksdb::IterateBounds, + want: std::ops::Range<usize>, + reverse: bool, + ) { + let mut ro = rocksdb::ReadOptions::default(); + // Set bounds to test that set_iterate_range clears old bounds. + ro.set_iterate_lower_bound(vec![b'z']); + ro.set_iterate_upper_bound(vec![b'z']); + ro.set_iterate_range(range); + let got = db + .iterator_opt(mode, ro) + .map(|(key, _value)| key) + .collect::<Vec<_>>(); + let mut got = got.iter().map(Box::as_ref).collect::<Vec<_>>(); + if reverse { + got.reverse(); + } + assert_eq!(&ALL_KEYS[want], got); + } + + fn prefix(key: &[u8]) -> rocksdb::PrefixRange<&[u8]> { + rocksdb::PrefixRange(key) + } + + // Test Start and End modes + { + fn check<R>(db: &DB, range: R, want: std::ops::Range<usize>) + where + R: rocksdb::IterateBounds + Clone, + { + test(db, IteratorMode::Start, range.clone(), want.clone(), false); + test(db, IteratorMode::End, range, want, true); + } + + check(&db, .., 0..12); + check(&db, "b1".as_bytes().., 7..12); + check(&db, .."b1".as_bytes(), 0..7); + check(&db, "a1".as_bytes().."b1".as_bytes(), 1..7); + + check(&db, prefix(b""), 0..12); + check(&db, prefix(b"a"), 0..6); + check(&db, prefix(b"a1"), 1..3); + check(&db, prefix(b"a\xff"), 4..6); + check(&db, prefix(b"\xff"), 8..12); + } + + // Test From mode with Forward direction + { + fn check<R>(db: &DB, from: &[u8], range: R, want: 
std::ops::Range<usize>) + where + R: rocksdb::IterateBounds + Clone, + { + let mode = IteratorMode::From(from, Direction::Forward); + test(db, mode, range, want, false); + } + + check(&db, b"b0", .., 6..12); + check(&db, b"b0", "a2".as_bytes().., 6..12); + check(&db, b"b0", .."a1".as_bytes(), 0..0); + check(&db, b"b0", .."b0".as_bytes(), 0..0); + check(&db, b"b0", .."b1".as_bytes(), 6..7); + check(&db, b"b0", "a1".as_bytes().."b0".as_bytes(), 0..0); + check(&db, b"b0", "a1".as_bytes().."b1".as_bytes(), 6..7); + + check(&db, b"b0", prefix(b""), 6..12); + check(&db, b"a1", prefix(b"a"), 1..6); + check(&db, b"b0", prefix(b"a"), 0..0); + check(&db, b"a1", prefix(b"a1"), 1..3); + check(&db, b"b0", prefix(b"a1"), 0..0); + check(&db, b"a1", prefix(b"a\xff"), 4..6); + check(&db, b"b0", prefix(b"a\xff"), 0..0); + check(&db, b"b0", prefix(b"\xff"), 8..12); + } + + // Test From mode with Reverse direction + { + fn check<R>(db: &DB, from: &[u8], range: R, want: std::ops::Range<usize>) + where + R: rocksdb::IterateBounds + Clone, + { + let mode = IteratorMode::From(from, Direction::Reverse); + test(db, mode, range, want, true); + } + + check(&db, b"b0", .., 0..7); + check(&db, b"b0", "a2".as_bytes().., 3..7); + check(&db, b"b0", .."a1".as_bytes(), 0..1); + check(&db, b"b0", .."b0".as_bytes(), 0..6); + check(&db, b"b0", .."b1".as_bytes(), 0..7); + check(&db, b"b0", "a1".as_bytes().."b0".as_bytes(), 1..6); + check(&db, b"b0", "a1".as_bytes().."b1".as_bytes(), 1..7); + + check(&db, b"b0", prefix(b""), 0..7); + check(&db, b"a1", prefix(b"a"), 0..2); + check(&db, b"b0", prefix(b"a"), 0..6); + check(&db, b"a1", prefix(b"a1"), 1..2); + check(&db, b"b0", prefix(b"a1"), 1..3); + check(&db, b"a1", prefix(b"a\xff"), 0..0); + check(&db, b"b0", prefix(b"a\xff"), 4..6); + check(&db, b"b0", prefix(b"\xff"), 0..0); + } +}