diff --git a/HISTORY.md b/HISTORY.md
index 2851f5373..6fc3bb8b2 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -7,6 +7,7 @@
 * Since 6.14, fix false positive flush/compaction `Status::Corruption` failure when `paranoid_file_checks == true` and range tombstones were written to the compaction output files.
 * Since 6.14, fix a bug that could cause a stalled write to crash with a mix of slowdown and no_slowdown writes (`WriteOptions.no_slowdown=true`).
 * Fixed a bug which causes a hang when closing the DB if refit level is set in an opt build. It happened because ContinueBackgroundWork() was called inside an assert statement, which is a no-op. It was introduced in 6.14.
+* Perform post-flush updates of the memtable list's in-memory data structures in a callback that the thread writing to the MANIFEST file invokes before signaling other threads and releasing the db mutex.
 ### Public API Change
 * Deprecate `BlockBasedTableOptions::pin_l0_filter_and_index_blocks_in_cache` and `BlockBasedTableOptions::pin_top_level_index_and_filter`. These options still take effect until users migrate to the replacement APIs in `BlockBasedTableOptions::metadata_cache_options`. Migration guidance can be found in the API comments on the deprecated options.
diff --git a/db/memtable_list.cc b/db/memtable_list.cc
index dced9f7db..1e1758d59 100644
--- a/db/memtable_list.cc
+++ b/db/memtable_list.cc
@@ -481,83 +481,18 @@ Status MemTableList::TryInstallMemtableFlushResults(
             vset, *cfd, edit_list, memtables_to_flush, prep_tracker));
       }
 
+      const auto manifest_write_cb = [this, cfd, batch_count, log_buffer,
+                                      to_delete, mu](const Status& status) {
+        RemoveMemTablesOrRestoreFlags(status, cfd, batch_count, log_buffer,
+                                      to_delete, mu);
+      };
+
       // this can release and reacquire the mutex.
       s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
-                            db_directory);
+                            db_directory, /*new_descriptor_log=*/false,
+                            /*column_family_options=*/nullptr,
+                            manifest_write_cb);
       *io_s = vset->io_status();
-
-      // we will be changing the version in the next code path,
-      // so we better create a new one, since versions are immutable
-      InstallNewVersion();
-
-      // All the later memtables that have the same filenum
-      // are part of the same batch. They can be committed now.
-      uint64_t mem_id = 1;  // how many memtables have been flushed.
-
-      // commit new state only if the column family is NOT dropped.
-      // The reason is as follows (refer to
-      // ColumnFamilyTest.FlushAndDropRaceCondition).
-      // If the column family is dropped, then according to LogAndApply, its
-      // corresponding flush operation is NOT written to the MANIFEST. This
-      // means the DB is not aware of the L0 files generated from the flush.
-      // By committing the new state, we remove the memtable from the memtable
-      // list. Creating an iterator on this column family will not be able to
-      // read full data since the memtable is removed, and the DB is not aware
-      // of the L0 files, causing MergingIterator unable to build child
-      // iterators. RocksDB contract requires that the iterator can be created
-      // on a dropped column family, and we must be able to
-      // read full data as long as column family handle is not deleted, even if
-      // the column family is dropped.
-      if (s.ok() && !cfd->IsDropped()) {  // commit new state
-        while (batch_count-- > 0) {
-          MemTable* m = current_->memlist_.back();
-          if (m->edit_.GetBlobFileAdditions().empty()) {
-            ROCKS_LOG_BUFFER(log_buffer,
-                             "[%s] Level-0 commit table #%" PRIu64
-                             ": memtable #%" PRIu64 " done",
-                             cfd->GetName().c_str(), m->file_number_, mem_id);
-          } else {
-            ROCKS_LOG_BUFFER(log_buffer,
-                             "[%s] Level-0 commit table #%" PRIu64
-                             " (+%zu blob files)"
-                             ": memtable #%" PRIu64 " done",
-                             cfd->GetName().c_str(), m->file_number_,
-                             m->edit_.GetBlobFileAdditions().size(), mem_id);
-          }
-
-          assert(m->file_number_ > 0);
-          current_->Remove(m, to_delete);
-          UpdateCachedValuesFromMemTableListVersion();
-          ResetTrimHistoryNeeded();
-          ++mem_id;
-        }
-      } else {
-        for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; ++it) {
-          MemTable* m = *it;
-          // commit failed. setup state so that we can flush again.
-          if (m->edit_.GetBlobFileAdditions().empty()) {
-            ROCKS_LOG_BUFFER(log_buffer,
-                             "Level-0 commit table #%" PRIu64
-                             ": memtable #%" PRIu64 " failed",
-                             m->file_number_, mem_id);
-          } else {
-            ROCKS_LOG_BUFFER(log_buffer,
-                             "Level-0 commit table #%" PRIu64
-                             " (+%zu blob files)"
-                             ": memtable #%" PRIu64 " failed",
-                             m->file_number_,
-                             m->edit_.GetBlobFileAdditions().size(), mem_id);
-          }
-
-          m->flush_completed_ = false;
-          m->flush_in_progress_ = false;
-          m->edit_.Clear();
-          num_flush_not_started_++;
-          m->file_number_ = 0;
-          imm_flush_needed.store(true, std::memory_order_release);
-          ++mem_id;
-        }
-      }
     }
   }
   commit_in_progress_ = false;
@@ -642,6 +577,87 @@ void MemTableList::InstallNewVersion() {
   }
 }
 
+void MemTableList::RemoveMemTablesOrRestoreFlags(
+    const Status& s, ColumnFamilyData* cfd, size_t batch_count,
+    LogBuffer* log_buffer, autovector<MemTable*>* to_delete,
+    InstrumentedMutex* mu) {
+  assert(mu);
+  mu->AssertHeld();
+  assert(to_delete);
+  // we will be changing the version in the next code path,
+  // so we better create a new one, since versions are immutable
+  InstallNewVersion();
+
+  // All the later memtables that have the same filenum
+  // are part of the same batch. They can be committed now.
+  uint64_t mem_id = 1;  // how many memtables have been flushed.
+
+  // commit new state only if the column family is NOT dropped.
+  // The reason is as follows (refer to
+  // ColumnFamilyTest.FlushAndDropRaceCondition).
+  // If the column family is dropped, then according to LogAndApply, its
+  // corresponding flush operation is NOT written to the MANIFEST. This
+  // means the DB is not aware of the L0 files generated from the flush.
+  // By committing the new state, we remove the memtable from the memtable
+  // list. Creating an iterator on this column family will not be able to
+  // read full data since the memtable is removed, and the DB is not aware
+  // of the L0 files, causing MergingIterator unable to build child
+  // iterators. RocksDB contract requires that the iterator can be created
+  // on a dropped column family, and we must be able to
+  // read full data as long as column family handle is not deleted, even if
+  // the column family is dropped.
+  if (s.ok() && !cfd->IsDropped()) {  // commit new state
+    while (batch_count-- > 0) {
+      MemTable* m = current_->memlist_.back();
+      if (m->edit_.GetBlobFileAdditions().empty()) {
+        ROCKS_LOG_BUFFER(log_buffer,
+                         "[%s] Level-0 commit table #%" PRIu64
+                         ": memtable #%" PRIu64 " done",
+                         cfd->GetName().c_str(), m->file_number_, mem_id);
+      } else {
+        ROCKS_LOG_BUFFER(log_buffer,
+                         "[%s] Level-0 commit table #%" PRIu64
+                         " (+%zu blob files)"
+                         ": memtable #%" PRIu64 " done",
+                         cfd->GetName().c_str(), m->file_number_,
+                         m->edit_.GetBlobFileAdditions().size(), mem_id);
+      }
+
+      assert(m->file_number_ > 0);
+      current_->Remove(m, to_delete);
+      UpdateCachedValuesFromMemTableListVersion();
+      ResetTrimHistoryNeeded();
+      ++mem_id;
+    }
+  } else {
+    for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; ++it) {
+      MemTable* m = *it;
+      // commit failed. setup state so that we can flush again.
+      if (m->edit_.GetBlobFileAdditions().empty()) {
+        ROCKS_LOG_BUFFER(log_buffer,
+                         "Level-0 commit table #%" PRIu64 ": memtable #%" PRIu64
+                         " failed",
+                         m->file_number_, mem_id);
+      } else {
+        ROCKS_LOG_BUFFER(log_buffer,
+                         "Level-0 commit table #%" PRIu64
+                         " (+%zu blob files)"
+                         ": memtable #%" PRIu64 " failed",
+                         m->file_number_,
+                         m->edit_.GetBlobFileAdditions().size(), mem_id);
+      }
+
+      m->flush_completed_ = false;
+      m->flush_in_progress_ = false;
+      m->edit_.Clear();
+      num_flush_not_started_++;
+      m->file_number_ = 0;
+      imm_flush_needed.store(true, std::memory_order_release);
+      ++mem_id;
+    }
+  }
+}
+
 uint64_t MemTableList::PrecomputeMinLogContainingPrepSection(
     const autovector<MemTable*>& memtables_to_flush) {
   uint64_t min_log = 0;
diff --git a/db/memtable_list.h b/db/memtable_list.h
index 72105d266..0af7a0cea 100644
--- a/db/memtable_list.h
+++ b/db/memtable_list.h
@@ -389,6 +389,13 @@ class MemTableList {
   // DB mutex held
   void InstallNewVersion();
 
+  // DB mutex held
+  // Called after writing to MANIFEST
+  void RemoveMemTablesOrRestoreFlags(const Status& s, ColumnFamilyData* cfd,
+                                     size_t batch_count, LogBuffer* log_buffer,
+                                     autovector<MemTable*>* to_delete,
+                                     InstrumentedMutex* mu);
+
   const int min_write_buffer_number_to_merge_;
 
   MemTableListVersion* current_;
diff --git a/db/merge_test.cc b/db/merge_test.cc
index 3f85f6464..047ea7fac 100644
--- a/db/merge_test.cc
+++ b/db/merge_test.cc
@@ -4,8 +4,9 @@
 //  (found in the LICENSE.Apache file in the root directory).
 //
 #include
-#include
+
 #include
+#include
 #include "db/db_impl/db_impl.h"
 #include "db/dbformat.h"
@@ -18,6 +19,7 @@
 #include "rocksdb/merge_operator.h"
 #include "rocksdb/utilities/db_ttl.h"
 #include "test_util/testharness.h"
+#include "util/coding.h"
 #include "utilities/merge_operators.h"
 
 namespace ROCKSDB_NAMESPACE {
@@ -296,6 +298,96 @@ void testCounters(Counters& counters, DB* db, bool test_compaction) {
   }
 }
 
+void testCountersWithFlushAndCompaction(Counters& counters, DB* db) {
+  ASSERT_OK(db->Put({}, "1", "1"));
+  ASSERT_OK(db->Flush(FlushOptions()));
+
+  std::atomic<int> cnt{0};
+  const auto get_thread_id = [&cnt]() {
+    thread_local int thread_id{cnt++};
+    return thread_id;
+  };
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:BeforeWriterWaiting", [&](void* /*arg*/) {
+        int thread_id = get_thread_id();
+        if (1 == thread_id) {
+          TEST_SYNC_POINT(
+              "testCountersWithFlushAndCompaction::bg_compact_thread:0");
+        } else if (2 == thread_id) {
+          TEST_SYNC_POINT(
+              "testCountersWithFlushAndCompaction::bg_flush_thread:0");
+        }
+      });
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:WriteManifest", [&](void* /*arg*/) {
+        int thread_id = get_thread_id();
+        if (0 == thread_id) {
+          TEST_SYNC_POINT(
+              "testCountersWithFlushAndCompaction::set_options_thread:0");
+          TEST_SYNC_POINT(
+              "testCountersWithFlushAndCompaction::set_options_thread:1");
+        }
+      });
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:WakeUpAndDone", [&](void* arg) {
+        auto* mutex = reinterpret_cast<InstrumentedMutex*>(arg);
+        mutex->AssertHeld();
+        int thread_id = get_thread_id();
+        ASSERT_EQ(2, thread_id);
+        mutex->Unlock();
+        TEST_SYNC_POINT(
+            "testCountersWithFlushAndCompaction::bg_flush_thread:1");
+        TEST_SYNC_POINT(
+            "testCountersWithFlushAndCompaction::bg_flush_thread:2");
+        mutex->Lock();
+      });
+  SyncPoint::GetInstance()->LoadDependency({
+      {"testCountersWithFlushAndCompaction::set_options_thread:0",
+       "testCountersWithCompactionAndFlush:BeforeCompact"},
+      {"testCountersWithFlushAndCompaction::bg_compact_thread:0",
+       "testCountersWithFlushAndCompaction:BeforeIncCounters"},
+      {"testCountersWithFlushAndCompaction::bg_flush_thread:0",
+       "testCountersWithFlushAndCompaction::set_options_thread:1"},
+      {"testCountersWithFlushAndCompaction::bg_flush_thread:1",
+       "testCountersWithFlushAndCompaction:BeforeVerification"},
+      {"testCountersWithFlushAndCompaction:AfterGet",
+       "testCountersWithFlushAndCompaction::bg_flush_thread:2"},
+  });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  port::Thread set_options_thread([&]() {
+    ASSERT_OK(reinterpret_cast<DBImpl*>(db)->SetOptions(
+        {{"disable_auto_compactions", "false"}}));
+  });
+  TEST_SYNC_POINT("testCountersWithCompactionAndFlush:BeforeCompact");
+  port::Thread compact_thread([&]() {
+    ASSERT_OK(reinterpret_cast<DBImpl*>(db)->CompactRange(
+        CompactRangeOptions(), db->DefaultColumnFamily(), nullptr, nullptr));
+  });
+
+  TEST_SYNC_POINT("testCountersWithFlushAndCompaction:BeforeIncCounters");
+  counters.add("test-key", 1);
+
+  FlushOptions flush_opts;
+  flush_opts.wait = false;
+  ASSERT_OK(db->Flush(flush_opts));
+
+  TEST_SYNC_POINT("testCountersWithFlushAndCompaction:BeforeVerification");
+  std::string expected;
+  PutFixed64(&expected, 1);
+  std::string actual;
+  Status s = db->Get(ReadOptions(), "test-key", &actual);
+  TEST_SYNC_POINT("testCountersWithFlushAndCompaction:AfterGet");
+  set_options_thread.join();
+  compact_thread.join();
+  ASSERT_OK(s);
+  ASSERT_EQ(expected, actual);
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
 void testSuccessiveMerge(Counters& counters, size_t max_num_merges,
                          size_t num_merges) {
@@ -488,6 +580,19 @@ TEST_F(MergeTest, MergeDbTtlTest) {
   runTest(test::PerThreadDBPath("merge_testdbttl"),
           true);  // Run test on TTL database
 }
+
+TEST_F(MergeTest, MergeWithCompactionAndFlush) {
+  const std::string dbname =
+      test::PerThreadDBPath("merge_with_compaction_and_flush");
+  {
+    auto db = OpenDb(dbname);
+    {
+      MergeBasedCounters counters(db, 0);
+      testCountersWithFlushAndCompaction(counters, db.get());
+    }
+  }
+  DestroyDB(dbname, Options());
+}
 #endif  // !ROCKSDB_LITE
 
 }  // namespace ROCKSDB_NAMESPACE
diff --git a/db/version_set.cc b/db/version_set.cc
index 91fedab4d..13b2eb518 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -3665,16 +3665,18 @@ struct VersionSet::ManifestWriter {
   ColumnFamilyData* cfd;
   const MutableCFOptions mutable_cf_options;
   const autovector<VersionEdit*>& edit_list;
+  const std::function<void(const Status&)> manifest_write_callback;
 
-  explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
-                          const MutableCFOptions& cf_options,
-                          const autovector<VersionEdit*>& e)
+  explicit ManifestWriter(
+      InstrumentedMutex* mu, ColumnFamilyData* _cfd,
+      const MutableCFOptions& cf_options, const autovector<VersionEdit*>& e,
+      const std::function<void(const Status&)>& manifest_wcb)
       : done(false),
        cv(mu),
        cfd(_cfd),
        mutable_cf_options(cf_options),
-        edit_list(e) {}
-
+        edit_list(e),
+        manifest_write_callback(manifest_wcb) {}
   ~ManifestWriter() { status.PermitUncheckedError(); }
 
   bool IsAllWalEdits() const {
@@ -4044,7 +4046,7 @@ Status VersionSet::ProcessManifestWrites(
     FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
     mu->Unlock();
 
-    TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
+    TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WriteManifest", nullptr);
     if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
       for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
         assert(!builder_guards.empty() &&
@@ -4294,6 +4296,9 @@ Status VersionSet::ProcessManifestWrites(
       }
       ready->status = s;
      ready->done = true;
+      if (ready->manifest_write_callback) {
+        (ready->manifest_write_callback)(s);
+      }
      if (need_signal) {
        ready->cv.Signal();
      }
@@ -4314,7 +4319,8 @@ Status VersionSet::LogAndApply(
     const autovector<const MutableCFOptions*>& mutable_cf_options_list,
     const autovector<autovector<VersionEdit*>>& edit_lists,
     InstrumentedMutex* mu, FSDirectory* db_directory, bool new_descriptor_log,
-    const ColumnFamilyOptions* new_cf_options) {
+    const ColumnFamilyOptions* new_cf_options,
+    const std::vector<std::function<void(const Status&)>>& manifest_wcbs) {
   mu->AssertHeld();
   int num_edits = 0;
   for (const auto& elist : edit_lists) {
@@ -4344,12 +4350,16 @@ Status VersionSet::LogAndApply(
     assert(static_cast<size_t>(num_cfds) == edit_lists.size());
   }
   for (int i = 0; i < num_cfds; ++i) {
+    const auto wcb =
+        manifest_wcbs.empty() ? [](const Status&) {} : manifest_wcbs[i];
     writers.emplace_back(mu, column_family_datas[i],
-                         *mutable_cf_options_list[i], edit_lists[i]);
+                         *mutable_cf_options_list[i], edit_lists[i], wcb);
     manifest_writers_.push_back(&writers[i]);
   }
   assert(!writers.empty());
   ManifestWriter& first_writer = writers.front();
+  TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:BeforeWriterWaiting",
+                           nullptr);
   while (!first_writer.done && &first_writer != manifest_writers_.front()) {
     first_writer.cv.Wait();
   }
@@ -4361,6 +4371,7 @@ Status VersionSet::LogAndApply(
   for (const auto& writer : writers) {
     assert(writer.done);
   }
+  TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WakeUpAndDone", mu);
 #endif /* !NDEBUG */
   return first_writer.status;
 }
diff --git a/db/version_set.h b/db/version_set.h
index 8bc9b3ef6..613133610 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -958,7 +958,8 @@ class VersionSet {
       const MutableCFOptions& mutable_cf_options,
       const autovector<VersionEdit*>& edit_list, InstrumentedMutex* mu,
       FSDirectory* db_directory = nullptr, bool new_descriptor_log = false,
-      const ColumnFamilyOptions* column_family_options = nullptr) {
+      const ColumnFamilyOptions* column_family_options = nullptr,
+      const std::function<void(const Status&)>& manifest_wcb = {}) {
     autovector<ColumnFamilyData*> cfds;
     cfds.emplace_back(column_family_data);
     autovector<const MutableCFOptions*> mutable_cf_options_list;
@@ -966,7 +967,8 @@
     mutable_cf_options_list.emplace_back(&mutable_cf_options);
     autovector<autovector<VersionEdit*>> edit_lists;
     edit_lists.emplace_back(edit_list);
     return LogAndApply(cfds, mutable_cf_options_list, edit_lists, mu,
-                       db_directory, new_descriptor_log, column_family_options);
+                       db_directory, new_descriptor_log, column_family_options,
+                       {manifest_wcb});
   }
 
   // The across-multi-cf batch version. If edit_lists contain more than
@@ -978,7 +980,9 @@
       const autovector<autovector<VersionEdit*>>& edit_lists,
       InstrumentedMutex* mu, FSDirectory* db_directory = nullptr,
       bool new_descriptor_log = false,
-      const ColumnFamilyOptions* new_cf_options = nullptr);
+      const ColumnFamilyOptions* new_cf_options = nullptr,
+      const std::vector<std::function<void(const Status&)>>& manifest_wcbs =
+          {});
 
   static Status GetCurrentManifestPath(const std::string& dbname,
                                        FileSystem* fs,
@@ -1419,8 +1423,9 @@ class ReactiveVersionSet : public VersionSet {
       const autovector<const MutableCFOptions*>& /*mutable_cf_options_list*/,
       const autovector<autovector<VersionEdit*>>& /*edit_lists*/,
       InstrumentedMutex* /*mu*/, FSDirectory* /*db_directory*/,
-      bool /*new_descriptor_log*/,
-      const ColumnFamilyOptions* /*new_cf_option*/) override {
+      bool /*new_descriptor_log*/, const ColumnFamilyOptions* /*new_cf_option*/,
+      const std::vector<std::function<void(const Status&)>>& /*manifest_wcbs*/)
+      override {
     return Status::NotSupported("not supported in reactive mode");
   }
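
Editor's note (not part of the patch): the change moves the post-flush memtable-list bookkeeping out of `MemTableList::TryInstallMemtableFlushResults` and into `manifest_write_cb`, which `VersionSet::ProcessManifestWrites` now invokes for each `ManifestWriter` while the DB mutex is still held, before `ready->cv.Signal()`. The self-contained C++ sketch below only models that ordering guarantee; it is not RocksDB code, and all names in it (`Writer`, `post_write_cb`, `worker`) are invented for illustration.

```cpp
// Minimal model: a leader processes a queue of writers and runs each
// writer's callback under the shared mutex *before* signaling it, mirroring
// how ProcessManifestWrites calls manifest_write_callback before Signal().
#include <condition_variable>
#include <deque>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

struct Writer {
  std::function<void(bool ok)> post_write_cb;  // like manifest_write_callback
  bool done = false;
  std::condition_variable cv;
};

int main() {
  std::mutex mu;              // stands in for the DB mutex
  std::deque<Writer*> queue;  // stands in for manifest_writers_
  int installed = 0;          // shared state the callbacks update

  auto worker = [&](int id) {
    Writer w;
    w.post_write_cb = [&installed, id](bool ok) {
      // Runs on the leader thread, mutex held, before this writer is woken.
      if (ok) {
        ++installed;
        std::cout << "writer " << id << " installed, total=" << installed
                  << "\n";
      }
    };

    std::unique_lock<std::mutex> lock(mu);
    queue.push_back(&w);
    if (queue.front() != &w) {
      // Follower: by the time we wake up, our callback has already run.
      w.cv.wait(lock, [&w] { return w.done; });
      return;
    }

    // Leader: pretend to write the MANIFEST outside the mutex.
    lock.unlock();
    bool ok = true;
    lock.lock();

    while (!queue.empty()) {
      Writer* ready = queue.front();
      queue.pop_front();
      ready->done = true;
      if (ready->post_write_cb) {
        ready->post_write_cb(ok);  // update shared state first...
      }
      if (ready != &w) {
        ready->cv.notify_one();    // ...then wake the waiting thread
      }
    }
  };

  std::vector<std::thread> threads;
  for (int i = 0; i < 4; ++i) {
    threads.emplace_back(worker, i);
  }
  for (auto& t : threads) {
    t.join();
  }
  std::cout << "final installed=" << installed << "\n";
  return 0;
}
```

Because the callback runs before the waiter is signaled, a thread returning from the queue can never observe shared state that its own post-write update has not yet reached — the same property the new `MergeWithCompactionAndFlush` test exercises with sync points.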