// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). // // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "file/sequence_file_reader.h" #include #include #include "file/read_write_util.h" #include "monitoring/histogram.h" #include "monitoring/iostats_context_imp.h" #include "port/port.h" #include "test_util/sync_point.h" #include "util/aligned_buffer.h" #include "util/random.h" #include "util/rate_limiter.h" namespace ROCKSDB_NAMESPACE { IOStatus SequentialFileReader::Create( const std::shared_ptr& fs, const std::string& fname, const FileOptions& file_opts, std::unique_ptr* reader, IODebugContext* dbg) { std::unique_ptr file; IOStatus io_s = fs->NewSequentialFile(fname, file_opts, &file, dbg); if (io_s.ok()) { reader->reset(new SequentialFileReader(std::move(file), fname)); } return io_s; } IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { IOStatus io_s; if (use_direct_io()) { #ifndef ROCKSDB_LITE size_t offset = offset_.fetch_add(n); size_t alignment = file_->GetRequiredBufferAlignment(); size_t aligned_offset = TruncateToPageBoundary(alignment, offset); size_t offset_advance = offset - aligned_offset; size_t size = Roundup(offset + n, alignment) - aligned_offset; size_t r = 0; AlignedBuffer buf; buf.Alignment(alignment); buf.AllocateNewBuffer(size); Slice tmp; uint64_t orig_offset = 0; FileOperationInfo::StartTimePoint start_ts; if (ShouldNotifyListeners()) { orig_offset = aligned_offset + buf.CurrentSize(); start_ts = FileOperationInfo::StartNow(); } io_s = file_->PositionedRead(aligned_offset, size, IOOptions(), &tmp, buf.BufferStart(), nullptr); if (io_s.ok() && offset_advance < tmp.size()) { buf.Size(tmp.size()); r = buf.Read(scratch, offset_advance, std::min(tmp.size() - offset_advance, n)); } *result = Slice(scratch, r); if (ShouldNotifyListeners()) { auto finish_ts = FileOperationInfo::FinishNow(); NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts, io_s); } #endif // !ROCKSDB_LITE } else { // To be paranoid, modify scratch a little bit, so in case underlying // FileSystem doesn't fill the buffer but return succee and `scratch` // returns contains a previous block, returned value will not pass // checksum. // It's hard to find useful byte for direct I/O case, so we skip it. if (n > 0 && scratch != nullptr) { scratch[0]++; } #ifndef ROCKSDB_LITE FileOperationInfo::StartTimePoint start_ts; if (ShouldNotifyListeners()) { start_ts = FileOperationInfo::StartNow(); } #endif io_s = file_->Read(n, IOOptions(), result, scratch, nullptr); #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = FileOperationInfo::FinishNow(); size_t offset = offset_.fetch_add(result->size()); NotifyOnFileReadFinish(offset, result->size(), start_ts, finish_ts, io_s); } #endif } IOSTATS_ADD(bytes_read, result->size()); return io_s; } IOStatus SequentialFileReader::Skip(uint64_t n) { #ifndef ROCKSDB_LITE if (use_direct_io()) { offset_ += static_cast(n); return IOStatus::OK(); } #endif // !ROCKSDB_LITE return file_->Skip(n); } namespace { // This class wraps a SequentialFile, exposing same API, with the differenece // of being able to prefetch up to readahead_size bytes and then serve them // from memory, avoiding the entire round-trip if, for example, the data for the // file is actually remote. class ReadaheadSequentialFile : public FSSequentialFile { public: ReadaheadSequentialFile(std::unique_ptr&& file, size_t readahead_size) : file_(std::move(file)), alignment_(file_->GetRequiredBufferAlignment()), readahead_size_(Roundup(readahead_size, alignment_)), buffer_(), buffer_offset_(0), read_offset_(0) { buffer_.Alignment(alignment_); buffer_.AllocateNewBuffer(readahead_size_); } ReadaheadSequentialFile(const ReadaheadSequentialFile&) = delete; ReadaheadSequentialFile& operator=(const ReadaheadSequentialFile&) = delete; IOStatus Read(size_t n, const IOOptions& opts, Slice* result, char* scratch, IODebugContext* dbg) override { std::unique_lock lk(lock_); size_t cached_len = 0; // Check if there is a cache hit, meaning that [offset, offset + n) is // either completely or partially in the buffer. If it's completely cached, // including end of file case when offset + n is greater than EOF, then // return. if (TryReadFromCache(n, &cached_len, scratch) && (cached_len == n || buffer_.CurrentSize() < readahead_size_)) { // We read exactly what we needed, or we hit end of file - return. *result = Slice(scratch, cached_len); return IOStatus::OK(); } n -= cached_len; IOStatus s; // Read-ahead only make sense if we have some slack left after reading if (n + alignment_ >= readahead_size_) { s = file_->Read(n, opts, result, scratch + cached_len, dbg); if (s.ok()) { read_offset_ += result->size(); *result = Slice(scratch, cached_len + result->size()); } buffer_.Clear(); return s; } s = ReadIntoBuffer(readahead_size_, opts, dbg); if (s.ok()) { // The data we need is now in cache, so we can safely read it size_t remaining_len; TryReadFromCache(n, &remaining_len, scratch + cached_len); *result = Slice(scratch, cached_len + remaining_len); } return s; } IOStatus Skip(uint64_t n) override { std::unique_lock lk(lock_); IOStatus s = IOStatus::OK(); // First check if we need to skip already cached data if (buffer_.CurrentSize() > 0) { // Do we need to skip beyond cached data? if (read_offset_ + n >= buffer_offset_ + buffer_.CurrentSize()) { // Yes. Skip whaterver is in memory and adjust offset accordingly n -= buffer_offset_ + buffer_.CurrentSize() - read_offset_; read_offset_ = buffer_offset_ + buffer_.CurrentSize(); } else { // No. The entire section to be skipped is entirely i cache. read_offset_ += n; n = 0; } } if (n > 0) { // We still need to skip more, so call the file API for skipping s = file_->Skip(n); if (s.ok()) { read_offset_ += n; } buffer_.Clear(); } return s; } IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& opts, Slice* result, char* scratch, IODebugContext* dbg) override { return file_->PositionedRead(offset, n, opts, result, scratch, dbg); } IOStatus InvalidateCache(size_t offset, size_t length) override { std::unique_lock lk(lock_); buffer_.Clear(); return file_->InvalidateCache(offset, length); } bool use_direct_io() const override { return file_->use_direct_io(); } private: // Tries to read from buffer_ n bytes. If anything was read from the cache, it // sets cached_len to the number of bytes actually read, copies these number // of bytes to scratch and returns true. // If nothing was read sets cached_len to 0 and returns false. bool TryReadFromCache(size_t n, size_t* cached_len, char* scratch) { if (read_offset_ < buffer_offset_ || read_offset_ >= buffer_offset_ + buffer_.CurrentSize()) { *cached_len = 0; return false; } uint64_t offset_in_buffer = read_offset_ - buffer_offset_; *cached_len = std::min( buffer_.CurrentSize() - static_cast(offset_in_buffer), n); memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len); read_offset_ += *cached_len; return true; } // Reads into buffer_ the next n bytes from file_. // Can actually read less if EOF was reached. // Returns the status of the read operastion on the file. IOStatus ReadIntoBuffer(size_t n, const IOOptions& opts, IODebugContext* dbg) { if (n > buffer_.Capacity()) { n = buffer_.Capacity(); } assert(IsFileSectorAligned(n, alignment_)); Slice result; IOStatus s = file_->Read(n, opts, &result, buffer_.BufferStart(), dbg); if (s.ok()) { buffer_offset_ = read_offset_; buffer_.Size(result.size()); assert(result.size() == 0 || buffer_.BufferStart() == result.data()); } return s; } const std::unique_ptr file_; const size_t alignment_; const size_t readahead_size_; std::mutex lock_; // The buffer storing the prefetched data AlignedBuffer buffer_; // The offset in file_, corresponding to data stored in buffer_ uint64_t buffer_offset_; // The offset up to which data was read from file_. In fact, it can be larger // than the actual file size, since the file_->Skip(n) call doesn't return the // actual number of bytes that were skipped, which can be less than n. // This is not a problemm since read_offset_ is monotonically increasing and // its only use is to figure out if next piece of data should be read from // buffer_ or file_ directly. uint64_t read_offset_; }; } // namespace std::unique_ptr SequentialFileReader::NewReadaheadSequentialFile( std::unique_ptr&& file, size_t readahead_size) { if (file->GetRequiredBufferAlignment() >= readahead_size) { // Short-circuit and return the original file if readahead_size is // too small and hence doesn't make sense to be used for prefetching. return std::move(file); } std::unique_ptr result( new ReadaheadSequentialFile(std::move(file), readahead_size)); return result; } } // namespace ROCKSDB_NAMESPACE