diff --git a/file/sst_file_manager_impl.cc b/file/sst_file_manager_impl.cc index 02005846e..48567d907 100644 --- a/file/sst_file_manager_impl.cc +++ b/file/sst_file_manager_impl.cc @@ -418,7 +418,8 @@ bool SstFileManagerImpl::CancelErrorRecovery(ErrorHandler* handler) { Status SstFileManagerImpl::ScheduleFileDeletion( const std::string& file_path, const std::string& path_to_sync, const bool force_bg) { - TEST_SYNC_POINT("SstFileManagerImpl::ScheduleFileDeletion"); + TEST_SYNC_POINT_CALLBACK("SstFileManagerImpl::ScheduleFileDeletion", + const_cast(&file_path)); return delete_scheduler_.DeleteFile(file_path, path_to_sync, force_bg); } diff --git a/utilities/blob_db/blob_compaction_filter.cc b/utilities/blob_db/blob_compaction_filter.cc index cb7adf64b..7b37e2e1e 100644 --- a/utilities/blob_db/blob_compaction_filter.cc +++ b/utilities/blob_db/blob_compaction_filter.cc @@ -32,8 +32,7 @@ CompactionFilter::Decision BlobIndexCompactionFilterBase::FilterV2( if (!blob_index.IsInlined() && blob_index.file_number() < context_.next_file_number && context_.current_blob_files.count(blob_index.file_number()) == 0) { - // Corresponding blob file gone. Could have been garbage collected or - // evicted by FIFO eviction. + // Corresponding blob file gone (most likely, evicted by FIFO eviction). evicted_count_++; evicted_size_ += key.size() + value.size(); return Decision::kRemove; diff --git a/utilities/blob_db/blob_db.h b/utilities/blob_db/blob_db.h index 25f10f14a..0616a8879 100644 --- a/utilities/blob_db/blob_db.h +++ b/utilities/blob_db/blob_db.h @@ -20,8 +20,6 @@ namespace blob_db { // A wrapped database which puts values of KV pairs in a separate log // and store location to the log in the underlying DB. -// It lacks lots of importatant functionalities, e.g. DB restarts, -// garbage collection, iterators, etc. // // The factory needs to be moved to include/rocksdb/utilities to allow // users to use blob DB. diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index d98c4f231..7ef5f73fa 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -940,7 +940,6 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key, const Slice& value, uint64_t expiration) { StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS); RecordTick(statistics_, BLOB_DB_NUM_PUT); - TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start"); Status s; WriteBatch batch; { @@ -953,7 +952,6 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key, if (s.ok()) { s = db_->Write(options, &batch); } - TEST_SYNC_POINT("BlobDBImpl::PutUntil:Finish"); return s; } @@ -1531,8 +1529,6 @@ Status BlobDBImpl::GetImpl(const ReadOptions& read_options, get_impl_options.value = &index_entry; get_impl_options.is_blob_index = &is_blob_index; s = db_impl_->GetImpl(ro, key, get_impl_options); - TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:1"); - TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:2"); if (expiration != nullptr) { *expiration = kNoExpiration; } @@ -1836,321 +1832,6 @@ std::pair BlobDBImpl::ReclaimOpenFiles(bool aborted) { return std::make_pair(true, -1); } -// Write callback for garbage collection to check if key has been updated -// since last read. Similar to how OptimisticTransaction works. See inline -// comment in GCFileAndUpdateLSM(). -class BlobDBImpl::GarbageCollectionWriteCallback : public WriteCallback { - public: - GarbageCollectionWriteCallback(ColumnFamilyData* cfd, const Slice& key, - SequenceNumber upper_bound) - : cfd_(cfd), key_(key), upper_bound_(upper_bound) {} - - Status Callback(DB* db) override { - auto* db_impl = static_cast_with_check(db); - auto* sv = db_impl->GetAndRefSuperVersion(cfd_); - SequenceNumber latest_seq = 0; - bool found_record_for_key = false; - bool is_blob_index = false; - Status s = db_impl->GetLatestSequenceForKey( - sv, key_, false /*cache_only*/, 0 /*lower_bound_seq*/, &latest_seq, - &found_record_for_key, &is_blob_index); - db_impl->ReturnAndCleanupSuperVersion(cfd_, sv); - if (!s.ok() && !s.IsNotFound()) { - // Error. - assert(!s.IsBusy()); - return s; - } - if (s.IsNotFound()) { - assert(!found_record_for_key); - return Status::Busy("Key deleted"); - } - assert(found_record_for_key); - assert(is_blob_index); - if (latest_seq > upper_bound_) { - return Status::Busy("Key overwritten"); - } - return s; - } - - bool AllowWriteBatching() override { return false; } - - private: - ColumnFamilyData* cfd_; - // Key to check - Slice key_; - // Upper bound of sequence number to proceed. - SequenceNumber upper_bound_; -}; - -// iterate over the blobs sequentially and check if the blob sequence number -// is the latest. If it is the latest, preserve it, otherwise delete it -// if it is TTL based, and the TTL has expired, then -// we can blow the entity if the key is still the latest or the Key is not -// found -// WHAT HAPPENS IF THE KEY HAS BEEN OVERRIDEN. Then we can drop the blob -// without doing anything if the earliest snapshot is not -// referring to that sequence number, i.e. it is later than the sequence number -// of the new key -// -// if it is not TTL based, then we can blow the key if the key has been -// DELETED in the LSM -Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, - GCStats* gc_stats) { - StopWatch gc_sw(env_, statistics_, BLOB_DB_GC_MICROS); - uint64_t now = EpochNow(); - - std::shared_ptr reader = - bfptr->OpenRandomAccessReader(env_, db_options_, env_options_); - if (!reader) { - ROCKS_LOG_ERROR(db_options_.info_log, - "File sequential reader could not be opened for %s", - bfptr->PathName().c_str()); - return Status::IOError("failed to create sequential reader"); - } - - BlobLogHeader header; - Status s = reader->ReadHeader(&header); - if (!s.ok()) { - ROCKS_LOG_ERROR(db_options_.info_log, - "Failure to read header for blob-file %s", - bfptr->PathName().c_str()); - return s; - } - - auto cfh = db_impl_->DefaultColumnFamily(); - auto* cfd = reinterpret_cast(cfh)->cfd(); - auto column_family_id = cfd->GetID(); - bool has_ttl = header.has_ttl; - - // this reads the key but skips the blob - Reader::ReadLevel shallow = Reader::kReadHeaderKey; - - ExpirationRange expiration_range; - - { - ReadLock file_lock(&bfptr->mutex_); - expiration_range = bfptr->GetExpirationRange(); - } - - bool file_expired = has_ttl && now >= expiration_range.second; - - if (!file_expired) { - // read the blob because you have to write it back to new file - shallow = Reader::kReadHeaderKeyBlob; - } - - BlobLogRecord record; - std::shared_ptr newfile; - std::shared_ptr new_writer; - uint64_t blob_offset = 0; - - while (true) { - assert(s.ok()); - - // Read the next blob record. - Status read_record_status = - reader->ReadRecord(&record, shallow, &blob_offset); - // Exit if we reach the end of blob file. - // TODO(yiwu): properly handle ReadRecord error. - if (!read_record_status.ok()) { - break; - } - gc_stats->blob_count++; - - // Similar to OptimisticTransaction, we obtain latest_seq from - // base DB, which is guaranteed to be no smaller than the sequence of - // current key. We use a WriteCallback on write to check the key sequence - // on write. If the key sequence is larger than latest_seq, we know - // a new versions is inserted and the old blob can be disgard. - // - // We cannot use OptimisticTransaction because we need to pass - // is_blob_index flag to GetImpl. - SequenceNumber latest_seq = GetLatestSequenceNumber(); - bool is_blob_index = false; - PinnableSlice index_entry; - DBImpl::GetImplOptions get_impl_options; - get_impl_options.column_family = cfh; - get_impl_options.value = &index_entry; - get_impl_options.is_blob_index = &is_blob_index; - Status get_status = - db_impl_->GetImpl(ReadOptions(), record.key, get_impl_options); - TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB"); - if (!get_status.ok() && !get_status.IsNotFound()) { - // error - s = get_status; - ROCKS_LOG_ERROR(db_options_.info_log, - "Error while getting index entry: %s", - s.ToString().c_str()); - break; - } - if (get_status.IsNotFound() || !is_blob_index) { - // Either the key is deleted or updated with a newer version whish is - // inlined in LSM. - gc_stats->num_keys_overwritten++; - gc_stats->bytes_overwritten += record.record_size(); - continue; - } - - BlobIndex blob_index; - s = blob_index.DecodeFrom(index_entry); - if (!s.ok()) { - ROCKS_LOG_ERROR(db_options_.info_log, - "Error while decoding index entry: %s", - s.ToString().c_str()); - break; - } - if (blob_index.IsInlined() || - blob_index.file_number() != bfptr->BlobFileNumber() || - blob_index.offset() != blob_offset) { - // Key has been overwritten. Drop the blob record. - gc_stats->num_keys_overwritten++; - gc_stats->bytes_overwritten += record.record_size(); - continue; - } - - GarbageCollectionWriteCallback callback(cfd, record.key, latest_seq); - - // If key has expired, remove it from base DB. - // TODO(yiwu): Blob indexes will be remove by BlobIndexCompactionFilter. - // We can just drop the blob record. - if (file_expired || (has_ttl && now >= record.expiration)) { - gc_stats->num_keys_expired++; - gc_stats->bytes_expired += record.record_size(); - TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete"); - WriteBatch delete_batch; - Status delete_status = delete_batch.Delete(record.key); - if (delete_status.ok()) { - delete_status = db_impl_->WriteWithCallback(WriteOptions(), - &delete_batch, &callback); - } - if (!delete_status.ok() && !delete_status.IsBusy()) { - // We hit an error. - s = delete_status; - ROCKS_LOG_ERROR(db_options_.info_log, - "Error while deleting expired key: %s", - s.ToString().c_str()); - break; - } - // Continue to next blob record or retry. - continue; - } - - // Relocate the blob record to new file. - if (!newfile) { - // new file - std::string reason("GC of "); - reason += bfptr->PathName(); - newfile = NewBlobFile(bfptr->HasTTL(), bfptr->expiration_range_, reason); - - s = CheckOrCreateWriterLocked(newfile, &new_writer); - if (!s.ok()) { - ROCKS_LOG_ERROR(db_options_.info_log, - "Failed to open file %s for writer, error: %s", - newfile->PathName().c_str(), s.ToString().c_str()); - break; - } - newfile->file_size_ = BlobLogHeader::kSize; - - s = new_writer->WriteHeader(newfile->header_); - if (!s.ok()) { - ROCKS_LOG_ERROR(db_options_.info_log, - "File: %s - header writing failed", - newfile->PathName().c_str()); - break; - } - - // We don't add the file to open_ttl_files_ or open_non_ttl_files_, to - // avoid user writes writing to the file, and avoid - // EvictExpiredFiles close the file by mistake. - WriteLock wl(&mutex_); - blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile)); - } - - std::string new_index_entry; - uint64_t new_blob_offset = 0; - uint64_t new_key_offset = 0; - // write the blob to the blob log. - s = new_writer->AddRecord(record.key, record.value, record.expiration, - &new_key_offset, &new_blob_offset); - - BlobIndex::EncodeBlob(&new_index_entry, newfile->BlobFileNumber(), - new_blob_offset, record.value.size(), - bdb_options_.compression); - - newfile->blob_count_++; - newfile->file_size_ += - BlobLogRecord::kHeaderSize + record.key.size() + record.value.size(); - - TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate"); - WriteBatch rewrite_batch; - Status rewrite_status = WriteBatchInternal::PutBlobIndex( - &rewrite_batch, column_family_id, record.key, new_index_entry); - if (rewrite_status.ok()) { - rewrite_status = db_impl_->WriteWithCallback(WriteOptions(), - &rewrite_batch, &callback); - } - if (rewrite_status.ok()) { - gc_stats->num_keys_relocated++; - gc_stats->bytes_relocated += record.record_size(); - } else if (rewrite_status.IsBusy()) { - // The key is overwritten in the meanwhile. Drop the blob record. - gc_stats->num_keys_overwritten++; - gc_stats->bytes_overwritten += record.record_size(); - } else { - // We hit an error. - s = rewrite_status; - ROCKS_LOG_ERROR(db_options_.info_log, "Error while relocating key: %s", - s.ToString().c_str()); - break; - } - } // end of ReadRecord loop - - { - WriteLock wl(&mutex_); - ObsoleteBlobFile(bfptr, GetLatestSequenceNumber(), true /*update_size*/); - } - - ROCKS_LOG_INFO( - db_options_.info_log, - "%s blob file %" PRIu64 ". Total blob records: %" PRIu64 - ", expired: %" PRIu64 " keys/%" PRIu64 - " bytes, updated or deleted by user: %" PRIu64 " keys/%" PRIu64 - " bytes, rewrite to new file: %" PRIu64 " keys/%" PRIu64 " bytes.", - s.ok() ? "Successfully garbage collected" : "Failed to garbage collect", - bfptr->BlobFileNumber(), gc_stats->blob_count, gc_stats->num_keys_expired, - gc_stats->bytes_expired, gc_stats->num_keys_overwritten, - gc_stats->bytes_overwritten, gc_stats->num_keys_relocated, - gc_stats->bytes_relocated); - RecordTick(statistics_, BLOB_DB_GC_NUM_FILES); - RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_OVERWRITTEN, - gc_stats->num_keys_overwritten); - RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_EXPIRED, - gc_stats->num_keys_expired); - RecordTick(statistics_, BLOB_DB_GC_BYTES_OVERWRITTEN, - gc_stats->bytes_overwritten); - RecordTick(statistics_, BLOB_DB_GC_BYTES_EXPIRED, gc_stats->bytes_expired); - if (newfile != nullptr) { - { - MutexLock l(&write_mutex_); - WriteLock lock(&mutex_); - WriteLock file_lock(&newfile->mutex_); - CloseBlobFile(newfile); - } - total_blob_size_ += newfile->file_size_; - ROCKS_LOG_INFO(db_options_.info_log, "New blob file %" PRIu64 ".", - newfile->BlobFileNumber()); - RecordTick(statistics_, BLOB_DB_GC_NUM_NEW_FILES); - RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_RELOCATED, - gc_stats->num_keys_relocated); - RecordTick(statistics_, BLOB_DB_GC_BYTES_RELOCATED, - gc_stats->bytes_relocated); - } - if (!s.ok()) { - RecordTick(statistics_, BLOB_DB_GC_FAILURES); - } - return s; -} - std::pair BlobDBImpl::DeleteObsoleteFiles(bool aborted) { if (aborted) { return std::make_pair(false, -1); @@ -2240,17 +1921,6 @@ void BlobDBImpl::CopyBlobFiles( } } -std::pair BlobDBImpl::RunGC(bool aborted) { - if (aborted) { - return std::make_pair(false, -1); - } - - // TODO(yiwu): Garbage collection implementation. - - // reschedule - return std::make_pair(true, -1); -} - Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) { auto* cfd = reinterpret_cast(DefaultColumnFamily())->cfd(); @@ -2365,13 +2035,6 @@ void BlobDBImpl::TEST_ObsoleteBlobFile(std::shared_ptr& blob_file, return ObsoleteBlobFile(blob_file, obsolete_seq, update_size); } -Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr& bfile, - GCStats* gc_stats) { - return GCFileAndUpdateLSM(bfile, gc_stats); -} - -void BlobDBImpl::TEST_RunGC() { RunGC(false /*abort*/); } - void BlobDBImpl::TEST_EvictExpiredFiles() { EvictExpiredFiles(false /*abort*/); } diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index aecdd36ed..2b475a30d 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -60,20 +60,11 @@ struct BlobFileComparator { const std::shared_ptr& rhs) const; }; -struct GCStats { - uint64_t blob_count = 0; - uint64_t num_keys_overwritten = 0; - uint64_t num_keys_expired = 0; - uint64_t num_keys_relocated = 0; - uint64_t bytes_overwritten = 0; - uint64_t bytes_expired = 0; - uint64_t bytes_relocated = 0; -}; - /** - * The implementation class for BlobDB. This manages the value - * part in TTL aware sequentially written files. These files are - * Garbage Collected. + * The implementation class for BlobDB. It manages the blob logs, which + * are sequentially written files. Blob logs can be of the TTL or non-TTL + * varieties; the former are cleaned up when they expire, while the latter + * are (optionally) garbage collected. */ class BlobDBImpl : public BlobDB { friend class BlobFile; @@ -86,12 +77,6 @@ class BlobDBImpl : public BlobDB { // deletions check period static constexpr uint32_t kDeleteCheckPeriodMillisecs = 2 * 1000; - // gc percentage each check period - static constexpr uint32_t kGCFilePercentage = 100; - - // gc period - static constexpr uint32_t kGCCheckPeriodMillisecs = 60 * 1000; - // sanity check task static constexpr uint32_t kSanityCheckPeriodMillisecs = 20 * 60 * 1000; @@ -208,11 +193,6 @@ class BlobDBImpl : public BlobDB { SequenceNumber obsolete_seq = 0, bool update_size = true); - Status TEST_GCFileAndUpdateLSM(std::shared_ptr& bfile, - GCStats* gc_stats); - - void TEST_RunGC(); - void TEST_EvictExpiredFiles(); void TEST_DeleteObsoleteFiles(); @@ -231,7 +211,6 @@ class BlobDBImpl : public BlobDB { #endif // !NDEBUG private: - class GarbageCollectionWriteCallback; class BlobInserter; // Create a snapshot if there isn't one in read options. @@ -300,14 +279,10 @@ class BlobDBImpl : public BlobDB { // periodic sanity check. Bunch of checks std::pair SanityCheck(bool aborted); - // delete files which have been garbage collected and marked - // obsolete. Check whether any snapshots exist which refer to - // the same + // Delete files that have been marked obsolete (either because of TTL + // or GC). Check whether any snapshots exist which refer to the same. std::pair DeleteObsoleteFiles(bool aborted); - // Major task to garbage collect expired and deleted blobs - std::pair RunGC(bool aborted); - // periodically check if open blob files and their TTL's has expired // if expired, close the sequential writer and make the file immutable std::pair EvictExpiredFiles(bool aborted); @@ -398,12 +373,6 @@ class BlobDBImpl : public BlobDB { Status CheckOrCreateWriterLocked(const std::shared_ptr& blob_file, std::shared_ptr* writer); - // Iterate through keys and values on Blob and write into - // separate file the remaining blobs and delete/update pointers - // in LSM atomically - Status GCFileAndUpdateLSM(const std::shared_ptr& bfptr, - GCStats* gcstats); - // checks if there is no snapshot which is referencing the // blobs bool VisibleToActiveSnapshot(const std::shared_ptr& file); diff --git a/utilities/blob_db/blob_db_iterator.h b/utilities/blob_db/blob_db_iterator.h index f332445de..529d81529 100644 --- a/utilities/blob_db/blob_db_iterator.h +++ b/utilities/blob_db/blob_db_iterator.h @@ -117,8 +117,6 @@ class BlobDBIterator : public Iterator { private: // Return true if caller should continue to next value. bool UpdateBlobValue() { - TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:1"); - TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:2"); value_.Reset(); status_ = Status::OK(); if (iter_->Valid() && iter_->status().ok() && iter_->IsBlob()) { diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 0edcb9bb9..b8e11ea66 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -341,10 +341,6 @@ TEST_F(BlobDBTest, PutWithTTL) { ASSERT_EQ(1, blob_files.size()); ASSERT_TRUE(blob_files[0]->HasTTL()); ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0])); - GCStats gc_stats; - ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); - ASSERT_EQ(100 - data.size(), gc_stats.num_keys_expired); - ASSERT_EQ(data.size(), gc_stats.num_keys_relocated); VerifyDB(data); } @@ -371,10 +367,6 @@ TEST_F(BlobDBTest, PutUntil) { ASSERT_EQ(1, blob_files.size()); ASSERT_TRUE(blob_files[0]->HasTTL()); ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0])); - GCStats gc_stats; - ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); - ASSERT_EQ(100 - data.size(), gc_stats.num_keys_expired); - ASSERT_EQ(data.size(), gc_stats.num_keys_relocated); VerifyDB(data); } @@ -588,243 +580,6 @@ TEST_F(BlobDBTest, MultipleWriters) { VerifyDB(data); } -TEST_F(BlobDBTest, GCAfterOverwriteKeys) { - Random rnd(301); - BlobDBOptions bdb_options; - bdb_options.min_blob_size = 0; - bdb_options.disable_background_tasks = true; - Open(bdb_options); - DBImpl *db_impl = static_cast_with_check(blob_db_->GetBaseDB()); - std::map data; - for (int i = 0; i < 200; i++) { - PutRandom("key" + ToString(i), &rnd, &data); - } - auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); - ASSERT_EQ(1, blob_files.size()); - ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0])); - // Test for data in SST - size_t new_keys = 0; - for (int i = 0; i < 100; i++) { - if (rnd.Next() % 2 == 1) { - new_keys++; - PutRandom("key" + ToString(i), &rnd, &data); - } - } - db_impl->TEST_FlushMemTable(true /*wait*/); - // Test for data in memtable - for (int i = 100; i < 200; i++) { - if (rnd.Next() % 2 == 1) { - new_keys++; - PutRandom("key" + ToString(i), &rnd, &data); - } - } - GCStats gc_stats; - ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); - ASSERT_EQ(200, gc_stats.blob_count); - ASSERT_EQ(0, gc_stats.num_keys_expired); - ASSERT_EQ(200 - new_keys, gc_stats.num_keys_relocated); - VerifyDB(data); -} - -TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) { - Random rnd(301); - BlobDBOptions bdb_options; - bdb_options.min_blob_size = 0; - bdb_options.disable_background_tasks = true; - Open(bdb_options); - ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v1")); - auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); - ASSERT_EQ(1, blob_files.size()); - ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0])); - - SyncPoint::GetInstance()->LoadDependency( - {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB", - "BlobDBImpl::PutUntil:Start"}, - {"BlobDBImpl::PutUntil:Finish", - "BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate"}}); - SyncPoint::GetInstance()->EnableProcessing(); - - auto writer = port::Thread( - [this]() { ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v2")); }); - - GCStats gc_stats; - ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); - ASSERT_EQ(1, gc_stats.blob_count); - ASSERT_EQ(0, gc_stats.num_keys_expired); - ASSERT_EQ(1, gc_stats.num_keys_overwritten); - ASSERT_EQ(0, gc_stats.num_keys_relocated); - writer.join(); - VerifyDB({{"foo", "v2"}}); -} - -TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) { - Random rnd(301); - Options options; - options.env = mock_env_.get(); - BlobDBOptions bdb_options; - bdb_options.min_blob_size = 0; - bdb_options.disable_background_tasks = true; - Open(bdb_options, options); - mock_env_->set_current_time(100); - ASSERT_OK(blob_db_->PutUntil(WriteOptions(), "foo", "v1", 200)); - auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); - ASSERT_EQ(1, blob_files.size()); - ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0])); - mock_env_->set_current_time(300); - - SyncPoint::GetInstance()->LoadDependency( - {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB", - "BlobDBImpl::PutUntil:Start"}, - {"BlobDBImpl::PutUntil:Finish", - "BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete"}}); - SyncPoint::GetInstance()->EnableProcessing(); - - auto writer = port::Thread([this]() { - ASSERT_OK(blob_db_->PutUntil(WriteOptions(), "foo", "v2", 400)); - }); - - GCStats gc_stats; - ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); - ASSERT_EQ(1, gc_stats.blob_count); - ASSERT_EQ(1, gc_stats.num_keys_expired); - ASSERT_EQ(0, gc_stats.num_keys_relocated); - writer.join(); - VerifyDB({{"foo", "v2"}}); -} - -TEST_F(BlobDBTest, NewFileGeneratedFromGCShouldMarkAsImmutable) { - BlobDBOptions bdb_options; - bdb_options.min_blob_size = 0; - bdb_options.disable_background_tasks = true; - Open(bdb_options); - ASSERT_OK(Put("foo", "bar")); - auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); - auto blob_file1 = blob_files[0]; - ASSERT_EQ(1, blob_files.size()); - ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file1)); - GCStats gc_stats; - ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_file1, &gc_stats)); - ASSERT_EQ(1, gc_stats.blob_count); - ASSERT_EQ(1, gc_stats.num_keys_relocated); - blob_files = blob_db_impl()->TEST_GetBlobFiles(); - ASSERT_EQ(2, blob_files.size()); - ASSERT_EQ(blob_file1, blob_files[0]); - ASSERT_TRUE(blob_files[1]->Immutable()); -} - -// This test is no longer valid since we now return an error when we go -// over the configured max_db_size. -// The test needs to be re-written later in such a way that writes continue -// after a GC happens. -TEST_F(BlobDBTest, DISABLED_GCOldestSimpleBlobFileWhenOutOfSpace) { - // Use mock env to stop wall clock. - Options options; - options.env = mock_env_.get(); - BlobDBOptions bdb_options; - bdb_options.max_db_size = 100; - bdb_options.blob_file_size = 100; - bdb_options.min_blob_size = 0; - bdb_options.disable_background_tasks = true; - Open(bdb_options); - std::string value(100, 'v'); - ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key_with_ttl", value, 60)); - for (int i = 0; i < 10; i++) { - ASSERT_OK(blob_db_->Put(WriteOptions(), "key" + ToString(i), value)); - } - auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); - ASSERT_EQ(11, blob_files.size()); - ASSERT_TRUE(blob_files[0]->HasTTL()); - ASSERT_TRUE(blob_files[0]->Immutable()); - for (int i = 1; i <= 10; i++) { - ASSERT_FALSE(blob_files[i]->HasTTL()); - if (i < 10) { - ASSERT_TRUE(blob_files[i]->Immutable()); - } - } - blob_db_impl()->TEST_RunGC(); - // The oldest simple blob file (i.e. blob_files[1]) has been selected for GC. - auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles(); - ASSERT_EQ(1, obsolete_files.size()); - ASSERT_EQ(blob_files[1]->BlobFileNumber(), - obsolete_files[0]->BlobFileNumber()); -} - -TEST_F(BlobDBTest, ReadWhileGC) { - // run the same test for Get(), MultiGet() and Iterator each. - for (int i = 0; i < 2; i++) { - BlobDBOptions bdb_options; - bdb_options.min_blob_size = 0; - bdb_options.disable_background_tasks = true; - Open(bdb_options); - blob_db_->Put(WriteOptions(), "foo", "bar"); - auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); - ASSERT_EQ(1, blob_files.size()); - std::shared_ptr bfile = blob_files[0]; - uint64_t bfile_number = bfile->BlobFileNumber(); - ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile)); - - switch (i) { - case 0: - SyncPoint::GetInstance()->LoadDependency( - {{"BlobDBImpl::Get:AfterIndexEntryGet:1", - "BlobDBTest::ReadWhileGC:1"}, - {"BlobDBTest::ReadWhileGC:2", - "BlobDBImpl::Get:AfterIndexEntryGet:2"}}); - break; - case 1: - SyncPoint::GetInstance()->LoadDependency( - {{"BlobDBIterator::UpdateBlobValue:Start:1", - "BlobDBTest::ReadWhileGC:1"}, - {"BlobDBTest::ReadWhileGC:2", - "BlobDBIterator::UpdateBlobValue:Start:2"}}); - break; - } - SyncPoint::GetInstance()->EnableProcessing(); - - auto reader = port::Thread([this, i]() { - std::string value; - std::vector values; - std::vector statuses; - switch (i) { - case 0: - ASSERT_OK(blob_db_->Get(ReadOptions(), "foo", &value)); - ASSERT_EQ("bar", value); - break; - case 1: - // VerifyDB use iterator to scan the DB. - VerifyDB({{"foo", "bar"}}); - break; - } - }); - - TEST_SYNC_POINT("BlobDBTest::ReadWhileGC:1"); - GCStats gc_stats; - ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(bfile, &gc_stats)); - ASSERT_EQ(1, gc_stats.blob_count); - ASSERT_EQ(1, gc_stats.num_keys_relocated); - blob_db_impl()->TEST_DeleteObsoleteFiles(); - // The file shouln't be deleted - blob_files = blob_db_impl()->TEST_GetBlobFiles(); - ASSERT_EQ(2, blob_files.size()); - ASSERT_EQ(bfile_number, blob_files[0]->BlobFileNumber()); - auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles(); - ASSERT_EQ(1, obsolete_files.size()); - ASSERT_EQ(bfile_number, obsolete_files[0]->BlobFileNumber()); - TEST_SYNC_POINT("BlobDBTest::ReadWhileGC:2"); - reader.join(); - SyncPoint::GetInstance()->DisableProcessing(); - - // The file is deleted this time - blob_db_impl()->TEST_DeleteObsoleteFiles(); - blob_files = blob_db_impl()->TEST_GetBlobFiles(); - ASSERT_EQ(1, blob_files.size()); - ASSERT_NE(bfile_number, blob_files[0]->BlobFileNumber()); - ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size()); - VerifyDB({{"foo", "bar"}}); - Destroy(); - } -} - TEST_F(BlobDBTest, SstFileManager) { // run the same test for Get(), MultiGet() and Iterator each. std::shared_ptr sst_file_manager( @@ -835,16 +590,20 @@ TEST_F(BlobDBTest, SstFileManager) { BlobDBOptions bdb_options; bdb_options.min_blob_size = 0; + bdb_options.enable_garbage_collection = true; + bdb_options.garbage_collection_cutoff = 1.0; Options db_options; - int files_deleted_directly = 0; int files_scheduled_to_delete = 0; rocksdb::SyncPoint::GetInstance()->SetCallBack( - "SstFileManagerImpl::ScheduleFileDeletion", - [&](void * /*arg*/) { files_scheduled_to_delete++; }); - rocksdb::SyncPoint::GetInstance()->SetCallBack( - "DeleteScheduler::DeleteFile", - [&](void * /*arg*/) { files_deleted_directly++; }); + "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) { + assert(arg); + const std::string *const file_path = + static_cast(arg); + if (file_path->find(".blob") != std::string::npos) { + ++files_scheduled_to_delete; + } + }); SyncPoint::GetInstance()->EnableProcessing(); db_options.sst_file_manager = sst_file_manager; @@ -856,34 +615,29 @@ TEST_F(BlobDBTest, SstFileManager) { ASSERT_EQ(1, blob_files.size()); std::shared_ptr bfile = blob_files[0]; ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile)); - GCStats gc_stats; - ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(bfile, &gc_stats)); + ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); blob_db_impl()->TEST_DeleteObsoleteFiles(); // Even if SSTFileManager is not set, DB is creating a dummy one. ASSERT_EQ(1, files_scheduled_to_delete); - ASSERT_EQ(0, files_deleted_directly); Destroy(); // Make sure that DestroyBlobDB() also goes through delete scheduler. - ASSERT_GE(files_scheduled_to_delete, 2); - // Due to a timing issue, the WAL may or may not be deleted directly. The - // blob file is first scheduled, followed by WAL. If the background trash - // thread does not wake up on time, the WAL file will be directly - // deleted as the trash size will be > DB size - ASSERT_LE(files_deleted_directly, 1); + ASSERT_EQ(2, files_scheduled_to_delete); SyncPoint::GetInstance()->DisableProcessing(); sfm->WaitForEmptyTrash(); } TEST_F(BlobDBTest, SstFileManagerRestart) { - int files_deleted_directly = 0; int files_scheduled_to_delete = 0; rocksdb::SyncPoint::GetInstance()->SetCallBack( - "SstFileManagerImpl::ScheduleFileDeletion", - [&](void * /*arg*/) { files_scheduled_to_delete++; }); - rocksdb::SyncPoint::GetInstance()->SetCallBack( - "DeleteScheduler::DeleteFile", - [&](void * /*arg*/) { files_deleted_directly++; }); + "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) { + assert(arg); + const std::string *const file_path = + static_cast(arg); + if (file_path->find(".blob") != std::string::npos) { + ++files_scheduled_to_delete; + } + }); // run the same test for Get(), MultiGet() and Iterator each. std::shared_ptr sst_file_manager( @@ -912,9 +666,7 @@ TEST_F(BlobDBTest, SstFileManagerRestart) { // Make sure that reopening the DB rescan the existing trash files Open(bdb_options, db_options); - ASSERT_GE(files_scheduled_to_delete, 3); - // Depending on timing, the WAL file may or may not be directly deleted - ASSERT_LE(files_deleted_directly, 1); + ASSERT_EQ(files_scheduled_to_delete, 2); sfm->WaitForEmptyTrash(); @@ -937,67 +689,68 @@ TEST_F(BlobDBTest, SstFileManagerRestart) { TEST_F(BlobDBTest, SnapshotAndGarbageCollection) { BlobDBOptions bdb_options; bdb_options.min_blob_size = 0; + bdb_options.enable_garbage_collection = true; + bdb_options.garbage_collection_cutoff = 1.0; bdb_options.disable_background_tasks = true; + // i = when to take snapshot for (int i = 0; i < 4; i++) { - for (bool delete_key : {true, false}) { - const Snapshot *snapshot = nullptr; - Destroy(); - Open(bdb_options); - // First file - ASSERT_OK(Put("key1", "value")); - if (i == 0) { - snapshot = blob_db_->GetSnapshot(); - } - auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); - ASSERT_EQ(1, blob_files.size()); - ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0])); - // Second file - ASSERT_OK(Put("key2", "value")); - if (i == 1) { - snapshot = blob_db_->GetSnapshot(); - } - blob_files = blob_db_impl()->TEST_GetBlobFiles(); - ASSERT_EQ(2, blob_files.size()); - auto bfile = blob_files[1]; - ASSERT_FALSE(bfile->Immutable()); - ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile)); - // Third file - ASSERT_OK(Put("key3", "value")); - if (i == 2) { - snapshot = blob_db_->GetSnapshot(); - } - if (delete_key) { - Delete("key2"); - } - GCStats gc_stats; - ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(bfile, &gc_stats)); - ASSERT_TRUE(bfile->Obsolete()); - ASSERT_EQ(1, gc_stats.blob_count); - if (delete_key) { - ASSERT_EQ(0, gc_stats.num_keys_relocated); - } else { - ASSERT_EQ(1, gc_stats.num_keys_relocated); - } - ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), - bfile->GetObsoleteSequence()); - if (i == 3) { - snapshot = blob_db_->GetSnapshot(); - } - size_t num_files = delete_key ? 3 : 4; - ASSERT_EQ(num_files, blob_db_impl()->TEST_GetBlobFiles().size()); + Destroy(); + Open(bdb_options); + + const Snapshot *snapshot = nullptr; + + // First file + ASSERT_OK(Put("key1", "value")); + if (i == 0) { + snapshot = blob_db_->GetSnapshot(); + } + + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0])); + + // Second file + ASSERT_OK(Put("key2", "value")); + if (i == 1) { + snapshot = blob_db_->GetSnapshot(); + } + + blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(2, blob_files.size()); + auto bfile = blob_files[1]; + ASSERT_FALSE(bfile->Immutable()); + ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile)); + + // Third file + ASSERT_OK(Put("key3", "value")); + if (i == 2) { + snapshot = blob_db_->GetSnapshot(); + } + + ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_TRUE(bfile->Obsolete()); + ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), + bfile->GetObsoleteSequence()); + + Delete("key2"); + if (i == 3) { + snapshot = blob_db_->GetSnapshot(); + } + + ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size()); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + + if (i >= 2) { + // The snapshot shouldn't see data in bfile + ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size()); + blob_db_->ReleaseSnapshot(snapshot); + } else { + // The snapshot will see data in bfile, so the file shouldn't be deleted + ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size()); + blob_db_->ReleaseSnapshot(snapshot); blob_db_impl()->TEST_DeleteObsoleteFiles(); - if (i == 3) { - // The snapshot shouldn't see data in bfile - ASSERT_EQ(num_files - 1, blob_db_impl()->TEST_GetBlobFiles().size()); - blob_db_->ReleaseSnapshot(snapshot); - } else { - // The snapshot will see data in bfile, so the file shouldn't be deleted - ASSERT_EQ(num_files, blob_db_impl()->TEST_GetBlobFiles().size()); - blob_db_->ReleaseSnapshot(snapshot); - blob_db_impl()->TEST_DeleteObsoleteFiles(); - ASSERT_EQ(num_files - 1, blob_db_impl()->TEST_GetBlobFiles().size()); - } + ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size()); } } } @@ -1434,9 +1187,7 @@ TEST_F(BlobDBTest, FilterExpiredBlobIndex) { // filtered regardless of snapshot. const Snapshot *snapshot = blob_db_->GetSnapshot(); // Issue manual compaction to trigger compaction filter. - ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), - blob_db_->DefaultColumnFamily(), nullptr, - nullptr)); + ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); blob_db_->ReleaseSnapshot(snapshot); // Verify expired blob index are filtered. std::vector versions; @@ -1450,7 +1201,7 @@ TEST_F(BlobDBTest, FilterExpiredBlobIndex) { } // Test compaction filter should remove any blob index where corresponding -// blob file has been removed (either by FIFO or garbage collection). +// blob file has been removed. TEST_F(BlobDBTest, FilterFileNotAvailable) { BlobDBOptions bdb_options; bdb_options.min_blob_size = 0; @@ -1481,7 +1232,6 @@ TEST_F(BlobDBTest, FilterFileNotAvailable) { ASSERT_EQ("foo", versions[1].user_key); VerifyDB({{"bar", "v2"}, {"foo", "v1"}}); - ASSERT_OK(blob_db_->Flush(FlushOptions())); ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions)); ASSERT_EQ(2, versions.size()); @@ -1720,9 +1470,7 @@ TEST_F(BlobDBTest, GarbageCollection) { mock_env_->set_current_time(kCompactTime); - ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), - blob_db_->DefaultColumnFamily(), nullptr, - nullptr)); + ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); // We expect the data to remain the same and the blobs from the oldest N files // to be moved to new files. Sequence numbers get zeroed out during the diff --git a/utilities/blob_db/blob_file.h b/utilities/blob_db/blob_file.h index 37fc895fb..81755f384 100644 --- a/utilities/blob_db/blob_file.h +++ b/utilities/blob_db/blob_file.h @@ -76,7 +76,7 @@ class BlobFile { // The latest sequence number when the file was closed/made immutable. SequenceNumber immutable_sequence_{0}; - // has a pass of garbage collection successfully finished on this file + // Whether the file was marked obsolete (due to either TTL or GC). // obsolete_ still needs to do iterator/snapshot checks std::atomic obsolete_{false}; @@ -168,13 +168,13 @@ class BlobFile { return immutable_sequence_; } - // if the file has gone through GC and blobs have been relocated + // Whether the file was marked obsolete (due to either TTL or GC). bool Obsolete() const { assert(Immutable() || !obsolete_.load()); return obsolete_.load(); } - // Mark file as obsolete by garbage collection. The file is not visible to + // Mark file as obsolete (due to either TTL or GC). The file is not visible to // snapshots with sequence greater or equal to the given sequence. void MarkObsolete(SequenceNumber sequence);