Add experimental API MarkForCompaction()

Summary:
Some Mongo+Rocks datasets in Parse's environment are not compacting very frequently. During quiet periods (with no IO), we'd like to schedule compactions so that our reads become faster; compacting aggressively during quiet periods also helps when write bursts happen. In addition, we want to compact files that contain deleted key ranges (like old oplog keys).

None of this is currently possible with CompactRange(), because it is single-threaded and blocks all other compactions from happening. Running CompactRange() also risks stalling writes, because we can generate too many level-0 files before the compaction is over. Stalling writes is very dangerous because stalled writers hold transaction locks. We tried running a manual compaction once on Mongo+Rocks and everything fell apart.
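For reference, this is the pattern we are trying to avoid: a blocking, whole-range manual compaction. A minimal sketch, assuming the begin/end-pointer overload of DB::CompactRange():

  // Blocking manual compaction over the entire key space. While this runs,
  // other compactions for the column family cannot make progress, and
  // level-0 can fill up enough to stall writes.
  rocksdb::Status s = db->CompactRange(nullptr, nullptr);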

MarkForCompaction() solves all of those problems. It is a very lightweight form of manual compaction: it runs at lower priority than automatic compactions, so it shouldn't interfere with the background process that keeps the LSM tree clean. However, if no automatic compactions need to run (or extra background threads are available), we start compacting the files that are marked for compaction.
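A minimal usage sketch of the new API, based on the experimental::SuggestCompactRange() declarations added in include/rocksdb/experimental.h (the key names are made up for illustration):

  #include "rocksdb/experimental.h"

  // Hint that everything between "begin" and "end" in the default column
  // family should eventually be compacted. This only marks the overlapping
  // files and returns immediately; the actual compactions happen in the
  // background, at lower priority than automatic compactions.
  rocksdb::Slice begin("oplog.0000"), end("oplog.9999");
  rocksdb::Status s =
      rocksdb::experimental::SuggestCompactRange(db, &begin, &end);

  // Passing nullptr for both bounds marks every file in the column family.
  s = rocksdb::experimental::SuggestCompactRange(db, nullptr, nullptr);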

Test Plan: added a new unit test

Reviewers: yhchiang, rven, MarkCallaghan, sdong

Reviewed By: sdong

Subscribers: yoshinorim, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D37083
Commit 6059bdf86a (parent acf8a4141d) by Igor Canadi, on branch main
Files changed:
  1. db/compaction_picker.cc (37)
  2. db/compaction_picker.h (8)
  3. db/compaction_picker_test.cc (6)
  4. db/db_impl.h (4)
  5. db/db_impl_experimental.cc (59)
  6. db/db_test.cc (88)
  7. db/experimental.cc (38)
  8. db/version_edit.h (6)
  9. db/version_set.cc (20)
  10. db/version_set.h (25)
  11. include/rocksdb/experimental.h (20)
  12. src.mk (2)

@@ -667,6 +667,9 @@ Status CompactionPicker::SanitizeCompactionInputFiles(
bool LevelCompactionPicker::NeedsCompaction(const VersionStorageInfo* vstorage)
const {
if (!vstorage->FilesMarkedForCompaction().empty()) {
return true;
}
for (int i = 0; i <= vstorage->MaxInputLevel(); i++) {
if (vstorage->CompactionScore(i) >= 1) {
return true;
@@ -675,6 +678,31 @@ bool LevelCompactionPicker::NeedsCompaction(const VersionStorageInfo* vstorage)
return false;
}
void LevelCompactionPicker::PickFilesMarkedForCompactionExperimental(
const std::string& cf_name, VersionStorageInfo* vstorage,
CompactionInputFiles* inputs, int* level, int* output_level) {
if (vstorage->FilesMarkedForCompaction().empty()) {
return;
}
for (auto& level_file : vstorage->FilesMarkedForCompaction()) {
// If it's already being compacted, it has nothing to do here.
// If this assert() fails, it means that some function marked some
// files as being_compacted but didn't call ComputeCompactionScore()
assert(!level_file.second->being_compacted);
*level = level_file.first;
*output_level = (*level == 0) ? vstorage->base_level() : *level + 1;
inputs->files = {level_file.second};
inputs->level = *level;
if (ExpandWhileOverlapping(cf_name, vstorage, inputs)) {
// found the compaction!
return;
}
}
inputs->files.clear();
}
Compaction* LevelCompactionPicker::PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
@@ -697,10 +725,19 @@ Compaction* LevelCompactionPicker::PickCompaction(
ExpandWhileOverlapping(cf_name, vstorage, &inputs)) {
// found the compaction!
break;
} else {
// didn't find the compaction, clear the inputs
inputs.clear();
}
}
}
// if we didn't find a compaction, check if there are any files marked for
// compaction
if (inputs.empty()) {
PickFilesMarkedForCompactionExperimental(cf_name, vstorage, &inputs, &level,
&output_level);
}
if (inputs.empty()) {
return nullptr;
}

@@ -197,6 +197,14 @@ class LevelCompactionPicker : public CompactionPicker {
bool PickCompactionBySize(VersionStorageInfo* vstorage, int level,
int output_level, CompactionInputFiles* inputs,
int* parent_index, int* base_index);
// If there is any file marked for compaction, put it into inputs.
// This is still experimental. It will return meaningful results only if
// clients call the experimental SuggestCompactRange() API.
void PickFilesMarkedForCompactionExperimental(const std::string& cf_name,
VersionStorageInfo* vstorage,
CompactionInputFiles* inputs,
int* level, int* output_level);
};
#ifndef ROCKSDB_LITE

@@ -350,13 +350,16 @@ TEST_F(CompactionPickerTest, NeedsCompactionUniversal) {
// must return false when there are no files.
ASSERT_EQ(universal_compaction_picker.NeedsCompaction(vstorage_.get()),
false);
UpdateVersionStorageInfo();
// verify the trigger given different number of L0 files.
for (int i = 1;
i <= mutable_cf_options_.level0_file_num_compaction_trigger * 2; ++i) {
NewVersionStorage(1, kCompactionStyleUniversal);
Add(0, i, ToString((i + 100) * 1000).c_str(),
ToString((i + 100) * 1000 + 999).c_str(), 1000000, 0, i * 100,
i * 100 + 99);
UpdateVersionStorageInfo();
ASSERT_EQ(level_compaction_picker.NeedsCompaction(vstorage_.get()),
vstorage_->CompactionScore(0) >= 1);
}
@@ -373,6 +376,7 @@ TEST_F(CompactionPickerTest, NeedsCompactionFIFO) {
ioptions_.compaction_options_fifo = fifo_options_;
FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_);
UpdateVersionStorageInfo();
// must return false when there are no files.
ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), false);
@@ -380,10 +384,12 @@ TEST_F(CompactionPickerTest, NeedsCompactionFIFO) {
// size of L0 files.
uint64_t current_size = 0;
for (int i = 1; i <= kFileCount; ++i) {
NewVersionStorage(1, kCompactionStyleFIFO);
Add(0, i, ToString((i + 100) * 1000).c_str(),
ToString((i + 100) * 1000 + 999).c_str(),
kFileSize, 0, i * 100, i * 100 + 99);
current_size += kFileSize;
UpdateVersionStorageInfo();
ASSERT_EQ(level_compaction_picker.NeedsCompaction(vstorage_.get()),
vstorage_->CompactionScore(0) >= 1);
}

@@ -183,6 +183,10 @@ class DBImpl : public DB {
ColumnFamilyHandle* column_family,
ColumnFamilyMetaData* metadata) override;
// experimental API
Status SuggestCompactRange(ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end);
#endif // ROCKSDB_LITE
// checks if all live files exist on file system and that their file sizes

@@ -0,0 +1,59 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <vector>
#include "db/column_family.h"
#include "db/version_set.h"
#include "rocksdb/status.h"
namespace rocksdb {
#ifndef ROCKSDB_LITE
Status DBImpl::SuggestCompactRange(ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
InternalKey start_key, end_key;
if (begin != nullptr) {
start_key = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
}
if (end != nullptr) {
end_key = InternalKey(*end, 0, static_cast<ValueType>(0));
}
{
InstrumentedMutexLock l(&mutex_);
auto vstorage = cfd->current()->storage_info();
for (int level = 0; level < vstorage->num_non_empty_levels(); ++level) {
std::vector<FileMetaData*> inputs;
vstorage->GetOverlappingInputs(
level, begin == nullptr ? nullptr : &start_key,
end == nullptr ? nullptr : &end_key, &inputs);
for (auto f : inputs) {
f->marked_for_compaction = true;
}
}
// Since we have some more files to compact, we should also recompute
// compaction score
vstorage->ComputeCompactionScore(*cfd->GetLatestMutableCFOptions(),
CompactionOptionsFIFO());
SchedulePendingCompaction(cfd);
MaybeScheduleFlushOrCompaction();
}
return Status::OK();
}
#endif // ROCKSDB_LITE
} // namespace rocksdb

@@ -27,6 +27,7 @@
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/experimental.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/slice.h"
@@ -12491,6 +12492,93 @@ TEST_F(DBTest, CompressLevelCompaction) {
Destroy(options);
}
TEST_F(DBTest, SuggestCompactRangeTest) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleLevel;
options.write_buffer_size = 100 << 10; // 100KB
options.level0_file_num_compaction_trigger = 2;
options.num_levels = 3;
options.max_bytes_for_level_base = 400 * 1024;
Reopen(options);
Random rnd(301);
int key_idx = 0;
// First three 110KB files are going to level 0
// After that, (100K, 200K)
for (int num = 0; num < 3; num++) {
GenerateNewFile(&rnd, &key_idx);
}
// Another 110KB triggers a compaction to 400K file to fill up level 0
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(4, GetSstFileCount(dbname_));
// (1, 4)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4", FilesPerLevel(0));
// (1, 4, 1)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,1", FilesPerLevel(0));
// (1, 4, 2)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,2", FilesPerLevel(0));
// (1, 4, 3)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,3", FilesPerLevel(0));
// (1, 4, 4)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,4", FilesPerLevel(0));
// (1, 4, 5)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,5", FilesPerLevel(0));
// (1, 4, 6)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,6", FilesPerLevel(0));
// (1, 4, 7)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,7", FilesPerLevel(0));
// (1, 4, 8)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,8", FilesPerLevel(0));
// compact it three times
for (int i = 0; i < 3; ++i) {
ASSERT_OK(experimental::SuggestCompactRange(db_, nullptr, nullptr));
dbfull()->TEST_WaitForCompact();
}
ASSERT_EQ("0,0,13", FilesPerLevel(0));
GenerateNewFile(&rnd, &key_idx, false);
ASSERT_EQ("1,0,13", FilesPerLevel(0));
// nonoverlapping with the file on level 0
Slice start("a"), end("b");
ASSERT_OK(experimental::SuggestCompactRange(db_, &start, &end));
dbfull()->TEST_WaitForCompact();
// should not compact the level 0 file
ASSERT_EQ("1,0,13", FilesPerLevel(0));
start = Slice("j");
end = Slice("m");
ASSERT_OK(experimental::SuggestCompactRange(db_, &start, &end));
dbfull()->TEST_WaitForCompact();
// now it should compact the level 0 file
ASSERT_EQ("0,1,13", FilesPerLevel(0));
}
} // namespace rocksdb
int main(int argc, char** argv) {

@@ -0,0 +1,38 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "rocksdb/experimental.h"
#include "db/db_impl.h"
namespace rocksdb {
namespace experimental {
#ifndef ROCKSDB_LITE
Status SuggestCompactRange(DB* db, ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) {
auto dbimpl = dynamic_cast<DBImpl*>(db);
if (dbimpl == nullptr) {
return Status::NotSupported("Didn't recognize DB object");
}
return dbimpl->SuggestCompactRange(column_family, begin, end);
}
#else // ROCKSDB_LITE
Status SuggestCompactRange(DB* db, ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) {
return Status::NotSupported("Not supported in RocksDB LITE");
}
#endif // ROCKSDB_LITE
Status SuggestCompactRange(DB* db, const Slice* begin, const Slice* end) {
return SuggestCompactRange(db, db->DefaultColumnFamily(), begin, end);
}
} // namespace experimental
} // namespace rocksdb

@@ -87,6 +87,9 @@ struct FileMetaData {
bool init_stats_from_file; // true if the data-entry stats of this file
// has initialized from file.
bool marked_for_compaction; // True if client asked us nicely to compact this
// file.
FileMetaData()
: refs(0),
being_compacted(false),
@@ -96,7 +99,8 @@ struct FileMetaData {
num_deletions(0),
raw_key_size(0),
raw_value_size(0),
init_stats_from_file(false) {}
init_stats_from_file(false),
marked_for_compaction(false) {}
};
// A compressed copy of file meta data that just contain

@@ -998,9 +998,9 @@ void VersionStorageInfo::ComputeCompactionScore(
// overwrites/deletions).
int num_sorted_runs = 0;
uint64_t total_size = 0;
for (unsigned int i = 0; i < files_[level].size(); i++) {
if (!files_[level][i]->being_compacted) {
total_size += files_[level][i]->compensated_file_size;
for (auto* f : files_[level]) {
if (!f->being_compacted) {
total_size += f->compensated_file_size;
num_sorted_runs++;
}
}
@@ -1033,7 +1033,7 @@ void VersionStorageInfo::ComputeCompactionScore(
// Compute the ratio of current size to size limit.
uint64_t level_bytes_no_compacting = 0;
for (auto f : files_[level]) {
if (f && f->being_compacted == false) {
if (!f->being_compacted) {
level_bytes_no_compacting += f->compensated_file_size;
}
}
@@ -1066,6 +1066,18 @@ void VersionStorageInfo::ComputeCompactionScore(
}
}
}
ComputeFilesMarkedForCompaction();
}
void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
files_marked_for_compaction_.clear();
for (int level = 0; level <= MaxInputLevel(); level++) {
for (auto* f : files_[level]) {
if (!f->being_compacted && f->marked_for_compaction) {
files_marked_for_compaction_.emplace_back(level, f);
}
}
}
}
namespace {

@@ -18,13 +18,15 @@
// synchronization on all accesses.
#pragma once
#include <atomic>
#include <deque>
#include <limits>
#include <map>
#include <memory>
#include <set>
#include <utility>
#include <vector>
#include <deque>
#include <atomic>
#include <limits>
#include "db/dbformat.h"
#include "db/version_builder.h"
#include "db/version_edit.h"
@@ -119,6 +121,10 @@ class VersionStorageInfo {
const MutableCFOptions& mutable_cf_options,
const CompactionOptionsFIFO& compaction_options_fifo);
// This computes files_marked_for_compaction_ and is called by
// ComputeCompactionScore()
void ComputeFilesMarkedForCompaction();
// Generate level_files_brief_ from files_
void GenerateLevelFilesBrief();
// Sort all files for this version based on their file size and
@@ -222,6 +228,14 @@ class VersionStorageInfo {
return files_by_size_[level];
}
// REQUIRES: This version has been saved (see VersionSet::SaveTo)
// REQUIRES: DB mutex held during access
const autovector<std::pair<int, FileMetaData*>>& FilesMarkedForCompaction()
const {
assert(finalized_);
return files_marked_for_compaction_;
}
int base_level() const { return base_level_; }
// REQUIRES: lock is held
@@ -340,6 +354,11 @@ class VersionStorageInfo {
// seconds/minutes (because of concurrent compactions).
static const size_t number_of_files_to_sort_ = 50;
// This vector contains the list of files that are marked for compaction and
// are not currently being compacted. It is protected by the DB mutex and is
// calculated in ComputeCompactionScore()
autovector<std::pair<int, FileMetaData*>> files_marked_for_compaction_;
// Level that should be compacted next and its compaction score.
// Score < 1 means compaction is not strictly needed. These fields
// are initialized by Finalize().

@@ -0,0 +1,20 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include "rocksdb/db.h"
#include "rocksdb/status.h"
namespace rocksdb {
namespace experimental {
// Supported only for Leveled compaction
Status SuggestCompactRange(DB* db, ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end);
Status SuggestCompactRange(DB* db, const Slice* begin, const Slice* end);
} // namespace experimental
} // namespace rocksdb

@@ -11,7 +11,9 @@ LIB_SOURCES = \
db/db_impl.cc \
db/db_impl_debug.cc \
db/db_impl_readonly.cc \
db/db_impl_experimental.cc \
db/db_iter.cc \
db/experimental.cc \
db/file_indexer.cc \
db/filename.cc \
db/flush_job.cc \
