Add temperature information to the event listener callbacks (#9591)

Summary:
RocksDB try to provide temperature information in the event
listener callbacks. The information is not guaranteed, as some operation
like backup won't have these information.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9591

Test Plan: Added unittest

Reviewed By: siying, pdillinger

Differential Revision: D34309339

Pulled By: jay-zhuang

fbshipit-source-id: 4aca4f270f99fa49186d85d300da42594663d6d7
main
Jay Zhuang 3 years ago committed by Facebook GitHub Bot
parent 54fb2a8975
commit 2fbc672732
  1. 1
      HISTORY.md
  2. 5
      db/compaction/compaction_job.cc
  3. 97
      db/db_test2.cc
  4. 2
      file/random_access_file_reader.h
  5. 17
      file/writable_file_writer.h
  6. 9
      include/rocksdb/listener.h

@ -73,6 +73,7 @@
* Remove deprecated remote compaction APIs `CompactionService::Start()` and `CompactionService::WaitForComplete()`. Please use `CompactionService::StartV2()`, `CompactionService::WaitForCompleteV2()` instead, which provides the same information plus extra data like priority, db_id, etc. * Remove deprecated remote compaction APIs `CompactionService::Start()` and `CompactionService::WaitForComplete()`. Please use `CompactionService::StartV2()`, `CompactionService::WaitForCompleteV2()` instead, which provides the same information plus extra data like priority, db_id, etc.
* `ColumnFamilyOptions::OldDefaults` and `DBOptions::OldDefaults` are marked deprecated, as they are no longer maintained. * `ColumnFamilyOptions::OldDefaults` and `DBOptions::OldDefaults` are marked deprecated, as they are no longer maintained.
* Add subcompaction callback APIs: `OnSubcompactionBegin()` and `OnSubcompactionCompleted()`. * Add subcompaction callback APIs: `OnSubcompactionBegin()` and `OnSubcompactionCompleted()`.
* Add file Temperature information to `FileOperationInfo` in event listener API.
### Behavior Changes ### Behavior Changes
* Disallow the combination of DBOptions.use_direct_io_for_flush_and_compaction == true and DBOptions.writable_file_max_buffer_size == 0. This combination can cause WritableFileWriter::Append() to loop forever, and it does not make much sense in direct IO. * Disallow the combination of DBOptions.use_direct_io_for_flush_and_compaction == true and DBOptions.writable_file_max_buffer_size == 0. This combination can cause WritableFileWriter::Append() to loop forever, and it does not make much sense in direct IO.

@ -2268,9 +2268,8 @@ Status CompactionJob::OpenCompactionOutputFile(
const auto& listeners = const auto& listeners =
sub_compact->compaction->immutable_options()->listeners; sub_compact->compaction->immutable_options()->listeners;
sub_compact->outfile.reset(new WritableFileWriter( sub_compact->outfile.reset(new WritableFileWriter(
std::move(writable_file), fname, file_options_, db_options_.clock, std::move(writable_file), fname, fo_copy, db_options_.clock, io_tracer_,
io_tracer_, db_options_.stats, listeners, db_options_.stats, listeners, db_options_.file_checksum_gen_factory.get(),
db_options_.file_checksum_gen_factory.get(),
tmp_set.Contains(FileType::kTableFile), false)); tmp_set.Contains(FileType::kTableFile), false));
TableBuilderOptions tboptions( TableBuilderOptions tboptions(

@ -6500,10 +6500,66 @@ TEST_P(RenameCurrentTest, Compaction) {
} }
TEST_F(DBTest2, BottommostTemperature) { TEST_F(DBTest2, BottommostTemperature) {
class TestListener : public EventListener {
public:
void OnFileReadFinish(const FileOperationInfo& info) override {
UpdateFileTemperature(info);
}
void OnFileWriteFinish(const FileOperationInfo& info) override {
UpdateFileTemperature(info);
}
void OnFileFlushFinish(const FileOperationInfo& info) override {
UpdateFileTemperature(info);
}
void OnFileSyncFinish(const FileOperationInfo& info) override {
UpdateFileTemperature(info);
}
void OnFileCloseFinish(const FileOperationInfo& info) override {
UpdateFileTemperature(info);
}
bool ShouldBeNotifiedOnFileIO() override { return true; }
std::unordered_map<uint64_t, Temperature> file_temperatures;
private:
void UpdateFileTemperature(const FileOperationInfo& info) {
auto filename = GetFileName(info.path);
uint64_t number;
FileType type;
ASSERT_TRUE(ParseFileName(filename, &number, &type));
if (type == kTableFile) {
MutexLock l(&mutex_);
auto ret = file_temperatures.insert({number, info.temperature});
if (!ret.second) {
// the same file temperature should always be the same for all events
ASSERT_TRUE(ret.first->second == info.temperature);
}
}
}
std::string GetFileName(const std::string& fname) {
auto filename = fname.substr(fname.find_last_of(kFilePathSeparator) + 1);
// workaround only for Windows that the file path could contain both
// Windows FilePathSeparator and '/'
filename = filename.substr(filename.find_last_of('/') + 1);
return filename;
}
port::Mutex mutex_;
};
auto* listener = new TestListener();
Options options = CurrentOptions(); Options options = CurrentOptions();
options.bottommost_temperature = Temperature::kWarm; options.bottommost_temperature = Temperature::kWarm;
options.level0_file_num_compaction_trigger = 2; options.level0_file_num_compaction_trigger = 2;
options.statistics = CreateDBStatistics(); options.statistics = CreateDBStatistics();
options.listeners.emplace_back(listener);
Reopen(options); Reopen(options);
auto size = GetSstSizeHelper(Temperature::kUnknown); auto size = GetSstSizeHelper(Temperature::kUnknown);
@ -6527,7 +6583,13 @@ TEST_F(DBTest2, BottommostTemperature) {
ColumnFamilyMetaData metadata; ColumnFamilyMetaData metadata;
db_->GetColumnFamilyMetaData(&metadata); db_->GetColumnFamilyMetaData(&metadata);
ASSERT_EQ(1, metadata.file_count); ASSERT_EQ(1, metadata.file_count);
ASSERT_EQ(Temperature::kWarm, metadata.levels[1].files[0].temperature); SstFileMetaData meta = metadata.levels[1].files[0];
ASSERT_EQ(Temperature::kWarm, meta.temperature);
uint64_t number;
FileType type;
ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);
size = GetSstSizeHelper(Temperature::kUnknown); size = GetSstSizeHelper(Temperature::kUnknown);
ASSERT_EQ(size, 0); ASSERT_EQ(size, 0);
size = GetSstSizeHelper(Temperature::kWarm); size = GetSstSizeHelper(Temperature::kWarm);
@ -6574,7 +6636,16 @@ TEST_F(DBTest2, BottommostTemperature) {
db_->GetColumnFamilyMetaData(&metadata); db_->GetColumnFamilyMetaData(&metadata);
ASSERT_EQ(2, metadata.file_count); ASSERT_EQ(2, metadata.file_count);
ASSERT_EQ(Temperature::kUnknown, metadata.levels[0].files[0].temperature); meta = metadata.levels[0].files[0];
ASSERT_EQ(Temperature::kUnknown, meta.temperature);
ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);
meta = metadata.levels[1].files[0];
ASSERT_EQ(Temperature::kWarm, meta.temperature);
ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);
size = GetSstSizeHelper(Temperature::kUnknown); size = GetSstSizeHelper(Temperature::kUnknown);
ASSERT_GT(size, 0); ASSERT_GT(size, 0);
size = GetSstSizeHelper(Temperature::kWarm); size = GetSstSizeHelper(Temperature::kWarm);
@ -6584,8 +6655,15 @@ TEST_F(DBTest2, BottommostTemperature) {
Reopen(options); Reopen(options);
db_->GetColumnFamilyMetaData(&metadata); db_->GetColumnFamilyMetaData(&metadata);
ASSERT_EQ(2, metadata.file_count); ASSERT_EQ(2, metadata.file_count);
ASSERT_EQ(Temperature::kUnknown, metadata.levels[0].files[0].temperature); meta = metadata.levels[0].files[0];
ASSERT_EQ(Temperature::kWarm, metadata.levels[1].files[0].temperature); ASSERT_EQ(Temperature::kUnknown, meta.temperature);
ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);
meta = metadata.levels[1].files[0];
ASSERT_EQ(Temperature::kWarm, meta.temperature);
ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);
size = GetSstSizeHelper(Temperature::kUnknown); size = GetSstSizeHelper(Temperature::kUnknown);
ASSERT_GT(size, 0); ASSERT_GT(size, 0);
size = GetSstSizeHelper(Temperature::kWarm); size = GetSstSizeHelper(Temperature::kWarm);
@ -6605,8 +6683,15 @@ TEST_F(DBTest2, BottommostTemperature) {
Reopen(options); Reopen(options);
db_->GetColumnFamilyMetaData(&metadata); db_->GetColumnFamilyMetaData(&metadata);
ASSERT_EQ(2, metadata.file_count); ASSERT_EQ(2, metadata.file_count);
ASSERT_EQ(Temperature::kUnknown, metadata.levels[0].files[0].temperature); meta = metadata.levels[0].files[0];
ASSERT_EQ(Temperature::kWarm, metadata.levels[1].files[0].temperature); ASSERT_EQ(Temperature::kUnknown, meta.temperature);
ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);
meta = metadata.levels[1].files[0];
ASSERT_EQ(Temperature::kWarm, meta.temperature);
ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);
} }
TEST_F(DBTest2, BottommostTemperatureUniversal) { TEST_F(DBTest2, BottommostTemperatureUniversal) {

@ -53,7 +53,7 @@ class RandomAccessFileReader {
const FileOperationInfo::FinishTimePoint& finish_ts, const FileOperationInfo::FinishTimePoint& finish_ts,
const Status& status) const { const Status& status) const {
FileOperationInfo info(FileOperationType::kRead, file_name_, start_ts, FileOperationInfo info(FileOperationType::kRead, file_name_, start_ts,
finish_ts, status); finish_ts, status, file_temperature_);
info.offset = offset; info.offset = offset;
info.length = length; info.length = length;

@ -42,7 +42,7 @@ class WritableFileWriter {
const FileOperationInfo::FinishTimePoint& finish_ts, const FileOperationInfo::FinishTimePoint& finish_ts,
const IOStatus& io_status) { const IOStatus& io_status) {
FileOperationInfo info(FileOperationType::kWrite, file_name_, start_ts, FileOperationInfo info(FileOperationType::kWrite, file_name_, start_ts,
finish_ts, io_status); finish_ts, io_status, temperature_);
info.offset = offset; info.offset = offset;
info.length = length; info.length = length;
@ -56,7 +56,7 @@ class WritableFileWriter {
const FileOperationInfo::FinishTimePoint& finish_ts, const FileOperationInfo::FinishTimePoint& finish_ts,
const IOStatus& io_status) { const IOStatus& io_status) {
FileOperationInfo info(FileOperationType::kFlush, file_name_, start_ts, FileOperationInfo info(FileOperationType::kFlush, file_name_, start_ts,
finish_ts, io_status); finish_ts, io_status, temperature_);
for (auto& listener : listeners_) { for (auto& listener : listeners_) {
listener->OnFileFlushFinish(info); listener->OnFileFlushFinish(info);
@ -68,7 +68,8 @@ class WritableFileWriter {
const FileOperationInfo::FinishTimePoint& finish_ts, const FileOperationInfo::FinishTimePoint& finish_ts,
const IOStatus& io_status, const IOStatus& io_status,
FileOperationType type = FileOperationType::kSync) { FileOperationType type = FileOperationType::kSync) {
FileOperationInfo info(type, file_name_, start_ts, finish_ts, io_status); FileOperationInfo info(type, file_name_, start_ts, finish_ts, io_status,
temperature_);
for (auto& listener : listeners_) { for (auto& listener : listeners_) {
listener->OnFileSyncFinish(info); listener->OnFileSyncFinish(info);
@ -81,7 +82,7 @@ class WritableFileWriter {
const FileOperationInfo::FinishTimePoint& finish_ts, const FileOperationInfo::FinishTimePoint& finish_ts,
const IOStatus& io_status) { const IOStatus& io_status) {
FileOperationInfo info(FileOperationType::kRangeSync, file_name_, start_ts, FileOperationInfo info(FileOperationType::kRangeSync, file_name_, start_ts,
finish_ts, io_status); finish_ts, io_status, temperature_);
info.offset = offset; info.offset = offset;
info.length = length; info.length = length;
@ -95,7 +96,7 @@ class WritableFileWriter {
const FileOperationInfo::FinishTimePoint& finish_ts, const FileOperationInfo::FinishTimePoint& finish_ts,
const IOStatus& io_status) { const IOStatus& io_status) {
FileOperationInfo info(FileOperationType::kTruncate, file_name_, start_ts, FileOperationInfo info(FileOperationType::kTruncate, file_name_, start_ts,
finish_ts, io_status); finish_ts, io_status, temperature_);
for (auto& listener : listeners_) { for (auto& listener : listeners_) {
listener->OnFileTruncateFinish(info); listener->OnFileTruncateFinish(info);
@ -107,7 +108,7 @@ class WritableFileWriter {
const FileOperationInfo::FinishTimePoint& finish_ts, const FileOperationInfo::FinishTimePoint& finish_ts,
const IOStatus& io_status) { const IOStatus& io_status) {
FileOperationInfo info(FileOperationType::kClose, file_name_, start_ts, FileOperationInfo info(FileOperationType::kClose, file_name_, start_ts,
finish_ts, io_status); finish_ts, io_status, temperature_);
for (auto& listener : listeners_) { for (auto& listener : listeners_) {
listener->OnFileCloseFinish(info); listener->OnFileCloseFinish(info);
@ -159,6 +160,7 @@ class WritableFileWriter {
bool perform_data_verification_; bool perform_data_verification_;
uint32_t buffered_data_crc32c_checksum_; uint32_t buffered_data_crc32c_checksum_;
bool buffered_data_with_checksum_; bool buffered_data_with_checksum_;
Temperature temperature_;
public: public:
WritableFileWriter( WritableFileWriter(
@ -189,7 +191,8 @@ class WritableFileWriter {
checksum_finalized_(false), checksum_finalized_(false),
perform_data_verification_(perform_data_verification), perform_data_verification_(perform_data_verification),
buffered_data_crc32c_checksum_(0), buffered_data_crc32c_checksum_(0),
buffered_data_with_checksum_(buffered_data_with_checksum) { buffered_data_with_checksum_(buffered_data_with_checksum),
temperature_(options.temperature) {
assert(!use_direct_io() || max_buffer_size_ > 0); assert(!use_direct_io() || max_buffer_size_ > 0);
TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0", TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0",
reinterpret_cast<void*>(max_buffer_size_)); reinterpret_cast<void*>(max_buffer_size_));

@ -12,6 +12,7 @@
#include <unordered_map> #include <unordered_map>
#include <vector> #include <vector>
#include "rocksdb/advanced_options.h"
#include "rocksdb/compaction_job_stats.h" #include "rocksdb/compaction_job_stats.h"
#include "rocksdb/compression_type.h" #include "rocksdb/compression_type.h"
#include "rocksdb/customizable.h" #include "rocksdb/customizable.h"
@ -255,16 +256,22 @@ struct FileOperationInfo {
FileOperationType type; FileOperationType type;
const std::string& path; const std::string& path;
// Rocksdb try to provide file temperature information, but it's not
// guaranteed.
Temperature temperature;
uint64_t offset; uint64_t offset;
size_t length; size_t length;
const Duration duration; const Duration duration;
const SystemTimePoint& start_ts; const SystemTimePoint& start_ts;
Status status; Status status;
FileOperationInfo(const FileOperationType _type, const std::string& _path, FileOperationInfo(const FileOperationType _type, const std::string& _path,
const StartTimePoint& _start_ts, const StartTimePoint& _start_ts,
const FinishTimePoint& _finish_ts, const Status& _status) const FinishTimePoint& _finish_ts, const Status& _status,
const Temperature _temperature = Temperature::kUnknown)
: type(_type), : type(_type),
path(_path), path(_path),
temperature(_temperature),
duration(std::chrono::duration_cast<std::chrono::nanoseconds>( duration(std::chrono::duration_cast<std::chrono::nanoseconds>(
_finish_ts - _start_ts.second)), _finish_ts - _start_ts.second)),
start_ts(_start_ts.first), start_ts(_start_ts.first),

Loading…
Cancel
Save