Prevent a case of WriteBufferManager flush thrashing (#6364)

Summary:
Previously, the flushes triggered by `WriteBufferManager` could affect
the same CF repeatedly if it happens to get consecutive writes. Such
flushes are not particularly useful for reducing memory usage since
they switch nearly-empty memtables to immutable while they've just begun
filling their first arena block. In fact they may not even reduce the
mutable memory count if they involve replacing one mutable memtable containing
one arena block with a new mutable memtable containing one arena block.
Further, if such switches happen even a few times before a flush finishes,
the immutable memtable limit will be reached and writes will stall.

This PR adds a heuristic to not switch memtables to immutable for CFs
that already have one or more immutable memtables awaiting flush. There
is a memory usage regression if the user continues writing to the same
CF, that DB does not have any CFs eligible for switching, flushes
are not finishing, and the `WriteBufferManager` was constructed with
`allow_stall=false`. Before, it would grow by switching nearly empty
memtables until writes stall. Now, it would grow by filling memtables
until writes stall. This feels like an acceptable behavior change because
users who prefer to stall over violate the memory limit should be using
`allow_stall=true`, which is unaffected by this PR.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/6364

Test Plan:
- Command:

`rm -rf /dev/shm/dbbench/ && TEST_TMPDIR=/dev/shm ./db_bench -benchmarks=fillrandom -num_multi_db=8 -num_column_families=2 -write_buffer_size=4194304 -db_write_buffer_size=16777216 -compression_type=none -statistics=true -target_file_size_base=4194304 -max_bytes_for_level_base=16777216`

- `rocksdb.db.write.stall` count before this PR: 175
- `rocksdb.db.write.stall` count after this PR: 0

Reviewed By: jay-zhuang

Differential Revision: D20167197

Pulled By: ajkr

fbshipit-source-id: 4a64064e9bc33d57c0a35f15547542d0191d0cb7
main
Andrew Kryczka 2 years ago committed by Facebook GitHub Bot
parent 65814a4ae6
commit 91166012c8
  1. 1
      HISTORY.md
  2. 23
      db/db_impl/db_impl_write.cc
  3. 69
      db/db_write_buffer_manager_test.cc
  4. 8
      db/memtable_list.cc
  5. 4
      db/memtable_list.h
  6. 3
      include/rocksdb/options.h

@ -35,6 +35,7 @@
* Remove [min|max]_timestamp from VersionEdit for now since they are not tracked in MANIFEST anyway but consume two empty std::string (up to 64 bytes) for each file. Should they be added back in the future, we should store them more compactly.
* Improve universal tiered storage compaction picker to avoid extra major compaction triggered by size amplification. If `preclude_last_level_data_seconds` is enabled, the size amplification is calculated within non last_level data only which skip the last level and use the penultimate level as the size base.
* If an error is hit when writing to a file (append, sync, etc), RocksDB is more strict with not issuing more operations to it, except closing the file, with exceptions of some WAL file operations in error recovery path.
* A `WriteBufferManager` constructed with `allow_stall == false` will no longer trigger write stall implicitly by thrashing until memtable count limit is reached. Instead, a column family can continue accumulating writes while that CF is flushing, which means memory may increase. Users who prefer stalling writes must now explicitly set `allow_stall == true`.
### Performance Improvements
* Instead of constructing `FragmentedRangeTombstoneList` during every read operation, it is now constructed once and stored in immutable memtables. This improves speed of querying range tombstones from immutable memtables.

@ -1648,12 +1648,6 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) {
// thread is writing to another DB with the same write buffer, they may also
// be flushed. We may end up with flushing much more DBs than needed. It's
// suboptimal but still correct.
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"Flushing column family with oldest memtable entry. Write buffers are "
"using %" ROCKSDB_PRIszt " bytes out of a total of %" ROCKSDB_PRIszt ".",
write_buffer_manager_->memory_usage(),
write_buffer_manager_->buffer_size());
// no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread
autovector<ColumnFamilyData*> cfds;
@ -1667,9 +1661,11 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) {
if (cfd->IsDropped()) {
continue;
}
if (!cfd->mem()->IsEmpty()) {
// We only consider active mem table, hoping immutable memtable is
// already in the process of flushing.
if (!cfd->mem()->IsEmpty() && !cfd->imm()->IsFlushPendingOrRunning()) {
// We only consider flush on CFs with bytes in the mutable memtable,
// and no immutable memtables for which flush has yet to finish. If
// we triggered flush on CFs already trying to flush, we would risk
// creating too many immutable memtables leading to write stalls.
uint64_t seq = cfd->mem()->GetCreationSeq();
if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
cfd_picked = cfd;
@ -1682,6 +1678,15 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) {
}
MaybeFlushStatsCF(&cfds);
}
if (!cfds.empty()) {
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"Flushing triggered to alleviate write buffer memory usage. Write "
"buffer is using %" ROCKSDB_PRIszt
" bytes out of a total of %" ROCKSDB_PRIszt ".",
write_buffer_manager_->memory_usage(),
write_buffer_manager_->buffer_size());
}
WriteThread::Writer nonmem_w;
if (two_write_queues_) {

@ -780,6 +780,75 @@ TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsMultipleDB) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
#ifndef ROCKSDB_LITE
// Tests a `WriteBufferManager` constructed with `allow_stall == false` does not
// thrash memtable switching when full and a CF receives multiple writes.
// Instead, we expect to switch a CF's memtable for flush only when that CF does
// not have any pending or running flush.
//
// This test uses multiple DBs each with a single CF instead of a single DB
// with multiple CFs. That way we can control which CF is considered for switch
// by writing to that CF's DB.
//
// Not supported in LITE mode due to `GetProperty()` unavailable.
TEST_P(DBWriteBufferManagerTest, StopSwitchingMemTablesOnceFlushing) {
Options options = CurrentOptions();
options.arena_block_size = 4 << 10; // 4KB
options.write_buffer_size = 1 << 20; // 1MB
std::shared_ptr<Cache> cache =
NewLRUCache(4 << 20 /* capacity (4MB) */, 2 /* num_shard_bits */);
ASSERT_LT(cache->GetUsage(), 256 << 10 /* 256KB */);
cost_cache_ = GetParam();
if (cost_cache_) {
options.write_buffer_manager.reset(new WriteBufferManager(
512 << 10 /* buffer_size (512KB) */, cache, false /* allow_stall */));
} else {
options.write_buffer_manager.reset(
new WriteBufferManager(512 << 10 /* buffer_size (512KB) */,
nullptr /* cache */, false /* allow_stall */));
}
Reopen(options);
std::string dbname = test::PerThreadDBPath("db_shared_wbm_db");
DB* shared_wbm_db = nullptr;
ASSERT_OK(DestroyDB(dbname, options));
ASSERT_OK(DB::Open(options, dbname, &shared_wbm_db));
// The last write will make WBM need flush, but it won't flush yet.
ASSERT_OK(Put(Key(1), DummyString(256 << 10 /* 256KB */), WriteOptions()));
ASSERT_FALSE(options.write_buffer_manager->ShouldFlush());
ASSERT_OK(Put(Key(1), DummyString(256 << 10 /* 256KB */), WriteOptions()));
ASSERT_TRUE(options.write_buffer_manager->ShouldFlush());
// Flushes will be pending, not running because flush threads are blocked.
test::SleepingBackgroundTask sleeping_task_high;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
&sleeping_task_high, Env::Priority::HIGH);
for (int i = 0; i < 3; ++i) {
ASSERT_OK(
shared_wbm_db->Put(WriteOptions(), Key(1), DummyString(1 /* len */)));
std::string prop;
ASSERT_TRUE(
shared_wbm_db->GetProperty("rocksdb.num-immutable-mem-table", &prop));
ASSERT_EQ(std::to_string(i > 0 ? 1 : 0), prop);
ASSERT_TRUE(
shared_wbm_db->GetProperty("rocksdb.mem-table-flush-pending", &prop));
ASSERT_EQ(std::to_string(i > 0 ? 1 : 0), prop);
}
// Clean up DBs.
sleeping_task_high.WakeUp();
sleeping_task_high.WaitUntilDone();
ASSERT_OK(shared_wbm_db->Close());
ASSERT_OK(DestroyDB(dbname, options));
delete shared_wbm_db;
}
#endif // ROCKSDB_LITE
INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest, DBWriteBufferManagerTest,
testing::Bool());

