diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 72d9406f1..57dd9ee3b 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -322,6 +322,7 @@ Status CompactionJob::Run() { shared_ptr backup_input( versions_->MakeInputIterator(compact_->compaction)); backup_input->SeekToFirst(); + uint64_t total_filter_time = 0; while (backup_input->Valid() && !shutting_down_->load(std::memory_order_acquire) && !cfd->IsDropped()) { @@ -369,7 +370,9 @@ Status CompactionJob::Run() { // Now prefix changes, this batch is done. // Call compaction filter on the buffered values to change the value if (compact_->key_str_buf_.size() > 0) { - CallCompactionFilterV2(compaction_filter_v2); + uint64_t time = 0; + CallCompactionFilterV2(compaction_filter_v2, &time); + total_filter_time += time; } compact_->cur_prefix_ = key_prefix.ToString(); } @@ -401,7 +404,9 @@ Status CompactionJob::Run() { if (!backup_input->Valid()) { // If this is the single last value, we need to merge it. if (compact_->key_str_buf_.size() > 0) { - CallCompactionFilterV2(compaction_filter_v2); + uint64_t time = 0; + CallCompactionFilterV2(compaction_filter_v2, &time); + total_filter_time += time; } compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator()); @@ -417,11 +422,14 @@ Status CompactionJob::Run() { // finish the last batch if (status.ok()) { if (compact_->key_str_buf_.size() > 0) { - CallCompactionFilterV2(compaction_filter_v2); + uint64_t time = 0; + CallCompactionFilterV2(compaction_filter_v2, &time); + total_filter_time += time; } compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator()); status = ProcessKeyValueCompaction(&imm_micros, input.get(), true); } + RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time); } // checking for compaction filter v2 if (status.ok() && @@ -556,6 +564,9 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, int64_t key_drop_newer_entry = 0; int64_t key_drop_obsolete = 0; int64_t loop_cnt = 0; + + StopWatchNano timer(env_, stats_ != nullptr); + uint64_t total_filter_time = 0; while (input->Valid() && !shutting_down_->load(std::memory_order_acquire) && !cfd->IsDropped() && status.ok()) { compact_->num_input_records++; @@ -642,9 +653,13 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, // the entry with a delete marker. bool value_changed = false; compaction_filter_value.clear(); + if (stats_ != nullptr) { + timer.Start(); + } bool to_delete = compaction_filter->Filter( compact_->compaction->level(), ikey.user_key, value, &compaction_filter_value, &value_changed); + total_filter_time += timer.ElapsedNanos(); if (to_delete) { // make a copy of the original key and convert it to a delete delete_key.SetInternalKey(ExtractUserKey(key), ikey.sequence, @@ -712,7 +727,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, // optimization in BuildTable. int steps = 0; merge.MergeUntil(input, prev_snapshot, bottommost_level_, - db_options_.statistics.get(), &steps); + db_options_.statistics.get(), &steps, env_); // Skip the Merge ops combined_idx = combined_idx - 1 + steps; @@ -844,6 +859,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, input->Next(); } } + RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time); if (key_drop_user > 0) { RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user); } @@ -859,7 +875,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, } void CompactionJob::CallCompactionFilterV2( - CompactionFilterV2* compaction_filter_v2) { + CompactionFilterV2* compaction_filter_v2, uint64_t* time) { if (compact_ == nullptr || compaction_filter_v2 == nullptr) { return; } @@ -889,10 +905,11 @@ void CompactionJob::CallCompactionFilterV2( // filter. // If the return value of the compaction filter is true, replace // the entry with a delete marker. + StopWatchNano timer(env_, stats_ != nullptr); compact_->to_delete_buf_ = compaction_filter_v2->Filter( compact_->compaction->level(), user_key_buf, existing_value_buf, &compact_->new_value_buf_, &compact_->value_changed_buf_); - + *time = timer.ElapsedNanos(); // new_value_buf_.size() <= to_delete__buf_.size(). "=" iff all // kv-pairs in this compaction run needs to be deleted. assert(compact_->to_delete_buf_.size() == compact_->key_str_buf_.size()); diff --git a/db/compaction_job.h b/db/compaction_job.h index 0db7b6f06..512db9f97 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -84,7 +84,8 @@ class CompactionJob { Status ProcessKeyValueCompaction(int64_t* imm_micros, Iterator* input, bool is_compaction_v2); // Call compaction_filter_v2->Filter() on kv-pairs in compact - void CallCompactionFilterV2(CompactionFilterV2* compaction_filter_v2); + void CallCompactionFilterV2(CompactionFilterV2* compaction_filter_v2, + uint64_t* time); Status FinishCompactionOutputFile(Iterator* input); Status InstallCompactionResults(InstrumentedMutex* db_mutex); SequenceNumber findEarliestVisibleSnapshot( diff --git a/db/db_iter.cc b/db/db_iter.cc index 799cec97c..3ad90ffd4 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -313,8 +313,14 @@ void DBIter::MergeValuesNewToOld() { // final result in saved_value_. We are done! // ignore corruption if there is any. const Slice val = iter_->value(); - user_merge_operator_->FullMerge(ikey.user_key, &val, operands, - &saved_value_, logger_); + { + StopWatchNano timer(env_, statistics_ != nullptr); + PERF_TIMER_GUARD(merge_operator_time_nanos); + user_merge_operator_->FullMerge(ikey.user_key, &val, operands, + &saved_value_, logger_); + RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, + timer.ElapsedNanos()); + } // iter_ is positioned after put iter_->Next(); return; @@ -328,12 +334,17 @@ void DBIter::MergeValuesNewToOld() { } } - // we either exhausted all internal keys under this user key, or hit - // a deletion marker. - // feed null as the existing value to the merge operator, such that - // client can differentiate this scenario and do things accordingly. - user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands, - &saved_value_, logger_); + { + StopWatchNano timer(env_, statistics_ != nullptr); + PERF_TIMER_GUARD(merge_operator_time_nanos); + // we either exhausted all internal keys under this user key, or hit + // a deletion marker. + // feed null as the existing value to the merge operator, such that + // client can differentiate this scenario and do things accordingly. + user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands, + &saved_value_, logger_); + RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos()); + } } void DBIter::Prev() { @@ -434,14 +445,24 @@ bool DBIter::FindValueForCurrentKey() { return false; case kTypeMerge: if (last_not_merge_type == kTypeDeletion) { + StopWatchNano timer(env_, statistics_ != nullptr); + PERF_TIMER_GUARD(merge_operator_time_nanos); user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands, &saved_value_, logger_); + RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, + timer.ElapsedNanos()); } else { assert(last_not_merge_type == kTypeValue); std::string last_put_value = saved_value_; Slice temp_slice(last_put_value); - user_merge_operator_->FullMerge(saved_key_.GetKey(), &temp_slice, - operands, &saved_value_, logger_); + { + StopWatchNano timer(env_, statistics_ != nullptr); + PERF_TIMER_GUARD(merge_operator_time_nanos); + user_merge_operator_->FullMerge(saved_key_.GetKey(), &temp_slice, + operands, &saved_value_, logger_); + RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, + timer.ElapsedNanos()); + } } break; case kTypeValue: @@ -492,9 +513,13 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { if (!iter_->Valid() || (user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) != 0) || ikey.type == kTypeDeletion) { - user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands, - &saved_value_, logger_); - + { + StopWatchNano timer(env_, statistics_ != nullptr); + PERF_TIMER_GUARD(merge_operator_time_nanos); + user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands, + &saved_value_, logger_); + RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos()); + } // Make iter_ valid and point to saved_key_ if (!iter_->Valid() || (user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) != 0)) { @@ -506,8 +531,13 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { } const Slice& val = iter_->value(); - user_merge_operator_->FullMerge(saved_key_.GetKey(), &val, operands, - &saved_value_, logger_); + { + StopWatchNano timer(env_, statistics_ != nullptr); + PERF_TIMER_GUARD(merge_operator_time_nanos); + user_merge_operator_->FullMerge(saved_key_.GetKey(), &val, operands, + &saved_value_, logger_); + RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos()); + } valid_ = true; return true; } diff --git a/db/db_test.cc b/db/db_test.cc index 8624ccf88..57558dbf6 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -394,6 +394,10 @@ class SpecialEnv : public EnvWrapper { } return s; } + + virtual uint64_t NowNanos() override { + return target()->NowNanos() + addon_time_ * 1000; + } }; class DBTest : public testing::Test { @@ -3755,6 +3759,22 @@ class DeleteFilter : public CompactionFilter { virtual const char* Name() const override { return "DeleteFilter"; } }; +class DelayFilter : public CompactionFilter { + public: + explicit DelayFilter(DBTest* d) : db_test(d) {} + virtual bool Filter(int level, const Slice& key, const Slice& value, + std::string* new_value, + bool* value_changed) const override { + db_test->env_->addon_time_ += 1000; + return true; + } + + virtual const char* Name() const override { return "DelayFilter"; } + + private: + DBTest* db_test; +}; + class ConditionalFilter : public CompactionFilter { public: explicit ConditionalFilter(const std::string* filtered_value) @@ -3821,6 +3841,20 @@ class DeleteFilterFactory : public CompactionFilterFactory { virtual const char* Name() const override { return "DeleteFilterFactory"; } }; +class DelayFilterFactory : public CompactionFilterFactory { + public: + explicit DelayFilterFactory(DBTest* d) : db_test(d) {} + virtual std::unique_ptr CreateCompactionFilter( + const CompactionFilter::Context& context) override { + return std::unique_ptr(new DelayFilter(db_test)); + } + + virtual const char* Name() const override { return "DelayFilterFactory"; } + + private: + DBTest* db_test; +}; + class ConditionalFilterFactory : public CompactionFilterFactory { public: explicit ConditionalFilterFactory(const Slice& filtered_value) @@ -10216,14 +10250,10 @@ TEST_F(DBTest, ThreadStatusSingleCompaction) { const int kNumL0Files = 4; options.level0_file_num_compaction_trigger = kNumL0Files; - rocksdb::SyncPoint::GetInstance()->LoadDependency({ - {"DBTest::ThreadStatusSingleCompaction:0", - "DBImpl::BGWorkCompaction"}, - {"CompactionJob::Run():Start", - "DBTest::ThreadStatusSingleCompaction:1"}, - {"DBTest::ThreadStatusSingleCompaction:2", - "CompactionJob::Run():End"}, + {"DBTest::ThreadStatusSingleCompaction:0", "DBImpl::BGWorkCompaction"}, + {"CompactionJob::Run():Start", "DBTest::ThreadStatusSingleCompaction:1"}, + {"DBTest::ThreadStatusSingleCompaction:2", "CompactionJob::Run():End"}, }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); @@ -11810,6 +11840,118 @@ TEST_F(DBTest, CloseSpeedup) { Destroy(options); } + +class DelayedMergeOperator : public AssociativeMergeOperator { + private: + DBTest* db_test_; + + public: + explicit DelayedMergeOperator(DBTest* d) : db_test_(d) {} + virtual bool Merge(const Slice& key, const Slice* existing_value, + const Slice& value, std::string* new_value, + Logger* logger) const override { + db_test_->env_->addon_time_ += 1000; + return true; + } + + virtual const char* Name() const override { return "DelayedMergeOperator"; } +}; + +TEST_F(DBTest, MergeTestTime) { + std::string one, two, three; + PutFixed64(&one, 1); + PutFixed64(&two, 2); + PutFixed64(&three, 3); + + // Enable time profiling + SetPerfLevel(kEnableTime); + this->env_->addon_time_ = 0; + Options options; + options = CurrentOptions(options); + options.statistics = rocksdb::CreateDBStatistics(); + options.merge_operator.reset(new DelayedMergeOperator(this)); + DestroyAndReopen(options); + + ASSERT_EQ(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 0); + db_->Put(WriteOptions(), "foo", one); + ASSERT_OK(Flush()); + ASSERT_OK(db_->Merge(WriteOptions(), "foo", two)); + ASSERT_OK(Flush()); + ASSERT_OK(db_->Merge(WriteOptions(), "foo", three)); + ASSERT_OK(Flush()); + + ReadOptions opt; + opt.verify_checksums = true; + opt.snapshot = nullptr; + std::string result; + db_->Get(opt, "foo", &result); + + ASSERT_LT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 2100000); + ASSERT_GT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 1900000); + + ReadOptions read_options; + std::unique_ptr iter(db_->NewIterator(read_options)); + int count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + ++count; + } + + ASSERT_EQ(1, count); + + ASSERT_LT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 4200000); + ASSERT_GT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 3800000); +} + +TEST_F(DBTest, MergeCompactionTimeTest) { + SetPerfLevel(kEnableTime); + Options options; + options = CurrentOptions(options); + options.compaction_filter_factory = std::make_shared(); + options.statistics = rocksdb::CreateDBStatistics(); + options.merge_operator.reset(new DelayedMergeOperator(this)); + options.compaction_style = kCompactionStyleUniversal; + DestroyAndReopen(options); + + for (int i = 0; i < 1000; i++) { + ASSERT_OK(db_->Merge(WriteOptions(), "foo", "TEST")); + ASSERT_OK(Flush()); + } + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + + ASSERT_NE(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 0); +} + +TEST_F(DBTest, FilterCompactionTimeTest) { + Options options; + options.compaction_filter_factory = + std::make_shared(this); + options.disable_auto_compactions = true; + options.create_if_missing = true; + options.statistics = rocksdb::CreateDBStatistics(); + options = CurrentOptions(options); + DestroyAndReopen(options); + + // put some data + for (int table = 0; table < 4; ++table) { + for (int i = 0; i < 10 + table; ++i) { + Put(ToString(table * 100 + i), "val"); + } + Flush(); + } + + ASSERT_OK(db_->CompactRange(nullptr, nullptr)); + ASSERT_EQ(0U, CountLiveFiles()); + + Reopen(options); + + Iterator* itr = db_->NewIterator(ReadOptions()); + itr->SeekToFirst(); + ASSERT_NE(TestGetTickerCount(options, FILTER_OPERATION_TOTAL_TIME), 0); + delete itr; +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/memtable.cc b/db/memtable.cc index f94f97ce8..76392d6cc 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -76,7 +76,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp, : 0), prefix_extractor_(ioptions.prefix_extractor), should_flush_(ShouldFlushNow()), - flush_scheduled_(false) { + flush_scheduled_(false), + env_(ioptions.env) { // if should_flush_ == true without an entry inserted, something must have // gone wrong already. assert(!should_flush_); @@ -349,6 +350,7 @@ struct Saver { Logger* logger; Statistics* statistics; bool inplace_update_support; + Env* env_; }; } // namespace @@ -383,9 +385,17 @@ static bool SaveValue(void* arg, const char* entry) { *(s->status) = Status::OK(); if (*(s->merge_in_progress)) { assert(merge_operator); - if (!merge_operator->FullMerge(s->key->user_key(), &v, - merge_context->GetOperands(), s->value, - s->logger)) { + bool merge_success = false; + { + StopWatchNano timer(s->env_, s->statistics != nullptr); + PERF_TIMER_GUARD(merge_operator_time_nanos); + merge_success = merge_operator->FullMerge( + s->key->user_key(), &v, merge_context->GetOperands(), s->value, + s->logger); + RecordTick(s->statistics, MERGE_OPERATION_TOTAL_TIME, + timer.ElapsedNanos()); + } + if (!merge_success) { RecordTick(s->statistics, NUMBER_MERGE_FAILURES); *(s->status) = Status::Corruption("Error: Could not perform merge."); @@ -403,9 +413,17 @@ static bool SaveValue(void* arg, const char* entry) { if (*(s->merge_in_progress)) { assert(merge_operator); *(s->status) = Status::OK(); - if (!merge_operator->FullMerge(s->key->user_key(), nullptr, - merge_context->GetOperands(), s->value, - s->logger)) { + bool merge_success = false; + { + StopWatchNano timer(s->env_, s->statistics != nullptr); + PERF_TIMER_GUARD(merge_operator_time_nanos); + merge_success = merge_operator->FullMerge( + s->key->user_key(), nullptr, merge_context->GetOperands(), + s->value, s->logger); + RecordTick(s->statistics, MERGE_OPERATION_TOTAL_TIME, + timer.ElapsedNanos()); + } + if (!merge_success) { RecordTick(s->statistics, NUMBER_MERGE_FAILURES); *(s->status) = Status::Corruption("Error: Could not perform merge."); @@ -472,6 +490,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, saver.logger = moptions_.info_log; saver.inplace_update_support = moptions_.inplace_update_support; saver.statistics = moptions_.statistics; + saver.env_ = env_; table_->Get(key, &saver, SaveValue); } diff --git a/db/memtable.h b/db/memtable.h index 4234d5dfd..66fc98282 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -17,6 +17,7 @@ #include "db/skiplist.h" #include "db/version_edit.h" #include "rocksdb/db.h" +#include "rocksdb/env.h" #include "rocksdb/memtablerep.h" #include "rocksdb/immutable_options.h" #include "db/memtable_allocator.h" @@ -261,6 +262,7 @@ class MemTable { // a flag indicating if flush has been scheduled bool flush_scheduled_; + Env* env_; }; extern const char* EncodeKey(std::string* scratch, const Slice& target); diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 11b5d8f47..3a3cbe366 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -11,6 +11,8 @@ #include "util/statistics.h" #include #include +#include "util/perf_context_imp.h" +#include "util/stop_watch.h" namespace rocksdb { @@ -21,7 +23,8 @@ namespace rocksdb { // operands_ stores the list of merge operands encountered while merging. // keys_[i] corresponds to operands_[i] for each i. void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, - bool at_bottom, Statistics* stats, int* steps) { + bool at_bottom, Statistics* stats, int* steps, + Env* env_) { // Get a copy of the internal key, before it's invalidated by iter->Next() // Also maintain the list of merge operands seen. assert(HasOperator()); @@ -78,10 +81,14 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, // => store result in operands_.back() (and update keys_.back()) // => change the entry type to kTypeValue for keys_.back() // We are done! Return a success if the merge passes. - success_ = user_merge_operator_->FullMerge(ikey.user_key, nullptr, - operands_, &merge_result, - logger_); - + { + StopWatchNano timer(env_, stats != nullptr); + PERF_TIMER_GUARD(merge_operator_time_nanos); + success_ = user_merge_operator_->FullMerge( + ikey.user_key, nullptr, operands_, &merge_result, logger_); + RecordTick(stats, MERGE_OPERATION_TOTAL_TIME, + env_ != nullptr ? timer.ElapsedNanos() : 0); + } // We store the result in keys_.back() and operands_.back() // if nothing went wrong (i.e.: no operand corruption on disk) if (success_) { @@ -110,9 +117,14 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, // => change the entry type to kTypeValue for keys_.back() // We are done! Success! const Slice val = iter->value(); - success_ = user_merge_operator_->FullMerge(ikey.user_key, &val, operands_, - &merge_result, logger_); - + { + StopWatchNano timer(env_, stats != nullptr); + PERF_TIMER_GUARD(merge_operator_time_nanos); + success_ = user_merge_operator_->FullMerge( + ikey.user_key, &val, operands_, &merge_result, logger_); + RecordTick(stats, MERGE_OPERATION_TOTAL_TIME, + env_ != nullptr ? timer.ElapsedNanos() : 0); + } // We store the result in keys_.back() and operands_.back() // if nothing went wrong (i.e.: no operand corruption on disk) if (success_) { @@ -173,10 +185,14 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, assert(kTypeMerge == orig_ikey.type); assert(operands_.size() >= 1); assert(operands_.size() == keys_.size()); - success_ = user_merge_operator_->FullMerge(orig_ikey.user_key, nullptr, - operands_, &merge_result, - logger_); - + { + StopWatchNano timer(env_, stats != nullptr); + PERF_TIMER_GUARD(merge_operator_time_nanos); + success_ = user_merge_operator_->FullMerge( + orig_ikey.user_key, nullptr, operands_, &merge_result, logger_); + RecordTick(stats, MERGE_OPERATION_TOTAL_TIME, + env_ != nullptr ? timer.ElapsedNanos() : 0); + } if (success_) { std::string& original_key = keys_.back(); // The original key encountered orig_ikey.type = kTypeValue; @@ -195,16 +211,25 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, // merge the stacked merge operands into a single operand. if (operands_.size() >= 2 && - operands_.size() >= min_partial_merge_operands_ && - user_merge_operator_->PartialMergeMulti( + operands_.size() >= min_partial_merge_operands_) { + bool merge_success = false; + { + StopWatchNano timer(env_, stats != nullptr); + PERF_TIMER_GUARD(merge_operator_time_nanos); + merge_success = user_merge_operator_->PartialMergeMulti( orig_ikey.user_key, std::deque(operands_.begin(), operands_.end()), - &merge_result, logger_)) { - // Merging of operands (associative merge) was successful. - // Replace operands with the merge result - operands_.clear(); - operands_.push_front(std::move(merge_result)); - keys_.erase(keys_.begin(), keys_.end() - 1); + &merge_result, logger_); + RecordTick(stats, MERGE_OPERATION_TOTAL_TIME, + env_ != nullptr ? timer.ElapsedNanos() : 0); + } + if (merge_success) { + // Merging of operands (associative merge) was successful. + // Replace operands with the merge result + operands_.clear(); + operands_.push_front(std::move(merge_result)); + keys_.erase(keys_.begin(), keys_.end() - 1); + } } } } diff --git a/db/merge_helper.h b/db/merge_helper.h index 7188d02a4..69f576ca1 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -10,6 +10,7 @@ #include "rocksdb/slice.h" #include #include +#include "rocksdb/env.h" namespace rocksdb { @@ -48,7 +49,7 @@ class MergeHelper { // we could reach the start of the history of this user key. void MergeUntil(Iterator* iter, SequenceNumber stop_before = 0, bool at_bottom = false, Statistics* stats = nullptr, - int* steps = nullptr); + int* steps = nullptr, Env* env_ = nullptr); // Query the merge result // These are valid until the next MergeUntil call diff --git a/db/version_set.cc b/db/version_set.cc index c1ad0bff2..8d67bf246 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -751,7 +751,8 @@ VersionStorageInfo::VersionStorageInfo( Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, uint64_t version_number) - : cfd_(column_family_data), + : env_(vset->env_), + cfd_(column_family_data), info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->info_log), db_statistics_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->statistics), @@ -786,7 +787,7 @@ void Version::Get(const ReadOptions& read_options, GetContext get_context( user_comparator(), merge_operator_, info_log_, db_statistics_, status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key, - value, value_found, merge_context); + value, value_found, merge_context, this->env_); FilePicker fp( storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_, diff --git a/db/version_set.h b/db/version_set.h index 5d56e128c..92c04dfb5 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -36,6 +36,7 @@ #include "db/log_reader.h" #include "db/file_indexer.h" #include "db/write_controller.h" +#include "rocksdb/env.h" #include "util/instrumented_mutex.h" namespace rocksdb { @@ -435,6 +436,7 @@ class Version { void GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta); private: + Env* env_; friend class VersionSet; const InternalKeyComparator* internal_comparator() const { diff --git a/db/write_batch.cc b/db/write_batch.cc index aad9a643c..52956f8a8 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -33,6 +33,7 @@ #include "util/coding.h" #include "util/statistics.h" #include +#include "util/perf_context_imp.h" namespace rocksdb { @@ -435,8 +436,17 @@ class MemTableInserter : public WriteBatch::Handler { std::deque operands; operands.push_front(value.ToString()); std::string new_value; - if (!merge_operator->FullMerge(key, &get_value_slice, operands, - &new_value, moptions->info_log)) { + bool merge_success = false; + { + StopWatchNano timer(Env::Default(), moptions->statistics != nullptr); + PERF_TIMER_GUARD(merge_operator_time_nanos); + merge_success = merge_operator->FullMerge( + key, &get_value_slice, operands, &new_value, moptions->info_log); + RecordTick(moptions->statistics, MERGE_OPERATION_TOTAL_TIME, + timer.ElapsedNanos()); + } + + if (!merge_success) { // Failed to merge! RecordTick(moptions->statistics, NUMBER_MERGE_FAILURES); diff --git a/include/rocksdb/perf_context.h b/include/rocksdb/perf_context.h index ac9ef7977..3b8145a94 100644 --- a/include/rocksdb/perf_context.h +++ b/include/rocksdb/perf_context.h @@ -72,6 +72,8 @@ struct PerfContext { uint64_t db_mutex_lock_nanos; // time spent on acquiring DB mutex. // Time spent on waiting with a condition variable created with DB mutex. uint64_t db_condition_wait_nanos; + // Time spent on merge operator. + uint64_t merge_operator_time_nanos; }; #if defined(NPERF_CONTEXT) || defined(IOS_CROSS_COMPILE) diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index c5b364a0c..09bf68739 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -139,6 +139,8 @@ enum Tickers : uint32_t { NUMBER_SUPERVERSION_RELEASES, NUMBER_SUPERVERSION_CLEANUPS, NUMBER_BLOCK_NOT_COMPRESSED, + MERGE_OPERATION_TOTAL_TIME, + FILTER_OPERATION_TOTAL_TIME, TICKER_ENUM_MAX }; @@ -205,6 +207,8 @@ const std::vector> TickersNameMap = { {NUMBER_SUPERVERSION_RELEASES, "rocksdb.number.superversion_releases"}, {NUMBER_SUPERVERSION_CLEANUPS, "rocksdb.number.superversion_cleanups"}, {NUMBER_BLOCK_NOT_COMPRESSED, "rocksdb.number.block.not_compressed"}, + {MERGE_OPERATION_TOTAL_TIME, "rocksdb.merge.operation.time.nanos"}, + {FILTER_OPERATION_TOTAL_TIME, "rocksdb.filter.operation.time.nanos"}, }; /** diff --git a/table/get_context.cc b/table/get_context.cc index 59dfa41e6..737b3660e 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -4,27 +4,29 @@ // of patent rights can be found in the PATENTS file in the same directory. #include "table/get_context.h" +#include "rocksdb/env.h" #include "rocksdb/merge_operator.h" #include "rocksdb/statistics.h" +#include "util/perf_context_imp.h" #include "util/statistics.h" namespace rocksdb { GetContext::GetContext(const Comparator* ucmp, - const MergeOperator* merge_operator, - Logger* logger, Statistics* statistics, - GetState init_state, const Slice& user_key, std::string* ret_value, - bool* value_found, MergeContext* merge_context) - : ucmp_(ucmp), - merge_operator_(merge_operator), - logger_(logger), - statistics_(statistics), - state_(init_state), - user_key_(user_key), - value_(ret_value), - value_found_(value_found), - merge_context_(merge_context) { -} + const MergeOperator* merge_operator, Logger* logger, + Statistics* statistics, GetState init_state, + const Slice& user_key, std::string* ret_value, + bool* value_found, MergeContext* merge_context, Env* env) + : ucmp_(ucmp), + merge_operator_(merge_operator), + logger_(logger), + statistics_(statistics), + state_(init_state), + user_key_(user_key), + value_(ret_value), + value_found_(value_found), + merge_context_(merge_context), + env_(env) {} // Called from TableCache::Get and Table::Get when file/block in which // key may exist are not there in TableCache/BlockCache respectively. In this @@ -58,9 +60,17 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, } else if (kMerge == state_) { assert(merge_operator_ != nullptr); state_ = kFound; - if (!merge_operator_->FullMerge(user_key_, &value, - merge_context_->GetOperands(), - value_, logger_)) { + bool merge_success = false; + { + StopWatchNano timer(env_, statistics_ != nullptr); + PERF_TIMER_GUARD(merge_operator_time_nanos); + merge_success = merge_operator_->FullMerge( + user_key_, &value, merge_context_->GetOperands(), value_, + logger_); + RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, + timer.ElapsedNanos()); + } + if (!merge_success) { RecordTick(statistics_, NUMBER_MERGE_FAILURES); state_ = kCorrupt; } @@ -73,9 +83,17 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, state_ = kDeleted; } else if (kMerge == state_) { state_ = kFound; - if (!merge_operator_->FullMerge(user_key_, nullptr, - merge_context_->GetOperands(), - value_, logger_)) { + bool merge_success = false; + { + StopWatchNano timer(env_, statistics_ != nullptr); + PERF_TIMER_GUARD(merge_operator_time_nanos); + merge_success = merge_operator_->FullMerge( + user_key_, nullptr, merge_context_->GetOperands(), value_, + logger_); + RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, + timer.ElapsedNanos()); + } + if (!merge_success) { RecordTick(statistics_, NUMBER_MERGE_FAILURES); state_ = kCorrupt; } diff --git a/table/get_context.h b/table/get_context.h index a38f3c533..700f23aeb 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -6,6 +6,7 @@ #pragma once #include #include "db/merge_context.h" +#include "rocksdb/env.h" namespace rocksdb { class MergeContext; @@ -21,9 +22,9 @@ class GetContext { }; GetContext(const Comparator* ucmp, const MergeOperator* merge_operator, - Logger* logger, Statistics* statistics, - GetState init_state, const Slice& user_key, std::string* ret_value, - bool* value_found, MergeContext* merge_context); + Logger* logger, Statistics* statistics, GetState init_state, + const Slice& user_key, std::string* ret_value, bool* value_found, + MergeContext* merge_context, Env* env_); void MarkKeyMayExist(); void SaveValue(const Slice& value); @@ -42,6 +43,7 @@ class GetContext { std::string* value_; bool* value_found_; // Is value set correctly? Used by KeyMayExist MergeContext* merge_context_; + Env* env_; }; } // namespace rocksdb diff --git a/table/table_test.cc b/table/table_test.cc index aba7f7133..09bc513df 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -1611,7 +1611,7 @@ TEST_F(BlockBasedTableTest, BlockCacheDisabledTest) { { GetContext get_context(options.comparator, nullptr, nullptr, nullptr, - GetContext::kNotFound, Slice(), nullptr, + GetContext::kNotFound, Slice(), nullptr, nullptr, nullptr, nullptr); // a hack that just to trigger BlockBasedTable::GetFilter. reader->Get(ReadOptions(), "non-exist-key", &get_context); @@ -1747,7 +1747,7 @@ TEST_F(BlockBasedTableTest, FilterBlockInBlockCache) { ASSERT_TRUE(!reader->TEST_filter_block_preloaded()); std::string value; GetContext get_context(options.comparator, nullptr, nullptr, nullptr, - GetContext::kNotFound, user_key, &value, + GetContext::kNotFound, user_key, &value, nullptr, nullptr, nullptr); ASSERT_OK(reader->Get(ReadOptions(), user_key, &get_context)); ASSERT_EQ(value, "hello"); @@ -2003,8 +2003,8 @@ TEST_F(MemTableTest, Simple) { options.memtable_factory = table_factory; ImmutableCFOptions ioptions(options); WriteBuffer wb(options.db_write_buffer_size); - MemTable* memtable = new MemTable(cmp, ioptions, - MutableCFOptions(options, ioptions), &wb); + MemTable* memtable = + new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb); memtable->Ref(); WriteBatch batch; WriteBatchInternal::SetSequence(&batch, 100); diff --git a/util/perf_context.cc b/util/perf_context.cc index dfc818e8b..7be9980e5 100644 --- a/util/perf_context.cc +++ b/util/perf_context.cc @@ -55,6 +55,7 @@ void PerfContext::Reset() { write_memtable_time = 0; db_mutex_lock_nanos = 0; db_condition_wait_nanos = 0; + merge_operator_time_nanos = 0; #endif } @@ -65,32 +66,20 @@ std::string PerfContext::ToString() const { return ""; #else std::ostringstream ss; - ss << OUTPUT(user_key_comparison_count) - << OUTPUT(block_cache_hit_count) - << OUTPUT(block_read_count) - << OUTPUT(block_read_byte) - << OUTPUT(block_read_time) - << OUTPUT(block_checksum_time) - << OUTPUT(block_decompress_time) - << OUTPUT(internal_key_skipped_count) - << OUTPUT(internal_delete_skipped_count) - << OUTPUT(write_wal_time) - << OUTPUT(get_snapshot_time) - << OUTPUT(get_from_memtable_time) - << OUTPUT(get_from_memtable_count) - << OUTPUT(get_post_process_time) - << OUTPUT(get_from_output_files_time) - << OUTPUT(seek_on_memtable_time) - << OUTPUT(seek_on_memtable_count) - << OUTPUT(seek_child_seek_time) - << OUTPUT(seek_child_seek_count) - << OUTPUT(seek_min_heap_time) - << OUTPUT(seek_internal_seek_time) - << OUTPUT(find_next_user_entry_time) - << OUTPUT(write_pre_and_post_process_time) - << OUTPUT(write_memtable_time) - << OUTPUT(db_mutex_lock_nanos) - << OUTPUT(db_condition_wait_nanos); + ss << OUTPUT(user_key_comparison_count) << OUTPUT(block_cache_hit_count) + << OUTPUT(block_read_count) << OUTPUT(block_read_byte) + << OUTPUT(block_read_time) << OUTPUT(block_checksum_time) + << OUTPUT(block_decompress_time) << OUTPUT(internal_key_skipped_count) + << OUTPUT(internal_delete_skipped_count) << OUTPUT(write_wal_time) + << OUTPUT(get_snapshot_time) << OUTPUT(get_from_memtable_time) + << OUTPUT(get_from_memtable_count) << OUTPUT(get_post_process_time) + << OUTPUT(get_from_output_files_time) << OUTPUT(seek_on_memtable_time) + << OUTPUT(seek_on_memtable_count) << OUTPUT(seek_child_seek_time) + << OUTPUT(seek_child_seek_count) << OUTPUT(seek_min_heap_time) + << OUTPUT(seek_internal_seek_time) << OUTPUT(find_next_user_entry_time) + << OUTPUT(write_pre_and_post_process_time) << OUTPUT(write_memtable_time) + << OUTPUT(db_mutex_lock_nanos) << OUTPUT(db_condition_wait_nanos) + << OUTPUT(merge_operator_time_nanos); return ss.str(); #endif } diff --git a/utilities/compacted_db/compacted_db_impl.cc b/utilities/compacted_db/compacted_db_impl.cc index 3bd27e46a..102e35728 100644 --- a/utilities/compacted_db/compacted_db_impl.cc +++ b/utilities/compacted_db/compacted_db_impl.cc @@ -45,7 +45,8 @@ size_t CompactedDBImpl::FindFile(const Slice& key) { Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*, const Slice& key, std::string* value) { GetContext get_context(user_comparator_, nullptr, nullptr, nullptr, - GetContext::kNotFound, key, value, nullptr, nullptr); + GetContext::kNotFound, key, value, nullptr, nullptr, + nullptr); LookupKey lkey(key, kMaxSequenceNumber); files_.files[FindFile(key)].fd.table_reader->Get( options, lkey.internal_key(), &get_context); @@ -76,7 +77,7 @@ std::vector CompactedDBImpl::MultiGet(const ReadOptions& options, if (r != nullptr) { GetContext get_context(user_comparator_, nullptr, nullptr, nullptr, GetContext::kNotFound, keys[idx], &(*values)[idx], - nullptr, nullptr); + nullptr, nullptr, nullptr); LookupKey lkey(keys[idx], kMaxSequenceNumber); r->Get(options, lkey.internal_key(), &get_context); if (get_context.State() == GetContext::kFound) {