avoid direct io in rocksdb_lite

Summary:
fix lite bugs
disable direct io in lite mode
Closes https://github.com/facebook/rocksdb/pull/1870

Differential Revision: D4559866

Pulled By: yiwu-arbug

fbshipit-source-id: 3761c51
main
Aaron Gao 7 years ago committed by Facebook Github Bot
parent 43e9f01c20
commit db2b4eb50e
  1. 2
      db/db_flush_test.cc
  2. 3
      db/repair_test.cc
  3. 5
      db/version_set.cc
  4. 2
      include/rocksdb/options.h
  5. 12
      util/env_posix.cc
  6. 138
      util/env_test.cc
  7. 59
      util/file_reader_writer.cc
  8. 2
      util/file_reader_writer.h
  9. 2
      util/file_reader_writer_test.cc

@ -74,7 +74,9 @@ TEST_F(DBFlushTest, SyncFail) {
TEST_SYNC_POINT("DBFlushTest::SyncFail:2"); TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
fault_injection_env->SetFilesystemActive(true); fault_injection_env->SetFilesystemActive(true);
dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForFlushMemTable();
#ifndef ROCKSDB_LITE
ASSERT_EQ("", FilesPerLevel()); // flush failed. ASSERT_EQ("", FilesPerLevel()); // flush failed.
#endif // ROCKSDB_LITE
// Flush job should release ref count to current version. // Flush job should release ref count to current version.
ASSERT_EQ(refs_before, cfd->current()->TEST_refs()); ASSERT_EQ(refs_before, cfd->current()->TEST_refs());
Destroy(options); Destroy(options);

@ -19,6 +19,7 @@
namespace rocksdb { namespace rocksdb {
#ifndef ROCKSDB_LITE
class RepairTest : public DBTestBase { class RepairTest : public DBTestBase {
public: public:
RepairTest() : DBTestBase("/repair_test") {} RepairTest() : DBTestBase("/repair_test") {}
@ -273,6 +274,8 @@ TEST_F(RepairTest, RepairColumnFamilyOptions) {
} }
} }
} }
#endif // ROCKSDB_LITE
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -2361,9 +2361,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
// If we just created a new descriptor file, install it by writing a // If we just created a new descriptor file, install it by writing a
// new CURRENT file that points to it. // new CURRENT file that points to it.
if (s.ok() && new_descriptor_log) { if (s.ok() && new_descriptor_log) {
s = SetCurrentFile( s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_,
env_, dbname_, pending_manifest_file_number_, db_directory);
db_directory);
} }
if (s.ok()) { if (s.ok()) {

@ -1121,10 +1121,12 @@ struct DBOptions {
// Use O_DIRECT for reading file // Use O_DIRECT for reading file
// Default: false // Default: false
// Not supported in ROCKSDB_LITE mode!
bool use_direct_reads = false; bool use_direct_reads = false;
// Use O_DIRECT for writing file // Use O_DIRECT for writing file
// Default: false // Default: false
// Not supported in ROCKSDB_LITE mode!
bool use_direct_writes = false; bool use_direct_writes = false;
// If false, fallocate() calls are bypassed // If false, fallocate() calls are bypassed

@ -156,6 +156,9 @@ class PosixEnv : public Env {
FILE* file = nullptr; FILE* file = nullptr;
if (options.use_direct_reads && !options.use_mmap_reads) { if (options.use_direct_reads && !options.use_mmap_reads) {
#ifdef ROCKSDB_LITE
return Status::IOError(fname, "Direct I/O not supported in RocksDB lite");
#endif // !ROCKSDB_LITE
#ifndef OS_MACOSX #ifndef OS_MACOSX
flags |= O_DIRECT; flags |= O_DIRECT;
#endif #endif
@ -200,6 +203,9 @@ class PosixEnv : public Env {
int fd; int fd;
int flags = O_RDONLY; int flags = O_RDONLY;
if (options.use_direct_reads && !options.use_mmap_reads) { if (options.use_direct_reads && !options.use_mmap_reads) {
#ifdef ROCKSDB_LITE
return Status::IOError(fname, "Direct I/O not supported in RocksDB lite");
#endif // !ROCKSDB_LITE
#ifndef OS_MACOSX #ifndef OS_MACOSX
flags |= O_DIRECT; flags |= O_DIRECT;
TEST_SYNC_POINT_CALLBACK("NewRandomAccessFile:O_DIRECT", &flags); TEST_SYNC_POINT_CALLBACK("NewRandomAccessFile:O_DIRECT", &flags);
@ -261,6 +267,9 @@ class PosixEnv : public Env {
// appends data to the end of the file, regardless of the value of // appends data to the end of the file, regardless of the value of
// offset. // offset.
// More info here: https://linux.die.net/man/2/pwrite // More info here: https://linux.die.net/man/2/pwrite
#ifdef ROCKSDB_LITE
return Status::IOError(fname, "Direct I/O not supported in RocksDB lite");
#endif // !ROCKSDB_LITE
flags |= O_WRONLY; flags |= O_WRONLY;
#ifndef OS_MACOSX #ifndef OS_MACOSX
flags |= O_DIRECT; flags |= O_DIRECT;
@ -325,6 +334,9 @@ class PosixEnv : public Env {
int flags = 0; int flags = 0;
// Direct IO mode with O_DIRECT flag or F_NOCAHCE (MAC OSX) // Direct IO mode with O_DIRECT flag or F_NOCAHCE (MAC OSX)
if (options.use_direct_writes && !options.use_mmap_writes) { if (options.use_direct_writes && !options.use_mmap_writes) {
#ifdef ROCKSDB_LITE
return Status::IOError(fname, "Direct I/O not supported in RocksDB lite");
#endif // !ROCKSDB_LITE
flags |= O_WRONLY; flags |= O_WRONLY;
#ifndef OS_MACOSX #ifndef OS_MACOSX
flags |= O_DIRECT; flags |= O_DIRECT;

@ -88,13 +88,19 @@ class EnvPosixTest : public testing::Test {
public: public:
Env* env_; Env* env_;
EnvPosixTest() : env_(Env::Default()) { } bool direct_io_;
EnvPosixTest() : env_(Env::Default()), direct_io_(false) {}
}; };
class EnvPosixTestWithParam : public EnvPosixTest, class EnvPosixTestWithParam
public ::testing::WithParamInterface<Env*> { : public EnvPosixTest,
public ::testing::WithParamInterface<std::pair<Env*, bool>> {
public: public:
EnvPosixTestWithParam() { env_ = GetParam(); } EnvPosixTestWithParam() {
std::pair<Env*, bool> param_pair = GetParam();
env_ = param_pair.first;
direct_io_ = param_pair.second;
}
void WaitThreadPoolsEmpty() { void WaitThreadPoolsEmpty() {
// Wait until the thread pools are empty. // Wait until the thread pools are empty.
@ -678,46 +684,48 @@ class IoctlFriendlyTmpdir {
std::string dir_; std::string dir_;
}; };
TEST_F(EnvPosixTest, PositionedAppend) { TEST_P(EnvPosixTestWithParam, PositionedAppend) {
unique_ptr<WritableFile> writable_file; if (direct_io_ && env_ == Env::Default()) {
unique_ptr<WritableFile> writable_file;
EnvOptions options; EnvOptions options;
options.use_direct_writes = true; options.use_direct_writes = direct_io_;
options.use_mmap_writes = false; options.use_mmap_writes = false;
IoctlFriendlyTmpdir ift; IoctlFriendlyTmpdir ift;
ASSERT_OK(env_->NewWritableFile(ift.name() + "/f", &writable_file, options)); ASSERT_OK(
env_->NewWritableFile(ift.name() + "/f", &writable_file, options));
const size_t kBlockSize = 512;
const size_t kPageSize = 4096; const size_t kBlockSize = 512;
const size_t kDataSize = kPageSize; const size_t kPageSize = 4096;
// Write a page worth of 'a' const size_t kDataSize = kPageSize;
auto data_ptr = NewAligned(kDataSize, 'a'); // Write a page worth of 'a'
Slice data_a(data_ptr.get(), kDataSize); auto data_ptr = NewAligned(kDataSize, 'a');
ASSERT_OK(writable_file->PositionedAppend(data_a, 0U)); Slice data_a(data_ptr.get(), kDataSize);
// Write a page worth of 'b' right after the first sector ASSERT_OK(writable_file->PositionedAppend(data_a, 0U));
data_ptr = NewAligned(kDataSize, 'b'); // Write a page worth of 'b' right after the first sector
Slice data_b(data_ptr.get(), kDataSize); data_ptr = NewAligned(kDataSize, 'b');
ASSERT_OK(writable_file->PositionedAppend(data_b, kBlockSize)); Slice data_b(data_ptr.get(), kDataSize);
ASSERT_OK(writable_file->Close()); ASSERT_OK(writable_file->PositionedAppend(data_b, kBlockSize));
// The file now has 1 sector worth of a followed by a page worth of b ASSERT_OK(writable_file->Close());
// The file now has 1 sector worth of a followed by a page worth of b
// Verify the above
unique_ptr<SequentialFile> seq_file; // Verify the above
ASSERT_OK(env_->NewSequentialFile(ift.name() + "/f", &seq_file, options)); unique_ptr<SequentialFile> seq_file;
char scratch[kPageSize * 2]; ASSERT_OK(env_->NewSequentialFile(ift.name() + "/f", &seq_file, options));
Slice result; char scratch[kPageSize * 2];
ASSERT_OK(seq_file->Read(sizeof(scratch), &result, scratch)); Slice result;
ASSERT_EQ(kPageSize + kBlockSize, result.size()); ASSERT_OK(seq_file->Read(sizeof(scratch), &result, scratch));
ASSERT_EQ('a', result[kBlockSize - 1]); ASSERT_EQ(kPageSize + kBlockSize, result.size());
ASSERT_EQ('b', result[kBlockSize]); ASSERT_EQ('a', result[kBlockSize - 1]);
ASSERT_EQ('b', result[kBlockSize]);
}
} }
// Only works in linux platforms // Only works in linux platforms
TEST_F(EnvPosixTest, RandomAccessUniqueID) { TEST_P(EnvPosixTestWithParam, RandomAccessUniqueID) {
for (bool directio : {true, false}) { // Create file.
// Create file. if (env_ == Env::Default()) {
EnvOptions soptions; EnvOptions soptions;
soptions.use_direct_reads = soptions.use_direct_writes = directio; soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
IoctlFriendlyTmpdir ift; IoctlFriendlyTmpdir ift;
std::string fname = ift.name() + "/testfile"; std::string fname = ift.name() + "/testfile";
unique_ptr<WritableFile> wfile; unique_ptr<WritableFile> wfile;
@ -758,8 +766,8 @@ TEST_F(EnvPosixTest, RandomAccessUniqueID) {
// only works in linux platforms // only works in linux platforms
#ifdef ROCKSDB_FALLOCATE_PRESENT #ifdef ROCKSDB_FALLOCATE_PRESENT
TEST_F(EnvPosixTest, AllocateTest) { TEST_P(EnvPosixTestWithParam, AllocateTest) {
for (bool directio : {true, false}) { if (env_ == Env::Default()) {
IoctlFriendlyTmpdir ift; IoctlFriendlyTmpdir ift;
std::string fname = ift.name() + "/preallocate_testfile"; std::string fname = ift.name() + "/preallocate_testfile";
@ -789,7 +797,7 @@ TEST_F(EnvPosixTest, AllocateTest) {
EnvOptions soptions; EnvOptions soptions;
soptions.use_mmap_writes = false; soptions.use_mmap_writes = false;
soptions.use_direct_reads = soptions.use_direct_writes = directio; soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
unique_ptr<WritableFile> wfile; unique_ptr<WritableFile> wfile;
ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
@ -846,11 +854,11 @@ bool HasPrefix(const std::unordered_set<std::string>& ss) {
} }
// Only works in linux and WIN platforms // Only works in linux and WIN platforms
TEST_F(EnvPosixTest, RandomAccessUniqueIDConcurrent) { TEST_P(EnvPosixTestWithParam, RandomAccessUniqueIDConcurrent) {
for (bool directio : {true, false}) { if (env_ == Env::Default()) {
// Check whether a bunch of concurrently existing files have unique IDs. // Check whether a bunch of concurrently existing files have unique IDs.
EnvOptions soptions; EnvOptions soptions;
soptions.use_direct_reads = soptions.use_direct_writes = directio; soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
// Create the files // Create the files
IoctlFriendlyTmpdir ift; IoctlFriendlyTmpdir ift;
@ -888,10 +896,10 @@ TEST_F(EnvPosixTest, RandomAccessUniqueIDConcurrent) {
} }
// Only works in linux and WIN platforms // Only works in linux and WIN platforms
TEST_F(EnvPosixTest, RandomAccessUniqueIDDeletes) { TEST_P(EnvPosixTestWithParam, RandomAccessUniqueIDDeletes) {
for (bool directio : {true, false}) { if (env_ == Env::Default()) {
EnvOptions soptions; EnvOptions soptions;
soptions.use_direct_reads = soptions.use_direct_writes = directio; soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
IoctlFriendlyTmpdir ift; IoctlFriendlyTmpdir ift;
std::string fname = ift.name() + "/" + "testfile"; std::string fname = ift.name() + "/" + "testfile";
@ -935,9 +943,8 @@ TEST_P(EnvPosixTestWithParam, DISABLED_InvalidateCache) {
TEST_P(EnvPosixTestWithParam, InvalidateCache) { TEST_P(EnvPosixTestWithParam, InvalidateCache) {
#endif #endif
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
for (bool directio : {true, false}) {
EnvOptions soptions; EnvOptions soptions;
soptions.use_direct_reads = soptions.use_direct_writes = directio; soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
std::string fname = test::TmpDir(env_) + "/" + "testfile"; std::string fname = test::TmpDir(env_) + "/" + "testfile";
const size_t kSectorSize = 512; const size_t kSectorSize = 512;
@ -997,7 +1004,6 @@ TEST_P(EnvPosixTestWithParam, InvalidateCache) {
} }
// Delete the file // Delete the file
ASSERT_OK(env_->DeleteFile(fname)); ASSERT_OK(env_->DeleteFile(fname));
}
rocksdb::SyncPoint::GetInstance()->ClearTrace(); rocksdb::SyncPoint::GetInstance()->ClearTrace();
} }
#endif // not TRAVIS #endif // not TRAVIS
@ -1121,12 +1127,10 @@ TEST_P(EnvPosixTestWithParam, LogBufferMaxSizeTest) {
TEST_P(EnvPosixTestWithParam, Preallocation) { TEST_P(EnvPosixTestWithParam, Preallocation) {
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
for (bool directio : {true, false}) {
const std::string src = test::TmpDir(env_) + "/" + "testfile"; const std::string src = test::TmpDir(env_) + "/" + "testfile";
unique_ptr<WritableFile> srcfile; unique_ptr<WritableFile> srcfile;
EnvOptions soptions; EnvOptions soptions;
soptions.use_direct_reads = soptions.use_direct_writes = directio; soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
#if !defined(OS_MACOSX) && !defined(OS_WIN) #if !defined(OS_MACOSX) && !defined(OS_WIN)
if (soptions.use_direct_writes) { if (soptions.use_direct_writes) {
rocksdb::SyncPoint::GetInstance()->SetCallBack( rocksdb::SyncPoint::GetInstance()->SetCallBack(
@ -1172,7 +1176,6 @@ TEST_P(EnvPosixTestWithParam, Preallocation) {
srcfile->GetPreallocationStatus(&block_size, &last_allocated_block); srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
ASSERT_EQ(last_allocated_block, 7UL); ASSERT_EQ(last_allocated_block, 7UL);
} }
}
rocksdb::SyncPoint::GetInstance()->ClearTrace(); rocksdb::SyncPoint::GetInstance()->ClearTrace();
} }
@ -1180,9 +1183,8 @@ TEST_P(EnvPosixTestWithParam, Preallocation) {
// individually) behave consistently. // individually) behave consistently.
TEST_P(EnvPosixTestWithParam, ConsistentChildrenAttributes) { TEST_P(EnvPosixTestWithParam, ConsistentChildrenAttributes) {
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
for (bool directio : {true, false}) {
EnvOptions soptions; EnvOptions soptions;
soptions.use_direct_reads = soptions.use_direct_writes = directio; soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
const int kNumChildren = 10; const int kNumChildren = 10;
std::string data; std::string data;
@ -1224,7 +1226,6 @@ TEST_P(EnvPosixTestWithParam, ConsistentChildrenAttributes) {
ASSERT_EQ(size, 512 * i); ASSERT_EQ(size, 512 * i);
ASSERT_EQ(size, file_attrs_iter->size_bytes); ASSERT_EQ(size, file_attrs_iter->size_bytes);
} }
}
rocksdb::SyncPoint::GetInstance()->ClearTrace(); rocksdb::SyncPoint::GetInstance()->ClearTrace();
} }
@ -1462,13 +1463,24 @@ TEST_P(EnvPosixTestWithParam, PosixRandomRWFileRandomized) {
env_->DeleteFile(path); env_->DeleteFile(path);
} }
INSTANTIATE_TEST_CASE_P(DefaultEnv, EnvPosixTestWithParam, INSTANTIATE_TEST_CASE_P(DefaultEnvWithoutDirectIO, EnvPosixTestWithParam,
::testing::Values(Env::Default())); ::testing::Values(std::pair<Env*, bool>(Env::Default(),
false)));
#if !defined(ROCKSDB_LITE)
INSTANTIATE_TEST_CASE_P(DefaultEnvWithDirectIO, EnvPosixTestWithParam,
::testing::Values(std::pair<Env*, bool>(Env::Default(),
true)));
#endif // !defined(ROCKSDB_LITE)
#if !defined(ROCKSDB_LITE) && !defined(OS_WIN) #if !defined(ROCKSDB_LITE) && !defined(OS_WIN)
static unique_ptr<Env> chroot_env(NewChrootEnv(Env::Default(), static unique_ptr<Env> chroot_env(NewChrootEnv(Env::Default(),
test::TmpDir(Env::Default()))); test::TmpDir(Env::Default())));
INSTANTIATE_TEST_CASE_P(ChrootEnv, EnvPosixTestWithParam, INSTANTIATE_TEST_CASE_P(
::testing::Values(chroot_env.get())); ChrootEnvWithoutDirectIO, EnvPosixTestWithParam,
::testing::Values(std::pair<Env*, bool>(chroot_env.get(), false)));
INSTANTIATE_TEST_CASE_P(
ChrootEnvWithDirectIO, EnvPosixTestWithParam,
::testing::Values(std::pair<Env*, bool>(chroot_env.get(), true)));
#endif // !defined(ROCKSDB_LITE) && !defined(OS_WIN) #endif // !defined(ROCKSDB_LITE) && !defined(OS_WIN)
} // namespace rocksdb } // namespace rocksdb

@ -24,6 +24,7 @@ namespace rocksdb {
Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
Status s; Status s;
if (use_direct_io()) { if (use_direct_io()) {
#ifndef ROCKSDB_LITE
size_t offset = offset_.fetch_add(n); size_t offset = offset_.fetch_add(n);
size_t alignment = file_->GetRequiredBufferAlignment(); size_t alignment = file_->GetRequiredBufferAlignment();
size_t aligned_offset = TruncateToPageBoundary(alignment, offset); size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
@ -41,6 +42,7 @@ Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
std::min(tmp.size() - offset_advance, n)); std::min(tmp.size() - offset_advance, n));
} }
*result = Slice(scratch, r); *result = Slice(scratch, r);
#endif // !ROCKSDB_LITE
} else { } else {
s = file_->Read(n, result, scratch); s = file_->Read(n, result, scratch);
} }
@ -48,37 +50,15 @@ Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
return s; return s;
} }
Status SequentialFileReader::DirectRead(size_t n, Slice* result,
char* scratch) {
size_t offset = offset_.fetch_add(n);
size_t alignment = file_->GetRequiredBufferAlignment();
size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
size_t offset_advance = offset - aligned_offset;
size_t size = Roundup(offset + n, alignment) - aligned_offset;
AlignedBuffer buf;
buf.Alignment(alignment);
buf.AllocateNewBuffer(size);
Slice tmp;
Status s =
file_->PositionedRead(aligned_offset, size, &tmp, buf.BufferStart());
if (s.ok()) {
buf.Size(tmp.size());
size_t r = buf.Read(scratch, offset_advance,
tmp.size() <= offset_advance
? 0
: std::min(tmp.size() - offset_advance, n));
*result = Slice(scratch, r);
}
return s;
}
Status SequentialFileReader::Skip(uint64_t n) { Status SequentialFileReader::Skip(uint64_t n) {
#ifndef ROCKSDB_LITE
if (use_direct_io()) { if (use_direct_io()) {
offset_ += n; offset_ += n;
return Status::OK(); return Status::OK();
} else {
return file_->Skip(n);
} }
#endif // !ROCKSDB_LITE
return file_->Skip(n);
} }
Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
@ -90,6 +70,7 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
(stats_ != nullptr) ? &elapsed : nullptr); (stats_ != nullptr) ? &elapsed : nullptr);
IOSTATS_TIMER_GUARD(read_nanos); IOSTATS_TIMER_GUARD(read_nanos);
if (use_direct_io()) { if (use_direct_io()) {
#ifndef ROCKSDB_LITE
size_t alignment = file_->GetRequiredBufferAlignment(); size_t alignment = file_->GetRequiredBufferAlignment();
size_t aligned_offset = TruncateToPageBoundary(alignment, offset); size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
size_t offset_advance = offset - aligned_offset; size_t offset_advance = offset - aligned_offset;
@ -106,6 +87,7 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
std::min(tmp.size() - offset_advance, n)); std::min(tmp.size() - offset_advance, n));
} }
*result = Slice(scratch, r); *result = Slice(scratch, r);
#endif // !ROCKSDB_LITE
} else { } else {
s = file_->Read(offset, n, result, scratch); s = file_->Read(offset, n, result, scratch);
} }
@ -117,28 +99,6 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
return s; return s;
} }
Status RandomAccessFileReader::DirectRead(uint64_t offset, size_t n,
Slice* result, char* scratch) const {
size_t alignment = file_->GetRequiredBufferAlignment();
size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
size_t offset_advance = offset - aligned_offset;
size_t size = Roundup(offset + n, alignment) - aligned_offset;
AlignedBuffer buf;
buf.Alignment(alignment);
buf.AllocateNewBuffer(size);
Slice tmp;
Status s = file_->Read(aligned_offset, size, &tmp, buf.BufferStart());
if (s.ok()) {
buf.Size(tmp.size());
size_t r = buf.Read(scratch, offset_advance,
tmp.size() <= offset_advance
? 0
: std::min(tmp.size() - offset_advance, n));
*result = Slice(scratch, r);
}
return s;
}
Status WritableFileWriter::Append(const Slice& data) { Status WritableFileWriter::Append(const Slice& data) {
const char* src = data.data(); const char* src = data.data();
size_t left = data.size(); size_t left = data.size();
@ -252,7 +212,9 @@ Status WritableFileWriter::Flush() {
if (buf_.CurrentSize() > 0) { if (buf_.CurrentSize() > 0) {
if (direct_io_) { if (direct_io_) {
#ifndef ROCKSDB_LITE
s = WriteDirect(); s = WriteDirect();
#endif // !ROCKSDB_LITE
} else { } else {
s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize()); s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize());
} }
@ -401,6 +363,7 @@ Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
// whole number of pages to be written again on the next flush because we can // whole number of pages to be written again on the next flush because we can
// only write on aligned // only write on aligned
// offsets. // offsets.
#ifndef ROCKSDB_LITE
Status WritableFileWriter::WriteDirect() { Status WritableFileWriter::WriteDirect() {
Status s; Status s;
@ -460,7 +423,7 @@ Status WritableFileWriter::WriteDirect() {
} }
return s; return s;
} }
#endif // !ROCKSDB_LITE
namespace { namespace {
class ReadaheadRandomAccessFile : public RandomAccessFile { class ReadaheadRandomAccessFile : public RandomAccessFile {

@ -170,7 +170,9 @@ class WritableFileWriter {
private: private:
// Used when os buffering is OFF and we are writing // Used when os buffering is OFF and we are writing
// DMA such as in Direct I/O mode // DMA such as in Direct I/O mode
#ifndef ROCKSDB_LITE
Status WriteDirect(); Status WriteDirect();
#endif // !ROCKSDB_LITE
// Normal write // Normal write
Status WriteBuffered(const char* data, size_t size); Status WriteBuffered(const char* data, size_t size);
Status RangeSync(uint64_t offset, uint64_t nbytes); Status RangeSync(uint64_t offset, uint64_t nbytes);

@ -85,6 +85,7 @@ TEST_F(WritableFileWriterTest, RangeSync) {
writer->Close(); writer->Close();
} }
#ifndef ROCKSDB_LITE
TEST_F(WritableFileWriterTest, AppendStatusReturn) { TEST_F(WritableFileWriterTest, AppendStatusReturn) {
class FakeWF : public WritableFile { class FakeWF : public WritableFile {
public: public:
@ -124,6 +125,7 @@ TEST_F(WritableFileWriterTest, AppendStatusReturn) {
dynamic_cast<FakeWF*>(writer->writable_file())->SetIOError(true); dynamic_cast<FakeWF*>(writer->writable_file())->SetIOError(true);
ASSERT_NOK(writer->Append(std::string(2 * kMb, 'b'))); ASSERT_NOK(writer->Append(std::string(2 * kMb, 'b')));
} }
#endif
} // namespace rocksdb } // namespace rocksdb

Loading…
Cancel
Save