Change iterators to return Result (#648)

master
Michal Nazarewicz 2 years ago committed by GitHub
parent 2257be1563
commit 8710105cb5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 93
      src/db_iterator.rs
  2. 20
      tests/test_db.rs
  3. 52
      tests/test_iterator.rs
  4. 17
      tests/util/mod.rs

@ -377,22 +377,26 @@ pub type DBIterator<'a> = DBIteratorWithThreadMode<'a, DB>;
/// { /// {
/// let db = DB::open_default(path).unwrap(); /// let db = DB::open_default(path).unwrap();
/// let mut iter = db.iterator(IteratorMode::Start); // Always iterates forward /// let mut iter = db.iterator(IteratorMode::Start); // Always iterates forward
/// for (key, value) in iter { /// for item in iter {
/// let (key, value) = item.unwrap();
/// println!("Saw {:?} {:?}", key, value); /// println!("Saw {:?} {:?}", key, value);
/// } /// }
/// iter = db.iterator(IteratorMode::End); // Always iterates backward /// iter = db.iterator(IteratorMode::End); // Always iterates backward
/// for (key, value) in iter { /// for item in iter {
/// let (key, value) = item.unwrap();
/// println!("Saw {:?} {:?}", key, value); /// println!("Saw {:?} {:?}", key, value);
/// } /// }
/// iter = db.iterator(IteratorMode::From(b"my key", Direction::Forward)); // From a key in Direction::{forward,reverse} /// iter = db.iterator(IteratorMode::From(b"my key", Direction::Forward)); // From a key in Direction::{forward,reverse}
/// for (key, value) in iter { /// for item in iter {
/// let (key, value) = item.unwrap();
/// println!("Saw {:?} {:?}", key, value); /// println!("Saw {:?} {:?}", key, value);
/// } /// }
/// ///
/// // You can seek with an existing Iterator instance, too /// // You can seek with an existing Iterator instance, too
/// iter = db.iterator(IteratorMode::Start); /// iter = db.iterator(IteratorMode::Start);
/// iter.set_mode(IteratorMode::From(b"another key", Direction::Reverse)); /// iter.set_mode(IteratorMode::From(b"another key", Direction::Reverse));
/// for (key, value) in iter { /// for item in iter {
/// let (key, value) = item.unwrap();
/// println!("Saw {:?} {:?}", key, value); /// println!("Saw {:?} {:?}", key, value);
/// } /// }
/// } /// }
@ -401,9 +405,10 @@ pub type DBIterator<'a> = DBIteratorWithThreadMode<'a, DB>;
pub struct DBIteratorWithThreadMode<'a, D: DBAccess> { pub struct DBIteratorWithThreadMode<'a, D: DBAccess> {
raw: DBRawIteratorWithThreadMode<'a, D>, raw: DBRawIteratorWithThreadMode<'a, D>,
direction: Direction, direction: Direction,
just_seeked: bool, done: bool,
} }
#[derive(Copy, Clone)]
pub enum Direction { pub enum Direction {
Forward, Forward,
Reverse, Reverse,
@ -411,6 +416,7 @@ pub enum Direction {
pub type KVBytes = (Box<[u8]>, Box<[u8]>); pub type KVBytes = (Box<[u8]>, Box<[u8]>);
#[derive(Copy, Clone)]
pub enum IteratorMode<'a> { pub enum IteratorMode<'a> {
Start, Start,
End, End,
@ -419,13 +425,7 @@ pub enum IteratorMode<'a> {
impl<'a, D: DBAccess> DBIteratorWithThreadMode<'a, D> { impl<'a, D: DBAccess> DBIteratorWithThreadMode<'a, D> {
pub(crate) fn new(db: &D, readopts: ReadOptions, mode: IteratorMode) -> Self { pub(crate) fn new(db: &D, readopts: ReadOptions, mode: IteratorMode) -> Self {
let mut rv = DBIteratorWithThreadMode { Self::from_raw(DBRawIteratorWithThreadMode::new(db, readopts), mode)
raw: DBRawIteratorWithThreadMode::new(db, readopts),
direction: Direction::Forward, // blown away by set_mode()
just_seeked: false,
};
rv.set_mode(mode);
rv
} }
pub(crate) fn new_cf( pub(crate) fn new_cf(
@ -434,76 +434,67 @@ impl<'a, D: DBAccess> DBIteratorWithThreadMode<'a, D> {
readopts: ReadOptions, readopts: ReadOptions,
mode: IteratorMode, mode: IteratorMode,
) -> Self { ) -> Self {
Self::from_raw(
DBRawIteratorWithThreadMode::new_cf(db, cf_handle, readopts),
mode,
)
}
fn from_raw(raw: DBRawIteratorWithThreadMode<'a, D>, mode: IteratorMode) -> Self {
let mut rv = DBIteratorWithThreadMode { let mut rv = DBIteratorWithThreadMode {
raw: DBRawIteratorWithThreadMode::new_cf(db, cf_handle, readopts), raw,
direction: Direction::Forward, // blown away by set_mode() direction: Direction::Forward, // blown away by set_mode()
just_seeked: false, done: false,
}; };
rv.set_mode(mode); rv.set_mode(mode);
rv rv
} }
pub fn set_mode(&mut self, mode: IteratorMode) { pub fn set_mode(&mut self, mode: IteratorMode) {
match mode { self.done = false;
self.direction = match mode {
IteratorMode::Start => { IteratorMode::Start => {
self.raw.seek_to_first(); self.raw.seek_to_first();
self.direction = Direction::Forward; Direction::Forward
} }
IteratorMode::End => { IteratorMode::End => {
self.raw.seek_to_last(); self.raw.seek_to_last();
self.direction = Direction::Reverse; Direction::Reverse
} }
IteratorMode::From(key, Direction::Forward) => { IteratorMode::From(key, Direction::Forward) => {
self.raw.seek(key); self.raw.seek(key);
self.direction = Direction::Forward; Direction::Forward
} }
IteratorMode::From(key, Direction::Reverse) => { IteratorMode::From(key, Direction::Reverse) => {
self.raw.seek_for_prev(key); self.raw.seek_for_prev(key);
self.direction = Direction::Reverse; Direction::Reverse
} }
}; };
self.just_seeked = true;
}
/// See [`valid`](DBRawIteratorWithThreadMode::valid)
pub fn valid(&self) -> bool {
self.raw.valid()
}
/// See [`status`](DBRawIteratorWithThreadMode::status)
pub fn status(&self) -> Result<(), Error> {
self.raw.status()
} }
} }
impl<'a, D: DBAccess> Iterator for DBIteratorWithThreadMode<'a, D> { impl<'a, D: DBAccess> Iterator for DBIteratorWithThreadMode<'a, D> {
type Item = KVBytes; type Item = Result<KVBytes, Error>;
fn next(&mut self) -> Option<KVBytes> {
if !self.raw.valid() {
return None;
}
// Initial call to next() after seeking should not move the iterator fn next(&mut self) -> Option<Result<KVBytes, Error>> {
// or the first item will not be returned if self.done {
if self.just_seeked { None
self.just_seeked = false; } else if let Some((key, value)) = self.raw.item() {
} else { let item = (Box::from(key), Box::from(value));
match self.direction { match self.direction {
Direction::Forward => self.raw.next(), Direction::Forward => self.raw.next(),
Direction::Reverse => self.raw.prev(), Direction::Reverse => self.raw.prev(),
} }
} Some(Ok(item))
if let Some((key, value)) = self.raw.item() {
Some((Box::from(key), Box::from(value)))
} else { } else {
None self.done = true;
self.raw.status().err().map(Result::Err)
} }
} }
} }
impl<'a, D: DBAccess> std::iter::FusedIterator for DBIteratorWithThreadMode<'a, D> {}
impl<'a, D: DBAccess> Into<DBRawIteratorWithThreadMode<'a, D>> for DBIteratorWithThreadMode<'a, D> { impl<'a, D: DBAccess> Into<DBRawIteratorWithThreadMode<'a, D>> for DBIteratorWithThreadMode<'a, D> {
fn into(self) -> DBRawIteratorWithThreadMode<'a, D> { fn into(self) -> DBRawIteratorWithThreadMode<'a, D> {
self.raw self.raw
@ -548,9 +539,9 @@ impl DBWALIterator {
} }
impl Iterator for DBWALIterator { impl Iterator for DBWALIterator {
type Item = (u64, WriteBatch); type Item = Result<(u64, WriteBatch), Error>;
fn next(&mut self) -> Option<(u64, WriteBatch)> { fn next(&mut self) -> Option<Self::Item> {
if !self.valid() { if !self.valid() {
return None; return None;
} }
@ -562,9 +553,9 @@ impl Iterator for DBWALIterator {
if self.valid() { if self.valid() {
let mut seq: u64 = 0; let mut seq: u64 = 0;
let inner = unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) }; let inner = unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) };
Some((seq, WriteBatch { inner })) Some(Ok((seq, WriteBatch { inner })))
} else { } else {
None self.status().err().map(Result::Err)
} }
} }
} }

@ -147,7 +147,7 @@ fn iterator_test() {
let iter = db.iterator(IteratorMode::Start); let iter = db.iterator(IteratorMode::Start);
for (idx, (db_key, db_value)) in iter.enumerate() { for (idx, (db_key, db_value)) in iter.map(Result::unwrap).enumerate() {
let (key, value) = data[idx]; let (key, value) = data[idx];
assert_eq!((&key[..], &value[..]), (db_key.as_ref(), db_value.as_ref())); assert_eq!((&key[..], &value[..]), (db_key.as_ref(), db_value.as_ref()));
} }
@ -188,7 +188,7 @@ fn iterator_test_tailing() {
} }
let mut tot = 0; let mut tot = 0;
for (i, (k, v)) in tail_iter.enumerate() { for (i, (k, v)) in tail_iter.map(Result::unwrap).enumerate() {
assert_eq!( assert_eq!(
(k.to_vec(), v.to_vec()), (k.to_vec(), v.to_vec()),
(data[i].0.to_vec(), data[i].1.to_vec()) (data[i].0.to_vec(), data[i].1.to_vec())
@ -424,13 +424,13 @@ fn test_get_updates_since_multiple_batches() {
puts: 0, puts: 0,
deletes: 0, deletes: 0,
}; };
let (seq, batch) = iter.next().unwrap(); let (seq, batch) = iter.next().unwrap().unwrap();
assert_eq!(seq, 2); assert_eq!(seq, 2);
batch.iterate(&mut counts); batch.iterate(&mut counts);
let (seq, batch) = iter.next().unwrap(); let (seq, batch) = iter.next().unwrap().unwrap();
assert_eq!(seq, 3); assert_eq!(seq, 3);
batch.iterate(&mut counts); batch.iterate(&mut counts);
let (seq, batch) = iter.next().unwrap(); let (seq, batch) = iter.next().unwrap().unwrap();
assert_eq!(seq, 4); assert_eq!(seq, 4);
batch.iterate(&mut counts); batch.iterate(&mut counts);
assert!(iter.next().is_none()); assert!(iter.next().is_none());
@ -457,7 +457,7 @@ fn test_get_updates_since_one_batch() {
puts: 0, puts: 0,
deletes: 0, deletes: 0,
}; };
let (seq, batch) = iter.next().unwrap(); let (seq, batch) = iter.next().unwrap().unwrap();
assert_eq!(seq, 2); assert_eq!(seq, 2);
batch.iterate(&mut counts); batch.iterate(&mut counts);
assert!(iter.next().is_none()); assert!(iter.next().is_none());
@ -857,7 +857,7 @@ fn get_with_cache_and_bulkload_test() {
// try to get key // try to get key
let iter = db.iterator(IteratorMode::Start); let iter = db.iterator(IteratorMode::Start);
for (expected, (k, _)) in iter.enumerate() { for (expected, (k, _)) in iter.map(Result::unwrap).enumerate() {
assert_eq!(k.as_ref(), format!("{:0>4}", expected).as_bytes()); assert_eq!(k.as_ref(), format!("{:0>4}", expected).as_bytes());
} }
@ -918,7 +918,7 @@ fn get_with_cache_and_bulkload_test() {
// try to get key // try to get key
let iter = db.iterator(IteratorMode::Start); let iter = db.iterator(IteratorMode::Start);
for (expected, (k, _)) in iter.enumerate() { for (expected, (k, _)) in iter.map(Result::unwrap).enumerate() {
assert_eq!(k.as_ref(), format!("{:0>4}", expected).as_bytes()); assert_eq!(k.as_ref(), format!("{:0>4}", expected).as_bytes());
} }
} }
@ -992,7 +992,7 @@ fn get_with_cache_and_bulkload_and_blobs_test() {
// try to get key // try to get key
let iter = db.iterator(IteratorMode::Start); let iter = db.iterator(IteratorMode::Start);
for (expected, (k, _)) in iter.enumerate() { for (expected, (k, _)) in iter.map(Result::unwrap).enumerate() {
assert_eq!(k.as_ref(), format!("{:0>4}", expected).as_bytes()); assert_eq!(k.as_ref(), format!("{:0>4}", expected).as_bytes());
} }
@ -1053,7 +1053,7 @@ fn get_with_cache_and_bulkload_and_blobs_test() {
// try to get key // try to get key
let iter = db.iterator(IteratorMode::Start); let iter = db.iterator(IteratorMode::Start);
for (expected, (k, _)) in iter.enumerate() { for (expected, (k, _)) in iter.map(Result::unwrap).enumerate() {
assert_eq!(k.as_ref(), format!("{:0>4}", expected).as_bytes()); assert_eq!(k.as_ref(), format!("{:0>4}", expected).as_bytes());
} }
} }

@ -73,29 +73,39 @@ fn test_iterator() {
); );
{ {
let iterator1 = db.iterator(IteratorMode::From(b"k0", Direction::Forward)); let test = |valid, key, dir| {
assert!(iterator1.valid()); let mut it = db.iterator(IteratorMode::From(key, dir));
let iterator2 = db.iterator(IteratorMode::From(b"k1", Direction::Forward)); let value = it.next();
assert!(iterator2.valid()); if valid {
let iterator3 = db.iterator(IteratorMode::From(b"k11", Direction::Forward)); assert!(matches!(value, Some(Ok(_))), "{:?}", value);
assert!(iterator3.valid()); } else {
let iterator4 = db.iterator(IteratorMode::From(b"k5", Direction::Forward)); assert_eq!(None, value);
assert!(!iterator4.valid()); assert_eq!(None, it.next()); // Iterator is fused
let iterator5 = db.iterator(IteratorMode::From(b"k0", Direction::Reverse)); }
assert!(!iterator5.valid()); };
let iterator6 = db.iterator(IteratorMode::From(b"k1", Direction::Reverse));
assert!(iterator6.valid()); test(true, b"k0", Direction::Forward);
let iterator7 = db.iterator(IteratorMode::From(b"k11", Direction::Reverse)); test(true, b"k1", Direction::Forward);
assert!(iterator7.valid()); test(true, b"k11", Direction::Forward);
let iterator8 = db.iterator(IteratorMode::From(b"k5", Direction::Reverse)); test(false, b"k5", Direction::Forward);
assert!(iterator8.valid()); test(false, b"k0", Direction::Reverse);
test(true, b"k1", Direction::Reverse);
test(true, b"k11", Direction::Reverse);
test(true, b"k5", Direction::Reverse);
} }
{ {
let mut iterator1 = db.iterator(IteratorMode::From(b"k4", Direction::Forward)); let mut iterator1 = db.iterator(IteratorMode::From(b"k4", Direction::Forward));
iterator1.next(); iterator1.next().unwrap().unwrap();
assert!(iterator1.valid()); assert_eq!(None, iterator1.next());
iterator1.next(); assert_eq!(None, iterator1.next());
assert!(!iterator1.valid()); }
{
// Check that set_mode resets the iterator
let mode = IteratorMode::From(K3, Direction::Forward);
let mut iterator = db.iterator(mode);
assert_iter(&mut iterator, &expected2[2..]);
iterator.set_mode(mode);
assert_iter(&mut iterator, &expected2[2..]);
} }
} }
} }
@ -217,6 +227,7 @@ fn test_full_iterator() {
fn custom_iter(db: &'_ DB) -> impl Iterator<Item = usize> + '_ { fn custom_iter(db: &'_ DB) -> impl Iterator<Item = usize> + '_ {
db.iterator(IteratorMode::Start) db.iterator(IteratorMode::Start)
.map(Result::unwrap)
.map(|(_, db_value)| db_value.len()) .map(|(_, db_value)| db_value.len())
} }
@ -275,6 +286,7 @@ fn test_iter_range() {
ro.set_iterate_range(range); ro.set_iterate_range(range);
let got = db let got = db
.iterator_opt(mode, ro) .iterator_opt(mode, ro)
.map(Result::unwrap)
.map(|(key, _value)| key) .map(|(key, _value)| key)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let mut got = got.iter().map(Box::as_ref).collect::<Vec<_>>(); let mut got = got.iter().map(Box::as_ref).collect::<Vec<_>>();

@ -2,7 +2,7 @@
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use rocksdb::{Options, DB}; use rocksdb::{Error, Options, DB};
/// Temporary database path which calls DB::Destroy when DBPath is dropped. /// Temporary database path which calls DB::Destroy when DBPath is dropped.
pub struct DBPath { pub struct DBPath {
@ -47,19 +47,14 @@ pub fn pair(left: &[u8], right: &[u8]) -> Pair {
} }
#[track_caller] #[track_caller]
pub fn assert_iter<D: rocksdb::DBAccess>( pub fn assert_iter(iter: impl Iterator<Item = Result<Pair, Error>>, want: &[Pair]) {
iter: rocksdb::DBIteratorWithThreadMode<'_, D>, let got = iter.collect::<Result<Vec<_>, _>>().unwrap();
want: &[Pair], assert_eq!(got.as_slice(), want);
) {
assert_eq!(iter.collect::<Vec<_>>().as_slice(), want);
} }
#[track_caller] #[track_caller]
pub fn assert_iter_reversed<D: rocksdb::DBAccess>( pub fn assert_iter_reversed(iter: impl Iterator<Item = Result<Pair, Error>>, want: &[Pair]) {
iter: rocksdb::DBIteratorWithThreadMode<'_, D>, let mut got = iter.collect::<Result<Vec<_>, _>>().unwrap();
want: &[Pair],
) {
let mut got = iter.collect::<Vec<_>>();
got.reverse(); got.reverse();
assert_eq!(got.as_slice(), want); assert_eq!(got.as_slice(), want);
} }

Loading…
Cancel
Save