Merge pull request #112 from kitesearch/raw-iterator

Raw iterators
master
Tyler Neely 8 years ago committed by GitHub
commit 68c13ea65f
  1. 419
      src/db.rs
  2. 2
      src/lib.rs
  3. 1
      test/test.rs
  4. 125
      test/test_raw_iterator.rs

@ -100,6 +100,47 @@ pub struct Snapshot<'a> {
inner: *const ffi::rocksdb_snapshot_t,
}
/// An iterator over a database or column family, with specifiable
/// ranges and direction.
///
/// This iterator is different to the standard ``DBIterator`` as it aims Into
/// replicate the underlying iterator API within RocksDB itself. This should
/// give access to more performance and flexibility but departs from the
/// widely recognised Rust idioms.
///
/// ```
/// use rocksdb::DB;
///
/// let mut db = DB::open_default("path/for/rocksdb/storage4").unwrap();
/// let mut iter = db.raw_iterator();
///
/// // Forwards iteration
/// iter.seek_to_first();
/// while iter.valid() {
/// println!("Saw {:?} {:?}", iter.key(), iter.value());
/// iter.next();
/// }
///
/// // Reverse iteration
/// iter.seek_to_last();
/// while iter.valid() {
/// println!("Saw {:?} {:?}", iter.key(), iter.value());
/// iter.prev();
/// }
///
/// // Seeking
/// iter.seek(b"my key");
/// while iter.valid() {
/// println!("Saw {:?} {:?}", iter.key(), iter.value());
/// iter.next();
/// }
///
/// ```
pub struct DBRawIterator {
inner: *mut ffi::rocksdb_iterator_t,
}
/// An iterator over a database or column family, with specifiable
/// ranges and direction.
///
@ -128,7 +169,7 @@ pub struct Snapshot<'a> {
/// }
/// ```
pub struct DBIterator {
inner: *mut ffi::rocksdb_iterator_t,
raw: DBRawIterator,
direction: Direction,
just_seeked: bool,
}
@ -140,83 +181,247 @@ pub enum Direction {
pub type KVBytes = (Box<[u8]>, Box<[u8]>);
impl Iterator for DBIterator {
type Item = KVBytes;
pub enum IteratorMode<'a> {
Start,
End,
From(&'a [u8], Direction),
}
fn next(&mut self) -> Option<KVBytes> {
let native_iter = self.inner;
if !self.just_seeked {
match self.direction {
Direction::Forward => unsafe { ffi::rocksdb_iter_next(native_iter) },
Direction::Reverse => unsafe { ffi::rocksdb_iter_prev(native_iter) },
impl DBRawIterator {
fn new(db: &DB, readopts: &ReadOptions) -> DBRawIterator {
unsafe {
DBRawIterator {
inner: ffi::rocksdb_create_iterator(db.inner, readopts.inner),
}
} else {
self.just_seeked = false;
}
if unsafe { ffi::rocksdb_iter_valid(native_iter) != 0 } {
}
fn new_cf(db: &DB,
cf_handle: ColumnFamily,
readopts: &ReadOptions)
-> Result<DBRawIterator, Error> {
unsafe {
Ok(DBRawIterator {
inner: ffi::rocksdb_create_iterator_cf(db.inner, readopts.inner, cf_handle.inner),
})
}
}
/// Returns true if the iterator is valid.
pub fn valid(&self) -> bool {
unsafe { ffi::rocksdb_iter_valid(self.inner) != 0 }
}
/// Seeks to the first key in the database.
///
/// # Examples
///
/// ```rust
/// use rocksdb::DB;
///
/// let mut db = DB::open_default("path/for/rocksdb/storage5").unwrap();
/// let mut iter = db.raw_iterator();
///
/// // Iterate all keys from the start in lexicographic order
///
/// iter.seek_to_first();
///
/// while iter.valid() {
/// println!("{:?} {:?}", iter.key(), iter.value());
///
/// iter.next();
/// }
///
/// // Read just the first key
///
/// iter.seek_to_first();
///
/// if iter.valid() {
/// println!("{:?} {:?}", iter.key(), iter.value());
/// } else {
/// // There are no keys in the database
/// }
/// ```
pub fn seek_to_first(&mut self) {
unsafe { ffi::rocksdb_iter_seek_to_first(self.inner); }
}
/// Seeks to the last key in the database.
///
/// # Examples
///
/// ```rust
/// use rocksdb::DB;
///
/// let mut db = DB::open_default("path/for/rocksdb/storage6").unwrap();
/// let mut iter = db.raw_iterator();
///
/// // Iterate all keys from the end in reverse lexicographic order
///
/// iter.seek_to_last();
///
/// while iter.valid() {
/// println!("{:?} {:?}", iter.key(), iter.value());
///
/// iter.prev();
/// }
///
/// // Read just the last key
///
/// iter.seek_to_last();
///
/// if iter.valid() {
/// println!("{:?} {:?}", iter.key(), iter.value());
/// } else {
/// // There are no keys in the database
/// }
/// ```
pub fn seek_to_last(&mut self) {
unsafe { ffi::rocksdb_iter_seek_to_last(self.inner); }
}
/// Seeks to the specified key or the first key that lexicographically follows it.
///
/// This method will attempt to seek to the specified key. If that key does not exist, it will
/// find and seek to the key that lexicographically follows it instead.
///
/// # Examples
///
/// ```rust
/// use rocksdb::DB;
///
/// let mut db = DB::open_default("path/for/rocksdb/storage7").unwrap();
/// let mut iter = db.raw_iterator();
///
/// // Read the first key that starts with 'a'
///
/// iter.seek(b"a");
///
/// if iter.valid() {
/// println!("{:?} {:?}", iter.key(), iter.value());
/// } else {
/// // There are no keys in the database
/// }
/// ```
pub fn seek(&mut self, key: &[u8]) {
unsafe { ffi::rocksdb_iter_seek(self.inner, key.as_ptr() as *const c_char, key.len() as size_t); }
}
/*
SeekForPrev was added in RocksDB 4.13 but not implemented in the C API until RocksDB 5.0
/// Seeks to the specified key, or the first key that lexicographically precedes it.
///
/// Like ``.seek()`` this method will attempt to seek to the specified key.
/// The difference with ``.seek()`` is that if the specified key do not exist, this method will
/// seek to key that lexicographically precedes it instead.
///
/// # Examples
///
/// ```rust
/// use rocksdb::DB;
///
/// let mut db = DB::open_default("path/for/rocksdb/storage8").unwrap();
/// let mut iter = db.raw_iterator();
///
/// // Read the last key that starts with 'a'
///
/// iter.seek_for_prev(b"b");
///
/// if iter.valid() {
/// println!("{:?} {:?}", iter.key(), iter.value());
/// } else {
/// // There are no keys in the database
/// }
pub fn seek_for_prev(&mut self, key: &[u8]) {
unsafe { ffi::rocksdb_iter_seek_for_prev(self.inner, key.as_ptr() as *const c_char, key.len() as size_t); }
}
*/
/// Seeks to the next key.
///
/// Returns true if the iterator is valid after this operation.
pub fn next(&mut self) {
unsafe { ffi::rocksdb_iter_next(self.inner); }
}
/// Seeks to the previous key.
///
/// Returns true if the iterator is valid after this operation.
pub fn prev(&mut self) {
unsafe { ffi::rocksdb_iter_prev(self.inner); }
}
/// Returns a slice to the internal buffer storing the current key.
///
/// This may be slightly more performant to use than the standard ``.key()`` method
/// as it does not copy the key. However, you must be careful to not use the buffer
/// if the iterator's seek position is ever moved by any of the seek commands or the
/// ``.next()`` and ``.previous()`` methods as the underlying buffer may be reused
/// for something else or freed entirely.
pub unsafe fn key_inner<'a>(&'a self) -> Option<&'a [u8]> {
if self.valid() {
let mut key_len: size_t = 0;
let key_len_ptr: *mut size_t = &mut key_len;
let key_ptr = ffi::rocksdb_iter_key(self.inner, key_len_ptr) as *const c_uchar;
Some(slice::from_raw_parts(key_ptr, key_len as usize))
} else {
None
}
}
/// Returns a copy of the current key.
pub fn key(&self) -> Option<Vec<u8>> {
unsafe {
self.key_inner().map(|key| key.to_vec())
}
}
/// Returns a slice to the internal buffer storing the current value.
///
/// This may be slightly more performant to use than the standard ``.value()`` method
/// as it does not copy the value. However, you must be careful to not use the buffer
/// if the iterator's seek position is ever moved by any of the seek commands or the
/// ``.next()`` and ``.previous()`` methods as the underlying buffer may be reused
/// for something else or freed entirely.
pub unsafe fn value_inner<'a>(&'a self) -> Option<&'a [u8]> {
if self.valid() {
let mut val_len: size_t = 0;
let val_len_ptr: *mut size_t = &mut val_len;
let key_ptr =
unsafe { ffi::rocksdb_iter_key(native_iter, key_len_ptr) as *const c_uchar };
let key = unsafe { slice::from_raw_parts(key_ptr, key_len as usize) };
let val_ptr =
unsafe { ffi::rocksdb_iter_value(native_iter, val_len_ptr) as *const c_uchar };
let val = unsafe { slice::from_raw_parts(val_ptr, val_len as usize) };
Some((key.to_vec().into_boxed_slice(), val.to_vec().into_boxed_slice()))
let val_ptr = ffi::rocksdb_iter_value(self.inner, val_len_ptr) as *const c_uchar;
Some(slice::from_raw_parts(val_ptr, val_len as usize))
} else {
None
}
}
}
pub enum IteratorMode<'a> {
Start,
End,
From(&'a [u8], Direction),
}
impl DBIterator {
fn new(db: &DB, readopts: &ReadOptions, mode: IteratorMode) -> DBIterator {
/// Returns a copy of the current value.
pub fn value(&self) -> Option<Vec<u8>> {
unsafe {
let iterator = ffi::rocksdb_create_iterator(db.inner, readopts.inner);
let mut rv = DBIterator {
inner: iterator,
direction: Direction::Forward, // blown away by set_mode()
just_seeked: false,
};
rv.set_mode(mode);
rv
self.value_inner().map(|value| value.to_vec())
}
}
}
pub fn set_mode(&mut self, mode: IteratorMode) {
impl Drop for DBRawIterator {
fn drop(&mut self) {
unsafe {
match mode {
IteratorMode::Start => {
ffi::rocksdb_iter_seek_to_first(self.inner);
self.direction = Direction::Forward;
}
IteratorMode::End => {
ffi::rocksdb_iter_seek_to_last(self.inner);
self.direction = Direction::Reverse;
}
IteratorMode::From(key, dir) => {
ffi::rocksdb_iter_seek(self.inner,
key.as_ptr() as *const c_char,
key.len() as size_t);
self.direction = dir;
}
};
self.just_seeked = true;
ffi::rocksdb_iter_destroy(self.inner);
}
}
}
pub fn valid(&self) -> bool {
unsafe { ffi::rocksdb_iter_valid(self.inner) != 0 }
impl DBIterator {
fn new(db: &DB, readopts: &ReadOptions, mode: IteratorMode) -> DBIterator {
let mut rv = DBIterator {
raw: DBRawIterator::new(db, readopts),
direction: Direction::Forward, // blown away by set_mode()
just_seeked: false,
};
rv.set_mode(mode);
rv
}
fn new_cf(db: &DB,
@ -224,28 +429,70 @@ impl DBIterator {
readopts: &ReadOptions,
mode: IteratorMode)
-> Result<DBIterator, Error> {
unsafe {
let iterator = ffi::rocksdb_create_iterator_cf(db.inner, readopts.inner, cf_handle.inner);
let mut rv = DBIterator {
inner: iterator,
direction: Direction::Forward, // blown away by set_mode()
just_seeked: false,
};
rv.set_mode(mode);
Ok(rv)
}
let mut rv = DBIterator {
raw: try!(DBRawIterator::new_cf(db, cf_handle, readopts)),
direction: Direction::Forward, // blown away by set_mode()
just_seeked: false,
};
rv.set_mode(mode);
Ok(rv)
}
pub fn set_mode(&mut self, mode: IteratorMode) {
match mode {
IteratorMode::Start => {
self.raw.seek_to_first();
self.direction = Direction::Forward;
}
IteratorMode::End => {
self.raw.seek_to_last();
self.direction = Direction::Reverse;
}
IteratorMode::From(key, dir) => {
// TODO: Should use seek_for_prev when reversing
self.raw.seek(key);
self.direction = dir;
}
};
self.just_seeked = true;
}
pub fn valid(&self) -> bool {
self.raw.valid()
}
}
impl Drop for DBIterator {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_iter_destroy(self.inner);
impl Iterator for DBIterator {
type Item = KVBytes;
fn next(&mut self) -> Option<KVBytes> {
// Initial call to next() after seeking should not move the iterator
// or the first item will not be returned
if !self.just_seeked {
match self.direction {
Direction::Forward => self.raw.next(),
Direction::Reverse => self.raw.prev(),
}
} else {
self.just_seeked = false;
}
if self.raw.valid() {
// .key() and .value() only ever return None if valid == false, which we've just cheked
Some((self.raw.key().unwrap().into_boxed_slice(), self.raw.value().unwrap().into_boxed_slice()))
} else {
None
}
}
}
impl Into<DBRawIterator> for DBIterator {
fn into(self) -> DBRawIterator {
self.raw
}
}
impl<'a> Snapshot<'a> {
pub fn new(db: &DB) -> Snapshot {
let snapshot = unsafe { ffi::rocksdb_create_snapshot(db.inner) };
@ -270,6 +517,20 @@ impl<'a> Snapshot<'a> {
DBIterator::new_cf(self.db, cf_handle, &readopts, mode)
}
pub fn raw_iterator(&self) -> DBRawIterator {
let mut readopts = ReadOptions::default();
readopts.set_snapshot(self);
DBRawIterator::new(self.db, &readopts)
}
pub fn raw_iterator_cf(&self,
cf_handle: ColumnFamily)
-> Result<DBRawIterator, Error> {
let mut readopts = ReadOptions::default();
readopts.set_snapshot(self);
DBRawIterator::new_cf(self.db, cf_handle, &readopts)
}
pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, Error> {
let mut readopts = ReadOptions::default();
readopts.set_snapshot(self);
@ -549,6 +810,18 @@ impl DB {
DBIterator::new_cf(self, cf_handle, &opts, mode)
}
pub fn raw_iterator(&self) -> DBRawIterator {
let opts = ReadOptions::default();
DBRawIterator::new(self, &opts)
}
pub fn raw_iterator_cf(&self,
cf_handle: ColumnFamily)
-> Result<DBRawIterator, Error> {
let opts = ReadOptions::default();
DBRawIterator::new_cf(self, cf_handle, &opts)
}
pub fn snapshot(&self) -> Snapshot {
Snapshot::new(self)
}

@ -44,7 +44,7 @@ pub mod compaction_filter;
mod db;
mod db_options;
pub use db::{DBCompactionStyle, DBCompressionType, DBIterator, DBRecoveryMode, DBVector,
pub use db::{DBCompactionStyle, DBCompressionType, DBIterator, DBRawIterator, DBRecoveryMode, DBVector,
Direction, IteratorMode, Snapshot, WriteBatch, new_bloom_filter};
pub use merge_operator::MergeOperands;

@ -16,6 +16,7 @@
extern crate rocksdb;
mod test_iterator;
mod test_raw_iterator;
mod test_multithreaded;
mod test_column_family;
mod test_rocksdb_options;

@ -0,0 +1,125 @@
// Copyright 2014 Tyler Neely
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
use rocksdb::DB;
fn setup_test_db(name: &str) -> DB {
use std::fs::remove_dir_all;
let path = "_rust_rocksdb_rawiteratortest_".to_string() + name;
match remove_dir_all(&path) {
Ok(_) => {}
Err(_) => {} // Don't care if tis fails
}
DB::open_default(path).unwrap()
}
#[test]
pub fn test_forwards_iteration() {
let db = setup_test_db("forwards_iteration");
db.put(b"k1", b"v1").unwrap();
db.put(b"k2", b"v2").unwrap();
db.put(b"k3", b"v3").unwrap();
db.put(b"k4", b"v4").unwrap();
let mut iter = db.raw_iterator();
iter.seek_to_first();
assert_eq!(iter.valid(), true);
assert_eq!(iter.key(), Some(b"k1".to_vec()));
assert_eq!(iter.value(), Some(b"v1".to_vec()));
iter.next();
assert_eq!(iter.valid(), true);
assert_eq!(iter.key(), Some(b"k2".to_vec()));
assert_eq!(iter.value(), Some(b"v2".to_vec()));
iter.next(); // k3
iter.next(); // k4
iter.next(); // invalid!
assert_eq!(iter.valid(), false);
assert_eq!(iter.key(), None);
assert_eq!(iter.value(), None);
}
#[test]
pub fn test_seek_last() {
let db = setup_test_db("backwards_iteration");
db.put(b"k1", b"v1").unwrap();
db.put(b"k2", b"v2").unwrap();
db.put(b"k3", b"v3").unwrap();
db.put(b"k4", b"v4").unwrap();
let mut iter = db.raw_iterator();
iter.seek_to_last();
assert_eq!(iter.valid(), true);
assert_eq!(iter.key(), Some(b"k4".to_vec()));
assert_eq!(iter.value(), Some(b"v4".to_vec()));
iter.prev();
assert_eq!(iter.valid(), true);
assert_eq!(iter.key(), Some(b"k3".to_vec()));
assert_eq!(iter.value(), Some(b"v3".to_vec()));
iter.prev(); // k2
iter.prev(); // k1
iter.prev(); // invalid!
assert_eq!(iter.valid(), false);
assert_eq!(iter.key(), None);
assert_eq!(iter.value(), None);
}
#[test]
pub fn test_seek() {
let db = setup_test_db("seek");
db.put(b"k1", b"v1").unwrap();
db.put(b"k2", b"v2").unwrap();
db.put(b"k3", b"v3").unwrap();
db.put(b"k4", b"v4").unwrap();
let mut iter = db.raw_iterator();
iter.seek(b"k2");
assert_eq!(iter.valid(), true);
assert_eq!(iter.key(), Some(b"k2".to_vec()));
assert_eq!(iter.value(), Some(b"v2".to_vec()));
}
#[test]
pub fn test_seek_to_nonexistant() {
let db = setup_test_db("seek_to_nonexistant");
db.put(b"k1", b"v1").unwrap();
db.put(b"k3", b"v3").unwrap();
db.put(b"k4", b"v4").unwrap();
let mut iter = db.raw_iterator();
iter.seek(b"k2");
assert_eq!(iter.valid(), true);
assert_eq!(iter.key(), Some(b"k3".to_vec()));
assert_eq!(iter.value(), Some(b"v3".to_vec()));
}
Loading…
Cancel
Save