Add drop_cf function to TransactionDB (#721)

master
Abhay Bothra 2 years ago committed by GitHub
parent b5b9e56d08
commit 0da4a7ac31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      .gitignore
  2. 36
      src/transactions/transaction_db.rs
  3. 128
      tests/test_column_family.rs

1
.gitignore vendored

@ -8,3 +8,4 @@ tags
path path
.DS_Store .DS_Store
.idea .idea
.vscode

@ -931,6 +931,23 @@ impl<T: ThreadMode> TransactionDB<T> {
pub fn snapshot(&self) -> SnapshotWithThreadMode<Self> { pub fn snapshot(&self) -> SnapshotWithThreadMode<Self> {
SnapshotWithThreadMode::<Self>::new(self) SnapshotWithThreadMode::<Self>::new(self)
} }
fn drop_column_family<C>(
&self,
cf_inner: *mut ffi::rocksdb_column_family_handle_t,
_cf: C,
) -> Result<(), Error> {
unsafe {
// first mark the column family as dropped
ffi_try!(ffi::rocksdb_drop_column_family(
self.inner as *mut ffi::rocksdb_t,
cf_inner
));
}
// Since `_cf` is dropped here, the column family handle is destroyed
// and any resources (mem, files) are reclaimed.
Ok(())
}
} }
impl TransactionDB<SingleThreaded> { impl TransactionDB<SingleThreaded> {
@ -947,6 +964,15 @@ impl TransactionDB<SingleThreaded> {
pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> { pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
self.cfs.cfs.get(name) self.cfs.cfs.get(name)
} }
/// Drops the column family with the given name
pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
if let Some(cf) = self.cfs.cfs.remove(name) {
self.drop_column_family(cf.inner, cf)
} else {
Err(Error::new(format!("Invalid column family: {name}")))
}
}
} }
impl TransactionDB<MultiThreaded> { impl TransactionDB<MultiThreaded> {
@ -970,6 +996,16 @@ impl TransactionDB<MultiThreaded> {
.cloned() .cloned()
.map(UnboundColumnFamily::bound_column_family) .map(UnboundColumnFamily::bound_column_family)
} }
/// Drops the column family with the given name by internally locking the inner column
/// family map. This avoids needing `&mut self` reference
pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
if let Some(cf) = self.cfs.cfs.write().unwrap().remove(name) {
self.drop_column_family(cf.inner, cf)
} else {
Err(Error::new(format!("Invalid column family: {name}")))
}
}
} }
impl<T: ThreadMode> Drop for TransactionDB<T> { impl<T: ThreadMode> Drop for TransactionDB<T> {

@ -17,12 +17,18 @@ mod util;
use pretty_assertions::assert_eq; use pretty_assertions::assert_eq;
use rocksdb::{ColumnFamilyDescriptor, MergeOperands, Options, DB, DEFAULT_COLUMN_FAMILY_NAME}; use rocksdb::{ColumnFamilyDescriptor, MergeOperands, Options, DB, DEFAULT_COLUMN_FAMILY_NAME};
use rocksdb::{TransactionDB, TransactionDBOptions};
use util::DBPath; use util::DBPath;
use std::fs; use std::fs;
use std::io; use std::io;
use std::path::Path; use std::path::Path;
#[cfg(feature = "multi-threaded-cf")]
use rocksdb::MultiThreaded;
#[cfg(not(feature = "multi-threaded-cf"))]
use rocksdb::SingleThreaded;
fn dir_size(path: impl AsRef<Path>) -> io::Result<u64> { fn dir_size(path: impl AsRef<Path>) -> io::Result<u64> {
fn dir_size(mut dir: fs::ReadDir) -> io::Result<u64> { fn dir_size(mut dir: fs::ReadDir) -> io::Result<u64> {
dir.try_fold(0, |acc, file| { dir.try_fold(0, |acc, file| {
@ -112,6 +118,128 @@ fn test_column_family() {
} }
} }
#[test]
fn test_column_family_with_transactiondb() {
let n = DBPath::new("_rust_rocksdb_cftest");
// should be able to create column families
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_merge_operator_associative("test operator", test_provided_merge);
#[cfg(feature = "multi-threaded-cf")]
let db = TransactionDB::open(&opts, &TransactionDBOptions::default(), &n).unwrap();
#[cfg(not(feature = "multi-threaded-cf"))]
let db = TransactionDB::open(&opts, &TransactionDBOptions::default(), &n).unwrap();
let opts = Options::default();
match db.create_cf("cf1", &opts) {
Ok(()) => println!("cf1 created successfully"),
Err(e) => {
panic!("could not create column family: {}", e);
}
}
}
// should fail to open db without specifying same column families
{
let mut opts = Options::default();
opts.set_merge_operator_associative("test operator", test_provided_merge);
#[cfg(feature = "multi-threaded-cf")]
let db = TransactionDB::<MultiThreaded>::open(&opts, &TransactionDBOptions::default(), &n);
#[cfg(not(feature = "multi-threaded-cf"))]
let db = TransactionDB::<SingleThreaded>::open(&opts, &TransactionDBOptions::default(), &n);
match db {
Ok(_db) => panic!(
"should not have opened TransactionDB successfully without \
specifying column
families"
),
Err(e) => assert!(e.to_string().starts_with("Invalid argument")),
}
}
// should properly open db when specyfing all column families
{
let mut opts = Options::default();
opts.set_merge_operator_associative("test operator", test_provided_merge);
let cfs = &["cf1"];
#[cfg(feature = "multi-threaded-cf")]
let db = TransactionDB::<MultiThreaded>::open_cf(
&opts,
&TransactionDBOptions::default(),
&n,
cfs,
);
#[cfg(not(feature = "multi-threaded-cf"))]
let db = TransactionDB::<SingleThreaded>::open_cf(
&opts,
&TransactionDBOptions::default(),
&n,
cfs,
);
match db {
Ok(_db) => println!("successfully opened db with column family"),
Err(e) => panic!("failed to open db with column family: {}", e),
}
}
// should be able to list a cf
{
let opts = Options::default();
let vec = DB::list_cf(&opts, &n);
match vec {
Ok(vec) => assert_eq!(vec, vec![DEFAULT_COLUMN_FAMILY_NAME, "cf1"]),
Err(e) => panic!("failed to drop column family: {}", e),
}
}
// should b able to drop a cf
{
let opts = Options::default();
let cfs = &["cf1"];
#[cfg(feature = "multi-threaded-cf")]
let mut db = TransactionDB::<MultiThreaded>::open_cf(
&opts,
&TransactionDBOptions::default(),
&n,
cfs,
)
.unwrap();
#[cfg(not(feature = "multi-threaded-cf"))]
let mut db = TransactionDB::<SingleThreaded>::open_cf(
&opts,
&TransactionDBOptions::default(),
&n,
cfs,
)
.unwrap();
match db.drop_cf("cf1") {
Ok(_) => println!("cf1 successfully dropped."),
Err(e) => panic!("failed to drop column family: {}", e),
}
}
// should not be able to open cf after dropping.
{
let opts = Options::default();
let cfs = &["cf1"];
#[cfg(feature = "multi-threaded-cf")]
let db = TransactionDB::<MultiThreaded>::open_cf(
&opts,
&TransactionDBOptions::default(),
&n,
cfs,
);
#[cfg(not(feature = "multi-threaded-cf"))]
let db = TransactionDB::<SingleThreaded>::open_cf(
&opts,
&TransactionDBOptions::default(),
&n,
cfs,
);
assert!(db.is_err())
}
}
#[test] #[test]
fn test_can_open_db_with_results_of_list_cf() { fn test_can_open_db_with_results_of_list_cf() {
// Test scenario derived from GitHub issue #175 and 177 // Test scenario derived from GitHub issue #175 and 177

Loading…
Cancel
Save