diff --git a/db/blob/blob_counting_iterator.h b/db/blob/blob_counting_iterator.h index b73f545eb..de549afa2 100644 --- a/db/blob/blob_counting_iterator.h +++ b/db/blob/blob_counting_iterator.h @@ -11,6 +11,7 @@ #include "rocksdb/rocksdb_namespace.h" #include "rocksdb/status.h" #include "table/internal_iterator.h" +#include "test_util/sync_point.h" namespace ROCKSDB_NAMESPACE { @@ -131,6 +132,9 @@ class BlobCountingIterator : public InternalIterator { return; } + TEST_SYNC_POINT( + "BlobCountingIterator::UpdateAndCountBlobIfNeeded:ProcessInFlow"); + status_ = blob_garbage_meter_->ProcessInFlow(key(), value()); } diff --git a/db/blob/db_blob_compaction_test.cc b/db/blob/db_blob_compaction_test.cc index 29f10f2e2..69835def5 100644 --- a/db/blob/db_blob_compaction_test.cc +++ b/db/blob/db_blob_compaction_test.cc @@ -4,6 +4,7 @@ // (found in the LICENSE.Apache file in the root directory). #include "db/blob/blob_index.h" +#include "db/blob/blob_log_format.h" #include "db/db_test_util.h" #include "port/stack_trace.h" #include "test_util/sync_point.h" @@ -152,6 +153,31 @@ class AlwaysKeepFilter : public CompactionFilter { return CompactionFilter::Decision::kKeep; } }; + +class SkipUntilFilter : public CompactionFilter { + public: + explicit SkipUntilFilter(std::string skip_until) + : skip_until_(std::move(skip_until)) {} + + const char* Name() const override { + return "rocksdb.compaction.filter.skip.until"; + } + + CompactionFilter::Decision FilterV2(int /* level */, const Slice& /* key */, + ValueType /* value_type */, + const Slice& /* existing_value */, + std::string* /* new_value */, + std::string* skip_until) const override { + assert(skip_until); + *skip_until = skip_until_; + + return CompactionFilter::Decision::kRemoveAndSkipUntil; + } + + private: + std::string skip_until_; +}; + } // anonymous namespace class DBBlobBadCompactionFilterTest @@ -254,6 +280,49 @@ TEST_F(DBBlobCompactionTest, BlindWriteFilter) { Close(); } +TEST_F(DBBlobCompactionTest, 
SkipUntilFilter) { + Options options = GetDefaultOptions(); + options.enable_blob_files = true; + + std::unique_ptr<CompactionFilter> compaction_filter_guard( + new SkipUntilFilter("z")); + options.compaction_filter = compaction_filter_guard.get(); + + Reopen(options); + + const std::vector<std::string> keys{"a", "b", "c"}; + const std::vector<std::string> values{"a_value", "b_value", "c_value"}; + assert(keys.size() == values.size()); + + for (size_t i = 0; i < keys.size(); ++i) { + ASSERT_OK(Put(keys[i], values[i])); + } + + ASSERT_OK(Flush()); + + int process_in_flow_called = 0; + + SyncPoint::GetInstance()->SetCallBack( + "BlobCountingIterator::UpdateAndCountBlobIfNeeded:ProcessInFlow", + [&process_in_flow_called](void* /* arg */) { ++process_in_flow_called; }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /* begin */ nullptr, + /* end */ nullptr)); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + for (const auto& key : keys) { + ASSERT_EQ(Get(key), "NOT_FOUND"); + } + + // Make sure SkipUntil was performed using iteration rather than Seek + ASSERT_EQ(process_in_flow_called, keys.size()); + + Close(); +} + TEST_P(DBBlobBadCompactionFilterTest, BadDecisionFromCompactionFilter) { Options options = GetDefaultOptions(); options.enable_blob_files = true; @@ -390,6 +459,112 @@ TEST_F(DBBlobCompactionTest, CompactionFilterReadBlobAndKeep) { Close(); } +TEST_F(DBBlobCompactionTest, TrackGarbage) { + Options options = GetDefaultOptions(); + options.enable_blob_files = true; + + Reopen(options); + + // First table+blob file pair: 4 blobs with different keys + constexpr char first_key[] = "first_key"; + constexpr char first_value[] = "first_value"; + constexpr char second_key[] = "second_key"; + constexpr char second_value[] = "second_value"; + constexpr char third_key[] = "third_key"; + constexpr char third_value[] = "third_value"; + constexpr char fourth_key[] = "fourth_key"; + constexpr char 
fourth_value[] = "fourth_value"; + + ASSERT_OK(Put(first_key, first_value)); + ASSERT_OK(Put(second_key, second_value)); + ASSERT_OK(Put(third_key, third_value)); + ASSERT_OK(Put(fourth_key, fourth_value)); + ASSERT_OK(Flush()); + + // Second table+blob file pair: overwrite 2 existing keys + constexpr char new_first_value[] = "new_first_value"; + constexpr char new_second_value[] = "new_second_value"; + + ASSERT_OK(Put(first_key, new_first_value)); + ASSERT_OK(Put(second_key, new_second_value)); + ASSERT_OK(Flush()); + + // Compact them together. The first blob file should have 2 garbage blobs + // corresponding to the 2 overwritten keys. + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); + + VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + assert(versions); + assert(versions->GetColumnFamilySet()); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + Version* const current = cfd->current(); + assert(current); + + const VersionStorageInfo* const storage_info = current->storage_info(); + assert(storage_info); + + const auto& blob_files = storage_info->GetBlobFiles(); + ASSERT_EQ(blob_files.size(), 2); + + { + auto it = blob_files.begin(); + const auto& meta = it->second; + assert(meta); + + constexpr uint64_t first_expected_bytes = + sizeof(first_value) - 1 + + BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(first_key) - + 1); + constexpr uint64_t second_expected_bytes = + sizeof(second_value) - 1 + + BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(second_key) - + 1); + constexpr uint64_t third_expected_bytes = + sizeof(third_value) - 1 + + BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(third_key) - + 1); + constexpr uint64_t fourth_expected_bytes = + sizeof(fourth_value) - 1 + + BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(fourth_key) - + 1); + + 
ASSERT_EQ(meta->GetTotalBlobCount(), 4); + ASSERT_EQ(meta->GetTotalBlobBytes(), + first_expected_bytes + second_expected_bytes + + third_expected_bytes + fourth_expected_bytes); + ASSERT_EQ(meta->GetGarbageBlobCount(), 2); + ASSERT_EQ(meta->GetGarbageBlobBytes(), + first_expected_bytes + second_expected_bytes); + } + + { + auto it = blob_files.rbegin(); + const auto& meta = it->second; + assert(meta); + + constexpr uint64_t new_first_expected_bytes = + sizeof(new_first_value) - 1 + + BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(first_key) - + 1); + constexpr uint64_t new_second_expected_bytes = + sizeof(new_second_value) - 1 + + BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(second_key) - + 1); + + ASSERT_EQ(meta->GetTotalBlobCount(), 2); + ASSERT_EQ(meta->GetTotalBlobBytes(), + new_first_expected_bytes + new_second_expected_bytes); + ASSERT_EQ(meta->GetGarbageBlobCount(), 0); + ASSERT_EQ(meta->GetGarbageBlobBytes(), 0); + } +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index 57f814fbc..10de6a6ea 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -581,6 +581,29 @@ bool Compaction::ShouldFormSubcompactions() const { } } +bool Compaction::DoesInputReferenceBlobFiles() const { + assert(input_version_); + + const VersionStorageInfo* storage_info = input_version_->storage_info(); + assert(storage_info); + + if (storage_info->GetBlobFiles().empty()) { + return false; + } + + for (size_t i = 0; i < inputs_.size(); ++i) { + for (const FileMetaData* meta : inputs_[i].files) { + assert(meta); + + if (meta->oldest_blob_file_number != kInvalidBlobFileNumber) { + return true; + } + } + } + + return false; +} + uint64_t Compaction::MinInputFileOldestAncesterTime() const { uint64_t min_oldest_ancester_time = port::kMaxUint64; for (const auto& level_files : inputs_) { diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h 
index eda9bf002..070657d5c 100644 --- a/db/compaction/compaction.h +++ b/db/compaction/compaction.h @@ -266,6 +266,11 @@ class Compaction { // Should this compaction be broken up into smaller ones run in parallel? bool ShouldFormSubcompactions() const; + // Returns true iff at least one input file references a blob file. + // + // PRE: input version has been set. + bool DoesInputReferenceBlobFiles() const; + // test function to validate the functionality of IsBottommostLevel() // function -- determines if compaction with inputs and storage is bottommost static bool TEST_IsBottommostLevel( diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index e48818fd0..1869020f3 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -75,10 +75,8 @@ CompactionIterator::CompactionIterator( const std::atomic<bool>* manual_compaction_canceled, const std::shared_ptr<Logger> info_log, const std::string* full_history_ts_low) - : input_( - input, cmp, - compaction == - nullptr), // Now only need to count number of entries in flush. 
+ : input_(input, cmp, + !compaction || compaction->DoesInputReferenceBlobFiles()), cmp_(cmp), merge_helper_(merge_helper), snapshots_(snapshots), diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index 65df5c444..76be6a7aa 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -97,6 +97,8 @@ class CompactionIterator { virtual double blob_garbage_collection_age_cutoff() const = 0; virtual Version* input_version() const = 0; + + virtual bool DoesInputReferenceBlobFiles() const = 0; }; class RealCompaction : public CompactionProxy { @@ -146,6 +148,10 @@ class CompactionIterator { return compaction_->input_version(); } + + bool DoesInputReferenceBlobFiles() const override { + return compaction_->DoesInputReferenceBlobFiles(); + } + private: const Compaction* compaction_; }; diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc index fef7b5417..168fb45b1 100644 --- a/db/compaction/compaction_iterator_test.cc +++ b/db/compaction/compaction_iterator_test.cc @@ -182,6 +182,8 @@ class FakeCompaction : public CompactionIterator::CompactionProxy { Version* input_version() const override { return nullptr; } + bool DoesInputReferenceBlobFiles() const override { return false; } + bool key_not_exists_beyond_output_level = false; bool is_bottommost_level = false; diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 6e30d9a65..0a43aa0b4 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -20,8 +20,10 @@ #include <utility> #include <vector> +#include "db/blob/blob_counting_iterator.h" #include "db/blob/blob_file_addition.h" #include "db/blob/blob_file_builder.h" +#include "db/blob/blob_garbage_meter.h" #include "db/builder.h" #include "db/compaction/clipping_iterator.h" #include "db/db_impl/db_impl.h" @@ -147,6 +149,7 @@ struct CompactionJob::SubcompactionState { // State kept for output being generated std::vector<Output> 
outputs; std::vector<BlobFileAddition> blob_file_additions; + std::unique_ptr<BlobGarbageMeter> blob_garbage_meter; std::unique_ptr<WritableFileWriter> outfile; std::unique_ptr<TableBuilder> builder; @@ -229,6 +232,14 @@ return false; } + + Status ProcessOutFlowIfNeeded(const Slice& key, const Slice& value) { + if (!blob_garbage_meter) { + return Status::OK(); + } + + return blob_garbage_meter->ProcessOutFlow(key, value); + } }; // Maintains state for the entire compaction @@ -1136,6 +1147,15 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { input = clip.get(); } + std::unique_ptr<BlobCountingIterator> blob_counter; + + if (sub_compact->compaction->DoesInputReferenceBlobFiles()) { + sub_compact->blob_garbage_meter.reset(new BlobGarbageMeter); + blob_counter.reset( + new BlobCountingIterator(input, sub_compact->blob_garbage_meter.get())); + input = blob_counter.get(); + } + input->SeekToFirst(); AutoThreadOperationStageUpdater stage_updater( @@ -1248,6 +1268,11 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { break; } + status = sub_compact->ProcessOutFlowIfNeeded(key, value); + if (!status.ok()) { + break; + } + sub_compact->current_output_file_size = sub_compact->builder->EstimatedFileSize(); const ParsedInternalKey& ikey = c_iter->ikey(); @@ -1415,6 +1440,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { #endif // ROCKSDB_ASSERT_STATUS_CHECKED sub_compact->c_iter.reset(); + blob_counter.reset(); clip.reset(); raw_input.reset(); sub_compact->status = status; @@ -1799,6 +1825,8 @@ Status CompactionJob::InstallCompactionResults( // Add compaction inputs compaction->AddInputDeletions(edit); + std::unordered_map<uint64_t, BlobGarbageMeter::BlobStats> blob_total_garbage; + for (const auto& sub_compact : compact_->sub_compact_states) { for (const auto& out : sub_compact.outputs) { edit->AddFile(compaction->output_level(), out.meta); @@ -1807,6 +1835,29 @@ Status CompactionJob::InstallCompactionResults( for (const auto& blob : 
sub_compact.blob_file_additions) { edit->AddBlobFile(blob); } + + if (sub_compact.blob_garbage_meter) { + const auto& flows = sub_compact.blob_garbage_meter->flows(); + + for (const auto& pair : flows) { + const uint64_t blob_file_number = pair.first; + const BlobGarbageMeter::BlobInOutFlow& flow = pair.second; + + assert(flow.IsValid()); + if (flow.HasGarbage()) { + blob_total_garbage[blob_file_number].Add(flow.GetGarbageCount(), + flow.GetGarbageBytes()); + } + } + } + } + + for (const auto& pair : blob_total_garbage) { + const uint64_t blob_file_number = pair.first; + const BlobGarbageMeter::BlobStats& stats = pair.second; + + edit->AddBlobFileGarbage(blob_file_number, stats.GetCount(), + stats.GetBytes()); } return versions_->LogAndApply(compaction->column_family_data(),