Avoid double-compacting data in bottom level in manual compactions (#5138)

Summary:
Depending on the config, manual compaction (leveled compaction style) does the following compactions:
L0->L1
L1->L2
...
Ln-1 -> Ln
Ln -> Ln
The final Ln -> Ln compaction is partly unnecessary, as it recompacts all the files that were just generated by the Ln-1 -> Ln compaction. We should avoid recompacting such files. This rule should be applied to Lmax only.
Resolves issue https://github.com/facebook/rocksdb/issues/4995
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5138

Differential Revision: D14940106

Pulled By: miasantreble

fbshipit-source-id: 8d3cf5507a17e76f3333cfd4bac5256d005636e5
main
Zhongyi Xie 6 years ago committed by Facebook Github Bot
parent d9280ff2d2
commit baa5302447
  1. 1
      HISTORY.md
  2. 9
      db/column_family.cc
  3. 5
      db/column_family.h
  4. 55
      db/compaction_picker.cc
  5. 11
      db/compaction_picker.h
  6. 5
      db/compaction_picker_fifo.cc
  7. 5
      db/compaction_picker_fifo.h
  8. 63
      db/db_compaction_test.cc
  9. 12
      db/db_impl.h
  10. 37
      db/db_impl_compaction_flush.cc
  11. 5
      db/db_impl_debug.cc
  12. 7
      db/db_range_del_test.cc
  13. 6
      db/db_sst_test.cc
  14. 8
      db/db_test2.cc
  15. 2
      db/listener_test.cc
  16. 3
      include/rocksdb/options.h
  17. 40
      java/rocksjni/portal.h
  18. 3
      tools/db_bench_tool.cc
  19. 2
      tools/ldb_cmd.cc
  20. 2
      utilities/option_change_migration/option_change_migration.cc

@ -9,6 +9,7 @@
### Public API Change ### Public API Change
* Change the behavior of OptimizeForPointLookup(): move away from hash-based block-based-table index, and use whole key memtable filtering. * Change the behavior of OptimizeForPointLookup(): move away from hash-based block-based-table index, and use whole key memtable filtering.
* Change the behavior of OptimizeForSmallDb(): use a 16MB block cache, put index and filter blocks into it, and cost the memtable size to it. DBOptions.OptimizeForSmallDb() and ColumnFamilyOptions.OptimizeForSmallDb() start to take an optional cache object. * Change the behavior of OptimizeForSmallDb(): use a 16MB block cache, put index and filter blocks into it, and cost the memtable size to it. DBOptions.OptimizeForSmallDb() and ColumnFamilyOptions.OptimizeForSmallDb() start to take an optional cache object.
* Added BottommostLevelCompaction::kForceOptimized to avoid double-compacting newly compacted files in the bottom-level compaction of a manual compaction.
### Bug Fixes ### Bug Fixes
* Fix a bug in 2PC where a sequence of txn prepare, memtable flush, and crash could result in losing the prepared transaction. * Fix a bug in 2PC where a sequence of txn prepare, memtable flush, and crash could result in losing the prepared transaction.

@ -989,13 +989,14 @@ const int ColumnFamilyData::kCompactToBaseLevel = -2;
Compaction* ColumnFamilyData::CompactRange( Compaction* ColumnFamilyData::CompactRange(
const MutableCFOptions& mutable_cf_options, int input_level, const MutableCFOptions& mutable_cf_options, int input_level,
int output_level, uint32_t output_path_id, uint32_t max_subcompactions, int output_level, const CompactRangeOptions& compact_range_options,
const InternalKey* begin, const InternalKey* end, const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end, bool* conflict) { InternalKey** compaction_end, bool* conflict,
uint64_t max_file_num_to_ignore) {
auto* result = compaction_picker_->CompactRange( auto* result = compaction_picker_->CompactRange(
GetName(), mutable_cf_options, current_->storage_info(), input_level, GetName(), mutable_cf_options, current_->storage_info(), input_level,
output_level, output_path_id, max_subcompactions, begin, end, output_level, compact_range_options, begin, end, compaction_end, conflict,
compaction_end, conflict); max_file_num_to_ignore);
if (result != nullptr) { if (result != nullptr) {
result->SetInputVersion(current_); result->SetInputVersion(current_);
} }

@ -296,9 +296,10 @@ class ColumnFamilyData {
// REQUIRES: DB mutex held // REQUIRES: DB mutex held
Compaction* CompactRange(const MutableCFOptions& mutable_cf_options, Compaction* CompactRange(const MutableCFOptions& mutable_cf_options,
int input_level, int output_level, int input_level, int output_level,
uint32_t output_path_id, uint32_t max_subcompactions, const CompactRangeOptions& compact_range_options,
const InternalKey* begin, const InternalKey* end, const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end, bool* manual_conflict); InternalKey** compaction_end, bool* manual_conflict,
uint64_t max_file_num_to_ignore);
CompactionPicker* compaction_picker() { return compaction_picker_.get(); } CompactionPicker* compaction_picker() { return compaction_picker_.get(); }
// thread-safe // thread-safe

@ -543,9 +543,9 @@ void CompactionPicker::GetGrandparents(
Compaction* CompactionPicker::CompactRange( Compaction* CompactionPicker::CompactRange(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options, const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, int input_level, int output_level, VersionStorageInfo* vstorage, int input_level, int output_level,
uint32_t output_path_id, uint32_t max_subcompactions, const CompactRangeOptions& compact_range_options, const InternalKey* begin,
const InternalKey* begin, const InternalKey* end, const InternalKey* end, InternalKey** compaction_end, bool* manual_conflict,
InternalKey** compaction_end, bool* manual_conflict) { uint64_t max_file_num_to_ignore) {
// CompactionPickerFIFO has its own implementation of compact range // CompactionPickerFIFO has its own implementation of compact range
assert(ioptions_.compaction_style != kCompactionStyleFIFO); assert(ioptions_.compaction_style != kCompactionStyleFIFO);
@ -609,11 +609,13 @@ Compaction* CompactionPicker::CompactRange(
output_level, output_level,
MaxFileSizeForLevel(mutable_cf_options, output_level, MaxFileSizeForLevel(mutable_cf_options, output_level,
ioptions_.compaction_style), ioptions_.compaction_style),
/* max_compaction_bytes */ LLONG_MAX, output_path_id, /* max_compaction_bytes */ LLONG_MAX,
compact_range_options.target_path_id,
GetCompressionType(ioptions_, vstorage, mutable_cf_options, GetCompressionType(ioptions_, vstorage, mutable_cf_options,
output_level, 1), output_level, 1),
GetCompressionOptions(ioptions_, vstorage, output_level), GetCompressionOptions(ioptions_, vstorage, output_level),
max_subcompactions, /* grandparents */ {}, /* is manual */ true); compact_range_options.max_subcompactions, /* grandparents */ {},
/* is manual */ true);
RegisterCompaction(c); RegisterCompaction(c);
return c; return c;
} }
@ -658,7 +660,43 @@ Compaction* CompactionPicker::CompactRange(
} }
} }
} }
assert(output_path_id < static_cast<uint32_t>(ioptions_.cf_paths.size())); assert(compact_range_options.target_path_id <
static_cast<uint32_t>(ioptions_.cf_paths.size()));
// for BOTTOM LEVEL compaction only, use max_file_num_to_ignore to filter out
// files that are created during the current compaction.
if (compact_range_options.bottommost_level_compaction ==
BottommostLevelCompaction::kForceOptimized &&
max_file_num_to_ignore != port::kMaxUint64) {
assert(input_level == output_level);
// inputs_shrunk holds a continuous subset of input files which were all
// created before the current manual compaction
std::vector<FileMetaData*> inputs_shrunk;
size_t skip_input_index = inputs.size();
for (size_t i = 0; i < inputs.size(); ++i) {
if (inputs[i]->fd.GetNumber() < max_file_num_to_ignore) {
inputs_shrunk.push_back(inputs[i]);
} else if (!inputs_shrunk.empty()) {
// inputs[i] was created during the current manual compaction and
// need to be skipped
skip_input_index = i;
break;
}
}
if (inputs_shrunk.empty()) {
return nullptr;
}
if (inputs.size() != inputs_shrunk.size()) {
inputs.files.swap(inputs_shrunk);
}
// set covering_the_whole_range to false if there is any file that need to
// be compacted in the range of inputs[skip_input_index+1, inputs.size())
for (size_t i = skip_input_index + 1; i < inputs.size(); ++i) {
if (inputs[i]->fd.GetNumber() < max_file_num_to_ignore) {
covering_the_whole_range = false;
}
}
}
InternalKey key_storage; InternalKey key_storage;
InternalKey* next_smallest = &key_storage; InternalKey* next_smallest = &key_storage;
@ -724,11 +762,12 @@ Compaction* CompactionPicker::CompactRange(
MaxFileSizeForLevel(mutable_cf_options, output_level, MaxFileSizeForLevel(mutable_cf_options, output_level,
ioptions_.compaction_style, vstorage->base_level(), ioptions_.compaction_style, vstorage->base_level(),
ioptions_.level_compaction_dynamic_level_bytes), ioptions_.level_compaction_dynamic_level_bytes),
mutable_cf_options.max_compaction_bytes, output_path_id, mutable_cf_options.max_compaction_bytes,
compact_range_options.target_path_id,
GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level, GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level,
vstorage->base_level()), vstorage->base_level()),
GetCompressionOptions(ioptions_, vstorage, output_level), GetCompressionOptions(ioptions_, vstorage, output_level),
max_subcompactions, std::move(grandparents), compact_range_options.max_subcompactions, std::move(grandparents),
/* is manual compaction */ true); /* is manual compaction */ true);
TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction); TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction);

