From 0b8f885939b2b7254f7a20191dc73467f252c530 Mon Sep 17 00:00:00 2001 From: Akanksha Mahajan Date: Wed, 6 Apr 2022 14:26:53 -0700 Subject: [PATCH] Update stats for Read and ReadAsync in random_access_file_reader for async prefetching (#9810) Summary: Update stats in random_access_file_reader for Read and ReadAsync API to take into account the read latency for async prefetching. It also fixes ERROR_HANDLER_AUTORESUME_RETRY_COUNT stat whose value was incorrect in portal.h Pull Request resolved: https://github.com/facebook/rocksdb/pull/9810 Test Plan: Update unit test Reviewed By: anand1976 Differential Revision: D35433081 Pulled By: akankshamahajan15 fbshipit-source-id: aeec3901270e58a003ce6b5214bd25ddcb3a12a9 --- HISTORY.md | 2 + file/prefetch_test.cc | 54 ++++++++++++++-- file/random_access_file_reader.cc | 62 ++++++++++++++++++- file/random_access_file_reader.h | 11 ++++ include/rocksdb/statistics.h | 2 + java/rocksjni/portal.h | 6 +- .../main/java/org/rocksdb/HistogramType.java | 2 + monitoring/statistics.cc | 1 + 8 files changed, 130 insertions(+), 10 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index ef55f6f56..15b66aeb2 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -8,11 +8,13 @@ * Fixed `file_type`, `relative_filename` and `directory` fields returned by `GetLiveFilesMetaData()`, which were added in inheriting from `FileStorageInfo`. * Fixed a bug affecting `track_and_verify_wals_in_manifest`. Without the fix, application may see "open error: Corruption: Missing WAL with log number" while trying to open the db. The corruption is a false alarm but prevents DB open (#9766). * Fix segfault in FilePrefetchBuffer with async_io as it doesn't wait for pending jobs to complete on destruction. +* Fix ERROR_HANDLER_AUTORESUME_RETRY_COUNT stat whose value was set wrong in portal.h ### New Features * For db_bench when --seed=0 or --seed is not set then it uses the current time as the seed value. Previously it used the value 1000. 
* For db_bench when --benchmark lists multiple tests and each test uses a seed for a RNG then the seeds across tests will no longer be repeated. * Added an option to dynamically charge an updating estimated memory usage of block-based table reader to block cache if block cache available. To enable this feature, set `BlockBasedTableOptions::reserve_table_reader_memory = true`. +* Add new stat ASYNC_READ_BYTES that records the number of bytes read during async read calls. Users can use it to check whether the async code path is being taken by RocksDB's internal automatic prefetching for sequential reads. ### Behavior changes * Disallow usage of commit-time-write-batch for write-prepared/write-unprepared transactions if TransactionOptions::use_only_the_last_commit_time_batch_for_recovery is false to prevent two (or more) uncommitted versions of the same key in the database. Otherwise, bottommost compaction may violate the internal key uniqueness invariant of SSTs if the sequence numbers of both internal keys are zeroed out (#9794). 
diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc index 9319b9973..3e9bb7156 100644 --- a/file/prefetch_test.cc +++ b/file/prefetch_test.cc @@ -694,8 +694,10 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) { options.write_buffer_size = 1024; options.create_if_missing = true; options.compression = kNoCompression; + options.statistics = CreateDBStatistics(); options.env = env.get(); - if (std::get<0>(GetParam())) { + bool use_direct_io = std::get<0>(GetParam()); + if (use_direct_io) { options.use_direct_reads = true; options.use_direct_io_for_flush_and_compaction = true; } @@ -708,8 +710,7 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) { options.table_factory.reset(NewBlockBasedTableFactory(table_options)); Status s = TryReopen(options); - if (std::get<0>(GetParam()) && - (s.IsNotSupported() || s.IsInvalidArgument())) { + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { // If direct IO is not supported, skip the test return; } else { @@ -766,6 +767,8 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) { // TODO akanksha: Remove after adding new units. ro.async_io = true; } + + ASSERT_OK(options.statistics->Reset()); auto iter = std::unique_ptr(db_->NewIterator(ro)); int num_keys = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { @@ -773,15 +776,25 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) { num_keys++; } ASSERT_EQ(num_keys, total_keys); - ASSERT_GT(buff_prefetch_count, 0); - buff_prefetch_count = 0; // For index and data blocks. if (is_adaptive_readahead) { ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1)); } else { ASSERT_EQ(readahead_carry_over_count, 0); } + + // Check stats to make sure async prefetch is done. 
+ { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + if (ro.async_io && !use_direct_io) { + ASSERT_GT(async_read_bytes.count, 0); + } else { + ASSERT_EQ(async_read_bytes.count, 0); + } + } + SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); } @@ -902,6 +915,8 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) { options.use_direct_reads = true; options.use_direct_io_for_flush_and_compaction = true; } + + options.statistics = CreateDBStatistics(); BlockBasedTableOptions table_options; std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); // 8MB table_options.block_cache = cache; @@ -948,7 +963,6 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) { SyncPoint::GetInstance()->EnableProcessing(); ReadOptions ro; ro.adaptive_readahead = true; - // TODO akanksha: Remove after adding new units. ro.async_io = true; { /* @@ -964,7 +978,9 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) { iter->Seek(BuildKey(1019)); buff_prefetch_count = 0; } + { + ASSERT_OK(options.statistics->Reset()); // After caching, blocks will be read from cache (Sequential blocks) auto iter = std::unique_ptr(db_->NewIterator(ro)); iter->Seek(BuildKey(0)); @@ -1008,6 +1024,18 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) { ASSERT_TRUE(iter->Valid()); ASSERT_EQ(current_readahead_size, expected_current_readahead_size); ASSERT_EQ(buff_prefetch_count, 2); + + // Check stats to make sure async prefetch is done. 
+ { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + if (GetParam()) { + ASSERT_EQ(async_read_bytes.count, 0); + } else { + ASSERT_GT(async_read_bytes.count, 0); + } + } + buff_prefetch_count = 0; } Close(); @@ -1033,6 +1061,7 @@ TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) { options.create_if_missing = true; options.compression = kNoCompression; options.env = env.get(); + options.statistics = CreateDBStatistics(); if (use_direct_io) { options.use_direct_reads = true; options.use_direct_io_for_flush_and_compaction = true; @@ -1080,6 +1109,7 @@ TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) { ro.adaptive_readahead = true; ro.async_io = true; + ASSERT_OK(options.statistics->Reset()); auto iter = std::unique_ptr(db_->NewIterator(ro)); int num_keys = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { @@ -1088,7 +1118,19 @@ TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) { } ASSERT_EQ(num_keys, total_keys); ASSERT_GT(buff_prefetch_count, 0); + + // Check stats to make sure async prefetch is done. + { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); +#if defined(ROCKSDB_IOURING_PRESENT) + ASSERT_GT(async_read_bytes.count, 0); +#else + ASSERT_EQ(async_read_bytes.count, 0); +#endif + } } + SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc index 0d354cfa4..a919b6298 100644 --- a/file/random_access_file_reader.cc +++ b/file/random_access_file_reader.cc @@ -425,19 +425,75 @@ IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro, } } -// TODO akanksha: Add perf_times etc. +// TODO akanksha: +// 1. Handle use_direct_io case which currently calls Read API. 
IOStatus RandomAccessFileReader::ReadAsync( FSReadRequest& req, const IOOptions& opts, std::function cb, void* cb_arg, void** io_handle, IOHandleDeleter* del_fn, Env::IOPriority rate_limiter_priority) { if (use_direct_io()) { + // For direct_io, it calls Read API. req.status = Read(opts, req.offset, req.len, &(req.result), req.scratch, nullptr /*dbg*/, rate_limiter_priority); cb(req, cb_arg); return IOStatus::OK(); } - return file_->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn, - nullptr /*dbg*/); + + // Create a callback and populate info. + auto read_async_callback = + std::bind(&RandomAccessFileReader::ReadAsyncCallback, this, + std::placeholders::_1, std::placeholders::_2); + ReadAsyncInfo* read_async_info = new ReadAsyncInfo; + read_async_info->cb_ = cb; + read_async_info->cb_arg_ = cb_arg; + read_async_info->start_time_ = clock_->NowMicros(); + +#ifndef ROCKSDB_LITE + if (ShouldNotifyListeners()) { + read_async_info->fs_start_ts_ = FileOperationInfo::StartNow(); + } +#endif + + IOStatus s = file_->ReadAsync(req, opts, read_async_callback, read_async_info, + io_handle, del_fn, nullptr /*dbg*/); + if (!s.ok()) { + delete read_async_info; + } + return s; +} + +void RandomAccessFileReader::ReadAsyncCallback(const FSReadRequest& req, + void* cb_arg) { + ReadAsyncInfo* read_async_info = static_cast(cb_arg); + assert(read_async_info); + assert(read_async_info->cb_); + + read_async_info->cb_(req, read_async_info->cb_arg_); + + // Update stats and notify listeners. + if (stats_ != nullptr && file_read_hist_ != nullptr) { + // elapsed doesn't take into account delay and overwrite as StopWatch does + // in Read. 
+ uint64_t elapsed = clock_->NowMicros() - read_async_info->start_time_; + file_read_hist_->Add(elapsed); + } + if (req.status.ok()) { + RecordInHistogram(stats_, ASYNC_READ_BYTES, req.result.size()); + } +#ifndef ROCKSDB_LITE + if (ShouldNotifyListeners()) { + auto finish_ts = FileOperationInfo::FinishNow(); + NotifyOnFileReadFinish(req.offset, req.result.size(), + read_async_info->fs_start_ts_, finish_ts, + req.status); + } + if (!req.status.ok()) { + NotifyOnIOError(req.status, FileOperationType::kRead, file_name(), + req.result.size(), req.offset); + } +#endif + RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size()); + delete read_async_info; } } // namespace ROCKSDB_NAMESPACE diff --git a/file/random_access_file_reader.h b/file/random_access_file_reader.h index 26e48478d..55c156f40 100644 --- a/file/random_access_file_reader.h +++ b/file/random_access_file_reader.h @@ -92,6 +92,15 @@ class RandomAccessFileReader { const Temperature file_temperature_; const bool is_last_level_; + struct ReadAsyncInfo { +#ifndef ROCKSDB_LITE + FileOperationInfo::StartTimePoint fs_start_ts_; +#endif + uint64_t start_time_; + std::function cb_; + void* cb_arg_; + }; + public: explicit RandomAccessFileReader( std::unique_ptr&& raf, const std::string& _file_name, @@ -179,5 +188,7 @@ class RandomAccessFileReader { std::function cb, void* cb_arg, void** io_handle, IOHandleDeleter* del_fn, Env::IOPriority rate_limiter_priority); + + void ReadAsyncCallback(const FSReadRequest& req, void* cb_arg); }; } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 51cc8079b..dcd3fc333 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -536,6 +536,8 @@ enum Histograms : uint32_t { // Error handler statistics ERROR_HANDLER_AUTORESUME_RETRY_COUNT, + ASYNC_READ_BYTES, + HISTOGRAM_ENUM_MAX, }; diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h index 694db7b15..a94b39065 100644 --- 
a/java/rocksjni/portal.h +++ b/java/rocksjni/portal.h @@ -5586,7 +5586,9 @@ class HistogramTypeJni { case ROCKSDB_NAMESPACE::Histograms::NUM_SST_READ_PER_LEVEL: return 0x31; case ROCKSDB_NAMESPACE::Histograms::ERROR_HANDLER_AUTORESUME_RETRY_COUNT: - return 0x31; + return 0x32; + case ROCKSDB_NAMESPACE::Histograms::ASYNC_READ_BYTES: + return 0x33; case ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX: // 0x1F for backwards compatibility on current minor version. return 0x1F; @@ -5704,6 +5706,8 @@ class HistogramTypeJni { case 0x32: return ROCKSDB_NAMESPACE::Histograms:: ERROR_HANDLER_AUTORESUME_RETRY_COUNT; + case 0x33: + return ROCKSDB_NAMESPACE::Histograms::ASYNC_READ_BYTES; case 0x1F: // 0x1F for backwards compatibility on current minor version. return ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX; diff --git a/java/src/main/java/org/rocksdb/HistogramType.java b/java/src/main/java/org/rocksdb/HistogramType.java index 5953a7d9b..d5f7da5e0 100644 --- a/java/src/main/java/org/rocksdb/HistogramType.java +++ b/java/src/main/java/org/rocksdb/HistogramType.java @@ -180,6 +180,8 @@ public enum HistogramType { */ ERROR_HANDLER_AUTORESUME_RETRY_COUNT((byte) 0x32), + ASYNC_READ_BYTES((byte) 0x33), + // 0x1F for backwards compatibility on current minor version. HISTOGRAM_ENUM_MAX((byte) 0x1F); diff --git a/monitoring/statistics.cc b/monitoring/statistics.cc index 99f0a3e36..566feb189 100644 --- a/monitoring/statistics.cc +++ b/monitoring/statistics.cc @@ -283,6 +283,7 @@ const std::vector> HistogramsNameMap = { {NUM_SST_READ_PER_LEVEL, "rocksdb.num.sst.read.per.level"}, {ERROR_HANDLER_AUTORESUME_RETRY_COUNT, "rocksdb.error.handler.autoresume.retry.count"}, + {ASYNC_READ_BYTES, "rocksdb.async.read.bytes"}, }; std::shared_ptr CreateDBStatistics() {