diff --git a/CMakeLists.txt b/CMakeLists.txt index f81e0ca4f..a094d3261 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -530,6 +530,7 @@ set(SOURCES db/table_cache.cc db/table_properties_collector.cc db/transaction_log_impl.cc + db/trim_history_scheduler.cc db/version_builder.cc db/version_edit.cc db/version_set.cc diff --git a/HISTORY.md b/HISTORY.md index a87d06f90..da06ca228 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,8 @@ * Fix bloom filter lookups by the MultiGet batching API when BlockBasedTableOptions::whole_key_filtering is false, by checking that a key is in the prefix_extractor domain and extracting the prefix before looking up. ### New Features * VerifyChecksum() by default will issue readahead. Allow ReadOptions to be passed in to those functions to override the readahead size. For checksum verifying before external SST file ingestion, a new option, IngestExternalFileOptions.verify_checksums_readahead_size, is added for this readahead setting. +### Public API Change +* Added max_write_buffer_size_to_maintain option to better control memory usage of immutable memtables. ## 6.4.0 (7/30/2019) ### Default Option Change diff --git a/TARGETS b/TARGETS index bac5c4311..058e591e8 100644 --- a/TARGETS +++ b/TARGETS @@ -158,6 +158,7 @@ cpp_library( "db/table_cache.cc", "db/table_properties_collector.cc", "db/transaction_log_impl.cc", + "db/trim_history_scheduler.cc", "db/version_builder.cc", "db/version_edit.cc", "db/version_set.cc", diff --git a/db/c.cc b/db/c.cc index 4d40558f6..66e3892af 100644 --- a/db/c.cc +++ b/db/c.cc @@ -2514,6 +2514,11 @@ void rocksdb_options_set_max_write_buffer_number_to_maintain( opt->rep.max_write_buffer_number_to_maintain = n; } +void rocksdb_options_set_max_write_buffer_size_to_maintain( + rocksdb_options_t* opt, int64_t n) { + opt->rep.max_write_buffer_size_to_maintain = n; +} + void rocksdb_options_set_enable_pipelined_write(rocksdb_options_t* opt, unsigned char v) { opt->rep.enable_pipelined_write = v; diff --git a/db/column_family.cc b/db/column_family.cc index e135c2d31..4c67b7d76 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -227,7 +227,14 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options, if (result.max_write_buffer_number < 2) { result.max_write_buffer_number = 2; } - if (result.max_write_buffer_number_to_maintain < 0) { + // fall back to max_write_buffer_number_to_maintain if + // max_write_buffer_size_to_maintain is not set + if (result.max_write_buffer_size_to_maintain < 0) { + result.max_write_buffer_size_to_maintain = + result.max_write_buffer_number * + static_cast<int64_t>(result.write_buffer_size); + } else if (result.max_write_buffer_size_to_maintain == 0 && + result.max_write_buffer_number_to_maintain < 0) { result.max_write_buffer_number_to_maintain = result.max_write_buffer_number; } // bloom filter size shouldn't exceed 1/4 of memtable size.
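For context, here is a minimal sketch of how an application might opt into the new size-based limit once this patch lands; the database path and sizes are illustrative, not part of the patch:

    #include <cassert>

    #include "rocksdb/db.h"
    #include "rocksdb/options.h"

    int main() {
      rocksdb::Options options;
      options.create_if_missing = true;
      options.write_buffer_size = 64 << 20;  // 64MB mutable memtable
      // Retain up to ~128MB of memtable history (mutable plus
      // flushed-but-retained immutable memtables). Per advanced_options.h in
      // this patch, when both maintain options are non-zero the size-based
      // one wins and the count-based option is ignored.
      options.max_write_buffer_size_to_maintain = 128 << 20;

      rocksdb::DB* db = nullptr;
      // "/tmp/size_to_maintain_demo" is just a scratch path for this sketch.
      rocksdb::Status s =
          rocksdb::DB::Open(options, "/tmp/size_to_maintain_demo", &db);
      assert(s.ok());
      delete db;
      return 0;
    }

C callers get the same knob through the rocksdb_options_set_max_write_buffer_size_to_maintain() wrapper added in db/c.cc above.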
@@ -423,7 +430,8 @@ ColumnFamilyData::ColumnFamilyData( write_buffer_manager_(write_buffer_manager), mem_(nullptr), imm_(ioptions_.min_write_buffer_number_to_merge, - ioptions_.max_write_buffer_number_to_maintain), + ioptions_.max_write_buffer_number_to_maintain, + ioptions_.max_write_buffer_size_to_maintain), super_version_(nullptr), super_version_number_(0), local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)), diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 235313f48..95c43ac5a 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -1132,22 +1132,25 @@ TEST_P(ColumnFamilyTest, DifferentWriteBufferSizes) { default_cf.arena_block_size = 4 * 4096; default_cf.max_write_buffer_number = 10; default_cf.min_write_buffer_number_to_merge = 1; - default_cf.max_write_buffer_number_to_maintain = 0; + default_cf.max_write_buffer_size_to_maintain = 0; one.write_buffer_size = 200000; one.arena_block_size = 4 * 4096; one.max_write_buffer_number = 10; one.min_write_buffer_number_to_merge = 2; - one.max_write_buffer_number_to_maintain = 1; + one.max_write_buffer_size_to_maintain = + static_cast<int64_t>(one.write_buffer_size); two.write_buffer_size = 1000000; two.arena_block_size = 4 * 4096; two.max_write_buffer_number = 10; two.min_write_buffer_number_to_merge = 3; - two.max_write_buffer_number_to_maintain = 2; + two.max_write_buffer_size_to_maintain = + static_cast<int64_t>(two.write_buffer_size); three.write_buffer_size = 4096 * 22; three.arena_block_size = 4096; three.max_write_buffer_number = 10; three.min_write_buffer_number_to_merge = 4; - three.max_write_buffer_number_to_maintain = -1; + three.max_write_buffer_size_to_maintain = + static_cast<int64_t>(three.write_buffer_size); Reopen({default_cf, one, two, three}); diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index 828eaaaeb..963dde6ce 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -297,7 +297,7 @@ TEST_F(DBBasicTest, FlushMultipleMemtable) { writeOpt.disableWAL = true; options.max_write_buffer_number = 4; options.min_write_buffer_number_to_merge = 3; - options.max_write_buffer_number_to_maintain = -1; + options.max_write_buffer_size_to_maintain = -1; CreateAndReopenWithCF({"pikachu"}, options); ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1")); ASSERT_OK(Flush(1)); @@ -327,7 +327,8 @@ TEST_F(DBBasicTest, FlushEmptyColumnFamily) { writeOpt.disableWAL = true; options.max_write_buffer_number = 2; options.min_write_buffer_number_to_merge = 1; - options.max_write_buffer_number_to_maintain = 1; + options.max_write_buffer_size_to_maintain = + static_cast<int64_t>(options.write_buffer_size); CreateAndReopenWithCF({"pikachu"}, options); // Compaction can still go through even if no thread can flush the diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 7f639c853..45ba17011 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -141,7 +141,7 @@ Options DeletionTriggerOptions(Options options) { options.compression = kNoCompression; options.write_buffer_size = kCDTKeysPerBuffer * (kCDTValueSize + 24); options.min_write_buffer_number_to_merge = 1; - options.max_write_buffer_number_to_maintain = 0; + options.max_write_buffer_size_to_maintain = 0; options.num_levels = kCDTNumLevels; options.level0_file_num_compaction_trigger = 1; options.target_file_size_base = options.write_buffer_size * 2; diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index dae879346..587f1d8d9 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -472,6 +472,7 @@ Status
DBImpl::CloseHelper() { &files_grabbed_for_purge_); EraseThreadStatusDbInfo(); flush_scheduler_.Clear(); + trim_history_scheduler_.Clear(); while (!flush_queue_.empty()) { const FlushRequest& flush_req = PopFirstFromFlushQueue(); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 42377ce41..dcefc4753 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -37,6 +37,7 @@ #include "db/read_callback.h" #include "db/snapshot_checker.h" #include "db/snapshot_impl.h" +#include "db/trim_history_scheduler.h" #include "db/version_edit.h" #include "db/wal_manager.h" #include "db/write_controller.h" @@ -1355,6 +1356,8 @@ class DBImpl : public DB { void MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds); + Status TrimMemtableHistory(WriteContext* context); + Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context); void SelectColumnFamiliesForAtomicFlush(autovector<ColumnFamilyData*>* cfds); @@ -1733,6 +1736,8 @@ class DBImpl : public DB { FlushScheduler flush_scheduler_; + TrimHistoryScheduler trim_history_scheduler_; + SnapshotList snapshots_; // For each background job, pending_outputs_ keeps the current file number at diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 0e0fcfbf2..737e3a660 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -862,9 +862,10 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers, // That's why we set ignore missing column families to true bool has_valid_writes = false; status = WriteBatchInternal::InsertInto( - &batch, column_family_memtables_.get(), &flush_scheduler_, true, - log_number, this, false /* concurrent_memtable_writes */, - next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_); + &batch, column_family_memtables_.get(), &flush_scheduler_, + &trim_history_scheduler_, true, log_number, this, + false /* concurrent_memtable_writes */, next_sequence, + &has_valid_writes, seq_per_batch_, batch_per_txn_); MaybeIgnoreError(&status); if (!status.ok()) { // We are treating this as a failure while reading since we read valid @@ -931,6 +932,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers, } flush_scheduler_.Clear(); + trim_history_scheduler_.Clear(); auto last_sequence = *next_sequence - 1; if ((*next_sequence != kMaxSequenceNumber) && (versions_->LastSequence() <= last_sequence)) { diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index a73cd6ba2..1a55c328e 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -253,9 +253,9 @@ Status DBImplSecondary::RecoverLogFiles( bool has_valid_writes = false; status = WriteBatchInternal::InsertInto( &batch, column_family_memtables_.get(), - nullptr /* flush_scheduler */, true, log_number, this, - false /* concurrent_memtable_writes */, next_sequence, - &has_valid_writes, seq_per_batch_, batch_per_txn_); + nullptr /* flush_scheduler */, nullptr /* trim_history_scheduler */, + true, log_number, this, false /* concurrent_memtable_writes */, + next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_); } // If column family was not found, it might mean that the WAL write // batch references a column family that was dropped after the diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 0ad2a3e9a..d15165122 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -171,6 +171,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, versions_->GetColumnFamilySet()); w.status = WriteBatchInternal::InsertInto( &w, w.sequence,
&column_family_memtables, &flush_scheduler_, + &trim_history_scheduler_, write_options.ignore_missing_column_families, 0 /*log_number*/, this, true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt, batch_per_txn_); @@ -375,7 +376,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // w.sequence will be set inside InsertInto w.status = WriteBatchInternal::InsertInto( write_group, current_sequence, column_family_memtables_.get(), - &flush_scheduler_, write_options.ignore_missing_column_families, + &flush_scheduler_, &trim_history_scheduler_, + write_options.ignore_missing_column_families, 0 /*recovery_log_number*/, this, parallel, seq_per_batch_, batch_per_txn_); } else { @@ -391,6 +393,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, assert(w.sequence == current_sequence); w.status = WriteBatchInternal::InsertInto( &w, w.sequence, &column_family_memtables, &flush_scheduler_, + &trim_history_scheduler_, write_options.ignore_missing_column_families, 0 /*log_number*/, this, true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt, batch_per_txn_); @@ -545,9 +548,9 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, } else { memtable_write_group.status = WriteBatchInternal::InsertInto( memtable_write_group, w.sequence, column_family_memtables_.get(), - &flush_scheduler_, write_options.ignore_missing_column_families, - 0 /*log_number*/, this, false /*concurrent_memtable_writes*/, - seq_per_batch_, batch_per_txn_); + &flush_scheduler_, &trim_history_scheduler_, + write_options.ignore_missing_column_families, 0 /*log_number*/, this, + false /*concurrent_memtable_writes*/, seq_per_batch_, batch_per_txn_); versions_->SetLastSequence(memtable_write_group.last_sequence); write_thread_.ExitAsMemTableWriter(&w, memtable_write_group); } @@ -559,8 +562,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, versions_->GetColumnFamilySet()); w.status = WriteBatchInternal::InsertInto( &w, w.sequence, &column_family_memtables, &flush_scheduler_, - write_options.ignore_missing_column_families, 0 /*log_number*/, this, - true /*concurrent_memtable_writes*/); + &trim_history_scheduler_, write_options.ignore_missing_column_families, + 0 /*log_number*/, this, true /*concurrent_memtable_writes*/); if (write_thread_.CompleteParallelMemTableWriter(&w)) { MemTableInsertStatusCheck(w.status); versions_->SetLastSequence(w.write_group->last_sequence); @@ -597,8 +600,9 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options, versions_->GetColumnFamilySet()); w.status = WriteBatchInternal::InsertInto( &w, w.sequence, &column_family_memtables, &flush_scheduler_, - write_options.ignore_missing_column_families, 0 /*log_number*/, this, - true /*concurrent_memtable_writes*/, seq_per_batch_, sub_batch_cnt); + &trim_history_scheduler_, write_options.ignore_missing_column_families, + 0 /*log_number*/, this, true /*concurrent_memtable_writes*/, + seq_per_batch_, sub_batch_cnt); WriteStatusCheck(w.status); if (write_options.disableWAL) { @@ -856,6 +860,10 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, status = HandleWriteBufferFull(write_context); } + if (UNLIKELY(status.ok() && !trim_history_scheduler_.Empty())) { + status = TrimMemtableHistory(write_context); + } + if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) { WaitForPendingWrites(); status = ScheduleFlushes(write_context); @@ -1112,9 +1120,9 @@ Status DBImpl::WriteRecoverableState() { WriteBatchInternal::SetSequence(&cached_recoverable_state_, 
seq + 1); auto status = WriteBatchInternal::InsertInto( &cached_recoverable_state_, column_family_memtables_.get(), - &flush_scheduler_, true, 0 /*recovery_log_number*/, this, - false /* concurrent_memtable_writes */, &next_seq, &dont_care_bool, - seq_per_batch_); + &flush_scheduler_, &trim_history_scheduler_, true, + 0 /*recovery_log_number*/, this, false /* concurrent_memtable_writes */, + &next_seq, &dont_care_bool, seq_per_batch_); auto last_seq = next_seq - 1; if (two_write_queues_) { versions_->FetchAddLastAllocatedSequence(last_seq - seq); @@ -1474,6 +1482,31 @@ void DBImpl::MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds) { } } +Status DBImpl::TrimMemtableHistory(WriteContext* context) { + autovector<ColumnFamilyData*> cfds; + ColumnFamilyData* tmp_cfd; + while ((tmp_cfd = trim_history_scheduler_.TakeNextColumnFamily()) != + nullptr) { + cfds.push_back(tmp_cfd); + } + for (auto& cfd : cfds) { + autovector<MemTable*> to_delete; + cfd->imm()->TrimHistory(&to_delete, cfd->mem()->ApproximateMemoryUsage()); + for (auto m : to_delete) { + delete m; + } + context->superversion_context.NewSuperVersion(); + assert(context->superversion_context.new_superversion.get() != nullptr); + cfd->InstallSuperVersion(&context->superversion_context, &mutex_); + + if (cfd->Unref()) { + delete cfd; + cfd = nullptr; + } + } + return Status::OK(); +} + Status DBImpl::ScheduleFlushes(WriteContext* context) { autovector<ColumnFamilyData*> cfds; if (immutable_db_options_.atomic_flush) { diff --git a/db/db_properties_test.cc b/db/db_properties_test.cc index 1a988f5ea..956accef8 100644 --- a/db/db_properties_test.cc +++ b/db/db_properties_test.cc @@ -615,8 +615,9 @@ TEST_F(DBPropertiesTest, NumImmutableMemTable) { writeOpt.disableWAL = true; options.max_write_buffer_number = 4; options.min_write_buffer_number_to_merge = 3; - options.max_write_buffer_number_to_maintain = 4; options.write_buffer_size = 1000000; + options.max_write_buffer_size_to_maintain = + 5 * static_cast<int64_t>(options.write_buffer_size); CreateAndReopenWithCF({"pikachu"}, options); std::string big_value(1000000 * 2, 'x'); @@ -747,7 +748,7 @@ TEST_F(DBPropertiesTest, DISABLED_GetProperty) { options.max_background_flushes = 1; options.max_write_buffer_number = 10; options.min_write_buffer_number_to_merge = 1; - options.max_write_buffer_number_to_maintain = 0; + options.max_write_buffer_size_to_maintain = 0; options.write_buffer_size = 1000000; Reopen(options); @@ -997,7 +998,7 @@ TEST_F(DBPropertiesTest, EstimatePendingCompBytes) { options.max_background_flushes = 1; options.max_write_buffer_number = 10; options.min_write_buffer_number_to_merge = 1; - options.max_write_buffer_number_to_maintain = 0; + options.max_write_buffer_size_to_maintain = 0; options.write_buffer_size = 1000000; Reopen(options); diff --git a/db/db_test.cc b/db/db_test.cc index 17ac22729..906a67cda 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -883,7 +883,7 @@ TEST_F(DBTest, FlushMultipleMemtable) { writeOpt.disableWAL = true; options.max_write_buffer_number = 4; options.min_write_buffer_number_to_merge = 3; - options.max_write_buffer_number_to_maintain = -1; + options.max_write_buffer_size_to_maintain = -1; CreateAndReopenWithCF({"pikachu"}, options); ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1")); ASSERT_OK(Flush(1)); @@ -901,7 +901,8 @@ TEST_F(DBTest, FlushSchedule) { options.level0_stop_writes_trigger = 1 << 10; options.level0_slowdown_writes_trigger = 1 << 10; options.min_write_buffer_number_to_merge = 1; - options.max_write_buffer_number_to_maintain = 1; + options.max_write_buffer_size_to_maintain = +
static_cast<int64_t>(options.write_buffer_size); options.max_write_buffer_number = 2; options.write_buffer_size = 120 * 1024; CreateAndReopenWithCF({"pikachu"}, options); diff --git a/db/deletefile_test.cc b/db/deletefile_test.cc index 18014e5b4..b99c8e9dc 100644 --- a/db/deletefile_test.cc +++ b/db/deletefile_test.cc @@ -284,6 +284,8 @@ TEST_F(DeleteFileTest, BackgroundPurgeIteratorTest) { TEST_F(DeleteFileTest, BackgroundPurgeCFDropTest) { auto do_test = [&](bool bg_purge) { ColumnFamilyOptions co; + co.max_write_buffer_size_to_maintain = + static_cast<int64_t>(co.write_buffer_size); WriteOptions wo; FlushOptions fo; ColumnFamilyHandle* cfh = nullptr; diff --git a/db/flush_scheduler.cc b/db/flush_scheduler.cc index 9c6c04efe..cbcb5ce49 100644 --- a/db/flush_scheduler.cc +++ b/db/flush_scheduler.cc @@ -11,7 +11,7 @@ namespace rocksdb { -void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) { +void FlushScheduler::ScheduleWork(ColumnFamilyData* cfd) { #ifndef NDEBUG { std::lock_guard<std::mutex> lock(checking_mutex_); diff --git a/db/flush_scheduler.h b/db/flush_scheduler.h index b5abec405..5ca85e88b 100644 --- a/db/flush_scheduler.h +++ b/db/flush_scheduler.h @@ -9,26 +9,29 @@ #include <stdint.h> #include <atomic> #include <mutex> +#include "util/autovector.h" namespace rocksdb { class ColumnFamilyData; -// Unless otherwise noted, all methods on FlushScheduler should be called -// only with the DB mutex held or from a single-threaded recovery context. +// FlushScheduler keeps track of all column families whose memtable may +// be full and require flushing. Unless otherwise noted, all methods on +// FlushScheduler should be called only with the DB mutex held or from +// a single-threaded recovery context. class FlushScheduler { public: FlushScheduler() : head_(nullptr) {} // May be called from multiple threads at once, but not concurrent with // any other method calls on this instance - void ScheduleFlush(ColumnFamilyData* cfd); + void ScheduleWork(ColumnFamilyData* cfd); // Removes and returns Ref()-ed column family. Client needs to Unref(). // Filters column families that have been dropped. ColumnFamilyData* TakeNextColumnFamily(); - // This can be called concurrently with ScheduleFlush but it would miss all + // This can be called concurrently with ScheduleWork but it would miss all // the scheduled flushes after the last synchronization. This would result // in less precise enforcement of memtable sizes but should not matter much. bool Empty(); diff --git a/db/memtable.cc b/db/memtable.cc index 62c7339b5..06cb2222e 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -105,7 +105,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp, insert_with_hint_prefix_extractor_( ioptions.memtable_insert_with_hint_prefix_extractor), oldest_key_time_(std::numeric_limits<uint64_t>::max()), - atomic_flush_seqno_(kMaxSequenceNumber) { + atomic_flush_seqno_(kMaxSequenceNumber), + approximate_memory_usage_(0) { UpdateFlushState(); // something went wrong if we need to flush before inserting anything assert(!ShouldScheduleFlush()); @@ -139,11 +140,12 @@ size_t MemTable::ApproximateMemoryUsage() { } total_usage += usage; } + approximate_memory_usage_.store(total_usage, std::memory_order_relaxed); // otherwise, return the actual usage return total_usage; } -bool MemTable::ShouldFlushNow() const { +bool MemTable::ShouldFlushNow() { size_t write_buffer_size = write_buffer_size_.load(std::memory_order_relaxed); // Much of the time, we cannot allocate arena blocks that exactly match the // buffer size.
Thus we have to decide if we should over-allocate or @@ -159,6 +161,8 @@ bool MemTable::ShouldFlushNow() { range_del_table_->ApproximateMemoryUsage() + arena_.MemoryAllocatedBytes(); + approximate_memory_usage_.store(allocated_memory, std::memory_order_relaxed); + // if we can still allocate one more block without exceeding the // over-allocation ratio, then we should not flush. if (allocated_memory + kArenaBlockSize < diff --git a/db/memtable.h b/db/memtable.h index 36ba0df79..c0baa9e17 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -130,6 +130,12 @@ class MemTable { // operations on the same MemTable (unless this Memtable is immutable). size_t ApproximateMemoryUsage(); + // As a cheap version of `ApproximateMemoryUsage()`, this function doesn't + // require external synchronization. The value may be less accurate though. + size_t ApproximateMemoryUsageFast() { + return approximate_memory_usage_.load(std::memory_order_relaxed); + } + // This method heuristically determines if the memtable should continue to // host more data. bool ShouldScheduleFlush() const { @@ -486,8 +492,12 @@ class MemTable { // writes with sequence number smaller than seq are flushed. SequenceNumber atomic_flush_seqno_; + // Keeps track of memory usage in table_, arena_, and range_del_table_. + // Gets refreshed inside `ApproximateMemoryUsage()` or `ShouldFlushNow`. + std::atomic<size_t> approximate_memory_usage_; + // Returns a heuristic flush decision - bool ShouldFlushNow() const; + bool ShouldFlushNow(); // Updates flush_state_ using ShouldFlushNow() void UpdateFlushState();
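The approximate_memory_usage_ member added above is a publish-and-read-relaxed idiom: the expensive accounting paths (ApproximateMemoryUsage() and ShouldFlushNow()) store a fresh snapshot, while ApproximateMemoryUsageFast() reads it with no external synchronization. A stripped-down sketch of the idiom, with hypothetical names rather than the MemTable internals:

    #include <atomic>
    #include <cstddef>

    class UsageTracker {
     public:
      // Expensive path; in the real code it needs external synchronization.
      // Publishing the result into the atomic is a side effect.
      size_t Recompute() {
        size_t total = ComputeExactUsage();
        cached_.store(total, std::memory_order_relaxed);
        return total;
      }

      // Cheap and callable from concurrent writers; may return a slightly
      // stale value, which is acceptable for a heuristic trim decision.
      size_t Fast() const { return cached_.load(std::memory_order_relaxed); }

     private:
      // Stand-in for walking the tables and arena of a real memtable.
      size_t ComputeExactUsage() const { return 0; }
      std::atomic<size_t> cached_{0};
    };

    int main() {
      UsageTracker tracker;
      tracker.Recompute();
      return tracker.Fast() == 0 ? 0 : 1;
    }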
diff --git a/db/memtable_list.cc b/db/memtable_list.cc index d06a82df8..e3f0732de 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -46,6 +46,8 @@ MemTableListVersion::MemTableListVersion( size_t* parent_memtable_list_memory_usage, MemTableListVersion* old) : max_write_buffer_number_to_maintain_( old->max_write_buffer_number_to_maintain_), + max_write_buffer_size_to_maintain_( + old->max_write_buffer_size_to_maintain_), parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) { if (old != nullptr) { memlist_ = old->memlist_; @@ -62,8 +64,10 @@ MemTableListVersion::MemTableListVersion( size_t* parent_memtable_list_memory_usage, - int max_write_buffer_number_to_maintain) + int max_write_buffer_number_to_maintain, + int64_t max_write_buffer_size_to_maintain) : max_write_buffer_number_to_maintain_(max_write_buffer_number_to_maintain), + max_write_buffer_size_to_maintain_(max_write_buffer_size_to_maintain), parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {} void MemTableListVersion::Ref() { ++refs_; } @@ -240,7 +244,7 @@ void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) { assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable AddMemTable(m); - TrimHistory(to_delete); + TrimHistory(to_delete, m->ApproximateMemoryUsage()); } // Removes m from list of memtables not flushed. Caller should NOT Unref m. @@ -250,19 +254,51 @@ void MemTableListVersion::Remove(MemTable* m, memlist_.remove(m); m->MarkFlushed(); - if (max_write_buffer_number_to_maintain_ > 0) { + if (max_write_buffer_size_to_maintain_ > 0 || + max_write_buffer_number_to_maintain_ > 0) { memlist_history_.push_front(m); - TrimHistory(to_delete); + // Unable to get size of mutable memtable at this point, pass 0 to + // TrimHistory as a best effort. + TrimHistory(to_delete, 0); } else { UnrefMemTable(to_delete, m); } } +// Return the total memory usage assuming the oldest flushed memtable is dropped +size_t MemTableListVersion::ApproximateMemoryUsageExcludingLast() { + size_t total_memtable_size = 0; + for (auto& memtable : memlist_) { + total_memtable_size += memtable->ApproximateMemoryUsage(); + } + for (auto& memtable : memlist_history_) { + total_memtable_size += memtable->ApproximateMemoryUsage(); + } + if (!memlist_history_.empty()) { + total_memtable_size -= memlist_history_.back()->ApproximateMemoryUsage(); + } + return total_memtable_size; +} + +bool MemTableListVersion::MemtableLimitExceeded(size_t usage) { + if (max_write_buffer_size_to_maintain_ > 0) { + // calculate the total memory usage after dropping the oldest flushed + // memtable, compare with max_write_buffer_size_to_maintain_ to decide + // whether to trim history + return ApproximateMemoryUsageExcludingLast() + usage >= + static_cast<size_t>(max_write_buffer_size_to_maintain_); + } else if (max_write_buffer_number_to_maintain_ > 0) { + return memlist_.size() + memlist_history_.size() > + static_cast<size_t>(max_write_buffer_number_to_maintain_); + } else { + return false; + } +} + // Make sure we don't use up too much space in history -void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete) { - while (memlist_.size() + memlist_history_.size() > - static_cast<size_t>(max_write_buffer_number_to_maintain_) && - !memlist_history_.empty()) { +void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete, + size_t usage) { + while (MemtableLimitExceeded(usage) && !memlist_history_.empty()) { MemTable* x = memlist_history_.back(); memlist_history_.pop_back(); @@ -444,6 +480,8 @@ Status MemTableList::TryInstallMemtableFlushResults( cfd->GetName().c_str(), m->file_number_, mem_id); assert(m->file_number_ > 0); current_->Remove(m, to_delete); + UpdateMemoryUsageExcludingLast(); + ResetTrimHistoryNeeded(); ++mem_id; } } else { @@ -483,6 +521,15 @@ void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) { if (num_flush_not_started_ == 1) { imm_flush_needed.store(true, std::memory_order_release); } + UpdateMemoryUsageExcludingLast(); + ResetTrimHistoryNeeded(); +} + +void MemTableList::TrimHistory(autovector<MemTable*>* to_delete, size_t usage) { + InstallNewVersion(); + current_->TrimHistory(to_delete, usage); + UpdateMemoryUsageExcludingLast(); + ResetTrimHistoryNeeded(); } // Returns an estimate of the number of bytes of data in use.
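To make the MemtableLimitExceeded() arithmetic above concrete, the following self-contained sketch (hypothetical helper, not RocksDB API) replays the 64MB example from the option documentation later in this patch: history is trimmed only while the memory that would remain after dropping the oldest flushed memtable still meets or exceeds the limit.

    #include <cstdint>
    #include <cstdio>
    #include <list>

    // Mirrors MemtableLimitExceeded(): trim while total usage excluding the
    // oldest flushed memtable (plus the mutable memtable's usage) would still
    // be >= max_write_buffer_size_to_maintain.
    bool ShouldTrim(const std::list<size_t>& flushed, size_t mutable_usage,
                    int64_t max_size) {
      if (max_size <= 0 || flushed.empty()) return false;
      size_t total = mutable_usage;
      for (size_t s : flushed) total += s;
      return total - flushed.back() >= static_cast<size_t>(max_size);
    }

    int main() {
      // 32MB, 20MB, 20MB of flushed memtables (oldest last), 64MB limit.
      std::list<size_t> flushed = {32u << 20, 20u << 20, 20u << 20};
      while (ShouldTrim(flushed, /*mutable_usage=*/0, 64 << 20)) {
        flushed.pop_back();
      }
      // Dropping the oldest 20MB table would leave 52MB < 64MB, so nothing is
      // trimmed: all three tables (72MB) stay, as the option docs describe.
      std::printf("%zu flushed memtables kept\n", flushed.size());  // "3"
      return 0;
    }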
@@ -496,6 +543,20 @@ size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() { size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; } +size_t MemTableList::ApproximateMemoryUsageExcludingLast() { + size_t usage = + current_memory_usage_excluding_last_.load(std::memory_order_relaxed); + return usage; +} + +// Update current_memory_usage_excluding_last_; needs to be called whenever +// state changes for MemtableListVersion (whenever InstallNewVersion() is +// called) +void MemTableList::UpdateMemoryUsageExcludingLast() { + size_t total_memtable_size = current_->ApproximateMemoryUsageExcludingLast(); + current_memory_usage_excluding_last_.store(total_memtable_size, + std::memory_order_relaxed); +} + uint64_t MemTableList::ApproximateOldestKeyTime() const { if (!current_->memlist_.empty()) { return current_->memlist_.back()->ApproximateOldestKeyTime(); @@ -623,6 +684,8 @@ Status InstallMemtableAtomicFlushResults( cfds[i]->GetName().c_str(), m->GetFileNumber(), mem_id); imm->current_->Remove(m, to_delete); + imm->UpdateMemoryUsageExcludingLast(); + imm->ResetTrimHistoryNeeded(); } } } else { @@ -664,6 +727,8 @@ void MemTableList::RemoveOldMemTables(uint64_t log_number, imm_flush_needed.store(false, std::memory_order_release); } } + UpdateMemoryUsageExcludingLast(); + ResetTrimHistoryNeeded(); } } // namespace rocksdb diff --git a/db/memtable_list.h b/db/memtable_list.h index 2bd225b83..75cc1a524 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -44,7 +44,8 @@ class MemTableListVersion { explicit MemTableListVersion(size_t* parent_memtable_list_memory_usage, MemTableListVersion* old = nullptr); explicit MemTableListVersion(size_t* parent_memtable_list_memory_usage, - int max_write_buffer_number_to_maintain); + int max_write_buffer_number_to_maintain, + int64_t max_write_buffer_size_to_maintain); void Ref(); void Unref(autovector<MemTable*>* to_delete = nullptr); @@ -139,7 +140,7 @@ class MemTableListVersion { // REQUIRE: m is an immutable memtable void Remove(MemTable* m, autovector<MemTable*>* to_delete); - void TrimHistory(autovector<MemTable*>* to_delete); + void TrimHistory(autovector<MemTable*>* to_delete, size_t usage); bool GetFromList(std::list<MemTable*>* list, const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, @@ -152,6 +153,14 @@ class MemTableListVersion { void UnrefMemTable(autovector<MemTable*>* to_delete, MemTable* m); + // Calculate the total amount of memory used by memlist_ and memlist_history_ + // excluding the last MemTable in memlist_history_. The reason for excluding + // the last MemTable is to see if dropping the last MemTable will keep total + // memory usage at or above max_write_buffer_size_to_maintain_ + size_t ApproximateMemoryUsageExcludingLast(); + + bool MemtableLimitExceeded(size_t usage); + // Immutable MemTables that have not yet been flushed. std::list<MemTable*> memlist_; // MemTables that have already been flushed // (used during Transaction validation) std::list<MemTable*> memlist_history_; // Maximum number of MemTables to keep in memory (including both flushed - // and not-yet-flushed tables). const int max_write_buffer_number_to_maintain_; + // Maximum size of MemTables to keep in memory (including both flushed + // and not-yet-flushed tables). + const int64_t max_write_buffer_size_to_maintain_; int refs_ = 0; @@ -176,35 +187,41 @@ class MemTableListVersion { // recoverability from a crash. // // -// Other than imm_flush_needed, this class is not thread-safe and requires -// external synchronization (such as holding the db mutex or being on the -// write thread.)
+// Other than imm_flush_needed and imm_trim_needed, this class is not +// thread-safe and requires external synchronization (such as holding the db +// mutex or being on the write thread.) class MemTableList { public: // A list of memtables. explicit MemTableList(int min_write_buffer_number_to_merge, - int max_write_buffer_number_to_maintain) + int max_write_buffer_number_to_maintain, + int64_t max_write_buffer_size_to_maintain) : imm_flush_needed(false), + imm_trim_needed(false), min_write_buffer_number_to_merge_(min_write_buffer_number_to_merge), current_(new MemTableListVersion(&current_memory_usage_, - max_write_buffer_number_to_maintain)), + max_write_buffer_number_to_maintain, + max_write_buffer_size_to_maintain)), num_flush_not_started_(0), commit_in_progress_(false), - flush_requested_(false) { + flush_requested_(false), + current_memory_usage_(0), + current_memory_usage_excluding_last_(0) { current_->Ref(); - current_memory_usage_ = 0; } // Should not delete MemTableList without making sure MemTableList::current() // is Unref()'d. ~MemTableList() {} - MemTableListVersion* current() { return current_; } + MemTableListVersion* current() const { return current_; } // so that background threads can detect non-nullptr pointer to // determine whether there is anything more to start flushing. std::atomic<bool> imm_flush_needed; + std::atomic<bool> imm_trim_needed; + // Returns the total number of memtables in the list that haven't yet // been flushed and logged. int NumNotFlushed() const; @@ -243,6 +260,18 @@ class MemTableList { // Returns an estimate of the number of bytes of data in use. size_t ApproximateMemoryUsage(); + // Returns the cached current_memory_usage_excluding_last_ value + size_t ApproximateMemoryUsageExcludingLast(); + + // Update current_memory_usage_excluding_last_ from MemtableListVersion + void UpdateMemoryUsageExcludingLast(); + + // `usage` is the current size of the mutable Memtable. When + // max_write_buffer_size_to_maintain is used, total size of mutable and + // immutable memtables is checked against it to decide whether to trim + // memtable list. + void TrimHistory(autovector<MemTable*>* to_delete, size_t usage); + // Returns an estimate of the number of bytes of data used by // the unflushed mem-tables. size_t ApproximateUnflushedMemTablesMemoryUsage(); @@ -259,6 +288,20 @@ class MemTableList { bool HasFlushRequested() { return flush_requested_; } + // Returns true if a trim history should be scheduled and the caller should + // be the one to schedule it + bool MarkTrimHistoryNeeded() { + auto expected = false; + return imm_trim_needed.compare_exchange_strong( + expected, true, std::memory_order_relaxed, std::memory_order_relaxed); + } + + void ResetTrimHistoryNeeded() { + auto expected = true; + imm_trim_needed.compare_exchange_strong( + expected, false, std::memory_order_relaxed, std::memory_order_relaxed); + } + // Copying allowed // MemTableList(const MemTableList&); // void operator=(const MemTableList&); @@ -338,6 +381,8 @@ class MemTableList { // The current memory usage. size_t current_memory_usage_; + + std::atomic<size_t> current_memory_usage_excluding_last_; }; // Installs memtable atomic flush results.
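MarkTrimHistoryNeeded() above is a claim flag: among many concurrent memtable writers, compare_exchange_strong lets exactly one caller win the right to enqueue the column family, and ResetTrimHistoryNeeded() re-arms the flag once a trim completes. The same pattern in isolation, with illustrative names only:

    #include <atomic>

    class TrimFlag {
     public:
      // Returns true for exactly one caller until Reset() is called again,
      // so a column family is scheduled at most once per trim cycle.
      bool TryClaim() {
        bool expected = false;
        return flag_.compare_exchange_strong(expected, true,
                                             std::memory_order_relaxed,
                                             std::memory_order_relaxed);
      }

      void Reset() {
        bool expected = true;
        flag_.compare_exchange_strong(expected, false,
                                      std::memory_order_relaxed,
                                      std::memory_order_relaxed);
      }

     private:
      std::atomic<bool> flag_{false};
    };

    int main() {
      TrimFlag f;
      bool first = f.TryClaim();   // true: this caller schedules the work
      bool second = f.TryClaim();  // false: already scheduled
      f.Reset();                   // re-arm after the trim completes
      return (first && !second) ? 0 : 1;
    }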
diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index 3a14b6830..b8dc80216 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -183,7 +183,7 @@ class MemTableListTest : public testing::Test { TEST_F(MemTableListTest, Empty) { // Create an empty MemTableList and validate basic functions. - MemTableList list(1, 0); + MemTableList list(1, 0, 0); ASSERT_EQ(0, list.NumNotFlushed()); ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); @@ -202,8 +202,10 @@ TEST_F(MemTableListTest, GetTest) { // Create MemTableList int min_write_buffer_number_to_merge = 2; int max_write_buffer_number_to_maintain = 0; + int64_t max_write_buffer_size_to_maintain = 0; MemTableList list(min_write_buffer_number_to_merge, - max_write_buffer_number_to_maintain); + max_write_buffer_number_to_maintain, + max_write_buffer_size_to_maintain); SequenceNumber seq = 1; std::string value; @@ -312,8 +314,10 @@ TEST_F(MemTableListTest, GetFromHistoryTest) { // Create MemTableList int min_write_buffer_number_to_merge = 2; int max_write_buffer_number_to_maintain = 2; + int64_t max_write_buffer_size_to_maintain = 2000; MemTableList list(min_write_buffer_number_to_merge, - max_write_buffer_number_to_maintain); + max_write_buffer_number_to_maintain, + max_write_buffer_size_to_maintain); SequenceNumber seq = 1; std::string value; @@ -514,8 +518,11 @@ TEST_F(MemTableListTest, FlushPendingTest) { // Create MemTableList int min_write_buffer_number_to_merge = 3; int max_write_buffer_number_to_maintain = 7; + int64_t max_write_buffer_size_to_maintain = + 7 * static_cast<int64_t>(options.write_buffer_size); MemTableList list(min_write_buffer_number_to_merge, - max_write_buffer_number_to_maintain); + max_write_buffer_number_to_maintain, + max_write_buffer_size_to_maintain); // Create some MemTables uint64_t memtable_id = 0; @@ -670,7 +677,9 @@ TEST_F(MemTableListTest, FlushPendingTest) { // created. So TryInstallMemtableFlushResults will install the first 3 tables // in to_flush and stop when it encounters a table not yet flushed. ASSERT_EQ(2, list.NumNotFlushed()); - int num_in_history = std::min(3, max_write_buffer_number_to_maintain); + int num_in_history = + std::min(3, static_cast<int>(max_write_buffer_size_to_maintain) / + static_cast<int>(options.write_buffer_size)); ASSERT_EQ(num_in_history, list.NumFlushed()); ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size()); @@ -687,7 +696,9 @@ TEST_F(MemTableListTest, FlushPendingTest) { // This will actually install 2 tables. The 1 we told it to flush, and also // tables[4] which has been waiting for tables[3] to commit.
ASSERT_EQ(0, list.NumNotFlushed()); - num_in_history = std::min(5, max_write_buffer_number_to_maintain); + num_in_history = + std::min(5, static_cast<int>(max_write_buffer_size_to_maintain) / + static_cast<int>(options.write_buffer_size)); ASSERT_EQ(num_in_history, list.NumFlushed()); ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size()); @@ -730,7 +741,8 @@ TEST_F(MemTableListTest, FlushPendingTest) { list.current()->Unref(&to_delete); int to_delete_size = - std::min(num_tables, max_write_buffer_number_to_maintain); + std::min(num_tables, static_cast<int>(max_write_buffer_size_to_maintain) / + static_cast<int>(options.write_buffer_size)); ASSERT_EQ(to_delete_size, to_delete.size()); for (const auto& m : to_delete) { @@ -769,10 +781,13 @@ TEST_F(MemTableListTest, AtomicFlusTest) { // Create MemTableLists int min_write_buffer_number_to_merge = 3; int max_write_buffer_number_to_maintain = 7; + int64_t max_write_buffer_size_to_maintain = + 7 * static_cast<int64_t>(options.write_buffer_size); autovector<MemTableList*> lists; for (int i = 0; i != num_cfs; ++i) { lists.emplace_back(new MemTableList(min_write_buffer_number_to_merge, - max_write_buffer_number_to_maintain)); + max_write_buffer_number_to_maintain, + max_write_buffer_size_to_maintain)); } autovector<uint32_t> cf_ids; diff --git a/db/repair.cc b/db/repair.cc index 8967b39f3..0f0d329cc 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -383,7 +383,8 @@ class Repairer { continue; } WriteBatchInternal::SetContents(&batch, record); - status = WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr); + status = + WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr, nullptr); if (status.ok()) { counter += WriteBatchInternal::Count(&batch); } else { diff --git a/db/trim_history_scheduler.cc b/db/trim_history_scheduler.cc new file mode 100644 index 000000000..a213ac65f --- /dev/null +++ b/db/trim_history_scheduler.cc @@ -0,0 +1,59 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/trim_history_scheduler.h" + +#include <cassert> + +#include "db/column_family.h" + +namespace rocksdb { + +void TrimHistoryScheduler::ScheduleWork(ColumnFamilyData* cfd) { + std::lock_guard<std::mutex> lock(checking_mutex_); + cfd->Ref(); + cfds_.push_back(cfd); + is_empty_.store(false, std::memory_order_relaxed); +} + +ColumnFamilyData* TrimHistoryScheduler::TakeNextColumnFamily() { + std::lock_guard<std::mutex> lock(checking_mutex_); + while (true) { + if (cfds_.empty()) { + return nullptr; + } + ColumnFamilyData* cfd = cfds_.back(); + cfds_.pop_back(); + if (cfds_.empty()) { + is_empty_.store(true, std::memory_order_relaxed); + } + + if (!cfd->IsDropped()) { + // success + return cfd; + } + if (cfd->Unref()) { + // no longer relevant, retry + delete cfd; + } + } +} + +bool TrimHistoryScheduler::Empty() { + bool is_empty = is_empty_.load(std::memory_order_relaxed); + return is_empty; +} + +void TrimHistoryScheduler::Clear() { + ColumnFamilyData* cfd; + while ((cfd = TakeNextColumnFamily()) != nullptr) { + if (cfd->Unref()) { + delete cfd; + } + } + assert(Empty()); +} + +} // namespace rocksdb
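The scheduler hands out Ref()-ed column families, so each successful TakeNextColumnFamily() must be paired with an Unref() by the consumer (compare DBImpl::TrimMemtableHistory earlier in this patch). A toy model of that handoff, with stand-in types rather than ColumnFamilyData, to show where each reference is taken and dropped:

    #include <mutex>
    #include <vector>

    // Stand-in for ColumnFamilyData's manual ref-counting (illustrative only).
    struct FakeCfd {
      int refs = 1;
      bool dropped = false;
      void Ref() { ++refs; }
      bool Unref() { return --refs == 0; }  // true: caller must delete
    };

    class ToyScheduler {
     public:
      void ScheduleWork(FakeCfd* cfd) {
        std::lock_guard<std::mutex> lock(mu_);
        cfd->Ref();  // keep the CF alive until the trim consumes it
        queue_.push_back(cfd);
      }

      FakeCfd* TakeNext() {
        std::lock_guard<std::mutex> lock(mu_);
        while (!queue_.empty()) {
          FakeCfd* cfd = queue_.back();
          queue_.pop_back();
          if (!cfd->dropped) return cfd;  // caller now owns one reference
          if (cfd->Unref()) delete cfd;   // dropped meanwhile: discard, retry
        }
        return nullptr;
      }

     private:
      std::mutex mu_;
      std::vector<FakeCfd*> queue_;
    };

    int main() {
      ToyScheduler sched;
      FakeCfd* cfd = new FakeCfd;
      sched.ScheduleWork(cfd);
      FakeCfd* taken = sched.TakeNext();  // == cfd, refs == 2
      // ... trim history for `taken` ...
      if (taken->Unref()) delete taken;   // drop the scheduler's reference
      if (cfd->Unref()) delete cfd;       // drop the original owner's reference
      return 0;
    }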
diff --git a/db/trim_history_scheduler.h b/db/trim_history_scheduler.h new file mode 100644 index 000000000..e9013b964 --- /dev/null +++ b/db/trim_history_scheduler.h @@ -0,0 +1,44 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <stdint.h> +#include <atomic> +#include <mutex> +#include "util/autovector.h" + +namespace rocksdb { + +class ColumnFamilyData; + +// Similar to FlushScheduler, TrimHistoryScheduler is a FIFO queue that keeps +// track of column families whose flushed immutable memtables may need to be +// removed (aka trimmed). The actual trimming may be slightly delayed. Due to +// the use of the mutex and atomic variable, ScheduleWork, +// TakeNextColumnFamily, and Empty can be called concurrently. +class TrimHistoryScheduler { + public: + TrimHistoryScheduler() : is_empty_(true) {} + + // When a column family needs history trimming, add cfd to the FIFO queue + void ScheduleWork(ColumnFamilyData* cfd); + + // Remove the column family from the queue; the caller is responsible for + // calling `MemtableList::TrimHistory` + ColumnFamilyData* TakeNextColumnFamily(); + + bool Empty(); + + void Clear(); + + private: + std::atomic<bool> is_empty_; + autovector<ColumnFamilyData*> cfds_; + // Not on critical path, use mutex to ensure thread safety + std::mutex checking_mutex_; +}; + +} // namespace rocksdb diff --git a/db/write_batch.cc b/db/write_batch.cc index 8a896644f..2a1bf1948 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -48,6 +48,7 @@ #include "db/memtable.h" #include "db/merge_context.h" #include "db/snapshot_impl.h" +#include "db/trim_history_scheduler.h" #include "db/write_batch_internal.h" #include "monitoring/perf_context_imp.h" #include "monitoring/statistics.h" @@ -1189,6 +1190,7 @@ class MemTableInserter : public WriteBatch::Handler { SequenceNumber sequence_; ColumnFamilyMemTables* const cf_mems_; FlushScheduler* const flush_scheduler_; + TrimHistoryScheduler* const trim_history_scheduler_; const bool ignore_missing_column_families_; const uint64_t recovering_log_number_; // log number that all Memtables inserted into should reference @@ -1250,6 +1252,7 @@ class MemTableInserter : public WriteBatch::Handler { // cf_mems should not be shared with concurrent inserters MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems, FlushScheduler* flush_scheduler, + TrimHistoryScheduler* trim_history_scheduler, bool ignore_missing_column_families, uint64_t recovering_log_number, DB* db, bool concurrent_memtable_writes, @@ -1258,6 +1261,7 @@ class MemTableInserter : public WriteBatch::Handler { : sequence_(_sequence), cf_mems_(cf_mems), flush_scheduler_(flush_scheduler), + trim_history_scheduler_(trim_history_scheduler), ignore_missing_column_families_(ignore_missing_column_families), recovering_log_number_(recovering_log_number), log_number_ref_(0), @@ -1748,7 +1752,20 @@ class MemTableInserter : public WriteBatch::Handler { cfd->mem()->MarkFlushScheduled()) { // MarkFlushScheduled only returns true if we are the one that // should take action, so no need to dedup further - flush_scheduler_->ScheduleFlush(cfd); + flush_scheduler_->ScheduleWork(cfd); + } + } + // check if memtable_list size exceeds max_write_buffer_size_to_maintain + if (trim_history_scheduler_ != nullptr) { + auto* cfd = cf_mems_->current(); + assert(cfd != nullptr); + if (cfd->ioptions()->max_write_buffer_size_to_maintain > 0 && + cfd->mem()->ApproximateMemoryUsageFast() + + cfd->imm()->ApproximateMemoryUsageExcludingLast() >= + static_cast<size_t>( + cfd->ioptions()->max_write_buffer_size_to_maintain) && + cfd->imm()->MarkTrimHistoryNeeded()) { +
trim_history_scheduler_->ScheduleWork(cfd); } } } @@ -1908,12 +1925,14 @@ class MemTableInserter : public WriteBatch::Handler { Status WriteBatchInternal::InsertInto( WriteThread::WriteGroup& write_group, SequenceNumber sequence, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, + TrimHistoryScheduler* trim_history_scheduler, bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db, bool concurrent_memtable_writes, bool seq_per_batch, bool batch_per_txn) { MemTableInserter inserter( - sequence, memtables, flush_scheduler, ignore_missing_column_families, - recovery_log_number, db, concurrent_memtable_writes, - nullptr /*has_valid_writes*/, seq_per_batch, batch_per_txn); + sequence, memtables, flush_scheduler, trim_history_scheduler, + ignore_missing_column_families, recovery_log_number, db, + concurrent_memtable_writes, nullptr /*has_valid_writes*/, seq_per_batch, + batch_per_txn); for (auto w : write_group) { if (w->CallbackFailed()) { continue; } @@ -1939,6 +1958,7 @@ Status WriteBatchInternal::InsertInto( Status WriteBatchInternal::InsertInto( WriteThread::Writer* writer, SequenceNumber sequence, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, + TrimHistoryScheduler* trim_history_scheduler, bool ignore_missing_column_families, uint64_t log_number, DB* db, bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt, bool batch_per_txn) { @@ -1947,9 +1967,10 @@ Status WriteBatchInternal::InsertInto( #endif assert(writer->ShouldWriteToMemtable()); MemTableInserter inserter( - sequence, memtables, flush_scheduler, ignore_missing_column_families, - log_number, db, concurrent_memtable_writes, nullptr /*has_valid_writes*/, - seq_per_batch, batch_per_txn); + sequence, memtables, flush_scheduler, trim_history_scheduler, + ignore_missing_column_families, log_number, db, + concurrent_memtable_writes, nullptr /*has_valid_writes*/, seq_per_batch, + batch_per_txn); SetSequence(writer->batch, sequence); inserter.set_log_number_ref(writer->log_ref); Status s = writer->batch->Iterate(&inserter); @@ -1963,11 +1984,13 @@ Status WriteBatchInternal::InsertInto( Status WriteBatchInternal::InsertInto( const WriteBatch* batch, ColumnFamilyMemTables* memtables, - FlushScheduler* flush_scheduler, bool ignore_missing_column_families, - uint64_t log_number, DB* db, bool concurrent_memtable_writes, - SequenceNumber* next_seq, bool* has_valid_writes, bool seq_per_batch, - bool batch_per_txn) { + FlushScheduler* flush_scheduler, + TrimHistoryScheduler* trim_history_scheduler, + bool ignore_missing_column_families, uint64_t log_number, DB* db, + bool concurrent_memtable_writes, SequenceNumber* next_seq, + bool* has_valid_writes, bool seq_per_batch, bool batch_per_txn) { MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler, + trim_history_scheduler, ignore_missing_column_families, log_number, db, concurrent_memtable_writes, has_valid_writes, seq_per_batch, batch_per_txn); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 67136a847..6793e8450 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -9,11 +9,13 @@ #pragma once #include <vector> +#include "db/flush_scheduler.h" +#include "db/trim_history_scheduler.h" #include "db/write_thread.h" -#include "rocksdb/types.h" -#include "rocksdb/write_batch.h" #include "rocksdb/db.h" #include "rocksdb/options.h" +#include "rocksdb/types.h" +#include "rocksdb/write_batch.h" #include "util/autovector.h" namespace rocksdb { @@ -162,6 +164,7 @@
static Status InsertInto( WriteThread::WriteGroup& write_group, SequenceNumber sequence, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, + TrimHistoryScheduler* trim_history_scheduler, bool ignore_missing_column_families = false, uint64_t log_number = 0, DB* db = nullptr, bool concurrent_memtable_writes = false, bool seq_per_batch = false, bool batch_per_txn = true); @@ -171,6 +174,7 @@ class WriteBatchInternal { static Status InsertInto( const WriteBatch* batch, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, + TrimHistoryScheduler* trim_history_scheduler, bool ignore_missing_column_families = false, uint64_t log_number = 0, DB* db = nullptr, bool concurrent_memtable_writes = false, SequenceNumber* next_seq = nullptr, bool* has_valid_writes = nullptr, @@ -179,6 +183,7 @@ class WriteBatchInternal { static Status InsertInto(WriteThread::Writer* writer, SequenceNumber sequence, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, + TrimHistoryScheduler* trim_history_scheduler, bool ignore_missing_column_families = false, uint64_t log_number = 0, DB* db = nullptr, bool concurrent_memtable_writes = false, diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 5de602cee..6a3f9e680 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -35,7 +35,8 @@ static std::string PrintContents(WriteBatch* b) { mem->Ref(); std::string state; ColumnFamilyMemTablesDefault cf_mems_default(mem); - Status s = WriteBatchInternal::InsertInto(b, &cf_mems_default, nullptr); + Status s = + WriteBatchInternal::InsertInto(b, &cf_mems_default, nullptr, nullptr); int count = 0; int put_count = 0; int delete_count = 0; diff --git a/examples/rocksdb_option_file_example.ini b/examples/rocksdb_option_file_example.ini index 351f1ed01..dcbc9a308 100644 --- a/examples/rocksdb_option_file_example.ini +++ b/examples/rocksdb_option_file_example.ini @@ -104,7 +104,7 @@ compression=kSnappyCompression level0_file_num_compaction_trigger=4 purge_redundant_kvs_while_flush=true - max_write_buffer_number_to_maintain=0 + max_write_buffer_size_to_maintain=0 memtable_factory=SkipListFactory max_grandparent_overlap_factor=8 expanded_compaction_factor=25 diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index c88a6c17d..2964491f7 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -175,11 +175,26 @@ struct AdvancedColumnFamilyOptions { // individual write buffers. Default: 1 int min_write_buffer_number_to_merge = 1; + // DEPRECATED // The total maximum number of write buffers to maintain in memory including // copies of buffers that have already been flushed. Unlike // max_write_buffer_number, this parameter does not affect flushing. - // This controls the minimum amount of write history that will be available - // in memory for conflict checking when Transactions are used. + // This parameter is being replaced by max_write_buffer_size_to_maintain. + // If both parameters are set to non-zero values, this parameter will be + // ignored. + int max_write_buffer_number_to_maintain = 0; + + // The total maximum size (bytes) of write buffers to maintain in memory + // including copies of buffers that have already been flushed. This parameter + // only affects trimming of flushed buffers and does not affect flushing. + // This controls the maximum amount of write history that will be available + // in memory for conflict checking when Transactions are used.
The actual + // size of write history (flushed Memtables) might be higher than this + // limit when trimming further would reduce the total size below it. For + // example, if max_write_buffer_size_to_maintain is set to 64MB and there + // are three flushed Memtables with sizes of 32MB, 20MB, and 20MB, then + // trimming the next Memtable of size 20MB would reduce total memory usage + // to 52MB, which is below the limit, so RocksDB stops trimming. // // When using an OptimisticTransactionDB: // If this value is too low, some transactions may fail at commit time due @@ -192,14 +207,14 @@ struct AdvancedColumnFamilyOptions { // done for conflict detection. // // Setting this value to 0 will cause write buffers to be freed immediately - // after they are flushed. - // If this value is set to -1, 'max_write_buffer_number' will be used. + // after they are flushed. If this value is set to -1, + // 'max_write_buffer_number * write_buffer_size' will be used. // // Default: // If using a TransactionDB/OptimisticTransactionDB, the default value will - // be set to the value of 'max_write_buffer_number' if it is not explicitly - // set by the user. Otherwise, the default is 0. - int max_write_buffer_number_to_maintain = 0; + // be set to the value of 'max_write_buffer_number * write_buffer_size' + // if it is not explicitly set by the user. Otherwise, the default is 0. + int64_t max_write_buffer_size_to_maintain = 0; // Allows thread-safe inplace updates. If this is true, there is no way to // achieve point-in-time consistency using snapshot or iterator (assuming diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index e8cb32242..d7f13f8ed 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -854,6 +854,9 @@ rocksdb_options_set_min_write_buffer_number_to_merge(rocksdb_options_t*, int); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_max_write_buffer_number_to_maintain(rocksdb_options_t*, int); +extern ROCKSDB_LIBRARY_API void +rocksdb_options_set_max_write_buffer_size_to_maintain(rocksdb_options_t*, + int64_t); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_enable_pipelined_write( rocksdb_options_t*, unsigned char); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_unordered_write( diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index c0d8537a2..95d299c1b 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -131,7 +131,7 @@ class Transaction { // Status::Busy() may be returned if the transaction could not guarantee // that there are no write conflicts. Status::TryAgain() may be returned // if the memtable history size is not large enough - // (See max_write_buffer_number_to_maintain). + // (See max_write_buffer_size_to_maintain). // // If this transaction was created by a TransactionDB(), Status::Expired() // may be returned if this transaction has lived for longer than @@ -243,7 +243,7 @@ class Transaction { // Status::Busy() if there is a write conflict, // Status::TimedOut() if a lock could not be acquired, // Status::TryAgain() if the memtable history size is not large enough - // (See max_write_buffer_number_to_maintain) + // (See max_write_buffer_size_to_maintain) // Status::MergeInProgress() if merge operations cannot be resolved. // or other errors if this key could not be read.
virtual Status GetForUpdate(const ReadOptions& options, @@ -320,7 +320,7 @@ class Transaction { // Status::Busy() if there is a write conflict, // Status::TimedOut() if a lock could not be acquired, // Status::TryAgain() if the memtable history size is not large enough - // (See max_write_buffer_number_to_maintain) + // (See max_write_buffer_size_to_maintain) // or other errors on unexpected failures. virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value, const bool assume_tracked = false) = 0; diff --git a/java/rocksjni/write_batch_test.cc b/java/rocksjni/write_batch_test.cc index c6b8a9239..eeb269e48 100644 --- a/java/rocksjni/write_batch_test.cc +++ b/java/rocksjni/write_batch_test.cc @@ -52,8 +52,8 @@ jbyteArray Java_org_rocksdb_WriteBatchTest_getContents(JNIEnv* env, mem->Ref(); std::string state; rocksdb::ColumnFamilyMemTablesDefault cf_mems_default(mem); - rocksdb::Status s = - rocksdb::WriteBatchInternal::InsertInto(b, &cf_mems_default, nullptr); + rocksdb::Status s = rocksdb::WriteBatchInternal::InsertInto( + b, &cf_mems_default, nullptr, nullptr); int count = 0; rocksdb::Arena arena; rocksdb::ScopedArenaIterator iter( diff --git a/options/cf_options.cc b/options/cf_options.cc index 5830fc661..ded9ca01b 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -33,6 +33,8 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options, cf_options.min_write_buffer_number_to_merge), max_write_buffer_number_to_maintain( cf_options.max_write_buffer_number_to_maintain), + max_write_buffer_size_to_maintain( + cf_options.max_write_buffer_size_to_maintain), inplace_update_support(cf_options.inplace_update_support), inplace_callback(cf_options.inplace_callback), info_log(db_options.info_log.get()), diff --git a/options/cf_options.h b/options/cf_options.h index 47fca58fa..e13eae801 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -43,6 +43,8 @@ struct ImmutableCFOptions { int max_write_buffer_number_to_maintain; + int64_t max_write_buffer_size_to_maintain; + bool inplace_update_support; UpdateStatus (*inplace_callback)(char* existing_value, diff --git a/options/options.cc b/options/options.cc index 5efd3ce57..11804d88f 100644 --- a/options/options.cc +++ b/options/options.cc @@ -42,6 +42,8 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options) options.min_write_buffer_number_to_merge), max_write_buffer_number_to_maintain( options.max_write_buffer_number_to_maintain), + max_write_buffer_size_to_maintain( + options.max_write_buffer_size_to_maintain), inplace_update_support(options.inplace_update_support), inplace_update_num_locks(options.inplace_update_num_locks), inplace_callback(options.inplace_callback), @@ -158,6 +160,9 @@ void ColumnFamilyOptions::Dump(Logger* log) const { min_write_buffer_number_to_merge); ROCKS_LOG_HEADER(log, " Options.max_write_buffer_number_to_maintain: %d", max_write_buffer_number_to_maintain); + ROCKS_LOG_HEADER(log, + " Options.max_write_buffer_size_to_maintain: %" PRId64, + max_write_buffer_size_to_maintain); ROCKS_LOG_HEADER( log, " Options.bottommost_compression_opts.window_bits: %d", bottommost_compression_opts.window_bits); diff --git a/options/options_helper.cc b/options/options_helper.cc index 5733ceed4..588a45ef7 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -1873,6 +1873,9 @@ std::unordered_map<std::string, OptionTypeInfo> {"max_write_buffer_number_to_maintain", {offset_of(&ColumnFamilyOptions::max_write_buffer_number_to_maintain), OptionType::kInt,
OptionVerificationType::kNormal, false, 0}}, + {"max_write_buffer_size_to_maintain", + {offset_of(&ColumnFamilyOptions::max_write_buffer_size_to_maintain), + OptionType::kInt64T, OptionVerificationType::kNormal, false, 0}}, {"min_write_buffer_number_to_merge", {offset_of(&ColumnFamilyOptions::min_write_buffer_number_to_merge), OptionType::kInt, OptionVerificationType::kNormal, false, 0}}, diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index e60fd6f9e..d0fef5847 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -438,6 +438,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { "soft_rate_limit=530.615385;" "soft_pending_compaction_bytes_limit=0;" "max_write_buffer_number_to_maintain=84;" + "max_write_buffer_size_to_maintain=2147483648;" "merge_operator=aabcxehazrMergeOperator;" "memtable_prefix_bloom_size_ratio=0.4642;" "memtable_whole_key_filtering=true;" diff --git a/options/options_test.cc b/options/options_test.cc index 05ea766f6..0e7cebf3a 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -49,6 +49,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"max_write_buffer_number", "2"}, {"min_write_buffer_number_to_merge", "3"}, {"max_write_buffer_number_to_maintain", "99"}, + {"max_write_buffer_size_to_maintain", "-99999"}, {"compression", "kSnappyCompression"}, {"compression_per_level", "kNoCompression:" @@ -150,6 +151,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_cf_opt.max_write_buffer_number, 2); ASSERT_EQ(new_cf_opt.min_write_buffer_number_to_merge, 3); ASSERT_EQ(new_cf_opt.max_write_buffer_number_to_maintain, 99); + ASSERT_EQ(new_cf_opt.max_write_buffer_size_to_maintain, -99999); ASSERT_EQ(new_cf_opt.compression, kSnappyCompression); ASSERT_EQ(new_cf_opt.compression_per_level.size(), 9U); ASSERT_EQ(new_cf_opt.compression_per_level[0], kNoCompression); diff --git a/src.mk b/src.mk index 8ebc0bee9..6d087861d 100644 --- a/src.mk +++ b/src.mk @@ -53,6 +53,7 @@ LIB_SOURCES = \ db/table_cache.cc \ db/table_properties_collector.cc \ db/transaction_log_impl.cc \ + db/trim_history_scheduler.cc \ db/version_builder.cc \ db/version_edit.cc \ db/version_set.cc \ diff --git a/table/table_test.cc b/table/table_test.cc index 749048b78..c1f9ed3f3 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -3300,7 +3300,8 @@ TEST_F(MemTableTest, Simple) { batch.DeleteRange(std::string("begin"), std::string("end")); ColumnFamilyMemTablesDefault cf_mems_default(memtable); ASSERT_TRUE( - WriteBatchInternal::InsertInto(&batch, &cf_mems_default, nullptr).ok()); + WriteBatchInternal::InsertInto(&batch, &cf_mems_default, nullptr, nullptr) + .ok()); for (int i = 0; i < 2; ++i) { Arena arena; diff --git a/test_util/testutil.cc b/test_util/testutil.cc index 46f878f8c..f3e71bebc 100644 --- a/test_util/testutil.cc +++ b/test_util/testutil.cc @@ -333,6 +333,7 @@ void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, DBOptions& db_options, cf_opt->max_mem_compaction_level = rnd->Uniform(100); cf_opt->max_write_buffer_number = rnd->Uniform(100); cf_opt->max_write_buffer_number_to_maintain = rnd->Uniform(100); + cf_opt->max_write_buffer_size_to_maintain = rnd->Uniform(10000); cf_opt->min_write_buffer_number_to_merge = rnd->Uniform(100); cf_opt->num_levels = rnd->Uniform(100); cf_opt->target_file_size_multiplier = rnd->Uniform(100); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 001dd4d2f..3c2de42dd 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ 
diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index 001dd4d2f..3c2de42dd 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -340,6 +340,20 @@ DEFINE_int32(max_write_buffer_number_to_maintain,
             "after they are flushed. If this value is set to -1, "
             "'max_write_buffer_number' will be used.");
 
+DEFINE_int64(max_write_buffer_size_to_maintain,
+             rocksdb::Options().max_write_buffer_size_to_maintain,
+             "The total maximum size of write buffers to maintain in memory "
+             "including copies of buffers that have already been flushed. "
+             "Unlike max_write_buffer_number, this parameter does not affect "
+             "flushing. This controls the minimum amount of write history "
+             "that will be available in memory for conflict checking when "
+             "Transactions are used. If this value is too low, some "
+             "transactions may fail at commit time due to not being able to "
+             "determine whether there were any write conflicts. Setting this "
+             "value to 0 will cause write buffers to be freed immediately "
+             "after they are flushed. If this value is set to -1, "
+             "'max_write_buffer_number * write_buffer_size' will be used.");
+
 DEFINE_int32(max_background_jobs,
              rocksdb::Options().max_background_jobs,
              "The maximum number of concurrent background jobs that can occur "
@@ -3385,6 +3399,8 @@ class Benchmark {
         FLAGS_min_write_buffer_number_to_merge;
     options.max_write_buffer_number_to_maintain =
         FLAGS_max_write_buffer_number_to_maintain;
+    options.max_write_buffer_size_to_maintain =
+        FLAGS_max_write_buffer_size_to_maintain;
     options.max_background_jobs = FLAGS_max_background_jobs;
     options.max_background_compactions = FLAGS_max_background_compactions;
     options.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
diff --git a/tools/db_bench_tool_test.cc b/tools/db_bench_tool_test.cc
index 4eb5472ac..87f8f1943 100644
--- a/tools/db_bench_tool_test.cc
+++ b/tools/db_bench_tool_test.cc
@@ -245,6 +245,7 @@ const std::string options_file_content = R"OPTIONS_FILE(
 expanded_compaction_factor=25
 soft_rate_limit=0.000000
 max_write_buffer_number_to_maintain=0
+max_write_buffer_size_to_maintain=0
 verify_checksums_in_compaction=true
 merge_operator=nullptr
 memtable_prefix_bloom_bits=0
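As a quick sanity check on the fallback arithmetic the flag text describes, the effective history budget can be computed as below (EffectiveHistoryBudget is a hypothetical helper mirroring the documented -1 behavior, not code from this patch):

```cpp
#include <cstdint>

#include "rocksdb/options.h"

// Per the flag description: -1 resolves to
// max_write_buffer_number * write_buffer_size; any other value is the
// byte budget itself (0 meaning no flushed history is retained).
int64_t EffectiveHistoryBudget(const rocksdb::Options& opts) {
  if (opts.max_write_buffer_size_to_maintain == -1) {
    return static_cast<int64_t>(opts.max_write_buffer_number) *
           static_cast<int64_t>(opts.write_buffer_size);
  }
  return opts.max_write_buffer_size_to_maintain;
}
```

For example, with max_write_buffer_number=4 and a 64 MB write_buffer_size, -1 resolves to a 256 MB history budget.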
diff --git a/tools/db_stress.cc b/tools/db_stress.cc
index 7b4a36f0f..3461a75d2 100644
--- a/tools/db_stress.cc
+++ b/tools/db_stress.cc
@@ -204,6 +204,20 @@ DEFINE_int32(max_write_buffer_number_to_maintain,
             "after they are flushed. If this value is set to -1, "
             "'max_write_buffer_number' will be used.");
 
+DEFINE_int64(max_write_buffer_size_to_maintain,
+             rocksdb::Options().max_write_buffer_size_to_maintain,
+             "The total maximum size of write buffers to maintain in memory "
+             "including copies of buffers that have already been flushed. "
+             "Unlike max_write_buffer_number, this parameter does not affect "
+             "flushing. This controls the minimum amount of write history "
+             "that will be available in memory for conflict checking when "
+             "Transactions are used. If this value is too low, some "
+             "transactions may fail at commit time due to not being able to "
+             "determine whether there were any write conflicts. Setting this "
+             "value to 0 will cause write buffers to be freed immediately "
+             "after they are flushed. If this value is set to -1, "
+             "'max_write_buffer_number * write_buffer_size' will be used.");
+
 DEFINE_double(memtable_prefix_bloom_size_ratio,
               rocksdb::Options().memtable_prefix_bloom_size_ratio,
               "creates prefix blooms for memtables, each with size "
@@ -2762,6 +2776,8 @@ class StressTest {
         FLAGS_min_write_buffer_number_to_merge;
     options_.max_write_buffer_number_to_maintain =
         FLAGS_max_write_buffer_number_to_maintain;
+    options_.max_write_buffer_size_to_maintain =
+        FLAGS_max_write_buffer_size_to_maintain;
     options_.memtable_prefix_bloom_size_ratio =
         FLAGS_memtable_prefix_bloom_size_ratio;
     options_.memtable_whole_key_filtering =
diff --git a/util/string_util.cc b/util/string_util.cc
index 9b447d50c..4a194f3a9 100644
--- a/util/string_util.cc
+++ b/util/string_util.cc
@@ -6,17 +6,16 @@
 #include "util/string_util.h"
 #include <errno.h>
-#include <inttypes.h>
 #include <stdarg.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <cinttypes>
 #include <algorithm>
 #include <cmath>
 #include <sstream>
 #include <string>
 #include <utility>
 #include <vector>
-#include "rocksdb/env.h"
 #include "port/port.h"
 #include "port/sys_time.h"
 #include "rocksdb/slice.h"
diff --git a/util/string_util.h b/util/string_util.h
index faf763e54..122a6c356 100644
--- a/util/string_util.h
+++ b/util/string_util.h
@@ -121,7 +121,6 @@ uint64_t ParseUint64(const std::string& value);
 
 int ParseInt(const std::string& value);
 
-
 int64_t ParseInt64(const std::string& value);
 
 double ParseDouble(const std::string& value);
diff --git a/utilities/transactions/optimistic_transaction_db_impl.cc b/utilities/transactions/optimistic_transaction_db_impl.cc
index b7fedc066..0fb7c9100 100644
--- a/utilities/transactions/optimistic_transaction_db_impl.cc
+++ b/utilities/transactions/optimistic_transaction_db_impl.cc
@@ -63,9 +63,11 @@ Status OptimisticTransactionDB::Open(
 
   for (auto& column_family : column_families_copy) {
     ColumnFamilyOptions* options = &column_family.options;
-    if (options->max_write_buffer_number_to_maintain == 0) {
-      // Setting to -1 will set the History size to max_write_buffer_number.
-      options->max_write_buffer_number_to_maintain = -1;
+    if (options->max_write_buffer_size_to_maintain == 0 &&
+        options->max_write_buffer_number_to_maintain == 0) {
+      // Setting to -1 will set the History size to
+      // max_write_buffer_number * write_buffer_size.
+      options->max_write_buffer_size_to_maintain = -1;
     }
   }
diff --git a/utilities/transactions/optimistic_transaction_test.cc b/utilities/transactions/optimistic_transaction_test.cc
index 3aa6c207a..2a6933495 100644
--- a/utilities/transactions/optimistic_transaction_test.cc
+++ b/utilities/transactions/optimistic_transaction_test.cc
@@ -36,6 +36,7 @@ class OptimisticTransactionTest : public testing::Test {
   OptimisticTransactionTest() {
     options.create_if_missing = true;
     options.max_write_buffer_number = 2;
+    options.max_write_buffer_size_to_maintain = 1600;
     dbname = test::PerThreadDBPath("optimistic_transaction_testdb");
 
     DestroyDB(dbname, options);
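Both transaction DB flavors apply the same sanitization, so a caller who leaves both *_to_maintain options at 0 gets the full fallback budget automatically. An illustrative open path (OpenWithDefaultHistory and the path argument are hypothetical):

```cpp
#include <cassert>
#include <string>

#include "rocksdb/utilities/optimistic_transaction_db.h"

void OpenWithDefaultHistory(const std::string& path) {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Both history options left at 0: Open() rewrites the size option to -1,
  // i.e. max_write_buffer_number * write_buffer_size per column family.
  rocksdb::OptimisticTransactionDB* db = nullptr;
  rocksdb::Status s =
      rocksdb::OptimisticTransactionDB::Open(options, path, &db);
  assert(s.ok());
  delete db;
}
```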
diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc
index 2f9c918a3..caac2ab18 100644
--- a/utilities/transactions/pessimistic_transaction_db.cc
+++ b/utilities/transactions/pessimistic_transaction_db.cc
@@ -271,9 +271,11 @@ void TransactionDB::PrepareWrap(
 
   for (size_t i = 0; i < column_families->size(); i++) {
     ColumnFamilyOptions* cf_options = &(*column_families)[i].options;
-    if (cf_options->max_write_buffer_number_to_maintain == 0) {
-      // Setting to -1 will set the History size to max_write_buffer_number.
-      cf_options->max_write_buffer_number_to_maintain = -1;
+    if (cf_options->max_write_buffer_size_to_maintain == 0 &&
+        cf_options->max_write_buffer_number_to_maintain == 0) {
+      // Setting to -1 will set the History size to
+      // max_write_buffer_number * write_buffer_size.
+      cf_options->max_write_buffer_size_to_maintain = -1;
     }
     if (!cf_options->disable_auto_compactions) {
       // Disable compactions momentarily to prevent race with DB::Open
diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc
index da7cee063..23ae374dc 100644
--- a/utilities/transactions/transaction_test.cc
+++ b/utilities/transactions/transaction_test.cc
@@ -227,8 +227,10 @@ TEST_P(TransactionTest, ValidateSnapshotTest) {
     db_impl->TEST_FlushMemTable(true);
     // Make sure the flushed memtable is not kept in memory
     int max_memtable_in_history =
-        std::max(options.max_write_buffer_number,
-                 options.max_write_buffer_number_to_maintain) +
+        std::max(
+            options.max_write_buffer_number,
+            static_cast<int>(options.max_write_buffer_size_to_maintain) /
+                static_cast<int>(options.write_buffer_size)) +
         1;
     for (int i = 0; i < max_memtable_in_history; i++) {
       db->Put(write_options, Slice("key"), Slice("value"));
diff --git a/utilities/transactions/transaction_util.cc b/utilities/transactions/transaction_util.cc
index ba3b75e15..371448503 100644
--- a/utilities/transactions/transaction_util.cc
+++ b/utilities/transactions/transaction_util.cc
@@ -94,7 +94,7 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
           " as the MemTable only contains changes newer than "
           "SequenceNumber %" PRIu64
           ". Increasing the value of the "
-          "max_write_buffer_number_to_maintain option could reduce the "
+          "max_write_buffer_size_to_maintain option could reduce the "
          "frequency "
           "of this error.",
           snap_seq, earliest_seq);
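Tying the Transaction::Put contract to the CheckKey message above: when the retained history is smaller than a transaction's snapshot requires, key validation cannot prove the absence of conflicts and the operation surfaces Status::TryAgain. A hedged sketch of how a caller might handle this (TryWrite is hypothetical; the APIs used are the public transaction interfaces):

```cpp
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"

rocksdb::Status TryWrite(rocksdb::TransactionDB* txn_db) {
  rocksdb::WriteOptions write_options;
  rocksdb::TransactionOptions txn_options;
  txn_options.set_snapshot = true;  // enables snapshot validation on writes
  rocksdb::Transaction* txn =
      txn_db->BeginTransaction(write_options, txn_options);
  rocksdb::Status s = txn->Put("key", "value");
  if (s.IsTryAgain()) {
    // Memtable history was insufficient for conflict checking; a larger
    // max_write_buffer_size_to_maintain reduces how often this happens.
    txn->Rollback();
  } else if (s.ok()) {
    s = txn->Commit();
  }
  delete txn;
  return s;
}
```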