From 6134ce6444898f8ad432045ad73a2e54ed820af2 Mon Sep 17 00:00:00 2001
From: Yanqin Jin
Date: Mon, 26 Oct 2020 18:20:43 -0700
Subject: [PATCH] Perform post-flush updates of memtable list in a callback
 (#6069)

Summary:
Currently, the following interleaving of events can lead to a SuperVersion
containing both the immutable memtables and the resulting L0. This can cause
Get to return an incorrect result if there are merge operands. This may also
affect other operations such as single deletes.

```
time main_thr        bg_flush_thr   bg_compact_thr  compact_thr    set_opts_thr
0  |                                                               WriteManifest:0
1  |                                                issue compact
2  |                                wait
3  | Merge(counter)
4  | issue flush
5  |                 wait
6  |                                                               WriteManifest:1
7  |                                wake up
8  |                                write manifest
9  |                 wake up
10 | Get(counter)
11 |                 remove imm
   V
```

The reason is that one bg flush thread's installation of a new `Version` can
be batched and performed by another thread that is the "leader" MANIFEST
writer. This bg flush thread removes the memtables from the current super
version only after `LogAndApply` returns. After the leader MANIFEST writer
signals (releasing the mutex) this bg flush thread, it is possible for another
thread to see this cf with both the memtables (whose data have already been
flushed to the newest L0) and that L0, before this bg flush thread removes the
memtables.

To address this issue, each bg flush thread can pass a callback function to
`LogAndApply`. The callback is responsible for removing the memtables.
Therefore, the leader MANIFEST writer can call this callback and remove the
memtables before releasing the mutex.

Test plan (devserver)
```
$make merge_test
$./merge_test --gtest_filter=MergeTest.MergeWithCompactionAndFlush
$make check
```

Pull Request resolved: https://github.com/facebook/rocksdb/pull/6069

Reviewed By: cheng-chang

Differential Revision: D18790894

Pulled By: riversand963

fbshipit-source-id: e41bd600c0448b4f4b2deb3f7677f95e3076b4ed
---
 HISTORY.md          |   1 +
 db/memtable_list.cc | 164 ++++++++++++++++++++++++--------------------
 db/memtable_list.h  |   7 ++
 db/merge_test.cc    | 107 ++++++++++++++++++++++++++++-
 db/version_set.cc   |  27 +++++---
 db/version_set.h    |  15 ++--
 6 files changed, 233 insertions(+), 88 deletions(-)

diff --git a/HISTORY.md b/HISTORY.md
index 2851f5373..6fc3bb8b2 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -7,6 +7,7 @@
 * Since 6.14, fix false positive flush/compaction `Status::Corruption` failure when `paranoid_file_checks == true` and range tombstones were written to the compaction output files.
 * Since 6.14, fix a bug that could cause a stalled write to crash with mixed of slowdown and no_slowdown writes (`WriteOptions.no_slowdown=true`).
 * Fixed a bug which causes hang in closing DB when refit level is set in opt build. It was because ContinueBackgroundWork() was called in assert statement which is a no op. It was introduced in 6.14.
+* Perform post-flush updates of in-memory data structures of memtable list in a callback that will be called by the thread writing to MANIFEST file before signaling other threads and releasing db mutex.
 ### Public API Change
 * Deprecate `BlockBasedTableOptions::pin_l0_filter_and_index_blocks_in_cache` and `BlockBasedTableOptions::pin_top_level_index_and_filter`. These options still take effect until users migrate to the replacement APIs in `BlockBasedTableOptions::metadata_cache_options`. Migration guidance can be found in the API comments on the deprecated options.
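A minimal, self-contained sketch of the idea above (not RocksDB code; every name in it is invented for illustration): the thread that actually writes the MANIFEST runs each flush's post-write callback while it still holds the db mutex, so removing the flushed memtables and exposing the new L0 look like a single atomic step to readers.

```
// Illustrative only: a toy model of "run the post-flush update as a callback
// invoked before the mutex is released", not RocksDB's actual implementation.
#include <functional>
#include <iostream>
#include <mutex>
#include <vector>

struct State {
  std::vector<int> imm;  // stand-in for immutable memtables
  std::vector<int> l0;   // stand-in for newly installed L0 data
};

// Stand-in for the MANIFEST-writing leader: it applies the edit and runs the
// caller-supplied callback while the mutex is still held, so no other thread
// can observe the data in both imm and l0 (the window described above).
void LogAndApplySketch(State& state, std::mutex& db_mutex,
                       const std::function<void(bool /*ok*/)>& manifest_wcb) {
  std::lock_guard<std::mutex> lock(db_mutex);
  // 1. "Write the MANIFEST": publish the flushed data as L0.
  state.l0.insert(state.l0.end(), state.imm.begin(), state.imm.end());
  // 2. Run the post-write callback before the lock is released.
  manifest_wcb(/*ok=*/true);
}

int main() {
  State state;
  std::mutex db_mutex;
  state.imm = {1, 2, 3};  // pretend these merge operands await flush

  // The "flush thread" hands the removal work to LogAndApplySketch as a
  // callback instead of doing it after the call returns.
  LogAndApplySketch(state, db_mutex, [&state](bool ok) {
    if (ok) {
      state.imm.clear();  // remove the memtables that were just flushed
    }
  });

  std::cout << "imm=" << state.imm.size() << " l0=" << state.l0.size() << "\n";
  return 0;
}
```

The patch threads the same idea through `MemTableList::TryInstallMemtableFlushResults`, `VersionSet::LogAndApply`, and `VersionSet::ProcessManifestWrites` in the hunks below.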
diff --git a/db/memtable_list.cc b/db/memtable_list.cc
index dced9f7db..1e1758d59 100644
--- a/db/memtable_list.cc
+++ b/db/memtable_list.cc
@@ -481,83 +481,18 @@ Status MemTableList::TryInstallMemtableFlushResults(
             vset, *cfd, edit_list, memtables_to_flush, prep_tracker));
       }
 
+      const auto manifest_write_cb = [this, cfd, batch_count, log_buffer,
+                                      to_delete, mu](const Status& status) {
+        RemoveMemTablesOrRestoreFlags(status, cfd, batch_count, log_buffer,
+                                      to_delete, mu);
+      };
+
       // this can release and reacquire the mutex.
       s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
-                            db_directory);
+                            db_directory, /*new_descriptor_log=*/false,
+                            /*column_family_options=*/nullptr,
+                            manifest_write_cb);
       *io_s = vset->io_status();
-
-      // we will be changing the version in the next code path,
-      // so we better create a new one, since versions are immutable
-      InstallNewVersion();
-
-      // All the later memtables that have the same filenum
-      // are part of the same batch. They can be committed now.
-      uint64_t mem_id = 1;  // how many memtables have been flushed.
-
-      // commit new state only if the column family is NOT dropped.
-      // The reason is as follows (refer to
-      // ColumnFamilyTest.FlushAndDropRaceCondition).
-      // If the column family is dropped, then according to LogAndApply, its
-      // corresponding flush operation is NOT written to the MANIFEST. This
-      // means the DB is not aware of the L0 files generated from the flush.
-      // By committing the new state, we remove the memtable from the memtable
-      // list. Creating an iterator on this column family will not be able to
-      // read full data since the memtable is removed, and the DB is not aware
-      // of the L0 files, causing MergingIterator unable to build child
-      // iterators. RocksDB contract requires that the iterator can be created
-      // on a dropped column family, and we must be able to
-      // read full data as long as column family handle is not deleted, even if
-      // the column family is dropped.
-      if (s.ok() && !cfd->IsDropped()) {  // commit new state
-        while (batch_count-- > 0) {
-          MemTable* m = current_->memlist_.back();
-          if (m->edit_.GetBlobFileAdditions().empty()) {
-            ROCKS_LOG_BUFFER(log_buffer,
-                             "[%s] Level-0 commit table #%" PRIu64
-                             ": memtable #%" PRIu64 " done",
-                             cfd->GetName().c_str(), m->file_number_, mem_id);
-          } else {
-            ROCKS_LOG_BUFFER(log_buffer,
-                             "[%s] Level-0 commit table #%" PRIu64
-                             " (+%zu blob files)"
-                             ": memtable #%" PRIu64 " done",
-                             cfd->GetName().c_str(), m->file_number_,
-                             m->edit_.GetBlobFileAdditions().size(), mem_id);
-          }
-
-          assert(m->file_number_ > 0);
-          current_->Remove(m, to_delete);
-          UpdateCachedValuesFromMemTableListVersion();
-          ResetTrimHistoryNeeded();
-          ++mem_id;
-        }
-      } else {
-        for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; ++it) {
-          MemTable* m = *it;
-          // commit failed. setup state so that we can flush again.
-          if (m->edit_.GetBlobFileAdditions().empty()) {
-            ROCKS_LOG_BUFFER(log_buffer,
-                             "Level-0 commit table #%" PRIu64
-                             ": memtable #%" PRIu64 " failed",
-                             m->file_number_, mem_id);
-          } else {
-            ROCKS_LOG_BUFFER(log_buffer,
-                             "Level-0 commit table #%" PRIu64
-                             " (+%zu blob files)"
-                             ": memtable #%" PRIu64 " failed",
-                             m->file_number_,
-                             m->edit_.GetBlobFileAdditions().size(), mem_id);
-          }
-
-          m->flush_completed_ = false;
-          m->flush_in_progress_ = false;
-          m->edit_.Clear();
-          num_flush_not_started_++;
-          m->file_number_ = 0;
-          imm_flush_needed.store(true, std::memory_order_release);
-          ++mem_id;
-        }
-      }
     }
   }
   commit_in_progress_ = false;
@@ -642,6 +577,87 @@ void MemTableList::InstallNewVersion() {
   }
 }
 
+void MemTableList::RemoveMemTablesOrRestoreFlags(
+    const Status& s, ColumnFamilyData* cfd, size_t batch_count,
+    LogBuffer* log_buffer, autovector<MemTable*>* to_delete,
+    InstrumentedMutex* mu) {
+  assert(mu);
+  mu->AssertHeld();
+  assert(to_delete);
+  // we will be changing the version in the next code path,
+  // so we better create a new one, since versions are immutable
+  InstallNewVersion();
+
+  // All the later memtables that have the same filenum
+  // are part of the same batch. They can be committed now.
+  uint64_t mem_id = 1;  // how many memtables have been flushed.
+
+  // commit new state only if the column family is NOT dropped.
+  // The reason is as follows (refer to
+  // ColumnFamilyTest.FlushAndDropRaceCondition).
+  // If the column family is dropped, then according to LogAndApply, its
+  // corresponding flush operation is NOT written to the MANIFEST. This
+  // means the DB is not aware of the L0 files generated from the flush.
+  // By committing the new state, we remove the memtable from the memtable
+  // list. Creating an iterator on this column family will not be able to
+  // read full data since the memtable is removed, and the DB is not aware
+  // of the L0 files, causing MergingIterator unable to build child
+  // iterators. RocksDB contract requires that the iterator can be created
+  // on a dropped column family, and we must be able to
+  // read full data as long as column family handle is not deleted, even if
+  // the column family is dropped.
+  if (s.ok() && !cfd->IsDropped()) {  // commit new state
+    while (batch_count-- > 0) {
+      MemTable* m = current_->memlist_.back();
+      if (m->edit_.GetBlobFileAdditions().empty()) {
+        ROCKS_LOG_BUFFER(log_buffer,
+                         "[%s] Level-0 commit table #%" PRIu64
+                         ": memtable #%" PRIu64 " done",
+                         cfd->GetName().c_str(), m->file_number_, mem_id);
+      } else {
+        ROCKS_LOG_BUFFER(log_buffer,
+                         "[%s] Level-0 commit table #%" PRIu64
+                         " (+%zu blob files)"
+                         ": memtable #%" PRIu64 " done",
+                         cfd->GetName().c_str(), m->file_number_,
+                         m->edit_.GetBlobFileAdditions().size(), mem_id);
+      }
+
+      assert(m->file_number_ > 0);
+      current_->Remove(m, to_delete);
+      UpdateCachedValuesFromMemTableListVersion();
+      ResetTrimHistoryNeeded();
+      ++mem_id;
+    }
+  } else {
+    for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; ++it) {
+      MemTable* m = *it;
+      // commit failed. setup state so that we can flush again.
+      if (m->edit_.GetBlobFileAdditions().empty()) {
+        ROCKS_LOG_BUFFER(log_buffer,
+                         "Level-0 commit table #%" PRIu64 ": memtable #%" PRIu64
+                         " failed",
+                         m->file_number_, mem_id);
+      } else {
+        ROCKS_LOG_BUFFER(log_buffer,
+                         "Level-0 commit table #%" PRIu64
+                         " (+%zu blob files)"
+                         ": memtable #%" PRIu64 " failed",
+                         m->file_number_,
+                         m->edit_.GetBlobFileAdditions().size(), mem_id);
+      }
+
+      m->flush_completed_ = false;
+      m->flush_in_progress_ = false;
+      m->edit_.Clear();
+      num_flush_not_started_++;
+      m->file_number_ = 0;
+      imm_flush_needed.store(true, std::memory_order_release);
+      ++mem_id;
+    }
+  }
+}
+
 uint64_t MemTableList::PrecomputeMinLogContainingPrepSection(
     const autovector<MemTable*>& memtables_to_flush) {
   uint64_t min_log = 0;
diff --git a/db/memtable_list.h b/db/memtable_list.h
index 72105d266..0af7a0cea 100644
--- a/db/memtable_list.h
+++ b/db/memtable_list.h
@@ -389,6 +389,13 @@ class MemTableList {
   // DB mutex held
   void InstallNewVersion();
 
+  // DB mutex held
+  // Called after writing to MANIFEST
+  void RemoveMemTablesOrRestoreFlags(const Status& s, ColumnFamilyData* cfd,
+                                     size_t batch_count, LogBuffer* log_buffer,
+                                     autovector<MemTable*>* to_delete,
+                                     InstrumentedMutex* mu);
+
   const int min_write_buffer_number_to_merge_;
 
   MemTableListVersion* current_;
diff --git a/db/merge_test.cc b/db/merge_test.cc
index 3f85f6464..047ea7fac 100644
--- a/db/merge_test.cc
+++ b/db/merge_test.cc
@@ -4,8 +4,9 @@
 // (found in the LICENSE.Apache file in the root directory).
 //
 #include
-#include
+
 #include
+#include
 
 #include "db/db_impl/db_impl.h"
 #include "db/dbformat.h"
@@ -18,6 +19,7 @@
 #include "rocksdb/merge_operator.h"
 #include "rocksdb/utilities/db_ttl.h"
 #include "test_util/testharness.h"
+#include "util/coding.h"
 #include "utilities/merge_operators.h"
 
 namespace ROCKSDB_NAMESPACE {
@@ -296,6 +298,96 @@ void testCounters(Counters& counters, DB* db, bool test_compaction) {
   }
 }
 
+void testCountersWithFlushAndCompaction(Counters& counters, DB* db) {
+  ASSERT_OK(db->Put({}, "1", "1"));
+  ASSERT_OK(db->Flush(FlushOptions()));
+
+  std::atomic<int> cnt{0};
+  const auto get_thread_id = [&cnt]() {
+    thread_local int thread_id{cnt++};
+    return thread_id;
+  };
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:BeforeWriterWaiting", [&](void* /*arg*/) {
+        int thread_id = get_thread_id();
+        if (1 == thread_id) {
+          TEST_SYNC_POINT(
+              "testCountersWithFlushAndCompaction::bg_compact_thread:0");
+        } else if (2 == thread_id) {
+          TEST_SYNC_POINT(
+              "testCountersWithFlushAndCompaction::bg_flush_thread:0");
+        }
+      });
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:WriteManifest", [&](void* /*arg*/) {
+        int thread_id = get_thread_id();
+        if (0 == thread_id) {
+          TEST_SYNC_POINT(
+              "testCountersWithFlushAndCompaction::set_options_thread:0");
+          TEST_SYNC_POINT(
+              "testCountersWithFlushAndCompaction::set_options_thread:1");
+        }
+      });
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:WakeUpAndDone", [&](void* arg) {
+        auto* mutex = reinterpret_cast<InstrumentedMutex*>(arg);
+        mutex->AssertHeld();
+        int thread_id = get_thread_id();
+        ASSERT_EQ(2, thread_id);
+        mutex->Unlock();
+        TEST_SYNC_POINT(
+            "testCountersWithFlushAndCompaction::bg_flush_thread:1");
+        TEST_SYNC_POINT(
+            "testCountersWithFlushAndCompaction::bg_flush_thread:2");
+        mutex->Lock();
+      });
+  SyncPoint::GetInstance()->LoadDependency({
+      {"testCountersWithFlushAndCompaction::set_options_thread:0",
+       "testCountersWithCompactionAndFlush:BeforeCompact"},
+ {"testCountersWithFlushAndCompaction::bg_compact_thread:0", + "testCountersWithFlushAndCompaction:BeforeIncCounters"}, + {"testCountersWithFlushAndCompaction::bg_flush_thread:0", + "testCountersWithFlushAndCompaction::set_options_thread:1"}, + {"testCountersWithFlushAndCompaction::bg_flush_thread:1", + "testCountersWithFlushAndCompaction:BeforeVerification"}, + {"testCountersWithFlushAndCompaction:AfterGet", + "testCountersWithFlushAndCompaction::bg_flush_thread:2"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + + port::Thread set_options_thread([&]() { + ASSERT_OK(reinterpret_cast(db)->SetOptions( + {{"disable_auto_compactions", "false"}})); + }); + TEST_SYNC_POINT("testCountersWithCompactionAndFlush:BeforeCompact"); + port::Thread compact_thread([&]() { + ASSERT_OK(reinterpret_cast(db)->CompactRange( + CompactRangeOptions(), db->DefaultColumnFamily(), nullptr, nullptr)); + }); + + TEST_SYNC_POINT("testCountersWithFlushAndCompaction:BeforeIncCounters"); + counters.add("test-key", 1); + + FlushOptions flush_opts; + flush_opts.wait = false; + ASSERT_OK(db->Flush(flush_opts)); + + TEST_SYNC_POINT("testCountersWithFlushAndCompaction:BeforeVerification"); + std::string expected; + PutFixed64(&expected, 1); + std::string actual; + Status s = db->Get(ReadOptions(), "test-key", &actual); + TEST_SYNC_POINT("testCountersWithFlushAndCompaction:AfterGet"); + set_options_thread.join(); + compact_thread.join(); + ASSERT_OK(s); + ASSERT_EQ(expected, actual); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + void testSuccessiveMerge(Counters& counters, size_t max_num_merges, size_t num_merges) { @@ -488,6 +580,19 @@ TEST_F(MergeTest, MergeDbTtlTest) { runTest(test::PerThreadDBPath("merge_testdbttl"), true); // Run test on TTL database } + +TEST_F(MergeTest, MergeWithCompactionAndFlush) { + const std::string dbname = + test::PerThreadDBPath("merge_with_compaction_and_flush"); + { + auto db = OpenDb(dbname); + { + MergeBasedCounters counters(db, 0); + testCountersWithFlushAndCompaction(counters, db.get()); + } + } + DestroyDB(dbname, Options()); +} #endif // !ROCKSDB_LITE } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_set.cc b/db/version_set.cc index 91fedab4d..13b2eb518 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3665,16 +3665,18 @@ struct VersionSet::ManifestWriter { ColumnFamilyData* cfd; const MutableCFOptions mutable_cf_options; const autovector& edit_list; + const std::function manifest_write_callback; - explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd, - const MutableCFOptions& cf_options, - const autovector& e) + explicit ManifestWriter( + InstrumentedMutex* mu, ColumnFamilyData* _cfd, + const MutableCFOptions& cf_options, const autovector& e, + const std::function& manifest_wcb) : done(false), cv(mu), cfd(_cfd), mutable_cf_options(cf_options), - edit_list(e) {} - + edit_list(e), + manifest_write_callback(manifest_wcb) {} ~ManifestWriter() { status.PermitUncheckedError(); } bool IsAllWalEdits() const { @@ -4044,7 +4046,7 @@ Status VersionSet::ProcessManifestWrites( FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_); mu->Unlock(); - TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest"); + TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WriteManifest", nullptr); if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) { for (int i = 0; i < static_cast(versions.size()); ++i) { assert(!builder_guards.empty() && @@ -4294,6 +4296,9 @@ Status 
     }
     ready->status = s;
     ready->done = true;
+    if (ready->manifest_write_callback) {
+      (ready->manifest_write_callback)(s);
+    }
     if (need_signal) {
       ready->cv.Signal();
     }
@@ -4314,7 +4319,8 @@ Status VersionSet::LogAndApply(
     const autovector<const MutableCFOptions*>& mutable_cf_options_list,
     const autovector<autovector<VersionEdit*>>& edit_lists,
    InstrumentedMutex* mu, FSDirectory* db_directory, bool new_descriptor_log,
-    const ColumnFamilyOptions* new_cf_options) {
+    const ColumnFamilyOptions* new_cf_options,
+    const std::vector<std::function<void(const Status&)>>& manifest_wcbs) {
   mu->AssertHeld();
   int num_edits = 0;
   for (const auto& elist : edit_lists) {
@@ -4344,12 +4350,16 @@ Status VersionSet::LogAndApply(
     assert(static_cast<size_t>(num_cfds) == edit_lists.size());
   }
   for (int i = 0; i < num_cfds; ++i) {
+    const auto wcb =
+        manifest_wcbs.empty() ? [](const Status&) {} : manifest_wcbs[i];
     writers.emplace_back(mu, column_family_datas[i],
-                         *mutable_cf_options_list[i], edit_lists[i]);
+                         *mutable_cf_options_list[i], edit_lists[i], wcb);
     manifest_writers_.push_back(&writers[i]);
   }
   assert(!writers.empty());
   ManifestWriter& first_writer = writers.front();
+  TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:BeforeWriterWaiting",
+                           nullptr);
   while (!first_writer.done && &first_writer != manifest_writers_.front()) {
     first_writer.cv.Wait();
   }
@@ -4361,6 +4371,7 @@ Status VersionSet::LogAndApply(
   for (const auto& writer : writers) {
     assert(writer.done);
   }
+  TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WakeUpAndDone", mu);
 #endif /* !NDEBUG */
   return first_writer.status;
 }
diff --git a/db/version_set.h b/db/version_set.h
index 8bc9b3ef6..613133610 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -958,7 +958,8 @@ class VersionSet {
       const MutableCFOptions& mutable_cf_options,
       const autovector<VersionEdit*>& edit_list, InstrumentedMutex* mu,
       FSDirectory* db_directory = nullptr, bool new_descriptor_log = false,
-      const ColumnFamilyOptions* column_family_options = nullptr) {
+      const ColumnFamilyOptions* column_family_options = nullptr,
+      const std::function<void(const Status&)>& manifest_wcb = {}) {
     autovector<ColumnFamilyData*> cfds;
     cfds.emplace_back(column_family_data);
     autovector<const MutableCFOptions*> mutable_cf_options_list;
@@ -966,7 +967,8 @@ class VersionSet {
     autovector<autovector<VersionEdit*>> edit_lists;
     edit_lists.emplace_back(edit_list);
     return LogAndApply(cfds, mutable_cf_options_list, edit_lists, mu,
-                       db_directory, new_descriptor_log, column_family_options);
+                       db_directory, new_descriptor_log, column_family_options,
+                       {manifest_wcb});
   }
 
   // The across-multi-cf batch version. If edit_lists contain more than
@@ -978,7 +980,9 @@ class VersionSet {
       const autovector<autovector<VersionEdit*>>& edit_lists,
       InstrumentedMutex* mu, FSDirectory* db_directory = nullptr,
       bool new_descriptor_log = false,
-      const ColumnFamilyOptions* new_cf_options = nullptr);
+      const ColumnFamilyOptions* new_cf_options = nullptr,
+      const std::vector<std::function<void(const Status&)>>& manifest_wcbs =
+          {});
 
   static Status GetCurrentManifestPath(const std::string& dbname,
                                        FileSystem* fs,
@@ -1419,8 +1423,9 @@ class ReactiveVersionSet : public VersionSet {
       const autovector<const MutableCFOptions*>& /*mutable_cf_options_list*/,
       const autovector<autovector<VersionEdit*>>& /*edit_lists*/,
       InstrumentedMutex* /*mu*/, FSDirectory* /*db_directory*/,
-      bool /*new_descriptor_log*/,
-      const ColumnFamilyOptions* /*new_cf_option*/) override {
+      bool /*new_descriptor_log*/, const ColumnFamilyOptions* /*new_cf_option*/,
+      const std::vector<std::function<void(const Status&)>>& /*manifest_wcbs*/)
+          override {
     return Status::NotSupported("not supported in reactive mode");
   }
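For readers skimming the `db/version_set.h` change above, a small self-contained toy (again not RocksDB code; the types and names are invented for illustration) of the calling convention the new defaulted parameters give: existing call sites compile unchanged, while a flush-style caller supplies either nothing or one callback per column family, each invoked with the final status before its writer is signaled.

```
// Illustrative only: models the "empty vector means no callbacks, otherwise
// one callback per column family" dispatch used by the batched LogAndApply.
#include <functional>
#include <iostream>
#include <string>
#include <vector>

using Status = bool;  // toy stand-in for rocksdb::Status
using ManifestWriteCallback = std::function<void(const Status&)>;

Status LogAndApplySketch(const std::vector<std::string>& cf_names,
                         const std::vector<ManifestWriteCallback>& wcbs = {}) {
  Status s = true;  // pretend the MANIFEST write succeeded
  for (size_t i = 0; i < cf_names.size(); ++i) {
    // An empty vector means "no callback", matching the defaulted parameter.
    const ManifestWriteCallback cb =
        wcbs.empty() ? ManifestWriteCallback([](const Status&) {}) : wcbs[i];
    cb(s);  // run before "waking up" the writer for cf_names[i]
  }
  return s;
}

int main() {
  // Existing callers compile unchanged thanks to the defaulted parameter.
  LogAndApplySketch({"default"});

  // A flush-like caller passes its post-write work as a callback.
  LogAndApplySketch({"default"}, {[](const Status& ok) {
                      std::cout << "post-flush update, ok=" << ok << "\n";
                    }});
  return 0;
}
```

Keeping the callback parameters defaulted (an empty `std::function` for the single-cf overload and a possibly empty vector for the batched one) is what lets the patch leave the existing `LogAndApply` call sites untouched.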