diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index bbae53c0e..6939e73aa 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -1560,7 +1560,9 @@ std::pair BlobDBImpl::CheckSeqFiles(bool aborted) { } } - for (auto bfile : process_files) CloseSeqWrite(bfile, false); + for (auto bfile : process_files) { + CloseSeqWrite(bfile, false); + } return std::make_pair(true, -1); } @@ -1909,7 +1911,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, // but under the asusmption that this is only called when a // file is Immutable, we can reduce the critical section bool BlobDBImpl::ShouldGCFile(std::shared_ptr bfile, uint64_t now, - uint64_t last_id, std::string* reason) { + bool is_oldest_simple_blob_file, + std::string* reason) { if (bfile->HasTTL()) { ttlrange_t ttl_range = bfile->GetTTLRange(); if (now > ttl_range.second) { @@ -1966,13 +1969,12 @@ bool BlobDBImpl::ShouldGCFile(std::shared_ptr bfile, uint64_t now, return false; } - bool ret = bfile->BlobFileNumber() == last_id; - if (ret) { - *reason = "eligible last simple blob file"; - } else { - *reason = "not eligible since not last simple blob file"; + if (is_oldest_simple_blob_file) { + *reason = "out of space and is the oldest simple blob file"; + return true; } - return ret; + *reason = "out of space but is not the oldest simple blob file"; + return false; } std::pair BlobDBImpl::DeleteObsFiles(bool aborted) { @@ -2096,31 +2098,27 @@ std::pair BlobDBImpl::CallbackEvicts( } void BlobDBImpl::CopyBlobFiles( - std::vector>* bfiles_copy, uint64_t* last_id) { + std::vector>* bfiles_copy) { ReadLock rl(&mutex_); // take a copy bfiles_copy->reserve(blob_files_.size()); - for (auto const& ent : blob_files_) { - bfiles_copy->push_back(ent.second); - - // A. has ttl is immutable, once set, hence no locks required - // B. blob files are sorted based on number(i.e. index of creation ) - // so we will return the last blob file - if (!ent.second->HasTTL()) *last_id = ent.second->BlobFileNumber(); + for (auto const& p : blob_files_) { + bfiles_copy->push_back(p.second); } } void BlobDBImpl::FilterSubsetOfFiles( const std::vector>& blob_files, std::vector>* to_process, uint64_t epoch, - uint64_t last_id, size_t files_to_collect) { + size_t files_to_collect) { // 100.0 / 15.0 = 7 uint64_t next_epoch_increment = static_cast( std::ceil(100 / static_cast(kGCFilePercentage))); uint64_t now = EpochNow(); size_t files_processed = 0; + bool simple_blob_file_found = false; for (auto bfile : blob_files) { if (files_processed >= files_to_collect) break; // if this is the first time processing the file @@ -2140,8 +2138,15 @@ void BlobDBImpl::FilterSubsetOfFiles( // then it should not be GC'd if (bfile->Obsolete() || !bfile->Immutable()) continue; + bool is_oldest_simple_blob_file = false; + if (!simple_blob_file_found && !bfile->HasTTL()) { + is_oldest_simple_blob_file = true; + simple_blob_file_found = true; + } + std::string reason; - bool shouldgc = ShouldGCFile(bfile, now, last_id, &reason); + bool shouldgc = + ShouldGCFile(bfile, now, is_oldest_simple_blob_file, &reason); if (!shouldgc) { ROCKS_LOG_DEBUG(db_options_.info_log, "File has been skipped for GC ttl %s %" PRIu64 " %" PRIu64 @@ -2165,11 +2170,8 @@ std::pair BlobDBImpl::RunGC(bool aborted) { current_epoch_++; - // collect the ID of the last regular file, in case we need to GC it. - uint64_t last_id = std::numeric_limits::max(); - std::vector> blob_files; - CopyBlobFiles(&blob_files, &last_id); + CopyBlobFiles(&blob_files); if (!blob_files.size()) return std::make_pair(true, -1); @@ -2178,7 +2180,7 @@ std::pair BlobDBImpl::RunGC(bool aborted) { size_t files_to_collect = (kGCFilePercentage * blob_files.size()) / 100; std::vector> to_process; - FilterSubsetOfFiles(blob_files, &to_process, current_epoch_, last_id, + FilterSubsetOfFiles(blob_files, &to_process, current_epoch_, files_to_collect); // in this collect the set of files, which became obsolete @@ -2288,6 +2290,16 @@ std::vector> BlobDBImpl::TEST_GetBlobFiles() const { return blob_files; } +std::vector> BlobDBImpl::TEST_GetObsoleteFiles() + const { + ReadLock l(&mutex_); + std::vector> obsolete_files; + for (auto& bfile : obsolete_files_) { + obsolete_files.emplace_back(bfile); + } + return obsolete_files; +} + void BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr& bfile) { CloseSeqWrite(bfile, false /*abort*/); } @@ -2296,6 +2308,8 @@ Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr& bfile, GCStats* gc_stats) { return GCFileAndUpdateLSM(bfile, gc_stats); } + +void BlobDBImpl::TEST_RunGC() { RunGC(false /*abort*/); } #endif // !NDEBUG } // namespace blob_db diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index 6247fa22b..080834952 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -261,10 +261,14 @@ class BlobDBImpl : public BlobDB { std::vector> TEST_GetBlobFiles() const; + std::vector> TEST_GetObsoleteFiles() const; + void TEST_CloseBlobFile(std::shared_ptr& bfile); Status TEST_GCFileAndUpdateLSM(std::shared_ptr& bfile, GCStats* gc_stats); + + void TEST_RunGC(); #endif // !NDEBUG private: @@ -291,7 +295,7 @@ class BlobDBImpl : public BlobDB { // tt - current time // last_id - the id of the non-TTL file to evict bool ShouldGCFile(std::shared_ptr bfile, uint64_t now, - uint64_t last_id, std::string* reason); + bool is_oldest_simple_blob_file, std::string* reason); // collect all the blob log files from the blob directory Status GetAllLogFiles(std::set>* file_nums); @@ -403,13 +407,12 @@ class BlobDBImpl : public BlobDB { bool FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size, uint64_t blob_offset, uint64_t blob_size); - void CopyBlobFiles(std::vector>* bfiles_copy, - uint64_t* last_id); + void CopyBlobFiles(std::vector>* bfiles_copy); void FilterSubsetOfFiles( const std::vector>& blob_files, std::vector>* to_process, uint64_t epoch, - uint64_t last_id, size_t files_to_collect); + size_t files_to_collect); uint64_t EpochNow() { return env_->NowMicros() / 1000000; } @@ -445,7 +448,7 @@ class BlobDBImpl : public BlobDB { // Read Write Mutex, which protects all the data structures // HEAVILY TRAFFICKED - port::RWMutex mutex_; + mutable port::RWMutex mutex_; // Writers has to hold write_mutex_ before writing. mutable port::Mutex write_mutex_; @@ -454,7 +457,7 @@ class BlobDBImpl : public BlobDB { std::atomic next_file_number_; // entire metadata of all the BLOB files memory - std::unordered_map> blob_files_; + std::map> blob_files_; // epoch or version of the open files. std::atomic epoch_of_; diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index be42c395b..4b742157e 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -688,6 +688,41 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) { VerifyDB({{"foo", "v2"}}); } +TEST_F(BlobDBTest, GCOldestSimpleBlobFileWhenOutOfSpace) { + // Use mock env to stop wall clock. + Options options; + options.env = mock_env_.get(); + BlobDBOptions bdb_options; + bdb_options.blob_dir_size = 100; + bdb_options.blob_file_size = 100; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + std::string value(100, 'v'); + ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key_with_ttl", value, 60)); + for (int i = 0; i < 10; i++) { + ASSERT_OK(blob_db_->Put(WriteOptions(), "key" + ToString(i), value)); + } + BlobDBImpl *blob_db_impl = + static_cast_with_check(blob_db_); + auto blob_files = blob_db_impl->TEST_GetBlobFiles(); + ASSERT_EQ(11, blob_files.size()); + ASSERT_TRUE(blob_files[0]->HasTTL()); + ASSERT_TRUE(blob_files[0]->Immutable()); + blob_db_impl->TEST_CloseBlobFile(blob_files[0]); + for (int i = 1; i <= 10; i++) { + ASSERT_FALSE(blob_files[i]->HasTTL()); + if (i < 10) { + ASSERT_TRUE(blob_files[i]->Immutable()); + } + } + blob_db_impl->TEST_RunGC(); + // The oldest simple blob file (i.e. blob_files[1]) has been selected for GC. + auto obsolete_files = blob_db_impl->TEST_GetObsoleteFiles(); + ASSERT_EQ(1, obsolete_files.size()); + ASSERT_EQ(blob_files[1]->BlobFileNumber(), + obsolete_files[0]->BlobFileNumber()); +} + } // namespace blob_db } // namespace rocksdb