perf_context measure user bytes read

Summary:
With this PR, we can measure read-amp for queries where perf_context is enabled as follows:

```
SetPerfLevel(kEnableCount);
Get(1, "foo");
double read_amp = static_cast<double>(get_perf_context()->block_read_byte / get_perf_context()->get_read_bytes);
SetPerfLevel(kDisable);
```

Our internal infra enables perf_context for a sampling of queries. So we'll be able to compute the read-amp for the sample set, which can give us a good estimate of read-amp.
Closes https://github.com/facebook/rocksdb/pull/2749

Differential Revision: D5647240

Pulled By: ajkr

fbshipit-source-id: ad73550b06990cf040cc4528fa885360f308ec12
main
Andrew Kryczka 7 years ago committed by Facebook Github Bot
parent 1efc600ddf
commit ed0a4c93ef
  1. 7
      db/db_basic_test.cc
  2. 2
      db/db_impl.cc
  3. 5
      db/db_iter.cc
  4. 27
      db/db_iterator_test.cc
  5. 5
      include/rocksdb/perf_context.h
  6. 6
      monitoring/perf_context.cc

@ -360,7 +360,6 @@ TEST_F(DBBasicTest, FLUSH) {
WriteOptions writeOpt = WriteOptions(); WriteOptions writeOpt = WriteOptions();
writeOpt.disableWAL = true; writeOpt.disableWAL = true;
SetPerfLevel(kEnableTime); SetPerfLevel(kEnableTime);
;
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1")); ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
// this will now also flush the last 2 writes // this will now also flush the last 2 writes
ASSERT_OK(Flush(1)); ASSERT_OK(Flush(1));
@ -369,6 +368,7 @@ TEST_F(DBBasicTest, FLUSH) {
get_perf_context()->Reset(); get_perf_context()->Reset();
Get(1, "foo"); Get(1, "foo");
ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0); ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0);
ASSERT_EQ(2, (int)get_perf_context()->get_read_bytes);
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
ASSERT_EQ("v1", Get(1, "foo")); ASSERT_EQ("v1", Get(1, "foo"));
@ -725,6 +725,7 @@ TEST_F(DBBasicTest, FlushOneColumnFamily) {
TEST_F(DBBasicTest, MultiGetSimple) { TEST_F(DBBasicTest, MultiGetSimple) {
do { do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
SetPerfLevel(kEnableCount);
ASSERT_OK(Put(1, "k1", "v1")); ASSERT_OK(Put(1, "k1", "v1"));
ASSERT_OK(Put(1, "k2", "v2")); ASSERT_OK(Put(1, "k2", "v2"));
ASSERT_OK(Put(1, "k3", "v3")); ASSERT_OK(Put(1, "k3", "v3"));
@ -738,12 +739,15 @@ TEST_F(DBBasicTest, MultiGetSimple) {
std::vector<std::string> values(20, "Temporary data to be overwritten"); std::vector<std::string> values(20, "Temporary data to be overwritten");
std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]); std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
get_perf_context()->Reset();
std::vector<Status> s = db_->MultiGet(ReadOptions(), cfs, keys, &values); std::vector<Status> s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
ASSERT_EQ(values.size(), keys.size()); ASSERT_EQ(values.size(), keys.size());
ASSERT_EQ(values[0], "v1"); ASSERT_EQ(values[0], "v1");
ASSERT_EQ(values[1], "v2"); ASSERT_EQ(values[1], "v2");
ASSERT_EQ(values[2], "v3"); ASSERT_EQ(values[2], "v3");
ASSERT_EQ(values[4], "v5"); ASSERT_EQ(values[4], "v5");
// four kv pairs * two bytes per value
ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);
ASSERT_OK(s[0]); ASSERT_OK(s[0]);
ASSERT_OK(s[1]); ASSERT_OK(s[1]);
@ -751,6 +755,7 @@ TEST_F(DBBasicTest, MultiGetSimple) {
ASSERT_TRUE(s[3].IsNotFound()); ASSERT_TRUE(s[3].IsNotFound());
ASSERT_OK(s[4]); ASSERT_OK(s[4]);
ASSERT_TRUE(s[5].IsNotFound()); ASSERT_TRUE(s[5].IsNotFound());
SetPerfLevel(kDisable);
} while (ChangeCompactOptions()); } while (ChangeCompactOptions());
} }

