Fix flush reason attribution (#8150)

Summary:
The current flush reason attribution is misleading or incorrect (depending on what the original intention was):

- A flush triggered by the WAL reaching its maximum size is attributed to `kWriteBufferManager`.
- Flushes due to a full write buffer and flushes triggered by the write buffer manager are indistinguishable: both are attributed to `kWriteBufferFull`.

This change attributes the first case to a new flush reason, `kWalFull`, and splits the second case between `kWriteBufferManager` and `kWriteBufferFull`.
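
For application code that wants to observe these reasons, here is a minimal sketch of an `EventListener` that tallies flushes by the post-change reason values via `OnFlushBegin`. The class name `FlushReasonLogger` and its counters are illustrative only and are not part of this PR.

```cpp
#include <atomic>

#include "rocksdb/db.h"
#include "rocksdb/listener.h"

// Illustrative listener: counts flushes by the (post-change) flush reason.
class FlushReasonLogger : public ROCKSDB_NAMESPACE::EventListener {
 public:
  std::atomic<int> wal_full{0}, wbm{0}, wb_full{0}, other{0};

  void OnFlushBegin(ROCKSDB_NAMESPACE::DB* /*db*/,
                    const ROCKSDB_NAMESPACE::FlushJobInfo& info) override {
    using ROCKSDB_NAMESPACE::FlushReason;
    switch (info.flush_reason) {
      case FlushReason::kWalFull:             // WAL exceeded its size limit
        wal_full++;
        break;
      case FlushReason::kWriteBufferManager:  // write buffer manager triggered
        wbm++;
        break;
      case FlushReason::kWriteBufferFull:     // this CF's write buffers filled
        wb_full++;
        break;
      default:
        other++;
        break;
    }
  }
};

// Usage (illustrative):
//   options.listeners.push_back(std::make_shared<FlushReasonLogger>());
```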

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8150

Reviewed By: zhichao-cao

Differential Revision: D27569645

Pulled By: ot

fbshipit-source-id: 7e3c8ca186a6e71976e6b8e937297eebd4b769cc
Branch: main
Author: Giuseppe Ottaviano (committed by Facebook GitHub Bot)
Commit: 48cd7a3aae (parent 0be89e87fd)
Files changed (9):

- HISTORY.md (1)
- db/db_impl/db_impl.h (2)
- db/db_impl/db_impl_write.cc (14)
- db/db_test.cc (3)
- db/db_test2.cc (11)
- db/db_test_util.h (11)
- db/db_wal_test.cc (15)
- db/flush_job.cc (2)
- include/rocksdb/listener.h (1)

HISTORY.md
@@ -19,6 +19,7 @@
 * Added `TableProperties::slow_compression_estimated_data_size` and `TableProperties::fast_compression_estimated_data_size`. When `ColumnFamilyOptions::sample_for_compression > 0`, they estimate what `TableProperties::data_size` would have been if the "fast" or "slow" (see `ColumnFamilyOptions::sample_for_compression` API doc for definitions) compression had been used instead.
 * Update DB::StartIOTrace and remove Env object from the arguments as its redundant and DB already has Env object that is passed down to IOTracer::StartIOTrace
 * For new integrated BlobDB, add support for blob files for backup/restore like table files. Because of current limitations, blob files always use the kLegacyCrc32cAndFileSize naming scheme, and incremental backups must read and checksum all blob files in a DB, even for files that are already backed up.
+* Added `FlushReason::kWalFull`, which is reported when a memtable is flushed due to the WAL reaching its size limit; those flushes were previously reported as `FlushReason::kWriteBufferManager`. Also, changed the reason for flushes triggered by the write buffer manager to `FlushReason::kWriteBufferManager`; they were previously reported as `FlushReason::kWriteBufferFull`.
 ### New Features
 * Added the ability to open BackupEngine backups as read-only DBs, using BackupInfo::name_for_open and env_for_open provided by BackupEngine::GetBackupInfo() with include_file_details=true.

db/db_impl/db_impl.h
@@ -1605,7 +1605,7 @@ class DBImpl : public DB {
   Status SwitchWAL(WriteContext* write_context);

   // REQUIRES: mutex locked and in write thread.
-  Status HandleWriteBufferFull(WriteContext* write_context);
+  Status HandleWriteBufferManagerFlush(WriteContext* write_context);

   // REQUIRES: mutex locked
   Status PreprocessWrite(const WriteOptions& write_options, bool* need_log_sync,

db/db_impl/db_impl_write.cc
@@ -937,7 +937,7 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
     // be flushed. We may end up with flushing much more DBs than needed. It's
     // suboptimal but still correct.
     WaitForPendingWrites();
-    status = HandleWriteBufferFull(write_context);
+    status = HandleWriteBufferManagerFlush(write_context);
   }

   if (UNLIKELY(status.ok() && !trim_history_scheduler_.Empty())) {
@@ -1348,20 +1348,20 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
       if (!immutable_db_options_.atomic_flush) {
         FlushRequest flush_req;
         GenerateFlushRequest({cfd}, &flush_req);
-        SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
+        SchedulePendingFlush(flush_req, FlushReason::kWalFull);
       }
     }
     if (immutable_db_options_.atomic_flush) {
       FlushRequest flush_req;
       GenerateFlushRequest(cfds, &flush_req);
-      SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
+      SchedulePendingFlush(flush_req, FlushReason::kWalFull);
     }
     MaybeScheduleFlushOrCompaction();
   }
   return status;
 }

-Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
+Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) {
   mutex_.AssertHeld();
   assert(write_context != nullptr);
   Status status;
@@ -1373,7 +1373,7 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
   // suboptimal but still correct.
   ROCKS_LOG_INFO(
       immutable_db_options_.info_log,
-      "Flushing column family with oldest memtable entry. Write buffer is "
+      "Flushing column family with oldest memtable entry. Write buffers are "
       "using %" ROCKSDB_PRIszt " bytes out of a total of %" ROCKSDB_PRIszt ".",
       write_buffer_manager_->memory_usage(),
       write_buffer_manager_->buffer_size());
@@ -1434,13 +1434,13 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
       if (!immutable_db_options_.atomic_flush) {
         FlushRequest flush_req;
         GenerateFlushRequest({cfd}, &flush_req);
-        SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
+        SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
       }
     }
     if (immutable_db_options_.atomic_flush) {
       FlushRequest flush_req;
       GenerateFlushRequest(cfds, &flush_req);
-      SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
+      SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
     }
     MaybeScheduleFlushOrCompaction();
   }
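
For context on when `HandleWriteBufferManagerFlush` (and thus `FlushReason::kWriteBufferManager`) comes into play: it is reached when a shared `WriteBufferManager` reports, via `ShouldFlush()`, that total memtable memory should be freed, and the column family with the oldest memtable data is flushed. A hedged sketch of the kind of configuration that exercises this path follows; the sizes and path are illustrative, not taken from this PR.

```cpp
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/write_buffer_manager.h"

int main() {
  ROCKSDB_NAMESPACE::Options options;
  options.create_if_missing = true;
  // Illustrative size: cap total memtable memory for all CFs/DBs sharing this
  // manager at 64 MiB. Flushes it triggers are now reported as
  // FlushReason::kWriteBufferManager (previously kWriteBufferFull).
  options.write_buffer_manager =
      std::make_shared<ROCKSDB_NAMESPACE::WriteBufferManager>(64 << 20);

  ROCKSDB_NAMESPACE::DB* db = nullptr;
  auto s = ROCKSDB_NAMESPACE::DB::Open(options, "/tmp/wbm_demo", &db);
  if (s.ok()) {
    // Writes that push total memtable usage past the limit make the write
    // buffer manager pick the CF with the oldest memtable data to flush.
    delete db;
  }
  return 0;
}
```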

db/db_test.cc
@@ -908,6 +908,9 @@ TEST_F(DBTest, FlushSchedule) {
       static_cast<int64_t>(options.write_buffer_size);
   options.max_write_buffer_number = 2;
   options.write_buffer_size = 120 * 1024;
+  auto flush_listener = std::make_shared<FlushCounterListener>();
+  flush_listener->expected_flush_reason = FlushReason::kWriteBufferFull;
+  options.listeners.push_back(flush_listener);
   CreateAndReopenWithCF({"pikachu"}, options);
   std::vector<port::Thread> threads;
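
The test above expects `kWriteBufferFull` because its flushes are triggered by individual memtables reaching `write_buffer_size`, not by a write buffer manager. A minimal, hedged options sketch for that scenario (path and sizes are illustrative):

```cpp
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  ROCKSDB_NAMESPACE::Options options;
  options.create_if_missing = true;
  // Small per-memtable budget: each memtable is switched out at ~120 KiB and
  // flushed; those flushes carry FlushReason::kWriteBufferFull.
  options.write_buffer_size = 120 * 1024;
  options.max_write_buffer_number = 2;

  ROCKSDB_NAMESPACE::DB* db = nullptr;
  auto s = ROCKSDB_NAMESPACE::DB::Open(options, "/tmp/wbf_demo", &db);
  if (s.ok()) {
    delete db;
  }
  return 0;
}
```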

db/db_test2.cc
@@ -344,6 +344,10 @@ class DBTestSharedWriteBufferAcrossCFs
 TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
   Options options = CurrentOptions();
   options.arena_block_size = 4096;
+  auto flush_listener = std::make_shared<FlushCounterListener>();
+  options.listeners.push_back(flush_listener);
+  // Don't trip the listener at shutdown.
+  options.avoid_flush_during_shutdown = true;
   // Avoid undeterministic value by malloc_usable_size();
   // Force arena block size to 1
@@ -387,6 +391,7 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
   // Create some data and flush "default" and "nikitich" so that they
   // are newer CFs created.
+  flush_listener->expected_flush_reason = FlushReason::kManualFlush;
   ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
   Flush(3);
   ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
@@ -397,6 +402,7 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
   ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
             static_cast<uint64_t>(1));
+  flush_listener->expected_flush_reason = FlushReason::kWriteBufferManager;
   ASSERT_OK(Put(3, Key(1), DummyString(30000), wo));
   if (cost_cache_) {
     ASSERT_GE(cache->GetUsage(), 256 * 1024);
@@ -521,6 +527,10 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
   std::string dbname2 = test::PerThreadDBPath("db_shared_wb_db2");
   Options options = CurrentOptions();
   options.arena_block_size = 4096;
+  auto flush_listener = std::make_shared<FlushCounterListener>();
+  options.listeners.push_back(flush_listener);
+  // Don't trip the listener at shutdown.
+  options.avoid_flush_during_shutdown = true;
   // Avoid undeterministic value by malloc_usable_size();
   // Force arena block size to 1
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
@@ -558,6 +568,7 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
   };
   // Trigger a flush on cf2
+  flush_listener->expected_flush_reason = FlushReason::kWriteBufferManager;
   ASSERT_OK(Put(2, Key(1), DummyString(70000), wo));
   wait_flush();
   ASSERT_OK(Put(0, Key(1), DummyString(20000), wo));

db/db_test_util.h
@@ -761,6 +761,17 @@ class OnFileDeletionListener : public EventListener {
   size_t matched_count_;
   std::string expected_file_name_;
 };
+
+class FlushCounterListener : public EventListener {
+ public:
+  std::atomic<int> count{0};
+  std::atomic<FlushReason> expected_flush_reason{FlushReason::kOthers};
+
+  void OnFlushBegin(DB* /*db*/, const FlushJobInfo& flush_job_info) override {
+    count++;
+    ASSERT_EQ(expected_flush_reason.load(), flush_job_info.flush_reason);
+  }
+};
 #endif

 // A test merge operator mimics put but also fails if one of merge operands is

db/db_wal_test.cc
@@ -1801,19 +1801,8 @@ TEST_P(DBWALTestWithParamsVaryingRecoveryMode,
 // avoid_flush_during_recovery=true.
 // Flush should trigger if max_total_wal_size is reached.
 TEST_F(DBWALTest, RestoreTotalLogSizeAfterRecoverWithoutFlush) {
-  class TestFlushListener : public EventListener {
-   public:
-    std::atomic<int> count{0};
-
-    TestFlushListener() = default;
-
-    void OnFlushBegin(DB* /*db*/, const FlushJobInfo& flush_job_info) override {
-      count++;
-      ASSERT_EQ(FlushReason::kWriteBufferManager, flush_job_info.flush_reason);
-    }
-  };
-  std::shared_ptr<TestFlushListener> test_listener =
-      std::make_shared<TestFlushListener>();
+  auto test_listener = std::make_shared<FlushCounterListener>();
+  test_listener->expected_flush_reason = FlushReason::kWalFull;

   constexpr size_t kKB = 1024;
   constexpr size_t kMB = 1024 * 1024;
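
The test above exercises the new `FlushReason::kWalFull`: once total WAL size exceeds `max_total_wal_size`, `SwitchWAL` flushes the column families whose data still lives in the oldest WALs. A hedged sketch of options that make this path easy to hit; the sizes and path are illustrative, not taken from the test.

```cpp
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  ROCKSDB_NAMESPACE::Options options;
  options.create_if_missing = true;
  // Keep memtables large relative to the WAL cap so flushes are driven by WAL
  // size rather than by memtables filling up.
  options.write_buffer_size = 64 << 20;  // 64 MiB memtables
  options.max_total_wal_size = 4 << 20;  // flush once WALs exceed 4 MiB
  // Flushes scheduled for this reason are now reported as
  // FlushReason::kWalFull (previously kWriteBufferManager).

  ROCKSDB_NAMESPACE::DB* db = nullptr;
  auto s = ROCKSDB_NAMESPACE::DB::Open(options, "/tmp/wal_full_demo", &db);
  if (s.ok()) {
    delete db;
  }
  return 0;
}
```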

db/flush_job.cc
@@ -75,6 +75,8 @@ const char* GetFlushReasonString(FlushReason flush_reason) {
       return "Manual Flush";
     case FlushReason::kErrorRecovery:
       return "Error Recovery";
+    case FlushReason::kWalFull:
+      return "WAL Full";
     default:
       return "Invalid";
   }

include/rocksdb/listener.h
@@ -118,6 +118,7 @@ enum class FlushReason : int {
   // When set the flush reason to kErrorRecoveryRetryFlush, SwitchMemtable
   // will not be called to avoid many small immutable memtables.
   kErrorRecoveryRetryFlush = 0xc,
+  kWalFull = 0xd,
 };

 // TODO: In the future, BackgroundErrorReason will only be used to indicate
