detect logical sector size

Summary:
Query the logical sector size from the device on Linux platforms instead of hardcoding it.
Closes https://github.com/facebook/rocksdb/pull/1875

Differential Revision: D4591946

Pulled By: ajkr

fbshipit-source-id: 4e9805c
main
Aaron Gao 7 years ago committed by Facebook Github Bot
parent ed50308d20
commit 1ef5f50e84
  1. 2
      db/write_controller.cc
  2. 73
      util/env_test.cc
  3. 92
      util/io_posix.cc
  4. 15
      util/io_posix.h

@ -105,7 +105,7 @@ uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) {
} }
uint64_t WriteController::NowMicrosMonotonic(Env* env) { uint64_t WriteController::NowMicrosMonotonic(Env* env) {
return env->NowNanos() / std::milli::den; return env->NowNanos() / std::milli::den;
} }
StopWriteToken::~StopWriteToken() { StopWriteToken::~StopWriteToken() {

@ -690,40 +690,39 @@ class IoctlFriendlyTmpdir {
std::string dir_; std::string dir_;
}; };
TEST_P(EnvPosixTestWithParam, PositionedAppend) {
if (direct_io_ && env_ == Env::Default()) { TEST_F(EnvPosixTest, PositionedAppend) {
unique_ptr<WritableFile> writable_file; unique_ptr<WritableFile> writable_file;
EnvOptions options;
options.use_direct_writes = direct_io_; EnvOptions options;
options.use_mmap_writes = false; options.use_direct_writes = true;
IoctlFriendlyTmpdir ift; options.use_mmap_writes = false;
ASSERT_OK( IoctlFriendlyTmpdir ift;
env_->NewWritableFile(ift.name() + "/f", &writable_file, options)); ASSERT_OK(env_->NewWritableFile(ift.name() + "/f", &writable_file, options));
const size_t kBlockSize = 512; const size_t kBlockSize = 4096;
const size_t kPageSize = 4096; const size_t kPageSize = 4096;
const size_t kDataSize = kPageSize; const size_t kDataSize = kPageSize;
// Write a page worth of 'a' // Write a page worth of 'a'
auto data_ptr = NewAligned(kDataSize, 'a'); auto data_ptr = NewAligned(kDataSize, 'a');
Slice data_a(data_ptr.get(), kDataSize); Slice data_a(data_ptr.get(), kDataSize);
ASSERT_OK(writable_file->PositionedAppend(data_a, 0U)); ASSERT_OK(writable_file->PositionedAppend(data_a, 0U));
// Write a page worth of 'b' right after the first sector // Write a page worth of 'b' right after the first sector
data_ptr = NewAligned(kDataSize, 'b'); data_ptr = NewAligned(kDataSize, 'b');
Slice data_b(data_ptr.get(), kDataSize); Slice data_b(data_ptr.get(), kDataSize);
ASSERT_OK(writable_file->PositionedAppend(data_b, kBlockSize)); ASSERT_OK(writable_file->PositionedAppend(data_b, kBlockSize));
ASSERT_OK(writable_file->Close()); ASSERT_OK(writable_file->Close());
// The file now has 1 sector worth of a followed by a page worth of b // The file now has 1 sector worth of a followed by a page worth of b
// Verify the above // Verify the above
unique_ptr<SequentialFile> seq_file; unique_ptr<SequentialFile> seq_file;
ASSERT_OK(env_->NewSequentialFile(ift.name() + "/f", &seq_file, options)); ASSERT_OK(env_->NewSequentialFile(ift.name() + "/f", &seq_file, options));
char scratch[kPageSize * 2]; char scratch[kPageSize * 2];
Slice result; Slice result;
ASSERT_OK(seq_file->Read(sizeof(scratch), &result, scratch)); ASSERT_OK(seq_file->Read(sizeof(scratch), &result, scratch));
ASSERT_EQ(kPageSize + kBlockSize, result.size()); ASSERT_EQ(kPageSize + kBlockSize, result.size());
ASSERT_EQ('a', result[kBlockSize - 1]); ASSERT_EQ('a', result[kBlockSize - 1]);
ASSERT_EQ('b', result[kBlockSize]); ASSERT_EQ('b', result[kBlockSize]);
}
} }
// Only works in linux platforms // Only works in linux platforms
@ -1155,7 +1154,7 @@ TEST_P(EnvPosixTestWithParam, Preallocation) {
ASSERT_EQ(last_allocated_block, 0UL); ASSERT_EQ(last_allocated_block, 0UL);
// Small write should preallocate one block // Small write should preallocate one block
size_t kStrSize = 512; size_t kStrSize = 4096;
auto data = NewAligned(kStrSize, 'A'); auto data = NewAligned(kStrSize, 'A');
Slice str(data.get(), kStrSize); Slice str(data.get(), kStrSize);
srcfile->PrepareWrite(srcfile->GetFileSize(), kStrSize); srcfile->PrepareWrite(srcfile->GetFileSize(), kStrSize);
@ -1212,7 +1211,7 @@ TEST_P(EnvPosixTestWithParam, ConsistentChildrenAttributes) {
auto buf_ptr = NewAligned(data.size(), 'T'); auto buf_ptr = NewAligned(data.size(), 'T');
Slice buf(buf_ptr.get(), data.size()); Slice buf(buf_ptr.get(), data.size());
file->Append(buf); file->Append(buf);
data.append(std::string(512, 'T')); data.append(std::string(4096, 'T'));
} }
std::vector<Env::FileAttributes> file_attrs; std::vector<Env::FileAttributes> file_attrs;
@ -1229,7 +1228,7 @@ TEST_P(EnvPosixTestWithParam, ConsistentChildrenAttributes) {
ASSERT_TRUE(file_attrs_iter != file_attrs.end()); ASSERT_TRUE(file_attrs_iter != file_attrs.end());
uint64_t size; uint64_t size;
ASSERT_OK(env_->GetFileSize(path, &size)); ASSERT_OK(env_->GetFileSize(path, &size));
ASSERT_EQ(size, 512 * i); ASSERT_EQ(size, 4096 * i);
ASSERT_EQ(size, file_attrs_iter->size_bytes); ASSERT_EQ(size, file_attrs_iter->size_bytes);
} }
rocksdb::SyncPoint::GetInstance()->ClearTrace(); rocksdb::SyncPoint::GetInstance()->ClearTrace();

