diff --git a/db/db_iter.cc b/db/db_iter.cc index ccd8f3de7..9f2d283fb 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -611,10 +611,12 @@ void DBIter::MergeValuesNewToOld() { // Start the merge process by pushing the first operand merge_context_.PushOperand(iter_->value(), iter_->IsValuePinned() /* operand_pinned */); + TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:PushedFirstOperand"); ParsedInternalKey ikey; Status s; for (iter_->Next(); iter_->Valid(); iter_->Next()) { + TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand"); if (!ParseKey(&ikey)) { // skip corrupted key continue; diff --git a/db/db_merge_operator_test.cc b/db/db_merge_operator_test.cc index 73eafcf9a..a1b3ec43c 100644 --- a/db/db_merge_operator_test.cc +++ b/db/db_merge_operator_test.cc @@ -449,6 +449,63 @@ TEST_P(MergeOperatorPinningTest, TailingIterator) { writer_thread.join(); reader_thread.join(); } + +TEST_F(DBMergeOperatorTest, TailingIteratorMemtableUnrefedBySomeoneElse) { + Options options = CurrentOptions(); + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + DestroyAndReopen(options); + + // Overview of the test: + // * There are two merge operands for the same key: one in an sst file, + // another in a memtable. + // * Seek a tailing iterator to this key. + // * As part of the seek, the iterator will: + // (a) first visit the operand in the memtable and tell ForwardIterator + // to pin this operand, then + // (b) move on to the operand in the sst file, then pass both operands + // to merge operator. + // * The memtable may get flushed and unreferenced by another thread between + // (a) and (b). The test simulates it by flushing the memtable inside a + // SyncPoint callback located between (a) and (b). + // * In this case it's ForwardIterator's responsibility to keep the memtable + // pinned until (b) is complete. There used to be a bug causing + // ForwardIterator to not pin it in some circumstances. This test + // reproduces it. + + db_->Merge(WriteOptions(), "key", "sst"); + db_->Flush(FlushOptions()); // Switch to SuperVersion A + db_->Merge(WriteOptions(), "key", "memtable"); + + // Pin SuperVersion A + std::unique_ptr someone_else(db_->NewIterator(ReadOptions())); + + bool pushed_first_operand = false; + bool stepped_to_next_operand = false; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBIter::MergeValuesNewToOld:PushedFirstOperand", [&](void*) { + EXPECT_FALSE(pushed_first_operand); + pushed_first_operand = true; + db_->Flush(FlushOptions()); // Switch to SuperVersion B + }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBIter::MergeValuesNewToOld:SteppedToNextOperand", [&](void*) { + EXPECT_FALSE(stepped_to_next_operand); + stepped_to_next_operand = true; + someone_else.reset(); // Unpin SuperVersion A + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + ReadOptions ro; + ro.tailing = true; + std::unique_ptr iter(db_->NewIterator(ro)); + iter->Seek("key"); + + ASSERT_TRUE(iter->status().ok()); + ASSERT_TRUE(iter->Valid()); + EXPECT_EQ(std::string("sst,memtable"), iter->value().ToString()); + EXPECT_TRUE(pushed_first_operand); + EXPECT_TRUE(stepped_to_next_operand); +} #endif // ROCKSDB_LITE } // namespace rocksdb diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index 6074c4d12..a2db7772a 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -196,38 +196,60 @@ ForwardIterator::~ForwardIterator() { Cleanup(true); } -namespace { -// Used in PinnedIteratorsManager to release pinned SuperVersion -static void ReleaseSuperVersionFunc(void* sv) { - delete reinterpret_cast(sv); -} -} // namespace - -void ForwardIterator::SVCleanup() { - if (sv_ != nullptr && sv_->Unref()) { +void ForwardIterator::SVCleanup(DBImpl* db, SuperVersion* sv, + bool background_purge_on_iterator_cleanup) { + if (sv->Unref()) { // Job id == 0 means that this is not our background process, but rather // user thread JobContext job_context(0); - db_->mutex_.Lock(); - sv_->Cleanup(); - db_->FindObsoleteFiles(&job_context, false, true); - if (read_options_.background_purge_on_iterator_cleanup) { - db_->ScheduleBgLogWriterClose(&job_context); - } - db_->mutex_.Unlock(); - if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) { - pinned_iters_mgr_->PinPtr(sv_, &ReleaseSuperVersionFunc); - } else { - delete sv_; - } + db->mutex_.Lock(); + sv->Cleanup(); + db->FindObsoleteFiles(&job_context, false, true); + if (background_purge_on_iterator_cleanup) { + db->ScheduleBgLogWriterClose(&job_context); + } + db->mutex_.Unlock(); + delete sv; if (job_context.HaveSomethingToDelete()) { - db_->PurgeObsoleteFiles( - job_context, read_options_.background_purge_on_iterator_cleanup); + db->PurgeObsoleteFiles(job_context, background_purge_on_iterator_cleanup); } job_context.Clean(); } } +namespace { +struct SVCleanupParams { + DBImpl* db; + SuperVersion* sv; + bool background_purge_on_iterator_cleanup; +}; +} + +// Used in PinnedIteratorsManager to release pinned SuperVersion +void ForwardIterator::DeferredSVCleanup(void* arg) { + auto d = reinterpret_cast(arg); + ForwardIterator::SVCleanup( + d->db, d->sv, d->background_purge_on_iterator_cleanup); + delete d; +} + +void ForwardIterator::SVCleanup() { + if (sv_ == nullptr) { + return; + } + if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) { + // pinned_iters_mgr_ tells us to make sure that all visited key-value slices + // are alive until pinned_iters_mgr_->ReleasePinnedData() is called. + // The slices may point into some memtables owned by sv_, so we need to keep + // sv_ referenced until pinned_iters_mgr_ unpins everything. + auto p = new SVCleanupParams{ + db_, sv_, read_options_.background_purge_on_iterator_cleanup}; + pinned_iters_mgr_->PinPtr(p, &ForwardIterator::DeferredSVCleanup); + } else { + SVCleanup(db_, sv_, read_options_.background_purge_on_iterator_cleanup); + } +} + void ForwardIterator::Cleanup(bool release_sv) { if (mutable_iter_ != nullptr) { DeleteIterator(mutable_iter_, true /* is_arena */); diff --git a/db/forward_iterator.h b/db/forward_iterator.h index d4f32cba9..d6c825c3e 100644 --- a/db/forward_iterator.h +++ b/db/forward_iterator.h @@ -85,7 +85,14 @@ class ForwardIterator : public InternalIterator { private: void Cleanup(bool release_sv); + // Unreference and, if needed, clean up the current SuperVersion. This is + // either done immediately or deferred until this iterator is unpinned by + // PinnedIteratorsManager. void SVCleanup(); + static void SVCleanup( + DBImpl* db, SuperVersion* sv, bool background_purge_on_iterator_cleanup); + static void DeferredSVCleanup(void* arg); + void RebuildIterators(bool refresh_sv); void RenewIterators(); void BuildLevelIterators(const VersionStorageInfo* vstorage); diff --git a/util/sync_point.h b/util/sync_point.h index d836ed468..ab5468059 100644 --- a/util/sync_point.h +++ b/util/sync_point.h @@ -82,6 +82,9 @@ class SyncPoint { const std::vector& markers); // Set up a call back function in sync point. + // The argument to the callback is passed through from + // TEST_SYNC_POINT_CALLBACK(); nullptr if TEST_SYNC_POINT or + // TEST_IDX_SYNC_POINT was used. void SetCallBack(const std::string point, std::function callback);