Expose flush_cfs_opt to flush multiple column families (#793)

master
Zhanhui Li 2 years ago committed by GitHub
parent e0dab6a098
commit 023f8298ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 22
      src/db.rs
  2. 45
      tests/test_db.rs

@ -915,6 +915,28 @@ impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
Ok(()) Ok(())
} }
/// Flushes multiple column families.
///
/// If atomic flush is not enabled, it is equivalent to calling flush_cf multiple times.
/// If atomic flush is enabled, it will flush all column families specified in `cfs` up to the latest sequence
/// number at the time when flush is requested.
pub fn flush_cfs_opt(
&self,
cfs: &[&impl AsColumnFamilyRef],
opts: &FlushOptions,
) -> Result<(), Error> {
let mut cfs = cfs.iter().map(|cf| cf.inner()).collect::<Vec<_>>();
unsafe {
ffi_try!(ffi::rocksdb_flush_cfs(
self.inner.inner(),
opts.inner,
cfs.as_mut_ptr(),
cfs.len() as libc::c_int,
));
}
Ok(())
}
/// Flushes database memtables to SST files on the disk for a given column family using default /// Flushes database memtables to SST files on the disk for a given column family using default
/// options. /// options.
pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), Error> { pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), Error> {

@ -1397,3 +1397,48 @@ fn cuckoo() {
assert!(db.get(b"k1").unwrap().is_none()); assert!(db.get(b"k1").unwrap().is_none());
} }
} }
#[test]
fn test_atomic_flush_cfs() {
let n = DBPath::new("_rust_rocksdb_atomic_flush_cfs");
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
opts.set_atomic_flush(true);
let db = DB::open_cf(&opts, &n, ["cf1", "cf2"]).unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
let cf2 = db.cf_handle("cf2").unwrap();
let mut write_options = rocksdb::WriteOptions::new();
write_options.disable_wal(true);
db.put_cf_opt(&cf1, "k11", "v11", &write_options).unwrap();
db.put_cf_opt(&cf2, "k21", "v21", &write_options).unwrap();
let mut opts = rocksdb::FlushOptions::new();
opts.set_wait(true);
db.flush_cfs_opt(&[&cf1, &cf2], &opts).unwrap();
}
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
opts.set_atomic_flush(true);
let db = DB::open_cf(&opts, &n, ["cf1", "cf2"]).unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
let cf2 = db.cf_handle("cf2").unwrap();
assert_eq!(
db.get_cf(&cf1, "k11").unwrap(),
Some("v11".as_bytes().to_vec())
);
assert_eq!(
db.get_cf(&cf2, "k21").unwrap(),
Some("v21".as_bytes().to_vec())
);
}
}

Loading…
Cancel
Save