diff --git a/db/column_family.cc b/db/column_family.cc index a728a3fd5..0e83e98ab 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -388,12 +388,13 @@ const EnvOptions* ColumnFamilyData::soptions() const { void ColumnFamilyData::SetCurrent(Version* current) { current_ = current; } -void ColumnFamilyData::CreateNewMemtable(const MemTableOptions& moptions) { +void ColumnFamilyData::CreateNewMemtable( + const MutableCFOptions& mutable_cf_options) { assert(current_ != nullptr); if (mem_ != nullptr) { delete mem_->Unref(); } - mem_ = new MemTable(internal_comparator_, ioptions_, moptions); + mem_ = new MemTable(internal_comparator_, ioptions_, mutable_cf_options); mem_->Ref(); } diff --git a/db/column_family.h b/db/column_family.h index 96b08c52e..b37b684fa 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -198,7 +198,7 @@ class ColumnFamilyData { Version* dummy_versions() { return dummy_versions_; } void SetMemtable(MemTable* new_mem) { mem_ = new_mem; } void SetCurrent(Version* current); - void CreateNewMemtable(const MemTableOptions& moptions); + void CreateNewMemtable(const MutableCFOptions& mutable_cf_options); TableCache* table_cache() const { return table_cache_.get(); } diff --git a/db/db_impl.cc b/db/db_impl.cc index 334b6df0f..40b94acab 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1228,8 +1228,7 @@ Status DBImpl::Recover( if (!s.ok()) { // Clear memtables if recovery failed for (auto cfd : *versions_->GetColumnFamilySet()) { - cfd->CreateNewMemtable(MemTableOptions( - *cfd->GetLatestMutableCFOptions(), *cfd->options())); + cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions()); } } } @@ -1360,8 +1359,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // file-systems cause the DB::Open() to fail. return status; } - cfd->CreateNewMemtable(MemTableOptions( - *cfd->GetLatestMutableCFOptions(), *cfd->options())); + cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions()); } } } @@ -1398,8 +1396,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // Recovery failed break; } - cfd->CreateNewMemtable(MemTableOptions( - *cfd->GetLatestMutableCFOptions(), *cfd->options())); + cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions()); } // write MANIFEST with update @@ -2749,7 +2746,7 @@ Status DBImpl::ProcessKeyValueCompaction( ColumnFamilyData* cfd = compact->compaction->column_family_data(); MergeHelper merge( cfd->user_comparator(), cfd->ioptions()->merge_operator, - db_options_.info_log.get(), cfd->options()->min_partial_merge_operands, + db_options_.info_log.get(), cfd->ioptions()->min_partial_merge_operands, false /* internal key corruption is expected */); auto compaction_filter = cfd->ioptions()->compaction_filter; std::unique_ptr compaction_filter_from_factory = nullptr; @@ -4281,9 +4278,8 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, } if (s.ok()) { - new_mem = new MemTable(cfd->internal_comparator(), - *cfd->ioptions(), MemTableOptions(mutable_cf_options, - *cfd->options())); + new_mem = new MemTable(cfd->internal_comparator(), *cfd->ioptions(), + mutable_cf_options); new_superversion = new SuperVersion(); } } diff --git a/db/memtable.cc b/db/memtable.cc index b9b99a684..8d9d99d7e 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -32,7 +32,8 @@ namespace rocksdb { MemTableOptions::MemTableOptions( - const MutableCFOptions& mutable_cf_options, const Options& options) + const ImmutableCFOptions& ioptions, + const MutableCFOptions& mutable_cf_options) : write_buffer_size(mutable_cf_options.write_buffer_size), arena_block_size(mutable_cf_options.arena_block_size), memtable_prefix_bloom_bits(mutable_cf_options.memtable_prefix_bloom_bits), @@ -40,21 +41,23 @@ MemTableOptions::MemTableOptions( mutable_cf_options.memtable_prefix_bloom_probes), memtable_prefix_bloom_huge_page_tlb_size( mutable_cf_options.memtable_prefix_bloom_huge_page_tlb_size), - inplace_update_support(options.inplace_update_support), - inplace_update_num_locks(options.inplace_update_num_locks), - inplace_callback(options.inplace_callback), + inplace_update_support(ioptions.inplace_update_support), + inplace_update_num_locks(mutable_cf_options.inplace_update_num_locks), + inplace_callback(ioptions.inplace_callback), max_successive_merges(mutable_cf_options.max_successive_merges), - filter_deletes(mutable_cf_options.filter_deletes) {} + filter_deletes(mutable_cf_options.filter_deletes), + statistics(ioptions.statistics), + merge_operator(ioptions.merge_operator), + info_log(ioptions.info_log) {} MemTable::MemTable(const InternalKeyComparator& cmp, const ImmutableCFOptions& ioptions, - const MemTableOptions& moptions) + const MutableCFOptions& mutable_cf_options) : comparator_(cmp), - ioptions_(ioptions), - moptions_(moptions), + moptions_(ioptions, mutable_cf_options), refs_(0), - kArenaBlockSize(OptimizeBlockSize(moptions.arena_block_size)), - arena_(moptions.arena_block_size), + kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)), + arena_(moptions_.arena_block_size), table_(ioptions.memtable_factory->CreateMemTableRep( comparator_, &arena_, ioptions.prefix_extractor, ioptions.info_log)), num_entries_(0), @@ -63,20 +66,20 @@ MemTable::MemTable(const InternalKeyComparator& cmp, file_number_(0), first_seqno_(0), mem_next_logfile_number_(0), - locks_(moptions.inplace_update_support ? moptions.inplace_update_num_locks - : 0), + locks_(moptions_.inplace_update_support ? + moptions_.inplace_update_num_locks : 0), prefix_extractor_(ioptions.prefix_extractor), should_flush_(ShouldFlushNow()), flush_scheduled_(false) { // if should_flush_ == true without an entry inserted, something must have // gone wrong already. assert(!should_flush_); - if (prefix_extractor_ && moptions.memtable_prefix_bloom_bits > 0) { + if (prefix_extractor_ && moptions_.memtable_prefix_bloom_bits > 0) { prefix_bloom_.reset(new DynamicBloom( &arena_, - moptions.memtable_prefix_bloom_bits, ioptions.bloom_locality, - moptions.memtable_prefix_bloom_probes, nullptr, - moptions.memtable_prefix_bloom_huge_page_tlb_size, + moptions_.memtable_prefix_bloom_bits, ioptions.bloom_locality, + moptions_.memtable_prefix_bloom_probes, nullptr, + moptions_.memtable_prefix_bloom_huge_page_tlb_size, ioptions.info_log)); } } @@ -454,10 +457,10 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, saver.status = s; saver.mem = this; saver.merge_context = merge_context; - saver.merge_operator = ioptions_.merge_operator; - saver.logger = ioptions_.info_log; + saver.merge_operator = moptions_.merge_operator; + saver.logger = moptions_.info_log; saver.inplace_update_support = moptions_.inplace_update_support; - saver.statistics = ioptions_.statistics; + saver.statistics = moptions_.statistics; table_->Get(key, &saver, SaveValue); } @@ -578,12 +581,12 @@ bool MemTable::UpdateCallback(SequenceNumber seq, memcpy(p, prev_buffer, new_prev_size); } } - RecordTick(ioptions_.statistics, NUMBER_KEYS_UPDATED); + RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED); should_flush_ = ShouldFlushNow(); return true; } else if (status == UpdateStatus::UPDATED) { Add(seq, kTypeValue, key, Slice(str_value)); - RecordTick(ioptions_.statistics, NUMBER_KEYS_WRITTEN); + RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN); should_flush_ = ShouldFlushNow(); return true; } else if (status == UpdateStatus::UPDATE_FAILED) { diff --git a/db/memtable.h b/db/memtable.h index ce6cce7f6..96af1e90a 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -32,8 +32,8 @@ class MergeContext; struct MemTableOptions { explicit MemTableOptions( - const MutableCFOptions& mutable_cf_options, - const Options& options); + const ImmutableCFOptions& ioptions, + const MutableCFOptions& mutable_cf_options); size_t write_buffer_size; size_t arena_block_size; uint32_t memtable_prefix_bloom_bits; @@ -47,6 +47,9 @@ struct MemTableOptions { std::string* merged_value); size_t max_successive_merges; bool filter_deletes; + Statistics* statistics; + MergeOperator* merge_operator; + Logger* info_log; }; class MemTable { @@ -64,7 +67,7 @@ class MemTable { // is zero and the caller must call Ref() at least once. explicit MemTable(const InternalKeyComparator& comparator, const ImmutableCFOptions& ioptions, - const MemTableOptions& moptions); + const MutableCFOptions& mutable_cf_options); ~MemTable(); @@ -199,7 +202,6 @@ class MemTable { const Arena& TEST_GetArena() const { return arena_; } - const ImmutableCFOptions* GetImmutableOptions() const { return &ioptions_; } const MemTableOptions* GetMemTableOptions() const { return &moptions_; } private: @@ -211,7 +213,6 @@ class MemTable { friend class MemTableList; KeyComparator comparator_; - const ImmutableCFOptions& ioptions_; const MemTableOptions moptions_; int refs_; const size_t kArenaBlockSize; diff --git a/db/repair.cc b/db/repair.cc index 80fb92bd9..10628c544 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -220,7 +220,7 @@ class Repairer { Slice record; WriteBatch batch; MemTable* mem = new MemTable(icmp_, ioptions_, - MemTableOptions(MutableCFOptions(options_, ioptions_), options_)); + MutableCFOptions(options_, ioptions_)); auto cf_mems_default = new ColumnFamilyMemTablesDefault(mem, &options_); mem->Ref(); int counter = 0; diff --git a/db/version_set.cc b/db/version_set.cc index 65c36c715..ea52d95bf 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2926,8 +2926,7 @@ ColumnFamilyData* VersionSet::CreateColumnFamily( AppendVersion(new_cfd, v); // GetLatestMutableCFOptions() is safe here without mutex since the // cfd is not available to client - new_cfd->CreateNewMemtable(MemTableOptions( - *new_cfd->GetLatestMutableCFOptions(), *new_cfd->options())); + new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions()); new_cfd->SetLogNumber(edit->log_number_); return new_cfd; } diff --git a/db/write_batch.cc b/db/write_batch.cc index b8d0322d8..6e15ec5c0 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -349,13 +349,12 @@ class MemTableInserter : public WriteBatch::Handler { return seek_status; } MemTable* mem = cf_mems_->GetMemTable(); - auto* ioptions = mem->GetImmutableOptions(); auto* moptions = mem->GetMemTableOptions(); if (!moptions->inplace_update_support) { mem->Add(sequence_, kTypeValue, key, value); } else if (moptions->inplace_callback == nullptr) { mem->Update(sequence_, key, value); - RecordTick(ioptions->statistics, NUMBER_KEYS_UPDATED); + RecordTick(moptions->statistics, NUMBER_KEYS_UPDATED); } else { if (mem->UpdateCallback(sequence_, key, value)) { } else { @@ -382,11 +381,11 @@ class MemTableInserter : public WriteBatch::Handler { if (status == UpdateStatus::UPDATED_INPLACE) { // prev_value is updated in-place with final value. mem->Add(sequence_, kTypeValue, key, Slice(prev_buffer, prev_size)); - RecordTick(ioptions->statistics, NUMBER_KEYS_WRITTEN); + RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN); } else if (status == UpdateStatus::UPDATED) { // merged_value contains the final value. mem->Add(sequence_, kTypeValue, key, Slice(merged_value)); - RecordTick(ioptions->statistics, NUMBER_KEYS_WRITTEN); + RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN); } } } @@ -406,7 +405,6 @@ class MemTableInserter : public WriteBatch::Handler { return seek_status; } MemTable* mem = cf_mems_->GetMemTable(); - auto* ioptions = mem->GetImmutableOptions(); auto* moptions = mem->GetMemTableOptions(); bool perform_merge = false; @@ -441,16 +439,16 @@ class MemTableInserter : public WriteBatch::Handler { Slice get_value_slice = Slice(get_value); // 2) Apply this merge - auto merge_operator = ioptions->merge_operator; + auto merge_operator = moptions->merge_operator; assert(merge_operator); std::deque operands; operands.push_front(value.ToString()); std::string new_value; if (!merge_operator->FullMerge(key, &get_value_slice, operands, - &new_value, ioptions->info_log)) { + &new_value, moptions->info_log)) { // Failed to merge! - RecordTick(ioptions->statistics, NUMBER_MERGE_FAILURES); + RecordTick(moptions->statistics, NUMBER_MERGE_FAILURES); // Store the delta in memtable perform_merge = false; @@ -477,7 +475,6 @@ class MemTableInserter : public WriteBatch::Handler { return seek_status; } MemTable* mem = cf_mems_->GetMemTable(); - auto* ioptions = mem->GetImmutableOptions(); auto* moptions = mem->GetMemTableOptions(); if (!dont_filter_deletes_ && moptions->filter_deletes) { SnapshotImpl read_from_snapshot; @@ -490,7 +487,7 @@ class MemTableInserter : public WriteBatch::Handler { cf_handle = db_->DefaultColumnFamily(); } if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) { - RecordTick(ioptions->statistics, NUMBER_FILTERED_DELETES); + RecordTick(moptions->statistics, NUMBER_FILTERED_DELETES); return Status::OK(); } } diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index cb4048214..d24b2e068 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -29,7 +29,7 @@ static std::string PrintContents(WriteBatch* b) { options.memtable_factory = factory; ImmutableCFOptions ioptions(options); MemTable* mem = new MemTable(cmp, ioptions, - MemTableOptions(MutableCFOptions(options, ioptions), options)); + MutableCFOptions(options, ioptions)); mem->Ref(); std::string state; ColumnFamilyMemTablesDefault cf_mems_default(mem, &options); diff --git a/include/rocksdb/immutable_options.h b/include/rocksdb/immutable_options.h index 2dd50f756..49a136c07 100644 --- a/include/rocksdb/immutable_options.h +++ b/include/rocksdb/immutable_options.h @@ -5,6 +5,7 @@ #pragma once +#include #include #include "rocksdb/options.h" @@ -36,6 +37,13 @@ struct ImmutableCFOptions { CompactionFilterFactoryV2* compaction_filter_factory_v2; + bool inplace_update_support; + + UpdateStatus (*inplace_callback)(char* existing_value, + uint32_t* existing_value_size, + Slice delta_value, + std::string* merged_value); + Logger* info_log; Statistics* statistics; diff --git a/table/table_test.cc b/table/table_test.cc index 5e1bbe4cf..362905eea 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -439,7 +439,7 @@ class MemTableConstructor: public Constructor { options.memtable_factory = table_factory_; ImmutableCFOptions ioptions(options); memtable_ = new MemTable(internal_comparator_, ioptions, - MemTableOptions(MutableCFOptions(options, ioptions), options)); + MutableCFOptions(options, ioptions)); memtable_->Ref(); } ~MemTableConstructor() { @@ -455,7 +455,7 @@ class MemTableConstructor: public Constructor { options.memtable_factory = table_factory_; ImmutableCFOptions mem_ioptions(options); memtable_ = new MemTable(internal_comparator_, mem_ioptions, - MemTableOptions(MutableCFOptions(options, mem_ioptions), options)); + MutableCFOptions(options, mem_ioptions)); memtable_->Ref(); int seq = 1; for (KVMap::const_iterator it = data.begin(); @@ -1879,7 +1879,7 @@ TEST(MemTableTest, Simple) { options.memtable_factory = table_factory; ImmutableCFOptions ioptions(options); MemTable* memtable = new MemTable(cmp, ioptions, - MemTableOptions(MutableCFOptions(options, ioptions), options)); + MutableCFOptions(options, ioptions)); memtable->Ref(); WriteBatch batch; WriteBatchInternal::SetSequence(&batch, 100); diff --git a/util/mutable_cf_options.h b/util/mutable_cf_options.h index a738e7978..831b0d786 100644 --- a/util/mutable_cf_options.h +++ b/util/mutable_cf_options.h @@ -22,6 +22,7 @@ struct MutableCFOptions { options.memtable_prefix_bloom_huge_page_tlb_size), max_successive_merges(options.max_successive_merges), filter_deletes(options.filter_deletes), + inplace_update_num_locks(options.inplace_update_num_locks), disable_auto_compactions(options.disable_auto_compactions), soft_rate_limit(options.soft_rate_limit), hard_rate_limit(options.hard_rate_limit), @@ -53,6 +54,7 @@ struct MutableCFOptions { memtable_prefix_bloom_huge_page_tlb_size(0), max_successive_merges(0), filter_deletes(false), + inplace_update_num_locks(0), disable_auto_compactions(false), soft_rate_limit(0), hard_rate_limit(0), @@ -94,6 +96,7 @@ struct MutableCFOptions { size_t memtable_prefix_bloom_huge_page_tlb_size; size_t max_successive_merges; bool filter_deletes; + size_t inplace_update_num_locks; // Compaction related options bool disable_auto_compactions; diff --git a/util/options.cc b/util/options.cc index b5dc98317..03ffb0a6d 100644 --- a/util/options.cc +++ b/util/options.cc @@ -42,6 +42,8 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options) compaction_filter(options.compaction_filter), compaction_filter_factory(options.compaction_filter_factory.get()), compaction_filter_factory_v2(options.compaction_filter_factory_v2.get()), + inplace_update_support(options.inplace_update_support), + inplace_callback(options.inplace_callback), info_log(options.info_log.get()), statistics(options.statistics.get()), env(options.env), diff --git a/util/options_helper.cc b/util/options_helper.cc index 4fef52299..9b95150c5 100644 --- a/util/options_helper.cc +++ b/util/options_helper.cc @@ -94,6 +94,8 @@ bool ParseMemtableOptions(const std::string& name, const std::string& value, new_options->filter_deletes = ParseBoolean(name, value); } else if (name == "max_write_buffer_number") { new_options->max_write_buffer_number = ParseInt(value); + } else if (name == "inplace_update_num_locks") { + new_options->inplace_update_num_locks = ParseInt64(value); } else { return false; } @@ -299,14 +301,12 @@ bool GetColumnFamilyOptionsFromMap( } else if (o.first == "compaction_options_fifo") { new_options->compaction_options_fifo.max_table_files_size = ParseUint64(o.second); - } else if (o.first == "inplace_update_support") { - new_options->inplace_update_support = ParseBoolean(o.first, o.second); - } else if (o.first == "inplace_update_num_locks") { - new_options->inplace_update_num_locks = ParseInt64(o.second); } else if (o.first == "bloom_locality") { new_options->bloom_locality = ParseUint32(o.second); } else if (o.first == "min_partial_merge_operands") { new_options->min_partial_merge_operands = ParseUint32(o.second); + } else if (o.first == "inplace_update_support") { + new_options->inplace_update_support = ParseBoolean(o.first, o.second); } else { return false; }