Fix data race issue when sub-compaction is used in CompactionJob

Summary:
When subcompaction is used, all subcompactions share the same Compaction
pointer in CompactionJob while each subcompaction all keeps their mutable
stats in SubcompactionState.  However, there're still some mutable part
that is currently store in the shared Compaction pointer.

This patch makes two changes:

1. Make the shared Compaction pointer const so that it can never be modified
   during the compaction.
2. Move necessary states from Compaction to SubcompactionState.
3. Make functions of Compaction const if the function does not modify
   its internal state.

Test Plan: rocksdb and MyRocks test

Reviewers: sdong, kradhakrishnan, andrewkr, IslamAbdelRahman

Reviewed By: IslamAbdelRahman

Subscribers: andrewkr, dhruba, yoshinorim, gunnarku, leveldb

Differential Revision: https://reviews.facebook.net/D55923
main
Yueh-Hsuan Chiang 9 years ago
parent e3802531f1
commit be9816b3d9
  1. 31
      db/compaction.cc
  2. 32
      db/compaction.h
  3. 2
      db/compaction_iterator.cc
  4. 4
      db/compaction_iterator.h
  5. 52
      db/compaction_job.cc
  6. 2
      db/version_set.cc
  7. 2
      db/version_set.h

@ -162,9 +162,6 @@ Compaction::Compaction(VersionStorageInfo* vstorage,
deletion_compaction_(_deletion_compaction),
inputs_(std::move(_inputs)),
grandparents_(std::move(_grandparents)),
grandparent_index_(0),
seen_key_(false),
overlapped_bytes_(0),
score_(_score),
bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)),
is_full_compaction_(IsFullCompaction(vstorage, inputs_)),
@ -290,32 +287,6 @@ bool Compaction::KeyNotExistsBeyondOutputLevel(
return true;
}
bool Compaction::ShouldStopBefore(const Slice& internal_key) {
// Scan to find earliest grandparent file that contains key.
const InternalKeyComparator* icmp = &cfd_->internal_comparator();
while (grandparent_index_ < grandparents_.size() &&
icmp->Compare(internal_key,
grandparents_[grandparent_index_]->largest.Encode()) > 0) {
if (seen_key_) {
overlapped_bytes_ += grandparents_[grandparent_index_]->fd.GetFileSize();
}
assert(grandparent_index_ + 1 >= grandparents_.size() ||
icmp->Compare(grandparents_[grandparent_index_]->largest.Encode(),
grandparents_[grandparent_index_+1]->smallest.Encode())
< 0);
grandparent_index_++;
}
seen_key_ = true;
if (overlapped_bytes_ > max_grandparent_overlap_bytes_) {
// Too much overlap for current output; start new output
overlapped_bytes_ = 0;
return true;
} else {
return false;
}
}
// Mark (or clear) each file that is being compacted
void Compaction::MarkFilesBeingCompacted(bool mark_as_compacted) {
for (size_t i = 0; i < num_input_levels(); i++) {
@ -421,7 +392,7 @@ void Compaction::Summary(char* output, int len) {
snprintf(output + write, len - write, "]");
}
uint64_t Compaction::OutputFilePreallocationSize() {
uint64_t Compaction::OutputFilePreallocationSize() const {
uint64_t preallocation_size = 0;
if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel ||

@ -104,7 +104,7 @@ class Compaction {
}
// Returns the LevelFilesBrief of the specified compaction input level.
LevelFilesBrief* input_levels(size_t compaction_input_level) {
const LevelFilesBrief* input_levels(size_t compaction_input_level) const {
return &input_levels_[compaction_input_level];
}
@ -132,10 +132,6 @@ class Compaction {
bool KeyNotExistsBeyondOutputLevel(const Slice& user_key,
std::vector<size_t>* level_ptrs) const;
// Returns true iff we should stop building the current output
// before processing "internal_key".
bool ShouldStopBefore(const Slice& internal_key);
// Clear all files to indicate that they are not being compacted
// Delete this compaction from the list of running compactions.
//
@ -151,13 +147,13 @@ class Compaction {
double score() const { return score_; }
// Is this compaction creating a file in the bottom most level?
bool bottommost_level() { return bottommost_level_; }
bool bottommost_level() const { return bottommost_level_; }
// Does this compaction include all sst files?
bool is_full_compaction() { return is_full_compaction_; }
bool is_full_compaction() const { return is_full_compaction_; }
// Was this compaction triggered manually by the client?
bool is_manual_compaction() { return is_manual_compaction_; }
bool is_manual_compaction() const { return is_manual_compaction_; }
// Used when allow_trivial_move option is set in
// Universal compaction. If all the input files are
@ -170,19 +166,21 @@ class Compaction {
// Used when allow_trivial_move option is set in
// Universal compaction. Returns true, if the input files
// are non-overlapping and can be trivially moved.
bool is_trivial_move() { return is_trivial_move_; }
bool is_trivial_move() const { return is_trivial_move_; }
// How many total levels are there?
int number_levels() const { return number_levels_; }
// Return the MutableCFOptions that should be used throughout the compaction
// procedure
const MutableCFOptions* mutable_cf_options() { return &mutable_cf_options_; }
const MutableCFOptions* mutable_cf_options() const {
return &mutable_cf_options_;
}
// Returns the size in bytes that the output file should be preallocated to.
// In level compaction, that is max_file_size_. In universal compaction, that
// is the sum of all input file sizes.
uint64_t OutputFilePreallocationSize();
uint64_t OutputFilePreallocationSize() const;
void SetInputVersion(Version* input_version);
@ -225,6 +223,14 @@ class Compaction {
CompactionReason compaction_reason() { return compaction_reason_; }
const std::vector<FileMetaData*>& grandparents() const {
return grandparents_;
}
uint64_t max_grandparent_overlap_bytes() const {
return max_grandparent_overlap_bytes_;
}
private:
// mark (or clear) all files that are being compacted
void MarkFilesBeingCompacted(bool mark_as_compacted);
@ -268,10 +274,6 @@ class Compaction {
// State used to check for number of of overlapping grandparent files
// (grandparent == "output_level_ + 1")
std::vector<FileMetaData*> grandparents_;
size_t grandparent_index_; // Index in grandparent_starts_
bool seen_key_; // Some output key has been seen
uint64_t overlapped_bytes_; // Bytes of overlap between current output
// and grandparent files
const double score_; // score that was used to pick this compaction.
// Is this compaction creating a file in the bottom most level?

@ -14,7 +14,7 @@ CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot, Env* env,
bool expect_valid_internal_key, Compaction* compaction,
bool expect_valid_internal_key, const Compaction* compaction,
const CompactionFilter* compaction_filter, LogBuffer* log_buffer)
: input_(input),
cmp_(cmp),

@ -42,7 +42,7 @@ class CompactionIterator {
std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot, Env* env,
bool expect_valid_internal_key,
Compaction* compaction = nullptr,
const Compaction* compaction = nullptr,
const CompactionFilter* compaction_filter = nullptr,
LogBuffer* log_buffer = nullptr);
@ -92,7 +92,7 @@ class CompactionIterator {
const SequenceNumber earliest_write_conflict_snapshot_;
Env* env_;
bool expect_valid_internal_key_;
Compaction* compaction_;
const Compaction* compaction_;
const CompactionFilter* compaction_filter_;
LogBuffer* log_buffer_;
bool bottommost_level_;

@ -63,7 +63,7 @@ namespace rocksdb {
// Maintains state for each sub-compaction
struct CompactionJob::SubcompactionState {
Compaction* compaction;
const Compaction* compaction;
std::unique_ptr<CompactionIterator> c_iter;
// The boundaries of the key-range this compaction is interested in. No two
@ -104,6 +104,14 @@ struct CompactionJob::SubcompactionState {
uint64_t num_output_records;
CompactionJobStats compaction_job_stats;
uint64_t approx_size;
// An index that used to speed up Compaction::ShouldStopBefore().
size_t grandparent_index = 0;
// The number of bytes overlapping between the current output and
// grandparent files used in Compaction::ShouldStopBefore().
uint64_t overlapped_bytes = 0;
// A flag determine whether the key has been seen in
// Compaction::ShouldStopBefore()
bool seen_key = false;
SubcompactionState(Compaction* c, Slice* _start, Slice* _end,
uint64_t size = 0)
@ -115,7 +123,10 @@ struct CompactionJob::SubcompactionState {
total_bytes(0),
num_input_records(0),
num_output_records(0),
approx_size(size) {
approx_size(size),
grandparent_index(0),
overlapped_bytes(0),
seen_key(false) {
assert(compaction != nullptr);
}
@ -134,6 +145,9 @@ struct CompactionJob::SubcompactionState {
num_output_records = std::move(o.num_output_records);
compaction_job_stats = std::move(o.compaction_job_stats);
approx_size = std::move(o.approx_size);
grandparent_index = std::move(o.grandparent_index);
overlapped_bytes = std::move(o.overlapped_bytes);
seen_key = std::move(o.seen_key);
return *this;
}
@ -141,6 +155,38 @@ struct CompactionJob::SubcompactionState {
SubcompactionState(const SubcompactionState&) = delete;
SubcompactionState& operator=(const SubcompactionState&) = delete;
// Returns true iff we should stop building the current output
// before processing "internal_key".
bool ShouldStopBefore(const Slice& internal_key) {
const InternalKeyComparator* icmp =
&compaction->column_family_data()->internal_comparator();
const std::vector<FileMetaData*>& grandparents = compaction->grandparents();
// Scan to find earliest grandparent file that contains key.
while (grandparent_index < grandparents.size() &&
icmp->Compare(internal_key,
grandparents[grandparent_index]->largest.Encode()) >
0) {
if (seen_key) {
overlapped_bytes += grandparents[grandparent_index]->fd.GetFileSize();
}
assert(grandparent_index + 1 >= grandparents.size() ||
icmp->Compare(
grandparents[grandparent_index]->largest.Encode(),
grandparents[grandparent_index + 1]->smallest.Encode()) < 0);
grandparent_index++;
}
seen_key = true;
if (overlapped_bytes > compaction->max_grandparent_overlap_bytes()) {
// Too much overlap for current output; start new output
overlapped_bytes = 0;
return true;
}
return false;
}
};
// Maintains state for the entire compaction
@ -669,7 +715,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
if (end != nullptr &&
cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) {
break;
} else if (sub_compact->compaction->ShouldStopBefore(key) &&
} else if (sub_compact->ShouldStopBefore(key) &&
sub_compact->builder != nullptr) {
status = FinishCompactionOutputFile(input->status(), sub_compact);
if (!status.ok()) {

@ -3240,7 +3240,7 @@ void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
}
}
InternalIterator* VersionSet::MakeInputIterator(Compaction* c) {
InternalIterator* VersionSet::MakeInputIterator(const Compaction* c) {
auto cfd = c->column_family_data();
ReadOptions read_options;
read_options.verify_checksums =

@ -671,7 +671,7 @@ class VersionSet {
// Create an iterator that reads over the compaction inputs for "*c".
// The caller should delete the iterator when no longer needed.
InternalIterator* MakeInputIterator(Compaction* c);
InternalIterator* MakeInputIterator(const Compaction* c);
// Add all files listed in any live version to *live.
void AddLiveFiles(std::vector<FileDescriptor>* live_list);

Loading…
Cancel
Save