From 32c965d41790e8d5e4bb14c559a827eb12c8babf Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Wed, 4 Sep 2013 17:24:35 -0700 Subject: [PATCH] Flush was hanging because the configured options specified that more than 1 memtable need to be merged. Summary: There is an config option called Options.min_write_buffer_number_to_merge that specifies the minimum number of write buffers to merge in memory before flushing to a file in L0. But in the the case when the db is being closed, we should not be using this config, instead we should flush whatever write buffers were available at that time. Test Plan: Unit test attached. Reviewers: haobo, emayanke Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D12717 --- db/db_impl.cc | 3 +++ db/db_test.cc | 18 ++++++++++++++++++ db/memtablelist.cc | 4 +++- db/memtablelist.h | 9 ++++++++- 4 files changed, 32 insertions(+), 2 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 66e44a611..69e153bdd 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2745,6 +2745,9 @@ Status DBImpl::MakeRoomForWrite(bool force) { log_.reset(new log::Writer(std::move(lfile))); mem_->SetLogNumber(logfile_number_); imm_.Add(mem_); + if (force) { + imm_.FlushRequested(); + } mem_ = new MemTable(internal_comparator_, mem_rep_factory_, NumberLevels(), options_); mem_->Ref(); diff --git a/db/db_test.cc b/db/db_test.cc index 3f2879027..5f11d7203 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1366,6 +1366,24 @@ TEST(DBTest, CheckLock) { } while (ChangeCompactOptions()); } +TEST(DBTest, FlushMultipleMemtable) { + do { + Options options = CurrentOptions(); + WriteOptions writeOpt = WriteOptions(); + writeOpt.disableWAL = true; + options.max_write_buffer_number = 4; + options.min_write_buffer_number_to_merge = 3; + Reopen(&options); + ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1")); + dbfull()->Flush(FlushOptions()); + ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v1")); + + ASSERT_EQ("v1", Get("foo")); + ASSERT_EQ("v1", Get("bar")); + dbfull()->Flush(FlushOptions()); + } while (ChangeCompactOptions()); +} + TEST(DBTest, FLUSH) { do { Options options = CurrentOptions(); diff --git a/db/memtablelist.cc b/db/memtablelist.cc index a1e6c9674..fa10a740e 100644 --- a/db/memtablelist.cc +++ b/db/memtablelist.cc @@ -42,7 +42,8 @@ int MemTableList::size() { // Returns true if there is at least one memtable on which flush has // not yet started. bool MemTableList::IsFlushPending(int min_write_buffer_number_to_merge) { - if (num_flush_not_started_ >= min_write_buffer_number_to_merge) { + if ((flush_requested_ && num_flush_not_started_ >= 1) || + (num_flush_not_started_ >= min_write_buffer_number_to_merge)) { assert(imm_flush_needed.NoBarrier_Load() != nullptr); return true; } @@ -63,6 +64,7 @@ void MemTableList::PickMemtablesToFlush(std::vector* ret) { ret->push_back(m); } } + flush_requested_ = false; // start-flush request is complete } // Record a successful flush in the manifest file diff --git a/db/memtablelist.h b/db/memtablelist.h index d97187b0d..327a2e0f7 100644 --- a/db/memtablelist.h +++ b/db/memtablelist.h @@ -29,7 +29,8 @@ class MemTableList { public: // A list of memtables. MemTableList() : size_(0), num_flush_not_started_(0), - commit_in_progress_(false) { + commit_in_progress_(false), + flush_requested_(false) { imm_flush_needed.Release_Store(nullptr); } ~MemTableList() {}; @@ -76,6 +77,9 @@ class MemTableList { // Returns the list of underlying memtables. void GetMemTables(std::vector* list); + // Request a flush of all existing memtables to storage + void FlushRequested() { flush_requested_ = true; } + // Copying allowed // MemTableList(const MemTableList&); // void operator=(const MemTableList&); @@ -90,6 +94,9 @@ class MemTableList { // committing in progress bool commit_in_progress_; + // Requested a flush of all memtables to storage + bool flush_requested_; + }; } // namespace leveldb