Refactor trimming logic for immutable memtables (#5022)

Summary:
MyRocks currently sets `max_write_buffer_number_to_maintain` in order to keep enough history for transaction conflict checking. The effectiveness of this approach depends on the size of the memtables: when memtables are small, it may not keep enough history; when memtables are large, it may consume too much memory.
We are proposing a new way to configure memtable list history: by limiting the memory usage of immutable memtables. The new option is `max_write_buffer_size_to_maintain`, and it takes precedence over the old `max_write_buffer_number_to_maintain` if both are set to non-zero values. The new option accounts for the total memory usage of the flushed immutable memtables and the mutable memtable. When the total usage exceeds the limit, RocksDB may start dropping immutable memtables (also called trimming history), starting from the oldest one.
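A minimal sketch of how a user might opt into the new behavior (illustrative values only; the option names come from this PR's changes to ColumnFamilyOptions):

#include "rocksdb/options.h"

rocksdb::ColumnFamilyOptions cf_opts;
// Keep roughly 64MB of memtable history (mutable memtable plus flushed
// immutables) in memory for transaction conflict checking.
cf_opts.max_write_buffer_size_to_maintain = 64LL << 20;
// When both options are set to non-zero values the size-based limit takes
// precedence, so the count-based option can be left at its default of 0.
cf_opts.max_write_buffer_number_to_maintain = 0;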
The old option effectively acts as both an upper and a lower bound: history trimming starts once the number of immutable memtables exceeds the limit, but trimming never reduces the count below (limit - 1).
To mimic this behavior with the new option, history trimming stops if dropping the next immutable memtable would bring the total memory usage below the size limit. For example, assume the size limit is 64MB and there are three immutable memtables with sizes of 20MB, 30MB, and 30MB. Although the total memory usage of 80MB exceeds 64MB, dropping the oldest memtable would reduce usage to 60MB, which is below 64MB, so in this case no memtable is dropped.
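A minimal sketch of this stopping rule, mirroring the MemtableLimitExceeded() check added to db/memtable_list.cc in this PR (names and signature simplified for illustration):

#include <cstddef>
#include <cstdint>

// Trim the oldest flushed memtable only while the remaining usage, i.e. the
// total excluding that oldest memtable, would still be at or above the limit.
bool ShouldTrimOldest(size_t usage_excluding_oldest, size_t mutable_usage,
                      int64_t size_limit) {
  return size_limit > 0 &&
         usage_excluding_oldest + mutable_usage >=
             static_cast<size_t>(size_limit);
}

// Example from above: immutables of 20MB, 30MB, 30MB, no mutable usage,
// limit 64MB. usage_excluding_oldest = 60MB < 64MB, so this returns false
// and all 80MB of history is kept.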
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5022

Differential Revision: D14394062

Pulled By: miasantreble

fbshipit-source-id: 60457a509c6af89d0993f988c9b5c2aa9e45f5c5
commit 2f41ecfe75
parent 26293c89a6
Author: Zhongyi Xie
Committer: Facebook GitHub Bot
Branch: main
53 files changed:
 CMakeLists.txt | 1
 HISTORY.md | 2
 TARGETS | 1
 db/c.cc | 5
 db/column_family.cc | 12
 db/column_family_test.cc | 11
 db/db_basic_test.cc | 5
 db/db_compaction_test.cc | 2
 db/db_impl/db_impl.cc | 1
 db/db_impl/db_impl.h | 5
 db/db_impl/db_impl_open.cc | 8
 db/db_impl/db_impl_secondary.cc | 6
 db/db_impl/db_impl_write.cc | 55
 db/db_properties_test.cc | 7
 db/db_test.cc | 5
 db/deletefile_test.cc | 2
 db/flush_scheduler.cc | 2
 db/flush_scheduler.h | 11
 db/memtable.cc | 8
 db/memtable.h | 12
 db/memtable_list.cc | 81
 db/memtable_list.h | 67
 db/memtable_list_test.cc | 31
 db/repair.cc | 3
 db/trim_history_scheduler.cc | 59
 db/trim_history_scheduler.h | 44
 db/write_batch.cc | 45
 db/write_batch_internal.h | 9
 db/write_batch_test.cc | 3
 examples/rocksdb_option_file_example.ini | 2
 include/rocksdb/advanced_options.h | 29
 include/rocksdb/c.h | 3
 include/rocksdb/utilities/transaction.h | 6
 java/rocksjni/write_batch_test.cc | 4
 options/cf_options.cc | 2
 options/cf_options.h | 2
 options/options.cc | 5
 options/options_helper.cc | 3
 options/options_settable_test.cc | 1
 options/options_test.cc | 2
 src.mk | 1
 table/table_test.cc | 3
 test_util/testutil.cc | 1
 tools/db_bench_tool.cc | 16
 tools/db_bench_tool_test.cc | 1
 tools/db_stress.cc | 16
 util/string_util.cc | 3
 util/string_util.h | 1
 utilities/transactions/optimistic_transaction_db_impl.cc | 8
 utilities/transactions/optimistic_transaction_test.cc | 1
 utilities/transactions/pessimistic_transaction_db.cc | 8
 utilities/transactions/transaction_test.cc | 6
 utilities/transactions/transaction_util.cc | 2

@ -530,6 +530,7 @@ set(SOURCES
db/table_cache.cc
db/table_properties_collector.cc
db/transaction_log_impl.cc
db/trim_history_scheduler.cc
db/version_builder.cc
db/version_edit.cc
db/version_set.cc

@ -6,6 +6,8 @@
* Fix bloom filter lookups by the MultiGet batching API when BlockBasedTableOptions::whole_key_filtering is false, by checking that a key is in the prefix_extractor domain and extracting the prefix before looking up.
### New Features
* VerifyChecksum() by default will issue readahead. Allow ReadOptions to be passed in to those functions to override the readahead size. For checksum verification before external SST file ingestion, a new option, IngestExternalFileOptions.verify_checksums_readahead_size, is added for this readahead setting.
### Public API Change
* Added max_write_buffer_size_to_maintain option to better control memory usage of immutable memtables.
## 6.4.0 (7/30/2019)
### Default Option Change

@ -158,6 +158,7 @@ cpp_library(
"db/table_cache.cc",
"db/table_properties_collector.cc",
"db/transaction_log_impl.cc",
"db/trim_history_scheduler.cc",
"db/version_builder.cc",
"db/version_edit.cc",
"db/version_set.cc",

@ -2514,6 +2514,11 @@ void rocksdb_options_set_max_write_buffer_number_to_maintain(
opt->rep.max_write_buffer_number_to_maintain = n;
}
void rocksdb_options_set_max_write_buffer_size_to_maintain(
rocksdb_options_t* opt, int64_t n) {
opt->rep.max_write_buffer_size_to_maintain = n;
}
void rocksdb_options_set_enable_pipelined_write(rocksdb_options_t* opt,
unsigned char v) {
opt->rep.enable_pipelined_write = v;

@ -227,7 +227,14 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
if (result.max_write_buffer_number < 2) {
result.max_write_buffer_number = 2;
}
if (result.max_write_buffer_number_to_maintain < 0) {
// Fall back to max_write_buffer_number_to_maintain if
// max_write_buffer_size_to_maintain is not set
if (result.max_write_buffer_size_to_maintain < 0) {
result.max_write_buffer_size_to_maintain =
result.max_write_buffer_number *
static_cast<int64_t>(result.write_buffer_size);
} else if (result.max_write_buffer_size_to_maintain == 0 &&
result.max_write_buffer_number_to_maintain < 0) {
result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
}
// bloom filter size shouldn't exceed 1/4 of memtable size.
@ -423,7 +430,8 @@ ColumnFamilyData::ColumnFamilyData(
write_buffer_manager_(write_buffer_manager),
mem_(nullptr),
imm_(ioptions_.min_write_buffer_number_to_merge,
ioptions_.max_write_buffer_number_to_maintain),
ioptions_.max_write_buffer_number_to_maintain,
ioptions_.max_write_buffer_size_to_maintain),
super_version_(nullptr),
super_version_number_(0),
local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),

@ -1132,22 +1132,25 @@ TEST_P(ColumnFamilyTest, DifferentWriteBufferSizes) {
default_cf.arena_block_size = 4 * 4096;
default_cf.max_write_buffer_number = 10;
default_cf.min_write_buffer_number_to_merge = 1;
default_cf.max_write_buffer_number_to_maintain = 0;
default_cf.max_write_buffer_size_to_maintain = 0;
one.write_buffer_size = 200000;
one.arena_block_size = 4 * 4096;
one.max_write_buffer_number = 10;
one.min_write_buffer_number_to_merge = 2;
one.max_write_buffer_number_to_maintain = 1;
one.max_write_buffer_size_to_maintain =
static_cast<int>(one.write_buffer_size);
two.write_buffer_size = 1000000;
two.arena_block_size = 4 * 4096;
two.max_write_buffer_number = 10;
two.min_write_buffer_number_to_merge = 3;
two.max_write_buffer_number_to_maintain = 2;
two.max_write_buffer_size_to_maintain =
static_cast<int>(two.write_buffer_size);
three.write_buffer_size = 4096 * 22;
three.arena_block_size = 4096;
three.max_write_buffer_number = 10;
three.min_write_buffer_number_to_merge = 4;
three.max_write_buffer_number_to_maintain = -1;
three.max_write_buffer_size_to_maintain =
static_cast<int>(three.write_buffer_size);
Reopen({default_cf, one, two, three});

@ -297,7 +297,7 @@ TEST_F(DBBasicTest, FlushMultipleMemtable) {
writeOpt.disableWAL = true;
options.max_write_buffer_number = 4;
options.min_write_buffer_number_to_merge = 3;
options.max_write_buffer_number_to_maintain = -1;
options.max_write_buffer_size_to_maintain = -1;
CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
ASSERT_OK(Flush(1));
@ -327,7 +327,8 @@ TEST_F(DBBasicTest, FlushEmptyColumnFamily) {
writeOpt.disableWAL = true;
options.max_write_buffer_number = 2;
options.min_write_buffer_number_to_merge = 1;
options.max_write_buffer_number_to_maintain = 1;
options.max_write_buffer_size_to_maintain =
static_cast<int64_t>(options.write_buffer_size);
CreateAndReopenWithCF({"pikachu"}, options);
// Compaction can still go through even if no thread can flush the

@ -141,7 +141,7 @@ Options DeletionTriggerOptions(Options options) {
options.compression = kNoCompression;
options.write_buffer_size = kCDTKeysPerBuffer * (kCDTValueSize + 24);
options.min_write_buffer_number_to_merge = 1;
options.max_write_buffer_number_to_maintain = 0;
options.max_write_buffer_size_to_maintain = 0;
options.num_levels = kCDTNumLevels;
options.level0_file_num_compaction_trigger = 1;
options.target_file_size_base = options.write_buffer_size * 2;

@ -472,6 +472,7 @@ Status DBImpl::CloseHelper() {
&files_grabbed_for_purge_);
EraseThreadStatusDbInfo();
flush_scheduler_.Clear();
trim_history_scheduler_.Clear();
while (!flush_queue_.empty()) {
const FlushRequest& flush_req = PopFirstFromFlushQueue();

@ -37,6 +37,7 @@
#include "db/read_callback.h"
#include "db/snapshot_checker.h"
#include "db/snapshot_impl.h"
#include "db/trim_history_scheduler.h"
#include "db/version_edit.h"
#include "db/wal_manager.h"
#include "db/write_controller.h"
@ -1355,6 +1356,8 @@ class DBImpl : public DB {
void MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds);
Status TrimMemtableHistory(WriteContext* context);
Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);
void SelectColumnFamiliesForAtomicFlush(autovector<ColumnFamilyData*>* cfds);
@ -1733,6 +1736,8 @@ class DBImpl : public DB {
FlushScheduler flush_scheduler_;
TrimHistoryScheduler trim_history_scheduler_;
SnapshotList snapshots_;
// For each background job, pending_outputs_ keeps the current file number at

@ -862,9 +862,10 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// That's why we set ignore missing column families to true
bool has_valid_writes = false;
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), &flush_scheduler_, true,
log_number, this, false /* concurrent_memtable_writes */,
next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_);
&batch, column_family_memtables_.get(), &flush_scheduler_,
&trim_history_scheduler_, true, log_number, this,
false /* concurrent_memtable_writes */, next_sequence,
&has_valid_writes, seq_per_batch_, batch_per_txn_);
MaybeIgnoreError(&status);
if (!status.ok()) {
// We are treating this as a failure while reading since we read valid
@ -931,6 +932,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
}
flush_scheduler_.Clear();
trim_history_scheduler_.Clear();
auto last_sequence = *next_sequence - 1;
if ((*next_sequence != kMaxSequenceNumber) &&
(versions_->LastSequence() <= last_sequence)) {

@ -253,9 +253,9 @@ Status DBImplSecondary::RecoverLogFiles(
bool has_valid_writes = false;
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(),
nullptr /* flush_scheduler */, true, log_number, this,
false /* concurrent_memtable_writes */, next_sequence,
&has_valid_writes, seq_per_batch_, batch_per_txn_);
nullptr /* flush_scheduler */, nullptr /* trim_history_scheduler*/,
true, log_number, this, false /* concurrent_memtable_writes */,
next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_);
}
// If column family was not found, it might mean that the WAL write
// batch references to the column family that was dropped after the

@ -171,6 +171,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
versions_->GetColumnFamilySet());
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
&trim_history_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt,
batch_per_txn_);
@ -375,7 +376,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// w.sequence will be set inside InsertInto
w.status = WriteBatchInternal::InsertInto(
write_group, current_sequence, column_family_memtables_.get(),
&flush_scheduler_, write_options.ignore_missing_column_families,
&flush_scheduler_, &trim_history_scheduler_,
write_options.ignore_missing_column_families,
0 /*recovery_log_number*/, this, parallel, seq_per_batch_,
batch_per_txn_);
} else {
@ -391,6 +393,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
assert(w.sequence == current_sequence);
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
&trim_history_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/,
this, true /*concurrent_memtable_writes*/, seq_per_batch_,
w.batch_cnt, batch_per_txn_);
@ -545,9 +548,9 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
} else {
memtable_write_group.status = WriteBatchInternal::InsertInto(
memtable_write_group, w.sequence, column_family_memtables_.get(),
&flush_scheduler_, write_options.ignore_missing_column_families,
0 /*log_number*/, this, false /*concurrent_memtable_writes*/,
seq_per_batch_, batch_per_txn_);
&flush_scheduler_, &trim_history_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
false /*concurrent_memtable_writes*/, seq_per_batch_, batch_per_txn_);
versions_->SetLastSequence(memtable_write_group.last_sequence);
write_thread_.ExitAsMemTableWriter(&w, memtable_write_group);
}
@ -559,8 +562,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
versions_->GetColumnFamilySet());
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
true /*concurrent_memtable_writes*/);
&trim_history_scheduler_, write_options.ignore_missing_column_families,
0 /*log_number*/, this, true /*concurrent_memtable_writes*/);
if (write_thread_.CompleteParallelMemTableWriter(&w)) {
MemTableInsertStatusCheck(w.status);
versions_->SetLastSequence(w.write_group->last_sequence);
@ -597,8 +600,9 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
versions_->GetColumnFamilySet());
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
true /*concurrent_memtable_writes*/, seq_per_batch_, sub_batch_cnt);
&trim_history_scheduler_, write_options.ignore_missing_column_families,
0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
seq_per_batch_, sub_batch_cnt);
WriteStatusCheck(w.status);
if (write_options.disableWAL) {
@ -856,6 +860,10 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
status = HandleWriteBufferFull(write_context);
}
if (UNLIKELY(status.ok() && !trim_history_scheduler_.Empty())) {
status = TrimMemtableHistory(write_context);
}
if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
WaitForPendingWrites();
status = ScheduleFlushes(write_context);
@ -1112,9 +1120,9 @@ Status DBImpl::WriteRecoverableState() {
WriteBatchInternal::SetSequence(&cached_recoverable_state_, seq + 1);
auto status = WriteBatchInternal::InsertInto(
&cached_recoverable_state_, column_family_memtables_.get(),
&flush_scheduler_, true, 0 /*recovery_log_number*/, this,
false /* concurrent_memtable_writes */, &next_seq, &dont_care_bool,
seq_per_batch_);
&flush_scheduler_, &trim_history_scheduler_, true,
0 /*recovery_log_number*/, this, false /* concurrent_memtable_writes */,
&next_seq, &dont_care_bool, seq_per_batch_);
auto last_seq = next_seq - 1;
if (two_write_queues_) {
versions_->FetchAddLastAllocatedSequence(last_seq - seq);
@ -1474,6 +1482,31 @@ void DBImpl::MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds) {
}
}
Status DBImpl::TrimMemtableHistory(WriteContext* context) {
autovector<ColumnFamilyData*> cfds;
ColumnFamilyData* tmp_cfd;
while ((tmp_cfd = trim_history_scheduler_.TakeNextColumnFamily()) !=
nullptr) {
cfds.push_back(tmp_cfd);
}
for (auto& cfd : cfds) {
autovector<MemTable*> to_delete;
cfd->imm()->TrimHistory(&to_delete, cfd->mem()->ApproximateMemoryUsage());
for (auto m : to_delete) {
delete m;
}
context->superversion_context.NewSuperVersion();
assert(context->superversion_context.new_superversion.get() != nullptr);
cfd->InstallSuperVersion(&context->superversion_context, &mutex_);
if (cfd->Unref()) {
delete cfd;
cfd = nullptr;
}
}
return Status::OK();
}
Status DBImpl::ScheduleFlushes(WriteContext* context) {
autovector<ColumnFamilyData*> cfds;
if (immutable_db_options_.atomic_flush) {

@ -615,8 +615,9 @@ TEST_F(DBPropertiesTest, NumImmutableMemTable) {
writeOpt.disableWAL = true;
options.max_write_buffer_number = 4;
options.min_write_buffer_number_to_merge = 3;
options.max_write_buffer_number_to_maintain = 4;
options.write_buffer_size = 1000000;
options.max_write_buffer_size_to_maintain =
5 * static_cast<int64_t>(options.write_buffer_size);
CreateAndReopenWithCF({"pikachu"}, options);
std::string big_value(1000000 * 2, 'x');
@ -747,7 +748,7 @@ TEST_F(DBPropertiesTest, DISABLED_GetProperty) {
options.max_background_flushes = 1;
options.max_write_buffer_number = 10;
options.min_write_buffer_number_to_merge = 1;
options.max_write_buffer_number_to_maintain = 0;
options.max_write_buffer_size_to_maintain = 0;
options.write_buffer_size = 1000000;
Reopen(options);
@ -997,7 +998,7 @@ TEST_F(DBPropertiesTest, EstimatePendingCompBytes) {
options.max_background_flushes = 1;
options.max_write_buffer_number = 10;
options.min_write_buffer_number_to_merge = 1;
options.max_write_buffer_number_to_maintain = 0;
options.max_write_buffer_size_to_maintain = 0;
options.write_buffer_size = 1000000;
Reopen(options);

@ -883,7 +883,7 @@ TEST_F(DBTest, FlushMultipleMemtable) {
writeOpt.disableWAL = true;
options.max_write_buffer_number = 4;
options.min_write_buffer_number_to_merge = 3;
options.max_write_buffer_number_to_maintain = -1;
options.max_write_buffer_size_to_maintain = -1;
CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
ASSERT_OK(Flush(1));
@ -901,7 +901,8 @@ TEST_F(DBTest, FlushSchedule) {
options.level0_stop_writes_trigger = 1 << 10;
options.level0_slowdown_writes_trigger = 1 << 10;
options.min_write_buffer_number_to_merge = 1;
options.max_write_buffer_number_to_maintain = 1;
options.max_write_buffer_size_to_maintain =
static_cast<int64_t>(options.write_buffer_size);
options.max_write_buffer_number = 2;
options.write_buffer_size = 120 * 1024;
CreateAndReopenWithCF({"pikachu"}, options);

@ -284,6 +284,8 @@ TEST_F(DeleteFileTest, BackgroundPurgeIteratorTest) {
TEST_F(DeleteFileTest, BackgroundPurgeCFDropTest) {
auto do_test = [&](bool bg_purge) {
ColumnFamilyOptions co;
co.max_write_buffer_size_to_maintain =
static_cast<int64_t>(co.write_buffer_size);
WriteOptions wo;
FlushOptions fo;
ColumnFamilyHandle* cfh = nullptr;

@ -11,7 +11,7 @@
namespace rocksdb {
void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) {
void FlushScheduler::ScheduleWork(ColumnFamilyData* cfd) {
#ifndef NDEBUG
{
std::lock_guard<std::mutex> lock(checking_mutex_);

@ -9,26 +9,29 @@
#include <atomic>
#include <mutex>
#include <set>
#include "util/autovector.h"
namespace rocksdb {
class ColumnFamilyData;
// Unless otherwise noted, all methods on FlushScheduler should be called
// only with the DB mutex held or from a single-threaded recovery context.
// FlushScheduler keeps track of all column families whose memtable may
// be full and require flushing. Unless otherwise noted, all methods on
// FlushScheduler should be called only with the DB mutex held or from
// a single-threaded recovery context.
class FlushScheduler {
public:
FlushScheduler() : head_(nullptr) {}
// May be called from multiple threads at once, but not concurrent with
// any other method calls on this instance
void ScheduleFlush(ColumnFamilyData* cfd);
void ScheduleWork(ColumnFamilyData* cfd);
// Removes and returns Ref()-ed column family. Client needs to Unref().
// Filters column families that have been dropped.
ColumnFamilyData* TakeNextColumnFamily();
// This can be called concurrently with ScheduleFlush but it would miss all
// This can be called concurrently with ScheduleWork but it would miss all
// the scheduled flushes after the last synchronization. This would result
// into less precise enforcement of memtable sizes but should not matter much.
bool Empty();

@ -105,7 +105,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
insert_with_hint_prefix_extractor_(
ioptions.memtable_insert_with_hint_prefix_extractor),
oldest_key_time_(std::numeric_limits<uint64_t>::max()),
atomic_flush_seqno_(kMaxSequenceNumber) {
atomic_flush_seqno_(kMaxSequenceNumber),
approximate_memory_usage_(0) {
UpdateFlushState();
// something went wrong if we need to flush before inserting anything
assert(!ShouldScheduleFlush());
@ -139,11 +140,12 @@ size_t MemTable::ApproximateMemoryUsage() {
}
total_usage += usage;
}
approximate_memory_usage_.store(total_usage, std::memory_order_relaxed);
// otherwise, return the actual usage
return total_usage;
}
bool MemTable::ShouldFlushNow() const {
bool MemTable::ShouldFlushNow() {
size_t write_buffer_size = write_buffer_size_.load(std::memory_order_relaxed);
// Often, we cannot allocate arena blocks that exactly match the buffer
// size. Thus we have to decide if we should over-allocate or
@ -159,6 +161,8 @@ bool MemTable::ShouldFlushNow() const {
range_del_table_->ApproximateMemoryUsage() +
arena_.MemoryAllocatedBytes();
approximate_memory_usage_.store(allocated_memory, std::memory_order_relaxed);
// if we can still allocate one more block without exceeding the
// over-allocation ratio, then we should not flush.
if (allocated_memory + kArenaBlockSize <

@ -130,6 +130,12 @@ class MemTable {
// operations on the same MemTable (unless this Memtable is immutable).
size_t ApproximateMemoryUsage();
// As a cheap version of `ApproximateMemoryUsage()`, this function doesn't
// require external synchronization. The value may be less accurate, though.
size_t ApproximateMemoryUsageFast() {
return approximate_memory_usage_.load(std::memory_order_relaxed);
}
// This method heuristically determines if the memtable should continue to
// host more data.
bool ShouldScheduleFlush() const {
@ -486,8 +492,12 @@ class MemTable {
// writes with sequence number smaller than seq are flushed.
SequenceNumber atomic_flush_seqno_;
// keep track of memory usage in table_, arena_, and range_del_table_.
// Gets refreshed inside `ApproximateMemoryUsage()` or `ShouldFlushNow()`
std::atomic<uint64_t> approximate_memory_usage_;
// Returns a heuristic flush decision
bool ShouldFlushNow() const;
bool ShouldFlushNow();
// Updates flush_state_ using ShouldFlushNow()
void UpdateFlushState();

@ -46,6 +46,8 @@ MemTableListVersion::MemTableListVersion(
size_t* parent_memtable_list_memory_usage, MemTableListVersion* old)
: max_write_buffer_number_to_maintain_(
old->max_write_buffer_number_to_maintain_),
max_write_buffer_size_to_maintain_(
old->max_write_buffer_size_to_maintain_),
parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {
if (old != nullptr) {
memlist_ = old->memlist_;
@ -62,8 +64,10 @@ MemTableListVersion::MemTableListVersion(
MemTableListVersion::MemTableListVersion(
size_t* parent_memtable_list_memory_usage,
int max_write_buffer_number_to_maintain)
int max_write_buffer_number_to_maintain,
int64_t max_write_buffer_size_to_maintain)
: max_write_buffer_number_to_maintain_(max_write_buffer_number_to_maintain),
max_write_buffer_size_to_maintain_(max_write_buffer_size_to_maintain),
parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {}
void MemTableListVersion::Ref() { ++refs_; }
@ -240,7 +244,7 @@ void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) {
assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable
AddMemTable(m);
TrimHistory(to_delete);
TrimHistory(to_delete, m->ApproximateMemoryUsage());
}
// Removes m from list of memtables not flushed. Caller should NOT Unref m.
@ -250,19 +254,51 @@ void MemTableListVersion::Remove(MemTable* m,
memlist_.remove(m);
m->MarkFlushed();
if (max_write_buffer_number_to_maintain_ > 0) {
if (max_write_buffer_size_to_maintain_ > 0 ||
max_write_buffer_number_to_maintain_ > 0) {
memlist_history_.push_front(m);
TrimHistory(to_delete);
// The size of the mutable memtable is unavailable at this point; pass 0
// to TrimHistory as a best effort.
TrimHistory(to_delete, 0);
} else {
UnrefMemTable(to_delete, m);
}
}
// return the total memory usage assuming the oldest flushed memtable is dropped
size_t MemTableListVersion::ApproximateMemoryUsageExcludingLast() {
size_t total_memtable_size = 0;
for (auto& memtable : memlist_) {
total_memtable_size += memtable->ApproximateMemoryUsage();
}
for (auto& memtable : memlist_history_) {
total_memtable_size += memtable->ApproximateMemoryUsage();
}
if (!memlist_history_.empty()) {
total_memtable_size -= memlist_history_.back()->ApproximateMemoryUsage();
}
return total_memtable_size;
}
bool MemTableListVersion::MemtableLimitExceeded(size_t usage) {
if (max_write_buffer_size_to_maintain_ > 0) {
// calculate the total memory usage after dropping the oldest flushed
// memtable, compare with max_write_buffer_size_to_maintain_ to decide
// whether to trim history
return ApproximateMemoryUsageExcludingLast() + usage >=
static_cast<size_t>(max_write_buffer_size_to_maintain_);
} else if (max_write_buffer_number_to_maintain_ > 0) {
return memlist_.size() + memlist_history_.size() >
static_cast<size_t>(max_write_buffer_number_to_maintain_);
} else {
return false;
}
}
// Make sure we don't use up too much space in history
void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete) {
while (memlist_.size() + memlist_history_.size() >
static_cast<size_t>(max_write_buffer_number_to_maintain_) &&
!memlist_history_.empty()) {
void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete,
size_t usage) {
while (MemtableLimitExceeded(usage) && !memlist_history_.empty()) {
MemTable* x = memlist_history_.back();
memlist_history_.pop_back();
@ -444,6 +480,8 @@ Status MemTableList::TryInstallMemtableFlushResults(
cfd->GetName().c_str(), m->file_number_, mem_id);
assert(m->file_number_ > 0);
current_->Remove(m, to_delete);
UpdateMemoryUsageExcludingLast();
ResetTrimHistoryNeeded();
++mem_id;
}
} else {
@ -483,6 +521,15 @@ void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
if (num_flush_not_started_ == 1) {
imm_flush_needed.store(true, std::memory_order_release);
}
UpdateMemoryUsageExcludingLast();
ResetTrimHistoryNeeded();
}
void MemTableList::TrimHistory(autovector<MemTable*>* to_delete, size_t usage) {
InstallNewVersion();
current_->TrimHistory(to_delete, usage);
UpdateMemoryUsageExcludingLast();
ResetTrimHistoryNeeded();
}
// Returns an estimate of the number of bytes of data in use.
@ -496,6 +543,20 @@ size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() {
size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; }
size_t MemTableList::ApproximateMemoryUsageExcludingLast() {
size_t usage =
current_memory_usage_excluding_last_.load(std::memory_order_relaxed);
return usage;
}
// Update current_memory_usage_excluding_last_. This needs to be called
// whenever the state of MemTableListVersion changes, i.e. whenever
// InstallNewVersion() is called.
void MemTableList::UpdateMemoryUsageExcludingLast() {
size_t total_memtable_size = current_->ApproximateMemoryUsageExcludingLast();
current_memory_usage_excluding_last_.store(total_memtable_size,
std::memory_order_relaxed);
}
uint64_t MemTableList::ApproximateOldestKeyTime() const {
if (!current_->memlist_.empty()) {
return current_->memlist_.back()->ApproximateOldestKeyTime();
@ -623,6 +684,8 @@ Status InstallMemtableAtomicFlushResults(
cfds[i]->GetName().c_str(), m->GetFileNumber(),
mem_id);
imm->current_->Remove(m, to_delete);
imm->UpdateMemoryUsageExcludingLast();
imm->ResetTrimHistoryNeeded();
}
}
} else {
@ -664,6 +727,8 @@ void MemTableList::RemoveOldMemTables(uint64_t log_number,
imm_flush_needed.store(false, std::memory_order_release);
}
}
UpdateMemoryUsageExcludingLast();
ResetTrimHistoryNeeded();
}
} // namespace rocksdb

@ -44,7 +44,8 @@ class MemTableListVersion {
explicit MemTableListVersion(size_t* parent_memtable_list_memory_usage,
MemTableListVersion* old = nullptr);
explicit MemTableListVersion(size_t* parent_memtable_list_memory_usage,
int max_write_buffer_number_to_maintain);
int max_write_buffer_number_to_maintain,
int64_t max_write_buffer_size_to_maintain);
void Ref();
void Unref(autovector<MemTable*>* to_delete = nullptr);
@ -139,7 +140,7 @@ class MemTableListVersion {
// REQUIRE: m is an immutable memtable
void Remove(MemTable* m, autovector<MemTable*>* to_delete);
void TrimHistory(autovector<MemTable*>* to_delete);
void TrimHistory(autovector<MemTable*>* to_delete, size_t usage);
bool GetFromList(std::list<MemTable*>* list, const LookupKey& key,
std::string* value, Status* s, MergeContext* merge_context,
@ -152,6 +153,14 @@ class MemTableListVersion {
void UnrefMemTable(autovector<MemTable*>* to_delete, MemTable* m);
// Calculate the total amount of memory used by memlist_ and memlist_history_,
// excluding the last MemTable in memlist_history_. The last MemTable is
// excluded so we can check whether dropping it would still leave total
// memory usage at or above max_write_buffer_size_to_maintain_
size_t ApproximateMemoryUsageExcludingLast();
bool MemtableLimitExceeded(size_t usage);
// Immutable MemTables that have not yet been flushed.
std::list<MemTable*> memlist_;
@ -160,8 +169,10 @@ class MemTableListVersion {
std::list<MemTable*> memlist_history_;
// Maximum number of MemTables to keep in memory (including both flushed
// and not-yet-flushed tables).
const int max_write_buffer_number_to_maintain_;
// Maximum size of MemTables to keep in memory (including both flushed
// and not-yet-flushed tables).
const int64_t max_write_buffer_size_to_maintain_;
int refs_ = 0;
@ -176,35 +187,41 @@ class MemTableListVersion {
// recoverability from a crash.
//
//
// Other than imm_flush_needed, this class is not thread-safe and requires
// external synchronization (such as holding the db mutex or being on the
// write thread.)
// Other than imm_flush_needed and imm_trim_needed, this class is not
// thread-safe and requires external synchronization (such as holding the db
// mutex or being on the write thread.)
class MemTableList {
public:
// A list of memtables.
explicit MemTableList(int min_write_buffer_number_to_merge,
int max_write_buffer_number_to_maintain)
int max_write_buffer_number_to_maintain,
int64_t max_write_buffer_size_to_maintain)
: imm_flush_needed(false),
imm_trim_needed(false),
min_write_buffer_number_to_merge_(min_write_buffer_number_to_merge),
current_(new MemTableListVersion(&current_memory_usage_,
max_write_buffer_number_to_maintain)),
max_write_buffer_number_to_maintain,
max_write_buffer_size_to_maintain)),
num_flush_not_started_(0),
commit_in_progress_(false),
flush_requested_(false) {
flush_requested_(false),
current_memory_usage_(0),
current_memory_usage_excluding_last_(0) {
current_->Ref();
current_memory_usage_ = 0;
}
// Should not delete MemTableList without making sure MemTableList::current()
// is Unref()'d.
~MemTableList() {}
MemTableListVersion* current() { return current_; }
MemTableListVersion* current() const { return current_; }
// so that background threads can detect non-nullptr pointer to
// determine whether there is anything more to start flushing.
std::atomic<bool> imm_flush_needed;
std::atomic<bool> imm_trim_needed;
// Returns the total number of memtables in the list that haven't yet
// been flushed and logged.
int NumNotFlushed() const;
@ -243,6 +260,18 @@ class MemTableList {
// Returns an estimate of the number of bytes of data in use.
size_t ApproximateMemoryUsage();
// Returns the cached current_memory_usage_excluding_last_ value
size_t ApproximateMemoryUsageExcludingLast();
// Update current_memory_usage_excluding_last_ from MemtableListVersion
void UpdateMemoryUsageExcludingLast();
// `usage` is the current size of the mutable Memtable. When
// max_write_buffer_size_to_maintain is used, the total size of the mutable
// and immutable memtables is checked against it to decide whether to trim
// the memtable list.
void TrimHistory(autovector<MemTable*>* to_delete, size_t usage);
// Returns an estimate of the number of bytes of data used by
// the unflushed mem-tables.
size_t ApproximateUnflushedMemTablesMemoryUsage();
@ -259,6 +288,20 @@ class MemTableList {
bool HasFlushRequested() { return flush_requested_; }
// Returns true if history trimming should be scheduled and the caller
// should be the one to schedule it
bool MarkTrimHistoryNeeded() {
auto expected = false;
return imm_trim_needed.compare_exchange_strong(
expected, true, std::memory_order_relaxed, std::memory_order_relaxed);
}
void ResetTrimHistoryNeeded() {
auto expected = true;
imm_trim_needed.compare_exchange_strong(
expected, false, std::memory_order_relaxed, std::memory_order_relaxed);
}
// Copying allowed
// MemTableList(const MemTableList&);
// void operator=(const MemTableList&);
@ -338,6 +381,8 @@ class MemTableList {
// The current memory usage.
size_t current_memory_usage_;
std::atomic<size_t> current_memory_usage_excluding_last_;
};
// Installs memtable atomic flush results.

@ -183,7 +183,7 @@ class MemTableListTest : public testing::Test {
TEST_F(MemTableListTest, Empty) {
// Create an empty MemTableList and validate basic functions.
MemTableList list(1, 0);
MemTableList list(1, 0, 0);
ASSERT_EQ(0, list.NumNotFlushed());
ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
@ -202,8 +202,10 @@ TEST_F(MemTableListTest, GetTest) {
// Create MemTableList
int min_write_buffer_number_to_merge = 2;
int max_write_buffer_number_to_maintain = 0;
int64_t max_write_buffer_size_to_maintain = 0;
MemTableList list(min_write_buffer_number_to_merge,
max_write_buffer_number_to_maintain);
max_write_buffer_number_to_maintain,
max_write_buffer_size_to_maintain);
SequenceNumber seq = 1;
std::string value;
@ -312,8 +314,10 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
// Create MemTableList
int min_write_buffer_number_to_merge = 2;
int max_write_buffer_number_to_maintain = 2;
int64_t max_write_buffer_size_to_maintain = 2000;
MemTableList list(min_write_buffer_number_to_merge,
max_write_buffer_number_to_maintain);
max_write_buffer_number_to_maintain,
max_write_buffer_size_to_maintain);
SequenceNumber seq = 1;
std::string value;
@ -514,8 +518,11 @@ TEST_F(MemTableListTest, FlushPendingTest) {
// Create MemTableList
int min_write_buffer_number_to_merge = 3;
int max_write_buffer_number_to_maintain = 7;
int64_t max_write_buffer_size_to_maintain =
7 * static_cast<int>(options.write_buffer_size);
MemTableList list(min_write_buffer_number_to_merge,
max_write_buffer_number_to_maintain);
max_write_buffer_number_to_maintain,
max_write_buffer_size_to_maintain);
// Create some MemTables
uint64_t memtable_id = 0;
@ -670,7 +677,9 @@ TEST_F(MemTableListTest, FlushPendingTest) {
// created. So TryInstallMemtableFlushResults will install the first 3 tables
// in to_flush and stop when it encounters a table not yet flushed.
ASSERT_EQ(2, list.NumNotFlushed());
int num_in_history = std::min(3, max_write_buffer_number_to_maintain);
int num_in_history =
std::min(3, static_cast<int>(max_write_buffer_size_to_maintain) /
static_cast<int>(options.write_buffer_size));
ASSERT_EQ(num_in_history, list.NumFlushed());
ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());
@ -687,7 +696,9 @@ TEST_F(MemTableListTest, FlushPendingTest) {
// This will actually install 2 tables. The 1 we told it to flush, and also
// tables[4] which has been waiting for tables[3] to commit.
ASSERT_EQ(0, list.NumNotFlushed());
num_in_history = std::min(5, max_write_buffer_number_to_maintain);
num_in_history =
std::min(5, static_cast<int>(max_write_buffer_size_to_maintain) /
static_cast<int>(options.write_buffer_size));
ASSERT_EQ(num_in_history, list.NumFlushed());
ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());
@ -730,7 +741,8 @@ TEST_F(MemTableListTest, FlushPendingTest) {
list.current()->Unref(&to_delete);
int to_delete_size =
std::min(num_tables, max_write_buffer_number_to_maintain);
std::min(num_tables, static_cast<int>(max_write_buffer_size_to_maintain) /
static_cast<int>(options.write_buffer_size));
ASSERT_EQ(to_delete_size, to_delete.size());
for (const auto& m : to_delete) {
@ -769,10 +781,13 @@ TEST_F(MemTableListTest, AtomicFlusTest) {
// Create MemTableLists
int min_write_buffer_number_to_merge = 3;
int max_write_buffer_number_to_maintain = 7;
int64_t max_write_buffer_size_to_maintain =
7 * static_cast<int64_t>(options.write_buffer_size);
autovector<MemTableList*> lists;
for (int i = 0; i != num_cfs; ++i) {
lists.emplace_back(new MemTableList(min_write_buffer_number_to_merge,
max_write_buffer_number_to_maintain));
max_write_buffer_number_to_maintain,
max_write_buffer_size_to_maintain));
}
autovector<uint32_t> cf_ids;

@ -383,7 +383,8 @@ class Repairer {
continue;
}
WriteBatchInternal::SetContents(&batch, record);
status = WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr);
status =
WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr, nullptr);
if (status.ok()) {
counter += WriteBatchInternal::Count(&batch);
} else {

@ -0,0 +1,59 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/trim_history_scheduler.h"
#include <cassert>
#include "db/column_family.h"
namespace rocksdb {
void TrimHistoryScheduler::ScheduleWork(ColumnFamilyData* cfd) {
std::lock_guard<std::mutex> lock(checking_mutex_);
cfd->Ref();
cfds_.push_back(cfd);
is_empty_.store(false, std::memory_order_relaxed);
}
ColumnFamilyData* TrimHistoryScheduler::TakeNextColumnFamily() {
std::lock_guard<std::mutex> lock(checking_mutex_);
while (true) {
if (cfds_.empty()) {
return nullptr;
}
ColumnFamilyData* cfd = cfds_.back();
cfds_.pop_back();
if (cfds_.empty()) {
is_empty_.store(true, std::memory_order_relaxed);
}
if (!cfd->IsDropped()) {
// success
return cfd;
}
if (cfd->Unref()) {
// no longer relevant, retry
delete cfd;
}
}
}
bool TrimHistoryScheduler::Empty() {
bool is_empty = is_empty_.load(std::memory_order_relaxed);
return is_empty;
}
void TrimHistoryScheduler::Clear() {
ColumnFamilyData* cfd;
while ((cfd = TakeNextColumnFamily()) != nullptr) {
if (cfd->Unref()) {
delete cfd;
}
}
assert(Empty());
}
} // namespace rocksdb

@ -0,0 +1,44 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <stdint.h>
#include <atomic>
#include <mutex>
#include "util/autovector.h"
namespace rocksdb {
class ColumnFamilyData;
// Similar to FlushScheduler, TrimHistoryScheduler is a FIFO queue that keeps
// track of column families whose flushed immutable memtables may need to be
// removed (aka trimmed). The actual trimming may be slightly delayed. Due to
// the use of the mutex and atomic variable, ScheduleWork,
// TakeNextColumnFamily, and Empty can be called concurrently.
class TrimHistoryScheduler {
public:
TrimHistoryScheduler() : is_empty_(true) {}
// When a column family needs history trimming, add cfd to the FIFO queue
void ScheduleWork(ColumnFamilyData* cfd);
// Removes the column family from the queue; the caller is responsible for
// calling `MemTableList::TrimHistory`
ColumnFamilyData* TakeNextColumnFamily();
bool Empty();
void Clear();
// Not on the critical path; use a mutex to ensure thread safety
private:
std::atomic<bool> is_empty_;
autovector<ColumnFamilyData*> cfds_;
std::mutex checking_mutex_;
};
} // namespace rocksdb

@ -48,6 +48,7 @@
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/snapshot_impl.h"
#include "db/trim_history_scheduler.h"
#include "db/write_batch_internal.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
@ -1189,6 +1190,7 @@ class MemTableInserter : public WriteBatch::Handler {
SequenceNumber sequence_;
ColumnFamilyMemTables* const cf_mems_;
FlushScheduler* const flush_scheduler_;
TrimHistoryScheduler* const trim_history_scheduler_;
const bool ignore_missing_column_families_;
const uint64_t recovering_log_number_;
// log number that all Memtables inserted into should reference
@ -1250,6 +1252,7 @@ class MemTableInserter : public WriteBatch::Handler {
// cf_mems should not be shared with concurrent inserters
MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems,
FlushScheduler* flush_scheduler,
TrimHistoryScheduler* trim_history_scheduler,
bool ignore_missing_column_families,
uint64_t recovering_log_number, DB* db,
bool concurrent_memtable_writes,
@ -1258,6 +1261,7 @@ class MemTableInserter : public WriteBatch::Handler {
: sequence_(_sequence),
cf_mems_(cf_mems),
flush_scheduler_(flush_scheduler),
trim_history_scheduler_(trim_history_scheduler),
ignore_missing_column_families_(ignore_missing_column_families),
recovering_log_number_(recovering_log_number),
log_number_ref_(0),
@ -1748,7 +1752,20 @@ class MemTableInserter : public WriteBatch::Handler {
cfd->mem()->MarkFlushScheduled()) {
// MarkFlushScheduled only returns true if we are the one that
// should take action, so no need to dedup further
flush_scheduler_->ScheduleFlush(cfd);
flush_scheduler_->ScheduleWork(cfd);
}
}
// check if memtable_list size exceeds max_write_buffer_size_to_maintain
if (trim_history_scheduler_ != nullptr) {
auto* cfd = cf_mems_->current();
assert(cfd != nullptr);
if (cfd->ioptions()->max_write_buffer_size_to_maintain > 0 &&
cfd->mem()->ApproximateMemoryUsageFast() +
cfd->imm()->ApproximateMemoryUsageExcludingLast() >=
static_cast<size_t>(
cfd->ioptions()->max_write_buffer_size_to_maintain) &&
cfd->imm()->MarkTrimHistoryNeeded()) {
trim_history_scheduler_->ScheduleWork(cfd);
}
}
}
@ -1908,12 +1925,14 @@ class MemTableInserter : public WriteBatch::Handler {
Status WriteBatchInternal::InsertInto(
WriteThread::WriteGroup& write_group, SequenceNumber sequence,
ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
TrimHistoryScheduler* trim_history_scheduler,
bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db,
bool concurrent_memtable_writes, bool seq_per_batch, bool batch_per_txn) {
MemTableInserter inserter(
sequence, memtables, flush_scheduler, ignore_missing_column_families,
recovery_log_number, db, concurrent_memtable_writes,
nullptr /*has_valid_writes*/, seq_per_batch, batch_per_txn);
sequence, memtables, flush_scheduler, trim_history_scheduler,
ignore_missing_column_families, recovery_log_number, db,
concurrent_memtable_writes, nullptr /*has_valid_writes*/, seq_per_batch,
batch_per_txn);
for (auto w : write_group) {
if (w->CallbackFailed()) {
continue;
@ -1939,6 +1958,7 @@ Status WriteBatchInternal::InsertInto(
Status WriteBatchInternal::InsertInto(
WriteThread::Writer* writer, SequenceNumber sequence,
ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
TrimHistoryScheduler* trim_history_scheduler,
bool ignore_missing_column_families, uint64_t log_number, DB* db,
bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt,
bool batch_per_txn) {
@ -1947,9 +1967,10 @@ Status WriteBatchInternal::InsertInto(
#endif
assert(writer->ShouldWriteToMemtable());
MemTableInserter inserter(
sequence, memtables, flush_scheduler, ignore_missing_column_families,
log_number, db, concurrent_memtable_writes, nullptr /*has_valid_writes*/,
seq_per_batch, batch_per_txn);
sequence, memtables, flush_scheduler, trim_history_scheduler,
ignore_missing_column_families, log_number, db,
concurrent_memtable_writes, nullptr /*has_valid_writes*/, seq_per_batch,
batch_per_txn);
SetSequence(writer->batch, sequence);
inserter.set_log_number_ref(writer->log_ref);
Status s = writer->batch->Iterate(&inserter);
@ -1963,11 +1984,13 @@ Status WriteBatchInternal::InsertInto(
Status WriteBatchInternal::InsertInto(
const WriteBatch* batch, ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler, bool ignore_missing_column_families,
uint64_t log_number, DB* db, bool concurrent_memtable_writes,
SequenceNumber* next_seq, bool* has_valid_writes, bool seq_per_batch,
bool batch_per_txn) {
FlushScheduler* flush_scheduler,
TrimHistoryScheduler* trim_history_scheduler,
bool ignore_missing_column_families, uint64_t log_number, DB* db,
bool concurrent_memtable_writes, SequenceNumber* next_seq,
bool* has_valid_writes, bool seq_per_batch, bool batch_per_txn) {
MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler,
trim_history_scheduler,
ignore_missing_column_families, log_number, db,
concurrent_memtable_writes, has_valid_writes,
seq_per_batch, batch_per_txn);

@ -9,11 +9,13 @@
#pragma once
#include <vector>
#include "db/flush_scheduler.h"
#include "db/trim_history_scheduler.h"
#include "db/write_thread.h"
#include "rocksdb/types.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/types.h"
#include "rocksdb/write_batch.h"
#include "util/autovector.h"
namespace rocksdb {
@ -162,6 +164,7 @@ class WriteBatchInternal {
static Status InsertInto(
WriteThread::WriteGroup& write_group, SequenceNumber sequence,
ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
TrimHistoryScheduler* trim_history_scheduler,
bool ignore_missing_column_families = false, uint64_t log_number = 0,
DB* db = nullptr, bool concurrent_memtable_writes = false,
bool seq_per_batch = false, bool batch_per_txn = true);
@ -171,6 +174,7 @@ class WriteBatchInternal {
static Status InsertInto(
const WriteBatch* batch, ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler,
TrimHistoryScheduler* trim_history_scheduler,
bool ignore_missing_column_families = false, uint64_t log_number = 0,
DB* db = nullptr, bool concurrent_memtable_writes = false,
SequenceNumber* next_seq = nullptr, bool* has_valid_writes = nullptr,
@ -179,6 +183,7 @@ class WriteBatchInternal {
static Status InsertInto(WriteThread::Writer* writer, SequenceNumber sequence,
ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler,
TrimHistoryScheduler* trim_history_scheduler,
bool ignore_missing_column_families = false,
uint64_t log_number = 0, DB* db = nullptr,
bool concurrent_memtable_writes = false,

@ -35,7 +35,8 @@ static std::string PrintContents(WriteBatch* b) {
mem->Ref();
std::string state;
ColumnFamilyMemTablesDefault cf_mems_default(mem);
Status s = WriteBatchInternal::InsertInto(b, &cf_mems_default, nullptr);
Status s =
WriteBatchInternal::InsertInto(b, &cf_mems_default, nullptr, nullptr);
int count = 0;
int put_count = 0;
int delete_count = 0;

@ -104,7 +104,7 @@
compression=kSnappyCompression
level0_file_num_compaction_trigger=4
purge_redundant_kvs_while_flush=true
max_write_buffer_number_to_maintain=0
max_write_buffer_size_to_maintain=0
memtable_factory=SkipListFactory
max_grandparent_overlap_factor=8
expanded_compaction_factor=25

@ -175,11 +175,26 @@ struct AdvancedColumnFamilyOptions {
// individual write buffers. Default: 1
int min_write_buffer_number_to_merge = 1;
// DEPRECATED
// The total maximum number of write buffers to maintain in memory including
// copies of buffers that have already been flushed. Unlike
// max_write_buffer_number, this parameter does not affect flushing.
// This controls the minimum amount of write history that will be available
// in memory for conflict checking when Transactions are used.
// This parameter is being replaced by max_write_buffer_size_to_maintain.
// If both parameters are set to non-zero values, this parameter will be
// ignored.
int max_write_buffer_number_to_maintain = 0;
// The total maximum size(bytes) of write buffers to maintain in memory
// including copies of buffers that have already been flushed. This parameter
// only affects trimming of flushed buffers and does not affect flushing.
// This controls the maximum amount of write history that will be available
// in memory for conflict checking when Transactions are used. The actual
// size of write history (flushed Memtables) might be higher than this limit,
// because trimming stops once dropping the next Memtable would bring the
// total below it. For example, if max_write_buffer_size_to_maintain is set
// to 64MB, and there are three flushed Memtables with sizes of 32MB, 20MB,
// and 20MB, the total is 72MB. Because trimming the next Memtable of size
// 20MB would reduce total memory usage to 52MB, which is below the limit,
// RocksDB will stop trimming.
//
// When using an OptimisticTransactionDB:
// If this value is too low, some transactions may fail at commit time due
@ -192,14 +207,14 @@ struct AdvancedColumnFamilyOptions {
// done for conflict detection.
//
// Setting this value to 0 will cause write buffers to be freed immediately
// after they are flushed.
// If this value is set to -1, 'max_write_buffer_number' will be used.
// after they are flushed. If this value is set to -1,
// 'max_write_buffer_number * write_buffer_size' will be used.
//
// Default:
// If using a TransactionDB/OptimisticTransactionDB, the default value will
// be set to the value of 'max_write_buffer_number' if it is not explicitly
// set by the user. Otherwise, the default is 0.
int max_write_buffer_number_to_maintain = 0;
// be set to the value of 'max_write_buffer_number * write_buffer_size'
// if it is not explicitly set by the user. Otherwise, the default is 0.
int64_t max_write_buffer_size_to_maintain = 0;
// Allows thread-safe inplace updates. If this is true, there is no way to
// achieve point-in-time consistency using snapshot or iterator (assuming

@ -854,6 +854,9 @@ rocksdb_options_set_min_write_buffer_number_to_merge(rocksdb_options_t*, int);
extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_max_write_buffer_number_to_maintain(rocksdb_options_t*,
int);
extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_max_write_buffer_size_to_maintain(rocksdb_options_t*,
int64_t);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_enable_pipelined_write(
rocksdb_options_t*, unsigned char);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_unordered_write(

@ -131,7 +131,7 @@ class Transaction {
// Status::Busy() may be returned if the transaction could not guarantee
// that there are no write conflicts. Status::TryAgain() may be returned
// if the memtable history size is not large enough
// (See max_write_buffer_number_to_maintain).
// (See max_write_buffer_size_to_maintain).
//
// If this transaction was created by a TransactionDB(), Status::Expired()
// may be returned if this transaction has lived for longer than
@ -243,7 +243,7 @@ class Transaction {
// Status::Busy() if there is a write conflict,
// Status::TimedOut() if a lock could not be acquired,
// Status::TryAgain() if the memtable history size is not large enough
// (See max_write_buffer_number_to_maintain)
// (See max_write_buffer_size_to_maintain)
// Status::MergeInProgress() if merge operations cannot be resolved.
// or other errors if this key could not be read.
virtual Status GetForUpdate(const ReadOptions& options,
@ -320,7 +320,7 @@ class Transaction {
// Status::Busy() if there is a write conflict,
// Status::TimedOut() if a lock could not be acquired,
// Status::TryAgain() if the memtable history size is not large enough
// (See max_write_buffer_number_to_maintain)
// (See max_write_buffer_size_to_maintain)
// or other errors on unexpected failures.
virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value, const bool assume_tracked = false) = 0;

@ -52,8 +52,8 @@ jbyteArray Java_org_rocksdb_WriteBatchTest_getContents(JNIEnv* env,
mem->Ref();
std::string state;
rocksdb::ColumnFamilyMemTablesDefault cf_mems_default(mem);
rocksdb::Status s =
rocksdb::WriteBatchInternal::InsertInto(b, &cf_mems_default, nullptr);
rocksdb::Status s = rocksdb::WriteBatchInternal::InsertInto(
b, &cf_mems_default, nullptr, nullptr);
int count = 0;
rocksdb::Arena arena;
rocksdb::ScopedArenaIterator iter(

@ -33,6 +33,8 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options,
cf_options.min_write_buffer_number_to_merge),
max_write_buffer_number_to_maintain(
cf_options.max_write_buffer_number_to_maintain),
max_write_buffer_size_to_maintain(
cf_options.max_write_buffer_size_to_maintain),
inplace_update_support(cf_options.inplace_update_support),
inplace_callback(cf_options.inplace_callback),
info_log(db_options.info_log.get()),

@ -43,6 +43,8 @@ struct ImmutableCFOptions {
int max_write_buffer_number_to_maintain;
int64_t max_write_buffer_size_to_maintain;
bool inplace_update_support;
UpdateStatus (*inplace_callback)(char* existing_value,

@ -42,6 +42,8 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options)
options.min_write_buffer_number_to_merge),
max_write_buffer_number_to_maintain(
options.max_write_buffer_number_to_maintain),
max_write_buffer_size_to_maintain(
options.max_write_buffer_size_to_maintain),
inplace_update_support(options.inplace_update_support),
inplace_update_num_locks(options.inplace_update_num_locks),
inplace_callback(options.inplace_callback),
@ -158,6 +160,9 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
min_write_buffer_number_to_merge);
ROCKS_LOG_HEADER(log, " Options.max_write_buffer_number_to_maintain: %d",
max_write_buffer_number_to_maintain);
ROCKS_LOG_HEADER(log,
" Options.max_write_buffer_size_to_maintain: %" PRIu64,
max_write_buffer_size_to_maintain);
ROCKS_LOG_HEADER(
log, " Options.bottommost_compression_opts.window_bits: %d",
bottommost_compression_opts.window_bits);

@ -1873,6 +1873,9 @@ std::unordered_map<std::string, OptionTypeInfo>
{"max_write_buffer_number_to_maintain",
{offset_of(&ColumnFamilyOptions::max_write_buffer_number_to_maintain),
OptionType::kInt, OptionVerificationType::kNormal, false, 0}},
{"max_write_buffer_size_to_maintain",
{offset_of(&ColumnFamilyOptions::max_write_buffer_size_to_maintain),
OptionType::kInt64T, OptionVerificationType::kNormal, false, 0}},
{"min_write_buffer_number_to_merge",
{offset_of(&ColumnFamilyOptions::min_write_buffer_number_to_merge),
OptionType::kInt, OptionVerificationType::kNormal, false, 0}},

@ -438,6 +438,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"soft_rate_limit=530.615385;"
"soft_pending_compaction_bytes_limit=0;"
"max_write_buffer_number_to_maintain=84;"
"max_write_buffer_size_to_maintain=2147483648;"
"merge_operator=aabcxehazrMergeOperator;"
"memtable_prefix_bloom_size_ratio=0.4642;"
"memtable_whole_key_filtering=true;"

@ -49,6 +49,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
{"max_write_buffer_number", "2"},
{"min_write_buffer_number_to_merge", "3"},
{"max_write_buffer_number_to_maintain", "99"},
{"max_write_buffer_size_to_maintain", "-99999"},
{"compression", "kSnappyCompression"},
{"compression_per_level",
"kNoCompression:"
@ -150,6 +151,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_cf_opt.max_write_buffer_number, 2);
ASSERT_EQ(new_cf_opt.min_write_buffer_number_to_merge, 3);
ASSERT_EQ(new_cf_opt.max_write_buffer_number_to_maintain, 99);
ASSERT_EQ(new_cf_opt.max_write_buffer_size_to_maintain, -99999);
ASSERT_EQ(new_cf_opt.compression, kSnappyCompression);
ASSERT_EQ(new_cf_opt.compression_per_level.size(), 9U);
ASSERT_EQ(new_cf_opt.compression_per_level[0], kNoCompression);

@ -53,6 +53,7 @@ LIB_SOURCES = \
db/table_cache.cc \
db/table_properties_collector.cc \
db/transaction_log_impl.cc \
db/trim_history_scheduler.cc \
db/version_builder.cc \
db/version_edit.cc \
db/version_set.cc \

@ -3300,7 +3300,8 @@ TEST_F(MemTableTest, Simple) {
batch.DeleteRange(std::string("begin"), std::string("end"));
ColumnFamilyMemTablesDefault cf_mems_default(memtable);
ASSERT_TRUE(
WriteBatchInternal::InsertInto(&batch, &cf_mems_default, nullptr).ok());
WriteBatchInternal::InsertInto(&batch, &cf_mems_default, nullptr, nullptr)
.ok());
for (int i = 0; i < 2; ++i) {
Arena arena;

@ -333,6 +333,7 @@ void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, DBOptions& db_options,
cf_opt->max_mem_compaction_level = rnd->Uniform(100);
cf_opt->max_write_buffer_number = rnd->Uniform(100);
cf_opt->max_write_buffer_number_to_maintain = rnd->Uniform(100);
cf_opt->max_write_buffer_size_to_maintain = rnd->Uniform(10000);
cf_opt->min_write_buffer_number_to_merge = rnd->Uniform(100);
cf_opt->num_levels = rnd->Uniform(100);
cf_opt->target_file_size_multiplier = rnd->Uniform(100);

@ -340,6 +340,20 @@ DEFINE_int32(max_write_buffer_number_to_maintain,
"after they are flushed. If this value is set to -1, "
"'max_write_buffer_number' will be used.");
DEFINE_int64(max_write_buffer_size_to_maintain,
rocksdb::Options().max_write_buffer_size_to_maintain,
"The total maximum size of write buffers to maintain in memory "
"including copies of buffers that have already been flushed. "
"Unlike max_write_buffer_number, this parameter does not affect "
"flushing. This controls the minimum amount of write history "
"that will be available in memory for conflict checking when "
"Transactions are used. If this value is too low, some "
"transactions may fail at commit time due to not being able to "
"determine whether there were any write conflicts. Setting this "
"value to 0 will cause write buffers to be freed immediately "
"after they are flushed. If this value is set to -1, "
"'max_write_buffer_number' will be used.");
DEFINE_int32(max_background_jobs,
rocksdb::Options().max_background_jobs,
"The maximum number of concurrent background jobs that can occur "
@ -3385,6 +3399,8 @@ class Benchmark {
FLAGS_min_write_buffer_number_to_merge;
options.max_write_buffer_number_to_maintain =
FLAGS_max_write_buffer_number_to_maintain;
options.max_write_buffer_size_to_maintain =
FLAGS_max_write_buffer_size_to_maintain;
options.max_background_jobs = FLAGS_max_background_jobs;
options.max_background_compactions = FLAGS_max_background_compactions;
options.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);

@ -245,6 +245,7 @@ const std::string options_file_content = R"OPTIONS_FILE(
expanded_compaction_factor=25
soft_rate_limit=0.000000
max_write_buffer_number_to_maintain=0
max_write_buffer_size_to_maintain=0
verify_checksums_in_compaction=true
merge_operator=nullptr
memtable_prefix_bloom_bits=0

@ -204,6 +204,20 @@ DEFINE_int32(max_write_buffer_number_to_maintain,
"after they are flushed. If this value is set to -1, "
"'max_write_buffer_number' will be used.");
DEFINE_int64(max_write_buffer_size_to_maintain,
rocksdb::Options().max_write_buffer_size_to_maintain,
"The total maximum size of write buffers to maintain in memory "
"including copies of buffers that have already been flushed. "
"Unlike max_write_buffer_number, this parameter does not affect "
"flushing. This controls the minimum amount of write history "
"that will be available in memory for conflict checking when "
"Transactions are used. If this value is too low, some "
"transactions may fail at commit time due to not being able to "
"determine whether there were any write conflicts. Setting this "
"value to 0 will cause write buffers to be freed immediately "
"after they are flushed. If this value is set to -1, "
"'max_write_buffer_number' will be used.");
DEFINE_double(memtable_prefix_bloom_size_ratio,
rocksdb::Options().memtable_prefix_bloom_size_ratio,
"creates prefix blooms for memtables, each with size "
@ -2762,6 +2776,8 @@ class StressTest {
FLAGS_min_write_buffer_number_to_merge;
options_.max_write_buffer_number_to_maintain =
FLAGS_max_write_buffer_number_to_maintain;
options_.max_write_buffer_size_to_maintain =
FLAGS_max_write_buffer_size_to_maintain;
options_.memtable_prefix_bloom_size_ratio =
FLAGS_memtable_prefix_bloom_size_ratio;
options_.memtable_whole_key_filtering =

@ -6,17 +6,16 @@
#include "util/string_util.h"
#include <errno.h>
#include <cinttypes>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <algorithm>
#include <cinttypes>
#include <cmath>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
#include "rocksdb/env.h"
#include "port/port.h"
#include "port/sys_time.h"
#include "rocksdb/slice.h"

@ -121,7 +121,6 @@ uint64_t ParseUint64(const std::string& value);
int ParseInt(const std::string& value);
int64_t ParseInt64(const std::string& value);
double ParseDouble(const std::string& value);

@ -63,9 +63,11 @@ Status OptimisticTransactionDB::Open(
for (auto& column_family : column_families_copy) {
ColumnFamilyOptions* options = &column_family.options;
if (options->max_write_buffer_number_to_maintain == 0) {
// Setting to -1 will set the History size to max_write_buffer_number.
options->max_write_buffer_number_to_maintain = -1;
if (options->max_write_buffer_size_to_maintain == 0 &&
options->max_write_buffer_number_to_maintain == 0) {
// Setting to -1 will set the History size to
// max_write_buffer_number * write_buffer_size.
options->max_write_buffer_size_to_maintain = -1;
}
}

@ -36,6 +36,7 @@ class OptimisticTransactionTest : public testing::Test {
OptimisticTransactionTest() {
options.create_if_missing = true;
options.max_write_buffer_number = 2;
options.max_write_buffer_size_to_maintain = 1600;
dbname = test::PerThreadDBPath("optimistic_transaction_testdb");
DestroyDB(dbname, options);

@ -271,9 +271,11 @@ void TransactionDB::PrepareWrap(
for (size_t i = 0; i < column_families->size(); i++) {
ColumnFamilyOptions* cf_options = &(*column_families)[i].options;
if (cf_options->max_write_buffer_number_to_maintain == 0) {
// Setting to -1 will set the History size to max_write_buffer_number.
cf_options->max_write_buffer_number_to_maintain = -1;
if (cf_options->max_write_buffer_size_to_maintain == 0 &&
cf_options->max_write_buffer_number_to_maintain == 0) {
// Setting to -1 will set the History size to
// max_write_buffer_number * write_buffer_size.
cf_options->max_write_buffer_size_to_maintain = -1;
}
if (!cf_options->disable_auto_compactions) {
// Disable compactions momentarily to prevent race with DB::Open

@ -227,8 +227,10 @@ TEST_P(TransactionTest, ValidateSnapshotTest) {
db_impl->TEST_FlushMemTable(true);
// Make sure the flushed memtable is not kept in memory
int max_memtable_in_history =
std::max(options.max_write_buffer_number,
options.max_write_buffer_number_to_maintain) +
std::max(
options.max_write_buffer_number,
static_cast<int>(options.max_write_buffer_size_to_maintain) /
static_cast<int>(options.write_buffer_size)) +
1;
for (int i = 0; i < max_memtable_in_history; i++) {
db->Put(write_options, Slice("key"), Slice("value"));

@ -94,7 +94,7 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
" as the MemTable only contains changes newer than "
"SequenceNumber %" PRIu64
". Increasing the value of the "
"max_write_buffer_number_to_maintain option could reduce the "
"max_write_buffer_size_to_maintain option could reduce the "
"frequency "
"of this error.",
snap_seq, earliest_seq);
