Support direct IO in RandomAccessFileReader::MultiRead (#6446)

Summary:
By supporting direct IO in RandomAccessFileReader::MultiRead, the benefits of parallel IO (IO uring) and direct IO can be combined.

In direct IO mode, read requests are aligned and merged together before being issued to RandomAccessFile::MultiRead, so blocks in the original requests might share the same underlying buffer, the shared buffers are returned in `aligned_bufs`, which is a new parameter of the `MultiRead` API.

For example, suppose alignment requirement for direct IO is 4KB, one request is (offset: 1KB, len: 1KB), another request is (offset: 3KB, len: 1KB), then since they all belong to page (offset: 0, len: 4KB), `MultiRead` only reads the page with direct IO into a buffer on heap, and returns 2 Slices referencing regions in that same buffer. See `random_access_file_reader_test.cc` for more examples.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6446

Test Plan: Added a new test `random_access_file_reader_test.cc`.

Reviewed By: anand1976

Differential Revision: D20097518

Pulled By: cheng-chang

fbshipit-source-id: ca48a8faf9c3af146465c102ef6b266a363e78d1
main
Cheng Chang 5 years ago committed by Facebook GitHub Bot
parent 5fd152b7ad
commit 4fc216649d
  1. 1
      CMakeLists.txt
  2. 4
      Makefile
  3. 7
      TARGETS
  4. 111
      file/random_access_file_reader.cc
  5. 25
      file/random_access_file_reader.h
  6. 255
      file/random_access_file_reader_test.cc
  7. 2
      port/win/env_win.cc
  8. 1
      src.mk
  9. 6
      table/block_based/block_based_table_reader.cc
  10. 6
      utilities/blob_db/blob_db_impl.cc
  11. 18
      utilities/blob_db/blob_file.cc

@ -1022,6 +1022,7 @@ if(WITH_TESTS)
env/io_posix_test.cc env/io_posix_test.cc
env/mock_env_test.cc env/mock_env_test.cc
file/delete_scheduler_test.cc file/delete_scheduler_test.cc
file/random_access_file_reader_test.cc
logging/auto_roll_logger_test.cc logging/auto_roll_logger_test.cc
logging/env_logger_test.cc logging/env_logger_test.cc
logging/event_logger_test.cc logging/event_logger_test.cc

@ -513,6 +513,7 @@ TESTS = \
fault_injection_test \ fault_injection_test \
filelock_test \ filelock_test \
filename_test \ filename_test \
random_access_file_reader_test \
file_reader_writer_test \ file_reader_writer_test \
block_based_filter_block_test \ block_based_filter_block_test \
full_filter_block_test \ full_filter_block_test \
@ -1505,6 +1506,9 @@ delete_scheduler_test: file/delete_scheduler_test.o $(LIBOBJECTS) $(TESTHARNESS)
filename_test: db/filename_test.o $(LIBOBJECTS) $(TESTHARNESS) filename_test: db/filename_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK) $(AM_LINK)
random_access_file_reader_test: file/random_access_file_reader_test.o $(LIBOBJECTS) $(TESTHARNESS) $(TESTUTIL)
$(AM_LINK)
file_reader_writer_test: util/file_reader_writer_test.o $(LIBOBJECTS) $(TESTHARNESS) file_reader_writer_test: util/file_reader_writer_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK) $(AM_LINK)

