diff --git a/HISTORY.md b/HISTORY.md index 3659b8532..bb16b3ad9 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,9 @@ * Add two-phase commit support to C API. * Add `rocksdb_transaction_get_writebatch_wi` and `rocksdb_transaction_rebuild_from_writebatch` to C API. +### New Features +* Add FileSystem::ReadAsync API in io_tracing + ## 7.3.0 (05/20/2022) ### Bug Fixes * Fixed a bug where manual flush would block forever even though flush options had wait=false. diff --git a/Makefile b/Makefile index 8a4e3eb72..20c78edba 100644 --- a/Makefile +++ b/Makefile @@ -1876,7 +1876,7 @@ testutil_test: $(OBJ_DIR)/test_util/testutil_test.o $(TEST_LIBRARY) $(LIBRARY) io_tracer_test: $(OBJ_DIR)/trace_replay/io_tracer_test.o $(OBJ_DIR)/trace_replay/io_tracer.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) -prefetch_test: $(OBJ_DIR)/file/prefetch_test.o $(TEST_LIBRARY) $(LIBRARY) +prefetch_test: $(OBJ_DIR)/file/prefetch_test.o $(OBJ_DIR)/tools/io_tracer_parser_tool.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) io_tracer_parser_test: $(OBJ_DIR)/tools/io_tracer_parser_test.o $(OBJ_DIR)/tools/io_tracer_parser_tool.o $(TEST_LIBRARY) $(LIBRARY) diff --git a/env/file_system_tracer.cc b/env/file_system_tracer.cc index 733f45571..d0c45c57e 100644 --- a/env/file_system_tracer.cc +++ b/env/file_system_tracer.cc @@ -338,6 +338,51 @@ IOStatus FSRandomAccessFileTracingWrapper::InvalidateCache(size_t offset, return s; } +IOStatus FSRandomAccessFileTracingWrapper::ReadAsync( + FSReadRequest& req, const IOOptions& opts, + std::function cb, void* cb_arg, + void** io_handle, IOHandleDeleter* del_fn, IODebugContext* dbg) { + // Create a callback and populate info. + auto read_async_callback = + std::bind(&FSRandomAccessFileTracingWrapper::ReadAsyncCallback, this, + std::placeholders::_1, std::placeholders::_2); + ReadAsyncCallbackInfo* read_async_cb_info = new ReadAsyncCallbackInfo; + read_async_cb_info->cb_ = cb; + read_async_cb_info->cb_arg_ = cb_arg; + read_async_cb_info->start_time_ = clock_->NowNanos(); + read_async_cb_info->file_op_ = __func__; + + IOStatus s = target()->ReadAsync(req, opts, read_async_callback, + read_async_cb_info, io_handle, del_fn, dbg); + + if (!s.ok()) { + delete read_async_cb_info; + } + return s; +} + +void FSRandomAccessFileTracingWrapper::ReadAsyncCallback( + const FSReadRequest& req, void* cb_arg) { + ReadAsyncCallbackInfo* read_async_cb_info = + static_cast(cb_arg); + assert(read_async_cb_info); + assert(read_async_cb_info->cb_); + + uint64_t elapsed = clock_->NowNanos() - read_async_cb_info->start_time_; + uint64_t io_op_data = 0; + io_op_data |= (1 << IOTraceOp::kIOLen); + io_op_data |= (1 << IOTraceOp::kIOOffset); + IOTraceRecord io_record(clock_->NowNanos(), TraceType::kIOTracer, io_op_data, + read_async_cb_info->file_op_, elapsed, + req.status.ToString(), file_name_, req.result.size(), + req.offset); + io_tracer_->WriteIOOp(io_record, nullptr /*dbg*/); + + // call the underlying callback. + read_async_cb_info->cb_(req, read_async_cb_info->cb_arg_); + delete read_async_cb_info; +} + IOStatus FSWritableFileTracingWrapper::Append(const Slice& data, const IOOptions& options, IODebugContext* dbg) { diff --git a/env/file_system_tracer.h b/env/file_system_tracer.h index c8ee160f9..979a0bf12 100644 --- a/env/file_system_tracer.h +++ b/env/file_system_tracer.h @@ -228,11 +228,25 @@ class FSRandomAccessFileTracingWrapper : public FSRandomAccessFileOwnerWrapper { IOStatus InvalidateCache(size_t offset, size_t length) override; + IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts, + std::function cb, + void* cb_arg, void** io_handle, IOHandleDeleter* del_fn, + IODebugContext* dbg) override; + + void ReadAsyncCallback(const FSReadRequest& req, void* cb_arg); + private: std::shared_ptr io_tracer_; SystemClock* clock_; // Stores file name instead of full path. std::string file_name_; + + struct ReadAsyncCallbackInfo { + uint64_t start_time_; + std::function cb_; + void* cb_arg_; + std::string file_op_; + }; }; // The FSRandomAccessFilePtr is a wrapper class that takes pointer to storage diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc index 47ddb431a..de896a99e 100644 --- a/file/prefetch_test.cc +++ b/file/prefetch_test.cc @@ -5,6 +5,9 @@ #include "db/db_test_util.h" #include "test_util/sync_point.h" +#ifdef GFLAGS +#include "tools/io_tracer_parser_tool.h" +#endif namespace ROCKSDB_NAMESPACE { @@ -1266,6 +1269,33 @@ class PrefetchTestWithPosix : public DBTestBase, public ::testing::WithParamInterface { public: PrefetchTestWithPosix() : DBTestBase("prefetch_test_with_posix", true) {} + +#ifndef ROCKSDB_LITE +#ifdef GFLAGS + const int kMaxArgCount = 100; + const size_t kArgBufferSize = 100000; + + void RunIOTracerParserTool(std::string trace_file) { + std::vector params = {"./io_tracer_parser", + "-io_trace_file=" + trace_file}; + + char arg_buffer[kArgBufferSize]; + char* argv[kMaxArgCount]; + int argc = 0; + int cursor = 0; + for (const auto& arg : params) { + ASSERT_LE(cursor + arg.size() + 1, kArgBufferSize); + ASSERT_LE(argc + 1, kMaxArgCount); + + snprintf(arg_buffer + cursor, arg.size() + 1, "%s", arg.c_str()); + + argv[argc++] = arg_buffer + cursor; + cursor += static_cast(arg.size()) + 1; + } + ASSERT_EQ(0, ROCKSDB_NAMESPACE::io_tracer_parser(argc, argv)); + } +#endif // GFLAGS +#endif // ROCKSDB_LITE }; INSTANTIATE_TEST_CASE_P(PrefetchTestWithPosix, PrefetchTestWithPosix, @@ -1433,6 +1463,133 @@ TEST_P(PrefetchTestWithPosix, ReadAsyncWithPosixFS) { Close(); } + +#ifndef ROCKSDB_LITE +#ifdef GFLAGS +TEST_P(PrefetchTestWithPosix, TraceReadAsyncWithCallbackWrapper) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } + + const int kNumKeys = 1000; + std::shared_ptr fs = std::make_shared( + FileSystem::Default(), /*support_prefetch=*/false); + std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); + + bool use_direct_io = false; + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + options.statistics = CreateDBStatistics(); + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + int total_keys = 0; + // Write the keys. + { + WriteBatch batch; + Random rnd(309); + for (int j = 0; j < 5; j++) { + for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + total_keys++; + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + ASSERT_OK(Flush()); + } + MoveFilesToLevel(2); + } + + int buff_prefetch_count = 0; + bool read_async_called = false; + ReadOptions ro; + ro.adaptive_readahead = true; + ro.async_io = true; + + if (GetParam()) { + ro.readahead_size = 16 * 1024; + } + + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::PrefetchAsyncInternal:Start", + [&](void*) { buff_prefetch_count++; }); + + SyncPoint::GetInstance()->SetCallBack( + "UpdateResults::io_uring_result", + [&](void* /*arg*/) { read_async_called = true; }); + SyncPoint::GetInstance()->EnableProcessing(); + + // Read the keys. + { + // Start io_tracing. + WriteOptions write_opt; + TraceOptions trace_opt; + std::unique_ptr trace_writer; + std::string trace_file_path = dbname_ + "/io_trace_file"; + + ASSERT_OK( + NewFileTraceWriter(env_, EnvOptions(), trace_file_path, &trace_writer)); + ASSERT_OK(db_->StartIOTrace(trace_opt, std::move(trace_writer))); + ASSERT_OK(options.statistics->Reset()); + + auto iter = std::unique_ptr(db_->NewIterator(ro)); + int num_keys = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + num_keys++; + } + + // End the tracing. + ASSERT_OK(db_->EndIOTrace()); + ASSERT_OK(env_->FileExists(trace_file_path)); + + ASSERT_EQ(num_keys, total_keys); + ASSERT_GT(buff_prefetch_count, 0); + + // Check stats to make sure async prefetch is done. + { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + // Not all platforms support iouring. In that case, ReadAsync in posix + // won't submit async requests. + if (read_async_called) { + ASSERT_GT(async_read_bytes.count, 0); + } else { + ASSERT_EQ(async_read_bytes.count, 0); + } + } + + // Check the file to see if ReadAsync is logged. + RunIOTracerParserTool(trace_file_path); + } + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + Close(); +} +#endif // GFLAGS +#endif // ROCKSDB_LITE } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc index 44580e4aa..e74b78cc4 100644 --- a/file/random_access_file_reader.cc +++ b/file/random_access_file_reader.cc @@ -457,9 +457,16 @@ IOStatus RandomAccessFileReader::ReadAsync( IOStatus s = file_->ReadAsync(req, opts, read_async_callback, read_async_info, io_handle, del_fn, nullptr /*dbg*/); +// Suppress false positive clang analyzer warnings. +// Memory is not released if file_->ReadAsync returns !s.ok(), because +// ReadAsyncCallback is never called in that case. If ReadAsyncCallback is +// called then ReadAsync should always return IOStatus::OK(). +#ifndef __clang_analyzer__ if (!s.ok()) { delete read_async_info; } +#endif // __clang_analyzer__ + return s; }