Implement smart buffer management.

  Introduce a new DBOptions setting, random_access_max_buffer_size, to limit
  the size of the random-access buffer used for unbuffered access.
  Implement read-ahead buffering when it is enabled.
  To that effect, propagate compaction_readahead_size and the new option
  to the EnvOptions so they are available to the implementation.
  Add a Hint() override so the SetupForCompaction() call invokes Hint();
  read-ahead can now be set up from both Hint() and EnableReadAhead().
  Add support for the new random_access_max_buffer_size option to db_bench
  and options_helper (making it string-parsable), and add a unit test.
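
For orientation before the diff, here is a minimal sketch of how the two knobs are meant to be used together. The field names come from this commit; the database path and sizes are illustrative only.

    #include <cassert>
    #include "rocksdb/db.h"
    #include "rocksdb/options.h"

    int main() {
      rocksdb::Options options;
      options.create_if_missing = true;
      // Ask compactions to read ahead; on Windows with unbuffered I/O this
      // also sizes the internal read-ahead buffer (see Hint() in env_win.cc).
      options.compaction_readahead_size = 2 * 1024 * 1024;
      // Cap the aligned buffer used for random reads; larger aligned requests
      // fall back to one-shot buffers (the default cap is 1 MB).
      options.random_access_max_buffer_size = 1024 * 1024;

      rocksdb::DB* db = nullptr;
      rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/testdb", &db);
      assert(s.ok());
      delete db;
      return 0;
    }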
main
Dmitri Smirnov 9 years ago
parent 7beb743cf5
commit 6fbc4f9f3e
  1. db/db_bench.cc (3 lines changed)
  2. include/rocksdb/env.h (11 lines changed)
  3. include/rocksdb/options.h (14 lines changed)
  4. port/win/env_win.cc (152 lines changed)
  5. util/env.cc (2 lines changed)
  6. util/file_reader_writer.cc (10 lines changed)
  7. util/options.cc (6 lines changed)
  8. util/options_helper.h (3 lines changed)
  9. util/options_test.cc (2 lines changed)

@@ -373,6 +373,8 @@ DEFINE_int32(new_table_reader_for_compaction_inputs, true,
DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size");
DEFINE_int32(random_access_max_buffer_size, 1024 * 1024, "Maximum Windows random access buffer size");
DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means"
" use default settings.");
DEFINE_int32(memtable_bloom_bits, 0, "Bloom filter bits per key for memtable. "
@@ -2295,6 +2297,7 @@ class Benchmark {
options.new_table_reader_for_compaction_inputs =
FLAGS_new_table_reader_for_compaction_inputs;
options.compaction_readahead_size = FLAGS_compaction_readahead_size;
options.random_access_max_buffer_size = FLAGS_random_access_max_buffer_size;
options.statistics = dbstats;
if (FLAGS_enable_io_prio) {
FLAGS_env->LowerThreadPoolIOPriority(Env::LOW);
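
With the flag wiring above, the new option should also be settable when driving db_bench, in the same way compaction_readahead_size already is; the DEFINE_int32 declarations earlier in this file are what expose both values on the command line.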

@@ -88,6 +88,12 @@ struct EnvOptions {
// WAL writes
bool fallocate_with_keep_size = true;
// See DBOptions doc
size_t compaction_readahead_size;
// See DBOptions doc
size_t random_access_max_buffer_size;
// If not nullptr, write rate limiting is enabled for flush and compaction
RateLimiter* rate_limiter = nullptr;
};
@@ -408,6 +414,11 @@ class RandomAccessFile {
return false;
}
// For cases when read-ahead is implemented in the platform dependent
// layer
virtual void EnableReadAhead() {
}
// Tries to get an unique ID for this file that will be the same each time
// the file is opened (and will stay the same while the file is open).
// Furthermore, it tries to make this ID at most "max_size" bytes. If such an
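
Because the base-class EnableReadAhead() above is a no-op, only file implementations that opt in change behavior; the wrapper in util/file_reader_writer.cc (further down) calls it when the file forwards raw requests. Below is a minimal sketch of a custom implementation hooking both entry points; the class and its members are hypothetical and not part of this commit.

    #include "rocksdb/env.h"

    // Hypothetical platform file that keeps its own internal read buffer.
    class BufferedRandomAccessFile : public rocksdb::RandomAccessFile {
     public:
      // Ask ReadaheadRandomAccessFile to forward raw requests to us
      // (and to call EnableReadAhead()) instead of buffering on top.
      bool ShouldForwardRawRequest() const override { return true; }

      // Called by the read-ahead wrapper when readahead_size > 0.
      void EnableReadAhead() override { read_ahead_ = true; }

      // Treat a SEQUENTIAL hint the same way, mirroring the Windows env.
      void Hint(AccessPattern pattern) override {
        if (pattern == SEQUENTIAL) read_ahead_ = true;
      }

      rocksdb::Status Read(uint64_t offset, size_t n, rocksdb::Slice* result,
                           char* scratch) const override {
        // A real implementation would fetch more than n bytes while
        // read_ahead_ is set and keep the surplus in an aligned buffer.
        return rocksdb::Status::NotSupported("sketch only");
      }

     private:
      bool read_ahead_ = false;
    };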

@@ -1074,6 +1074,20 @@ struct DBOptions {
// Default: 0
size_t compaction_readahead_size;
// This is the maximum buffer size that is used by WinMmapReadableFile in
// unbuffered disk I/O mode. We need to maintain an aligned buffer for
// reads. We allow the buffer to grow until the specified value and then
// allocate one-shot buffers for bigger requests. In unbuffered mode we
// always bypass the read-ahead buffer at ReadaheadRandomAccessFile.
// When read-ahead is required we then make use of the
// compaction_readahead_size value and always try to read ahead. With
// read-ahead we always pre-allocate the buffer to that size instead of
// growing it up to the limit.
//
// This option is currently honored only on Windows.
//
// Default: 1 MB
size_t random_access_max_buffer_size;
// Use adaptive mutex, which spins in the user space before resorting
// to kernel. This could reduce context switch when the mutex is not
// heavily contended. However, if the mutex is hot, we could end up
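
To restate the policy described in the comment above as code: when the aligned read is larger than the current buffer, the implementation either grows the instance buffer (up to the cap) or switches to a throwaway one-shot buffer. The helper below is purely illustrative, not an actual RocksDB function; it mirrors the branch in port/win/env_win.cc shown later in this diff.

    #include <cstddef>

    enum class BufferChoice { kReuseInstanceBuffer, kGrowInstanceBuffer, kOneShotBuffer };

    // Illustrative only: which buffer a Windows unbuffered read would use.
    BufferChoice ChooseBuffer(size_t aligned_bytes_to_read, size_t current_capacity,
                              size_t random_access_max_buffer_size, bool read_ahead) {
      if (current_capacity >= aligned_bytes_to_read) {
        return BufferChoice::kReuseInstanceBuffer;  // existing buffer is big enough
      }
      if (read_ahead || aligned_bytes_to_read > random_access_max_buffer_size) {
        return BufferChoice::kOneShotBuffer;        // allocate, use once, discard
      }
      return BufferChoice::kGrowInstanceBuffer;     // grow, bounded by the option
    }

    // Example: with the 1 MB default, a 4 MB aligned read goes to a one-shot buffer:
    //   ChooseBuffer(4 << 20, 8192, 1 << 20, false) == BufferChoice::kOneShotBuffer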

@@ -688,28 +688,98 @@ class WinRandomAccessFile : public RandomAccessFile {
const std::string filename_;
HANDLE hFile_;
const bool use_os_buffer_;
bool read_ahead_;
const size_t compaction_readahead_size_;
const size_t random_access_max_buffer_size_;
mutable std::mutex buffer_mut_;
mutable AlignedBuffer buffer_;
mutable uint64_t
buffered_start_; // file offset set that is currently buffered
/*
* The function reads the requested amount of bytes into the specified aligned buffer.
* Upon success it sets the length of the buffer to the amount of bytes actually
* read, even though that might be less than requested.
* It then copies the amount of bytes requested by the user (left) to the user-supplied
* buffer (dest) and reduces left by the amount of bytes copied to the user buffer.
*
* @user_offset [in] - offset on disk where the read was requested by the user
* @first_page_start [in] - actual page-aligned disk offset that we want to read from
* @bytes_to_read [in] - total amount of bytes that will be read from disk, which is generally
* greater than or equal to the amount that the user has requested, due to
* either alignment requirements or read-ahead being in effect
* @left [in/out] - total amount of bytes that need to be copied to the user buffer. It is reduced
* by the amount of bytes actually copied
* @buffer - buffer to use
* @dest - user-supplied buffer
*/
SSIZE_T ReadIntoBuffer(uint64_t user_offset, uint64_t first_page_start,
size_t bytes_to_read, size_t& left, AlignedBuffer& buffer, char* dest) const {
assert(buffer.CurrentSize() == 0);
assert(buffer.Capacity() >= bytes_to_read);
SSIZE_T read = pread(hFile_, buffer.Destination(), bytes_to_read,
first_page_start);
if (read > 0) {
buffer.Size(read);
// Let's figure out how much we read from the user's standpoint
if ((first_page_start + buffer.CurrentSize()) > user_offset) {
assert(first_page_start <= user_offset);
size_t buffer_offset = user_offset - first_page_start;
read = buffer.Read(dest, buffer_offset, left);
} else {
read = 0;
}
left -= read;
}
return read;
}
SSIZE_T ReadIntoOneShotBuffer(uint64_t user_offset, uint64_t first_page_start,
size_t bytes_to_read, size_t& left, char* dest) const {
AlignedBuffer bigBuffer;
bigBuffer.Alignment(buffer_.Alignment());
bigBuffer.AllocateNewBuffer(bytes_to_read);
return ReadIntoBuffer(user_offset, first_page_start, bytes_to_read, left,
bigBuffer, dest);
}
SSIZE_T ReadIntoInstanceBuffer(uint64_t user_offset, uint64_t first_page_start,
size_t bytes_to_read, size_t& left, char* dest) const {
SSIZE_T read = ReadIntoBuffer(user_offset, first_page_start, bytes_to_read,
left, buffer_, dest);
if (read > 0) {
buffered_start_ = first_page_start;
}
return read;
}
public:
WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment,
const EnvOptions& options)
: filename_(fname),
hFile_(hFile),
use_os_buffer_(options.use_os_buffer),
read_ahead_(false),
compaction_readahead_size_(options.compaction_readahead_size),
random_access_max_buffer_size_(options.random_access_max_buffer_size),
buffer_(),
buffered_start_(0) {
assert(!options.use_mmap_reads);
// Unbuffered access, use internal buffer for reads
if (!use_os_buffer_) {
// Do not allocate the buffer either until the first request or
// until there is a call to allocate a read-ahead buffer
buffer_.Alignment(alignment);
// Random read, no need for a big buffer
// We read things in database blocks which are likely to be similar to
// the alignment we use.
buffer_.AllocateNewBuffer(alignment * 2);
}
}
@@ -719,6 +789,10 @@ class WinRandomAccessFile : public RandomAccessFile {
}
}
virtual void EnableReadAhead() override {
this->Hint(SEQUENTIAL);
}
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
Status s;
@@ -730,7 +804,7 @@ class WinRandomAccessFile : public RandomAccessFile {
// - use our own aligned buffer
// - always read at the offset of that is a multiple of alignment
if (!use_os_buffer_) {
-      std::lock_guard<std::mutex> lg(buffer_mut_);
+      std::unique_lock<std::mutex> lock(buffer_mut_);
// Let's see if at least some of the requested data is already
// in the buffer
@@ -749,40 +823,40 @@
       if (left > 0) {
         // Figure out the start/end offset for reading and amount to read
         const size_t alignment = buffer_.Alignment();
-        const size_t start_page_start =
-            TruncateToPageBoundary(alignment, offset);
-        const size_t end_page_start =
-            TruncateToPageBoundary(alignment, offset + left - 1);
-        const size_t actual_bytes_toread =
-            (end_page_start - start_page_start) + alignment;
+        const size_t first_page_start =
+            TruncateToPageBoundary(alignment, offset);
-        if (buffer_.Capacity() < actual_bytes_toread) {
-          buffer_.AllocateNewBuffer(actual_bytes_toread);
-        } else {
-          buffer_.Clear();
+        size_t bytes_requested = left;
+        if (read_ahead_ && bytes_requested < compaction_readahead_size_) {
+          bytes_requested = compaction_readahead_size_;
         }
-        SSIZE_T read = 0;
-        read = pread(hFile_, buffer_.Destination(), actual_bytes_toread,
-                     start_page_start);
-        if (read > 0) {
-          buffer_.Size(read);
-          buffered_start_ = start_page_start;
+        const size_t last_page_start =
+            TruncateToPageBoundary(alignment, offset + bytes_requested - 1);
+        const size_t actual_bytes_toread =
+            (last_page_start - first_page_start) + alignment;
-          // Let's figure out how much we read from the users standpoint
-          if ((buffered_start_ + uint64_t(read)) > offset) {
-            size_t buffer_offset = offset - buffered_start_;
-            r = buffer_.Read(dest, buffer_offset, left);
+        if (buffer_.Capacity() < actual_bytes_toread) {
+          // If we are in read-ahead mode or the requested size
+          // exceeds max buffer size then use one-shot
+          // big buffer otherwise reallocate main buffer
+          if (read_ahead_ ||
+              (actual_bytes_toread > random_access_max_buffer_size_)) {
+            // Unlock the mutex since we are not using instance buffer
+            lock.unlock();
+            r = ReadIntoOneShotBuffer(offset, first_page_start,
+                                      actual_bytes_toread, left, dest);
           } else {
-            r = 0;
+            buffer_.AllocateNewBuffer(actual_bytes_toread);
+            r = ReadIntoInstanceBuffer(offset, first_page_start,
+                                       actual_bytes_toread, left, dest);
          }
-          left -= r;
         } else {
-          r = read;
+          buffer_.Clear();
+          r = ReadIntoInstanceBuffer(offset, first_page_start,
+                                     actual_bytes_toread, left, dest);
         }
       }
} else {
r = pread(hFile_, scratch, left, offset);
if (r > 0) {
@@ -802,7 +876,23 @@ class WinRandomAccessFile : public RandomAccessFile {
return true;
}
-  virtual void Hint(AccessPattern pattern) override {}
+  virtual void Hint(AccessPattern pattern) override {
+    if (pattern == SEQUENTIAL &&
+        !use_os_buffer_ &&
+        compaction_readahead_size_ > 0) {
+      std::lock_guard<std::mutex> lg(buffer_mut_);
+      if (!read_ahead_) {
+        read_ahead_ = true;
+        // This would allocate read-ahead size + 2 alignments:
+        // - one for memory alignment, which is added implicitly by AlignedBuffer
+        // - one more because we will read one alignment more from disk
+        buffer_.AllocateNewBuffer(compaction_readahead_size_ + buffer_.Alignment());
+      }
+    }
+  }
virtual Status InvalidateCache(size_t offset, size_t length) override {
return Status::OK();
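
To make the alignment arithmetic in Read() above concrete, here is a small worked example. TruncateToPageBoundary is assumed to round an offset down to the nearest multiple of the alignment, which is how the diff uses it; the local copy below exists only to keep the example self-contained.

    #include <cassert>
    #include <cstddef>
    #include <cstdint>

    // Assumed behavior of the helper used above: round down to the alignment.
    static size_t TruncateToPageBoundary(size_t alignment, size_t s) {
      return s - (s % alignment);
    }

    int main() {
      const size_t alignment = 4096;   // sector size used for unbuffered I/O
      const uint64_t offset = 5000;    // offset requested by the user
      const size_t left = 300;         // bytes the user still needs

      size_t bytes_requested = left;   // no read-ahead in this example

      const size_t first_page_start = TruncateToPageBoundary(alignment, offset);
      const size_t last_page_start =
          TruncateToPageBoundary(alignment, offset + bytes_requested - 1);
      const size_t actual_bytes_toread =
          (last_page_start - first_page_start) + alignment;

      // The 300-byte request at offset 5000 becomes one aligned 4 KB read
      // starting at 4096; the copy back to the user begins at offset
      // 5000 - 4096 = 904 inside the buffer (see ReadIntoBuffer above).
      assert(first_page_start == 4096);
      assert(last_page_start == 4096);
      assert(actual_bytes_toread == 4096);
      return 0;
    }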

@@ -292,6 +292,8 @@ void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) {
env_options->use_mmap_writes = options.allow_mmap_writes;
env_options->set_fd_cloexec = options.is_fd_close_on_exec;
env_options->bytes_per_sync = options.bytes_per_sync;
env_options->compaction_readahead_size = options.compaction_readahead_size;
env_options->random_access_max_buffer_size = options.random_access_max_buffer_size;
env_options->rate_limiter = options.rate_limiter.get();
env_options->allow_fallocate = options.allow_fallocate;
}
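
With the two assignments above in AssignEnvOptions, values set on DBOptions should surface on the EnvOptions handed to the Env layer. A quick sketch, assuming the EnvOptions(const DBOptions&) constructor routes through AssignEnvOptions; the field names are taken from this diff.

    #include <cassert>
    #include "rocksdb/env.h"
    #include "rocksdb/options.h"

    int main() {
      rocksdb::DBOptions db_opts;
      db_opts.compaction_readahead_size = 2 * 1024 * 1024;
      db_opts.random_access_max_buffer_size = 512 * 1024;

      // Assumed to run AssignEnvOptions() internally.
      rocksdb::EnvOptions env_opts(db_opts);

      assert(env_opts.compaction_readahead_size == db_opts.compaction_readahead_size);
      assert(env_opts.random_access_max_buffer_size == db_opts.random_access_max_buffer_size);
      return 0;
    }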

@@ -384,9 +384,15 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
: file_(std::move(file)),
readahead_size_(readahead_size),
forward_calls_(file_->ShouldForwardRawRequest()),
-        buffer_(new char[readahead_size_]),
+        buffer_(),
         buffer_offset_(0),
-        buffer_len_(0) {}
+        buffer_len_(0) {
+    if (!forward_calls_) {
+      buffer_.reset(new char[readahead_size_]);
+    } else if (readahead_size_ > 0) {
+      file_->EnableReadAhead();
+    }
+  }
ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete;
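
For reference, this wrapper is normally created through a factory along the following lines; the factory and header live in RocksDB's internal util layer, so the exact name and include path may differ between versions and should be treated as an assumption.

    #include <memory>
    #include "rocksdb/env.h"
    #include "util/file_reader_writer.h"  // internal header; location may vary

    std::unique_ptr<rocksdb::RandomAccessFile> WrapForCompaction(
        std::unique_ptr<rocksdb::RandomAccessFile> raw_file,
        size_t compaction_readahead_size) {
      // If the underlying file forwards raw requests (e.g. the Windows
      // unbuffered file above), the wrapper now calls EnableReadAhead() on it
      // instead of allocating its own readahead buffer; otherwise it keeps
      // buffering reads itself exactly as before.
      return rocksdb::NewReadaheadRandomAccessFile(std::move(raw_file),
                                                   compaction_readahead_size);
    }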

@@ -249,6 +249,7 @@ DBOptions::DBOptions()
access_hint_on_compaction_start(NORMAL),
new_table_reader_for_compaction_inputs(false),
compaction_readahead_size(0),
random_access_max_buffer_size(1024 * 1024),
use_adaptive_mutex(false),
bytes_per_sync(0),
wal_bytes_per_sync(0),
@@ -305,6 +306,7 @@ DBOptions::DBOptions(const Options& options)
new_table_reader_for_compaction_inputs(
options.new_table_reader_for_compaction_inputs),
compaction_readahead_size(options.compaction_readahead_size),
random_access_max_buffer_size(options.random_access_max_buffer_size),
use_adaptive_mutex(options.use_adaptive_mutex),
bytes_per_sync(options.bytes_per_sync),
wal_bytes_per_sync(options.wal_bytes_per_sync),
@@ -393,6 +395,10 @@ void DBOptions::Dump(Logger* log) const {
" Options.compaction_readahead_size: %" ROCKSDB_PRIszt
"d",
compaction_readahead_size);
Header(log,
" Options.random_access_max_buffer_size: %" ROCKSDB_PRIszt
"d",
random_access_max_buffer_size);
Header(log, " Options.use_adaptive_mutex: %d",
use_adaptive_mutex);
Header(log, " Options.rate_limiter: %p",

@@ -180,6 +180,9 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
{"compaction_readahead_size",
{offsetof(struct DBOptions, compaction_readahead_size), OptionType::kSizeT,
OptionVerificationType::kNormal}},
{"random_access_max_buffer_size",
{ offsetof(struct DBOptions, random_access_max_buffer_size), OptionType::kSizeT,
OptionVerificationType::kNormal}},
{"use_adaptive_mutex",
{offsetof(struct DBOptions, use_adaptive_mutex), OptionType::kBoolean,
OptionVerificationType::kNormal}},
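
Registering the field in db_options_type_info is what makes it reachable from the string and map based option parsers. A quick sketch using GetDBOptionsFromString follows; the header location has moved between RocksDB releases, so treat the include path as an assumption.

    #include <cassert>
    #include <string>
    #include "rocksdb/convenience.h"  // older releases: rocksdb/utilities/convenience.h
    #include "rocksdb/options.h"

    int main() {
      rocksdb::DBOptions base;
      rocksdb::DBOptions parsed;
      rocksdb::Status s = rocksdb::GetDBOptionsFromString(
          base,
          "random_access_max_buffer_size=3145728;compaction_readahead_size=100",
          &parsed);
      assert(s.ok());
      assert(parsed.random_access_max_buffer_size == 3145728);
      assert(parsed.compaction_readahead_size == 100);
      return 0;
    }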

@@ -339,6 +339,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
{"use_adaptive_mutex", "false"},
{"new_table_reader_for_compaction_inputs", "true"},
{"compaction_readahead_size", "100"},
{"random_access_max_buffer_size", "3145728" },
{"bytes_per_sync", "47"},
{"wal_bytes_per_sync", "48"},
};
@@ -449,6 +450,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_db_opt.use_adaptive_mutex, false);
ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true);
ASSERT_EQ(new_db_opt.compaction_readahead_size, 100);
ASSERT_EQ(new_db_opt.random_access_max_buffer_size, 3145728);
ASSERT_EQ(new_db_opt.bytes_per_sync, static_cast<uint64_t>(47));
ASSERT_EQ(new_db_opt.wal_bytes_per_sync, static_cast<uint64_t>(48));
}
