Add blob file state to VersionEdit (#6416)

Summary:
BlobDB currently does not keep track of blob files: no records are written to
the manifest when a blob file is added or removed, and upon opening a database,
the list of blob files is populated simply based on the contents of the blob directory.
This means that lost blob files cannot be detected at the moment. We plan to solve
this issue by making blob files a part of `Version`; as a first step, this patch makes
it possible to store information about blob files in `VersionEdit`. Currently, this information
includes blob file number, total number and size of all blobs, and total number and size
of garbage blobs. However, the format is extensible: new fields can be added in
both a forward compatible and a forward incompatible manner if needed (similarly
to `kNewFile4`).
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6416

Test Plan: `make check`

Differential Revision: D19894234

Pulled By: ltamasi

fbshipit-source-id: f9753e1f2aedf6dadb70c09b345207cb9c58c329
main
Levi Tamasi 5 years ago committed by Facebook Github Bot
parent eb367d45c0
commit d87c10c6ab
  1. 2
      CMakeLists.txt
  2. 4
      Makefile
  3. 8
      TARGETS
  4. 173
      db/blob_file_state.cc
  5. 102
      db/blob_file_state.h
  6. 284
      db/blob_file_state_test.cc
  7. 91
      db/version_edit.cc
  8. 32
      db/version_edit.h
  9. 28
      db/version_edit_test.cc
  10. 2
      src.mk

@ -496,6 +496,7 @@ set(SOURCES
cache/lru_cache.cc
cache/sharded_cache.cc
db/arena_wrapped_db_iter.cc
db/blob_file_state.cc
db/builder.cc
db/c.cc
db/column_family.cc
@ -919,6 +920,7 @@ if(WITH_TESTS)
set(TESTS
cache/cache_test.cc
cache/lru_cache_test.cc
db/blob_file_state_test.cc
db/column_family_test.cc
db/compact_files_test.cc
db/compaction/compaction_job_stats_test.cc

@ -597,6 +597,7 @@ TESTS = \
block_cache_tracer_test \
block_cache_trace_analyzer_test \
defer_test \
blob_file_state_test \
ifeq ($(USE_FOLLY_DISTRIBUTED_MUTEX),1)
TESTS += folly_synchronization_distributed_mutex_test
@ -1718,6 +1719,9 @@ block_cache_trace_analyzer_test: tools/block_cache_analyzer/block_cache_trace_an
defer_test: util/defer_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
blob_file_state_test: db/blob_file_state_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
#-------------------------------------------------
# make install related stuff
INSTALL_PATH ?= /usr/local

@ -116,6 +116,7 @@ cpp_library(
"cache/lru_cache.cc",
"cache/sharded_cache.cc",
"db/arena_wrapped_db_iter.cc",
"db/blob_file_state.cc",
"db/builder.cc",
"db/c.cc",
"db/column_family.cc",
@ -477,6 +478,13 @@ ROCKS_TESTS = [
[],
[],
],
[
"blob_file_state_test",
"db/blob_file_state_test.cc",
"serial",
[],
[],
],
[
"block_based_filter_block_test",
"table/block_based/block_based_filter_block_test.cc",

@ -0,0 +1,173 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/blob_file_state.h"
#include "logging/event_logger.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include <ostream>
#include <sstream>
namespace ROCKSDB_NAMESPACE {
namespace {
// Tags for custom fields. Note that these get persisted in the manifest,
// so existing tags should not be modified.
enum CustomFieldTags : uint32_t {
kEndMarker,
// Add forward compatible fields here
/////////////////////////////////////////////////////////////////////
kForwardIncompatibleMask = 1 << 6,
// Add forward incompatible fields here
};
} // anonymous namespace
void BlobFileState::EncodeTo(std::string* output) const {
PutVarint64(output, blob_file_number_);
PutVarint64(output, total_blob_count_);
PutVarint64(output, total_blob_bytes_);
PutVarint64(output, garbage_blob_count_);
PutVarint64(output, garbage_blob_bytes_);
PutLengthPrefixedSlice(output, checksum_method_);
PutLengthPrefixedSlice(output, checksum_value_);
// Encode any custom fields here. The format to use is a Varint32 tag (see
// CustomFieldTags above) followed by a length prefixed slice. Unknown custom
// fields will be ignored during decoding unless they're in the forward
// incompatible range.
TEST_SYNC_POINT_CALLBACK("BlobFileState::EncodeTo::CustomFields", output);
PutVarint32(output, kEndMarker);
}
Status BlobFileState::DecodeFrom(Slice* input) {
constexpr char class_name[] = "BlobFileState";
if (!GetVarint64(input, &blob_file_number_)) {
return Status::Corruption(class_name, "Error decoding blob file number");
}
if (!GetVarint64(input, &total_blob_count_)) {
return Status::Corruption(class_name, "Error decoding total blob count");
}
if (!GetVarint64(input, &total_blob_bytes_)) {
return Status::Corruption(class_name, "Error decoding total blob bytes");
}
if (!GetVarint64(input, &garbage_blob_count_)) {
return Status::Corruption(class_name, "Error decoding garbage blob count");
}
if (!GetVarint64(input, &garbage_blob_bytes_)) {
return Status::Corruption(class_name, "Error decoding garbage blob bytes");
}
Slice checksum_method;
if (!GetLengthPrefixedSlice(input, &checksum_method)) {
return Status::Corruption(class_name, "Error decoding checksum method");
}
checksum_method_ = checksum_method.ToString();
Slice checksum_value;
if (!GetLengthPrefixedSlice(input, &checksum_value)) {
return Status::Corruption(class_name, "Error decoding checksum value");
}
checksum_value_ = checksum_value.ToString();
while (true) {
uint32_t custom_field_tag = 0;
if (!GetVarint32(input, &custom_field_tag)) {
return Status::Corruption(class_name, "Error decoding custom field tag");
}
if (custom_field_tag == kEndMarker) {
break;
}
if (custom_field_tag & kForwardIncompatibleMask) {
return Status::Corruption(
class_name, "Forward incompatible custom field encountered");
}
Slice custom_field_value;
if (!GetLengthPrefixedSlice(input, &custom_field_value)) {
return Status::Corruption(class_name,
"Error decoding custom field value");
}
}
return Status::OK();
}
std::string BlobFileState::DebugString() const {
std::ostringstream oss;
oss << *this;
return oss.str();
}
std::string BlobFileState::DebugJSON() const {
JSONWriter jw;
jw << *this;
jw.EndObject();
return jw.Get();
}
bool operator==(const BlobFileState& lhs, const BlobFileState& rhs) {
return lhs.GetBlobFileNumber() == rhs.GetBlobFileNumber() &&
lhs.GetTotalBlobCount() == rhs.GetTotalBlobCount() &&
lhs.GetTotalBlobBytes() == rhs.GetTotalBlobBytes() &&
lhs.GetGarbageBlobCount() == rhs.GetGarbageBlobCount() &&
lhs.GetGarbageBlobBytes() == rhs.GetGarbageBlobBytes() &&
lhs.GetChecksumMethod() == rhs.GetChecksumMethod() &&
lhs.GetChecksumValue() == rhs.GetChecksumValue();
}
bool operator!=(const BlobFileState& lhs, const BlobFileState& rhs) {
return !(lhs == rhs);
}
std::ostream& operator<<(std::ostream& os,
const BlobFileState& blob_file_state) {
os << "blob_file_number: " << blob_file_state.GetBlobFileNumber()
<< " total_blob_count: " << blob_file_state.GetTotalBlobCount()
<< " total_blob_bytes: " << blob_file_state.GetTotalBlobBytes()
<< " garbage_blob_count: " << blob_file_state.GetGarbageBlobCount()
<< " garbage_blob_bytes: " << blob_file_state.GetGarbageBlobBytes()
<< " checksum_method: " << blob_file_state.GetChecksumMethod()
<< " checksum_value: " << blob_file_state.GetChecksumValue();
return os;
}
JSONWriter& operator<<(JSONWriter& jw, const BlobFileState& blob_file_state) {
jw << "BlobFileNumber" << blob_file_state.GetBlobFileNumber()
<< "TotalBlobCount" << blob_file_state.GetTotalBlobCount()
<< "TotalBlobBytes" << blob_file_state.GetTotalBlobBytes()
<< "GarbageBlobCount" << blob_file_state.GetGarbageBlobCount()
<< "GarbageBlobBytes" << blob_file_state.GetGarbageBlobBytes()
<< "ChecksumMethod" << blob_file_state.GetChecksumMethod()
<< "ChecksumValue" << blob_file_state.GetChecksumValue();
return jw;
}
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,102 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "rocksdb/rocksdb_namespace.h"
#include <cassert>
#include <cstdint>
#include <iosfwd>
#include <string>
namespace ROCKSDB_NAMESPACE {
constexpr uint64_t kInvalidBlobFileNumber = 0;
class JSONWriter;
class Slice;
class Status;
class BlobFileState {
public:
BlobFileState() = default;
BlobFileState(uint64_t blob_file_number, uint64_t total_blob_count,
uint64_t total_blob_bytes, std::string checksum_method,
std::string checksum_value)
: blob_file_number_(blob_file_number),
total_blob_count_(total_blob_count),
total_blob_bytes_(total_blob_bytes),
checksum_method_(std::move(checksum_method)),
checksum_value_(std::move(checksum_value)) {
assert(checksum_method_.empty() == checksum_value_.empty());
}
BlobFileState(uint64_t blob_file_number, uint64_t total_blob_count,
uint64_t total_blob_bytes, uint64_t garbage_blob_count,
uint64_t garbage_blob_bytes, std::string checksum_method,
std::string checksum_value)
: blob_file_number_(blob_file_number),
total_blob_count_(total_blob_count),
total_blob_bytes_(total_blob_bytes),
garbage_blob_count_(garbage_blob_count),
garbage_blob_bytes_(garbage_blob_bytes),
checksum_method_(std::move(checksum_method)),
checksum_value_(std::move(checksum_value)) {
assert(checksum_method_.empty() == checksum_value_.empty());
assert(garbage_blob_count_ <= total_blob_count_);
assert(garbage_blob_bytes_ <= total_blob_bytes_);
}
uint64_t GetBlobFileNumber() const { return blob_file_number_; }
uint64_t GetTotalBlobCount() const { return total_blob_count_; }
uint64_t GetTotalBlobBytes() const { return total_blob_bytes_; }
void AddGarbageBlob(uint64_t size) {
assert(garbage_blob_count_ < total_blob_count_);
assert(garbage_blob_bytes_ + size <= total_blob_bytes_);
++garbage_blob_count_;
garbage_blob_bytes_ += size;
}
uint64_t GetGarbageBlobCount() const { return garbage_blob_count_; }
uint64_t GetGarbageBlobBytes() const { return garbage_blob_bytes_; }
bool IsObsolete() const {
assert(garbage_blob_count_ <= total_blob_count_);
return !(garbage_blob_count_ < total_blob_count_);
}
const std::string& GetChecksumMethod() const { return checksum_method_; }
const std::string& GetChecksumValue() const { return checksum_value_; }
void EncodeTo(std::string* output) const;
Status DecodeFrom(Slice* input);
std::string DebugString() const;
std::string DebugJSON() const;
private:
uint64_t blob_file_number_ = kInvalidBlobFileNumber;
uint64_t total_blob_count_ = 0;
uint64_t total_blob_bytes_ = 0;
uint64_t garbage_blob_count_ = 0;
uint64_t garbage_blob_bytes_ = 0;
std::string checksum_method_;
std::string checksum_value_;
};
bool operator==(const BlobFileState& lhs, const BlobFileState& rhs);
bool operator!=(const BlobFileState& lhs, const BlobFileState& rhs);
std::ostream& operator<<(std::ostream& os,
const BlobFileState& blob_file_state);
JSONWriter& operator<<(JSONWriter& jw, const BlobFileState& blob_file_state);
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,284 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/blob_file_state.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "util/coding.h"
#include <cstdint>
#include <cstring>
#include <string>
namespace ROCKSDB_NAMESPACE {
class BlobFileStateTest : public testing::Test {
public:
static void TestEncodeDecode(const BlobFileState& blob_file_state) {
std::string encoded;
blob_file_state.EncodeTo(&encoded);
BlobFileState decoded;
Slice input(encoded);
ASSERT_OK(decoded.DecodeFrom(&input));
ASSERT_EQ(blob_file_state, decoded);
}
};
TEST_F(BlobFileStateTest, Empty) {
BlobFileState blob_file_state;
ASSERT_EQ(blob_file_state.GetBlobFileNumber(), kInvalidBlobFileNumber);
ASSERT_EQ(blob_file_state.GetTotalBlobCount(), 0);
ASSERT_EQ(blob_file_state.GetTotalBlobBytes(), 0);
ASSERT_EQ(blob_file_state.GetGarbageBlobCount(), 0);
ASSERT_EQ(blob_file_state.GetGarbageBlobBytes(), 0);
ASSERT_TRUE(blob_file_state.IsObsolete());
ASSERT_TRUE(blob_file_state.GetChecksumMethod().empty());
ASSERT_TRUE(blob_file_state.GetChecksumValue().empty());
TestEncodeDecode(blob_file_state);
}
TEST_F(BlobFileStateTest, NonEmpty) {
constexpr uint64_t blob_file_number = 123;
constexpr uint64_t total_blob_count = 2;
constexpr uint64_t total_blob_bytes = 123456;
constexpr uint64_t garbage_blob_count = 1;
constexpr uint64_t garbage_blob_bytes = 9876;
const std::string checksum_method("SHA1");
const std::string checksum_value("bdb7f34a59dfa1592ce7f52e99f98c570c525cbd");
BlobFileState blob_file_state(
blob_file_number, total_blob_count, total_blob_bytes, garbage_blob_count,
garbage_blob_bytes, checksum_method, checksum_value);
ASSERT_EQ(blob_file_state.GetBlobFileNumber(), blob_file_number);
ASSERT_EQ(blob_file_state.GetTotalBlobCount(), total_blob_count);
ASSERT_EQ(blob_file_state.GetTotalBlobBytes(), total_blob_bytes);
ASSERT_EQ(blob_file_state.GetGarbageBlobCount(), garbage_blob_count);
ASSERT_EQ(blob_file_state.GetGarbageBlobBytes(), garbage_blob_bytes);
ASSERT_FALSE(blob_file_state.IsObsolete());
ASSERT_EQ(blob_file_state.GetChecksumMethod(), checksum_method);
ASSERT_EQ(blob_file_state.GetChecksumValue(), checksum_value);
TestEncodeDecode(blob_file_state);
}
TEST_F(BlobFileStateTest, AddGarbageBlob) {
constexpr uint64_t blob_file_number = 123;
constexpr uint64_t total_blob_count = 2;
constexpr uint64_t total_blob_bytes = 123456;
const std::string checksum_method("MD5");
const std::string checksum_value("d8f72233c67a68c5ec2bd51c6be7556e");
BlobFileState blob_file_state(blob_file_number, total_blob_count,
total_blob_bytes, checksum_method,
checksum_value);
ASSERT_EQ(blob_file_state.GetBlobFileNumber(), blob_file_number);
ASSERT_EQ(blob_file_state.GetTotalBlobCount(), total_blob_count);
ASSERT_EQ(blob_file_state.GetTotalBlobBytes(), total_blob_bytes);
ASSERT_EQ(blob_file_state.GetGarbageBlobCount(), 0);
ASSERT_EQ(blob_file_state.GetGarbageBlobBytes(), 0);
ASSERT_FALSE(blob_file_state.IsObsolete());
ASSERT_EQ(blob_file_state.GetChecksumMethod(), checksum_method);
ASSERT_EQ(blob_file_state.GetChecksumValue(), checksum_value);
TestEncodeDecode(blob_file_state);
blob_file_state.AddGarbageBlob(123000);
ASSERT_EQ(blob_file_state.GetBlobFileNumber(), blob_file_number);
ASSERT_EQ(blob_file_state.GetTotalBlobCount(), total_blob_count);
ASSERT_EQ(blob_file_state.GetTotalBlobBytes(), total_blob_bytes);
ASSERT_EQ(blob_file_state.GetGarbageBlobCount(), 1);
ASSERT_EQ(blob_file_state.GetGarbageBlobBytes(), 123000);
ASSERT_FALSE(blob_file_state.IsObsolete());
ASSERT_EQ(blob_file_state.GetChecksumMethod(), checksum_method);
ASSERT_EQ(blob_file_state.GetChecksumValue(), checksum_value);
TestEncodeDecode(blob_file_state);
blob_file_state.AddGarbageBlob(456);
ASSERT_EQ(blob_file_state.GetBlobFileNumber(), blob_file_number);
ASSERT_EQ(blob_file_state.GetTotalBlobCount(), total_blob_count);
ASSERT_EQ(blob_file_state.GetTotalBlobBytes(), total_blob_bytes);
ASSERT_EQ(blob_file_state.GetGarbageBlobCount(), total_blob_count);
ASSERT_EQ(blob_file_state.GetGarbageBlobBytes(), total_blob_bytes);
ASSERT_TRUE(blob_file_state.IsObsolete());
ASSERT_EQ(blob_file_state.GetChecksumMethod(), checksum_method);
ASSERT_EQ(blob_file_state.GetChecksumValue(), checksum_value);
TestEncodeDecode(blob_file_state);
}
TEST_F(BlobFileStateTest, DecodeErrors) {
std::string str;
Slice slice(str);
BlobFileState blob_file_state;
{
const Status s = blob_file_state.DecodeFrom(&slice);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(std::strstr(s.getState(), "blob file number"));
}
constexpr uint64_t blob_file_number = 123;
PutVarint64(&str, blob_file_number);
slice = str;
{
const Status s = blob_file_state.DecodeFrom(&slice);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(std::strstr(s.getState(), "total blob count"));
}
constexpr uint64_t total_blob_count = 4567;
PutVarint64(&str, total_blob_count);
slice = str;
{
const Status s = blob_file_state.DecodeFrom(&slice);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(std::strstr(s.getState(), "total blob bytes"));
}
constexpr uint64_t total_blob_bytes = 12345678;
PutVarint64(&str, total_blob_bytes);
slice = str;
{
const Status s = blob_file_state.DecodeFrom(&slice);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(std::strstr(s.getState(), "garbage blob count"));
}
constexpr uint64_t garbage_blob_count = 1234;
PutVarint64(&str, garbage_blob_count);
slice = str;
{
const Status s = blob_file_state.DecodeFrom(&slice);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(std::strstr(s.getState(), "garbage blob bytes"));
}
constexpr uint64_t garbage_blob_bytes = 5678;
PutVarint64(&str, garbage_blob_bytes);
slice = str;
{
const Status s = blob_file_state.DecodeFrom(&slice);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(std::strstr(s.getState(), "checksum method"));
}
constexpr char checksum_method[] = "SHA1";
PutLengthPrefixedSlice(&str, checksum_method);
slice = str;
{
const Status s = blob_file_state.DecodeFrom(&slice);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(std::strstr(s.getState(), "checksum value"));
}
constexpr char checksum_value[] = "bdb7f34a59dfa1592ce7f52e99f98c570c525cbd";
PutLengthPrefixedSlice(&str, checksum_value);
slice = str;
{
const Status s = blob_file_state.DecodeFrom(&slice);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(std::strstr(s.getState(), "custom field tag"));
}
constexpr uint32_t custom_tag = 2;
PutVarint32(&str, custom_tag);
slice = str;
{
const Status s = blob_file_state.DecodeFrom(&slice);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(std::strstr(s.getState(), "custom field value"));
}
}
TEST_F(BlobFileStateTest, ForwardCompatibleCustomField) {
SyncPoint::GetInstance()->SetCallBack(
"BlobFileState::EncodeTo::CustomFields", [&](void* arg) {
std::string* output = static_cast<std::string*>(arg);
constexpr uint32_t forward_compatible_tag = 2;
PutVarint32(output, forward_compatible_tag);
PutLengthPrefixedSlice(output, "deadbeef");
});
SyncPoint::GetInstance()->EnableProcessing();
constexpr uint64_t blob_file_number = 678;
constexpr uint64_t total_blob_count = 9999;
constexpr uint64_t total_blob_bytes = 100000000;
constexpr uint64_t garbage_blob_count = 3333;
constexpr uint64_t garbage_blob_bytes = 2500000;
const std::string checksum_method("CRC32");
const std::string checksum_value("3d87ff57");
BlobFileState blob_file_state(
blob_file_number, total_blob_count, total_blob_bytes, garbage_blob_count,
garbage_blob_bytes, checksum_method, checksum_value);
TestEncodeDecode(blob_file_state);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_F(BlobFileStateTest, ForwardIncompatibleCustomField) {
SyncPoint::GetInstance()->SetCallBack(
"BlobFileState::EncodeTo::CustomFields", [&](void* arg) {
std::string* output = static_cast<std::string*>(arg);
constexpr uint32_t forward_incompatible_tag = (1 << 6) + 1;
PutVarint32(output, forward_incompatible_tag);
PutLengthPrefixedSlice(output, "foobar");
});
SyncPoint::GetInstance()->EnableProcessing();
constexpr uint64_t blob_file_number = 456;
constexpr uint64_t total_blob_count = 100;
constexpr uint64_t total_blob_bytes = 2000000;
const std::string checksum_method("CRC32B");
const std::string checksum_value("6dbdf23a");
BlobFileState blob_file_state(blob_file_number, total_blob_count,
total_blob_bytes, checksum_method,
checksum_value);
std::string encoded;
blob_file_state.EncodeTo(&encoded);
BlobFileState decoded_blob_file_state;
Slice input(encoded);
const Status s = decoded_blob_file_state.DecodeFrom(&input);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(std::strstr(s.getState(), "Forward incompatible"));
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -22,8 +22,8 @@ namespace ROCKSDB_NAMESPACE {
const std::string kUnknownFileChecksum("");
// The unknown sst file checksum function name.
const std::string kUnknownFileChecksumFuncName("Unknown");
// Mask for an identified tag from the future which can be safely ignored.
const uint32_t kTagSafeIgnoreMask = 1 << 13;
namespace {
// Tag numbers for serialized VersionEdit. These numbers are written to
// disk and should not be changed. The number should be forward compatible so
@ -40,8 +40,6 @@ enum Tag : uint32_t {
// 8 was used for large value refs
kPrevLogNumber = 9,
kMinLogNumberToKeep = 10,
// Ignore-able field
kDbId = kTagSafeIgnoreMask + 1,
// these are new formats divergent from open source leveldb
kNewFile2 = 100,
@ -53,26 +51,37 @@ enum Tag : uint32_t {
kMaxColumnFamily = 203,
kInAtomicGroup = 300,
// Mask for an unidentified tag from the future which can be safely ignored.
kTagSafeIgnoreMask = 1 << 13,
// Forward compatible (aka ignorable) records
kDbId,
kBlobFileState,
};
enum CustomTag : uint32_t {
enum NewFileCustomTag : uint32_t {
kTerminate = 1, // The end of customized fields
kNeedCompaction = 2,
// Since Manifest is not entirely currently forward-compatible, and the only
// forward-compatible part is the CutsomtTag of kNewFile, we currently encode
// kMinLogNumberToKeep as part of a CustomTag as a hack. This should be
// removed when manifest becomes forward-comptabile.
// Since Manifest is not entirely forward-compatible, we currently encode
// kMinLogNumberToKeep as part of NewFile as a hack. This should be removed
// when manifest becomes forward-comptabile.
kMinLogNumberToKeepHack = 3,
kOldestBlobFileNumber = 4,
kOldestAncesterTime = 5,
kFileCreationTime = 6,
kFileChecksum = 7,
kFileChecksumFuncName = 8,
kPathId = 65,
};
// If this bit for the custom tag is set, opening DB should fail if
// we don't know this field.
uint32_t kCustomTagNonSafeIgnoreMask = 1 << 6;
kCustomTagNonSafeIgnoreMask = 1 << 6,
// Forward incompatible (aka unignorable) fields
kPathId,
};
} // anonymous namespace
uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id) {
assert(number <= kFileNumberMask);
@ -142,6 +151,7 @@ void VersionEdit::Clear() {
has_last_sequence_ = false;
deleted_files_.clear();
new_files_.clear();
blob_file_states_.clear();
column_family_ = 0;
is_column_family_add_ = false;
is_column_family_drop_ = false;
@ -217,45 +227,45 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
// tag kNeedCompaction:
// now only can take one char value 1 indicating need-compaction
//
PutVarint32(dst, CustomTag::kOldestAncesterTime);
PutVarint32(dst, NewFileCustomTag::kOldestAncesterTime);
std::string varint_oldest_ancester_time;
PutVarint64(&varint_oldest_ancester_time, f.oldest_ancester_time);
TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:VarintOldestAncesterTime",
&varint_oldest_ancester_time);
PutLengthPrefixedSlice(dst, Slice(varint_oldest_ancester_time));
PutVarint32(dst, CustomTag::kFileCreationTime);
PutVarint32(dst, NewFileCustomTag::kFileCreationTime);
std::string varint_file_creation_time;
PutVarint64(&varint_file_creation_time, f.file_creation_time);
TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:VarintFileCreationTime",
&varint_file_creation_time);
PutLengthPrefixedSlice(dst, Slice(varint_file_creation_time));
PutVarint32(dst, CustomTag::kFileChecksum);
PutVarint32(dst, NewFileCustomTag::kFileChecksum);
PutLengthPrefixedSlice(dst, Slice(f.file_checksum));
PutVarint32(dst, CustomTag::kFileChecksumFuncName);
PutVarint32(dst, NewFileCustomTag::kFileChecksumFuncName);
PutLengthPrefixedSlice(dst, Slice(f.file_checksum_func_name));
if (f.fd.GetPathId() != 0) {
PutVarint32(dst, CustomTag::kPathId);
PutVarint32(dst, NewFileCustomTag::kPathId);
char p = static_cast<char>(f.fd.GetPathId());
PutLengthPrefixedSlice(dst, Slice(&p, 1));
}
if (f.marked_for_compaction) {
PutVarint32(dst, CustomTag::kNeedCompaction);
PutVarint32(dst, NewFileCustomTag::kNeedCompaction);
char p = static_cast<char>(1);
PutLengthPrefixedSlice(dst, Slice(&p, 1));
}
if (has_min_log_number_to_keep_ && !min_log_num_written) {
PutVarint32(dst, CustomTag::kMinLogNumberToKeepHack);
PutVarint32(dst, NewFileCustomTag::kMinLogNumberToKeepHack);
std::string varint_log_number;
PutFixed64(&varint_log_number, min_log_number_to_keep_);
PutLengthPrefixedSlice(dst, Slice(varint_log_number));
min_log_num_written = true;
}
if (f.oldest_blob_file_number != kInvalidBlobFileNumber) {
PutVarint32(dst, CustomTag::kOldestBlobFileNumber);
PutVarint32(dst, NewFileCustomTag::kOldestBlobFileNumber);
std::string oldest_blob_file_number;
PutVarint64(&oldest_blob_file_number, f.oldest_blob_file_number);
PutLengthPrefixedSlice(dst, Slice(oldest_blob_file_number));
@ -263,7 +273,12 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:NewFile4:CustomizeFields",
dst);
PutVarint32(dst, CustomTag::kTerminate);
PutVarint32(dst, NewFileCustomTag::kTerminate);
}
for (const auto& blob_file_state : blob_file_states_) {
PutVarint32(dst, kBlobFileState);
blob_file_state.EncodeTo(dst);
}
// 0 is default and does not need to be explicitly written
@ -319,9 +334,6 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
uint64_t file_size = 0;
SequenceNumber smallest_seqno = 0;
SequenceNumber largest_seqno = kMaxSequenceNumber;
// Since this is the only forward-compatible part of the code, we hack new
// extension into this record. When we do, we set this boolean to distinguish
// the record from the normal NewFile records.
if (GetLevel(input, &level, &msg) && GetVarint64(input, &number) &&
GetVarint64(input, &file_size) && GetInternalKey(input, &f.smallest) &&
GetInternalKey(input, &f.largest) &&
@ -571,6 +583,17 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
break;
}
case kBlobFileState: {
BlobFileState blob_file_state;
const Status s = blob_file_state.DecodeFrom(&input);
if (!s.ok()) {
return s;
}
blob_file_states_.emplace_back(blob_file_state);
break;
}
case kColumnFamily:
if (!GetVarint32(&input, &column_family_)) {
if (!msg) {
@ -700,6 +723,12 @@ std::string VersionEdit::DebugString(bool hex_key) const {
r.append(" file_checksum_func_name: ");
r.append(f.file_checksum_func_name);
}
for (const auto& blob_file_state : blob_file_states_) {
r.append("\n BlobFileState: ");
r.append(blob_file_state.DebugString());
}
r.append("\n ColumnFamily: ");
AppendNumberTo(&r, column_family_);
if (is_column_family_add_) {
@ -782,6 +811,20 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const {
jw.EndArray();
}
if (!blob_file_states_.empty()) {
jw << "BlobFileStates";
jw.StartArray();
for (const auto& blob_file_state : blob_file_states_) {
jw.StartArrayedObject();
jw << blob_file_state;
jw.EndArrayedObject();
}
jw.EndArray();
}
jw << "ColumnFamily" << column_family_;
if (is_column_family_add_) {

@ -13,6 +13,7 @@
#include <string>
#include <utility>
#include <vector>
#include "db/blob_file_state.h"
#include "db/dbformat.h"
#include "memory/arena.h"
#include "rocksdb/cache.h"
@ -24,7 +25,6 @@ namespace ROCKSDB_NAMESPACE {
class VersionSet;
constexpr uint64_t kFileNumberMask = 0x3FFFFFFFFFFFFFFF;
constexpr uint64_t kInvalidBlobFileNumber = 0;
constexpr uint64_t kUnknownOldestAncesterTime = 0;
constexpr uint64_t kUnknownFileCreationTime = 0;
@ -307,16 +307,16 @@ class VersionEdit {
bool HasLastSequence() const { return has_last_sequence_; }
SequenceNumber GetLastSequence() const { return last_sequence_; }
// Delete the specified "file" from the specified "level".
// Delete the specified table file from the specified level.
void DeleteFile(int level, uint64_t file) {
deleted_files_.emplace(level, file);
}
// Retrieve the files deleted as well as their associated levels.
// Retrieve the table files deleted as well as their associated levels.
using DeletedFiles = std::set<std::pair<int, uint64_t>>;
const DeletedFiles& GetDeletedFiles() const { return deleted_files_; }
// Add the specified file at the specified level.
// Add the specified table file at the specified level.
// REQUIRES: This version has not been saved (see VersionSet::SaveTo)
// REQUIRES: "smallest" and "largest" are smallest and largest keys in file
// REQUIRES: "oldest_blob_file_number" is the number of the oldest blob file
@ -342,12 +342,30 @@ class VersionEdit {
new_files_.emplace_back(level, f);
}
// Retrieve the files added as well as their associated levels.
// Retrieve the table files added as well as their associated levels.
using NewFiles = std::vector<std::pair<int, FileMetaData>>;
const NewFiles& GetNewFiles() const { return new_files_; }
// Add blob file state for the specified file.
void AddBlobFileState(uint64_t blob_file_number, uint64_t total_blob_count,
uint64_t total_blob_bytes, uint64_t garbage_blob_count,
uint64_t garbage_blob_bytes,
std::string checksum_method,
std::string checksum_value) {
blob_file_states_.emplace_back(
blob_file_number, total_blob_count, total_blob_bytes,
garbage_blob_count, garbage_blob_bytes, std::move(checksum_method),
std::move(checksum_value));
}
// Retrieve all the blob file states added.
using BlobFileStates = std::vector<BlobFileState>;
const BlobFileStates& GetBlobFileStates() const { return blob_file_states_; }
// Number of edits
size_t NumEntries() const { return new_files_.size() + deleted_files_.size(); }
size_t NumEntries() const {
return new_files_.size() + deleted_files_.size() + blob_file_states_.size();
}
void SetColumnFamily(uint32_t column_family_id) {
column_family_ = column_family_id;
@ -421,6 +439,8 @@ class VersionEdit {
DeletedFiles deleted_files_;
NewFiles new_files_;
BlobFileStates blob_file_states_;
// Each version edit record should have column_family_ set
// If it's not set, it is default (0)
uint32_t column_family_ = 0;

@ -11,6 +11,7 @@
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "util/coding.h"
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {
@ -278,6 +279,33 @@ TEST_F(VersionEditTest, DbId) {
TestEncodeDecode(edit);
}
TEST_F(VersionEditTest, BlobFileState) {
VersionEdit edit;
const std::string checksum_method_prefix = "Hash";
const std::string checksum_value_prefix = "Value";
for (uint64_t blob_file_number = 1; blob_file_number <= 10;
++blob_file_number) {
const uint64_t total_blob_count = blob_file_number << 10;
const uint64_t total_blob_bytes = blob_file_number << 20;
const uint64_t garbage_blob_count = total_blob_count >> 2;
const uint64_t garbage_blob_bytes = total_blob_bytes >> 1;
std::string checksum_method(checksum_method_prefix);
AppendNumberTo(&checksum_method, blob_file_number);
std::string checksum_value(checksum_value_prefix);
AppendNumberTo(&checksum_value, blob_file_number);
edit.AddBlobFileState(blob_file_number, total_blob_count, total_blob_bytes,
garbage_blob_count, garbage_blob_bytes,
checksum_method, checksum_value);
}
TestEncodeDecode(edit);
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -4,6 +4,7 @@ LIB_SOURCES = \
cache/lru_cache.cc \
cache/sharded_cache.cc \
db/arena_wrapped_db_iter.cc \
db/blob_file_state.cc \
db/builder.cc \
db/c.cc \
db/column_family.cc \
@ -295,6 +296,7 @@ MAIN_SOURCES = \
cache/cache_bench.cc \
cache/cache_test.cc \
db_stress_tool/db_stress.cc \
db/blob_file_state_test.cc \
db/column_family_test.cc \
db/compact_files_test.cc \
db/compaction/compaction_iterator_test.cc \

Loading…
Cancel
Save