diff --git a/src/backup.rs b/src/backup.rs index a146d16..31fb820 100644 --- a/src/backup.rs +++ b/src/backup.rs @@ -13,9 +13,11 @@ // limitations under the License. // +use crate::env::Env; use crate::{db::DBInner, ffi, ffi_util::to_cpath, DBCommon, Error, ThreadMode}; -use libc::{c_int, c_uchar}; +use libc::c_uchar; +use std::ffi::CString; use std::path::Path; /// Represents information of a backup including timestamp of the backup @@ -35,10 +37,11 @@ pub struct BackupEngineInfo { pub struct BackupEngine { inner: *mut ffi::rocksdb_backup_engine_t, + _outlive: Env, } pub struct BackupEngineOptions { - inner: *mut ffi::rocksdb_options_t, + inner: *mut ffi::rocksdb_backup_engine_options_t, } pub struct RestoreOptions { @@ -46,20 +49,24 @@ pub struct RestoreOptions { } impl BackupEngine { - /// Open a backup engine with the specified options. - pub fn open>(opts: &BackupEngineOptions, path: P) -> Result { - let cpath = to_cpath(path)?; - + /// Open a backup engine with the specified options and RocksDB Env. + pub fn open(opts: &BackupEngineOptions, env: &Env) -> Result { let be: *mut ffi::rocksdb_backup_engine_t; unsafe { - be = ffi_try!(ffi::rocksdb_backup_engine_open(opts.inner, cpath.as_ptr())); + be = ffi_try!(ffi::rocksdb_backup_engine_open_opts( + opts.inner, + env.0.inner + )); } if be.is_null() { return Err(Error::new("Could not initialize backup engine.".to_owned())); } - Ok(Self { inner: be }) + Ok(Self { + inner: be, + _outlive: env.clone(), + }) } /// Captures the state of the database in the latest backup. @@ -217,27 +224,52 @@ impl BackupEngine { } impl BackupEngineOptions { - // -} + /// Initializes BackupEngineOptions with the directory to be used for storing/accessing the + /// backup files. + pub fn new>(backup_dir: P) -> Result { + let backup_dir = backup_dir.as_ref(); + let c_backup_dir = if let Ok(c) = CString::new(backup_dir.to_string_lossy().as_bytes()) { + c + } else { + return Err(Error::new( + "Failed to convert backup_dir to CString \ + when constructing BackupEngineOptions" + .to_owned(), + )); + }; -impl RestoreOptions { - pub fn set_keep_log_files(&mut self, keep_log_files: bool) { unsafe { - ffi::rocksdb_restore_options_set_keep_log_files( + let opts = ffi::rocksdb_backup_engine_options_create(c_backup_dir.as_ptr()); + assert!(!opts.is_null(), "Could not create RocksDB backup options"); + + Ok(Self { inner: opts }) + } + } + + /// Sets the number of operations (such as file copies or file checksums) that RocksDB may + /// perform in parallel when executing a backup or restore. + /// + /// Default: 1 + pub fn set_max_background_operations(&mut self, max_background_operations: i32) { + unsafe { + ffi::rocksdb_backup_engine_options_set_max_background_operations( self.inner, - c_int::from(keep_log_files), + max_background_operations, ); } } } -impl Default for BackupEngineOptions { - fn default() -> Self { +impl RestoreOptions { + /// Sets `keep_log_files`. If true, restore won't overwrite the existing log files in wal_dir. + /// It will also move all log files from archive directory to wal_dir. Use this option in + /// combination with BackupEngineOptions::backup_log_files = false for persisting in-memory + /// databases. + /// + /// Default: false + pub fn set_keep_log_files(&mut self, keep_log_files: bool) { unsafe { - let opts = ffi::rocksdb_options_create(); - assert!(!opts.is_null(), "Could not create RocksDB backup options"); - - Self { inner: opts } + ffi::rocksdb_restore_options_set_keep_log_files(self.inner, i32::from(keep_log_files)); } } } @@ -264,7 +296,7 @@ impl Drop for BackupEngine { impl Drop for BackupEngineOptions { fn drop(&mut self) { unsafe { - ffi::rocksdb_options_destroy(self.inner); + ffi::rocksdb_backup_engine_options_destroy(self.inner); } } } diff --git a/src/db_options.rs b/src/db_options.rs index d4ee073..463f564 100644 --- a/src/db_options.rs +++ b/src/db_options.rs @@ -25,6 +25,7 @@ use crate::{ compaction_filter_factory::{self, CompactionFilterFactory}, comparator::{self, ComparatorCallback, CompareFn}, db::DBAccess, + env::Env, ffi, ffi_util::{from_cstr, to_cpath, CStrLike}, merge_operator::{ @@ -82,127 +83,6 @@ impl Cache { } } -/// An Env is an interface used by the rocksdb implementation to access -/// operating system functionality like the filesystem etc. Callers -/// may wish to provide a custom Env object when opening a database to -/// get fine gain control; e.g., to rate limit file system operations. -/// -/// All Env implementations are safe for concurrent access from -/// multiple threads without any external synchronization. -/// -/// Note: currently, C API behinds C++ API for various settings. -/// See also: `rocksdb/include/env.h` -#[derive(Clone)] -pub struct Env(Arc); - -pub(crate) struct EnvWrapper { - inner: *mut ffi::rocksdb_env_t, -} - -impl Drop for EnvWrapper { - fn drop(&mut self) { - unsafe { - ffi::rocksdb_env_destroy(self.inner); - } - } -} - -impl Env { - /// Returns default env - pub fn new() -> Result { - let env = unsafe { ffi::rocksdb_create_default_env() }; - if env.is_null() { - Err(Error::new("Could not create mem env".to_owned())) - } else { - Ok(Self(Arc::new(EnvWrapper { inner: env }))) - } - } - - /// Returns a new environment that stores its data in memory and delegates - /// all non-file-storage tasks to base_env. - pub fn mem_env() -> Result { - let env = unsafe { ffi::rocksdb_create_mem_env() }; - if env.is_null() { - Err(Error::new("Could not create mem env".to_owned())) - } else { - Ok(Self(Arc::new(EnvWrapper { inner: env }))) - } - } - - /// Sets the number of background worker threads of a specific thread pool for this environment. - /// `LOW` is the default pool. - /// - /// Default: 1 - pub fn set_background_threads(&mut self, num_threads: c_int) { - unsafe { - ffi::rocksdb_env_set_background_threads(self.0.inner, num_threads); - } - } - - /// Sets the size of the high priority thread pool that can be used to - /// prevent compactions from stalling memtable flushes. - pub fn set_high_priority_background_threads(&mut self, n: c_int) { - unsafe { - ffi::rocksdb_env_set_high_priority_background_threads(self.0.inner, n); - } - } - - /// Sets the size of the low priority thread pool that can be used to - /// prevent compactions from stalling memtable flushes. - pub fn set_low_priority_background_threads(&mut self, n: c_int) { - unsafe { - ffi::rocksdb_env_set_low_priority_background_threads(self.0.inner, n); - } - } - - /// Sets the size of the bottom priority thread pool that can be used to - /// prevent compactions from stalling memtable flushes. - pub fn set_bottom_priority_background_threads(&mut self, n: c_int) { - unsafe { - ffi::rocksdb_env_set_bottom_priority_background_threads(self.0.inner, n); - } - } - - /// Wait for all threads started by StartThread to terminate. - pub fn join_all_threads(&mut self) { - unsafe { - ffi::rocksdb_env_join_all_threads(self.0.inner); - } - } - - /// Lowering IO priority for threads from the specified pool. - pub fn lower_thread_pool_io_priority(&mut self) { - unsafe { - ffi::rocksdb_env_lower_thread_pool_io_priority(self.0.inner); - } - } - - /// Lowering IO priority for high priority thread pool. - pub fn lower_high_priority_thread_pool_io_priority(&mut self) { - unsafe { - ffi::rocksdb_env_lower_high_priority_thread_pool_io_priority(self.0.inner); - } - } - - /// Lowering CPU priority for threads from the specified pool. - pub fn lower_thread_pool_cpu_priority(&mut self) { - unsafe { - ffi::rocksdb_env_lower_thread_pool_cpu_priority(self.0.inner); - } - } - - /// Lowering CPU priority for high priority thread pool. - pub fn lower_high_priority_thread_pool_cpu_priority(&mut self) { - unsafe { - ffi::rocksdb_env_lower_high_priority_thread_pool_cpu_priority(self.0.inner); - } - } - - fn clone(&self) -> Self { - Self(self.0.clone()) - } -} - #[derive(Default)] pub(crate) struct OptionsMustOutliveDB { env: Option, @@ -385,7 +265,6 @@ unsafe impl Send for CuckooTableOptions {} unsafe impl Send for ReadOptions {} unsafe impl Send for IngestExternalFileOptions {} unsafe impl Send for CacheWrapper {} -unsafe impl Send for EnvWrapper {} // Sync is similarly safe for many types because they do not expose interior mutability, and their // use within the rocksdb library is generally behind a const reference @@ -396,7 +275,6 @@ unsafe impl Sync for CuckooTableOptions {} unsafe impl Sync for ReadOptions {} unsafe impl Sync for IngestExternalFileOptions {} unsafe impl Sync for CacheWrapper {} -unsafe impl Sync for EnvWrapper {} impl Drop for Options { fn drop(&mut self) { diff --git a/src/env.rs b/src/env.rs new file mode 100644 index 0000000..ac18cfd --- /dev/null +++ b/src/env.rs @@ -0,0 +1,125 @@ +use std::sync::Arc; + +use libc::{self, c_int}; + +use crate::{ffi, Error}; + +/// An Env is an interface used by the rocksdb implementation to access +/// operating system functionality like the filesystem etc. Callers +/// may wish to provide a custom Env object when opening a database to +/// get fine gain control; e.g., to rate limit file system operations. +/// +/// All Env implementations are safe for concurrent access from +/// multiple threads without any external synchronization. +/// +/// Note: currently, C API behinds C++ API for various settings. +/// See also: `rocksdb/include/env.h` +#[derive(Clone)] +pub struct Env(pub(crate) Arc); + +pub(crate) struct EnvWrapper { + pub(crate) inner: *mut ffi::rocksdb_env_t, +} + +impl Drop for EnvWrapper { + fn drop(&mut self) { + unsafe { + ffi::rocksdb_env_destroy(self.inner); + } + } +} + +impl Env { + /// Returns default env + pub fn new() -> Result { + let env = unsafe { ffi::rocksdb_create_default_env() }; + if env.is_null() { + Err(Error::new("Could not create mem env".to_owned())) + } else { + Ok(Self(Arc::new(EnvWrapper { inner: env }))) + } + } + + /// Returns a new environment that stores its data in memory and delegates + /// all non-file-storage tasks to base_env. + pub fn mem_env() -> Result { + let env = unsafe { ffi::rocksdb_create_mem_env() }; + if env.is_null() { + Err(Error::new("Could not create mem env".to_owned())) + } else { + Ok(Self(Arc::new(EnvWrapper { inner: env }))) + } + } + + /// Sets the number of background worker threads of a specific thread pool for this environment. + /// `LOW` is the default pool. + /// + /// Default: 1 + pub fn set_background_threads(&mut self, num_threads: c_int) { + unsafe { + ffi::rocksdb_env_set_background_threads(self.0.inner, num_threads); + } + } + + /// Sets the size of the high priority thread pool that can be used to + /// prevent compactions from stalling memtable flushes. + pub fn set_high_priority_background_threads(&mut self, n: c_int) { + unsafe { + ffi::rocksdb_env_set_high_priority_background_threads(self.0.inner, n); + } + } + + /// Sets the size of the low priority thread pool that can be used to + /// prevent compactions from stalling memtable flushes. + pub fn set_low_priority_background_threads(&mut self, n: c_int) { + unsafe { + ffi::rocksdb_env_set_low_priority_background_threads(self.0.inner, n); + } + } + + /// Sets the size of the bottom priority thread pool that can be used to + /// prevent compactions from stalling memtable flushes. + pub fn set_bottom_priority_background_threads(&mut self, n: c_int) { + unsafe { + ffi::rocksdb_env_set_bottom_priority_background_threads(self.0.inner, n); + } + } + + /// Wait for all threads started by StartThread to terminate. + pub fn join_all_threads(&mut self) { + unsafe { + ffi::rocksdb_env_join_all_threads(self.0.inner); + } + } + + /// Lowering IO priority for threads from the specified pool. + pub fn lower_thread_pool_io_priority(&mut self) { + unsafe { + ffi::rocksdb_env_lower_thread_pool_io_priority(self.0.inner); + } + } + + /// Lowering IO priority for high priority thread pool. + pub fn lower_high_priority_thread_pool_io_priority(&mut self) { + unsafe { + ffi::rocksdb_env_lower_high_priority_thread_pool_io_priority(self.0.inner); + } + } + + /// Lowering CPU priority for threads from the specified pool. + pub fn lower_thread_pool_cpu_priority(&mut self) { + unsafe { + ffi::rocksdb_env_lower_thread_pool_cpu_priority(self.0.inner); + } + } + + /// Lowering CPU priority for high priority thread pool. + pub fn lower_high_priority_thread_pool_cpu_priority(&mut self) { + unsafe { + ffi::rocksdb_env_lower_high_priority_thread_pool_cpu_priority(self.0.inner); + } + } +} + +unsafe impl Send for EnvWrapper {} +unsafe impl Sync for EnvWrapper {} diff --git a/src/lib.rs b/src/lib.rs index ae135d1..81ea602 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -85,6 +85,7 @@ mod db; mod db_iterator; mod db_options; mod db_pinnable_slice; +mod env; mod iter_range; pub mod merge_operator; pub mod perf; @@ -112,11 +113,12 @@ pub use crate::{ db_options::{ BlockBasedIndexType, BlockBasedOptions, BottommostLevelCompaction, Cache, CompactOptions, CuckooTableOptions, DBCompactionStyle, DBCompressionType, DBPath, DBRecoveryMode, - DataBlockIndexType, Env, FifoCompactOptions, FlushOptions, IngestExternalFileOptions, - LogLevel, MemtableFactory, Options, PlainTableFactoryOptions, ReadOptions, - UniversalCompactOptions, UniversalCompactionStopStyle, WriteOptions, + DataBlockIndexType, FifoCompactOptions, FlushOptions, IngestExternalFileOptions, LogLevel, + MemtableFactory, Options, PlainTableFactoryOptions, ReadOptions, UniversalCompactOptions, + UniversalCompactionStopStyle, WriteOptions, }, db_pinnable_slice::DBPinnableSlice, + env::Env, ffi_util::CStrLike, iter_range::{IterateBounds, PrefixRange}, merge_operator::MergeOperands, @@ -229,11 +231,11 @@ mod test { use super::{ column_family::UnboundColumnFamily, - db_options::{CacheWrapper, EnvWrapper}, + db_options::CacheWrapper, + env::{Env, EnvWrapper}, BlockBasedOptions, BoundColumnFamily, Cache, ColumnFamily, ColumnFamilyDescriptor, - DBIterator, DBRawIterator, Env, IngestExternalFileOptions, Options, - PlainTableFactoryOptions, ReadOptions, Snapshot, SstFileWriter, WriteBatch, WriteOptions, - DB, + DBIterator, DBRawIterator, IngestExternalFileOptions, Options, PlainTableFactoryOptions, + ReadOptions, Snapshot, SstFileWriter, WriteBatch, WriteOptions, DB, }; #[test] diff --git a/tests/test_backup.rs b/tests/test_backup.rs index 3c945a4..a44c126 100644 --- a/tests/test_backup.rs +++ b/tests/test_backup.rs @@ -18,7 +18,7 @@ use pretty_assertions::assert_eq; use rocksdb::{ backup::{BackupEngine, BackupEngineOptions, RestoreOptions}, - DB, + Env, DB, }; use util::DBPath; @@ -33,9 +33,10 @@ fn restore_from_latest() { let value = db.get(b"k1"); assert_eq!(value.unwrap().unwrap(), b"v1111"); { - let backup_path = DBPath::new("backup_path_1"); - let backup_opts = BackupEngineOptions::default(); - let mut backup_engine = BackupEngine::open(&backup_opts, &backup_path).unwrap(); + let env = Env::new().unwrap(); + let backup_opts = BackupEngineOptions::new("backup_path_1").unwrap(); + + let mut backup_engine = BackupEngine::open(&backup_opts, &env).unwrap(); assert!(backup_engine.create_new_backup(&db).is_ok()); // check backup info @@ -73,9 +74,10 @@ fn restore_from_backup() { let value = db.get(b"k1"); assert_eq!(value.unwrap().unwrap(), b"v1111"); { - let backup_path = DBPath::new("backup_path_2"); - let backup_opts = BackupEngineOptions::default(); - let mut backup_engine = BackupEngine::open(&backup_opts, &backup_path).unwrap(); + let env = Env::new().unwrap(); + let backup_opts = BackupEngineOptions::new("backup_path_2").unwrap(); + + let mut backup_engine = BackupEngine::open(&backup_opts, &env).unwrap(); assert!(backup_engine.create_new_backup(&db).is_ok()); // check backup info