Add get_updates_since() wrapper and its iterator (#332)

master
Nathan Fiedler 5 years ago committed by Ilya Bogdanov
parent 25652570df
commit 6e39fd715b
  1. 8
      CHANGELOG.md
  2. 142
      src/db.rs
  3. 3
      src/lib.rs
  4. 101
      tests/test_db.rs

@ -2,7 +2,13 @@
## Unreleased ## Unreleased
### 0.12.3 (2019-07-19) ### Added
* `DB::get_updates_since()` to iterate write batches in a given sequence (nlfiedler).
## 0.12.3 (2019-07-19)
### Changes
* Enabled sse4.2/pclmul for accelerated crc32c (yjh0502) * Enabled sse4.2/pclmul for accelerated crc32c (yjh0502)
* Added `set_db_write_buffer_size` to the Options API (rnarubin) * Added `set_db_write_buffer_size` to the Options API (rnarubin)

@ -161,6 +161,69 @@ pub struct DBRawIterator<'a> {
db: PhantomData<&'a DB>, db: PhantomData<&'a DB>,
} }
/// Iterates the batches of writes since a given sequence number.
///
/// `DBWALIterator` is returned by `DB::get_updates_since()` and will return the
/// batches of write operations that have occurred since a given sequence number
/// (see `DB::latest_sequence_number()`). This iterator cannot be constructed by
/// the application.
///
/// The iterator item type is a tuple of (`u64`, `WriteBatch`) where the first
/// value is the sequence number of the associated write batch.
pub struct DBWALIterator {
// Raw handle to the underlying RocksDB WAL iterator. It is handed to every
// `rocksdb_wal_iter_*` call and released by `rocksdb_wal_iter_destroy` when
// this value is dropped.
// NOTE(review): no lifetime ties this handle to the `DB` that produced it —
// confirm whether the iterator may safely outlive the database.
inner: *mut ffi::rocksdb_wal_iterator_t,
}
impl DBWALIterator {
    /// Reports whether the iterator currently points at a write batch.
    ///
    /// The iterator stops being valid once it runs past the end of its range
    /// or hits an error. After `valid` returns `false`, call
    /// [`status`](DBWALIterator::status) to find out whether an error was the
    /// cause; `status` never reports an error while `valid` is `true`.
    pub fn valid(&self) -> bool {
        let flag = unsafe { ffi::rocksdb_wal_iter_valid(self.inner) };
        flag != 0
    }

    /// Returns `Err` if the iterator ran into an error while operating.
    ///
    /// An error invalidates the iterator, so
    /// [`valid`](DBWALIterator::valid) returns `false` afterwards.
    pub fn status(&self) -> Result<(), Error> {
        // `ffi_try!` returns early with the error reported by RocksDB, if any.
        unsafe { ffi_try!(ffi::rocksdb_wal_iter_status(self.inner,)) };
        Ok(())
    }
}
impl Iterator for DBWALIterator {
    type Item = (u64, WriteBatch);

