From 3a46074c36a3f1f1f066ca0c9be1a2efc91563e2 Mon Sep 17 00:00:00 2001
From: Friedel Ziegelmayer
Date: Thu, 21 Apr 2022 10:53:32 +0200
Subject: [PATCH] Add BlobDB options (#610)
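
Exposes RocksDB's integrated BlobDB options. For reference, a minimal
usage sketch (illustrative only: the path and the threshold values
below are made up and are not part of this change):

    use rocksdb::{Options, DB};

    let mut opts = Options::default();
    opts.create_if_missing(true);
    opts.set_enable_blob_files(true); // turn on key-value separation
    opts.set_min_blob_size(1024);     // values >= 1 KiB go to blob files
    opts.set_enable_blob_gc(true);    // relocate blobs out of old blob files
    let db = DB::open(&opts, "/tmp/blobdb_example").unwrap();
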
---
 src/db_options.rs |  83 ++++++++++++++++++++++++++++
 tests/test_db.rs  | 135 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 218 insertions(+)

diff --git a/src/db_options.rs b/src/db_options.rs
index c9d51c0..9113960 100644
--- a/src/db_options.rs
+++ b/src/db_options.rs
@@ -3031,6 +3031,89 @@ impl Options {
             );
         }
     }
+
+    /// Enable the use of key-value separation.
+    ///
+    /// More details can be found here: <http://rocksdb.org/blog/2021/05/26/integrated-blob-db.html>.
+    ///
+    /// Default: false (disabled)
+    ///
+    /// Dynamically changeable through SetOptions() API
+    pub fn set_enable_blob_files(&mut self, val: bool) {
+        unsafe {
+            ffi::rocksdb_options_set_enable_blob_files(self.inner, val as _);
+        }
+    }
+
+    /// Sets the minimum size threshold: values at or above this size will be written
+    /// to blob files during flush or compaction.
+    ///
+    /// Dynamically changeable through SetOptions() API
+    pub fn set_min_blob_size(&mut self, val: u64) {
+        unsafe {
+            ffi::rocksdb_options_set_min_blob_size(self.inner, val);
+        }
+    }
+
+    /// Sets the size limit for blob files.
+    ///
+    /// Dynamically changeable through SetOptions() API
+    pub fn set_blob_file_size(&mut self, val: u64) {
+        unsafe {
+            ffi::rocksdb_options_set_blob_file_size(self.inner, val);
+        }
+    }
+
+    /// Sets the blob compression type. All blob files use the same
+    /// compression type.
+    ///
+    /// Dynamically changeable through SetOptions() API
+    pub fn set_blob_compression_type(&mut self, val: DBCompressionType) {
+        unsafe {
+            ffi::rocksdb_options_set_blob_compression_type(self.inner, val as _);
+        }
+    }
+
+    /// If this is set to true, RocksDB will actively relocate valid blobs from the oldest blob files
+    /// as they are encountered during compaction.
+    ///
+    /// Dynamically changeable through SetOptions() API
+    pub fn set_enable_blob_gc(&mut self, val: bool) {
+        unsafe {
+            ffi::rocksdb_options_set_enable_blob_gc(self.inner, val as _);
+        }
+    }
+
+    /// Sets the threshold that the GC logic uses to determine which blob files should be considered “old.”
+    ///
+    /// For example, the default value of 0.25 signals to RocksDB that blobs residing in the
+    /// oldest 25% of blob files should be relocated by GC. This parameter can be tuned to adjust
+    /// the trade-off between write amplification and space amplification.
+    ///
+    /// Dynamically changeable through SetOptions() API
+    pub fn set_blob_gc_age_cutoff(&mut self, val: c_double) {
+        unsafe {
+            ffi::rocksdb_options_set_blob_gc_age_cutoff(self.inner, val);
+        }
+    }
+
+    /// Sets the blob GC force threshold: if the ratio of garbage in the eligible blob files exceeds this threshold, targeted compactions are scheduled to force-collect them.
+    ///
+    /// Dynamically changeable through SetOptions() API
+    pub fn set_blob_gc_force_threshold(&mut self, val: c_double) {
+        unsafe {
+            ffi::rocksdb_options_set_blob_gc_force_threshold(self.inner, val);
+        }
+    }
+
+    /// Sets the blob compaction readahead size.
+    ///
+    /// Dynamically changeable through SetOptions() API
+    pub fn set_blob_compaction_readahead_size(&mut self, val: u64) {
+        unsafe {
+            ffi::rocksdb_options_set_blob_compaction_readahead_size(self.inner, val);
+        }
+    }
 }
 
 impl Default for Options {
diff --git a/tests/test_db.rs b/tests/test_db.rs
index b6b506a..7aa93aa 100644
--- a/tests/test_db.rs
+++ b/tests/test_db.rs
@@ -909,6 +909,141 @@ fn get_with_cache_and_bulkload_test() {
     }
 }
 
+#[test]
+fn get_with_cache_and_bulkload_and_blobs_test() {
+    let path = DBPath::new("_rust_rocksdb_get_with_cache_and_bulkload_and_blobs_test");
+    let log_path = DBPath::new("_rust_rocksdb_log_path_test");
+
+    // create options
+    let mut opts = Options::default();
+    opts.create_if_missing(true);
+    opts.create_missing_column_families(true);
+    opts.set_wal_bytes_per_sync(8 << 10); // 8KB
+    opts.set_writable_file_max_buffer_size(512 << 10); // 512KB
+    opts.set_enable_write_thread_adaptive_yield(true);
+    opts.set_unordered_write(true);
+    opts.set_max_subcompactions(2);
+    opts.set_max_background_jobs(4);
+    opts.set_use_adaptive_mutex(true);
+    opts.set_db_log_dir(&log_path);
+    opts.set_memtable_whole_key_filtering(true);
+    opts.set_dump_malloc_stats(true);
+    opts.set_enable_blob_files(true);
+    opts.set_min_blob_size(256); // set small to ensure it is actually used
+
+    // trigger all sst files in L1/2 instead of L0
+    opts.set_max_bytes_for_level_base(64 << 10); // 64KB
+
+    {
+        // set block based table and cache
+        let cache = Cache::new_lru_cache(512 << 10).unwrap();
+        assert_eq!(cache.get_usage(), 0);
+        let mut block_based_opts = BlockBasedOptions::default();
+        block_based_opts.set_block_cache(&cache);
+        block_based_opts.set_cache_index_and_filter_blocks(true);
+        opts.set_block_based_table_factory(&block_based_opts);
+
+        // open db
+        let db = DB::open(&opts, &path).unwrap();
+
+        // write a lot
+        let mut batch = WriteBatch::default();
+        for i in 0..10_000 {
+            batch.put(format!("{:0>4}", i).as_bytes(), b"v");
+        }
+        assert!(db.write(batch).is_ok());
+
+        // flush memory table to sst and manual compaction
+        assert!(db.flush().is_ok());
+        db.compact_range(Some(format!("{:0>4}", 0).as_bytes()), None::<Vec<u8>>);
+
+        // get -> trigger caching
+        let _ = db.get(b"1");
+        assert!(cache.get_usage() > 0);
+
+        // get approximated memory usage
+        let mem_usage = get_memory_usage_stats(Some(&[&db]), None).unwrap();
+        assert!(mem_usage.mem_table_total > 0);
+
+        // get approximated cache usage
+        let mem_usage = get_memory_usage_stats(None, Some(&[&cache])).unwrap();
+        assert!(mem_usage.cache_total > 0);
+    }
+
+    // bulk loading
+    {
+        // open db
+        let db = DB::open(&opts, &path).unwrap();
+
+        // try to get key
+        let iter = db.iterator(IteratorMode::Start);
+        for (expected, (k, _)) in iter.enumerate() {
+            assert_eq!(k.as_ref(), format!("{:0>4}", expected).as_bytes());
+        }
+
+        // check live files (sst files meta)
+        let livefiles = db.live_files().unwrap();
+        assert_eq!(livefiles.len(), 1);
+        livefiles.iter().for_each(|f| {
+            assert_eq!(f.level, 2);
+            assert_eq!(f.column_family_name, "default");
+            assert!(!f.name.is_empty());
+            assert_eq!(
+                f.start_key.as_ref().unwrap().as_slice(),
+                format!("{:0>4}", 0).as_bytes()
+            );
+            assert_eq!(
+                f.end_key.as_ref().unwrap().as_slice(),
+                format!("{:0>4}", 9999).as_bytes()
+            );
+            assert_eq!(f.num_entries, 10000);
+            assert_eq!(f.num_deletions, 0);
+        });
+
+        // delete sst file in range (except L0)
+        assert!(db
+            .delete_file_in_range(
+                format!("{:0>4}", 0).as_bytes(),
+                format!("{:0>4}", 9999).as_bytes()
+            )
+            .is_ok());
+        let livefiles = db.live_files().unwrap();
+        assert_eq!(livefiles.len(), 0);
+
+        // try to get a deleted key
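+        // (the blob files backing the deleted values may still be on disk,
+        // but with the SST file gone the keys are unreachable)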
+        assert!(db.get(format!("{:0>4}", 123).as_bytes()).unwrap().is_none());
+    }
+
+    // raise error when db exists
+    {
+        opts.set_error_if_exists(true);
+        assert!(DB::open(&opts, &path).is_err());
+    }
+
+    // disable all threads
+    {
+        // create new options
+        let mut opts = Options::default();
+        opts.set_max_background_jobs(0);
+        opts.set_stats_dump_period_sec(0);
+        opts.set_stats_persist_period_sec(0);
+
+        // test Env::Default()->SetBackgroundThreads(0, Env::Priority::BOTTOM);
+        let mut env = Env::default().unwrap();
+        env.set_bottom_priority_background_threads(0);
+        opts.set_env(&env);
+
+        // open db
+        let db = DB::open(&opts, &path).unwrap();
+
+        // try to get key
+        let iter = db.iterator(IteratorMode::Start);
+        for (expected, (k, _)) in iter.enumerate() {
+            assert_eq!(k.as_ref(), format!("{:0>4}", expected).as_bytes());
+        }
+    }
+}
+
 #[test]
 fn test_open_for_read_only() {
     let path = DBPath::new("_rust_rocksdb_test_open_for_read_only");