Refactoring of writing key/value pairs

Summary:
Before, writing key/value pairs out to files was done inside
ProcessKeyValueCompaction(). To make ProcessKeyValueCompaction()
more understandable, this patch moves the writing part to a separate
function. This is intended to be a stepping stone for additional
changes.

Test Plan: make && make check

Reviewers: sdong, rven, yhchiang, igor

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D42243
main
Andres Notzli 10 years ago
parent e1c99e10c1
commit 6b2d44b2ff
  1. 148
      db/compaction_job.cc
  2. 6
      db/compaction_job.h

@ -327,7 +327,7 @@ Status CompactionJob::Run() {
} else { } else {
status = ProcessPrefixBatches(cfd, &imm_micros, input.get(), status = ProcessPrefixBatches(cfd, &imm_micros, input.get(),
compaction_filter_v2); compaction_filter_v2);
} // checking for compaction filter v2 }
if (status.ok() && if (status.ok() &&
(shutting_down_->load(std::memory_order_acquire) || cfd->IsDropped())) { (shutting_down_->load(std::memory_order_acquire) || cfd->IsDropped())) {
@ -335,7 +335,7 @@ Status CompactionJob::Run() {
"Database shutdown or Column family drop during compaction"); "Database shutdown or Column family drop during compaction");
} }
if (status.ok() && compact_->builder != nullptr) { if (status.ok() && compact_->builder != nullptr) {
status = FinishCompactionOutputFile(input.get()); status = FinishCompactionOutputFile(input->status());
} }
if (status.ok()) { if (status.ok()) {
status = input->status(); status = input->status();
@ -629,7 +629,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
if (compact_->compaction->ShouldStopBefore(key) && if (compact_->compaction->ShouldStopBefore(key) &&
compact_->builder != nullptr) { compact_->builder != nullptr) {
status = FinishCompactionOutputFile(input); status = FinishCompactionOutputFile(input->status());
if (!status.ok()) { if (!status.ok()) {
break; break;
} }
@ -772,85 +772,31 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
// We may write a single key (e.g.: for Put/Delete or successful merge). // We may write a single key (e.g.: for Put/Delete or successful merge).
// Or we may instead have to write a sequence/list of keys. // Or we may instead have to write a sequence/list of keys.
// We have to write a sequence iff we have an unsuccessful merge // We have to write a sequence iff we have an unsuccessful merge
bool has_merge_list = current_entry_is_merging && !merge.IsSuccess(); if (current_entry_is_merging && !merge.IsSuccess()) {
const std::deque<std::string>* keys = nullptr; const auto& keys = merge.keys();
const std::deque<std::string>* values = nullptr; const auto& values = merge.values();
std::deque<std::string>::const_reverse_iterator key_iter; std::deque<std::string>::const_reverse_iterator key_iter =
std::deque<std::string>::const_reverse_iterator value_iter; keys.rbegin(); // The back (*rbegin()) is the first key
if (has_merge_list) { std::deque<std::string>::const_reverse_iterator value_iter =
keys = &merge.keys(); values.rbegin();
values = &merge.values();
key_iter = keys->rbegin(); // The back (*rbegin()) is the first key
value_iter = values->rbegin();
key = Slice(*key_iter); key = Slice(*key_iter);
value = Slice(*value_iter); value = Slice(*value_iter);
}
// If we have a list of keys to write, traverse the list.
// If we have a single key to write, simply write that key.
while (true) {
// Invariant: key,value,ikey will always be the next entry to write
Slice newkey(key.data(), key.size());
std::string kstr;
// Zeroing out the sequence number leads to better compression.
// If this is the bottommost level (no files in lower levels)
// and the earliest snapshot is larger than this seqno
// then we can squash the seqno to zero.
if (bottommost_level_ && ikey.sequence < earliest_snapshot_ &&
ikey.type != kTypeMerge) {
assert(ikey.type != kTypeDeletion);
// make a copy because updating in place would cause problems
// with the priority queue that is managing the input key iterator
kstr.assign(key.data(), key.size());
UpdateInternalKey(&kstr, (uint64_t)0, ikey.type);
newkey = Slice(kstr);
}
assert((key.clear(), 1)); // we do not need 'key' anymore
// Open output file if necessary // We have a list of keys to write, traverse the list.
if (compact_->builder == nullptr) { while (true) {
status = OpenCompactionOutputFile(); status = WriteKeyValue(key, value, ikey, input->status());
if (!status.ok()) { if (!status.ok()) {
break; break;
} }
}
SequenceNumber seqno = GetInternalKeySeqno(newkey);
if (compact_->builder->NumEntries() == 0) {
compact_->current_output()->smallest.DecodeFrom(newkey);
compact_->current_output()->smallest_seqno = seqno;
} else {
compact_->current_output()->smallest_seqno =
std::min(compact_->current_output()->smallest_seqno, seqno);
}
compact_->current_output()->largest.DecodeFrom(newkey);
compact_->builder->Add(newkey, value);
compact_->num_output_records++;
compact_->current_output()->largest_seqno =
std::max(compact_->current_output()->largest_seqno, seqno);
// Close output file if it is big enough
if (compact_->builder->FileSize() >=
compact_->compaction->max_output_file_size()) {
status = FinishCompactionOutputFile(input);
if (!status.ok()) {
break;
}
}
// If we have a list of entries, move to next element
// If we only had one entry, then break the loop.
if (has_merge_list) {
++key_iter; ++key_iter;
++value_iter; ++value_iter;
// If at end of list // If at end of list
if (key_iter == keys->rend() || value_iter == values->rend()) { if (key_iter == keys.rend() || value_iter == values.rend()) {
// Sanity Check: if one ends, then both end // Sanity Check: if one ends, then both end
assert(key_iter == keys->rend() && value_iter == values->rend()); assert(key_iter == keys.rend() && value_iter == values.rend());
break; break;
} }
@ -858,12 +804,11 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
key = Slice(*key_iter); key = Slice(*key_iter);
value = Slice(*value_iter); value = Slice(*value_iter);
ParseInternalKey(key, &ikey); ParseInternalKey(key, &ikey);
} else {
// Only had one item to begin with (Put/Delete)
break;
} }
} // while (true) } else {
// There is only one item to be written out
status = WriteKeyValue(key, value, ikey, input->status());
}
} // if (!drop) } // if (!drop)
// MergeUntil has moved input to the next entry // MergeUntil has moved input to the next entry
@ -878,6 +823,57 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
return status; return status;
} }
Status CompactionJob::WriteKeyValue(const Slice& key, const Slice& value,
const ParsedInternalKey& ikey, const Status& input_status) {
Slice newkey(key.data(), key.size());
std::string kstr;
// Zeroing out the sequence number leads to better compression.
// If this is the bottommost level (no files in lower levels)
// and the earliest snapshot is larger than this seqno
// then we can squash the seqno to zero.
if (bottommost_level_ && ikey.sequence < earliest_snapshot_ &&
ikey.type != kTypeMerge) {
assert(ikey.type != kTypeDeletion);
// make a copy because updating in place would cause problems
// with the priority queue that is managing the input key iterator
kstr.assign(key.data(), key.size());
UpdateInternalKey(&kstr, (uint64_t)0, ikey.type);
newkey = Slice(kstr);
}
// Open output file if necessary
if (compact_->builder == nullptr) {
Status status = OpenCompactionOutputFile();
if (!status.ok()) {
return status;
}
}
SequenceNumber seqno = GetInternalKeySeqno(newkey);
if (compact_->builder->NumEntries() == 0) {
compact_->current_output()->smallest.DecodeFrom(newkey);
compact_->current_output()->smallest_seqno = seqno;
} else {
compact_->current_output()->smallest_seqno =
std::min(compact_->current_output()->smallest_seqno, seqno);
}
compact_->current_output()->largest.DecodeFrom(newkey);
compact_->builder->Add(newkey, value);
compact_->num_output_records++;
compact_->current_output()->largest_seqno =
std::max(compact_->current_output()->largest_seqno, seqno);
// Close output file if it is big enough
Status status;
if (compact_->builder->FileSize() >=
compact_->compaction->max_output_file_size()) {
status = FinishCompactionOutputFile(input_status);
}
return status;
}
void CompactionJob::RecordDroppedKeys( void CompactionJob::RecordDroppedKeys(
int64_t* key_drop_user, int64_t* key_drop_user,
int64_t* key_drop_newer_entry, int64_t* key_drop_newer_entry,
@ -967,7 +963,7 @@ void CompactionJob::CallCompactionFilterV2(
} // for } // for
} }
Status CompactionJob::FinishCompactionOutputFile(Iterator* input) { Status CompactionJob::FinishCompactionOutputFile(const Status& input_status) {
AutoThreadOperationStageUpdater stage_updater( AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_SYNC_FILE); ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
assert(compact_ != nullptr); assert(compact_ != nullptr);
@ -980,7 +976,7 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) {
TableProperties table_properties; TableProperties table_properties;
// Check for iterator errors // Check for iterator errors
Status s = input->status(); Status s = input_status;
const uint64_t current_entries = compact_->builder->NumEntries(); const uint64_t current_entries = compact_->builder->NumEntries();
compact_->current_output()->need_compaction = compact_->current_output()->need_compaction =
compact_->builder->NeedCompact(); compact_->builder->NeedCompact();

@ -93,10 +93,14 @@ class CompactionJob {
// through input and compact the kv-pairs // through input and compact the kv-pairs
Status ProcessKeyValueCompaction(int64_t* imm_micros, Iterator* input, Status ProcessKeyValueCompaction(int64_t* imm_micros, Iterator* input,
bool is_compaction_v2); bool is_compaction_v2);
Status WriteKeyValue(const Slice& key, const Slice& value,
const ParsedInternalKey& ikey,
const Status& input_status);
// Call compaction_filter_v2->Filter() on kv-pairs in compact // Call compaction_filter_v2->Filter() on kv-pairs in compact
void CallCompactionFilterV2(CompactionFilterV2* compaction_filter_v2, void CallCompactionFilterV2(CompactionFilterV2* compaction_filter_v2,
uint64_t* time); uint64_t* time);
Status FinishCompactionOutputFile(Iterator* input); Status FinishCompactionOutputFile(const Status& input_status);
Status InstallCompactionResults(InstrumentedMutex* db_mutex, Status InstallCompactionResults(InstrumentedMutex* db_mutex,
const MutableCFOptions& mutable_cf_options); const MutableCFOptions& mutable_cf_options);
SequenceNumber findEarliestVisibleSnapshot( SequenceNumber findEarliestVisibleSnapshot(

Loading…
Cancel
Save