diff --git a/db/column_family.cc b/db/column_family.cc
index b10b800b4..94aef3819 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -383,7 +383,8 @@ void ColumnFamilyData::CreateNewMemtable() {
   if (mem_ != nullptr) {
     delete mem_->Unref();
   }
-  mem_ = new MemTable(internal_comparator_, options_);
+  mem_ = new MemTable(internal_comparator_, ioptions_,
+                      MemTableOptions(options_));
   mem_->Ref();
 }
diff --git a/db/db_impl.cc b/db/db_impl.cc
index a7c9206f9..fc98b2abd 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -3434,10 +3434,10 @@ Status DBImpl::GetImpl(const ReadOptions& options,
   LookupKey lkey(key, snapshot);
   PERF_TIMER_STOP(get_snapshot_time);

-  if (sv->mem->Get(lkey, value, &s, merge_context, *cfd->options())) {
+  if (sv->mem->Get(lkey, value, &s, merge_context)) {
     // Done
     RecordTick(stats_, MEMTABLE_HIT);
-  } else if (sv->imm->Get(lkey, value, &s, merge_context, *cfd->options())) {
+  } else if (sv->imm->Get(lkey, value, &s, merge_context)) {
     // Done
     RecordTick(stats_, MEMTABLE_HIT);
   } else {
@@ -3522,12 +3522,9 @@ std::vector<Status> DBImpl::MultiGet(
     assert(mgd_iter != multiget_cf_data.end());
     auto mgd = mgd_iter->second;
     auto super_version = mgd->super_version;
-    auto cfd = mgd->cfd;
-    if (super_version->mem->Get(lkey, value, &s, merge_context,
-                                *cfd->options())) {
+    if (super_version->mem->Get(lkey, value, &s, merge_context)) {
       // Done
-    } else if (super_version->imm->Get(lkey, value, &s, merge_context,
-                                       *cfd->options())) {
+    } else if (super_version->imm->Get(lkey, value, &s, merge_context)) {
       // Done
     } else {
       super_version->current->Get(options, lkey, value, &s, &merge_context);
@@ -4294,7 +4291,9 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
     }
     if (s.ok()) {
-      new_mem = new MemTable(cfd->internal_comparator(), *cfd->options());
+      new_mem = new MemTable(cfd->internal_comparator(),
+                             *cfd->ioptions(),
+                             MemTableOptions(*cfd->options()));
       new_superversion = new SuperVersion();
     }
   }
diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc
index 86fa0852b..b1fae82cf 100644
--- a/db/db_impl_readonly.cc
+++ b/db/db_impl_readonly.cc
@@ -41,9 +41,9 @@
 namespace rocksdb {

-DBImplReadOnly::DBImplReadOnly(const DBOptions& options,
+DBImplReadOnly::DBImplReadOnly(const DBOptions& db_options,
                                const std::string& dbname)
-    : DBImpl(options, dbname) {
+    : DBImpl(db_options, dbname) {
   Log(db_options_.info_log, "Opening the db in read only mode");
 }
@@ -51,7 +51,7 @@ DBImplReadOnly::~DBImplReadOnly() {
 }

 // Implementations of the DB interface
-Status DBImplReadOnly::Get(const ReadOptions& options,
+Status DBImplReadOnly::Get(const ReadOptions& read_options,
                            ColumnFamilyHandle* column_family, const Slice& key,
                            std::string* value) {
   Status s;
@@ -61,10 +61,9 @@ Status DBImplReadOnly::Get(const ReadOptions& options,
   SuperVersion* super_version = cfd->GetSuperVersion();
   MergeContext merge_context;
   LookupKey lkey(key, snapshot);
-  if (super_version->mem->Get(lkey, value, &s, merge_context,
-                              *cfd->options())) {
+  if (super_version->mem->Get(lkey, value, &s, merge_context)) {
   } else {
-    super_version->current->Get(options, lkey, value, &s, &merge_context);
+    super_version->current->Get(read_options, lkey, value, &s, &merge_context);
   }
   return s;
 }
diff --git a/db/memtable.cc b/db/memtable.cc
index 1ed0e2cea..d7923711a 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -31,35 +31,51 @@
 namespace rocksdb {

-MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options)
+MemTableOptions::MemTableOptions(const Options& options)
+    : write_buffer_size(options.write_buffer_size),
+      arena_block_size(options.arena_block_size),
+      memtable_prefix_bloom_bits(options.memtable_prefix_bloom_bits),
+      memtable_prefix_bloom_probes(options.memtable_prefix_bloom_probes),
+      memtable_prefix_bloom_huge_page_tlb_size(
+          options.memtable_prefix_bloom_huge_page_tlb_size),
+      inplace_update_support(options.inplace_update_support),
+      inplace_update_num_locks(options.inplace_update_num_locks),
+      inplace_callback(options.inplace_callback),
+      max_successive_merges(options.max_successive_merges),
+      filter_deletes(options.filter_deletes) {}
+
+MemTable::MemTable(const InternalKeyComparator& cmp,
+                   const ImmutableCFOptions& ioptions,
+                   const MemTableOptions& moptions)
     : comparator_(cmp),
+      ioptions_(ioptions),
+      moptions_(moptions),
       refs_(0),
-      kArenaBlockSize(OptimizeBlockSize(options.arena_block_size)),
-      kWriteBufferSize(options.write_buffer_size),
-      arena_(options.arena_block_size),
-      table_(options.memtable_factory->CreateMemTableRep(
-          comparator_, &arena_, options.prefix_extractor.get(),
-          options.info_log.get())),
+      kArenaBlockSize(OptimizeBlockSize(moptions.arena_block_size)),
+      arena_(moptions.arena_block_size),
+      table_(ioptions.memtable_factory->CreateMemTableRep(
+          comparator_, &arena_, ioptions.prefix_extractor,
+          ioptions.info_log)),
       num_entries_(0),
       flush_in_progress_(false),
       flush_completed_(false),
       file_number_(0),
       first_seqno_(0),
       mem_next_logfile_number_(0),
-      locks_(options.inplace_update_support ? options.inplace_update_num_locks
-                                            : 0),
-      prefix_extractor_(options.prefix_extractor.get()),
+      locks_(moptions.inplace_update_support ? moptions.inplace_update_num_locks
+                                             : 0),
+      prefix_extractor_(ioptions.prefix_extractor),
       should_flush_(ShouldFlushNow()) {
   // if should_flush_ == true without an entry inserted, something must have
   // gone wrong already.
   assert(!should_flush_);
-  if (prefix_extractor_ && options.memtable_prefix_bloom_bits > 0) {
+  if (prefix_extractor_ && moptions.memtable_prefix_bloom_bits > 0) {
     prefix_bloom_.reset(new DynamicBloom(
         &arena_,
-        options.memtable_prefix_bloom_bits, options.bloom_locality,
-        options.memtable_prefix_bloom_probes, nullptr,
-        options.memtable_prefix_bloom_huge_page_tlb_size,
-        options.info_log.get()));
+        moptions.memtable_prefix_bloom_bits, ioptions.bloom_locality,
+        moptions.memtable_prefix_bloom_probes, nullptr,
+        moptions.memtable_prefix_bloom_huge_page_tlb_size,
+        ioptions.info_log));
   }
 }
@@ -97,14 +113,16 @@ bool MemTable::ShouldFlushNow() const {
   // if we can still allocate one more block without exceeding the
   // over-allocation ratio, then we should not flush.
   if (allocated_memory + kArenaBlockSize <
-      kWriteBufferSize + kArenaBlockSize * kAllowOverAllocationRatio) {
+      moptions_.write_buffer_size +
+          kArenaBlockSize * kAllowOverAllocationRatio) {
     return false;
   }

-  // if user keeps adding entries that exceeds kWriteBufferSize, we need to
-  // flush earlier even though we still have much available memory left.
-  if (allocated_memory >
-      kWriteBufferSize + kArenaBlockSize * kAllowOverAllocationRatio) {
+  // if user keeps adding entries that exceeds moptions.write_buffer_size,
+  // we need to flush earlier even though we still have much available
+  // memory left.
+  if (allocated_memory > moptions_.write_buffer_size +
+                             kArenaBlockSize * kAllowOverAllocationRatio) {
     return true;
   }
@@ -175,12 +193,12 @@ const char* EncodeKey(std::string* scratch, const Slice& target) {
 class MemTableIterator: public Iterator {
  public:
   MemTableIterator(
-      const MemTable& mem, const ReadOptions& options, Arena* arena)
+      const MemTable& mem, const ReadOptions& read_options, Arena* arena)
       : bloom_(nullptr),
         prefix_extractor_(mem.prefix_extractor_),
         valid_(false),
         arena_mode_(arena != nullptr) {
-    if (prefix_extractor_ != nullptr && !options.total_order_seek) {
+    if (prefix_extractor_ != nullptr && !read_options.total_order_seek) {
       bloom_ = mem.prefix_bloom_.get();
       iter_ = mem.table_->GetDynamicPrefixIterator(arena);
     } else {
@@ -248,10 +266,10 @@ class MemTableIterator: public Iterator {
   void operator=(const MemTableIterator&);
 };

-Iterator* MemTable::NewIterator(const ReadOptions& options, Arena* arena) {
+Iterator* MemTable::NewIterator(const ReadOptions& read_options, Arena* arena) {
   assert(arena != nullptr);
   auto mem = arena->AllocateAligned(sizeof(MemTableIterator));
-  return new (mem) MemTableIterator(*this, options, arena);
+  return new (mem) MemTableIterator(*this, read_options, arena);
 }

 port::RWMutex* MemTable::GetLock(const Slice& key) {
@@ -412,7 +430,7 @@ static bool SaveValue(void* arg, const char* entry) {
 }

 bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
-                   MergeContext& merge_context, const Options& options) {
+                   MergeContext& merge_context) {
   // The sequence number is updated synchronously in version_set.h
   if (IsEmpty()) {
     // Avoiding recording stats for speed.
@@ -437,10 +455,10 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
     saver.status = s;
     saver.mem = this;
     saver.merge_context = &merge_context;
-    saver.merge_operator = options.merge_operator.get();
-    saver.logger = options.info_log.get();
-    saver.inplace_update_support = options.inplace_update_support;
-    saver.statistics = options.statistics.get();
+    saver.merge_operator = ioptions_.merge_operator;
+    saver.logger = ioptions_.info_log;
+    saver.inplace_update_support = moptions_.inplace_update_support;
+    saver.statistics = ioptions_.statistics;
     table_->Get(key, &saver, SaveValue);
   }
@@ -512,8 +530,7 @@ void MemTable::Update(SequenceNumber seq,
 bool MemTable::UpdateCallback(SequenceNumber seq,
                               const Slice& key,
-                              const Slice& delta,
-                              const Options& options) {
+                              const Slice& delta) {
   LookupKey lkey(key, seq);
   Slice memkey = lkey.memtable_key();
@@ -548,8 +565,8 @@ bool MemTable::UpdateCallback(SequenceNumber seq,
           std::string str_value;
           WriteLock wl(GetLock(lkey.user_key()));
-          auto status = options.inplace_callback(prev_buffer, &new_prev_size,
-                                                 delta, &str_value);
+          auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size,
+                                                   delta, &str_value);
           if (status == UpdateStatus::UPDATED_INPLACE) {
             // Value already updated by callback.
             assert(new_prev_size <= prev_size);
@@ -562,12 +579,12 @@ bool MemTable::UpdateCallback(SequenceNumber seq,
               memcpy(p, prev_buffer, new_prev_size);
             }
           }
-          RecordTick(options.statistics.get(), NUMBER_KEYS_UPDATED);
+          RecordTick(ioptions_.statistics, NUMBER_KEYS_UPDATED);
           should_flush_ = ShouldFlushNow();
           return true;
         } else if (status == UpdateStatus::UPDATED) {
           Add(seq, kTypeValue, key, Slice(str_value));
-          RecordTick(options.statistics.get(), NUMBER_KEYS_WRITTEN);
+          RecordTick(ioptions_.statistics, NUMBER_KEYS_WRITTEN);
           should_flush_ = ShouldFlushNow();
           return true;
         } else if (status == UpdateStatus::UPDATE_FAILED) {
diff --git a/db/memtable.h b/db/memtable.h
index 80dcdd42e..26772e0f5 100644
--- a/db/memtable.h
+++ b/db/memtable.h
@@ -16,6 +16,7 @@
 #include "db/version_edit.h"
 #include "rocksdb/db.h"
 #include "rocksdb/memtablerep.h"
+#include "rocksdb/immutable_options.h"
 #include "util/arena.h"
 #include "util/dynamic_bloom.h"
@@ -26,6 +27,23 @@ class Mutex;
 class MemTableIterator;
 class MergeContext;

+struct MemTableOptions {
+  explicit MemTableOptions(const Options& options);
+  size_t write_buffer_size;
+  size_t arena_block_size;
+  uint32_t memtable_prefix_bloom_bits;
+  uint32_t memtable_prefix_bloom_probes;
+  size_t memtable_prefix_bloom_huge_page_tlb_size;
+  bool inplace_update_support;
+  size_t inplace_update_num_locks;
+  UpdateStatus (*inplace_callback)(char* existing_value,
+                                   uint32_t* existing_value_size,
+                                   Slice delta_value,
+                                   std::string* merged_value);
+  size_t max_successive_merges;
+  bool filter_deletes;
+};
+
 class MemTable {
  public:
   struct KeyComparator : public MemTableRep::KeyComparator {
@@ -40,7 +58,8 @@ class MemTable {
   // MemTables are reference counted. The initial reference count
   // is zero and the caller must call Ref() at least once.
   explicit MemTable(const InternalKeyComparator& comparator,
-                    const Options& options);
+                    const ImmutableCFOptions& ioptions,
+                    const MemTableOptions& moptions);

   ~MemTable();
@@ -81,7 +100,7 @@ class MemTable {
   // arena: If not null, the arena needs to be used to allocate the Iterator.
   //        Calling ~Iterator of the iterator will destroy all the states but
   //        those allocated in arena.
-  Iterator* NewIterator(const ReadOptions& options, Arena* arena);
+  Iterator* NewIterator(const ReadOptions& read_options, Arena* arena);

   // Add an entry into memtable that maps key to value at the
   // specified sequence number and with the specified type.
@@ -99,7 +118,7 @@ class MemTable {
   //   store MergeInProgress in s, and return false.
   // Else, return false.
   bool Get(const LookupKey& key, std::string* value, Status* s,
-           MergeContext& merge_context, const Options& options);
+           MergeContext& merge_context);

   // Attempts to update the new_value inplace, else does normal Add
   // Pseudocode
@@ -123,8 +142,7 @@ class MemTable {
   //   else return false
   bool UpdateCallback(SequenceNumber seq,
                       const Slice& key,
-                      const Slice& delta,
-                      const Options& options);
+                      const Slice& delta);

   // Returns the number of successive merge entries starting from the newest
   // entry for the key up to the last non-merge entry or last entry for the
@@ -172,6 +190,9 @@ class MemTable {
   const Arena& TEST_GetArena() const { return arena_; }

+  const ImmutableCFOptions* GetImmutableOptions() const { return &ioptions_; }
+  const MemTableOptions* GetMemTableOptions() const { return &moptions_; }
+
  private:
   // Dynamically check if we can add more incoming entries.
   bool ShouldFlushNow() const;
@@ -181,9 +202,10 @@ class MemTable {
   friend class MemTableList;

   KeyComparator comparator_;
+  const ImmutableCFOptions& ioptions_;
+  const MemTableOptions moptions_;
   int refs_;
   const size_t kArenaBlockSize;
-  const size_t kWriteBufferSize;
   Arena arena_;
   unique_ptr<MemTableRep> table_;
diff --git a/db/memtable_list.cc b/db/memtable_list.cc
index 418aae230..ced03dc82 100644
--- a/db/memtable_list.cc
+++ b/db/memtable_list.cc
@@ -62,10 +62,9 @@ int MemTableList::size() const {
 // Return the most recent value found, if any.
 // Operands stores the list of merge operations to apply, so far.
 bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
-                              Status* s, MergeContext& merge_context,
-                              const Options& options) {
+                              Status* s, MergeContext& merge_context) {
   for (auto& memtable : memlist_) {
-    if (memtable->Get(key, value, s, merge_context, options)) {
+    if (memtable->Get(key, value, s, merge_context)) {
       return true;
     }
   }
diff --git a/db/memtable_list.h b/db/memtable_list.h
index 997834e78..042ffc5cf 100644
--- a/db/memtable_list.h
+++ b/db/memtable_list.h
@@ -46,7 +46,7 @@ class MemTableListVersion {
   // Search all the memtables starting from the most recent one.
   // Return the most recent value found, if any.
   bool Get(const LookupKey& key, std::string* value, Status* s,
-           MergeContext& merge_context, const Options& options);
+           MergeContext& merge_context);

   void AddIterators(const ReadOptions& options,
                     std::vector<Iterator*>* iterator_list, Arena* arena);
diff --git a/db/repair.cc b/db/repair.cc
index ea6cdd642..bff81991e 100644
--- a/db/repair.cc
+++ b/db/repair.cc
@@ -219,7 +219,7 @@ class Repairer {
     std::string scratch;
     Slice record;
     WriteBatch batch;
-    MemTable* mem = new MemTable(icmp_, options_);
+    MemTable* mem = new MemTable(icmp_, ioptions_, MemTableOptions(options_));
     auto cf_mems_default = new ColumnFamilyMemTablesDefault(mem, &options_);
     mem->Ref();
     int counter = 0;
diff --git a/db/write_batch.cc b/db/write_batch.cc
index bfa5e3f6f..cacb4a5e3 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -23,7 +23,6 @@
 // data: uint8[len]

 #include "rocksdb/write_batch.h"
-#include "rocksdb/options.h"
 #include "rocksdb/merge_operator.h"
 #include "db/dbformat.h"
 #include "db/db_impl.h"
@@ -350,14 +349,15 @@ class MemTableInserter : public WriteBatch::Handler {
       return seek_status;
     }
     MemTable* mem = cf_mems_->GetMemTable();
-    const Options* options = cf_mems_->GetOptions();
-    if (!options->inplace_update_support) {
+    auto* ioptions = mem->GetImmutableOptions();
+    auto* moptions = mem->GetMemTableOptions();
+    if (!moptions->inplace_update_support) {
      mem->Add(sequence_, kTypeValue, key, value);
-    } else if (options->inplace_callback == nullptr) {
+    } else if (moptions->inplace_callback == nullptr) {
      mem->Update(sequence_, key, value);
-      RecordTick(options->statistics.get(), NUMBER_KEYS_UPDATED);
+      RecordTick(ioptions->statistics, NUMBER_KEYS_UPDATED);
     } else {
-      if (mem->UpdateCallback(sequence_, key, value, *options)) {
+      if (mem->UpdateCallback(sequence_, key, value)) {
       } else {
         // key not found in memtable. Do sst get, update, add
         SnapshotImpl read_from_snapshot;
@@ -376,17 +376,17 @@ class MemTableInserter : public WriteBatch::Handler {
         char* prev_buffer = const_cast<char*>(prev_value.c_str());
         uint32_t prev_size = prev_value.size();
-        auto status = options->inplace_callback(s.ok() ? prev_buffer : nullptr,
-                                                s.ok() ? &prev_size : nullptr,
-                                                value, &merged_value);
+        auto status = moptions->inplace_callback(s.ok() ? prev_buffer : nullptr,
+                                                 s.ok() ? &prev_size : nullptr,
+                                                 value, &merged_value);
         if (status == UpdateStatus::UPDATED_INPLACE) {
           // prev_value is updated in-place with final value.
           mem->Add(sequence_, kTypeValue, key, Slice(prev_buffer, prev_size));
-          RecordTick(options->statistics.get(), NUMBER_KEYS_WRITTEN);
+          RecordTick(ioptions->statistics, NUMBER_KEYS_WRITTEN);
         } else if (status == UpdateStatus::UPDATED) {
           // merged_value contains the final value.
           mem->Add(sequence_, kTypeValue, key, Slice(merged_value));
-          RecordTick(options->statistics.get(), NUMBER_KEYS_WRITTEN);
+          RecordTick(ioptions->statistics, NUMBER_KEYS_WRITTEN);
         }
       }
     }
@@ -405,17 +405,18 @@ class MemTableInserter : public WriteBatch::Handler {
       return seek_status;
     }
     MemTable* mem = cf_mems_->GetMemTable();
-    const Options* options = cf_mems_->GetOptions();
+    auto* ioptions = mem->GetImmutableOptions();
+    auto* moptions = mem->GetMemTableOptions();
     bool perform_merge = false;

-    if (options->max_successive_merges > 0 && db_ != nullptr) {
+    if (moptions->max_successive_merges > 0 && db_ != nullptr) {
       LookupKey lkey(key, sequence_);

       // Count the number of successive merges at the head
       // of the key in the memtable
       size_t num_merges = mem->CountSuccessiveMergeEntries(lkey);

-      if (num_merges >= options->max_successive_merges) {
+      if (num_merges >= moptions->max_successive_merges) {
         perform_merge = true;
       }
     }
@@ -439,16 +440,16 @@ class MemTableInserter : public WriteBatch::Handler {
       Slice get_value_slice = Slice(get_value);

       // 2) Apply this merge
-      auto merge_operator = options->merge_operator.get();
+      auto merge_operator = ioptions->merge_operator;
       assert(merge_operator);

       std::deque<std::string> operands;
       operands.push_front(value.ToString());
       std::string new_value;
       if (!merge_operator->FullMerge(key, &get_value_slice, operands,
-                                     &new_value, options->info_log.get())) {
+                                     &new_value, ioptions->info_log)) {
         // Failed to merge!
-        RecordTick(options->statistics.get(), NUMBER_MERGE_FAILURES);
+        RecordTick(ioptions->statistics, NUMBER_MERGE_FAILURES);

         // Store the delta in memtable
         perform_merge = false;
@@ -474,8 +475,9 @@ class MemTableInserter : public WriteBatch::Handler {
       return seek_status;
     }
     MemTable* mem = cf_mems_->GetMemTable();
-    const Options* options = cf_mems_->GetOptions();
-    if (!dont_filter_deletes_ && options->filter_deletes) {
+    auto* ioptions = mem->GetImmutableOptions();
+    auto* moptions = mem->GetMemTableOptions();
+    if (!dont_filter_deletes_ && moptions->filter_deletes) {
       SnapshotImpl read_from_snapshot;
       read_from_snapshot.number_ = sequence_;
       ReadOptions ropts;
@@ -486,7 +488,7 @@ class MemTableInserter : public WriteBatch::Handler {
         cf_handle = db_->DefaultColumnFamily();
       }
       if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) {
-        RecordTick(options->statistics.get(), NUMBER_FILTERED_DELETES);
+        RecordTick(ioptions->statistics, NUMBER_FILTERED_DELETES);
         return Status::OK();
       }
     }
diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc
index aefb01e79..0c69b6af9 100644
--- a/db/write_batch_test.cc
+++ b/db/write_batch_test.cc
@@ -27,7 +27,8 @@ static std::string PrintContents(WriteBatch* b) {
   auto factory = std::make_shared<SkipListFactory>();
   Options options;
   options.memtable_factory = factory;
-  MemTable* mem = new MemTable(cmp, options);
+  MemTable* mem = new MemTable(cmp, ImmutableCFOptions(options),
+                               MemTableOptions(options));
   mem->Ref();
   std::string state;
   ColumnFamilyMemTablesDefault cf_mems_default(mem, &options);
diff --git a/include/rocksdb/immutable_options.h b/include/rocksdb/immutable_options.h
index f3e41c89e..de4480cff 100644
--- a/include/rocksdb/immutable_options.h
+++ b/include/rocksdb/immutable_options.h
@@ -51,6 +51,8 @@ struct ImmutableCFOptions {

   std::vector<DbPath> db_paths;

+  MemTableRepFactory* memtable_factory;
+
   TableFactory* table_factory;

   Options::TablePropertiesCollectorFactories
diff --git a/table/table_test.cc b/table/table_test.cc
index a0f844014..118291daa 100644
--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -437,21 +437,25 @@ class MemTableConstructor: public Constructor {
         table_factory_(new SkipListFactory) {
     Options options;
     options.memtable_factory = table_factory_;
-    memtable_ = new MemTable(internal_comparator_, options);
+    memtable_ = new MemTable(internal_comparator_,
+                             ImmutableCFOptions(options),
+                             MemTableOptions(options));
     memtable_->Ref();
   }
   ~MemTableConstructor() {
     delete memtable_->Unref();
   }
-  virtual Status FinishImpl(const Options& options,
+  virtual Status FinishImpl(const Options&, const ImmutableCFOptions& ioptions,
                             const BlockBasedTableOptions& table_options,
                             const InternalKeyComparator& internal_comparator,
                             const KVMap& data) {
     delete memtable_->Unref();
-    Options memtable_options;
-    memtable_options.memtable_factory = table_factory_;
-    memtable_ = new MemTable(internal_comparator_, memtable_options);
+    Options options;
+    options.memtable_factory = table_factory_;
+    memtable_ = new MemTable(internal_comparator_,
+                             ImmutableCFOptions(options),
+                             MemTableOptions(options));
     memtable_->Ref();
     int seq = 1;
     for (KVMap::const_iterator it = data.begin();
@@ -1859,7 +1863,8 @@ TEST(MemTableTest, Simple) {
   auto table_factory = std::make_shared<SkipListFactory>();
   Options options;
   options.memtable_factory = table_factory;
-  MemTable* memtable = new MemTable(cmp, options);
+  MemTable* memtable = new MemTable(cmp, ImmutableCFOptions(options),
+                                    MemTableOptions(options));
   memtable->Ref();
   WriteBatch batch;
   WriteBatchInternal::SetSequence(&batch, 100);
diff --git a/util/options.cc b/util/options.cc
index 4def58ffe..a61d9d633 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -47,6 +47,7 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options)
       allow_mmap_reads(options.allow_mmap_reads),
       allow_mmap_writes(options.allow_mmap_writes),
       db_paths(options.db_paths),
+      memtable_factory(options.memtable_factory.get()),
       table_factory(options.table_factory.get()),
       table_properties_collector_factories(
           options.table_properties_collector_factories),
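
Reviewer note (not part of the patch): the sketch below shows how a caller builds a MemTable against the new split-options constructor, mirroring the updated call sites in write_batch_test.cc and table_test.cc. The wrapper function, the BytewiseComparator-based InternalKeyComparator, and the use of internal headers (db/memtable.h, db/dbformat.h, so an in-tree compilation unit) are illustrative assumptions, not code taken from this diff.

// Sketch only, assuming an in-tree .cc file.
#include "db/dbformat.h"                // InternalKeyComparator
#include "db/memtable.h"                // MemTable, MemTableOptions
#include "rocksdb/comparator.h"         // BytewiseComparator()
#include "rocksdb/immutable_options.h"  // ImmutableCFOptions
#include "rocksdb/options.h"

void NewStyleMemTableExample() {  // hypothetical helper, for illustration
  rocksdb::Options options;       // the full Options still seeds both pieces
  // MemTable holds ImmutableCFOptions by reference (see memtable.h above),
  // so it must outlive the memtable; MemTableOptions is copied by value.
  rocksdb::ImmutableCFOptions ioptions(options);
  rocksdb::MemTableOptions moptions(options);
  rocksdb::InternalKeyComparator cmp(rocksdb::BytewiseComparator());
  rocksdb::MemTable* mem = new rocksdb::MemTable(cmp, ioptions, moptions);
  mem->Ref();           // initial reference count is zero
  // ... add/read entries through the memtable ...
  delete mem->Unref();  // same teardown as the updated tests
}

The payoff of the split is visible throughout the diff: since MemTable reads everything it needs from ioptions_/moptions_, readers such as MemTable::Get, MemTableListVersion::Get, and WriteBatch's MemTableInserter no longer need an Options object threaded through their signatures.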