diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 8f65a5ca4..7041f7e65 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1383,6 +1383,7 @@ class DBImpl : public DB { Status ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options, WriteBatch* my_batch); + // REQUIRES: mutex locked and in write thread. Status ScheduleFlushes(WriteContext* context); void MaybeFlushStatsCF(autovector* cfds); @@ -1454,10 +1455,10 @@ class DBImpl : public DB { // REQUIRES: mutex locked and in write thread. void AssignAtomicFlushSeq(const autovector& cfds); - // REQUIRES: mutex locked + // REQUIRES: mutex locked and in write thread. Status SwitchWAL(WriteContext* write_context); - // REQUIRES: mutex locked + // REQUIRES: mutex locked and in write thread. Status HandleWriteBufferFull(WriteContext* write_context); // REQUIRES: mutex locked diff --git a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc index 566c17573..c3a4c3525 100644 --- a/db/db_impl/db_impl_debug.cc +++ b/db/db_impl/db_impl_debug.cc @@ -24,7 +24,9 @@ uint64_t DBImpl::TEST_GetLevel0TotalSize() { void DBImpl::TEST_SwitchWAL() { WriteContext write_context; InstrumentedMutexLock l(&mutex_); + void* writer = TEST_BeginWrite(); SwitchWAL(&write_context); + TEST_EndWrite(writer); } bool DBImpl::TEST_WALBufferIsEmpty(bool lock) { @@ -106,15 +108,18 @@ Status DBImpl::TEST_SwitchMemtable(ColumnFamilyData* cfd) { cfd = default_cf_handle_->cfd(); } + Status s; + void* writer = TEST_BeginWrite(); if (two_write_queues_) { WriteThread::Writer nonmem_w; nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); - Status s = SwitchMemtable(cfd, &write_context); + s = SwitchMemtable(cfd, &write_context); nonmem_write_thread_.ExitUnbatched(&nonmem_w); - return s; } else { - return SwitchMemtable(cfd, &write_context); + s = SwitchMemtable(cfd, &write_context); } + TEST_EndWrite(writer); + return s; } Status DBImpl::TEST_FlushMemTable(bool wait, bool allow_write_stall, diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 81d7f3f16..de703138f 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1329,6 +1329,8 @@ Status DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number, recycle_log_number); std::string old_log_fname = LogFileName(immutable_db_options_.wal_dir, recycle_log_number); + TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile1"); + TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile2"); s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options, &lfile, /*dbg=*/nullptr); } else { diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 5bf008147..08468935d 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1631,7 +1631,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { if (creating_new_log && immutable_db_options_.recycle_log_file_num && !log_recycle_files_.empty()) { recycle_log_number = log_recycle_files_.front(); - log_recycle_files_.pop_front(); } uint64_t new_log_number = creating_new_log ? versions_->NewFileNumber() : logfile_number_; @@ -1668,6 +1667,14 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { ". Immutable memtables: %d.\n", cfd->GetName().c_str(), new_log_number, num_imm_unflushed); mutex_.Lock(); + if (recycle_log_number != 0) { + // Since renaming the file is done outside DB mutex, we need to ensure + // concurrent full purges don't delete the file while we're recycling it. + // To achieve that we hold the old log number in the recyclable list until + // after it has been renamed. + assert(log_recycle_files_.front() == recycle_log_number); + log_recycle_files_.pop_front(); + } if (s.ok() && creating_new_log) { log_write_mutex_.Lock(); assert(new_log != nullptr); diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index e9167bcbe..50c6f4258 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -557,6 +557,46 @@ TEST_F(DBWALTest, FullPurgePreservesRecycledLog) { } } +TEST_F(DBWALTest, FullPurgePreservesLogPendingReuse) { + // Ensures full purge cannot delete a WAL while it's in the process of being + // recycled. In particular, we force the full purge after a file has been + // chosen for reuse, but before it has been renamed. + for (int i = 0; i < 2; ++i) { + Options options = CurrentOptions(); + options.recycle_log_file_num = 1; + if (i != 0) { + options.wal_dir = alternative_wal_dir_; + } + DestroyAndReopen(options); + + // The first flush creates a second log so writes can continue before the + // flush finishes. + ASSERT_OK(Put("foo", "bar")); + ASSERT_OK(Flush()); + + // The second flush can recycle the first log. Sync points enforce the + // full purge happens after choosing the log to recycle and before it is + // renamed. + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"DBImpl::CreateWAL:BeforeReuseWritableFile1", + "DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge"}, + {"DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge", + "DBImpl::CreateWAL:BeforeReuseWritableFile2"}, + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + rocksdb::port::Thread thread([&]() { + TEST_SYNC_POINT( + "DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge"); + ASSERT_OK(db_->EnableFileDeletions(true)); + TEST_SYNC_POINT( + "DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge"); + }); + ASSERT_OK(Put("foo", "bar")); + ASSERT_OK(Flush()); + thread.join(); + } +} + TEST_F(DBWALTest, GetSortedWalFiles) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions());