Further cleanup of CompactionJob and MergeHelper

Summary:
Simplified logic in CompactionJob and removed an unused parameter
(steps) from MergeHelper::MergeUntil.

Test Plan: make && make check

Reviewers: rven, igor, sdong, yhchiang

Reviewed By: sdong

Subscribers: aekmekji, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D42687
Branch: main
Author: Andres Notzli, 10 years ago
Parent: e95c59cd2f
Commit: d06c82e477
db/compaction_job.cc    | 311
db/merge_helper.cc      |  13
db/merge_helper.h       |  13
db/merge_helper_test.cc |  20
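At a glance (signatures assembled from the hunks below), the MergeHelper change drops the never-read steps out-parameter from MergeUntil:

    // Before this commit:
    void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
                                 bool at_bottom, Statistics* stats, int* steps,
                                 Env* env_);

    // After this commit:
    void MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before,
                                 const bool at_bottom, Statistics* stats,
                                 Env* env_);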

@@ -210,29 +210,15 @@ Status CompactionJob::Run() {
ThreadStatus::STAGE_COMPACTION_RUN);
TEST_SYNC_POINT("CompactionJob::Run():Start");
log_buffer_->FlushBufferToLog();
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
auto* compaction = compact_->compaction;
LogCompaction(cfd, compaction);
LogCompaction(compaction->column_family_data(), compaction);
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
const uint64_t start_micros = env_->NowMicros();
std::unique_ptr<Iterator> input(
versions_->MakeInputIterator(compact_->compaction));
input->SeekToFirst();
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
std::unique_ptr<Iterator> input(versions_->MakeInputIterator(compaction));
input->SeekToFirst();
auto status = ProcessKeyValueCompaction(&imm_micros, input.get());
if (status.ok() &&
(shutting_down_->load(std::memory_order_acquire) || cfd->IsDropped())) {
status = Status::ShutdownInProgress(
"Database shutdown or Column family drop during compaction");
}
if (status.ok() && compact_->builder != nullptr) {
status = FinishCompactionOutputFile(input->status());
}
if (status.ok()) {
status = input->status();
}
input.reset();
if (output_directory_ && !db_options_.disableDataSync) {
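Since the hunk above interleaves removed and added lines, here is the rewritten prologue of Run() assembled from the added lines (a reconstruction for readability, not an additional change):

    Status CompactionJob::Run() {
      AutoThreadOperationStageUpdater stage_updater(
          ThreadStatus::STAGE_COMPACTION_RUN);
      TEST_SYNC_POINT("CompactionJob::Run():Start");
      log_buffer_->FlushBufferToLog();
      auto* compaction = compact_->compaction;
      LogCompaction(compaction->column_family_data(), compaction);

      const uint64_t start_micros = env_->NowMicros();
      int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
      std::unique_ptr<Iterator> input(versions_->MakeInputIterator(compaction));
      input->SeekToFirst();

      auto status = ProcessKeyValueCompaction(&imm_micros, input.get());
      // The shutdown/column-family-drop and output-file checks formerly here
      // now run at the end of ProcessKeyValueCompaction() (see below).
      input.reset();
      // ... data sync and the rest of Run() are unchanged ...
    }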
@@ -331,7 +317,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
false /* internal key corruption is expected */);
auto compaction_filter = cfd->ioptions()->compaction_filter;
std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
if (!compaction_filter) {
if (compaction_filter == nullptr) {
compaction_filter_from_factory =
compact_->compaction->CreateCompactionFilter();
compaction_filter = compaction_filter_from_factory.get();
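For context, the factory path above matters when no filter was set directly on the column family. A minimal filter compatible with the Filter() interface invoked later in this function (illustrative only, not part of this commit):

    #include "rocksdb/compaction_filter.h"

    // Drops every entry whose value is empty: returning true from Filter()
    // makes the compaction replace the entry with a deletion marker, which is
    // what the delete_key.SetInternalKey(...) branch below implements.
    class DropEmptyValueFilter : public rocksdb::CompactionFilter {
     public:
      bool Filter(int /*level*/, const rocksdb::Slice& /*key*/,
                  const rocksdb::Slice& existing_value,
                  std::string* /*new_value*/,
                  bool* /*value_changed*/) const override {
        return existing_value.empty();
      }
      const char* Name() const override { return "DropEmptyValueFilter"; }
    };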
@@ -346,6 +332,9 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
StopWatchNano timer(env_, stats_ != nullptr);
uint64_t total_filter_time = 0;
// TODO(noetzli): check whether we could check !shutting_down_->... only
// occasionally (see diff D42687)
while (input->Valid() && !shutting_down_->load(std::memory_order_acquire) &&
!cfd->IsDropped() && status.ok()) {
compact_->num_input_records++;
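The TODO above suggests amortizing the atomic load in the loop condition. A minimal sketch of that idea, assuming a check interval of 1024 keys (hypothetical; this commit only records the TODO):

    // Hypothetical amortization: reload the shutdown flag every
    // kShutdownCheckInterval keys instead of once per key.
    constexpr int kShutdownCheckInterval = 1024;
    int keys_since_check = 0;
    bool stop = false;
    while (input->Valid() && !stop && !cfd->IsDropped() && status.ok()) {
      if (++keys_since_check >= kShutdownCheckInterval) {
        keys_since_check = 0;
        stop = shutting_down_->load(std::memory_order_acquire);
      }
      // ... process one input entry as in the loop body below ...
    }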
@@ -360,10 +349,8 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
Slice value = input->value();
if (compaction_job_stats_ != nullptr) {
compaction_job_stats_->total_input_raw_key_bytes +=
input->key().size();
compaction_job_stats_->total_input_raw_value_bytes +=
input->value().size();
compaction_job_stats_->total_input_raw_key_bytes += key.size();
compaction_job_stats_->total_input_raw_value_bytes += value.size();
}
if (compact_->compaction->ShouldStopBefore(key) &&
@@ -375,8 +362,6 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
}
// Handle key/value, add to state, etc.
bool drop = false;
bool current_entry_is_merging = false;
if (!ParseInternalKey(key, &ikey)) {
// Do not hide error keys
// TODO: error key stays in db forever? Figure out the intention/rationale
@@ -389,177 +374,157 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
if (compaction_job_stats_ != nullptr) {
compaction_job_stats_->num_corrupt_keys++;
}
} else {
if (compaction_job_stats_ != nullptr && ikey.type == kTypeDeletion) {
compaction_job_stats_->num_input_deletion_records++;
}
if (!has_current_user_key ||
cfd->user_comparator()->Compare(ikey.user_key,
current_user_key.GetKey()) != 0) {
// First occurrence of this user key
current_user_key.SetKey(ikey.user_key);
has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber;
visible_in_snapshot = kMaxSequenceNumber;
// apply the compaction filter to the first occurrence of the user key
if (compaction_filter && ikey.type == kTypeValue &&
(visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
// If the user has specified a compaction filter and the sequence
// number is greater than any external snapshot, then invoke the
// filter.
// If the return value of the compaction filter is true, replace
// the entry with a delete marker.
bool value_changed = false;
compaction_filter_value.clear();
if (stats_ != nullptr) {
timer.Start();
}
bool to_delete = compaction_filter->Filter(
compact_->compaction->level(), ikey.user_key, value,
&compaction_filter_value, &value_changed);
total_filter_time += timer.ElapsedNanos();
if (to_delete) {
// make a copy of the original key and convert it to a delete
delete_key.SetInternalKey(ExtractUserKey(key), ikey.sequence,
kTypeDeletion);
// anchor the key again
key = delete_key.GetKey();
// needed because ikey is backed by key
ParseInternalKey(key, &ikey);
// no value associated with delete
value.clear();
++key_drop_user;
} else if (value_changed) {
value = compaction_filter_value;
}
}
}
status = WriteKeyValue(key, value, ikey, input->status());
input->Next();
continue;
}
// If there are no snapshots, then this kv affects visibility at tip.
// Otherwise, search through all existing snapshots to find
// the earliest snapshot that is affected by this kv.
SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot
SequenceNumber visible =
visible_at_tip_
? visible_at_tip_
: findEarliestVisibleSnapshot(ikey.sequence, existing_snapshots_,
&prev_snapshot);
if (visible_in_snapshot == visible) {
// If the earliest snapshot in which this key is visible
// is the same as the visibility of a previous instance of the
// same key, then this kv is not visible in any snapshot.
// Hidden by a newer entry for the same user key
// TODO: why not > ?
assert(last_sequence_for_key >= ikey.sequence);
drop = true; // (A)
++key_drop_newer_entry;
} else if (ikey.type == kTypeDeletion &&
ikey.sequence <= earliest_snapshot_ &&
compact_->compaction->KeyNotExistsBeyondOutputLevel(
ikey.user_key)) {
// For this user key:
// (1) there is no data in higher levels
// (2) data in lower levels will have larger sequence numbers
// (3) data in layers that are being compacted here and have
// smaller sequence numbers will be dropped in the next
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
++key_drop_obsolete;
} else if (ikey.type == kTypeMerge) {
if (!merge.HasOperator()) {
LogToBuffer(log_buffer_, "Options::merge_operator is null.");
status = Status::InvalidArgument(
"merge_operator is not properly initialized.");
break;
if (compaction_job_stats_ != nullptr && ikey.type == kTypeDeletion) {
compaction_job_stats_->num_input_deletion_records++;
}
if (!has_current_user_key ||
cfd->user_comparator()->Compare(ikey.user_key,
current_user_key.GetKey()) != 0) {
// First occurrence of this user key
current_user_key.SetKey(ikey.user_key);
has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber;
visible_in_snapshot = kMaxSequenceNumber;
// apply the compaction filter to the first occurrence of the user key
if (compaction_filter && ikey.type == kTypeValue &&
(visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
// If the user has specified a compaction filter and the sequence
// number is greater than any external snapshot, then invoke the
// filter.
// If the return value of the compaction filter is true, replace
// the entry with a delete marker.
bool value_changed = false;
compaction_filter_value.clear();
if (stats_ != nullptr) {
timer.Start();
}
// We know the merge type entry is not hidden, otherwise we would
// have hit (A)
// We encapsulate the merge related state machine in a different
// object to minimize change to the existing flow. Turns out this
// logic could also be nicely re-used for memtable flush purge
// optimization in BuildTable.
merge.MergeUntil(input, prev_snapshot, bottommost_level_,
db_options_.statistics.get(), nullptr, env_);
current_entry_is_merging = true;
if (merge.IsSuccess()) {
// Successfully found Put/Delete/(end-of-key-range) while merging
// Get the merge result
key = merge.key();
bool to_delete = compaction_filter->Filter(
compact_->compaction->level(), ikey.user_key, value,
&compaction_filter_value, &value_changed);
total_filter_time += timer.ElapsedNanos();
if (to_delete) {
// make a copy of the original key and convert it to a delete
delete_key.SetInternalKey(ExtractUserKey(key), ikey.sequence,
kTypeDeletion);
// anchor the key again
key = delete_key.GetKey();
// needed because ikey is backed by key
ParseInternalKey(key, &ikey);
value = merge.value();
} else {
// Did not find a Put/Delete/(end-of-key-range) while merging
// We now have some stack of merge operands to write out.
// NOTE: key, value, and ikey are now referring to old entries.
// These will be correctly set below.
assert(!merge.keys().empty());
assert(merge.keys().size() == merge.values().size());
// Hack to make sure last_sequence_for_key is correct
ParseInternalKey(merge.keys().front(), &ikey);
// no value associated with delete
value.clear();
++key_drop_user;
} else if (value_changed) {
value = compaction_filter_value;
}
}
last_sequence_for_key = ikey.sequence;
visible_in_snapshot = visible;
}
if (!drop) {
// We may write a single key (e.g.: for Put/Delete or successful merge).
// Or we may instead have to write a sequence/list of keys.
// We have to write a sequence iff we have an unsuccessful merge
if (current_entry_is_merging && !merge.IsSuccess()) {
// If there are no snapshots, then this kv affects visibility at tip.
// Otherwise, search through all existing snapshots to find
// the earliest snapshot that is affected by this kv.
SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot
SequenceNumber visible =
visible_at_tip_
? visible_at_tip_
: findEarliestVisibleSnapshot(ikey.sequence, existing_snapshots_,
&prev_snapshot);
if (visible_in_snapshot == visible) {
// If the earliest snapshot in which this key is visible
// is the same as the visibility of a previous instance of the
// same key, then this kv is not visible in any snapshot.
// Hidden by a newer entry for the same user key
// TODO: why not > ?
assert(last_sequence_for_key >= ikey.sequence);
++key_drop_newer_entry;
input->Next(); // (A)
} else if (ikey.type == kTypeDeletion &&
ikey.sequence <= earliest_snapshot_ &&
compact_->compaction->KeyNotExistsBeyondOutputLevel(
ikey.user_key)) {
// For this user key:
// (1) there is no data in higher levels
// (2) data in lower levels will have larger sequence numbers
// (3) data in layers that are being compacted here and have
// smaller sequence numbers will be dropped in the next
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
++key_drop_obsolete;
input->Next();
} else if (ikey.type == kTypeMerge) {
if (!merge.HasOperator()) {
LogToBuffer(log_buffer_, "Options::merge_operator is null.");
status = Status::InvalidArgument(
"merge_operator is not properly initialized.");
break;
}
// We know the merge type entry is not hidden, otherwise we would
// have hit (A)
// We encapsulate the merge related state machine in a different
// object to minimize change to the existing flow. Turns out this
// logic could also be nicely re-used for memtable flush purge
// optimization in BuildTable.
merge.MergeUntil(input, prev_snapshot, bottommost_level_,
db_options_.statistics.get(), env_);
if (merge.IsSuccess()) {
// Successfully found Put/Delete/(end-of-key-range) while merging
// Get the merge result
key = merge.key();
ParseInternalKey(key, &ikey);
status = WriteKeyValue(key, merge.value(), ikey, input->status());
} else {
// Did not find a Put/Delete/(end-of-key-range) while merging
// We now have some stack of merge operands to write out.
// NOTE: key, value, and ikey are now referring to old entries.
// These will be correctly set below.
const auto& keys = merge.keys();
const auto& values = merge.values();
std::deque<std::string>::const_reverse_iterator key_iter =
keys.rbegin(); // The back (*rbegin()) is the first key
std::deque<std::string>::const_reverse_iterator value_iter =
values.rbegin();
key = Slice(*key_iter);
value = Slice(*value_iter);
// We have a list of keys to write, traverse the list.
while (true) {
status = WriteKeyValue(key, value, ikey, input->status());
if (!status.ok()) {
break;
}
++key_iter;
++value_iter;
assert(!keys.empty());
assert(keys.size() == values.size());
// If at end of list
if (key_iter == keys.rend() || value_iter == values.rend()) {
// Sanity Check: if one ends, then both end
assert(key_iter == keys.rend() && value_iter == values.rend());
break;
}
// Otherwise not at end of list. Update key, value, and ikey.
// We have a list of keys to write, write all keys in the list.
for (auto key_iter = keys.rbegin(), value_iter = values.rbegin();
status.ok() && key_iter != keys.rend();
key_iter++, value_iter++) {
key = Slice(*key_iter);
value = Slice(*value_iter);
ParseInternalKey(key, &ikey);
status = WriteKeyValue(key, value, ikey, input->status());
}
} else {
// There is only one item to be written out
status = WriteKeyValue(key, value, ikey, input->status());
}
} // if (!drop)
// MergeUntil has moved input to the next entry
if (!current_entry_is_merging) {
} else {
status = WriteKeyValue(key, value, ikey, input->status());
input->Next();
}
last_sequence_for_key = ikey.sequence;
visible_in_snapshot = visible;
}
RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time);
RecordDroppedKeys(&key_drop_user, &key_drop_newer_entry, &key_drop_obsolete);
RecordCompactionIOStats();
if (status.ok() &&
(shutting_down_->load(std::memory_order_acquire) || cfd->IsDropped())) {
status = Status::ShutdownInProgress(
"Database shutdown or Column family drop during compaction");
}
if (status.ok() && compact_->builder != nullptr) {
status = FinishCompactionOutputFile(input->status());
}
if (status.ok()) {
status = input->status();
}
return status;
}
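ProcessKeyValueCompaction() leans on findEarliestVisibleSnapshot() to classify each entry. A minimal sketch of its contract, matching the call sites above (the helper's real body lives elsewhere in compaction_job.cc; the bounds handling here is an assumption):

    // snapshots is sorted ascending. Returns the earliest snapshot that can
    // see sequence number `in` (a snapshot S sees seq <= S) and stores the
    // latest snapshot that cannot see it in *prev_snapshot.
    SequenceNumber findEarliestVisibleSnapshot(
        SequenceNumber in, const std::vector<SequenceNumber>& snapshots,
        SequenceNumber* prev_snapshot) {
      SequenceNumber prev = 0;  // 0 means no previous snapshot
      for (const SequenceNumber cur : snapshots) {
        assert(prev <= cur);
        if (cur >= in) {
          *prev_snapshot = prev;
          return cur;
        }
        prev = cur;
      }
      // No snapshot sees this entry; it is visible only at the tip.
      *prev_snapshot = prev;
      return kMaxSequenceNumber;
    }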

@@ -58,8 +58,8 @@ Status MergeHelper::TimedFullMerge(const Slice& key, const Slice* value,
// keys_ stores the list of keys encountered while merging.
// operands_ stores the list of merge operands encountered while merging.
// keys_[i] corresponds to operands_[i] for each i.
void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
bool at_bottom, Statistics* stats, int* steps,
void MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before,
const bool at_bottom, Statistics* stats,
Env* env_) {
// Get a copy of the internal key, before it's invalidated by iter->Next()
// Also maintain the list of merge operands seen.
@@ -81,9 +81,6 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
ParseInternalKey(keys_.back(), &orig_ikey);
bool hit_the_next_user_key = false;
if (steps) {
++(*steps);
}
for (iter->Next(); iter->Valid(); iter->Next()) {
ParsedInternalKey ikey;
assert(operands_.size() >= 1); // Should be invariants!
@@ -138,9 +135,6 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
// move iter to the next entry
iter->Next();
if (steps) {
++(*steps);
}
return;
} else {
// hit a merge
@@ -153,9 +147,6 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
// request or later did a partial merge.
keys_.push_front(iter->key().ToString());
operands_.push_front(iter->value().ToString());
if (steps) {
++(*steps);
}
}
}

@@ -6,11 +6,12 @@
#ifndef MERGE_HELPER_H
#define MERGE_HELPER_H
#include "db/dbformat.h"
#include "rocksdb/slice.h"
#include <string>
#include <deque>
#include <string>
#include "db/dbformat.h"
#include "rocksdb/env.h"
#include "rocksdb/slice.h"
namespace rocksdb {
@@ -56,9 +57,9 @@ class MergeHelper {
// 0 means no restriction
// at_bottom: (IN) true if the iterator covers the bottom level, which means
// we could reach the start of the history of this user key.
void MergeUntil(Iterator* iter, SequenceNumber stop_before = 0,
bool at_bottom = false, Statistics* stats = nullptr,
int* steps = nullptr, Env* env_ = nullptr);
void MergeUntil(Iterator* iter, const SequenceNumber stop_before = 0,
const bool at_bottom = false, Statistics* stats = nullptr,
Env* env_ = nullptr);
// Query the merge result
// These are valid until the next MergeUntil call
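A sketch of a call site under the new signature, with constructor arguments mirroring merge_helper_test.cc below (the /*...*/ parameter names are annotations, not part of the API):

    MergeHelper merge(BytewiseComparator(), merge_operator, /*logger=*/nullptr,
                      /*min_partial_merge_operands=*/2U,
                      /*assert_valid_internal_key=*/true);
    // iter must be positioned on a kTypeMerge entry.
    merge.MergeUntil(iter, /*stop_before=*/0, /*at_bottom=*/true,
                     /*stats=*/nullptr, Env::Default());
    if (merge.IsSuccess()) {
      Slice result_key = merge.key();    // a Put/Delete terminated the merge
      Slice result_value = merge.value();
    } else {
      // A stack of unmerged operands remains; keys()[i] pairs with values()[i].
    }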

@@ -18,7 +18,7 @@ namespace rocksdb {
class MergeHelperTest : public testing::Test {
public:
MergeHelperTest() : steps_(0) {}
MergeHelperTest() = default;
~MergeHelperTest() = default;
void RunUInt64MergeHelper(SequenceNumber stop_before, bool at_bottom) {
@@ -27,7 +27,7 @@ class MergeHelperTest : public testing::Test {
merge_helper_.reset(new MergeHelper(BytewiseComparator(), merge_op_.get(),
nullptr, 2U, true));
merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom, nullptr,
&steps_, Env::Default());
Env::Default());
}
void RunStringAppendMergeHelper(SequenceNumber stop_before, bool at_bottom) {
@@ -36,7 +36,7 @@ class MergeHelperTest : public testing::Test {
merge_helper_.reset(new MergeHelper(BytewiseComparator(), merge_op_.get(),
nullptr, 2U, true));
merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom, nullptr,
&steps_, Env::Default());
Env::Default());
}
std::string Key(const std::string& user_key, const SequenceNumber& seq,
@@ -63,9 +63,8 @@ class MergeHelperTest : public testing::Test {
return result;
}
void CheckState(bool success, int steps, int iter_pos) {
void CheckState(bool success, int iter_pos) {
ASSERT_EQ(success, merge_helper_->IsSuccess());
ASSERT_EQ(steps, steps_);
if (iter_pos == -1) {
ASSERT_FALSE(iter_->Valid());
} else {
@@ -78,7 +77,6 @@ class MergeHelperTest : public testing::Test {
std::unique_ptr<MergeHelper> merge_helper_;
std::vector<std::string> ks_;
std::vector<std::string> vs_;
int steps_;
};
// If MergeHelper encounters a new key on the last level, we know that
@@ -89,7 +87,7 @@ TEST_F(MergeHelperTest, MergeAtBottomSuccess) {
AddKeyVal("b", 10, kTypeMerge, EncodeInt(4U)); // <- Iterator after merge
RunUInt64MergeHelper(0, true);
CheckState(true, 2, 2);
CheckState(true, 2);
ASSERT_EQ(Key("a", 20, kTypeValue), merge_helper_->key());
ASSERT_EQ(EncodeInt(4U), merge_helper_->value());
}
@@ -102,7 +100,7 @@ TEST_F(MergeHelperTest, MergeValue) {
AddKeyVal("a", 10, kTypeMerge, EncodeInt(1U));
RunUInt64MergeHelper(0, false);
CheckState(true, 3, 3);
CheckState(true, 3);
ASSERT_EQ(Key("a", 40, kTypeValue), merge_helper_->key());
ASSERT_EQ(EncodeInt(8U), merge_helper_->value());
}
@@ -116,7 +114,7 @@ TEST_F(MergeHelperTest, SnapshotBeforeValue) {
AddKeyVal("a", 10, kTypeMerge, EncodeInt(1U));
RunUInt64MergeHelper(31, true);
CheckState(false, 2, 2);
CheckState(false, 2);
ASSERT_EQ(Key("a", 50, kTypeMerge), merge_helper_->keys()[0]);
ASSERT_EQ(EncodeInt(4U), merge_helper_->values()[0]);
}
@@ -129,7 +127,7 @@ TEST_F(MergeHelperTest, NoPartialMerge) {
AddKeyVal("a", 30, kTypeMerge, "v");
RunStringAppendMergeHelper(31, true);
CheckState(false, 2, 2);
CheckState(false, 2);
ASSERT_EQ(Key("a", 40, kTypeMerge), merge_helper_->keys()[0]);
ASSERT_EQ("v", merge_helper_->values()[0]);
ASSERT_EQ(Key("a", 50, kTypeMerge), merge_helper_->keys()[1]);
@@ -142,7 +140,7 @@ TEST_F(MergeHelperTest, MergeDeletion) {
AddKeyVal("a", 20, kTypeDeletion, "");
RunUInt64MergeHelper(15, false);
CheckState(true, 2, -1);
CheckState(true, -1);
ASSERT_EQ(Key("a", 30, kTypeValue), merge_helper_->key());
ASSERT_EQ(EncodeInt(3U), merge_helper_->value());
}
