Track WAL in MANIFEST: persist WALs to and recover WALs from MANIFEST (#7256)

Summary:
This PR makes it able to `LogAndApply` `VersionEdit`s related to WALs, and also be able to `Recover` from MANIFEST with WAL related `VersionEdit`s.

The `VersionEdit`s related to WAL are treated similarly as those related to column family operations, they are not applied to versions, but can be in a commit group. Mixing WAL related `VersionEdit`s with other types of edits will make logic in `ProcessManifestWrite` more complicated, so `VersionEdit`s related to WAL can either be WAL additions or deletions, like column family add and drop.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7256

Test Plan: a set of unit tests are added in `version_set_test.cc`

Reviewed By: riversand963

Differential Revision: D23123238

Pulled By: cheng-chang

fbshipit-source-id: 246be2ed4744fd03fa2738aba408aaa611d0379c
main
Cheng Chang 4 years ago committed by Facebook GitHub Bot
parent a16d1b2fd3
commit 1b224324b5
  1. 147
      db/db_impl/db_impl_open.cc
  2. 13
      db/version_edit.h
  3. 14
      db/version_edit_handler.cc
  4. 4
      db/version_edit_handler.h
  5. 13
      db/version_edit_test.cc
  6. 97
      db/version_set.cc
  7. 15
      db/version_set.h
  8. 503
      db/version_set_test.cc
  9. 51
      db/wal_edit.cc
  10. 18
      db/wal_edit.h
  11. 57
      db/wal_edit_test.cc

@ -536,7 +536,7 @@ Status DBImpl::Recover(
std::vector<std::string> files_in_wal_dir;
if (s.ok()) {
// Initial max_total_in_memory_state_ before recovery logs. Log recovery
// Initial max_total_in_memory_state_ before recovery wals. Log recovery
// may check this value to decide whether to flush.
max_total_in_memory_state_ = 0;
for (auto cfd : *versions_->GetColumnFamilySet()) {
@ -571,7 +571,7 @@ Status DBImpl::Recover(
return s;
}
std::vector<uint64_t> logs;
std::unordered_map<uint64_t, std::string> wal_files;
for (const auto& file : files_in_wal_dir) {
uint64_t number;
FileType type;
@ -582,21 +582,40 @@ Status DBImpl::Recover(
"existing log file: ",
file);
} else {
logs.push_back(number);
wal_files[number] =
LogFileName(immutable_db_options_.wal_dir, number);
}
}
}
if (logs.size() > 0) {
if (immutable_db_options_.track_and_verify_wals_in_manifest) {
// Verify WALs in MANIFEST.
s = versions_->GetWalSet().CheckWals(env_, wal_files);
} else if (!versions_->GetWalSet().GetWals().empty()) {
// Tracking is disabled, clear previously tracked WALs from MANIFEST,
// otherwise, in the future, if WAL tracking is enabled again,
// since the WALs deleted when WAL tracking is disabled are not persisted
// into MANIFEST, WAL check may fail.
VersionEdit edit;
for (const auto& wal : versions_->GetWalSet().GetWals()) {
WalNumber number = wal.first;
edit.DeleteWal(number);
}
s = versions_->LogAndApplyToDefaultColumnFamily(&edit, &mutex_);
}
if (!s.ok()) {
return s;
}
if (!wal_files.empty()) {
if (error_if_wal_file_exists) {
return Status::Corruption(
"The db was opened in readonly mode with error_if_wal_file_exists"
"flag but a WAL file already exists");
} else if (error_if_data_exists_in_wals) {
for (auto& log : logs) {
std::string fname = LogFileName(immutable_db_options_.wal_dir, log);
for (auto& wal_file : wal_files) {
uint64_t bytes;
s = env_->GetFileSize(fname, &bytes);
s = env_->GetFileSize(wal_file.second, &bytes);
if (s.ok()) {
if (bytes > 0) {
return Status::Corruption(
@ -608,13 +627,19 @@ Status DBImpl::Recover(
}
}
if (!logs.empty()) {
// Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end());
bool corrupted_log_found = false;
s = RecoverLogFiles(logs, &next_sequence, read_only,
&corrupted_log_found);
if (corrupted_log_found && recovered_seq != nullptr) {
if (!wal_files.empty()) {
// Recover in the order in which the wals were generated
std::vector<uint64_t> wals;
wals.reserve(wal_files.size());
for (const auto& wal_file : wal_files) {
wals.push_back(wal_file.first);
}
std::sort(wals.begin(), wals.end());
bool corrupted_wal_found = false;
s = RecoverLogFiles(wals, &next_sequence, read_only,
&corrupted_wal_found);
if (corrupted_wal_found && recovered_seq != nullptr) {
*recovered_seq = next_sequence;
}
if (!s.ok()) {
@ -767,10 +792,10 @@ Status DBImpl::InitPersistStatsColumnFamily() {
return s;
}
// REQUIRES: log_numbers are sorted in ascending order
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// REQUIRES: wal_numbers are sorted in ascending order
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
SequenceNumber* next_sequence, bool read_only,
bool* corrupted_log_found) {
bool* corrupted_wal_found) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
@ -800,10 +825,10 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
auto stream = event_logger_.Log();
stream << "job" << job_id << "event"
<< "recovery_started";
stream << "log_files";
stream << "wal_files";
stream.StartArray();
for (auto log_number : log_numbers) {
stream << log_number;
for (auto wal_number : wal_numbers) {
stream << wal_number;
}
stream.EndArray();
}
@ -826,25 +851,25 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
bool stop_replay_by_wal_filter = false;
bool stop_replay_for_corruption = false;
bool flushed = false;
uint64_t corrupted_log_number = kMaxSequenceNumber;
uint64_t min_log_number = MinLogNumberToKeep();
for (auto log_number : log_numbers) {
if (log_number < min_log_number) {
uint64_t corrupted_wal_number = kMaxSequenceNumber;
uint64_t min_wal_number = MinLogNumberToKeep();
for (auto wal_number : wal_numbers) {
if (wal_number < min_wal_number) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Skipping log #%" PRIu64
" since it is older than min log to keep #%" PRIu64,
log_number, min_log_number);
wal_number, min_wal_number);
continue;
}
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(log_number);
versions_->MarkFileNumberUsed(wal_number);
// Open the log file
std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);
std::string fname = LogFileName(immutable_db_options_.wal_dir, wal_number);
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Recovering log #%" PRIu64 " mode %d", log_number,
"Recovering log #%" PRIu64 " mode %d", wal_number,
static_cast<int>(immutable_db_options_.wal_recovery_mode));
auto logFileDropped = [this, &fname]() {
uint64_t bytes;
@ -897,7 +922,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// to be skipped instead of propagating bad information (like overly
// large sequence numbers).
log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
&reporter, true /*checksum*/, log_number);
&reporter, true /*checksum*/, wal_number);
// Determine if we should tolerate incomplete records at the tail end of the
// Read all the records and add to a memtable
@ -945,7 +970,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
WalFilter::WalProcessingOption wal_processing_option =
immutable_db_options_.wal_filter->LogRecordFound(
log_number, fname, batch, &new_batch, &batch_changed);
wal_number, fname, batch, &new_batch, &batch_changed);
switch (wal_processing_option) {
case WalFilter::WalProcessingOption::kContinueProcessing:
@ -997,7 +1022,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
" mode %d log filter %s returned "
"more records (%d) than original (%d) which is not allowed. "
"Aborting recovery.",
log_number,
wal_number,
static_cast<int>(immutable_db_options_.wal_recovery_mode),
immutable_db_options_.wal_filter->Name(), new_count,
original_count);
@ -1024,7 +1049,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
bool has_valid_writes = false;
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), &flush_scheduler_,
&trim_history_scheduler_, true, log_number, this,
&trim_history_scheduler_, true, wal_number, this,
false /* concurrent_memtable_writes */, next_sequence,
&has_valid_writes, seq_per_batch_, batch_per_txn_);
MaybeIgnoreError(&status);
@ -1044,7 +1069,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
cfd->UnrefAndTryDelete();
// If this asserts, it means that InsertInto failed in
// filtering updates to already-flushed column families
assert(cfd->GetLogNumber() <= log_number);
assert(cfd->GetLogNumber() <= wal_number);
auto iter = version_edits.find(cfd->GetID());
assert(iter != version_edits.end());
VersionEdit* edit = &iter->second;
@ -1081,21 +1106,21 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
" seq #%" PRIu64
". %s. This likely mean loss of synced WAL, "
"thus recovery fails.",
log_number, *next_sequence,
wal_number, *next_sequence,
status.ToString().c_str());
return status;
}
// We should ignore the error but not continue replaying
status = Status::OK();
stop_replay_for_corruption = true;
corrupted_log_number = log_number;
if (corrupted_log_found != nullptr) {
*corrupted_log_found = true;
corrupted_wal_number = wal_number;
if (corrupted_wal_found != nullptr) {
*corrupted_wal_found = true;
}
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Point in time recovered to log #%" PRIu64
" seq #%" PRIu64,
log_number, *next_sequence);
wal_number, *next_sequence);
} else {
assert(immutable_db_options_.wal_recovery_mode ==
WALRecoveryMode::kTolerateCorruptedTailRecords ||
@ -1121,7 +1146,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// corruption. This could during PIT recovery when the WAL is corrupted and
// some (but not all) CFs are flushed
// Exclude the PIT case where no log is dropped after the corruption point.
// This is to cover the case for empty logs after corrupted log, in which we
// This is to cover the case for empty wals after corrupted log, in which we
// don't reset stop_replay_for_corruption.
if (stop_replay_for_corruption == true &&
(immutable_db_options_.wal_recovery_mode ==
@ -1129,7 +1154,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
immutable_db_options_.wal_recovery_mode ==
WALRecoveryMode::kTolerateCorruptedTailRecords)) {
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->GetLogNumber() > corrupted_log_number) {
if (cfd->GetLogNumber() > corrupted_wal_number) {
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
"Column family inconsistency: SST file contains data"
" beyond the point of corruption.");
@ -1144,16 +1169,16 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
if (!read_only) {
// no need to refcount since client still doesn't have access
// to the DB and can not drop column families while we iterate
auto max_log_number = log_numbers.back();
auto max_wal_number = wal_numbers.back();
for (auto cfd : *versions_->GetColumnFamilySet()) {
auto iter = version_edits.find(cfd->GetID());
assert(iter != version_edits.end());
VersionEdit* edit = &iter->second;
if (cfd->GetLogNumber() > max_log_number) {
if (cfd->GetLogNumber() > max_wal_number) {
// Column family cfd has already flushed the data
// from all logs. Memtable has to be empty because
// we filter the updates based on log_number
// from all wals. Memtable has to be empty because
// we filter the updates based on wal_number
// (in WriteBatch::InsertInto)
assert(cfd->mem()->GetFirstSequenceNumber() == 0);
assert(edit->NumEntries() == 0);
@ -1185,13 +1210,13 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// Update the log number info in the version edit corresponding to this
// column family. Note that the version edits will be written to MANIFEST
// together later.
// writing log_number in the manifest means that any log file
// with number strongly less than (log_number + 1) is already
// writing wal_number in the manifest means that any log file
// with number strongly less than (wal_number + 1) is already
// recovered and should be ignored on next reincarnation.
// Since we already recovered max_log_number, we want all logs
// with numbers `<= max_log_number` (includes this one) to be ignored
// Since we already recovered max_wal_number, we want all wals
// with numbers `<= max_wal_number` (includes this one) to be ignored
if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
edit->SetLogNumber(max_log_number + 1);
edit->SetLogNumber(max_wal_number + 1);
}
}
if (status.ok()) {
@ -1199,7 +1224,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// not actually used. that is because VersionSet assumes
// VersionSet::next_file_number_ always to be strictly greater than any
// log number
versions_->MarkFileNumberUsed(max_log_number + 1);
versions_->MarkFileNumberUsed(max_wal_number + 1);
autovector<ColumnFamilyData*> cfds;
autovector<const MutableCFOptions*> cf_opts;
@ -1219,7 +1244,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
}
if (status.ok() && data_seen && !flushed) {
status = RestoreAliveLogFiles(log_numbers);
status = RestoreAliveLogFiles(wal_numbers);
}
event_logger_.Log() << "job" << job_id << "event"
@ -1228,8 +1253,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
return status;
}
Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& log_numbers) {
if (log_numbers.empty()) {
Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
if (wal_numbers.empty()) {
return Status::OK();
}
Status s;
@ -1242,20 +1267,20 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& log_numbers) {
// FindObsoleteFiles()
total_log_size_ = 0;
log_empty_ = false;
for (auto log_number : log_numbers) {
LogFileNumberSize log(log_number);
std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);
// This gets the appear size of the logs, not including preallocated space.
for (auto wal_number : wal_numbers) {
LogFileNumberSize log(wal_number);
std::string fname = LogFileName(immutable_db_options_.wal_dir, wal_number);
// This gets the appear size of the wals, not including preallocated space.
s = env_->GetFileSize(fname, &log.size);
if (!s.ok()) {
break;
}
total_log_size_ += log.size;
alive_log_files_.push_back(log);
// We preallocate space for logs, but then after a crash and restart, those
// We preallocate space for wals, but then after a crash and restart, those
// preallocated space are not needed anymore. It is likely only the last
// log has such preallocated space, so we only truncate for the last log.
if (log_number == log_numbers.back()) {
if (wal_number == wal_numbers.back()) {
std::unique_ptr<FSWritableFile> last_log;
Status truncate_status = fs_->ReopenWritableFile(
fname,
@ -1272,7 +1297,7 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& log_numbers) {
// Not a critical error if fail to truncate.
if (!truncate_status.ok()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"Failed to truncate log #%" PRIu64 ": %s", log_number,
"Failed to truncate log #%" PRIu64 ": %s", wal_number,
truncate_status.ToString().c_str());
}
}
@ -1610,7 +1635,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
// In WritePrepared there could be gap in sequence numbers. This breaks
// the trick we use in kPointInTimeRecovery which assumes the first seq in
// the log right after the corrupted log is one larger than the last seq
// we read from the logs. To let this trick keep working, we add a dummy
// we read from the wals. To let this trick keep working, we add a dummy
// entry with the expected sequence to the first log right after recovery.
// In non-WritePrepared case also the new log after recovery could be
// empty, and thus missing the consecutive seq hint to distinguish

@ -453,21 +453,26 @@ class VersionEdit {
// Add a WAL (either just created or closed).
void AddWal(WalNumber number, WalMetadata metadata = WalMetadata()) {
assert(NumEntries() == wal_additions_.size());
wal_additions_.emplace_back(number, std::move(metadata));
}
// Retrieve all the added WALs.
const WalAdditions& GetWalAdditions() const { return wal_additions_; }
bool HasWalAddition() const { return !wal_additions_.empty(); }
bool IsWalAddition() const { return !wal_additions_.empty(); }
// Delete a WAL (either directly deleted or archived).
void DeleteWal(WalNumber number) { wal_deletions_.emplace_back(number); }
void DeleteWal(WalNumber number) {
assert(NumEntries() == wal_deletions_.size());
wal_deletions_.emplace_back(number);
}
// Retrieve all the deleted WALs.
const WalDeletions& GetWalDeletions() const { return wal_deletions_; }
bool HasWalDeletion() const { return !wal_deletions_.empty(); }
bool IsWalDeletion() const { return !wal_deletions_.empty(); }
bool IsWalManipulation() const { return IsWalAddition() || IsWalDeletion(); }
// Number of edits
size_t NumEntries() const {

@ -121,6 +121,10 @@ Status VersionEditHandler::ApplyVersionEdit(VersionEdit& edit,
s = OnColumnFamilyAdd(edit, cfd);
} else if (edit.is_column_family_drop_) {
s = OnColumnFamilyDrop(edit, cfd);
} else if (edit.IsWalAddition()) {
s = OnWalAddition(edit);
} else if (edit.IsWalDeletion()) {
s = OnWalDeletion(edit);
} else {
s = OnNonCfOperation(edit, cfd);
}
@ -190,6 +194,16 @@ Status VersionEditHandler::OnColumnFamilyDrop(VersionEdit& edit,
return s;
}
Status VersionEditHandler::OnWalAddition(VersionEdit& edit) {
assert(edit.IsWalAddition());
return version_set_->wals_.AddWals(edit.GetWalAdditions());
}
Status VersionEditHandler::OnWalDeletion(VersionEdit& edit) {
assert(edit.IsWalDeletion());
return version_set_->wals_.DeleteWals(edit.GetWalDeletions());
}
Status VersionEditHandler::OnNonCfOperation(VersionEdit& edit,
ColumnFamilyData** cfd) {
bool cf_in_not_found = false;

@ -56,6 +56,10 @@ class VersionEditHandler {
Status OnNonCfOperation(VersionEdit& edit, ColumnFamilyData** cfd);
Status OnWalAddition(VersionEdit& edit);
Status OnWalDeletion(VersionEdit& edit);
Status Initialize();
void CheckColumnFamilyId(const VersionEdit& edit, bool* cf_in_not_found,

@ -317,10 +317,6 @@ TEST_F(VersionEditTest, AddWalEncodeDecode) {
if (has_size) {
meta.SetSyncedSizeInBytes(rand() % 1000);
}
bool is_closed = rand() % 2 == 0;
if (is_closed) {
meta.SetClosed();
}
edit.AddWal(log_number, meta);
}
TestEncodeDecode(edit);
@ -442,7 +438,7 @@ TEST_F(VersionEditTest, AddWalDebug) {
const WalAdditions& wals = edit.GetWalAdditions();
ASSERT_TRUE(edit.HasWalAddition());
ASSERT_TRUE(edit.IsWalAddition());
ASSERT_EQ(wals.size(), n);
for (int i = 0; i < n; i++) {
const WalAddition& wal = wals[i];
@ -454,7 +450,7 @@ TEST_F(VersionEditTest, AddWalDebug) {
for (int i = 0; i < n; i++) {
std::stringstream ss;
ss << " WalAddition: log_number: " << kLogNumbers[i]
<< " synced_size_in_bytes: " << kSizeInBytes[i] << " closed: 0\n";
<< " synced_size_in_bytes: " << kSizeInBytes[i] << "\n";
expected_str += ss.str();
}
expected_str += " ColumnFamily: 0\n}\n";
@ -464,8 +460,7 @@ TEST_F(VersionEditTest, AddWalDebug) {
for (int i = 0; i < n; i++) {
std::stringstream ss;
ss << "{\"LogNumber\": " << kLogNumbers[i] << ", "
<< "\"SyncedSizeInBytes\": " << kSizeInBytes[i] << ", "
<< "\"Closed\": 0}";
<< "\"SyncedSizeInBytes\": " << kSizeInBytes[i] << "}";
if (i < n - 1) ss << ", ";
expected_json += ss.str();
}
@ -492,7 +487,7 @@ TEST_F(VersionEditTest, DeleteWalDebug) {
const WalDeletions& wals = edit.GetWalDeletions();
ASSERT_TRUE(edit.HasWalDeletion());
ASSERT_TRUE(edit.IsWalDeletion());
ASSERT_EQ(wals.size(), n);
for (int i = 0; i < n; i++) {
const WalDeletion& wal = wals[i];

@ -3674,7 +3674,19 @@ struct VersionSet::ManifestWriter {
cfd(_cfd),
mutable_cf_options(cf_options),
edit_list(e) {}
~ManifestWriter() { status.PermitUncheckedError(); }
bool IsAllWalEdits() const {
bool all_wal_edits = true;
for (const auto& e : edit_list) {
if (!e->IsWalManipulation()) {
all_wal_edits = false;
break;
}
}
return all_wal_edits;
}
};
Status AtomicGroupReadBuffer::AddEdit(VersionEdit* edit) {
@ -3826,6 +3838,7 @@ Status VersionSet::ProcessManifestWrites(
std::deque<ManifestWriter>& writers, InstrumentedMutex* mu,
FSDirectory* db_directory, bool new_descriptor_log,
const ColumnFamilyOptions* new_cf_options) {
mu->AssertHeld();
assert(!writers.empty());
ManifestWriter& first_writer = writers.front();
ManifestWriter* last_writer = &first_writer;
@ -3902,16 +3915,22 @@ Status VersionSet::ProcessManifestWrites(
}
}
if (version == nullptr) {
version = new Version(last_writer->cfd, this, file_options_,
last_writer->mutable_cf_options, io_tracer_,
current_version_number_++);
versions.push_back(version);
mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options);
builder_guards.emplace_back(
new BaseReferencedVersionBuilder(last_writer->cfd));
builder = builder_guards.back()->version_builder();
}
assert(builder != nullptr); // make checker happy
// WAL manipulations do not need to be applied to versions.
if (!last_writer->IsAllWalEdits()) {
version = new Version(last_writer->cfd, this, file_options_,
last_writer->mutable_cf_options, io_tracer_,
current_version_number_++);
versions.push_back(version);
mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options);
builder_guards.emplace_back(
new BaseReferencedVersionBuilder(last_writer->cfd));
builder = builder_guards.back()->version_builder();
}
assert(last_writer->IsAllWalEdits() || builder);
assert(last_writer->IsAllWalEdits() || version);
TEST_SYNC_POINT_CALLBACK("VersionSet::ProcessManifestWrites:NewVersion",
version);
}
for (const auto& e : last_writer->edit_list) {
if (e->is_in_atomic_group_) {
if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ ||
@ -3997,6 +4016,7 @@ Status VersionSet::ProcessManifestWrites(
// reads its content after releasing db mutex to avoid race with
// SwitchMemtable().
std::unordered_map<uint32_t, MutableCFState> curr_state;
VersionEdit wal_additions;
if (new_descriptor_log) {
pending_manifest_file_number_ = NewFileNumber();
batch_edits.back()->SetNextFile(next_file_number_.load());
@ -4011,6 +4031,10 @@ Status VersionSet::ProcessManifestWrites(
assert(curr_state.find(cfd->GetID()) == curr_state.end());
curr_state[cfd->GetID()] = {cfd->GetLogNumber()};
}
for (const auto& wal : wals_.GetWals()) {
wal_additions.AddWal(wal.first, wal.second);
}
}
uint64_t new_manifest_file_size = 0;
@ -4063,8 +4087,8 @@ Status VersionSet::ProcessManifestWrites(
io_tracer_, nullptr, db_options_->listeners));
descriptor_log_.reset(
new log::Writer(std::move(file_writer), 0, false));
s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get(),
io_s);
s = WriteCurrentStateToManifest(curr_state, wal_additions,
descriptor_log_.get(), io_s);
} else {
s = io_s;
}
@ -4145,6 +4169,20 @@ Status VersionSet::ProcessManifestWrites(
mu->Lock();
}
if (s.ok()) {
// Apply WAL edits, DB mutex must be held.
for (auto& e : batch_edits) {
if (e->IsWalAddition()) {
s = wals_.AddWals(e->GetWalAdditions());
} else if (e->IsWalDeletion()) {
s = wals_.DeleteWals(e->GetWalDeletions());
}
if (!s.ok()) {
break;
}
}
}
if (!io_s.ok()) {
if (io_status_.ok()) {
io_status_ = io_s;
@ -4392,9 +4430,11 @@ Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
: last_sequence_);
Status s = builder->Apply(edit);
return s;
// The builder can be nullptr only if edit is WAL manipulation,
// because WAL edits do not need to be applied to versions,
// we return Status::OK() in this case.
assert(builder || edit->IsWalManipulation());
return builder ? builder->Apply(edit) : Status::OK();
}
Status VersionSet::ApplyOneVersionEditToBuilder(
@ -4468,6 +4508,16 @@ Status VersionSet::ApplyOneVersionEditToBuilder(
return Status::Corruption(
"Manifest - dropping non-existing column family");
}
} else if (edit.IsWalAddition()) {
Status s = wals_.AddWals(edit.GetWalAdditions());
if (!s.ok()) {
return s;
}
} else if (edit.IsWalDeletion()) {
Status s = wals_.DeleteWals(edit.GetWalDeletions());
if (!s.ok()) {
return s;
}
} else if (!cf_in_not_found) {
if (!cf_in_builders) {
return Status::Corruption(
@ -5412,7 +5462,7 @@ void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
Status VersionSet::WriteCurrentStateToManifest(
const std::unordered_map<uint32_t, MutableCFState>& curr_state,
log::Writer* log, IOStatus& io_s) {
const VersionEdit& wal_additions, log::Writer* log, IOStatus& io_s) {
// TODO: Break up into multiple records to reduce memory usage on recovery?
// WARNING: This method doesn't hold a mutex!!
@ -5437,6 +5487,21 @@ Status VersionSet::WriteCurrentStateToManifest(
}
}
// Save WALs.
if (!wal_additions.GetWalAdditions().empty()) {
TEST_SYNC_POINT_CALLBACK("VersionSet::WriteCurrentStateToManifest:SaveWal",
const_cast<VersionEdit*>(&wal_additions));
std::string record;
if (!wal_additions.EncodeTo(&record)) {
return Status::Corruption("Unable to Encode VersionEdit: " +
wal_additions.DebugString(true));
}
io_s = log->AddRecord(record);
if (!io_s.ok()) {
return io_s;
}
}
for (auto cfd : *column_family_set_) {
assert(cfd);

@ -917,6 +917,17 @@ class VersionSet {
virtual ~VersionSet();
Status LogAndApplyToDefaultColumnFamily(
VersionEdit* edit, InstrumentedMutex* mu,
FSDirectory* db_directory = nullptr, bool new_descriptor_log = false,
const ColumnFamilyOptions* column_family_options = nullptr) {
ColumnFamilyData* default_cf = GetColumnFamilySet()->GetDefault();
const MutableCFOptions* cf_options =
default_cf->GetLatestMutableCFOptions();
return LogAndApply(default_cf, *cf_options, edit, mu, db_directory,
new_descriptor_log, column_family_options);
}
// Apply *edit to the current version to form a new descriptor that
// is both saved to persistent state and installed as the new
// current version. Will release *mu while actually writing to the file.
@ -1187,6 +1198,7 @@ class VersionSet {
// Get the IO Status returned by written Manifest.
const IOStatus& io_status() const { return io_status_; }
// The returned WalSet needs to be accessed with DB mutex held.
const WalSet& GetWalSet() const { return wals_; }
void TEST_CreateAndAppendVersion(ColumnFamilyData* cfd) {
@ -1242,7 +1254,7 @@ class VersionSet {
// Save current contents to *log
Status WriteCurrentStateToManifest(
const std::unordered_map<uint32_t, MutableCFState>& curr_state,
log::Writer* log, IOStatus& io_s);
const VersionEdit& wal_additions, log::Writer* log, IOStatus& io_s);
void AppendVersion(ColumnFamilyData* column_family_data, Version* v);
@ -1275,6 +1287,7 @@ class VersionSet {
Status VerifyFileMetadata(const std::string& fpath,
const FileMetaData& meta) const;
// Protected by DB mutex.
WalSet wals_;
std::unique_ptr<ColumnFamilySet> column_family_set_;

@ -816,18 +816,17 @@ class VersionSetTestBase {
// Create DB with 3 column families.
void NewDB() {
std::vector<ColumnFamilyDescriptor> column_families;
SequenceNumber last_seqno;
std::unique_ptr<log::Writer> log_writer;
SetIdentityFile(env_, dbname_);
PrepareManifest(&column_families, &last_seqno, &log_writer);
PrepareManifest(&column_families_, &last_seqno, &log_writer);
log_writer.reset();
// Make "CURRENT" file point to the new manifest file.
Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
ASSERT_OK(s);
EXPECT_OK(versions_->Recover(column_families, false));
EXPECT_EQ(column_families.size(),
EXPECT_OK(versions_->Recover(column_families_, false));
EXPECT_EQ(column_families_.size(),
versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
}
@ -840,6 +839,40 @@ class VersionSetTestBase {
ASSERT_EQ(1, manifest_file_number);
}
Status LogAndApplyToDefaultCF(VersionEdit& edit) {
mutex_.Lock();
Status s =
versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
mutable_cf_options_, &edit, &mutex_);
mutex_.Unlock();
return s;
}
Status LogAndApplyToDefaultCF(
const autovector<std::unique_ptr<VersionEdit>>& edits) {
autovector<VersionEdit*> vedits;
for (auto& e : edits) {
vedits.push_back(e.get());
}
mutex_.Lock();
Status s =
versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
mutable_cf_options_, vedits, &mutex_);
mutex_.Unlock();
return s;
}
void CreateNewManifest() {
constexpr FSDirectory* db_directory = nullptr;
constexpr bool new_descriptor_log = true;
mutex_.Lock();
VersionEdit dummy;
ASSERT_OK(versions_->LogAndApply(
versions_->GetColumnFamilySet()->GetDefault(), mutable_cf_options_,
&dummy, &mutex_, db_directory, new_descriptor_log));
mutex_.Unlock();
}
MockEnv* mem_env_;
Env* env_;
std::shared_ptr<Env> env_guard_;
@ -859,6 +892,7 @@ class VersionSetTestBase {
InstrumentedMutex mutex_;
std::atomic<bool> shutting_down_;
std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
std::vector<ColumnFamilyDescriptor> column_families_;
};
const std::string VersionSetTestBase::kColumnFamilyName1 = "alice";
@ -979,17 +1013,8 @@ TEST_F(VersionSetTest, PersistBlobFileStateInNewManifest) {
[&](void* /* arg */) { ++garbage_encoded; });
SyncPoint::GetInstance()->EnableProcessing();
VersionEdit dummy;
mutex_.Lock();
constexpr FSDirectory* db_directory = nullptr;
constexpr bool new_descriptor_log = true;
Status s = versions_->LogAndApply(
versions_->GetColumnFamilySet()->GetDefault(), mutable_cf_options_,
&dummy, &mutex_, db_directory, new_descriptor_log);
mutex_.Unlock();
CreateNewManifest();
ASSERT_OK(s);
ASSERT_EQ(addition_encoded, 2);
ASSERT_EQ(garbage_encoded, 1);
@ -1158,6 +1183,456 @@ TEST_F(VersionSetTest, ObsoleteBlobFile) {
}
}
TEST_F(VersionSetTest, WalEditsNotAppliedToVersion) {
NewDB();
constexpr uint64_t kNumWals = 5;
autovector<std::unique_ptr<VersionEdit>> edits;
// Add some WALs.
for (uint64_t i = 1; i <= kNumWals; i++) {
edits.emplace_back(new VersionEdit);
// WAL's size equals its log number.
edits.back()->AddWal(i, WalMetadata(i));
}
// Delete the first half of the WALs.
for (uint64_t i = 1; i <= kNumWals; i++) {
edits.emplace_back(new VersionEdit);
edits.back()->DeleteWal(i);
}
autovector<Version*> versions;
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::ProcessManifestWrites:NewVersion",
[&](void* arg) { versions.push_back(reinterpret_cast<Version*>(arg)); });
SyncPoint::GetInstance()->EnableProcessing();
LogAndApplyToDefaultCF(edits);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
// Since the edits are all WAL edits, no version should be created.
ASSERT_EQ(versions.size(), 1);
ASSERT_EQ(versions[0], nullptr);
}
// Similar to WalEditsNotAppliedToVersion, but contains a non-WAL edit.
TEST_F(VersionSetTest, NonWalEditsAppliedToVersion) {
NewDB();
const std::string kDBId = "db_db";
constexpr uint64_t kNumWals = 5;
autovector<std::unique_ptr<VersionEdit>> edits;
// Add some WALs.
for (uint64_t i = 1; i <= kNumWals; i++) {
edits.emplace_back(new VersionEdit);
// WAL's size equals its log number.
edits.back()->AddWal(i, WalMetadata(i));
}
// Delete the first half of the WALs.
for (uint64_t i = 1; i <= kNumWals; i++) {
edits.emplace_back(new VersionEdit);
edits.back()->DeleteWal(i);
}
edits.emplace_back(new VersionEdit);
edits.back()->SetDBId(kDBId);
autovector<Version*> versions;
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::ProcessManifestWrites:NewVersion",
[&](void* arg) { versions.push_back(reinterpret_cast<Version*>(arg)); });
SyncPoint::GetInstance()->EnableProcessing();
LogAndApplyToDefaultCF(edits);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
// Since the edits are all WAL edits, no version should be created.
ASSERT_EQ(versions.size(), 1);
ASSERT_NE(versions[0], nullptr);
}
TEST_F(VersionSetTest, WalAddition) {
NewDB();
constexpr WalNumber kLogNumber = 10;
constexpr uint64_t kSizeInBytes = 111;
// A WAL is just created.
{
VersionEdit edit;
edit.AddWal(kLogNumber);
ASSERT_OK(LogAndApplyToDefaultCF(edit));
const auto& wals = versions_->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1);
ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
ASSERT_FALSE(wals.at(kLogNumber).HasSyncedSize());
}
// The WAL is synced for several times before closing.
{
for (uint64_t size_delta = 100; size_delta > 0; size_delta /= 2) {
uint64_t size = kSizeInBytes - size_delta;
WalMetadata wal(size);
VersionEdit edit;
edit.AddWal(kLogNumber, wal);
ASSERT_OK(LogAndApplyToDefaultCF(edit));
const auto& wals = versions_->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1);
ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), size);
}
}
// The WAL is closed.
{
WalMetadata wal(kSizeInBytes);
VersionEdit edit;
edit.AddWal(kLogNumber, wal);
ASSERT_OK(LogAndApplyToDefaultCF(edit));
const auto& wals = versions_->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1);
ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSizeInBytes);
}
// Recover a new VersionSet.
{
std::unique_ptr<VersionSet> new_versions(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
ASSERT_OK(new_versions->Recover(column_families_, /*read_only=*/false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1);
ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSizeInBytes);
}
}
TEST_F(VersionSetTest, WalCloseWithoutSync) {
NewDB();
constexpr WalNumber kLogNumber = 10;
constexpr uint64_t kSizeInBytes = 111;
constexpr uint64_t kSyncedSizeInBytes = kSizeInBytes / 2;
// A WAL is just created.
{
VersionEdit edit;
edit.AddWal(kLogNumber);
ASSERT_OK(LogAndApplyToDefaultCF(edit));
const auto& wals = versions_->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1);
ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
ASSERT_FALSE(wals.at(kLogNumber).HasSyncedSize());
}
// The WAL is synced before closing.
{
WalMetadata wal(kSyncedSizeInBytes);
VersionEdit edit;
edit.AddWal(kLogNumber, wal);
ASSERT_OK(LogAndApplyToDefaultCF(edit));
const auto& wals = versions_->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1);
ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSyncedSizeInBytes);
}
// A new WAL with larger log number is created,
// implicitly marking the current WAL closed.
{
VersionEdit edit;
edit.AddWal(kLogNumber + 1);
ASSERT_OK(LogAndApplyToDefaultCF(edit));
const auto& wals = versions_->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 2);
ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSyncedSizeInBytes);
ASSERT_TRUE(wals.find(kLogNumber + 1) != wals.end());
ASSERT_FALSE(wals.at(kLogNumber + 1).HasSyncedSize());
}
// Recover a new VersionSet.
{
std::unique_ptr<VersionSet> new_versions(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 2);
ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSyncedSizeInBytes);
}
}
TEST_F(VersionSetTest, WalDeletion) {
NewDB();
constexpr WalNumber kClosedLogNumber = 10;
constexpr WalNumber kNonClosedLogNumber = 20;
constexpr uint64_t kSizeInBytes = 111;
// Add a non-closed and a closed WAL.
{
VersionEdit edit;
edit.AddWal(kClosedLogNumber, WalMetadata(kSizeInBytes));
edit.AddWal(kNonClosedLogNumber);
ASSERT_OK(LogAndApplyToDefaultCF(edit));
const auto& wals = versions_->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 2);
ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end());
ASSERT_TRUE(wals.find(kClosedLogNumber) != wals.end());
ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize());
ASSERT_TRUE(wals.at(kClosedLogNumber).HasSyncedSize());
ASSERT_EQ(wals.at(kClosedLogNumber).GetSyncedSizeInBytes(), kSizeInBytes);
}
// Delete the closed WAL.
{
VersionEdit edit;
edit.DeleteWal(kClosedLogNumber);
ASSERT_OK(LogAndApplyToDefaultCF(edit));
const auto& wals = versions_->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1);
ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end());
ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize());
}
// Recover a new VersionSet, only the non-closed WAL should show up.
{
std::unique_ptr<VersionSet> new_versions(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1);
ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end());
ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize());
}
// Force the creation of a new MANIFEST file,
// only the non-closed WAL should be written to the new MANIFEST.
{
std::vector<WalAddition> wal_additions;
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::WriteCurrentStateToManifest:SaveWal", [&](void* arg) {
VersionEdit* edit = reinterpret_cast<VersionEdit*>(arg);
ASSERT_TRUE(edit->IsWalAddition());
for (auto& addition : edit->GetWalAdditions()) {
wal_additions.push_back(addition);
}
});
SyncPoint::GetInstance()->EnableProcessing();
CreateNewManifest();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
ASSERT_EQ(wal_additions.size(), 1);
ASSERT_EQ(wal_additions[0].GetLogNumber(), kNonClosedLogNumber);
ASSERT_FALSE(wal_additions[0].GetMetadata().HasSyncedSize());
}
// Recover from the new MANIFEST, only the non-closed WAL should show up.
{
std::unique_ptr<VersionSet> new_versions(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1);
ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end());
ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize());
}
}
TEST_F(VersionSetTest, WalCreateTwice) {
NewDB();
constexpr WalNumber kLogNumber = 10;
VersionEdit edit;
edit.AddWal(kLogNumber);
ASSERT_OK(LogAndApplyToDefaultCF(edit));
Status s = LogAndApplyToDefaultCF(edit);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("WAL 10 is created more than once") !=
std::string::npos)
<< s.ToString();
}
TEST_F(VersionSetTest, WalCreateAfterClose) {
NewDB();
constexpr WalNumber kLogNumber = 10;
constexpr uint64_t kSizeInBytes = 111;
{
// Add a closed WAL.
VersionEdit edit;
edit.AddWal(kLogNumber);
WalMetadata wal(kSizeInBytes);
edit.AddWal(kLogNumber, wal);
ASSERT_OK(LogAndApplyToDefaultCF(edit));
}
{
// Create the same WAL again.
VersionEdit edit;
edit.AddWal(kLogNumber);
Status s = LogAndApplyToDefaultCF(edit);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("WAL 10 is created more than once") !=
std::string::npos)
<< s.ToString();
}
}
TEST_F(VersionSetTest, AddWalWithSmallerSize) {
NewDB();
constexpr WalNumber kLogNumber = 10;
constexpr uint64_t kSizeInBytes = 111;
{
// Add a closed WAL.
VersionEdit edit;
WalMetadata wal(kSizeInBytes);
edit.AddWal(kLogNumber, wal);
ASSERT_OK(LogAndApplyToDefaultCF(edit));
}
{
// Add the same WAL with smaller synced size.
VersionEdit edit;
WalMetadata wal(kSizeInBytes / 2);
edit.AddWal(kLogNumber, wal);
Status s = LogAndApplyToDefaultCF(edit);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(
s.ToString().find(
"WAL 10 must not have smaller synced size than previous one") !=
std::string::npos)
<< s.ToString();
}
}
TEST_F(VersionSetTest, DeleteNonExistingWal) {
NewDB();
constexpr WalNumber kLogNumber = 10;
constexpr WalNumber kNonExistingNumber = 11;
constexpr uint64_t kSizeInBytes = 111;
{
// Add a closed WAL.
VersionEdit edit;
WalMetadata wal(kSizeInBytes);
edit.AddWal(kLogNumber, wal);
ASSERT_OK(LogAndApplyToDefaultCF(edit));
}
{
// Delete a non-existing WAL.
VersionEdit edit;
edit.DeleteWal(kNonExistingNumber);
Status s = LogAndApplyToDefaultCF(edit);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("WAL 11 must exist before deletion") !=
std::string::npos)
<< s.ToString();
}
}
TEST_F(VersionSetTest, AtomicGroupWithWalEdits) {
NewDB();
constexpr int kAtomicGroupSize = 10;
constexpr uint64_t kNumWals = 5;
const std::string kDBId = "db_db";
int remaining = kAtomicGroupSize;
autovector<std::unique_ptr<VersionEdit>> edits;
// Add 5 WALs.
for (uint64_t i = 1; i <= kNumWals; i++) {
edits.emplace_back(new VersionEdit);
// WAL's size equals its log number.
edits.back()->AddWal(i, WalMetadata(i));
edits.back()->MarkAtomicGroup(--remaining);
}
// One edit with the min log number set.
edits.emplace_back(new VersionEdit);
edits.back()->SetDBId(kDBId);
edits.back()->MarkAtomicGroup(--remaining);
// Delete the first added 4 WALs.
for (uint64_t i = 1; i < kNumWals; i++) {
edits.emplace_back(new VersionEdit);
edits.back()->DeleteWal(i);
edits.back()->MarkAtomicGroup(--remaining);
}
ASSERT_EQ(remaining, 0);
Status s = LogAndApplyToDefaultCF(edits);
// Recover a new VersionSet, the min log number and the last WAL should be
// kept.
{
std::unique_ptr<VersionSet> new_versions(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
std::string db_id;
ASSERT_OK(
new_versions->Recover(column_families_, /*read_only=*/false, &db_id));
ASSERT_EQ(db_id, kDBId);
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1);
ASSERT_TRUE(wals.find(kNumWals) != wals.end());
ASSERT_TRUE(wals.at(kNumWals).HasSyncedSize());
ASSERT_EQ(wals.at(kNumWals).GetSyncedSizeInBytes(), kNumWals);
}
}
class VersionSetAtomicGroupTest : public VersionSetTestBase,
public testing::Test {
public:

@ -19,10 +19,6 @@ void WalAddition::EncodeTo(std::string* dst) const {
PutVarint64(dst, metadata_.GetSyncedSizeInBytes());
}
if (metadata_.IsClosed()) {
PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kClosed));
}
PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kTerminate));
}
@ -48,10 +44,6 @@ Status WalAddition::DecodeFrom(Slice* src) {
metadata_.SetSyncedSizeInBytes(size);
break;
}
case WalAdditionTag::kClosed: {
metadata_.SetClosed();
break;
}
// TODO: process future tags such as checksum.
case WalAdditionTag::kTerminate:
return Status::OK();
@ -66,15 +58,13 @@ Status WalAddition::DecodeFrom(Slice* src) {
JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal) {
jw << "LogNumber" << wal.GetLogNumber() << "SyncedSizeInBytes"
<< wal.GetMetadata().GetSyncedSizeInBytes() << "Closed"
<< wal.GetMetadata().IsClosed();
<< wal.GetMetadata().GetSyncedSizeInBytes();
return jw;
}
std::ostream& operator<<(std::ostream& os, const WalAddition& wal) {
os << "log_number: " << wal.GetLogNumber()
<< " synced_size_in_bytes: " << wal.GetMetadata().GetSyncedSizeInBytes()
<< " closed: " << wal.GetMetadata().IsClosed();
<< " synced_size_in_bytes: " << wal.GetMetadata().GetSyncedSizeInBytes();
return os;
}
@ -117,31 +107,23 @@ std::string WalDeletion::DebugString() const {
Status WalSet::AddWal(const WalAddition& wal) {
auto it = wals_.lower_bound(wal.GetLogNumber());
bool existing = it != wals_.end() && it->first == wal.GetLogNumber();
if (wal.GetMetadata().IsClosed()) {
// The WAL must exist and not closed.
if (!existing) {
std::stringstream ss;
ss << "WAL " << wal.GetLogNumber() << " is not created before closing";
return Status::Corruption("WalSet", ss.str());
}
if (it->second.IsClosed()) {
std::stringstream ss;
ss << "WAL " << wal.GetLogNumber() << " is closed more than once";
return Status::Corruption("WalSet", ss.str());
}
if (existing && !wal.GetMetadata().HasSyncedSize()) {
std::stringstream ss;
ss << "WAL " << wal.GetLogNumber() << " is created more than once";
return Status::Corruption("WalSet", ss.str());
}
// If the WAL has synced size, it must >= the previous size.
if (existing && it->second.HasSyncedSize() &&
(!wal.GetMetadata().HasSyncedSize() ||
wal.GetMetadata().GetSyncedSizeInBytes() <
it->second.GetSyncedSizeInBytes())) {
if (wal.GetMetadata().HasSyncedSize() && existing &&
it->second.HasSyncedSize() &&
wal.GetMetadata().GetSyncedSizeInBytes() <
it->second.GetSyncedSizeInBytes()) {
std::stringstream ss;
ss << "WAL " << wal.GetLogNumber()
<< " must not have smaller synced size than previous one";
return Status::Corruption("WalSet", ss.str());
}
if (existing) {
it->second = wal.GetMetadata();
it->second.SetSyncedSizeInBytes(wal.GetMetadata().GetSyncedSizeInBytes());
} else {
wals_.insert(it, {wal.GetLogNumber(), wal.GetMetadata()});
}
@ -161,17 +143,12 @@ Status WalSet::AddWals(const WalAdditions& wals) {
Status WalSet::DeleteWal(const WalDeletion& wal) {
auto it = wals_.find(wal.GetLogNumber());
// The WAL must exist and has been closed.
// The WAL must exist.
if (it == wals_.end()) {
std::stringstream ss;
ss << "WAL " << wal.GetLogNumber() << " must exist before deletion";
return Status::Corruption("WalSet", ss.str());
}
if (!it->second.IsClosed()) {
std::stringstream ss;
ss << "WAL " << wal.GetLogNumber() << " must be closed before deletion";
return Status::Corruption("WalSet", ss.str());
}
wals_.erase(it);
return Status::OK();
}
@ -187,6 +164,10 @@ Status WalSet::DeleteWals(const WalDeletions& wals) {
return s;
}
void WalSet::DeleteWalsBefore(WalNumber number) {
wals_.erase(wals_.begin(), wals_.lower_bound(number));
}
void WalSet::Reset() { wals_.clear(); }
Status WalSet::CheckWals(

@ -35,10 +35,6 @@ class WalMetadata {
explicit WalMetadata(uint64_t synced_size_bytes)
: synced_size_bytes_(synced_size_bytes) {}
bool IsClosed() const { return closed_; }
void SetClosed() { closed_ = true; }
bool HasSyncedSize() const { return synced_size_bytes_ != kUnknownWalSize; }
void SetSyncedSizeInBytes(uint64_t bytes) { synced_size_bytes_ = bytes; }
@ -52,9 +48,6 @@ class WalMetadata {
// Size of the most recently synced WAL in bytes.
uint64_t synced_size_bytes_ = kUnknownWalSize;
// Whether the WAL is closed.
bool closed_ = false;
};
// These tags are persisted to MANIFEST, so it's part of the user API.
@ -63,8 +56,6 @@ enum class WalAdditionTag : uint32_t {
kTerminate = 1,
// Synced Size in bytes.
kSyncedSize = 2,
// Whether the WAL is closed.
kClosed = 3,
// Add tags in the future, such as checksum?
};
@ -98,10 +89,10 @@ JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal);
using WalAdditions = std::vector<WalAddition>;
// Records the event of deleting/archiving a WAL in VersionEdit.
// Records the event of deleting a WAL.
class WalDeletion {
public:
WalDeletion() : number_(0) {}
WalDeletion() : number_(kEmpty) {}
explicit WalDeletion(WalNumber number) : number_(number) {}
@ -114,6 +105,8 @@ class WalDeletion {
std::string DebugString() const;
private:
static constexpr WalNumber kEmpty = 0;
WalNumber number_;
};
@ -146,6 +139,9 @@ class WalSet {
Status DeleteWal(const WalDeletion& wal);
Status DeleteWals(const WalDeletions& wals);
// Delete WALs with log number < wal_number.
void DeleteWalsBefore(WalNumber wal_number);
// Resets the internal state.
void Reset();

@ -24,14 +24,6 @@ TEST(WalSet, AddDeleteReset) {
}
ASSERT_EQ(wals.GetWals().size(), 10);
// Close WAL 1 - 5.
for (WalNumber log_number = 1; log_number <= 5; log_number++) {
WalMetadata wal(100);
wal.SetClosed();
wals.AddWal(WalAddition(log_number, wal));
}
ASSERT_EQ(wals.GetWals().size(), 10);
// Delete WAL 1 - 5.
for (WalNumber log_number = 1; log_number <= 5; log_number++) {
wals.DeleteWal(WalDeletion(log_number));
@ -72,46 +64,14 @@ TEST(WalSet, SmallerSyncedSize) {
std::string::npos);
}
TEST(WalSet, CloseTwice) {
constexpr WalNumber kNumber = 100;
constexpr uint64_t kBytes = 200;
WalSet wals;
ASSERT_OK(wals.AddWal(WalAddition(kNumber)));
WalMetadata wal(kBytes);
wal.SetClosed();
ASSERT_OK(wals.AddWal(WalAddition(kNumber, wal)));
Status s = wals.AddWal(WalAddition(kNumber, wal));
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("WAL 100 is closed more than once") !=
std::string::npos);
}
TEST(WalSet, CloseBeforeCreate) {
constexpr WalNumber kNumber = 100;
constexpr uint64_t kBytes = 200;
WalSet wals;
WalMetadata wal(kBytes);
wal.SetClosed();
Status s = wals.AddWal(WalAddition(kNumber, wal));
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("WAL 100 is not created before closing") !=
std::string::npos);
}
TEST(WalSet, CreateAfterClose) {
TEST(WalSet, CreateTwice) {
constexpr WalNumber kNumber = 100;
constexpr uint64_t kBytes = 200;
WalSet wals;
ASSERT_OK(wals.AddWal(WalAddition(kNumber)));
WalMetadata wal(kBytes);
wal.SetClosed();
ASSERT_OK(wals.AddWal(WalAddition(kNumber, wal)));
Status s = wals.AddWal(WalAddition(kNumber));
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(
s.ToString().find(
"WAL 100 must not have smaller synced size than previous one") !=
std::string::npos);
ASSERT_TRUE(s.ToString().find("WAL 100 is created more than once") !=
std::string::npos);
}
TEST(WalSet, DeleteNonExistingWal) {
@ -123,16 +83,6 @@ TEST(WalSet, DeleteNonExistingWal) {
std::string::npos);
}
TEST(WalSet, DeleteNonClosedWal) {
constexpr WalNumber kNonClosedWalNumber = 100;
WalSet wals;
ASSERT_OK(wals.AddWal(WalAddition(kNonClosedWalNumber)));
Status s = wals.DeleteWal(WalDeletion(kNonClosedWalNumber));
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("WAL 100 must be closed before deletion") !=
std::string::npos);
}
class WalSetTest : public DBTestBase {
public:
WalSetTest() : DBTestBase("WalSetTest", /* env_do_fsync */ true) {}
@ -165,7 +115,6 @@ class WalSetTest : public DBTestBase {
ASSERT_OK(wals_.AddWal(WalAddition(number)));
// Close WAL.
WalMetadata wal(size_bytes);
wal.SetClosed();
ASSERT_OK(wals_.AddWal(WalAddition(number, wal)));
}

Loading…
Cancel
Save