diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 72f163da6..6426612fd 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -212,8 +212,8 @@ void BlobDBImpl::StartBackgroundTasks() { tqueue_.add(kSanityCheckPeriodMillisecs, std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1)); tqueue_.add( - kCheckSeqFilesPeriodMillisecs, - std::bind(&BlobDBImpl::CheckSeqFiles, this, std::placeholders::_1)); + kEvictExpiredFilesPeriodMillisecs, + std::bind(&BlobDBImpl::EvictExpiredFiles, this, std::placeholders::_1)); } Status BlobDBImpl::GetAllBlobFiles(std::set* file_numbers) { @@ -1282,29 +1282,38 @@ bool BlobDBImpl::VisibleToActiveSnapshot( return oldest_snapshot < obsolete_sequence; } -std::pair BlobDBImpl::CheckSeqFiles(bool aborted) { - if (aborted) return std::make_pair(false, -1); +std::pair BlobDBImpl::EvictExpiredFiles(bool aborted) { + if (aborted) { + return std::make_pair(false, -1); + } std::vector> process_files; + uint64_t now = EpochNow(); { - uint64_t epoch_now = EpochNow(); - ReadLock rl(&mutex_); - for (auto bfile : open_ttl_files_) { - { - ReadLock lockbfile_r(&bfile->mutex_); - - if (bfile->expiration_range_.second > epoch_now) { - continue; - } - process_files.push_back(bfile); + for (auto p : blob_files_) { + auto& blob_file = p.second; + ReadLock file_lock(&blob_file->mutex_); + if (blob_file->HasTTL() && !blob_file->Obsolete() && + blob_file->GetExpirationRange().second <= now) { + process_files.push_back(blob_file); } } } - MutexLock l(&write_mutex_); - for (auto bfile : process_files) { - CloseBlobFile(bfile); + SequenceNumber seq = GetLatestSequenceNumber(); + { + MutexLock l(&write_mutex_); + for (auto& blob_file : process_files) { + WriteLock file_lock(&blob_file->mutex_); + if (!blob_file->Immutable()) { + CloseBlobFile(blob_file, false /*need_lock*/); + } + // Need to double check if the file is obsolete. + if (!blob_file->Obsolete()) { + ObsoleteBlobFile(blob_file, seq, true /*update_size*/); + } + } } return std::make_pair(true, -1); @@ -1581,8 +1590,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, } // We don't add the file to open_ttl_files_ or open_non_ttl_files_, to - // avoid user writes writing to the file, and avoid CheckSeqFiles close - // the file by mistake. + // avoid user writes writing to the file, and avoid + // EvictExpiredFiles close the file by mistake. WriteLock wl(&mutex_); blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile)); } @@ -1851,6 +1860,10 @@ Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr& bfile, void BlobDBImpl::TEST_RunGC() { RunGC(false /*abort*/); } +void BlobDBImpl::TEST_EvictExpiredFiles() { + EvictExpiredFiles(false /*abort*/); +} + uint64_t BlobDBImpl::TEST_live_sst_size() { return live_sst_size_.load(); } #endif // !NDEBUG diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index 0865b885f..0579a9ab4 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -115,8 +115,8 @@ class BlobDBImpl : public BlobDB { // how often to schedule delete obs files periods static constexpr uint32_t kDeleteObsoleteFilesPeriodMillisecs = 10 * 1000; - // how often to schedule check seq files period - static constexpr uint32_t kCheckSeqFilesPeriodMillisecs = 10 * 1000; + // how often to schedule expired files eviction. + static constexpr uint32_t kEvictExpiredFilesPeriodMillisecs = 10 * 1000; // when should oldest file be evicted: // on reaching 90% of blob_dir_size @@ -204,6 +204,8 @@ class BlobDBImpl : public BlobDB { void TEST_RunGC(); + void TEST_EvictExpiredFiles(); + void TEST_DeleteObsoleteFiles(); uint64_t TEST_live_sst_size(); @@ -270,7 +272,7 @@ class BlobDBImpl : public BlobDB { // periodically check if open blob files and their TTL's has expired // if expired, close the sequential writer and make the file immutable - std::pair CheckSeqFiles(bool aborted); + std::pair EvictExpiredFiles(bool aborted); // if the number of open files, approaches ULIMIT's this // task will close random readers, which are kept around for diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index f00515c0c..99a2cb1ac 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -1385,6 +1385,36 @@ TEST_F(BlobDBTest, FilterForFIFOEviction) { VerifyDB(data_after_compact); } +// File should be evicted after expiration. +TEST_F(BlobDBTest, EvictExpiredFile) { + BlobDBOptions bdb_options; + bdb_options.ttl_range_secs = 100; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + Options options; + options.env = mock_env_.get(); + Open(bdb_options, options); + mock_env_->set_current_time(50); + std::map data; + ASSERT_OK(PutWithTTL("foo", "bar", 100, &data)); + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + auto blob_file = blob_files[0]; + ASSERT_FALSE(blob_file->Immutable()); + ASSERT_FALSE(blob_file->Obsolete()); + VerifyDB(data); + mock_env_->set_current_time(250); + // The key should expired now. + blob_db_impl()->TEST_EvictExpiredFiles(); + ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size()); + ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size()); + ASSERT_TRUE(blob_file->Immutable()); + ASSERT_TRUE(blob_file->Obsolete()); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size()); + ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size()); +} + } // namespace blob_db } // namespace rocksdb