From f1bf169484aa59ae65988275918c29a7438816ba Mon Sep 17 00:00:00 2001 From: Mayank Agarwal Date: Thu, 8 Aug 2013 23:07:36 -0700 Subject: [PATCH] Counter for merge failure Summary: With Merge returning bool, it can keep failing silently(eg. While faling to fetch timestamp in TTL). We need to detect this through a rocksdb counter which can get bumped whenever Merge returns false. This will also be super-useful for the mcrocksdb-counter service where Merge may fail. Added a counter NUMBER_MERGE_FAILURES and appropriately updated db/merge_helper.cc I felt that it would be better to directly add counter-bumping in Merge as a default function of MergeOperator class but user should not be aware of this, so this approach seems better to me. Test Plan: make all check Reviewers: dnicholas, haobo, dhruba, vamsi CC: leveldb Differential Revision: https://reviews.facebook.net/D12129 --- db/db_impl.cc | 3 ++- db/memtable.cc | 2 ++ db/memtable.h | 2 +- db/memtablelist.cc | 28 +++++++++++----------------- db/merge_helper.cc | 7 ++++++- db/merge_helper.h | 3 ++- db/version_set.cc | 5 +++++ include/leveldb/statistics.h | 7 +++++-- utilities/ttl/db_ttl.h | 8 +++----- 9 files changed, 37 insertions(+), 28 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index e510c4505..a06bc3b9e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1915,7 +1915,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // object to minimize change to the existing flow. Turn out this // logic could also be nicely re-used for memtable flush purge // optimization in BuildTable. - merge.MergeUntil(input.get(), prev_snapshot, bottommost_level); + merge.MergeUntil(input.get(), prev_snapshot, bottommost_level, + options_.statistics); current_entry_is_merging = true; if (merge.IsSuccess()) { // Successfully found Put/Delete/(end-of-key-range) while merging diff --git a/db/memtable.cc b/db/memtable.cc index a9a684967..4844bcd05 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -169,6 +169,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, assert(merge_operator); if (!merge_operator->Merge(key.user_key(), &v, *operands, value, logger.get())) { + RecordTick(options.statistics, NUMBER_MERGE_FAILURES); *s = Status::Corruption("Error: Could not perform merge."); } } else { @@ -182,6 +183,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, *s = Status::OK(); if (!merge_operator->Merge(key.user_key(), nullptr, *operands, value, logger.get())) { + RecordTick(options.statistics, NUMBER_MERGE_FAILURES); *s = Status::Corruption("Error: Could not perform merge."); } } else { diff --git a/db/memtable.h b/db/memtable.h index fe8cec86b..481f84079 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -8,10 +8,10 @@ #include #include #include -#include "leveldb/db.h" #include "db/dbformat.h" #include "db/skiplist.h" #include "db/version_set.h" +#include "leveldb/db.h" #include "leveldb/memtablerep.h" #include "util/arena_impl.h" diff --git a/db/memtablelist.cc b/db/memtablelist.cc index 6f66faab8..256afe361 100644 --- a/db/memtablelist.cc +++ b/db/memtablelist.cc @@ -21,17 +21,15 @@ using std::list; // Increase reference count on all underling memtables void MemTableList::RefAll() { - for (list::iterator it = memlist_.begin(); - it != memlist_.end() ; ++it) { - (*it)->Ref(); + for (auto &memtable : memlist_) { + memtable->Ref(); } } // Drop reference count on all underling memtables void MemTableList::UnrefAll() { - for (list::iterator it = memlist_.begin(); - it != memlist_.end() ; ++it) { - (*it)->Unref(); + for (auto &memtable : memlist_) { + memtable->Unref(); } } @@ -53,8 +51,7 @@ bool MemTableList::IsFlushPending(int min_write_buffer_number_to_merge) { // Returns the memtables that need to be flushed. void MemTableList::PickMemtablesToFlush(std::vector* ret) { - for (list::reverse_iterator it = memlist_.rbegin(); - it != memlist_.rend(); it++) { + for (auto it = memlist_.rbegin(); it != memlist_.rend(); it++) { MemTable* m = *it; if (!m->flush_in_progress_) { assert(!m->flush_completed_); @@ -184,9 +181,8 @@ void MemTableList::Add(MemTable* m) { // Returns an estimate of the number of bytes of data in use. size_t MemTableList::ApproximateMemoryUsage() { size_t size = 0; - for (list::iterator it = memlist_.begin(); - it != memlist_.end(); ++it) { - size += (*it)->ApproximateMemoryUsage(); + for (auto &memtable : memlist_) { + size += memtable->ApproximateMemoryUsage(); } return size; } @@ -197,9 +193,8 @@ size_t MemTableList::ApproximateMemoryUsage() { bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s, std::deque* operands, const Options& options) { - for (list::iterator it = memlist_.begin(); - it != memlist_.end(); ++it) { - if ((*it)->Get(key, value, s, operands, options)) { + for (auto &memtable : memlist_) { + if (memtable->Get(key, value, s, operands, options)) { return true; } } @@ -207,9 +202,8 @@ bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s, } void MemTableList::GetMemTables(std::vector* output) { - for (list::iterator it = memlist_.begin(); - it != memlist_.end(); ++it) { - output->push_back(*it); + for (auto &memtable : memlist_) { + output->push_back(memtable); } } diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 4f363fec3..d1f3c6683 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -15,7 +15,7 @@ namespace leveldb { // operands_ stores the list of merge operands encountered while merging. // keys_[i] corresponds to operands_[i] for each i. void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, - bool at_bottom) { + bool at_bottom, shared_ptr stats) { // Get a copy of the internal key, before it's invalidated by iter->Next() // Also maintain the list of merge operands seen. keys_.clear(); @@ -79,6 +79,8 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, UpdateInternalKey(&key[0], key.size(), orig_ikey.sequence, orig_ikey.type); swap(operands_.back(), merge_result); + } else { + RecordTick(stats, NUMBER_MERGE_FAILURES); } // move iter to the next entry (before doing anything else) @@ -105,6 +107,8 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, UpdateInternalKey(&key[0], key.size(), orig_ikey.sequence, orig_ikey.type); swap(operands_.back(), merge_result); + } else { + RecordTick(stats, NUMBER_MERGE_FAILURES); } // move iter to the next entry @@ -179,6 +183,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, // The final value() is always stored in operands_.back() swap(operands_.back(),merge_result); } else { + RecordTick(stats, NUMBER_MERGE_FAILURES); // Do nothing if not success_. Leave keys() and operands() as they are. } } diff --git a/db/merge_helper.h b/db/merge_helper.h index 9cad8ad04..2d8ed5668 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -3,6 +3,7 @@ #include "db/dbformat.h" #include "leveldb/slice.h" +#include "leveldb/statistics.h" #include #include @@ -40,7 +41,7 @@ class MergeHelper { // at_bottom: (IN) true if the iterator covers the bottem level, which means // we could reach the start of the history of this user key. void MergeUntil(Iterator* iter, SequenceNumber stop_before = 0, - bool at_bottom = false); + bool at_bottom = false, shared_ptr stats=nullptr); // Query the merge result // These are valid until the next MergeUntil call diff --git a/db/version_set.cc b/db/version_set.cc index f2a426bb1..981d7932f 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -245,6 +245,7 @@ struct Saver { std::deque* merge_operands; // the merge operations encountered Logger* logger; bool didIO; // did we do any disk io? + shared_ptr statistics; }; } @@ -287,6 +288,7 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ s->state = kFound; if (!s->merge_operator->Merge(s->user_key, &v, *ops, s->value, s->logger)) { + RecordTick(s->statistics, NUMBER_MERGE_FAILURES); s->state = kCorrupt; } } else { @@ -301,6 +303,7 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ s->state = kFound; if (!s->merge_operator->Merge(s->user_key, nullptr, *ops, s->value, s->logger)) { + RecordTick(s->statistics, NUMBER_MERGE_FAILURES); s->state = kCorrupt; } } else { @@ -391,6 +394,7 @@ void Version::Get(const ReadOptions& options, saver.merge_operands = operands; saver.logger = logger.get(); saver.didIO = false; + saver.statistics = db_options.statistics; stats->seek_file = nullptr; stats->seek_file_level = -1; @@ -517,6 +521,7 @@ void Version::Get(const ReadOptions& options, value, logger.get())) { *status = Status::OK(); } else { + RecordTick(db_options.statistics, NUMBER_MERGE_FAILURES); *status = Status::Corruption("could not perform end-of-key merge for ", user_key); } diff --git a/include/leveldb/statistics.h b/include/leveldb/statistics.h index 5ab7f8ed6..e214c2bef 100644 --- a/include/leveldb/statistics.h +++ b/include/leveldb/statistics.h @@ -60,7 +60,9 @@ enum Tickers { NUMBER_FILTERED_DELETES = 21, - TICKER_ENUM_MAX = 22 + NUMBER_MERGE_FAILURES = 22, + + TICKER_ENUM_MAX = 23 }; const std::vector> TickersNameMap = { @@ -85,7 +87,8 @@ const std::vector> TickersNameMap = { { NUMBER_MULTIGET_CALLS, "rocksdb.number.multiget.get" }, { NUMBER_MULTIGET_KEYS_READ, "rocksdb.number.multiget.keys.read" }, { NUMBER_MULTIGET_BYTES_READ, "rocksdb.number.multiget.bytes.read" }, - { NUMBER_FILTERED_DELETES, "rocksdb.number.deletes.filtered" } + { NUMBER_FILTERED_DELETES, "rocksdb.number.deletes.filtered" }, + { NUMBER_MERGE_FAILURES, "rocksdb.number.merge.failures" } }; /** diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index dcc41bce5..ada7f99ba 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -268,17 +268,16 @@ class TtlMergeOperator : public MergeOperator { if (existing_value && existing_value->size() < ts_len) { Log(logger, "Error: Could not remove timestamp from existing value."); return false; - // TODO: Change Merge semantics and add a counter here } // Extract time-stamp from each operand to be passed to user_merge_op_ std::deque operands_without_ts; - for (auto it = operands.begin(); it != operands.end(); ++it) { - if (it->size() < ts_len) { + for (const auto &operand : operands) { + if (operand.size() < ts_len) { Log(logger, "Error: Could not remove timestamp from operand value."); return false; } - operands_without_ts.push_back(it->substr(0, it->size() - ts_len)); + operands_without_ts.push_back(operand.substr(0, operand.size() - ts_len)); } // Apply the user merge operator (store result in *new_value) @@ -316,7 +315,6 @@ class TtlMergeOperator : public MergeOperator { if (left_operand.size() < ts_len || right_operand.size() < ts_len) { Log(logger, "Error: Could not remove timestamp from value."); return false; - //TODO: Change Merge semantics and add a counter here } // Apply the user partial-merge operator (store result in *new_value)