diff --git a/db/db_impl.cc b/db/db_impl.cc index 9448f804a..c60995986 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -350,12 +350,15 @@ DBImpl::~DBImpl() { if (opened_successfully_) { JobContext job_context(next_job_id_.fetch_add(1)); FindObsoleteFiles(&job_context, true); + + mutex_.Unlock(); // manifest number starting from 2 job_context.manifest_file_number = 1; if (job_context.HaveSomethingToDelete()) { PurgeObsoleteFiles(job_context); } job_context.Clean(); + mutex_.Lock(); } for (auto l : logs_to_free_) { @@ -520,7 +523,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, job_context->min_pending_output = std::numeric_limits<uint64_t>::max(); } - // get obsolete files + // Get obsolete files. This function will also update the list of + // pending files in VersionSet(). versions_->GetObsoleteFiles(&job_context->sst_delete_files, job_context->min_pending_output); @@ -714,10 +718,10 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) { file_deletion_status.ToString().c_str()); } if (type == kTableFile) { - event_logger_.Log() << "job" << state.job_id << "event" << "table_file_deletion" << "file_number" << number << "status" << file_deletion_status.ToString(); + EventHelpers::LogAndNotifyTableFileDeletion( &event_logger_, state.job_id, number, fname, file_deletion_status, GetName(), db_options_.listeners); } } @@ -751,10 +755,13 @@ void DBImpl::DeleteObsoleteFiles() { mutex_.AssertHeld(); JobContext job_context(next_job_id_.fetch_add(1)); FindObsoleteFiles(&job_context, true); + + mutex_.Unlock(); if (job_context.HaveSomethingToDelete()) { PurgeObsoleteFiles(job_context); } job_context.Clean(); + mutex_.Lock(); } Status DBImpl::Directories::CreateAndNewDirectory( @@ -1433,7 +1440,7 @@ Status DBImpl::CompactFiles( // FindObsoleteFiles(). This is because job_context does not // catch all created files if compaction failed. 
FindObsoleteFiles(&job_context, !s.ok()); - } + } // release the mutex // delete unnecessary files if any, this is done outside the mutex if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { @@ -1444,6 +1451,7 @@ Status DBImpl::CompactFiles( // It also applies to access other states that DB owns. log_buffer.FlushBufferToLog(); if (job_context.HaveSomethingToDelete()) { + // no mutex is locked here. No need to Unlock() and Lock() here. PurgeObsoleteFiles(job_context); } job_context.Clean(); @@ -3948,9 +3956,11 @@ Status DBImpl::DeleteFile(std::string name) { } FindObsoleteFiles(&job_context, false); } // lock released here + LogFlush(db_options_.info_log); // remove files outside the db-lock if (job_context.HaveSomethingToDelete()) { + // Call PurgeObsoleteFiles() without holding mutex. PurgeObsoleteFiles(job_context); } job_context.Clean(); diff --git a/db/db_test.cc b/db/db_test.cc index 73d97c489..7e3c9ec82 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -11400,6 +11400,38 @@ TEST_F(DBTest, MigrateToDynamicLevelMaxBytesBase) { ASSERT_EQ(NumTableFilesAtLevel(2), 0); } +namespace { +class OnFileDeletionListener : public EventListener { + public: + OnFileDeletionListener() : + matched_count_(0), + expected_file_name_("") {} + + void SetExpectedFileName( + const std::string file_name) { + expected_file_name_ = file_name; + } + + void VerifyMatchedCount(size_t expected_value) { + ASSERT_EQ(matched_count_, expected_value); + } + + void OnTableFileDeleted( + const TableFileDeletionInfo& info) override { + if (expected_file_name_ != "") { + ASSERT_EQ(expected_file_name_, info.file_path); + expected_file_name_ = ""; + matched_count_++; + } + } + + private: + size_t matched_count_; + std::string expected_file_name_; +}; + +} // namespace + TEST_F(DBTest, DynamicLevelCompressionPerLevel) { if (!Snappy_Supported()) { return; @@ -11432,6 +11464,9 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel) { options.compression_per_level[1] = kNoCompression; 
options.compression_per_level[2] = kSnappyCompression; + OnFileDeletionListener* listener = new OnFileDeletionListener(); + options.listeners.emplace_back(listener); + DestroyAndReopen(options); // Insert more than 80K. L4 should be base level. Neither L0 nor L4 should @@ -11464,8 +11499,11 @@ ColumnFamilyMetaData cf_meta; db_->GetColumnFamilyMetaData(&cf_meta); for (auto file : cf_meta.levels[4].files) { + listener->SetExpectedFileName(dbname_ + file.name); ASSERT_OK(dbfull()->DeleteFile(file.name)); } + listener->VerifyMatchedCount(cf_meta.levels[4].files.size()); + int num_keys = 0; std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions())); for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { @@ -12162,6 +12200,8 @@ TEST_F(DBTest, DeleteMovedFileAfterCompaction) { options.create_if_missing = true; options.level0_file_num_compaction_trigger = 2; // trigger compaction when we have 2 files + OnFileDeletionListener* listener = new OnFileDeletionListener(); + options.listeners.emplace_back(listener); DestroyAndReopen(options); Random rnd(301); @@ -12214,12 +12254,14 @@ ASSERT_EQ("0,0,2", FilesPerLevel(0)); // iterator is holding the file - ASSERT_TRUE(env_->FileExists(dbname_ + "/" + moved_file_name)); + ASSERT_TRUE(env_->FileExists(dbname_ + moved_file_name)); + listener->SetExpectedFileName(dbname_ + moved_file_name); iterator.reset(); // this file should have been compacted away - ASSERT_TRUE(!env_->FileExists(dbname_ + "/" + moved_file_name)); + ASSERT_TRUE(!env_->FileExists(dbname_ + moved_file_name)); + listener->VerifyMatchedCount(1); } } @@ -12393,6 +12435,10 @@ TEST_F(DBTest, DeleteObsoleteFilesPendingOutputs) { 2; // trigger compaction when we have 2 files options.max_background_flushes = 2; options.max_background_compactions = 2; + + OnFileDeletionListener* listener = new OnFileDeletionListener(); + options.listeners.emplace_back(listener); + Reopen(options); 
Random rnd(301); @@ -12441,6 +12487,7 @@ db_->GetLiveFilesMetaData(&metadata); ASSERT_EQ(metadata.size(), 1U); auto file_on_L2 = metadata[0].name; + listener->SetExpectedFileName(dbname_ + file_on_L2); ASSERT_OK(dbfull()->TEST_CompactRange(3, nullptr, nullptr)); ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0)); @@ -12456,7 +12503,8 @@ ASSERT_EQ(metadata.size(), 2U); // This file should have been deleted - ASSERT_TRUE(!env_->FileExists(dbname_ + "/" + file_on_L2)); + ASSERT_TRUE(!env_->FileExists(dbname_ + file_on_L2)); + listener->VerifyMatchedCount(1); } TEST_F(DBTest, CloseSpeedup) { diff --git a/db/event_helpers.cc b/db/event_helpers.cc index 49b2c53c2..9035c0c4b 100644 --- a/db/event_helpers.cc +++ b/db/event_helpers.cc @@ -73,4 +73,36 @@ void EventHelpers::LogAndNotifyTableFileCreation( #endif // !ROCKSDB_LITE } +void EventHelpers::LogAndNotifyTableFileDeletion( + EventLogger* event_logger, int job_id, + uint64_t file_number, const std::string& file_path, + const Status& status, const std::string& dbname, + const std::vector<std::shared_ptr<EventListener>>& listeners) { + + JSONWriter jwriter; + AppendCurrentTime(&jwriter); + + jwriter << "job" << job_id + << "event" << "table_file_deletion" + << "file_number" << file_number; + if (!status.ok()) { + jwriter << "status" << status.ToString(); + } + + jwriter.EndObject(); + + event_logger->Log(jwriter); + +#ifndef ROCKSDB_LITE + TableFileDeletionInfo info; + info.db_name = dbname; + info.job_id = job_id; + info.file_path = file_path; + info.status = status; + for (auto listener : listeners) { + listener->OnTableFileDeleted(info); + } +#endif // !ROCKSDB_LITE +} + } // namespace rocksdb diff --git a/db/event_helpers.h b/db/event_helpers.h index 0a3843151..a60bc9a9e 100644 --- a/db/event_helpers.h +++ b/db/event_helpers.h @@ -23,6 +23,11 @@ class EventHelpers { EventLogger* event_logger, const std::vector<std::shared_ptr<EventListener>>& listeners, const FileDescriptor& fd, const 
TableFileCreationInfo& info); + static void LogAndNotifyTableFileDeletion( + EventLogger* event_logger, int job_id, + uint64_t file_number, const std::string& file_path, + const Status& status, const std::string& db_name, + const std::vector<std::shared_ptr<EventListener>>& listeners); }; } // namespace rocksdb diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 65130b1a2..dfda708af 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -39,6 +39,17 @@ struct TableFileCreationInfo { #ifndef ROCKSDB_LITE +struct TableFileDeletionInfo { + // The name of the database where the file was deleted. + std::string db_name; + // The path to the deleted file. + std::string file_path; + // The id of the job which deleted the file. + int job_id; + // The status indicating whether the deletion was successful or not. + Status status; +}; + struct CompactionJobInfo { CompactionJobInfo() = default; explicit CompactionJobInfo(const CompactionJobStats& _stats) : @@ -122,6 +133,20 @@ class EventListener { bool triggered_writes_slowdown, bool triggered_writes_stop) {} + // A call-back function for RocksDB which will be called whenever + // a SST file is deleted. Different from OnCompactionCompleted and + // OnFlushCompleted, this call-back is designed for external logging + // services and thus only provides string parameters instead + // of a pointer to DB. Applications that build logic based + // on file creations and deletions are suggested to implement + // OnFlushCompleted and OnCompactionCompleted. + // + // Note that if applications would like to use the passed reference + // outside this function call, they should make copies from the + // returned value. + virtual void OnTableFileDeleted( + const TableFileDeletionInfo& info) {} + // A call-back function for RocksDB which will be called whenever + // a registered RocksDB compacts a file. The default implementation + // is a no-op.