Parallelize L0-L1 Compaction: Restructure Compaction Job

Summary:
As of now, compactions involving files from Level 0 and Level 1 are
single-threaded because the files in L0, although sorted, are not
range-partitioned like the other levels. This means that during an L0-L1
compaction each file from L1 needs to be merged with potentially all the files
from L0.

This attempt to parallelize the L0-L1 compaction assigns a thread, and a
corresponding iterator, to each L1 file. Each iterator then considers only the
key range found in that L1 file and only the L0 files that contain those keys
(and only the specific portions of those L0 files in which those keys are
found). In this way the overlap between the different iterators is minimized,
and potentially eliminated, even when they read from the same files.

The first step is to restructure the compaction logic to break L0-L1
compactions into multiple, smaller, sequential compactions. Eventually each of
these smaller jobs will be run simultaneously; a rough sketch of that end state
follows the list below. Areas to pay extra attention to are:

  # Correct aggregation of compaction job statistics across multiple threads
  # Proper opening/closing of output files (make sure each thread's is unique)
  # Keys that span multiple L1 files
  # Skewed distributions of keys within L0 files
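
Below is a rough, hypothetical sketch of the eventual multi-threaded structure
referred to above. RunRange() stands in for the SubCompactionRun() introduced
in this diff, and every other name is illustrative rather than part of the
patch. Each worker writes into its own result slot, and statistics would be
aggregated only after all workers join, per the first concern in the list:

  #include <string>
  #include <thread>
  #include <vector>

  // One bounded sub-compaction over [start, end);
  // nullptr means unbounded on that side.
  struct Range {
    const std::string* start;
    const std::string* end;
  };

  // Placeholder for SubCompactionRun(); reports success or failure.
  static bool RunRange(const Range& /*range*/) { return true; }

  // Run every range on its own thread. Each worker writes only to its
  // own slot, so no locking is needed until the join below.
  std::vector<int> RunAllRanges(const std::vector<Range>& ranges) {
    std::vector<int> ok(ranges.size(), 0);
    std::vector<std::thread> workers;
    for (size_t i = 0; i < ranges.size(); i++) {
      workers.emplace_back([&ok, &ranges, i] { ok[i] = RunRange(ranges[i]); });
    }
    for (auto& t : workers) {
      t.join();  // aggregate per-thread compaction stats only after this
    }
    return ok;
  }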

Test Plan: Make and run db_test (newer version has separate compaction tests) and compaction_job_stats_test

Reviewers: igor, noetzli, anthony, sdong, yhchiang

Reviewed By: yhchiang

Subscribers: MarkCallaghan, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D42699
Branch: main
Author: Ari Ekmekji (10 years ago)
Parent: 47316c2d08
Commit: 40c64434d4
Files changed:
  1. db/compaction.h (11)
  2. db/compaction_job.cc (135)
  3. db/compaction_job.h (11)
  4. db/compaction_job_stats_test.cc (43)
  5. db/version_set.cc (3)
  6. include/rocksdb/options.h (9)
  7. util/db_test_util.cc (4)
  8. util/db_test_util.h (1)
  9. util/mutable_cf_options.h (3)
  10. util/options.cc (2)

@@ -194,6 +194,17 @@ class Compaction {
// Create a CompactionFilter from compaction_filter_factory
std::unique_ptr<CompactionFilter> CreateCompactionFilter() const;
// Should this compaction be broken up into smaller ones run in parallel?
bool IsSubCompaction() const {
return start_level_ == 0 && output_level_ == 1
&& mutable_cf_options_.num_subcompactions > 1;
}
// If IsSubCompaction() is true, how many smaller compactions should execute
int NumSubCompactions() const {
return mutable_cf_options_.num_subcompactions;
}
private:
// mark (or clear) all files that are being compacted
void MarkFilesBeingCompacted(bool mark_as_compacted);

@@ -203,6 +203,34 @@ void CompactionJob::Prepare() {
// Is this compaction producing files at the bottommost level?
bottommost_level_ = compact_->compaction->bottommost_level();
GetSubCompactionBoundaries();
}
// For L0-L1 compaction, iterators work in parallel by processing
// different subsets of the full key range. This function returns
// the Slices that designate the boundaries of these ranges. Currently
// these boundaries are defined by the key ranges of the files in L1,
// and the first and last entries are always nullptr (unrestricted)
void CompactionJob::GetSubCompactionBoundaries() {
auto* c = compact_->compaction;
auto& slices = sub_compaction_boundaries_;
if (c->IsSubCompaction()) {
// TODO(aekmekji): take the option num_subcompactions into account
// when dividing up the key range between multiple iterators instead
// of just assigning each iterator one L1 file's key range
for (size_t which = 0; which < c->num_input_levels(); which++) {
if (c->level(which) == 1) {
if (c->input_levels(which)->num_files > 1) {
const LevelFilesBrief* flevel = c->input_levels(which);
for (size_t i = 1; i < flevel->num_files; i++) {
slices.emplace_back(flevel->files[i].smallest_key);
}
}
break;
}
}
}
}
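As a concrete illustration (the keys here are hypothetical), the boundary
computation above behaves as follows:

// Example: three L1 input files covering the (hypothetical) key ranges
//   [a .. f]  [g .. m]  [n .. z]
// produce sub_compaction_boundaries_ = { "g", "n" }, i.e. the smallest
// keys of every L1 file except the first. The loop in Run() below then
// turns these two boundaries into three sub-compaction ranges:
//   (-inf, g)  [g, n)  [n, +inf)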
Status CompactionJob::Run() {
@@ -213,12 +241,43 @@ Status CompactionJob::Run() {
auto* compaction = compact_->compaction;
LogCompaction(compaction->column_family_data(), compaction);
Status status;
Slice *start, *end;
for (size_t i = 0; i < sub_compaction_boundaries_.size() + 1; i++) {
if (i == 0) {
start = nullptr;
} else {
start = &sub_compaction_boundaries_[i - 1];
}
if (i == sub_compaction_boundaries_.size()) {
end = nullptr;
} else {
end = &sub_compaction_boundaries_[i];
}
status = SubCompactionRun(start, end);
if (!status.ok()) {
break;
}
}
UpdateCompactionStats();
RecordCompactionIOStats();
LogFlush(db_options_.info_log);
TEST_SYNC_POINT("CompactionJob::Run():End");
return status;
}
Status CompactionJob::SubCompactionRun(Slice* start, Slice* end) {
auto* compaction = compact_->compaction;
const uint64_t start_micros = env_->NowMicros();
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
std::unique_ptr<Iterator> input(versions_->MakeInputIterator(compaction));
Status status = ProcessKeyValueCompaction(&imm_micros, input.get(),
start, end);
input.reset();
if (output_directory_ && !db_options_.disableDataSync) {
@@ -227,12 +286,6 @@ Status CompactionJob::Run() {
compaction_stats_.micros = env_->NowMicros() - start_micros - imm_micros;
MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros);
return status;
}
@@ -298,7 +351,8 @@ void CompactionJob::Install(Status* status,
}
Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
Iterator* input,
Slice* start, Slice* end) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
Status status;
@@ -333,10 +387,49 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
StopWatchNano timer(env_, stats_ != nullptr);
uint64_t total_filter_time = 0;
if (start != nullptr) {
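// Seek with kMaxSequenceNumber and kValueTypeForSeek so the iterator
// lands on the first internal key whose user key is >= *start (within
// one user key, internal keys sort by decreasing sequence number)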
IterKey start_iter;
start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek);
Slice start_key = start_iter.GetKey();
input->Seek(start_key);
} else {
input->SeekToFirst();
}
// TODO(noetzli): check whether we could check !shutting_down_->... only
// occasionally (see diff D42687)
while (input->Valid() && !shutting_down_->load(std::memory_order_acquire) &&
!cfd->IsDropped() && status.ok()) {
Slice key = input->key();
Slice value = input->value();
// First check that the key is parseable before performing the comparison
// to determine if it's within the range we want
if (!ParseInternalKey(key, &ikey)) {
// Do not hide error keys
// TODO: error key stays in db forever? Figure out the rationale
// v10 error v8 : we cannot hide v8 even though it's pretty obvious.
current_user_key.Clear();
has_current_user_key = false;
last_sequence_for_key = kMaxSequenceNumber;
visible_in_snapshot = kMaxSequenceNumber;
if (compaction_job_stats_ != nullptr) {
compaction_job_stats_->num_corrupt_keys++;
}
status = WriteKeyValue(key, value, ikey, input->status());
input->Next();
continue;
}
// If an end key is specified, check whether the current key is >= it
// and stop if so: the end boundary is exclusive, since it is the
// smallest key of the next L1 file and belongs to the next sub-compaction
if (end != nullptr &&
cfd->user_comparator()->Compare(ikey.user_key, *end) >= 0) {
break;
}
compact_->num_input_records++;
if (++loop_cnt > 1000) {
RecordDroppedKeys(
@@ -345,9 +438,6 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
loop_cnt = 0;
}
if (compaction_job_stats_ != nullptr) {
compaction_job_stats_->total_input_raw_key_bytes += key.size();
compaction_job_stats_->total_input_raw_value_bytes += value.size();
@@ -361,25 +451,6 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
}
}
// Handle key/value, add to state, etc.
if (compaction_job_stats_ != nullptr && ikey.type == kTypeDeletion) {
compaction_job_stats_->num_input_deletion_records++;
}

@@ -73,19 +73,25 @@ class CompactionJob {
void Prepare();
// REQUIRED: mutex not held
Status Run();
// REQUIRED: mutex held
// status is the return of Run()
void Install(Status* status, const MutableCFOptions& mutable_cf_options,
InstrumentedMutex* db_mutex);
private:
// REQUIRED: mutex not held
Status SubCompactionRun(Slice* start, Slice* end);
void GetSubCompactionBoundaries();
// update the thread status for starting a compaction.
void ReportStartedCompaction(Compaction* compaction);
void AllocateCompactionOutputFileNumbers();
// Call compaction filter. Then iterate through input and compact the
// kv-pairs
Status ProcessKeyValueCompaction(int64_t* imm_micros, Iterator* input,
Slice* start = nullptr,
Slice* end = nullptr);
Status WriteKeyValue(const Slice& key, const Slice& value,
const ParsedInternalKey& ikey,
@@ -147,6 +153,7 @@ class CompactionJob {
EventLogger* event_logger_;
bool paranoid_file_checks_;
std::vector<Slice> sub_compaction_boundaries_;
};
} // namespace rocksdb

@@ -84,13 +84,15 @@ std::string Key(uint64_t key, int length) {
return std::string(buf);
}
class CompactionJobStatsTest : public testing::Test,
public testing::WithParamInterface<bool> {
public:
std::string dbname_;
std::string alternative_wal_dir_;
Env* env_;
DB* db_;
std::vector<ColumnFamilyHandle*> handles_;
bool subcompactions_enabled_;
Options last_options_;
@@ -101,6 +103,8 @@ class CompactionJobStatsTest : public testing::Test {
alternative_wal_dir_ = dbname_ + "/wal";
Options options;
options.create_if_missing = true;
subcompactions_enabled_ = GetParam();
options.num_subcompactions = subcompactions_enabled_ ? 2 : 1;
auto delete_options = options;
delete_options.wal_dir = alternative_wal_dir_;
EXPECT_OK(DestroyDB(dbname_, delete_options));
@@ -123,6 +127,9 @@ class CompactionJobStatsTest : public testing::Test {
EXPECT_OK(DestroyDB(dbname_, options));
}
static void SetUpTestCase() {}
static void TearDownTestCase() {}
DBImpl* dbfull() {
return reinterpret_cast<DBImpl*>(db_);
}
@@ -607,7 +614,7 @@ CompressionType GetAnyCompression() {
} // namespace
TEST_P(CompactionJobStatsTest, CompactionJobStatsTest) {
Random rnd(301);
const int kBufSize = 100;
char buf[kBufSize];
@@ -634,6 +641,7 @@ TEST_F(CompactionJobStatsTest, CompactionJobStatsTest) {
options.level0_file_num_compaction_trigger = kTestScale + 1;
options.num_levels = 3;
options.compression = kNoCompression;
options.num_subcompactions = subcompactions_enabled_ ? 2 : 1;
for (int test = 0; test < 2; ++test) {
DestroyAndReopen(options);
@@ -711,6 +719,11 @@ TEST_F(CompactionJobStatsTest, CompactionJobStatsTest) {
}
// 4th Phase: perform L0 -> L1 compaction again, expect higher write amp
// When subcompactions are enabled, the number of output files increases
// by 1 because multiple threads consume the input and generate output
// files without coordinating to see whether the output could fit into
// a smaller number of files, as it does when run sequentially
int num_output_files = options.num_subcompactions > 1 ? 2 : 1;
for (uint64_t start_key = key_base;
num_L0_files > 1;
start_key += key_base * sparseness) {
@@ -722,13 +735,18 @@ TEST_F(CompactionJobStatsTest, CompactionJobStatsTest) {
smallest_key, largest_key,
3, 2, num_keys_per_L0_file * 3,
kKeySize, kValueSize,
num_output_files,
num_keys_per_L0_file * 2, // 1/3 of the data will be updated.
compression_ratio,
num_keys_per_L0_file));
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
Compact(1, smallest_key, largest_key);
// TODO(aekmekji): account for whether parallel L0-L1 compaction is
// enabled or not. If so then num_L1_files will increase by 1
if (options.num_subcompactions == 1) {
--num_L1_files;
}
snprintf(buf, kBufSize, "%d,%d", --num_L0_files, num_L1_files);
ASSERT_EQ(std::string(buf), FilesPerLevel(1));
}
@@ -747,7 +765,11 @@ TEST_F(CompactionJobStatsTest, CompactionJobStatsTest) {
num_keys_per_L0_file));
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
Compact(1, smallest_key, largest_key);
ASSERT_EQ("0,4", FilesPerLevel(1));
num_L1_files = options.num_subcompactions > 1 ? 7 : 4;
char L1_buf[4];
snprintf(L1_buf, sizeof(L1_buf), "0,%d", num_L1_files);
std::string L1_files(L1_buf);
ASSERT_EQ(L1_files, FilesPerLevel(1));
options.compression = GetAnyCompression();
if (options.compression == kNoCompression) {
break;
@@ -758,7 +780,7 @@ TEST_F(CompactionJobStatsTest, CompactionJobStatsTest) {
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
}
TEST_P(CompactionJobStatsTest, DeletionStatsTest) {
Random rnd(301);
uint64_t key_base = 100000l;
// Note: key_base must be multiple of num_keys_per_L0_file
@@ -785,6 +807,7 @@ TEST_F(CompactionJobStatsTest, DeletionStatsTest) {
options.num_levels = 3;
options.compression = kNoCompression;
options.max_bytes_for_level_multiplier = 2;
options.num_subcompactions = subcompactions_enabled_ ? 2 : 1;
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
@@ -854,7 +877,7 @@ int GetUniversalCompactionInputUnits(uint32_t num_flushes) {
}
} // namespace
TEST_P(CompactionJobStatsTest, UniversalCompactionTest) {
Random rnd(301);
uint64_t key_base = 100000000l;
// Note: key_base must be multiple of num_keys_per_L0_file
@@ -876,6 +899,8 @@ TEST_F(CompactionJobStatsTest, UniversalCompactionTest) {
options.compaction_style = kCompactionStyleUniversal;
options.compaction_options_universal.size_ratio = 1;
options.compaction_options_universal.max_size_amplification_percent = 1000;
options.num_subcompactions = subcompactions_enabled_ ? 2 : 1;
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
@@ -923,6 +948,8 @@ TEST_F(CompactionJobStatsTest, UniversalCompactionTest) {
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
}
INSTANTIATE_TEST_CASE_P(CompactionJobStatsTest, CompactionJobStatsTest,
::testing::Bool());
} // namespace rocksdb
int main(int argc, char** argv) {

@@ -3034,6 +3034,9 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
read_options.verify_checksums =
c->mutable_cf_options()->verify_checksums_in_compaction;
read_options.fill_cache = false;
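// Sub-compaction iterators must observe every key in full sorted order,
// so disable prefix-seek shortcuts that could skip entries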
if (c->IsSubCompaction()) {
read_options.total_order_seek = true;
}
// Level-0 files have to be merged together. For other levels,
// we will make a concatenating iterator per level.

@@ -885,6 +885,15 @@ struct DBOptions {
// Default: 1
int max_background_compactions;
// This integer represents the maximum number of threads that will
// concurrently perform a level-based compaction from L0 to L1. A value
// of 1 means there is no parallelism, and a greater number enables a
// multi-threaded version of the L0-L1 compaction that divides the compaction
// into multiple, smaller ones that are run simultaneously. This is still
// under development and is only available for level-based compaction.
// Default: 1
int num_subcompactions;
// Maximum number of concurrent background memtable flush jobs, submitted to
// the HIGH priority thread pool.
//
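
A minimal usage sketch for the new option (the db path and everything outside
the option itself are illustrative, not part of this diff):

  #include "rocksdb/db.h"

  int main() {
    rocksdb::Options options;
    options.create_if_missing = true;
    // Ask for L0->L1 compactions to be split into two smaller
    // sub-compactions (still run sequentially in this patch)
    options.num_subcompactions = 2;
    rocksdb::DB* db;
    rocksdb::Status s =
        rocksdb::DB::Open(options, "/tmp/subcompaction_example", &db);
    if (s.ok()) {
      delete db;
    }
    return 0;
  }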

@@ -302,6 +302,10 @@ Options DBTestBase::CurrentOptions(
options.row_cache = NewLRUCache(1024 * 1024);
break;
}
case kLevelSubcompactions: {
options.num_subcompactions = 2;
break;
}
default:
break;

@@ -421,6 +421,7 @@ class DBTestBase : public testing::Test {
kFIFOCompaction = 25,
kOptimizeFiltersForHits = 26,
kRowCache = 27,
kLevelSubcompactions = 28,
kEnd = 28
};
int option_config_;

@@ -40,6 +40,7 @@ struct MutableCFOptions {
max_bytes_for_level_multiplier_additional(
options.max_bytes_for_level_multiplier_additional),
verify_checksums_in_compaction(options.verify_checksums_in_compaction),
num_subcompactions(options.num_subcompactions),
max_sequential_skip_in_iterations(
options.max_sequential_skip_in_iterations),
paranoid_file_checks(options.paranoid_file_checks)
@@ -70,6 +71,7 @@ struct MutableCFOptions {
max_bytes_for_level_base(0),
max_bytes_for_level_multiplier(0),
verify_checksums_in_compaction(false),
num_subcompactions(1),
max_sequential_skip_in_iterations(0),
paranoid_file_checks(false)
{}
@@ -121,6 +123,7 @@ struct MutableCFOptions {
int max_bytes_for_level_multiplier;
std::vector<int> max_bytes_for_level_multiplier_additional;
bool verify_checksums_in_compaction;
int num_subcompactions;
// Misc options
uint64_t max_sequential_skip_in_iterations;

@@ -214,6 +214,7 @@ DBOptions::DBOptions()
wal_dir(""),
delete_obsolete_files_period_micros(6 * 60 * 60 * 1000000UL),
max_background_compactions(1),
num_subcompactions(1),
max_background_flushes(1),
max_log_file_size(0),
log_file_time_to_roll(0),
@@ -261,6 +262,7 @@ DBOptions::DBOptions(const Options& options)
delete_obsolete_files_period_micros(
options.delete_obsolete_files_period_micros),
max_background_compactions(options.max_background_compactions),
num_subcompactions(options.num_subcompactions),
max_background_flushes(options.max_background_flushes),
max_log_file_size(options.max_log_file_size),
log_file_time_to_roll(options.log_file_time_to_roll),
