From a84459120195b831d89e6370b5bcb9ead08d3f29 Mon Sep 17 00:00:00 2001 From: Connor Date: Thu, 12 Dec 2019 14:05:48 -0800 Subject: [PATCH] wait pending memtable writes on file ingestion or compact range (#6113) Summary: **Summary:** This PR fixes two unordered_write related issues: - the ingestion job may skip the necessary memtable flush https://github.com/facebook/rocksdb/issues/6026 - compact range may cause the memtable to be flushed before pending unordered writes finish 1. `CompactRange` triggers a memtable flush but doesn't wait for pending-writes 2. there are some pending writes but the memtable is already flushed 3. the memtable-related WAL is removed (note that the pending-writes were recorded in that WAL). 4. the pending-writes go to a newly created memtable 5. there is a restart 6. the previous pending-writes are lost, because the WAL was removed but they aren't included in any SST. **How to solve:** - Wait for pending memtable writes before the ingestion job checks the memtable key range - Wait for pending memtable writes before flushing the memtable. 
**Note that: `CompactRange` calls `RangesOverlapWithMemtables` too without waiting for pending waits, but I'm not sure whether it affects the correctness.** **Test Plan:** make check Pull Request resolved: https://github.com/facebook/rocksdb/pull/6113 Differential Revision: D18895674 Pulled By: maysamyabandeh fbshipit-source-id: da22b4476fc7e06c176020e7cc171eb78189ecaf --- db/db_compaction_test.cc | 31 ++++++++++++++++++++++++++ db/db_impl/db_impl.cc | 7 ++++++ db/db_impl/db_impl.h | 1 + db/db_impl/db_impl_compaction_flush.cc | 6 +++-- db/db_impl/db_impl_write.cc | 2 ++ db/error_handler_test.cc | 6 +++-- db/external_sst_file_test.cc | 29 ++++++++++++++++++++++++ 7 files changed, 78 insertions(+), 4 deletions(-) diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 84f9f55dd..178386206 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -1503,6 +1503,37 @@ TEST_F(DBCompactionTest, DISABLED_ManualPartialFill) { } } +TEST_F(DBCompactionTest, ManualCompactionWithUnorderedWrite) { + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL", + "DBCompactionTest::ManualCompactionWithUnorderedWrite:WaitWriteWAL"}, + {"DBImpl::WaitForPendingWrites:BeforeBlock", + "DBImpl::WriteImpl:BeforeUnorderedWriteMemtable"}}); + + Options options = CurrentOptions(); + options.unordered_write = true; + DestroyAndReopen(options); + Put("foo", "v1"); + ASSERT_OK(Flush()); + + Put("bar", "v1"); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + port::Thread writer([&]() { Put("foo", "v2"); }); + + TEST_SYNC_POINT( + "DBCompactionTest::ManualCompactionWithUnorderedWrite:WaitWriteWAL"); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + + writer.join(); + ASSERT_EQ(Get("foo"), "v2"); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + Reopen(options); + ASSERT_EQ(Get("foo"), "v2"); +} + TEST_F(DBCompactionTest, DeleteFileRange) 
{ Options options = CurrentOptions(); options.write_buffer_size = 10 * 1024 * 1024; diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 2504831a3..bb5d3d263 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -3958,6 +3958,13 @@ Status DBImpl::IngestExternalFiles( nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); } + // When unordered_write is enabled, the keys are writing to memtable in an + // unordered way. If the ingestion job checks memtable key range before the + // key landing in memtable, the ingestion job may skip the necessary + // memtable flush. + // So wait here to ensure there is no pending write to memtable. + WaitForPendingWrites(); + num_running_ingest_file_ += static_cast(num_cfs); TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter"); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 5d3f6830a..d56e0e14d 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1416,6 +1416,7 @@ class DBImpl : public DB { inline void WaitForPendingWrites() { mutex_.AssertHeld(); + TEST_SYNC_POINT("DBImpl::WaitForPendingWrites:BeforeBlock"); // In case of pipelined write is enabled, wait for all pending memtable // writers. 
if (immutable_db_options_.enable_pipelined_write) { diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index b389e40f4..2336b3e92 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1540,6 +1540,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); } } + WaitForPendingWrites(); if (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) { s = SwitchMemtable(cfd, &context); @@ -1625,11 +1626,11 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, } } } - TEST_SYNC_POINT("FlushMemTableFinished"); + TEST_SYNC_POINT("DBImpl::FlushMemTable:FlushMemTableFinished"); return s; } -// Flush all elments in 'column_family_datas' +// Flush all elements in 'column_family_datas' // and atomically record the result to the MANIFEST. Status DBImpl::AtomicFlushMemTables( const autovector& column_family_datas, @@ -1665,6 +1666,7 @@ Status DBImpl::AtomicFlushMemTables( nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); } } + WaitForPendingWrites(); for (auto cfd : column_family_datas) { if (cfd->IsDropped()) { diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 34193cabc..9d38fb74c 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -131,6 +131,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, log_used, log_ref, &seq, sub_batch_cnt, pre_release_callback, kDoAssignOrder, kDoPublishLastSeq, disable_memtable); + TEST_SYNC_POINT("DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL"); if (!status.ok()) { return status; } @@ -138,6 +139,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, *seq_used = seq; } if (!disable_memtable) { + TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeUnorderedWriteMemtable"); status = UnorderedWriteMemtable(write_options, my_batch, callback, log_ref, seq, sub_batch_cnt); } diff --git a/db/error_handler_test.cc 
b/db/error_handler_test.cc index c18706fc2..879686e9f 100644 --- a/db/error_handler_test.cc +++ b/db/error_handler_test.cc @@ -183,7 +183,8 @@ TEST_F(DBErrorHandlingTest, CompactionWriteError) { ); listener->EnableAutoRecovery(false); rocksdb::SyncPoint::GetInstance()->LoadDependency( - {{"FlushMemTableFinished", "BackgroundCallCompaction:0"}}); + {{"DBImpl::FlushMemTable:FlushMemTableFinished", + "BackgroundCallCompaction:0"}}); rocksdb::SyncPoint::GetInstance()->SetCallBack( "BackgroundCallCompaction:0", [&](void *) { fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space")); @@ -219,7 +220,8 @@ TEST_F(DBErrorHandlingTest, CorruptionError) { ASSERT_EQ(s, Status::OK()); rocksdb::SyncPoint::GetInstance()->LoadDependency( - {{"FlushMemTableFinished", "BackgroundCallCompaction:0"}}); + {{"DBImpl::FlushMemTable:FlushMemTableFinished", + "BackgroundCallCompaction:0"}}); rocksdb::SyncPoint::GetInstance()->SetCallBack( "BackgroundCallCompaction:0", [&](void *) { fault_env->SetFilesystemActive(false, Status::Corruption("Corruption")); diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index 3a059773f..c949f2258 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -1700,6 +1700,35 @@ TEST_F(ExternalSSTFileTest, SstFileWriterNonSharedKeys) { ASSERT_OK(DeprecatedAddFile({file_path})); } +TEST_F(ExternalSSTFileTest, WithUnorderedWrite) { + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL", + "ExternalSSTFileTest::WithUnorderedWrite:WaitWriteWAL"}, + {"DBImpl::WaitForPendingWrites:BeforeBlock", + "DBImpl::WriteImpl:BeforeUnorderedWriteMemtable"}}); + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::IngestExternalFile:NeedFlush", [&](void* need_flush) { + ASSERT_TRUE(*reinterpret_cast(need_flush)); + }); + + Options options = CurrentOptions(); + options.unordered_write = true; + DestroyAndReopen(options); + Put("foo", "v1"); 
+ SyncPoint::GetInstance()->EnableProcessing(); + port::Thread writer([&]() { Put("bar", "v2"); }); + + TEST_SYNC_POINT("ExternalSSTFileTest::WithUnorderedWrite:WaitWriteWAL"); + ASSERT_OK(GenerateAndAddExternalFile(options, {{"bar", "v3"}}, -1, + true /* allow_global_seqno */)); + ASSERT_EQ(Get("bar"), "v3"); + + writer.join(); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + TEST_P(ExternalSSTFileTest, IngestFileWithGlobalSeqnoRandomized) { Options options = CurrentOptions(); options.IncreaseParallelism(20);