Direct IO capability for RocksDB

Summary:
This patch adds direct IO capability to RocksDB Env.

The direct IO capability is required for persistent cache since NVM is best
accessed as 4K direct IO. SSDs can leverage direct IO for reading.

Direct IO requires the offset and size to be sector-size aligned, and the
memory to be kernel-page aligned. Since neither the RocksDB nor the
persistent read-cache data layout is sector aligned, the read path can
accommodate an unaligned IO size (or unaligned memory) at the cost of an
extra allocation and copy.

The write code path expects the size and memory to be aligned.

Test Plan: Run RocksDB unit tests

Reviewers: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D57393
main
krad 9 years ago
parent 8f1214531e
commit f89caa127b
  1. 6
      include/rocksdb/env.h
  2. 38
      util/env_posix.cc
  3. 579
      util/env_test.cc
  4. 179
      util/io_posix.cc
  5. 64
      util/io_posix.h

@ -68,6 +68,12 @@ struct EnvOptions {
// If true, then use mmap to write data
bool use_mmap_writes = true;
// If true, then use O_DIRECT for reading data
bool use_direct_reads = false;
// If true, then use O_DIRECT for writing data
bool use_direct_writes = false;
// If false, fallocate() calls are bypassed
bool allow_fallocate = true;

@ -152,6 +152,17 @@ class PosixEnv : public Env {
if (f == nullptr) {
*result = nullptr;
return IOError(fname, errno);
} else if (options.use_direct_reads && !options.use_mmap_writes) {
int flags = O_RDONLY | O_DIRECT;
TEST_SYNC_POINT_CALLBACK("NewSequentialFile:O_DIRECT", &flags);
int fd = open(fname.c_str(), flags, 0644);
if (fd < 0) {
return IOError(fname, errno);
}
std::unique_ptr<PosixDirectIOSequentialFile> file(
new PosixDirectIOSequentialFile(fname, fd));
*result = std::move(file);
return Status::OK();
} else {
int fd = fileno(f);
SetFD_CLOEXEC(fd, &options);
@ -189,6 +200,18 @@ class PosixEnv : public Env {
}
}
close(fd);
} else if (options.use_direct_reads) {
int flags = O_RDONLY | O_DIRECT;
TEST_SYNC_POINT_CALLBACK("NewRandomAccessFile:O_DIRECT", &flags);
fd = open(fname.c_str(), flags, 0644);
if (fd < 0) {
s = IOError(fname, errno);
} else {
std::unique_ptr<PosixDirectIORandomAccessFile> file(
new PosixDirectIORandomAccessFile(fname, fd));
*result = std::move(file);
s = Status::OK();
}
} else {
result->reset(new PosixRandomAccessFile(fname, fd, options));
}
@ -221,6 +244,18 @@ class PosixEnv : public Env {
}
if (options.use_mmap_writes && !forceMmapOff) {
result->reset(new PosixMmapFile(fname, fd, page_size_, options));
} else if (options.use_direct_writes) {
int flags = O_WRONLY | O_APPEND | O_TRUNC | O_CREAT | O_DIRECT;
TEST_SYNC_POINT_CALLBACK("NewWritableFile:O_DIRECT", &flags);
fd = open(fname.c_str(), flags, 0644);
if (fd < 0) {
s = IOError(fname, errno);
} else {
std::unique_ptr<PosixDirectIOWritableFile> file(
new PosixDirectIOWritableFile(fname, fd));
*result = std::move(file);
s = Status::OK();
}
} else {
// disable mmap writes
EnvOptions no_mmap_writes_options = options;
@ -763,6 +798,9 @@ std::string Env::GenerateUniqueId() {
return uuid2;
}
//
// Default Posix Env
//
Env* Env::Default() {
// The following function call initializes the singletons of ThreadLocalPtr
// right before the static default_env. This guarantees default_env will

@ -36,6 +36,7 @@
#include "util/log_buffer.h"
#include "util/mutexlock.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/testutil.h"
@ -43,6 +44,17 @@ namespace rocksdb {
static const int kDelayMicros = 100000;
// Allocate a 4 KB-aligned buffer of `size` bytes and fill it with `ch`.
// Returns an empty pointer (with the same deleter type) if the aligned
// allocation fails.
std::unique_ptr<char, void (&)(void*)> NewAligned(const size_t size,
                                                  const char ch) {
  void* raw = nullptr;
  if (posix_memalign(&raw, 4 * 1024, size) != 0) {
    return std::unique_ptr<char, void (&)(void*)>(nullptr, free);
  }
  std::unique_ptr<char, void (&)(void*)> buffer(static_cast<char*>(raw), free);
  memset(buffer.get(), ch, size);
  return buffer;
}
class EnvPosixTest : public testing::Test {
private:
port::Mutex mu_;
@ -553,108 +565,119 @@ class IoctlFriendlyTmpdir {
// Only works in linux platforms
TEST_F(EnvPosixTest, RandomAccessUniqueID) {
// Create file.
const EnvOptions soptions;
IoctlFriendlyTmpdir ift;
std::string fname = ift.name() + "/testfile";
unique_ptr<WritableFile> wfile;
ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
unique_ptr<RandomAccessFile> file;
// Get Unique ID
ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
ASSERT_TRUE(id_size > 0);
std::string unique_id1(temp_id, id_size);
ASSERT_TRUE(IsUniqueIDValid(unique_id1));
// Get Unique ID again
ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
ASSERT_TRUE(id_size > 0);
std::string unique_id2(temp_id, id_size);
ASSERT_TRUE(IsUniqueIDValid(unique_id2));
// Get Unique ID again after waiting some time.
env_->SleepForMicroseconds(1000000);
ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
ASSERT_TRUE(id_size > 0);
std::string unique_id3(temp_id, id_size);
ASSERT_TRUE(IsUniqueIDValid(unique_id3));
// Check IDs are the same.
ASSERT_EQ(unique_id1, unique_id2);
ASSERT_EQ(unique_id2, unique_id3);
// Delete the file
env_->DeleteFile(fname);
for (bool directio : {true, false}) {
// Create file.
EnvOptions soptions;
soptions.use_direct_reads = soptions.use_direct_writes = directio;
IoctlFriendlyTmpdir ift;
std::string fname = ift.name() + "/testfile";
unique_ptr<WritableFile> wfile;
ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
unique_ptr<RandomAccessFile> file;
// Get Unique ID
ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
ASSERT_TRUE(id_size > 0);
std::string unique_id1(temp_id, id_size);
ASSERT_TRUE(IsUniqueIDValid(unique_id1));
// Get Unique ID again
ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
ASSERT_TRUE(id_size > 0);
std::string unique_id2(temp_id, id_size);
ASSERT_TRUE(IsUniqueIDValid(unique_id2));
// Get Unique ID again after waiting some time.
env_->SleepForMicroseconds(1000000);
ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
ASSERT_TRUE(id_size > 0);
std::string unique_id3(temp_id, id_size);
ASSERT_TRUE(IsUniqueIDValid(unique_id3));
// Check IDs are the same.
ASSERT_EQ(unique_id1, unique_id2);
ASSERT_EQ(unique_id2, unique_id3);
// Delete the file
env_->DeleteFile(fname);
}
}
// only works in linux platforms
#ifdef ROCKSDB_FALLOCATE_PRESENT
TEST_F(EnvPosixTest, AllocateTest) {
IoctlFriendlyTmpdir ift;
std::string fname = ift.name() + "/preallocate_testfile";
// Try fallocate in a file to see whether the target file system supports it.
// Skip the test if fallocate is not supported.
std::string fname_test_fallocate = ift.name() + "/preallocate_testfile_2";
int fd = -1;
do {
fd = open(fname_test_fallocate.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
} while (fd < 0 && errno == EINTR);
ASSERT_GT(fd, 0);
for (bool directio : {true, false}) {
IoctlFriendlyTmpdir ift;
std::string fname = ift.name() + "/preallocate_testfile";
// Try fallocate in a file to see whether the target file system supports
// it.
// Skip the test if fallocate is not supported.
std::string fname_test_fallocate = ift.name() + "/preallocate_testfile_2";
int fd = -1;
do {
fd = open(fname_test_fallocate.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
} while (fd < 0 && errno == EINTR);
ASSERT_GT(fd, 0);
int alloc_status = fallocate(fd, 0, 0, 1);
int err_number = 0;
if (alloc_status != 0) {
err_number = errno;
fprintf(stderr, "Warning: fallocate() fails, %s\n", strerror(err_number));
}
close(fd);
ASSERT_OK(env_->DeleteFile(fname_test_fallocate));
if (alloc_status != 0 && err_number == EOPNOTSUPP) {
// The filesystem containing the file does not support fallocate
return;
}
int alloc_status = fallocate(fd, 0, 0, 1);
EnvOptions soptions;
soptions.use_mmap_writes = false;
soptions.use_direct_reads = soptions.use_direct_writes = directio;
unique_ptr<WritableFile> wfile;
ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
int err_number = 0;
if (alloc_status != 0) {
err_number = errno;
fprintf(stderr, "Warning: fallocate() fails, %s\n", strerror(err_number));
}
close(fd);
ASSERT_OK(env_->DeleteFile(fname_test_fallocate));
if (alloc_status != 0 && err_number == EOPNOTSUPP) {
// The filesystem containing the file does not support fallocate
return;
// allocate 100 MB
size_t kPreallocateSize = 100 * 1024 * 1024;
size_t kBlockSize = 512;
size_t kPageSize = 4096;
size_t kDataSize = 1024 * 1024;
auto data_ptr = NewAligned(kDataSize, 'A');
Slice data(data_ptr.get(), kDataSize);
wfile->SetPreallocationBlockSize(kPreallocateSize);
wfile->PrepareWrite(wfile->GetFileSize(), kDataSize);
ASSERT_OK(wfile->Append(data));
ASSERT_OK(wfile->Flush());
struct stat f_stat;
ASSERT_EQ(stat(fname.c_str(), &f_stat), 0);
ASSERT_EQ((unsigned int)kDataSize, f_stat.st_size);
// verify that blocks are preallocated
// Note here that we don't check the exact number of blocks preallocated --
// we only require that number of allocated blocks is at least what we
// expect.
// It looks like some FS give us more blocks that we asked for. That's fine.
// It might be worth investigating further.
ASSERT_LE((unsigned int)(kPreallocateSize / kBlockSize), f_stat.st_blocks);
// close the file, should deallocate the blocks
wfile.reset();
stat(fname.c_str(), &f_stat);
ASSERT_EQ((unsigned int)kDataSize, f_stat.st_size);
// verify that preallocated blocks were deallocated on file close
// Because the FS might give us more blocks, we add a full page to the size
// and expect the number of blocks to be less or equal to that.
ASSERT_GE((f_stat.st_size + kPageSize + kBlockSize - 1) / kBlockSize,
(unsigned int)f_stat.st_blocks);
}
EnvOptions soptions;
soptions.use_mmap_writes = false;
unique_ptr<WritableFile> wfile;
ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
// allocate 100 MB
size_t kPreallocateSize = 100 * 1024 * 1024;
size_t kBlockSize = 512;
size_t kPageSize = 4096;
std::string data(1024 * 1024, 'a');
wfile->SetPreallocationBlockSize(kPreallocateSize);
wfile->PrepareWrite(wfile->GetFileSize(), data.size());
ASSERT_OK(wfile->Append(Slice(data)));
ASSERT_OK(wfile->Flush());
struct stat f_stat;
stat(fname.c_str(), &f_stat);
ASSERT_EQ((unsigned int)data.size(), f_stat.st_size);
// verify that blocks are preallocated
// Note here that we don't check the exact number of blocks preallocated --
// we only require that number of allocated blocks is at least what we expect.
// It looks like some FS give us more blocks that we asked for. That's fine.
// It might be worth investigating further.
ASSERT_LE((unsigned int)(kPreallocateSize / kBlockSize), f_stat.st_blocks);
// close the file, should deallocate the blocks
wfile.reset();
stat(fname.c_str(), &f_stat);
ASSERT_EQ((unsigned int)data.size(), f_stat.st_size);
// verify that preallocated blocks were deallocated on file close
// Because the FS might give us more blocks, we add a full page to the size
// and expect the number of blocks to be less or equal to that.
ASSERT_GE((f_stat.st_size + kPageSize + kBlockSize - 1) / kBlockSize, (unsigned int)f_stat.st_blocks);
}
#endif // ROCKSDB_FALLOCATE_PRESENT
@ -675,119 +698,159 @@ bool HasPrefix(const std::unordered_set<std::string>& ss) {
// Only works in linux platforms
TEST_F(EnvPosixTest, RandomAccessUniqueIDConcurrent) {
// Check whether a bunch of concurrently existing files have unique IDs.
const EnvOptions soptions;
for (bool directio : {true, false}) {
// Check whether a bunch of concurrently existing files have unique IDs.
EnvOptions soptions;
soptions.use_direct_reads = soptions.use_direct_writes = directio;
// Create the files
IoctlFriendlyTmpdir ift;
std::vector<std::string> fnames;
for (int i = 0; i < 1000; ++i) {
fnames.push_back(ift.name() + "/" + "testfile" + ToString(i));
// Create file.
unique_ptr<WritableFile> wfile;
ASSERT_OK(env_->NewWritableFile(fnames[i], &wfile, soptions));
}
// Create the files
IoctlFriendlyTmpdir ift;
std::vector<std::string> fnames;
for (int i = 0; i < 1000; ++i) {
fnames.push_back(ift.name() + "/" + "testfile" + ToString(i));
// Collect and check whether the IDs are unique.
std::unordered_set<std::string> ids;
for (const std::string fname : fnames) {
unique_ptr<RandomAccessFile> file;
std::string unique_id;
ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
ASSERT_TRUE(id_size > 0);
unique_id = std::string(temp_id, id_size);
ASSERT_TRUE(IsUniqueIDValid(unique_id));
// Create file.
unique_ptr<WritableFile> wfile;
ASSERT_OK(env_->NewWritableFile(fnames[i], &wfile, soptions));
}
ASSERT_TRUE(ids.count(unique_id) == 0);
ids.insert(unique_id);
}
// Collect and check whether the IDs are unique.
std::unordered_set<std::string> ids;
for (const std::string fname: fnames) {
unique_ptr<RandomAccessFile> file;
std::string unique_id;
ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
ASSERT_TRUE(id_size > 0);
unique_id = std::string(temp_id, id_size);
ASSERT_TRUE(IsUniqueIDValid(unique_id));
// Delete the files
for (const std::string fname : fnames) {
ASSERT_OK(env_->DeleteFile(fname));
}
ASSERT_TRUE(ids.count(unique_id) == 0);
ids.insert(unique_id);
ASSERT_TRUE(!HasPrefix(ids));
}
}
// Delete the files
for (const std::string fname: fnames) {
ASSERT_OK(env_->DeleteFile(fname));
}
// Only works in linux platforms
TEST_F(EnvPosixTest, RandomAccessUniqueIDDeletes) {
for (bool directio : {true, false}) {
EnvOptions soptions;
soptions.use_direct_reads = soptions.use_direct_writes = directio;
IoctlFriendlyTmpdir ift;
std::string fname = ift.name() + "/" + "testfile";
// Check that after file is deleted we don't get same ID again in a new
// file.
std::unordered_set<std::string> ids;
for (int i = 0; i < 1000; ++i) {
// Create file.
{
unique_ptr<WritableFile> wfile;
ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
}
// Get Unique ID
std::string unique_id;
{
unique_ptr<RandomAccessFile> file;
ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
ASSERT_TRUE(id_size > 0);
unique_id = std::string(temp_id, id_size);
}
ASSERT_TRUE(IsUniqueIDValid(unique_id));
ASSERT_TRUE(ids.count(unique_id) == 0);
ids.insert(unique_id);
// Delete the file
ASSERT_OK(env_->DeleteFile(fname));
}
ASSERT_TRUE(!HasPrefix(ids));
ASSERT_TRUE(!HasPrefix(ids));
}
}
// Only works in linux platforms
TEST_F(EnvPosixTest, RandomAccessUniqueIDDeletes) {
const EnvOptions soptions;
TEST_P(EnvPosixTestWithParam, InvalidateCache) {
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
for (bool directio : {true, false}) {
EnvOptions soptions;
soptions.use_direct_reads = soptions.use_direct_writes = directio;
std::string fname = test::TmpDir(env_) + "/" + "testfile";
IoctlFriendlyTmpdir ift;
std::string fname = ift.name() + "/" + "testfile";
const size_t kSectorSize = 512;
auto data = NewAligned(kSectorSize, 'A');
Slice slice(data.get(), kSectorSize);
// Check that after file is deleted we don't get same ID again in a new file.
std::unordered_set<std::string> ids;
for (int i = 0; i < 1000; ++i) {
// Create file.
{
unique_ptr<WritableFile> wfile;
if (soptions.use_direct_writes) {
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"NewWritableFile:O_DIRECT", [&](void* arg) {
int* val = static_cast<int*>(arg);
*val &= ~O_DIRECT;
});
}
ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
ASSERT_OK(wfile.get()->Append(slice));
ASSERT_OK(wfile.get()->InvalidateCache(0, 0));
ASSERT_OK(wfile.get()->Close());
}
// Get Unique ID
std::string unique_id;
// Random Read
{
unique_ptr<RandomAccessFile> file;
char scratch[kSectorSize];
Slice result;
if (soptions.use_direct_reads) {
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"NewRandomAccessFile:O_DIRECT", [&](void* arg) {
int* val = static_cast<int*>(arg);
*val &= ~O_DIRECT;
});
}
ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
ASSERT_TRUE(id_size > 0);
unique_id = std::string(temp_id, id_size);
ASSERT_OK(file.get()->Read(0, kSectorSize, &result, scratch));
ASSERT_EQ(memcmp(scratch, data.get(), kSectorSize), 0);
ASSERT_OK(file.get()->InvalidateCache(0, 11));
ASSERT_OK(file.get()->InvalidateCache(0, 0));
}
ASSERT_TRUE(IsUniqueIDValid(unique_id));
ASSERT_TRUE(ids.count(unique_id) == 0);
ids.insert(unique_id);
// Sequential Read
{
unique_ptr<SequentialFile> file;
char scratch[kSectorSize];
Slice result;
if (soptions.use_direct_reads) {
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"NewSequentialFile:O_DIRECT", [&](void* arg) {
int* val = static_cast<int*>(arg);
*val &= ~O_DIRECT;
});
}
ASSERT_OK(env_->NewSequentialFile(fname, &file, soptions));
ASSERT_OK(file.get()->Read(kSectorSize, &result, scratch));
ASSERT_EQ(memcmp(scratch, data.get(), kSectorSize), 0);
ASSERT_OK(file.get()->InvalidateCache(0, 11));
ASSERT_OK(file.get()->InvalidateCache(0, 0));
}
// Delete the file
ASSERT_OK(env_->DeleteFile(fname));
}
ASSERT_TRUE(!HasPrefix(ids));
}
// Only works in linux platforms
TEST_P(EnvPosixTestWithParam, InvalidateCache) {
const EnvOptions soptions;
std::string fname = test::TmpDir(env_) + "/" + "testfile";
// Create file.
{
unique_ptr<WritableFile> wfile;
ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
ASSERT_OK(wfile.get()->Append(Slice("Hello world")));
ASSERT_OK(wfile.get()->InvalidateCache(0, 0));
ASSERT_OK(wfile.get()->Close());
}
// Random Read
{
unique_ptr<RandomAccessFile> file;
char scratch[100];
Slice result;
ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
ASSERT_OK(file.get()->Read(0, 11, &result, scratch));
ASSERT_EQ(memcmp(scratch, "Hello world", 11), 0);
ASSERT_OK(file.get()->InvalidateCache(0, 11));
ASSERT_OK(file.get()->InvalidateCache(0, 0));
}
// Sequential Read
{
unique_ptr<SequentialFile> file;
char scratch[100];
Slice result;
ASSERT_OK(env_->NewSequentialFile(fname, &file, soptions));
ASSERT_OK(file.get()->Read(11, &result, scratch));
ASSERT_EQ(memcmp(scratch, "Hello world", 11), 0);
ASSERT_OK(file.get()->InvalidateCache(0, 11));
ASSERT_OK(file.get()->InvalidateCache(0, 0));
}
// Delete the file
ASSERT_OK(env_->DeleteFile(fname));
rocksdb::SyncPoint::GetInstance()->ClearTrace();
}
#endif // not TRAVIS
#endif // OS_LINUX
@ -909,73 +972,109 @@ TEST_P(EnvPosixTestWithParam, LogBufferMaxSizeTest) {
}
TEST_P(EnvPosixTestWithParam, Preallocation) {
const std::string src = test::TmpDir(env_) + "/" + "testfile";
unique_ptr<WritableFile> srcfile;
const EnvOptions soptions;
ASSERT_OK(env_->NewWritableFile(src, &srcfile, soptions));
srcfile->SetPreallocationBlockSize(1024 * 1024);
// No writes should mean no preallocation
size_t block_size, last_allocated_block;
srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
ASSERT_EQ(last_allocated_block, 0UL);
// Small write should preallocate one block
std::string str = "test";
srcfile->PrepareWrite(srcfile->GetFileSize(), str.size());
srcfile->Append(str);
srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
ASSERT_EQ(last_allocated_block, 1UL);
// Write an entire preallocation block, make sure we increased by two.
std::string buf(block_size, ' ');
srcfile->PrepareWrite(srcfile->GetFileSize(), buf.size());
srcfile->Append(buf);
srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
ASSERT_EQ(last_allocated_block, 2UL);
// Write five more blocks at once, ensure we're where we need to be.
buf = std::string(block_size * 5, ' ');
srcfile->PrepareWrite(srcfile->GetFileSize(), buf.size());
srcfile->Append(buf);
srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
ASSERT_EQ(last_allocated_block, 7UL);
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
for (bool directio : {true, false}) {
const std::string src = test::TmpDir(env_) + "/" + "testfile";
unique_ptr<WritableFile> srcfile;
EnvOptions soptions;
soptions.use_direct_reads = soptions.use_direct_writes = directio;
if (soptions.use_direct_writes) {
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"NewWritableFile:O_DIRECT", [&](void* arg) {
int* val = static_cast<int*>(arg);
*val &= ~O_DIRECT;
});
}
ASSERT_OK(env_->NewWritableFile(src, &srcfile, soptions));
srcfile->SetPreallocationBlockSize(1024 * 1024);
// No writes should mean no preallocation
size_t block_size, last_allocated_block;
srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
ASSERT_EQ(last_allocated_block, 0UL);
// Small write should preallocate one block
size_t kStrSize = 512;
auto data = NewAligned(kStrSize, 'A');
Slice str(data.get(), kStrSize);
srcfile->PrepareWrite(srcfile->GetFileSize(), kStrSize);
srcfile->Append(str);
srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
ASSERT_EQ(last_allocated_block, 1UL);
// Write an entire preallocation block, make sure we increased by two.
{
auto buf_ptr = NewAligned(block_size, ' ');
Slice buf(buf_ptr.get(), block_size);
srcfile->PrepareWrite(srcfile->GetFileSize(), block_size);
srcfile->Append(buf);
srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
ASSERT_EQ(last_allocated_block, 2UL);
}
// Write five more blocks at once, ensure we're where we need to be.
{
auto buf_ptr = NewAligned(block_size * 5, ' ');
Slice buf = Slice(buf_ptr.get(), block_size * 5);
srcfile->PrepareWrite(srcfile->GetFileSize(), buf.size());
srcfile->Append(buf);
srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
ASSERT_EQ(last_allocated_block, 7UL);
}
}
rocksdb::SyncPoint::GetInstance()->ClearTrace();
}
// Test that the two ways to get children file attributes (in bulk or
// individually) behave consistently.
TEST_P(EnvPosixTestWithParam, ConsistentChildrenAttributes) {
const EnvOptions soptions;
const int kNumChildren = 10;
std::string data;
for (int i = 0; i < kNumChildren; ++i) {
std::ostringstream oss;
oss << test::TmpDir(env_) << "/testfile_" << i;
const std::string path = oss.str();
unique_ptr<WritableFile> file;
ASSERT_OK(env_->NewWritableFile(path, &file, soptions));
file->Append(data);
data.append("test");
}
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
for (bool directio : {true, false}) {
EnvOptions soptions;
soptions.use_direct_reads = soptions.use_direct_writes = directio;
const int kNumChildren = 10;
std::string data;
for (int i = 0; i < kNumChildren; ++i) {
std::ostringstream oss;
oss << test::TmpDir(env_) << "/testfile_" << i;
const std::string path = oss.str();
unique_ptr<WritableFile> file;
if (soptions.use_direct_writes) {
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"NewWritableFile:O_DIRECT", [&](void* arg) {
int* val = static_cast<int*>(arg);
*val &= ~O_DIRECT;
});
}
std::vector<Env::FileAttributes> file_attrs;
ASSERT_OK(env_->GetChildrenFileAttributes(test::TmpDir(env_), &file_attrs));
for (int i = 0; i < kNumChildren; ++i) {
std::ostringstream oss;
oss << "testfile_" << i;
const std::string name = oss.str();
const std::string path = test::TmpDir(env_) + "/" + name;
auto file_attrs_iter = std::find_if(
file_attrs.begin(), file_attrs.end(),
[&name](const Env::FileAttributes& fm) { return fm.name == name; });
ASSERT_TRUE(file_attrs_iter != file_attrs.end());
uint64_t size;
ASSERT_OK(env_->GetFileSize(path, &size));
ASSERT_EQ(size, 4 * i);
ASSERT_EQ(size, file_attrs_iter->size_bytes);
ASSERT_OK(env_->NewWritableFile(path, &file, soptions));
auto buf_ptr = NewAligned(data.size(), 'T');
Slice buf(buf_ptr.get(), data.size());
file->Append(buf);
data.append(std::string(512, 'T'));
}
std::vector<Env::FileAttributes> file_attrs;
ASSERT_OK(env_->GetChildrenFileAttributes(test::TmpDir(env_), &file_attrs));
for (int i = 0; i < kNumChildren; ++i) {
std::ostringstream oss;
oss << "testfile_" << i;
const std::string name = oss.str();
const std::string path = test::TmpDir(env_) + "/" + name;
auto file_attrs_iter = std::find_if(
file_attrs.begin(), file_attrs.end(),
[&name](const Env::FileAttributes& fm) { return fm.name == name; });
ASSERT_TRUE(file_attrs_iter != file_attrs.end());
uint64_t size;
ASSERT_OK(env_->GetFileSize(path, &size));
ASSERT_EQ(size, 512 * i);
ASSERT_EQ(size, file_attrs_iter->size_bytes);
}
}
rocksdb::SyncPoint::GetInstance()->ClearTrace();
}
// Test that all WritableFileWrapper forwards all calls to WritableFile.

@ -12,6 +12,7 @@
#include "util/io_posix.h"
#include <errno.h>
#include <fcntl.h>
#include <algorithm>
#if defined(OS_LINUX)
#include <linux/fs.h>
#endif
@ -46,6 +47,112 @@ int Fadvise(int fd, off_t offset, size_t len, int advice) {
#endif
}
/*
 * DirectIOHelper
 *
 * Helpers for O_DIRECT reads. Direct IO requires the file offset and IO
 * size to be sector aligned and the destination memory to be page aligned;
 * unaligned requests are satisfied by reading an enclosing aligned range
 * into an aligned bounce buffer and copying.
 */
namespace {

const size_t kSectorSize = 512;
#ifdef OS_LINUX
const size_t kPageSize = sysconf(_SC_PAGESIZE);
#else
const size_t kPageSize = 4 * 1024;
#endif

// Allocate `size` bytes aligned to 4 KB; empty pointer on failure.
std::unique_ptr<void, void (&)(void*)> NewAligned(const size_t size) {
  void* ptr = nullptr;
  if (posix_memalign(&ptr, 4 * 1024, size) != 0) {
    return std::unique_ptr<char, void (&)(void*)>(nullptr, free);
  }
  std::unique_ptr<void, void (&)(void*)> uptr(ptr, free);
  return uptr;
}

// Round `size` up to the next multiple of `fac`.
size_t Upper(const size_t size, const size_t fac) {
  if (size % fac == 0) {
    return size;
  }
  return size + (fac - size % fac);
}

// Round `size` down to the previous multiple of `fac`.
size_t Lower(const size_t size, const size_t fac) {
  if (size % fac == 0) {
    return size;
  }
  return size - (size % fac);
}

bool IsSectorAligned(const size_t off) { return off % kSectorSize == 0; }

static bool IsPageAligned(const void* ptr) {
  return uintptr_t(ptr) % (kPageSize) == 0;
}

// Read `size` bytes at `offset` into `scratch`. All arguments must already
// satisfy the direct IO alignment requirements. Retries on EINTR; a short
// read (EOF) is reported through a shorter `*data`, not as an error.
Status ReadAligned(int fd, Slice* data, const uint64_t offset,
                   const size_t size, char* scratch) {
  assert(IsSectorAligned(offset));
  assert(IsSectorAligned(size));
  assert(IsPageAligned(scratch));

  size_t bytes_read = 0;
  ssize_t status = -1;
  while (bytes_read < size) {
    status =
        pread(fd, scratch + bytes_read, size - bytes_read, offset + bytes_read);
    if (status <= 0) {
      if (errno == EINTR) {
        continue;
      }
      break;  // EOF (status == 0) or error (status < 0)
    }
    bytes_read += status;
  }

  *data = Slice(scratch, bytes_read);
  return status < 0 ? Status::IOError(strerror(errno)) : Status::OK();
}

// Service an unaligned request: read the smallest enclosing aligned range
// into a temporary aligned buffer, then copy the requested span into
// `scratch`.
Status ReadUnaligned(int fd, Slice* data, const uint64_t offset,
                     const size_t size, char* scratch) {
  assert(scratch);
  assert(!IsSectorAligned(offset) || !IsSectorAligned(size) ||
         !IsPageAligned(scratch));

  const uint64_t aligned_off = Lower(offset, kSectorSize);
  const size_t aligned_size = Upper(size + (offset - aligned_off), kSectorSize);
  auto aligned_scratch = NewAligned(aligned_size);
  assert(aligned_scratch);
  if (!aligned_scratch) {
    return Status::IOError("Unable to allocate");
  }

  assert(IsSectorAligned(aligned_off));
  assert(IsSectorAligned(aligned_size));
  assert(aligned_scratch);
  assert(IsPageAligned(aligned_scratch.get()));
  assert(offset + size <= aligned_off + aligned_size);

  Slice scratch_slice;
  Status s = ReadAligned(fd, &scratch_slice, aligned_off, aligned_size,
                         reinterpret_cast<char*>(aligned_scratch.get()));

  // Copy up to min(size, bytes actually read)
  memcpy(scratch, reinterpret_cast<char*>(aligned_scratch.get()) +
                      (offset % kSectorSize),
         std::min(size, scratch_slice.size()));
  *data = Slice(scratch, std::min(size, scratch_slice.size()));
  return s;
}

// Dispatch to the aligned fast path only when every alignment requirement
// holds. BUGFIX: the buffer whose alignment matters is `scratch` (the
// destination we read into); the previous check inspected
// `result->data()`, which is the *output* slice and is not yet set here.
Status DirectIORead(int fd, Slice* result, size_t off, size_t n,
                    char* scratch) {
  if (IsSectorAligned(off) && IsSectorAligned(n) && IsPageAligned(scratch)) {
    return ReadAligned(fd, result, off, n, scratch);
  }
  return ReadUnaligned(fd, result, off, n, scratch);
}

}  // namespace
/*
* PosixSequentialFile
*/
@ -104,15 +211,37 @@ Status PosixSequentialFile::InvalidateCache(size_t offset, size_t length) {
#endif
}
/*
* PosixDirectIOSequentialFile
*/
// Sequentially read up to `n` bytes into `scratch` via direct IO. The file
// position is tracked in user space (`off_`) because reads go through
// pread.
Status PosixDirectIOSequentialFile::Read(size_t n, Slice* result,
                                         char* scratch) {
  const size_t off = off_.fetch_add(n);
  Status s = DirectIORead(fd_, result, off, n, scratch);
  // BUGFIX: on a short read (EOF) roll the offset back so the next Read
  // resumes right after the bytes actually returned instead of skipping
  // ahead by the full requested `n`.
  if (s.ok() && result->size() < n) {
    off_.fetch_sub(n - result->size());
  }
  return s;
}
// Advance the user-space read offset by `n` bytes without issuing any IO.
Status PosixDirectIOSequentialFile::Skip(uint64_t n) {
  off_.fetch_add(n);
  return Status::OK();
}
// Direct IO reads bypass the OS page cache, so there is nothing to
// invalidate; always succeeds.
Status PosixDirectIOSequentialFile::InvalidateCache(size_t /*offset*/,
size_t /*length*/) {
return Status::OK();
}
/*
* PosixRandomAccessFile
*/
#if defined(OS_LINUX)
namespace {
static size_t GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
if (max_size < kMaxVarint64Length * 3) {
return 0;
}
struct stat buf;
int result = fstat(fd, &buf);
assert(result != -1);
if (result == -1) {
return 0;
}
@ -132,12 +261,10 @@ static size_t GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
assert(rid >= id);
return static_cast<size_t>(rid - id);
}
} // namespace
#endif
#if defined(OS_MACOSX)
namespace {
static size_t GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
if (max_size < kMaxVarint64Length * 3) {
return 0;
}
@ -155,9 +282,7 @@ static size_t GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
assert(rid >= id);
return static_cast<size_t>(rid - id);
}
} // namespace
#endif
/*
* PosixRandomAccessFile
*
@ -206,7 +331,7 @@ Status PosixRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result,
#if defined(OS_LINUX) || defined(OS_MACOSX)
size_t PosixRandomAccessFile::GetUniqueId(char* id, size_t max_size) const {
return GetUniqueIdFromFile(fd_, id, max_size);
return PosixHelper::GetUniqueIdFromFile(fd_, id, max_size);
}
#endif
@ -246,6 +371,15 @@ Status PosixRandomAccessFile::InvalidateCache(size_t offset, size_t length) {
#endif
}
/*
* PosixDirectIORandomAccessFile
*/
// Positioned read through the direct IO helper; the helper falls back to
// an aligned bounce buffer when offset/size/buffer are not aligned.
Status PosixDirectIORandomAccessFile::Read(uint64_t offset, size_t n,
                                           Slice* result,
                                           char* scratch) const {
  return DirectIORead(fd_, result, offset, n, scratch);
}
/*
* PosixMmapReadableFile
*
@ -663,10 +797,37 @@ Status PosixWritableFile::RangeSync(uint64_t offset, uint64_t nbytes) {
}
size_t PosixWritableFile::GetUniqueId(char* id, size_t max_size) const {
return GetUniqueIdFromFile(fd_, id, max_size);
return PosixHelper::GetUniqueIdFromFile(fd_, id, max_size);
}
#endif
/*
* PosixDirectIOWritableFile
*/
// Append for direct IO: the payload size must be sector aligned and the
// buffer page aligned; an unaligned request is rejected with IOError.
Status PosixDirectIOWritableFile::Append(const Slice& data) {
  const bool aligned =
      IsSectorAligned(data.size()) && IsPageAligned(data.data());
  assert(aligned);
  if (!aligned) {
    return Status::IOError("Unaligned buffer for direct IO");
  }
  return PosixWritableFile::Append(data);
}
// Positioned append for direct IO: the offset and payload size must be
// sector aligned and the buffer page aligned; otherwise reject with
// IOError.
Status PosixDirectIOWritableFile::PositionedAppend(const Slice& data,
                                                   uint64_t offset) {
  const bool aligned = IsSectorAligned(offset) &&
                       IsSectorAligned(data.size()) &&
                       IsPageAligned(data.data());
  assert(aligned);
  if (!aligned) {
    return Status::IOError("offset or size is not aligned");
  }
  return PosixWritableFile::PositionedAppend(data, offset);
}
/*
* PosixDirectory
*/
// RAII: release the directory file descriptor on destruction.
PosixDirectory::~PosixDirectory() { close(fd_); }
Status PosixDirectory::Fsync() {

@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <unistd.h>
#include <atomic>
#include "rocksdb/env.h"
// For non linux platform, the following macros are used only as place
@ -26,6 +27,11 @@ static Status IOError(const std::string& context, int err_number) {
return Status::IOError(context, strerror(err_number));
}
// Helpers shared by the POSIX file implementations.
class PosixHelper {
public:
// Writes a unique ID for the file behind `fd` into `id` (at most
// `max_size` bytes) and returns the ID length, or 0 on failure.
static size_t GetUniqueIdFromFile(int fd, char* id, size_t max_size);
};
class PosixSequentialFile : public SequentialFile {
private:
std::string filename_;
@ -43,8 +49,25 @@ class PosixSequentialFile : public SequentialFile {
virtual Status InvalidateCache(size_t offset, size_t length) override;
};
class PosixRandomAccessFile : public RandomAccessFile {
// SequentialFile implementation that reads with O_DIRECT. Reads go through
// pread, so the sequential position is tracked explicitly (and atomically)
// in user space via `off_`. The class takes ownership of `fd`.
class PosixDirectIOSequentialFile : public SequentialFile {
 public:
  explicit PosixDirectIOSequentialFile(const std::string& filename, int fd)
      : filename_(filename), fd_(fd) {}

  // BUGFIX: close the descriptor we own; the previous empty destructor
  // leaked it (nothing else closes the fd opened in NewSequentialFile).
  virtual ~PosixDirectIOSequentialFile() {
    if (fd_ >= 0) {
      close(fd_);
    }
  }

  Status Read(size_t n, Slice* result, char* scratch) override;
  Status Skip(uint64_t n) override;
  Status InvalidateCache(size_t offset, size_t length) override;

 private:
  const std::string filename_;
  int fd_ = -1;
  std::atomic<size_t> off_{0};  // read offset
};
class PosixRandomAccessFile : public RandomAccessFile {
protected:
std::string filename_;
int fd_;
bool use_os_buffer_;
@ -63,8 +86,23 @@ class PosixRandomAccessFile : public RandomAccessFile {
virtual Status InvalidateCache(size_t offset, size_t length) override;
};
// RandomAccessFile implementation that reads with O_DIRECT. The fd is
// owned (and closed) by the PosixRandomAccessFile base class.
class PosixDirectIORandomAccessFile : public PosixRandomAccessFile {
public:
explicit PosixDirectIORandomAccessFile(const std::string& filename, int fd)
: PosixRandomAccessFile(filename, fd, EnvOptions()) {}
virtual ~PosixDirectIORandomAccessFile() {}
// Read `n` bytes at `offset`; unaligned requests are serviced through an
// aligned bounce buffer in io_posix.cc.
Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override;
// Access-pattern hints target the page cache, which direct IO bypasses.
virtual void Hint(AccessPattern pattern) override {}
// Nothing cached by the OS for this file; always succeeds.
Status InvalidateCache(size_t offset, size_t length) override {
return Status::OK();
}
};
class PosixWritableFile : public WritableFile {
private:
protected:
const std::string filename_;
int fd_;
uint64_t filesize_;
@ -74,9 +112,9 @@ class PosixWritableFile : public WritableFile {
#endif
public:
PosixWritableFile(const std::string& fname, int fd,
const EnvOptions& options);
~PosixWritableFile();
explicit PosixWritableFile(const std::string& fname, int fd,
const EnvOptions& options);
virtual ~PosixWritableFile();
// Means Close() will properly take care of truncate
// and it does not need any additional information
@ -96,6 +134,22 @@ class PosixWritableFile : public WritableFile {
#endif
};
// WritableFile implementation that writes with O_DIRECT. Callers must
// supply sector-aligned sizes/offsets and page-aligned buffers; the
// Append/PositionedAppend overrides in io_posix.cc reject unaligned data.
class PosixDirectIOWritableFile : public PosixWritableFile {
public:
explicit PosixDirectIOWritableFile(const std::string& filename, int fd)
: PosixWritableFile(filename, fd, EnvOptions()) {}
virtual ~PosixDirectIOWritableFile() {}
// No OS buffering with direct IO.
bool UseOSBuffer() const override { return false; }
// NOTE(review): hard-codes a 4 KB alignment; presumably this should track
// the runtime page size like io_posix.cc does -- confirm.
size_t GetRequiredBufferAlignment() const override { return 4 * 1024; }
Status Append(const Slice& data) override;
Status PositionedAppend(const Slice& data, uint64_t offset) override;
bool UseDirectIO() const override { return true; }
// Direct IO bypasses the page cache; nothing to invalidate.
Status InvalidateCache(size_t offset, size_t length) override {
return Status::OK();
}
};
class PosixMmapReadableFile : public RandomAccessFile {
private:
int fd_;

Loading…
Cancel
Save