Fix bug in `DBWALIterator` that would miss updates (#771)

master
Simon Rasmussen 2 years ago committed by GitHub
parent c14532d8c2
commit 7fd6c98a23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      src/db.rs
  2. 37
      src/db_iterator.rs
  3. 33
      tests/test_db.rs

@ -1889,7 +1889,10 @@ impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
seq_number, seq_number,
opts opts
)); ));
Ok(DBWALIterator { inner: iter }) Ok(DBWALIterator {
inner: iter,
start_seq_number: seq_number,
})
} }
} }

@ -513,6 +513,7 @@ impl<'a, D: DBAccess> Into<DBRawIteratorWithThreadMode<'a, D>> for DBIteratorWit
/// ///
pub struct DBWALIterator { pub struct DBWALIterator {
pub(crate) inner: *mut ffi::rocksdb_wal_iterator_t, pub(crate) inner: *mut ffi::rocksdb_wal_iterator_t,
pub(crate) start_seq_number: u64,
} }
impl DBWALIterator { impl DBWALIterator {
@ -546,17 +547,39 @@ impl Iterator for DBWALIterator {
return None; return None;
} }
let mut seq: u64 = 0;
let mut batch = WriteBatch {
inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) },
};
// if the initial sequence number is what was requested we skip it to
// only provide changes *after* it
if seq == self.start_seq_number {
unsafe {
ffi::rocksdb_wal_iter_next(self.inner);
}
if !self.valid() {
return None;
}
// this drops which in turn frees the skipped batch
batch = WriteBatch {
inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) },
};
}
if !self.valid() {
return self.status().err().map(Result::Err);
}
// Seek to the next write batch. // Seek to the next write batch.
// Note that WriteBatches live independently of the WAL iterator so this is safe to do
unsafe { unsafe {
ffi::rocksdb_wal_iter_next(self.inner); ffi::rocksdb_wal_iter_next(self.inner);
} }
if self.valid() {
let mut seq: u64 = 0; Some(Ok((seq, batch)))
let inner = unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) };
Some(Ok((seq, WriteBatch { inner })))
} else {
self.status().err().map(Result::Err)
}
} }
} }

@ -409,6 +409,39 @@ fn test_get_updates_since_empty() {
assert!(iter.next().is_none()); assert!(iter.next().is_none());
} }
#[test]
fn test_get_updates_since_start() {
let path = DBPath::new("_rust_rocksdb_test_test_get_updates_since_start");
let db = DB::open_default(&path).unwrap();
// add some records and collect sequence numbers,
// verify 4 batches of 1 put each were done
let seq0 = db.latest_sequence_number();
db.put(b"key1", b"value1").unwrap();
db.put(b"key2", b"value2").unwrap();
db.put(b"key3", b"value3").unwrap();
db.put(b"key4", b"value4").unwrap();
let mut iter = db.get_updates_since(seq0).unwrap();
let mut counts = OperationCounts {
puts: 0,
deletes: 0,
};
let (seq, batch) = iter.next().unwrap().unwrap();
assert_eq!(seq, 1);
batch.iterate(&mut counts);
let (seq, batch) = iter.next().unwrap().unwrap();
assert_eq!(seq, 2);
batch.iterate(&mut counts);
let (seq, batch) = iter.next().unwrap().unwrap();
assert_eq!(seq, 3);
batch.iterate(&mut counts);
let (seq, batch) = iter.next().unwrap().unwrap();
assert_eq!(seq, 4);
batch.iterate(&mut counts);
assert!(iter.next().is_none());
assert_eq!(counts.puts, 4);
assert_eq!(counts.deletes, 0);
}
#[test] #[test]
fn test_get_updates_since_multiple_batches() { fn test_get_updates_since_multiple_batches() {
let path = DBPath::new("_rust_rocksdb_test_get_updates_since_multiple_batches"); let path = DBPath::new("_rust_rocksdb_test_get_updates_since_multiple_batches");

Loading…
Cancel
Save