Cancel compact range (#8351)

Summary:
Added the ability to cancel an in-progress range compaction by storing to an atomic "canceled" variable pointed to within the CompactRangeOptions structure.

Tested via two tests added to db_tests2.cc.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8351

Reviewed By: ajkr

Differential Revision: D28808894

Pulled By: ddevec

fbshipit-source-id: cb321361c9e23b084b188bb203f11c375a22c2dd
main
David Devecsery 4 years ago committed by Facebook GitHub Bot
parent 707f8d168a
commit 80a59a03a7
  1. 1
      HISTORY.md
  2. 3
      db/builder.cc
  3. 6
      db/compaction/compaction_iterator.cc
  4. 27
      db/compaction/compaction_iterator.h
  5. 3
      db/compaction/compaction_iterator_test.cc
  6. 20
      db/compaction/compaction_job.cc
  7. 2
      db/compaction/compaction_job.h
  8. 3
      db/compaction/compaction_job_test.cc
  9. 1
      db/db_impl/db_impl.h
  10. 16
      db/db_impl/db_impl_compaction_flush.cc
  11. 175
      db/db_test2.cc
  12. 3
      include/rocksdb/options.h

@ -26,6 +26,7 @@
* Add an experimental Remote Compaction feature, which allows the user to run Compaction on a different host or process. The feature is still under development, currently only works on some basic use cases. The interface will be changed without backward/forward compatibility support. * Add an experimental Remote Compaction feature, which allows the user to run Compaction on a different host or process. The feature is still under development, currently only works on some basic use cases. The interface will be changed without backward/forward compatibility support.
* RocksDB would validate total entries read in flush, and compare with counter inserted into it. If flush_verify_memtable_count = true (default), flush will fail. Otherwise, only log to info logs. * RocksDB would validate total entries read in flush, and compare with counter inserted into it. If flush_verify_memtable_count = true (default), flush will fail. Otherwise, only log to info logs.
* Add `TableProperties::num_filter_entries`, which can be used with `TableProperties::filter_size` to calculate the effective bits per filter entry (unique user key or prefix) for a table file. * Add `TableProperties::num_filter_entries`, which can be used with `TableProperties::filter_size` to calculate the effective bits per filter entry (unique user key or prefix) for a table file.
* Added a `cancel` field to `CompactRangeOptions`, allowing individual in-process manual range compactions to be cancelled.
### Performance Improvements ### Performance Improvements
* BlockPrefetcher is used by iterators to prefetch data if they anticipate more data to be used in future. It is enabled implicitly by rocksdb. Added change to take in account read pattern if reads are sequential. This would disable prefetching for random reads in MultiGet and iterators as readahead_size is increased exponential doing large prefetches. * BlockPrefetcher is used by iterators to prefetch data if they anticipate more data to be used in future. It is enabled implicitly by rocksdb. Added change to take in account read pattern if reads are sequential. This would disable prefetching for random reads in MultiGet and iterators as readahead_size is increased exponential doing large prefetches.

@ -191,7 +191,8 @@ Status BuildTable(
/*compaction=*/nullptr, compaction_filter.get(), /*compaction=*/nullptr, compaction_filter.get(),
/*shutting_down=*/nullptr, /*shutting_down=*/nullptr,
/*preserve_deletes_seqnum=*/0, /*manual_compaction_paused=*/nullptr, /*preserve_deletes_seqnum=*/0, /*manual_compaction_paused=*/nullptr,
db_options.info_log, full_history_ts_low); /*manual_compaction_canceled=*/nullptr, db_options.info_log,
full_history_ts_low);
c_iter.SeekToFirst(); c_iter.SeekToFirst();
for (; c_iter.Valid(); c_iter.Next()) { for (; c_iter.Valid(); c_iter.Next()) {

@ -45,6 +45,7 @@ CompactionIterator::CompactionIterator(
const std::atomic<bool>* shutting_down, const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum, const SequenceNumber preserve_deletes_seqnum,
const std::atomic<int>* manual_compaction_paused, const std::atomic<int>* manual_compaction_paused,
const std::atomic<bool>* manual_compaction_canceled,
const std::shared_ptr<Logger> info_log, const std::shared_ptr<Logger> info_log,
const std::string* full_history_ts_low) const std::string* full_history_ts_low)
: CompactionIterator( : CompactionIterator(
@ -55,7 +56,8 @@ CompactionIterator::CompactionIterator(
std::unique_ptr<CompactionProxy>( std::unique_ptr<CompactionProxy>(
compaction ? new RealCompaction(compaction) : nullptr), compaction ? new RealCompaction(compaction) : nullptr),
compaction_filter, shutting_down, preserve_deletes_seqnum, compaction_filter, shutting_down, preserve_deletes_seqnum,
manual_compaction_paused, info_log, full_history_ts_low) {} manual_compaction_paused, manual_compaction_canceled, info_log,
full_history_ts_low) {}
CompactionIterator::CompactionIterator( CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
@ -70,6 +72,7 @@ CompactionIterator::CompactionIterator(
const std::atomic<bool>* shutting_down, const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum, const SequenceNumber preserve_deletes_seqnum,
const std::atomic<int>* manual_compaction_paused, const std::atomic<int>* manual_compaction_paused,
const std::atomic<bool>* manual_compaction_canceled,
const std::shared_ptr<Logger> info_log, const std::shared_ptr<Logger> info_log,
const std::string* full_history_ts_low) const std::string* full_history_ts_low)
: input_( : input_(
@ -91,6 +94,7 @@ CompactionIterator::CompactionIterator(
compaction_filter_(compaction_filter), compaction_filter_(compaction_filter),
shutting_down_(shutting_down), shutting_down_(shutting_down),
manual_compaction_paused_(manual_compaction_paused), manual_compaction_paused_(manual_compaction_paused),
manual_compaction_canceled_(manual_compaction_canceled),
preserve_deletes_seqnum_(preserve_deletes_seqnum), preserve_deletes_seqnum_(preserve_deletes_seqnum),
info_log_(info_log), info_log_(info_log),
allow_data_in_errors_(allow_data_in_errors), allow_data_in_errors_(allow_data_in_errors),

@ -150,38 +150,38 @@ class CompactionIterator {
const Compaction* compaction_; const Compaction* compaction_;
}; };
CompactionIterator(InternalIterator* input, const Comparator* cmp, CompactionIterator(
MergeHelper* merge_helper, SequenceNumber last_sequence, InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
std::vector<SequenceNumber>* snapshots, SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot, SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env, const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key, bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg, CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder, BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
bool allow_data_in_errors,
const Compaction* compaction = nullptr, const Compaction* compaction = nullptr,
const CompactionFilter* compaction_filter = nullptr, const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr, const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0, const SequenceNumber preserve_deletes_seqnum = 0,
const std::atomic<int>* manual_compaction_paused = nullptr, const std::atomic<int>* manual_compaction_paused = nullptr,
const std::atomic<bool>* manual_compaction_canceled = nullptr,
const std::shared_ptr<Logger> info_log = nullptr, const std::shared_ptr<Logger> info_log = nullptr,
const std::string* full_history_ts_low = nullptr); const std::string* full_history_ts_low = nullptr);
// Constructor with custom CompactionProxy, used for tests. // Constructor with custom CompactionProxy, used for tests.
CompactionIterator(InternalIterator* input, const Comparator* cmp, CompactionIterator(
MergeHelper* merge_helper, SequenceNumber last_sequence, InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
std::vector<SequenceNumber>* snapshots, SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot, SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env, const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key, bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg, CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder, BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
bool allow_data_in_errors,
std::unique_ptr<CompactionProxy> compaction, std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter = nullptr, const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr, const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0, const SequenceNumber preserve_deletes_seqnum = 0,
const std::atomic<int>* manual_compaction_paused = nullptr, const std::atomic<int>* manual_compaction_paused = nullptr,
const std::atomic<bool>* manual_compaction_canceled = nullptr,
const std::shared_ptr<Logger> info_log = nullptr, const std::shared_ptr<Logger> info_log = nullptr,
const std::string* full_history_ts_low = nullptr); const std::string* full_history_ts_low = nullptr);
@ -303,6 +303,7 @@ class CompactionIterator {
const CompactionFilter* compaction_filter_; const CompactionFilter* compaction_filter_;
const std::atomic<bool>* shutting_down_; const std::atomic<bool>* shutting_down_;
const std::atomic<int>* manual_compaction_paused_; const std::atomic<int>* manual_compaction_paused_;
const std::atomic<bool>* manual_compaction_canceled_;
const SequenceNumber preserve_deletes_seqnum_; const SequenceNumber preserve_deletes_seqnum_;
bool bottommost_level_; bool bottommost_level_;
bool valid_ = false; bool valid_ = false;
@ -399,8 +400,10 @@ class CompactionIterator {
bool IsPausingManualCompaction() { bool IsPausingManualCompaction() {
// This is a best-effort facility, so memory_order_relaxed is sufficient. // This is a best-effort facility, so memory_order_relaxed is sufficient.
return manual_compaction_paused_ && return (manual_compaction_paused_ &&
manual_compaction_paused_->load(std::memory_order_relaxed) > 0; manual_compaction_paused_->load(std::memory_order_relaxed) > 0) ||
(manual_compaction_canceled_ &&
manual_compaction_canceled_->load(std::memory_order_relaxed));
} }
}; };
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -282,7 +282,8 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
range_del_agg_.get(), nullptr /* blob_file_builder */, range_del_agg_.get(), nullptr /* blob_file_builder */,
true /*allow_data_in_errors*/, std::move(compaction), filter, true /*allow_data_in_errors*/, std::move(compaction), filter,
&shutting_down_, /*preserve_deletes_seqnum=*/0, &shutting_down_, /*preserve_deletes_seqnum=*/0,
/*manual_compaction_paused=*/nullptr, /*info_log=*/nullptr, /*manual_compaction_paused=*/nullptr,
/*manual_compaction_canceled=*/nullptr, /*info_log=*/nullptr,
full_history_ts_low)); full_history_ts_low));
} }

