Fix CompactRange for universal compaction with num_levels > 1

Summary:
CompactRange for universal compaction with num_levels > 1 seems to have a bug. The unit test also has a bug so it doesn't capture the problem.
Fix it. Revert the compact range to the logic equivalent to num_levels=1. Always compact all files together.

It should also fix DBTest.IncreaseUniversalCompactionNumLevels. The issue was that options.write_buffer_size = 100 << 10 and options.write_buffer_size = 100 << 10 are not used in later test scenarios. So write_buffer_size of 4MB was used. The compaction trigger condition is not anymore obvious as expected.

Test Plan: Run the new test and all test suites

Reviewers: yhchiang, rven, kradhakrishnan, anthony, igor

Reviewed By: igor

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D37551
main
sdong 10 years ago
parent e003d3864c
commit d01bbb53ae
  1. 2
      db/column_family.cc
  2. 3
      db/column_family.h
  3. 40
      db/compaction_picker.cc
  4. 51
      db/db_impl.cc
  5. 32
      db/db_test.cc

@ -535,6 +535,8 @@ Compaction* ColumnFamilyData::PickCompaction(
return result; return result;
} }
const int ColumnFamilyData::kCompactAllLevels = -1;
Compaction* ColumnFamilyData::CompactRange( Compaction* ColumnFamilyData::CompactRange(
const MutableCFOptions& mutable_cf_options, const MutableCFOptions& mutable_cf_options,
int input_level, int output_level, uint32_t output_path_id, int input_level, int output_level, uint32_t output_path_id,

@ -236,6 +236,9 @@ class ColumnFamilyData {
// REQUIRES: DB mutex held // REQUIRES: DB mutex held
Compaction* PickCompaction(const MutableCFOptions& mutable_options, Compaction* PickCompaction(const MutableCFOptions& mutable_options,
LogBuffer* log_buffer); LogBuffer* log_buffer);
// A flag to tell a manual compaction is to compact all levels together
// instad of for specific level.
static const int kCompactAllLevels;
// REQUIRES: DB mutex held // REQUIRES: DB mutex held
Compaction* CompactRange( Compaction* CompactRange(
const MutableCFOptions& mutable_cf_options, const MutableCFOptions& mutable_cf_options,

@ -16,6 +16,7 @@
#include <inttypes.h> #include <inttypes.h>
#include <limits> #include <limits>
#include <string> #include <string>
#include "db/column_family.h"
#include "db/filename.h" #include "db/filename.h"
#include "util/log_buffer.h" #include "util/log_buffer.h"
#include "util/statistics.h" #include "util/statistics.h"
@ -357,6 +358,45 @@ Compaction* CompactionPicker::CompactRange(
// CompactionPickerFIFO has its own implementation of compact range // CompactionPickerFIFO has its own implementation of compact range
assert(ioptions_.compaction_style != kCompactionStyleFIFO); assert(ioptions_.compaction_style != kCompactionStyleFIFO);
if (input_level == ColumnFamilyData::kCompactAllLevels) {
assert(ioptions_.compaction_style == kCompactionStyleUniversal);
// Universal compaction with more than one level always compacts all the
// files together to the last level.
assert(vstorage->num_levels() > 1);
// DBImpl::CompactRange() set output level to be the last level
assert(output_level == vstorage->num_levels() - 1);
// DBImpl::RunManualCompaction will make full range for universal compaction
assert(begin == nullptr);
assert(end == nullptr);
*compaction_end = nullptr;
int start_level = 0;
for (; start_level < vstorage->num_levels() &&
vstorage->NumLevelFiles(start_level) == 0;
start_level++) {
}
if (start_level == vstorage->num_levels()) {
return nullptr;
}
std::vector<CompactionInputFiles> inputs(vstorage->num_levels() -
start_level);
for (int level = start_level; level < vstorage->num_levels(); level++) {
inputs[level - start_level].level = level;
auto& files = inputs[level - start_level].files;
for (FileMetaData* f : vstorage->LevelFiles(level)) {
files.push_back(f);
}
}
return new Compaction(
vstorage, mutable_cf_options, std::move(inputs), output_level,
mutable_cf_options.MaxFileSizeForLevel(output_level),
/* max_grandparent_overlap_bytes */ LLONG_MAX, output_path_id,
GetCompressionType(ioptions_, output_level, 1),
/* grandparents */ {}, /* is manual */ true);
}
CompactionInputFiles inputs; CompactionInputFiles inputs;
inputs.level = input_level; inputs.level = input_level;
bool covering_the_whole_range = true; bool covering_the_whole_range = true;

@ -1266,25 +1266,41 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
} }
} }
} }
for (int level = 0; level <= max_level_with_files; level++) {
// in case the compaction is unversal or if we're compacting the if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
// bottom-most level, the output level will be the same as input one. cfd->NumberLevels() > 1) {
// level 0 can never be the bottommost level (i.e. if all files are in level // Always compact all files together.
// 0, we will compact to level 1) int output_level = 0;
if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal || if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
cfd->ioptions()->compaction_style == kCompactionStyleFIFO || cfd->NumberLevels() > 1) {
(level == max_level_with_files && level > 0)) { output_level = cfd->NumberLevels() - 1;
s = RunManualCompaction(cfd, level, level, target_path_id, begin, end);
} else {
// TODO(sdong) Skip empty levels if possible.
s = RunManualCompaction(cfd, level, level + 1, target_path_id, begin,
end);
} }
if (!s.ok()) { s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
LogFlush(db_options_.info_log); output_level, target_path_id, begin, end);
return s; } else {
for (int level = 0; level <= max_level_with_files; level++) {
// in case the compaction is unversal or if we're compacting the
// bottom-most level, the output level will be the same as input one.
// level 0 can never be the bottommost level (i.e. if all files are in
// level 0, we will compact to level 1)
if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
cfd->ioptions()->compaction_style == kCompactionStyleFIFO ||
(level == max_level_with_files && level > 0)) {
s = RunManualCompaction(cfd, level, level, target_path_id, begin, end);
} else {
// TODO(sdong) Skip empty levels if possible.
s = RunManualCompaction(cfd, level, level + 1, target_path_id, begin,
end);
}
if (!s.ok()) {
break;
}
} }
} }
if (!s.ok()) {
LogFlush(db_options_.info_log);
return s;
}
if (reduce_level) { if (reduce_level) {
s = ReFitLevel(cfd, max_level_with_files, target_level); s = ReFitLevel(cfd, max_level_with_files, target_level);
@ -1668,7 +1684,8 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() const {
Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
int output_level, uint32_t output_path_id, int output_level, uint32_t output_path_id,
const Slice* begin, const Slice* end) { const Slice* begin, const Slice* end) {
assert(input_level >= 0); assert(input_level == ColumnFamilyData::kCompactAllLevels ||
input_level >= 0);
InternalKey begin_storage, end_storage; InternalKey begin_storage, end_storage;

@ -5250,7 +5250,7 @@ TEST_F(DBTest, IncreaseUniversalCompactionNumLevels) {
int max_key3 = 800; int max_key3 = 800;
// Stage 1: open a DB with universal compaction, num_levels=1 // Stage 1: open a DB with universal compaction, num_levels=1
Options options; Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal; options.compaction_style = kCompactionStyleUniversal;
options.num_levels = 1; options.num_levels = 1;
options.write_buffer_size = 100 << 10; // 100KB options.write_buffer_size = 100 << 10; // 100KB
@ -5272,7 +5272,6 @@ TEST_F(DBTest, IncreaseUniversalCompactionNumLevels) {
ASSERT_EQ(non_level0_num_files, 0); ASSERT_EQ(non_level0_num_files, 0);
// Stage 2: reopen with universal compaction, num_levels=4 // Stage 2: reopen with universal compaction, num_levels=4
options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal; options.compaction_style = kCompactionStyleUniversal;
options.num_levels = 4; options.num_levels = 4;
options = CurrentOptions(options); options = CurrentOptions(options);
@ -5302,7 +5301,6 @@ TEST_F(DBTest, IncreaseUniversalCompactionNumLevels) {
// Need to restart it once to remove higher level records in manifest. // Need to restart it once to remove higher level records in manifest.
ReopenWithColumnFamilies({"default", "pikachu"}, options); ReopenWithColumnFamilies({"default", "pikachu"}, options);
// Final reopen // Final reopen
options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal; options.compaction_style = kCompactionStyleUniversal;
options.num_levels = 1; options.num_levels = 1;
options = CurrentOptions(options); options = CurrentOptions(options);
@ -5808,8 +5806,12 @@ TEST_F(DBTest, CompactionFilterWithValueChange) {
// push all files to lower levels // push all files to lower levels
ASSERT_OK(Flush(1)); ASSERT_OK(Flush(1));
dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); if (option_config_ != kUniversalCompactionMultiLevel) {
dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]); dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
} else {
dbfull()->CompactRange(handles_[1], nullptr, nullptr);
}
// re-write all data again // re-write all data again
for (int i = 0; i < 100001; i++) { for (int i = 0; i < 100001; i++) {
@ -5821,8 +5823,12 @@ TEST_F(DBTest, CompactionFilterWithValueChange) {
// push all files to lower levels. This should // push all files to lower levels. This should
// invoke the compaction filter for all 100000 keys. // invoke the compaction filter for all 100000 keys.
ASSERT_OK(Flush(1)); ASSERT_OK(Flush(1));
dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); if (option_config_ != kUniversalCompactionMultiLevel) {
dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]); dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
} else {
dbfull()->CompactRange(handles_[1], nullptr, nullptr);
}
// verify that all keys now have the new value that // verify that all keys now have the new value that
// was set by the compaction process. // was set by the compaction process.
@ -7145,11 +7151,15 @@ TEST_F(DBTest, DropWrites) {
env_->drop_writes_.store(true, std::memory_order_release); env_->drop_writes_.store(true, std::memory_order_release);
env_->sleep_counter_.Reset(); env_->sleep_counter_.Reset();
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
for (int level = 0; level < dbfull()->NumberLevels(); level++) { if (option_config_ != kUniversalCompactionMultiLevel) {
if (level > 0 && level == dbfull()->NumberLevels() - 1) { for (int level = 0; level < dbfull()->NumberLevels(); level++) {
break; if (level > 0 && level == dbfull()->NumberLevels() - 1) {
break;
}
dbfull()->TEST_CompactRange(level, nullptr, nullptr);
} }
dbfull()->TEST_CompactRange(level, nullptr, nullptr); } else {
dbfull()->CompactRange(nullptr, nullptr);
} }
} }

Loading…
Cancel
Save