From 6e39fd715be16de157f0858540ae761a48785927 Mon Sep 17 00:00:00 2001 From: Nathan Fiedler Date: Fri, 20 Sep 2019 09:36:51 -0700 Subject: [PATCH] Add get_updates_since() wrapper and its iterator (#332) --- CHANGELOG.md | 8 ++- src/db.rs | 142 +++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 3 +- tests/test_db.rs | 101 +++++++++++++++++++++++++++++++++ 4 files changed, 252 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f063735..4506574 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,13 @@ ## Unreleased -### 0.12.3 (2019-07-19) +### Added + +* `DB::get_updates_since()` to iterate write batches in a given sequence (nlfiedler). + +## 0.12.3 (2019-07-19) + +### Changes * Enabled sse4.2/pclmul for accelerated crc32c (yjh0502) * Added `set_db_write_buffer_size` to the Options API (rnarubin) diff --git a/src/db.rs b/src/db.rs index 539282e..67d59fc 100644 --- a/src/db.rs +++ b/src/db.rs @@ -161,6 +161,69 @@ pub struct DBRawIterator<'a> { db: PhantomData<&'a DB>, } +/// Iterates the batches of writes since a given sequence number. +/// +/// `DBWALIterator` is returned by `DB::get_updates_since()` and will return the +/// batches of write operations that have occurred since a given sequence number +/// (see `DB::latest_sequence_number()`). This iterator cannot be constructed by +/// the application. +/// +/// The iterator item type is a tuple of (`u64`, `WriteBatch`) where the first +/// value is the sequence number of the associated write batch. +/// +pub struct DBWALIterator { + inner: *mut ffi::rocksdb_wal_iterator_t, +} + +impl DBWALIterator { + /// Returns `true` if the iterator is valid. An iterator is invalidated when + /// it reaches the end of its defined range, or when it encounters an error. + /// + /// To check whether the iterator encountered an error after `valid` has + /// returned `false`, use the [`status`](DBWALIterator::status) method. + /// `status` will never return an error when `valid` is `true`. + pub fn valid(&self) -> bool { + unsafe { ffi::rocksdb_wal_iter_valid(self.inner) != 0 } + } + + /// Returns an error `Result` if the iterator has encountered an error + /// during operation. When an error is encountered, the iterator is + /// invalidated and [`valid`](DBWALIterator::valid) will return `false` when + /// called. + pub fn status(&self) -> Result<(), Error> { + unsafe { + ffi_try!(ffi::rocksdb_wal_iter_status(self.inner,)); + } + Ok(()) + } +} + +impl Iterator for DBWALIterator { + type Item = (u64, WriteBatch); + + fn next(&mut self) -> Option<(u64, WriteBatch)> { + // Seek to the next write batch. + unsafe { + ffi::rocksdb_wal_iter_next(self.inner); + } + if self.valid() { + let mut seq: u64 = 0; + let inner = unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) }; + Some((seq, WriteBatch { inner })) + } else { + None + } + } +} + +impl Drop for DBWALIterator { + fn drop(&mut self) { + unsafe { + ffi::rocksdb_wal_iter_destroy(self.inner); + } + } +} + /// An iterator over a database or column family, with specifiable /// ranges and direction. /// @@ -1573,6 +1636,66 @@ impl DB { pub fn latest_sequence_number(&self) -> u64 { unsafe { ffi::rocksdb_get_latest_sequence_number(self.inner) } } + + /// Iterate over batches of write operations since a given sequence. + /// + /// Produce an iterator that will provide the batches of write operations + /// that have occurred since the given sequence (see + /// `latest_sequence_number()`). Use the provided iterator to retrieve each + /// (`u64`, `WriteBatch`) tuple, and then gather the individual puts and + /// deletes using the `WriteBatch::iterate()` function. + /// + /// Calling `get_updates_since()` with a sequence number that is out of + /// bounds will return an error. + pub fn get_updates_since(&self, seq_number: u64) -> Result { + unsafe { + // rocksdb_wal_readoptions_t does not appear to have any functions + // for creating and destroying it; fortunately we can pass a nullptr + // here to get the default behavior + let opts: *const ffi::rocksdb_wal_readoptions_t = ptr::null();; + let iter = ffi_try!(ffi::rocksdb_get_updates_since(self.inner, seq_number, opts,)); + Ok(DBWALIterator { inner: iter }) + } + } +} + +/// Receives the puts and deletes of a write batch. +/// +/// The application must provide an implementation of this trait when +/// iterating the operations within a `WriteBatch` +pub trait WriteBatchIterator { + /// Called with a key and value that were `put` into the batch. + fn put(&mut self, key: Box<[u8]>, value: Box<[u8]>); + /// Called with a key that was `delete`d from the batch. + fn delete(&mut self, key: Box<[u8]>); +} + +unsafe extern "C" fn writebatch_put_callback( + state: *mut c_void, + k: *const c_char, + klen: usize, + v: *const c_char, + vlen: usize, +) { + // coerce the raw pointer back into a box, but "leak" it so we prevent + // freeing the resource before we are done with it + let boxed_cb = Box::from_raw(state as *mut &mut dyn WriteBatchIterator); + let leaked_cb = Box::leak(boxed_cb); + let key = slice::from_raw_parts(k as *const u8, klen as usize); + let value = slice::from_raw_parts(v as *const u8, vlen as usize); + leaked_cb.put( + key.to_vec().into_boxed_slice(), + value.to_vec().into_boxed_slice(), + ); +} + +unsafe extern "C" fn writebatch_delete_callback(state: *mut c_void, k: *const c_char, klen: usize) { + // coerce the raw pointer back into a box, but "leak" it so we prevent + // freeing the resource before we are done with it + let boxed_cb = Box::from_raw(state as *mut &mut dyn WriteBatchIterator); + let leaked_cb = Box::leak(boxed_cb); + let key = slice::from_raw_parts(k as *const u8, klen as usize); + leaked_cb.delete(key.to_vec().into_boxed_slice()); } impl WriteBatch { @@ -1593,6 +1716,25 @@ impl WriteBatch { self.len() == 0 } + /// Iterate the put and delete operations within this write batch. Note that + /// this does _not_ return an `Iterator` but instead will invoke the `put()` + /// and `delete()` member functions of the provided `WriteBatchIterator` + /// trait implementation. + pub fn iterate(&self, callbacks: &mut dyn WriteBatchIterator) { + let state = Box::into_raw(Box::new(callbacks)); + unsafe { + ffi::rocksdb_writebatch_iterate( + self.inner, + state as *mut c_void, + Some(writebatch_put_callback), + Some(writebatch_delete_callback), + ); + // we must manually set the raw box free since there is no + // associated "destroy" callback for this object + Box::from_raw(state); + } + } + /// Insert a value into the database under the given key. pub fn put(&mut self, key: K, value: V) -> Result<(), Error> where diff --git a/src/lib.rs b/src/lib.rs index 6daba9a..7e235ab 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -72,7 +72,8 @@ mod slice_transform; pub use compaction_filter::Decision as CompactionDecision; pub use db::{ DBCompactionStyle, DBCompressionType, DBIterator, DBPinnableSlice, DBRawIterator, - DBRecoveryMode, DBVector, Direction, IteratorMode, ReadOptions, Snapshot, WriteBatch, + DBRecoveryMode, DBVector, DBWALIterator, Direction, IteratorMode, ReadOptions, Snapshot, + WriteBatch, WriteBatchIterator, }; pub use slice_transform::SliceTransform; diff --git a/tests/test_db.rs b/tests/test_db.rs index cc7a85e..0e6ae38 100644 --- a/tests/test_db.rs +++ b/tests/test_db.rs @@ -251,3 +251,104 @@ fn test_sequence_number() { assert_eq!(db.latest_sequence_number(), 1); } } + +struct OperationCounts { + puts: usize, + deletes: usize, +} + +impl rocksdb::WriteBatchIterator for OperationCounts { + fn put(&mut self, _key: Box<[u8]>, _value: Box<[u8]>) { + self.puts += 1; + } + fn delete(&mut self, _key: Box<[u8]>) { + self.deletes += 1; + } +} + +#[test] +fn test_get_updates_since_empty() { + let path = DBPath::new("_rust_rocksdb_test_get_updates_since_empty"); + let db = DB::open_default(&path).unwrap(); + // get_updates_since() on an empty database + let mut iter = db.get_updates_since(0).unwrap(); + assert!(iter.next().is_none()); +} + +#[test] +fn test_get_updates_since_multiple_batches() { + let path = DBPath::new("_rust_rocksdb_test_get_updates_since_multiple_batches"); + let db = DB::open_default(&path).unwrap(); + // add some records and collect sequence numbers, + // verify 3 batches of 1 put each were done + db.put(b"key1", b"value1").unwrap(); + let seq1 = db.latest_sequence_number(); + db.put(b"key2", b"value2").unwrap(); + db.put(b"key3", b"value3").unwrap(); + db.put(b"key4", b"value4").unwrap(); + let mut iter = db.get_updates_since(seq1).unwrap(); + let mut counts = OperationCounts { + puts: 0, + deletes: 0, + }; + let (seq, batch) = iter.next().unwrap(); + assert_eq!(seq, 2); + batch.iterate(&mut counts); + let (seq, batch) = iter.next().unwrap(); + assert_eq!(seq, 3); + batch.iterate(&mut counts); + let (seq, batch) = iter.next().unwrap(); + assert_eq!(seq, 4); + batch.iterate(&mut counts); + assert!(iter.next().is_none()); + assert_eq!(counts.puts, 3); + assert_eq!(counts.deletes, 0); +} + +#[test] +fn test_get_updates_since_one_batch() { + let path = DBPath::new("_rust_rocksdb_test_get_updates_since_one_batch"); + let db = DB::open_default(&path).unwrap(); + db.put(b"key2", b"value2").unwrap(); + // some puts and deletes in a single batch, + // verify 1 put and 1 delete were done + let seq1 = db.latest_sequence_number(); + assert_eq!(seq1, 1); + let mut batch = WriteBatch::default(); + batch.put(b"key1", b"value1").unwrap(); + batch.delete(b"key2").unwrap(); + db.write(batch).unwrap(); + assert_eq!(db.latest_sequence_number(), 3); + let mut iter = db.get_updates_since(seq1).unwrap(); + let mut counts = OperationCounts { + puts: 0, + deletes: 0, + }; + let (seq, batch) = iter.next().unwrap(); + assert_eq!(seq, 2); + batch.iterate(&mut counts); + assert!(iter.next().is_none()); + assert_eq!(counts.puts, 1); + assert_eq!(counts.deletes, 1); +} + +#[test] +fn test_get_updates_since_nothing() { + let path = DBPath::new("_rust_rocksdb_test_get_updates_since_nothing"); + let db = DB::open_default(&path).unwrap(); + // get_updates_since() with no new changes + db.put(b"key1", b"value1").unwrap(); + let seq1 = db.latest_sequence_number(); + let mut iter = db.get_updates_since(seq1).unwrap(); + assert!(iter.next().is_none()); +} + +#[test] +fn test_get_updates_since_out_of_range() { + let path = DBPath::new("_rust_rocksdb_test_get_updates_since_out_of_range"); + let db = DB::open_default(&path).unwrap(); + db.put(b"key1", b"value1").unwrap(); + // get_updates_since() with an out of bounds sequence number + let result = db.get_updates_since(1000); + assert!(result.is_err()); +}