diff --git a/db/db_impl.cc b/db/db_impl.cc index 4eb7091cb..8180564c2 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1112,6 +1112,25 @@ Status DBImpl::SyncWAL() { return status; } +Status DBImpl::LockWAL() { + log_write_mutex_.Lock(); + auto cur_log_writer = logs_.back().writer; + auto status = cur_log_writer->WriteBuffer(); + if (!status.ok()) { + ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s", + status.ToString().c_str()); + // In case there is a fs error we should set it globally to prevent the + // future writes + WriteStatusCheck(status); + } + return status; +} + +Status DBImpl::UnlockWAL() { + log_write_mutex_.Unlock(); + return Status::OK(); +} + void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status) { mutex_.AssertHeld(); diff --git a/db/db_impl.h b/db/db_impl.h index 5af6e2bf2..e834e0fbe 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -234,8 +234,10 @@ class DBImpl : public DB { const FlushOptions& options, const std::vector& column_families) override; virtual Status FlushWAL(bool sync) override; - bool TEST_WALBufferIsEmpty(); + bool TEST_WALBufferIsEmpty(bool lock = true); virtual Status SyncWAL() override; + virtual Status LockWAL() override; + virtual Status UnlockWAL() override; virtual SequenceNumber GetLatestSequenceNumber() const override; virtual SequenceNumber GetLastPublishedSequence() const { diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 2f99e7d0e..982227149 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -26,10 +26,16 @@ void DBImpl::TEST_SwitchWAL() { SwitchWAL(&write_context); } -bool DBImpl::TEST_WALBufferIsEmpty() { - InstrumentedMutexLock wl(&log_write_mutex_); +bool DBImpl::TEST_WALBufferIsEmpty(bool lock) { + if (lock) { + log_write_mutex_.Lock(); + } log::Writer* cur_log_writer = logs_.back().writer; - return cur_log_writer->TEST_BufferIsEmpty(); + auto res = cur_log_writer->TEST_BufferIsEmpty(); + if (lock) { + log_write_mutex_.Unlock(); + } + return res; } int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes( diff --git a/db/db_write_test.cc b/db/db_write_test.cc index 3208f34b0..e6bab8751 100644 --- a/db/db_write_test.cc +++ b/db/db_write_test.cc @@ -166,6 +166,25 @@ TEST_P(DBWriteTest, IOErrorOnSwitchMemtable) { Close(); } +// Test that db->LockWAL() flushes the WAL after locking. +TEST_P(DBWriteTest, LockWalInEffect) { + Options options = GetOptions(); + Reopen(options); + // try the 1st WAL created during open + ASSERT_OK(Put("key" + ToString(0), "value")); + ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty()); + ASSERT_OK(dbfull()->LockWAL()); + ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false)); + ASSERT_OK(dbfull()->UnlockWAL()); + // try the 2nd wal created during SwitchWAL + dbfull()->TEST_SwitchWAL(); + ASSERT_OK(Put("key" + ToString(0), "value")); + ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty()); + ASSERT_OK(dbfull()->LockWAL()); + ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false)); + ASSERT_OK(dbfull()->UnlockWAL()); +} + INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest, testing::Values(DBTestBase::kDefault, DBTestBase::kConcurrentWALWrites, diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 7e2556f73..b40af20e2 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -986,6 +986,16 @@ class DB { // Currently only works if allow_mmap_writes = false in Options. virtual Status SyncWAL() = 0; + // Lock the WAL. Also flushes the WAL after locking. + virtual Status LockWAL() { + return Status::NotSupported("LockWAL not implemented"); + } + + // Unlock the WAL. + virtual Status UnlockWAL() { + return Status::NotSupported("UnlockWAL not implemented"); + } + // The sequence number of the most recent transaction. virtual SequenceNumber GetLatestSequenceNumber() const = 0; diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index aac0745fd..8fef9b3e8 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -281,6 +281,10 @@ class StackableDB : public DB { virtual Status FlushWAL(bool sync) override { return db_->FlushWAL(sync); } + virtual Status LockWAL() override { return db_->LockWAL(); } + + virtual Status UnlockWAL() override { return db_->UnlockWAL(); } + #ifndef ROCKSDB_LITE virtual Status DisableFileDeletions() override {