diff --git a/env/env_test.cc b/env/env_test.cc index 98f73d9a9..4f31dbd14 100644 --- a/env/env_test.cc +++ b/env/env_test.cc @@ -1180,6 +1180,99 @@ TEST_P(EnvPosixTestWithParam, MultiRead) { } } +TEST_F(EnvPosixTest, MultiReadNonAlignedLargeNum) { + // In this test we don't do aligned read, wo it doesn't work for + // direct I/O case. + EnvOptions soptions; + soptions.use_direct_reads = soptions.use_direct_writes = false; + std::string fname = test::PerThreadDBPath(env_, "testfile"); + + const size_t kTotalSize = 81920; + std::string expected_data; + Random rnd(301); + test::RandomString(&rnd, kTotalSize, &expected_data); + + // Create file. + { + std::unique_ptr wfile; + ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); + ASSERT_OK(wfile->Append(expected_data)); + ASSERT_OK(wfile->Close()); + } + + // More attempts to simulate more partial result sequences. + for (uint32_t attempt = 0; attempt < 25; attempt++) { + // Right now kIoUringDepth is hard coded as 256, so we need very large + // number of keys to cover the case of multiple rounds of submissions. + // Right now the test latency is still acceptable. If it ends up with + // too long, we can modify the io uring depth with SyncPoint here. + const int num_reads = rnd.Uniform(512) + 1; + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "PosixRandomAccessFile::MultiRead:io_uring_result", [&](void* arg) { + if (attempt > 5) { + // Improve partial result rates in second half of the run to + // cover the case of repeated partial results. + int odd = (attempt < 15) ? num_reads / 2 : 4; + // No failure in first several attempts. + size_t& bytes_read = *static_cast(arg); + if (rnd.OneIn(odd)) { + bytes_read = 0; + } else if (rnd.OneIn(odd / 2)) { + bytes_read = static_cast( + rnd.Uniform(static_cast(bytes_read))); + } + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // Generate (offset, len) pairs + std::set start_offsets; + for (int i = 0; i < num_reads; i++) { + int rnd_off; + // No repeat offsets. + while (start_offsets.find(rnd_off = rnd.Uniform(81920)) != start_offsets.end()) {} + start_offsets.insert(rnd_off); + } + std::vector offsets; + std::vector lens; + // std::set already sorted the offsets. + for (int so: start_offsets) { + offsets.push_back(so); + } + for (size_t i = 0; i < offsets.size() - 1; i++) { + lens.push_back(static_cast(rnd.Uniform(static_cast(offsets[i + 1] - offsets[i])) + 1)); + } + lens.push_back(static_cast(rnd.Uniform(static_cast(kTotalSize - offsets.back())) + 1)); + ASSERT_EQ(num_reads, lens.size()); + + // Create requests + std::vector scratches; + scratches.reserve(num_reads); + std::vector reqs(num_reads); + for (size_t i = 0; i < reqs.size(); ++i) { + reqs[i].offset = offsets[i]; + reqs[i].len = lens[i]; + scratches.emplace_back(reqs[i].len, ' '); + reqs[i].scratch = const_cast(scratches.back().data()); + } + + // Query the data + std::unique_ptr file; + ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); + ASSERT_OK(file->MultiRead(reqs.data(), reqs.size())); + + // Validate results + for (int i = 0; i < num_reads; ++i) { + ASSERT_OK(reqs[i].status); + ASSERT_EQ(Slice(expected_data.data() + offsets[i], lens[i]).ToString(true), + reqs[i].result.ToString(true)); + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + } +} + // Only works in linux platforms #ifdef OS_WIN TEST_P(EnvPosixTestWithParam, DISABLED_InvalidateCache) {