Aggregate hot Iterator counters in LocalStatistics (DBIter::Next perf regression)

Summary:
This patch bump the counters in the frequent code path DBIter::Next() / DBIter::Prev() in a local data members and send them to Statistics when the iterator is destroyed
A better solution will be to have thread_local implementation for Statistics

New performance
```
readseq      :       0.035 micros/op 28597881 ops/sec; 3163.7 MB/s
     1,851,568,819      stalled-cycles-frontend   #   31.29% frontend cycles idle    [49.86%]
       884,929,823      stalled-cycles-backend    #   14.95% backend  cycles idle    [50.21%]
readreverse  :       0.071 micros/op 14077393 ops/sec; 1557.3 MB/s
     3,239,575,993      stalled-cycles-frontend   #   27.36% frontend cycles idle    [49.96%]
     1,558,253,983      stalled-cycles-backend    #   13.16% backend  cycles idle    [50.14%]

```

Existing performance

```
readreverse  :       0.174 micros/op 5732342 ops/sec;  634.1 MB/s
    20,570,209,389      stalled-cycles-frontend   #   70.71% frontend cycles idle    [50.01%]
    18,422,816,837      stalled-cycles-backend    #   63.33% backend  cycles idle    [50.04%]

readseq      :       0.119 micros/op 8400537 ops/sec;  929.3 MB/s
    15,634,225,844      stalled-cycles-frontend   #   79.07% frontend cycles idle    [49.96%]
    14,227,427,453      stalled-cycles-backend    #   71.95% backend  cycles idle    [50.09%]
```

Test Plan: unit tests

Reviewers: yhchiang, sdong, igor

Reviewed By: sdong

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D55107
main
Islam AbdelRahman 9 years ago
parent 790252805d
commit 580fede347
  1. 1
      HISTORY.md
  2. 60
      db/db_iter.cc
  3. 82
      db/db_test.cc

@ -3,6 +3,7 @@
### Public API Changes ### Public API Changes
* Change default of BlockBasedTableOptions.format_version to 2. It means default DB created by 4.6 or up cannot be opened by RocksDB version 3.9 or earlier. * Change default of BlockBasedTableOptions.format_version to 2. It means default DB created by 4.6 or up cannot be opened by RocksDB version 3.9 or earlier.
* Added strict_capacity_limit option to NewLRUCache. If the flag is set to true, insert to cache will fail if no enough capacity can be free. Signiture of Cache::Insert() is updated accordingly. * Added strict_capacity_limit option to NewLRUCache. If the flag is set to true, insert to cache will fail if no enough capacity can be free. Signiture of Cache::Insert() is updated accordingly.
* Tickers [NUMBER_DB_NEXT, NUMBER_DB_PREV, NUMBER_DB_NEXT_FOUND, NUMBER_DB_PREV_FOUND, ITER_BYTES_READ] are not updated immediately. The are updated when the Iterator is deleted.
### New Features ### New Features
* Add CompactionPri::kMinOverlappingRatio, a compaction picking mode friendly to write amplification. * Add CompactionPri::kMinOverlappingRatio, a compaction picking mode friendly to write amplification.
* Deprecate Iterator::IsKeyPinned() and replace it with Iterator::GetProperty() with prop_name="rocksdb.iterator.is.key.pinned" * Deprecate Iterator::IsKeyPinned() and replace it with Iterator::GetProperty() with prop_name="rocksdb.iterator.is.key.pinned"