@ -58,9 +58,10 @@ class CompactionPicker {
virtual Compaction* CompactRange( virtual Compaction* CompactRange(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options, const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, int input_level, int output_level, VersionStorageInfo* vstorage, int input_level, int output_level,
uint32_t output_path_id, uint32_t max_subcompactions, const CompactRangeOptions& compact_range_options,
const InternalKey* begin, const InternalKey* end, const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end, bool* manual_conflict); InternalKey** compaction_end, bool* manual_conflict,
uint64_t max_file_num_to_ignore);
// The maximum allowed output level. Default value is NumberLevels() - 1. // The maximum allowed output level. Default value is NumberLevels() - 1.
virtual int MaxOutputLevel() const { return NumberLevels() - 1; } virtual int MaxOutputLevel() const { return NumberLevels() - 1; }
@ -255,12 +256,12 @@ class NullCompactionPicker : public CompactionPicker {
const MutableCFOptions& /*mutable_cf_options*/, const MutableCFOptions& /*mutable_cf_options*/,
VersionStorageInfo* /*vstorage*/, VersionStorageInfo* /*vstorage*/,
int /*input_level*/, int /*output_level*/, int /*input_level*/, int /*output_level*/,
uint32_t /*output_path_id*/, const CompactRangeOptions& /*compact_range_options*/,
uint32_t /*max_subcompactions*/,
const InternalKey* /*begin*/, const InternalKey* /*begin*/,
const InternalKey* /*end*/, const InternalKey* /*end*/,
InternalKey** /*compaction_end*/, InternalKey** /*compaction_end*/,
bool* /*manual_conflict*/) override { bool* /*manual_conflict*/,
uint64_t /*max_file_num_to_ignore*/) override {
return nullptr; return nullptr;
} }

