Fix CompactRange to apply filter to every key

Summary:
When doing CompactRange(), we should first flush the memtable and then calculate max_level_with_files. We also want to compact all levels that have files, including level `max_level_with_files` itself.
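
As a minimal illustration of the fixed behavior (a hypothetical sketch, not part of this diff -- it mirrors the DestroyAllCompactionFilter test added in util/manual_compaction_test.cc below):

  #include "rocksdb/compaction_filter.h"
  #include "rocksdb/db.h"

  using namespace rocksdb;

  // Drops every entry whose value is "destroy" when a compaction runs it.
  class DropMarkedEntriesFilter : public CompactionFilter {
   public:
    virtual bool Filter(int level, const Slice& key,
                        const Slice& existing_value, std::string* new_value,
                        bool* value_changed) const {
      return existing_value.ToString() == "destroy";
    }
    virtual const char* Name() const { return "DropMarkedEntriesFilter"; }
  };

  // With this fix, a full-range manual compaction compacts every level that
  // has files (the bottom-most one into itself), so the filter above now
  // also sees keys stored on the bottom-most level:
  //   db->CompactRange(nullptr, nullptr);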

This patch makes the new unit test pass.

Test Plan: Added a unit test that failed before the fix and passes with it.

Reviewers: dhruba, haobo, sdong

Reviewed By: haobo

CC: leveldb, xjin

Differential Revision: https://reviews.facebook.net/D14421
Branch: main
Igor Canadi, 11 years ago
commit d9cd7a063f (parent 1ed2404f27)

Changed files:
  db/db_impl.cc                  (85 lines changed)
  db/db_impl.h                   (12 lines changed)
  db/db_test.cc                  (68 lines changed)
  db/version_set.cc              (51 lines changed)
  db/version_set.h               (16 lines changed)
  include/rocksdb/db.h           (1 line changed)
  util/manual_compaction_test.cc (75 lines changed)

@@ -1278,8 +1278,11 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
   return s;
 }
 
-void DBImpl::CompactRange(const Slice* begin, const Slice* end,
-                          bool reduce_level, int target_level) {
+void DBImpl::CompactRange(const Slice* begin,
+                          const Slice* end,
+                          bool reduce_level,
+                          int target_level) {
+  FlushMemTable(FlushOptions());
   int max_level_with_files = 1;
   {
     MutexLock l(&mutex_);
@@ -1290,9 +1293,15 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end,
       }
     }
   }
-  TEST_FlushMemTable(); // TODO(sanjay): Skip if memtable does not overlap
-  for (int level = 0; level < max_level_with_files; level++) {
-    TEST_CompactRange(level, begin, end);
+  for (int level = 0; level <= max_level_with_files; level++) {
+    // in case the compaction is universal or if we're compacting the
+    // bottom-most level, the output level will be the same as the input one
+    if (options_.compaction_style == kCompactionStyleUniversal ||
+        level == max_level_with_files) {
+      RunManualCompaction(level, level, begin, end);
+    } else {
+      RunManualCompaction(level, level + 1, begin, end);
+    }
   }
   if (reduce_level) {
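
For clarity, the input/output level pairing chosen by the loop above can be summarized as a standalone helper (a hypothetical sketch, not code from this diff):

  // Mirrors the selection logic in DBImpl::CompactRange() above.
  static int OutputLevelFor(int level, int max_level_with_files,
                            CompactionStyle style) {
    if (style == kCompactionStyleUniversal || level == max_level_with_files) {
      return level;    // compact the level into itself
    }
    return level + 1;  // usual level-style push-down
  }

For example, with files on levels 0-2 under level-style compaction, the loop issues (0->1), (1->2) and finally (2->2); the last same-level pass is what forces the compaction filter over bottom-most data.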
@@ -1591,13 +1600,17 @@ Status DBImpl::AppendSortedWalsOfType(const std::string& path,
   return status;
 }
-void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
-  assert(level >= 0);
+void DBImpl::RunManualCompaction(int input_level,
+                                 int output_level,
+                                 const Slice* begin,
+                                 const Slice* end) {
+  assert(input_level >= 0);
   InternalKey begin_storage, end_storage;
   ManualCompaction manual;
-  manual.level = level;
+  manual.input_level = input_level;
+  manual.output_level = output_level;
   manual.done = false;
   manual.in_progress = false;
   // For universal compaction, we enforce every manual compaction to compact
@@ -1625,11 +1638,11 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
   // can compact any range of keys/files.
   //
   // bg_manual_only_ is non-zero when at least one thread is inside
-  // TEST_CompactRange(), i.e. during that time no other compaction will
+  // RunManualCompaction(), i.e. during that time no other compaction will
   // get scheduled (see MaybeScheduleFlushOrCompaction).
   //
   // Note that the following loop doesn't stop more than one thread calling
-  // TEST_CompactRange() from getting to the second while loop below.
+  // RunManualCompaction() from getting to the second while loop below.
   // However, only one of them will actually schedule compaction, while
   // others will wait on a condition variable until it completes.
@@ -1659,6 +1672,15 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
   --bg_manual_only_;
 }
+void DBImpl::TEST_CompactRange(int level,
+                               const Slice* begin,
+                               const Slice* end) {
+  int output_level = (options_.compaction_style == kCompactionStyleUniversal)
+                         ? level
+                         : level + 1;
+  RunManualCompaction(level, output_level, begin, end);
+}
 Status DBImpl::FlushMemTable(const FlushOptions& options) {
   // nullptr batch means just wait for earlier writes to be done
   Status s = Write(WriteOptions(), nullptr);
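
A side note on the wrapper above (illustration only, not code from this diff): TEST_CompactRange() keeps its old single-level signature and simply derives the output level the same way CompactRange() now does.

  // Equivalent calls under level-style compaction (hypothetical usage):
  db_impl->TEST_CompactRange(1, begin, end);
  db_impl->RunManualCompaction(1, /*output_level=*/2, begin, end);
  // Under universal compaction the wrapper passes output_level == level.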
@@ -1878,23 +1900,27 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
   unique_ptr<Compaction> c;
   bool is_manual = (manual_compaction_ != nullptr) &&
                    (manual_compaction_->in_progress == false);
-  InternalKey manual_end;
+  InternalKey manual_end_storage;
+  InternalKey* manual_end = &manual_end_storage;
   if (is_manual) {
     ManualCompaction* m = manual_compaction_;
     assert(!m->in_progress);
     m->in_progress = true; // another thread cannot pick up the same work
-    c.reset(versions_->CompactRange(m->level, m->begin, m->end));
-    if (c) {
-      manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
-    } else {
+    c.reset(versions_->CompactRange(
+        m->input_level, m->output_level, m->begin, m->end, &manual_end));
+    if (!c) {
       m->done = true;
     }
     Log(options_.info_log,
-        "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
-        m->level,
+        "Manual compaction from level-%d to level-%d from %s .. %s; will stop "
+        "at %s\n",
+        m->input_level,
+        m->output_level,
         (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
         (m->end ? m->end->DebugString().c_str() : "(end)"),
-        (m->done ? "(end)" : manual_end.DebugString().c_str()));
+        ((m->done || manual_end == nullptr)
+             ? "(end)"
+             : manual_end->DebugString().c_str()));
   } else if (!options_.disable_auto_compactions) {
     c.reset(versions_->PickCompaction());
   }
@@ -1959,13 +1985,19 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
       // Also note that, if we don't stop here, then the current compaction
       // writes a new file back to level 0, which will be used in successive
       // compaction. Hence the manual compaction will never finish.
-      if (options_.compaction_style == kCompactionStyleUniversal) {
+      //
+      // Stop the compaction if manual_end is nullptr -- this means that
+      // we compacted the whole range. manual_end should always be nullptr
+      // in case of universal compaction.
+      if (manual_end == nullptr) {
         m->done = true;
       }
       if (!m->done) {
         // We only compacted part of the requested range. Update *m
         // to the range that is left to be compacted.
-        m->tmp_storage = manual_end;
+        // Universal compaction should always compact the whole range
+        assert(options_.compaction_style != kCompactionStyleUniversal);
+        m->tmp_storage = *manual_end;
         m->begin = &m->tmp_storage;
       }
       m->in_progress = false; // not being processed anymore
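
To make the handshake above easier to follow, here is a hedged sketch of how RunManualCompaction() consumes manual_end to resume a partially compacted range (simplified; the struct fields match this diff, the wait helper is hypothetical):

  ManualCompaction manual;
  manual.input_level = input_level;
  manual.output_level = output_level;
  manual.done = false;
  while (!manual.done) {
    // BackgroundCompaction() compacts a prefix of the range. It either
    // sets manual.done (manual_end == nullptr, whole range covered) or
    // copies *manual_end into manual.tmp_storage and re-points
    // manual.begin there, so the next pass resumes where this one stopped.
    ScheduleAndWaitForManualCompaction(&manual);  // hypothetical helper
  }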
@@ -1997,14 +2029,14 @@ void DBImpl::CleanupCompaction(CompactionState* compact, Status status) {
 }
 // Allocate the file numbers for the output file. We allocate as
-// many output file numbers as there are files in level+1.
+// many output file numbers as there are files in level+1 (at least one).
 // Insert them into pending_outputs so that they do not get deleted.
 void DBImpl::AllocateCompactionOutputFileNumbers(CompactionState* compact) {
   mutex_.AssertHeld();
   assert(compact != nullptr);
   assert(compact->builder == nullptr);
   int filesNeeded = compact->compaction->num_input_files(1);
-  for (int i = 0; i < filesNeeded; i++) {
+  for (int i = 0; i < std::max(filesNeeded, 1); i++) {
     uint64_t file_number = versions_->NewFileNumber();
     pending_outputs_.insert(file_number);
     compact->allocated_file_numbers.push_back(file_number);
@@ -2148,14 +2180,11 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
   // Add compaction outputs
   compact->compaction->AddInputDeletions(compact->compaction->edit());
-  const int level = compact->compaction->level();
   for (size_t i = 0; i < compact->outputs.size(); i++) {
     const CompactionState::Output& out = compact->outputs[i];
     compact->compaction->edit()->AddFile(
-        (options_.compaction_style == kCompactionStyleUniversal) ?
-          level : level + 1,
-        out.number, out.file_size, out.smallest, out.largest,
-        out.smallest_seqno, out.largest_seqno);
+        compact->compaction->output_level(), out.number, out.file_size,
+        out.smallest, out.largest, out.smallest_seqno, out.largest_seqno);
   }
   return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
 }
@@ -2197,7 +2226,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
       compact->compaction->num_input_files(0),
       compact->compaction->level(),
       compact->compaction->num_input_files(1),
-      compact->compaction->level() + 1,
+      compact->compaction->output_level(),
       compact->compaction->score(),
       options_.max_background_compactions - bg_compaction_scheduled_);
   char scratch[256];

@@ -89,10 +89,17 @@ class DBImpl : public DB {
   virtual Status GetDbIdentity(std::string& identity);
+  void RunManualCompaction(int input_level,
+                           int output_level,
+                           const Slice* begin,
+                           const Slice* end);
+
   // Extra methods (for testing) that are not in the public DB interface
   // Compact any files in the named level that overlap [*begin, *end]
-  void TEST_CompactRange(int level, const Slice* begin, const Slice* end);
+  void TEST_CompactRange(int level,
+                         const Slice* begin,
+                         const Slice* end);
   // Force current memtable contents to be flushed.
   Status TEST_FlushMemTable();
@@ -406,7 +413,8 @@ class DBImpl : public DB {
   // Information for a manual compaction
   struct ManualCompaction {
-    int level;
+    int input_level;
+    int output_level;
     bool done;
     bool in_progress; // compaction request being processed?
     const InternalKey* begin; // nullptr means beginning of key range

@@ -3309,34 +3309,46 @@ TEST(DBTest, ManualCompaction) {
   ASSERT_EQ(dbfull()->MaxMemCompactionLevel(), 2)
       << "Need to update this test to match kMaxMemCompactLevel";
-  MakeTables(3, "p", "q");
-  ASSERT_EQ("1,1,1", FilesPerLevel());
-  // Compaction range falls before files
-  Compact("", "c");
-  ASSERT_EQ("1,1,1", FilesPerLevel());
-  // Compaction range falls after files
-  Compact("r", "z");
-  ASSERT_EQ("1,1,1", FilesPerLevel());
-  // Compaction range overlaps files
-  Compact("p1", "p9");
-  ASSERT_EQ("0,0,1", FilesPerLevel());
-  // Populate a different range
-  MakeTables(3, "c", "e");
-  ASSERT_EQ("1,1,2", FilesPerLevel());
-  // Compact just the new range
-  Compact("b", "f");
-  ASSERT_EQ("0,0,2", FilesPerLevel());
-  // Compact all
-  MakeTables(1, "a", "z");
-  ASSERT_EQ("0,1,2", FilesPerLevel());
-  db_->CompactRange(nullptr, nullptr);
-  ASSERT_EQ("0,0,1", FilesPerLevel());
+  // iter - 0 with 7 levels
+  // iter - 1 with 3 levels
+  for (int iter = 0; iter < 2; ++iter) {
+    MakeTables(3, "p", "q");
+    ASSERT_EQ("1,1,1", FilesPerLevel());
+    // Compaction range falls before files
+    Compact("", "c");
+    ASSERT_EQ("1,1,1", FilesPerLevel());
+    // Compaction range falls after files
+    Compact("r", "z");
+    ASSERT_EQ("1,1,1", FilesPerLevel());
+    // Compaction range overlaps files
+    Compact("p1", "p9");
+    ASSERT_EQ("0,0,1", FilesPerLevel());
+    // Populate a different range
+    MakeTables(3, "c", "e");
+    ASSERT_EQ("1,1,2", FilesPerLevel());
+    // Compact just the new range
+    Compact("b", "f");
+    ASSERT_EQ("0,0,2", FilesPerLevel());
+    // Compact all
+    MakeTables(1, "a", "z");
+    ASSERT_EQ("0,1,2", FilesPerLevel());
+    db_->CompactRange(nullptr, nullptr);
+    ASSERT_EQ("0,0,1", FilesPerLevel());
+
+    if (iter == 0) {
+      Options options = CurrentOptions();
+      options.num_levels = 3;
+      options.create_if_missing = true;
+      DestroyAndReopen(&options);
+    }
+  }
 }
 TEST(DBTest, DBOpen_Options) {

@@ -2715,6 +2715,7 @@ Compaction* VersionSet::PickCompaction() {
 bool VersionSet::ParentRangeInCompaction(const InternalKey* smallest,
   const InternalKey* largest, int level, int* parent_index) {
   std::vector<FileMetaData*> inputs;
+  assert(level + 1 < NumberLevels());
   current_->GetOverlappingInputs(level+1, smallest, largest,
                                  &inputs, *parent_index, parent_index);
@@ -2776,7 +2777,8 @@ void VersionSet::ExpandWhileOverlapping(Compaction* c) {
   // compaction, then we must drop/cancel this compaction.
   int parent_index = -1;
   if (FilesInCompaction(c->inputs_[0]) ||
-      ParentRangeInCompaction(&smallest, &largest, level, &parent_index)) {
+      (c->level() != c->output_level() &&
+       ParentRangeInCompaction(&smallest, &largest, level, &parent_index))) {
     c->inputs_[0].clear();
     c->inputs_[1].clear();
     delete c;
@@ -2790,7 +2792,9 @@ void VersionSet::ExpandWhileOverlapping(Compaction* c) {
 // user-key with another file.
 void VersionSet::SetupOtherInputs(Compaction* c) {
   // If inputs are empty, then there is nothing to expand.
-  if (c->inputs_[0].empty()) {
+  // If both input and output levels are the same, no need to consider
+  // files at level "level+1".
+  if (c->inputs_[0].empty() || c->level() == c->output_level()) {
     return;
   }
@@ -2918,11 +2922,13 @@ void VersionSet::GetObsoleteFiles(std::vector<FileMetaData*>* files) {
   obsolete_files_.clear();
 }
-Compaction* VersionSet::CompactRange(
-    int level,
-    const InternalKey* begin,
-    const InternalKey* end) {
+Compaction* VersionSet::CompactRange(int input_level,
+                                     int output_level,
+                                     const InternalKey* begin,
+                                     const InternalKey* end,
+                                     InternalKey** compaction_end) {
   std::vector<FileMetaData*> inputs;
+  bool covering_the_whole_range = true;
   // All files are 'overlapping' in universal style compaction.
   // We have to compact the entire range in one shot.
@@ -2930,7 +2936,7 @@ Compaction* VersionSet::CompactRange(int input_level,
     begin = nullptr;
     end = nullptr;
   }
-  current_->GetOverlappingInputs(level, begin, end, &inputs);
+  current_->GetOverlappingInputs(input_level, begin, end, &inputs);
   if (inputs.empty()) {
     return nullptr;
   }
@@ -2939,24 +2945,26 @@ Compaction* VersionSet::CompactRange(int input_level,
   // But we cannot do this for level-0 since level-0 files can overlap
   // and we must not pick one file and drop another older file if the
   // two files overlap.
-  if (level > 0) {
-    const uint64_t limit = MaxFileSizeForLevel(level) *
-                           options_->source_compaction_factor;
+  if (input_level > 0) {
+    const uint64_t limit =
+        MaxFileSizeForLevel(input_level) * options_->source_compaction_factor;
     uint64_t total = 0;
-    for (size_t i = 0; i < inputs.size(); ++i) {
+    for (size_t i = 0; i + 1 < inputs.size(); ++i) {
      uint64_t s = inputs[i]->file_size;
      total += s;
      if (total >= limit) {
+        **compaction_end = inputs[i + 1]->smallest;
+        covering_the_whole_range = false;
        inputs.resize(i + 1);
        break;
      }
    }
  }
-  int out_level = (options_->compaction_style == kCompactionStyleUniversal) ?
-                  level : level+1;
-  Compaction* c = new Compaction(level, out_level, MaxFileSizeForLevel(out_level),
-                                 MaxGrandParentOverlapBytes(level), NumberLevels());
+  Compaction* c = new Compaction(input_level,
+                                 output_level,
+                                 MaxFileSizeForLevel(output_level),
+                                 MaxGrandParentOverlapBytes(input_level),
+                                 NumberLevels());
   c->inputs_[0] = inputs;
   ExpandWhileOverlapping(c);
@@ -2969,6 +2977,10 @@ Compaction* VersionSet::CompactRange(int input_level,
   c->input_version_->Ref();
   SetupOtherInputs(c);
+  if (covering_the_whole_range) {
+    *compaction_end = nullptr;
+  }
+
   // These files that are to be manually compacted do not trample
   // upon other files because manual compactions are processed when
   // the system has a max of 1 background compaction thread.
@@ -3016,7 +3028,10 @@ bool Compaction::IsTrivialMove() const {
   // Avoid a move if there is lots of overlapping grandparent data.
   // Otherwise, the move could create a parent file that will require
   // a very expensive merge later on.
-  return (num_input_files(0) == 1 &&
+  // If level_ == out_level_, the purpose is to force the compaction filter
+  // to be applied to that level, and thus this cannot be a trivial move.
+  return (level_ != out_level_ &&
+          num_input_files(0) == 1 &&
           num_input_files(1) == 0 &&
           TotalFileSize(grandparents_) <= maxGrandParentOverlapBytes_);
 }
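
The extra guard deserves a note: a trivial move re-links an input file into the output level without rewriting it, so no compaction filter ever runs; when the input and output levels are equal, rewriting the data is the whole point. A condensed sketch of the predicate (hypothetical, for illustration only):

  bool IsTrivialMoveSketch(int level, int out_level, int files_in,
                           int files_out, uint64_t grandparent_bytes,
                           uint64_t max_grandparent_overlap) {
    // Same-level manual compactions must rewrite files through the filter.
    return level != out_level &&
           files_in == 1 && files_out == 0 &&
           grandparent_bytes <= max_grandparent_overlap;
  }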
@@ -3109,7 +3124,7 @@ void Compaction::SetupBottomMostLevel(bool isManual) {
   }
   bottommost_level_ = true;
   int num_levels = input_version_->vset_->NumberLevels();
-  for (int i = level() + 2; i < num_levels; i++) {
+  for (int i = output_level() + 1; i < num_levels; i++) {
     if (input_version_->vset_->NumLevelFiles(i) > 0) {
       bottommost_level_ = false;
       break;

@@ -310,10 +310,18 @@ class VersionSet {
   // the specified level. Returns nullptr if there is nothing in that
   // level that overlaps the specified range. Caller should delete
   // the result.
-  Compaction* CompactRange(
-      int level,
-      const InternalKey* begin,
-      const InternalKey* end);
+  //
+  // The returned Compaction might not include the whole requested range.
+  // In that case, compaction_end will be set to the next key that needs
+  // compacting. In case the compaction will compact the whole range,
+  // compaction_end will be set to nullptr.
+  // The client is responsible for compaction_end storage -- when called,
+  // *compaction_end should point to a valid InternalKey!
+  Compaction* CompactRange(int input_level,
+                           int output_level,
+                           const InternalKey* begin,
+                           const InternalKey* end,
+                           InternalKey** compaction_end);
   // Return the maximum overlapping data (in bytes) at next level for any
   // file at a level >= 1.
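
A hedged caller-side sketch of the compaction_end contract documented above (condensed from DBImpl::BackgroundCompaction in this diff):

  InternalKey end_storage;
  InternalKey* compaction_end = &end_storage;  // must point at valid storage
  Compaction* c = versions->CompactRange(input_level, output_level,
                                         begin, end, &compaction_end);
  if (c == nullptr) {
    // nothing overlaps [begin, end] at input_level
  } else if (compaction_end == nullptr) {
    // the compaction covers the whole requested range
  } else {
    // partial coverage: *compaction_end is the first key still to compact
  }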

@@ -199,6 +199,7 @@ class DB {
                               uint64_t* sizes) = 0;
   // Compact the underlying storage for the key range [*begin,*end].
+  // The actual compaction interval might be a superset of [*begin, *end].
   // In particular, deleted and overwritten versions are discarded,
   // and the data is rearranged to reduce the cost of operations
   // needed to access the data. This operation should typically only
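
For reference, a minimal usage sketch of the public call whose contract is documented above (hypothetical snippet):

  // nullptr bounds mean "whole key space"; with this patch the call also
  // reaches the bottom-most level, so a configured compaction_filter is
  // applied to every live key.
  db->CompactRange(nullptr, nullptr);
  // A bounded variant; the compacted interval may be a superset of it:
  rocksdb::Slice begin("key1"), end("key4");
  db->CompactRange(&begin, &end);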

@@ -9,9 +9,13 @@
 #include <cstdlib>
 #include "rocksdb/db.h"
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/slice.h"
 #include "rocksdb/write_batch.h"
 #include "util/testharness.h"
+
+using namespace rocksdb;
 namespace {
 const int kNumKeys = 1100000;
@@ -26,12 +30,71 @@ std::string Key2(int i) {
   return Key1(i) + "_xxx";
 }
-class ManualCompactionTest { };
+class ManualCompactionTest {
+ public:
+  ManualCompactionTest() {
+    // Get rid of any state from an old run.
+    dbname_ = rocksdb::test::TmpDir() + "/rocksdb_cbug_test";
+    DestroyDB(dbname_, rocksdb::Options());
+  }
+
+  std::string dbname_;
+};
+
+class DestroyAllCompactionFilter : public CompactionFilter {
+ public:
+  DestroyAllCompactionFilter() {}
+
+  virtual bool Filter(int level,
+                      const Slice& key,
+                      const Slice& existing_value,
+                      std::string* new_value,
+                      bool* value_changed) const {
+    return existing_value.ToString() == "destroy";
+  }
+
+  virtual const char* Name() const {
+    return "DestroyAllCompactionFilter";
+  }
+};
+
+TEST(ManualCompactionTest, CompactTouchesAllKeys) {
+  for (int iter = 0; iter < 2; ++iter) {
+    DB* db;
+    Options options;
+    if (iter == 0) { // level compaction
+      options.num_levels = 3;
+      options.compaction_style = kCompactionStyleLevel;
+    } else { // universal compaction
+      options.compaction_style = kCompactionStyleUniversal;
+    }
+    options.create_if_missing = true;
+    options.compression = rocksdb::kNoCompression;
+    options.compaction_filter = new DestroyAllCompactionFilter();
+    ASSERT_OK(DB::Open(options, dbname_, &db));
+
+    db->Put(WriteOptions(), Slice("key1"), Slice("destroy"));
+    db->Put(WriteOptions(), Slice("key2"), Slice("destroy"));
+    db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
+    db->Put(WriteOptions(), Slice("key4"), Slice("destroy"));
+
+    Slice key4("key4");
+    db->CompactRange(nullptr, &key4);
+
+    Iterator* itr = db->NewIterator(ReadOptions());
+    itr->SeekToFirst();
+    ASSERT_TRUE(itr->Valid());
+    ASSERT_EQ("key3", itr->key().ToString());
+    itr->Next();
+    ASSERT_TRUE(!itr->Valid());
+    delete itr;
+
+    delete options.compaction_filter;
+    delete db;
+    DestroyDB(dbname_, options);
+  }
+}
+
 TEST(ManualCompactionTest, Test) {
-  // Get rid of any state from an old run.
-  std::string dbpath = rocksdb::test::TmpDir() + "/rocksdb_cbug_test";
-  DestroyDB(dbpath, rocksdb::Options());
   // Open database. Disable compression since it affects the creation
   // of layers and the code below is trying to test against a very
@@ -40,7 +103,7 @@ TEST(ManualCompactionTest, Test) {
   rocksdb::Options db_options;
   db_options.create_if_missing = true;
   db_options.compression = rocksdb::kNoCompression;
-  ASSERT_OK(rocksdb::DB::Open(db_options, dbpath, &db));
+  ASSERT_OK(rocksdb::DB::Open(db_options, dbname_, &db));
   // create first key range
   rocksdb::WriteBatch batch;
@@ -83,7 +146,7 @@ TEST(ManualCompactionTest, Test) {
   // close database
   delete db;
-  DestroyDB(dbpath, rocksdb::Options());
+  DestroyDB(dbname_, rocksdb::Options());
 }
 }  // anonymous namespace
