From 6059bdf86a9cedb029be8d14c5b4c8b17bea6dfa Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Fri, 17 Apr 2015 16:44:45 -0700 Subject: [PATCH] Add experimental API MarkForCompaction() Summary: Some Mongo+Rocks datasets in Parse's environment are not doing compactions very frequently. During the quiet period (with no IO), we'd like to schedule compactions so that our reads become faster. Also, aggressively compacting during quiet periods helps when write bursts happen. In addition, we also want to compact files that are containing deleted key ranges (like old oplog keys). All of this is currently not possible with CompactRange() because it's single-threaded and blocks all other compactions from happening. Running CompactRange() risks an issue of blocking writes because we generate too much Level 0 files before the compaction is over. Stopping writes is very dangerous because they hold transaction locks. We tried running manual compaction once on Mongo+Rocks and everything fell apart. MarkForCompaction() solves all of those problems. This is very light-weight manual compaction. It is lower priority than automatic compactions, which means it shouldn't interfere with background process keeping the LSM tree clean. However, if no automatic compactions need to be run (or we have extra background threads available), we will start compacting files that are marked for compaction. 
Test Plan: added a new unit test Reviewers: yhchiang, rven, MarkCallaghan, sdong Reviewed By: sdong Subscribers: yoshinorim, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D37083 --- db/compaction_picker.cc | 37 ++++++++++++++ db/compaction_picker.h | 8 ++++ db/compaction_picker_test.cc | 6 +++ db/db_impl.h | 4 ++ db/db_impl_experimental.cc | 59 +++++++++++++++++++++++ db/db_test.cc | 88 ++++++++++++++++++++++++++++++++++ db/experimental.cc | 38 +++++++++++++++ db/version_edit.h | 6 ++- db/version_set.cc | 20 ++++++-- db/version_set.h | 25 ++++++++-- include/rocksdb/experimental.h | 20 ++++++++ src.mk | 2 + 12 files changed, 305 insertions(+), 8 deletions(-) create mode 100644 db/db_impl_experimental.cc create mode 100644 db/experimental.cc create mode 100644 include/rocksdb/experimental.h diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 953e44b32..d8328ef87 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -667,6 +667,9 @@ Status CompactionPicker::SanitizeCompactionInputFiles( bool LevelCompactionPicker::NeedsCompaction(const VersionStorageInfo* vstorage) const { + if (!vstorage->FilesMarkedForCompaction().empty()) { + return true; + } for (int i = 0; i <= vstorage->MaxInputLevel(); i++) { if (vstorage->CompactionScore(i) >= 1) { return true; @@ -675,6 +678,31 @@ bool LevelCompactionPicker::NeedsCompaction(const VersionStorageInfo* vstorage) return false; } +void LevelCompactionPicker::PickFilesMarkedForCompactionExperimental( + const std::string& cf_name, VersionStorageInfo* vstorage, + CompactionInputFiles* inputs, int* level, int* output_level) { + if (vstorage->FilesMarkedForCompaction().empty()) { + return; + } + + for (auto& level_file : vstorage->FilesMarkedForCompaction()) { + // If it's being compacted, it has nothing to do here. 
+ // If this assert() fails that means that some function marked some + // files as being_compacted, but didn't call ComputeCompactionScore() + assert(!level_file.second->being_compacted); + *level = level_file.first; + *output_level = (*level == 0) ? vstorage->base_level() : *level + 1; + + inputs->files = {level_file.second}; + inputs->level = *level; + if (ExpandWhileOverlapping(cf_name, vstorage, inputs)) { + // found the compaction! + return; + } + } + inputs->files.clear(); +} + Compaction* LevelCompactionPicker::PickCompaction( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, LogBuffer* log_buffer) { @@ -697,10 +725,19 @@ Compaction* LevelCompactionPicker::PickCompaction( ExpandWhileOverlapping(cf_name, vstorage, &inputs)) { // found the compaction! break; + } else { + // didn't find the compaction, clear the inputs + inputs.clear(); } } } + // if we didn't find a compaction, check if there are any files marked for + // compaction + if (inputs.empty()) { + PickFilesMarkedForCompactionExperimental(cf_name, vstorage, &inputs, &level, + &output_level); + } if (inputs.empty()) { return nullptr; } diff --git a/db/compaction_picker.h b/db/compaction_picker.h index db48802ae..c1e9e712a 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -197,6 +197,14 @@ class LevelCompactionPicker : public CompactionPicker { bool PickCompactionBySize(VersionStorageInfo* vstorage, int level, int output_level, CompactionInputFiles* inputs, int* parent_index, int* base_index); + + // If there is any file marked for compaction, put it into inputs. + // This is still experimental. 
It will return meaningful results only if + // clients call experimental feature SuggestCompactRange() + void PickFilesMarkedForCompactionExperimental(const std::string& cf_name, + VersionStorageInfo* vstorage, + CompactionInputFiles* inputs, + int* level, int* output_level); }; #ifndef ROCKSDB_LITE diff --git a/db/compaction_picker_test.cc b/db/compaction_picker_test.cc index d3789dcfb..d2a4d5fd0 100644 --- a/db/compaction_picker_test.cc +++ b/db/compaction_picker_test.cc @@ -350,13 +350,16 @@ TEST_F(CompactionPickerTest, NeedsCompactionUniversal) { // must return false when there's no files. ASSERT_EQ(universal_compaction_picker.NeedsCompaction(vstorage_.get()), false); + UpdateVersionStorageInfo(); // verify the trigger given different number of L0 files. for (int i = 1; i <= mutable_cf_options_.level0_file_num_compaction_trigger * 2; ++i) { + NewVersionStorage(1, kCompactionStyleUniversal); Add(0, i, ToString((i + 100) * 1000).c_str(), ToString((i + 100) * 1000 + 999).c_str(), 1000000, 0, i * 100, i * 100 + 99); + UpdateVersionStorageInfo(); ASSERT_EQ(level_compaction_picker.NeedsCompaction(vstorage_.get()), vstorage_->CompactionScore(0) >= 1); } @@ -373,6 +376,7 @@ TEST_F(CompactionPickerTest, NeedsCompactionFIFO) { ioptions_.compaction_options_fifo = fifo_options_; FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_); + UpdateVersionStorageInfo(); // must return false when there's no files. ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), false); @@ -380,10 +384,12 @@ TEST_F(CompactionPickerTest, NeedsCompactionFIFO) { // size of L0 files. 
uint64_t current_size = 0; for (int i = 1; i <= kFileCount; ++i) { + NewVersionStorage(1, kCompactionStyleFIFO); Add(0, i, ToString((i + 100) * 1000).c_str(), ToString((i + 100) * 1000 + 999).c_str(), kFileSize, 0, i * 100, i * 100 + 99); current_size += kFileSize; + UpdateVersionStorageInfo(); ASSERT_EQ(level_compaction_picker.NeedsCompaction(vstorage_.get()), vstorage_->CompactionScore(0) >= 1); } diff --git a/db/db_impl.h b/db/db_impl.h index 617f631f8..e1354731e 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -183,6 +183,10 @@ class DBImpl : public DB { ColumnFamilyHandle* column_family, ColumnFamilyMetaData* metadata) override; + // experimental API + Status SuggestCompactRange(ColumnFamilyHandle* column_family, + const Slice* begin, const Slice* end); + #endif // ROCKSDB_LITE // checks if all live files exist on file system and that their file sizes diff --git a/db/db_impl_experimental.cc b/db/db_impl_experimental.cc new file mode 100644 index 000000000..8d4b176eb --- /dev/null +++ b/db/db_impl_experimental.cc @@ -0,0 +1,59 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +#include "db/db_impl.h" + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include <inttypes.h> + +#include "db/column_family.h" +#include "db/version_set.h" +#include "rocksdb/status.h" + +namespace rocksdb { + +#ifndef ROCKSDB_LITE +Status DBImpl::SuggestCompactRange(ColumnFamilyHandle* column_family, + const Slice* begin, const Slice* end) { + auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); + auto cfd = cfh->cfd(); + InternalKey start_key, end_key; + if (begin != nullptr) { + start_key = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek); + } + if (end != nullptr) { + end_key = InternalKey(*end, 0, static_cast<ValueType>(0)); + } + { + InstrumentedMutexLock l(&mutex_); + auto vstorage = cfd->current()->storage_info(); + for (int level = 0; level < vstorage->num_non_empty_levels(); ++level) { + std::vector<FileMetaData*> inputs; + vstorage->GetOverlappingInputs( + level, begin == nullptr ? nullptr : &start_key, + end == nullptr ? nullptr : &end_key, &inputs); + for (auto f : inputs) { + f->marked_for_compaction = true; + } + } + // Since we have some more files to compact, we should also recompute + // compaction score + vstorage->ComputeCompactionScore(*cfd->GetLatestMutableCFOptions(), + CompactionOptionsFIFO()); + SchedulePendingCompaction(cfd); + MaybeScheduleFlushOrCompaction(); + } + return Status::OK(); +} +#endif // ROCKSDB_LITE + +} // namespace rocksdb diff --git a/db/db_test.cc b/db/db_test.cc index 555883bd1..a1473d954 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -27,6 +27,7 @@ #include "rocksdb/compaction_filter.h" #include "rocksdb/db.h" #include "rocksdb/env.h" +#include "rocksdb/experimental.h" #include "rocksdb/filter_policy.h" #include "rocksdb/perf_context.h" #include "rocksdb/slice.h" @@ -12491,6 +12492,93 @@ TEST_F(DBTest, CompressLevelCompaction) { Destroy(options); } +TEST_F(DBTest, SuggestCompactRangeTest) { + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleLevel; + options.write_buffer_size = 100 << 10; // 
100KB + options.level0_file_num_compaction_trigger = 2; + options.num_levels = 3; + options.max_bytes_for_level_base = 400 * 1024; + + Reopen(options); + + Random rnd(301); + int key_idx = 0; + + // First three 110KB files are going to level 0 + // After that, (100K, 200K) + for (int num = 0; num < 3; num++) { + GenerateNewFile(&rnd, &key_idx); + } + + // Another 110KB triggers a compaction to 400K file to fill up level 0 + GenerateNewFile(&rnd, &key_idx); + ASSERT_EQ(4, GetSstFileCount(dbname_)); + + // (1, 4) + GenerateNewFile(&rnd, &key_idx); + ASSERT_EQ("1,4", FilesPerLevel(0)); + + // (1, 4, 1) + GenerateNewFile(&rnd, &key_idx); + ASSERT_EQ("1,4,1", FilesPerLevel(0)); + + // (1, 4, 2) + GenerateNewFile(&rnd, &key_idx); + ASSERT_EQ("1,4,2", FilesPerLevel(0)); + + // (1, 4, 3) + GenerateNewFile(&rnd, &key_idx); + ASSERT_EQ("1,4,3", FilesPerLevel(0)); + + // (1, 4, 4) + GenerateNewFile(&rnd, &key_idx); + ASSERT_EQ("1,4,4", FilesPerLevel(0)); + + // (1, 4, 5) + GenerateNewFile(&rnd, &key_idx); + ASSERT_EQ("1,4,5", FilesPerLevel(0)); + + // (1, 4, 6) + GenerateNewFile(&rnd, &key_idx); + ASSERT_EQ("1,4,6", FilesPerLevel(0)); + + // (1, 4, 7) + GenerateNewFile(&rnd, &key_idx); + ASSERT_EQ("1,4,7", FilesPerLevel(0)); + + // (1, 4, 8) + GenerateNewFile(&rnd, &key_idx); + ASSERT_EQ("1,4,8", FilesPerLevel(0)); + + // compact it three times + for (int i = 0; i < 3; ++i) { + ASSERT_OK(experimental::SuggestCompactRange(db_, nullptr, nullptr)); + dbfull()->TEST_WaitForCompact(); + } + + ASSERT_EQ("0,0,13", FilesPerLevel(0)); + + GenerateNewFile(&rnd, &key_idx, false); + ASSERT_EQ("1,0,13", FilesPerLevel(0)); + + // nonoverlapping with the file on level 0 + Slice start("a"), end("b"); + ASSERT_OK(experimental::SuggestCompactRange(db_, &start, &end)); + dbfull()->TEST_WaitForCompact(); + + // should not compact the level 0 file + ASSERT_EQ("1,0,13", FilesPerLevel(0)); + + start = Slice("j"); + end = Slice("m"); + ASSERT_OK(experimental::SuggestCompactRange(db_, &start, &end)); 
+ dbfull()->TEST_WaitForCompact(); + + // now it should compact the level 0 file + ASSERT_EQ("0,1,13", FilesPerLevel(0)); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/experimental.cc b/db/experimental.cc new file mode 100644 index 000000000..0056d0a57 --- /dev/null +++ b/db/experimental.cc @@ -0,0 +1,38 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "rocksdb/experimental.h" + +#include "db/db_impl.h" + +namespace rocksdb { +namespace experimental { + +#ifndef ROCKSDB_LITE + +Status SuggestCompactRange(DB* db, ColumnFamilyHandle* column_family, + const Slice* begin, const Slice* end) { + auto dbimpl = dynamic_cast<DBImpl*>(db); + if (dbimpl == nullptr) { + return Status::NotSupported("Didn't recognize DB object"); + } + return dbimpl->SuggestCompactRange(column_family, begin, end); +} + +#else // ROCKSDB_LITE + +Status SuggestCompactRange(DB* db, ColumnFamilyHandle* column_family, + const Slice* begin, const Slice* end) { + return Status::NotSupported("Not supported in RocksDB LITE"); +} + +#endif // ROCKSDB_LITE + +Status SuggestCompactRange(DB* db, const Slice* begin, const Slice* end) { + return SuggestCompactRange(db, db->DefaultColumnFamily(), begin, end); +} + +} // namespace experimental +} // namespace rocksdb diff --git a/db/version_edit.h b/db/version_edit.h index 004855ff9..6da4f5b02 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -87,6 +87,9 @@ struct FileMetaData { bool init_stats_from_file; // true if the data-entry stats of this file // has initialized from file. + bool marked_for_compaction; // True if client asked us nicely to compact this + // file. 
+ FileMetaData() : refs(0), being_compacted(false), @@ -96,7 +99,8 @@ struct FileMetaData { num_deletions(0), raw_key_size(0), raw_value_size(0), - init_stats_from_file(false) {} + init_stats_from_file(false), + marked_for_compaction(false) {} }; // A compressed copy of file meta data that just contain diff --git a/db/version_set.cc b/db/version_set.cc index 05562ce93..55950e0b4 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -998,9 +998,9 @@ void VersionStorageInfo::ComputeCompactionScore( // overwrites/deletions). int num_sorted_runs = 0; uint64_t total_size = 0; - for (unsigned int i = 0; i < files_[level].size(); i++) { - if (!files_[level][i]->being_compacted) { - total_size += files_[level][i]->compensated_file_size; + for (auto* f : files_[level]) { + if (!f->being_compacted) { + total_size += f->compensated_file_size; num_sorted_runs++; } } @@ -1033,7 +1033,7 @@ void VersionStorageInfo::ComputeCompactionScore( // Compute the ratio of current size to size limit. uint64_t level_bytes_no_compacting = 0; for (auto f : files_[level]) { - if (f && f->being_compacted == false) { + if (!f->being_compacted) { level_bytes_no_compacting += f->compensated_file_size; } } @@ -1066,6 +1066,18 @@ void VersionStorageInfo::ComputeCompactionScore( } } } + ComputeFilesMarkedForCompaction(); +} + +void VersionStorageInfo::ComputeFilesMarkedForCompaction() { + files_marked_for_compaction_.clear(); + for (int level = 0; level <= MaxInputLevel(); level++) { + for (auto* f : files_[level]) { + if (!f->being_compacted && f->marked_for_compaction) { + files_marked_for_compaction_.emplace_back(level, f); + } + } + } } namespace { diff --git a/db/version_set.h b/db/version_set.h index 1a1d67be0..267a8ba34 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -18,13 +18,15 @@ // synchronization on all accesses. 
#pragma once +#include <atomic> +#include <deque> +#include <limits> #include <map> #include <memory> #include <set> +#include <string> #include <vector> -#include <deque> -#include <atomic> -#include <limits> + #include "db/dbformat.h" #include "db/version_builder.h" #include "db/version_edit.h" @@ -119,6 +121,10 @@ class VersionStorageInfo { const MutableCFOptions& mutable_cf_options, const CompactionOptionsFIFO& compaction_options_fifo); + // This computes files_marked_for_compaction_ and is called by + // ComputeCompactionScore() + void ComputeFilesMarkedForCompaction(); + // Generate level_files_brief_ from files_ void GenerateLevelFilesBrief(); // Sort all files for this version based on their file size and @@ -222,6 +228,14 @@ class VersionStorageInfo { return files_by_size_[level]; } + // REQUIRES: This version has been saved (see VersionSet::SaveTo) + // REQUIRES: DB mutex held during access + const autovector<std::pair<int, FileMetaData*>>& FilesMarkedForCompaction() + const { + assert(finalized_); + return files_marked_for_compaction_; + } + int base_level() const { return base_level_; } // REQUIRES: lock is held @@ -340,6 +354,11 @@ class VersionStorageInfo { // seconds/minutes (because of concurrent compactions). static const size_t number_of_files_to_sort_ = 50; + // This vector contains list of files marked for compaction and also not + // currently being compacted. It is protected by DB mutex. It is calculated in + // ComputeCompactionScore() + autovector<std::pair<int, FileMetaData*>> files_marked_for_compaction_; + // Level that should be compacted next and its compaction score. // Score < 1 means compaction is not strictly needed. These fields // are initialized by Finalize(). diff --git a/include/rocksdb/experimental.h b/include/rocksdb/experimental.h new file mode 100644 index 000000000..35e7c240d --- /dev/null +++ b/include/rocksdb/experimental.h @@ -0,0 +1,20 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. 
An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once + +#include "rocksdb/db.h" +#include "rocksdb/status.h" + +namespace rocksdb { +namespace experimental { + +// Supported only for Leveled compaction +Status SuggestCompactRange(DB* db, ColumnFamilyHandle* column_family, + const Slice* begin, const Slice* end); +Status SuggestCompactRange(DB* db, const Slice* begin, const Slice* end); + +} // namespace experimental +} // namespace rocksdb diff --git a/src.mk b/src.mk index 3fba1e95c..f4410e87b 100644 --- a/src.mk +++ b/src.mk @@ -11,7 +11,9 @@ LIB_SOURCES = \ db/db_impl.cc \ db/db_impl_debug.cc \ db/db_impl_readonly.cc \ + db/db_impl_experimental.cc \ db/db_iter.cc \ + db/experimental.cc \ db/file_indexer.cc \ db/filename.cc \ db/flush_job.cc \