From 6df589b446b3bd4b468a36a07b959d4d98a75fc7 Mon Sep 17 00:00:00 2001 From: sdong Date: Thu, 4 Jun 2015 12:03:40 -0700 Subject: [PATCH] Add TablePropertiesCollector::NeedCompact() to suggest DB to further compact output files Summary: It is experimental. Allow users to return, from the callback function TablePropertiesCollector::NeedCompact(), whether an output file should be compacted further, based on the data in the file. It can be used to allow users to suggest that the DB clear up delete tombstones faster. Test Plan: Add a unit test. Reviewers: igor, yhchiang, kradhakrishnan, rven Reviewed By: rven Subscribers: yoshinorim, MarkCallaghan, maykov, leveldb, dhruba Differential Revision: https://reviews.facebook.net/D39585 --- db/builder.cc | 1 + db/compaction_job.cc | 10 ++- db/compaction_job_test.cc | 2 +- db/db_impl.cc | 9 ++- db/db_impl_experimental.cc | 3 +- db/db_test.cc | 117 ++++++++++++++++++++++++++++- db/flush_job.cc | 3 +- db/repair.cc | 3 +- db/table_properties_collector.h | 6 ++ db/version_builder_test.cc | 30 ++++---- db/version_edit.h | 4 +- db/version_edit_test.cc | 7 +- db/version_set.cc | 17 ++++- include/rocksdb/table_properties.h | 3 + table/block_based_table_builder.cc | 9 +++ table/block_based_table_builder.h | 2 + table/table_builder.h | 4 + 17 files changed, 196 insertions(+), 34 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index 2a33bb008..9436534fc 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -222,6 +222,7 @@ Status BuildTable( } if (s.ok()) { meta->fd.file_size = builder->FileSize(); + meta->marked_for_compaction = builder->NeedCompact(); assert(meta->fd.GetFileSize() > 0); if (table_properties) { *table_properties = builder->GetTableProperties(); diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 761668e16..47275f00a 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -66,6 +66,7 @@ struct CompactionJob::CompactionState { uint64_t file_size; InternalKey smallest, largest; SequenceNumber smallest_seqno, largest_seqno; + bool need_compaction; };
std::vector<Output> outputs; @@ -1016,6 +1017,8 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) { // Check for iterator errors Status s = input->status(); const uint64_t current_entries = compact_->builder->NumEntries(); + compact_->current_output()->need_compaction = + compact_->builder->NeedCompact(); if (s.ok()) { s = compact_->builder->Finish(); } else { @@ -1106,9 +1109,10 @@ Status CompactionJob::InstallCompactionResults( compaction->AddInputDeletions(compact_->compaction->edit()); for (size_t i = 0; i < compact_->outputs.size(); i++) { const CompactionState::Output& out = compact_->outputs[i]; - compaction->edit()->AddFile( - compaction->output_level(), out.number, out.path_id, out.file_size, - out.smallest, out.largest, out.smallest_seqno, out.largest_seqno); + compaction->edit()->AddFile(compaction->output_level(), out.number, + out.path_id, out.file_size, out.smallest, + out.largest, out.smallest_seqno, + out.largest_seqno, out.need_compaction); } return versions_->LogAndApply(compaction->column_family_data(), mutable_cf_options, compaction->edit(), diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index e0ab453e9..79d12d8a7 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -90,7 +90,7 @@ class CompactionJobTest : public testing::Test { VersionEdit edit; edit.AddFile(0, file_number, 0, 10, smallest, largest, smallest_seqno, - largest_seqno); + largest_seqno, false); mutex_.Lock(); versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(), diff --git a/db/db_impl.cc b/db/db_impl.cc index fb70ff0b0..b4f7a4c12 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1207,7 +1207,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, if (s.ok() && meta.fd.GetFileSize() > 0) { edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(), meta.fd.GetFileSize(), meta.smallest, meta.largest, - meta.smallest_seqno, meta.largest_seqno); + meta.smallest_seqno, meta.largest_seqno,
+ meta.marked_for_compaction); } InternalStats::CompactionStats stats(1); @@ -1768,7 +1769,8 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { edit.DeleteFile(level, f->fd.GetNumber()); edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest, f->largest, - f->smallest_seqno, f->largest_seqno); + f->smallest_seqno, f->largest_seqno, + f->marked_for_compaction); } Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log, "[%s] Apply version edit:\n%s", @@ -2457,7 +2459,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, c->edit()->DeleteFile(c->level(), f->fd.GetNumber()); c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest, f->largest, - f->smallest_seqno, f->largest_seqno); + f->smallest_seqno, f->largest_seqno, + f->marked_for_compaction); LogToBuffer(log_buffer, "[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n", diff --git a/db/db_impl_experimental.cc b/db/db_impl_experimental.cc index d6c3dfc95..65529307b 100644 --- a/db/db_impl_experimental.cc +++ b/db/db_impl_experimental.cc @@ -130,7 +130,8 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) { edit.DeleteFile(0, f->fd.GetNumber()); edit.AddFile(target_level, f->fd.GetNumber(), f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest, f->largest, - f->smallest_seqno, f->largest_seqno); + f->smallest_seqno, f->largest_seqno, + f->marked_for_compaction); } status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), diff --git a/db/db_test.cc b/db/db_test.cc index f5b55fe97..079cdc684 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -12957,6 +12957,121 @@ TEST_F(DBTest, CompressLevelCompaction) { Destroy(options); } +class CountingDeleteTabPropCollector : public TablePropertiesCollector { + public: + const char* Name() const override { return "CountingDeleteTabPropCollector"; } + + Status AddUserKey(const Slice& 
user_key, const Slice& value, EntryType type, + SequenceNumber seq, uint64_t file_size) override { + if (type == kEntryDelete) { + num_deletes_++; + } + return Status::OK(); + } + + bool NeedCompact() const override { return num_deletes_ > 10; } + + UserCollectedProperties GetReadableProperties() const override { + return UserCollectedProperties{}; + } + + Status Finish(UserCollectedProperties* properties) override { + *properties = + UserCollectedProperties{{"num_delete", ToString(num_deletes_)}}; + return Status::OK(); + } + + private: + uint32_t num_deletes_ = 0; +}; + +class CountingDeleteTabPropCollectorFactory + : public TablePropertiesCollectorFactory { + public: + virtual TablePropertiesCollector* CreateTablePropertiesCollector() override { + return new CountingDeleteTabPropCollector(); + } + const char* Name() const override { + return "CountingDeleteTabPropCollectorFactory"; + } +}; + +TEST_F(DBTest, TablePropertiesNeedCompactTest) { + Random rnd(301); + + Options options; + options.create_if_missing = true; + options.write_buffer_size = 4096; + options.max_write_buffer_number = 8; + options.level0_file_num_compaction_trigger = 2; + options.level0_slowdown_writes_trigger = 2; + options.level0_stop_writes_trigger = 4; + options.target_file_size_base = 2048; + options.max_bytes_for_level_base = 10240; + options.max_bytes_for_level_multiplier = 4; + options.hard_rate_limit = 1.1; + options.num_levels = 8; + + std::shared_ptr<TablePropertiesCollectorFactory> collector_factory( + new CountingDeleteTabPropCollectorFactory); + options.table_properties_collector_factories.resize(1); + options.table_properties_collector_factories[0] = collector_factory; + + DestroyAndReopen(options); + + const int kMaxKey = 1000; + for (int i = 0; i < kMaxKey; i++) { + ASSERT_OK(Put(Key(i), RandomString(&rnd, 102))); + ASSERT_OK(Put(Key(kMaxKey + i), RandomString(&rnd, 102))); + } + Flush(); + dbfull()->TEST_WaitForCompact(); + if (NumTableFilesAtLevel(0) == 1) { + // Clear Level 0 so that when later flush a file
with deletions, + // we don't trigger an organic compaction. + ASSERT_OK(Put(Key(0), "")); + ASSERT_OK(Put(Key(kMaxKey * 2), "")); + Flush(); + dbfull()->TEST_WaitForCompact(); + } + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + + { + int c = 0; + std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions())); + iter->Seek(Key(kMaxKey - 100)); + while (iter->Valid() && iter->key().compare(Key(kMaxKey + 100)) < 0) { + iter->Next(); + ++c; + } + ASSERT_EQ(c, 200); + } + + Delete(Key(0)); + for (int i = kMaxKey - 100; i < kMaxKey + 100; i++) { + Delete(Key(i)); + } + Delete(Key(kMaxKey * 2)); + + Flush(); + dbfull()->TEST_WaitForCompact(); + + { + SetPerfLevel(kEnableCount); + perf_context.Reset(); + int c = 0; + std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions())); + iter->Seek(Key(kMaxKey - 100)); + while (iter->Valid() && iter->key().compare(Key(kMaxKey + 100)) < 0) { + iter->Next(); + } + ASSERT_EQ(c, 0); + ASSERT_LT(perf_context.internal_delete_skipped_count, 30u); + ASSERT_LT(perf_context.internal_key_skipped_count, 30u); + SetPerfLevel(kDisable); + } +} + TEST_F(DBTest, SuggestCompactRangeTest) { class CompactionFilterFactoryGetContext : public CompactionFilterFactory { public: @@ -12971,7 +13086,7 @@ TEST_F(DBTest, SuggestCompactRangeTest) { } static bool IsManual(CompactionFilterFactory* compaction_filter_factory) { return reinterpret_cast<CompactionFilterFactoryGetContext*>( - compaction_filter_factory)->saved_context.is_manual_compaction; + compaction_filter_factory)->saved_context.is_manual_compaction; } CompactionFilter::Context saved_context; }; diff --git a/db/flush_job.cc b/db/flush_job.cc index c59d56cef..7536b2ba8 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -296,7 +296,8 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems, } edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(), meta.fd.GetFileSize(), meta.smallest, meta.largest, - meta.smallest_seqno, meta.largest_seqno); + meta.smallest_seqno, meta.largest_seqno, + meta.marked_for_compaction); }
InternalStats::CompactionStats stats(1); diff --git a/db/repair.cc b/db/repair.cc index ed748bd86..15831899f 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -401,7 +401,8 @@ class Repairer { const TableInfo& t = tables_[i]; edit_->AddFile(0, t.meta.fd.GetNumber(), t.meta.fd.GetPathId(), t.meta.fd.GetFileSize(), t.meta.smallest, t.meta.largest, - t.min_sequence, t.max_sequence); + t.min_sequence, t.max_sequence, + t.meta.marked_for_compaction); } //fprintf(stderr, "NewDescriptor:\n%s\n", edit_.DebugString().c_str()); diff --git a/db/table_properties_collector.h b/db/table_properties_collector.h index 79bf132f6..4bd01fd16 100644 --- a/db/table_properties_collector.h +++ b/db/table_properties_collector.h @@ -32,6 +32,8 @@ class IntTblPropCollector { uint64_t file_size) = 0; virtual UserCollectedProperties GetReadableProperties() const = 0; + + virtual bool NeedCompact() const { return false; } }; // Facrtory for internal table properties collector. @@ -98,6 +100,10 @@ class UserKeyTablePropertiesCollector : public IntTblPropCollector { UserCollectedProperties GetReadableProperties() const override; + virtual bool NeedCompact() const override { + return collector_->NeedCompact(); + } + protected: std::unique_ptr<TablePropertiesCollector> collector_; }; diff --git a/db/version_builder_test.cc b/db/version_builder_test.cc index 25eeadb72..5312fd1ba 100644 --- a/db/version_builder_test.cc +++ b/db/version_builder_test.cc @@ -115,7 +115,7 @@ TEST_F(VersionBuilderTest, ApplyAndSaveTo) { VersionEdit version_edit; version_edit.AddFile(2, 666, 0, 100U, GetInternalKey("301"), - GetInternalKey("350"), 200, 200); + GetInternalKey("350"), 200, 200, false); version_edit.DeleteFile(3, 27U); EnvOptions env_options; @@ -149,7 +149,7 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic) { VersionEdit version_edit; version_edit.AddFile(3, 666, 0, 100U, GetInternalKey("301"), - GetInternalKey("350"), 200, 200); + GetInternalKey("350"), 200, 200, false); version_edit.DeleteFile(0, 1U); version_edit.DeleteFile(0,
88U); @@ -186,7 +186,7 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic2) { VersionEdit version_edit; version_edit.AddFile(4, 666, 0, 100U, GetInternalKey("301"), - GetInternalKey("350"), 200, 200); + GetInternalKey("350"), 200, 200, false); version_edit.DeleteFile(0, 1U); version_edit.DeleteFile(0, 88U); version_edit.DeleteFile(4, 6U); @@ -214,15 +214,15 @@ TEST_F(VersionBuilderTest, ApplyMultipleAndSaveTo) { VersionEdit version_edit; version_edit.AddFile(2, 666, 0, 100U, GetInternalKey("301"), - GetInternalKey("350"), 200, 200); + GetInternalKey("350"), 200, 200, false); version_edit.AddFile(2, 676, 0, 100U, GetInternalKey("401"), - GetInternalKey("450"), 200, 200); + GetInternalKey("450"), 200, 200, false); version_edit.AddFile(2, 636, 0, 100U, GetInternalKey("601"), - GetInternalKey("650"), 200, 200); + GetInternalKey("650"), 200, 200, false); version_edit.AddFile(2, 616, 0, 100U, GetInternalKey("501"), - GetInternalKey("550"), 200, 200); + GetInternalKey("550"), 200, 200, false); version_edit.AddFile(2, 606, 0, 100U, GetInternalKey("701"), - GetInternalKey("750"), 200, 200); + GetInternalKey("750"), 200, 200, false); EnvOptions env_options; @@ -248,24 +248,24 @@ TEST_F(VersionBuilderTest, ApplyDeleteAndSaveTo) { VersionEdit version_edit; version_edit.AddFile(2, 666, 0, 100U, GetInternalKey("301"), - GetInternalKey("350"), 200, 200); + GetInternalKey("350"), 200, 200, false); version_edit.AddFile(2, 676, 0, 100U, GetInternalKey("401"), - GetInternalKey("450"), 200, 200); + GetInternalKey("450"), 200, 200, false); version_edit.AddFile(2, 636, 0, 100U, GetInternalKey("601"), - GetInternalKey("650"), 200, 200); + GetInternalKey("650"), 200, 200, false); version_edit.AddFile(2, 616, 0, 100U, GetInternalKey("501"), - GetInternalKey("550"), 200, 200); + GetInternalKey("550"), 200, 200, false); version_edit.AddFile(2, 606, 0, 100U, GetInternalKey("701"), - GetInternalKey("750"), 200, 200); + GetInternalKey("750"), 200, 200, false); 
version_builder.Apply(&version_edit); VersionEdit version_edit2; version_edit.AddFile(2, 808, 0, 100U, GetInternalKey("901"), - GetInternalKey("950"), 200, 200); + GetInternalKey("950"), 200, 200, false); version_edit2.DeleteFile(2, 616); version_edit2.DeleteFile(2, 636); version_edit.AddFile(2, 806, 0, 100U, GetInternalKey("801"), - GetInternalKey("850"), 200, 200); + GetInternalKey("850"), 200, 200, false); version_builder.Apply(&version_edit2); version_builder.SaveTo(&new_vstorage); diff --git a/db/version_edit.h b/db/version_edit.h index 6da4f5b02..a978d4780 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -169,7 +169,8 @@ class VersionEdit { void AddFile(int level, uint64_t file, uint32_t file_path_id, uint64_t file_size, const InternalKey& smallest, const InternalKey& largest, const SequenceNumber& smallest_seqno, - const SequenceNumber& largest_seqno) { + const SequenceNumber& largest_seqno, + bool marked_for_compaction) { assert(smallest_seqno <= largest_seqno); FileMetaData f; f.fd = FileDescriptor(file, file_path_id, file_size); @@ -177,6 +178,7 @@ class VersionEdit { f.largest = largest; f.smallest_seqno = smallest_seqno; f.largest_seqno = largest_seqno; + f.marked_for_compaction = marked_for_compaction; new_files_.push_back(std::make_pair(level, f)); } diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index 8b7b31bdd..4186e08e6 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -34,7 +34,7 @@ TEST_F(VersionEditTest, EncodeDecode) { edit.AddFile(3, kBig + 300 + i, kBig32Bit + 400 + i, 0, InternalKey("foo", kBig + 500 + i, kTypeValue), InternalKey("zoo", kBig + 600 + i, kTypeDeletion), - kBig + 500 + i, kBig + 600 + i); + kBig + 500 + i, kBig + 600 + i, false); edit.DeleteFile(4, kBig + 700 + i); } @@ -47,10 +47,7 @@ TEST_F(VersionEditTest, EncodeDecode) { TEST_F(VersionEditTest, EncodeEmptyFile) { VersionEdit edit; - edit.AddFile(0, 0, 0, 0, - InternalKey(), - InternalKey(), - 0, 0); + edit.AddFile(0, 0, 0, 0, 
InternalKey(), InternalKey(), 0, 0, false); std::string buffer; ASSERT_TRUE(!edit.EncodeTo(&buffer)); } diff --git a/db/version_set.cc b/db/version_set.cc index 4e458b517..5d2f2f801 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1078,7 +1078,19 @@ void VersionStorageInfo::ComputeCompactionScore( void VersionStorageInfo::ComputeFilesMarkedForCompaction() { files_marked_for_compaction_.clear(); - for (int level = 0; level <= MaxInputLevel(); level++) { + int last_qualify_level = 0; + + // Do not include files from the last level with data + // If table properties collector suggests a file on the last level, + // we should not move it to a new level. + for (int level = num_levels() - 1; level >= 1; level--) { + if (!files_[level].empty()) { + last_qualify_level = level - 1; + break; + } + } + + for (int level = 0; level <= last_qualify_level; level++) { for (auto* f : files_[level]) { if (!f->being_compacted && f->marked_for_compaction) { files_marked_for_compaction_.emplace_back(level, f); @@ -2785,7 +2797,8 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { cfd->current()->storage_info()->LevelFiles(level)) { edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest, f->largest, - f->smallest_seqno, f->largest_seqno); + f->smallest_seqno, f->largest_seqno, + f->marked_for_compaction); } } edit.SetLogNumber(cfd->GetLogNumber()); diff --git a/include/rocksdb/table_properties.h b/include/rocksdb/table_properties.h index 9a515cf45..74dfbeb46 100644 --- a/include/rocksdb/table_properties.h +++ b/include/rocksdb/table_properties.h @@ -129,6 +129,9 @@ class TablePropertiesCollector { // The name of the properties collector can be used for debugging purpose. virtual const char* Name() const = 0; + + // EXPERIMENTAL Return whether the output file should be further compacted + virtual bool NeedCompact() const { return false; } }; // Constructs TablePropertiesCollector. 
Internals create a new diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index 201f1285e..c4b8b0eb3 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -848,6 +848,15 @@ uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; } +bool BlockBasedTableBuilder::NeedCompact() const { + for (const auto& collector : rep_->table_properties_collectors) { + if (collector->NeedCompact()) { + return true; + } + } + return false; +} + TableProperties BlockBasedTableBuilder::GetTableProperties() const { TableProperties ret = rep_->props; for (const auto& collector : rep_->table_properties_collectors) { diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h index 716a4e9ba..5dc58ec94 100644 --- a/table/block_based_table_builder.h +++ b/table/block_based_table_builder.h @@ -70,6 +70,8 @@ class BlockBasedTableBuilder : public TableBuilder { // Finish() call, returns the size of the final generated file. uint64_t FileSize() const override; + bool NeedCompact() const override; + // Get table properties TableProperties GetTableProperties() const override; diff --git a/table/table_builder.h b/table/table_builder.h index 19da4c26d..2c9a13424 100644 --- a/table/table_builder.h +++ b/table/table_builder.h @@ -82,6 +82,10 @@ class TableBuilder { // Finish() call, returns the size of the final generated file. virtual uint64_t FileSize() const = 0; + // Returns true if a user-defined table properties collector suggests that + // the file be further compacted. + virtual bool NeedCompact() const { return false; } + // Returns table properties virtual TableProperties GetTableProperties() const = 0; };