diff --git a/HISTORY.md b/HISTORY.md index 92687ee6e..ffa718c9d 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -9,6 +9,7 @@ * Add avoid_flush_during_recovery option. * Add a read option background_purge_on_iterator_cleanup to avoid deleting files in foreground when destroying iterators. Instead, a job is scheduled in high priority queue and would be executed in a separate background thread. * RepairDB support for column families. RepairDB now associates data with non-default column families using information embedded in the SST/WAL files (4.7 or later). For data written by 4.6 or earlier, RepairDB associates it with the default column family. +* Add options.write_buffer_manager which allows users to control total memtable sizes across multiple DB instances. ## 4.9.0 (6/9/2016) ### Public API changes diff --git a/db/column_family.cc b/db/column_family.cc index e0ad49e27..732a4cf29 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -26,7 +26,6 @@ #include "db/table_properties_collector.h" #include "db/version_set.h" #include "db/write_controller.h" -#include "db/writebuffer.h" #include "memtable/hash_skiplist_rep.h" #include "util/autovector.h" #include "util/compression.h" @@ -330,7 +329,7 @@ void SuperVersionUnrefHandle(void* ptr) { ColumnFamilyData::ColumnFamilyData( uint32_t id, const std::string& name, Version* _dummy_versions, - Cache* _table_cache, WriteBuffer* write_buffer, + Cache* _table_cache, WriteBufferManager* write_buffer_manager, const ColumnFamilyOptions& cf_options, const DBOptions* db_options, const EnvOptions& env_options, ColumnFamilySet* column_family_set) : id_(id), @@ -344,7 +343,7 @@ ColumnFamilyData::ColumnFamilyData( SanitizeOptions(*db_options, &internal_comparator_, cf_options)), ioptions_(options_), mutable_cf_options_(options_, ioptions_), - write_buffer_(write_buffer), + write_buffer_manager_(write_buffer_manager), mem_(nullptr), imm_(options_.min_write_buffer_number_to_merge, options_.max_write_buffer_number_to_maintain), @@ -676,7 +675,7 @@ MemTable* ColumnFamilyData::ConstructNewMemtable( const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) { assert(current_ != nullptr); return new MemTable(internal_comparator_, ioptions_, mutable_cf_options, - write_buffer_, earliest_seq); + write_buffer_manager_, earliest_seq); } void ColumnFamilyData::CreateNewMemtable( @@ -855,7 +854,7 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname, const DBOptions* db_options, const EnvOptions& env_options, Cache* table_cache, - WriteBuffer* write_buffer, + WriteBufferManager* write_buffer_manager, WriteController* write_controller) : max_column_family_(0), dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr, @@ -866,7 +865,7 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname, db_options_(db_options), env_options_(env_options), table_cache_(table_cache), - write_buffer_(write_buffer), + write_buffer_manager_(write_buffer_manager), write_controller_(write_controller) { // initialize linked list dummy_cfd_->prev_ = dummy_cfd_; @@ -929,10 +928,9 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily( const std::string& name, uint32_t id, Version* dummy_versions, const ColumnFamilyOptions& options) { assert(column_families_.find(name) == column_families_.end()); - ColumnFamilyData* new_cfd = - new ColumnFamilyData(id, name, dummy_versions, table_cache_, - write_buffer_, options, db_options_, - env_options_, this); + ColumnFamilyData* new_cfd = new ColumnFamilyData( + id, name, dummy_versions, table_cache_, write_buffer_manager_, options, + db_options_, env_options_, this); column_families_.insert({name, id}); column_family_data_.insert({id, new_cfd}); max_column_family_ = std::max(max_column_family_, id); diff --git a/db/column_family.h b/db/column_family.h index f0dbced85..43f249b25 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -323,7 +323,7 @@ class ColumnFamilyData { friend class ColumnFamilySet; ColumnFamilyData(uint32_t id, const std::string& name, Version* dummy_versions, Cache* table_cache, - WriteBuffer* write_buffer, + WriteBufferManager* write_buffer_manager, const ColumnFamilyOptions& options, const DBOptions* db_options, const EnvOptions& env_options, ColumnFamilySet* column_family_set); @@ -348,7 +348,7 @@ class ColumnFamilyData { std::unique_ptr internal_stats_; - WriteBuffer* write_buffer_; + WriteBufferManager* write_buffer_manager_; MemTable* mem_; MemTableList imm_; @@ -438,7 +438,8 @@ class ColumnFamilySet { ColumnFamilySet(const std::string& dbname, const DBOptions* db_options, const EnvOptions& env_options, Cache* table_cache, - WriteBuffer* write_buffer, WriteController* write_controller); + WriteBufferManager* write_buffer_manager, + WriteController* write_controller); ~ColumnFamilySet(); ColumnFamilyData* GetDefault() const; @@ -495,7 +496,7 @@ class ColumnFamilySet { const DBOptions* const db_options_; const EnvOptions env_options_; Cache* table_cache_; - WriteBuffer* write_buffer_; + WriteBufferManager* write_buffer_manager_; WriteController* write_controller_; }; diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index f3bc4cca9..e218bd32a 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -10,13 +10,13 @@ #include #include -#include "db/compaction_job.h" #include "db/column_family.h" +#include "db/compaction_job.h" #include "db/version_set.h" -#include "db/writebuffer.h" #include "rocksdb/cache.h" #include "rocksdb/db.h" #include "rocksdb/options.h" +#include "rocksdb/write_buffer_manager.h" #include "table/mock_table.h" #include "util/file_reader_writer.h" #include "util/string_util.h" @@ -70,9 +70,9 @@ class CompactionJobTest : public testing::Test { dbname_(test::TmpDir() + "/compaction_job_test"), mutable_cf_options_(Options(), ImmutableCFOptions(Options())), table_cache_(NewLRUCache(50000, 16)), - write_buffer_(db_options_.db_write_buffer_size), + write_buffer_manager_(db_options_.db_write_buffer_size), versions_(new VersionSet(dbname_, &db_options_, env_options_, - table_cache_.get(), &write_buffer_, + table_cache_.get(), &write_buffer_manager_, &write_controller_)), shutting_down_(false), mock_table_factory_(new mock::MockTableFactory()) { @@ -285,7 +285,7 @@ class CompactionJobTest : public testing::Test { WriteController write_controller_; DBOptions db_options_; ColumnFamilyOptions cf_options_; - WriteBuffer write_buffer_; + WriteBufferManager write_buffer_manager_; std::unique_ptr versions_; InstrumentedMutex mutex_; std::atomic shutting_down_; diff --git a/db/db_impl.cc b/db/db_impl.cc index b3236b362..cabbcd82a 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -57,7 +57,6 @@ #include "db/version_set.h" #include "db/write_batch_internal.h" #include "db/write_callback.h" -#include "db/writebuffer.h" #include "db/xfunc_test_points.h" #include "memtable/hash_linklist_rep.h" #include "memtable/hash_skiplist_rep.h" @@ -73,6 +72,7 @@ #include "rocksdb/table.h" #include "rocksdb/version.h" #include "rocksdb/wal_filter.h" +#include "rocksdb/write_buffer_manager.h" #include "table/block.h" #include "table/block_based_table_factory.h" #include "table/merger.h" @@ -147,6 +147,10 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { result.info_log = nullptr; } } + if (!result.write_buffer_manager) { + result.write_buffer_manager.reset( + new WriteBufferManager(result.db_write_buffer_size)); + } if (result.base_background_compactions == -1) { result.base_background_compactions = result.max_background_compactions; } @@ -315,7 +319,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) total_log_size_(0), max_total_in_memory_state_(0), is_snapshot_supported_(true), - write_buffer_(options.db_write_buffer_size), + write_buffer_manager_(db_options_.write_buffer_manager.get()), write_thread_(options.enable_write_thread_adaptive_yield ? options.write_thread_max_yield_usec : 0, @@ -355,7 +359,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) NewLRUCache(table_cache_size, db_options_.table_cache_numshardbits); versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, - table_cache_.get(), &write_buffer_, + table_cache_.get(), write_buffer_manager_, &write_controller_)); column_family_memtables_.reset( new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet())); @@ -4420,11 +4424,17 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } } MaybeScheduleFlushOrCompaction(); - } else if (UNLIKELY(write_buffer_.ShouldFlush())) { + } else if (UNLIKELY(write_buffer_manager_->ShouldFlush())) { + // Before a new memtable is added in SwitchMemtable(), + // write_buffer_manager_->ShouldFlush() will keep returning true. If another + // thread is writing to another DB with the same write buffer, they may also + // be flushed. We may end up with flushing much more DBs than needed. It's + // suboptimal but still correct. Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "Flushing column family with largest mem table size. Write buffer is " "using %" PRIu64 " bytes out of a total of %" PRIu64 ".", - write_buffer_.memory_usage(), write_buffer_.buffer_size()); + write_buffer_manager_->memory_usage(), + write_buffer_manager_->buffer_size()); // no need to refcount because drop is happening in write thread, so can't // happen while we're in the write thread ColumnFamilyData* largest_cfd = nullptr; diff --git a/db/db_impl.h b/db/db_impl.h index b736b2c2b..ff1d226fd 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -31,13 +31,13 @@ #include "db/wal_manager.h" #include "db/write_controller.h" #include "db/write_thread.h" -#include "db/writebuffer.h" #include "memtable_list.h" #include "port/port.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/memtablerep.h" #include "rocksdb/transaction_log.h" +#include "rocksdb/write_buffer_manager.h" #include "table/scoped_arena_iterator.h" #include "util/autovector.h" #include "util/event_logger.h" @@ -803,7 +803,7 @@ class DBImpl : public DB { Directories directories_; - WriteBuffer write_buffer_; + WriteBufferManager* write_buffer_manager_; WriteThread write_thread_; diff --git a/db/db_test.cc b/db/db_test.cc index d7dcc2d26..d40a5a8c4 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2285,130 +2285,6 @@ TEST_F(DBTest, FlushOneColumnFamily) { } } -#ifndef ROCKSDB_LITE -TEST_F(DBTest, SharedWriteBuffer) { - Options options = CurrentOptions(); - options.db_write_buffer_size = 100000; // this is the real limit - options.write_buffer_size = 500000; // this is never hit - CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options); - - // Trigger a flush on CF "nikitich" - ASSERT_OK(Put(0, Key(1), DummyString(1))); - ASSERT_OK(Put(1, Key(1), DummyString(1))); - ASSERT_OK(Put(3, Key(1), DummyString(90000))); - ASSERT_OK(Put(2, Key(2), DummyString(20000))); - ASSERT_OK(Put(2, Key(1), DummyString(1))); - dbfull()->TEST_WaitForFlushMemTable(handles_[0]); - dbfull()->TEST_WaitForFlushMemTable(handles_[1]); - dbfull()->TEST_WaitForFlushMemTable(handles_[2]); - dbfull()->TEST_WaitForFlushMemTable(handles_[3]); - { - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(0)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), - static_cast(0)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), - static_cast(0)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), - static_cast(1)); - } - - // "dobrynia": 20KB - // Flush 'dobrynia' - ASSERT_OK(Put(3, Key(2), DummyString(40000))); - ASSERT_OK(Put(2, Key(2), DummyString(70000))); - ASSERT_OK(Put(0, Key(1), DummyString(1))); - dbfull()->TEST_WaitForFlushMemTable(handles_[1]); - dbfull()->TEST_WaitForFlushMemTable(handles_[2]); - dbfull()->TEST_WaitForFlushMemTable(handles_[3]); - { - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(0)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), - static_cast(0)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), - static_cast(1)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), - static_cast(1)); - } - - // "nikitich" still has data of 80KB - // Inserting Data in "dobrynia" triggers "nikitich" flushing. - ASSERT_OK(Put(3, Key(2), DummyString(40000))); - ASSERT_OK(Put(2, Key(2), DummyString(40000))); - ASSERT_OK(Put(0, Key(1), DummyString(1))); - dbfull()->TEST_WaitForFlushMemTable(handles_[1]); - dbfull()->TEST_WaitForFlushMemTable(handles_[2]); - dbfull()->TEST_WaitForFlushMemTable(handles_[3]); - { - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(0)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), - static_cast(0)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), - static_cast(1)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), - static_cast(2)); - } - - // "dobrynia" still has 40KB - ASSERT_OK(Put(1, Key(2), DummyString(20000))); - ASSERT_OK(Put(0, Key(1), DummyString(10000))); - ASSERT_OK(Put(0, Key(1), DummyString(1))); - dbfull()->TEST_WaitForFlushMemTable(handles_[0]); - dbfull()->TEST_WaitForFlushMemTable(handles_[1]); - dbfull()->TEST_WaitForFlushMemTable(handles_[2]); - dbfull()->TEST_WaitForFlushMemTable(handles_[3]); - // This should triggers no flush - { - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(0)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), - static_cast(0)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), - static_cast(1)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), - static_cast(2)); - } - - // "default": 10KB, "pikachu": 20KB, "dobrynia": 40KB - ASSERT_OK(Put(1, Key(2), DummyString(40000))); - ASSERT_OK(Put(0, Key(1), DummyString(1))); - dbfull()->TEST_WaitForFlushMemTable(handles_[0]); - dbfull()->TEST_WaitForFlushMemTable(handles_[1]); - dbfull()->TEST_WaitForFlushMemTable(handles_[2]); - dbfull()->TEST_WaitForFlushMemTable(handles_[3]); - // This should triggers flush of "pikachu" - { - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(0)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), - static_cast(1)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), - static_cast(1)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), - static_cast(2)); - } - - // "default": 10KB, "dobrynia": 40KB - // Some remaining writes so 'default', 'dobrynia' and 'nikitich' flush on - // closure. - ASSERT_OK(Put(3, Key(1), DummyString(1))); - ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"}, - options); - { - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(1)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), - static_cast(1)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), - static_cast(2)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), - static_cast(3)); - } -} -#endif // ROCKSDB_LITE - TEST_F(DBTest, PurgeInfoLogs) { Options options = CurrentOptions(); options.keep_log_file_num = 5; diff --git a/db/db_test2.cc b/db/db_test2.cc index 87a09595e..29b9a4536 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -144,6 +144,225 @@ TEST_F(DBTest2, CacheIndexAndFilterWithDBRestart) { } #ifndef ROCKSDB_LITE +class DBTestSharedWriteBufferAcrossCFs + : public DBTestBase, + public testing::WithParamInterface { + public: + DBTestSharedWriteBufferAcrossCFs() + : DBTestBase("/db_test_shared_write_buffer") {} + void SetUp() override { use_old_interface_ = GetParam(); } + bool use_old_interface_; +}; + +TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) { + Options options = CurrentOptions(); + if (use_old_interface_) { + options.db_write_buffer_size = 100000; // this is the real limit + } else { + options.write_buffer_manager.reset(new WriteBufferManager(100000)); + } + options.write_buffer_size = 500000; // this is never hit + CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options); + + // Trigger a flush on CF "nikitich" + ASSERT_OK(Put(0, Key(1), DummyString(1))); + ASSERT_OK(Put(1, Key(1), DummyString(1))); + ASSERT_OK(Put(3, Key(1), DummyString(90000))); + ASSERT_OK(Put(2, Key(2), DummyString(20000))); + ASSERT_OK(Put(2, Key(1), DummyString(1))); + dbfull()->TEST_WaitForFlushMemTable(handles_[0]); + dbfull()->TEST_WaitForFlushMemTable(handles_[1]); + dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + dbfull()->TEST_WaitForFlushMemTable(handles_[3]); + { + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), + static_cast(0)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), + static_cast(0)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), + static_cast(0)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), + static_cast(1)); + } + + // "dobrynia": 20KB + // Flush 'dobrynia' + ASSERT_OK(Put(3, Key(2), DummyString(40000))); + ASSERT_OK(Put(2, Key(2), DummyString(70000))); + ASSERT_OK(Put(0, Key(1), DummyString(1))); + dbfull()->TEST_WaitForFlushMemTable(handles_[1]); + dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + dbfull()->TEST_WaitForFlushMemTable(handles_[3]); + { + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), + static_cast(0)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), + static_cast(0)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), + static_cast(1)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), + static_cast(1)); + } + + // "nikitich" still has data of 80KB + // Inserting Data in "dobrynia" triggers "nikitich" flushing. + ASSERT_OK(Put(3, Key(2), DummyString(40000))); + ASSERT_OK(Put(2, Key(2), DummyString(40000))); + ASSERT_OK(Put(0, Key(1), DummyString(1))); + dbfull()->TEST_WaitForFlushMemTable(handles_[1]); + dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + dbfull()->TEST_WaitForFlushMemTable(handles_[3]); + { + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), + static_cast(0)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), + static_cast(0)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), + static_cast(1)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), + static_cast(2)); + } + + // "dobrynia" still has 40KB + ASSERT_OK(Put(1, Key(2), DummyString(20000))); + ASSERT_OK(Put(0, Key(1), DummyString(10000))); + ASSERT_OK(Put(0, Key(1), DummyString(1))); + dbfull()->TEST_WaitForFlushMemTable(handles_[0]); + dbfull()->TEST_WaitForFlushMemTable(handles_[1]); + dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + dbfull()->TEST_WaitForFlushMemTable(handles_[3]); + // This should triggers no flush + { + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), + static_cast(0)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), + static_cast(0)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), + static_cast(1)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), + static_cast(2)); + } + + // "default": 10KB, "pikachu": 20KB, "dobrynia": 40KB + ASSERT_OK(Put(1, Key(2), DummyString(40000))); + ASSERT_OK(Put(0, Key(1), DummyString(1))); + dbfull()->TEST_WaitForFlushMemTable(handles_[0]); + dbfull()->TEST_WaitForFlushMemTable(handles_[1]); + dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + dbfull()->TEST_WaitForFlushMemTable(handles_[3]); + // This should triggers flush of "pikachu" + { + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), + static_cast(0)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), + static_cast(1)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), + static_cast(1)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), + static_cast(2)); + } + + // "default": 10KB, "dobrynia": 40KB + // Some remaining writes so 'default', 'dobrynia' and 'nikitich' flush on + // closure. + ASSERT_OK(Put(3, Key(1), DummyString(1))); + ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"}, + options); + { + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), + static_cast(1)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), + static_cast(1)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), + static_cast(2)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), + static_cast(3)); + } +} + +INSTANTIATE_TEST_CASE_P(DBTestSharedWriteBufferAcrossCFs, + DBTestSharedWriteBufferAcrossCFs, ::testing::Bool()); + +TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) { + std::string dbname2 = test::TmpDir(env_) + "/db_shared_wb_db2"; + Options options = CurrentOptions(); + options.write_buffer_size = 500000; // this is never hit + options.write_buffer_manager.reset(new WriteBufferManager(100000)); + CreateAndReopenWithCF({"cf1", "cf2"}, options); + + ASSERT_OK(DestroyDB(dbname2, options)); + DB* db2 = nullptr; + ASSERT_OK(DB::Open(options, dbname2, &db2)); + + WriteOptions wo; + + // Trigger a flush on cf2 + ASSERT_OK(Put(0, Key(1), DummyString(1))); + ASSERT_OK(Put(1, Key(1), DummyString(1))); + ASSERT_OK(Put(2, Key(1), DummyString(90000))); + + // Insert to DB2 + ASSERT_OK(db2->Put(wo, Key(2), DummyString(20000))); + + ASSERT_OK(Put(2, Key(1), DummyString(1))); + dbfull()->TEST_WaitForFlushMemTable(handles_[0]); + dbfull()->TEST_WaitForFlushMemTable(handles_[1]); + dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + static_cast(db2)->TEST_WaitForFlushMemTable(); + { + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), + static_cast(0)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"), + static_cast(0)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"), + static_cast(1)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"), + static_cast(0)); + } + + // db2: 20KB + ASSERT_OK(db2->Put(wo, Key(2), DummyString(40000))); + ASSERT_OK(db2->Put(wo, Key(3), DummyString(70000))); + ASSERT_OK(db2->Put(wo, Key(1), DummyString(1))); + dbfull()->TEST_WaitForFlushMemTable(handles_[0]); + dbfull()->TEST_WaitForFlushMemTable(handles_[1]); + dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + static_cast(db2)->TEST_WaitForFlushMemTable(); + { + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), + static_cast(0)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"), + static_cast(0)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"), + static_cast(1)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"), + static_cast(1)); + } + + // + // Inserting Data in db2 and db_ triggers flushing in db_. + ASSERT_OK(db2->Put(wo, Key(3), DummyString(70000))); + ASSERT_OK(Put(2, Key(2), DummyString(45000))); + ASSERT_OK(Put(0, Key(1), DummyString(1))); + dbfull()->TEST_WaitForFlushMemTable(handles_[0]); + dbfull()->TEST_WaitForFlushMemTable(handles_[1]); + dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + static_cast(db2)->TEST_WaitForFlushMemTable(); + { + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), + static_cast(0)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"), + static_cast(0)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"), + static_cast(2)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"), + static_cast(1)); + } + + delete db2; + ASSERT_OK(DestroyDB(dbname2, options)); +} + namespace { void ValidateKeyExistence(DB* db, const std::vector& keys_must_exist, const std::vector& keys_must_not_exist) { diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 4c0ab4efa..868d240d9 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -494,14 +494,14 @@ class RecoveryTestHelper { shared_ptr table_cache = NewLRUCache(50000, 16); EnvOptions env_options; - WriteBuffer write_buffer(db_options.db_write_buffer_size); + WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size); unique_ptr versions; unique_ptr wal_manager; WriteController write_controller; versions.reset(new VersionSet(test->dbname_, &db_options, env_options, - table_cache.get(), &write_buffer, + table_cache.get(), &write_buffer_manager, &write_controller)); wal_manager.reset(new WalManager(db_options, env_options)); diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index c285987d9..062ab1d50 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -7,16 +7,16 @@ #include #include -#include "db/flush_job.h" #include "db/column_family.h" +#include "db/flush_job.h" #include "db/version_set.h" -#include "db/writebuffer.h" #include "rocksdb/cache.h" +#include "rocksdb/write_buffer_manager.h" +#include "table/mock_table.h" #include "util/file_reader_writer.h" #include "util/string_util.h" #include "util/testharness.h" #include "util/testutil.h" -#include "table/mock_table.h" namespace rocksdb { @@ -29,9 +29,9 @@ class FlushJobTest : public testing::Test { : env_(Env::Default()), dbname_(test::TmpDir() + "/flush_job_test"), table_cache_(NewLRUCache(50000, 16)), - write_buffer_(db_options_.db_write_buffer_size), + write_buffer_manager_(db_options_.db_write_buffer_size), versions_(new VersionSet(dbname_, &db_options_, env_options_, - table_cache_.get(), &write_buffer_, + table_cache_.get(), &write_buffer_manager_, &write_controller_)), shutting_down_(false), mock_table_factory_(new mock::MockTableFactory()) { @@ -77,7 +77,7 @@ class FlushJobTest : public testing::Test { std::shared_ptr table_cache_; WriteController write_controller_; DBOptions db_options_; - WriteBuffer write_buffer_; + WriteBufferManager write_buffer_manager_; ColumnFamilyOptions cf_options_; std::unique_ptr versions_; InstrumentedMutex mutex_; diff --git a/db/memtable.cc b/db/memtable.cc index d986bf95a..8b6f49a7c 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -17,12 +17,12 @@ #include "db/merge_context.h" #include "db/merge_helper.h" #include "db/pinned_iterators_manager.h" -#include "db/writebuffer.h" #include "rocksdb/comparator.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "rocksdb/merge_operator.h" #include "rocksdb/slice_transform.h" +#include "rocksdb/write_buffer_manager.h" #include "table/internal_iterator.h" #include "table/merger.h" #include "util/arena.h" @@ -57,13 +57,14 @@ MemTableOptions::MemTableOptions(const ImmutableCFOptions& ioptions, MemTable::MemTable(const InternalKeyComparator& cmp, const ImmutableCFOptions& ioptions, const MutableCFOptions& mutable_cf_options, - WriteBuffer* write_buffer, SequenceNumber earliest_seq) + WriteBufferManager* write_buffer_manager, + SequenceNumber earliest_seq) : comparator_(cmp), moptions_(ioptions, mutable_cf_options), refs_(0), kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)), arena_(moptions_.arena_block_size, 0), - allocator_(&arena_, write_buffer), + allocator_(&arena_, write_buffer_manager), table_(ioptions.memtable_factory->CreateMemTableRep( comparator_, &allocator_, ioptions.prefix_extractor, ioptions.info_log)), diff --git a/db/memtable.h b/db/memtable.h index aff41655c..e30c99976 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -32,7 +32,6 @@ namespace rocksdb { class Mutex; class MemTableIterator; class MergeContext; -class WriteBuffer; class InternalIterator; struct MemTableOptions { @@ -91,7 +90,8 @@ class MemTable { explicit MemTable(const InternalKeyComparator& comparator, const ImmutableCFOptions& ioptions, const MutableCFOptions& mutable_cf_options, - WriteBuffer* write_buffer, SequenceNumber earliest_seq); + WriteBufferManager* write_buffer_manager, + SequenceNumber earliest_seq); // Do not delete this MemTable unless Unref() indicates it not in use. ~MemTable(); diff --git a/db/memtable_allocator.cc b/db/memtable_allocator.cc index f9b2fbd73..1a50b962d 100644 --- a/db/memtable_allocator.cc +++ b/db/memtable_allocator.cc @@ -10,36 +10,39 @@ #include "db/memtable_allocator.h" #include -#include "db/writebuffer.h" +#include "rocksdb/write_buffer_manager.h" #include "util/arena.h" namespace rocksdb { MemTableAllocator::MemTableAllocator(Allocator* allocator, - WriteBuffer* write_buffer) - : allocator_(allocator), write_buffer_(write_buffer), bytes_allocated_(0) {} + WriteBufferManager* write_buffer_manager) + : allocator_(allocator), + write_buffer_manager_(write_buffer_manager), + bytes_allocated_(0) {} MemTableAllocator::~MemTableAllocator() { DoneAllocating(); } char* MemTableAllocator::Allocate(size_t bytes) { - assert(write_buffer_ != nullptr); + assert(write_buffer_manager_ != nullptr); bytes_allocated_.fetch_add(bytes, std::memory_order_relaxed); - write_buffer_->ReserveMem(bytes); + write_buffer_manager_->ReserveMem(bytes); return allocator_->Allocate(bytes); } char* MemTableAllocator::AllocateAligned(size_t bytes, size_t huge_page_size, Logger* logger) { - assert(write_buffer_ != nullptr); + assert(write_buffer_manager_ != nullptr); bytes_allocated_.fetch_add(bytes, std::memory_order_relaxed); - write_buffer_->ReserveMem(bytes); + write_buffer_manager_->ReserveMem(bytes); return allocator_->AllocateAligned(bytes, huge_page_size, logger); } void MemTableAllocator::DoneAllocating() { - if (write_buffer_ != nullptr) { - write_buffer_->FreeMem(bytes_allocated_.load(std::memory_order_relaxed)); - write_buffer_ = nullptr; + if (write_buffer_manager_ != nullptr) { + write_buffer_manager_->FreeMem( + bytes_allocated_.load(std::memory_order_relaxed)); + write_buffer_manager_ = nullptr; } } diff --git a/db/memtable_allocator.h b/db/memtable_allocator.h index d8bd4c808..050e13b36 100644 --- a/db/memtable_allocator.h +++ b/db/memtable_allocator.h @@ -8,21 +8,23 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. // // This is used by the MemTable to allocate write buffer memory. It connects -// to WriteBuffer so we can track and enforce overall write buffer limits. +// to WriteBufferManager so we can track and enforce overall write buffer +// limits. #pragma once #include +#include "rocksdb/write_buffer_manager.h" #include "util/allocator.h" namespace rocksdb { class Logger; -class WriteBuffer; class MemTableAllocator : public Allocator { public: - explicit MemTableAllocator(Allocator* allocator, WriteBuffer* write_buffer); + explicit MemTableAllocator(Allocator* allocator, + WriteBufferManager* write_buffer_manager); ~MemTableAllocator(); // Allocator interface @@ -37,7 +39,7 @@ class MemTableAllocator : public Allocator { private: Allocator* allocator_; - WriteBuffer* write_buffer_; + WriteBufferManager* write_buffer_manager_; std::atomic bytes_allocated_; // No copying allowed diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index 50f96b9f8..42420b876 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -3,19 +3,19 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. +#include "db/memtable_list.h" #include #include #include -#include "db/memtable_list.h" #include "db/merge_context.h" #include "db/version_set.h" #include "db/write_controller.h" -#include "db/writebuffer.h" #include "rocksdb/db.h" #include "rocksdb/status.h" -#include "util/testutil.h" +#include "rocksdb/write_buffer_manager.h" #include "util/string_util.h" #include "util/testharness.h" +#include "util/testutil.h" namespace rocksdb { @@ -59,12 +59,12 @@ class MemTableListTest : public testing::Test { DBOptions db_options; EnvOptions env_options; shared_ptr table_cache(NewLRUCache(50000, 16)); - WriteBuffer write_buffer(db_options.db_write_buffer_size); + WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size); WriteController write_controller(10000000u); CreateDB(); VersionSet versions(dbname, &db_options, env_options, table_cache.get(), - &write_buffer, &write_controller); + &write_buffer_manager, &write_controller); // Create mock default ColumnFamilyData ColumnFamilyOptions cf_options; @@ -126,7 +126,7 @@ TEST_F(MemTableListTest, GetTest) { options.memtable_factory = factory; ImmutableCFOptions ioptions(options); - WriteBuffer wb(options.db_write_buffer_size); + WriteBufferManager wb(options.db_write_buffer_size); MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb, kMaxSequenceNumber); @@ -163,7 +163,7 @@ TEST_F(MemTableListTest, GetTest) { SequenceNumber saved_seq = seq; // Create another memtable and write some keys to it - WriteBuffer wb2(options.db_write_buffer_size); + WriteBufferManager wb2(options.db_write_buffer_size); MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb2, kMaxSequenceNumber); @@ -228,7 +228,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) { options.memtable_factory = factory; ImmutableCFOptions ioptions(options); - WriteBuffer wb(options.db_write_buffer_size); + WriteBufferManager wb(options.db_write_buffer_size); MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb, kMaxSequenceNumber); @@ -303,7 +303,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) { ASSERT_EQ("value2.2", value); // Create another memtable and write some keys to it - WriteBuffer wb2(options.db_write_buffer_size); + WriteBufferManager wb2(options.db_write_buffer_size); MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb2, kMaxSequenceNumber); @@ -329,7 +329,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) { ASSERT_EQ(0, to_delete.size()); // Add a third memtable to push the first memtable out of the history - WriteBuffer wb3(options.db_write_buffer_size); + WriteBufferManager wb3(options.db_write_buffer_size); MemTable* mem3 = new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb3, kMaxSequenceNumber); @@ -390,7 +390,7 @@ TEST_F(MemTableListTest, FlushPendingTest) { options.memtable_factory = factory; ImmutableCFOptions ioptions(options); InternalKeyComparator cmp(BytewiseComparator()); - WriteBuffer wb(options.db_write_buffer_size); + WriteBufferManager wb(options.db_write_buffer_size); autovector to_delete; // Create MemTableList diff --git a/db/memtablerep_bench.cc b/db/memtablerep_bench.cc index a897adeab..b5875618b 100644 --- a/db/memtablerep_bench.cc +++ b/db/memtablerep_bench.cc @@ -28,13 +28,13 @@ int main() { #include "db/dbformat.h" #include "db/memtable.h" -#include "db/writebuffer.h" #include "port/port.h" #include "port/stack_trace.h" #include "rocksdb/comparator.h" #include "rocksdb/memtablerep.h" #include "rocksdb/options.h" #include "rocksdb/slice_transform.h" +#include "rocksdb/write_buffer_manager.h" #include "util/arena.h" #include "util/mutexlock.h" #include "util/stop_watch.h" @@ -622,7 +622,7 @@ int main(int argc, char** argv) { rocksdb::BytewiseComparator()); rocksdb::MemTable::KeyComparator key_comp(internal_key_comp); rocksdb::Arena arena; - rocksdb::WriteBuffer wb(FLAGS_write_buffer_size); + rocksdb::WriteBufferManager wb(FLAGS_write_buffer_size); rocksdb::MemTableAllocator memtable_allocator(&arena, &wb); uint64_t sequence; auto createMemtableRep = [&] { diff --git a/db/repair.cc b/db/repair.cc index f7010836e..a46e33f2d 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -75,12 +75,12 @@ #include "db/table_cache.h" #include "db/version_edit.h" #include "db/write_batch_internal.h" -#include "db/writebuffer.h" #include "rocksdb/comparator.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/immutable_options.h" #include "rocksdb/options.h" +#include "rocksdb/write_buffer_manager.h" #include "table/scoped_arena_iterator.h" #include "util/file_reader_writer.h" #include "util/string_util.h" @@ -229,7 +229,7 @@ class Repairer { const bool create_unknown_cfs_; std::shared_ptr raw_table_cache_; TableCache* table_cache_; - WriteBuffer wb_; + WriteBufferManager wb_; WriteController wc_; VersionSet vset_; std::unordered_map cf_name_to_opts_; diff --git a/db/version_set.cc b/db/version_set.cc index 2bd87386b..43000b394 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -33,9 +33,9 @@ #include "db/merge_helper.h" #include "db/table_cache.h" #include "db/version_builder.h" -#include "db/writebuffer.h" #include "rocksdb/env.h" #include "rocksdb/merge_operator.h" +#include "rocksdb/write_buffer_manager.h" #include "table/format.h" #include "table/get_context.h" #include "table/internal_iterator.h" @@ -2086,11 +2086,11 @@ struct VersionSet::ManifestWriter { VersionSet::VersionSet(const std::string& dbname, const DBOptions* db_options, const EnvOptions& storage_options, Cache* table_cache, - WriteBuffer* write_buffer, + WriteBufferManager* write_buffer_manager, WriteController* write_controller) - : column_family_set_(new ColumnFamilySet( - dbname, db_options, storage_options, table_cache, - write_buffer, write_controller)), + : column_family_set_( + new ColumnFamilySet(dbname, db_options, storage_options, table_cache, + write_buffer_manager, write_controller)), env_(db_options->env), dbname_(dbname), db_options_(db_options), @@ -2837,7 +2837,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, std::shared_ptr tc(NewLRUCache(options->max_open_files - 10, options->table_cache_numshardbits)); WriteController wc(options->delayed_write_rate); - WriteBuffer wb(options->db_write_buffer_size); + WriteBufferManager wb(options->db_write_buffer_size); VersionSet versions(dbname, options, env_options, tc.get(), &wb, &wc); Status status; diff --git a/db/version_set.h b/db/version_set.h index 83270b35b..8fe7c47ce 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -55,7 +55,7 @@ class LookupKey; class MemTable; class Version; class VersionSet; -class WriteBuffer; +class WriteBufferManager; class MergeContext; class ColumnFamilySet; class TableCache; @@ -575,7 +575,8 @@ class VersionSet { public: VersionSet(const std::string& dbname, const DBOptions* db_options, const EnvOptions& env_options, Cache* table_cache, - WriteBuffer* write_buffer, WriteController* write_controller); + WriteBufferManager* write_buffer_manager, + WriteController* write_controller); ~VersionSet(); // Apply *edit to the current version to form a new descriptor that diff --git a/db/wal_manager_test.cc b/db/wal_manager_test.cc index 4d3f5b6e2..5db538a31 100644 --- a/db/wal_manager_test.cc +++ b/db/wal_manager_test.cc @@ -10,12 +10,12 @@ #include "rocksdb/cache.h" #include "rocksdb/write_batch.h" +#include "rocksdb/write_buffer_manager.h" #include "db/wal_manager.h" #include "db/log_writer.h" #include "db/column_family.h" #include "db/version_set.h" -#include "db/writebuffer.h" #include "util/file_reader_writer.h" #include "util/mock_env.h" #include "util/string_util.h" @@ -34,7 +34,7 @@ class WalManagerTest : public testing::Test { : env_(new MockEnv(Env::Default())), dbname_(test::TmpDir() + "/wal_manager_test"), table_cache_(NewLRUCache(50000, 16)), - write_buffer_(db_options_.db_write_buffer_size), + write_buffer_manager_(db_options_.db_write_buffer_size), current_log_number_(0) { DestroyDB(dbname_, Options()); } @@ -48,7 +48,7 @@ class WalManagerTest : public testing::Test { db_options_.env = env_.get(); versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, - table_cache_.get(), &write_buffer_, + table_cache_.get(), &write_buffer_manager_, &write_controller_)); wal_manager_.reset(new WalManager(db_options_, env_options_)); @@ -104,7 +104,7 @@ class WalManagerTest : public testing::Test { EnvOptions env_options_; std::shared_ptr table_cache_; DBOptions db_options_; - WriteBuffer write_buffer_; + WriteBufferManager write_buffer_manager_; std::unique_ptr versions_; std::unique_ptr wal_manager_; @@ -128,7 +128,7 @@ TEST_F(WalManagerTest, ReadFirstRecordCache) { unique_ptr file_writer( new WritableFileWriter(std::move(file), EnvOptions())); log::Writer writer(std::move(file_writer), 1, - db_options_.recycle_log_file_num > 0); + db_options_.recycle_log_file_num > 0); WriteBatch batch; batch.Put("foo", "bar"); WriteBatchInternal::SetSequence(&batch, 10); diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 530cccf92..cc351a5a6 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -10,13 +10,13 @@ #include "rocksdb/db.h" #include -#include "db/memtable.h" #include "db/column_family.h" +#include "db/memtable.h" #include "db/write_batch_internal.h" -#include "db/writebuffer.h" #include "rocksdb/env.h" #include "rocksdb/memtablerep.h" #include "rocksdb/utilities/write_batch_with_index.h" +#include "rocksdb/write_buffer_manager.h" #include "table/scoped_arena_iterator.h" #include "util/logging.h" #include "util/string_util.h" @@ -30,7 +30,7 @@ static std::string PrintContents(WriteBatch* b) { Options options; options.memtable_factory = factory; ImmutableCFOptions ioptions(options); - WriteBuffer wb(options.db_write_buffer_size); + WriteBufferManager wb(options.db_write_buffer_size); MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb, kMaxSequenceNumber); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index f36912dbd..1e3f58252 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -17,9 +17,10 @@ #include #include -#include "rocksdb/version.h" #include "rocksdb/listener.h" #include "rocksdb/universal_compaction.h" +#include "rocksdb/version.h" +#include "rocksdb/write_buffer_manager.h" #ifdef max #undef max @@ -1138,6 +1139,22 @@ struct DBOptions { // Default: 0 (disabled) size_t db_write_buffer_size; + // The memory usage of memtable will report to this object. The same object + // can be passed into multiple DBs and it will track the sum of size of all + // the DBs. If the total size of all live memtables of all the DBs exceeds + // a limit, a flush will be triggered in the next DB to which the next write + // is issued. + // + // If the object is only passed to on DB, the behavior is the same as + // db_write_buffer_size. When write_buffer_manager is set, the value set will + // override db_write_buffer_size. + // + // This feature is disabled by default. Specify a non-zero value + // to enable it. + // + // Default: null + std::shared_ptr write_buffer_manager; + // Specify the file access pattern once a compaction is started. // It will be applied to all input files of a compaction. // Default: NORMAL diff --git a/db/writebuffer.h b/include/rocksdb/write_buffer_manager.h similarity index 79% rename from db/writebuffer.h rename to include/rocksdb/write_buffer_manager.h index 19d51d925..283300f0a 100644 --- a/db/writebuffer.h +++ b/include/rocksdb/write_buffer_manager.h @@ -7,20 +7,22 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. // -// WriteBuffer is for managing memory allocation for one or more MemTables. +// WriteBufferManager is for managing memory allocation for one or more +// MemTables. #pragma once #include +#include namespace rocksdb { -class WriteBuffer { +class WriteBufferManager { public: - explicit WriteBuffer(size_t _buffer_size) + explicit WriteBufferManager(size_t _buffer_size) : buffer_size_(_buffer_size), memory_used_(0) {} - ~WriteBuffer() {} + ~WriteBufferManager() {} size_t memory_usage() const { return memory_used_.load(std::memory_order_relaxed); @@ -45,8 +47,7 @@ class WriteBuffer { std::atomic memory_used_; // No copying allowed - WriteBuffer(const WriteBuffer&); - void operator=(const WriteBuffer&); + WriteBufferManager(const WriteBufferManager&) = delete; + WriteBufferManager& operator=(const WriteBufferManager&) = delete; }; - } // namespace rocksdb diff --git a/java/rocksjni/write_batch.cc b/java/rocksjni/write_batch.cc index 017787209..2ae958702 100644 --- a/java/rocksjni/write_batch.cc +++ b/java/rocksjni/write_batch.cc @@ -7,19 +7,19 @@ // calling c++ rocksdb::WriteBatch methods from Java side. #include +#include "db/memtable.h" +#include "db/write_batch_internal.h" #include "include/org_rocksdb_WriteBatch.h" #include "include/org_rocksdb_WriteBatch_Handler.h" -#include "rocksjni/portal.h" -#include "rocksjni/writebatchhandlerjnicallback.h" #include "rocksdb/db.h" -#include "rocksdb/immutable_options.h" -#include "db/memtable.h" -#include "rocksdb/write_batch.h" -#include "rocksdb/status.h" -#include "db/write_batch_internal.h" -#include "db/writebuffer.h" #include "rocksdb/env.h" +#include "rocksdb/immutable_options.h" #include "rocksdb/memtablerep.h" +#include "rocksdb/status.h" +#include "rocksdb/write_batch.h" +#include "rocksdb/write_buffer_manager.h" +#include "rocksjni/portal.h" +#include "rocksjni/writebatchhandlerjnicallback.h" #include "table/scoped_arena_iterator.h" #include "util/logging.h" #include "util/testharness.h" diff --git a/java/rocksjni/write_batch_test.cc b/java/rocksjni/write_batch_test.cc index 4e8705967..371744e4f 100644 --- a/java/rocksjni/write_batch_test.cc +++ b/java/rocksjni/write_batch_test.cc @@ -9,17 +9,17 @@ #include "db/memtable.h" #include "db/write_batch_internal.h" -#include "db/writebuffer.h" #include "include/org_rocksdb_WriteBatch.h" -#include "include/org_rocksdb_WriteBatch_Handler.h" #include "include/org_rocksdb_WriteBatchTest.h" #include "include/org_rocksdb_WriteBatchTestInternalHelper.h" +#include "include/org_rocksdb_WriteBatch_Handler.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/immutable_options.h" #include "rocksdb/memtablerep.h" #include "rocksdb/status.h" #include "rocksdb/write_batch.h" +#include "rocksdb/write_buffer_manager.h" #include "rocksjni/portal.h" #include "table/scoped_arena_iterator.h" #include "util/logging.h" @@ -42,7 +42,7 @@ jbyteArray Java_org_rocksdb_WriteBatchTest_getContents( rocksdb::InternalKeyComparator cmp(rocksdb::BytewiseComparator()); auto factory = std::make_shared(); rocksdb::Options options; - rocksdb::WriteBuffer wb(options.db_write_buffer_size); + rocksdb::WriteBufferManager wb(options.db_write_buffer_size); options.memtable_factory = factory; rocksdb::MemTable* mem = new rocksdb::MemTable( cmp, rocksdb::ImmutableCFOptions(options), diff --git a/table/table_test.cc b/table/table_test.cc index d027dfd99..53aaad532 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -20,7 +20,6 @@ #include "db/dbformat.h" #include "db/memtable.h" #include "db/write_batch_internal.h" -#include "db/writebuffer.h" #include "memtable/stl_wrappers.h" #include "rocksdb/cache.h" #include "rocksdb/db.h" @@ -30,6 +29,7 @@ #include "rocksdb/perf_context.h" #include "rocksdb/slice_transform.h" #include "rocksdb/statistics.h" +#include "rocksdb/write_buffer_manager.h" #include "table/block.h" #include "table/block_based_table_builder.h" #include "table/block_based_table_factory.h" @@ -400,10 +400,10 @@ uint64_t TableConstructor::cur_uniq_id_ = 1; class MemTableConstructor: public Constructor { public: - explicit MemTableConstructor(const Comparator* cmp, WriteBuffer* wb) + explicit MemTableConstructor(const Comparator* cmp, WriteBufferManager* wb) : Constructor(cmp), internal_comparator_(cmp), - write_buffer_(wb), + write_buffer_manager_(wb), table_factory_(new SkipListFactory) { options_.memtable_factory = table_factory_; ImmutableCFOptions ioptions(options_); @@ -423,7 +423,7 @@ class MemTableConstructor: public Constructor { ImmutableCFOptions mem_ioptions(ioptions); memtable_ = new MemTable(internal_comparator_, mem_ioptions, MutableCFOptions(options_, mem_ioptions), - write_buffer_, kMaxSequenceNumber); + write_buffer_manager_, kMaxSequenceNumber); memtable_->Ref(); int seq = 1; for (const auto kv : kv_map) { @@ -445,7 +445,7 @@ class MemTableConstructor: public Constructor { mutable Arena arena_; InternalKeyComparator internal_comparator_; Options options_; - WriteBuffer* write_buffer_; + WriteBufferManager* write_buffer_manager_; MemTable* memtable_; std::shared_ptr table_factory_; }; @@ -941,7 +941,7 @@ class HarnessTest : public testing::Test { ImmutableCFOptions ioptions_; BlockBasedTableOptions table_options_ = BlockBasedTableOptions(); Constructor* constructor_; - WriteBuffer write_buffer_; + WriteBufferManager write_buffer_; bool support_prev_; bool only_support_prefix_seek_; shared_ptr internal_comparator_; @@ -2237,7 +2237,7 @@ TEST_F(MemTableTest, Simple) { Options options; options.memtable_factory = table_factory; ImmutableCFOptions ioptions(options); - WriteBuffer wb(options.db_write_buffer_size); + WriteBufferManager wb(options.db_write_buffer_size); MemTable* memtable = new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb, kMaxSequenceNumber); diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 90438af58..406b873ab 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -17,11 +17,11 @@ #include "db/filename.h" #include "db/log_reader.h" #include "db/write_batch_internal.h" -#include "db/writebuffer.h" #include "port/dirent.h" #include "rocksdb/cache.h" #include "rocksdb/table_properties.h" #include "rocksdb/write_batch.h" +#include "rocksdb/write_buffer_manager.h" #include "table/scoped_arena_iterator.h" #include "tools/ldb_cmd_impl.h" #include "tools/sst_dump_tool_imp.h" @@ -868,7 +868,7 @@ void DumpManifestFile(std::string file, bool verbose, bool hex, bool json) { options.db_paths.emplace_back("dummy", 0); options.num_levels = 64; WriteController wc(options.delayed_write_rate); - WriteBuffer wb(options.db_write_buffer_size); + WriteBufferManager wb(options.db_write_buffer_size); VersionSet versions(dbname, &options, sopt, tc.get(), &wb, &wc); Status s = versions.DumpManifest(options, file, verbose, hex, json); if (!s.ok()) { @@ -1578,7 +1578,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt, NewLRUCache(opt.max_open_files - 10, opt.table_cache_numshardbits)); const InternalKeyComparator cmp(opt.comparator); WriteController wc(opt.delayed_write_rate); - WriteBuffer wb(opt.db_write_buffer_size); + WriteBufferManager wb(opt.db_write_buffer_size); VersionSet versions(db_path_, &opt, soptions, tc.get(), &wb, &wc); std::vector dummy; ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName, diff --git a/util/options.cc b/util/options.cc index 6bb3be4a9..16c7a7fa7 100644 --- a/util/options.cc +++ b/util/options.cc @@ -318,6 +318,7 @@ DBOptions::DBOptions(const Options& options) stats_dump_period_sec(options.stats_dump_period_sec), advise_random_on_open(options.advise_random_on_open), db_write_buffer_size(options.db_write_buffer_size), + write_buffer_manager(options.write_buffer_manager), access_hint_on_compaction_start(options.access_hint_on_compaction_start), new_table_reader_for_compaction_inputs( options.new_table_reader_for_compaction_inputs), diff --git a/util/options_settable_test.cc b/util/options_settable_test.cc index 2cfe065ac..b23e270e0 100644 --- a/util/options_settable_test.cc +++ b/util/options_settable_test.cc @@ -197,6 +197,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { {offsetof(struct DBOptions, db_paths), sizeof(std::vector)}, {offsetof(struct DBOptions, db_log_dir), sizeof(std::string)}, {offsetof(struct DBOptions, wal_dir), sizeof(std::string)}, + {offsetof(struct DBOptions, write_buffer_manager), + sizeof(std::shared_ptr)}, {offsetof(struct DBOptions, listeners), sizeof(std::vector>)}, {offsetof(struct DBOptions, row_cache), sizeof(std::shared_ptr)},