// Copyright 2014 Tyler Neely // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // use ffi; use ffi_util::opt_bytes_to_ptr; use {ColumnFamily, ColumnFamilyDescriptor, Error, FlushOptions, Options, WriteOptions, DB}; use libc::{self, c_char, c_int, c_uchar, c_void, size_t}; use std::collections::BTreeMap; use std::ffi::{CStr, CString}; use std::fmt; use std::fs; use std::marker::PhantomData; use std::ops::Deref; use std::path::Path; use std::ptr; use std::slice; use std::str; unsafe impl Send for DB {} unsafe impl Sync for DB {} #[derive(Debug, Copy, Clone, PartialEq)] pub enum DBCompressionType { None = ffi::rocksdb_no_compression as isize, Snappy = ffi::rocksdb_snappy_compression as isize, Zlib = ffi::rocksdb_zlib_compression as isize, Bz2 = ffi::rocksdb_bz2_compression as isize, Lz4 = ffi::rocksdb_lz4_compression as isize, Lz4hc = ffi::rocksdb_lz4hc_compression as isize, Zstd = ffi::rocksdb_zstd_compression as isize, } #[derive(Debug, Copy, Clone, PartialEq)] pub enum DBCompactionStyle { Level = ffi::rocksdb_level_compaction as isize, Universal = ffi::rocksdb_universal_compaction as isize, Fifo = ffi::rocksdb_fifo_compaction as isize, } #[derive(Debug, Copy, Clone, PartialEq)] pub enum DBRecoveryMode { TolerateCorruptedTailRecords = ffi::rocksdb_tolerate_corrupted_tail_records_recovery as isize, AbsoluteConsistency = ffi::rocksdb_absolute_consistency_recovery as isize, PointInTime = ffi::rocksdb_point_in_time_recovery as isize, SkipAnyCorruptedRecord = ffi::rocksdb_skip_any_corrupted_records_recovery as isize, } /// An atomic batch of write operations. /// /// Making an atomic commit of several writes: /// /// ``` /// use rocksdb::{DB, Options, WriteBatch}; /// /// let path = "_path_for_rocksdb_storage1"; /// { /// let db = DB::open_default(path).unwrap(); /// let mut batch = WriteBatch::default(); /// batch.put(b"my key", b"my value"); /// batch.put(b"key2", b"value2"); /// batch.put(b"key3", b"value3"); /// db.write(batch); // Atomically commits the batch /// } /// let _ = DB::destroy(&Options::default(), path); /// ``` pub struct WriteBatch { inner: *mut ffi::rocksdb_writebatch_t, } pub struct ReadOptions { inner: *mut ffi::rocksdb_readoptions_t, } /// A consistent view of the database at the point of creation. /// /// ``` /// use rocksdb::{DB, IteratorMode, Options}; /// /// let path = "_path_for_rocksdb_storage3"; /// { /// let db = DB::open_default(path).unwrap(); /// let snapshot = db.snapshot(); // Creates a longer-term snapshot of the DB, but closed when goes out of scope /// let mut iter = snapshot.iterator(IteratorMode::Start); // Make as many iterators as you'd like from one snapshot /// } /// let _ = DB::destroy(&Options::default(), path); /// ``` /// pub struct Snapshot<'a> { db: &'a DB, inner: *const ffi::rocksdb_snapshot_t, } /// `Send` and `Sync` implementations for `Snapshot` are safe, because `Snapshot` is /// immutable and can be safely shared between threads. unsafe impl<'a> Send for Snapshot<'a> {} unsafe impl<'a> Sync for Snapshot<'a> {} /// An iterator over a database or column family, with specifiable /// ranges and direction. /// /// This iterator is different to the standard ``DBIterator`` as it aims Into /// replicate the underlying iterator API within RocksDB itself. This should /// give access to more performance and flexibility but departs from the /// widely recognised Rust idioms. /// /// ``` /// use rocksdb::{DB, Options}; /// /// let path = "_path_for_rocksdb_storage4"; /// { /// let db = DB::open_default(path).unwrap(); /// let mut iter = db.raw_iterator(); /// /// // Forwards iteration /// iter.seek_to_first(); /// while iter.valid() { /// println!("Saw {:?} {:?}", iter.key(), iter.value()); /// iter.next(); /// } /// /// // Reverse iteration /// iter.seek_to_last(); /// while iter.valid() { /// println!("Saw {:?} {:?}", iter.key(), iter.value()); /// iter.prev(); /// } /// /// // Seeking /// iter.seek(b"my key"); /// while iter.valid() { /// println!("Saw {:?} {:?}", iter.key(), iter.value()); /// iter.next(); /// } /// /// // Reverse iteration from key /// // Note, use seek_for_prev when reversing because if this key doesn't exist, /// // this will make the iterator start from the previous key rather than the next. /// iter.seek_for_prev(b"my key"); /// while iter.valid() { /// println!("Saw {:?} {:?}", iter.key(), iter.value()); /// iter.prev(); /// } /// } /// let _ = DB::destroy(&Options::default(), path); /// ``` pub struct DBRawIterator<'a> { inner: *mut ffi::rocksdb_iterator_t, db: PhantomData<&'a DB>, } /// Iterates the batches of writes since a given sequence number. /// /// `DBWALIterator` is returned by `DB::get_updates_since()` and will return the /// batches of write operations that have occurred since a given sequence number /// (see `DB::latest_sequence_number()`). This iterator cannot be constructed by /// the application. /// /// The iterator item type is a tuple of (`u64`, `WriteBatch`) where the first /// value is the sequence number of the associated write batch. /// pub struct DBWALIterator { inner: *mut ffi::rocksdb_wal_iterator_t, } impl DBWALIterator { /// Returns `true` if the iterator is valid. An iterator is invalidated when /// it reaches the end of its defined range, or when it encounters an error. /// /// To check whether the iterator encountered an error after `valid` has /// returned `false`, use the [`status`](DBWALIterator::status) method. /// `status` will never return an error when `valid` is `true`. pub fn valid(&self) -> bool { unsafe { ffi::rocksdb_wal_iter_valid(self.inner) != 0 } } /// Returns an error `Result` if the iterator has encountered an error /// during operation. When an error is encountered, the iterator is /// invalidated and [`valid`](DBWALIterator::valid) will return `false` when /// called. pub fn status(&self) -> Result<(), Error> { unsafe { ffi_try!(ffi::rocksdb_wal_iter_status(self.inner,)); } Ok(()) } } impl Iterator for DBWALIterator { type Item = (u64, WriteBatch); fn next(&mut self) -> Option<(u64, WriteBatch)> { // Seek to the next write batch. unsafe { ffi::rocksdb_wal_iter_next(self.inner); } if self.valid() { let mut seq: u64 = 0; let inner = unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) }; Some((seq, WriteBatch { inner })) } else { None } } } impl Drop for DBWALIterator { fn drop(&mut self) { unsafe { ffi::rocksdb_wal_iter_destroy(self.inner); } } } /// An iterator over a database or column family, with specifiable /// ranges and direction. /// /// ``` /// use rocksdb::{DB, Direction, IteratorMode, Options}; /// /// let path = "_path_for_rocksdb_storage2"; /// { /// let db = DB::open_default(path).unwrap(); /// let mut iter = db.iterator(IteratorMode::Start); // Always iterates forward /// for (key, value) in iter { /// println!("Saw {:?} {:?}", key, value); /// } /// iter = db.iterator(IteratorMode::End); // Always iterates backward /// for (key, value) in iter { /// println!("Saw {:?} {:?}", key, value); /// } /// iter = db.iterator(IteratorMode::From(b"my key", Direction::Forward)); // From a key in Direction::{forward,reverse} /// for (key, value) in iter { /// println!("Saw {:?} {:?}", key, value); /// } /// /// // You can seek with an existing Iterator instance, too /// iter = db.iterator(IteratorMode::Start); /// iter.set_mode(IteratorMode::From(b"another key", Direction::Reverse)); /// for (key, value) in iter { /// println!("Saw {:?} {:?}", key, value); /// } /// } /// let _ = DB::destroy(&Options::default(), path); /// ``` pub struct DBIterator<'a> { raw: DBRawIterator<'a>, direction: Direction, just_seeked: bool, } pub enum Direction { Forward, Reverse, } pub type KVBytes = (Box<[u8]>, Box<[u8]>); pub enum IteratorMode<'a> { Start, End, From(&'a [u8], Direction), } impl<'a> DBRawIterator<'a> { fn new(db: &DB, readopts: &ReadOptions) -> DBRawIterator<'a> { unsafe { DBRawIterator { inner: ffi::rocksdb_create_iterator(db.inner, readopts.inner), db: PhantomData, } } } fn new_cf( db: &DB, cf_handle: &ColumnFamily, readopts: &ReadOptions, ) -> Result, Error> { unsafe { Ok(DBRawIterator { inner: ffi::rocksdb_create_iterator_cf(db.inner, readopts.inner, cf_handle.inner), db: PhantomData, }) } } /// Returns `true` if the iterator is valid. An iterator is invalidated when /// it reaches the end of its defined range, or when it encounters an error. /// /// To check whether the iterator encountered an error after `valid` has /// returned `false`, use the [`status`](DBRawIterator::status) method. `status` will never /// return an error when `valid` is `true`. pub fn valid(&self) -> bool { unsafe { ffi::rocksdb_iter_valid(self.inner) != 0 } } /// Returns an error `Result` if the iterator has encountered an error /// during operation. When an error is encountered, the iterator is /// invalidated and [`valid`](DBRawIterator::valid) will return `false` when called. /// /// Performing a seek will discard the current status. pub fn status(&self) -> Result<(), Error> { unsafe { ffi_try!(ffi::rocksdb_iter_get_error(self.inner,)); } Ok(()) } /// Seeks to the first key in the database. /// /// # Examples /// /// ```rust /// use rocksdb::{DB, Options}; /// /// let path = "_path_for_rocksdb_storage5"; /// { /// let db = DB::open_default(path).unwrap(); /// let mut iter = db.raw_iterator(); /// /// // Iterate all keys from the start in lexicographic order /// iter.seek_to_first(); /// /// while iter.valid() { /// println!("{:?} {:?}", iter.key(), iter.value()); /// iter.next(); /// } /// /// // Read just the first key /// iter.seek_to_first(); /// /// if iter.valid() { /// println!("{:?} {:?}", iter.key(), iter.value()); /// } else { /// // There are no keys in the database /// } /// } /// let _ = DB::destroy(&Options::default(), path); /// ``` pub fn seek_to_first(&mut self) { unsafe { ffi::rocksdb_iter_seek_to_first(self.inner); } } /// Seeks to the last key in the database. /// /// # Examples /// /// ```rust /// use rocksdb::{DB, Options}; /// /// let path = "_path_for_rocksdb_storage6"; /// { /// let db = DB::open_default(path).unwrap(); /// let mut iter = db.raw_iterator(); /// /// // Iterate all keys from the end in reverse lexicographic order /// iter.seek_to_last(); /// /// while iter.valid() { /// println!("{:?} {:?}", iter.key(), iter.value()); /// iter.prev(); /// } /// /// // Read just the last key /// iter.seek_to_last(); /// /// if iter.valid() { /// println!("{:?} {:?}", iter.key(), iter.value()); /// } else { /// // There are no keys in the database /// } /// } /// let _ = DB::destroy(&Options::default(), path); /// ``` pub fn seek_to_last(&mut self) { unsafe { ffi::rocksdb_iter_seek_to_last(self.inner); } } /// Seeks to the specified key or the first key that lexicographically follows it. /// /// This method will attempt to seek to the specified key. If that key does not exist, it will /// find and seek to the key that lexicographically follows it instead. /// /// # Examples /// /// ```rust /// use rocksdb::{DB, Options}; /// /// let path = "_path_for_rocksdb_storage7"; /// { /// let db = DB::open_default(path).unwrap(); /// let mut iter = db.raw_iterator(); /// /// // Read the first key that starts with 'a' /// iter.seek(b"a"); /// /// if iter.valid() { /// println!("{:?} {:?}", iter.key(), iter.value()); /// } else { /// // There are no keys in the database /// } /// } /// let _ = DB::destroy(&Options::default(), path); /// ``` pub fn seek>(&mut self, key: K) { let key = key.as_ref(); unsafe { ffi::rocksdb_iter_seek( self.inner, key.as_ptr() as *const c_char, key.len() as size_t, ); } } /// Seeks to the specified key, or the first key that lexicographically precedes it. /// /// Like ``.seek()`` this method will attempt to seek to the specified key. /// The difference with ``.seek()`` is that if the specified key do not exist, this method will /// seek to key that lexicographically precedes it instead. /// /// # Examples /// /// ```rust /// use rocksdb::{DB, Options}; /// /// let path = "_path_for_rocksdb_storage8"; /// { /// let db = DB::open_default(path).unwrap(); /// let mut iter = db.raw_iterator(); /// /// // Read the last key that starts with 'a' /// iter.seek_for_prev(b"b"); /// /// if iter.valid() { /// println!("{:?} {:?}", iter.key(), iter.value()); /// } else { /// // There are no keys in the database /// } /// } /// let _ = DB::destroy(&Options::default(), path); /// ``` pub fn seek_for_prev>(&mut self, key: K) { let key = key.as_ref(); unsafe { ffi::rocksdb_iter_seek_for_prev( self.inner, key.as_ptr() as *const c_char, key.len() as size_t, ); } } /// Seeks to the next key. /// /// Returns true if the iterator is valid after this operation. pub fn next(&mut self) { unsafe { ffi::rocksdb_iter_next(self.inner); } } /// Seeks to the previous key. /// /// Returns true if the iterator is valid after this operation. pub fn prev(&mut self) { unsafe { ffi::rocksdb_iter_prev(self.inner); } } /// Returns a slice of the current key. pub fn key(&self) -> Option<&[u8]> { if self.valid() { // Safety Note: This is safe as all methods that may invalidate the buffer returned // take `&mut self`, so borrow checker will prevent use of buffer after seek. unsafe { let mut key_len: size_t = 0; let key_len_ptr: *mut size_t = &mut key_len; let key_ptr = ffi::rocksdb_iter_key(self.inner, key_len_ptr) as *const c_uchar; Some(slice::from_raw_parts(key_ptr, key_len as usize)) } } else { None } } /// Returns a slice of the current value. pub fn value(&self) -> Option<&[u8]> { if self.valid() { // Safety Note: This is safe as all methods that may invalidate the buffer returned // take `&mut self`, so borrow checker will prevent use of buffer after seek. unsafe { let mut val_len: size_t = 0; let val_len_ptr: *mut size_t = &mut val_len; let val_ptr = ffi::rocksdb_iter_value(self.inner, val_len_ptr) as *const c_uchar; Some(slice::from_raw_parts(val_ptr, val_len as usize)) } } else { None } } } impl<'a> Drop for DBRawIterator<'a> { fn drop(&mut self) { unsafe { ffi::rocksdb_iter_destroy(self.inner); } } } impl<'a> DBIterator<'a> { fn new(db: &DB, readopts: &ReadOptions, mode: IteratorMode) -> DBIterator<'a> { let mut rv = DBIterator { raw: DBRawIterator::new(db, readopts), direction: Direction::Forward, // blown away by set_mode() just_seeked: false, }; rv.set_mode(mode); rv } fn new_cf( db: &DB, cf_handle: &ColumnFamily, readopts: &ReadOptions, mode: IteratorMode, ) -> Result, Error> { let mut rv = DBIterator { raw: DBRawIterator::new_cf(db, cf_handle, readopts)?, direction: Direction::Forward, // blown away by set_mode() just_seeked: false, }; rv.set_mode(mode); Ok(rv) } pub fn set_mode(&mut self, mode: IteratorMode) { match mode { IteratorMode::Start => { self.raw.seek_to_first(); self.direction = Direction::Forward; } IteratorMode::End => { self.raw.seek_to_last(); self.direction = Direction::Reverse; } IteratorMode::From(key, Direction::Forward) => { self.raw.seek(key); self.direction = Direction::Forward; } IteratorMode::From(key, Direction::Reverse) => { self.raw.seek_for_prev(key); self.direction = Direction::Reverse; } }; self.just_seeked = true; } /// See [`valid`](DBRawIterator::valid) pub fn valid(&self) -> bool { self.raw.valid() } /// See [`status`](DBRawIterator::status) pub fn status(&self) -> Result<(), Error> { self.raw.status() } } impl<'a> Iterator for DBIterator<'a> { type Item = KVBytes; fn next(&mut self) -> Option { // Initial call to next() after seeking should not move the iterator // or the first item will not be returned if !self.just_seeked { match self.direction { Direction::Forward => self.raw.next(), Direction::Reverse => self.raw.prev(), } } else { self.just_seeked = false; } if self.raw.valid() { // .key() and .value() only ever return None if valid == false, which we've just cheked Some(( self.raw.key().unwrap().to_vec().into_boxed_slice(), self.raw.value().unwrap().to_vec().into_boxed_slice(), )) } else { None } } } impl<'a> Into> for DBIterator<'a> { fn into(self) -> DBRawIterator<'a> { self.raw } } impl<'a> Snapshot<'a> { pub fn new(db: &DB) -> Snapshot { let snapshot = unsafe { ffi::rocksdb_create_snapshot(db.inner) }; Snapshot { db, inner: snapshot, } } pub fn iterator(&self, mode: IteratorMode) -> DBIterator<'a> { let readopts = ReadOptions::default(); self.iterator_opt(mode, readopts) } pub fn iterator_cf( &self, cf_handle: &ColumnFamily, mode: IteratorMode, ) -> Result { let readopts = ReadOptions::default(); self.iterator_cf_opt(cf_handle, readopts, mode) } pub fn iterator_opt(&self, mode: IteratorMode, mut readopts: ReadOptions) -> DBIterator<'a> { readopts.set_snapshot(self); DBIterator::new(self.db, &readopts, mode) } pub fn iterator_cf_opt( &self, cf_handle: &ColumnFamily, mut readopts: ReadOptions, mode: IteratorMode, ) -> Result { readopts.set_snapshot(self); DBIterator::new_cf(self.db, cf_handle, &readopts, mode) } /// Opens a raw iterator over the data in this snapshot, using the default read options. pub fn raw_iterator(&self) -> DBRawIterator { let readopts = ReadOptions::default(); self.raw_iterator_opt(readopts) } /// Opens a raw iterator over the data in this snapshot under the given column family, using the default read options. pub fn raw_iterator_cf(&self, cf_handle: &ColumnFamily) -> Result { let readopts = ReadOptions::default(); self.raw_iterator_cf_opt(cf_handle, readopts) } /// Opens a raw iterator over the data in this snapshot, using the given read options. pub fn raw_iterator_opt(&self, mut readopts: ReadOptions) -> DBRawIterator { readopts.set_snapshot(self); DBRawIterator::new(self.db, &readopts) } /// Opens a raw iterator over the data in this snapshot under the given column family, using the given read options. pub fn raw_iterator_cf_opt( &self, cf_handle: &ColumnFamily, mut readopts: ReadOptions, ) -> Result { readopts.set_snapshot(self); DBRawIterator::new_cf(self.db, cf_handle, &readopts) } pub fn get>(&self, key: K) -> Result>, Error> { let readopts = ReadOptions::default(); self.get_opt(key, readopts) } pub fn get_cf>( &self, cf: &ColumnFamily, key: K, ) -> Result>, Error> { let readopts = ReadOptions::default(); self.get_cf_opt(cf, key.as_ref(), readopts) } pub fn get_opt>( &self, key: K, mut readopts: ReadOptions, ) -> Result>, Error> { readopts.set_snapshot(self); self.db.get_opt(key.as_ref(), &readopts) } pub fn get_cf_opt>( &self, cf: &ColumnFamily, key: K, mut readopts: ReadOptions, ) -> Result>, Error> { readopts.set_snapshot(self); self.db.get_cf_opt(cf, key.as_ref(), &readopts) } } impl<'a> Drop for Snapshot<'a> { fn drop(&mut self) { unsafe { ffi::rocksdb_release_snapshot(self.db.inner, self.inner); } } } impl ColumnFamilyDescriptor { // Create a new column family descriptor with the specified name and options. pub fn new(name: S, options: Options) -> Self where S: Into, { ColumnFamilyDescriptor { name: name.into(), options, } } } impl DB { /// Open a database with default options. pub fn open_default>(path: P) -> Result { let mut opts = Options::default(); opts.create_if_missing(true); DB::open(&opts, path) } /// Open the database with the specified options. pub fn open>(opts: &Options, path: P) -> Result { DB::open_cf(opts, path, None::<&str>) } /// Open a database with the given database options and column family names. /// /// Column families opened using this function will be created with default `Options`. pub fn open_cf(opts: &Options, path: P, cfs: I) -> Result where P: AsRef, I: IntoIterator, N: AsRef, { let cfs = cfs .into_iter() .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default())); DB::open_cf_descriptors(opts, path, cfs) } /// Open a database with the given database options and column family descriptors. pub fn open_cf_descriptors(opts: &Options, path: P, cfs: I) -> Result where P: AsRef, I: IntoIterator, { let cfs: Vec<_> = cfs.into_iter().collect(); let path = path.as_ref(); let cpath = match CString::new(path.to_string_lossy().as_bytes()) { Ok(c) => c, Err(_) => { return Err(Error::new( "Failed to convert path to CString \ when opening DB." .to_owned(), )); } }; if let Err(e) = fs::create_dir_all(&path) { return Err(Error::new(format!( "Failed to create RocksDB directory: `{:?}`.", e ))); } let db: *mut ffi::rocksdb_t; let mut cf_map = BTreeMap::new(); if cfs.is_empty() { unsafe { db = ffi_try!(ffi::rocksdb_open(opts.inner, cpath.as_ptr() as *const _,)); } } else { let mut cfs_v = cfs; // Always open the default column family. if !cfs_v.iter().any(|cf| cf.name == "default") { cfs_v.push(ColumnFamilyDescriptor { name: String::from("default"), options: Options::default(), }); } // We need to store our CStrings in an intermediate vector // so that their pointers remain valid. let c_cfs: Vec = cfs_v .iter() .map(|cf| CString::new(cf.name.as_bytes()).unwrap()) .collect(); let mut cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect(); // These handles will be populated by DB. let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect(); let mut cfopts: Vec<_> = cfs_v .iter() .map(|cf| cf.options.inner as *const _) .collect(); unsafe { db = ffi_try!(ffi::rocksdb_open_column_families( opts.inner, cpath.as_ptr(), cfs_v.len() as c_int, cfnames.as_mut_ptr(), cfopts.as_mut_ptr(), cfhandles.as_mut_ptr(), )); } for handle in &cfhandles { if handle.is_null() { return Err(Error::new( "Received null column family \ handle from DB." .to_owned(), )); } } for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) { cf_map.insert(cf_desc.name.clone(), ColumnFamily { inner }); } } if db.is_null() { return Err(Error::new("Could not initialize database.".to_owned())); } Ok(DB { inner: db, cfs: cf_map, path: path.to_path_buf(), }) } pub fn list_cf>(opts: &Options, path: P) -> Result, Error> { let cpath = to_cpath(path)?; let mut length = 0; unsafe { let ptr = ffi_try!(ffi::rocksdb_list_column_families( opts.inner, cpath.as_ptr() as *const _, &mut length, )); let vec = slice::from_raw_parts(ptr, length) .iter() .map(|ptr| CStr::from_ptr(*ptr).to_string_lossy().into_owned()) .collect(); ffi::rocksdb_list_column_families_destroy(ptr, length); Ok(vec) } } pub fn destroy>(opts: &Options, path: P) -> Result<(), Error> { let cpath = to_cpath(path)?; unsafe { ffi_try!(ffi::rocksdb_destroy_db(opts.inner, cpath.as_ptr(),)); } Ok(()) } pub fn repair>(opts: Options, path: P) -> Result<(), Error> { let cpath = to_cpath(path)?; unsafe { ffi_try!(ffi::rocksdb_repair_db(opts.inner, cpath.as_ptr(),)); } Ok(()) } pub fn path(&self) -> &Path { &self.path.as_path() } /// Flush database memtable to SST files on disk (with options). pub fn flush_opt(&self, flushopts: &FlushOptions) -> Result<(), Error> { unsafe { ffi_try!(ffi::rocksdb_flush(self.inner, flushopts.inner,)); } Ok(()) } /// Flush database memtable to SST files on disk. pub fn flush(&self) -> Result<(), Error> { self.flush_opt(&FlushOptions::default()) } pub fn write_opt(&self, batch: WriteBatch, writeopts: &WriteOptions) -> Result<(), Error> { unsafe { ffi_try!(ffi::rocksdb_write(self.inner, writeopts.inner, batch.inner,)); } Ok(()) } pub fn write(&self, batch: WriteBatch) -> Result<(), Error> { self.write_opt(batch, &WriteOptions::default()) } pub fn write_without_wal(&self, batch: WriteBatch) -> Result<(), Error> { let mut wo = WriteOptions::new(); wo.disable_wal(true); self.write_opt(batch, &wo) } /// Return the bytes associated with a key value with read options. If you only intend to use /// the vector returned temporarily, consider using [`get_pinned_opt`](#method.get_pinned_opt) /// to avoid unnecessary memory copy. pub fn get_opt>( &self, key: K, readopts: &ReadOptions, ) -> Result>, Error> { self.get_pinned_opt(key, readopts) .map(|x| x.map(|v| v.as_ref().to_vec())) } /// Return the bytes associated with a key value. If you only intend to use the vector returned /// temporarily, consider using [`get_pinned`](#method.get_pinned) to avoid unnecessary memory /// copy. pub fn get>(&self, key: K) -> Result>, Error> { self.get_opt(key.as_ref(), &ReadOptions::default()) } /// Return the bytes associated with a key value and the given column family with read options. /// If you only intend to use the vector returned temporarily, consider using /// [`get_pinned_cf_opt`](#method.get_pinned_cf_opt) to avoid unnecessary memory. pub fn get_cf_opt>( &self, cf: &ColumnFamily, key: K, readopts: &ReadOptions, ) -> Result>, Error> { self.get_pinned_cf_opt(cf, key, readopts) .map(|x| x.map(|v| v.as_ref().to_vec())) } /// Return the bytes associated with a key value and the given column family. If you only /// intend to use the vector returned temporarily, consider using /// [`get_pinned_cf`](#method.get_pinned_cf) to avoid unnecessary memory. pub fn get_cf>( &self, cf: &ColumnFamily, key: K, ) -> Result>, Error> { self.get_cf_opt(cf, key.as_ref(), &ReadOptions::default()) } /// Return the value associated with a key using RocksDB's PinnableSlice /// so as to avoid unnecessary memory copy. pub fn get_pinned_opt>( &self, key: K, readopts: &ReadOptions, ) -> Result, Error> { if readopts.inner.is_null() { return Err(Error::new( "Unable to create RocksDB read options. \ This is a fairly trivial call, and its \ failure may be indicative of a \ mis-compiled or mis-loaded RocksDB \ library." .to_owned(), )); } let key = key.as_ref(); unsafe { let val = ffi_try!(ffi::rocksdb_get_pinned( self.inner, readopts.inner, key.as_ptr() as *const c_char, key.len() as size_t, )); if val.is_null() { Ok(None) } else { Ok(Some(DBPinnableSlice::from_c(val))) } } } /// Return the value associated with a key using RocksDB's PinnableSlice /// so as to avoid unnecessary memory copy. Similar to get_pinned_opt but /// leverages default options. pub fn get_pinned>(&self, key: K) -> Result, Error> { self.get_pinned_opt(key, &ReadOptions::default()) } /// Return the value associated with a key using RocksDB's PinnableSlice /// so as to avoid unnecessary memory copy. Similar to get_pinned_opt but /// allows specifying ColumnFamily pub fn get_pinned_cf_opt>( &self, cf: &ColumnFamily, key: K, readopts: &ReadOptions, ) -> Result, Error> { if readopts.inner.is_null() { return Err(Error::new( "Unable to create RocksDB read options. \ This is a fairly trivial call, and its \ failure may be indicative of a \ mis-compiled or mis-loaded RocksDB \ library." .to_owned(), )); } let key = key.as_ref(); unsafe { let val = ffi_try!(ffi::rocksdb_get_pinned_cf( self.inner, readopts.inner, cf.inner, key.as_ptr() as *const c_char, key.len() as size_t, )); if val.is_null() { Ok(None) } else { Ok(Some(DBPinnableSlice::from_c(val))) } } } /// Return the value associated with a key using RocksDB's PinnableSlice /// so as to avoid unnecessary memory copy. Similar to get_pinned_cf_opt but /// leverages default options. pub fn get_pinned_cf>( &self, cf: &ColumnFamily, key: K, ) -> Result, Error> { self.get_pinned_cf_opt(cf, key, &ReadOptions::default()) } pub fn create_cf>(&mut self, name: N, opts: &Options) -> Result<(), Error> { let cname = match CString::new(name.as_ref().as_bytes()) { Ok(c) => c, Err(_) => { return Err(Error::new( "Failed to convert path to CString \ when opening rocksdb" .to_owned(), )); } }; unsafe { let inner = ffi_try!(ffi::rocksdb_create_column_family( self.inner, opts.inner, cname.as_ptr(), )); self.cfs .insert(name.as_ref().to_string(), ColumnFamily { inner }); }; Ok(()) } pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> { if let Some(cf) = self.cfs.remove(name) { unsafe { ffi_try!(ffi::rocksdb_drop_column_family(self.inner, cf.inner,)); } Ok(()) } else { Err(Error::new( format!("Invalid column family: {}", name).to_owned(), )) } } /// Return the underlying column family handle. pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> { self.cfs.get(name) } pub fn iterator<'a, 'b: 'a>(&'a self, mode: IteratorMode) -> DBIterator<'b> { let readopts = ReadOptions::default(); self.iterator_opt(mode, &readopts) } pub fn iterator_opt<'a, 'b: 'a>( &'a self, mode: IteratorMode, readopts: &ReadOptions, ) -> DBIterator<'b> { DBIterator::new(self, &readopts, mode) } /// Opens an iterator using the provided ReadOptions. /// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions pub fn iterator_cf_opt( &self, cf_handle: &ColumnFamily, readopts: &ReadOptions, mode: IteratorMode, ) -> Result { DBIterator::new_cf(self, cf_handle, &readopts, mode) } /// Opens an iterator with `set_total_order_seek` enabled. /// This must be used to iterate across prefixes when `set_memtable_factory` has been called /// with a Hash-based implementation. pub fn full_iterator<'a, 'b: 'a>(&'a self, mode: IteratorMode) -> DBIterator<'b> { let mut opts = ReadOptions::default(); opts.set_total_order_seek(true); DBIterator::new(self, &opts, mode) } pub fn prefix_iterator<'a, 'b: 'a, P: AsRef<[u8]>>(&'a self, prefix: P) -> DBIterator<'b> { let mut opts = ReadOptions::default(); opts.set_prefix_same_as_start(true); DBIterator::new( self, &opts, IteratorMode::From(prefix.as_ref(), Direction::Forward), ) } pub fn iterator_cf( &self, cf_handle: &ColumnFamily, mode: IteratorMode, ) -> Result { let opts = ReadOptions::default(); DBIterator::new_cf(self, cf_handle, &opts, mode) } pub fn full_iterator_cf( &self, cf_handle: &ColumnFamily, mode: IteratorMode, ) -> Result { let mut opts = ReadOptions::default(); opts.set_total_order_seek(true); DBIterator::new_cf(self, cf_handle, &opts, mode) } pub fn prefix_iterator_cf>( &self, cf_handle: &ColumnFamily, prefix: P, ) -> Result { let mut opts = ReadOptions::default(); opts.set_prefix_same_as_start(true); DBIterator::new_cf( self, cf_handle, &opts, IteratorMode::From(prefix.as_ref(), Direction::Forward), ) } /// Opens a raw iterator over the database, using the default read options pub fn raw_iterator(&self) -> DBRawIterator { let opts = ReadOptions::default(); DBRawIterator::new(self, &opts) } /// Opens a raw iterator over the given column family, using the default read options pub fn raw_iterator_cf(&self, cf_handle: &ColumnFamily) -> Result { let opts = ReadOptions::default(); DBRawIterator::new_cf(self, cf_handle, &opts) } /// Opens a raw iterator over the database, using the given read options pub fn raw_iterator_opt(&self, readopts: &ReadOptions) -> DBRawIterator { DBRawIterator::new(self, readopts) } /// Opens a raw iterator over the given column family, using the given read options pub fn raw_iterator_cf_opt( &self, cf_handle: &ColumnFamily, readopts: &ReadOptions, ) -> Result { DBRawIterator::new_cf(self, cf_handle, readopts) } pub fn snapshot(&self) -> Snapshot { Snapshot::new(self) } pub fn put_opt(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error> where K: AsRef<[u8]>, V: AsRef<[u8]>, { let key = key.as_ref(); let value = value.as_ref(); unsafe { ffi_try!(ffi::rocksdb_put( self.inner, writeopts.inner, key.as_ptr() as *const c_char, key.len() as size_t, value.as_ptr() as *const c_char, value.len() as size_t, )); Ok(()) } } pub fn put_cf_opt( &self, cf: &ColumnFamily, key: K, value: V, writeopts: &WriteOptions, ) -> Result<(), Error> where K: AsRef<[u8]>, V: AsRef<[u8]>, { let key = key.as_ref(); let value = value.as_ref(); unsafe { ffi_try!(ffi::rocksdb_put_cf( self.inner, writeopts.inner, cf.inner, key.as_ptr() as *const c_char, key.len() as size_t, value.as_ptr() as *const c_char, value.len() as size_t, )); Ok(()) } } pub fn merge_opt(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error> where K: AsRef<[u8]>, V: AsRef<[u8]>, { let key = key.as_ref(); let value = value.as_ref(); unsafe { ffi_try!(ffi::rocksdb_merge( self.inner, writeopts.inner, key.as_ptr() as *const c_char, key.len() as size_t, value.as_ptr() as *const c_char, value.len() as size_t, )); Ok(()) } } pub fn merge_cf_opt( &self, cf: &ColumnFamily, key: K, value: V, writeopts: &WriteOptions, ) -> Result<(), Error> where K: AsRef<[u8]>, V: AsRef<[u8]>, { let key = key.as_ref(); let value = value.as_ref(); unsafe { ffi_try!(ffi::rocksdb_merge_cf( self.inner, writeopts.inner, cf.inner, key.as_ptr() as *const c_char, key.len() as size_t, value.as_ptr() as *const c_char, value.len() as size_t, )); Ok(()) } } pub fn delete_opt>( &self, key: K, writeopts: &WriteOptions, ) -> Result<(), Error> { let key = key.as_ref(); unsafe { ffi_try!(ffi::rocksdb_delete( self.inner, writeopts.inner, key.as_ptr() as *const c_char, key.len() as size_t, )); Ok(()) } } pub fn delete_cf_opt>( &self, cf: &ColumnFamily, key: K, writeopts: &WriteOptions, ) -> Result<(), Error> { let key = key.as_ref(); unsafe { ffi_try!(ffi::rocksdb_delete_cf( self.inner, writeopts.inner, cf.inner, key.as_ptr() as *const c_char, key.len() as size_t, )); Ok(()) } } pub fn put(&self, key: K, value: V) -> Result<(), Error> where K: AsRef<[u8]>, V: AsRef<[u8]>, { self.put_opt(key.as_ref(), value.as_ref(), &WriteOptions::default()) } pub fn put_cf(&self, cf: &ColumnFamily, key: K, value: V) -> Result<(), Error> where K: AsRef<[u8]>, V: AsRef<[u8]>, { self.put_cf_opt(cf, key.as_ref(), value.as_ref(), &WriteOptions::default()) } pub fn merge(&self, key: K, value: V) -> Result<(), Error> where K: AsRef<[u8]>, V: AsRef<[u8]>, { self.merge_opt(key.as_ref(), value.as_ref(), &WriteOptions::default()) } pub fn merge_cf(&self, cf: &ColumnFamily, key: K, value: V) -> Result<(), Error> where K: AsRef<[u8]>, V: AsRef<[u8]>, { self.merge_cf_opt(cf, key.as_ref(), value.as_ref(), &WriteOptions::default()) } pub fn delete>(&self, key: K) -> Result<(), Error> { self.delete_opt(key.as_ref(), &WriteOptions::default()) } pub fn delete_cf>(&self, cf: &ColumnFamily, key: K) -> Result<(), Error> { self.delete_cf_opt(cf, key.as_ref(), &WriteOptions::default()) } pub fn compact_range, E: AsRef<[u8]>>(&self, start: Option, end: Option) { unsafe { let start = start.as_ref().map(|s| s.as_ref()); let end = end.as_ref().map(|e| e.as_ref()); ffi::rocksdb_compact_range( self.inner, opt_bytes_to_ptr(start), start.map_or(0, |s| s.len()) as size_t, opt_bytes_to_ptr(end), end.map_or(0, |e| e.len()) as size_t, ); } } pub fn compact_range_cf, E: AsRef<[u8]>>( &self, cf: &ColumnFamily, start: Option, end: Option, ) { unsafe { let start = start.as_ref().map(|s| s.as_ref()); let end = end.as_ref().map(|e| e.as_ref()); ffi::rocksdb_compact_range_cf( self.inner, cf.inner, opt_bytes_to_ptr(start), start.map_or(0, |s| s.len()) as size_t, opt_bytes_to_ptr(end), end.map_or(0, |e| e.len()) as size_t, ); } } pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), Error> { let copts = opts .iter() .map(|(name, value)| { let cname = match CString::new(name.as_bytes()) { Ok(cname) => cname, Err(e) => return Err(Error::new(format!("Invalid option name `{}`", e))), }; let cvalue = match CString::new(value.as_bytes()) { Ok(cvalue) => cvalue, Err(e) => return Err(Error::new(format!("Invalid option value: `{}`", e))), }; Ok((cname, cvalue)) }) .collect::, Error>>()?; let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect(); let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect(); let count = opts.len() as i32; unsafe { ffi_try!(ffi::rocksdb_set_options( self.inner, count, cnames.as_ptr(), cvalues.as_ptr(), )); } Ok(()) } /// Retrieves a RocksDB property by name. /// /// For a full list of properties, see /// https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L428-L634 pub fn property_value(&self, name: &str) -> Result, Error> { let prop_name = match CString::new(name) { Ok(c) => c, Err(e) => { return Err(Error::new(format!( "Failed to convert property name to CString: {}", e ))); } }; unsafe { let value = ffi::rocksdb_property_value(self.inner, prop_name.as_ptr()); if value.is_null() { return Ok(None); } let str_value = match CStr::from_ptr(value).to_str() { Ok(s) => s.to_owned(), Err(e) => { return Err(Error::new(format!( "Failed to convert property value to string: {}", e ))); } }; libc::free(value as *mut c_void); Ok(Some(str_value)) } } /// Retrieves a RocksDB property by name, for a specific column family. /// /// For a full list of properties, see /// https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L428-L634 pub fn property_value_cf( &self, cf: &ColumnFamily, name: &str, ) -> Result, Error> { let prop_name = match CString::new(name) { Ok(c) => c, Err(e) => { return Err(Error::new(format!( "Failed to convert property name to CString: {}", e ))); } }; unsafe { let value = ffi::rocksdb_property_value_cf(self.inner, cf.inner, prop_name.as_ptr()); if value.is_null() { return Ok(None); } let str_value = match CStr::from_ptr(value).to_str() { Ok(s) => s.to_owned(), Err(e) => { return Err(Error::new(format!( "Failed to convert property value to string: {}", e ))); } }; libc::free(value as *mut c_void); Ok(Some(str_value)) } } /// Retrieves a RocksDB property and casts it to an integer. /// /// For a full list of properties that return int values, see /// https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689 pub fn property_int_value(&self, name: &str) -> Result, Error> { match self.property_value(name) { Ok(Some(value)) => match value.parse::() { Ok(int_value) => Ok(Some(int_value)), Err(e) => Err(Error::new(format!( "Failed to convert property value to int: {}", e ))), }, Ok(None) => Ok(None), Err(e) => Err(e), } } /// Retrieves a RocksDB property for a specific column family and casts it to an integer. /// /// For a full list of properties that return int values, see /// https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689 pub fn property_int_value_cf( &self, cf: &ColumnFamily, name: &str, ) -> Result, Error> { match self.property_value_cf(cf, name) { Ok(Some(value)) => match value.parse::() { Ok(int_value) => Ok(Some(int_value)), Err(e) => Err(Error::new(format!( "Failed to convert property value to int: {}", e ))), }, Ok(None) => Ok(None), Err(e) => Err(e), } } /// The sequence number of the most recent transaction. pub fn latest_sequence_number(&self) -> u64 { unsafe { ffi::rocksdb_get_latest_sequence_number(self.inner) } } /// Iterate over batches of write operations since a given sequence. /// /// Produce an iterator that will provide the batches of write operations /// that have occurred since the given sequence (see /// `latest_sequence_number()`). Use the provided iterator to retrieve each /// (`u64`, `WriteBatch`) tuple, and then gather the individual puts and /// deletes using the `WriteBatch::iterate()` function. /// /// Calling `get_updates_since()` with a sequence number that is out of /// bounds will return an error. pub fn get_updates_since(&self, seq_number: u64) -> Result { unsafe { // rocksdb_wal_readoptions_t does not appear to have any functions // for creating and destroying it; fortunately we can pass a nullptr // here to get the default behavior let opts: *const ffi::rocksdb_wal_readoptions_t = ptr::null(); let iter = ffi_try!(ffi::rocksdb_get_updates_since(self.inner, seq_number, opts,)); Ok(DBWALIterator { inner: iter }) } } } /// Receives the puts and deletes of a write batch. /// /// The application must provide an implementation of this trait when /// iterating the operations within a `WriteBatch` pub trait WriteBatchIterator { /// Called with a key and value that were `put` into the batch. fn put(&mut self, key: Box<[u8]>, value: Box<[u8]>); /// Called with a key that was `delete`d from the batch. fn delete(&mut self, key: Box<[u8]>); } unsafe extern "C" fn writebatch_put_callback( state: *mut c_void, k: *const c_char, klen: usize, v: *const c_char, vlen: usize, ) { // coerce the raw pointer back into a box, but "leak" it so we prevent // freeing the resource before we are done with it let boxed_cb = Box::from_raw(state as *mut &mut dyn WriteBatchIterator); let leaked_cb = Box::leak(boxed_cb); let key = slice::from_raw_parts(k as *const u8, klen as usize); let value = slice::from_raw_parts(v as *const u8, vlen as usize); leaked_cb.put( key.to_vec().into_boxed_slice(), value.to_vec().into_boxed_slice(), ); } unsafe extern "C" fn writebatch_delete_callback(state: *mut c_void, k: *const c_char, klen: usize) { // coerce the raw pointer back into a box, but "leak" it so we prevent // freeing the resource before we are done with it let boxed_cb = Box::from_raw(state as *mut &mut dyn WriteBatchIterator); let leaked_cb = Box::leak(boxed_cb); let key = slice::from_raw_parts(k as *const u8, klen as usize); leaked_cb.delete(key.to_vec().into_boxed_slice()); } impl WriteBatch { pub fn len(&self) -> usize { unsafe { ffi::rocksdb_writebatch_count(self.inner) as usize } } /// Return WriteBatch serialized size (in bytes). pub fn size_in_bytes(&self) -> usize { unsafe { let mut batch_size: size_t = 0; ffi::rocksdb_writebatch_data(self.inner, &mut batch_size); batch_size as usize } } pub fn is_empty(&self) -> bool { self.len() == 0 } /// Iterate the put and delete operations within this write batch. Note that /// this does _not_ return an `Iterator` but instead will invoke the `put()` /// and `delete()` member functions of the provided `WriteBatchIterator` /// trait implementation. pub fn iterate(&self, callbacks: &mut dyn WriteBatchIterator) { let state = Box::into_raw(Box::new(callbacks)); unsafe { ffi::rocksdb_writebatch_iterate( self.inner, state as *mut c_void, Some(writebatch_put_callback), Some(writebatch_delete_callback), ); // we must manually set the raw box free since there is no // associated "destroy" callback for this object Box::from_raw(state); } } /// Insert a value into the database under the given key. pub fn put(&mut self, key: K, value: V) -> Result<(), Error> where K: AsRef<[u8]>, V: AsRef<[u8]>, { let key = key.as_ref(); let value = value.as_ref(); unsafe { ffi::rocksdb_writebatch_put( self.inner, key.as_ptr() as *const c_char, key.len() as size_t, value.as_ptr() as *const c_char, value.len() as size_t, ); Ok(()) } } pub fn put_cf(&mut self, cf: &ColumnFamily, key: K, value: V) -> Result<(), Error> where K: AsRef<[u8]>, V: AsRef<[u8]>, { let key = key.as_ref(); let value = value.as_ref(); unsafe { ffi::rocksdb_writebatch_put_cf( self.inner, cf.inner, key.as_ptr() as *const c_char, key.len() as size_t, value.as_ptr() as *const c_char, value.len() as size_t, ); Ok(()) } } pub fn merge(&mut self, key: K, value: V) -> Result<(), Error> where K: AsRef<[u8]>, V: AsRef<[u8]>, { let key = key.as_ref(); let value = value.as_ref(); unsafe { ffi::rocksdb_writebatch_merge( self.inner, key.as_ptr() as *const c_char, key.len() as size_t, value.as_ptr() as *const c_char, value.len() as size_t, ); Ok(()) } } pub fn merge_cf(&mut self, cf: &ColumnFamily, key: K, value: V) -> Result<(), Error> where K: AsRef<[u8]>, V: AsRef<[u8]>, { let key = key.as_ref(); let value = value.as_ref(); unsafe { ffi::rocksdb_writebatch_merge_cf( self.inner, cf.inner, key.as_ptr() as *const c_char, key.len() as size_t, value.as_ptr() as *const c_char, value.len() as size_t, ); Ok(()) } } /// Remove the database entry for key. /// /// Returns an error if the key was not found. pub fn delete>(&mut self, key: K) -> Result<(), Error> { let key = key.as_ref(); unsafe { ffi::rocksdb_writebatch_delete( self.inner, key.as_ptr() as *const c_char, key.len() as size_t, ); Ok(()) } } pub fn delete_cf>(&mut self, cf: &ColumnFamily, key: K) -> Result<(), Error> { let key = key.as_ref(); unsafe { ffi::rocksdb_writebatch_delete_cf( self.inner, cf.inner, key.as_ptr() as *const c_char, key.len() as size_t, ); Ok(()) } } /// Remove database entries from start key to end key. /// /// Removes the database entries in the range ["begin_key", "end_key"), i.e., /// including "begin_key" and excluding "end_key". It is not an error if no /// keys exist in the range ["begin_key", "end_key"). pub fn delete_range>(&mut self, from: K, to: K) -> Result<(), Error> { let (start_key, end_key) = (from.as_ref(), to.as_ref()); unsafe { ffi::rocksdb_writebatch_delete_range( self.inner, start_key.as_ptr() as *const c_char, start_key.len() as size_t, end_key.as_ptr() as *const c_char, end_key.len() as size_t, ); Ok(()) } } /// Remove database entries in column family from start key to end key. /// /// Removes the database entries in the range ["begin_key", "end_key"), i.e., /// including "begin_key" and excluding "end_key". It is not an error if no /// keys exist in the range ["begin_key", "end_key"). pub fn delete_range_cf>( &mut self, cf: &ColumnFamily, from: K, to: K, ) -> Result<(), Error> { let (start_key, end_key) = (from.as_ref(), to.as_ref()); unsafe { ffi::rocksdb_writebatch_delete_range_cf( self.inner, cf.inner, start_key.as_ptr() as *const c_char, start_key.len() as size_t, end_key.as_ptr() as *const c_char, end_key.len() as size_t, ); Ok(()) } } /// Clear all updates buffered in this batch. pub fn clear(&mut self) -> Result<(), Error> { unsafe { ffi::rocksdb_writebatch_clear(self.inner); } Ok(()) } } impl Default for WriteBatch { fn default() -> WriteBatch { WriteBatch { inner: unsafe { ffi::rocksdb_writebatch_create() }, } } } impl Drop for WriteBatch { fn drop(&mut self) { unsafe { ffi::rocksdb_writebatch_destroy(self.inner) } } } impl Drop for DB { fn drop(&mut self) { unsafe { for cf in self.cfs.values() { ffi::rocksdb_column_family_handle_destroy(cf.inner); } ffi::rocksdb_close(self.inner); } } } impl fmt::Debug for DB { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "RocksDB {{ path: {:?} }}", self.path()) } } impl Drop for ReadOptions { fn drop(&mut self) { unsafe { ffi::rocksdb_readoptions_destroy(self.inner) } } } impl ReadOptions { // TODO add snapshot setting here // TODO add snapshot wrapper structs with proper destructors; // that struct needs an "iterator" impl too. #[allow(dead_code)] fn fill_cache(&mut self, v: bool) { unsafe { ffi::rocksdb_readoptions_set_fill_cache(self.inner, v as c_uchar); } } fn set_snapshot(&mut self, snapshot: &Snapshot) { unsafe { ffi::rocksdb_readoptions_set_snapshot(self.inner, snapshot.inner); } } /// Set the upper bound for an iterator. /// The upper bound itself is not included on the iteration result. /// /// # Safety /// /// This function will store a clone of key and will give a raw pointer of it to the /// underlying C++ API, therefore, when given to any other [`DB`] method you must ensure /// that this [`ReadOptions`] value does not leave the scope too early (e.g. `DB::iterator_cf_opt`). pub unsafe fn set_iterate_upper_bound>(&mut self, key: K) { let key = key.as_ref(); ffi::rocksdb_readoptions_set_iterate_upper_bound( self.inner, key.as_ptr() as *const c_char, key.len() as size_t, ); } pub fn set_prefix_same_as_start(&mut self, v: bool) { unsafe { ffi::rocksdb_readoptions_set_prefix_same_as_start(self.inner, v as c_uchar) } } pub fn set_total_order_seek(&mut self, v: bool) { unsafe { ffi::rocksdb_readoptions_set_total_order_seek(self.inner, v as c_uchar) } } /// If true, all data read from underlying storage will be /// verified against corresponding checksums. /// /// Default: true pub fn set_verify_checksums(&mut self, v: bool) { unsafe { ffi::rocksdb_readoptions_set_verify_checksums(self.inner, v as c_uchar); } } /// If non-zero, an iterator will create a new table reader which /// performs reads of the given size. Using a large size (> 2MB) can /// improve the performance of forward iteration on spinning disks. /// Default: 0 /// /// ``` /// use rocksdb::{ReadOptions}; /// /// let mut opts = ReadOptions::default(); /// opts.set_readahead_size(4_194_304); // 4mb /// ``` pub fn set_readahead_size(&mut self, v: usize) { unsafe { ffi::rocksdb_readoptions_set_readahead_size(self.inner, v as size_t); } } /// If true, create a tailing iterator. Note that tailing iterators /// only support moving in the forward direction. Iterating in reverse /// or seek_to_last are not supported. pub fn set_tailing(&mut self, v: bool) { unsafe { ffi::rocksdb_readoptions_set_tailing(self.inner, v as c_uchar); } } } impl Default for ReadOptions { fn default() -> ReadOptions { unsafe { ReadOptions { inner: ffi::rocksdb_readoptions_create(), } } } } // Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI // pointer. In most cases, however, this pointer is Send-safe because it is never aliased and // rocksdb internally does not rely on thread-local information for its user-exposed types. unsafe impl<'a> Send for DBRawIterator<'a> {} unsafe impl Send for ReadOptions {} // Sync is similarly safe for many types because they do not expose interior mutability, and their // use within the rocksdb library is generally behind a const reference unsafe impl<'a> Sync for DBRawIterator<'a> {} unsafe impl Sync for ReadOptions {} fn to_cpath>(path: P) -> Result { match CString::new(path.as_ref().to_string_lossy().as_bytes()) { Ok(c) => Ok(c), Err(_) => Err(Error::new( "Failed to convert path to CString when opening DB.".to_owned(), )), } } /// Wrapper around RocksDB PinnableSlice struct. /// /// With a pinnable slice, we can directly leverage in-memory data within /// RocksDB toa void unnecessary memory copies. The struct here wraps the /// returned raw pointer and ensures proper finalization work. pub struct DBPinnableSlice<'a> { ptr: *mut ffi::rocksdb_pinnableslice_t, db: PhantomData<&'a DB>, } // Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI // pointer. In most cases, however, this pointer is Send-safe because it is never aliased and // rocksdb internally does not rely on thread-local information for its user-exposed types. unsafe impl<'a> Send for DBPinnableSlice<'a> {} // Sync is similarly safe for many types because they do not expose interior mutability, and their // use within the rocksdb library is generally behind a const reference unsafe impl<'a> Sync for DBPinnableSlice<'a> {} impl<'a> AsRef<[u8]> for DBPinnableSlice<'a> { fn as_ref(&self) -> &[u8] { // Implement this via Deref so as not to repeat ourselves &*self } } impl<'a> Deref for DBPinnableSlice<'a> { type Target = [u8]; fn deref(&self) -> &[u8] { unsafe { let mut val_len: size_t = 0; let val = ffi::rocksdb_pinnableslice_value(self.ptr, &mut val_len) as *mut u8; slice::from_raw_parts(val, val_len) } } } impl<'a> Drop for DBPinnableSlice<'a> { fn drop(&mut self) { unsafe { ffi::rocksdb_pinnableslice_destroy(self.ptr); } } } impl<'a> DBPinnableSlice<'a> { /// Used to wrap a PinnableSlice from rocksdb to avoid unnecessary memcpy /// /// # Unsafe /// Requires that the pointer must be generated by rocksdb_get_pinned unsafe fn from_c(ptr: *mut ffi::rocksdb_pinnableslice_t) -> DBPinnableSlice<'a> { DBPinnableSlice { ptr, db: PhantomData, } } } #[test] fn external() { let path = "_rust_rocksdb_externaltest"; { let db = DB::open_default(path).unwrap(); let p = db.put(b"k1", b"v1111"); assert!(p.is_ok()); let r: Result>, Error> = db.get(b"k1"); assert_eq!(r.unwrap().unwrap(), b"v1111"); assert!(db.delete(b"k1").is_ok()); assert!(db.get(b"k1").unwrap().is_none()); } let opts = Options::default(); let result = DB::destroy(&opts, path); assert!(result.is_ok()); } #[test] fn errors_do_stuff() { let path = "_rust_rocksdb_error"; { let _db = DB::open_default(path).unwrap(); let opts = Options::default(); // The DB will still be open when we try to destroy it and the lock should fail. match DB::destroy(&opts, path) { Err(s) => { let message = s.to_string(); assert!(message.find("IO error:").is_some()); assert!(message.find("_rust_rocksdb_error/LOCK:").is_some()); } Ok(_) => panic!("should fail"), } } let opts = Options::default(); let result = DB::destroy(&opts, path); assert!(result.is_ok()); } #[test] fn writebatch_works() { let path = "_rust_rocksdb_writebacktest"; { let db = DB::open_default(path).unwrap(); { // test putx let mut batch = WriteBatch::default(); assert!(db.get(b"k1").unwrap().is_none()); assert_eq!(batch.len(), 0); assert!(batch.is_empty()); let _ = batch.put(b"k1", b"v1111"); let _ = batch.put(b"k2", b"v2222"); let _ = batch.put(b"k3", b"v3333"); assert_eq!(batch.len(), 3); assert!(!batch.is_empty()); assert!(db.get(b"k1").unwrap().is_none()); let p = db.write(batch); assert!(p.is_ok()); let r: Result>, Error> = db.get(b"k1"); assert_eq!(r.unwrap().unwrap(), b"v1111"); } { // test delete let mut batch = WriteBatch::default(); let _ = batch.delete(b"k1"); assert_eq!(batch.len(), 1); assert!(!batch.is_empty()); let p = db.write(batch); assert!(p.is_ok()); assert!(db.get(b"k1").unwrap().is_none()); } { // test delete_range let mut batch = WriteBatch::default(); let _ = batch.delete_range(b"k2", b"k4"); assert_eq!(batch.len(), 1); assert!(!batch.is_empty()); let p = db.write(batch); assert!(p.is_ok()); assert!(db.get(b"k2").unwrap().is_none()); assert!(db.get(b"k3").unwrap().is_none()); } { // test size_in_bytes let mut batch = WriteBatch::default(); let before = batch.size_in_bytes(); let _ = batch.put(b"k1", b"v1234567890"); let after = batch.size_in_bytes(); assert!(before + 10 <= after); } } let opts = Options::default(); assert!(DB::destroy(&opts, path).is_ok()); } #[test] fn iterator_test() { let path = "_rust_rocksdb_iteratortest"; { let db = DB::open_default(path).unwrap(); let p = db.put(b"k1", b"v1111"); assert!(p.is_ok()); let p = db.put(b"k2", b"v2222"); assert!(p.is_ok()); let p = db.put(b"k3", b"v3333"); assert!(p.is_ok()); let iter = db.iterator(IteratorMode::Start); for (k, v) in iter { println!( "Hello {}: {}", str::from_utf8(&*k).unwrap(), str::from_utf8(&*v).unwrap() ); } } let opts = Options::default(); assert!(DB::destroy(&opts, path).is_ok()); } #[test] fn iterator_test_tailing() { let path = "_rust_rocksdb_iteratortest_tailing"; { let data = [(b"k1", b"v1"), (b"k2", b"v2"), (b"k3", b"v3")]; let mut ro = ReadOptions::default(); ro.set_tailing(true); let db = DB::open_default(path).unwrap(); let mut data_iter = data.iter(); let (k, v) = data_iter.next().unwrap(); let r = db.put(k, v); assert!(r.is_ok()); let tail_iter = db.iterator_opt(IteratorMode::Start, &ro); for (k, v) in data_iter { let r = db.put(k, v); assert!(r.is_ok()); } let mut tot = 0; for (i, (k, v)) in tail_iter.enumerate() { assert_eq!( (k.to_vec(), v.to_vec()), (data[i].0.to_vec(), data[i].1.to_vec()) ); tot = tot + 1; } assert_eq!(tot, data.len()); } let opts = Options::default(); assert!(DB::destroy(&opts, path).is_ok()); } #[test] fn snapshot_test() { let path = "_rust_rocksdb_snapshottest"; { let db = DB::open_default(path).unwrap(); let p = db.put(b"k1", b"v1111"); assert!(p.is_ok()); let snap = db.snapshot(); let r: Result>, Error> = snap.get(b"k1"); assert_eq!(r.unwrap().unwrap(), b"v1111"); let p = db.put(b"k2", b"v2222"); assert!(p.is_ok()); assert!(db.get(b"k2").unwrap().is_some()); assert!(snap.get(b"k2").unwrap().is_none()); } let opts = Options::default(); assert!(DB::destroy(&opts, path).is_ok()); } #[test] fn set_option_test() { let path = "_rust_rocksdb_set_optionstest"; { let db = DB::open_default(path).unwrap(); // set an option to valid values assert!(db .set_options(&[("disable_auto_compactions", "true")]) .is_ok()); assert!(db .set_options(&[("disable_auto_compactions", "false")]) .is_ok()); // invalid names/values should result in an error assert!(db .set_options(&[("disable_auto_compactions", "INVALID_VALUE")]) .is_err()); assert!(db .set_options(&[("INVALID_NAME", "INVALID_VALUE")]) .is_err()); // option names/values must not contain NULLs assert!(db .set_options(&[("disable_auto_compactions", "true\0")]) .is_err()); assert!(db .set_options(&[("disable_auto_compactions\0", "true")]) .is_err()); // empty options are not allowed assert!(db.set_options(&[]).is_err()); // multiple options can be set in a single API call let multiple_options = [ ("paranoid_file_checks", "true"), ("report_bg_io_stats", "true"), ]; db.set_options(&multiple_options).unwrap(); } assert!(DB::destroy(&Options::default(), path).is_ok()); }