[CF] Propagate correct options to WriteBatch::InsertInto

Summary:
WriteBatch can have multiple column families in one batch. Every column family has different options. So we have to add a way for write batch to get options for an arbitrary column family.

This required a bit more acrobatics since lots of interfaces had to be changed.

Test Plan: make check

Reviewers: dhruba, haobo, sdong, kailiu

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15957
main
Igor Canadi 11 years ago
parent b4f441f48a
commit 8fa8a708ef
  1. 33
      db/column_family.cc
  2. 28
      db/column_family.h
  3. 12
      db/db_impl.cc
  4. 4
      db/repair.cc
  5. 104
      db/write_batch.cc
  6. 58
      db/write_batch_internal.h
  7. 3
      db/write_batch_test.cc
  8. 3
      table/table_test.cc

@ -332,18 +332,31 @@ void ColumnFamilySet::DropColumnFamily(uint32_t id) {
next->prev_.store(prev); next->prev_.store(prev);
} }
MemTable* ColumnFamilyMemTablesImpl::GetMemTable(uint32_t column_family_id) { bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
auto cfd = column_family_set_->GetColumnFamily(column_family_id); current_ = column_family_set_->GetColumnFamily(column_family_id);
// TODO(icanadi): this should not be asserting. Rather, it should somehow handle_.id = column_family_id;
// return Corruption status back to the Iterator. This will require return current_ != nullptr;
// API change in WriteBatch::Handler, which is a public API }
assert(cfd != nullptr);
if (log_number_ == 0 || log_number_ >= cfd->GetLogNumber()) { uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
return cfd->mem(); assert(current_ != nullptr);
} else { return current_->GetLogNumber();
return nullptr;
} }
MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
assert(current_ != nullptr);
return current_->mem();
}
const Options* ColumnFamilyMemTablesImpl::GetFullOptions() const {
assert(current_ != nullptr);
return current_->full_options();
}
const ColumnFamilyHandle& ColumnFamilyMemTablesImpl::GetColumnFamilyHandle()
const {
assert(current_ != nullptr);
return handle_;
} }
} // namespace rocksdb } // namespace rocksdb

@ -238,21 +238,29 @@ class ColumnFamilySet {
class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables { class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {
public: public:
explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set) explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set)
: column_family_set_(column_family_set), log_number_(0) {} : column_family_set_(column_family_set), current_(nullptr) {}
// If column_family_data->log_number is bigger than log_number, // sets current_ to ColumnFamilyData with column_family_id
// the memtable will not be returned. // returns false if column family doesn't exist
// If log_number == 0, the memtable will be always returned bool Seek(uint32_t column_family_id) override;
void SetLogNumber(uint64_t log_number) { log_number_ = log_number; }
// Returns log number of the selected column family
uint64_t GetLogNumber() const override;
// REQUIRES: Seek() called first
virtual MemTable* GetMemTable() const override;
// Returns the column families memtable if log_number == 0 || log_number <= // Returns options for selected column family
// column_family_data->log_number. // REQUIRES: Seek() called first
// If column family doesn't exist, it asserts virtual const Options* GetFullOptions() const override;
virtual MemTable* GetMemTable(uint32_t column_family_id) override;
// Returns column family handle for the selected column family
virtual const ColumnFamilyHandle& GetColumnFamilyHandle() const override;
private: private:
ColumnFamilySet* column_family_set_; ColumnFamilySet* column_family_set_;
uint64_t log_number_; ColumnFamilyData* current_;
ColumnFamilyHandle handle_;
}; };
} // namespace rocksdb } // namespace rocksdb

