diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 6939e73aa..d787529b1 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -305,8 +305,8 @@ void BlobDBImpl::StartBackgroundTasks() { kDeleteCheckPeriodMillisecs, std::bind(&BlobDBImpl::EvictCompacted, this, std::placeholders::_1)); tqueue_.add( - kDeleteObsoletedFilesPeriodMillisecs, - std::bind(&BlobDBImpl::DeleteObsFiles, this, std::placeholders::_1)); + kDeleteObsoleteFilesPeriodMillisecs, + std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1)); tqueue_.add(kSanityCheckPeriodMillisecs, std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1)); tqueue_.add(kWriteAmplificationStatsPeriodMillisecs, @@ -1117,15 +1117,25 @@ Status BlobDBImpl::AppendSN(const std::shared_ptr& bfile, } std::vector BlobDBImpl::MultiGet( - const ReadOptions& options, + const ReadOptions& read_options, const std::vector& column_family, const std::vector& keys, std::vector* values) { + // Get a snapshot to avoid blob file get deleted between we + // fetch and index entry and reading from the file. 
+ ReadOptions ro(read_options); + bool snapshot_created = SetSnapshotIfNeeded(&ro); std::vector values_lsm; values_lsm.resize(keys.size()); - auto statuses = db_->MultiGet(options, column_family, keys, &values_lsm); + auto statuses = db_->MultiGet(ro, column_family, keys, &values_lsm); + TEST_SYNC_POINT("BlobDBImpl::MultiGet:AfterIndexEntryGet:1"); + TEST_SYNC_POINT("BlobDBImpl::MultiGet:AfterIndexEntryGet:2"); + values->resize(keys.size()); + assert(statuses.size() == keys.size()); for (size_t i = 0; i < keys.size(); ++i) { - if (!statuses[i].ok()) continue; + if (!statuses[i].ok()) { + continue; + } auto cfh = reinterpret_cast(column_family[i]); auto cfd = cfh->cfd(); @@ -1133,9 +1143,21 @@ std::vector BlobDBImpl::MultiGet( Status s = CommonGet(cfd, keys[i], values_lsm[i], &((*values)[i])); statuses[i] = s; } + if (snapshot_created) { + db_->ReleaseSnapshot(ro.snapshot); + } return statuses; } +bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) { + assert(read_options != nullptr); + if (read_options->snapshot != nullptr) { + return false; + } + read_options->snapshot = db_->GetSnapshot(); + return true; +} + Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key, const std::string& index_entry, std::string* value, SequenceNumber* sequence) { @@ -1172,11 +1194,6 @@ Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key, bfile = hitr->second; } - if (bfile->Obsolete()) { - return Status::NotFound( - "Blob Not Found as blob file was garbage collected"); - } - // 0 - size if (!handle.size() && value != nullptr) { value->clear(); @@ -1274,25 +1291,30 @@ Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key, return s; } -Status BlobDBImpl::Get(const ReadOptions& options, +Status BlobDBImpl::Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) { auto cfh = reinterpret_cast(column_family); auto cfd = cfh->cfd(); + // Get a snapshot to avoid 
blob file get deleted between we + // fetch and index entry and reading from the file. + // TODO(yiwu): For Get() retry if file not found would be a simpler strategy. + ReadOptions ro(read_options); + bool snapshot_created = SetSnapshotIfNeeded(&ro); + Status s; std::string index_entry; - s = db_->Get(options, column_family, key, &index_entry); - if (!s.ok()) { - if (debug_level_ >= 3) - ROCKS_LOG_WARN(db_options_.info_log, - "Get Failed on LSM KEY: %s status: '%s'", - key.ToString().c_str(), s.ToString().c_str()); - return s; + s = db_->Get(ro, column_family, key, &index_entry); + TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:1"); + TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:2"); + if (s.ok()) { + s = CommonGet(cfd, key, index_entry, value->GetSelf()); + value->PinSelf(); + } + if (snapshot_created) { + db_->ReleaseSnapshot(ro.snapshot); } - - s = CommonGet(cfd, key, index_entry, value->GetSelf()); - value->PinSelf(); return s; } @@ -1302,6 +1324,8 @@ Slice BlobDBIterator::value() const { auto cfh = reinterpret_cast(cfh_); auto cfd = cfh->cfd(); + TEST_SYNC_POINT("BlobDBIterator::value:BeforeGetBlob:1"); + TEST_SYNC_POINT("BlobDBIterator::value:BeforeGetBlob:2"); Status s = db_impl_->CommonGet(cfd, iter_->key(), index_entry.ToString(false), &vpart_); return Slice(vpart_); @@ -1977,7 +2001,7 @@ bool BlobDBImpl::ShouldGCFile(std::shared_ptr bfile, uint64_t now, return false; } -std::pair BlobDBImpl::DeleteObsFiles(bool aborted) { +std::pair BlobDBImpl::DeleteObsoleteFiles(bool aborted) { if (aborted) return std::make_pair(false, -1); { @@ -2002,6 +2026,7 @@ std::pair BlobDBImpl::DeleteObsFiles(bool aborted) { } } + blob_files_.erase(bfile->BlobFileNumber()); Status s = env_->DeleteFile(bfile->PathName()); if (!s.ok()) { ROCKS_LOG_ERROR(db_options_.info_log, @@ -2026,7 +2051,9 @@ std::pair BlobDBImpl::DeleteObsFiles(bool aborted) { // put files back into obsolete if for some reason, delete failed if (!tobsolete.empty()) { WriteLock wl(&mutex_); - for 
(auto bfile : tobsolete) obsolete_files_.push_front(bfile); + for (auto bfile : tobsolete) { + obsolete_files_.push_front(bfile); + } } return std::make_pair(!aborted, -1); @@ -2212,8 +2239,6 @@ std::pair BlobDBImpl::RunGC(bool aborted) { WriteLock wl(&mutex_); for (auto bfile : obsoletes) { bool last_file = (bfile == obsoletes.back()); - // remove from global list so writers - blob_files_.erase(bfile->BlobFileNumber()); if (!evict_cb) { bfile->SetCanBeDeleted(); @@ -2231,10 +2256,14 @@ std::pair BlobDBImpl::RunGC(bool aborted) { return std::make_pair(true, -1); } -Iterator* BlobDBImpl::NewIterator(const ReadOptions& opts, +Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options, ColumnFamilyHandle* column_family) { - return new BlobDBIterator(db_->NewIterator(opts, column_family), - column_family, this); + // Get a snapshot to avoid the blob file being deleted between when we + // fetch the index entry and when we read from the file. + ReadOptions ro(read_options); + bool snapshot_created = SetSnapshotIfNeeded(&ro); + return new BlobDBIterator(db_->NewIterator(ro, column_family), column_family, + this, snapshot_created, ro.snapshot); } Status DestroyBlobDB(const std::string& dbname, const Options& options, @@ -2283,6 +2312,7 @@ Status BlobDBImpl::TEST_GetSequenceNumber(const Slice& key, } std::vector> BlobDBImpl::TEST_GetBlobFiles() const { + ReadLock l(&mutex_); std::vector> blob_files; for (auto& p : blob_files_) { blob_files.emplace_back(p.second); @@ -2300,6 +2330,10 @@ std::vector> BlobDBImpl::TEST_GetObsoleteFiles() return obsolete_files; } +void BlobDBImpl::TEST_DeleteObsoleteFiles() { + DeleteObsoleteFiles(false /*abort*/); +} + void BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr& bfile) { CloseSeqWrite(bfile, false /*abort*/); } @@ -2310,6 +2344,16 @@ Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr& bfile, } void BlobDBImpl::TEST_RunGC() { RunGC(false /*abort*/); } + +void BlobDBImpl::TEST_ObsoleteFile(std::shared_ptr& bfile) { + uint64_t number = 
bfile->BlobFileNumber(); + assert(blob_files_.count(number) > 0); + bfile->SetCanBeDeleted(); + { + WriteLock l(&mutex_); + obsolete_files_.push_back(bfile); + } +} #endif // !NDEBUG } // namespace blob_db diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index 080834952..9886dbe5b 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -200,7 +200,7 @@ class BlobDBImpl : public BlobDB { static constexpr uint32_t kReclaimOpenFilesPeriodMillisecs = 1 * 1000; // how often to schedule delete obs files periods - static constexpr uint32_t kDeleteObsoletedFilesPeriodMillisecs = 10 * 1000; + static constexpr uint32_t kDeleteObsoleteFilesPeriodMillisecs = 10 * 1000; // how often to schedule check seq files period static constexpr uint32_t kCheckSeqFilesPeriodMillisecs = 10 * 1000; @@ -219,16 +219,16 @@ class BlobDBImpl : public BlobDB { const Slice& key) override; using rocksdb::StackableDB::Get; - Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, + Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) override; using rocksdb::StackableDB::NewIterator; - virtual Iterator* NewIterator(const ReadOptions& opts, + virtual Iterator* NewIterator(const ReadOptions& read_options, ColumnFamilyHandle* column_family) override; using rocksdb::StackableDB::MultiGet; virtual std::vector MultiGet( - const ReadOptions& options, + const ReadOptions& read_options, const std::vector& column_family, const std::vector& keys, std::vector* values) override; @@ -269,11 +269,19 @@ class BlobDBImpl : public BlobDB { GCStats* gc_stats); void TEST_RunGC(); + + void TEST_ObsoleteFile(std::shared_ptr& bfile); + + void TEST_DeleteObsoleteFiles(); #endif // !NDEBUG private: Status OpenPhase1(); + // Create a snapshot if there isn't one in read options. + // Return true if a snapshot is created. 
+ bool SetSnapshotIfNeeded(ReadOptions* read_options); + Status CommonGet(const ColumnFamilyData* cfd, const Slice& key, const std::string& index_entry, std::string* value, SequenceNumber* sequence = nullptr); @@ -332,7 +340,7 @@ class BlobDBImpl : public BlobDB { // delete files which have been garbage collected and marked // obsolete. Check whether any snapshots exist which refer to // the same - std::pair DeleteObsFiles(bool aborted); + std::pair DeleteObsoleteFiles(bool aborted); // Major task to garbage collect expired and deleted blobs std::pair RunGC(bool aborted); @@ -593,7 +601,7 @@ class BlobFile { // This Read-Write mutex is per file specific and protects // all the datastructures - port::RWMutex mutex_; + mutable port::RWMutex mutex_; // time when the random access reader was last created. std::atomic last_access_; @@ -700,12 +708,23 @@ class BlobFile { class BlobDBIterator : public Iterator { public: explicit BlobDBIterator(Iterator* iter, ColumnFamilyHandle* column_family, - BlobDBImpl* impl) - : iter_(iter), cfh_(column_family), db_impl_(impl) { - assert(iter_); + BlobDBImpl* impl, bool own_snapshot, + const Snapshot* snapshot) + : iter_(iter), + cfh_(column_family), + db_impl_(impl), + own_snapshot_(own_snapshot), + snapshot_(snapshot) { + assert(iter != nullptr); + assert(snapshot != nullptr); } - ~BlobDBIterator() { delete iter_; } + ~BlobDBIterator() { + if (own_snapshot_) { + db_impl_->ReleaseSnapshot(snapshot_); + } + delete iter_; + } bool Valid() const override { return iter_->Valid(); } @@ -727,10 +746,14 @@ class BlobDBIterator : public Iterator { Status status() const override { return iter_->status(); } + // Iterator::Refresh() not supported. 
+ private: Iterator* iter_; ColumnFamilyHandle* cfh_; BlobDBImpl* db_impl_; + bool own_snapshot_; + const Snapshot* snapshot_; mutable std::string vpart_; }; diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 4b742157e..8ec01698a 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -723,6 +723,98 @@ TEST_F(BlobDBTest, GCOldestSimpleBlobFileWhenOutOfSpace) { obsolete_files[0]->BlobFileNumber()); } +TEST_F(BlobDBTest, ReadWhileGC) { + // run the same test for Get(), MultiGet() and Iterator each. + for (int i = 0; i < 3; i++) { + BlobDBOptions bdb_options; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + blob_db_->Put(WriteOptions(), "foo", "bar"); + BlobDBImpl *blob_db_impl = + static_cast_with_check(blob_db_); + auto blob_files = blob_db_impl->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + std::shared_ptr bfile = blob_files[0]; + uint64_t bfile_number = bfile->BlobFileNumber(); + blob_db_impl->TEST_CloseBlobFile(bfile); + + switch (i) { + case 0: + SyncPoint::GetInstance()->LoadDependency( + {{"BlobDBImpl::Get:AfterIndexEntryGet:1", + "BlobDBTest::ReadWhileGC:1"}, + {"BlobDBTest::ReadWhileGC:2", + "BlobDBImpl::Get:AfterIndexEntryGet:2"}}); + break; + case 1: + SyncPoint::GetInstance()->LoadDependency( + {{"BlobDBImpl::MultiGet:AfterIndexEntryGet:1", + "BlobDBTest::ReadWhileGC:1"}, + {"BlobDBTest::ReadWhileGC:2", + "BlobDBImpl::MultiGet:AfterIndexEntryGet:2"}}); + break; + case 2: + SyncPoint::GetInstance()->LoadDependency( + {{"BlobDBIterator::value:BeforeGetBlob:1", + "BlobDBTest::ReadWhileGC:1"}, + {"BlobDBTest::ReadWhileGC:2", + "BlobDBIterator::value:BeforeGetBlob:2"}}); + break; + } + SyncPoint::GetInstance()->EnableProcessing(); + + auto reader = port::Thread([this, i]() { + std::string value; + std::vector values; + std::vector statuses; + switch (i) { + case 0: + ASSERT_OK(blob_db_->Get(ReadOptions(), "foo", &value)); + ASSERT_EQ("bar", value); + break; 
+ case 1: + statuses = blob_db_->MultiGet(ReadOptions(), {"foo"}, &values); + ASSERT_EQ(1, statuses.size()); + ASSERT_EQ(1, values.size()); + ASSERT_EQ("bar", values[0]); + break; + case 2: + // VerifyDB uses an iterator to scan the DB. + VerifyDB({{"foo", "bar"}}); + break; + } + }); + + TEST_SYNC_POINT("BlobDBTest::ReadWhileGC:1"); + GCStats gc_stats; + ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(bfile, &gc_stats)); + ASSERT_EQ(1, gc_stats.blob_count); + ASSERT_EQ(1, gc_stats.num_relocate); + ASSERT_EQ(1, gc_stats.relocate_succeeded); + blob_db_impl->TEST_ObsoleteFile(blob_files[0]); + blob_db_impl->TEST_DeleteObsoleteFiles(); + // The file shouldn't be deleted + blob_files = blob_db_impl->TEST_GetBlobFiles(); + ASSERT_EQ(2, blob_files.size()); + ASSERT_EQ(bfile_number, blob_files[0]->BlobFileNumber()); + auto obsolete_files = blob_db_impl->TEST_GetObsoleteFiles(); + ASSERT_EQ(1, obsolete_files.size()); + ASSERT_EQ(bfile_number, obsolete_files[0]->BlobFileNumber()); + TEST_SYNC_POINT("BlobDBTest::ReadWhileGC:2"); + reader.join(); + SyncPoint::GetInstance()->DisableProcessing(); + + // The file is deleted this time + blob_db_impl->TEST_DeleteObsoleteFiles(); + blob_files = blob_db_impl->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + ASSERT_NE(bfile_number, blob_files[0]->BlobFileNumber()); + ASSERT_EQ(0, blob_db_impl->TEST_GetObsoleteFiles().size()); + VerifyDB({{"foo", "bar"}}); + Destroy(); + } +} + } // namespace blob_db } // namespace rocksdb