Make `CacheWrapper` and `EnvWrapper` Send and Sync (#586)

master
Oleksandr Anyshchenko 3 years ago committed by GitHub
parent 92b27f5d33
commit 04a0dca01d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/column_family.rs
  2. 2
      src/db.rs
  3. 10
      src/db_options.rs
  4. 4
      src/ffi_util.rs
  5. 8
      src/lib.rs
  6. 21
      tests/test_column_family.rs
  7. 6
      tests/test_property.rs

@ -141,4 +141,6 @@ impl<'a> AsColumnFamilyRef for Arc<BoundColumnFamily<'a>> {
} }
unsafe impl Send for ColumnFamily {} unsafe impl Send for ColumnFamily {}
unsafe impl Send for UnboundColumnFamily {}
unsafe impl Sync for UnboundColumnFamily {}
unsafe impl<'a> Send for BoundColumnFamily<'a> {} unsafe impl<'a> Send for BoundColumnFamily<'a> {}

@ -245,7 +245,7 @@ pub type DB = DBWithThreadMode<MultiThreaded>;
// Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI // Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI
// pointer. In most cases, however, this pointer is Send-safe because it is never aliased and // pointer. In most cases, however, this pointer is Send-safe because it is never aliased and
// rocksdb internally does not rely on thread-local information for its user-exposed types. // rocksdb internally does not rely on thread-local information for its user-exposed types.
unsafe impl<T: ThreadMode> Send for DBWithThreadMode<T> {} unsafe impl<T: ThreadMode + Send> Send for DBWithThreadMode<T> {}
// Sync is similarly safe for many types because they do not expose interior mutability, and their // Sync is similarly safe for many types because they do not expose interior mutability, and their
// use within the rocksdb library is generally behind a const reference // use within the rocksdb library is generally behind a const reference

