Integrated blob garbage collection: relocate blobs (#7694)

Summary:
The patch adds basic garbage collection support to the integrated BlobDB
implementation. Valid blobs residing in the oldest blob files are relocated
as they are encountered during compaction. The threshold that determines
which blob files qualify is computed based on the configuration option
`blob_garbage_collection_age_cutoff`, which was introduced in https://github.com/facebook/rocksdb/issues/7661 .
Once a blob is retrieved for the purposes of relocation, it passes through the
same logic that extracts large values to blob files in general. This means that
if, for instance, the size threshold for key-value separation (`min_blob_size`)
got changed or writing blob files got disabled altogether, it is possible for the
value to be moved back into the LSM tree. In particular, one way to re-inline
all blob values if needed would be to perform a full manual compaction with
`enable_blob_files` set to `false`, `enable_blob_garbage_collection` set to
`true`, and `blob_file_garbage_collection_age_cutoff` set to `1.0`.

Some TODOs that I plan to address in separate PRs:

1) We'll have to measure the amount of new garbage in each blob file and log
`BlobFileGarbage` entries as part of the compaction job's `VersionEdit`.
(For the time being, blob files are cleaned up solely based on the
`oldest_blob_file_number` relationships.)
2) When compression is used for blobs, the compression type hasn't changed,
and the blob still qualifies for being written to a blob file, we can simply copy
the compressed blob to the new file instead of going through decompression
and compression.
3) We need to update the formula for computing write amplification to account
for the amount of data read from blob files as part of GC.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7694

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D25069663

Pulled By: ltamasi

fbshipit-source-id: bdfa8feb09afcf5bca3b4eba2ba72ce2f15cd06a
main
Levi Tamasi 4 years ago committed by Facebook GitHub Bot
parent dd6b7fc520
commit 51a8dc6d14
  1. 9
      db/column_family.cc
  2. 24
      db/column_family_test.cc
  3. 188
      db/compaction/compaction_iterator.cc
  4. 53
      db/compaction/compaction_iterator.h
  5. 6
      db/compaction/compaction_iterator_test.cc
  6. 255
      db/db_compaction_test.cc
  7. 6
      db/version_set.cc
  8. 20
      db/version_set.h

@ -1339,6 +1339,15 @@ Status ColumnFamilyData::ValidateOptions(
"Block-Based Table format. ");
}
}
if (cf_options.enable_blob_garbage_collection &&
(cf_options.blob_garbage_collection_age_cutoff < 0.0 ||
cf_options.blob_garbage_collection_age_cutoff > 1.0)) {
return Status::InvalidArgument(
"The age cutoff for blob garbage collection should be in the range "
"[0.0, 1.0].");
}
return s;
}

@ -3391,6 +3391,30 @@ TEST_P(ColumnFamilyTest, MultipleCFPathsTest) {
}
}
TEST(ColumnFamilyTest, ValidateBlobGCCutoff) {
DBOptions db_options;
ColumnFamilyOptions cf_options;
cf_options.enable_blob_garbage_collection = true;
cf_options.blob_garbage_collection_age_cutoff = -0.5;
ASSERT_TRUE(ColumnFamilyData::ValidateOptions(db_options, cf_options)
.IsInvalidArgument());
cf_options.blob_garbage_collection_age_cutoff = 0.0;
ASSERT_OK(ColumnFamilyData::ValidateOptions(db_options, cf_options));
cf_options.blob_garbage_collection_age_cutoff = 0.5;
ASSERT_OK(ColumnFamilyData::ValidateOptions(db_options, cf_options));
cf_options.blob_garbage_collection_age_cutoff = 1.0;
ASSERT_OK(ColumnFamilyData::ValidateOptions(db_options, cf_options));
cf_options.blob_garbage_collection_age_cutoff = 1.5;
ASSERT_TRUE(ColumnFamilyData::ValidateOptions(db_options, cf_options)
.IsInvalidArgument());
}
} // namespace ROCKSDB_NAMESPACE
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS

