Track WAL in MANIFEST: LogAndApply WAL events to MANIFEST (#7601)

Summary:
When a WAL is synced, an edit is written to MANIFEST.
After flushing memtables, the obsoleted WALs are piggybacked to MANIFEST while writing the new L0 files to MANIFEST.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7601

Test Plan:
`track_and_verify_wals_in_manifest` is enabled by default for all tests extending `DBBasicTest`, and in db_stress_test.
Unit test `wal_edit_test`, `version_edit_test`, and `version_set_test` are also updated.
Watch all tests to pass.

Reviewed By: ltamasi

Differential Revision: D24553957

Pulled By: cheng-chang

fbshipit-source-id: 66a569ff1bdced38e22900bd240b73113906e040
main
Cheng Chang 4 years ago committed by Facebook GitHub Bot
parent 1ce105d0ea
commit 1e40696dd1
  1. 50
      db/db_impl/db_impl.cc
  2. 14
      db/db_impl/db_impl.h
  3. 6
      db/db_impl/db_impl_compaction_flush.cc
  4. 26
      db/db_impl/db_impl_files.cc
  5. 14
      db/db_impl/db_impl_open.cc
  6. 12
      db/db_impl/db_impl_write.cc
  7. 2
      db/db_range_del_test.cc
  8. 1
      db/db_test_util.cc
  9. 53
      db/memtable_list.cc
  10. 29
      db/version_edit.cc
  11. 22
      db/version_edit.h
  12. 3
      db/version_edit_handler.cc
  13. 29
      db/version_edit_test.cc
  14. 4
      db/version_set.cc
  15. 90
      db/version_set_test.cc
  16. 26
      db/wal_edit.cc
  17. 20
      db/wal_edit.h
  18. 16
      db/wal_edit_test.cc
  19. 1
      db_stress_tool/db_stress_test_base.cc

@ -1283,7 +1283,11 @@ Status DBImpl::SyncWAL() {
TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
{
InstrumentedMutexLock l(&mutex_);
MarkLogsSynced(current_log_number, need_log_dir_sync, status);
if (status.ok()) {
status = MarkLogsSynced(current_log_number, need_log_dir_sync);
} else {
MarkLogsNotSynced(current_log_number);
}
}
TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");
@ -1309,27 +1313,53 @@ Status DBImpl::UnlockWAL() {
return Status::OK();
}
void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
const Status& status) {
Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
mutex_.AssertHeld();
if (synced_dir && logfile_number_ == up_to && status.ok()) {
if (synced_dir && logfile_number_ == up_to) {
log_dir_synced_ = true;
}
VersionEdit synced_wals;
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
auto& log = *it;
assert(log.getting_synced);
if (status.ok() && logs_.size() > 1) {
logs_to_free_.push_back(log.ReleaseWriter());
auto& wal = *it;
assert(wal.getting_synced);
if (logs_.size() > 1) {
if (immutable_db_options_.track_and_verify_wals_in_manifest) {
synced_wals.AddWal(wal.number,
WalMetadata(wal.writer->file()->GetFileSize()));
}
logs_to_free_.push_back(wal.ReleaseWriter());
// To modify logs_ both mutex_ and log_write_mutex_ must be held
InstrumentedMutexLock l(&log_write_mutex_);
it = logs_.erase(it);
} else {
log.getting_synced = false;
wal.getting_synced = false;
++it;
}
}
assert(!status.ok() || logs_.empty() || logs_[0].number > up_to ||
assert(logs_.empty() || logs_[0].number > up_to ||
(logs_.size() == 1 && !logs_[0].getting_synced));
Status s;
if (synced_wals.IsWalAddition()) {
// not empty, write to MANIFEST.
s = versions_->LogAndApplyToDefaultColumnFamily(&synced_wals, &mutex_);
if (!s.ok() && versions_->io_status().IsIOError()) {
s = error_handler_.SetBGError(versions_->io_status(),
BackgroundErrorReason::kManifestWrite);
}
}
log_sync_cv_.SignalAll();
return s;
}
void DBImpl::MarkLogsNotSynced(uint64_t up_to) {
mutex_.AssertHeld();
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;
++it) {
auto& wal = *it;
assert(wal.getting_synced);
wal.getting_synced = false;
}
log_sync_cv_.SignalAll();
}

@ -1702,7 +1702,9 @@ class DBImpl : public DB {
std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer);
// helper function to call after some of the logs_ were synced
void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status);
Status MarkLogsSynced(uint64_t up_to, bool synced_dir);
// WALs with log number up to up_to are not synced successfully.
void MarkLogsNotSynced(uint64_t up_to);
SnapshotImpl* GetSnapshotImpl(bool is_write_conflict_boundary,
bool lock = true);
@ -2204,12 +2206,18 @@ extern CompressionType GetCompressionFlush(
// `memtables_to_flush`) will be flushed and thus will not depend on any WAL
// file.
// The function is only applicable to 2pc mode.
extern uint64_t PrecomputeMinLogNumberToKeep(
extern uint64_t PrecomputeMinLogNumberToKeep2PC(
VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
autovector<VersionEdit*> edit_list,
const autovector<VersionEdit*>& edit_list,
const autovector<MemTable*>& memtables_to_flush,
LogsWithPrepTracker* prep_tracker);
// In non-2PC mode, WALs with log number < the returned number can be
// deleted after the cfd_to_flush column family is flushed successfully.
extern uint64_t PrecomputeMinLogNumberToKeepNon2PC(
VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
const autovector<VersionEdit*>& edit_list);
// `cfd_to_flush` is the column family whose memtable will be flushed and thus
// will not depend on any WAL file. nullptr means no memtable is being flushed.
// The function is only applicable to 2pc mode.

@ -123,7 +123,11 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
// "number <= current_log_number - 1" is equivalent to
// "number < current_log_number".
MarkLogsSynced(current_log_number - 1, true, io_s);
if (io_s.ok()) {
io_s = status_to_io_status(MarkLogsSynced(current_log_number - 1, true));
} else {
MarkLogsNotSynced(current_log_number - 1);
}
if (!io_s.ok()) {
if (total_log_size_ > 0) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush)

@ -680,16 +680,10 @@ uint64_t FindMinPrepLogReferencedByMemTable(
return min_log;
}
uint64_t PrecomputeMinLogNumberToKeep(
uint64_t PrecomputeMinLogNumberToKeepNon2PC(
VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
autovector<VersionEdit*> edit_list,
const autovector<MemTable*>& memtables_to_flush,
LogsWithPrepTracker* prep_tracker) {
const autovector<VersionEdit*>& edit_list) {
assert(vset != nullptr);
assert(prep_tracker != nullptr);
// Calculate updated min_log_number_to_keep
// Since the function should only be called in 2pc mode, log number in
// the version edit should be sufficient.
// Precompute the min log number containing unflushed data for the column
// family being flushed (`cfd_to_flush`).
@ -713,6 +707,22 @@ uint64_t PrecomputeMinLogNumberToKeep(
min_log_number_to_keep =
std::min(cf_min_log_number_to_keep, min_log_number_to_keep);
}
return min_log_number_to_keep;
}
uint64_t PrecomputeMinLogNumberToKeep2PC(
VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
const autovector<VersionEdit*>& edit_list,
const autovector<MemTable*>& memtables_to_flush,
LogsWithPrepTracker* prep_tracker) {
assert(vset != nullptr);
assert(prep_tracker != nullptr);
// Calculate updated min_log_number_to_keep
// Since the function should only be called in 2pc mode, log number in
// the version edit should be sufficient.
uint64_t min_log_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, cfd_to_flush, edit_list);
// if are 2pc we must consider logs containing prepared
// sections of outstanding transactions.

@ -589,18 +589,20 @@ Status DBImpl::Recover(
}
if (immutable_db_options_.track_and_verify_wals_in_manifest) {
// Verify WALs in MANIFEST.
s = versions_->GetWalSet().CheckWals(env_, wal_files);
if (!immutable_db_options_.best_efforts_recovery) {
// Verify WALs in MANIFEST.
s = versions_->GetWalSet().CheckWals(env_, wal_files);
} // else since best effort recovery does not recover from WALs, no need
// to check WALs.
} else if (!versions_->GetWalSet().GetWals().empty()) {
// Tracking is disabled, clear previously tracked WALs from MANIFEST,
// otherwise, in the future, if WAL tracking is enabled again,
// since the WALs deleted when WAL tracking is disabled are not persisted
// into MANIFEST, WAL check may fail.
VersionEdit edit;
for (const auto& wal : versions_->GetWalSet().GetWals()) {
WalNumber number = wal.first;
edit.DeleteWal(number);
}
WalNumber max_wal_number =
versions_->GetWalSet().GetWals().rbegin()->first;
edit.DeleteWalsBefore(max_wal_number + 1);
s = versions_->LogAndApplyToDefaultColumnFamily(&edit, &mutex_);
}
if (!s.ok()) {

@ -426,7 +426,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (need_log_sync) {
mutex_.Lock();
MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
if (status.ok()) {
status = MarkLogsSynced(logfile_number_, need_log_dir_sync);
} else {
MarkLogsNotSynced(logfile_number_);
}
mutex_.Unlock();
// Requesting sync with two_write_queues_ is expected to be very rare. We
// hence provide a simple implementation that is not necessarily efficient.
@ -551,7 +555,11 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
if (need_log_sync) {
mutex_.Lock();
MarkLogsSynced(logfile_number_, need_log_dir_sync, w.status);
if (w.status.ok()) {
w.status = MarkLogsSynced(logfile_number_, need_log_dir_sync);
} else {
MarkLogsNotSynced(logfile_number_);
}
mutex_.Unlock();
}

@ -159,7 +159,7 @@ TEST_F(DBRangeDelTest, MaxCompactionBytesCutsOutputFiles) {
// Want max_compaction_bytes to trigger the end of compaction output file, not
// target_file_size_base, so make the latter much bigger
opts.target_file_size_base = 100 * opts.max_compaction_bytes;
Reopen(opts);
DestroyAndReopen(opts);
// snapshot protects range tombstone from dropping due to becoming obsolete.
const Snapshot* snapshot = db_->GetSnapshot();

@ -340,6 +340,7 @@ Options DBTestBase::GetDefaultOptions() const {
options.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
options.compaction_pri = CompactionPri::kByCompensatedSize;
options.env = env_;
options.track_and_verify_wals_in_manifest = true;
return options;
}

@ -473,12 +473,27 @@ Status MemTableList::TryInstallMemtableFlushResults(
// TODO(myabandeh): Not sure how batch_count could be 0 here.
if (batch_count > 0) {
uint64_t min_wal_number_to_keep = 0;
if (vset->db_options()->allow_2pc) {
assert(edit_list.size() > 0);
min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC(
vset, *cfd, edit_list, memtables_to_flush, prep_tracker);
// We piggyback the information of earliest log file to keep in the
// manifest entry for the last file flushed.
edit_list.back()->SetMinLogNumberToKeep(PrecomputeMinLogNumberToKeep(
vset, *cfd, edit_list, memtables_to_flush, prep_tracker));
edit_list.back()->SetMinLogNumberToKeep(min_wal_number_to_keep);
} else {
min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list);
}
std::unique_ptr<VersionEdit> wal_deletion;
if (vset->db_options()->track_and_verify_wals_in_manifest) {
const auto& wals = vset->GetWalSet().GetWals();
if (!wals.empty() && min_wal_number_to_keep > wals.begin()->first) {
wal_deletion.reset(new VersionEdit);
wal_deletion->DeleteWalsBefore(min_wal_number_to_keep);
edit_list.push_back(wal_deletion.get());
}
}
const auto manifest_write_cb = [this, cfd, batch_count, log_buffer,
@ -704,6 +719,10 @@ Status InstallMemtableAtomicFlushResults(
if (imm_lists != nullptr) {
assert(imm_lists->size() == num);
}
if (num == 0) {
return Status::OK();
}
for (size_t k = 0; k != num; ++k) {
#ifndef NDEBUG
const auto* imm =
@ -732,12 +751,36 @@ Status InstallMemtableAtomicFlushResults(
++num_entries;
edit_lists.emplace_back(edits);
}
// TODO(cc): after https://github.com/facebook/rocksdb/pull/7570, handle 2pc
// here.
std::unique_ptr<VersionEdit> wal_deletion;
if (vset->db_options()->track_and_verify_wals_in_manifest) {
uint64_t min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, *cfds[0], edit_lists[0]);
for (size_t i = 1; i < cfds.size(); i++) {
min_wal_number_to_keep = std::min(
min_wal_number_to_keep,
PrecomputeMinLogNumberToKeepNon2PC(vset, *cfds[i], edit_lists[i]));
}
const auto& wals = vset->GetWalSet().GetWals();
if (!wals.empty() && min_wal_number_to_keep > wals.begin()->first) {
wal_deletion.reset(new VersionEdit);
wal_deletion->DeleteWalsBefore(min_wal_number_to_keep);
edit_lists.back().push_back(wal_deletion.get());
++num_entries;
}
}
// Mark the version edits as an atomic group if the number of version edits
// exceeds 1.
if (cfds.size() > 1) {
for (auto& edits : edit_lists) {
assert(edits.size() == 1);
edits[0]->MarkAtomicGroup(--num_entries);
for (size_t i = 0; i < edit_lists.size(); i++) {
assert((edit_lists[i].size() == 1) ||
((edit_lists[i].size() == 2) && (i == edit_lists.size() - 1)));
for (auto& e : edit_lists[i]) {
e->MarkAtomicGroup(--num_entries);
}
}
assert(0 == num_entries);
}

@ -89,7 +89,7 @@ void VersionEdit::Clear() {
blob_file_additions_.clear();
blob_file_garbages_.clear();
wal_additions_.clear();
wal_deletions_.clear();
wal_deletion_.Reset();
column_family_ = 0;
is_column_family_add_ = false;
is_column_family_drop_ = false;
@ -229,9 +229,9 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
wal_addition.EncodeTo(dst);
}
for (const auto& wal_deletion : wal_deletions_) {
if (!wal_deletion_.IsEmpty()) {
PutVarint32(dst, kWalDeletion);
wal_deletion.EncodeTo(dst);
wal_deletion_.EncodeTo(dst);
}
// 0 is default and does not need to be explicitly written
@ -576,7 +576,7 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
return s;
}
wal_deletions_.emplace_back(std::move(wal_deletion));
wal_deletion_ = std::move(wal_deletion);
break;
}
@ -725,9 +725,9 @@ std::string VersionEdit::DebugString(bool hex_key) const {
r.append(wal_addition.DebugString());
}
for (const auto& wal_deletion : wal_deletions_) {
if (!wal_deletion_.IsEmpty()) {
r.append("\n WalDeletion: ");
r.append(wal_deletion.DebugString());
r.append(wal_deletion_.DebugString());
}
r.append("\n ColumnFamily: ");
@ -854,18 +854,11 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const {
jw.EndArray();
}
if (!wal_deletions_.empty()) {
jw << "WalDeletions";
jw.StartArray();
for (const auto& wal_deletion : wal_deletions_) {
jw.StartArrayedObject();
jw << wal_deletion;
jw.EndArrayedObject();
}
jw.EndArray();
if (!wal_deletion_.IsEmpty()) {
jw << "WalDeletion";
jw.StartObject();
jw << wal_deletion_;
jw.EndObject();
}
jw << "ColumnFamily" << column_family_;

@ -452,6 +452,7 @@ class VersionEdit {
}
// Add a WAL (either just created or closed).
// AddWal and DeleteWalsBefore cannot be called on the same VersionEdit.
void AddWal(WalNumber number, WalMetadata metadata = WalMetadata()) {
assert(NumEntries() == wal_additions_.size());
wal_additions_.emplace_back(number, std::move(metadata));
@ -463,22 +464,27 @@ class VersionEdit {
bool IsWalAddition() const { return !wal_additions_.empty(); }
// Delete a WAL (either directly deleted or archived).
void DeleteWal(WalNumber number) {
assert(NumEntries() == wal_deletions_.size());
wal_deletions_.emplace_back(number);
// AddWal and DeleteWalsBefore cannot be called on the same VersionEdit.
void DeleteWalsBefore(WalNumber number) {
assert((NumEntries() == 1) == !wal_deletion_.IsEmpty());
wal_deletion_ = WalDeletion(number);
}
const WalDeletions& GetWalDeletions() const { return wal_deletions_; }
const WalDeletion& GetWalDeletion() const { return wal_deletion_; }
bool IsWalDeletion() const { return !wal_deletions_.empty(); }
bool IsWalDeletion() const { return !wal_deletion_.IsEmpty(); }
bool IsWalManipulation() const { return IsWalAddition() || IsWalDeletion(); }
bool IsWalManipulation() const {
size_t entries = NumEntries();
return (entries > 0) && ((entries == wal_additions_.size()) ||
(entries == !wal_deletion_.IsEmpty()));
}
// Number of edits
size_t NumEntries() const {
return new_files_.size() + deleted_files_.size() +
blob_file_additions_.size() + blob_file_garbages_.size() +
wal_additions_.size() + wal_deletions_.size();
wal_additions_.size() + !wal_deletion_.IsEmpty();
}
void SetColumnFamily(uint32_t column_family_id) {
@ -563,7 +569,7 @@ class VersionEdit {
BlobFileGarbages blob_file_garbages_;
WalAdditions wal_additions_;
WalDeletions wal_deletions_;
WalDeletion wal_deletion_;
// Each version edit record should have column_family_ set
// If it's not set, it is default (0)

@ -201,7 +201,8 @@ Status VersionEditHandler::OnWalAddition(VersionEdit& edit) {
Status VersionEditHandler::OnWalDeletion(VersionEdit& edit) {
assert(edit.IsWalDeletion());
return version_set_->wals_.DeleteWals(edit.GetWalDeletions());
return version_set_->wals_.DeleteWalsBefore(
edit.GetWalDeletion().GetLogNumber());
}
Status VersionEditHandler::OnNonCfOperation(VersionEdit& edit,

@ -470,9 +470,7 @@ TEST_F(VersionEditTest, AddWalDebug) {
TEST_F(VersionEditTest, DeleteWalEncodeDecode) {
VersionEdit edit;
for (uint64_t log_number = 1; log_number <= 20; log_number++) {
edit.DeleteWal(log_number);
}
edit.DeleteWalsBefore(rand() % 100);
TestEncodeDecode(edit);
}
@ -481,36 +479,29 @@ TEST_F(VersionEditTest, DeleteWalDebug) {
constexpr std::array<uint64_t, n> kLogNumbers{{10, 20}};
VersionEdit edit;
for (int i = 0; i < n; i++) {
edit.DeleteWal(kLogNumbers[i]);
}
edit.DeleteWalsBefore(kLogNumbers[n - 1]);
const WalDeletions& wals = edit.GetWalDeletions();
const WalDeletion& wal = edit.GetWalDeletion();
ASSERT_TRUE(edit.IsWalDeletion());
ASSERT_EQ(wals.size(), n);
for (int i = 0; i < n; i++) {
const WalDeletion& wal = wals[i];
ASSERT_EQ(wal.GetLogNumber(), kLogNumbers[i]);
}
ASSERT_EQ(wal.GetLogNumber(), kLogNumbers[n - 1]);
std::string expected_str = "VersionEdit {\n";
for (int i = 0; i < n; i++) {
{
std::stringstream ss;
ss << " WalDeletion: log_number: " << kLogNumbers[i] << "\n";
ss << " WalDeletion: log_number: " << kLogNumbers[n - 1] << "\n";
expected_str += ss.str();
}
expected_str += " ColumnFamily: 0\n}\n";
ASSERT_EQ(edit.DebugString(true), expected_str);
std::string expected_json = "{\"EditNumber\": 4, \"WalDeletions\": [";
for (int i = 0; i < n; i++) {
std::string expected_json = "{\"EditNumber\": 4, \"WalDeletion\": ";
{
std::stringstream ss;
ss << "{\"LogNumber\": " << kLogNumbers[i] << "}";
if (i < n - 1) ss << ", ";
ss << "{\"LogNumber\": " << kLogNumbers[n - 1] << "}";
expected_json += ss.str();
}
expected_json += "], \"ColumnFamily\": 0}";
expected_json += ", \"ColumnFamily\": 0}";
ASSERT_EQ(edit.DebugJSON(4, true), expected_json);
}

@ -4179,7 +4179,7 @@ Status VersionSet::ProcessManifestWrites(
if (e->IsWalAddition()) {
s = wals_.AddWals(e->GetWalAdditions());
} else if (e->IsWalDeletion()) {
s = wals_.DeleteWals(e->GetWalDeletions());
s = wals_.DeleteWalsBefore(e->GetWalDeletion().GetLogNumber());
}
if (!s.ok()) {
break;
@ -4527,7 +4527,7 @@ Status VersionSet::ApplyOneVersionEditToBuilder(
return s;
}
} else if (edit.IsWalDeletion()) {
Status s = wals_.DeleteWals(edit.GetWalDeletions());
Status s = wals_.DeleteWalsBefore(edit.GetWalDeletion().GetLogNumber());
if (!s.ok()) {
return s;
}

@ -1192,10 +1192,8 @@ TEST_F(VersionSetTest, WalEditsNotAppliedToVersion) {
edits.back()->AddWal(i, WalMetadata(i));
}
// Delete the first half of the WALs.
for (uint64_t i = 1; i <= kNumWals; i++) {
edits.emplace_back(new VersionEdit);
edits.back()->DeleteWal(i);
}
edits.emplace_back(new VersionEdit);
edits.back()->DeleteWalsBefore(kNumWals / 2 + 1);
autovector<Version*> versions;
SyncPoint::GetInstance()->SetCallBack(
@ -1228,10 +1226,8 @@ TEST_F(VersionSetTest, NonWalEditsAppliedToVersion) {
edits.back()->AddWal(i, WalMetadata(i));
}
// Delete the first half of the WALs.
for (uint64_t i = 1; i <= kNumWals; i++) {
edits.emplace_back(new VersionEdit);
edits.back()->DeleteWal(i);
}
edits.emplace_back(new VersionEdit);
edits.back()->DeleteWalsBefore(kNumWals / 2 + 1);
edits.emplace_back(new VersionEdit);
edits.back()->SetDBId(kDBId);
@ -1411,7 +1407,7 @@ TEST_F(VersionSetTest, WalDeletion) {
// Delete the closed WAL.
{
VersionEdit edit;
edit.DeleteWal(kClosedLogNumber);
edit.DeleteWalsBefore(kNonClosedLogNumber);
ASSERT_OK(LogAndApplyToDefaultCF(edit));
@ -1549,39 +1545,83 @@ TEST_F(VersionSetTest, AddWalWithSmallerSize) {
}
}
TEST_F(VersionSetTest, DeleteNonExistingWal) {
TEST_F(VersionSetTest, DeleteWalsBeforeNonExistingWalNumber) {
NewDB();
constexpr WalNumber kLogNumber = 10;
constexpr WalNumber kNonExistingNumber = 11;
constexpr WalNumber kLogNumber0 = 10;
constexpr WalNumber kLogNumber1 = 20;
constexpr WalNumber kNonExistingNumber = 15;
constexpr uint64_t kSizeInBytes = 111;
{
// Add closed WALs.
VersionEdit edit;
WalMetadata wal(kSizeInBytes);
edit.AddWal(kLogNumber0, wal);
edit.AddWal(kLogNumber1, wal);
ASSERT_OK(LogAndApplyToDefaultCF(edit));
}
{
// Delete WALs before a non-existing WAL.
VersionEdit edit;
edit.DeleteWalsBefore(kNonExistingNumber);
ASSERT_OK(LogAndApplyToDefaultCF(edit));
}
// Recover a new VersionSet, WAL0 is deleted, WAL1 is not.
{
std::unique_ptr<VersionSet> new_versions(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1);
ASSERT_TRUE(wals.find(kLogNumber1) != wals.end());
}
}
TEST_F(VersionSetTest, DeleteAllWals) {
NewDB();
constexpr WalNumber kMaxLogNumber = 10;
constexpr uint64_t kSizeInBytes = 111;
{
// Add a closed WAL.
VersionEdit edit;
WalMetadata wal(kSizeInBytes);
edit.AddWal(kLogNumber, wal);
edit.AddWal(kMaxLogNumber, wal);
ASSERT_OK(LogAndApplyToDefaultCF(edit));
}
{
// Delete a non-existing WAL.
VersionEdit edit;
edit.DeleteWal(kNonExistingNumber);
edit.DeleteWalsBefore(kMaxLogNumber + 10);
Status s = LogAndApplyToDefaultCF(edit);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("WAL 11 must exist before deletion") !=
std::string::npos)
<< s.ToString();
ASSERT_OK(LogAndApplyToDefaultCF(edit));
}
// Recover a new VersionSet, all WALs are deleted.
{
std::unique_ptr<VersionSet> new_versions(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 0);
}
}
TEST_F(VersionSetTest, AtomicGroupWithWalEdits) {
NewDB();
constexpr int kAtomicGroupSize = 10;
constexpr int kAtomicGroupSize = 7;
constexpr uint64_t kNumWals = 5;
const std::string kDBId = "db_db";
@ -1599,11 +1639,9 @@ TEST_F(VersionSetTest, AtomicGroupWithWalEdits) {
edits.back()->SetDBId(kDBId);
edits.back()->MarkAtomicGroup(--remaining);
// Delete the first added 4 WALs.
for (uint64_t i = 1; i < kNumWals; i++) {
edits.emplace_back(new VersionEdit);
edits.back()->DeleteWal(i);
edits.back()->MarkAtomicGroup(--remaining);
}
edits.emplace_back(new VersionEdit);
edits.back()->DeleteWalsBefore(kNumWals);
edits.back()->MarkAtomicGroup(--remaining);
ASSERT_EQ(remaining, 0);
Status s = LogAndApplyToDefaultCF(edits);

@ -141,33 +141,11 @@ Status WalSet::AddWals(const WalAdditions& wals) {
return s;
}
Status WalSet::DeleteWal(const WalDeletion& wal) {
auto it = wals_.find(wal.GetLogNumber());
// The WAL must exist.
if (it == wals_.end()) {
std::stringstream ss;
ss << "WAL " << wal.GetLogNumber() << " must exist before deletion";
return Status::Corruption("WalSet", ss.str());
}
wals_.erase(it);
Status WalSet::DeleteWalsBefore(WalNumber wal) {
wals_.erase(wals_.begin(), wals_.lower_bound(wal));
return Status::OK();
}
Status WalSet::DeleteWals(const WalDeletions& wals) {
Status s;
for (const WalDeletion& wal : wals) {
s = DeleteWal(wal);
if (!s.ok()) {
break;
}
}
return s;
}
void WalSet::DeleteWalsBefore(WalNumber number) {
wals_.erase(wals_.begin(), wals_.lower_bound(number));
}
void WalSet::Reset() { wals_.clear(); }
Status WalSet::CheckWals(

@ -89,7 +89,7 @@ JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal);
using WalAdditions = std::vector<WalAddition>;
// Records the event of deleting a WAL.
// Records the event of deleting WALs before the specified log number.
class WalDeletion {
public:
WalDeletion() : number_(kEmpty) {}
@ -104,6 +104,10 @@ class WalDeletion {
std::string DebugString() const;
bool IsEmpty() const { return number_ == kEmpty; }
void Reset() { number_ = kEmpty; }
private:
static constexpr WalNumber kEmpty = 0;
@ -113,11 +117,9 @@ class WalDeletion {
std::ostream& operator<<(std::ostream& os, const WalDeletion& wal);
JSONWriter& operator<<(JSONWriter& jw, const WalDeletion& wal);
using WalDeletions = std::vector<WalDeletion>;
// Used in VersionSet to keep the current set of WALs.
//
// When a WAL is created, closed, deleted, or archived,
// When a WAL is synced or becomes obsoleted,
// a VersionEdit is logged to MANIFEST and
// the WAL is added to or deleted from WalSet.
//
@ -132,15 +134,9 @@ class WalSet {
Status AddWal(const WalAddition& wal);
Status AddWals(const WalAdditions& wals);
// Delete WAL(s).
// The WAL to be deleted must exist and be closed, otherwise,
// return Status::Corruption.
// Delete WALs with log number smaller than the specified wal number.
// Can happen when applying a VersionEdit or recovering from MANIFEST.
Status DeleteWal(const WalDeletion& wal);
Status DeleteWals(const WalDeletions& wals);
// Delete WALs with log number < wal_number.
void DeleteWalsBefore(WalNumber wal_number);
Status DeleteWalsBefore(WalNumber wal);
// Resets the internal state.
void Reset();

@ -25,9 +25,7 @@ TEST(WalSet, AddDeleteReset) {
ASSERT_EQ(wals.GetWals().size(), 10);
// Delete WAL 1 - 5.
for (WalNumber log_number = 1; log_number <= 5; log_number++) {
wals.DeleteWal(WalDeletion(log_number));
}
wals.DeleteWalsBefore(6);
ASSERT_EQ(wals.GetWals().size(), 5);
WalNumber expected_log_number = 6;
@ -74,13 +72,13 @@ TEST(WalSet, CreateTwice) {
std::string::npos);
}
TEST(WalSet, DeleteNonExistingWal) {
constexpr WalNumber kNonExistingNumber = 100;
TEST(WalSet, DeleteAllWals) {
constexpr WalNumber kMaxWalNumber = 10;
WalSet wals;
Status s = wals.DeleteWal(WalDeletion(kNonExistingNumber));
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(s.ToString().find("WAL 100 must exist before deletion") !=
std::string::npos);
for (WalNumber i = 1; i <= kMaxWalNumber; i++) {
wals.AddWal(WalAddition(i));
}
ASSERT_OK(wals.DeleteWalsBefore(kMaxWalNumber + 1));
}
class WalSetTest : public DBTestBase {

@ -2066,6 +2066,7 @@ void StressTest::Open() {
FLAGS_level_compaction_dynamic_level_bytes;
options_.file_checksum_gen_factory =
GetFileChecksumImpl(FLAGS_file_checksum_impl);
options_.track_and_verify_wals_in_manifest = true;
} else {
#ifdef ROCKSDB_LITE
fprintf(stderr, "--options_file not supported in lite mode\n");

Loading…
Cancel
Save