diff --git a/db/compaction.h b/db/compaction.h
index 36c62ff26..0bf63e6af 100644
--- a/db/compaction.h
+++ b/db/compaction.h
@@ -210,6 +210,14 @@ class Compaction {
       int output_level, VersionStorageInfo* vstorage,
       const std::vector<CompactionInputFiles>& inputs);
 
+  TablePropertiesCollection GetOutputTableProperties() const {
+    return output_table_properties_;
+  }
+
+  void SetOutputTableProperties(TablePropertiesCollection tp) {
+    output_table_properties_ = std::move(tp);
+  }
+
  private:
   // mark (or clear) all files that are being compacted
   void MarkFilesBeingCompacted(bool mark_as_compacted);
@@ -273,6 +281,9 @@ class Compaction {
 
   // Does input compression match the output compression?
   bool InputCompressionMatchesOutput() const;
+
+  // table properties of output files
+  TablePropertiesCollection output_table_properties_;
 };
 
 // Utility function
diff --git a/db/compaction_job.cc b/db/compaction_job.cc
index 9ac4b16ec..fd8acaafd 100644
--- a/db/compaction_job.cc
+++ b/db/compaction_job.cc
@@ -77,6 +77,7 @@ struct CompactionJob::SubcompactionState {
   struct Output {
     FileMetaData meta;
     bool finished;
+    std::shared_ptr<const TableProperties> table_properties;
   };
 
   // State kept for output being generated
@@ -487,6 +488,16 @@ Status CompactionJob::Run() {
     }
   }
 
+  TablePropertiesCollection tp;
+  for (const auto& state : compact_->sub_compact_states) {
+    for (const auto& output : state.outputs) {
+      auto fn = TableFileName(db_options_.db_paths, output.meta.fd.GetNumber(),
+                              output.meta.fd.GetPathId());
+      tp[fn] = output.table_properties;
+    }
+  }
+  compact_->compaction->SetOutputTableProperties(std::move(tp));
+
   // Finish up all book-keeping to unify the subcompaction results
   AggregateStatistics();
   UpdateCompactionStats();
@@ -814,7 +825,10 @@ Status CompactionJob::FinishCompactionOutputFile(
   delete iter;
 
   if (s.ok()) {
-    TableFileCreationInfo info(sub_compact->builder->GetTableProperties());
+    auto tp = sub_compact->builder->GetTableProperties();
+    sub_compact->current_output()->table_properties =
+        std::make_shared<TableProperties>(tp);
+    TableFileCreationInfo info(std::move(tp));
     info.db_name = dbname_;
     info.cf_name = cfd->GetName();
     info.file_path =
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 64fee85b0..c8f6fdd52 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -1412,15 +1412,16 @@ Status DBImpl::FlushMemTableToOutputFile(
   if (s.ok()) {
     // may temporarily unlock and lock the mutex.
     NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options,
-                           job_context->job_id);
+                           job_context->job_id, flush_job.GetTableProperties());
   }
 #endif  // ROCKSDB_LITE
   return s;
 }
 
-void DBImpl::NotifyOnFlushCompleted(
-    ColumnFamilyData* cfd, FileMetaData* file_meta,
-    const MutableCFOptions& mutable_cf_options, int job_id) {
+void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
+                                    FileMetaData* file_meta,
+                                    const MutableCFOptions& mutable_cf_options,
+                                    int job_id, TableProperties prop) {
 #ifndef ROCKSDB_LITE
   if (db_options_.listeners.size() == 0U) {
     return;
@@ -1450,6 +1451,7 @@ void DBImpl::NotifyOnFlushCompleted(
     info.triggered_writes_stop = triggered_writes_stop;
     info.smallest_seqno = file_meta->smallest_seqno;
     info.largest_seqno = file_meta->largest_seqno;
+    info.table_properties = prop;
     for (auto listener : db_options_.listeners) {
       listener->OnFlushCompleted(this, info);
     }
@@ -1795,12 +1797,20 @@ void DBImpl::NotifyOnCompactionCompleted(
   info.base_input_level = c->start_level();
   info.output_level = c->output_level();
   info.stats = compaction_job_stats;
+  info.table_properties = c->GetOutputTableProperties();
   for (size_t i = 0; i < c->num_input_levels(); ++i) {
     for (const auto fmd : *c->inputs(i)) {
-      info.input_files.push_back(
-          TableFileName(db_options_.db_paths,
-                        fmd->fd.GetNumber(),
-                        fmd->fd.GetPathId()));
+      auto fn = TableFileName(db_options_.db_paths, fmd->fd.GetNumber(),
+                              fmd->fd.GetPathId());
+      info.input_files.push_back(fn);
+      if (info.table_properties.count(fn) == 0) {
+        std::shared_ptr<const TableProperties> tp;
+        std::string fname;
+        auto s = cfd->current()->GetTableProperties(&tp, fmd, &fname);
+        if (s.ok()) {
+          info.table_properties[fn] = tp;
+        }
+      }
     }
   }
   for (const auto newf : c->edit()->GetNewFiles()) {
diff --git a/db/db_impl.h b/db/db_impl.h
index d7cc9db95..35558773e 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -375,7 +375,7 @@ class DBImpl : public DB {
 
   void NotifyOnFlushCompleted(ColumnFamilyData* cfd, FileMetaData* file_meta,
                               const MutableCFOptions& mutable_cf_options,
-                              int job_id);
+                              int job_id, TableProperties prop);
 
   void NotifyOnCompactionCompleted(ColumnFamilyData* cfd,
                                    Compaction *c, const Status &st,
diff --git a/db/event_helpers.cc b/db/event_helpers.cc
index 9035c0c4b..7191e4cc6 100644
--- a/db/event_helpers.cc
+++ b/db/event_helpers.cc
@@ -53,7 +53,7 @@ void EventHelpers::LogAndNotifyTableFileCreation(
             info.table_properties.filter_policy_name;
 
     // user collected properties
-    for (const auto& prop : info.table_properties.user_collected_properties) {
+    for (const auto& prop : info.table_properties.readable_properties) {
       jwriter << prop.first << prop.second;
     }
     jwriter.EndObject();
diff --git a/db/flush_job.cc b/db/flush_job.cc
index 816a141c2..f2d142298 100644
--- a/db/flush_job.cc
+++ b/db/flush_job.cc
@@ -231,14 +231,15 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
     TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
                              &output_compression_);
-    s = BuildTable(
-        dbname_, db_options_.env, *cfd_->ioptions(), env_options_,
-        cfd_->table_cache(), iter.get(), meta, cfd_->internal_comparator(),
-        cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
-        existing_snapshots_, output_compression_,
-        cfd_->ioptions()->compression_opts,
-        mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
-        Env::IO_HIGH, &info.table_properties);
+    s = BuildTable(dbname_, db_options_.env, *cfd_->ioptions(), env_options_,
+                   cfd_->table_cache(), iter.get(), meta,
+                   cfd_->internal_comparator(),
+                   cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
+                   existing_snapshots_, output_compression_,
+                   cfd_->ioptions()->compression_opts,
+                   mutable_cf_options_.paranoid_file_checks,
+                   cfd_->internal_stats(), Env::IO_HIGH, &table_properties_);
+    info.table_properties = table_properties_;
     LogFlush(db_options_.info_log);
   }
   Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
diff --git a/db/flush_job.h b/db/flush_job.h
index 14555ef56..6d9f63ea1 100644
--- a/db/flush_job.h
+++ b/db/flush_job.h
@@ -66,6 +66,7 @@ class FlushJob {
   ~FlushJob();
 
   Status Run(FileMetaData* file_meta = nullptr);
+  TableProperties GetTableProperties() const { return table_properties_; }
 
  private:
   void ReportStartedFlush();
@@ -89,6 +90,7 @@ class FlushJob {
   CompressionType output_compression_;
   Statistics* stats_;
   EventLogger* event_logger_;
+  TableProperties table_properties_;
 };
 
 }  // namespace rocksdb
diff --git a/db/listener_test.cc b/db/listener_test.cc
index ce683a5b3..25ca218e5 100644
--- a/db/listener_test.cc
+++ b/db/listener_test.cc
@@ -152,6 +152,39 @@ class EventListenerTest : public testing::Test {
   std::vector<ColumnFamilyHandle*> handles_;
 };
 
+struct TestPropertiesCollector : public rocksdb::TablePropertiesCollector {
+  virtual rocksdb::Status AddUserKey(const rocksdb::Slice& key,
+                                     const rocksdb::Slice& value,
+                                     rocksdb::EntryType type,
+                                     rocksdb::SequenceNumber seq,
+                                     uint64_t file_size) {
+    return Status::OK();
+  }
+  virtual rocksdb::Status Finish(rocksdb::UserCollectedProperties* properties) {
+    properties->insert({"0", "1"});
+    return Status::OK();
+  }
+
+  virtual const char* Name() const override {
+    return "TestTablePropertiesCollector";
+  }
+
+  rocksdb::UserCollectedProperties GetReadableProperties() const override {
+    rocksdb::UserCollectedProperties ret;
+    ret["2"] = "3";
+    return ret;
+  }
+};
+
+class TestPropertiesCollectorFactory : public TablePropertiesCollectorFactory {
+ public:
+  virtual TablePropertiesCollector* CreateTablePropertiesCollector(
+      TablePropertiesCollectorFactory::Context context) override {
+    return new TestPropertiesCollector;
+  }
+  const char* Name() const override { return "TestTablePropertiesCollector"; }
+};
+
 class TestCompactionListener : public EventListener {
  public:
   void OnCompactionCompleted(DB *db, const CompactionJobInfo& ci) override {
@@ -161,6 +194,16 @@ class TestCompactionListener : public EventListener {
     ASSERT_GT(ci.output_files.size(), 0U);
     ASSERT_EQ(db->GetEnv()->GetThreadID(), ci.thread_id);
     ASSERT_GT(ci.thread_id, 0U);
+
+    for (auto fl : {ci.input_files, ci.output_files}) {
+      for (auto fn : fl) {
+        auto it = ci.table_properties.find(fn);
+        ASSERT_NE(it, ci.table_properties.end());
+        auto tp = it->second;
+        ASSERT_TRUE(tp != nullptr);
+        ASSERT_EQ(tp->user_collected_properties.find("0")->second, "1");
+      }
+    }
   }
 
   std::vector<DB*> compacted_dbs_;
@@ -186,6 +229,8 @@ TEST_F(EventListenerTest, OnSingleDBCompactionTest) {
   options.enable_thread_tracking = true;
 #endif  // ROCKSDB_USING_THREAD_STATUS
   options.level0_file_num_compaction_trigger = kNumL0Files;
+  options.table_properties_collector_factories.push_back(
+      std::make_shared<TestPropertiesCollectorFactory>());
 
   TestCompactionListener* listener = new TestCompactionListener();
   options.listeners.emplace_back(listener);
@@ -274,6 +319,8 @@ class TestFlushListener : public EventListener {
     ASSERT_EQ(prev_fc_info_.file_path, info.file_path);
     ASSERT_EQ(db->GetEnv()->GetThreadID(), info.thread_id);
     ASSERT_GT(info.thread_id, 0U);
+    ASSERT_EQ(info.table_properties.user_collected_properties.find("0")->second,
+              "1");
   }
 
   std::vector<std::string> flushed_column_family_names_;
@@ -299,6 +346,8 @@ TEST_F(EventListenerTest, OnSingleDBFlushTest) {
   std::vector<std::string> cf_names = {
       "pikachu", "ilya", "muromec", "dobrynia",
       "nikitich", "alyosha", "popovich"};
+  options.table_properties_collector_factories.push_back(
+      std::make_shared<TestPropertiesCollectorFactory>());
   CreateAndReopenWithCF(cf_names, &options);
 
   ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
@@ -330,6 +379,8 @@ TEST_F(EventListenerTest, MultiCF) {
 #endif  // ROCKSDB_USING_THREAD_STATUS
   TestFlushListener* listener = new TestFlushListener(options.env);
   options.listeners.emplace_back(listener);
+  options.table_properties_collector_factories.push_back(
+      std::make_shared<TestPropertiesCollectorFactory>());
   std::vector<std::string> cf_names = {
       "pikachu", "ilya", "muromec", "dobrynia",
       "nikitich", "alyosha", "popovich"};
@@ -360,6 +411,8 @@ TEST_F(EventListenerTest, MultiDBMultiListeners) {
 #if ROCKSDB_USING_THREAD_STATUS
   options.enable_thread_tracking = true;
 #endif  // ROCKSDB_USING_THREAD_STATUS
+  options.table_properties_collector_factories.push_back(
+      std::make_shared<TestPropertiesCollectorFactory>());
   std::vector<TestFlushListener*> listeners;
   const int kNumDBs = 5;
   const int kNumListeners = 10;
@@ -454,6 +507,8 @@ TEST_F(EventListenerTest, DisableBGCompaction) {
   options.compaction_style = kCompactionStyleNone;
   options.compression = kNoCompression;
   options.write_buffer_size = 100000;  // Small write buffer
+  options.table_properties_collector_factories.push_back(
+      std::make_shared<TestPropertiesCollectorFactory>());
 
   CreateAndReopenWithCF({"pikachu"}, &options);
   ColumnFamilyMetaData cf_meta;
diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h
index f693d5c9b..ba98c894d 100644
--- a/include/rocksdb/listener.h
+++ b/include/rocksdb/listener.h
@@ -4,7 +4,9 @@
 
 #pragma once
 
+#include <memory>
 #include <string>
+#include <unordered_map>
 #include "rocksdb/compaction_job_stats.h"
 #include "rocksdb/status.h"
@@ -12,6 +14,9 @@
 
 namespace rocksdb {
 
+typedef std::unordered_map<std::string, std::shared_ptr<const TableProperties>>
+    TablePropertiesCollection;
+
 class DB;
 class Status;
 struct CompactionJobStats;
@@ -72,6 +77,8 @@ struct FlushJobInfo {
   SequenceNumber smallest_seqno;
   // The largest sequence number in the newly created file
   SequenceNumber largest_seqno;
+  // Table properties of the table being flushed
+  TableProperties table_properties;
 };
 
 struct CompactionJobInfo {
@@ -93,8 +100,13 @@ struct CompactionJobInfo {
   int output_level;
   // the names of the compaction input files.
   std::vector<std::string> input_files;
+
   // the names of the compaction output files.
   std::vector<std::string> output_files;
+  // Table properties for input and output tables.
+  // The map is keyed by values from input_files and output_files.
+  TablePropertiesCollection table_properties;
+
   // If non-null, this variable stores detailed information
   // about this compaction.
   CompactionJobStats stats;
diff --git a/include/rocksdb/table_properties.h b/include/rocksdb/table_properties.h
index 25173a942..45dde779a 100644
--- a/include/rocksdb/table_properties.h
+++ b/include/rocksdb/table_properties.h
@@ -56,6 +56,7 @@ struct TableProperties {
 
   // user collected properties
   UserCollectedProperties user_collected_properties;
+  UserCollectedProperties readable_properties;
 
   // convert this object to a human readable form
   //   @prop_delim: delimiter for each property.
diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc
index db6e3a453..402528a52 100644
--- a/table/block_based_table_builder.cc
+++ b/table/block_based_table_builder.cc
@@ -873,8 +873,9 @@ TableProperties BlockBasedTableBuilder::GetTableProperties() const {
   TableProperties ret = rep_->props;
   for (const auto& collector : rep_->table_properties_collectors) {
     for (const auto& prop : collector->GetReadableProperties()) {
-      ret.user_collected_properties.insert(prop);
+      ret.readable_properties.insert(prop);
    }
+    collector->Finish(&ret.user_collected_properties);
   }
   return ret;
 }
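
For reference, a minimal sketch of how a client might consume the fields this patch adds, assuming only the interfaces declared above. The class name PropertiesLoggingListener and the stderr logging are illustrative, not part of the change:

#include <cinttypes>
#include <cstdio>

#include "rocksdb/db.h"
#include "rocksdb/listener.h"
#include "rocksdb/table_properties.h"

// Hypothetical listener that inspects the properties surfaced by this patch.
class PropertiesLoggingListener : public rocksdb::EventListener {
 public:
  void OnFlushCompleted(rocksdb::DB* db,
                        const rocksdb::FlushJobInfo& info) override {
    // FlushJobInfo::table_properties is a plain TableProperties value,
    // copied out of FlushJob::GetTableProperties() by NotifyOnFlushCompleted.
    fprintf(stderr, "flushed %s: %" PRIu64 " entries, %" PRIu64 " data bytes\n",
            info.file_path.c_str(), info.table_properties.num_entries,
            info.table_properties.data_size);
  }

  void OnCompactionCompleted(rocksdb::DB* db,
                             const rocksdb::CompactionJobInfo& ci) override {
    // CompactionJobInfo::table_properties is a TablePropertiesCollection
    // keyed by the file names in ci.input_files and ci.output_files.
    for (const auto& file : ci.output_files) {
      auto it = ci.table_properties.find(file);
      if (it == ci.table_properties.end() || it->second == nullptr) {
        continue;  // properties may be absent if the producer-side lookup failed
      }
      for (const auto& kv : it->second->user_collected_properties) {
        fprintf(stderr, "%s: %s=%s\n", file.c_str(), kv.first.c_str(),
                kv.second.c_str());
      }
    }
  }
};

Wiring it up mirrors the tests above: options.listeners.emplace_back(new PropertiesLoggingListener()); after that, the properties arrive with each completed flush or compaction, with no extra I/O needed on the listener side since compaction outputs are captured directly from the table builder.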