diff --git a/src/checkpoint.rs b/src/checkpoint.rs index b1bd667..19f2313 100644 --- a/src/checkpoint.rs +++ b/src/checkpoint.rs @@ -19,6 +19,7 @@ use crate::{ffi, Error, DB}; use std::ffi::CString; +use std::marker::PhantomData; use std::path::Path; /// Undocumented parameter for `ffi::rocksdb_checkpoint_create` function. Zero by default. @@ -26,16 +27,17 @@ const LOG_SIZE_FOR_FLUSH: u64 = 0_u64; /// Database's checkpoint object. /// Used to create checkpoints of the specified DB from time to time. -pub struct Checkpoint { +pub struct Checkpoint<'db> { inner: *mut ffi::rocksdb_checkpoint_t, + _db: PhantomData<&'db ()>, } -impl Checkpoint { +impl<'db> Checkpoint<'db> { /// Creates new checkpoint object for specific DB. /// /// Does not actually produce checkpoints, call `.create_checkpoint()` method to produce /// a DB checkpoint. - pub fn new(db: &DB) -> Result { + pub fn new(db: &'db DB) -> Result, Error> { let checkpoint: *mut ffi::rocksdb_checkpoint_t; unsafe { checkpoint = ffi_try!(ffi::rocksdb_checkpoint_object_create(db.inner)) }; @@ -44,7 +46,10 @@ impl Checkpoint { return Err(Error::new("Could not create checkpoint object.".to_owned())); } - Ok(Checkpoint { inner: checkpoint }) + Ok(Checkpoint { + inner: checkpoint, + _db: PhantomData, + }) } /// Creates new physical DB checkpoint in directory specified by `path`. @@ -70,7 +75,7 @@ impl Checkpoint { } } -impl Drop for Checkpoint { +impl<'db> Drop for Checkpoint<'db> { fn drop(&mut self) { unsafe { ffi::rocksdb_checkpoint_object_destroy(self.inner); diff --git a/src/db.rs b/src/db.rs index 684d6b1..3684590 100644 --- a/src/db.rs +++ b/src/db.rs @@ -16,6 +16,7 @@ use crate::{ column_family::AsColumnFamilyRef, column_family::BoundColumnFamily, + db_options::OptionsMustOutliveDB, ffi, ffi_util::{from_cstr, opt_bytes_to_ptr, raw_data, to_cpath}, ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIteratorWithThreadMode, @@ -29,6 +30,7 @@ use std::collections::BTreeMap; use std::ffi::{CStr, CString}; use std::fmt; use std::fs; +use std::iter; use std::marker::PhantomData; use std::path::Path; use std::path::PathBuf; @@ -102,6 +104,7 @@ pub struct DBWithThreadMode { pub(crate) inner: *mut ffi::rocksdb_t, cfs: T, // Column families are held differently depending on thread mode path: PathBuf, + _outlive: Vec, } /// Minimal set of DB-related methods, intended to be generic over @@ -238,6 +241,7 @@ impl DBWithThreadMode { inner: db, cfs: T::new(BTreeMap::new()), path: path.as_ref().to_path_buf(), + _outlive: vec![opts.outlive.clone()], }) } @@ -330,6 +334,9 @@ impl DBWithThreadMode { I: IntoIterator, { let cfs: Vec<_> = cfs.into_iter().collect(); + let outlive = iter::once(opts.outlive.clone()) + .chain(cfs.iter().map(|cf| cf.options.outlive.clone())) + .collect(); let cpath = to_cpath(&path)?; @@ -401,6 +408,7 @@ impl DBWithThreadMode { inner: db, path: path.as_ref().to_path_buf(), cfs: T::new(cf_map), + _outlive: outlive, }) } diff --git a/src/db_options.rs b/src/db_options.rs index c99207e..50e2d28 100644 --- a/src/db_options.rs +++ b/src/db_options.rs @@ -15,6 +15,7 @@ use std::ffi::{CStr, CString}; use std::mem; use std::path::Path; +use std::sync::Arc; use libc::{self, c_char, c_int, c_uchar, c_uint, c_void, size_t}; @@ -35,10 +36,20 @@ fn new_cache(capacity: size_t) -> *mut ffi::rocksdb_cache_t { unsafe { ffi::rocksdb_cache_create_lru(capacity) } } -pub struct Cache { +pub(crate) struct CacheWrapper { pub(crate) inner: *mut ffi::rocksdb_cache_t, } +impl Drop for CacheWrapper { + fn drop(&mut self) { + unsafe { + ffi::rocksdb_cache_destroy(self.inner); + } + } +} + +pub struct Cache(pub(crate) Arc); + impl Cache { /// Create a lru cache with capacity pub fn new_lru_cache(capacity: size_t) -> Result { @@ -46,33 +57,29 @@ impl Cache { if cache.is_null() { Err(Error::new("Could not create Cache".to_owned())) } else { - Ok(Cache { inner: cache }) + Ok(Cache(Arc::new(CacheWrapper { inner: cache }))) } } /// Returns the Cache memory usage pub fn get_usage(&self) -> usize { - unsafe { ffi::rocksdb_cache_get_usage(self.inner) } + unsafe { ffi::rocksdb_cache_get_usage(self.0.inner) } } /// Returns pinned memory usage pub fn get_pinned_usage(&self) -> usize { - unsafe { ffi::rocksdb_cache_get_pinned_usage(self.inner) } + unsafe { ffi::rocksdb_cache_get_pinned_usage(self.0.inner) } } /// Sets cache capacity pub fn set_capacity(&mut self, capacity: size_t) { unsafe { - ffi::rocksdb_cache_set_capacity(self.inner, capacity); + ffi::rocksdb_cache_set_capacity(self.0.inner, capacity); } } -} -impl Drop for Cache { - fn drop(&mut self) { - unsafe { - ffi::rocksdb_cache_destroy(self.inner); - } + fn clone(&self) -> Self { + Self(self.0.clone()) } } @@ -86,11 +93,13 @@ impl Drop for Cache { /// /// Note: currently, C API behinds C++ API for various settings. /// See also: `rocksdb/include/env.h` -pub struct Env { - pub(crate) inner: *mut ffi::rocksdb_env_t, +pub struct Env(Arc); + +struct EnvWrapper { + inner: *mut ffi::rocksdb_env_t, } -impl Drop for Env { +impl Drop for EnvWrapper { fn drop(&mut self) { unsafe { ffi::rocksdb_env_destroy(self.inner); @@ -105,7 +114,7 @@ impl Env { if env.is_null() { Err(Error::new("Could not create mem env".to_owned())) } else { - Ok(Env { inner: env }) + Ok(Env(Arc::new(EnvWrapper { inner: env }))) } } @@ -116,7 +125,7 @@ impl Env { if env.is_null() { Err(Error::new("Could not create mem env".to_owned())) } else { - Ok(Env { inner: env }) + Ok(Env(Arc::new(EnvWrapper { inner: env }))) } } @@ -126,7 +135,7 @@ impl Env { /// Default: 1 pub fn set_background_threads(&mut self, num_threads: c_int) { unsafe { - ffi::rocksdb_env_set_background_threads(self.inner, num_threads); + ffi::rocksdb_env_set_background_threads(self.0.inner, num_threads); } } @@ -134,7 +143,7 @@ impl Env { /// prevent compactions from stalling memtable flushes. pub fn set_high_priority_background_threads(&mut self, n: c_int) { unsafe { - ffi::rocksdb_env_set_high_priority_background_threads(self.inner, n); + ffi::rocksdb_env_set_high_priority_background_threads(self.0.inner, n); } } @@ -142,7 +151,7 @@ impl Env { /// prevent compactions from stalling memtable flushes. pub fn set_low_priority_background_threads(&mut self, n: c_int) { unsafe { - ffi::rocksdb_env_set_low_priority_background_threads(self.inner, n); + ffi::rocksdb_env_set_low_priority_background_threads(self.0.inner, n); } } @@ -150,42 +159,81 @@ impl Env { /// prevent compactions from stalling memtable flushes. pub fn set_bottom_priority_background_threads(&mut self, n: c_int) { unsafe { - ffi::rocksdb_env_set_bottom_priority_background_threads(self.inner, n); + ffi::rocksdb_env_set_bottom_priority_background_threads(self.0.inner, n); } } /// Wait for all threads started by StartThread to terminate. pub fn join_all_threads(&mut self) { unsafe { - ffi::rocksdb_env_join_all_threads(self.inner); + ffi::rocksdb_env_join_all_threads(self.0.inner); } } /// Lowering IO priority for threads from the specified pool. pub fn lower_thread_pool_io_priority(&mut self) { unsafe { - ffi::rocksdb_env_lower_thread_pool_io_priority(self.inner); + ffi::rocksdb_env_lower_thread_pool_io_priority(self.0.inner); } } /// Lowering IO priority for high priority thread pool. pub fn lower_high_priority_thread_pool_io_priority(&mut self) { unsafe { - ffi::rocksdb_env_lower_high_priority_thread_pool_io_priority(self.inner); + ffi::rocksdb_env_lower_high_priority_thread_pool_io_priority(self.0.inner); } } /// Lowering CPU priority for threads from the specified pool. pub fn lower_thread_pool_cpu_priority(&mut self) { unsafe { - ffi::rocksdb_env_lower_thread_pool_cpu_priority(self.inner); + ffi::rocksdb_env_lower_thread_pool_cpu_priority(self.0.inner); } } /// Lowering CPU priority for high priority thread pool. pub fn lower_high_priority_thread_pool_cpu_priority(&mut self) { unsafe { - ffi::rocksdb_env_lower_high_priority_thread_pool_cpu_priority(self.inner); + ffi::rocksdb_env_lower_high_priority_thread_pool_cpu_priority(self.0.inner); + } + } + + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +#[derive(Default)] +pub(crate) struct OptionsMustOutliveDB { + env: Option, + row_cache: Option, + block_based: Option, +} + +impl OptionsMustOutliveDB { + pub(crate) fn clone(&self) -> Self { + Self { + env: self.env.as_ref().map(Env::clone), + row_cache: self.row_cache.as_ref().map(Cache::clone), + block_based: self + .block_based + .as_ref() + .map(BlockBasedOptionsMustOutliveDB::clone), + } + } +} + +#[derive(Default)] +struct BlockBasedOptionsMustOutliveDB { + block_cache: Option, + block_cache_compressed: Option, +} + +impl BlockBasedOptionsMustOutliveDB { + fn clone(&self) -> Self { + Self { + block_cache: self.block_cache.as_ref().map(Cache::clone), + block_cache_compressed: self.block_cache_compressed.as_ref().map(Cache::clone), } } } @@ -226,6 +274,7 @@ impl Env { /// ``` pub struct Options { pub(crate) inner: *mut ffi::rocksdb_options_t, + pub(crate) outlive: OptionsMustOutliveDB, } /// Optionally disable WAL or sync for this write. @@ -284,6 +333,7 @@ pub struct FlushOptions { /// For configuring block-based file storage. pub struct BlockBasedOptions { pub(crate) inner: *mut ffi::rocksdb_block_based_table_options_t, + outlive: BlockBasedOptionsMustOutliveDB, } pub struct ReadOptions { @@ -351,7 +401,10 @@ impl Clone for Options { if inner.is_null() { panic!("Could not copy RocksDB options"); } - Self { inner } + Self { + inner, + outlive: self.outlive.clone(), + } } } @@ -467,8 +520,9 @@ impl BlockBasedOptions { /// By default, rocksdb will automatically create and use an 8MB internal cache. pub fn set_block_cache(&mut self, cache: &Cache) { unsafe { - ffi::rocksdb_block_based_options_set_block_cache(self.inner, cache.inner); + ffi::rocksdb_block_based_options_set_block_cache(self.inner, cache.0.inner); } + self.outlive.block_cache = Some(cache.clone()); } /// Sets global cache for compressed blocks. Cache must outlive DB instance which uses it. @@ -476,8 +530,9 @@ impl BlockBasedOptions { /// By default, rocksdb will not use a compressed block cache. pub fn set_block_cache_compressed(&mut self, cache: &Cache) { unsafe { - ffi::rocksdb_block_based_options_set_block_cache_compressed(self.inner, cache.inner); + ffi::rocksdb_block_based_options_set_block_cache_compressed(self.inner, cache.0.inner); } + self.outlive.block_cache_compressed = Some(cache.clone()); } /// Disable block cache @@ -633,7 +688,10 @@ impl Default for BlockBasedOptions { if block_opts.is_null() { panic!("Could not create RocksDB block based options"); } - BlockBasedOptions { inner: block_opts } + BlockBasedOptions { + inner: block_opts, + outlive: BlockBasedOptionsMustOutliveDB::default(), + } } } @@ -819,8 +877,9 @@ impl Options { /// Default: Env::default() pub fn set_env(&mut self, env: &Env) { unsafe { - ffi::rocksdb_options_set_env(self.inner, env.inner); + ffi::rocksdb_options_set_env(self.inner, env.0.inner); } + self.outlive.env = Some(env.clone()); } /// Sets the compression algorithm that will be used for compressing blocks. @@ -2101,6 +2160,7 @@ impl Options { unsafe { ffi::rocksdb_options_set_block_based_table_factory(self.inner, factory.inner); } + self.outlive.block_based = Some(factory.outlive.clone()); } // This is a factory that provides TableFactory objects. @@ -2504,8 +2564,9 @@ impl Options { /// Not supported in ROCKSDB_LITE mode! pub fn set_row_cache(&mut self, cache: &Cache) { unsafe { - ffi::rocksdb_options_set_row_cache(self.inner, cache.inner); + ffi::rocksdb_options_set_row_cache(self.inner, cache.0.inner); } + self.outlive.row_cache = Some(cache.clone()); } /// Use to control write rate of flush and compaction. Flush has higher @@ -2695,7 +2756,10 @@ impl Default for Options { if opts.is_null() { panic!("Could not create RocksDB options"); } - Options { inner: opts } + Options { + inner: opts, + outlive: OptionsMustOutliveDB::default(), + } } } } diff --git a/src/perf.rs b/src/perf.rs index dee70be..d099a89 100644 --- a/src/perf.rs +++ b/src/perf.rs @@ -249,7 +249,7 @@ impl MemoryUsageBuilder { /// Add a cache to collect memory usage from it and add up in total stats fn add_cache(&mut self, cache: &Cache) { unsafe { - ffi::rocksdb_memory_consumers_add_cache(self.inner, cache.inner); + ffi::rocksdb_memory_consumers_add_cache(self.inner, cache.0.inner); } } diff --git a/tests/fail/checkpoint_outlive_db.rs b/tests/fail/checkpoint_outlive_db.rs new file mode 100644 index 0000000..d8400e0 --- /dev/null +++ b/tests/fail/checkpoint_outlive_db.rs @@ -0,0 +1,8 @@ +use rocksdb::{DB, checkpoint::Checkpoint}; + +fn main() { + let _checkpoint = { + let db = DB::open_default("foo").unwrap(); + Checkpoint::new(&db) + }; +} diff --git a/tests/fail/checkpoint_outlive_db.stderr b/tests/fail/checkpoint_outlive_db.stderr new file mode 100644 index 0000000..7bb598c --- /dev/null +++ b/tests/fail/checkpoint_outlive_db.stderr @@ -0,0 +1,10 @@ +error[E0597]: `db` does not live long enough + --> $DIR/checkpoint_outlive_db.rs:6:25 + | +4 | let _checkpoint = { + | ----------- borrow later stored here +5 | let db = DB::open_default("foo").unwrap(); +6 | Checkpoint::new(&db) + | ^^^ borrowed value does not live long enough +7 | }; + | - `db` dropped here while still borrowed diff --git a/tests/fail/snapshot_outlive_db.rs b/tests/fail/snapshot_outlive_db.rs new file mode 100644 index 0000000..09141d9 --- /dev/null +++ b/tests/fail/snapshot_outlive_db.rs @@ -0,0 +1,8 @@ +use rocksdb::DB; + +fn main() { + let _snapshot = { + let db = DB::open_default("foo").unwrap(); + db.snapshot() + }; +} diff --git a/tests/fail/snapshot_outlive_db.stderr b/tests/fail/snapshot_outlive_db.stderr new file mode 100644 index 0000000..9720e6b --- /dev/null +++ b/tests/fail/snapshot_outlive_db.stderr @@ -0,0 +1,10 @@ +error[E0597]: `db` does not live long enough + --> $DIR/snapshot_outlive_db.rs:6:9 + | +4 | let _snapshot = { + | --------- borrow later stored here +5 | let db = DB::open_default("foo").unwrap(); +6 | db.snapshot() + | ^^ borrowed value does not live long enough +7 | }; + | - `db` dropped here while still borrowed diff --git a/tests/test_checkpoint.rs b/tests/test_checkpoint.rs index 01e4dc5..a7668bb 100644 --- a/tests/test_checkpoint.rs +++ b/tests/test_checkpoint.rs @@ -99,3 +99,9 @@ pub fn test_multi_checkpoints() { assert_eq!(cp.get(b"k5").unwrap().unwrap(), b"v5"); assert_eq!(cp.get(b"k6").unwrap().unwrap(), b"v6"); } + +#[test] +fn test_checkpoint_outlive_db() { + let t = trybuild::TestCases::new(); + t.compile_fail("tests/fail/checkpoint_outlive_db.rs"); +} diff --git a/tests/test_db.rs b/tests/test_db.rs index 9c82c5e..0826d93 100644 --- a/tests/test_db.rs +++ b/tests/test_db.rs @@ -946,3 +946,9 @@ fn multi_get_cf() { assert_eq!(values[2], b"v2"); } } + +#[test] +fn test_snapshot_outlive_db() { + let t = trybuild::TestCases::new(); + t.compile_fail("tests/fail/snapshot_outlive_db.rs"); +}