Roundup read bytes in ReadaheadRandomAccessFile

Summary:
Fix alignment in ReadaheadRandomAccessFile
Closes https://github.com/facebook/rocksdb/pull/2253

Differential Revision: D5012336

Pulled By: lightmark

fbshipit-source-id: 10d2c829520cb787227ef653ef63d5d701725778
main
Aaron Gao 8 years ago committed by Facebook Github Bot
parent 264d3f540c
commit 2d42cf5ea9
  1. 8
      db/db_compaction_test.cc
  2. 4
      db/db_impl_open.cc
  3. 15
      util/file_reader_writer.cc

@ -2645,6 +2645,7 @@ TEST_P(DBCompactionDirectIOTest, DirectIO) {
options.use_direct_io_for_flush_and_compaction = GetParam();
options.env = new MockEnv(Env::Default());
Reopen(options);
bool readahead = false;
SyncPoint::GetInstance()->SetCallBack(
"TableCache::NewIterator:for_compaction", [&](void* arg) {
bool* use_direct_reads = static_cast<bool*>(arg);
@ -2657,11 +2658,18 @@ TEST_P(DBCompactionDirectIOTest, DirectIO) {
ASSERT_EQ(*use_direct_writes,
options.use_direct_io_for_flush_and_compaction);
});
if (options.use_direct_io_for_flush_and_compaction) {
SyncPoint::GetInstance()->SetCallBack(
"SanitizeOptions:direct_io", [&](void* arg) {
readahead = true;
});
}
SyncPoint::GetInstance()->EnableProcessing();
CreateAndReopenWithCF({"pikachu"}, options);
MakeTables(3, "p", "q", 1);
ASSERT_EQ("1,1,1", FilesPerLevel(1));
Compact(1, "p1", "p9");
ASSERT_FALSE(readahead ^ options.use_direct_io_for_flush_and_compaction);
ASSERT_EQ("0,0,1", FilesPerLevel(1));
Destroy(options);
delete options.env;

@ -99,7 +99,9 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
}
if (result.use_direct_reads && result.compaction_readahead_size == 0) {
if (result.use_direct_io_for_flush_and_compaction &&
result.compaction_readahead_size == 0) {
TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr);
result.compaction_readahead_size = 1024 * 1024 * 2;
}

@ -23,6 +23,16 @@
namespace rocksdb {
#ifndef NDEBUG
namespace {
bool IsSectorAligned(const size_t off, size_t sector_size) {
return off % sector_size == 0;
}
}
#endif
Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
Status s;
if (use_direct_io()) {
@ -502,7 +512,8 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
if (prefetch_offset == buffer_offset_) {
return Status::OK();
}
return ReadIntoBuffer(prefetch_offset, offset - prefetch_offset + n);
return ReadIntoBuffer(prefetch_offset,
Roundup(offset + n, alignment_) - prefetch_offset);
}
virtual size_t GetUniqueId(char* id, size_t max_size) const override {
@ -537,6 +548,8 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
if (n > buffer_.Capacity()) {
n = buffer_.Capacity();
}
assert(IsSectorAligned(offset, alignment_));
assert(IsSectorAligned(n, alignment_));
Slice result;
Status s = file_->Read(offset, n, &result, buffer_.BufferStart());
if (s.ok()) {

Loading…
Cancel
Save