diff --git a/src/column_family.rs b/src/column_family.rs new file mode 100644 index 0000000..62665f9 --- /dev/null +++ b/src/column_family.rs @@ -0,0 +1,44 @@ +// Copyright 2020 Tyler Neely +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::{ffi, Options}; + +/// A descriptor for a RocksDB column family. +/// +/// A description of the column family, containing the name and `Options`. +pub struct ColumnFamilyDescriptor { + pub(crate) name: String, + pub(crate) options: Options, +} + +impl ColumnFamilyDescriptor { + // Create a new column family descriptor with the specified name and options. + pub fn new(name: S, options: Options) -> Self + where + S: Into, + { + ColumnFamilyDescriptor { + name: name.into(), + options, + } + } +} + +/// An opaque type used to represent a column family. Returned from some functions, and used +/// in others +pub struct ColumnFamily { + pub(crate) inner: *mut ffi::rocksdb_column_family_handle_t, +} + +unsafe impl Send for ColumnFamily {} diff --git a/src/db.rs b/src/db.rs index fafe067..5c1bc5e 100644 --- a/src/db.rs +++ b/src/db.rs @@ -14,733 +14,41 @@ // use crate::{ - ffi, ffi_util::opt_bytes_to_ptr, ColumnFamily, ColumnFamilyDescriptor, Error, FlushOptions, - Options, WriteOptions, DB, + ffi, + ffi_util::{opt_bytes_to_ptr, to_cpath}, + ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBPinnableSlice, DBRawIterator, + DBWALIterator, Direction, Error, FlushOptions, IteratorMode, Options, ReadOptions, Snapshot, + WriteBatch, WriteOptions, }; -use libc::{self, c_char, c_int, c_uchar, c_void, size_t}; +use libc::{self, c_char, c_int, c_void, size_t}; use std::collections::BTreeMap; use std::ffi::{CStr, CString}; use std::fmt; use std::fs; -use std::marker::PhantomData; -use std::ops::Deref; use std::path::Path; +use std::path::PathBuf; use std::ptr; use std::slice; use std::str; -unsafe impl Send for DB {} -unsafe impl Sync for DB {} - -#[derive(Debug, Copy, Clone, PartialEq)] -pub enum DBCompressionType { - None = ffi::rocksdb_no_compression as isize, - Snappy = ffi::rocksdb_snappy_compression as isize, - Zlib = ffi::rocksdb_zlib_compression as isize, - Bz2 = ffi::rocksdb_bz2_compression as isize, - Lz4 = ffi::rocksdb_lz4_compression as isize, - Lz4hc = ffi::rocksdb_lz4hc_compression as isize, - Zstd = ffi::rocksdb_zstd_compression as isize, -} - -#[derive(Debug, Copy, Clone, PartialEq)] -pub enum DBCompactionStyle { - Level = ffi::rocksdb_level_compaction as isize, - Universal = ffi::rocksdb_universal_compaction as isize, - Fifo = ffi::rocksdb_fifo_compaction as isize, -} - -#[derive(Debug, Copy, Clone, PartialEq)] -pub enum DBRecoveryMode { - TolerateCorruptedTailRecords = ffi::rocksdb_tolerate_corrupted_tail_records_recovery as isize, - AbsoluteConsistency = ffi::rocksdb_absolute_consistency_recovery as isize, - PointInTime = ffi::rocksdb_point_in_time_recovery as isize, - SkipAnyCorruptedRecord = ffi::rocksdb_skip_any_corrupted_records_recovery as isize, -} - -/// An atomic batch of write operations. 
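// A hedged usage sketch (not part of this diff) for the `ColumnFamilyDescriptor`
// introduced in column_family.rs above: each descriptor pairs a column family
// name with its own `Options` and is typically handed to
// `DB::open_cf_descriptors`. The path and tuning values here are illustrative
// assumptions only.
use rocksdb::{ColumnFamilyDescriptor, Options, DB};

fn open_with_descriptors() -> Result<DB, rocksdb::Error> {
    // Per-column-family options.
    let mut cf_opts = Options::default();
    cf_opts.set_max_write_buffer_number(16);
    let cf = ColumnFamilyDescriptor::new("cf1", cf_opts);

    // Database-wide options.
    let mut db_opts = Options::default();
    db_opts.create_if_missing(true);
    db_opts.create_missing_column_families(true);

    DB::open_cf_descriptors(&db_opts, "_path_for_rocksdb_storage_cf", vec![cf])
}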
-/// -/// Making an atomic commit of several writes: -/// -/// ``` -/// use rocksdb::{DB, Options, WriteBatch}; -/// -/// let path = "_path_for_rocksdb_storage1"; -/// { -/// let db = DB::open_default(path).unwrap(); -/// let mut batch = WriteBatch::default(); -/// batch.put(b"my key", b"my value"); -/// batch.put(b"key2", b"value2"); -/// batch.put(b"key3", b"value3"); -/// db.write(batch); // Atomically commits the batch -/// } -/// let _ = DB::destroy(&Options::default(), path); -/// ``` -pub struct WriteBatch { - inner: *mut ffi::rocksdb_writebatch_t, -} - -pub struct ReadOptions { - inner: *mut ffi::rocksdb_readoptions_t, -} - -/// A consistent view of the database at the point of creation. -/// -/// ``` -/// use rocksdb::{DB, IteratorMode, Options}; -/// -/// let path = "_path_for_rocksdb_storage3"; -/// { -/// let db = DB::open_default(path).unwrap(); -/// let snapshot = db.snapshot(); // Creates a longer-term snapshot of the DB, but closed when goes out of scope -/// let mut iter = snapshot.iterator(IteratorMode::Start); // Make as many iterators as you'd like from one snapshot -/// } -/// let _ = DB::destroy(&Options::default(), path); -/// ``` -/// -pub struct Snapshot<'a> { - db: &'a DB, - inner: *const ffi::rocksdb_snapshot_t, -} - -/// `Send` and `Sync` implementations for `Snapshot` are safe, because `Snapshot` is -/// immutable and can be safely shared between threads. -unsafe impl<'a> Send for Snapshot<'a> {} -unsafe impl<'a> Sync for Snapshot<'a> {} - -/// An iterator over a database or column family, with specifiable -/// ranges and direction. -/// -/// This iterator is different to the standard ``DBIterator`` as it aims Into -/// replicate the underlying iterator API within RocksDB itself. This should -/// give access to more performance and flexibility but departs from the -/// widely recognised Rust idioms. -/// -/// ``` -/// use rocksdb::{DB, Options}; -/// -/// let path = "_path_for_rocksdb_storage4"; -/// { -/// let db = DB::open_default(path).unwrap(); -/// let mut iter = db.raw_iterator(); -/// -/// // Forwards iteration -/// iter.seek_to_first(); -/// while iter.valid() { -/// println!("Saw {:?} {:?}", iter.key(), iter.value()); -/// iter.next(); -/// } -/// -/// // Reverse iteration -/// iter.seek_to_last(); -/// while iter.valid() { -/// println!("Saw {:?} {:?}", iter.key(), iter.value()); -/// iter.prev(); -/// } -/// -/// // Seeking -/// iter.seek(b"my key"); -/// while iter.valid() { -/// println!("Saw {:?} {:?}", iter.key(), iter.value()); -/// iter.next(); -/// } -/// -/// // Reverse iteration from key -/// // Note, use seek_for_prev when reversing because if this key doesn't exist, -/// // this will make the iterator start from the previous key rather than the next. -/// iter.seek_for_prev(b"my key"); -/// while iter.valid() { -/// println!("Saw {:?} {:?}", iter.key(), iter.value()); -/// iter.prev(); -/// } -/// } -/// let _ = DB::destroy(&Options::default(), path); -/// ``` -pub struct DBRawIterator<'a> { - inner: *mut ffi::rocksdb_iterator_t, - db: PhantomData<&'a DB>, -} - -/// Iterates the batches of writes since a given sequence number. -/// -/// `DBWALIterator` is returned by `DB::get_updates_since()` and will return the -/// batches of write operations that have occurred since a given sequence number -/// (see `DB::latest_sequence_number()`). This iterator cannot be constructed by -/// the application. +/// A RocksDB database. 
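// A hedged sketch (not part of this diff) of the point-in-time guarantee the
// `Snapshot` docs above describe: keys written after the snapshot was taken are
// not visible through it. The path and keys are illustrative only.
use rocksdb::{Options, DB};

fn snapshot_read_example() {
    let path = "_path_for_rocksdb_snapshot_sketch";
    {
        let db = DB::open_default(path).unwrap();
        db.put(b"k1", b"v1").unwrap();

        let snapshot = db.snapshot();
        db.put(b"k2", b"v2").unwrap();

        // Visible: written before the snapshot was created.
        assert!(snapshot.get(b"k1").unwrap().is_some());
        // Not visible: written after the snapshot was created.
        assert!(snapshot.get(b"k2").unwrap().is_none());
    }
    let _ = DB::destroy(&Options::default(), path);
}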
/// -/// The iterator item type is a tuple of (`u64`, `WriteBatch`) where the first -/// value is the sequence number of the associated write batch. -/// -pub struct DBWALIterator { - inner: *mut ffi::rocksdb_wal_iterator_t, -} - -impl DBWALIterator { - /// Returns `true` if the iterator is valid. An iterator is invalidated when - /// it reaches the end of its defined range, or when it encounters an error. - /// - /// To check whether the iterator encountered an error after `valid` has - /// returned `false`, use the [`status`](DBWALIterator::status) method. - /// `status` will never return an error when `valid` is `true`. - pub fn valid(&self) -> bool { - unsafe { ffi::rocksdb_wal_iter_valid(self.inner) != 0 } - } - - /// Returns an error `Result` if the iterator has encountered an error - /// during operation. When an error is encountered, the iterator is - /// invalidated and [`valid`](DBWALIterator::valid) will return `false` when - /// called. - pub fn status(&self) -> Result<(), Error> { - unsafe { - ffi_try!(ffi::rocksdb_wal_iter_status(self.inner)); - } - Ok(()) - } -} - -impl Iterator for DBWALIterator { - type Item = (u64, WriteBatch); - - fn next(&mut self) -> Option<(u64, WriteBatch)> { - // Seek to the next write batch. - unsafe { - ffi::rocksdb_wal_iter_next(self.inner); - } - if self.valid() { - let mut seq: u64 = 0; - let inner = unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) }; - Some((seq, WriteBatch { inner })) - } else { - None - } - } -} - -impl Drop for DBWALIterator { - fn drop(&mut self) { - unsafe { - ffi::rocksdb_wal_iter_destroy(self.inner); - } - } -} - -/// An iterator over a database or column family, with specifiable -/// ranges and direction. -/// -/// ``` -/// use rocksdb::{DB, Direction, IteratorMode, Options}; -/// -/// let path = "_path_for_rocksdb_storage2"; -/// { -/// let db = DB::open_default(path).unwrap(); -/// let mut iter = db.iterator(IteratorMode::Start); // Always iterates forward -/// for (key, value) in iter { -/// println!("Saw {:?} {:?}", key, value); -/// } -/// iter = db.iterator(IteratorMode::End); // Always iterates backward -/// for (key, value) in iter { -/// println!("Saw {:?} {:?}", key, value); -/// } -/// iter = db.iterator(IteratorMode::From(b"my key", Direction::Forward)); // From a key in Direction::{forward,reverse} -/// for (key, value) in iter { -/// println!("Saw {:?} {:?}", key, value); -/// } -/// -/// // You can seek with an existing Iterator instance, too -/// iter = db.iterator(IteratorMode::Start); -/// iter.set_mode(IteratorMode::From(b"another key", Direction::Reverse)); -/// for (key, value) in iter { -/// println!("Saw {:?} {:?}", key, value); -/// } -/// } -/// let _ = DB::destroy(&Options::default(), path); -/// ``` -pub struct DBIterator<'a> { - raw: DBRawIterator<'a>, - direction: Direction, - just_seeked: bool, -} - -pub enum Direction { - Forward, - Reverse, -} - -pub type KVBytes = (Box<[u8]>, Box<[u8]>); - -pub enum IteratorMode<'a> { - Start, - End, - From(&'a [u8], Direction), -} - -impl<'a> DBRawIterator<'a> { - fn new(db: &DB, readopts: &ReadOptions) -> DBRawIterator<'a> { - unsafe { - DBRawIterator { - inner: ffi::rocksdb_create_iterator(db.inner, readopts.inner), - db: PhantomData, - } - } - } - - fn new_cf(db: &DB, cf_handle: &ColumnFamily, readopts: &ReadOptions) -> DBRawIterator<'a> { - unsafe { - DBRawIterator { - inner: ffi::rocksdb_create_iterator_cf(db.inner, readopts.inner, cf_handle.inner), - db: PhantomData, - } - } - } - - /// Returns `true` if the iterator is valid. 
An iterator is invalidated when - /// it reaches the end of its defined range, or when it encounters an error. - /// - /// To check whether the iterator encountered an error after `valid` has - /// returned `false`, use the [`status`](DBRawIterator::status) method. `status` will never - /// return an error when `valid` is `true`. - pub fn valid(&self) -> bool { - unsafe { ffi::rocksdb_iter_valid(self.inner) != 0 } - } - - /// Returns an error `Result` if the iterator has encountered an error - /// during operation. When an error is encountered, the iterator is - /// invalidated and [`valid`](DBRawIterator::valid) will return `false` when called. - /// - /// Performing a seek will discard the current status. - pub fn status(&self) -> Result<(), Error> { - unsafe { - ffi_try!(ffi::rocksdb_iter_get_error(self.inner)); - } - Ok(()) - } - - /// Seeks to the first key in the database. - /// - /// # Examples - /// - /// ```rust - /// use rocksdb::{DB, Options}; - /// - /// let path = "_path_for_rocksdb_storage5"; - /// { - /// let db = DB::open_default(path).unwrap(); - /// let mut iter = db.raw_iterator(); - /// - /// // Iterate all keys from the start in lexicographic order - /// iter.seek_to_first(); - /// - /// while iter.valid() { - /// println!("{:?} {:?}", iter.key(), iter.value()); - /// iter.next(); - /// } - /// - /// // Read just the first key - /// iter.seek_to_first(); - /// - /// if iter.valid() { - /// println!("{:?} {:?}", iter.key(), iter.value()); - /// } else { - /// // There are no keys in the database - /// } - /// } - /// let _ = DB::destroy(&Options::default(), path); - /// ``` - pub fn seek_to_first(&mut self) { - unsafe { - ffi::rocksdb_iter_seek_to_first(self.inner); - } - } - - /// Seeks to the last key in the database. - /// - /// # Examples - /// - /// ```rust - /// use rocksdb::{DB, Options}; - /// - /// let path = "_path_for_rocksdb_storage6"; - /// { - /// let db = DB::open_default(path).unwrap(); - /// let mut iter = db.raw_iterator(); - /// - /// // Iterate all keys from the end in reverse lexicographic order - /// iter.seek_to_last(); - /// - /// while iter.valid() { - /// println!("{:?} {:?}", iter.key(), iter.value()); - /// iter.prev(); - /// } - /// - /// // Read just the last key - /// iter.seek_to_last(); - /// - /// if iter.valid() { - /// println!("{:?} {:?}", iter.key(), iter.value()); - /// } else { - /// // There are no keys in the database - /// } - /// } - /// let _ = DB::destroy(&Options::default(), path); - /// ``` - pub fn seek_to_last(&mut self) { - unsafe { - ffi::rocksdb_iter_seek_to_last(self.inner); - } - } - - /// Seeks to the specified key or the first key that lexicographically follows it. - /// - /// This method will attempt to seek to the specified key. If that key does not exist, it will - /// find and seek to the key that lexicographically follows it instead. 
- /// - /// # Examples - /// - /// ```rust - /// use rocksdb::{DB, Options}; - /// - /// let path = "_path_for_rocksdb_storage7"; - /// { - /// let db = DB::open_default(path).unwrap(); - /// let mut iter = db.raw_iterator(); - /// - /// // Read the first key that starts with 'a' - /// iter.seek(b"a"); - /// - /// if iter.valid() { - /// println!("{:?} {:?}", iter.key(), iter.value()); - /// } else { - /// // There are no keys in the database - /// } - /// } - /// let _ = DB::destroy(&Options::default(), path); - /// ``` - pub fn seek>(&mut self, key: K) { - let key = key.as_ref(); - - unsafe { - ffi::rocksdb_iter_seek( - self.inner, - key.as_ptr() as *const c_char, - key.len() as size_t, - ); - } - } - - /// Seeks to the specified key, or the first key that lexicographically precedes it. - /// - /// Like ``.seek()`` this method will attempt to seek to the specified key. - /// The difference with ``.seek()`` is that if the specified key do not exist, this method will - /// seek to key that lexicographically precedes it instead. - /// - /// # Examples - /// - /// ```rust - /// use rocksdb::{DB, Options}; - /// - /// let path = "_path_for_rocksdb_storage8"; - /// { - /// let db = DB::open_default(path).unwrap(); - /// let mut iter = db.raw_iterator(); - /// - /// // Read the last key that starts with 'a' - /// iter.seek_for_prev(b"b"); - /// - /// if iter.valid() { - /// println!("{:?} {:?}", iter.key(), iter.value()); - /// } else { - /// // There are no keys in the database - /// } - /// } - /// let _ = DB::destroy(&Options::default(), path); - /// ``` - pub fn seek_for_prev>(&mut self, key: K) { - let key = key.as_ref(); - - unsafe { - ffi::rocksdb_iter_seek_for_prev( - self.inner, - key.as_ptr() as *const c_char, - key.len() as size_t, - ); - } - } - - /// Seeks to the next key. - /// - /// Returns true if the iterator is valid after this operation. - pub fn next(&mut self) { - unsafe { - ffi::rocksdb_iter_next(self.inner); - } - } - - /// Seeks to the previous key. - /// - /// Returns true if the iterator is valid after this operation. - pub fn prev(&mut self) { - unsafe { - ffi::rocksdb_iter_prev(self.inner); - } - } - - /// Returns a slice of the current key. - pub fn key(&self) -> Option<&[u8]> { - if self.valid() { - // Safety Note: This is safe as all methods that may invalidate the buffer returned - // take `&mut self`, so borrow checker will prevent use of buffer after seek. - unsafe { - let mut key_len: size_t = 0; - let key_len_ptr: *mut size_t = &mut key_len; - let key_ptr = ffi::rocksdb_iter_key(self.inner, key_len_ptr) as *const c_uchar; - - Some(slice::from_raw_parts(key_ptr, key_len as usize)) - } - } else { - None - } - } - - /// Returns a slice of the current value. - pub fn value(&self) -> Option<&[u8]> { - if self.valid() { - // Safety Note: This is safe as all methods that may invalidate the buffer returned - // take `&mut self`, so borrow checker will prevent use of buffer after seek. 
- unsafe { - let mut val_len: size_t = 0; - let val_len_ptr: *mut size_t = &mut val_len; - let val_ptr = ffi::rocksdb_iter_value(self.inner, val_len_ptr) as *const c_uchar; - - Some(slice::from_raw_parts(val_ptr, val_len as usize)) - } - } else { - None - } - } -} - -impl<'a> Drop for DBRawIterator<'a> { - fn drop(&mut self) { - unsafe { - ffi::rocksdb_iter_destroy(self.inner); - } - } -} - -impl<'a> DBIterator<'a> { - fn new(db: &DB, readopts: &ReadOptions, mode: IteratorMode) -> DBIterator<'a> { - let mut rv = DBIterator { - raw: DBRawIterator::new(db, readopts), - direction: Direction::Forward, // blown away by set_mode() - just_seeked: false, - }; - rv.set_mode(mode); - rv - } - - fn new_cf( - db: &DB, - cf_handle: &ColumnFamily, - readopts: &ReadOptions, - mode: IteratorMode, - ) -> DBIterator<'a> { - let mut rv = DBIterator { - raw: DBRawIterator::new_cf(db, cf_handle, readopts), - direction: Direction::Forward, // blown away by set_mode() - just_seeked: false, - }; - rv.set_mode(mode); - rv - } - - pub fn set_mode(&mut self, mode: IteratorMode) { - match mode { - IteratorMode::Start => { - self.raw.seek_to_first(); - self.direction = Direction::Forward; - } - IteratorMode::End => { - self.raw.seek_to_last(); - self.direction = Direction::Reverse; - } - IteratorMode::From(key, Direction::Forward) => { - self.raw.seek(key); - self.direction = Direction::Forward; - } - IteratorMode::From(key, Direction::Reverse) => { - self.raw.seek_for_prev(key); - self.direction = Direction::Reverse; - } - }; - - self.just_seeked = true; - } - - /// See [`valid`](DBRawIterator::valid) - pub fn valid(&self) -> bool { - self.raw.valid() - } - - /// See [`status`](DBRawIterator::status) - pub fn status(&self) -> Result<(), Error> { - self.raw.status() - } -} - -impl<'a> Iterator for DBIterator<'a> { - type Item = KVBytes; - - fn next(&mut self) -> Option { - if !self.raw.valid() { - return None; - } - - // Initial call to next() after seeking should not move the iterator - // or the first item will not be returned - if !self.just_seeked { - match self.direction { - Direction::Forward => self.raw.next(), - Direction::Reverse => self.raw.prev(), - } - } else { - self.just_seeked = false; - } - - if self.raw.valid() { - // .key() and .value() only ever return None if valid == false, which we've just cheked - Some(( - Box::from(self.raw.key().unwrap()), - Box::from(self.raw.value().unwrap()), - )) - } else { - None - } - } -} - -impl<'a> Into> for DBIterator<'a> { - fn into(self) -> DBRawIterator<'a> { - self.raw - } -} - -impl<'a> Snapshot<'a> { - pub fn new(db: &DB) -> Snapshot { - let snapshot = unsafe { ffi::rocksdb_create_snapshot(db.inner) }; - Snapshot { - db, - inner: snapshot, - } - } - - pub fn iterator(&self, mode: IteratorMode) -> DBIterator<'a> { - let readopts = ReadOptions::default(); - self.iterator_opt(mode, readopts) - } - - pub fn iterator_cf(&self, cf_handle: &ColumnFamily, mode: IteratorMode) -> DBIterator { - let readopts = ReadOptions::default(); - self.iterator_cf_opt(cf_handle, readopts, mode) - } - - pub fn iterator_opt(&self, mode: IteratorMode, mut readopts: ReadOptions) -> DBIterator<'a> { - readopts.set_snapshot(self); - DBIterator::new(self.db, &readopts, mode) - } - - pub fn iterator_cf_opt( - &self, - cf_handle: &ColumnFamily, - mut readopts: ReadOptions, - mode: IteratorMode, - ) -> DBIterator { - readopts.set_snapshot(self); - DBIterator::new_cf(self.db, cf_handle, &readopts, mode) - } - - /// Opens a raw iterator over the data in this snapshot, using the default read 
options. - pub fn raw_iterator(&self) -> DBRawIterator { - let readopts = ReadOptions::default(); - self.raw_iterator_opt(readopts) - } - - /// Opens a raw iterator over the data in this snapshot under the given column family, using the default read options. - pub fn raw_iterator_cf(&self, cf_handle: &ColumnFamily) -> DBRawIterator { - let readopts = ReadOptions::default(); - self.raw_iterator_cf_opt(cf_handle, readopts) - } - - /// Opens a raw iterator over the data in this snapshot, using the given read options. - pub fn raw_iterator_opt(&self, mut readopts: ReadOptions) -> DBRawIterator { - readopts.set_snapshot(self); - DBRawIterator::new(self.db, &readopts) - } - - /// Opens a raw iterator over the data in this snapshot under the given column family, using the given read options. - pub fn raw_iterator_cf_opt( - &self, - cf_handle: &ColumnFamily, - mut readopts: ReadOptions, - ) -> DBRawIterator { - readopts.set_snapshot(self); - DBRawIterator::new_cf(self.db, cf_handle, &readopts) - } - - pub fn get>(&self, key: K) -> Result>, Error> { - let readopts = ReadOptions::default(); - self.get_opt(key, readopts) - } - - pub fn get_cf>( - &self, - cf: &ColumnFamily, - key: K, - ) -> Result>, Error> { - let readopts = ReadOptions::default(); - self.get_cf_opt(cf, key.as_ref(), readopts) - } - - pub fn get_opt>( - &self, - key: K, - mut readopts: ReadOptions, - ) -> Result>, Error> { - readopts.set_snapshot(self); - self.db.get_opt(key.as_ref(), &readopts) - } - - pub fn get_cf_opt>( - &self, - cf: &ColumnFamily, - key: K, - mut readopts: ReadOptions, - ) -> Result>, Error> { - readopts.set_snapshot(self); - self.db.get_cf_opt(cf, key.as_ref(), &readopts) - } +/// See crate level documentation for a simple usage example. +pub struct DB { + pub(crate) inner: *mut ffi::rocksdb_t, + cfs: BTreeMap, + path: PathBuf, } -impl<'a> Drop for Snapshot<'a> { - fn drop(&mut self) { - unsafe { - ffi::rocksdb_release_snapshot(self.db.inner, self.inner); - } - } -} +// Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI +// pointer. In most cases, however, this pointer is Send-safe because it is never aliased and +// rocksdb internally does not rely on thread-local information for its user-exposed types. +unsafe impl Send for DB {} -impl ColumnFamilyDescriptor { - // Create a new column family descriptor with the specified name and options. - pub fn new(name: S, options: Options) -> Self - where - S: Into, - { - ColumnFamilyDescriptor { - name: name.into(), - options, - } - } -} +// Sync is similarly safe for many types because they do not expose interior mutability, and their +// use within the rocksdb library is generally behind a const reference +unsafe impl Sync for DB {} impl DB { /// Open a database with default options. @@ -1598,248 +906,6 @@ impl DB { } } -/// Receives the puts and deletes of a write batch. -/// -/// The application must provide an implementation of this trait when -/// iterating the operations within a `WriteBatch` -pub trait WriteBatchIterator { - /// Called with a key and value that were `put` into the batch. - fn put(&mut self, key: Box<[u8]>, value: Box<[u8]>); - /// Called with a key that was `delete`d from the batch. 
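// A hedged sketch (not part of this diff) of what the `Send`/`Sync` impls for
// `DB` above enable: sharing one handle across threads behind an `Arc`, since
// the write methods take `&self`. Keys and thread count are illustrative only.
use rocksdb::DB;
use std::sync::Arc;
use std::thread;

fn shared_db_example(db: DB) {
    let db = Arc::new(db);
    let handles: Vec<_> = (0..4)
        .map(|i| {
            let db = Arc::clone(&db);
            thread::spawn(move || {
                // Each thread writes through the same shared handle.
                db.put(format!("key-{}", i), b"value").unwrap();
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}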
- fn delete(&mut self, key: Box<[u8]>); -} - -unsafe extern "C" fn writebatch_put_callback( - state: *mut c_void, - k: *const c_char, - klen: usize, - v: *const c_char, - vlen: usize, -) { - // coerce the raw pointer back into a box, but "leak" it so we prevent - // freeing the resource before we are done with it - let boxed_cb = Box::from_raw(state as *mut &mut dyn WriteBatchIterator); - let leaked_cb = Box::leak(boxed_cb); - let key = slice::from_raw_parts(k as *const u8, klen as usize); - let value = slice::from_raw_parts(v as *const u8, vlen as usize); - leaked_cb.put( - key.to_vec().into_boxed_slice(), - value.to_vec().into_boxed_slice(), - ); -} - -unsafe extern "C" fn writebatch_delete_callback(state: *mut c_void, k: *const c_char, klen: usize) { - // coerce the raw pointer back into a box, but "leak" it so we prevent - // freeing the resource before we are done with it - let boxed_cb = Box::from_raw(state as *mut &mut dyn WriteBatchIterator); - let leaked_cb = Box::leak(boxed_cb); - let key = slice::from_raw_parts(k as *const u8, klen as usize); - leaked_cb.delete(key.to_vec().into_boxed_slice()); -} - -impl WriteBatch { - pub fn len(&self) -> usize { - unsafe { ffi::rocksdb_writebatch_count(self.inner) as usize } - } - - /// Return WriteBatch serialized size (in bytes). - pub fn size_in_bytes(&self) -> usize { - unsafe { - let mut batch_size: size_t = 0; - ffi::rocksdb_writebatch_data(self.inner, &mut batch_size); - batch_size as usize - } - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// Iterate the put and delete operations within this write batch. Note that - /// this does _not_ return an `Iterator` but instead will invoke the `put()` - /// and `delete()` member functions of the provided `WriteBatchIterator` - /// trait implementation. - pub fn iterate(&self, callbacks: &mut dyn WriteBatchIterator) { - let state = Box::into_raw(Box::new(callbacks)); - unsafe { - ffi::rocksdb_writebatch_iterate( - self.inner, - state as *mut c_void, - Some(writebatch_put_callback), - Some(writebatch_delete_callback), - ); - // we must manually set the raw box free since there is no - // associated "destroy" callback for this object - Box::from_raw(state); - } - } - - /// Insert a value into the database under the given key. 
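// A hedged sketch (not part of this diff) of the `WriteBatchIterator` trait and
// `WriteBatch::iterate` shown above: the callbacks receive every put and delete
// recorded in the batch. The `OpCounter` collector is an illustrative assumption.
use rocksdb::{WriteBatch, WriteBatchIterator};

struct OpCounter {
    puts: usize,
    deletes: usize,
}

impl WriteBatchIterator for OpCounter {
    fn put(&mut self, _key: Box<[u8]>, _value: Box<[u8]>) {
        self.puts += 1;
    }
    fn delete(&mut self, _key: Box<[u8]>) {
        self.deletes += 1;
    }
}

fn count_batch_ops() {
    let mut batch = WriteBatch::default();
    batch.put(b"k1", b"v1");
    batch.delete(b"k2");

    let mut counter = OpCounter { puts: 0, deletes: 0 };
    batch.iterate(&mut counter);
    assert_eq!((counter.puts, counter.deletes), (1, 1));
}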
- pub fn put(&mut self, key: K, value: V) - where - K: AsRef<[u8]>, - V: AsRef<[u8]>, - { - let key = key.as_ref(); - let value = value.as_ref(); - - unsafe { - ffi::rocksdb_writebatch_put( - self.inner, - key.as_ptr() as *const c_char, - key.len() as size_t, - value.as_ptr() as *const c_char, - value.len() as size_t, - ); - } - } - - pub fn put_cf(&mut self, cf: &ColumnFamily, key: K, value: V) - where - K: AsRef<[u8]>, - V: AsRef<[u8]>, - { - let key = key.as_ref(); - let value = value.as_ref(); - - unsafe { - ffi::rocksdb_writebatch_put_cf( - self.inner, - cf.inner, - key.as_ptr() as *const c_char, - key.len() as size_t, - value.as_ptr() as *const c_char, - value.len() as size_t, - ); - } - } - - pub fn merge(&mut self, key: K, value: V) - where - K: AsRef<[u8]>, - V: AsRef<[u8]>, - { - let key = key.as_ref(); - let value = value.as_ref(); - - unsafe { - ffi::rocksdb_writebatch_merge( - self.inner, - key.as_ptr() as *const c_char, - key.len() as size_t, - value.as_ptr() as *const c_char, - value.len() as size_t, - ); - } - } - - pub fn merge_cf(&mut self, cf: &ColumnFamily, key: K, value: V) - where - K: AsRef<[u8]>, - V: AsRef<[u8]>, - { - let key = key.as_ref(); - let value = value.as_ref(); - - unsafe { - ffi::rocksdb_writebatch_merge_cf( - self.inner, - cf.inner, - key.as_ptr() as *const c_char, - key.len() as size_t, - value.as_ptr() as *const c_char, - value.len() as size_t, - ); - } - } - - /// Removes the database entry for key. Does nothing if the key was not found. - pub fn delete>(&mut self, key: K) { - let key = key.as_ref(); - - unsafe { - ffi::rocksdb_writebatch_delete( - self.inner, - key.as_ptr() as *const c_char, - key.len() as size_t, - ); - } - } - - pub fn delete_cf>(&mut self, cf: &ColumnFamily, key: K) { - let key = key.as_ref(); - - unsafe { - ffi::rocksdb_writebatch_delete_cf( - self.inner, - cf.inner, - key.as_ptr() as *const c_char, - key.len() as size_t, - ); - } - } - - /// Remove database entries from start key to end key. - /// - /// Removes the database entries in the range ["begin_key", "end_key"), i.e., - /// including "begin_key" and excluding "end_key". It is not an error if no - /// keys exist in the range ["begin_key", "end_key"). - pub fn delete_range>(&mut self, from: K, to: K) { - let (start_key, end_key) = (from.as_ref(), to.as_ref()); - - unsafe { - ffi::rocksdb_writebatch_delete_range( - self.inner, - start_key.as_ptr() as *const c_char, - start_key.len() as size_t, - end_key.as_ptr() as *const c_char, - end_key.len() as size_t, - ); - } - } - - /// Remove database entries in column family from start key to end key. - /// - /// Removes the database entries in the range ["begin_key", "end_key"), i.e., - /// including "begin_key" and excluding "end_key". It is not an error if no - /// keys exist in the range ["begin_key", "end_key"). - pub fn delete_range_cf>(&mut self, cf: &ColumnFamily, from: K, to: K) { - let (start_key, end_key) = (from.as_ref(), to.as_ref()); - - unsafe { - ffi::rocksdb_writebatch_delete_range_cf( - self.inner, - cf.inner, - start_key.as_ptr() as *const c_char, - start_key.len() as size_t, - end_key.as_ptr() as *const c_char, - end_key.len() as size_t, - ); - } - } - - /// Clear all updates buffered in this batch. 
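// A hedged sketch (not part of this diff) of `WriteBatch::delete_range` as
// documented above: the range includes the start key and excludes the end key.
// The path and keys are illustrative only.
use rocksdb::{Options, WriteBatch, DB};

fn delete_range_example() {
    let path = "_path_for_rocksdb_delete_range_sketch";
    {
        let db = DB::open_default(path).unwrap();
        for key in &["k1", "k2", "k3"] {
            db.put(key, b"v").unwrap();
        }

        let mut batch = WriteBatch::default();
        batch.delete_range(b"k1", b"k3"); // removes "k1" and "k2", keeps "k3"
        db.write(batch).unwrap();

        assert!(db.get(b"k1").unwrap().is_none());
        assert!(db.get(b"k3").unwrap().is_some());
    }
    let _ = DB::destroy(&Options::default(), path);
}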
- pub fn clear(&mut self) { - unsafe { - ffi::rocksdb_writebatch_clear(self.inner); - } - } -} - -impl Default for WriteBatch { - fn default() -> WriteBatch { - WriteBatch { - inner: unsafe { ffi::rocksdb_writebatch_create() }, - } - } -} - -impl Drop for WriteBatch { - fn drop(&mut self) { - unsafe { ffi::rocksdb_writebatch_destroy(self.inner) } - } -} - impl Drop for DB { fn drop(&mut self) { unsafe { @@ -1857,181 +923,6 @@ impl fmt::Debug for DB { } } -impl Drop for ReadOptions { - fn drop(&mut self) { - unsafe { ffi::rocksdb_readoptions_destroy(self.inner) } - } -} - -impl ReadOptions { - // TODO add snapshot setting here - // TODO add snapshot wrapper structs with proper destructors; - // that struct needs an "iterator" impl too. - #[allow(dead_code)] - fn fill_cache(&mut self, v: bool) { - unsafe { - ffi::rocksdb_readoptions_set_fill_cache(self.inner, v as c_uchar); - } - } - - fn set_snapshot(&mut self, snapshot: &Snapshot) { - unsafe { - ffi::rocksdb_readoptions_set_snapshot(self.inner, snapshot.inner); - } - } - - /// Set the upper bound for an iterator. - /// The upper bound itself is not included on the iteration result. - /// - /// # Safety - /// - /// This function will store a clone of key and will give a raw pointer of it to the - /// underlying C++ API, therefore, when given to any other [`DB`] method you must ensure - /// that this [`ReadOptions`] value does not leave the scope too early (e.g. `DB::iterator_cf_opt`). - pub unsafe fn set_iterate_upper_bound>(&mut self, key: K) { - let key = key.as_ref(); - ffi::rocksdb_readoptions_set_iterate_upper_bound( - self.inner, - key.as_ptr() as *const c_char, - key.len() as size_t, - ); - } - - pub fn set_prefix_same_as_start(&mut self, v: bool) { - unsafe { ffi::rocksdb_readoptions_set_prefix_same_as_start(self.inner, v as c_uchar) } - } - - pub fn set_total_order_seek(&mut self, v: bool) { - unsafe { ffi::rocksdb_readoptions_set_total_order_seek(self.inner, v as c_uchar) } - } - - /// If true, all data read from underlying storage will be - /// verified against corresponding checksums. - /// - /// Default: true - pub fn set_verify_checksums(&mut self, v: bool) { - unsafe { - ffi::rocksdb_readoptions_set_verify_checksums(self.inner, v as c_uchar); - } - } - - /// If non-zero, an iterator will create a new table reader which - /// performs reads of the given size. Using a large size (> 2MB) can - /// improve the performance of forward iteration on spinning disks. - /// Default: 0 - /// - /// ``` - /// use rocksdb::{ReadOptions}; - /// - /// let mut opts = ReadOptions::default(); - /// opts.set_readahead_size(4_194_304); // 4mb - /// ``` - pub fn set_readahead_size(&mut self, v: usize) { - unsafe { - ffi::rocksdb_readoptions_set_readahead_size(self.inner, v as size_t); - } - } - - /// If true, create a tailing iterator. Note that tailing iterators - /// only support moving in the forward direction. Iterating in reverse - /// or seek_to_last are not supported. - pub fn set_tailing(&mut self, v: bool) { - unsafe { - ffi::rocksdb_readoptions_set_tailing(self.inner, v as c_uchar); - } - } -} - -impl Default for ReadOptions { - fn default() -> ReadOptions { - unsafe { - ReadOptions { - inner: ffi::rocksdb_readoptions_create(), - } - } - } -} - -// Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI -// pointer. In most cases, however, this pointer is Send-safe because it is never aliased and -// rocksdb internally does not rely on thread-local information for its user-exposed types. 
-unsafe impl<'a> Send for DBRawIterator<'a> {} -unsafe impl Send for ReadOptions {} - -// Sync is similarly safe for many types because they do not expose interior mutability, and their -// use within the rocksdb library is generally behind a const reference -unsafe impl<'a> Sync for DBRawIterator<'a> {} -unsafe impl Sync for ReadOptions {} - -fn to_cpath>(path: P) -> Result { - match CString::new(path.as_ref().to_string_lossy().as_bytes()) { - Ok(c) => Ok(c), - Err(e) => Err(Error::new(format!( - "Failed to convert path to CString: {}", - e, - ))), - } -} - -/// Wrapper around RocksDB PinnableSlice struct. -/// -/// With a pinnable slice, we can directly leverage in-memory data within -/// RocksDB to avoid unnecessary memory copies. The struct here wraps the -/// returned raw pointer and ensures proper finalization work. -pub struct DBPinnableSlice<'a> { - ptr: *mut ffi::rocksdb_pinnableslice_t, - db: PhantomData<&'a DB>, -} - -// Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI -// pointer. In most cases, however, this pointer is Send-safe because it is never aliased and -// rocksdb internally does not rely on thread-local information for its user-exposed types. -unsafe impl<'a> Send for DBPinnableSlice<'a> {} - -// Sync is similarly safe for many types because they do not expose interior mutability, and their -// use within the rocksdb library is generally behind a const reference -unsafe impl<'a> Sync for DBPinnableSlice<'a> {} - -impl<'a> AsRef<[u8]> for DBPinnableSlice<'a> { - fn as_ref(&self) -> &[u8] { - // Implement this via Deref so as not to repeat ourselves - &*self - } -} - -impl<'a> Deref for DBPinnableSlice<'a> { - type Target = [u8]; - - fn deref(&self) -> &[u8] { - unsafe { - let mut val_len: size_t = 0; - let val = ffi::rocksdb_pinnableslice_value(self.ptr, &mut val_len) as *mut u8; - slice::from_raw_parts(val, val_len) - } - } -} - -impl<'a> Drop for DBPinnableSlice<'a> { - fn drop(&mut self) { - unsafe { - ffi::rocksdb_pinnableslice_destroy(self.ptr); - } - } -} - -impl<'a> DBPinnableSlice<'a> { - /// Used to wrap a PinnableSlice from rocksdb to avoid unnecessary memcpy - /// - /// # Unsafe - /// Requires that the pointer must be generated by rocksdb_get_pinned - unsafe fn from_c(ptr: *mut ffi::rocksdb_pinnableslice_t) -> DBPinnableSlice<'a> { - DBPinnableSlice { - ptr, - db: PhantomData, - } - } -} - #[test] fn external() { let path = "_rust_rocksdb_externaltest"; diff --git a/src/db_iterator.rs b/src/db_iterator.rs new file mode 100644 index 0000000..f539e78 --- /dev/null +++ b/src/db_iterator.rs @@ -0,0 +1,541 @@ +// Copyright 2020 Tyler Neely +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::{ffi, ColumnFamily, Error, ReadOptions, WriteBatch, DB}; +use libc::{c_char, c_uchar, size_t}; +use std::marker::PhantomData; +use std::slice; + +/// An iterator over a database or column family, with specifiable +/// ranges and direction. 
+/// +/// This iterator is different to the standard ``DBIterator`` as it aims Into +/// replicate the underlying iterator API within RocksDB itself. This should +/// give access to more performance and flexibility but departs from the +/// widely recognised Rust idioms. +/// +/// ``` +/// use rocksdb::{DB, Options}; +/// +/// let path = "_path_for_rocksdb_storage4"; +/// { +/// let db = DB::open_default(path).unwrap(); +/// let mut iter = db.raw_iterator(); +/// +/// // Forwards iteration +/// iter.seek_to_first(); +/// while iter.valid() { +/// println!("Saw {:?} {:?}", iter.key(), iter.value()); +/// iter.next(); +/// } +/// +/// // Reverse iteration +/// iter.seek_to_last(); +/// while iter.valid() { +/// println!("Saw {:?} {:?}", iter.key(), iter.value()); +/// iter.prev(); +/// } +/// +/// // Seeking +/// iter.seek(b"my key"); +/// while iter.valid() { +/// println!("Saw {:?} {:?}", iter.key(), iter.value()); +/// iter.next(); +/// } +/// +/// // Reverse iteration from key +/// // Note, use seek_for_prev when reversing because if this key doesn't exist, +/// // this will make the iterator start from the previous key rather than the next. +/// iter.seek_for_prev(b"my key"); +/// while iter.valid() { +/// println!("Saw {:?} {:?}", iter.key(), iter.value()); +/// iter.prev(); +/// } +/// } +/// let _ = DB::destroy(&Options::default(), path); +/// ``` +pub struct DBRawIterator<'a> { + inner: *mut ffi::rocksdb_iterator_t, + db: PhantomData<&'a DB>, +} + +impl<'a> DBRawIterator<'a> { + pub(crate) fn new(db: &DB, readopts: &ReadOptions) -> DBRawIterator<'a> { + unsafe { + DBRawIterator { + inner: ffi::rocksdb_create_iterator(db.inner, readopts.inner), + db: PhantomData, + } + } + } + + pub(crate) fn new_cf( + db: &DB, + cf_handle: &ColumnFamily, + readopts: &ReadOptions, + ) -> DBRawIterator<'a> { + unsafe { + DBRawIterator { + inner: ffi::rocksdb_create_iterator_cf(db.inner, readopts.inner, cf_handle.inner), + db: PhantomData, + } + } + } + + /// Returns `true` if the iterator is valid. An iterator is invalidated when + /// it reaches the end of its defined range, or when it encounters an error. + /// + /// To check whether the iterator encountered an error after `valid` has + /// returned `false`, use the [`status`](DBRawIterator::status) method. `status` will never + /// return an error when `valid` is `true`. + pub fn valid(&self) -> bool { + unsafe { ffi::rocksdb_iter_valid(self.inner) != 0 } + } + + /// Returns an error `Result` if the iterator has encountered an error + /// during operation. When an error is encountered, the iterator is + /// invalidated and [`valid`](DBRawIterator::valid) will return `false` when called. + /// + /// Performing a seek will discard the current status. + pub fn status(&self) -> Result<(), Error> { + unsafe { + ffi_try!(ffi::rocksdb_iter_get_error(self.inner)); + } + Ok(()) + } + + /// Seeks to the first key in the database. 
+ /// + /// # Examples + /// + /// ```rust + /// use rocksdb::{DB, Options}; + /// + /// let path = "_path_for_rocksdb_storage5"; + /// { + /// let db = DB::open_default(path).unwrap(); + /// let mut iter = db.raw_iterator(); + /// + /// // Iterate all keys from the start in lexicographic order + /// iter.seek_to_first(); + /// + /// while iter.valid() { + /// println!("{:?} {:?}", iter.key(), iter.value()); + /// iter.next(); + /// } + /// + /// // Read just the first key + /// iter.seek_to_first(); + /// + /// if iter.valid() { + /// println!("{:?} {:?}", iter.key(), iter.value()); + /// } else { + /// // There are no keys in the database + /// } + /// } + /// let _ = DB::destroy(&Options::default(), path); + /// ``` + pub fn seek_to_first(&mut self) { + unsafe { + ffi::rocksdb_iter_seek_to_first(self.inner); + } + } + + /// Seeks to the last key in the database. + /// + /// # Examples + /// + /// ```rust + /// use rocksdb::{DB, Options}; + /// + /// let path = "_path_for_rocksdb_storage6"; + /// { + /// let db = DB::open_default(path).unwrap(); + /// let mut iter = db.raw_iterator(); + /// + /// // Iterate all keys from the end in reverse lexicographic order + /// iter.seek_to_last(); + /// + /// while iter.valid() { + /// println!("{:?} {:?}", iter.key(), iter.value()); + /// iter.prev(); + /// } + /// + /// // Read just the last key + /// iter.seek_to_last(); + /// + /// if iter.valid() { + /// println!("{:?} {:?}", iter.key(), iter.value()); + /// } else { + /// // There are no keys in the database + /// } + /// } + /// let _ = DB::destroy(&Options::default(), path); + /// ``` + pub fn seek_to_last(&mut self) { + unsafe { + ffi::rocksdb_iter_seek_to_last(self.inner); + } + } + + /// Seeks to the specified key or the first key that lexicographically follows it. + /// + /// This method will attempt to seek to the specified key. If that key does not exist, it will + /// find and seek to the key that lexicographically follows it instead. + /// + /// # Examples + /// + /// ```rust + /// use rocksdb::{DB, Options}; + /// + /// let path = "_path_for_rocksdb_storage7"; + /// { + /// let db = DB::open_default(path).unwrap(); + /// let mut iter = db.raw_iterator(); + /// + /// // Read the first key that starts with 'a' + /// iter.seek(b"a"); + /// + /// if iter.valid() { + /// println!("{:?} {:?}", iter.key(), iter.value()); + /// } else { + /// // There are no keys in the database + /// } + /// } + /// let _ = DB::destroy(&Options::default(), path); + /// ``` + pub fn seek>(&mut self, key: K) { + let key = key.as_ref(); + + unsafe { + ffi::rocksdb_iter_seek( + self.inner, + key.as_ptr() as *const c_char, + key.len() as size_t, + ); + } + } + + /// Seeks to the specified key, or the first key that lexicographically precedes it. + /// + /// Like ``.seek()`` this method will attempt to seek to the specified key. + /// The difference with ``.seek()`` is that if the specified key do not exist, this method will + /// seek to key that lexicographically precedes it instead. 
+ /// + /// # Examples + /// + /// ```rust + /// use rocksdb::{DB, Options}; + /// + /// let path = "_path_for_rocksdb_storage8"; + /// { + /// let db = DB::open_default(path).unwrap(); + /// let mut iter = db.raw_iterator(); + /// + /// // Read the last key that starts with 'a' + /// iter.seek_for_prev(b"b"); + /// + /// if iter.valid() { + /// println!("{:?} {:?}", iter.key(), iter.value()); + /// } else { + /// // There are no keys in the database + /// } + /// } + /// let _ = DB::destroy(&Options::default(), path); + /// ``` + pub fn seek_for_prev>(&mut self, key: K) { + let key = key.as_ref(); + + unsafe { + ffi::rocksdb_iter_seek_for_prev( + self.inner, + key.as_ptr() as *const c_char, + key.len() as size_t, + ); + } + } + + /// Seeks to the next key. + /// + /// Returns true if the iterator is valid after this operation. + pub fn next(&mut self) { + unsafe { + ffi::rocksdb_iter_next(self.inner); + } + } + + /// Seeks to the previous key. + /// + /// Returns true if the iterator is valid after this operation. + pub fn prev(&mut self) { + unsafe { + ffi::rocksdb_iter_prev(self.inner); + } + } + + /// Returns a slice of the current key. + pub fn key(&self) -> Option<&[u8]> { + if self.valid() { + // Safety Note: This is safe as all methods that may invalidate the buffer returned + // take `&mut self`, so borrow checker will prevent use of buffer after seek. + unsafe { + let mut key_len: size_t = 0; + let key_len_ptr: *mut size_t = &mut key_len; + let key_ptr = ffi::rocksdb_iter_key(self.inner, key_len_ptr) as *const c_uchar; + + Some(slice::from_raw_parts(key_ptr, key_len as usize)) + } + } else { + None + } + } + + /// Returns a slice of the current value. + pub fn value(&self) -> Option<&[u8]> { + if self.valid() { + // Safety Note: This is safe as all methods that may invalidate the buffer returned + // take `&mut self`, so borrow checker will prevent use of buffer after seek. + unsafe { + let mut val_len: size_t = 0; + let val_len_ptr: *mut size_t = &mut val_len; + let val_ptr = ffi::rocksdb_iter_value(self.inner, val_len_ptr) as *const c_uchar; + + Some(slice::from_raw_parts(val_ptr, val_len as usize)) + } + } else { + None + } + } +} + +impl<'a> Drop for DBRawIterator<'a> { + fn drop(&mut self) { + unsafe { + ffi::rocksdb_iter_destroy(self.inner); + } + } +} + +unsafe impl<'a> Send for DBRawIterator<'a> {} +unsafe impl<'a> Sync for DBRawIterator<'a> {} + +/// An iterator over a database or column family, with specifiable +/// ranges and direction. 
+/// +/// ``` +/// use rocksdb::{DB, Direction, IteratorMode, Options}; +/// +/// let path = "_path_for_rocksdb_storage2"; +/// { +/// let db = DB::open_default(path).unwrap(); +/// let mut iter = db.iterator(IteratorMode::Start); // Always iterates forward +/// for (key, value) in iter { +/// println!("Saw {:?} {:?}", key, value); +/// } +/// iter = db.iterator(IteratorMode::End); // Always iterates backward +/// for (key, value) in iter { +/// println!("Saw {:?} {:?}", key, value); +/// } +/// iter = db.iterator(IteratorMode::From(b"my key", Direction::Forward)); // From a key in Direction::{forward,reverse} +/// for (key, value) in iter { +/// println!("Saw {:?} {:?}", key, value); +/// } +/// +/// // You can seek with an existing Iterator instance, too +/// iter = db.iterator(IteratorMode::Start); +/// iter.set_mode(IteratorMode::From(b"another key", Direction::Reverse)); +/// for (key, value) in iter { +/// println!("Saw {:?} {:?}", key, value); +/// } +/// } +/// let _ = DB::destroy(&Options::default(), path); +/// ``` +pub struct DBIterator<'a> { + raw: DBRawIterator<'a>, + direction: Direction, + just_seeked: bool, +} + +pub enum Direction { + Forward, + Reverse, +} + +pub type KVBytes = (Box<[u8]>, Box<[u8]>); + +pub enum IteratorMode<'a> { + Start, + End, + From(&'a [u8], Direction), +} + +impl<'a> DBIterator<'a> { + pub(crate) fn new(db: &DB, readopts: &ReadOptions, mode: IteratorMode) -> DBIterator<'a> { + let mut rv = DBIterator { + raw: DBRawIterator::new(db, readopts), + direction: Direction::Forward, // blown away by set_mode() + just_seeked: false, + }; + rv.set_mode(mode); + rv + } + + pub(crate) fn new_cf( + db: &DB, + cf_handle: &ColumnFamily, + readopts: &ReadOptions, + mode: IteratorMode, + ) -> DBIterator<'a> { + let mut rv = DBIterator { + raw: DBRawIterator::new_cf(db, cf_handle, readopts), + direction: Direction::Forward, // blown away by set_mode() + just_seeked: false, + }; + rv.set_mode(mode); + rv + } + + pub fn set_mode(&mut self, mode: IteratorMode) { + match mode { + IteratorMode::Start => { + self.raw.seek_to_first(); + self.direction = Direction::Forward; + } + IteratorMode::End => { + self.raw.seek_to_last(); + self.direction = Direction::Reverse; + } + IteratorMode::From(key, Direction::Forward) => { + self.raw.seek(key); + self.direction = Direction::Forward; + } + IteratorMode::From(key, Direction::Reverse) => { + self.raw.seek_for_prev(key); + self.direction = Direction::Reverse; + } + }; + + self.just_seeked = true; + } + + /// See [`valid`](DBRawIterator::valid) + pub fn valid(&self) -> bool { + self.raw.valid() + } + + /// See [`status`](DBRawIterator::status) + pub fn status(&self) -> Result<(), Error> { + self.raw.status() + } +} + +impl<'a> Iterator for DBIterator<'a> { + type Item = KVBytes; + + fn next(&mut self) -> Option { + if !self.raw.valid() { + return None; + } + + // Initial call to next() after seeking should not move the iterator + // or the first item will not be returned + if !self.just_seeked { + match self.direction { + Direction::Forward => self.raw.next(), + Direction::Reverse => self.raw.prev(), + } + } else { + self.just_seeked = false; + } + + if self.raw.valid() { + // .key() and .value() only ever return None if valid == false, which we've just cheked + Some(( + Box::from(self.raw.key().unwrap()), + Box::from(self.raw.value().unwrap()), + )) + } else { + None + } + } +} + +impl<'a> Into> for DBIterator<'a> { + fn into(self) -> DBRawIterator<'a> { + self.raw + } +} + +/// Iterates the batches of writes since a given 
sequence number. +/// +/// `DBWALIterator` is returned by `DB::get_updates_since()` and will return the +/// batches of write operations that have occurred since a given sequence number +/// (see `DB::latest_sequence_number()`). This iterator cannot be constructed by +/// the application. +/// +/// The iterator item type is a tuple of (`u64`, `WriteBatch`) where the first +/// value is the sequence number of the associated write batch. +/// +pub struct DBWALIterator { + pub(crate) inner: *mut ffi::rocksdb_wal_iterator_t, +} + +impl DBWALIterator { + /// Returns `true` if the iterator is valid. An iterator is invalidated when + /// it reaches the end of its defined range, or when it encounters an error. + /// + /// To check whether the iterator encountered an error after `valid` has + /// returned `false`, use the [`status`](DBWALIterator::status) method. + /// `status` will never return an error when `valid` is `true`. + pub fn valid(&self) -> bool { + unsafe { ffi::rocksdb_wal_iter_valid(self.inner) != 0 } + } + + /// Returns an error `Result` if the iterator has encountered an error + /// during operation. When an error is encountered, the iterator is + /// invalidated and [`valid`](DBWALIterator::valid) will return `false` when + /// called. + pub fn status(&self) -> Result<(), Error> { + unsafe { + ffi_try!(ffi::rocksdb_wal_iter_status(self.inner)); + } + Ok(()) + } +} + +impl Iterator for DBWALIterator { + type Item = (u64, WriteBatch); + + fn next(&mut self) -> Option<(u64, WriteBatch)> { + // Seek to the next write batch. + unsafe { + ffi::rocksdb_wal_iter_next(self.inner); + } + if self.valid() { + let mut seq: u64 = 0; + let inner = unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) }; + Some((seq, WriteBatch { inner })) + } else { + None + } + } +} + +impl Drop for DBWALIterator { + fn drop(&mut self) { + unsafe { + ffi::rocksdb_wal_iter_destroy(self.inner); + } + } +} diff --git a/src/db_options.rs b/src/db_options.rs index 91c014d..6109ba0 100644 --- a/src/db_options.rs +++ b/src/db_options.rs @@ -16,7 +16,7 @@ use std::ffi::{CStr, CString}; use std::mem; use std::path::Path; -use libc::{self, c_int, c_uchar, c_uint, c_void, size_t}; +use libc::{self, c_char, c_int, c_uchar, c_uint, c_void, size_t}; use crate::{ compaction_filter::{self, filter_callback, CompactionFilterCallback, CompactionFilterFn}, @@ -26,25 +26,126 @@ use crate::{ self, full_merge_callback, partial_merge_callback, MergeFn, MergeOperatorCallback, }, slice_transform::SliceTransform, - BlockBasedIndexType, BlockBasedOptions, DBCompactionStyle, DBCompressionType, DBRecoveryMode, - FlushOptions, MemtableFactory, Options, PlainTableFactoryOptions, WriteOptions, + Snapshot, }; pub fn new_cache(capacity: size_t) -> *mut ffi::rocksdb_cache_t { unsafe { ffi::rocksdb_cache_create_lru(capacity) } } +/// Database-wide options around performance and behavior. +/// +/// Please read [the official tuning guide](https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide), and most importantly, measure performance under realistic workloads with realistic hardware. 
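// A hedged sketch (not part of this diff) of consuming the `DBWALIterator`
// defined above via `DB::get_updates_since`, which its docs reference. The
// starting sequence number and error handling are illustrative assumptions.
use rocksdb::DB;

fn replay_updates_since(db: &DB, last_seen: u64) {
    // `last_seen` would typically be a sequence number recorded earlier,
    // e.g. from `db.latest_sequence_number()`.
    if let Ok(iter) = db.get_updates_since(last_seen) {
        for (seq, batch) in iter {
            println!("sequence {} carries {} operations", seq, batch.len());
        }
    }
}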
+/// +/// # Examples +/// +/// ``` +/// use rocksdb::{Options, DB}; +/// use rocksdb::DBCompactionStyle; +/// +/// fn badly_tuned_for_somebody_elses_disk() -> DB { +/// let path = "path/for/rocksdb/storageX"; +/// let mut opts = Options::default(); +/// opts.create_if_missing(true); +/// opts.set_max_open_files(10000); +/// opts.set_use_fsync(false); +/// opts.set_bytes_per_sync(8388608); +/// opts.optimize_for_point_lookup(1024); +/// opts.set_table_cache_num_shard_bits(6); +/// opts.set_max_write_buffer_number(32); +/// opts.set_write_buffer_size(536870912); +/// opts.set_target_file_size_base(1073741824); +/// opts.set_min_write_buffer_number_to_merge(4); +/// opts.set_level_zero_stop_writes_trigger(2000); +/// opts.set_level_zero_slowdown_writes_trigger(0); +/// opts.set_compaction_style(DBCompactionStyle::Universal); +/// opts.set_max_background_compactions(4); +/// opts.set_max_background_flushes(4); +/// opts.set_disable_auto_compactions(true); +/// +/// DB::open(&opts, path).unwrap() +/// } +/// ``` +pub struct Options { + pub(crate) inner: *mut ffi::rocksdb_options_t, +} + +/// Optionally disable WAL or sync for this write. +/// +/// # Examples +/// +/// Making an unsafe write of a batch: +/// +/// ``` +/// use rocksdb::{DB, Options, WriteBatch, WriteOptions}; +/// +/// let path = "_path_for_rocksdb_storageY"; +/// { +/// let db = DB::open_default(path).unwrap(); +/// let mut batch = WriteBatch::default(); +/// batch.put(b"my key", b"my value"); +/// batch.put(b"key2", b"value2"); +/// batch.put(b"key3", b"value3"); +/// +/// let mut write_options = WriteOptions::default(); +/// write_options.set_sync(false); +/// write_options.disable_wal(true); +/// +/// db.write_opt(batch, &write_options); +/// } +/// let _ = DB::destroy(&Options::default(), path); +/// ``` +pub struct WriteOptions { + pub(crate) inner: *mut ffi::rocksdb_writeoptions_t, +} + +/// Optionally wait for the memtable flush to be performed. +/// +/// # Examples +/// +/// Manually flushing the memtable: +/// +/// ``` +/// use rocksdb::{DB, Options, FlushOptions}; +/// +/// let path = "_path_for_rocksdb_storageY"; +/// { +/// let db = DB::open_default(path).unwrap(); +/// +/// let mut flush_options = FlushOptions::default(); +/// flush_options.set_wait(true); +/// +/// db.flush_opt(&flush_options); +/// } +/// let _ = DB::destroy(&Options::default(), path); +/// ``` +pub struct FlushOptions { + pub(crate) inner: *mut ffi::rocksdb_flushoptions_t, +} + +/// For configuring block-based file storage. +pub struct BlockBasedOptions { + pub(crate) inner: *mut ffi::rocksdb_block_based_table_options_t, +} + +pub struct ReadOptions { + pub(crate) inner: *mut ffi::rocksdb_readoptions_t, +} + // Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI // pointer. In most cases, however, this pointer is Send-safe because it is never aliased and // rocksdb internally does not rely on thread-local information for its user-exposed types. 
unsafe impl Send for Options {} unsafe impl Send for WriteOptions {} unsafe impl Send for BlockBasedOptions {} +unsafe impl Send for ReadOptions {} + // Sync is similarly safe for many types because they do not expose interior mutability, and their // use within the rocksdb library is generally behind a const reference unsafe impl Sync for Options {} unsafe impl Sync for WriteOptions {} unsafe impl Sync for BlockBasedOptions {} +unsafe impl Sync for ReadOptions {} impl Drop for Options { fn drop(&mut self) { @@ -78,6 +179,12 @@ impl Drop for WriteOptions { } } +impl Drop for ReadOptions { + fn drop(&mut self) { + unsafe { ffi::rocksdb_readoptions_destroy(self.inner) } + } +} + impl BlockBasedOptions { pub fn set_block_size(&mut self, size: usize) { unsafe { @@ -1488,6 +1595,164 @@ impl Default for WriteOptions { } } +impl ReadOptions { + // TODO add snapshot setting here + // TODO add snapshot wrapper structs with proper destructors; + // that struct needs an "iterator" impl too. + #[allow(dead_code)] + fn fill_cache(&mut self, v: bool) { + unsafe { + ffi::rocksdb_readoptions_set_fill_cache(self.inner, v as c_uchar); + } + } + + pub(crate) fn set_snapshot(&mut self, snapshot: &Snapshot) { + unsafe { + ffi::rocksdb_readoptions_set_snapshot(self.inner, snapshot.inner); + } + } + + /// Set the upper bound for an iterator. + /// The upper bound itself is not included on the iteration result. + /// + /// # Safety + /// + /// This function will store a clone of key and will give a raw pointer of it to the + /// underlying C++ API, therefore, when given to any other [`DB`] method you must ensure + /// that this [`ReadOptions`] value does not leave the scope too early (e.g. `DB::iterator_cf_opt`). + pub unsafe fn set_iterate_upper_bound>(&mut self, key: K) { + let key = key.as_ref(); + ffi::rocksdb_readoptions_set_iterate_upper_bound( + self.inner, + key.as_ptr() as *const c_char, + key.len() as size_t, + ); + } + + pub fn set_prefix_same_as_start(&mut self, v: bool) { + unsafe { ffi::rocksdb_readoptions_set_prefix_same_as_start(self.inner, v as c_uchar) } + } + + pub fn set_total_order_seek(&mut self, v: bool) { + unsafe { ffi::rocksdb_readoptions_set_total_order_seek(self.inner, v as c_uchar) } + } + + /// If true, all data read from underlying storage will be + /// verified against corresponding checksums. + /// + /// Default: true + pub fn set_verify_checksums(&mut self, v: bool) { + unsafe { + ffi::rocksdb_readoptions_set_verify_checksums(self.inner, v as c_uchar); + } + } + + /// If non-zero, an iterator will create a new table reader which + /// performs reads of the given size. Using a large size (> 2MB) can + /// improve the performance of forward iteration on spinning disks. + /// Default: 0 + /// + /// ``` + /// use rocksdb::{ReadOptions}; + /// + /// let mut opts = ReadOptions::default(); + /// opts.set_readahead_size(4_194_304); // 4mb + /// ``` + pub fn set_readahead_size(&mut self, v: usize) { + unsafe { + ffi::rocksdb_readoptions_set_readahead_size(self.inner, v as size_t); + } + } + + /// If true, create a tailing iterator. Note that tailing iterators + /// only support moving in the forward direction. Iterating in reverse + /// or seek_to_last are not supported. 
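// A hedged sketch (not part of this diff) of building a `ReadOptions` with the
// setters above and passing it to a read; `DB::get_opt` is the accessor the
// `Snapshot` code in this diff delegates to. The values are illustrative only.
use rocksdb::{ReadOptions, DB};

fn read_with_options(db: &DB) -> bool {
    let mut opts = ReadOptions::default();
    opts.set_verify_checksums(true);
    opts.set_readahead_size(2 * 1024 * 1024); // 2 MiB; mainly affects iterators
    db.get_opt(b"my key", &opts).unwrap().is_some()
}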
+ pub fn set_tailing(&mut self, v: bool) { + unsafe { + ffi::rocksdb_readoptions_set_tailing(self.inner, v as c_uchar); + } + } +} + +impl Default for ReadOptions { + fn default() -> ReadOptions { + unsafe { + ReadOptions { + inner: ffi::rocksdb_readoptions_create(), + } + } + } +} + +/// Used by BlockBasedOptions::set_index_type. +pub enum BlockBasedIndexType { + /// A space efficient index block that is optimized for + /// binary-search-based index. + BinarySearch, + + /// The hash index, if enabled, will perform a hash lookup if + /// a prefix extractor has been provided through Options::set_prefix_extractor. + HashSearch, + + /// A two-level index implementation. Both levels are binary search indexes. + TwoLevelIndexSearch, +} + +/// Defines the underlying memtable implementation. +/// See https://github.com/facebook/rocksdb/wiki/MemTable for more information. +pub enum MemtableFactory { + Vector, + HashSkipList { + bucket_count: usize, + height: i32, + branching_factor: i32, + }, + HashLinkList { + bucket_count: usize, + }, +} + +/// Used with DBOptions::set_plain_table_factory. +/// See https://github.com/facebook/rocksdb/wiki/PlainTable-Format. +/// +/// Defaults: +/// user_key_length: 0 (variable length) +/// bloom_bits_per_key: 10 +/// hash_table_ratio: 0.75 +/// index_sparseness: 16 +pub struct PlainTableFactoryOptions { + pub user_key_length: u32, + pub bloom_bits_per_key: i32, + pub hash_table_ratio: f64, + pub index_sparseness: usize, +} + +#[derive(Debug, Copy, Clone, PartialEq)] +pub enum DBCompressionType { + None = ffi::rocksdb_no_compression as isize, + Snappy = ffi::rocksdb_snappy_compression as isize, + Zlib = ffi::rocksdb_zlib_compression as isize, + Bz2 = ffi::rocksdb_bz2_compression as isize, + Lz4 = ffi::rocksdb_lz4_compression as isize, + Lz4hc = ffi::rocksdb_lz4hc_compression as isize, + Zstd = ffi::rocksdb_zstd_compression as isize, +} + +#[derive(Debug, Copy, Clone, PartialEq)] +pub enum DBCompactionStyle { + Level = ffi::rocksdb_level_compaction as isize, + Universal = ffi::rocksdb_universal_compaction as isize, + Fifo = ffi::rocksdb_fifo_compaction as isize, +} + +#[derive(Debug, Copy, Clone, PartialEq)] +pub enum DBRecoveryMode { + TolerateCorruptedTailRecords = ffi::rocksdb_tolerate_corrupted_tail_records_recovery as isize, + AbsoluteConsistency = ffi::rocksdb_absolute_consistency_recovery as isize, + PointInTime = ffi::rocksdb_point_in_time_recovery as isize, + SkipAnyCorruptedRecord = ffi::rocksdb_skip_any_corrupted_records_recovery as isize, +} + #[cfg(test)] mod tests { use crate::{MemtableFactory, Options}; diff --git a/src/db_pinnable_slice.rs b/src/db_pinnable_slice.rs new file mode 100644 index 0000000..465af1e --- /dev/null +++ b/src/db_pinnable_slice.rs @@ -0,0 +1,72 @@ +// Copyright 2020 Tyler Neely +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::{ffi, DB}; +use core::ops::Deref; +use libc::size_t; +use std::marker::PhantomData; +use std::slice; + +/// Wrapper around RocksDB PinnableSlice struct. 
+/// +/// With a pinnable slice, we can directly leverage in-memory data within +/// RocksDB to avoid unnecessary memory copies. The struct here wraps the +/// returned raw pointer and ensures proper finalization work. +pub struct DBPinnableSlice<'a> { + ptr: *mut ffi::rocksdb_pinnableslice_t, + db: PhantomData<&'a DB>, +} + +unsafe impl<'a> Send for DBPinnableSlice<'a> {} +unsafe impl<'a> Sync for DBPinnableSlice<'a> {} + +impl<'a> AsRef<[u8]> for DBPinnableSlice<'a> { + fn as_ref(&self) -> &[u8] { + // Implement this via Deref so as not to repeat ourselves + &*self + } +} + +impl<'a> Deref for DBPinnableSlice<'a> { + type Target = [u8]; + + fn deref(&self) -> &[u8] { + unsafe { + let mut val_len: size_t = 0; + let val = ffi::rocksdb_pinnableslice_value(self.ptr, &mut val_len) as *mut u8; + slice::from_raw_parts(val, val_len) + } + } +} + +impl<'a> Drop for DBPinnableSlice<'a> { + fn drop(&mut self) { + unsafe { + ffi::rocksdb_pinnableslice_destroy(self.ptr); + } + } +} + +impl<'a> DBPinnableSlice<'a> { + /// Used to wrap a PinnableSlice from rocksdb to avoid unnecessary memcpy + /// + /// # Unsafe + /// Requires that the pointer must be generated by rocksdb_get_pinned + pub(crate) unsafe fn from_c(ptr: *mut ffi::rocksdb_pinnableslice_t) -> DBPinnableSlice<'a> { + DBPinnableSlice { + ptr, + db: PhantomData, + } + } +} diff --git a/src/ffi_util.rs b/src/ffi_util.rs index b79df3d..da78167 100644 --- a/src/ffi_util.rs +++ b/src/ffi_util.rs @@ -13,8 +13,10 @@ // limitations under the License. // +use crate::Error; use libc::{self, c_char, c_void}; -use std::ffi::CStr; +use std::ffi::{CStr, CString}; +use std::path::Path; use std::ptr; pub fn error_message(ptr: *const c_char) -> String { @@ -33,6 +35,16 @@ pub fn opt_bytes_to_ptr<T: AsRef<[u8]>>(opt: Option<T>) -> *const c_char { } } +pub(crate) fn to_cpath<P: AsRef<Path>>(path: P) -> Result<CString, Error> { + match CString::new(path.as_ref().to_string_lossy().as_bytes()) { + Ok(c) => Ok(c), + Err(e) => Err(Error::new(format!( + "Failed to convert path to CString: {}", + e, + ))), + } +} + macro_rules! ffi_try { ( $($function:ident)::*() ) => { ffi_try_impl!($($function)::*()) diff --git a/src/lib.rs b/src/lib.rs index 21f565f..304434b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,47 +59,39 @@ mod ffi_util; pub mod backup; pub mod checkpoint; +mod column_family; pub mod compaction_filter; mod comparator; mod db; +mod db_iterator; mod db_options; +mod db_pinnable_slice; pub mod merge_operator; mod slice_transform; +mod snapshot; +mod write_batch; pub use crate::{ + column_family::{ColumnFamily, ColumnFamilyDescriptor}, compaction_filter::Decision as CompactionDecision, - db::{ - DBCompactionStyle, DBCompressionType, DBIterator, DBPinnableSlice, DBRawIterator, - DBRecoveryMode, DBWALIterator, Direction, IteratorMode, ReadOptions, Snapshot, WriteBatch, - WriteBatchIterator, + db::DB, + db_iterator::{DBIterator, DBRawIterator, DBWALIterator, Direction, IteratorMode}, + db_options::{ + BlockBasedIndexType, BlockBasedOptions, DBCompactionStyle, DBCompressionType, + DBRecoveryMode, FlushOptions, MemtableFactory, Options, PlainTableFactoryOptions, + ReadOptions, WriteOptions, }, + db_pinnable_slice::DBPinnableSlice, merge_operator::MergeOperands, slice_transform::SliceTransform, + snapshot::Snapshot, + write_batch::{WriteBatch, WriteBatchIterator}, }; use librocksdb_sys as ffi; -use std::collections::BTreeMap; use std::error; use std::fmt; -use std::path::PathBuf; - -/// A RocksDB database. -/// -/// See crate level documentation for a simple usage example.
-pub struct DB { - inner: *mut ffi::rocksdb_t, - cfs: BTreeMap<String, ColumnFamily>, - path: PathBuf, -} - -/// A descriptor for a RocksDB column family. -/// -/// A description of the column family, containing the name and `Options`. -pub struct ColumnFamilyDescriptor { - name: String, - options: Options, -} /// A simple wrapper round a string, used for errors reported from /// ffi calls. @@ -142,152 +134,6 @@ impl fmt::Display for Error { } } -/// For configuring block-based file storage. -pub struct BlockBasedOptions { - inner: *mut ffi::rocksdb_block_based_table_options_t, -} - -/// Used by BlockBasedOptions::set_index_type. -pub enum BlockBasedIndexType { - /// A space efficient index block that is optimized for - /// binary-search-based index. - BinarySearch, - - /// The hash index, if enabled, will perform a hash lookup if - /// a prefix extractor has been provided through Options::set_prefix_extractor. - HashSearch, - - /// A two-level index implementation. Both levels are binary search indexes. - TwoLevelIndexSearch, -} - -/// Defines the underlying memtable implementation. -/// See https://github.com/facebook/rocksdb/wiki/MemTable for more information. -pub enum MemtableFactory { - Vector, - HashSkipList { - bucket_count: usize, - height: i32, - branching_factor: i32, - }, - HashLinkList { - bucket_count: usize, - }, -} - -/// Used with DBOptions::set_plain_table_factory. -/// See https://github.com/facebook/rocksdb/wiki/PlainTable-Format. -/// -/// Defaults: -/// user_key_length: 0 (variable length) -/// bloom_bits_per_key: 10 -/// hash_table_ratio: 0.75 -/// index_sparseness: 16 -pub struct PlainTableFactoryOptions { - pub user_key_length: u32, - pub bloom_bits_per_key: i32, - pub hash_table_ratio: f64, - pub index_sparseness: usize, -} - -/// Database-wide options around performance and behavior. -/// -/// Please read [the official tuning guide](https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide), and most importantly, measure performance under realistic workloads with realistic hardware. -/// -/// # Examples -/// -/// ``` -/// use rocksdb::{Options, DB}; -/// use rocksdb::DBCompactionStyle; -/// -/// fn badly_tuned_for_somebody_elses_disk() -> DB { -/// let path = "path/for/rocksdb/storageX"; -/// let mut opts = Options::default(); -/// opts.create_if_missing(true); -/// opts.set_max_open_files(10000); -/// opts.set_use_fsync(false); -/// opts.set_bytes_per_sync(8388608); -/// opts.optimize_for_point_lookup(1024); -/// opts.set_table_cache_num_shard_bits(6); -/// opts.set_max_write_buffer_number(32); -/// opts.set_write_buffer_size(536870912); -/// opts.set_target_file_size_base(1073741824); -/// opts.set_min_write_buffer_number_to_merge(4); -/// opts.set_level_zero_stop_writes_trigger(2000); -/// opts.set_level_zero_slowdown_writes_trigger(0); -/// opts.set_compaction_style(DBCompactionStyle::Universal); -/// opts.set_max_background_compactions(4); -/// opts.set_max_background_flushes(4); -/// opts.set_disable_auto_compactions(true); -/// -/// DB::open(&opts, path).unwrap() -/// } -/// ``` -pub struct Options { - inner: *mut ffi::rocksdb_options_t, -} - -/// Optionally wait for the memtable flush to be performed.
-/// -/// # Examples -/// -/// Manually flushing the memtable: -/// -/// ``` -/// use rocksdb::{DB, Options, FlushOptions}; -/// -/// let path = "_path_for_rocksdb_storageY"; -/// { -/// let db = DB::open_default(path).unwrap(); -/// -/// let mut flush_options = FlushOptions::default(); -/// flush_options.set_wait(true); -/// -/// db.flush_opt(&flush_options); -/// } -/// let _ = DB::destroy(&Options::default(), path); -/// ``` -pub struct FlushOptions { - inner: *mut ffi::rocksdb_flushoptions_t, -} - -/// Optionally disable WAL or sync for this write. -/// -/// # Examples -/// -/// Making an unsafe write of a batch: -/// -/// ``` -/// use rocksdb::{DB, Options, WriteBatch, WriteOptions}; -/// -/// let path = "_path_for_rocksdb_storageY"; -/// { -/// let db = DB::open_default(path).unwrap(); -/// let mut batch = WriteBatch::default(); -/// batch.put(b"my key", b"my value"); -/// batch.put(b"key2", b"value2"); -/// batch.put(b"key3", b"value3"); -/// -/// let mut write_options = WriteOptions::default(); -/// write_options.set_sync(false); -/// write_options.disable_wal(true); -/// -/// db.write_opt(batch, &write_options); -/// } -/// let _ = DB::destroy(&Options::default(), path); -/// ``` -pub struct WriteOptions { - inner: *mut ffi::rocksdb_writeoptions_t, -} - -/// An opaque type used to represent a column family. Returned from some functions, and used -/// in others -pub struct ColumnFamily { - inner: *mut ffi::rocksdb_column_family_handle_t, -} - -unsafe impl Send for ColumnFamily {} - #[cfg(test)] mod test { use super::*; diff --git a/src/snapshot.rs b/src/snapshot.rs new file mode 100644 index 0000000..a73fc77 --- /dev/null +++ b/src/snapshot.rs @@ -0,0 +1,143 @@ +// Copyright 2020 Tyler Neely +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::{ffi, ColumnFamily, DBIterator, DBRawIterator, Error, IteratorMode, ReadOptions, DB}; + +/// A consistent view of the database at the point of creation. 
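+/// Reads performed through the snapshot (for example via `get` or `iterator`)
+/// do not observe writes applied to the database after the snapshot was taken.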
+/// +/// ``` +/// use rocksdb::{DB, IteratorMode, Options}; +/// +/// let path = "_path_for_rocksdb_storage3"; +/// { +/// let db = DB::open_default(path).unwrap(); +/// let snapshot = db.snapshot(); // Creates a longer-term snapshot of the DB, but is released when it goes out of scope +/// let mut iter = snapshot.iterator(IteratorMode::Start); // Make as many iterators as you'd like from one snapshot +/// } +/// let _ = DB::destroy(&Options::default(), path); +/// ``` +/// +pub struct Snapshot<'a> { + db: &'a DB, + pub(crate) inner: *const ffi::rocksdb_snapshot_t, +} + +impl<'a> Snapshot<'a> { + pub fn new(db: &DB) -> Snapshot { + let snapshot = unsafe { ffi::rocksdb_create_snapshot(db.inner) }; + Snapshot { + db, + inner: snapshot, + } + } + + pub fn iterator(&self, mode: IteratorMode) -> DBIterator<'a> { + let readopts = ReadOptions::default(); + self.iterator_opt(mode, readopts) + } + + pub fn iterator_cf(&self, cf_handle: &ColumnFamily, mode: IteratorMode) -> DBIterator { + let readopts = ReadOptions::default(); + self.iterator_cf_opt(cf_handle, readopts, mode) + } + + pub fn iterator_opt(&self, mode: IteratorMode, mut readopts: ReadOptions) -> DBIterator<'a> { + readopts.set_snapshot(self); + DBIterator::new(self.db, &readopts, mode) + } + + pub fn iterator_cf_opt( + &self, + cf_handle: &ColumnFamily, + mut readopts: ReadOptions, + mode: IteratorMode, + ) -> DBIterator { + readopts.set_snapshot(self); + DBIterator::new_cf(self.db, cf_handle, &readopts, mode) + } + + /// Opens a raw iterator over the data in this snapshot, using the default read options. + pub fn raw_iterator(&self) -> DBRawIterator { + let readopts = ReadOptions::default(); + self.raw_iterator_opt(readopts) + } + + /// Opens a raw iterator over the data in this snapshot under the given column family, using the default read options. + pub fn raw_iterator_cf(&self, cf_handle: &ColumnFamily) -> DBRawIterator { + let readopts = ReadOptions::default(); + self.raw_iterator_cf_opt(cf_handle, readopts) + } + + /// Opens a raw iterator over the data in this snapshot, using the given read options. + pub fn raw_iterator_opt(&self, mut readopts: ReadOptions) -> DBRawIterator { + readopts.set_snapshot(self); + DBRawIterator::new(self.db, &readopts) + } + + /// Opens a raw iterator over the data in this snapshot under the given column family, using the given read options. + pub fn raw_iterator_cf_opt( + &self, + cf_handle: &ColumnFamily, + mut readopts: ReadOptions, + ) -> DBRawIterator { + readopts.set_snapshot(self); + DBRawIterator::new_cf(self.db, cf_handle, &readopts) + } + + pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> { + let readopts = ReadOptions::default(); + self.get_opt(key, readopts) + } + + pub fn get_cf<K: AsRef<[u8]>>( + &self, + cf: &ColumnFamily, + key: K, + ) -> Result<Option<Vec<u8>>, Error> { + let readopts = ReadOptions::default(); + self.get_cf_opt(cf, key.as_ref(), readopts) + } + + pub fn get_opt<K: AsRef<[u8]>>( + &self, + key: K, + mut readopts: ReadOptions, + ) -> Result<Option<Vec<u8>>, Error> { + readopts.set_snapshot(self); + self.db.get_opt(key.as_ref(), &readopts) + } + + pub fn get_cf_opt<K: AsRef<[u8]>>( + &self, + cf: &ColumnFamily, + key: K, + mut readopts: ReadOptions, + ) -> Result<Option<Vec<u8>>, Error> { + readopts.set_snapshot(self); + self.db.get_cf_opt(cf, key.as_ref(), &readopts) + } +} + +impl<'a> Drop for Snapshot<'a> { + fn drop(&mut self) { + unsafe { + ffi::rocksdb_release_snapshot(self.db.inner, self.inner); + } + } +} + +/// `Send` and `Sync` implementations for `Snapshot` are safe, because `Snapshot` is +/// immutable and can be safely shared between threads.
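A small sketch of the sharing pattern these impls allow (the path and keys are arbitrary; `DB::put` is the crate's usual write path, and `std::thread::scope` stands in for any mechanism that shares references across threads): several threads read through one snapshot, and none of them observe the write made after the snapshot was taken.

```
use rocksdb::{DB, Options};
use std::thread;

fn main() {
    let path = "_path_for_rocksdb_snapshot_sharing"; // arbitrary path
    {
        let db = DB::open_default(path).unwrap();
        db.put(b"key", b"v1").unwrap();

        let snapshot = db.snapshot();
        db.put(b"key", b"v2").unwrap(); // not visible through the snapshot

        // `Snapshot: Sync` lets several scoped threads share the same
        // point-in-time view; each still sees the old value.
        thread::scope(|s| {
            for _ in 0..2 {
                s.spawn(|| {
                    assert_eq!(snapshot.get(b"key").unwrap(), Some(b"v1".to_vec()));
                });
            }
        });
    }
    let _ = DB::destroy(&Options::default(), path);
}
```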
+unsafe impl<'a> Send for Snapshot<'a> {} +unsafe impl<'a> Sync for Snapshot<'a> {} diff --git a/src/write_batch.rs b/src/write_batch.rs new file mode 100644 index 0000000..c07fe6b --- /dev/null +++ b/src/write_batch.rs @@ -0,0 +1,281 @@ +// Copyright 2020 Tyler Neely +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::{ffi, ColumnFamily}; +use libc::{c_char, c_void, size_t}; +use std::slice; + +/// An atomic batch of write operations. +/// +/// Making an atomic commit of several writes: +/// +/// ``` +/// use rocksdb::{DB, Options, WriteBatch}; +/// +/// let path = "_path_for_rocksdb_storage1"; +/// { +/// let db = DB::open_default(path).unwrap(); +/// let mut batch = WriteBatch::default(); +/// batch.put(b"my key", b"my value"); +/// batch.put(b"key2", b"value2"); +/// batch.put(b"key3", b"value3"); +/// db.write(batch); // Atomically commits the batch +/// } +/// let _ = DB::destroy(&Options::default(), path); +/// ``` +pub struct WriteBatch { + pub(crate) inner: *mut ffi::rocksdb_writebatch_t, +} + +/// Receives the puts and deletes of a write batch. +/// +/// The application must provide an implementation of this trait when +/// iterating the operations within a `WriteBatch` +pub trait WriteBatchIterator { + /// Called with a key and value that were `put` into the batch. + fn put(&mut self, key: Box<[u8]>, value: Box<[u8]>); + /// Called with a key that was `delete`d from the batch. + fn delete(&mut self, key: Box<[u8]>); +} + +unsafe extern "C" fn writebatch_put_callback( + state: *mut c_void, + k: *const c_char, + klen: usize, + v: *const c_char, + vlen: usize, +) { + // coerce the raw pointer back into a box, but "leak" it so we prevent + // freeing the resource before we are done with it + let boxed_cb = Box::from_raw(state as *mut &mut dyn WriteBatchIterator); + let leaked_cb = Box::leak(boxed_cb); + let key = slice::from_raw_parts(k as *const u8, klen as usize); + let value = slice::from_raw_parts(v as *const u8, vlen as usize); + leaked_cb.put( + key.to_vec().into_boxed_slice(), + value.to_vec().into_boxed_slice(), + ); +} + +unsafe extern "C" fn writebatch_delete_callback(state: *mut c_void, k: *const c_char, klen: usize) { + // coerce the raw pointer back into a box, but "leak" it so we prevent + // freeing the resource before we are done with it + let boxed_cb = Box::from_raw(state as *mut &mut dyn WriteBatchIterator); + let leaked_cb = Box::leak(boxed_cb); + let key = slice::from_raw_parts(k as *const u8, klen as usize); + leaked_cb.delete(key.to_vec().into_boxed_slice()); +} + +impl WriteBatch { + pub fn len(&self) -> usize { + unsafe { ffi::rocksdb_writebatch_count(self.inner) as usize } + } + + /// Return WriteBatch serialized size (in bytes). + pub fn size_in_bytes(&self) -> usize { + unsafe { + let mut batch_size: size_t = 0; + ffi::rocksdb_writebatch_data(self.inner, &mut batch_size); + batch_size as usize + } + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Iterate the put and delete operations within this write batch. 
Note that + /// this does _not_ return an `Iterator` but instead will invoke the `put()` + /// and `delete()` member functions of the provided `WriteBatchIterator` + /// trait implementation. + pub fn iterate(&self, callbacks: &mut dyn WriteBatchIterator) { + let state = Box::into_raw(Box::new(callbacks)); + unsafe { + ffi::rocksdb_writebatch_iterate( + self.inner, + state as *mut c_void, + Some(writebatch_put_callback), + Some(writebatch_delete_callback), + ); + // we must manually set the raw box free since there is no + // associated "destroy" callback for this object + Box::from_raw(state); + } + } + + /// Insert a value into the database under the given key. + pub fn put<K, V>(&mut self, key: K, value: V) + where + K: AsRef<[u8]>, + V: AsRef<[u8]>, + { + let key = key.as_ref(); + let value = value.as_ref(); + + unsafe { + ffi::rocksdb_writebatch_put( + self.inner, + key.as_ptr() as *const c_char, + key.len() as size_t, + value.as_ptr() as *const c_char, + value.len() as size_t, + ); + } + } + + pub fn put_cf<K, V>(&mut self, cf: &ColumnFamily, key: K, value: V) + where + K: AsRef<[u8]>, + V: AsRef<[u8]>, + { + let key = key.as_ref(); + let value = value.as_ref(); + + unsafe { + ffi::rocksdb_writebatch_put_cf( + self.inner, + cf.inner, + key.as_ptr() as *const c_char, + key.len() as size_t, + value.as_ptr() as *const c_char, + value.len() as size_t, + ); + } + } + + pub fn merge<K, V>(&mut self, key: K, value: V) + where + K: AsRef<[u8]>, + V: AsRef<[u8]>, + { + let key = key.as_ref(); + let value = value.as_ref(); + + unsafe { + ffi::rocksdb_writebatch_merge( + self.inner, + key.as_ptr() as *const c_char, + key.len() as size_t, + value.as_ptr() as *const c_char, + value.len() as size_t, + ); + } + } + + pub fn merge_cf<K, V>(&mut self, cf: &ColumnFamily, key: K, value: V) + where + K: AsRef<[u8]>, + V: AsRef<[u8]>, + { + let key = key.as_ref(); + let value = value.as_ref(); + + unsafe { + ffi::rocksdb_writebatch_merge_cf( + self.inner, + cf.inner, + key.as_ptr() as *const c_char, + key.len() as size_t, + value.as_ptr() as *const c_char, + value.len() as size_t, + ); + } + } + + /// Removes the database entry for key. Does nothing if the key was not found. + pub fn delete<K: AsRef<[u8]>>(&mut self, key: K) { + let key = key.as_ref(); + + unsafe { + ffi::rocksdb_writebatch_delete( + self.inner, + key.as_ptr() as *const c_char, + key.len() as size_t, + ); + } + } + + pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: &ColumnFamily, key: K) { + let key = key.as_ref(); + + unsafe { + ffi::rocksdb_writebatch_delete_cf( + self.inner, + cf.inner, + key.as_ptr() as *const c_char, + key.len() as size_t, + ); + } + } + + /// Remove database entries from start key to end key. + /// + /// Removes the database entries in the range ["begin_key", "end_key"), i.e., + /// including "begin_key" and excluding "end_key". It is not an error if no + /// keys exist in the range ["begin_key", "end_key"). + pub fn delete_range<K: AsRef<[u8]>>(&mut self, from: K, to: K) { + let (start_key, end_key) = (from.as_ref(), to.as_ref()); + + unsafe { + ffi::rocksdb_writebatch_delete_range( + self.inner, + start_key.as_ptr() as *const c_char, + start_key.len() as size_t, + end_key.as_ptr() as *const c_char, + end_key.len() as size_t, + ); + } + } + + /// Remove database entries in column family from start key to end key. + /// + /// Removes the database entries in the range ["begin_key", "end_key"), i.e., + /// including "begin_key" and excluding "end_key". It is not an error if no + /// keys exist in the range ["begin_key", "end_key").
+ pub fn delete_range_cf<K: AsRef<[u8]>>(&mut self, cf: &ColumnFamily, from: K, to: K) { + let (start_key, end_key) = (from.as_ref(), to.as_ref()); + + unsafe { + ffi::rocksdb_writebatch_delete_range_cf( + self.inner, + cf.inner, + start_key.as_ptr() as *const c_char, + start_key.len() as size_t, + end_key.as_ptr() as *const c_char, + end_key.len() as size_t, + ); + } + } + + /// Clear all updates buffered in this batch. + pub fn clear(&mut self) { + unsafe { + ffi::rocksdb_writebatch_clear(self.inner); + } + } +} + +impl Default for WriteBatch { + fn default() -> WriteBatch { + WriteBatch { + inner: unsafe { ffi::rocksdb_writebatch_create() }, + } + } +} + +impl Drop for WriteBatch { + fn drop(&mut self) { + unsafe { ffi::rocksdb_writebatch_destroy(self.inner) } + } +}
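Finally, a self-contained sketch of how the new `WriteBatchIterator` trait and `WriteBatch::iterate` fit together (the `OpCollector` type and the keys are purely illustrative):

```
use rocksdb::{WriteBatch, WriteBatchIterator};

// Collects a human-readable trace of the operations recorded in a batch.
struct OpCollector {
    ops: Vec<String>,
}

impl WriteBatchIterator for OpCollector {
    fn put(&mut self, key: Box<[u8]>, value: Box<[u8]>) {
        self.ops.push(format!(
            "put {} = {}",
            String::from_utf8_lossy(&key),
            String::from_utf8_lossy(&value)
        ));
    }

    fn delete(&mut self, key: Box<[u8]>) {
        self.ops
            .push(format!("delete {}", String::from_utf8_lossy(&key)));
    }
}

fn main() {
    let mut batch = WriteBatch::default();
    batch.put(b"k1", b"v1");
    batch.delete(b"k2");
    assert_eq!(batch.len(), 2);

    let mut collector = OpCollector { ops: Vec::new() };
    batch.iterate(&mut collector);
    assert_eq!(collector.ops.len(), 2);
    println!("{:?}", collector.ops);
}
```

The callback-driven shape mirrors the underlying `rocksdb_writebatch_iterate` C API, which reports each recorded operation through the put/delete function pointers rather than exposing an iterator object.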