From 3615f534d13684b5673126d80b7279381191fc1b Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Thu, 30 Jan 2014 17:48:42 -0800 Subject: [PATCH] Enable flushing memtables from arbitrary column families Summary: Removed default_cfd_ from all flush code paths. This means we can now flush memtables from arbitrary column families! Test Plan: Added a new unit test Reviewers: dhruba, haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D15789 --- db/column_family_test.cc | 37 ++++++++++ db/db_impl.cc | 151 ++++++++++++++++++++++++++------------- db/db_impl.h | 10 +-- db/version_set.cc | 37 +++++----- db/version_set.h | 9 +-- 5 files changed, 162 insertions(+), 82 deletions(-) diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 553c8832f..1e67ce31c 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -66,6 +66,9 @@ class ColumnFamilyTest { Status Merge(int cf, const string& key, const string& value) { return db_->Merge(WriteOptions(), handles_[cf], Slice(key), Slice(value)); } + Status Flush(int cf) { + return db_->Flush(FlushOptions(), handles_[cf]); + } string Get(int cf, const string& key) { ReadOptions options; @@ -238,6 +241,40 @@ TEST(ColumnFamilyTest, IgnoreRecoveredLog) { } } +TEST(ColumnFamilyTest, FlushTest) { + ASSERT_OK(Open({"default"})); + CreateColumnFamilies({"one", "two"}); + Close(); + ASSERT_OK(Open({"default", "one", "two"})); + ASSERT_OK(Put(0, "foo", "v1")); + ASSERT_OK(Put(0, "bar", "v2")); + ASSERT_OK(Put(1, "mirko", "v3")); + ASSERT_OK(Put(0, "foo", "v2")); + ASSERT_OK(Put(2, "fodor", "v5")); + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Flush(i));  // fail the test if any column family fails to flush + } + Close(); + ASSERT_OK(Open({"default", "one", "two"})); + + for (int iter = 0; iter <= 2; ++iter) { + ASSERT_EQ("v2", Get(0, "foo")); + ASSERT_EQ("v2", Get(0, "bar")); + ASSERT_EQ("v3", Get(1, "mirko")); + ASSERT_EQ("v5", Get(2, "fodor")); + ASSERT_EQ("NOT_FOUND", Get(0, "fodor")); + ASSERT_EQ("NOT_FOUND", Get(1, "fodor")); + ASSERT_EQ("NOT_FOUND", Get(2, 
"foo")); + if (iter <= 1) { + // reopen + Close(); + ASSERT_OK(Open({"default", "one", "two"})); + } + } + Close(); +} + + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_impl.cc b/db/db_impl.cc index 9b71af659..702dade25 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -317,8 +317,12 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) DBImpl::~DBImpl() { // Wait for background work to finish - if (flush_on_destroy_ && default_cfd_->mem()->GetFirstSequenceNumber() != 0) { - FlushMemTable(FlushOptions()); + if (flush_on_destroy_) { + for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->mem()->GetFirstSequenceNumber() != 0) { + FlushMemTable(cfd, FlushOptions()); + } + } } mutex_.Lock(); shutting_down_.Release_Store(this); // Any non-nullptr value is ok @@ -979,6 +983,9 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, for (auto cfd : *versions_->GetColumnFamilySet()) { if (cfd->mem()->ApproximateMemoryUsage() > cfd->options()->write_buffer_size) { + // If this asserts, it means that ColumnFamilyMemTablesImpl failed in + // filtering updates to already-flushed column families + assert(cfd->GetLogNumber() <= log_number); auto iter = version_edits.find(cfd->GetID()); assert(iter != version_edits.end()); VersionEdit* edit = &iter->second; @@ -1001,8 +1008,20 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, assert(iter != version_edits.end()); VersionEdit* edit = &iter->second; - // flush the final memtable - status = WriteLevel0TableForRecovery(cfd->mem(), edit); + if (cfd->GetLogNumber() > log_number) { + // Column family cfd has already flushed the data + // from log_number. 
Memtable has to be empty because + // we filter the updates based on log_number + // (in ColumnFamilyMemTablesImpl) + assert(cfd->mem()->GetFirstSequenceNumber() == 0); + assert(edit->NumEntries() == 0); + continue; + } + + // flush the final memtable (if non-empty) + if (cfd->mem()->GetFirstSequenceNumber() != 0) { + status = WriteLevel0TableForRecovery(cfd->mem(), edit); + } // we still want to clear the memtable, even if the recovery failed cfd->CreateNewMemtable(); if (!status.ok()) { @@ -1016,6 +1035,12 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, // Since we already recovered log_number, we want all logs // with numbers `<= log_number` (includes this one) to be ignored edit->SetLogNumber(log_number + 1); + // We must mark the next log number as used, even though it is + // not actually used. That is because VersionSet assumes that + // VersionSet::next_file_number_ is always strictly greater than + // any log number, and the edit above records log_number + 1 as + // this column family's log number. + versions_->MarkFileNumberUsed(log_number + 1); status = versions_->LogAndApply(cfd, edit, &mutex_); if (!status.ok()) { return status; @@ -1077,8 +1102,8 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) { return s; } - -Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, +Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, + std::vector& mems, VersionEdit* edit, uint64_t* filenumber) { mutex_.AssertHeld(); const uint64_t start_micros = env_->NowMicros(); @@ -1090,7 +1115,7 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, const SequenceNumber newest_snapshot = snapshots_.GetNewest(); const SequenceNumber earliest_seqno_in_memtable = mems[0]->GetFirstSequenceNumber(); - Version* base = default_cfd_->current(); + Version* base = cfd->current(); base->Ref(); // it is likely that we do not need this reference Status s; { @@ -1127,7 +1152,7 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, // re-acquire the most 
current version - base = default_cfd_->current(); + base = cfd->current(); // There could be multiple threads writing to its own level-0 file. // The pending_outputs cannot be cleared here, otherwise this newly @@ -1149,7 +1174,7 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, // threads could be concurrently producing compacted files for // that key range. if (base != nullptr && options_.max_background_compactions <= 1 && - options_.compaction_style == kCompactionStyleLevel) { + cfd->options()->compaction_style == kCompactionStyleLevel) { level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); } edit->AddFile(level, meta.number, meta.file_size, @@ -1165,12 +1190,13 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, return s; } -Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, +Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd, + bool* madeProgress, DeletionState& deletion_state) { mutex_.AssertHeld(); - assert(default_cfd_->imm()->size() != 0); + assert(cfd->imm()->size() != 0); - if (!default_cfd_->imm()->IsFlushPending()) { + if (!cfd->imm()->IsFlushPending()) { Log(options_.info_log, "FlushMemTableToOutputFile already in progress"); return Status::IOError("FlushMemTableToOutputFile already in progress"); } @@ -1178,7 +1204,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, // Save the contents of the earliest memtable as a new Table uint64_t file_number; std::vector mems; - default_cfd_->imm()->PickMemtablesToFlush(&mems); + cfd->imm()->PickMemtablesToFlush(&mems); if (mems.empty()) { Log(options_.info_log, "Nothing in memstore to flush"); return Status::IOError("Nothing in memstore to flush"); @@ -1193,9 +1219,8 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, edit->SetPrevLogNumber(0); // SetLogNumber(log_num) indicates logs with number smaller than log_num // will no longer be picked up for recovery. 
- edit->SetLogNumber( - mems.back()->GetNextLogNumber() - ); + edit->SetLogNumber(mems.back()->GetNextLogNumber()); + edit->SetColumnFamily(cfd->GetID()); std::vector logs_to_delete; for (auto mem : mems) { @@ -1203,7 +1228,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, } // This will release and re-acquire the mutex. - Status s = WriteLevel0Table(mems, edit, &file_number); + Status s = WriteLevel0Table(cfd, mems, edit, &file_number); if (s.ok() && shutting_down_.Acquire_Load()) { s = Status::IOError( @@ -1212,13 +1237,13 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, } // Replace immutable memtable with the generated Table - s = default_cfd_->imm()->InstallMemtableFlushResults( - default_cfd_, mems, versions_.get(), s, &mutex_, options_.info_log.get(), + s = cfd->imm()->InstallMemtableFlushResults( + cfd, mems, versions_.get(), s, &mutex_, options_.info_log.get(), file_number, pending_outputs_, &deletion_state.memtables_to_free, db_directory_.get()); if (s.ok()) { - InstallSuperVersion(default_cfd_, deletion_state); + InstallSuperVersion(cfd, deletion_state); if (madeProgress) { *madeProgress = 1; } @@ -1239,7 +1264,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family, const Slice* begin, const Slice* end, bool reduce_level, int target_level) { - Status s = FlushMemTable(FlushOptions()); + Status s = FlushMemTable(default_cfd_, FlushOptions()); if (!s.ok()) { LogFlush(options_.info_log); return s; @@ -1382,8 +1407,12 @@ uint64_t DBImpl::CurrentVersionNumber() const { Status DBImpl::Flush(const FlushOptions& options, const ColumnFamilyHandle& column_family) { - Status status = FlushMemTable(options); - return status; + mutex_.Lock(); + auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id); + mutex_.Unlock(); + assert(cfd != nullptr); + + return FlushMemTable(cfd, options); } SequenceNumber DBImpl::GetLatestSequenceNumber() 
const { @@ -1657,35 +1686,36 @@ Status DBImpl::TEST_CompactRange(int level, return RunManualCompaction(level, output_level, begin, end); } -Status DBImpl::FlushMemTable(const FlushOptions& options) { +Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, + const FlushOptions& options) { // nullptr batch means just wait for earlier writes to be done Status s = Write(WriteOptions(), nullptr); if (s.ok() && options.wait) { // Wait until the compaction completes - s = WaitForFlushMemTable(); + s = WaitForFlushMemTable(cfd); } return s; } -Status DBImpl::WaitForFlushMemTable() { +Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) { Status s; // Wait until the compaction completes MutexLock l(&mutex_); - while (default_cfd_->imm()->size() > 0 && bg_error_.ok()) { + while (cfd->imm()->size() > 0 && bg_error_.ok()) { bg_cv_.Wait(); } - if (default_cfd_->imm()->size() != 0) { + if (!bg_error_.ok()) { s = bg_error_; } return s; } Status DBImpl::TEST_FlushMemTable() { - return FlushMemTable(FlushOptions()); + return FlushMemTable(default_cfd_, FlushOptions()); } Status DBImpl::TEST_WaitForFlushMemTable() { - return WaitForFlushMemTable(); + return WaitForFlushMemTable(default_cfd_); } Status DBImpl::TEST_WaitForCompact() { @@ -1710,19 +1740,31 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions } else { - bool is_flush_pending = default_cfd_->imm()->IsFlushPending(); + bool is_flush_pending = false; + for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->imm()->IsFlushPending()) { + is_flush_pending = true; + } + } if (is_flush_pending && (bg_flush_scheduled_ < options_.max_background_flushes)) { // memtable flush needed bg_flush_scheduled_++; env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH); } + bool is_compaction_needed = false; + for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->current()->NeedsCompaction()) { + is_compaction_needed = 
true; + break; + } + } // Schedule BGWorkCompaction if there's a compaction pending (or a memtable // flush, but the HIGH pool is not enabled). Do it only if // max_background_compactions hasn't been reached and, in case // bg_manual_only_ > 0, if it's a manual compaction. - if ((manual_compaction_ || default_cfd_->current()->NeedsCompaction() || + if ((manual_compaction_ || is_compaction_needed || (is_flush_pending && (options_.max_background_flushes <= 0))) && bg_compaction_scheduled_ < options_.max_background_compactions && (!bg_manual_only_ || manual_compaction_)) { @@ -1744,11 +1786,14 @@ void DBImpl::BGWorkCompaction(void* db) { Status DBImpl::BackgroundFlush(bool* madeProgress, DeletionState& deletion_state) { Status stat; - while (stat.ok() && default_cfd_->imm()->IsFlushPending()) { - Log(options_.info_log, - "BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d", - options_.max_background_flushes - bg_flush_scheduled_); - stat = FlushMemTableToOutputFile(madeProgress, deletion_state); + for (auto cfd : *versions_->GetColumnFamilySet()) { + while (stat.ok() && cfd->imm()->IsFlushPending()) { + Log(options_.info_log, + "BackgroundCallFlush doing FlushMemTableToOutputFile with column " + "family %u, flush slots available %d", + cfd->GetID(), options_.max_background_flushes - bg_flush_scheduled_); + stat = FlushMemTableToOutputFile(cfd, madeProgress, deletion_state); + } } return stat; } @@ -1871,20 +1916,24 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, } // TODO: remove memtable flush from formal compaction - while (default_cfd_->imm()->IsFlushPending()) { - Log(options_.info_log, - "BackgroundCompaction doing FlushMemTableToOutputFile, compaction slots " - "available %d", - options_.max_background_compactions - bg_compaction_scheduled_); - Status stat = FlushMemTableToOutputFile(madeProgress, deletion_state); - if (!stat.ok()) { - if (is_manual) { - manual_compaction_->status = stat; - manual_compaction_->done = true; - 
manual_compaction_->in_progress = false; - manual_compaction_ = nullptr; + for (auto cfd : *versions_->GetColumnFamilySet()) { + while (cfd->imm()->IsFlushPending()) { + Log(options_.info_log, + "BackgroundCompaction doing FlushMemTableToOutputFile with column " + "family %u, compaction slots available %d", + cfd->GetID(), + options_.max_background_compactions - bg_compaction_scheduled_); + Status stat = + FlushMemTableToOutputFile(cfd, madeProgress, deletion_state); + if (!stat.ok()) { + if (is_manual) { + manual_compaction_->status = stat; + manual_compaction_->done = true; + manual_compaction_->in_progress = false; + manual_compaction_ = nullptr; + } + return stat; } - return stat; } } @@ -2285,7 +2334,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, LogFlush(options_.info_log); mutex_.Lock(); if (default_cfd_->imm()->IsFlushPending()) { - FlushMemTableToOutputFile(nullptr, deletion_state); + FlushMemTableToOutputFile(default_cfd_, nullptr, deletion_state); bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary } mutex_.Unlock(); diff --git a/db/db_impl.h b/db/db_impl.h index e144582e7..69cfba42a 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -286,7 +286,7 @@ class DBImpl : public DB { // Flush the in-memory write buffer to storage. Switches to a new // log-file/memtable and writes a new descriptor iff successful. - Status FlushMemTableToOutputFile(bool* madeProgress, + Status FlushMemTableToOutputFile(ColumnFamilyData* cfd, bool* madeProgress, DeletionState& deletion_state); Status RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, @@ -298,8 +298,8 @@ class DBImpl : public DB { // for the entire period. The second method WriteLevel0Table supports // concurrent flush memtables to storage. 
Status WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit); - Status WriteLevel0Table(std::vector &mems, VersionEdit* edit, - uint64_t* filenumber); + Status WriteLevel0Table(ColumnFamilyData* cfd, std::vector& mems, + VersionEdit* edit, uint64_t* filenumber); uint64_t SlowdownAmount(int n, double bottom, double top); Status MakeRoomForWrite(ColumnFamilyData* cfd, @@ -308,10 +308,10 @@ class DBImpl : public DB { autovector* write_batch_group); // Force current memtable contents to be flushed. - Status FlushMemTable(const FlushOptions& options); + Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options); // Wait for memtable flushed - Status WaitForFlushMemTable(); + Status WaitForFlushMemTable(ColumnFamilyData* cfd); void MaybeScheduleLogDBDeployStats(); static void BGLogDBDeployStats(void* db); diff --git a/db/version_set.cc b/db/version_set.cc index fd03e9982..ae00309e1 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1130,10 +1130,12 @@ struct VersionSet::ManifestWriter { Status status; bool done; port::CondVar cv; + ColumnFamilyData* cfd; VersionEdit* edit; - explicit ManifestWriter(port::Mutex* mu, VersionEdit* e) : - done(false), cv(mu), edit(e) {} + explicit ManifestWriter(port::Mutex* mu, ColumnFamilyData* cfd, + VersionEdit* e) + : done(false), cv(mu), cfd(cfd), edit(e) {} }; // A helper class so we can efficiently apply a whole sequence @@ -1374,7 +1376,6 @@ VersionSet::VersionSet(const std::string& dbname, const Options* options, next_file_number_(2), manifest_file_number_(0), // Filled by Recover() last_sequence_(0), - log_number_(0), prev_log_number_(0), num_levels_(options_->num_levels), current_version_number_(0), @@ -1428,7 +1429,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, mu->AssertHeld(); // queue our request - ManifestWriter w(mu, edit); + ManifestWriter w(mu, column_family_data, edit); manifest_writers_.push_back(&w); while (!w.done && &w != manifest_writers_.front()) { 
w.cv.Wait(); @@ -1447,8 +1448,12 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, assert(manifest_writers_.front() == &w); std::deque::iterator iter = manifest_writers_.begin(); for (; iter != manifest_writers_.end(); ++iter) { + if ((*iter)->cfd->GetID() != column_family_data->GetID()) { + // group commits across column families are not yet supported + break; + } last_writer = *iter; - LogAndApplyHelper(&builder, v, last_writer->edit, mu); + LogAndApplyHelper(column_family_data, &builder, v, last_writer->edit, mu); batch_edits.push_back(last_writer->edit); } builder.SaveTo(v); @@ -1564,7 +1569,6 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, if (s.ok()) { manifest_file_size_ = new_manifest_file_size; AppendVersion(column_family_data, v); - log_number_ = edit->log_number_; column_family_data->SetLogNumber(edit->log_number_); prev_log_number_ = edit->prev_log_number_; @@ -1596,15 +1600,16 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, return s; } -void VersionSet::LogAndApplyHelper(Builder* builder, Version* v, - VersionEdit* edit, port::Mutex* mu) { +void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, Builder* builder, + Version* v, VersionEdit* edit, + port::Mutex* mu) { mu->AssertHeld(); if (edit->has_log_number_) { - assert(edit->log_number_ >= log_number_); + assert(edit->log_number_ >= cfd->GetLogNumber()); assert(edit->log_number_ < next_file_number_); } else { - edit->SetLogNumber(log_number_); + edit->SetLogNumber(cfd->GetLogNumber()); } if (!edit->has_prev_log_number_) { @@ -1754,6 +1759,7 @@ Status VersionSet::Recover( if (edit.has_log_number_) { cfd->SetLogNumber(edit.log_number_); + have_log_number = true; } // if it is not column family add or column family drop, @@ -1764,11 +1770,6 @@ Status VersionSet::Recover( builder->second->Apply(&edit); } - if (edit.has_log_number_) { - log_number = edit.log_number_; - have_log_number = true; - } - if (edit.has_prev_log_number_) { 
prev_log_number = edit.prev_log_number_; have_prev_log_number = true; @@ -1828,7 +1829,6 @@ Status VersionSet::Recover( manifest_file_number_ = next_file; next_file_number_ = next_file + 1; last_sequence_ = last_sequence; - log_number_ = log_number; prev_log_number_ = prev_log_number; Log(options_->info_log, "Recovered from manifest file:%s succeeded," @@ -1839,7 +1839,7 @@ Status VersionSet::Recover( (unsigned long)manifest_file_number_, (unsigned long)next_file_number_, (unsigned long)last_sequence_, - (unsigned long)log_number_, + (unsigned long)log_number, (unsigned long)prev_log_number_); for (auto cfd : *column_family_set_) { @@ -2041,7 +2041,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, } if (edit.has_log_number_) { - log_number = edit.log_number_; + log_number = std::max(log_number, edit.log_number_); have_log_number = true; } @@ -2090,7 +2090,6 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, manifest_file_number_ = next_file; next_file_number_ = next_file + 1; last_sequence_ = last_sequence; - log_number_ = log_number; prev_log_number_ = prev_log_number; printf("manifest_file_number %lu next_file_number %lu last_sequence " diff --git a/db/version_set.h b/db/version_set.h index 8512f0835..3376668cb 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -344,10 +344,6 @@ class VersionSet { // Mark the specified file number as used. void MarkFileNumberUsed(uint64_t number); - // Return the current log file number. This is the biggest log_number from - // all column families - uint64_t LogNumber() const { return log_number_; } - // Return the log file number for the log file that is currently // being compacted, or zero if there is no such log file. 
uint64_t PrevLogNumber() const { return prev_log_number_; } @@ -468,7 +464,6 @@ class VersionSet { uint64_t next_file_number_; uint64_t manifest_file_number_; std::atomic last_sequence_; - uint64_t log_number_; uint64_t prev_log_number_; // 0 or backing store for memtable being compacted int num_levels_; @@ -502,8 +497,8 @@ class VersionSet { VersionSet(const VersionSet&); void operator=(const VersionSet&); - void LogAndApplyHelper(Builder*b, Version* v, - VersionEdit* edit, port::Mutex* mu); + void LogAndApplyHelper(ColumnFamilyData* cfd, Builder* b, Version* v, + VersionEdit* edit, port::Mutex* mu); }; } // namespace rocksdb