Fix segfault in FilePrefetchBuffer with async_io enabled (#9777)

Summary:
If FilePrefetchBuffer object is destroyed and then later Poll() calls callback on object which has been destroyed, it gives segfault on accessing destroyed object. It was caught after adding unit tests that tests Posix implementation of ReadAsync and Poll APIs.
This PR also updates and fixes existing IOURing tests which were not running locally because RocksDbIOUringEnable function wasn't defined and IOUring was disabled for those tests

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9777

Test Plan: Added new unit test

Reviewed By: anand1976

Differential Revision: D35254002

Pulled By: akankshamahajan15

fbshipit-source-id: 68e80054ffb14ae25c255920ebc6548ca5f130a1
main
Akanksha Mahajan 2 years ago committed by Facebook GitHub Bot
parent ec77a92882
commit 36bc3da97f
  1. 1
      HISTORY.md
  2. 7
      env/env_test.cc
  3. 9
      env/fs_posix.cc
  4. 53
      env/io_posix.cc
  5. 5
      env/io_posix.h
  6. 38
      file/file_prefetch_buffer.cc
  7. 26
      file/file_prefetch_buffer.h
  8. 81
      file/prefetch_test.cc
  9. 8
      table/block_based/block_based_table_reader.h
  10. 3
      table/block_fetcher.cc

@ -7,6 +7,7 @@
* Fixed a bug that `rocksdb.read.block.compaction.micros` cannot track compaction stats (#9722).
* Fixed `file_type`, `relative_filename` and `directory` fields returned by `GetLiveFilesMetaData()`, which were added in inheriting from `FileStorageInfo`.
* Fixed a bug affecting `track_and_verify_wals_in_manifest`. Without the fix, application may see "open error: Corruption: Missing WAL with log number" while trying to open the db. The corruption is a false alarm but prevents DB open (#9766).
* Fix segfault in FilePrefetchBuffer with async_io as it doesn't wait for pending jobs to complete on destruction.
### New Features
* For db_bench when --seed=0 or --seed is not set then it uses the current time as the seed value. Previously it used the value 1000.

7
env/env_test.cc vendored

@ -85,6 +85,8 @@ struct Deleter {
void (*fn_)(void*);
};
extern "C" bool RocksDbIOUringEnable() { return true; }
std::unique_ptr<char, Deleter> NewAligned(const size_t size, const char ch) {
char* ptr = nullptr;
#ifdef OS_WIN
@ -1256,7 +1258,7 @@ TEST_P(EnvPosixTestWithParam, MultiRead) {
// Random Read
Random rnd(301 + attempt);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"UpdateResults:io_uring_result", [&](void* arg) {
"UpdateResults::io_uring_result", [&](void* arg) {
if (attempt > 0) {
// No failure in the first attempt.
size_t& bytes_read = *static_cast<size_t*>(arg);
@ -1326,7 +1328,7 @@ TEST_F(EnvPosixTest, MultiReadNonAlignedLargeNum) {
const int num_reads = rnd.Uniform(512) + 1;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"UpdateResults:io_uring_result", [&](void* arg) {
"UpdateResults::io_uring_result", [&](void* arg) {
if (attempt > 5) {
// Improve partial result rates in second half of the run to
// cover the case of repeated partial results.
@ -3308,7 +3310,6 @@ TEST_F(TestAsyncRead, ReadAsync) {
}
}
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

9
env/fs_posix.cc vendored

@ -1045,9 +1045,12 @@ class PosixFileSystem : public FileSystem {
// EXPERIMENTAL
//
// TODO akankshamahajan: Update Poll API to take into account min_completions
// TODO akankshamahajan:
// 1. Update Poll API to take into account min_completions
// and returns if number of handles in io_handles (any order) completed is
// equal to atleast min_completions.
// 2. Currently in case of direct_io, Read API is called because of which call
// to Poll API fails as it expects IOHandle to be populated.
virtual IOStatus Poll(std::vector<void*>& io_handles,
size_t /*min_completions*/) override {
#if defined(ROCKSDB_IOURING_PRESENT)
@ -1094,12 +1097,14 @@ class PosixFileSystem : public FileSystem {
req.offset = posix_handle->offset;
req.len = posix_handle->len;
size_t finished_len = 0;
size_t bytes_read = 0;
UpdateResult(cqe, "", req.len, posix_handle->iov.iov_len,
true /*async_read*/, finished_len, &req);
true /*async_read*/, finished_len, &req, bytes_read);
posix_handle->is_finished = true;
io_uring_cqe_seen(iu, cqe);
posix_handle->cb(req, posix_handle->cb_arg);
(void)finished_len;
(void)bytes_read;
if (static_cast<Posix_IOHandle*>(io_handles[i]) == posix_handle) {
break;

53
env/io_posix.cc vendored

@ -744,31 +744,36 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs,
wrap_cache.erase(wrap_check);
FSReadRequest* req = req_wrap->req;
size_t bytes_read = 0;
UpdateResult(cqe, filename_, req->len, req_wrap->iov.iov_len,
false /*async_read*/, req_wrap->finished_len, req);
false /*async_read*/, req_wrap->finished_len, req,
bytes_read);
int32_t res = cqe->res;
if (res == 0) {
/// cqe->res == 0 can means EOF, or can mean partial results. See
// comment
// https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
// Fall back to pread in this case.
if (use_direct_io() && !IsSectorAligned(req_wrap->finished_len,
GetRequiredBufferAlignment())) {
// Bytes reads don't fill sectors. Should only happen at the end
// of the file.
req->result = Slice(req->scratch, req_wrap->finished_len);
req->status = IOStatus::OK();
} else {
Slice tmp_slice;
req->status =
Read(req->offset + req_wrap->finished_len,
req->len - req_wrap->finished_len, options, &tmp_slice,
req->scratch + req_wrap->finished_len, dbg);
req->result =
Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
if (res >= 0) {
if (bytes_read == 0) {
/// cqe->res == 0 can means EOF, or can mean partial results. See
// comment
// https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
// Fall back to pread in this case.
if (use_direct_io() &&
!IsSectorAligned(req_wrap->finished_len,
GetRequiredBufferAlignment())) {
// Bytes reads don't fill sectors. Should only happen at the end
// of the file.
req->result = Slice(req->scratch, req_wrap->finished_len);
req->status = IOStatus::OK();
} else {
Slice tmp_slice;
req->status =
Read(req->offset + req_wrap->finished_len,
req->len - req_wrap->finished_len, options, &tmp_slice,
req->scratch + req_wrap->finished_len, dbg);
req->result =
Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
}
} else if (bytes_read < req_wrap->iov.iov_len) {
incomplete_rq_list.push_back(req_wrap);
}
} else if (res > 0 && res < static_cast<int32_t>(req_wrap->iov.iov_len)) {
incomplete_rq_list.push_back(req_wrap);
}
io_uring_cqe_seen(iu, cqe);
}
@ -896,8 +901,8 @@ IOStatus PosixRandomAccessFile::ReadAsync(
// Initialize Posix_IOHandle.
posix_handle->iu = iu;
posix_handle->iov.iov_base = posix_handle->scratch;
posix_handle->iov.iov_len = posix_handle->len;
posix_handle->iov.iov_base = req.scratch;
posix_handle->iov.iov_len = req.len;
posix_handle->cb = cb;
posix_handle->cb_arg = cb_arg;
posix_handle->offset = req.offset;

5
env/io_posix.h vendored

@ -66,12 +66,13 @@ struct Posix_IOHandle {
inline void UpdateResult(struct io_uring_cqe* cqe, const std::string& file_name,
size_t len, size_t iov_len, bool async_read,
size_t& finished_len, FSReadRequest* req) {
size_t& finished_len, FSReadRequest* req,
size_t& bytes_read) {
if (cqe->res < 0) {
req->result = Slice(req->scratch, 0);
req->status = IOError("Req failed", file_name, cqe->res);
} else {
size_t bytes_read = static_cast<size_t>(cqe->res);
bytes_read = static_cast<size_t>(cqe->res);
TEST_SYNC_POINT_CALLBACK("UpdateResults::io_uring_result", &bytes_read);
if (bytes_read == iov_len) {
req->result = Slice(req->scratch, req->len);

@ -111,13 +111,6 @@ Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts,
Env::IOPriority rate_limiter_priority,
uint64_t read_len, uint64_t chunk_len,
uint64_t rounddown_start, uint32_t index) {
// Reset io_handle.
if (io_handle_ != nullptr && del_fn_ != nullptr) {
del_fn_(io_handle_);
io_handle_ = nullptr;
del_fn_ = nullptr;
}
// callback for async read request.
auto fp = std::bind(&FilePrefetchBuffer::PrefetchAsyncCallback, this,
std::placeholders::_1, std::placeholders::_2);
@ -129,6 +122,7 @@ Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts,
req.scratch = bufs_[index].buffer_.BufferStart() + chunk_len;
Status s = reader->ReadAsync(req, opts, fp, nullptr /*cb_arg*/, &io_handle_,
&del_fn_, rate_limiter_priority);
req.status.PermitUncheckedError();
if (s.ok()) {
async_read_in_progress_ = true;
}
@ -221,24 +215,31 @@ void FilePrefetchBuffer::CopyDataToBuffer(uint32_t src, uint64_t& offset,
// the asynchronous prefetching in second buffer.
Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
RandomAccessFileReader* reader,
FileSystem* fs, uint64_t offset,
size_t length, size_t readahead_size,
uint64_t offset, size_t length,
size_t readahead_size,
Env::IOPriority rate_limiter_priority,
bool& copy_to_third_buffer) {
if (!enable_) {
return Status::OK();
}
if (async_read_in_progress_ && fs != nullptr) {
if (async_read_in_progress_ && fs_ != nullptr) {
// Wait for prefetch data to complete.
// No mutex is needed as PrefetchAsyncCallback updates the result in second
// buffer and FilePrefetchBuffer should wait for Poll before accessing the
// second buffer.
std::vector<void*> handles;
handles.emplace_back(io_handle_);
fs->Poll(handles, 1).PermitUncheckedError();
fs_->Poll(handles, 1).PermitUncheckedError();
}
// Release io_handle_ after the Poll API as request has been completed.
if (io_handle_ != nullptr && del_fn_ != nullptr) {
del_fn_(io_handle_);
io_handle_ = nullptr;
del_fn_ = nullptr;
}
// TODO akanksha: Update TEST_SYNC_POINT after new tests are added.
// TODO akanksha: Update TEST_SYNC_POINT after Async APIs are merged with
// normal prefetching.
TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start");
Status s;
size_t prefetch_size = length + readahead_size;
@ -438,8 +439,8 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
bool FilePrefetchBuffer::TryReadFromCacheAsync(
const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
size_t n, Slice* result, Status* status,
Env::IOPriority rate_limiter_priority, bool for_compaction /* = false */,
FileSystem* fs) {
Env::IOPriority rate_limiter_priority, bool for_compaction /* = false */
) {
if (track_min_offset_ && offset < min_offset_read_) {
min_offset_read_ = static_cast<size_t>(offset);
}
@ -474,7 +475,7 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync(
if (implicit_auto_readahead_ && async_io_) {
// Prefetch n + readahead_size_/2 synchronously as remaining
// readahead_size_/2 will be prefetched asynchronously.
s = PrefetchAsync(opts, reader, fs, offset, n, readahead_size_ / 2,
s = PrefetchAsync(opts, reader, offset, n, readahead_size_ / 2,
rate_limiter_priority, copy_to_third_buffer);
} else {
s = Prefetch(opts, reader, offset, n + readahead_size_,
@ -527,12 +528,5 @@ void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req,
size_t current_size = bufs_[index].buffer_.CurrentSize();
bufs_[index].buffer_.Size(current_size + req.result.size());
}
// Release io_handle_.
if (io_handle_ != nullptr && del_fn_ != nullptr) {
del_fn_(io_handle_);
io_handle_ = nullptr;
del_fn_ = nullptr;
}
}
} // namespace ROCKSDB_NAMESPACE

@ -65,7 +65,7 @@ class FilePrefetchBuffer {
FilePrefetchBuffer(size_t readahead_size = 0, size_t max_readahead_size = 0,
bool enable = true, bool track_min_offset = false,
bool implicit_auto_readahead = false,
bool async_io = false)
bool async_io = false, FileSystem* fs = nullptr)
: curr_(0),
readahead_size_(readahead_size),
max_readahead_size_(max_readahead_size),
@ -79,13 +79,29 @@ class FilePrefetchBuffer {
io_handle_(nullptr),
del_fn_(nullptr),
async_read_in_progress_(false),
async_io_(async_io) {
async_io_(async_io),
fs_(fs) {
// If async_io_ is enabled, data is asynchronously filled in second buffer
// while curr_ is being consumed. If data is overlapping in two buffers,
// data is copied to third buffer to return continuous buffer.
bufs_.resize(3);
}
~FilePrefetchBuffer() {
// Wait for any pending async job before destroying the class object.
if (async_read_in_progress_ && fs_ != nullptr) {
std::vector<void*> handles;
handles.emplace_back(io_handle_);
fs_->Poll(handles, 1).PermitUncheckedError();
}
// Release io_handle_.
if (io_handle_ != nullptr && del_fn_ != nullptr) {
del_fn_(io_handle_);
io_handle_ = nullptr;
del_fn_ = nullptr;
}
}
// Load data into the buffer from a file.
// reader : the file reader.
// offset : the file offset to start reading from.
@ -100,8 +116,7 @@ class FilePrefetchBuffer {
Env::IOPriority rate_limiter_priority);
Status PrefetchAsync(const IOOptions& opts, RandomAccessFileReader* reader,
FileSystem* fs, uint64_t offset, size_t length,
size_t readahead_size,
uint64_t offset, size_t length, size_t readahead_size,
Env::IOPriority rate_limiter_priority,
bool& copy_to_third_buffer);
@ -129,7 +144,7 @@ class FilePrefetchBuffer {
RandomAccessFileReader* reader, uint64_t offset,
size_t n, Slice* result, Status* status,
Env::IOPriority rate_limiter_priority,
bool for_compaction /* = false */, FileSystem* fs);
bool for_compaction /* = false */);
// The minimum `offset` ever passed to TryReadFromCache(). This will nly be
// tracked if track_min_offset = true.
@ -256,5 +271,6 @@ class FilePrefetchBuffer {
IOHandleDeleter del_fn_;
bool async_read_in_progress_;
bool async_io_;
FileSystem* fs_;
};
} // namespace ROCKSDB_NAMESPACE

@ -1013,6 +1013,87 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
Close();
}
extern "C" bool RocksDbIOUringEnable() { return true; }
// Tests the default implementation of ReadAsync API with PosixFileSystem.
TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
return;
}
const int kNumKeys = 1000;
std::shared_ptr<MockFS> fs = std::make_shared<MockFS>(
FileSystem::Default(), /*support_prefetch=*/false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool use_direct_io = false;
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
// If direct IO is not supported, skip the test
return;
} else {
ASSERT_OK(s);
}
int total_keys = 0;
// Write the keys.
{
WriteBatch batch;
Random rnd(309);
for (int j = 0; j < 5; j++) {
for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
total_keys++;
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
}
MoveFilesToLevel(2);
}
int buff_prefetch_count = 0;
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// Read the keys.
{
ReadOptions ro;
ro.adaptive_readahead = true;
ro.async_io = true;
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
num_keys++;
}
ASSERT_EQ(num_keys, total_keys);
ASSERT_GT(buff_prefetch_count, 0);
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
Close();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -655,10 +655,10 @@ struct BlockBasedTable::Rep {
std::unique_ptr<FilePrefetchBuffer>* fpb,
bool implicit_auto_readahead,
bool async_io) const {
fpb->reset(new FilePrefetchBuffer(readahead_size, max_readahead_size,
!ioptions.allow_mmap_reads /* enable */,
false /* track_min_offset */,
implicit_auto_readahead, async_io));
fpb->reset(new FilePrefetchBuffer(
readahead_size, max_readahead_size,
!ioptions.allow_mmap_reads /* enable */, false /* track_min_offset */,
implicit_auto_readahead, async_io, ioptions.fs.get()));
}
void CreateFilePrefetchBufferIfNotExists(

@ -75,8 +75,7 @@ inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
if (read_options_.async_io) {
read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCacheAsync(
opts, file_, handle_.offset(), block_size_with_trailer_, &slice_,
&io_s, read_options_.rate_limiter_priority, for_compaction_,
ioptions_.fs.get());
&io_s, read_options_.rate_limiter_priority, for_compaction_);
} else {
read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCache(
opts, file_, handle_.offset(), block_size_with_trailer_, &slice_,

Loading…
Cancel
Save