From 0b700fe70da8ee30483fde79f44df549f8fe11ec Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Tue, 6 Apr 2021 20:52:17 +0900 Subject: [PATCH] Remove need for &mut self in create_cf and drop_cf (v2) (#506) --- .github/workflows/rust.yml | 9 +- Cargo.toml | 1 + README.md | 9 + src/backup.rs | 4 +- src/column_family.rs | 40 +- src/db.rs | 493 +++++++++++++----- src/db_iterator.rs | 75 +-- src/db_options.rs | 5 +- src/lib.rs | 25 +- src/snapshot.rs | 75 +-- src/write_batch.rs | 18 +- ...n_with_multiple_refs_as_single_threaded.rs | 10 + ...th_multiple_refs_as_single_threaded.stderr | 17 + tests/test_db.rs | 37 +- 14 files changed, 589 insertions(+), 229 deletions(-) create mode 100644 tests/fail/open_with_multiple_refs_as_single_threaded.rs create mode 100644 tests/fail/open_with_multiple_refs_as_single_threaded.stderr diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 3c75a87..a21299f 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -88,7 +88,14 @@ jobs: with: command: test args: --manifest-path=librocksdb-sys/Cargo.toml - - name: Run rocksdb tests + - name: Run rocksdb tests (single-threaded cf) uses: actions-rs/cargo@v1 with: command: test + - name: Run rocksdb tests (multi-threaded cf) + uses: actions-rs/cargo@v1 + env: + RUSTFLAGS: -Awarnings # Suppress "variable does not need to be mutable" warnings + with: + command: test + args: --features multi-threaded-cf diff --git a/Cargo.toml b/Cargo.toml index edb395b..b333b97 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ lz4 = ["librocksdb-sys/lz4"] zstd = ["librocksdb-sys/zstd"] zlib = ["librocksdb-sys/zlib"] bzip2 = ["librocksdb-sys/bzip2"] +multi-threaded-cf = [] [dependencies] libc = "0.2" diff --git a/README.md b/README.md index 62102f3..d0541a4 100644 --- a/README.md +++ b/README.md @@ -42,3 +42,12 @@ compression support, make these changes to your Cargo.toml: default-features = false features = ["lz4"] ``` + +## Multi-threaded ColumnFamily alternation + +The underlying RocksDB does allow column families to be created and dropped +from multiple threads concurrently. But this crate doesn't allow it by default +for compatibility. If you need to modify column families concurrently, enable +crate feature called `multi-threaded-cf`, which makes this binding's +data structures to use RwLock by default. Alternatively, you can directly create +`DBWithThreadMode` without enabling the crate feature. diff --git a/src/backup.rs b/src/backup.rs index f84e0e9..f079042 100644 --- a/src/backup.rs +++ b/src/backup.rs @@ -235,7 +235,7 @@ impl Default for BackupEngineOptions { unsafe { let opts = ffi::rocksdb_options_create(); if opts.is_null() { - panic!("Could not create RocksDB backup options".to_owned()); + panic!("Could not create RocksDB backup options"); } BackupEngineOptions { inner: opts } } @@ -247,7 +247,7 @@ impl Default for RestoreOptions { unsafe { let opts = ffi::rocksdb_restore_options_create(); if opts.is_null() { - panic!("Could not create RocksDB restore options".to_owned()); + panic!("Could not create RocksDB restore options"); } RestoreOptions { inner: opts } } diff --git a/src/column_family.rs b/src/column_family.rs index 2c1b505..1967f7d 100644 --- a/src/column_family.rs +++ b/src/column_family.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::{ffi, Options}; +use crate::{db::MultiThreaded, ffi, Options}; /// The name of the default column family. /// @@ -47,4 +47,42 @@ pub struct ColumnFamily { pub(crate) inner: *mut ffi::rocksdb_column_family_handle_t, } +/// A specialized opaque type used to represent a column family by the [`MultiThreaded`] +/// mode. Clone (and Copy) is derived to behave like `&ColumnFamily` (this is used for +/// single-threaded mode). `Clone`/`Copy` is safe because this lifetime is bound to DB like +/// iterators/snapshots. On top of it, this is as cheap and small as `&ColumnFamily` because +/// this only has a single pointer-wide field. +#[derive(Clone, Copy)] +pub struct BoundColumnFamily<'a> { + pub(crate) inner: *mut ffi::rocksdb_column_family_handle_t, + pub(crate) multi_threaded_cfs: std::marker::PhantomData<&'a MultiThreaded>, +} + +/// Handy type alias to hide actual type difference to reference [`ColumnFamily`] +/// depending on the `multi-threaded-cf` crate feature. +#[cfg(not(feature = "multi-threaded-cf"))] +pub type ColumnFamilyRef<'a> = &'a ColumnFamily; + +#[cfg(feature = "multi-threaded-cf")] +pub type ColumnFamilyRef<'a> = BoundColumnFamily<'a>; + +/// Utility trait to accept both supported references to `ColumnFamily` +/// (`&ColumnFamily` and `BoundColumnFamily`) +pub trait AsColumnFamilyRef { + fn inner(&self) -> *mut ffi::rocksdb_column_family_handle_t; +} + +impl<'a> AsColumnFamilyRef for &'a ColumnFamily { + fn inner(&self) -> *mut ffi::rocksdb_column_family_handle_t { + self.inner + } +} + +impl<'a> AsColumnFamilyRef for BoundColumnFamily<'a> { + fn inner(&self) -> *mut ffi::rocksdb_column_family_handle_t { + self.inner + } +} + unsafe impl Send for ColumnFamily {} +unsafe impl<'a> Send for BoundColumnFamily<'a> {} diff --git a/src/db.rs b/src/db.rs index e49fe36..684d6b1 100644 --- a/src/db.rs +++ b/src/db.rs @@ -14,12 +14,14 @@ // use crate::{ + column_family::AsColumnFamilyRef, + column_family::BoundColumnFamily, ffi, ffi_util::{from_cstr, opt_bytes_to_ptr, raw_data, to_cpath}, - ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIterator, DBPinnableSlice, - DBRawIterator, DBWALIterator, Direction, Error, FlushOptions, IngestExternalFileOptions, - IteratorMode, Options, ReadOptions, Snapshot, WriteBatch, WriteOptions, - DEFAULT_COLUMN_FAMILY_NAME, + ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIteratorWithThreadMode, + DBPinnableSlice, DBRawIteratorWithThreadMode, DBWALIterator, Direction, Error, FlushOptions, + IngestExternalFileOptions, IteratorMode, Options, ReadOptions, SnapshotWithThreadMode, + WriteBatch, WriteOptions, DEFAULT_COLUMN_FAMILY_NAME, }; use libc::{self, c_char, c_int, c_uchar, c_void, size_t}; @@ -27,30 +29,159 @@ use std::collections::BTreeMap; use std::ffi::{CStr, CString}; use std::fmt; use std::fs; +use std::marker::PhantomData; use std::path::Path; use std::path::PathBuf; use std::ptr; use std::slice; use std::str; +use std::sync::RwLock; use std::time::Duration; +// Marker trait to specify single or multi threaded column family alternations for DB +// Also, this is minimum common API sharable between SingleThreaded and +// MultiThreaded. Others differ in self mutability and return type. +pub trait ThreadMode { + fn new(cf_map: BTreeMap) -> Self; + fn cf_drop_all(&mut self); +} + +/// Actual marker type for the internal marker trait `ThreadMode`, which holds +/// a collection of column families without synchronization primitive, providing +/// no overhead for the single-threaded column family alternations. The other +/// mode is [`MultiThreaded`]. +/// +/// See [`DB`] for more details, including performance implications for each mode +pub struct SingleThreaded { + cfs: BTreeMap, +} + +/// Actual marker type for the internal marker trait `ThreadMode`, which holds +/// a collection of column families wrapped in a RwLock to be mutated +/// concurrently. The other mode is [`SingleThreaded`]. +/// +/// See [`DB`] for more details, including performance implications for each mode +pub struct MultiThreaded { + cfs: RwLock>, +} + +impl ThreadMode for SingleThreaded { + fn new(cfs: BTreeMap) -> Self { + Self { cfs } + } + + fn cf_drop_all(&mut self) { + for cf in self.cfs.values() { + unsafe { + ffi::rocksdb_column_family_handle_destroy(cf.inner); + } + } + } +} + +impl ThreadMode for MultiThreaded { + fn new(cfs: BTreeMap) -> Self { + Self { + cfs: RwLock::new(cfs), + } + } + + fn cf_drop_all(&mut self) { + for cf in self.cfs.read().unwrap().values() { + unsafe { + ffi::rocksdb_column_family_handle_destroy(cf.inner); + } + } + } +} + /// A RocksDB database. /// /// See crate level documentation for a simple usage example. -pub struct DB { +pub struct DBWithThreadMode { pub(crate) inner: *mut ffi::rocksdb_t, - cfs: BTreeMap, + cfs: T, // Column families are held differently depending on thread mode path: PathBuf, } +/// Minimal set of DB-related methods, intended to be generic over +/// `DBWithThreadMode`. Mainly used internally +pub trait DBAccess { + fn inner(&self) -> *mut ffi::rocksdb_t; + + fn get_opt>( + &self, + key: K, + readopts: &ReadOptions, + ) -> Result>, Error>; + + fn get_cf_opt>( + &self, + cf: impl AsColumnFamilyRef, + key: K, + readopts: &ReadOptions, + ) -> Result>, Error>; +} + +impl DBAccess for DBWithThreadMode { + fn inner(&self) -> *mut ffi::rocksdb_t { + self.inner + } + + fn get_opt>( + &self, + key: K, + readopts: &ReadOptions, + ) -> Result>, Error> { + self.get_opt(key, readopts) + } + + fn get_cf_opt>( + &self, + cf: impl AsColumnFamilyRef, + key: K, + readopts: &ReadOptions, + ) -> Result>, Error> { + self.get_cf_opt(cf, key, readopts) + } +} + +/// A type alias to DB instance type with the single-threaded column family +/// creations/deletions +/// +/// # Compatibility and multi-threaded mode +/// +/// Previously, `DB` was defined as a direct struct. Now, it's type-aliased for +/// compatibility. Use `DBWithThreadMode` for multi-threaded +/// column family alternations. +/// +/// # Limited performance implication for single-threaded mode +/// +/// Even with [`SingleThreaded`], almost all of RocksDB operations is +/// multi-threaded unless the underlying RocksDB instance is +/// specifically configured otherwise. `SingleThreaded` only forces +/// serialization of column family alternations by requring `&mut self` of DB +/// instance due to its wrapper implementation details. +/// +/// # Multi-threaded mode +/// +/// [`MultiThreaded`] can be appropriate for the situation of multi-threaded +/// workload including multi-threaded column family alternations, costing the +/// RwLock overhead inside `DB`. +#[cfg(not(feature = "multi-threaded-cf"))] +pub type DB = DBWithThreadMode; + +#[cfg(feature = "multi-threaded-cf")] +pub type DB = DBWithThreadMode; + // Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI // pointer. In most cases, however, this pointer is Send-safe because it is never aliased and // rocksdb internally does not rely on thread-local information for its user-exposed types. -unsafe impl Send for DB {} +unsafe impl Send for DBWithThreadMode {} // Sync is similarly safe for many types because they do not expose interior mutability, and their // use within the rocksdb library is generally behind a const reference -unsafe impl Sync for DB {} +unsafe impl Sync for DBWithThreadMode {} // Specifies whether open DB for read only. enum AccessType<'a> { @@ -60,17 +191,17 @@ enum AccessType<'a> { WithTTL { ttl: Duration }, } -impl DB { +impl DBWithThreadMode { /// Opens a database with default options. - pub fn open_default>(path: P) -> Result { + pub fn open_default>(path: P) -> Result { let mut opts = Options::default(); opts.create_if_missing(true); - DB::open(&opts, path) + Self::open(&opts, path) } /// Opens the database with the specified options. - pub fn open>(opts: &Options, path: P) -> Result { - DB::open_cf(opts, path, None::<&str>) + pub fn open>(opts: &Options, path: P) -> Result { + Self::open_cf(opts, path, None::<&str>) } /// Opens the database for read only with the specified options. @@ -78,8 +209,8 @@ impl DB { opts: &Options, path: P, error_if_log_file_exist: bool, - ) -> Result { - DB::open_cf_for_read_only(opts, path, None::<&str>, error_if_log_file_exist) + ) -> Result { + Self::open_cf_for_read_only(opts, path, None::<&str>, error_if_log_file_exist) } /// Opens the database as a secondary. @@ -87,8 +218,8 @@ impl DB { opts: &Options, primary_path: P, secondary_path: P, - ) -> Result { - DB::open_cf_as_secondary(opts, primary_path, secondary_path, None::<&str>) + ) -> Result { + Self::open_cf_as_secondary(opts, primary_path, secondary_path, None::<&str>) } /// Opens the database with a Time to Live compaction filter. @@ -96,16 +227,16 @@ impl DB { opts: &Options, path: P, ttl: Duration, - ) -> Result { + ) -> Result { let c_path = to_cpath(&path)?; - let db = DB::open_raw(opts, &c_path, &AccessType::WithTTL { ttl })?; + let db = Self::open_raw(opts, &c_path, &AccessType::WithTTL { ttl })?; if db.is_null() { return Err(Error::new("Could not initialize database.".to_owned())); } - Ok(DB { + Ok(Self { inner: db, - cfs: BTreeMap::new(), + cfs: T::new(BTreeMap::new()), path: path.as_ref().to_path_buf(), }) } @@ -113,7 +244,7 @@ impl DB { /// Opens a database with the given database options and column family names. /// /// Column families opened using this function will be created with default `Options`. - pub fn open_cf(opts: &Options, path: P, cfs: I) -> Result + pub fn open_cf(opts: &Options, path: P, cfs: I) -> Result where P: AsRef, I: IntoIterator, @@ -123,7 +254,7 @@ impl DB { .into_iter() .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default())); - DB::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite) + Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite) } /// Opens a database for read only with the given database options and column family names. @@ -132,7 +263,7 @@ impl DB { path: P, cfs: I, error_if_log_file_exist: bool, - ) -> Result + ) -> Result where P: AsRef, I: IntoIterator, @@ -142,7 +273,7 @@ impl DB { .into_iter() .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default())); - DB::open_cf_descriptors_internal( + Self::open_cf_descriptors_internal( opts, path, cfs, @@ -158,7 +289,7 @@ impl DB { primary_path: P, secondary_path: P, cfs: I, - ) -> Result + ) -> Result where P: AsRef, I: IntoIterator, @@ -168,7 +299,7 @@ impl DB { .into_iter() .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default())); - DB::open_cf_descriptors_internal( + Self::open_cf_descriptors_internal( opts, primary_path, cfs, @@ -179,12 +310,12 @@ impl DB { } /// Opens a database with the given database options and column family descriptors. - pub fn open_cf_descriptors(opts: &Options, path: P, cfs: I) -> Result + pub fn open_cf_descriptors(opts: &Options, path: P, cfs: I) -> Result where P: AsRef, I: IntoIterator, { - DB::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite) + Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite) } /// Internal implementation for opening RocksDB. @@ -193,7 +324,7 @@ impl DB { path: P, cfs: I, access_type: &AccessType, - ) -> Result + ) -> Result where P: AsRef, I: IntoIterator, @@ -213,7 +344,7 @@ impl DB { let mut cf_map = BTreeMap::new(); if cfs.is_empty() { - db = DB::open_raw(opts, &cpath, access_type)?; + db = Self::open_raw(opts, &cpath, access_type)?; } else { let mut cfs_v = cfs; // Always open the default column family. @@ -240,7 +371,7 @@ impl DB { .map(|cf| cf.options.inner as *const _) .collect(); - db = DB::open_cf_raw( + db = Self::open_cf_raw( opts, &cpath, &cfs_v, @@ -266,10 +397,10 @@ impl DB { return Err(Error::new("Could not initialize database.".to_owned())); } - Ok(DB { + Ok(Self { inner: db, - cfs: cf_map, path: path.as_ref().to_path_buf(), + cfs: T::new(cf_map), }) } @@ -408,16 +539,24 @@ impl DB { } /// Flushes database memtables to SST files on the disk for a given column family. - pub fn flush_cf_opt(&self, cf: &ColumnFamily, flushopts: &FlushOptions) -> Result<(), Error> { + pub fn flush_cf_opt( + &self, + cf: impl AsColumnFamilyRef, + flushopts: &FlushOptions, + ) -> Result<(), Error> { unsafe { - ffi_try!(ffi::rocksdb_flush_cf(self.inner, flushopts.inner, cf.inner)); + ffi_try!(ffi::rocksdb_flush_cf( + self.inner, + flushopts.inner, + cf.inner() + )); } Ok(()) } /// Flushes database memtables to SST files on the disk for a given column family using default /// options. - pub fn flush_cf(&self, cf: &ColumnFamily) -> Result<(), Error> { + pub fn flush_cf(&self, cf: impl AsColumnFamilyRef) -> Result<(), Error> { self.flush_cf_opt(cf, &FlushOptions::default()) } @@ -462,7 +601,7 @@ impl DB { /// [`get_pinned_cf_opt`](#method.get_pinned_cf_opt) to avoid unnecessary memory. pub fn get_cf_opt>( &self, - cf: &ColumnFamily, + cf: impl AsColumnFamilyRef, key: K, readopts: &ReadOptions, ) -> Result>, Error> { @@ -475,7 +614,7 @@ impl DB { /// [`get_pinned_cf`](#method.get_pinned_cf) to avoid unnecessary memory. pub fn get_cf>( &self, - cf: &ColumnFamily, + cf: impl AsColumnFamilyRef, key: K, ) -> Result>, Error> { self.get_cf_opt(cf, key.as_ref(), &ReadOptions::default()) @@ -524,7 +663,7 @@ impl DB { /// allows specifying ColumnFamily pub fn get_pinned_cf_opt>( &self, - cf: &ColumnFamily, + cf: impl AsColumnFamilyRef, key: K, readopts: &ReadOptions, ) -> Result, Error> { @@ -541,7 +680,7 @@ impl DB { let val = ffi_try!(ffi::rocksdb_get_pinned_cf( self.inner, readopts.inner, - cf.inner, + cf.inner(), key.as_ptr() as *const c_char, key.len() as size_t, )); @@ -558,7 +697,7 @@ impl DB { /// leverages default options. pub fn get_pinned_cf>( &self, - cf: &ColumnFamily, + cf: impl AsColumnFamilyRef, key: K, ) -> Result, Error> { self.get_pinned_cf_opt(cf, key, &ReadOptions::default()) @@ -607,23 +746,25 @@ impl DB { } /// Return the values associated with the given keys and column families. - pub fn multi_get_cf<'c, K, I>(&self, keys: I) -> Result>, Error> + pub fn multi_get_cf(&self, keys: I) -> Result>, Error> where K: AsRef<[u8]>, - I: IntoIterator, + I: IntoIterator, + W: AsColumnFamilyRef, { self.multi_get_cf_opt(keys, &ReadOptions::default()) } /// Return the values associated with the given keys and column families using read options. - pub fn multi_get_cf_opt<'c, K, I>( + pub fn multi_get_cf_opt( &self, keys: I, readopts: &ReadOptions, ) -> Result>, Error> where K: AsRef<[u8]>, - I: IntoIterator, + I: IntoIterator, + W: AsColumnFamilyRef, { let mut boxed_keys: Vec> = Vec::new(); let mut keys_sizes = Vec::new(); @@ -639,7 +780,7 @@ impl DB { .collect(); let ptr_cfs: Vec<_> = column_families .iter() - .map(|c| c.inner as *const _) + .map(|c| c.inner() as *const _) .collect(); let mut values = vec![ptr::null_mut(); boxed_keys.len()]; @@ -660,44 +801,31 @@ impl DB { Ok(convert_values(values, values_sizes)) } - pub fn create_cf>(&mut self, name: N, opts: &Options) -> Result<(), Error> { - let cf_name = if let Ok(c) = CString::new(name.as_ref().as_bytes()) { + fn create_inner_cf_handle( + &self, + name: &str, + opts: &Options, + ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> { + let cf_name = if let Ok(c) = CString::new(name.as_bytes()) { c } else { return Err(Error::new( "Failed to convert path to CString when creating cf".to_owned(), )); }; - unsafe { - let inner = ffi_try!(ffi::rocksdb_create_column_family( + Ok(unsafe { + ffi_try!(ffi::rocksdb_create_column_family( self.inner, opts.inner, cf_name.as_ptr(), - )); - - self.cfs - .insert(name.as_ref().to_string(), ColumnFamily { inner }); - }; - Ok(()) - } - - pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> { - if let Some(cf) = self.cfs.remove(name) { - unsafe { - ffi_try!(ffi::rocksdb_drop_column_family(self.inner, cf.inner)); - } - Ok(()) - } else { - Err(Error::new(format!("Invalid column family: {}", name))) - } - } - - /// Return the underlying column family handle. - pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> { - self.cfs.get(name) + )) + }) } - pub fn iterator<'a: 'b, 'b>(&'a self, mode: IteratorMode) -> DBIterator<'b> { + pub fn iterator<'a: 'b, 'b>( + &'a self, + mode: IteratorMode, + ) -> DBIteratorWithThreadMode<'b, Self> { let readopts = ReadOptions::default(); self.iterator_opt(mode, readopts) } @@ -706,34 +834,40 @@ impl DB { &'a self, mode: IteratorMode, readopts: ReadOptions, - ) -> DBIterator<'b> { - DBIterator::new(self, readopts, mode) + ) -> DBIteratorWithThreadMode<'b, Self> { + DBIteratorWithThreadMode::new(self, readopts, mode) } /// Opens an iterator using the provided ReadOptions. /// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions pub fn iterator_cf_opt<'a: 'b, 'b>( &'a self, - cf_handle: &ColumnFamily, + cf_handle: impl AsColumnFamilyRef, readopts: ReadOptions, mode: IteratorMode, - ) -> DBIterator<'b> { - DBIterator::new_cf(self, cf_handle, readopts, mode) + ) -> DBIteratorWithThreadMode<'b, Self> { + DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode) } /// Opens an iterator with `set_total_order_seek` enabled. /// This must be used to iterate across prefixes when `set_memtable_factory` has been called /// with a Hash-based implementation. - pub fn full_iterator<'a: 'b, 'b>(&'a self, mode: IteratorMode) -> DBIterator<'b> { + pub fn full_iterator<'a: 'b, 'b>( + &'a self, + mode: IteratorMode, + ) -> DBIteratorWithThreadMode<'b, Self> { let mut opts = ReadOptions::default(); opts.set_total_order_seek(true); - DBIterator::new(self, opts, mode) + DBIteratorWithThreadMode::new(self, opts, mode) } - pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(&'a self, prefix: P) -> DBIterator<'b> { + pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>( + &'a self, + prefix: P, + ) -> DBIteratorWithThreadMode<'b, Self> { let mut opts = ReadOptions::default(); opts.set_prefix_same_as_start(true); - DBIterator::new( + DBIteratorWithThreadMode::new( self, opts, IteratorMode::From(prefix.as_ref(), Direction::Forward), @@ -742,66 +876,72 @@ impl DB { pub fn iterator_cf<'a: 'b, 'b>( &'a self, - cf_handle: &ColumnFamily, + cf_handle: impl AsColumnFamilyRef, mode: IteratorMode, - ) -> DBIterator<'b> { + ) -> DBIteratorWithThreadMode<'b, Self> { let opts = ReadOptions::default(); - DBIterator::new_cf(self, cf_handle, opts, mode) + DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode) } pub fn full_iterator_cf<'a: 'b, 'b>( &'a self, - cf_handle: &ColumnFamily, + cf_handle: impl AsColumnFamilyRef, mode: IteratorMode, - ) -> DBIterator<'b> { + ) -> DBIteratorWithThreadMode<'b, Self> { let mut opts = ReadOptions::default(); opts.set_total_order_seek(true); - DBIterator::new_cf(self, cf_handle, opts, mode) + DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode) } - pub fn prefix_iterator_cf<'a: 'b, 'b, P: AsRef<[u8]>>( + pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>( &'a self, - cf_handle: &ColumnFamily, + cf_handle: impl AsColumnFamilyRef, prefix: P, - ) -> DBIterator<'b> { + ) -> DBIteratorWithThreadMode<'a, Self> { let mut opts = ReadOptions::default(); opts.set_prefix_same_as_start(true); - DBIterator::new_cf( + DBIteratorWithThreadMode::<'a, Self>::new_cf( self, - cf_handle, + cf_handle.inner(), opts, IteratorMode::From(prefix.as_ref(), Direction::Forward), ) } /// Opens a raw iterator over the database, using the default read options - pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIterator<'b> { + pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> { let opts = ReadOptions::default(); - DBRawIterator::new(self, opts) + DBRawIteratorWithThreadMode::new(self, opts) } /// Opens a raw iterator over the given column family, using the default read options - pub fn raw_iterator_cf<'a: 'b, 'b>(&'a self, cf_handle: &ColumnFamily) -> DBRawIterator<'b> { + pub fn raw_iterator_cf<'a: 'b, 'b>( + &'a self, + cf_handle: impl AsColumnFamilyRef, + ) -> DBRawIteratorWithThreadMode<'b, Self> { let opts = ReadOptions::default(); - DBRawIterator::new_cf(self, cf_handle, opts) + DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts) } /// Opens a raw iterator over the database, using the given read options - pub fn raw_iterator_opt<'a: 'b, 'b>(&'a self, readopts: ReadOptions) -> DBRawIterator<'b> { - DBRawIterator::new(self, readopts) + pub fn raw_iterator_opt<'a: 'b, 'b>( + &'a self, + readopts: ReadOptions, + ) -> DBRawIteratorWithThreadMode<'b, Self> { + DBRawIteratorWithThreadMode::new(self, readopts) } /// Opens a raw iterator over the given column family, using the given read options pub fn raw_iterator_cf_opt<'a: 'b, 'b>( &'a self, - cf_handle: &ColumnFamily, + cf_handle: impl AsColumnFamilyRef, readopts: ReadOptions, - ) -> DBRawIterator<'b> { - DBRawIterator::new_cf(self, cf_handle, readopts) + ) -> DBRawIteratorWithThreadMode<'b, Self> { + DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts) } - pub fn snapshot(&self) -> Snapshot { - Snapshot::new(self) + pub fn snapshot(&self) -> SnapshotWithThreadMode { + SnapshotWithThreadMode::::new(self) } pub fn put_opt(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error> @@ -827,7 +967,7 @@ impl DB { pub fn put_cf_opt( &self, - cf: &ColumnFamily, + cf: impl AsColumnFamilyRef, key: K, value: V, writeopts: &WriteOptions, @@ -843,7 +983,7 @@ impl DB { ffi_try!(ffi::rocksdb_put_cf( self.inner, writeopts.inner, - cf.inner, + cf.inner(), key.as_ptr() as *const c_char, key.len() as size_t, value.as_ptr() as *const c_char, @@ -876,7 +1016,7 @@ impl DB { pub fn merge_cf_opt( &self, - cf: &ColumnFamily, + cf: impl AsColumnFamilyRef, key: K, value: V, writeopts: &WriteOptions, @@ -892,7 +1032,7 @@ impl DB { ffi_try!(ffi::rocksdb_merge_cf( self.inner, writeopts.inner, - cf.inner, + cf.inner(), key.as_ptr() as *const c_char, key.len() as size_t, value.as_ptr() as *const c_char, @@ -922,7 +1062,7 @@ impl DB { pub fn delete_cf_opt>( &self, - cf: &ColumnFamily, + cf: impl AsColumnFamilyRef, key: K, writeopts: &WriteOptions, ) -> Result<(), Error> { @@ -932,7 +1072,7 @@ impl DB { ffi_try!(ffi::rocksdb_delete_cf( self.inner, writeopts.inner, - cf.inner, + cf.inner(), key.as_ptr() as *const c_char, key.len() as size_t, )); @@ -943,7 +1083,7 @@ impl DB { /// Removes the database entries in the range `["from", "to")` using given write options. pub fn delete_range_cf_opt>( &self, - cf: &ColumnFamily, + cf: impl AsColumnFamilyRef, from: K, to: K, writeopts: &WriteOptions, @@ -955,7 +1095,7 @@ impl DB { ffi_try!(ffi::rocksdb_delete_range_cf( self.inner, writeopts.inner, - cf.inner, + cf.inner(), from.as_ptr() as *const c_char, from.len() as size_t, to.as_ptr() as *const c_char, @@ -973,7 +1113,7 @@ impl DB { self.put_opt(key.as_ref(), value.as_ref(), &WriteOptions::default()) } - pub fn put_cf(&self, cf: &ColumnFamily, key: K, value: V) -> Result<(), Error> + pub fn put_cf(&self, cf: impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error> where K: AsRef<[u8]>, V: AsRef<[u8]>, @@ -989,7 +1129,7 @@ impl DB { self.merge_opt(key.as_ref(), value.as_ref(), &WriteOptions::default()) } - pub fn merge_cf(&self, cf: &ColumnFamily, key: K, value: V) -> Result<(), Error> + pub fn merge_cf(&self, cf: impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error> where K: AsRef<[u8]>, V: AsRef<[u8]>, @@ -1001,14 +1141,18 @@ impl DB { self.delete_opt(key.as_ref(), &WriteOptions::default()) } - pub fn delete_cf>(&self, cf: &ColumnFamily, key: K) -> Result<(), Error> { + pub fn delete_cf>( + &self, + cf: impl AsColumnFamilyRef, + key: K, + ) -> Result<(), Error> { self.delete_cf_opt(cf, key.as_ref(), &WriteOptions::default()) } /// Removes the database entries in the range `["from", "to")` using default write options. pub fn delete_range_cf>( &self, - cf: &ColumnFamily, + cf: impl AsColumnFamilyRef, from: K, to: K, ) -> Result<(), Error> { @@ -1057,7 +1201,7 @@ impl DB { /// given column family. This is not likely to be needed for typical usage. pub fn compact_range_cf, E: AsRef<[u8]>>( &self, - cf: &ColumnFamily, + cf: impl AsColumnFamilyRef, start: Option, end: Option, ) { @@ -1067,7 +1211,7 @@ impl DB { ffi::rocksdb_compact_range_cf( self.inner, - cf.inner, + cf.inner(), opt_bytes_to_ptr(start), start.map_or(0, |s| s.len()) as size_t, opt_bytes_to_ptr(end), @@ -1079,7 +1223,7 @@ impl DB { /// Same as `compact_range_cf` but with custom options. pub fn compact_range_cf_opt, E: AsRef<[u8]>>( &self, - cf: &ColumnFamily, + cf: impl AsColumnFamilyRef, start: Option, end: Option, opts: &CompactOptions, @@ -1090,7 +1234,7 @@ impl DB { ffi::rocksdb_compact_range_cf_opt( self.inner, - cf.inner, + cf.inner(), opts.inner, opt_bytes_to_ptr(start), start.map_or(0, |s| s.len()) as size_t, @@ -1118,7 +1262,7 @@ impl DB { pub fn set_options_cf( &self, - cf_handle: &ColumnFamily, + cf: impl AsColumnFamilyRef, opts: &[(&str, &str)], ) -> Result<(), Error> { let copts = convert_options(opts)?; @@ -1128,7 +1272,7 @@ impl DB { unsafe { ffi_try!(ffi::rocksdb_set_options_cf( self.inner, - cf_handle.inner, + cf.inner(), count, cnames.as_ptr(), cvalues.as_ptr(), @@ -1179,7 +1323,7 @@ impl DB { /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L428-L634). pub fn property_value_cf( &self, - cf: &ColumnFamily, + cf: impl AsColumnFamilyRef, name: &str, ) -> Result, Error> { let prop_name = match CString::new(name) { @@ -1193,7 +1337,7 @@ impl DB { }; unsafe { - let value = ffi::rocksdb_property_value_cf(self.inner, cf.inner, prop_name.as_ptr()); + let value = ffi::rocksdb_property_value_cf(self.inner, cf.inner(), prop_name.as_ptr()); if value.is_null() { return Ok(None); } @@ -1237,7 +1381,7 @@ impl DB { /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689). pub fn property_int_value_cf( &self, - cf: &ColumnFamily, + cf: impl AsColumnFamilyRef, name: &str, ) -> Result, Error> { match self.property_value_cf(cf, name) { @@ -1314,17 +1458,17 @@ impl DB { /// with default opts pub fn ingest_external_file_cf>( &self, - cf: &ColumnFamily, + cf: impl AsColumnFamilyRef, paths: Vec

, ) -> Result<(), Error> { let opts = IngestExternalFileOptions::default(); - self.ingest_external_file_cf_opts(&cf, &opts, paths) + self.ingest_external_file_cf_opts(cf, &opts, paths) } /// Loads a list of external SST files created with SstFileWriter into the DB for given Column Family pub fn ingest_external_file_cf_opts>( &self, - cf: &ColumnFamily, + cf: impl AsColumnFamilyRef, opts: &IngestExternalFileOptions, paths: Vec

