Don't leak dropped column families (#509)

master
Ryo Onodera 4 years ago committed by GitHub
parent 32fbd92444
commit abf121f20c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 62
      src/column_family.rs
  2. 202
      src/db.rs
  3. 2
      src/lib.rs
  4. 12
      src/snapshot.rs
  5. 8
      src/write_batch.rs
  6. 100
      tests/test_column_family.rs
  7. 86
      tests/test_db.rs
  8. 4
      tests/test_property.rs

@ -14,6 +14,8 @@
use crate::{db::MultiThreaded, ffi, Options};
use std::sync::Arc;
/// The name of the default column family.
///
/// The column family with this name is created implicitly whenever column
@ -52,19 +54,62 @@ pub struct ColumnFamily {
/// single-threaded mode). `Clone`/`Copy` is safe because this lifetime is bound to DB like
/// iterators/snapshots. On top of it, this is as cheap and small as `&ColumnFamily` because
/// this only has a single pointer-wide field.
#[derive(Clone, Copy)]
pub struct BoundColumnFamily<'a> {
pub(crate) inner: *mut ffi::rocksdb_column_family_handle_t,
pub(crate) multi_threaded_cfs: std::marker::PhantomData<&'a MultiThreaded>,
}
// internal struct which isn't exposed to public api.
// but its memory will be exposed after transmute()-ing to BoundColumnFamily.
// ColumnFamily's lifetime should be bound to DB. But, db holds cfs and cfs can't easily
// self-reference DB as its lifetime due to rust's type system
pub(crate) struct UnboundColumnFamily {
pub(crate) inner: *mut ffi::rocksdb_column_family_handle_t,
}
impl UnboundColumnFamily {
pub(crate) fn bound_column_family<'a>(self: Arc<Self>) -> Arc<BoundColumnFamily<'a>> {
// SAFETY: the new BoundColumnFamily here just adding lifetime,
// so that column family handle won't outlive db.
unsafe { std::mem::transmute(self) }
}
}
fn destroy_handle(handle: *mut ffi::rocksdb_column_family_handle_t) {
// SAFETY: This should be called only from various Drop::drop(), strictly keeping a 1-to-1
// ownership to avoid double invocation to the rocksdb function with same handle.
unsafe {
ffi::rocksdb_column_family_handle_destroy(handle);
}
}
impl Drop for ColumnFamily {
fn drop(&mut self) {
destroy_handle(self.inner);
}
}
// these behaviors must be identical between BoundColumnFamily and UnboundColumnFamily
// due to the unsafe transmute() in bound_column_family()!
impl<'a> Drop for BoundColumnFamily<'a> {
fn drop(&mut self) {
destroy_handle(self.inner);
}
}
impl Drop for UnboundColumnFamily {
fn drop(&mut self) {
destroy_handle(self.inner);
}
}
/// Handy type alias to hide actual type difference to reference [`ColumnFamily`]
/// depending on the `multi-threaded-cf` crate feature.
#[cfg(not(feature = "multi-threaded-cf"))]
pub type ColumnFamilyRef<'a> = &'a ColumnFamily;
#[cfg(feature = "multi-threaded-cf")]
pub type ColumnFamilyRef<'a> = BoundColumnFamily<'a>;
pub type ColumnFamilyRef<'a> = Arc<BoundColumnFamily<'a>>;
/// Utility trait to accept both supported references to `ColumnFamily`
/// (`&ColumnFamily` and `BoundColumnFamily`)
@ -72,13 +117,24 @@ pub trait AsColumnFamilyRef {
fn inner(&self) -> *mut ffi::rocksdb_column_family_handle_t;
}
impl AsColumnFamilyRef for ColumnFamily {
fn inner(&self) -> *mut ffi::rocksdb_column_family_handle_t {
self.inner
}
}
impl<'a> AsColumnFamilyRef for &'a ColumnFamily {
fn inner(&self) -> *mut ffi::rocksdb_column_family_handle_t {
self.inner
}
}
impl<'a> AsColumnFamilyRef for BoundColumnFamily<'a> {
// Only implement for Arc-ed BoundColumnFamily as this tightly coupled and
// implmetation detail, considering use of std::mem::transmute. BoundColumnFamily
// isn't expected to be used as naked.
// Also, ColumnFamilyRef might not be Arc<BoundColumnFamily<'a>> depending crate
// feature flags so, we can't use the type alias here.
impl<'a> AsColumnFamilyRef for Arc<BoundColumnFamily<'a>> {
fn inner(&self) -> *mut ffi::rocksdb_column_family_handle_t {
self.inner
}

@ -16,6 +16,7 @@
use crate::{
column_family::AsColumnFamilyRef,
column_family::BoundColumnFamily,
column_family::UnboundColumnFamily,
db_options::OptionsMustOutliveDB,
ffi,
ffi_util::{from_cstr, opt_bytes_to_ptr, raw_data, to_cpath},
@ -31,24 +32,35 @@ use std::ffi::{CStr, CString};
use std::fmt;
use std::fs;
use std::iter;
use std::marker::PhantomData;
use std::path::Path;
use std::path::PathBuf;
use std::ptr;
use std::slice;
use std::str;
use std::sync::Arc;
use std::sync::RwLock;
use std::time::Duration;
// Marker trait to specify single or multi threaded column family alternations for DB
// Also, this is minimum common API sharable between SingleThreaded and
// MultiThreaded. Others differ in self mutability and return type.
/// Marker trait to specify single or multi threaded column family alternations for
/// [`DBWithThreadMode<T>`]
///
/// This arrangement makes differences in self mutability and return type in
/// some of `DBWithThreadMode` methods.
///
/// While being a marker trait to be generic over `DBWithThreadMode`, this trait
/// also has a minimum set of not-encapsulated internal methods between
/// [`SingleThreaded`] and [`MultiThreaded`]. These methods aren't expected to be
/// called and defined externally.
pub trait ThreadMode {
fn new(cf_map: BTreeMap<String, ColumnFamily>) -> Self;
fn cf_drop_all(&mut self);
/// Internal implementation for storing column family handles
fn new_cf_map_internal(
cf_map: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
) -> Self;
/// Internal implementation for dropping column family handles
fn drop_all_cfs_internal(&mut self);
}
/// Actual marker type for the internal marker trait `ThreadMode`, which holds
/// Actual marker type for the marker trait `ThreadMode`, which holds
/// a collection of column families without synchronization primitive, providing
/// no overhead for the single-threaded column family alternations. The other
/// mode is [`MultiThreaded`].
@ -58,47 +70,56 @@ pub struct SingleThreaded {
cfs: BTreeMap<String, ColumnFamily>,
}
/// Actual marker type for the internal marker trait `ThreadMode`, which holds
/// Actual marker type for the marker trait `ThreadMode`, which holds
/// a collection of column families wrapped in a RwLock to be mutated
/// concurrently. The other mode is [`SingleThreaded`].
///
/// See [`DB`] for more details, including performance implications for each mode
pub struct MultiThreaded {
cfs: RwLock<BTreeMap<String, ColumnFamily>>,
cfs: RwLock<BTreeMap<String, Arc<UnboundColumnFamily>>>,
}
impl ThreadMode for SingleThreaded {
fn new(cfs: BTreeMap<String, ColumnFamily>) -> Self {
Self { cfs }
fn new_cf_map_internal(
cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
) -> Self {
Self {
cfs: cfs
.into_iter()
.map(|(n, c)| (n, ColumnFamily { inner: c }))
.collect(),
}
}
fn cf_drop_all(&mut self) {
for cf in self.cfs.values() {
unsafe {
ffi::rocksdb_column_family_handle_destroy(cf.inner);
}
}
fn drop_all_cfs_internal(&mut self) {
// Cause all ColumnFamily objects to be Drop::drop()-ed.
self.cfs.clear();
}
}
impl ThreadMode for MultiThreaded {
fn new(cfs: BTreeMap<String, ColumnFamily>) -> Self {
fn new_cf_map_internal(
cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
) -> Self {
Self {
cfs: RwLock::new(cfs),
cfs: RwLock::new(
cfs.into_iter()
.map(|(n, c)| (n, Arc::new(UnboundColumnFamily { inner: c })))
.collect(),
),
}
}
fn cf_drop_all(&mut self) {
for cf in self.cfs.read().unwrap().values() {
unsafe {
ffi::rocksdb_column_family_handle_destroy(cf.inner);
}
}
fn drop_all_cfs_internal(&mut self) {
// Cause all UnboundColumnFamily objects to be Drop::drop()-ed.
self.cfs.write().unwrap().clear();
}
}
/// A RocksDB database.
///
/// This is previously named [`DB`], which is a type alias now for compatibility.
///
/// See crate level documentation for a simple usage example.
pub struct DBWithThreadMode<T: ThreadMode> {
pub(crate) inner: *mut ffi::rocksdb_t,
@ -120,7 +141,7 @@ pub trait DBAccess {
fn get_cf_opt<K: AsRef<[u8]>>(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
key: K,
readopts: &ReadOptions,
) -> Result<Option<Vec<u8>>, Error>;
@ -141,7 +162,7 @@ impl<T: ThreadMode> DBAccess for DBWithThreadMode<T> {
fn get_cf_opt<K: AsRef<[u8]>>(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
key: K,
readopts: &ReadOptions,
) -> Result<Option<Vec<u8>>, Error> {
@ -154,7 +175,7 @@ impl<T: ThreadMode> DBAccess for DBWithThreadMode<T> {
///
/// # Compatibility and multi-threaded mode
///
/// Previously, `DB` was defined as a direct struct. Now, it's type-aliased for
/// Previously, [`DB`] was defined as a direct `struct`. Now, it's type-aliased for
/// compatibility. Use `DBWithThreadMode<MultiThreaded>` for multi-threaded
/// column family alternations.
///
@ -421,7 +442,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
}
for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
cf_map.insert(cf_desc.name.clone(), ColumnFamily { inner });
cf_map.insert(cf_desc.name.clone(), inner);
}
}
@ -432,7 +453,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
Ok(Self {
inner: db,
path: path.as_ref().to_path_buf(),
cfs: T::new(cf_map),
cfs: T::new_cf_map_internal(cf_map),
_outlive: outlive,
})
}
@ -584,7 +605,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// Flushes database memtables to SST files on the disk for a given column family.
pub fn flush_cf_opt(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
flushopts: &FlushOptions,
) -> Result<(), Error> {
unsafe {
@ -599,7 +620,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// Flushes database memtables to SST files on the disk for a given column family using default
/// options.
pub fn flush_cf(&self, cf: impl AsColumnFamilyRef) -> Result<(), Error> {
pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), Error> {
self.flush_cf_opt(cf, &FlushOptions::default())
}
@ -644,7 +665,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// [`get_pinned_cf_opt`](#method.get_pinned_cf_opt) to avoid unnecessary memory.
pub fn get_cf_opt<K: AsRef<[u8]>>(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
key: K,
readopts: &ReadOptions,
) -> Result<Option<Vec<u8>>, Error> {
@ -657,7 +678,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// [`get_pinned_cf`](#method.get_pinned_cf) to avoid unnecessary memory.
pub fn get_cf<K: AsRef<[u8]>>(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
key: K,
) -> Result<Option<Vec<u8>>, Error> {
self.get_cf_opt(cf, key.as_ref(), &ReadOptions::default())
@ -706,7 +727,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// allows specifying ColumnFamily
pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
key: K,
readopts: &ReadOptions,
) -> Result<Option<DBPinnableSlice>, Error> {
@ -740,7 +761,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// leverages default options.
pub fn get_pinned_cf<K: AsRef<[u8]>>(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
key: K,
) -> Result<Option<DBPinnableSlice>, Error> {
self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
@ -791,24 +812,27 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
}
/// Return the values associated with the given keys and column families.
pub fn multi_get_cf<K, I, W>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
pub fn multi_get_cf<'a, 'b: 'a, K, I, W: 'b>(
&'a self,
keys: I,
) -> Vec<Result<Option<Vec<u8>>, Error>>
where
K: AsRef<[u8]>,
I: IntoIterator<Item = (W, K)>,
I: IntoIterator<Item = (&'b W, K)>,
W: AsColumnFamilyRef,
{
self.multi_get_cf_opt(keys, &ReadOptions::default())
}
/// Return the values associated with the given keys and column families using read options.
pub fn multi_get_cf_opt<K, I, W>(
&self,
pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W: 'b>(
&'a self,
keys: I,
readopts: &ReadOptions,
) -> Vec<Result<Option<Vec<u8>>, Error>>
where
K: AsRef<[u8]>,
I: IntoIterator<Item = (W, K)>,
I: IntoIterator<Item = (&'b W, K)>,
W: AsColumnFamilyRef,
{
let mut boxed_keys: Vec<Box<[u8]>> = Vec::new();
@ -875,7 +899,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// Returns `false` if the given key definitely doesn't exist in the specified column family,
/// otherwise returns `true`. This function uses default `ReadOptions`.
pub fn key_may_exist_cf<K: AsRef<[u8]>>(&self, cf: impl AsColumnFamilyRef, key: K) -> bool {
pub fn key_may_exist_cf<K: AsRef<[u8]>>(&self, cf: &impl AsColumnFamilyRef, key: K) -> bool {
self.key_may_exist_cf_opt(cf, key, &ReadOptions::default())
}
@ -883,7 +907,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// otherwise returns `true`.
pub fn key_may_exist_cf_opt<K: AsRef<[u8]>>(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
key: K,
readopts: &ReadOptions,
) -> bool {
@ -945,7 +969,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions
pub fn iterator_cf_opt<'a: 'b, 'b>(
&'a self,
cf_handle: impl AsColumnFamilyRef,
cf_handle: &impl AsColumnFamilyRef,
readopts: ReadOptions,
mode: IteratorMode,
) -> DBIteratorWithThreadMode<'b, Self> {
@ -979,7 +1003,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
pub fn iterator_cf<'a: 'b, 'b>(
&'a self,
cf_handle: impl AsColumnFamilyRef,
cf_handle: &impl AsColumnFamilyRef,
mode: IteratorMode,
) -> DBIteratorWithThreadMode<'b, Self> {
let opts = ReadOptions::default();
@ -988,7 +1012,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
pub fn full_iterator_cf<'a: 'b, 'b>(
&'a self,
cf_handle: impl AsColumnFamilyRef,
cf_handle: &impl AsColumnFamilyRef,
mode: IteratorMode,
) -> DBIteratorWithThreadMode<'b, Self> {
let mut opts = ReadOptions::default();
@ -998,7 +1022,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
&'a self,
cf_handle: impl AsColumnFamilyRef,
cf_handle: &impl AsColumnFamilyRef,
prefix: P,
) -> DBIteratorWithThreadMode<'a, Self> {
let mut opts = ReadOptions::default();
@ -1020,7 +1044,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// Opens a raw iterator over the given column family, using the default read options
pub fn raw_iterator_cf<'a: 'b, 'b>(
&'a self,
cf_handle: impl AsColumnFamilyRef,
cf_handle: &impl AsColumnFamilyRef,
) -> DBRawIteratorWithThreadMode<'b, Self> {
let opts = ReadOptions::default();
DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
@ -1037,7 +1061,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// Opens a raw iterator over the given column family, using the given read options
pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
&'a self,
cf_handle: impl AsColumnFamilyRef,
cf_handle: &impl AsColumnFamilyRef,
readopts: ReadOptions,
) -> DBRawIteratorWithThreadMode<'b, Self> {
DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
@ -1070,7 +1094,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
pub fn put_cf_opt<K, V>(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
key: K,
value: V,
writeopts: &WriteOptions,
@ -1119,7 +1143,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
pub fn merge_cf_opt<K, V>(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
key: K,
value: V,
writeopts: &WriteOptions,
@ -1165,7 +1189,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
pub fn delete_cf_opt<K: AsRef<[u8]>>(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
key: K,
writeopts: &WriteOptions,
) -> Result<(), Error> {
@ -1186,7 +1210,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// Removes the database entries in the range `["from", "to")` using given write options.
pub fn delete_range_cf_opt<K: AsRef<[u8]>>(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
from: K,
to: K,
writeopts: &WriteOptions,
@ -1216,7 +1240,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
self.put_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
}
pub fn put_cf<K, V>(&self, cf: impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
@ -1232,7 +1256,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
self.merge_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
}
pub fn merge_cf<K, V>(&self, cf: impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
@ -1246,7 +1270,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
pub fn delete_cf<K: AsRef<[u8]>>(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
key: K,
) -> Result<(), Error> {
self.delete_cf_opt(cf, key.as_ref(), &WriteOptions::default())
@ -1255,7 +1279,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// Removes the database entries in the range `["from", "to")` using default write options.
pub fn delete_range_cf<K: AsRef<[u8]>>(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
from: K,
to: K,
) -> Result<(), Error> {
@ -1304,7 +1328,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// given column family. This is not likely to be needed for typical usage.
pub fn compact_range_cf<S: AsRef<[u8]>, E: AsRef<[u8]>>(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
start: Option<S>,
end: Option<E>,
) {
@ -1326,7 +1350,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// Same as `compact_range_cf` but with custom options.
pub fn compact_range_cf_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
start: Option<S>,
end: Option<E>,
opts: &CompactOptions,
@ -1365,7 +1389,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
pub fn set_options_cf(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
opts: &[(&str, &str)],
) -> Result<(), Error> {
let copts = convert_options(opts)?;
@ -1426,7 +1450,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L428-L634).
pub fn property_value_cf(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
name: &str,
) -> Result<Option<String>, Error> {
let prop_name = match CString::new(name) {
@ -1484,7 +1508,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
pub fn property_int_value_cf(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
name: &str,
) -> Result<Option<u64>, Error> {
match self.property_value_cf(cf, name) {
@ -1561,7 +1585,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// with default opts
pub fn ingest_external_file_cf<P: AsRef<Path>>(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
paths: Vec<P>,
) -> Result<(), Error> {
let opts = IngestExternalFileOptions::default();
@ -1571,7 +1595,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// Loads a list of external SST files created with SstFileWriter into the DB for given Column Family
pub fn ingest_external_file_cf_opts<P: AsRef<Path>>(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
opts: &IngestExternalFileOptions,
paths: Vec<P>,
) -> Result<(), Error> {
@ -1604,7 +1628,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
fn ingest_external_file_raw_cf(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
opts: &IngestExternalFileOptions,
paths_v: &[CString],
cpaths: &[*const c_char],
@ -1693,7 +1717,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// Same as `delete_file_in_range` but only for specific column family
pub fn delete_file_in_range_cf<K: AsRef<[u8]>>(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
from: K,
to: K,
) -> Result<(), Error> {
@ -1718,6 +1742,21 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
ffi::rocksdb_cancel_all_background_work(self.inner, wait as u8);
}
}
fn drop_column_family<C>(
&self,
cf_inner: *mut ffi::rocksdb_column_family_handle_t,
cf: C,
) -> Result<(), Error> {
unsafe {
// first mark the column family as dropped
ffi_try!(ffi::rocksdb_drop_column_family(self.inner, cf_inner));
}
// then finally reclaim any resources (mem, files) by destroying the only single column
// family handle by drop()-ing it
drop(cf);
Ok(())
}
}
impl DBWithThreadMode<SingleThreaded> {
@ -1732,12 +1771,8 @@ impl DBWithThreadMode<SingleThreaded> {
/// Drops the column family with the given name
pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
let inner = self.inner;
if let Some(cf) = self.cfs.cfs.remove(name) {
unsafe {
ffi_try!(ffi::rocksdb_drop_column_family(inner, cf.inner));
}
Ok(())
self.drop_column_family(cf.inner, cf)
} else {
Err(Error::new(format!("Invalid column family: {}", name)))
}
@ -1753,46 +1788,39 @@ impl DBWithThreadMode<MultiThreaded> {
/// Creates column family with given name and options
pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
self.cfs
.cfs
.write()
.unwrap()
.insert(name.as_ref().to_string(), ColumnFamily { inner });
self.cfs.cfs.write().unwrap().insert(
name.as_ref().to_string(),
Arc::new(UnboundColumnFamily { inner }),
);
Ok(())
}
/// Drops the column family with the given name by internally locking the inner column
/// family map. This avoids needing `&mut self` reference
pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
let inner = self.inner;
if let Some(cf) = self.cfs.cfs.write().unwrap().remove(name) {
unsafe {
ffi_try!(ffi::rocksdb_drop_column_family(inner, cf.inner));
}
Ok(())
self.drop_column_family(cf.inner, cf)
} else {
Err(Error::new(format!("Invalid column family: {}", name)))
}
}
/// Returns the underlying column family handle
pub fn cf_handle(&self, name: &str) -> Option<BoundColumnFamily> {
pub fn cf_handle(&self, name: &str) -> Option<Arc<BoundColumnFamily>> {
self.cfs
.cfs
.read()
.unwrap()
.get(name)
.map(|cf| BoundColumnFamily {
inner: cf.inner,
multi_threaded_cfs: PhantomData,
})
.cloned()
.map(UnboundColumnFamily::bound_column_family)
}
}
impl<T: ThreadMode> Drop for DBWithThreadMode<T> {
fn drop(&mut self) {
unsafe {
self.cfs.cf_drop_all();
self.cfs.drop_all_cfs_internal();
ffi::rocksdb_close(self.inner);
}
}

@ -102,7 +102,7 @@ pub use crate::{
ColumnFamilyRef, DEFAULT_COLUMN_FAMILY_NAME,
},
compaction_filter::Decision as CompactionDecision,
db::{DBWithThreadMode, LiveFile, MultiThreaded, SingleThreaded, DB},
db::{DBWithThreadMode, LiveFile, MultiThreaded, SingleThreaded, ThreadMode, DB},
db_iterator::{
DBIterator, DBIteratorWithThreadMode, DBRawIterator, DBRawIteratorWithThreadMode,
DBWALIterator, Direction, IteratorMode,

@ -61,7 +61,7 @@ impl<'a, D: DBAccess> SnapshotWithThreadMode<'a, D> {
/// the default read options.
pub fn iterator_cf(
&self,
cf_handle: impl AsColumnFamilyRef,
cf_handle: &impl AsColumnFamilyRef,
mode: IteratorMode,
) -> DBIteratorWithThreadMode<D> {
let readopts = ReadOptions::default();
@ -82,7 +82,7 @@ impl<'a, D: DBAccess> SnapshotWithThreadMode<'a, D> {
/// the given read options.
pub fn iterator_cf_opt(
&self,
cf_handle: impl AsColumnFamilyRef,
cf_handle: &impl AsColumnFamilyRef,
mut readopts: ReadOptions,
mode: IteratorMode,
) -> DBIteratorWithThreadMode<D> {
@ -100,7 +100,7 @@ impl<'a, D: DBAccess> SnapshotWithThreadMode<'a, D> {
/// the default read options.
pub fn raw_iterator_cf(
&self,
cf_handle: impl AsColumnFamilyRef,
cf_handle: &impl AsColumnFamilyRef,
) -> DBRawIteratorWithThreadMode<D> {
let readopts = ReadOptions::default();
self.raw_iterator_cf_opt(cf_handle, readopts)
@ -116,7 +116,7 @@ impl<'a, D: DBAccess> SnapshotWithThreadMode<'a, D> {
/// the given read options.
pub fn raw_iterator_cf_opt(
&self,
cf_handle: impl AsColumnFamilyRef,
cf_handle: &impl AsColumnFamilyRef,
mut readopts: ReadOptions,
) -> DBRawIteratorWithThreadMode<D> {
readopts.set_snapshot(self);
@ -133,7 +133,7 @@ impl<'a, D: DBAccess> SnapshotWithThreadMode<'a, D> {
/// options.
pub fn get_cf<K: AsRef<[u8]>>(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
key: K,
) -> Result<Option<Vec<u8>>, Error> {
let readopts = ReadOptions::default();
@ -153,7 +153,7 @@ impl<'a, D: DBAccess> SnapshotWithThreadMode<'a, D> {
/// Returns the bytes associated with a key value, given column family and read options.
pub fn get_cf_opt<K: AsRef<[u8]>>(
&self,
cf: impl AsColumnFamilyRef,
cf: &impl AsColumnFamilyRef,
key: K,
mut readopts: ReadOptions,
) -> Result<Option<Vec<u8>>, Error> {

@ -134,7 +134,7 @@ impl WriteBatch {
}
}
pub fn put_cf<K, V>(&mut self, cf: impl AsColumnFamilyRef, key: K, value: V)
pub fn put_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
@ -173,7 +173,7 @@ impl WriteBatch {
}
}
pub fn merge_cf<K, V>(&mut self, cf: impl AsColumnFamilyRef, key: K, value: V)
pub fn merge_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
@ -206,7 +206,7 @@ impl WriteBatch {
}
}
pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: impl AsColumnFamilyRef, key: K) {
pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, key: K) {
let key = key.as_ref();
unsafe {
@ -243,7 +243,7 @@ impl WriteBatch {
/// Removes the database entries in the range ["begin_key", "end_key"), i.e.,
/// including "begin_key" and excluding "end_key". It is not an error if no
/// keys exist in the range ["begin_key", "end_key").
pub fn delete_range_cf<K: AsRef<[u8]>>(&mut self, cf: impl AsColumnFamilyRef, from: K, to: K) {
pub fn delete_range_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, from: K, to: K) {
let (start_key, end_key) = (from.as_ref(), to.as_ref());
unsafe {

@ -19,6 +19,25 @@ use pretty_assertions::assert_eq;
use rocksdb::{ColumnFamilyDescriptor, MergeOperands, Options, DB, DEFAULT_COLUMN_FAMILY_NAME};
use util::DBPath;
use std::fs;
use std::io;
use std::path::Path;
fn dir_size(path: impl AsRef<Path>) -> io::Result<u64> {
fn dir_size(mut dir: fs::ReadDir) -> io::Result<u64> {
dir.try_fold(0, |acc, file| {
let file = file?;
let size = match file.metadata()? {
data if data.is_dir() => dir_size(fs::read_dir(file.path())?)?,
data => data.len(),
};
Ok(acc + size)
})
}
dir_size(fs::read_dir(path)?)
}
#[test]
fn test_column_family() {
let n = DBPath::new("_rust_rocksdb_cftest");
@ -143,15 +162,15 @@ fn test_merge_operator() {
Err(e) => panic!("failed to open db with column family: {}", e),
};
let cf1 = db.cf_handle("cf1").unwrap();
assert!(db.put_cf(cf1, b"k1", b"v1").is_ok());
assert_eq!(db.get_cf(cf1, b"k1").unwrap().unwrap(), b"v1");
let p = db.put_cf(cf1, b"k1", b"a");
assert!(db.put_cf(&cf1, b"k1", b"v1").is_ok());
assert_eq!(db.get_cf(&cf1, b"k1").unwrap().unwrap(), b"v1");
let p = db.put_cf(&cf1, b"k1", b"a");
assert!(p.is_ok());
db.merge_cf(cf1, b"k1", b"b").unwrap();
db.merge_cf(cf1, b"k1", b"c").unwrap();
db.merge_cf(cf1, b"k1", b"d").unwrap();
db.merge_cf(cf1, b"k1", b"efg").unwrap();
let m = db.merge_cf(cf1, b"k1", b"h");
db.merge_cf(&cf1, b"k1", b"b").unwrap();
db.merge_cf(&cf1, b"k1", b"c").unwrap();
db.merge_cf(&cf1, b"k1", b"d").unwrap();
db.merge_cf(&cf1, b"k1", b"efg").unwrap();
let m = db.merge_cf(&cf1, b"k1", b"h");
println!("m is {:?}", m);
// TODO assert!(m.is_ok());
match db.get(b"k1") {
@ -163,7 +182,7 @@ fn test_merge_operator() {
_ => panic!("value not present!"),
}
let _ = db.get_cf(cf1, b"k1");
let _ = db.get_cf(&cf1, b"k1");
// TODO assert!(r.unwrap().as_ref() == b"abcdefgh");
assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").unwrap().is_none());
@ -250,3 +269,66 @@ fn test_create_duplicate_column_family() {
assert!(db.create_cf("cf1", &opts).is_err());
}
}
#[test]
fn test_no_leaked_column_family() {
let n = DBPath::new("_rust_rocksdb_no_leaked_column_family");
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
let mut write_options = rocksdb::WriteOptions::default();
write_options.set_sync(false);
write_options.disable_wal(true);
let mut db = DB::open(&opts, &n).unwrap();
let large_blob = [0x20; 1024 * 1024];
#[cfg(feature = "multi-threaded-cf")]
let mut outlived_cf = None;
// repeat creating and dropping cfs many time to indirectly detect
// possible leak via large dir.
for cf_index in 0..20 {
let cf_name = format!("cf{}", cf_index);
db.create_cf(&cf_name, &Options::default()).unwrap();
let cf = db.cf_handle(&cf_name).unwrap();
let mut batch = rocksdb::WriteBatch::default();
for key_index in 0..100 {
batch.put_cf(&cf, format!("k{}", key_index), &large_blob);
}
db.write_opt(batch, &write_options).unwrap();
// force create an SST file
db.flush_cf(&cf).unwrap();
db.drop_cf(&cf_name).unwrap();
#[cfg(feature = "multi-threaded-cf")]
{
outlived_cf = Some(cf);
}
}
// if we're not leaking, the dir bytes should be well under 10M bytes in total
let dir_bytes = dir_size(&n).unwrap();
assert!(
dir_bytes < 10_000_000,
"{} is too large (maybe leaking...)",
dir_bytes
);
// only if MultiThreaded, cf can outlive db.drop_cf() and shouldn't cause SEGV...
#[cfg(feature = "multi-threaded-cf")]
{
let outlived_cf = outlived_cf.unwrap();
assert_eq!(db.get_cf(&outlived_cf, "k0").unwrap().unwrap(), &large_blob);
drop(outlived_cf);
}
// make it explicit not to drop the db until we get dir size above...
drop(db);
}
}

@ -348,27 +348,27 @@ fn set_option_cf_test() {
let cf = db.cf_handle("cf1").unwrap();
// set an option to valid values
assert!(db
.set_options_cf(cf, &[("disable_auto_compactions", "true")])
.set_options_cf(&cf, &[("disable_auto_compactions", "true")])
.is_ok());
assert!(db
.set_options_cf(cf, &[("disable_auto_compactions", "false")])
.set_options_cf(&cf, &[("disable_auto_compactions", "false")])
.is_ok());
// invalid names/values should result in an error
assert!(db
.set_options_cf(cf, &[("disable_auto_compactions", "INVALID_VALUE")])
.set_options_cf(&cf, &[("disable_auto_compactions", "INVALID_VALUE")])
.is_err());
assert!(db
.set_options_cf(cf, &[("INVALID_NAME", "INVALID_VALUE")])
.set_options_cf(&cf, &[("INVALID_NAME", "INVALID_VALUE")])
.is_err());
// option names/values must not contain NULLs
assert!(db
.set_options_cf(cf, &[("disable_auto_compactions", "true\0")])
.set_options_cf(&cf, &[("disable_auto_compactions", "true\0")])
.is_err());
assert!(db
.set_options_cf(cf, &[("disable_auto_compactions\0", "true")])
.set_options_cf(&cf, &[("disable_auto_compactions\0", "true")])
.is_err());
// empty options are not allowed
assert!(db.set_options_cf(cf, &[]).is_err());
assert!(db.set_options_cf(&cf, &[]).is_err());
// multiple options can be set in a single API call
let multiple_options = [
("paranoid_file_checks", "true"),
@ -538,14 +538,14 @@ fn test_open_cf_with_ttl() {
opts.create_missing_column_families(true);
let db = DB::open_cf_with_ttl(&opts, &path, &["test_cf"], Duration::from_secs(1)).unwrap();
let cf = db.cf_handle("test_cf").unwrap();
db.put_cf(cf, b"key1", b"value1").unwrap();
db.put_cf(&cf, b"key1", b"value1").unwrap();
thread::sleep(Duration::from_secs(2));
// Trigger a manual compaction, this will check the TTL filter
// in the database and drop all expired entries.
db.compact_range_cf(cf, None::<&[u8]>, None::<&[u8]>);
db.compact_range_cf(&cf, None::<&[u8]>, None::<&[u8]>);
assert!(db.get_cf(cf, b"key1").unwrap().is_none());
assert!(db.get_cf(&cf, b"key1").unwrap().is_none());
}
#[test]
@ -606,13 +606,13 @@ fn compact_range_test() {
let cfs = vec!["cf1"];
let db = DB::open_cf(&opts, &path, cfs).unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
db.put_cf(cf1, b"k1", b"v1").unwrap();
db.put_cf(cf1, b"k2", b"v2").unwrap();
db.put_cf(cf1, b"k3", b"v3").unwrap();
db.put_cf(cf1, b"k4", b"v4").unwrap();
db.put_cf(cf1, b"k5", b"v5").unwrap();
db.compact_range_cf(cf1, Some(b"k2"), Some(b"k4"));
db.compact_range_cf_opt(cf1, Some(b"k1"), None::<&str>, &compact_opts);
db.put_cf(&cf1, b"k1", b"v1").unwrap();
db.put_cf(&cf1, b"k2", b"v2").unwrap();
db.put_cf(&cf1, b"k3", b"v3").unwrap();
db.put_cf(&cf1, b"k4", b"v4").unwrap();
db.put_cf(&cf1, b"k5", b"v5").unwrap();
db.compact_range_cf(&cf1, Some(b"k2"), Some(b"k4"));
db.compact_range_cf_opt(&cf1, Some(b"k1"), None::<&str>, &compact_opts);
// put and compact default column family
db.put(b"k1", b"v1").unwrap();
@ -645,12 +645,12 @@ fn fifo_compaction_test() {
let cfs = vec!["cf1"];
let db = DB::open_cf(&opts, &path, cfs).unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
db.put_cf(cf1, b"k1", b"v1").unwrap();
db.put_cf(cf1, b"k2", b"v2").unwrap();
db.put_cf(cf1, b"k3", b"v3").unwrap();
db.put_cf(cf1, b"k4", b"v4").unwrap();
db.put_cf(cf1, b"k5", b"v5").unwrap();
db.compact_range_cf(cf1, Some(b"k2"), Some(b"k4"));
db.put_cf(&cf1, b"k1", b"v1").unwrap();
db.put_cf(&cf1, b"k2", b"v2").unwrap();
db.put_cf(&cf1, b"k3", b"v3").unwrap();
db.put_cf(&cf1, b"k4", b"v4").unwrap();
db.put_cf(&cf1, b"k5", b"v5").unwrap();
db.compact_range_cf(&cf1, Some(b"k2"), Some(b"k4"));
// check stats
let ctx = PerfContext::default();
@ -880,15 +880,15 @@ fn test_open_cf_for_read_only() {
opts.create_missing_column_families(true);
let db = DB::open_cf(&opts, &path, cfs.clone()).unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
db.put_cf(cf1, b"k1", b"v1").unwrap();
db.put_cf(&cf1, b"k1", b"v1").unwrap();
}
{
let opts = Options::default();
let error_if_log_file_exist = false;
let db = DB::open_cf_for_read_only(&opts, &path, cfs, error_if_log_file_exist).unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
assert_eq!(db.get_cf(cf1, b"k1").unwrap().unwrap(), b"v1");
assert!(db.put_cf(cf1, b"k2", b"v2").is_err());
assert_eq!(db.get_cf(&cf1, b"k1").unwrap().unwrap(), b"v1");
assert!(db.put_cf(&cf1, b"k2", b"v2").is_err());
}
}
@ -904,18 +904,18 @@ fn delete_range_test() {
let db = DB::open_cf(&opts, &path, cfs).unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
db.put_cf(cf1, b"k1", b"v1").unwrap();
db.put_cf(cf1, b"k2", b"v2").unwrap();
db.put_cf(cf1, b"k3", b"v3").unwrap();
db.put_cf(cf1, b"k4", b"v4").unwrap();
db.put_cf(cf1, b"k5", b"v5").unwrap();
db.delete_range_cf(cf1, b"k2", b"k4").unwrap();
assert_eq!(db.get_cf(cf1, b"k1").unwrap().unwrap(), b"v1");
assert_eq!(db.get_cf(cf1, b"k4").unwrap().unwrap(), b"v4");
assert_eq!(db.get_cf(cf1, b"k5").unwrap().unwrap(), b"v5");
assert!(db.get_cf(cf1, b"k2").unwrap().is_none());
assert!(db.get_cf(cf1, b"k3").unwrap().is_none());
db.put_cf(&cf1, b"k1", b"v1").unwrap();
db.put_cf(&cf1, b"k2", b"v2").unwrap();
db.put_cf(&cf1, b"k3", b"v3").unwrap();
db.put_cf(&cf1, b"k4", b"v4").unwrap();
db.put_cf(&cf1, b"k5", b"v5").unwrap();
db.delete_range_cf(&cf1, b"k2", b"k4").unwrap();
assert_eq!(db.get_cf(&cf1, b"k1").unwrap().unwrap(), b"v1");
assert_eq!(db.get_cf(&cf1, b"k4").unwrap().unwrap(), b"v4");
assert_eq!(db.get_cf(&cf1, b"k5").unwrap().unwrap(), b"v5");
assert!(db.get_cf(&cf1, b"k2").unwrap().is_none());
assert!(db.get_cf(&cf1, b"k3").unwrap().is_none());
}
}
@ -955,13 +955,13 @@ fn multi_get_cf() {
let cf0 = db.cf_handle("cf0").unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
db.put_cf(cf1, b"k1", b"v1").unwrap();
db.put_cf(&cf1, b"k1", b"v1").unwrap();
let cf2 = db.cf_handle("cf2").unwrap();
db.put_cf(cf2, b"k2", b"v2").unwrap();
db.put_cf(&cf2, b"k2", b"v2").unwrap();
let values = db
.multi_get_cf(vec![(cf0, b"k0"), (cf1, b"k1"), (cf2, b"k2")])
.multi_get_cf(vec![(&cf0, b"k0"), (&cf1, b"k1"), (&cf2, b"k2")])
.into_iter()
.map(Result::unwrap)
.collect::<Vec<_>>();
@ -997,10 +997,10 @@ fn key_may_exist_cf() {
let db = DB::open_cf(&opts, &path, &["cf"]).unwrap();
let cf = db.cf_handle("cf").unwrap();
assert_eq!(false, db.key_may_exist_cf(cf, "nonexistent"));
assert_eq!(false, db.key_may_exist_cf(&cf, "nonexistent"));
assert_eq!(
false,
db.key_may_exist_cf_opt(cf, "nonexistent", &ReadOptions::default())
db.key_may_exist_cf_opt(&cf, "nonexistent", &ReadOptions::default())
);
}
}

@ -38,7 +38,7 @@ fn property_cf_test() {
let mut db = DB::open_default(&n).unwrap();
db.create_cf("cf1", &opts).unwrap();
let cf = db.cf_handle("cf1").unwrap();
let value = db.property_value_cf(cf, "rocksdb.stats").unwrap().unwrap();
let value = db.property_value_cf(&cf, "rocksdb.stats").unwrap().unwrap();
assert!(value.contains("Stats"));
}
@ -66,7 +66,7 @@ fn property_int_cf_test() {
db.create_cf("cf1", &opts).unwrap();
let cf = db.cf_handle("cf1").unwrap();
let total_keys = db
.property_int_value_cf(cf, "rocksdb.estimate-num-keys")
.property_int_value_cf(&cf, "rocksdb.estimate-num-keys")
.unwrap();
assert_eq!(total_keys, Some(0));

Loading…
Cancel
Save