@ -990,6 +990,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
size_t size = pinnable_val->size(); size_t size = pinnable_val->size();
RecordTick(stats_, BYTES_READ, size); RecordTick(stats_, BYTES_READ, size);
MeasureTime(stats_, BYTES_PER_READ, size); MeasureTime(stats_, BYTES_PER_READ, size);
PERF_COUNTER_ADD(get_read_bytes, size);
} }
return s; return s;
} }
@ -1117,6 +1118,7 @@ std::vector<Status> DBImpl::MultiGet(
RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys); RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read); RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
MeasureTime(stats_, BYTES_PER_MULTIGET, bytes_read); MeasureTime(stats_, BYTES_PER_MULTIGET, bytes_read);
PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
PERF_TIMER_STOP(get_post_process_time); PERF_TIMER_STOP(get_post_process_time);
return stat_list; return stat_list;

@ -86,6 +86,7 @@ class DBIter: public Iterator {
RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_); RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_);
RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_); RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_);
RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_); RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_);
PERF_COUNTER_ADD(iter_read_bytes, bytes_read_);
ResetCounters(); ResetCounters();
} }
@ -1014,6 +1015,7 @@ void DBIter::Seek(const Slice& target) {
if (valid_) { if (valid_) {
RecordTick(statistics_, NUMBER_DB_SEEK_FOUND); RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size()); RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
} }
} }
} else { } else {
@ -1056,6 +1058,7 @@ void DBIter::SeekForPrev(const Slice& target) {
if (valid_) { if (valid_) {
RecordTick(statistics_, NUMBER_DB_SEEK_FOUND); RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size()); RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
} }
} }
} else { } else {
@ -1094,6 +1097,7 @@ void DBIter::SeekToFirst() {
if (valid_) { if (valid_) {
RecordTick(statistics_, NUMBER_DB_SEEK_FOUND); RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size()); RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
} }
} }
} else { } else {
@ -1141,6 +1145,7 @@ void DBIter::SeekToLast() {
if (valid_) { if (valid_) {
RecordTick(statistics_, NUMBER_DB_SEEK_FOUND); RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size()); RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
} }
} }
if (valid_ && prefix_extractor_ && prefix_same_as_start_) { if (valid_ && prefix_extractor_ && prefix_same_as_start_) {

@ -1719,12 +1719,15 @@ TEST_F(DBIteratorTest, IteratorWithLocalStatistics) {
std::vector<port::Thread> threads; std::vector<port::Thread> threads;
std::function<void()> reader_func_next = [&]() { std::function<void()> reader_func_next = [&]() {
SetPerfLevel(kEnableCount);
get_perf_context()->Reset();
Iterator* iter = db_->NewIterator(ReadOptions()); Iterator* iter = db_->NewIterator(ReadOptions());
iter->SeekToFirst(); iter->SeekToFirst();
// Seek will bump ITER_BYTES_READ // Seek will bump ITER_BYTES_READ
total_bytes += iter->key().size(); uint64_t bytes = 0;
total_bytes += iter->value().size(); bytes += iter->key().size();
bytes += iter->value().size();
while (true) { while (true) {
iter->Next(); iter->Next();
total_next++; total_next++;
@ -1733,20 +1736,25 @@ TEST_F(DBIteratorTest, IteratorWithLocalStatistics) {
break; break;
} }
total_next_found++; total_next_found++;
total_bytes += iter->key().size(); bytes += iter->key().size();
total_bytes += iter->value().size(); bytes += iter->value().size();
} }
delete iter; delete iter;
ASSERT_EQ(bytes, get_perf_context()->iter_read_bytes);
SetPerfLevel(kDisable);
total_bytes += bytes;
}; };
std::function<void()> reader_func_prev = [&]() { std::function<void()> reader_func_prev = [&]() {
SetPerfLevel(kEnableCount);
Iterator* iter = db_->NewIterator(ReadOptions()); Iterator* iter = db_->NewIterator(ReadOptions());
iter->SeekToLast(); iter->SeekToLast();
// Seek will bump ITER_BYTES_READ // Seek will bump ITER_BYTES_READ
total_bytes += iter->key().size(); uint64_t bytes = 0;
total_bytes += iter->value().size(); bytes += iter->key().size();
bytes += iter->value().size();
while (true) { while (true) {
iter->Prev(); iter->Prev();
total_prev++; total_prev++;
@ -1755,11 +1763,14 @@ TEST_F(DBIteratorTest, IteratorWithLocalStatistics) {
break; break;
} }
total_prev_found++; total_prev_found++;
total_bytes += iter->key().size(); bytes += iter->key().size();
total_bytes += iter->value().size(); bytes += iter->value().size();
} }
delete iter; delete iter;
ASSERT_EQ(bytes, get_perf_context()->iter_read_bytes);
SetPerfLevel(kDisable);
total_bytes += bytes;
}; };
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {

@ -30,6 +30,11 @@ struct PerfContext {
uint64_t block_read_time; // total nanos spent on block reads uint64_t block_read_time; // total nanos spent on block reads
uint64_t block_checksum_time; // total nanos spent on block checksum uint64_t block_checksum_time; // total nanos spent on block checksum
uint64_t block_decompress_time; // total nanos spent on block decompression uint64_t block_decompress_time; // total nanos spent on block decompression
uint64_t get_read_bytes; // bytes for vals returned by Get
uint64_t multiget_read_bytes; // bytes for vals returned by MultiGet
uint64_t iter_read_bytes; // bytes for keys/vals decoded by iterator
// total number of internal keys skipped over during iteration. // total number of internal keys skipped over during iteration.
// There are several reasons for it: // There are several reasons for it:
// 1. when calling Next(), the iterator is in the position of the previous // 1. when calling Next(), the iterator is in the position of the previous

@ -40,6 +40,9 @@ void PerfContext::Reset() {
block_read_time = 0; block_read_time = 0;
block_checksum_time = 0; block_checksum_time = 0;
block_decompress_time = 0; block_decompress_time = 0;
get_read_bytes = 0;
multiget_read_bytes = 0;
iter_read_bytes = 0;
internal_key_skipped_count = 0; internal_key_skipped_count = 0;
internal_delete_skipped_count = 0; internal_delete_skipped_count = 0;
internal_recent_skipped_count = 0; internal_recent_skipped_count = 0;
@ -117,6 +120,9 @@ std::string PerfContext::ToString(bool exclude_zero_counters) const {
PERF_CONTEXT_OUTPUT(block_read_time); PERF_CONTEXT_OUTPUT(block_read_time);
PERF_CONTEXT_OUTPUT(block_checksum_time); PERF_CONTEXT_OUTPUT(block_checksum_time);
PERF_CONTEXT_OUTPUT(block_decompress_time); PERF_CONTEXT_OUTPUT(block_decompress_time);
PERF_CONTEXT_OUTPUT(get_read_bytes);
PERF_CONTEXT_OUTPUT(multiget_read_bytes);
PERF_CONTEXT_OUTPUT(iter_read_bytes);
PERF_CONTEXT_OUTPUT(internal_key_skipped_count); PERF_CONTEXT_OUTPUT(internal_key_skipped_count);
PERF_CONTEXT_OUTPUT(internal_delete_skipped_count); PERF_CONTEXT_OUTPUT(internal_delete_skipped_count);
PERF_CONTEXT_OUTPUT(internal_recent_skipped_count); PERF_CONTEXT_OUTPUT(internal_recent_skipped_count);

Loading…
Cancel
Save