@ -60,6 +60,44 @@ class DBIter: public Iterator {
kReverse kReverse
}; };
// LocalStatistics contain Statistics counters that will be aggregated per
// each iterator instance and then will be sent to the global statistics when
// the iterator is destroyed.
//
// The purpose of this approach is to avoid perf regression happening
// when multiple threads bump the atomic counters from a DBIter::Next().
struct LocalStatistics {
explicit LocalStatistics() { ResetCounters(); }
void ResetCounters() {
next_count_ = 0;
next_found_count_ = 0;
prev_count_ = 0;
prev_found_count_ = 0;
bytes_read_ = 0;
}
void BumpGlobalStatistics(Statistics* global_statistics) {
RecordTick(global_statistics, NUMBER_DB_NEXT, next_count_);
RecordTick(global_statistics, NUMBER_DB_NEXT_FOUND, next_found_count_);
RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_);
RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_);
RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_);
ResetCounters();
}
// Map to Tickers::NUMBER_DB_NEXT
uint64_t next_count_;
// Map to Tickers::NUMBER_DB_NEXT_FOUND
uint64_t next_found_count_;
// Map to Tickers::NUMBER_DB_PREV
uint64_t prev_count_;
// Map to Tickers::NUMBER_DB_PREV_FOUND
uint64_t prev_found_count_;
// Map to Tickers::ITER_BYTES_READ
uint64_t bytes_read_;
};
DBIter(Env* env, const ImmutableCFOptions& ioptions, const Comparator* cmp, DBIter(Env* env, const ImmutableCFOptions& ioptions, const Comparator* cmp,
InternalIterator* iter, SequenceNumber s, bool arena_mode, InternalIterator* iter, SequenceNumber s, bool arena_mode,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number, uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
@ -86,6 +124,7 @@ class DBIter: public Iterator {
} }
virtual ~DBIter() { virtual ~DBIter() {
RecordTick(statistics_, NO_ITERATORS, -1); RecordTick(statistics_, NO_ITERATORS, -1);
local_stats_.BumpGlobalStatistics(statistics_);
if (!arena_mode_) { if (!arena_mode_) {
delete iter_; delete iter_;
} else { } else {
@ -213,6 +252,7 @@ class DBIter: public Iterator {
bool iter_pinned_; bool iter_pinned_;
// List of operands for merge operator. // List of operands for merge operator.
std::deque<std::string> merge_operands_; std::deque<std::string> merge_operands_;
LocalStatistics local_stats_;
// No copying allowed // No copying allowed
DBIter(const DBIter&); DBIter(const DBIter&);
@ -250,6 +290,9 @@ void DBIter::Next() {
PERF_COUNTER_ADD(internal_key_skipped_count, 1); PERF_COUNTER_ADD(internal_key_skipped_count, 1);
} }
if (statistics_ != nullptr) {
local_stats_.next_count_++;
}
// Now we point to the next internal position, for both of merge and // Now we point to the next internal position, for both of merge and
// not merge cases. // not merge cases.
if (!iter_->Valid()) { if (!iter_->Valid()) {
@ -257,18 +300,15 @@ void DBIter::Next() {
return; return;
} }
FindNextUserEntry(true /* skipping the current user key */); FindNextUserEntry(true /* skipping the current user key */);
if (statistics_ != nullptr) {
RecordTick(statistics_, NUMBER_DB_NEXT);
if (valid_) {
RecordTick(statistics_, NUMBER_DB_NEXT_FOUND);
RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
}
}
if (valid_ && prefix_extractor_ && prefix_same_as_start_ && if (valid_ && prefix_extractor_ && prefix_same_as_start_ &&
prefix_extractor_->Transform(saved_key_.GetKey()) prefix_extractor_->Transform(saved_key_.GetKey())
.compare(prefix_start_.GetKey()) != 0) { .compare(prefix_start_.GetKey()) != 0) {
valid_ = false; valid_ = false;
} }
if (statistics_ != nullptr && valid_) {
local_stats_.next_found_count_++;
local_stats_.bytes_read_ += (key().size() + value().size());
}
} }
// PRE: saved_key_ has the current user key if skipping // PRE: saved_key_ has the current user key if skipping
@ -436,10 +476,10 @@ void DBIter::Prev() {
} }
PrevInternal(); PrevInternal();
if (statistics_ != nullptr) { if (statistics_ != nullptr) {
RecordTick(statistics_, NUMBER_DB_PREV); local_stats_.prev_count_++;
if (valid_) { if (valid_) {
RecordTick(statistics_, NUMBER_DB_PREV_FOUND); local_stats_.prev_found_count_++;
RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size()); local_stats_.bytes_read_ += (key().size() + value().size());
} }
} }
if (valid_ && prefix_extractor_ && prefix_same_as_start_ && if (valid_ && prefix_extractor_ && prefix_same_as_start_ &&

@ -10611,6 +10611,88 @@ TEST_F(DBTest, PrefixExtractorBlockFilter) {
delete iter; delete iter;
} }
TEST_F(DBTest, IteratorWithLocalStatistics) {
Options options = CurrentOptions();
options.statistics = rocksdb::CreateDBStatistics();
DestroyAndReopen(options);
Random rnd(301);
for (int i = 0; i < 1000; i++) {
// Key 10 bytes / Value 10 bytes
ASSERT_OK(Put(RandomString(&rnd, 10), RandomString(&rnd, 10)));
}
std::atomic<uint64_t> total_next(0);
std::atomic<uint64_t> total_next_found(0);
std::atomic<uint64_t> total_prev(0);
std::atomic<uint64_t> total_prev_found(0);
std::atomic<uint64_t> total_bytes(0);
std::vector<std::thread> threads;
std::function<void()> reader_func_next = [&]() {
Iterator* iter = db_->NewIterator(ReadOptions());
iter->SeekToFirst();
// Seek will bump ITER_BYTES_READ
total_bytes += iter->key().size();
total_bytes += iter->value().size();
while (true) {
iter->Next();
total_next++;
if (!iter->Valid()) {
break;
}
total_next_found++;
total_bytes += iter->key().size();
total_bytes += iter->value().size();
}
delete iter;
};
std::function<void()> reader_func_prev = [&]() {
Iterator* iter = db_->NewIterator(ReadOptions());
iter->SeekToLast();
// Seek will bump ITER_BYTES_READ
total_bytes += iter->key().size();
total_bytes += iter->value().size();
while (true) {
iter->Prev();
total_prev++;
if (!iter->Valid()) {
break;
}
total_prev_found++;
total_bytes += iter->key().size();
total_bytes += iter->value().size();
}
delete iter;
};
for (int i = 0; i < 10; i++) {
threads.emplace_back(reader_func_next);
}
for (int i = 0; i < 15; i++) {
threads.emplace_back(reader_func_prev);
}
for (auto& t : threads) {
t.join();
}
ASSERT_EQ(TestGetTickerCount(options, NUMBER_DB_NEXT), total_next);
ASSERT_EQ(TestGetTickerCount(options, NUMBER_DB_NEXT_FOUND),
total_next_found);
ASSERT_EQ(TestGetTickerCount(options, NUMBER_DB_PREV), total_prev);
ASSERT_EQ(TestGetTickerCount(options, NUMBER_DB_PREV_FOUND),
total_prev_found);
ASSERT_EQ(TestGetTickerCount(options, ITER_BYTES_READ), total_bytes);
}
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
class BloomStatsTestWithParam class BloomStatsTestWithParam
: public DBTest, : public DBTest,

Loading…
Cancel
Save