From e813f5b6d96a5af8300b3b615a67bf6b4ecda776 Mon Sep 17 00:00:00 2001
From: Yueh-Hsuan Chiang
Date: Tue, 24 Jun 2014 16:37:06 -0600
Subject: [PATCH] Allow compaction to reclaim storage more effectively.

Summary:
This diff allows compaction to reclaim storage more effectively.
In the current design, compactions are mainly triggered based on
file sizes.  However, since deletion entries do not have values,
files which have many deletion entries are less likely to be
compacted.  As a result, it may take a while for deletion entries
to be compacted.

This diff addresses the issue by compensating the size of deletion
entries during the compaction process: the size of each deletion
entry seen by the compaction process is augmented by 2x the average
value size.  The diff applies to both leveled and universal
compactions.

Test Plan:
developed CompactionDeletionTrigger tests
make db_test
./db_test

Reviewers: haobo, igor, ljin, sdong

Reviewed By: sdong

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D19029
---
 db/compaction_picker.cc |  63 +++++++--------
 db/db_test.cc           | 113 +++++++++++++++++++++++++++
 db/version_edit.h       |  11 +++
 db/version_set.cc       | 167 ++++++++++++++++++++++++++++++----
 db/version_set.h        |  38 +++++++++
 5 files changed, 320 insertions(+), 72 deletions(-)

diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc
index b6e407e63..67ba9cd4a 100644
--- a/db/compaction_picker.cc
+++ b/db/compaction_picker.cc
@@ -19,10 +19,10 @@ namespace rocksdb {

 namespace {

-uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
+uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files) {
   uint64_t sum = 0;
   for (size_t i = 0; i < files.size() && files[i]; i++) {
-    sum += files[i]->fd.GetFileSize();
+    sum += files[i]->compensated_file_size;
   }
   return sum;
 }
@@ -80,7 +80,7 @@ void CompactionPicker::SizeBeingCompacted(std::vector<uint64_t>& sizes) {
     for (auto c : compactions_in_progress_[level]) {
       assert(c->level() == level);
       for (int i = 0; i < c->num_input_files(0); i++) {
-        total += c->input(0, i)->fd.GetFileSize();
+        total += c->input(0, i)->compensated_file_size;
       }
     }
     sizes[level] = total;
@@ -261,9 +261,9 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) {
     std::vector<FileMetaData*> expanded0;
     c->input_version_->GetOverlappingInputs(
         level, &all_start, &all_limit, &expanded0, c->base_index_, nullptr);
-    const uint64_t inputs0_size = TotalFileSize(c->inputs_[0]);
-    const uint64_t inputs1_size = TotalFileSize(c->inputs_[1]);
-    const uint64_t expanded0_size = TotalFileSize(expanded0);
+    const uint64_t inputs0_size = TotalCompensatedFileSize(c->inputs_[0]);
+    const uint64_t inputs1_size = TotalCompensatedFileSize(c->inputs_[1]);
+    const uint64_t expanded0_size = TotalCompensatedFileSize(expanded0);
     uint64_t limit = ExpandedCompactionByteSizeLimit(level);
     if (expanded0.size() > c->inputs_[0].size() &&
         inputs1_size + expanded0_size < limit &&
@@ -335,7 +335,7 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
         MaxFileSizeForLevel(input_level) * options_->source_compaction_factor;
     uint64_t total = 0;
     for (size_t i = 0; i + 1 < inputs.size(); ++i) {
-      uint64_t s = inputs[i]->fd.GetFileSize();
+      uint64_t s = inputs[i]->compensated_file_size;
       total += s;
       if (total >= limit) {
         **compaction_end = inputs[i + 1]->smallest;
@@ -483,11 +483,11 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version,
     FileMetaData* f = c->input_version_->files_[level][index];

     // check to verify files are arranged in descending size
-    assert(
-        (i == file_size.size() - 1) ||
-        (i >= Version::number_of_files_to_sort_ - 1) ||
-        (f->fd.GetFileSize() >=
-         c->input_version_->files_[level][file_size[i + 1]]->fd.GetFileSize()));
+    assert((i == file_size.size() - 1) ||
+           (i >= Version::number_of_files_to_sort_ - 1) ||
+           (f->compensated_file_size >=
+            c->input_version_->files_[level][file_size[i + 1]]->
+                compensated_file_size));

     // do not pick a file to compact if it is being compacted
     // from n-1 level.
@@ -665,7 +665,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(

       // This file is not being compacted. Consider it as the
       // first candidate to be compacted.
-      uint64_t candidate_size = f != nullptr ? f->fd.GetFileSize() : 0;
+      uint64_t candidate_size = f != nullptr ? f->compensated_file_size : 0;
       if (f != nullptr) {
         LogToBuffer(log_buffer,
                     "[%s] Universal: Possible candidate file %lu[%d].",
@@ -703,9 +703,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
           // by the last-resort read amp strategy which disregards size ratios.
           break;
         }
-        candidate_size = f->fd.GetFileSize();
+        candidate_size = f->compensated_file_size;
       } else {  // default kCompactionStopStyleTotalSize
-        candidate_size += f->fd.GetFileSize();
+        candidate_size += f->compensated_file_size;
       }
       candidate_count++;
     }
@@ -721,10 +721,10 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
         int index = file_by_time[i];
         FileMetaData* f = version->files_[level][index];
         LogToBuffer(log_buffer,
-                    "[%s] Universal: Skipping file %lu[%d] with size %lu %d\n",
-                    version->cfd_->GetName().c_str(),
-                    (unsigned long)f->fd.GetNumber(), i,
-                    (unsigned long)f->fd.GetFileSize(), f->being_compacted);
+                    "[%s] Universal: Skipping file %" PRIu64 "[%d] "
+                    "with size %" PRIu64 " (compensated size %" PRIu64 ") %d\n",
+                    version->cfd_->GetName().c_str(), f->fd.GetNumber(),
+                    i, f->fd.GetFileSize(), f->compensated_file_size,
+                    f->being_compacted);
       }
     }
   }
@@ -759,10 +759,12 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
     int index = file_by_time[i];
     FileMetaData* f = c->input_version_->files_[level][index];
     c->inputs_[0].push_back(f);
-    LogToBuffer(
-        log_buffer, "[%s] Universal: Picking file %lu[%d] with size %lu\n",
-        version->cfd_->GetName().c_str(), (unsigned long)f->fd.GetNumber(), i,
-        (unsigned long)f->fd.GetFileSize());
+    LogToBuffer(log_buffer,
+                "[%s] Universal: Picking file %" PRIu64 "[%d] "
+                "with size %" PRIu64 " (compensated size %" PRIu64 ")\n",
+                version->cfd_->GetName().c_str(),
+                f->fd.GetNumber(), i,
+                f->fd.GetFileSize(), f->compensated_file_size);
   }
   return c;
 }
@@ -826,7 +828,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
                   " is already being compacted. No size amp reduction possible.\n");
       return nullptr;
     }
-    candidate_size += f->fd.GetFileSize();
+    candidate_size += f->compensated_file_size;
     candidate_count++;
   }
   if (candidate_count == 0) {
@@ -866,10 +868,11 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
     f = c->input_version_->files_[level][index];
     c->inputs_[0].push_back(f);
     LogToBuffer(log_buffer,
-                "[%s] Universal: size amp picking file %lu[%d] with size %lu",
-                version->cfd_->GetName().c_str(),
-                (unsigned long)f->fd.GetNumber(), index,
-                (unsigned long)f->fd.GetFileSize());
+                "[%s] Universal: size amp picking file %" PRIu64 "[%d] "
+                "with size %" PRIu64 " (compensated size %" PRIu64 ")",
+                version->cfd_->GetName().c_str(),
+                f->fd.GetNumber(), index,
+                f->fd.GetFileSize(), f->compensated_file_size);
   }
   return c;
 }
@@ -879,7 +882,7 @@ Compaction* FIFOCompactionPicker::PickCompaction(Version* version,
   assert(version->NumberLevels() == 1);
   uint64_t total_size = 0;
   for (const auto& file : version->files_[0]) {
-    total_size += file->fd.GetFileSize();
+    total_size += file->compensated_file_size;
   }

   if (total_size <= options_->compaction_options_fifo.max_table_files_size ||
@@ -907,7 +910,7 @@ Compaction* FIFOCompactionPicker::PickCompaction(Version* version,
   for (auto ritr = version->files_[0].rbegin();
        ritr != version->files_[0].rend(); ++ritr) {
     auto f = *ritr;
-    total_size -= f->fd.GetFileSize();
+    total_size -= f->compensated_file_size;
     c->inputs_[0].push_back(f);
     char tmp_fsize[16];
     AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize));
diff --git a/db/db_test.cc b/db/db_test.cc
index ab559b53a..6344722ed 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -2726,6 +2726,119 @@ TEST(DBTest, CompactionTrigger) {
   ASSERT_EQ(NumTableFilesAtLevel(1, 1), 1);
 }

+namespace {
+static const int kCDTValueSize = 1000;
+static const int kCDTKeysPerBuffer = 4;
+static const int kCDTNumLevels = 8;
+Options DeletionTriggerOptions() {
+  Options options;
+  options.compression = kNoCompression;
+  options.write_buffer_size = kCDTKeysPerBuffer * (kCDTValueSize + 24);
+  options.min_write_buffer_number_to_merge = 1;
+  options.num_levels = kCDTNumLevels;
+  options.max_mem_compaction_level = 0;
+  options.level0_file_num_compaction_trigger = 1;
+  options.target_file_size_base = options.write_buffer_size * 2;
+  options.target_file_size_multiplier = 2;
+  options.max_bytes_for_level_base =
+      options.target_file_size_base * options.target_file_size_multiplier;
+  options.max_bytes_for_level_multiplier = 2;
+  options.disable_auto_compactions = false;
+  return options;
+}
+}  // anonymous namespace
+
+TEST(DBTest, CompactionDeletionTrigger) {
+  Options options = DeletionTriggerOptions();
+  options.create_if_missing = true;
+
+  for (int tid = 0; tid < 2; ++tid) {
+    uint64_t db_size[2];
+
+    DestroyAndReopen(&options);
+    Random rnd(301);
+
+    const int kTestSize = kCDTKeysPerBuffer * 512;
+    std::vector<std::string> values;
+    for (int k = 0; k < kTestSize; ++k) {
+      values.push_back(RandomString(&rnd, kCDTValueSize));
+      ASSERT_OK(Put(Key(k), values[k]));
+    }
+    dbfull()->TEST_WaitForFlushMemTable();
+    dbfull()->TEST_WaitForCompact();
+    db_size[0] = Size(Key(0), Key(kTestSize - 1));
+
+    for (int k = 0; k < kTestSize; ++k) {
+      ASSERT_OK(Delete(Key(k)));
+    }
+    dbfull()->TEST_WaitForFlushMemTable();
+    dbfull()->TEST_WaitForCompact();
+    db_size[1] = Size(Key(0), Key(kTestSize - 1));
+
+    // the db size must be much smaller now.
+    ASSERT_GT(db_size[0] / 3, db_size[1]);
+
+    // repeat the test with universal compaction
+    options.compaction_style = kCompactionStyleUniversal;
+    options.num_levels = 1;
+  }
+}
+
+TEST(DBTest, CompactionDeletionTriggerReopen) {
+  for (int tid = 0; tid < 2; ++tid) {
+    uint64_t db_size[3];
+    Options options = DeletionTriggerOptions();
+    options.create_if_missing = true;
+
+    DestroyAndReopen(&options);
+    Random rnd(301);
+
+    // round 1 --- insert key/value pairs.
+    const int kTestSize = kCDTKeysPerBuffer * 512;
+    std::vector<std::string> values;
+    for (int k = 0; k < kTestSize; ++k) {
+      values.push_back(RandomString(&rnd, kCDTValueSize));
+      ASSERT_OK(Put(Key(k), values[k]));
+    }
+    dbfull()->TEST_WaitForFlushMemTable();
+    dbfull()->TEST_WaitForCompact();
+    db_size[0] = Size(Key(0), Key(kTestSize - 1));
+    Close();
+
+    // round 2 --- disable auto-compactions and issue deletions.
+    options.create_if_missing = false;
+    options.disable_auto_compactions = true;
+    Reopen(&options);
+
+    for (int k = 0; k < kTestSize; ++k) {
+      ASSERT_OK(Delete(Key(k)));
+    }
+    db_size[1] = Size(Key(0), Key(kTestSize - 1));
+    Close();
+    // as auto-compactions are off, we shouldn't see much reduction
+    // in db size.
+    ASSERT_LT(db_size[0] / 3, db_size[1]);
+
+    // round 3 --- reopen db with auto-compactions on and see if
+    // deletion compensation still works.
+    options.disable_auto_compactions = false;
+    Reopen(&options);
+    // insert a relatively small amount of data to trigger auto compaction.
+    for (int k = 0; k < kTestSize / 10; ++k) {
+      ASSERT_OK(Put(Key(k), values[k]));
+    }
+    dbfull()->TEST_WaitForFlushMemTable();
+    dbfull()->TEST_WaitForCompact();
+    db_size[2] = Size(Key(0), Key(kTestSize - 1));
+    // this time we're expecting a significant drop in size.
+    ASSERT_GT(db_size[0] / 3, db_size[2]);
+
+    // repeat the test with universal compaction
+    options.compaction_style = kCompactionStyleUniversal;
+    options.num_levels = 1;
+  }
+}
+
 // This is a static filter used for filtering
 // kvs during the compaction process.
 static int cfilter_count;
diff --git a/db/version_edit.h b/db/version_edit.h
index 1d214a149..df1cc7827 100644
--- a/db/version_edit.h
+++ b/db/version_edit.h
@@ -40,6 +40,11 @@ struct FileDescriptor {
 struct FileMetaData {
   int refs;
   FileDescriptor fd;
+  uint64_t compensated_file_size;  // File size compensated by deletion entries.
+  uint64_t num_entries;            // the number of entries.
+  uint64_t num_deletions;          // the number of deletion entries.
+  uint64_t raw_key_size;           // total uncompressed key size.
+  uint64_t raw_value_size;         // total uncompressed value size.
   InternalKey smallest;       // Smallest internal key served by table
   InternalKey largest;        // Largest internal key served by table
   bool being_compacted;       // Is this file undergoing compaction?
@@ -52,6 +57,11 @@ struct FileMetaData {
   FileMetaData()
       : refs(0),
         fd(0, 0),
+        compensated_file_size(0),
+        num_entries(0),
+        num_deletions(0),
+        raw_key_size(0),
+        raw_value_size(0),
         being_compacted(false),
         table_reader_handle(nullptr) {}
 };
@@ -149,6 +159,7 @@ class VersionEdit {

  private:
   friend class VersionSet;
+  friend class Version;

   typedef std::set<std::pair<int, uint64_t>> DeletedFileSet;
diff --git a/db/version_set.cc b/db/version_set.cc
index 10b25533c..29611f0a0 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -47,6 +47,15 @@ static uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
   return sum;
 }

+static uint64_t TotalCompensatedFileSize(
+    const std::vector<FileMetaData*>& files) {
+  uint64_t sum = 0;
+  for (size_t i = 0; i < files.size() && files[i]; i++) {
+    sum += files[i]->compensated_file_size;
+  }
+  return sum;
+}
+
 Version::~Version() {
   assert(refs_ == 0);
@@ -241,53 +250,69 @@ class Version::LevelFileIteratorState : public TwoLevelIteratorState {
   bool for_compaction_;
 };

-Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
+Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
+                                   const FileMetaData* file_meta,
+                                   const std::string* fname) {
   auto table_cache = cfd_->table_cache();
   auto options = cfd_->options();
+  Status s = table_cache->GetTableProperties(
+      vset_->storage_options_, cfd_->internal_comparator(), file_meta->fd,
+      tp, true /* no io */);
+  if (s.ok()) {
+    return s;
+  }
+
+  // We only ignore error type `Incomplete` since it's by design that we
+  // disallow table reads when the table is not in the table cache.
+  if (!s.IsIncomplete()) {
+    return s;
+  }
+
+  // Table is not present in the table cache, so we'll read the table
+  // properties directly from the properties block in the file.
+  std::unique_ptr<RandomAccessFile> file;
+  if (fname != nullptr) {
+    s = options->env->NewRandomAccessFile(
+        *fname, &file, vset_->storage_options_);
+  } else {
+    s = options->env->NewRandomAccessFile(
+        TableFileName(vset_->dbname_, file_meta->fd.GetNumber()),
+        &file, vset_->storage_options_);
+  }
+  if (!s.ok()) {
+    return s;
+  }
+
+  TableProperties* raw_table_properties;
+  // By setting the magic number to kInvalidTableMagicNumber, we can bypass
+  // the magic number check in the footer.
+  s = ReadTableProperties(
+      file.get(), file_meta->fd.GetFileSize(),
+      Footer::kInvalidTableMagicNumber /* table's magic number */,
+      vset_->env_, options->info_log.get(), &raw_table_properties);
+  if (!s.ok()) {
+    return s;
+  }
+  RecordTick(options->statistics.get(),
+             NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);
+
+  *tp = std::shared_ptr<const TableProperties>(raw_table_properties);
+  return s;
+}
+
+Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
   for (int level = 0; level < num_levels_; level++) {
     for (const auto& file_meta : files_[level]) {
       auto fname = TableFileName(vset_->dbname_, file_meta->fd.GetNumber());
       // 1. If the table is already present in table cache, load table
       // properties from there.
       std::shared_ptr<const TableProperties> table_properties;
-      Status s = table_cache->GetTableProperties(
-          vset_->storage_options_, cfd_->internal_comparator(), file_meta->fd,
-          &table_properties, true /* no io */);
+      Status s = GetTableProperties(&table_properties, file_meta, &fname);
       if (s.ok()) {
         props->insert({fname, table_properties});
-        continue;
-      }
-
-      // We only ignore error type `Incomplete` since it's by design that we
-      // disallow table when it's not in table cache.
-      if (!s.IsIncomplete()) {
-        return s;
-      }
-
-      // 2. Table is not present in table cache, we'll read the table properties
-      // directly from the properties block in the file.
-      std::unique_ptr<RandomAccessFile> file;
-      s = options->env->NewRandomAccessFile(fname, &file,
-                                            vset_->storage_options_);
-      if (!s.ok()) {
-        return s;
-      }
-
-      TableProperties* raw_table_properties;
-      // By setting the magic number to kInvalidTableMagicNumber, we can by
-      // pass the magic number check in the footer.
-      s = ReadTableProperties(
-          file.get(), file_meta->fd.GetFileSize(),
-          Footer::kInvalidTableMagicNumber /* table's magic number */,
-          vset_->env_, options->info_log.get(), &raw_table_properties);
-      if (!s.ok()) {
+      } else {
         return s;
       }
-      RecordTick(options->statistics.get(),
-                 NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);
-
-      props->insert({fname, std::shared_ptr<const TableProperties>(
-                                raw_table_properties)});
     }
   }
@@ -492,7 +517,11 @@ Version::Version(ColumnFamilyData* cfd, VersionSet* vset,
       compaction_level_(num_levels_),
       version_number_(version_number),
       file_indexer_(num_levels_, cfd == nullptr
                                      ? nullptr
-                                     : cfd->internal_comparator().user_comparator()) {
+                                     : cfd->internal_comparator().user_comparator()),
+      total_file_size_(0),
+      total_raw_key_size_(0),
+      total_raw_value_size_(0),
+      num_non_deletions_(0) {
 }

 void Version::Get(const ReadOptions& options,
@@ -699,6 +728,58 @@ void Version::PrepareApply(std::vector<uint64_t>& size_being_compacted) {
   UpdateNumNonEmptyLevels();
 }

+bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
+  if (file_meta->num_entries > 0) {
+    return false;
+  }
+  std::shared_ptr<const TableProperties> tp;
+  Status s = GetTableProperties(&tp, file_meta);
+  if (!s.ok()) {
+    return false;
+  }
+  if (tp.get() == nullptr) return false;
+  file_meta->num_entries = tp->num_entries;
+  file_meta->num_deletions = GetDeletedKeys(tp->user_collected_properties);
+  file_meta->raw_value_size = tp->raw_value_size;
+  file_meta->raw_key_size = tp->raw_key_size;
+
+  return true;
+}
+
+void Version::UpdateTemporaryStats(const VersionEdit* edit) {
+  static const int kDeletionWeightOnCompaction = 2;
+
+  // incrementally update the average value size by
+  // including newly added files into the global stats
+  int init_count = 0;
+  int total_count = 0;
+  for (int level = 0; level < num_levels_; level++) {
+    for (auto* file_meta : files_[level]) {
+      if (MaybeInitializeFileMetaData(file_meta)) {
+        // each FileMeta will be initialized only once.
+        total_file_size_ += file_meta->fd.GetFileSize();
+        total_raw_key_size_ += file_meta->raw_key_size;
+        total_raw_value_size_ += file_meta->raw_value_size;
+        num_non_deletions_ +=
+            file_meta->num_entries - file_meta->num_deletions;
+        init_count++;
+      }
+      total_count++;
+    }
+  }
+
+  uint64_t average_value_size = GetAverageValueSize();
+
+  // compute the compensated size
+  for (int level = 0; level < num_levels_; level++) {
+    for (auto* file_meta : files_[level]) {
+      file_meta->compensated_file_size = file_meta->fd.GetFileSize() +
+          file_meta->num_deletions * average_value_size *
+          kDeletionWeightOnCompaction;
+    }
+  }
+}
+
 void Version::ComputeCompactionScore(
     std::vector<uint64_t>& size_being_compacted) {
   double max_score = 0;
@@ -728,7 +809,7 @@ void Version::ComputeCompactionScore(
       uint64_t total_size = 0;
       for (unsigned int i = 0; i < files_[level].size(); i++) {
         if (!files_[level][i]->being_compacted) {
-          total_size += files_[level][i]->fd.GetFileSize();
+          total_size += files_[level][i]->compensated_file_size;
           numfiles++;
         }
       }
@@ -747,7 +828,7 @@ void Version::ComputeCompactionScore(
     } else {
       // Compute the ratio of current size to size limit.
       const uint64_t level_bytes =
-          TotalFileSize(files_[level]) - size_being_compacted[level];
+          TotalCompensatedFileSize(files_[level]) - size_being_compacted[level];
       score = static_cast<double>(level_bytes) /
               cfd_->compaction_picker()->MaxBytesForLevel(level);
       if (max_score < score) {
@@ -783,9 +864,10 @@ void Version::ComputeCompactionScore(
 namespace {

 // Compator that is used to sort files based on their size
 // In normal mode: descending size
-bool CompareSizeDescending(const Version::Fsize& first,
-                           const Version::Fsize& second) {
-  return (first.file->fd.GetFileSize() > second.file->fd.GetFileSize());
+bool CompareCompensatedSizeDescending(const Version::Fsize& first,
+                                      const Version::Fsize& second) {
+  return (first.file->compensated_file_size >
+          second.file->compensated_file_size);
 }

 // A static compator used to sort files based on their seqno
 // In universal style : descending seqno
@@ -846,7 +928,7 @@ void Version::UpdateFilesBySize() {
       num = temp.size();
     }
     std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
-                      CompareSizeDescending);
+                      CompareCompensatedSizeDescending);
   }
   assert(temp.size() == files.size());
@@ -1674,6 +1756,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,

     if (!edit->IsColumnFamilyManipulation()) {
       // This is cpu-heavy operations, which should be called outside mutex.
+      v->UpdateTemporaryStats(edit);
       v->PrepareApply(size_being_compacted);
     }
diff --git a/db/version_set.h b/db/version_set.h
index cf526c2bd..542db7466 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -196,6 +196,25 @@ class Version {
   // Returns the version nuber of this version
   uint64_t GetVersionNumber() const { return version_number_; }

+  uint64_t GetAverageValueSize() const {
+    if (num_non_deletions_ == 0) {
+      return 0;
+    }
+    assert(total_raw_key_size_ + total_raw_value_size_ > 0);
+    assert(total_file_size_ > 0);
+    return total_raw_value_size_ / num_non_deletions_ * total_file_size_ /
+           (total_raw_key_size_ + total_raw_value_size_);
+  }
+
+  // REQUIRES: lock is held
+  // On success, "tp" will contain the table properties of the file
+  // specified in "file_meta".  If the file name of "file_meta" is
+  // known ahead, passing it by a non-null "fname" can save a
+  // file-name conversion.
+  Status GetTableProperties(std::shared_ptr<const TableProperties>* tp,
+                            const FileMetaData* file_meta,
+                            const std::string* fname = nullptr);
+
   // REQUIRES: lock is held
   // On success, *props will be populated with all SSTables' table properties.
   // The keys of `props` are the sst file name, the values of `props` are the
@@ -228,6 +247,15 @@ class Version {
   // Update num_non_empty_levels_.
   void UpdateNumNonEmptyLevels();

+  // The helper function of UpdateTemporaryStats, which may fill the missing
+  // fields of file_meta from its associated TableProperties.
+  // Returns true if it does initialize FileMetaData.
+  bool MaybeInitializeFileMetaData(FileMetaData* file_meta);
+
+  // Update the temporary stats associated with the current version.
+  // These temporary stats will be used in compaction.
+  void UpdateTemporaryStats(const VersionEdit* edit);
+
   // Sort all files for this version based on their file size and
   // record results in files_by_size_. The largest files are listed first.
   void UpdateFilesBySize();
@@ -285,6 +313,16 @@ class Version {
   Version(ColumnFamilyData* cfd, VersionSet* vset,
           uint64_t version_number = 0);

   FileIndexer file_indexer_;
+  // total file size
+  uint64_t total_file_size_;
+  // the total size of all raw keys.
+  uint64_t total_raw_key_size_;
+  // the total size of all raw values.
+  uint64_t total_raw_value_size_;
+  // total number of non-deletion entries
+  uint64_t num_non_deletions_;
+
   ~Version();
   // re-initializes the index that is used to offset into files_by_size_
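
To make the effect of the new heuristic concrete, the following standalone snippet (illustration only, not part of the patch) reproduces the arithmetic of Version::GetAverageValueSize() and the per-file computation in Version::UpdateTemporaryStats(). The VersionTotals struct, the sample numbers, and main() are hypothetical scaffolding; only the two formulas and the kDeletionWeightOnCompaction constant are taken from the diff above.

#include <cstdint>
#include <cstdio>

// Hypothetical aggregate of the per-version counters the patch maintains
// (total_file_size_, total_raw_key_size_, total_raw_value_size_,
// num_non_deletions_).
struct VersionTotals {
  uint64_t file_size;          // sum of on-disk (possibly compressed) file sizes
  uint64_t raw_key_size;       // sum of uncompressed key bytes
  uint64_t raw_value_size;     // sum of uncompressed value bytes
  uint64_t num_non_deletions;  // entries that are not tombstones
};

// Mirrors Version::GetAverageValueSize(): the average raw value size,
// scaled by the on-disk/raw ratio so the result approximates the on-disk
// bytes an average value occupies.
uint64_t AverageValueSize(const VersionTotals& t) {
  if (t.num_non_deletions == 0) return 0;
  return t.raw_value_size / t.num_non_deletions * t.file_size /
         (t.raw_key_size + t.raw_value_size);
}

// Mirrors the per-file computation in Version::UpdateTemporaryStats():
// every deletion entry is charged 2x the average value size on top of
// the physical file size.
uint64_t CompensatedFileSize(uint64_t file_size, uint64_t num_deletions,
                             uint64_t average_value_size) {
  static const int kDeletionWeightOnCompaction = 2;  // same constant as the patch
  return file_size +
         num_deletions * average_value_size * kDeletionWeightOnCompaction;
}

int main() {
  // Made-up numbers: a version whose data adds up to 64 MB on disk.
  VersionTotals totals = {64u << 20, 8u << 20, 56u << 20, 40000};
  uint64_t avg = AverageValueSize(totals);  // 1468 bytes with these numbers

  // A 4 MB file holding mostly tombstones now looks much larger to the
  // compaction picker, so it is chosen even though it is physically small.
  uint64_t physical = 4u << 20;
  uint64_t compensated = CompensatedFileSize(physical, 60000, avg);
  printf("avg value size: %llu bytes\n", (unsigned long long)avg);
  printf("file: %llu bytes -> compensated: %llu bytes\n",
         (unsigned long long)physical, (unsigned long long)compensated);
  return 0;
}

With these made-up numbers the 4 MB tombstone-heavy file is presented to the compaction picker as roughly 172 MB, which is why deletion-dominated files now get picked early. Note the file_size / (raw_key_size + raw_value_size) factor is what makes the estimate compression-aware: it converts an average raw value into the approximate number of on-disk bytes that reclaiming it would free.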