CompactionIterator sees consistent view of which keys are committed (#9830)

Summary:
**This PR does not affect the functionality of `DB` and write-committed transactions.**

`CompactionIterator` uses `KeyCommitted(seq)` to determine if a key in the database is committed.
As the name 'write-committed' implies, if write-committed policy is used, a key exists in the database only if
it is committed. In fact, the implementation of `KeyCommitted()` is as follows:

```
inline bool KeyCommitted(SequenceNumber seq) {
  // For non-txn-db and write-committed, snapshot_checker_ is always nullptr.
  return snapshot_checker_ == nullptr ||
         snapshot_checker_->CheckInSnapshot(seq, kMaxSequence) == SnapshotCheckerResult::kInSnapshot;
}
```

With that being said, we focus on write-prepared/write-unprepared transactions.

A few notes:
- A key can exist in the db even if it's uncommitted. Therefore, we rely on `snapshot_checker_` to determine data visibility. We also require that all writes go through transaction API instead of the raw `WriteBatch` + `Write`, thus at most one uncommitted version of one user key can exist in the database.
- `CompactionIterator` outputs a key as long as the key is uncommitted.

Due to the above reasons, it is possible that `CompactionIterator` decides to output an uncommitted key without
doing further checks on the key (`NextFromInput()`). By the time the key is being prepared for output, the key becomes
committed because the `snapshot_checker_(seq, kMaxSequence)` becomes true in the implementation of `KeyCommitted()`.
Then `CompactionIterator` will try to zero its sequence number and hit assertion error if the key is a tombstone.

To fix this issue, we should make the `CompactionIterator` see a consistent view of the input keys. Note that
for write-prepared/write-unprepared, the background flush/compaction jobs already take a "job snapshot" before starting
processing keys. The job snapshot is released only after the entire flush/compaction finishes. We can use this snapshot
to determine whether a key is committed or not with minor change to `KeyCommitted()`.

```
inline bool KeyCommitted(SequenceNumber sequence) {
  // For non-txn-db and write-committed, snapshot_checker_ is always nullptr.
  return snapshot_checker_ == nullptr ||
         snapshot_checker_->CheckInSnapshot(sequence, job_snapshot_) ==
             SnapshotCheckerResult::kInSnapshot;
}
```

As a result, whether a key is committed or not will remain a constant throughout compaction, causing no trouble
for `CompactionIterator`s assertions.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9830

Test Plan: make check

Reviewed By: ltamasi

Differential Revision: D35561162

Pulled By: riversand963

fbshipit-source-id: 0e00d200c195240341cfe6d34cbc86798b315b9f
main
Yanqin Jin 2 years ago committed by Facebook GitHub Bot
parent 844a35108b
commit 0bd4dcde6b
  1. 9
      db/builder.cc
  2. 6
      db/builder.h
  3. 13
      db/compaction/compaction_iterator.cc
  4. 11
      db/compaction/compaction_iterator.h
  5. 10
      db/compaction/compaction_iterator_test.cc
  6. 17
      db/compaction/compaction_job.cc
  7. 4
      db/compaction/compaction_job.h
  8. 6
      db/compaction/compaction_job_test.cc
  9. 4
      db/db_impl/db_impl_compaction_flush.cc
  10. 6
      db/db_impl/db_impl_open.cc
  11. 22
      db/flush_job.cc
  12. 8
      db/job_context.h
  13. 2
      db/repair.cc

@ -62,9 +62,9 @@ Status BuildTable(
FileMetaData* meta, std::vector<BlobFileAddition>* blob_file_additions,
std::vector<SequenceNumber> snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, bool paranoid_file_checks,
InternalStats* internal_stats, IOStatus* io_status,
const std::shared_ptr<IOTracer>& io_tracer,
SequenceNumber job_snapshot, SnapshotChecker* snapshot_checker,
bool paranoid_file_checks, InternalStats* internal_stats,
IOStatus* io_status, const std::shared_ptr<IOTracer>& io_tracer,
BlobFileCreationReason blob_creation_reason, EventLogger* event_logger,
int job_id, const Env::IOPriority io_priority,
TableProperties* table_properties, Env::WriteLifeTimeHint write_hint,
@ -189,7 +189,8 @@ Status BuildTable(
CompactionIterator c_iter(
iter, tboptions.internal_comparator.user_comparator(), &merge,
kMaxSequenceNumber, &snapshots, earliest_write_conflict_snapshot,
snapshot_checker, env, ShouldReportDetailedTime(env, ioptions.stats),
job_snapshot, snapshot_checker, env,
ShouldReportDetailedTime(env, ioptions.stats),
true /* internal key corruption is not ok */, range_del_agg.get(),
blob_file_builder.get(), ioptions.allow_data_in_errors,
/*compaction=*/nullptr, compaction_filter.get(),

@ -57,9 +57,9 @@ extern Status BuildTable(
FileMetaData* meta, std::vector<BlobFileAddition>* blob_file_additions,
std::vector<SequenceNumber> snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, bool paranoid_file_checks,
InternalStats* internal_stats, IOStatus* io_status,
const std::shared_ptr<IOTracer>& io_tracer,
SequenceNumber job_snapshot, SnapshotChecker* snapshot_checker,
bool paranoid_file_checks, InternalStats* internal_stats,
IOStatus* io_status, const std::shared_ptr<IOTracer>& io_tracer,
BlobFileCreationReason blob_creation_reason,
EventLogger* event_logger = nullptr, int job_id = 0,
const Env::IOPriority io_priority = Env::IO_HIGH,

@ -24,8 +24,8 @@ CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
Env* env, bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
const Compaction* compaction, const CompactionFilter* compaction_filter,
@ -36,7 +36,7 @@ CompactionIterator::CompactionIterator(
const std::string* full_history_ts_low)
: CompactionIterator(
input, cmp, merge_helper, last_sequence, snapshots,
earliest_write_conflict_snapshot, snapshot_checker, env,
earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env,
report_detailed_time, expect_valid_internal_key, range_del_agg,
blob_file_builder, allow_data_in_errors,
std::unique_ptr<CompactionProxy>(
@ -48,8 +48,8 @@ CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber /*last_sequence*/, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
Env* env, bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
std::unique_ptr<CompactionProxy> compaction,
@ -65,6 +65,7 @@ CompactionIterator::CompactionIterator(
merge_helper_(merge_helper),
snapshots_(snapshots),
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
job_snapshot_(job_snapshot),
snapshot_checker_(snapshot_checker),
env_(env),
clock_(env_->GetSystemClock().get()),
@ -1057,7 +1058,7 @@ void CompactionIterator::PrepareOutput() {
if (valid_ && compaction_ != nullptr &&
!compaction_->allow_ingest_behind() && bottommost_level_ &&
DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
ikey_.type != kTypeMerge) {
ikey_.type != kTypeMerge && current_key_committed_) {
assert(ikey_.type != kTypeDeletion);
assert(ikey_.type != kTypeSingleDeletion ||
(timestamp_size_ || full_history_ts_low_));

@ -172,8 +172,8 @@ class CompactionIterator {
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
Env* env, bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
const Compaction* compaction = nullptr,
@ -189,8 +189,8 @@ class CompactionIterator {
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
Env* env, bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
std::unique_ptr<CompactionProxy> compaction,
@ -268,7 +268,7 @@ class CompactionIterator {
inline bool KeyCommitted(SequenceNumber sequence) {
return snapshot_checker_ == nullptr ||
snapshot_checker_->CheckInSnapshot(sequence, kMaxSequenceNumber) ==
snapshot_checker_->CheckInSnapshot(sequence, job_snapshot_) ==
SnapshotCheckerResult::kInSnapshot;
}
@ -309,6 +309,7 @@ class CompactionIterator {
std::unordered_set<SequenceNumber> released_snapshots_;
std::vector<SequenceNumber>::const_iterator earliest_snapshot_iter_;
const SequenceNumber earliest_write_conflict_snapshot_;
const SequenceNumber job_snapshot_;
const SnapshotChecker* const snapshot_checker_;
Env* env_;
SystemClock* clock_;

@ -275,11 +275,11 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
iter_->SeekToFirst();
c_iter_.reset(new CompactionIterator(
iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
earliest_write_conflict_snapshot, snapshot_checker_.get(),
Env::Default(), false /* report_detailed_time */, false,
range_del_agg_.get(), nullptr /* blob_file_builder */,
true /*allow_data_in_errors*/, std::move(compaction), filter,
&shutting_down_,
earliest_write_conflict_snapshot, kMaxSequenceNumber,
snapshot_checker_.get(), Env::Default(),
false /* report_detailed_time */, false, range_del_agg_.get(),
nullptr /* blob_file_builder */, true /*allow_data_in_errors*/,
std::move(compaction), filter, &shutting_down_,
/*manual_compaction_paused=*/nullptr,
/*manual_compaction_canceled=*/nullptr, /*info_log=*/nullptr,
full_history_ts_low));

@ -423,10 +423,11 @@ CompactionJob::CompactionJob(
ErrorHandler* db_error_handler,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
const std::string& dbname, CompactionJobStats* compaction_job_stats,
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const SnapshotChecker* snapshot_checker, JobContext* job_context,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname,
CompactionJobStats* compaction_job_stats, Env::Priority thread_pri,
const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<int>* manual_compaction_paused,
const std::atomic<bool>* manual_compaction_canceled,
const std::string& db_id, const std::string& db_session_id,
@ -463,6 +464,7 @@ CompactionJob::CompactionJob(
existing_snapshots_(std::move(existing_snapshots)),
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
snapshot_checker_(snapshot_checker),
job_context_(job_context),
table_cache_(std::move(table_cache)),
event_logger_(event_logger),
paranoid_file_checks_(paranoid_file_checks),
@ -1468,9 +1470,12 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
Status status;
const std::string* const full_history_ts_low =
full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_;
const SequenceNumber job_snapshot_seq =
job_context_ ? job_context_->GetJobSnapshotSequence()
: kMaxSequenceNumber;
sub_compact->c_iter.reset(new CompactionIterator(
input, cfd->user_comparator(), &merge, versions_->LastSequence(),
&existing_snapshots_, earliest_write_conflict_snapshot_,
&existing_snapshots_, earliest_write_conflict_snapshot_, job_snapshot_seq,
snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_),
/*expect_valid_internal_key=*/true, &range_del_agg,
blob_file_builder.get(), db_options_.allow_data_in_errors,
@ -2496,7 +2501,7 @@ CompactionServiceCompactionJob::CompactionServiceCompactionJob(
job_id, compaction, db_options, mutable_db_options, file_options,
versions, shutting_down, log_buffer, nullptr, output_directory,
nullptr, stats, db_mutex, db_error_handler, existing_snapshots,
kMaxSequenceNumber, nullptr, table_cache, event_logger,
kMaxSequenceNumber, nullptr, nullptr, table_cache, event_logger,
compaction->mutable_cf_options()->paranoid_file_checks,
compaction->mutable_cf_options()->report_bg_io_stats, dbname,
&(compaction_service_result->stats), Env::Priority::USER, io_tracer,

@ -73,7 +73,7 @@ class CompactionJob {
InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker,
const SnapshotChecker* snapshot_checker, JobContext* job_context,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
bool paranoid_file_checks, bool measure_io_stats,
const std::string& dbname, CompactionJobStats* compaction_job_stats,
@ -212,6 +212,8 @@ class CompactionJob {
const SnapshotChecker* const snapshot_checker_;
JobContext* job_context_;
std::shared_ptr<Cache> table_cache_;
EventLogger* event_logger_;

@ -355,9 +355,9 @@ class CompactionJobTestBase : public testing::Test {
0, &compaction, db_options_, mutable_db_options_, env_options_,
versions_.get(), &shutting_down_, &log_buffer, nullptr, nullptr,
nullptr, nullptr, &mutex_, &error_handler_, snapshots,
earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
&event_logger, false, false, dbname_, &compaction_job_stats_,
Env::Priority::USER, nullptr /* IOTracer */,
earliest_write_conflict_snapshot, snapshot_checker, nullptr,
table_cache_, &event_logger, false, false, dbname_,
&compaction_job_stats_, Env::Priority::USER, nullptr /* IOTracer */,
/*manual_compaction_paused=*/nullptr,
/*manual_compaction_canceled=*/nullptr, /*db_id=*/"",
/*db_session_id=*/"", full_history_ts_low_);

@ -1367,7 +1367,7 @@ Status DBImpl::CompactFilesImpl(
GetDataDir(c->column_family_data(), c->output_path_id()),
GetDataDir(c->column_family_data(), 0), stats_, &mutex_, &error_handler_,
snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
table_cache_, &event_logger_,
job_context, table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, Env::Priority::USER, io_tracer_,
@ -3361,7 +3361,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
GetDataDir(c->column_family_data(), c->output_path_id()),
GetDataDir(c->column_family_data(), 0), stats_, &mutex_,
&error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker, table_cache_, &event_logger_,
snapshot_checker, job_context, table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, thread_pri, io_tracer_,

@ -1528,9 +1528,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
dbname_, versions_.get(), immutable_db_options_, tboptions,
file_options_for_compaction_, cfd->table_cache(), iter.get(),
std::move(range_del_iters), &meta, &blob_file_additions,
snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
paranoid_file_checks, cfd->internal_stats(), &io_s, io_tracer_,
BlobFileCreationReason::kRecovery, &event_logger_, job_id,
snapshot_seqs, earliest_write_conflict_snapshot, kMaxSequenceNumber,
snapshot_checker, paranoid_file_checks, cfd->internal_stats(), &io_s,
io_tracer_, BlobFileCreationReason::kRecovery, &event_logger_, job_id,
Env::IO_HIGH, nullptr /* table_properties */, write_hint,
nullptr /*full_history_ts_low*/, &blob_callback_);
LogFlush(immutable_db_options_.info_log);

@ -455,11 +455,13 @@ Status FlushJob::MemPurge() {
ioptions->logger, true /* internal key corruption is not ok */,
existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
snapshot_checker_);
assert(job_context_);
SequenceNumber job_snapshot_seq = job_context_->GetJobSnapshotSequence();
CompactionIterator c_iter(
iter.get(), (cfd_->internal_comparator()).user_comparator(), &merge,
kMaxSequenceNumber, &existing_snapshots_,
earliest_write_conflict_snapshot_, snapshot_checker_, env,
ShouldReportDetailedTime(env, ioptions->stats),
earliest_write_conflict_snapshot_, job_snapshot_seq, snapshot_checker_,
env, ShouldReportDetailedTime(env, ioptions->stats),
true /* internal key corruption is not ok */, range_del_agg.get(),
nullptr, ioptions->allow_data_in_errors,
/*compaction=*/nullptr, compaction_filter.get(),
@ -829,6 +831,7 @@ Status FlushJob::WriteLevel0Table() {
// TEST_SYNC_POINT_CALLBACK not used.
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:num_memtables",
&mems_size);
assert(job_context_);
for (MemTable* m : mems_) {
ROCKS_LOG_INFO(
db_options_.info_log,
@ -911,16 +914,19 @@ Status FlushJob::WriteLevel0Table() {
TableFileCreationReason::kFlush, creation_time, oldest_key_time,
current_time, db_id_, db_session_id_, 0 /* target_file_size */,
meta_.fd.GetNumber());
const SequenceNumber job_snapshot_seq =
job_context_->GetJobSnapshotSequence();
s = BuildTable(
dbname_, versions_, db_options_, tboptions, file_options_,
cfd_->table_cache(), iter.get(), std::move(range_del_iters), &meta_,
&blob_file_additions, existing_snapshots_,
earliest_write_conflict_snapshot_, snapshot_checker_,
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
&io_s, io_tracer_, BlobFileCreationReason::kFlush, event_logger_,
job_context_->job_id, Env::IO_HIGH, &table_properties_, write_hint,
full_history_ts_low, blob_callback_, &num_input_entries,
&memtable_payload_bytes, &memtable_garbage_bytes);
earliest_write_conflict_snapshot_, job_snapshot_seq,
snapshot_checker_, mutable_cf_options_.paranoid_file_checks,
cfd_->internal_stats(), &io_s, io_tracer_,
BlobFileCreationReason::kFlush, event_logger_, job_context_->job_id,
Env::IO_HIGH, &table_properties_, write_hint, full_history_ts_low,
blob_callback_, &num_input_entries, &memtable_payload_bytes,
&memtable_garbage_bytes);
// TODO: Cleanup io_status in BuildTable and table builders
assert(!s.ok() || io_s.ok());
io_s.PermitUncheckedError();

@ -124,6 +124,14 @@ struct JobContext {
job_snapshot != nullptr || sv_have_sth;
}
SequenceNumber GetJobSnapshotSequence() const {
if (job_snapshot) {
assert(job_snapshot->snapshot());
return job_snapshot->snapshot()->GetSequenceNumber();
}
return kMaxSequenceNumber;
}
// Structure to store information for candidate files to delete.
struct CandidateFileInfo {
std::string file_name;

@ -450,7 +450,7 @@ class Repairer {
dbname_, /* versions */ nullptr, immutable_db_options_, tboptions,
file_options_, table_cache_.get(), iter.get(),
std::move(range_del_iters), &meta, nullptr /* blob_file_additions */,
{}, kMaxSequenceNumber, snapshot_checker,
{}, kMaxSequenceNumber, kMaxSequenceNumber, snapshot_checker,
false /* paranoid_file_checks*/, nullptr /* internal_stats */, &io_s,
nullptr /*IOTracer*/, BlobFileCreationReason::kRecovery,
nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH,

Loading…
Cancel
Save