@ -213,9 +213,10 @@ Compaction* FIFOCompactionPicker::PickCompaction(
Compaction* FIFOCompactionPicker::CompactRange( Compaction* FIFOCompactionPicker::CompactRange(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options, const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, int input_level, int output_level, VersionStorageInfo* vstorage, int input_level, int output_level,
uint32_t /*output_path_id*/, uint32_t /*max_subcompactions*/, const CompactRangeOptions& /*compact_range_options*/,
const InternalKey* /*begin*/, const InternalKey* /*end*/, const InternalKey* /*begin*/, const InternalKey* /*end*/,
InternalKey** compaction_end, bool* /*manual_conflict*/) { InternalKey** compaction_end, bool* /*manual_conflict*/,
uint64_t /*max_file_num_to_ignore*/) {
#ifdef NDEBUG #ifdef NDEBUG
(void)input_level; (void)input_level;
(void)output_level; (void)output_level;

@ -27,9 +27,10 @@ class FIFOCompactionPicker : public CompactionPicker {
virtual Compaction* CompactRange( virtual Compaction* CompactRange(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options, const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, int input_level, int output_level, VersionStorageInfo* vstorage, int input_level, int output_level,
uint32_t output_path_id, uint32_t max_subcompactions, const CompactRangeOptions& compact_range_options,
const InternalKey* begin, const InternalKey* end, const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end, bool* manual_conflict) override; InternalKey** compaction_end, bool* manual_conflict,
uint64_t max_file_num_to_ignore) override;
// The maximum allowed output level. Always returns 0. // The maximum allowed output level. Always returns 0.
virtual int MaxOutputLevel() const override { return 0; } virtual int MaxOutputLevel() const override { return 0; }

@ -353,7 +353,8 @@ TEST_P(DBCompactionTestWithParam, CompactionsPreserveDeletes) {
CompactRangeOptions cro; CompactRangeOptions cro;
cro.change_level = true; cro.change_level = true;
cro.target_level = 2; cro.target_level = 2;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; cro.bottommost_level_compaction =
BottommostLevelCompaction::kForceOptimized;
dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForFlushMemTable();
dbfull()->CompactRange(cro, nullptr, nullptr); dbfull()->CompactRange(cro, nullptr, nullptr);
@ -511,7 +512,7 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
CompactRangeOptions cro; CompactRangeOptions cro;
cro.change_level = true; cro.change_level = true;
cro.target_level = 2; cro.target_level = 2;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
db_->CompactRange(cro, nullptr, nullptr); db_->CompactRange(cro, nullptr, nullptr);
// Only verifying compaction outputs issues one table cache lookup // Only verifying compaction outputs issues one table cache lookup
// for both data block and range deletion block). // for both data block and range deletion block).
@ -2260,6 +2261,8 @@ TEST_P(DBCompactionTestWithParam, ConvertCompactionStyle) {
CompactRangeOptions compact_options; CompactRangeOptions compact_options;
compact_options.change_level = true; compact_options.change_level = true;
compact_options.target_level = 0; compact_options.target_level = 0;
// cannot use kForceOptimized here because the compaction here is expected
// to generate one output file
compact_options.bottommost_level_compaction = compact_options.bottommost_level_compaction =
BottommostLevelCompaction::kForce; BottommostLevelCompaction::kForce;
compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
@ -3039,7 +3042,7 @@ TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) {
// then compacte the bottommost level L3=>L3 (non trivial move) // then compacte the bottommost level L3=>L3 (non trivial move)
compact_options = CompactRangeOptions(); compact_options = CompactRangeOptions();
compact_options.bottommost_level_compaction = compact_options.bottommost_level_compaction =
BottommostLevelCompaction::kForce; BottommostLevelCompaction::kForceOptimized;
ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
ASSERT_EQ("0,0,0,1", FilesPerLevel(0)); ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
ASSERT_EQ(trivial_move, 4); ASSERT_EQ(trivial_move, 4);
@ -4378,7 +4381,7 @@ TEST_F(DBCompactionTest, PartialManualCompaction) {
{{"max_compaction_bytes", std::to_string(max_compaction_bytes)}})); {{"max_compaction_bytes", std::to_string(max_compaction_bytes)}}));
CompactRangeOptions cro; CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
dbfull()->CompactRange(cro, nullptr, nullptr); dbfull()->CompactRange(cro, nullptr, nullptr);
} }
@ -4422,6 +4425,58 @@ TEST_F(DBCompactionTest, ManualCompactionFailsInReadOnlyMode) {
Close(); Close();
} }
// ManualCompactionBottomLevelOptimized tests the bottom level manual
// compaction optimization (BottommostLevelCompaction::kForceOptimized) that
// skips recompacting files created by the Ln-1 to Ln compaction.
//
// The test writes two batches of files ("foo*" keys, then "bar*" keys), runs
// a full-range manual compaction with kForceOptimized, and checks that the
// level-2 `num_input_files_in_output_level` compaction counter is zero both
// before and after the manual compaction.
TEST_F(DBCompactionTest, ManualCompactionBottomLevelOptimized) {
  Options opts = CurrentOptions();
  opts.num_levels = 3;
  opts.level0_file_num_compaction_trigger = 5;
  opts.compression = kNoCompression;
  opts.merge_operator.reset(new NoopMergeOperator());
  opts.target_file_size_base = 1024;
  opts.max_bytes_for_level_multiplier = 2;
  // Only the explicit CompactRange() below should run compactions.
  opts.disable_auto_compactions = true;
  DestroyAndReopen(opts);

  ColumnFamilyHandleImpl* cfh =
      static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
  ColumnFamilyData* cfd = cfh->cfd();
  InternalStats* internal_stats_ptr = cfd->internal_stats();
  ASSERT_NE(internal_stats_ptr, nullptr);

  Random rnd(301);
  // First batch: 8 flushes of 10 "foo*" keys with 1 KB random values each.
  for (auto i = 0; i < 8; ++i) {
    for (auto j = 0; j < 10; ++j) {
      ASSERT_OK(
          Put("foo" + std::to_string(i * 10 + j), RandomString(&rnd, 1024)));
    }
    // Check the flush status so a failed flush cannot go unnoticed.
    ASSERT_OK(Flush());
  }

  // NOTE(review): presumably relocates the files flushed so far to level 2
  // (the last level with num_levels = 3); the helper is not shown here.
  MoveFilesToLevel(2);

  // Second batch: 8 flushes of 10 "bar*" keys, written after the move above.
  for (auto i = 0; i < 8; ++i) {
    for (auto j = 0; j < 10; ++j) {
      ASSERT_OK(
          Put("bar" + std::to_string(i * 10 + j), RandomString(&rnd, 1024)));
    }
    ASSERT_OK(Flush());
  }

  // Baseline: the level-2 counter is expected to be zero before the manual
  // compaction runs.
  const std::vector<InternalStats::CompactionStats>& comp_stats =
      internal_stats_ptr->TEST_GetCompactionStats();
  int num = comp_stats[2].num_input_files_in_output_level;
  ASSERT_EQ(num, 0);

  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
  // The manual compaction must succeed, otherwise the check below would pass
  // vacuously.
  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));

  // The level-2 counter is expected to still be zero after the manual
  // compaction.
  const std::vector<InternalStats::CompactionStats>& comp_stats2 =
      internal_stats_ptr->TEST_GetCompactionStats();
  num = comp_stats2[2].num_input_files_in_output_level;
  ASSERT_EQ(num, 0);
}
// FixFileIngestionCompactionDeadlock tests and verifies that compaction and // FixFileIngestionCompactionDeadlock tests and verifies that compaction and
// file ingestion do not cause deadlock in the event of write stall triggered // file ingestion do not cause deadlock in the event of write stall triggered
// by number of L0 files reaching level0_stop_writes_trigger. // by number of L0 files reaching level0_stop_writes_trigger.

