diff --git a/HISTORY.md b/HISTORY.md index bae246654..60c0cceba 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -7,6 +7,7 @@ * Add a new option BlockBasedTableOptions::max_auto_readahead_size. RocksDB does auto-readahead for iterators on noticing more than two reads for a table file if user doesn't provide readahead_size. The readahead starts at 8KB and doubles on every additional read upto max_auto_readahead_size and now max_auto_readahead_size can be configured dynamically as well. Found that 256 KB readahead size provides the best performance, based on experiments, for auto readahead. Experiment data is in PR #3282. If value is set 0 then no automatic prefetching will be done by rocksdb. Also changing the value will only affect files opened after the change. * Add suppport to extend DB::VerifyFileChecksums API to also verify blob files checksum. * When using the new BlobDB, the amount of data written by flushes/compactions is now broken down into table files and blob files in the compaction statistics; namely, Write(GB) denotes the amount of data written to table files, while Wblob(GB) means the amount of data written to blob files. +* Add new SetBufferSize API to WriteBufferManager to allow dynamic management of memory allotted to all write buffers. This allows user code to adjust memory monitoring provided by WriteBufferManager as process memory needs change, e.g. as datasets grow and shrink. ### New Features * Support compaction filters for the new implementation of BlobDB. Add `FilterBlobByKey()` to `CompactionFilter`. Subclasses can override this method so that compaction filters can determine whether the actual blob value has to be read during compaction. Use a new `kUndetermined` in `CompactionFilter::Decision` to indicated that further action is necessary for compaction filter to make a decision. 
diff --git a/include/rocksdb/write_buffer_manager.h b/include/rocksdb/write_buffer_manager.h index c1d1300ae..aa44c1406 100644 --- a/include/rocksdb/write_buffer_manager.h +++ b/include/rocksdb/write_buffer_manager.h @@ -32,7 +32,7 @@ class WriteBufferManager { ~WriteBufferManager(); - bool enabled() const { return buffer_size_ != 0; } + bool enabled() const { return buffer_size() > 0; } bool cost_to_cache() const { return cache_rep_ != nullptr; } @@ -46,16 +46,20 @@ class WriteBufferManager { size_t dummy_entries_in_cache_usage() const { return dummy_size_.load(std::memory_order_relaxed); } - size_t buffer_size() const { return buffer_size_; } + size_t buffer_size() const { + return buffer_size_.load(std::memory_order_relaxed); + } // Should only be called from write thread bool ShouldFlush() const { if (enabled()) { - if (mutable_memtable_memory_usage() > mutable_limit_) { + if (mutable_memtable_memory_usage() > + mutable_limit_.load(std::memory_order_relaxed)) { return true; } - if (memory_usage() >= buffer_size_ && - mutable_memtable_memory_usage() >= buffer_size_ / 2) { + size_t local_size = buffer_size(); + if (memory_usage() >= local_size && + mutable_memtable_memory_usage() >= local_size / 2) { // If the memory exceeds the buffer size, we trigger more aggressive // flush. But if already more than half memory is being flushed, // triggering more flush may not help. We will hold it instead. @@ -90,9 +94,14 @@ class WriteBufferManager { } } + void SetBufferSize(size_t new_size) { + buffer_size_.store(new_size, std::memory_order_relaxed); + mutable_limit_.store(new_size * 7 / 8, std::memory_order_relaxed); + } + private: - const size_t buffer_size_; - const size_t mutable_limit_; + std::atomic<size_t> buffer_size_; + std::atomic<size_t> mutable_limit_; std::atomic<size_t> memory_used_; // Memory that hasn't been scheduled to free. 
std::atomic<size_t> memory_active_; diff --git a/memtable/write_buffer_manager_test.cc b/memtable/write_buffer_manager_test.cc index f58320588..7e3de41d1 100644 --- a/memtable/write_buffer_manager_test.cc +++ b/memtable/write_buffer_manager_test.cc @@ -47,7 +47,33 @@ TEST_F(WriteBufferManagerTest, ShouldFlush) { ASSERT_TRUE(wbf->ShouldFlush()); wbf->FreeMem(7 * 1024 * 1024); - // 9MB total, 8MB mutable. + // 8MB total, 8MB mutable. ASSERT_FALSE(wbf->ShouldFlush()); + + // change size: 8M limit, 7M mutable limit + wbf->SetBufferSize(8 * 1024 * 1024); + // 8MB total, 8MB mutable. + ASSERT_TRUE(wbf->ShouldFlush()); + + wbf->ScheduleFreeMem(2 * 1024 * 1024); + // 8MB total, 6MB mutable. + ASSERT_TRUE(wbf->ShouldFlush()); + + wbf->FreeMem(2 * 1024 * 1024); + // 6MB total, 6MB mutable. + ASSERT_FALSE(wbf->ShouldFlush()); + + wbf->ReserveMem(1 * 1024 * 1024); + // 7MB total, 7MB mutable. + ASSERT_FALSE(wbf->ShouldFlush()); + + wbf->ReserveMem(1 * 1024 * 1024); + // 8MB total, 8MB mutable. + ASSERT_TRUE(wbf->ShouldFlush()); + + wbf->ScheduleFreeMem(1 * 1024 * 1024); + wbf->FreeMem(1 * 1024 * 1024); + // 7MB total, 7MB mutable. ASSERT_FALSE(wbf->ShouldFlush()); }