From 7fd6c98a23e87f5de38396ee5152fd0d5486c917 Mon Sep 17 00:00:00 2001 From: Simon Rasmussen Date: Tue, 9 May 2023 13:11:01 +0200 Subject: [PATCH] Fix bug in `DBWALIterator` that would miss updates (#771) --- src/db.rs | 5 ++++- src/db_iterator.rs | 37 ++++++++++++++++++++++++++++++------- tests/test_db.rs | 33 +++++++++++++++++++++++++++++++++ 3 files changed, 67 insertions(+), 8 deletions(-) diff --git a/src/db.rs b/src/db.rs index 87becae..1cd44ac 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1889,7 +1889,10 @@ impl DBCommon { seq_number, opts )); - Ok(DBWALIterator { inner: iter }) + Ok(DBWALIterator { + inner: iter, + start_seq_number: seq_number, + }) } } diff --git a/src/db_iterator.rs b/src/db_iterator.rs index 787ac94..51b00ab 100644 --- a/src/db_iterator.rs +++ b/src/db_iterator.rs @@ -513,6 +513,7 @@ impl<'a, D: DBAccess> Into> for DBIteratorWit /// pub struct DBWALIterator { pub(crate) inner: *mut ffi::rocksdb_wal_iterator_t, + pub(crate) start_seq_number: u64, } impl DBWALIterator { @@ -546,17 +547,39 @@ impl Iterator for DBWALIterator { return None; } + let mut seq: u64 = 0; + let mut batch = WriteBatch { + inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) }, + }; + + // if the initial sequence number is what was requested we skip it to + // only provide changes *after* it + if seq == self.start_seq_number { + unsafe { + ffi::rocksdb_wal_iter_next(self.inner); + } + + if !self.valid() { + return None; + } + + // this drops which in turn frees the skipped batch + batch = WriteBatch { + inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) }, + }; + } + + if !self.valid() { + return self.status().err().map(Result::Err); + } + // Seek to the next write batch. + // Note that WriteBatches live independently of the WAL iterator so this is safe to do unsafe { ffi::rocksdb_wal_iter_next(self.inner); } - if self.valid() { - let mut seq: u64 = 0; - let inner = unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) }; - Some(Ok((seq, WriteBatch { inner }))) - } else { - self.status().err().map(Result::Err) - } + + Some(Ok((seq, batch))) } } diff --git a/tests/test_db.rs b/tests/test_db.rs index 336670b..76b2c19 100644 --- a/tests/test_db.rs +++ b/tests/test_db.rs @@ -409,6 +409,39 @@ fn test_get_updates_since_empty() { assert!(iter.next().is_none()); } +#[test] +fn test_get_updates_since_start() { + let path = DBPath::new("_rust_rocksdb_test_test_get_updates_since_start"); + let db = DB::open_default(&path).unwrap(); + // add some records and collect sequence numbers, + // verify 4 batches of 1 put each were done + let seq0 = db.latest_sequence_number(); + db.put(b"key1", b"value1").unwrap(); + db.put(b"key2", b"value2").unwrap(); + db.put(b"key3", b"value3").unwrap(); + db.put(b"key4", b"value4").unwrap(); + let mut iter = db.get_updates_since(seq0).unwrap(); + let mut counts = OperationCounts { + puts: 0, + deletes: 0, + }; + let (seq, batch) = iter.next().unwrap().unwrap(); + assert_eq!(seq, 1); + batch.iterate(&mut counts); + let (seq, batch) = iter.next().unwrap().unwrap(); + assert_eq!(seq, 2); + batch.iterate(&mut counts); + let (seq, batch) = iter.next().unwrap().unwrap(); + assert_eq!(seq, 3); + batch.iterate(&mut counts); + let (seq, batch) = iter.next().unwrap().unwrap(); + assert_eq!(seq, 4); + batch.iterate(&mut counts); + assert!(iter.next().is_none()); + assert_eq!(counts.puts, 4); + assert_eq!(counts.deletes, 0); +} + #[test] fn test_get_updates_since_multiple_batches() { let path = DBPath::new("_rust_rocksdb_test_get_updates_since_multiple_batches");