From 023f8298cafc35a09a53c71c6cda427b3ef7f830 Mon Sep 17 00:00:00 2001 From: Zhanhui Li Date: Sat, 24 Jun 2023 00:47:32 +0800 Subject: [PATCH] Expose flush_cfs_opt to flush multiple column families (#793) --- src/db.rs | 22 ++++++++++++++++++++++ tests/test_db.rs | 45 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/src/db.rs b/src/db.rs index ac3ce10..7c0345c 100644 --- a/src/db.rs +++ b/src/db.rs @@ -915,6 +915,28 @@ impl DBCommon { Ok(()) } + /// Flushes multiple column families. + /// + /// If atomic flush is not enabled, it is equivalent to calling flush_cf multiple times. + /// If atomic flush is enabled, it will flush all column families specified in `cfs` up to the latest sequence + /// number at the time when flush is requested. + pub fn flush_cfs_opt( + &self, + cfs: &[&impl AsColumnFamilyRef], + opts: &FlushOptions, + ) -> Result<(), Error> { + let mut cfs = cfs.iter().map(|cf| cf.inner()).collect::>(); + unsafe { + ffi_try!(ffi::rocksdb_flush_cfs( + self.inner.inner(), + opts.inner, + cfs.as_mut_ptr(), + cfs.len() as libc::c_int, + )); + } + Ok(()) + } + /// Flushes database memtables to SST files on the disk for a given column family using default /// options. pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), Error> { diff --git a/tests/test_db.rs b/tests/test_db.rs index 76b2c19..a4563e3 100644 --- a/tests/test_db.rs +++ b/tests/test_db.rs @@ -1397,3 +1397,48 @@ fn cuckoo() { assert!(db.get(b"k1").unwrap().is_none()); } } + +#[test] +fn test_atomic_flush_cfs() { + let n = DBPath::new("_rust_rocksdb_atomic_flush_cfs"); + { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + opts.set_atomic_flush(true); + + let db = DB::open_cf(&opts, &n, ["cf1", "cf2"]).unwrap(); + let cf1 = db.cf_handle("cf1").unwrap(); + let cf2 = db.cf_handle("cf2").unwrap(); + + let mut write_options = rocksdb::WriteOptions::new(); + write_options.disable_wal(true); + + db.put_cf_opt(&cf1, "k11", "v11", &write_options).unwrap(); + db.put_cf_opt(&cf2, "k21", "v21", &write_options).unwrap(); + + let mut opts = rocksdb::FlushOptions::new(); + opts.set_wait(true); + db.flush_cfs_opt(&[&cf1, &cf2], &opts).unwrap(); + } + + { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + opts.set_atomic_flush(true); + + let db = DB::open_cf(&opts, &n, ["cf1", "cf2"]).unwrap(); + let cf1 = db.cf_handle("cf1").unwrap(); + let cf2 = db.cf_handle("cf2").unwrap(); + + assert_eq!( + db.get_cf(&cf1, "k11").unwrap(), + Some("v11".as_bytes().to_vec()) + ); + assert_eq!( + db.get_cf(&cf2, "k21").unwrap(), + Some("v21".as_bytes().to_vec()) + ); + } +}