diff --git a/db/column_family.cc b/db/column_family.cc index 95be55bbf..e11872b24 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -332,18 +332,31 @@ void ColumnFamilySet::DropColumnFamily(uint32_t id) { next->prev_.store(prev); } -MemTable* ColumnFamilyMemTablesImpl::GetMemTable(uint32_t column_family_id) { - auto cfd = column_family_set_->GetColumnFamily(column_family_id); - // TODO(icanadi): this should not be asserting. Rather, it should somehow - // return Corruption status back to the Iterator. This will require - // API change in WriteBatch::Handler, which is a public API - assert(cfd != nullptr); +bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) { + current_ = column_family_set_->GetColumnFamily(column_family_id); + handle_.id = column_family_id; + return current_ != nullptr; +} - if (log_number_ == 0 || log_number_ >= cfd->GetLogNumber()) { - return cfd->mem(); - } else { - return nullptr; - } +uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const { + assert(current_ != nullptr); + return current_->GetLogNumber(); +} + +MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const { + assert(current_ != nullptr); + return current_->mem(); +} + +const Options* ColumnFamilyMemTablesImpl::GetFullOptions() const { + assert(current_ != nullptr); + return current_->full_options(); +} + +const ColumnFamilyHandle& ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() + const { + assert(current_ != nullptr); + return handle_; } } // namespace rocksdb diff --git a/db/column_family.h b/db/column_family.h index b099e0ca2..9e42d70f8 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -238,21 +238,29 @@ class ColumnFamilySet { class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables { public: explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set) - : column_family_set_(column_family_set), log_number_(0) {} + : column_family_set_(column_family_set), current_(nullptr) {} - // If column_family_data->log_number is bigger than log_number, - // the memtable will not be returned. - // If log_number == 0, the memtable will be always returned - void SetLogNumber(uint64_t log_number) { log_number_ = log_number; } + // sets current_ to ColumnFamilyData with column_family_id + // returns false if column family doesn't exist + bool Seek(uint32_t column_family_id) override; + + // Returns log number of the selected column family + uint64_t GetLogNumber() const override; + + // REQUIRES: Seek() called first + virtual MemTable* GetMemTable() const override; - // Returns the column families memtable if log_number == 0 || log_number <= - // column_family_data->log_number. - // If column family doesn't exist, it asserts - virtual MemTable* GetMemTable(uint32_t column_family_id) override; + // Returns options for selected column family + // REQUIRES: Seek() called first + virtual const Options* GetFullOptions() const override; + + // Returns column family handle for the selected column family + virtual const ColumnFamilyHandle& GetColumnFamilyHandle() const override; private: ColumnFamilySet* column_family_set_; - uint64_t log_number_; + ColumnFamilyData* current_; + ColumnFamilyHandle handle_; }; } // namespace rocksdb diff --git a/db/db_impl.cc b/db/db_impl.cc index 64f9ee382..37900eb48 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -903,13 +903,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, } WriteBatchInternal::SetContents(&batch, record); - // filter out all the column families that have already - // flushed memtables with log_number - column_family_memtables_->SetLogNumber(log_number); - // TODO(icanadi) options_ status = WriteBatchInternal::InsertInto( - &batch, column_family_memtables_.get(), default_cfd_->full_options()); - column_family_memtables_->SetLogNumber(0); + &batch, column_family_memtables_.get(), log_number); MaybeIgnoreError(&status); if (!status.ok()) { @@ -3202,11 +3197,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { // We'll need to add a spinlock for reading that we also lock when we // write to a column family (only on column family add/drop, which is // a very rare action) - // TODO(icanadi) options_ status = WriteBatchInternal::InsertInto( - updates, column_family_memtables_.get(), - default_cfd_->full_options(), this, - default_cfd_->options()->filter_deletes); + updates, column_family_memtables_.get(), 0, this, false); if (!status.ok()) { // Panic for in-memory corruptions diff --git a/db/repair.cc b/db/repair.cc index fbf947615..72387a71d 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -205,6 +205,7 @@ class Repairer { Slice record; WriteBatch batch; MemTable* mem = new MemTable(icmp_, options_); + auto cf_mems_default = new ColumnFamilyMemTablesDefault(mem, &options_); mem->Ref(); int counter = 0; while (reader.ReadRecord(&record, &scratch)) { @@ -214,7 +215,7 @@ class Repairer { continue; } WriteBatchInternal::SetContents(&batch, record); - status = WriteBatchInternal::InsertInto(&batch, mem, &options_); + status = WriteBatchInternal::InsertInto(&batch, cf_mems_default); if (status.ok()) { counter += WriteBatchInternal::Count(&batch); } else { @@ -236,6 +237,7 @@ class Repairer { kNoCompression); delete iter; delete mem->Unref(); + delete cf_mems_default; mem = nullptr; if (status.ok()) { if (meta.file_size > 0) { diff --git a/db/write_batch.cc b/db/write_batch.cc index 42990d73e..1132b3551 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -229,60 +229,41 @@ namespace { class MemTableInserter : public WriteBatch::Handler { public: SequenceNumber sequence_; - MemTable* mem_; ColumnFamilyMemTables* cf_mems_; - const Options* options_; + uint64_t log_number_; DBImpl* db_; - const bool filter_deletes_; - - MemTableInserter(SequenceNumber sequence, MemTable* mem, const Options* opts, - DB* db, const bool filter_deletes) - : sequence_(sequence), - mem_(mem), - cf_mems_(nullptr), - options_(opts), - db_(reinterpret_cast(db)), - filter_deletes_(filter_deletes) { - assert(mem_); - if (filter_deletes_) { - assert(options_); - assert(db_); - } - } + const bool dont_filter_deletes_; MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems, - const Options* opts, DB* db, const bool filter_deletes) + uint64_t log_number, DB* db, const bool dont_filter_deletes) : sequence_(sequence), - mem_(nullptr), cf_mems_(cf_mems), - options_(opts), + log_number_(log_number), db_(reinterpret_cast(db)), - filter_deletes_(filter_deletes) { + dont_filter_deletes_(dont_filter_deletes) { assert(cf_mems); - if (filter_deletes_) { - assert(options_); + if (!dont_filter_deletes_) { assert(db_); } } - // returns nullptr if the update to the column family is not needed - MemTable* GetMemTable(uint32_t column_family_id) { - if (mem_ != nullptr) { - return (column_family_id == 0) ? mem_ : nullptr; - } else { - return cf_mems_->GetMemTable(column_family_id); - } + bool IgnoreUpdate() { + return log_number_ != 0 && log_number_ < cf_mems_->GetLogNumber(); } virtual void PutCF(uint32_t column_family_id, const Slice& key, const Slice& value) { - MemTable* mem = GetMemTable(column_family_id); - if (mem == nullptr) { + bool found = cf_mems_->Seek(column_family_id); + // TODO(icanadi) if found = false somehow return the error to caller + // Will need to change public API to do this + if (!found || IgnoreUpdate()) { return; } - if (options_->inplace_update_support && + MemTable* mem = cf_mems_->GetMemTable(); + const Options* options = cf_mems_->GetFullOptions(); + if (options->inplace_update_support && mem->Update(sequence_, kTypeValue, key, value)) { - RecordTick(options_->statistics.get(), NUMBER_KEYS_UPDATED); + RecordTick(options->statistics.get(), NUMBER_KEYS_UPDATED); } else { mem->Add(sequence_, kTypeValue, key, value); } @@ -290,20 +271,22 @@ class MemTableInserter : public WriteBatch::Handler { } virtual void MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) { - MemTable* mem = GetMemTable(column_family_id); - if (mem == nullptr) { + bool found = cf_mems_->Seek(column_family_id); + if (!found || IgnoreUpdate()) { return; } + MemTable* mem = cf_mems_->GetMemTable(); + const Options* options = cf_mems_->GetFullOptions(); bool perform_merge = false; - if (options_->max_successive_merges > 0 && db_ != nullptr) { + if (options->max_successive_merges > 0 && db_ != nullptr) { LookupKey lkey(key, sequence_); // Count the number of successive merges at the head // of the key in the memtable size_t num_merges = mem->CountSuccessiveMergeEntries(lkey); - if (num_merges >= options_->max_successive_merges) { + if (num_merges >= options->max_successive_merges) { perform_merge = true; } } @@ -319,23 +302,21 @@ class MemTableInserter : public WriteBatch::Handler { ReadOptions read_options; read_options.snapshot = &read_from_snapshot; - db_->Get(read_options, key, &get_value); + db_->Get(read_options, cf_mems_->GetColumnFamilyHandle(), key, + &get_value); Slice get_value_slice = Slice(get_value); // 2) Apply this merge - auto merge_operator = options_->merge_operator.get(); + auto merge_operator = options->merge_operator.get(); assert(merge_operator); std::deque operands; operands.push_front(value.ToString()); std::string new_value; - if (!merge_operator->FullMerge(key, - &get_value_slice, - operands, - &new_value, - options_->info_log.get())) { + if (!merge_operator->FullMerge(key, &get_value_slice, operands, + &new_value, options->info_log.get())) { // Failed to merge! - RecordTick(options_->statistics.get(), NUMBER_MERGE_FAILURES); + RecordTick(options->statistics.get(), NUMBER_MERGE_FAILURES); // Store the delta in memtable perform_merge = false; @@ -353,18 +334,21 @@ class MemTableInserter : public WriteBatch::Handler { sequence_++; } virtual void DeleteCF(uint32_t column_family_id, const Slice& key) { - MemTable* mem = GetMemTable(column_family_id); - if (mem == nullptr) { + bool found = cf_mems_->Seek(column_family_id); + if (!found || IgnoreUpdate()) { return; } - if (filter_deletes_) { + MemTable* mem = cf_mems_->GetMemTable(); + const Options* options = cf_mems_->GetFullOptions(); + if (!dont_filter_deletes_ && options->filter_deletes) { SnapshotImpl read_from_snapshot; read_from_snapshot.number_ = sequence_; ReadOptions ropts; ropts.snapshot = &read_from_snapshot; std::string value; - if (!db_->KeyMayExist(ropts, key, &value)) { - RecordTick(options_->statistics.get(), NUMBER_FILTERED_DELETES); + if (!db_->KeyMayExist(ropts, cf_mems_->GetColumnFamilyHandle(), key, + &value)) { + RecordTick(options->statistics.get(), NUMBER_FILTERED_DELETES); return; } } @@ -374,20 +358,12 @@ class MemTableInserter : public WriteBatch::Handler { }; } // namespace -Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* mem, - const Options* opts, DB* db, - const bool filter_deletes) { - MemTableInserter inserter(WriteBatchInternal::Sequence(b), mem, opts, db, - filter_deletes); - return b->Iterate(&inserter); -} - Status WriteBatchInternal::InsertInto(const WriteBatch* b, ColumnFamilyMemTables* memtables, - const Options* opts, DB* db, - const bool filter_deletes) { - MemTableInserter inserter(WriteBatchInternal::Sequence(b), memtables, opts, - db, filter_deletes); + uint64_t log_number, DB* db, + const bool dont_filter_deletes) { + MemTableInserter inserter(WriteBatchInternal::Sequence(b), memtables, + log_number, db, dont_filter_deletes); return b->Iterate(&inserter); } diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 244799fc3..20ddf80e5 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -12,6 +12,7 @@ #include "rocksdb/write_batch.h" #include "rocksdb/db.h" #include "rocksdb/options.h" +#include "rocksdb/column_family.h" namespace rocksdb { @@ -19,7 +20,47 @@ class MemTable; class ColumnFamilyMemTables { public: - virtual MemTable* GetMemTable(uint32_t column_family_id) = 0; + virtual ~ColumnFamilyMemTables() {} + virtual bool Seek(uint32_t column_family_id) = 0; + // returns true if the update to memtable should be ignored + // (useful when recovering from log whose updates have already + // been processed) + virtual uint64_t GetLogNumber() const = 0; + virtual MemTable* GetMemTable() const = 0; + virtual const Options* GetFullOptions() const = 0; + virtual const ColumnFamilyHandle& GetColumnFamilyHandle() const = 0; +}; + +class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables { + public: + ColumnFamilyMemTablesDefault(MemTable* mem, const Options* options) + : ok_(false), mem_(mem), options_(options) {} + + bool Seek(uint32_t column_family_id) override { + ok_ = (column_family_id == 0); + return ok_; + } + + uint64_t GetLogNumber() const override { return 0; } + + MemTable* GetMemTable() const override { + assert(ok_); + return mem_; + } + + const Options* GetFullOptions() const override { + assert(ok_); + return options_; + } + + const ColumnFamilyHandle& GetColumnFamilyHandle() const override { + return default_column_family; + } + + private: + bool ok_; + MemTable* mem_; + const Options* const options_; }; // WriteBatchInternal provides static methods for manipulating a @@ -50,16 +91,15 @@ class WriteBatchInternal { static void SetContents(WriteBatch* batch, const Slice& contents); // Inserts batch entries into memtable - // Drops deletes in batch if filter_del is set to true and - // db->KeyMayExist returns false - static Status InsertInto(const WriteBatch* batch, MemTable* memtable, - const Options* opts, DB* db = nullptr, - const bool filter_del = false); - + // If dont_filter_deletes is false AND options.filter_deletes is true, + // then --> Drops deletes in batch if db->KeyMayExist returns false + // If log_number is not-null, the memtable will be updated only if + // memtables->GetLogNumber() >= log_number + // See MemTableInserter::IgnoreUpdate() static Status InsertInto(const WriteBatch* batch, ColumnFamilyMemTables* memtables, - const Options* opts, DB* db = nullptr, - const bool filter_del = false); + uint64_t log_number = 0, DB* db = nullptr, + const bool dont_filter_deletes = true); static void Append(WriteBatch* dst, const WriteBatch* src); }; diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 62e197706..a2dee2959 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -27,7 +27,8 @@ static std::string PrintContents(WriteBatch* b) { MemTable* mem = new MemTable(cmp, ColumnFamilyOptions(options)); mem->Ref(); std::string state; - Status s = WriteBatchInternal::InsertInto(b, mem, &options); + ColumnFamilyMemTablesDefault cf_mems_default(mem, &options); + Status s = WriteBatchInternal::InsertInto(b, &cf_mems_default); int count = 0; Iterator* iter = mem->NewIterator(); for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { diff --git a/table/table_test.cc b/table/table_test.cc index aabe2f424..5b312f272 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -1281,7 +1281,8 @@ TEST(MemTableTest, Simple) { batch.Put(std::string("k2"), std::string("v2")); batch.Put(std::string("k3"), std::string("v3")); batch.Put(std::string("largekey"), std::string("vlarge")); - ASSERT_TRUE(WriteBatchInternal::InsertInto(&batch, memtable, &options).ok()); + ColumnFamilyMemTablesDefault cf_mems_default(memtable, &options); + ASSERT_TRUE(WriteBatchInternal::InsertInto(&batch, &cf_mems_default).ok()); Iterator* iter = memtable->NewIterator(); iter->SeekToFirst();