Keep Cache and Env alive with Rc (#497)

master
Andrea Corradi 3 years ago committed by GitHub
parent 0b700fe70d
commit b7af3946c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      src/checkpoint.rs
  2. 8
      src/db.rs
  3. 128
      src/db_options.rs
  4. 2
      src/perf.rs
  5. 8
      tests/fail/checkpoint_outlive_db.rs
  6. 10
      tests/fail/checkpoint_outlive_db.stderr
  7. 8
      tests/fail/snapshot_outlive_db.rs
  8. 10
      tests/fail/snapshot_outlive_db.stderr
  9. 6
      tests/test_checkpoint.rs
  10. 6
      tests/test_db.rs

@ -19,6 +19,7 @@
use crate::{ffi, Error, DB};
use std::ffi::CString;
use std::marker::PhantomData;
use std::path::Path;
/// Undocumented parameter for `ffi::rocksdb_checkpoint_create` function. Zero by default.
@ -26,16 +27,17 @@ const LOG_SIZE_FOR_FLUSH: u64 = 0_u64;
/// Database's checkpoint object.
/// Used to create checkpoints of the specified DB from time to time.
pub struct Checkpoint {
pub struct Checkpoint<'db> {
inner: *mut ffi::rocksdb_checkpoint_t,
_db: PhantomData<&'db ()>,
}
impl Checkpoint {
impl<'db> Checkpoint<'db> {
/// Creates new checkpoint object for specific DB.
///
/// Does not actually produce checkpoints, call `.create_checkpoint()` method to produce
/// a DB checkpoint.
pub fn new(db: &DB) -> Result<Checkpoint, Error> {
pub fn new(db: &'db DB) -> Result<Checkpoint<'db>, Error> {
let checkpoint: *mut ffi::rocksdb_checkpoint_t;
unsafe { checkpoint = ffi_try!(ffi::rocksdb_checkpoint_object_create(db.inner)) };
@ -44,7 +46,10 @@ impl Checkpoint {
return Err(Error::new("Could not create checkpoint object.".to_owned()));
}
Ok(Checkpoint { inner: checkpoint })
Ok(Checkpoint {
inner: checkpoint,
_db: PhantomData,
})
}
/// Creates new physical DB checkpoint in directory specified by `path`.
@ -70,7 +75,7 @@ impl Checkpoint {
}
}
impl Drop for Checkpoint {
impl<'db> Drop for Checkpoint<'db> {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_checkpoint_object_destroy(self.inner);

@ -16,6 +16,7 @@
use crate::{
column_family::AsColumnFamilyRef,
column_family::BoundColumnFamily,
db_options::OptionsMustOutliveDB,
ffi,
ffi_util::{from_cstr, opt_bytes_to_ptr, raw_data, to_cpath},
ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIteratorWithThreadMode,
@ -29,6 +30,7 @@ use std::collections::BTreeMap;
use std::ffi::{CStr, CString};
use std::fmt;
use std::fs;
use std::iter;
use std::marker::PhantomData;
use std::path::Path;
use std::path::PathBuf;
@ -102,6 +104,7 @@ pub struct DBWithThreadMode<T: ThreadMode> {
pub(crate) inner: *mut ffi::rocksdb_t,
cfs: T, // Column families are held differently depending on thread mode
path: PathBuf,
_outlive: Vec<OptionsMustOutliveDB>,
}
/// Minimal set of DB-related methods, intended to be generic over
@ -238,6 +241,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
inner: db,
cfs: T::new(BTreeMap::new()),
path: path.as_ref().to_path_buf(),
_outlive: vec![opts.outlive.clone()],
})
}
@ -330,6 +334,9 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
I: IntoIterator<Item = ColumnFamilyDescriptor>,
{
let cfs: Vec<_> = cfs.into_iter().collect();
let outlive = iter::once(opts.outlive.clone())
.chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
.collect();
let cpath = to_cpath(&path)?;
@ -401,6 +408,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
inner: db,
path: path.as_ref().to_path_buf(),
cfs: T::new(cf_map),
_outlive: outlive,
})
}

@ -15,6 +15,7 @@
use std::ffi::{CStr, CString};
use std::mem;
use std::path::Path;
use std::sync::Arc;
use libc::{self, c_char, c_int, c_uchar, c_uint, c_void, size_t};
@ -35,10 +36,20 @@ fn new_cache(capacity: size_t) -> *mut ffi::rocksdb_cache_t {
unsafe { ffi::rocksdb_cache_create_lru(capacity) }
}
pub struct Cache {
pub(crate) struct CacheWrapper {
pub(crate) inner: *mut ffi::rocksdb_cache_t,
}
impl Drop for CacheWrapper {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_cache_destroy(self.inner);
}
}
}
pub struct Cache(pub(crate) Arc<CacheWrapper>);
impl Cache {
/// Create a lru cache with capacity
pub fn new_lru_cache(capacity: size_t) -> Result<Cache, Error> {
@ -46,33 +57,29 @@ impl Cache {
if cache.is_null() {
Err(Error::new("Could not create Cache".to_owned()))
} else {
Ok(Cache { inner: cache })
Ok(Cache(Arc::new(CacheWrapper { inner: cache })))
}
}
/// Returns the Cache memory usage
pub fn get_usage(&self) -> usize {
unsafe { ffi::rocksdb_cache_get_usage(self.inner) }
unsafe { ffi::rocksdb_cache_get_usage(self.0.inner) }
}
/// Returns pinned memory usage
pub fn get_pinned_usage(&self) -> usize {
unsafe { ffi::rocksdb_cache_get_pinned_usage(self.inner) }
unsafe { ffi::rocksdb_cache_get_pinned_usage(self.0.inner) }
}
/// Sets cache capacity
pub fn set_capacity(&mut self, capacity: size_t) {
unsafe {
ffi::rocksdb_cache_set_capacity(self.inner, capacity);
ffi::rocksdb_cache_set_capacity(self.0.inner, capacity);
}
}
}
impl Drop for Cache {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_cache_destroy(self.inner);
}
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
@ -86,11 +93,13 @@ impl Drop for Cache {
///
/// Note: currently, C API behinds C++ API for various settings.
/// See also: `rocksdb/include/env.h`
pub struct Env {
pub(crate) inner: *mut ffi::rocksdb_env_t,
pub struct Env(Arc<EnvWrapper>);
struct EnvWrapper {
inner: *mut ffi::rocksdb_env_t,
}
impl Drop for Env {
impl Drop for EnvWrapper {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_env_destroy(self.inner);
@ -105,7 +114,7 @@ impl Env {
if env.is_null() {
Err(Error::new("Could not create mem env".to_owned()))
} else {
Ok(Env { inner: env })
Ok(Env(Arc::new(EnvWrapper { inner: env })))
}
}
@ -116,7 +125,7 @@ impl Env {
if env.is_null() {
Err(Error::new("Could not create mem env".to_owned()))
} else {
Ok(Env { inner: env })
Ok(Env(Arc::new(EnvWrapper { inner: env })))
}
}
@ -126,7 +135,7 @@ impl Env {
/// Default: 1
pub fn set_background_threads(&mut self, num_threads: c_int) {
unsafe {
ffi::rocksdb_env_set_background_threads(self.inner, num_threads);
ffi::rocksdb_env_set_background_threads(self.0.inner, num_threads);
}
}
@ -134,7 +143,7 @@ impl Env {
/// prevent compactions from stalling memtable flushes.
pub fn set_high_priority_background_threads(&mut self, n: c_int) {
unsafe {
ffi::rocksdb_env_set_high_priority_background_threads(self.inner, n);
ffi::rocksdb_env_set_high_priority_background_threads(self.0.inner, n);
}
}
@ -142,7 +151,7 @@ impl Env {
/// prevent compactions from stalling memtable flushes.
pub fn set_low_priority_background_threads(&mut self, n: c_int) {
unsafe {
ffi::rocksdb_env_set_low_priority_background_threads(self.inner, n);
ffi::rocksdb_env_set_low_priority_background_threads(self.0.inner, n);
}
}
@ -150,42 +159,81 @@ impl Env {
/// prevent compactions from stalling memtable flushes.
pub fn set_bottom_priority_background_threads(&mut self, n: c_int) {
unsafe {
ffi::rocksdb_env_set_bottom_priority_background_threads(self.inner, n);
ffi::rocksdb_env_set_bottom_priority_background_threads(self.0.inner, n);
}
}
/// Wait for all threads started by StartThread to terminate.
pub fn join_all_threads(&mut self) {
unsafe {
ffi::rocksdb_env_join_all_threads(self.inner);
ffi::rocksdb_env_join_all_threads(self.0.inner);
}
}
/// Lowering IO priority for threads from the specified pool.
pub fn lower_thread_pool_io_priority(&mut self) {
unsafe {
ffi::rocksdb_env_lower_thread_pool_io_priority(self.inner);
ffi::rocksdb_env_lower_thread_pool_io_priority(self.0.inner);
}
}
/// Lowering IO priority for high priority thread pool.
pub fn lower_high_priority_thread_pool_io_priority(&mut self) {
unsafe {
ffi::rocksdb_env_lower_high_priority_thread_pool_io_priority(self.inner);
ffi::rocksdb_env_lower_high_priority_thread_pool_io_priority(self.0.inner);
}
}
/// Lowering CPU priority for threads from the specified pool.
pub fn lower_thread_pool_cpu_priority(&mut self) {
unsafe {
ffi::rocksdb_env_lower_thread_pool_cpu_priority(self.inner);
ffi::rocksdb_env_lower_thread_pool_cpu_priority(self.0.inner);
}
}
/// Lowering CPU priority for high priority thread pool.
pub fn lower_high_priority_thread_pool_cpu_priority(&mut self) {
unsafe {
ffi::rocksdb_env_lower_high_priority_thread_pool_cpu_priority(self.inner);
ffi::rocksdb_env_lower_high_priority_thread_pool_cpu_priority(self.0.inner);
}
}
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
#[derive(Default)]
pub(crate) struct OptionsMustOutliveDB {
env: Option<Env>,
row_cache: Option<Cache>,
block_based: Option<BlockBasedOptionsMustOutliveDB>,
}
impl OptionsMustOutliveDB {
pub(crate) fn clone(&self) -> Self {
Self {
env: self.env.as_ref().map(Env::clone),
row_cache: self.row_cache.as_ref().map(Cache::clone),
block_based: self
.block_based
.as_ref()
.map(BlockBasedOptionsMustOutliveDB::clone),
}
}
}
#[derive(Default)]
struct BlockBasedOptionsMustOutliveDB {
block_cache: Option<Cache>,
block_cache_compressed: Option<Cache>,
}
impl BlockBasedOptionsMustOutliveDB {
fn clone(&self) -> Self {
Self {
block_cache: self.block_cache.as_ref().map(Cache::clone),
block_cache_compressed: self.block_cache_compressed.as_ref().map(Cache::clone),
}
}
}
@ -226,6 +274,7 @@ impl Env {
/// ```
pub struct Options {
pub(crate) inner: *mut ffi::rocksdb_options_t,
pub(crate) outlive: OptionsMustOutliveDB,
}
/// Optionally disable WAL or sync for this write.
@ -284,6 +333,7 @@ pub struct FlushOptions {
/// For configuring block-based file storage.
pub struct BlockBasedOptions {
pub(crate) inner: *mut ffi::rocksdb_block_based_table_options_t,
outlive: BlockBasedOptionsMustOutliveDB,
}
pub struct ReadOptions {
@ -351,7 +401,10 @@ impl Clone for Options {
if inner.is_null() {
panic!("Could not copy RocksDB options");
}
Self { inner }
Self {
inner,
outlive: self.outlive.clone(),
}
}
}
@ -467,8 +520,9 @@ impl BlockBasedOptions {
/// By default, rocksdb will automatically create and use an 8MB internal cache.
pub fn set_block_cache(&mut self, cache: &Cache) {
unsafe {
ffi::rocksdb_block_based_options_set_block_cache(self.inner, cache.inner);
ffi::rocksdb_block_based_options_set_block_cache(self.inner, cache.0.inner);
}
self.outlive.block_cache = Some(cache.clone());
}
/// Sets global cache for compressed blocks. Cache must outlive DB instance which uses it.
@ -476,8 +530,9 @@ impl BlockBasedOptions {
/// By default, rocksdb will not use a compressed block cache.
pub fn set_block_cache_compressed(&mut self, cache: &Cache) {
unsafe {
ffi::rocksdb_block_based_options_set_block_cache_compressed(self.inner, cache.inner);
ffi::rocksdb_block_based_options_set_block_cache_compressed(self.inner, cache.0.inner);
}
self.outlive.block_cache_compressed = Some(cache.clone());
}
/// Disable block cache
@ -633,7 +688,10 @@ impl Default for BlockBasedOptions {
if block_opts.is_null() {
panic!("Could not create RocksDB block based options");
}
BlockBasedOptions { inner: block_opts }
BlockBasedOptions {
inner: block_opts,
outlive: BlockBasedOptionsMustOutliveDB::default(),
}
}
}
@ -819,8 +877,9 @@ impl Options {
/// Default: Env::default()
pub fn set_env(&mut self, env: &Env) {
unsafe {
ffi::rocksdb_options_set_env(self.inner, env.inner);
ffi::rocksdb_options_set_env(self.inner, env.0.inner);
}
self.outlive.env = Some(env.clone());
}
/// Sets the compression algorithm that will be used for compressing blocks.
@ -2101,6 +2160,7 @@ impl Options {
unsafe {
ffi::rocksdb_options_set_block_based_table_factory(self.inner, factory.inner);
}
self.outlive.block_based = Some(factory.outlive.clone());
}
// This is a factory that provides TableFactory objects.
@ -2504,8 +2564,9 @@ impl Options {
/// Not supported in ROCKSDB_LITE mode!
pub fn set_row_cache(&mut self, cache: &Cache) {
unsafe {
ffi::rocksdb_options_set_row_cache(self.inner, cache.inner);
ffi::rocksdb_options_set_row_cache(self.inner, cache.0.inner);
}
self.outlive.row_cache = Some(cache.clone());
}
/// Use to control write rate of flush and compaction. Flush has higher
@ -2695,7 +2756,10 @@ impl Default for Options {
if opts.is_null() {
panic!("Could not create RocksDB options");
}
Options { inner: opts }
Options {
inner: opts,
outlive: OptionsMustOutliveDB::default(),
}
}
}
}

@ -249,7 +249,7 @@ impl MemoryUsageBuilder {
/// Add a cache to collect memory usage from it and add up in total stats
fn add_cache(&mut self, cache: &Cache) {
unsafe {
ffi::rocksdb_memory_consumers_add_cache(self.inner, cache.inner);
ffi::rocksdb_memory_consumers_add_cache(self.inner, cache.0.inner);
}
}

@ -0,0 +1,8 @@
use rocksdb::{DB, checkpoint::Checkpoint};
fn main() {
let _checkpoint = {
let db = DB::open_default("foo").unwrap();
Checkpoint::new(&db)
};
}

@ -0,0 +1,10 @@
error[E0597]: `db` does not live long enough
--> $DIR/checkpoint_outlive_db.rs:6:25
|
4 | let _checkpoint = {
| ----------- borrow later stored here
5 | let db = DB::open_default("foo").unwrap();
6 | Checkpoint::new(&db)
| ^^^ borrowed value does not live long enough
7 | };
| - `db` dropped here while still borrowed

@ -0,0 +1,8 @@
use rocksdb::DB;
fn main() {
let _snapshot = {
let db = DB::open_default("foo").unwrap();
db.snapshot()
};
}

@ -0,0 +1,10 @@
error[E0597]: `db` does not live long enough
--> $DIR/snapshot_outlive_db.rs:6:9
|
4 | let _snapshot = {
| --------- borrow later stored here
5 | let db = DB::open_default("foo").unwrap();
6 | db.snapshot()
| ^^ borrowed value does not live long enough
7 | };
| - `db` dropped here while still borrowed

@ -99,3 +99,9 @@ pub fn test_multi_checkpoints() {
assert_eq!(cp.get(b"k5").unwrap().unwrap(), b"v5");
assert_eq!(cp.get(b"k6").unwrap().unwrap(), b"v6");
}
#[test]
fn test_checkpoint_outlive_db() {
let t = trybuild::TestCases::new();
t.compile_fail("tests/fail/checkpoint_outlive_db.rs");
}

@ -946,3 +946,9 @@ fn multi_get_cf() {
assert_eq!(values[2], b"v2");
}
}
#[test]
fn test_snapshot_outlive_db() {
let t = trybuild::TestCases::new();
t.compile_fail("tests/fail/snapshot_outlive_db.rs");
}

Loading…
Cancel
Save