diff --git a/db/db_impl.cc b/db/db_impl.cc index 1e9728cc1..c833ecacd 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -718,6 +718,11 @@ Status DBImpl::FlushWAL(bool sync) { if (!s.ok()) { ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s", s.ToString().c_str()); + // In case there is a fs error we should set it globally to prevent the + // future writes + WriteStatusCheck(s); + // whether sync or not, we should abort the rest of function upon error + return s; } if (!sync) { ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false"); diff --git a/db/db_impl.h b/db/db_impl.h index 9620a53de..9fb4bfb88 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -221,6 +221,7 @@ class DBImpl : public DB { virtual Status Flush(const FlushOptions& options, ColumnFamilyHandle* column_family) override; virtual Status FlushWAL(bool sync) override; + bool TEST_WALBufferIsEmpty(); virtual Status SyncWAL() override; virtual SequenceNumber GetLatestSequenceNumber() const override; diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 793abd35f..8032f8333 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -25,6 +25,12 @@ void DBImpl::TEST_SwitchWAL() { SwitchWAL(&write_context); } +bool DBImpl::TEST_WALBufferIsEmpty() { + InstrumentedMutexLock wl(&log_write_mutex_); + log::Writer* cur_log_writer = logs_.back().writer; + return cur_log_writer->TEST_BufferIsEmpty(); +} + int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes( ColumnFamilyHandle* column_family) { ColumnFamilyData* cfd; diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 0cf2b0b8f..2ab347893 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -1090,7 +1090,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, new_log_number, new log::Writer( std::move(file_writer), new_log_number, - impl->immutable_db_options_.recycle_log_file_num > 0)); + impl->immutable_db_options_.recycle_log_file_num > 0, + impl->immutable_db_options_.manual_wal_flush)); 
} // set column family handles @@ -1217,6 +1218,9 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, if (s.ok()) { ROCKS_LOG_INFO(impl->immutable_db_options_.info_log, "DB pointer %p", impl); LogFlush(impl->immutable_db_options_.info_log); + assert(impl->TEST_WALBufferIsEmpty()); + // If the assert above fails then we need to FlushWAL before returning + // control back to the user. if (!persist_options_status.ok()) { s = Status::IOError( "DB::Open() failed --- Unable to persist Options file", diff --git a/db/db_write_test.cc b/db/db_write_test.cc index 917aef550..200397681 100644 --- a/db/db_write_test.cc +++ b/db/db_write_test.cc @@ -50,6 +50,7 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) { std::atomic<int> leader_count{0}; std::vector<port::Thread> threads; mock_env->SetFilesystemActive(false); + // Wait until all threads linked to write threads, to make sure // all threads join the same batch group. SyncPoint::GetInstance()->SetCallBack( @@ -68,7 +69,13 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) { threads.push_back(port::Thread( [&](int index) { // All threads should fail. 
- ASSERT_FALSE(Put("key" + ToString(index), "value").ok()); + auto res = Put("key" + ToString(index), "value"); + if (options.manual_wal_flush) { + ASSERT_TRUE(res.ok()); + // we should see fs error when we do the flush + res = dbfull()->FlushWAL(false); + } + ASSERT_FALSE(res.ok()); }, i)); } @@ -80,6 +87,22 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) { Close(); } +TEST_P(DBWriteTest, ManualWalFlushInEffect) { + Options options = GetOptions(); + Reopen(options); + // try the 1st WAL created during open + ASSERT_TRUE(Put("key" + ToString(0), "value").ok()); + ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty()); + ASSERT_TRUE(dbfull()->FlushWAL(false).ok()); + ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty()); + // try the 2nd wal created during SwitchWAL + dbfull()->TEST_SwitchWAL(); + ASSERT_TRUE(Put("key" + ToString(0), "value").ok()); + ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty()); + ASSERT_TRUE(dbfull()->FlushWAL(false).ok()); + ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty()); +} + TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) { std::unique_ptr<FaultInjectionTestEnv> mock_env( new FaultInjectionTestEnv(Env::Default())); @@ -90,7 +113,15 @@ TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) { // Forcibly fail WAL write for the first Put only. Subsequent Puts should // fail due to read-only mode mock_env->SetFilesystemActive(i != 0); - ASSERT_FALSE(Put("key" + ToString(i), "value").ok()); + auto res = Put("key" + ToString(i), "value"); + if (options.manual_wal_flush && i == 0) { + // even with manual_wal_flush the 2nd Put should return error because of + // the read-only mode + ASSERT_TRUE(res.ok()); + // we should see fs error when we do the flush + res = dbfull()->FlushWAL(false); + } + ASSERT_FALSE(res.ok()); } // Close before mock_env destruct. 
Close(); diff --git a/db/log_writer.cc b/db/log_writer.cc index 999f9c580..c31adbec5 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -92,6 +92,8 @@ Status Writer::AddRecord(const Slice& slice) { return s; } +bool Writer::TEST_BufferIsEmpty() { return dest_->TEST_BufferIsEmpty(); } + Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { assert(n <= 0xffff); // Must fit in two bytes diff --git a/db/log_writer.h b/db/log_writer.h index 143ad2674..abd7977b9 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -85,6 +85,8 @@ class Writer { Status WriteBuffer(); + bool TEST_BufferIsEmpty(); + private: unique_ptr<WritableFileWriter> dest_; size_t block_offset_; // Current offset in block diff --git a/options/options_helper.cc b/options/options_helper.cc index 41db55c36..6d412b5ee 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -124,6 +124,8 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, immutable_db_options.allow_ingest_behind; options.preserve_deletes = immutable_db_options.preserve_deletes; + options.two_write_queues = immutable_db_options.two_write_queues; + options.manual_wal_flush = immutable_db_options.manual_wal_flush; return options; } diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index 9db12ba06..6d62e54ca 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -189,6 +189,8 @@ class WritableFileWriter { bool use_direct_io() { return writable_file_->use_direct_io(); } + bool TEST_BufferIsEmpty() { return buf_.CurrentSize() == 0; } + private: // Used when os buffering is OFF and we are writing // DMA such as in Direct I/O mode