From 2f41ecfe75f0ebf33e4969083b031c7a97ebaee7 Mon Sep 17 00:00:00 2001 From: Zhongyi Xie Date: Fri, 23 Aug 2019 13:54:09 -0700 Subject: [PATCH] Refactor trimming logic for immutable memtables (#5022) Summary: MyRocks currently sets `max_write_buffer_number_to_maintain` in order to maintain enough history for transaction conflict checking. The effectiveness of this approach depends on the size of memtables. When memtables are small, it may not keep enough history; when memtables are large, this may consume too much memory. We are proposing a new way to configure memtable list history: by limiting the memory usage of immutable memtables. The new option is `max_write_buffer_size_to_maintain` and it will take precedence over the old `max_write_buffer_number_to_maintain` if they are both set to non-zero values. The new option accounts for the total memory usage of flushed immutable memtables and the mutable memtable. When the total usage exceeds the limit, RocksDB may start dropping immutable memtables (which is also called trimming history), starting from the oldest one. The old option's semantics actually work as both an upper bound and a lower bound: history trimming will start if the number of immutable memtables exceeds the limit, but the number will never go below (limit-1) due to history trimming. In order to mimic this behavior with the new option, history trimming will stop if dropping the next immutable memtable would cause the total memory usage to go below the size limit. For example, assuming the size limit is set to 64MB, and there are 3 immutable memtables with sizes of 20MB, 30MB, and 30MB: although the total memory usage is 80MB > 64MB, dropping the oldest memtable would reduce the memory usage to 60MB < 64MB, so in this case no memtable will be dropped. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5022 Differential Revision: D14394062 Pulled By: miasantreble fbshipit-source-id: 60457a509c6af89d0993f988c9b5c2aa9e45f5c5 --- CMakeLists.txt | 1 + HISTORY.md | 2 + TARGETS | 1 + db/c.cc | 5 ++ db/column_family.cc | 12 ++- db/column_family_test.cc | 11 ++- db/db_basic_test.cc | 5 +- db/db_compaction_test.cc | 2 +- db/db_impl/db_impl.cc | 1 + db/db_impl/db_impl.h | 5 ++ db/db_impl/db_impl_open.cc | 8 +- db/db_impl/db_impl_secondary.cc | 6 +- db/db_impl/db_impl_write.cc | 55 ++++++++++--- db/db_properties_test.cc | 7 +- db/db_test.cc | 5 +- db/deletefile_test.cc | 2 + db/flush_scheduler.cc | 2 +- db/flush_scheduler.h | 11 ++- db/memtable.cc | 8 +- db/memtable.h | 12 ++- db/memtable_list.cc | 81 +++++++++++++++++-- db/memtable_list.h | 67 ++++++++++++--- db/memtable_list_test.cc | 31 +++++-- db/repair.cc | 3 +- db/trim_history_scheduler.cc | 59 ++++++++++++++ db/trim_history_scheduler.h | 44 ++++++++++ db/write_batch.cc | 45 ++++++++--- db/write_batch_internal.h | 9 ++- db/write_batch_test.cc | 3 +- examples/rocksdb_option_file_example.ini | 2 +- include/rocksdb/advanced_options.h | 29 +++++-- include/rocksdb/c.h | 3 + include/rocksdb/utilities/transaction.h | 6 +- java/rocksjni/write_batch_test.cc | 4 +- options/cf_options.cc | 2 + options/cf_options.h | 2 + options/options.cc | 5 ++ options/options_helper.cc | 3 + options/options_settable_test.cc | 1 + options/options_test.cc | 2 + src.mk | 1 + table/table_test.cc | 3 +- test_util/testutil.cc | 1 + tools/db_bench_tool.cc | 16 ++++ tools/db_bench_tool_test.cc | 1 + tools/db_stress.cc | 16 ++++ util/string_util.cc | 3 +- util/string_util.h | 1 - .../optimistic_transaction_db_impl.cc | 8 +- .../optimistic_transaction_test.cc | 1 +
.../pessimistic_transaction_db.cc | 8 +- utilities/transactions/transaction_test.cc | 6 +- utilities/transactions/transaction_util.cc | 2 +- 53 files changed, 522 insertions(+), 107 deletions(-) create mode 100644 db/trim_history_scheduler.cc create mode 100644 db/trim_history_scheduler.h diff --git a/CMakeLists.txt b/CMakeLists.txt index f81e0ca4f..a094d3261 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -530,6 +530,7 @@ set(SOURCES db/table_cache.cc db/table_properties_collector.cc db/transaction_log_impl.cc + db/trim_history_scheduler.cc db/version_builder.cc db/version_edit.cc db/version_set.cc diff --git a/HISTORY.md b/HISTORY.md index a87d06f90..da06ca228 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,8 @@ * Fix bloom filter lookups by the MultiGet batching API when BlockBasedTableOptions::whole_key_filtering is false, by checking that a key is in the prefix_extractor domain and extracting the prefix before looking up. ### New Features * VerifyChecksum() by default will issue readahead. Allow ReadOptions to be passed in to those functions to override the readahead size. For checksum verifying before external SST file ingestion, a new option, IngestExternalFileOptions.verify_checksums_readahead_size, is added for this readahead setting. +### Public API Change +* Added max_write_buffer_size_to_maintain option to better control memory usage of immutable memtables. ## 6.4.0 (7/30/2019) ### Default Option Change diff --git a/TARGETS b/TARGETS index bac5c4311..058e591e8 100644 --- a/TARGETS +++ b/TARGETS @@ -158,6 +158,7 @@ cpp_library( "db/table_cache.cc", "db/table_properties_collector.cc", "db/transaction_log_impl.cc", + "db/trim_history_scheduler.cc", "db/version_builder.cc", "db/version_edit.cc", "db/version_set.cc", diff --git a/db/c.cc b/db/c.cc index 4d40558f6..66e3892af 100644 --- a/db/c.cc +++ b/db/c.cc @@ -2514,6 +2514,11 @@ void rocksdb_options_set_max_write_buffer_number_to_maintain( opt->rep.max_write_buffer_number_to_maintain = n; } +void rocksdb_options_set_max_write_buffer_size_to_maintain( + rocksdb_options_t* opt, int64_t n) { + opt->rep.max_write_buffer_size_to_maintain = n; +} + void rocksdb_options_set_enable_pipelined_write(rocksdb_options_t* opt, unsigned char v) { opt->rep.enable_pipelined_write = v; diff --git a/db/column_family.cc b/db/column_family.cc index e135c2d31..4c67b7d76 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -227,7 +227,14 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options, if (result.max_write_buffer_number < 2) { result.max_write_buffer_number = 2; } - if (result.max_write_buffer_number_to_maintain < 0) { + // fall back to max_write_buffer_number_to_maintain if + // max_write_buffer_size_to_maintain is not set + if (result.max_write_buffer_size_to_maintain < 0) { + result.max_write_buffer_size_to_maintain = + result.max_write_buffer_number * + static_cast<int64_t>(result.write_buffer_size); + } else if (result.max_write_buffer_size_to_maintain == 0 && + result.max_write_buffer_number_to_maintain < 0) { result.max_write_buffer_number_to_maintain = result.max_write_buffer_number; } // bloom filter size shouldn't exceed 1/4 of memtable size.
@@ -423,7 +430,8 @@ ColumnFamilyData::ColumnFamilyData( write_buffer_manager_(write_buffer_manager), mem_(nullptr), imm_(ioptions_.min_write_buffer_number_to_merge, - ioptions_.max_write_buffer_number_to_maintain), + ioptions_.max_write_buffer_number_to_maintain, + ioptions_.max_write_buffer_size_to_maintain), super_version_(nullptr), super_version_number_(0), local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)), diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 235313f48..95c43ac5a 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -1132,22 +1132,25 @@ TEST_P(ColumnFamilyTest, DifferentWriteBufferSizes) { default_cf.arena_block_size = 4 * 4096; default_cf.max_write_buffer_number = 10; default_cf.min_write_buffer_number_to_merge = 1; - default_cf.max_write_buffer_number_to_maintain = 0; + default_cf.max_write_buffer_size_to_maintain = 0; one.write_buffer_size = 200000; one.arena_block_size = 4 * 4096; one.max_write_buffer_number = 10; one.min_write_buffer_number_to_merge = 2; - one.max_write_buffer_number_to_maintain = 1; + one.max_write_buffer_size_to_maintain = + static_cast<int64_t>(one.write_buffer_size); two.write_buffer_size = 1000000; two.arena_block_size = 4 * 4096; two.max_write_buffer_number = 10; two.min_write_buffer_number_to_merge = 3; - two.max_write_buffer_number_to_maintain = 2; + two.max_write_buffer_size_to_maintain = + static_cast<int64_t>(two.write_buffer_size); three.write_buffer_size = 4096 * 22; three.arena_block_size = 4096; three.max_write_buffer_number = 10; three.min_write_buffer_number_to_merge = 4; - three.max_write_buffer_number_to_maintain = -1; + three.max_write_buffer_size_to_maintain = + static_cast<int64_t>(three.write_buffer_size); Reopen({default_cf, one, two, three}); diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index 828eaaaeb..963dde6ce 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -297,7 +297,7 @@ TEST_F(DBBasicTest, FlushMultipleMemtable) { writeOpt.disableWAL = true; options.max_write_buffer_number = 4; options.min_write_buffer_number_to_merge = 3; - options.max_write_buffer_number_to_maintain = -1; + options.max_write_buffer_size_to_maintain = -1; CreateAndReopenWithCF({"pikachu"}, options); ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1")); ASSERT_OK(Flush(1)); @@ -327,7 +327,8 @@ TEST_F(DBBasicTest, FlushEmptyColumnFamily) { writeOpt.disableWAL = true; options.max_write_buffer_number = 2; options.min_write_buffer_number_to_merge = 1; - options.max_write_buffer_number_to_maintain = 1; + options.max_write_buffer_size_to_maintain = + static_cast<int64_t>(options.write_buffer_size); CreateAndReopenWithCF({"pikachu"}, options); // Compaction can still go through even if no thread can flush the diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 7f639c853..45ba17011 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -141,7 +141,7 @@ Options DeletionTriggerOptions(Options options) { options.compression = kNoCompression; options.write_buffer_size = kCDTKeysPerBuffer * (kCDTValueSize + 24); options.min_write_buffer_number_to_merge = 1; - options.max_write_buffer_number_to_maintain = 0; + options.max_write_buffer_size_to_maintain = 0; options.num_levels = kCDTNumLevels; options.level0_file_num_compaction_trigger = 1; options.target_file_size_base = options.write_buffer_size * 2; diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index dae879346..587f1d8d9 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -472,6 +472,7 @@ Status
DBImpl::CloseHelper() { &files_grabbed_for_purge_); EraseThreadStatusDbInfo(); flush_scheduler_.Clear(); + trim_history_scheduler_.Clear(); while (!flush_queue_.empty()) { const FlushRequest& flush_req = PopFirstFromFlushQueue(); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 42377ce41..dcefc4753 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -37,6 +37,7 @@ #include "db/read_callback.h" #include "db/snapshot_checker.h" #include "db/snapshot_impl.h" +#include "db/trim_history_scheduler.h" #include "db/version_edit.h" #include "db/wal_manager.h" #include "db/write_controller.h" @@ -1355,6 +1356,8 @@ class DBImpl : public DB { void MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds); + Status TrimMemtableHistory(WriteContext* context); + Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context); void SelectColumnFamiliesForAtomicFlush(autovector<ColumnFamilyData*>* cfds); @@ -1733,6 +1736,8 @@ class DBImpl : public DB { FlushScheduler flush_scheduler_; + TrimHistoryScheduler trim_history_scheduler_; + SnapshotList snapshots_; // For each background job, pending_outputs_ keeps the current file number at diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 0e0fcfbf2..737e3a660 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -862,9 +862,10 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers, // That's why we set ignore missing column families to true bool has_valid_writes = false; status = WriteBatchInternal::InsertInto( - &batch, column_family_memtables_.get(), &flush_scheduler_, true, - log_number, this, false /* concurrent_memtable_writes */, - next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_); + &batch, column_family_memtables_.get(), &flush_scheduler_, + &trim_history_scheduler_, true, log_number, this, + false /* concurrent_memtable_writes */, next_sequence, + &has_valid_writes, seq_per_batch_, batch_per_txn_); MaybeIgnoreError(&status); if (!status.ok()) { // We are treating this as a failure while reading since we read valid @@ -931,6 +932,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers, } flush_scheduler_.Clear(); + trim_history_scheduler_.Clear(); auto last_sequence = *next_sequence - 1; if ((*next_sequence != kMaxSequenceNumber) && (versions_->LastSequence() <= last_sequence)) { diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index a73cd6ba2..1a55c328e 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -253,9 +253,9 @@ Status DBImplSecondary::RecoverLogFiles( bool has_valid_writes = false; status = WriteBatchInternal::InsertInto( &batch, column_family_memtables_.get(), - nullptr /* flush_scheduler */, true, log_number, this, - false /* concurrent_memtable_writes */, next_sequence, - &has_valid_writes, seq_per_batch_, batch_per_txn_); + nullptr /* flush_scheduler */, nullptr /* trim_history_scheduler */, + true, log_number, this, false /* concurrent_memtable_writes */, + next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_); } // If column family was not found, it might mean that the WAL write // batch references to the column family that was dropped after the diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 0ad2a3e9a..d15165122 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -171,6 +171,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, versions_->GetColumnFamilySet()); w.status = WriteBatchInternal::InsertInto( &w, w.sequence,
&column_family_memtables, &flush_scheduler_, + &trim_history_scheduler_, write_options.ignore_missing_column_families, 0 /*log_number*/, this, true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt, batch_per_txn_); @@ -375,7 +376,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // w.sequence will be set inside InsertInto w.status = WriteBatchInternal::InsertInto( write_group, current_sequence, column_family_memtables_.get(), - &flush_scheduler_, write_options.ignore_missing_column_families, + &flush_scheduler_, &trim_history_scheduler_, + write_options.ignore_missing_column_families, 0 /*recovery_log_number*/, this, parallel, seq_per_batch_, batch_per_txn_); } else { @@ -391,6 +393,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, assert(w.sequence == current_sequence); w.status = WriteBatchInternal::InsertInto( &w, w.sequence, &column_family_memtables, &flush_scheduler_, + &trim_history_scheduler_, write_options.ignore_missing_column_families, 0 /*log_number*/, this, true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt, batch_per_txn_); @@ -545,9 +548,9 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, } else { memtable_write_group.status = WriteBatchInternal::InsertInto( memtable_write_group, w.sequence, column_family_memtables_.get(), - &flush_scheduler_, write_options.ignore_missing_column_families, - 0 /*log_number*/, this, false /*concurrent_memtable_writes*/, - seq_per_batch_, batch_per_txn_); + &flush_scheduler_, &trim_history_scheduler_, + write_options.ignore_missing_column_families, 0 /*log_number*/, this, + false /*concurrent_memtable_writes*/, seq_per_batch_, batch_per_txn_); versions_->SetLastSequence(memtable_write_group.last_sequence); write_thread_.ExitAsMemTableWriter(&w, memtable_write_group); } @@ -559,8 +562,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, versions_->GetColumnFamilySet()); w.status = WriteBatchInternal::InsertInto( &w, w.sequence, &column_family_memtables, &flush_scheduler_, - write_options.ignore_missing_column_families, 0 /*log_number*/, this, - true /*concurrent_memtable_writes*/); + &trim_history_scheduler_, write_options.ignore_missing_column_families, + 0 /*log_number*/, this, true /*concurrent_memtable_writes*/); if (write_thread_.CompleteParallelMemTableWriter(&w)) { MemTableInsertStatusCheck(w.status); versions_->SetLastSequence(w.write_group->last_sequence); @@ -597,8 +600,9 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options, versions_->GetColumnFamilySet()); w.status = WriteBatchInternal::InsertInto( &w, w.sequence, &column_family_memtables, &flush_scheduler_, - write_options.ignore_missing_column_families, 0 /*log_number*/, this, - true /*concurrent_memtable_writes*/, seq_per_batch_, sub_batch_cnt); + &trim_history_scheduler_, write_options.ignore_missing_column_families, + 0 /*log_number*/, this, true /*concurrent_memtable_writes*/, + seq_per_batch_, sub_batch_cnt); WriteStatusCheck(w.status); if (write_options.disableWAL) { @@ -856,6 +860,10 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, status = HandleWriteBufferFull(write_context); } + if (UNLIKELY(status.ok() && !trim_history_scheduler_.Empty())) { + status = TrimMemtableHistory(write_context); + } + if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) { WaitForPendingWrites(); status = ScheduleFlushes(write_context); @@ -1112,9 +1120,9 @@ Status DBImpl::WriteRecoverableState() { WriteBatchInternal::SetSequence(&cached_recoverable_state_, 
seq + 1); auto status = WriteBatchInternal::InsertInto( &cached_recoverable_state_, column_family_memtables_.get(), - &flush_scheduler_, true, 0 /*recovery_log_number*/, this, - false /* concurrent_memtable_writes */, &next_seq, &dont_care_bool, - seq_per_batch_); + &flush_scheduler_, &trim_history_scheduler_, true, + 0 /*recovery_log_number*/, this, false /* concurrent_memtable_writes */, + &next_seq, &dont_care_bool, seq_per_batch_); auto last_seq = next_seq - 1; if (two_write_queues_) { versions_->FetchAddLastAllocatedSequence(last_seq - seq); @@ -1474,6 +1482,31 @@ void DBImpl::MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds) { } } +Status DBImpl::TrimMemtableHistory(WriteContext* context) { + autovector<ColumnFamilyData*> cfds; + ColumnFamilyData* tmp_cfd; + while ((tmp_cfd = trim_history_scheduler_.TakeNextColumnFamily()) != + nullptr) { + cfds.push_back(tmp_cfd); + } + for (auto& cfd : cfds) { + autovector<MemTable*> to_delete; + cfd->imm()->TrimHistory(&to_delete, cfd->mem()->ApproximateMemoryUsage()); + for (auto m : to_delete) { + delete m; + } + context->superversion_context.NewSuperVersion(); + assert(context->superversion_context.new_superversion.get() != nullptr); + cfd->InstallSuperVersion(&context->superversion_context, &mutex_); + + if (cfd->Unref()) { + delete cfd; + cfd = nullptr; + } + } + return Status::OK(); +} + Status DBImpl::ScheduleFlushes(WriteContext* context) { autovector<ColumnFamilyData*> cfds; if (immutable_db_options_.atomic_flush) { diff --git a/db/db_properties_test.cc b/db/db_properties_test.cc index 1a988f5ea..956accef8 100644 --- a/db/db_properties_test.cc +++ b/db/db_properties_test.cc @@ -615,8 +615,9 @@ TEST_F(DBPropertiesTest, NumImmutableMemTable) { writeOpt.disableWAL = true; options.max_write_buffer_number = 4; options.min_write_buffer_number_to_merge = 3; - options.max_write_buffer_number_to_maintain = 4; options.write_buffer_size = 1000000; + options.max_write_buffer_size_to_maintain = + 5 * static_cast<int64_t>(options.write_buffer_size); CreateAndReopenWithCF({"pikachu"}, options); std::string big_value(1000000 * 2, 'x'); @@ -747,7 +748,7 @@ TEST_F(DBPropertiesTest, DISABLED_GetProperty) { options.max_background_flushes = 1; options.max_write_buffer_number = 10; options.min_write_buffer_number_to_merge = 1; - options.max_write_buffer_number_to_maintain = 0; + options.max_write_buffer_size_to_maintain = 0; options.write_buffer_size = 1000000; Reopen(options); @@ -997,7 +998,7 @@ TEST_F(DBPropertiesTest, EstimatePendingCompBytes) { options.max_background_flushes = 1; options.max_write_buffer_number = 10; options.min_write_buffer_number_to_merge = 1; - options.max_write_buffer_number_to_maintain = 0; + options.max_write_buffer_size_to_maintain = 0; options.write_buffer_size = 1000000; Reopen(options); diff --git a/db/db_test.cc b/db/db_test.cc index 17ac22729..906a67cda 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -883,7 +883,7 @@ TEST_F(DBTest, FlushMultipleMemtable) { writeOpt.disableWAL = true; options.max_write_buffer_number = 4; options.min_write_buffer_number_to_merge = 3; - options.max_write_buffer_number_to_maintain = -1; + options.max_write_buffer_size_to_maintain = -1; CreateAndReopenWithCF({"pikachu"}, options); ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1")); ASSERT_OK(Flush(1)); @@ -901,7 +901,8 @@ TEST_F(DBTest, FlushSchedule) { options.level0_stop_writes_trigger = 1 << 10; options.level0_slowdown_writes_trigger = 1 << 10; options.min_write_buffer_number_to_merge = 1; - options.max_write_buffer_number_to_maintain = 1; + options.max_write_buffer_size_to_maintain = +
static_cast<int64_t>(options.write_buffer_size); options.max_write_buffer_number = 2; options.write_buffer_size = 120 * 1024; CreateAndReopenWithCF({"pikachu"}, options); diff --git a/db/deletefile_test.cc b/db/deletefile_test.cc index 18014e5b4..b99c8e9dc 100644 --- a/db/deletefile_test.cc +++ b/db/deletefile_test.cc @@ -284,6 +284,8 @@ TEST_F(DeleteFileTest, BackgroundPurgeIteratorTest) { TEST_F(DeleteFileTest, BackgroundPurgeCFDropTest) { auto do_test = [&](bool bg_purge) { ColumnFamilyOptions co; + co.max_write_buffer_size_to_maintain = + static_cast<int64_t>(co.write_buffer_size); WriteOptions wo; FlushOptions fo; ColumnFamilyHandle* cfh = nullptr; diff --git a/db/flush_scheduler.cc b/db/flush_scheduler.cc index 9c6c04efe..cbcb5ce49 100644 --- a/db/flush_scheduler.cc +++ b/db/flush_scheduler.cc @@ -11,7 +11,7 @@ namespace rocksdb { -void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) { +void FlushScheduler::ScheduleWork(ColumnFamilyData* cfd) { #ifndef NDEBUG { std::lock_guard<std::mutex> lock(checking_mutex_); diff --git a/db/flush_scheduler.h b/db/flush_scheduler.h index b5abec405..5ca85e88b 100644 --- a/db/flush_scheduler.h +++ b/db/flush_scheduler.h @@ -9,26 +9,29 @@ #include <stdint.h> #include <atomic> #include <mutex> +#include "util/autovector.h" namespace rocksdb { class ColumnFamilyData; -// Unless otherwise noted, all methods on FlushScheduler should be called -// only with the DB mutex held or from a single-threaded recovery context. +// FlushScheduler keeps track of all column families whose memtable may +// be full and require flushing. Unless otherwise noted, all methods on +// FlushScheduler should be called only with the DB mutex held or from +// a single-threaded recovery context. class FlushScheduler { public: FlushScheduler() : head_(nullptr) {} // May be called from multiple threads at once, but not concurrent with // any other method calls on this instance - void ScheduleFlush(ColumnFamilyData* cfd); + void ScheduleWork(ColumnFamilyData* cfd); // Removes and returns Ref()-ed column family. Client needs to Unref(). // Filters column families that have been dropped. ColumnFamilyData* TakeNextColumnFamily(); - // This can be called concurrently with ScheduleFlush but it would miss all + // This can be called concurrently with ScheduleWork but it would miss all // the scheduled flushes after the last synchronization. This would result // into less precise enforcement of memtable sizes but should not matter much. bool Empty(); diff --git a/db/memtable.cc b/db/memtable.cc index 62c7339b5..06cb2222e 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -105,7 +105,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp, insert_with_hint_prefix_extractor_( ioptions.memtable_insert_with_hint_prefix_extractor), oldest_key_time_(std::numeric_limits<uint64_t>::max()), - atomic_flush_seqno_(kMaxSequenceNumber) { + atomic_flush_seqno_(kMaxSequenceNumber), + approximate_memory_usage_(0) { UpdateFlushState(); // something went wrong if we need to flush before inserting anything assert(!ShouldScheduleFlush()); @@ -139,11 +140,12 @@ size_t MemTable::ApproximateMemoryUsage() { } total_usage += usage; } + approximate_memory_usage_.store(total_usage, std::memory_order_relaxed); // otherwise, return the actual usage return total_usage; } -bool MemTable::ShouldFlushNow() const { +bool MemTable::ShouldFlushNow() { size_t write_buffer_size = write_buffer_size_.load(std::memory_order_relaxed); // In a lot of times, we cannot allocate arena blocks that exactly matches the // buffer size.
Thus we have to decide if we should over-allocate or @@ -159,6 +161,8 @@ bool MemTable::ShouldFlushNow() { range_del_table_->ApproximateMemoryUsage() + arena_.MemoryAllocatedBytes(); + approximate_memory_usage_.store(allocated_memory, std::memory_order_relaxed); + // if we can still allocate one more block without exceeding the // over-allocation ratio, then we should not flush. if (allocated_memory + kArenaBlockSize < diff --git a/db/memtable.h b/db/memtable.h index 36ba0df79..c0baa9e17 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -130,6 +130,12 @@ class MemTable { // operations on the same MemTable (unless this Memtable is immutable). size_t ApproximateMemoryUsage(); + // As a cheap version of `ApproximateMemoryUsage()`, this function doesn't + // require external synchronization. The value may be less accurate though + size_t ApproximateMemoryUsageFast() { + return approximate_memory_usage_.load(std::memory_order_relaxed); + } + // This method heuristically determines if the memtable should continue to // host more data. bool ShouldScheduleFlush() const { @@ -486,8 +492,12 @@ class MemTable { // writes with sequence number smaller than seq are flushed. SequenceNumber atomic_flush_seqno_; + // keep track of memory usage in table_, arena_, and range_del_table_. + // Gets refreshed inside `ApproximateMemoryUsage()` or `ShouldFlushNow` + std::atomic<size_t> approximate_memory_usage_; + // Returns a heuristic flush decision - bool ShouldFlushNow() const; + bool ShouldFlushNow(); // Updates flush_state_ using ShouldFlushNow() void UpdateFlushState(); diff --git a/db/memtable_list.cc b/db/memtable_list.cc index d06a82df8..e3f0732de 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -46,6 +46,8 @@ MemTableListVersion::MemTableListVersion( size_t* parent_memtable_list_memory_usage, MemTableListVersion* old) : max_write_buffer_number_to_maintain_( old->max_write_buffer_number_to_maintain_), + max_write_buffer_size_to_maintain_( + old->max_write_buffer_size_to_maintain_), parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) { if (old != nullptr) { memlist_ = old->memlist_; @@ -62,8 +64,10 @@ MemTableListVersion::MemTableListVersion( size_t* parent_memtable_list_memory_usage, - int max_write_buffer_number_to_maintain) + int max_write_buffer_number_to_maintain, + int64_t max_write_buffer_size_to_maintain) : max_write_buffer_number_to_maintain_(max_write_buffer_number_to_maintain), + max_write_buffer_size_to_maintain_(max_write_buffer_size_to_maintain), parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {} void MemTableListVersion::Ref() { ++refs_; } @@ -240,7 +244,7 @@ void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) { assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable AddMemTable(m); - TrimHistory(to_delete); + TrimHistory(to_delete, m->ApproximateMemoryUsage()); } // Removes m from list of memtables not flushed. Caller should NOT Unref m. @@ -250,19 +254,51 @@ void MemTableListVersion::Remove(MemTable* m, memlist_.remove(m); m->MarkFlushed(); - if (max_write_buffer_number_to_maintain_ > 0) { + if (max_write_buffer_size_to_maintain_ > 0 || + max_write_buffer_number_to_maintain_ > 0) { memlist_history_.push_front(m); + // Unable to get size of mutable memtable at this point, pass 0 to + // TrimHistory as a best effort.
+ TrimHistory(to_delete, 0); } else { UnrefMemTable(to_delete, m); } } +// return the total memory usage assuming the oldest flushed memtable is dropped +size_t MemTableListVersion::ApproximateMemoryUsageExcludingLast() { + size_t total_memtable_size = 0; + for (auto& memtable : memlist_) { + total_memtable_size += memtable->ApproximateMemoryUsage(); + } + for (auto& memtable : memlist_history_) { + total_memtable_size += memtable->ApproximateMemoryUsage(); + } + if (!memlist_history_.empty()) { + total_memtable_size -= memlist_history_.back()->ApproximateMemoryUsage(); + } + return total_memtable_size; +} + +bool MemTableListVersion::MemtableLimitExceeded(size_t usage) { + if (max_write_buffer_size_to_maintain_ > 0) { + // calculate the total memory usage after dropping the oldest flushed + // memtable, compare with max_write_buffer_size_to_maintain_ to decide + // whether to trim history + return ApproximateMemoryUsageExcludingLast() + usage >= + static_cast<size_t>(max_write_buffer_size_to_maintain_); + } else if (max_write_buffer_number_to_maintain_ > 0) { + return memlist_.size() + memlist_history_.size() > + static_cast<size_t>(max_write_buffer_number_to_maintain_); + } else { + return false; + } +} + // Make sure we don't use up too much space in history -void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete) { - while (memlist_.size() + memlist_history_.size() > - static_cast<size_t>(max_write_buffer_number_to_maintain_) && - !memlist_history_.empty()) { +void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete, + size_t usage) { + while (MemtableLimitExceeded(usage) && !memlist_history_.empty()) { MemTable* x = memlist_history_.back(); memlist_history_.pop_back(); @@ -444,6 +480,8 @@ Status MemTableList::TryInstallMemtableFlushResults( cfd->GetName().c_str(), m->file_number_, mem_id); assert(m->file_number_ > 0); current_->Remove(m, to_delete); + UpdateMemoryUsageExcludingLast(); + ResetTrimHistoryNeeded(); ++mem_id; } } else { @@ -483,6 +521,15 @@ void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) { if (num_flush_not_started_ == 1) { imm_flush_needed.store(true, std::memory_order_release); } + UpdateMemoryUsageExcludingLast(); + ResetTrimHistoryNeeded(); +} + +void MemTableList::TrimHistory(autovector<MemTable*>* to_delete, size_t usage) { + InstallNewVersion(); + current_->TrimHistory(to_delete, usage); + UpdateMemoryUsageExcludingLast(); + ResetTrimHistoryNeeded(); } // Returns an estimate of the number of bytes of data in use.
@@ -496,6 +543,20 @@ size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() { size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; } +size_t MemTableList::ApproximateMemoryUsageExcludingLast() { + size_t usage = + current_memory_usage_excluding_last_.load(std::memory_order_relaxed); + return usage; +} + +// Update current_memory_usage_excluding_last_; must be called whenever the +// state of MemTableListVersion changes (whenever InstallNewVersion() is called) +void MemTableList::UpdateMemoryUsageExcludingLast() { + size_t total_memtable_size = current_->ApproximateMemoryUsageExcludingLast(); + current_memory_usage_excluding_last_.store(total_memtable_size, + std::memory_order_relaxed); +} + uint64_t MemTableList::ApproximateOldestKeyTime() const { if (!current_->memlist_.empty()) { return current_->memlist_.back()->ApproximateOldestKeyTime(); @@ -623,6 +684,8 @@ Status InstallMemtableAtomicFlushResults( cfds[i]->GetName().c_str(), m->GetFileNumber(), mem_id); imm->current_->Remove(m, to_delete); + imm->UpdateMemoryUsageExcludingLast(); + imm->ResetTrimHistoryNeeded(); } } } else { @@ -664,6 +727,8 @@ void MemTableList::RemoveOldMemTables(uint64_t log_number, imm_flush_needed.store(false, std::memory_order_release); } } + UpdateMemoryUsageExcludingLast(); + ResetTrimHistoryNeeded(); } } // namespace rocksdb diff --git a/db/memtable_list.h b/db/memtable_list.h index 2bd225b83..75cc1a524 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -44,7 +44,8 @@ class MemTableListVersion { explicit MemTableListVersion(size_t* parent_memtable_list_memory_usage, MemTableListVersion* old = nullptr); explicit MemTableListVersion(size_t* parent_memtable_list_memory_usage, - int max_write_buffer_number_to_maintain); + int max_write_buffer_number_to_maintain, + int64_t max_write_buffer_size_to_maintain); void Ref(); void Unref(autovector<MemTable*>* to_delete = nullptr); @@ -139,7 +140,7 @@ class MemTableListVersion { // REQUIRE: m is an immutable memtable void Remove(MemTable* m, autovector<MemTable*>* to_delete); - void TrimHistory(autovector<MemTable*>* to_delete); + void TrimHistory(autovector<MemTable*>* to_delete, size_t usage); bool GetFromList(std::list<MemTable*>* list, const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, @@ -152,6 +153,14 @@ class MemTableListVersion { void UnrefMemTable(autovector<MemTable*>* to_delete, MemTable* m); + // Calculate the total amount of memory used by memlist_ and memlist_history_ + // excluding the last MemTable in memlist_history_. The reason for excluding + // the last MemTable is to see if dropping the last MemTable will keep total + // memory usage above or equal to max_write_buffer_size_to_maintain_ + size_t ApproximateMemoryUsageExcludingLast(); + + bool MemtableLimitExceeded(size_t usage); + // Immutable MemTables that have not yet been flushed. std::list<MemTable*> memlist_; // MemTables that have already been flushed // (used during Transaction validation) std::list<MemTable*> memlist_history_; // Maximum number of MemTables to keep in memory (including both flushed - // and not-yet-flushed tables). const int max_write_buffer_number_to_maintain_; + // Maximum size of MemTables to keep in memory (including both flushed + // and not-yet-flushed tables). + const int64_t max_write_buffer_size_to_maintain_; int refs_ = 0; @@ -176,35 +187,41 @@ // recoverability from a crash. // // -// Other than imm_flush_needed, this class is not thread-safe and requires -// external synchronization (such as holding the db mutex or being on the -// write thread.)
+// Other than imm_flush_needed and imm_trim_needed, this class is not +// thread-safe and requires external synchronization (such as holding the db +// mutex or being on the write thread.) class MemTableList { public: // A list of memtables. explicit MemTableList(int min_write_buffer_number_to_merge, - int max_write_buffer_number_to_maintain) + int max_write_buffer_number_to_maintain, + int64_t max_write_buffer_size_to_maintain) : imm_flush_needed(false), + imm_trim_needed(false), min_write_buffer_number_to_merge_(min_write_buffer_number_to_merge), current_(new MemTableListVersion(&current_memory_usage_, - max_write_buffer_number_to_maintain)), + max_write_buffer_number_to_maintain, + max_write_buffer_size_to_maintain)), num_flush_not_started_(0), commit_in_progress_(false), - flush_requested_(false) { + flush_requested_(false), + current_memory_usage_(0), + current_memory_usage_excluding_last_(0) { current_->Ref(); - current_memory_usage_ = 0; } // Should not delete MemTableList without making sure MemTableList::current() // is Unref()'d. ~MemTableList() {} - MemTableListVersion* current() { return current_; } + MemTableListVersion* current() const { return current_; } // so that background threads can detect non-nullptr pointer to // determine whether there is anything more to start flushing. std::atomic<bool> imm_flush_needed; + std::atomic<bool> imm_trim_needed; + // Returns the total number of memtables in the list that haven't yet // been flushed and logged. int NumNotFlushed() const; @@ -243,6 +260,18 @@ class MemTableList { // Returns an estimate of the number of bytes of data in use. size_t ApproximateMemoryUsage(); + // Returns the cached current_memory_usage_excluding_last_ value + size_t ApproximateMemoryUsageExcludingLast(); + + // Update current_memory_usage_excluding_last_ from MemTableListVersion + void UpdateMemoryUsageExcludingLast(); + + // `usage` is the current size of the mutable Memtable. When + // max_write_buffer_size_to_maintain is used, total size of mutable and + // immutable memtables is checked against it to decide whether to trim + // memtable list. + void TrimHistory(autovector<MemTable*>* to_delete, size_t usage); + // Returns an estimate of the number of bytes of data used by // the unflushed mem-tables. size_t ApproximateUnflushedMemTablesMemoryUsage(); @@ -259,6 +288,20 @@ class MemTableList { bool HasFlushRequested() { return flush_requested_; } + // Returns true if a trim history should be scheduled and the caller should + // be the one to schedule it + bool MarkTrimHistoryNeeded() { + auto expected = false; + return imm_trim_needed.compare_exchange_strong( + expected, true, std::memory_order_relaxed, std::memory_order_relaxed); + } + + void ResetTrimHistoryNeeded() { + auto expected = true; + imm_trim_needed.compare_exchange_strong( + expected, false, std::memory_order_relaxed, std::memory_order_relaxed); + } + // Copying allowed // MemTableList(const MemTableList&); // void operator=(const MemTableList&); @@ -338,6 +381,8 @@ class MemTableList { // The current memory usage. size_t current_memory_usage_; + + std::atomic<size_t> current_memory_usage_excluding_last_; }; // Installs memtable atomic flush results. diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index 3a14b6830..b8dc80216 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -183,7 +183,7 @@ class MemTableListTest : public testing::Test { TEST_F(MemTableListTest, Empty) { // Create an empty MemTableList and validate basic functions.
- MemTableList list(1, 0); + MemTableList list(1, 0, 0); ASSERT_EQ(0, list.NumNotFlushed()); ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); @@ -202,8 +202,10 @@ TEST_F(MemTableListTest, GetTest) { // Create MemTableList int min_write_buffer_number_to_merge = 2; int max_write_buffer_number_to_maintain = 0; + int64_t max_write_buffer_size_to_maintain = 0; MemTableList list(min_write_buffer_number_to_merge, - max_write_buffer_number_to_maintain); + max_write_buffer_number_to_maintain, + max_write_buffer_size_to_maintain); SequenceNumber seq = 1; std::string value; @@ -312,8 +314,10 @@ TEST_F(MemTableListTest, GetFromHistoryTest) { // Create MemTableList int min_write_buffer_number_to_merge = 2; int max_write_buffer_number_to_maintain = 2; + int64_t max_write_buffer_size_to_maintain = 2000; MemTableList list(min_write_buffer_number_to_merge, - max_write_buffer_number_to_maintain); + max_write_buffer_number_to_maintain, + max_write_buffer_size_to_maintain); SequenceNumber seq = 1; std::string value; @@ -514,8 +518,11 @@ TEST_F(MemTableListTest, FlushPendingTest) { // Create MemTableList int min_write_buffer_number_to_merge = 3; int max_write_buffer_number_to_maintain = 7; + int64_t max_write_buffer_size_to_maintain = + 7 * static_cast<int64_t>(options.write_buffer_size); MemTableList list(min_write_buffer_number_to_merge, - max_write_buffer_number_to_maintain); + max_write_buffer_number_to_maintain, + max_write_buffer_size_to_maintain); // Create some MemTables uint64_t memtable_id = 0; @@ -670,7 +677,9 @@ TEST_F(MemTableListTest, FlushPendingTest) { // created. So TryInstallMemtableFlushResults will install the first 3 tables // in to_flush and stop when it encounters a table not yet flushed. ASSERT_EQ(2, list.NumNotFlushed()); - int num_in_history = std::min(3, max_write_buffer_number_to_maintain); + int num_in_history = + std::min(3, static_cast<int>(max_write_buffer_size_to_maintain) / + static_cast<int>(options.write_buffer_size)); ASSERT_EQ(num_in_history, list.NumFlushed()); ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size()); @@ -687,7 +696,9 @@ TEST_F(MemTableListTest, FlushPendingTest) { // This will actually install 2 tables. The 1 we told it to flush, and also // tables[4] which has been waiting for tables[3] to commit.
ASSERT_EQ(0, list.NumNotFlushed()); - num_in_history = std::min(5, max_write_buffer_number_to_maintain); + num_in_history = + std::min(5, static_cast<int>(max_write_buffer_size_to_maintain) / + static_cast<int>(options.write_buffer_size)); ASSERT_EQ(num_in_history, list.NumFlushed()); ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size()); @@ -730,7 +741,8 @@ TEST_F(MemTableListTest, FlushPendingTest) { list.current()->Unref(&to_delete); int to_delete_size = - std::min(num_tables, max_write_buffer_number_to_maintain); + std::min(num_tables, static_cast<int>(max_write_buffer_size_to_maintain) / + static_cast<int>(options.write_buffer_size)); ASSERT_EQ(to_delete_size, to_delete.size()); for (const auto& m : to_delete) { @@ -769,10 +781,13 @@ TEST_F(MemTableListTest, AtomicFlusTest) { // Create MemTableLists int min_write_buffer_number_to_merge = 3; int max_write_buffer_number_to_maintain = 7; + int64_t max_write_buffer_size_to_maintain = + 7 * static_cast<int64_t>(options.write_buffer_size); autovector<MemTableList*> lists; for (int i = 0; i != num_cfs; ++i) { lists.emplace_back(new MemTableList(min_write_buffer_number_to_merge, - max_write_buffer_number_to_maintain)); + max_write_buffer_number_to_maintain, + max_write_buffer_size_to_maintain)); } autovector<uint32_t> cf_ids; diff --git a/db/repair.cc b/db/repair.cc index 8967b39f3..0f0d329cc 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -383,7 +383,8 @@ class Repairer { continue; } WriteBatchInternal::SetContents(&batch, record); - status = WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr); + status = + WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr, nullptr); if (status.ok()) { counter += WriteBatchInternal::Count(&batch); } else { diff --git a/db/trim_history_scheduler.cc b/db/trim_history_scheduler.cc new file mode 100644 index 000000000..a213ac65f --- /dev/null +++ b/db/trim_history_scheduler.cc @@ -0,0 +1,59 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/trim_history_scheduler.h" + +#include <cassert> + +#include "db/column_family.h" + +namespace rocksdb { + +void TrimHistoryScheduler::ScheduleWork(ColumnFamilyData* cfd) { + std::lock_guard<std::mutex> lock(checking_mutex_); + cfd->Ref(); + cfds_.push_back(cfd); + is_empty_.store(false, std::memory_order_relaxed); +} + +ColumnFamilyData* TrimHistoryScheduler::TakeNextColumnFamily() { + std::lock_guard<std::mutex> lock(checking_mutex_); + while (true) { + if (cfds_.empty()) { + return nullptr; + } + ColumnFamilyData* cfd = cfds_.back(); + cfds_.pop_back(); + if (cfds_.empty()) { + is_empty_.store(true, std::memory_order_relaxed); + } + + if (!cfd->IsDropped()) { + // success + return cfd; + } + if (cfd->Unref()) { + // no longer relevant, retry + delete cfd; + } + } +} + +bool TrimHistoryScheduler::Empty() { + bool is_empty = is_empty_.load(std::memory_order_relaxed); + return is_empty; +} + +void TrimHistoryScheduler::Clear() { + ColumnFamilyData* cfd; + while ((cfd = TakeNextColumnFamily()) != nullptr) { + if (cfd->Unref()) { + delete cfd; + } + } + assert(Empty()); +} + +} // namespace rocksdb diff --git a/db/trim_history_scheduler.h b/db/trim_history_scheduler.h new file mode 100644 index 000000000..e9013b964 --- /dev/null +++ b/db/trim_history_scheduler.h @@ -0,0 +1,44 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <stdint.h> +#include <atomic> +#include <mutex> +#include "util/autovector.h" + +namespace rocksdb { + +class ColumnFamilyData; + +// Similar to FlushScheduler, TrimHistoryScheduler is a FIFO queue that keeps +// track of column families whose flushed immutable memtables may need to be +// removed (aka trimmed). The actual trimming may be slightly delayed. Due to +// the use of the mutex and atomic variable, ScheduleWork, +// TakeNextColumnFamily, and Empty can be called concurrently. +class TrimHistoryScheduler { + public: + TrimHistoryScheduler() : is_empty_(true) {} + + // When a column family needs history trimming, add cfd to the FIFO queue + void ScheduleWork(ColumnFamilyData* cfd); + + // Remove the column family from the queue; the caller is responsible for + // calling `MemTableList::TrimHistory` + ColumnFamilyData* TakeNextColumnFamily(); + + bool Empty(); + + void Clear(); + + // Not on critical path, use mutex to ensure thread safety + private: + std::atomic<bool> is_empty_; + autovector<ColumnFamilyData*> cfds_; + std::mutex checking_mutex_; +}; + +} // namespace rocksdb diff --git a/db/write_batch.cc b/db/write_batch.cc index 8a896644f..2a1bf1948 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -48,6 +48,7 @@ #include "db/memtable.h" #include "db/merge_context.h" #include "db/snapshot_impl.h" +#include "db/trim_history_scheduler.h" #include "db/write_batch_internal.h" #include "monitoring/perf_context_imp.h" #include "monitoring/statistics.h" @@ -1189,6 +1190,7 @@ class MemTableInserter : public WriteBatch::Handler { SequenceNumber sequence_; ColumnFamilyMemTables* const cf_mems_; FlushScheduler* const flush_scheduler_; + TrimHistoryScheduler* const trim_history_scheduler_; const bool ignore_missing_column_families_; const uint64_t recovering_log_number_; // log number that all Memtables inserted into should reference @@ -1250,6 +1252,7 @@ class MemTableInserter : public WriteBatch::Handler { // cf_mems should not be shared with concurrent inserters MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems, FlushScheduler* flush_scheduler, + TrimHistoryScheduler* trim_history_scheduler, bool ignore_missing_column_families, uint64_t recovering_log_number, DB* db, bool concurrent_memtable_writes, @@ -1258,6 +1261,7 @@ class MemTableInserter : public WriteBatch::Handler { : sequence_(_sequence), cf_mems_(cf_mems), flush_scheduler_(flush_scheduler), + trim_history_scheduler_(trim_history_scheduler), ignore_missing_column_families_(ignore_missing_column_families), recovering_log_number_(recovering_log_number), log_number_ref_(0), @@ -1748,7 +1752,20 @@ class MemTableInserter : public WriteBatch::Handler { cfd->mem()->MarkFlushScheduled()) { // MarkFlushScheduled only returns true if we are the one that // should take action, so no need to dedup further - flush_scheduler_->ScheduleFlush(cfd); + flush_scheduler_->ScheduleWork(cfd); + } + } + // check if memtable_list size exceeds max_write_buffer_size_to_maintain + if (trim_history_scheduler_ != nullptr) { + auto* cfd = cf_mems_->current(); + assert(cfd != nullptr); + if (cfd->ioptions()->max_write_buffer_size_to_maintain > 0 && + cfd->mem()->ApproximateMemoryUsageFast() + + cfd->imm()->ApproximateMemoryUsageExcludingLast() >= + static_cast<size_t>( + cfd->ioptions()->max_write_buffer_size_to_maintain) && + cfd->imm()->MarkTrimHistoryNeeded()) { +
trim_history_scheduler_->ScheduleWork(cfd); } } } @@ -1908,12 +1925,14 @@ class MemTableInserter : public WriteBatch::Handler { Status WriteBatchInternal::InsertInto( WriteThread::WriteGroup& write_group, SequenceNumber sequence, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, + TrimHistoryScheduler* trim_history_scheduler, bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db, bool concurrent_memtable_writes, bool seq_per_batch, bool batch_per_txn) { MemTableInserter inserter( - sequence, memtables, flush_scheduler, ignore_missing_column_families, - recovery_log_number, db, concurrent_memtable_writes, - nullptr /*has_valid_writes*/, seq_per_batch, batch_per_txn); + sequence, memtables, flush_scheduler, trim_history_scheduler, + ignore_missing_column_families, recovery_log_number, db, + concurrent_memtable_writes, nullptr /*has_valid_writes*/, seq_per_batch, + batch_per_txn); for (auto w : write_group) { if (w->CallbackFailed()) { continue; } @@ -1939,6 +1958,7 @@ Status WriteBatchInternal::InsertInto( Status WriteBatchInternal::InsertInto( WriteThread::Writer* writer, SequenceNumber sequence, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, + TrimHistoryScheduler* trim_history_scheduler, bool ignore_missing_column_families, uint64_t log_number, DB* db, bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt, bool batch_per_txn) { @@ -1947,9 +1967,10 @@ Status WriteBatchInternal::InsertInto( #endif assert(writer->ShouldWriteToMemtable()); MemTableInserter inserter( - sequence, memtables, flush_scheduler, ignore_missing_column_families, - log_number, db, concurrent_memtable_writes, nullptr /*has_valid_writes*/, - seq_per_batch, batch_per_txn); + sequence, memtables, flush_scheduler, trim_history_scheduler, + ignore_missing_column_families, log_number, db, + concurrent_memtable_writes, nullptr /*has_valid_writes*/, seq_per_batch, + batch_per_txn); SetSequence(writer->batch, sequence); inserter.set_log_number_ref(writer->log_ref); Status s = writer->batch->Iterate(&inserter); @@ -1963,11 +1984,13 @@ Status WriteBatchInternal::InsertInto( Status WriteBatchInternal::InsertInto( const WriteBatch* batch, ColumnFamilyMemTables* memtables, - FlushScheduler* flush_scheduler, bool ignore_missing_column_families, - uint64_t log_number, DB* db, bool concurrent_memtable_writes, - SequenceNumber* next_seq, bool* has_valid_writes, bool seq_per_batch, - bool batch_per_txn) { + FlushScheduler* flush_scheduler, + TrimHistoryScheduler* trim_history_scheduler, + bool ignore_missing_column_families, uint64_t log_number, DB* db, + bool concurrent_memtable_writes, SequenceNumber* next_seq, + bool* has_valid_writes, bool seq_per_batch, bool batch_per_txn) { MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler, + trim_history_scheduler, ignore_missing_column_families, log_number, db, concurrent_memtable_writes, has_valid_writes, seq_per_batch, batch_per_txn); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 67136a847..6793e8450 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -9,11 +9,13 @@ #pragma once #include <vector> +#include "db/flush_scheduler.h" +#include "db/trim_history_scheduler.h" #include "db/write_thread.h" -#include "rocksdb/types.h" -#include "rocksdb/write_batch.h" #include "rocksdb/db.h" #include "rocksdb/options.h" +#include "rocksdb/types.h" +#include "rocksdb/write_batch.h" #include "util/autovector.h" namespace rocksdb { @@ -162,6 +164,7 @@
static Status InsertInto( WriteThread::WriteGroup& write_group, SequenceNumber sequence, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, + TrimHistoryScheduler* trim_history_scheduler, bool ignore_missing_column_families = false, uint64_t log_number = 0, DB* db = nullptr, bool concurrent_memtable_writes = false, bool seq_per_batch = false, bool batch_per_txn = true); @@ -171,6 +174,7 @@ class WriteBatchInternal { static Status InsertInto( const WriteBatch* batch, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, + TrimHistoryScheduler* trim_history_scheduler, bool ignore_missing_column_families = false, uint64_t log_number = 0, DB* db = nullptr, bool concurrent_memtable_writes = false, SequenceNumber* next_seq = nullptr, bool* has_valid_writes = nullptr, @@ -179,6 +183,7 @@ class WriteBatchInternal { static Status InsertInto(WriteThread::Writer* writer, SequenceNumber sequence, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, + TrimHistoryScheduler* trim_history_scheduler, bool ignore_missing_column_families = false, uint64_t log_number = 0, DB* db = nullptr, bool concurrent_memtable_writes = false, diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 5de602cee..6a3f9e680 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -35,7 +35,8 @@ static std::string PrintContents(WriteBatch* b) { mem->Ref(); std::string state; ColumnFamilyMemTablesDefault cf_mems_default(mem); - Status s = WriteBatchInternal::InsertInto(b, &cf_mems_default, nullptr); + Status s = + WriteBatchInternal::InsertInto(b, &cf_mems_default, nullptr, nullptr); int count = 0; int put_count = 0; int delete_count = 0; diff --git a/examples/rocksdb_option_file_example.ini b/examples/rocksdb_option_file_example.ini index 351f1ed01..dcbc9a308 100644 --- a/examples/rocksdb_option_file_example.ini +++ b/examples/rocksdb_option_file_example.ini @@ -104,7 +104,7 @@ compression=kSnappyCompression level0_file_num_compaction_trigger=4 purge_redundant_kvs_while_flush=true - max_write_buffer_number_to_maintain=0 + max_write_buffer_size_to_maintain=0 memtable_factory=SkipListFactory max_grandparent_overlap_factor=8 expanded_compaction_factor=25 diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index c88a6c17d..2964491f7 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -175,11 +175,26 @@ struct AdvancedColumnFamilyOptions { // individual write buffers. Default: 1 int min_write_buffer_number_to_merge = 1; + // DEPRECATED // The total maximum number of write buffers to maintain in memory including // copies of buffers that have already been flushed. Unlike // max_write_buffer_number, this parameter does not affect flushing. - // This controls the minimum amount of write history that will be available - // in memory for conflict checking when Transactions are used. + // This parameter is being replaced by max_write_buffer_size_to_maintain. + // If both parameters are set to non-zero values, this parameter will be + // ignored. + int max_write_buffer_number_to_maintain = 0; + + // The total maximum size (in bytes) of write buffers to maintain in memory + // including copies of buffers that have already been flushed. This parameter + // only affects trimming of flushed buffers and does not affect flushing. + // This controls the maximum amount of write history that will be available + // in memory for conflict checking when Transactions are used.
The actual + // size of write history (flushed Memtables) might be higher than this limit + // when trimming the next Memtable would reduce the total write history size + // below this limit. For example, if max_write_buffer_size_to_maintain is set + // to 64MB and there are three flushed Memtables with sizes of 32MB, 20MB, + // and 20MB, then trimming the next Memtable of size 20MB would reduce total + // memory usage to 52MB, which is below the limit, so RocksDB will stop + // trimming. // // When using an OptimisticTransactionDB: // If this value is too low, some transactions may fail at commit time due // to not being able to determine whether there were any write conflicts. // // When using a TransactionDB: // If Transaction::SetSnapshot is used, TransactionDB will read either // in-memory write buffers or SST files to do write-conflict checking. // Increasing this value can reduce the number of reads to SST files // done for conflict detection. // // Setting this value to 0 will cause write buffers to be freed immediately - // after they are flushed. - // If this value is set to -1, 'max_write_buffer_number' will be used. + // after they are flushed. If this value is set to -1, + // 'max_write_buffer_number * write_buffer_size' will be used. // // Default: // If using a TransactionDB/OptimisticTransactionDB, the default value will - // be set to the value of 'max_write_buffer_number' if it is not explicitly - // set by the user. Otherwise, the default is 0. - int max_write_buffer_number_to_maintain = 0; + // be set to the value of 'max_write_buffer_number * write_buffer_size' + // if it is not explicitly set by the user. Otherwise, the default is 0. + int64_t max_write_buffer_size_to_maintain = 0; // Allows thread-safe inplace updates. If this is true, there is no way to // achieve point-in-time consistency using snapshot or iterator (assuming diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index e8cb32242..d7f13f8ed 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -854,6 +854,9 @@ rocksdb_options_set_min_write_buffer_number_to_merge(rocksdb_options_t*, int); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_max_write_buffer_number_to_maintain(rocksdb_options_t*, int); +extern ROCKSDB_LIBRARY_API void +rocksdb_options_set_max_write_buffer_size_to_maintain(rocksdb_options_t*, + int64_t); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_enable_pipelined_write( rocksdb_options_t*, unsigned char); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_unordered_write( diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index c0d8537a2..95d299c1b 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -131,7 +131,7 @@ class Transaction { // Status::Busy() may be returned if the transaction could not guarantee // that there are no write conflicts. Status::TryAgain() may be returned // if the memtable history size is not large enough - // (See max_write_buffer_number_to_maintain). + // (See max_write_buffer_size_to_maintain). // // If this transaction was created by a TransactionDB(), Status::Expired() // may be returned if this transaction has lived for longer than @@ -243,7 +243,7 @@ class Transaction { // Status::Busy() if there is a write conflict, // Status::TimedOut() if a lock could not be acquired, // Status::TryAgain() if the memtable history size is not large enough - // (See max_write_buffer_number_to_maintain) + // (See max_write_buffer_size_to_maintain) // Status::MergeInProgress() if merge operations cannot be resolved. // or other errors if this key could not be read.
virtual Status GetForUpdate(const ReadOptions& options, @@ -320,7 +320,7 @@ class Transaction { // Status::Busy() if there is a write conflict, // Status::TimedOut() if a lock could not be acquired, // Status::TryAgain() if the memtable history size is not large enough - // (See max_write_buffer_number_to_maintain) + // (See max_write_buffer_size_to_maintain) // or other errors on unexpected failures. virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value, const bool assume_tracked = false) = 0; diff --git a/java/rocksjni/write_batch_test.cc b/java/rocksjni/write_batch_test.cc index c6b8a9239..eeb269e48 100644 --- a/java/rocksjni/write_batch_test.cc +++ b/java/rocksjni/write_batch_test.cc @@ -52,8 +52,8 @@ jbyteArray Java_org_rocksdb_WriteBatchTest_getContents(JNIEnv* env, mem->Ref(); std::string state; rocksdb::ColumnFamilyMemTablesDefault cf_mems_default(mem); - rocksdb::Status s = - rocksdb::WriteBatchInternal::InsertInto(b, &cf_mems_default, nullptr); + rocksdb::Status s = rocksdb::WriteBatchInternal::InsertInto( + b, &cf_mems_default, nullptr, nullptr); int count = 0; rocksdb::Arena arena; rocksdb::ScopedArenaIterator iter( diff --git a/options/cf_options.cc b/options/cf_options.cc index 5830fc661..ded9ca01b 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -33,6 +33,8 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options, cf_options.min_write_buffer_number_to_merge), max_write_buffer_number_to_maintain( cf_options.max_write_buffer_number_to_maintain), + max_write_buffer_size_to_maintain( + cf_options.max_write_buffer_size_to_maintain), inplace_update_support(cf_options.inplace_update_support), inplace_callback(cf_options.inplace_callback), info_log(db_options.info_log.get()), diff --git a/options/cf_options.h b/options/cf_options.h index 47fca58fa..e13eae801 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -43,6 +43,8 @@ struct ImmutableCFOptions { int max_write_buffer_number_to_maintain; + int64_t max_write_buffer_size_to_maintain; + bool inplace_update_support; UpdateStatus (*inplace_callback)(char* existing_value, diff --git a/options/options.cc b/options/options.cc index 5efd3ce57..11804d88f 100644 --- a/options/options.cc +++ b/options/options.cc @@ -42,6 +42,8 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options) options.min_write_buffer_number_to_merge), max_write_buffer_number_to_maintain( options.max_write_buffer_number_to_maintain), + max_write_buffer_size_to_maintain( + options.max_write_buffer_size_to_maintain), inplace_update_support(options.inplace_update_support), inplace_update_num_locks(options.inplace_update_num_locks), inplace_callback(options.inplace_callback), @@ -158,6 +160,9 @@ void ColumnFamilyOptions::Dump(Logger* log) const { min_write_buffer_number_to_merge); ROCKS_LOG_HEADER(log, " Options.max_write_buffer_number_to_maintain: %d", max_write_buffer_number_to_maintain); + ROCKS_LOG_HEADER(log, + " Options.max_write_buffer_size_to_maintain: %" PRId64, + max_write_buffer_size_to_maintain); ROCKS_LOG_HEADER( log, " Options.bottommost_compression_opts.window_bits: %d", bottommost_compression_opts.window_bits); diff --git a/options/options_helper.cc b/options/options_helper.cc index 5733ceed4..588a45ef7 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -1873,6 +1873,9 @@ std::unordered_map<std::string, OptionTypeInfo> {"max_write_buffer_number_to_maintain", {offset_of(&ColumnFamilyOptions::max_write_buffer_number_to_maintain), OptionType::kInt,
OptionVerificationType::kNormal, false, 0}}, + {"max_write_buffer_size_to_maintain", + {offset_of(&ColumnFamilyOptions::max_write_buffer_size_to_maintain), + OptionType::kInt64T, OptionVerificationType::kNormal, false, 0}}, {"min_write_buffer_number_to_merge", {offset_of(&ColumnFamilyOptions::min_write_buffer_number_to_merge), OptionType::kInt, OptionVerificationType::kNormal, false, 0}}, diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index e60fd6f9e..d0fef5847 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -438,6 +438,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { "soft_rate_limit=530.615385;" "soft_pending_compaction_bytes_limit=0;" "max_write_buffer_number_to_maintain=84;" + "max_write_buffer_size_to_maintain=2147483648;" "merge_operator=aabcxehazrMergeOperator;" "memtable_prefix_bloom_size_ratio=0.4642;" "memtable_whole_key_filtering=true;" diff --git a/options/options_test.cc b/options/options_test.cc index 05ea766f6..0e7cebf3a 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -49,6 +49,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"max_write_buffer_number", "2"}, {"min_write_buffer_number_to_merge", "3"}, {"max_write_buffer_number_to_maintain", "99"}, + {"max_write_buffer_size_to_maintain", "-99999"}, {"compression", "kSnappyCompression"}, {"compression_per_level", "kNoCompression:" @@ -150,6 +151,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_cf_opt.max_write_buffer_number, 2); ASSERT_EQ(new_cf_opt.min_write_buffer_number_to_merge, 3); ASSERT_EQ(new_cf_opt.max_write_buffer_number_to_maintain, 99); + ASSERT_EQ(new_cf_opt.max_write_buffer_size_to_maintain, -99999); ASSERT_EQ(new_cf_opt.compression, kSnappyCompression); ASSERT_EQ(new_cf_opt.compression_per_level.size(), 9U); ASSERT_EQ(new_cf_opt.compression_per_level[0], kNoCompression); diff --git a/src.mk b/src.mk index 8ebc0bee9..6d087861d 100644 --- a/src.mk +++ b/src.mk @@ -53,6 +53,7 @@ LIB_SOURCES = \ db/table_cache.cc \ db/table_properties_collector.cc \ db/transaction_log_impl.cc \ + db/trim_history_scheduler.cc \ db/version_builder.cc \ db/version_edit.cc \ db/version_set.cc \ diff --git a/table/table_test.cc b/table/table_test.cc index 749048b78..c1f9ed3f3 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -3300,7 +3300,8 @@ TEST_F(MemTableTest, Simple) { batch.DeleteRange(std::string("begin"), std::string("end")); ColumnFamilyMemTablesDefault cf_mems_default(memtable); ASSERT_TRUE( - WriteBatchInternal::InsertInto(&batch, &cf_mems_default, nullptr).ok()); + WriteBatchInternal::InsertInto(&batch, &cf_mems_default, nullptr, nullptr) + .ok()); for (int i = 0; i < 2; ++i) { Arena arena; diff --git a/test_util/testutil.cc b/test_util/testutil.cc index 46f878f8c..f3e71bebc 100644 --- a/test_util/testutil.cc +++ b/test_util/testutil.cc @@ -333,6 +333,7 @@ void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, DBOptions& db_options, cf_opt->max_mem_compaction_level = rnd->Uniform(100); cf_opt->max_write_buffer_number = rnd->Uniform(100); cf_opt->max_write_buffer_number_to_maintain = rnd->Uniform(100); + cf_opt->max_write_buffer_size_to_maintain = rnd->Uniform(10000); cf_opt->min_write_buffer_number_to_merge = rnd->Uniform(100); cf_opt->num_levels = rnd->Uniform(100); cf_opt->target_file_size_multiplier = rnd->Uniform(100); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 001dd4d2f..3c2de42dd 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ 
-340,6 +340,20 @@ DEFINE_int32(max_write_buffer_number_to_maintain, "after they are flushed. If this value is set to -1, " "'max_write_buffer_number' will be used."); +DEFINE_int64(max_write_buffer_size_to_maintain, + rocksdb::Options().max_write_buffer_size_to_maintain, + "The total maximum size of write buffers to maintain in memory " + "including copies of buffers that have already been flushed. " + "Unlike max_write_buffer_number, this parameter does not affect " + "flushing. This controls the maximum amount of write history " + "that will be available in memory for conflict checking when " + "Transactions are used. If this value is too low, some " + "transactions may fail at commit time due to not being able to " + "determine whether there were any write conflicts. Setting this " + "value to 0 will cause write buffers to be freed immediately " + "after they are flushed. If this value is set to -1, " + "'max_write_buffer_number * write_buffer_size' will be used."); + DEFINE_int32(max_background_jobs, rocksdb::Options().max_background_jobs, "The maximum number of concurrent background jobs that can occur " @@ -3385,6 +3399,8 @@ class Benchmark { FLAGS_min_write_buffer_number_to_merge; options.max_write_buffer_number_to_maintain = FLAGS_max_write_buffer_number_to_maintain; + options.max_write_buffer_size_to_maintain = + FLAGS_max_write_buffer_size_to_maintain; options.max_background_jobs = FLAGS_max_background_jobs; options.max_background_compactions = FLAGS_max_background_compactions; options.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions); diff --git a/tools/db_bench_tool_test.cc b/tools/db_bench_tool_test.cc index 4eb5472ac..87f8f1943 100644 --- a/tools/db_bench_tool_test.cc +++ b/tools/db_bench_tool_test.cc @@ -245,6 +245,7 @@ const std::string options_file_content = R"OPTIONS_FILE( expanded_compaction_factor=25 soft_rate_limit=0.000000 max_write_buffer_number_to_maintain=0 + max_write_buffer_size_to_maintain=0 verify_checksums_in_compaction=true merge_operator=nullptr memtable_prefix_bloom_bits=0 diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 7b4a36f0f..3461a75d2 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -204,6 +204,20 @@ DEFINE_int32(max_write_buffer_number_to_maintain, "after they are flushed. If this value is set to -1, " "'max_write_buffer_number' will be used."); +DEFINE_int64(max_write_buffer_size_to_maintain, + rocksdb::Options().max_write_buffer_size_to_maintain, + "The total maximum size of write buffers to maintain in memory " + "including copies of buffers that have already been flushed. " + "Unlike max_write_buffer_number, this parameter does not affect " + "flushing. This controls the maximum amount of write history " + "that will be available in memory for conflict checking when " + "Transactions are used. If this value is too low, some " + "transactions may fail at commit time due to not being able to " + "determine whether there were any write conflicts. Setting this " + "value to 0 will cause write buffers to be freed immediately " + "after they are flushed. If this value is set to -1, " + "'max_write_buffer_number * write_buffer_size' will be used."); + DEFINE_double(memtable_prefix_bloom_size_ratio, rocksdb::Options().memtable_prefix_bloom_size_ratio, "creates prefix blooms for memtables, each with size " @@ -2762,6 +2776,8 @@ class StressTest { FLAGS_min_write_buffer_number_to_merge; options_.max_write_buffer_number_to_maintain = FLAGS_max_write_buffer_number_to_maintain; + options_.max_write_buffer_size_to_maintain = + FLAGS_max_write_buffer_size_to_maintain; options_.memtable_prefix_bloom_size_ratio = FLAGS_memtable_prefix_bloom_size_ratio; options_.memtable_whole_key_filtering = diff --git a/util/string_util.cc b/util/string_util.cc index 9b447d50c..4a194f3a9 100644 --- a/util/string_util.cc +++ b/util/string_util.cc @@ -6,17 +6,16 @@ #include "util/string_util.h" #include -#include #include #include #include #include +#include #include #include #include #include #include -#include "rocksdb/env.h" #include "port/port.h" #include "port/sys_time.h" #include "rocksdb/slice.h" diff --git a/util/string_util.h b/util/string_util.h index faf763e54..122a6c356 100644 --- a/util/string_util.h +++ b/util/string_util.h @@ -121,7 +121,6 @@ uint64_t ParseUint64(const std::string& value); int ParseInt(const std::string& value); - int64_t ParseInt64(const std::string& value); double ParseDouble(const std::string& value); diff --git a/utilities/transactions/optimistic_transaction_db_impl.cc b/utilities/transactions/optimistic_transaction_db_impl.cc index b7fedc066..0fb7c9100 100644 --- a/utilities/transactions/optimistic_transaction_db_impl.cc +++ b/utilities/transactions/optimistic_transaction_db_impl.cc @@ -63,9 +63,11 @@ Status OptimisticTransactionDB::Open( for (auto& column_family : column_families_copy) { ColumnFamilyOptions* options = &column_family.options; - if (options->max_write_buffer_number_to_maintain == 0) { - // Setting to -1 will set the History size to max_write_buffer_number. - options->max_write_buffer_number_to_maintain = -1; + if (options->max_write_buffer_size_to_maintain == 0 && + options->max_write_buffer_number_to_maintain == 0) { + // Setting to -1 will set the History size to + // max_write_buffer_number * write_buffer_size. + options->max_write_buffer_size_to_maintain = -1; } } diff --git a/utilities/transactions/optimistic_transaction_test.cc b/utilities/transactions/optimistic_transaction_test.cc index 3aa6c207a..2a6933495 100644 --- a/utilities/transactions/optimistic_transaction_test.cc +++ b/utilities/transactions/optimistic_transaction_test.cc @@ -36,6 +36,7 @@ class OptimisticTransactionTest : public testing::Test { OptimisticTransactionTest() { options.create_if_missing = true; options.max_write_buffer_number = 2; + options.max_write_buffer_size_to_maintain = 1600; dbname = test::PerThreadDBPath("optimistic_transaction_testdb"); DestroyDB(dbname, options); diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 2f9c918a3..caac2ab18 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -271,9 +271,11 @@ void TransactionDB::PrepareWrap( for (size_t i = 0; i < column_families->size(); i++) { ColumnFamilyOptions* cf_options = &(*column_families)[i].options; - if (cf_options->max_write_buffer_number_to_maintain == 0) { - // Setting to -1 will set the History size to max_write_buffer_number. 
- cf_options->max_write_buffer_number_to_maintain = -1; + if (cf_options->max_write_buffer_size_to_maintain == 0 && + cf_options->max_write_buffer_number_to_maintain == 0) { + // Setting to -1 will set the History size to + // max_write_buffer_number * write_buffer_size. + cf_options->max_write_buffer_size_to_maintain = -1; } if (!cf_options->disable_auto_compactions) { // Disable compactions momentarily to prevent race with DB::Open diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index da7cee063..23ae374dc 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -227,8 +227,10 @@ TEST_P(TransactionTest, ValidateSnapshotTest) { db_impl->TEST_FlushMemTable(true); // Make sure the flushed memtable is not kept in memory int max_memtable_in_history = - std::max(options.max_write_buffer_number, - options.max_write_buffer_number_to_maintain) + + std::max( + options.max_write_buffer_number, + static_cast<int>(options.max_write_buffer_size_to_maintain) / + static_cast<int>(options.write_buffer_size)) + + 1; for (int i = 0; i < max_memtable_in_history; i++) { db->Put(write_options, Slice("key"), Slice("value")); diff --git a/utilities/transactions/transaction_util.cc b/utilities/transactions/transaction_util.cc index ba3b75e15..371448503 100644 --- a/utilities/transactions/transaction_util.cc +++ b/utilities/transactions/transaction_util.cc @@ -94,7 +94,7 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv, " as the MemTable only contains changes newer than " "SequenceNumber %" PRIu64 ". Increasing the value of the " - "max_write_buffer_number_to_maintain option could reduce the " + "max_write_buffer_size_to_maintain option could reduce the " "frequency " "of this error.", snap_seq, earliest_seq);
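Usage sketch (editor's illustration, not part of the patch): the snippet below shows how the new option documented in the advanced_options.h hunk above would be set through the C++ API, with the trimming rule from that comment restated as arithmetic. The 32MB write buffer size, 64MB cap, and DB path are assumed example values; only Options, DB::Open, and the two *_to_maintain fields are existing API.

    #include <cassert>

    #include "rocksdb/db.h"
    #include "rocksdb/options.h"

    int main() {
      rocksdb::Options options;
      options.create_if_missing = true;
      options.write_buffer_size = 32 << 20;  // 32MB per mutable memtable
      // Cap the memory held as write history at 64MB. When both options are
      // non-zero, this takes precedence over the deprecated
      // max_write_buffer_number_to_maintain.
      options.max_write_buffer_size_to_maintain = 64LL << 20;
      // Trimming rule from the header comment: with flushed memtables of
      // 32MB, 20MB, and 20MB (72MB total, above the 64MB cap), the oldest
      // 20MB memtable is still kept, because dropping it would leave 52MB,
      // which is below the cap.
      rocksdb::DB* db = nullptr;
      rocksdb::Status s =
          rocksdb::DB::Open(options, "/tmp/trim_history_example", &db);
      assert(s.ok());
      delete db;
      return 0;
    }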
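The c.h/c.cc hunks above add a matching C-API setter with the signature shown there. A minimal sketch of calling it; the 64MB value is an arbitrary example, and rocksdb_options_create/rocksdb_options_destroy are the existing C-API lifecycle calls:

    #include "rocksdb/c.h"

    int main() {
      rocksdb_options_t* opts = rocksdb_options_create();
      // New setter added by this patch; mirrors the C++ option.
      rocksdb_options_set_max_write_buffer_size_to_maintain(
          opts, 64 * 1024 * 1024);
      rocksdb_options_destroy(opts);
      return 0;
    }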
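The transaction.h hunks repoint the Status::TryAgain() documentation at the size-based option. A hedged sketch of how a caller might branch on the commit outcomes documented there; the helper name and message strings are illustrative, while Commit(), IsBusy(), IsTryAgain(), and ToString() are existing API:

    #include <string>

    #include "rocksdb/utilities/transaction.h"

    // Illustrative only: classify a Commit() result using the statuses
    // documented in transaction.h.
    bool CommitOrExplain(rocksdb::Transaction* txn, std::string* why) {
      rocksdb::Status s = txn->Commit();
      if (s.ok()) {
        return true;
      }
      if (s.IsBusy()) {
        *why = "write conflict detected during validation";
      } else if (s.IsTryAgain()) {
        // The memtable history was too small to perform conflict checking;
        // a larger max_write_buffer_size_to_maintain makes this less likely.
        *why = "memtable history too small for conflict checking";
      } else {
        *why = s.ToString();
      }
      return false;
    }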