Skip deleted WALs during recovery

Summary:
This patch record the deleted WAL numbers in the manifest to ignore them and any WAL older than them during recovery. This is to avoid scenarios when we have a gap between the WAL files are fed to the recovery procedure. The gap could happen by for example out-of-order WAL deletion. Such gap could cause problems in 2PC recovery where the prepared and commit entry are placed into two separate WAL and gap in the WALs could result into not processing the WAL with the commit entry and hence breaking the 2PC recovery logic.
Closes https://github.com/facebook/rocksdb/pull/3488

Differential Revision: D6967893

Pulled By: maysamyabandeh

fbshipit-source-id: 13119feb155a08ab6d4909f437c7a750480dc8a1
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent 89d989ed75
commit 73f21a7b21
  1. 8
      db/db_flush_test.cc
  2. 12
      db/db_impl_files.cc
  3. 7
      db/db_impl_open.cc
  4. 3
      db/db_test_util.h
  5. 102
      db/db_wal_test.cc
  6. 4
      db/external_sst_file_test.cc
  7. 58
      db/version_edit.cc
  8. 7
      db/version_edit.h
  9. 10
      db/version_edit_test.cc
  10. 29
      db/version_set.cc
  11. 11
      db/version_set.h

@ -72,19 +72,23 @@ TEST_F(DBFlushTest, SyncFail) {
auto* cfd =
reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
->cfd();
int refs_before = cfd->current()->TEST_refs();
FlushOptions flush_options;
flush_options.wait = false;
ASSERT_OK(dbfull()->Flush(flush_options));
// Flush installs a new super-version. Get the ref count after that.
auto current_before = cfd->current();
int refs_before = cfd->current()->TEST_refs();
fault_injection_env->SetFilesystemActive(false);
TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
fault_injection_env->SetFilesystemActive(true);
// Now the background job will do the flush; wait for it.
dbfull()->TEST_WaitForFlushMemTable();
#ifndef ROCKSDB_LITE
ASSERT_EQ("", FilesPerLevel()); // flush failed.
#endif // ROCKSDB_LITE
// Flush job should release ref count to current version.
// Backgroun flush job should release ref count to current version.
ASSERT_EQ(current_before, cfd->current());
ASSERT_EQ(refs_before, cfd->current()->TEST_refs());
Destroy(options);
}

@ -332,6 +332,8 @@ bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
}; // namespace
// Delete obsolete files and log status and information of file deletion
// Note: All WAL files must be deleted through this function (unelss they are
// archived) to ensure that maniefest is updated properly.
void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
FileType type, uint64_t number,
uint32_t path_id) {
@ -340,6 +342,16 @@ void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
file_deletion_status =
DeleteSSTFile(&immutable_db_options_, fname, path_id);
} else {
if (type == kLogFile) {
// Before deleting the file, mark file as deleted in the manifest
VersionEdit edit;
edit.SetDeletedLogNumber(number);
auto edit_cfd = versions_->GetColumnFamilySet()->GetDefault();
auto edit_cf_opts = edit_cfd->GetLatestMutableCFOptions();
mutex_.Lock();
versions_->LogAndApply(edit_cfd, *edit_cf_opts, &edit, &mutex_);
mutex_.Unlock();
}
file_deletion_status = env_->DeleteFile(fname);
}
TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl:AfterDeletion",

@ -529,6 +529,13 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
bool flushed = false;
uint64_t corrupted_log_number = kMaxSequenceNumber;
for (auto log_number : log_numbers) {
if (log_number <= versions_->latest_deleted_log_number()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Skipping log #%" PRIu64
" since it is not newer than latest deleted log #%" PRIu64,
log_number, versions_->latest_deleted_log_number());
continue;
}
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.

