reduce recordTick overhead in compaction loop

Summary: It is too expensive to bump ticker to every key/vaue pair

Test Plan: make release

Reviewers: sdong, yhchiang, igor

Reviewed By: igor

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D22527
main
Lei Jin 11 years ago
parent 22a0a60dc4
commit 722d80c374
  1. 36
      db/db_impl.cc

@ -2634,9 +2634,29 @@ Status DBImpl::ProcessKeyValueCompaction(
compaction_filter = compaction_filter_from_factory.get(); compaction_filter = compaction_filter_from_factory.get();
} }
int64_t key_drop_user = 0;
int64_t key_drop_newer_entry = 0;
int64_t key_drop_obsolete = 0;
int64_t loop_cnt = 0;
while (input->Valid() && !shutting_down_.Acquire_Load() && while (input->Valid() && !shutting_down_.Acquire_Load() &&
!cfd->IsDropped()) { !cfd->IsDropped()) {
if (++loop_cnt > 1000) {
if (key_drop_user > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
key_drop_user = 0;
}
if (key_drop_newer_entry > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
key_drop_newer_entry);
key_drop_newer_entry = 0;
}
if (key_drop_obsolete > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, key_drop_obsolete);
key_drop_obsolete = 0;
}
RecordCompactionIOStats(); RecordCompactionIOStats();
loop_cnt = 0;
}
// FLUSH preempts compaction // FLUSH preempts compaction
// TODO(icanadi) this currently only checks if flush is necessary on // TODO(icanadi) this currently only checks if flush is necessary on
// compacting column family. we should also check if flush is necessary on // compacting column family. we should also check if flush is necessary on
@ -2717,7 +2737,7 @@ Status DBImpl::ProcessKeyValueCompaction(
ParseInternalKey(key, &ikey); ParseInternalKey(key, &ikey);
// no value associated with delete // no value associated with delete
value.clear(); value.clear();
RecordTick(stats_, COMPACTION_KEY_DROP_USER); ++key_drop_user;
} else if (value_changed) { } else if (value_changed) {
value = compaction_filter_value; value = compaction_filter_value;
} }
@ -2741,7 +2761,7 @@ Status DBImpl::ProcessKeyValueCompaction(
// TODO: why not > ? // TODO: why not > ?
assert(last_sequence_for_key >= ikey.sequence); assert(last_sequence_for_key >= ikey.sequence);
drop = true; // (A) drop = true; // (A)
RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY); ++key_drop_newer_entry;
} else if (ikey.type == kTypeDeletion && } else if (ikey.type == kTypeDeletion &&
ikey.sequence <= earliest_snapshot && ikey.sequence <= earliest_snapshot &&
compact->compaction->KeyNotExistsBeyondOutputLevel(ikey.user_key)) { compact->compaction->KeyNotExistsBeyondOutputLevel(ikey.user_key)) {
@ -2753,7 +2773,7 @@ Status DBImpl::ProcessKeyValueCompaction(
// few iterations of this loop (by rule (A) above). // few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped. // Therefore this deletion marker is obsolete and can be dropped.
drop = true; drop = true;
RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE); ++key_drop_obsolete;
} else if (ikey.type == kTypeMerge) { } else if (ikey.type == kTypeMerge) {
if (!merge.HasOperator()) { if (!merge.HasOperator()) {
LogToBuffer(log_buffer, "Options::merge_operator is null."); LogToBuffer(log_buffer, "Options::merge_operator is null.");
@ -2900,7 +2920,15 @@ Status DBImpl::ProcessKeyValueCompaction(
input->Next(); input->Next();
} }
} }
if (key_drop_user > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
}
if (key_drop_newer_entry > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY, key_drop_newer_entry);
}
if (key_drop_obsolete > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, key_drop_obsolete);
}
RecordCompactionIOStats(); RecordCompactionIOStats();
return status; return status;

Loading…
Cancel
Save