You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
rocksdb/db/compaction_job.cc

1226 lines
46 KiB

// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/compaction_job.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <algorithm>
#include <vector>
#include <memory>
#include <list>
#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/event_helpers.h"
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/merge_helper.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/version_set.h"
#include "port/port.h"
#include "port/likely.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/merger.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/logging.h"
#include "util/log_buffer.h"
#include "util/mutexlock.h"
#include "util/perf_context_imp.h"
#include "util/iostats_context_imp.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/thread_status_util.h"
namespace rocksdb {
struct CompactionJob::CompactionState {
Compaction* const compaction;
// Files produced by compaction
struct Output {
uint64_t number;
uint32_t path_id;
uint64_t file_size;
InternalKey smallest, largest;
SequenceNumber smallest_seqno, largest_seqno;
};
std::vector<Output> outputs;
// State kept for output being generated
std::unique_ptr<WritableFile> outfile;
std::unique_ptr<TableBuilder> builder;
uint64_t total_bytes;
Output* current_output() { return &outputs[outputs.size() - 1]; }
explicit CompactionState(Compaction* c)
: compaction(c),
total_bytes(0),
num_input_records(0),
num_output_records(0) {}
// Create a client visible context of this compaction
CompactionFilter::Context GetFilterContextV1() {
CompactionFilter::Context context;
context.is_full_compaction = compaction->IsFullCompaction();
context.is_manual_compaction = compaction->IsManualCompaction();
return context;
}
// Create a client visible context of this compaction
CompactionFilterContext GetFilterContext() {
CompactionFilterContext context;
context.is_full_compaction = compaction->IsFullCompaction();
context.is_manual_compaction = compaction->IsManualCompaction();
return context;
}
std::vector<std::string> key_str_buf_;
std::vector<std::string> existing_value_str_buf_;
// new_value_buf_ will only be appended if a value changes
std::vector<std::string> new_value_buf_;
// if values_changed_buf_[i] is true
// new_value_buf_ will add a new entry with the changed value
std::vector<bool> value_changed_buf_;
// to_delete_buf_[i] is true iff key_buf_[i] is deleted
std::vector<bool> to_delete_buf_;
std::vector<std::string> other_key_str_buf_;
std::vector<std::string> other_value_str_buf_;
std::vector<Slice> combined_key_buf_;
std::vector<Slice> combined_value_buf_;
std::string cur_prefix_;
uint64_t num_input_records;
uint64_t num_output_records;
// Buffers the kv-pair that will be run through compaction filter V2
// in the future.
void BufferKeyValueSlices(const Slice& key, const Slice& value) {
key_str_buf_.emplace_back(key.ToString());
existing_value_str_buf_.emplace_back(value.ToString());
}
// Buffers the kv-pair that will not be run through compaction filter V2
// in the future.
void BufferOtherKeyValueSlices(const Slice& key, const Slice& value) {
other_key_str_buf_.emplace_back(key.ToString());
other_value_str_buf_.emplace_back(value.ToString());
}
// Add a kv-pair to the combined buffer
void AddToCombinedKeyValueSlices(const Slice& key, const Slice& value) {
// The real strings are stored in the batch buffers
combined_key_buf_.emplace_back(key);
combined_value_buf_.emplace_back(value);
}
// Merging the two buffers
void MergeKeyValueSliceBuffer(const InternalKeyComparator* comparator) {
size_t i = 0;
size_t j = 0;
size_t total_size = key_str_buf_.size() + other_key_str_buf_.size();
combined_key_buf_.reserve(total_size);
combined_value_buf_.reserve(total_size);
while (i + j < total_size) {
int comp_res = 0;
if (i < key_str_buf_.size() && j < other_key_str_buf_.size()) {
comp_res = comparator->Compare(key_str_buf_[i], other_key_str_buf_[j]);
} else if (i >= key_str_buf_.size() && j < other_key_str_buf_.size()) {
comp_res = 1;
} else if (j >= other_key_str_buf_.size() && i < key_str_buf_.size()) {
comp_res = -1;
}
if (comp_res > 0) {
AddToCombinedKeyValueSlices(other_key_str_buf_[j],
other_value_str_buf_[j]);
j++;
} else if (comp_res < 0) {
AddToCombinedKeyValueSlices(key_str_buf_[i],
existing_value_str_buf_[i]);
i++;
}
}
}
void CleanupBatchBuffer() {
to_delete_buf_.clear();
key_str_buf_.clear();
existing_value_str_buf_.clear();
new_value_buf_.clear();
value_changed_buf_.clear();
to_delete_buf_.shrink_to_fit();
key_str_buf_.shrink_to_fit();
existing_value_str_buf_.shrink_to_fit();
new_value_buf_.shrink_to_fit();
value_changed_buf_.shrink_to_fit();
other_key_str_buf_.clear();
other_value_str_buf_.clear();
other_key_str_buf_.shrink_to_fit();
other_value_str_buf_.shrink_to_fit();
}
void CleanupMergedBuffer() {
combined_key_buf_.clear();
combined_value_buf_.clear();
combined_key_buf_.shrink_to_fit();
combined_value_buf_.shrink_to_fit();
}
};
CompactionJob::CompactionJob(
int job_id, Compaction* compaction, const DBOptions& db_options,
const EnvOptions& env_options, VersionSet* versions,
std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
Directory* db_directory, Directory* output_directory, Statistics* stats,
std::vector<SequenceNumber> existing_snapshots,
std::shared_ptr<Cache> table_cache,
std::function<uint64_t()> yield_callback, EventLogger* event_logger,
bool paranoid_file_checks)
: job_id_(job_id),
compact_(new CompactionState(compaction)),
compaction_stats_(1),
db_options_(db_options),
env_options_(env_options),
env_(db_options.env),
versions_(versions),
shutting_down_(shutting_down),
log_buffer_(log_buffer),
db_directory_(db_directory),
output_directory_(output_directory),
stats_(stats),
existing_snapshots_(std::move(existing_snapshots)),
table_cache_(std::move(table_cache)),
yield_callback_(std::move(yield_callback)),
event_logger_(event_logger),
paranoid_file_checks_(paranoid_file_checks) {
ThreadStatusUtil::SetColumnFamily(compact_->compaction->column_family_data());
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
ReportStartedCompaction(compaction);
}
CompactionJob::~CompactionJob() {
assert(compact_ == nullptr);
ThreadStatusUtil::ResetThreadStatus();
}
void CompactionJob::ReportStartedCompaction(
Compaction* compaction) {
ThreadStatusUtil::SetColumnFamily(
compact_->compaction->column_family_data());
ThreadStatusUtil::SetThreadOperationProperty(
ThreadStatus::COMPACTION_JOB_ID,
job_id_);
ThreadStatusUtil::SetThreadOperationProperty(
ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL,
(static_cast<uint64_t>(compact_->compaction->start_level()) << 32) +
compact_->compaction->output_level());
ThreadStatusUtil::SetThreadOperationProperty(
ThreadStatus::COMPACTION_PROP_FLAGS,
compaction->IsManualCompaction() +
(compaction->IsDeletionCompaction() << 1) +
(compaction->IsTrivialMove() << 2));
ThreadStatusUtil::SetThreadOperationProperty(
ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES,
compaction->CalculateTotalInputSize());
IOSTATS_RESET(bytes_written);
IOSTATS_RESET(bytes_read);
ThreadStatusUtil::SetThreadOperationProperty(
ThreadStatus::COMPACTION_BYTES_WRITTEN, 0);
ThreadStatusUtil::SetThreadOperationProperty(
ThreadStatus::COMPACTION_BYTES_READ, 0);
// Set the thread operation after operation properties
// to ensure GetThreadList() can always show them all together.
ThreadStatusUtil::SetThreadOperation(
ThreadStatus::OP_COMPACTION);
}
void CompactionJob::Prepare() {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_PREPARE);
compact_->CleanupBatchBuffer();
compact_->CleanupMergedBuffer();
// Generate file_levels_ for compaction berfore making Iterator
ColumnFamilyData* cfd __attribute__((unused)) =
compact_->compaction->column_family_data();
assert(cfd != nullptr);
assert(cfd->current()->storage_info()->NumLevelFiles(
compact_->compaction->level()) > 0);
assert(compact_->builder == nullptr);
assert(!compact_->outfile);
visible_at_tip_ = 0;
latest_snapshot_ = 0;
if (existing_snapshots_.size() == 0) {
// optimize for fast path if there are no snapshots
visible_at_tip_ = versions_->LastSequence();
earliest_snapshot_ = visible_at_tip_;
} else {
latest_snapshot_ = existing_snapshots_.back();
// Add the current seqno as the 'latest' virtual
// snapshot to the end of this list.
existing_snapshots_.push_back(versions_->LastSequence());
earliest_snapshot_ = existing_snapshots_[0];
}
// Is this compaction producing files at the bottommost level?
bottommost_level_ = compact_->compaction->BottomMostLevel();
}
Status CompactionJob::Run() {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_RUN);
TEST_SYNC_POINT("CompactionJob::Run():Start");
log_buffer_->FlushBufferToLog();
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
auto* compaction = compact_->compaction;
// Let's check if anything will get logged. Don't prepare all the info if
// we're not logging
if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
Compaction::InputLevelSummaryBuffer inputs_summary;
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Compacting %s, score %.2f", cfd->GetName().c_str(),
job_id_, compaction->InputLevelSummary(&inputs_summary),
compaction->score());
char scratch[2345];
compact_->compaction->Summary(scratch, sizeof(scratch));
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] Compaction start summary: %s\n", cfd->GetName().c_str(), scratch);
// build event logger report
auto stream = event_logger_->Log();
stream << "job" << job_id_ << "event"
<< "compaction_started";
for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
stream << ("files_L" + ToString(compaction->level(i)));
stream.StartArray();
for (auto f : *compaction->inputs(i)) {
stream << f->fd.GetNumber();
}
stream.EndArray();
}
stream << "score" << compaction->score() << "input_data_size"
<< compaction->CalculateTotalInputSize();
}
const uint64_t start_micros = env_->NowMicros();
std::unique_ptr<Iterator> input(
versions_->MakeInputIterator(compact_->compaction));
input->SeekToFirst();
Status status;
ParsedInternalKey ikey;
std::unique_ptr<CompactionFilterV2> compaction_filter_from_factory_v2 =
nullptr;
auto context = compact_->GetFilterContext();
compaction_filter_from_factory_v2 =
cfd->ioptions()->compaction_filter_factory_v2->CreateCompactionFilterV2(
context);
auto compaction_filter_v2 = compaction_filter_from_factory_v2.get();
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
if (!compaction_filter_v2) {
status = ProcessKeyValueCompaction(&imm_micros, input.get(), false);
} else {
// temp_backup_input always point to the start of the current buffer
// temp_backup_input = backup_input;
// iterate through input,
// 1) buffer ineligible keys and value keys into 2 separate buffers;
// 2) send value_buffer to compaction filter and alternate the values;
// 3) merge value_buffer with ineligible_value_buffer;
// 4) run the modified "compaction" using the old for loop.
bool prefix_initialized = false;
shared_ptr<Iterator> backup_input(
versions_->MakeInputIterator(compact_->compaction));
backup_input->SeekToFirst();
uint64_t total_filter_time = 0;
while (backup_input->Valid() &&
!shutting_down_->load(std::memory_order_acquire) &&
!cfd->IsDropped()) {
// FLUSH preempts compaction
// TODO(icanadi) this currently only checks if flush is necessary on
// compacting column family. we should also check if flush is necessary on
// other column families, too
imm_micros += yield_callback_();
Slice key = backup_input->key();
Slice value = backup_input->value();
if (!ParseInternalKey(key, &ikey)) {
// log error
Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Failed to parse key: %s", cfd->GetName().c_str(),
job_id_, key.ToString().c_str());
continue;
} else {
const SliceTransform* transformer =
cfd->ioptions()->compaction_filter_factory_v2->GetPrefixExtractor();
const auto key_prefix = transformer->Transform(ikey.user_key);
if (!prefix_initialized) {
compact_->cur_prefix_ = key_prefix.ToString();
prefix_initialized = true;
}
// If the prefix remains the same, keep buffering
if (key_prefix.compare(Slice(compact_->cur_prefix_)) == 0) {
// Apply the compaction filter V2 to all the kv pairs sharing
// the same prefix
if (ikey.type == kTypeValue &&
(visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
// Buffer all keys sharing the same prefix for CompactionFilterV2
// Iterate through keys to check prefix
compact_->BufferKeyValueSlices(key, value);
} else {
// buffer ineligible keys
compact_->BufferOtherKeyValueSlices(key, value);
}
backup_input->Next();
continue;
// finish changing values for eligible keys
} else {
// Now prefix changes, this batch is done.
// Call compaction filter on the buffered values to change the value
if (compact_->key_str_buf_.size() > 0) {
uint64_t time = 0;
CallCompactionFilterV2(compaction_filter_v2, &time);
total_filter_time += time;
}
compact_->cur_prefix_ = key_prefix.ToString();
}
}
// Merge this batch of data (values + ineligible keys)
compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
// Done buffering for the current prefix. Spit it out to disk
// Now just iterate through all the kv-pairs
status = ProcessKeyValueCompaction(&imm_micros, input.get(), true);
if (!status.ok()) {
break;
}
// After writing the kv-pairs, we can safely remove the reference
// to the string buffer and clean them up
compact_->CleanupBatchBuffer();
compact_->CleanupMergedBuffer();
// Buffer the key that triggers the mismatch in prefix
if (ikey.type == kTypeValue &&
(visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
compact_->BufferKeyValueSlices(key, value);
} else {
compact_->BufferOtherKeyValueSlices(key, value);
}
backup_input->Next();
if (!backup_input->Valid()) {
// If this is the single last value, we need to merge it.
if (compact_->key_str_buf_.size() > 0) {
uint64_t time = 0;
CallCompactionFilterV2(compaction_filter_v2, &time);
total_filter_time += time;
}
compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
status = ProcessKeyValueCompaction(&imm_micros, input.get(), true);
if (!status.ok()) {
break;
}
compact_->CleanupBatchBuffer();
compact_->CleanupMergedBuffer();
}
} // done processing all prefix batches
// finish the last batch
if (status.ok()) {
if (compact_->key_str_buf_.size() > 0) {
uint64_t time = 0;
CallCompactionFilterV2(compaction_filter_v2, &time);
total_filter_time += time;
}
compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
status = ProcessKeyValueCompaction(&imm_micros, input.get(), true);
}
RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time);
} // checking for compaction filter v2
if (status.ok() &&
(shutting_down_->load(std::memory_order_acquire) || cfd->IsDropped())) {
status = Status::ShutdownInProgress(
"Database shutdown or Column family drop during compaction");
}
if (status.ok() && compact_->builder != nullptr) {
status = FinishCompactionOutputFile(input.get());
}
if (status.ok()) {
status = input->status();
}
input.reset();
if (output_directory_ && !db_options_.disableDataSync) {
output_directory_->Fsync();
}
compaction_stats_.micros = env_->NowMicros() - start_micros - imm_micros;
compaction_stats_.files_in_leveln =
static_cast<int>(compact_->compaction->num_input_files(0));
compaction_stats_.files_in_levelnp1 =
static_cast<int>(compact_->compaction->num_input_files(1));
MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros);
size_t num_output_files = compact_->outputs.size();
if (compact_->builder != nullptr) {
// An error occurred so ignore the last output.
assert(num_output_files > 0);
--num_output_files;
}
compaction_stats_.files_out_levelnp1 = static_cast<int>(num_output_files);
for (size_t i = 0; i < compact_->compaction->num_input_files(0); i++) {
compaction_stats_.bytes_readn +=
compact_->compaction->input(0, i)->fd.GetFileSize();
compaction_stats_.num_input_records +=
static_cast<uint64_t>(compact_->compaction->input(0, i)->num_entries);
}
for (size_t i = 0; i < compact_->compaction->num_input_files(1); i++) {
compaction_stats_.bytes_readnp1 +=
compact_->compaction->input(1, i)->fd.GetFileSize();
}
for (size_t i = 0; i < num_output_files; i++) {
compaction_stats_.bytes_written += compact_->outputs[i].file_size;
}
if (compact_->num_input_records > compact_->num_output_records) {
compaction_stats_.num_dropped_records +=
compact_->num_input_records - compact_->num_output_records;
}
RecordCompactionIOStats();
LogFlush(db_options_.info_log);
TEST_SYNC_POINT("CompactionJob::Run():End");
return status;
}
void CompactionJob::Install(Status* status,
const MutableCFOptions& mutable_cf_options,
InstrumentedMutex* db_mutex) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_INSTALL);
db_mutex->AssertHeld();
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
cfd->internal_stats()->AddCompactionStats(
compact_->compaction->output_level(), compaction_stats_);
if (status->ok()) {
*status = InstallCompactionResults(db_mutex, mutable_cf_options);
}
VersionStorageInfo::LevelSummaryStorage tmp;
auto vstorage = cfd->current()->storage_info();
const auto& stats = compaction_stats_;
LogToBuffer(log_buffer_,
"[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
"files in(%d, %d) out(%d) "
"MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
"write-amplify(%.1f) %s, records in: %d, records dropped: %d\n",
cfd->GetName().c_str(), vstorage->LevelSummary(&tmp),
(stats.bytes_readn + stats.bytes_readnp1) /
static_cast<double>(stats.micros),
stats.bytes_written / static_cast<double>(stats.micros),
compact_->compaction->output_level(), stats.files_in_leveln,
stats.files_in_levelnp1, stats.files_out_levelnp1,
stats.bytes_readn / 1048576.0, stats.bytes_readnp1 / 1048576.0,
stats.bytes_written / 1048576.0,
(stats.bytes_written + stats.bytes_readnp1 + stats.bytes_readn) /
static_cast<double>(stats.bytes_readn),
stats.bytes_written / static_cast<double>(stats.bytes_readn),
status->ToString().c_str(), stats.num_input_records,
stats.num_dropped_records);
auto stream = event_logger_->LogToBuffer(log_buffer_);
stream << "job" << job_id_ << "event"
<< "compaction_finished"
<< "output_level" << compact_->compaction->output_level()
<< "num_output_files" << compact_->outputs.size()
<< "total_output_size" << compact_->total_bytes << "num_input_records"
<< compact_->num_input_records << "num_output_records"
<< compact_->num_output_records;
stream << "lsm_state";
stream.StartArray();
for (int level = 0; level < vstorage->num_levels(); ++level) {
stream << vstorage->NumLevelFiles(level);
}
stream.EndArray();
CleanupCompaction(*status);
}
Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
Iterator* input,
bool is_compaction_v2) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
size_t combined_idx = 0;
Status status;
std::string compaction_filter_value;
ParsedInternalKey ikey;
IterKey current_user_key;
bool has_current_user_key = false;
IterKey delete_key;
SequenceNumber last_sequence_for_key __attribute__((unused)) =
kMaxSequenceNumber;
SequenceNumber visible_in_snapshot = kMaxSequenceNumber;
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
MergeHelper merge(cfd->user_comparator(), cfd->ioptions()->merge_operator,
db_options_.info_log.get(),
cfd->ioptions()->min_partial_merge_operands,
false /* internal key corruption is expected */);
auto compaction_filter = cfd->ioptions()->compaction_filter;
std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
if (!compaction_filter) {
auto context = compact_->GetFilterContextV1();
compaction_filter_from_factory =
cfd->ioptions()->compaction_filter_factory->CreateCompactionFilter(
context);
compaction_filter = compaction_filter_from_factory.get();
}
TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
int64_t key_drop_user = 0;
int64_t key_drop_newer_entry = 0;
int64_t key_drop_obsolete = 0;
int64_t loop_cnt = 0;
StopWatchNano timer(env_, stats_ != nullptr);
uint64_t total_filter_time = 0;
while (input->Valid() && !shutting_down_->load(std::memory_order_acquire) &&
!cfd->IsDropped() && status.ok()) {
compact_->num_input_records++;
if (++loop_cnt > 1000) {
if (key_drop_user > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
key_drop_user = 0;
}
if (key_drop_newer_entry > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
key_drop_newer_entry);
key_drop_newer_entry = 0;
}
if (key_drop_obsolete > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, key_drop_obsolete);
key_drop_obsolete = 0;
}
RecordCompactionIOStats();
loop_cnt = 0;
}
// FLUSH preempts compaction
// TODO(icanadi) this currently only checks if flush is necessary on
// compacting column family. we should also check if flush is necessary on
// other column families, too
(*imm_micros) += yield_callback_();
Slice key;
Slice value;
// If is_compaction_v2 is on, kv-pairs are reset to the prefix batch.
// This prefix batch should contain results after calling
// compaction_filter_v2.
//
// If is_compaction_v2 is off, this function will go through all the
// kv-pairs in input.
if (!is_compaction_v2) {
key = input->key();
value = input->value();
} else {
if (combined_idx >= compact_->combined_key_buf_.size()) {
break;
}
assert(combined_idx < compact_->combined_key_buf_.size());
key = compact_->combined_key_buf_[combined_idx];
value = compact_->combined_value_buf_[combined_idx];
++combined_idx;
}
if (compact_->compaction->ShouldStopBefore(key) &&
compact_->builder != nullptr) {
status = FinishCompactionOutputFile(input);
if (!status.ok()) {
break;
}
}
// Handle key/value, add to state, etc.
bool drop = false;
bool current_entry_is_merging = false;
if (!ParseInternalKey(key, &ikey)) {
// Do not hide error keys
// TODO: error key stays in db forever? Figure out the intention/rationale
// v10 error v8 : we cannot hide v8 even though it's pretty obvious.
current_user_key.Clear();
has_current_user_key = false;
last_sequence_for_key = kMaxSequenceNumber;
visible_in_snapshot = kMaxSequenceNumber;
} else {
if (!has_current_user_key ||
cfd->user_comparator()->Compare(ikey.user_key,
current_user_key.GetKey()) != 0) {
// First occurrence of this user key
current_user_key.SetKey(ikey.user_key);
has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber;
visible_in_snapshot = kMaxSequenceNumber;
// apply the compaction filter to the first occurrence of the user key
if (compaction_filter && !is_compaction_v2 && ikey.type == kTypeValue &&
(visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
// If the user has specified a compaction filter and the sequence
// number is greater than any external snapshot, then invoke the
// filter.
// If the return value of the compaction filter is true, replace
// the entry with a delete marker.
bool value_changed = false;
compaction_filter_value.clear();
if (stats_ != nullptr) {
timer.Start();
}
bool to_delete = compaction_filter->Filter(
compact_->compaction->level(), ikey.user_key, value,
&compaction_filter_value, &value_changed);
total_filter_time += timer.ElapsedNanos();
if (to_delete) {
// make a copy of the original key and convert it to a delete
delete_key.SetInternalKey(ExtractUserKey(key), ikey.sequence,
kTypeDeletion);
// anchor the key again
key = delete_key.GetKey();
// needed because ikey is backed by key
ParseInternalKey(key, &ikey);
// no value associated with delete
value.clear();
++key_drop_user;
} else if (value_changed) {
value = compaction_filter_value;
}
}
}
// If there are no snapshots, then this kv affect visibility at tip.
// Otherwise, search though all existing snapshots to find
// the earlist snapshot that is affected by this kv.
SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot
SequenceNumber visible =
visible_at_tip_
? visible_at_tip_
: findEarliestVisibleSnapshot(ikey.sequence, existing_snapshots_,
&prev_snapshot);
if (visible_in_snapshot == visible) {
// If the earliest snapshot is which this key is visible in
// is the same as the visibily of a previous instance of the
// same key, then this kv is not visible in any snapshot.
// Hidden by an newer entry for same user key
// TODO: why not > ?
assert(last_sequence_for_key >= ikey.sequence);
drop = true; // (A)
++key_drop_newer_entry;
} else if (ikey.type == kTypeDeletion &&
ikey.sequence <= earliest_snapshot_ &&
compact_->compaction->KeyNotExistsBeyondOutputLevel(
ikey.user_key)) {
// For this user key:
// (1) there is no data in higher levels
// (2) data in lower levels will have larger sequence numbers
// (3) data in layers that are being compacted here and have
// smaller sequence numbers will be dropped in the next
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
++key_drop_obsolete;
} else if (ikey.type == kTypeMerge) {
if (!merge.HasOperator()) {
LogToBuffer(log_buffer_, "Options::merge_operator is null.");
status = Status::InvalidArgument(
"merge_operator is not properly initialized.");
break;
}
// We know the merge type entry is not hidden, otherwise we would
// have hit (A)
// We encapsulate the merge related state machine in a different
// object to minimize change to the existing flow. Turn out this
// logic could also be nicely re-used for memtable flush purge
// optimization in BuildTable.
int steps = 0;
merge.MergeUntil(input, prev_snapshot, bottommost_level_,
db_options_.statistics.get(), &steps, env_);
// Skip the Merge ops
combined_idx = combined_idx - 1 + steps;
current_entry_is_merging = true;
if (merge.IsSuccess()) {
// Successfully found Put/Delete/(end-of-key-range) while merging
// Get the merge result
key = merge.key();
ParseInternalKey(key, &ikey);
value = merge.value();
} else {
// Did not find a Put/Delete/(end-of-key-range) while merging
// We now have some stack of merge operands to write out.
// NOTE: key,value, and ikey are now referring to old entries.
// These will be correctly set below.
assert(!merge.keys().empty());
assert(merge.keys().size() == merge.values().size());
// Hack to make sure last_sequence_for_key is correct
ParseInternalKey(merge.keys().front(), &ikey);
}
}
last_sequence_for_key = ikey.sequence;
visible_in_snapshot = visible;
}
if (!drop) {
// We may write a single key (e.g.: for Put/Delete or successful merge).
// Or we may instead have to write a sequence/list of keys.
// We have to write a sequence iff we have an unsuccessful merge
bool has_merge_list = current_entry_is_merging && !merge.IsSuccess();
const std::deque<std::string>* keys = nullptr;
const std::deque<std::string>* values = nullptr;
std::deque<std::string>::const_reverse_iterator key_iter;
std::deque<std::string>::const_reverse_iterator value_iter;
if (has_merge_list) {
keys = &merge.keys();
values = &merge.values();
key_iter = keys->rbegin(); // The back (*rbegin()) is the first key
value_iter = values->rbegin();
key = Slice(*key_iter);
value = Slice(*value_iter);
}
// If we have a list of keys to write, traverse the list.
// If we have a single key to write, simply write that key.
while (true) {
// Invariant: key,value,ikey will always be the next entry to write
char* kptr = (char*)key.data();
std::string kstr;
// Zeroing out the sequence number leads to better compression.
// If this is the bottommost level (no files in lower levels)
// and the earliest snapshot is larger than this seqno
// then we can squash the seqno to zero.
if (bottommost_level_ && ikey.sequence < earliest_snapshot_ &&
ikey.type != kTypeMerge) {
assert(ikey.type != kTypeDeletion);
// make a copy because updating in place would cause problems
// with the priority queue that is managing the input key iterator
kstr.assign(key.data(), key.size());
kptr = (char*)kstr.c_str();
UpdateInternalKey(kptr, key.size(), (uint64_t)0, ikey.type);
}
Slice newkey(kptr, key.size());
assert((key.clear(), 1)); // we do not need 'key' anymore
// Open output file if necessary
if (compact_->builder == nullptr) {
status = OpenCompactionOutputFile();
if (!status.ok()) {
break;
}
}
SequenceNumber seqno = GetInternalKeySeqno(newkey);
if (compact_->builder->NumEntries() == 0) {
compact_->current_output()->smallest.DecodeFrom(newkey);
compact_->current_output()->smallest_seqno = seqno;
} else {
compact_->current_output()->smallest_seqno =
std::min(compact_->current_output()->smallest_seqno, seqno);
}
compact_->current_output()->largest.DecodeFrom(newkey);
compact_->builder->Add(newkey, value);
compact_->num_output_records++,
compact_->current_output()->largest_seqno =
std::max(compact_->current_output()->largest_seqno, seqno);
// Close output file if it is big enough
if (compact_->builder->FileSize() >=
compact_->compaction->MaxOutputFileSize()) {
status = FinishCompactionOutputFile(input);
if (!status.ok()) {
break;
}
}
// If we have a list of entries, move to next element
// If we only had one entry, then break the loop.
if (has_merge_list) {
++key_iter;
++value_iter;
// If at end of list
if (key_iter == keys->rend() || value_iter == values->rend()) {
// Sanity Check: if one ends, then both end
assert(key_iter == keys->rend() && value_iter == values->rend());
break;
}
// Otherwise not at end of list. Update key, value, and ikey.
key = Slice(*key_iter);
value = Slice(*value_iter);
ParseInternalKey(key, &ikey);
} else {
// Only had one item to begin with (Put/Delete)
break;
}
} // while (true)
} // if (!drop)
// MergeUntil has moved input to the next entry
if (!current_entry_is_merging) {
input->Next();
}
}
RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time);
if (key_drop_user > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
}
if (key_drop_newer_entry > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY, key_drop_newer_entry);
}
if (key_drop_obsolete > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, key_drop_obsolete);
}
RecordCompactionIOStats();
return status;
}
void CompactionJob::CallCompactionFilterV2(
CompactionFilterV2* compaction_filter_v2, uint64_t* time) {
if (compact_ == nullptr || compaction_filter_v2 == nullptr) {
return;
}
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_FILTER_V2);
// Assemble slice vectors for user keys and existing values.
// We also keep track of our parsed internal key structs because
// we may need to access the sequence number in the event that
// keys are garbage collected during the filter process.
std::vector<ParsedInternalKey> ikey_buf;
std::vector<Slice> user_key_buf;
std::vector<Slice> existing_value_buf;
for (const auto& key : compact_->key_str_buf_) {
ParsedInternalKey ikey;
ParseInternalKey(Slice(key), &ikey);
ikey_buf.emplace_back(ikey);
user_key_buf.emplace_back(ikey.user_key);
}
for (const auto& value : compact_->existing_value_str_buf_) {
existing_value_buf.emplace_back(Slice(value));
}
// If the user has specified a compaction filter and the sequence
// number is greater than any external snapshot, then invoke the
// filter.
// If the return value of the compaction filter is true, replace
// the entry with a delete marker.
StopWatchNano timer(env_, stats_ != nullptr);
compact_->to_delete_buf_ = compaction_filter_v2->Filter(
compact_->compaction->level(), user_key_buf, existing_value_buf,
&compact_->new_value_buf_, &compact_->value_changed_buf_);
*time = timer.ElapsedNanos();
// new_value_buf_.size() <= to_delete__buf_.size(). "=" iff all
// kv-pairs in this compaction run needs to be deleted.
assert(compact_->to_delete_buf_.size() == compact_->key_str_buf_.size());
assert(compact_->to_delete_buf_.size() ==
compact_->existing_value_str_buf_.size());
assert(compact_->value_changed_buf_.empty() ||
compact_->to_delete_buf_.size() ==
compact_->value_changed_buf_.size());
int new_value_idx = 0;
for (unsigned int i = 0; i < compact_->to_delete_buf_.size(); ++i) {
if (compact_->to_delete_buf_[i]) {
// update the string buffer directly
// the Slice buffer points to the updated buffer
UpdateInternalKey(&compact_->key_str_buf_[i][0],
compact_->key_str_buf_[i].size(), ikey_buf[i].sequence,
kTypeDeletion);
// no value associated with delete
compact_->existing_value_str_buf_[i].clear();
RecordTick(stats_, COMPACTION_KEY_DROP_USER);
} else if (!compact_->value_changed_buf_.empty() &&
compact_->value_changed_buf_[i]) {
compact_->existing_value_str_buf_[i] =
compact_->new_value_buf_[new_value_idx++];
}
} // for
}
Status CompactionJob::FinishCompactionOutputFile(Iterator* input) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
assert(compact_ != nullptr);
assert(compact_->outfile);
assert(compact_->builder != nullptr);
const uint64_t output_number = compact_->current_output()->number;
const uint32_t output_path_id = compact_->current_output()->path_id;
assert(output_number != 0);
TableProperties table_properties;
// Check for iterator errors
Status s = input->status();
const uint64_t current_entries = compact_->builder->NumEntries();
if (s.ok()) {
s = compact_->builder->Finish();
} else {
compact_->builder->Abandon();
}
if (s.ok()) {
table_properties = compact_->builder->GetTableProperties();
}
const uint64_t current_bytes = compact_->builder->FileSize();
compact_->current_output()->file_size = current_bytes;
compact_->total_bytes += current_bytes;
compact_->builder.reset();
// Finish and check for file errors
if (s.ok() && !db_options_.disableDataSync) {
if (db_options_.use_fsync) {
StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
s = compact_->outfile->Fsync();
} else {
StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
s = compact_->outfile->Sync();
}
}
if (s.ok()) {
s = compact_->outfile->Close();
}
compact_->outfile.reset();
if (s.ok() && current_entries > 0) {
// Verify that the table is usable
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
FileDescriptor fd(output_number, output_path_id, current_bytes);
Iterator* iter = cfd->table_cache()->NewIterator(
ReadOptions(), env_options_, cfd->internal_comparator(), fd);
s = iter->status();
if (s.ok() && paranoid_file_checks_) {
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {}
s = iter->status();
}
delete iter;
if (s.ok()) {
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
" keys, %" PRIu64 " bytes",
cfd->GetName().c_str(), job_id_, output_number, current_entries,
current_bytes);
EventHelpers::LogTableFileCreation(event_logger_, job_id_,
output_number, current_bytes,
table_properties);
}
}
return s;
}
Status CompactionJob::InstallCompactionResults(
InstrumentedMutex* db_mutex, const MutableCFOptions& mutable_cf_options) {
db_mutex->AssertHeld();
auto* compaction = compact_->compaction;
// paranoia: verify that the files that we started with
// still exist in the current version and in the same original level.
// This ensures that a concurrent compaction did not erroneously
// pick the same files to compact_.
if (!versions_->VerifyCompactionFileConsistency(compaction)) {
Compaction::InputLevelSummaryBuffer inputs_summary;
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Compaction %s aborted",
compaction->column_family_data()->GetName().c_str(), job_id_,
compaction->InputLevelSummary(&inputs_summary));
return Status::Corruption("Compaction input files inconsistent");
}
{
Compaction::InputLevelSummaryBuffer inputs_summary;
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
compaction->column_family_data()->GetName().c_str(), job_id_,
compaction->InputLevelSummary(&inputs_summary), compact_->total_bytes);
}
// Add compaction outputs
compaction->AddInputDeletions(compact_->compaction->edit());
for (size_t i = 0; i < compact_->outputs.size(); i++) {
const CompactionState::Output& out = compact_->outputs[i];
compaction->edit()->AddFile(
compaction->output_level(), out.number, out.path_id, out.file_size,
out.smallest, out.largest, out.smallest_seqno, out.largest_seqno);
}
return versions_->LogAndApply(compaction->column_family_data(),
mutable_cf_options, compaction->edit(),
db_mutex, db_directory_);
}
// Given a sequence number, return the sequence number of the
// earliest snapshot that this sequence number is visible in.
// The snapshots themselves are arranged in ascending order of
// sequence numbers.
// Employ a sequential search because the total number of
// snapshots are typically small.
inline SequenceNumber CompactionJob::findEarliestVisibleSnapshot(
SequenceNumber in, const std::vector<SequenceNumber>& snapshots,
SequenceNumber* prev_snapshot) {
assert(snapshots.size());
SequenceNumber prev __attribute__((unused)) = 0;
for (const auto cur : snapshots) {
assert(prev <= cur);
if (cur >= in) {
*prev_snapshot = prev;
return cur;
}
prev = cur; // assignment
assert(prev);
}
Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
"CompactionJob is not able to find snapshot"
" with SeqId later than %" PRIu64
": current MaxSeqId is %" PRIu64 "",
in, snapshots[snapshots.size() - 1]);
assert(0);
return 0;
}
void CompactionJob::RecordCompactionIOStats() {
RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
ThreadStatusUtil::IncreaseThreadOperationProperty(
ThreadStatus::COMPACTION_BYTES_READ, IOSTATS(bytes_read));
IOSTATS_RESET(bytes_read);
RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
ThreadStatusUtil::IncreaseThreadOperationProperty(
ThreadStatus::COMPACTION_BYTES_WRITTEN, IOSTATS(bytes_written));
IOSTATS_RESET(bytes_written);
}
Status CompactionJob::OpenCompactionOutputFile() {
assert(compact_ != nullptr);
assert(compact_->builder == nullptr);
// no need to lock because VersionSet::next_file_number_ is atomic
uint64_t file_number = versions_->NewFileNumber();
// Make the output file
std::string fname = TableFileName(db_options_.db_paths, file_number,
compact_->compaction->GetOutputPathId());
Status s = env_->NewWritableFile(fname, &compact_->outfile, env_options_);
if (!s.ok()) {
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
"[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64
" fails at NewWritableFile with status %s",
compact_->compaction->column_family_data()->GetName().c_str(), job_id_,
file_number, s.ToString().c_str());
LogFlush(db_options_.info_log);
return s;
}
CompactionState::Output out;
out.number = file_number;
out.path_id = compact_->compaction->GetOutputPathId();
out.smallest.Clear();
out.largest.Clear();
out.smallest_seqno = out.largest_seqno = 0;
compact_->outputs.push_back(out);
compact_->outfile->SetIOPriority(Env::IO_LOW);
compact_->outfile->SetPreallocationBlockSize(
static_cast<size_t>(compact_->compaction->OutputFilePreallocationSize()));
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
bool skip_filters = false;
// If the Column family flag is to only optimize filters for hits,
// we can skip creating filters if this is the bottommost_level where
// data is going to be found
//
if (cfd->ioptions()->optimize_filters_for_hits && bottommost_level_) {
skip_filters = true;
}
compact_->builder.reset(NewTableBuilder(
*cfd->ioptions(), cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), compact_->outfile.get(),
compact_->compaction->OutputCompressionType(),
cfd->ioptions()->compression_opts, skip_filters));
LogFlush(db_options_.info_log);
return s;
}
void CompactionJob::CleanupCompaction(const Status& status) {
if (compact_->builder != nullptr) {
// May happen if we get a shutdown call in the middle of compaction
compact_->builder->Abandon();
compact_->builder.reset();
} else {
assert(!status.ok() || compact_->outfile == nullptr);
}
for (size_t i = 0; i < compact_->outputs.size(); i++) {
const CompactionState::Output& out = compact_->outputs[i];
// If this file was inserted into the table cache then remove
// them here because this compaction was not committed.
if (!status.ok()) {
TableCache::Evict(table_cache_.get(), out.number);
}
}
delete compact_;
compact_ = nullptr;
}
} // namespace rocksdb