From f24a3ee52da7ed0b10edc6486a4e58787e7112b0 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Tue, 28 Jan 2014 11:05:04 -0800 Subject: [PATCH] Read from and write to different column families Summary: This one is big. It adds ability to write to and read from different column families (see the unit test). It also supports recovery of different column families from log, which was the hardest part to reason about. We need to make sure to never delete the log file which has unflushed data from any column family. To support that, I added another concept, which is versions_->MinLogNumber() Test Plan: Added a unit test in column_family_test Reviewers: dhruba, haobo, sdong, kailiu CC: leveldb Differential Revision: https://reviews.facebook.net/D15537 --- db/column_family.cc | 17 +++- db/column_family.h | 26 ++++++ db/column_family_test.cc | 175 ++++++++++++++++++++++++++++++++++++-- db/db_impl.cc | 141 +++++++++++++++++++----------- db/db_impl.h | 6 +- db/version_set.cc | 11 +++ db/version_set.h | 17 +++- db/write_batch.cc | 71 +++++++++++++--- db/write_batch_internal.h | 10 +++ 9 files changed, 400 insertions(+), 74 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index 77e224000..afbf69e9b 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -69,7 +69,8 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name, options(options), mem(nullptr), imm(options.min_write_buffer_number_to_merge), - super_version(nullptr) {} + super_version(nullptr), + log_number(0) {} ColumnFamilyData::~ColumnFamilyData() { if (super_version != nullptr) { @@ -167,4 +168,18 @@ void ColumnFamilySet::DropColumnFamily(uint32_t id) { column_family_data_.erase(cfd); } +MemTable* ColumnFamilyMemTablesImpl::GetMemTable(uint32_t column_family_id) { + auto cfd = column_family_set_->GetColumnFamily(column_family_id); + // TODO(icanadi): this should not be asserting. Rather, it should somehow + // return Corruption status back to the Iterator. This will require + // API change in WriteBatch::Handler, which is a public API + assert(cfd != nullptr); + + if (log_number_ == 0 || log_number_ >= cfd->log_number) { + return cfd->mem; + } else { + return nullptr; + } +} + } // namespace rocksdb diff --git a/db/column_family.h b/db/column_family.h index b5235d3df..44e459e36 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -15,6 +15,7 @@ #include "rocksdb/options.h" #include "db/memtablelist.h" +#include "db/write_batch_internal.h" namespace rocksdb { @@ -63,6 +64,11 @@ struct ColumnFamilyData { MemTableList imm; SuperVersion* super_version; + // This is the earliest log file number that contains data from this + // Column Family. All earlier log files must be ignored and not + // recovered from + uint64_t log_number; + ColumnFamilyData(uint32_t id, const std::string& name, Version* dummy_versions, const ColumnFamilyOptions& options); ~ColumnFamilyData(); @@ -122,4 +128,24 @@ class ColumnFamilySet { uint32_t max_column_family_; }; +class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables { + public: + explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set) + : column_family_set_(column_family_set), log_number_(0) {} + + // If column_family_data->log_number is bigger than log_number, + // the memtable will not be returned. + // If log_number == 0, the memtable will be always returned + void SetLogNumber(uint64_t log_number) { log_number_ = log_number; } + + // Returns the column families memtable if log_number == 0 || log_number <= + // column_family_data->log_number. + // If column family doesn't exist, it asserts + virtual MemTable* GetMemTable(uint32_t column_family_id) override; + + private: + ColumnFamilySet* column_family_set_; + uint64_t log_number_; +}; + } // namespace rocksdb diff --git a/db/column_family_test.cc b/db/column_family_test.cc index fc278ecf3..eb48f6cff 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -8,8 +8,10 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/db_impl.h" +#include "rocksdb/env.h" #include "rocksdb/db.h" #include "util/testharness.h" +#include "utilities/merge_operators.h" #include #include @@ -22,10 +24,10 @@ using namespace std; class ColumnFamilyTest { public: ColumnFamilyTest() { + env_ = Env::Default(); dbname_ = test::TmpDir() + "/column_family_test"; db_options_.create_if_missing = true; - options_.create_if_missing = true; - DestroyDB(dbname_, options_); + DestroyDB(dbname_, Options(db_options_, column_family_options_)); } void Close() { @@ -37,18 +39,77 @@ class ColumnFamilyTest { vector column_families; for (auto x : cf) { column_families.push_back( - ColumnFamilyDescriptor(x, ColumnFamilyOptions())); + ColumnFamilyDescriptor(x, column_family_options_)); } - vector handles; return DB::OpenWithColumnFamilies(db_options_, dbname_, column_families, - &handles, &db_); + &handles_, &db_); } - Options options_; + void Destroy() { + delete db_; + db_ = nullptr; + ASSERT_OK(DestroyDB(dbname_, Options(db_options_, column_family_options_))); + } + + void CreateColumnFamilies(const vector& cfs) { + int cfi = handles_.size(); + handles_.resize(cfi + cfs.size()); + for (auto cf : cfs) { + ASSERT_OK(db_->CreateColumnFamily(column_family_options_, cf, + &handles_[cfi++])); + } + } + + Status Put(int cf, const string& key, const string& value) { + return db_->Put(WriteOptions(), handles_[cf], Slice(key), Slice(value)); + } + Status Merge(int cf, const string& key, const string& value) { + return db_->Merge(WriteOptions(), handles_[cf], Slice(key), Slice(value)); + } + + string Get(int cf, const string& key) { + ReadOptions options; + options.verify_checksums = true; + string result; + Status s = db_->Get(options, handles_[cf], Slice(key), &result); + if (s.IsNotFound()) { + result = "NOT_FOUND"; + } else if (!s.ok()) { + result = s.ToString(); + } + return result; + } + + void CopyFile(const string& source, const string& destination, + uint64_t size = 0) { + const EnvOptions soptions; + unique_ptr srcfile; + ASSERT_OK(env_->NewSequentialFile(source, &srcfile, soptions)); + unique_ptr destfile; + ASSERT_OK(env_->NewWritableFile(destination, &destfile, soptions)); + + if (size == 0) { + // default argument means copy everything + ASSERT_OK(env_->GetFileSize(source, &size)); + } + + char buffer[4096]; + Slice slice; + while (size > 0) { + uint64_t one = min(uint64_t(sizeof(buffer)), size); + ASSERT_OK(srcfile->Read(one, &slice, buffer)); + ASSERT_OK(destfile->Append(slice)); + size -= slice.size(); + } + ASSERT_OK(destfile->Close()); + } + + vector handles_; ColumnFamilyOptions column_family_options_; DBOptions db_options_; string dbname_; DB* db_; + Env* env_; }; TEST(ColumnFamilyTest, AddDrop) { @@ -74,6 +135,108 @@ TEST(ColumnFamilyTest, AddDrop) { ASSERT_TRUE(families == vector({"default", "four", "one", "three"})); } +TEST(ColumnFamilyTest, ReadWrite) { + ASSERT_OK(Open({"default"})); + CreateColumnFamilies({"one", "two"}); + Close(); + ASSERT_OK(Open({"default", "one", "two"})); + ASSERT_OK(Put(0, "foo", "v1")); + ASSERT_OK(Put(0, "bar", "v2")); + ASSERT_OK(Put(1, "mirko", "v3")); + ASSERT_OK(Put(0, "foo", "v2")); + ASSERT_OK(Put(2, "fodor", "v5")); + + for (int iter = 0; iter <= 3; ++iter) { + ASSERT_EQ("v2", Get(0, "foo")); + ASSERT_EQ("v2", Get(0, "bar")); + ASSERT_EQ("v3", Get(1, "mirko")); + ASSERT_EQ("v5", Get(2, "fodor")); + ASSERT_EQ("NOT_FOUND", Get(0, "fodor")); + ASSERT_EQ("NOT_FOUND", Get(1, "fodor")); + ASSERT_EQ("NOT_FOUND", Get(2, "foo")); + if (iter <= 1) { + // reopen + Close(); + ASSERT_OK(Open({"default", "one", "two"})); + } + } + Close(); +} + +TEST(ColumnFamilyTest, IgnoreRecoveredLog) { + string backup_logs = dbname_ + "/backup_logs"; + + // delete old files in backup_logs directory + env_->CreateDirIfMissing(backup_logs); + vector old_files; + env_->GetChildren(backup_logs, &old_files); + for (auto& file : old_files) { + if (file != "." && file != "..") { + env_->DeleteFile(backup_logs + "/" + file); + } + } + + column_family_options_.merge_operator = + MergeOperators::CreateUInt64AddOperator(); + db_options_.wal_dir = dbname_ + "/logs"; + Destroy(); + ASSERT_OK(Open({"default"})); + CreateColumnFamilies({"cf1", "cf2"}); + + // fill up the DB + string one, two, three; + PutFixed64(&one, 1); + PutFixed64(&two, 2); + PutFixed64(&three, 3); + ASSERT_OK(Merge(0, "foo", one)); + ASSERT_OK(Merge(1, "mirko", one)); + ASSERT_OK(Merge(0, "foo", one)); + ASSERT_OK(Merge(2, "bla", one)); + ASSERT_OK(Merge(2, "fodor", one)); + ASSERT_OK(Merge(0, "bar", one)); + ASSERT_OK(Merge(2, "bla", one)); + ASSERT_OK(Merge(1, "mirko", two)); + ASSERT_OK(Merge(1, "franjo", one)); + + // copy the logs to backup + vector logs; + env_->GetChildren(db_options_.wal_dir, &logs); + for (auto& log : logs) { + if (log != ".." && log != ".") { + CopyFile(db_options_.wal_dir + "/" + log, backup_logs + "/" + log); + } + } + + // recover the DB + Close(); + + // 1. check consistency + // 2. copy the logs from backup back to WAL dir. if the recovery happens + // again on the same log files, this should lead to incorrect results + // due to applying merge operator twice + // 3. check consistency + for (int iter = 0; iter < 2; ++iter) { + // assert consistency + ASSERT_OK(Open({"default", "cf1", "cf2"})); + ASSERT_EQ(two, Get(0, "foo")); + ASSERT_EQ(one, Get(0, "bar")); + ASSERT_EQ(three, Get(1, "mirko")); + ASSERT_EQ(one, Get(1, "franjo")); + ASSERT_EQ(one, Get(2, "fodor")); + ASSERT_EQ(two, Get(2, "bla")); + Close(); + + if (iter == 0) { + // copy the logs from backup back to wal dir + for (auto& log : logs) { + if (log != ".." && log != ".") { + CopyFile(backup_logs + "/" + log, db_options_.wal_dir + "/" + log); + } + } + } + } +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_impl.cc b/db/db_impl.cc index d963212df..f8f52a9b6 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -309,6 +310,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) versions_.reset(new VersionSet(dbname_, &options_, storage_options_, table_cache_.get(), &internal_comparator_)); + column_family_memtables_.reset( + new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet())); dumpLeveldbBuildVersion(options_.info_log.get()); options_.Dump(options_.info_log.get()); @@ -494,7 +497,7 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state, // store the current filenum, lognum, etc deletion_state.manifest_file_number = versions_->ManifestFileNumber(); - deletion_state.log_number = versions_->LogNumber(); + deletion_state.log_number = versions_->MinLogNumber(); deletion_state.prev_log_number = versions_->PrevLogNumber(); if (!doing_the_full_scan && !deletion_state.HaveSomethingToDelete()) { @@ -860,7 +863,7 @@ Status DBImpl::Recover( // Note that PrevLogNumber() is no longer used, but we pay // attention to it in case we are recovering a database // produced by an older version of rocksdb. - const uint64_t min_log = versions_->LogNumber(); + const uint64_t min_log = versions_->MinLogNumber(); const uint64_t prev_log = versions_->PrevLogNumber(); std::vector filenames; s = env_->GetChildren(options_.wal_dir, &filenames); @@ -924,7 +927,12 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, mutex_.AssertHeld(); - VersionEdit edit; + std::unordered_map version_edits; + for (auto cfd : *versions_->GetColumnFamilySet()) { + VersionEdit edit; + edit.SetColumnFamily(cfd->id); + version_edits.insert({cfd->id, edit}); + } // Open the log file std::string fname = LogFileName(options_.wal_dir, log_number); @@ -955,7 +963,6 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, std::string scratch; Slice record; WriteBatch batch; - bool memtable_empty = true; while (reader.ReadRecord(&record, &scratch)) { if (record.size() < 12) { reporter.Corruption( @@ -964,9 +971,13 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, } WriteBatchInternal::SetContents(&batch, record); - status = - WriteBatchInternal::InsertInto(&batch, default_cfd_->mem, &options_); - memtable_empty = false; + // filter out all the column families that have already + // flushed memtables with log_number + column_family_memtables_->SetLogNumber(log_number); + status = WriteBatchInternal::InsertInto( + &batch, column_family_memtables_.get(), &options_); + column_family_memtables_->SetLogNumber(0); + MaybeIgnoreError(&status); if (!status.ok()) { return status; @@ -978,38 +989,52 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, *max_sequence = last_seq; } - if (!read_only && default_cfd_->mem->ApproximateMemoryUsage() > - options_.write_buffer_size) { - status = WriteLevel0TableForRecovery(default_cfd_->mem, &edit); - // we still want to clear memtable, even if the recovery failed - default_cfd_->CreateNewMemtable(); - memtable_empty = true; - if (!status.ok()) { - // Reflect errors immediately so that conditions like full - // file-systems cause the DB::Open() to fail. - return status; + if (!read_only) { + for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->mem->ApproximateMemoryUsage() > + cfd->options.write_buffer_size) { + auto iter = version_edits.find(cfd->id); + assert(iter != version_edits.end()); + VersionEdit* edit = &iter->second; + status = WriteLevel0TableForRecovery(cfd->mem, edit); + // we still want to clear the memtable, even if the recovery failed + cfd->CreateNewMemtable(); + if (!status.ok()) { + // Reflect errors immediately so that conditions like full + // file-systems cause the DB::Open() to fail. + return status; + } + } } } } - if (!memtable_empty && !read_only) { - status = WriteLevel0TableForRecovery(default_cfd_->mem, &edit); - default_cfd_->CreateNewMemtable(); - if (!status.ok()) { - return status; - } - } + if (!read_only) { + for (auto cfd : *versions_->GetColumnFamilySet()) { + auto iter = version_edits.find(cfd->id); + assert(iter != version_edits.end()); + VersionEdit* edit = &iter->second; + + // flush the final memtable + status = WriteLevel0TableForRecovery(cfd->mem, edit); + // we still want to clear the memtable, even if the recovery failed + cfd->CreateNewMemtable(); + if (!status.ok()) { + return status; + } - if (edit.NumEntries() > 0) { - // if read_only, NumEntries() will be 0 - assert(!read_only); - // writing log number in the manifest means that any log file - // with number strongly less than (log_number + 1) is already - // recovered and should be ignored on next reincarnation. - // Since we already recovered log_number, we want all logs - // with numbers `<= log_number` (includes this one) to be ignored - edit.SetLogNumber(log_number + 1); - status = versions_->LogAndApply(default_cfd_, &edit, &mutex_); + // write MANIFEST with update + // writing log number in the manifest means that any log file + // with number strongly less than (log_number + 1) is already + // recovered and should be ignored on next reincarnation. + // Since we already recovered log_number, we want all logs + // with numbers `<= log_number` (includes this one) to be ignored + edit->SetLogNumber(log_number + 1); + status = versions_->LogAndApply(cfd, edit, &mutex_); + if (!status.ok()) { + return status; + } + } } return status; @@ -2737,7 +2762,7 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { Status DBImpl::Get(const ReadOptions& options, const ColumnFamilyHandle& column_family, const Slice& key, std::string* value) { - return GetImpl(options, key, value); + return GetImpl(options, column_family, key, value); } // DeletionState gets created and destructed outside of the lock -- we @@ -2784,12 +2809,19 @@ SuperVersion* DBImpl::InstallSuperVersion(ColumnFamilyData* cfd, } Status DBImpl::GetImpl(const ReadOptions& options, - const Slice& key, - std::string* value, + const ColumnFamilyHandle& column_family, + const Slice& key, std::string* value, bool* value_found) { - Status s; - StopWatch sw(env_, options_.statistics.get(), DB_GET, false); + + mutex_.Lock(); + auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id); + // this is asserting because client calling Get() with undefined + // ColumnFamilyHandle is undefined behavior. + assert(cfd != nullptr); + SuperVersion* get_version = cfd->super_version->Ref(); + mutex_.Unlock(); + SequenceNumber snapshot; if (options.snapshot != nullptr) { snapshot = reinterpret_cast(options.snapshot)->number_; @@ -2797,17 +2829,13 @@ Status DBImpl::GetImpl(const ReadOptions& options, snapshot = versions_->LastSequence(); } - // This can be replaced by using atomics and spinlock instead of big mutex - mutex_.Lock(); - SuperVersion* get_version = default_cfd_->super_version->Ref(); - mutex_.Unlock(); - bool have_stat_update = false; Version::GetStats stats; // Prepare to store a list of merge operations if merge occurs. MergeContext merge_context; + Status s; // First look in the memtable, then in the immutable memtable (if any). // s is both in/out. When in, s could either be OK or MergeInProgress. // merge_operands will contain the sequence of merges in the latter case. @@ -2957,6 +2985,8 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, // add to internal data structures versions_->CreateColumnFamily(options, &edit); } + Log(options_.info_log, "Created column family %s\n", + column_family_name.c_str()); return s; } @@ -2976,6 +3006,9 @@ Status DBImpl::DropColumnFamily(const ColumnFamilyHandle& column_family) { // remove from internal data structures versions_->DropColumnFamily(&edit); } + // TODO(icanadi) PurgeObsoletetFiles here + Log(options_.info_log, "Dropped column family with id %u\n", + column_family.id); return s; } @@ -2989,7 +3022,7 @@ bool DBImpl::KeyMayExist(const ReadOptions& options, } ReadOptions roptions = options; roptions.read_tier = kBlockCacheTier; // read from block cache only - auto s = GetImpl(roptions, key, value, value_found); + auto s = GetImpl(roptions, column_family, key, value, value_found); // If options.block_cache != nullptr and the index block of the table didn't // not present in block_cache, the return value will be Status::Incomplete. @@ -3102,7 +3135,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { // Add to log and apply to memtable. We can release the lock // during this phase since &w is currently responsible for logging // and protects against concurrent loggers and concurrent writes - // into default_cfd_->mem. + // into memtables { mutex_.Unlock(); WriteBatch* updates = nullptr; @@ -3148,9 +3181,14 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { } } if (status.ok()) { - status = WriteBatchInternal::InsertInto(updates, default_cfd_->mem, - &options_, this, - options_.filter_deletes); + // TODO(icanadi) this accesses column_family_set_ without any lock. + // We'll need to add a spinlock for reading that we also lock when we + // write to a column family (only on column family add/drop, which is + // a very rare action) + status = WriteBatchInternal::InsertInto( + updates, column_family_memtables_.get(), &options_, this, + options_.filter_deletes); + if (!status.ok()) { // Panic for in-memory corruptions // Note that existing logic was not sound. Any partial failure writing @@ -3995,9 +4033,12 @@ Status DB::OpenWithColumnFamilies( if (s.ok()) { lfile->SetPreallocationBlockSize(1.1 * impl->options_.write_buffer_size); VersionEdit edit; - edit.SetLogNumber(new_log_number); impl->logfile_number_ = new_log_number; impl->log_.reset(new log::Writer(std::move(lfile))); + // We use this LogAndApply just to store the next file number, the one + // that we used by calling impl->versions_->NewFileNumber() + // The used log number are already written to manifest in RecoverLogFile() + // method s = impl->versions_->LogAndApply(impl->default_cfd_, &edit, &impl->mutex_, impl->db_directory_.get()); } diff --git a/db/db_impl.h b/db/db_impl.h index f16d4a65c..1355fa498 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -399,6 +399,7 @@ class DBImpl : public DB { uint64_t logfile_number_; unique_ptr log_; ColumnFamilyData* default_cfd_; + unique_ptr column_family_memtables_; // An ordinal representing the current SuperVersion. Updated by // InstallSuperVersion(), i.e. incremented every time super_version_ @@ -603,9 +604,8 @@ class DBImpl : public DB { // Function that Get and KeyMayExist call with no_io true or false // Note: 'value_found' from KeyMayExist propagates here Status GetImpl(const ReadOptions& options, - const Slice& key, - std::string* value, - bool* value_found = nullptr); + const ColumnFamilyHandle& column_family, const Slice& key, + std::string* value, bool* value_found = nullptr); }; // Sanitize db options. The caller should delete result.info_log if diff --git a/db/version_set.cc b/db/version_set.cc index 0a717de3a..e7e479d73 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1567,6 +1567,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, manifest_file_size_ = new_manifest_file_size; AppendVersion(column_family_data, v); log_number_ = edit->log_number_; + column_family_data->log_number = edit->log_number_; prev_log_number_ = edit->prev_log_number_; } else { @@ -1753,6 +1754,10 @@ Status VersionSet::Recover( break; } + if (edit.has_log_number_) { + cfd->log_number = edit.log_number_; + } + // if it is not column family add or column family drop, // then it's a file add/delete, which should be forwarded // to builder @@ -1838,6 +1843,11 @@ Status VersionSet::Recover( (unsigned long)last_sequence_, (unsigned long)log_number_, (unsigned long)prev_log_number_); + + for (auto cfd : *column_family_set_) { + Log(options_->info_log, "Column family \"%s\", log number is %lu\n", + cfd->name.c_str(), cfd->log_number); + } } for (auto builder : builders) { @@ -2140,6 +2150,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { f->largest_seqno); } } + edit.SetLogNumber(cfd->log_number); std::string record; edit.EncodeTo(&record); Status s = log->AddRecord(record); diff --git a/db/version_set.h b/db/version_set.h index 1cda55ca1..57ea509a4 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -349,13 +349,26 @@ class VersionSet { // Mark the specified file number as used. void MarkFileNumberUsed(uint64_t number); - // Return the current log file number. + // Return the current log file number. This is the biggest log_number from + // all column families uint64_t LogNumber() const { return log_number_; } // Return the log file number for the log file that is currently // being compacted, or zero if there is no such log file. uint64_t PrevLogNumber() const { return prev_log_number_; } + // Returns the minimum log number such that all + // log numbers less than or equal to it can be deleted + uint64_t MinLogNumber() const { + uint64_t min_log_num = 0; + for (auto cfd : *column_family_set_) { + if (min_log_num == 0 || min_log_num > cfd->log_number) { + min_log_num = cfd->log_number; + } + } + return min_log_num; + } + int NumberLevels() const { return num_levels_; } // Pick level and inputs for a new compaction. @@ -433,7 +446,7 @@ class VersionSet { friend class Compaction; friend class Version; - // TODO temporarily until we have what ColumnFamilyData needs (icmp_) + // TODO(icanadi) temporarily until we have what ColumnFamilyData needs (icmp_) friend struct ColumnFamilyData; struct LogReporter : public log::Reader::Reporter { diff --git a/db/write_batch.cc b/db/write_batch.cc index af4790ce5..c6f096476 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -230,17 +230,19 @@ class MemTableInserter : public WriteBatch::Handler { public: SequenceNumber sequence_; MemTable* mem_; + ColumnFamilyMemTables* cf_mems_; const Options* options_; DBImpl* db_; const bool filter_deletes_; MemTableInserter(SequenceNumber sequence, MemTable* mem, const Options* opts, DB* db, const bool filter_deletes) - : sequence_(sequence), - mem_(mem), - options_(opts), - db_(reinterpret_cast(db)), - filter_deletes_(filter_deletes) { + : sequence_(sequence), + mem_(mem), + cf_mems_(nullptr), + options_(opts), + db_(reinterpret_cast(db)), + filter_deletes_(filter_deletes) { assert(mem_); if (filter_deletes_) { assert(options_); @@ -248,18 +250,50 @@ class MemTableInserter : public WriteBatch::Handler { } } + MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems, + const Options* opts, DB* db, const bool filter_deletes) + : sequence_(sequence), + mem_(nullptr), + cf_mems_(cf_mems), + options_(opts), + db_(reinterpret_cast(db)), + filter_deletes_(filter_deletes) { + assert(cf_mems); + if (filter_deletes_) { + assert(options_); + assert(db_); + } + } + + // returns nullptr if the update to the column family is not needed + MemTable* GetMemTable(uint32_t column_family_id) { + if (mem_ != nullptr) { + return (column_family_id == 0) ? mem_ : nullptr; + } else { + return cf_mems_->GetMemTable(column_family_id); + } + } + virtual void PutCF(uint32_t column_family_id, const Slice& key, const Slice& value) { - if (options_->inplace_update_support - && mem_->Update(sequence_, kTypeValue, key, value)) { + MemTable* mem = GetMemTable(column_family_id); + if (mem == nullptr) { + return; + } + if (options_->inplace_update_support && + mem->Update(sequence_, kTypeValue, key, value)) { RecordTick(options_->statistics.get(), NUMBER_KEYS_UPDATED); } else { - mem_->Add(sequence_, kTypeValue, key, value); + mem->Add(sequence_, kTypeValue, key, value); } sequence_++; } virtual void MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) { + MemTable* mem = GetMemTable(column_family_id); + if (mem == nullptr) { + return; + } bool perform_merge = false; if (options_->max_successive_merges > 0 && db_ != nullptr) { @@ -267,7 +301,7 @@ class MemTableInserter : public WriteBatch::Handler { // Count the number of successive merges at the head // of the key in the memtable - size_t num_merges = mem_->CountSuccessiveMergeEntries(lkey); + size_t num_merges = mem->CountSuccessiveMergeEntries(lkey); if (num_merges >= options_->max_successive_merges) { perform_merge = true; @@ -307,18 +341,22 @@ class MemTableInserter : public WriteBatch::Handler { perform_merge = false; } else { // 3) Add value to memtable - mem_->Add(sequence_, kTypeValue, key, new_value); + mem->Add(sequence_, kTypeValue, key, new_value); } } if (!perform_merge) { // Add merge operator to memtable - mem_->Add(sequence_, kTypeMerge, key, value); + mem->Add(sequence_, kTypeMerge, key, value); } sequence_++; } virtual void DeleteCF(uint32_t column_family_id, const Slice& key) { + MemTable* mem = GetMemTable(column_family_id); + if (mem == nullptr) { + return; + } if (filter_deletes_) { SnapshotImpl read_from_snapshot; read_from_snapshot.number_ = sequence_; @@ -330,7 +368,7 @@ class MemTableInserter : public WriteBatch::Handler { return; } } - mem_->Add(sequence_, kTypeDeletion, key, Slice()); + mem->Add(sequence_, kTypeDeletion, key, Slice()); sequence_++; } }; @@ -344,6 +382,15 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* mem, return b->Iterate(&inserter); } +Status WriteBatchInternal::InsertInto(const WriteBatch* b, + ColumnFamilyMemTables* memtables, + const Options* opts, DB* db, + const bool filter_deletes) { + MemTableInserter inserter(WriteBatchInternal::Sequence(b), memtables, opts, + db, filter_deletes); + return b->Iterate(&inserter); +} + void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { assert(contents.size() >= kHeader); b->rep_.assign(contents.data(), contents.size()); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index b8991732f..244799fc3 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -17,6 +17,11 @@ namespace rocksdb { class MemTable; +class ColumnFamilyMemTables { + public: + virtual MemTable* GetMemTable(uint32_t column_family_id) = 0; +}; + // WriteBatchInternal provides static methods for manipulating a // WriteBatch that we don't want in the public WriteBatch interface. class WriteBatchInternal { @@ -51,6 +56,11 @@ class WriteBatchInternal { const Options* opts, DB* db = nullptr, const bool filter_del = false); + static Status InsertInto(const WriteBatch* batch, + ColumnFamilyMemTables* memtables, + const Options* opts, DB* db = nullptr, + const bool filter_del = false); + static void Append(WriteBatch* dst, const WriteBatch* src); };