@ -351,6 +351,14 @@ bool MemTableList::IsFlushPending() const {
return false;
}
bool MemTableList::IsFlushPendingOrRunning() const {
if (current_->memlist_.size() - num_flush_not_started_ > 0) {
// Flush is already running on at least one memtable
return true;
}
return IsFlushPending();
}
// Returns the memtables that need to be flushed.
void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
autovector<MemTable*>* ret,

@ -255,6 +255,10 @@ class MemTableList {
// not yet started.
bool IsFlushPending() const;
// Returns true if there is at least one memtable that is pending flush or
// flushing.
bool IsFlushPendingOrRunning() const;
// Returns the earliest memtables that needs to be flushed. The returned
// memtables are guaranteed to be in the ascending order of created time.
void PickMemtablesToFlush(uint64_t max_memtable_id,

@ -905,7 +905,8 @@ struct DBOptions {
// can be passed into multiple DBs and it will track the sum of size of all
// the DBs. If the total size of all live memtables of all the DBs exceeds
// a limit, a flush will be triggered in the next DB to which the next write
// is issued.
// is issued, as long as there is one or more column family not already
// flushing.
//
// If the object is only passed to one DB, the behavior is the same as
// db_write_buffer_size. When write_buffer_manager is set, the value set will

Loading…
Cancel
Save