Disable manual compaction during `ReFitLevel()` (#7250)

Summary:
Manual compaction with `CompactRangeOptions::change_level` set could
refit to a level targeted by another manual compaction. If
`force_consistency_checks` was disabled, it was possible for
overlapping files to be written at that target level.

This PR prevents the possibility by calling `DisableManualCompaction()`
prior to `ReFitLevel()`. It also improves the manual compaction disabling
mechanism to wait for pending manual compactions to complete before
returning, and to support disabling from multiple threads.
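
In simplified form, the conflict is two `CompactRange()` calls racing, one of which refits levels. The sketch below is a minimal, hypothetical user-level program (the DB path and setup are made up for illustration, and whether the race actually triggers depends on timing); with this change, the ordinary `CompactRange()` that overlaps the refit fails with `Status::Incomplete()` instead of potentially producing overlapping files:

```
#include <cassert>
#include <thread>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/refit_conflict_demo", &db);
  assert(s.ok());

  // Thread 1: manual compaction that refits files to a different level.
  std::thread refit([&] {
    rocksdb::CompactRangeOptions cro;
    cro.change_level = true;
    cro.target_level = 1;
    rocksdb::Status refit_status = db->CompactRange(cro, nullptr, nullptr);
    (void)refit_status;
  });

  // Thread 2: an ordinary manual compaction racing with the refit. While the
  // refit has manual compactions disabled, this call returns
  // Status::Incomplete(kManualCompactionPaused) rather than writing
  // overlapping files into the target level.
  rocksdb::Status s2 =
      db->CompactRange(rocksdb::CompactRangeOptions(), nullptr, nullptr);
  if (s2.IsIncomplete()) {
    // Expected outcome when this compaction raced with ReFitLevel().
  }

  refit.join();
  delete db;
  return 0;
}
```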

Fixes https://github.com/facebook/rocksdb/issues/6432.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7250

Test Plan:
Crash test command that reproduced the bug reliably:

```
$ TEST_TMPDIR=/dev/shm python tools/db_crashtest.py blackbox --simple -target_file_size_base=524288 -write_buffer_size=1048576 -clear_column_family_one_in=0 -reopen=0 -max_key=10000000 -column_families=1 -max_background_compactions=8 -compact_range_one_in=100000 -compression_type=none -compaction_style=1 -num_levels=5 -universal_min_merge_width=4 -universal_max_merge_width=8 -level0_file_num_compaction_trigger=12 -rate_limiter_bytes_per_sec=1048576000 -universal_max_size_amplification_percent=100 --duration=3600 --interval=60 --use_direct_io_for_flush_and_compaction=0 --use_direct_reads=0 --enable_compaction_filter=0
```

Reviewed By: ltamasi

Differential Revision: D23090800

Pulled By: ajkr

fbshipit-source-id: afcbcd51b42ce76789fdb907d8b9ada790709c13
Author: Andrew Kryczka (committed by Facebook GitHub Bot)
Branch: main
Commit: a1aa3f8385 (parent: e503f5e0a0)
Files changed (changed-line counts in parentheses):
  HISTORY.md (2)
  db/compaction/compaction_iterator.cc (4)
  db/compaction/compaction_iterator.h (56)
  db/compaction/compaction_job.cc (8)
  db/compaction/compaction_job.h (4)
  db/db_compaction_test.cc (97)
  db/db_impl/db_impl.h (9)
  db/db_impl/db_impl_compaction_flush.cc (28)
  db/db_test2.cc (13)

HISTORY.md
@@ -2,12 +2,12 @@
## Unreleased
### Bug fixes
* Fix a performance regression introduced in 6.4 that makes an upper bound check for every Next() even if keys are within a data block that is within the upper bound.
* Fix a possible corruption to the LSM state (overlapping files within a level) when a `CompactRange()` for refitting levels (`CompactRangeOptions::change_level == true`) and another manual compaction are executed in parallel.
### New Features
* A new option `std::shared_ptr<FileChecksumGenFactory> file_checksum_gen_factory` is added to `BackupableDBOptions`. The default value for this option is `nullptr`. If this option is null, the default backup engine checksum function (crc32c) will be used for creating, verifying, or restoring backups. If it is not null and is set to the DB custom checksum factory, the custom checksum function used in DB will also be used for creating, verifying, or restoring backups, in addition to the default checksum function (crc32c). If it is not null and is set to a custom checksum factory different than the DB custom checksum factory (which may be null), BackupEngine will return `Status::InvalidArgument()`.
* A new field `std::string requested_checksum_func_name` is added to `FileChecksumGenContext`, which enables the checksum factory to create generators for a suite of different functions.
## 6.12 (2020-07-28)
### Public API Change
* Encryption file classes now exposed for inheritance in env_encryption.h

db/compaction/compaction_iterator.cc
@@ -40,7 +40,7 @@ CompactionIterator::CompactionIterator(
const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum,
const std::atomic<bool>* manual_compaction_paused,
const std::atomic<int>* manual_compaction_paused,
const std::shared_ptr<Logger> info_log)
: CompactionIterator(
input, cmp, merge_helper, last_sequence, snapshots,
@@ -62,7 +62,7 @@ CompactionIterator::CompactionIterator(
const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum,
const std::atomic<bool>* manual_compaction_paused,
const std::atomic<int>* manual_compaction_paused,
const std::shared_ptr<Logger> info_log)
: input_(input),
cmp_(cmp),

db/compaction/compaction_iterator.h
@@ -59,34 +59,34 @@ class CompactionIterator {
const Compaction* compaction_;
};
CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
const Compaction* compaction = nullptr,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0,
const std::atomic<bool>* manual_compaction_paused = nullptr,
const std::shared_ptr<Logger> info_log = nullptr);
CompactionIterator(InternalIterator* input, const Comparator* cmp,
MergeHelper* merge_helper, SequenceNumber last_sequence,
std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
const Compaction* compaction = nullptr,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0,
const std::atomic<int>* manual_compaction_paused = nullptr,
const std::shared_ptr<Logger> info_log = nullptr);
// Constructor with custom CompactionProxy, used for tests.
CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0,
const std::atomic<bool>* manual_compaction_paused = nullptr,
const std::shared_ptr<Logger> info_log = nullptr);
CompactionIterator(InternalIterator* input, const Comparator* cmp,
MergeHelper* merge_helper, SequenceNumber last_sequence,
std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0,
const std::atomic<int>* manual_compaction_paused = nullptr,
const std::shared_ptr<Logger> info_log = nullptr);
~CompactionIterator();
@@ -166,7 +166,7 @@ class CompactionIterator {
std::unique_ptr<CompactionProxy> compaction_;
const CompactionFilter* compaction_filter_;
const std::atomic<bool>* shutting_down_;
const std::atomic<bool>* manual_compaction_paused_;
const std::atomic<int>* manual_compaction_paused_;
const SequenceNumber preserve_deletes_seqnum_;
bool bottommost_level_;
bool valid_ = false;
@@ -235,7 +235,7 @@ class CompactionIterator {
bool IsPausingManualCompaction() {
// This is a best-effort facility, so memory_order_relaxed is sufficient.
return manual_compaction_paused_ &&
manual_compaction_paused_->load(std::memory_order_relaxed);
manual_compaction_paused_->load(std::memory_order_relaxed) > 0;
}
};
} // namespace ROCKSDB_NAMESPACE

db/compaction/compaction_job.cc
@@ -329,7 +329,7 @@ CompactionJob::CompactionJob(
EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
const std::string& dbname, CompactionJobStats* compaction_job_stats,
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<bool>* manual_compaction_paused, const std::string& db_id,
const std::atomic<int>* manual_compaction_paused, const std::string& db_id,
const std::string& db_session_id)
: job_id_(job_id),
compact_(new CompactionState(compaction)),
@@ -929,7 +929,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
TEST_SYNC_POINT_CALLBACK(
"CompactionJob::Run():PausingManualCompaction:1",
reinterpret_cast<void*>(
const_cast<std::atomic<bool>*>(manual_compaction_paused_)));
const_cast<std::atomic<int>*>(manual_compaction_paused_)));
Slice* start = sub_compact->start;
Slice* end = sub_compact->end;
@@ -1023,7 +1023,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
TEST_SYNC_POINT_CALLBACK(
"CompactionJob::Run():PausingManualCompaction:2",
reinterpret_cast<void*>(
const_cast<std::atomic<bool>*>(manual_compaction_paused_)));
const_cast<std::atomic<int>*>(manual_compaction_paused_)));
if (partitioner.get()) {
last_key_for_partitioner.assign(c_iter->user_key().data_,
c_iter->user_key().size_);
@@ -1090,7 +1090,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
}
if ((status.ok() || status.IsColumnFamilyDropped()) &&
(manual_compaction_paused_ &&
manual_compaction_paused_->load(std::memory_order_relaxed))) {
manual_compaction_paused_->load(std::memory_order_relaxed) > 0)) {
status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
if (status.ok()) {

db/compaction/compaction_job.h
@@ -77,7 +77,7 @@ class CompactionJob {
bool paranoid_file_checks, bool measure_io_stats,
const std::string& dbname, CompactionJobStats* compaction_job_stats,
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<bool>* manual_compaction_paused = nullptr,
const std::atomic<int>* manual_compaction_paused = nullptr,
const std::string& db_id = "", const std::string& db_session_id = "");
~CompactionJob();
@@ -163,7 +163,7 @@ class CompactionJob {
FileOptions file_options_for_read_;
VersionSet* versions_;
const std::atomic<bool>* shutting_down_;
const std::atomic<bool>* manual_compaction_paused_;
const std::atomic<int>* manual_compaction_paused_;
const SequenceNumber preserve_deletes_seqnum_;
LogBuffer* log_buffer_;
FSDirectory* db_directory_;

db/db_compaction_test.cc
@@ -5442,7 +5442,102 @@ TEST_F(DBCompactionTest, UpdateUniversalSubCompactionTest) {
ASSERT_TRUE(has_compaction);
}
#endif // !defined(ROCKSDB_LITE)
TEST_F(DBCompactionTest, ChangeLevelCompactRangeConflictsWithManual) {
// A `CompactRange()` with `change_level == true` needs to execute its final
// step, `ReFitLevel()`, in isolation. Previously there was a bug where
// refitting could target the same level as an ongoing manual compaction,
// leading to overlapping files in that level.
//
// This test ensures that case is not possible by verifying any manual
// compaction issued during the `ReFitLevel()` phase fails with
// `Status::Incomplete`.
Options options = CurrentOptions();
options.memtable_factory.reset(
new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
options.level0_file_num_compaction_trigger = 2;
options.num_levels = 3;
Reopen(options);
// Setup an LSM with three levels populated.
Random rnd(301);
int key_idx = 0;
GenerateNewFile(&rnd, &key_idx);
{
CompactRangeOptions cro;
cro.change_level = true;
cro.target_level = 2;
ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
}
ASSERT_EQ("0,0,2", FilesPerLevel(0));
GenerateNewFile(&rnd, &key_idx);
GenerateNewFile(&rnd, &key_idx);
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("1,1,2", FilesPerLevel(0));
// The background thread will refit L2->L1 while the
// foreground thread will try to simultaneously compact L0->L1.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
// The first two dependencies ensure the foreground creates an L0 file
// between the background compaction's L0->L1 and its L1->L2.
{
"DBImpl::RunManualCompaction()::1",
"DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
"PutFG",
},
{
"DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
"FlushedFG",
"DBImpl::RunManualCompaction()::2",
},
// The next two dependencies ensure the foreground invokes
// `CompactRange()` while the background is refitting. The
// foreground's `CompactRange()` is guaranteed to attempt an L0->L1
// as we set it up with an empty memtable and a new L0 file.
{
"DBImpl::CompactRange:PreRefitLevel",
"DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
"CompactFG",
},
{
"DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
"CompactedFG",
"DBImpl::CompactRange:PostRefitLevel",
},
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ROCKSDB_NAMESPACE::port::Thread refit_level_thread([&] {
CompactRangeOptions cro;
cro.change_level = true;
cro.target_level = 1;
ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
});
TEST_SYNC_POINT(
"DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:PutFG");
// Make sure we have something new to compact in the foreground.
// Note key 1 is carefully chosen as it ensures the file we create here
// overlaps with one of the files being refitted L2->L1 in the background.
// If we chose key 0, the file created here would not overlap.
ASSERT_OK(Put(Key(1), "val"));
ASSERT_OK(Flush());
TEST_SYNC_POINT(
"DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:FlushedFG");
TEST_SYNC_POINT(
"DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:CompactFG");
ASSERT_TRUE(dbfull()
->CompactRange(CompactRangeOptions(), nullptr, nullptr)
.IsIncomplete());
TEST_SYNC_POINT(
"DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
"CompactedFG");
refit_level_thread.join();
}
#endif // !defined(ROCKSDB_LITE)
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

db/db_impl/db_impl.h
@@ -1833,7 +1833,14 @@ class DBImpl : public DB {
InstrumentedMutex log_write_mutex_;
std::atomic<bool> shutting_down_;
std::atomic<bool> manual_compaction_paused_;
// If zero, manual compactions are allowed to proceed. If non-zero, manual
// compactions may still be running, but will quickly fail with
// `Status::Incomplete`. The value indicates how many threads have paused
// manual compactions. It is accessed in read mode outside the DB mutex in
// compaction code paths.
std::atomic<int> manual_compaction_paused_;
// This condition variable is signaled on these conditions:
// * whenever bg_compaction_scheduled_ goes down to 0
// * if AnyManualCompaction, whenever a compaction finishes, even if it hasn't
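
As a standalone illustration of the counter semantics described in the comment above (this is not RocksDB code, just a sketch of the idea), a ref-counted pause flag lets multiple threads disable and re-enable manual compactions independently, while the "paused" check remains a comparison against zero:

```
#include <atomic>
#include <cassert>

int main() {
  // Stand-in for DBImpl::manual_compaction_paused_.
  std::atomic<int> manual_compaction_paused{0};

  auto disable = [&] {
    manual_compaction_paused.fetch_add(1, std::memory_order_release);
  };
  auto enable = [&] {
    manual_compaction_paused.fetch_sub(1, std::memory_order_release);
  };
  auto paused = [&] {
    // Same shape as the checks in the compaction code paths: any positive
    // value means manual compactions are disabled.
    return manual_compaction_paused.load(std::memory_order_acquire) > 0;
  };

  disable();  // e.g. thread A begins a ReFitLevel()
  disable();  // e.g. thread B begins another ReFitLevel()
  enable();   // thread A finishes; B still holds the pause
  assert(paused());
  enable();   // thread B finishes
  assert(!paused());
  return 0;
}
```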

db/db_impl/db_impl_compaction_flush.cc
@@ -848,11 +848,15 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
if (options.change_level) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[RefitLevel] waiting for background threads to stop");
DisableManualCompaction();
s = PauseBackgroundWork();
if (s.ok()) {
TEST_SYNC_POINT("DBImpl::CompactRange:PreRefitLevel");
s = ReFitLevel(cfd, final_output_level, options.target_level);
TEST_SYNC_POINT("DBImpl::CompactRange:PostRefitLevel");
ContinueBackgroundWork();
}
ContinueBackgroundWork();
EnableManualCompaction();
}
LogFlush(immutable_db_options_.info_log);
@@ -959,7 +963,7 @@ Status DBImpl::CompactFilesImpl(
if (shutting_down_.load(std::memory_order_acquire)) {
return Status::ShutdownInProgress();
}
if (manual_compaction_paused_.load(std::memory_order_acquire)) {
if (manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
@@ -1180,7 +1184,7 @@ void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
return;
}
if (c->is_manual_compaction() &&
manual_compaction_paused_.load(std::memory_order_acquire)) {
manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
return;
}
Version* current = cfd->current();
@@ -1254,7 +1258,7 @@ void DBImpl::NotifyOnCompactionCompleted(
return;
}
if (c->is_manual_compaction() &&
manual_compaction_paused_.load(std::memory_order_acquire)) {
manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
return;
}
Version* current = cfd->current();
@@ -1965,11 +1969,21 @@ Status DBImpl::EnableAutoCompaction(
}
void DBImpl::DisableManualCompaction() {
manual_compaction_paused_.store(true, std::memory_order_release);
InstrumentedMutexLock l(&mutex_);
manual_compaction_paused_.fetch_add(1, std::memory_order_release);
// Wait for any pending manual compactions to finish (typically through
// failing with `Status::Incomplete`) prior to returning. This way we are
// guaranteed no pending manual compaction will commit while manual
// compactions are "disabled".
while (HasPendingManualCompaction()) {
bg_cv_.Wait();
}
}
void DBImpl::EnableManualCompaction() {
manual_compaction_paused_.store(false, std::memory_order_release);
InstrumentedMutexLock l(&mutex_);
assert(manual_compaction_paused_ > 0);
manual_compaction_paused_.fetch_sub(1, std::memory_order_release);
}
void DBImpl::MaybeScheduleFlushOrCompaction() {
@@ -2528,7 +2542,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
if (shutting_down_.load(std::memory_order_acquire)) {
status = Status::ShutdownInProgress();
} else if (is_manual &&
manual_compaction_paused_.load(std::memory_order_acquire)) {
manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
} else {
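
The waiting behavior that `DisableManualCompaction()` gains above can be sketched independently of RocksDB: bump the pause counter, then block on a condition variable until no manual compaction is pending. The names below are illustrative stand-ins for `mutex_`, `bg_cv_`, and `HasPendingManualCompaction()`, not the real members:

```
#include <atomic>
#include <condition_variable>
#include <mutex>

struct PauseState {
  std::mutex mu;                       // stand-in for the DB mutex
  std::condition_variable bg_cv;       // signaled when a compaction finishes
  std::atomic<int> paused{0};          // stand-in for manual_compaction_paused_
  int pending_manual_compactions = 0;  // stand-in for HasPendingManualCompaction()
};

void DisableManualCompactionSketch(PauseState& s) {
  std::unique_lock<std::mutex> lock(s.mu);
  s.paused.fetch_add(1, std::memory_order_release);
  // Running manual compactions observe paused > 0, bail out with
  // Status::Incomplete, decrement the pending count, and signal bg_cv.
  // Returning only once none are pending guarantees no manual compaction
  // can commit while manual compactions are "disabled".
  s.bg_cv.wait(lock, [&] { return s.pending_manual_compactions == 0; });
}

void EnableManualCompactionSketch(PauseState& s) {
  std::lock_guard<std::mutex> lock(s.mu);
  s.paused.fetch_sub(1, std::memory_order_release);
}

int main() {
  PauseState state;
  DisableManualCompactionSketch(state);  // returns immediately: nothing pending
  EnableManualCompactionSketch(state);
  return 0;
}
```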

db/db_test2.cc
@@ -2760,9 +2760,9 @@ TEST_F(DBTest2, PausingManualCompaction1) {
int manual_compactions_paused = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run():PausingManualCompaction:1", [&](void* arg) {
auto paused = reinterpret_cast<std::atomic<bool>*>(arg);
ASSERT_FALSE(paused->load(std::memory_order_acquire));
paused->store(true, std::memory_order_release);
auto paused = static_cast<std::atomic<int>*>(arg);
ASSERT_EQ(0, paused->load(std::memory_order_acquire));
paused->fetch_add(1, std::memory_order_release);
manual_compactions_paused += 1;
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
@@ -2921,14 +2921,13 @@ TEST_F(DBTest2, PausingManualCompaction4) {
int run_manual_compactions = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run():PausingManualCompaction:2", [&](void* arg) {
auto paused = reinterpret_cast<std::atomic<bool>*>(arg);
ASSERT_FALSE(paused->load(std::memory_order_acquire));
paused->store(true, std::memory_order_release);
auto paused = static_cast<std::atomic<int>*>(arg);
ASSERT_EQ(0, paused->load(std::memory_order_acquire));
paused->fetch_add(1, std::memory_order_release);
run_manual_compactions++;
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
dbfull()->EnableManualCompaction();
dbfull()->CompactRange(compact_options, nullptr, nullptr);
dbfull()->TEST_WaitForCompact(true);
ASSERT_EQ(run_manual_compactions, 1);
