// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). // // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "file/random_access_file_reader.h" #include #include #include "monitoring/histogram.h" #include "monitoring/iostats_context_imp.h" #include "port/port.h" #include "test_util/sync_point.h" #include "util/random.h" #include "util/rate_limiter.h" namespace ROCKSDB_NAMESPACE { Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, char* scratch, std::unique_ptr* internal_buf, bool for_compaction) const { (void) internal_buf; Status s; uint64_t elapsed = 0; { StopWatch sw(env_, stats_, hist_type_, (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/, true /*delay_enabled*/); auto prev_perf_level = GetPerfLevel(); IOSTATS_TIMER_GUARD(read_nanos); if (use_direct_io()) { #ifndef ROCKSDB_LITE size_t alignment = file_->GetRequiredBufferAlignment(); size_t aligned_offset = TruncateToPageBoundary(alignment, static_cast(offset)); size_t offset_advance = static_cast(offset) - aligned_offset; size_t read_size = Roundup(static_cast(offset + n), alignment) - aligned_offset; AlignedBuffer buf; buf.Alignment(alignment); buf.AllocateNewBuffer(read_size); while (buf.CurrentSize() < read_size) { size_t allowed; if (for_compaction && rate_limiter_ != nullptr) { allowed = rate_limiter_->RequestToken( buf.Capacity() - buf.CurrentSize(), buf.Alignment(), Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead); } else { assert(buf.CurrentSize() == 0); allowed = read_size; } Slice tmp; FileOperationInfo::TimePoint start_ts; uint64_t orig_offset = 0; if (ShouldNotifyListeners()) { start_ts = std::chrono::system_clock::now(); orig_offset = aligned_offset + buf.CurrentSize(); } { IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_); s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, IOOptions(), &tmp, buf.Destination(), nullptr); } if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::system_clock::now(); NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts, s); } buf.Size(buf.CurrentSize() + tmp.size()); if (!s.ok() || tmp.size() < allowed) { break; } } size_t res_len = 0; if (s.ok() && offset_advance < buf.CurrentSize()) { res_len = std::min(buf.CurrentSize() - offset_advance, n); if (internal_buf == nullptr) { buf.Read(scratch, offset_advance, res_len); } else { scratch = buf.BufferStart(); internal_buf->reset(buf.Release()); } } *result = Slice(scratch, res_len); #endif // !ROCKSDB_LITE } else { size_t pos = 0; const char* res_scratch = nullptr; while (pos < n) { size_t allowed; if (for_compaction && rate_limiter_ != nullptr) { if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) { sw.DelayStart(); } allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */, Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead); if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) { sw.DelayStop(); } } else { allowed = n; } Slice tmp_result; #ifndef ROCKSDB_LITE FileOperationInfo::TimePoint start_ts; if (ShouldNotifyListeners()) { start_ts = std::chrono::system_clock::now(); } #endif { IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_); s = file_->Read(offset + pos, allowed, IOOptions(), &tmp_result, scratch + pos, nullptr); } #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::system_clock::now(); NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts, finish_ts, s); } #endif if (res_scratch == nullptr) { // we can't simply use `scratch` because reads of mmap'd files return // data in a different buffer. res_scratch = tmp_result.data(); } else { // make sure chunks are inserted contiguously into `res_scratch`. assert(tmp_result.data() == res_scratch + pos); } pos += tmp_result.size(); if (!s.ok() || tmp_result.size() < allowed) { break; } } *result = Slice(res_scratch, s.ok() ? pos : 0); } IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size()); SetPerfLevel(prev_perf_level); } if (stats_ != nullptr && file_read_hist_ != nullptr) { file_read_hist_->Add(elapsed); } return s; } Status RandomAccessFileReader::MultiRead(FSReadRequest* read_reqs, size_t num_reqs) const { Status s; uint64_t elapsed = 0; assert(!use_direct_io()); { StopWatch sw(env_, stats_, hist_type_, (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/, true /*delay_enabled*/); auto prev_perf_level = GetPerfLevel(); IOSTATS_TIMER_GUARD(read_nanos); #ifndef ROCKSDB_LITE FileOperationInfo::TimePoint start_ts; if (ShouldNotifyListeners()) { start_ts = std::chrono::system_clock::now(); } #endif // ROCKSDB_LITE { IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_); s = file_->MultiRead(read_reqs, num_reqs, IOOptions(), nullptr); } for (size_t i = 0; i < num_reqs; ++i) { #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::system_clock::now(); NotifyOnFileReadFinish(read_reqs[i].offset, read_reqs[i].result.size(), start_ts, finish_ts, read_reqs[i].status); } #endif // ROCKSDB_LITE IOSTATS_ADD_IF_POSITIVE(bytes_read, read_reqs[i].result.size()); } SetPerfLevel(prev_perf_level); } if (stats_ != nullptr && file_read_hist_ != nullptr) { file_read_hist_->Add(elapsed); } return s; } } // namespace ROCKSDB_NAMESPACE