Add stats related to async prefetching (#9845)

Summary:
Add stats PREFETCHED_BYTES_DISCARDED and POLL_WAIT_MICROS.
PREFETCHED_BYTES_DISCARDED records the number of prefetched bytes discarded by
FilePrefetchBuffer. POLL_WAIT_MICROS records the time taken by the underlying
file system's Poll API.
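
For illustration only (not part of this commit), a minimal sketch of how the two new histograms can be read through the public Statistics API; the database path and the workload comment are placeholders:

#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/statistics.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Statistics must be enabled for the histograms to be populated.
  options.statistics = rocksdb::CreateDBStatistics();

  rocksdb::DB* db = nullptr;
  // "/tmp/prefetch_stats_demo" is a placeholder path.
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/prefetch_stats_demo", &db);
  assert(s.ok());

  // ... run a read workload that triggers async prefetching, e.g. iterators
  // with ReadOptions::async_io = true ...

  rocksdb::HistogramData poll_wait;
  rocksdb::HistogramData bytes_discarded;
  options.statistics->histogramData(rocksdb::POLL_WAIT_MICROS, &poll_wait);
  options.statistics->histogramData(rocksdb::PREFETCHED_BYTES_DISCARDED,
                                    &bytes_discarded);

  delete db;
  return 0;
}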

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9845

Test Plan: Update existing tests

Reviewed By: anand1976

Differential Revision: D35909694

Pulled By: akankshamahajan15

fbshipit-source-id: e009ef940bb9ed72c9446f5529095caabb8a1e36
Branch: main
Author: Akanksha Mahajan (committed by Facebook GitHub Bot)
Parent: 6d2577e567
Commit: 3653029dda
Files changed (changed lines in parentheses):
* HISTORY.md (1)
* file/file_prefetch_buffer.cc (1)
* file/file_prefetch_buffer.h (23)
* file/prefetch_test.cc (5)
* include/rocksdb/statistics.h (5)
* java/rocksjni/portal.h (8)
* monitoring/statistics.cc (3)
* table/block_based/block_based_table_reader.h (3)

HISTORY.md:
@@ -5,6 +5,7 @@
 ### New Features
 * DB::GetLiveFilesStorageInfo is ready for production use.
+* Add new stats PREFETCHED_BYTES_DISCARDED, which records the number of prefetched bytes discarded by RocksDB FilePrefetchBuffer on destruction, and POLL_WAIT_MICROS, which records the wait time for FS::Poll API completion.
 ### Public API changes
 * Add rollback_deletion_type_callback to TransactionDBOptions so that write-prepared transactions know whether to issue a Delete or SingleDelete to cancel a previous key written during prior prepare phase. The PR aims to prevent mixing SingleDeletes and Deletes for the same key that can lead to undefined behaviors for write-prepared transactions.

file/file_prefetch_buffer.cc:
@@ -229,6 +229,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
     // second buffer.
     std::vector<void*> handles;
     handles.emplace_back(io_handle_);
+    StopWatch sw(clock_, stats_, POLL_WAIT_MICROS);
     fs_->Poll(handles, 1).PermitUncheckedError();
   }
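
For context (not part of the diff): StopWatch is RocksDB's scoped timer, so the blocking Poll() wait is measured without explicit begin/end bookkeeping; roughly:

{
  StopWatch sw(clock_, stats_, POLL_WAIT_MICROS);  // timing starts at construction
  fs_->Poll(handles, 1).PermitUncheckedError();    // blocking wait being measured
}  // ~StopWatch() records the elapsed microseconds into POLL_WAIT_MICROS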

file/file_prefetch_buffer.h:
@@ -14,6 +14,7 @@
 #include <string>
 #include "file/readahead_file_info.h"
+#include "monitoring/statistics.h"
 #include "port/port.h"
 #include "rocksdb/env.h"
 #include "rocksdb/file_system.h"
@@ -64,7 +65,8 @@ class FilePrefetchBuffer {
   FilePrefetchBuffer(size_t readahead_size = 0, size_t max_readahead_size = 0,
                      bool enable = true, bool track_min_offset = false,
                      bool implicit_auto_readahead = false,
-                     bool async_io = false, FileSystem* fs = nullptr)
+                     bool async_io = false, FileSystem* fs = nullptr,
+                     SystemClock* clock = nullptr, Statistics* stats = nullptr)
       : curr_(0),
         readahead_size_(readahead_size),
         initial_auto_readahead_size_(readahead_size),
@@ -80,7 +82,9 @@ class FilePrefetchBuffer {
         del_fn_(nullptr),
         async_read_in_progress_(false),
         async_io_(async_io),
-        fs_(fs) {
+        fs_(fs),
+        clock_(clock),
+        stats_(stats) {
     // If async_io_ is enabled, data is asynchronously filled in second buffer
     // while curr_ is being consumed. If data is overlapping in two buffers,
     // data is copied to third buffer to return continuous buffer.
@@ -88,13 +92,24 @@ class FilePrefetchBuffer {
   }

   ~FilePrefetchBuffer() {
-    // Wait for any pending async job before destroying the class object.
+    // Abort any pending async read request before destroying the class object.
     if (async_read_in_progress_ && fs_ != nullptr) {
       std::vector<void*> handles;
       handles.emplace_back(io_handle_);
       Status s = fs_->AbortIO(handles);
       assert(s.ok());
     }
+
+    // Prefetch buffer bytes discarded.
+    uint64_t bytes_discarded = 0;
+    if (bufs_[curr_].buffer_.CurrentSize() != 0) {
+      bytes_discarded = bufs_[curr_].buffer_.CurrentSize();
+    }
+    if (bufs_[curr_ ^ 1].buffer_.CurrentSize() != 0) {
+      bytes_discarded += bufs_[curr_ ^ 1].buffer_.CurrentSize();
+    }
+    RecordInHistogram(stats_, PREFETCHED_BYTES_DISCARDED, bytes_discarded);
+
     // Release io_handle_.
     if (io_handle_ != nullptr && del_fn_ != nullptr) {
       del_fn_(io_handle_);
@@ -273,5 +288,7 @@ class FilePrefetchBuffer {
   bool async_read_in_progress_;
   bool async_io_;
   FileSystem* fs_;
+  SystemClock* clock_;
+  Statistics* stats_;
 };
 }  // namespace ROCKSDB_NAMESPACE

file/prefetch_test.cc:
@@ -1288,6 +1288,10 @@ TEST_P(PrefetchTestWithPosix, ReadAsyncWithPosixFS) {
   {
     HistogramData async_read_bytes;
     options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
+    HistogramData prefetched_bytes_discarded;
+    options.statistics->histogramData(PREFETCHED_BYTES_DISCARDED,
+                                      &prefetched_bytes_discarded);
+
     // Not all platforms support iouring. In that case, ReadAsync in posix
     // won't submit async requests.
     if (read_async_called) {
@@ -1295,6 +1299,7 @@ TEST_P(PrefetchTestWithPosix, ReadAsyncWithPosixFS) {
     } else {
       ASSERT_EQ(async_read_bytes.count, 0);
     }
+    ASSERT_GT(prefetched_bytes_discarded.count, 0);
   }
 }

include/rocksdb/statistics.h:
@@ -536,7 +536,12 @@ enum Histograms : uint32_t {
   // Error handler statistics
   ERROR_HANDLER_AUTORESUME_RETRY_COUNT,

+  // Stats related to asynchronous read requests.
   ASYNC_READ_BYTES,
+  POLL_WAIT_MICROS,
+
+  // Number of prefetched bytes discarded by RocksDB.
+  PREFETCHED_BYTES_DISCARDED,

   HISTOGRAM_ENUM_MAX,
 };

java/rocksjni/portal.h:
@@ -5589,6 +5589,10 @@ class HistogramTypeJni {
         return 0x32;
       case ROCKSDB_NAMESPACE::Histograms::ASYNC_READ_BYTES:
         return 0x33;
+      case ROCKSDB_NAMESPACE::Histograms::POLL_WAIT_MICROS:
+        return 0x34;
+      case ROCKSDB_NAMESPACE::Histograms::PREFETCHED_BYTES_DISCARDED:
+        return 0x35;
       case ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX:
         // 0x1F for backwards compatibility on current minor version.
         return 0x1F;
@@ -5708,6 +5712,10 @@ class HistogramTypeJni {
             ERROR_HANDLER_AUTORESUME_RETRY_COUNT;
       case 0x33:
         return ROCKSDB_NAMESPACE::Histograms::ASYNC_READ_BYTES;
+      case 0x34:
+        return ROCKSDB_NAMESPACE::Histograms::POLL_WAIT_MICROS;
+      case 0x35:
+        return ROCKSDB_NAMESPACE::Histograms::PREFETCHED_BYTES_DISCARDED;
       case 0x1F:
         // 0x1F for backwards compatibility on current minor version.
         return ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX;

monitoring/statistics.cc:
@@ -284,6 +284,9 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
     {ERROR_HANDLER_AUTORESUME_RETRY_COUNT,
      "rocksdb.error.handler.autoresume.retry.count"},
     {ASYNC_READ_BYTES, "rocksdb.async.read.bytes"},
+    {POLL_WAIT_MICROS, "rocksdb.poll.wait.micros"},
+    {PREFETCHED_BYTES_DISCARDED, "rocksdb.prefetched.bytes.discarded"},
 };

 std::shared_ptr<Statistics> CreateDBStatistics() {
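
With the name mappings above in place, the new histograms also show up in the textual statistics dump. A minimal sketch, assuming a Statistics object created via CreateDBStatistics() and attached to the DB options:

std::shared_ptr<rocksdb::Statistics> stats = rocksdb::CreateDBStatistics();
// ... attach stats to Options::statistics and run a read workload ...
std::string dump = stats->ToString();
// The dump contains entries keyed by the names registered in HistogramsNameMap,
// e.g. "rocksdb.poll.wait.micros" and "rocksdb.prefetched.bytes.discarded".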

table/block_based/block_based_table_reader.h:
@@ -666,7 +666,8 @@ struct BlockBasedTable::Rep {
     fpb->reset(new FilePrefetchBuffer(
         readahead_size, max_readahead_size,
         !ioptions.allow_mmap_reads /* enable */, false /* track_min_offset */,
-        implicit_auto_readahead, async_io, ioptions.fs.get()));
+        implicit_auto_readahead, async_io, ioptions.fs.get(), ioptions.clock,
+        ioptions.stats));
   }

   void CreateFilePrefetchBufferIfNotExists(
