Support RocksDB 7.x BackupEngineOptions (#700)

master
Matt Jurik 2 years ago committed by GitHub
parent 3805d1fd1a
commit 28237601a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 74
      src/backup.rs
  2. 124
      src/db_options.rs
  3. 125
      src/env.rs
  4. 16
      src/lib.rs
  5. 16
      tests/test_backup.rs

@ -13,9 +13,11 @@
// limitations under the License.
//
use crate::env::Env;
use crate::{db::DBInner, ffi, ffi_util::to_cpath, DBCommon, Error, ThreadMode};
use libc::{c_int, c_uchar};
use libc::c_uchar;
use std::ffi::CString;
use std::path::Path;
/// Represents information of a backup including timestamp of the backup
@ -35,10 +37,11 @@ pub struct BackupEngineInfo {
pub struct BackupEngine {
inner: *mut ffi::rocksdb_backup_engine_t,
_outlive: Env,
}
pub struct BackupEngineOptions {
inner: *mut ffi::rocksdb_options_t,
inner: *mut ffi::rocksdb_backup_engine_options_t,
}
pub struct RestoreOptions {
@ -46,20 +49,24 @@ pub struct RestoreOptions {
}
impl BackupEngine {
/// Open a backup engine with the specified options.
pub fn open<P: AsRef<Path>>(opts: &BackupEngineOptions, path: P) -> Result<Self, Error> {
let cpath = to_cpath(path)?;
/// Open a backup engine with the specified options and RocksDB Env.
pub fn open(opts: &BackupEngineOptions, env: &Env) -> Result<Self, Error> {
let be: *mut ffi::rocksdb_backup_engine_t;
unsafe {
be = ffi_try!(ffi::rocksdb_backup_engine_open(opts.inner, cpath.as_ptr()));
be = ffi_try!(ffi::rocksdb_backup_engine_open_opts(
opts.inner,
env.0.inner
));
}
if be.is_null() {
return Err(Error::new("Could not initialize backup engine.".to_owned()));
}
Ok(Self { inner: be })
Ok(Self {
inner: be,
_outlive: env.clone(),
})
}
/// Captures the state of the database in the latest backup.
@ -217,27 +224,52 @@ impl BackupEngine {
}
impl BackupEngineOptions {
//
}
/// Initializes BackupEngineOptions with the directory to be used for storing/accessing the
/// backup files.
pub fn new<P: AsRef<Path>>(backup_dir: P) -> Result<Self, Error> {
let backup_dir = backup_dir.as_ref();
let c_backup_dir = if let Ok(c) = CString::new(backup_dir.to_string_lossy().as_bytes()) {
c
} else {
return Err(Error::new(
"Failed to convert backup_dir to CString \
when constructing BackupEngineOptions"
.to_owned(),
));
};
impl RestoreOptions {
pub fn set_keep_log_files(&mut self, keep_log_files: bool) {
unsafe {
ffi::rocksdb_restore_options_set_keep_log_files(
let opts = ffi::rocksdb_backup_engine_options_create(c_backup_dir.as_ptr());
assert!(!opts.is_null(), "Could not create RocksDB backup options");
Ok(Self { inner: opts })
}
}
/// Sets the number of operations (such as file copies or file checksums) that RocksDB may
/// perform in parallel when executing a backup or restore.
///
/// Default: 1
pub fn set_max_background_operations(&mut self, max_background_operations: i32) {
unsafe {
ffi::rocksdb_backup_engine_options_set_max_background_operations(
self.inner,
c_int::from(keep_log_files),
max_background_operations,
);
}
}
}
impl Default for BackupEngineOptions {
fn default() -> Self {
impl RestoreOptions {
/// Sets `keep_log_files`. If true, restore won't overwrite the existing log files in wal_dir.
/// It will also move all log files from archive directory to wal_dir. Use this option in
/// combination with BackupEngineOptions::backup_log_files = false for persisting in-memory
/// databases.
///
/// Default: false
pub fn set_keep_log_files(&mut self, keep_log_files: bool) {
unsafe {
let opts = ffi::rocksdb_options_create();
assert!(!opts.is_null(), "Could not create RocksDB backup options");
Self { inner: opts }
ffi::rocksdb_restore_options_set_keep_log_files(self.inner, i32::from(keep_log_files));
}
}
}
@ -264,7 +296,7 @@ impl Drop for BackupEngine {
impl Drop for BackupEngineOptions {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_options_destroy(self.inner);
ffi::rocksdb_backup_engine_options_destroy(self.inner);
}
}
}

@ -25,6 +25,7 @@ use crate::{
compaction_filter_factory::{self, CompactionFilterFactory},
comparator::{self, ComparatorCallback, CompareFn},
db::DBAccess,
env::Env,
ffi,
ffi_util::{from_cstr, to_cpath, CStrLike},
merge_operator::{
@ -82,127 +83,6 @@ impl Cache {
}
}
/// An Env is an interface used by the rocksdb implementation to access
/// operating system functionality like the filesystem etc. Callers
/// may wish to provide a custom Env object when opening a database to
/// get fine gain control; e.g., to rate limit file system operations.
///
/// All Env implementations are safe for concurrent access from
/// multiple threads without any external synchronization.
///
/// Note: currently, C API behinds C++ API for various settings.
/// See also: `rocksdb/include/env.h`
#[derive(Clone)]
pub struct Env(Arc<EnvWrapper>);
pub(crate) struct EnvWrapper {
inner: *mut ffi::rocksdb_env_t,
}
impl Drop for EnvWrapper {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_env_destroy(self.inner);
}
}
}
impl Env {
/// Returns default env
pub fn new() -> Result<Self, Error> {
let env = unsafe { ffi::rocksdb_create_default_env() };
if env.is_null() {
Err(Error::new("Could not create mem env".to_owned()))
} else {
Ok(Self(Arc::new(EnvWrapper { inner: env })))
}
}
/// Returns a new environment that stores its data in memory and delegates
/// all non-file-storage tasks to base_env.
pub fn mem_env() -> Result<Self, Error> {
let env = unsafe { ffi::rocksdb_create_mem_env() };
if env.is_null() {
Err(Error::new("Could not create mem env".to_owned()))
} else {
Ok(Self(Arc::new(EnvWrapper { inner: env })))
}
}
/// Sets the number of background worker threads of a specific thread pool for this environment.
/// `LOW` is the default pool.
///
/// Default: 1
pub fn set_background_threads(&mut self, num_threads: c_int) {
unsafe {
ffi::rocksdb_env_set_background_threads(self.0.inner, num_threads);
}
}
/// Sets the size of the high priority thread pool that can be used to
/// prevent compactions from stalling memtable flushes.
pub fn set_high_priority_background_threads(&mut self, n: c_int) {
unsafe {
ffi::rocksdb_env_set_high_priority_background_threads(self.0.inner, n);
}
}
/// Sets the size of the low priority thread pool that can be used to
/// prevent compactions from stalling memtable flushes.
pub fn set_low_priority_background_threads(&mut self, n: c_int) {
unsafe {
ffi::rocksdb_env_set_low_priority_background_threads(self.0.inner, n);
}
}
/// Sets the size of the bottom priority thread pool that can be used to
/// prevent compactions from stalling memtable flushes.
pub fn set_bottom_priority_background_threads(&mut self, n: c_int) {
unsafe {
ffi::rocksdb_env_set_bottom_priority_background_threads(self.0.inner, n);
}
}
/// Wait for all threads started by StartThread to terminate.
pub fn join_all_threads(&mut self) {
unsafe {
ffi::rocksdb_env_join_all_threads(self.0.inner);
}
}
/// Lowering IO priority for threads from the specified pool.
pub fn lower_thread_pool_io_priority(&mut self) {
unsafe {
ffi::rocksdb_env_lower_thread_pool_io_priority(self.0.inner);
}
}
/// Lowering IO priority for high priority thread pool.
pub fn lower_high_priority_thread_pool_io_priority(&mut self) {
unsafe {
ffi::rocksdb_env_lower_high_priority_thread_pool_io_priority(self.0.inner);
}
}
/// Lowering CPU priority for threads from the specified pool.
pub fn lower_thread_pool_cpu_priority(&mut self) {
unsafe {
ffi::rocksdb_env_lower_thread_pool_cpu_priority(self.0.inner);
}
}
/// Lowering CPU priority for high priority thread pool.
pub fn lower_high_priority_thread_pool_cpu_priority(&mut self) {
unsafe {
ffi::rocksdb_env_lower_high_priority_thread_pool_cpu_priority(self.0.inner);
}
}
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
#[derive(Default)]
pub(crate) struct OptionsMustOutliveDB {
env: Option<Env>,
@ -385,7 +265,6 @@ unsafe impl Send for CuckooTableOptions {}
unsafe impl Send for ReadOptions {}
unsafe impl Send for IngestExternalFileOptions {}
unsafe impl Send for CacheWrapper {}
unsafe impl Send for EnvWrapper {}
// Sync is similarly safe for many types because they do not expose interior mutability, and their
// use within the rocksdb library is generally behind a const reference
@ -396,7 +275,6 @@ unsafe impl Sync for CuckooTableOptions {}
unsafe impl Sync for ReadOptions {}
unsafe impl Sync for IngestExternalFileOptions {}
unsafe impl Sync for CacheWrapper {}
unsafe impl Sync for EnvWrapper {}
impl Drop for Options {
fn drop(&mut self) {

@ -0,0 +1,125 @@
use std::sync::Arc;
use libc::{self, c_int};
use crate::{ffi, Error};
/// An Env is an interface used by the rocksdb implementation to access
/// operating system functionality like the filesystem etc. Callers
/// may wish to provide a custom Env object when opening a database to
/// get fine gain control; e.g., to rate limit file system operations.
///
/// All Env implementations are safe for concurrent access from
/// multiple threads without any external synchronization.
///
/// Note: currently, C API behinds C++ API for various settings.
/// See also: `rocksdb/include/env.h`
#[derive(Clone)]
pub struct Env(pub(crate) Arc<EnvWrapper>);
pub(crate) struct EnvWrapper {
pub(crate) inner: *mut ffi::rocksdb_env_t,
}
impl Drop for EnvWrapper {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_env_destroy(self.inner);
}
}
}
impl Env {
/// Returns default env
pub fn new() -> Result<Self, Error> {
let env = unsafe { ffi::rocksdb_create_default_env() };
if env.is_null() {
Err(Error::new("Could not create mem env".to_owned()))
} else {
Ok(Self(Arc::new(EnvWrapper { inner: env })))
}
}
/// Returns a new environment that stores its data in memory and delegates
/// all non-file-storage tasks to base_env.
pub fn mem_env() -> Result<Self, Error> {
let env = unsafe { ffi::rocksdb_create_mem_env() };
if env.is_null() {
Err(Error::new("Could not create mem env".to_owned()))
} else {
Ok(Self(Arc::new(EnvWrapper { inner: env })))
}
}
/// Sets the number of background worker threads of a specific thread pool for this environment.
/// `LOW` is the default pool.
///
/// Default: 1
pub fn set_background_threads(&mut self, num_threads: c_int) {
unsafe {
ffi::rocksdb_env_set_background_threads(self.0.inner, num_threads);
}
}
/// Sets the size of the high priority thread pool that can be used to
/// prevent compactions from stalling memtable flushes.
pub fn set_high_priority_background_threads(&mut self, n: c_int) {
unsafe {
ffi::rocksdb_env_set_high_priority_background_threads(self.0.inner, n);
}
}
/// Sets the size of the low priority thread pool that can be used to
/// prevent compactions from stalling memtable flushes.
pub fn set_low_priority_background_threads(&mut self, n: c_int) {
unsafe {
ffi::rocksdb_env_set_low_priority_background_threads(self.0.inner, n);
}
}
/// Sets the size of the bottom priority thread pool that can be used to
/// prevent compactions from stalling memtable flushes.
pub fn set_bottom_priority_background_threads(&mut self, n: c_int) {
unsafe {
ffi::rocksdb_env_set_bottom_priority_background_threads(self.0.inner, n);
}
}
/// Wait for all threads started by StartThread to terminate.
pub fn join_all_threads(&mut self) {
unsafe {
ffi::rocksdb_env_join_all_threads(self.0.inner);
}
}
/// Lowering IO priority for threads from the specified pool.
pub fn lower_thread_pool_io_priority(&mut self) {
unsafe {
ffi::rocksdb_env_lower_thread_pool_io_priority(self.0.inner);
}
}
/// Lowering IO priority for high priority thread pool.
pub fn lower_high_priority_thread_pool_io_priority(&mut self) {
unsafe {
ffi::rocksdb_env_lower_high_priority_thread_pool_io_priority(self.0.inner);
}
}
/// Lowering CPU priority for threads from the specified pool.
pub fn lower_thread_pool_cpu_priority(&mut self) {
unsafe {
ffi::rocksdb_env_lower_thread_pool_cpu_priority(self.0.inner);
}
}
/// Lowering CPU priority for high priority thread pool.
pub fn lower_high_priority_thread_pool_cpu_priority(&mut self) {
unsafe {
ffi::rocksdb_env_lower_high_priority_thread_pool_cpu_priority(self.0.inner);
}
}
}
unsafe impl Send for EnvWrapper {}
unsafe impl Sync for EnvWrapper {}

@ -85,6 +85,7 @@ mod db;
mod db_iterator;
mod db_options;
mod db_pinnable_slice;
mod env;
mod iter_range;
pub mod merge_operator;
pub mod perf;
@ -112,11 +113,12 @@ pub use crate::{
db_options::{
BlockBasedIndexType, BlockBasedOptions, BottommostLevelCompaction, Cache, CompactOptions,
CuckooTableOptions, DBCompactionStyle, DBCompressionType, DBPath, DBRecoveryMode,
DataBlockIndexType, Env, FifoCompactOptions, FlushOptions, IngestExternalFileOptions,
LogLevel, MemtableFactory, Options, PlainTableFactoryOptions, ReadOptions,
UniversalCompactOptions, UniversalCompactionStopStyle, WriteOptions,
DataBlockIndexType, FifoCompactOptions, FlushOptions, IngestExternalFileOptions, LogLevel,
MemtableFactory, Options, PlainTableFactoryOptions, ReadOptions, UniversalCompactOptions,
UniversalCompactionStopStyle, WriteOptions,
},
db_pinnable_slice::DBPinnableSlice,
env::Env,
ffi_util::CStrLike,
iter_range::{IterateBounds, PrefixRange},
merge_operator::MergeOperands,
@ -229,11 +231,11 @@ mod test {
use super::{
column_family::UnboundColumnFamily,
db_options::{CacheWrapper, EnvWrapper},
db_options::CacheWrapper,
env::{Env, EnvWrapper},
BlockBasedOptions, BoundColumnFamily, Cache, ColumnFamily, ColumnFamilyDescriptor,
DBIterator, DBRawIterator, Env, IngestExternalFileOptions, Options,
PlainTableFactoryOptions, ReadOptions, Snapshot, SstFileWriter, WriteBatch, WriteOptions,
DB,
DBIterator, DBRawIterator, IngestExternalFileOptions, Options, PlainTableFactoryOptions,
ReadOptions, Snapshot, SstFileWriter, WriteBatch, WriteOptions, DB,
};
#[test]

@ -18,7 +18,7 @@ use pretty_assertions::assert_eq;
use rocksdb::{
backup::{BackupEngine, BackupEngineOptions, RestoreOptions},
DB,
Env, DB,
};
use util::DBPath;
@ -33,9 +33,10 @@ fn restore_from_latest() {
let value = db.get(b"k1");
assert_eq!(value.unwrap().unwrap(), b"v1111");
{
let backup_path = DBPath::new("backup_path_1");
let backup_opts = BackupEngineOptions::default();
let mut backup_engine = BackupEngine::open(&backup_opts, &backup_path).unwrap();
let env = Env::new().unwrap();
let backup_opts = BackupEngineOptions::new("backup_path_1").unwrap();
let mut backup_engine = BackupEngine::open(&backup_opts, &env).unwrap();
assert!(backup_engine.create_new_backup(&db).is_ok());
// check backup info
@ -73,9 +74,10 @@ fn restore_from_backup() {
let value = db.get(b"k1");
assert_eq!(value.unwrap().unwrap(), b"v1111");
{
let backup_path = DBPath::new("backup_path_2");
let backup_opts = BackupEngineOptions::default();
let mut backup_engine = BackupEngine::open(&backup_opts, &backup_path).unwrap();
let env = Env::new().unwrap();
let backup_opts = BackupEngineOptions::new("backup_path_2").unwrap();
let mut backup_engine = BackupEngine::open(&backup_opts, &env).unwrap();
assert!(backup_engine.create_new_backup(&db).is_ok());
// check backup info

Loading…
Cancel
Save