@ -92,7 +92,7 @@ impl Cache {
#[derive(Clone)] #[derive(Clone)]
pub struct Env(Arc<EnvWrapper>); pub struct Env(Arc<EnvWrapper>);
struct EnvWrapper { pub(crate) struct EnvWrapper {
inner: *mut ffi::rocksdb_env_t, inner: *mut ffi::rocksdb_env_t,
} }
@ -381,8 +381,8 @@ unsafe impl Send for BlockBasedOptions {}
unsafe impl Send for CuckooTableOptions {} unsafe impl Send for CuckooTableOptions {}
unsafe impl Send for ReadOptions {} unsafe impl Send for ReadOptions {}
unsafe impl Send for IngestExternalFileOptions {} unsafe impl Send for IngestExternalFileOptions {}
unsafe impl Send for Cache {} unsafe impl Send for CacheWrapper {}
unsafe impl Send for Env {} unsafe impl Send for EnvWrapper {}
// Sync is similarly safe for many types because they do not expose interior mutability, and their // Sync is similarly safe for many types because they do not expose interior mutability, and their
// use within the rocksdb library is generally behind a const reference // use within the rocksdb library is generally behind a const reference
@ -392,8 +392,8 @@ unsafe impl Sync for BlockBasedOptions {}
unsafe impl Sync for CuckooTableOptions {} unsafe impl Sync for CuckooTableOptions {}
unsafe impl Sync for ReadOptions {} unsafe impl Sync for ReadOptions {}
unsafe impl Sync for IngestExternalFileOptions {} unsafe impl Sync for IngestExternalFileOptions {}
unsafe impl Sync for Cache {} unsafe impl Sync for CacheWrapper {}
unsafe impl Sync for Env {} unsafe impl Sync for EnvWrapper {}
impl Drop for Options { impl Drop for Options {
fn drop(&mut self) { fn drop(&mut self) {

@ -28,9 +28,9 @@ pub(crate) unsafe fn raw_data(ptr: *const c_char, size: usize) -> Option<Vec<u8>
if ptr.is_null() { if ptr.is_null() {
None None
} else { } else {
let mut dst = Vec::with_capacity(size); let mut dst = vec![0; size];
ptr::copy_nonoverlapping(ptr as *const u8, dst.as_mut_ptr(), size); ptr::copy_nonoverlapping(ptr as *const u8, dst.as_mut_ptr(), size);
dst.set_len(size);
Some(dst) Some(dst)
} }
} }

@ -168,6 +168,8 @@ impl fmt::Display for Error {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::{ use super::{
column_family::UnboundColumnFamily,
db_options::{CacheWrapper, EnvWrapper},
BlockBasedOptions, BoundColumnFamily, Cache, ColumnFamily, ColumnFamilyDescriptor, BlockBasedOptions, BoundColumnFamily, Cache, ColumnFamily, ColumnFamilyDescriptor,
DBIterator, DBRawIterator, Env, IngestExternalFileOptions, Options, DBIterator, DBRawIterator, Env, IngestExternalFileOptions, Options,
PlainTableFactoryOptions, ReadOptions, Snapshot, SstFileWriter, WriteBatch, WriteOptions, PlainTableFactoryOptions, ReadOptions, Snapshot, SstFileWriter, WriteBatch, WriteOptions,
@ -196,10 +198,13 @@ mod test {
is_send::<ColumnFamilyDescriptor>(); is_send::<ColumnFamilyDescriptor>();
is_send::<ColumnFamily>(); is_send::<ColumnFamily>();
is_send::<BoundColumnFamily<'_>>(); is_send::<BoundColumnFamily<'_>>();
is_send::<UnboundColumnFamily>();
is_send::<SstFileWriter>(); is_send::<SstFileWriter>();
is_send::<WriteBatch>(); is_send::<WriteBatch>();
is_send::<Cache>(); is_send::<Cache>();
is_send::<CacheWrapper>();
is_send::<Env>(); is_send::<Env>();
is_send::<EnvWrapper>();
} }
#[test] #[test]
@ -218,9 +223,12 @@ mod test {
is_sync::<IngestExternalFileOptions>(); is_sync::<IngestExternalFileOptions>();
is_sync::<BlockBasedOptions>(); is_sync::<BlockBasedOptions>();
is_sync::<PlainTableFactoryOptions>(); is_sync::<PlainTableFactoryOptions>();
is_sync::<UnboundColumnFamily>();
is_sync::<ColumnFamilyDescriptor>(); is_sync::<ColumnFamilyDescriptor>();
is_sync::<SstFileWriter>(); is_sync::<SstFileWriter>();
is_sync::<Cache>(); is_sync::<Cache>();
is_sync::<CacheWrapper>();
is_sync::<Env>(); is_sync::<Env>();
is_sync::<EnvWrapper>();
} }
} }

@ -47,6 +47,9 @@ fn test_column_family() {
let mut opts = Options::default(); let mut opts = Options::default();
opts.create_if_missing(true); opts.create_if_missing(true);
opts.set_merge_operator_associative("test operator", test_provided_merge); opts.set_merge_operator_associative("test operator", test_provided_merge);
#[cfg(feature = "multi-threaded-cf")]
let db = DB::open(&opts, &n).unwrap();
#[cfg(not(feature = "multi-threaded-cf"))]
let mut db = DB::open(&opts, &n).unwrap(); let mut db = DB::open(&opts, &n).unwrap();
let opts = Options::default(); let opts = Options::default();
match db.create_cf("cf1", &opts) { match db.create_cf("cf1", &opts) {
@ -97,7 +100,11 @@ fn test_column_family() {
{} {}
// should b able to drop a cf // should b able to drop a cf
{ {
#[cfg(feature = "multi-threaded-cf")]
let db = DB::open_cf(&Options::default(), &n, &["cf1"]).unwrap();
#[cfg(not(feature = "multi-threaded-cf"))]
let mut db = DB::open_cf(&Options::default(), &n, &["cf1"]).unwrap(); let mut db = DB::open_cf(&Options::default(), &n, &["cf1"]).unwrap();
match db.drop_cf("cf1") { match db.drop_cf("cf1") {
Ok(_) => println!("cf1 successfully dropped."), Ok(_) => println!("cf1 successfully dropped."),
Err(e) => panic!("failed to drop column family: {}", e), Err(e) => panic!("failed to drop column family: {}", e),
@ -114,6 +121,9 @@ fn test_can_open_db_with_results_of_list_cf() {
{ {
let mut opts = Options::default(); let mut opts = Options::default();
opts.create_if_missing(true); opts.create_if_missing(true);
#[cfg(feature = "multi-threaded-cf")]
let db = DB::open(&opts, &n).unwrap();
#[cfg(not(feature = "multi-threaded-cf"))]
let mut db = DB::open(&opts, &n).unwrap(); let mut db = DB::open(&opts, &n).unwrap();
let opts = Options::default(); let opts = Options::default();
@ -261,10 +271,10 @@ fn test_create_duplicate_column_family() {
opts.create_if_missing(true); opts.create_if_missing(true);
opts.create_missing_column_families(true); opts.create_missing_column_families(true);
let mut db = match DB::open_cf(&opts, &n, &["cf1"]) { #[cfg(feature = "multi-threaded-cf")]
Ok(d) => d, let db = DB::open_cf(&opts, &n, &["cf1"]).unwrap();
Err(e) => panic!("failed to create new column family: {}", e), #[cfg(not(feature = "multi-threaded-cf"))]
}; let mut db = DB::open_cf(&opts, &n, &["cf1"]).unwrap();
assert!(db.create_cf("cf1", &opts).is_err()); assert!(db.create_cf("cf1", &opts).is_err());
} }
@ -282,6 +292,9 @@ fn test_no_leaked_column_family() {
write_options.set_sync(false); write_options.set_sync(false);
write_options.disable_wal(true); write_options.disable_wal(true);
#[cfg(feature = "multi-threaded-cf")]
let db = DB::open(&opts, &n).unwrap();
#[cfg(not(feature = "multi-threaded-cf"))]
let mut db = DB::open(&opts, &n).unwrap(); let mut db = DB::open(&opts, &n).unwrap();
let large_blob = [0x20; 1024 * 1024]; let large_blob = [0x20; 1024 * 1024];

@ -35,6 +35,9 @@ fn property_cf_test() {
let n = DBPath::new("_rust_rocksdb_property_cf_test"); let n = DBPath::new("_rust_rocksdb_property_cf_test");
{ {
let opts = Options::default(); let opts = Options::default();
#[cfg(feature = "multi-threaded-cf")]
let db = DB::open_default(&n).unwrap();
#[cfg(not(feature = "multi-threaded-cf"))]
let mut db = DB::open_default(&n).unwrap(); let mut db = DB::open_default(&n).unwrap();
db.create_cf("cf1", &opts).unwrap(); db.create_cf("cf1", &opts).unwrap();
let cf = db.cf_handle("cf1").unwrap(); let cf = db.cf_handle("cf1").unwrap();
@ -62,6 +65,9 @@ fn property_int_cf_test() {
let n = DBPath::new("_rust_rocksdb_property_int_cf_test"); let n = DBPath::new("_rust_rocksdb_property_int_cf_test");
{ {
let opts = Options::default(); let opts = Options::default();
#[cfg(feature = "multi-threaded-cf")]
let db = DB::open_default(&n).unwrap();
#[cfg(not(feature = "multi-threaded-cf"))]
let mut db = DB::open_default(&n).unwrap(); let mut db = DB::open_default(&n).unwrap();
db.create_cf("cf1", &opts).unwrap(); db.create_cf("cf1", &opts).unwrap();
let cf = db.cf_handle("cf1").unwrap(); let cf = db.cf_handle("cf1").unwrap();

Loading…
Cancel
Save