Enhance async scan prefetch unit tests (#11087)

Summary:
Add more coverage in unit tests for async scan. The added unit test fails without PR https://github.com/facebook/rocksdb/pull/10939.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11087

Test Plan: CircleCI jobs status for new unit tests.

Reviewed By: anand1976

Differential Revision: D42487931

Pulled By: akankshamahajan15

fbshipit-source-id: d59ed7666599bd0d2733ac5d76bd70984b54c5a9
oxigraph-8.1.1
akankshamahajan 2 years ago committed by Facebook GitHub Bot
parent f4a5446cab
commit bde65052c4
  1. 1
      file/file_prefetch_buffer.cc
  2. 500
      file/prefetch_test.cc

@ -109,6 +109,7 @@ Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t read_len,
uint64_t rounddown_start, uint32_t index) {
TEST_SYNC_POINT("FilePrefetchBuffer::ReadAsync");
// callback for async read request.
auto fp = std::bind(&FilePrefetchBuffer::PrefetchAsyncCallback, this,
std::placeholders::_1, std::placeholders::_2);

@ -79,6 +79,27 @@ class PrefetchTest
public ::testing::WithParamInterface<std::tuple<bool, bool>> {
public:
PrefetchTest() : DBTestBase("prefetch_test", true) {}
void SetGenericOptions(Env* env, bool use_direct_io, Options& options) {
options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env;
options.disable_auto_compactions = true;
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
}
void SetBlockBasedTableOptions(BlockBasedTableOptions& table_options) {
table_options.no_block_cache = true;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
}
};
INSTANTIATE_TEST_CASE_P(PrefetchTest, PrefetchTest,
@ -89,28 +110,23 @@ std::string BuildKey(int num, std::string postfix = "") {
return "my_key_" + std::to_string(num) + postfix;
}
// This test verifies the basic functionality of prefetching.
TEST_P(PrefetchTest, Basic) {
// First param is if the mockFS support_prefetch or not
bool support_prefetch =
std::get<0>(GetParam()) &&
test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
// Second param is if directIO is enabled or not
bool use_direct_io = std::get<1>(GetParam());
const int kNumKeys = 1100;
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
const int kNumKeys = 1100;
int buff_prefetch_count = 0;
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
@ -192,35 +208,24 @@ TEST_P(PrefetchTest, Basic) {
}
#ifndef ROCKSDB_LITE
// This test verifies BlockBasedTableOptions.max_auto_readahead_size is
// configured dynamically.
TEST_P(PrefetchTest, ConfigureAutoMaxReadaheadSize) {
// First param is if the mockFS support_prefetch or not
bool support_prefetch =
std::get<0>(GetParam()) &&
test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
// Second param is if directIO is enabled or not
bool use_direct_io = std::get<1>(GetParam());
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
options.disable_auto_compactions = true;
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
SetBlockBasedTableOptions(table_options);
table_options.max_auto_readahead_size = 0;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
@ -329,6 +334,8 @@ TEST_P(PrefetchTest, ConfigureAutoMaxReadaheadSize) {
Close();
}
// This test verifies BlockBasedTableOptions.initial_auto_readahead_size is
// configured dynamically.
TEST_P(PrefetchTest, ConfigureInternalAutoReadaheadSize) {
// First param is if the mockFS support_prefetch or not
bool support_prefetch =
@ -341,23 +348,10 @@ TEST_P(PrefetchTest, ConfigureInternalAutoReadaheadSize) {
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
options.disable_auto_compactions = true;
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
SetBlockBasedTableOptions(table_options);
table_options.initial_auto_readahead_size = 0;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
@ -468,6 +462,8 @@ TEST_P(PrefetchTest, ConfigureInternalAutoReadaheadSize) {
Close();
}
// This test verifies BlockBasedTableOptions.num_file_reads_for_auto_readahead
// is configured dynamically.
TEST_P(PrefetchTest, ConfigureNumFilesReadsForReadaheadSize) {
// First param is if the mockFS support_prefetch or not
bool support_prefetch =
@ -482,26 +478,13 @@ TEST_P(PrefetchTest, ConfigureNumFilesReadsForReadaheadSize) {
// Second param is if directIO is enabled or not
bool use_direct_io = std::get<1>(GetParam());
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
SetBlockBasedTableOptions(table_options);
table_options.num_file_reads_for_auto_readahead = 0;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
int buff_prefetch_count = 0;
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
@ -578,6 +561,13 @@ TEST_P(PrefetchTest, ConfigureNumFilesReadsForReadaheadSize) {
}
#endif // !ROCKSDB_LITE
// This test verifies the basic functionality of implicit autoreadahead:
// - Enable implicit autoreadahead and prefetch only if sequential blocks are
// read,
// - If data is already in buffer and few blocks are not requested to read,
// don't reset,
// - If data blocks are sequential during read after enabling implicit
// autoreadahead, reset readahead parameters.
TEST_P(PrefetchTest, PrefetchWhenReseek) {
// First param is if the mockFS support_prefetch or not
bool support_prefetch =
@ -592,25 +582,12 @@ TEST_P(PrefetchTest, PrefetchWhenReseek) {
// Second param is if directIO is enabled or not
bool use_direct_io = std::get<1>(GetParam());
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
int buff_prefetch_count = 0;
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
@ -846,6 +823,12 @@ TEST_P(PrefetchTest, PrefetchWhenReseek) {
Close();
}
// This test verifies the functionality of implicit autoreadahead when caching
// is enabled:
// - If data is already in buffer and few blocks are not requested to read,
// don't reset,
// - If block was eligible for prefetching/in buffer but found in cache, don't
// prefetch and reset.
TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) {
// First param is if the mockFS support_prefetch or not
bool support_prefetch =
@ -860,26 +843,15 @@ TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) {
// Second param is if directIO is enabled or not
bool use_direct_io = std::get<1>(GetParam());
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2); // 8MB
table_options.block_cache = cache;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
table_options.no_block_cache = false;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
int buff_prefetch_count = 0;
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
@ -978,6 +950,7 @@ TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) {
}
#ifndef ROCKSDB_LITE
// This test verifies the functionality of ReadOptions.adaptive_readahead.
TEST_P(PrefetchTest, DBIterLevelReadAhead) {
const int kNumKeys = 1000;
// Set options
@ -988,23 +961,11 @@ TEST_P(PrefetchTest, DBIterLevelReadAhead) {
bool use_direct_io = std::get<0>(GetParam());
bool is_adaptive_readahead = std::get<1>(GetParam());
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
options.statistics = CreateDBStatistics();
options.env = env.get();
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
@ -1028,7 +989,6 @@ TEST_P(PrefetchTest, DBIterLevelReadAhead) {
}
MoveFilesToLevel(2);
int buff_prefetch_count = 0;
int buff_async_prefetch_count = 0;
int readahead_carry_over_count = 0;
int num_sst_files = NumTableFilesAtLevel(2);
size_t current_readahead_size = 0;
@ -1039,6 +999,101 @@ TEST_P(PrefetchTest, DBIterLevelReadAhead) {
"FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
// The callback checks, since reads are sequential, readahead_size doesn't
// start from 8KB when iterator moves to next file and its called
// num_sst_files-1 times (excluding for first file).
SyncPoint::GetInstance()->SetCallBack(
"BlockPrefetcher::SetReadaheadState", [&](void* arg) {
readahead_carry_over_count++;
size_t readahead_size = *reinterpret_cast<size_t*>(arg);
if (readahead_carry_over_count) {
ASSERT_GT(readahead_size, 8 * 1024);
}
});
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::TryReadFromCache", [&](void* arg) {
current_readahead_size = *reinterpret_cast<size_t*>(arg);
ASSERT_GT(current_readahead_size, 0);
});
SyncPoint::GetInstance()->EnableProcessing();
ReadOptions ro;
if (is_adaptive_readahead) {
ro.adaptive_readahead = true;
}
ASSERT_OK(options.statistics->Reset());
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
num_keys++;
}
ASSERT_EQ(num_keys, total_keys);
// For index and data blocks.
if (is_adaptive_readahead) {
ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1));
} else {
ASSERT_GT(buff_prefetch_count, 0);
ASSERT_EQ(readahead_carry_over_count, 0);
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
Close();
}
// This test verifies the functionality of ReadOptions.adaptive_readahead when
// async_io is enabled.
TEST_P(PrefetchTest, DBIterLevelReadAheadWithAsyncIO) {
const int kNumKeys = 1000;
// Set options
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool use_direct_io = std::get<0>(GetParam());
bool is_adaptive_readahead = std::get<1>(GetParam());
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
// If direct IO is not supported, skip the test
return;
} else {
ASSERT_OK(s);
}
WriteBatch batch;
Random rnd(309);
int total_keys = 0;
for (int j = 0; j < 5; j++) {
for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
total_keys++;
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
}
MoveFilesToLevel(2);
int buff_async_prefetch_count = 0;
int readahead_carry_over_count = 0;
int num_sst_files = NumTableFilesAtLevel(2);
size_t current_readahead_size = 0;
// Test - Iterate over the keys sequentially.
{
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::PrefetchAsyncInternal:Start",
[&](void*) { buff_async_prefetch_count++; });
@ -1066,8 +1121,8 @@ TEST_P(PrefetchTest, DBIterLevelReadAhead) {
ReadOptions ro;
if (is_adaptive_readahead) {
ro.adaptive_readahead = true;
ro.async_io = true;
}
ro.async_io = true;
ASSERT_OK(options.statistics->Reset());
@ -1082,11 +1137,10 @@ TEST_P(PrefetchTest, DBIterLevelReadAhead) {
// For index and data blocks.
if (is_adaptive_readahead) {
ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1));
ASSERT_GT(buff_async_prefetch_count, 0);
} else {
ASSERT_GT(buff_prefetch_count, 0);
ASSERT_EQ(readahead_carry_over_count, 0);
}
ASSERT_GT(buff_async_prefetch_count, 0);
// Check stats to make sure async prefetch is done.
{
@ -1110,11 +1164,34 @@ class PrefetchTest1 : public DBTestBase,
public ::testing::WithParamInterface<bool> {
public:
PrefetchTest1() : DBTestBase("prefetch_test1", true) {}
void SetGenericOptions(Env* env, bool use_direct_io, Options& options) {
options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env;
options.disable_auto_compactions = true;
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
}
void SetBlockBasedTableOptions(BlockBasedTableOptions& table_options) {
table_options.no_block_cache = true;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
}
};
INSTANTIATE_TEST_CASE_P(PrefetchTest1, PrefetchTest1, ::testing::Bool());
#ifndef ROCKSDB_LITE
// This test verifies the functionality of ReadOptions.adaptive_readahead when
// reads are not sequential.
TEST_P(PrefetchTest1, NonSequentialReadsWithAdaptiveReadahead) {
const int kNumKeys = 1000;
// Set options
@ -1122,21 +1199,10 @@ TEST_P(PrefetchTest1, NonSequentialReadsWithAdaptiveReadahead) {
std::make_shared<MockFS>(env_->GetFileSystem(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
if (GetParam()) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
Options options;
SetGenericOptions(env.get(), GetParam(), options);
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
@ -1208,6 +1274,16 @@ TEST_P(PrefetchTest1, NonSequentialReadsWithAdaptiveReadahead) {
}
#endif //! ROCKSDB_LITE
// This test verifies the functionality of adaptive_readaheadsize with cache and
// if block is found in cache, decrease the readahead_size if
// - its enabled internally by RocksDB (implicit_auto_readahead_) and,
// - readahead_size is greater than 0 and,
// - the block would have called prefetch API if not found in cache for
// which conditions are:
// - few/no bytes are in buffer and,
// - block is sequential with the previous read and,
// - num_file_reads_ + 1 (including this read) >
// num_file_reads_for_auto_readahead_
TEST_P(PrefetchTest1, DecreaseReadAheadIfInCache) {
const int kNumKeys = 2000;
// Set options
@ -1215,24 +1291,14 @@ TEST_P(PrefetchTest1, DecreaseReadAheadIfInCache) {
std::make_shared<MockFS>(env_->GetFileSystem(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
if (GetParam()) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
Options options;
SetGenericOptions(env.get(), GetParam(), options);
options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2); // 8MB
table_options.block_cache = cache;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
table_options.no_block_cache = false;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
@ -1352,6 +1418,8 @@ TEST_P(PrefetchTest1, DecreaseReadAheadIfInCache) {
Close();
}
// This test verifies the basic functionality of seek parallelization for
// async_io.
TEST_P(PrefetchTest1, SeekParallelizationTest) {
const int kNumKeys = 2000;
// Set options
@ -1359,23 +1427,11 @@ TEST_P(PrefetchTest1, SeekParallelizationTest) {
std::make_shared<MockFS>(env_->GetFileSystem(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
if (GetParam()) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
Options options;
SetGenericOptions(env.get(), GetParam(), options);
options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
@ -1485,7 +1541,8 @@ void RunIOTracerParserTool(std::string trace_file) {
#endif // ROCKSDB_LITE
} // namespace
// Tests the default implementation of ReadAsync API with PosixFileSystem.
// Tests the default implementation of ReadAsync API with PosixFileSystem during
// prefetching.
TEST_P(PrefetchTest, ReadAsyncWithPosixFS) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
@ -1498,22 +1555,11 @@ TEST_P(PrefetchTest, ReadAsyncWithPosixFS) {
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool use_direct_io = std::get<0>(GetParam());
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
options.statistics = CreateDBStatistics();
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
@ -1600,6 +1646,8 @@ TEST_P(PrefetchTest, ReadAsyncWithPosixFS) {
Close();
}
// This test verifies implementation of seek parallelization with
// PosixFileSystem during prefetching.
TEST_P(PrefetchTest, MultipleSeekWithPosixFS) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
@ -1612,22 +1660,11 @@ TEST_P(PrefetchTest, MultipleSeekWithPosixFS) {
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool use_direct_io = std::get<0>(GetParam());
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
options.statistics = CreateDBStatistics();
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
@ -1774,7 +1811,9 @@ TEST_P(PrefetchTest, MultipleSeekWithPosixFS) {
Close();
}
TEST_P(PrefetchTest, SeekParallelizationTest1) {
// This test verifies implementation of seek parallelization with
// PosixFileSystem during prefetching.
TEST_P(PrefetchTest, SeekParallelizationTestWithPosix) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
return;
@ -1786,23 +1825,11 @@ TEST_P(PrefetchTest, SeekParallelizationTest1) {
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool use_direct_io = std::get<0>(GetParam());
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
@ -1904,6 +1931,7 @@ TEST_P(PrefetchTest, SeekParallelizationTest1) {
#ifndef ROCKSDB_LITE
#ifdef GFLAGS
// This test verifies io_tracing with PosixFileSystem during prefetching.
TEST_P(PrefetchTest, TraceReadAsyncWithCallbackWrapper) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
@ -1916,22 +1944,11 @@ TEST_P(PrefetchTest, TraceReadAsyncWithCallbackWrapper) {
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool use_direct_io = std::get<0>(GetParam());
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
options.statistics = CreateDBStatistics();
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
@ -2036,6 +2053,7 @@ class FilePrefetchBufferTest : public testing::Test {
fs_ = FileSystem::Default();
test_dir_ = test::PerThreadDBPath("file_prefetch_buffer_test");
ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr));
stats_ = CreateDBStatistics();
}
void TearDown() override { EXPECT_OK(DestroyDir(env_, test_dir_)); }
@ -2052,8 +2070,9 @@ class FilePrefetchBufferTest : public testing::Test {
std::string fpath = Path(fname);
std::unique_ptr<FSRandomAccessFile> f;
ASSERT_OK(fs_->NewRandomAccessFile(fpath, opts, &f, nullptr));
reader->reset(new RandomAccessFileReader(std::move(f), fpath,
env_->GetSystemClock().get()));
reader->reset(new RandomAccessFileReader(
std::move(f), fpath, env_->GetSystemClock().get(),
/*io_tracer=*/nullptr, stats_.get()));
}
void AssertResult(const std::string& content,
@ -2066,11 +2085,13 @@ class FilePrefetchBufferTest : public testing::Test {
}
FileSystem* fs() { return fs_.get(); }
Statistics* stats() { return stats_.get(); }
private:
Env* env_;
std::shared_ptr<FileSystem> fs_;
std::string test_dir_;
std::shared_ptr<Statistics> stats_;
std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; }
};
@ -2099,6 +2120,53 @@ TEST_F(FilePrefetchBufferTest, SeekWithBlockCacheHit) {
ASSERT_TRUE(fpb.TryReadFromCacheAsync(IOOptions(), r.get(), 8192, 8192,
&result, &s, Env::IOPriority::IO_LOW));
}
TEST_F(FilePrefetchBufferTest, NoSyncWithAsyncIO) {
std::string fname = "seek-with-block-cache-hit";
Random rand(0);
std::string content = rand.RandomString(32768);
Write(fname, content);
FileOptions opts;
std::unique_ptr<RandomAccessFileReader> r;
Read(fname, opts, &r);
FilePrefetchBuffer fpb(
/*readahead_size=*/8192, /*max_readahead_size=*/16384, /*enable=*/true,
/*track_min_offset=*/false, /*implicit_auto_readahead=*/false,
/*num_file_reads=*/0, /*num_file_reads_for_auto_readahead=*/0, fs());
int read_async_called = 0;
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::ReadAsync",
[&](void* /*arg*/) { read_async_called++; });
SyncPoint::GetInstance()->EnableProcessing();
Slice async_result;
// Simulate a seek of 4000 bytes at offset 3000. Due to the readahead
// settings, it will do two reads of 4000+4096 and 4096
Status s = fpb.PrefetchAsync(IOOptions(), r.get(), 3000, 4000, &async_result);
// Platforms that don't have IO uring may not support async IO
ASSERT_TRUE(s.IsTryAgain() || s.IsNotSupported());
ASSERT_TRUE(fpb.TryReadFromCacheAsync(IOOptions(), r.get(), /*offset=*/3000,
/*length=*/4000, &async_result, &s,
Env::IOPriority::IO_LOW));
// No sync call should be made.
HistogramData sst_read_micros;
stats()->histogramData(SST_READ_MICROS, &sst_read_micros);
ASSERT_EQ(sst_read_micros.count, 0);
// Number of async calls should be.
ASSERT_EQ(read_async_called, 2);
// Length should be 4000.
ASSERT_EQ(async_result.size(), 4000);
// Data correctness.
Slice result(content.c_str() + 3000, 4000);
ASSERT_EQ(result.size(), 4000);
ASSERT_EQ(result, async_result);
}
#endif // ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE

Loading…
Cancel
Save