WritePrepared: Fix visible key compacted out by compaction (#4883)

Summary:
With WritePrepared transaction, flush/compaction can contain uncommitted keys, and those keys can get committed during compaction. If a snapshot is taken before the key is committed, it should not see the key. On the other hand, compaction grab the list of snapshots at its beginning, and only consider those snapshots to dedup keys. Consider the case:
```
seq = 1: put "foo" = "bar"
seq = 2: transaction T: delete "foo", prepare
seq = 3: compaction start
seq = 4: take snapshot S
seq = 5: transaction T: commit.
...
seq = N: compaction iterator reached key "foo".
```
When compaction start, the list of snapshot is empty. Compaction doesn't take snapshot S into account. When it reached "foo", transaction T is committed. Compaction may think the value "foo=bar" is not visible by any snapshot (which is wrong), and compact the value out.

The fix is to explicitly take a snapshot before compaction grabbing the list of snapshots. Compaction will then has to keep keys visible to this snapshot.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4883

Differential Revision: D13668775

Pulled By: maysamyabandeh

fbshipit-source-id: 1cab9615f94b7d3e8522cc3d44c3a14c7d4720e4
main
Yi Wu 6 years ago committed by Facebook Github Bot
parent cad99a6031
commit 5d4fddfa52
  1. 1
      db/compaction_iterator.cc
  2. 17
      db/db_impl.cc
  3. 9
      db/db_impl.h
  4. 68
      db/db_impl_compaction_flush.cc
  5. 4
      db/job_context.h
  6. 28
      utilities/transactions/write_prepared_transaction_test.cc

@ -91,6 +91,7 @@ CompactionIterator::CompactionIterator(
ignore_snapshots_ = false; ignore_snapshots_ = false;
} }
input_->SetPinnedItersMgr(&pinned_iters_mgr_); input_->SetPinnedItersMgr(&pinned_iters_mgr_);
TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get());
} }
CompactionIterator::~CompactionIterator() { CompactionIterator::~CompactionIterator() {

@ -1960,21 +1960,32 @@ const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() {
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary) { SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
bool lock) {
int64_t unix_time = 0; int64_t unix_time = 0;
env_->GetCurrentTime(&unix_time); // Ignore error env_->GetCurrentTime(&unix_time); // Ignore error
SnapshotImpl* s = new SnapshotImpl; SnapshotImpl* s = new SnapshotImpl;
InstrumentedMutexLock l(&mutex_); if (lock) {
mutex_.Lock();
}
// returns null if the underlying memtable does not support snapshot. // returns null if the underlying memtable does not support snapshot.
if (!is_snapshot_supported_) { if (!is_snapshot_supported_) {
if (lock) {
mutex_.Unlock();
}
delete s; delete s;
return nullptr; return nullptr;
} }
auto snapshot_seq = last_seq_same_as_publish_seq_ auto snapshot_seq = last_seq_same_as_publish_seq_
? versions_->LastSequence() ? versions_->LastSequence()
: versions_->LastPublishedSequence(); : versions_->LastPublishedSequence();
return snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary); SnapshotImpl* snapshot =
snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary);
if (lock) {
mutex_.Unlock();
}
return snapshot;
} }
void DBImpl::ReleaseSnapshot(const Snapshot* s) { void DBImpl::ReleaseSnapshot(const Snapshot* s) {

@ -700,6 +700,12 @@ class DBImpl : public DB {
void SetSnapshotChecker(SnapshotChecker* snapshot_checker); void SetSnapshotChecker(SnapshotChecker* snapshot_checker);
// Fill JobContext with snapshot information needed by flush and compaction.
void GetSnapshotContext(JobContext* job_context,
std::vector<SequenceNumber>* snapshot_seqs,
SequenceNumber* earliest_write_conflict_snapshot,
SnapshotChecker** snapshot_checker);
// Not thread-safe. // Not thread-safe.
void SetRecoverableStatePreReleaseCallback(PreReleaseCallback* callback); void SetRecoverableStatePreReleaseCallback(PreReleaseCallback* callback);
@ -1148,7 +1154,8 @@ class DBImpl : public DB {
// helper function to call after some of the logs_ were synced // helper function to call after some of the logs_ were synced
void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status); void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status);
SnapshotImpl* GetSnapshotImpl(bool is_write_conflict_boundary); SnapshotImpl* GetSnapshotImpl(bool is_write_conflict_boundary,
bool lock = true);
uint64_t GetMaxTotalWalSize() const; uint64_t GetMaxTotalWalSize() const;

@ -137,14 +137,12 @@ Status DBImpl::FlushMemTableToOutputFile(
assert(cfd->imm()->NumNotFlushed() != 0); assert(cfd->imm()->NumNotFlushed() != 0);
assert(cfd->imm()->IsFlushPending()); assert(cfd->imm()->IsFlushPending());
std::vector<SequenceNumber> snapshot_seqs;
SequenceNumber earliest_write_conflict_snapshot; SequenceNumber earliest_write_conflict_snapshot;
std::vector<SequenceNumber> snapshot_seqs = SnapshotChecker* snapshot_checker;
snapshots_.GetAll(&earliest_write_conflict_snapshot); GetSnapshotContext(job_context, &snapshot_seqs,
&earliest_write_conflict_snapshot, &snapshot_checker);
auto snapshot_checker = snapshot_checker_.get();
if (use_custom_gc_ && snapshot_checker == nullptr) {
snapshot_checker = DisableGCSnapshotChecker::Instance();
}
FlushJob flush_job( FlushJob flush_job(
dbname_, cfd, immutable_db_options_, mutable_cf_options, dbname_, cfd, immutable_db_options_, mutable_cf_options,
nullptr /* memtable_id */, env_options_for_compaction_, versions_.get(), nullptr /* memtable_id */, env_options_for_compaction_, versions_.get(),
@ -287,14 +285,12 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
} }
#endif /* !NDEBUG */ #endif /* !NDEBUG */
std::vector<SequenceNumber> snapshot_seqs;
SequenceNumber earliest_write_conflict_snapshot; SequenceNumber earliest_write_conflict_snapshot;
std::vector<SequenceNumber> snapshot_seqs = SnapshotChecker* snapshot_checker;
snapshots_.GetAll(&earliest_write_conflict_snapshot); GetSnapshotContext(job_context, &snapshot_seqs,
&earliest_write_conflict_snapshot, &snapshot_checker);
auto snapshot_checker = snapshot_checker_.get();
if (use_custom_gc_ && snapshot_checker == nullptr) {
snapshot_checker = DisableGCSnapshotChecker::Instance();
}
autovector<Directory*> distinct_output_dirs; autovector<Directory*> distinct_output_dirs;
std::vector<FlushJob> jobs; std::vector<FlushJob> jobs;
std::vector<MutableCFOptions> all_mutable_cf_options; std::vector<MutableCFOptions> all_mutable_cf_options;
@ -936,17 +932,15 @@ Status DBImpl::CompactFilesImpl(
// deletion compaction currently not allowed in CompactFiles. // deletion compaction currently not allowed in CompactFiles.
assert(!c->deletion_compaction()); assert(!c->deletion_compaction());
std::vector<SequenceNumber> snapshot_seqs;
SequenceNumber earliest_write_conflict_snapshot; SequenceNumber earliest_write_conflict_snapshot;
std::vector<SequenceNumber> snapshot_seqs = SnapshotChecker* snapshot_checker;
snapshots_.GetAll(&earliest_write_conflict_snapshot); GetSnapshotContext(job_context, &snapshot_seqs,
&earliest_write_conflict_snapshot, &snapshot_checker);
auto pending_outputs_inserted_elem = auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs(); CaptureCurrentFileNumberInPendingOutputs();
auto snapshot_checker = snapshot_checker_.get();
if (use_custom_gc_ && snapshot_checker == nullptr) {
snapshot_checker = DisableGCSnapshotChecker::Instance();
}
assert(is_snapshot_supported_ || snapshots_.empty()); assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJobStats compaction_job_stats; CompactionJobStats compaction_job_stats;
CompactionJob compaction_job( CompactionJob compaction_job(
@ -2576,14 +2570,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
output_level = c->output_level(); output_level = c->output_level();
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial", TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
&output_level); &output_level);
std::vector<SequenceNumber> snapshot_seqs;
SequenceNumber earliest_write_conflict_snapshot; SequenceNumber earliest_write_conflict_snapshot;
std::vector<SequenceNumber> snapshot_seqs = SnapshotChecker* snapshot_checker;
snapshots_.GetAll(&earliest_write_conflict_snapshot); GetSnapshotContext(job_context, &snapshot_seqs,
&earliest_write_conflict_snapshot, &snapshot_checker);
auto snapshot_checker = snapshot_checker_.get();
if (use_custom_gc_ && snapshot_checker == nullptr) {
snapshot_checker = DisableGCSnapshotChecker::Instance();
}
assert(is_snapshot_supported_ || snapshots_.empty()); assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJob compaction_job( CompactionJob compaction_job(
job_context->job_id, c.get(), immutable_db_options_, job_context->job_id, c.get(), immutable_db_options_,
@ -2914,4 +2905,31 @@ void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) {
assert(!snapshot_checker_); assert(!snapshot_checker_);
snapshot_checker_.reset(snapshot_checker); snapshot_checker_.reset(snapshot_checker);
} }
void DBImpl::GetSnapshotContext(
JobContext* job_context, std::vector<SequenceNumber>* snapshot_seqs,
SequenceNumber* earliest_write_conflict_snapshot,
SnapshotChecker** snapshot_checker_ptr) {
mutex_.AssertHeld();
assert(job_context != nullptr);
assert(snapshot_seqs != nullptr);
assert(earliest_write_conflict_snapshot != nullptr);
assert(snapshot_checker_ptr != nullptr);
*snapshot_checker_ptr = snapshot_checker_.get();
if (use_custom_gc_ && *snapshot_checker_ptr == nullptr) {
*snapshot_checker_ptr = DisableGCSnapshotChecker::Instance();
}
if (*snapshot_checker_ptr != nullptr) {
// If snapshot_checker is used, that means the flush/compaction may
// contain values not visible to snapshot taken after
// flush/compaction job starts. Take a snapshot and it will appear
// in snapshot_seqs and force compaction iterator to consider such
// snapshots.
const Snapshot* job_snapshot =
GetSnapshotImpl(false /*write_conflict_boundary*/, false /*lock*/);
job_context->job_snapshot.reset(new ManagedSnapshot(this, job_snapshot));
}
*snapshot_seqs = snapshots_.GetAll(earliest_write_conflict_snapshot);
}
} // namespace rocksdb } // namespace rocksdb

@ -175,6 +175,9 @@ struct JobContext {
size_t num_alive_log_files = 0; size_t num_alive_log_files = 0;
uint64_t size_log_to_delete = 0; uint64_t size_log_to_delete = 0;
// Snapshot taken before flush/compaction job.
std::unique_ptr<ManagedSnapshot> job_snapshot;
explicit JobContext(int _job_id, bool create_superversion = false) { explicit JobContext(int _job_id, bool create_superversion = false) {
job_id = _job_id; job_id = _job_id;
manifest_file_number = 0; manifest_file_number = 0;
@ -204,6 +207,7 @@ struct JobContext {
memtables_to_free.clear(); memtables_to_free.clear();
logs_to_free.clear(); logs_to_free.clear();
job_snapshot.reset();
} }
~JobContext() { ~JobContext() {

@ -2374,6 +2374,34 @@ TEST_P(WritePreparedTransactionTest,
delete transaction; delete transaction;
} }
TEST_P(WritePreparedTransactionTest, CommitAndSnapshotDuringCompaction) {
options.disable_auto_compactions = true;
ReOpen();
const Snapshot* snapshot = nullptr;
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
auto* txn = db->BeginTransaction(WriteOptions());
ASSERT_OK(txn->SetName("txn"));
ASSERT_OK(txn->Put("key1", "value2"));
ASSERT_OK(txn->Prepare());
auto callback = [&](void*) {
// Snapshot is taken after compaction start. It should be taken into
// consideration for whether to compact out value1.
snapshot = db->GetSnapshot();
ASSERT_OK(txn->Commit());
delete txn;
};
SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
callback);
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(db->Flush(FlushOptions()));
ASSERT_NE(nullptr, snapshot);
VerifyKeys({{"key1", "value2"}});
VerifyKeys({{"key1", "value1"}}, snapshot);
db->ReleaseSnapshot(snapshot);
}
TEST_P(WritePreparedTransactionTest, Iterate) { TEST_P(WritePreparedTransactionTest, Iterate) {
auto verify_state = [](Iterator* iter, const std::string& key, auto verify_state = [](Iterator* iter, const std::string& key,
const std::string& value) { const std::string& value) {

Loading…
Cancel
Save