From 065bea158798f5d67eb95ce339f3650db174c109 Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Mon, 21 Jun 2021 22:24:23 -0700 Subject: [PATCH] Add a class for measuring the amount of garbage generated during compaction (#8426) Summary: This is part of an alternative approach to https://github.com/facebook/rocksdb/issues/8316. Unlike that approach, this one relies on key-values getting processed one by one during compaction, and does not involve persistence. Specifically, the patch adds a class `BlobGarbageMeter` that can track the number and total size of blobs in a (sub)compaction's input and output on a per-blob file basis. This information can then be used to compute the amount of additional garbage generated by the compaction for any given blob file by subtracting the "outflow" from the "inflow." Note: this patch only adds `BlobGarbageMeter` and associated unit tests. I plan to hook up this class to the input and output of `CompactionIterator` in a subsequent PR. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8426 Test Plan: `make check` Reviewed By: jay-zhuang Differential Revision: D29242250 Pulled By: ltamasi fbshipit-source-id: 597e50ad556540e413a50e804ba15bc044d809bb --- CMakeLists.txt | 2 + Makefile | 3 + TARGETS | 9 ++ db/blob/blob_garbage_meter.cc | 100 +++++++++++++++ db/blob/blob_garbage_meter.h | 102 +++++++++++++++ db/blob/blob_garbage_meter_test.cc | 196 +++++++++++++++++++++++++++++ db/blob/blob_log_format.h | 3 +- src.mk | 2 + 8 files changed, 416 insertions(+), 1 deletion(-) create mode 100644 db/blob/blob_garbage_meter.cc create mode 100644 db/blob/blob_garbage_meter.h create mode 100644 db/blob/blob_garbage_meter_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index a5ced51fc..763daabcc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -635,6 +635,7 @@ set(SOURCES db/blob/blob_file_garbage.cc db/blob/blob_file_meta.cc db/blob/blob_file_reader.cc + db/blob/blob_garbage_meter.cc db/blob/blob_log_format.cc db/blob/blob_log_sequential_reader.cc db/blob/blob_log_writer.cc @@ -1128,6 +1129,7 @@ if(WITH_TESTS) db/blob/blob_file_cache_test.cc db/blob/blob_file_garbage_test.cc db/blob/blob_file_reader_test.cc + db/blob/blob_garbage_meter_test.cc db/blob/db_blob_basic_test.cc db/blob/db_blob_compaction_test.cc db/blob/db_blob_corruption_test.cc diff --git a/Makefile b/Makefile index e8e09588e..3daa2c037 100644 --- a/Makefile +++ b/Makefile @@ -1842,6 +1842,9 @@ blob_file_garbage_test: $(OBJ_DIR)/db/blob/blob_file_garbage_test.o $(TEST_LIBRA blob_file_reader_test: $(OBJ_DIR)/db/blob/blob_file_reader_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +blob_garbage_meter_test: $(OBJ_DIR)/db/blob/blob_garbage_meter_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + timer_test: $(OBJ_DIR)/util/timer_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 248a810ba..ac8f9509b 100644 --- a/TARGETS +++ b/TARGETS @@ -144,6 +144,7 @@ cpp_library( "db/blob/blob_file_garbage.cc", "db/blob/blob_file_meta.cc", "db/blob/blob_file_reader.cc", + "db/blob/blob_garbage_meter.cc", "db/blob/blob_log_format.cc", "db/blob/blob_log_sequential_reader.cc", "db/blob/blob_log_writer.cc", @@ -456,6 +457,7 @@ cpp_library( "db/blob/blob_file_garbage.cc", "db/blob/blob_file_meta.cc", "db/blob/blob_file_reader.cc", + "db/blob/blob_garbage_meter.cc", "db/blob/blob_log_format.cc", "db/blob/blob_log_sequential_reader.cc", "db/blob/blob_log_writer.cc", @@ -948,6 +950,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "blob_garbage_meter_test", + "db/blob/blob_garbage_meter_test.cc", + "parallel", + [], + [], + ], [ "block_based_filter_block_test", "table/block_based/block_based_filter_block_test.cc", diff --git a/db/blob/blob_garbage_meter.cc b/db/blob/blob_garbage_meter.cc new file mode 100644 index 000000000..d328d7ff4 --- /dev/null +++ b/db/blob/blob_garbage_meter.cc @@ -0,0 +1,100 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/blob/blob_garbage_meter.h" + +#include "db/blob/blob_index.h" +#include "db/blob/blob_log_format.h" +#include "db/dbformat.h" + +namespace ROCKSDB_NAMESPACE { + +Status BlobGarbageMeter::ProcessInFlow(const Slice& key, const Slice& value) { + uint64_t blob_file_number = kInvalidBlobFileNumber; + uint64_t bytes = 0; + + const Status s = Parse(key, value, &blob_file_number, &bytes); + if (!s.ok()) { + return s; + } + + if (blob_file_number == kInvalidBlobFileNumber) { + return Status::OK(); + } + + flows_[blob_file_number].AddInFlow(bytes); + + return Status::OK(); +} + +Status BlobGarbageMeter::ProcessOutFlow(const Slice& key, const Slice& value) { + uint64_t blob_file_number = kInvalidBlobFileNumber; + uint64_t bytes = 0; + + const Status s = Parse(key, value, &blob_file_number, &bytes); + if (!s.ok()) { + return s; + } + + if (blob_file_number == kInvalidBlobFileNumber) { + return Status::OK(); + } + + // Note: in order to measure the amount of additional garbage, we only need to + // track the outflow for preexisting files, i.e. those that also had inflow. + // (Newly written files would only have outflow.) + auto it = flows_.find(blob_file_number); + if (it == flows_.end()) { + return Status::OK(); + } + + it->second.AddOutFlow(bytes); + + return Status::OK(); +} + +Status BlobGarbageMeter::Parse(const Slice& key, const Slice& value, + uint64_t* blob_file_number, uint64_t* bytes) { + assert(blob_file_number); + assert(*blob_file_number == kInvalidBlobFileNumber); + assert(bytes); + assert(*bytes == 0); + + ParsedInternalKey ikey; + + { + constexpr bool log_err_key = false; + const Status s = ParseInternalKey(key, &ikey, log_err_key); + if (!s.ok()) { + return s; + } + } + + if (ikey.type != kTypeBlobIndex) { + return Status::OK(); + } + + BlobIndex blob_index; + + { + const Status s = blob_index.DecodeFrom(value); + if (!s.ok()) { + return s; + } + } + + if (blob_index.IsInlined() || blob_index.HasTTL()) { + return Status::Corruption("Unexpected TTL/inlined blob index"); + } + + *blob_file_number = blob_index.file_number(); + *bytes = + blob_index.size() + + BlobLogRecord::CalculateAdjustmentForRecordHeader(ikey.user_key.size()); + + return Status::OK(); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/blob_garbage_meter.h b/db/blob/blob_garbage_meter.h new file mode 100644 index 000000000..a6c04b0b2 --- /dev/null +++ b/db/blob/blob_garbage_meter.h @@ -0,0 +1,102 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include +#include +#include + +#include "db/blob/blob_constants.h" +#include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/status.h" + +namespace ROCKSDB_NAMESPACE { + +class Slice; + +// A class that can be used to compute the amount of additional garbage +// generated by a compaction. It parses the keys and blob references in the +// input and output of a compaction, and aggregates the "inflow" and "outflow" +// on a per-blob file basis. The amount of additional garbage for any given blob +// file can then be computed by subtracting the outflow from the inflow. +class BlobGarbageMeter { + public: + // A class to store the number and total size of blobs on a per-blob file + // basis. + class BlobStats { + public: + void Add(uint64_t bytes) { + ++count_; + bytes_ += bytes; + } + void Add(uint64_t count, uint64_t bytes) { + count_ += count; + bytes_ += bytes; + } + + uint64_t GetCount() const { return count_; } + uint64_t GetBytes() const { return bytes_; } + + private: + uint64_t count_ = 0; + uint64_t bytes_ = 0; + }; + + // A class to keep track of the "inflow" and the "outflow" and to compute the + // amount of additional garbage for a given blob file. + class BlobInOutFlow { + public: + void AddInFlow(uint64_t bytes) { + in_flow_.Add(bytes); + assert(IsValid()); + } + void AddOutFlow(uint64_t bytes) { + out_flow_.Add(bytes); + assert(IsValid()); + } + + const BlobStats& GetInFlow() const { return in_flow_; } + const BlobStats& GetOutFlow() const { return out_flow_; } + + bool IsValid() const { + return in_flow_.GetCount() >= out_flow_.GetCount() && + in_flow_.GetBytes() >= out_flow_.GetBytes(); + } + bool HasGarbage() const { + assert(IsValid()); + return in_flow_.GetCount() > out_flow_.GetCount(); + } + uint64_t GetGarbageCount() const { + assert(IsValid()); + assert(HasGarbage()); + return in_flow_.GetCount() - out_flow_.GetCount(); + } + uint64_t GetGarbageBytes() const { + assert(IsValid()); + assert(HasGarbage()); + return in_flow_.GetBytes() - out_flow_.GetBytes(); + } + + private: + BlobStats in_flow_; + BlobStats out_flow_; + }; + + Status ProcessInFlow(const Slice& key, const Slice& value); + Status ProcessOutFlow(const Slice& key, const Slice& value); + + const std::unordered_map& flows() const { + return flows_; + } + + private: + static Status Parse(const Slice& key, const Slice& value, + uint64_t* blob_file_number, uint64_t* bytes); + + std::unordered_map flows_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/blob_garbage_meter_test.cc b/db/blob/blob_garbage_meter_test.cc new file mode 100644 index 000000000..7066a2c33 --- /dev/null +++ b/db/blob/blob_garbage_meter_test.cc @@ -0,0 +1,196 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/blob/blob_garbage_meter.h" + +#include +#include + +#include "db/blob/blob_index.h" +#include "db/blob/blob_log_format.h" +#include "db/dbformat.h" +#include "test_util/testharness.h" + +namespace ROCKSDB_NAMESPACE { + +TEST(BlobGarbageMeterTest, MeasureGarbage) { + BlobGarbageMeter blob_garbage_meter; + + struct BlobDescriptor { + std::string user_key; + uint64_t blob_file_number; + uint64_t offset; + uint64_t size; + CompressionType compression_type; + bool has_in_flow; + bool has_out_flow; + + uint64_t GetExpectedBytes() const { + return size + + BlobLogRecord::CalculateAdjustmentForRecordHeader(user_key.size()); + } + }; + + // Note: blob file 4 has the same inflow and outflow and hence no additional + // garbage. Blob file 5 has less outflow than inflow and thus it does have + // additional garbage. Blob file 6 is a newly written file (i.e. no inflow, + // only outflow) and is thus not tracked by the meter. + std::vector blobs{ + {"key", 4, 1234, 555, kLZ4Compression, true, true}, + {"other_key", 4, 6789, 101010, kLZ4Compression, true, true}, + {"yet_another_key", 5, 22222, 3456, kLZ4Compression, true, true}, + {"foo_key", 5, 77777, 8888, kLZ4Compression, true, true}, + {"bar_key", 5, 999999, 1212, kLZ4Compression, true, false}, + {"baz_key", 5, 1234567, 890, kLZ4Compression, true, false}, + {"new_key", 6, 7777, 9999, kNoCompression, false, true}}; + + for (const auto& blob : blobs) { + constexpr SequenceNumber seq = 123; + const InternalKey key(blob.user_key, seq, kTypeBlobIndex); + const Slice key_slice = key.Encode(); + + std::string value; + BlobIndex::EncodeBlob(&value, blob.blob_file_number, blob.offset, blob.size, + blob.compression_type); + const Slice value_slice(value); + + if (blob.has_in_flow) { + ASSERT_OK(blob_garbage_meter.ProcessInFlow(key_slice, value_slice)); + } + if (blob.has_out_flow) { + ASSERT_OK(blob_garbage_meter.ProcessOutFlow(key_slice, value_slice)); + } + } + + const auto& flows = blob_garbage_meter.flows(); + ASSERT_EQ(flows.size(), 2); + + { + const auto it = flows.find(4); + ASSERT_NE(it, flows.end()); + + const auto& flow = it->second; + + constexpr uint64_t expected_count = 2; + const uint64_t expected_bytes = + blobs[0].GetExpectedBytes() + blobs[1].GetExpectedBytes(); + + const auto& in = flow.GetInFlow(); + ASSERT_EQ(in.GetCount(), expected_count); + ASSERT_EQ(in.GetBytes(), expected_bytes); + + const auto& out = flow.GetOutFlow(); + ASSERT_EQ(out.GetCount(), expected_count); + ASSERT_EQ(out.GetBytes(), expected_bytes); + + ASSERT_TRUE(flow.IsValid()); + ASSERT_FALSE(flow.HasGarbage()); + } + + { + const auto it = flows.find(5); + ASSERT_NE(it, flows.end()); + + const auto& flow = it->second; + + const auto& in = flow.GetInFlow(); + + constexpr uint64_t expected_in_count = 4; + const uint64_t expected_in_bytes = + blobs[2].GetExpectedBytes() + blobs[3].GetExpectedBytes() + + blobs[4].GetExpectedBytes() + blobs[5].GetExpectedBytes(); + + ASSERT_EQ(in.GetCount(), expected_in_count); + ASSERT_EQ(in.GetBytes(), expected_in_bytes); + + const auto& out = flow.GetOutFlow(); + + constexpr uint64_t expected_out_count = 2; + const uint64_t expected_out_bytes = + blobs[2].GetExpectedBytes() + blobs[3].GetExpectedBytes(); + + ASSERT_EQ(out.GetCount(), expected_out_count); + ASSERT_EQ(out.GetBytes(), expected_out_bytes); + + ASSERT_TRUE(flow.IsValid()); + ASSERT_TRUE(flow.HasGarbage()); + ASSERT_EQ(flow.GetGarbageCount(), expected_in_count - expected_out_count); + ASSERT_EQ(flow.GetGarbageBytes(), expected_in_bytes - expected_out_bytes); + } +} + +TEST(BlobGarbageMeterTest, PlainValue) { + constexpr char user_key[] = "user_key"; + constexpr SequenceNumber seq = 123; + + const InternalKey key(user_key, seq, kTypeValue); + const Slice key_slice = key.Encode(); + + constexpr char value[] = "value"; + const Slice value_slice(value); + + BlobGarbageMeter blob_garbage_meter; + + ASSERT_OK(blob_garbage_meter.ProcessInFlow(key_slice, value_slice)); + ASSERT_OK(blob_garbage_meter.ProcessOutFlow(key_slice, value_slice)); + ASSERT_TRUE(blob_garbage_meter.flows().empty()); +} + +TEST(BlobGarbageMeterTest, CorruptInternalKey) { + constexpr char corrupt_key[] = "i_am_corrupt"; + const Slice key_slice(corrupt_key); + + constexpr char value[] = "value"; + const Slice value_slice(value); + + BlobGarbageMeter blob_garbage_meter; + + ASSERT_NOK(blob_garbage_meter.ProcessInFlow(key_slice, value_slice)); + ASSERT_NOK(blob_garbage_meter.ProcessOutFlow(key_slice, value_slice)); +} + +TEST(BlobGarbageMeterTest, CorruptBlobIndex) { + constexpr char user_key[] = "user_key"; + constexpr SequenceNumber seq = 123; + + const InternalKey key(user_key, seq, kTypeBlobIndex); + const Slice key_slice = key.Encode(); + + constexpr char value[] = "i_am_not_a_blob_index"; + const Slice value_slice(value); + + BlobGarbageMeter blob_garbage_meter; + + ASSERT_NOK(blob_garbage_meter.ProcessInFlow(key_slice, value_slice)); + ASSERT_NOK(blob_garbage_meter.ProcessOutFlow(key_slice, value_slice)); +} + +TEST(BlobGarbageMeterTest, InlinedTTLBlobIndex) { + constexpr char user_key[] = "user_key"; + constexpr SequenceNumber seq = 123; + + const InternalKey key(user_key, seq, kTypeBlobIndex); + const Slice key_slice = key.Encode(); + + constexpr uint64_t expiration = 1234567890; + constexpr char inlined_value[] = "inlined"; + + std::string value; + BlobIndex::EncodeInlinedTTL(&value, expiration, inlined_value); + + const Slice value_slice(value); + + BlobGarbageMeter blob_garbage_meter; + + ASSERT_NOK(blob_garbage_meter.ProcessInFlow(key_slice, value_slice)); + ASSERT_NOK(blob_garbage_meter.ProcessOutFlow(key_slice, value_slice)); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/blob/blob_log_format.h b/db/blob/blob_log_format.h index 539bbb526..4caa5a5f8 100644 --- a/db/blob/blob_log_format.h +++ b/db/blob/blob_log_format.h @@ -107,7 +107,8 @@ struct BlobLogRecord { // Note that the offset field of BlobIndex actually points to the blob value // as opposed to the start of the blob record. The following method can // be used to calculate the adjustment needed to read the blob record header. - static uint64_t CalculateAdjustmentForRecordHeader(uint64_t key_size) { + static constexpr uint64_t CalculateAdjustmentForRecordHeader( + uint64_t key_size) { return key_size + kHeaderSize; } diff --git a/src.mk b/src.mk index b088e8b70..95ba2e70d 100644 --- a/src.mk +++ b/src.mk @@ -13,6 +13,7 @@ LIB_SOURCES = \ db/blob/blob_file_garbage.cc \ db/blob/blob_file_meta.cc \ db/blob/blob_file_reader.cc \ + db/blob/blob_garbage_meter.cc \ db/blob/blob_log_format.cc \ db/blob/blob_log_sequential_reader.cc \ db/blob/blob_log_writer.cc \ @@ -383,6 +384,7 @@ TEST_MAIN_SOURCES = \ db/blob/blob_file_cache_test.cc \ db/blob/blob_file_garbage_test.cc \ db/blob/blob_file_reader_test.cc \ + db/blob/blob_garbage_meter_test.cc \ db/blob/db_blob_basic_test.cc \ db/blob/db_blob_compaction_test.cc \ db/blob/db_blob_corruption_test.cc \