@ -313,9 +313,10 @@ CompactionJob::CompactionJob(
EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats, EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
const std::string& dbname, CompactionJobStats* compaction_job_stats, const std::string& dbname, CompactionJobStats* compaction_job_stats,
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer, Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<int>* manual_compaction_paused, const std::string& db_id, const std::atomic<int>* manual_compaction_paused,
const std::string& db_session_id, std::string full_history_ts_low, const std::atomic<bool>* manual_compaction_canceled,
BlobFileCompletionCallback* blob_callback) const std::string& db_id, const std::string& db_session_id,
std::string full_history_ts_low, BlobFileCompletionCallback* blob_callback)
: compact_(new CompactionState(compaction)), : compact_(new CompactionState(compaction)),
compaction_stats_(compaction->compaction_reason(), 1), compaction_stats_(compaction->compaction_reason(), 1),
db_options_(db_options), db_options_(db_options),
@ -339,6 +340,7 @@ CompactionJob::CompactionJob(
versions_(versions), versions_(versions),
shutting_down_(shutting_down), shutting_down_(shutting_down),
manual_compaction_paused_(manual_compaction_paused), manual_compaction_paused_(manual_compaction_paused),
manual_compaction_canceled_(manual_compaction_canceled),
preserve_deletes_seqnum_(preserve_deletes_seqnum), preserve_deletes_seqnum_(preserve_deletes_seqnum),
db_directory_(db_directory), db_directory_(db_directory),
blob_output_directory_(blob_output_directory), blob_output_directory_(blob_output_directory),
@ -1172,8 +1174,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
/*expect_valid_internal_key=*/true, &range_del_agg, /*expect_valid_internal_key=*/true, &range_del_agg,
blob_file_builder.get(), db_options_.allow_data_in_errors, blob_file_builder.get(), db_options_.allow_data_in_errors,
sub_compact->compaction, compaction_filter, shutting_down_, sub_compact->compaction, compaction_filter, shutting_down_,
preserve_deletes_seqnum_, manual_compaction_paused_, db_options_.info_log, preserve_deletes_seqnum_, manual_compaction_paused_,
full_history_ts_low)); manual_compaction_canceled_, db_options_.info_log, full_history_ts_low));
auto c_iter = sub_compact->c_iter.get(); auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst(); c_iter->SeekToFirst();
if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) { if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
@ -1317,8 +1319,10 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
status = Status::ShutdownInProgress("Database shutdown"); status = Status::ShutdownInProgress("Database shutdown");
} }
if ((status.ok() || status.IsColumnFamilyDropped()) && if ((status.ok() || status.IsColumnFamilyDropped()) &&
(manual_compaction_paused_ && ((manual_compaction_paused_ &&
manual_compaction_paused_->load(std::memory_order_relaxed) > 0)) { manual_compaction_paused_->load(std::memory_order_relaxed) > 0) ||
(manual_compaction_canceled_ &&
manual_compaction_canceled_->load(std::memory_order_relaxed)))) {
status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
} }
if (status.ok()) { if (status.ok()) {
@ -2126,7 +2130,7 @@ CompactionServiceCompactionJob::CompactionServiceCompactionJob(
compaction->mutable_cf_options()->paranoid_file_checks, compaction->mutable_cf_options()->paranoid_file_checks,
compaction->mutable_cf_options()->report_bg_io_stats, dbname, compaction->mutable_cf_options()->report_bg_io_stats, dbname,
&(compaction_service_result->stats), Env::Priority::USER, io_tracer, &(compaction_service_result->stats), Env::Priority::USER, io_tracer,
nullptr, db_id, db_session_id, nullptr, nullptr, db_id, db_session_id,
compaction->column_family_data()->GetFullHistoryTsLow()), compaction->column_family_data()->GetFullHistoryTsLow()),
output_path_(output_path), output_path_(output_path),
compaction_input_(compaction_service_input), compaction_input_(compaction_service_input),

@ -81,6 +81,7 @@ class CompactionJob {
const std::string& dbname, CompactionJobStats* compaction_job_stats, const std::string& dbname, CompactionJobStats* compaction_job_stats,
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer, Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<int>* manual_compaction_paused = nullptr, const std::atomic<int>* manual_compaction_paused = nullptr,
const std::atomic<bool>* manual_compaction_canceled = nullptr,
const std::string& db_id = "", const std::string& db_session_id = "", const std::string& db_id = "", const std::string& db_session_id = "",
std::string full_history_ts_low = "", std::string full_history_ts_low = "",
BlobFileCompletionCallback* blob_callback = nullptr); BlobFileCompletionCallback* blob_callback = nullptr);
@ -185,6 +186,7 @@ class CompactionJob {
VersionSet* versions_; VersionSet* versions_;
const std::atomic<bool>* shutting_down_; const std::atomic<bool>* shutting_down_;
const std::atomic<int>* manual_compaction_paused_; const std::atomic<int>* manual_compaction_paused_;
const std::atomic<bool>* manual_compaction_canceled_;
const SequenceNumber preserve_deletes_seqnum_; const SequenceNumber preserve_deletes_seqnum_;
FSDirectory* db_directory_; FSDirectory* db_directory_;
FSDirectory* blob_output_directory_; FSDirectory* blob_output_directory_;

@ -350,7 +350,8 @@ class CompactionJobTestBase : public testing::Test {
earliest_write_conflict_snapshot, snapshot_checker, table_cache_, earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
&event_logger, false, false, dbname_, &compaction_job_stats_, &event_logger, false, false, dbname_, &compaction_job_stats_,
Env::Priority::USER, nullptr /* IOTracer */, Env::Priority::USER, nullptr /* IOTracer */,
/*manual_compaction_paused=*/nullptr, /*db_id=*/"", /*manual_compaction_paused=*/nullptr,
/*manual_compaction_canceled=*/nullptr, /*db_id=*/"",
/*db_session_id=*/"", full_history_ts_low_); /*db_session_id=*/"", full_history_ts_low_);
VerifyInitializationOfCompactionJobStats(compaction_job_stats_); VerifyInitializationOfCompactionJobStats(compaction_job_stats_);

