Fix use-after-free in tailing iterator with merge operator

Summary:
ForwardIterator::SVCleanup() sometimes didn't pin superversion when it was supposed to. See the added test for the scenario. Here's the ASAN output of the added test without the fix (using `COMPILE_WITH_ASAN=1 make`): https://pastebin.com/9rD0Ywws
Closes https://github.com/facebook/rocksdb/pull/3415

Differential Revision: D6817414

Pulled By: al13n321

fbshipit-source-id: bc80c44ea78a3a1fa885dfa448a26111f91afb24
main
Mike Kolupaev 7 years ago committed by Facebook Github Bot
parent cd5092e168
commit cb5b8f2090
  1. 2
      db/db_iter.cc
  2. 57
      db/db_merge_operator_test.cc
  3. 64
      db/forward_iterator.cc
  4. 7
      db/forward_iterator.h
  5. 3
      util/sync_point.h

@ -611,10 +611,12 @@ void DBIter::MergeValuesNewToOld() {
// Start the merge process by pushing the first operand // Start the merge process by pushing the first operand
merge_context_.PushOperand(iter_->value(), merge_context_.PushOperand(iter_->value(),
iter_->IsValuePinned() /* operand_pinned */); iter_->IsValuePinned() /* operand_pinned */);
TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:PushedFirstOperand");
ParsedInternalKey ikey; ParsedInternalKey ikey;
Status s; Status s;
for (iter_->Next(); iter_->Valid(); iter_->Next()) { for (iter_->Next(); iter_->Valid(); iter_->Next()) {
TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand");
if (!ParseKey(&ikey)) { if (!ParseKey(&ikey)) {
// skip corrupted key // skip corrupted key
continue; continue;

@ -449,6 +449,63 @@ TEST_P(MergeOperatorPinningTest, TailingIterator) {
writer_thread.join(); writer_thread.join();
reader_thread.join(); reader_thread.join();
} }
TEST_F(DBMergeOperatorTest, TailingIteratorMemtableUnrefedBySomeoneElse) {
Options options = CurrentOptions();
options.merge_operator = MergeOperators::CreateStringAppendOperator();
DestroyAndReopen(options);
// Overview of the test:
// * There are two merge operands for the same key: one in an sst file,
// another in a memtable.
// * Seek a tailing iterator to this key.
// * As part of the seek, the iterator will:
// (a) first visit the operand in the memtable and tell ForwardIterator
// to pin this operand, then
// (b) move on to the operand in the sst file, then pass both operands
// to merge operator.
// * The memtable may get flushed and unreferenced by another thread between
// (a) and (b). The test simulates it by flushing the memtable inside a
// SyncPoint callback located between (a) and (b).
// * In this case it's ForwardIterator's responsibility to keep the memtable
// pinned until (b) is complete. There used to be a bug causing
// ForwardIterator to not pin it in some circumstances. This test
// reproduces it.
db_->Merge(WriteOptions(), "key", "sst");
db_->Flush(FlushOptions()); // Switch to SuperVersion A
db_->Merge(WriteOptions(), "key", "memtable");
// Pin SuperVersion A
std::unique_ptr<Iterator> someone_else(db_->NewIterator(ReadOptions()));
bool pushed_first_operand = false;
bool stepped_to_next_operand = false;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBIter::MergeValuesNewToOld:PushedFirstOperand", [&](void*) {
EXPECT_FALSE(pushed_first_operand);
pushed_first_operand = true;
db_->Flush(FlushOptions()); // Switch to SuperVersion B
});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBIter::MergeValuesNewToOld:SteppedToNextOperand", [&](void*) {
EXPECT_FALSE(stepped_to_next_operand);
stepped_to_next_operand = true;
someone_else.reset(); // Unpin SuperVersion A
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
ReadOptions ro;
ro.tailing = true;
std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
iter->Seek("key");
ASSERT_TRUE(iter->status().ok());
ASSERT_TRUE(iter->Valid());
EXPECT_EQ(std::string("sst,memtable"), iter->value().ToString());
EXPECT_TRUE(pushed_first_operand);
EXPECT_TRUE(stepped_to_next_operand);
}
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} // namespace rocksdb } // namespace rocksdb

@ -196,35 +196,57 @@ ForwardIterator::~ForwardIterator() {
Cleanup(true); Cleanup(true);
} }
void ForwardIterator::SVCleanup(DBImpl* db, SuperVersion* sv,
bool background_purge_on_iterator_cleanup) {
if (sv->Unref()) {
// Job id == 0 means that this is not our background process, but rather
// user thread
JobContext job_context(0);
db->mutex_.Lock();
sv->Cleanup();
db->FindObsoleteFiles(&job_context, false, true);
if (background_purge_on_iterator_cleanup) {
db->ScheduleBgLogWriterClose(&job_context);
}
db->mutex_.Unlock();
delete sv;
if (job_context.HaveSomethingToDelete()) {
db->PurgeObsoleteFiles(job_context, background_purge_on_iterator_cleanup);
}
job_context.Clean();
}
}
namespace { namespace {
struct SVCleanupParams {
DBImpl* db;
SuperVersion* sv;
bool background_purge_on_iterator_cleanup;
};
}
// Used in PinnedIteratorsManager to release pinned SuperVersion // Used in PinnedIteratorsManager to release pinned SuperVersion
static void ReleaseSuperVersionFunc(void* sv) { void ForwardIterator::DeferredSVCleanup(void* arg) {
delete reinterpret_cast<SuperVersion*>(sv); auto d = reinterpret_cast<SVCleanupParams*>(arg);
ForwardIterator::SVCleanup(
d->db, d->sv, d->background_purge_on_iterator_cleanup);
delete d;
} }
} // namespace
void ForwardIterator::SVCleanup() { void ForwardIterator::SVCleanup() {
if (sv_ != nullptr && sv_->Unref()) { if (sv_ == nullptr) {
// Job id == 0 means that this is not our background process, but rather return;
// user thread
JobContext job_context(0);
db_->mutex_.Lock();
sv_->Cleanup();
db_->FindObsoleteFiles(&job_context, false, true);
if (read_options_.background_purge_on_iterator_cleanup) {
db_->ScheduleBgLogWriterClose(&job_context);
} }
db_->mutex_.Unlock();
if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) { if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
pinned_iters_mgr_->PinPtr(sv_, &ReleaseSuperVersionFunc); // pinned_iters_mgr_ tells us to make sure that all visited key-value slices
// are alive until pinned_iters_mgr_->ReleasePinnedData() is called.
// The slices may point into some memtables owned by sv_, so we need to keep
// sv_ referenced until pinned_iters_mgr_ unpins everything.
auto p = new SVCleanupParams{
db_, sv_, read_options_.background_purge_on_iterator_cleanup};
pinned_iters_mgr_->PinPtr(p, &ForwardIterator::DeferredSVCleanup);
} else { } else {
delete sv_; SVCleanup(db_, sv_, read_options_.background_purge_on_iterator_cleanup);
}
if (job_context.HaveSomethingToDelete()) {
db_->PurgeObsoleteFiles(
job_context, read_options_.background_purge_on_iterator_cleanup);
}
job_context.Clean();
} }
} }