@ -5,9 +5,11 @@
#include "db/compaction/compaction_iterator.h"
#include <cinttypes>
#include <iterator>
#include <limits>
#include "db/blob/blob_file_builder.h"
#include "db/blob/blob_index.h"
#include "db/snapshot_checker.h"
#include "port/likely.h"
#include "rocksdb/listener.h"
@ -94,6 +96,8 @@ CompactionIterator::CompactionIterator(
current_user_key_sequence_(0),
current_user_key_snapshot_(0),
merge_out_iter_(merge_helper_),
blob_garbage_collection_cutoff_file_number_(
ComputeBlobGarbageCollectionCutoffFileNumber(compaction_.get())),
current_key_committed_(false),
cmp_with_history_ts_low_(0) {
assert(compaction_filter_ == nullptr || compaction_ != nullptr);
@ -733,40 +737,136 @@ void CompactionIterator::NextFromInput() {
}
}
bool CompactionIterator::ExtractLargeValueIfNeededImpl() {
if (!blob_file_builder_) {
return false;
}
blob_index_.clear();
const Status s = blob_file_builder_->Add(user_key(), value_, &blob_index_);
if (!s.ok()) {
status_ = s;
valid_ = false;
return false;
}
if (blob_index_.empty()) {
return false;
}
value_ = blob_index_;
return true;
}
void CompactionIterator::ExtractLargeValueIfNeeded() {
assert(ikey_.type == kTypeValue);
if (!ExtractLargeValueIfNeededImpl()) {
return;
}
ikey_.type = kTypeBlobIndex;
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
}
void CompactionIterator::GarbageCollectBlobIfNeeded() {
assert(ikey_.type == kTypeBlobIndex);
if (!compaction_) {
return;
}
// GC for integrated BlobDB
if (compaction_->enable_blob_garbage_collection()) {
BlobIndex blob_index;
{
const Status s = blob_index.DecodeFrom(value_);
if (!s.ok()) {
status_ = s;
valid_ = false;
return;
}
}
if (blob_index.IsInlined() || blob_index.HasTTL()) {
status_ = Status::Corruption("Unexpected TTL/inlined blob index");
valid_ = false;
return;
}
if (blob_index.file_number() >=
blob_garbage_collection_cutoff_file_number_) {
return;
}
const Version* const version = compaction_->input_version();
assert(version);
{
const Status s =
version->GetBlob(ReadOptions(), user_key(), blob_index, &blob_value_);
if (!s.ok()) {
status_ = s;
valid_ = false;
return;
}
}
value_ = blob_value_;
if (ExtractLargeValueIfNeededImpl()) {
return;
}
ikey_.type = kTypeValue;
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
return;
}
// GC for stacked BlobDB
if (compaction_filter_) {
const auto blob_decision = compaction_filter_->PrepareBlobOutput(
user_key(), value_, &compaction_filter_value_);
if (blob_decision == CompactionFilter::BlobDecision::kCorruption) {
status_ =
Status::Corruption("Corrupted blob reference encountered during GC");
valid_ = false;
return;
}
if (blob_decision == CompactionFilter::BlobDecision::kIOError) {
status_ = Status::IOError("Could not relocate blob during GC");
valid_ = false;
return;
}
if (blob_decision == CompactionFilter::BlobDecision::kChangeValue) {
value_ = compaction_filter_value_;
return;
}
}
}
void CompactionIterator::PrepareOutput() {
if (valid_) {
if (ikey_.type == kTypeValue) {
if (blob_file_builder_) {
blob_index_.clear();
const Status s =
blob_file_builder_->Add(user_key(), value_, &blob_index_);
if (!s.ok()) {
status_ = s;
valid_ = false;
} else if (!blob_index_.empty()) {
value_ = blob_index_;
ikey_.type = kTypeBlobIndex;
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
}
}
ExtractLargeValueIfNeeded();
} else if (ikey_.type == kTypeBlobIndex) {
if (compaction_filter_) {
const auto blob_decision = compaction_filter_->PrepareBlobOutput(
user_key(), value_, &compaction_filter_value_);
if (blob_decision == CompactionFilter::BlobDecision::kCorruption) {
status_ = Status::Corruption(
"Corrupted blob reference encountered during GC");
valid_ = false;
} else if (blob_decision == CompactionFilter::BlobDecision::kIOError) {
status_ = Status::IOError("Could not relocate blob during GC");
valid_ = false;
} else if (blob_decision ==
CompactionFilter::BlobDecision::kChangeValue) {
value_ = compaction_filter_value_;
}
}
GarbageCollectBlobIfNeeded();
}
// Zeroing out the sequence number leads to better compression.
@ -894,4 +994,30 @@ bool CompactionIterator::IsInEarliestSnapshot(SequenceNumber sequence) {
return in_snapshot == SnapshotCheckerResult::kInSnapshot;
}
uint64_t CompactionIterator::ComputeBlobGarbageCollectionCutoffFileNumber(
const CompactionProxy* compaction) {
if (!compaction) {
return 0;
}
if (!compaction->enable_blob_garbage_collection()) {
return 0;
}
Version* const version = compaction->input_version();
assert(version);
const VersionStorageInfo* const storage_info = version->storage_info();
assert(storage_info);
const auto& blob_files = storage_info->GetBlobFiles();
auto it = blob_files.begin();
std::advance(
it, compaction->blob_garbage_collection_age_cutoff() * blob_files.size());
return it != blob_files.end() ? it->first
: std::numeric_limits<uint64_t>::max();
}
} // namespace ROCKSDB_NAMESPACE

@ -5,6 +5,7 @@
#pragma once
#include <algorithm>
#include <cinttypes>
#include <deque>
#include <string>
#include <unordered_set>
@ -45,6 +46,12 @@ class CompactionIterator {
virtual bool allow_ingest_behind() const = 0;
virtual bool preserve_deletes() const = 0;
virtual bool enable_blob_garbage_collection() const = 0;
virtual double blob_garbage_collection_age_cutoff() const = 0;
virtual Version* input_version() const = 0;
};
class RealCompaction : public CompactionProxy {
@ -53,6 +60,7 @@ class CompactionIterator {
: compaction_(compaction) {
assert(compaction_);
assert(compaction_->immutable_cf_options());
assert(compaction_->mutable_cf_options());
}
int level() const override { return compaction_->level(); }
@ -80,6 +88,19 @@ class CompactionIterator {
return compaction_->immutable_cf_options()->preserve_deletes;
}
bool enable_blob_garbage_collection() const override {
return compaction_->mutable_cf_options()->enable_blob_garbage_collection;
}
double blob_garbage_collection_age_cutoff() const override {
return compaction_->mutable_cf_options()
->blob_garbage_collection_age_cutoff;
}
Version* input_version() const override {
return compaction_->input_version();
}
private:
const Compaction* compaction_;
};
@ -146,11 +167,30 @@ class CompactionIterator {
// Processes the input stream to find the next output
void NextFromInput();
// Do last preparations before presenting the output to the callee. At this
// point this only zeroes out the sequence number if possible for better
// compression.
// Do final preparations before presenting the output to the callee.
void PrepareOutput();
// Passes the output value to the blob file builder (if any), and replaces it
// with the corresponding blob reference if it has been actually written to a
// blob file (i.e. if it passed the value size check). Returns true if the
// value got extracted to a blob file, false otherwise.
bool ExtractLargeValueIfNeededImpl();
// Extracts large values as described above, and updates the internal key's
// type to kTypeBlobIndex if the value got extracted. Should only be called
// for regular values (kTypeValue).
void ExtractLargeValueIfNeeded();
// Relocates valid blobs residing in the oldest blob files if garbage
// collection is enabled. Relocated blobs are written to new blob files or
// inlined in the LSM tree depending on the current settings (i.e.
// enable_blob_files and min_blob_size). Should only be called for blob
// references (kTypeBlobIndex).
//
// Note: the stacked BlobDB implementation's compaction filter based GC
// algorithm is also called from here.
void GarbageCollectBlobIfNeeded();
// Invoke compaction filter if needed.
// Return true on success, false on failures (e.g.: kIOError).
bool InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until);
@ -191,6 +231,9 @@ class CompactionIterator {
}
}
static uint64_t ComputeBlobGarbageCollectionCutoffFileNumber(
const CompactionProxy* compaction);
InternalIterator* input_;
const Comparator* cmp_;
MergeHelper* merge_helper_;
@ -273,7 +316,11 @@ class CompactionIterator {
// PinnedIteratorsManager used to pin input_ Iterator blocks while reading
// merge operands and then releasing them after consuming them.
PinnedIteratorsManager pinned_iters_mgr_;
uint64_t blob_garbage_collection_cutoff_file_number_;
std::string blob_index_;
PinnableSlice blob_value_;
std::string compaction_filter_value_;
InternalKey compaction_filter_skip_until_;
// "level_ptrs" holds indices that remember which file of an associated

@ -176,6 +176,12 @@ class FakeCompaction : public CompactionIterator::CompactionProxy {
bool preserve_deletes() const override { return false; }
bool enable_blob_garbage_collection() const override { return false; }
double blob_garbage_collection_age_cutoff() const override { return 0.0; }
Version* input_version() const override { return nullptr; }
bool key_not_exists_beyond_output_level = false;
bool is_bottommost_level = false;

@ -7,6 +7,9 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <tuple>
#include "db/blob/blob_index.h"
#include "db/db_test_util.h"
#include "port/port.h"
#include "port/stack_trace.h"
@ -28,6 +31,31 @@ class DBCompactionTest : public DBTestBase {
public:
DBCompactionTest()
: DBTestBase("/db_compaction_test", /*env_do_fsync=*/true) {}
std::vector<uint64_t> GetBlobFileNumbers() {
VersionSet* const versions = dbfull()->TEST_GetVersionSet();
assert(versions);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
assert(cfd);
Version* const current = cfd->current();
assert(current);
const VersionStorageInfo* const storage_info = current->storage_info();
assert(storage_info);
const auto& blob_files = storage_info->GetBlobFiles();
std::vector<uint64_t> result;
result.reserve(blob_files.size());
for (const auto& blob_file : blob_files) {
result.emplace_back(blob_file.first);
}
return result;
}
};
class DBCompactionTestWithParam
@ -6020,6 +6048,233 @@ TEST_P(DBCompactionTestBlobError, CompactionError) {
}
}
class DBCompactionTestBlobGC
: public DBCompactionTest,
public testing::WithParamInterface<std::tuple<double, bool>> {
public:
DBCompactionTestBlobGC()
: blob_gc_age_cutoff_(std::get<0>(GetParam())),
updated_enable_blob_files_(std::get<1>(GetParam())) {}
double blob_gc_age_cutoff_;
bool updated_enable_blob_files_;
};
INSTANTIATE_TEST_CASE_P(DBCompactionTestBlobGC, DBCompactionTestBlobGC,
::testing::Combine(::testing::Values(0.0, 0.5, 1.0),
::testing::Bool()));
TEST_P(DBCompactionTestBlobGC, CompactionWithBlobGC) {
Options options;
options.env = env_;
options.disable_auto_compactions = true;
options.enable_blob_files = true;
options.blob_file_size = 32; // one blob per file
options.enable_blob_garbage_collection = true;
options.blob_garbage_collection_age_cutoff = blob_gc_age_cutoff_;
Reopen(options);
constexpr char first_key[] = "first_key";
constexpr char first_value[] = "first_value";
constexpr char second_key[] = "second_key";
constexpr char second_value[] = "second_value";
ASSERT_OK(Put(first_key, first_value));
ASSERT_OK(Put(second_key, second_value));
ASSERT_OK(Flush());
constexpr char third_key[] = "third_key";
constexpr char third_value[] = "third_value";
constexpr char fourth_key[] = "fourth_key";
constexpr char fourth_value[] = "fourth_value";
ASSERT_OK(Put(third_key, third_value));
ASSERT_OK(Put(fourth_key, fourth_value));
ASSERT_OK(Flush());
const std::vector<uint64_t> original_blob_files = GetBlobFileNumbers();
ASSERT_EQ(original_blob_files.size(), 4);
const size_t cutoff_index = static_cast<size_t>(
options.blob_garbage_collection_age_cutoff * original_blob_files.size());
// Note: turning off enable_blob_files before the compaction results in
// garbage collected values getting inlined.
size_t expected_number_of_files = original_blob_files.size();
if (!updated_enable_blob_files_) {
ASSERT_OK(db_->SetOptions({{"enable_blob_files", "false"}}));
expected_number_of_files -= cutoff_index;
}
constexpr Slice* begin = nullptr;
constexpr Slice* end = nullptr;
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
ASSERT_EQ(Get(first_key), first_value);
ASSERT_EQ(Get(second_key), second_value);
ASSERT_EQ(Get(third_key), third_value);
ASSERT_EQ(Get(fourth_key), fourth_value);
const std::vector<uint64_t> new_blob_files = GetBlobFileNumbers();
ASSERT_EQ(new_blob_files.size(), expected_number_of_files);
// Original blob files below the cutoff should be gone, original blob files at
// or above the cutoff should be still there
for (size_t i = cutoff_index; i < original_blob_files.size(); ++i) {
ASSERT_EQ(new_blob_files[i - cutoff_index], original_blob_files[i]);
}
}
TEST_F(DBCompactionTest, CompactionWithBlobGCError_CorruptIndex) {
Options options;
options.env = env_;
options.disable_auto_compactions = true;
options.enable_blob_files = true;
options.enable_blob_garbage_collection = true;
options.blob_garbage_collection_age_cutoff = 1.0;
Reopen(options);
constexpr char first_key[] = "first_key";
constexpr char first_value[] = "first_value";
ASSERT_OK(Put(first_key, first_value));
constexpr char second_key[] = "second_key";
constexpr char second_value[] = "second_value";
ASSERT_OK(Put(second_key, second_value));
ASSERT_OK(Flush());
constexpr char third_key[] = "third_key";
constexpr char third_value[] = "third_value";
ASSERT_OK(Put(third_key, third_value));
constexpr char fourth_key[] = "fourth_key";
constexpr char corrupt_blob_index[] = "foobar";
WriteBatch batch;
ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, fourth_key,
corrupt_blob_index));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
constexpr Slice* begin = nullptr;
constexpr Slice* end = nullptr;
ASSERT_TRUE(
db_->CompactRange(CompactRangeOptions(), begin, end).IsCorruption());
}
TEST_F(DBCompactionTest, CompactionWithBlobGCError_InlinedTTLIndex) {
constexpr uint64_t min_blob_size = 10;
Options options;
options.env = env_;
options.disable_auto_compactions = true;
options.enable_blob_files = true;
options.min_blob_size = min_blob_size;
options.enable_blob_garbage_collection = true;
options.blob_garbage_collection_age_cutoff = 1.0;
Reopen(options);
constexpr char first_key[] = "first_key";
constexpr char first_value[] = "first_value";
ASSERT_OK(Put(first_key, first_value));
constexpr char second_key[] = "second_key";
constexpr char second_value[] = "second_value";
ASSERT_OK(Put(second_key, second_value));
ASSERT_OK(Flush());
constexpr char third_key[] = "third_key";
constexpr char third_value[] = "third_value";
ASSERT_OK(Put(third_key, third_value));
constexpr char fourth_key[] = "fourth_key";
constexpr char blob[] = "short";
static_assert(sizeof(short) - 1 < min_blob_size,
"Blob too long to be inlined");
// Fake an inlined TTL blob index.
std::string blob_index;
constexpr uint64_t expiration = 1234567890;
BlobIndex::EncodeInlinedTTL(&blob_index, expiration, blob);
WriteBatch batch;
ASSERT_OK(
WriteBatchInternal::PutBlobIndex(&batch, 0, fourth_key, blob_index));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
constexpr Slice* begin = nullptr;
constexpr Slice* end = nullptr;
ASSERT_TRUE(
db_->CompactRange(CompactRangeOptions(), begin, end).IsCorruption());
}
TEST_F(DBCompactionTest, CompactionWithBlobGCError_IndexWithInvalidFileNumber) {
Options options;
options.env = env_;
options.disable_auto_compactions = true;
options.enable_blob_files = true;
options.enable_blob_garbage_collection = true;
options.blob_garbage_collection_age_cutoff = 1.0;
Reopen(options);
constexpr char first_key[] = "first_key";
constexpr char first_value[] = "first_value";
ASSERT_OK(Put(first_key, first_value));
constexpr char second_key[] = "second_key";
constexpr char second_value[] = "second_value";
ASSERT_OK(Put(second_key, second_value));
ASSERT_OK(Flush());
constexpr char third_key[] = "third_key";
constexpr char third_value[] = "third_value";
ASSERT_OK(Put(third_key, third_value));
constexpr char fourth_key[] = "fourth_key";
// Fake a blob index referencing a non-existent blob file.
std::string blob_index;
constexpr uint64_t blob_file_number = 1000;
constexpr uint64_t offset = 1234;
constexpr uint64_t size = 5678;
BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size,
kNoCompression);
WriteBatch batch;
ASSERT_OK(
WriteBatchInternal::PutBlobIndex(&batch, 0, fourth_key, blob_index));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
constexpr Slice* begin = nullptr;
constexpr Slice* end = nullptr;
ASSERT_TRUE(
db_->CompactRange(CompactRangeOptions(), begin, end).IsCorruption());
}
#endif // !defined(ROCKSDB_LITE)
} // namespace ROCKSDB_NAMESPACE

