Log the amount of blob garbage generated by compactions in the MANIFEST (#8450)

Summary:
The patch builds on `BlobGarbageMeter` and `BlobCountingIterator`
(introduced in https://github.com/facebook/rocksdb/issues/8426 and
https://github.com/facebook/rocksdb/issues/8443 respectively)
and ties it all together. It measures the amount of garbage
generated by a compaction and logs the corresponding `BlobFileGarbage`
records as part of the compaction job's `VersionEdit`. Note: in order
to have accurate results, `kRemoveAndSkipUntil` for compaction filters
is implemented using iteration.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8450

Test Plan: Ran `make check` and the crash test script.

Reviewed By: jay-zhuang

Differential Revision: D29338207

Pulled By: ltamasi

fbshipit-source-id: 4381c432ac215139439f6d6fb801a6c0e4d8c128
main
Levi Tamasi 3 years ago committed by Facebook GitHub Bot
parent 75741eb0ce
commit 68d8b28389
  1. 4
      db/blob/blob_counting_iterator.h
  2. 175
      db/blob/db_blob_compaction_test.cc
  3. 23
      db/compaction/compaction.cc
  4. 5
      db/compaction/compaction.h
  5. 6
      db/compaction/compaction_iterator.cc
  6. 6
      db/compaction/compaction_iterator.h
  7. 2
      db/compaction/compaction_iterator_test.cc
  8. 51
      db/compaction/compaction_job.cc

@ -11,6 +11,7 @@
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/status.h"
#include "table/internal_iterator.h"
#include "test_util/sync_point.h"
namespace ROCKSDB_NAMESPACE {
@ -131,6 +132,9 @@ class BlobCountingIterator : public InternalIterator {
return;
}
TEST_SYNC_POINT(
"BlobCountingIterator::UpdateAndCountBlobIfNeeded:ProcessInFlow");
status_ = blob_garbage_meter_->ProcessInFlow(key(), value());
}

@ -4,6 +4,7 @@
// (found in the LICENSE.Apache file in the root directory).
#include "db/blob/blob_index.h"
#include "db/blob/blob_log_format.h"
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "test_util/sync_point.h"
@ -152,6 +153,31 @@ class AlwaysKeepFilter : public CompactionFilter {
return CompactionFilter::Decision::kKeep;
}
};
class SkipUntilFilter : public CompactionFilter {
public:
explicit SkipUntilFilter(std::string skip_until)
: skip_until_(std::move(skip_until)) {}
const char* Name() const override {
return "rocksdb.compaction.filter.skip.until";
}
CompactionFilter::Decision FilterV2(int /* level */, const Slice& /* key */,
ValueType /* value_type */,
const Slice& /* existing_value */,
std::string* /* new_value */,
std::string* skip_until) const override {
assert(skip_until);
*skip_until = skip_until_;
return CompactionFilter::Decision::kRemoveAndSkipUntil;
}
private:
std::string skip_until_;
};
} // anonymous namespace
class DBBlobBadCompactionFilterTest
@ -254,6 +280,49 @@ TEST_F(DBBlobCompactionTest, BlindWriteFilter) {
Close();
}
TEST_F(DBBlobCompactionTest, SkipUntilFilter) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
std::unique_ptr<CompactionFilter> compaction_filter_guard(
new SkipUntilFilter("z"));
options.compaction_filter = compaction_filter_guard.get();
Reopen(options);
const std::vector<std::string> keys{"a", "b", "c"};
const std::vector<std::string> values{"a_value", "b_value", "c_value"};
assert(keys.size() == values.size());
for (size_t i = 0; i < keys.size(); ++i) {
ASSERT_OK(Put(keys[i], values[i]));
}
ASSERT_OK(Flush());
int process_in_flow_called = 0;
SyncPoint::GetInstance()->SetCallBack(
"BlobCountingIterator::UpdateAndCountBlobIfNeeded:ProcessInFlow",
[&process_in_flow_called](void* /* arg */) { ++process_in_flow_called; });
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /* begin */ nullptr,
/* end */ nullptr));
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
for (const auto& key : keys) {
ASSERT_EQ(Get(key), "NOT_FOUND");
}
// Make sure SkipUntil was performed using iteration rather than Seek
ASSERT_EQ(process_in_flow_called, keys.size());
Close();
}
TEST_P(DBBlobBadCompactionFilterTest, BadDecisionFromCompactionFilter) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
@ -390,6 +459,112 @@ TEST_F(DBBlobCompactionTest, CompactionFilterReadBlobAndKeep) {
Close();
}
TEST_F(DBBlobCompactionTest, TrackGarbage) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
Reopen(options);
// First table+blob file pair: 4 blobs with different keys
constexpr char first_key[] = "first_key";
constexpr char first_value[] = "first_value";
constexpr char second_key[] = "second_key";
constexpr char second_value[] = "second_value";
constexpr char third_key[] = "third_key";
constexpr char third_value[] = "third_value";
constexpr char fourth_key[] = "fourth_key";
constexpr char fourth_value[] = "fourth_value";
ASSERT_OK(Put(first_key, first_value));
ASSERT_OK(Put(second_key, second_value));
ASSERT_OK(Put(third_key, third_value));
ASSERT_OK(Put(fourth_key, fourth_value));
ASSERT_OK(Flush());
// Second table+blob file pair: overwrite 2 existing keys
constexpr char new_first_value[] = "new_first_value";
constexpr char new_second_value[] = "new_second_value";
ASSERT_OK(Put(first_key, new_first_value));
ASSERT_OK(Put(second_key, new_second_value));
ASSERT_OK(Flush());
// Compact them together. The first blob file should have 2 garbage blobs
// corresponding to the 2 overwritten keys.
constexpr Slice* begin = nullptr;
constexpr Slice* end = nullptr;
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
VersionSet* const versions = dbfull()->TEST_GetVersionSet();
assert(versions);
assert(versions->GetColumnFamilySet());
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
assert(cfd);
Version* const current = cfd->current();
assert(current);
const VersionStorageInfo* const storage_info = current->storage_info();
assert(storage_info);
const auto& blob_files = storage_info->GetBlobFiles();
ASSERT_EQ(blob_files.size(), 2);
{
auto it = blob_files.begin();
const auto& meta = it->second;
assert(meta);
constexpr uint64_t first_expected_bytes =
sizeof(first_value) - 1 +
BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(first_key) -
1);
constexpr uint64_t second_expected_bytes =
sizeof(second_value) - 1 +
BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(second_key) -
1);
constexpr uint64_t third_expected_bytes =
sizeof(third_value) - 1 +
BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(third_key) -
1);
constexpr uint64_t fourth_expected_bytes =
sizeof(fourth_value) - 1 +
BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(fourth_key) -
1);
ASSERT_EQ(meta->GetTotalBlobCount(), 4);
ASSERT_EQ(meta->GetTotalBlobBytes(),
first_expected_bytes + second_expected_bytes +
third_expected_bytes + fourth_expected_bytes);
ASSERT_EQ(meta->GetGarbageBlobCount(), 2);
ASSERT_EQ(meta->GetGarbageBlobBytes(),
first_expected_bytes + second_expected_bytes);
}
{
auto it = blob_files.rbegin();
const auto& meta = it->second;
assert(meta);
constexpr uint64_t new_first_expected_bytes =
sizeof(new_first_value) - 1 +
BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(first_key) -
1);
constexpr uint64_t new_second_expected_bytes =
sizeof(new_second_value) - 1 +
BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(second_key) -
1);
ASSERT_EQ(meta->GetTotalBlobCount(), 2);
ASSERT_EQ(meta->GetTotalBlobBytes(),
new_first_expected_bytes + new_second_expected_bytes);
ASSERT_EQ(meta->GetGarbageBlobCount(), 0);
ASSERT_EQ(meta->GetGarbageBlobBytes(), 0);
}
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -581,6 +581,29 @@ bool Compaction::ShouldFormSubcompactions() const {
}
}
bool Compaction::DoesInputReferenceBlobFiles() const {
assert(input_version_);
const VersionStorageInfo* storage_info = input_version_->storage_info();
assert(storage_info);
if (storage_info->GetBlobFiles().empty()) {
return false;
}
for (size_t i = 0; i < inputs_.size(); ++i) {
for (const FileMetaData* meta : inputs_[i].files) {
assert(meta);
if (meta->oldest_blob_file_number != kInvalidBlobFileNumber) {
return true;
}
}
}
return false;
}
uint64_t Compaction::MinInputFileOldestAncesterTime() const {
uint64_t min_oldest_ancester_time = port::kMaxUint64;
for (const auto& level_files : inputs_) {

@ -266,6 +266,11 @@ class Compaction {
// Should this compaction be broken up into smaller ones run in parallel?
bool ShouldFormSubcompactions() const;
// Returns true iff at least one input file references a blob file.
//
// PRE: input version has been set.
bool DoesInputReferenceBlobFiles() const;
// test function to validate the functionality of IsBottommostLevel()
// function -- determines if compaction with inputs and storage is bottommost
static bool TEST_IsBottommostLevel(

@ -75,10 +75,8 @@ CompactionIterator::CompactionIterator(
const std::atomic<bool>* manual_compaction_canceled,
const std::shared_ptr<Logger> info_log,
const std::string* full_history_ts_low)
: input_(
input, cmp,
compaction ==
nullptr), // Now only need to count number of entries in flush.
: input_(input, cmp,
!compaction || compaction->DoesInputReferenceBlobFiles()),
cmp_(cmp),
merge_helper_(merge_helper),
snapshots_(snapshots),

@ -97,6 +97,8 @@ class CompactionIterator {
virtual double blob_garbage_collection_age_cutoff() const = 0;
virtual Version* input_version() const = 0;
virtual bool DoesInputReferenceBlobFiles() const = 0;
};
class RealCompaction : public CompactionProxy {
@ -146,6 +148,10 @@ class CompactionIterator {
return compaction_->input_version();
}
bool DoesInputReferenceBlobFiles() const override {
return compaction_->DoesInputReferenceBlobFiles();
}
private:
const Compaction* compaction_;
};

@ -182,6 +182,8 @@ class FakeCompaction : public CompactionIterator::CompactionProxy {
Version* input_version() const override { return nullptr; }
bool DoesInputReferenceBlobFiles() const override { return false; }
bool key_not_exists_beyond_output_level = false;
bool is_bottommost_level = false;

@ -20,8 +20,10 @@
#include <utility>
#include <vector>
#include "db/blob/blob_counting_iterator.h"
#include "db/blob/blob_file_addition.h"
#include "db/blob/blob_file_builder.h"
#include "db/blob/blob_garbage_meter.h"
#include "db/builder.h"
#include "db/compaction/clipping_iterator.h"
#include "db/db_impl/db_impl.h"
@ -147,6 +149,7 @@ struct CompactionJob::SubcompactionState {
// State kept for output being generated
std::vector<Output> outputs;
std::vector<BlobFileAddition> blob_file_additions;
std::unique_ptr<BlobGarbageMeter> blob_garbage_meter;
std::unique_ptr<WritableFileWriter> outfile;
std::unique_ptr<TableBuilder> builder;
@ -229,6 +232,14 @@ struct CompactionJob::SubcompactionState {
return false;
}
Status ProcessOutFlowIfNeeded(const Slice& key, const Slice& value) {
if (!blob_garbage_meter) {
return Status::OK();
}
return blob_garbage_meter->ProcessOutFlow(key, value);
}
};
// Maintains state for the entire compaction
@ -1136,6 +1147,15 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
input = clip.get();
}
std::unique_ptr<InternalIterator> blob_counter;
if (sub_compact->compaction->DoesInputReferenceBlobFiles()) {
sub_compact->blob_garbage_meter.reset(new BlobGarbageMeter);
blob_counter.reset(
new BlobCountingIterator(input, sub_compact->blob_garbage_meter.get()));
input = blob_counter.get();
}
input->SeekToFirst();
AutoThreadOperationStageUpdater stage_updater(
@ -1248,6 +1268,11 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
break;
}
status = sub_compact->ProcessOutFlowIfNeeded(key, value);
if (!status.ok()) {
break;
}
sub_compact->current_output_file_size =
sub_compact->builder->EstimatedFileSize();
const ParsedInternalKey& ikey = c_iter->ikey();
@ -1415,6 +1440,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
sub_compact->c_iter.reset();
blob_counter.reset();
clip.reset();
raw_input.reset();
sub_compact->status = status;
@ -1799,6 +1825,8 @@ Status CompactionJob::InstallCompactionResults(
// Add compaction inputs
compaction->AddInputDeletions(edit);
std::unordered_map<uint64_t, BlobGarbageMeter::BlobStats> blob_total_garbage;
for (const auto& sub_compact : compact_->sub_compact_states) {
for (const auto& out : sub_compact.outputs) {
edit->AddFile(compaction->output_level(), out.meta);
@ -1807,6 +1835,29 @@ Status CompactionJob::InstallCompactionResults(
for (const auto& blob : sub_compact.blob_file_additions) {
edit->AddBlobFile(blob);
}
if (sub_compact.blob_garbage_meter) {
const auto& flows = sub_compact.blob_garbage_meter->flows();
for (const auto& pair : flows) {
const uint64_t blob_file_number = pair.first;
const BlobGarbageMeter::BlobInOutFlow& flow = pair.second;
assert(flow.IsValid());
if (flow.HasGarbage()) {
blob_total_garbage[blob_file_number].Add(flow.GetGarbageCount(),
flow.GetGarbageBytes());
}
}
}
}
for (const auto& pair : blob_total_garbage) {
const uint64_t blob_file_number = pair.first;
const BlobGarbageMeter::BlobStats& stats = pair.second;
edit->AddBlobFileGarbage(blob_file_number, stats.GetCount(),
stats.GetBytes());
}
return versions_->LogAndApply(compaction->column_family_data(),

Loading…
Cancel
Save