Cleanup CompactionJob

Summary:
Couple changes:
1. instead of SnapshotList, just take a vector of snapshots
2. don't take a separate parameter is_snapshots_supported. If there are snapshots in the list, that means they are supported. I actually think we should get rid of this notion of snapshots not being supported.
3. don't pass in mutable_cf_options as a parameter. Lifetime of mutable_cf_options is a bit tricky to maintain, so it's better to not pass it in for the whole compaction job. We only really need it when we install the compaction results.

Test Plan: make check

Reviewers: sdong, rven, yhchiang

Reviewed By: yhchiang

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D36627
main
Igor Canadi 10 years ago
parent df4130ad85
commit 65fe1cfbb3
  1. 5
      db/compaction.cc
  2. 2
      db/compaction.h
  3. 61
      db/compaction_job.cc
  4. 28
      db/compaction_job.h
  5. 13
      db/compaction_job_test.cc
  6. 26
      db/db_impl.cc
  7. 12
      db/snapshot.h

@ -322,12 +322,11 @@ void Compaction::Summary(char* output, int len) {
snprintf(output + write, len - write, "]");
}
uint64_t Compaction::OutputFilePreallocationSize(
const MutableCFOptions& mutable_options) {
uint64_t Compaction::OutputFilePreallocationSize() {
uint64_t preallocation_size = 0;
if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
preallocation_size = mutable_options.MaxFileSizeForLevel(output_level());
preallocation_size = max_output_file_size_;
} else {
for (size_t level_iter = 0; level_iter < num_input_levels(); ++level_iter) {
for (const auto& f : inputs_[level_iter].files) {

@ -163,7 +163,7 @@ class Compaction {
// Returns the size in bytes that the output file should be preallocated to.
// In level compaction, that is max_file_size_. In universal compaction, that
// is the sum of all input file sizes.
uint64_t OutputFilePreallocationSize(const MutableCFOptions& mutable_options);
uint64_t OutputFilePreallocationSize();
void SetInputVersion(Version* input_version);

@ -58,12 +58,6 @@ namespace rocksdb {
struct CompactionJob::CompactionState {
Compaction* const compaction;
// If there were two snapshots with seq numbers s1 and
// s2 and s1 < s2, and if we find two instances of a key k1 then lies
// entirely within s1 and s2, then the earlier version of k1 can be safely
// deleted because that version is not visible in any snapshot.
std::vector<SequenceNumber> existing_snapshots;
// Files produced by compaction
struct Output {
uint64_t number;
@ -204,17 +198,17 @@ struct CompactionJob::CompactionState {
CompactionJob::CompactionJob(
int job_id, Compaction* compaction, const DBOptions& db_options,
const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
VersionSet* versions, std::atomic<bool>* shutting_down,
LogBuffer* log_buffer, Directory* db_directory, Directory* output_directory,
Statistics* stats, SnapshotList* snapshots, bool is_snapshot_supported,
const EnvOptions& env_options, VersionSet* versions,
std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
Directory* db_directory, Directory* output_directory, Statistics* stats,
std::vector<SequenceNumber> existing_snapshots,
std::shared_ptr<Cache> table_cache,
std::function<uint64_t()> yield_callback, EventLogger* event_logger)
std::function<uint64_t()> yield_callback, EventLogger* event_logger,
bool paranoid_file_checks)
: job_id_(job_id),
compact_(new CompactionState(compaction)),
compaction_stats_(1),
db_options_(db_options),
mutable_cf_options_(mutable_cf_options),
env_options_(env_options),
env_(db_options.env),
versions_(versions),
@ -223,13 +217,12 @@ CompactionJob::CompactionJob(
db_directory_(db_directory),
output_directory_(output_directory),
stats_(stats),
snapshots_(snapshots),
is_snapshot_supported_(is_snapshot_supported),
existing_snapshots_(std::move(existing_snapshots)),
table_cache_(std::move(table_cache)),
yield_callback_(std::move(yield_callback)),
event_logger_(event_logger) {
ThreadStatusUtil::SetColumnFamily(
compact_->compaction->column_family_data());
event_logger_(event_logger),
paranoid_file_checks_(paranoid_file_checks) {
ThreadStatusUtil::SetColumnFamily(compact_->compaction->column_family_data());
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
}
@ -256,18 +249,16 @@ void CompactionJob::Prepare() {
visible_at_tip_ = 0;
latest_snapshot_ = 0;
// TODO(icanadi) move snapshots_ out of CompactionJob
snapshots_->getAll(compact_->existing_snapshots);
if (compact_->existing_snapshots.size() == 0) {
if (existing_snapshots_.size() == 0) {
// optimize for fast path if there are no snapshots
visible_at_tip_ = versions_->LastSequence();
earliest_snapshot_ = visible_at_tip_;
} else {
latest_snapshot_ = compact_->existing_snapshots.back();
latest_snapshot_ = existing_snapshots_.back();
// Add the current seqno as the 'latest' virtual
// snapshot to the end of this list.
compact_->existing_snapshots.push_back(versions_->LastSequence());
earliest_snapshot_ = compact_->existing_snapshots[0];
existing_snapshots_.push_back(versions_->LastSequence());
earliest_snapshot_ = existing_snapshots_[0];
}
// Is this compaction producing files at the bottommost level?
@ -509,7 +500,9 @@ Status CompactionJob::Run() {
return status;
}
void CompactionJob::Install(Status* status, InstrumentedMutex* db_mutex) {
void CompactionJob::Install(Status* status,
const MutableCFOptions& mutable_cf_options,
InstrumentedMutex* db_mutex) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_INSTALL);
db_mutex->AssertHeld();
@ -518,7 +511,7 @@ void CompactionJob::Install(Status* status, InstrumentedMutex* db_mutex) {
compact_->compaction->output_level(), compaction_stats_);
if (status->ok()) {
*status = InstallCompactionResults(db_mutex);
*status = InstallCompactionResults(db_mutex, mutable_cf_options);
}
VersionStorageInfo::LevelSummaryStorage tmp;
auto vstorage = cfd->current()->storage_info();
@ -716,11 +709,8 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
SequenceNumber visible =
visible_at_tip_
? visible_at_tip_
: is_snapshot_supported_
? findEarliestVisibleSnapshot(ikey.sequence,
compact_->existing_snapshots,
&prev_snapshot)
: 0;
: findEarliestVisibleSnapshot(ikey.sequence, existing_snapshots_,
&prev_snapshot);
if (visible_in_snapshot == visible) {
// If the earliest snapshot is which this key is visible in
@ -1018,7 +1008,7 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) {
ReadOptions(), env_options_, cfd->internal_comparator(), fd);
s = iter->status();
if (s.ok() && mutable_cf_options_.paranoid_file_checks) {
if (s.ok() && paranoid_file_checks_) {
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {}
s = iter->status();
}
@ -1039,7 +1029,8 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) {
return s;
}
Status CompactionJob::InstallCompactionResults(InstrumentedMutex* db_mutex) {
Status CompactionJob::InstallCompactionResults(
InstrumentedMutex* db_mutex, const MutableCFOptions& mutable_cf_options) {
db_mutex->AssertHeld();
auto* compaction = compact_->compaction;
@ -1074,7 +1065,7 @@ Status CompactionJob::InstallCompactionResults(InstrumentedMutex* db_mutex) {
out.smallest, out.largest, out.smallest_seqno, out.largest_seqno);
}
return versions_->LogAndApply(compaction->column_family_data(),
mutable_cf_options_, compaction->edit(),
mutable_cf_options, compaction->edit(),
db_mutex, db_directory_);
}
@ -1142,8 +1133,8 @@ Status CompactionJob::OpenCompactionOutputFile() {
compact_->outputs.push_back(out);
compact_->outfile->SetIOPriority(Env::IO_LOW);
compact_->outfile->SetPreallocationBlockSize(static_cast<size_t>(
compact_->compaction->OutputFilePreallocationSize(mutable_cf_options_)));
compact_->outfile->SetPreallocationBlockSize(
static_cast<size_t>(compact_->compaction->OutputFilePreallocationSize()));
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
bool skip_filters = false;

@ -19,7 +19,6 @@
#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/snapshot.h"
#include "db/column_family.h"
#include "db/version_edit.h"
#include "db/memtable_list.h"
@ -51,18 +50,15 @@ class Arena;
class CompactionJob {
public:
// TODO(icanadi) make effort to reduce number of parameters here
// IMPORTANT: mutable_cf_options needs to be alive while CompactionJob is
// alive
CompactionJob(int job_id, Compaction* compaction, const DBOptions& db_options,
const MutableCFOptions& mutable_cf_options,
const EnvOptions& env_options, VersionSet* versions,
std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
Directory* db_directory, Directory* output_directory,
Statistics* stats, SnapshotList* snapshot_list,
bool is_snapshot_supported, std::shared_ptr<Cache> table_cache,
Statistics* stats,
std::vector<SequenceNumber> existing_snapshots,
std::shared_ptr<Cache> table_cache,
std::function<uint64_t()> yield_callback,
EventLogger* event_logger);
EventLogger* event_logger, bool paranoid_file_checks);
~CompactionJob();
@ -77,7 +73,8 @@ class CompactionJob {
Status Run();
// REQUIRED: mutex held
// status is the return of Run()
void Install(Status* status, InstrumentedMutex* db_mutex);
void Install(Status* status, const MutableCFOptions& mutable_cf_options,
InstrumentedMutex* db_mutex);
private:
void AllocateCompactionOutputFileNumbers();
@ -89,7 +86,8 @@ class CompactionJob {
void CallCompactionFilterV2(CompactionFilterV2* compaction_filter_v2,
uint64_t* time);
Status FinishCompactionOutputFile(Iterator* input);
Status InstallCompactionResults(InstrumentedMutex* db_mutex);
Status InstallCompactionResults(InstrumentedMutex* db_mutex,
const MutableCFOptions& mutable_cf_options);
SequenceNumber findEarliestVisibleSnapshot(
SequenceNumber in, const std::vector<SequenceNumber>& snapshots,
SequenceNumber* prev_snapshot);
@ -112,7 +110,6 @@ class CompactionJob {
// DBImpl state
const DBOptions& db_options_;
const MutableCFOptions& mutable_cf_options_;
const EnvOptions& env_options_;
Env* env_;
VersionSet* versions_;
@ -121,14 +118,19 @@ class CompactionJob {
Directory* db_directory_;
Directory* output_directory_;
Statistics* stats_;
SnapshotList* snapshots_;
bool is_snapshot_supported_;
// If there were two snapshots with seq numbers s1 and
// s2 and s1 < s2, and if we find two instances of a key k1 then lies
// entirely within s1 and s2, then the earlier version of k1 can be safely
// deleted because that version is not visible in any snapshot.
std::vector<SequenceNumber> existing_snapshots_;
std::shared_ptr<Cache> table_cache_;
// yield callback
std::function<uint64_t()> yield_callback_;
EventLogger* event_logger_;
bool paranoid_file_checks_;
};
} // namespace rocksdb

@ -155,7 +155,6 @@ TEST_F(CompactionJobTest, Simple) {
{compaction_input_files}, 1, 1024 * 1024, 10, 0, kNoCompression, {}));
compaction->SetInputVersion(cfd->current());
SnapshotList snapshots;
int yield_callback_called = 0;
std::function<uint64_t()> yield_callback = [&]() {
yield_callback_called++;
@ -164,17 +163,17 @@ TEST_F(CompactionJobTest, Simple) {
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
mutex_.Lock();
EventLogger event_logger(db_options_.info_log.get());
CompactionJob compaction_job(
0, compaction.get(), db_options_, *cfd->GetLatestMutableCFOptions(),
env_options_, versions_.get(), &shutting_down_, &log_buffer, nullptr,
nullptr, nullptr, &snapshots, true, table_cache_,
std::move(yield_callback), &event_logger);
CompactionJob compaction_job(0, compaction.get(), db_options_, env_options_,
versions_.get(), &shutting_down_, &log_buffer,
nullptr, nullptr, nullptr, {}, table_cache_,
std::move(yield_callback), &event_logger, false);
compaction_job.Prepare();
mutex_.Unlock();
ASSERT_OK(compaction_job.Run());
mutex_.Lock();
Status s;
compaction_job.Install(&s, &mutex_);
compaction_job.Install(&s, *cfd->GetLatestMutableCFOptions(), &mutex_);
ASSERT_OK(s);
mutex_.Unlock();

@ -1482,18 +1482,19 @@ Status DBImpl::CompactFilesImpl(
c->column_family_data(), *c->mutable_cf_options(),
job_context, log_buffer);
};
assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJob compaction_job(
job_context->job_id, c.get(), db_options_, *c->mutable_cf_options(),
env_options_, versions_.get(), &shutting_down_, log_buffer,
directories_.GetDbDir(), directories_.GetDataDir(c->GetOutputPathId()),
stats_, &snapshots_, is_snapshot_supported_, table_cache_,
std::move(yield_callback), &event_logger_);
job_context->job_id, c.get(), db_options_, env_options_, versions_.get(),
&shutting_down_, log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->GetOutputPathId()), stats_,
snapshots_.GetAll(), table_cache_, std::move(yield_callback),
&event_logger_, c->mutable_cf_options()->paranoid_file_checks);
compaction_job.Prepare();
mutex_.Unlock();
Status status = compaction_job.Run();
mutex_.Lock();
compaction_job.Install(&status, &mutex_);
compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_);
if (status.ok()) {
InstallSuperVersionBackground(c->column_family_data(), job_context,
*c->mutable_cf_options());
@ -2357,17 +2358,18 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
*c->mutable_cf_options(), job_context,
log_buffer);
};
assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJob compaction_job(
job_context->job_id, c.get(), db_options_, *c->mutable_cf_options(),
env_options_, versions_.get(), &shutting_down_, log_buffer,
directories_.GetDbDir(), directories_.GetDataDir(c->GetOutputPathId()),
stats_, &snapshots_, is_snapshot_supported_, table_cache_,
std::move(yield_callback), &event_logger_);
job_context->job_id, c.get(), db_options_, env_options_,
versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->GetOutputPathId()), stats_,
snapshots_.GetAll(), table_cache_, std::move(yield_callback),
&event_logger_, c->mutable_cf_options()->paranoid_file_checks);
compaction_job.Prepare();
mutex_.Unlock();
status = compaction_job.Run();
mutex_.Lock();
compaction_job.Install(&status, &mutex_);
compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_);
if (status.ok()) {
InstallSuperVersionBackground(c->column_family_data(), job_context,
*c->mutable_cf_options());

@ -8,6 +8,8 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <vector>
#include "rocksdb/db.h"
namespace rocksdb {
@ -69,13 +71,17 @@ class SnapshotList {
}
// retrieve all snapshot numbers. They are sorted in ascending order.
void getAll(std::vector<SequenceNumber>& ret) {
if (empty()) return;
std::vector<SequenceNumber> GetAll() {
std::vector<SequenceNumber> ret;
if (empty()) {
return ret;
}
SnapshotImpl* s = &list_;
while (s->next_ != &list_) {
ret.push_back(s->next_->number_);
s = s ->next_;
s = s->next_;
}
return ret;
}
// get the sequence number of the most recent snapshot

Loading…
Cancel
Save