diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 84f9f55dd..178386206 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -1503,6 +1503,37 @@ TEST_F(DBCompactionTest, DISABLED_ManualPartialFill) { } } +TEST_F(DBCompactionTest, ManualCompactionWithUnorderedWrite) { + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL", + "DBCompactionTest::ManualCompactionWithUnorderedWrite:WaitWriteWAL"}, + {"DBImpl::WaitForPendingWrites:BeforeBlock", + "DBImpl::WriteImpl:BeforeUnorderedWriteMemtable"}}); + + Options options = CurrentOptions(); + options.unordered_write = true; + DestroyAndReopen(options); + Put("foo", "v1"); + ASSERT_OK(Flush()); + + Put("bar", "v1"); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + port::Thread writer([&]() { Put("foo", "v2"); }); + + TEST_SYNC_POINT( + "DBCompactionTest::ManualCompactionWithUnorderedWrite:WaitWriteWAL"); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + + writer.join(); + ASSERT_EQ(Get("foo"), "v2"); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + Reopen(options); + ASSERT_EQ(Get("foo"), "v2"); +} + TEST_F(DBCompactionTest, DeleteFileRange) { Options options = CurrentOptions(); options.write_buffer_size = 10 * 1024 * 1024; diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 2504831a3..bb5d3d263 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -3958,6 +3958,13 @@ Status DBImpl::IngestExternalFiles( nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); } + // When unordered_write is enabled, the keys are writing to memtable in an + // unordered way. If the ingestion job checks memtable key range before the + // key landing in memtable, the ingestion job may skip the necessary + // memtable flush. + // So wait here to ensure there is no pending write to memtable. 
+ WaitForPendingWrites(); + num_running_ingest_file_ += static_cast<int>(num_cfs); TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter"); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 5d3f6830a..d56e0e14d 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1416,6 +1416,7 @@ class DBImpl : public DB { inline void WaitForPendingWrites() { mutex_.AssertHeld(); + TEST_SYNC_POINT("DBImpl::WaitForPendingWrites:BeforeBlock"); // In case of pipelined write is enabled, wait for all pending memtable // writers. if (immutable_db_options_.enable_pipelined_write) { diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index b389e40f4..2336b3e92 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1540,6 +1540,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); } } + WaitForPendingWrites(); if (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) { s = SwitchMemtable(cfd, &context); @@ -1625,11 +1626,11 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, } } } - TEST_SYNC_POINT("FlushMemTableFinished"); + TEST_SYNC_POINT("DBImpl::FlushMemTable:FlushMemTableFinished"); return s; } -// Flush all elments in 'column_family_datas' +// Flush all elements in 'column_family_datas' // and atomically record the result to the MANIFEST. 
Status DBImpl::AtomicFlushMemTables( const autovector<ColumnFamilyData*>& column_family_datas, @@ -1665,6 +1666,7 @@ Status DBImpl::AtomicFlushMemTables( nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); } } + WaitForPendingWrites(); for (auto cfd : column_family_datas) { if (cfd->IsDropped()) { diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 34193cabc..9d38fb74c 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -131,6 +131,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, log_used, log_ref, &seq, sub_batch_cnt, pre_release_callback, kDoAssignOrder, kDoPublishLastSeq, disable_memtable); + TEST_SYNC_POINT("DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL"); if (!status.ok()) { return status; } @@ -138,6 +139,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, *seq_used = seq; } if (!disable_memtable) { + TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeUnorderedWriteMemtable"); status = UnorderedWriteMemtable(write_options, my_batch, callback, log_ref, seq, sub_batch_cnt); } diff --git a/db/error_handler_test.cc b/db/error_handler_test.cc index c18706fc2..879686e9f 100644 --- a/db/error_handler_test.cc +++ b/db/error_handler_test.cc @@ -183,7 +183,8 @@ TEST_F(DBErrorHandlingTest, CompactionWriteError) { ); listener->EnableAutoRecovery(false); rocksdb::SyncPoint::GetInstance()->LoadDependency( - {{"FlushMemTableFinished", "BackgroundCallCompaction:0"}}); + {{"DBImpl::FlushMemTable:FlushMemTableFinished", + "BackgroundCallCompaction:0"}}); rocksdb::SyncPoint::GetInstance()->SetCallBack( "BackgroundCallCompaction:0", [&](void *) { fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space")); @@ -219,7 +220,8 @@ TEST_F(DBErrorHandlingTest, CorruptionError) { ASSERT_EQ(s, Status::OK()); rocksdb::SyncPoint::GetInstance()->LoadDependency( - {{"FlushMemTableFinished", "BackgroundCallCompaction:0"}}); + {{"DBImpl::FlushMemTable:FlushMemTableFinished", + "BackgroundCallCompaction:0"}}); 
rocksdb::SyncPoint::GetInstance()->SetCallBack( "BackgroundCallCompaction:0", [&](void *) { fault_env->SetFilesystemActive(false, Status::Corruption("Corruption")); diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index 3a059773f..c949f2258 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -1700,6 +1700,35 @@ TEST_F(ExternalSSTFileTest, SstFileWriterNonSharedKeys) { ASSERT_OK(DeprecatedAddFile({file_path})); } +TEST_F(ExternalSSTFileTest, WithUnorderedWrite) { + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL", + "ExternalSSTFileTest::WithUnorderedWrite:WaitWriteWAL"}, + {"DBImpl::WaitForPendingWrites:BeforeBlock", + "DBImpl::WriteImpl:BeforeUnorderedWriteMemtable"}}); + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::IngestExternalFile:NeedFlush", [&](void* need_flush) { + ASSERT_TRUE(*reinterpret_cast<bool*>(need_flush)); + }); + + Options options = CurrentOptions(); + options.unordered_write = true; + DestroyAndReopen(options); + Put("foo", "v1"); + SyncPoint::GetInstance()->EnableProcessing(); + port::Thread writer([&]() { Put("bar", "v2"); }); + + TEST_SYNC_POINT("ExternalSSTFileTest::WithUnorderedWrite:WaitWriteWAL"); + ASSERT_OK(GenerateAndAddExternalFile(options, {{"bar", "v3"}}, -1, + true /* allow_global_seqno */)); + ASSERT_EQ(Get("bar"), "v3"); + + writer.join(); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + TEST_P(ExternalSSTFileTest, IngestFileWithGlobalSeqnoRandomized) { Options options = CurrentOptions(); options.IncreaseParallelism(20);