Perform post-flush updates of memtable list in a callback (#6069)

Summary:
Currently, the following interleaving of events can lead to SuperVersion containing both immutable memtables as well as the resulting L0. This can cause Get to return incorrect result if there are merge operands. This may also affect other operations such as single deletes.

```
  time  main_thr  bg_flush_thr  bg_compact_thr  compact_thr  set_opts_thr
0  |                                                         WriteManifest:0
1  |                                           issue compact
2  |                                 wait
3  |   Merge(counter)
4  |   issue flush
5  |                   wait
6  |                                                         WriteManifest:1
7  |                                 wake up
8  |                                 write manifest
9  |                  wake up
10 |  Get(counter)
11 |                  remove imm
   V
```

The reason behind is that: one bg flush thread's installing new `Version` can be batched and performed by another thread that is the "leader" MANIFEST writer. This bg thread removes the memtables from current super version only after `LogAndApply` returns. After the leader MANIFEST writer signals (releasing mutex) this bg flush thread, it is possible that another thread sees this cf with both memtables (whose data have been flushed to the newest L0) and the L0 before this bg flush thread removes the memtables.

To address this issue, each bg flush thread can pass a callback function to `LogAndApply`. The callback is responsible for removing the memtables. Therefore, the leader MANIFEST writer can call this callback and remove the memtables before releasing the mutex.

Test plan (devserver)
```
$make merge_test
$./merge_test --gtest_filter=MergeTest.MergeWithCompactionAndFlush
$make check
```

Pull Request resolved: https://github.com/facebook/rocksdb/pull/6069

Reviewed By: cheng-chang

Differential Revision: D18790894

Pulled By: riversand963

fbshipit-source-id: e41bd600c0448b4f4b2deb3f7677f95e3076b4ed
main
Yanqin Jin 4 years ago committed by Facebook GitHub Bot
parent a7a04b6898
commit 6134ce6444
  1. 1
      HISTORY.md
  2. 164
      db/memtable_list.cc
  3. 7
      db/memtable_list.h
  4. 107
      db/merge_test.cc
  5. 27
      db/version_set.cc
  6. 15
      db/version_set.h

@ -7,6 +7,7 @@
* Since 6.14, fix false positive flush/compaction `Status::Corruption` failure when `paranoid_file_checks == true` and range tombstones were written to the compaction output files.
* Since 6.14, fix a bug that could cause a stalled write to crash with mixed of slowdown and no_slowdown writes (`WriteOptions.no_slowdown=true`).
* Fixed a bug which causes hang in closing DB when refit level is set in opt build. It was because ContinueBackgroundWork() was called in assert statement which is a no op. It was introduced in 6.14.
* Perform post-flush updates of in-memory data structures of memtable list in a callback that will be called by the thread writing to MANIFEST file before signaling other threads and releasing db mutex.
### Public API Change
* Deprecate `BlockBasedTableOptions::pin_l0_filter_and_index_blocks_in_cache` and `BlockBasedTableOptions::pin_top_level_index_and_filter`. These options still take effect until users migrate to the replacement APIs in `BlockBasedTableOptions::metadata_cache_options`. Migration guidance can be found in the API comments on the deprecated options.

@ -481,83 +481,18 @@ Status MemTableList::TryInstallMemtableFlushResults(
vset, *cfd, edit_list, memtables_to_flush, prep_tracker));
}
const auto manifest_write_cb = [this, cfd, batch_count, log_buffer,
to_delete, mu](const Status& status) {
RemoveMemTablesOrRestoreFlags(status, cfd, batch_count, log_buffer,
to_delete, mu);
};
// this can release and reacquire the mutex.
s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
db_directory);
db_directory, /*new_descriptor_log=*/false,
/*column_family_options=*/nullptr,
manifest_write_cb);
*io_s = vset->io_status();
// we will be changing the version in the next code path,
// so we better create a new one, since versions are immutable
InstallNewVersion();
// All the later memtables that have the same filenum
// are part of the same batch. They can be committed now.
uint64_t mem_id = 1; // how many memtables have been flushed.
// commit new state only if the column family is NOT dropped.
// The reason is as follows (refer to
// ColumnFamilyTest.FlushAndDropRaceCondition).
// If the column family is dropped, then according to LogAndApply, its
// corresponding flush operation is NOT written to the MANIFEST. This
// means the DB is not aware of the L0 files generated from the flush.
// By committing the new state, we remove the memtable from the memtable
// list. Creating an iterator on this column family will not be able to
// read full data since the memtable is removed, and the DB is not aware
// of the L0 files, causing MergingIterator unable to build child
// iterators. RocksDB contract requires that the iterator can be created
// on a dropped column family, and we must be able to
// read full data as long as column family handle is not deleted, even if
// the column family is dropped.
if (s.ok() && !cfd->IsDropped()) { // commit new state
while (batch_count-- > 0) {
MemTable* m = current_->memlist_.back();
if (m->edit_.GetBlobFileAdditions().empty()) {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " done",
cfd->GetName().c_str(), m->file_number_, mem_id);
} else {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Level-0 commit table #%" PRIu64
" (+%zu blob files)"
": memtable #%" PRIu64 " done",
cfd->GetName().c_str(), m->file_number_,
m->edit_.GetBlobFileAdditions().size(), mem_id);
}
assert(m->file_number_ > 0);
current_->Remove(m, to_delete);
UpdateCachedValuesFromMemTableListVersion();
ResetTrimHistoryNeeded();
++mem_id;
}
} else {
for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; ++it) {
MemTable* m = *it;
// commit failed. setup state so that we can flush again.
if (m->edit_.GetBlobFileAdditions().empty()) {
ROCKS_LOG_BUFFER(log_buffer,
"Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " failed",
m->file_number_, mem_id);
} else {
ROCKS_LOG_BUFFER(log_buffer,
"Level-0 commit table #%" PRIu64
" (+%zu blob files)"
": memtable #%" PRIu64 " failed",
m->file_number_,
m->edit_.GetBlobFileAdditions().size(), mem_id);
}
m->flush_completed_ = false;
m->flush_in_progress_ = false;
m->edit_.Clear();
num_flush_not_started_++;
m->file_number_ = 0;
imm_flush_needed.store(true, std::memory_order_release);
++mem_id;
}
}
}
}
commit_in_progress_ = false;
@ -642,6 +577,87 @@ void MemTableList::InstallNewVersion() {
}
}
void MemTableList::RemoveMemTablesOrRestoreFlags(
const Status& s, ColumnFamilyData* cfd, size_t batch_count,
LogBuffer* log_buffer, autovector<MemTable*>* to_delete,
InstrumentedMutex* mu) {
assert(mu);
mu->AssertHeld();
assert(to_delete);
// we will be changing the version in the next code path,
// so we better create a new one, since versions are immutable
InstallNewVersion();
// All the later memtables that have the same filenum
// are part of the same batch. They can be committed now.
uint64_t mem_id = 1; // how many memtables have been flushed.
// commit new state only if the column family is NOT dropped.
// The reason is as follows (refer to
// ColumnFamilyTest.FlushAndDropRaceCondition).
// If the column family is dropped, then according to LogAndApply, its
// corresponding flush operation is NOT written to the MANIFEST. This
// means the DB is not aware of the L0 files generated from the flush.
// By committing the new state, we remove the memtable from the memtable
// list. Creating an iterator on this column family will not be able to
// read full data since the memtable is removed, and the DB is not aware
// of the L0 files, causing MergingIterator unable to build child
// iterators. RocksDB contract requires that the iterator can be created
// on a dropped column family, and we must be able to
// read full data as long as column family handle is not deleted, even if
// the column family is dropped.
if (s.ok() && !cfd->IsDropped()) { // commit new state
while (batch_count-- > 0) {
MemTable* m = current_->memlist_.back();
if (m->edit_.GetBlobFileAdditions().empty()) {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " done",
cfd->GetName().c_str(), m->file_number_, mem_id);
} else {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Level-0 commit table #%" PRIu64
" (+%zu blob files)"
": memtable #%" PRIu64 " done",
cfd->GetName().c_str(), m->file_number_,
m->edit_.GetBlobFileAdditions().size(), mem_id);
}
assert(m->file_number_ > 0);
current_->Remove(m, to_delete);
UpdateCachedValuesFromMemTableListVersion();
ResetTrimHistoryNeeded();
++mem_id;
}
} else {
for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; ++it) {
MemTable* m = *it;
// commit failed. setup state so that we can flush again.
if (m->edit_.GetBlobFileAdditions().empty()) {
ROCKS_LOG_BUFFER(log_buffer,
"Level-0 commit table #%" PRIu64 ": memtable #%" PRIu64
" failed",
m->file_number_, mem_id);
} else {
ROCKS_LOG_BUFFER(log_buffer,
"Level-0 commit table #%" PRIu64
" (+%zu blob files)"
": memtable #%" PRIu64 " failed",
m->file_number_,
m->edit_.GetBlobFileAdditions().size(), mem_id);
}
m->flush_completed_ = false;
m->flush_in_progress_ = false;
m->edit_.Clear();
num_flush_not_started_++;
m->file_number_ = 0;
imm_flush_needed.store(true, std::memory_order_release);
++mem_id;
}
}
}
uint64_t MemTableList::PrecomputeMinLogContainingPrepSection(
const autovector<MemTable*>& memtables_to_flush) {
uint64_t min_log = 0;

@ -389,6 +389,13 @@ class MemTableList {
// DB mutex held
void InstallNewVersion();
// DB mutex held
// Called after writing to MANIFEST
void RemoveMemTablesOrRestoreFlags(const Status& s, ColumnFamilyData* cfd,
size_t batch_count, LogBuffer* log_buffer,
autovector<MemTable*>* to_delete,
InstrumentedMutex* mu);
const int min_write_buffer_number_to_merge_;
MemTableListVersion* current_;

@ -4,8 +4,9 @@
// (found in the LICENSE.Apache file in the root directory).
//
#include <assert.h>
#include <memory>
#include <iostream>
#include <memory>
#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
@ -18,6 +19,7 @@
#include "rocksdb/merge_operator.h"
#include "rocksdb/utilities/db_ttl.h"
#include "test_util/testharness.h"
#include "util/coding.h"
#include "utilities/merge_operators.h"
namespace ROCKSDB_NAMESPACE {
@ -296,6 +298,96 @@ void testCounters(Counters& counters, DB* db, bool test_compaction) {
}
}
void testCountersWithFlushAndCompaction(Counters& counters, DB* db) {
ASSERT_OK(db->Put({}, "1", "1"));
ASSERT_OK(db->Flush(FlushOptions()));
std::atomic<int> cnt{0};
const auto get_thread_id = [&cnt]() {
thread_local int thread_id{cnt++};
return thread_id;
};
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::LogAndApply:BeforeWriterWaiting", [&](void* /*arg*/) {
int thread_id = get_thread_id();
if (1 == thread_id) {
TEST_SYNC_POINT(
"testCountersWithFlushAndCompaction::bg_compact_thread:0");
} else if (2 == thread_id) {
TEST_SYNC_POINT(
"testCountersWithFlushAndCompaction::bg_flush_thread:0");
}
});
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::LogAndApply:WriteManifest", [&](void* /*arg*/) {
int thread_id = get_thread_id();
if (0 == thread_id) {
TEST_SYNC_POINT(
"testCountersWithFlushAndCompaction::set_options_thread:0");
TEST_SYNC_POINT(
"testCountersWithFlushAndCompaction::set_options_thread:1");
}
});
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::LogAndApply:WakeUpAndDone", [&](void* arg) {
auto* mutex = reinterpret_cast<InstrumentedMutex*>(arg);
mutex->AssertHeld();
int thread_id = get_thread_id();
ASSERT_EQ(2, thread_id);
mutex->Unlock();
TEST_SYNC_POINT(
"testCountersWithFlushAndCompaction::bg_flush_thread:1");
TEST_SYNC_POINT(
"testCountersWithFlushAndCompaction::bg_flush_thread:2");
mutex->Lock();
});
SyncPoint::GetInstance()->LoadDependency({
{"testCountersWithFlushAndCompaction::set_options_thread:0",
"testCountersWithCompactionAndFlush:BeforeCompact"},
{"testCountersWithFlushAndCompaction::bg_compact_thread:0",
"testCountersWithFlushAndCompaction:BeforeIncCounters"},
{"testCountersWithFlushAndCompaction::bg_flush_thread:0",
"testCountersWithFlushAndCompaction::set_options_thread:1"},
{"testCountersWithFlushAndCompaction::bg_flush_thread:1",
"testCountersWithFlushAndCompaction:BeforeVerification"},
{"testCountersWithFlushAndCompaction:AfterGet",
"testCountersWithFlushAndCompaction::bg_flush_thread:2"},
});
SyncPoint::GetInstance()->EnableProcessing();
port::Thread set_options_thread([&]() {
ASSERT_OK(reinterpret_cast<DBImpl*>(db)->SetOptions(
{{"disable_auto_compactions", "false"}}));
});
TEST_SYNC_POINT("testCountersWithCompactionAndFlush:BeforeCompact");
port::Thread compact_thread([&]() {
ASSERT_OK(reinterpret_cast<DBImpl*>(db)->CompactRange(
CompactRangeOptions(), db->DefaultColumnFamily(), nullptr, nullptr));
});
TEST_SYNC_POINT("testCountersWithFlushAndCompaction:BeforeIncCounters");
counters.add("test-key", 1);
FlushOptions flush_opts;
flush_opts.wait = false;
ASSERT_OK(db->Flush(flush_opts));
TEST_SYNC_POINT("testCountersWithFlushAndCompaction:BeforeVerification");
std::string expected;
PutFixed64(&expected, 1);
std::string actual;
Status s = db->Get(ReadOptions(), "test-key", &actual);
TEST_SYNC_POINT("testCountersWithFlushAndCompaction:AfterGet");
set_options_thread.join();
compact_thread.join();
ASSERT_OK(s);
ASSERT_EQ(expected, actual);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
void testSuccessiveMerge(Counters& counters, size_t max_num_merges,
size_t num_merges) {
@ -488,6 +580,19 @@ TEST_F(MergeTest, MergeDbTtlTest) {
runTest(test::PerThreadDBPath("merge_testdbttl"),
true); // Run test on TTL database
}
TEST_F(MergeTest, MergeWithCompactionAndFlush) {
const std::string dbname =
test::PerThreadDBPath("merge_with_compaction_and_flush");
{
auto db = OpenDb(dbname);
{
MergeBasedCounters counters(db, 0);
testCountersWithFlushAndCompaction(counters, db.get());
}
}
DestroyDB(dbname, Options());
}
#endif // !ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE

@ -3665,16 +3665,18 @@ struct VersionSet::ManifestWriter {
ColumnFamilyData* cfd;
const MutableCFOptions mutable_cf_options;
const autovector<VersionEdit*>& edit_list;
const std::function<void(const Status&)> manifest_write_callback;
explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
const MutableCFOptions& cf_options,
const autovector<VersionEdit*>& e)
explicit ManifestWriter(
InstrumentedMutex* mu, ColumnFamilyData* _cfd,
const MutableCFOptions& cf_options, const autovector<VersionEdit*>& e,
const std::function<void(const Status&)>& manifest_wcb)
: done(false),
cv(mu),
cfd(_cfd),
mutable_cf_options(cf_options),
edit_list(e) {}
edit_list(e),
manifest_write_callback(manifest_wcb) {}
~ManifestWriter() { status.PermitUncheckedError(); }
bool IsAllWalEdits() const {
@ -4044,7 +4046,7 @@ Status VersionSet::ProcessManifestWrites(
FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
mu->Unlock();
TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WriteManifest", nullptr);
if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
assert(!builder_guards.empty() &&
@ -4294,6 +4296,9 @@ Status VersionSet::ProcessManifestWrites(
}
ready->status = s;
ready->done = true;
if (ready->manifest_write_callback) {
(ready->manifest_write_callback)(s);
}
if (need_signal) {
ready->cv.Signal();
}
@ -4314,7 +4319,8 @@ Status VersionSet::LogAndApply(
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<autovector<VersionEdit*>>& edit_lists,
InstrumentedMutex* mu, FSDirectory* db_directory, bool new_descriptor_log,
const ColumnFamilyOptions* new_cf_options) {
const ColumnFamilyOptions* new_cf_options,
const std::vector<std::function<void(const Status&)>>& manifest_wcbs) {
mu->AssertHeld();
int num_edits = 0;
for (const auto& elist : edit_lists) {
@ -4344,12 +4350,16 @@ Status VersionSet::LogAndApply(
assert(static_cast<size_t>(num_cfds) == edit_lists.size());
}
for (int i = 0; i < num_cfds; ++i) {
const auto wcb =
manifest_wcbs.empty() ? [](const Status&) {} : manifest_wcbs[i];
writers.emplace_back(mu, column_family_datas[i],
*mutable_cf_options_list[i], edit_lists[i]);
*mutable_cf_options_list[i], edit_lists[i], wcb);
manifest_writers_.push_back(&writers[i]);
}
assert(!writers.empty());
ManifestWriter& first_writer = writers.front();
TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:BeforeWriterWaiting",
nullptr);
while (!first_writer.done && &first_writer != manifest_writers_.front()) {
first_writer.cv.Wait();
}
@ -4361,6 +4371,7 @@ Status VersionSet::LogAndApply(
for (const auto& writer : writers) {
assert(writer.done);
}
TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WakeUpAndDone", mu);
#endif /* !NDEBUG */
return first_writer.status;
}

@ -958,7 +958,8 @@ class VersionSet {
const MutableCFOptions& mutable_cf_options,
const autovector<VersionEdit*>& edit_list, InstrumentedMutex* mu,
FSDirectory* db_directory = nullptr, bool new_descriptor_log = false,
const ColumnFamilyOptions* column_family_options = nullptr) {
const ColumnFamilyOptions* column_family_options = nullptr,
const std::function<void(const Status&)>& manifest_wcb = {}) {
autovector<ColumnFamilyData*> cfds;
cfds.emplace_back(column_family_data);
autovector<const MutableCFOptions*> mutable_cf_options_list;
@ -966,7 +967,8 @@ class VersionSet {
autovector<autovector<VersionEdit*>> edit_lists;
edit_lists.emplace_back(edit_list);
return LogAndApply(cfds, mutable_cf_options_list, edit_lists, mu,
db_directory, new_descriptor_log, column_family_options);
db_directory, new_descriptor_log, column_family_options,
{manifest_wcb});
}
// The across-multi-cf batch version. If edit_lists contain more than
@ -978,7 +980,9 @@ class VersionSet {
const autovector<autovector<VersionEdit*>>& edit_lists,
InstrumentedMutex* mu, FSDirectory* db_directory = nullptr,
bool new_descriptor_log = false,
const ColumnFamilyOptions* new_cf_options = nullptr);
const ColumnFamilyOptions* new_cf_options = nullptr,
const std::vector<std::function<void(const Status&)>>& manifest_wcbs =
{});
static Status GetCurrentManifestPath(const std::string& dbname,
FileSystem* fs,
@ -1419,8 +1423,9 @@ class ReactiveVersionSet : public VersionSet {
const autovector<const MutableCFOptions*>& /*mutable_cf_options_list*/,
const autovector<autovector<VersionEdit*>>& /*edit_lists*/,
InstrumentedMutex* /*mu*/, FSDirectory* /*db_directory*/,
bool /*new_descriptor_log*/,
const ColumnFamilyOptions* /*new_cf_option*/) override {
bool /*new_descriptor_log*/, const ColumnFamilyOptions* /*new_cf_option*/,
const std::vector<std::function<void(const Status&)>>& /*manifest_wcbs*/)
override {
return Status::NotSupported("not supported in reactive mode");
}

Loading…
Cancel
Save