Consolidate manual_compaction_paused_ check (#10070)

Summary:
As pointed out by [https://github.com/facebook/rocksdb/pull/8351#discussion_r645765422](https://github.com/facebook/rocksdb/pull/8351#discussion_r645765422), check `manual_compaction_paused` and `manual_compaction_canceled` can be reduced by setting `*canceled` to be true in `DisableManualCompaction()` and `*canceled` to be false in the last time calling `EnableManualCompaction()`.

Changed Tests: The origin `DBTest2.PausingManualCompaction1` uses a callback function to increase `manual_compaction_paused` and the origin CompactionJob/CompactionIterator with `manual_compaction_paused` can detect this. I changed the callback function so that it sets `*canceled` as true if `canceled` is not `nullptr` (to notify CompactionJob/CompactionIterator the compaction has been canceled).

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10070

Test Plan: This change does not introduce new features, but some slight difference in compaction implementation. Run the same manual compaction unit tests as before (e.g., PausingManualCompaction[1-4], CancelManualCompaction[1-2], CancelManualCompactionWithListener in db_test2, and db_compaction_test).

Reviewed By: ajkr

Differential Revision: D36949133

Pulled By: littlepig2013

fbshipit-source-id: c5dc4c956fbf8f624003a0f5ad2690240063a821
main
zczhu 2 years ago committed by Facebook GitHub Bot
parent a101c9de60
commit 3ee6c9baec
  1. 7
      db/builder.cc
  2. 15
      db/compaction/compaction_iterator.cc
  3. 73
      db/compaction/compaction_iterator.h
  4. 8
      db/compaction/compaction_iterator_test.cc
  5. 27
      db/compaction/compaction_job.cc
  6. 8
      db/compaction/compaction_job.h
  7. 7
      db/compaction/compaction_job_test.cc
  8. 16
      db/db_impl/db_impl.h
  9. 34
      db/db_impl/db_impl_compaction_flush.cc
  10. 4
      db/db_impl/db_impl_secondary.cc
  11. 43
      db/db_test2.cc
  12. 6
      db/flush_job.cc
  13. 7
      include/rocksdb/options.h

@ -193,6 +193,7 @@ Status BuildTable(
&blob_file_paths, blob_file_additions)
: nullptr);
const std::atomic<bool> kManualCompactionCanceledFalse{false};
CompactionIterator c_iter(
iter, tboptions.internal_comparator.user_comparator(), &merge,
kMaxSequenceNumber, &snapshots, earliest_write_conflict_snapshot,
@ -201,11 +202,9 @@ Status BuildTable(
true /* internal key corruption is not ok */, range_del_agg.get(),
blob_file_builder.get(), ioptions.allow_data_in_errors,
ioptions.enforce_single_del_contracts,
/*manual_compaction_canceled=*/kManualCompactionCanceledFalse,
/*compaction=*/nullptr, compaction_filter.get(),
/*shutting_down=*/nullptr,
/*manual_compaction_paused=*/nullptr,
/*manual_compaction_canceled=*/nullptr, db_options.info_log,
full_history_ts_low);
/*shutting_down=*/nullptr, db_options.info_log, full_history_ts_low);
c_iter.SeekToFirst();
for (; c_iter.Valid(); c_iter.Next()) {

@ -28,11 +28,10 @@ CompactionIterator::CompactionIterator(
Env* env, bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
bool enforce_single_del_contracts, const Compaction* compaction,
const CompactionFilter* compaction_filter,
bool enforce_single_del_contracts,
const std::atomic<bool>& manual_compaction_canceled,
const Compaction* compaction, const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
const std::atomic<int>* manual_compaction_paused,
const std::atomic<bool>* manual_compaction_canceled,
const std::shared_ptr<Logger> info_log,
const std::string* full_history_ts_low)
: CompactionIterator(
@ -40,10 +39,10 @@ CompactionIterator::CompactionIterator(
earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env,
report_detailed_time, expect_valid_internal_key, range_del_agg,
blob_file_builder, allow_data_in_errors, enforce_single_del_contracts,
manual_compaction_canceled,
std::unique_ptr<CompactionProxy>(
compaction ? new RealCompaction(compaction) : nullptr),
compaction_filter, shutting_down, manual_compaction_paused,
manual_compaction_canceled, info_log, full_history_ts_low) {}
compaction_filter, shutting_down, info_log, full_history_ts_low) {}
CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
@ -54,11 +53,10 @@ CompactionIterator::CompactionIterator(
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
bool enforce_single_del_contracts,
const std::atomic<bool>& manual_compaction_canceled,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
const std::atomic<int>* manual_compaction_paused,
const std::atomic<bool>* manual_compaction_canceled,
const std::shared_ptr<Logger> info_log,
const std::string* full_history_ts_low)
: input_(input, cmp,
@ -78,7 +76,6 @@ CompactionIterator::CompactionIterator(
compaction_(std::move(compaction)),
compaction_filter_(compaction_filter),
shutting_down_(shutting_down),
manual_compaction_paused_(manual_compaction_paused),
manual_compaction_canceled_(manual_compaction_canceled),
info_log_(info_log),
allow_data_in_errors_(allow_data_in_errors),

@ -167,39 +167,42 @@ class CompactionIterator {
const Compaction* compaction_;
};
CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
Env* env, bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
bool enforce_single_del_contracts, const Compaction* compaction = nullptr,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const std::atomic<int>* manual_compaction_paused = nullptr,
const std::atomic<bool>* manual_compaction_canceled = nullptr,
const std::shared_ptr<Logger> info_log = nullptr,
const std::string* full_history_ts_low = nullptr);
CompactionIterator(InternalIterator* input, const Comparator* cmp,
MergeHelper* merge_helper, SequenceNumber last_sequence,
std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SequenceNumber job_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder,
bool allow_data_in_errors,
bool enforce_single_del_contracts,
const std::atomic<bool>& manual_compaction_canceled,
const Compaction* compaction = nullptr,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const std::shared_ptr<Logger> info_log = nullptr,
const std::string* full_history_ts_low = nullptr);
// Constructor with custom CompactionProxy, used for tests.
CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
Env* env, bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
bool enforce_single_del_contracts,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const std::atomic<int>* manual_compaction_paused = nullptr,
const std::atomic<bool>* manual_compaction_canceled = nullptr,
const std::shared_ptr<Logger> info_log = nullptr,
const std::string* full_history_ts_low = nullptr);
CompactionIterator(InternalIterator* input, const Comparator* cmp,
MergeHelper* merge_helper, SequenceNumber last_sequence,
std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SequenceNumber job_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder,
bool allow_data_in_errors,
bool enforce_single_del_contracts,
const std::atomic<bool>& manual_compaction_canceled,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const std::shared_ptr<Logger> info_log = nullptr,
const std::string* full_history_ts_low = nullptr);
~CompactionIterator();
@ -320,8 +323,7 @@ class CompactionIterator {
std::unique_ptr<CompactionProxy> compaction_;
const CompactionFilter* compaction_filter_;
const std::atomic<bool>* shutting_down_;
const std::atomic<int>* manual_compaction_paused_;
const std::atomic<bool>* manual_compaction_canceled_;
const std::atomic<bool>& manual_compaction_canceled_;
bool bottommost_level_;
bool valid_ = false;
bool visible_at_tip_;
@ -426,10 +428,7 @@ class CompactionIterator {
bool IsPausingManualCompaction() {
// This is a best-effort facility, so memory_order_relaxed is sufficient.
return (manual_compaction_paused_ &&
manual_compaction_paused_->load(std::memory_order_relaxed) > 0) ||
(manual_compaction_canceled_ &&
manual_compaction_canceled_->load(std::memory_order_relaxed));
return manual_compaction_canceled_.load(std::memory_order_relaxed);
}
};

@ -279,10 +279,9 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
snapshot_checker_.get(), Env::Default(),
false /* report_detailed_time */, false, range_del_agg_.get(),
nullptr /* blob_file_builder */, true /*allow_data_in_errors*/,
true /*enforce_single_del_contracts*/, std::move(compaction), filter,
&shutting_down_,
/*manual_compaction_paused=*/nullptr,
/*manual_compaction_canceled=*/nullptr, /*info_log=*/nullptr,
true /*enforce_single_del_contracts*/,
/*manual_compaction_canceled=*/kManualCompactionCanceledFalse_,
std::move(compaction), filter, &shutting_down_, /*info_log=*/nullptr,
full_history_ts_low));
}
@ -341,6 +340,7 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
std::unique_ptr<CompactionRangeDelAggregator> range_del_agg_;
std::unique_ptr<SnapshotChecker> snapshot_checker_;
std::atomic<bool> shutting_down_{false};
const std::atomic<bool> kManualCompactionCanceledFalse_{false};
FakeCompaction* compaction_proxy_;
};

