Get rid of mutex in CompactionJob's state

Summary: Based on @sdong's feedback in the diff, we shouldn't keep db_mutex in CompactionJob's state. This diff removes db_mutex from CompactionJob state, by making next_file_number_ atomic. That way we only need to pass the lock to InstallCompactionResults() because of LogAndApply()

Test Plan: make check

Reviewers: ljin, yhchiang, rven, sdong

Reviewed By: sdong

Subscribers: sdong, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D28491
main
Igor Canadi 10 years ago
parent 344edbb044
commit e3d3567b5b
  1. 57
      db/compaction_job.cc
  2. 13
      db/compaction_job.h
  3. 26
      db/db_impl.cc
  4. 28
      db/version_set.cc
  5. 16
      db/version_set.h

@ -71,7 +71,6 @@ struct CompactionJob::CompactionState {
SequenceNumber smallest_seqno, largest_seqno;
};
std::vector<Output> outputs;
std::list<uint64_t> allocated_file_numbers;
// State kept for output being generated
std::unique_ptr<WritableFile> outfile;
@ -204,10 +203,10 @@ struct CompactionJob::CompactionState {
CompactionJob::CompactionJob(
Compaction* compaction, const DBOptions& db_options,
const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
VersionSet* versions, port::Mutex* db_mutex,
std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
Directory* db_directory, Statistics* stats, SnapshotList* snapshots,
bool is_snapshot_supported, std::shared_ptr<Cache> table_cache,
VersionSet* versions, std::atomic<bool>* shutting_down,
LogBuffer* log_buffer, Directory* db_directory, Statistics* stats,
SnapshotList* snapshots, bool is_snapshot_supported,
std::shared_ptr<Cache> table_cache,
std::function<uint64_t()> yield_callback)
: compact_(new CompactionState(compaction)),
compaction_stats_(1),
@ -216,7 +215,6 @@ CompactionJob::CompactionJob(
env_options_(env_options),
env_(db_options.env),
versions_(versions),
db_mutex_(db_mutex),
shutting_down_(shutting_down),
log_buffer_(log_buffer),
db_directory_(db_directory),
@ -227,7 +225,6 @@ CompactionJob::CompactionJob(
yield_callback_(std::move(yield_callback)) {}
void CompactionJob::Prepare() {
db_mutex_->AssertHeld();
compact_->CleanupBatchBuffer();
compact_->CleanupMergedBuffer();
@ -267,9 +264,6 @@ void CompactionJob::Prepare() {
// Is this compaction producing files at the bottommost level?
bottommost_level_ = compact_->compaction->BottomMostLevel();
// Allocate the output file numbers before we release the lock
AllocateCompactionOutputFileNumbers();
}
Status CompactionJob::Run() {
@ -461,14 +455,14 @@ Status CompactionJob::Run() {
return status;
}
Status CompactionJob::Install(Status status) {
db_mutex_->AssertHeld();
Status CompactionJob::Install(Status status, port::Mutex* db_mutex) {
db_mutex->AssertHeld();
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
cfd->internal_stats()->AddCompactionStats(
compact_->compaction->output_level(), compaction_stats_);
if (status.ok()) {
status = InstallCompactionResults();
status = InstallCompactionResults(db_mutex);
}
VersionStorageInfo::LevelSummaryStorage tmp;
const auto& stats = compaction_stats_;
@ -496,19 +490,6 @@ Status CompactionJob::Install(Status status) {
return status;
}
// Allocate the file numbers for the output file. We allocate as
// many output file numbers as there are files in level+1 (at least one)
// Insert them into pending_outputs so that they do not get deleted.
void CompactionJob::AllocateCompactionOutputFileNumbers() {
db_mutex_->AssertHeld();
assert(compact_->builder == nullptr);
int filesNeeded = compact_->compaction->num_input_files(1);
for (int i = 0; i < std::max(filesNeeded, 1); i++) {
uint64_t file_number = versions_->NewFileNumber();
compact_->allocated_file_numbers.push_back(file_number);
}
}
Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
Iterator* input,
bool is_compaction_v2) {
@ -958,8 +939,8 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) {
return s;
}
Status CompactionJob::InstallCompactionResults() {
db_mutex_->AssertHeld();
Status CompactionJob::InstallCompactionResults(port::Mutex* db_mutex) {
db_mutex->AssertHeld();
// paranoia: verify that the files that we started with
// still exist in the current version and in the same original level.
@ -995,7 +976,7 @@ Status CompactionJob::InstallCompactionResults() {
}
return versions_->LogAndApply(
compact_->compaction->column_family_data(), mutable_cf_options_,
compact_->compaction->edit(), db_mutex_, db_directory_);
compact_->compaction->edit(), db_mutex, db_directory_);
}
// Given a sequence number, return the sequence number of the
@ -1036,21 +1017,8 @@ void CompactionJob::RecordCompactionIOStats() {
Status CompactionJob::OpenCompactionOutputFile() {
assert(compact_ != nullptr);
assert(compact_->builder == nullptr);
uint64_t file_number;
// If we have not yet exhausted the pre-allocated file numbers,
// then use the one from the front. Otherwise, we have to acquire
// the heavyweight lock and allocate a new file number.
if (!compact_->allocated_file_numbers.empty()) {
file_number = compact_->allocated_file_numbers.front();
compact_->allocated_file_numbers.pop_front();
} else {
db_mutex_->Lock();
// TODO(icanadi) make Versions::next_file_number_ atomic and remove db_lock
// around here. Once we do that, AllocateCompactionOutputFileNumbers() will
// not be needed.
file_number = versions_->NewFileNumber();
db_mutex_->Unlock();
}
// no need to lock because VersionSet::next_file_number_ is atomic
uint64_t file_number = versions_->NewFileNumber();
// Make the output file
std::string fname = TableFileName(db_options_.db_paths, file_number,
compact_->compaction->GetOutputPathId());
@ -1087,7 +1055,6 @@ Status CompactionJob::OpenCompactionOutputFile() {
}
void CompactionJob::CleanupCompaction(Status status) {
db_mutex_->AssertHeld();
if (compact_->builder != nullptr) {
// May happen if we get a shutdown call in the middle of compaction
compact_->builder->Abandon();

@ -56,10 +56,10 @@ class CompactionJob {
CompactionJob(Compaction* compaction, const DBOptions& db_options,
const MutableCFOptions& mutable_cf_options,
const EnvOptions& env_options, VersionSet* versions,
port::Mutex* db_mutex, std::atomic<bool>* shutting_down,
LogBuffer* log_buffer, Directory* db_directory,
Statistics* stats, SnapshotList* snapshot_list,
bool is_snapshot_supported, std::shared_ptr<Cache> table_cache,
std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
Directory* db_directory, Statistics* stats,
SnapshotList* snapshot_list, bool is_snapshot_supported,
std::shared_ptr<Cache> table_cache,
std::function<uint64_t()> yield_callback);
~CompactionJob() { assert(compact_ == nullptr); }
@ -75,7 +75,7 @@ class CompactionJob {
Status Run();
// REQUIRED: mutex held
// status is the return of Run()
Status Install(Status status);
Status Install(Status status, port::Mutex* db_mutex);
private:
void AllocateCompactionOutputFileNumbers();
@ -86,7 +86,7 @@ class CompactionJob {
// Call compaction_filter_v2->Filter() on kv-pairs in compact
void CallCompactionFilterV2(CompactionFilterV2* compaction_filter_v2);
Status FinishCompactionOutputFile(Iterator* input);
Status InstallCompactionResults();
Status InstallCompactionResults(port::Mutex* db_mutex);
SequenceNumber findEarliestVisibleSnapshot(
SequenceNumber in, const std::vector<SequenceNumber>& snapshots,
SequenceNumber* prev_snapshot);
@ -111,7 +111,6 @@ class CompactionJob {
const EnvOptions& env_options_;
Env* env_;
VersionSet* versions_;
port::Mutex* db_mutex_;
std::atomic<bool>* shutting_down_;
LogBuffer* log_buffer_;
Directory* db_directory_;

@ -836,7 +836,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(log_number);
versions_->MarkFileNumberUsedDuringRecovery(log_number);
// Open the log file
std::string fname = LogFileName(db_options_.wal_dir, log_number);
unique_ptr<SequentialFile> file;
@ -970,7 +970,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// not actually used. that is because VersionSet assumes
// VersionSet::next_file_number_ always to be strictly greater than any
// log number
versions_->MarkFileNumberUsed(max_log_number + 1);
versions_->MarkFileNumberUsedDuringRecovery(max_log_number + 1);
status = versions_->LogAndApply(
cfd, *cfd->GetLatestMutableCFOptions(), edit, &mutex_);
if (!status.ok()) {
@ -1285,18 +1285,18 @@ Status DBImpl::CompactFilesImpl(
*c->mutable_cf_options(), &job_context,
&log_buffer);
};
CompactionJob compaction_job(
c.get(), db_options_, *c->mutable_cf_options(), env_options_,
versions_.get(), &mutex_, &shutting_down_,
&log_buffer, db_directory_.get(), stats_, &snapshots_,
IsSnapshotSupported(), table_cache_, std::move(yield_callback));
CompactionJob compaction_job(c.get(), db_options_, *c->mutable_cf_options(),
env_options_, versions_.get(), &shutting_down_,
&log_buffer, db_directory_.get(), stats_,
&snapshots_, IsSnapshotSupported(), table_cache_,
std::move(yield_callback));
compaction_job.Prepare();
mutex_.Unlock();
Status status = compaction_job.Run();
mutex_.Lock();
if (status.ok()) {
status = compaction_job.Install(status);
status = compaction_job.Install(status, &mutex_);
if (status.ok()) {
InstallSuperVersionBackground(c->column_family_data(), &job_context,
*c->mutable_cf_options());
@ -2061,16 +2061,16 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
*c->mutable_cf_options(), job_context,
log_buffer);
};
CompactionJob compaction_job(
c.get(), db_options_, *c->mutable_cf_options(), env_options_,
versions_.get(), &mutex_, &shutting_down_, log_buffer,
db_directory_.get(), stats_, &snapshots_, IsSnapshotSupported(),
CompactionJob compaction_job(c.get(), db_options_, *c->mutable_cf_options(),
env_options_, versions_.get(), &shutting_down_,
log_buffer, db_directory_.get(), stats_,
&snapshots_, IsSnapshotSupported(),
table_cache_, std::move(yield_callback));
compaction_job.Prepare();
mutex_.Unlock();
status = compaction_job.Run();
mutex_.Lock();
status = compaction_job.Install(status);
status = compaction_job.Install(status, &mutex_);
if (status.ok()) {
InstallSuperVersionBackground(c->column_family_data(), job_context,
*c->mutable_cf_options());

@ -1613,7 +1613,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
if (!descriptor_log_ ||
manifest_file_size_ > db_options_->max_manifest_file_size) {
pending_manifest_file_number_ = NewFileNumber();
batch_edits.back()->SetNextFile(next_file_number_);
batch_edits.back()->SetNextFile(next_file_number_.load());
new_descriptor_log = true;
} else {
pending_manifest_file_number_ = manifest_file_number_;
@ -1814,7 +1814,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
assert(edit->IsColumnFamilyManipulation());
edit->SetNextFile(next_file_number_);
edit->SetNextFile(next_file_number_.load());
edit->SetLastSequence(last_sequence_);
if (edit->is_column_family_drop_) {
// if we drop column family, we have to make sure to save max column family,
@ -1831,13 +1831,13 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
if (edit->has_log_number_) {
assert(edit->log_number_ >= cfd->GetLogNumber());
assert(edit->log_number_ < next_file_number_);
assert(edit->log_number_ < next_file_number_.load());
}
if (!edit->has_prev_log_number_) {
edit->SetPrevLogNumber(prev_log_number_);
}
edit->SetNextFile(next_file_number_);
edit->SetNextFile(next_file_number_.load());
edit->SetLastSequence(last_sequence_);
builder->Apply(edit);
@ -2064,8 +2064,8 @@ Status VersionSet::Recover(
column_family_set_->UpdateMaxColumnFamily(max_column_family);
MarkFileNumberUsed(previous_log_number);
MarkFileNumberUsed(log_number);
MarkFileNumberUsedDuringRecovery(previous_log_number);
MarkFileNumberUsedDuringRecovery(log_number);
}
// there were some column families in the MANIFEST that weren't specified
@ -2105,7 +2105,7 @@ Status VersionSet::Recover(
}
manifest_file_size_ = current_manifest_file_size;
next_file_number_ = next_file + 1;
next_file_number_.store(next_file + 1);
last_sequence_ = last_sequence;
prev_log_number_ = previous_log_number;
@ -2116,7 +2116,7 @@ Status VersionSet::Recover(
"prev_log_number is %lu,"
"max_column_family is %u\n",
manifest_filename.c_str(), (unsigned long)manifest_file_number_,
(unsigned long)next_file_number_, (unsigned long)last_sequence_,
(unsigned long)next_file_number_.load(), (unsigned long)last_sequence_,
(unsigned long)log_number, (unsigned long)prev_log_number_,
column_family_set_->GetMaxColumnFamily());
@ -2452,14 +2452,14 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
delete v;
}
next_file_number_ = next_file + 1;
next_file_number_.store(next_file + 1);
last_sequence_ = last_sequence;
prev_log_number_ = previous_log_number;
printf(
"next_file_number %lu last_sequence "
"%lu prev_log_number %lu max_column_family %u\n",
(unsigned long)next_file_number_, (unsigned long)last_sequence,
(unsigned long)next_file_number_.load(), (unsigned long)last_sequence,
(unsigned long)previous_log_number,
column_family_set_->GetMaxColumnFamily());
}
@ -2468,9 +2468,11 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
}
#endif // ROCKSDB_LITE
void VersionSet::MarkFileNumberUsed(uint64_t number) {
if (next_file_number_ <= number) {
next_file_number_ = number + 1;
void VersionSet::MarkFileNumberUsedDuringRecovery(uint64_t number) {
// only called during recovery which is single threaded, so this works because
// there can't be concurrent calls
if (next_file_number_.load(std::memory_order_relaxed) <= number) {
next_file_number_.store(number + 1, std::memory_order_relaxed);
}
}

@ -532,19 +532,18 @@ class VersionSet {
return pending_manifest_file_number_;
}
// REQUIRED: mutex locked
uint64_t current_next_file_number() const { return next_file_number_; }
uint64_t current_next_file_number() const { return next_file_number_.load(); }
// Allocate and return a new file number
uint64_t NewFileNumber() { return next_file_number_++; }
uint64_t NewFileNumber() { return next_file_number_.fetch_add(1) + 1; }
// Arrange to reuse "file_number" unless a newer file number has
// already been allocated.
// REQUIRES: "file_number" was returned by a call to NewFileNumber().
void ReuseLogFileNumber(uint64_t file_number) {
if (next_file_number_ == file_number + 1) {
next_file_number_ = file_number;
}
auto expected = file_number + 1;
std::atomic_compare_exchange_strong(&next_file_number_, &expected,
file_number);
}
// Return the last sequence number.
@ -559,7 +558,8 @@ class VersionSet {
}
// Mark the specified file number as used.
void MarkFileNumberUsed(uint64_t number);
// REQUIRED: this is only called during single-threaded recovery
void MarkFileNumberUsedDuringRecovery(uint64_t number);
// Return the log file number for the log file that is currently
// being compacted, or zero if there is no such log file.
@ -636,7 +636,7 @@ class VersionSet {
Env* const env_;
const std::string dbname_;
const DBOptions* const db_options_;
uint64_t next_file_number_;
std::atomic<uint64_t> next_file_number_;
uint64_t manifest_file_number_;
uint64_t pending_manifest_file_number_;
std::atomic<uint64_t> last_sequence_;

Loading…
Cancel
Save