@ -8,7 +8,6 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifdef ROCKSDB_LIB_IO_POSIX #ifdef ROCKSDB_LIB_IO_POSIX
#include "util/io_posix.h" #include "util/io_posix.h"
#include <errno.h> #include <errno.h>
#include <fcntl.h> #include <fcntl.h>
@ -47,20 +46,84 @@ int Fadvise(int fd, off_t offset, size_t len, int advice) {
#endif #endif
} }
namespace {
// Returns the logical block size, in bytes, of the block device that holds
// the file referred to by `fd`.
//
// On Linux the value is read from
// /sys/dev/block/<major>:<minor>/queue/logical_block_size (falling back to
// the parent device directory for partitions). kDefaultPageSize is returned
// whenever the lookup cannot be completed: fstat() fails, the device has
// major number 0, the sysfs path cannot be resolved or parsed, the file
// cannot be read, or the value read is not a non-zero power of two.
// On builds without OS_LINUX the function always returns kDefaultPageSize
// and `fd` is unused.
size_t GetLogicalBufferSize(int __attribute__((__unused__)) fd) {
#ifdef OS_LINUX
  struct stat buf;
  int result = fstat(fd, &buf);
  if (result == -1) {
    return kDefaultPageSize;
  }
  if (major(buf.st_dev) == 0) {
    // Unnamed devices (e.g. non-device mounts), reserved as null device number.
    // These don't have an entry in /sys/dev/block/. Return a sensible default.
    return kDefaultPageSize;
  }

  // Reading queue/logical_block_size does not require special permissions.
  const int kBufferSize = 100;
  char path[kBufferSize];
  char real_path[PATH_MAX + 1];
  snprintf(path, kBufferSize, "/sys/dev/block/%u:%u", major(buf.st_dev),
           minor(buf.st_dev));
  // Resolve the symlink so the parent directory can be inspected below.
  if (realpath(path, real_path) == nullptr) {
    return kDefaultPageSize;
  }
  std::string device_dir(real_path);
  if (!device_dir.empty() && device_dir.back() == '/') {
    device_dir.pop_back();
  }
  // NOTE: sda3 does not have a `queue/` subdir, only the parent sda has it.
  // $ ls -al '/sys/dev/block/8:3'
  // lrwxrwxrwx. 1 root root 0 Jun 26 01:38 /sys/dev/block/8:3 ->
  // ../../block/sda/sda3
  size_t parent_end = device_dir.rfind('/', device_dir.length() - 1);
  // parent_end == 0 means the path has a single component ("/name"); there is
  // no parent directory name to inspect, and `parent_end - 1` below would wrap
  // around to npos and make rfind() search from the end of the string again.
  if (parent_end == std::string::npos || parent_end == 0) {
    return kDefaultPageSize;
  }
  size_t parent_begin = device_dir.rfind('/', parent_end - 1);
  if (parent_begin == std::string::npos) {
    return kDefaultPageSize;
  }
  // If the directory containing the device entry is not "block", the entry is
  // treated as a partition and the parent directory is used instead.
  if (device_dir.substr(parent_begin + 1, parent_end - parent_begin - 1) !=
      "block") {
    device_dir = device_dir.substr(0, parent_end);
  }
  std::string fname = device_dir + "/queue/logical_block_size";
  FILE* fp;
  size_t size = 0;  // stays 0 if the file cannot be opened or parsed
  fp = fopen(fname.c_str(), "r");
  if (fp != nullptr) {
    char* line = nullptr;
    size_t len = 0;
    if (getline(&line, &len, fp) != -1) {
      sscanf(line, "%zu", &size);
    }
    // getline() may allocate `line` even on failure, so always free it.
    free(line);
    fclose(fp);
  }
  // Accept only a non-zero power of two; anything else falls through to the
  // default below.
  if (size != 0 && (size & (size - 1)) == 0) {
    return size;
  }
#endif
  return kDefaultPageSize;
}
} // namespace
/* /*
* DirectIOHelper * DirectIOHelper
*/ */
#ifndef NDEBUG #ifndef NDEBUG
namespace { namespace {
const size_t kSectorSize = 512;
#ifdef OS_LINUX #ifdef OS_LINUX
const size_t kPageSize = sysconf(_SC_PAGESIZE); const size_t kPageSize = sysconf(_SC_PAGESIZE);
#else #else
const size_t kPageSize = 4 * 1024; const size_t kPageSize = 4 * 1024;
#endif #endif
bool IsSectorAligned(const size_t off, size_t sector_size) {
bool IsSectorAligned(const size_t off) { return off % kSectorSize == 0; } return off % sector_size == 0;
}
static bool IsPageAligned(const void* ptr) { static bool IsPageAligned(const void* ptr) {
return uintptr_t(ptr) % (kPageSize) == 0; return uintptr_t(ptr) % (kPageSize) == 0;
@ -77,7 +140,8 @@ PosixSequentialFile::PosixSequentialFile(const std::string& fname, FILE* file,
: filename_(fname), : filename_(fname),
file_(file), file_(file),
fd_(fd), fd_(fd),
use_direct_io_(options.use_direct_reads) { use_direct_io_(options.use_direct_reads),
logical_sector_size_(GetLogicalBufferSize(fd_)) {
assert(!options.use_direct_reads || !options.use_mmap_reads); assert(!options.use_direct_reads || !options.use_mmap_reads);
} }
@ -230,7 +294,10 @@ size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
*/ */
PosixRandomAccessFile::PosixRandomAccessFile(const std::string& fname, int fd, PosixRandomAccessFile::PosixRandomAccessFile(const std::string& fname, int fd,
const EnvOptions& options) const EnvOptions& options)
: filename_(fname), fd_(fd), use_direct_io_(options.use_direct_reads) { : filename_(fname),
fd_(fd),
use_direct_io_(options.use_direct_reads),
logical_sector_size_(GetLogicalBufferSize(fd_)) {
assert(!options.use_direct_reads || !options.use_mmap_reads); assert(!options.use_direct_reads || !options.use_mmap_reads);
assert(!options.use_mmap_reads || sizeof(void*) < 8); assert(!options.use_mmap_reads || sizeof(void*) < 8);
} }
@ -601,7 +668,8 @@ PosixWritableFile::PosixWritableFile(const std::string& fname, int fd,
: filename_(fname), : filename_(fname),
use_direct_io_(options.use_direct_writes), use_direct_io_(options.use_direct_writes),
fd_(fd), fd_(fd),
filesize_(0) { filesize_(0),
logical_sector_size_(GetLogicalBufferSize(fd_)) {
#ifdef ROCKSDB_FALLOCATE_PRESENT #ifdef ROCKSDB_FALLOCATE_PRESENT
allow_fallocate_ = options.allow_fallocate; allow_fallocate_ = options.allow_fallocate;
fallocate_with_keep_size_ = options.fallocate_with_keep_size; fallocate_with_keep_size_ = options.fallocate_with_keep_size;
@ -616,7 +684,9 @@ PosixWritableFile::~PosixWritableFile() {
} }
Status PosixWritableFile::Append(const Slice& data) { Status PosixWritableFile::Append(const Slice& data) {
assert(!use_direct_io() || (IsSectorAligned(data.size()) && IsPageAligned(data.data()))); assert(!use_direct_io() ||
(IsSectorAligned(data.size(), GetRequiredBufferAlignment()) &&
IsPageAligned(data.data())));
const char* src = data.data(); const char* src = data.data();
size_t left = data.size(); size_t left = data.size();
while (left != 0) { while (left != 0) {
@ -635,8 +705,10 @@ Status PosixWritableFile::Append(const Slice& data) {
} }
Status PosixWritableFile::PositionedAppend(const Slice& data, uint64_t offset) { Status PosixWritableFile::PositionedAppend(const Slice& data, uint64_t offset) {
assert(use_direct_io() && IsSectorAligned(offset) && assert(use_direct_io() &&
IsSectorAligned(data.size()) && IsPageAligned(data.data())); IsSectorAligned(offset, GetRequiredBufferAlignment()) &&
IsSectorAligned(data.size(), GetRequiredBufferAlignment()) &&
IsPageAligned(data.data()));
assert(offset <= std::numeric_limits<off_t>::max()); assert(offset <= std::numeric_limits<off_t>::max());
const char* src = data.data(); const char* src = data.data();
size_t left = data.size(); size_t left = data.size();

@ -47,6 +47,7 @@ class PosixSequentialFile : public SequentialFile {
FILE* file_; FILE* file_;
int fd_; int fd_;
bool use_direct_io_; bool use_direct_io_;
size_t logical_sector_size_;
public: public:
PosixSequentialFile(const std::string& fname, FILE* file, int fd, PosixSequentialFile(const std::string& fname, FILE* file, int fd,
@ -59,6 +60,9 @@ class PosixSequentialFile : public SequentialFile {
virtual Status Skip(uint64_t n) override; virtual Status Skip(uint64_t n) override;
virtual Status InvalidateCache(size_t offset, size_t length) override; virtual Status InvalidateCache(size_t offset, size_t length) override;
virtual bool use_direct_io() const override { return use_direct_io_; } virtual bool use_direct_io() const override { return use_direct_io_; }
// Reports the required buffer alignment as logical_sector_size_, which the
// constructor initializes from GetLogicalBufferSize(fd_).
virtual size_t GetRequiredBufferAlignment() const override {
return logical_sector_size_;
}
}; };
class PosixRandomAccessFile : public RandomAccessFile { class PosixRandomAccessFile : public RandomAccessFile {
@ -66,6 +70,7 @@ class PosixRandomAccessFile : public RandomAccessFile {
std::string filename_; std::string filename_;
int fd_; int fd_;
bool use_direct_io_; bool use_direct_io_;
size_t logical_sector_size_;
public: public:
PosixRandomAccessFile(const std::string& fname, int fd, PosixRandomAccessFile(const std::string& fname, int fd,
@ -80,6 +85,9 @@ class PosixRandomAccessFile : public RandomAccessFile {
virtual void Hint(AccessPattern pattern) override; virtual void Hint(AccessPattern pattern) override;
virtual Status InvalidateCache(size_t offset, size_t length) override; virtual Status InvalidateCache(size_t offset, size_t length) override;
virtual bool use_direct_io() const override { return use_direct_io_; } virtual bool use_direct_io() const override { return use_direct_io_; }
// Reports the required buffer alignment as logical_sector_size_, which the
// constructor initializes from GetLogicalBufferSize(fd_).
virtual size_t GetRequiredBufferAlignment() const override {
return logical_sector_size_;
}
}; };
class PosixWritableFile : public WritableFile { class PosixWritableFile : public WritableFile {
@ -88,6 +96,7 @@ class PosixWritableFile : public WritableFile {
const bool use_direct_io_; const bool use_direct_io_;
int fd_; int fd_;
uint64_t filesize_; uint64_t filesize_;
size_t logical_sector_size_;
#ifdef ROCKSDB_FALLOCATE_PRESENT #ifdef ROCKSDB_FALLOCATE_PRESENT
bool allow_fallocate_; bool allow_fallocate_;
bool fallocate_with_keep_size_; bool fallocate_with_keep_size_;
@ -110,12 +119,10 @@ class PosixWritableFile : public WritableFile {
virtual bool IsSyncThreadSafe() const override; virtual bool IsSyncThreadSafe() const override;
virtual bool use_direct_io() const override { return use_direct_io_; } virtual bool use_direct_io() const override { return use_direct_io_; }
virtual uint64_t GetFileSize() override; virtual uint64_t GetFileSize() override;
virtual Status InvalidateCache(size_t offset, size_t length) override;
virtual size_t GetRequiredBufferAlignment() const override { virtual size_t GetRequiredBufferAlignment() const override {
// TODO(gzh): It should be the logical sector size/filesystem block size return logical_sector_size_;
// hardcoded as 4k for most cases
return 4 * 1024;
} }
virtual Status InvalidateCache(size_t offset, size_t length) override;
#ifdef ROCKSDB_FALLOCATE_PRESENT #ifdef ROCKSDB_FALLOCATE_PRESENT
virtual Status Allocate(uint64_t offset, uint64_t len) override; virtual Status Allocate(uint64_t offset, uint64_t len) override;
#endif #endif

Loading…
Cancel
Save