[Parallel L0-L1 Compaction Prep]: Giving Subcompactions Their Own State

Summary:
In prepration for running multiple threads at the same time during
a compaction job, this patch assigns each subcompaction its own state
(instead of sharing the one global CompactionState). Each subcompaction then
uses this state to update its statistics, keep track of its snapshots, etc.
during the course of execution. Then at the end of all the executions the
statistics are aggregated across the subcompactions so that the final result
is the same as if only one larger compaction had run.

Test Plan: ./db_test  ./db_compaction_test  ./compaction_job_test

Reviewers: sdong, anthony, igor, noetzli, yhchiang

Reviewed By: yhchiang

Subscribers: MarkCallaghan, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D43239
main
Ari Ekmekji 10 years ago
parent f32a572099
commit f0da6977a3
  1. 13
      db/compaction.cc
  2. 14
      db/compaction.h
  3. 633
      db/compaction_job.cc
  4. 48
      db/compaction_job.h
  5. 4
      db/compaction_job_test.cc
  6. 8
      db/db_impl.cc
  7. 2
      include/rocksdb/compaction_job_stats.h
  8. 7
      tools/db_stress.cc
  9. 29
      util/compaction_job_stats_impl.cc

@ -101,8 +101,7 @@ Compaction::Compaction(VersionStorageInfo* vstorage,
score_(_score),
bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)),
is_full_compaction_(IsFullCompaction(vstorage, inputs_)),
is_manual_compaction_(_manual_compaction),
level_ptrs_(std::vector<size_t>(number_levels_, 0)) {
is_manual_compaction_(_manual_compaction) {
MarkFilesBeingCompacted(true);
#ifndef NDEBUG
@ -187,8 +186,11 @@ void Compaction::AddInputDeletions(VersionEdit* out_edit) {
}
}
bool Compaction::KeyNotExistsBeyondOutputLevel(const Slice& user_key) {
bool Compaction::KeyNotExistsBeyondOutputLevel(
const Slice& user_key, std::vector<size_t>* level_ptrs) const {
assert(input_version_ != nullptr);
assert(level_ptrs != nullptr);
assert(level_ptrs->size() == static_cast<size_t>(number_levels_));
assert(cfd_->ioptions()->compaction_style != kCompactionStyleFIFO);
if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) {
return bottommost_level_;
@ -198,8 +200,8 @@ bool Compaction::KeyNotExistsBeyondOutputLevel(const Slice& user_key) {
for (int lvl = output_level_ + 1; lvl < number_levels_; lvl++) {
const std::vector<FileMetaData*>& files =
input_version_->storage_info()->LevelFiles(lvl);
for (; level_ptrs_[lvl] < files.size(); ) {
FileMetaData* f = files[level_ptrs_[lvl]];
for (; level_ptrs->at(lvl) < files.size(); level_ptrs->at(lvl)++) {
auto* f = files[level_ptrs->at(lvl)];
if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) {
// We've advanced far enough
if (user_cmp->Compare(user_key, f->smallest.user_key()) >= 0) {
@ -209,7 +211,6 @@ bool Compaction::KeyNotExistsBeyondOutputLevel(const Slice& user_key) {
}
break;
}
level_ptrs_[lvl]++;
}
}
return true;

@ -128,7 +128,8 @@ class Compaction {
// Returns true if the available information we have guarantees that
// the input "user_key" does not exist in any level beyond "output_level()".
bool KeyNotExistsBeyondOutputLevel(const Slice& user_key);
bool KeyNotExistsBeyondOutputLevel(const Slice& user_key,
std::vector<size_t>* level_ptrs) const;
// Returns true iff we should stop building the current output
// before processing "internal_key".
@ -168,6 +169,9 @@ class Compaction {
// are non-overlapping and can be trivially moved.
bool is_trivial_move() { return is_trivial_move_; }
// How many total levels are there?
int number_levels() const { return number_levels_; }
// Return the MutableCFOptions that should be used throughout the compaction
// procedure
const MutableCFOptions* mutable_cf_options() { return &mutable_cf_options_; }
@ -258,16 +262,8 @@ class Compaction {
// True if we can do trivial move in Universal multi level
// compaction
bool is_trivial_move_;
// "level_ptrs_" holds indices into "input_version_->levels_", where each
// index remembers which file of an associated level we are currently used
// to check KeyNotExistsBeyondOutputLevel() for deletion operation.
// As it is for checking KeyNotExistsBeyondOutputLevel(), it only
// records indices for all levels beyond "output_level_".
std::vector<size_t> level_ptrs_;
// Does input compression match the output compression?
bool InputCompressionMatchesOutput() const;
};

File diff suppressed because it is too large Load Diff

@ -75,48 +75,51 @@ class CompactionJob {
Status Run();
// REQUIRED: mutex held
// status is the return of Run()
void Install(Status* status, const MutableCFOptions& mutable_cf_options,
InstrumentedMutex* db_mutex);
Status Install(const MutableCFOptions& mutable_cf_options,
InstrumentedMutex* db_mutex);
private:
// REQUIRED: mutex not held
Status SubCompactionRun(Slice* start, Slice* end);
struct SubCompactionState;
void AggregateStatistics();
// Set up the individual states used by each subcompaction
void InitializeSubCompactions(const SequenceNumber& earliest,
const SequenceNumber& visible,
const SequenceNumber& latest);
void GetSubCompactionBoundaries();
// update the thread status for starting a compaction.
void ReportStartedCompaction(Compaction* compaction);
void AllocateCompactionOutputFileNumbers();
// Call compaction filter. Then iterate through input and compact the
// kv-pairs
Status ProcessKeyValueCompaction(int64_t* imm_micros, Iterator* input,
Slice* start = nullptr,
Slice* end = nullptr);
void ProcessKeyValueCompaction(SubCompactionState* sub_compact);
Status WriteKeyValue(const Slice& key, const Slice& value,
const ParsedInternalKey& ikey,
const Status& input_status);
Status FinishCompactionOutputFile(const Status& input_status);
Status InstallCompactionResults(InstrumentedMutex* db_mutex,
const MutableCFOptions& mutable_cf_options);
SequenceNumber findEarliestVisibleSnapshot(
SequenceNumber in, const std::vector<SequenceNumber>& snapshots,
SequenceNumber* prev_snapshot);
const Status& input_status,
SubCompactionState* sub_compact);
Status FinishCompactionOutputFile(const Status& input_status,
SubCompactionState* sub_compact);
Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options,
InstrumentedMutex* db_mutex);
SequenceNumber findEarliestVisibleSnapshot(SequenceNumber in,
SequenceNumber* prev_snapshot);
void RecordCompactionIOStats();
Status OpenCompactionOutputFile();
void CleanupCompaction(const Status& status);
Status OpenCompactionOutputFile(SubCompactionState* sub_compact);
void CleanupCompaction();
void UpdateCompactionJobStats(
const InternalStats::CompactionStats& stats) const;
void RecordDroppedKeys(int64_t* key_drop_user,
int64_t* key_drop_newer_entry,
int64_t* key_drop_obsolete);
int64_t* key_drop_obsolete,
CompactionJobStats* compaction_job_stats = nullptr);
void UpdateCompactionStats();
void UpdateCompactionInputStatsHelper(
int* num_files, uint64_t* bytes_read, int input_level);
void LogCompaction(ColumnFamilyData* cfd, Compaction* compaction);
void LogCompaction();
int job_id_;
@ -126,9 +129,6 @@ class CompactionJob {
CompactionJobStats* compaction_job_stats_;
bool bottommost_level_;
SequenceNumber earliest_snapshot_;
SequenceNumber visible_at_tip_;
SequenceNumber latest_snapshot_;
InternalStats::CompactionStats compaction_stats_;

@ -253,8 +253,8 @@ class CompactionJobTest : public testing::Test {
s = compaction_job.Run();
ASSERT_OK(s);
mutex_.Lock();
compaction_job.Install(&s, *cfd->GetLatestMutableCFOptions(), &mutex_);
ASSERT_OK(s);
ASSERT_OK(compaction_job.Install(*cfd->GetLatestMutableCFOptions(),
&mutex_));
mutex_.Unlock();
ASSERT_GE(compaction_job_stats_.elapsed_micros, 0U);

@ -1687,10 +1687,10 @@ Status DBImpl::CompactFilesImpl(
compaction_job.Prepare();
mutex_.Unlock();
Status status = compaction_job.Run();
compaction_job.Run();
mutex_.Lock();
compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_);
Status status = compaction_job.Install(*c->mutable_cf_options(), &mutex_);
if (status.ok()) {
InstallSuperVersionAndScheduleWorkWrapper(
c->column_family_data(), job_context, *c->mutable_cf_options());
@ -2689,11 +2689,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
compaction_job.Prepare();
mutex_.Unlock();
status = compaction_job.Run();
compaction_job.Run();
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
mutex_.Lock();
compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_);
status = compaction_job.Install(*c->mutable_cf_options(), &mutex_);
if (status.ok()) {
InstallSuperVersionAndScheduleWorkWrapper(
c->column_family_data(), job_context, *c->mutable_cf_options());

@ -12,6 +12,8 @@ namespace rocksdb {
struct CompactionJobStats {
CompactionJobStats() { Reset(); }
void Reset();
// Aggregate the CompactionJobStats from another instance with this one
void Add(const CompactionJobStats& stats);
// the elapsed time in micro of this compaction.
uint64_t elapsed_micros;

@ -226,6 +226,12 @@ DEFINE_int32(set_in_place_one_in, 0,
DEFINE_int64(cache_size, 2 * KB * KB * KB,
"Number of bytes to use as a cache of uncompressed data.");
DEFINE_uint64(subcompactions, 1,
"Maximum number of subcompactions to divide L0-L1 compactions "
"into.");
static const bool FLAGS_subcompactions_dummy __attribute__((unused)) =
RegisterFlagValidator(&FLAGS_subcompactions, &ValidateUint32Range);
static bool ValidateInt32Positive(const char* flagname, int32_t value) {
if (value < 0) {
fprintf(stderr, "Invalid value for --%s: %d, must be >=0\n",
@ -1877,6 +1883,7 @@ class StressTest {
options_.max_manifest_file_size = 10 * 1024;
options_.filter_deletes = FLAGS_filter_deletes;
options_.inplace_update_support = FLAGS_in_place_update;
options_.num_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
if ((FLAGS_prefix_size == 0) == (FLAGS_rep_factory == kHashSkipList)) {
fprintf(stderr,
"prefix_size should be non-zero iff memtablerep == prefix_hash\n");

@ -40,6 +40,35 @@ void CompactionJobStats::Reset() {
file_prepare_write_nanos = 0;
}
void CompactionJobStats::Add(const CompactionJobStats& stats) {
elapsed_micros += stats.elapsed_micros;
num_input_records += stats.num_input_records;
num_input_files += stats.num_input_files;
num_input_files_at_output_level += stats.num_input_files_at_output_level;
num_output_records += stats.num_output_records;
num_output_files += stats.num_output_files;
total_input_bytes += stats.total_input_bytes;
total_output_bytes += stats.total_output_bytes;
num_records_replaced += stats.num_records_replaced;
total_input_raw_key_bytes += stats.total_input_raw_key_bytes;
total_input_raw_value_bytes += stats.total_input_raw_value_bytes;
num_input_deletion_records += stats.num_input_deletion_records;
num_expired_deletion_records += stats.num_expired_deletion_records;
num_corrupt_keys += stats.num_corrupt_keys;
file_write_nanos += stats.file_write_nanos;
file_range_sync_nanos += stats.file_range_sync_nanos;
file_fsync_nanos += stats.file_fsync_nanos;
file_prepare_write_nanos += stats.file_prepare_write_nanos;
}
#else
void CompactionJobStats::Reset() {}

Loading…
Cancel
Save