diff --git a/HISTORY.md b/HISTORY.md index e1cf2816c..40bae8c74 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,9 +5,11 @@ ### Bug Fixes * Fixed an issue for backward iteration when user defined timestamp is enabled in combination with BlobDB. +* Fixed a couple of cases where a Merge operand encountered during iteration wasn't reflected in the `internal_merge_count` PerfContext counter. ### New Features * Add statistics rocksdb.secondary.cache.filter.hits, rocksdb.secondary.cache.index.hits, and rocksdb.secondary.cache.filter.hits +* Added a new PerfContext counter `internal_merge_count_point_lookups` which tracks the number of Merge operands applied while serving point lookup queries. ## 8.0.0 (02/19/2023) ### Behavior changes diff --git a/db/db_iter.cc b/db/db_iter.cc index 32d2ba8a6..a6417ef11 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -521,6 +521,8 @@ bool DBIter::MergeValuesNewToOld() { // Start the merge process by pushing the first operand merge_context_.PushOperand( iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */); + PERF_COUNTER_ADD(internal_merge_count, 1); + TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:PushedFirstOperand"); ParsedInternalKey ikey; @@ -1159,6 +1161,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { merge_context_.Clear(); merge_context_.PushOperand( iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */); + PERF_COUNTER_ADD(internal_merge_count, 1); + while (true) { iter_.Next(); diff --git a/db/memtable.cc b/db/memtable.cc index 09246fda6..0c9749d29 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -1230,6 +1230,8 @@ static bool SaveValue(void* arg, const char* entry) { *(s->merge_in_progress) = true; merge_context->PushOperand( v, s->inplace_update_support == false /* operand_pinned */); + PERF_COUNTER_ADD(internal_merge_count_point_lookups, 1); + if (s->do_merge && merge_operator->ShouldMerge( merge_context->GetOperandsDirectionBackward())) { if (s->value || s->columns) { diff --git 
a/db/perf_context_test.cc b/db/perf_context_test.cc index 8694252a2..b0f85a105 100644 --- a/db/perf_context_test.cc +++ b/db/perf_context_test.cc @@ -964,6 +964,159 @@ TEST_F(PerfContextTest, CPUTimer) { ASSERT_EQ(count, get_perf_context()->iter_seek_cpu_nanos); } } + +TEST_F(PerfContextTest, MergeOperandCount) { + ASSERT_OK(DestroyDB(kDbName, Options())); + + DB* db = nullptr; + Options options; + options.create_if_missing = true; + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + + ASSERT_OK(DB::Open(options, kDbName, &db)); + std::unique_ptr<DB> db_guard(db); + + constexpr size_t num_keys = 3; + const std::string key_prefix("key"); + const std::string value_prefix("value"); + + std::vector<std::string> keys; + keys.reserve(num_keys); + + for (size_t i = 0; i < num_keys; ++i) { + keys.emplace_back(key_prefix + std::to_string(i)); + } + + // Write three keys with one Put each followed by 1, 2, and 3 + // Merge operations respectively. + constexpr size_t total_merges = num_keys * (num_keys + 1) / 2; + + std::vector<ManagedSnapshot> snapshots; + snapshots.reserve(total_merges); + + for (size_t i = 0; i < num_keys; ++i) { + const std::string suffix = std::to_string(i); + const std::string value = value_prefix + suffix; + + ASSERT_OK(db->Put(WriteOptions(), keys[i], value)); + + for (size_t j = 0; j <= i; ++j) { + // Take a snapshot before each Merge so they are preserved and not + // collapsed during flush. 
+ snapshots.emplace_back(db); + + ASSERT_OK(db->Merge(WriteOptions(), keys[i], value + std::to_string(j))); + } + } + + auto verify = [&]() { + get_perf_context()->Reset(); + + for (size_t i = 0; i < num_keys; ++i) { + // Get + { + PinnableSlice result; + ASSERT_OK(db->Get(ReadOptions(), db->DefaultColumnFamily(), keys[i], + &result)); + ASSERT_EQ(get_perf_context()->internal_merge_count_point_lookups, + i + 1); + + get_perf_context()->Reset(); + } + + // GetEntity + { + PinnableWideColumns result; + ASSERT_OK(db->GetEntity(ReadOptions(), db->DefaultColumnFamily(), + keys[i], &result)); + ASSERT_EQ(get_perf_context()->internal_merge_count_point_lookups, + i + 1); + + get_perf_context()->Reset(); + } + } + + { + std::vector<Slice> key_slices; + key_slices.reserve(num_keys); + + for (size_t i = 0; i < num_keys; ++i) { + key_slices.emplace_back(keys[i]); + } + + // MultiGet + { + std::vector<PinnableSlice> results(num_keys); + std::vector<Status> statuses(num_keys); + + db->MultiGet(ReadOptions(), db->DefaultColumnFamily(), num_keys, + &key_slices[0], &results[0], &statuses[0]); + + for (size_t i = 0; i < num_keys; ++i) { + ASSERT_OK(statuses[i]); + } + + ASSERT_EQ(get_perf_context()->internal_merge_count_point_lookups, + total_merges); + + get_perf_context()->Reset(); + } + + // MultiGetEntity + { + std::vector<PinnableWideColumns> results(num_keys); + std::vector<Status> statuses(num_keys); + + db->MultiGetEntity(ReadOptions(), db->DefaultColumnFamily(), num_keys, + &key_slices[0], &results[0], &statuses[0]); + + for (size_t i = 0; i < num_keys; ++i) { + ASSERT_OK(statuses[i]); + } + + ASSERT_EQ(get_perf_context()->internal_merge_count_point_lookups, + total_merges); + + get_perf_context()->Reset(); + } + } + + std::unique_ptr<Iterator> it(db->NewIterator(ReadOptions())); + + // Forward iteration + { + size_t i = 0; + + for (it->SeekToFirst(); it->Valid(); it->Next(), ++i) { + ASSERT_EQ(it->key(), keys[i]); + ASSERT_EQ(get_perf_context()->internal_merge_count, i + 1); + + get_perf_context()->Reset(); + } + } + + // Backward iteration 
+ { + size_t i = num_keys - 1; + + for (it->SeekToLast(); it->Valid(); it->Prev(), --i) { + ASSERT_EQ(it->key(), keys[i]); + ASSERT_EQ(get_perf_context()->internal_merge_count, i + 1); + + get_perf_context()->Reset(); + } + } + }; + + // Verify counters when reading from memtable + verify(); + + // Verify counters when reading from table files + db->Flush(FlushOptions()); + + verify(); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/include/rocksdb/perf_context.h b/include/rocksdb/perf_context.h index cd1dd99f0..15dec5018 100644 --- a/include/rocksdb/perf_context.h +++ b/include/rocksdb/perf_context.h @@ -135,9 +135,14 @@ struct PerfContext { // than the snapshot that iterator is using. // uint64_t internal_recent_skipped_count; - // How many values were fed into merge operator by iterators. + // How many merge operands were fed into the merge operator by iterators. + // Note: base values are not included in the count. // uint64_t internal_merge_count; + // How many merge operands were fed into the merge operator by point lookups. + // Note: base values are not included in the count. + // + uint64_t internal_merge_count_point_lookups; // Number of times we reseeked inside a merging iterator, specifically to skip // after or before a range of keys covered by a range deletion in a newer LSM // component. 
diff --git a/monitoring/perf_context.cc b/monitoring/perf_context.cc index 9068ede01..23745bbd0 100644 --- a/monitoring/perf_context.cc +++ b/monitoring/perf_context.cc @@ -69,6 +69,7 @@ PerfContext::PerfContext(const PerfContext& other) { internal_delete_skipped_count = other.internal_delete_skipped_count; internal_recent_skipped_count = other.internal_recent_skipped_count; internal_merge_count = other.internal_merge_count; + internal_merge_count_point_lookups = other.internal_merge_count_point_lookups; internal_range_del_reseek_count = other.internal_range_del_reseek_count; write_wal_time = other.write_wal_time; get_snapshot_time = other.get_snapshot_time; @@ -188,6 +189,7 @@ PerfContext::PerfContext(PerfContext&& other) noexcept { internal_delete_skipped_count = other.internal_delete_skipped_count; internal_recent_skipped_count = other.internal_recent_skipped_count; internal_merge_count = other.internal_merge_count; + internal_merge_count_point_lookups = other.internal_merge_count_point_lookups; internal_range_del_reseek_count = other.internal_range_del_reseek_count; write_wal_time = other.write_wal_time; get_snapshot_time = other.get_snapshot_time; @@ -309,6 +311,7 @@ PerfContext& PerfContext::operator=(const PerfContext& other) { internal_delete_skipped_count = other.internal_delete_skipped_count; internal_recent_skipped_count = other.internal_recent_skipped_count; internal_merge_count = other.internal_merge_count; + internal_merge_count_point_lookups = other.internal_merge_count_point_lookups; internal_range_del_reseek_count = other.internal_range_del_reseek_count; write_wal_time = other.write_wal_time; get_snapshot_time = other.get_snapshot_time; @@ -422,6 +425,7 @@ void PerfContext::Reset() { internal_delete_skipped_count = 0; internal_recent_skipped_count = 0; internal_merge_count = 0; + internal_merge_count_point_lookups = 0; internal_range_del_reseek_count = 0; write_wal_time = 0; @@ -556,6 +560,7 @@ std::string PerfContext::ToString(bool 
exclude_zero_counters) const { PERF_CONTEXT_OUTPUT(internal_delete_skipped_count); PERF_CONTEXT_OUTPUT(internal_recent_skipped_count); PERF_CONTEXT_OUTPUT(internal_merge_count); + PERF_CONTEXT_OUTPUT(internal_merge_count_point_lookups); PERF_CONTEXT_OUTPUT(internal_range_del_reseek_count); PERF_CONTEXT_OUTPUT(write_wal_time); PERF_CONTEXT_OUTPUT(get_snapshot_time); diff --git a/table/get_context.cc b/table/get_context.cc index 73b94b596..409604a5c 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -442,6 +442,8 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, state_ = kMerge; // value_pinner is not set from plain_table_reader.cc for example. push_operand(value, value_pinner); + PERF_COUNTER_ADD(internal_merge_count_point_lookups, 1); + if (do_merge_ && merge_operator_ != nullptr && merge_operator_->ShouldMerge( merge_context_->GetOperandsDirectionBackward())) {