diff --git a/HISTORY.md b/HISTORY.md
index b6099eba2..11140ca4f 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -19,6 +19,7 @@
 * Added `TableProperties::slow_compression_estimated_data_size` and `TableProperties::fast_compression_estimated_data_size`. When `ColumnFamilyOptions::sample_for_compression > 0`, they estimate what `TableProperties::data_size` would have been if the "fast" or "slow" (see `ColumnFamilyOptions::sample_for_compression` API doc for definitions) compression had been used instead.
 * Update DB::StartIOTrace and remove Env object from the arguments as its redundant and DB already has Env object that is passed down to IOTracer::StartIOTrace
 * For new integrated BlobDB, add support for blob files for backup/restore like table files. Because of current limitations, blob files always use the kLegacyCrc32cAndFileSize naming scheme, and incremental backups must read and checksum all blob files in a DB, even for files that are already backed up.
+* Added `FlushReason::kWalFull`, which is reported when a memtable is flushed due to the WAL reaching its size limit; those flushes were previously reported as `FlushReason::kWriteBufferManager`. Also, changed the reason for flushes triggered by the write buffer manager to `FlushReason::kWriteBufferManager`; they were previously reported as `FlushReason::kWriteBufferFull`.
 
 ### New Features
 * Added the ability to open BackupEngine backups as read-only DBs, using BackupInfo::name_for_open and env_for_open provided by BackupEngine::GetBackupInfo() with include_file_details=true.
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 6082ec70b..57edeb60e 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -1605,7 +1605,7 @@ class DBImpl : public DB {
   Status SwitchWAL(WriteContext* write_context);
 
   // REQUIRES: mutex locked and in write thread.
-  Status HandleWriteBufferFull(WriteContext* write_context);
+  Status HandleWriteBufferManagerFlush(WriteContext* write_context);
 
   // REQUIRES: mutex locked
   Status PreprocessWrite(const WriteOptions& write_options, bool* need_log_sync,
diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc
index af785fa42..f9a6adc07 100644
--- a/db/db_impl/db_impl_write.cc
+++ b/db/db_impl/db_impl_write.cc
@@ -937,7 +937,7 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
     // be flushed. We may end up with flushing much more DBs than needed. It's
     // suboptimal but still correct.
     WaitForPendingWrites();
-    status = HandleWriteBufferFull(write_context);
+    status = HandleWriteBufferManagerFlush(write_context);
   }
 
   if (UNLIKELY(status.ok() && !trim_history_scheduler_.Empty())) {
@@ -1348,20 +1348,20 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
       if (!immutable_db_options_.atomic_flush) {
         FlushRequest flush_req;
         GenerateFlushRequest({cfd}, &flush_req);
-        SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
+        SchedulePendingFlush(flush_req, FlushReason::kWalFull);
       }
     }
     if (immutable_db_options_.atomic_flush) {
       FlushRequest flush_req;
       GenerateFlushRequest(cfds, &flush_req);
-      SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
+      SchedulePendingFlush(flush_req, FlushReason::kWalFull);
     }
     MaybeScheduleFlushOrCompaction();
   }
   return status;
 }
 
-Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
+Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) {
   mutex_.AssertHeld();
   assert(write_context != nullptr);
   Status status;
@@ -1373,7 +1373,7 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
   // suboptimal but still correct.
   ROCKS_LOG_INFO(
       immutable_db_options_.info_log,
-      "Flushing column family with oldest memtable entry. Write buffer is "
+      "Flushing column family with oldest memtable entry. Write buffers are "
      "using %" ROCKSDB_PRIszt " bytes out of a total of %" ROCKSDB_PRIszt ".",
       write_buffer_manager_->memory_usage(),
       write_buffer_manager_->buffer_size());
@@ -1434,13 +1434,13 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
       if (!immutable_db_options_.atomic_flush) {
         FlushRequest flush_req;
         GenerateFlushRequest({cfd}, &flush_req);
-        SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
+        SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
       }
     }
     if (immutable_db_options_.atomic_flush) {
       FlushRequest flush_req;
       GenerateFlushRequest(cfds, &flush_req);
-      SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
+      SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
     }
     MaybeScheduleFlushOrCompaction();
   }
diff --git a/db/db_test.cc b/db/db_test.cc
index 2fd04af5f..cf2590f8a 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -908,6 +908,9 @@ TEST_F(DBTest, FlushSchedule) {
       static_cast<int64_t>(options.write_buffer_size);
   options.max_write_buffer_number = 2;
   options.write_buffer_size = 120 * 1024;
+  auto flush_listener = std::make_shared<FlushCounterListener>();
+  flush_listener->expected_flush_reason = FlushReason::kWriteBufferFull;
+  options.listeners.push_back(flush_listener);
   CreateAndReopenWithCF({"pikachu"}, options);
   std::vector<port::Thread> threads;
diff --git a/db/db_test2.cc b/db/db_test2.cc
index f3a3a3edf..f22bf5c87 100644
--- a/db/db_test2.cc
+++ b/db/db_test2.cc
@@ -344,6 +344,10 @@ class DBTestSharedWriteBufferAcrossCFs
 TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
   Options options = CurrentOptions();
   options.arena_block_size = 4096;
+  auto flush_listener = std::make_shared<FlushCounterListener>();
+  options.listeners.push_back(flush_listener);
+  // Don't trip the listener at shutdown.
+  options.avoid_flush_during_shutdown = true;
   // Avoid undeterministic value by malloc_usable_size();
   // Force arena block size to 1
@@ -387,6 +391,7 @@
 
   // Create some data and flush "default" and "nikitich" so that they
   // are newer CFs created.
+  flush_listener->expected_flush_reason = FlushReason::kManualFlush;
   ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
   Flush(3);
   ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
@@ -397,6 +402,7 @@
   ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
             static_cast<uint64_t>(1));
 
+  flush_listener->expected_flush_reason = FlushReason::kWriteBufferManager;
   ASSERT_OK(Put(3, Key(1), DummyString(30000), wo));
   if (cost_cache_) {
     ASSERT_GE(cache->GetUsage(), 256 * 1024);
@@ -521,6 +527,10 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
   std::string dbname2 = test::PerThreadDBPath("db_shared_wb_db2");
   Options options = CurrentOptions();
   options.arena_block_size = 4096;
+  auto flush_listener = std::make_shared<FlushCounterListener>();
+  options.listeners.push_back(flush_listener);
+  // Don't trip the listener at shutdown.
+  options.avoid_flush_during_shutdown = true;
   // Avoid undeterministic value by malloc_usable_size();
   // Force arena block size to 1
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
@@ -558,6 +568,7 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
   };
 
   // Trigger a flush on cf2
+  flush_listener->expected_flush_reason = FlushReason::kWriteBufferManager;
   ASSERT_OK(Put(2, Key(1), DummyString(70000), wo));
   wait_flush();
   ASSERT_OK(Put(0, Key(1), DummyString(20000), wo));
diff --git a/db/db_test_util.h b/db/db_test_util.h
index 481343de4..3d098bb12 100644
--- a/db/db_test_util.h
+++ b/db/db_test_util.h
@@ -761,6 +761,17 @@ class OnFileDeletionListener : public EventListener {
   size_t matched_count_;
   std::string expected_file_name_;
 };
+
+class FlushCounterListener : public EventListener {
+ public:
+  std::atomic<int> count{0};
+  std::atomic<FlushReason> expected_flush_reason{FlushReason::kOthers};
+
+  void OnFlushBegin(DB* /*db*/, const FlushJobInfo& flush_job_info) override {
+    count++;
+    ASSERT_EQ(expected_flush_reason.load(), flush_job_info.flush_reason);
+  }
+};
 #endif
 
 // A test merge operator mimics put but also fails if one of merge operands is
diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc
index 69c42ff52..8aaa83cd1 100644
--- a/db/db_wal_test.cc
+++ b/db/db_wal_test.cc
@@ -1801,19 +1801,8 @@ TEST_P(DBWALTestWithParamsVaryingRecoveryMode,
 // avoid_flush_during_recovery=true.
 // Flush should trigger if max_total_wal_size is reached.
 TEST_F(DBWALTest, RestoreTotalLogSizeAfterRecoverWithoutFlush) {
-  class TestFlushListener : public EventListener {
-   public:
-    std::atomic<int> count{0};
-
-    TestFlushListener() = default;
-
-    void OnFlushBegin(DB* /*db*/, const FlushJobInfo& flush_job_info) override {
-      count++;
-      ASSERT_EQ(FlushReason::kWriteBufferManager, flush_job_info.flush_reason);
-    }
-  };
-  std::shared_ptr<TestFlushListener> test_listener =
-      std::make_shared<TestFlushListener>();
+  auto test_listener = std::make_shared<FlushCounterListener>();
+  test_listener->expected_flush_reason = FlushReason::kWalFull;
 
   constexpr size_t kKB = 1024;
   constexpr size_t kMB = 1024 * 1024;
diff --git a/db/flush_job.cc b/db/flush_job.cc
index fc8f8a9d3..0b108c4a8 100644
--- a/db/flush_job.cc
+++ b/db/flush_job.cc
@@ -75,6 +75,8 @@ const char* GetFlushReasonString (FlushReason flush_reason) {
       return "Manual Flush";
     case FlushReason::kErrorRecovery:
       return "Error Recovery";
+    case FlushReason::kWalFull:
+      return "WAL Full";
     default:
       return "Invalid";
   }
diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h
index ca1766195..af4423de6 100644
--- a/include/rocksdb/listener.h
+++ b/include/rocksdb/listener.h
@@ -118,6 +118,7 @@ enum class FlushReason : int {
   // When set the flush reason to kErrorRecoveryRetryFlush, SwitchMemtable
   // will not be called to avoid many small immutable memtables.
   kErrorRecoveryRetryFlush = 0xc,
+  kWalFull = 0xd,
 };
 
 // TODO: In the future, BackgroundErrorReason will only be used to indicate
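As a usage sketch outside the patch itself: applications can observe the new flush reason through the public EventListener/FlushJobInfo API, much like the FlushCounterListener test helper above. The listener class name, DB path, and size limits below are illustrative assumptions, not part of this change.

#include <iostream>
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/listener.h"
#include "rocksdb/options.h"

// Logs whether a flush was triggered by the WAL size limit or by the write
// buffer manager, using the reasons as reported after this change.
class FlushReasonLogger : public rocksdb::EventListener {
 public:
  void OnFlushBegin(rocksdb::DB* /*db*/,
                    const rocksdb::FlushJobInfo& info) override {
    if (info.flush_reason == rocksdb::FlushReason::kWalFull) {
      std::cout << "Flush of " << info.cf_name
                << " triggered by WAL size limit\n";
    } else if (info.flush_reason == rocksdb::FlushReason::kWriteBufferManager) {
      std::cout << "Flush of " << info.cf_name
                << " triggered by write buffer manager\n";
    }
  }
};

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.max_total_wal_size = 64 << 20;     // illustrative WAL size limit
  options.db_write_buffer_size = 128 << 20;  // illustrative shared write buffer
  options.listeners.push_back(std::make_shared<FlushReasonLogger>());

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/flush_reason_demo", &db);
  if (!s.ok()) {
    std::cerr << s.ToString() << std::endl;
    return 1;
  }
  // ... write workload; flushes that hit either limit are reported by the
  // listener with the corresponding FlushReason.
  delete db;
  return 0;
}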