    /// Steps to the next write batch and yields it with its sequence number.
    ///
    /// The underlying iterator is advanced *before* anything is read, so the
    /// batch it was positioned on when created is not yielded. Returns `None`
    /// as soon as the iterator is no longer valid.
    fn next(&mut self) -> Option<Self::Item> {
        // Move forward first; only then check whether there is a batch to read.
        unsafe { ffi::rocksdb_wal_iter_next(self.inner) };
        if !self.valid() {
            return None;
        }

        // The FFI call writes the batch's sequence number through the out-pointer.
        let mut sequence = 0_u64;
        let batch = unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut sequence) };
        Some((sequence, WriteBatch { inner: batch }))
    }
}
impl Drop for DBWALIterator {
    /// Releases the underlying RocksDB WAL iterator handle.
    fn drop(&mut self) {
        unsafe { ffi::rocksdb_wal_iter_destroy(self.inner) }
    }
}
/// An iterator over a database or column family, with specifiable /// An iterator over a database or column family, with specifiable
/// ranges and direction. /// ranges and direction.
/// ///
@ -1573,6 +1636,66 @@ impl DB {
pub fn latest_sequence_number(&self) -> u64 { pub fn latest_sequence_number(&self) -> u64 {
unsafe { ffi::rocksdb_get_latest_sequence_number(self.inner) } unsafe { ffi::rocksdb_get_latest_sequence_number(self.inner) }
} }
/// Iterate over batches of write operations since a given sequence.
///
/// Produce an iterator that will provide the batches of write operations
/// that have occurred since the given sequence (see
/// `latest_sequence_number()`). Use the provided iterator to retrieve each
/// (`u64`, `WriteBatch`) tuple, and then gather the individual puts and
/// deletes using the `WriteBatch::iterate()` function.
///
/// Calling `get_updates_since()` with a sequence number that is out of
/// bounds will return an error.
pub fn get_updates_since(&self, seq_number: u64) -> Result<DBWALIterator, Error> {
unsafe {
// rocksdb_wal_readoptions_t does not appear to have any functions
// for creating and destroying it; fortunately we can pass a nullptr
// here to get the default behavior
let opts: *const ffi::rocksdb_wal_readoptions_t = ptr::null();;
let iter = ffi_try!(ffi::rocksdb_get_updates_since(self.inner, seq_number, opts,));
Ok(DBWALIterator { inner: iter })
}
}
}
/// Receives the puts and deletes of a write batch.
///
/// The application must provide an implementation of this trait when
/// iterating the operations within a `WriteBatch` via `WriteBatch::iterate()`.
/// Each method receives owned copies of the bytes, so an implementation may
/// keep them for as long as it likes.
pub trait WriteBatchIterator {
/// Called with a key and value that were `put` into the batch.
///
/// Invoked once for every put operation encountered during iteration.
fn put(&mut self, key: Box<[u8]>, value: Box<[u8]>);
/// Called with a key that was `delete`d from the batch.
///
/// Invoked once for every delete operation encountered during iteration.
fn delete(&mut self, key: Box<[u8]>);
}
/// C callback handed to `rocksdb_writebatch_iterate` for each put operation.
///
/// `state` must be the pointer created in `WriteBatch::iterate()`, i.e. a
/// pointer to a live `&mut dyn WriteBatchIterator`. `k`/`klen` and `v`/`vlen`
/// describe the key and value bytes; they are copied before being forwarded to
/// `WriteBatchIterator::put`.
unsafe extern "C" fn writebatch_put_callback(
    state: *mut c_void,
    k: *const c_char,
    klen: usize,
    v: *const c_char,
    vlen: usize,
) {
    // Borrow the handler through the raw pointer without taking ownership of
    // the allocation; the caller of the iteration frees it afterwards.
    let handler = &mut *(state as *mut &mut dyn WriteBatchIterator);
    let key_bytes = slice::from_raw_parts(k as *const u8, klen);
    let value_bytes = slice::from_raw_parts(v as *const u8, vlen);
    // Hand owned copies to the application.
    handler.put(Box::from(key_bytes), Box::from(value_bytes));
}
unsafe extern "C" fn writebatch_delete_callback(state: *mut c_void, k: *const c_char, klen: usize) {
// coerce the raw pointer back into a box, but "leak" it so we prevent
// freeing the resource before we are done with it
let boxed_cb = Box::from_raw(state as *mut &mut dyn WriteBatchIterator);
let leaked_cb = Box::leak(boxed_cb);
let key = slice::from_raw_parts(k as *const u8, klen as usize);
leaked_cb.delete(key.to_vec().into_boxed_slice());
} }
impl WriteBatch { impl WriteBatch {
@ -1593,6 +1716,25 @@ impl WriteBatch {
self.len() == 0 self.len() == 0
} }
/// Iterate the put and delete operations within this write batch. Note that
/// this does _not_ return an `Iterator` but instead will invoke the `put()`
/// and `delete()` member functions of the provided `WriteBatchIterator`
/// trait implementation.
///
/// `callbacks` is boxed for the duration of the call so that a thin pointer
/// to it can be passed through the C API as the opaque `state` argument.
pub fn iterate(&self, callbacks: &mut dyn WriteBatchIterator) {
    // `&mut dyn Trait` is a fat pointer; boxing it yields a thin pointer that
    // fits in the `void*` state slot.
    let state = Box::into_raw(Box::new(callbacks));
    unsafe {
        ffi::rocksdb_writebatch_iterate(
            self.inner,
            state as *mut c_void,
            Some(writebatch_put_callback),
            Some(writebatch_delete_callback),
        );
        // we must manually set the raw box free since there is no
        // associated "destroy" callback for this object; `state` came from
        // `Box::into_raw` above and is reclaimed exactly once here. The
        // explicit `drop` makes the intent clear and avoids discarding a
        // `#[must_use]` value.
        drop(Box::from_raw(state));
    }
}
/// Insert a value into the database under the given key. /// Insert a value into the database under the given key.
pub fn put<K, V>(&mut self, key: K, value: V) -> Result<(), Error> pub fn put<K, V>(&mut self, key: K, value: V) -> Result<(), Error>
where where

@ -72,7 +72,8 @@ mod slice_transform;
pub use compaction_filter::Decision as CompactionDecision; pub use compaction_filter::Decision as CompactionDecision;
pub use db::{ pub use db::{
DBCompactionStyle, DBCompressionType, DBIterator, DBPinnableSlice, DBRawIterator, DBCompactionStyle, DBCompressionType, DBIterator, DBPinnableSlice, DBRawIterator,
DBRecoveryMode, DBVector, Direction, IteratorMode, ReadOptions, Snapshot, WriteBatch, DBRecoveryMode, DBVector, DBWALIterator, Direction, IteratorMode, ReadOptions, Snapshot,
WriteBatch, WriteBatchIterator,
}; };
pub use slice_transform::SliceTransform; pub use slice_transform::SliceTransform;

@ -251,3 +251,104 @@ fn test_sequence_number() {
assert_eq!(db.latest_sequence_number(), 1); assert_eq!(db.latest_sequence_number(), 1);
} }
} }
// Test helper that tallies how many put and delete operations a write batch
// reports while it is being iterated.
struct OperationCounts {
// number of `put` callbacks received
puts: usize,
// number of `delete` callbacks received
deletes: usize,
}
// Counts each operation and ignores the key/value payloads.
impl rocksdb::WriteBatchIterator for OperationCounts {
fn put(&mut self, _key: Box<[u8]>, _value: Box<[u8]>) {
self.puts += 1;
}
fn delete(&mut self, _key: Box<[u8]>) {
self.deletes += 1;
}
}
#[test]
fn test_get_updates_since_empty() {
    let path = DBPath::new("_rust_rocksdb_test_get_updates_since_empty");
    let db = DB::open_default(&path).unwrap();

    // A freshly created database has no write batches to report.
    let mut updates = db.get_updates_since(0).unwrap();
    assert!(updates.next().is_none());
}
#[test]
fn test_get_updates_since_multiple_batches() {
    let path = DBPath::new("_rust_rocksdb_test_get_updates_since_multiple_batches");
    let db = DB::open_default(&path).unwrap();

    // Write one record, remember the sequence number, then write three more;
    // each individual put is its own batch.
    db.put(b"key1", b"value1").unwrap();
    let seq1 = db.latest_sequence_number();
    db.put(b"key2", b"value2").unwrap();
    db.put(b"key3", b"value3").unwrap();
    db.put(b"key4", b"value4").unwrap();

    let mut updates = db.get_updates_since(seq1).unwrap();
    let mut tally = OperationCounts {
        puts: 0,
        deletes: 0,
    };

    // Expect exactly three batches, carrying sequence numbers 2, 3 and 4.
    for expected_seq in 2..=4u64 {
        let (seq, batch) = updates.next().unwrap();
        assert_eq!(seq, expected_seq);
        batch.iterate(&mut tally);
    }
    assert!(updates.next().is_none());

    assert_eq!(tally.puts, 3);
    assert_eq!(tally.deletes, 0);
}
#[test]
fn test_get_updates_since_one_batch() {
    let path = DBPath::new("_rust_rocksdb_test_get_updates_since_one_batch");
    let db = DB::open_default(&path).unwrap();

    // Seed the database so there is a known starting sequence number.
    db.put(b"key2", b"value2").unwrap();
    let seq1 = db.latest_sequence_number();
    assert_eq!(seq1, 1);

    // One batch holding a put and a delete.
    let mut pending = WriteBatch::default();
    pending.put(b"key1", b"value1").unwrap();
    pending.delete(b"key2").unwrap();
    db.write(pending).unwrap();
    assert_eq!(db.latest_sequence_number(), 3);

    let mut updates = db.get_updates_since(seq1).unwrap();
    let mut tally = OperationCounts {
        puts: 0,
        deletes: 0,
    };

    // The single batch is reported once, under its first sequence number.
    let (seq, batch) = updates.next().unwrap();
    assert_eq!(seq, 2);
    batch.iterate(&mut tally);
    assert!(updates.next().is_none());

    // Verify 1 put and 1 delete were seen.
    assert_eq!(tally.puts, 1);
    assert_eq!(tally.deletes, 1);
}
#[test]
fn test_get_updates_since_nothing() {
    let path = DBPath::new("_rust_rocksdb_test_get_updates_since_nothing");
    let db = DB::open_default(&path).unwrap();

    // Ask for updates since the latest sequence number: nothing new to report.
    db.put(b"key1", b"value1").unwrap();
    let latest = db.latest_sequence_number();
    let mut updates = db.get_updates_since(latest).unwrap();
    assert!(updates.next().is_none());
}
#[test]
fn test_get_updates_since_out_of_range() {
    let path = DBPath::new("_rust_rocksdb_test_get_updates_since_out_of_range");
    let db = DB::open_default(&path).unwrap();
    db.put(b"key1", b"value1").unwrap();

    // A sequence number far beyond anything written must be rejected.
    assert!(db.get_updates_since(1000).is_err());
}

Loading…
Cancel
Save