From 27a8856c23b4f734dedbaccb9cd62714d3d0435e Mon Sep 17 00:00:00 2001
From: Igor Canadi
Date: Fri, 31 Jan 2014 16:45:20 -0800
Subject: [PATCH] Compacting column families

Summary:
This diff enables non-default column families to get compacted, both
automatically and by calling CompactRange().

Test Plan: make check

Reviewers: dhruba, haobo, kailiu, sdong

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15813
---
 db/column_family_test.cc |  88 ++++++++++++++++++++++++++++-
 db/compaction.cc         |   7 +++
 db/compaction.h          |   8 +++
 db/db_impl.cc            | 117 +++++++++++++++++++++------------------
 db/db_impl.h             |  10 ++--
 db/version_set.cc        |  11 ++--
 6 files changed, 177 insertions(+), 64 deletions(-)

diff --git a/db/column_family_test.cc b/db/column_family_test.cc
index 1e67ce31c..539f70127 100644
--- a/db/column_family_test.cc
+++ b/db/column_family_test.cc
@@ -83,6 +83,46 @@ class ColumnFamilyTest {
     return result;
   }
 
+  void Compact(int cf, const Slice& start, const Slice& limit) {
+    ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit));
+  }
+
+  int NumTableFilesAtLevel(int cf, int level) {
+    string property;
+    ASSERT_TRUE(db_->GetProperty(
+        handles_[cf], "rocksdb.num-files-at-level" + NumberToString(level),
+        &property));
+    return atoi(property.c_str());
+  }
+
+  // Return spread of files per level
+  string FilesPerLevel(int cf) {
+    string result;
+    int last_non_zero_offset = 0;
+    for (int level = 0; level < column_family_options_.num_levels; level++) {
+      int f = NumTableFilesAtLevel(cf, level);
+      char buf[100];
+      snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
+      result += buf;
+      if (f > 0) {
+        last_non_zero_offset = result.size();
+      }
+    }
+    result.resize(last_non_zero_offset);
+    return result;
+  }
+
+  // Do n memtable flushes, each of which produces an sstable
+  // covering the range [small,large].
+  void MakeTables(int cf, int n, const string& small,
+                  const string& large) {
+    for (int i = 0; i < n; i++) {
+      ASSERT_OK(Put(cf, small, "begin"));
+      ASSERT_OK(Put(cf, large, "end"));
+      ASSERT_OK(db_->Flush(FlushOptions(), handles_[cf]));
+    }
+  }
+
   void CopyFile(const string& source, const string& destination,
                 uint64_t size = 0) {
     const EnvOptions soptions;
@@ -111,7 +151,7 @@ class ColumnFamilyTest {
   ColumnFamilyOptions column_family_options_;
   DBOptions db_options_;
   string dbname_;
-  DB* db_;
+  DB* db_ = nullptr;
   Env* env_;
 };
 
@@ -274,6 +314,52 @@ TEST(ColumnFamilyTest, FlushTest) {
   Close();
 }
 
+// This is the same as DBTest::ManualCompaction, but it does all
+// operations on non-default column family
+TEST(ColumnFamilyTest, ManualCompaction) {
+  // iter - 0 with 3 levels
+  // iter - 1 with 7 levels
+  int cf = 1;
+  for (int iter = 0; iter < 2; ++iter) {
+    column_family_options_.num_levels = (iter == 0) ? 3 : 7;
+    Destroy();
+    ASSERT_OK(Open({"default"}));
+    CreateColumnFamilies({"one"});
+    Close();
+    ASSERT_OK(Open({"default", "one"}));
+
+    MakeTables(cf, 3, "p", "q");
+    ASSERT_EQ("1,1,1", FilesPerLevel(cf));
+
+    // Compaction range falls before files
+    Compact(cf, "", "c");
+    ASSERT_EQ("1,1,1", FilesPerLevel(cf));
+
+    // Compaction range falls after files
+    Compact(cf, "r", "z");
+    ASSERT_EQ("1,1,1", FilesPerLevel(cf));
+
+    // Compaction range overlaps files
+    Compact(cf, "p1", "p9");
+    ASSERT_EQ("0,0,1", FilesPerLevel(cf));
+
+    // Populate a different range
+    MakeTables(cf, 3, "c", "e");
+    ASSERT_EQ("1,1,2", FilesPerLevel(cf));
+
+    // Compact just the new range
+    Compact(cf, "b", "f");
+    ASSERT_EQ("0,0,2", FilesPerLevel(cf));
+
+    // Compact all
+    MakeTables(cf, 1, "a", "z");
+    ASSERT_EQ("0,1,2", FilesPerLevel(cf));
+    Compact(cf, "", "zzz");
+    ASSERT_EQ("0,0,1", FilesPerLevel(cf));
+  }
+  Close();
+}
+
 } // namespace rocksdb
diff --git a/db/compaction.cc b/db/compaction.cc
index 536b7e233..d045a83d1 100644
--- a/db/compaction.cc
+++ b/db/compaction.cc
@@ -8,6 +8,7 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #include "db/compaction.h"
+#include "db/column_family.h"
 
 namespace rocksdb {
 
@@ -29,6 +30,7 @@ Compaction::Compaction(Version* input_version, int level, int out_level,
       max_grandparent_overlap_bytes_(max_grandparent_overlap_bytes),
       input_version_(input_version),
       number_levels_(input_version_->NumberLevels()),
+      cfd_(input_version_->cfd_),
       seek_compaction_(seek_compaction),
      enable_compression_(enable_compression),
       grandparent_index_(0),
@@ -43,6 +45,7 @@ Compaction::Compaction(Version* input_version, int level, int out_level,
 
   input_version_->Ref();
   edit_ = new VersionEdit();
+  edit_->SetColumnFamily(cfd_->GetID());
   for (int i = 0; i < number_levels_; i++) {
     level_ptrs_[i] = 0;
   }
@@ -170,6 +173,10 @@ void Compaction::ReleaseInputs() {
   }
 }
 
+void Compaction::ReleaseCompactionFiles(Status status) {
+  cfd_->compaction_picker()->ReleaseCompactionFiles(this, status);
+}
+
 void Compaction::ResetNextCompactionIndex() {
   input_version_->ResetNextCompactionIndex(level_);
 }
diff --git a/db/compaction.h b/db/compaction.h
index efd6ef71f..f92dc1db0 100644
--- a/db/compaction.h
+++ b/db/compaction.h
@@ -13,6 +13,7 @@
 namespace rocksdb {
 
 class Version;
+class ColumnFamilyData;
 
 // A Compaction encapsulates information about a compaction.
 class Compaction {
@@ -36,6 +37,8 @@ class Compaction {
   // Returns input version of the compaction
   Version* input_version() const { return input_version_; }
 
+  ColumnFamilyData* column_family_data() const { return cfd_; }
+
   // Return the ith input file at "level()+which" ("which" must be 0 or 1).
   FileMetaData* input(int which, int i) const { return inputs_[which][i]; }
 
@@ -67,6 +70,10 @@ class Compaction {
   // is successful.
   void ReleaseInputs();
 
+  // Clear all files to indicate that they are not being compacted
+  // Delete this compaction from the list of running compactions.
+  void ReleaseCompactionFiles(Status status);
+
   void Summary(char* output, int len);
 
   // Return the score that was used to pick this compaction run.
@@ -94,6 +101,7 @@ class Compaction {
   Version* input_version_;
   VersionEdit* edit_;
   int number_levels_;
+  ColumnFamilyData* cfd_;
   bool seek_compaction_;
   bool enable_compression_;
 
diff --git a/db/db_impl.cc b/db/db_impl.cc
index d4148d34e..63b7e63d6 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -1264,7 +1264,14 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
 Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family,
                             const Slice* begin, const Slice* end,
                             bool reduce_level, int target_level) {
-  Status s = FlushMemTable(default_cfd_, FlushOptions());
+  mutex_.Lock();
+  auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
+  mutex_.Unlock();
+  // this is asserting because client calling DB methods with undefined
+  // ColumnFamilyHandle is undefined behavior.
+  assert(cfd != nullptr);
+
+  Status s = FlushMemTable(cfd, FlushOptions());
   if (!s.ok()) {
     LogFlush(options_.info_log);
     return s;
   }
@@ -1273,8 +1280,8 @@ Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family,
   int max_level_with_files = 1;
   {
     MutexLock l(&mutex_);
-    Version* base = default_cfd_->current();
-    for (int level = 1; level < NumberLevels(); level++) {
+    Version* base = cfd->current();
+    for (int level = 1; level < cfd->NumberLevels(); level++) {
       if (base->OverlapInLevel(level, begin, end)) {
         max_level_with_files = level;
       }
@@ -1285,9 +1292,9 @@ Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family,
     // bottom-most level, the output level will be the same as input one
     if (options_.compaction_style == kCompactionStyleUniversal ||
         level == max_level_with_files) {
-      s = RunManualCompaction(level, level, begin, end);
+      s = RunManualCompaction(cfd, level, level, begin, end);
     } else {
-      s = RunManualCompaction(level, level + 1, begin, end);
+      s = RunManualCompaction(cfd, level, level + 1, begin, end);
     }
     if (!s.ok()) {
       LogFlush(options_.info_log);
@@ -1296,7 +1303,7 @@ Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family,
   }
 
   if (reduce_level) {
-    s = ReFitLevel(max_level_with_files, target_level);
+    s = ReFitLevel(cfd, max_level_with_files, target_level);
   }
   LogFlush(options_.info_log);
@@ -1304,15 +1311,15 @@ Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family,
 }
 
 // return the same level if it cannot be moved
-int DBImpl::FindMinimumEmptyLevelFitting(int level) {
+int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, int level) {
   mutex_.AssertHeld();
-  Version* current = default_cfd_->current();
+  Version* current = cfd->current();
   int minimum_level = level;
   for (int i = level - 1; i > 0; --i) {
     // stop if level i is not empty
     if (current->NumLevelFiles(i) > 0) break;
 
     // stop if level i is too small (cannot fit the level files)
-    if (default_cfd_->compaction_picker()->MaxBytesForLevel(i) <
+    if (cfd->compaction_picker()->MaxBytesForLevel(i) <
         current->NumLevelBytes(level)) {
       break;
     }
@@ -1322,8 +1329,8 @@ int DBImpl::FindMinimumEmptyLevelFitting(int level) {
   return minimum_level;
 }
 
-Status DBImpl::ReFitLevel(int level, int target_level) {
-  assert(level < NumberLevels());
+Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
+  assert(level < cfd->NumberLevels());
 
   SuperVersion* superversion_to_free = nullptr;
   SuperVersion* new_superversion = new SuperVersion();
@@ -1351,7 +1358,7 @@ Status DBImpl::ReFitLevel(int level, int target_level) {
   // move to a smaller level
   int to_level = target_level;
   if (target_level < 0) {
-    to_level = FindMinimumEmptyLevelFitting(level);
+    to_level = FindMinimumEmptyLevelFitting(cfd, level);
   }
 
   assert(to_level <= level);
@@ -1359,10 +1366,11 @@ Status DBImpl::ReFitLevel(int level, int target_level) {
   Status status;
   if (to_level < level) {
     Log(options_.info_log, "Before refitting:\n%s",
-        default_cfd_->current()->DebugString().data());
+        cfd->current()->DebugString().data());
 
     VersionEdit edit;
-    for (const auto& f : default_cfd_->current()->files_[level]) {
+    edit.SetColumnFamily(cfd->GetID());
+    for (const auto& f : cfd->current()->files_[level]) {
       edit.DeleteFile(level, f->number);
       edit.AddFile(to_level, f->number, f->file_size, f->smallest, f->largest,
                    f->smallest_seqno, f->largest_seqno);
@@ -1370,16 +1378,15 @@ Status DBImpl::ReFitLevel(int level, int target_level) {
     Log(options_.info_log, "Apply version edit:\n%s",
         edit.DebugString().data());
 
-    status = versions_->LogAndApply(default_cfd_, &edit, &mutex_,
-                                    db_directory_.get());
-    superversion_to_free = default_cfd_->InstallSuperVersion(new_superversion);
+    status = versions_->LogAndApply(cfd, &edit, &mutex_, db_directory_.get());
+    superversion_to_free = cfd->InstallSuperVersion(new_superversion);
     new_superversion = nullptr;
 
     Log(options_.info_log, "LogAndApply: %s\n", status.ToString().data());
 
     if (status.ok()) {
       Log(options_.info_log, "After refitting:\n%s",
-          default_cfd_->current()->DebugString().data());
+          cfd->current()->DebugString().data());
     }
   }
@@ -1607,15 +1614,15 @@ Status DBImpl::AppendSortedWalsOfType(const std::string& path,
   return status;
 }
 
-Status DBImpl::RunManualCompaction(int input_level,
-                                   int output_level,
-                                   const Slice* begin,
+Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
+                                   int output_level, const Slice* begin,
                                    const Slice* end) {
   assert(input_level >= 0);
 
   InternalKey begin_storage, end_storage;
 
   ManualCompaction manual;
+  manual.cfd = cfd;
   manual.input_level = input_level;
   manual.output_level = output_level;
   manual.done = false;
@@ -1630,7 +1637,7 @@ Status DBImpl::RunManualCompaction(int input_level,
     manual.begin = &begin_storage;
   }
   if (end == nullptr ||
-      options_.compaction_style == kCompactionStyleUniversal) {
+      cfd->options()->compaction_style == kCompactionStyleUniversal) {
     manual.end = nullptr;
   } else {
     end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
@@ -1686,7 +1693,7 @@ Status DBImpl::TEST_CompactRange(int level,
   int output_level = (options_.compaction_style == kCompactionStyleUniversal)
                          ? level
                          : level + 1;
-  return RunManualCompaction(level, output_level, begin, end);
+  return RunManualCompaction(default_cfd_, level, output_level, begin, end);
 }
 
 Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
@@ -1946,8 +1953,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
   if (is_manual) {
     ManualCompaction* m = manual_compaction_;
     assert(m->in_progress);
-    c.reset(default_cfd_->CompactRange(m->input_level, m->output_level,
-                                       m->begin, m->end, &manual_end));
+    c.reset(m->cfd->CompactRange(m->input_level, m->output_level, m->begin,
+                                 m->end, &manual_end));
     if (!c) {
       m->done = true;
     }
@@ -1962,7 +1969,12 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
              ? "(end)"
              : manual_end->DebugString().c_str()));
   } else if (!options_.disable_auto_compactions) {
-    c.reset(default_cfd_->PickCompaction());
+    for (auto cfd : *versions_->GetColumnFamilySet()) {
+      c.reset(cfd->PickCompaction());
+      if (c != nullptr) {
+        break;
+      }
+    }
   }
 
   Status status;
@@ -1977,23 +1989,23 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
     c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
                        f->largest, f->smallest_seqno, f->largest_seqno);
-    status = versions_->LogAndApply(default_cfd_, c->edit(), &mutex_,
+    status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_,
                                     db_directory_.get());
-    InstallSuperVersion(default_cfd_, deletion_state);
+    InstallSuperVersion(c->column_family_data(), deletion_state);
 
     Version::LevelSummaryStorage tmp;
     Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
         static_cast<unsigned long long>(f->number), c->level() + 1,
         static_cast<unsigned long long>(f->file_size),
-        status.ToString().c_str(), default_cfd_->current()->LevelSummary(&tmp));
-    default_cfd_->compaction_picker()->ReleaseCompactionFiles(c.get(), status);
+        status.ToString().c_str(), c->input_version()->LevelSummary(&tmp));
+    c->ReleaseCompactionFiles(status);
     *madeProgress = true;
   } else {
     MaybeScheduleFlushOrCompaction(); // do more compaction work in parallel.
     CompactionState* compact = new CompactionState(c.get());
     status = DoCompactionWork(compact, deletion_state);
     CleanupCompaction(compact, status);
-    default_cfd_->compaction_picker()->ReleaseCompactionFiles(c.get(), status);
+    c->ReleaseCompactionFiles(status);
     c->ReleaseInputs();
     *madeProgress = true;
   }
@@ -2123,8 +2135,9 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
   if (s.ok()) {
     // Over-estimate slightly so we don't end up just barely crossing
     // the threshold.
+    ColumnFamilyData* cfd = compact->compaction->column_family_data();
     compact->outfile->SetPreallocationBlockSize(
-        1.1 * default_cfd_->compaction_picker()->MaxFileSizeForLevel(
+        1.1 * cfd->compaction_picker()->MaxFileSizeForLevel(
                   compact->compaction->output_level()));
 
     CompressionType compression_type = GetCompressionType(
@@ -2228,8 +2241,9 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
         compact->compaction->output_level(), out.number, out.file_size,
         out.smallest, out.largest, out.smallest_seqno, out.largest_seqno);
   }
-  return versions_->LogAndApply(default_cfd_, compact->compaction->edit(),
-                                &mutex_, db_directory_.get());
+  return versions_->LogAndApply(compact->compaction->column_family_data(),
+                                compact->compaction->edit(), &mutex_,
+                                db_directory_.get());
 }
 
 //
@@ -2264,13 +2278,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
                                 DeletionState& deletion_state) {
   assert(compact);
   int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
+  ColumnFamilyData* cfd = compact->compaction->column_family_data();
   Log(options_.info_log,
-      "Compacting %d@%d + %d@%d files, score %.2f slots available %d",
-      compact->compaction->num_input_files(0),
-      compact->compaction->level(),
-      compact->compaction->num_input_files(1),
-      compact->compaction->output_level(),
-      compact->compaction->score(),
+      "[CF %u] Compacting %d@%d + %d@%d files, score %.2f slots available %d",
+      cfd->GetID(), compact->compaction->num_input_files(0),
+      compact->compaction->level(), compact->compaction->num_input_files(1),
+      compact->compaction->output_level(), compact->compaction->score(),
       options_.max_background_compactions - bg_compaction_scheduled_);
   char scratch[256];
   compact->compaction->Summary(scratch, sizeof(scratch));
@@ -2321,7 +2334,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
   MergeHelper merge(user_comparator(), options_.merge_operator.get(),
                     options_.info_log.get(),
                     false /* internal key corruption is expected */);
-  auto compaction_filter = options_.compaction_filter;
+  auto compaction_filter = cfd->options()->compaction_filter;
   std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
   if (!compaction_filter) {
     auto context = compact->GetFilterContext();
@@ -2333,12 +2346,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
   for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
     // Prioritize immutable compaction work
     // TODO: remove memtable flush from normal compaction work
-    if (default_cfd_->imm()->imm_flush_needed.NoBarrier_Load() != nullptr) {
+    if (cfd->imm()->imm_flush_needed.NoBarrier_Load() != nullptr) {
      const uint64_t imm_start = env_->NowMicros();
       LogFlush(options_.info_log);
       mutex_.Lock();
-      if (default_cfd_->imm()->IsFlushPending()) {
-        FlushMemTableToOutputFile(default_cfd_, nullptr, deletion_state);
+      if (cfd->imm()->IsFlushPending()) {
+        FlushMemTableToOutputFile(cfd, nullptr, deletion_state);
         bg_cv_.SignalAll();  // Wakeup MakeRoomForWrite() if necessary
       }
       mutex_.Unlock();
@@ -2388,11 +2401,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
         // the entry with a delete marker.
         bool value_changed = false;
         compaction_filter_value.clear();
-        bool to_delete =
-            compaction_filter->Filter(compact->compaction->level(),
-                                      ikey.user_key, value,
-                                      &compaction_filter_value,
-                                      &value_changed);
+        bool to_delete = compaction_filter->Filter(
+            compact->compaction->level(), ikey.user_key, value,
+            &compaction_filter_value, &value_changed);
         if (to_delete) {
           // make a copy of the original key
           delete_key.assign(key.data(), key.data() + key.size());
@@ -2410,7 +2421,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
             value = compaction_filter_value;
           }
         }
-
       }
 
       // If there are no snapshots, then this kv affect visibility at tip.
@@ -2649,7 +2659,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
 
   if (status.ok()) {
     status = InstallCompactionResults(compact);
-    InstallSuperVersion(default_cfd_, deletion_state);
+    InstallSuperVersion(cfd, deletion_state);
   }
   Version::LevelSummaryStorage tmp;
   Log(options_.info_log,
@@ -2847,7 +2857,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
   mutex_.Lock();
   auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
-  // this is asserting because client calling Get() with undefined
+  // this is asserting because client calling DB methods with undefined
   // ColumnFamilyHandle is undefined behavior.
   assert(cfd != nullptr);
   SuperVersion* get_version = cfd->GetSuperVersion()->Ref();
@@ -3538,7 +3548,8 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
                          const Slice& property, std::string* value) {
   value->clear();
   MutexLock l(&mutex_);
-  return internal_stats_.GetProperty(property, value, default_cfd_);
+  auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
+  return internal_stats_.GetProperty(property, value, cfd);
 }
 
 void DBImpl::GetApproximateSizes(const ColumnFamilyHandle& column_family,
diff --git a/db/db_impl.h b/db/db_impl.h
index 69cfba42a..4f8cca1b8 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -129,9 +129,8 @@ class DBImpl : public DB {
 
   virtual Status GetDbIdentity(std::string& identity);
 
-  Status RunManualCompaction(int input_level,
-                             int output_level,
-                             const Slice* begin,
+  Status RunManualCompaction(ColumnFamilyData* cfd, int input_level,
+                             int output_level, const Slice* begin,
                              const Slice* end);
 
   // Extra methods (for testing) that are not in the public DB interface
@@ -361,12 +360,12 @@ class DBImpl : public DB {
 
   // Return the minimum empty level that could hold the total data in the
   // input level. Return the input level, if such level could not be found.
-  int FindMinimumEmptyLevelFitting(int level);
+  int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, int level);
 
   // Move the files in the input level to the target level.
   // If target_level < 0, automatically calculate the minimum level that could
   // hold the data set.
-  Status ReFitLevel(int level, int target_level = -1);
+  Status ReFitLevel(ColumnFamilyData* cfd, int level, int target_level = -1);
 
   // Returns the current SuperVersion number.
   uint64_t CurrentVersionNumber() const;
@@ -428,6 +427,7 @@ class DBImpl : public DB {
 
   // Information for a manual compaction
   struct ManualCompaction {
+    ColumnFamilyData* cfd;
     int input_level;
     int output_level;
     bool done;
diff --git a/db/version_set.cc b/db/version_set.cc
index 7fc1880ea..46f528538 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -241,7 +241,8 @@ bool Version::PrefixMayMatch(const ReadOptions& options,
 Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
                                             const EnvOptions& soptions,
                                             int level) const {
-  Iterator* level_iter = new LevelFileNumIterator(vset_->icmp_, &files_[level]);
+  Iterator* level_iter =
+      new LevelFileNumIterator(cfd_->internal_comparator(), &files_[level]);
   if (options.prefix) {
     InternalKey internal_prefix(*options.prefix, 0, kTypeValue);
     if (!PrefixMayMatch(options, soptions,
@@ -2283,7 +2284,7 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
         // Create concatenating iterator for the files from this level
         list[num++] = NewTwoLevelIterator(
             new Version::LevelFileNumIterator(
-                c->input_version()->cfd_->internal_comparator(),
+                c->column_family_data()->internal_comparator(),
                 c->inputs(which)),
             &GetFileIterator, table_cache_, options, storage_options_,
             true /* for compaction */);
@@ -2291,7 +2292,8 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
     }
   }
   assert(num <= space);
-  Iterator* result = NewMergingIterator(&icmp_, list, num);
+  Iterator* result = NewMergingIterator(
+      &c->column_family_data()->internal_comparator(), list, num);
   delete[] list;
   return result;
 }
@@ -2300,8 +2302,7 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
 // in the current version
 bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
 #ifndef NDEBUG
-  // TODO this only works for default column family now
-  Version* version = column_family_set_->GetDefault()->current();
+  Version* version = c->column_family_data()->current();
   if (c->input_version() != version) {
     Log(options_->info_log, "VerifyCompactionFileConsistency version mismatch");
   }
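
For illustration only, and not part of the patch itself: the sketch below shows how a caller would exercise what this diff enables, compacting a key range in a non-default column family through the handle-based Flush()/CompactRange() calls that ColumnFamilyTest::ManualCompaction uses above. The setup (a DB already opened with the families {"default", "one"}), the pass-by-const-reference ColumnFamilyHandle, and the exact signatures are assumptions taken from the test code in this diff and may differ in other versions of the API.

// Illustrative sketch, assuming `db` was opened with the column families
// {"default", "one"} and `cf` is the handle for "one", mirroring the calls
// made in ColumnFamilyTest::ManualCompaction.
#include "rocksdb/db.h"

rocksdb::Status CompactWholeColumnFamily(rocksdb::DB* db,
                                         const rocksdb::ColumnFamilyHandle& cf) {
  // Flush this family's memtable so the data to compact is in SST files,
  // as the test's MakeTables() helper does before calling Compact().
  rocksdb::Status s = db->Flush(rocksdb::FlushOptions(), cf);
  if (!s.ok()) {
    return s;
  }
  // With this patch the range compaction runs against cf's own LSM tree;
  // before it, the request silently operated on the default column family.
  rocksdb::Slice begin("a");
  rocksdb::Slice end("zzz");
  return db->CompactRange(cf, &begin, &end);
}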