From 718c1c9c1f01ce4a3c0f4ae488e78f583d40a38c Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Mon, 14 May 2018 10:53:32 -0700 Subject: [PATCH] Pass manual_wal_flush also to the first wal file Summary: Currently manual_wal_flush if set in the options will be used only for the wal files created during wal switch. The configuration thus does not affect the first wal file. The patch fixes that and also update the related unit tests. This PR is built on top of https://github.com/facebook/rocksdb/pull/3756 Closes https://github.com/facebook/rocksdb/pull/3824 Differential Revision: D7909153 Pulled By: maysamyabandeh fbshipit-source-id: 024ed99d2555db06bf096c902b998e432bb7b9ce --- db/db_impl.cc | 5 +++++ db/db_impl.h | 1 + db/db_impl_debug.cc | 6 ++++++ db/db_impl_open.cc | 6 +++++- db/db_write_test.cc | 35 +++++++++++++++++++++++++++++++++-- db/log_writer.cc | 2 ++ db/log_writer.h | 2 ++ options/options_helper.cc | 2 ++ util/file_reader_writer.h | 2 ++ 9 files changed, 58 insertions(+), 3 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 1e9728cc1..c833ecacd 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -718,6 +718,11 @@ Status DBImpl::FlushWAL(bool sync) { if (!s.ok()) { ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s", s.ToString().c_str()); + // In case there is a fs error we should set it globally to prevent the + // future writes + WriteStatusCheck(s); + // whether sync or not, we should abort the rest of function upon error + return s; } if (!sync) { ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false"); diff --git a/db/db_impl.h b/db/db_impl.h index 9620a53de..9fb4bfb88 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -221,6 +221,7 @@ class DBImpl : public DB { virtual Status Flush(const FlushOptions& options, ColumnFamilyHandle* column_family) override; virtual Status FlushWAL(bool sync) override; + bool TEST_WALBufferIsEmpty(); virtual Status SyncWAL() override; virtual SequenceNumber GetLatestSequenceNumber() const override; diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 793abd35f..8032f8333 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -25,6 +25,12 @@ void DBImpl::TEST_SwitchWAL() { SwitchWAL(&write_context); } +bool DBImpl::TEST_WALBufferIsEmpty() { + InstrumentedMutexLock wl(&log_write_mutex_); + log::Writer* cur_log_writer = logs_.back().writer; + return cur_log_writer->TEST_BufferIsEmpty(); +} + int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes( ColumnFamilyHandle* column_family) { ColumnFamilyData* cfd; diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 0cf2b0b8f..2ab347893 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -1090,7 +1090,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, new_log_number, new log::Writer( std::move(file_writer), new_log_number, - impl->immutable_db_options_.recycle_log_file_num > 0)); + impl->immutable_db_options_.recycle_log_file_num > 0, + impl->immutable_db_options_.manual_wal_flush)); } // set column family handles @@ -1217,6 +1218,9 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, if (s.ok()) { ROCKS_LOG_INFO(impl->immutable_db_options_.info_log, "DB pointer %p", impl); LogFlush(impl->immutable_db_options_.info_log); + assert(impl->TEST_WALBufferIsEmpty()); + // If the assert above fails then we need to FlushWAL before returning + // control back to the user. if (!persist_options_status.ok()) { s = Status::IOError( "DB::Open() failed --- Unable to persist Options file", diff --git a/db/db_write_test.cc b/db/db_write_test.cc index 917aef550..200397681 100644 --- a/db/db_write_test.cc +++ b/db/db_write_test.cc @@ -50,6 +50,7 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) { std::atomic leader_count{0}; std::vector threads; mock_env->SetFilesystemActive(false); + // Wait until all threads linked to write threads, to make sure // all threads join the same batch group. SyncPoint::GetInstance()->SetCallBack( @@ -68,7 +69,13 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) { threads.push_back(port::Thread( [&](int index) { // All threads should fail. - ASSERT_FALSE(Put("key" + ToString(index), "value").ok()); + auto res = Put("key" + ToString(index), "value"); + if (options.manual_wal_flush) { + ASSERT_TRUE(res.ok()); + // we should see fs error when we do the flush + res = dbfull()->FlushWAL(false); + } + ASSERT_FALSE(res.ok()); }, i)); } @@ -80,6 +87,22 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) { Close(); } +TEST_P(DBWriteTest, ManualWalFlushInEffect) { + Options options = GetOptions(); + Reopen(options); + // try the 1st WAL created during open + ASSERT_TRUE(Put("key" + ToString(0), "value").ok()); + ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty()); + ASSERT_TRUE(dbfull()->FlushWAL(false).ok()); + ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty()); + // try the 2nd wal created during SwitchWAL + dbfull()->TEST_SwitchWAL(); + ASSERT_TRUE(Put("key" + ToString(0), "value").ok()); + ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty()); + ASSERT_TRUE(dbfull()->FlushWAL(false).ok()); + ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty()); +} + TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) { std::unique_ptr mock_env( new FaultInjectionTestEnv(Env::Default())); @@ -90,7 +113,15 @@ TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) { // Forcibly fail WAL write for the first Put only. Subsequent Puts should // fail due to read-only mode mock_env->SetFilesystemActive(i != 0); - ASSERT_FALSE(Put("key" + ToString(i), "value").ok()); + auto res = Put("key" + ToString(i), "value"); + if (options.manual_wal_flush && i == 0) { + // even with manual_wal_flush the 2nd Put should return error because of + // the read-only mode + ASSERT_TRUE(res.ok()); + // we should see fs error when we do the flush + res = dbfull()->FlushWAL(false); + } + ASSERT_FALSE(res.ok()); } // Close before mock_env destruct. Close(); diff --git a/db/log_writer.cc b/db/log_writer.cc index 999f9c580..c31adbec5 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -92,6 +92,8 @@ Status Writer::AddRecord(const Slice& slice) { return s; } +bool Writer::TEST_BufferIsEmpty() { return dest_->TEST_BufferIsEmpty(); } + Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { assert(n <= 0xffff); // Must fit in two bytes diff --git a/db/log_writer.h b/db/log_writer.h index 143ad2674..abd7977b9 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -85,6 +85,8 @@ class Writer { Status WriteBuffer(); + bool TEST_BufferIsEmpty(); + private: unique_ptr dest_; size_t block_offset_; // Current offset in block diff --git a/options/options_helper.cc b/options/options_helper.cc index 41db55c36..6d412b5ee 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -124,6 +124,8 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, immutable_db_options.allow_ingest_behind; options.preserve_deletes = immutable_db_options.preserve_deletes; + options.two_write_queues = immutable_db_options.two_write_queues; + options.manual_wal_flush = immutable_db_options.manual_wal_flush; return options; } diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index 9db12ba06..6d62e54ca 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -189,6 +189,8 @@ class WritableFileWriter { bool use_direct_io() { return writable_file_->use_direct_io(); } + bool TEST_BufferIsEmpty() { return buf_.CurrentSize() == 0; } + private: // Used when os buffering is OFF and we are writing // DMA such as in Direct I/O mode