@ -1463,6 +1463,7 @@ class DBImpl : public DB {
InternalKey* manual_end; // how far we are compacting InternalKey* manual_end; // how far we are compacting
InternalKey tmp_storage; // Used to keep track of compaction progress InternalKey tmp_storage; // Used to keep track of compaction progress
InternalKey tmp_storage1; // Used to keep track of compaction progress InternalKey tmp_storage1; // Used to keep track of compaction progress
std::atomic<bool>* canceled; // Compaction canceled by the user?
}; };
struct PrepickedCompaction { struct PrepickedCompaction {
// background compaction takes ownership of `compaction`. // background compaction takes ownership of `compaction`.

@ -807,6 +807,10 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
return Status::Incomplete(Status::SubCode::kManualCompactionPaused); return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
} }
if (options.canceled && options.canceled->load(std::memory_order_acquire)) {
return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
const Comparator* const ucmp = column_family->GetComparator(); const Comparator* const ucmp = column_family->GetComparator();
assert(ucmp); assert(ucmp);
size_t ts_sz = ucmp->timestamp_size(); size_t ts_sz = ucmp->timestamp_size();
@ -1253,7 +1257,7 @@ Status DBImpl::CompactFilesImpl(
c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_, c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, Env::Priority::USER, io_tracer_, &compaction_job_stats, Env::Priority::USER, io_tracer_,
&manual_compaction_paused_, db_id_, db_session_id_, &manual_compaction_paused_, nullptr, db_id_, db_session_id_,
c->column_family_data()->GetFullHistoryTsLow()); c->column_family_data()->GetFullHistoryTsLow());
// Creating a compaction influences the compaction score because the score // Creating a compaction influences the compaction score because the score
@ -1426,10 +1430,13 @@ void DBImpl::NotifyOnCompactionCompleted(
if (shutting_down_.load(std::memory_order_acquire)) { if (shutting_down_.load(std::memory_order_acquire)) {
return; return;
} }
// TODO: Should disabling manual compaction squash compaction completed
// notifications that aren't the result of a shutdown?
if (c->is_manual_compaction() && if (c->is_manual_compaction() &&
manual_compaction_paused_.load(std::memory_order_acquire) > 0) { manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
return; return;
} }
Version* current = cfd->current(); Version* current = cfd->current();
current->Ref(); current->Ref();
// release lock while notifying events // release lock while notifying events
@ -1654,6 +1661,7 @@ Status DBImpl::RunManualCompaction(
manual.incomplete = false; manual.incomplete = false;
manual.exclusive = exclusive; manual.exclusive = exclusive;
manual.disallow_trivial_move = disallow_trivial_move; manual.disallow_trivial_move = disallow_trivial_move;
manual.canceled = compact_range_options.canceled;
// For universal compaction, we enforce every manual compaction to compact // For universal compaction, we enforce every manual compaction to compact
// all files. // all files.
if (begin == nullptr || if (begin == nullptr ||
@ -2819,6 +2827,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
} else if (is_manual && } else if (is_manual &&
manual_compaction_paused_.load(std::memory_order_acquire) > 0) { manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
} else if (is_manual && manual_compaction->canceled &&
manual_compaction->canceled->load(std::memory_order_acquire)) {
status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
} }
} else { } else {
status = error_handler_.GetBGError(); status = error_handler_.GetBGError();
@ -3140,7 +3151,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_, c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, thread_pri, io_tracer_, &compaction_job_stats, thread_pri, io_tracer_,
is_manual ? &manual_compaction_paused_ : nullptr, db_id_, is_manual ? &manual_compaction_paused_ : nullptr,
is_manual ? manual_compaction->canceled : nullptr, db_id_,
db_session_id_, c->column_family_data()->GetFullHistoryTsLow()); db_session_id_, c->column_family_data()->GetFullHistoryTsLow());
compaction_job.Prepare(); compaction_job.Prepare();