@ -429,8 +429,7 @@ CompactionJob::CompactionJob(
bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname,
CompactionJobStats* compaction_job_stats, Env::Priority thread_pri,
const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<int>* manual_compaction_paused,
const std::atomic<bool>* manual_compaction_canceled,
const std::atomic<bool>& manual_compaction_canceled,
const std::string& db_id, const std::string& db_session_id,
std::string full_history_ts_low, std::string trim_ts,
BlobFileCompletionCallback* blob_callback)
@ -456,7 +455,6 @@ CompactionJob::CompactionJob(
fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
versions_(versions),
shutting_down_(shutting_down),
manual_compaction_paused_(manual_compaction_paused),
manual_compaction_canceled_(manual_compaction_canceled),
db_directory_(db_directory),
blob_output_directory_(blob_output_directory),
@ -1256,8 +1254,8 @@ void CompactionJob::NotifyOnSubcompactionBegin(
if (shutting_down_->load(std::memory_order_acquire)) {
return;
}
if (c->is_manual_compaction() && manual_compaction_paused_ &&
manual_compaction_paused_->load(std::memory_order_acquire) > 0) {
if (c->is_manual_compaction() &&
manual_compaction_canceled_.load(std::memory_order_acquire)) {
return;
}
@ -1470,7 +1468,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
TEST_SYNC_POINT_CALLBACK(
"CompactionJob::Run():PausingManualCompaction:1",
reinterpret_cast<void*>(
const_cast<std::atomic<int>*>(manual_compaction_paused_)));
const_cast<std::atomic<bool>*>(&manual_compaction_canceled_)));
Status status;
const std::string* const full_history_ts_low =
@ -1484,9 +1482,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_),
/*expect_valid_internal_key=*/true, &range_del_agg,
blob_file_builder.get(), db_options_.allow_data_in_errors,
db_options_.enforce_single_del_contracts, sub_compact->compaction,
compaction_filter, shutting_down_, manual_compaction_paused_,
manual_compaction_canceled_, db_options_.info_log, full_history_ts_low));
db_options_.enforce_single_del_contracts, manual_compaction_canceled_,
sub_compact->compaction, compaction_filter, shutting_down_,
db_options_.info_log, full_history_ts_low));
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();
if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
@ -1568,7 +1566,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
TEST_SYNC_POINT_CALLBACK(
"CompactionJob::Run():PausingManualCompaction:2",
reinterpret_cast<void*>(
const_cast<std::atomic<int>*>(manual_compaction_paused_)));
const_cast<std::atomic<bool>*>(&manual_compaction_canceled_)));
if (partitioner.get()) {
last_key_for_partitioner.assign(c_iter->user_key().data_,
c_iter->user_key().size_);
@ -1647,10 +1645,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
status = Status::ShutdownInProgress("Database shutdown");
}
if ((status.ok() || status.IsColumnFamilyDropped()) &&
((manual_compaction_paused_ &&
manual_compaction_paused_->load(std::memory_order_relaxed) > 0) ||
(manual_compaction_canceled_ &&
manual_compaction_canceled_->load(std::memory_order_relaxed)))) {
(manual_compaction_canceled_.load(std::memory_order_relaxed))) {
status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
if (status.ok()) {
@ -2527,7 +2522,7 @@ CompactionServiceCompactionJob::CompactionServiceCompactionJob(
std::vector<SequenceNumber> existing_snapshots,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
const std::string& dbname, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<bool>* manual_compaction_canceled,
const std::atomic<bool>& manual_compaction_canceled,
const std::string& db_id, const std::string& db_session_id,
const std::string& output_path,
const CompactionServiceInput& compaction_service_input,
@ -2540,7 +2535,7 @@ CompactionServiceCompactionJob::CompactionServiceCompactionJob(
compaction->mutable_cf_options()->paranoid_file_checks,
compaction->mutable_cf_options()->report_bg_io_stats, dbname,
&(compaction_service_result->stats), Env::Priority::USER, io_tracer,
nullptr, manual_compaction_canceled, db_id, db_session_id,
manual_compaction_canceled, db_id, db_session_id,
compaction->column_family_data()->GetFullHistoryTsLow()),
output_path_(output_path),
compaction_input_(compaction_service_input),

@ -78,8 +78,7 @@ class CompactionJob {
bool paranoid_file_checks, bool measure_io_stats,
const std::string& dbname, CompactionJobStats* compaction_job_stats,
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<int>* manual_compaction_paused = nullptr,
const std::atomic<bool>* manual_compaction_canceled = nullptr,
const std::atomic<bool>& manual_compaction_canceled,
const std::string& db_id = "", const std::string& db_session_id = "",
std::string full_history_ts_low = "", std::string trim_ts = "",
BlobFileCompletionCallback* blob_callback = nullptr);
@ -195,8 +194,7 @@ class CompactionJob {
FileOptions file_options_for_read_;
VersionSet* versions_;
const std::atomic<bool>* shutting_down_;
const std::atomic<int>* manual_compaction_paused_;
const std::atomic<bool>* manual_compaction_canceled_;
const std::atomic<bool>& manual_compaction_canceled_;
FSDirectory* db_directory_;
FSDirectory* blob_output_directory_;
InstrumentedMutex* db_mutex_;
@ -357,7 +355,7 @@ class CompactionServiceCompactionJob : private CompactionJob {
std::vector<SequenceNumber> existing_snapshots,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
const std::string& dbname, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<bool>* manual_compaction_canceled,
const std::atomic<bool>& manual_compaction_canceled,
const std::string& db_id, const std::string& db_session_id,
const std::string& output_path,
const CompactionServiceInput& compaction_service_input,

@ -353,6 +353,7 @@ class CompactionJobTestBase : public testing::Test {
SnapshotChecker* snapshot_checker = nullptr;
ASSERT_TRUE(full_history_ts_low_.empty() ||
ucmp_->timestamp_size() == full_history_ts_low_.size());
const std::atomic<bool> kManualCompactionCanceledFalse{false};
CompactionJob compaction_job(
0, &compaction, db_options_, mutable_db_options_, env_options_,
versions_.get(), &shutting_down_, &log_buffer, nullptr, nullptr,
@ -360,9 +361,9 @@ class CompactionJobTestBase : public testing::Test {
earliest_write_conflict_snapshot, snapshot_checker, nullptr,
table_cache_, &event_logger, false, false, dbname_,
&compaction_job_stats_, Env::Priority::USER, nullptr /* IOTracer */,
/*manual_compaction_paused=*/nullptr,
/*manual_compaction_canceled=*/nullptr, env_->GenerateUniqueId(),
DBImpl::GenerateDbSessionId(nullptr), full_history_ts_low_);
/*manual_compaction_canceled=*/kManualCompactionCanceledFalse,
env_->GenerateUniqueId(), DBImpl::GenerateDbSessionId(nullptr),
full_history_ts_low_);
VerifyInitializationOfCompactionJobStats(compaction_job_stats_);
compaction_job.Prepare();

@ -1216,6 +1216,9 @@ class DBImpl : public DB {
InstrumentedMutex trace_mutex_;
BlockCacheTracer block_cache_tracer_;
// constant false canceled flag, used when the compaction is not manual
const std::atomic<bool> kManualCompactionCanceledFalse_{false};
// State below is protected by mutex_
// With two_write_queues enabled, some of the variables that accessed during
// WriteToWAL need different synchronization: log_empty_, alive_log_files_,
@ -1603,7 +1606,11 @@ class DBImpl : public DB {
output_path_id(_output_path_id),
exclusive(_exclusive),
disallow_trivial_move(_disallow_trivial_move),
canceled(_canceled) {}
canceled(_canceled ? *_canceled : canceled_internal_storage) {}
// When _canceled is not provided by ther user, we assign the reference of
// canceled_internal_storage to it to consolidate canceled and
// manual_compaction_paused since DisableManualCompaction() might be
// called
ColumnFamilyData* cfd;
int input_level;
@ -1620,7 +1627,12 @@ class DBImpl : public DB {
InternalKey* manual_end = nullptr; // how far we are compacting
InternalKey tmp_storage; // Used to keep track of compaction progress
InternalKey tmp_storage1; // Used to keep track of compaction progress
std::atomic<bool>* canceled; // Compaction canceled by the user?
// When the user provides a canceled pointer in CompactRangeOptions, the
// above varaibe is the reference of the user-provided
// `canceled`, otherwise, it is the reference of canceled_internal_storage
std::atomic<bool> canceled_internal_storage = false;
std::atomic<bool>& canceled; // Compaction canceled pointer reference
};
struct PrepickedCompaction {
// background compaction takes ownership of `compaction`.

@ -1217,6 +1217,10 @@ Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
// Perform CompactFiles
TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2");
TEST_SYNC_POINT_CALLBACK(
"TestCompactFiles:PausingManualCompaction:3",
reinterpret_cast<void*>(
const_cast<std::atomic<int>*>(&manual_compaction_paused_)));
{
InstrumentedMutexLock l(&mutex_);
@ -1372,7 +1376,7 @@ Status DBImpl::CompactFilesImpl(
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, Env::Priority::USER, io_tracer_,
&manual_compaction_paused_, nullptr, db_id_, db_session_id_,
kManualCompactionCanceledFalse_, db_id_, db_session_id_,
c->column_family_data()->GetFullHistoryTsLow(), c->trim_ts(),
&blob_callback_);
@ -1838,8 +1842,7 @@ Status DBImpl::RunManualCompaction(
// and `CompactRangeOptions::canceled` might not work well together.
while (bg_bottom_compaction_scheduled_ > 0 ||
bg_compaction_scheduled_ > 0) {
if (manual_compaction_paused_ > 0 ||
(manual.canceled != nullptr && *manual.canceled == true)) {
if (manual_compaction_paused_ > 0 || manual.canceled == true) {
// Pretend the error came from compaction so the below cleanup/error
// handling code can process it.
manual.done = true;
@ -2376,10 +2379,18 @@ Status DBImpl::EnableAutoCompaction(
return s;
}
// NOTE: Calling DisableManualCompaction() may overwrite the
// user-provided canceled variable in CompactRangeOptions
void DBImpl::DisableManualCompaction() {
InstrumentedMutexLock l(&mutex_);
manual_compaction_paused_.fetch_add(1, std::memory_order_release);
// Mark the canceled as true when the cancellation is triggered by
// manual_compaction_paused (may overwrite user-provided `canceled`)
for (const auto& manual_compaction : manual_compaction_dequeue_) {
manual_compaction->canceled = true;
}
// Wake up manual compactions waiting to start.
bg_cv_.SignalAll();
@ -2392,6 +2403,11 @@ void DBImpl::DisableManualCompaction() {
}
}
// NOTE: In contrast to DisableManualCompaction(), calling
// EnableManualCompaction() does NOT overwrite the user-provided *canceled
// variable to be false since there is NO CHANCE a canceled compaction
// is uncanceled. In other words, a canceled compaction must have been
// dropped out of the manual compaction queue, when we disable it.
void DBImpl::EnableManualCompaction() {
InstrumentedMutexLock l(&mutex_);
assert(manual_compaction_paused_ > 0);
@ -3037,10 +3053,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
if (shutting_down_.load(std::memory_order_acquire)) {
status = Status::ShutdownInProgress();
} else if (is_manual &&
manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
} else if (is_manual && manual_compaction->canceled &&
manual_compaction->canceled->load(std::memory_order_acquire)) {
manual_compaction->canceled.load(std::memory_order_acquire)) {
status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
} else {
@ -3357,6 +3370,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
GetSnapshotContext(job_context, &snapshot_seqs,
&earliest_write_conflict_snapshot, &snapshot_checker);
assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJob compaction_job(
job_context->job_id, c.get(), immutable_db_options_,
mutable_db_options_, file_options_for_compaction_, versions_.get(),
@ -3368,9 +3382,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, thread_pri, io_tracer_,
is_manual ? &manual_compaction_paused_ : nullptr,
is_manual ? manual_compaction->canceled : nullptr, db_id_,
db_session_id_, c->column_family_data()->GetFullHistoryTsLow(),
is_manual ? manual_compaction->canceled
: kManualCompactionCanceledFalse_,
db_id_, db_session_id_, c->column_family_data()->GetFullHistoryTsLow(),
c->trim_ts(), &blob_callback_);
compaction_job.Prepare();

@ -820,8 +820,8 @@ Status DBImplSecondary::CompactWithoutInstallation(
file_options_for_compaction_, versions_.get(), &shutting_down_,
&log_buffer, output_dir.get(), stats_, &mutex_, &error_handler_,
input.snapshots, table_cache_, &event_logger_, dbname_, io_tracer_,
options.canceled, input.db_id, db_session_id_, secondary_path_, input,
result);
options.canceled ? *options.canceled : kManualCompactionCanceledFalse_,
input.db_id, db_session_id_, secondary_path_, input, result);
mutex_.Unlock();
s = compaction_job.Run();

@ -3081,10 +3081,21 @@ TEST_F(DBTest2, PausingManualCompaction1) {
int manual_compactions_paused = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run():PausingManualCompaction:1", [&](void* arg) {
auto canceled = static_cast<std::atomic<bool>*>(arg);
// CompactRange triggers manual compaction and cancel the compaction
// by set *canceled as true
if (canceled != nullptr) {
canceled->store(true, std::memory_order_release);
}
manual_compactions_paused += 1;
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"TestCompactFiles:PausingManualCompaction:3", [&](void* arg) {
auto paused = static_cast<std::atomic<int>*>(arg);
// CompactFiles() relies on manual_compactions_paused to
// determine if thie compaction should be paused or not
ASSERT_EQ(0, paused->load(std::memory_order_acquire));
paused->fetch_add(1, std::memory_order_release);
manual_compactions_paused += 1;
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
@ -3149,7 +3160,7 @@ TEST_F(DBTest2, PausingManualCompaction2) {
Random rnd(301);
for (int i = 0; i < 2; i++) {
// Generate a file containing 10 keys.
// Generate a file containing 100 keys.
for (int j = 0; j < 100; j++) {
ASSERT_OK(Put(Key(j), rnd.RandomString(50)));
}
@ -3248,10 +3259,21 @@ TEST_F(DBTest2, PausingManualCompaction4) {
int run_manual_compactions = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run():PausingManualCompaction:2", [&](void* arg) {
auto canceled = static_cast<std::atomic<bool>*>(arg);
// CompactRange triggers manual compaction and cancel the compaction
// by set *canceled as true
if (canceled != nullptr) {
canceled->store(true, std::memory_order_release);
}
run_manual_compactions++;
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"TestCompactFiles:PausingManualCompaction:3", [&](void* arg) {
auto paused = static_cast<std::atomic<int>*>(arg);
// CompactFiles() relies on manual_compactions_paused to
// determine if thie compaction should be paused or not
ASSERT_EQ(0, paused->load(std::memory_order_acquire));
paused->fetch_add(1, std::memory_order_release);
run_manual_compactions++;
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
@ -3266,7 +3288,6 @@ TEST_F(DBTest2, PausingManualCompaction4) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
"CompactionJob::Run():PausingManualCompaction:2");
dbfull()->EnableManualCompaction();
ASSERT_OK(dbfull()->CompactRange(compact_options, nullptr, nullptr));
ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
#ifndef ROCKSDB_LITE
@ -3515,8 +3536,9 @@ TEST_F(DBTest2, CancelManualCompactionWithListener) {
"CompactionJob::FinishCompactionOutputFile1",
[&](void* /*arg*/) { running_compaction++; });
// Case I: 1 Notify begin compaction, 2 DisableManualCompaction, 3 Compaction
// not run, 4 Notify compaction end.
// Case I: 1 Notify begin compaction, 2 Set *canceled as true to disable
// manual compaction in the callback function, 3 Compaction not run,
// 4 Notify compaction end.
listener->code_ = Status::kIncomplete;
listener->subcode_ = Status::SubCode::kManualCompactionPaused;
@ -3533,8 +3555,9 @@ TEST_F(DBTest2, CancelManualCompactionWithListener) {
listener->num_compaction_started_ = 0;
listener->num_compaction_ended_ = 0;
// Case II: 1 DisableManualCompaction, 2 Notify begin compaction (return
// without notifying), 3 Notify compaction end (return without notifying).
// Case II: 1 Set *canceled as true in the callback function to disable manual
// compaction, 2 Notify begin compaction (return without notifying), 3 Notify
// compaction end (return without notifying).
ASSERT_TRUE(dbfull()
->CompactRange(compact_options, nullptr, nullptr)
.IsManualCompactionPaused());
@ -3545,8 +3568,8 @@ TEST_F(DBTest2, CancelManualCompactionWithListener) {
ASSERT_EQ(running_compaction, 0);
// Case III: 1 Notify begin compaction, 2 Compaction in between
// 3. DisableManualCompaction, , 4 Notify compaction end.
// compact_options.canceled->store(false, std::memory_order_release);
// 3. Set *canceled as true in the callback function to disable manual
// compaction, 4 Notify compaction end.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
"CompactionIterator:ProcessKV");

@ -456,6 +456,7 @@ Status FlushJob::MemPurge() {
snapshot_checker_);
assert(job_context_);
SequenceNumber job_snapshot_seq = job_context_->GetJobSnapshotSequence();
const std::atomic<bool> kManualCompactionCanceledFalse{false};
CompactionIterator c_iter(
iter.get(), (cfd_->internal_comparator()).user_comparator(), &merge,
kMaxSequenceNumber, &existing_snapshots_,
@ -464,10 +465,9 @@ Status FlushJob::MemPurge() {
true /* internal key corruption is not ok */, range_del_agg.get(),
nullptr, ioptions->allow_data_in_errors,
ioptions->enforce_single_del_contracts,
/*manual_compaction_canceled=*/kManualCompactionCanceledFalse,
/*compaction=*/nullptr, compaction_filter.get(),
/*shutting_down=*/nullptr,
/*manual_compaction_paused=*/nullptr,
/*manual_compaction_canceled=*/nullptr, ioptions->info_log,
/*shutting_down=*/nullptr, ioptions->info_log,
&(cfd_->GetFullHistoryTsLow()));
// Set earliest sequence number in the new memtable

@ -1834,6 +1834,13 @@ struct CompactRangeOptions {
// Cancellation can be delayed waiting on automatic compactions when used
// together with `exclusive_manual_compaction == true`.
std::atomic<bool>* canceled = nullptr;
// NOTE: Calling DisableManualCompaction() overwrites the uer-provided
// canceled variable in CompactRangeOptions.
// Typically, when CompactRange is being called in one thread (t1) with
// canceled = false, and DisableManualCompaction is being called in the
// other thread (t2), manual compaction is disabled normally, even if the
// compaction iterator may still scan a few items before *canceled is
// set to true
// If set to kForce, RocksDB will override enable_blob_file_garbage_collection
// to true; if set to kDisable, RocksDB will override it to false, and

Loading…
Cancel
Save