Add UT to test BG read qps behavior during upgrade for pr11406 (#11522)

Summary:
**Context/Summary:**
When db is upgrading to adopt [pr11406](https://github.com/facebook/rocksdb/pull/11406/), it's possible for RocksDB to infer a small tail size to prefetch for pre-upgrade files. Such small tail size would have caused 1 file read per index or filter partition if partitioned index or filer is used. This PR provides a UT to show this would not happen.

Misc: refactor the related UTs a bit to make this new UT more readable.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11522

Test Plan:
- New UT
If logic of upgrade is wrong e.g,
```
 --- a/table/block_based/partitioned_index_reader.cc
+++ b/table/block_based/partitioned_index_reader.cc
@@ -166,7 +166,8 @@ Status PartitionIndexReader::CacheDependencies(
   uint64_t prefetch_len = last_off - prefetch_off;
   std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
   if (tail_prefetch_buffer == nullptr || !tail_prefetch_buffer->Enabled() ||
-      tail_prefetch_buffer->GetPrefetchOffset() > prefetch_off) {
+      (false && tail_prefetch_buffer->GetPrefetchOffset() > prefetch_off)) {
```
, then the UT will fail like below
```
[ RUN      ] PrefetchTailTest/PrefetchTailTest.UpgradeToTailSizeInManifest/0
file/prefetch_test.cc:461: Failure
Expected: (db_open_file_read.count) < (num_index_partition), actual: 38 vs 33
Received signal 11 (Segmentation fault)
```

Reviewed By: pdillinger

Differential Revision: D46546707

Pulled By: hx235

fbshipit-source-id: 9897b0a975e9055963edac5451fd1cd9d6c45d0e
oxigraph-main
Hui Xiao 1 year ago committed by Facebook GitHub Bot
parent 66499780b2
commit 1da9ac2363
  1. 190
      file/prefetch_test.cc
  2. 7
      table/block_based/block_based_table_reader.cc

@ -25,10 +25,12 @@ class MockFS;
class MockRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
public:
MockRandomAccessFile(std::unique_ptr<FSRandomAccessFile>& file,
bool support_prefetch, std::atomic_int& prefetch_count)
bool support_prefetch, std::atomic_int& prefetch_count,
bool small_buffer_alignment = false)
: FSRandomAccessFileOwnerWrapper(std::move(file)),
support_prefetch_(support_prefetch),
prefetch_count_(prefetch_count) {}
prefetch_count_(prefetch_count),
small_buffer_alignment_(small_buffer_alignment) {}
IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options,
IODebugContext* dbg) override {
@ -40,16 +42,25 @@ class MockRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
}
}
size_t GetRequiredBufferAlignment() const override {
return small_buffer_alignment_
? 1
: FSRandomAccessFileOwnerWrapper::GetRequiredBufferAlignment();
}
private:
const bool support_prefetch_;
std::atomic_int& prefetch_count_;
const bool small_buffer_alignment_;
};
class MockFS : public FileSystemWrapper {
public:
explicit MockFS(const std::shared_ptr<FileSystem>& wrapped,
bool support_prefetch)
: FileSystemWrapper(wrapped), support_prefetch_(support_prefetch) {}
bool support_prefetch, bool small_buffer_alignment = false)
: FileSystemWrapper(wrapped),
support_prefetch_(support_prefetch),
small_buffer_alignment_(small_buffer_alignment) {}
static const char* kClassName() { return "MockFS"; }
const char* Name() const override { return kClassName(); }
@ -61,8 +72,8 @@ class MockFS : public FileSystemWrapper {
std::unique_ptr<FSRandomAccessFile> file;
IOStatus s;
s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
result->reset(
new MockRandomAccessFile(file, support_prefetch_, prefetch_count_));
result->reset(new MockRandomAccessFile(
file, support_prefetch_, prefetch_count_, small_buffer_alignment_));
return s;
}
@ -76,6 +87,7 @@ class MockFS : public FileSystemWrapper {
private:
const bool support_prefetch_;
const bool small_buffer_alignment_;
std::atomic_int prefetch_count_{0};
};
@ -85,7 +97,8 @@ class PrefetchTest
public:
PrefetchTest() : DBTestBase("prefetch_test", true) {}
void SetGenericOptions(Env* env, bool use_direct_io, Options& options) {
virtual void SetGenericOptions(Env* env, bool use_direct_io,
Options& options) {
options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
@ -236,30 +249,79 @@ TEST_P(PrefetchTest, Basic) {
Close();
}
TEST_P(PrefetchTest, BlockBasedTableTailPrefetch) {
const bool support_prefetch =
std::get<0>(GetParam()) &&
test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
// Second param is if directIO is enabled or not
const bool use_direct_io = std::get<1>(GetParam());
const bool use_file_prefetch_buffer = !support_prefetch || use_direct_io;
class PrefetchTailTest : public PrefetchTest {
public:
bool SupportPrefetch() const {
return std::get<0>(GetParam()) &&
test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
}
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool UseDirectIO() const { return std::get<1>(GetParam()); }
bool UseFilePrefetchBuffer() const {
return !SupportPrefetch() || UseDirectIO();
}
Env* GetEnv(bool small_buffer_alignment = false) const {
std::shared_ptr<MockFS> fs = std::make_shared<MockFS>(
env_->GetFileSystem(), SupportPrefetch(), small_buffer_alignment);
return new CompositeEnvWrapper(env_, fs);
}
void SetGenericOptions(Env* env, bool use_direct_io,
Options& options) override {
PrefetchTest::SetGenericOptions(env, use_direct_io, options);
options.statistics = CreateDBStatistics();
}
void SetBlockBasedTableOptions(
BlockBasedTableOptions& table_options, bool partition_filters = true,
uint64_t metadata_block_size =
BlockBasedTableOptions().metadata_block_size,
bool use_small_cache = false) {
table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
table_options.partition_filters = partition_filters;
if (table_options.partition_filters) {
table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
}
table_options.metadata_block_size = metadata_block_size;
if (use_small_cache) {
LRUCacheOptions co;
co.capacity = 1;
std::shared_ptr<Cache> cache = NewLRUCache(co);
table_options.block_cache = cache;
}
}
int64_t GetNumIndexPartition() const {
int64_t index_partition_counts = 0;
TablePropertiesCollection all_table_props;
assert(db_->GetPropertiesOfAllTables(&all_table_props).ok());
for (const auto& name_and_table_props : all_table_props) {
const auto& table_props = name_and_table_props.second;
index_partition_counts += table_props->index_partitions;
}
return index_partition_counts;
}
};
INSTANTIATE_TEST_CASE_P(PrefetchTailTest, PrefetchTailTest,
::testing::Combine(::testing::Bool(),
::testing::Bool()));
TEST_P(PrefetchTailTest, Basic) {
std::unique_ptr<Env> env(GetEnv());
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
options.statistics = CreateDBStatistics();
SetGenericOptions(env.get(), UseDirectIO(), options);
BlockBasedTableOptions bbto;
bbto.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
bbto.partition_filters = true;
bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
SetBlockBasedTableOptions(bbto);
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
if (UseDirectIO() && (s.IsNotSupported() || s.IsInvalidArgument())) {
// If direct IO is not supported, skip the test
ROCKSDB_GTEST_BYPASS("Direct IO is not supported");
return;
@ -276,7 +338,7 @@ TEST_P(PrefetchTest, BlockBasedTableTailPrefetch) {
HistogramData post_flush_file_read;
options.statistics->histogramData(FILE_READ_FLUSH_MICROS,
&post_flush_file_read);
if (use_file_prefetch_buffer) {
if (UseFilePrefetchBuffer()) {
// `PartitionedFilterBlockReader/PartitionIndexReader::CacheDependencies()`
// should read from the prefetched tail in file prefetch buffer instead of
// initiating extra SST reads. Therefore `BlockBasedTable::PrefetchTail()`
@ -300,7 +362,7 @@ TEST_P(PrefetchTest, BlockBasedTableTailPrefetch) {
HistogramData post_compaction_file_read;
options.statistics->histogramData(FILE_READ_COMPACTION_MICROS,
&post_compaction_file_read);
if (use_file_prefetch_buffer) {
if (UseFilePrefetchBuffer()) {
// `PartitionedFilterBlockReader/PartitionIndexReader::CacheDependencies()`
// should read from the prefetched tail in file prefetch buffer instead of
// initiating extra SST reads.
@ -323,6 +385,84 @@ TEST_P(PrefetchTest, BlockBasedTableTailPrefetch) {
Close();
}
TEST_P(PrefetchTailTest, UpgradeToTailSizeInManifest) {
if (!UseFilePrefetchBuffer()) {
ROCKSDB_GTEST_BYPASS(
"Upgrade to tail size in manifest is only relevant when RocksDB file "
"prefetch buffer is used.");
}
if (UseDirectIO()) {
ROCKSDB_GTEST_BYPASS(
"To simplify testing logics with setting file's buffer alignment to be "
"1, direct IO is required to be disabled.");
}
std::unique_ptr<Env> env(GetEnv(true /* small_buffer_alignment */));
Options options;
SetGenericOptions(env.get(), false /* use_direct_io*/, options);
options.max_open_files = -1;
options.write_buffer_size = 1024 * 1024;
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options, false /* partition_filters */,
1 /* metadata_block_size*/,
true /* use_small_cache */);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
SyncPoint::GetInstance()->EnableProcessing();
// To simulate a pre-upgrade DB where file tail size is not recorded in
// manifest
SyncPoint::GetInstance()->SetCallBack(
"FileMetaData::FileMetaData", [&](void* arg) {
FileMetaData* meta = static_cast<FileMetaData*>(arg);
meta->tail_size = 0;
});
ASSERT_OK(TryReopen(options));
for (int i = 0; i < 10000; ++i) {
ASSERT_OK(Put("k" + std::to_string(i), "v"));
}
ASSERT_OK(Flush());
SyncPoint::GetInstance()->ClearAllCallBacks();
// To simulate a DB undergoing the upgrade where tail size to prefetch is
// inferred to be a small number for files with no tail size recorded in
// manifest.
// "1" is chosen to be such number so that with `small_buffer_alignment ==
// true` and `use_small_cache == true`, it would have caused one file read per
// index partition during db open if the upgrade is done wrong.
SyncPoint::GetInstance()->SetCallBack(
"BlockBasedTable::Open::TailPrefetchLen", [&](void* arg) {
std::pair<size_t*, size_t*>* prefetch_off_len_pair =
static_cast<std::pair<size_t*, size_t*>*>(arg);
size_t* prefetch_off = prefetch_off_len_pair->first;
size_t* tail_size = prefetch_off_len_pair->second;
const size_t file_size = *prefetch_off + *tail_size;
*tail_size = 1;
*prefetch_off = file_size - (*tail_size);
});
ASSERT_OK(TryReopen(options));
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
HistogramData db_open_file_read;
options.statistics->histogramData(FILE_READ_DB_OPEN_MICROS,
&db_open_file_read);
int64_t num_index_partition = GetNumIndexPartition();
// If the upgrade is done right, db open will prefetch all the index
// partitions at once, instead of doing one read per partition.
// That is, together with `metadata_block_size == 1`, there will be more index
// partitions than number of non index partitions reads.
ASSERT_LT(db_open_file_read.count, num_index_partition);
Close();
}
// This test verifies BlockBasedTableOptions.max_auto_readahead_size is
// configured dynamically.
TEST_P(PrefetchTest, ConfigureAutoMaxReadaheadSize) {

@ -851,8 +851,13 @@ Status BlockBasedTable::PrefetchTail(
prefetch_off = static_cast<size_t>(file_size - tail_prefetch_size);
prefetch_len = tail_prefetch_size;
}
#ifndef NDEBUG
std::pair<size_t*, size_t*> prefetch_off_len_pair = {&prefetch_off,
&prefetch_len};
TEST_SYNC_POINT_CALLBACK("BlockBasedTable::Open::TailPrefetchLen",
&tail_prefetch_size);
&prefetch_off_len_pair);
#endif // NDEBUG
// Try file system prefetch
if (!file->use_direct_io() && !force_direct_prefetch) {

Loading…
Cancel
Save