diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc
index 87f894a7b..0e6dd0bf5 100644
--- a/db/db_flush_test.cc
+++ b/db/db_flush_test.cc
@@ -72,19 +72,23 @@ TEST_F(DBFlushTest, SyncFail) {
   auto* cfd =
       reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
           ->cfd();
-  int refs_before = cfd->current()->TEST_refs();
   FlushOptions flush_options;
   flush_options.wait = false;
   ASSERT_OK(dbfull()->Flush(flush_options));
+  // Flush installs a new super-version. Get the ref count after that.
+  auto current_before = cfd->current();
+  int refs_before = cfd->current()->TEST_refs();
   fault_injection_env->SetFilesystemActive(false);
   TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
   TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
   fault_injection_env->SetFilesystemActive(true);
+  // Now the background job will do the flush; wait for it.
   dbfull()->TEST_WaitForFlushMemTable();
 #ifndef ROCKSDB_LITE
   ASSERT_EQ("", FilesPerLevel());  // flush failed.
 #endif  // ROCKSDB_LITE
-  // Flush job should release ref count to current version.
+  // Background flush job should release ref count to current version.
+  ASSERT_EQ(current_before, cfd->current());
   ASSERT_EQ(refs_before, cfd->current()->TEST_refs());
   Destroy(options);
 }
diff --git a/db/db_impl_files.cc b/db/db_impl_files.cc
index 983787129..c3e698e98 100644
--- a/db/db_impl_files.cc
+++ b/db/db_impl_files.cc
@@ -332,6 +332,8 @@ bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
 };  // namespace
 
 // Delete obsolete files and log status and information of file deletion
+// Note: All WAL files must be deleted through this function (unless they are
+// archived) to ensure that the manifest is updated properly.
 void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
                                     FileType type, uint64_t number,
                                     uint32_t path_id) {
@@ -340,6 +342,16 @@ void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
     file_deletion_status =
         DeleteSSTFile(&immutable_db_options_, fname, path_id);
   } else {
+    if (type == kLogFile) {
+      // Before deleting the file, mark it as deleted in the manifest
+      VersionEdit edit;
+      edit.SetDeletedLogNumber(number);
+      auto edit_cfd = versions_->GetColumnFamilySet()->GetDefault();
+      auto edit_cf_opts = edit_cfd->GetLatestMutableCFOptions();
+      mutex_.Lock();
+      versions_->LogAndApply(edit_cfd, *edit_cf_opts, &edit, &mutex_);
+      mutex_.Unlock();
+    }
     file_deletion_status = env_->DeleteFile(fname);
   }
   TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl:AfterDeletion",
diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc
index 047a17b21..6e05af70a 100644
--- a/db/db_impl_open.cc
+++ b/db/db_impl_open.cc
@@ -529,6 +529,13 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
   bool flushed = false;
   uint64_t corrupted_log_number = kMaxSequenceNumber;
   for (auto log_number : log_numbers) {
+    if (log_number <= versions_->latest_deleted_log_number()) {
+      ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                     "Skipping log #%" PRIu64
+                     " since it is not newer than latest deleted log #%" PRIu64,
+                     log_number, versions_->latest_deleted_log_number());
+      continue;
+    }
     // The previous incarnation may not have written any MANIFEST
     // records after allocating this log number. So we manually
     // update the file number allocation counter in VersionSet.
diff --git a/db/db_test_util.h b/db/db_test_util.h
index 936823eff..0c3da91ce 100644
--- a/db/db_test_util.h
+++ b/db/db_test_util.h
@@ -451,8 +451,9 @@ class SpecialEnv : public EnvWrapper {
     return s;
   }
 
-  Status NewSequentialFile(const std::string& f, unique_ptr<SequentialFile>* r,
-                           const EnvOptions& soptions) override {
+  virtual Status NewSequentialFile(const std::string& f,
+                                   unique_ptr<SequentialFile>* r,
+                                   const EnvOptions& soptions) override {
     class CountingFile : public SequentialFile {
      public:
       CountingFile(unique_ptr<SequentialFile>&& target,
diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc
index 796ef251c..0349bdc8d 100644
--- a/db/db_wal_test.cc
+++ b/db/db_wal_test.cc
@@ -20,6 +20,106 @@ class DBWALTest : public DBTestBase {
   DBWALTest() : DBTestBase("/db_wal_test") {}
 };
 
+// A SpecialEnv enriched to give more insight about deleted files
+class EnrichedSpecialEnv : public SpecialEnv {
+ public:
+  explicit EnrichedSpecialEnv(Env* base) : SpecialEnv(base) {}
+  Status NewSequentialFile(const std::string& f, unique_ptr<SequentialFile>* r,
+                           const EnvOptions& soptions) override {
+    InstrumentedMutexLock l(&env_mutex_);
+    if (f == skipped_wal) {
+      deleted_wal_reopened = true;
+      if (IsWAL(f) && largest_deleted_wal.size() != 0 &&
+          f.compare(largest_deleted_wal) <= 0) {
+        gap_in_wals = true;
+      }
+    }
+    return SpecialEnv::NewSequentialFile(f, r, soptions);
+  }
+  Status DeleteFile(const std::string& fname) override {
+    if (IsWAL(fname)) {
+      deleted_wal_cnt++;
+      InstrumentedMutexLock l(&env_mutex_);
+      // If this is the first WAL, remember its name and skip deleting it. We
+      // remember its name partly because the application might attempt to
+      // delete the file again.
+      if (skipped_wal.size() != 0 && skipped_wal != fname) {
+        if (largest_deleted_wal.size() == 0 ||
+            largest_deleted_wal.compare(fname) < 0) {
+          largest_deleted_wal = fname;
+        }
+      } else {
+        skipped_wal = fname;
+        return Status::OK();
+      }
+    }
+    return SpecialEnv::DeleteFile(fname);
+  }
+  bool IsWAL(const std::string& fname) {
+    // printf("iswal %s\n", fname.c_str());
+    return fname.compare(fname.size() - 3, 3, "log") == 0;
+  }
+
+  InstrumentedMutex env_mutex_;
+  // the WAL whose actual deletion was skipped by the env
+  std::string skipped_wal = "";
+  // the largest WAL that was requested to be deleted
+  std::string largest_deleted_wal = "";
+  // number of WALs that were successfully deleted
+  std::atomic<size_t> deleted_wal_cnt = {0};
+  // the WAL whose deletion from the fs was skipped is reopened during recovery
+  std::atomic<bool> deleted_wal_reopened = {false};
+  // whether a gap in the WALs was detected during recovery
+  std::atomic<bool> gap_in_wals = {false};
+};
+
+class DBWALTestWithEnrichedEnv : public DBTestBase {
+ public:
+  DBWALTestWithEnrichedEnv() : DBTestBase("/db_wal_test") {
+    enriched_env_ = new EnrichedSpecialEnv(env_->target());
+    auto options = CurrentOptions();
+    options.env = enriched_env_;
+    Reopen(options);
+    delete env_;
+    // to be deleted by the parent class
+    env_ = enriched_env_;
+  }
+
+ protected:
+  EnrichedSpecialEnv* enriched_env_;
+};
+
+// Test that recovery successfully skips over the gaps between the logs. One
+// known scenario that could cause such a gap is the application issuing WAL
+// deletions out of order. For the sake of simplicity, the test creates the
+// gap by manipulating the env to skip deleting the first WAL but not the
+// ones after it.
+TEST_F(DBWALTestWithEnrichedEnv, SkipDeletedWALs) {
+  auto options = last_options_;
+  // To cause frequent WAL deletion
+  options.write_buffer_size = 128;
+  Reopen(options);
+
+  WriteOptions writeOpt = WriteOptions();
+  for (int i = 0; i < 128 * 5; i++) {
+    ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1"));
+  }
+  FlushOptions fo;
+  fo.wait = true;
+  ASSERT_OK(db_->Flush(fo));
+
+  // some WALs are deleted
+  ASSERT_NE(0, enriched_env_->deleted_wal_cnt);
+  // but not the first one
+  ASSERT_NE(0, enriched_env_->skipped_wal.size());
+
+  // Test that the WAL that was not deleted will be skipped during recovery
+  options = last_options_;
+  Reopen(options);
+  ASSERT_FALSE(enriched_env_->deleted_wal_reopened);
+  ASSERT_FALSE(enriched_env_->gap_in_wals);
+}
+
 TEST_F(DBWALTest, WAL) {
   do {
     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
@@ -891,7 +991,7 @@ TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) {
 
   // Record the offset at this point
   Env* env = options.env;
-  int wal_file_id = RecoveryTestHelper::kWALFileOffset + 1;
+  uint64_t wal_file_id = dbfull()->TEST_LogfileNumber();
   std::string fname = LogFileName(dbname_, wal_file_id);
   uint64_t offset_to_corrupt;
   ASSERT_OK(env->GetFileSize(fname, &offset_to_corrupt));
diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc
index fcdf07adc..aeaee8fc0 100644
--- a/db/external_sst_file_test.cc
+++ b/db/external_sst_file_test.cc
@@ -1413,8 +1413,12 @@ TEST_F(ExternalSSTFileTest, AddFileTrivialMoveBug) {
         // fit in L3 but will overlap with compaction so will be added
         // to L2 but a compaction will trivially move it to L3
         // and break LSM consistency
-        ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}}));
-        ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7));
+        static std::atomic<bool> called = {false};
+        if (!called) {
+          called = true;
+          ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}}));
+          ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7));
+        }
       });
 
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
diff --git a/db/version_edit.cc b/db/version_edit.cc
index ebfc10584..b966122a3 100644
--- a/db/version_edit.cc
+++ b/db/version_edit.cc
@@ -30,6 +30,7 @@ enum Tag {
   kNewFile = 7,
   // 8 was used for large value refs
   kPrevLogNumber = 9,
+  kDeletedLogNumber = 10,
 
   // these are new formats divergent from open source leveldb
   kNewFile2 = 100,
@@ -44,6 +45,11 @@ enum Tag {
 enum CustomTag {
   kTerminate = 1,  // The end of customized fields
   kNeedCompaction = 2,
+  // Since the manifest is currently not entirely forward-compatible, and the
+  // only forward-compatible part is the CustomTag of kNewFile, we currently
+  // encode kDeletedLogNumber as part of a CustomTag as a hack. This should be
+  // removed when the manifest becomes forward-compatible.
+  kDeletedLogNumberHack = 3,
   kPathId = 65,
 };
 // If this bit for the custom tag is set, opening DB should fail if
@@ -63,12 +69,14 @@ void VersionEdit::Clear() {
   last_sequence_ = 0;
   next_file_number_ = 0;
   max_column_family_ = 0;
+  deleted_log_number_ = 0;
   has_comparator_ = false;
   has_log_number_ = false;
   has_prev_log_number_ = false;
   has_next_file_number_ = false;
   has_last_sequence_ = false;
   has_max_column_family_ = false;
+  has_deleted_log_number_ = false;
   deleted_files_.clear();
   new_files_.clear();
   column_family_ = 0;
@@ -97,6 +105,24 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
   if (has_max_column_family_) {
     PutVarint32Varint32(dst, kMaxColumnFamily, max_column_family_);
   }
+  if (has_deleted_log_number_) {
+    // TODO(myabandeh): uncomment me when manifest is forward-compatible
+    // PutVarint32Varint64(dst, kDeletedLogNumber, deleted_log_number_);
+    // Since currently the manifest is not forward-compatible, we encode this
+    // entry disguised as a kNewFile4 entry, which has forward-compatible extensions.
+    PutVarint32(dst, kNewFile4);
+    PutVarint32Varint64(dst, 0u, 0ull);  // level and number
+    PutVarint64(dst, 0ull);              // file size
+    InternalKey dummy_key(Slice("dummy_key"), 0ull, ValueType::kTypeValue);
+    PutLengthPrefixedSlice(dst, dummy_key.Encode());  // smallest
+    PutLengthPrefixedSlice(dst, dummy_key.Encode());  // largest
+    PutVarint64Varint64(dst, 0ull, 0ull);  // smallest_seqno and largest_seqno
+    PutVarint32(dst, CustomTag::kDeletedLogNumberHack);
+    std::string buf;
+    PutFixed64(&buf, deleted_log_number_);
+    PutLengthPrefixedSlice(dst, Slice(buf));
+    PutVarint32(dst, CustomTag::kTerminate);
+  }
 
   for (const auto& deleted : deleted_files_) {
     PutVarint32Varint32Varint64(dst, kDeletedFile, deleted.first /* level */,
@@ -218,6 +244,10 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
   uint64_t number;
   uint32_t path_id = 0;
   uint64_t file_size;
+  // Since this is the only forward-compatible part of the code, we hack new
+  // extensions into this record. When we do, we set this boolean to
+  // distinguish the record from the normal NewFile records.
+  bool this_is_not_a_new_file_record = false;
   if (GetLevel(input, &level, &msg) && GetVarint64(input, &number) &&
       GetVarint64(input, &file_size) && GetInternalKey(input, &f.smallest) &&
       GetInternalKey(input, &f.largest) &&
@@ -252,6 +282,15 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
         }
         f.marked_for_compaction = (field[0] == 1);
         break;
+      case kDeletedLogNumberHack:
+        // This is a hack to encode kDeletedLogNumber in a forward-compatible
+        // fashion.
+        this_is_not_a_new_file_record = true;
+        if (!GetFixed64(&field, &deleted_log_number_)) {
+          return "deleted log number malformatted";
+        }
+        has_deleted_log_number_ = true;
+        break;
       default:
         if ((custom_tag & kCustomTagNonSafeIgnoreMask) != 0) {
           // Should not proceed if cannot understand it
@@ -263,6 +302,10 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
   } else {
     return "new-file4 entry";
   }
+  if (this_is_not_a_new_file_record) {
+    // Since this has nothing to do with NewFile, return immediately.
+    return nullptr;
+  }
   f.fd = FileDescriptor(number, path_id, file_size);
   new_files_.push_back(std::make_pair(level, f));
   return nullptr;
@@ -331,6 +374,14 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
         }
         break;
 
+      case kDeletedLogNumber:
+        if (GetVarint64(&input, &deleted_log_number_)) {
+          has_deleted_log_number_ = true;
+        } else {
+          msg = "deleted log number";
+        }
+        break;
+
       case kCompactPointer:
         if (GetLevel(&input, &level, &msg) &&
             GetInternalKey(&input, &key)) {
@@ -513,6 +564,10 @@ std::string VersionEdit::DebugString(bool hex_key) const {
     r.append("\n  MaxColumnFamily: ");
     AppendNumberTo(&r, max_column_family_);
   }
+  if (has_deleted_log_number_) {
+    r.append("\n  DeletedLogNumber: ");
+    AppendNumberTo(&r, deleted_log_number_);
+  }
   r.append("\n}\n");
   return r;
 }
@@ -582,6 +637,9 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const {
   if (has_max_column_family_) {
     jw << "MaxColumnFamily" << max_column_family_;
   }
+  if (has_deleted_log_number_) {
+    jw << "DeletedLogNumber" << deleted_log_number_;
+  }
 
   jw.EndObject();
diff --git a/db/version_edit.h b/db/version_edit.h
index 391e61434..f826b11bc 100644
--- a/db/version_edit.h
+++ b/db/version_edit.h
@@ -199,6 +199,10 @@ class VersionEdit {
     has_max_column_family_ = true;
     max_column_family_ = max_column_family;
   }
+  void SetDeletedLogNumber(uint64_t num) {
+    has_deleted_log_number_ = true;
+    deleted_log_number_ = num;
+  }
 
   // Add the specified file at the specified number.
   // REQUIRES: This version has not been saved (see VersionSet::SaveTo)
@@ -285,6 +289,8 @@ class VersionEdit {
   uint64_t prev_log_number_;
   uint64_t next_file_number_;
   uint32_t max_column_family_;
+  // The most recent WAL log number that has been deleted
+  uint64_t deleted_log_number_;
   SequenceNumber last_sequence_;
   bool has_comparator_;
   bool has_log_number_;
@@ -292,6 +298,7 @@ class VersionEdit {
   bool has_next_file_number_;
   bool has_last_sequence_;
   bool has_max_column_family_;
+  bool has_deleted_log_number_;
 
   DeletedFileSet deleted_files_;
   std::vector<std::pair<int, FileMetaData>> new_files_;
diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc
index 338bb36f6..0fc4a5e75 100644
--- a/db/version_edit_test.cc
+++ b/db/version_edit_test.cc
@@ -181,6 +181,16 @@ TEST_F(VersionEditTest, ColumnFamilyTest) {
   TestEncodeDecode(edit);
 }
 
+TEST_F(VersionEditTest, DeletedLogNumber) {
+  VersionEdit edit;
+  edit.SetDeletedLogNumber(13);
+  TestEncodeDecode(edit);
+
+  edit.Clear();
+  edit.SetDeletedLogNumber(23);
+  TestEncodeDecode(edit);
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
diff --git a/db/version_set.cc b/db/version_set.cc
index b7a62d5e7..d2bfcc8de 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -3076,6 +3076,7 @@ Status VersionSet::Recover(
   uint64_t log_number = 0;
   uint64_t previous_log_number = 0;
   uint32_t max_column_family = 0;
+  uint64_t deleted_log_number = 0;
   std::unordered_map<uint32_t, BaseReferencedVersionBuilder*> builders;
 
   // add default column family
@@ -3216,6 +3217,11 @@
           max_column_family = edit.max_column_family_;
         }
 
+        if (edit.has_deleted_log_number_) {
+          deleted_log_number =
+              std::max(deleted_log_number, edit.deleted_log_number_);
+        }
+
         if (edit.has_last_sequence_) {
           last_sequence = edit.last_sequence_;
          have_last_sequence = true;
@@ -3238,6 +3244,7 @@
      column_family_set_->UpdateMaxColumnFamily(max_column_family);
    }

+    MarkDeletedLogNumber(deleted_log_number);
    MarkFileNumberUsed(previous_log_number);
    MarkFileNumberUsed(log_number);
  }
@@ -3309,11 +3316,12 @@
        "manifest_file_number is %lu, next_file_number is %lu, "
        "last_sequence is %lu, log_number is %lu,"
        "prev_log_number is %lu,"
-        "max_column_family is %u\n",
+        "max_column_family is %u,"
+        "deleted_log_number is %lu\n",
        manifest_filename.c_str(), (unsigned long)manifest_file_number_,
        (unsigned long)next_file_number_.load(), (unsigned long)last_sequence_,
        (unsigned long)log_number, (unsigned long)prev_log_number_,
-        column_family_set_->GetMaxColumnFamily());
+        column_family_set_->GetMaxColumnFamily(), latest_deleted_log_number());

    for (auto cfd : *column_family_set_) {
      if (cfd->IsDropped()) {
@@ -3619,6 +3627,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
      if (edit.has_max_column_family_) {
        column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_);
      }
+
+      if (edit.has_deleted_log_number_) {
+        MarkDeletedLogNumber(edit.deleted_log_number_);
+      }
    }
  }
  file_reader.reset();
@@ -3677,10 +3689,11 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,

    printf(
        "next_file_number %lu last_sequence "
-        "%lu prev_log_number %lu max_column_family %u\n",
+        "%lu prev_log_number %lu max_column_family %u deleted_log_number "
+        "%" PRIu64 "\n",
        (unsigned long)next_file_number_.load(), (unsigned long)last_sequence,
        (unsigned long)previous_log_number,
-        column_family_set_->GetMaxColumnFamily());
+        column_family_set_->GetMaxColumnFamily(), latest_deleted_log_number());
  }

  return s;
@@ -3695,6 +3708,14 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) {
  }
}

+// Called only either from ::LogAndApply which is protected by mutex or during
+// recovery which is single-threaded.
+void VersionSet::MarkDeletedLogNumber(uint64_t number) {
+  if (latest_deleted_log_number_.load(std::memory_order_relaxed) < number) {
+    latest_deleted_log_number_.store(number, std::memory_order_relaxed);
+  }
+}
+
 Status VersionSet::WriteSnapshot(log::Writer* log) {
  // TODO: Break up into multiple records to reduce memory usage on recovery?
diff --git a/db/version_set.h b/db/version_set.h
index 832857f63..bf92b200c 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -759,6 +759,10 @@ class VersionSet {
  uint64_t current_next_file_number() const { return next_file_number_.load(); }

+  uint64_t latest_deleted_log_number() const {
+    return latest_deleted_log_number_.load();
+  }
+
  // Allocate and return a new file number
  uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); }

@@ -806,6 +810,11 @@ class VersionSet {
  // REQUIRED: this is only called during single-threaded recovery or repair.
  void MarkFileNumberUsed(uint64_t number);

+  // Mark the specified log number as deleted
+  // REQUIRED: this is only called during single-threaded recovery or repair,
+  // or from ::LogAndApply where the global mutex is held.
+  void MarkDeletedLogNumber(uint64_t number);
+
  // Return the log file number for the log file that is currently
  // being compacted, or zero if there is no such log file.
  uint64_t prev_log_number() const { return prev_log_number_; }
@@ -903,6 +912,8 @@ class VersionSet {
  const std::string dbname_;
  const ImmutableDBOptions* const db_options_;
  std::atomic<uint64_t> next_file_number_;
+  // Any log number equal to or lower than this should be ignored during recovery.
+  std::atomic<uint64_t> latest_deleted_log_number_ = {0};
  uint64_t manifest_file_number_;
  uint64_t options_file_number_;
  uint64_t pending_manifest_file_number_;
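
Note (not part of the patch): the sketch below is a minimal, self-contained model of the
recovery behavior this change introduces. The patch records the largest deleted WAL number
in the MANIFEST (via the kDeletedLogNumber edit) and has recovery skip any WAL whose number
is not larger than it, so a WAL file left behind on disk after an out-of-order deletion is
neither replayed nor reported as a gap. ManifestState and LogsToReplay are hypothetical
stand-ins for illustration only, not RocksDB APIs.

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

// Hypothetical stand-in for the piece of MANIFEST state added by this patch.
struct ManifestState {
  uint64_t latest_deleted_log_number = 0;  // max over all kDeletedLogNumber edits seen
  void MarkDeletedLogNumber(uint64_t n) {
    latest_deleted_log_number = std::max(latest_deleted_log_number, n);
  }
};

// Returns the WAL numbers that recovery would actually replay.
std::vector<uint64_t> LogsToReplay(const ManifestState& m,
                                   const std::vector<uint64_t>& wal_numbers) {
  std::vector<uint64_t> to_replay;
  for (uint64_t n : wal_numbers) {
    if (n <= m.latest_deleted_log_number) {
      continue;  // deleted according to the MANIFEST; a leftover file is ignored
    }
    to_replay.push_back(n);
  }
  return to_replay;
}

int main() {
  ManifestState m;
  // WAL 5 was deleted (and recorded) while WAL 4 was skipped and left on disk.
  m.MarkDeletedLogNumber(5);
  for (uint64_t n : LogsToReplay(m, {4, 5, 6, 7})) {
    std::printf("replay WAL %llu\n", static_cast<unsigned long long>(n));
  }
  return 0;  // prints WAL 6 and 7 only
}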