From e5a1b727c0ebd028c647649489e008c7d7840ee7 Mon Sep 17 00:00:00 2001 From: yiwu-arbug Date: Fri, 11 Aug 2017 12:30:02 -0700 Subject: [PATCH] Fix blob DB transaction usage while GC Summary: While GC, blob DB use optimistic transaction to delete or replace the index entry in LSM, to guarantee correctness if there's a normal write writing to the same key. However, the previous implementation doesn't call SetSnapshot() nor use GetForUpdate() of transaction API, instead it do its own sequence number checking before beginning the transaction. A normal write can sneak in after the sequence number check and overwrite the key, and the GC will delete or relocate the old version of the key by mistake. Update the code to property use GetForUpdate() to check the existing index entry. After the patch the sequence number store with each blob record is useless, So I'm considering remove the sequence number from blob record, in another patch. Closes https://github.com/facebook/rocksdb/pull/2703 Differential Revision: D5589178 Pulled By: yiwu-arbug fbshipit-source-id: 8dc960cd5f4e61b36024ba7c32d05584ce149c24 --- utilities/blob_db/blob_db_impl.cc | 320 ++++++++++++++++----------- utilities/blob_db/blob_db_impl.h | 11 +- utilities/blob_db/blob_db_test.cc | 103 +++++++-- utilities/blob_db/blob_log_reader.cc | 5 +- utilities/blob_db/blob_log_reader.h | 4 +- 5 files changed, 293 insertions(+), 150 deletions(-) diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 9e1623eb5..a15fe4a18 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -30,6 +30,7 @@ #include "util/logging.h" #include "util/mutexlock.h" #include "util/random.h" +#include "util/sync_point.h" #include "util/timer_queue.h" #include "utilities/transactions/optimistic_transaction_db_impl.h" #include "utilities/transactions/optimistic_transaction.h" @@ -951,6 +952,7 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw, Status BlobDBImpl::PutUntil(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value_unc, uint64_t expiration) { + TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start"); MutexLock l(&write_mutex_); UpdateWriteOptions(options); @@ -1022,6 +1024,7 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, CloseIf(bfile); + TEST_SYNC_POINT("BlobDBImpl::PutUntil:Finish"); return s; } @@ -1655,8 +1658,8 @@ std::pair BlobDBImpl::WaStats(bool aborted) { // DELETED in the LSM //////////////////////////////////////////////////////////////////////////////// Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, - GCStats* gcstats) { - uint64_t tt = EpochNow(); + GCStats* gc_stats) { + uint64_t now = EpochNow(); std::shared_ptr reader = bfptr->OpenSequentialReader(env_, db_options_, env_options_); @@ -1679,8 +1682,6 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, bool first_gc = bfptr->gc_once_after_open_; ColumnFamilyHandle* cfh = bfptr->GetColumnFamily(db_); - auto cfhi = reinterpret_cast(cfh); - auto cfd = cfhi->cfd(); bool has_ttl = header.HasTTL(); // this reads the key but skips the blob @@ -1688,7 +1689,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, assert(opt_db_); - bool no_relocation_ttl = (has_ttl && tt > bfptr->GetTTLRange().second); + bool no_relocation_ttl = (has_ttl && now >= bfptr->GetTTLRange().second); bool no_relocation_lsmdel = false; { @@ -1707,136 +1708,199 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, BlobLogRecord record; std::shared_ptr newfile; std::shared_ptr new_writer; - - while (reader->ReadRecord(&record, shallow).ok()) { - gcstats->blob_count++; - - bool del_this = false; - bool reloc_this = false; - - // TODO(yiwu): The following logic should use GetForUpdate() from - // optimistic transaction to check if the key is current, otherwise - // there can be another writer sneak in between sequence number of - // and the deletion. - - // this particular TTL has expired - if (no_relocation_ttl || (has_ttl && tt > record.GetTTL())) { - del_this = true; - } else if (!first_gc) { - SequenceNumber seq = kMaxSequenceNumber; - bool found_record_for_key = false; - SuperVersion* sv = db_impl_->GetAndRefSuperVersion(cfd); - if (sv == nullptr) { - Status result = - Status::InvalidArgument("Could not access column family 0"); - return result; - } - Status s1 = db_impl_->GetLatestSequenceForKey( - sv, record.Key(), false, &seq, &found_record_for_key); - if (found_record_for_key && seq == record.GetSN()) { - reloc_this = true; + Transaction* transaction = nullptr; + uint64_t blob_offset = 0; + bool retry = false; + + static const WriteOptions kGarbageCollectionWriteOptions = []() { + WriteOptions write_options; + // TODO(yiwu): Disable WAL for garbage colection to make it compatible with + // use cases that don't use WAL. However without WAL there are at least + // two issues with crash: + // 1. If a key is dropped from blob file (e.g. due to TTL), right before a + // crash, the key may still presents in LSM after restart. + // 2. If a key is relocated to another blob file, right before a crash, + // after restart the new offset may be lost with the old offset pointing + // to the removed blob file. + // We need to have better recovery mechanism to address these issues. + write_options.disableWAL = true; + // It is ok to ignore column families that were dropped. + write_options.ignore_missing_column_families = true; + return write_options; + }(); + + while (true) { + assert(s.ok()); + if (retry) { + // Retry in case transaction fail with Status::TryAgain. + retry = false; + } else { + // Read the next blob record. + Status read_record_status = + reader->ReadRecord(&record, shallow, &blob_offset); + // Exit if we reach the end of blob file. + // TODO(yiwu): properly handle ReadRecord error. + if (!read_record_status.ok()) { + break; } - db_impl_->ReturnAndCleanupSuperVersion(cfd, sv); + gc_stats->blob_count++; } - if (del_this) { - gcstats->num_deletes++; - gcstats->deleted_size += record.GetBlobSize(); - if (first_gc) continue; - - Transaction* txn = opt_db_->BeginTransaction( - write_options_, OptimisticTransactionOptions(), nullptr); - txn->Delete(cfh, record.Key()); - Status s1 = txn->Commit(); - // chances that this DELETE will fail is low. If it fails, it would be - // because a new version of the key came in at this time, which will - // override the current version being iterated on. - if (!s1.IsBusy()) { - // assume that failures happen due to new writes. - gcstats->overrided_while_delete++; - } - delete txn; - } + transaction = + opt_db_->BeginTransaction(kGarbageCollectionWriteOptions, + OptimisticTransactionOptions(), transaction); - if (reloc_this) { - if (!newfile) { - // new file - std::string reason("GC of "); - reason += bfptr->PathName(); - newfile = NewBlobFile(reason); - gcstats->newfile = newfile; - - new_writer = CheckOrCreateWriterLocked(newfile); - newfile->header_ = std::move(header); - // Can't use header beyond this point - newfile->header_valid_ = true; - newfile->file_size_ = BlobLogHeader::kHeaderSize; - s = new_writer->WriteHeader(newfile->header_); - - if (!s.ok()) { - ROCKS_LOG_ERROR(db_options_.info_log, - "File: %s - header writing failed", - newfile->PathName().c_str()); - return s; - } + std::string index_entry; + Status get_status = transaction->GetForUpdate(ReadOptions(), cfh, + record.Key(), &index_entry); + TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetForUpdate"); + if (get_status.IsNotFound()) { + // Key has been deleted. Drop the blob record. + continue; + } + if (!get_status.ok()) { + s = get_status; + ROCKS_LOG_ERROR(db_options_.info_log, + "Error while getting index entry: %s", + s.ToString().c_str()); + break; + } - WriteLock wl(&mutex_); + // TODO(yiwu): We should have an override of GetForUpdate returning a + // PinnableSlice. + Slice index_entry_slice(index_entry); + BlobHandle handle; + s = handle.DecodeFrom(&index_entry_slice); + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "Error while decoding index entry: %s", + s.ToString().c_str()); + break; + } + if (handle.filenumber() != bfptr->BlobFileNumber() || + handle.offset() != blob_offset) { + // Key has been overwritten. Drop the blob record. + continue; + } - dir_change_.store(true); - blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile)); + // If key has expired, remove it from base DB. + if (no_relocation_ttl || (has_ttl && now >= record.GetTTL())) { + gc_stats->num_deletes++; + gc_stats->deleted_size += record.GetBlobSize(); + TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete"); + transaction->Delete(cfh, record.Key()); + Status delete_status = transaction->Commit(); + if (delete_status.ok()) { + gc_stats->delete_succeeded++; + } else if (delete_status.IsBusy()) { + // The key is overwritten in the meanwhile. Drop the blob record. + gc_stats->overwritten_while_delete++; + } else if (delete_status.IsTryAgain()) { + // Retry the transaction. + retry = true; + } else { + // We hit an error. + s = delete_status; + ROCKS_LOG_ERROR(db_options_.info_log, + "Error while deleting expired key: %s", + s.ToString().c_str()); + break; } + // Continue to next blob record or retry. + continue; + } - gcstats->num_relocs++; - std::string index_entry; + if (first_gc) { + // Do not relocate blob record for initial GC. + continue; + } - uint64_t blob_offset = 0; - uint64_t key_offset = 0; - // write the blob to the blob log. - s = new_writer->AddRecord(record.Key(), record.Blob(), &key_offset, - &blob_offset, record.GetTTL()); - - BlobHandle handle; - handle.set_filenumber(newfile->BlobFileNumber()); - handle.set_size(record.Blob().size()); - handle.set_offset(blob_offset); - handle.set_compression(bdb_options_.compression); - handle.EncodeTo(&index_entry); - - new_writer->AddRecordFooter(record.GetSN()); - newfile->blob_count_++; - newfile->file_size_ += BlobLogRecord::kHeaderSize + record.Key().size() + - record.Blob().size() + BlobLogRecord::kFooterSize; - - Transaction* txn = opt_db_->BeginTransaction( - write_options_, OptimisticTransactionOptions(), nullptr); - txn->Put(cfh, record.Key(), index_entry); - Status s1 = txn->Commit(); - // chances that this Put will fail is low. If it fails, it would be - // because a new version of the key came in at this time, which will - // override the current version being iterated on. - if (s1.IsBusy()) { - ROCKS_LOG_INFO(db_options_.info_log, - "Optimistic transaction failed: %s put bn: %" PRIu32, - bfptr->PathName().c_str(), gcstats->blob_count); - } else { - gcstats->succ_relocs++; - ROCKS_LOG_DEBUG(db_options_.info_log, - "Successfully added put back into LSM: %s bn: %" PRIu32, - bfptr->PathName().c_str(), gcstats->blob_count); + // Relocate the blob record to new file. + if (!newfile) { + // new file + std::string reason("GC of "); + reason += bfptr->PathName(); + newfile = NewBlobFile(reason); + gc_stats->newfile = newfile; + + new_writer = CheckOrCreateWriterLocked(newfile); + newfile->header_ = std::move(header); + // Can't use header beyond this point + newfile->header_valid_ = true; + newfile->file_size_ = BlobLogHeader::kHeaderSize; + s = new_writer->WriteHeader(newfile->header_); + + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "File: %s - header writing failed", + newfile->PathName().c_str()); + break; } - delete txn; - } - } - if (gcstats->newfile) total_blob_space_ += newfile->file_size_; + WriteLock wl(&mutex_); - ROCKS_LOG_INFO(db_options_.info_log, - "File: %s Num deletes %" PRIu32 " Num relocs: %" PRIu32 - " Succ Deletes: %" PRIu32 " Succ relocs: %" PRIu32, - bfptr->PathName().c_str(), gcstats->num_deletes, - gcstats->num_relocs, gcstats->succ_deletes_lsm, - gcstats->succ_relocs); + dir_change_.store(true); + blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile)); + } + + gc_stats->num_relocate++; + std::string new_index_entry; + uint64_t new_blob_offset = 0; + uint64_t new_key_offset = 0; + // write the blob to the blob log. + s = new_writer->AddRecord(record.Key(), record.Blob(), &new_key_offset, + &new_blob_offset, record.GetTTL()); + + BlobHandle new_handle; + new_handle.set_filenumber(newfile->BlobFileNumber()); + new_handle.set_size(record.Blob().size()); + new_handle.set_offset(new_blob_offset); + new_handle.set_compression(bdb_options_.compression); + new_handle.EncodeTo(&new_index_entry); + + new_writer->AddRecordFooter(record.GetSN()); + newfile->blob_count_++; + newfile->file_size_ += BlobLogRecord::kHeaderSize + record.Key().size() + + record.Blob().size() + BlobLogRecord::kFooterSize; + + TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate"); + transaction->Put(cfh, record.Key(), new_index_entry); + Status put_status = transaction->Commit(); + if (put_status.ok()) { + gc_stats->relocate_succeeded++; + } else if (put_status.IsBusy()) { + // The key is overwritten in the meanwhile. Drop the blob record. + gc_stats->overwritten_while_relocate++; + } else if (put_status.IsTryAgain()) { + // Retry the transaction. + // TODO(yiwu): On retry, we can reuse the new blob record. + retry = true; + } else { + // We hit an error. + s = put_status; + ROCKS_LOG_ERROR(db_options_.info_log, "Error while relocating key: %s", + s.ToString().c_str()); + break; + } + } // end of ReadRecord loop + + if (transaction != nullptr) { + delete transaction; + } + ROCKS_LOG_INFO( + db_options_.info_log, "%s blob file %" PRIu64 ".", + ". Total blob records: %" PRIu64 ", Deletes: %" PRIu64 "/%" PRIu64 + " succeeded, Relocates: %" PRIu64 "/%" PRIu64 " succeeded.", + s.ok() ? "Successfully garbage collected" : "Failed to garbage collect", + bfptr->BlobFileNumber(), gc_stats->blob_count, gc_stats->delete_succeeded, + gc_stats->num_deletes, gc_stats->relocate_succeeded, + gc_stats->num_relocate); + if (newfile != nullptr) { + total_blob_space_ += newfile->file_size_; + ROCKS_LOG_INFO(db_options_.info_log, "New blob file %" PRIu64 ".", + newfile->BlobFileNumber()); + } return s; } @@ -2119,15 +2183,17 @@ std::pair BlobDBImpl::RunGC(bool aborted) { // in this collect the set of files, which became obsolete std::vector> obsoletes; for (auto bfile : to_process) { - GCStats gcstats; - Status s = GCFileAndUpdateLSM(bfile, &gcstats); - if (!s.ok()) continue; + GCStats gc_stats; + Status s = GCFileAndUpdateLSM(bfile, &gc_stats); + if (!s.ok()) { + continue; + } if (bfile->gc_once_after_open_.load()) { WriteLock lockbfile_w(&bfile->mutex_); - bfile->deleted_size_ = gcstats.deleted_size; - bfile->deleted_count_ = gcstats.num_deletes; + bfile->deleted_size_ = gc_stats.deleted_size; + bfile->deleted_count_ = gc_stats.num_deletes; bfile->gc_once_after_open_ = false; } else { obsoletes.push_back(bfile); diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index d812604be..6247fa22b 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -137,10 +137,13 @@ struct GCStats { uint64_t blob_count = 0; uint64_t num_deletes = 0; uint64_t deleted_size = 0; - uint64_t num_relocs = 0; - uint64_t succ_deletes_lsm = 0; - uint64_t overrided_while_delete = 0; - uint64_t succ_relocs = 0; + uint64_t retry_delete = 0; + uint64_t delete_succeeded = 0; + uint64_t overwritten_while_delete = 0; + uint64_t num_relocate = 0; + uint64_t retry_relocate = 0; + uint64_t relocate_succeeded = 0; + uint64_t overwritten_while_relocate = 0; std::shared_ptr newfile = nullptr; }; diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 9f3ae1b01..28d4d5b8d 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -15,6 +15,7 @@ #include "util/cast_util.h" #include "util/random.h" #include "util/string_util.h" +#include "util/sync_point.h" #include "util/testharness.h" #include "utilities/blob_db/blob_db_impl.h" @@ -177,7 +178,7 @@ TEST_F(BlobDBTest, PutWithTTL) { for (size_t i = 0; i < 100; i++) { uint64_t ttl = rnd.Next() % 100; PutRandomWithTTL("key" + ToString(i), ttl, &rnd, - (ttl < 50 ? nullptr : &data)); + (ttl <= 50 ? nullptr : &data)); } mock_env_->set_now_micros(100 * 1000000); auto *bdb_impl = static_cast(blob_db_); @@ -188,7 +189,7 @@ TEST_F(BlobDBTest, PutWithTTL) { GCStats gc_stats; ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); ASSERT_EQ(100 - data.size(), gc_stats.num_deletes); - ASSERT_EQ(data.size(), gc_stats.num_relocs); + ASSERT_EQ(data.size(), gc_stats.num_relocate); VerifyDB(data); } @@ -206,7 +207,7 @@ TEST_F(BlobDBTest, PutUntil) { for (size_t i = 0; i < 100; i++) { uint64_t expiration = rnd.Next() % 100 + 50; PutRandomUntil("key" + ToString(i), expiration, &rnd, - (expiration < 100 ? nullptr : &data)); + (expiration <= 100 ? nullptr : &data)); } mock_env_->set_now_micros(100 * 1000000); auto *bdb_impl = static_cast(blob_db_); @@ -217,7 +218,7 @@ TEST_F(BlobDBTest, PutUntil) { GCStats gc_stats; ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); ASSERT_EQ(100 - data.size(), gc_stats.num_deletes); - ASSERT_EQ(data.size(), gc_stats.num_relocs); + ASSERT_EQ(data.size(), gc_stats.num_relocate); VerifyDB(data); } @@ -249,7 +250,7 @@ TEST_F(BlobDBTest, TTLExtrator_NoTTL) { GCStats gc_stats; ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); ASSERT_EQ(0, gc_stats.num_deletes); - ASSERT_EQ(100, gc_stats.num_relocs); + ASSERT_EQ(100, gc_stats.num_relocate); VerifyDB(data); } @@ -263,7 +264,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractTTL) { std::string * /*new_value*/, bool * /*value_changed*/) override { *ttl = rnd->Next() % 100; - if (*ttl >= 50) { + if (*ttl > 50) { data[key.ToString()] = value.ToString(); } return true; @@ -295,7 +296,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractTTL) { ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); auto &data = static_cast(ttl_extractor_.get())->data; ASSERT_EQ(100 - data.size(), gc_stats.num_deletes); - ASSERT_EQ(data.size(), gc_stats.num_relocs); + ASSERT_EQ(data.size(), gc_stats.num_relocate); VerifyDB(data); } @@ -310,7 +311,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractExpiration) { std::string * /*new_value*/, bool * /*value_changed*/) override { *expiration = rnd->Next() % 100 + 50; - if (*expiration >= 100) { + if (*expiration > 100) { data[key.ToString()] = value.ToString(); } return true; @@ -342,7 +343,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractExpiration) { ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); auto &data = static_cast(ttl_extractor_.get())->data; ASSERT_EQ(100 - data.size(), gc_stats.num_deletes); - ASSERT_EQ(data.size(), gc_stats.num_relocs); + ASSERT_EQ(data.size(), gc_stats.num_relocate); VerifyDB(data); } @@ -385,7 +386,7 @@ TEST_F(BlobDBTest, TTLExtractor_ChangeValue) { std::string value_ttl = value + "ttl:"; PutFixed64(&value_ttl, ttl); ASSERT_OK(blob_db_->Put(WriteOptions(), Slice(key), Slice(value_ttl))); - if (ttl >= 50) { + if (ttl > 50) { data[key] = value; } } @@ -398,7 +399,7 @@ TEST_F(BlobDBTest, TTLExtractor_ChangeValue) { GCStats gc_stats; ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); ASSERT_EQ(100 - data.size(), gc_stats.num_deletes); - ASSERT_EQ(data.size(), gc_stats.num_relocs); + ASSERT_EQ(data.size(), gc_stats.num_relocate); VerifyDB(data); } @@ -534,9 +535,7 @@ TEST_F(BlobDBTest, MultipleWriters) { i)); std::map data; for (size_t i = 0; i < 10; i++) { - if (workers[i].joinable()) { - workers[i].join(); - } + workers[i].join(); data.insert(data_set[i].begin(), data_set[i].end()); } VerifyDB(data); @@ -579,7 +578,7 @@ TEST_F(BlobDBTest, SequenceNumber) { } } -TEST_F(BlobDBTest, GCShouldKeepKeysWithNewerVersion) { +TEST_F(BlobDBTest, GCAfterOverwriteKeys) { Random rnd(301); BlobDBOptions bdb_options; bdb_options.disable_background_tasks = true; @@ -612,11 +611,83 @@ TEST_F(BlobDBTest, GCShouldKeepKeysWithNewerVersion) { } GCStats gc_stats; ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); + ASSERT_EQ(200, gc_stats.blob_count); ASSERT_EQ(0, gc_stats.num_deletes); - ASSERT_EQ(200 - new_keys, gc_stats.num_relocs); + ASSERT_EQ(200 - new_keys, gc_stats.num_relocate); VerifyDB(data); } +TEST_F(BlobDBTest, GCRelocateKeyWhileOverwritting) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v1")); + BlobDBImpl *blob_db_impl = + static_cast_with_check(blob_db_); + auto blob_files = blob_db_impl->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + blob_db_impl->TEST_CloseBlobFile(blob_files[0]); + + SyncPoint::GetInstance()->LoadDependency( + {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetForUpdate", + "BlobDBImpl::PutUntil:Start"}, + {"BlobDBImpl::PutUntil:Finish", + "BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + auto writer = port::Thread( + [this]() { ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v2")); }); + + GCStats gc_stats; + ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); + ASSERT_EQ(1, gc_stats.blob_count); + ASSERT_EQ(0, gc_stats.num_deletes); + ASSERT_EQ(1, gc_stats.num_relocate); + ASSERT_EQ(0, gc_stats.relocate_succeeded); + ASSERT_EQ(1, gc_stats.overwritten_while_relocate); + writer.join(); + VerifyDB({{"foo", "v2"}}); +} + +TEST_F(BlobDBTest, GCExpiredKeyWhileOverwritting) { + Random rnd(301); + Options options; + options.env = mock_env_.get(); + BlobDBOptions bdb_options; + bdb_options.disable_background_tasks = true; + Open(bdb_options, options); + mock_env_->set_now_micros(100 * 1000000); + ASSERT_OK(blob_db_->PutUntil(WriteOptions(), "foo", "v1", 200)); + BlobDBImpl *blob_db_impl = + static_cast_with_check(blob_db_); + auto blob_files = blob_db_impl->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + blob_db_impl->TEST_CloseBlobFile(blob_files[0]); + mock_env_->set_now_micros(300 * 1000000); + + SyncPoint::GetInstance()->LoadDependency( + {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetForUpdate", + "BlobDBImpl::PutUntil:Start"}, + {"BlobDBImpl::PutUntil:Finish", + "BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + auto writer = port::Thread([this]() { + ASSERT_OK(blob_db_->PutUntil(WriteOptions(), "foo", "v2", 400)); + }); + + GCStats gc_stats; + ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); + ASSERT_EQ(1, gc_stats.blob_count); + ASSERT_EQ(1, gc_stats.num_deletes); + ASSERT_EQ(0, gc_stats.delete_succeeded); + ASSERT_EQ(1, gc_stats.overwritten_while_delete); + ASSERT_EQ(0, gc_stats.num_relocate); + writer.join(); + VerifyDB({{"foo", "v2"}}); +} + } // namespace blob_db } // namespace rocksdb diff --git a/utilities/blob_db/blob_log_reader.cc b/utilities/blob_db/blob_log_reader.cc index 3931c8669..75afab2e7 100644 --- a/utilities/blob_db/blob_log_reader.cc +++ b/utilities/blob_db/blob_log_reader.cc @@ -41,7 +41,7 @@ Status Reader::ReadHeader(BlobLogHeader* header) { } Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level, - WALRecoveryMode wal_recovery_mode) { + uint64_t* blob_offset) { record->Clear(); buffer_.clear(); backing_store_[0] = '\0'; @@ -65,6 +65,9 @@ Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level, header_crc = crc32c::Extend(header_crc, buffer_.data(), crc_data_size); uint64_t kb_size = record->GetKeySize() + record->GetBlobSize(); + if (blob_offset != nullptr) { + *blob_offset = next_byte_ + record->GetKeySize(); + } switch (level) { case kReadHdrFooter: file_->Skip(kb_size); diff --git a/utilities/blob_db/blob_log_reader.h b/utilities/blob_db/blob_log_reader.h index 05f53fe93..5522ec3a2 100644 --- a/utilities/blob_db/blob_log_reader.h +++ b/utilities/blob_db/blob_log_reader.h @@ -60,9 +60,9 @@ class Reader { // "*scratch" as temporary storage. The contents filled in *record // will only be valid until the next mutating operation on this // reader or the next mutation to *scratch. + // If blob_offset is non-null, return offset of the blob through it. Status ReadRecord(BlobLogRecord* record, ReadLevel level = kReadHdrFooter, - WALRecoveryMode wal_recovery_mode = - WALRecoveryMode::kTolerateCorruptedTailRecords); + uint64_t* blob_offset = nullptr); SequentialFileReader* file() { return file_.get(); }