Fix flush not being commit while writing manifest

Summary:
Fix flush not being commit while writing manifest, which is a recent bug introduced by D60075.

The issue:
# Options.max_background_flushes > 1
# Background thread A pick up a flush job, flush, then commit to manifest. (Note that mutex is released before writing manifest.)
# Background thread B pick up another flush job, flush. When it gets to `MemTableList::InstallMemtableFlushResults`, it notices another thread is commiting, so it quit.
# After the first commit, thread A doesn't double check if there are more flush result need to commit, leaving the second flush uncommitted.

Test Plan: run the test. Also verify the new test hit deadlock without the fix.

Reviewers: sdong, igor, lightmark

Reviewed By: lightmark

Subscribers: andrewkr, omegaga, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D60969
main
Yi Wu 8 years ago
parent 9ab38c45ad
commit 32604e6601
  1. 1
      CMakeLists.txt
  2. 5
      Makefile
  3. 55
      db/db_flush_test.cc
  4. 119
      db/memtable_list.cc
  5. 1
      src.mk

@ -356,6 +356,7 @@ set(TESTS
db/db_test2.cc
db/db_block_cache_test.cc
db/db_bloom_filter_test.cc
db/db_flush_test.cc
db/db_iterator_test.cc
db/db_sst_test.cc
db/db_universal_compaction_test.cc

@ -274,8 +274,10 @@ TESTS = \
db_compaction_filter_test \
db_compaction_test \
db_dynamic_level_test \
db_flush_test \
db_inplace_update_test \
db_iterator_test \
db_options_test \
db_sst_test \
db_tailing_iter_test \
db_universal_compaction_test \
@ -920,6 +922,9 @@ db_compaction_test: db/db_compaction_test.o db/db_test_util.o $(LIBOBJECTS) $(TE
db_dynamic_level_test: db/db_dynamic_level_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
db_flush_test: db/db_flush_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
db_inplace_update_test: db/db_inplace_update_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

@ -0,0 +1,55 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "util/sync_point.h"
namespace rocksdb {
class DBFlushTest : public DBTestBase {
public:
DBFlushTest() : DBTestBase("/db_flush_test") {}
};
// We had issue when two background threads trying to flush at the same time,
// only one of them get committed. The test verifies the issue is fixed.
TEST_F(DBFlushTest, FlushWhileWritingManifest) {
Options options;
options.disable_auto_compactions = true;
options.max_background_flushes = 2;
Reopen(options);
FlushOptions no_wait;
no_wait.wait = false;
SyncPoint::GetInstance()->LoadDependency(
{{"VersionSet::LogAndApply:WriteManifest",
"DBFlushTest::FlushWhileWritingManifest:1"},
{"MemTableList::InstallMemtableFlushResults:InProgress",
"VersionSet::LogAndApply:WriteManifestDone"}});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put("foo", "v"));
ASSERT_OK(dbfull()->Flush(no_wait));
TEST_SYNC_POINT("DBFlushTest::FlushWhileWritingManifest:1");
ASSERT_OK(Put("bar", "v"));
ASSERT_OK(dbfull()->Flush(no_wait));
// If the issue is hit we will wait here forever.
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ(2, TotalTableFiles());
}
} // namespace rocksdb
int main(int argc, char** argv) {
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -11,14 +11,15 @@
#include <inttypes.h>
#include <string>
#include "rocksdb/db.h"
#include "db/memtable.h"
#include "db/version_set.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "table/merger.h"
#include "util/coding.h"
#include "util/log_buffer.h"
#include "util/sync_point.h"
#include "util/thread_status_util.h"
namespace rocksdb {
@ -297,69 +298,79 @@ Status MemTableList::InstallMemtableFlushResults(
// if some other thread is already committing, then return
Status s;
if (commit_in_progress_) {
TEST_SYNC_POINT("MemTableList::InstallMemtableFlushResults:InProgress");
return s;
}
// Only a single thread can be executing this piece of code
commit_in_progress_ = true;
// scan all memtables from the earliest, and commit those
// (in that order) that have finished flushing. Memetables
// are always committed in the order that they were created.
uint64_t batch_file_number = 0;
size_t batch_count = 0;
autovector<VersionEdit*> edit_list;
auto& memlist = current_->memlist_;
// enumerate from the last (earliest) element to see how many batch finished
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it;
if (!m->flush_completed_) {
// Retry until all completed flushes are committed. New flushes can finish
// while the current thread is writing manifest where mutex is released.
while (s.ok()) {
auto& memlist = current_->memlist_;
if (memlist.empty() || !memlist.back()->flush_completed_) {
break;
}
if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
batch_file_number = m->file_number_;
LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64 " started",
cfd->GetName().c_str(), m->file_number_);
edit_list.push_back(&m->edit_);
// scan all memtables from the earliest, and commit those
// (in that order) that have finished flushing. Memetables
// are always committed in the order that they were created.
uint64_t batch_file_number = 0;
size_t batch_count = 0;
autovector<VersionEdit*> edit_list;
// enumerate from the last (earliest) element to see how many batch finished
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it;
if (!m->flush_completed_) {
break;
}
if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
batch_file_number = m->file_number_;
LogToBuffer(log_buffer,
"[%s] Level-0 commit table #%" PRIu64 " started",
cfd->GetName().c_str(), m->file_number_);
edit_list.push_back(&m->edit_);
}
batch_count++;
}
batch_count++;
}
if (batch_count > 0) {
// this can release and reacquire the mutex.
s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu, db_directory);
// we will be changing the version in the next code path,
// so we better create a new one, since versions are immutable
InstallNewVersion();
// All the later memtables that have the same filenum
// are part of the same batch. They can be committed now.
uint64_t mem_id = 1; // how many memtables have been flushed.
if (s.ok()) { // commit new state
while (batch_count-- > 0) {
MemTable* m = current_->memlist_.back();
LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " done",
cfd->GetName().c_str(), m->file_number_, mem_id);
assert(m->file_number_ > 0);
current_->Remove(m, to_delete);
++mem_id;
}
} else {
for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; it++) {
MemTable* m = *it;
// commit failed. setup state so that we can flush again.
LogToBuffer(log_buffer, "Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " failed",
m->file_number_, mem_id);
m->flush_completed_ = false;
m->flush_in_progress_ = false;
m->edit_.Clear();
num_flush_not_started_++;
m->file_number_ = 0;
imm_flush_needed.store(true, std::memory_order_release);
++mem_id;
if (batch_count > 0) {
// this can release and reacquire the mutex.
s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
db_directory);
// we will be changing the version in the next code path,
// so we better create a new one, since versions are immutable
InstallNewVersion();
// All the later memtables that have the same filenum
// are part of the same batch. They can be committed now.
uint64_t mem_id = 1; // how many memtables have been flushed.
if (s.ok()) { // commit new state
while (batch_count-- > 0) {
MemTable* m = current_->memlist_.back();
LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " done",
cfd->GetName().c_str(), m->file_number_, mem_id);
assert(m->file_number_ > 0);
current_->Remove(m, to_delete);
++mem_id;
}
} else {
for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; it++) {
MemTable* m = *it;
// commit failed. setup state so that we can flush again.
LogToBuffer(log_buffer, "Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " failed",
m->file_number_, mem_id);
m->flush_completed_ = false;
m->flush_in_progress_ = false;
m->edit_.Clear();
num_flush_not_started_++;
m->file_number_ = 0;
imm_flush_needed.store(true, std::memory_order_release);
++mem_id;
}
}
}
}

@ -217,6 +217,7 @@ MAIN_SOURCES = \
db/db_compaction_filter_test.cc \
db/db_compaction_test.cc \
db/db_dynamic_level_test.cc \
db/db_flush_test.cc \
db/db_inplace_update_test.cc \
db/db_iterator_test.cc \
db/db_log_iter_test.cc \

Loading…
Cancel
Save