Update stats for Read and ReadAsync in random_access_file_reader for async prefetching (#9810)

Summary:
Update stats in random_access_file_reader for Read and
ReadAsync API to take into account the read latency for async
prefetching.

It also fixes ERROR_HANDLER_AUTORESUME_RETRY_COUNT stat whose value was
incorrect in portal.h

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9810

Test Plan: Update unit test

Reviewed By: anand1976

Differential Revision: D35433081

Pulled By: akankshamahajan15

fbshipit-source-id: aeec3901270e58a003ce6b5214bd25ddcb3a12a9
main
Akanksha Mahajan 2 years ago committed by Facebook GitHub Bot
parent 49623f9c8e
commit 0b8f885939
  1. 2
      HISTORY.md
  2. 54
      file/prefetch_test.cc
  3. 62
      file/random_access_file_reader.cc
  4. 11
      file/random_access_file_reader.h
  5. 2
      include/rocksdb/statistics.h
  6. 6
      java/rocksjni/portal.h
  7. 2
      java/src/main/java/org/rocksdb/HistogramType.java
  8. 1
      monitoring/statistics.cc

@ -8,11 +8,13 @@
* Fixed `file_type`, `relative_filename` and `directory` fields returned by `GetLiveFilesMetaData()`, which were added in inheriting from `FileStorageInfo`.
* Fixed a bug affecting `track_and_verify_wals_in_manifest`. Without the fix, application may see "open error: Corruption: Missing WAL with log number" while trying to open the db. The corruption is a false alarm but prevents DB open (#9766).
* Fix segfault in FilePrefetchBuffer with async_io as it doesn't wait for pending jobs to complete on destruction.
* Fix ERROR_HANDLER_AUTORESUME_RETRY_COUNT stat whose value was set wrong in portal.h
### New Features
* For db_bench when --seed=0 or --seed is not set then it uses the current time as the seed value. Previously it used the value 1000.
* For db_bench when --benchmark lists multiple tests and each test uses a seed for a RNG then the seeds across tests will no longer be repeated.
* Added an option to dynamically charge an updating estimated memory usage of block-based table reader to block cache if block cache available. To enable this feature, set `BlockBasedTableOptions::reserve_table_reader_memory = true`.
* Add new stat ASYNC_READ_BYTES that calculates number of bytes read during async read call and users can check if async code path is being called by RocksDB internal automatic prefetching for sequential reads.
### Behavior changes
* Disallow usage of commit-time-write-batch for write-prepared/write-unprepared transactions if TransactionOptions::use_only_the_last_commit_time_batch_for_recovery is false to prevent two (or more) uncommitted versions of the same key in the database. Otherwise, bottommost compaction may violate the internal key uniqueness invariant of SSTs if the sequence numbers of both internal keys are zeroed out (#9794).

@ -694,8 +694,10 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.statistics = CreateDBStatistics();
options.env = env.get();
if (std::get<0>(GetParam())) {
bool use_direct_io = std::get<0>(GetParam());
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
@ -708,8 +710,7 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (std::get<0>(GetParam()) &&
(s.IsNotSupported() || s.IsInvalidArgument())) {
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
// If direct IO is not supported, skip the test
return;
} else {
@ -766,6 +767,8 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
// TODO akanksha: Remove after adding new units.
ro.async_io = true;
}
ASSERT_OK(options.statistics->Reset());
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
@ -773,15 +776,25 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
num_keys++;
}
ASSERT_EQ(num_keys, total_keys);
ASSERT_GT(buff_prefetch_count, 0);
buff_prefetch_count = 0;
// For index and data blocks.
if (is_adaptive_readahead) {
ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1));
} else {
ASSERT_EQ(readahead_carry_over_count, 0);
}
// Check stats to make sure async prefetch is done.
{
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
if (ro.async_io && !use_direct_io) {
ASSERT_GT(async_read_bytes.count, 0);
} else {
ASSERT_EQ(async_read_bytes.count, 0);
}
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
@ -902,6 +915,8 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2); // 8MB
table_options.block_cache = cache;
@ -948,7 +963,6 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
SyncPoint::GetInstance()->EnableProcessing();
ReadOptions ro;
ro.adaptive_readahead = true;
// TODO akanksha: Remove after adding new units.
ro.async_io = true;
{
/*
@ -964,7 +978,9 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
iter->Seek(BuildKey(1019));
buff_prefetch_count = 0;
}
{
ASSERT_OK(options.statistics->Reset());
// After caching, blocks will be read from cache (Sequential blocks)
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
iter->Seek(BuildKey(0));
@ -1008,6 +1024,18 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
ASSERT_EQ(buff_prefetch_count, 2);
// Check stats to make sure async prefetch is done.
{
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
if (GetParam()) {
ASSERT_EQ(async_read_bytes.count, 0);
} else {
ASSERT_GT(async_read_bytes.count, 0);
}
}
buff_prefetch_count = 0;
}
Close();
@ -1033,6 +1061,7 @@ TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) {
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
options.statistics = CreateDBStatistics();
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
@ -1080,6 +1109,7 @@ TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) {
ro.adaptive_readahead = true;
ro.async_io = true;
ASSERT_OK(options.statistics->Reset());
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
@ -1088,7 +1118,19 @@ TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) {
}
ASSERT_EQ(num_keys, total_keys);
ASSERT_GT(buff_prefetch_count, 0);
// Check stats to make sure async prefetch is done.
{
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
#if defined(ROCKSDB_IOURING_PRESENT)
ASSERT_GT(async_read_bytes.count, 0);
#else
ASSERT_EQ(async_read_bytes.count, 0);
#endif
}
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();

@ -425,19 +425,75 @@ IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro,
}
}
// TODO akanksha: Add perf_times etc.
// TODO akanksha:
// 1. Handle use_direct_io case which currently calls Read API.
IOStatus RandomAccessFileReader::ReadAsync(
FSReadRequest& req, const IOOptions& opts,
std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
void** io_handle, IOHandleDeleter* del_fn,
Env::IOPriority rate_limiter_priority) {
if (use_direct_io()) {
// For direct_io, it calls Read API.
req.status = Read(opts, req.offset, req.len, &(req.result), req.scratch,
nullptr /*dbg*/, rate_limiter_priority);
cb(req, cb_arg);
return IOStatus::OK();
}
return file_->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn,
nullptr /*dbg*/);
// Create a callback and populate info.
auto read_async_callback =
std::bind(&RandomAccessFileReader::ReadAsyncCallback, this,
std::placeholders::_1, std::placeholders::_2);
ReadAsyncInfo* read_async_info = new ReadAsyncInfo;
read_async_info->cb_ = cb;
read_async_info->cb_arg_ = cb_arg;
read_async_info->start_time_ = clock_->NowMicros();
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
read_async_info->fs_start_ts_ = FileOperationInfo::StartNow();
}
#endif
IOStatus s = file_->ReadAsync(req, opts, read_async_callback, read_async_info,
io_handle, del_fn, nullptr /*dbg*/);
if (!s.ok()) {
delete read_async_info;
}
return s;
}
void RandomAccessFileReader::ReadAsyncCallback(const FSReadRequest& req,
void* cb_arg) {
ReadAsyncInfo* read_async_info = static_cast<ReadAsyncInfo*>(cb_arg);
assert(read_async_info);
assert(read_async_info->cb_);
read_async_info->cb_(req, read_async_info->cb_arg_);
// Update stats and notify listeners.
if (stats_ != nullptr && file_read_hist_ != nullptr) {
// elapsed doesn't take into account delay and overwrite as StopWatch does
// in Read.
uint64_t elapsed = clock_->NowMicros() - read_async_info->start_time_;
file_read_hist_->Add(elapsed);
}
if (req.status.ok()) {
RecordInHistogram(stats_, ASYNC_READ_BYTES, req.result.size());
}
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();
NotifyOnFileReadFinish(req.offset, req.result.size(),
read_async_info->fs_start_ts_, finish_ts,
req.status);
}
if (!req.status.ok()) {
NotifyOnIOError(req.status, FileOperationType::kRead, file_name(),
req.result.size(), req.offset);
}
#endif
RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size());
delete read_async_info;
}
} // namespace ROCKSDB_NAMESPACE

