Define WAL related classes to be used in VersionEdit and VersionSet (#7164)

Summary:
`WalAddition` and `WalDeletion` are defined in `wal_edit.h` and used in `VersionEdit`.
`WalAddition` represents the event of creating a new WAL (no size, just the log number) or closing a WAL (with size).
`WalDeletion` represents the event of deleting or archiving a WAL; it means the WAL is no longer alive (it won't be replayed during recovery).
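
A minimal sketch of how the two flavors of `WalAddition` could be laid out on disk, assuming a LevelDB-style base-128 varint and the tag values of `WalAdditionTag` from this change (`kTerminate = 1`, `kSize = 2`). `PutVarint` and `EncodeWalAddition` are hypothetical stand-ins written for this illustration, not the RocksDB functions, and the outer `kWalAddition` tag that `VersionEdit::EncodeTo` writes first is left out.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical stand-in for a varint writer: 7 payload bits per byte,
// high bit set on every byte except the last one.
inline void PutVarint(std::string* dst, uint64_t v) {
  assert(dst != nullptr);
  while (v >= 0x80) {
    dst->push_back(static_cast<char>((v & 0x7F) | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}

// Tag values mirror WalAdditionTag in this change.
constexpr uint64_t kTerminateTag = 1;
constexpr uint64_t kSizeTag = 2;

// Layout: log number, then (kSizeTag, size) only for a closed WAL,
// then kTerminateTag to mark the end of the record.
inline std::string EncodeWalAddition(uint64_t log_number, bool closed,
                                     uint64_t size_bytes = 0) {
  std::string out;
  PutVarint(&out, log_number);
  if (closed) {
    PutVarint(&out, kSizeTag);
    PutVarint(&out, size_bytes);
  }
  PutVarint(&out, kTerminateTag);
  return out;
}
```

Under these assumptions, creating WAL 100 encodes to the two bytes `0x64 0x01`, while closing it with size 200 encodes to five bytes, because 200 needs a two-byte varint.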

`WalSet` is the set of alive WALs kept in `VersionSet`.
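
The lifecycle implied by these records (create, then close, then delete or archive) can be sketched as a small state machine. `MiniWalSet` is a hypothetical, simplified model written for this note, not the `WalSet` class in `wal_edit.h`: it returns `bool` instead of `Status` and tracks an explicit closed flag.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>

// Hypothetical, simplified model of the set of alive WALs.
class MiniWalSet {
 public:
  // Creation event: the WAL must not be tracked yet.
  bool Create(uint64_t log_number) {
    return wals_.emplace(log_number, Entry{}).second;
  }

  // Close event: the WAL must be tracked and still open.
  bool Close(uint64_t log_number, uint64_t size_bytes) {
    auto it = wals_.find(log_number);
    if (it == wals_.end() || it->second.closed) return false;
    it->second.closed = true;
    it->second.size_bytes = size_bytes;
    return true;
  }

  // Deletion/archival event: the WAL must be tracked and closed.
  bool Delete(uint64_t log_number) {
    auto it = wals_.find(log_number);
    if (it == wals_.end() || !it->second.closed) return false;
    wals_.erase(it);
    return true;
  }

  std::size_t AliveCount() const { return wals_.size(); }

 private:
  struct Entry {
    bool closed = false;
    uint64_t size_bytes = 0;
  };
  std::map<uint64_t, Entry> wals_;
};
```

Any event that arrives out of order (closing an unknown WAL, creating one twice, deleting one that is still open) is rejected, which is the kind of inconsistency the real class reports as corruption.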

1. Why use `WalDeletion` instead of relying on `MinLogNumber` to identify outdated WALs

On recovery, we can compute `MinLogNumber()` based on the log numbers kept in MANIFEST; any log with a number < MinLogNumber can be ignored. So it seems that we don't need to persist `WalDeletion` to MANIFEST, since we can ignore the WALs based on MinLogNumber.

But `MinLogNumber()` is actually a lower bound; it does not mean that every log starting from MinLogNumber must exist. This is because of a corner case: when a column family is empty and never flushed, its log number is set to the largest log number, but that number is not persisted in MANIFEST. Say there are 2 column families. When the DB is created, the first WAL has log number 1, so it's persisted to MANIFEST for both column families. Then CF 0 stays empty and is never flushed, while CF 1 is updated and flushed, so a new WAL with log number 2 is created and persisted to MANIFEST for CF 1. But CF 0's log number in MANIFEST is still 1. So on recovery, MinLogNumber is 1, but since log 1 only contains data for CF 1, and CF 1 is flushed, log 1 might have already been deleted from disk.
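
The corner case can be replayed with a toy model. Everything here is hypothetical and written for this note: `MinLogNumber` simply takes the minimum of the per-column-family log numbers recorded in MANIFEST (it is not the RocksDB function), and `Scenario` hard-codes the two-column-family example from the paragraph above.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <set>
#include <vector>

// Hypothetical helper: minimum of the per-CF log numbers stored in MANIFEST.
inline uint64_t MinLogNumber(const std::vector<uint64_t>& cf_log_numbers) {
  assert(!cf_log_numbers.empty());
  return *std::min_element(cf_log_numbers.begin(), cf_log_numbers.end());
}

struct Scenario {
  std::vector<uint64_t> manifest_log_numbers;  // index = column family id
  std::set<uint64_t> logs_on_disk;
};

// CF 0 is empty and never flushed; CF 1 was flushed once.
inline Scenario EmptyColumnFamilyScenario() {
  Scenario s;
  s.manifest_log_numbers = {1, 2};  // CF 0 still records log 1 in MANIFEST
  s.logs_on_disk = {2};             // log 1 may already have been deleted
  return s;
}

// Whether the log named by MinLogNumber is actually present on disk.
inline bool MinLogExists(const Scenario& s) {
  return s.logs_on_disk.count(MinLogNumber(s.manifest_log_numbers)) > 0;
}
```

In this scenario `MinLogNumber` is 1 while only log 2 is on disk, so a missing log 1 cannot by itself be treated as corruption.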

We can make `MinLogNumber()` be exactly the minimum log number that must exist by persisting the most recent log number for empty column families that are not flushed. But if there are N such column families, then every time a new WAL is created, we need to add N records to MANIFEST.

In the current design, a record is persisted to MANIFEST only when a WAL is created, closed, or deleted/archived, so the number of WAL related records is bounded by 3x the number of WALs.
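
A back-of-the-envelope comparison of the two approaches, assuming every WAL goes through all three events. Both function names are hypothetical and exist only for this illustration.

```cpp
#include <cassert>
#include <cstdint>

// Current design: at most one record each for create, close, and
// delete/archive, independent of the number of column families.
constexpr uint64_t MaxWalRecordsCurrentDesign(uint64_t num_wals) {
  return 3 * num_wals;
}

// Alternative from the text: every new WAL also writes one log-number
// record per empty, never-flushed column family.
constexpr uint64_t LogNumberRecordsAlternative(uint64_t num_wals,
                                               uint64_t num_empty_cfs) {
  return num_wals * num_empty_cfs;
}
```

With 10 WALs and 1000 empty column families, the current design writes at most 30 WAL records, whereas the alternative would write 10000 log-number records.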

2. Why keep `WalSet` in `VersionSet` instead of applying the `VersionEdit`s to `VersionStorageInfo`

`VersionEdit`s were originally designed to track the addition and deletion of SST files. SST files belong to column families: each column family has a list of `Version`s, and each `Version` keeps the set of active SST files in `VersionStorageInfo`.

But WALs are a DB-level concept; they are not bound to specific column families. So logically it does not make sense to store WALs in a column family's `Version`s.
Also, `Version`'s purpose is to keep references to SST / blob files, so that they are not deleted until no version references them. But a WAL is deleted regardless of version references.
So we keep the WALs in `VersionSet` for the purpose of writing out a snapshot of the DB state when creating new MANIFESTs.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7164

Test Plan:
make version_edit_test && ./version_edit_test
make wal_edit_test && ./wal_edit_test

Reviewed By: ltamasi

Differential Revision: D22677936

Pulled By: cheng-chang

fbshipit-source-id: 5a3b6890140e572ffd79eb37e6e4c3c32361a859
Cheng Chang 4 years ago committed by Facebook GitHub Bot
parent 124fbd96d8
commit cd48ecaa1a
Files changed (lines changed per file):
  CMakeLists.txt: 2
  Makefile: 3
  TARGETS: 1
  db/version_edit.cc: 129
  db/version_edit.h: 85
  db/version_edit_test.cc: 205
  db/version_set.cc: 1
  db/version_set.h: 4
  db/wal_edit.cc: 175
  db/wal_edit.h: 143
  db/wal_edit_test.cc: 127
  src.mk: 1

@ -607,6 +607,7 @@ set(SOURCES
db/version_edit.cc
db/version_edit_handler.cc
db/version_set.cc
db/wal_edit.cc
db/wal_manager.cc
db/write_batch.cc
db/write_batch_base.cc
@ -1078,6 +1079,7 @@ if(WITH_TESTS)
db/version_edit_test.cc
db/version_set_test.cc
db/wal_manager_test.cc
db/wal_edit_test.cc
db/write_batch_test.cc
db/write_callback_test.cc
db/write_controller_test.cc

@ -1525,6 +1525,9 @@ compact_on_deletion_collector_test: $(OBJ_DIR)/utilities/table_properties_collec
wal_manager_test: $(OBJ_DIR)/db/wal_manager_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
wal_edit_test: $(OBJ_DIR)/db/wal_edit_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
dbformat_test: $(OBJ_DIR)/db/dbformat_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

@ -183,6 +183,7 @@ cpp_library(
"db/version_edit.cc",
"db/version_edit_handler.cc",
"db/version_set.cc",
"db/wal_edit.cc",
"db/wal_manager.cc",
"db/write_batch.cc",
"db/write_batch_base.cc",

@ -21,63 +21,6 @@ namespace ROCKSDB_NAMESPACE {
namespace {
// Tag numbers for serialized VersionEdit. These numbers are written to
// disk and should not be changed. The number should be forward compatible so
// users can down-grade RocksDB safely. A future Tag is ignored by doing '&'
// between Tag and kTagSafeIgnoreMask field.
enum Tag : uint32_t {
kComparator = 1,
kLogNumber = 2,
kNextFileNumber = 3,
kLastSequence = 4,
kCompactPointer = 5,
kDeletedFile = 6,
kNewFile = 7,
// 8 was used for large value refs
kPrevLogNumber = 9,
kMinLogNumberToKeep = 10,
// these are new formats divergent from open source leveldb
kNewFile2 = 100,
kNewFile3 = 102,
kNewFile4 = 103, // 4th (the latest) format version of adding files
kColumnFamily = 200, // specify column family for version edit
kColumnFamilyAdd = 201,
kColumnFamilyDrop = 202,
kMaxColumnFamily = 203,
kInAtomicGroup = 300,
// Mask for an unidentified tag from the future which can be safely ignored.
kTagSafeIgnoreMask = 1 << 13,
// Forward compatible (aka ignorable) records
kDbId,
kBlobFileAddition,
kBlobFileGarbage,
};
enum NewFileCustomTag : uint32_t {
kTerminate = 1, // The end of customized fields
kNeedCompaction = 2,
// Since Manifest is not entirely forward-compatible, we currently encode
// kMinLogNumberToKeep as part of NewFile as a hack. This should be removed
// when manifest becomes forward-compatible.
kMinLogNumberToKeepHack = 3,
kOldestBlobFileNumber = 4,
kOldestAncesterTime = 5,
kFileCreationTime = 6,
kFileChecksum = 7,
kFileChecksumFuncName = 8,
// If this bit for the custom tag is set, opening DB should fail if
// we don't know this field.
kCustomTagNonSafeIgnoreMask = 1 << 6,
// Forward incompatible (aka unignorable) fields
kPathId,
};
} // anonymous namespace
uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id) {
@ -150,6 +93,8 @@ void VersionEdit::Clear() {
new_files_.clear();
blob_file_additions_.clear();
blob_file_garbages_.clear();
wal_additions_.clear();
wal_deletions_.clear();
column_family_ = 0;
is_column_family_add_ = false;
is_column_family_drop_ = false;
@ -284,6 +229,16 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
blob_file_garbage.EncodeTo(dst);
}
for (const auto& wal_addition : wal_additions_) {
PutVarint32(dst, kWalAddition);
wal_addition.EncodeTo(dst);
}
for (const auto& wal_deletion : wal_deletions_) {
PutVarint32(dst, kWalDeletion);
wal_deletion.EncodeTo(dst);
}
// 0 is default and does not need to be explicitly written
if (column_family_ != 0) {
PutVarint32Varint32(dst, kColumnFamily, column_family_);
@ -608,6 +563,28 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
break;
}
case kWalAddition: {
WalAddition wal_addition;
const Status s = wal_addition.DecodeFrom(&input);
if (!s.ok()) {
return s;
}
wal_additions_.emplace_back(std::move(wal_addition));
break;
}
case kWalDeletion: {
WalDeletion wal_deletion;
const Status s = wal_deletion.DecodeFrom(&input);
if (!s.ok()) {
return s;
}
wal_deletions_.emplace_back(std::move(wal_deletion));
break;
}
case kColumnFamily:
if (!GetVarint32(&input, &column_family_)) {
if (!msg) {
@ -748,6 +725,16 @@ std::string VersionEdit::DebugString(bool hex_key) const {
r.append(blob_file_garbage.DebugString());
}
for (const auto& wal_addition : wal_additions_) {
r.append("\n WalAddition: ");
r.append(wal_addition.DebugString());
}
for (const auto& wal_deletion : wal_deletions_) {
r.append("\n WalDeletion: ");
r.append(wal_deletion.DebugString());
}
r.append("\n ColumnFamily: ");
AppendNumberTo(&r, column_family_);
if (is_column_family_add_) {
@ -858,6 +845,34 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const {
jw.EndArray();
}
if (!wal_additions_.empty()) {
jw << "WalAdditions";
jw.StartArray();
for (const auto& wal_addition : wal_additions_) {
jw.StartArrayedObject();
jw << wal_addition;
jw.EndArrayedObject();
}
jw.EndArray();
}
if (!wal_deletions_.empty()) {
jw << "WalDeletions";
jw.StartArray();
for (const auto& wal_deletion : wal_deletions_) {
jw.StartArrayedObject();
jw << wal_deletion;
jw.EndArrayedObject();
}
jw.EndArray();
}
jw << "ColumnFamily" << column_family_;
if (is_column_family_add_) {

@ -13,9 +13,11 @@
#include <string>
#include <utility>
#include <vector>
#include "db/blob/blob_file_addition.h"
#include "db/blob/blob_file_garbage.h"
#include "db/dbformat.h"
#include "db/wal_edit.h"
#include "memory/arena.h"
#include "rocksdb/cache.h"
#include "table/table_reader.h"
@ -23,6 +25,65 @@
namespace ROCKSDB_NAMESPACE {
// Tag numbers for serialized VersionEdit. These numbers are written to
// disk and should not be changed. The number should be forward compatible so
// users can down-grade RocksDB safely. A future Tag is ignored by doing '&'
// between Tag and kTagSafeIgnoreMask field.
enum Tag : uint32_t {
kComparator = 1,
kLogNumber = 2,
kNextFileNumber = 3,
kLastSequence = 4,
kCompactPointer = 5,
kDeletedFile = 6,
kNewFile = 7,
// 8 was used for large value refs
kPrevLogNumber = 9,
kMinLogNumberToKeep = 10,
// these are new formats divergent from open source leveldb
kNewFile2 = 100,
kNewFile3 = 102,
kNewFile4 = 103, // 4th (the latest) format version of adding files
kColumnFamily = 200, // specify column family for version edit
kColumnFamilyAdd = 201,
kColumnFamilyDrop = 202,
kMaxColumnFamily = 203,
kInAtomicGroup = 300,
// Mask for an unidentified tag from the future which can be safely ignored.
kTagSafeIgnoreMask = 1 << 13,
// Forward compatible (aka ignorable) records
kDbId,
kBlobFileAddition,
kBlobFileGarbage,
kWalAddition,
kWalDeletion,
};
enum NewFileCustomTag : uint32_t {
kTerminate = 1, // The end of customized fields
kNeedCompaction = 2,
// Since Manifest is not entirely forward-compatible, we currently encode
// kMinLogNumberToKeep as part of NewFile as a hack. This should be removed
// when manifest becomes forward-compatible.
kMinLogNumberToKeepHack = 3,
kOldestBlobFileNumber = 4,
kOldestAncesterTime = 5,
kFileCreationTime = 6,
kFileChecksum = 7,
kFileChecksumFuncName = 8,
// If this bit for the custom tag is set, opening DB should fail if
// we don't know this field.
kCustomTagNonSafeIgnoreMask = 1 << 6,
// Forward incompatible (aka unignorable) fields
kPathId,
};
class VersionSet;
constexpr uint64_t kFileNumberMask = 0x3FFFFFFFFFFFFFFF;
@ -374,10 +435,29 @@ class VersionEdit {
return blob_file_garbages_;
}
// Add a WAL (either just created or closed).
void AddWal(WalNumber number, WalMetadata metadata = WalMetadata()) {
wal_additions_.emplace_back(number, std::move(metadata));
}
// Retrieve all the added WALs.
const WalAdditions& GetWalAdditions() const { return wal_additions_; }
bool HasWalAddition() const { return !wal_additions_.empty(); }
// Delete a WAL (either directly deleted or archived).
void DeleteWal(WalNumber number) { wal_deletions_.emplace_back(number); }
// Retrieve all the deleted WALs.
const WalDeletions& GetWalDeletions() const { return wal_deletions_; }
bool HasWalDeletion() const { return !wal_deletions_.empty(); }
// Number of edits
size_t NumEntries() const {
return new_files_.size() + deleted_files_.size() +
- blob_file_additions_.size() + blob_file_garbages_.size();
+ blob_file_additions_.size() + blob_file_garbages_.size() +
+ wal_additions_.size() + wal_deletions_.size();
}
void SetColumnFamily(uint32_t column_family_id) {
@ -457,6 +537,9 @@ class VersionEdit {
BlobFileAdditions blob_file_additions_;
BlobFileGarbages blob_file_garbages_;
WalAdditions wal_additions_;
WalDeletions wal_deletions_;
// Each version edit record should have column_family_ set
// If it's not set, it is default (0)
uint32_t column_family_ = 0;

@ -309,6 +309,211 @@ TEST_F(VersionEditTest, BlobFileAdditionAndGarbage) {
TestEncodeDecode(edit);
}
TEST_F(VersionEditTest, AddWalEncodeDecode) {
VersionEdit edit;
for (uint64_t log_number = 1; log_number <= 20; log_number++) {
WalMetadata meta(rand() % 100);
bool has_size = rand() % 2 == 0;
if (has_size) {
meta.SetSizeInBytes(rand() % 1000);
}
edit.AddWal(log_number, meta);
}
TestEncodeDecode(edit);
}
TEST_F(VersionEditTest, AddWalDecodeBadLogNumber) {
std::string encoded;
PutVarint32(&encoded, Tag::kWalAddition);
{
// No log number.
VersionEdit edit;
Status s = edit.DecodeFrom(encoded);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("Error decoding WAL log number") !=
std::string::npos)
<< s.ToString();
}
{
// log number should be varint64,
// but we only encode 128 which is not a valid representation of varint64.
char c = 0;
unsigned char* ptr = reinterpret_cast<unsigned char*>(&c);
*ptr = 128;
encoded.append(1, c);
VersionEdit edit;
Status s = edit.DecodeFrom(encoded);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("Error decoding WAL log number") !=
std::string::npos)
<< s.ToString();
}
}
TEST_F(VersionEditTest, AddWalDecodeBadTag) {
constexpr WalNumber kLogNumber = 100;
constexpr uint64_t kSizeInBytes = 100;
std::string encoded_without_tag;
PutVarint32(&encoded_without_tag, Tag::kWalAddition);
PutVarint64(&encoded_without_tag, kLogNumber);
{
// No tag.
VersionEdit edit;
Status s = edit.DecodeFrom(encoded_without_tag);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos)
<< s.ToString();
}
{
// Only has size tag, no terminate tag.
std::string encoded_with_size = encoded_without_tag;
PutVarint32(&encoded_with_size,
static_cast<uint32_t>(WalAdditionTag::kSize));
PutVarint64(&encoded_with_size, kSizeInBytes);
VersionEdit edit;
Status s = edit.DecodeFrom(encoded_with_size);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos)
<< s.ToString();
}
{
// Only has terminate tag.
std::string encoded_with_terminate = encoded_without_tag;
PutVarint32(&encoded_with_terminate,
static_cast<uint32_t>(WalAdditionTag::kTerminate));
VersionEdit edit;
ASSERT_OK(edit.DecodeFrom(encoded_with_terminate));
auto& wal_addition = edit.GetWalAdditions()[0];
ASSERT_EQ(wal_addition.GetLogNumber(), kLogNumber);
ASSERT_FALSE(wal_addition.GetMetadata().HasSize());
}
}
TEST_F(VersionEditTest, AddWalDecodeNoSize) {
constexpr WalNumber kLogNumber = 100;
std::string encoded;
PutVarint32(&encoded, Tag::kWalAddition);
PutVarint64(&encoded, kLogNumber);
PutVarint32(&encoded, static_cast<uint32_t>(WalAdditionTag::kSize));
// No real size after the size tag.
{
// Without terminate tag.
VersionEdit edit;
Status s = edit.DecodeFrom(encoded);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("Error decoding WAL file size") !=
std::string::npos)
<< s.ToString();
}
{
// With terminate tag.
PutVarint32(&encoded, static_cast<uint32_t>(WalAdditionTag::kTerminate));
VersionEdit edit;
Status s = edit.DecodeFrom(encoded);
ASSERT_TRUE(s.IsCorruption());
// The terminate tag is misunderstood as the size.
ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos)
<< s.ToString();
}
}
TEST_F(VersionEditTest, AddWalDebug) {
constexpr int n = 2;
constexpr std::array<uint64_t, n> kLogNumbers{{10, 20}};
constexpr std::array<uint64_t, n> kSizeInBytes{{100, 200}};
VersionEdit edit;
for (int i = 0; i < n; i++) {
edit.AddWal(kLogNumbers[i], WalMetadata(kSizeInBytes[i]));
}
const WalAdditions& wals = edit.GetWalAdditions();
ASSERT_TRUE(edit.HasWalAddition());
ASSERT_EQ(wals.size(), n);
for (int i = 0; i < n; i++) {
const WalAddition& wal = wals[i];
ASSERT_EQ(wal.GetLogNumber(), kLogNumbers[i]);
ASSERT_EQ(wal.GetMetadata().GetSizeInBytes(), kSizeInBytes[i]);
}
std::string expected_str = "VersionEdit {\n";
for (int i = 0; i < n; i++) {
std::stringstream ss;
ss << " WalAddition: log_number: " << kLogNumbers[i]
<< " size_in_bytes: " << kSizeInBytes[i] << "\n";
expected_str += ss.str();
}
expected_str += " ColumnFamily: 0\n}\n";
ASSERT_EQ(edit.DebugString(true), expected_str);
std::string expected_json = "{\"EditNumber\": 4, \"WalAdditions\": [";
for (int i = 0; i < n; i++) {
std::stringstream ss;
ss << "{\"LogNumber\": " << kLogNumbers[i] << ", "
<< "\"SizeInBytes\": " << kSizeInBytes[i] << "}";
if (i < n - 1) ss << ", ";
expected_json += ss.str();
}
expected_json += "], \"ColumnFamily\": 0}";
ASSERT_EQ(edit.DebugJSON(4, true), expected_json);
}
TEST_F(VersionEditTest, DeleteWalEncodeDecode) {
VersionEdit edit;
for (uint64_t log_number = 1; log_number <= 20; log_number++) {
edit.DeleteWal(log_number);
}
TestEncodeDecode(edit);
}
TEST_F(VersionEditTest, DeleteWalDebug) {
constexpr int n = 2;
constexpr std::array<uint64_t, n> kLogNumbers{{10, 20}};
VersionEdit edit;
for (int i = 0; i < n; i++) {
edit.DeleteWal(kLogNumbers[i]);
}
const WalDeletions& wals = edit.GetWalDeletions();
ASSERT_TRUE(edit.HasWalDeletion());
ASSERT_EQ(wals.size(), n);
for (int i = 0; i < n; i++) {
const WalDeletion& wal = wals[i];
ASSERT_EQ(wal.GetLogNumber(), kLogNumbers[i]);
}
std::string expected_str = "VersionEdit {\n";
for (int i = 0; i < n; i++) {
std::stringstream ss;
ss << " WalDeletion: log_number: " << kLogNumbers[i] << "\n";
expected_str += ss.str();
}
expected_str += " ColumnFamily: 0\n}\n";
ASSERT_EQ(edit.DebugString(true), expected_str);
std::string expected_json = "{\"EditNumber\": 4, \"WalDeletions\": [";
for (int i = 0; i < n; i++) {
std::stringstream ss;
ss << "{\"LogNumber\": " << kLogNumbers[i] << "}";
if (i < n - 1) ss << ", ";
expected_json += ss.str();
}
expected_json += "], \"ColumnFamily\": 0}";
ASSERT_EQ(edit.DebugJSON(4, true), expected_json);
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -3696,6 +3696,7 @@ void VersionSet::Reset() {
manifest_file_size_ = 0;
obsolete_files_.clear();
obsolete_manifests_.clear();
wals_.Reset();
}
void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,

@ -1185,6 +1185,8 @@ class VersionSet {
// Get the IO Status returned by written Manifest.
const IOStatus& io_status() const { return io_status_; }
const WalSet& GetWalSet() const { return wals_; }
void TEST_CreateAndAppendVersion(ColumnFamilyData* cfd) {
assert(cfd);
@ -1271,6 +1273,8 @@ class VersionSet {
Status VerifyFileMetadata(const std::string& fpath,
const FileMetaData& meta) const;
WalSet wals_;
std::unique_ptr<ColumnFamilySet> column_family_set_;
Env* const env_;
FileSystem* const fs_;

@ -0,0 +1,175 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/wal_edit.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "util/coding.h"
namespace ROCKSDB_NAMESPACE {
void WalAddition::EncodeTo(std::string* dst) const {
PutVarint64(dst, number_);
if (metadata_.HasSize()) {
PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kSize));
PutVarint64(dst, metadata_.GetSizeInBytes());
}
PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kTerminate));
}
Status WalAddition::DecodeFrom(Slice* src) {
constexpr char class_name[] = "WalAddition";
if (!GetVarint64(src, &number_)) {
return Status::Corruption(class_name, "Error decoding WAL log number");
}
while (true) {
uint32_t tag_value = 0;
if (!GetVarint32(src, &tag_value)) {
return Status::Corruption(class_name, "Error decoding tag");
}
WalAdditionTag tag = static_cast<WalAdditionTag>(tag_value);
switch (tag) {
case WalAdditionTag::kSize: {
uint64_t size = 0;
if (!GetVarint64(src, &size)) {
return Status::Corruption(class_name, "Error decoding WAL file size");
}
metadata_.SetSizeInBytes(size);
break;
}
// TODO: process future tags such as checksum.
case WalAdditionTag::kTerminate:
return Status::OK();
default: {
std::stringstream ss;
ss << "Unknown tag " << tag_value;
return Status::Corruption(class_name, ss.str());
}
}
}
}
JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal) {
jw << "LogNumber" << wal.GetLogNumber() << "SizeInBytes"
<< wal.GetMetadata().GetSizeInBytes();
return jw;
}
std::ostream& operator<<(std::ostream& os, const WalAddition& wal) {
os << "log_number: " << wal.GetLogNumber()
<< " size_in_bytes: " << wal.GetMetadata().GetSizeInBytes();
return os;
}
std::string WalAddition::DebugString() const {
std::ostringstream oss;
oss << *this;
return oss.str();
}
void WalDeletion::EncodeTo(std::string* dst) const {
PutVarint64(dst, number_);
}
Status WalDeletion::DecodeFrom(Slice* src) {
constexpr char class_name[] = "WalDeletion";
if (!GetVarint64(src, &number_)) {
return Status::Corruption(class_name, "Error decoding WAL log number");
}
return Status::OK();
}
JSONWriter& operator<<(JSONWriter& jw, const WalDeletion& wal) {
jw << "LogNumber" << wal.GetLogNumber();
return jw;
}
std::ostream& operator<<(std::ostream& os, const WalDeletion& wal) {
os << "log_number: " << wal.GetLogNumber();
return os;
}
std::string WalDeletion::DebugString() const {
std::ostringstream oss;
oss << *this;
return oss.str();
}
Status WalSet::AddWal(const WalAddition& wal) {
auto it = wals_.lower_bound(wal.GetLogNumber());
if (wal.GetMetadata().HasSize()) {
// The WAL must exist without size.
if (it == wals_.end() || it->first != wal.GetLogNumber()) {
std::stringstream ss;
ss << "WAL " << wal.GetLogNumber() << " is not created before closing";
return Status::Corruption("WalSet", ss.str());
}
if (it->second.HasSize()) {
std::stringstream ss;
ss << "WAL " << wal.GetLogNumber() << " is closed more than once";
return Status::Corruption("WalSet", ss.str());
}
it->second = wal.GetMetadata();
} else {
// The WAL must not exist beforehand.
if (it != wals_.end() && it->first == wal.GetLogNumber()) {
std::stringstream ss;
ss << "WAL " << wal.GetLogNumber() << " is created more than once";
return Status::Corruption("WalSet", ss.str());
}
wals_[wal.GetLogNumber()] = wal.GetMetadata();
}
return Status::OK();
}
Status WalSet::AddWals(const WalAdditions& wals) {
Status s;
for (const WalAddition& wal : wals) {
s = AddWal(wal);
if (!s.ok()) {
break;
}
}
return s;
}
Status WalSet::DeleteWal(const WalDeletion& wal) {
auto it = wals_.lower_bound(wal.GetLogNumber());
// The WAL must exist and must have been closed.
if (it == wals_.end() || it->first != wal.GetLogNumber()) {
std::stringstream ss;
ss << "WAL " << wal.GetLogNumber() << " must exist before deletion";
return Status::Corruption("WalSet", ss.str());
}
if (!it->second.HasSize()) {
std::stringstream ss;
ss << "WAL " << wal.GetLogNumber() << " must be closed before deletion";
return Status::Corruption("WalSet", ss.str());
}
wals_.erase(it);
return Status::OK();
}
Status WalSet::DeleteWals(const WalDeletions& wals) {
Status s;
for (const WalDeletion& wal : wals) {
s = DeleteWal(wal);
if (!s.ok()) {
break;
}
}
return s;
}
void WalSet::Reset() { wals_.clear(); }
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,143 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// WAL related classes used in VersionEdit and VersionSet.
#pragma once
#include <map>
#include <ostream>
#include <string>
#include <vector>
#include "logging/event_logger.h"
#include "rocksdb/rocksdb_namespace.h"
namespace ROCKSDB_NAMESPACE {
class JSONWriter;
class Slice;
class Status;
using WalNumber = uint64_t;
// Metadata of a WAL.
class WalMetadata {
public:
WalMetadata() = default;
explicit WalMetadata(uint64_t size_bytes) : size_bytes_(size_bytes) {}
bool HasSize() const { return size_bytes_ != kUnknownWalSize; }
void SetSizeInBytes(uint64_t bytes) { size_bytes_ = bytes; }
uint64_t GetSizeInBytes() const { return size_bytes_; }
private:
// The size of WAL is unknown, used when the WAL is not closed yet.
constexpr static uint64_t kUnknownWalSize = 0;
// Size of a closed WAL in bytes.
uint64_t size_bytes_ = kUnknownWalSize;
};
// These tags are persisted to MANIFEST, so they are part of the user API.
enum class WalAdditionTag : uint32_t {
// Indicates that there are no more tags.
kTerminate = 1,
// Size in bytes.
kSize = 2,
// Add tags in the future, such as checksum?
};
// Records the event of adding a WAL in VersionEdit.
class WalAddition {
public:
WalAddition() : number_(0), metadata_() {}
explicit WalAddition(WalNumber number) : number_(number), metadata_() {}
WalAddition(WalNumber number, WalMetadata meta)
: number_(number), metadata_(std::move(meta)) {}
WalNumber GetLogNumber() const { return number_; }
const WalMetadata& GetMetadata() const { return metadata_; }
void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* src);
std::string DebugString() const;
private:
WalNumber number_;
WalMetadata metadata_;
};
std::ostream& operator<<(std::ostream& os, const WalAddition& wal);
JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal);
using WalAdditions = std::vector<WalAddition>;
// Records the event of deleting/archiving a WAL in VersionEdit.
class WalDeletion {
public:
WalDeletion() : number_(0) {}
explicit WalDeletion(WalNumber number) : number_(number) {}
WalNumber GetLogNumber() const { return number_; }
void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* src);
std::string DebugString() const;
private:
WalNumber number_;
};
std::ostream& operator<<(std::ostream& os, const WalDeletion& wal);
JSONWriter& operator<<(JSONWriter& jw, const WalDeletion& wal);
using WalDeletions = std::vector<WalDeletion>;
// Used in VersionSet to keep the current set of WALs.
//
// When a WAL is created, closed, deleted, or archived,
// a VersionEdit is logged to MANIFEST and
// the WAL is added to or deleted from WalSet.
//
// Not thread safe, needs external synchronization such as holding DB mutex.
class WalSet {
public:
// Add WAL(s).
// If the WAL has size, it means the WAL is closed,
// then there must be an existing WAL without size that is added
// when creating the WAL, otherwise, return Status::Corruption.
// Can happen when applying a VersionEdit or recovering from MANIFEST.
Status AddWal(const WalAddition& wal);
Status AddWals(const WalAdditions& wals);
// Delete WAL(s).
// The WAL to be deleted must exist, otherwise,
// return Status::Corruption.
// Can happen when applying a VersionEdit or recovering from MANIFEST.
Status DeleteWal(const WalDeletion& wal);
Status DeleteWals(const WalDeletions& wals);
// Resets the internal state.
void Reset();
const std::map<WalNumber, WalMetadata>& GetWals() const { return wals_; }
private:
std::map<WalNumber, WalMetadata> wals_;
};
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,127 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/wal_edit.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
namespace ROCKSDB_NAMESPACE {
TEST(WalSet, AddDeleteReset) {
WalSet wals;
ASSERT_TRUE(wals.GetWals().empty());
// Create WAL 1 - 10.
for (WalNumber log_number = 1; log_number <= 10; log_number++) {
wals.AddWal(WalAddition(log_number));
}
ASSERT_EQ(wals.GetWals().size(), 10);
// Close WAL 1 - 5.
for (WalNumber log_number = 1; log_number <= 5; log_number++) {
wals.AddWal(WalAddition(log_number, WalMetadata(100)));
}
ASSERT_EQ(wals.GetWals().size(), 10);
// Delete WAL 1 - 5.
for (WalNumber log_number = 1; log_number <= 5; log_number++) {
wals.DeleteWal(WalDeletion(log_number));
}
ASSERT_EQ(wals.GetWals().size(), 5);
WalNumber expected_log_number = 6;
for (auto it : wals.GetWals()) {
WalNumber log_number = it.first;
ASSERT_EQ(log_number, expected_log_number++);
}
wals.Reset();
ASSERT_TRUE(wals.GetWals().empty());
}
TEST(WalSet, Overwrite) {
constexpr WalNumber kNumber = 100;
constexpr uint64_t kBytes = 200;
WalSet wals;
wals.AddWal(WalAddition(kNumber));
ASSERT_FALSE(wals.GetWals().at(kNumber).HasSize());
wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes)));
ASSERT_TRUE(wals.GetWals().at(kNumber).HasSize());
ASSERT_EQ(wals.GetWals().at(kNumber).GetSizeInBytes(), kBytes);
}
TEST(WalSet, CreateTwice) {
constexpr WalNumber kNumber = 100;
WalSet wals;
ASSERT_OK(wals.AddWal(WalAddition(kNumber)));
Status s = wals.AddWal(WalAddition(kNumber));
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("WAL 100 is created more than once") !=
std::string::npos);
}
TEST(WalSet, CloseTwice) {
constexpr WalNumber kNumber = 100;
constexpr uint64_t kBytes = 200;
WalSet wals;
ASSERT_OK(wals.AddWal(WalAddition(kNumber)));
ASSERT_OK(wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes))));
Status s = wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes)));
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("WAL 100 is closed more than once") !=
std::string::npos);
}
TEST(WalSet, CloseBeforeCreate) {
constexpr WalNumber kNumber = 100;
constexpr uint64_t kBytes = 200;
WalSet wals;
Status s = wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes)));
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("WAL 100 is not created before closing") !=
std::string::npos);
}
TEST(WalSet, CreateAfterClose) {
constexpr WalNumber kNumber = 100;
constexpr uint64_t kBytes = 200;
WalSet wals;
ASSERT_OK(wals.AddWal(WalAddition(kNumber)));
ASSERT_OK(wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes))));
Status s = wals.AddWal(WalAddition(kNumber));
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("WAL 100 is created more than once") !=
std::string::npos);
}
TEST(WalSet, DeleteNonExistingWal) {
constexpr WalNumber kNonExistingNumber = 100;
WalSet wals;
Status s = wals.DeleteWal(WalDeletion(kNonExistingNumber));
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("WAL 100 must exist before deletion") !=
std::string::npos);
}
TEST(WalSet, DeleteNonClosedWal) {
constexpr WalNumber kNonExistingNumber = 100;
WalSet wals;
ASSERT_OK(wals.AddWal(WalAddition(kNonExistingNumber)));
Status s = wals.DeleteWal(WalDeletion(kNonExistingNumber));
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("WAL 100 must be closed before deletion") !=
std::string::npos);
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -67,6 +67,7 @@ LIB_SOURCES = \
db/version_edit.cc \
db/version_edit_handler.cc \
db/version_set.cc \
db/wal_edit.cc \
db/wal_manager.cc \
db/write_batch.cc \
db/write_batch_base.cc \
