diff --git a/HISTORY.md b/HISTORY.md index b6047a600..e91350297 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -35,6 +35,7 @@ * Remove [min|max]_timestamp from VersionEdit for now since they are not tracked in MANIFEST anyway but consume two empty std::string (up to 64 bytes) for each file. Should they be added back in the future, we should store them more compactly. * Improve universal tiered storage compaction picker to avoid extra major compaction triggered by size amplification. If `preclude_last_level_data_seconds` is enabled, the size amplification is calculated within non last_level data only which skip the last level and use the penultimate level as the size base. * If an error is hit when writing to a file (append, sync, etc), RocksDB is more strict with not issuing more operations to it, except closing the file, with exceptions of some WAL file operations in error recovery path. +* A `WriteBufferManager` constructed with `allow_stall == false` will no longer trigger write stall implicitly by thrashing until memtable count limit is reached. Instead, a column family can continue accumulating writes while that CF is flushing, which means memory may increase. Users who prefer stalling writes must now explicitly set `allow_stall == true`. ### Performance Improvements * Instead of constructing `FragmentedRangeTombstoneList` during every read operation, it is now constructed once and stored in immutable memtables. This improves speed of querying range tombstones from immutable memtables. diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 0ab334bb6..34880ceb7 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1648,12 +1648,6 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) { // thread is writing to another DB with the same write buffer, they may also // be flushed. We may end up with flushing much more DBs than needed. It's // suboptimal but still correct. 
- ROCKS_LOG_INFO( - immutable_db_options_.info_log, - "Flushing column family with oldest memtable entry. Write buffers are " - "using %" ROCKSDB_PRIszt " bytes out of a total of %" ROCKSDB_PRIszt ".", - write_buffer_manager_->memory_usage(), - write_buffer_manager_->buffer_size()); // no need to refcount because drop is happening in write thread, so can't // happen while we're in the write thread autovector<ColumnFamilyData*> cfds; @@ -1667,9 +1661,11 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) { if (cfd->IsDropped()) { continue; } - if (!cfd->mem()->IsEmpty()) { - // We only consider active mem table, hoping immutable memtable is - // already in the process of flushing. + if (!cfd->mem()->IsEmpty() && !cfd->imm()->IsFlushPendingOrRunning()) { + // We only consider flush on CFs with bytes in the mutable memtable, + // and no immutable memtables for which flush has yet to finish. If + // we triggered flush on CFs already trying to flush, we would risk + // creating too many immutable memtables leading to write stalls. uint64_t seq = cfd->mem()->GetCreationSeq(); if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) { cfd_picked = cfd; @@ -1682,6 +1678,15 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) { } MaybeFlushStatsCF(&cfds); } + if (!cfds.empty()) { + ROCKS_LOG_INFO( + immutable_db_options_.info_log, + "Flushing triggered to alleviate write buffer memory usage. 
Write " + "buffer is using %" ROCKSDB_PRIszt + " bytes out of a total of %" ROCKSDB_PRIszt ".", + write_buffer_manager_->memory_usage(), + write_buffer_manager_->buffer_size()); + } WriteThread::Writer nonmem_w; if (two_write_queues_) { diff --git a/db/db_write_buffer_manager_test.cc b/db/db_write_buffer_manager_test.cc index c1e8f7100..4c31a7824 100644 --- a/db/db_write_buffer_manager_test.cc +++ b/db/db_write_buffer_manager_test.cc @@ -780,6 +780,75 @@ TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsMultipleDB) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } +#ifndef ROCKSDB_LITE + +// Tests a `WriteBufferManager` constructed with `allow_stall == false` does not +// thrash memtable switching when full and a CF receives multiple writes. +// Instead, we expect to switch a CF's memtable for flush only when that CF does +// not have any pending or running flush. +// +// This test uses multiple DBs each with a single CF instead of a single DB +// with multiple CFs. That way we can control which CF is considered for switch +// by writing to that CF's DB. +// +// Not supported in LITE mode due to `GetProperty()` unavailable. 
+TEST_P(DBWriteBufferManagerTest, StopSwitchingMemTablesOnceFlushing) { + Options options = CurrentOptions(); + options.arena_block_size = 4 << 10; // 4KB + options.write_buffer_size = 1 << 20; // 1MB + std::shared_ptr<Cache> cache = + NewLRUCache(4 << 20 /* capacity (4MB) */, 2 /* num_shard_bits */); + ASSERT_LT(cache->GetUsage(), 256 << 10 /* 256KB */); + cost_cache_ = GetParam(); + if (cost_cache_) { + options.write_buffer_manager.reset(new WriteBufferManager( + 512 << 10 /* buffer_size (512KB) */, cache, false /* allow_stall */)); + } else { + options.write_buffer_manager.reset( + new WriteBufferManager(512 << 10 /* buffer_size (512KB) */, + nullptr /* cache */, false /* allow_stall */)); + } + + Reopen(options); + std::string dbname = test::PerThreadDBPath("db_shared_wbm_db"); + DB* shared_wbm_db = nullptr; + + ASSERT_OK(DestroyDB(dbname, options)); + ASSERT_OK(DB::Open(options, dbname, &shared_wbm_db)); + + // The last write will make WBM need flush, but it won't flush yet. + ASSERT_OK(Put(Key(1), DummyString(256 << 10 /* 256KB */), WriteOptions())); + ASSERT_FALSE(options.write_buffer_manager->ShouldFlush()); + ASSERT_OK(Put(Key(1), DummyString(256 << 10 /* 256KB */), WriteOptions())); + ASSERT_TRUE(options.write_buffer_manager->ShouldFlush()); + + // Flushes will be pending, not running because flush threads are blocked. + test::SleepingBackgroundTask sleeping_task_high; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_task_high, Env::Priority::HIGH); + + for (int i = 0; i < 3; ++i) { + ASSERT_OK( + shared_wbm_db->Put(WriteOptions(), Key(1), DummyString(1 /* len */))); + std::string prop; + ASSERT_TRUE( + shared_wbm_db->GetProperty("rocksdb.num-immutable-mem-table", &prop)); + ASSERT_EQ(std::to_string(i > 0 ? 1 : 0), prop); + ASSERT_TRUE( + shared_wbm_db->GetProperty("rocksdb.mem-table-flush-pending", &prop)); + ASSERT_EQ(std::to_string(i > 0 ? 1 : 0), prop); + } + + // Clean up DBs. 
+ sleeping_task_high.WakeUp(); + sleeping_task_high.WaitUntilDone(); + ASSERT_OK(shared_wbm_db->Close()); + ASSERT_OK(DestroyDB(dbname, options)); + delete shared_wbm_db; +} + +#endif // ROCKSDB_LITE + INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest, DBWriteBufferManagerTest, testing::Bool()); diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 858404a0e..3b6a132e9 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -351,6 +351,14 @@ bool MemTableList::IsFlushPending() const { return false; } +bool MemTableList::IsFlushPendingOrRunning() const { + if (current_->memlist_.size() - num_flush_not_started_ > 0) { + // Flush is already running on at least one memtable + return true; + } + return IsFlushPending(); +} + // Returns the memtables that need to be flushed. void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id, autovector<MemTable*>* ret, diff --git a/db/memtable_list.h b/db/memtable_list.h index 0f75d3055..cc6ceb815 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -255,6 +255,10 @@ class MemTableList { // not yet started. bool IsFlushPending() const; + // Returns true if there is at least one memtable that is pending flush or + // flushing. + bool IsFlushPendingOrRunning() const; + // Returns the earliest memtables that needs to be flushed. The returned // memtables are guaranteed to be in the ascending order of created time. void PickMemtablesToFlush(uint64_t max_memtable_id, diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 630072043..b0d21f62c 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -905,7 +905,8 @@ struct DBOptions { // can be passed into multiple DBs and it will track the sum of size of all // the DBs. If the total size of all live memtables of all the DBs exceeds // a limit, a flush will be triggered in the next DB to which the next write - // is issued. + // is issued, as long as there is one or more column family not already + // flushing. 
// // If the object is only passed to one DB, the behavior is the same as // db_write_buffer_size. When write_buffer_manager is set, the value set will