From 671753c43d063c0430c68af1db303499a8af732b Mon Sep 17 00:00:00 2001 From: akankshamahajan Date: Mon, 24 Oct 2022 18:34:52 -0700 Subject: [PATCH] Run Clang format on file folder (#10860) Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/10860 Test Plan: CircleCI jobs Reviewed By: anand1976 Differential Revision: D40656236 Pulled By: akankshamahajan15 fbshipit-source-id: 557600db5c2e0ab9b400655336c467307f7136de --- file/delete_scheduler.cc | 11 +- file/delete_scheduler.h | 7 +- file/delete_scheduler_test.cc | 13 +- file/file_util.cc | 2 +- file/filename.cc | 33 +- file/filename.h | 9 +- file/prefetch_test.cc | 988 ++++++++++++------------- file/random_access_file_reader.cc | 2 +- file/random_access_file_reader_test.cc | 4 +- file/read_write_util.cc | 1 + file/read_write_util.h | 1 + file/sst_file_manager_impl.cc | 14 +- file/sst_file_manager_impl.h | 3 +- 13 files changed, 538 insertions(+), 550 deletions(-) diff --git a/file/delete_scheduler.cc b/file/delete_scheduler.cc index 300bf0f8f..b97a0f224 100644 --- a/file/delete_scheduler.cc +++ b/file/delete_scheduler.cc @@ -61,9 +61,10 @@ DeleteScheduler::~DeleteScheduler() { Status DeleteScheduler::DeleteFile(const std::string& file_path, const std::string& dir_to_sync, const bool force_bg) { - if (rate_bytes_per_sec_.load() <= 0 || (!force_bg && - total_trash_size_.load() > - sst_file_manager_->GetTotalSize() * max_trash_db_ratio_.load())) { + if (rate_bytes_per_sec_.load() <= 0 || + (!force_bg && + total_trash_size_.load() > + sst_file_manager_->GetTotalSize() * max_trash_db_ratio_.load())) { // Rate limiting is disabled or trash size makes up more than // max_trash_db_ratio_ (default 25%) of the total DB size TEST_SYNC_POINT("DeleteScheduler::DeleteFile"); @@ -318,8 +319,8 @@ Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash, if (my_status.ok()) { if (num_hard_links == 1) { std::unique_ptr wf; - my_status = fs_->ReopenWritableFile(path_in_trash, FileOptions(), - &wf, nullptr); + my_status = fs_->ReopenWritableFile(path_in_trash, FileOptions(), &wf, + nullptr); if (my_status.ok()) { my_status = wf->Truncate(file_size - bytes_max_delete_chunk_, IOOptions(), nullptr); diff --git a/file/delete_scheduler.h b/file/delete_scheduler.h index 6d3f6b4a4..2904ec621 100644 --- a/file/delete_scheduler.h +++ b/file/delete_scheduler.h @@ -14,7 +14,6 @@ #include "monitoring/instrumented_mutex.h" #include "port/port.h" - #include "rocksdb/status.h" namespace ROCKSDB_NAMESPACE { @@ -54,7 +53,7 @@ class DeleteScheduler { // set, it forces the file to always be deleted in the background thread, // except when rate limiting is disabled Status DeleteFile(const std::string& fname, const std::string& dir_to_sync, - const bool force_bg = false); + const bool force_bg = false); // Wait for all files being deleteing in the background to finish or for // destructor to be called. @@ -67,9 +66,7 @@ class DeleteScheduler { uint64_t GetTotalTrashSize() { return total_trash_size_.load(); } // Return trash/DB size ratio where new files will be deleted immediately - double GetMaxTrashDBRatio() { - return max_trash_db_ratio_.load(); - } + double GetMaxTrashDBRatio() { return max_trash_db_ratio_.load(); } // Update trash/DB size ratio where new files will be deleted immediately void SetMaxTrashDBRatio(double r) { diff --git a/file/delete_scheduler_test.cc b/file/delete_scheduler_test.cc index 196721155..d825da32a 100644 --- a/file/delete_scheduler_test.cc +++ b/file/delete_scheduler_test.cc @@ -136,7 +136,7 @@ TEST_F(DeleteSchedulerTest, BasicRateLimiting) { EXPECT_EQ(dummy_files_dirs_[0], *dir); }); - int num_files = 100; // 100 files + int num_files = 100; // 100 files uint64_t file_size = 1024; // every file is 1 kb std::vector delete_kbs_per_sec = {512, 200, 100, 50, 25}; @@ -249,7 +249,7 @@ TEST_F(DeleteSchedulerTest, RateLimitingMultiThreaded) { [&](void* arg) { penalties.push_back(*(static_cast(arg))); }); int thread_cnt = 10; - int num_files = 10; // 10 files per thread + int num_files = 10; // 10 files per thread uint64_t file_size = 1024; // every file is 1 kb std::vector delete_kbs_per_sec = {512, 200, 100, 50, 25}; @@ -591,8 +591,7 @@ TEST_F(DeleteSchedulerTest, DISABLED_DynamicRateLimiting1) { rate_bytes_per_sec_ = 0; // Disable rate limiting initially NewDeleteScheduler(); - - int num_files = 10; // 10 files + int num_files = 10; // 10 files uint64_t file_size = 1024; // every file is 1 kb std::vector delete_kbs_per_sec = {512, 200, 0, 100, 50, -2, 25}; @@ -662,9 +661,9 @@ TEST_F(DeleteSchedulerTest, ImmediateDeleteOn25PercDBSize) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); - int num_files = 100; // 100 files - uint64_t file_size = 1024 * 10; // 100 KB as a file size - rate_bytes_per_sec_ = 1; // 1 byte per sec (very slow trash delete) + int num_files = 100; // 100 files + uint64_t file_size = 1024 * 10; // 100 KB as a file size + rate_bytes_per_sec_ = 1; // 1 byte per sec (very slow trash delete) NewDeleteScheduler(); delete_scheduler_->SetMaxTrashDBRatio(0.25); diff --git a/file/file_util.cc b/file/file_util.cc index d7858f3c8..7997d6e11 100644 --- a/file/file_util.cc +++ b/file/file_util.cc @@ -5,8 +5,8 @@ // #include "file/file_util.h" -#include #include +#include #include "file/random_access_file_reader.h" #include "file/sequence_file_reader.h" diff --git a/file/filename.cc b/file/filename.cc index b771e0813..1e04c7339 100644 --- a/file/filename.cc +++ b/file/filename.cc @@ -7,11 +7,13 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "file/filename.h" -#include #include #include + +#include #include + #include "file/writable_file_writer.h" #include "rocksdb/env.h" #include "test_util/sync_point.h" @@ -42,10 +44,8 @@ static size_t GetInfoLogPrefix(const std::string& path, char* dest, int len) { while (i < src_len && write_idx < len - sizeof(suffix)) { if ((path[i] >= 'a' && path[i] <= 'z') || (path[i] >= '0' && path[i] <= '9') || - (path[i] >= 'A' && path[i] <= 'Z') || - path[i] == '-' || - path[i] == '.' || - path[i] == '_'){ + (path[i] >= 'A' && path[i] <= 'Z') || path[i] == '-' || + path[i] == '.' || path[i] == '_') { dest[write_idx++] = path[i]; } else { if (i > 0) { @@ -153,9 +153,10 @@ void FormatFileNumber(uint64_t number, uint32_t path_id, char* out_buf, if (path_id == 0) { snprintf(out_buf, out_buf_size, "%" PRIu64, number); } else { - snprintf(out_buf, out_buf_size, "%" PRIu64 - "(path " - "%" PRIu32 ")", + snprintf(out_buf, out_buf_size, + "%" PRIu64 + "(path " + "%" PRIu32 ")", number, path_id); } } @@ -176,9 +177,7 @@ std::string CurrentFileName(const std::string& dbname) { return dbname + "/" + kCurrentFileName; } -std::string LockFileName(const std::string& dbname) { - return dbname + "/LOCK"; -} +std::string LockFileName(const std::string& dbname) { return dbname + "/LOCK"; } std::string TempFileName(const std::string& dbname, uint64_t number) { return MakeFileName(dbname, number, kTempFileNameSuffix.c_str()); @@ -199,7 +198,8 @@ InfoLogPrefix::InfoLogPrefix(bool has_log_dir, } std::string InfoLogFileName(const std::string& dbname, - const std::string& db_path, const std::string& log_dir) { + const std::string& db_path, + const std::string& log_dir) { if (log_dir.empty()) { return dbname + "/LOG"; } @@ -210,7 +210,8 @@ std::string InfoLogFileName(const std::string& dbname, // Return the name of the old info log file for "dbname". std::string OldInfoLogFileName(const std::string& dbname, uint64_t ts, - const std::string& db_path, const std::string& log_dir) { + const std::string& db_path, + const std::string& log_dir) { char buf[50]; snprintf(buf, sizeof(buf), "%llu", static_cast(ts)); @@ -263,9 +264,7 @@ std::string IdentityFileName(const std::string& dbname) { // dbname/OPTIONS-[0-9]+ // dbname/OPTIONS-[0-9]+.dbtmp // Disregards / at the beginning -bool ParseFileName(const std::string& fname, - uint64_t* number, - FileType* type, +bool ParseFileName(const std::string& fname, uint64_t* number, FileType* type, WalFileType* log_type) { return ParseFileName(fname, number, "", type, log_type); } @@ -370,7 +369,7 @@ bool ParseFileName(const std::string& fname, uint64_t* number, *log_type = kAliveLogFile; } } else if (archive_dir_found) { - return false; // Archive dir can contain only log files + return false; // Archive dir can contain only log files } else if (suffix == Slice(kRocksDbTFileExt) || suffix == Slice(kLevelDbTFileExt)) { *type = kTableFile; diff --git a/file/filename.h b/file/filename.h index 6d45e5210..2eb125b6a 100644 --- a/file/filename.h +++ b/file/filename.h @@ -11,8 +11,9 @@ #pragma once #include -#include + #include +#include #include #include "options/db_options.h" @@ -54,8 +55,7 @@ extern std::string ArchivalDirectory(const std::string& dbname); // Return the name of the archived log file with the specified number // in the db named by "dbname". The result will be prefixed with "dbname". -extern std::string ArchivedLogFileName(const std::string& dbname, - uint64_t num); +extern std::string ArchivedLogFileName(const std::string& dbname, uint64_t num); extern std::string MakeTableFileName(const std::string& name, uint64_t number); @@ -140,8 +140,7 @@ extern std::string TempOptionsFileName(const std::string& dbname, // Return the name to use for a metadatabase. The result will be prefixed with // "dbname". -extern std::string MetaDatabaseName(const std::string& dbname, - uint64_t number); +extern std::string MetaDatabaseName(const std::string& dbname, uint64_t number); // Return the name of the Identity file which stores a unique number for the db // that will get regenerated if the db loses all its data and is recreated fresh diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc index ec06ef0d8..55ffede50 100644 --- a/file/prefetch_test.cc +++ b/file/prefetch_test.cc @@ -660,7 +660,7 @@ TEST_P(PrefetchTest, PrefetchWhenReseek) { iter->Seek(BuildKey(1019)); ASSERT_TRUE(iter->Valid()); // Missed 2 blocks but they are already in buffer so no reset. - iter->Seek(BuildKey(103)); // Already in buffer. + iter->Seek(BuildKey(103)); // Already in buffer. ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1033)); // Prefetch Data ASSERT_TRUE(iter->Valid()); @@ -1455,119 +1455,165 @@ extern "C" bool RocksDbIOUringEnable() { return true; } namespace { #ifndef ROCKSDB_LITE #ifdef GFLAGS - const int kMaxArgCount = 100; - const size_t kArgBufferSize = 100000; +const int kMaxArgCount = 100; +const size_t kArgBufferSize = 100000; - void RunIOTracerParserTool(std::string trace_file) { - std::vector params = {"./io_tracer_parser", - "-io_trace_file=" + trace_file}; +void RunIOTracerParserTool(std::string trace_file) { + std::vector params = {"./io_tracer_parser", + "-io_trace_file=" + trace_file}; - char arg_buffer[kArgBufferSize]; - char* argv[kMaxArgCount]; - int argc = 0; - int cursor = 0; - for (const auto& arg : params) { - ASSERT_LE(cursor + arg.size() + 1, kArgBufferSize); - ASSERT_LE(argc + 1, kMaxArgCount); + char arg_buffer[kArgBufferSize]; + char* argv[kMaxArgCount]; + int argc = 0; + int cursor = 0; + for (const auto& arg : params) { + ASSERT_LE(cursor + arg.size() + 1, kArgBufferSize); + ASSERT_LE(argc + 1, kMaxArgCount); - snprintf(arg_buffer + cursor, arg.size() + 1, "%s", arg.c_str()); + snprintf(arg_buffer + cursor, arg.size() + 1, "%s", arg.c_str()); - argv[argc++] = arg_buffer + cursor; - cursor += static_cast(arg.size()) + 1; - } - ASSERT_EQ(0, ROCKSDB_NAMESPACE::io_tracer_parser(argc, argv)); + argv[argc++] = arg_buffer + cursor; + cursor += static_cast(arg.size()) + 1; } + ASSERT_EQ(0, ROCKSDB_NAMESPACE::io_tracer_parser(argc, argv)); +} #endif // GFLAGS #endif // ROCKSDB_LITE - } // namespace +} // namespace // Tests the default implementation of ReadAsync API with PosixFileSystem. - TEST_P(PrefetchTest, ReadAsyncWithPosixFS) { - if (mem_env_ || encrypted_env_) { - ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); - return; - } +TEST_P(PrefetchTest, ReadAsyncWithPosixFS) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } - const int kNumKeys = 1000; - std::shared_ptr fs = std::make_shared( - FileSystem::Default(), /*support_prefetch=*/false); - std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); - - bool use_direct_io = std::get<0>(GetParam()); - Options options = CurrentOptions(); - options.write_buffer_size = 1024; - options.create_if_missing = true; - options.compression = kNoCompression; - options.env = env.get(); - options.statistics = CreateDBStatistics(); - if (use_direct_io) { - options.use_direct_reads = true; - options.use_direct_io_for_flush_and_compaction = true; - } - BlockBasedTableOptions table_options; - table_options.no_block_cache = true; - table_options.cache_index_and_filter_blocks = false; - table_options.metadata_block_size = 1024; - table_options.index_type = - BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; - options.table_factory.reset(NewBlockBasedTableFactory(table_options)); - - Status s = TryReopen(options); - if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { - // If direct IO is not supported, skip the test - return; - } else { - ASSERT_OK(s); - } + const int kNumKeys = 1000; + std::shared_ptr fs = std::make_shared( + FileSystem::Default(), /*support_prefetch=*/false); + std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); - int total_keys = 0; - // Write the keys. - { - WriteBatch batch; - Random rnd(309); - for (int j = 0; j < 5; j++) { - for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { - ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); - total_keys++; - } - ASSERT_OK(db_->Write(WriteOptions(), &batch)); - ASSERT_OK(Flush()); + bool use_direct_io = std::get<0>(GetParam()); + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + options.statistics = CreateDBStatistics(); + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + int total_keys = 0; + // Write the keys. + { + WriteBatch batch; + Random rnd(309); + for (int j = 0; j < 5; j++) { + for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + total_keys++; } - MoveFilesToLevel(2); + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + ASSERT_OK(Flush()); } + MoveFilesToLevel(2); + } - int buff_prefetch_count = 0; - bool read_async_called = false; - ReadOptions ro; - ro.adaptive_readahead = true; - ro.async_io = true; + int buff_prefetch_count = 0; + bool read_async_called = false; + ReadOptions ro; + ro.adaptive_readahead = true; + ro.async_io = true; + + if (std::get<1>(GetParam())) { + ro.readahead_size = 16 * 1024; + } + + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::PrefetchAsyncInternal:Start", + [&](void*) { buff_prefetch_count++; }); + + SyncPoint::GetInstance()->SetCallBack( + "UpdateResults::io_uring_result", + [&](void* /*arg*/) { read_async_called = true; }); + SyncPoint::GetInstance()->EnableProcessing(); + + // Read the keys. + { + ASSERT_OK(options.statistics->Reset()); + get_perf_context()->Reset(); - if (std::get<1>(GetParam())) { - ro.readahead_size = 16 * 1024; + auto iter = std::unique_ptr(db_->NewIterator(ro)); + int num_keys = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + num_keys++; } - SyncPoint::GetInstance()->SetCallBack( - "FilePrefetchBuffer::PrefetchAsyncInternal:Start", - [&](void*) { buff_prefetch_count++; }); + ASSERT_EQ(num_keys, total_keys); + ASSERT_GT(buff_prefetch_count, 0); - SyncPoint::GetInstance()->SetCallBack( - "UpdateResults::io_uring_result", - [&](void* /*arg*/) { read_async_called = true; }); - SyncPoint::GetInstance()->EnableProcessing(); + // Check stats to make sure async prefetch is done. + { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + HistogramData prefetched_bytes_discarded; + options.statistics->histogramData(PREFETCHED_BYTES_DISCARDED, + &prefetched_bytes_discarded); + + // Not all platforms support iouring. In that case, ReadAsync in posix + // won't submit async requests. + if (read_async_called) { + ASSERT_GT(async_read_bytes.count, 0); + } else { + ASSERT_EQ(async_read_bytes.count, 0); + } + ASSERT_GT(prefetched_bytes_discarded.count, 0); + } + ASSERT_EQ(get_perf_context()->number_async_seek, 0); + } - // Read the keys. + { + // Read the keys using seek. { ASSERT_OK(options.statistics->Reset()); get_perf_context()->Reset(); auto iter = std::unique_ptr(db_->NewIterator(ro)); int num_keys = 0; - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + iter->Seek(BuildKey(450)); + while (iter->Valid()) { + ASSERT_OK(iter->status()); + num_keys++; + iter->Next(); + } + ASSERT_OK(iter->status()); + + iter->Seek(BuildKey(450)); + while (iter->Valid()) { ASSERT_OK(iter->status()); num_keys++; + iter->Prev(); } - ASSERT_EQ(num_keys, total_keys); + ASSERT_EQ(num_keys, total_keys + 1); ASSERT_GT(buff_prefetch_count, 0); // Check stats to make sure async prefetch is done. @@ -1582,498 +1628,448 @@ namespace { // won't submit async requests. if (read_async_called) { ASSERT_GT(async_read_bytes.count, 0); + ASSERT_GT(get_perf_context()->number_async_seek, 0); } else { ASSERT_EQ(async_read_bytes.count, 0); + ASSERT_EQ(get_perf_context()->number_async_seek, 0); } ASSERT_GT(prefetched_bytes_discarded.count, 0); } - ASSERT_EQ(get_perf_context()->number_async_seek, 0); } + } - { - // Read the keys using seek. - { - ASSERT_OK(options.statistics->Reset()); - get_perf_context()->Reset(); - - auto iter = std::unique_ptr(db_->NewIterator(ro)); - int num_keys = 0; - iter->Seek(BuildKey(450)); - while (iter->Valid()) { - ASSERT_OK(iter->status()); - num_keys++; - iter->Next(); - } - ASSERT_OK(iter->status()); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); - iter->Seek(BuildKey(450)); - while (iter->Valid()) { - ASSERT_OK(iter->status()); - num_keys++; - iter->Prev(); - } + Close(); +} - ASSERT_EQ(num_keys, total_keys + 1); - ASSERT_GT(buff_prefetch_count, 0); - - // Check stats to make sure async prefetch is done. - { - HistogramData async_read_bytes; - options.statistics->histogramData(ASYNC_READ_BYTES, - &async_read_bytes); - HistogramData prefetched_bytes_discarded; - options.statistics->histogramData(PREFETCHED_BYTES_DISCARDED, - &prefetched_bytes_discarded); - - // Not all platforms support iouring. In that case, ReadAsync in posix - // won't submit async requests. - if (read_async_called) { - ASSERT_GT(async_read_bytes.count, 0); - ASSERT_GT(get_perf_context()->number_async_seek, 0); - } else { - ASSERT_EQ(async_read_bytes.count, 0); - ASSERT_EQ(get_perf_context()->number_async_seek, 0); - } - ASSERT_GT(prefetched_bytes_discarded.count, 0); - } - } - } +TEST_P(PrefetchTest, MultipleSeekWithPosixFS) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } - SyncPoint::GetInstance()->DisableProcessing(); - SyncPoint::GetInstance()->ClearAllCallBacks(); + const int kNumKeys = 1000; + std::shared_ptr fs = std::make_shared( + FileSystem::Default(), /*support_prefetch=*/false); + std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); - Close(); + bool use_direct_io = std::get<0>(GetParam()); + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + options.statistics = CreateDBStatistics(); + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; } + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); - TEST_P(PrefetchTest, MultipleSeekWithPosixFS) { - if (mem_env_ || encrypted_env_) { - ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); - return; - } + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } - const int kNumKeys = 1000; - std::shared_ptr fs = std::make_shared( - FileSystem::Default(), /*support_prefetch=*/false); - std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); - - bool use_direct_io = std::get<0>(GetParam()); - Options options = CurrentOptions(); - options.write_buffer_size = 1024; - options.create_if_missing = true; - options.compression = kNoCompression; - options.env = env.get(); - options.statistics = CreateDBStatistics(); - if (use_direct_io) { - options.use_direct_reads = true; - options.use_direct_io_for_flush_and_compaction = true; + int total_keys = 0; + // Write the keys. + { + WriteBatch batch; + Random rnd(309); + for (int j = 0; j < 5; j++) { + for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + total_keys++; + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + ASSERT_OK(Flush()); } - BlockBasedTableOptions table_options; - table_options.no_block_cache = true; - table_options.cache_index_and_filter_blocks = false; - table_options.metadata_block_size = 1024; - table_options.index_type = - BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; - options.table_factory.reset(NewBlockBasedTableFactory(table_options)); - - Status s = TryReopen(options); - if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { - // If direct IO is not supported, skip the test - return; - } else { - ASSERT_OK(s); + MoveFilesToLevel(2); + } + + int num_keys_first_batch = 0; + int num_keys_second_batch = 0; + // Calculate number of keys without async_io for correctness validation. + { + auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); + // First Seek. + iter->Seek(BuildKey(450)); + while (iter->Valid() && num_keys_first_batch < 100) { + ASSERT_OK(iter->status()); + num_keys_first_batch++; + iter->Next(); } + ASSERT_OK(iter->status()); - int total_keys = 0; - // Write the keys. - { - WriteBatch batch; - Random rnd(309); - for (int j = 0; j < 5; j++) { - for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { - ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); - total_keys++; - } - ASSERT_OK(db_->Write(WriteOptions(), &batch)); - ASSERT_OK(Flush()); - } - MoveFilesToLevel(2); + iter->Seek(BuildKey(942)); + while (iter->Valid()) { + ASSERT_OK(iter->status()); + num_keys_second_batch++; + iter->Next(); } + ASSERT_OK(iter->status()); + } - int num_keys_first_batch = 0; - int num_keys_second_batch = 0; - // Calculate number of keys without async_io for correctness validation. + int buff_prefetch_count = 0; + bool read_async_called = false; + ReadOptions ro; + ro.adaptive_readahead = true; + ro.async_io = true; + + if (std::get<1>(GetParam())) { + ro.readahead_size = 16 * 1024; + } + + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::PrefetchAsyncInternal:Start", + [&](void*) { buff_prefetch_count++; }); + + SyncPoint::GetInstance()->SetCallBack( + "UpdateResults::io_uring_result", + [&](void* /*arg*/) { read_async_called = true; }); + SyncPoint::GetInstance()->EnableProcessing(); + + // Read the keys using seek. + { + ASSERT_OK(options.statistics->Reset()); + get_perf_context()->Reset(); + + auto iter = std::unique_ptr(db_->NewIterator(ro)); + int num_keys = 0; + // First Seek. { - auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); - // First Seek. iter->Seek(BuildKey(450)); - while (iter->Valid() && num_keys_first_batch < 100) { + while (iter->Valid() && num_keys < 100) { ASSERT_OK(iter->status()); - num_keys_first_batch++; + num_keys++; iter->Next(); } ASSERT_OK(iter->status()); + ASSERT_EQ(num_keys, num_keys_first_batch); + // Check stats to make sure async prefetch is done. + { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); - iter->Seek(BuildKey(942)); - while (iter->Valid()) { - ASSERT_OK(iter->status()); - num_keys_second_batch++; - iter->Next(); + // Not all platforms support iouring. In that case, ReadAsync in posix + // won't submit async requests. + if (read_async_called) { + ASSERT_GT(async_read_bytes.count, 0); + ASSERT_GT(get_perf_context()->number_async_seek, 0); + } else { + ASSERT_EQ(async_read_bytes.count, 0); + ASSERT_EQ(get_perf_context()->number_async_seek, 0); + } } - ASSERT_OK(iter->status()); } - int buff_prefetch_count = 0; - bool read_async_called = false; - ReadOptions ro; - ro.adaptive_readahead = true; - ro.async_io = true; - - if (std::get<1>(GetParam())) { - ro.readahead_size = 16 * 1024; - } - - SyncPoint::GetInstance()->SetCallBack( - "FilePrefetchBuffer::PrefetchAsyncInternal:Start", - [&](void*) { buff_prefetch_count++; }); - - SyncPoint::GetInstance()->SetCallBack( - "UpdateResults::io_uring_result", - [&](void* /*arg*/) { read_async_called = true; }); - SyncPoint::GetInstance()->EnableProcessing(); - - // Read the keys using seek. + // Second Seek. { + num_keys = 0; ASSERT_OK(options.statistics->Reset()); get_perf_context()->Reset(); - auto iter = std::unique_ptr(db_->NewIterator(ro)); - int num_keys = 0; - // First Seek. - { - iter->Seek(BuildKey(450)); - while (iter->Valid() && num_keys < 100) { - ASSERT_OK(iter->status()); - num_keys++; - iter->Next(); - } + iter->Seek(BuildKey(942)); + while (iter->Valid()) { ASSERT_OK(iter->status()); - ASSERT_EQ(num_keys, num_keys_first_batch); - // Check stats to make sure async prefetch is done. - { - HistogramData async_read_bytes; - options.statistics->histogramData(ASYNC_READ_BYTES, - &async_read_bytes); - - // Not all platforms support iouring. In that case, ReadAsync in posix - // won't submit async requests. - if (read_async_called) { - ASSERT_GT(async_read_bytes.count, 0); - ASSERT_GT(get_perf_context()->number_async_seek, 0); - } else { - ASSERT_EQ(async_read_bytes.count, 0); - ASSERT_EQ(get_perf_context()->number_async_seek, 0); - } - } + num_keys++; + iter->Next(); } + ASSERT_OK(iter->status()); + ASSERT_EQ(num_keys, num_keys_second_batch); - // Second Seek. + ASSERT_GT(buff_prefetch_count, 0); + + // Check stats to make sure async prefetch is done. { - num_keys = 0; - ASSERT_OK(options.statistics->Reset()); - get_perf_context()->Reset(); - - iter->Seek(BuildKey(942)); - while (iter->Valid()) { - ASSERT_OK(iter->status()); - num_keys++; - iter->Next(); - } - ASSERT_OK(iter->status()); - ASSERT_EQ(num_keys, num_keys_second_batch); - - ASSERT_GT(buff_prefetch_count, 0); - - // Check stats to make sure async prefetch is done. - { - HistogramData async_read_bytes; - options.statistics->histogramData(ASYNC_READ_BYTES, - &async_read_bytes); - HistogramData prefetched_bytes_discarded; - options.statistics->histogramData(PREFETCHED_BYTES_DISCARDED, - &prefetched_bytes_discarded); - - // Not all platforms support iouring. In that case, ReadAsync in posix - // won't submit async requests. - if (read_async_called) { - ASSERT_GT(async_read_bytes.count, 0); - ASSERT_GT(get_perf_context()->number_async_seek, 0); - } else { - ASSERT_EQ(async_read_bytes.count, 0); - ASSERT_EQ(get_perf_context()->number_async_seek, 0); - } - ASSERT_GT(prefetched_bytes_discarded.count, 0); + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + HistogramData prefetched_bytes_discarded; + options.statistics->histogramData(PREFETCHED_BYTES_DISCARDED, + &prefetched_bytes_discarded); + + // Not all platforms support iouring. In that case, ReadAsync in posix + // won't submit async requests. + if (read_async_called) { + ASSERT_GT(async_read_bytes.count, 0); + ASSERT_GT(get_perf_context()->number_async_seek, 0); + } else { + ASSERT_EQ(async_read_bytes.count, 0); + ASSERT_EQ(get_perf_context()->number_async_seek, 0); } + ASSERT_GT(prefetched_bytes_discarded.count, 0); } } + } - SyncPoint::GetInstance()->DisableProcessing(); - SyncPoint::GetInstance()->ClearAllCallBacks(); - Close(); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + Close(); +} + +TEST_P(PrefetchTest, SeekParallelizationTest1) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; } + const int kNumKeys = 2000; + // Set options + std::shared_ptr fs = std::make_shared( + FileSystem::Default(), /*support_prefetch=*/false); + std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); - TEST_P(PrefetchTest, SeekParallelizationTest1) { - if (mem_env_ || encrypted_env_) { - ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); - return; - } - const int kNumKeys = 2000; - // Set options - std::shared_ptr fs = std::make_shared( - FileSystem::Default(), /*support_prefetch=*/false); - std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); - - bool use_direct_io = std::get<0>(GetParam()); - Options options = CurrentOptions(); - options.write_buffer_size = 1024; - options.create_if_missing = true; - options.compression = kNoCompression; - options.env = env.get(); - if (use_direct_io) { - options.use_direct_reads = true; - options.use_direct_io_for_flush_and_compaction = true; - } + bool use_direct_io = std::get<0>(GetParam()); + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } - options.statistics = CreateDBStatistics(); - BlockBasedTableOptions table_options; - table_options.no_block_cache = true; - table_options.cache_index_and_filter_blocks = false; - table_options.metadata_block_size = 1024; - table_options.index_type = - BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; - options.table_factory.reset(NewBlockBasedTableFactory(table_options)); - - Status s = TryReopen(options); - if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { - // If direct IO is not supported, skip the test - return; - } else { - ASSERT_OK(s); - } + options.statistics = CreateDBStatistics(); + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); - WriteBatch batch; - Random rnd(309); - for (int i = 0; i < kNumKeys; i++) { - ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); - } - ASSERT_OK(db_->Write(WriteOptions(), &batch)); + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } - std::string start_key = BuildKey(0); - std::string end_key = BuildKey(kNumKeys - 1); - Slice least(start_key.data(), start_key.size()); - Slice greatest(end_key.data(), end_key.size()); + WriteBatch batch; + Random rnd(309); + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); - ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest)); + std::string start_key = BuildKey(0); + std::string end_key = BuildKey(kNumKeys - 1); + Slice least(start_key.data(), start_key.size()); + Slice greatest(end_key.data(), end_key.size()); - int buff_prefetch_count = 0; + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest)); - SyncPoint::GetInstance()->SetCallBack( - "FilePrefetchBuffer::PrefetchAsyncInternal:Start", - [&](void*) { buff_prefetch_count++; }); + int buff_prefetch_count = 0; - bool read_async_called = false; - SyncPoint::GetInstance()->SetCallBack( - "UpdateResults::io_uring_result", - [&](void* /*arg*/) { read_async_called = true; }); - SyncPoint::GetInstance()->EnableProcessing(); + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::PrefetchAsyncInternal:Start", + [&](void*) { buff_prefetch_count++; }); - SyncPoint::GetInstance()->EnableProcessing(); - ReadOptions ro; - ro.adaptive_readahead = true; - ro.async_io = true; + bool read_async_called = false; + SyncPoint::GetInstance()->SetCallBack( + "UpdateResults::io_uring_result", + [&](void* /*arg*/) { read_async_called = true; }); + SyncPoint::GetInstance()->EnableProcessing(); - if (std::get<1>(GetParam())) { - ro.readahead_size = 16 * 1024; - } + SyncPoint::GetInstance()->EnableProcessing(); + ReadOptions ro; + ro.adaptive_readahead = true; + ro.async_io = true; - { - ASSERT_OK(options.statistics->Reset()); - // Each block contains around 4 keys. - auto iter = std::unique_ptr(db_->NewIterator(ro)); - iter->Seek( - BuildKey(0)); // Prefetch data because of seek parallelization. - ASSERT_TRUE(iter->Valid()); - iter->Next(); - ASSERT_TRUE(iter->Valid()); - iter->Next(); - ASSERT_TRUE(iter->Valid()); - iter->Next(); - ASSERT_TRUE(iter->Valid()); + if (std::get<1>(GetParam())) { + ro.readahead_size = 16 * 1024; + } - // New data block. Since num_file_reads in FilePrefetch after this read is - // 2, it won't go for prefetching. - iter->Next(); - ASSERT_TRUE(iter->Valid()); - iter->Next(); - ASSERT_TRUE(iter->Valid()); - iter->Next(); - ASSERT_TRUE(iter->Valid()); - iter->Next(); - ASSERT_TRUE(iter->Valid()); + { + ASSERT_OK(options.statistics->Reset()); + // Each block contains around 4 keys. + auto iter = std::unique_ptr(db_->NewIterator(ro)); + iter->Seek(BuildKey(0)); // Prefetch data because of seek parallelization. + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); - // Prefetch data. - iter->Next(); - ASSERT_TRUE(iter->Valid()); + // New data block. Since num_file_reads in FilePrefetch after this read is + // 2, it won't go for prefetching. + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); - // Check stats to make sure async prefetch is done. - { - HistogramData async_read_bytes; - options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); - // Not all platforms support iouring. In that case, ReadAsync in posix - // won't submit async requests. - if (read_async_called) { - ASSERT_GT(async_read_bytes.count, 0); - ASSERT_GT(get_perf_context()->number_async_seek, 0); - if (std::get<1>(GetParam())) { - ASSERT_EQ(buff_prefetch_count, 1); - } else { - ASSERT_EQ(buff_prefetch_count, 2); - } - } else { - ASSERT_EQ(async_read_bytes.count, 0); - ASSERT_EQ(get_perf_context()->number_async_seek, 0); + // Prefetch data. + iter->Next(); + ASSERT_TRUE(iter->Valid()); + + // Check stats to make sure async prefetch is done. + { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + // Not all platforms support iouring. In that case, ReadAsync in posix + // won't submit async requests. + if (read_async_called) { + ASSERT_GT(async_read_bytes.count, 0); + ASSERT_GT(get_perf_context()->number_async_seek, 0); + if (std::get<1>(GetParam())) { ASSERT_EQ(buff_prefetch_count, 1); + } else { + ASSERT_EQ(buff_prefetch_count, 2); } + } else { + ASSERT_EQ(async_read_bytes.count, 0); + ASSERT_EQ(get_perf_context()->number_async_seek, 0); + ASSERT_EQ(buff_prefetch_count, 1); } - - buff_prefetch_count = 0; } - Close(); + + buff_prefetch_count = 0; } + Close(); +} #ifndef ROCKSDB_LITE #ifdef GFLAGS - TEST_P(PrefetchTest, TraceReadAsyncWithCallbackWrapper) { - if (mem_env_ || encrypted_env_) { - ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); - return; - } +TEST_P(PrefetchTest, TraceReadAsyncWithCallbackWrapper) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } - const int kNumKeys = 1000; - std::shared_ptr fs = std::make_shared( - FileSystem::Default(), /*support_prefetch=*/false); - std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); - - bool use_direct_io = std::get<0>(GetParam()); - Options options = CurrentOptions(); - options.write_buffer_size = 1024; - options.create_if_missing = true; - options.compression = kNoCompression; - options.env = env.get(); - options.statistics = CreateDBStatistics(); - if (use_direct_io) { - options.use_direct_reads = true; - options.use_direct_io_for_flush_and_compaction = true; - } - BlockBasedTableOptions table_options; - table_options.no_block_cache = true; - table_options.cache_index_and_filter_blocks = false; - table_options.metadata_block_size = 1024; - table_options.index_type = - BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; - options.table_factory.reset(NewBlockBasedTableFactory(table_options)); - - Status s = TryReopen(options); - if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { - // If direct IO is not supported, skip the test - return; - } else { - ASSERT_OK(s); - } + const int kNumKeys = 1000; + std::shared_ptr fs = std::make_shared( + FileSystem::Default(), /*support_prefetch=*/false); + std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); - int total_keys = 0; - // Write the keys. - { - WriteBatch batch; - Random rnd(309); - for (int j = 0; j < 5; j++) { - for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { - ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); - total_keys++; - } - ASSERT_OK(db_->Write(WriteOptions(), &batch)); - ASSERT_OK(Flush()); + bool use_direct_io = std::get<0>(GetParam()); + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + options.statistics = CreateDBStatistics(); + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + int total_keys = 0; + // Write the keys. + { + WriteBatch batch; + Random rnd(309); + for (int j = 0; j < 5; j++) { + for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + total_keys++; } - MoveFilesToLevel(2); + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + ASSERT_OK(Flush()); } + MoveFilesToLevel(2); + } - int buff_prefetch_count = 0; - bool read_async_called = false; - ReadOptions ro; - ro.adaptive_readahead = true; - ro.async_io = true; + int buff_prefetch_count = 0; + bool read_async_called = false; + ReadOptions ro; + ro.adaptive_readahead = true; + ro.async_io = true; - if (std::get<1>(GetParam())) { - ro.readahead_size = 16 * 1024; - } + if (std::get<1>(GetParam())) { + ro.readahead_size = 16 * 1024; + } - SyncPoint::GetInstance()->SetCallBack( - "FilePrefetchBuffer::PrefetchAsyncInternal:Start", - [&](void*) { buff_prefetch_count++; }); + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::PrefetchAsyncInternal:Start", + [&](void*) { buff_prefetch_count++; }); - SyncPoint::GetInstance()->SetCallBack( - "UpdateResults::io_uring_result", - [&](void* /*arg*/) { read_async_called = true; }); - SyncPoint::GetInstance()->EnableProcessing(); + SyncPoint::GetInstance()->SetCallBack( + "UpdateResults::io_uring_result", + [&](void* /*arg*/) { read_async_called = true; }); + SyncPoint::GetInstance()->EnableProcessing(); - // Read the keys. - { - // Start io_tracing. - WriteOptions write_opt; - TraceOptions trace_opt; - std::unique_ptr trace_writer; - std::string trace_file_path = dbname_ + "/io_trace_file"; - - ASSERT_OK(NewFileTraceWriter(env_, EnvOptions(), trace_file_path, - &trace_writer)); - ASSERT_OK(db_->StartIOTrace(trace_opt, std::move(trace_writer))); - ASSERT_OK(options.statistics->Reset()); + // Read the keys. + { + // Start io_tracing. + WriteOptions write_opt; + TraceOptions trace_opt; + std::unique_ptr trace_writer; + std::string trace_file_path = dbname_ + "/io_trace_file"; + + ASSERT_OK( + NewFileTraceWriter(env_, EnvOptions(), trace_file_path, &trace_writer)); + ASSERT_OK(db_->StartIOTrace(trace_opt, std::move(trace_writer))); + ASSERT_OK(options.statistics->Reset()); - auto iter = std::unique_ptr(db_->NewIterator(ro)); - int num_keys = 0; - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { - ASSERT_OK(iter->status()); - num_keys++; - } + auto iter = std::unique_ptr(db_->NewIterator(ro)); + int num_keys = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + num_keys++; + } - // End the tracing. - ASSERT_OK(db_->EndIOTrace()); - ASSERT_OK(env_->FileExists(trace_file_path)); + // End the tracing. + ASSERT_OK(db_->EndIOTrace()); + ASSERT_OK(env_->FileExists(trace_file_path)); - ASSERT_EQ(num_keys, total_keys); - ASSERT_GT(buff_prefetch_count, 0); + ASSERT_EQ(num_keys, total_keys); + ASSERT_GT(buff_prefetch_count, 0); - // Check stats to make sure async prefetch is done. - { - HistogramData async_read_bytes; - options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); - // Not all platforms support iouring. In that case, ReadAsync in posix - // won't submit async requests. - if (read_async_called) { - ASSERT_GT(async_read_bytes.count, 0); - } else { - ASSERT_EQ(async_read_bytes.count, 0); - } + // Check stats to make sure async prefetch is done. + { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + // Not all platforms support iouring. In that case, ReadAsync in posix + // won't submit async requests. + if (read_async_called) { + ASSERT_GT(async_read_bytes.count, 0); + } else { + ASSERT_EQ(async_read_bytes.count, 0); } - - // Check the file to see if ReadAsync is logged. - RunIOTracerParserTool(trace_file_path); } - SyncPoint::GetInstance()->DisableProcessing(); - SyncPoint::GetInstance()->ClearAllCallBacks(); - - Close(); + // Check the file to see if ReadAsync is logged. + RunIOTracerParserTool(trace_file_path); } + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + Close(); +} #endif // GFLAGS #endif // ROCKSDB_LITE } // namespace ROCKSDB_NAMESPACE diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc index 71ae4577e..7180e239e 100644 --- a/file/random_access_file_reader.cc +++ b/file/random_access_file_reader.cc @@ -248,7 +248,7 @@ size_t End(const FSReadRequest& r) { FSReadRequest Align(const FSReadRequest& r, size_t alignment) { FSReadRequest req; req.offset = static_cast( - TruncateToPageBoundary(alignment, static_cast(r.offset))); + TruncateToPageBoundary(alignment, static_cast(r.offset))); req.len = Roundup(End(r), alignment) - req.offset; req.scratch = nullptr; return req; diff --git a/file/random_access_file_reader_test.cc b/file/random_access_file_reader_test.cc index 0f5402686..ac0e9e57a 100644 --- a/file/random_access_file_reader_test.cc +++ b/file/random_access_file_reader_test.cc @@ -60,9 +60,7 @@ class RandomAccessFileReaderTest : public testing::Test { std::shared_ptr fs_; std::string test_dir_; - std::string Path(const std::string& fname) { - return test_dir_ + "/" + fname; - } + std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; } }; // Skip the following tests in lite mode since direct I/O is unsupported. diff --git a/file/read_write_util.cc b/file/read_write_util.cc index cc4f6b849..3617a35e3 100644 --- a/file/read_write_util.cc +++ b/file/read_write_util.cc @@ -10,6 +10,7 @@ #include "file/read_write_util.h" #include + #include "test_util/sync_point.h" namespace ROCKSDB_NAMESPACE { diff --git a/file/read_write_util.h b/file/read_write_util.h index 718135c98..9f034b705 100644 --- a/file/read_write_util.h +++ b/file/read_write_util.h @@ -9,6 +9,7 @@ #pragma once #include + #include "file/sequence_file_reader.h" #include "rocksdb/env.h" #include "rocksdb/file_system.h" diff --git a/file/sst_file_manager_impl.cc b/file/sst_file_manager_impl.cc index c4c411488..7053e6a07 100644 --- a/file/sst_file_manager_impl.cc +++ b/file/sst_file_manager_impl.cc @@ -161,9 +161,8 @@ bool SstFileManagerImpl::EnoughRoomForCompaction( // Update cur_compactions_reserved_size_ so concurrent compaction // don't max out space - size_t needed_headroom = - cur_compactions_reserved_size_ + size_added_by_compaction + - compaction_buffer_size_; + size_t needed_headroom = cur_compactions_reserved_size_ + + size_added_by_compaction + compaction_buffer_size_; if (max_allowed_space_ != 0 && (needed_headroom + total_files_size_ > max_allowed_space_)) { return false; @@ -415,13 +414,12 @@ bool SstFileManagerImpl::CancelErrorRecovery(ErrorHandler* handler) { return false; } -Status SstFileManagerImpl::ScheduleFileDeletion( - const std::string& file_path, const std::string& path_to_sync, - const bool force_bg) { +Status SstFileManagerImpl::ScheduleFileDeletion(const std::string& file_path, + const std::string& path_to_sync, + const bool force_bg) { TEST_SYNC_POINT_CALLBACK("SstFileManagerImpl::ScheduleFileDeletion", const_cast(&file_path)); - return delete_scheduler_.DeleteFile(file_path, path_to_sync, - force_bg); + return delete_scheduler_.DeleteFile(file_path, path_to_sync, force_bg); } void SstFileManagerImpl::WaitForEmptyTrash() { diff --git a/file/sst_file_manager_impl.h b/file/sst_file_manager_impl.h index bc41a9405..548eb57f8 100644 --- a/file/sst_file_manager_impl.h +++ b/file/sst_file_manager_impl.h @@ -9,10 +9,9 @@ #include -#include "port/port.h" - #include "db/compaction/compaction.h" #include "file/delete_scheduler.h" +#include "port/port.h" #include "rocksdb/sst_file_manager.h" namespace ROCKSDB_NAMESPACE {