diff --git a/db/column_family.cc b/db/column_family.cc index b4e9b82d3..ecbb0e168 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -1339,6 +1339,15 @@ Status ColumnFamilyData::ValidateOptions( "Block-Based Table format. "); } } + + if (cf_options.enable_blob_garbage_collection && + (cf_options.blob_garbage_collection_age_cutoff < 0.0 || + cf_options.blob_garbage_collection_age_cutoff > 1.0)) { + return Status::InvalidArgument( + "The age cutoff for blob garbage collection should be in the range " + "[0.0, 1.0]."); + } + return s; } diff --git a/db/column_family_test.cc b/db/column_family_test.cc index ea55dbedd..a6f6cd393 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -3391,6 +3391,30 @@ TEST_P(ColumnFamilyTest, MultipleCFPathsTest) { } } +TEST(ColumnFamilyTest, ValidateBlobGCCutoff) { + DBOptions db_options; + + ColumnFamilyOptions cf_options; + cf_options.enable_blob_garbage_collection = true; + + cf_options.blob_garbage_collection_age_cutoff = -0.5; + ASSERT_TRUE(ColumnFamilyData::ValidateOptions(db_options, cf_options) + .IsInvalidArgument()); + + cf_options.blob_garbage_collection_age_cutoff = 0.0; + ASSERT_OK(ColumnFamilyData::ValidateOptions(db_options, cf_options)); + + cf_options.blob_garbage_collection_age_cutoff = 0.5; + ASSERT_OK(ColumnFamilyData::ValidateOptions(db_options, cf_options)); + + cf_options.blob_garbage_collection_age_cutoff = 1.0; + ASSERT_OK(ColumnFamilyData::ValidateOptions(db_options, cf_options)); + + cf_options.blob_garbage_collection_age_cutoff = 1.5; + ASSERT_TRUE(ColumnFamilyData::ValidateOptions(db_options, cf_options) + .IsInvalidArgument()); +} + } // namespace ROCKSDB_NAMESPACE #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 8ee5841ed..720cdbb65 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -5,9 +5,11 @@ #include "db/compaction/compaction_iterator.h" -#include +#include +#include #include "db/blob/blob_file_builder.h" +#include "db/blob/blob_index.h" #include "db/snapshot_checker.h" #include "port/likely.h" #include "rocksdb/listener.h" @@ -94,6 +96,8 @@ CompactionIterator::CompactionIterator( current_user_key_sequence_(0), current_user_key_snapshot_(0), merge_out_iter_(merge_helper_), + blob_garbage_collection_cutoff_file_number_( + ComputeBlobGarbageCollectionCutoffFileNumber(compaction_.get())), current_key_committed_(false), cmp_with_history_ts_low_(0) { assert(compaction_filter_ == nullptr || compaction_ != nullptr); @@ -733,40 +737,136 @@ void CompactionIterator::NextFromInput() { } } +bool CompactionIterator::ExtractLargeValueIfNeededImpl() { + if (!blob_file_builder_) { + return false; + } + + blob_index_.clear(); + const Status s = blob_file_builder_->Add(user_key(), value_, &blob_index_); + + if (!s.ok()) { + status_ = s; + valid_ = false; + + return false; + } + + if (blob_index_.empty()) { + return false; + } + + value_ = blob_index_; + + return true; +} + +void CompactionIterator::ExtractLargeValueIfNeeded() { + assert(ikey_.type == kTypeValue); + + if (!ExtractLargeValueIfNeededImpl()) { + return; + } + + ikey_.type = kTypeBlobIndex; + current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); +} + +void CompactionIterator::GarbageCollectBlobIfNeeded() { + assert(ikey_.type == kTypeBlobIndex); + + if (!compaction_) { + return; + } + + // GC for integrated BlobDB + if (compaction_->enable_blob_garbage_collection()) { + BlobIndex blob_index; + + { + const Status s = blob_index.DecodeFrom(value_); + + if (!s.ok()) { + status_ = s; + valid_ = false; + + return; + } + } + + if (blob_index.IsInlined() || blob_index.HasTTL()) { + status_ = Status::Corruption("Unexpected TTL/inlined blob index"); + valid_ = false; + + return; + } + + if (blob_index.file_number() >= + blob_garbage_collection_cutoff_file_number_) { + return; + } + + const Version* const version = compaction_->input_version(); + assert(version); + + { + const Status s = + version->GetBlob(ReadOptions(), user_key(), blob_index, &blob_value_); + + if (!s.ok()) { + status_ = s; + valid_ = false; + + return; + } + } + + value_ = blob_value_; + + if (ExtractLargeValueIfNeededImpl()) { + return; + } + + ikey_.type = kTypeValue; + current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); + + return; + } + + // GC for stacked BlobDB + if (compaction_filter_) { + const auto blob_decision = compaction_filter_->PrepareBlobOutput( + user_key(), value_, &compaction_filter_value_); + + if (blob_decision == CompactionFilter::BlobDecision::kCorruption) { + status_ = + Status::Corruption("Corrupted blob reference encountered during GC"); + valid_ = false; + + return; + } + + if (blob_decision == CompactionFilter::BlobDecision::kIOError) { + status_ = Status::IOError("Could not relocate blob during GC"); + valid_ = false; + + return; + } + + if (blob_decision == CompactionFilter::BlobDecision::kChangeValue) { + value_ = compaction_filter_value_; + + return; + } + } +} + void CompactionIterator::PrepareOutput() { if (valid_) { if (ikey_.type == kTypeValue) { - if (blob_file_builder_) { - blob_index_.clear(); - const Status s = - blob_file_builder_->Add(user_key(), value_, &blob_index_); - - if (!s.ok()) { - status_ = s; - valid_ = false; - } else if (!blob_index_.empty()) { - value_ = blob_index_; - ikey_.type = kTypeBlobIndex; - current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); - } - } + ExtractLargeValueIfNeeded(); } else if (ikey_.type == kTypeBlobIndex) { - if (compaction_filter_) { - const auto blob_decision = compaction_filter_->PrepareBlobOutput( - user_key(), value_, &compaction_filter_value_); - - if (blob_decision == CompactionFilter::BlobDecision::kCorruption) { - status_ = Status::Corruption( - "Corrupted blob reference encountered during GC"); - valid_ = false; - } else if (blob_decision == CompactionFilter::BlobDecision::kIOError) { - status_ = Status::IOError("Could not relocate blob during GC"); - valid_ = false; - } else if (blob_decision == - CompactionFilter::BlobDecision::kChangeValue) { - value_ = compaction_filter_value_; - } - } + GarbageCollectBlobIfNeeded(); } // Zeroing out the sequence number leads to better compression. @@ -894,4 +994,30 @@ bool CompactionIterator::IsInEarliestSnapshot(SequenceNumber sequence) { return in_snapshot == SnapshotCheckerResult::kInSnapshot; } +uint64_t CompactionIterator::ComputeBlobGarbageCollectionCutoffFileNumber( + const CompactionProxy* compaction) { + if (!compaction) { + return 0; + } + + if (!compaction->enable_blob_garbage_collection()) { + return 0; + } + + Version* const version = compaction->input_version(); + assert(version); + + const VersionStorageInfo* const storage_info = version->storage_info(); + assert(storage_info); + + const auto& blob_files = storage_info->GetBlobFiles(); + + auto it = blob_files.begin(); + std::advance( + it, compaction->blob_garbage_collection_age_cutoff() * blob_files.size()); + + return it != blob_files.end() ? it->first + : std::numeric_limits::max(); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index 5088038b6..a669b16cb 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -5,6 +5,7 @@ #pragma once #include +#include #include #include #include @@ -45,6 +46,12 @@ class CompactionIterator { virtual bool allow_ingest_behind() const = 0; virtual bool preserve_deletes() const = 0; + + virtual bool enable_blob_garbage_collection() const = 0; + + virtual double blob_garbage_collection_age_cutoff() const = 0; + + virtual Version* input_version() const = 0; }; class RealCompaction : public CompactionProxy { @@ -53,6 +60,7 @@ class CompactionIterator { : compaction_(compaction) { assert(compaction_); assert(compaction_->immutable_cf_options()); + assert(compaction_->mutable_cf_options()); } int level() const override { return compaction_->level(); } @@ -80,6 +88,19 @@ class CompactionIterator { return compaction_->immutable_cf_options()->preserve_deletes; } + bool enable_blob_garbage_collection() const override { + return compaction_->mutable_cf_options()->enable_blob_garbage_collection; + } + + double blob_garbage_collection_age_cutoff() const override { + return compaction_->mutable_cf_options() + ->blob_garbage_collection_age_cutoff; + } + + Version* input_version() const override { + return compaction_->input_version(); + } + private: const Compaction* compaction_; }; @@ -146,11 +167,30 @@ class CompactionIterator { // Processes the input stream to find the next output void NextFromInput(); - // Do last preparations before presenting the output to the callee. At this - // point this only zeroes out the sequence number if possible for better - // compression. + // Do final preparations before presenting the output to the callee. void PrepareOutput(); + // Passes the output value to the blob file builder (if any), and replaces it + // with the corresponding blob reference if it has been actually written to a + // blob file (i.e. if it passed the value size check). Returns true if the + // value got extracted to a blob file, false otherwise. + bool ExtractLargeValueIfNeededImpl(); + + // Extracts large values as described above, and updates the internal key's + // type to kTypeBlobIndex if the value got extracted. Should only be called + // for regular values (kTypeValue). + void ExtractLargeValueIfNeeded(); + + // Relocates valid blobs residing in the oldest blob files if garbage + // collection is enabled. Relocated blobs are written to new blob files or + // inlined in the LSM tree depending on the current settings (i.e. + // enable_blob_files and min_blob_size). Should only be called for blob + // references (kTypeBlobIndex). + // + // Note: the stacked BlobDB implementation's compaction filter based GC + // algorithm is also called from here. + void GarbageCollectBlobIfNeeded(); + // Invoke compaction filter if needed. // Return true on success, false on failures (e.g.: kIOError). bool InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until); @@ -191,6 +231,9 @@ class CompactionIterator { } } + static uint64_t ComputeBlobGarbageCollectionCutoffFileNumber( + const CompactionProxy* compaction); + InternalIterator* input_; const Comparator* cmp_; MergeHelper* merge_helper_; @@ -273,7 +316,11 @@ class CompactionIterator { // PinnedIteratorsManager used to pin input_ Iterator blocks while reading // merge operands and then releasing them after consuming them. PinnedIteratorsManager pinned_iters_mgr_; + + uint64_t blob_garbage_collection_cutoff_file_number_; + std::string blob_index_; + PinnableSlice blob_value_; std::string compaction_filter_value_; InternalKey compaction_filter_skip_until_; // "level_ptrs" holds indices that remember which file of an associated diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc index 0885da5c5..acd3b6600 100644 --- a/db/compaction/compaction_iterator_test.cc +++ b/db/compaction/compaction_iterator_test.cc @@ -176,6 +176,12 @@ class FakeCompaction : public CompactionIterator::CompactionProxy { bool preserve_deletes() const override { return false; } + bool enable_blob_garbage_collection() const override { return false; } + + double blob_garbage_collection_age_cutoff() const override { return 0.0; } + + Version* input_version() const override { return nullptr; } + bool key_not_exists_beyond_output_level = false; bool is_bottommost_level = false; diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 3d24945eb..725811c56 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -7,6 +7,9 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include + +#include "db/blob/blob_index.h" #include "db/db_test_util.h" #include "port/port.h" #include "port/stack_trace.h" @@ -28,6 +31,31 @@ class DBCompactionTest : public DBTestBase { public: DBCompactionTest() : DBTestBase("/db_compaction_test", /*env_do_fsync=*/true) {} + + std::vector GetBlobFileNumbers() { + VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + assert(versions); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + Version* const current = cfd->current(); + assert(current); + + const VersionStorageInfo* const storage_info = current->storage_info(); + assert(storage_info); + + const auto& blob_files = storage_info->GetBlobFiles(); + + std::vector result; + result.reserve(blob_files.size()); + + for (const auto& blob_file : blob_files) { + result.emplace_back(blob_file.first); + } + + return result; + } }; class DBCompactionTestWithParam @@ -6020,6 +6048,233 @@ TEST_P(DBCompactionTestBlobError, CompactionError) { } } +class DBCompactionTestBlobGC + : public DBCompactionTest, + public testing::WithParamInterface> { + public: + DBCompactionTestBlobGC() + : blob_gc_age_cutoff_(std::get<0>(GetParam())), + updated_enable_blob_files_(std::get<1>(GetParam())) {} + + double blob_gc_age_cutoff_; + bool updated_enable_blob_files_; +}; + +INSTANTIATE_TEST_CASE_P(DBCompactionTestBlobGC, DBCompactionTestBlobGC, + ::testing::Combine(::testing::Values(0.0, 0.5, 1.0), + ::testing::Bool())); + +TEST_P(DBCompactionTestBlobGC, CompactionWithBlobGC) { + Options options; + options.env = env_; + options.disable_auto_compactions = true; + options.enable_blob_files = true; + options.blob_file_size = 32; // one blob per file + options.enable_blob_garbage_collection = true; + options.blob_garbage_collection_age_cutoff = blob_gc_age_cutoff_; + + Reopen(options); + + constexpr char first_key[] = "first_key"; + constexpr char first_value[] = "first_value"; + constexpr char second_key[] = "second_key"; + constexpr char second_value[] = "second_value"; + + ASSERT_OK(Put(first_key, first_value)); + ASSERT_OK(Put(second_key, second_value)); + ASSERT_OK(Flush()); + + constexpr char third_key[] = "third_key"; + constexpr char third_value[] = "third_value"; + constexpr char fourth_key[] = "fourth_key"; + constexpr char fourth_value[] = "fourth_value"; + + ASSERT_OK(Put(third_key, third_value)); + ASSERT_OK(Put(fourth_key, fourth_value)); + ASSERT_OK(Flush()); + + const std::vector original_blob_files = GetBlobFileNumbers(); + + ASSERT_EQ(original_blob_files.size(), 4); + + const size_t cutoff_index = static_cast( + options.blob_garbage_collection_age_cutoff * original_blob_files.size()); + + // Note: turning off enable_blob_files before the compaction results in + // garbage collected values getting inlined. + size_t expected_number_of_files = original_blob_files.size(); + + if (!updated_enable_blob_files_) { + ASSERT_OK(db_->SetOptions({{"enable_blob_files", "false"}})); + + expected_number_of_files -= cutoff_index; + } + + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); + + ASSERT_EQ(Get(first_key), first_value); + ASSERT_EQ(Get(second_key), second_value); + ASSERT_EQ(Get(third_key), third_value); + ASSERT_EQ(Get(fourth_key), fourth_value); + + const std::vector new_blob_files = GetBlobFileNumbers(); + + ASSERT_EQ(new_blob_files.size(), expected_number_of_files); + + // Original blob files below the cutoff should be gone, original blob files at + // or above the cutoff should be still there + for (size_t i = cutoff_index; i < original_blob_files.size(); ++i) { + ASSERT_EQ(new_blob_files[i - cutoff_index], original_blob_files[i]); + } +} + +TEST_F(DBCompactionTest, CompactionWithBlobGCError_CorruptIndex) { + Options options; + options.env = env_; + options.disable_auto_compactions = true; + options.enable_blob_files = true; + options.enable_blob_garbage_collection = true; + options.blob_garbage_collection_age_cutoff = 1.0; + + Reopen(options); + + constexpr char first_key[] = "first_key"; + constexpr char first_value[] = "first_value"; + ASSERT_OK(Put(first_key, first_value)); + + constexpr char second_key[] = "second_key"; + constexpr char second_value[] = "second_value"; + ASSERT_OK(Put(second_key, second_value)); + + ASSERT_OK(Flush()); + + constexpr char third_key[] = "third_key"; + constexpr char third_value[] = "third_value"; + ASSERT_OK(Put(third_key, third_value)); + + constexpr char fourth_key[] = "fourth_key"; + constexpr char corrupt_blob_index[] = "foobar"; + + WriteBatch batch; + ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, fourth_key, + corrupt_blob_index)); + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + + ASSERT_OK(Flush()); + + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + + ASSERT_TRUE( + db_->CompactRange(CompactRangeOptions(), begin, end).IsCorruption()); +} + +TEST_F(DBCompactionTest, CompactionWithBlobGCError_InlinedTTLIndex) { + constexpr uint64_t min_blob_size = 10; + + Options options; + options.env = env_; + options.disable_auto_compactions = true; + options.enable_blob_files = true; + options.min_blob_size = min_blob_size; + options.enable_blob_garbage_collection = true; + options.blob_garbage_collection_age_cutoff = 1.0; + + Reopen(options); + + constexpr char first_key[] = "first_key"; + constexpr char first_value[] = "first_value"; + ASSERT_OK(Put(first_key, first_value)); + + constexpr char second_key[] = "second_key"; + constexpr char second_value[] = "second_value"; + ASSERT_OK(Put(second_key, second_value)); + + ASSERT_OK(Flush()); + + constexpr char third_key[] = "third_key"; + constexpr char third_value[] = "third_value"; + ASSERT_OK(Put(third_key, third_value)); + + constexpr char fourth_key[] = "fourth_key"; + constexpr char blob[] = "short"; + static_assert(sizeof(short) - 1 < min_blob_size, + "Blob too long to be inlined"); + + // Fake an inlined TTL blob index. + std::string blob_index; + + constexpr uint64_t expiration = 1234567890; + + BlobIndex::EncodeInlinedTTL(&blob_index, expiration, blob); + + WriteBatch batch; + ASSERT_OK( + WriteBatchInternal::PutBlobIndex(&batch, 0, fourth_key, blob_index)); + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + + ASSERT_OK(Flush()); + + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + + ASSERT_TRUE( + db_->CompactRange(CompactRangeOptions(), begin, end).IsCorruption()); +} + +TEST_F(DBCompactionTest, CompactionWithBlobGCError_IndexWithInvalidFileNumber) { + Options options; + options.env = env_; + options.disable_auto_compactions = true; + options.enable_blob_files = true; + options.enable_blob_garbage_collection = true; + options.blob_garbage_collection_age_cutoff = 1.0; + + Reopen(options); + + constexpr char first_key[] = "first_key"; + constexpr char first_value[] = "first_value"; + ASSERT_OK(Put(first_key, first_value)); + + constexpr char second_key[] = "second_key"; + constexpr char second_value[] = "second_value"; + ASSERT_OK(Put(second_key, second_value)); + + ASSERT_OK(Flush()); + + constexpr char third_key[] = "third_key"; + constexpr char third_value[] = "third_value"; + ASSERT_OK(Put(third_key, third_value)); + + constexpr char fourth_key[] = "fourth_key"; + + // Fake a blob index referencing a non-existent blob file. + std::string blob_index; + + constexpr uint64_t blob_file_number = 1000; + constexpr uint64_t offset = 1234; + constexpr uint64_t size = 5678; + + BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size, + kNoCompression); + + WriteBatch batch; + ASSERT_OK( + WriteBatchInternal::PutBlobIndex(&batch, 0, fourth_key, blob_index)); + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + + ASSERT_OK(Flush()); + + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + + ASSERT_TRUE( + db_->CompactRange(CompactRangeOptions(), begin, end).IsCorruption()); +} + #endif // !defined(ROCKSDB_LITE) } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_set.cc b/db/version_set.cc index cffc5979d..3c5bd5557 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1807,6 +1807,12 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key, } } + return GetBlob(read_options, user_key, blob_index, value); +} + +Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key, + const BlobIndex& blob_index, + PinnableSlice* value) const { if (blob_index.HasTTL() || blob_index.IsInlined()) { return Status::Corruption("Unexpected TTL/inlined blob index"); } diff --git a/db/version_set.h b/db/version_set.h index 86eeea614..1b3287f5e 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -60,6 +60,7 @@ namespace log { class Writer; } +class BlobIndex; class Compaction; class LogBuffer; class LookupKey; @@ -683,6 +684,18 @@ class Version { void MultiGet(const ReadOptions&, MultiGetRange* range, ReadCallback* callback = nullptr, bool* is_blob = nullptr); + // Interprets *value as a blob reference, and (assuming the corresponding + // blob file is part of this Version) retrieves the blob and saves it in + // *value, replacing the blob reference. + // REQUIRES: *value stores an encoded blob reference + Status GetBlob(const ReadOptions& read_options, const Slice& user_key, + PinnableSlice* value) const; + + // Retrieves a blob using a blob reference and saves it in *value, assuming + // the corresponding blob file is part of this Version. + Status GetBlob(const ReadOptions& read_options, const Slice& user_key, + const BlobIndex& blob_index, PinnableSlice* value) const; + // Loads some stats information from files. Call without mutex held. It needs // to be called before applying the version to the version set. void PrepareApply(const MutableCFOptions& mutable_cf_options, @@ -778,13 +791,6 @@ class Version { return storage_info_.user_comparator_; } - // Interprets *value as a blob reference, and (assuming the corresponding - // blob file is part of this Version) retrieves the blob and saves it in - // *value, replacing the blob reference. - // REQUIRES: *value stores an encoded blob reference - Status GetBlob(const ReadOptions& read_options, const Slice& user_key, - PinnableSlice* value) const; - // Returns true if the filter blocks in the specified level will not be // checked during read operations. In certain cases (trivial move or preload), // the filter block may already be cached, but we still do not access it such