diff --git a/db/db_bench.cc b/db/db_bench.cc index 9e11b56e8..9ba04a24b 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -373,6 +373,8 @@ DEFINE_int32(new_table_reader_for_compaction_inputs, true, DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size"); +DEFINE_int32(random_access_max_buffer_size, 1024 * 1024, "Maximum windows randomaccess buffer size"); + DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means" " use default settings."); DEFINE_int32(memtable_bloom_bits, 0, "Bloom filter bits per key for memtable. " @@ -2295,6 +2297,7 @@ class Benchmark { options.new_table_reader_for_compaction_inputs = FLAGS_new_table_reader_for_compaction_inputs; options.compaction_readahead_size = FLAGS_compaction_readahead_size; + options.random_access_max_buffer_size = FLAGS_random_access_max_buffer_size; options.statistics = dbstats; if (FLAGS_enable_io_prio) { FLAGS_env->LowerThreadPoolIOPriority(Env::LOW); diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 1dfd0f997..7290d4b1d 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -88,6 +88,12 @@ struct EnvOptions { // WAL writes bool fallocate_with_keep_size = true; + // See DBOPtions doc + size_t compaction_readahead_size; + + // See DBOPtions doc + size_t random_access_max_buffer_size; + // If not nullptr, write rate limiting is enabled for flush and compaction RateLimiter* rate_limiter = nullptr; }; @@ -408,6 +414,11 @@ class RandomAccessFile { return false; } + // For cases when read-ahead is implemented in the platform dependent + // layer + virtual void EnableReadAhead() { + } + // Tries to get an unique ID for this file that will be the same each time // the file is opened (and will stay the same while the file is open). // Furthermore, it tries to make this ID at most "max_size" bytes. If such an diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index a9830221a..470faa52f 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1074,6 +1074,20 @@ struct DBOptions { // Default: 0 size_t compaction_readahead_size; + // This is a maximum buffer size that is used by WinMmapReadableFile in + // unbuffered disk I/O mode. We need to maintain an aligned buffer for + // reads. We allow the buffer to grow until the specified value and then + // for bigger requests allocate one shot buffers. In unbuffered mode we + // always bypass read-ahead buffer at ReadaheadRandomAccessFile + // When read-ahead is required we then make use of compaction_readahead_size + // value and always try to read ahead. With read-ahead we always + // pre-allocate buffer to the size instead of growing it up to a limit. + // + // This option is currently honored only on Windows + // + // Default: 1 Mb + size_t random_access_max_buffer_size; + // Use adaptive mutex, which spins in the user space before resorting // to kernel. This could reduce context switch when the mutex is not // heavily contended. However, if the mutex is hot, we could end up diff --git a/port/win/env_win.cc b/port/win/env_win.cc index c55cd3039..b67298150 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -688,28 +688,98 @@ class WinRandomAccessFile : public RandomAccessFile { const std::string filename_; HANDLE hFile_; const bool use_os_buffer_; + bool read_ahead_; + const size_t compaction_readahead_size_; + const size_t random_access_max_buffer_size_; mutable std::mutex buffer_mut_; mutable AlignedBuffer buffer_; mutable uint64_t buffered_start_; // file offset set that is currently buffered + /* + * The function reads a requested amount of bytes into the specified aligned buffer + * Upon success the function sets the length of the buffer to the amount of bytes actually + * read even though it might be less than actually requested. + * It then copies the amount of bytes requested by the user (left) to the user supplied + * buffer (dest) and reduces left by the amount of bytes copied to the user buffer + * + * @user_offset [in] - offset on disk where the read was requested by the user + * @first_page_start [in] - actual page aligned disk offset that we want to read from + * @bytes_to_read [in] - total amount of bytes that will be read from disk which is generally + * greater or equal to the amount that the user has requested due to the + * either alignment requirements or read_ahead in effect. + * @left [in/out] total amount of bytes that needs to be copied to the user buffer. It is reduced + * by the amount of bytes that actually copied + * @buffer - buffer to use + * @dest - user supplied buffer + */ + SSIZE_T ReadIntoBuffer(uint64_t user_offset, uint64_t first_page_start, + size_t bytes_to_read, size_t& left, AlignedBuffer& buffer, char* dest) const { + + assert(buffer.CurrentSize() == 0); + assert(buffer.Capacity() >= bytes_to_read); + + SSIZE_T read = pread(hFile_, buffer.Destination(), bytes_to_read, + first_page_start); + + if (read > 0) { + buffer.Size(read); + + // Let's figure out how much we read from the users standpoint + if ((first_page_start + buffer.CurrentSize()) > user_offset) { + assert(first_page_start <= user_offset); + size_t buffer_offset = user_offset - first_page_start; + read = buffer.Read(dest, buffer_offset, left); + } else { + read = 0; + } + left -= read; + } + return read; + } + + SSIZE_T ReadIntoOneShotBuffer(uint64_t user_offset, uint64_t first_page_start, + size_t bytes_to_read, size_t& left, char* dest) const { + + AlignedBuffer bigBuffer; + bigBuffer.Alignment(buffer_.Alignment()); + bigBuffer.AllocateNewBuffer(bytes_to_read); + + return ReadIntoBuffer(user_offset, first_page_start, bytes_to_read, left, + bigBuffer, dest); + } + + SSIZE_T ReadIntoInstanceBuffer(uint64_t user_offset, uint64_t first_page_start, + size_t bytes_to_read, size_t& left, char* dest) const { + + SSIZE_T read = ReadIntoBuffer(user_offset, first_page_start, bytes_to_read, + left, buffer_, dest); + + if (read > 0) { + buffered_start_ = first_page_start; + } + + return read; + } + public: WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment, const EnvOptions& options) : filename_(fname), hFile_(hFile), use_os_buffer_(options.use_os_buffer), + read_ahead_(false), + compaction_readahead_size_(options.compaction_readahead_size), + random_access_max_buffer_size_(options.random_access_max_buffer_size), buffer_(), buffered_start_(0) { assert(!options.use_mmap_reads); // Unbuffered access, use internal buffer for reads if (!use_os_buffer_) { + // Do not allocate the buffer either until the first request or + // until there is a call to allocate a read-ahead buffer buffer_.Alignment(alignment); - // Random read, no need in a big buffer - // We read things in database blocks which are likely to be similar to - // the alignment we use. - buffer_.AllocateNewBuffer(alignment * 2); } } @@ -719,6 +789,10 @@ class WinRandomAccessFile : public RandomAccessFile { } } + virtual void EnableReadAhead() override { + this->Hint(SEQUENTIAL); + } + virtual Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const override { Status s; @@ -730,7 +804,7 @@ class WinRandomAccessFile : public RandomAccessFile { // - use our own aligned buffer // - always read at the offset of that is a multiple of alignment if (!use_os_buffer_) { - std::lock_guard lg(buffer_mut_); + std::unique_lock lock(buffer_mut_); // Let's see if at least some of the requested data is already // in the buffer @@ -749,40 +823,40 @@ class WinRandomAccessFile : public RandomAccessFile { if (left > 0) { // Figure out the start/end offset for reading and amount to read const size_t alignment = buffer_.Alignment(); - const size_t start_page_start = - TruncateToPageBoundary(alignment, offset); - const size_t end_page_start = - TruncateToPageBoundary(alignment, offset + left - 1); - const size_t actual_bytes_toread = - (end_page_start - start_page_start) + alignment; + const size_t first_page_start = + TruncateToPageBoundary(alignment, offset); - if (buffer_.Capacity() < actual_bytes_toread) { - buffer_.AllocateNewBuffer(actual_bytes_toread); - } else { - buffer_.Clear(); + size_t bytes_requested = left; + if (read_ahead_ && bytes_requested < compaction_readahead_size_) { + bytes_requested = compaction_readahead_size_; } - SSIZE_T read = 0; - read = pread(hFile_, buffer_.Destination(), actual_bytes_toread, - start_page_start); - - if (read > 0) { - buffer_.Size(read); - buffered_start_ = start_page_start; + const size_t last_page_start = + TruncateToPageBoundary(alignment, offset + bytes_requested - 1); + const size_t actual_bytes_toread = + (last_page_start - first_page_start) + alignment; - // Let's figure out how much we read from the users standpoint - if ((buffered_start_ + uint64_t(read)) > offset) { - size_t buffer_offset = offset - buffered_start_; - r = buffer_.Read(dest, buffer_offset, left); + if (buffer_.Capacity() < actual_bytes_toread) { + // If we are in read-ahead mode or the requested size + // exceeds max buffer size then use one-shot + // big buffer otherwise reallocate main buffer + if (read_ahead_ || + (actual_bytes_toread > random_access_max_buffer_size_)) { + // Unlock the mutex since we are not using instance buffer + lock.unlock(); + r = ReadIntoOneShotBuffer(offset, first_page_start, + actual_bytes_toread, left, dest); } else { - r = 0; + buffer_.AllocateNewBuffer(actual_bytes_toread); + r = ReadIntoInstanceBuffer(offset, first_page_start, + actual_bytes_toread, left, dest); } - left -= r; } else { - r = read; + buffer_.Clear(); + r = ReadIntoInstanceBuffer(offset, first_page_start, + actual_bytes_toread, left, dest); } } - } else { r = pread(hFile_, scratch, left, offset); if (r > 0) { @@ -802,7 +876,23 @@ class WinRandomAccessFile : public RandomAccessFile { return true; } - virtual void Hint(AccessPattern pattern) override {} + virtual void Hint(AccessPattern pattern) override { + + if (pattern == SEQUENTIAL && + !use_os_buffer_ && + compaction_readahead_size_ > 0) { + std::lock_guard lg(buffer_mut_); + if (!read_ahead_) { + read_ahead_ = true; + // This would allocate read-ahead size + 2 alignments + // - one for memory alignment which added implicitly by AlignedBuffer + // - We add one more alignment because we will read one alignment more + // from disk + buffer_.AllocateNewBuffer(compaction_readahead_size_ + buffer_.Alignment()); + } + } + } + virtual Status InvalidateCache(size_t offset, size_t length) override { return Status::OK(); diff --git a/util/env.cc b/util/env.cc index df45d9804..2c2339eaf 100644 --- a/util/env.cc +++ b/util/env.cc @@ -292,6 +292,8 @@ void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) { env_options->use_mmap_writes = options.allow_mmap_writes; env_options->set_fd_cloexec = options.is_fd_close_on_exec; env_options->bytes_per_sync = options.bytes_per_sync; + env_options->compaction_readahead_size = options.compaction_readahead_size; + env_options->random_access_max_buffer_size = options.random_access_max_buffer_size; env_options->rate_limiter = options.rate_limiter.get(); env_options->allow_fallocate = options.allow_fallocate; } diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index ff459262c..95e8c3998 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -384,9 +384,15 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { : file_(std::move(file)), readahead_size_(readahead_size), forward_calls_(file_->ShouldForwardRawRequest()), - buffer_(new char[readahead_size_]), + buffer_(), buffer_offset_(0), - buffer_len_(0) {} + buffer_len_(0) { + if (!forward_calls_) { + buffer_.reset(new char[readahead_size_]); + } else if (readahead_size_ > 0) { + file_->EnableReadAhead(); + } + } ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete; diff --git a/util/options.cc b/util/options.cc index 70bcf7063..ff7f5b2ce 100644 --- a/util/options.cc +++ b/util/options.cc @@ -249,6 +249,7 @@ DBOptions::DBOptions() access_hint_on_compaction_start(NORMAL), new_table_reader_for_compaction_inputs(false), compaction_readahead_size(0), + random_access_max_buffer_size(1024 * 1024), use_adaptive_mutex(false), bytes_per_sync(0), wal_bytes_per_sync(0), @@ -305,6 +306,7 @@ DBOptions::DBOptions(const Options& options) new_table_reader_for_compaction_inputs( options.new_table_reader_for_compaction_inputs), compaction_readahead_size(options.compaction_readahead_size), + random_access_max_buffer_size(options.random_access_max_buffer_size), use_adaptive_mutex(options.use_adaptive_mutex), bytes_per_sync(options.bytes_per_sync), wal_bytes_per_sync(options.wal_bytes_per_sync), @@ -393,6 +395,10 @@ void DBOptions::Dump(Logger* log) const { " Options.compaction_readahead_size: %" ROCKSDB_PRIszt "d", compaction_readahead_size); + Header(log, + " Options.random_access_max_buffer_size: %" ROCKSDB_PRIszt + "d", + random_access_max_buffer_size); Header(log, " Options.use_adaptive_mutex: %d", use_adaptive_mutex); Header(log, " Options.rate_limiter: %p", diff --git a/util/options_helper.h b/util/options_helper.h index 3f6eab40a..052bafd25 100644 --- a/util/options_helper.h +++ b/util/options_helper.h @@ -180,6 +180,9 @@ static std::unordered_map db_options_type_info = { {"compaction_readahead_size", {offsetof(struct DBOptions, compaction_readahead_size), OptionType::kSizeT, OptionVerificationType::kNormal}}, + {"random_access_max_buffer_size", + { offsetof(struct DBOptions, random_access_max_buffer_size), OptionType::kSizeT, + OptionVerificationType::kNormal}}, {"use_adaptive_mutex", {offsetof(struct DBOptions, use_adaptive_mutex), OptionType::kBoolean, OptionVerificationType::kNormal}}, diff --git a/util/options_test.cc b/util/options_test.cc index e1849bb3f..542151adb 100644 --- a/util/options_test.cc +++ b/util/options_test.cc @@ -339,6 +339,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"use_adaptive_mutex", "false"}, {"new_table_reader_for_compaction_inputs", "true"}, {"compaction_readahead_size", "100"}, + {"random_access_max_buffer_size", "3145728" }, {"bytes_per_sync", "47"}, {"wal_bytes_per_sync", "48"}, }; @@ -449,6 +450,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.use_adaptive_mutex, false); ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true); ASSERT_EQ(new_db_opt.compaction_readahead_size, 100); + ASSERT_EQ(new_db_opt.random_access_max_buffer_size, 3145728); ASSERT_EQ(new_db_opt.bytes_per_sync, static_cast(47)); ASSERT_EQ(new_db_opt.wal_bytes_per_sync, static_cast(48)); }