Track WAL in MANIFEST: update WalMetadata for WAL syncing (#7414)

Summary:
There are some tricky behaviors related to WAL sync:

- When a WAL is created, it might not be synced. If the WAL directory is not synced either, the WAL file's metadata may not even be persisted to disk, so during recovery the WAL may not show up when listing the WAL directory.
- During each DB::Write, the WriteOptions control whether the WAL should be synced, so a WAL that was not synced on creation can still be synced during a later Write.

For each `SyncWAL`, we'll track the synced status and the current WAL size. Previously, we only tracked the WAL size on closing.
During recovery, we check that the on-disk WAL size is >= the last synced size.

So this PR adds `synced_size` and `closed` to `WalMetadata` to support the design described above.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7414

Test Plan:
- updated wal_edit_test
- updated version_edit_test

Reviewed By: riversand963

Differential Revision: D23796127

Pulled By: cheng-chang

fbshipit-source-id: 5498ab80f537c48a10157e71a4745716aef5cf30
main
Cheng Chang 4 years ago committed by Facebook GitHub Bot
parent cd72f8974b
commit 00ee89b584
  1. 21
      db/version_edit_test.cc
  2. 55
      db/wal_edit.cc
  3. 42
      db/wal_edit.h
  4. 43
      db/wal_edit_test.cc

@ -312,10 +312,14 @@ TEST_F(VersionEditTest, BlobFileAdditionAndGarbage) {
TEST_F(VersionEditTest, AddWalEncodeDecode) { TEST_F(VersionEditTest, AddWalEncodeDecode) {
VersionEdit edit; VersionEdit edit;
for (uint64_t log_number = 1; log_number <= 20; log_number++) { for (uint64_t log_number = 1; log_number <= 20; log_number++) {
WalMetadata meta(rand() % 100); WalMetadata meta;
bool has_size = rand() % 2 == 0; bool has_size = rand() % 2 == 0;
if (has_size) { if (has_size) {
meta.SetSizeInBytes(rand() % 1000); meta.SetSyncedSizeInBytes(rand() % 1000);
}
bool is_closed = rand() % 2 == 0;
if (is_closed) {
meta.SetClosed();
} }
edit.AddWal(log_number, meta); edit.AddWal(log_number, meta);
} }
@ -373,7 +377,7 @@ TEST_F(VersionEditTest, AddWalDecodeBadTag) {
// Only has size tag, no terminate tag. // Only has size tag, no terminate tag.
std::string encoded_with_size = encoded_without_tag; std::string encoded_with_size = encoded_without_tag;
PutVarint32(&encoded_with_size, PutVarint32(&encoded_with_size,
static_cast<uint32_t>(WalAdditionTag::kSize)); static_cast<uint32_t>(WalAdditionTag::kSyncedSize));
PutVarint64(&encoded_with_size, kSizeInBytes); PutVarint64(&encoded_with_size, kSizeInBytes);
VersionEdit edit; VersionEdit edit;
Status s = edit.DecodeFrom(encoded_with_size); Status s = edit.DecodeFrom(encoded_with_size);
@ -391,7 +395,7 @@ TEST_F(VersionEditTest, AddWalDecodeBadTag) {
ASSERT_OK(edit.DecodeFrom(encoded_with_terminate)); ASSERT_OK(edit.DecodeFrom(encoded_with_terminate));
auto& wal_addition = edit.GetWalAdditions()[0]; auto& wal_addition = edit.GetWalAdditions()[0];
ASSERT_EQ(wal_addition.GetLogNumber(), kLogNumber); ASSERT_EQ(wal_addition.GetLogNumber(), kLogNumber);
ASSERT_FALSE(wal_addition.GetMetadata().HasSize()); ASSERT_FALSE(wal_addition.GetMetadata().HasSyncedSize());
} }
} }
@ -401,7 +405,7 @@ TEST_F(VersionEditTest, AddWalDecodeNoSize) {
std::string encoded; std::string encoded;
PutVarint32(&encoded, Tag::kWalAddition); PutVarint32(&encoded, Tag::kWalAddition);
PutVarint64(&encoded, kLogNumber); PutVarint64(&encoded, kLogNumber);
PutVarint32(&encoded, static_cast<uint32_t>(WalAdditionTag::kSize)); PutVarint32(&encoded, static_cast<uint32_t>(WalAdditionTag::kSyncedSize));
// No real size after the size tag. // No real size after the size tag.
{ {
@ -443,14 +447,14 @@ TEST_F(VersionEditTest, AddWalDebug) {
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
const WalAddition& wal = wals[i]; const WalAddition& wal = wals[i];
ASSERT_EQ(wal.GetLogNumber(), kLogNumbers[i]); ASSERT_EQ(wal.GetLogNumber(), kLogNumbers[i]);
ASSERT_EQ(wal.GetMetadata().GetSizeInBytes(), kSizeInBytes[i]); ASSERT_EQ(wal.GetMetadata().GetSyncedSizeInBytes(), kSizeInBytes[i]);
} }
std::string expected_str = "VersionEdit {\n"; std::string expected_str = "VersionEdit {\n";
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
std::stringstream ss; std::stringstream ss;
ss << " WalAddition: log_number: " << kLogNumbers[i] ss << " WalAddition: log_number: " << kLogNumbers[i]
<< " size_in_bytes: " << kSizeInBytes[i] << "\n"; << " synced_size_in_bytes: " << kSizeInBytes[i] << " closed: 0\n";
expected_str += ss.str(); expected_str += ss.str();
} }
expected_str += " ColumnFamily: 0\n}\n"; expected_str += " ColumnFamily: 0\n}\n";
@ -460,7 +464,8 @@ TEST_F(VersionEditTest, AddWalDebug) {
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
std::stringstream ss; std::stringstream ss;
ss << "{\"LogNumber\": " << kLogNumbers[i] << ", " ss << "{\"LogNumber\": " << kLogNumbers[i] << ", "
<< "\"SizeInBytes\": " << kSizeInBytes[i] << "}"; << "\"SyncedSizeInBytes\": " << kSizeInBytes[i] << ", "
<< "\"Closed\": 0}";
if (i < n - 1) ss << ", "; if (i < n - 1) ss << ", ";
expected_json += ss.str(); expected_json += ss.str();
} }

@ -14,9 +14,13 @@ namespace ROCKSDB_NAMESPACE {
void WalAddition::EncodeTo(std::string* dst) const { void WalAddition::EncodeTo(std::string* dst) const {
PutVarint64(dst, number_); PutVarint64(dst, number_);
if (metadata_.HasSize()) { if (metadata_.HasSyncedSize()) {
PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kSize)); PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kSyncedSize));
PutVarint64(dst, metadata_.GetSizeInBytes()); PutVarint64(dst, metadata_.GetSyncedSizeInBytes());
}
if (metadata_.IsClosed()) {
PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kClosed));
} }
PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kTerminate)); PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kTerminate));
@ -36,12 +40,16 @@ Status WalAddition::DecodeFrom(Slice* src) {
} }
WalAdditionTag tag = static_cast<WalAdditionTag>(tag_value); WalAdditionTag tag = static_cast<WalAdditionTag>(tag_value);
switch (tag) { switch (tag) {
case WalAdditionTag::kSize: { case WalAdditionTag::kSyncedSize: {
uint64_t size = 0; uint64_t size = 0;
if (!GetVarint64(src, &size)) { if (!GetVarint64(src, &size)) {
return Status::Corruption(class_name, "Error decoding WAL file size"); return Status::Corruption(class_name, "Error decoding WAL file size");
} }
metadata_.SetSizeInBytes(size); metadata_.SetSyncedSizeInBytes(size);
break;
}
case WalAdditionTag::kClosed: {
metadata_.SetClosed();
break; break;
} }
// TODO: process future tags such as checksum. // TODO: process future tags such as checksum.
@ -57,14 +65,16 @@ Status WalAddition::DecodeFrom(Slice* src) {
} }
JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal) { JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal) {
jw << "LogNumber" << wal.GetLogNumber() << "SizeInBytes" jw << "LogNumber" << wal.GetLogNumber() << "SyncedSizeInBytes"
<< wal.GetMetadata().GetSizeInBytes(); << wal.GetMetadata().GetSyncedSizeInBytes() << "Closed"
<< wal.GetMetadata().IsClosed();
return jw; return jw;
} }
std::ostream& operator<<(std::ostream& os, const WalAddition& wal) { std::ostream& operator<<(std::ostream& os, const WalAddition& wal) {
os << "log_number: " << wal.GetLogNumber() os << "log_number: " << wal.GetLogNumber()
<< " size_in_bytes: " << wal.GetMetadata().GetSizeInBytes(); << " synced_size_in_bytes: " << wal.GetMetadata().GetSyncedSizeInBytes()
<< " closed: " << wal.GetMetadata().IsClosed();
return os; return os;
} }
@ -106,26 +116,33 @@ std::string WalDeletion::DebugString() const {
Status WalSet::AddWal(const WalAddition& wal) { Status WalSet::AddWal(const WalAddition& wal) {
auto it = wals_.lower_bound(wal.GetLogNumber()); auto it = wals_.lower_bound(wal.GetLogNumber());
if (wal.GetMetadata().HasSize()) { bool existing = it != wals_.end() && it->first == wal.GetLogNumber();
// The WAL must exist without size. if (wal.GetMetadata().IsClosed()) {
if (it == wals_.end() || it->first != wal.GetLogNumber()) { // The WAL must exist and not closed.
if (!existing) {
std::stringstream ss; std::stringstream ss;
ss << "WAL " << wal.GetLogNumber() << " is not created before closing"; ss << "WAL " << wal.GetLogNumber() << " is not created before closing";
return Status::Corruption("WalSet", ss.str()); return Status::Corruption("WalSet", ss.str());
} }
if (it->second.HasSize()) { if (it->second.IsClosed()) {
std::stringstream ss; std::stringstream ss;
ss << "WAL " << wal.GetLogNumber() << " is closed more than once"; ss << "WAL " << wal.GetLogNumber() << " is closed more than once";
return Status::Corruption("WalSet", ss.str()); return Status::Corruption("WalSet", ss.str());
} }
}
// If the WAL has synced size, it must >= the previous size.
if (existing && it->second.HasSyncedSize() &&
(!wal.GetMetadata().HasSyncedSize() ||
wal.GetMetadata().GetSyncedSizeInBytes() <
it->second.GetSyncedSizeInBytes())) {
std::stringstream ss;
ss << "WAL " << wal.GetLogNumber()
<< " must not have smaller synced size than previous one";
return Status::Corruption("WalSet", ss.str());
}
if (existing) {
it->second = wal.GetMetadata(); it->second = wal.GetMetadata();
} else { } else {
// The WAL must not exist beforehand.
if (it != wals_.end() && it->first == wal.GetLogNumber()) {
std::stringstream ss;
ss << "WAL " << wal.GetLogNumber() << " is created more than once";
return Status::Corruption("WalSet", ss.str());
}
wals_.insert(it, {wal.GetLogNumber(), wal.GetMetadata()}); wals_.insert(it, {wal.GetLogNumber(), wal.GetMetadata()});
} }
return Status::OK(); return Status::OK();
@ -150,7 +167,7 @@ Status WalSet::DeleteWal(const WalDeletion& wal) {
ss << "WAL " << wal.GetLogNumber() << " must exist before deletion"; ss << "WAL " << wal.GetLogNumber() << " must exist before deletion";
return Status::Corruption("WalSet", ss.str()); return Status::Corruption("WalSet", ss.str());
} }
if (!it->second.HasSize()) { if (!it->second.IsClosed()) {
std::stringstream ss; std::stringstream ss;
ss << "WAL " << wal.GetLogNumber() << " must be closed before deletion"; ss << "WAL " << wal.GetLogNumber() << " must be closed before deletion";
return Status::Corruption("WalSet", ss.str()); return Status::Corruption("WalSet", ss.str());

@ -4,6 +4,8 @@
// (found in the LICENSE.Apache file in the root directory). // (found in the LICENSE.Apache file in the root directory).
// WAL related classes used in VersionEdit and VersionSet. // WAL related classes used in VersionEdit and VersionSet.
// Modifications to WalAddition and WalDeletion may need to update
// VersionEdit and its related tests.
#pragma once #pragma once
@ -13,6 +15,7 @@
#include <vector> #include <vector>
#include "logging/event_logger.h" #include "logging/event_logger.h"
#include "port/port.h"
#include "rocksdb/rocksdb_namespace.h" #include "rocksdb/rocksdb_namespace.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -28,28 +31,39 @@ class WalMetadata {
public: public:
WalMetadata() = default; WalMetadata() = default;
explicit WalMetadata(uint64_t size_bytes) : size_bytes_(size_bytes) {} explicit WalMetadata(uint64_t synced_size_bytes)
: synced_size_bytes_(synced_size_bytes) {}
bool HasSize() const { return size_bytes_ != kUnknownWalSize; } bool IsClosed() const { return closed_; }
void SetSizeInBytes(uint64_t bytes) { size_bytes_ = bytes; } void SetClosed() { closed_ = true; }
uint64_t GetSizeInBytes() const { return size_bytes_; } bool HasSyncedSize() const { return synced_size_bytes_ != kUnknownWalSize; }
void SetSyncedSizeInBytes(uint64_t bytes) { synced_size_bytes_ = bytes; }
uint64_t GetSyncedSizeInBytes() const { return synced_size_bytes_; }
private: private:
// The size of WAL is unknown, used when the WAL is not closed yet. // The size of WAL is unknown, used when the WAL is not synced yet or is
constexpr static uint64_t kUnknownWalSize = 0; // empty.
constexpr static uint64_t kUnknownWalSize = port::kMaxUint64;
// Size of the most recently synced WAL in bytes.
uint64_t synced_size_bytes_ = kUnknownWalSize;
// Size of a closed WAL in bytes. // Whether the WAL is closed.
uint64_t size_bytes_ = kUnknownWalSize; bool closed_ = false;
}; };
// These tags are persisted to MANIFEST, so it's part of the user API. // These tags are persisted to MANIFEST, so it's part of the user API.
enum class WalAdditionTag : uint32_t { enum class WalAdditionTag : uint32_t {
// Indicates that there are no more tags. // Indicates that there are no more tags.
kTerminate = 1, kTerminate = 1,
// Size in bytes. // Synced Size in bytes.
kSize = 2, kSyncedSize = 2,
// Whether the WAL is closed.
kClosed = 3,
// Add tags in the future, such as checksum? // Add tags in the future, such as checksum?
}; };
@ -117,15 +131,15 @@ using WalDeletions = std::vector<WalDeletion>;
class WalSet { class WalSet {
public: public:
// Add WAL(s). // Add WAL(s).
// If the WAL has size, it means the WAL is closed, // If the WAL is closed,
// then there must be an existing WAL without size that is added // then there must be an existing unclosed WAL,
// when creating the WAL, otherwise, return Status::Corruption. // otherwise, return Status::Corruption.
// Can happen when applying a VersionEdit or recovering from MANIFEST. // Can happen when applying a VersionEdit or recovering from MANIFEST.
Status AddWal(const WalAddition& wal); Status AddWal(const WalAddition& wal);
Status AddWals(const WalAdditions& wals); Status AddWals(const WalAdditions& wals);
// Delete WAL(s). // Delete WAL(s).
// The WAL to be deleted must exist, otherwise, // The WAL to be deleted must exist and be closed, otherwise,
// return Status::Corruption. // return Status::Corruption.
// Can happen when applying a VersionEdit or recovering from MANIFEST. // Can happen when applying a VersionEdit or recovering from MANIFEST.
Status DeleteWal(const WalDeletion& wal); Status DeleteWal(const WalDeletion& wal);

@ -24,7 +24,9 @@ TEST(WalSet, AddDeleteReset) {
// Close WAL 1 - 5. // Close WAL 1 - 5.
for (WalNumber log_number = 1; log_number <= 5; log_number++) { for (WalNumber log_number = 1; log_number <= 5; log_number++) {
wals.AddWal(WalAddition(log_number, WalMetadata(100))); WalMetadata wal(100);
wal.SetClosed();
wals.AddWal(WalAddition(log_number, wal));
} }
ASSERT_EQ(wals.GetWals().size(), 10); ASSERT_EQ(wals.GetWals().size(), 10);
@ -49,20 +51,23 @@ TEST(WalSet, Overwrite) {
constexpr uint64_t kBytes = 200; constexpr uint64_t kBytes = 200;
WalSet wals; WalSet wals;
wals.AddWal(WalAddition(kNumber)); wals.AddWal(WalAddition(kNumber));
ASSERT_FALSE(wals.GetWals().at(kNumber).HasSize()); ASSERT_FALSE(wals.GetWals().at(kNumber).HasSyncedSize());
wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes))); wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes)));
ASSERT_TRUE(wals.GetWals().at(kNumber).HasSize()); ASSERT_TRUE(wals.GetWals().at(kNumber).HasSyncedSize());
ASSERT_EQ(wals.GetWals().at(kNumber).GetSizeInBytes(), kBytes); ASSERT_EQ(wals.GetWals().at(kNumber).GetSyncedSizeInBytes(), kBytes);
} }
TEST(WalSet, CreateTwice) { TEST(WalSet, SmallerSyncedSize) {
constexpr WalNumber kNumber = 100; constexpr WalNumber kNumber = 100;
constexpr uint64_t kBytes = 100;
WalSet wals; WalSet wals;
ASSERT_OK(wals.AddWal(WalAddition(kNumber))); ASSERT_OK(wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes))));
Status s = wals.AddWal(WalAddition(kNumber)); Status s = wals.AddWal(WalAddition(kNumber, WalMetadata(0)));
ASSERT_TRUE(s.IsCorruption()); ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("WAL 100 is created more than once") != ASSERT_TRUE(
std::string::npos); s.ToString().find(
"WAL 100 must not have smaller synced size than previous one") !=
std::string::npos);
} }
TEST(WalSet, CloseTwice) { TEST(WalSet, CloseTwice) {
@ -70,8 +75,10 @@ TEST(WalSet, CloseTwice) {
constexpr uint64_t kBytes = 200; constexpr uint64_t kBytes = 200;
WalSet wals; WalSet wals;
ASSERT_OK(wals.AddWal(WalAddition(kNumber))); ASSERT_OK(wals.AddWal(WalAddition(kNumber)));
ASSERT_OK(wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes)))); WalMetadata wal(kBytes);
Status s = wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes))); wal.SetClosed();
ASSERT_OK(wals.AddWal(WalAddition(kNumber, wal)));
Status s = wals.AddWal(WalAddition(kNumber, wal));
ASSERT_TRUE(s.IsCorruption()); ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("WAL 100 is closed more than once") != ASSERT_TRUE(s.ToString().find("WAL 100 is closed more than once") !=
std::string::npos); std::string::npos);
@ -81,7 +88,9 @@ TEST(WalSet, CloseBeforeCreate) {
constexpr WalNumber kNumber = 100; constexpr WalNumber kNumber = 100;
constexpr uint64_t kBytes = 200; constexpr uint64_t kBytes = 200;
WalSet wals; WalSet wals;
Status s = wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes))); WalMetadata wal(kBytes);
wal.SetClosed();
Status s = wals.AddWal(WalAddition(kNumber, wal));
ASSERT_TRUE(s.IsCorruption()); ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("WAL 100 is not created before closing") != ASSERT_TRUE(s.ToString().find("WAL 100 is not created before closing") !=
std::string::npos); std::string::npos);
@ -92,11 +101,15 @@ TEST(WalSet, CreateAfterClose) {
constexpr uint64_t kBytes = 200; constexpr uint64_t kBytes = 200;
WalSet wals; WalSet wals;
ASSERT_OK(wals.AddWal(WalAddition(kNumber))); ASSERT_OK(wals.AddWal(WalAddition(kNumber)));
ASSERT_OK(wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes)))); WalMetadata wal(kBytes);
wal.SetClosed();
ASSERT_OK(wals.AddWal(WalAddition(kNumber, wal)));
Status s = wals.AddWal(WalAddition(kNumber)); Status s = wals.AddWal(WalAddition(kNumber));
ASSERT_TRUE(s.IsCorruption()); ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("WAL 100 is created more than once") != ASSERT_TRUE(
std::string::npos); s.ToString().find(
"WAL 100 must not have smaller synced size than previous one") !=
std::string::npos);
} }
TEST(WalSet, DeleteNonExistingWal) { TEST(WalSet, DeleteNonExistingWal) {

Loading…
Cancel
Save