Propagate SST and blob file numbers through the EventListener interface (#5962)

Summary:
This patch adds a number of new information elements to the FlushJobInfo and
CompactionJobInfo structures that are passed to EventListeners via the
OnFlush{Begin, Completed} and OnCompaction{Begin, Completed} callbacks.
Namely, for flushes, the file numbers of the new SST and the oldest blob file it
references are propagated. For compactions, the new pieces of information are
the file number, level, and the oldest blob file referenced by each compaction
input and output file.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5962

Test Plan:
Extended the EventListener unit tests with logic that checks that these information
elements are correctly propagated from the corresponding FileMetaData.

Differential Revision: D18095568

Pulled By: ltamasi

fbshipit-source-id: 6874359a6aadb53366b5fe87adcb2f9bd27a0a56
main
Levi Tamasi 5 years ago committed by Facebook Github Bot
parent dd19014a7a
commit f7e7b34ebe
  1. 1
      HISTORY.md
  2. 38
      db/db_impl/db_impl_compaction_flush.cc
  3. 8
      db/flush_job.cc
  4. 100
      db/listener_test.cc
  5. 37
      include/rocksdb/listener.h

@ -22,6 +22,7 @@
* When using BlobDB, a mapping is maintained and persisted in the MANIFEST between each SST file and the oldest non-TTL blob file it references.
* `db_bench` now supports and by default issues non-TTL Puts to BlobDB. TTL Puts can be enabled by specifying a non-zero value for the `blob_db_max_ttl_range` command line parameter explicitly.
* `sst_dump` now supports printing BlobDB blob indexes in a human-readable format. This can be enabled by specifying the `decode_blob_index` flag on the command line.
* A number of new information elements are now exposed through the EventListener interface. For flushes, the file numbers of the new SST file and the oldest blob file referenced by the SST are propagated. For compactions, the level, file number, and the oldest blob file referenced are passed to the client for each compaction input and output file.
### Public API Change
* Added max_write_buffer_size_to_maintain option to better control memory usage of immutable memtables.
* Added a lightweight API GetCurrentWalFile() to get last live WAL filename and size. Meant to be used as a helper for backup/restore tooling in a larger ecosystem such as MySQL with a MyRocks storage engine.

@ -575,8 +575,10 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
info.cf_name = cfd->GetName();
// TODO(yhchiang): make db_paths dynamic in case flush does not
// go to L0 in the future.
info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path,
file_meta->fd.GetNumber());
const uint64_t file_number = file_meta->fd.GetNumber();
info.file_path =
MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_number);
info.file_number = file_number;
info.thread_id = env_->GetThreadID();
info.job_id = job_id;
info.triggered_writes_slowdown = triggered_writes_slowdown;
@ -1118,9 +1120,13 @@ void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
info.compression = c->output_compression();
for (size_t i = 0; i < c->num_input_levels(); ++i) {
for (const auto fmd : *c->inputs(i)) {
const FileDescriptor& desc = fmd->fd;
const uint64_t file_number = desc.GetNumber();
auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
fmd->fd.GetNumber(), fmd->fd.GetPathId());
file_number, desc.GetPathId());
info.input_files.push_back(fn);
info.input_file_infos.push_back(CompactionFileInfo{
static_cast<int>(i), file_number, fmd->oldest_blob_file_number});
if (info.table_properties.count(fn) == 0) {
std::shared_ptr<const TableProperties> tp;
auto s = current->GetTableProperties(&tp, fmd, &fn);
@ -1131,9 +1137,13 @@ void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
}
}
for (const auto newf : c->edit()->GetNewFiles()) {
const FileMetaData& meta = newf.second;
const FileDescriptor& desc = meta.fd;
const uint64_t file_number = desc.GetNumber();
info.output_files.push_back(TableFileName(
c->immutable_cf_options()->cf_paths, newf.second.fd.GetNumber(),
newf.second.fd.GetPathId()));
c->immutable_cf_options()->cf_paths, file_number, desc.GetPathId()));
info.output_file_infos.push_back(CompactionFileInfo{
newf.first, file_number, meta.oldest_blob_file_number});
}
for (auto listener : immutable_db_options_.listeners) {
listener->OnCompactionBegin(this, info);
@ -2956,9 +2966,13 @@ void DBImpl::BuildCompactionJobInfo(
compaction_job_info->compression = c->output_compression();
for (size_t i = 0; i < c->num_input_levels(); ++i) {
for (const auto fmd : *c->inputs(i)) {
auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
fmd->fd.GetNumber(), fmd->fd.GetPathId());
const FileDescriptor& desc = fmd->fd;
const uint64_t file_number = desc.GetNumber();
auto fn = TableFileName(c->immutable_cf_options()->cf_paths, file_number,
desc.GetPathId());
compaction_job_info->input_files.push_back(fn);
compaction_job_info->input_file_infos.push_back(CompactionFileInfo{
static_cast<int>(i), file_number, fmd->oldest_blob_file_number});
if (compaction_job_info->table_properties.count(fn) == 0) {
std::shared_ptr<const TableProperties> tp;
auto s = current->GetTableProperties(&tp, fmd, &fn);
@ -2969,9 +2983,13 @@ void DBImpl::BuildCompactionJobInfo(
}
}
for (const auto& newf : c->edit()->GetNewFiles()) {
compaction_job_info->output_files.push_back(
TableFileName(c->immutable_cf_options()->cf_paths,
newf.second.fd.GetNumber(), newf.second.fd.GetPathId()));
const FileMetaData& meta = newf.second;
const FileDescriptor& desc = meta.fd;
const uint64_t file_number = desc.GetNumber();
compaction_job_info->output_files.push_back(TableFileName(
c->immutable_cf_options()->cf_paths, file_number, desc.GetPathId()));
compaction_job_info->output_file_infos.push_back(CompactionFileInfo{
newf.first, file_number, meta.oldest_blob_file_number});
}
}
#endif

@ -434,8 +434,12 @@ std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
std::unique_ptr<FlushJobInfo> info(new FlushJobInfo);
info->cf_id = cfd_->GetID();
info->cf_name = cfd_->GetName();
info->file_path = MakeTableFileName(cfd_->ioptions()->cf_paths[0].path,
meta_.fd.GetNumber());
const uint64_t file_number = meta_.fd.GetNumber();
info->file_path =
MakeTableFileName(cfd_->ioptions()->cf_paths[0].path, file_number);
info->file_number = file_number;
info->oldest_blob_file_number = meta_.oldest_blob_file_number;
info->thread_id = db_options_.env->GetThreadID();
info->job_id = job_context_->job_id;
info->smallest_seqno = meta_.fd.smallest_seqno;

@ -3,6 +3,7 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/blob_index.h"
#include "db/db_impl/db_impl.h"
#include "db/db_test_util.h"
#include "db/dbformat.h"
@ -42,6 +43,14 @@ class EventListenerTest : public DBTestBase {
public:
EventListenerTest() : DBTestBase("/listener_test") {}
static std::string BlobStr(uint64_t blob_file_number, uint64_t offset,
uint64_t size) {
std::string blob_index;
BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size,
kNoCompression);
return blob_index;
}
const size_t k110KB = 110 << 10;
};
@ -79,11 +88,47 @@ class TestPropertiesCollectorFactory : public TablePropertiesCollectorFactory {
class TestCompactionListener : public EventListener {
public:
explicit TestCompactionListener(EventListenerTest* test) : test_(test) {}
void OnCompactionCompleted(DB *db, const CompactionJobInfo& ci) override {
std::lock_guard<std::mutex> lock(mutex_);
compacted_dbs_.push_back(db);
ASSERT_GT(ci.input_files.size(), 0U);
ASSERT_EQ(ci.input_files.size(), ci.input_file_infos.size());
for (size_t i = 0; i < ci.input_file_infos.size(); ++i) {
ASSERT_EQ(ci.input_file_infos[i].level, ci.base_input_level);
ASSERT_EQ(ci.input_file_infos[i].file_number,
TableFileNameToNumber(ci.input_files[i]));
}
ASSERT_GT(ci.output_files.size(), 0U);
ASSERT_EQ(ci.output_files.size(), ci.output_file_infos.size());
ASSERT_TRUE(test_);
ASSERT_EQ(test_->db_, db);
std::vector<std::vector<FileMetaData>> files_by_level;
test_->dbfull()->TEST_GetFilesMetaData(test_->handles_[ci.cf_id],
&files_by_level);
ASSERT_GT(files_by_level.size(), ci.output_level);
for (size_t i = 0; i < ci.output_file_infos.size(); ++i) {
ASSERT_EQ(ci.output_file_infos[i].level, ci.output_level);
ASSERT_EQ(ci.output_file_infos[i].file_number,
TableFileNameToNumber(ci.output_files[i]));
auto it = std::find_if(
files_by_level[ci.output_level].begin(),
files_by_level[ci.output_level].end(), [&](const FileMetaData& meta) {
return meta.fd.GetNumber() == ci.output_file_infos[i].file_number;
});
ASSERT_NE(it, files_by_level[ci.output_level].end());
ASSERT_EQ(ci.output_file_infos[i].oldest_blob_file_number,
it->oldest_blob_file_number);
}
ASSERT_EQ(db->GetEnv()->GetThreadID(), ci.thread_id);
ASSERT_GT(ci.thread_id, 0U);
@ -98,6 +143,7 @@ class TestCompactionListener : public EventListener {
}
}
EventListenerTest* test_;
std::vector<DB*> compacted_dbs_;
std::mutex mutex_;
};
@ -125,13 +171,19 @@ TEST_F(EventListenerTest, OnSingleDBCompactionTest) {
options.table_properties_collector_factories.push_back(
std::make_shared<TestPropertiesCollectorFactory>());
TestCompactionListener* listener = new TestCompactionListener();
TestCompactionListener* listener = new TestCompactionListener(this);
options.listeners.emplace_back(listener);
std::vector<std::string> cf_names = {
"pikachu", "ilya", "muromec", "dobrynia",
"nikitich", "alyosha", "popovich"};
CreateAndReopenWithCF(cf_names, options);
ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
WriteBatch batch;
ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 1, "ditto",
BlobStr(123, 0, 1 << 10)));
ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
@ -140,11 +192,9 @@ TEST_F(EventListenerTest, OnSingleDBCompactionTest) {
ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
for (int i = 1; i < 8; ++i) {
ASSERT_OK(Flush(i));
const Slice kRangeStart = "a";
const Slice kRangeEnd = "z";
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[i],
&kRangeStart, &kRangeEnd));
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[i],
nullptr, nullptr));
dbfull()->TEST_WaitForCompact();
}
@ -157,8 +207,8 @@ TEST_F(EventListenerTest, OnSingleDBCompactionTest) {
// This simple Listener can only handle one flush at a time.
class TestFlushListener : public EventListener {
public:
explicit TestFlushListener(Env* env)
: slowdown_count(0), stop_count(0), db_closed(), env_(env) {
TestFlushListener(Env* env, EventListenerTest* test)
: slowdown_count(0), stop_count(0), db_closed(), env_(env), test_(test) {
db_closed = false;
}
void OnTableFileCreated(
@ -210,6 +260,27 @@ class TestFlushListener : public EventListener {
ASSERT_EQ(prev_fc_info_.cf_name, info.cf_name);
ASSERT_EQ(prev_fc_info_.job_id, info.job_id);
ASSERT_EQ(prev_fc_info_.file_path, info.file_path);
ASSERT_EQ(TableFileNameToNumber(info.file_path), info.file_number);
// Note: the following chunk relies on the notification pertaining to the
// database pointed to by DBTestBase::db_, and is thus bypassed when
// that assumption does not hold (see the test case MultiDBMultiListeners
// below).
ASSERT_TRUE(test_);
if (db == test_->db_) {
std::vector<std::vector<FileMetaData>> files_by_level;
test_->dbfull()->TEST_GetFilesMetaData(test_->handles_[info.cf_id],
&files_by_level);
ASSERT_FALSE(files_by_level.empty());
auto it = std::find_if(files_by_level[0].begin(), files_by_level[0].end(),
[&](const FileMetaData& meta) {
return meta.fd.GetNumber() == info.file_number;
});
ASSERT_NE(it, files_by_level[0].end());
ASSERT_EQ(info.oldest_blob_file_number, it->oldest_blob_file_number);
}
ASSERT_EQ(db->GetEnv()->GetThreadID(), info.thread_id);
ASSERT_GT(info.thread_id, 0U);
ASSERT_EQ(info.table_properties.user_collected_properties.find("0")->second,
@ -226,6 +297,7 @@ class TestFlushListener : public EventListener {
protected:
Env* env_;
EventListenerTest* test_;
};
TEST_F(EventListenerTest, OnSingleDBFlushTest) {
@ -235,7 +307,7 @@ TEST_F(EventListenerTest, OnSingleDBFlushTest) {
#ifdef ROCKSDB_USING_THREAD_STATUS
options.enable_thread_tracking = true;
#endif // ROCKSDB_USING_THREAD_STATUS
TestFlushListener* listener = new TestFlushListener(options.env);
TestFlushListener* listener = new TestFlushListener(options.env, this);
options.listeners.emplace_back(listener);
std::vector<std::string> cf_names = {
"pikachu", "ilya", "muromec", "dobrynia",
@ -245,6 +317,12 @@ TEST_F(EventListenerTest, OnSingleDBFlushTest) {
CreateAndReopenWithCF(cf_names, options);
ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
WriteBatch batch;
ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 1, "ditto",
BlobStr(456, 0, 1 << 10)));
ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
@ -272,7 +350,7 @@ TEST_F(EventListenerTest, MultiCF) {
#ifdef ROCKSDB_USING_THREAD_STATUS
options.enable_thread_tracking = true;
#endif // ROCKSDB_USING_THREAD_STATUS
TestFlushListener* listener = new TestFlushListener(options.env);
TestFlushListener* listener = new TestFlushListener(options.env, this);
options.listeners.emplace_back(listener);
options.table_properties_collector_factories.push_back(
std::make_shared<TestPropertiesCollectorFactory>());
@ -313,7 +391,7 @@ TEST_F(EventListenerTest, MultiDBMultiListeners) {
const int kNumDBs = 5;
const int kNumListeners = 10;
for (int i = 0; i < kNumListeners; ++i) {
listeners.emplace_back(new TestFlushListener(options.env));
listeners.emplace_back(new TestFlushListener(options.env, this));
}
std::vector<std::string> cf_names = {
@ -390,7 +468,7 @@ TEST_F(EventListenerTest, DisableBGCompaction) {
#ifdef ROCKSDB_USING_THREAD_STATUS
options.enable_thread_tracking = true;
#endif // ROCKSDB_USING_THREAD_STATUS
TestFlushListener* listener = new TestFlushListener(options.env);
TestFlushListener* listener = new TestFlushListener(options.env, this);
const int kCompactionTrigger = 1;
const int kSlowdownTrigger = 5;
const int kStopTrigger = 100;

@ -170,6 +170,10 @@ struct FlushJobInfo {
std::string cf_name;
// the path to the newly created file
std::string file_path;
// the file number of the newly created file
uint64_t file_number;
// the oldest blob file referenced by the newly created file
uint64_t oldest_blob_file_number;
// the id of the thread that completed this flush job.
uint64_t thread_id;
// the job id, which is unique in the same thread.
@ -194,11 +198,18 @@ struct FlushJobInfo {
FlushReason flush_reason;
};
struct CompactionJobInfo {
CompactionJobInfo() = default;
explicit CompactionJobInfo(const CompactionJobStats& _stats)
: stats(_stats) {}
struct CompactionFileInfo {
// The level of the file.
int level;
// The file number of the file.
uint64_t file_number;
// The file number of the oldest blob file this SST file references.
uint64_t oldest_blob_file_number;
};
struct CompactionJobInfo {
// the id of the column family where the compaction happened.
uint32_t cf_id;
// the name of the column family where the compaction happened.
@ -213,11 +224,25 @@ struct CompactionJobInfo {
int base_input_level;
// the output level of the compaction.
int output_level;
// the names of the compaction input files.
// The following variables contain information about compaction inputs
// and outputs. A file may appear in both the input and output lists
// if it was simply moved to a different level. The order of elements
// is the same across input_files and input_file_infos; similarly, it is
// the same across output_files and output_file_infos.
// The names of the compaction input files.
std::vector<std::string> input_files;
// the names of the compaction output files.
// Additional information about the compaction input files.
std::vector<CompactionFileInfo> input_file_infos;
// The names of the compaction output files.
std::vector<std::string> output_files;
// Additional information about the compaction output files.
std::vector<CompactionFileInfo> output_file_infos;
// Table properties for input and output tables.
// The map is keyed by values from input_files and output_files.
TablePropertiesCollection table_properties;

Loading…
Cancel
Save