diff --git a/db/db_impl.cc b/db/db_impl.cc
index d3f445926..570928b1e 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -464,8 +464,18 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
     }
   }
 
+  // don't delete files that might be currently written to from compaction
+  // threads
+  if (!pending_outputs_.empty()) {
+    job_context->min_pending_output = *pending_outputs_.begin();
+  } else {
+    // delete all of them
+    job_context->min_pending_output = std::numeric_limits<uint64_t>::max();
+  }
+
   // get obsolete files
-  versions_->GetObsoleteFiles(&job_context->sst_delete_files);
+  versions_->GetObsoleteFiles(&job_context->sst_delete_files,
+                              job_context->min_pending_output);
 
   // store the current filenum, lognum, etc
   job_context->manifest_file_number = versions_->manifest_file_number();
@@ -474,14 +484,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
   job_context->log_number = versions_->MinLogNumber();
   job_context->prev_log_number = versions_->prev_log_number();
 
-  // don't delete live files
-  if (pending_outputs_.size()) {
-    job_context->min_pending_output = *pending_outputs_.begin();
-  } else {
-    // delete all of them
-    job_context->min_pending_output = std::numeric_limits<uint64_t>::max();
-  }
-
   versions_->AddLiveFiles(&job_context->sst_live);
   if (doing_the_full_scan) {
     for (uint32_t path_id = 0; path_id < db_options_.db_paths.size();
diff --git a/db/db_test.cc b/db/db_test.cc
index 47227396b..b4e0a46d0 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -22,6 +22,7 @@
 #include "db/job_context.h"
 #include "db/version_set.h"
 #include "db/write_batch_internal.h"
+#include "port/stack_trace.h"
 #include "rocksdb/cache.h"
 #include "rocksdb/compaction_filter.h"
 #include "rocksdb/db.h"
@@ -10425,6 +10426,95 @@ TEST(DBTest, MutexWaitStats) {
                            ThreadStatus::STATE_MUTEX_WAIT, 0);
 }
 
+// This reproduces a bug where we don't delete a file because when it was
+// supposed to be deleted, it was blocked by pending_outputs
+// Consider:
+// 1. current file_number is 13
+// 2. compaction (1) starts, blocks deletion of all files starting with 13
+// (pending outputs)
+// 3. file 13 is created by compaction (2)
+// 4. file 13 is consumed by compaction (3) and file 15 was created. Since file
+// 13 has no references, it is put into VersionSet::obsolete_files_
+// 5. FindObsoleteFiles() gets file 13 from VersionSet::obsolete_files_. File 13
+// is deleted from obsolete_files_ set.
+// 6. PurgeObsoleteFiles() tries to delete file 13, but this file is blocked by
+// pending outputs since compaction (1) is still running. It is not deleted and
+// it is not present in obsolete_files_ anymore. Therefore, we never delete it.
+TEST(DBTest, DeleteObsoleteFilesPendingOutputs) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.write_buffer_size = 2 * 1024 * 1024;     // 2 MB
+  options.max_bytes_for_level_base = 1024 * 1024;  // 1 MB
+  options.level0_file_num_compaction_trigger =
+      2;  // trigger compaction when we have 2 files
+  options.max_background_flushes = 2;
+  options.max_background_compactions = 2;
+  Reopen(options);
+
+  Random rnd(301);
+  // Create two 1MB sst files
+  for (int i = 0; i < 2; ++i) {
+    // Create 1MB sst file
+    for (int j = 0; j < 100; ++j) {
+      ASSERT_OK(Put(Key(i * 50 + j), RandomString(&rnd, 10 * 1024)));
+    }
+    ASSERT_OK(Flush());
+  }
+  // this should execute both L0->L1 and L1->(move)->L2 compactions
+  dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ("0,0,1", FilesPerLevel(0));
+
+  SleepingBackgroundTask blocking_thread;
+  port::Mutex mutex_;
+  bool already_blocked(false);
+
+  // block the flush
+  std::function<void()> block_first_time = [&]() {
+    bool blocking = false;
+    {
+      MutexLock l(&mutex_);
+      if (!already_blocked) {
+        blocking = true;
+        already_blocked = true;
+      }
+    }
+    if (blocking) {
+      blocking_thread.DoSleep();
+    }
+  };
+  env_->table_write_callback_ = &block_first_time;
+  // Create 1MB sst file
+  for (int j = 0; j < 256; ++j) {
+    ASSERT_OK(Put(Key(j), RandomString(&rnd, 10 * 1024)));
+  }
+  // this should trigger a flush, which is blocked with block_first_time
+  // pending_file is protecting all the files created after
+
+  ASSERT_OK(dbfull()->TEST_CompactRange(2, nullptr, nullptr));
+
+  ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
+  std::vector<LiveFileMetaData> metadata;
+  db_->GetLiveFilesMetaData(&metadata);
+  ASSERT_EQ(metadata.size(), 1U);
+  auto file_on_L2 = metadata[0].name;
+
+  ASSERT_OK(dbfull()->TEST_CompactRange(3, nullptr, nullptr));
+  ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0));
+
+  // finish the flush!
+  blocking_thread.WakeUp();
+  blocking_thread.WaitUntilDone();
+  dbfull()->TEST_WaitForFlushMemTable();
+  ASSERT_EQ("1,0,0,0,1", FilesPerLevel(0));
+
+  metadata.clear();
+  db_->GetLiveFilesMetaData(&metadata);
+  ASSERT_EQ(metadata.size(), 2U);
+
+  // This file should have been deleted
+  ASSERT_TRUE(!env_->FileExists(dbname_ + "/" + file_on_L2));
+}
+
 } // namespace rocksdb
 
 int main(int argc, char** argv) {
diff --git a/db/version_set.cc b/db/version_set.cc
index e3ec67a8f..09f45b7dc 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -2772,9 +2772,17 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
   }
 }
 
-void VersionSet::GetObsoleteFiles(std::vector<FileMetaData*>* files) {
-  files->insert(files->end(), obsolete_files_.begin(), obsolete_files_.end());
-  obsolete_files_.clear();
+void VersionSet::GetObsoleteFiles(std::vector<FileMetaData*>* files,
+                                  uint64_t min_pending_output) {
+  std::vector<FileMetaData*> pending_files;
+  for (auto f : obsolete_files_) {
+    if (f->fd.GetNumber() < min_pending_output) {
+      files->push_back(f);
+    } else {
+      pending_files.push_back(f);
+    }
+  }
+  obsolete_files_.swap(pending_files);
 }
 
 ColumnFamilyData* VersionSet::CreateColumnFamily(
diff --git a/db/version_set.h b/db/version_set.h
index ca79aff4e..b00c9ce2b 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -590,7 +590,8 @@ class VersionSet {
   void GetLiveFilesMetaData(std::vector<LiveFileMetaData> *metadata);
 
-  void GetObsoleteFiles(std::vector<FileMetaData*>* files);
+  void GetObsoleteFiles(std::vector<FileMetaData*>* files,
+                        uint64_t min_pending_output);
 
   ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); }
   const EnvOptions& env_options() { return env_options_; }
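
The heart of the fix is easiest to see outside the patch context: pending_outputs_ holds the file numbers that in-flight compactions and flushes may still be writing, so any obsolete file numbered at or above the smallest pending output must stay queued in obsolete_files_ for a later FindObsoleteFiles() pass, rather than being handed to PurgeObsoleteFiles() (which skips it) and forgotten. Below is a minimal, self-contained C++ sketch of that split; FileMeta and ObsoleteTracker are hypothetical stand-ins for illustration, not RocksDB's actual types.

#include <cstdint>
#include <limits>
#include <set>
#include <vector>

// Hypothetical stand-in for FileMetaData; only the file number matters here.
struct FileMeta {
  uint64_t number;
};

struct ObsoleteTracker {
  std::set<uint64_t> pending_outputs;  // numbers in-flight jobs may still write
  std::vector<FileMeta*> obsolete;     // files with no remaining references

  // Mirrors the patched GetObsoleteFiles(): return only files that no
  // running job can still be writing; re-queue the rest instead of
  // clearing them unconditionally.
  std::vector<FileMeta*> TakeDeletable() {
    // Smallest file number an in-flight job may still produce; if nothing
    // is in flight, every obsolete file is deletable.
    const uint64_t min_pending = pending_outputs.empty()
                                     ? std::numeric_limits<uint64_t>::max()
                                     : *pending_outputs.begin();
    std::vector<FileMeta*> deletable;
    std::vector<FileMeta*> still_pending;
    for (FileMeta* f : obsolete) {
      (f->number < min_pending ? deletable : still_pending).push_back(f);
    }
    obsolete.swap(still_pending);  // keep blocked files for the next pass
    return deletable;
  }
};

In the scenario from the test comment, file 13 lands in still_pending as long as compaction (1) keeps 13 in pending_outputs (13 < 13 is false), and is returned by a later TakeDeletable() call once that entry is removed; the pre-patch obsolete_files_.clear() is precisely what lost it.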