@ -9,6 +9,7 @@
#include <atomic> #include <atomic>
#include <cstdlib> #include <cstdlib>
#include <functional> #include <functional>
#include <memory>
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "db/read_callback.h" #include "db/read_callback.h"
@ -3201,6 +3202,180 @@ TEST_F(DBTest2, PausingManualCompaction4) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
} }
TEST_F(DBTest2, CancelManualCompaction1) {
CompactRangeOptions compact_options;
auto canceledPtr =
std::unique_ptr<std::atomic<bool>>(new std::atomic<bool>{true});
compact_options.canceled = canceledPtr.get();
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.num_levels = 7;
Random rnd(301);
auto generate_files = [&]() {
for (int i = 0; i < options.num_levels; i++) {
for (int j = 0; j < options.num_levels - i + 1; j++) {
for (int k = 0; k < 1000; k++) {
ASSERT_OK(Put(Key(k + j * 1000), rnd.RandomString(50)));
}
Flush();
}
for (int l = 1; l < options.num_levels - i; l++) {
MoveFilesToLevel(l);
}
}
};
DestroyAndReopen(options);
generate_files();
#ifndef ROCKSDB_LITE
ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel());
#endif // !ROCKSDB_LITE
int run_manual_compactions = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run():PausingManualCompaction:1",
[&](void* /*arg*/) { run_manual_compactions++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// Setup a callback to disable compactions after a couple of levels are
// compacted
int compactions_run = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::RunManualCompaction()::1",
[&](void* /*arg*/) { ++compactions_run; });
dbfull()->CompactRange(compact_options, nullptr, nullptr);
dbfull()->TEST_WaitForCompact(true);
// Since compactions are disabled, we shouldn't start compacting.
// E.g. we should call the compaction function exactly one time.
ASSERT_EQ(compactions_run, 0);
ASSERT_EQ(run_manual_compactions, 0);
#ifndef ROCKSDB_LITE
ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel());
#endif // !ROCKSDB_LITE
compactions_run = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
"DBImpl::RunManualCompaction()::1");
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::RunManualCompaction()::1", [&](void* /*arg*/) {
++compactions_run;
// After 3 compactions disable
if (compactions_run == 3) {
compact_options.canceled->store(true, std::memory_order_release);
}
});
compact_options.canceled->store(false, std::memory_order_release);
dbfull()->CompactRange(compact_options, nullptr, nullptr);
dbfull()->TEST_WaitForCompact(true);
ASSERT_EQ(compactions_run, 3);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
"DBImpl::RunManualCompaction()::1");
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
"CompactionJob::Run():PausingManualCompaction:1");
// Compactions should work again if we re-enable them..
compact_options.canceled->store(false, std::memory_order_relaxed);
dbfull()->CompactRange(compact_options, nullptr, nullptr);
dbfull()->TEST_WaitForCompact(true);
#ifndef ROCKSDB_LITE
ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
#endif // !ROCKSDB_LITE
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBTest2, CancelManualCompaction2) {
CompactRangeOptions compact_options;
auto canceledPtr =
std::unique_ptr<std::atomic<bool>>(new std::atomic<bool>{true});
compact_options.canceled = canceledPtr.get();
compact_options.max_subcompactions = 1;
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.num_levels = 7;
Random rnd(301);
auto generate_files = [&]() {
for (int i = 0; i < options.num_levels; i++) {
for (int j = 0; j < options.num_levels - i + 1; j++) {
for (int k = 0; k < 1000; k++) {
ASSERT_OK(Put(Key(k + j * 1000), rnd.RandomString(50)));
}
Flush();
}
for (int l = 1; l < options.num_levels - i; l++) {
MoveFilesToLevel(l);
}
}
};
DestroyAndReopen(options);
generate_files();
#ifndef ROCKSDB_LITE
ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel());
#endif // !ROCKSDB_LITE
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
int compactions_run = 0;
std::atomic<int> kv_compactions{0};
int compactions_stopped_at = 0;
int kv_compactions_stopped_at = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::RunManualCompaction()::1", [&](void* /*arg*/) {
++compactions_run;
// After 3 compactions disable
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionIterator:ProcessKV", [&](void* /*arg*/) {
int kv_compactions_run =
kv_compactions.fetch_add(1, std::memory_order_release);
if (kv_compactions_run == 5) {
compact_options.canceled->store(true, std::memory_order_release);
kv_compactions_stopped_at = kv_compactions_run;
compactions_stopped_at = compactions_run;
}
});
compact_options.canceled->store(false, std::memory_order_release);
dbfull()->CompactRange(compact_options, nullptr, nullptr);
dbfull()->TEST_WaitForCompact(true);
// NOTE: as we set compact_options.max_subcompacitons = 1, and store true to
// the canceled variable from the single compacting thread (via callback),
// this value is deterministically kv_compactions_stopped_at + 1.
ASSERT_EQ(kv_compactions, kv_compactions_stopped_at + 1);
ASSERT_EQ(compactions_run, compactions_stopped_at);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
"CompactionIterator::ProcessKV");
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
"DBImpl::RunManualCompaction()::1");
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
"CompactionJob::Run():PausingManualCompaction:1");
// Compactions should work again if we re-enable them..
compact_options.canceled->store(false, std::memory_order_relaxed);
dbfull()->CompactRange(compact_options, nullptr, nullptr);
dbfull()->TEST_WaitForCompact(true);
#ifndef ROCKSDB_LITE
ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
#endif // !ROCKSDB_LITE
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBTest2, OptimizeForPointLookup) { TEST_F(DBTest2, OptimizeForPointLookup) {
Options options = CurrentOptions(); Options options = CurrentOptions();
Close(); Close();

@ -1656,6 +1656,9 @@ struct CompactRangeOptions {
// Set user-defined timestamp low bound, the data with older timestamp than // Set user-defined timestamp low bound, the data with older timestamp than
// low bound maybe GCed by compaction. Default: nullptr // low bound maybe GCed by compaction. Default: nullptr
Slice* full_history_ts_low = nullptr; Slice* full_history_ts_low = nullptr;
// Allows cancellation of an in-progress manual compaction.
std::atomic<bool>* canceled = nullptr;
}; };
// IngestExternalFileOptions is used by IngestExternalFile() // IngestExternalFileOptions is used by IngestExternalFile()

Loading…
Cancel
Save