diff --git a/CMakeLists.txt b/CMakeLists.txt index aeef92e25..5e8c55390 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -496,6 +496,7 @@ set(SOURCES cache/lru_cache.cc cache/sharded_cache.cc db/arena_wrapped_db_iter.cc + db/blob_file_state.cc db/builder.cc db/c.cc db/column_family.cc @@ -919,6 +920,7 @@ if(WITH_TESTS) set(TESTS cache/cache_test.cc cache/lru_cache_test.cc + db/blob_file_state_test.cc db/column_family_test.cc db/compact_files_test.cc db/compaction/compaction_job_stats_test.cc diff --git a/Makefile b/Makefile index ce2e581cc..d016614dc 100644 --- a/Makefile +++ b/Makefile @@ -597,6 +597,7 @@ TESTS = \ block_cache_tracer_test \ block_cache_trace_analyzer_test \ defer_test \ + blob_file_state_test \ ifeq ($(USE_FOLLY_DISTRIBUTED_MUTEX),1) TESTS += folly_synchronization_distributed_mutex_test @@ -1718,6 +1719,9 @@ block_cache_trace_analyzer_test: tools/block_cache_analyzer/block_cache_trace_an defer_test: util/defer_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +blob_file_state_test: db/blob_file_state_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + #------------------------------------------------- # make install related stuff INSTALL_PATH ?= /usr/local diff --git a/TARGETS b/TARGETS index 23952528d..86536c04a 100644 --- a/TARGETS +++ b/TARGETS @@ -116,6 +116,7 @@ cpp_library( "cache/lru_cache.cc", "cache/sharded_cache.cc", "db/arena_wrapped_db_iter.cc", + "db/blob_file_state.cc", "db/builder.cc", "db/c.cc", "db/column_family.cc", @@ -477,6 +478,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "blob_file_state_test", + "db/blob_file_state_test.cc", + "serial", + [], + [], + ], [ "block_based_filter_block_test", "table/block_based/block_based_filter_block_test.cc", diff --git a/db/blob_file_state.cc b/db/blob_file_state.cc new file mode 100644 index 000000000..52a205c94 --- /dev/null +++ b/db/blob_file_state.cc @@ -0,0 +1,173 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/blob_file_state.h" + +#include "logging/event_logger.h" +#include "rocksdb/slice.h" +#include "rocksdb/status.h" +#include "test_util/sync_point.h" +#include "util/coding.h" + +#include +#include + +namespace ROCKSDB_NAMESPACE { + +namespace { + +// Tags for custom fields. Note that these get persisted in the manifest, +// so existing tags should not be modified. +enum CustomFieldTags : uint32_t { + kEndMarker, + + // Add forward compatible fields here + + ///////////////////////////////////////////////////////////////////// + + kForwardIncompatibleMask = 1 << 6, + + // Add forward incompatible fields here +}; + +} // anonymous namespace + +void BlobFileState::EncodeTo(std::string* output) const { + PutVarint64(output, blob_file_number_); + PutVarint64(output, total_blob_count_); + PutVarint64(output, total_blob_bytes_); + PutVarint64(output, garbage_blob_count_); + PutVarint64(output, garbage_blob_bytes_); + PutLengthPrefixedSlice(output, checksum_method_); + PutLengthPrefixedSlice(output, checksum_value_); + + // Encode any custom fields here. The format to use is a Varint32 tag (see + // CustomFieldTags above) followed by a length prefixed slice. Unknown custom + // fields will be ignored during decoding unless they're in the forward + // incompatible range. + + TEST_SYNC_POINT_CALLBACK("BlobFileState::EncodeTo::CustomFields", output); + + PutVarint32(output, kEndMarker); +} + +Status BlobFileState::DecodeFrom(Slice* input) { + constexpr char class_name[] = "BlobFileState"; + + if (!GetVarint64(input, &blob_file_number_)) { + return Status::Corruption(class_name, "Error decoding blob file number"); + } + + if (!GetVarint64(input, &total_blob_count_)) { + return Status::Corruption(class_name, "Error decoding total blob count"); + } + + if (!GetVarint64(input, &total_blob_bytes_)) { + return Status::Corruption(class_name, "Error decoding total blob bytes"); + } + + if (!GetVarint64(input, &garbage_blob_count_)) { + return Status::Corruption(class_name, "Error decoding garbage blob count"); + } + + if (!GetVarint64(input, &garbage_blob_bytes_)) { + return Status::Corruption(class_name, "Error decoding garbage blob bytes"); + } + + Slice checksum_method; + if (!GetLengthPrefixedSlice(input, &checksum_method)) { + return Status::Corruption(class_name, "Error decoding checksum method"); + } + checksum_method_ = checksum_method.ToString(); + + Slice checksum_value; + if (!GetLengthPrefixedSlice(input, &checksum_value)) { + return Status::Corruption(class_name, "Error decoding checksum value"); + } + checksum_value_ = checksum_value.ToString(); + + while (true) { + uint32_t custom_field_tag = 0; + if (!GetVarint32(input, &custom_field_tag)) { + return Status::Corruption(class_name, "Error decoding custom field tag"); + } + + if (custom_field_tag == kEndMarker) { + break; + } + + if (custom_field_tag & kForwardIncompatibleMask) { + return Status::Corruption( + class_name, "Forward incompatible custom field encountered"); + } + + Slice custom_field_value; + if (!GetLengthPrefixedSlice(input, &custom_field_value)) { + return Status::Corruption(class_name, + "Error decoding custom field value"); + } + } + + return Status::OK(); +} + +std::string BlobFileState::DebugString() const { + std::ostringstream oss; + + oss << *this; + + return oss.str(); +} + +std::string BlobFileState::DebugJSON() const { + JSONWriter jw; + + jw << *this; + + jw.EndObject(); + + return jw.Get(); +} + +bool operator==(const BlobFileState& lhs, const BlobFileState& rhs) { + return lhs.GetBlobFileNumber() == rhs.GetBlobFileNumber() && + lhs.GetTotalBlobCount() == rhs.GetTotalBlobCount() && + lhs.GetTotalBlobBytes() == rhs.GetTotalBlobBytes() && + lhs.GetGarbageBlobCount() == rhs.GetGarbageBlobCount() && + lhs.GetGarbageBlobBytes() == rhs.GetGarbageBlobBytes() && + lhs.GetChecksumMethod() == rhs.GetChecksumMethod() && + lhs.GetChecksumValue() == rhs.GetChecksumValue(); +} + +bool operator!=(const BlobFileState& lhs, const BlobFileState& rhs) { + return !(lhs == rhs); +} + +std::ostream& operator<<(std::ostream& os, + const BlobFileState& blob_file_state) { + os << "blob_file_number: " << blob_file_state.GetBlobFileNumber() + << " total_blob_count: " << blob_file_state.GetTotalBlobCount() + << " total_blob_bytes: " << blob_file_state.GetTotalBlobBytes() + << " garbage_blob_count: " << blob_file_state.GetGarbageBlobCount() + << " garbage_blob_bytes: " << blob_file_state.GetGarbageBlobBytes() + << " checksum_method: " << blob_file_state.GetChecksumMethod() + << " checksum_value: " << blob_file_state.GetChecksumValue(); + + return os; +} + +JSONWriter& operator<<(JSONWriter& jw, const BlobFileState& blob_file_state) { + jw << "BlobFileNumber" << blob_file_state.GetBlobFileNumber() + << "TotalBlobCount" << blob_file_state.GetTotalBlobCount() + << "TotalBlobBytes" << blob_file_state.GetTotalBlobBytes() + << "GarbageBlobCount" << blob_file_state.GetGarbageBlobCount() + << "GarbageBlobBytes" << blob_file_state.GetGarbageBlobBytes() + << "ChecksumMethod" << blob_file_state.GetChecksumMethod() + << "ChecksumValue" << blob_file_state.GetChecksumValue(); + + return jw; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/blob_file_state.h b/db/blob_file_state.h new file mode 100644 index 000000000..0591d52d2 --- /dev/null +++ b/db/blob_file_state.h @@ -0,0 +1,102 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include "rocksdb/rocksdb_namespace.h" + +#include +#include +#include +#include + +namespace ROCKSDB_NAMESPACE { + +constexpr uint64_t kInvalidBlobFileNumber = 0; + +class JSONWriter; +class Slice; +class Status; + +class BlobFileState { + public: + BlobFileState() = default; + + BlobFileState(uint64_t blob_file_number, uint64_t total_blob_count, + uint64_t total_blob_bytes, std::string checksum_method, + std::string checksum_value) + : blob_file_number_(blob_file_number), + total_blob_count_(total_blob_count), + total_blob_bytes_(total_blob_bytes), + checksum_method_(std::move(checksum_method)), + checksum_value_(std::move(checksum_value)) { + assert(checksum_method_.empty() == checksum_value_.empty()); + } + + BlobFileState(uint64_t blob_file_number, uint64_t total_blob_count, + uint64_t total_blob_bytes, uint64_t garbage_blob_count, + uint64_t garbage_blob_bytes, std::string checksum_method, + std::string checksum_value) + : blob_file_number_(blob_file_number), + total_blob_count_(total_blob_count), + total_blob_bytes_(total_blob_bytes), + garbage_blob_count_(garbage_blob_count), + garbage_blob_bytes_(garbage_blob_bytes), + checksum_method_(std::move(checksum_method)), + checksum_value_(std::move(checksum_value)) { + assert(checksum_method_.empty() == checksum_value_.empty()); + assert(garbage_blob_count_ <= total_blob_count_); + assert(garbage_blob_bytes_ <= total_blob_bytes_); + } + + uint64_t GetBlobFileNumber() const { return blob_file_number_; } + + uint64_t GetTotalBlobCount() const { return total_blob_count_; } + uint64_t GetTotalBlobBytes() const { return total_blob_bytes_; } + + void AddGarbageBlob(uint64_t size) { + assert(garbage_blob_count_ < total_blob_count_); + assert(garbage_blob_bytes_ + size <= total_blob_bytes_); + + ++garbage_blob_count_; + garbage_blob_bytes_ += size; + } + + uint64_t GetGarbageBlobCount() const { return garbage_blob_count_; } + uint64_t GetGarbageBlobBytes() const { return garbage_blob_bytes_; } + + bool IsObsolete() const { + assert(garbage_blob_count_ <= total_blob_count_); + + return !(garbage_blob_count_ < total_blob_count_); + } + + const std::string& GetChecksumMethod() const { return checksum_method_; } + const std::string& GetChecksumValue() const { return checksum_value_; } + + void EncodeTo(std::string* output) const; + Status DecodeFrom(Slice* input); + + std::string DebugString() const; + std::string DebugJSON() const; + + private: + uint64_t blob_file_number_ = kInvalidBlobFileNumber; + uint64_t total_blob_count_ = 0; + uint64_t total_blob_bytes_ = 0; + uint64_t garbage_blob_count_ = 0; + uint64_t garbage_blob_bytes_ = 0; + std::string checksum_method_; + std::string checksum_value_; +}; + +bool operator==(const BlobFileState& lhs, const BlobFileState& rhs); +bool operator!=(const BlobFileState& lhs, const BlobFileState& rhs); + +std::ostream& operator<<(std::ostream& os, + const BlobFileState& blob_file_state); +JSONWriter& operator<<(JSONWriter& jw, const BlobFileState& blob_file_state); + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/blob_file_state_test.cc b/db/blob_file_state_test.cc new file mode 100644 index 000000000..b6cd3f6f9 --- /dev/null +++ b/db/blob_file_state_test.cc @@ -0,0 +1,284 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/blob_file_state.h" +#include "test_util/sync_point.h" +#include "test_util/testharness.h" +#include "util/coding.h" + +#include +#include +#include + +namespace ROCKSDB_NAMESPACE { + +class BlobFileStateTest : public testing::Test { + public: + static void TestEncodeDecode(const BlobFileState& blob_file_state) { + std::string encoded; + blob_file_state.EncodeTo(&encoded); + + BlobFileState decoded; + Slice input(encoded); + ASSERT_OK(decoded.DecodeFrom(&input)); + + ASSERT_EQ(blob_file_state, decoded); + } +}; + +TEST_F(BlobFileStateTest, Empty) { + BlobFileState blob_file_state; + + ASSERT_EQ(blob_file_state.GetBlobFileNumber(), kInvalidBlobFileNumber); + ASSERT_EQ(blob_file_state.GetTotalBlobCount(), 0); + ASSERT_EQ(blob_file_state.GetTotalBlobBytes(), 0); + ASSERT_EQ(blob_file_state.GetGarbageBlobCount(), 0); + ASSERT_EQ(blob_file_state.GetGarbageBlobBytes(), 0); + ASSERT_TRUE(blob_file_state.IsObsolete()); + ASSERT_TRUE(blob_file_state.GetChecksumMethod().empty()); + ASSERT_TRUE(blob_file_state.GetChecksumValue().empty()); + + TestEncodeDecode(blob_file_state); +} + +TEST_F(BlobFileStateTest, NonEmpty) { + constexpr uint64_t blob_file_number = 123; + constexpr uint64_t total_blob_count = 2; + constexpr uint64_t total_blob_bytes = 123456; + constexpr uint64_t garbage_blob_count = 1; + constexpr uint64_t garbage_blob_bytes = 9876; + const std::string checksum_method("SHA1"); + const std::string checksum_value("bdb7f34a59dfa1592ce7f52e99f98c570c525cbd"); + + BlobFileState blob_file_state( + blob_file_number, total_blob_count, total_blob_bytes, garbage_blob_count, + garbage_blob_bytes, checksum_method, checksum_value); + + ASSERT_EQ(blob_file_state.GetBlobFileNumber(), blob_file_number); + ASSERT_EQ(blob_file_state.GetTotalBlobCount(), total_blob_count); + ASSERT_EQ(blob_file_state.GetTotalBlobBytes(), total_blob_bytes); + ASSERT_EQ(blob_file_state.GetGarbageBlobCount(), garbage_blob_count); + ASSERT_EQ(blob_file_state.GetGarbageBlobBytes(), garbage_blob_bytes); + ASSERT_FALSE(blob_file_state.IsObsolete()); + ASSERT_EQ(blob_file_state.GetChecksumMethod(), checksum_method); + ASSERT_EQ(blob_file_state.GetChecksumValue(), checksum_value); + + TestEncodeDecode(blob_file_state); +} + +TEST_F(BlobFileStateTest, AddGarbageBlob) { + constexpr uint64_t blob_file_number = 123; + constexpr uint64_t total_blob_count = 2; + constexpr uint64_t total_blob_bytes = 123456; + const std::string checksum_method("MD5"); + const std::string checksum_value("d8f72233c67a68c5ec2bd51c6be7556e"); + + BlobFileState blob_file_state(blob_file_number, total_blob_count, + total_blob_bytes, checksum_method, + checksum_value); + + ASSERT_EQ(blob_file_state.GetBlobFileNumber(), blob_file_number); + ASSERT_EQ(blob_file_state.GetTotalBlobCount(), total_blob_count); + ASSERT_EQ(blob_file_state.GetTotalBlobBytes(), total_blob_bytes); + ASSERT_EQ(blob_file_state.GetGarbageBlobCount(), 0); + ASSERT_EQ(blob_file_state.GetGarbageBlobBytes(), 0); + ASSERT_FALSE(blob_file_state.IsObsolete()); + ASSERT_EQ(blob_file_state.GetChecksumMethod(), checksum_method); + ASSERT_EQ(blob_file_state.GetChecksumValue(), checksum_value); + + TestEncodeDecode(blob_file_state); + + blob_file_state.AddGarbageBlob(123000); + + ASSERT_EQ(blob_file_state.GetBlobFileNumber(), blob_file_number); + ASSERT_EQ(blob_file_state.GetTotalBlobCount(), total_blob_count); + ASSERT_EQ(blob_file_state.GetTotalBlobBytes(), total_blob_bytes); + ASSERT_EQ(blob_file_state.GetGarbageBlobCount(), 1); + ASSERT_EQ(blob_file_state.GetGarbageBlobBytes(), 123000); + ASSERT_FALSE(blob_file_state.IsObsolete()); + ASSERT_EQ(blob_file_state.GetChecksumMethod(), checksum_method); + ASSERT_EQ(blob_file_state.GetChecksumValue(), checksum_value); + + TestEncodeDecode(blob_file_state); + + blob_file_state.AddGarbageBlob(456); + + ASSERT_EQ(blob_file_state.GetBlobFileNumber(), blob_file_number); + ASSERT_EQ(blob_file_state.GetTotalBlobCount(), total_blob_count); + ASSERT_EQ(blob_file_state.GetTotalBlobBytes(), total_blob_bytes); + ASSERT_EQ(blob_file_state.GetGarbageBlobCount(), total_blob_count); + ASSERT_EQ(blob_file_state.GetGarbageBlobBytes(), total_blob_bytes); + ASSERT_TRUE(blob_file_state.IsObsolete()); + ASSERT_EQ(blob_file_state.GetChecksumMethod(), checksum_method); + ASSERT_EQ(blob_file_state.GetChecksumValue(), checksum_value); + + TestEncodeDecode(blob_file_state); +} + +TEST_F(BlobFileStateTest, DecodeErrors) { + std::string str; + Slice slice(str); + + BlobFileState blob_file_state; + + { + const Status s = blob_file_state.DecodeFrom(&slice); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "blob file number")); + } + + constexpr uint64_t blob_file_number = 123; + PutVarint64(&str, blob_file_number); + slice = str; + + { + const Status s = blob_file_state.DecodeFrom(&slice); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "total blob count")); + } + + constexpr uint64_t total_blob_count = 4567; + PutVarint64(&str, total_blob_count); + slice = str; + + { + const Status s = blob_file_state.DecodeFrom(&slice); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "total blob bytes")); + } + + constexpr uint64_t total_blob_bytes = 12345678; + PutVarint64(&str, total_blob_bytes); + slice = str; + + { + const Status s = blob_file_state.DecodeFrom(&slice); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "garbage blob count")); + } + + constexpr uint64_t garbage_blob_count = 1234; + PutVarint64(&str, garbage_blob_count); + slice = str; + + { + const Status s = blob_file_state.DecodeFrom(&slice); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "garbage blob bytes")); + } + + constexpr uint64_t garbage_blob_bytes = 5678; + PutVarint64(&str, garbage_blob_bytes); + slice = str; + + { + const Status s = blob_file_state.DecodeFrom(&slice); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "checksum method")); + } + + constexpr char checksum_method[] = "SHA1"; + PutLengthPrefixedSlice(&str, checksum_method); + slice = str; + + { + const Status s = blob_file_state.DecodeFrom(&slice); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "checksum value")); + } + + constexpr char checksum_value[] = "bdb7f34a59dfa1592ce7f52e99f98c570c525cbd"; + PutLengthPrefixedSlice(&str, checksum_value); + slice = str; + + { + const Status s = blob_file_state.DecodeFrom(&slice); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "custom field tag")); + } + + constexpr uint32_t custom_tag = 2; + PutVarint32(&str, custom_tag); + slice = str; + + { + const Status s = blob_file_state.DecodeFrom(&slice); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "custom field value")); + } +} + +TEST_F(BlobFileStateTest, ForwardCompatibleCustomField) { + SyncPoint::GetInstance()->SetCallBack( + "BlobFileState::EncodeTo::CustomFields", [&](void* arg) { + std::string* output = static_cast(arg); + + constexpr uint32_t forward_compatible_tag = 2; + PutVarint32(output, forward_compatible_tag); + + PutLengthPrefixedSlice(output, "deadbeef"); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + constexpr uint64_t blob_file_number = 678; + constexpr uint64_t total_blob_count = 9999; + constexpr uint64_t total_blob_bytes = 100000000; + constexpr uint64_t garbage_blob_count = 3333; + constexpr uint64_t garbage_blob_bytes = 2500000; + const std::string checksum_method("CRC32"); + const std::string checksum_value("3d87ff57"); + + BlobFileState blob_file_state( + blob_file_number, total_blob_count, total_blob_bytes, garbage_blob_count, + garbage_blob_bytes, checksum_method, checksum_value); + + TestEncodeDecode(blob_file_state); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +TEST_F(BlobFileStateTest, ForwardIncompatibleCustomField) { + SyncPoint::GetInstance()->SetCallBack( + "BlobFileState::EncodeTo::CustomFields", [&](void* arg) { + std::string* output = static_cast(arg); + + constexpr uint32_t forward_incompatible_tag = (1 << 6) + 1; + PutVarint32(output, forward_incompatible_tag); + + PutLengthPrefixedSlice(output, "foobar"); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + constexpr uint64_t blob_file_number = 456; + constexpr uint64_t total_blob_count = 100; + constexpr uint64_t total_blob_bytes = 2000000; + const std::string checksum_method("CRC32B"); + const std::string checksum_value("6dbdf23a"); + + BlobFileState blob_file_state(blob_file_number, total_blob_count, + total_blob_bytes, checksum_method, + checksum_value); + + std::string encoded; + blob_file_state.EncodeTo(&encoded); + + BlobFileState decoded_blob_file_state; + Slice input(encoded); + const Status s = decoded_blob_file_state.DecodeFrom(&input); + + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "Forward incompatible")); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/version_edit.cc b/db/version_edit.cc index 5d1714421..890b5f539 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -22,8 +22,8 @@ namespace ROCKSDB_NAMESPACE { const std::string kUnknownFileChecksum(""); // The unknown sst file checksum function name. const std::string kUnknownFileChecksumFuncName("Unknown"); -// Mask for an identified tag from the future which can be safely ignored. -const uint32_t kTagSafeIgnoreMask = 1 << 13; + +namespace { // Tag numbers for serialized VersionEdit. These numbers are written to // disk and should not be changed. The number should be forward compatible so @@ -40,8 +40,6 @@ enum Tag : uint32_t { // 8 was used for large value refs kPrevLogNumber = 9, kMinLogNumberToKeep = 10, - // Ignore-able field - kDbId = kTagSafeIgnoreMask + 1, // these are new formats divergent from open source leveldb kNewFile2 = 100, @@ -53,26 +51,37 @@ enum Tag : uint32_t { kMaxColumnFamily = 203, kInAtomicGroup = 300, + + // Mask for an unidentified tag from the future which can be safely ignored. + kTagSafeIgnoreMask = 1 << 13, + + // Forward compatible (aka ignorable) records + kDbId, + kBlobFileState, }; -enum CustomTag : uint32_t { +enum NewFileCustomTag : uint32_t { kTerminate = 1, // The end of customized fields kNeedCompaction = 2, - // Since Manifest is not entirely currently forward-compatible, and the only - // forward-compatible part is the CutsomtTag of kNewFile, we currently encode - // kMinLogNumberToKeep as part of a CustomTag as a hack. This should be - // removed when manifest becomes forward-comptabile. + // Since Manifest is not entirely forward-compatible, we currently encode + // kMinLogNumberToKeep as part of NewFile as a hack. This should be removed + // when manifest becomes forward-comptabile. kMinLogNumberToKeepHack = 3, kOldestBlobFileNumber = 4, kOldestAncesterTime = 5, kFileCreationTime = 6, kFileChecksum = 7, kFileChecksumFuncName = 8, - kPathId = 65, + + // If this bit for the custom tag is set, opening DB should fail if + // we don't know this field. + kCustomTagNonSafeIgnoreMask = 1 << 6, + + // Forward incompatible (aka unignorable) fields + kPathId, }; -// If this bit for the custom tag is set, opening DB should fail if -// we don't know this field. -uint32_t kCustomTagNonSafeIgnoreMask = 1 << 6; + +} // anonymous namespace uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id) { assert(number <= kFileNumberMask); @@ -142,6 +151,7 @@ void VersionEdit::Clear() { has_last_sequence_ = false; deleted_files_.clear(); new_files_.clear(); + blob_file_states_.clear(); column_family_ = 0; is_column_family_add_ = false; is_column_family_drop_ = false; @@ -217,45 +227,45 @@ bool VersionEdit::EncodeTo(std::string* dst) const { // tag kNeedCompaction: // now only can take one char value 1 indicating need-compaction // - PutVarint32(dst, CustomTag::kOldestAncesterTime); + PutVarint32(dst, NewFileCustomTag::kOldestAncesterTime); std::string varint_oldest_ancester_time; PutVarint64(&varint_oldest_ancester_time, f.oldest_ancester_time); TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:VarintOldestAncesterTime", &varint_oldest_ancester_time); PutLengthPrefixedSlice(dst, Slice(varint_oldest_ancester_time)); - PutVarint32(dst, CustomTag::kFileCreationTime); + PutVarint32(dst, NewFileCustomTag::kFileCreationTime); std::string varint_file_creation_time; PutVarint64(&varint_file_creation_time, f.file_creation_time); TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:VarintFileCreationTime", &varint_file_creation_time); PutLengthPrefixedSlice(dst, Slice(varint_file_creation_time)); - PutVarint32(dst, CustomTag::kFileChecksum); + PutVarint32(dst, NewFileCustomTag::kFileChecksum); PutLengthPrefixedSlice(dst, Slice(f.file_checksum)); - PutVarint32(dst, CustomTag::kFileChecksumFuncName); + PutVarint32(dst, NewFileCustomTag::kFileChecksumFuncName); PutLengthPrefixedSlice(dst, Slice(f.file_checksum_func_name)); if (f.fd.GetPathId() != 0) { - PutVarint32(dst, CustomTag::kPathId); + PutVarint32(dst, NewFileCustomTag::kPathId); char p = static_cast(f.fd.GetPathId()); PutLengthPrefixedSlice(dst, Slice(&p, 1)); } if (f.marked_for_compaction) { - PutVarint32(dst, CustomTag::kNeedCompaction); + PutVarint32(dst, NewFileCustomTag::kNeedCompaction); char p = static_cast(1); PutLengthPrefixedSlice(dst, Slice(&p, 1)); } if (has_min_log_number_to_keep_ && !min_log_num_written) { - PutVarint32(dst, CustomTag::kMinLogNumberToKeepHack); + PutVarint32(dst, NewFileCustomTag::kMinLogNumberToKeepHack); std::string varint_log_number; PutFixed64(&varint_log_number, min_log_number_to_keep_); PutLengthPrefixedSlice(dst, Slice(varint_log_number)); min_log_num_written = true; } if (f.oldest_blob_file_number != kInvalidBlobFileNumber) { - PutVarint32(dst, CustomTag::kOldestBlobFileNumber); + PutVarint32(dst, NewFileCustomTag::kOldestBlobFileNumber); std::string oldest_blob_file_number; PutVarint64(&oldest_blob_file_number, f.oldest_blob_file_number); PutLengthPrefixedSlice(dst, Slice(oldest_blob_file_number)); @@ -263,7 +273,12 @@ bool VersionEdit::EncodeTo(std::string* dst) const { TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:NewFile4:CustomizeFields", dst); - PutVarint32(dst, CustomTag::kTerminate); + PutVarint32(dst, NewFileCustomTag::kTerminate); + } + + for (const auto& blob_file_state : blob_file_states_) { + PutVarint32(dst, kBlobFileState); + blob_file_state.EncodeTo(dst); } // 0 is default and does not need to be explicitly written @@ -319,9 +334,6 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) { uint64_t file_size = 0; SequenceNumber smallest_seqno = 0; SequenceNumber largest_seqno = kMaxSequenceNumber; - // Since this is the only forward-compatible part of the code, we hack new - // extension into this record. When we do, we set this boolean to distinguish - // the record from the normal NewFile records. if (GetLevel(input, &level, &msg) && GetVarint64(input, &number) && GetVarint64(input, &file_size) && GetInternalKey(input, &f.smallest) && GetInternalKey(input, &f.largest) && @@ -571,6 +583,17 @@ Status VersionEdit::DecodeFrom(const Slice& src) { break; } + case kBlobFileState: { + BlobFileState blob_file_state; + const Status s = blob_file_state.DecodeFrom(&input); + if (!s.ok()) { + return s; + } + + blob_file_states_.emplace_back(blob_file_state); + break; + } + case kColumnFamily: if (!GetVarint32(&input, &column_family_)) { if (!msg) { @@ -700,6 +723,12 @@ std::string VersionEdit::DebugString(bool hex_key) const { r.append(" file_checksum_func_name: "); r.append(f.file_checksum_func_name); } + + for (const auto& blob_file_state : blob_file_states_) { + r.append("\n BlobFileState: "); + r.append(blob_file_state.DebugString()); + } + r.append("\n ColumnFamily: "); AppendNumberTo(&r, column_family_); if (is_column_family_add_) { @@ -782,6 +811,20 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const { jw.EndArray(); } + if (!blob_file_states_.empty()) { + jw << "BlobFileStates"; + + jw.StartArray(); + + for (const auto& blob_file_state : blob_file_states_) { + jw.StartArrayedObject(); + jw << blob_file_state; + jw.EndArrayedObject(); + } + + jw.EndArray(); + } + jw << "ColumnFamily" << column_family_; if (is_column_family_add_) { diff --git a/db/version_edit.h b/db/version_edit.h index 6d1893f2a..88954641d 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -13,6 +13,7 @@ #include #include #include +#include "db/blob_file_state.h" #include "db/dbformat.h" #include "memory/arena.h" #include "rocksdb/cache.h" @@ -24,7 +25,6 @@ namespace ROCKSDB_NAMESPACE { class VersionSet; constexpr uint64_t kFileNumberMask = 0x3FFFFFFFFFFFFFFF; -constexpr uint64_t kInvalidBlobFileNumber = 0; constexpr uint64_t kUnknownOldestAncesterTime = 0; constexpr uint64_t kUnknownFileCreationTime = 0; @@ -307,16 +307,16 @@ class VersionEdit { bool HasLastSequence() const { return has_last_sequence_; } SequenceNumber GetLastSequence() const { return last_sequence_; } - // Delete the specified "file" from the specified "level". + // Delete the specified table file from the specified level. void DeleteFile(int level, uint64_t file) { deleted_files_.emplace(level, file); } - // Retrieve the files deleted as well as their associated levels. + // Retrieve the table files deleted as well as their associated levels. using DeletedFiles = std::set>; const DeletedFiles& GetDeletedFiles() const { return deleted_files_; } - // Add the specified file at the specified level. + // Add the specified table file at the specified level. // REQUIRES: This version has not been saved (see VersionSet::SaveTo) // REQUIRES: "smallest" and "largest" are smallest and largest keys in file // REQUIRES: "oldest_blob_file_number" is the number of the oldest blob file @@ -342,12 +342,30 @@ class VersionEdit { new_files_.emplace_back(level, f); } - // Retrieve the files added as well as their associated levels. + // Retrieve the table files added as well as their associated levels. using NewFiles = std::vector>; const NewFiles& GetNewFiles() const { return new_files_; } + // Add blob file state for the specified file. + void AddBlobFileState(uint64_t blob_file_number, uint64_t total_blob_count, + uint64_t total_blob_bytes, uint64_t garbage_blob_count, + uint64_t garbage_blob_bytes, + std::string checksum_method, + std::string checksum_value) { + blob_file_states_.emplace_back( + blob_file_number, total_blob_count, total_blob_bytes, + garbage_blob_count, garbage_blob_bytes, std::move(checksum_method), + std::move(checksum_value)); + } + + // Retrieve all the blob file states added. + using BlobFileStates = std::vector; + const BlobFileStates& GetBlobFileStates() const { return blob_file_states_; } + // Number of edits - size_t NumEntries() const { return new_files_.size() + deleted_files_.size(); } + size_t NumEntries() const { + return new_files_.size() + deleted_files_.size() + blob_file_states_.size(); + } void SetColumnFamily(uint32_t column_family_id) { column_family_ = column_family_id; @@ -421,6 +439,8 @@ class VersionEdit { DeletedFiles deleted_files_; NewFiles new_files_; + BlobFileStates blob_file_states_; + // Each version edit record should have column_family_ set // If it's not set, it is default (0) uint32_t column_family_ = 0; diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index 8bc884df9..d03d1d735 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -11,6 +11,7 @@ #include "test_util/sync_point.h" #include "test_util/testharness.h" #include "util/coding.h" +#include "util/string_util.h" namespace ROCKSDB_NAMESPACE { @@ -278,6 +279,33 @@ TEST_F(VersionEditTest, DbId) { TestEncodeDecode(edit); } +TEST_F(VersionEditTest, BlobFileState) { + VersionEdit edit; + + const std::string checksum_method_prefix = "Hash"; + const std::string checksum_value_prefix = "Value"; + + for (uint64_t blob_file_number = 1; blob_file_number <= 10; + ++blob_file_number) { + const uint64_t total_blob_count = blob_file_number << 10; + const uint64_t total_blob_bytes = blob_file_number << 20; + const uint64_t garbage_blob_count = total_blob_count >> 2; + const uint64_t garbage_blob_bytes = total_blob_bytes >> 1; + + std::string checksum_method(checksum_method_prefix); + AppendNumberTo(&checksum_method, blob_file_number); + + std::string checksum_value(checksum_value_prefix); + AppendNumberTo(&checksum_value, blob_file_number); + + edit.AddBlobFileState(blob_file_number, total_blob_count, total_blob_bytes, + garbage_blob_count, garbage_blob_bytes, + checksum_method, checksum_value); + } + + TestEncodeDecode(edit); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/src.mk b/src.mk index 5db947a21..1732cf089 100644 --- a/src.mk +++ b/src.mk @@ -4,6 +4,7 @@ LIB_SOURCES = \ cache/lru_cache.cc \ cache/sharded_cache.cc \ db/arena_wrapped_db_iter.cc \ + db/blob_file_state.cc \ db/builder.cc \ db/c.cc \ db/column_family.cc \ @@ -295,6 +296,7 @@ MAIN_SOURCES = \ cache/cache_bench.cc \ cache/cache_test.cc \ db_stress_tool/db_stress.cc \ + db/blob_file_state_test.cc \ db/column_family_test.cc \ db/compact_files_test.cc \ db/compaction/compaction_iterator_test.cc \