@ -1807,6 +1807,12 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
}
}
return GetBlob(read_options, user_key, blob_index, value);
}
Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
const BlobIndex& blob_index,
PinnableSlice* value) const {
if (blob_index.HasTTL() || blob_index.IsInlined()) {
return Status::Corruption("Unexpected TTL/inlined blob index");
}

@ -60,6 +60,7 @@ namespace log {
class Writer;
}
class BlobIndex;
class Compaction;
class LogBuffer;
class LookupKey;
@ -683,6 +684,18 @@ class Version {
void MultiGet(const ReadOptions&, MultiGetRange* range,
ReadCallback* callback = nullptr, bool* is_blob = nullptr);
// Interprets *value as a blob reference, and (assuming the corresponding
// blob file is part of this Version) retrieves the blob and saves it in
// *value, replacing the blob reference.
// REQUIRES: *value stores an encoded blob reference
Status GetBlob(const ReadOptions& read_options, const Slice& user_key,
PinnableSlice* value) const;
// Retrieves a blob using a blob reference and saves it in *value, assuming
// the corresponding blob file is part of this Version.
Status GetBlob(const ReadOptions& read_options, const Slice& user_key,
const BlobIndex& blob_index, PinnableSlice* value) const;
// Loads some stats information from files. Call without mutex held. It needs
// to be called before applying the version to the version set.
void PrepareApply(const MutableCFOptions& mutable_cf_options,
@ -778,13 +791,6 @@ class Version {
return storage_info_.user_comparator_;
}
// Interprets *value as a blob reference, and (assuming the corresponding
// blob file is part of this Version) retrieves the blob and saves it in
// *value, replacing the blob reference.
// REQUIRES: *value stores an encoded blob reference
Status GetBlob(const ReadOptions& read_options, const Slice& user_key,
PinnableSlice* value) const;
// Returns true if the filter blocks in the specified level will not be
// checked during read operations. In certain cases (trivial move or preload),
// the filter block may already be cached, but we still do not access it such

Loading…
Cancel
Save