Remove need for &mut self in create_cf and drop_cf (v2) (#506)

master
Ryo Onodera 4 years ago committed by GitHub
parent bc59596da1
commit 0b700fe70d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      .github/workflows/rust.yml
  2. 1
      Cargo.toml
  3. 9
      README.md
  4. 4
      src/backup.rs
  5. 40
      src/column_family.rs
  6. 493
      src/db.rs
  7. 75
      src/db_iterator.rs
  8. 5
      src/db_options.rs
  9. 25
      src/lib.rs
  10. 75
      src/snapshot.rs
  11. 18
      src/write_batch.rs
  12. 10
      tests/fail/open_with_multiple_refs_as_single_threaded.rs
  13. 17
      tests/fail/open_with_multiple_refs_as_single_threaded.stderr
  14. 37
      tests/test_db.rs

@ -88,7 +88,14 @@ jobs:
with:
command: test
args: --manifest-path=librocksdb-sys/Cargo.toml
- name: Run rocksdb tests
- name: Run rocksdb tests (single-threaded cf)
uses: actions-rs/cargo@v1
with:
command: test
- name: Run rocksdb tests (multi-threaded cf)
uses: actions-rs/cargo@v1
env:
RUSTFLAGS: -Awarnings # Suppress "variable does not need to be mutable" warnings
with:
command: test
args: --features multi-threaded-cf

@ -24,6 +24,7 @@ lz4 = ["librocksdb-sys/lz4"]
zstd = ["librocksdb-sys/zstd"]
zlib = ["librocksdb-sys/zlib"]
bzip2 = ["librocksdb-sys/bzip2"]
multi-threaded-cf = []
[dependencies]
libc = "0.2"

@ -42,3 +42,12 @@ compression support, make these changes to your Cargo.toml:
default-features = false
features = ["lz4"]
```
## Multi-threaded ColumnFamily alternation
The underlying RocksDB does allow column families to be created and dropped
from multiple threads concurrently. But this crate doesn't allow it by default
for compatibility. If you need to modify column families concurrently, enable
crate feature called `multi-threaded-cf`, which makes this binding's
data structures to use RwLock by default. Alternatively, you can directly create
`DBWithThreadMode<MultiThreaded>` without enabling the crate feature.

@ -235,7 +235,7 @@ impl Default for BackupEngineOptions {
unsafe {
let opts = ffi::rocksdb_options_create();
if opts.is_null() {
panic!("Could not create RocksDB backup options".to_owned());
panic!("Could not create RocksDB backup options");
}
BackupEngineOptions { inner: opts }
}
@ -247,7 +247,7 @@ impl Default for RestoreOptions {
unsafe {
let opts = ffi::rocksdb_restore_options_create();
if opts.is_null() {
panic!("Could not create RocksDB restore options".to_owned());
panic!("Could not create RocksDB restore options");
}
RestoreOptions { inner: opts }
}

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{ffi, Options};
use crate::{db::MultiThreaded, ffi, Options};
/// The name of the default column family.
///
@ -47,4 +47,42 @@ pub struct ColumnFamily {
pub(crate) inner: *mut ffi::rocksdb_column_family_handle_t,
}
/// A specialized opaque type used to represent a column family by the [`MultiThreaded`]
/// mode. Clone (and Copy) is derived to behave like `&ColumnFamily` (this is used for
/// single-threaded mode). `Clone`/`Copy` is safe because this lifetime is bound to DB like
/// iterators/snapshots. On top of it, this is as cheap and small as `&ColumnFamily` because
/// this only has a single pointer-wide field.
#[derive(Clone, Copy)]
pub struct BoundColumnFamily<'a> {
pub(crate) inner: *mut ffi::rocksdb_column_family_handle_t,
pub(crate) multi_threaded_cfs: std::marker::PhantomData<&'a MultiThreaded>,
}
/// Handy type alias to hide actual type difference to reference [`ColumnFamily`]
/// depending on the `multi-threaded-cf` crate feature.
#[cfg(not(feature = "multi-threaded-cf"))]
pub type ColumnFamilyRef<'a> = &'a ColumnFamily;
#[cfg(feature = "multi-threaded-cf")]
pub type ColumnFamilyRef<'a> = BoundColumnFamily<'a>;
/// Utility trait to accept both supported references to `ColumnFamily`
/// (`&ColumnFamily` and `BoundColumnFamily`)
pub trait AsColumnFamilyRef {
fn inner(&self) -> *mut ffi::rocksdb_column_family_handle_t;
}
impl<'a> AsColumnFamilyRef for &'a ColumnFamily {
fn inner(&self) -> *mut ffi::rocksdb_column_family_handle_t {
self.inner
}
}
impl<'a> AsColumnFamilyRef for BoundColumnFamily<'a> {
fn inner(&self) -> *mut ffi::rocksdb_column_family_handle_t {
self.inner
}
}
unsafe impl Send for ColumnFamily {}
unsafe impl<'a> Send for BoundColumnFamily<'a> {}

@ -14,12 +14,14 @@
//
use crate::{
column_family::AsColumnFamilyRef,
column_family::BoundColumnFamily,
ffi,
ffi_util::{from_cstr, opt_bytes_to_ptr, raw_data, to_cpath},
ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIterator, DBPinnableSlice,
DBRawIterator, DBWALIterator, Direction, Error, FlushOptions, IngestExternalFileOptions,
IteratorMode, Options, ReadOptions, Snapshot, WriteBatch, WriteOptions,
DEFAULT_COLUMN_FAMILY_NAME,
ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIteratorWithThreadMode,
DBPinnableSlice, DBRawIteratorWithThreadMode, DBWALIterator, Direction, Error, FlushOptions,
IngestExternalFileOptions, IteratorMode, Options, ReadOptions, SnapshotWithThreadMode,
WriteBatch, WriteOptions, DEFAULT_COLUMN_FAMILY_NAME,
};
use libc::{self, c_char, c_int, c_uchar, c_void, size_t};
@ -27,30 +29,159 @@ use std::collections::BTreeMap;
use std::ffi::{CStr, CString};
use std::fmt;
use std::fs;
use std::marker::PhantomData;
use std::path::Path;
use std::path::PathBuf;
use std::ptr;
use std::slice;
use std::str;
use std::sync::RwLock;
use std::time::Duration;
// Marker trait to specify single or multi threaded column family alternations for DB
// Also, this is minimum common API sharable between SingleThreaded and
// MultiThreaded. Others differ in self mutability and return type.
pub trait ThreadMode {
fn new(cf_map: BTreeMap<String, ColumnFamily>) -> Self;
fn cf_drop_all(&mut self);
}
/// Actual marker type for the internal marker trait `ThreadMode`, which holds
/// a collection of column families without synchronization primitive, providing
/// no overhead for the single-threaded column family alternations. The other
/// mode is [`MultiThreaded`].
///
/// See [`DB`] for more details, including performance implications for each mode
pub struct SingleThreaded {
cfs: BTreeMap<String, ColumnFamily>,
}
/// Actual marker type for the internal marker trait `ThreadMode`, which holds
/// a collection of column families wrapped in a RwLock to be mutated
/// concurrently. The other mode is [`SingleThreaded`].
///
/// See [`DB`] for more details, including performance implications for each mode
pub struct MultiThreaded {
cfs: RwLock<BTreeMap<String, ColumnFamily>>,
}
impl ThreadMode for SingleThreaded {
fn new(cfs: BTreeMap<String, ColumnFamily>) -> Self {
Self { cfs }
}
fn cf_drop_all(&mut self) {
for cf in self.cfs.values() {
unsafe {
ffi::rocksdb_column_family_handle_destroy(cf.inner);
}
}
}
}
impl ThreadMode for MultiThreaded {
fn new(cfs: BTreeMap<String, ColumnFamily>) -> Self {
Self {
cfs: RwLock::new(cfs),
}
}
fn cf_drop_all(&mut self) {
for cf in self.cfs.read().unwrap().values() {
unsafe {
ffi::rocksdb_column_family_handle_destroy(cf.inner);
}
}
}
}
/// A RocksDB database.
///
/// See crate level documentation for a simple usage example.
pub struct DB {
pub struct DBWithThreadMode<T: ThreadMode> {
pub(crate) inner: *mut ffi::rocksdb_t,
cfs: BTreeMap<String, ColumnFamily>,
cfs: T, // Column families are held differently depending on thread mode
path: PathBuf,
}
/// Minimal set of DB-related methods, intended to be generic over
/// `DBWithThreadMode<T>`. Mainly used internally
pub trait DBAccess {
fn inner(&self) -> *mut ffi::rocksdb_t;
fn get_opt<K: AsRef<[u8]>>(
&self,
key: K,
readopts: &ReadOptions,
) -> Result<Option<Vec<u8>>, Error>;
fn get_cf_opt<K: AsRef<[u8]>>(
&self,
cf: impl AsColumnFamilyRef,
key: K,
readopts: &ReadOptions,
) -> Result<Option<Vec<u8>>, Error>;
}
impl<T: ThreadMode> DBAccess for DBWithThreadMode<T> {
fn inner(&self) -> *mut ffi::rocksdb_t {
self.inner
}
fn get_opt<K: AsRef<[u8]>>(
&self,
key: K,
readopts: &ReadOptions,
) -> Result<Option<Vec<u8>>, Error> {
self.get_opt(key, readopts)
}
fn get_cf_opt<K: AsRef<[u8]>>(
&self,
cf: impl AsColumnFamilyRef,
key: K,
readopts: &ReadOptions,
) -> Result<Option<Vec<u8>>, Error> {
self.get_cf_opt(cf, key, readopts)
}
}
/// A type alias to DB instance type with the single-threaded column family
/// creations/deletions
///
/// # Compatibility and multi-threaded mode
///
/// Previously, `DB` was defined as a direct struct. Now, it's type-aliased for
/// compatibility. Use `DBWithThreadMode<MultiThreaded>` for multi-threaded
/// column family alternations.
///
/// # Limited performance implication for single-threaded mode
///
/// Even with [`SingleThreaded`], almost all of RocksDB operations is
/// multi-threaded unless the underlying RocksDB instance is
/// specifically configured otherwise. `SingleThreaded` only forces
/// serialization of column family alternations by requring `&mut self` of DB
/// instance due to its wrapper implementation details.
///
/// # Multi-threaded mode
///
/// [`MultiThreaded`] can be appropriate for the situation of multi-threaded
/// workload including multi-threaded column family alternations, costing the
/// RwLock overhead inside `DB`.
#[cfg(not(feature = "multi-threaded-cf"))]
pub type DB = DBWithThreadMode<SingleThreaded>;
#[cfg(feature = "multi-threaded-cf")]
pub type DB = DBWithThreadMode<MultiThreaded>;
// Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI
// pointer. In most cases, however, this pointer is Send-safe because it is never aliased and
// rocksdb internally does not rely on thread-local information for its user-exposed types.
unsafe impl Send for DB {}
unsafe impl<T: ThreadMode> Send for DBWithThreadMode<T> {}
// Sync is similarly safe for many types because they do not expose interior mutability, and their
// use within the rocksdb library is generally behind a const reference
unsafe impl Sync for DB {}
unsafe impl<T: ThreadMode> Sync for DBWithThreadMode<T> {}
// Specifies whether open DB for read only.
enum AccessType<'a> {
@ -60,17 +191,17 @@ enum AccessType<'a> {
WithTTL { ttl: Duration },
}
impl DB {
impl<T: ThreadMode> DBWithThreadMode<T> {
/// Opens a database with default options.
pub fn open_default<P: AsRef<Path>>(path: P) -> Result<DB, Error> {
pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
let mut opts = Options::default();
opts.create_if_missing(true);
DB::open(&opts, path)
Self::open(&opts, path)
}
/// Opens the database with the specified options.
pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<DB, Error> {
DB::open_cf(opts, path, None::<&str>)
pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Self, Error> {
Self::open_cf(opts, path, None::<&str>)
}
/// Opens the database for read only with the specified options.
@ -78,8 +209,8 @@ impl DB {
opts: &Options,
path: P,
error_if_log_file_exist: bool,
) -> Result<DB, Error> {
DB::open_cf_for_read_only(opts, path, None::<&str>, error_if_log_file_exist)
) -> Result<Self, Error> {
Self::open_cf_for_read_only(opts, path, None::<&str>, error_if_log_file_exist)
}
/// Opens the database as a secondary.
@ -87,8 +218,8 @@ impl DB {
opts: &Options,
primary_path: P,
secondary_path: P,
) -> Result<DB, Error> {
DB::open_cf_as_secondary(opts, primary_path, secondary_path, None::<&str>)
) -> Result<Self, Error> {
Self::open_cf_as_secondary(opts, primary_path, secondary_path, None::<&str>)
}
/// Opens the database with a Time to Live compaction filter.
@ -96,16 +227,16 @@ impl DB {
opts: &Options,
path: P,
ttl: Duration,
) -> Result<DB, Error> {
) -> Result<Self, Error> {
let c_path = to_cpath(&path)?;
let db = DB::open_raw(opts, &c_path, &AccessType::WithTTL { ttl })?;
let db = Self::open_raw(opts, &c_path, &AccessType::WithTTL { ttl })?;
if db.is_null() {
return Err(Error::new("Could not initialize database.".to_owned()));
}
Ok(DB {
Ok(Self {
inner: db,
cfs: BTreeMap::new(),
cfs: T::new(BTreeMap::new()),
path: path.as_ref().to_path_buf(),
})
}
@ -113,7 +244,7 @@ impl DB {
/// Opens a database with the given database options and column family names.
///
/// Column families opened using this function will be created with default `Options`.
pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<DB, Error>
pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
where
P: AsRef<Path>,
I: IntoIterator<Item = N>,
@ -123,7 +254,7 @@ impl DB {
.into_iter()
.map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
DB::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
}
/// Opens a database for read only with the given database options and column family names.
@ -132,7 +263,7 @@ impl DB {
path: P,
cfs: I,
error_if_log_file_exist: bool,
) -> Result<DB, Error>
) -> Result<Self, Error>
where
P: AsRef<Path>,
I: IntoIterator<Item = N>,
@ -142,7 +273,7 @@ impl DB {
.into_iter()
.map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
DB::open_cf_descriptors_internal(
Self::open_cf_descriptors_internal(
opts,
path,
cfs,
@ -158,7 +289,7 @@ impl DB {
primary_path: P,
secondary_path: P,
cfs: I,
) -> Result<DB, Error>
) -> Result<Self, Error>
where
P: AsRef<Path>,
I: IntoIterator<Item = N>,
@ -168,7 +299,7 @@ impl DB {
.into_iter()
.map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
DB::open_cf_descriptors_internal(
Self::open_cf_descriptors_internal(
opts,
primary_path,
cfs,
@ -179,12 +310,12 @@ impl DB {
}
/// Opens a database with the given database options and column family descriptors.
pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<DB, Error>
pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
where
P: AsRef<Path>,
I: IntoIterator<Item = ColumnFamilyDescriptor>,
{
DB::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
}
/// Internal implementation for opening RocksDB.
@ -193,7 +324,7 @@ impl DB {
path: P,
cfs: I,
access_type: &AccessType,
) -> Result<DB, Error>
) -> Result<Self, Error>
where
P: AsRef<Path>,
I: IntoIterator<Item = ColumnFamilyDescriptor>,
@ -213,7 +344,7 @@ impl DB {
let mut cf_map = BTreeMap::new();
if cfs.is_empty() {
db = DB::open_raw(opts, &cpath, access_type)?;
db = Self::open_raw(opts, &cpath, access_type)?;
} else {
let mut cfs_v = cfs;
// Always open the default column family.
@ -240,7 +371,7 @@ impl DB {
.map(|cf| cf.options.inner as *const _)
.collect();
db = DB::open_cf_raw(
db = Self::open_cf_raw(
opts,
&cpath,
&cfs_v,
@ -266,10 +397,10 @@ impl DB {
return Err(Error::new("Could not initialize database.".to_owned()));
}
Ok(DB {
Ok(Self {
inner: db,
cfs: cf_map,
path: path.as_ref().to_path_buf(),
cfs: T::new(cf_map),
})
}
@ -408,16 +539,24 @@ impl DB {
}
/// Flushes database memtables to SST files on the disk for a given column family.
pub fn flush_cf_opt(&self, cf: &ColumnFamily, flushopts: &FlushOptions) -> Result<(), Error> {
pub fn flush_cf_opt(
&self,
cf: impl AsColumnFamilyRef,
flushopts: &FlushOptions,
) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_flush_cf(self.inner, flushopts.inner, cf.inner));
ffi_try!(ffi::rocksdb_flush_cf(
self.inner,
flushopts.inner,
cf.inner()
));
}
Ok(())
}
/// Flushes database memtables to SST files on the disk for a given column family using default
/// options.
pub fn flush_cf(&self, cf: &ColumnFamily) -> Result<(), Error> {
pub fn flush_cf(&self, cf: impl AsColumnFamilyRef) -> Result<(), Error> {
self.flush_cf_opt(cf, &FlushOptions::default())
}
@ -462,7 +601,7 @@ impl DB {
/// [`get_pinned_cf_opt`](#method.get_pinned_cf_opt) to avoid unnecessary memory.
pub fn get_cf_opt<K: AsRef<[u8]>>(
&self,
cf: &ColumnFamily,
cf: impl AsColumnFamilyRef,
key: K,
readopts: &ReadOptions,
) -> Result<Option<Vec<u8>>, Error> {
@ -475,7 +614,7 @@ impl DB {
/// [`get_pinned_cf`](#method.get_pinned_cf) to avoid unnecessary memory.
pub fn get_cf<K: AsRef<[u8]>>(
&self,
cf: &ColumnFamily,
cf: impl AsColumnFamilyRef,
key: K,
) -> Result<Option<Vec<u8>>, Error> {
self.get_cf_opt(cf, key.as_ref(), &ReadOptions::default())
@ -524,7 +663,7 @@ impl DB {
/// allows specifying ColumnFamily
pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
&self,
cf: &ColumnFamily,
cf: impl AsColumnFamilyRef,
key: K,
readopts: &ReadOptions,
) -> Result<Option<DBPinnableSlice>, Error> {
@ -541,7 +680,7 @@ impl DB {
let val = ffi_try!(ffi::rocksdb_get_pinned_cf(
self.inner,
readopts.inner,
cf.inner,
cf.inner(),
key.as_ptr() as *const c_char,
key.len() as size_t,
));
@ -558,7 +697,7 @@ impl DB {
/// leverages default options.
pub fn get_pinned_cf<K: AsRef<[u8]>>(
&self,
cf: &ColumnFamily,
cf: impl AsColumnFamilyRef,
key: K,
) -> Result<Option<DBPinnableSlice>, Error> {
self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
@ -607,23 +746,25 @@ impl DB {
}
/// Return the values associated with the given keys and column families.
pub fn multi_get_cf<'c, K, I>(&self, keys: I) -> Result<Vec<Vec<u8>>, Error>
pub fn multi_get_cf<K, I, W>(&self, keys: I) -> Result<Vec<Vec<u8>>, Error>
where
K: AsRef<[u8]>,
I: IntoIterator<Item = (&'c ColumnFamily, K)>,
I: IntoIterator<Item = (W, K)>,
W: AsColumnFamilyRef,
{
self.multi_get_cf_opt(keys, &ReadOptions::default())
}
/// Return the values associated with the given keys and column families using read options.
pub fn multi_get_cf_opt<'c, K, I>(
pub fn multi_get_cf_opt<K, I, W>(
&self,
keys: I,
readopts: &ReadOptions,
) -> Result<Vec<Vec<u8>>, Error>
where
K: AsRef<[u8]>,
I: IntoIterator<Item = (&'c ColumnFamily, K)>,
I: IntoIterator<Item = (W, K)>,
W: AsColumnFamilyRef,
{
let mut boxed_keys: Vec<Box<[u8]>> = Vec::new();
let mut keys_sizes = Vec::new();
@ -639,7 +780,7 @@ impl DB {
.collect();
let ptr_cfs: Vec<_> = column_families
.iter()
.map(|c| c.inner as *const _)
.map(|c| c.inner() as *const _)
.collect();
let mut values = vec![ptr::null_mut(); boxed_keys.len()];
@ -660,44 +801,31 @@ impl DB {
Ok(convert_values(values, values_sizes))
}
pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
let cf_name = if let Ok(c) = CString::new(name.as_ref().as_bytes()) {
fn create_inner_cf_handle(
&self,
name: &str,
opts: &Options,
) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
let cf_name = if let Ok(c) = CString::new(name.as_bytes()) {
c
} else {
return Err(Error::new(
"Failed to convert path to CString when creating cf".to_owned(),
));
};
unsafe {
let inner = ffi_try!(ffi::rocksdb_create_column_family(
Ok(unsafe {
ffi_try!(ffi::rocksdb_create_column_family(
self.inner,
opts.inner,
cf_name.as_ptr(),
));
self.cfs
.insert(name.as_ref().to_string(), ColumnFamily { inner });
};
Ok(())
}
pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
if let Some(cf) = self.cfs.remove(name) {
unsafe {
ffi_try!(ffi::rocksdb_drop_column_family(self.inner, cf.inner));
}
Ok(())
} else {
Err(Error::new(format!("Invalid column family: {}", name)))
}
}
/// Return the underlying column family handle.
pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
self.cfs.get(name)
))
})
}
pub fn iterator<'a: 'b, 'b>(&'a self, mode: IteratorMode) -> DBIterator<'b> {
pub fn iterator<'a: 'b, 'b>(
&'a self,
mode: IteratorMode,
) -> DBIteratorWithThreadMode<'b, Self> {
let readopts = ReadOptions::default();
self.iterator_opt(mode, readopts)
}
@ -706,34 +834,40 @@ impl DB {
&'a self,
mode: IteratorMode,
readopts: ReadOptions,
) -> DBIterator<'b> {
DBIterator::new(self, readopts, mode)
) -> DBIteratorWithThreadMode<'b, Self> {
DBIteratorWithThreadMode::new(self, readopts, mode)
}
/// Opens an iterator using the provided ReadOptions.
/// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions
pub fn iterator_cf_opt<'a: 'b, 'b>(
&'a self,
cf_handle: &ColumnFamily,
cf_handle: impl AsColumnFamilyRef,
readopts: ReadOptions,
mode: IteratorMode,
) -> DBIterator<'b> {
DBIterator::new_cf(self, cf_handle, readopts, mode)
) -> DBIteratorWithThreadMode<'b, Self> {
DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
}
/// Opens an iterator with `set_total_order_seek` enabled.
/// This must be used to iterate across prefixes when `set_memtable_factory` has been called
/// with a Hash-based implementation.
pub fn full_iterator<'a: 'b, 'b>(&'a self, mode: IteratorMode) -> DBIterator<'b> {
pub fn full_iterator<'a: 'b, 'b>(
&'a self,
mode: IteratorMode,
) -> DBIteratorWithThreadMode<'b, Self> {
let mut opts = ReadOptions::default();
opts.set_total_order_seek(true);
DBIterator::new(self, opts, mode)
DBIteratorWithThreadMode::new(self, opts, mode)
}
pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(&'a self, prefix: P) -> DBIterator<'b> {
pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
&'a self,
prefix: P,
) -> DBIteratorWithThreadMode<'b, Self> {
let mut opts = ReadOptions::default();
opts.set_prefix_same_as_start(true);
DBIterator::new(
DBIteratorWithThreadMode::new(
self,
opts,
IteratorMode::From(prefix.as_ref(), Direction::Forward),
@ -742,66 +876,72 @@ impl DB {
pub fn iterator_cf<'a: 'b, 'b>(
&'a self,
cf_handle: &ColumnFamily,
cf_handle: impl AsColumnFamilyRef,
mode: IteratorMode,
) -> DBIterator<'b> {
) -> DBIteratorWithThreadMode<'b, Self> {
let opts = ReadOptions::default();
DBIterator::new_cf(self, cf_handle, opts, mode)
DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
}
pub fn full_iterator_cf<'a: 'b, 'b>(
&'a self,
cf_handle: &ColumnFamily,
cf_handle: impl AsColumnFamilyRef,
mode: IteratorMode,
) -> DBIterator<'b> {
) -> DBIteratorWithThreadMode<'b, Self> {
let mut opts = ReadOptions::default();
opts.set_total_order_seek(true);
DBIterator::new_cf(self, cf_handle, opts, mode)
DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
}
pub fn prefix_iterator_cf<'a: 'b, 'b, P: AsRef<[u8]>>(
pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
&'a self,
cf_handle: &ColumnFamily,
cf_handle: impl AsColumnFamilyRef,
prefix: P,
) -> DBIterator<'b> {
) -> DBIteratorWithThreadMode<'a, Self> {
let mut opts = ReadOptions::default();
opts.set_prefix_same_as_start(true);
DBIterator::new_cf(
DBIteratorWithThreadMode::<'a, Self>::new_cf(
self,
cf_handle,
cf_handle.inner(),
opts,
IteratorMode::From(prefix.as_ref(), Direction::Forward),
)
}
/// Opens a raw iterator over the database, using the default read options
pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIterator<'b> {
pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
let opts = ReadOptions::default();
DBRawIterator::new(self, opts)
DBRawIteratorWithThreadMode::new(self, opts)
}
/// Opens a raw iterator over the given column family, using the default read options
pub fn raw_iterator_cf<'a: 'b, 'b>(&'a self, cf_handle: &ColumnFamily) -> DBRawIterator<'b> {
pub fn raw_iterator_cf<'a: 'b, 'b>(
&'a self,
cf_handle: impl AsColumnFamilyRef,
) -> DBRawIteratorWithThreadMode<'b, Self> {
let opts = ReadOptions::default();
DBRawIterator::new_cf(self, cf_handle, opts)
DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
}
/// Opens a raw iterator over the database, using the given read options
pub fn raw_iterator_opt<'a: 'b, 'b>(&'a self, readopts: ReadOptions) -> DBRawIterator<'b> {
DBRawIterator::new(self, readopts)
pub fn raw_iterator_opt<'a: 'b, 'b>(
&'a self,
readopts: ReadOptions,
) -> DBRawIteratorWithThreadMode<'b, Self> {
DBRawIteratorWithThreadMode::new(self, readopts)
}
/// Opens a raw iterator over the given column family, using the given read options
pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
&'a self,
cf_handle: &ColumnFamily,
cf_handle: impl AsColumnFamilyRef,
readopts: ReadOptions,
) -> DBRawIterator<'b> {
DBRawIterator::new_cf(self, cf_handle, readopts)
) -> DBRawIteratorWithThreadMode<'b, Self> {
DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
}
pub fn snapshot(&self) -> Snapshot {
Snapshot::new(self)
pub fn snapshot(&self) -> SnapshotWithThreadMode<Self> {
SnapshotWithThreadMode::<Self>::new(self)
}
pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
@ -827,7 +967,7 @@ impl DB {
pub fn put_cf_opt<K, V>(
&self,
cf: &ColumnFamily,
cf: impl AsColumnFamilyRef,
key: K,
value: V,
writeopts: &WriteOptions,
@ -843,7 +983,7 @@ impl DB {
ffi_try!(ffi::rocksdb_put_cf(
self.inner,
writeopts.inner,
cf.inner,
cf.inner(),
key.as_ptr() as *const c_char,
key.len() as size_t,
value.as_ptr() as *const c_char,
@ -876,7 +1016,7 @@ impl DB {
pub fn merge_cf_opt<K, V>(
&self,
cf: &ColumnFamily,
cf: impl AsColumnFamilyRef,
key: K,
value: V,
writeopts: &WriteOptions,
@ -892,7 +1032,7 @@ impl DB {
ffi_try!(ffi::rocksdb_merge_cf(
self.inner,
writeopts.inner,
cf.inner,
cf.inner(),
key.as_ptr() as *const c_char,
key.len() as size_t,
value.as_ptr() as *const c_char,
@ -922,7 +1062,7 @@ impl DB {
pub fn delete_cf_opt<K: AsRef<[u8]>>(
&self,
cf: &ColumnFamily,
cf: impl AsColumnFamilyRef,
key: K,
writeopts: &WriteOptions,
) -> Result<(), Error> {
@ -932,7 +1072,7 @@ impl DB {
ffi_try!(ffi::rocksdb_delete_cf(
self.inner,
writeopts.inner,
cf.inner,
cf.inner(),
key.as_ptr() as *const c_char,
key.len() as size_t,
));
@ -943,7 +1083,7 @@ impl DB {
/// Removes the database entries in the range `["from", "to")` using given write options.
pub fn delete_range_cf_opt<K: AsRef<[u8]>>(
&self,
cf: &ColumnFamily,
cf: impl AsColumnFamilyRef,
from: K,
to: K,
writeopts: &WriteOptions,
@ -955,7 +1095,7 @@ impl DB {
ffi_try!(ffi::rocksdb_delete_range_cf(
self.inner,
writeopts.inner,
cf.inner,
cf.inner(),
from.as_ptr() as *const c_char,
from.len() as size_t,
to.as_ptr() as *const c_char,
@ -973,7 +1113,7 @@ impl DB {
self.put_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
}
pub fn put_cf<K, V>(&self, cf: &ColumnFamily, key: K, value: V) -> Result<(), Error>
pub fn put_cf<K, V>(&self, cf: impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
@ -989,7 +1129,7 @@ impl DB {
self.merge_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
}
pub fn merge_cf<K, V>(&self, cf: &ColumnFamily, key: K, value: V) -> Result<(), Error>
pub fn merge_cf<K, V>(&self, cf: impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
@ -1001,14 +1141,18 @@ impl DB {
self.delete_opt(key.as_ref(), &WriteOptions::default())
}
pub fn delete_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<(), Error> {
pub fn delete_cf<K: AsRef<[u8]>>(
&self,
cf: impl AsColumnFamilyRef,
key: K,
) -> Result<(), Error> {
self.delete_cf_opt(cf, key.as_ref(), &WriteOptions::default())
}
/// Removes the database entries in the range `["from", "to")` using default write options.
pub fn delete_range_cf<K: AsRef<[u8]>>(
&self,
cf: &ColumnFamily,
cf: impl AsColumnFamilyRef,
from: K,
to: K,
) -> Result<(), Error> {
@ -1057,7 +1201,7 @@ impl DB {
/// given column family. This is not likely to be needed for typical usage.
pub fn compact_range_cf<S: AsRef<[u8]>, E: AsRef<[u8]>>(
&self,
cf: &ColumnFamily,
cf: impl AsColumnFamilyRef,
start: Option<S>,
end: Option<E>,
) {
@ -1067,7 +1211,7 @@ impl DB {
ffi::rocksdb_compact_range_cf(
self.inner,
cf.inner,
cf.inner(),
opt_bytes_to_ptr(start),
start.map_or(0, |s| s.len()) as size_t,
opt_bytes_to_ptr(end),
@ -1079,7 +1223,7 @@ impl DB {
/// Same as `compact_range_cf` but with custom options.
pub fn compact_range_cf_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
&self,
cf: &ColumnFamily,
cf: impl AsColumnFamilyRef,
start: Option<S>,
end: Option<E>,
opts: &CompactOptions,
@ -1090,7 +1234,7 @@ impl DB {
ffi::rocksdb_compact_range_cf_opt(
self.inner,
cf.inner,
cf.inner(),
opts.inner,
opt_bytes_to_ptr(start),
start.map_or(0, |s| s.len()) as size_t,
@ -1118,7 +1262,7 @@ impl DB {
pub fn set_options_cf(
&self,
cf_handle: &ColumnFamily,
cf: impl AsColumnFamilyRef,
opts: &[(&str, &str)],
) -> Result<(), Error> {
let copts = convert_options(opts)?;
@ -1128,7 +1272,7 @@ impl DB {
unsafe {
ffi_try!(ffi::rocksdb_set_options_cf(
self.inner,
cf_handle.inner,
cf.inner(),
count,
cnames.as_ptr(),
cvalues.as_ptr(),
@ -1179,7 +1323,7 @@ impl DB {
/// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L428-L634).
pub fn property_value_cf(
&self,
cf: &ColumnFamily,
cf: impl AsColumnFamilyRef,
name: &str,
) -> Result<Option<String>, Error> {
let prop_name = match CString::new(name) {
@ -1193,7 +1337,7 @@ impl DB {
};
unsafe {
let value = ffi::rocksdb_property_value_cf(self.inner, cf.inner, prop_name.as_ptr());
let value = ffi::rocksdb_property_value_cf(self.inner, cf.inner(), prop_name.as_ptr());
if value.is_null() {
return Ok(None);
}
@ -1237,7 +1381,7 @@ impl DB {
/// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
pub fn property_int_value_cf(
&self,
cf: &ColumnFamily,
cf: impl AsColumnFamilyRef,
name: &str,
) -> Result<Option<u64>, Error> {
match self.property_value_cf(cf, name) {
@ -1314,17 +1458,17 @@ impl DB {
/// with default opts
pub fn ingest_external_file_cf<P: AsRef<Path>>(
&self,
cf: &ColumnFamily,
cf: impl AsColumnFamilyRef,
paths: Vec<P>,
) -> Result<(), Error> {
let opts = IngestExternalFileOptions::default();
self.ingest_external_file_cf_opts(&cf, &opts, paths)
self.ingest_external_file_cf_opts(cf, &opts, paths)
}
/// Loads a list of external SST files created with SstFileWriter into the DB for given Column Family
pub fn ingest_external_file_cf_opts<P: AsRef<Path>>(
&self,
cf: &ColumnFamily,
cf: impl AsColumnFamilyRef,
opts: &IngestExternalFileOptions,
paths: Vec<P>,
) -> Result<(), Error> {
@ -1335,7 +1479,7 @@ impl DB {
let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
self.ingest_external_file_raw_cf(&cf, &opts, &paths_v, &cpaths)
self.ingest_external_file_raw_cf(cf, &opts, &paths_v, &cpaths)
}
fn ingest_external_file_raw(
@ -1357,7 +1501,7 @@ impl DB {
fn ingest_external_file_raw_cf(
&self,
cf: &ColumnFamily,
cf: impl AsColumnFamilyRef,
opts: &IngestExternalFileOptions,
paths_v: &[CString],
cpaths: &[*const c_char],
@ -1365,7 +1509,7 @@ impl DB {
unsafe {
ffi_try!(ffi::rocksdb_ingest_external_file_cf(
self.inner,
cf.inner,
cf.inner(),
cpaths.as_ptr(),
paths_v.len(),
opts.inner as *const _
@ -1426,8 +1570,8 @@ impl DB {
/// entirely in the range.
///
/// Note: L0 files are left regardless of whether they're in the range.
///
/// Snapshots before the delete might not see the data in the given range.
///
/// SnapshotWithThreadModes before the delete might not see the data in the given range.
pub fn delete_file_in_range<K: AsRef<[u8]>>(&self, from: K, to: K) -> Result<(), Error> {
let from = from.as_ref();
let to = to.as_ref();
@ -1446,7 +1590,7 @@ impl DB {
/// Same as `delete_file_in_range` but only for specific column family
pub fn delete_file_in_range_cf<K: AsRef<[u8]>>(
&self,
cf: &ColumnFamily,
cf: impl AsColumnFamilyRef,
from: K,
to: K,
) -> Result<(), Error> {
@ -1455,7 +1599,7 @@ impl DB {
unsafe {
ffi_try!(ffi::rocksdb_delete_file_in_range_cf(
self.inner,
cf.inner,
cf.inner(),
from.as_ptr() as *const c_char,
from.len() as size_t,
to.as_ptr() as *const c_char,
@ -1473,18 +1617,85 @@ impl DB {
}
}
impl Drop for DB {
impl DBWithThreadMode<SingleThreaded> {
/// Creates column family with given name and options
pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
self.cfs
.cfs
.insert(name.as_ref().to_string(), ColumnFamily { inner });
Ok(())
}
/// Drops the column family with the given name
pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
let inner = self.inner;
if let Some(cf) = self.cfs.cfs.remove(name) {
unsafe {
ffi_try!(ffi::rocksdb_drop_column_family(inner, cf.inner));
}
Ok(())
} else {
Err(Error::new(format!("Invalid column family: {}", name)))
}
}
/// Returns the underlying column family handle
pub fn cf_handle<'a>(&'a self, name: &str) -> Option<&'a ColumnFamily> {
self.cfs.cfs.get(name)
}
}
impl DBWithThreadMode<MultiThreaded> {
/// Creates column family with given name and options
pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
self.cfs
.cfs
.write()
.unwrap()
.insert(name.as_ref().to_string(), ColumnFamily { inner });
Ok(())
}
/// Drops the column family with the given name by internally locking the inner column
/// family map. This avoids needing `&mut self` reference
pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
let inner = self.inner;
if let Some(cf) = self.cfs.cfs.write().unwrap().remove(name) {
unsafe {
ffi_try!(ffi::rocksdb_drop_column_family(inner, cf.inner));
}
Ok(())
} else {
Err(Error::new(format!("Invalid column family: {}", name)))
}
}
/// Returns the underlying column family handle
pub fn cf_handle(&self, name: &str) -> Option<BoundColumnFamily> {
self.cfs
.cfs
.read()
.unwrap()
.get(name)
.map(|cf| BoundColumnFamily {
inner: cf.inner,
multi_threaded_cfs: PhantomData,
})
}
}
impl<T: ThreadMode> Drop for DBWithThreadMode<T> {
fn drop(&mut self) {
unsafe {
for cf in self.cfs.values() {
ffi::rocksdb_column_family_handle_destroy(cf.inner);
}
self.cfs.cf_drop_all();
ffi::rocksdb_close(self.inner);
}
}
}
impl fmt::Debug for DB {
impl<T: ThreadMode> fmt::Debug for DBWithThreadMode<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "RocksDB {{ path: {:?} }}", self.path())
}

@ -12,15 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{ffi, ColumnFamily, Error, ReadOptions, WriteBatch, DB};
use crate::db::{DBAccess, DB};
use crate::{ffi, Error, ReadOptions, WriteBatch};
use libc::{c_char, c_uchar, size_t};
use std::marker::PhantomData;
use std::slice;
/// A type alias to keep compatibility. See [`DBRawIteratorWithThreadMode`] for details
pub type DBRawIterator<'a> = DBRawIteratorWithThreadMode<'a, DB>;
/// An iterator over a database or column family, with specifiable
/// ranges and direction.
///
/// This iterator is different to the standard ``DBIterator`` as it aims Into
/// This iterator is different to the standard ``DBIteratorWithThreadMode`` as it aims Into
/// replicate the underlying iterator API within RocksDB itself. This should
/// give access to more performance and flexibility but departs from the
/// widely recognised Rust idioms.
@ -65,7 +69,7 @@ use std::slice;
/// }
/// let _ = DB::destroy(&Options::default(), path);
/// ```
pub struct DBRawIterator<'a> {
pub struct DBRawIteratorWithThreadMode<'a, D: DBAccess> {
inner: *mut ffi::rocksdb_iterator_t,
/// When iterate_upper_bound is set, the inner C iterator keeps a pointer to the upper bound
@ -73,14 +77,14 @@ pub struct DBRawIterator<'a> {
/// iterator is being used.
_readopts: ReadOptions,
db: PhantomData<&'a DB>,
db: PhantomData<&'a D>,
}
impl<'a> DBRawIterator<'a> {
pub(crate) fn new(db: &DB, readopts: ReadOptions) -> DBRawIterator<'a> {
impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> {
pub(crate) fn new(db: &D, readopts: ReadOptions) -> DBRawIteratorWithThreadMode<'a, D> {
unsafe {
DBRawIterator {
inner: ffi::rocksdb_create_iterator(db.inner, readopts.inner),
DBRawIteratorWithThreadMode {
inner: ffi::rocksdb_create_iterator(db.inner(), readopts.inner),
_readopts: readopts,
db: PhantomData,
}
@ -88,13 +92,13 @@ impl<'a> DBRawIterator<'a> {
}
pub(crate) fn new_cf(
db: &DB,
cf_handle: &ColumnFamily,
db: &'a D,
cf_handle: *mut ffi::rocksdb_column_family_handle_t,
readopts: ReadOptions,
) -> DBRawIterator<'a> {
) -> DBRawIteratorWithThreadMode<'a, D> {
unsafe {
DBRawIterator {
inner: ffi::rocksdb_create_iterator_cf(db.inner, readopts.inner, cf_handle.inner),
DBRawIteratorWithThreadMode {
inner: ffi::rocksdb_create_iterator_cf(db.inner(), readopts.inner, cf_handle),
_readopts: readopts,
db: PhantomData,
}
@ -105,7 +109,7 @@ impl<'a> DBRawIterator<'a> {
/// it reaches the end of its defined range, or when it encounters an error.
///
/// To check whether the iterator encountered an error after `valid` has
/// returned `false`, use the [`status`](DBRawIterator::status) method. `status` will never
/// returned `false`, use the [`status`](DBRawIteratorWithThreadMode::status) method. `status` will never
/// return an error when `valid` is `true`.
pub fn valid(&self) -> bool {
unsafe { ffi::rocksdb_iter_valid(self.inner) != 0 }
@ -113,7 +117,7 @@ impl<'a> DBRawIterator<'a> {
/// Returns an error `Result` if the iterator has encountered an error
/// during operation. When an error is encountered, the iterator is
/// invalidated and [`valid`](DBRawIterator::valid) will return `false` when called.
/// invalidated and [`valid`](DBRawIteratorWithThreadMode::valid) will return `false` when called.
///
/// Performing a seek will discard the current status.
pub fn status(&self) -> Result<(), Error> {
@ -323,7 +327,7 @@ impl<'a> DBRawIterator<'a> {
}
}
impl<'a> Drop for DBRawIterator<'a> {
impl<'a, D: DBAccess> Drop for DBRawIteratorWithThreadMode<'a, D> {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_iter_destroy(self.inner);
@ -331,8 +335,11 @@ impl<'a> Drop for DBRawIterator<'a> {
}
}
unsafe impl<'a> Send for DBRawIterator<'a> {}
unsafe impl<'a> Sync for DBRawIterator<'a> {}
unsafe impl<'a, D: DBAccess> Send for DBRawIteratorWithThreadMode<'a, D> {}
unsafe impl<'a, D: DBAccess> Sync for DBRawIteratorWithThreadMode<'a, D> {}
/// A type alias to keep compatibility. See [`DBIteratorWithThreadMode`] for details
pub type DBIterator<'a> = DBIteratorWithThreadMode<'a, DB>;
/// An iterator over a database or column family, with specifiable
/// ranges and direction.
@ -365,8 +372,8 @@ unsafe impl<'a> Sync for DBRawIterator<'a> {}
/// }
/// let _ = DB::destroy(&Options::default(), path);
/// ```
pub struct DBIterator<'a> {
raw: DBRawIterator<'a>,
pub struct DBIteratorWithThreadMode<'a, D: DBAccess> {
raw: DBRawIteratorWithThreadMode<'a, D>,
direction: Direction,
just_seeked: bool,
}
@ -384,10 +391,10 @@ pub enum IteratorMode<'a> {
From(&'a [u8], Direction),
}
impl<'a> DBIterator<'a> {
pub(crate) fn new(db: &DB, readopts: ReadOptions, mode: IteratorMode) -> DBIterator<'a> {
let mut rv = DBIterator {
raw: DBRawIterator::new(db, readopts),
impl<'a, D: DBAccess> DBIteratorWithThreadMode<'a, D> {
pub(crate) fn new(db: &D, readopts: ReadOptions, mode: IteratorMode) -> Self {
let mut rv = DBIteratorWithThreadMode {
raw: DBRawIteratorWithThreadMode::new(db, readopts),
direction: Direction::Forward, // blown away by set_mode()
just_seeked: false,
};
@ -396,13 +403,13 @@ impl<'a> DBIterator<'a> {
}
pub(crate) fn new_cf(
db: &DB,
cf_handle: &ColumnFamily,
db: &'a D,
cf_handle: *mut ffi::rocksdb_column_family_handle_t,
readopts: ReadOptions,
mode: IteratorMode,
) -> DBIterator<'a> {
let mut rv = DBIterator {
raw: DBRawIterator::new_cf(db, cf_handle, readopts),
) -> Self {
let mut rv = DBIteratorWithThreadMode {
raw: DBRawIteratorWithThreadMode::new_cf(db, cf_handle, readopts),
direction: Direction::Forward, // blown away by set_mode()
just_seeked: false,
};
@ -433,18 +440,18 @@ impl<'a> DBIterator<'a> {
self.just_seeked = true;
}
/// See [`valid`](DBRawIterator::valid)
/// See [`valid`](DBRawIteratorWithThreadMode::valid)
pub fn valid(&self) -> bool {
self.raw.valid()
}
/// See [`status`](DBRawIterator::status)
/// See [`status`](DBRawIteratorWithThreadMode::status)
pub fn status(&self) -> Result<(), Error> {
self.raw.status()
}
}
impl<'a> Iterator for DBIterator<'a> {
impl<'a, D: DBAccess> Iterator for DBIteratorWithThreadMode<'a, D> {
type Item = KVBytes;
fn next(&mut self) -> Option<KVBytes> {
@ -475,8 +482,8 @@ impl<'a> Iterator for DBIterator<'a> {
}
}
impl<'a> Into<DBRawIterator<'a>> for DBIterator<'a> {
fn into(self) -> DBRawIterator<'a> {
impl<'a, D: DBAccess> Into<DBRawIteratorWithThreadMode<'a, D>> for DBIteratorWithThreadMode<'a, D> {
fn into(self) -> DBRawIteratorWithThreadMode<'a, D> {
self.raw
}
}

@ -22,12 +22,13 @@ use crate::{
compaction_filter::{self, CompactionFilterCallback, CompactionFilterFn},
compaction_filter_factory::{self, CompactionFilterFactory},
comparator::{self, ComparatorCallback, CompareFn},
db::DBAccess,
ffi,
merge_operator::{
self, full_merge_callback, partial_merge_callback, MergeFn, MergeOperatorCallback,
},
slice_transform::SliceTransform,
Error, Snapshot,
Error, SnapshotWithThreadMode,
};
fn new_cache(capacity: size_t) -> *mut ffi::rocksdb_cache_t {
@ -2849,7 +2850,7 @@ impl ReadOptions {
/// Sets the snapshot which should be used for the read.
/// The snapshot must belong to the DB that is being read and must
/// not have been released.
pub(crate) fn set_snapshot(&mut self, snapshot: &Snapshot) {
pub(crate) fn set_snapshot<D: DBAccess>(&mut self, snapshot: &SnapshotWithThreadMode<D>) {
unsafe {
ffi::rocksdb_readoptions_set_snapshot(self.inner, snapshot.inner);
}

@ -70,6 +70,10 @@
clippy::missing_safety_doc,
clippy::needless_pass_by_value,
clippy::option_if_let_else,
clippy::ptr_as_ptr,
clippy::missing_panics_doc,
clippy::from_over_into,
clippy::upper_case_acronyms,
)]
#[macro_use]
@ -93,10 +97,16 @@ mod sst_file_writer;
mod write_batch;
pub use crate::{
column_family::{ColumnFamily, ColumnFamilyDescriptor, DEFAULT_COLUMN_FAMILY_NAME},
column_family::{
AsColumnFamilyRef, BoundColumnFamily, ColumnFamily, ColumnFamilyDescriptor,
ColumnFamilyRef, DEFAULT_COLUMN_FAMILY_NAME,
},
compaction_filter::Decision as CompactionDecision,
db::{LiveFile, DB},
db_iterator::{DBIterator, DBRawIterator, DBWALIterator, Direction, IteratorMode},
db::{DBWithThreadMode, LiveFile, MultiThreaded, SingleThreaded, DB},
db_iterator::{
DBIterator, DBIteratorWithThreadMode, DBRawIterator, DBRawIteratorWithThreadMode,
DBWALIterator, Direction, IteratorMode,
},
db_options::{
BlockBasedIndexType, BlockBasedOptions, BottommostLevelCompaction, Cache, CompactOptions,
DBCompactionStyle, DBCompressionType, DBPath, DBRecoveryMode, DataBlockIndexType, Env,
@ -108,7 +118,7 @@ pub use crate::{
merge_operator::MergeOperands,
perf::{PerfContext, PerfMetric, PerfStatsLevel},
slice_transform::SliceTransform,
snapshot::Snapshot,
snapshot::{Snapshot, SnapshotWithThreadMode},
sst_file_writer::SstFileWriter,
write_batch::{WriteBatch, WriteBatchIterator},
};
@ -162,9 +172,9 @@ impl fmt::Display for Error {
#[cfg(test)]
mod test {
use super::{
BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator,
IngestExternalFileOptions, Options, PlainTableFactoryOptions, ReadOptions, Snapshot,
SstFileWriter, WriteBatch, WriteOptions, DB,
BlockBasedOptions, BoundColumnFamily, ColumnFamily, ColumnFamilyDescriptor, DBIterator,
DBRawIterator, IngestExternalFileOptions, Options, PlainTableFactoryOptions, ReadOptions,
Snapshot, SstFileWriter, WriteBatch, WriteOptions, DB,
};
#[test]
@ -188,6 +198,7 @@ mod test {
is_send::<PlainTableFactoryOptions>();
is_send::<ColumnFamilyDescriptor>();
is_send::<ColumnFamily>();
is_send::<BoundColumnFamily<'_>>();
is_send::<SstFileWriter>();
is_send::<WriteBatch>();
}

@ -12,7 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{ffi, ColumnFamily, DBIterator, DBRawIterator, Error, IteratorMode, ReadOptions, DB};
use crate::{
db::DBAccess, ffi, AsColumnFamilyRef, DBIteratorWithThreadMode, DBRawIteratorWithThreadMode,
Error, IteratorMode, ReadOptions, DB,
};
/// A type alias to keep compatibility. See [`SnapshotWithThreadMode`] for details
pub type Snapshot<'a> = SnapshotWithThreadMode<'a, DB>;
/// A consistent view of the database at the point of creation.
///
@ -30,80 +36,91 @@ use crate::{ffi, ColumnFamily, DBIterator, DBRawIterator, Error, IteratorMode, R
/// let _ = DB::destroy(&Options::default(), path);
/// ```
///
pub struct Snapshot<'a> {
db: &'a DB,
pub struct SnapshotWithThreadMode<'a, D: DBAccess> {
db: &'a D,
pub(crate) inner: *const ffi::rocksdb_snapshot_t,
}
impl<'a> Snapshot<'a> {
/// Creates a new `Snapshot` of the database `db`.
pub fn new(db: &DB) -> Snapshot {
let snapshot = unsafe { ffi::rocksdb_create_snapshot(db.inner) };
Snapshot {
impl<'a, D: DBAccess> SnapshotWithThreadMode<'a, D> {
/// Creates a new `SnapshotWithThreadMode` of the database `db`.
pub fn new(db: &'a D) -> Self {
let snapshot = unsafe { ffi::rocksdb_create_snapshot(db.inner()) };
Self {
db,
inner: snapshot,
}
}
/// Creates an iterator over the data in this snapshot, using the default read options.
pub fn iterator(&self, mode: IteratorMode) -> DBIterator<'a> {
pub fn iterator(&self, mode: IteratorMode) -> DBIteratorWithThreadMode<'a, D> {
let readopts = ReadOptions::default();
self.iterator_opt(mode, readopts)
}
/// Creates an iterator over the data in this snapshot under the given column family, using
/// the default read options.
pub fn iterator_cf(&self, cf_handle: &ColumnFamily, mode: IteratorMode) -> DBIterator {
pub fn iterator_cf(
&self,
cf_handle: impl AsColumnFamilyRef,
mode: IteratorMode,
) -> DBIteratorWithThreadMode<D> {
let readopts = ReadOptions::default();
self.iterator_cf_opt(cf_handle, readopts, mode)
}
/// Creates an iterator over the data in this snapshot, using the given read options.
pub fn iterator_opt(&self, mode: IteratorMode, mut readopts: ReadOptions) -> DBIterator<'a> {
pub fn iterator_opt(
&self,
mode: IteratorMode,
mut readopts: ReadOptions,
) -> DBIteratorWithThreadMode<'a, D> {
readopts.set_snapshot(self);
DBIterator::new(self.db, readopts, mode)
DBIteratorWithThreadMode::<D>::new(self.db, readopts, mode)
}
/// Creates an iterator over the data in this snapshot under the given column family, using
/// the given read options.
pub fn iterator_cf_opt(
&self,
cf_handle: &ColumnFamily,
cf_handle: impl AsColumnFamilyRef,
mut readopts: ReadOptions,
mode: IteratorMode,
) -> DBIterator {
) -> DBIteratorWithThreadMode<D> {
readopts.set_snapshot(self);
DBIterator::new_cf(self.db, cf_handle, readopts, mode)
DBIteratorWithThreadMode::new_cf(self.db, cf_handle.inner(), readopts, mode)
}
/// Creates a raw iterator over the data in this snapshot, using the default read options.
pub fn raw_iterator(&self) -> DBRawIterator {
pub fn raw_iterator(&self) -> DBRawIteratorWithThreadMode<D> {
let readopts = ReadOptions::default();
self.raw_iterator_opt(readopts)
}
/// Creates a raw iterator over the data in this snapshot under the given column family, using
/// the default read options.
pub fn raw_iterator_cf(&self, cf_handle: &ColumnFamily) -> DBRawIterator {
pub fn raw_iterator_cf(
&self,
cf_handle: impl AsColumnFamilyRef,
) -> DBRawIteratorWithThreadMode<D> {
let readopts = ReadOptions::default();
self.raw_iterator_cf_opt(cf_handle, readopts)
}
/// Creates a raw iterator over the data in this snapshot, using the given read options.
pub fn raw_iterator_opt(&self, mut readopts: ReadOptions) -> DBRawIterator {
pub fn raw_iterator_opt(&self, mut readopts: ReadOptions) -> DBRawIteratorWithThreadMode<D> {
readopts.set_snapshot(self);
DBRawIterator::new(self.db, readopts)
DBRawIteratorWithThreadMode::new(self.db, readopts)
}
/// Creates a raw iterator over the data in this snapshot under the given column family, using
/// the given read options.
pub fn raw_iterator_cf_opt(
&self,
cf_handle: &ColumnFamily,
cf_handle: impl AsColumnFamilyRef,
mut readopts: ReadOptions,
) -> DBRawIterator {
) -> DBRawIteratorWithThreadMode<D> {
readopts.set_snapshot(self);
DBRawIterator::new_cf(self.db, cf_handle, readopts)
DBRawIteratorWithThreadMode::new_cf(self.db, cf_handle.inner(), readopts)
}
/// Returns the bytes associated with a key value with default read options.
@ -116,7 +133,7 @@ impl<'a> Snapshot<'a> {
/// options.
pub fn get_cf<K: AsRef<[u8]>>(
&self,
cf: &ColumnFamily,
cf: impl AsColumnFamilyRef,
key: K,
) -> Result<Option<Vec<u8>>, Error> {
let readopts = ReadOptions::default();
@ -136,7 +153,7 @@ impl<'a> Snapshot<'a> {
/// Returns the bytes associated with a key value, given column family and read options.
pub fn get_cf_opt<K: AsRef<[u8]>>(
&self,
cf: &ColumnFamily,
cf: impl AsColumnFamilyRef,
key: K,
mut readopts: ReadOptions,
) -> Result<Option<Vec<u8>>, Error> {
@ -145,15 +162,15 @@ impl<'a> Snapshot<'a> {
}
}
impl<'a> Drop for Snapshot<'a> {
impl<'a, D: DBAccess> Drop for SnapshotWithThreadMode<'a, D> {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_release_snapshot(self.db.inner, self.inner);
ffi::rocksdb_release_snapshot(self.db.inner(), self.inner);
}
}
}
/// `Send` and `Sync` implementations for `Snapshot` are safe, because `Snapshot` is
/// `Send` and `Sync` implementations for `SnapshotWithThreadMode` are safe, because `SnapshotWithThreadMode` is
/// immutable and can be safely shared between threads.
unsafe impl<'a> Send for Snapshot<'a> {}
unsafe impl<'a> Sync for Snapshot<'a> {}
unsafe impl<'a, D: DBAccess> Send for SnapshotWithThreadMode<'a, D> {}
unsafe impl<'a, D: DBAccess> Sync for SnapshotWithThreadMode<'a, D> {}

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{ffi, ColumnFamily};
use crate::{ffi, AsColumnFamilyRef};
use libc::{c_char, c_void, size_t};
use std::slice;
@ -134,7 +134,7 @@ impl WriteBatch {
}
}
pub fn put_cf<K, V>(&mut self, cf: &ColumnFamily, key: K, value: V)
pub fn put_cf<K, V>(&mut self, cf: impl AsColumnFamilyRef, key: K, value: V)
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
@ -145,7 +145,7 @@ impl WriteBatch {
unsafe {
ffi::rocksdb_writebatch_put_cf(
self.inner,
cf.inner,
cf.inner(),
key.as_ptr() as *const c_char,
key.len() as size_t,
value.as_ptr() as *const c_char,
@ -173,7 +173,7 @@ impl WriteBatch {
}
}
pub fn merge_cf<K, V>(&mut self, cf: &ColumnFamily, key: K, value: V)
pub fn merge_cf<K, V>(&mut self, cf: impl AsColumnFamilyRef, key: K, value: V)
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
@ -184,7 +184,7 @@ impl WriteBatch {
unsafe {
ffi::rocksdb_writebatch_merge_cf(
self.inner,
cf.inner,
cf.inner(),
key.as_ptr() as *const c_char,
key.len() as size_t,
value.as_ptr() as *const c_char,
@ -206,13 +206,13 @@ impl WriteBatch {
}
}
pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: &ColumnFamily, key: K) {
pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: impl AsColumnFamilyRef, key: K) {
let key = key.as_ref();
unsafe {
ffi::rocksdb_writebatch_delete_cf(
self.inner,
cf.inner,
cf.inner(),
key.as_ptr() as *const c_char,
key.len() as size_t,
);
@ -243,13 +243,13 @@ impl WriteBatch {
/// Removes the database entries in the range ["begin_key", "end_key"), i.e.,
/// including "begin_key" and excluding "end_key". It is not an error if no
/// keys exist in the range ["begin_key", "end_key").
pub fn delete_range_cf<K: AsRef<[u8]>>(&mut self, cf: &ColumnFamily, from: K, to: K) {
pub fn delete_range_cf<K: AsRef<[u8]>>(&mut self, cf: impl AsColumnFamilyRef, from: K, to: K) {
let (start_key, end_key) = (from.as_ref(), to.as_ref());
unsafe {
ffi::rocksdb_writebatch_delete_range_cf(
self.inner,
cf.inner,
cf.inner(),
start_key.as_ptr() as *const c_char,
start_key.len() as size_t,
end_key.as_ptr() as *const c_char,

@ -0,0 +1,10 @@
use rocksdb::{SingleThreaded, DBWithThreadMode, Options};
fn main() {
let db = DBWithThreadMode::<SingleThreaded>::open_default("/path/to/dummy").unwrap();
let db_ref1 = &db;
let db_ref2 = &db;
let opts = Options::default();
db_ref1.create_cf("cf1", &opts).unwrap();
db_ref2.create_cf("cf2", &opts).unwrap();
}

@ -0,0 +1,17 @@
error[E0596]: cannot borrow `*db_ref1` as mutable, as it is behind a `&` reference
--> $DIR/open_with_multiple_refs_as_single_threaded.rs:8:5
|
5 | let db_ref1 = &db;
| --- help: consider changing this to be a mutable reference: `&mut db`
...
8 | db_ref1.create_cf("cf1", &opts).unwrap();
| ^^^^^^^ `db_ref1` is a `&` reference, so the data it refers to cannot be borrowed as mutable
error[E0596]: cannot borrow `*db_ref2` as mutable, as it is behind a `&` reference
--> $DIR/open_with_multiple_refs_as_single_threaded.rs:9:5
|
6 | let db_ref2 = &db;
| --- help: consider changing this to be a mutable reference: `&mut db`
...
9 | db_ref2.create_cf("cf2", &opts).unwrap();
| ^^^^^^^ `db_ref2` is a `&` reference, so the data it refers to cannot be borrowed as mutable

@ -20,9 +20,10 @@ use pretty_assertions::assert_eq;
use rocksdb::{
perf::get_memory_usage_stats, BlockBasedOptions, BottommostLevelCompaction, Cache,
CompactOptions, DBCompactionStyle, Env, Error, FifoCompactOptions, IteratorMode, Options,
PerfContext, PerfMetric, ReadOptions, SliceTransform, Snapshot, UniversalCompactOptions,
UniversalCompactionStopStyle, WriteBatch, DB,
CompactOptions, DBCompactionStyle, DBWithThreadMode, Env, Error, FifoCompactOptions,
IteratorMode, MultiThreaded, Options, PerfContext, PerfMetric, ReadOptions, SingleThreaded,
SliceTransform, Snapshot, UniversalCompactOptions, UniversalCompactionStopStyle, WriteBatch,
DB,
};
use util::DBPath;
@ -528,6 +529,36 @@ fn test_open_with_ttl() {
assert!(db.get(b"key1").unwrap().is_none());
}
#[test]
fn test_open_as_single_threaded() {
let primary_path = DBPath::new("_rust_rocksdb_test_open_as_single_threaded");
let mut db = DBWithThreadMode::<SingleThreaded>::open_default(&primary_path).unwrap();
let db_ref1 = &mut db;
let opts = Options::default();
db_ref1.create_cf("cf1", &opts).unwrap();
}
#[test]
fn test_open_with_multiple_refs_as_multi_threaded() {
// This tests multiple references can be allowed while creating column families
let primary_path = DBPath::new("_rust_rocksdb_test_open_as_multi_threaded");
let db = DBWithThreadMode::<MultiThreaded>::open_default(&primary_path).unwrap();
let db_ref1 = &db;
let db_ref2 = &db;
let opts = Options::default();
db_ref1.create_cf("cf1", &opts).unwrap();
db_ref2.create_cf("cf2", &opts).unwrap();
}
#[test]
fn test_open_with_multiple_refs_as_single_threaded() {
// This tests multiple references CANNOT be allowed while creating column families
let t = trybuild::TestCases::new();
t.compile_fail("tests/fail/open_with_multiple_refs_as_single_threaded.rs");
}
#[test]
fn compact_range_test() {
let path = DBPath::new("_rust_rocksdb_compact_range_test");

Loading…
Cancel
Save