@ -85,7 +85,14 @@ class ForwardIterator : public InternalIterator {
private: private:
void Cleanup(bool release_sv); void Cleanup(bool release_sv);
// Unreference and, if needed, clean up the current SuperVersion. This is
// either done immediately or deferred until this iterator is unpinned by
// PinnedIteratorsManager.
void SVCleanup(); void SVCleanup();
static void SVCleanup(
DBImpl* db, SuperVersion* sv, bool background_purge_on_iterator_cleanup);
static void DeferredSVCleanup(void* arg);
void RebuildIterators(bool refresh_sv); void RebuildIterators(bool refresh_sv);
void RenewIterators(); void RenewIterators();
void BuildLevelIterators(const VersionStorageInfo* vstorage); void BuildLevelIterators(const VersionStorageInfo* vstorage);

@ -82,6 +82,9 @@ class SyncPoint {
const std::vector<SyncPointPair>& markers); const std::vector<SyncPointPair>& markers);
// Set up a call back function in sync point. // Set up a call back function in sync point.
// The argument to the callback is passed through from
// TEST_SYNC_POINT_CALLBACK(); nullptr if TEST_SYNC_POINT or
// TEST_IDX_SYNC_POINT was used.
void SetCallBack(const std::string point, void SetCallBack(const std::string point,
std::function<void(void*)> callback); std::function<void(void*)> callback);

Loading…
Cancel
Save