From abf121f20cd3e0e95cd336a79973d2b47610f5f2 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 22 Jul 2021 15:35:30 +0900 Subject: [PATCH] Don't leak dropped column families (#509) --- src/column_family.rs | 62 ++++++++++- src/db.rs | 202 ++++++++++++++++++++---------------- src/lib.rs | 2 +- src/snapshot.rs | 12 +-- src/write_batch.rs | 8 +- tests/test_column_family.rs | 100 ++++++++++++++++-- tests/test_db.rs | 86 +++++++-------- tests/test_property.rs | 4 +- 8 files changed, 321 insertions(+), 155 deletions(-) diff --git a/src/column_family.rs b/src/column_family.rs index c82d5c1..3184c65 100644 --- a/src/column_family.rs +++ b/src/column_family.rs @@ -14,6 +14,8 @@ use crate::{db::MultiThreaded, ffi, Options}; +use std::sync::Arc; + /// The name of the default column family. /// /// The column family with this name is created implicitly whenever column @@ -52,19 +54,62 @@ pub struct ColumnFamily { /// single-threaded mode). `Clone`/`Copy` is safe because this lifetime is bound to DB like /// iterators/snapshots. On top of it, this is as cheap and small as `&ColumnFamily` because /// this only has a single pointer-wide field. -#[derive(Clone, Copy)] pub struct BoundColumnFamily<'a> { pub(crate) inner: *mut ffi::rocksdb_column_family_handle_t, pub(crate) multi_threaded_cfs: std::marker::PhantomData<&'a MultiThreaded>, } +// internal struct which isn't exposed to public api. +// but its memory will be exposed after transmute()-ing to BoundColumnFamily. +// ColumnFamily's lifetime should be bound to DB. But, db holds cfs and cfs can't easily +// self-reference DB as its lifetime due to rust's type system +pub(crate) struct UnboundColumnFamily { + pub(crate) inner: *mut ffi::rocksdb_column_family_handle_t, +} + +impl UnboundColumnFamily { + pub(crate) fn bound_column_family<'a>(self: Arc) -> Arc> { + // SAFETY: the new BoundColumnFamily here just adding lifetime, + // so that column family handle won't outlive db. 
+ unsafe { std::mem::transmute(self) } + } +} + +fn destroy_handle(handle: *mut ffi::rocksdb_column_family_handle_t) { + // SAFETY: This should be called only from various Drop::drop(), strictly keeping a 1-to-1 + // ownership to avoid double invocation to the rocksdb function with same handle. + unsafe { + ffi::rocksdb_column_family_handle_destroy(handle); + } +} + +impl Drop for ColumnFamily { + fn drop(&mut self) { + destroy_handle(self.inner); + } +} + +// these behaviors must be identical between BoundColumnFamily and UnboundColumnFamily +// due to the unsafe transmute() in bound_column_family()! +impl<'a> Drop for BoundColumnFamily<'a> { + fn drop(&mut self) { + destroy_handle(self.inner); + } +} + +impl Drop for UnboundColumnFamily { + fn drop(&mut self) { + destroy_handle(self.inner); + } +} + /// Handy type alias to hide actual type difference to reference [`ColumnFamily`] /// depending on the `multi-threaded-cf` crate feature. #[cfg(not(feature = "multi-threaded-cf"))] pub type ColumnFamilyRef<'a> = &'a ColumnFamily; #[cfg(feature = "multi-threaded-cf")] -pub type ColumnFamilyRef<'a> = BoundColumnFamily<'a>; +pub type ColumnFamilyRef<'a> = Arc<BoundColumnFamily<'a>>; /// Utility trait to accept both supported references to `ColumnFamily` /// (`&ColumnFamily` and `BoundColumnFamily`) @@ -72,13 +117,24 @@ pub trait AsColumnFamilyRef { fn inner(&self) -> *mut ffi::rocksdb_column_family_handle_t; } +impl AsColumnFamilyRef for ColumnFamily { + fn inner(&self) -> *mut ffi::rocksdb_column_family_handle_t { + self.inner + } +} + impl<'a> AsColumnFamilyRef for &'a ColumnFamily { fn inner(&self) -> *mut ffi::rocksdb_column_family_handle_t { self.inner } } -impl<'a> AsColumnFamilyRef for BoundColumnFamily<'a> { +// Only implement for Arc-ed BoundColumnFamily as this is tightly coupled and an +// implementation detail, considering use of std::mem::transmute. BoundColumnFamily +// isn't expected to be used as naked. 
+// Also, ColumnFamilyRef might not be Arc<BoundColumnFamily<'a>> depending on crate +// feature flags, so we can't use the type alias here. +impl<'a> AsColumnFamilyRef for Arc<BoundColumnFamily<'a>> { fn inner(&self) -> *mut ffi::rocksdb_column_family_handle_t { self.inner } diff --git a/src/db.rs b/src/db.rs index 72c83bd..1890d8f 100644 --- a/src/db.rs +++ b/src/db.rs @@ -16,6 +16,7 @@ use crate::{ column_family::AsColumnFamilyRef, column_family::BoundColumnFamily, + column_family::UnboundColumnFamily, db_options::OptionsMustOutliveDB, ffi, ffi_util::{from_cstr, opt_bytes_to_ptr, raw_data, to_cpath}, @@ -31,24 +32,35 @@ use std::ffi::{CStr, CString}; use std::fmt; use std::fs; use std::iter; -use std::marker::PhantomData; use std::path::Path; use std::path::PathBuf; use std::ptr; use std::slice; use std::str; +use std::sync::Arc; use std::sync::RwLock; use std::time::Duration; -// Marker trait to specify single or multi threaded column family alternations for DB -// Also, this is minimum common API sharable between SingleThreaded and -// MultiThreaded. Others differ in self mutability and return type. +/// Marker trait to specify single or multi threaded column family alternations for +/// [`DBWithThreadMode`] +/// +/// This arrangement makes differences in self mutability and return type in +/// some of `DBWithThreadMode` methods. +/// +/// While being a marker trait to be generic over `DBWithThreadMode`, this trait +/// also has a minimum set of not-encapsulated internal methods between +/// [`SingleThreaded`] and [`MultiThreaded`]. These methods aren't expected to be +/// called and defined externally. 
pub trait ThreadMode { - fn new(cf_map: BTreeMap) -> Self; - fn cf_drop_all(&mut self); + /// Internal implementation for storing column family handles + fn new_cf_map_internal( + cf_map: BTreeMap, + ) -> Self; + /// Internal implementation for dropping column family handles + fn drop_all_cfs_internal(&mut self); } -/// Actual marker type for the internal marker trait `ThreadMode`, which holds +/// Actual marker type for the marker trait `ThreadMode`, which holds /// a collection of column families without synchronization primitive, providing /// no overhead for the single-threaded column family alternations. The other /// mode is [`MultiThreaded`]. @@ -58,47 +70,56 @@ pub struct SingleThreaded { cfs: BTreeMap, } -/// Actual marker type for the internal marker trait `ThreadMode`, which holds +/// Actual marker type for the marker trait `ThreadMode`, which holds /// a collection of column families wrapped in a RwLock to be mutated /// concurrently. The other mode is [`SingleThreaded`]. /// /// See [`DB`] for more details, including performance implications for each mode pub struct MultiThreaded { - cfs: RwLock>, + cfs: RwLock>>, } impl ThreadMode for SingleThreaded { - fn new(cfs: BTreeMap) -> Self { - Self { cfs } + fn new_cf_map_internal( + cfs: BTreeMap, + ) -> Self { + Self { + cfs: cfs + .into_iter() + .map(|(n, c)| (n, ColumnFamily { inner: c })) + .collect(), + } } - fn cf_drop_all(&mut self) { - for cf in self.cfs.values() { - unsafe { - ffi::rocksdb_column_family_handle_destroy(cf.inner); - } - } + fn drop_all_cfs_internal(&mut self) { + // Cause all ColumnFamily objects to be Drop::drop()-ed. 
+ self.cfs.clear(); } } impl ThreadMode for MultiThreaded { - fn new(cfs: BTreeMap) -> Self { + fn new_cf_map_internal( + cfs: BTreeMap, + ) -> Self { Self { - cfs: RwLock::new(cfs), + cfs: RwLock::new( + cfs.into_iter() + .map(|(n, c)| (n, Arc::new(UnboundColumnFamily { inner: c }))) + .collect(), + ), } } - fn cf_drop_all(&mut self) { - for cf in self.cfs.read().unwrap().values() { - unsafe { - ffi::rocksdb_column_family_handle_destroy(cf.inner); - } - } + fn drop_all_cfs_internal(&mut self) { + // Cause all UnboundColumnFamily objects to be Drop::drop()-ed. + self.cfs.write().unwrap().clear(); } } /// A RocksDB database. /// +/// This is previously named [`DB`], which is a type alias now for compatibility. +/// /// See crate level documentation for a simple usage example. pub struct DBWithThreadMode { pub(crate) inner: *mut ffi::rocksdb_t, @@ -120,7 +141,7 @@ pub trait DBAccess { fn get_cf_opt>( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, key: K, readopts: &ReadOptions, ) -> Result>, Error>; @@ -141,7 +162,7 @@ impl DBAccess for DBWithThreadMode { fn get_cf_opt>( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, key: K, readopts: &ReadOptions, ) -> Result>, Error> { @@ -154,7 +175,7 @@ impl DBAccess for DBWithThreadMode { /// /// # Compatibility and multi-threaded mode /// -/// Previously, `DB` was defined as a direct struct. Now, it's type-aliased for +/// Previously, [`DB`] was defined as a direct `struct`. Now, it's type-aliased for /// compatibility. Use `DBWithThreadMode` for multi-threaded /// column family alternations. 
/// @@ -421,7 +442,7 @@ impl DBWithThreadMode { } for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) { - cf_map.insert(cf_desc.name.clone(), ColumnFamily { inner }); + cf_map.insert(cf_desc.name.clone(), inner); } } @@ -432,7 +453,7 @@ impl DBWithThreadMode { Ok(Self { inner: db, path: path.as_ref().to_path_buf(), - cfs: T::new(cf_map), + cfs: T::new_cf_map_internal(cf_map), _outlive: outlive, }) } @@ -584,7 +605,7 @@ impl DBWithThreadMode { /// Flushes database memtables to SST files on the disk for a given column family. pub fn flush_cf_opt( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, flushopts: &FlushOptions, ) -> Result<(), Error> { unsafe { @@ -599,7 +620,7 @@ impl DBWithThreadMode { /// Flushes database memtables to SST files on the disk for a given column family using default /// options. - pub fn flush_cf(&self, cf: impl AsColumnFamilyRef) -> Result<(), Error> { + pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), Error> { self.flush_cf_opt(cf, &FlushOptions::default()) } @@ -644,7 +665,7 @@ impl DBWithThreadMode { /// [`get_pinned_cf_opt`](#method.get_pinned_cf_opt) to avoid unnecessary memory. pub fn get_cf_opt>( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, key: K, readopts: &ReadOptions, ) -> Result>, Error> { @@ -657,7 +678,7 @@ impl DBWithThreadMode { /// [`get_pinned_cf`](#method.get_pinned_cf) to avoid unnecessary memory. pub fn get_cf>( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, key: K, ) -> Result>, Error> { self.get_cf_opt(cf, key.as_ref(), &ReadOptions::default()) @@ -706,7 +727,7 @@ impl DBWithThreadMode { /// allows specifying ColumnFamily pub fn get_pinned_cf_opt>( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, key: K, readopts: &ReadOptions, ) -> Result, Error> { @@ -740,7 +761,7 @@ impl DBWithThreadMode { /// leverages default options. 
pub fn get_pinned_cf>( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, key: K, ) -> Result, Error> { self.get_pinned_cf_opt(cf, key, &ReadOptions::default()) @@ -791,24 +812,27 @@ impl DBWithThreadMode { } /// Return the values associated with the given keys and column families. - pub fn multi_get_cf(&self, keys: I) -> Vec>, Error>> + pub fn multi_get_cf<'a, 'b: 'a, K, I, W: 'b>( + &'a self, + keys: I, + ) -> Vec>, Error>> where K: AsRef<[u8]>, - I: IntoIterator, + I: IntoIterator, W: AsColumnFamilyRef, { self.multi_get_cf_opt(keys, &ReadOptions::default()) } /// Return the values associated with the given keys and column families using read options. - pub fn multi_get_cf_opt( - &self, + pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W: 'b>( + &'a self, keys: I, readopts: &ReadOptions, ) -> Vec>, Error>> where K: AsRef<[u8]>, - I: IntoIterator, + I: IntoIterator, W: AsColumnFamilyRef, { let mut boxed_keys: Vec> = Vec::new(); @@ -875,7 +899,7 @@ impl DBWithThreadMode { /// Returns `false` if the given key definitely doesn't exist in the specified column family, /// otherwise returns `true`. This function uses default `ReadOptions`. - pub fn key_may_exist_cf>(&self, cf: impl AsColumnFamilyRef, key: K) -> bool { + pub fn key_may_exist_cf>(&self, cf: &impl AsColumnFamilyRef, key: K) -> bool { self.key_may_exist_cf_opt(cf, key, &ReadOptions::default()) } @@ -883,7 +907,7 @@ impl DBWithThreadMode { /// otherwise returns `true`. 
pub fn key_may_exist_cf_opt>( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, key: K, readopts: &ReadOptions, ) -> bool { @@ -945,7 +969,7 @@ impl DBWithThreadMode { /// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions pub fn iterator_cf_opt<'a: 'b, 'b>( &'a self, - cf_handle: impl AsColumnFamilyRef, + cf_handle: &impl AsColumnFamilyRef, readopts: ReadOptions, mode: IteratorMode, ) -> DBIteratorWithThreadMode<'b, Self> { @@ -979,7 +1003,7 @@ impl DBWithThreadMode { pub fn iterator_cf<'a: 'b, 'b>( &'a self, - cf_handle: impl AsColumnFamilyRef, + cf_handle: &impl AsColumnFamilyRef, mode: IteratorMode, ) -> DBIteratorWithThreadMode<'b, Self> { let opts = ReadOptions::default(); @@ -988,7 +1012,7 @@ impl DBWithThreadMode { pub fn full_iterator_cf<'a: 'b, 'b>( &'a self, - cf_handle: impl AsColumnFamilyRef, + cf_handle: &impl AsColumnFamilyRef, mode: IteratorMode, ) -> DBIteratorWithThreadMode<'b, Self> { let mut opts = ReadOptions::default(); @@ -998,7 +1022,7 @@ impl DBWithThreadMode { pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>( &'a self, - cf_handle: impl AsColumnFamilyRef, + cf_handle: &impl AsColumnFamilyRef, prefix: P, ) -> DBIteratorWithThreadMode<'a, Self> { let mut opts = ReadOptions::default(); @@ -1020,7 +1044,7 @@ impl DBWithThreadMode { /// Opens a raw iterator over the given column family, using the default read options pub fn raw_iterator_cf<'a: 'b, 'b>( &'a self, - cf_handle: impl AsColumnFamilyRef, + cf_handle: &impl AsColumnFamilyRef, ) -> DBRawIteratorWithThreadMode<'b, Self> { let opts = ReadOptions::default(); DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts) @@ -1037,7 +1061,7 @@ impl DBWithThreadMode { /// Opens a raw iterator over the given column family, using the given read options pub fn raw_iterator_cf_opt<'a: 'b, 'b>( &'a self, - cf_handle: impl AsColumnFamilyRef, + cf_handle: &impl AsColumnFamilyRef, readopts: ReadOptions, ) -> 
DBRawIteratorWithThreadMode<'b, Self> { DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts) @@ -1070,7 +1094,7 @@ impl DBWithThreadMode { pub fn put_cf_opt( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, key: K, value: V, writeopts: &WriteOptions, @@ -1119,7 +1143,7 @@ impl DBWithThreadMode { pub fn merge_cf_opt( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, key: K, value: V, writeopts: &WriteOptions, @@ -1165,7 +1189,7 @@ impl DBWithThreadMode { pub fn delete_cf_opt>( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, key: K, writeopts: &WriteOptions, ) -> Result<(), Error> { @@ -1186,7 +1210,7 @@ impl DBWithThreadMode { /// Removes the database entries in the range `["from", "to")` using given write options. pub fn delete_range_cf_opt>( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, from: K, to: K, writeopts: &WriteOptions, @@ -1216,7 +1240,7 @@ impl DBWithThreadMode { self.put_opt(key.as_ref(), value.as_ref(), &WriteOptions::default()) } - pub fn put_cf(&self, cf: impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error> + pub fn put_cf(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error> where K: AsRef<[u8]>, V: AsRef<[u8]>, @@ -1232,7 +1256,7 @@ impl DBWithThreadMode { self.merge_opt(key.as_ref(), value.as_ref(), &WriteOptions::default()) } - pub fn merge_cf(&self, cf: impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error> + pub fn merge_cf(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error> where K: AsRef<[u8]>, V: AsRef<[u8]>, @@ -1246,7 +1270,7 @@ impl DBWithThreadMode { pub fn delete_cf>( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, key: K, ) -> Result<(), Error> { self.delete_cf_opt(cf, key.as_ref(), &WriteOptions::default()) @@ -1255,7 +1279,7 @@ impl DBWithThreadMode { /// Removes the database entries in the range `["from", "to")` using default write options. 
pub fn delete_range_cf>( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, from: K, to: K, ) -> Result<(), Error> { @@ -1304,7 +1328,7 @@ impl DBWithThreadMode { /// given column family. This is not likely to be needed for typical usage. pub fn compact_range_cf, E: AsRef<[u8]>>( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, start: Option, end: Option, ) { @@ -1326,7 +1350,7 @@ impl DBWithThreadMode { /// Same as `compact_range_cf` but with custom options. pub fn compact_range_cf_opt, E: AsRef<[u8]>>( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, start: Option, end: Option, opts: &CompactOptions, @@ -1365,7 +1389,7 @@ impl DBWithThreadMode { pub fn set_options_cf( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, opts: &[(&str, &str)], ) -> Result<(), Error> { let copts = convert_options(opts)?; @@ -1426,7 +1450,7 @@ impl DBWithThreadMode { /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L428-L634). pub fn property_value_cf( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, name: &str, ) -> Result, Error> { let prop_name = match CString::new(name) { @@ -1484,7 +1508,7 @@ impl DBWithThreadMode { /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689). pub fn property_int_value_cf( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, name: &str, ) -> Result, Error> { match self.property_value_cf(cf, name) { @@ -1561,7 +1585,7 @@ impl DBWithThreadMode { /// with default opts pub fn ingest_external_file_cf>( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, paths: Vec

, ) -> Result<(), Error> { let opts = IngestExternalFileOptions::default(); @@ -1571,7 +1595,7 @@ impl DBWithThreadMode { /// Loads a list of external SST files created with SstFileWriter into the DB for given Column Family pub fn ingest_external_file_cf_opts>( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, opts: &IngestExternalFileOptions, paths: Vec

, ) -> Result<(), Error> { @@ -1604,7 +1628,7 @@ impl DBWithThreadMode { fn ingest_external_file_raw_cf( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, opts: &IngestExternalFileOptions, paths_v: &[CString], cpaths: &[*const c_char], @@ -1693,7 +1717,7 @@ impl DBWithThreadMode { /// Same as `delete_file_in_range` but only for specific column family pub fn delete_file_in_range_cf>( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, from: K, to: K, ) -> Result<(), Error> { @@ -1718,6 +1742,21 @@ impl DBWithThreadMode { ffi::rocksdb_cancel_all_background_work(self.inner, wait as u8); } } + + fn drop_column_family( + &self, + cf_inner: *mut ffi::rocksdb_column_family_handle_t, + cf: C, + ) -> Result<(), Error> { + unsafe { + // first mark the column family as dropped + ffi_try!(ffi::rocksdb_drop_column_family(self.inner, cf_inner)); + } + // then finally reclaim any resources (mem, files) by destroying the only single column + // family handle by drop()-ing it + drop(cf); + Ok(()) + } } impl DBWithThreadMode { @@ -1732,12 +1771,8 @@ impl DBWithThreadMode { /// Drops the column family with the given name pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> { - let inner = self.inner; if let Some(cf) = self.cfs.cfs.remove(name) { - unsafe { - ffi_try!(ffi::rocksdb_drop_column_family(inner, cf.inner)); - } - Ok(()) + self.drop_column_family(cf.inner, cf) } else { Err(Error::new(format!("Invalid column family: {}", name))) } @@ -1753,46 +1788,39 @@ impl DBWithThreadMode { /// Creates column family with given name and options pub fn create_cf>(&self, name: N, opts: &Options) -> Result<(), Error> { let inner = self.create_inner_cf_handle(name.as_ref(), opts)?; - self.cfs - .cfs - .write() - .unwrap() - .insert(name.as_ref().to_string(), ColumnFamily { inner }); + self.cfs.cfs.write().unwrap().insert( + name.as_ref().to_string(), + Arc::new(UnboundColumnFamily { inner }), + ); Ok(()) } /// Drops the column family with the given name 
by internally locking the inner column /// family map. This avoids needing `&mut self` reference pub fn drop_cf(&self, name: &str) -> Result<(), Error> { - let inner = self.inner; if let Some(cf) = self.cfs.cfs.write().unwrap().remove(name) { - unsafe { - ffi_try!(ffi::rocksdb_drop_column_family(inner, cf.inner)); - } - Ok(()) + self.drop_column_family(cf.inner, cf) } else { Err(Error::new(format!("Invalid column family: {}", name))) } } /// Returns the underlying column family handle - pub fn cf_handle(&self, name: &str) -> Option { + pub fn cf_handle(&self, name: &str) -> Option> { self.cfs .cfs .read() .unwrap() .get(name) - .map(|cf| BoundColumnFamily { - inner: cf.inner, - multi_threaded_cfs: PhantomData, - }) + .cloned() + .map(UnboundColumnFamily::bound_column_family) } } impl Drop for DBWithThreadMode { fn drop(&mut self) { unsafe { - self.cfs.cf_drop_all(); + self.cfs.drop_all_cfs_internal(); ffi::rocksdb_close(self.inner); } } diff --git a/src/lib.rs b/src/lib.rs index 869135f..be9d02a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -102,7 +102,7 @@ pub use crate::{ ColumnFamilyRef, DEFAULT_COLUMN_FAMILY_NAME, }, compaction_filter::Decision as CompactionDecision, - db::{DBWithThreadMode, LiveFile, MultiThreaded, SingleThreaded, DB}, + db::{DBWithThreadMode, LiveFile, MultiThreaded, SingleThreaded, ThreadMode, DB}, db_iterator::{ DBIterator, DBIteratorWithThreadMode, DBRawIterator, DBRawIteratorWithThreadMode, DBWALIterator, Direction, IteratorMode, diff --git a/src/snapshot.rs b/src/snapshot.rs index 0f77b03..a4356ca 100644 --- a/src/snapshot.rs +++ b/src/snapshot.rs @@ -61,7 +61,7 @@ impl<'a, D: DBAccess> SnapshotWithThreadMode<'a, D> { /// the default read options. pub fn iterator_cf( &self, - cf_handle: impl AsColumnFamilyRef, + cf_handle: &impl AsColumnFamilyRef, mode: IteratorMode, ) -> DBIteratorWithThreadMode { let readopts = ReadOptions::default(); @@ -82,7 +82,7 @@ impl<'a, D: DBAccess> SnapshotWithThreadMode<'a, D> { /// the given read options. 
pub fn iterator_cf_opt( &self, - cf_handle: impl AsColumnFamilyRef, + cf_handle: &impl AsColumnFamilyRef, mut readopts: ReadOptions, mode: IteratorMode, ) -> DBIteratorWithThreadMode { @@ -100,7 +100,7 @@ impl<'a, D: DBAccess> SnapshotWithThreadMode<'a, D> { /// the default read options. pub fn raw_iterator_cf( &self, - cf_handle: impl AsColumnFamilyRef, + cf_handle: &impl AsColumnFamilyRef, ) -> DBRawIteratorWithThreadMode { let readopts = ReadOptions::default(); self.raw_iterator_cf_opt(cf_handle, readopts) @@ -116,7 +116,7 @@ impl<'a, D: DBAccess> SnapshotWithThreadMode<'a, D> { /// the given read options. pub fn raw_iterator_cf_opt( &self, - cf_handle: impl AsColumnFamilyRef, + cf_handle: &impl AsColumnFamilyRef, mut readopts: ReadOptions, ) -> DBRawIteratorWithThreadMode { readopts.set_snapshot(self); @@ -133,7 +133,7 @@ impl<'a, D: DBAccess> SnapshotWithThreadMode<'a, D> { /// options. pub fn get_cf>( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, key: K, ) -> Result>, Error> { let readopts = ReadOptions::default(); @@ -153,7 +153,7 @@ impl<'a, D: DBAccess> SnapshotWithThreadMode<'a, D> { /// Returns the bytes associated with a key value, given column family and read options. 
pub fn get_cf_opt>( &self, - cf: impl AsColumnFamilyRef, + cf: &impl AsColumnFamilyRef, key: K, mut readopts: ReadOptions, ) -> Result>, Error> { diff --git a/src/write_batch.rs b/src/write_batch.rs index 8e09709..16a9ff7 100644 --- a/src/write_batch.rs +++ b/src/write_batch.rs @@ -134,7 +134,7 @@ impl WriteBatch { } } - pub fn put_cf(&mut self, cf: impl AsColumnFamilyRef, key: K, value: V) + pub fn put_cf(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V) where K: AsRef<[u8]>, V: AsRef<[u8]>, @@ -173,7 +173,7 @@ impl WriteBatch { } } - pub fn merge_cf(&mut self, cf: impl AsColumnFamilyRef, key: K, value: V) + pub fn merge_cf(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V) where K: AsRef<[u8]>, V: AsRef<[u8]>, @@ -206,7 +206,7 @@ impl WriteBatch { } } - pub fn delete_cf>(&mut self, cf: impl AsColumnFamilyRef, key: K) { + pub fn delete_cf>(&mut self, cf: &impl AsColumnFamilyRef, key: K) { let key = key.as_ref(); unsafe { @@ -243,7 +243,7 @@ impl WriteBatch { /// Removes the database entries in the range ["begin_key", "end_key"), i.e., /// including "begin_key" and excluding "end_key". It is not an error if no /// keys exist in the range ["begin_key", "end_key"). 
- pub fn delete_range_cf>(&mut self, cf: impl AsColumnFamilyRef, from: K, to: K) { + pub fn delete_range_cf>(&mut self, cf: &impl AsColumnFamilyRef, from: K, to: K) { let (start_key, end_key) = (from.as_ref(), to.as_ref()); unsafe { diff --git a/tests/test_column_family.rs b/tests/test_column_family.rs index 40db910..21bede5 100644 --- a/tests/test_column_family.rs +++ b/tests/test_column_family.rs @@ -19,6 +19,25 @@ use pretty_assertions::assert_eq; use rocksdb::{ColumnFamilyDescriptor, MergeOperands, Options, DB, DEFAULT_COLUMN_FAMILY_NAME}; use util::DBPath; +use std::fs; +use std::io; +use std::path::Path; + +fn dir_size(path: impl AsRef) -> io::Result { + fn dir_size(mut dir: fs::ReadDir) -> io::Result { + dir.try_fold(0, |acc, file| { + let file = file?; + let size = match file.metadata()? { + data if data.is_dir() => dir_size(fs::read_dir(file.path())?)?, + data => data.len(), + }; + Ok(acc + size) + }) + } + + dir_size(fs::read_dir(path)?) +} + #[test] fn test_column_family() { let n = DBPath::new("_rust_rocksdb_cftest"); @@ -143,15 +162,15 @@ fn test_merge_operator() { Err(e) => panic!("failed to open db with column family: {}", e), }; let cf1 = db.cf_handle("cf1").unwrap(); - assert!(db.put_cf(cf1, b"k1", b"v1").is_ok()); - assert_eq!(db.get_cf(cf1, b"k1").unwrap().unwrap(), b"v1"); - let p = db.put_cf(cf1, b"k1", b"a"); + assert!(db.put_cf(&cf1, b"k1", b"v1").is_ok()); + assert_eq!(db.get_cf(&cf1, b"k1").unwrap().unwrap(), b"v1"); + let p = db.put_cf(&cf1, b"k1", b"a"); assert!(p.is_ok()); - db.merge_cf(cf1, b"k1", b"b").unwrap(); - db.merge_cf(cf1, b"k1", b"c").unwrap(); - db.merge_cf(cf1, b"k1", b"d").unwrap(); - db.merge_cf(cf1, b"k1", b"efg").unwrap(); - let m = db.merge_cf(cf1, b"k1", b"h"); + db.merge_cf(&cf1, b"k1", b"b").unwrap(); + db.merge_cf(&cf1, b"k1", b"c").unwrap(); + db.merge_cf(&cf1, b"k1", b"d").unwrap(); + db.merge_cf(&cf1, b"k1", b"efg").unwrap(); + let m = db.merge_cf(&cf1, b"k1", b"h"); println!("m is {:?}", m); // TODO 
assert!(m.is_ok()); match db.get(b"k1") { @@ -163,7 +182,7 @@ fn test_merge_operator() { _ => panic!("value not present!"), } - let _ = db.get_cf(cf1, b"k1"); + let _ = db.get_cf(&cf1, b"k1"); // TODO assert!(r.unwrap().as_ref() == b"abcdefgh"); assert!(db.delete(b"k1").is_ok()); assert!(db.get(b"k1").unwrap().is_none()); @@ -250,3 +269,66 @@ fn test_create_duplicate_column_family() { assert!(db.create_cf("cf1", &opts).is_err()); } } + +#[test] +fn test_no_leaked_column_family() { + let n = DBPath::new("_rust_rocksdb_no_leaked_column_family"); + { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + + let mut write_options = rocksdb::WriteOptions::default(); + write_options.set_sync(false); + write_options.disable_wal(true); + + let mut db = DB::open(&opts, &n).unwrap(); + let large_blob = [0x20; 1024 * 1024]; + + #[cfg(feature = "multi-threaded-cf")] + let mut outlived_cf = None; + + // repeat creating and dropping cfs many time to indirectly detect + // possible leak via large dir. + for cf_index in 0..20 { + let cf_name = format!("cf{}", cf_index); + db.create_cf(&cf_name, &Options::default()).unwrap(); + let cf = db.cf_handle(&cf_name).unwrap(); + + let mut batch = rocksdb::WriteBatch::default(); + for key_index in 0..100 { + batch.put_cf(&cf, format!("k{}", key_index), &large_blob); + } + db.write_opt(batch, &write_options).unwrap(); + + // force create an SST file + db.flush_cf(&cf).unwrap(); + + db.drop_cf(&cf_name).unwrap(); + + #[cfg(feature = "multi-threaded-cf")] + { + outlived_cf = Some(cf); + } + } + + // if we're not leaking, the dir bytes should be well under 10M bytes in total + let dir_bytes = dir_size(&n).unwrap(); + assert!( + dir_bytes < 10_000_000, + "{} is too large (maybe leaking...)", + dir_bytes + ); + + // only if MultiThreaded, cf can outlive db.drop_cf() and shouldn't cause SEGV... 
+ #[cfg(feature = "multi-threaded-cf")] + { + let outlived_cf = outlived_cf.unwrap(); + assert_eq!(db.get_cf(&outlived_cf, "k0").unwrap().unwrap(), &large_blob); + drop(outlived_cf); + } + + // make it explicit not to drop the db until we get dir size above... + drop(db); + } +} diff --git a/tests/test_db.rs b/tests/test_db.rs index 6d70ffd..0504475 100644 --- a/tests/test_db.rs +++ b/tests/test_db.rs @@ -348,27 +348,27 @@ fn set_option_cf_test() { let cf = db.cf_handle("cf1").unwrap(); // set an option to valid values assert!(db - .set_options_cf(cf, &[("disable_auto_compactions", "true")]) + .set_options_cf(&cf, &[("disable_auto_compactions", "true")]) .is_ok()); assert!(db - .set_options_cf(cf, &[("disable_auto_compactions", "false")]) + .set_options_cf(&cf, &[("disable_auto_compactions", "false")]) .is_ok()); // invalid names/values should result in an error assert!(db - .set_options_cf(cf, &[("disable_auto_compactions", "INVALID_VALUE")]) + .set_options_cf(&cf, &[("disable_auto_compactions", "INVALID_VALUE")]) .is_err()); assert!(db - .set_options_cf(cf, &[("INVALID_NAME", "INVALID_VALUE")]) + .set_options_cf(&cf, &[("INVALID_NAME", "INVALID_VALUE")]) .is_err()); // option names/values must not contain NULLs assert!(db - .set_options_cf(cf, &[("disable_auto_compactions", "true\0")]) + .set_options_cf(&cf, &[("disable_auto_compactions", "true\0")]) .is_err()); assert!(db - .set_options_cf(cf, &[("disable_auto_compactions\0", "true")]) + .set_options_cf(&cf, &[("disable_auto_compactions\0", "true")]) .is_err()); // empty options are not allowed - assert!(db.set_options_cf(cf, &[]).is_err()); + assert!(db.set_options_cf(&cf, &[]).is_err()); // multiple options can be set in a single API call let multiple_options = [ ("paranoid_file_checks", "true"), @@ -538,14 +538,14 @@ fn test_open_cf_with_ttl() { opts.create_missing_column_families(true); let db = DB::open_cf_with_ttl(&opts, &path, &["test_cf"], Duration::from_secs(1)).unwrap(); let cf = 
db.cf_handle("test_cf").unwrap(); - db.put_cf(cf, b"key1", b"value1").unwrap(); + db.put_cf(&cf, b"key1", b"value1").unwrap(); thread::sleep(Duration::from_secs(2)); // Trigger a manual compaction, this will check the TTL filter // in the database and drop all expired entries. - db.compact_range_cf(cf, None::<&[u8]>, None::<&[u8]>); + db.compact_range_cf(&cf, None::<&[u8]>, None::<&[u8]>); - assert!(db.get_cf(cf, b"key1").unwrap().is_none()); + assert!(db.get_cf(&cf, b"key1").unwrap().is_none()); } #[test] @@ -606,13 +606,13 @@ fn compact_range_test() { let cfs = vec!["cf1"]; let db = DB::open_cf(&opts, &path, cfs).unwrap(); let cf1 = db.cf_handle("cf1").unwrap(); - db.put_cf(cf1, b"k1", b"v1").unwrap(); - db.put_cf(cf1, b"k2", b"v2").unwrap(); - db.put_cf(cf1, b"k3", b"v3").unwrap(); - db.put_cf(cf1, b"k4", b"v4").unwrap(); - db.put_cf(cf1, b"k5", b"v5").unwrap(); - db.compact_range_cf(cf1, Some(b"k2"), Some(b"k4")); - db.compact_range_cf_opt(cf1, Some(b"k1"), None::<&str>, &compact_opts); + db.put_cf(&cf1, b"k1", b"v1").unwrap(); + db.put_cf(&cf1, b"k2", b"v2").unwrap(); + db.put_cf(&cf1, b"k3", b"v3").unwrap(); + db.put_cf(&cf1, b"k4", b"v4").unwrap(); + db.put_cf(&cf1, b"k5", b"v5").unwrap(); + db.compact_range_cf(&cf1, Some(b"k2"), Some(b"k4")); + db.compact_range_cf_opt(&cf1, Some(b"k1"), None::<&str>, &compact_opts); // put and compact default column family db.put(b"k1", b"v1").unwrap(); @@ -645,12 +645,12 @@ fn fifo_compaction_test() { let cfs = vec!["cf1"]; let db = DB::open_cf(&opts, &path, cfs).unwrap(); let cf1 = db.cf_handle("cf1").unwrap(); - db.put_cf(cf1, b"k1", b"v1").unwrap(); - db.put_cf(cf1, b"k2", b"v2").unwrap(); - db.put_cf(cf1, b"k3", b"v3").unwrap(); - db.put_cf(cf1, b"k4", b"v4").unwrap(); - db.put_cf(cf1, b"k5", b"v5").unwrap(); - db.compact_range_cf(cf1, Some(b"k2"), Some(b"k4")); + db.put_cf(&cf1, b"k1", b"v1").unwrap(); + db.put_cf(&cf1, b"k2", b"v2").unwrap(); + db.put_cf(&cf1, b"k3", b"v3").unwrap(); + db.put_cf(&cf1, b"k4", 
b"v4").unwrap(); + db.put_cf(&cf1, b"k5", b"v5").unwrap(); + db.compact_range_cf(&cf1, Some(b"k2"), Some(b"k4")); // check stats let ctx = PerfContext::default(); @@ -880,15 +880,15 @@ fn test_open_cf_for_read_only() { opts.create_missing_column_families(true); let db = DB::open_cf(&opts, &path, cfs.clone()).unwrap(); let cf1 = db.cf_handle("cf1").unwrap(); - db.put_cf(cf1, b"k1", b"v1").unwrap(); + db.put_cf(&cf1, b"k1", b"v1").unwrap(); } { let opts = Options::default(); let error_if_log_file_exist = false; let db = DB::open_cf_for_read_only(&opts, &path, cfs, error_if_log_file_exist).unwrap(); let cf1 = db.cf_handle("cf1").unwrap(); - assert_eq!(db.get_cf(cf1, b"k1").unwrap().unwrap(), b"v1"); - assert!(db.put_cf(cf1, b"k2", b"v2").is_err()); + assert_eq!(db.get_cf(&cf1, b"k1").unwrap().unwrap(), b"v1"); + assert!(db.put_cf(&cf1, b"k2", b"v2").is_err()); } } @@ -904,18 +904,18 @@ fn delete_range_test() { let db = DB::open_cf(&opts, &path, cfs).unwrap(); let cf1 = db.cf_handle("cf1").unwrap(); - db.put_cf(cf1, b"k1", b"v1").unwrap(); - db.put_cf(cf1, b"k2", b"v2").unwrap(); - db.put_cf(cf1, b"k3", b"v3").unwrap(); - db.put_cf(cf1, b"k4", b"v4").unwrap(); - db.put_cf(cf1, b"k5", b"v5").unwrap(); - - db.delete_range_cf(cf1, b"k2", b"k4").unwrap(); - assert_eq!(db.get_cf(cf1, b"k1").unwrap().unwrap(), b"v1"); - assert_eq!(db.get_cf(cf1, b"k4").unwrap().unwrap(), b"v4"); - assert_eq!(db.get_cf(cf1, b"k5").unwrap().unwrap(), b"v5"); - assert!(db.get_cf(cf1, b"k2").unwrap().is_none()); - assert!(db.get_cf(cf1, b"k3").unwrap().is_none()); + db.put_cf(&cf1, b"k1", b"v1").unwrap(); + db.put_cf(&cf1, b"k2", b"v2").unwrap(); + db.put_cf(&cf1, b"k3", b"v3").unwrap(); + db.put_cf(&cf1, b"k4", b"v4").unwrap(); + db.put_cf(&cf1, b"k5", b"v5").unwrap(); + + db.delete_range_cf(&cf1, b"k2", b"k4").unwrap(); + assert_eq!(db.get_cf(&cf1, b"k1").unwrap().unwrap(), b"v1"); + assert_eq!(db.get_cf(&cf1, b"k4").unwrap().unwrap(), b"v4"); + assert_eq!(db.get_cf(&cf1, 
b"k5").unwrap().unwrap(), b"v5"); + assert!(db.get_cf(&cf1, b"k2").unwrap().is_none()); + assert!(db.get_cf(&cf1, b"k3").unwrap().is_none()); } } @@ -955,13 +955,13 @@ fn multi_get_cf() { let cf0 = db.cf_handle("cf0").unwrap(); let cf1 = db.cf_handle("cf1").unwrap(); - db.put_cf(cf1, b"k1", b"v1").unwrap(); + db.put_cf(&cf1, b"k1", b"v1").unwrap(); let cf2 = db.cf_handle("cf2").unwrap(); - db.put_cf(cf2, b"k2", b"v2").unwrap(); + db.put_cf(&cf2, b"k2", b"v2").unwrap(); let values = db - .multi_get_cf(vec![(cf0, b"k0"), (cf1, b"k1"), (cf2, b"k2")]) + .multi_get_cf(vec![(&cf0, b"k0"), (&cf1, b"k1"), (&cf2, b"k2")]) .into_iter() .map(Result::unwrap) .collect::<Vec<_>>(); @@ -997,10 +997,10 @@ fn key_may_exist_cf() { let db = DB::open_cf(&opts, &path, &["cf"]).unwrap(); let cf = db.cf_handle("cf").unwrap(); - assert_eq!(false, db.key_may_exist_cf(cf, "nonexistent")); + assert_eq!(false, db.key_may_exist_cf(&cf, "nonexistent")); assert_eq!( false, - db.key_may_exist_cf_opt(cf, "nonexistent", &ReadOptions::default()) + db.key_may_exist_cf_opt(&cf, "nonexistent", &ReadOptions::default()) ); } } diff --git a/tests/test_property.rs b/tests/test_property.rs index 2b9d399..5571441 100644 --- a/tests/test_property.rs +++ b/tests/test_property.rs @@ -38,7 +38,7 @@ fn property_cf_test() { let mut db = DB::open_default(&n).unwrap(); db.create_cf("cf1", &opts).unwrap(); let cf = db.cf_handle("cf1").unwrap(); - let value = db.property_value_cf(cf, "rocksdb.stats").unwrap().unwrap(); + let value = db.property_value_cf(&cf, "rocksdb.stats").unwrap().unwrap(); assert!(value.contains("Stats")); } @@ -66,7 +66,7 @@ fn property_int_cf_test() { db.create_cf("cf1", &opts).unwrap(); let cf = db.cf_handle("cf1").unwrap(); let total_keys = db - .property_int_value_cf(cf, "rocksdb.estimate-num-keys") + .property_int_value_cf(&cf, "rocksdb.estimate-num-keys") .unwrap(); assert_eq!(total_keys, Some(0));