Provide support for IOTracing for ReadAsync API (#9833)

Summary:
Same as title

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9833

Test Plan:
Add unit test and manually check the output of tracing logs
For fixed readahead_size it logs as:
```
Access Time : 193352113447923     , File Name: 000026.sst          , File Operation: ReadAsync         , Latency: 15075     , IO Status: OK, Length: 12288, Offset: 659456
Access Time : 193352113465232     , File Name: 000026.sst          , File Operation: ReadAsync         , Latency: 14425     , IO Status: OK, Length: 12288, Offset: 671744
Access Time : 193352113481539     , File Name: 000026.sst          , File Operation: ReadAsync         , Latency: 13062     , IO Status: OK, Length: 12288, Offset: 684032
Access Time : 193352113497692     , File Name: 000026.sst          , File Operation: ReadAsync         , Latency: 13649     , IO Status: OK, Length: 12288, Offset: 696320
Access Time : 193352113520043     , File Name: 000026.sst          , File Operation: ReadAsync         , Latency: 19384     , IO Status: OK, Length: 12288, Offset: 708608
Access Time : 193352113538401     , File Name: 000026.sst          , File Operation: ReadAsync         , Latency: 15406     , IO Status: OK, Length: 12288, Offset: 720896
Access Time : 193352113554855     , File Name: 000026.sst          , File Operation: ReadAsync         , Latency: 13670     , IO Status: OK, Length: 12288, Offset: 733184
Access Time : 193352113571624     , File Name: 000026.sst          , File Operation: ReadAsync         , Latency: 13855     , IO Status: OK, Length: 12288, Offset: 745472
Access Time : 193352113587924     , File Name: 000026.sst          , File Operation: ReadAsync         , Latency: 13953     , IO Status: OK, Length: 12288, Offset: 757760
Access Time : 193352113603285     , File Name: 000026.sst          , File Operation: Prefetch          , Latency: 59        , IO Status: Not implemented: Prefetch not supported, Length: 8868, Offset: 898349
```

For implicit readahead:
```
Access Time : 193351865156587     , File Name: 000026.sst          , File Operation: Prefetch          , Latency: 48        , IO Status: Not implemented: Prefetch not supported, Length: 12266, Offset: 391174
Access Time : 193351865160354     , File Name: 000026.sst          , File Operation: Prefetch          , Latency: 51        , IO Status: Not implemented: Prefetch not supported, Length: 12266, Offset: 395248
Access Time : 193351865164253     , File Name: 000026.sst          , File Operation: Prefetch          , Latency: 49        , IO Status: Not implemented: Prefetch not supported, Length: 12266, Offset: 399322
Access Time : 193351865165461     , File Name: 000026.sst          , File Operation: ReadAsync         , Latency: 222871    , IO Status: OK, Length: 135168, Offset: 401408
```

Reviewed By: anand1976

Differential Revision: D35601634

Pulled By: akankshamahajan15

fbshipit-source-id: 5a4f32a850af878efa0767bd5706380152a1f26e
main
Akanksha Mahajan 2 years ago committed by Facebook GitHub Bot
parent 5490da20a5
commit 28ea1fb44a
  1. 3
      HISTORY.md
  2. 2
      Makefile
  3. 45
      env/file_system_tracer.cc
  4. 14
      env/file_system_tracer.h
  5. 157
      file/prefetch_test.cc
  6. 7
      file/random_access_file_reader.cc

@ -6,6 +6,9 @@
* Add two-phase commit support to C API.
* Add `rocksdb_transaction_get_writebatch_wi` and `rocksdb_transaction_rebuild_from_writebatch` to C API.
### New Features
* Add FileSystem::ReadAsync API in io_tracing
## 7.3.0 (05/20/2022)
### Bug Fixes
* Fixed a bug where manual flush would block forever even though flush options had wait=false.

@ -1876,7 +1876,7 @@ testutil_test: $(OBJ_DIR)/test_util/testutil_test.o $(TEST_LIBRARY) $(LIBRARY)
io_tracer_test: $(OBJ_DIR)/trace_replay/io_tracer_test.o $(OBJ_DIR)/trace_replay/io_tracer.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
prefetch_test: $(OBJ_DIR)/file/prefetch_test.o $(TEST_LIBRARY) $(LIBRARY)
prefetch_test: $(OBJ_DIR)/file/prefetch_test.o $(OBJ_DIR)/tools/io_tracer_parser_tool.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
io_tracer_parser_test: $(OBJ_DIR)/tools/io_tracer_parser_test.o $(OBJ_DIR)/tools/io_tracer_parser_tool.o $(TEST_LIBRARY) $(LIBRARY)

@ -338,6 +338,51 @@ IOStatus FSRandomAccessFileTracingWrapper::InvalidateCache(size_t offset,
return s;
}
IOStatus FSRandomAccessFileTracingWrapper::ReadAsync(
FSReadRequest& req, const IOOptions& opts,
std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
void** io_handle, IOHandleDeleter* del_fn, IODebugContext* dbg) {
// Create a callback and populate info.
auto read_async_callback =
std::bind(&FSRandomAccessFileTracingWrapper::ReadAsyncCallback, this,
std::placeholders::_1, std::placeholders::_2);
ReadAsyncCallbackInfo* read_async_cb_info = new ReadAsyncCallbackInfo;
read_async_cb_info->cb_ = cb;
read_async_cb_info->cb_arg_ = cb_arg;
read_async_cb_info->start_time_ = clock_->NowNanos();
read_async_cb_info->file_op_ = __func__;
IOStatus s = target()->ReadAsync(req, opts, read_async_callback,
read_async_cb_info, io_handle, del_fn, dbg);
if (!s.ok()) {
delete read_async_cb_info;
}
return s;
}
void FSRandomAccessFileTracingWrapper::ReadAsyncCallback(
const FSReadRequest& req, void* cb_arg) {
ReadAsyncCallbackInfo* read_async_cb_info =
static_cast<ReadAsyncCallbackInfo*>(cb_arg);
assert(read_async_cb_info);
assert(read_async_cb_info->cb_);
uint64_t elapsed = clock_->NowNanos() - read_async_cb_info->start_time_;
uint64_t io_op_data = 0;
io_op_data |= (1 << IOTraceOp::kIOLen);
io_op_data |= (1 << IOTraceOp::kIOOffset);
IOTraceRecord io_record(clock_->NowNanos(), TraceType::kIOTracer, io_op_data,
read_async_cb_info->file_op_, elapsed,
req.status.ToString(), file_name_, req.result.size(),
req.offset);
io_tracer_->WriteIOOp(io_record, nullptr /*dbg*/);
// call the underlying callback.
read_async_cb_info->cb_(req, read_async_cb_info->cb_arg_);
delete read_async_cb_info;
}
IOStatus FSWritableFileTracingWrapper::Append(const Slice& data,
const IOOptions& options,
IODebugContext* dbg) {

@ -228,11 +228,25 @@ class FSRandomAccessFileTracingWrapper : public FSRandomAccessFileOwnerWrapper {
IOStatus InvalidateCache(size_t offset, size_t length) override;
IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts,
std::function<void(const FSReadRequest&, void*)> cb,
void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
IODebugContext* dbg) override;
void ReadAsyncCallback(const FSReadRequest& req, void* cb_arg);
private:
std::shared_ptr<IOTracer> io_tracer_;
SystemClock* clock_;
// Stores file name instead of full path.
std::string file_name_;
struct ReadAsyncCallbackInfo {
uint64_t start_time_;
std::function<void(const FSReadRequest&, void*)> cb_;
void* cb_arg_;
std::string file_op_;
};
};
// The FSRandomAccessFilePtr is a wrapper class that takes pointer to storage

@ -5,6 +5,9 @@
#include "db/db_test_util.h"
#include "test_util/sync_point.h"
#ifdef GFLAGS
#include "tools/io_tracer_parser_tool.h"
#endif
namespace ROCKSDB_NAMESPACE {
@ -1266,6 +1269,33 @@ class PrefetchTestWithPosix : public DBTestBase,
public ::testing::WithParamInterface<bool> {
public:
PrefetchTestWithPosix() : DBTestBase("prefetch_test_with_posix", true) {}
#ifndef ROCKSDB_LITE
#ifdef GFLAGS
const int kMaxArgCount = 100;
const size_t kArgBufferSize = 100000;
void RunIOTracerParserTool(std::string trace_file) {
std::vector<std::string> params = {"./io_tracer_parser",
"-io_trace_file=" + trace_file};
char arg_buffer[kArgBufferSize];
char* argv[kMaxArgCount];
int argc = 0;
int cursor = 0;
for (const auto& arg : params) {
ASSERT_LE(cursor + arg.size() + 1, kArgBufferSize);
ASSERT_LE(argc + 1, kMaxArgCount);
snprintf(arg_buffer + cursor, arg.size() + 1, "%s", arg.c_str());
argv[argc++] = arg_buffer + cursor;
cursor += static_cast<int>(arg.size()) + 1;
}
ASSERT_EQ(0, ROCKSDB_NAMESPACE::io_tracer_parser(argc, argv));
}
#endif // GFLAGS
#endif // ROCKSDB_LITE
};
INSTANTIATE_TEST_CASE_P(PrefetchTestWithPosix, PrefetchTestWithPosix,
@ -1433,6 +1463,133 @@ TEST_P(PrefetchTestWithPosix, ReadAsyncWithPosixFS) {
Close();
}
#ifndef ROCKSDB_LITE
#ifdef GFLAGS
TEST_P(PrefetchTestWithPosix, TraceReadAsyncWithCallbackWrapper) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
return;
}
const int kNumKeys = 1000;
std::shared_ptr<MockFS> fs = std::make_shared<MockFS>(
FileSystem::Default(), /*support_prefetch=*/false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool use_direct_io = false;
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
options.statistics = CreateDBStatistics();
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
// If direct IO is not supported, skip the test
return;
} else {
ASSERT_OK(s);
}
int total_keys = 0;
// Write the keys.
{
WriteBatch batch;
Random rnd(309);
for (int j = 0; j < 5; j++) {
for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
total_keys++;
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
}
MoveFilesToLevel(2);
}
int buff_prefetch_count = 0;
bool read_async_called = false;
ReadOptions ro;
ro.adaptive_readahead = true;
ro.async_io = true;
if (GetParam()) {
ro.readahead_size = 16 * 1024;
}
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::PrefetchAsyncInternal:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack(
"UpdateResults::io_uring_result",
[&](void* /*arg*/) { read_async_called = true; });
SyncPoint::GetInstance()->EnableProcessing();
// Read the keys.
{
// Start io_tracing.
WriteOptions write_opt;
TraceOptions trace_opt;
std::unique_ptr<TraceWriter> trace_writer;
std::string trace_file_path = dbname_ + "/io_trace_file";
ASSERT_OK(
NewFileTraceWriter(env_, EnvOptions(), trace_file_path, &trace_writer));
ASSERT_OK(db_->StartIOTrace(trace_opt, std::move(trace_writer)));
ASSERT_OK(options.statistics->Reset());
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
num_keys++;
}
// End the tracing.
ASSERT_OK(db_->EndIOTrace());
ASSERT_OK(env_->FileExists(trace_file_path));
ASSERT_EQ(num_keys, total_keys);
ASSERT_GT(buff_prefetch_count, 0);
// Check stats to make sure async prefetch is done.
{
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
// Not all platforms support iouring. In that case, ReadAsync in posix
// won't submit async requests.
if (read_async_called) {
ASSERT_GT(async_read_bytes.count, 0);
} else {
ASSERT_EQ(async_read_bytes.count, 0);
}
}
// Check the file to see if ReadAsync is logged.
RunIOTracerParserTool(trace_file_path);
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
Close();
}
#endif // GFLAGS
#endif // ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -457,9 +457,16 @@ IOStatus RandomAccessFileReader::ReadAsync(
IOStatus s = file_->ReadAsync(req, opts, read_async_callback, read_async_info,
io_handle, del_fn, nullptr /*dbg*/);
// Suppress false positive clang analyzer warnings.
// Memory is not released if file_->ReadAsync returns !s.ok(), because
// ReadAsyncCallback is never called in that case. If ReadAsyncCallback is
// called then ReadAsync should always return IOStatus::OK().
#ifndef __clang_analyzer__
if (!s.ok()) {
delete read_async_info;
}
#endif // __clang_analyzer__
return s;
}

Loading…
Cancel
Save