diff --git a/CMakeLists.txt b/CMakeLists.txt index a5ced51fc..763daabcc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -635,6 +635,7 @@ set(SOURCES db/blob/blob_file_garbage.cc db/blob/blob_file_meta.cc db/blob/blob_file_reader.cc + db/blob/blob_garbage_meter.cc db/blob/blob_log_format.cc db/blob/blob_log_sequential_reader.cc db/blob/blob_log_writer.cc @@ -1128,6 +1129,7 @@ if(WITH_TESTS) db/blob/blob_file_cache_test.cc db/blob/blob_file_garbage_test.cc db/blob/blob_file_reader_test.cc + db/blob/blob_garbage_meter_test.cc db/blob/db_blob_basic_test.cc db/blob/db_blob_compaction_test.cc db/blob/db_blob_corruption_test.cc diff --git a/Makefile b/Makefile index e8e09588e..3daa2c037 100644 --- a/Makefile +++ b/Makefile @@ -1842,6 +1842,9 @@ blob_file_garbage_test: $(OBJ_DIR)/db/blob/blob_file_garbage_test.o $(TEST_LIBRA blob_file_reader_test: $(OBJ_DIR)/db/blob/blob_file_reader_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +blob_garbage_meter_test: $(OBJ_DIR)/db/blob/blob_garbage_meter_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + timer_test: $(OBJ_DIR)/util/timer_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 248a810ba..ac8f9509b 100644 --- a/TARGETS +++ b/TARGETS @@ -144,6 +144,7 @@ cpp_library( "db/blob/blob_file_garbage.cc", "db/blob/blob_file_meta.cc", "db/blob/blob_file_reader.cc", + "db/blob/blob_garbage_meter.cc", "db/blob/blob_log_format.cc", "db/blob/blob_log_sequential_reader.cc", "db/blob/blob_log_writer.cc", @@ -456,6 +457,7 @@ cpp_library( "db/blob/blob_file_garbage.cc", "db/blob/blob_file_meta.cc", "db/blob/blob_file_reader.cc", + "db/blob/blob_garbage_meter.cc", "db/blob/blob_log_format.cc", "db/blob/blob_log_sequential_reader.cc", "db/blob/blob_log_writer.cc", @@ -948,6 +950,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "blob_garbage_meter_test", + "db/blob/blob_garbage_meter_test.cc", + "parallel", + [], + [], + ], [ "block_based_filter_block_test", "table/block_based/block_based_filter_block_test.cc", diff --git a/db/blob/blob_garbage_meter.cc b/db/blob/blob_garbage_meter.cc new file mode 100644 index 000000000..d328d7ff4 --- /dev/null +++ b/db/blob/blob_garbage_meter.cc @@ -0,0 +1,100 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/blob/blob_garbage_meter.h" + +#include "db/blob/blob_index.h" +#include "db/blob/blob_log_format.h" +#include "db/dbformat.h" + +namespace ROCKSDB_NAMESPACE { + +Status BlobGarbageMeter::ProcessInFlow(const Slice& key, const Slice& value) { + uint64_t blob_file_number = kInvalidBlobFileNumber; + uint64_t bytes = 0; + + const Status s = Parse(key, value, &blob_file_number, &bytes); + if (!s.ok()) { + return s; + } + + if (blob_file_number == kInvalidBlobFileNumber) { + return Status::OK(); + } + + flows_[blob_file_number].AddInFlow(bytes); + + return Status::OK(); +} + +Status BlobGarbageMeter::ProcessOutFlow(const Slice& key, const Slice& value) { + uint64_t blob_file_number = kInvalidBlobFileNumber; + uint64_t bytes = 0; + + const Status s = Parse(key, value, &blob_file_number, &bytes); + if (!s.ok()) { + return s; + } + + if (blob_file_number == kInvalidBlobFileNumber) { + return Status::OK(); + } + + // Note: in order to measure the amount of additional garbage, we only need to + // track the outflow for preexisting files, i.e. those that also had inflow. + // (Newly written files would only have outflow.) + auto it = flows_.find(blob_file_number); + if (it == flows_.end()) { + return Status::OK(); + } + + it->second.AddOutFlow(bytes); + + return Status::OK(); +} + +Status BlobGarbageMeter::Parse(const Slice& key, const Slice& value, + uint64_t* blob_file_number, uint64_t* bytes) { + assert(blob_file_number); + assert(*blob_file_number == kInvalidBlobFileNumber); + assert(bytes); + assert(*bytes == 0); + + ParsedInternalKey ikey; + + { + constexpr bool log_err_key = false; + const Status s = ParseInternalKey(key, &ikey, log_err_key); + if (!s.ok()) { + return s; + } + } + + if (ikey.type != kTypeBlobIndex) { + return Status::OK(); + } + + BlobIndex blob_index; + + { + const Status s = blob_index.DecodeFrom(value); + if (!s.ok()) { + return s; + } + } + + if (blob_index.IsInlined() || blob_index.HasTTL()) { + return Status::Corruption("Unexpected TTL/inlined blob index"); + } + + *blob_file_number = blob_index.file_number(); + *bytes = + blob_index.size() + + BlobLogRecord::CalculateAdjustmentForRecordHeader(ikey.user_key.size()); + + return Status::OK(); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/blob_garbage_meter.h b/db/blob/blob_garbage_meter.h new file mode 100644 index 000000000..a6c04b0b2 --- /dev/null +++ b/db/blob/blob_garbage_meter.h @@ -0,0 +1,102 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include +#include +#include + +#include "db/blob/blob_constants.h" +#include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/status.h" + +namespace ROCKSDB_NAMESPACE { + +class Slice; + +// A class that can be used to compute the amount of additional garbage +// generated by a compaction. It parses the keys and blob references in the +// input and output of a compaction, and aggregates the "inflow" and "outflow" +// on a per-blob file basis. The amount of additional garbage for any given blob +// file can then be computed by subtracting the outflow from the inflow. +class BlobGarbageMeter { + public: + // A class to store the number and total size of blobs on a per-blob file + // basis. + class BlobStats { + public: + void Add(uint64_t bytes) { + ++count_; + bytes_ += bytes; + } + void Add(uint64_t count, uint64_t bytes) { + count_ += count; + bytes_ += bytes; + } + + uint64_t GetCount() const { return count_; } + uint64_t GetBytes() const { return bytes_; } + + private: + uint64_t count_ = 0; + uint64_t bytes_ = 0; + }; + + // A class to keep track of the "inflow" and the "outflow" and to compute the + // amount of additional garbage for a given blob file. + class BlobInOutFlow { + public: + void AddInFlow(uint64_t bytes) { + in_flow_.Add(bytes); + assert(IsValid()); + } + void AddOutFlow(uint64_t bytes) { + out_flow_.Add(bytes); + assert(IsValid()); + } + + const BlobStats& GetInFlow() const { return in_flow_; } + const BlobStats& GetOutFlow() const { return out_flow_; } + + bool IsValid() const { + return in_flow_.GetCount() >= out_flow_.GetCount() && + in_flow_.GetBytes() >= out_flow_.GetBytes(); + } + bool HasGarbage() const { + assert(IsValid()); + return in_flow_.GetCount() > out_flow_.GetCount(); + } + uint64_t GetGarbageCount() const { + assert(IsValid()); + assert(HasGarbage()); + return in_flow_.GetCount() - out_flow_.GetCount(); + } + uint64_t GetGarbageBytes() const { + assert(IsValid()); + assert(HasGarbage()); + return in_flow_.GetBytes() - out_flow_.GetBytes(); + } + + private: + BlobStats in_flow_; + BlobStats out_flow_; + }; + + Status ProcessInFlow(const Slice& key, const Slice& value); + Status ProcessOutFlow(const Slice& key, const Slice& value); + + const std::unordered_map& flows() const { + return flows_; + } + + private: + static Status Parse(const Slice& key, const Slice& value, + uint64_t* blob_file_number, uint64_t* bytes); + + std::unordered_map flows_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/blob_garbage_meter_test.cc b/db/blob/blob_garbage_meter_test.cc new file mode 100644 index 000000000..7066a2c33 --- /dev/null +++ b/db/blob/blob_garbage_meter_test.cc @@ -0,0 +1,196 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/blob/blob_garbage_meter.h" + +#include +#include + +#include "db/blob/blob_index.h" +#include "db/blob/blob_log_format.h" +#include "db/dbformat.h" +#include "test_util/testharness.h" + +namespace ROCKSDB_NAMESPACE { + +TEST(BlobGarbageMeterTest, MeasureGarbage) { + BlobGarbageMeter blob_garbage_meter; + + struct BlobDescriptor { + std::string user_key; + uint64_t blob_file_number; + uint64_t offset; + uint64_t size; + CompressionType compression_type; + bool has_in_flow; + bool has_out_flow; + + uint64_t GetExpectedBytes() const { + return size + + BlobLogRecord::CalculateAdjustmentForRecordHeader(user_key.size()); + } + }; + + // Note: blob file 4 has the same inflow and outflow and hence no additional + // garbage. Blob file 5 has less outflow than inflow and thus it does have + // additional garbage. Blob file 6 is a newly written file (i.e. no inflow, + // only outflow) and is thus not tracked by the meter. + std::vector blobs{ + {"key", 4, 1234, 555, kLZ4Compression, true, true}, + {"other_key", 4, 6789, 101010, kLZ4Compression, true, true}, + {"yet_another_key", 5, 22222, 3456, kLZ4Compression, true, true}, + {"foo_key", 5, 77777, 8888, kLZ4Compression, true, true}, + {"bar_key", 5, 999999, 1212, kLZ4Compression, true, false}, + {"baz_key", 5, 1234567, 890, kLZ4Compression, true, false}, + {"new_key", 6, 7777, 9999, kNoCompression, false, true}}; + + for (const auto& blob : blobs) { + constexpr SequenceNumber seq = 123; + const InternalKey key(blob.user_key, seq, kTypeBlobIndex); + const Slice key_slice = key.Encode(); + + std::string value; + BlobIndex::EncodeBlob(&value, blob.blob_file_number, blob.offset, blob.size, + blob.compression_type); + const Slice value_slice(value); + + if (blob.has_in_flow) { + ASSERT_OK(blob_garbage_meter.ProcessInFlow(key_slice, value_slice)); + } + if (blob.has_out_flow) { + ASSERT_OK(blob_garbage_meter.ProcessOutFlow(key_slice, value_slice)); + } + } + + const auto& flows = blob_garbage_meter.flows(); + ASSERT_EQ(flows.size(), 2); + + { + const auto it = flows.find(4); + ASSERT_NE(it, flows.end()); + + const auto& flow = it->second; + + constexpr uint64_t expected_count = 2; + const uint64_t expected_bytes = + blobs[0].GetExpectedBytes() + blobs[1].GetExpectedBytes(); + + const auto& in = flow.GetInFlow(); + ASSERT_EQ(in.GetCount(), expected_count); + ASSERT_EQ(in.GetBytes(), expected_bytes); + + const auto& out = flow.GetOutFlow(); + ASSERT_EQ(out.GetCount(), expected_count); + ASSERT_EQ(out.GetBytes(), expected_bytes); + + ASSERT_TRUE(flow.IsValid()); + ASSERT_FALSE(flow.HasGarbage()); + } + + { + const auto it = flows.find(5); + ASSERT_NE(it, flows.end()); + + const auto& flow = it->second; + + const auto& in = flow.GetInFlow(); + + constexpr uint64_t expected_in_count = 4; + const uint64_t expected_in_bytes = + blobs[2].GetExpectedBytes() + blobs[3].GetExpectedBytes() + + blobs[4].GetExpectedBytes() + blobs[5].GetExpectedBytes(); + + ASSERT_EQ(in.GetCount(), expected_in_count); + ASSERT_EQ(in.GetBytes(), expected_in_bytes); + + const auto& out = flow.GetOutFlow(); + + constexpr uint64_t expected_out_count = 2; + const uint64_t expected_out_bytes = + blobs[2].GetExpectedBytes() + blobs[3].GetExpectedBytes(); + + ASSERT_EQ(out.GetCount(), expected_out_count); + ASSERT_EQ(out.GetBytes(), expected_out_bytes); + + ASSERT_TRUE(flow.IsValid()); + ASSERT_TRUE(flow.HasGarbage()); + ASSERT_EQ(flow.GetGarbageCount(), expected_in_count - expected_out_count); + ASSERT_EQ(flow.GetGarbageBytes(), expected_in_bytes - expected_out_bytes); + } +} + +TEST(BlobGarbageMeterTest, PlainValue) { + constexpr char user_key[] = "user_key"; + constexpr SequenceNumber seq = 123; + + const InternalKey key(user_key, seq, kTypeValue); + const Slice key_slice = key.Encode(); + + constexpr char value[] = "value"; + const Slice value_slice(value); + + BlobGarbageMeter blob_garbage_meter; + + ASSERT_OK(blob_garbage_meter.ProcessInFlow(key_slice, value_slice)); + ASSERT_OK(blob_garbage_meter.ProcessOutFlow(key_slice, value_slice)); + ASSERT_TRUE(blob_garbage_meter.flows().empty()); +} + +TEST(BlobGarbageMeterTest, CorruptInternalKey) { + constexpr char corrupt_key[] = "i_am_corrupt"; + const Slice key_slice(corrupt_key); + + constexpr char value[] = "value"; + const Slice value_slice(value); + + BlobGarbageMeter blob_garbage_meter; + + ASSERT_NOK(blob_garbage_meter.ProcessInFlow(key_slice, value_slice)); + ASSERT_NOK(blob_garbage_meter.ProcessOutFlow(key_slice, value_slice)); +} + +TEST(BlobGarbageMeterTest, CorruptBlobIndex) { + constexpr char user_key[] = "user_key"; + constexpr SequenceNumber seq = 123; + + const InternalKey key(user_key, seq, kTypeBlobIndex); + const Slice key_slice = key.Encode(); + + constexpr char value[] = "i_am_not_a_blob_index"; + const Slice value_slice(value); + + BlobGarbageMeter blob_garbage_meter; + + ASSERT_NOK(blob_garbage_meter.ProcessInFlow(key_slice, value_slice)); + ASSERT_NOK(blob_garbage_meter.ProcessOutFlow(key_slice, value_slice)); +} + +TEST(BlobGarbageMeterTest, InlinedTTLBlobIndex) { + constexpr char user_key[] = "user_key"; + constexpr SequenceNumber seq = 123; + + const InternalKey key(user_key, seq, kTypeBlobIndex); + const Slice key_slice = key.Encode(); + + constexpr uint64_t expiration = 1234567890; + constexpr char inlined_value[] = "inlined"; + + std::string value; + BlobIndex::EncodeInlinedTTL(&value, expiration, inlined_value); + + const Slice value_slice(value); + + BlobGarbageMeter blob_garbage_meter; + + ASSERT_NOK(blob_garbage_meter.ProcessInFlow(key_slice, value_slice)); + ASSERT_NOK(blob_garbage_meter.ProcessOutFlow(key_slice, value_slice)); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/blob/blob_log_format.h b/db/blob/blob_log_format.h index 539bbb526..4caa5a5f8 100644 --- a/db/blob/blob_log_format.h +++ b/db/blob/blob_log_format.h @@ -107,7 +107,8 @@ struct BlobLogRecord { // Note that the offset field of BlobIndex actually points to the blob value // as opposed to the start of the blob record. The following method can // be used to calculate the adjustment needed to read the blob record header. - static uint64_t CalculateAdjustmentForRecordHeader(uint64_t key_size) { + static constexpr uint64_t CalculateAdjustmentForRecordHeader( + uint64_t key_size) { return key_size + kHeaderSize; } diff --git a/src.mk b/src.mk index b088e8b70..95ba2e70d 100644 --- a/src.mk +++ b/src.mk @@ -13,6 +13,7 @@ LIB_SOURCES = \ db/blob/blob_file_garbage.cc \ db/blob/blob_file_meta.cc \ db/blob/blob_file_reader.cc \ + db/blob/blob_garbage_meter.cc \ db/blob/blob_log_format.cc \ db/blob/blob_log_sequential_reader.cc \ db/blob/blob_log_writer.cc \ @@ -383,6 +384,7 @@ TEST_MAIN_SOURCES = \ db/blob/blob_file_cache_test.cc \ db/blob/blob_file_garbage_test.cc \ db/blob/blob_file_reader_test.cc \ + db/blob/blob_garbage_meter_test.cc \ db/blob/db_blob_basic_test.cc \ db/blob/db_blob_compaction_test.cc \ db/blob/db_blob_corruption_test.cc \