From a14b7873ee85f12150e3ef544cd535c322c864f9 Mon Sep 17 00:00:00 2001 From: Jonah Cohen Date: Tue, 2 Dec 2014 12:09:20 -0800 Subject: [PATCH] Enforce write buffer memory limit across column families Summary: Introduces a new class for managing write buffer memory across column families. We supplement ColumnFamilyOptions::write_buffer_size with ColumnFamilyOptions::write_buffer, a shared pointer to a WriteBuffer instance that enforces memory limits before flushing out to disk. Test Plan: Added SharedWriteBuffer unit test to db_test.cc Reviewers: sdong, rven, ljin, igor Reviewed By: igor Subscribers: tnovak, yhchiang, dhruba, xjin, MarkCallaghan, yoshinorim Differential Revision: https://reviews.facebook.net/D22581 --- HISTORY.md | 1 + db/c.cc | 5 ++ db/column_family.cc | 24 ++++++++-- db/column_family.h | 9 +++- db/compaction_job_test.cc | 6 ++- db/db_bench.cc | 4 ++ db/db_impl.cc | 25 ++++++++-- db/db_impl.h | 3 ++ db/db_test.cc | 86 +++++++++++++++++++++++++++++++++-- db/flush_job_test.cc | 10 ++-- db/log_and_apply_bench.cc | 4 +- db/memtable.cc | 12 +++-- db/memtable.h | 14 ++++-- db/memtable_allocator.cc | 52 +++++++++++++++++++++ db/memtable_allocator.h | 47 +++++++++++++++++++ db/repair.cc | 4 +- db/skiplist.h | 24 +++++----- db/version_set.cc | 8 +++- db/version_set.h | 3 +- db/wal_manager_test.cc | 6 ++- db/write_batch_test.cc | 4 +- db/writebuffer.h | 44 ++++++++++++++++++ include/rocksdb/memtablerep.h | 22 +++++---- include/rocksdb/options.h | 15 ++++++ table/bloom_block.h | 7 +-- table/table_test.cc | 31 ++++++++----- tools/db_stress.cc | 5 ++ util/allocator.h | 32 +++++++++++++ util/arena.h | 14 +++--- util/dynamic_bloom.cc | 13 +++--- util/dynamic_bloom.h | 11 +++-- util/dynamic_bloom_test.cc | 1 + util/hash_cuckoo_rep.cc | 22 +++++---- util/hash_cuckoo_rep.h | 2 +- util/hash_linklist_rep.cc | 44 +++++++++--------- util/hash_linklist_rep.h | 2 +- util/hash_skiplist_rep.cc | 29 ++++++------ util/hash_skiplist_rep.h | 2 +- util/ldb_cmd.cc | 19 +++++++- util/ldb_cmd.h | 1 + util/ldb_tool.cc | 2 + util/options.cc | 5 ++ util/options_helper.cc | 2 + util/skiplistrep.cc | 11 +++-- util/vectorrep.cc | 12 +++-- 45 files changed, 551 insertions(+), 148 deletions(-) create mode 100644 db/memtable_allocator.cc create mode 100644 db/memtable_allocator.h create mode 100644 db/writebuffer.h create mode 100644 util/allocator.h diff --git a/HISTORY.md b/HISTORY.md index 93170fa6f..f2b5bf873 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -8,6 +8,7 @@ database which is an image of the existing database. *New API LinkFile added to Env. If you implement your own Env class, an implementation of the API LinkFile will have to be provided. 
+* MemTableRep takes MemTableAllocator instead of Arena ## 3.8.0 (11/14/2014) diff --git a/db/c.cc b/db/c.cc index 857f4e654..76a949cd1 100644 --- a/db/c.cc +++ b/db/c.cc @@ -1264,6 +1264,11 @@ void rocksdb_options_set_info_log_level( opt->rep.info_log_level = static_cast(v); } +void rocksdb_options_set_db_write_buffer_size(rocksdb_options_t* opt, + size_t s) { + opt->rep.db_write_buffer_size = s; +} + void rocksdb_options_set_write_buffer_size(rocksdb_options_t* opt, size_t s) { opt->rep.write_buffer_size = s; } diff --git a/db/column_family.cc b/db/column_family.cc index 7ba5ad763..f07c741a4 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -21,6 +21,9 @@ #include "db/compaction_picker.h" #include "db/db_impl.h" +#include "db/job_context.h" +#include "db/version_set.h" +#include "db/writebuffer.h" #include "db/internal_stats.h" #include "db/job_context.h" #include "db/table_properties_collector.h" @@ -223,6 +226,7 @@ void SuperVersionUnrefHandle(void* ptr) { ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name, Version* _dummy_versions, Cache* _table_cache, + WriteBuffer* write_buffer, const ColumnFamilyOptions& cf_options, const DBOptions* db_options, const EnvOptions& env_options, @@ -237,6 +241,7 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name, options_(*db_options, SanitizeOptions(&internal_comparator_, cf_options)), ioptions_(options_), mutable_cf_options_(options_, ioptions_), + write_buffer_(write_buffer), mem_(nullptr), imm_(options_.min_write_buffer_number_to_merge), super_version_(nullptr), @@ -413,13 +418,19 @@ void ColumnFamilyData::SetCurrent(Version* current_version) { current_ = current_version; } -void ColumnFamilyData::CreateNewMemtable( +MemTable* ColumnFamilyData::ConstructNewMemtable( const MutableCFOptions& mutable_cf_options) { assert(current_ != nullptr); + return new MemTable(internal_comparator_, ioptions_, + mutable_cf_options, write_buffer_); +} + +void ColumnFamilyData::CreateNewMemtable( + const MutableCFOptions& mutable_cf_options) { if (mem_ != nullptr) { delete mem_->Unref(); } - mem_ = new MemTable(internal_comparator_, ioptions_, mutable_cf_options); + SetMemtable(ConstructNewMemtable(mutable_cf_options)); mem_->Ref(); } @@ -600,9 +611,10 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname, const DBOptions* db_options, const EnvOptions& env_options, Cache* table_cache, + WriteBuffer* write_buffer, WriteController* write_controller) : max_column_family_(0), - dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, + dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr, ColumnFamilyOptions(), db_options, env_options, nullptr)), default_cfd_cache_(nullptr), @@ -610,6 +622,7 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname, db_options_(db_options), env_options_(env_options), table_cache_(table_cache), + write_buffer_(write_buffer), write_controller_(write_controller), spin_lock_(ATOMIC_FLAG_INIT) { // initialize linked list @@ -674,8 +687,9 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily( const ColumnFamilyOptions& options) { assert(column_families_.find(name) == column_families_.end()); ColumnFamilyData* new_cfd = - new ColumnFamilyData(id, name, dummy_versions, table_cache_, options, - db_options_, env_options_, this); + new ColumnFamilyData(id, name, dummy_versions, table_cache_, + write_buffer_, options, db_options_, + env_options_, this); Lock(); column_families_.insert({name, id}); column_family_data_.insert({id, new_cfd}); diff --git 
a/db/column_family.h b/db/column_family.h index c6d49e71b..51ccd99ac 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -201,8 +201,9 @@ class ColumnFamilyData { MemTable* mem() { return mem_; } Version* current() { return current_; } Version* dummy_versions() { return dummy_versions_; } - void SetMemtable(MemTable* new_mem) { mem_ = new_mem; } void SetCurrent(Version* current); + MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options); + void SetMemtable(MemTable* new_mem) { mem_ = new_mem; } void CreateNewMemtable(const MutableCFOptions& mutable_cf_options); TableCache* table_cache() const { return table_cache_.get(); } @@ -264,6 +265,7 @@ class ColumnFamilyData { friend class ColumnFamilySet; ColumnFamilyData(uint32_t id, const std::string& name, Version* dummy_versions, Cache* table_cache, + WriteBuffer* write_buffer, const ColumnFamilyOptions& options, const DBOptions* db_options, const EnvOptions& env_options, ColumnFamilySet* column_family_set); @@ -294,6 +296,8 @@ class ColumnFamilyData { std::unique_ptr internal_stats_; + WriteBuffer* write_buffer_; + MemTable* mem_; MemTableList imm_; SuperVersion* super_version_; @@ -366,7 +370,7 @@ class ColumnFamilySet { ColumnFamilySet(const std::string& dbname, const DBOptions* db_options, const EnvOptions& env_options, Cache* table_cache, - WriteController* write_controller); + WriteBuffer* write_buffer, WriteController* write_controller); ~ColumnFamilySet(); ColumnFamilyData* GetDefault() const; @@ -421,6 +425,7 @@ class ColumnFamilySet { const DBOptions* const db_options_; const EnvOptions env_options_; Cache* table_cache_; + WriteBuffer* write_buffer_; WriteController* write_controller_; std::atomic_flag spin_lock_; }; diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index 712471657..1db802813 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -9,6 +9,7 @@ #include "db/compaction_job.h" #include "db/column_family.h" #include "db/version_set.h" +#include "db/writebuffer.h" #include "rocksdb/cache.h" #include "rocksdb/options.h" #include "rocksdb/db.h" @@ -26,8 +27,10 @@ class CompactionJobTest { dbname_(test::TmpDir() + "/compaction_job_test"), mutable_cf_options_(Options(), ImmutableCFOptions(Options())), table_cache_(NewLRUCache(50000, 16, 8)), + write_buffer_(db_options_.db_write_buffer_size), versions_(new VersionSet(dbname_, &db_options_, env_options_, - table_cache_.get(), &write_controller_)), + table_cache_.get(), &write_buffer_, + &write_controller_)), shutting_down_(false), mock_table_factory_(new mock::MockTableFactory()) { ASSERT_OK(env_->CreateDirIfMissing(dbname_)); @@ -125,6 +128,7 @@ class CompactionJobTest { WriteController write_controller_; DBOptions db_options_; ColumnFamilyOptions cf_options_; + WriteBuffer write_buffer_; std::unique_ptr versions_; port::Mutex mutex_; std::atomic shutting_down_; diff --git a/db/db_bench.cc b/db/db_bench.cc index 6e5b63f24..c7fd0365c 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -198,6 +198,9 @@ DEFINE_bool(enable_numa, false, "CPU and memory of same node. 
Use \"$numactl --hardware\" command " "to see NUMA memory architecture."); +DEFINE_int64(db_write_buffer_size, rocksdb::Options().db_write_buffer_size, + "Number of bytes to buffer in all memtables before compacting"); + DEFINE_int64(write_buffer_size, rocksdb::Options().write_buffer_size, "Number of bytes to buffer in memtable before compacting"); @@ -1834,6 +1837,7 @@ class Benchmark { Options options; options.create_if_missing = !FLAGS_use_existing_db; options.create_missing_column_families = FLAGS_num_column_families > 1; + options.db_write_buffer_size = FLAGS_db_write_buffer_size; options.write_buffer_size = FLAGS_write_buffer_size; options.max_write_buffer_number = FLAGS_max_write_buffer_number; options.min_write_buffer_number_to_merge = diff --git a/db/db_impl.cc b/db/db_impl.cc index 99a386e76..bdc0030ae 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -44,6 +44,7 @@ #include "db/forward_iterator.h" #include "db/transaction_log_impl.h" #include "db/version_set.h" +#include "db/writebuffer.h" #include "db/write_batch_internal.h" #include "port/port.h" #include "rocksdb/cache.h" @@ -201,6 +202,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) default_cf_handle_(nullptr), total_log_size_(0), max_total_in_memory_state_(0), + write_buffer_(options.db_write_buffer_size), tmp_batch_(), bg_schedule_needed_(false), bg_compaction_scheduled_(0), @@ -231,7 +233,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) db_options_.table_cache_remove_scan_count_limit); versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, - table_cache_.get(), &write_controller_)); + table_cache_.get(), &write_buffer_, + &write_controller_)); column_family_memtables_.reset(new ColumnFamilyMemTablesImpl( versions_->GetColumnFamilySet(), &flush_scheduler_)); @@ -2823,6 +2826,23 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { } } MaybeScheduleFlushOrCompaction(); + } else if (UNLIKELY(write_buffer_.ShouldFlush())) { + Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, + "Flushing all column families. 
Write buffer is using %" PRIu64 + " bytes out of a total of %" PRIu64 ".", + write_buffer_.memory_usage(), write_buffer_.buffer_size()); + // no need to refcount because drop is happening in write thread, so can't + // happen while we're in the write thread + for (auto cfd : *versions_->GetColumnFamilySet()) { + if (!cfd->mem()->IsEmpty()) { + status = SetNewMemtableAndNewLogFile(cfd, &context); + if (!status.ok()) { + break; + } + cfd->imm()->FlushRequested(); + } + } + MaybeScheduleFlushOrCompaction(); } if (UNLIKELY(status.ok() && !bg_error_.ok())) { @@ -3030,8 +3050,7 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, } if (s.ok()) { - new_mem = new MemTable(cfd->internal_comparator(), *cfd->ioptions(), - mutable_cf_options); + new_mem = cfd->ConstructNewMemtable(mutable_cf_options); new_superversion = new SuperVersion(); } } diff --git a/db/db_impl.h b/db/db_impl.h index 1217610b5..c2c3969c1 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -24,6 +24,7 @@ #include "db/column_family.h" #include "db/version_edit.h" #include "db/wal_manager.h" +#include "db/writebuffer.h" #include "memtable_list.h" #include "port/port.h" #include "rocksdb/db.h" @@ -436,6 +437,8 @@ class DBImpl : public DB { std::unique_ptr db_directory_; + WriteBuffer write_buffer_; + WriteThread write_thread_; WriteBatch tmp_batch_; diff --git a/db/db_test.cc b/db/db_test.cc index de7132e58..ccc7597a2 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3445,7 +3445,7 @@ class ChangeFilterFactory : public CompactionFilterFactory { // TODO(kailiu) The tests on UniversalCompaction has some issues: // 1. A lot of magic numbers ("11" or "12"). -// 2. Made assumption on the memtable flush conidtions, which may change from +// 2. Made assumption on the memtable flush conditions, which may change from // time to time. TEST(DBTest, UniversalCompactionTrigger) { Options options; @@ -3521,7 +3521,7 @@ TEST(DBTest, UniversalCompactionTrigger) { } dbfull()->TEST_WaitForCompact(); // Before compaction, we have 4 files at level 0, with size 4, 0.4, 1, 1. - // After comapction, we should have 2 files, with size 4, 2.4. + // After compaction, we should have 2 files, with size 4, 2.4. ASSERT_EQ(NumTableFilesAtLevel(0, 1), 2); for (int i = 1; i < options.num_levels ; i++) { ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0); @@ -3549,7 +3549,7 @@ TEST(DBTest, UniversalCompactionTrigger) { } dbfull()->TEST_WaitForCompact(); // Before compaction, we have 4 files at level 0, with size 4, 2.4, 1, 1. - // After comapction, we should have 3 files, with size 4, 2.4, 2. + // After compaction, we should have 3 files, with size 4, 2.4, 2. 
ASSERT_EQ(NumTableFilesAtLevel(0, 1), 3); for (int i = 1; i < options.num_levels ; i++) { ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0); @@ -6802,6 +6802,86 @@ TEST(DBTest, RecoverCheckFileAmount) { } } +TEST(DBTest, SharedWriteBuffer) { + Options options; + options.db_write_buffer_size = 100000; // this is the real limit + options.write_buffer_size = 500000; // this is never hit + CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options); + + // Trigger a flush on every CF + ASSERT_OK(Put(0, Key(1), DummyString(1))); + ASSERT_OK(Put(1, Key(1), DummyString(1))); + ASSERT_OK(Put(3, Key(1), DummyString(90000))); + ASSERT_OK(Put(2, Key(2), DummyString(20000))); + ASSERT_OK(Put(2, Key(1), DummyString(1))); + dbfull()->TEST_WaitForFlushMemTable(handles_[0]); + dbfull()->TEST_WaitForFlushMemTable(handles_[1]); + dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + dbfull()->TEST_WaitForFlushMemTable(handles_[3]); + { + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), + static_cast(1)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), + static_cast(1)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), + static_cast(1)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), + static_cast(1)); + } + + // Flush 'dobrynia' and 'nikitich' + ASSERT_OK(Put(2, Key(2), DummyString(50000))); + ASSERT_OK(Put(3, Key(2), DummyString(40000))); + ASSERT_OK(Put(2, Key(3), DummyString(20000))); + ASSERT_OK(Put(3, Key(2), DummyString(40000))); + dbfull()->TEST_WaitForFlushMemTable(handles_[1]); + dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + dbfull()->TEST_WaitForFlushMemTable(handles_[3]); + { + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), + static_cast(1)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), + static_cast(1)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), + static_cast(2)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), + static_cast(2)); + } + + // Make 'dobrynia' and 'nikitich' both take up 40% of space + // When 'pikachu' puts us over 100%, all 3 flush. + ASSERT_OK(Put(2, Key(2), DummyString(40000))); + ASSERT_OK(Put(1, Key(2), DummyString(20000))); + ASSERT_OK(Put(0, Key(1), DummyString(1))); + dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + dbfull()->TEST_WaitForFlushMemTable(handles_[3]); + { + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), + static_cast(1)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), + static_cast(2)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), + static_cast(3)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), + static_cast(3)); + } + + // Some remaining writes so 'default' and 'nikitich' flush on closure. 
+ ASSERT_OK(Put(3, Key(1), DummyString(1))); + ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"}, + options); + { + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), + static_cast(2)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), + static_cast(2)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), + static_cast(3)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), + static_cast(4)); + } +} + TEST(DBTest, PurgeInfoLogs) { Options options = CurrentOptions(); options.keep_log_file_num = 5; diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index aee3fd1a8..7d779b58f 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -9,6 +9,7 @@ #include "db/flush_job.h" #include "db/column_family.h" #include "db/version_set.h" +#include "db/writebuffer.h" #include "rocksdb/cache.h" #include "util/testharness.h" #include "util/testutil.h" @@ -25,8 +26,10 @@ class FlushJobTest { : env_(Env::Default()), dbname_(test::TmpDir() + "/flush_job_test"), table_cache_(NewLRUCache(50000, 16, 8)), + write_buffer_(db_options_.db_write_buffer_size), versions_(new VersionSet(dbname_, &db_options_, env_options_, - table_cache_.get(), &write_controller_)), + table_cache_.get(), &write_buffer_, + &write_controller_)), shutting_down_(false), mock_table_factory_(new mock::MockTableFactory()) { ASSERT_OK(env_->CreateDirIfMissing(dbname_)); @@ -69,6 +72,7 @@ class FlushJobTest { std::shared_ptr table_cache_; WriteController write_controller_; DBOptions db_options_; + WriteBuffer write_buffer_; ColumnFamilyOptions cf_options_; std::unique_ptr versions_; port::Mutex mutex_; @@ -91,9 +95,7 @@ TEST(FlushJobTest, Empty) { TEST(FlushJobTest, NonEmpty) { JobContext job_context; auto cfd = versions_->GetColumnFamilySet()->GetDefault(); - - auto new_mem = new MemTable(cfd->internal_comparator(), *cfd->ioptions(), - *cfd->GetLatestMutableCFOptions()); + auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions()); new_mem->Ref(); std::map inserted_keys; for (int i = 1; i < 10000; ++i) { diff --git a/db/log_and_apply_bench.cc b/db/log_and_apply_bench.cc index 417a2a8d7..b55ec0539 100644 --- a/db/log_and_apply_bench.cc +++ b/db/log_and_apply_bench.cc @@ -15,6 +15,7 @@ #include "util/benchharness.h" #include "db/version_set.h" #include "db/write_controller.h" +#include "db/writebuffer.h" #include "util/mutexlock.h" namespace rocksdb { @@ -52,9 +53,10 @@ void BM_LogAndApply(int iters, int num_base_files) { // Notice we are using the default options not through SanitizeOptions(). // We might want to initialize some options manually if needed. options.db_paths.emplace_back(dbname, 0); + WriteBuffer wb(options.db_write_buffer_size); // The parameter of table cache is passed in as null, so any file I/O // operation is likely to fail. 
- vset = new VersionSet(dbname, &options, sopt, nullptr, &wc); + vset = new VersionSet(dbname, &options, sopt, nullptr, &wb, &wc); std::vector dummy; dummy.push_back(ColumnFamilyDescriptor()); ASSERT_OK(vset->Recover(dummy)); diff --git a/db/memtable.cc b/db/memtable.cc index 98212a61b..6dcacc421 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -15,6 +15,7 @@ #include "db/dbformat.h" #include "db/merge_context.h" +#include "db/writebuffer.h" #include "rocksdb/comparator.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" @@ -52,14 +53,17 @@ MemTableOptions::MemTableOptions( MemTable::MemTable(const InternalKeyComparator& cmp, const ImmutableCFOptions& ioptions, - const MutableCFOptions& mutable_cf_options) + const MutableCFOptions& mutable_cf_options, + WriteBuffer* write_buffer) : comparator_(cmp), moptions_(ioptions, mutable_cf_options), refs_(0), kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)), arena_(moptions_.arena_block_size), + allocator_(&arena_, write_buffer), table_(ioptions.memtable_factory->CreateMemTableRep( - comparator_, &arena_, ioptions.prefix_extractor, ioptions.info_log)), + comparator_, &allocator_, ioptions.prefix_extractor, + ioptions.info_log)), num_entries_(0), flush_in_progress_(false), flush_completed_(false), @@ -76,7 +80,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, assert(!should_flush_); if (prefix_extractor_ && moptions_.memtable_prefix_bloom_bits > 0) { prefix_bloom_.reset(new DynamicBloom( - &arena_, + &allocator_, moptions_.memtable_prefix_bloom_bits, ioptions.bloom_locality, moptions_.memtable_prefix_bloom_probes, nullptr, moptions_.memtable_prefix_bloom_huge_page_tlb_size, @@ -179,7 +183,7 @@ Slice MemTableRep::UserKey(const char* key) const { } KeyHandle MemTableRep::Allocate(const size_t len, char** buf) { - *buf = arena_->Allocate(len); + *buf = allocator_->Allocate(len); return static_cast(*buf); } diff --git a/db/memtable.h b/db/memtable.h index 96af1e90a..0c1f0de1a 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -19,16 +19,17 @@ #include "rocksdb/db.h" #include "rocksdb/memtablerep.h" #include "rocksdb/immutable_options.h" +#include "db/memtable_allocator.h" #include "util/arena.h" #include "util/dynamic_bloom.h" #include "util/mutable_cf_options.h" namespace rocksdb { -class Arena; class Mutex; class MemTableIterator; class MergeContext; +class WriteBuffer; struct MemTableOptions { explicit MemTableOptions( @@ -67,7 +68,8 @@ class MemTable { // is zero and the caller must call Ref() at least once. explicit MemTable(const InternalKeyComparator& comparator, const ImmutableCFOptions& ioptions, - const MutableCFOptions& mutable_cf_options); + const MutableCFOptions& mutable_cf_options, + WriteBuffer* write_buffer); ~MemTable(); @@ -183,7 +185,10 @@ class MemTable { void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; } // Notify the underlying storage that no more items will be added - void MarkImmutable() { table_->MarkReadOnly(); } + void MarkImmutable() { + table_->MarkReadOnly(); + allocator_.DoneAllocating(); + } // return true if the current MemTableRep supports merge operator. 
bool IsMergeOperatorSupported() const { @@ -200,8 +205,6 @@ class MemTable { return comparator_.comparator; } - const Arena& TEST_GetArena() const { return arena_; } - const MemTableOptions* GetMemTableOptions() const { return &moptions_; } private: @@ -217,6 +220,7 @@ class MemTable { int refs_; const size_t kArenaBlockSize; Arena arena_; + MemTableAllocator allocator_; unique_ptr table_; uint64_t num_entries_; diff --git a/db/memtable_allocator.cc b/db/memtable_allocator.cc new file mode 100644 index 000000000..d3ecea2fd --- /dev/null +++ b/db/memtable_allocator.cc @@ -0,0 +1,52 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include + +#include "db/memtable_allocator.h" +#include "db/writebuffer.h" +#include "util/arena.h" + +namespace rocksdb { + +MemTableAllocator::MemTableAllocator(Arena* arena, WriteBuffer* write_buffer) + : arena_(arena), write_buffer_(write_buffer), bytes_allocated_(0) { +} + +MemTableAllocator::~MemTableAllocator() { + DoneAllocating(); +} + +char* MemTableAllocator::Allocate(size_t bytes) { + assert(write_buffer_ != nullptr); + bytes_allocated_ += bytes; + write_buffer_->ReserveMem(bytes); + return arena_->Allocate(bytes); +} + +char* MemTableAllocator::AllocateAligned(size_t bytes, size_t huge_page_size, + Logger* logger) { + assert(write_buffer_ != nullptr); + bytes_allocated_ += bytes; + write_buffer_->ReserveMem(bytes); + return arena_->AllocateAligned(bytes, huge_page_size, logger); +} + +void MemTableAllocator::DoneAllocating() { + if (write_buffer_ != nullptr) { + write_buffer_->FreeMem(bytes_allocated_); + write_buffer_ = nullptr; + } +} + +size_t MemTableAllocator::BlockSize() const { + return arena_->BlockSize(); +} + +} // namespace rocksdb diff --git a/db/memtable_allocator.h b/db/memtable_allocator.h new file mode 100644 index 000000000..fa8ee1287 --- /dev/null +++ b/db/memtable_allocator.h @@ -0,0 +1,47 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// This is used by the MemTable to allocate write buffer memory. It connects +// to WriteBuffer so we can track and enforce overall write buffer limits. 
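For illustration only (not part of the patch): the lifecycle implemented in memtable_allocator.cc above, assuming `arena` and `write_buffer` outlive the allocator:

    MemTableAllocator allocator(&arena, write_buffer);
    char* buf = allocator.Allocate(len);  // ReserveMem(len) on the shared
                                          // WriteBuffer, then arena.Allocate(len)
    allocator.DoneAllocating();           // hands all reserved bytes back via
                                          // WriteBuffer::FreeMem()

DoneAllocating() nulls out its WriteBuffer pointer after freeing, so the destructor's second call is a harmless no-op.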
+ +#pragma once +#include "util/allocator.h" + +namespace rocksdb { + +class Arena; +class Logger; +class WriteBuffer; + +class MemTableAllocator : public Allocator { + public: + explicit MemTableAllocator(Arena* arena, WriteBuffer* write_buffer); + ~MemTableAllocator(); + + // Allocator interface + char* Allocate(size_t bytes) override; + char* AllocateAligned(size_t bytes, size_t huge_page_size = 0, + Logger* logger = nullptr) override; + size_t BlockSize() const override; + + // Call when we're finished allocating memory so we can free it from + // the write buffer's limit. + void DoneAllocating(); + + private: + Arena* arena_; + WriteBuffer* write_buffer_; + size_t bytes_allocated_; + + // No copying allowed + MemTableAllocator(const MemTableAllocator&); + void operator=(const MemTableAllocator&); +}; + +} // namespace rocksdb diff --git a/db/repair.cc b/db/repair.cc index 8fa312638..3b5952dd0 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -45,6 +45,7 @@ #include "db/memtable.h" #include "db/table_cache.h" #include "db/version_edit.h" +#include "db/writebuffer.h" #include "db/write_batch_internal.h" #include "rocksdb/comparator.h" #include "rocksdb/db.h" @@ -220,8 +221,9 @@ class Repairer { std::string scratch; Slice record; WriteBatch batch; + WriteBuffer wb(options_.db_write_buffer_size); MemTable* mem = new MemTable(icmp_, ioptions_, - MutableCFOptions(options_, ioptions_)); + MutableCFOptions(options_, ioptions_), &wb); auto cf_mems_default = new ColumnFamilyMemTablesDefault(mem); mem->Ref(); int counter = 0; diff --git a/db/skiplist.h b/db/skiplist.h index 4ee4ed714..c1e375007 100644 --- a/db/skiplist.h +++ b/db/skiplist.h @@ -34,9 +34,8 @@ #include #include #include -#include "util/arena.h" #include "port/port.h" -#include "util/arena.h" +#include "util/allocator.h" #include "util/random.h" namespace rocksdb { @@ -48,9 +47,9 @@ class SkipList { public: // Create a new SkipList object that will use "cmp" for comparing keys, - // and will allocate memory using "*arena". Objects allocated in the arena - // must remain allocated for the lifetime of the skiplist object. - explicit SkipList(Comparator cmp, Arena* arena, + // and will allocate memory using "*allocator". Objects allocated in the + // allocator must remain allocated for the lifetime of the skiplist object. + explicit SkipList(Comparator cmp, Allocator* allocator, int32_t max_height = 12, int32_t branching_factor = 4); // Insert key into the list. 
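With the constructor above, any Allocator implementation can back the skiplist's nodes, not just a raw Arena. A minimal wiring sketch (illustrative only; `KeyCmp` is a placeholder comparator and the sizes are arbitrary):

    Arena arena;
    WriteBuffer wb(8 << 20);
    MemTableAllocator allocator(&arena, &wb);
    // Node memory now flows through the Allocator interface, so the shared
    // write buffer observes every byte the list allocates:
    SkipList<const char*, KeyCmp> list(KeyCmp(), &allocator);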
@@ -110,7 +109,7 @@ class SkipList { // Immutable after construction Comparator const compare_; - Arena* const arena_; // Arena used for allocations of nodes + Allocator* const allocator_; // Allocator used for allocations of nodes Node* const head_; @@ -196,7 +195,7 @@ struct SkipList::Node { template typename SkipList::Node* SkipList::NewNode(const Key& key, int height) { - char* mem = arena_->AllocateAligned( + char* mem = allocator_->AllocateAligned( sizeof(Node) + sizeof(std::atomic) * (height - 1)); return new (mem) Node(key); } @@ -356,23 +355,24 @@ typename SkipList::Node* SkipList::FindLast() } template -SkipList::SkipList(const Comparator cmp, Arena* arena, +SkipList::SkipList(const Comparator cmp, Allocator* allocator, int32_t max_height, int32_t branching_factor) : kMaxHeight_(max_height), kBranching_(branching_factor), compare_(cmp), - arena_(arena), + allocator_(allocator), head_(NewNode(0 /* any key will do */, max_height)), max_height_(1), prev_height_(1), rnd_(0xdeadbeef) { assert(kMaxHeight_ > 0); assert(kBranching_ > 0); - // Allocate the prev_ Node* array, directly from the passed-in arena. + // Allocate the prev_ Node* array, directly from the passed-in allocator. // prev_ does not need to be freed, as its life cycle is tied up with - // the arena as a whole. - prev_ = (Node**) arena_->AllocateAligned(sizeof(Node*) * kMaxHeight_); + // the allocator as a whole. + prev_ = reinterpret_cast( + allocator_->AllocateAligned(sizeof(Node*) * kMaxHeight_)); for (int i = 0; i < kMaxHeight_; i++) { head_->SetNext(i, nullptr); prev_[i] = head_; diff --git a/db/version_set.cc b/db/version_set.cc index f71ffce95..f138c8232 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -31,6 +31,7 @@ #include "db/table_cache.h" #include "db/compaction.h" #include "db/version_builder.h" +#include "db/writebuffer.h" #include "rocksdb/env.h" #include "rocksdb/merge_operator.h" #include "table/table_reader.h" @@ -1490,9 +1491,11 @@ struct VersionSet::ManifestWriter { VersionSet::VersionSet(const std::string& dbname, const DBOptions* db_options, const EnvOptions& storage_options, Cache* table_cache, + WriteBuffer* write_buffer, WriteController* write_controller) : column_family_set_(new ColumnFamilySet( - dbname, db_options, storage_options, table_cache, write_controller)), + dbname, db_options, storage_options, table_cache, + write_buffer, write_controller)), env_(db_options->env), dbname_(dbname), db_options_(db_options), @@ -2215,7 +2218,8 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, options->max_open_files - 10, options->table_cache_numshardbits, options->table_cache_remove_scan_count_limit)); WriteController wc; - VersionSet versions(dbname, options, env_options, tc.get(), &wc); + WriteBuffer wb(options->db_write_buffer_size); + VersionSet versions(dbname, options, env_options, tc.get(), &wb, &wc); Status status; std::vector dummy; diff --git a/db/version_set.h b/db/version_set.h index 04ad37773..6e645680b 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -50,6 +50,7 @@ class LookupKey; class MemTable; class Version; class VersionSet; +class WriteBuffer; class MergeContext; class ColumnFamilyData; class ColumnFamilySet; @@ -475,7 +476,7 @@ class VersionSet { public: VersionSet(const std::string& dbname, const DBOptions* db_options, const EnvOptions& env_options, Cache* table_cache, - WriteController* write_controller); + WriteBuffer* write_buffer, WriteController* write_controller); ~VersionSet(); // Apply *edit to the current version to form a new 
descriptor that diff --git a/db/wal_manager_test.cc b/db/wal_manager_test.cc index 5c12586c8..bc12012ba 100644 --- a/db/wal_manager_test.cc +++ b/db/wal_manager_test.cc @@ -13,6 +13,7 @@ #include "db/log_writer.h" #include "db/column_family.h" #include "db/version_set.h" +#include "db/writebuffer.h" #include "util/testharness.h" #include "util/testutil.h" #include "table/mock_table.h" @@ -28,6 +29,7 @@ class WalManagerTest { : env_(Env::Default()), dbname_(test::TmpDir() + "/wal_manager_test"), table_cache_(NewLRUCache(50000, 16, 8)), + write_buffer_(db_options_.db_write_buffer_size), current_log_number_(0) { DestroyDB(dbname_, Options()); } @@ -40,7 +42,8 @@ class WalManagerTest { db_options_.wal_dir = dbname_; versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, - table_cache_.get(), &write_controller_)); + table_cache_.get(), &write_buffer_, + &write_controller_)); wal_manager_.reset(new WalManager(db_options_, env_options_)); } @@ -93,6 +96,7 @@ class WalManagerTest { EnvOptions env_options_; std::shared_ptr table_cache_; DBOptions db_options_; + WriteBuffer write_buffer_; std::unique_ptr versions_; std::unique_ptr wal_manager_; diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index dbf65b6e9..e28d02aef 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -13,6 +13,7 @@ #include "db/memtable.h" #include "db/column_family.h" #include "db/write_batch_internal.h" +#include "db/writebuffer.h" #include "rocksdb/env.h" #include "rocksdb/memtablerep.h" #include "rocksdb/utilities/write_batch_with_index.h" @@ -28,8 +29,9 @@ static std::string PrintContents(WriteBatch* b) { Options options; options.memtable_factory = factory; ImmutableCFOptions ioptions(options); + WriteBuffer wb(options.db_write_buffer_size); MemTable* mem = new MemTable(cmp, ioptions, - MutableCFOptions(options, ioptions)); + MutableCFOptions(options, ioptions), &wb); mem->Ref(); std::string state; ColumnFamilyMemTablesDefault cf_mems_default(mem); diff --git a/db/writebuffer.h b/db/writebuffer.h new file mode 100644 index 000000000..7047a9244 --- /dev/null +++ b/db/writebuffer.h @@ -0,0 +1,44 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// WriteBuffer is for managing memory allocation for one or more MemTables. 
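For illustration only (not part of the patch), the accounting protocol the class below implements; every call is assumed to happen on the write thread, and `n` stands for bytes claimed by a MemTableAllocator:

    WriteBuffer wb(options.db_write_buffer_size);
    wb.ReserveMem(n);         // a memtable allocated n more bytes
    if (wb.ShouldFlush()) {   // usage reached a non-zero buffer_size_
      // DBImpl::Write swaps and flushes every non-empty memtable
    }
    wb.FreeMem(n);            // the memtable finished allocating

Note that a buffer_size_ of zero disables ShouldFlush() entirely, which is how the feature stays off by default.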
+ +#pragma once + +namespace rocksdb { + +class WriteBuffer { + public: + explicit WriteBuffer(size_t _buffer_size) + : buffer_size_(_buffer_size), memory_used_(0) {} + + ~WriteBuffer() {} + + size_t memory_usage() const { return memory_used_; } + size_t buffer_size() const { return buffer_size_; } + + // Should only be called from write thread + bool ShouldFlush() const { + return buffer_size() > 0 && memory_usage() >= buffer_size(); + } + + // Should only be called from write thread + void ReserveMem(size_t mem) { memory_used_ += mem; } + void FreeMem(size_t mem) { memory_used_ -= mem; } + + private: + const size_t buffer_size_; + size_t memory_used_; + + // No copying allowed + WriteBuffer(const WriteBuffer&); + void operator=(const WriteBuffer&); +}; + +} // namespace rocksdb diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index 8c2d7201b..97141cc73 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -14,8 +14,8 @@ // (4) Items are never deleted. // The liberal use of assertions is encouraged to enforce (1). // -// The factory will be passed an Arena object when a new MemTableRep is -// requested. The API for this object is in rocksdb/arena.h. +// The factory will be passed a MemTableAllocator object when a new MemTableRep +// is requested. // // Users can implement their own memtable representations. We include three // types built in: @@ -41,6 +41,7 @@ namespace rocksdb { class Arena; +class MemTableAllocator; class LookupKey; class Slice; class SliceTransform; @@ -65,7 +66,7 @@ class MemTableRep { virtual ~KeyComparator() { } }; - explicit MemTableRep(Arena* arena) : arena_(arena) {} + explicit MemTableRep(MemTableAllocator* allocator) : allocator_(allocator) {} // Allocate a buf of len size for storing key. The idea is that a specific // memtable representation knows its underlying data structure better. By @@ -101,7 +102,7 @@ class MemTableRep { bool (*callback_func)(void* arg, const char* entry)); // Report an approximation of how much memory has been used other than memory - // that was allocated through the arena. + // that was allocated through the allocator. virtual size_t ApproximateMemoryUsage() = 0; virtual ~MemTableRep() { } @@ -150,7 +151,7 @@ class MemTableRep { // Return an iterator that has a special Seek semantics. The result of // a Seek might only include keys with the same prefix as the target key. - // arena: If not null, the arena needs to be used to allocate the Iterator. + // arena: If not null, the arena is used to allocate the Iterator. // When destroying the iterator, the caller will not call "delete" // but Iterator::~Iterator() directly. The destructor needs to destroy // all the states but those allocated in arena. @@ -171,7 +172,7 @@ // user key.
virtual Slice UserKey(const char* key) const; - Arena* arena_; + MemTableAllocator* allocator_; }; // This is the base class for all factories that are used by RocksDB to create @@ -180,7 +181,8 @@ class MemTableRepFactory { public: virtual ~MemTableRepFactory() {} virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&, - Arena*, const SliceTransform*, + MemTableAllocator*, + const SliceTransform*, Logger* logger) = 0; virtual const char* Name() const = 0; }; @@ -197,7 +199,8 @@ class SkipListFactory : public MemTableRepFactory { explicit SkipListFactory(size_t lookahead = 0) : lookahead_(lookahead) {} virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&, - Arena*, const SliceTransform*, + MemTableAllocator*, + const SliceTransform*, Logger* logger) override; virtual const char* Name() const override { return "SkipListFactory"; } @@ -220,7 +223,8 @@ class VectorRepFactory : public MemTableRepFactory { public: explicit VectorRepFactory(size_t count = 0) : count_(count) { } virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&, - Arena*, const SliceTransform*, + MemTableAllocator*, + const SliceTransform*, Logger* logger) override; virtual const char* Name() const override { return "VectorRepFactory"; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 91c6604ae..09b72ca6b 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -205,6 +205,9 @@ struct ColumnFamilyOptions { // Also, a larger write buffer will result in a longer recovery time // the next time the database is opened. // + // Note that write_buffer_size is enforced per column family. + // See db_write_buffer_size for sharing memory across column families. + // // Default: 4MB // // Dynamically changeable through SetOptions() API @@ -859,6 +862,18 @@ struct DBOptions { // Default: true bool advise_random_on_open; + // Amount of data to build up in memtables across all column + // families before writing to disk. + // + // This is distinct from write_buffer_size, which enforces a limit + // for a single memtable. + // + // This feature is disabled by default. Specify a non-zero value + // to enable it. + // + // Default: 0 (disabled) + size_t db_write_buffer_size; + // Specify the file access pattern once a compaction is started. // It will be applied to all input files of a compaction. 
// Default: NORMAL diff --git a/table/bloom_block.h b/table/bloom_block.h index 7ef5d14b6..5b60d2bca 100644 --- a/table/bloom_block.h +++ b/table/bloom_block.h @@ -18,9 +18,10 @@ class BloomBlockBuilder { explicit BloomBlockBuilder(uint32_t num_probes = 6) : bloom_(num_probes, nullptr) {} - void SetTotalBits(Arena* arena, uint32_t total_bits, uint32_t locality, - size_t huge_page_tlb_size, Logger* logger) { - bloom_.SetTotalBits(arena, total_bits, locality, huge_page_tlb_size, + void SetTotalBits(Allocator* allocator, uint32_t total_bits, + uint32_t locality, size_t huge_page_tlb_size, + Logger* logger) { + bloom_.SetTotalBits(allocator, total_bits, locality, huge_page_tlb_size, logger); } diff --git a/table/table_test.cc b/table/table_test.cc index 9e5f8a49e..3d603bf31 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -20,6 +20,7 @@ #include "db/dbformat.h" #include "db/memtable.h" #include "db/write_batch_internal.h" +#include "db/writebuffer.h" #include "rocksdb/cache.h" #include "rocksdb/db.h" @@ -427,15 +428,15 @@ uint64_t TableConstructor::cur_uniq_id_ = 1; class MemTableConstructor: public Constructor { public: - explicit MemTableConstructor(const Comparator* cmp) + explicit MemTableConstructor(const Comparator* cmp, WriteBuffer* wb) : Constructor(cmp), internal_comparator_(cmp), + write_buffer_(wb), table_factory_(new SkipListFactory) { - Options options; - options.memtable_factory = table_factory_; - ImmutableCFOptions ioptions(options); + options_.memtable_factory = table_factory_; + ImmutableCFOptions ioptions(options_); memtable_ = new MemTable(internal_comparator_, ioptions, - MutableCFOptions(options, ioptions)); + MutableCFOptions(options_, ioptions), wb); memtable_->Ref(); } ~MemTableConstructor() { @@ -446,11 +447,10 @@ class MemTableConstructor: public Constructor { const InternalKeyComparator& internal_comparator, const KVMap& kv_map) { delete memtable_->Unref(); - Options options; - options.memtable_factory = table_factory_; - ImmutableCFOptions mem_ioptions(options); + ImmutableCFOptions mem_ioptions(ioptions); memtable_ = new MemTable(internal_comparator_, mem_ioptions, - MutableCFOptions(options, mem_ioptions)); + MutableCFOptions(options_, mem_ioptions), + write_buffer_); memtable_->Ref(); int seq = 1; for (const auto kv : kv_map) { @@ -471,6 +471,8 @@ class MemTableConstructor: public Constructor { private: mutable Arena arena_; InternalKeyComparator internal_comparator_; + Options options_; + WriteBuffer* write_buffer_; MemTable* memtable_; std::shared_ptr table_factory_; }; @@ -696,7 +698,9 @@ class FixedOrLessPrefixTransform : public SliceTransform { class Harness { public: - Harness() : ioptions_(options_), constructor_(nullptr) {} + Harness() + : ioptions_(options_), constructor_(nullptr), + write_buffer_(options_.db_write_buffer_size) {} void Init(const TestArgs& args) { delete constructor_; @@ -773,7 +777,8 @@ class Harness { table_options_.block_size = 256; options_.table_factory.reset( new BlockBasedTableFactory(table_options_)); - constructor_ = new MemTableConstructor(options_.comparator); + constructor_ = new MemTableConstructor(options_.comparator, + &write_buffer_); break; case DB_TEST: table_options_.block_size = 256; @@ -981,6 +986,7 @@ class Harness { ImmutableCFOptions ioptions_; BlockBasedTableOptions table_options_ = BlockBasedTableOptions(); Constructor* constructor_; + WriteBuffer write_buffer_; bool support_prev_; bool only_support_prefix_seek_; shared_ptr internal_comparator_; @@ -1870,8 +1876,9 @@ TEST(MemTableTest, Simple) { 
Options options; options.memtable_factory = table_factory; ImmutableCFOptions ioptions(options); + WriteBuffer wb(options.db_write_buffer_size); MemTable* memtable = new MemTable(cmp, ioptions, - MutableCFOptions(options, ioptions)); + MutableCFOptions(options, ioptions), &wb); memtable->Ref(); WriteBatch batch; WriteBatchInternal::SetSequence(&batch, 100); diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 9aad6efb9..c63d82413 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -114,6 +114,9 @@ DEFINE_bool(verbose, false, "Verbose"); DEFINE_bool(progress_reports, true, "If true, db_stress will report number of finished operations"); +DEFINE_uint64(db_write_buffer_size, rocksdb::Options().db_write_buffer_size, + "Number of bytes to buffer in all memtables before compacting"); + DEFINE_int32(write_buffer_size, static_cast(rocksdb::Options().write_buffer_size), "Number of bytes to buffer in memtable before compacting"); @@ -1682,6 +1685,7 @@ class StressTest { fprintf(stdout, "Write percentage : %d%%\n", FLAGS_writepercent); fprintf(stdout, "Delete percentage : %d%%\n", FLAGS_delpercent); fprintf(stdout, "Iterate percentage : %d%%\n", FLAGS_iterpercent); + fprintf(stdout, "DB-write-buffer-size: %lu\n", FLAGS_db_write_buffer_size); fprintf(stdout, "Write-buffer-size : %d\n", FLAGS_write_buffer_size); fprintf(stdout, "Iterations : %lu\n", @@ -1753,6 +1757,7 @@ class StressTest { block_based_options.filter_policy = filter_policy_; options_.table_factory.reset( NewBlockBasedTableFactory(block_based_options)); + options_.db_write_buffer_size = FLAGS_db_write_buffer_size; options_.write_buffer_size = FLAGS_write_buffer_size; options_.max_write_buffer_number = FLAGS_max_write_buffer_number; options_.min_write_buffer_number_to_merge = diff --git a/util/allocator.h b/util/allocator.h new file mode 100644 index 000000000..58bf0da31 --- /dev/null +++ b/util/allocator.h @@ -0,0 +1,32 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// Abstract interface for allocating memory in blocks. This memory is freed +// when the allocator object is destroyed. See the Arena class for more info. + +#pragma once +#include +#include + +namespace rocksdb { + +class Logger; + +class Allocator { + public: + virtual ~Allocator() {} + + virtual char* Allocate(size_t bytes) = 0; + virtual char* AllocateAligned(size_t bytes, size_t huge_page_size = 0, + Logger* logger = nullptr) = 0; + + virtual size_t BlockSize() const = 0; +}; + +} // namespace rocksdb diff --git a/util/arena.h b/util/arena.h index 4764c1568..644a12947 100644 --- a/util/arena.h +++ b/util/arena.h @@ -7,7 +7,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -// Arena is an implementation of Arena class. For a request of small size, +// Arena is an implementation of Allocator class. For a request of small size, // it allocates a block with pre-defined block size. For a request of big // size, it uses malloc to directly get the requested size. 
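Since Arena and MemTableAllocator both derive from the Allocator interface in util/allocator.h above, consumers such as SkipList and DynamicBloom can be written against the base class alone. A rough sketch of the relationship (illustrative only; the sizes are arbitrary):

    WriteBuffer wb(64 << 20);            // e.g. db_write_buffer_size = 64MB
    Arena arena;                         // Arena : public Allocator
    MemTableAllocator mta(&arena, &wb);  // MemTableAllocator : public Allocator
    Allocator* a = &mta;                 // an Arena* would also bind here
    char* p = a->AllocateAligned(64);    // virtual dispatch picks the impl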
@@ -17,15 +17,13 @@ #include #include #include -#include "util/arena.h" +#include "util/allocator.h" namespace rocksdb { -class Logger; - const size_t kInlineSize = 2048; -class Arena { +class Arena : public Allocator { public: // No copying allowed Arena(const Arena&) = delete; @@ -41,7 +39,7 @@ class Arena { explicit Arena(size_t block_size = kMinBlockSize, size_t huge_page_size = 0); ~Arena(); - char* Allocate(size_t bytes); + char* Allocate(size_t bytes) override; // huge_page_size: if >0, will try to allocate from huage page TLB. // The argument will be the size of the page size for huge page TLB. Bytes @@ -56,7 +54,7 @@ class Arena { // huge_page_tlb_size > 0, we highly recommend a logger is passed in. // Otherwise, the error message will be printed out to stderr directly. char* AllocateAligned(size_t bytes, size_t huge_page_size = 0, - Logger* logger = nullptr); + Logger* logger = nullptr) override; // Returns an estimate of the total memory usage of data allocated // by the arena (exclude the space allocated but not yet used for future @@ -74,7 +72,7 @@ class Arena { // same size of that allocation. size_t IrregularBlockNum() const { return irregular_block_num; } - size_t BlockSize() const { return kBlockSize; } + size_t BlockSize() const override { return kBlockSize; } private: char inline_block_[kInlineSize]; diff --git a/util/dynamic_bloom.cc b/util/dynamic_bloom.cc index 73c2c9436..ffe8157cc 100644 --- a/util/dynamic_bloom.cc +++ b/util/dynamic_bloom.cc @@ -9,6 +9,7 @@ #include "port/port.h" #include "rocksdb/slice.h" +#include "util/allocator.h" #include "util/hash.h" namespace rocksdb { @@ -29,13 +30,13 @@ uint32_t GetTotalBitsForLocality(uint32_t total_bits) { } } -DynamicBloom::DynamicBloom(Arena* arena, uint32_t total_bits, uint32_t locality, - uint32_t num_probes, +DynamicBloom::DynamicBloom(Allocator* allocator, uint32_t total_bits, + uint32_t locality, uint32_t num_probes, uint32_t (*hash_func)(const Slice& key), size_t huge_page_tlb_size, Logger* logger) : DynamicBloom(num_probes, hash_func) { - SetTotalBits(arena, total_bits, locality, huge_page_tlb_size, logger); + SetTotalBits(allocator, total_bits, locality, huge_page_tlb_size, logger); } DynamicBloom::DynamicBloom(uint32_t num_probes, @@ -52,7 +53,7 @@ void DynamicBloom::SetRawData(unsigned char* raw_data, uint32_t total_bits, kNumBlocks = num_blocks; } -void DynamicBloom::SetTotalBits(Arena* arena, +void DynamicBloom::SetTotalBits(Allocator* allocator, uint32_t total_bits, uint32_t locality, size_t huge_page_tlb_size, Logger* logger) { @@ -67,9 +68,9 @@ void DynamicBloom::SetTotalBits(Arena* arena, if (kNumBlocks > 0) { sz += CACHE_LINE_SIZE - 1; } - assert(arena); + assert(allocator); raw_ = reinterpret_cast( - arena->AllocateAligned(sz, huge_page_tlb_size, logger)); + allocator->AllocateAligned(sz, huge_page_tlb_size, logger)); memset(raw_, 0, sz); if (kNumBlocks > 0 && (reinterpret_cast(raw_) % CACHE_LINE_SIZE)) { data_ = raw_ + CACHE_LINE_SIZE - diff --git a/util/dynamic_bloom.h b/util/dynamic_bloom.h index b3b402c4f..a6e4d7367 100644 --- a/util/dynamic_bloom.h +++ b/util/dynamic_bloom.h @@ -9,7 +9,6 @@ #include "rocksdb/slice.h" -#include "util/arena.h" #include "port/port_posix.h" #include @@ -18,11 +17,12 @@ namespace rocksdb { class Slice; +class Allocator; class Logger; class DynamicBloom { public: - // arena: pass arena to bloom filter, hence trace the usage of memory + // allocator: pass allocator to bloom filter, hence trace the usage of memory // total_bits: fixed total bits for the bloom // 
num_probes: number of hash probes for a single key // locality: If positive, optimize for cache line locality, 0 otherwise. @@ -32,7 +32,7 @@ class DynamicBloom { // it to be allocated, like: // sysctl -w vm.nr_hugepages=20 // See linux doc Documentation/vm/hugetlbpage.txt - explicit DynamicBloom(Arena* arena, + explicit DynamicBloom(Allocator* allocator, uint32_t total_bits, uint32_t locality = 0, uint32_t num_probes = 6, uint32_t (*hash_func)(const Slice& key) = nullptr, @@ -42,8 +42,9 @@ class DynamicBloom { explicit DynamicBloom(uint32_t num_probes = 6, uint32_t (*hash_func)(const Slice& key) = nullptr); - void SetTotalBits(Arena* arena, uint32_t total_bits, uint32_t locality, - size_t huge_page_tlb_size, Logger* logger); + void SetTotalBits(Allocator* allocator, uint32_t total_bits, + uint32_t locality, size_t huge_page_tlb_size, + Logger* logger); ~DynamicBloom() {} diff --git a/util/dynamic_bloom_test.cc b/util/dynamic_bloom_test.cc index a3d6e0fc7..a8b1c529b 100644 --- a/util/dynamic_bloom_test.cc +++ b/util/dynamic_bloom_test.cc @@ -21,6 +21,7 @@ int main() { #include "dynamic_bloom.h" #include "port/port.h" +#include "util/arena.h" #include "util/logging.h" #include "util/testharness.h" #include "util/testutil.h" diff --git a/util/hash_cuckoo_rep.cc b/util/hash_cuckoo_rep.cc index 6a67fab44..3ac5ba746 100644 --- a/util/hash_cuckoo_rep.cc +++ b/util/hash_cuckoo_rep.cc @@ -52,25 +52,26 @@ struct CuckooStep { class HashCuckooRep : public MemTableRep { public: explicit HashCuckooRep(const MemTableRep::KeyComparator& compare, - Arena* arena, const size_t bucket_count, + MemTableAllocator* allocator, + const size_t bucket_count, const unsigned int hash_func_count) - : MemTableRep(arena), + : MemTableRep(allocator), compare_(compare), - arena_(arena), + allocator_(allocator), bucket_count_(bucket_count), cuckoo_path_max_depth_(kDefaultCuckooPathMaxDepth), occupied_count_(0), hash_function_count_(hash_func_count), backup_table_(nullptr) { char* mem = reinterpret_cast( - arena_->Allocate(sizeof(std::atomic) * bucket_count_)); + allocator_->Allocate(sizeof(std::atomic) * bucket_count_)); cuckoo_array_ = new (mem) std::atomic[bucket_count_]; for (unsigned int bid = 0; bid < bucket_count_; ++bid) { cuckoo_array_[bid].store(nullptr, std::memory_order_relaxed); } cuckoo_path_ = reinterpret_cast( - arena_->Allocate(sizeof(int) * (cuckoo_path_max_depth_ + 1))); + allocator_->Allocate(sizeof(int) * (cuckoo_path_max_depth_ + 1))); is_nearly_full_ = false; } @@ -181,8 +182,8 @@ class HashCuckooRep : public MemTableRep { private: const MemTableRep::KeyComparator& compare_; - // the pointer to Arena to allocate memory, immutable after construction. - Arena* const arena_; + // the pointer to Allocator to allocate memory, immutable after construction. + MemTableAllocator* const allocator_; // the number of hash bucket in the hash table. const size_t bucket_count_; // the maxinum depth of the cuckoo path. 
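Every MemTableRepFactory now receives a MemTableAllocator* where it used to take an Arena*; the backup-table creation in the next hunk is one in-tree caller. A generic invocation looks roughly like this (sketch; `compare` and `allocator` are assumed to come from the owning memtable):

    VectorRepFactory factory(10);
    MemTableRep* rep = factory.CreateMemTableRep(
        compare, allocator, /*transform=*/nullptr, /*logger=*/nullptr);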
@@ -321,7 +322,7 @@ void HashCuckooRep::Insert(KeyHandle handle) { if (backup_table_.get() == nullptr) { VectorRepFactory factory(10); backup_table_.reset( - factory.CreateMemTableRep(compare_, arena_, nullptr, nullptr)); + factory.CreateMemTableRep(compare_, allocator_, nullptr, nullptr)); is_nearly_full_ = true; } backup_table_->Insert(key); @@ -601,7 +602,7 @@ void HashCuckooRep::Iterator::SeekToLast() { } // anom namespace MemTableRep* HashCuckooRepFactory::CreateMemTableRep( - const MemTableRep::KeyComparator& compare, Arena* arena, + const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator, const SliceTransform* transform, Logger* logger) { // The estimated average fullness. The write performance of any close hash // degrades as the fullness of the mem-table increases. Setting kFullness @@ -620,7 +621,8 @@ MemTableRep* HashCuckooRepFactory::CreateMemTableRep( if (hash_function_count > kMaxHashCount) { hash_function_count = kMaxHashCount; } - return new HashCuckooRep(compare, arena, bucket_count, hash_function_count); + return new HashCuckooRep(compare, allocator, bucket_count, + hash_function_count); } MemTableRepFactory* NewHashCuckooRepFactory(size_t write_buffer_size, diff --git a/util/hash_cuckoo_rep.h b/util/hash_cuckoo_rep.h index 669b6b7d4..9f374a978 100644 --- a/util/hash_cuckoo_rep.h +++ b/util/hash_cuckoo_rep.h @@ -28,7 +28,7 @@ class HashCuckooRepFactory : public MemTableRepFactory { virtual ~HashCuckooRepFactory() {} virtual MemTableRep* CreateMemTableRep( - const MemTableRep::KeyComparator& compare, Arena* arena, + const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator, const SliceTransform* transform, Logger* logger) override; virtual const char* Name() const override { return "HashCuckooRepFactory"; } diff --git a/util/hash_linklist_rep.cc b/util/hash_linklist_rep.cc index 4573d8340..d8e6da6aa 100644 --- a/util/hash_linklist_rep.cc +++ b/util/hash_linklist_rep.cc @@ -45,10 +45,10 @@ struct SkipListBucketHeader { MemtableSkipList skip_list; explicit SkipListBucketHeader(const MemTableRep::KeyComparator& cmp, - Arena* arena, uint32_t count) + MemTableAllocator* allocator, uint32_t count) : Counting_header(this, // Pointing to itself to indicate header type. count), - skip_list(cmp, arena) {} + skip_list(cmp, allocator) {} }; struct Node { @@ -143,10 +143,11 @@ struct Node { // which can be significant decrease of memory utilization. 
class HashLinkListRep : public MemTableRep { public: - HashLinkListRep(const MemTableRep::KeyComparator& compare, Arena* arena, - const SliceTransform* transform, size_t bucket_size, - uint32_t threshold_use_skiplist, size_t huge_page_tlb_size, - Logger* logger, int bucket_entries_logging_threshold, + HashLinkListRep(const MemTableRep::KeyComparator& compare, + MemTableAllocator* allocator, const SliceTransform* transform, + size_t bucket_size, uint32_t threshold_use_skiplist, + size_t huge_page_tlb_size, Logger* logger, + int bucket_entries_logging_threshold, bool if_log_bucket_dist_when_flash); virtual KeyHandle Allocate(const size_t len, char** buf) override; @@ -166,7 +167,7 @@ class HashLinkListRep : public MemTableRep { virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override; virtual MemTableRep::Iterator* GetDynamicPrefixIterator( - Arena* arena = nullptr) override; + Arena* arena = nullptr) override; private: friend class DynamicIterator; @@ -233,8 +234,8 @@ class HashLinkListRep : public MemTableRep { class FullListIterator : public MemTableRep::Iterator { public: - explicit FullListIterator(MemtableSkipList* list, Arena* arena) - : iter_(list), full_list_(list), arena_(arena) {} + explicit FullListIterator(MemtableSkipList* list, Allocator* allocator) + : iter_(list), full_list_(list), allocator_(allocator) {} virtual ~FullListIterator() { } @@ -288,7 +289,7 @@ class HashLinkListRep : public MemTableRep { MemtableSkipList::Iterator iter_; // To destruct with the iterator. std::unique_ptr full_list_; - std::unique_ptr arena_; + std::unique_ptr allocator_; std::string tmp_; // For passing to EncodeKey }; @@ -453,13 +454,14 @@ class HashLinkListRep : public MemTableRep { }; HashLinkListRep::HashLinkListRep(const MemTableRep::KeyComparator& compare, - Arena* arena, const SliceTransform* transform, + MemTableAllocator* allocator, + const SliceTransform* transform, size_t bucket_size, uint32_t threshold_use_skiplist, size_t huge_page_tlb_size, Logger* logger, int bucket_entries_logging_threshold, bool if_log_bucket_dist_when_flash) - : MemTableRep(arena), + : MemTableRep(allocator), bucket_size_(bucket_size), // Threshold to use skip list doesn't make sense if less than 3, so we // force it to be minimum of 3 to simplify implementation. @@ -469,7 +471,7 @@ HashLinkListRep::HashLinkListRep(const MemTableRep::KeyComparator& compare, logger_(logger), bucket_entries_logging_threshold_(bucket_entries_logging_threshold), if_log_bucket_dist_when_flash_(if_log_bucket_dist_when_flash) { - char* mem = arena_->AllocateAligned(sizeof(Pointer) * bucket_size, + char* mem = allocator_->AllocateAligned(sizeof(Pointer) * bucket_size, huge_page_tlb_size, logger); buckets_ = new (mem) Pointer[bucket_size]; @@ -483,7 +485,7 @@ HashLinkListRep::~HashLinkListRep() { } KeyHandle HashLinkListRep::Allocate(const size_t len, char** buf) { - char* mem = arena_->AllocateAligned(sizeof(Node) + len); + char* mem = allocator_->AllocateAligned(sizeof(Node) + len); Node* x = new (mem) Node(); *buf = x->key; return static_cast(x); @@ -559,7 +561,7 @@ void HashLinkListRep::Insert(KeyHandle handle) { // the new node. Otherwise, we might need to change next pointer of first. // In that case, a reader might sees the next pointer is NULL and wrongly // think the node is a bucket header. 
diff --git a/util/hash_linklist_rep.h b/util/hash_linklist_rep.h
index 0df35b545..629272394 100644
--- a/util/hash_linklist_rep.h
+++ b/util/hash_linklist_rep.h
@@ -29,7 +29,7 @@ class HashLinkListRepFactory : public MemTableRepFactory {
   virtual ~HashLinkListRepFactory() {}

   virtual MemTableRep* CreateMemTableRep(
-      const MemTableRep::KeyComparator& compare, Arena* arena,
+      const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
       const SliceTransform* transform, Logger* logger) override;

   virtual const char* Name() const override {
diff --git a/util/hash_skiplist_rep.cc b/util/hash_skiplist_rep.cc
index 1393a917e..4fb226811 100644
--- a/util/hash_skiplist_rep.cc
+++ b/util/hash_skiplist_rep.cc
@@ -23,9 +23,10 @@ namespace {

 class HashSkipListRep : public MemTableRep {
  public:
-  HashSkipListRep(const MemTableRep::KeyComparator& compare, Arena* arena,
-                  const SliceTransform* transform, size_t bucket_size,
-                  int32_t skiplist_height, int32_t skiplist_branching_factor);
+  HashSkipListRep(const MemTableRep::KeyComparator& compare,
+                  MemTableAllocator* allocator, const SliceTransform* transform,
+                  size_t bucket_size, int32_t skiplist_height,
+                  int32_t skiplist_branching_factor);

   virtual void Insert(KeyHandle handle) override;

@@ -62,7 +63,7 @@ class HashSkipListRep : public MemTableRep {
   const MemTableRep::KeyComparator& compare_;
   // immutable after construction
-  Arena* const arena_;
+  MemTableAllocator* const allocator_;

   inline size_t GetHash(const Slice& slice) const {
     return MurmurHash(slice.data(), static_cast<int>(slice.size()), 0) %
@@ -221,17 +222,19 @@ class HashSkipListRep : public MemTableRep {
 };

 HashSkipListRep::HashSkipListRep(const MemTableRep::KeyComparator& compare,
-                                 Arena* arena, const SliceTransform* transform,
+                                 MemTableAllocator* allocator,
+                                 const SliceTransform* transform,
                                  size_t bucket_size, int32_t skiplist_height,
                                  int32_t skiplist_branching_factor)
-    : MemTableRep(arena),
+    : MemTableRep(allocator),
       bucket_size_(bucket_size),
       skiplist_height_(skiplist_height),
       skiplist_branching_factor_(skiplist_branching_factor),
       transform_(transform),
       compare_(compare),
-      arena_(arena) {
-  auto mem = arena->AllocateAligned(sizeof(std::atomic<void*>) * bucket_size);
+      allocator_(allocator) {
+  auto mem = allocator->AllocateAligned(
+      sizeof(std::atomic<void*>) * bucket_size);
   buckets_ = new (mem) std::atomic<Bucket*>[bucket_size];

   for (size_t i = 0; i < bucket_size_; ++i) {
@@ -247,8 +250,8 @@ HashSkipListRep::Bucket* HashSkipListRep::GetInitializedBucket(
   size_t hash = GetHash(transformed);
   auto bucket = GetBucket(hash);
   if (bucket == nullptr) {
-    auto addr = arena_->AllocateAligned(sizeof(Bucket));
-    bucket = new (addr) Bucket(compare_, arena_, skiplist_height_,
+    auto addr = allocator_->AllocateAligned(sizeof(Bucket));
+    bucket = new (addr) Bucket(compare_, allocator_, skiplist_height_,
                                skiplist_branching_factor_);
     buckets_[hash].store(bucket, std::memory_order_release);
   }
@@ -291,7 +294,7 @@ void HashSkipListRep::Get(const LookupKey& k, void* callback_args,

 MemTableRep::Iterator* HashSkipListRep::GetIterator(Arena* arena) {
   // allocate a new arena of similar size to the one currently in use
-  Arena* new_arena = new Arena(arena_->BlockSize());
+  Arena* new_arena = new Arena(allocator_->BlockSize());
   auto list = new Bucket(compare_, new_arena);
   for (size_t i = 0; i < bucket_size_; ++i) {
     auto bucket = GetBucket(i);
@@ -322,9 +325,9 @@ MemTableRep::Iterator* HashSkipListRep::GetDynamicPrefixIterator(Arena* arena) {
 }  // anon namespace

 MemTableRep* HashSkipListRepFactory::CreateMemTableRep(
-    const MemTableRep::KeyComparator& compare, Arena* arena,
+    const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
     const SliceTransform* transform, Logger* logger) {
-  return new HashSkipListRep(compare, arena, transform, bucket_count_,
+  return new HashSkipListRep(compare, allocator, transform, bucket_count_,
                              skiplist_height_, skiplist_branching_factor_);
 }
diff --git a/util/hash_skiplist_rep.h b/util/hash_skiplist_rep.h
index 6fec60a47..15d0fc77f 100644
--- a/util/hash_skiplist_rep.h
+++ b/util/hash_skiplist_rep.h
@@ -26,7 +26,7 @@ class HashSkipListRepFactory : public MemTableRepFactory {
   virtual ~HashSkipListRepFactory() {}

   virtual MemTableRep* CreateMemTableRep(
-      const MemTableRep::KeyComparator& compare, Arena* arena,
+      const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
       const SliceTransform* transform, Logger* logger) override;

   virtual const char* Name() const override {
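User-facing construction of these reps is unchanged; the allocator is created inside the memtable and threaded through CreateMemTableRep. Configuring a hash-based rep still looks like this, where the bucket count and prefix length are arbitrary example values:

#include "rocksdb/memtablerep.h"
#include "rocksdb/options.h"
#include "rocksdb/slice_transform.h"

rocksdb::Options HashSkipListOptions() {
  rocksdb::Options options;
  // Hash-based reps require a prefix extractor; length 4 is arbitrary.
  options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(4));
  // Same call as before this patch; the MemTableAllocator is supplied
  // internally when the memtable invokes CreateMemTableRep.
  options.memtable_factory.reset(
      rocksdb::NewHashSkipListRepFactory(1024 /* bucket_count */));
  return options;
}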
diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc
index 4f925c7c3..8a8fa7a2e 100644
--- a/util/ldb_cmd.cc
+++ b/util/ldb_cmd.cc
@@ -10,6 +10,7 @@
 #include "db/db_impl.h"
 #include "db/log_reader.h"
 #include "db/filename.h"
+#include "db/writebuffer.h"
 #include "db/write_batch_internal.h"
 #include "rocksdb/write_batch.h"
 #include "rocksdb/cache.h"
@@ -44,6 +45,7 @@
 const string LDBCommand::ARG_FIX_PREFIX_LEN = "fix_prefix_len";
 const string LDBCommand::ARG_COMPRESSION_TYPE = "compression_type";
 const string LDBCommand::ARG_BLOCK_SIZE = "block_size";
 const string LDBCommand::ARG_AUTO_COMPACTION = "auto_compaction";
+const string LDBCommand::ARG_DB_WRITE_BUFFER_SIZE = "db_write_buffer_size";
 const string LDBCommand::ARG_WRITE_BUFFER_SIZE = "write_buffer_size";
 const string LDBCommand::ARG_FILE_SIZE = "file_size";
 const string LDBCommand::ARG_CREATE_IF_MISSING = "create_if_missing";
@@ -276,6 +278,17 @@ Options LDBCommand::PrepareOptionsForOpenDB() {
     }
   }

+  int db_write_buffer_size;
+  if (ParseIntOption(option_map_, ARG_DB_WRITE_BUFFER_SIZE,
+                     db_write_buffer_size, exec_state_)) {
+    if (db_write_buffer_size >= 0) {
+      opt.db_write_buffer_size = db_write_buffer_size;
+    } else {
+      exec_state_ = LDBCommandExecuteResult::FAILED(ARG_DB_WRITE_BUFFER_SIZE +
+                                                    " must be >= 0.");
+    }
+  }
+
   int write_buffer_size;
   if (ParseIntOption(option_map_, ARG_WRITE_BUFFER_SIZE, write_buffer_size,
                      exec_state_)) {
@@ -584,7 +597,8 @@ void ManifestDumpCommand::DoCommand() {
   // SanitizeOptions(), we need to initialize it manually.
   options.db_paths.emplace_back("dummy", 0);
   WriteController wc;
-  VersionSet versions(dbname, &options, sopt, tc.get(), &wc);
+  WriteBuffer wb(options.db_write_buffer_size);
+  VersionSet versions(dbname, &options, sopt, tc.get(), &wb, &wc);
   Status s = versions.DumpManifest(options, file, verbose_, is_key_hex_);
   if (!s.ok()) {
     printf("Error in processing file %s %s\n", manifestfile.c_str(),
@@ -1111,7 +1125,8 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt,
                    opt.table_cache_remove_scan_count_limit));
   const InternalKeyComparator cmp(opt.comparator);
   WriteController wc;
-  VersionSet versions(db_path_, &opt, soptions, tc.get(), &wc);
+  WriteBuffer wb(opt.db_write_buffer_size);
+  VersionSet versions(db_path_, &opt, soptions, tc.get(), &wb, &wc);
   std::vector<ColumnFamilyDescriptor> dummy;
   ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
                                           ColumnFamilyOptions(opt));
diff --git a/util/ldb_cmd.h b/util/ldb_cmd.h
index 7436cc368..fd4d4d4b9 100644
--- a/util/ldb_cmd.h
+++ b/util/ldb_cmd.h
@@ -53,6 +53,7 @@ public:
   static const string ARG_COMPRESSION_TYPE;
   static const string ARG_BLOCK_SIZE;
   static const string ARG_AUTO_COMPACTION;
+  static const string ARG_DB_WRITE_BUFFER_SIZE;
   static const string ARG_WRITE_BUFFER_SIZE;
   static const string ARG_FILE_SIZE;
   static const string ARG_CREATE_IF_MISSING;
diff --git a/util/ldb_tool.cc b/util/ldb_tool.cc
index bb6c8ffca..fe2d7d538 100644
--- a/util/ldb_tool.cc
+++ b/util/ldb_tool.cc
@@ -53,6 +53,8 @@ public:
     ret.append("  --" + LDBCommand::ARG_BLOCK_SIZE + "=<block_size_in_bytes>\n");
     ret.append("  --" + LDBCommand::ARG_AUTO_COMPACTION + "=<true|false>\n");
+    ret.append("  --" + LDBCommand::ARG_DB_WRITE_BUFFER_SIZE +
+               "=<int,e.g.16000000>\n");
     ret.append("  --" + LDBCommand::ARG_WRITE_BUFFER_SIZE +
                "=<int,e.g.4194304>\n");
     ret.append("  --" + LDBCommand::ARG_FILE_SIZE + "=<int,e.g.2097152>\n");
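With the argument wired through LDBCommand and the help text, the flag reaches ldb's option parsing. A hedged driver sketch; the database path and size are made up, and LDBTool::Run's exact signature may differ slightly by version:

#include "rocksdb/ldb_tool.h"

int main() {
  char* argv[] = {
      const_cast<char*>("ldb"),
      const_cast<char*>("--db=/tmp/testdb"),
      const_cast<char*>("--db_write_buffer_size=16000000"),
      const_cast<char*>("scan"),
  };
  // PrepareOptionsForOpenDB() above rejects negative values and copies
  // the flag into Options::db_write_buffer_size before opening the DB.
  rocksdb::LDBTool tool;
  tool.Run(4, argv);
  return 0;
}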
diff --git a/util/options.cc b/util/options.cc
index c6b883779..085df053d 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -17,6 +17,7 @@
 #include <inttypes.h>
 #include <limits>

+#include "db/writebuffer.h"
 #include "rocksdb/cache.h"
 #include "rocksdb/compaction_filter.h"
 #include "rocksdb/comparator.h"
@@ -230,6 +231,7 @@ DBOptions::DBOptions()
       skip_log_error_on_recovery(false),
       stats_dump_period_sec(3600),
       advise_random_on_open(true),
+      db_write_buffer_size(0),
       access_hint_on_compaction_start(NORMAL),
       use_adaptive_mutex(false),
       bytes_per_sync(0),
@@ -273,6 +275,7 @@ DBOptions::DBOptions(const Options& options)
       skip_log_error_on_recovery(options.skip_log_error_on_recovery),
       stats_dump_period_sec(options.stats_dump_period_sec),
       advise_random_on_open(options.advise_random_on_open),
+      db_write_buffer_size(options.db_write_buffer_size),
       access_hint_on_compaction_start(options.access_hint_on_compaction_start),
       use_adaptive_mutex(options.use_adaptive_mutex),
       bytes_per_sync(options.bytes_per_sync),
@@ -336,6 +339,8 @@ void DBOptions::Dump(Logger* log) const {
         stats_dump_period_sec);
     Log(log, "          Options.advise_random_on_open: %d",
         advise_random_on_open);
+    Log(log, "           Options.db_write_buffer_size: %zd",
+        db_write_buffer_size);
     Log(log, " Options.access_hint_on_compaction_start: %s",
         access_hints[access_hint_on_compaction_start]);
     Log(log, "              Options.use_adaptive_mutex: %d",
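The new DBOptions field defaults to 0, which leaves the cross-column-family limit disabled; any nonzero value becomes the total memtable budget for the whole DB. For example, with arbitrary sizes:

#include "rocksdb/options.h"

rocksdb::Options SharedWriteBufferOptions() {
  rocksdb::Options options;
  // Total memtable budget shared by all column families; 0 (the
  // default initialized above) disables the shared limit.
  options.db_write_buffer_size = 64 << 20;  // 64 MB for the whole DB
  // The per-CF limit still applies to each memtable individually.
  options.write_buffer_size = 16 << 20;     // 16 MB per memtable
  return options;
}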
diff --git a/util/options_helper.cc b/util/options_helper.cc
index bea7f1a9d..c2bd3cb83 100644
--- a/util/options_helper.cc
+++ b/util/options_helper.cc
@@ -437,6 +437,8 @@ bool GetDBOptionsFromMap(
       new_options->stats_dump_period_sec = ParseUint32(o.second);
     } else if (o.first == "advise_random_on_open") {
       new_options->advise_random_on_open = ParseBoolean(o.first, o.second);
+    } else if (o.first == "db_write_buffer_size") {
+      new_options->db_write_buffer_size = ParseUint64(o.second);
     } else if (o.first == "use_adaptive_mutex") {
       new_options->use_adaptive_mutex = ParseBoolean(o.first, o.second);
     } else if (o.first == "bytes_per_sync") {
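Since the option is registered in the string parser as well, it can also be set from an options map. A sketch, assuming GetDBOptionsFromMap is reachable from the public convenience header of this era; the header path and exact parameter order are assumptions:

#include <string>
#include <unordered_map>

// Assumed header location for GetDBOptionsFromMap at this revision.
#include "rocksdb/utilities/convenience.h"

bool ParseSharedBufferOption(rocksdb::DBOptions* out) {
  std::unordered_map<std::string, std::string> opts_map = {
      {"db_write_buffer_size", "16000000"},  // parsed with ParseUint64
  };
  return rocksdb::GetDBOptionsFromMap(rocksdb::DBOptions(), opts_map, out);
}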
diff --git a/util/skiplistrep.cc b/util/skiplistrep.cc
index 1322f6c9a..ee57372fa 100644
--- a/util/skiplistrep.cc
+++ b/util/skiplistrep.cc
@@ -18,9 +18,10 @@ class SkipListRep : public MemTableRep {
   friend class LookaheadIterator;

 public:
-  explicit SkipListRep(const MemTableRep::KeyComparator& compare, Arena* arena,
+  explicit SkipListRep(const MemTableRep::KeyComparator& compare,
+                       MemTableAllocator* allocator,
                        const SliceTransform* transform, const size_t lookahead)
-    : MemTableRep(arena), skip_list_(compare, arena), cmp_(compare),
+    : MemTableRep(allocator), skip_list_(compare, allocator), cmp_(compare),
       transform_(transform), lookahead_(lookahead) {
   }

@@ -36,7 +37,7 @@ public:
   }

   virtual size_t ApproximateMemoryUsage() override {
-    // All memory is allocated through arena; nothing to report here
+    // All memory is allocated through allocator; nothing to report here
     return 0;
   }

@@ -224,9 +225,9 @@ public:
 }

 MemTableRep* SkipListFactory::CreateMemTableRep(
-    const MemTableRep::KeyComparator& compare, Arena* arena,
+    const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
     const SliceTransform* transform, Logger* logger) {
-  return new SkipListRep(compare, arena, transform, lookahead_);
+  return new SkipListRep(compare, allocator, transform, lookahead_);
 }

 }  // namespace rocksdb
diff --git a/util/vectorrep.cc b/util/vectorrep.cc
index e61b8ad08..ee38bc304 100644
--- a/util/vectorrep.cc
+++ b/util/vectorrep.cc
@@ -25,7 +25,8 @@ using namespace stl_wrappers;

 class VectorRep : public MemTableRep {
  public:
-  VectorRep(const KeyComparator& compare, Arena* arena, size_t count);
+  VectorRep(const KeyComparator& compare, MemTableAllocator* allocator,
+            size_t count);

   // Insert key into the collection. (The caller will pack key and value into a
   // single buffer and pass that in as the parameter to Insert)
@@ -131,8 +132,9 @@ size_t VectorRep::ApproximateMemoryUsage() {
   );
 }

-VectorRep::VectorRep(const KeyComparator& compare, Arena* arena, size_t count)
-  : MemTableRep(arena),
+VectorRep::VectorRep(const KeyComparator& compare, MemTableAllocator* allocator,
+                     size_t count)
+  : MemTableRep(allocator),
     bucket_(new Bucket()),
     immutable_(false),
     sorted_(false),
@@ -282,9 +284,9 @@ MemTableRep::Iterator* VectorRep::GetIterator(Arena* arena) {
 }  // anon namespace

 MemTableRep* VectorRepFactory::CreateMemTableRep(
-    const MemTableRep::KeyComparator& compare, Arena* arena,
+    const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
     const SliceTransform*, Logger* logger) {
-  return new VectorRep(compare, arena, count_);
+  return new VectorRep(compare, allocator, count_);
 }

 }  // namespace rocksdb
 #endif  // ROCKSDB_LITE
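Taken together, the end state is that every column family of a DB draws memtable memory from one WriteBuffer sized by db_write_buffer_size, and a memtable flush is triggered when the combined usage exceeds that budget. An end-to-end sketch; the path, CF name, and sizes are illustrative:

#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.db_write_buffer_size = 32 << 20;  // 32 MB shared across CFs

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/shared_wb_db", &db);
  assert(s.ok());

  // Both the default CF and this one are charged against the same
  // WriteBuffer; exceeding the budget forces a memtable flush.
  rocksdb::ColumnFamilyHandle* cf = nullptr;
  s = db->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "new_cf", &cf);
  assert(s.ok());

  delete cf;
  delete db;
  return 0;
}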