Support specifying blob garbage collection parameters when CompactRange() (#10073)

Summary:
Garbage collection is generally controlled by the BlobDB configuration options `enable_blob_garbage_collection` and `blob_garbage_collection_age_cutoff`. However, there might be use cases where we would want to temporarily override these options while performing a manual compaction. (One use case would be doing a full key-space manual compaction with full=100% garbage collection age cutoff in order to minimize the space occupied by the database.) Our goal here is to make it possible to override the configured GC parameters when using the `CompactRange` API to perform manual compactions. This PR would involve:

- Extending the `CompactRangeOptions` structure so clients can both force-enable and force-disable GC, as well as use a different cutoff than what's currently configured
- Storing whether blob GC should actually be enabled during a certain manual compaction and the cutoff to use in the `Compaction` object (considering the above overrides) and passing it to `CompactionIterator` via `CompactionProxy`
- Updating the BlobDB wiki to document the new options.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10073

Test Plan: Adding unit tests and adding the new options to the stress test tool.

Reviewed By: ltamasi

Differential Revision: D36848700

Pulled By: gangliao

fbshipit-source-id: c878ef101d1c612429999f513453c319f75d78e9
main
Gang Liao 3 years ago committed by Facebook GitHub Bot
parent 65893ad959
commit 3dc6ebaf74
  1. 1
      HISTORY.md
  2. 22
      db/compaction/compaction.cc
  3. 30
      db/compaction/compaction.h
  4. 5
      db/compaction/compaction_iterator.h
  5. 12
      db/compaction/compaction_picker.cc
  6. 68
      db/db_compaction_test.cc
  7. 16
      db_stress_tool/db_stress_test_base.cc
  8. 24
      include/rocksdb/options.h

@ -18,6 +18,7 @@
### New Features ### New Features
* Add FileSystem::ReadAsync API in io_tracing * Add FileSystem::ReadAsync API in io_tracing
* Add blob garbage collection parameters `blob_garbage_collection_policy` and `blob_garbage_collection_age_cutoff` to both force-enable and force-disable GC, as well as selectively override age cutoff when using CompactRange.
* Add an extra sanity check in `GetSortedWalFiles()` (also used by `GetLiveFilesStorageInfo()`, `BackupEngine`, and `Checkpoint`) to reduce risk of successfully created backup or checkpoint failing to open because of missing WAL file. * Add an extra sanity check in `GetSortedWalFiles()` (also used by `GetLiveFilesStorageInfo()`, `BackupEngine`, and `Checkpoint`) to reduce risk of successfully created backup or checkpoint failing to open because of missing WAL file.
### Behavior changes ### Behavior changes

@ -214,7 +214,9 @@ Compaction::Compaction(
CompressionOptions _compression_opts, Temperature _output_temperature, CompressionOptions _compression_opts, Temperature _output_temperature,
uint32_t _max_subcompactions, std::vector<FileMetaData*> _grandparents, uint32_t _max_subcompactions, std::vector<FileMetaData*> _grandparents,
bool _manual_compaction, const std::string& _trim_ts, double _score, bool _manual_compaction, const std::string& _trim_ts, double _score,
bool _deletion_compaction, CompactionReason _compaction_reason) bool _deletion_compaction, CompactionReason _compaction_reason,
BlobGarbageCollectionPolicy _blob_garbage_collection_policy,
double _blob_garbage_collection_age_cutoff)
: input_vstorage_(vstorage), : input_vstorage_(vstorage),
start_level_(_inputs[0].level), start_level_(_inputs[0].level),
output_level_(_output_level), output_level_(_output_level),
@ -240,7 +242,19 @@ Compaction::Compaction(
trim_ts_(_trim_ts), trim_ts_(_trim_ts),
is_trivial_move_(false), is_trivial_move_(false),
compaction_reason_(_compaction_reason), compaction_reason_(_compaction_reason),
notify_on_compaction_completion_(false) { notify_on_compaction_completion_(false),
enable_blob_garbage_collection_(
_blob_garbage_collection_policy == BlobGarbageCollectionPolicy::kForce
? true
: (_blob_garbage_collection_policy ==
BlobGarbageCollectionPolicy::kDisable
? false
: mutable_cf_options()->enable_blob_garbage_collection)),
blob_garbage_collection_age_cutoff_(
_blob_garbage_collection_age_cutoff < 0 ||
_blob_garbage_collection_age_cutoff > 1
? mutable_cf_options()->blob_garbage_collection_age_cutoff
: _blob_garbage_collection_age_cutoff) {
MarkFilesBeingCompacted(true); MarkFilesBeingCompacted(true);
if (is_manual_compaction_) { if (is_manual_compaction_) {
compaction_reason_ = CompactionReason::kManualCompaction; compaction_reason_ = CompactionReason::kManualCompaction;
@ -325,8 +339,8 @@ bool Compaction::IsTrivialMove() const {
} }
if (!(start_level_ != output_level_ && num_input_levels() == 1 && if (!(start_level_ != output_level_ && num_input_levels() == 1 &&
input(0, 0)->fd.GetPathId() == output_path_id() && input(0, 0)->fd.GetPathId() == output_path_id() &&
InputCompressionMatchesOutput())) { InputCompressionMatchesOutput())) {
return false; return false;
} }

@ -81,7 +81,10 @@ class Compaction {
std::vector<FileMetaData*> grandparents, std::vector<FileMetaData*> grandparents,
bool manual_compaction = false, const std::string& trim_ts = "", bool manual_compaction = false, const std::string& trim_ts = "",
double score = -1, bool deletion_compaction = false, double score = -1, bool deletion_compaction = false,
CompactionReason compaction_reason = CompactionReason::kUnknown); CompactionReason compaction_reason = CompactionReason::kUnknown,
BlobGarbageCollectionPolicy blob_garbage_collection_policy =
BlobGarbageCollectionPolicy::kUseDefault,
double blob_garbage_collection_age_cutoff = -1);
// No copying allowed // No copying allowed
Compaction(const Compaction&) = delete; Compaction(const Compaction&) = delete;
@ -306,6 +309,14 @@ class Compaction {
uint32_t max_subcompactions() const { return max_subcompactions_; } uint32_t max_subcompactions() const { return max_subcompactions_; }
bool enable_blob_garbage_collection() const {
return enable_blob_garbage_collection_;
}
double blob_garbage_collection_age_cutoff() const {
return blob_garbage_collection_age_cutoff_;
}
// start and end are sub compact range. Null if no boundary. // start and end are sub compact range. Null if no boundary.
// This is used to filter out some input files' ancester's time range. // This is used to filter out some input files' ancester's time range.
uint64_t MinInputFileOldestAncesterTime(const InternalKey* start, uint64_t MinInputFileOldestAncesterTime(const InternalKey* start,
@ -332,8 +343,9 @@ class Compaction {
// Get the atomic file boundaries for all files in the compaction. Necessary // Get the atomic file boundaries for all files in the compaction. Necessary
// in order to avoid the scenario described in // in order to avoid the scenario described in
// https://github.com/facebook/rocksdb/pull/4432#discussion_r221072219 and plumb // https://github.com/facebook/rocksdb/pull/4432#discussion_r221072219 and
// down appropriate key boundaries to RangeDelAggregator during compaction. // plumb down appropriate key boundaries to RangeDelAggregator during
// compaction.
static std::vector<CompactionInputFiles> PopulateWithAtomicBoundaries( static std::vector<CompactionInputFiles> PopulateWithAtomicBoundaries(
VersionStorageInfo* vstorage, std::vector<CompactionInputFiles> inputs); VersionStorageInfo* vstorage, std::vector<CompactionInputFiles> inputs);
@ -348,7 +360,7 @@ class Compaction {
VersionStorageInfo* input_vstorage_; VersionStorageInfo* input_vstorage_;
const int start_level_; // the lowest level to be compacted const int start_level_; // the lowest level to be compacted
const int output_level_; // levels to which output files are stored const int output_level_; // levels to which output files are stored
uint64_t max_output_file_size_; uint64_t max_output_file_size_;
uint64_t max_compaction_bytes_; uint64_t max_compaction_bytes_;
@ -359,7 +371,7 @@ class Compaction {
VersionEdit edit_; VersionEdit edit_;
const int number_levels_; const int number_levels_;
ColumnFamilyData* cfd_; ColumnFamilyData* cfd_;
Arena arena_; // Arena used to allocate space for file_levels_ Arena arena_; // Arena used to allocate space for file_levels_
const uint32_t output_path_id_; const uint32_t output_path_id_;
CompressionType output_compression_; CompressionType output_compression_;
@ -377,7 +389,7 @@ class Compaction {
// State used to check for number of overlapping grandparent files // State used to check for number of overlapping grandparent files
// (grandparent == "output_level_ + 1") // (grandparent == "output_level_ + 1")
std::vector<FileMetaData*> grandparents_; std::vector<FileMetaData*> grandparents_;
const double score_; // score that was used to pick this compaction. const double score_; // score that was used to pick this compaction.
// Is this compaction creating a file in the bottom most level? // Is this compaction creating a file in the bottom most level?
const bool bottommost_level_; const bool bottommost_level_;
@ -412,6 +424,12 @@ class Compaction {
// Notify on compaction completion only if listener was notified on compaction // Notify on compaction completion only if listener was notified on compaction
// begin. // begin.
bool notify_on_compaction_completion_; bool notify_on_compaction_completion_;
// Enable/disable GC collection for blobs during compaction.
bool enable_blob_garbage_collection_;
// Blob garbage collection age cutoff.
double blob_garbage_collection_age_cutoff_;
}; };
// Return sum of sizes of all files in `files`. // Return sum of sizes of all files in `files`.

@ -142,12 +142,11 @@ class CompactionIterator {
} }
bool enable_blob_garbage_collection() const override { bool enable_blob_garbage_collection() const override {
return compaction_->mutable_cf_options()->enable_blob_garbage_collection; return compaction_->enable_blob_garbage_collection();
} }
double blob_garbage_collection_age_cutoff() const override { double blob_garbage_collection_age_cutoff() const override {
return compaction_->mutable_cf_options() return compaction_->blob_garbage_collection_age_cutoff();
->blob_garbage_collection_age_cutoff;
} }
uint64_t blob_compaction_readahead_size() const override { uint64_t blob_compaction_readahead_size() const override {

@ -640,7 +640,11 @@ Compaction* CompactionPicker::CompactRange(
GetCompressionType(vstorage, mutable_cf_options, output_level, 1), GetCompressionType(vstorage, mutable_cf_options, output_level, 1),
GetCompressionOptions(mutable_cf_options, vstorage, output_level), GetCompressionOptions(mutable_cf_options, vstorage, output_level),
Temperature::kUnknown, compact_range_options.max_subcompactions, Temperature::kUnknown, compact_range_options.max_subcompactions,
/* grandparents */ {}, /* is manual */ true, trim_ts); /* grandparents */ {}, /* is manual */ true, trim_ts, /* score */ -1,
/* deletion_compaction */ false, CompactionReason::kUnknown,
compact_range_options.blob_garbage_collection_policy,
compact_range_options.blob_garbage_collection_age_cutoff);
RegisterCompaction(c); RegisterCompaction(c);
vstorage->ComputeCompactionScore(ioptions_, mutable_cf_options); vstorage->ComputeCompactionScore(ioptions_, mutable_cf_options);
return c; return c;
@ -818,8 +822,10 @@ Compaction* CompactionPicker::CompactRange(
vstorage->base_level()), vstorage->base_level()),
GetCompressionOptions(mutable_cf_options, vstorage, output_level), GetCompressionOptions(mutable_cf_options, vstorage, output_level),
Temperature::kUnknown, compact_range_options.max_subcompactions, Temperature::kUnknown, compact_range_options.max_subcompactions,
std::move(grandparents), std::move(grandparents), /* is manual */ true, trim_ts, /* score */ -1,
/* is manual compaction */ true, trim_ts); /* deletion_compaction */ false, CompactionReason::kUnknown,
compact_range_options.blob_garbage_collection_policy,
compact_range_options.blob_garbage_collection_age_cutoff);
TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction); TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction);
RegisterCompaction(compaction); RegisterCompaction(compaction);

@ -6387,6 +6387,74 @@ INSTANTIATE_TEST_CASE_P(DBCompactionTestBlobGC, DBCompactionTestBlobGC,
::testing::Combine(::testing::Values(0.0, 0.5, 1.0), ::testing::Combine(::testing::Values(0.0, 0.5, 1.0),
::testing::Bool())); ::testing::Bool()));
TEST_P(DBCompactionTestBlobGC, CompactionWithBlobGCOverrides) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.enable_blob_files = true;
options.blob_file_size = 32; // one blob per file
options.enable_blob_garbage_collection = true;
options.blob_garbage_collection_age_cutoff = 0;
DestroyAndReopen(options);
for (int i = 0; i < 128; i += 2) {
ASSERT_OK(Put("key" + std::to_string(i), "value" + std::to_string(i)));
ASSERT_OK(
Put("key" + std::to_string(i + 1), "value" + std::to_string(i + 1)));
ASSERT_OK(Flush());
}
std::vector<uint64_t> original_blob_files = GetBlobFileNumbers();
ASSERT_EQ(original_blob_files.size(), 128);
// Note: turning off enable_blob_files before the compaction results in
// garbage collected values getting inlined.
ASSERT_OK(db_->SetOptions({{"enable_blob_files", "false"}}));
CompactRangeOptions cro;
cro.blob_garbage_collection_policy = BlobGarbageCollectionPolicy::kForce;
cro.blob_garbage_collection_age_cutoff = blob_gc_age_cutoff_;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
// Check that the GC stats are correct
{
VersionSet* const versions = dbfull()->GetVersionSet();
assert(versions);
assert(versions->GetColumnFamilySet());
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
assert(cfd);
const InternalStats* const internal_stats = cfd->internal_stats();
assert(internal_stats);
const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
ASSERT_GE(compaction_stats.size(), 2);
ASSERT_GE(compaction_stats[1].bytes_read_blob, 0);
ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
}
const size_t cutoff_index = static_cast<size_t>(
cro.blob_garbage_collection_age_cutoff * original_blob_files.size());
const size_t expected_num_files = original_blob_files.size() - cutoff_index;
const std::vector<uint64_t> new_blob_files = GetBlobFileNumbers();
ASSERT_EQ(new_blob_files.size(), expected_num_files);
// Original blob files below the cutoff should be gone, original blob files
// at or above the cutoff should be still there
for (size_t i = cutoff_index; i < original_blob_files.size(); ++i) {
ASSERT_EQ(new_blob_files[i - cutoff_index], original_blob_files[i]);
}
for (size_t i = 0; i < 128; ++i) {
ASSERT_EQ(Get("key" + std::to_string(i)), "value" + std::to_string(i));
}
}
TEST_P(DBCompactionTestBlobGC, CompactionWithBlobGC) { TEST_P(DBCompactionTestBlobGC, CompactionWithBlobGC) {
Options options; Options options;
options.env = env_; options.env = env_;

@ -681,7 +681,7 @@ void StressTest::OperateDb(ThreadState* thread) {
#ifndef NDEBUG #ifndef NDEBUG
if (FLAGS_read_fault_one_in) { if (FLAGS_read_fault_one_in) {
fault_fs_guard->SetThreadLocalReadErrorContext(thread->shared->GetSeed(), fault_fs_guard->SetThreadLocalReadErrorContext(thread->shared->GetSeed(),
FLAGS_read_fault_one_in); FLAGS_read_fault_one_in);
} }
#endif // NDEBUG #endif // NDEBUG
if (FLAGS_write_fault_one_in) { if (FLAGS_write_fault_one_in) {
@ -2119,6 +2119,15 @@ void StressTest::TestCompactRange(ThreadState* thread, int64_t rand_key,
static_cast<uint32_t>(bottom_level_styles.size())]; static_cast<uint32_t>(bottom_level_styles.size())];
cro.allow_write_stall = static_cast<bool>(thread->rand.Next() % 2); cro.allow_write_stall = static_cast<bool>(thread->rand.Next() % 2);
cro.max_subcompactions = static_cast<uint32_t>(thread->rand.Next() % 4); cro.max_subcompactions = static_cast<uint32_t>(thread->rand.Next() % 4);
std::vector<BlobGarbageCollectionPolicy> blob_gc_policies = {
BlobGarbageCollectionPolicy::kForce,
BlobGarbageCollectionPolicy::kDisable,
BlobGarbageCollectionPolicy::kUseDefault};
cro.blob_garbage_collection_policy =
blob_gc_policies[thread->rand.Next() %
static_cast<uint32_t>(blob_gc_policies.size())];
cro.blob_garbage_collection_age_cutoff =
static_cast<double>(thread->rand.Next() % 100) / 100.0;
const Snapshot* pre_snapshot = nullptr; const Snapshot* pre_snapshot = nullptr;
uint32_t pre_hash = 0; uint32_t pre_hash = 0;
@ -2322,7 +2331,8 @@ void StressTest::PrintEnv() const {
fprintf(stdout, "Open metadata write fault one in:\n"); fprintf(stdout, "Open metadata write fault one in:\n");
fprintf(stdout, " %d\n", fprintf(stdout, " %d\n",
FLAGS_open_metadata_write_fault_one_in); FLAGS_open_metadata_write_fault_one_in);
fprintf(stdout, "Sync fault injection : %d\n", FLAGS_sync_fault_injection); fprintf(stdout, "Sync fault injection : %d\n",
FLAGS_sync_fault_injection);
fprintf(stdout, "Best efforts recovery : %d\n", fprintf(stdout, "Best efforts recovery : %d\n",
static_cast<int>(FLAGS_best_efforts_recovery)); static_cast<int>(FLAGS_best_efforts_recovery));
fprintf(stdout, "Fail if OPTIONS file error: %d\n", fprintf(stdout, "Fail if OPTIONS file error: %d\n",
@ -2697,7 +2707,7 @@ void StressTest::Reopen(ThreadState* thread) {
} }
assert(!write_prepared || bg_canceled); assert(!write_prepared || bg_canceled);
#else #else
(void) thread; (void)thread;
#endif #endif
for (auto cf : column_families_) { for (auto cf : column_families_) {

@ -1791,6 +1791,17 @@ enum class BottommostLevelCompaction {
kForceOptimized, kForceOptimized,
}; };
// For manual compaction, we can configure if we want to skip/force garbage
// collection of blob files.
enum class BlobGarbageCollectionPolicy {
// Force blob file garbage collection.
kForce,
// Skip blob file garbage collection.
kDisable,
// Inherit blob file garbage collection policy from ColumnFamilyOptions.
kUseDefault,
};
// CompactRangeOptions is used by CompactRange() call. // CompactRangeOptions is used by CompactRange() call.
struct CompactRangeOptions { struct CompactRangeOptions {
// If true, no other compaction will run at the same time as this // If true, no other compaction will run at the same time as this
@ -1823,6 +1834,19 @@ struct CompactRangeOptions {
// Cancellation can be delayed waiting on automatic compactions when used // Cancellation can be delayed waiting on automatic compactions when used
// together with `exclusive_manual_compaction == true`. // together with `exclusive_manual_compaction == true`.
std::atomic<bool>* canceled = nullptr; std::atomic<bool>* canceled = nullptr;
// If set to kForce, RocksDB will override enable_blob_file_garbage_collection
// to true; if set to kDisable, RocksDB will override it to false, and
// kUseDefault leaves the setting in effect. This enables customers to both
// force-enable and force-disable GC when calling CompactRange.
BlobGarbageCollectionPolicy blob_garbage_collection_policy =
BlobGarbageCollectionPolicy::kUseDefault;
// If set to < 0 or > 1, RocksDB leaves blob_garbage_collection_age_cutoff
// from ColumnFamilyOptions in effect. Otherwise, it will override the
// user-provided setting. This enables customers to selectively override the
// age cutoff.
double blob_garbage_collection_age_cutoff = -1;
}; };
// IngestExternalFileOptions is used by IngestExternalFile() // IngestExternalFileOptions is used by IngestExternalFile()

Loading…
Cancel
Save