@ -451,7 +451,8 @@ class SpecialEnv : public EnvWrapper {
return s;
}
Status NewSequentialFile(const std::string& f, unique_ptr<SequentialFile>* r,
virtual Status NewSequentialFile(const std::string& f,
unique_ptr<SequentialFile>* r,
const EnvOptions& soptions) override {
class CountingFile : public SequentialFile {
public:

@ -20,6 +20,106 @@ class DBWALTest : public DBTestBase {
DBWALTest() : DBTestBase("/db_wal_test") {}
};
// A SpecialEnv enriched to give more insight about deleted files
class EnrichedSpecialEnv : public SpecialEnv {
public:
explicit EnrichedSpecialEnv(Env* base) : SpecialEnv(base) {}
Status NewSequentialFile(const std::string& f, unique_ptr<SequentialFile>* r,
const EnvOptions& soptions) override {
InstrumentedMutexLock l(&env_mutex_);
if (f == skipped_wal) {
deleted_wal_reopened = true;
if (IsWAL(f) && largetest_deleted_wal.size() != 0 &&
f.compare(largetest_deleted_wal) <= 0) {
gap_in_wals = true;
}
}
return SpecialEnv::NewSequentialFile(f, r, soptions);
}
Status DeleteFile(const std::string& fname) override {
if (IsWAL(fname)) {
deleted_wal_cnt++;
InstrumentedMutexLock l(&env_mutex_);
// If this is the first WAL, remember its name and skip deleting it. We
// remember its name partly because the application might attempt to
// delete the file again.
if (skipped_wal.size() != 0 && skipped_wal != fname) {
if (largetest_deleted_wal.size() == 0 ||
largetest_deleted_wal.compare(fname) < 0) {
largetest_deleted_wal = fname;
}
} else {
skipped_wal = fname;
return Status::OK();
}
}
return SpecialEnv::DeleteFile(fname);
}
bool IsWAL(const std::string& fname) {
// printf("iswal %s\n", fname.c_str());
return fname.compare(fname.size() - 3, 3, "log") == 0;
}
InstrumentedMutex env_mutex_;
// the wal whose actual delete was skipped by the env
std::string skipped_wal = "";
// the largest WAL that was requested to be deleted
std::string largetest_deleted_wal = "";
// number of WALs that were successfully deleted
std::atomic<size_t> deleted_wal_cnt = {0};
// the WAL whose delete from fs was skipped is reopened during recovery
std::atomic<bool> deleted_wal_reopened = {false};
// whether a gap in the WALs was detected during recovery
std::atomic<bool> gap_in_wals = {false};
};
class DBWALTestWithEnrichedEnv : public DBTestBase {
public:
DBWALTestWithEnrichedEnv() : DBTestBase("/db_wal_test") {
enriched_env_ = new EnrichedSpecialEnv(env_->target());
auto options = CurrentOptions();
options.env = enriched_env_;
Reopen(options);
delete env_;
// to be deleted by the parent class
env_ = enriched_env_;
}
protected:
EnrichedSpecialEnv* enriched_env_;
};
// Test that the recovery would successfully avoid the gaps between the logs.
// One known scenario that could cause this is that the application issue the
// WAL deletion out of order. For the sake of simplicity in the test, here we
// create the gap by manipulating the env to skip deletion of the first WAL but
// not the ones after it.
TEST_F(DBWALTestWithEnrichedEnv, SkipDeletedWALs) {
auto options = last_options_;
// To cause frequent WAL deletion
options.write_buffer_size = 128;
Reopen(options);
WriteOptions writeOpt = WriteOptions();
for (int i = 0; i < 128 * 5; i++) {
ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1"));
}
FlushOptions fo;
fo.wait = true;
ASSERT_OK(db_->Flush(fo));
// some wals are deleted
ASSERT_NE(0, enriched_env_->deleted_wal_cnt);
// but not the first one
ASSERT_NE(0, enriched_env_->skipped_wal.size());
// Test that the WAL that was not deleted will be skipped during recovery
options = last_options_;
Reopen(options);
ASSERT_FALSE(enriched_env_->deleted_wal_reopened);
ASSERT_FALSE(enriched_env_->gap_in_wals);
}
TEST_F(DBWALTest, WAL) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
@ -891,7 +991,7 @@ TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) {
// Record the offset at this point
Env* env = options.env;
int wal_file_id = RecoveryTestHelper::kWALFileOffset + 1;
uint64_t wal_file_id = dbfull()->TEST_LogfileNumber();
std::string fname = LogFileName(dbname_, wal_file_id);
uint64_t offset_to_corrupt;
ASSERT_OK(env->GetFileSize(fname, &offset_to_corrupt));

@ -1413,8 +1413,12 @@ TEST_F(ExternalSSTFileTest, AddFileTrivialMoveBug) {
// fit in L3 but will overlap with compaction so will be added
// to L2 but a compaction will trivially move it to L3
// and break LSM consistency
static std::atomic<bool> called = {false};
if (!called) {
called = true;
ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}}));
ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7));
}
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();