@ -1300,6 +1300,13 @@ ROCKS_TESTS = [
[], [],
[], [],
], ],
[
"random_access_file_reader_test",
"file/random_access_file_reader_test.cc",
"serial",
[],
[],
],
[ [
"random_test", "random_test",
"util/random_test.cc", "util/random_test.cc",

@ -20,11 +20,11 @@
#include "util/rate_limiter.h" #include "util/rate_limiter.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
char* scratch, char* scratch, AlignedBuf* aligned_buf,
std::unique_ptr<const char[]>* internal_buf,
bool for_compaction) const { bool for_compaction) const {
(void) internal_buf; (void)aligned_buf;
Status s; Status s;
uint64_t elapsed = 0; uint64_t elapsed = 0;
{ {
@ -81,11 +81,11 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
size_t res_len = 0; size_t res_len = 0;
if (s.ok() && offset_advance < buf.CurrentSize()) { if (s.ok() && offset_advance < buf.CurrentSize()) {
res_len = std::min(buf.CurrentSize() - offset_advance, n); res_len = std::min(buf.CurrentSize() - offset_advance, n);
if (internal_buf == nullptr) { if (aligned_buf == nullptr) {
buf.Read(scratch, offset_advance, res_len); buf.Read(scratch, offset_advance, res_len);
} else { } else {
scratch = buf.BufferStart(); scratch = buf.BufferStart();
internal_buf->reset(buf.Release()); aligned_buf->reset(buf.Release());
} }
} }
*result = Slice(scratch, res_len); *result = Slice(scratch, res_len);
@ -154,11 +154,44 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
return s; return s;
} }
size_t End(const FSReadRequest& r) {
return static_cast<size_t>(r.offset) + r.len;
}
FSReadRequest Align(const FSReadRequest& r, size_t alignment) {
FSReadRequest req;
req.offset = static_cast<uint64_t>(
TruncateToPageBoundary(alignment, static_cast<size_t>(r.offset)));
req.len = Roundup(End(r), alignment) - req.offset;
return req;
}
// Try to merge src to dest if they have overlap.
//
// Each request represents an inclusive interval [offset, offset + len].
// If the intervals have overlap, update offset and len to represent the
// merged interval, and return true.
// Otherwise, do nothing and return false.
bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) {
size_t dest_offset = static_cast<size_t>(dest->offset);
size_t src_offset = static_cast<size_t>(src.offset);
size_t dest_end = End(*dest);
size_t src_end = End(src);
if (std::max(dest_offset, dest_offset) > std::min(dest_end, src_end)) {
return false;
}
dest->offset = static_cast<uint64_t>(std::min(dest_offset, src_offset));
dest->len = std::max(dest_end, src_end) - dest->offset;
return true;
}
Status RandomAccessFileReader::MultiRead(FSReadRequest* read_reqs, Status RandomAccessFileReader::MultiRead(FSReadRequest* read_reqs,
size_t num_reqs) const { size_t num_reqs,
AlignedBuf* aligned_buf) const {
(void)aligned_buf; // suppress warning of unused variable in LITE mode
assert(num_reqs > 0);
Status s; Status s;
uint64_t elapsed = 0; uint64_t elapsed = 0;
assert(!use_direct_io());
{ {
StopWatch sw(env_, stats_, hist_type_, StopWatch sw(env_, stats_, hist_type_,
(stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/, (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
@ -166,6 +199,44 @@ Status RandomAccessFileReader::MultiRead(FSReadRequest* read_reqs,
auto prev_perf_level = GetPerfLevel(); auto prev_perf_level = GetPerfLevel();
IOSTATS_TIMER_GUARD(read_nanos); IOSTATS_TIMER_GUARD(read_nanos);
FSReadRequest* fs_reqs = read_reqs;
size_t num_fs_reqs = num_reqs;
#ifndef ROCKSDB_LITE
std::vector<FSReadRequest> aligned_reqs;
if (use_direct_io()) {
// num_reqs is the max possible size,
// this can reduce std::vecector's internal resize operations.
aligned_reqs.reserve(num_reqs);
// Align and merge the read requests.
size_t alignment = file_->GetRequiredBufferAlignment();
aligned_reqs.push_back(Align(read_reqs[0], alignment));
for (size_t i = 1; i < num_reqs; i++) {
const auto& r = Align(read_reqs[i], alignment);
if (!TryMerge(&aligned_reqs.back(), r)) {
aligned_reqs.push_back(r);
}
}
// Allocate aligned buffer and let scratch buffers point to it.
size_t total_len = 0;
for (const auto& r : aligned_reqs) {
total_len += r.len;
}
AlignedBuffer buf;
buf.Alignment(alignment);
buf.AllocateNewBuffer(total_len);
char* scratch = buf.BufferStart();
for (auto& r : aligned_reqs) {
r.scratch = scratch;
scratch += r.len;
}
aligned_buf->reset(buf.Release());
fs_reqs = aligned_reqs.data();
num_fs_reqs = aligned_reqs.size();
}
#endif // ROCKSDB_LITE
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
FileOperationInfo::TimePoint start_ts; FileOperationInfo::TimePoint start_ts;
if (ShouldNotifyListeners()) { if (ShouldNotifyListeners()) {
@ -174,8 +245,31 @@ Status RandomAccessFileReader::MultiRead(FSReadRequest* read_reqs,
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
{ {
IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_); IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
s = file_->MultiRead(read_reqs, num_reqs, IOOptions(), nullptr); s = file_->MultiRead(fs_reqs, num_fs_reqs, IOOptions(), nullptr);
} }
#ifndef ROCKSDB_LITE
if (use_direct_io()) {
// Populate results in the unaligned read requests.
size_t aligned_i = 0;
for (size_t i = 0; i < num_reqs; i++) {
auto& r = read_reqs[i];
if (static_cast<size_t>(r.offset) > End(aligned_reqs[aligned_i])) {
aligned_i++;
}
const auto& fs_r = fs_reqs[aligned_i];
r.status = fs_r.status;
if (r.status.ok()) {
uint64_t offset = r.offset - fs_r.offset;
size_t len = std::min(r.len, static_cast<size_t>(fs_r.len - offset));
r.result = Slice(fs_r.scratch + offset, len);
} else {
r.result = Slice();
}
}
}
#endif // ROCKSDB_LITE
for (size_t i = 0; i < num_reqs; ++i) { for (size_t i = 0; i < num_reqs; ++i) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) { if (ShouldNotifyListeners()) {
@ -194,4 +288,5 @@ Status RandomAccessFileReader::MultiRead(FSReadRequest* read_reqs,
return s; return s;
} }
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -22,6 +22,8 @@ namespace ROCKSDB_NAMESPACE {
class Statistics; class Statistics;
class HistogramImpl; class HistogramImpl;
using AlignedBuf = std::unique_ptr<const char[]>;
// RandomAccessFileReader is a wrapper on top of Env::RnadomAccessFile. It is // RandomAccessFileReader is a wrapper on top of Env::RnadomAccessFile. It is
// responsible for: // responsible for:
// - Handling Buffered and Direct reads appropriately. // - Handling Buffered and Direct reads appropriately.
@ -59,7 +61,7 @@ class RandomAccessFileReader {
public: public:
explicit RandomAccessFileReader( explicit RandomAccessFileReader(
std::unique_ptr<FSRandomAccessFile>&& raf, std::string _file_name, std::unique_ptr<FSRandomAccessFile>&& raf, const std::string& _file_name,
Env* env = nullptr, Statistics* stats = nullptr, uint32_t hist_type = 0, Env* env = nullptr, Statistics* stats = nullptr, uint32_t hist_type = 0,
HistogramImpl* file_read_hist = nullptr, HistogramImpl* file_read_hist = nullptr,
RateLimiter* rate_limiter = nullptr, RateLimiter* rate_limiter = nullptr,
@ -106,17 +108,22 @@ class RandomAccessFileReader {
// 1. if using mmap, result is stored in a buffer other than scratch; // 1. if using mmap, result is stored in a buffer other than scratch;
// 2. if not using mmap, result is stored in the buffer starting from scratch. // 2. if not using mmap, result is stored in the buffer starting from scratch.
// //
// In direct IO mode, an internal aligned buffer is allocated. // In direct IO mode, an aligned buffer is allocated internally.
// 1. If internal_buf is null, then results are copied to the buffer // 1. If aligned_buf is null, then results are copied to the buffer
// starting from scratch; // starting from scratch;
// 2. Otherwise, scratch is not used and can be null, the internal_buf owns // 2. Otherwise, scratch is not used and can be null, the aligned_buf owns
// the internally allocated buffer on return, and the result refers to a // the internally allocated buffer on return, and the result refers to a
// region in internal_buf. // region in aligned_buf.
Status Read(uint64_t offset, size_t n, Slice* result, char* scratch, Status Read(uint64_t offset, size_t n, Slice* result, char* scratch,
std::unique_ptr<const char[]>* internal_buf, AlignedBuf* aligned_buf, bool for_compaction = false) const;
bool for_compaction = false) const;
// REQUIRES:
Status MultiRead(FSReadRequest* reqs, size_t num_reqs) const; // num_reqs > 0, reqs do not overlap, and offsets in reqs are increasing.
// In non-direct IO mode, aligned_buf should be null;
// In direct IO mode, aligned_buf stores the aligned buffer allocated inside
// MultiRead, the result Slices in reqs refer to aligned_buf.
Status MultiRead(FSReadRequest* reqs, size_t num_reqs,
AlignedBuf* aligned_buf) const;
Status Prefetch(uint64_t offset, size_t n) const { Status Prefetch(uint64_t offset, size_t n) const {
return file_->Prefetch(offset, n, IOOptions(), nullptr); return file_->Prefetch(offset, n, IOOptions(), nullptr);

@ -0,0 +1,255 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/file_system.h"
#include "file/random_access_file_reader.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
namespace ROCKSDB_NAMESPACE {
class RandomAccessFileReaderTest : public testing::Test {
public:
void SetUp() override {
#ifdef OS_LINUX
// TEST_TMPDIR may be set to /dev/shm in Makefile,
// but /dev/shm does not support direct IO.
// The default TEST_TMPDIR is under /tmp, but /tmp might also be a tmpfs
// which does not support direct IO neither.
unsetenv("TEST_TMPDIR");
char* tmpdir = getenv("DISK_TEMP_DIR");
if (tmpdir == nullptr) {
tmpdir = getenv("HOME");
}
if (tmpdir != nullptr) {
setenv("TEST_TMPDIR", tmpdir, 1);
}
#endif
env_ = Env::Default();
fs_ = FileSystem::Default();
test_dir_ = test::PerThreadDBPath("random_access_file_reader_test");
ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr));
alignment_ = GetAlignment();
}
void TearDown() override {
EXPECT_OK(test::DestroyDir(env_, test_dir_));
}
bool IsDirectIOSupported() {
Write(".direct", "");
FileOptions opt;
opt.use_direct_reads = true;
std::unique_ptr<FSRandomAccessFile> f;
auto s = fs_->NewRandomAccessFile(Path(".direct"), opt, &f, nullptr);
return s.ok();
}
void Write(const std::string& fname, const std::string& content) {
std::unique_ptr<FSWritableFile> f;
ASSERT_OK(fs_->NewWritableFile(Path(fname), FileOptions(), &f, nullptr));
ASSERT_OK(f->Append(content, IOOptions(), nullptr));
ASSERT_OK(f->Close(IOOptions(), nullptr));
}
void Read(const std::string& fname, const FileOptions& opts,
std::unique_ptr<RandomAccessFileReader>* reader) {
std::string fpath = Path(fname);
std::unique_ptr<FSRandomAccessFile> f;
ASSERT_OK(fs_->NewRandomAccessFile(fpath, opts, &f, nullptr));
(*reader).reset(new RandomAccessFileReader(std::move(f), fpath, env_));
}
void AssertResult(const std::string& content,
const std::vector<FSReadRequest>& reqs) {
for (const auto& r : reqs) {
ASSERT_OK(r.status);
ASSERT_EQ(r.len, r.result.size());
ASSERT_EQ(content.substr(r.offset, r.len), r.result.ToString());
}
}
size_t alignment() const { return alignment_; }
private:
Env* env_;
std::shared_ptr<FileSystem> fs_;
std::string test_dir_;
size_t alignment_;
std::string Path(const std::string& fname) {
return test_dir_ + "/" + fname;
}
size_t GetAlignment() {
std::string f = "get_alignment";
Write(f, "");
std::unique_ptr<RandomAccessFileReader> r;
Read(f, FileOptions(), &r);
size_t alignment = r->file()->GetRequiredBufferAlignment();
EXPECT_OK(fs_->DeleteFile(Path(f), IOOptions(), nullptr));
return alignment;
}
};
TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
if (!IsDirectIOSupported()) {
printf("Direct IO is not supported, skip this test\n");
return;
}
// Creates a file with 3 pages.
std::string fname = "multi-read-direct-io";
Random rand(0);
std::string content;
test::RandomString(&rand, 3 * static_cast<int>(alignment()), &content);
Write(fname, content);
FileOptions opts;
opts.use_direct_reads = true;
std::unique_ptr<RandomAccessFileReader> r;
Read(fname, opts, &r);
ASSERT_TRUE(r->use_direct_io());
{
// Reads 2 blocks in the 1st page.
// The results should be SharedSlices of the same underlying buffer.
//
// Illustration (each x is a 1/4 page)
// First page: xxxx
// 1st block: x
// 2nd block: xx
FSReadRequest r0;
r0.offset = 0;
r0.len = alignment() / 4;
r0.scratch = nullptr;
FSReadRequest r1;
r1.offset = alignment() / 2;
r1.len = alignment() / 2;
r1.scratch = nullptr;
std::vector<FSReadRequest> reqs;
reqs.push_back(std::move(r0));
reqs.push_back(std::move(r1));
AlignedBuf aligned_buf;
ASSERT_OK(r->MultiRead(reqs.data(), reqs.size(), &aligned_buf));
AssertResult(content, reqs);
}
{
// Reads 3 blocks:
// 1st block in the 1st page;
// 2nd block from the middle of the 1st page to the middle of the 2nd page;
// 3rd block in the 2nd page.
// The results should be SharedSlices of the same underlying buffer.
//
// Illustration (each x is a 1/4 page)
// 2 pages: xxxxxxxx
// 1st block: x
// 2nd block: xxxx
// 3rd block: x
FSReadRequest r0;
r0.offset = 0;
r0.len = alignment() / 4;
r0.scratch = nullptr;
FSReadRequest r1;
r1.offset = alignment() / 2;
r1.len = alignment();
r1.scratch = nullptr;
FSReadRequest r2;
r2.offset = 2 * alignment() - alignment() / 4;
r2.len = alignment() / 4;
r2.scratch = nullptr;
std::vector<FSReadRequest> reqs;
reqs.push_back(std::move(r0));
reqs.push_back(std::move(r1));
reqs.push_back(std::move(r2));
AlignedBuf aligned_buf;
ASSERT_OK(r->MultiRead(reqs.data(), reqs.size(), &aligned_buf));
AssertResult(content, reqs);
}
{
// Reads 3 blocks:
// 1st block in the middle of the 1st page;
// 2nd block in the middle of the 2nd page;
// 3rd block in the middle of the 3rd page.
// The results should be SharedSlices of the same underlying buffer.
//
// Illustration (each x is a 1/4 page)
// 3 pages: xxxxxxxxxxxx
// 1st block: xx
// 2nd block: xx
// 3rd block: xx
FSReadRequest r0;
r0.offset = alignment() / 4;
r0.len = alignment() / 2;
r0.scratch = nullptr;
FSReadRequest r1;
r1.offset = alignment() + alignment() / 4;
r1.len = alignment() / 2;
r1.scratch = nullptr;
FSReadRequest r2;
r2.offset = 2 * alignment() + alignment() / 4;
r2.len = alignment() / 2;
r2.scratch = nullptr;
std::vector<FSReadRequest> reqs;
reqs.push_back(std::move(r0));
reqs.push_back(std::move(r1));
reqs.push_back(std::move(r2));
AlignedBuf aligned_buf;
ASSERT_OK(r->MultiRead(reqs.data(), reqs.size(), &aligned_buf));
AssertResult(content, reqs);
}
{
// Reads 2 blocks:
// 1st block in the middle of the 1st page;
// 2nd block in the middle of the 3rd page.
// The results are two different buffers.
//
// Illustration (each x is a 1/4 page)
// 3 pages: xxxxxxxxxxxx
// 1st block: xx
// 2nd block: xx
FSReadRequest r0;
r0.offset = alignment() / 4;
r0.len = alignment() / 2;
r0.scratch = nullptr;
FSReadRequest r1;
r1.offset = 2 * alignment() + alignment() / 4;
r1.len = alignment() / 2;
r1.scratch = nullptr;
std::vector<FSReadRequest> reqs;
reqs.push_back(std::move(r0));
reqs.push_back(std::move(r1));
AlignedBuf aligned_buf;
ASSERT_OK(r->MultiRead(reqs.data(), reqs.size(), &aligned_buf));
AssertResult(content, reqs);
}
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -1424,7 +1424,7 @@ Status WinEnv::UnlockFile(FileLock* lock) {
return winenv_io_.UnlockFile(lock); return winenv_io_.UnlockFile(lock);
} }
Status WinEnv::GetTestDirectory(std::string* result) { Status WinEnv::GetTestDirectory(std::string* result) {
return winenv_io_.GetTestDirectory(result); return winenv_io_.GetTestDirectory(result);
} }

@ -395,6 +395,7 @@ MAIN_SOURCES = \
env/env_test.cc \ env/env_test.cc \
env/io_posix_test.cc \ env/io_posix_test.cc \
env/mock_env_test.cc \ env/mock_env_test.cc \
file/random_access_file_reader_test.cc \
logging/auto_roll_logger_test.cc \ logging/auto_roll_logger_test.cc \
logging/env_logger_test.cc \ logging/env_logger_test.cc \
logging/event_logger_test.cc \ logging/event_logger_test.cc \

@ -1607,7 +1607,6 @@ void BlockBasedTable::RetrieveMultipleBlocks(
req.scratch = scratch + buf_offset; req.scratch = scratch + buf_offset;
buf_offset += req.len; buf_offset += req.len;
} }
req.status = IOStatus::OK();
read_reqs.emplace_back(req); read_reqs.emplace_back(req);
} }
@ -1628,11 +1627,10 @@ void BlockBasedTable::RetrieveMultipleBlocks(
} else { } else {
req.scratch = scratch + buf_offset; req.scratch = scratch + buf_offset;
} }
req.status = IOStatus::OK();
read_reqs.emplace_back(req); read_reqs.emplace_back(req);
} }
file->MultiRead(&read_reqs[0], read_reqs.size()); file->MultiRead(&read_reqs[0], read_reqs.size(), nullptr);
idx_in_batch = 0; idx_in_batch = 0;
size_t valid_batch_idx = 0; size_t valid_batch_idx = 0;
@ -1699,7 +1697,7 @@ void BlockBasedTable::RetrieveMultipleBlocks(
// in each read request. Checksum is stored in the block trailer, // in each read request. Checksum is stored in the block trailer,
// which is handle.size() + 1. // which is handle.size() + 1.
s = ROCKSDB_NAMESPACE::VerifyChecksum(footer.checksum(), s = ROCKSDB_NAMESPACE::VerifyChecksum(footer.checksum(),
req.result.data() + req_offset, data + req_offset,
handle.size() + 1, expected); handle.size() + 1, expected);
TEST_SYNC_POINT_CALLBACK("RetrieveMultipleBlocks:VerifyChecksum", &s); TEST_SYNC_POINT_CALLBACK("RetrieveMultipleBlocks:VerifyChecksum", &s);
} }

@ -1483,7 +1483,7 @@ Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number,
// Allocate the buffer. This is safe in C++11 // Allocate the buffer. This is safe in C++11
std::string buf; std::string buf;
std::unique_ptr<const char[]> internal_buf; AlignedBuf aligned_buf;
// A partial blob record contain checksum, key and value. // A partial blob record contain checksum, key and value.
Slice blob_record; Slice blob_record;
@ -1492,11 +1492,11 @@ Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number,
StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS); StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
if (reader->use_direct_io()) { if (reader->use_direct_io()) {
s = reader->Read(record_offset, static_cast<size_t>(record_size), s = reader->Read(record_offset, static_cast<size_t>(record_size),
&blob_record, nullptr, &internal_buf); &blob_record, nullptr, &aligned_buf);
} else { } else {
buf.reserve(static_cast<size_t>(record_size)); buf.reserve(static_cast<size_t>(record_size));
s = reader->Read(record_offset, static_cast<size_t>(record_size), s = reader->Read(record_offset, static_cast<size_t>(record_size),
&blob_record, &buf[0], nullptr); &blob_record, &buf[0], nullptr);
} }
RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size()); RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size());
} }

@ -139,11 +139,11 @@ Status BlobFile::ReadFooter(BlobLogFooter* bf) {
Slice result; Slice result;
std::string buf; std::string buf;
std::unique_ptr<const char[]> internal_buf; AlignedBuf aligned_buf;
Status s; Status s;
if (ra_file_reader_->use_direct_io()) { if (ra_file_reader_->use_direct_io()) {
s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kSize, &result, s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kSize, &result,
nullptr, &internal_buf); nullptr, &aligned_buf);
} else { } else {
buf.reserve(BlobLogFooter::kSize + 10); buf.reserve(BlobLogFooter::kSize + 10);
s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kSize, &result, s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kSize, &result,
@ -263,11 +263,11 @@ Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) {
// Read file header. // Read file header.
std::string header_buf; std::string header_buf;
std::unique_ptr<const char[]> internal_buf; AlignedBuf aligned_buf;
Slice header_slice; Slice header_slice;
if (file_reader->use_direct_io()) { if (file_reader->use_direct_io()) {
s = file_reader->Read(0, BlobLogHeader::kSize, &header_slice, nullptr, s = file_reader->Read(0, BlobLogHeader::kSize, &header_slice, nullptr,
&internal_buf); &aligned_buf);
} else { } else {
header_buf.reserve(BlobLogHeader::kSize); header_buf.reserve(BlobLogHeader::kSize);
s = file_reader->Read(0, BlobLogHeader::kSize, &header_slice, s = file_reader->Read(0, BlobLogHeader::kSize, &header_slice,
@ -306,12 +306,14 @@ Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) {
std::string footer_buf; std::string footer_buf;
Slice footer_slice; Slice footer_slice;
if (file_reader->use_direct_io()) { if (file_reader->use_direct_io()) {
s = file_reader->Read(file_size - BlobLogFooter::kSize, BlobLogFooter::kSize, s = file_reader->Read(file_size - BlobLogFooter::kSize,
&footer_slice, nullptr, &internal_buf); BlobLogFooter::kSize, &footer_slice, nullptr,
&aligned_buf);
} else { } else {
footer_buf.reserve(BlobLogFooter::kSize); footer_buf.reserve(BlobLogFooter::kSize);
s = file_reader->Read(file_size - BlobLogFooter::kSize, BlobLogFooter::kSize, s = file_reader->Read(file_size - BlobLogFooter::kSize,
&footer_slice, &footer_buf[0], nullptr); BlobLogFooter::kSize, &footer_slice, &footer_buf[0],
nullptr);
} }
if (!s.ok()) { if (!s.ok()) {
ROCKS_LOG_ERROR(info_log_, ROCKS_LOG_ERROR(info_log_,

Loading…
Cancel
Save