diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index ea68178f8..4d259eab1 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -10,6 +10,7 @@ #include #include #include +#include #include "db/blob_index.h" #include "db/db_impl/db_impl.h" @@ -82,6 +83,7 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname, env_options_(db_options), statistics_(db_options_.statistics.get()), next_file_number_(1), + flush_sequence_(0), epoch_of_(0), closed_(true), open_file_count_(0), @@ -206,6 +208,8 @@ Status BlobDBImpl::Open(std::vector* handles) { InitializeBlobFileToSstMapping(live_files); + MarkUnreferencedBlobFilesObsoleteDuringOpen(); + if (!disable_auto_compactions) { s = db_->EnableAutoCompaction(*handles); if (!s.ok()) { @@ -288,23 +292,25 @@ Status BlobDBImpl::OpenAllBlobFiles() { next_file_number_.store(*file_numbers.rbegin() + 1); } - std::string blob_file_list; - std::string obsolete_file_list; + std::ostringstream blob_file_oss; + std::ostringstream live_imm_oss; + std::ostringstream obsolete_file_oss; for (auto& file_number : file_numbers) { std::shared_ptr blob_file = std::make_shared( this, blob_dir_, file_number, db_options_.info_log.get()); - blob_file->MarkImmutable(); + blob_file->MarkImmutable(/* sequence */ 0); // Read file header and footer Status read_metadata_status = blob_file->ReadMetadata(env_, env_options_); if (read_metadata_status.IsCorruption()) { // Remove incomplete file. - ObsoleteBlobFile(blob_file, 0 /*obsolete_seq*/, false /*update_size*/); - if (!obsolete_file_list.empty()) { - obsolete_file_list.append(", "); + if (!obsolete_files_.empty()) { + obsolete_file_oss << ", "; } - obsolete_file_list.append(ToString(file_number)); + obsolete_file_oss << file_number; + + ObsoleteBlobFile(blob_file, 0 /*obsolete_seq*/, false /*update_size*/); continue; } else if (!read_metadata_status.ok()) { ROCKS_LOG_ERROR(db_options_.info_log, @@ -316,20 +322,33 @@ Status BlobDBImpl::OpenAllBlobFiles() { total_blob_size_ += blob_file->GetFileSize(); + if (!blob_files_.empty()) { + blob_file_oss << ", "; + } + blob_file_oss << file_number; + blob_files_[file_number] = blob_file; - if (!blob_file_list.empty()) { - blob_file_list.append(", "); + + if (!blob_file->HasTTL()) { + if (!live_imm_non_ttl_blob_files_.empty()) { + live_imm_oss << ", "; + } + live_imm_oss << file_number; + + live_imm_non_ttl_blob_files_[file_number] = blob_file; } - blob_file_list.append(ToString(file_number)); } ROCKS_LOG_INFO(db_options_.info_log, "Found %" ROCKSDB_PRIszt " blob files: %s", blob_files_.size(), - blob_file_list.c_str()); + blob_file_oss.str().c_str()); + ROCKS_LOG_INFO( + db_options_.info_log, "Found %" ROCKSDB_PRIszt " non-TTL blob files: %s", + live_imm_non_ttl_blob_files_.size(), live_imm_oss.str().c_str()); ROCKS_LOG_INFO(db_options_.info_log, "Found %" ROCKSDB_PRIszt " incomplete or corrupted blob files: %s", - obsolete_files_.size(), obsolete_file_list.c_str()); + obsolete_files_.size(), obsolete_file_oss.str().c_str()); return s; } @@ -426,14 +445,16 @@ void BlobDBImpl::InitializeBlobFileToSstMapping( void BlobDBImpl::ProcessFlushJobInfo(const FlushJobInfo& info) { assert(bdb_options_.enable_garbage_collection); - if (info.oldest_blob_file_number == kInvalidBlobFileNumber) { - return; - } + WriteLock lock(&mutex_); - { - ReadLock lock(&mutex_); + if (info.oldest_blob_file_number != kInvalidBlobFileNumber) { LinkSstToBlobFile(info.file_number, info.oldest_blob_file_number); } + + assert(flush_sequence_ < info.largest_seqno); + flush_sequence_ = info.largest_seqno; + + MarkUnreferencedBlobFilesObsolete(); } void BlobDBImpl::ProcessCompactionJobInfo(const CompactionJobInfo& info) { @@ -443,27 +464,107 @@ void BlobDBImpl::ProcessCompactionJobInfo(const CompactionJobInfo& info) { // file list in case of a trivial move. We process the inputs first // to ensure the blob file still has a link after processing all updates. - { - ReadLock lock(&mutex_); + WriteLock lock(&mutex_); - for (const auto& input : info.input_file_infos) { - if (input.oldest_blob_file_number == kInvalidBlobFileNumber) { - continue; - } + for (const auto& input : info.input_file_infos) { + if (input.oldest_blob_file_number == kInvalidBlobFileNumber) { + continue; + } - UnlinkSstFromBlobFile(input.file_number, input.oldest_blob_file_number); + UnlinkSstFromBlobFile(input.file_number, input.oldest_blob_file_number); + } + + for (const auto& output : info.output_file_infos) { + if (output.oldest_blob_file_number == kInvalidBlobFileNumber) { + continue; } - for (const auto& output : info.output_file_infos) { - if (output.oldest_blob_file_number == kInvalidBlobFileNumber) { - continue; - } + LinkSstToBlobFile(output.file_number, output.oldest_blob_file_number); + } - LinkSstToBlobFile(output.file_number, output.oldest_blob_file_number); + MarkUnreferencedBlobFilesObsolete(); +} + +bool BlobDBImpl::MarkBlobFileObsoleteIfNeeded( + const std::shared_ptr& blob_file, SequenceNumber obsolete_seq) { + assert(blob_file); + assert(!blob_file->HasTTL()); + assert(blob_file->Immutable()); + assert(bdb_options_.enable_garbage_collection); + + // Note: FIFO eviction could have marked this file obsolete already. + if (blob_file->Obsolete()) { + return true; + } + + // We cannot mark this file (or any higher-numbered files for that matter) + // obsolete if it is referenced by any memtables or SSTs. We keep track of + // the SSTs explicitly. To account for memtables, we keep track of the highest + // sequence number received in flush notifications, and we do not mark the + // blob file obsolete if there are still unflushed memtables from before + // the time the blob file was closed. + if (blob_file->GetImmutableSequence() > flush_sequence_ || + !blob_file->GetLinkedSstFiles().empty()) { + return false; + } + + ROCKS_LOG_INFO(db_options_.info_log, + "Blob file %" PRIu64 " is no longer needed, marking obsolete", + blob_file->BlobFileNumber()); + + ObsoleteBlobFile(blob_file, obsolete_seq, /* update_size */ true); + return true; +} + +template +void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed) { + assert(bdb_options_.enable_garbage_collection); + + // Iterate through all live immutable non-TTL blob files, and mark them + // obsolete assuming no SST files or memtables rely on the blobs in them. + // Note: we need to stop as soon as we find a blob file that has any + // linked SSTs (or one potentially referenced by memtables). + + auto it = live_imm_non_ttl_blob_files_.begin(); + while (it != live_imm_non_ttl_blob_files_.end()) { + const auto& blob_file = it->second; + assert(blob_file); + assert(blob_file->BlobFileNumber() == it->first); + assert(!blob_file->HasTTL()); + assert(blob_file->Immutable()); + + // Small optimization: Obsolete() does an atomic read, so we can do + // this check without taking a lock on the blob file's mutex. + if (blob_file->Obsolete()) { + it = live_imm_non_ttl_blob_files_.erase(it); + continue; + } + + if (!mark_if_needed(blob_file)) { + break; } + + it = live_imm_non_ttl_blob_files_.erase(it); } } +void BlobDBImpl::MarkUnreferencedBlobFilesObsolete() { + const SequenceNumber obsolete_seq = GetLatestSequenceNumber(); + + MarkUnreferencedBlobFilesObsoleteImpl( + [=](const std::shared_ptr& blob_file) { + WriteLock file_lock(&blob_file->mutex_); + return MarkBlobFileObsoleteIfNeeded(blob_file, obsolete_seq); + }); +} + +void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteDuringOpen() { + MarkUnreferencedBlobFilesObsoleteImpl( + [=](const std::shared_ptr& blob_file) { + return MarkBlobFileObsoleteIfNeeded(blob_file, /* obsolete_seq */ 0); + }); +} + void BlobDBImpl::CloseRandomAccessLocked( const std::shared_ptr& bfile) { bfile->CloseRandomAccessLocked(); @@ -1041,11 +1142,12 @@ Status BlobDBImpl::CheckSizeAndEvictBlobFiles(uint64_t blob_size, WriteLock file_lock(&blob_file->mutex_); if (blob_file->Obsolete()) { // File already obsoleted by someone else. + assert(blob_file->Immutable()); continue; } // FIFO eviction can evict open blob files. if (!blob_file->Immutable()) { - Status s = CloseBlobFile(blob_file, false /*need_lock*/); + Status s = CloseBlobFile(blob_file); if (!s.ok()) { return s; } @@ -1380,8 +1482,16 @@ std::pair BlobDBImpl::SanityCheck(bool aborted) { ROCKS_LOG_INFO(db_options_.info_log, "Number of open files %" ROCKSDB_PRIszt, open_ttl_files_.size()); - for (auto bfile : open_ttl_files_) { - assert(!bfile->Immutable()); + for (const auto& blob_file : open_ttl_files_) { + (void)blob_file; + assert(!blob_file->Immutable()); + } + + for (const auto& pair : live_imm_non_ttl_blob_files_) { + const auto& blob_file = pair.second; + (void)blob_file; + assert(!blob_file->HasTTL()); + assert(blob_file->Immutable()); } uint64_t now = EpochNow(); @@ -1423,58 +1533,75 @@ std::pair BlobDBImpl::SanityCheck(bool aborted) { return std::make_pair(true, -1); } -Status BlobDBImpl::CloseBlobFile(std::shared_ptr bfile, - bool need_lock) { - assert(bfile != nullptr); +Status BlobDBImpl::CloseBlobFile(std::shared_ptr bfile) { + assert(bfile); + assert(!bfile->Immutable()); + assert(!bfile->Obsolete()); write_mutex_.AssertHeld(); - Status s; + ROCKS_LOG_INFO(db_options_.info_log, "Closing blob file %" PRIu64 ". Path: %s", bfile->BlobFileNumber(), bfile->PathName().c_str()); - { - std::unique_ptr lock; - if (need_lock) { - lock.reset(new WriteLock(&mutex_)); - } - if (bfile->HasTTL()) { - size_t erased __attribute__((__unused__)); - erased = open_ttl_files_.erase(bfile); - } else if (bfile == open_non_ttl_file_) { - open_non_ttl_file_ = nullptr; - } - } + const SequenceNumber sequence = GetLatestSequenceNumber(); - if (!bfile->closed_.load()) { - std::unique_ptr file_lock; - if (need_lock) { - file_lock.reset(new WriteLock(&bfile->mutex_)); - } - s = bfile->WriteFooterAndCloseLocked(); - } + const Status s = bfile->WriteFooterAndCloseLocked(sequence); if (s.ok()) { total_blob_size_ += BlobLogFooter::kSize; } else { + bfile->MarkImmutable(sequence); + ROCKS_LOG_ERROR(db_options_.info_log, "Failed to close blob file %" PRIu64 "with error: %s", bfile->BlobFileNumber(), s.ToString().c_str()); } + if (bfile->HasTTL()) { + size_t erased __attribute__((__unused__)); + erased = open_ttl_files_.erase(bfile); + } else { + if (bfile == open_non_ttl_file_) { + open_non_ttl_file_ = nullptr; + } + + const uint64_t blob_file_number = bfile->BlobFileNumber(); + auto it = live_imm_non_ttl_blob_files_.lower_bound(blob_file_number); + assert(it == live_imm_non_ttl_blob_files_.end() || + it->first != blob_file_number); + live_imm_non_ttl_blob_files_.insert( + it, std::map>::value_type( + blob_file_number, bfile)); + } + return s; } Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr& bfile) { + write_mutex_.AssertHeld(); + // atomic read if (bfile->GetFileSize() < bdb_options_.blob_file_size) { return Status::OK(); } + + WriteLock lock(&mutex_); + WriteLock file_lock(&bfile->mutex_); + + assert(!bfile->Obsolete() || bfile->Immutable()); + if (bfile->Immutable()) { + return Status::OK(); + } + return CloseBlobFile(bfile); } void BlobDBImpl::ObsoleteBlobFile(std::shared_ptr blob_file, SequenceNumber obsolete_seq, bool update_size) { + assert(blob_file->Immutable()); + assert(!blob_file->Obsolete()); + // Should hold write lock of mutex_ or during DB open. blob_file->MarkObsolete(obsolete_seq); obsolete_files_.push_back(blob_file); @@ -1545,15 +1672,23 @@ std::pair BlobDBImpl::EvictExpiredFiles(bool aborted) { SequenceNumber seq = GetLatestSequenceNumber(); { MutexLock l(&write_mutex_); + WriteLock lock(&mutex_); for (auto& blob_file : process_files) { WriteLock file_lock(&blob_file->mutex_); - if (!blob_file->Immutable()) { - CloseBlobFile(blob_file, false /*need_lock*/); - } + // Need to double check if the file is obsolete. - if (!blob_file->Obsolete()) { - ObsoleteBlobFile(blob_file, seq, true /*update_size*/); + if (blob_file->Obsolete()) { + assert(blob_file->Immutable()); + continue; } + + if (!blob_file->Immutable()) { + CloseBlobFile(blob_file); + } + + assert(blob_file->Immutable()); + + ObsoleteBlobFile(blob_file, seq, true /*update_size*/); } } @@ -1918,6 +2053,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, if (newfile != nullptr) { { MutexLock l(&write_mutex_); + WriteLock lock(&mutex_); + WriteLock file_lock(&newfile->mutex_); CloseBlobFile(newfile); } total_blob_size_ += newfile->file_size_; @@ -2092,9 +2229,14 @@ Status BlobDBImpl::TEST_GetBlobValue(const Slice& key, const Slice& index_entry, return GetBlobValue(key, index_entry, value); } -void BlobDBImpl::TEST_AddDummyBlobFile(uint64_t blob_file_number) { - blob_files_[blob_file_number] = std::make_shared( - this, blob_dir_, blob_file_number, db_options_.info_log.get()); +void BlobDBImpl::TEST_AddDummyBlobFile(uint64_t blob_file_number, + SequenceNumber immutable_sequence) { + auto blob_file = std::make_shared(this, blob_dir_, blob_file_number, + db_options_.info_log.get()); + blob_file->MarkImmutable(immutable_sequence); + + blob_files_[blob_file_number] = blob_file; + live_imm_non_ttl_blob_files_[blob_file_number] = blob_file; } std::vector> BlobDBImpl::TEST_GetBlobFiles() const { @@ -2106,6 +2248,16 @@ std::vector> BlobDBImpl::TEST_GetBlobFiles() const { return blob_files; } +std::vector> BlobDBImpl::TEST_GetLiveImmNonTTLFiles() + const { + ReadLock l(&mutex_); + std::vector> live_imm_non_ttl_files; + for (const auto& pair : live_imm_non_ttl_blob_files_) { + live_imm_non_ttl_files.emplace_back(pair.second); + } + return live_imm_non_ttl_files; +} + std::vector> BlobDBImpl::TEST_GetObsoleteFiles() const { ReadLock l(&mutex_); @@ -2122,6 +2274,9 @@ void BlobDBImpl::TEST_DeleteObsoleteFiles() { Status BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr& bfile) { MutexLock l(&write_mutex_); + WriteLock lock(&mutex_); + WriteLock file_lock(&bfile->mutex_); + return CloseBlobFile(bfile); } diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index 49e39c89f..17c97abf9 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -185,10 +185,13 @@ class BlobDBImpl : public BlobDB { Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry, PinnableSlice* value); - void TEST_AddDummyBlobFile(uint64_t blob_file_number); + void TEST_AddDummyBlobFile(uint64_t blob_file_number, + SequenceNumber immutable_sequence); std::vector> TEST_GetBlobFiles() const; + std::vector> TEST_GetLiveImmNonTTLFiles() const; + std::vector> TEST_GetObsoleteFiles() const; Status TEST_CloseBlobFile(std::shared_ptr& bfile); @@ -238,9 +241,12 @@ class BlobDBImpl : public BlobDB { std::string* compression_output) const; // Close a file by appending a footer, and removes file from open files list. - Status CloseBlobFile(std::shared_ptr bfile, bool need_lock = true); + // REQUIRES: lock held on write_mutex_, write lock held on both the db mutex_ + // and the blob file's mutex_. + Status CloseBlobFile(std::shared_ptr bfile); // Close a file if its size exceeds blob_file_size + // REQUIRES: lock held on write_mutex_. Status CloseBlobFileIfNeeded(std::shared_ptr& bfile); // Mark file as obsolete and move the file to obsolete file list. @@ -321,12 +327,30 @@ class BlobDBImpl : public BlobDB { void InitializeBlobFileToSstMapping( const std::vector& live_files); - // Update the mapping between blob files and SSTs after a flush. + // Update the mapping between blob files and SSTs after a flush and mark + // any unneeded blob files obsolete. void ProcessFlushJobInfo(const FlushJobInfo& info); - // Update the mapping between blob files and SSTs after a compaction. + // Update the mapping between blob files and SSTs after a compaction and + // mark any unneeded blob files obsolete. void ProcessCompactionJobInfo(const CompactionJobInfo& info); + // Mark an immutable non-TTL blob file obsolete assuming it has no more SSTs + // linked to it, and all memtables from before the blob file became immutable + // have been flushed. Note: should only be called if the condition holds for + // all lower-numbered non-TTL blob files as well. + bool MarkBlobFileObsoleteIfNeeded(const std::shared_ptr& blob_file, + SequenceNumber obsolete_seq); + + // Mark all immutable non-TTL blob files that aren't needed by any SSTs as + // obsolete. Comes in two varieties; the version used during Open need not + // worry about locking or snapshots. + template + void MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed); + + void MarkUnreferencedBlobFilesObsolete(); + void MarkUnreferencedBlobFilesObsoleteDuringOpen(); + void UpdateLiveSSTSize(); Status GetBlobFileReader(const std::shared_ptr& blob_file, @@ -404,6 +428,12 @@ class BlobDBImpl : public BlobDB { // entire metadata of all the BLOB files memory std::map> blob_files_; + // All live immutable non-TTL blob files. + std::map> live_imm_non_ttl_blob_files_; + + // The largest sequence number that has been flushed. + SequenceNumber flush_sequence_; + // epoch or version of the open files. std::atomic epoch_of_; diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index ce18f144e..a538ad0f6 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -1621,11 +1621,11 @@ TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) { Open(bdb_options); // Register some dummy blob files. - blob_db_impl()->TEST_AddDummyBlobFile(1); - blob_db_impl()->TEST_AddDummyBlobFile(2); - blob_db_impl()->TEST_AddDummyBlobFile(3); - blob_db_impl()->TEST_AddDummyBlobFile(4); - blob_db_impl()->TEST_AddDummyBlobFile(5); + blob_db_impl()->TEST_AddDummyBlobFile(1, /* immutable_sequence */ 200); + blob_db_impl()->TEST_AddDummyBlobFile(2, /* immutable_sequence */ 300); + blob_db_impl()->TEST_AddDummyBlobFile(3, /* immutable_sequence */ 400); + blob_db_impl()->TEST_AddDummyBlobFile(4, /* immutable_sequence */ 500); + blob_db_impl()->TEST_AddDummyBlobFile(5, /* immutable_sequence */ 600); // Initialize the blob <-> SST file mapping. First, add some SST files with // blob file references, then some without. @@ -1653,28 +1653,62 @@ TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) { ASSERT_EQ(blob_files.size(), 5); + { + auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles(); + ASSERT_EQ(live_imm_files.size(), 5); + for (size_t i = 0; i < 5; ++i) { + ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1); + } + + ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty()); + } + { const std::vector> expected_sst_files{ {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}}; + const std::vector expected_obsolete{false, false, false, false, + false}; for (size_t i = 0; i < 5; ++i) { const auto &blob_file = blob_files[i]; ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]); + ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]); + } + + auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles(); + ASSERT_EQ(live_imm_files.size(), 5); + for (size_t i = 0; i < 5; ++i) { + ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1); } + + ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty()); } // Simulate a flush where the SST does not reference any blob files. { FlushJobInfo info{}; info.file_number = 21; + info.smallest_seqno = 1; + info.largest_seqno = 100; blob_db_impl()->TEST_ProcessFlushJobInfo(info); const std::vector> expected_sst_files{ {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}}; + const std::vector expected_obsolete{false, false, false, false, + false}; for (size_t i = 0; i < 5; ++i) { const auto &blob_file = blob_files[i]; ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]); + ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]); } + + auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles(); + ASSERT_EQ(live_imm_files.size(), 5); + for (size_t i = 0; i < 5; ++i) { + ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1); + } + + ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty()); } // Simulate a flush where the SST references a blob file. @@ -1682,40 +1716,130 @@ TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) { FlushJobInfo info{}; info.file_number = 22; info.oldest_blob_file_number = 5; + info.smallest_seqno = 101; + info.largest_seqno = 200; blob_db_impl()->TEST_ProcessFlushJobInfo(info); const std::vector> expected_sst_files{ {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10, 22}}; + const std::vector expected_obsolete{false, false, false, false, + false}; for (size_t i = 0; i < 5; ++i) { const auto &blob_file = blob_files[i]; ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]); + ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]); } + + auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles(); + ASSERT_EQ(live_imm_files.size(), 5); + for (size_t i = 0; i < 5; ++i) { + ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1); + } + + ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty()); } // Simulate a compaction. Some inputs and outputs have blob file references, // some don't. There is also a trivial move (which means the SST appears on - // both the input and the output list). + // both the input and the output list). Blob file 1 loses all its linked SSTs, + // and since it got marked immutable at sequence number 200 which has already + // been flushed, it can be marked obsolete. { CompactionJobInfo info{}; info.input_file_infos.emplace_back(CompactionFileInfo{1, 1, 1}); info.input_file_infos.emplace_back(CompactionFileInfo{1, 2, 2}); + info.input_file_infos.emplace_back(CompactionFileInfo{1, 6, 1}); info.input_file_infos.emplace_back( CompactionFileInfo{1, 11, kInvalidBlobFileNumber}); - info.input_file_infos.emplace_back(CompactionFileInfo{1, 5, 22}); + info.input_file_infos.emplace_back(CompactionFileInfo{1, 22, 5}); + info.output_file_infos.emplace_back(CompactionFileInfo{2, 22, 5}); info.output_file_infos.emplace_back(CompactionFileInfo{2, 23, 3}); info.output_file_infos.emplace_back( CompactionFileInfo{2, 24, kInvalidBlobFileNumber}); - info.output_file_infos.emplace_back(CompactionFileInfo{2, 5, 22}); blob_db_impl()->TEST_ProcessCompactionJobInfo(info); const std::vector> expected_sst_files{ - {6}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}}; + {}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}}; + const std::vector expected_obsolete{true, false, false, false, false}; + for (size_t i = 0; i < 5; ++i) { + const auto &blob_file = blob_files[i]; + ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]); + ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]); + } + + auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles(); + ASSERT_EQ(live_imm_files.size(), 4); + for (size_t i = 0; i < 4; ++i) { + ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2); + } + + auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles(); + ASSERT_EQ(obsolete_files.size(), 1); + ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1); + } + + // Simulate another compaction. Blob file 2 loses all its linked SSTs + // but since it got marked immutable at sequence number 300 which hasn't + // been flushed yet, it cannot be marked obsolete at this point. + { + CompactionJobInfo info{}; + info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2}); + info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5}); + info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3}); + + blob_db_impl()->TEST_ProcessCompactionJobInfo(info); + + const std::vector> expected_sst_files{ + {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}}; + const std::vector expected_obsolete{true, false, false, false, false}; for (size_t i = 0; i < 5; ++i) { const auto &blob_file = blob_files[i]; ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]); + ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]); } + + auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles(); + ASSERT_EQ(live_imm_files.size(), 4); + for (size_t i = 0; i < 4; ++i) { + ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2); + } + + auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles(); + ASSERT_EQ(obsolete_files.size(), 1); + ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1); + } + + // Simulate a flush with largest sequence number 300. This will make it + // possible to mark blob file 2 obsolete. + { + FlushJobInfo info{}; + info.file_number = 26; + info.smallest_seqno = 201; + info.largest_seqno = 300; + + blob_db_impl()->TEST_ProcessFlushJobInfo(info); + + const std::vector> expected_sst_files{ + {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}}; + const std::vector expected_obsolete{true, true, false, false, false}; + for (size_t i = 0; i < 5; ++i) { + const auto &blob_file = blob_files[i]; + ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]); + ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]); + } + + auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles(); + ASSERT_EQ(live_imm_files.size(), 3); + for (size_t i = 0; i < 3; ++i) { + ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 3); + } + + auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles(); + ASSERT_EQ(obsolete_files.size(), 2); + ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1); + ASSERT_EQ(obsolete_files[1]->BlobFileNumber(), 2); } } diff --git a/utilities/blob_db/blob_file.cc b/utilities/blob_db/blob_file.cc index 14bfb4309..a3a037ac8 100644 --- a/utilities/blob_db/blob_file.cc +++ b/utilities/blob_db/blob_file.cc @@ -35,7 +35,9 @@ BlobFile::BlobFile() blob_count_(0), file_size_(0), closed_(false), + immutable_sequence_(0), obsolete_(false), + obsolete_sequence_(0), expiration_range_({0, 0}), last_access_(-1), last_fsync_(0), @@ -54,7 +56,9 @@ BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn, blob_count_(0), file_size_(0), closed_(false), + immutable_sequence_(0), obsolete_(false), + obsolete_sequence_(0), expiration_range_({0, 0}), last_access_(-1), last_fsync_(0), @@ -125,7 +129,7 @@ bool BlobFile::NeedsFsync(bool hard, uint64_t bytes_per_sync) const { : (file_size_ - last_fsync_) >= bytes_per_sync; } -Status BlobFile::WriteFooterAndCloseLocked() { +Status BlobFile::WriteFooterAndCloseLocked(SequenceNumber sequence) { BlobLogFooter footer; footer.blob_count = blob_count_; if (HasTTL()) { @@ -136,6 +140,7 @@ Status BlobFile::WriteFooterAndCloseLocked() { Status s = log_writer_->AppendFooter(footer); if (s.ok()) { closed_ = true; + immutable_sequence_ = sequence; file_size_ += BlobLogFooter::kSize; } // delete the sequential writer diff --git a/utilities/blob_db/blob_file.h b/utilities/blob_db/blob_file.h index 534676199..06ada8635 100644 --- a/utilities/blob_db/blob_file.h +++ b/utilities/blob_db/blob_file.h @@ -68,6 +68,9 @@ class BlobFile { // no more blobs will be appended and the footer has been written out std::atomic closed_; + // The latest sequence number when the file was closed/made immutable. + SequenceNumber immutable_sequence_; + // has a pass of garbage collection successfully finished on this file // obsolete_ still needs to do iterator/snapshot checks std::atomic obsolete_; @@ -98,8 +101,6 @@ class BlobFile { bool footer_valid_; - SequenceNumber garbage_collection_finish_sequence_; - public: BlobFile(); @@ -153,7 +154,15 @@ class BlobFile { // Mark the file as immutable. // REQUIRES: write lock held, or access from single thread (on DB open). - void MarkImmutable() { closed_ = true; } + void MarkImmutable(SequenceNumber sequence) { + closed_ = true; + immutable_sequence_ = sequence; + } + + SequenceNumber GetImmutableSequence() const { + assert(Immutable()); + return immutable_sequence_; + } // if the file has gone through GC and blobs have been relocated bool Obsolete() const { @@ -216,7 +225,7 @@ class BlobFile { Status ReadFooter(BlobLogFooter* footer); - Status WriteFooterAndCloseLocked(); + Status WriteFooterAndCloseLocked(SequenceNumber sequence); void CloseRandomAccessLocked();