diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index e6d820ad4..4ec901bf7 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -2645,6 +2645,7 @@ TEST_P(DBCompactionDirectIOTest, DirectIO) { options.use_direct_io_for_flush_and_compaction = GetParam(); options.env = new MockEnv(Env::Default()); Reopen(options); + bool readahead = false; SyncPoint::GetInstance()->SetCallBack( "TableCache::NewIterator:for_compaction", [&](void* arg) { bool* use_direct_reads = static_cast(arg); @@ -2657,11 +2658,18 @@ TEST_P(DBCompactionDirectIOTest, DirectIO) { ASSERT_EQ(*use_direct_writes, options.use_direct_io_for_flush_and_compaction); }); + if (options.use_direct_io_for_flush_and_compaction) { + SyncPoint::GetInstance()->SetCallBack( + "SanitizeOptions:direct_io", [&](void* arg) { + readahead = true; + }); + } SyncPoint::GetInstance()->EnableProcessing(); CreateAndReopenWithCF({"pikachu"}, options); MakeTables(3, "p", "q", 1); ASSERT_EQ("1,1,1", FilesPerLevel(1)); Compact(1, "p1", "p9"); + ASSERT_FALSE(readahead ^ options.use_direct_io_for_flush_and_compaction); ASSERT_EQ("0,0,1", FilesPerLevel(1)); Destroy(options); delete options.env; diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 341f01d04..b0621571b 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -99,7 +99,9 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { result.db_paths.emplace_back(dbname, std::numeric_limits::max()); } - if (result.use_direct_reads && result.compaction_readahead_size == 0) { + if (result.use_direct_io_for_flush_and_compaction && + result.compaction_readahead_size == 0) { + TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr); result.compaction_readahead_size = 1024 * 1024 * 2; } diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index 690a8609f..c27047a5b 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -23,6 +23,16 @@ namespace rocksdb { +#ifndef NDEBUG +namespace { + +bool IsSectorAligned(const size_t off, size_t sector_size) { + return off % sector_size == 0; +} + +} +#endif + Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { Status s; if (use_direct_io()) { @@ -502,7 +512,8 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { if (prefetch_offset == buffer_offset_) { return Status::OK(); } - return ReadIntoBuffer(prefetch_offset, offset - prefetch_offset + n); + return ReadIntoBuffer(prefetch_offset, + Roundup(offset + n, alignment_) - prefetch_offset); } virtual size_t GetUniqueId(char* id, size_t max_size) const override { @@ -537,6 +548,8 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { if (n > buffer_.Capacity()) { n = buffer_.Capacity(); } + assert(IsSectorAligned(offset, alignment_)); + assert(IsSectorAligned(n, alignment_)); Slice result; Status s = file_->Read(offset, n, &result, buffer_.BufferStart()); if (s.ok()) {