diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index 00934484a..1fa9a1a9f 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -2387,17 +2387,22 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet { class DeadlineRandomAccessFile : public FSRandomAccessFileWrapper { public: - DeadlineRandomAccessFile(DeadlineFS& fs, + DeadlineRandomAccessFile(DeadlineFS& fs, SpecialEnv* env, std::unique_ptr& file) : FSRandomAccessFileWrapper(file.get()), fs_(fs), - file_(std::move(file)) {} + file_(std::move(file)), + env_(env) {} IOStatus Read(uint64_t offset, size_t len, const IOOptions& opts, Slice* result, char* scratch, IODebugContext* dbg) const override { int delay; + const std::chrono::microseconds deadline = fs_.GetDeadline(); + if (deadline.count()) { + AssertDeadline(deadline, opts); + } if (fs_.ShouldDelay(&delay)) { - Env::Default()->SleepForMicroseconds(delay); + env_->SleepForMicroseconds(delay); } return FSRandomAccessFileWrapper::Read(offset, len, opts, result, scratch, dbg); @@ -2406,22 +2411,37 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet { IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs, const IOOptions& options, IODebugContext* dbg) override { int delay; + const std::chrono::microseconds deadline = fs_.GetDeadline(); + if (deadline.count()) { + AssertDeadline(deadline, options); + } if (fs_.ShouldDelay(&delay)) { - Env::Default()->SleepForMicroseconds(delay); + env_->SleepForMicroseconds(delay); } return FSRandomAccessFileWrapper::MultiRead(reqs, num_reqs, options, dbg); } private: + void AssertDeadline(const std::chrono::microseconds deadline, + const IOOptions& opts) const { + // Give a leeway of +- 10us as it can take some time for the Get/ + // MultiGet call to reach here, in order to avoid false alarms + std::chrono::microseconds now = + std::chrono::microseconds(env_->NowMicros()); + ASSERT_EQ(deadline - now, opts.timeout); + } DeadlineFS& fs_; std::unique_ptr file_; + SpecialEnv* env_; }; class DeadlineFS : public FileSystemWrapper { public: - DeadlineFS() - : FileSystemWrapper(FileSystem::Default()), - delay_idx_(0) {} + DeadlineFS(SpecialEnv* env) + : FileSystemWrapper(FileSystem::Default()), + delay_idx_(0), + deadline_(std::chrono::microseconds::zero()), + env_(env) {} ~DeadlineFS() = default; IOStatus NewRandomAccessFile(const std::string& fname, @@ -2432,13 +2452,14 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet { IOStatus s; s = target()->NewRandomAccessFile(fname, opts, &file, dbg); - result->reset(new DeadlineRandomAccessFile(*this, file)); + result->reset(new DeadlineRandomAccessFile(*this, env_, file)); return s; } // Set a vector of {IO counter, delay in microseconds} pairs that control // when to inject a delay and duration of the delay - void SetDelaySequence(const std::vector>&& seq) { + void SetDelaySequence(const std::chrono::microseconds deadline, + const std::vector>&& seq) { int total_delay = 0; for (auto& seq_iter : seq) { // Ensure no individual delay is > 500ms @@ -2451,6 +2472,7 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet { delay_seq_ = seq; delay_idx_ = 0; io_count_ = 0; + deadline_ = deadline; } // Increment the IO counter and return a delay in microseconds @@ -2464,10 +2486,14 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet { return false; } + const std::chrono::microseconds GetDeadline() { return deadline_; } + private: std::vector> delay_seq_; size_t delay_idx_; int io_count_; + std::chrono::microseconds deadline_; + SpecialEnv* 
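
The hunk above re-plumbs the test-only DeadlineRandomAccessFile so that every read verifies the timeout handed down through IOOptions and so that injected delays advance the mock SpecialEnv clock rather than the real one. The angle-bracket template arguments did not survive extraction here, so the following is a hedged, readable sketch of the wrapper as this hunk appears to define it, with the missing types (FSRandomAccessFile and friends) restored by inference; it is assumed to remain nested inside the DBBasicTestMultiGetDeadline fixture next to DeadlineFS, which supplies GetDeadline() and ShouldDelay().

// Readable sketch of the test wrapper changed above; template arguments
// are restored by inference and should be checked against the real patch.
class DeadlineRandomAccessFile : public FSRandomAccessFileWrapper {
 public:
  DeadlineRandomAccessFile(DeadlineFS& fs, SpecialEnv* env,
                           std::unique_ptr<FSRandomAccessFile>& file)
      : FSRandomAccessFileWrapper(file.get()),
        fs_(fs),
        file_(std::move(file)),
        env_(env) {}

  IOStatus Read(uint64_t offset, size_t len, const IOOptions& opts,
                Slice* result, char* scratch,
                IODebugContext* dbg) const override {
    int delay;
    const std::chrono::microseconds deadline = fs_.GetDeadline();
    if (deadline.count()) {
      // Verify that the reader derived opts.timeout from the user deadline.
      AssertDeadline(deadline, opts);
    }
    if (fs_.ShouldDelay(&delay)) {
      // Sleep on the SpecialEnv so that only the mock clock advances.
      env_->SleepForMicroseconds(delay);
    }
    return FSRandomAccessFileWrapper::Read(offset, len, opts, result, scratch,
                                           dbg);
  }
  // MultiRead() follows the same pattern: assert the deadline, apply any
  // injected delay, then forward to the wrapped file.

 private:
  void AssertDeadline(const std::chrono::microseconds deadline,
                      const IOOptions& opts) const {
    // With time_elapse_only_sleep the mock clock moves only on sleeps, so
    // the remaining budget can be compared exactly.
    std::chrono::microseconds now =
        std::chrono::microseconds(env_->NowMicros());
    ASSERT_EQ(deadline - now, opts.timeout);
  }

  DeadlineFS& fs_;
  std::unique_ptr<FSRandomAccessFile> file_;
  SpecialEnv* env_;
};
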
env_; }; inline void CheckStatus(std::vector& statuses, size_t num_ok) { @@ -2483,8 +2509,10 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet { TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { std::shared_ptr fs( - new DBBasicTestMultiGetDeadline::DeadlineFS()); - std::unique_ptr env = NewCompositeEnv(fs); + new DBBasicTestMultiGetDeadline::DeadlineFS(env_)); + std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); + env_->no_slowdown_ = true; + env_->time_elapse_only_sleep_.store(true); Options options = CurrentOptions(); std::shared_ptr cache = NewLRUCache(1048576); @@ -2509,13 +2537,13 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { cfs[i] = handles_[i]; keys[i] = Slice(key_str[i].data(), key_str[i].size()); } - // Delay the first IO by 200ms - fs->SetDelaySequence({{0, 200000}}); ReadOptions ro; ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000}; + // Delay the first IO by 200ms + fs->SetDelaySequence(ro.deadline, {{0, 20000}}); + std::vector statuses = dbfull()->MultiGet(ro, cfs, keys, &values); - std::cout << "Non-batched MultiGet"; // The first key is successful because we check after the lookup, but // subsequent keys fail due to deadline exceeded CheckStatus(statuses, 1); @@ -2537,10 +2565,9 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { cfs[i] = handles_[i / 2]; keys[i] = Slice(key_str[i].data(), key_str[i].size()); } - fs->SetDelaySequence({{1, 200000}}); ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000}; + fs->SetDelaySequence(ro.deadline, {{1, 20000}}); statuses = dbfull()->MultiGet(ro, cfs, keys, &values); - std::cout << "Non-batched 2"; CheckStatus(statuses, 3); // Test batched MultiGet with an IO delay in the first data block read. @@ -2552,11 +2579,10 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { cache->SetCapacity(1048576); statuses.clear(); statuses.resize(keys.size()); - fs->SetDelaySequence({{0, 200000}}); ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000}; + fs->SetDelaySequence(ro.deadline, {{0, 20000}}); dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(), pin_values.data(), statuses.data()); - std::cout << "Batched 1"; CheckStatus(statuses, 2); // Similar to the previous one, but an IO delay in the third CF data block @@ -2568,11 +2594,10 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { cache->SetCapacity(1048576); statuses.clear(); statuses.resize(keys.size()); - fs->SetDelaySequence({{2, 200000}}); ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000}; + fs->SetDelaySequence(ro.deadline, {{2, 20000}}); dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(), pin_values.data(), statuses.data()); - std::cout << "Batched 2"; CheckStatus(statuses, 6); // Similar to the previous one, but an IO delay in the last but one CF @@ -2583,11 +2608,10 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { cache->SetCapacity(1048576); statuses.clear(); statuses.resize(keys.size()); - fs->SetDelaySequence({{3, 200000}}); ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000}; + fs->SetDelaySequence(ro.deadline, {{3, 20000}}); dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(), pin_values.data(), statuses.data()); - std::cout << "Batched 3"; CheckStatus(statuses, 8); // Test batched MultiGet with single CF and lots of keys. 
Inject delay @@ -2610,11 +2634,10 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { } statuses.clear(); statuses.resize(keys.size()); - fs->SetDelaySequence({{1, 200000}}); ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000}; + fs->SetDelaySequence(ro.deadline, {{1, 20000}}); dbfull()->MultiGet(ro, handles_[0], keys.size(), keys.data(), pin_values.data(), statuses.data()); - std::cout << "Batched single CF"; CheckStatus(statuses, 64); Close(); } diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index a3fd0df45..d7a040885 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1527,11 +1527,6 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, GetImplOptions& get_impl_options) { assert(get_impl_options.value != nullptr || get_impl_options.merge_operands != nullptr); - // We will eventually support deadline for Get requests too, but safeguard - // for now - if (read_options.deadline != std::chrono::microseconds::zero()) { - return Status::NotSupported("ReadOptions deadline is not supported"); - } #ifndef NDEBUG assert(get_impl_options.column_family); diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc index f688a964f..c333e2ea5 100644 --- a/file/file_prefetch_buffer.cc +++ b/file/file_prefetch_buffer.cc @@ -88,8 +88,7 @@ Status FilePrefetchBuffer::Prefetch(RandomAccessFileReader* reader, Slice result; size_t read_len = static_cast(roundup_len - chunk_len); - s = reader->Read(rounddown_offset + chunk_len, - read_len, &result, + s = reader->Read(IOOptions(), rounddown_offset + chunk_len, read_len, &result, buffer_.BufferStart() + chunk_len, nullptr, for_compaction); #ifndef NDEBUG if (!s.ok() || result.size() < read_len) { diff --git a/file/file_util.h b/file/file_util.h index 333749e4d..dd7d8f500 100644 --- a/file/file_util.h +++ b/file/file_util.h @@ -30,4 +30,20 @@ extern Status DeleteDBFile(const ImmutableDBOptions* db_options, extern bool IsWalDirSameAsDBPath(const ImmutableDBOptions* db_options); +inline IOStatus PrepareIOFromReadOptions(const ReadOptions& ro, Env* env, + IOOptions& opts) { + if (!env) { + env = Env::Default(); + } + + if (ro.deadline.count()) { + std::chrono::microseconds now = std::chrono::microseconds(env->NowMicros()); + if (now > ro.deadline) { + return IOStatus::TimedOut("Deadline exceeded"); + } + opts.timeout = ro.deadline - now; + } + return IOStatus::OK(); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc index 03a574254..8f98aba53 100644 --- a/file/random_access_file_reader.cc +++ b/file/random_access_file_reader.cc @@ -15,14 +15,16 @@ #include "monitoring/histogram.h" #include "monitoring/iostats_context_imp.h" #include "port/port.h" +#include "table/format.h" #include "test_util/sync_point.h" #include "util/random.h" #include "util/rate_limiter.h" namespace ROCKSDB_NAMESPACE { -Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, - char* scratch, AlignedBuf* aligned_buf, +Status RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset, + size_t n, Slice* result, char* scratch, + AlignedBuf* aligned_buf, bool for_compaction) const { (void)aligned_buf; Status s; @@ -62,10 +64,16 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, start_ts = std::chrono::system_clock::now(); orig_offset = aligned_offset + buf.CurrentSize(); } + { IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_); - s = file_->Read(aligned_offset + buf.CurrentSize(), 
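
PrepareIOFromReadOptions(), added in file/file_util.h above, is the single point where the user-facing absolute deadline is turned into the relative per-IO timeout that the FileSystem sees, or into an immediate TimedOut if the budget is already spent. A hedged usage illustration follows; env and the 10ms budget are chosen only for the example.

// Illustrative only: ReadOptions::deadline is absolute (NowMicros() plus a
// budget), while IOOptions::timeout is the budget remaining at this call.
ReadOptions ro;
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};  // 10ms

IOOptions opts;
IOStatus s = PrepareIOFromReadOptions(ro, env, opts);
if (s.IsTimedOut()) {
  // The deadline has already passed; the caller returns without issuing IO.
} else {
  // If, say, ~3ms elapsed since the deadline was set, opts.timeout is ~7ms,
  // and the read is issued as reader->Read(opts, offset, n, ...).
}
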
allowed, - IOOptions(), &tmp, buf.Destination(), nullptr); + // Only user reads are expected to specify a timeout. And user reads + // are not subjected to rate_limiter and should go through only + // one iteration of this loop, so we don't need to check and adjust + // the opts.timeout before calling file_->Read + assert(!opts.timeout.count() || allowed == read_size); + s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, opts, + &tmp, buf.Destination(), nullptr); } if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::system_clock::now(); @@ -116,9 +124,15 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, start_ts = std::chrono::system_clock::now(); } #endif + { IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_); - s = file_->Read(offset + pos, allowed, IOOptions(), &tmp_result, + // Only user reads are expected to specify a timeout. And user reads + // are not subjected to rate_limiter and should go through only + // one iteration of this loop, so we don't need to check and adjust + // the opts.timeout before calling file_->Read + assert(!opts.timeout.count() || allowed == n); + s = file_->Read(offset + pos, allowed, opts, &tmp_result, scratch + pos, nullptr); } #ifndef ROCKSDB_LITE @@ -186,7 +200,8 @@ bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) { return true; } -Status RandomAccessFileReader::MultiRead(FSReadRequest* read_reqs, +Status RandomAccessFileReader::MultiRead(const IOOptions& opts, + FSReadRequest* read_reqs, size_t num_reqs, AlignedBuf* aligned_buf) const { (void)aligned_buf; // suppress warning of unused variable in LITE mode @@ -244,9 +259,10 @@ Status RandomAccessFileReader::MultiRead(FSReadRequest* read_reqs, start_ts = std::chrono::system_clock::now(); } #endif // ROCKSDB_LITE + { IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_); - s = file_->MultiRead(fs_reqs, num_fs_reqs, IOOptions(), nullptr); + s = file_->MultiRead(fs_reqs, num_fs_reqs, opts, nullptr); } #ifndef ROCKSDB_LITE diff --git a/file/random_access_file_reader.h b/file/random_access_file_reader.h index 1cdc388fb..5dba52c58 100644 --- a/file/random_access_file_reader.h +++ b/file/random_access_file_reader.h @@ -114,15 +114,16 @@ class RandomAccessFileReader { // 2. Otherwise, scratch is not used and can be null, the aligned_buf owns // the internally allocated buffer on return, and the result refers to a // region in aligned_buf. - Status Read(uint64_t offset, size_t n, Slice* result, char* scratch, - AlignedBuf* aligned_buf, bool for_compaction = false) const; + Status Read(const IOOptions& opts, uint64_t offset, size_t n, Slice* result, + char* scratch, AlignedBuf* aligned_buf, + bool for_compaction = false) const; // REQUIRES: // num_reqs > 0, reqs do not overlap, and offsets in reqs are increasing. // In non-direct IO mode, aligned_buf should be null; // In direct IO mode, aligned_buf stores the aligned buffer allocated inside // MultiRead, the result Slices in reqs refer to aligned_buf. 
- Status MultiRead(FSReadRequest* reqs, size_t num_reqs, + Status MultiRead(const IOOptions& opts, FSReadRequest* reqs, size_t num_reqs, AlignedBuf* aligned_buf) const; Status Prefetch(uint64_t offset, size_t n) const { @@ -134,5 +135,7 @@ class RandomAccessFileReader { std::string file_name() const { return file_name_; } bool use_direct_io() const { return file_->use_direct_io(); } + + Env* env() const { return env_; } }; } // namespace ROCKSDB_NAMESPACE diff --git a/file/random_access_file_reader_test.cc b/file/random_access_file_reader_test.cc index 03f9da59d..499021ec5 100644 --- a/file/random_access_file_reader_test.cc +++ b/file/random_access_file_reader_test.cc @@ -106,7 +106,8 @@ TEST_F(RandomAccessFileReaderTest, ReadDirectIO) { Slice result; AlignedBuf buf; for (bool for_compaction : {true, false}) { - ASSERT_OK(r->Read(offset, len, &result, nullptr, &buf, for_compaction)); + ASSERT_OK(r->Read(IOOptions(), offset, len, &result, nullptr, &buf, + for_compaction)); ASSERT_EQ(result.ToString(), content.substr(offset, len)); } } @@ -152,7 +153,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) { reqs.push_back(std::move(r0)); reqs.push_back(std::move(r1)); AlignedBuf aligned_buf; - ASSERT_OK(r->MultiRead(reqs.data(), reqs.size(), &aligned_buf)); + ASSERT_OK( + r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf)); AssertResult(content, reqs); } @@ -189,7 +191,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) { reqs.push_back(std::move(r1)); reqs.push_back(std::move(r2)); AlignedBuf aligned_buf; - ASSERT_OK(r->MultiRead(reqs.data(), reqs.size(), &aligned_buf)); + ASSERT_OK( + r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf)); AssertResult(content, reqs); } @@ -226,7 +229,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) { reqs.push_back(std::move(r1)); reqs.push_back(std::move(r2)); AlignedBuf aligned_buf; - ASSERT_OK(r->MultiRead(reqs.data(), reqs.size(), &aligned_buf)); + ASSERT_OK( + r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf)); AssertResult(content, reqs); } @@ -255,7 +259,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) { reqs.push_back(std::move(r0)); reqs.push_back(std::move(r1)); AlignedBuf aligned_buf; - ASSERT_OK(r->MultiRead(reqs.data(), reqs.size(), &aligned_buf)); + ASSERT_OK( + r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf)); AssertResult(content, reqs); } diff --git a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h index 4fdd51c67..13a260225 100644 --- a/include/rocksdb/file_system.h +++ b/include/rocksdb/file_system.h @@ -77,14 +77,16 @@ enum class IOType : uint8_t { // honored. More hints can be added here in the future to indicate things like // storage media (HDD/SSD) to be used, replication level etc. 
struct IOOptions { - // Timeout for the operation in milliseconds - std::chrono::milliseconds timeout; + // Timeout for the operation in microseconds + std::chrono::microseconds timeout; // Priority - high or low IOPriority prio; // Type of data being read/written IOType type; + + IOOptions() : timeout(0), prio(IOPriority::kIOLow), type(IOType::kUnknown) {} }; // File scope options that control how a file is opened/created and accessed diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 16c09f161..10b68ff4e 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1346,13 +1346,14 @@ struct ReadOptions { const Slice* timestamp; const Slice* iter_start_ts; - // Deadline for completing the read request (only MultiGet for now) in us. - // It should be set to some number of microseconds since a fixed point in - // time, identical to that used by system time. The best way is to use - // env->NowMicros() + some timeout. This is best efforts. The call may - // exceed the deadline if there is IO involved and the file system doesn't - // support deadlines, or due to checking for deadline periodically rather - // than for every key if processing a batch + // Deadline for completing the read request (only Get/MultiGet for now) in us. + // It should be set to microseconds since epoch, i.e, gettimeofday or + // equivalent plus allowed duration in microseconds. The best way is to use + // env->NowMicros() + some timeout. + // This is best efforts. The call may exceed the deadline if there is IO + // involved and the file system doesn't support deadlines, or due to + // checking for deadline periodically rather than for every key if + // processing a batch std::chrono::microseconds deadline; ReadOptions(); diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index 2ee0cfe0e..4e852ebc2 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -20,6 +20,7 @@ #include "db/dbformat.h" #include "db/pinned_iterators_manager.h" #include "file/file_prefetch_buffer.h" +#include "file/file_util.h" #include "file/random_access_file_reader.h" #include "monitoring/perf_context_imp.h" #include "options/options_helper.h" @@ -1671,7 +1672,17 @@ void BlockBasedTable::RetrieveMultipleBlocks( read_reqs.emplace_back(req); } - file->MultiRead(&read_reqs[0], read_reqs.size(), nullptr); + { + IOOptions opts; + IOStatus s = PrepareIOFromReadOptions(options, file->env(), opts); + if (s.IsTimedOut()) { + for (FSReadRequest& req : read_reqs) { + req.status = s; + } + } else { + file->MultiRead(opts, &read_reqs[0], read_reqs.size(), nullptr); + } + } idx_in_batch = 0; size_t valid_batch_idx = 0; diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc index 61c80cb9e..bcb5c4340 100644 --- a/table/block_fetcher.cc +++ b/table/block_fetcher.cc @@ -12,6 +12,7 @@ #include #include +#include "file/file_util.h" #include "logging/logging.h" #include "memory/memory_allocator.h" #include "monitoring/perf_context_imp.h" @@ -235,29 +236,33 @@ Status BlockFetcher::ReadBlockContents() { return status_; } } else if (!TryGetCompressedBlockFromPersistentCache()) { - // Actual file read - if (file_->use_direct_io()) { - PERF_TIMER_GUARD(block_read_time); - status_ = - file_->Read(handle_.offset(), block_size_with_trailer_, - &slice_, nullptr, &direct_io_buf_, for_compaction_); - PERF_COUNTER_ADD(block_read_count, 1); - used_buf_ = const_cast(slice_.data()); - } else { - 
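
With IOOptions::timeout now expressed in microseconds and default-initialized to zero (meaning no timeout, which preserves today's behavior for internal reads), a FileSystem implementation can choose to honor the budget that Get/MultiGet plumbs down. This patch only propagates the value; the sketch below is not part of the change and uses invented names (TimeoutAwareFile, kMinLatency) purely to illustrate an implementation-side check.

// Hypothetical sketch of a FileSystem-side consumer of IOOptions::timeout;
// class and constant names are invented for illustration.
class TimeoutAwareFile : public FSRandomAccessFileWrapper {
 public:
  explicit TimeoutAwareFile(FSRandomAccessFile* target)
      : FSRandomAccessFileWrapper(target) {}

  IOStatus Read(uint64_t offset, size_t n, const IOOptions& opts,
                Slice* result, char* scratch,
                IODebugContext* dbg) const override {
    const std::chrono::microseconds kMinLatency(100);  // assumed floor
    if (opts.timeout.count() > 0 && opts.timeout < kMinLatency) {
      // Not enough budget left to plausibly finish; fail fast, mirroring
      // what PrepareIOFromReadOptions() does when the deadline has passed.
      return IOStatus::TimedOut("insufficient timeout budget");
    }
    return FSRandomAccessFileWrapper::Read(offset, n, opts, result, scratch,
                                           dbg);
  }
};
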
PrepareBufferForBlockFromFile(); - PERF_TIMER_GUARD(block_read_time); - status_ = file_->Read(handle_.offset(), block_size_with_trailer_, - &slice_, used_buf_, nullptr, for_compaction_); - PERF_COUNTER_ADD(block_read_count, 1); + IOOptions opts; + status_ = PrepareIOFromReadOptions(read_options_, file_->env(), opts); + // Actual file read + if (status_.ok()) { + if (file_->use_direct_io()) { + PERF_TIMER_GUARD(block_read_time); + status_ = + file_->Read(opts, handle_.offset(), block_size_with_trailer_, + &slice_, nullptr, &direct_io_buf_, for_compaction_); + PERF_COUNTER_ADD(block_read_count, 1); + used_buf_ = const_cast(slice_.data()); + } else { + PrepareBufferForBlockFromFile(); + PERF_TIMER_GUARD(block_read_time); + status_ = file_->Read(opts, handle_.offset(), block_size_with_trailer_, + &slice_, used_buf_, nullptr, for_compaction_); + PERF_COUNTER_ADD(block_read_count, 1); #ifndef NDEBUG - if (used_buf_ == &stack_buf_[0]) { - num_stack_buf_memcpy_++; - } else if (used_buf_ == heap_buf_.get()) { - num_heap_buf_memcpy_++; - } else if (used_buf_ == compressed_buf_.get()) { - num_compressed_buf_memcpy_++; - } + if (used_buf_ == &stack_buf_[0]) { + num_stack_buf_memcpy_++; + } else if (used_buf_ == heap_buf_.get()) { + num_heap_buf_memcpy_++; + } else if (used_buf_ == compressed_buf_.get()) { + num_compressed_buf_memcpy_++; + } #endif + } } // TODO: introduce dedicated perf counter for range tombstones diff --git a/table/cuckoo/cuckoo_table_builder_test.cc b/table/cuckoo/cuckoo_table_builder_test.cc index 80181f067..fbddded57 100644 --- a/table/cuckoo/cuckoo_table_builder_test.cc +++ b/table/cuckoo/cuckoo_table_builder_test.cc @@ -112,8 +112,8 @@ class CuckooBuilderTest : public testing::Test { size_t bucket_size = expected_unused_bucket.size(); for (uint32_t i = 0; i < table_size + cuckoo_block_size - 1; ++i) { Slice read_slice; - ASSERT_OK(file_reader->Read(i * bucket_size, bucket_size, &read_slice, - nullptr, nullptr)); + ASSERT_OK(file_reader->Read(IOOptions(), i * bucket_size, bucket_size, + &read_slice, nullptr, nullptr)); size_t key_idx = std::find(expected_locations.begin(), expected_locations.end(), i) - expected_locations.begin(); diff --git a/table/cuckoo/cuckoo_table_reader.cc b/table/cuckoo/cuckoo_table_reader.cc index 1c068b08e..66b77249a 100644 --- a/table/cuckoo/cuckoo_table_reader.cc +++ b/table/cuckoo/cuckoo_table_reader.cc @@ -137,8 +137,8 @@ CuckooTableReader::CuckooTableReader( cuckoo_block_size_ = *reinterpret_cast( cuckoo_block_size->second.data()); cuckoo_block_bytes_minus_one_ = cuckoo_block_size_ * bucket_length_ - 1; - status_ = file_->Read(0, static_cast(file_size), &file_data_, nullptr, - nullptr); + status_ = file_->Read(IOOptions(), 0, static_cast(file_size), + &file_data_, nullptr, nullptr); } Status CuckooTableReader::Get(const ReadOptions& /*readOptions*/, diff --git a/table/format.cc b/table/format.cc index c84a59904..04a54e42d 100644 --- a/table/format.cc +++ b/table/format.cc @@ -304,12 +304,12 @@ Status ReadFooterFromFile(RandomAccessFileReader* file, !prefetch_buffer->TryReadFromCache(read_offset, Footer::kMaxEncodedLength, &footer_input)) { if (file->use_direct_io()) { - s = file->Read(read_offset, Footer::kMaxEncodedLength, &footer_input, - nullptr, &internal_buf); + s = file->Read(IOOptions(), read_offset, Footer::kMaxEncodedLength, + &footer_input, nullptr, &internal_buf); } else { footer_buf.reserve(Footer::kMaxEncodedLength); - s = file->Read(read_offset, Footer::kMaxEncodedLength, &footer_input, - &footer_buf[0], nullptr); + s = 
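
With the NotSupported guard removed from DBImpl::GetImpl earlier in this diff, and BlockFetcher::ReadBlockContents() and RetrieveMultipleBlocks() now consulting PrepareIOFromReadOptions() before touching the file, a point read whose deadline has lapsed is expected to fail fast rather than block on IO. A hedged end-to-end illustration, with an arbitrary key and budget:

// Illustrative only: behavior expected from the hunks above. Per the
// ReadOptions documentation this is best effort; the call may still run
// past the deadline if the FileSystem ignores IOOptions::timeout.
ReadOptions ro;
ro.deadline = std::chrono::microseconds{env->NowMicros() + 1000};  // 1ms

std::string value;
Status s = db->Get(ro, "some_key", &value);
if (s.IsTimedOut()) {
  // Either PrepareIOFromReadOptions() saw the deadline expire before a block
  // read, or the FileSystem returned TimedOut for the timeout it was handed;
  // in both cases no further block reads are expected for this Get.
}
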
file->Read(IOOptions(), read_offset, Footer::kMaxEncodedLength, + &footer_input, &footer_buf[0], nullptr); } if (!s.ok()) return s; } diff --git a/table/mock_table.cc b/table/mock_table.cc index aabde104b..a1a5ed8d7 100644 --- a/table/mock_table.cc +++ b/table/mock_table.cc @@ -114,7 +114,7 @@ uint32_t MockTableFactory::GetAndWriteNextID(WritableFileWriter* file) const { uint32_t MockTableFactory::GetIDFromFile(RandomAccessFileReader* file) const { char buf[4]; Slice result; - file->Read(0, 4, &result, buf, nullptr); + file->Read(IOOptions(), 0, 4, &result, buf, nullptr); assert(result.size() == 4); return DecodeFixed32(buf); } diff --git a/table/plain/plain_table_key_coding.cc b/table/plain/plain_table_key_coding.cc index c88a2dff6..6c7b3c751 100644 --- a/table/plain/plain_table_key_coding.cc +++ b/table/plain/plain_table_key_coding.cc @@ -210,8 +210,9 @@ bool PlainTableFileReader::ReadNonMmap(uint32_t file_offset, uint32_t len, new_buffer->buf_len = 0; } Slice read_result; - Status s = file_info_->file->Read(file_offset, size_to_read, &read_result, - new_buffer->buf.get(), nullptr); + Status s = + file_info_->file->Read(IOOptions(), file_offset, size_to_read, + &read_result, new_buffer->buf.get(), nullptr); if (!s.ok()) { status_ = s; return false; diff --git a/table/plain/plain_table_reader.cc b/table/plain/plain_table_reader.cc index 484a1105b..052cf45d6 100644 --- a/table/plain/plain_table_reader.cc +++ b/table/plain/plain_table_reader.cc @@ -289,7 +289,8 @@ void PlainTableReader::FillBloom(const std::vector& prefix_hashes) { Status PlainTableReader::MmapDataIfNeeded() { if (file_info_.is_mmap_mode) { // Get mmapped memory. - return file_info_.file->Read(0, static_cast(file_size_), + return file_info_.file->Read(IOOptions(), 0, + static_cast(file_size_), &file_info_.file_data, nullptr, nullptr); } return Status::OK(); diff --git a/table/table_test.cc b/table/table_test.cc index 17f8d1b9c..1efa3089d 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -1264,16 +1264,16 @@ class FileChecksumTestHelper { Slice result; uint64_t offset = 0; Status s; - s = file_reader_->Read(offset, 2048, &result, scratch.get(), nullptr, - false); + s = file_reader_->Read(IOOptions(), offset, 2048, &result, scratch.get(), + nullptr, false); if (!s.ok()) { return s; } while (result.size() != 0) { file_checksum_generator->Update(scratch.get(), result.size()); offset += static_cast(result.size()); - s = file_reader_->Read(offset, 2048, &result, scratch.get(), nullptr, - false); + s = file_reader_->Read(IOOptions(), offset, 2048, &result, scratch.get(), + nullptr, false); if (!s.ok()) { return s; } diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 70c2f4197..bf3ad6426 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -1491,12 +1491,14 @@ Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number, { StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS); if (reader->use_direct_io()) { - s = reader->Read(record_offset, static_cast(record_size), - &blob_record, nullptr, &aligned_buf); + s = reader->Read(IOOptions(), record_offset, + static_cast(record_size), &blob_record, nullptr, + &aligned_buf); } else { buf.reserve(static_cast(record_size)); - s = reader->Read(record_offset, static_cast(record_size), - &blob_record, &buf[0], nullptr); + s = reader->Read(IOOptions(), record_offset, + static_cast(record_size), &blob_record, &buf[0], + nullptr); } RecordTick(statistics_, 
BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size()); } diff --git a/utilities/blob_db/blob_dump_tool.cc b/utilities/blob_db/blob_dump_tool.cc index 26f9180ab..ccaee5845 100644 --- a/utilities/blob_db/blob_dump_tool.cc +++ b/utilities/blob_db/blob_dump_tool.cc @@ -101,7 +101,8 @@ Status BlobDumpTool::Read(uint64_t offset, size_t size, Slice* result) { } buffer_.reset(new char[buffer_size_]); } - Status s = reader_->Read(offset, size, result, buffer_.get(), nullptr); + Status s = + reader_->Read(IOOptions(), offset, size, result, buffer_.get(), nullptr); if (!s.ok()) { return s; } diff --git a/utilities/blob_db/blob_file.cc b/utilities/blob_db/blob_file.cc index 3db6cfc24..af5742f15 100644 --- a/utilities/blob_db/blob_file.cc +++ b/utilities/blob_db/blob_file.cc @@ -142,12 +142,12 @@ Status BlobFile::ReadFooter(BlobLogFooter* bf) { AlignedBuf aligned_buf; Status s; if (ra_file_reader_->use_direct_io()) { - s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kSize, &result, - nullptr, &aligned_buf); + s = ra_file_reader_->Read(IOOptions(), footer_offset, BlobLogFooter::kSize, + &result, nullptr, &aligned_buf); } else { buf.reserve(BlobLogFooter::kSize + 10); - s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kSize, &result, - &buf[0], nullptr); + s = ra_file_reader_->Read(IOOptions(), footer_offset, BlobLogFooter::kSize, + &result, &buf[0], nullptr); } if (!s.ok()) return s; if (result.size() != BlobLogFooter::kSize) { @@ -266,11 +266,11 @@ Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) { AlignedBuf aligned_buf; Slice header_slice; if (file_reader->use_direct_io()) { - s = file_reader->Read(0, BlobLogHeader::kSize, &header_slice, nullptr, - &aligned_buf); + s = file_reader->Read(IOOptions(), 0, BlobLogHeader::kSize, &header_slice, + nullptr, &aligned_buf); } else { header_buf.reserve(BlobLogHeader::kSize); - s = file_reader->Read(0, BlobLogHeader::kSize, &header_slice, + s = file_reader->Read(IOOptions(), 0, BlobLogHeader::kSize, &header_slice, &header_buf[0], nullptr); } if (!s.ok()) { @@ -306,12 +306,12 @@ Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) { std::string footer_buf; Slice footer_slice; if (file_reader->use_direct_io()) { - s = file_reader->Read(file_size - BlobLogFooter::kSize, + s = file_reader->Read(IOOptions(), file_size - BlobLogFooter::kSize, BlobLogFooter::kSize, &footer_slice, nullptr, &aligned_buf); } else { footer_buf.reserve(BlobLogFooter::kSize); - s = file_reader->Read(file_size - BlobLogFooter::kSize, + s = file_reader->Read(IOOptions(), file_size - BlobLogFooter::kSize, BlobLogFooter::kSize, &footer_slice, &footer_buf[0], nullptr); } diff --git a/utilities/blob_db/blob_log_reader.cc b/utilities/blob_db/blob_log_reader.cc index 913e9282d..31be72d76 100644 --- a/utilities/blob_db/blob_log_reader.cc +++ b/utilities/blob_db/blob_log_reader.cc @@ -26,8 +26,8 @@ Reader::Reader(std::unique_ptr&& file_reader, Env* env, Status Reader::ReadSlice(uint64_t size, Slice* slice, char* buf) { StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS); - Status s = - file_->Read(next_byte_, static_cast(size), slice, buf, nullptr); + Status s = file_->Read(IOOptions(), next_byte_, static_cast(size), + slice, buf, nullptr); next_byte_ += size; if (!s.ok()) { return s; diff --git a/utilities/persistent_cache/block_cache_tier_file.cc b/utilities/persistent_cache/block_cache_tier_file.cc index cde84bf71..d2f92a98f 100644 --- a/utilities/persistent_cache/block_cache_tier_file.cc +++ 
b/utilities/persistent_cache/block_cache_tier_file.cc @@ -235,7 +235,8 @@ bool RandomAccessCacheFile::Read(const LBA& lba, Slice* key, Slice* val, } Slice result; - Status s = freader_->Read(lba.off_, lba.size_, &result, scratch, nullptr); + Status s = freader_->Read(IOOptions(), lba.off_, lba.size_, &result, scratch, + nullptr); if (!s.ok()) { Error(log_, "Error reading from file %s. %s", Path().c_str(), s.ToString().c_str()); diff --git a/utilities/trace/file_trace_reader_writer.cc b/utilities/trace/file_trace_reader_writer.cc index e9220579c..77932dd29 100644 --- a/utilities/trace/file_trace_reader_writer.cc +++ b/utilities/trace/file_trace_reader_writer.cc @@ -33,8 +33,8 @@ Status FileTraceReader::Close() { Status FileTraceReader::Read(std::string* data) { assert(file_reader_ != nullptr); - Status s = file_reader_->Read(offset_, kTraceMetadataSize, &result_, buffer_, - nullptr); + Status s = file_reader_->Read(IOOptions(), offset_, kTraceMetadataSize, + &result_, buffer_, nullptr); if (!s.ok()) { return s; } @@ -58,7 +58,8 @@ Status FileTraceReader::Read(std::string* data) { unsigned int to_read = bytes_to_read > kBufferSize ? kBufferSize : bytes_to_read; while (to_read > 0) { - s = file_reader_->Read(offset_, to_read, &result_, buffer_, nullptr); + s = file_reader_->Read(IOOptions(), offset_, to_read, &result_, buffer_, + nullptr); if (!s.ok()) { return s; }