From 3d1a924ff346afaaad1f1774f93d5b0ab77b7292 Mon Sep 17 00:00:00 2001
From: Anurag Indu
Date: Tue, 3 Mar 2015 10:59:36 -0800
Subject: [PATCH] Adding stats for the merge and filter operation

Summary:
We have added new stats and perf_context counters for measuring the time
consumed by the merge and filter operations. All the merge operations are
bounded within PERF_TIMER_GUARD statements, and the total time spent in
them is collected in the DB statistics.

Test Plan: WIP

Reviewers: rven, yhchiang, kradhakrishnan, igor, sdong

Reviewed By: sdong

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D34377
---
 db/compaction_job.cc                        |  29 +++-
 db/compaction_job.h                         |   3 +-
 db/db_iter.cc                               |  60 ++++++--
 db/db_test.cc                               | 156 +++++++++++++++++++-
 db/memtable.cc                              |  33 ++++-
 db/memtable.h                               |   2 +
 db/merge_helper.cc                          |  65 +++++---
 db/merge_helper.h                           |   3 +-
 db/version_set.cc                           |   5 +-
 db/version_set.h                            |   2 +
 db/write_batch.cc                           |  14 +-
 include/rocksdb/perf_context.h              |   2 +
 include/rocksdb/statistics.h                |   4 +
 table/get_context.cc                        |  58 +++++---
 table/get_context.h                         |   8 +-
 table/table_test.cc                         |   8 +-
 util/perf_context.cc                        |  41 ++---
 utilities/compacted_db/compacted_db_impl.cc |   5 +-
 18 files changed, 382 insertions(+), 116 deletions(-)

diff --git a/db/compaction_job.cc b/db/compaction_job.cc
index 72d9406f1..57dd9ee3b 100644
--- a/db/compaction_job.cc
+++ b/db/compaction_job.cc
@@ -322,6 +322,7 @@ Status CompactionJob::Run() {
     shared_ptr<Iterator> backup_input(
         versions_->MakeInputIterator(compact_->compaction));
     backup_input->SeekToFirst();
+    uint64_t total_filter_time = 0;
     while (backup_input->Valid() &&
            !shutting_down_->load(std::memory_order_acquire) &&
            !cfd->IsDropped()) {
@@ -369,7 +370,9 @@ Status CompactionJob::Run() {
           // Now prefix changes, this batch is done.
           // Call compaction filter on the buffered values to change the value
           if (compact_->key_str_buf_.size() > 0) {
-            CallCompactionFilterV2(compaction_filter_v2);
+            uint64_t time = 0;
+            CallCompactionFilterV2(compaction_filter_v2, &time);
+            total_filter_time += time;
           }
           compact_->cur_prefix_ = key_prefix.ToString();
         }
@@ -401,7 +404,9 @@ Status CompactionJob::Run() {
       if (!backup_input->Valid()) {
         // If this is the single last value, we need to merge it.
         if (compact_->key_str_buf_.size() > 0) {
-          CallCompactionFilterV2(compaction_filter_v2);
+          uint64_t time = 0;
+          CallCompactionFilterV2(compaction_filter_v2, &time);
+          total_filter_time += time;
         }
         compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
 
@@ -417,11 +422,14 @@ Status CompactionJob::Run() {
     // finish the last batch
     if (status.ok()) {
       if (compact_->key_str_buf_.size() > 0) {
-        CallCompactionFilterV2(compaction_filter_v2);
+        uint64_t time = 0;
+        CallCompactionFilterV2(compaction_filter_v2, &time);
+        total_filter_time += time;
       }
       compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
       status = ProcessKeyValueCompaction(&imm_micros, input.get(), true);
     }
+    RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time);
   }  // checking for compaction filter v2
 
   if (status.ok() &&
@@ -556,6 +564,9 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
   int64_t key_drop_newer_entry = 0;
   int64_t key_drop_obsolete = 0;
   int64_t loop_cnt = 0;
+
+  StopWatchNano timer(env_, stats_ != nullptr);
+  uint64_t total_filter_time = 0;
   while (input->Valid() && !shutting_down_->load(std::memory_order_acquire) &&
          !cfd->IsDropped() && status.ok()) {
     compact_->num_input_records++;
@@ -642,9 +653,13 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
           // the entry with a delete marker.
           bool value_changed = false;
           compaction_filter_value.clear();
+          if (stats_ != nullptr) {
+            timer.Start();
+          }
           bool to_delete = compaction_filter->Filter(
               compact_->compaction->level(), ikey.user_key, value,
               &compaction_filter_value, &value_changed);
+          total_filter_time += timer.ElapsedNanos();
           if (to_delete) {
             // make a copy of the original key and convert it to a delete
             delete_key.SetInternalKey(ExtractUserKey(key), ikey.sequence,
@@ -712,7 +727,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
         // optimization in BuildTable.
         int steps = 0;
         merge.MergeUntil(input, prev_snapshot, bottommost_level_,
-                         db_options_.statistics.get(), &steps);
+                         db_options_.statistics.get(), &steps, env_);
         // Skip the Merge ops
         combined_idx = combined_idx - 1 + steps;
 
@@ -844,6 +859,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
       input->Next();
     }
   }
+  RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time);
   if (key_drop_user > 0) {
     RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
   }
@@ -859,7 +875,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
 }
 
 void CompactionJob::CallCompactionFilterV2(
-    CompactionFilterV2* compaction_filter_v2) {
+    CompactionFilterV2* compaction_filter_v2, uint64_t* time) {
   if (compact_ == nullptr || compaction_filter_v2 == nullptr) {
     return;
   }
@@ -889,10 +905,11 @@ void CompactionJob::CallCompactionFilterV2(
   // filter.
   // If the return value of the compaction filter is true, replace
   // the entry with a delete marker.
+  StopWatchNano timer(env_, stats_ != nullptr);
   compact_->to_delete_buf_ = compaction_filter_v2->Filter(
       compact_->compaction->level(), user_key_buf, existing_value_buf,
       &compact_->new_value_buf_, &compact_->value_changed_buf_);
-
+  *time = timer.ElapsedNanos();
   // new_value_buf_.size() <= to_delete__buf_.size(). "=" iff all
   // kv-pairs in this compaction run needs to be deleted.
   assert(compact_->to_delete_buf_.size() == compact_->key_str_buf_.size());
diff --git a/db/compaction_job.h b/db/compaction_job.h
index 0db7b6f06..512db9f97 100644
--- a/db/compaction_job.h
+++ b/db/compaction_job.h
@@ -84,7 +84,8 @@ class CompactionJob {
   Status ProcessKeyValueCompaction(int64_t* imm_micros, Iterator* input,
                                    bool is_compaction_v2);
   // Call compaction_filter_v2->Filter() on kv-pairs in compact
-  void CallCompactionFilterV2(CompactionFilterV2* compaction_filter_v2);
+  void CallCompactionFilterV2(CompactionFilterV2* compaction_filter_v2,
+                              uint64_t* time);
   Status FinishCompactionOutputFile(Iterator* input);
   Status InstallCompactionResults(InstrumentedMutex* db_mutex);
   SequenceNumber findEarliestVisibleSnapshot(
diff --git a/db/db_iter.cc b/db/db_iter.cc
index 799cec97c..3ad90ffd4 100644
--- a/db/db_iter.cc
+++ b/db/db_iter.cc
@@ -313,8 +313,14 @@ void DBIter::MergeValuesNewToOld() {
       // final result in saved_value_. We are done!
       // ignore corruption if there is any.
       const Slice val = iter_->value();
-      user_merge_operator_->FullMerge(ikey.user_key, &val, operands,
-                                      &saved_value_, logger_);
+      {
+        StopWatchNano timer(env_, statistics_ != nullptr);
+        PERF_TIMER_GUARD(merge_operator_time_nanos);
+        user_merge_operator_->FullMerge(ikey.user_key, &val, operands,
+                                        &saved_value_, logger_);
+        RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
+                   timer.ElapsedNanos());
+      }
       // iter_ is positioned after put
       iter_->Next();
       return;
@@ -328,12 +334,17 @@ void DBIter::MergeValuesNewToOld() {
     }
   }
 
-  // we either exhausted all internal keys under this user key, or hit
-  // a deletion marker.
-  // feed null as the existing value to the merge operator, such that
-  // client can differentiate this scenario and do things accordingly.
-  user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands,
-                                  &saved_value_, logger_);
+  {
+    StopWatchNano timer(env_, statistics_ != nullptr);
+    PERF_TIMER_GUARD(merge_operator_time_nanos);
+    // we either exhausted all internal keys under this user key, or hit
+    // a deletion marker.
+    // feed null as the existing value to the merge operator, such that
+    // client can differentiate this scenario and do things accordingly.
+    user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands,
+                                    &saved_value_, logger_);
+    RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos());
+  }
 }
 
 void DBIter::Prev() {
@@ -434,14 +445,24 @@ bool DBIter::FindValueForCurrentKey() {
       return false;
     case kTypeMerge:
       if (last_not_merge_type == kTypeDeletion) {
+        StopWatchNano timer(env_, statistics_ != nullptr);
+        PERF_TIMER_GUARD(merge_operator_time_nanos);
        user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr,
                                        operands, &saved_value_, logger_);
+        RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
+                   timer.ElapsedNanos());
       } else {
         assert(last_not_merge_type == kTypeValue);
         std::string last_put_value = saved_value_;
         Slice temp_slice(last_put_value);
-        user_merge_operator_->FullMerge(saved_key_.GetKey(), &temp_slice,
-                                        operands, &saved_value_, logger_);
+        {
+          StopWatchNano timer(env_, statistics_ != nullptr);
+          PERF_TIMER_GUARD(merge_operator_time_nanos);
+          user_merge_operator_->FullMerge(saved_key_.GetKey(), &temp_slice,
+                                          operands, &saved_value_, logger_);
+          RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
+                     timer.ElapsedNanos());
+        }
       }
       break;
     case kTypeValue:
@@ -492,9 +513,13 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
   if (!iter_->Valid() ||
       (user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) != 0) ||
       ikey.type == kTypeDeletion) {
-    user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands,
-                                    &saved_value_, logger_);
-
+    {
+      StopWatchNano timer(env_, statistics_ != nullptr);
+      PERF_TIMER_GUARD(merge_operator_time_nanos);
+      user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands,
+                                      &saved_value_, logger_);
+      RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos());
+    }
     // Make iter_ valid and point to saved_key_
     if (!iter_->Valid() ||
         (user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) != 0)) {
@@ -506,8 +531,13 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
   }
 
   const Slice& val = iter_->value();
-  user_merge_operator_->FullMerge(saved_key_.GetKey(), &val, operands,
-                                  &saved_value_, logger_);
+  {
+    StopWatchNano timer(env_, statistics_ != nullptr);
+    PERF_TIMER_GUARD(merge_operator_time_nanos);
+    user_merge_operator_->FullMerge(saved_key_.GetKey(), &val, operands,
+                                    &saved_value_, logger_);
+    RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos());
+  }
   valid_ = true;
   return true;
 }
diff --git a/db/db_test.cc b/db/db_test.cc
index 8624ccf88..57558dbf6 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -394,6 +394,10 @@ class SpecialEnv : public EnvWrapper {
     }
     return s;
   }
+
+  virtual uint64_t NowNanos() override {
+    return target()->NowNanos() + addon_time_ * 1000;
+  }
 };
 
 class DBTest : public testing::Test {
@@ -3755,6 +3759,22 @@ class DeleteFilter : public CompactionFilter {
   virtual const char* Name() const override { return "DeleteFilter"; }
 };
 
+class DelayFilter : public CompactionFilter {
+ public:
+  explicit DelayFilter(DBTest* d) : db_test(d) {}
+  virtual bool Filter(int level, const Slice& key, const Slice& value,
+                      std::string* new_value,
+                      bool* value_changed) const override {
+    db_test->env_->addon_time_ += 1000;
+    return true;
+  }
+
+  virtual const char* Name() const override { return "DelayFilter"; }
+
+ private:
+  DBTest* db_test;
+};
+
 class ConditionalFilter : public CompactionFilter {
  public:
   explicit ConditionalFilter(const std::string* filtered_value)
@@ -3821,6 +3841,20 @@ class DeleteFilterFactory : public CompactionFilterFactory {
   virtual const char* Name() const override { return "DeleteFilterFactory"; }
 };
 
+class DelayFilterFactory : public CompactionFilterFactory {
+ public:
+  explicit DelayFilterFactory(DBTest* d) : db_test(d) {}
+  virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+      const CompactionFilter::Context& context) override {
+    return std::unique_ptr<CompactionFilter>(new DelayFilter(db_test));
+  }
+
+  virtual const char* Name() const override { return "DelayFilterFactory"; }
+
+ private:
+  DBTest* db_test;
+};
+
 class ConditionalFilterFactory : public CompactionFilterFactory {
  public:
   explicit ConditionalFilterFactory(const Slice& filtered_value)
@@ -10216,14 +10250,10 @@ TEST_F(DBTest, ThreadStatusSingleCompaction) {
   const int kNumL0Files = 4;
   options.level0_file_num_compaction_trigger = kNumL0Files;
 
-  rocksdb::SyncPoint::GetInstance()->LoadDependency({
-      {"DBTest::ThreadStatusSingleCompaction:0",
-       "DBImpl::BGWorkCompaction"},
-      {"CompactionJob::Run():Start",
-       "DBTest::ThreadStatusSingleCompaction:1"},
-      {"DBTest::ThreadStatusSingleCompaction:2",
-       "CompactionJob::Run():End"},
+  rocksdb::SyncPoint::GetInstance()->LoadDependency({
+      {"DBTest::ThreadStatusSingleCompaction:0", "DBImpl::BGWorkCompaction"},
+      {"CompactionJob::Run():Start", "DBTest::ThreadStatusSingleCompaction:1"},
+      {"DBTest::ThreadStatusSingleCompaction:2", "CompactionJob::Run():End"},
   });
 
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
@@ -11810,6 +11840,118 @@ TEST_F(DBTest, CloseSpeedup) {
   Destroy(options);
 }
 
+class DelayedMergeOperator : public AssociativeMergeOperator {
+ private:
+  DBTest* db_test_;
+
+ public:
+  explicit DelayedMergeOperator(DBTest* d) : db_test_(d) {}
+  virtual bool Merge(const Slice& key, const Slice* existing_value,
+                     const Slice& value, std::string* new_value,
+                     Logger* logger) const override {
+    db_test_->env_->addon_time_ += 1000;
+    return true;
+  }
+
+  virtual const char* Name() const override { return "DelayedMergeOperator"; }
+};
+
+TEST_F(DBTest, MergeTestTime) {
+  std::string one, two, three;
+  PutFixed64(&one, 1);
+  PutFixed64(&two, 2);
+  PutFixed64(&three, 3);
+
+  // Enable time profiling
+  SetPerfLevel(kEnableTime);
+  this->env_->addon_time_ = 0;
+  Options options;
+  options = CurrentOptions(options);
+  options.statistics = rocksdb::CreateDBStatistics();
+  options.merge_operator.reset(new DelayedMergeOperator(this));
+  DestroyAndReopen(options);
+
+  ASSERT_EQ(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 0);
+  db_->Put(WriteOptions(), "foo", one);
+  ASSERT_OK(Flush());
+  ASSERT_OK(db_->Merge(WriteOptions(), "foo", two));
+  ASSERT_OK(Flush());
+  ASSERT_OK(db_->Merge(WriteOptions(), "foo", three));
+  ASSERT_OK(Flush());
+
+  ReadOptions opt;
+  opt.verify_checksums = true;
+  opt.snapshot = nullptr;
+  std::string result;
+  db_->Get(opt, "foo", &result);
+
+  ASSERT_LT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 2100000);
+  ASSERT_GT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 1900000);
+
+  ReadOptions read_options;
+  std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
+  int count = 0;
+  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+    ASSERT_OK(iter->status());
+    ++count;
+  }
+
+  ASSERT_EQ(1, count);
+
+  ASSERT_LT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 4200000);
+  ASSERT_GT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 3800000);
+}
+
+TEST_F(DBTest, MergeCompactionTimeTest) {
+  SetPerfLevel(kEnableTime);
+  Options options;
+  options = CurrentOptions(options);
+  options.compaction_filter_factory = std::make_shared<KeepFilterFactory>();
+  options.statistics = rocksdb::CreateDBStatistics();
+  options.merge_operator.reset(new DelayedMergeOperator(this));
+  options.compaction_style = kCompactionStyleUniversal;
+  DestroyAndReopen(options);
+
+  for (int i = 0; i < 1000; i++) {
+    ASSERT_OK(db_->Merge(WriteOptions(), "foo", "TEST"));
+    ASSERT_OK(Flush());
+  }
+  dbfull()->TEST_WaitForFlushMemTable();
+  dbfull()->TEST_WaitForCompact();
+
+  ASSERT_NE(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 0);
+}
+
+TEST_F(DBTest, FilterCompactionTimeTest) {
+  Options options;
+  options.compaction_filter_factory =
+      std::make_shared<DelayFilterFactory>(this);
+  options.disable_auto_compactions = true;
+  options.create_if_missing = true;
+  options.statistics = rocksdb::CreateDBStatistics();
+  options = CurrentOptions(options);
+  DestroyAndReopen(options);
+
+  // put some data
+  for (int table = 0; table < 4; ++table) {
+    for (int i = 0; i < 10 + table; ++i) {
+      Put(ToString(table * 100 + i), "val");
+    }
+    Flush();
+  }
+
+  ASSERT_OK(db_->CompactRange(nullptr, nullptr));
+  ASSERT_EQ(0U, CountLiveFiles());
+
+  Reopen(options);
+
+  Iterator* itr = db_->NewIterator(ReadOptions());
+  itr->SeekToFirst();
+  ASSERT_NE(TestGetTickerCount(options, FILTER_OPERATION_TOTAL_TIME), 0);
+  delete itr;
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
diff --git a/db/memtable.cc b/db/memtable.cc
index f94f97ce8..76392d6cc 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -76,7 +76,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
                : 0),
       prefix_extractor_(ioptions.prefix_extractor),
       should_flush_(ShouldFlushNow()),
-      flush_scheduled_(false) {
+      flush_scheduled_(false),
+      env_(ioptions.env) {
   // if should_flush_ == true without an entry inserted, something must have
   // gone wrong already.
   assert(!should_flush_);
@@ -349,6 +350,7 @@ struct Saver {
   Logger* logger;
   Statistics* statistics;
   bool inplace_update_support;
+  Env* env_;
 };
 }  // namespace
 
@@ -383,9 +385,17 @@ static bool SaveValue(void* arg, const char* entry) {
         *(s->status) = Status::OK();
         if (*(s->merge_in_progress)) {
           assert(merge_operator);
-          if (!merge_operator->FullMerge(s->key->user_key(), &v,
-                                         merge_context->GetOperands(), s->value,
-                                         s->logger)) {
+          bool merge_success = false;
+          {
+            StopWatchNano timer(s->env_, s->statistics != nullptr);
+            PERF_TIMER_GUARD(merge_operator_time_nanos);
+            merge_success = merge_operator->FullMerge(
+                s->key->user_key(), &v, merge_context->GetOperands(), s->value,
+                s->logger);
+            RecordTick(s->statistics, MERGE_OPERATION_TOTAL_TIME,
+                       timer.ElapsedNanos());
+          }
+          if (!merge_success) {
             RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
             *(s->status) =
                 Status::Corruption("Error: Could not perform merge.");
@@ -403,9 +413,17 @@ static bool SaveValue(void* arg, const char* entry) {
         if (*(s->merge_in_progress)) {
           assert(merge_operator);
           *(s->status) = Status::OK();
-          if (!merge_operator->FullMerge(s->key->user_key(), nullptr,
-                                         merge_context->GetOperands(), s->value,
-                                         s->logger)) {
+          bool merge_success = false;
+          {
+            StopWatchNano timer(s->env_, s->statistics != nullptr);
+            PERF_TIMER_GUARD(merge_operator_time_nanos);
+            merge_success = merge_operator->FullMerge(
+                s->key->user_key(), nullptr, merge_context->GetOperands(),
+                s->value, s->logger);
+            RecordTick(s->statistics, MERGE_OPERATION_TOTAL_TIME,
+                       timer.ElapsedNanos());
+          }
+          if (!merge_success) {
             RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
             *(s->status) =
                 Status::Corruption("Error: Could not perform merge.");
@@ -472,6 +490,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
     saver.logger = moptions_.info_log;
     saver.inplace_update_support = moptions_.inplace_update_support;
     saver.statistics = moptions_.statistics;
+    saver.env_ = env_;
     table_->Get(key, &saver, SaveValue);
   }
 
diff --git a/db/memtable.h b/db/memtable.h
index 4234d5dfd..66fc98282 100644
--- a/db/memtable.h
+++ b/db/memtable.h
@@ -17,6 +17,7 @@
 #include "db/skiplist.h"
 #include "db/version_edit.h"
 #include "rocksdb/db.h"
+#include "rocksdb/env.h"
 #include "rocksdb/memtablerep.h"
 #include "rocksdb/immutable_options.h"
 #include "db/memtable_allocator.h"
@@ -261,6 +262,7 @@ class MemTable {
 
   // a flag indicating if flush has been scheduled
   bool flush_scheduled_;
+  Env* env_;
 };
 
 extern const char* EncodeKey(std::string* scratch, const Slice& target);
diff --git a/db/merge_helper.cc b/db/merge_helper.cc
index 11b5d8f47..3a3cbe366 100644
--- a/db/merge_helper.cc
+++ b/db/merge_helper.cc
@@ -11,6 +11,8 @@
 #include "util/statistics.h"
 #include <string>
 #include <stdio.h>
+#include "util/perf_context_imp.h"
+#include "util/stop_watch.h"
 
 namespace rocksdb {
 
@@ -21,7 +23,8 @@ namespace rocksdb {
 // operands_ stores the list of merge operands encountered while merging.
 // keys_[i] corresponds to operands_[i] for each i.
 void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
-                             bool at_bottom, Statistics* stats, int* steps) {
+                             bool at_bottom, Statistics* stats, int* steps,
+                             Env* env_) {
   // Get a copy of the internal key, before it's invalidated by iter->Next()
   // Also maintain the list of merge operands seen.
   assert(HasOperator());
@@ -78,10 +81,14 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
         // => store result in operands_.back() (and update keys_.back())
         // => change the entry type to kTypeValue for keys_.back()
         // We are done! Return a success if the merge passes.
-        success_ = user_merge_operator_->FullMerge(ikey.user_key, nullptr,
-                                                   operands_, &merge_result,
-                                                   logger_);
-
+        {
+          StopWatchNano timer(env_, stats != nullptr);
+          PERF_TIMER_GUARD(merge_operator_time_nanos);
+          success_ = user_merge_operator_->FullMerge(
+              ikey.user_key, nullptr, operands_, &merge_result, logger_);
+          RecordTick(stats, MERGE_OPERATION_TOTAL_TIME,
+                     env_ != nullptr ? timer.ElapsedNanos() : 0);
+        }
         // We store the result in keys_.back() and operands_.back()
         // if nothing went wrong (i.e.: no operand corruption on disk)
         if (success_) {
@@ -110,9 +117,14 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
         // => change the entry type to kTypeValue for keys_.back()
         // We are done! Success!
         const Slice val = iter->value();
-        success_ = user_merge_operator_->FullMerge(ikey.user_key, &val,
-                                                   operands_, &merge_result,
-                                                   logger_);
+        {
+          StopWatchNano timer(env_, stats != nullptr);
+          PERF_TIMER_GUARD(merge_operator_time_nanos);
+          success_ = user_merge_operator_->FullMerge(
+              ikey.user_key, &val, operands_, &merge_result, logger_);
+          RecordTick(stats, MERGE_OPERATION_TOTAL_TIME,
+                     env_ != nullptr ? timer.ElapsedNanos() : 0);
+        }
         // We store the result in keys_.back() and operands_.back()
         // if nothing went wrong (i.e.: no operand corruption on disk)
         if (success_) {
@@ -173,10 +185,14 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
     assert(kTypeMerge == orig_ikey.type);
     assert(operands_.size() >= 1);
     assert(operands_.size() == keys_.size());
-    success_ = user_merge_operator_->FullMerge(orig_ikey.user_key, nullptr,
-                                               operands_, &merge_result,
-                                               logger_);
-
+    {
+      StopWatchNano timer(env_, stats != nullptr);
+      PERF_TIMER_GUARD(merge_operator_time_nanos);
+      success_ = user_merge_operator_->FullMerge(
+          orig_ikey.user_key, nullptr, operands_, &merge_result, logger_);
+      RecordTick(stats, MERGE_OPERATION_TOTAL_TIME,
+                 env_ != nullptr ? timer.ElapsedNanos() : 0);
+    }
     if (success_) {
       std::string& original_key = keys_.back();  // The original key encountered
       orig_ikey.type = kTypeValue;
@@ -195,16 +211,25 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
     // merge the stacked merge operands into a single operand.
 
     if (operands_.size() >= 2 &&
-        operands_.size() >= min_partial_merge_operands_ &&
-        user_merge_operator_->PartialMergeMulti(
+        operands_.size() >= min_partial_merge_operands_) {
+      bool merge_success = false;
+      {
+        StopWatchNano timer(env_, stats != nullptr);
+        PERF_TIMER_GUARD(merge_operator_time_nanos);
+        merge_success = user_merge_operator_->PartialMergeMulti(
             orig_ikey.user_key,
             std::deque<Slice>(operands_.begin(), operands_.end()),
-            &merge_result, logger_)) {
-      // Merging of operands (associative merge) was successful.
-      // Replace operands with the merge result
-      operands_.clear();
-      operands_.push_front(std::move(merge_result));
-      keys_.erase(keys_.begin(), keys_.end() - 1);
+            &merge_result, logger_);
+        RecordTick(stats, MERGE_OPERATION_TOTAL_TIME,
+                   env_ != nullptr ? timer.ElapsedNanos() : 0);
+      }
+      if (merge_success) {
+        // Merging of operands (associative merge) was successful.
+        // Replace operands with the merge result
+        operands_.clear();
+        operands_.push_front(std::move(merge_result));
+        keys_.erase(keys_.begin(), keys_.end() - 1);
+      }
     }
   }
 }
diff --git a/db/merge_helper.h b/db/merge_helper.h
index 7188d02a4..69f576ca1 100644
--- a/db/merge_helper.h
+++ b/db/merge_helper.h
@@ -10,6 +10,7 @@
 #include "rocksdb/slice.h"
 #include <string>
 #include <deque>
+#include "rocksdb/env.h"
 
 namespace rocksdb {
 
@@ -48,7 +49,7 @@ class MergeHelper {
   //   we could reach the start of the history of this user key.
   void MergeUntil(Iterator* iter, SequenceNumber stop_before = 0,
                   bool at_bottom = false, Statistics* stats = nullptr,
-                  int* steps = nullptr);
+                  int* steps = nullptr, Env* env_ = nullptr);
 
   // Query the merge result
   // These are valid until the next MergeUntil call
diff --git a/db/version_set.cc b/db/version_set.cc
index c1ad0bff2..8d67bf246 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -751,7 +751,8 @@ VersionStorageInfo::VersionStorageInfo(
 
 Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
                  uint64_t version_number)
-    : cfd_(column_family_data),
+    : env_(vset->env_),
+      cfd_(column_family_data),
       info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->info_log),
       db_statistics_((cfd_ == nullptr) ? nullptr
                                        : cfd_->ioptions()->statistics),
@@ -786,7 +787,7 @@ void Version::Get(const ReadOptions& read_options,
   GetContext get_context(
       user_comparator(), merge_operator_, info_log_, db_statistics_,
       status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
-      value, value_found, merge_context);
+      value, value_found, merge_context, this->env_);
 
   FilePicker fp(
       storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
diff --git a/db/version_set.h b/db/version_set.h
index 5d56e128c..92c04dfb5 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -36,6 +36,7 @@
 #include "db/log_reader.h"
 #include "db/file_indexer.h"
 #include "db/write_controller.h"
+#include "rocksdb/env.h"
 #include "util/instrumented_mutex.h"
 
 namespace rocksdb {
@@ -435,6 +436,7 @@ class Version {
   void GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta);
 
  private:
+  Env* env_;
   friend class VersionSet;
 
   const InternalKeyComparator* internal_comparator() const {
diff --git a/db/write_batch.cc b/db/write_batch.cc
index aad9a643c..52956f8a8 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -33,6 +33,7 @@
 #include "util/coding.h"
 #include "util/statistics.h"
 #include <stdexcept>
+#include "util/perf_context_imp.h"
 
 namespace rocksdb {
 
@@ -435,8 +436,17 @@ class MemTableInserter : public WriteBatch::Handler {
         std::deque<std::string> operands;
         operands.push_front(value.ToString());
         std::string new_value;
-        if (!merge_operator->FullMerge(key, &get_value_slice, operands,
-                                       &new_value, moptions->info_log)) {
+        bool merge_success = false;
+        {
+          StopWatchNano timer(Env::Default(), moptions->statistics != nullptr);
+          PERF_TIMER_GUARD(merge_operator_time_nanos);
+          merge_success = merge_operator->FullMerge(
+              key, &get_value_slice, operands, &new_value, moptions->info_log);
+          RecordTick(moptions->statistics, MERGE_OPERATION_TOTAL_TIME,
+                     timer.ElapsedNanos());
+        }
+
+        if (!merge_success) {
           // Failed to merge!
           RecordTick(moptions->statistics, NUMBER_MERGE_FAILURES);
 
diff --git a/include/rocksdb/perf_context.h b/include/rocksdb/perf_context.h
index ac9ef7977..3b8145a94 100644
--- a/include/rocksdb/perf_context.h
+++ b/include/rocksdb/perf_context.h
@@ -72,6 +72,8 @@ struct PerfContext {
   uint64_t db_mutex_lock_nanos;  // time spent on acquiring DB mutex.
   // Time spent on waiting with a condition variable created with DB mutex.
   uint64_t db_condition_wait_nanos;
+  // Time spent on merge operator.
+  uint64_t merge_operator_time_nanos;
 };
 
 #if defined(NPERF_CONTEXT) || defined(IOS_CROSS_COMPILE)
diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h
index c5b364a0c..09bf68739 100644
--- a/include/rocksdb/statistics.h
+++ b/include/rocksdb/statistics.h
@@ -139,6 +139,8 @@ enum Tickers : uint32_t {
   NUMBER_SUPERVERSION_RELEASES,
   NUMBER_SUPERVERSION_CLEANUPS,
   NUMBER_BLOCK_NOT_COMPRESSED,
+  MERGE_OPERATION_TOTAL_TIME,
+  FILTER_OPERATION_TOTAL_TIME,
   TICKER_ENUM_MAX
 };
 
@@ -205,6 +207,8 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
     {NUMBER_SUPERVERSION_RELEASES, "rocksdb.number.superversion_releases"},
     {NUMBER_SUPERVERSION_CLEANUPS, "rocksdb.number.superversion_cleanups"},
     {NUMBER_BLOCK_NOT_COMPRESSED, "rocksdb.number.block.not_compressed"},
+    {MERGE_OPERATION_TOTAL_TIME, "rocksdb.merge.operation.time.nanos"},
+    {FILTER_OPERATION_TOTAL_TIME, "rocksdb.filter.operation.time.nanos"},
 };
 
 /**
diff --git a/table/get_context.cc b/table/get_context.cc
index 59dfa41e6..737b3660e 100644
--- a/table/get_context.cc
+++ b/table/get_context.cc
@@ -4,27 +4,29 @@
 // of patent rights can be found in the PATENTS file in the same directory.
 
 #include "table/get_context.h"
+#include "rocksdb/env.h"
 #include "rocksdb/merge_operator.h"
 #include "rocksdb/statistics.h"
+#include "util/perf_context_imp.h"
 #include "util/statistics.h"
 
 namespace rocksdb {
 
 GetContext::GetContext(const Comparator* ucmp,
-                       const MergeOperator* merge_operator,
-                       Logger* logger, Statistics* statistics,
-                       GetState init_state, const Slice& user_key, std::string* ret_value,
-                       bool* value_found, MergeContext* merge_context)
-    : ucmp_(ucmp),
-      merge_operator_(merge_operator),
-      logger_(logger),
-      statistics_(statistics),
-      state_(init_state),
-      user_key_(user_key),
-      value_(ret_value),
-      value_found_(value_found),
-      merge_context_(merge_context) {
-}
+                       const MergeOperator* merge_operator, Logger* logger,
+                       Statistics* statistics, GetState init_state,
+                       const Slice& user_key, std::string* ret_value,
+                       bool* value_found, MergeContext* merge_context, Env* env)
+    : ucmp_(ucmp),
+      merge_operator_(merge_operator),
+      logger_(logger),
+      statistics_(statistics),
+      state_(init_state),
+      user_key_(user_key),
+      value_(ret_value),
+      value_found_(value_found),
+      merge_context_(merge_context),
+      env_(env) {}
 
 // Called from TableCache::Get and Table::Get when file/block in which
 // key may exist are not there in TableCache/BlockCache respectively. In this
@@ -58,9 +60,17 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
       } else if (kMerge == state_) {
         assert(merge_operator_ != nullptr);
         state_ = kFound;
-        if (!merge_operator_->FullMerge(user_key_, &value,
-                                        merge_context_->GetOperands(),
-                                        value_, logger_)) {
+        bool merge_success = false;
+        {
+          StopWatchNano timer(env_, statistics_ != nullptr);
+          PERF_TIMER_GUARD(merge_operator_time_nanos);
+          merge_success = merge_operator_->FullMerge(
+              user_key_, &value, merge_context_->GetOperands(), value_,
+              logger_);
+          RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
+                     timer.ElapsedNanos());
+        }
+        if (!merge_success) {
           RecordTick(statistics_, NUMBER_MERGE_FAILURES);
           state_ = kCorrupt;
         }
@@ -73,9 +83,17 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
         state_ = kDeleted;
       } else if (kMerge == state_) {
         state_ = kFound;
-        if (!merge_operator_->FullMerge(user_key_, nullptr,
-                                        merge_context_->GetOperands(),
-                                        value_, logger_)) {
+        bool merge_success = false;
+        {
+          StopWatchNano timer(env_, statistics_ != nullptr);
+          PERF_TIMER_GUARD(merge_operator_time_nanos);
+          merge_success = merge_operator_->FullMerge(
+              user_key_, nullptr, merge_context_->GetOperands(), value_,
+              logger_);
+          RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
+                     timer.ElapsedNanos());
+        }
+        if (!merge_success) {
           RecordTick(statistics_, NUMBER_MERGE_FAILURES);
           state_ = kCorrupt;
         }
diff --git a/table/get_context.h b/table/get_context.h
index a38f3c533..700f23aeb 100644
--- a/table/get_context.h
+++ b/table/get_context.h
@@ -6,6 +6,7 @@
 #pragma once
 #include <string>
 #include "db/merge_context.h"
+#include "rocksdb/env.h"
 
 namespace rocksdb {
 class MergeContext;
@@ -21,9 +22,9 @@ class GetContext {
   };
 
   GetContext(const Comparator* ucmp, const MergeOperator* merge_operator,
-             Logger* logger, Statistics* statistics,
-             GetState init_state, const Slice& user_key, std::string* ret_value,
-             bool* value_found, MergeContext* merge_context);
+             Logger* logger, Statistics* statistics, GetState init_state,
+             const Slice& user_key, std::string* ret_value, bool* value_found,
+             MergeContext* merge_context, Env* env_);
 
   void MarkKeyMayExist();
   void SaveValue(const Slice& value);
@@ -42,6 +43,7 @@ class GetContext {
   std::string* value_;
   bool* value_found_;  // Is value set correctly? Used by KeyMayExist
   MergeContext* merge_context_;
+  Env* env_;
 };
 
 }  // namespace rocksdb
diff --git a/table/table_test.cc b/table/table_test.cc
index aba7f7133..09bc513df 100644
--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -1611,7 +1611,7 @@ TEST_F(BlockBasedTableTest, BlockCacheDisabledTest) {
 
   {
     GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
-                           GetContext::kNotFound, Slice(), nullptr,
+                           GetContext::kNotFound, Slice(), nullptr, nullptr,
                            nullptr, nullptr);
     // a hack that just to trigger BlockBasedTable::GetFilter.
     reader->Get(ReadOptions(), "non-exist-key", &get_context);
@@ -1747,7 +1747,7 @@ TEST_F(BlockBasedTableTest, FilterBlockInBlockCache) {
   ASSERT_TRUE(!reader->TEST_filter_block_preloaded());
   std::string value;
   GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
-                         GetContext::kNotFound, user_key, &value,
+                         GetContext::kNotFound, user_key, &value, nullptr,
                          nullptr, nullptr);
   ASSERT_OK(reader->Get(ReadOptions(), user_key, &get_context));
   ASSERT_EQ(value, "hello");
@@ -2003,8 +2003,8 @@ TEST_F(MemTableTest, Simple) {
   options.memtable_factory = table_factory;
   ImmutableCFOptions ioptions(options);
   WriteBuffer wb(options.db_write_buffer_size);
-  MemTable* memtable = new MemTable(cmp, ioptions,
-                                    MutableCFOptions(options, ioptions), &wb);
+  MemTable* memtable =
+      new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb);
   memtable->Ref();
   WriteBatch batch;
   WriteBatchInternal::SetSequence(&batch, 100);
diff --git a/util/perf_context.cc b/util/perf_context.cc
index dfc818e8b..7be9980e5 100644
--- a/util/perf_context.cc
+++ b/util/perf_context.cc
@@ -55,6 +55,7 @@ void PerfContext::Reset() {
     write_memtable_time = 0;
     db_mutex_lock_nanos = 0;
     db_condition_wait_nanos = 0;
+    merge_operator_time_nanos = 0;
 #endif
 }
 
@@ -65,32 +66,20 @@ std::string PerfContext::ToString() const {
   return "";
 #else
   std::ostringstream ss;
-  ss << OUTPUT(user_key_comparison_count)
-     << OUTPUT(block_cache_hit_count)
-     << OUTPUT(block_read_count)
-     << OUTPUT(block_read_byte)
-     << OUTPUT(block_read_time)
-     << OUTPUT(block_checksum_time)
-     << OUTPUT(block_decompress_time)
-     << OUTPUT(internal_key_skipped_count)
-     << OUTPUT(internal_delete_skipped_count)
-     << OUTPUT(write_wal_time)
-     << OUTPUT(get_snapshot_time)
-     << OUTPUT(get_from_memtable_time)
-     << OUTPUT(get_from_memtable_count)
-     << OUTPUT(get_post_process_time)
-     << OUTPUT(get_from_output_files_time)
-     << OUTPUT(seek_on_memtable_time)
-     << OUTPUT(seek_on_memtable_count)
-     << OUTPUT(seek_child_seek_time)
-     << OUTPUT(seek_child_seek_count)
-     << OUTPUT(seek_min_heap_time)
-     << OUTPUT(seek_internal_seek_time)
-     << OUTPUT(find_next_user_entry_time)
-     << OUTPUT(write_pre_and_post_process_time)
-     << OUTPUT(write_memtable_time)
-     << OUTPUT(db_mutex_lock_nanos)
-     << OUTPUT(db_condition_wait_nanos);
+  ss << OUTPUT(user_key_comparison_count) << OUTPUT(block_cache_hit_count)
+     << OUTPUT(block_read_count) << OUTPUT(block_read_byte)
+     << OUTPUT(block_read_time) << OUTPUT(block_checksum_time)
+     << OUTPUT(block_decompress_time) << OUTPUT(internal_key_skipped_count)
+     << OUTPUT(internal_delete_skipped_count) << OUTPUT(write_wal_time)
+     << OUTPUT(get_snapshot_time) << OUTPUT(get_from_memtable_time)
+     << OUTPUT(get_from_memtable_count) << OUTPUT(get_post_process_time)
+     << OUTPUT(get_from_output_files_time) << OUTPUT(seek_on_memtable_time)
+     << OUTPUT(seek_on_memtable_count) << OUTPUT(seek_child_seek_time)
+     << OUTPUT(seek_child_seek_count) << OUTPUT(seek_min_heap_time)
+     << OUTPUT(seek_internal_seek_time) << OUTPUT(find_next_user_entry_time)
+     << OUTPUT(write_pre_and_post_process_time) << OUTPUT(write_memtable_time)
+     << OUTPUT(db_mutex_lock_nanos) << OUTPUT(db_condition_wait_nanos)
+     << OUTPUT(merge_operator_time_nanos);
   return ss.str();
 #endif
 }
diff --git a/utilities/compacted_db/compacted_db_impl.cc b/utilities/compacted_db/compacted_db_impl.cc
index 3bd27e46a..102e35728 100644
--- a/utilities/compacted_db/compacted_db_impl.cc
+++ b/utilities/compacted_db/compacted_db_impl.cc
@@ -45,7 +45,8 @@ size_t CompactedDBImpl::FindFile(const Slice& key) {
 Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
                             const Slice& key, std::string* value) {
   GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
-                         GetContext::kNotFound, key, value, nullptr, nullptr);
+                         GetContext::kNotFound, key, value, nullptr, nullptr,
+                         nullptr);
   LookupKey lkey(key, kMaxSequenceNumber);
   files_.files[FindFile(key)].fd.table_reader->Get(
       options, lkey.internal_key(), &get_context);
@@ -76,7 +77,7 @@ std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
     if (r != nullptr) {
       GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
                              GetContext::kNotFound, keys[idx], &(*values)[idx],
                              nullptr, nullptr, nullptr);
-                             nullptr, nullptr);
+                             nullptr, nullptr, nullptr);
       LookupKey lkey(keys[idx], kMaxSequenceNumber);
       r->Get(options, lkey.internal_key(), &get_context);
       if (get_context.State() == GetContext::kFound) {
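
For reference, a minimal sketch of how an application could read the new
MERGE_OPERATION_TOTAL_TIME ticker (and, analogously, FILTER_OPERATION_TOTAL_TIME)
once this change is in. The ConcatOperator and the /tmp path below are
illustrative assumptions, not code from this patch:

#include <iostream>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/options.h"
#include "rocksdb/statistics.h"

// Illustrative associative merge operator: concatenates operands.
class ConcatOperator : public rocksdb::AssociativeMergeOperator {
 public:
  virtual bool Merge(const rocksdb::Slice& key,
                     const rocksdb::Slice* existing_value,
                     const rocksdb::Slice& value, std::string* new_value,
                     rocksdb::Logger* logger) const override {
    new_value->assign(existing_value ? existing_value->ToString() : "");
    new_value->append(value.ToString());
    return true;
  }
  virtual const char* Name() const override { return "ConcatOperator"; }
};

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Enabling statistics arms the StopWatchNano timers added by this patch.
  options.statistics = rocksdb::CreateDBStatistics();
  options.merge_operator.reset(new ConcatOperator);

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/merge_stats_demo", &db);  // demo path
  if (!s.ok()) {
    std::cerr << s.ToString() << std::endl;
    return 1;
  }

  db->Put(rocksdb::WriteOptions(), "key", "a");
  db->Merge(rocksdb::WriteOptions(), "key", "b");
  db->Merge(rocksdb::WriteOptions(), "key", "c");

  std::string value;
  // The Get combines the stacked operands; that FullMerge call is timed
  // and its duration added to the DB-wide ticker.
  db->Get(rocksdb::ReadOptions(), "key", &value);

  std::cout << "rocksdb.merge.operation.time.nanos = "
            << options.statistics->getTickerCount(
                   rocksdb::MERGE_OPERATION_TOTAL_TIME)
            << std::endl;

  delete db;
  return 0;
}

Per-thread merge latency is additionally exposed through the new
perf_context.merge_operator_time_nanos field when timing is enabled via
SetPerfLevel(kEnableTime), as the tests above do.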