rocksdb/db/wal_edit.cc


Define WAL related classes to be used in VersionEdit and VersionSet (#7164)

Summary:
`WalAddition` and `WalDeletion` are defined in `wal_version.h` and used in `VersionEdit`.

`WalAddition` represents the event of creating a new WAL (no size, just the log number) or closing a WAL (with size).
`WalDeletion` represents the event of deleting or archiving a WAL; it means the WAL is no longer alive (it won't be replayed during recovery).
`WalSet` is the set of alive WALs kept in `VersionSet`.

1. Why use `WalDeletion` instead of relying on `MinLogNumber` to identify outdated WALs

On recovery, we can compute `MinLogNumber()` from the log numbers kept in MANIFEST; any log with a number < MinLogNumber can be ignored. So it seems that we don't need to persist `WalDeletion` to MANIFEST, since we can ignore the WALs based on MinLogNumber.

But `MinLogNumber()` is only a lower bound; it does not mean that every log starting from MinLogNumber must exist. This is because of a corner case: when a column family is empty and never flushed, its log number is set to the largest log number, but that number is not persisted in MANIFEST. Say there are 2 column families. When the DB is created, the first WAL has log number 1, so it is persisted to MANIFEST for both column families. Then CF 0 stays empty and is never flushed, while CF 1 is updated and flushed, so a new WAL with log number 2 is created and persisted to MANIFEST for CF 1. But CF 0's log number in MANIFEST is still 1. So on recovery, MinLogNumber is 1, but since log 1 only contains data for CF 1, and CF 1 is flushed, log 1 might have already been deleted from disk.

We could make `MinLogNumber()` the exact minimum log number that must exist by persisting the most recent log number for empty column families that are not flushed. But if there are N such column families, then every time a new WAL is created, we need to add N records to MANIFEST.
In the current design, a record is persisted to MANIFEST only when a WAL is created, closed, or deleted/archived, so the number of WAL-related records is bounded by 3x the number of WALs.

2. Why keep `WalSet` in `VersionSet` instead of applying the `VersionEdit`s to `VersionStorageInfo`

`VersionEdit`s were originally designed to track the addition and deletion of SST files. SST files belong to column families: each column family has a list of `Version`s, and each `Version` keeps the set of active SST files in `VersionStorageInfo`. But WALs are a DB-level concept; they are not bound to specific column families. So logically it does not make sense to store WALs in a column family's `Version`s.

Also, the purpose of a `Version` is to keep references to SST / blob files so that they are not deleted until no version references them. But a WAL is deleted regardless of version references.

So we keep the WALs in `VersionSet` for the purpose of writing out a snapshot of the DB state when creating new MANIFESTs.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7164

Test Plan:
make version_edit_test && ./version_edit_test
make wal_edit_test && ./wal_edit_test

Reviewed By: ltamasi

Differential Revision: D22677936

Pulled By: cheng-chang

fbshipit-source-id: 5a3b6890140e572ffd79eb37e6e4c3c32361a859
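The commit message above describes a three-step WAL lifecycle (created without a size, closed with a size, then deleted or archived), which is why at most three MANIFEST records exist per WAL. The following is a minimal sketch of those ordering rules only. `ToyWalSet`, `ToyWalMeta`, and `DemoLifecycle` are hypothetical names invented for illustration; they are not RocksDB classes, and the string return values stand in for `Status`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical stand-in for per-WAL metadata; not the RocksDB type.
struct ToyWalMeta {
  bool has_size = false;  // false while the WAL is still open
  uint64_t size = 0;      // meaningful only when has_size is true
};

// Hypothetical stand-in for WalSet: log number -> metadata of alive WALs.
class ToyWalSet {
 public:
  // Each method returns "" on success or a short error description.
  std::string Create(uint64_t log) {
    if (wals_.count(log) != 0) return "created more than once";
    wals_[log] = ToyWalMeta();
    return "";
  }

  std::string Close(uint64_t log, uint64_t size) {
    auto it = wals_.find(log);
    if (it == wals_.end()) return "not created before closing";
    if (it->second.has_size) return "closed more than once";
    it->second.has_size = true;
    it->second.size = size;
    return "";
  }

  std::string Delete(uint64_t log) {
    auto it = wals_.find(log);
    if (it == wals_.end()) return "must exist before deletion";
    if (!it->second.has_size) return "must be closed before deletion";
    wals_.erase(it);
    return "";
  }

  size_t Count() const { return wals_.size(); }

 private:
  std::map<uint64_t, ToyWalMeta> wals_;
};

// Walks one WAL through create, close, delete and returns how many WALs
// are still alive afterwards.
inline size_t DemoLifecycle() {
  ToyWalSet set;
  assert(set.Create(1).empty());
  assert(set.Delete(1) == "must be closed before deletion");
  assert(set.Close(1, 4096).empty());
  assert(set.Delete(1).empty());
  return set.Count();
}
```

Calling `DemoLifecycle()` from a test or a `main` exercises one full lifecycle; any step taken out of order is rejected with the corresponding message instead of silently changing the set.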
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/wal_edit.h"

#include <sstream>

#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "util/coding.h"

namespace ROCKSDB_NAMESPACE {

// Serialized layout: <log number> [kSize <size in bytes>] kTerminate.
void WalAddition::EncodeTo(std::string* dst) const {
  PutVarint64(dst, number_);

  if (metadata_.HasSize()) {
    PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kSize));
    PutVarint64(dst, metadata_.GetSizeInBytes());
  }

  PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kTerminate));
}

Status WalAddition::DecodeFrom(Slice* src) {
  constexpr char class_name[] = "WalAddition";

  if (!GetVarint64(src, &number_)) {
    return Status::Corruption(class_name, "Error decoding WAL log number");
  }

  while (true) {
    uint32_t tag_value = 0;
    if (!GetVarint32(src, &tag_value)) {
      return Status::Corruption(class_name, "Error decoding tag");
    }
    WalAdditionTag tag = static_cast<WalAdditionTag>(tag_value);
    switch (tag) {
      case WalAdditionTag::kSize: {
        uint64_t size = 0;
        if (!GetVarint64(src, &size)) {
          return Status::Corruption(class_name, "Error decoding WAL file size");
        }
        metadata_.SetSizeInBytes(size);
        break;
      }
      // TODO: process future tags such as checksum.
      case WalAdditionTag::kTerminate:
        return Status::OK();
      default: {
        std::stringstream ss;
        ss << "Unknown tag " << tag_value;
        return Status::Corruption(class_name, ss.str());
      }
    }
  }
}

JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal) {
  jw << "LogNumber" << wal.GetLogNumber() << "SizeInBytes"
     << wal.GetMetadata().GetSizeInBytes();
  return jw;
}

std::ostream& operator<<(std::ostream& os, const WalAddition& wal) {
  os << "log_number: " << wal.GetLogNumber()
     << " size_in_bytes: " << wal.GetMetadata().GetSizeInBytes();
  return os;
}

std::string WalAddition::DebugString() const {
  std::ostringstream oss;
  oss << *this;
  return oss.str();
}

void WalDeletion::EncodeTo(std::string* dst) const {
  PutVarint64(dst, number_);
}

Status WalDeletion::DecodeFrom(Slice* src) {
  constexpr char class_name[] = "WalDeletion";

  if (!GetVarint64(src, &number_)) {
    return Status::Corruption(class_name, "Error decoding WAL log number");
  }

  return Status::OK();
}

JSONWriter& operator<<(JSONWriter& jw, const WalDeletion& wal) {
  jw << "LogNumber" << wal.GetLogNumber();
  return jw;
}

std::ostream& operator<<(std::ostream& os, const WalDeletion& wal) {
  os << "log_number: " << wal.GetLogNumber();
  return os;
}

std::string WalDeletion::DebugString() const {
  std::ostringstream oss;
  oss << *this;
  return oss.str();
}

Status WalSet::AddWal(const WalAddition& wal) {
  auto it = wals_.lower_bound(wal.GetLogNumber());
  if (wal.GetMetadata().HasSize()) {
    // An addition with a size closes a WAL, so the WAL must already exist
    // and must not have a size yet.
    if (it == wals_.end() || it->first != wal.GetLogNumber()) {
      std::stringstream ss;
      ss << "WAL " << wal.GetLogNumber() << " is not created before closing";
      return Status::Corruption("WalSet", ss.str());
    }
    if (it->second.HasSize()) {
      std::stringstream ss;
      ss << "WAL " << wal.GetLogNumber() << " is closed more than once";
      return Status::Corruption("WalSet", ss.str());
    }
    it->second = wal.GetMetadata();
  } else {
    // An addition without a size creates a WAL, so the WAL must not exist
    // beforehand.
    if (it != wals_.end() && it->first == wal.GetLogNumber()) {
      std::stringstream ss;
      ss << "WAL " << wal.GetLogNumber() << " is created more than once";
      return Status::Corruption("WalSet", ss.str());
    }
    wals_[wal.GetLogNumber()] = wal.GetMetadata();
  }
  return Status::OK();
}

Status WalSet::AddWals(const WalAdditions& wals) {
  Status s;
  for (const WalAddition& wal : wals) {
    s = AddWal(wal);
    if (!s.ok()) {
      break;
    }
  }
  return s;
}

Status WalSet::DeleteWal(const WalDeletion& wal) {
  auto it = wals_.lower_bound(wal.GetLogNumber());
  // The WAL must exist and must have been closed.
  if (it == wals_.end() || it->first != wal.GetLogNumber()) {
    std::stringstream ss;
    ss << "WAL " << wal.GetLogNumber() << " must exist before deletion";
    return Status::Corruption("WalSet", ss.str());
  }
  if (!it->second.HasSize()) {
    std::stringstream ss;
    ss << "WAL " << wal.GetLogNumber() << " must be closed before deletion";
    return Status::Corruption("WalSet", ss.str());
  }
  wals_.erase(it);
  return Status::OK();
}

Status WalSet::DeleteWals(const WalDeletions& wals) {
  Status s;
  for (const WalDeletion& wal : wals) {
    s = DeleteWal(wal);
    if (!s.ok()) {
      break;
    }
  }
  return s;
}

void WalSet::Reset() { wals_.clear(); }

} // namespace ROCKSDB_NAMESPACE
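
The `WalAddition` encoding in this file is a log number followed by optional tagged fields and a terminating tag, which lets later versions add fields such as a checksum. The sketch below reproduces that shape in standalone form so the byte layout can be inspected without building RocksDB. Everything prefixed with `Toy` is hypothetical: the tag values are placeholders rather than the values from `wal_edit.h`, and the varint helpers are a plain base-128 implementation written for this example, not the functions from `util/coding.h`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Placeholder tag values for this sketch only.
enum ToyTag : uint32_t { kToyTerminate = 1, kToySize = 2 };

// Appends v as a base-128 varint: 7 payload bits per byte, with the high
// bit set on every byte except the last.
inline void ToyPutVarint(std::string* dst, uint64_t v) {
  while (v >= 0x80) {
    dst->push_back(static_cast<char>((v & 0x7f) | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}

// Reads one varint starting at *pos and advances *pos past it.
// Returns false if the input ends before the varint does.
inline bool ToyGetVarint(const std::string& src, size_t* pos, uint64_t* v) {
  uint64_t result = 0;
  for (int shift = 0; shift <= 63 && *pos < src.size(); shift += 7) {
    uint64_t byte = static_cast<unsigned char>(src[(*pos)++]);
    result |= (byte & 0x7f) << shift;
    if ((byte & 0x80) == 0) {
      *v = result;
      return true;
    }
  }
  return false;
}

struct ToyWalAddition {
  uint64_t number = 0;
  bool has_size = false;
  uint64_t size = 0;
};

// Layout: <number> [kToySize <size>] kToyTerminate.
inline std::string ToyEncode(const ToyWalAddition& wal) {
  std::string dst;
  ToyPutVarint(&dst, wal.number);
  if (wal.has_size) {
    ToyPutVarint(&dst, kToySize);
    ToyPutVarint(&dst, wal.size);
  }
  ToyPutVarint(&dst, kToyTerminate);
  return dst;
}

// Returns false on truncated input or an unknown tag.
inline bool ToyDecode(const std::string& src, ToyWalAddition* wal) {
  size_t pos = 0;
  if (!ToyGetVarint(src, &pos, &wal->number)) return false;
  while (true) {
    uint64_t tag = 0;
    if (!ToyGetVarint(src, &pos, &tag)) return false;
    if (tag == kToyTerminate) return true;
    if (tag != kToySize) return false;
    if (!ToyGetVarint(src, &pos, &wal->size)) return false;
    wal->has_size = true;
  }
}
```

Because the decoder loops until it sees the terminating tag, a record for a newly created WAL (no size) and a record for a closed WAL (with size) share one format, and a truncated record is reported as an error rather than being read as a shorter valid one.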