Add BlobDB options (#610)

master
Friedel Ziegelmayer 3 years ago committed by GitHub
parent 16e4eaf1c7
commit 3a46074c36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 83
      src/db_options.rs
  2. 135
      tests/test_db.rs

@ -3031,6 +3031,89 @@ impl Options {
); );
} }
} }
/// Enable the use of key-value separation.
///
/// More details can be found here: http://rocksdb.org/blog/2021/05/26/integrated-blob-db.html.
///
/// Default: false (disable)
///
/// Dynamically changeable through SetOptions() API
pub fn set_enable_blob_files(&mut self, val: bool) {
unsafe {
ffi::rocksdb_options_set_enable_blob_files(self.inner, val as _);
}
}
/// Sets the minimum threshold value at or above which will be written
/// to blob files during flush or compaction.
///
/// Dynamically changeable through SetOptions() API
pub fn set_min_blob_size(&mut self, val: u64) {
unsafe {
ffi::rocksdb_options_set_min_blob_size(self.inner, val);
}
}
/// Sets the size limit for blob files.
///
/// Dynamically changeable through SetOptions() API
pub fn set_blob_file_size(&mut self, val: u64) {
unsafe {
ffi::rocksdb_options_set_blob_file_size(self.inner, val);
}
}
/// Sets the blob compression type. All blob files use the same
/// compression type.
///
/// Dynamically changeable through SetOptions() API
pub fn set_blob_compression_type(&mut self, val: DBCompressionType) {
unsafe {
ffi::rocksdb_options_set_blob_compression_type(self.inner, val as _);
}
}
/// If this is set to true RocksDB will actively relocate valid blobs from the oldest blob files
/// as they are encountered during compaction.
///
/// Dynamically changeable through SetOptions() API
pub fn set_enable_blob_gc(&mut self, val: bool) {
unsafe {
ffi::rocksdb_options_set_enable_blob_gc(self.inner, val as _);
}
}
/// Sets the threshold that the GC logic uses to determine which blob files should be considered “old.”
///
/// For example, the default value of 0.25 signals to RocksDB that blobs residing in the
/// oldest 25% of blob files should be relocated by GC. This parameter can be tuned to adjust
/// the trade-off between write amplification and space amplification.
///
/// Dynamically changeable through SetOptions() API
pub fn set_blob_gc_age_cutoff(&mut self, val: c_double) {
unsafe {
ffi::rocksdb_options_set_blob_gc_age_cutoff(self.inner, val);
}
}
/// Sets the blob GC force threshold.
///
/// Dynamically changeable through SetOptions() API
pub fn set_blob_gc_force_threshold(&mut self, val: c_double) {
unsafe {
ffi::rocksdb_options_set_blob_gc_force_threshold(self.inner, val);
}
}
/// Sets the blob compaction read ahead size.
///
/// Dynamically changeable through SetOptions() API
pub fn set_blob_compaction_readahead_size(&mut self, val: u64) {
unsafe {
ffi::rocksdb_options_set_blob_compaction_readahead_size(self.inner, val);
}
}
} }
impl Default for Options { impl Default for Options {

@ -909,6 +909,141 @@ fn get_with_cache_and_bulkload_test() {
} }
} }
#[test]
fn get_with_cache_and_bulkload_and_blobs_test() {
let path = DBPath::new("_rust_rocksdb_get_with_cache_and_bulkload_and_blobs_test");
let log_path = DBPath::new("_rust_rocksdb_log_path_test");
// create options
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
opts.set_wal_bytes_per_sync(8 << 10); // 8KB
opts.set_writable_file_max_buffer_size(512 << 10); // 512KB
opts.set_enable_write_thread_adaptive_yield(true);
opts.set_unordered_write(true);
opts.set_max_subcompactions(2);
opts.set_max_background_jobs(4);
opts.set_use_adaptive_mutex(true);
opts.set_db_log_dir(&log_path);
opts.set_memtable_whole_key_filtering(true);
opts.set_dump_malloc_stats(true);
opts.set_enable_blob_files(true);
opts.set_min_blob_size(256); // set small to ensure it is actually used
// trigger all sst files in L1/2 instead of L0
opts.set_max_bytes_for_level_base(64 << 10); // 64KB
{
// set block based table and cache
let cache = Cache::new_lru_cache(512 << 10).unwrap();
assert_eq!(cache.get_usage(), 0);
let mut block_based_opts = BlockBasedOptions::default();
block_based_opts.set_block_cache(&cache);
block_based_opts.set_cache_index_and_filter_blocks(true);
opts.set_block_based_table_factory(&block_based_opts);
// open db
let db = DB::open(&opts, &path).unwrap();
// write a lot
let mut batch = WriteBatch::default();
for i in 0..10_000 {
batch.put(format!("{:0>4}", i).as_bytes(), b"v");
}
assert!(db.write(batch).is_ok());
// flush memory table to sst and manual compaction
assert!(db.flush().is_ok());
db.compact_range(Some(format!("{:0>4}", 0).as_bytes()), None::<Vec<u8>>);
// get -> trigger caching
let _ = db.get(b"1");
assert!(cache.get_usage() > 0);
// get approximated memory usage
let mem_usage = get_memory_usage_stats(Some(&[&db]), None).unwrap();
assert!(mem_usage.mem_table_total > 0);
// get approximated cache usage
let mem_usage = get_memory_usage_stats(None, Some(&[&cache])).unwrap();
assert!(mem_usage.cache_total > 0);
}
// bulk loading
{
// open db
let db = DB::open(&opts, &path).unwrap();
// try to get key
let iter = db.iterator(IteratorMode::Start);
for (expected, (k, _)) in iter.enumerate() {
assert_eq!(k.as_ref(), format!("{:0>4}", expected).as_bytes());
}
// check live files (sst files meta)
let livefiles = db.live_files().unwrap();
assert_eq!(livefiles.len(), 1);
livefiles.iter().for_each(|f| {
assert_eq!(f.level, 2);
assert_eq!(f.column_family_name, "default");
assert!(!f.name.is_empty());
assert_eq!(
f.start_key.as_ref().unwrap().as_slice(),
format!("{:0>4}", 0).as_bytes()
);
assert_eq!(
f.end_key.as_ref().unwrap().as_slice(),
format!("{:0>4}", 9999).as_bytes()
);
assert_eq!(f.num_entries, 10000);
assert_eq!(f.num_deletions, 0);
});
// delete sst file in range (except L0)
assert!(db
.delete_file_in_range(
format!("{:0>4}", 0).as_bytes(),
format!("{:0>4}", 9999).as_bytes()
)
.is_ok());
let livefiles = db.live_files().unwrap();
assert_eq!(livefiles.len(), 0);
// try to get a deleted key
assert!(db.get(format!("{:0>4}", 123).as_bytes()).unwrap().is_none());
}
// raise error when db exists
{
opts.set_error_if_exists(true);
assert!(DB::open(&opts, &path).is_err());
}
// disable all threads
{
// create new options
let mut opts = Options::default();
opts.set_max_background_jobs(0);
opts.set_stats_dump_period_sec(0);
opts.set_stats_persist_period_sec(0);
// test Env::Default()->SetBackgroundThreads(0, Env::Priority::BOTTOM);
let mut env = Env::default().unwrap();
env.set_bottom_priority_background_threads(0);
opts.set_env(&env);
// open db
let db = DB::open(&opts, &path).unwrap();
// try to get key
let iter = db.iterator(IteratorMode::Start);
for (expected, (k, _)) in iter.enumerate() {
assert_eq!(k.as_ref(), format!("{:0>4}", expected).as_bytes());
}
}
}
#[test] #[test]
fn test_open_for_read_only() { fn test_open_for_read_only() {
let path = DBPath::new("_rust_rocksdb_test_open_for_read_only"); let path = DBPath::new("_rust_rocksdb_test_open_for_read_only");

Loading…
Cancel
Save