@ -92,6 +92,15 @@ class RandomAccessFileReader {
const Temperature file_temperature_;
const bool is_last_level_;
struct ReadAsyncInfo {
#ifndef ROCKSDB_LITE
FileOperationInfo::StartTimePoint fs_start_ts_;
#endif
uint64_t start_time_;
std::function<void(const FSReadRequest&, void*)> cb_;
void* cb_arg_;
};
public:
explicit RandomAccessFileReader(
std::unique_ptr<FSRandomAccessFile>&& raf, const std::string& _file_name,
@ -179,5 +188,7 @@ class RandomAccessFileReader {
std::function<void(const FSReadRequest&, void*)> cb,
void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
Env::IOPriority rate_limiter_priority);
void ReadAsyncCallback(const FSReadRequest& req, void* cb_arg);
};
} // namespace ROCKSDB_NAMESPACE

@ -536,6 +536,8 @@ enum Histograms : uint32_t {
// Error handler statistics
ERROR_HANDLER_AUTORESUME_RETRY_COUNT,
ASYNC_READ_BYTES,
HISTOGRAM_ENUM_MAX,
};

@ -5586,7 +5586,9 @@ class HistogramTypeJni {
case ROCKSDB_NAMESPACE::Histograms::NUM_SST_READ_PER_LEVEL:
return 0x31;
case ROCKSDB_NAMESPACE::Histograms::ERROR_HANDLER_AUTORESUME_RETRY_COUNT:
return 0x31;
return 0x32;
case ROCKSDB_NAMESPACE::Histograms::ASYNC_READ_BYTES:
return 0x33;
case ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX:
// 0x1F for backwards compatibility on current minor version.
return 0x1F;
@ -5704,6 +5706,8 @@ class HistogramTypeJni {
case 0x32:
return ROCKSDB_NAMESPACE::Histograms::
ERROR_HANDLER_AUTORESUME_RETRY_COUNT;
case 0x33:
return ROCKSDB_NAMESPACE::Histograms::ASYNC_READ_BYTES;
case 0x1F:
// 0x1F for backwards compatibility on current minor version.
return ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX;

@ -180,6 +180,8 @@ public enum HistogramType {
*/
ERROR_HANDLER_AUTORESUME_RETRY_COUNT((byte) 0x32),
ASYNC_READ_BYTES((byte) 0x33),
// 0x1F for backwards compatibility on current minor version.
HISTOGRAM_ENUM_MAX((byte) 0x1F);

@ -283,6 +283,7 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
{NUM_SST_READ_PER_LEVEL, "rocksdb.num.sst.read.per.level"},
{ERROR_HANDLER_AUTORESUME_RETRY_COUNT,
"rocksdb.error.handler.autoresume.retry.count"},
{ASYNC_READ_BYTES, "rocksdb.async.read.bytes"},
};
std::shared_ptr<Statistics> CreateDBStatistics() {

Loading…
Cancel
Save