From 5d4fddfa52768952e58abae5044174ad0ce30f7d Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Tue, 15 Jan 2019 21:32:15 -0800 Subject: [PATCH] WritePrepared: Fix visible key compacted out by compaction (#4883) Summary: With WritePrepared transaction, flush/compaction can contain uncommitted keys, and those keys can get committed during compaction. If a snapshot is taken before the key is committed, it should not see the key. On the other hand, compaction grab the list of snapshots at its beginning, and only consider those snapshots to dedup keys. Consider the case: ``` seq = 1: put "foo" = "bar" seq = 2: transaction T: delete "foo", prepare seq = 3: compaction start seq = 4: take snapshot S seq = 5: transaction T: commit. ... seq = N: compaction iterator reached key "foo". ``` When compaction start, the list of snapshot is empty. Compaction doesn't take snapshot S into account. When it reached "foo", transaction T is committed. Compaction may think the value "foo=bar" is not visible by any snapshot (which is wrong), and compact the value out. The fix is to explicitly take a snapshot before compaction grabbing the list of snapshots. Compaction will then has to keep keys visible to this snapshot. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4883 Differential Revision: D13668775 Pulled By: maysamyabandeh fbshipit-source-id: 1cab9615f94b7d3e8522cc3d44c3a14c7d4720e4 --- db/compaction_iterator.cc | 1 + db/db_impl.cc | 17 ++++- db/db_impl.h | 9 ++- db/db_impl_compaction_flush.cc | 68 ++++++++++++------- db/job_context.h | 4 ++ .../write_prepared_transaction_test.cc | 28 ++++++++ 6 files changed, 98 insertions(+), 29 deletions(-) diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index 413ecab08..7f15a186c 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -91,6 +91,7 @@ CompactionIterator::CompactionIterator( ignore_snapshots_ = false; } input_->SetPinnedItersMgr(&pinned_iters_mgr_); + TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get()); } CompactionIterator::~CompactionIterator() { diff --git a/db/db_impl.cc b/db/db_impl.cc index cdc66701b..ba9d7ff70 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1960,21 +1960,32 @@ const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() { } #endif // ROCKSDB_LITE -SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary) { +SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary, + bool lock) { int64_t unix_time = 0; env_->GetCurrentTime(&unix_time); // Ignore error SnapshotImpl* s = new SnapshotImpl; - InstrumentedMutexLock l(&mutex_); + if (lock) { + mutex_.Lock(); + } // returns null if the underlying memtable does not support snapshot. if (!is_snapshot_supported_) { + if (lock) { + mutex_.Unlock(); + } delete s; return nullptr; } auto snapshot_seq = last_seq_same_as_publish_seq_ ? versions_->LastSequence() : versions_->LastPublishedSequence(); - return snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary); + SnapshotImpl* snapshot = + snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary); + if (lock) { + mutex_.Unlock(); + } + return snapshot; } void DBImpl::ReleaseSnapshot(const Snapshot* s) { diff --git a/db/db_impl.h b/db/db_impl.h index 166dc6abe..4b663cf23 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -700,6 +700,12 @@ class DBImpl : public DB { void SetSnapshotChecker(SnapshotChecker* snapshot_checker); + // Fill JobContext with snapshot information needed by flush and compaction. + void GetSnapshotContext(JobContext* job_context, + std::vector* snapshot_seqs, + SequenceNumber* earliest_write_conflict_snapshot, + SnapshotChecker** snapshot_checker); + // Not thread-safe. void SetRecoverableStatePreReleaseCallback(PreReleaseCallback* callback); @@ -1148,7 +1154,8 @@ class DBImpl : public DB { // helper function to call after some of the logs_ were synced void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status); - SnapshotImpl* GetSnapshotImpl(bool is_write_conflict_boundary); + SnapshotImpl* GetSnapshotImpl(bool is_write_conflict_boundary, + bool lock = true); uint64_t GetMaxTotalWalSize() const; diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index dccc73deb..d0feb9c40 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -137,14 +137,12 @@ Status DBImpl::FlushMemTableToOutputFile( assert(cfd->imm()->NumNotFlushed() != 0); assert(cfd->imm()->IsFlushPending()); + std::vector snapshot_seqs; SequenceNumber earliest_write_conflict_snapshot; - std::vector snapshot_seqs = - snapshots_.GetAll(&earliest_write_conflict_snapshot); + SnapshotChecker* snapshot_checker; + GetSnapshotContext(job_context, &snapshot_seqs, + &earliest_write_conflict_snapshot, &snapshot_checker); - auto snapshot_checker = snapshot_checker_.get(); - if (use_custom_gc_ && snapshot_checker == nullptr) { - snapshot_checker = DisableGCSnapshotChecker::Instance(); - } FlushJob flush_job( dbname_, cfd, immutable_db_options_, mutable_cf_options, nullptr /* memtable_id */, env_options_for_compaction_, versions_.get(), @@ -287,14 +285,12 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( } #endif /* !NDEBUG */ + std::vector snapshot_seqs; SequenceNumber earliest_write_conflict_snapshot; - std::vector snapshot_seqs = - snapshots_.GetAll(&earliest_write_conflict_snapshot); + SnapshotChecker* snapshot_checker; + GetSnapshotContext(job_context, &snapshot_seqs, + &earliest_write_conflict_snapshot, &snapshot_checker); - auto snapshot_checker = snapshot_checker_.get(); - if (use_custom_gc_ && snapshot_checker == nullptr) { - snapshot_checker = DisableGCSnapshotChecker::Instance(); - } autovector distinct_output_dirs; std::vector jobs; std::vector all_mutable_cf_options; @@ -936,17 +932,15 @@ Status DBImpl::CompactFilesImpl( // deletion compaction currently not allowed in CompactFiles. assert(!c->deletion_compaction()); + std::vector snapshot_seqs; SequenceNumber earliest_write_conflict_snapshot; - std::vector snapshot_seqs = - snapshots_.GetAll(&earliest_write_conflict_snapshot); + SnapshotChecker* snapshot_checker; + GetSnapshotContext(job_context, &snapshot_seqs, + &earliest_write_conflict_snapshot, &snapshot_checker); auto pending_outputs_inserted_elem = CaptureCurrentFileNumberInPendingOutputs(); - auto snapshot_checker = snapshot_checker_.get(); - if (use_custom_gc_ && snapshot_checker == nullptr) { - snapshot_checker = DisableGCSnapshotChecker::Instance(); - } assert(is_snapshot_supported_ || snapshots_.empty()); CompactionJobStats compaction_job_stats; CompactionJob compaction_job( @@ -2576,14 +2570,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, output_level = c->output_level(); TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial", &output_level); + std::vector snapshot_seqs; SequenceNumber earliest_write_conflict_snapshot; - std::vector snapshot_seqs = - snapshots_.GetAll(&earliest_write_conflict_snapshot); - - auto snapshot_checker = snapshot_checker_.get(); - if (use_custom_gc_ && snapshot_checker == nullptr) { - snapshot_checker = DisableGCSnapshotChecker::Instance(); - } + SnapshotChecker* snapshot_checker; + GetSnapshotContext(job_context, &snapshot_seqs, + &earliest_write_conflict_snapshot, &snapshot_checker); assert(is_snapshot_supported_ || snapshots_.empty()); CompactionJob compaction_job( job_context->job_id, c.get(), immutable_db_options_, @@ -2914,4 +2905,31 @@ void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) { assert(!snapshot_checker_); snapshot_checker_.reset(snapshot_checker); } + +void DBImpl::GetSnapshotContext( + JobContext* job_context, std::vector* snapshot_seqs, + SequenceNumber* earliest_write_conflict_snapshot, + SnapshotChecker** snapshot_checker_ptr) { + mutex_.AssertHeld(); + assert(job_context != nullptr); + assert(snapshot_seqs != nullptr); + assert(earliest_write_conflict_snapshot != nullptr); + assert(snapshot_checker_ptr != nullptr); + + *snapshot_checker_ptr = snapshot_checker_.get(); + if (use_custom_gc_ && *snapshot_checker_ptr == nullptr) { + *snapshot_checker_ptr = DisableGCSnapshotChecker::Instance(); + } + if (*snapshot_checker_ptr != nullptr) { + // If snapshot_checker is used, that means the flush/compaction may + // contain values not visible to snapshot taken after + // flush/compaction job starts. Take a snapshot and it will appear + // in snapshot_seqs and force compaction iterator to consider such + // snapshots. + const Snapshot* job_snapshot = + GetSnapshotImpl(false /*write_conflict_boundary*/, false /*lock*/); + job_context->job_snapshot.reset(new ManagedSnapshot(this, job_snapshot)); + } + *snapshot_seqs = snapshots_.GetAll(earliest_write_conflict_snapshot); +} } // namespace rocksdb diff --git a/db/job_context.h b/db/job_context.h index 498ef7d17..3978fad33 100644 --- a/db/job_context.h +++ b/db/job_context.h @@ -175,6 +175,9 @@ struct JobContext { size_t num_alive_log_files = 0; uint64_t size_log_to_delete = 0; + // Snapshot taken before flush/compaction job. + std::unique_ptr job_snapshot; + explicit JobContext(int _job_id, bool create_superversion = false) { job_id = _job_id; manifest_file_number = 0; @@ -204,6 +207,7 @@ struct JobContext { memtables_to_free.clear(); logs_to_free.clear(); + job_snapshot.reset(); } ~JobContext() { diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index d0085de40..2ddfb9758 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -2374,6 +2374,34 @@ TEST_P(WritePreparedTransactionTest, delete transaction; } +TEST_P(WritePreparedTransactionTest, CommitAndSnapshotDuringCompaction) { + options.disable_auto_compactions = true; + ReOpen(); + + const Snapshot* snapshot = nullptr; + ASSERT_OK(db->Put(WriteOptions(), "key1", "value1")); + auto* txn = db->BeginTransaction(WriteOptions()); + ASSERT_OK(txn->SetName("txn")); + ASSERT_OK(txn->Put("key1", "value2")); + ASSERT_OK(txn->Prepare()); + + auto callback = [&](void*) { + // Snapshot is taken after compaction start. It should be taken into + // consideration for whether to compact out value1. + snapshot = db->GetSnapshot(); + ASSERT_OK(txn->Commit()); + delete txn; + }; + SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit", + callback); + SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(db->Flush(FlushOptions())); + ASSERT_NE(nullptr, snapshot); + VerifyKeys({{"key1", "value2"}}); + VerifyKeys({{"key1", "value1"}}, snapshot); + db->ReleaseSnapshot(snapshot); +} + TEST_P(WritePreparedTransactionTest, Iterate) { auto verify_state = [](Iterator* iter, const std::string& key, const std::string& value) {