@ -395,11 +395,15 @@ class DBImpl : public DB {
virtual Status GetDbIdentity(std::string& identity) const override; virtual Status GetDbIdentity(std::string& identity) const override;
// max_file_num_to_ignore allows bottom level compaction to filter out newly
// compacted SST files. Setting max_file_num_to_ignore to kMaxUint64 will
// disable the filtering
Status RunManualCompaction(ColumnFamilyData* cfd, int input_level, Status RunManualCompaction(ColumnFamilyData* cfd, int input_level,
int output_level, uint32_t output_path_id, int output_level,
uint32_t max_subcompactions, const Slice* begin, const CompactRangeOptions& compact_range_options,
const Slice* end, bool exclusive, const Slice* begin, const Slice* end,
bool disallow_trivial_move = false); bool exclusive, bool disallow_trivial_move,
uint64_t max_file_num_to_ignore);
// Return an internal iterator over the current state of the database. // Return an internal iterator over the current state of the database.
// The keys of this iterator are internal keys (see format.h). // The keys of this iterator are internal keys (see format.h).

@ -684,6 +684,10 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
} }
int max_level_with_files = 0; int max_level_with_files = 0;
// max_file_num_to_ignore can be used to filter out newly created SST files,
// useful for bottom level compaction in a manual compaction
uint64_t max_file_num_to_ignore = port::kMaxUint64;
uint64_t next_file_number = port::kMaxUint64;
{ {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
Version* base = cfd->current(); Version* base = cfd->current();
@ -693,9 +697,11 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
max_level_with_files = level; max_level_with_files = level;
} }
} }
next_file_number = versions_->current_next_file_number();
} }
int final_output_level = 0; int final_output_level = 0;
if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal && if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
cfd->NumberLevels() > 1) { cfd->NumberLevels() > 1) {
// Always compact all files together. // Always compact all files together.
@ -705,8 +711,8 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
final_output_level--; final_output_level--;
} }
s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels, s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
final_output_level, options.target_path_id, final_output_level, options, begin, end, exclusive,
options.max_subcompactions, begin, end, exclusive); false, max_file_num_to_ignore);
} else { } else {
for (int level = 0; level <= max_level_with_files; level++) { for (int level = 0; level <= max_level_with_files; level++) {
int output_level; int output_level;
@ -731,6 +737,10 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
continue; continue;
} }
output_level = level; output_level = level;
// update max_file_num_to_ignore only for bottom level compaction
// because data in newly compacted files in middle levels may still need
// to be pushed down
max_file_num_to_ignore = next_file_number;
} else { } else {
output_level = level + 1; output_level = level + 1;
if (cfd->ioptions()->compaction_style == kCompactionStyleLevel && if (cfd->ioptions()->compaction_style == kCompactionStyleLevel &&
@ -739,9 +749,8 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
output_level = ColumnFamilyData::kCompactToBaseLevel; output_level = ColumnFamilyData::kCompactToBaseLevel;
} }
} }
s = RunManualCompaction(cfd, level, output_level, options.target_path_id, s = RunManualCompaction(cfd, level, output_level, options, begin, end,
options.max_subcompactions, begin, end, exclusive, false, max_file_num_to_ignore);
exclusive);
if (!s.ok()) { if (!s.ok()) {
break; break;
} }
@ -1324,11 +1333,11 @@ Status DBImpl::Flush(const FlushOptions& flush_options,
return s; return s;
} }
Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, Status DBImpl::RunManualCompaction(
int output_level, uint32_t output_path_id, ColumnFamilyData* cfd, int input_level, int output_level,
uint32_t max_subcompactions, const CompactRangeOptions& compact_range_options, const Slice* begin,
const Slice* begin, const Slice* end, const Slice* end, bool exclusive, bool disallow_trivial_move,
bool exclusive, bool disallow_trivial_move) { uint64_t max_file_num_to_ignore) {
assert(input_level == ColumnFamilyData::kCompactAllLevels || assert(input_level == ColumnFamilyData::kCompactAllLevels ||
input_level >= 0); input_level >= 0);
@ -1341,7 +1350,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
manual.cfd = cfd; manual.cfd = cfd;
manual.input_level = input_level; manual.input_level = input_level;
manual.output_level = output_level; manual.output_level = output_level;
manual.output_path_id = output_path_id; manual.output_path_id = compact_range_options.target_path_id;
manual.done = false; manual.done = false;
manual.in_progress = false; manual.in_progress = false;
manual.incomplete = false; manual.incomplete = false;
@ -1416,9 +1425,9 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
(((manual.manual_end = &manual.tmp_storage1) != nullptr) && (((manual.manual_end = &manual.tmp_storage1) != nullptr) &&
((compaction = manual.cfd->CompactRange( ((compaction = manual.cfd->CompactRange(
*manual.cfd->GetLatestMutableCFOptions(), manual.input_level, *manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
manual.output_level, manual.output_path_id, max_subcompactions, manual.output_level, compact_range_options, manual.begin,
manual.begin, manual.end, &manual.manual_end, manual.end, &manual.manual_end, &manual_conflict,
&manual_conflict)) == nullptr && max_file_num_to_ignore)) == nullptr &&
manual_conflict))) { manual_conflict))) {
// exclusive manual compactions should not see a conflict during // exclusive manual compactions should not see a conflict during
// CompactRange // CompactRange

@ -93,8 +93,9 @@ Status DBImpl::TEST_CompactRange(int level, const Slice* begin,
cfd->ioptions()->compaction_style == kCompactionStyleFIFO) cfd->ioptions()->compaction_style == kCompactionStyleFIFO)
? level ? level
: level + 1; : level + 1;
return RunManualCompaction(cfd, level, output_level, 0, 0, begin, end, true, return RunManualCompaction(cfd, level, output_level, CompactRangeOptions(),
disallow_trivial_move); begin, end, true, disallow_trivial_move,
port::kMaxUint64 /*max_file_num_to_ignore*/);
} }
Status DBImpl::TEST_SwitchMemtable(ColumnFamilyData* cfd) { Status DBImpl::TEST_SwitchMemtable(ColumnFamilyData* cfd) {

@ -438,9 +438,10 @@ TEST_F(DBRangeDelTest, ValidUniversalSubcompactionBoundaries) {
ASSERT_OK(dbfull()->RunManualCompaction( ASSERT_OK(dbfull()->RunManualCompaction(
reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily()) reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
->cfd(), ->cfd(),
1 /* input_level */, 2 /* output_level */, 0 /* output_path_id */, 1 /* input_level */, 2 /* output_level */, CompactRangeOptions(),
0 /* max_subcompactions */, nullptr /* begin */, nullptr /* end */, nullptr /* begin */, nullptr /* end */, true /* exclusive */,
true /* exclusive */, true /* disallow_trivial_move */)); true /* disallow_trivial_move */,
port::kMaxUint64 /* max_file_num_to_ignore */));
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -457,7 +457,9 @@ TEST_F(DBSSTTest, RateLimitedWALDelete) {
ASSERT_EQ("4", FilesPerLevel(0)); ASSERT_EQ("4", FilesPerLevel(0));
// Compaction will move the 4 files in L0 to trash and create 1 L1 file // Compaction will move the 4 files in L0 to trash and create 1 L1 file
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
ASSERT_OK(dbfull()->TEST_WaitForCompact(true)); ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
ASSERT_EQ("0,1", FilesPerLevel(0)); ASSERT_EQ("0,1", FilesPerLevel(0));
@ -563,7 +565,7 @@ TEST_F(DBSSTTest, DeleteSchedulerMultipleDBPaths) {
// Compaction will delete both files and regenerate a file in L1 in second // Compaction will delete both files and regenerate a file in L1 in second
// db path. The deleted files should still be cleaned up via delete scheduler. // db path. The deleted files should still be cleaned up via delete scheduler.
compact_options.bottommost_level_compaction = compact_options.bottommost_level_compaction =
BottommostLevelCompaction::kForce; BottommostLevelCompaction::kForceOptimized;
ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
ASSERT_EQ("0,1", FilesPerLevel(0)); ASSERT_EQ("0,1", FilesPerLevel(0));

@ -1194,7 +1194,7 @@ TEST_F(DBTest2, PresetCompressionDictLocality) {
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
CompactRangeOptions compact_range_opts; CompactRangeOptions compact_range_opts;
compact_range_opts.bottommost_level_compaction = compact_range_opts.bottommost_level_compaction =
BottommostLevelCompaction::kForce; BottommostLevelCompaction::kForceOptimized;
ASSERT_OK(db_->CompactRange(compact_range_opts, nullptr, nullptr)); ASSERT_OK(db_->CompactRange(compact_range_opts, nullptr, nullptr));
// Dictionary compression should not be so good as to compress four totally // Dictionary compression should not be so good as to compress four totally
@ -1712,7 +1712,7 @@ TEST_F(DBTest2, MaxCompactionBytesTest) {
GenerateNewRandomFile(&rnd); GenerateNewRandomFile(&rnd);
} }
CompactRangeOptions cro; CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
ASSERT_EQ("0,0,8", FilesPerLevel(0)); ASSERT_EQ("0,0,8", FilesPerLevel(0));
@ -2303,7 +2303,7 @@ TEST_F(DBTest2, AutomaticCompactionOverlapManualCompaction) {
// Run a manual compaction that will compact the 2 files in L2 // Run a manual compaction that will compact the 2 files in L2
// into 1 file in L2 // into 1 file in L2
cro.exclusive_manual_compaction = false; cro.exclusive_manual_compaction = false;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
@ -2377,7 +2377,7 @@ TEST_F(DBTest2, ManualCompactionOverlapManualCompaction) {
// into 1 file in L1 // into 1 file in L1
CompactRangeOptions cro; CompactRangeOptions cro;
cro.exclusive_manual_compaction = false; cro.exclusive_manual_compaction = false;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
bg_thread.join(); bg_thread.join();

@ -492,7 +492,7 @@ TEST_F(EventListenerTest, CompactionReasonLevel) {
Put("key", "value"); Put("key", "value");
CompactRangeOptions cro; CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
ASSERT_GT(listener->compaction_reasons_.size(), 0); ASSERT_GT(listener->compaction_reasons_.size(), 0);
for (auto compaction_reason : listener->compaction_reasons_) { for (auto compaction_reason : listener->compaction_reasons_) {

@ -1301,6 +1301,9 @@ enum class BottommostLevelCompaction {
kIfHaveCompactionFilter, kIfHaveCompactionFilter,
// Always compact bottommost level // Always compact bottommost level
kForce, kForce,
// Always compact bottommost level but in bottommost level avoid
// double-compacting files created in the same compaction
kForceOptimized,
}; };
// CompactRangeOptions is used by CompactRange() call. // CompactRangeOptions is used by CompactRange() call.

@ -1360,9 +1360,9 @@ class JniUtil {
public: public:
/** /**
* Detect if jlong overflows size_t * Detect if jlong overflows size_t
* *
* @param jvalue the jlong value * @param jvalue the jlong value
* *
* @return * @return
*/ */
inline static Status check_if_jlong_fits_size_t(const jlong& jvalue) { inline static Status check_if_jlong_fits_size_t(const jlong& jvalue) {
@ -1588,8 +1588,8 @@ class JniUtil {
* @param bytes The bytes to copy * @param bytes The bytes to copy
* *
* @return the Java byte[], or nullptr if an exception occurs * @return the Java byte[], or nullptr if an exception occurs
* *
* @throws RocksDBException thrown * @throws RocksDBException thrown
* if memory size to copy exceeds general java specific array size limitation. * if memory size to copy exceeds general java specific array size limitation.
*/ */
static jbyteArray copyBytes(JNIEnv* env, std::string bytes) { static jbyteArray copyBytes(JNIEnv* env, std::string bytes) {
@ -1827,7 +1827,7 @@ class JniUtil {
return env->NewStringUTF(string->c_str()); return env->NewStringUTF(string->c_str());
} }
/** /**
* Copies bytes to a new jByteArray with the check of java array size limitation. * Copies bytes to a new jByteArray with the check of java array size limitation.
* *
@ -1835,29 +1835,29 @@ class JniUtil {
* @param size number of bytes to copy * @param size number of bytes to copy
* *
* @return the Java byte[], or nullptr if an exception occurs * @return the Java byte[], or nullptr if an exception occurs
* *
* @throws RocksDBException thrown * @throws RocksDBException thrown
* if memory size to copy exceeds general java array size limitation to avoid overflow. * if memory size to copy exceeds general java array size limitation to avoid overflow.
*/ */
static jbyteArray createJavaByteArrayWithSizeCheck(JNIEnv* env, const char* bytes, const size_t size) { static jbyteArray createJavaByteArrayWithSizeCheck(JNIEnv* env, const char* bytes, const size_t size) {
// Limitation for java array size is vm specific // Limitation for java array size is vm specific
// In general it cannot exceed Integer.MAX_VALUE (2^31 - 1) // In general it cannot exceed Integer.MAX_VALUE (2^31 - 1)
// Current HotSpot VM limitation for array size is Integer.MAX_VALUE - 5 (2^31 - 1 - 5) // Current HotSpot VM limitation for array size is Integer.MAX_VALUE - 5 (2^31 - 1 - 5)
// It means that the next call to env->NewByteArray can still end with // It means that the next call to env->NewByteArray can still end with
// OutOfMemoryError("Requested array size exceeds VM limit") coming from VM // OutOfMemoryError("Requested array size exceeds VM limit") coming from VM
static const size_t MAX_JARRAY_SIZE = (static_cast<size_t>(1)) << 31; static const size_t MAX_JARRAY_SIZE = (static_cast<size_t>(1)) << 31;
if(size > MAX_JARRAY_SIZE) { if(size > MAX_JARRAY_SIZE) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, "Requested array size exceeds VM limit"); rocksdb::RocksDBExceptionJni::ThrowNew(env, "Requested array size exceeds VM limit");
return nullptr; return nullptr;
} }
const jsize jlen = static_cast<jsize>(size); const jsize jlen = static_cast<jsize>(size);
jbyteArray jbytes = env->NewByteArray(jlen); jbyteArray jbytes = env->NewByteArray(jlen);
if(jbytes == nullptr) { if(jbytes == nullptr) {
// exception thrown: OutOfMemoryError // exception thrown: OutOfMemoryError
return nullptr; return nullptr;
} }
env->SetByteArrayRegion(jbytes, 0, jlen, env->SetByteArrayRegion(jbytes, 0, jlen,
const_cast<jbyte*>(reinterpret_cast<const jbyte*>(bytes))); const_cast<jbyte*>(reinterpret_cast<const jbyte*>(bytes)));
if(env->ExceptionCheck()) { if(env->ExceptionCheck()) {
@ -1876,8 +1876,8 @@ class JniUtil {
* @param bytes The bytes to copy * @param bytes The bytes to copy
* *
* @return the Java byte[] or nullptr if an exception occurs * @return the Java byte[] or nullptr if an exception occurs
* *
* @throws RocksDBException thrown * @throws RocksDBException thrown
* if memory size to copy exceeds general java specific array size limitation. * if memory size to copy exceeds general java specific array size limitation.
*/ */
static jbyteArray copyBytes(JNIEnv* env, const Slice& bytes) { static jbyteArray copyBytes(JNIEnv* env, const Slice& bytes) {
@ -2007,13 +2007,13 @@ class JniUtil {
/** /**
* Creates a vector<T*> of C++ pointers from * Creates a vector<T*> of C++ pointers from
* a Java array of C++ pointer addresses. * a Java array of C++ pointer addresses.
* *
* @param env (IN) A pointer to the java environment * @param env (IN) A pointer to the java environment
* @param pointers (IN) A Java array of C++ pointer addresses * @param pointers (IN) A Java array of C++ pointer addresses
* @param has_exception (OUT) will be set to JNI_TRUE * @param has_exception (OUT) will be set to JNI_TRUE
* if an ArrayIndexOutOfBoundsException or OutOfMemoryError * if an ArrayIndexOutOfBoundsException or OutOfMemoryError
* exception occurs. * exception occurs.
* *
* @return A vector of C++ pointers. * @return A vector of C++ pointers.
*/ */
template<typename T> static std::vector<T*> fromJPointers( template<typename T> static std::vector<T*> fromJPointers(
@ -2037,13 +2037,13 @@ class JniUtil {
/** /**
* Creates a Java array of C++ pointer addresses * Creates a Java array of C++ pointer addresses
* from a vector of C++ pointers. * from a vector of C++ pointers.
* *
* @param env (IN) A pointer to the java environment * @param env (IN) A pointer to the java environment
* @param pointers (IN) A vector of C++ pointers * @param pointers (IN) A vector of C++ pointers
* @param has_exception (OUT) will be set to JNI_TRUE * @param has_exception (OUT) will be set to JNI_TRUE
* if an ArrayIndexOutOfBoundsException or OutOfMemoryError * if an ArrayIndexOutOfBoundsException or OutOfMemoryError
* exception occurs * exception occurs
* *
* @return Java array of C++ pointer addresses. * @return Java array of C++ pointer addresses.
*/ */
template<typename T> static jlongArray toJPointers(JNIEnv* env, template<typename T> static jlongArray toJPointers(JNIEnv* env,
@ -4084,6 +4084,8 @@ class BottommostLevelCompactionJni {
return 0x1; return 0x1;
case rocksdb::BottommostLevelCompaction::kForce: case rocksdb::BottommostLevelCompaction::kForce:
return 0x2; return 0x2;
case rocksdb::BottommostLevelCompaction::kForceOptimized:
return 0x3;
default: default:
return 0x7F; // undefined return 0x7F; // undefined
} }
@ -4100,6 +4102,8 @@ class BottommostLevelCompactionJni {
return rocksdb::BottommostLevelCompaction::kIfHaveCompactionFilter; return rocksdb::BottommostLevelCompaction::kIfHaveCompactionFilter;
case 0x2: case 0x2:
return rocksdb::BottommostLevelCompaction::kForce; return rocksdb::BottommostLevelCompaction::kForce;
case 0x3:
return rocksdb::BottommostLevelCompaction::kForceOptimized;
default: default:
// undefined/default // undefined/default
return rocksdb::BottommostLevelCompaction::kIfHaveCompactionFilter; return rocksdb::BottommostLevelCompaction::kIfHaveCompactionFilter;
@ -5670,7 +5674,7 @@ class TablePropertiesJni : public JavaClass {
env->DeleteLocalRef(jmerge_operator_name); env->DeleteLocalRef(jmerge_operator_name);
return nullptr; return nullptr;
} }
jstring jproperty_collectors_names = rocksdb::JniUtil::toJavaString(env, &table_properties.property_collectors_names, true); jstring jproperty_collectors_names = rocksdb::JniUtil::toJavaString(env, &table_properties.property_collectors_names, true);
if (env->ExceptionCheck()) { if (env->ExceptionCheck()) {
// exception occurred creating java string // exception occurred creating java string

@ -6093,7 +6093,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
void Compact(ThreadState* thread) { void Compact(ThreadState* thread) {
DB* db = SelectDB(thread); DB* db = SelectDB(thread);
CompactRangeOptions cro; CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; cro.bottommost_level_compaction =
BottommostLevelCompaction::kForceOptimized;
db->CompactRange(cro, nullptr, nullptr); db->CompactRange(cro, nullptr, nullptr);
} }

@ -849,7 +849,7 @@ void CompactorCommand::DoCommand() {
} }
CompactRangeOptions cro; CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
db_->CompactRange(cro, GetCfHandle(), begin, end); db_->CompactRange(cro, GetCfHandle(), begin, end);
exec_state_ = LDBCommandExecuteResult::Succeed(""); exec_state_ = LDBCommandExecuteResult::Succeed("");

@ -56,6 +56,8 @@ Status CompactToLevel(const Options& options, const std::string& dbname,
cro.change_level = true; cro.change_level = true;
cro.target_level = dest_level; cro.target_level = dest_level;
if (dest_level == 0) { if (dest_level == 0) {
// kForceOptimized cannot be used here: it avoids recompacting files just
// written to the bottommost level, but this compaction is expected to
// generate exactly one output file
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
} }
db->CompactRange(cro, nullptr, nullptr); db->CompactRange(cro, nullptr, nullptr);

Loading…
Cancel
Save