@ -30,6 +30,7 @@ enum Tag {
kNewFile = 7,
// 8 was used for large value refs
kPrevLogNumber = 9,
kDeletedLogNumber = 10,
// these are new formats divergent from open source leveldb
kNewFile2 = 100,
@ -44,6 +45,11 @@ enum Tag {
enum CustomTag {
kTerminate = 1, // The end of customized fields
kNeedCompaction = 2,
// Since Manifest is not entirely currently forward-compatible, and the only
// forward-compatbile part is the CutsomtTag of kNewFile, we currently encode
// kDeletedLogNumber as part of a CustomTag as a hack. This should be removed
// when manifest becomes forward-comptabile.
kDeletedLogNumberHack = 3,
kPathId = 65,
};
// If this bit for the custom tag is set, opening DB should fail if
@ -63,12 +69,14 @@ void VersionEdit::Clear() {
last_sequence_ = 0;
next_file_number_ = 0;
max_column_family_ = 0;
deleted_log_number_ = 0;
has_comparator_ = false;
has_log_number_ = false;
has_prev_log_number_ = false;
has_next_file_number_ = false;
has_last_sequence_ = false;
has_max_column_family_ = false;
has_deleted_log_number_ = false;
deleted_files_.clear();
new_files_.clear();
column_family_ = 0;
@ -97,6 +105,24 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
if (has_max_column_family_) {
PutVarint32Varint32(dst, kMaxColumnFamily, max_column_family_);
}
if (has_deleted_log_number_) {
// TODO(myabandeh): uncomment me when manifest is forward-compatible
// PutVarint32Varint64(dst, kDeletedLogNumber, deleted_log_number_);
// Since currently manifest is not forward compatible we encode this entry
// disguised as a kNewFile4 entry which has forward-compatible extensions.
PutVarint32(dst, kNewFile4);
PutVarint32Varint64(dst, 0u, 0ull); // level and number
PutVarint64(dst, 0ull); // file size
InternalKey dummy_key(Slice("dummy_key"), 0ull, ValueType::kTypeValue);
PutLengthPrefixedSlice(dst, dummy_key.Encode()); // smallest
PutLengthPrefixedSlice(dst, dummy_key.Encode()); // largest
PutVarint64Varint64(dst, 0ull, 0ull); // smallest_seqno and largerst
PutVarint32(dst, CustomTag::kDeletedLogNumberHack);
std::string buf;
PutFixed64(&buf, deleted_log_number_);
PutLengthPrefixedSlice(dst, Slice(buf));
PutVarint32(dst, CustomTag::kTerminate);
}
for (const auto& deleted : deleted_files_) {
PutVarint32Varint32Varint64(dst, kDeletedFile, deleted.first /* level */,
@ -218,6 +244,10 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
uint64_t number;
uint32_t path_id = 0;
uint64_t file_size;
// Since this is the only forward-compatible part of the code, we hack new
// extension into this record. When we do, we set this boolean to distinguish
// the record from the normal NewFile records.
bool this_is_not_a_new_file_record = false;
if (GetLevel(input, &level, &msg) && GetVarint64(input, &number) &&
GetVarint64(input, &file_size) && GetInternalKey(input, &f.smallest) &&
GetInternalKey(input, &f.largest) &&
@ -252,6 +282,15 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
}
f.marked_for_compaction = (field[0] == 1);
break;
case kDeletedLogNumberHack:
// This is a hack to encode kDeletedLogNumber in a forward-compatbile
// fashion.
this_is_not_a_new_file_record = true;
if (!GetFixed64(&field, &deleted_log_number_)) {
return "deleted log number malformatted";
}
has_deleted_log_number_ = true;
break;
default:
if ((custom_tag & kCustomTagNonSafeIgnoreMask) != 0) {
// Should not proceed if cannot understand it
@ -263,6 +302,10 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
} else {
return "new-file4 entry";
}
if (this_is_not_a_new_file_record) {
// Since this has nothing to do with NewFile, return immediately.
return nullptr;
}
f.fd = FileDescriptor(number, path_id, file_size);
new_files_.push_back(std::make_pair(level, f));
return nullptr;
@ -331,6 +374,14 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
}
break;
case kDeletedLogNumber:
if (GetVarint64(&input, &deleted_log_number_)) {
has_deleted_log_number_ = true;
} else {
msg = "deleted log number";
}
break;
case kCompactPointer:
if (GetLevel(&input, &level, &msg) &&
GetInternalKey(&input, &key)) {
@ -513,6 +564,10 @@ std::string VersionEdit::DebugString(bool hex_key) const {
r.append("\n MaxColumnFamily: ");
AppendNumberTo(&r, max_column_family_);
}
if (has_deleted_log_number_) {
r.append("\n DeletedLogNumber: ");
AppendNumberTo(&r, deleted_log_number_);
}
r.append("\n}\n");
return r;
}
@ -582,6 +637,9 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const {
if (has_max_column_family_) {
jw << "MaxColumnFamily" << max_column_family_;
}
if (has_deleted_log_number_) {
jw << "DeletedLogNumber" << deleted_log_number_;
}
jw.EndObject();

@ -199,6 +199,10 @@ class VersionEdit {
has_max_column_family_ = true;
max_column_family_ = max_column_family;
}
void SetDeletedLogNumber(uint64_t num) {
has_deleted_log_number_ = true;
deleted_log_number_ = num;
}
// Add the specified file at the specified number.
// REQUIRES: This version has not been saved (see VersionSet::SaveTo)
@ -285,6 +289,8 @@ class VersionEdit {
uint64_t prev_log_number_;
uint64_t next_file_number_;
uint32_t max_column_family_;
// The most recent WAL log number that is deleted
uint64_t deleted_log_number_;
SequenceNumber last_sequence_;
bool has_comparator_;
bool has_log_number_;
@ -292,6 +298,7 @@ class VersionEdit {
bool has_next_file_number_;
bool has_last_sequence_;
bool has_max_column_family_;
bool has_deleted_log_number_;
DeletedFileSet deleted_files_;
std::vector<std::pair<int, FileMetaData>> new_files_;

@ -181,6 +181,16 @@ TEST_F(VersionEditTest, ColumnFamilyTest) {
TestEncodeDecode(edit);
}
TEST_F(VersionEditTest, DeletedLogNumber) {
VersionEdit edit;
edit.SetDeletedLogNumber(13);
TestEncodeDecode(edit);
edit.Clear();
edit.SetDeletedLogNumber(23);
TestEncodeDecode(edit);
}
} // namespace rocksdb
int main(int argc, char** argv) {

@ -3076,6 +3076,7 @@ Status VersionSet::Recover(
uint64_t log_number = 0;
uint64_t previous_log_number = 0;
uint32_t max_column_family = 0;
uint64_t deleted_log_number = 0;
std::unordered_map<uint32_t, BaseReferencedVersionBuilder*> builders;
// add default column family
@ -3216,6 +3217,11 @@ Status VersionSet::Recover(
max_column_family = edit.max_column_family_;
}
if (edit.has_deleted_log_number_) {
deleted_log_number =
std::max(deleted_log_number, edit.deleted_log_number_);
}
if (edit.has_last_sequence_) {
last_sequence = edit.last_sequence_;
have_last_sequence = true;
@ -3238,6 +3244,7 @@ Status VersionSet::Recover(
column_family_set_->UpdateMaxColumnFamily(max_column_family);
MarkDeletedLogNumber(deleted_log_number);
MarkFileNumberUsed(previous_log_number);
MarkFileNumberUsed(log_number);
}
@ -3309,11 +3316,12 @@ Status VersionSet::Recover(
"manifest_file_number is %lu, next_file_number is %lu, "
"last_sequence is %lu, log_number is %lu,"
"prev_log_number is %lu,"
"max_column_family is %u\n",
"max_column_family is %u,"
"deleted_log_number is %lu\n",
manifest_filename.c_str(), (unsigned long)manifest_file_number_,
(unsigned long)next_file_number_.load(), (unsigned long)last_sequence_,
(unsigned long)log_number, (unsigned long)prev_log_number_,
column_family_set_->GetMaxColumnFamily());
column_family_set_->GetMaxColumnFamily(), latest_deleted_log_number());
for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
@ -3619,6 +3627,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
if (edit.has_max_column_family_) {
column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_);
}
if (edit.has_deleted_log_number_) {
MarkDeletedLogNumber(edit.deleted_log_number_);
}
}
}
file_reader.reset();
@ -3677,10 +3689,11 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
printf(
"next_file_number %lu last_sequence "
"%lu prev_log_number %lu max_column_family %u\n",
"%lu prev_log_number %lu max_column_family %u deleted_log_number "
"%" PRIu64 "\n",
(unsigned long)next_file_number_.load(), (unsigned long)last_sequence,
(unsigned long)previous_log_number,
column_family_set_->GetMaxColumnFamily());
column_family_set_->GetMaxColumnFamily(), latest_deleted_log_number());
}
return s;
@ -3695,6 +3708,14 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) {
}
}
// Called only either from ::LogAndApply which is protected by mutex or during
// recovery which is single-threaded.
void VersionSet::MarkDeletedLogNumber(uint64_t number) {
if (latest_deleted_log_number_.load(std::memory_order_relaxed) < number) {
latest_deleted_log_number_.store(number, std::memory_order_relaxed);
}
}
Status VersionSet::WriteSnapshot(log::Writer* log) {
// TODO: Break up into multiple records to reduce memory usage on recovery?

@ -759,6 +759,10 @@ class VersionSet {
uint64_t current_next_file_number() const { return next_file_number_.load(); }
uint64_t latest_deleted_log_number() const {
return latest_deleted_log_number_.load();
}
// Allocate and return a new file number
uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); }
@ -806,6 +810,11 @@ class VersionSet {
// REQUIRED: this is only called during single-threaded recovery or repair.
void MarkFileNumberUsed(uint64_t number);
// Mark the specified log number as deleted
// REQUIRED: this is only called during single-threaded recovery or repair, or
// from ::LogAndApply where the global mutex is held.
void MarkDeletedLogNumber(uint64_t number);
// Return the log file number for the log file that is currently
// being compacted, or zero if there is no such log file.
uint64_t prev_log_number() const { return prev_log_number_; }
@ -903,6 +912,8 @@ class VersionSet {
const std::string dbname_;
const ImmutableDBOptions* const db_options_;
std::atomic<uint64_t> next_file_number_;
// Any log number equal or lower than this should be ignored during recovery.
std::atomic<uint64_t> latest_deleted_log_number_ = {0};
uint64_t manifest_file_number_;
uint64_t options_file_number_;
uint64_t pending_manifest_file_number_;

Loading…
Cancel
Save