diff --git a/HISTORY.md b/HISTORY.md
index 28db16720..72c8d1b63 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -2,12 +2,12 @@
 ## Unreleased
 ### Bug fixes
 * Fix a performance regression introduced in 6.4 that makes an upper bound check for every Next() even if keys are within a data block that is within the upper bound.
+* Fix a possible corruption to the LSM state (overlapping files within a level) when a `CompactRange()` for refitting levels (`CompactRangeOptions::change_level == true`) and another manual compaction are executed in parallel.
 ### New Features
 * A new option `std::shared_ptr<FileChecksumGenFactory> file_checksum_gen_factory` is added to `BackupableDBOptions`. The default value for this option is `nullptr`. If this option is null, the default backup engine checksum function (crc32c) will be used for creating, verifying, or restoring backups. If it is not null and is set to the DB custom checksum factory, the custom checksum function used in DB will also be used for creating, verifying, or restoring backups, in addition to the default checksum function (crc32c). If it is not null and is set to a custom checksum factory different than the DB custom checksum factory (which may be null), BackupEngine will return `Status::InvalidArgument()`.
 * A new field `std::string requested_checksum_func_name` is added to `FileChecksumGenContext`, which enables the checksum factory to create generators for a suite of different functions.
-
 ## 6.12 (2020-07-28)
 ### Public API Change
 * Encryption file classes now exposed for inheritance in env_encryption.h
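For context on the `BackupableDBOptions` entry above: per the compatibility rule in the changelog, a non-null backup checksum factory must match the DB's own factory. A minimal sketch (not part of this patch; `ConfigureBackupChecksums` is a hypothetical helper):

```cpp
#include "rocksdb/options.h"
#include "rocksdb/utilities/backupable_db.h"

// Sketch only: per the changelog rule above, BackupEngine accepts either a
// null factory (default crc32c) or the exact factory the DB itself uses;
// any other non-null factory yields Status::InvalidArgument().
void ConfigureBackupChecksums(const rocksdb::Options& db_options,
                              rocksdb::BackupableDBOptions* backup_options) {
  // Reuse the DB's checksum factory (which may be nullptr) so the backup
  // engine applies the same custom checksum in addition to crc32c.
  backup_options->file_checksum_gen_factory =
      db_options.file_checksum_gen_factory;
}
```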
diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc
index 94873255b..3a99f19a8 100644
--- a/db/compaction/compaction_iterator.cc
+++ b/db/compaction/compaction_iterator.cc
@@ -40,7 +40,7 @@ CompactionIterator::CompactionIterator(
     const CompactionFilter* compaction_filter,
     const std::atomic<bool>* shutting_down,
     const SequenceNumber preserve_deletes_seqnum,
-    const std::atomic<bool>* manual_compaction_paused,
+    const std::atomic<int>* manual_compaction_paused,
     const std::shared_ptr<Logger> info_log)
     : CompactionIterator(
           input, cmp, merge_helper, last_sequence, snapshots,
@@ -62,7 +62,7 @@ CompactionIterator::CompactionIterator(
     const CompactionFilter* compaction_filter,
     const std::atomic<bool>* shutting_down,
    const SequenceNumber preserve_deletes_seqnum,
-    const std::atomic<bool>* manual_compaction_paused,
+    const std::atomic<int>* manual_compaction_paused,
     const std::shared_ptr<Logger> info_log)
     : input_(input),
       cmp_(cmp),
diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h
index 41100a144..dc7984180 100644
--- a/db/compaction/compaction_iterator.h
+++ b/db/compaction/compaction_iterator.h
@@ -59,34 +59,34 @@ class CompactionIterator {
     const Compaction* compaction_;
   };
 
-  CompactionIterator(
-      InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
-      SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
-      SequenceNumber earliest_write_conflict_snapshot,
-      const SnapshotChecker* snapshot_checker, Env* env,
-      bool report_detailed_time, bool expect_valid_internal_key,
-      CompactionRangeDelAggregator* range_del_agg,
-      const Compaction* compaction = nullptr,
-      const CompactionFilter* compaction_filter = nullptr,
-      const std::atomic<bool>* shutting_down = nullptr,
-      const SequenceNumber preserve_deletes_seqnum = 0,
-      const std::atomic<bool>* manual_compaction_paused = nullptr,
-      const std::shared_ptr<Logger> info_log = nullptr);
+  CompactionIterator(InternalIterator* input, const Comparator* cmp,
+                     MergeHelper* merge_helper, SequenceNumber last_sequence,
+                     std::vector<SequenceNumber>* snapshots,
+                     SequenceNumber earliest_write_conflict_snapshot,
+                     const SnapshotChecker* snapshot_checker, Env* env,
+                     bool report_detailed_time, bool expect_valid_internal_key,
+                     CompactionRangeDelAggregator* range_del_agg,
+                     const Compaction* compaction = nullptr,
+                     const CompactionFilter* compaction_filter = nullptr,
+                     const std::atomic<bool>* shutting_down = nullptr,
+                     const SequenceNumber preserve_deletes_seqnum = 0,
+                     const std::atomic<int>* manual_compaction_paused = nullptr,
+                     const std::shared_ptr<Logger> info_log = nullptr);
 
   // Constructor with custom CompactionProxy, used for tests.
-  CompactionIterator(
-      InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
-      SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
-      SequenceNumber earliest_write_conflict_snapshot,
-      const SnapshotChecker* snapshot_checker, Env* env,
-      bool report_detailed_time, bool expect_valid_internal_key,
-      CompactionRangeDelAggregator* range_del_agg,
-      std::unique_ptr<CompactionProxy> compaction,
-      const CompactionFilter* compaction_filter = nullptr,
-      const std::atomic<bool>* shutting_down = nullptr,
-      const SequenceNumber preserve_deletes_seqnum = 0,
-      const std::atomic<bool>* manual_compaction_paused = nullptr,
-      const std::shared_ptr<Logger> info_log = nullptr);
+  CompactionIterator(InternalIterator* input, const Comparator* cmp,
+                     MergeHelper* merge_helper, SequenceNumber last_sequence,
+                     std::vector<SequenceNumber>* snapshots,
+                     SequenceNumber earliest_write_conflict_snapshot,
+                     const SnapshotChecker* snapshot_checker, Env* env,
+                     bool report_detailed_time, bool expect_valid_internal_key,
+                     CompactionRangeDelAggregator* range_del_agg,
+                     std::unique_ptr<CompactionProxy> compaction,
+                     const CompactionFilter* compaction_filter = nullptr,
+                     const std::atomic<bool>* shutting_down = nullptr,
+                     const SequenceNumber preserve_deletes_seqnum = 0,
+                     const std::atomic<int>* manual_compaction_paused = nullptr,
+                     const std::shared_ptr<Logger> info_log = nullptr);
 
   ~CompactionIterator();
 
@@ -166,7 +166,7 @@ class CompactionIterator {
   std::unique_ptr<CompactionProxy> compaction_;
   const CompactionFilter* compaction_filter_;
   const std::atomic<bool>* shutting_down_;
-  const std::atomic<bool>* manual_compaction_paused_;
+  const std::atomic<int>* manual_compaction_paused_;
   const SequenceNumber preserve_deletes_seqnum_;
   bool bottommost_level_;
   bool valid_ = false;
@@ -235,7 +235,7 @@ class CompactionIterator {
   bool IsPausingManualCompaction() {
     // This is a best-effort facility, so memory_order_relaxed is sufficient.
     return manual_compaction_paused_ &&
-           manual_compaction_paused_->load(std::memory_order_relaxed);
+           manual_compaction_paused_->load(std::memory_order_relaxed) > 0;
   }
 };
 }  // namespace ROCKSDB_NAMESPACE
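The signature churn in these two files is mostly mechanical reformatting; the substantive change is `manual_compaction_paused` widening from `std::atomic<bool>` to `std::atomic<int>` so that pauses can nest, with `IsPausingManualCompaction()` treating any positive count as paused. A standalone sketch of that pattern (hypothetical `PauseCounter` class, not RocksDB code):

```cpp
#include <atomic>

// Standalone sketch of the pattern adopted above: a count of pausers
// replaces a single flag so that concurrent "disable" calls compose, and
// any value > 0 means paused. The relaxed load mirrors
// IsPausingManualCompaction(): the check is best-effort, so a worker may
// notice the pause a little late, which only delays cancellation.
class PauseCounter {
 public:
  void Pause() { pausers_.fetch_add(1, std::memory_order_release); }
  void Resume() { pausers_.fetch_sub(1, std::memory_order_release); }
  bool IsPaused() const {
    return pausers_.load(std::memory_order_relaxed) > 0;
  }

 private:
  std::atomic<int> pausers_{0};
};
```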
diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc
index 74f3c7ccd..5e0e5bd47 100644
--- a/db/compaction/compaction_job.cc
+++ b/db/compaction/compaction_job.cc
@@ -329,7 +329,7 @@ CompactionJob::CompactionJob(
     EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
     const std::string& dbname, CompactionJobStats* compaction_job_stats,
     Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
-    const std::atomic<bool>* manual_compaction_paused, const std::string& db_id,
+    const std::atomic<int>* manual_compaction_paused, const std::string& db_id,
     const std::string& db_session_id)
     : job_id_(job_id),
       compact_(new CompactionState(compaction)),
@@ -929,7 +929,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   TEST_SYNC_POINT_CALLBACK(
       "CompactionJob::Run():PausingManualCompaction:1",
       reinterpret_cast<void*>(
-          const_cast<std::atomic<bool>*>(manual_compaction_paused_)));
+          const_cast<std::atomic<int>*>(manual_compaction_paused_)));
 
   Slice* start = sub_compact->start;
   Slice* end = sub_compact->end;
@@ -1023,7 +1023,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
       TEST_SYNC_POINT_CALLBACK(
           "CompactionJob::Run():PausingManualCompaction:2",
           reinterpret_cast<void*>(
-              const_cast<std::atomic<bool>*>(manual_compaction_paused_)));
+              const_cast<std::atomic<int>*>(manual_compaction_paused_)));
       if (partitioner.get()) {
         last_key_for_partitioner.assign(c_iter->user_key().data_,
                                         c_iter->user_key().size_);
@@ -1090,7 +1090,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   }
   if ((status.ok() || status.IsColumnFamilyDropped()) &&
       (manual_compaction_paused_ &&
-       manual_compaction_paused_->load(std::memory_order_relaxed))) {
+       manual_compaction_paused_->load(std::memory_order_relaxed) > 0)) {
     status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
   }
   if (status.ok()) {
diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h
index 4160063b4..14c28b5f5 100644
--- a/db/compaction/compaction_job.h
+++ b/db/compaction/compaction_job.h
@@ -77,7 +77,7 @@ class CompactionJob {
       bool paranoid_file_checks, bool measure_io_stats,
       const std::string& dbname, CompactionJobStats* compaction_job_stats,
       Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
-      const std::atomic<bool>* manual_compaction_paused = nullptr,
+      const std::atomic<int>* manual_compaction_paused = nullptr,
       const std::string& db_id = "", const std::string& db_session_id = "");
 
   ~CompactionJob();
 
@@ -163,7 +163,7 @@ class CompactionJob {
   FileOptions file_options_for_read_;
   VersionSet* versions_;
   const std::atomic<bool>* shutting_down_;
-  const std::atomic<bool>* manual_compaction_paused_;
+  const std::atomic<int>* manual_compaction_paused_;
   const SequenceNumber preserve_deletes_seqnum_;
   LogBuffer* log_buffer_;
   FSDirectory* db_directory_;
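The `TEST_SYNC_POINT_CALLBACK` sites above now expose the pause counter to tests as a type-erased `void*`. Roughly how a test consumes that payload (a sketch; the real callbacks appear in the db_test2.cc hunks at the end of this patch):

```cpp
#include <atomic>

#include "test_util/sync_point.h"

// Sketch: the compaction job passes the pause counter as void*, and the
// test callback casts it back to std::atomic<int>* to pause the manual
// compaction from inside its own run.
void InstallPauseCallback() {
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::Run():PausingManualCompaction:1", [](void* arg) {
        auto* paused = static_cast<std::atomic<int>*>(arg);
        paused->fetch_add(1, std::memory_order_release);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
}
```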
diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc
index c007dde6b..b8c3edb78 100644
--- a/db/db_compaction_test.cc
+++ b/db/db_compaction_test.cc
@@ -5442,7 +5442,102 @@ TEST_F(DBCompactionTest, UpdateUniversalSubCompactionTest) {
   ASSERT_TRUE(has_compaction);
 }
 
-#endif  // !defined(ROCKSDB_LITE)
+TEST_F(DBCompactionTest, ChangeLevelCompactRangeConflictsWithManual) {
+  // A `CompactRange()` with `change_level == true` needs to execute its final
+  // step, `ReFitLevel()`, in isolation. Previously there was a bug where
+  // refitting could target the same level as an ongoing manual compaction,
+  // leading to overlapping files in that level.
+  //
+  // This test ensures that case is not possible by verifying any manual
+  // compaction issued during the `ReFitLevel()` phase fails with
+  // `Status::Incomplete`.
+  Options options = CurrentOptions();
+  options.memtable_factory.reset(
+      new SpecialSkipListFactory(kNumKeysByGenerateNewFile - 1));
+  options.level0_file_num_compaction_trigger = 2;
+  options.num_levels = 3;
+  Reopen(options);
+
+  // Set up an LSM with all three levels populated.
+  Random rnd(301);
+  int key_idx = 0;
+  GenerateNewFile(&rnd, &key_idx);
+  {
+    CompactRangeOptions cro;
+    cro.change_level = true;
+    cro.target_level = 2;
+    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+  }
+  ASSERT_EQ("0,0,2", FilesPerLevel(0));
+
+  GenerateNewFile(&rnd, &key_idx);
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ASSERT_EQ("1,1,2", FilesPerLevel(0));
+
+  // The background thread will refit L2->L1 while the
+  // foreground thread will try to simultaneously compact L0->L1.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
+      // The first two dependencies ensure the foreground creates an L0 file
+      // between the background compaction's L0->L1 and its L1->L2.
+      {
+          "DBImpl::RunManualCompaction()::1",
+          "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
+          "PutFG",
+      },
+      {
+          "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
+          "FlushedFG",
+          "DBImpl::RunManualCompaction()::2",
+      },
+      // The next two dependencies ensure the foreground invokes
+      // `CompactRange()` while the background is refitting. The
+      // foreground's `CompactRange()` is guaranteed to attempt an L0->L1
+      // as we set it up with an empty memtable and a new L0 file.
+      {
+          "DBImpl::CompactRange:PreRefitLevel",
+          "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
+          "CompactFG",
+      },
+      {
+          "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
+          "CompactedFG",
+          "DBImpl::CompactRange:PostRefitLevel",
+      },
+  });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  ROCKSDB_NAMESPACE::port::Thread refit_level_thread([&] {
+    CompactRangeOptions cro;
+    cro.change_level = true;
+    cro.target_level = 1;
+    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+  });
+
+  TEST_SYNC_POINT(
+      "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:PutFG");
+  // Make sure we have something new to compact in the foreground.
+  // Note key 1 is carefully chosen as it ensures the file we create here
+  // overlaps with one of the files being refitted L2->L1 in the background.
+  // If we chose key 0, the file created here would not overlap.
+  ASSERT_OK(Put(Key(1), "val"));
+  ASSERT_OK(Flush());
+  TEST_SYNC_POINT(
+      "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:FlushedFG");
+
+  TEST_SYNC_POINT(
+      "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:CompactFG");
+  ASSERT_TRUE(dbfull()
+                  ->CompactRange(CompactRangeOptions(), nullptr, nullptr)
+                  .IsIncomplete());
+  TEST_SYNC_POINT(
+      "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
+      "CompactedFG");
+  refit_level_thread.join();
+}
+
+#endif  // !defined(ROCKSDB_LITE)
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
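The interleaving in this test hinges entirely on `SyncPoint::LoadDependency()`. A minimal illustration of its semantics (marker names hypothetical): each `{predecessor, successor}` pair makes a thread reaching the successor marker block until some thread has passed the predecessor marker.

```cpp
#include "test_util/sync_point.h"

// Illustration only: after this setup, any thread executing
// TEST_SYNC_POINT("Demo:ForegroundMayProceed") blocks until another thread
// has executed TEST_SYNC_POINT("Demo:BackgroundReached"). The test above
// chains four such pairs to pin the refit thread and the foreground
// compaction into the exact racy interleaving being guarded against.
void OrderTwoThreads() {
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"Demo:BackgroundReached", "Demo:ForegroundMayProceed"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
}
```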
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 1db89b3b0..13b22b543 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -1833,7 +1833,14 @@ class DBImpl : public DB {
   InstrumentedMutex log_write_mutex_;
 
   std::atomic<bool> shutting_down_;
-  std::atomic<bool> manual_compaction_paused_;
+
+  // If zero, manual compactions are allowed to proceed. If non-zero, manual
+  // compactions may still be running, but will quickly fail with
+  // `Status::Incomplete`. The value indicates how many threads have paused
+  // manual compactions. It is accessed in read mode outside the DB mutex in
+  // compaction code paths.
+  std::atomic<int> manual_compaction_paused_;
+
   // This condition variable is signaled on these conditions:
   // * whenever bg_compaction_scheduled_ goes down to 0
   // * if AnyManualCompaction, whenever a compaction finishes, even if it hasn't
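The comment above states the counter's contract; the `DisableManualCompaction()` hunk below implements it against the DB mutex and `bg_cv_`. A self-contained sketch of that protocol, using standard-library primitives in place of RocksDB's instrumented ones (hypothetical `ManualCompactionGate` class):

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>

// Sketch only. Disable() bumps the pause counter, then blocks until every
// in-flight manual compaction has drained (typically by failing once it
// observes paused_ > 0), so none can commit after Disable() returns.
class ManualCompactionGate {
 public:
  void Disable() {
    std::unique_lock<std::mutex> l(mu_);
    paused_.fetch_add(1, std::memory_order_release);
    cv_.wait(l, [this] { return pending_manual_ == 0; });
  }
  void Enable() {
    std::lock_guard<std::mutex> l(mu_);
    paused_.fetch_sub(1, std::memory_order_release);
  }
  // A manual compaction registers before running and deregisters when it
  // finishes; deregistration wakes any waiting Disable() call.
  void RegisterManual() {
    std::lock_guard<std::mutex> l(mu_);
    ++pending_manual_;
  }
  void DeregisterManual() {
    std::lock_guard<std::mutex> l(mu_);
    --pending_manual_;
    cv_.notify_all();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::atomic<int> paused_{0};
  int pending_manual_ = 0;  // manual compactions currently in flight
};
```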
diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index b427375f7..6355790ae 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -848,11 +848,15 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
   if (options.change_level) {
     ROCKS_LOG_INFO(immutable_db_options_.info_log,
                    "[RefitLevel] waiting for background threads to stop");
+    DisableManualCompaction();
     s = PauseBackgroundWork();
     if (s.ok()) {
+      TEST_SYNC_POINT("DBImpl::CompactRange:PreRefitLevel");
       s = ReFitLevel(cfd, final_output_level, options.target_level);
+      TEST_SYNC_POINT("DBImpl::CompactRange:PostRefitLevel");
+      ContinueBackgroundWork();
     }
-    ContinueBackgroundWork();
+    EnableManualCompaction();
   }
   LogFlush(immutable_db_options_.info_log);
 
@@ -959,7 +963,7 @@ Status DBImpl::CompactFilesImpl(
   if (shutting_down_.load(std::memory_order_acquire)) {
     return Status::ShutdownInProgress();
   }
-  if (manual_compaction_paused_.load(std::memory_order_acquire)) {
+  if (manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
     return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
   }
 
@@ -1180,7 +1184,7 @@ void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
     return;
   }
   if (c->is_manual_compaction() &&
-      manual_compaction_paused_.load(std::memory_order_acquire)) {
+      manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
     return;
   }
   Version* current = cfd->current();
@@ -1254,7 +1258,7 @@ void DBImpl::NotifyOnCompactionCompleted(
     return;
   }
   if (c->is_manual_compaction() &&
-      manual_compaction_paused_.load(std::memory_order_acquire)) {
+      manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
    return;
   }
   Version* current = cfd->current();
@@ -1965,11 +1969,21 @@ Status DBImpl::EnableAutoCompaction(
 }
 
 void DBImpl::DisableManualCompaction() {
-  manual_compaction_paused_.store(true, std::memory_order_release);
+  InstrumentedMutexLock l(&mutex_);
+  manual_compaction_paused_.fetch_add(1, std::memory_order_release);
+  // Wait for any pending manual compactions to finish (typically through
+  // failing with `Status::Incomplete`) prior to returning. This way we are
+  // guaranteed no pending manual compaction will commit while manual
+  // compactions are "disabled".
+  while (HasPendingManualCompaction()) {
+    bg_cv_.Wait();
+  }
 }
 
 void DBImpl::EnableManualCompaction() {
-  manual_compaction_paused_.store(false, std::memory_order_release);
+  InstrumentedMutexLock l(&mutex_);
+  assert(manual_compaction_paused_ > 0);
+  manual_compaction_paused_.fetch_sub(1, std::memory_order_release);
 }
 
 void DBImpl::MaybeScheduleFlushOrCompaction() {
@@ -2528,7 +2542,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
     if (shutting_down_.load(std::memory_order_acquire)) {
       status = Status::ShutdownInProgress();
     } else if (is_manual &&
-               manual_compaction_paused_.load(std::memory_order_acquire)) {
+               manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
       status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
     }
   } else {
diff --git a/db/db_test2.cc b/db/db_test2.cc
index c9d33fb93..de5c29e92 100644
--- a/db/db_test2.cc
+++ b/db/db_test2.cc
@@ -2760,9 +2760,9 @@ TEST_F(DBTest2, PausingManualCompaction1) {
   int manual_compactions_paused = 0;
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
       "CompactionJob::Run():PausingManualCompaction:1", [&](void* arg) {
-        auto paused = reinterpret_cast<std::atomic<bool>*>(arg);
-        ASSERT_FALSE(paused->load(std::memory_order_acquire));
-        paused->store(true, std::memory_order_release);
+        auto paused = static_cast<std::atomic<int>*>(arg);
+        ASSERT_EQ(0, paused->load(std::memory_order_acquire));
+        paused->fetch_add(1, std::memory_order_release);
         manual_compactions_paused += 1;
       });
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
@@ -2921,14 +2921,13 @@ TEST_F(DBTest2, PausingManualCompaction4) {
   int run_manual_compactions = 0;
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
       "CompactionJob::Run():PausingManualCompaction:2", [&](void* arg) {
-        auto paused = reinterpret_cast<std::atomic<bool>*>(arg);
-        ASSERT_FALSE(paused->load(std::memory_order_acquire));
-        paused->store(true, std::memory_order_release);
+        auto paused = static_cast<std::atomic<int>*>(arg);
+        ASSERT_EQ(0, paused->load(std::memory_order_acquire));
+        paused->fetch_add(1, std::memory_order_release);
         run_manual_compactions++;
       });
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
 
-  dbfull()->EnableManualCompaction();
   dbfull()->CompactRange(compact_options, nullptr, nullptr);
   dbfull()->TEST_WaitForCompact(true);
   ASSERT_EQ(run_manual_compactions, 1);
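Taken together, the caller-visible contract these hunks establish can be sketched as follows (assumes the public `DB::DisableManualCompaction()` / `EnableManualCompaction()` API; how quickly a concurrent compaction observes the pause is best-effort, as noted above):

```cpp
#include <cassert>

#include "rocksdb/db.h"

// Sketch of the caller-visible behavior: while manual compactions are
// disabled, a CompactRange() returns Status::Incomplete (sub-code
// kManualCompactionPaused) rather than committing results, which is what
// the new db_compaction_test.cc test asserts during the ReFitLevel() window.
void DemoDisabledManualCompaction(rocksdb::DB* db) {
  db->DisableManualCompaction();
  rocksdb::Status s =
      db->CompactRange(rocksdb::CompactRangeOptions(), nullptr, nullptr);
  assert(s.IsIncomplete());
  db->EnableManualCompaction();
}
```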