Support RocksDB transaction. (#565)

Branch: master
Author: Yiyuan Liu (committed via GitHub, 2 years ago)
Parent: 934855fe54
Commit: 2257be1563
Changed files (number of changed lines per file); a brief usage sketch of the new transaction API follows the list:

  1. librocksdb-sys/build.rs (16)
  2. librocksdb-sys/rocksdb_lib_sources.txt (6)
  3. src/backup.rs (4)
  4. src/checkpoint.rs (9)
  5. src/db.rs (339)
  6. src/db_iterator.rs (20)
  7. src/db_options.rs (4)
  8. src/lib.rs (71)
  9. src/perf.rs (4)
  10. src/snapshot.rs (4)
  11. src/transactions/mod.rs (24)
  12. src/transactions/optimistic_transaction_db.rs (294)
  13. src/transactions/options.rs (297)
  14. src/transactions/transaction.rs (881)
  15. src/transactions/transaction_db.rs (983)
  16. src/write_batch.rs (42)
  17. tests/fail/snapshot_outlive_transaction.rs (9)
  18. tests/fail/snapshot_outlive_transaction.stderr (10)
  19. tests/fail/snapshot_outlive_transaction_db.rs (8)
  20. tests/fail/snapshot_outlive_transaction_db.stderr (10)
  21. tests/fail/transaction_outlive_transaction_db.rs (8)
  22. tests/fail/transaction_outlive_transaction_db.stderr (10)
  23. tests/test_db.rs (6)
  24. tests/test_optimistic_transaction_db.rs (577)
  25. tests/test_transaction_db.rs (689)
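The new test files exercise the pessimistic transaction API introduced by this commit. As a rough usage sketch (the path, keys, and error handling are illustrative, not taken from this diff; the types are the ones re-exported further down in lib.rs):

    use rocksdb::{Options, TransactionDB, TransactionDBOptions};

    let mut opts = Options::default();
    opts.create_if_missing(true);
    let txn_db_opts = TransactionDBOptions::default();

    // Open a pessimistic transaction DB at an illustrative path.
    let db: TransactionDB = TransactionDB::open(&opts, &txn_db_opts, "/tmp/txn_db_example").unwrap();

    // Begin a transaction, stage writes, then commit atomically.
    let txn = db.transaction();
    txn.put(b"key1", b"value1").unwrap();
    assert_eq!(txn.get(b"key1").unwrap(), Some(b"value1".to_vec()));
    txn.commit().unwrap();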

@ -98,13 +98,8 @@ fn build_rocksdb() {
.trim()
.split('\n')
.map(str::trim)
.collect::<Vec<&'static str>>();
// We have a pregenerated a version of build_version.cc in the local directory
lib_sources = lib_sources
.iter()
.cloned()
.filter(|&file| file != "util/build_version.cc")
// We have a pre-generated a version of build_version.cc in the local directory
.filter(|file| !matches!(*file, "util/build_version.cc"))
.collect::<Vec<&'static str>>();
if target.contains("x86_64") {
@ -143,10 +138,6 @@ fn build_rocksdb() {
}
}
if target.contains("aarch64") {
lib_sources.push("util/crc32c_arm64.cc")
}
if target.contains("apple-ios") {
config.define("OS_MACOSX", None);
@ -248,8 +239,7 @@ fn build_rocksdb() {
}
for file in lib_sources {
let file = "rocksdb/".to_string() + file;
config.file(&file);
config.file(&format!("rocksdb/{file}"));
}
config.file("build_version.cc");

@ -145,12 +145,6 @@ options/options.cc
options/options_helper.cc
options/options_parser.cc
port/port_posix.cc
port/win/env_default.cc
port/win/env_win.cc
port/win/io_win.cc
port/win/port_win.cc
port/win/win_logger.cc
port/win/win_thread.cc
port/stack_trace.cc
table/adaptive/adaptive_table_factory.cc
table/block_based/binary_search_index_reader.cc

@ -13,7 +13,7 @@
// limitations under the License.
//
use crate::{ffi, ffi_util::to_cpath, Error, DB};
use crate::{db::DBInner, ffi, ffi_util::to_cpath, Error, DB};
use libc::{c_int, c_uchar};
use std::path::Path;
@ -82,7 +82,7 @@ impl BackupEngine {
unsafe {
ffi_try!(ffi::rocksdb_backup_engine_create_new_backup_flush(
self.inner,
db.inner,
db.inner.inner(),
c_uchar::from(flush_before_backup),
));
Ok(())

@ -17,9 +17,8 @@
//!
//! [1]: https://github.com/facebook/rocksdb/wiki/Checkpoints
use crate::{ffi, ffi_util::to_cpath, Error, DB};
use std::marker::PhantomData;
use std::path::Path;
use crate::{db::DBInner, ffi, ffi_util::to_cpath, DBCommon, Error, ThreadMode};
use std::{marker::PhantomData, path::Path};
/// Undocumented parameter for `ffi::rocksdb_checkpoint_create` function. Zero by default.
const LOG_SIZE_FOR_FLUSH: u64 = 0_u64;
@ -36,11 +35,11 @@ impl<'db> Checkpoint<'db> {
///
/// Does not actually produce checkpoints, call `.create_checkpoint()` method to produce
/// a DB checkpoint.
pub fn new(db: &'db DB) -> Result<Self, Error> {
pub fn new<T: ThreadMode, I: DBInner>(db: &'db DBCommon<T, I>) -> Result<Self, Error> {
let checkpoint: *mut ffi::rocksdb_checkpoint_t;
unsafe {
checkpoint = ffi_try!(ffi::rocksdb_checkpoint_object_create(db.inner));
checkpoint = ffi_try!(ffi::rocksdb_checkpoint_object_create(db.inner.inner()));
}
if checkpoint.is_null() {

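With `Checkpoint::new` made generic over `DBCommon<T, I>` in the hunk above, the same checkpoint code works for the plain `DB` as well as the new transaction DB types. A minimal sketch against a regular `DB` (paths are illustrative):

    use rocksdb::{checkpoint::Checkpoint, DB};

    let db = DB::open_default("/tmp/checkpoint_example_db").unwrap();
    // The Checkpoint object itself does not produce a checkpoint;
    // call create_checkpoint() with a target directory to write one.
    let cp = Checkpoint::new(&db).unwrap();
    cp.create_checkpoint("/tmp/checkpoint_example_out").unwrap();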
@ -67,7 +67,7 @@ pub trait ThreadMode {
///
/// See [`DB`] for more details, including performance implications for each mode
pub struct SingleThreaded {
cfs: BTreeMap<String, ColumnFamily>,
pub(crate) cfs: BTreeMap<String, ColumnFamily>,
}
/// Actual marker type for the marker trait `ThreadMode`, which holds
@ -76,7 +76,7 @@ pub struct SingleThreaded {
///
/// See [`DB`] for more details, including performance implications for each mode
pub struct MultiThreaded {
cfs: RwLock<BTreeMap<String, Arc<UnboundColumnFamily>>>,
pub(crate) cfs: RwLock<BTreeMap<String, Arc<UnboundColumnFamily>>>,
}
impl ThreadMode for SingleThreaded {
@ -116,22 +116,36 @@ impl ThreadMode for MultiThreaded {
}
}
/// A RocksDB database.
///
/// This is previously named [`DB`], which is a type alias now for compatibility.
/// Get underlying `rocksdb_t`.
pub trait DBInner {
fn inner(&self) -> *mut ffi::rocksdb_t;
}
/// A helper type to implement some common methods for [`DBWithThreadMode`]
/// and [`OptimisticTransactionDB`].
///
/// See crate level documentation for a simple usage example.
pub struct DBWithThreadMode<T: ThreadMode> {
pub(crate) inner: *mut ffi::rocksdb_t,
/// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
pub struct DBCommon<T: ThreadMode, D: DBInner> {
pub(crate) inner: D,
cfs: T, // Column families are held differently depending on thread mode
path: PathBuf,
_outlive: Vec<OptionsMustOutliveDB>,
}
/// Minimal set of DB-related methods, intended to be generic over
/// Minimal set of DB-related methods, intended to be generic over
/// `DBWithThreadMode<T>`. Mainly used internally
pub trait DBAccess {
fn inner(&self) -> *mut ffi::rocksdb_t;
unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t;
unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t);
unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t;
unsafe fn create_iterator_cf(
&self,
cf_handle: *mut ffi::rocksdb_column_family_handle_t,
readopts: &ReadOptions,
) -> *mut ffi::rocksdb_iterator_t;
fn get_opt<K: AsRef<[u8]>>(
&self,
@ -177,22 +191,27 @@ pub trait DBAccess {
K: AsRef<[u8]>,
I: IntoIterator<Item = (&'b W, K)>,
W: AsColumnFamilyRef + 'b;
}
impl<T: ThreadMode, D: DBInner> DBAccess for DBCommon<T, D> {
unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
ffi::rocksdb_create_snapshot(self.inner.inner())
}
fn batched_multi_get_cf_opt<K, I>(
unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
ffi::rocksdb_release_snapshot(self.inner.inner(), snapshot);
}
unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
ffi::rocksdb_create_iterator(self.inner.inner(), readopts.inner)
}
unsafe fn create_iterator_cf(
&self,
cf: &impl AsColumnFamilyRef,
keys: I,
sorted_input: bool,
cf_handle: *mut ffi::rocksdb_column_family_handle_t,
readopts: &ReadOptions,
) -> Vec<Result<Option<DBPinnableSlice>, Error>>
where
K: AsRef<[u8]>,
I: IntoIterator<Item = K>;
}
impl<T: ThreadMode> DBAccess for DBWithThreadMode<T> {
fn inner(&self) -> *mut ffi::rocksdb_t {
self.inner
) -> *mut ffi::rocksdb_iterator_t {
ffi::rocksdb_create_iterator_cf(self.inner.inner(), readopts.inner, cf_handle)
}
fn get_opt<K: AsRef<[u8]>>(
@ -229,53 +248,63 @@ impl<T: ThreadMode> DBAccess for DBWithThreadMode<T> {
self.get_pinned_cf_opt(cf, key, readopts)
}
fn multi_get_opt<K, I>(
fn multi_get_opt<K, Iter>(
&self,
keys: I,
keys: Iter,
readopts: &ReadOptions,
) -> Vec<Result<Option<Vec<u8>>, Error>>
where
K: AsRef<[u8]>,
I: IntoIterator<Item = K>,
Iter: IntoIterator<Item = K>,
{
self.multi_get_opt(keys, readopts)
}
fn multi_get_cf_opt<'b, K, I, W>(
fn multi_get_cf_opt<'b, K, Iter, W>(
&self,
keys_cf: I,
keys_cf: Iter,
readopts: &ReadOptions,
) -> Vec<Result<Option<Vec<u8>>, Error>>
where
K: AsRef<[u8]>,
I: IntoIterator<Item = (&'b W, K)>,
Iter: IntoIterator<Item = (&'b W, K)>,
W: AsColumnFamilyRef + 'b,
{
self.multi_get_cf_opt(keys_cf, readopts)
}
}
fn batched_multi_get_cf_opt<K, I>(
&self,
cf: &impl AsColumnFamilyRef,
keys: I,
sorted_input: bool,
readopts: &ReadOptions,
) -> Vec<Result<Option<DBPinnableSlice>, Error>>
where
K: AsRef<[u8]>,
I: IntoIterator<Item = K>,
{
self.batched_multi_get_cf_opt(cf, keys, sorted_input, readopts)
pub struct DBWithThreadModeInner {
inner: *mut ffi::rocksdb_t,
}
impl DBInner for DBWithThreadModeInner {
fn inner(&self) -> *mut ffi::rocksdb_t {
self.inner
}
}
impl Drop for DBWithThreadModeInner {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_close(self.inner);
}
}
}
/// A type alias to RocksDB database.
///
/// See crate level documentation for a simple usage example.
/// See [`DBCommon`] for full list of methods.
pub type DBWithThreadMode<T> = DBCommon<T, DBWithThreadModeInner>;
/// A type alias to DB instance type with the single-threaded column family
/// creations/deletions
///
/// # Compatibility and multi-threaded mode
///
/// Previously, [`DB`] was defined as a direct `struct`. Now, it's type-aliased for
/// compatibility. Use `DBWithThreadMode<MultiThreaded>` for multi-threaded
/// compatibility. Use `DBCommon<MultiThreaded>` for multi-threaded
/// column family alternations.
///
/// # Limited performance implication for single-threaded mode
@ -300,11 +329,11 @@ pub type DB = DBWithThreadMode<MultiThreaded>;
// Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI
// pointer. In most cases, however, this pointer is Send-safe because it is never aliased and
// rocksdb internally does not rely on thread-local information for its user-exposed types.
unsafe impl<T: ThreadMode + Send> Send for DBWithThreadMode<T> {}
unsafe impl<T: ThreadMode + Send, I: DBInner> Send for DBCommon<T, I> {}
// Sync is similarly safe for many types because they do not expose interior mutability, and their
// use within the rocksdb library is generally behind a const reference
unsafe impl<T: ThreadMode> Sync for DBWithThreadMode<T> {}
unsafe impl<T: ThreadMode, I: DBInner> Sync for DBCommon<T, I> {}
// Specifies whether open DB for read only.
enum AccessType<'a> {
@ -314,6 +343,7 @@ enum AccessType<'a> {
WithTTL { ttl: Duration },
}
/// Methods of `DBWithThreadMode`.
impl<T: ThreadMode> DBWithThreadMode<T> {
/// Opens a database with default options.
pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
@ -620,7 +650,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
}
Ok(Self {
inner: db,
inner: DBWithThreadModeInner { inner: db },
path: path.as_ref().to_path_buf(),
cfs: T::new_cf_map_internal(cf_map),
_outlive: outlive,
@ -720,6 +750,74 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
Ok(db)
}
/// Removes the database entries in the range `["from", "to")` using given write options.
pub fn delete_range_cf_opt<K: AsRef<[u8]>>(
&self,
cf: &impl AsColumnFamilyRef,
from: K,
to: K,
writeopts: &WriteOptions,
) -> Result<(), Error> {
let from = from.as_ref();
let to = to.as_ref();
unsafe {
ffi_try!(ffi::rocksdb_delete_range_cf(
self.inner.inner(),
writeopts.inner,
cf.inner(),
from.as_ptr() as *const c_char,
from.len() as size_t,
to.as_ptr() as *const c_char,
to.len() as size_t,
));
Ok(())
}
}
/// Removes the database entries in the range `["from", "to")` using default write options.
pub fn delete_range_cf<K: AsRef<[u8]>>(
&self,
cf: &impl AsColumnFamilyRef,
from: K,
to: K,
) -> Result<(), Error> {
self.delete_range_cf_opt(cf, from, to, &WriteOptions::default())
}
pub fn write_opt(&self, batch: WriteBatch, writeopts: &WriteOptions) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_write(
self.inner.inner(),
writeopts.inner,
batch.inner
));
}
Ok(())
}
pub fn write(&self, batch: WriteBatch) -> Result<(), Error> {
self.write_opt(batch, &WriteOptions::default())
}
pub fn write_without_wal(&self, batch: WriteBatch) -> Result<(), Error> {
let mut wo = WriteOptions::new();
wo.disable_wal(true);
self.write_opt(batch, &wo)
}
}
/// Common methods of `DBWithThreadMode` and `OptimisticTransactionDB`.
impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
pub(crate) fn new(inner: D, cfs: T, path: PathBuf, outlive: Vec<OptionsMustOutliveDB>) -> Self {
Self {
inner,
cfs,
path,
_outlive: outlive,
}
}
pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
let cpath = to_cpath(path)?;
let mut length = 0;
@ -764,7 +862,10 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// the data to disk.
pub fn flush_wal(&self, sync: bool) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_flush_wal(self.inner, c_uchar::from(sync)));
ffi_try!(ffi::rocksdb_flush_wal(
self.inner.inner(),
c_uchar::from(sync)
));
}
Ok(())
}
@ -772,7 +873,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// Flushes database memtables to SST files on the disk.
pub fn flush_opt(&self, flushopts: &FlushOptions) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_flush(self.inner, flushopts.inner));
ffi_try!(ffi::rocksdb_flush(self.inner.inner(), flushopts.inner));
}
Ok(())
}
@ -790,7 +891,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_flush_cf(
self.inner,
self.inner.inner(),
flushopts.inner,
cf.inner()
));
@ -804,23 +905,6 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
self.flush_cf_opt(cf, &FlushOptions::default())
}
pub fn write_opt(&self, batch: WriteBatch, writeopts: &WriteOptions) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_write(self.inner, writeopts.inner, batch.inner));
}
Ok(())
}
pub fn write(&self, batch: WriteBatch) -> Result<(), Error> {
self.write_opt(batch, &WriteOptions::default())
}
pub fn write_without_wal(&self, batch: WriteBatch) -> Result<(), Error> {
let mut wo = WriteOptions::new();
wo.disable_wal(true);
self.write_opt(batch, &wo)
}
/// Return the bytes associated with a key value with read options. If you only intend to use
/// the vector returned temporarily, consider using [`get_pinned_opt`](#method.get_pinned_opt)
/// to avoid unnecessary memory copy.
@ -882,7 +966,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let key = key.as_ref();
unsafe {
let val = ffi_try!(ffi::rocksdb_get_pinned(
self.inner,
self.inner.inner(),
readopts.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
@ -922,7 +1006,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let key = key.as_ref();
unsafe {
let val = ffi_try!(ffi::rocksdb_get_pinned_cf(
self.inner,
self.inner.inner(),
readopts.inner,
cf.inner(),
key.as_ptr() as *const c_char,
@ -977,7 +1061,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let mut errors = vec![ptr::null_mut(); keys.len()];
unsafe {
ffi::rocksdb_multi_get(
self.inner,
self.inner.inner(),
readopts.inner,
ptr_keys.len(),
ptr_keys.as_ptr(),
@ -1033,7 +1117,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
unsafe {
ffi::rocksdb_multi_get_cf(
self.inner,
self.inner.inner(),
readopts.inner,
ptr_cfs.as_ptr(),
ptr_keys.len(),
@ -1089,7 +1173,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
unsafe {
ffi::rocksdb_batched_multi_get_cf(
self.inner,
self.inner.inner(),
readopts.inner,
cf.inner(),
ptr_keys.len(),
@ -1129,7 +1213,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let key = key.as_ref();
unsafe {
0 != ffi::rocksdb_key_may_exist(
self.inner,
self.inner.inner(),
readopts.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
@ -1159,7 +1243,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let key = key.as_ref();
0 != unsafe {
ffi::rocksdb_key_may_exist_cf(
self.inner,
self.inner.inner(),
readopts.inner,
cf.inner(),
key.as_ptr() as *const c_char,
@ -1185,7 +1269,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
})?;
Ok(unsafe {
ffi_try!(ffi::rocksdb_create_column_family(
self.inner,
self.inner.inner(),
opts.inner,
cf_name.as_ptr(),
))
@ -1324,7 +1408,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
unsafe {
ffi_try!(ffi::rocksdb_put(
self.inner,
self.inner.inner(),
writeopts.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
@ -1351,7 +1435,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
unsafe {
ffi_try!(ffi::rocksdb_put_cf(
self.inner,
self.inner.inner(),
writeopts.inner,
cf.inner(),
key.as_ptr() as *const c_char,
@ -1373,7 +1457,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
unsafe {
ffi_try!(ffi::rocksdb_merge(
self.inner,
self.inner.inner(),
writeopts.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
@ -1400,7 +1484,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
unsafe {
ffi_try!(ffi::rocksdb_merge_cf(
self.inner,
self.inner.inner(),
writeopts.inner,
cf.inner(),
key.as_ptr() as *const c_char,
@ -1421,7 +1505,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
unsafe {
ffi_try!(ffi::rocksdb_delete(
self.inner,
self.inner.inner(),
writeopts.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
@ -1440,7 +1524,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
unsafe {
ffi_try!(ffi::rocksdb_delete_cf(
self.inner,
self.inner.inner(),
writeopts.inner,
cf.inner(),
key.as_ptr() as *const c_char,
@ -1450,31 +1534,6 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
}
}
/// Removes the database entries in the range `["from", "to")` using given write options.
pub fn delete_range_cf_opt<K: AsRef<[u8]>>(
&self,
cf: &impl AsColumnFamilyRef,
from: K,
to: K,
writeopts: &WriteOptions,
) -> Result<(), Error> {
let from = from.as_ref();
let to = to.as_ref();
unsafe {
ffi_try!(ffi::rocksdb_delete_range_cf(
self.inner,
writeopts.inner,
cf.inner(),
from.as_ptr() as *const c_char,
from.len() as size_t,
to.as_ptr() as *const c_char,
to.len() as size_t,
));
Ok(())
}
}
pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
where
K: AsRef<[u8]>,
@ -1519,16 +1578,6 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
self.delete_cf_opt(cf, key.as_ref(), &WriteOptions::default())
}
/// Removes the database entries in the range `["from", "to")` using default write options.
pub fn delete_range_cf<K: AsRef<[u8]>>(
&self,
cf: &impl AsColumnFamilyRef,
from: K,
to: K,
) -> Result<(), Error> {
self.delete_range_cf_opt(cf, from, to, &WriteOptions::default())
}
/// Runs a manual compaction on the Range of keys given. This is not likely to be needed for typical usage.
pub fn compact_range<S: AsRef<[u8]>, E: AsRef<[u8]>>(&self, start: Option<S>, end: Option<E>) {
unsafe {
@ -1536,7 +1585,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let end = end.as_ref().map(AsRef::as_ref);
ffi::rocksdb_compact_range(
self.inner,
self.inner.inner(),
opt_bytes_to_ptr(start),
start.map_or(0, <[u8]>::len) as size_t,
opt_bytes_to_ptr(end),
@ -1557,7 +1606,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let end = end.as_ref().map(AsRef::as_ref);
ffi::rocksdb_compact_range_opt(
self.inner,
self.inner.inner(),
opts.inner,
opt_bytes_to_ptr(start),
start.map_or(0, <[u8]>::len) as size_t,
@ -1580,7 +1629,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let end = end.as_ref().map(AsRef::as_ref);
ffi::rocksdb_compact_range_cf(
self.inner,
self.inner.inner(),
cf.inner(),
opt_bytes_to_ptr(start),
start.map_or(0, <[u8]>::len) as size_t,
@ -1603,7 +1652,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let end = end.as_ref().map(AsRef::as_ref);
ffi::rocksdb_compact_range_cf_opt(
self.inner,
self.inner.inner(),
cf.inner(),
opts.inner,
opt_bytes_to_ptr(start),
@ -1621,7 +1670,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let count = opts.len() as i32;
unsafe {
ffi_try!(ffi::rocksdb_set_options(
self.inner,
self.inner.inner(),
count,
cnames.as_ptr(),
cvalues.as_ptr(),
@ -1641,7 +1690,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let count = opts.len() as i32;
unsafe {
ffi_try!(ffi::rocksdb_set_options_cf(
self.inner,
self.inner.inner(),
cf.inner(),
count,
cnames.as_ptr(),
@ -1696,7 +1745,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
pub fn property_value(&self, name: impl CStrLike) -> Result<Option<String>, Error> {
Self::property_value_impl(
name,
|prop_name| unsafe { ffi::rocksdb_property_value(self.inner, prop_name) },
|prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
|str_value| Ok(str_value.to_owned()),
)
}
@ -1713,7 +1762,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
Self::property_value_impl(
name,
|prop_name| unsafe {
ffi::rocksdb_property_value_cf(self.inner, cf.inner(), prop_name)
ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
},
|str_value| Ok(str_value.to_owned()),
)
@ -1735,7 +1784,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
pub fn property_int_value(&self, name: impl CStrLike) -> Result<Option<u64>, Error> {
Self::property_value_impl(
name,
|prop_name| unsafe { ffi::rocksdb_property_value(self.inner, prop_name) },
|prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
Self::parse_property_int_value,
)
}
@ -1752,7 +1801,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
Self::property_value_impl(
name,
|prop_name| unsafe {
ffi::rocksdb_property_value_cf(self.inner, cf.inner(), prop_name)
ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
},
Self::parse_property_int_value,
)
@ -1760,7 +1809,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// The sequence number of the most recent transaction.
pub fn latest_sequence_number(&self) -> u64 {
unsafe { ffi::rocksdb_get_latest_sequence_number(self.inner) }
unsafe { ffi::rocksdb_get_latest_sequence_number(self.inner.inner()) }
}
/// Iterate over batches of write operations since a given sequence.
@ -1779,7 +1828,11 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
// for creating and destroying it; fortunately we can pass a nullptr
// here to get the default behavior
let opts: *const ffi::rocksdb_wal_readoptions_t = ptr::null();
let iter = ffi_try!(ffi::rocksdb_get_updates_since(self.inner, seq_number, opts));
let iter = ffi_try!(ffi::rocksdb_get_updates_since(
self.inner.inner(),
seq_number,
opts
));
Ok(DBWALIterator { inner: iter })
}
}
@ -1788,7 +1841,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// log files.
pub fn try_catch_up_with_primary(&self) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_try_catch_up_with_primary(self.inner));
ffi_try!(ffi::rocksdb_try_catch_up_with_primary(self.inner.inner()));
}
Ok(())
}
@ -1851,7 +1904,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_ingest_external_file(
self.inner,
self.inner.inner(),
cpaths.as_ptr(),
paths_v.len(),
opts.inner as *const _
@ -1869,7 +1922,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_ingest_external_file_cf(
self.inner,
self.inner.inner(),
cf.inner(),
cpaths.as_ptr(),
paths_v.len(),
@ -1883,7 +1936,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// and end key
pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
unsafe {
let files = ffi::rocksdb_livefiles(self.inner);
let files = ffi::rocksdb_livefiles(self.inner.inner());
if files.is_null() {
Err(Error::new("Could not get live files".to_owned()))
} else {
@ -1941,7 +1994,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let to = to.as_ref();
unsafe {
ffi_try!(ffi::rocksdb_delete_file_in_range(
self.inner,
self.inner.inner(),
from.as_ptr() as *const c_char,
from.len() as size_t,
to.as_ptr() as *const c_char,
@ -1962,7 +2015,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let to = to.as_ref();
unsafe {
ffi_try!(ffi::rocksdb_delete_file_in_range_cf(
self.inner,
self.inner.inner(),
cf.inner(),
from.as_ptr() as *const c_char,
from.len() as size_t,
@ -1976,7 +2029,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// Request stopping background work, if wait is true wait until it's done.
pub fn cancel_all_background_work(&self, wait: bool) {
unsafe {
ffi::rocksdb_cancel_all_background_work(self.inner, c_uchar::from(wait));
ffi::rocksdb_cancel_all_background_work(self.inner.inner(), c_uchar::from(wait));
}
}
@ -1987,7 +2040,10 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
) -> Result<(), Error> {
unsafe {
// first mark the column family as dropped
ffi_try!(ffi::rocksdb_drop_column_family(self.inner, cf_inner));
ffi_try!(ffi::rocksdb_drop_column_family(
self.inner.inner(),
cf_inner
));
}
// then finally reclaim any resources (mem, files) by destroying the only single column
// family handle by drop()-ing it
@ -1996,7 +2052,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
}
}
impl DBWithThreadMode<SingleThreaded> {
impl<I: DBInner> DBCommon<SingleThreaded, I> {
/// Creates column family with given name and options
pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
@ -2021,7 +2077,7 @@ impl DBWithThreadMode<SingleThreaded> {
}
}
impl DBWithThreadMode<MultiThreaded> {
impl<I: DBInner> DBCommon<MultiThreaded, I> {
/// Creates column family with given name and options
pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
@ -2054,16 +2110,13 @@ impl DBWithThreadMode<MultiThreaded> {
}
}
impl<T: ThreadMode> Drop for DBWithThreadMode<T> {
impl<T: ThreadMode, I: DBInner> Drop for DBCommon<T, I> {
fn drop(&mut self) {
unsafe {
self.cfs.drop_all_cfs_internal();
ffi::rocksdb_close(self.inner);
}
self.cfs.drop_all_cfs_internal();
}
}
impl<T: ThreadMode> fmt::Debug for DBWithThreadMode<T> {
impl<T: ThreadMode, I: DBInner> fmt::Debug for DBCommon<T, I> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "RocksDB {{ path: {:?} }}", self.path())
}
@ -2106,7 +2159,7 @@ fn convert_options(opts: &[(&str, &str)]) -> Result<Vec<(CString, CString)>, Err
.collect()
}
fn convert_values(
pub(crate) fn convert_values(
values: Vec<*mut c_char>,
values_sizes: Vec<usize>,
errors: Vec<*mut c_char>,

@ -12,11 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::db::{DBAccess, DB};
use crate::{ffi, Error, ReadOptions, WriteBatch};
use crate::{
db::{DBAccess, DB},
ffi, Error, ReadOptions, WriteBatch,
};
use libc::{c_char, c_uchar, size_t};
use std::marker::PhantomData;
use std::slice;
use std::{marker::PhantomData, slice};
/// A type alias to keep compatibility. See [`DBRawIteratorWithThreadMode`] for details
pub type DBRawIterator<'a> = DBRawIteratorWithThreadMode<'a, DB>;
@ -87,7 +88,7 @@ pub struct DBRawIteratorWithThreadMode<'a, D: DBAccess> {
impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> {
pub(crate) fn new(db: &D, readopts: ReadOptions) -> Self {
let inner = unsafe { ffi::rocksdb_create_iterator(db.inner(), readopts.inner) };
let inner = unsafe { db.create_iterator(&readopts) };
Self::from_inner(inner, readopts)
}
@ -96,16 +97,15 @@ impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> {
cf_handle: *mut ffi::rocksdb_column_family_handle_t,
readopts: ReadOptions,
) -> Self {
let inner =
unsafe { ffi::rocksdb_create_iterator_cf(db.inner(), readopts.inner, cf_handle) };
let inner = unsafe { db.create_iterator_cf(cf_handle, &readopts) };
Self::from_inner(inner, readopts)
}
fn from_inner(inner: *mut ffi::rocksdb_iterator_t, readopts: ReadOptions) -> Self {
// This unwrap will never fail since rocksdb_create_iterator and
// rocksdb_create_iterator_cf functions always return non-null. They
// use new and deference the result so any nulls would end up in SIGSEGV
// there and we have bigger issue.
// rocksdb_create_iterator_cf functions always return non-null. They
// use new and deference the result so any nulls would end up with SIGSEGV
// there and we would have a bigger issue.
let inner = std::ptr::NonNull::new(inner).unwrap();
Self {
inner,

@ -576,9 +576,9 @@ impl BlockBasedOptions {
pub fn set_bloom_filter(&mut self, bits_per_key: c_double, block_based: bool) {
unsafe {
let bloom = if block_based {
ffi::rocksdb_filterpolicy_create_bloom(bits_per_key)
ffi::rocksdb_filterpolicy_create_bloom(bits_per_key as _)
} else {
ffi::rocksdb_filterpolicy_create_bloom_full(bits_per_key)
ffi::rocksdb_filterpolicy_create_bloom_full(bits_per_key as _)
};
ffi::rocksdb_block_based_options_set_filter_policy(self.inner, bloom);

@ -92,6 +92,7 @@ pub mod properties;
mod slice_transform;
mod snapshot;
mod sst_file_writer;
mod transactions;
mod write_batch;
pub use crate::{
@ -100,7 +101,10 @@ pub use crate::{
ColumnFamilyRef, DEFAULT_COLUMN_FAMILY_NAME,
},
compaction_filter::Decision as CompactionDecision,
db::{DBAccess, DBWithThreadMode, LiveFile, MultiThreaded, SingleThreaded, ThreadMode, DB},
db::{
DBAccess, DBCommon, DBWithThreadMode, LiveFile, MultiThreaded, SingleThreaded, ThreadMode,
DB,
},
db_iterator::{
DBIterator, DBIteratorWithThreadMode, DBRawIterator, DBRawIteratorWithThreadMode,
DBWALIterator, Direction, IteratorMode,
@ -120,7 +124,11 @@ pub use crate::{
slice_transform::SliceTransform,
snapshot::{Snapshot, SnapshotWithThreadMode},
sst_file_writer::SstFileWriter,
write_batch::{WriteBatch, WriteBatchIterator},
transactions::{
OptimisticTransactionDB, OptimisticTransactionOptions, Transaction, TransactionDB,
TransactionDBOptions, TransactionOptions,
},
write_batch::{WriteBatch, WriteBatchIterator, WriteBatchWithTransaction},
};
use librocksdb_sys as ffi;
@ -128,6 +136,27 @@ use librocksdb_sys as ffi;
use std::error;
use std::fmt;
/// RocksDB error kind.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ErrorKind {
NotFound,
Corruption,
NotSupported,
InvalidArgument,
IOError,
MergeInProgress,
Incomplete,
ShutdownInProgress,
TimedOut,
Aborted,
Busy,
Expired,
TryAgain,
CompactionTooLarge,
ColumnFamilyDropped,
Unknown,
}
/// A simple wrapper round a string, used for errors reported from
/// ffi calls.
#[derive(Debug, Clone, PartialEq)]
@ -143,6 +172,28 @@ impl Error {
pub fn into_string(self) -> String {
self.into()
}
/// Parse corresponding [`ErrorKind`] from error message.
pub fn kind(&self) -> ErrorKind {
match self.message.split(':').next().unwrap_or("") {
"NotFound" => ErrorKind::NotFound,
"Corruption" => ErrorKind::Corruption,
"Not implemented" => ErrorKind::NotSupported,
"Invalid argument" => ErrorKind::InvalidArgument,
"IO error" => ErrorKind::IOError,
"Merge in progress" => ErrorKind::MergeInProgress,
"Result incomplete" => ErrorKind::Incomplete,
"Shutdown in progress" => ErrorKind::ShutdownInProgress,
"Operation timed out" => ErrorKind::TimedOut,
"Operation aborted" => ErrorKind::Aborted,
"Resource busy" => ErrorKind::Busy,
"Operation expired" => ErrorKind::Expired,
"Operation failed. Try again." => ErrorKind::TryAgain,
"Compaction too large" => ErrorKind::CompactionTooLarge,
"Column family dropped" => ErrorKind::ColumnFamilyDropped,
_ => ErrorKind::Unknown,
}
}
}
impl AsRef<str> for Error {
@ -171,6 +222,11 @@ impl fmt::Display for Error {
#[cfg(test)]
mod test {
use crate::{
OptimisticTransactionDB, OptimisticTransactionOptions, Transaction, TransactionDB,
TransactionDBOptions, TransactionOptions,
};
use super::{
column_family::UnboundColumnFamily,
db_options::{CacheWrapper, EnvWrapper},
@ -209,6 +265,12 @@ mod test {
is_send::<CacheWrapper>();
is_send::<Env>();
is_send::<EnvWrapper>();
is_send::<TransactionDB>();
is_send::<OptimisticTransactionDB>();
is_send::<Transaction<'_, TransactionDB>>();
is_send::<TransactionDBOptions>();
is_send::<OptimisticTransactionOptions>();
is_send::<TransactionOptions>();
}
#[test]
@ -234,5 +296,10 @@ mod test {
is_sync::<CacheWrapper>();
is_sync::<Env>();
is_sync::<EnvWrapper>();
is_sync::<TransactionDB>();
is_sync::<OptimisticTransactionDB>();
is_sync::<TransactionDBOptions>();
is_sync::<OptimisticTransactionOptions>();
is_sync::<TransactionOptions>();
}
}

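Since `Error::kind()` is derived by matching on the prefix of the status message, callers can branch on the new `ErrorKind` instead of comparing strings. This is mainly useful for reacting to lock contention on pessimistic transactions, which RocksDB reports as timed-out or busy statuses. A hedged sketch (the keys and the rollback-and-report policy are illustrative):

    use rocksdb::{ErrorKind, TransactionDB};

    fn write_balance(db: &TransactionDB) -> Result<(), rocksdb::Error> {
        let txn = db.transaction();
        match txn.put(b"balance", b"42") {
            Ok(()) => txn.commit(),
            // Lock contention surfaces as TimedOut/Busy; roll back and let the
            // caller retry instead of string-matching the error message.
            Err(e) if matches!(e.kind(), ErrorKind::TimedOut | ErrorKind::Busy) => {
                txn.rollback()?;
                Err(e)
            }
            Err(e) => Err(e),
        }
    }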
@ -14,7 +14,7 @@
use libc::{c_int, c_uchar, c_void};
use crate::{ffi, ffi_util::from_cstr, Cache, Error, DB};
use crate::{db::DBInner, ffi, ffi_util::from_cstr, Cache, Error, DB};
#[derive(Debug, Copy, Clone, PartialEq)]
#[repr(i32)]
@ -242,7 +242,7 @@ impl MemoryUsageBuilder {
/// Add a DB instance to collect memory usage from it and add up in total stats
fn add_db(&mut self, db: &DB) {
unsafe {
ffi::rocksdb_memory_consumers_add_db(self.inner, db.inner);
ffi::rocksdb_memory_consumers_add_db(self.inner, db.inner.inner());
}
}

@ -44,7 +44,7 @@ pub struct SnapshotWithThreadMode<'a, D: DBAccess> {
impl<'a, D: DBAccess> SnapshotWithThreadMode<'a, D> {
/// Creates a new `SnapshotWithThreadMode` of the database `db`.
pub fn new(db: &'a D) -> Self {
let snapshot = unsafe { ffi::rocksdb_create_snapshot(db.inner()) };
let snapshot = unsafe { db.create_snapshot() };
Self {
db,
inner: snapshot,
@ -258,7 +258,7 @@ impl<'a, D: DBAccess> SnapshotWithThreadMode<'a, D> {
impl<'a, D: DBAccess> Drop for SnapshotWithThreadMode<'a, D> {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_release_snapshot(self.db.inner(), self.inner);
self.db.release_snapshot(self.inner);
}
}
}

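Because `SnapshotWithThreadMode` now creates and releases snapshots through the `DBAccess` trait rather than a raw `rocksdb_t` pointer, any database type implementing `DBAccess` can hand out snapshots the same way. A small sketch with a regular `DB` (path illustrative); the transaction DB types added in this commit are expected to plug into the same mechanism:

    use rocksdb::DB;

    let db = DB::open_default("/tmp/snapshot_example_db").unwrap();
    db.put(b"k", b"v1").unwrap();

    // The snapshot pins a consistent view; later writes are not visible through it.
    let snap = db.snapshot();
    db.put(b"k", b"v2").unwrap();
    assert_eq!(snap.get(b"k").unwrap(), Some(b"v1".to_vec()));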
@ -0,0 +1,24 @@
// Copyright 2021 Yiyuan Liu
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
mod optimistic_transaction_db;
mod options;
mod transaction;
mod transaction_db;
pub use optimistic_transaction_db::OptimisticTransactionDB;
pub use options::{OptimisticTransactionOptions, TransactionDBOptions, TransactionOptions};
pub use transaction::Transaction;
pub use transaction_db::TransactionDB;

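The re-exports above make the optimistic variant available alongside the pessimistic one. A hedged usage sketch (path illustrative); unlike `TransactionDB`, no locks are taken up front, so a write conflict only shows up when `commit()` is called:

    use rocksdb::OptimisticTransactionDB;

    let db: OptimisticTransactionDB =
        OptimisticTransactionDB::open_default("/tmp/optimistic_txn_example").unwrap();

    let txn = db.transaction();
    txn.put(b"key1", b"value1").unwrap();
    // Conflicting writes from other transactions are detected here, at commit time.
    txn.commit().unwrap();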
@ -0,0 +1,294 @@
// Copyright 2021 Yiyuan Liu
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
use std::{collections::BTreeMap, ffi::CString, fs, iter, marker::PhantomData, path::Path, ptr};
use libc::{c_char, c_int};
use crate::{
db::DBCommon, db::DBInner, ffi, ffi_util::to_cpath, write_batch::WriteBatchWithTransaction,
ColumnFamilyDescriptor, Error, OptimisticTransactionOptions, Options, ThreadMode, Transaction,
WriteOptions, DEFAULT_COLUMN_FAMILY_NAME,
};
/// A type alias to RocksDB Optimistic Transaction DB.
///
/// Please read the official
/// [guide](https://github.com/facebook/rocksdb/wiki/Transactions#optimistictransactiondb)
/// to learn more about RocksDB OptimisticTransactionDB.
///
/// The default thread mode for [`OptimisticTransactionDB`] is [`SingleThreaded`]
/// if feature `multi-threaded-cf` is not enabled.
///
/// See [`DBCommon`] for full list of methods.
///
/// # Examples