, ) -> Result<(), Error> { @@ -1335,7 +1479,7 @@ impl DB { let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect(); - self.ingest_external_file_raw_cf(&cf, &opts, &paths_v, &cpaths) + self.ingest_external_file_raw_cf(cf, &opts, &paths_v, &cpaths) } fn ingest_external_file_raw( @@ -1357,7 +1501,7 @@ impl DB { fn ingest_external_file_raw_cf( &self, - cf: &ColumnFamily, + cf: impl AsColumnFamilyRef, opts: &IngestExternalFileOptions, paths_v: &[CString], cpaths: &[*const c_char], @@ -1365,7 +1509,7 @@ impl DB { unsafe { ffi_try!(ffi::rocksdb_ingest_external_file_cf( self.inner, - cf.inner, + cf.inner(), cpaths.as_ptr(), paths_v.len(), opts.inner as *const _ @@ -1426,8 +1570,8 @@ impl DB { /// entirely in the range. /// /// Note: L0 files are left regardless of whether they're in the range. - /// - /// Snapshots before the delete might not see the data in the given range. + /// + /// SnapshotWithThreadModes before the delete might not see the data in the given range. pub fn delete_file_in_range>(&self, from: K, to: K) -> Result<(), Error> { let from = from.as_ref(); let to = to.as_ref(); @@ -1446,7 +1590,7 @@ impl DB { /// Same as `delete_file_in_range` but only for specific column family pub fn delete_file_in_range_cf>( &self, - cf: &ColumnFamily, + cf: impl AsColumnFamilyRef, from: K, to: K, ) -> Result<(), Error> { @@ -1455,7 +1599,7 @@ impl DB { unsafe { ffi_try!(ffi::rocksdb_delete_file_in_range_cf( self.inner, - cf.inner, + cf.inner(), from.as_ptr() as *const c_char, from.len() as size_t, to.as_ptr() as *const c_char, @@ -1473,18 +1617,85 @@ impl DB { } } -impl Drop for DB { +impl DBWithThreadMode { + /// Creates column family with given name and options + pub fn create_cf>(&mut self, name: N, opts: &Options) -> Result<(), Error> { + let inner = self.create_inner_cf_handle(name.as_ref(), opts)?; + self.cfs + .cfs + .insert(name.as_ref().to_string(), ColumnFamily { inner }); + Ok(()) + } + + /// Drops the column family with the given name + pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> { + let inner = self.inner; + if let Some(cf) = self.cfs.cfs.remove(name) { + unsafe { + ffi_try!(ffi::rocksdb_drop_column_family(inner, cf.inner)); + } + Ok(()) + } else { + Err(Error::new(format!("Invalid column family: {}", name))) + } + } + + /// Returns the underlying column family handle + pub fn cf_handle<'a>(&'a self, name: &str) -> Option<&'a ColumnFamily> { + self.cfs.cfs.get(name) + } +} + +impl DBWithThreadMode { + /// Creates column family with given name and options + pub fn create_cf>(&self, name: N, opts: &Options) -> Result<(), Error> { + let inner = self.create_inner_cf_handle(name.as_ref(), opts)?; + self.cfs + .cfs + .write() + .unwrap() + .insert(name.as_ref().to_string(), ColumnFamily { inner }); + Ok(()) + } + + /// Drops the column family with the given name by internally locking the inner column + /// family map. This avoids needing `&mut self` reference + pub fn drop_cf(&self, name: &str) -> Result<(), Error> { + let inner = self.inner; + if let Some(cf) = self.cfs.cfs.write().unwrap().remove(name) { + unsafe { + ffi_try!(ffi::rocksdb_drop_column_family(inner, cf.inner)); + } + Ok(()) + } else { + Err(Error::new(format!("Invalid column family: {}", name))) + } + } + + /// Returns the underlying column family handle + pub fn cf_handle(&self, name: &str) -> Option { + self.cfs + .cfs + .read() + .unwrap() + .get(name) + .map(|cf| BoundColumnFamily { + inner: cf.inner, + multi_threaded_cfs: PhantomData, + }) + } +} + +impl Drop for DBWithThreadMode { fn drop(&mut self) { unsafe { - for cf in self.cfs.values() { - ffi::rocksdb_column_family_handle_destroy(cf.inner); - } + self.cfs.cf_drop_all(); ffi::rocksdb_close(self.inner); } } } -impl fmt::Debug for DB { +impl fmt::Debug for DBWithThreadMode { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "RocksDB {{ path: {:?} }}", self.path()) } diff --git a/src/db_iterator.rs b/src/db_iterator.rs index e109885..2a958c8 100644 --- a/src/db_iterator.rs +++ b/src/db_iterator.rs @@ -12,15 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::{ffi, ColumnFamily, Error, ReadOptions, WriteBatch, DB}; +use crate::db::{DBAccess, DB}; +use crate::{ffi, Error, ReadOptions, WriteBatch}; use libc::{c_char, c_uchar, size_t}; use std::marker::PhantomData; use std::slice; +/// A type alias to keep compatibility. See [`DBRawIteratorWithThreadMode`] for details +pub type DBRawIterator<'a> = DBRawIteratorWithThreadMode<'a, DB>; + /// An iterator over a database or column family, with specifiable /// ranges and direction. /// -/// This iterator is different to the standard ``DBIterator`` as it aims Into +/// This iterator is different to the standard ``DBIteratorWithThreadMode`` as it aims Into /// replicate the underlying iterator API within RocksDB itself. This should /// give access to more performance and flexibility but departs from the /// widely recognised Rust idioms. @@ -65,7 +69,7 @@ use std::slice; /// } /// let _ = DB::destroy(&Options::default(), path); /// ``` -pub struct DBRawIterator<'a> { +pub struct DBRawIteratorWithThreadMode<'a, D: DBAccess> { inner: *mut ffi::rocksdb_iterator_t, /// When iterate_upper_bound is set, the inner C iterator keeps a pointer to the upper bound @@ -73,14 +77,14 @@ pub struct DBRawIterator<'a> { /// iterator is being used. _readopts: ReadOptions, - db: PhantomData<&'a DB>, + db: PhantomData<&'a D>, } -impl<'a> DBRawIterator<'a> { - pub(crate) fn new(db: &DB, readopts: ReadOptions) -> DBRawIterator<'a> { +impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> { + pub(crate) fn new(db: &D, readopts: ReadOptions) -> DBRawIteratorWithThreadMode<'a, D> { unsafe { - DBRawIterator { - inner: ffi::rocksdb_create_iterator(db.inner, readopts.inner), + DBRawIteratorWithThreadMode { + inner: ffi::rocksdb_create_iterator(db.inner(), readopts.inner), _readopts: readopts, db: PhantomData, } @@ -88,13 +92,13 @@ impl<'a> DBRawIterator<'a> { } pub(crate) fn new_cf( - db: &DB, - cf_handle: &ColumnFamily, + db: &'a D, + cf_handle: *mut ffi::rocksdb_column_family_handle_t, readopts: ReadOptions, - ) -> DBRawIterator<'a> { + ) -> DBRawIteratorWithThreadMode<'a, D> { unsafe { - DBRawIterator { - inner: ffi::rocksdb_create_iterator_cf(db.inner, readopts.inner, cf_handle.inner), + DBRawIteratorWithThreadMode { + inner: ffi::rocksdb_create_iterator_cf(db.inner(), readopts.inner, cf_handle), _readopts: readopts, db: PhantomData, } @@ -105,7 +109,7 @@ impl<'a> DBRawIterator<'a> { /// it reaches the end of its defined range, or when it encounters an error. /// /// To check whether the iterator encountered an error after `valid` has - /// returned `false`, use the [`status`](DBRawIterator::status) method. `status` will never + /// returned `false`, use the [`status`](DBRawIteratorWithThreadMode::status) method. `status` will never /// return an error when `valid` is `true`. pub fn valid(&self) -> bool { unsafe { ffi::rocksdb_iter_valid(self.inner) != 0 } @@ -113,7 +117,7 @@ impl<'a> DBRawIterator<'a> { /// Returns an error `Result` if the iterator has encountered an error /// during operation. When an error is encountered, the iterator is - /// invalidated and [`valid`](DBRawIterator::valid) will return `false` when called. + /// invalidated and [`valid`](DBRawIteratorWithThreadMode::valid) will return `false` when called. /// /// Performing a seek will discard the current status. pub fn status(&self) -> Result<(), Error> { @@ -323,7 +327,7 @@ impl<'a> DBRawIterator<'a> { } } -impl<'a> Drop for DBRawIterator<'a> { +impl<'a, D: DBAccess> Drop for DBRawIteratorWithThreadMode<'a, D> { fn drop(&mut self) { unsafe { ffi::rocksdb_iter_destroy(self.inner); @@ -331,8 +335,11 @@ impl<'a> Drop for DBRawIterator<'a> { } } -unsafe impl<'a> Send for DBRawIterator<'a> {} -unsafe impl<'a> Sync for DBRawIterator<'a> {} +unsafe impl<'a, D: DBAccess> Send for DBRawIteratorWithThreadMode<'a, D> {} +unsafe impl<'a, D: DBAccess> Sync for DBRawIteratorWithThreadMode<'a, D> {} + +/// A type alias to keep compatibility. See [`DBIteratorWithThreadMode`] for details +pub type DBIterator<'a> = DBIteratorWithThreadMode<'a, DB>; /// An iterator over a database or column family, with specifiable /// ranges and direction. @@ -365,8 +372,8 @@ unsafe impl<'a> Sync for DBRawIterator<'a> {} /// } /// let _ = DB::destroy(&Options::default(), path); /// ``` -pub struct DBIterator<'a> { - raw: DBRawIterator<'a>, +pub struct DBIteratorWithThreadMode<'a, D: DBAccess> { + raw: DBRawIteratorWithThreadMode<'a, D>, direction: Direction, just_seeked: bool, } @@ -384,10 +391,10 @@ pub enum IteratorMode<'a> { From(&'a [u8], Direction), } -impl<'a> DBIterator<'a> { - pub(crate) fn new(db: &DB, readopts: ReadOptions, mode: IteratorMode) -> DBIterator<'a> { - let mut rv = DBIterator { - raw: DBRawIterator::new(db, readopts), +impl<'a, D: DBAccess> DBIteratorWithThreadMode<'a, D> { + pub(crate) fn new(db: &D, readopts: ReadOptions, mode: IteratorMode) -> Self { + let mut rv = DBIteratorWithThreadMode { + raw: DBRawIteratorWithThreadMode::new(db, readopts), direction: Direction::Forward, // blown away by set_mode() just_seeked: false, }; @@ -396,13 +403,13 @@ impl<'a> DBIterator<'a> { } pub(crate) fn new_cf( - db: &DB, - cf_handle: &ColumnFamily, + db: &'a D, + cf_handle: *mut ffi::rocksdb_column_family_handle_t, readopts: ReadOptions, mode: IteratorMode, - ) -> DBIterator<'a> { - let mut rv = DBIterator { - raw: DBRawIterator::new_cf(db, cf_handle, readopts), + ) -> Self { + let mut rv = DBIteratorWithThreadMode { + raw: DBRawIteratorWithThreadMode::new_cf(db, cf_handle, readopts), direction: Direction::Forward, // blown away by set_mode() just_seeked: false, }; @@ -433,18 +440,18 @@ impl<'a> DBIterator<'a> { self.just_seeked = true; } - /// See [`valid`](DBRawIterator::valid) + /// See [`valid`](DBRawIteratorWithThreadMode::valid) pub fn valid(&self) -> bool { self.raw.valid() } - /// See [`status`](DBRawIterator::status) + /// See [`status`](DBRawIteratorWithThreadMode::status) pub fn status(&self) -> Result<(), Error> { self.raw.status() } } -impl<'a> Iterator for DBIterator<'a> { +impl<'a, D: DBAccess> Iterator for DBIteratorWithThreadMode<'a, D> { type Item = KVBytes; fn next(&mut self) -> Option { @@ -475,8 +482,8 @@ impl<'a> Iterator for DBIterator<'a> { } } -impl<'a> Into> for DBIterator<'a> { - fn into(self) -> DBRawIterator<'a> { +impl<'a, D: DBAccess> Into> for DBIteratorWithThreadMode<'a, D> { + fn into(self) -> DBRawIteratorWithThreadMode<'a, D> { self.raw } } diff --git a/src/db_options.rs b/src/db_options.rs index 40d1771..c99207e 100644 --- a/src/db_options.rs +++ b/src/db_options.rs @@ -22,12 +22,13 @@ use crate::{ compaction_filter::{self, CompactionFilterCallback, CompactionFilterFn}, compaction_filter_factory::{self, CompactionFilterFactory}, comparator::{self, ComparatorCallback, CompareFn}, + db::DBAccess, ffi, merge_operator::{ self, full_merge_callback, partial_merge_callback, MergeFn, MergeOperatorCallback, }, slice_transform::SliceTransform, - Error, Snapshot, + Error, SnapshotWithThreadMode, }; fn new_cache(capacity: size_t) -> *mut ffi::rocksdb_cache_t { @@ -2849,7 +2850,7 @@ impl ReadOptions { /// Sets the snapshot which should be used for the read. /// The snapshot must belong to the DB that is being read and must /// not have been released. - pub(crate) fn set_snapshot(&mut self, snapshot: &Snapshot) { + pub(crate) fn set_snapshot(&mut self, snapshot: &SnapshotWithThreadMode) { unsafe { ffi::rocksdb_readoptions_set_snapshot(self.inner, snapshot.inner); } diff --git a/src/lib.rs b/src/lib.rs index 74a902d..2858c4a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -70,6 +70,10 @@ clippy::missing_safety_doc, clippy::needless_pass_by_value, clippy::option_if_let_else, + clippy::ptr_as_ptr, + clippy::missing_panics_doc, + clippy::from_over_into, + clippy::upper_case_acronyms, )] #[macro_use] @@ -93,10 +97,16 @@ mod sst_file_writer; mod write_batch; pub use crate::{ - column_family::{ColumnFamily, ColumnFamilyDescriptor, DEFAULT_COLUMN_FAMILY_NAME}, + column_family::{ + AsColumnFamilyRef, BoundColumnFamily, ColumnFamily, ColumnFamilyDescriptor, + ColumnFamilyRef, DEFAULT_COLUMN_FAMILY_NAME, + }, compaction_filter::Decision as CompactionDecision, - db::{LiveFile, DB}, - db_iterator::{DBIterator, DBRawIterator, DBWALIterator, Direction, IteratorMode}, + db::{DBWithThreadMode, LiveFile, MultiThreaded, SingleThreaded, DB}, + db_iterator::{ + DBIterator, DBIteratorWithThreadMode, DBRawIterator, DBRawIteratorWithThreadMode, + DBWALIterator, Direction, IteratorMode, + }, db_options::{ BlockBasedIndexType, BlockBasedOptions, BottommostLevelCompaction, Cache, CompactOptions, DBCompactionStyle, DBCompressionType, DBPath, DBRecoveryMode, DataBlockIndexType, Env, @@ -108,7 +118,7 @@ pub use crate::{ merge_operator::MergeOperands, perf::{PerfContext, PerfMetric, PerfStatsLevel}, slice_transform::SliceTransform, - snapshot::Snapshot, + snapshot::{Snapshot, SnapshotWithThreadMode}, sst_file_writer::SstFileWriter, write_batch::{WriteBatch, WriteBatchIterator}, }; @@ -162,9 +172,9 @@ impl fmt::Display for Error { #[cfg(test)] mod test { use super::{ - BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, - IngestExternalFileOptions, Options, PlainTableFactoryOptions, ReadOptions, Snapshot, - SstFileWriter, WriteBatch, WriteOptions, DB, + BlockBasedOptions, BoundColumnFamily, ColumnFamily, ColumnFamilyDescriptor, DBIterator, + DBRawIterator, IngestExternalFileOptions, Options, PlainTableFactoryOptions, ReadOptions, + Snapshot, SstFileWriter, WriteBatch, WriteOptions, DB, }; #[test] @@ -188,6 +198,7 @@ mod test { is_send::(); is_send::(); is_send::(); + is_send::>(); is_send::(); is_send::(); } diff --git a/src/snapshot.rs b/src/snapshot.rs index 41b2482..0f77b03 100644 --- a/src/snapshot.rs +++ b/src/snapshot.rs @@ -12,7 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::{ffi, ColumnFamily, DBIterator, DBRawIterator, Error, IteratorMode, ReadOptions, DB}; +use crate::{ + db::DBAccess, ffi, AsColumnFamilyRef, DBIteratorWithThreadMode, DBRawIteratorWithThreadMode, + Error, IteratorMode, ReadOptions, DB, +}; + +/// A type alias to keep compatibility. See [`SnapshotWithThreadMode`] for details +pub type Snapshot<'a> = SnapshotWithThreadMode<'a, DB>; /// A consistent view of the database at the point of creation. /// @@ -30,80 +36,91 @@ use crate::{ffi, ColumnFamily, DBIterator, DBRawIterator, Error, IteratorMode, R /// let _ = DB::destroy(&Options::default(), path); /// ``` /// -pub struct Snapshot<'a> { - db: &'a DB, +pub struct SnapshotWithThreadMode<'a, D: DBAccess> { + db: &'a D, pub(crate) inner: *const ffi::rocksdb_snapshot_t, } -impl<'a> Snapshot<'a> { - /// Creates a new `Snapshot` of the database `db`. - pub fn new(db: &DB) -> Snapshot { - let snapshot = unsafe { ffi::rocksdb_create_snapshot(db.inner) }; - Snapshot { +impl<'a, D: DBAccess> SnapshotWithThreadMode<'a, D> { + /// Creates a new `SnapshotWithThreadMode` of the database `db`. + pub fn new(db: &'a D) -> Self { + let snapshot = unsafe { ffi::rocksdb_create_snapshot(db.inner()) }; + Self { db, inner: snapshot, } } /// Creates an iterator over the data in this snapshot, using the default read options. - pub fn iterator(&self, mode: IteratorMode) -> DBIterator<'a> { + pub fn iterator(&self, mode: IteratorMode) -> DBIteratorWithThreadMode<'a, D> { let readopts = ReadOptions::default(); self.iterator_opt(mode, readopts) } /// Creates an iterator over the data in this snapshot under the given column family, using /// the default read options. - pub fn iterator_cf(&self, cf_handle: &ColumnFamily, mode: IteratorMode) -> DBIterator { + pub fn iterator_cf( + &self, + cf_handle: impl AsColumnFamilyRef, + mode: IteratorMode, + ) -> DBIteratorWithThreadMode { let readopts = ReadOptions::default(); self.iterator_cf_opt(cf_handle, readopts, mode) } /// Creates an iterator over the data in this snapshot, using the given read options. - pub fn iterator_opt(&self, mode: IteratorMode, mut readopts: ReadOptions) -> DBIterator<'a> { + pub fn iterator_opt( + &self, + mode: IteratorMode, + mut readopts: ReadOptions, + ) -> DBIteratorWithThreadMode<'a, D> { readopts.set_snapshot(self); - DBIterator::new(self.db, readopts, mode) + DBIteratorWithThreadMode::::new(self.db, readopts, mode) } /// Creates an iterator over the data in this snapshot under the given column family, using /// the given read options. pub fn iterator_cf_opt( &self, - cf_handle: &ColumnFamily, + cf_handle: impl AsColumnFamilyRef, mut readopts: ReadOptions, mode: IteratorMode, - ) -> DBIterator { + ) -> DBIteratorWithThreadMode { readopts.set_snapshot(self); - DBIterator::new_cf(self.db, cf_handle, readopts, mode) + DBIteratorWithThreadMode::new_cf(self.db, cf_handle.inner(), readopts, mode) } /// Creates a raw iterator over the data in this snapshot, using the default read options. - pub fn raw_iterator(&self) -> DBRawIterator { + pub fn raw_iterator(&self) -> DBRawIteratorWithThreadMode { let readopts = ReadOptions::default(); self.raw_iterator_opt(readopts) } /// Creates a raw iterator over the data in this snapshot under the given column family, using /// the default read options. - pub fn raw_iterator_cf(&self, cf_handle: &ColumnFamily) -> DBRawIterator { + pub fn raw_iterator_cf( + &self, + cf_handle: impl AsColumnFamilyRef, + ) -> DBRawIteratorWithThreadMode { let readopts = ReadOptions::default(); self.raw_iterator_cf_opt(cf_handle, readopts) } /// Creates a raw iterator over the data in this snapshot, using the given read options. - pub fn raw_iterator_opt(&self, mut readopts: ReadOptions) -> DBRawIterator { + pub fn raw_iterator_opt(&self, mut readopts: ReadOptions) -> DBRawIteratorWithThreadMode { readopts.set_snapshot(self); - DBRawIterator::new(self.db, readopts) + DBRawIteratorWithThreadMode::new(self.db, readopts) } /// Creates a raw iterator over the data in this snapshot under the given column family, using /// the given read options. pub fn raw_iterator_cf_opt( &self, - cf_handle: &ColumnFamily, + cf_handle: impl AsColumnFamilyRef, mut readopts: ReadOptions, - ) -> DBRawIterator { + ) -> DBRawIteratorWithThreadMode { readopts.set_snapshot(self); - DBRawIterator::new_cf(self.db, cf_handle, readopts) + DBRawIteratorWithThreadMode::new_cf(self.db, cf_handle.inner(), readopts) } /// Returns the bytes associated with a key value with default read options. @@ -116,7 +133,7 @@ impl<'a> Snapshot<'a> { /// options. pub fn get_cf>( &self, - cf: &ColumnFamily, + cf: impl AsColumnFamilyRef, key: K, ) -> Result>, Error> { let readopts = ReadOptions::default(); @@ -136,7 +153,7 @@ impl<'a> Snapshot<'a> { /// Returns the bytes associated with a key value, given column family and read options. pub fn get_cf_opt>( &self, - cf: &ColumnFamily, + cf: impl AsColumnFamilyRef, key: K, mut readopts: ReadOptions, ) -> Result>, Error> { @@ -145,15 +162,15 @@ impl<'a> Snapshot<'a> { } } -impl<'a> Drop for Snapshot<'a> { +impl<'a, D: DBAccess> Drop for SnapshotWithThreadMode<'a, D> { fn drop(&mut self) { unsafe { - ffi::rocksdb_release_snapshot(self.db.inner, self.inner); + ffi::rocksdb_release_snapshot(self.db.inner(), self.inner); } } } -/// `Send` and `Sync` implementations for `Snapshot` are safe, because `Snapshot` is +/// `Send` and `Sync` implementations for `SnapshotWithThreadMode` are safe, because `SnapshotWithThreadMode` is /// immutable and can be safely shared between threads. -unsafe impl<'a> Send for Snapshot<'a> {} -unsafe impl<'a> Sync for Snapshot<'a> {} +unsafe impl<'a, D: DBAccess> Send for SnapshotWithThreadMode<'a, D> {} +unsafe impl<'a, D: DBAccess> Sync for SnapshotWithThreadMode<'a, D> {} diff --git a/src/write_batch.rs b/src/write_batch.rs index a2566bd..00707f9 100644 --- a/src/write_batch.rs +++ b/src/write_batch.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::{ffi, ColumnFamily}; +use crate::{ffi, AsColumnFamilyRef}; use libc::{c_char, c_void, size_t}; use std::slice; @@ -134,7 +134,7 @@ impl WriteBatch { } } - pub fn put_cf(&mut self, cf: &ColumnFamily, key: K, value: V) + pub fn put_cf(&mut self, cf: impl AsColumnFamilyRef, key: K, value: V) where K: AsRef<[u8]>, V: AsRef<[u8]>, @@ -145,7 +145,7 @@ impl WriteBatch { unsafe { ffi::rocksdb_writebatch_put_cf( self.inner, - cf.inner, + cf.inner(), key.as_ptr() as *const c_char, key.len() as size_t, value.as_ptr() as *const c_char, @@ -173,7 +173,7 @@ impl WriteBatch { } } - pub fn merge_cf(&mut self, cf: &ColumnFamily, key: K, value: V) + pub fn merge_cf(&mut self, cf: impl AsColumnFamilyRef, key: K, value: V) where K: AsRef<[u8]>, V: AsRef<[u8]>, @@ -184,7 +184,7 @@ impl WriteBatch { unsafe { ffi::rocksdb_writebatch_merge_cf( self.inner, - cf.inner, + cf.inner(), key.as_ptr() as *const c_char, key.len() as size_t, value.as_ptr() as *const c_char, @@ -206,13 +206,13 @@ impl WriteBatch { } } - pub fn delete_cf>(&mut self, cf: &ColumnFamily, key: K) { + pub fn delete_cf>(&mut self, cf: impl AsColumnFamilyRef, key: K) { let key = key.as_ref(); unsafe { ffi::rocksdb_writebatch_delete_cf( self.inner, - cf.inner, + cf.inner(), key.as_ptr() as *const c_char, key.len() as size_t, ); @@ -243,13 +243,13 @@ impl WriteBatch { /// Removes the database entries in the range ["begin_key", "end_key"), i.e., /// including "begin_key" and excluding "end_key". It is not an error if no /// keys exist in the range ["begin_key", "end_key"). - pub fn delete_range_cf>(&mut self, cf: &ColumnFamily, from: K, to: K) { + pub fn delete_range_cf>(&mut self, cf: impl AsColumnFamilyRef, from: K, to: K) { let (start_key, end_key) = (from.as_ref(), to.as_ref()); unsafe { ffi::rocksdb_writebatch_delete_range_cf( self.inner, - cf.inner, + cf.inner(), start_key.as_ptr() as *const c_char, start_key.len() as size_t, end_key.as_ptr() as *const c_char, diff --git a/tests/fail/open_with_multiple_refs_as_single_threaded.rs b/tests/fail/open_with_multiple_refs_as_single_threaded.rs new file mode 100644 index 0000000..8bf3524 --- /dev/null +++ b/tests/fail/open_with_multiple_refs_as_single_threaded.rs @@ -0,0 +1,10 @@ +use rocksdb::{SingleThreaded, DBWithThreadMode, Options}; + +fn main() { + let db = DBWithThreadMode::::open_default("/path/to/dummy").unwrap(); + let db_ref1 = &db; + let db_ref2 = &db; + let opts = Options::default(); + db_ref1.create_cf("cf1", &opts).unwrap(); + db_ref2.create_cf("cf2", &opts).unwrap(); +} diff --git a/tests/fail/open_with_multiple_refs_as_single_threaded.stderr b/tests/fail/open_with_multiple_refs_as_single_threaded.stderr new file mode 100644 index 0000000..61879c0 --- /dev/null +++ b/tests/fail/open_with_multiple_refs_as_single_threaded.stderr @@ -0,0 +1,17 @@ +error[E0596]: cannot borrow `*db_ref1` as mutable, as it is behind a `&` reference + --> $DIR/open_with_multiple_refs_as_single_threaded.rs:8:5 + | +5 | let db_ref1 = &db; + | --- help: consider changing this to be a mutable reference: `&mut db` +... +8 | db_ref1.create_cf("cf1", &opts).unwrap(); + | ^^^^^^^ `db_ref1` is a `&` reference, so the data it refers to cannot be borrowed as mutable + +error[E0596]: cannot borrow `*db_ref2` as mutable, as it is behind a `&` reference + --> $DIR/open_with_multiple_refs_as_single_threaded.rs:9:5 + | +6 | let db_ref2 = &db; + | --- help: consider changing this to be a mutable reference: `&mut db` +... +9 | db_ref2.create_cf("cf2", &opts).unwrap(); + | ^^^^^^^ `db_ref2` is a `&` reference, so the data it refers to cannot be borrowed as mutable diff --git a/tests/test_db.rs b/tests/test_db.rs index d9ec85b..9c82c5e 100644 --- a/tests/test_db.rs +++ b/tests/test_db.rs @@ -20,9 +20,10 @@ use pretty_assertions::assert_eq; use rocksdb::{ perf::get_memory_usage_stats, BlockBasedOptions, BottommostLevelCompaction, Cache, - CompactOptions, DBCompactionStyle, Env, Error, FifoCompactOptions, IteratorMode, Options, - PerfContext, PerfMetric, ReadOptions, SliceTransform, Snapshot, UniversalCompactOptions, - UniversalCompactionStopStyle, WriteBatch, DB, + CompactOptions, DBCompactionStyle, DBWithThreadMode, Env, Error, FifoCompactOptions, + IteratorMode, MultiThreaded, Options, PerfContext, PerfMetric, ReadOptions, SingleThreaded, + SliceTransform, Snapshot, UniversalCompactOptions, UniversalCompactionStopStyle, WriteBatch, + DB, }; use util::DBPath; @@ -528,6 +529,36 @@ fn test_open_with_ttl() { assert!(db.get(b"key1").unwrap().is_none()); } +#[test] +fn test_open_as_single_threaded() { + let primary_path = DBPath::new("_rust_rocksdb_test_open_as_single_threaded"); + + let mut db = DBWithThreadMode::::open_default(&primary_path).unwrap(); + let db_ref1 = &mut db; + let opts = Options::default(); + db_ref1.create_cf("cf1", &opts).unwrap(); +} + +#[test] +fn test_open_with_multiple_refs_as_multi_threaded() { + // This tests multiple references can be allowed while creating column families + let primary_path = DBPath::new("_rust_rocksdb_test_open_as_multi_threaded"); + + let db = DBWithThreadMode::::open_default(&primary_path).unwrap(); + let db_ref1 = &db; + let db_ref2 = &db; + let opts = Options::default(); + db_ref1.create_cf("cf1", &opts).unwrap(); + db_ref2.create_cf("cf2", &opts).unwrap(); +} + +#[test] +fn test_open_with_multiple_refs_as_single_threaded() { + // This tests multiple references CANNOT be allowed while creating column families + let t = trybuild::TestCases::new(); + t.compile_fail("tests/fail/open_with_multiple_refs_as_single_threaded.rs"); +} + #[test] fn compact_range_test() { let path = DBPath::new("_rust_rocksdb_compact_range_test");