@ -903,13 +903,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
} }
WriteBatchInternal::SetContents(&batch, record); WriteBatchInternal::SetContents(&batch, record);
// filter out all the column families that have already
// flushed memtables with log_number
column_family_memtables_->SetLogNumber(log_number);
// TODO(icanadi) options_
status = WriteBatchInternal::InsertInto( status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), default_cfd_->full_options()); &batch, column_family_memtables_.get(), log_number);
column_family_memtables_->SetLogNumber(0);
MaybeIgnoreError(&status); MaybeIgnoreError(&status);
if (!status.ok()) { if (!status.ok()) {
@ -3202,11 +3197,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
// We'll need to add a spinlock for reading that we also lock when we // We'll need to add a spinlock for reading that we also lock when we
// write to a column family (only on column family add/drop, which is // write to a column family (only on column family add/drop, which is
// a very rare action) // a very rare action)
// TODO(icanadi) options_
status = WriteBatchInternal::InsertInto( status = WriteBatchInternal::InsertInto(
updates, column_family_memtables_.get(), updates, column_family_memtables_.get(), 0, this, false);
default_cfd_->full_options(), this,
default_cfd_->options()->filter_deletes);
if (!status.ok()) { if (!status.ok()) {
// Panic for in-memory corruptions // Panic for in-memory corruptions

@ -205,6 +205,7 @@ class Repairer {
Slice record; Slice record;
WriteBatch batch; WriteBatch batch;
MemTable* mem = new MemTable(icmp_, options_); MemTable* mem = new MemTable(icmp_, options_);
auto cf_mems_default = new ColumnFamilyMemTablesDefault(mem, &options_);
mem->Ref(); mem->Ref();
int counter = 0; int counter = 0;
while (reader.ReadRecord(&record, &scratch)) { while (reader.ReadRecord(&record, &scratch)) {
@ -214,7 +215,7 @@ class Repairer {
continue; continue;
} }
WriteBatchInternal::SetContents(&batch, record); WriteBatchInternal::SetContents(&batch, record);
status = WriteBatchInternal::InsertInto(&batch, mem, &options_); status = WriteBatchInternal::InsertInto(&batch, cf_mems_default);
if (status.ok()) { if (status.ok()) {
counter += WriteBatchInternal::Count(&batch); counter += WriteBatchInternal::Count(&batch);
} else { } else {
@ -236,6 +237,7 @@ class Repairer {
kNoCompression); kNoCompression);
delete iter; delete iter;
delete mem->Unref(); delete mem->Unref();
delete cf_mems_default;
mem = nullptr; mem = nullptr;
if (status.ok()) { if (status.ok()) {
if (meta.file_size > 0) { if (meta.file_size > 0) {

@ -229,60 +229,41 @@ namespace {
class MemTableInserter : public WriteBatch::Handler { class MemTableInserter : public WriteBatch::Handler {
public: public:
SequenceNumber sequence_; SequenceNumber sequence_;
MemTable* mem_;
ColumnFamilyMemTables* cf_mems_; ColumnFamilyMemTables* cf_mems_;
const Options* options_; uint64_t log_number_;
DBImpl* db_; DBImpl* db_;
const bool filter_deletes_; const bool dont_filter_deletes_;
MemTableInserter(SequenceNumber sequence, MemTable* mem, const Options* opts,
DB* db, const bool filter_deletes)
: sequence_(sequence),
mem_(mem),
cf_mems_(nullptr),
options_(opts),
db_(reinterpret_cast<DBImpl*>(db)),
filter_deletes_(filter_deletes) {
assert(mem_);
if (filter_deletes_) {
assert(options_);
assert(db_);
}
}
MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems, MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems,
const Options* opts, DB* db, const bool filter_deletes) uint64_t log_number, DB* db, const bool dont_filter_deletes)
: sequence_(sequence), : sequence_(sequence),
mem_(nullptr),
cf_mems_(cf_mems), cf_mems_(cf_mems),
options_(opts), log_number_(log_number),
db_(reinterpret_cast<DBImpl*>(db)), db_(reinterpret_cast<DBImpl*>(db)),
filter_deletes_(filter_deletes) { dont_filter_deletes_(dont_filter_deletes) {
assert(cf_mems); assert(cf_mems);
if (filter_deletes_) { if (!dont_filter_deletes_) {
assert(options_);
assert(db_); assert(db_);
} }
} }
// returns nullptr if the update to the column family is not needed bool IgnoreUpdate() {
MemTable* GetMemTable(uint32_t column_family_id) { return log_number_ != 0 && log_number_ < cf_mems_->GetLogNumber();
if (mem_ != nullptr) {
return (column_family_id == 0) ? mem_ : nullptr;
} else {
return cf_mems_->GetMemTable(column_family_id);
}
} }
virtual void PutCF(uint32_t column_family_id, const Slice& key, virtual void PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) { const Slice& value) {
MemTable* mem = GetMemTable(column_family_id); bool found = cf_mems_->Seek(column_family_id);
if (mem == nullptr) { // TODO(icanadi) if found = false somehow return the error to caller
// Will need to change public API to do this
if (!found || IgnoreUpdate()) {
return; return;
} }
if (options_->inplace_update_support && MemTable* mem = cf_mems_->GetMemTable();
const Options* options = cf_mems_->GetFullOptions();
if (options->inplace_update_support &&
mem->Update(sequence_, kTypeValue, key, value)) { mem->Update(sequence_, kTypeValue, key, value)) {
RecordTick(options_->statistics.get(), NUMBER_KEYS_UPDATED); RecordTick(options->statistics.get(), NUMBER_KEYS_UPDATED);
} else { } else {
mem->Add(sequence_, kTypeValue, key, value); mem->Add(sequence_, kTypeValue, key, value);
} }
@ -290,20 +271,22 @@ class MemTableInserter : public WriteBatch::Handler {
} }
virtual void MergeCF(uint32_t column_family_id, const Slice& key, virtual void MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) { const Slice& value) {
MemTable* mem = GetMemTable(column_family_id); bool found = cf_mems_->Seek(column_family_id);
if (mem == nullptr) { if (!found || IgnoreUpdate()) {
return; return;
} }
MemTable* mem = cf_mems_->GetMemTable();
const Options* options = cf_mems_->GetFullOptions();
bool perform_merge = false; bool perform_merge = false;
if (options_->max_successive_merges > 0 && db_ != nullptr) { if (options->max_successive_merges > 0 && db_ != nullptr) {
LookupKey lkey(key, sequence_); LookupKey lkey(key, sequence_);
// Count the number of successive merges at the head // Count the number of successive merges at the head
// of the key in the memtable // of the key in the memtable
size_t num_merges = mem->CountSuccessiveMergeEntries(lkey); size_t num_merges = mem->CountSuccessiveMergeEntries(lkey);
if (num_merges >= options_->max_successive_merges) { if (num_merges >= options->max_successive_merges) {
perform_merge = true; perform_merge = true;
} }
} }
@ -319,23 +302,21 @@ class MemTableInserter : public WriteBatch::Handler {
ReadOptions read_options; ReadOptions read_options;
read_options.snapshot = &read_from_snapshot; read_options.snapshot = &read_from_snapshot;
db_->Get(read_options, key, &get_value); db_->Get(read_options, cf_mems_->GetColumnFamilyHandle(), key,
&get_value);
Slice get_value_slice = Slice(get_value); Slice get_value_slice = Slice(get_value);
// 2) Apply this merge // 2) Apply this merge
auto merge_operator = options_->merge_operator.get(); auto merge_operator = options->merge_operator.get();
assert(merge_operator); assert(merge_operator);
std::deque<std::string> operands; std::deque<std::string> operands;
operands.push_front(value.ToString()); operands.push_front(value.ToString());
std::string new_value; std::string new_value;
if (!merge_operator->FullMerge(key, if (!merge_operator->FullMerge(key, &get_value_slice, operands,
&get_value_slice, &new_value, options->info_log.get())) {
operands,
&new_value,
options_->info_log.get())) {
// Failed to merge! // Failed to merge!
RecordTick(options_->statistics.get(), NUMBER_MERGE_FAILURES); RecordTick(options->statistics.get(), NUMBER_MERGE_FAILURES);
// Store the delta in memtable // Store the delta in memtable
perform_merge = false; perform_merge = false;
@ -353,18 +334,21 @@ class MemTableInserter : public WriteBatch::Handler {
sequence_++; sequence_++;
} }
virtual void DeleteCF(uint32_t column_family_id, const Slice& key) { virtual void DeleteCF(uint32_t column_family_id, const Slice& key) {
MemTable* mem = GetMemTable(column_family_id); bool found = cf_mems_->Seek(column_family_id);
if (mem == nullptr) { if (!found || IgnoreUpdate()) {
return; return;
} }
if (filter_deletes_) { MemTable* mem = cf_mems_->GetMemTable();
const Options* options = cf_mems_->GetFullOptions();
if (!dont_filter_deletes_ && options->filter_deletes) {
SnapshotImpl read_from_snapshot; SnapshotImpl read_from_snapshot;
read_from_snapshot.number_ = sequence_; read_from_snapshot.number_ = sequence_;
ReadOptions ropts; ReadOptions ropts;
ropts.snapshot = &read_from_snapshot; ropts.snapshot = &read_from_snapshot;
std::string value; std::string value;
if (!db_->KeyMayExist(ropts, key, &value)) { if (!db_->KeyMayExist(ropts, cf_mems_->GetColumnFamilyHandle(), key,
RecordTick(options_->statistics.get(), NUMBER_FILTERED_DELETES); &value)) {
RecordTick(options->statistics.get(), NUMBER_FILTERED_DELETES);
return; return;
} }
} }
@ -374,20 +358,12 @@ class MemTableInserter : public WriteBatch::Handler {
}; };
} // namespace } // namespace
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* mem,
const Options* opts, DB* db,
const bool filter_deletes) {
MemTableInserter inserter(WriteBatchInternal::Sequence(b), mem, opts, db,
filter_deletes);
return b->Iterate(&inserter);
}
Status WriteBatchInternal::InsertInto(const WriteBatch* b, Status WriteBatchInternal::InsertInto(const WriteBatch* b,
ColumnFamilyMemTables* memtables, ColumnFamilyMemTables* memtables,
const Options* opts, DB* db, uint64_t log_number, DB* db,
const bool filter_deletes) { const bool dont_filter_deletes) {
MemTableInserter inserter(WriteBatchInternal::Sequence(b), memtables, opts, MemTableInserter inserter(WriteBatchInternal::Sequence(b), memtables,
db, filter_deletes); log_number, db, dont_filter_deletes);
return b->Iterate(&inserter); return b->Iterate(&inserter);
} }

@ -12,6 +12,7 @@
#include "rocksdb/write_batch.h" #include "rocksdb/write_batch.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/column_family.h"
namespace rocksdb { namespace rocksdb {
@ -19,7 +20,47 @@ class MemTable;
class ColumnFamilyMemTables { class ColumnFamilyMemTables {
public: public:
virtual MemTable* GetMemTable(uint32_t column_family_id) = 0; virtual ~ColumnFamilyMemTables() {}
virtual bool Seek(uint32_t column_family_id) = 0;
// returns true if the update to memtable should be ignored
// (useful when recovering from log whose updates have already
// been processed)
virtual uint64_t GetLogNumber() const = 0;
virtual MemTable* GetMemTable() const = 0;
virtual const Options* GetFullOptions() const = 0;
virtual const ColumnFamilyHandle& GetColumnFamilyHandle() const = 0;
};
class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables {
public:
ColumnFamilyMemTablesDefault(MemTable* mem, const Options* options)
: ok_(false), mem_(mem), options_(options) {}
bool Seek(uint32_t column_family_id) override {
ok_ = (column_family_id == 0);
return ok_;
}
uint64_t GetLogNumber() const override { return 0; }
MemTable* GetMemTable() const override {
assert(ok_);
return mem_;
}
const Options* GetFullOptions() const override {
assert(ok_);
return options_;
}
const ColumnFamilyHandle& GetColumnFamilyHandle() const override {
return default_column_family;
}
private:
bool ok_;
MemTable* mem_;
const Options* const options_;
}; };
// WriteBatchInternal provides static methods for manipulating a // WriteBatchInternal provides static methods for manipulating a
@ -50,16 +91,15 @@ class WriteBatchInternal {
static void SetContents(WriteBatch* batch, const Slice& contents); static void SetContents(WriteBatch* batch, const Slice& contents);
// Inserts batch entries into memtable // Inserts batch entries into memtable
// Drops deletes in batch if filter_del is set to true and // If dont_filter_deletes is false AND options.filter_deletes is true,
// db->KeyMayExist returns false // then --> Drops deletes in batch if db->KeyMayExist returns false
static Status InsertInto(const WriteBatch* batch, MemTable* memtable, // If log_number is not-null, the memtable will be updated only if
const Options* opts, DB* db = nullptr, // memtables->GetLogNumber() >= log_number
const bool filter_del = false); // See MemTableInserter::IgnoreUpdate()
static Status InsertInto(const WriteBatch* batch, static Status InsertInto(const WriteBatch* batch,
ColumnFamilyMemTables* memtables, ColumnFamilyMemTables* memtables,
const Options* opts, DB* db = nullptr, uint64_t log_number = 0, DB* db = nullptr,
const bool filter_del = false); const bool dont_filter_deletes = true);
static void Append(WriteBatch* dst, const WriteBatch* src); static void Append(WriteBatch* dst, const WriteBatch* src);
}; };

@ -27,7 +27,8 @@ static std::string PrintContents(WriteBatch* b) {
MemTable* mem = new MemTable(cmp, ColumnFamilyOptions(options)); MemTable* mem = new MemTable(cmp, ColumnFamilyOptions(options));
mem->Ref(); mem->Ref();
std::string state; std::string state;
Status s = WriteBatchInternal::InsertInto(b, mem, &options); ColumnFamilyMemTablesDefault cf_mems_default(mem, &options);
Status s = WriteBatchInternal::InsertInto(b, &cf_mems_default);
int count = 0; int count = 0;
Iterator* iter = mem->NewIterator(); Iterator* iter = mem->NewIterator();
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {

@ -1281,7 +1281,8 @@ TEST(MemTableTest, Simple) {
batch.Put(std::string("k2"), std::string("v2")); batch.Put(std::string("k2"), std::string("v2"));
batch.Put(std::string("k3"), std::string("v3")); batch.Put(std::string("k3"), std::string("v3"));
batch.Put(std::string("largekey"), std::string("vlarge")); batch.Put(std::string("largekey"), std::string("vlarge"));
ASSERT_TRUE(WriteBatchInternal::InsertInto(&batch, memtable, &options).ok()); ColumnFamilyMemTablesDefault cf_mems_default(memtable, &options);
ASSERT_TRUE(WriteBatchInternal::InsertInto(&batch, &cf_mems_default).ok());
Iterator* iter = memtable->NewIterator(); Iterator* iter = memtable->NewIterator();
iter->SeekToFirst(); iter->SeekToFirst();

Loading…
Cancel
Save