Don't let flushes preempt compactions

Summary:
When we first started, max_background_flushes was 0 by default and compaction thread was executing flushes (since there was no flush thread). Then, we switched the default max_background_flushes to 1. However, we still support the case where there is no flush thread and flushes are done in compaction. This is making our code a bit more complicated. By not supporting this use-case we can make our code simpler.

We have a special case that when you set max_background_flushes to 0, we
schedule the flush to execute on the compaction thread.

Test Plan: make check (there might be some unit tests that depend on this behavior)

Reviewers: IslamAbdelRahman, yhchiang, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D41931
main
Igor Canadi 10 years ago
parent 79373c372d
commit 35ca59364c
  1. 1
      Makefile
  2. 6
      db/c.cc
  3. 3
      db/column_family.cc
  4. 1
      db/column_family_test.cc
  5. 9
      db/compaction_job.cc
  6. 4
      db/compaction_job.h
  7. 5
      db/compaction_job_stats_test.cc
  8. 27
      db/compaction_job_test.cc
  9. 14
      db/corruption_test.cc
  10. 1
      db/cuckoo_table_db_test.cc
  11. 5
      db/db_compaction_filter_test.cc
  12. 101
      db/db_impl.cc
  13. 7
      db/db_impl.h
  14. 118
      db/db_test.cc
  15. 5
      db/deletefile_test.cc
  16. 20
      db/flush_job.cc
  17. 1
      db/plain_table_db_test.cc
  18. 32
      db/version_set.cc
  19. 6
      db/version_set.h
  20. 9
      include/rocksdb/options.h
  21. 18
      java/rocksjni/options.cc
  22. 1
      tools/db_stress.cc
  23. 73
      tools/reduce_levels_test.cc
  24. 11
      util/db_test_util.cc
  25. 2
      util/db_test_util.h
  26. 1
      util/ldb_cmd.cc
  27. 2
      util/mutable_cf_options.cc
  28. 3
      util/mutable_cf_options.h
  29. 4
      util/options.cc
  30. 2
      util/options_helper.cc
  31. 2
      util/options_test.cc
  32. 1
      utilities/ttl/ttl_test.cc

@ -221,6 +221,7 @@ TESTS = \
db_test \ db_test \
db_iter_test \ db_iter_test \
db_log_iter_test \ db_log_iter_test \
db_compaction_filter_test \
db_dynamic_level_test \ db_dynamic_level_test \
db_tailing_iter_test \ db_tailing_iter_test \
block_hash_index_test \ block_hash_index_test \

@ -1509,10 +1509,8 @@ void rocksdb_options_set_level0_stop_writes_trigger(
opt->rep.level0_stop_writes_trigger = n; opt->rep.level0_stop_writes_trigger = n;
} }
void rocksdb_options_set_max_mem_compaction_level( void rocksdb_options_set_max_mem_compaction_level(rocksdb_options_t* opt,
rocksdb_options_t* opt, int n) { int n) {}
opt->rep.max_mem_compaction_level = n;
}
void rocksdb_options_set_compression(rocksdb_options_t* opt, int t) { void rocksdb_options_set_compression(rocksdb_options_t* opt, int t) {
opt->rep.compression = static_cast<CompressionType>(t); opt->rep.compression = static_cast<CompressionType>(t);

@ -138,9 +138,6 @@ ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
result.num_levels < 2) { result.num_levels < 2) {
result.num_levels = 2; result.num_levels = 2;
} }
if (result.max_mem_compaction_level >= result.num_levels) {
result.max_mem_compaction_level = result.num_levels - 1;
}
if (result.max_write_buffer_number < 2) { if (result.max_write_buffer_number < 2) {
result.max_write_buffer_number = 2; result.max_write_buffer_number = 2;
} }

@ -875,7 +875,6 @@ TEST_F(ColumnFamilyTest, DifferentCompactionStyles) {
two.compaction_style = kCompactionStyleLevel; two.compaction_style = kCompactionStyleLevel;
two.num_levels = 4; two.num_levels = 4;
two.max_mem_compaction_level = 0;
two.level0_file_num_compaction_trigger = 3; two.level0_file_num_compaction_trigger = 3;
two.write_buffer_size = 100000; two.write_buffer_size = 100000;

@ -94,8 +94,7 @@ CompactionJob::CompactionJob(
std::atomic<bool>* shutting_down, LogBuffer* log_buffer, std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
Directory* db_directory, Directory* output_directory, Statistics* stats, Directory* db_directory, Directory* output_directory, Statistics* stats,
std::vector<SequenceNumber> existing_snapshots, std::vector<SequenceNumber> existing_snapshots,
std::shared_ptr<Cache> table_cache, std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
std::function<uint64_t()> yield_callback, EventLogger* event_logger,
bool paranoid_file_checks, const std::string& dbname, bool paranoid_file_checks, const std::string& dbname,
CompactionJobStats* compaction_job_stats) CompactionJobStats* compaction_job_stats)
: job_id_(job_id), : job_id_(job_id),
@ -114,7 +113,6 @@ CompactionJob::CompactionJob(
stats_(stats), stats_(stats),
existing_snapshots_(std::move(existing_snapshots)), existing_snapshots_(std::move(existing_snapshots)),
table_cache_(std::move(table_cache)), table_cache_(std::move(table_cache)),
yield_callback_(std::move(yield_callback)),
event_logger_(event_logger), event_logger_(event_logger),
paranoid_file_checks_(paranoid_file_checks) { paranoid_file_checks_(paranoid_file_checks) {
assert(log_buffer_ != nullptr); assert(log_buffer_ != nullptr);
@ -356,11 +354,6 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
RecordCompactionIOStats(); RecordCompactionIOStats();
loop_cnt = 0; loop_cnt = 0;
} }
// FLUSH preempts compaction
// TODO(icanadi) this currently only checks if flush is necessary on
// compacting column family. we should also check if flush is necessary
// on other column families, too
(*imm_micros) += yield_callback_();
Slice key = input->key(); Slice key = input->key();
Slice value = input->value(); Slice value = input->value();

@ -58,7 +58,6 @@ class CompactionJob {
Statistics* stats, Statistics* stats,
std::vector<SequenceNumber> existing_snapshots, std::vector<SequenceNumber> existing_snapshots,
std::shared_ptr<Cache> table_cache, std::shared_ptr<Cache> table_cache,
std::function<uint64_t()> yield_callback,
EventLogger* event_logger, bool paranoid_file_checks, EventLogger* event_logger, bool paranoid_file_checks,
const std::string& dbname, const std::string& dbname,
CompactionJobStats* compaction_job_stats); CompactionJobStats* compaction_job_stats);
@ -145,9 +144,6 @@ class CompactionJob {
std::vector<SequenceNumber> existing_snapshots_; std::vector<SequenceNumber> existing_snapshots_;
std::shared_ptr<Cache> table_cache_; std::shared_ptr<Cache> table_cache_;
// yield callback
std::function<uint64_t()> yield_callback_;
EventLogger* event_logger_; EventLogger* event_logger_;
bool paranoid_file_checks_; bool paranoid_file_checks_;

@ -624,7 +624,6 @@ TEST_F(CompactionJobStatsTest, CompactionJobStatsTest) {
options.listeners.emplace_back(stats_checker); options.listeners.emplace_back(stats_checker);
options.create_if_missing = true; options.create_if_missing = true;
options.max_background_flushes = 0; options.max_background_flushes = 0;
options.max_mem_compaction_level = 0;
// just enough setting to hold off auto-compaction. // just enough setting to hold off auto-compaction.
options.level0_file_num_compaction_trigger = kTestScale + 1; options.level0_file_num_compaction_trigger = kTestScale + 1;
options.num_levels = 3; options.num_levels = 3;
@ -776,7 +775,6 @@ TEST_F(CompactionJobStatsTest, DeletionStatsTest) {
options.listeners.emplace_back(stats_checker); options.listeners.emplace_back(stats_checker);
options.create_if_missing = true; options.create_if_missing = true;
options.max_background_flushes = 0; options.max_background_flushes = 0;
options.max_mem_compaction_level = 0;
options.level0_file_num_compaction_trigger = kTestScale+1; options.level0_file_num_compaction_trigger = kTestScale+1;
options.num_levels = 3; options.num_levels = 3;
options.compression = kNoCompression; options.compression = kNoCompression;
@ -865,8 +863,6 @@ TEST_F(CompactionJobStatsTest, UniversalCompactionTest) {
Options options; Options options;
options.listeners.emplace_back(stats_checker); options.listeners.emplace_back(stats_checker);
options.create_if_missing = true; options.create_if_missing = true;
options.max_background_flushes = 0;
options.max_mem_compaction_level = 0;
options.num_levels = 3; options.num_levels = 3;
options.compression = kNoCompression; options.compression = kNoCompression;
options.level0_file_num_compaction_trigger = 2; options.level0_file_num_compaction_trigger = 2;
@ -917,6 +913,7 @@ TEST_F(CompactionJobStatsTest, UniversalCompactionTest) {
kKeySize, kValueSize, key_interval, kKeySize, kValueSize, key_interval,
compression_ratio, 1); compression_ratio, 1);
} }
reinterpret_cast<DBImpl*>(db_)->TEST_WaitForCompact();
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U); ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
} }

@ -177,8 +177,7 @@ class CompactionJobTest : public testing::Test {
s = SetCurrentFile(env_, dbname_, 1, nullptr); s = SetCurrentFile(env_, dbname_, 1, nullptr);
} }
void RunCompaction(std::function<uint64_t()> yield_callback, void RunCompaction(const std::vector<FileMetaData*>& files) {
const std::vector<FileMetaData*>& files) {
auto cfd = versions_->GetColumnFamilySet()->GetDefault(); auto cfd = versions_->GetColumnFamilySet()->GetDefault();
CompactionInputFiles compaction_input_files; CompactionInputFiles compaction_input_files;
@ -196,11 +195,10 @@ class CompactionJobTest : public testing::Test {
mutex_.Lock(); mutex_.Lock();
EventLogger event_logger(db_options_.info_log.get()); EventLogger event_logger(db_options_.info_log.get());
CompactionJobStats compaction_job_stats; CompactionJobStats compaction_job_stats;
CompactionJob compaction_job(0, &compaction, db_options_, env_options_, CompactionJob compaction_job(
versions_.get(), &shutting_down_, &log_buffer, 0, &compaction, db_options_, env_options_, versions_.get(),
nullptr, nullptr, nullptr, {}, table_cache_, &shutting_down_, &log_buffer, nullptr, nullptr, nullptr, {},
yield_callback, &event_logger, false, table_cache_, &event_logger, false, dbname_, &compaction_job_stats);
dbname_, &compaction_job_stats);
VerifyInitializationOfCompactionJobStats(compaction_job_stats); VerifyInitializationOfCompactionJobStats(compaction_job_stats);
@ -237,15 +235,8 @@ TEST_F(CompactionJobTest, Simple) {
auto files = cfd->current()->storage_info()->LevelFiles(0); auto files = cfd->current()->storage_info()->LevelFiles(0);
ASSERT_EQ(2U, files.size()); ASSERT_EQ(2U, files.size());
int yield_callback_called = 0; RunCompaction(files);
std::function<uint64_t()> yield_callback = [&]() {
yield_callback_called++;
return 0;
};
RunCompaction(std::move(yield_callback), files);
mock_table_factory_->AssertLatestFile(expected_results); mock_table_factory_->AssertLatestFile(expected_results);
ASSERT_EQ(yield_callback_called, 20000);
} }
TEST_F(CompactionJobTest, SimpleCorrupted) { TEST_F(CompactionJobTest, SimpleCorrupted) {
@ -253,11 +244,7 @@ TEST_F(CompactionJobTest, SimpleCorrupted) {
auto cfd = versions_->GetColumnFamilySet()->GetDefault(); auto cfd = versions_->GetColumnFamilySet()->GetDefault();
auto files = cfd->current()->storage_info()->LevelFiles(0); auto files = cfd->current()->storage_info()->LevelFiles(0);
std::function<uint64_t()> yield_callback = [&]() { RunCompaction(files);
return 0;
};
RunCompaction(std::move(yield_callback), files);
mock_table_factory_->AssertLatestFile(expected_results); mock_table_factory_->AssertLatestFile(expected_results);
} }

@ -351,13 +351,13 @@ TEST_F(CorruptionTest, CorruptedDescriptor) {
TEST_F(CorruptionTest, CompactionInputError) { TEST_F(CorruptionTest, CompactionInputError) {
Options options; Options options;
options.max_background_flushes = 0;
Reopen(&options); Reopen(&options);
Build(10); Build(10);
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
dbi->TEST_FlushMemTable(); dbi->TEST_FlushMemTable();
const int last = dbi->MaxMemCompactionLevel(); dbi->TEST_CompactRange(0, nullptr, nullptr);
ASSERT_EQ(1, Property("rocksdb.num-files-at-level" + NumberToString(last))); dbi->TEST_CompactRange(1, nullptr, nullptr);
ASSERT_EQ(1, Property("rocksdb.num-files-at-level2"));
Corrupt(kTableFile, 100, 1); Corrupt(kTableFile, 100, 1);
Check(9, 9); Check(9, 9);
@ -372,18 +372,20 @@ TEST_F(CorruptionTest, CompactionInputErrorParanoid) {
options.paranoid_checks = true; options.paranoid_checks = true;
options.write_buffer_size = 131072; options.write_buffer_size = 131072;
options.max_write_buffer_number = 2; options.max_write_buffer_number = 2;
options.max_background_flushes = 0;
Reopen(&options); Reopen(&options);
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
// Fill levels >= 1 so memtable flush outputs to level 0 // Fill levels >= 1
for (int level = 1; level < dbi->NumberLevels(); level++) { for (int level = 1; level < dbi->NumberLevels(); level++) {
dbi->Put(WriteOptions(), "", "begin"); dbi->Put(WriteOptions(), "", "begin");
dbi->Put(WriteOptions(), "~", "end"); dbi->Put(WriteOptions(), "~", "end");
dbi->TEST_FlushMemTable(); dbi->TEST_FlushMemTable();
for (int comp_level = 0; comp_level < dbi->NumberLevels() - level;
++comp_level) {
dbi->TEST_CompactRange(comp_level, nullptr, nullptr);
}
} }
options.max_mem_compaction_level = 0;
Reopen(&options); Reopen(&options);
dbi = reinterpret_cast<DBImpl*>(db_); dbi = reinterpret_cast<DBImpl*>(db_);

@ -39,7 +39,6 @@ class CuckooTableDBTest : public testing::Test {
options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0, 3, true)); options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0, 3, true));
options.allow_mmap_reads = true; options.allow_mmap_reads = true;
options.create_if_missing = true; options.create_if_missing = true;
options.max_mem_compaction_level = 0;
return options; return options;
} }

@ -178,7 +178,6 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.max_open_files = -1; options.max_open_files = -1;
options.num_levels = 3; options.num_levels = 3;
options.max_mem_compaction_level = 0;
options.compaction_filter_factory = std::make_shared<KeepFilterFactory>(); options.compaction_filter_factory = std::make_shared<KeepFilterFactory>();
options = CurrentOptions(options); options = CurrentOptions(options);
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
@ -356,7 +355,6 @@ TEST_F(DBTestCompactionFilter, CompactionFilterWithValueChange) {
do { do {
Options options; Options options;
options.num_levels = 3; options.num_levels = 3;
options.max_mem_compaction_level = 0;
options.compaction_filter_factory = options.compaction_filter_factory =
std::make_shared<ChangeFilterFactory>(); std::make_shared<ChangeFilterFactory>();
options = CurrentOptions(options); options = CurrentOptions(options);
@ -425,7 +423,6 @@ TEST_F(DBTestCompactionFilter, CompactionFilterWithMergeOperator) {
options.create_if_missing = true; options.create_if_missing = true;
options.merge_operator = MergeOperators::CreateUInt64AddOperator(); options.merge_operator = MergeOperators::CreateUInt64AddOperator();
options.num_levels = 3; options.num_levels = 3;
options.max_mem_compaction_level = 0;
// Filter out keys with value is 2. // Filter out keys with value is 2.
options.compaction_filter_factory = options.compaction_filter_factory =
std::make_shared<ConditionalFilterFactory>(two); std::make_shared<ConditionalFilterFactory>(two);
@ -538,6 +535,8 @@ TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) {
} }
} }
} // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {
#if !(defined NDEBUG) || !defined(OS_WIN) #if !(defined NDEBUG) || !defined(OS_WIN)
rocksdb::port::InstallStackTraceHandler(); rocksdb::port::InstallStackTraceHandler();

@ -1621,17 +1621,12 @@ Status DBImpl::CompactFilesImpl(
// deletion compaction currently not allowed in CompactFiles. // deletion compaction currently not allowed in CompactFiles.
assert(!c->deletion_compaction()); assert(!c->deletion_compaction());
auto yield_callback = [&]() {
return CallFlushDuringCompaction(
c->column_family_data(), *c->mutable_cf_options(),
job_context, log_buffer);
};
assert(is_snapshot_supported_ || snapshots_.empty()); assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJob compaction_job( CompactionJob compaction_job(
job_context->job_id, c.get(), db_options_, env_options_, versions_.get(), job_context->job_id, c.get(), db_options_, env_options_, versions_.get(),
&shutting_down_, log_buffer, directories_.GetDbDir(), &shutting_down_, log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->output_path_id()), stats_, snapshots_.GetAll(), directories_.GetDataDir(c->output_path_id()), stats_, snapshots_.GetAll(),
table_cache_, std::move(yield_callback), &event_logger_, table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks, dbname_, c->mutable_cf_options()->paranoid_file_checks, dbname_,
nullptr); // Here we pass a nullptr for CompactionJobStats because nullptr); // Here we pass a nullptr for CompactionJobStats because
// CompactFiles does not trigger OnCompactionCompleted(), // CompactFiles does not trigger OnCompactionCompleted(),
@ -1896,10 +1891,7 @@ int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) {
} }
int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* column_family) { int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* column_family) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); return 0;
InstrumentedMutexLock l(&mutex_);
return cfh->cfd()->GetSuperVersion()->
mutable_cf_options.max_mem_compaction_level;
} }
int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) { int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
@ -2077,27 +2069,24 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this); env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this);
} }
// special case -- if max_background_flushes == 0, then schedule flush on a
// compaction thread
if (db_options_.max_background_flushes == 0) {
while (unscheduled_flushes_ > 0 &&
bg_flush_scheduled_ + bg_compaction_scheduled_ <
db_options_.max_background_compactions) {
unscheduled_flushes_--;
bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::LOW, this);
}
}
if (bg_manual_only_) { if (bg_manual_only_) {
// only manual compactions are allowed to run. don't schedule automatic // only manual compactions are allowed to run. don't schedule automatic
// compactions // compactions
return; return;
} }
if (db_options_.max_background_flushes == 0 &&
bg_compaction_scheduled_ < db_options_.max_background_compactions &&
unscheduled_flushes_ > 0) {
// special case where flush is executed by compaction thread
// (if max_background_flushes == 0).
// Compaction thread will execute all the flushes
unscheduled_flushes_ = 0;
if (unscheduled_compactions_ > 0) {
// bg compaction will execute one compaction
unscheduled_compactions_--;
}
bg_compaction_scheduled_++;
env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW, this);
}
while (bg_compaction_scheduled_ < db_options_.max_background_compactions && while (bg_compaction_scheduled_ < db_options_.max_background_compactions &&
unscheduled_compactions_ > 0) { unscheduled_compactions_ > 0) {
bg_compaction_scheduled_++; bg_compaction_scheduled_++;
@ -2398,35 +2387,6 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
return Status::OK(); return Status::OK();
} }
// If there are no flush threads, then compaction thread needs to execute the
// flushes
if (db_options_.max_background_flushes == 0) {
// BackgroundFlush() will only execute a single flush. We keep calling it as
// long as there's more flushes to be done
while (!flush_queue_.empty()) {
LogToBuffer(
log_buffer,
"BackgroundCompaction calling BackgroundFlush. flush slots available "
"%d, compaction slots available %d",
db_options_.max_background_flushes - bg_flush_scheduled_,
db_options_.max_background_compactions - bg_compaction_scheduled_);
auto flush_status =
BackgroundFlush(madeProgress, job_context, log_buffer);
// the second condition will be false when a column family is dropped. we
// don't want to fail compaction because of that (because it might be a
// different column family)
if (!flush_status.ok() && !flush_status.IsShutdownInProgress()) {
if (is_manual) {
manual_compaction_->status = flush_status;
manual_compaction_->done = true;
manual_compaction_->in_progress = false;
manual_compaction_ = nullptr;
}
return flush_status;
}
}
}
unique_ptr<Compaction> c; unique_ptr<Compaction> c;
InternalKey manual_end_storage; InternalKey manual_end_storage;
InternalKey* manual_end = &manual_end_storage; InternalKey* manual_end = &manual_end_storage;
@ -2595,18 +2555,13 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
ThreadStatusUtil::ResetThreadStatus(); ThreadStatusUtil::ResetThreadStatus();
} else { } else {
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial"); TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial");
auto yield_callback = [&]() {
return CallFlushDuringCompaction(c->column_family_data(),
*c->mutable_cf_options(), job_context,
log_buffer);
};
assert(is_snapshot_supported_ || snapshots_.empty()); assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJob compaction_job( CompactionJob compaction_job(
job_context->job_id, c.get(), db_options_, env_options_, job_context->job_id, c.get(), db_options_, env_options_,
versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(), versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->output_path_id()), stats_, directories_.GetDataDir(c->output_path_id()), stats_,
snapshots_.GetAll(), table_cache_, std::move(yield_callback), snapshots_.GetAll(), table_cache_, &event_logger_,
&event_logger_, c->mutable_cf_options()->paranoid_file_checks, dbname_, c->mutable_cf_options()->paranoid_file_checks, dbname_,
&compaction_job_stats); &compaction_job_stats);
compaction_job.Prepare(); compaction_job.Prepare();
@ -2683,30 +2638,6 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
return status; return status;
} }
uint64_t DBImpl::CallFlushDuringCompaction(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
JobContext* job_context, LogBuffer* log_buffer) {
if (db_options_.max_background_flushes > 0) {
// flush thread will take care of this
return 0;
}
if (cfd->imm()->imm_flush_needed.load(std::memory_order_relaxed)) {
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();
if (cfd->imm()->IsFlushPending()) {
cfd->Ref();
FlushMemTableToOutputFile(cfd, mutable_cf_options, nullptr, job_context,
log_buffer);
cfd->Unref();
bg_cv_.SignalAll(); // Wakeup DelayWrite() if necessary
}
mutex_.Unlock();
log_buffer->FlushBufferToLog();
return env_->NowMicros() - imm_start;
}
return 0;
}
namespace { namespace {
struct IterState { struct IterState {
IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version) IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version)

@ -474,13 +474,6 @@ class DBImpl : public DB {
Status BackgroundFlush(bool* madeProgress, JobContext* job_context, Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer); LogBuffer* log_buffer);
// This function is called as part of compaction. It enables Flush process to
// preempt compaction, since it's higher prioirty
uint64_t CallFlushDuringCompaction(ColumnFamilyData* cfd,
const MutableCFOptions& mutable_cf_options,
JobContext* job_context,
LogBuffer* log_buffer);
void PrintStatistics(); void PrintStatistics();
// dump rocksdb.stats to LOG // dump rocksdb.stats to LOG

@ -232,7 +232,6 @@ TEST_F(DBTest, CompactedDB) {
const uint64_t kFileSize = 1 << 20; const uint64_t kFileSize = 1 << 20;
Options options; Options options;
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
options.max_mem_compaction_level = 0;
options.write_buffer_size = kFileSize; options.write_buffer_size = kFileSize;
options.target_file_size_base = kFileSize; options.target_file_size_base = kFileSize;
options.max_bytes_for_level_base = 1 << 30; options.max_bytes_for_level_base = 1 << 30;
@ -676,7 +675,6 @@ TEST_F(DBTest, GetPicksCorrectFile) {
TEST_F(DBTest, GetEncountersEmptyLevel) { TEST_F(DBTest, GetEncountersEmptyLevel) {
do { do {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.max_background_flushes = 0;
options.disableDataSync = true; options.disableDataSync = true;
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
// Arrange for the following to happen: // Arrange for the following to happen:
@ -688,14 +686,16 @@ TEST_F(DBTest, GetEncountersEmptyLevel) {
// occurring at level 1 (instead of the correct level 0). // occurring at level 1 (instead of the correct level 0).
// Step 1: First place sstables in levels 0 and 2 // Step 1: First place sstables in levels 0 and 2
int compaction_count = 0; Put(1, "a", "begin");
while (NumTableFilesAtLevel(0, 1) == 0 || NumTableFilesAtLevel(2, 1) == 0) { Put(1, "z", "end");
ASSERT_LE(compaction_count, 100) << "could not fill levels 0 and 2"; ASSERT_OK(Flush(1));
compaction_count++; dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
Put(1, "a", "begin"); dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
Put(1, "z", "end"); Put(1, "a", "begin");
ASSERT_OK(Flush(1)); Put(1, "z", "end");
} ASSERT_OK(Flush(1));
ASSERT_GT(NumTableFilesAtLevel(0, 1), 0);
ASSERT_GT(NumTableFilesAtLevel(2, 1), 0);
// Step 2: clear level 1 if necessary. // Step 2: clear level 1 if necessary.
dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]); dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
@ -709,7 +709,7 @@ TEST_F(DBTest, GetEncountersEmptyLevel) {
} }
// Step 4: Wait for compaction to finish // Step 4: Wait for compaction to finish
env_->SleepForMicroseconds(1000000); dbfull()->TEST_WaitForCompact();
ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1); // XXX ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1); // XXX
} while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction)); } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction));
@ -2822,7 +2822,6 @@ TEST_F(DBTest, CompactionTrigger) {
Options options; Options options;
options.write_buffer_size = 100<<10; //100KB options.write_buffer_size = 100<<10; //100KB
options.num_levels = 3; options.num_levels = 3;
options.max_mem_compaction_level = 0;
options.level0_file_num_compaction_trigger = 3; options.level0_file_num_compaction_trigger = 3;
options = CurrentOptions(options); options = CurrentOptions(options);
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
@ -2864,7 +2863,6 @@ Options DeletionTriggerOptions() {
options.min_write_buffer_number_to_merge = 1; options.min_write_buffer_number_to_merge = 1;
options.max_write_buffer_number_to_maintain = 0; options.max_write_buffer_number_to_maintain = 0;
options.num_levels = kCDTNumLevels; options.num_levels = kCDTNumLevels;
options.max_mem_compaction_level = 0;
options.level0_file_num_compaction_trigger = 1; options.level0_file_num_compaction_trigger = 1;
options.target_file_size_base = options.write_buffer_size * 2; options.target_file_size_base = options.write_buffer_size * 2;
options.target_file_size_multiplier = 2; options.target_file_size_multiplier = 2;
@ -3853,7 +3851,6 @@ bool MinLevelToCompress(CompressionType& type, Options& options, int wbits,
fprintf(stderr, "Test with compression options : window_bits = %d, level = %d, strategy = %d}\n", wbits, lev, strategy); fprintf(stderr, "Test with compression options : window_bits = %d, level = %d, strategy = %d}\n", wbits, lev, strategy);
options.write_buffer_size = 100<<10; //100KB options.write_buffer_size = 100<<10; //100KB
options.num_levels = 3; options.num_levels = 3;
options.max_mem_compaction_level = 0;
options.level0_file_num_compaction_trigger = 3; options.level0_file_num_compaction_trigger = 3;
options.create_if_missing = true; options.create_if_missing = true;
@ -4455,7 +4452,6 @@ TEST_F(DBTest, HiddenValuesAreRemoved) {
options_override.skip_policy = kSkipNoSnapshot; options_override.skip_policy = kSkipNoSnapshot;
do { do {
Options options = CurrentOptions(options_override); Options options = CurrentOptions(options_override);
options.max_background_flushes = 0;
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
Random rnd(301); Random rnd(301);
FillLevels("a", "z", 1); FillLevels("a", "z", 1);
@ -4557,7 +4553,8 @@ TEST_F(DBTest, DeletionMarkers1) {
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
Put(1, "foo", "v1"); Put(1, "foo", "v1");
ASSERT_OK(Flush(1)); ASSERT_OK(Flush(1));
const int last = CurrentOptions().max_mem_compaction_level; const int last = 2;
MoveFilesToLevel(last, 1);
// foo => v1 is now in last level // foo => v1 is now in last level
ASSERT_EQ(NumTableFilesAtLevel(last, 1), 1); ASSERT_EQ(NumTableFilesAtLevel(last, 1), 1);
@ -4565,6 +4562,7 @@ TEST_F(DBTest, DeletionMarkers1) {
Put(1, "a", "begin"); Put(1, "a", "begin");
Put(1, "z", "end"); Put(1, "z", "end");
Flush(1); Flush(1);
MoveFilesToLevel(last - 1, 1);
ASSERT_EQ(NumTableFilesAtLevel(last, 1), 1); ASSERT_EQ(NumTableFilesAtLevel(last, 1), 1);
ASSERT_EQ(NumTableFilesAtLevel(last - 1, 1), 1); ASSERT_EQ(NumTableFilesAtLevel(last - 1, 1), 1);
@ -4586,11 +4584,11 @@ TEST_F(DBTest, DeletionMarkers1) {
TEST_F(DBTest, DeletionMarkers2) { TEST_F(DBTest, DeletionMarkers2) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.max_background_flushes = 0;
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
Put(1, "foo", "v1"); Put(1, "foo", "v1");
ASSERT_OK(Flush(1)); ASSERT_OK(Flush(1));
const int last = CurrentOptions().max_mem_compaction_level; const int last = 2;
MoveFilesToLevel(last, 1);
// foo => v1 is now in last level // foo => v1 is now in last level
ASSERT_EQ(NumTableFilesAtLevel(last, 1), 1); ASSERT_EQ(NumTableFilesAtLevel(last, 1), 1);
@ -4598,6 +4596,7 @@ TEST_F(DBTest, DeletionMarkers2) {
Put(1, "a", "begin"); Put(1, "a", "begin");
Put(1, "z", "end"); Put(1, "z", "end");
Flush(1); Flush(1);
MoveFilesToLevel(last - 1, 1);
ASSERT_EQ(NumTableFilesAtLevel(last, 1), 1); ASSERT_EQ(NumTableFilesAtLevel(last, 1), 1);
ASSERT_EQ(NumTableFilesAtLevel(last - 1, 1), 1); ASSERT_EQ(NumTableFilesAtLevel(last - 1, 1), 1);
@ -4617,18 +4616,17 @@ TEST_F(DBTest, DeletionMarkers2) {
TEST_F(DBTest, OverlapInLevel0) { TEST_F(DBTest, OverlapInLevel0) {
do { do {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.max_background_flushes = 0;
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
int tmp = CurrentOptions().max_mem_compaction_level;
ASSERT_EQ(tmp, 2) << "Fix test to match config";
//Fill levels 1 and 2 to disable the pushing of new memtables to levels > 0. //Fill levels 1 and 2 to disable the pushing of new memtables to levels > 0.
ASSERT_OK(Put(1, "100", "v100")); ASSERT_OK(Put(1, "100", "v100"));
ASSERT_OK(Put(1, "999", "v999")); ASSERT_OK(Put(1, "999", "v999"));
Flush(1); Flush(1);
MoveFilesToLevel(2, 1);
ASSERT_OK(Delete(1, "100")); ASSERT_OK(Delete(1, "100"));
ASSERT_OK(Delete(1, "999")); ASSERT_OK(Delete(1, "999"));
Flush(1); Flush(1);
MoveFilesToLevel(1, 1);
ASSERT_EQ("0,1,1", FilesPerLevel(1)); ASSERT_EQ("0,1,1", FilesPerLevel(1));
// Make files spanning the following ranges in level-0: // Make files spanning the following ranges in level-0:
@ -4803,10 +4801,7 @@ TEST_F(DBTest, CustomComparator) {
TEST_F(DBTest, ManualCompaction) { TEST_F(DBTest, ManualCompaction) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.max_background_flushes = 0;
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_EQ(dbfull()->MaxMemCompactionLevel(), 2)
<< "Need to update this test to match kMaxMemCompactLevel";
// iter - 0 with 7 levels // iter - 0 with 7 levels
// iter - 1 with 3 levels // iter - 1 with 3 levels
@ -4836,7 +4831,7 @@ TEST_F(DBTest, ManualCompaction) {
// Compact all // Compact all
MakeTables(1, "a", "z", 1); MakeTables(1, "a", "z", 1);
ASSERT_EQ("0,1,2", FilesPerLevel(1)); ASSERT_EQ("1,0,2", FilesPerLevel(1));
db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr); db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
ASSERT_EQ("0,0,1", FilesPerLevel(1)); ASSERT_EQ("0,0,1", FilesPerLevel(1));
@ -4858,15 +4853,16 @@ TEST_F(DBTest, ManualLevelCompactionOutputPathId) {
options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760); options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760);
options.db_paths.emplace_back(dbname_ + "_3", 100 * 10485760); options.db_paths.emplace_back(dbname_ + "_3", 100 * 10485760);
options.db_paths.emplace_back(dbname_ + "_4", 120 * 10485760); options.db_paths.emplace_back(dbname_ + "_4", 120 * 10485760);
options.max_background_flushes = 1;
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_EQ(dbfull()->MaxMemCompactionLevel(), 2)
<< "Need to update this test to match kMaxMemCompactLevel";
// iter - 0 with 7 levels // iter - 0 with 7 levels
// iter - 1 with 3 levels // iter - 1 with 3 levels
for (int iter = 0; iter < 2; ++iter) { for (int iter = 0; iter < 2; ++iter) {
MakeTables(3, "p", "q", 1); for (int i = 0; i < 3; ++i) {
ASSERT_OK(Put(1, "p", "begin"));
ASSERT_OK(Put(1, "q", "end"));
ASSERT_OK(Flush(1));
}
ASSERT_EQ("3", FilesPerLevel(1)); ASSERT_EQ("3", FilesPerLevel(1));
ASSERT_EQ(3, GetSstFileCount(options.db_paths[0].path)); ASSERT_EQ(3, GetSstFileCount(options.db_paths[0].path));
ASSERT_EQ(0, GetSstFileCount(dbname_)); ASSERT_EQ(0, GetSstFileCount(dbname_));
@ -4887,7 +4883,11 @@ TEST_F(DBTest, ManualLevelCompactionOutputPathId) {
ASSERT_EQ(0, GetSstFileCount(dbname_)); ASSERT_EQ(0, GetSstFileCount(dbname_));
// Populate a different range // Populate a different range
MakeTables(3, "c", "e", 1); for (int i = 0; i < 3; ++i) {
ASSERT_OK(Put(1, "c", "begin"));
ASSERT_OK(Put(1, "e", "end"));
ASSERT_OK(Flush(1));
}
ASSERT_EQ("3,1", FilesPerLevel(1)); ASSERT_EQ("3,1", FilesPerLevel(1));
// Compact just the new range // Compact just the new range
@ -4898,7 +4898,9 @@ TEST_F(DBTest, ManualLevelCompactionOutputPathId) {
ASSERT_EQ(0, GetSstFileCount(dbname_)); ASSERT_EQ(0, GetSstFileCount(dbname_));
// Compact all // Compact all
MakeTables(1, "a", "z", 1); ASSERT_OK(Put(1, "a", "begin"));
ASSERT_OK(Put(1, "z", "end"));
ASSERT_OK(Flush(1));
ASSERT_EQ("1,2", FilesPerLevel(1)); ASSERT_EQ("1,2", FilesPerLevel(1));
ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path)); ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path)); ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
@ -4966,14 +4968,14 @@ TEST_F(DBTest, DBOpen_Options) {
TEST_F(DBTest, DBOpen_Change_NumLevels) { TEST_F(DBTest, DBOpen_Change_NumLevels) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.create_if_missing = true; options.create_if_missing = true;
options.max_background_flushes = 0;
DestroyAndReopen(options); DestroyAndReopen(options);
ASSERT_TRUE(db_ != nullptr); ASSERT_TRUE(db_ != nullptr);
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_OK(Put(1, "a", "123")); ASSERT_OK(Put(1, "a", "123"));
ASSERT_OK(Put(1, "b", "234")); ASSERT_OK(Put(1, "b", "234"));
db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr); Flush(1);
MoveFilesToLevel(3, 1);
Close(); Close();
options.create_if_missing = false; options.create_if_missing = false;
@ -5153,7 +5155,6 @@ TEST_F(DBTest, ManifestWriteError) {
options.env = env_; options.env = env_;
options.create_if_missing = true; options.create_if_missing = true;
options.error_if_exists = false; options.error_if_exists = false;
options.max_background_flushes = 0;
DestroyAndReopen(options); DestroyAndReopen(options);
ASSERT_OK(Put("foo", "bar")); ASSERT_OK(Put("foo", "bar"));
ASSERT_EQ("bar", Get("foo")); ASSERT_EQ("bar", Get("foo"));
@ -5161,7 +5162,8 @@ TEST_F(DBTest, ManifestWriteError) {
// Memtable compaction (will succeed) // Memtable compaction (will succeed)
Flush(); Flush();
ASSERT_EQ("bar", Get("foo")); ASSERT_EQ("bar", Get("foo"));
const int last = dbfull()->MaxMemCompactionLevel(); const int last = 2;
MoveFilesToLevel(2);
ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo=>bar is now in last level ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo=>bar is now in last level
// Merging compaction (will fail) // Merging compaction (will fail)
@ -7707,6 +7709,7 @@ TEST_F(DBTest, DisableDataSyncTest) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.disableDataSync = iter == 0; options.disableDataSync = iter == 0;
options.create_if_missing = true; options.create_if_missing = true;
options.num_levels = 10;
options.env = env_; options.env = env_;
Reopen(options); Reopen(options);
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
@ -7732,7 +7735,6 @@ TEST_F(DBTest, DynamicMemtableOptions) {
options.create_if_missing = true; options.create_if_missing = true;
options.compression = kNoCompression; options.compression = kNoCompression;
options.max_background_compactions = 1; options.max_background_compactions = 1;
options.max_mem_compaction_level = 0;
options.write_buffer_size = k64KB; options.write_buffer_size = k64KB;
options.max_write_buffer_number = 2; options.max_write_buffer_number = 2;
// Don't trigger compact/slowdown/stop // Don't trigger compact/slowdown/stop
@ -8040,8 +8042,6 @@ TEST_F(DBTest, PreShutdownManualCompaction) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.max_background_flushes = 0; options.max_background_flushes = 0;
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_EQ(dbfull()->MaxMemCompactionLevel(), 2)
<< "Need to update this test to match kMaxMemCompactLevel";
// iter - 0 with 7 levels // iter - 0 with 7 levels
// iter - 1 with 3 levels // iter - 1 with 3 levels
@ -8071,10 +8071,10 @@ TEST_F(DBTest, PreShutdownManualCompaction) {
// Compact all // Compact all
MakeTables(1, "a", "z", 1); MakeTables(1, "a", "z", 1);
ASSERT_EQ("0,1,2", FilesPerLevel(1)); ASSERT_EQ("1,0,2", FilesPerLevel(1));
CancelAllBackgroundWork(db_); CancelAllBackgroundWork(db_);
db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr); db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
ASSERT_EQ("0,1,2", FilesPerLevel(1)); ASSERT_EQ("1,0,2", FilesPerLevel(1));
if (iter == 0) { if (iter == 0) {
options = CurrentOptions(); options = CurrentOptions();
@ -8724,44 +8724,6 @@ TEST_F(DBTest, DynamicCompactionOptions) {
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();
ASSERT_LT(NumTableFilesAtLevel(0), 4); ASSERT_LT(NumTableFilesAtLevel(0), 4);
// Test max_mem_compaction_level.
// Destroy DB and start from scratch
options.max_background_compactions = 1;
options.max_background_flushes = 0;
options.max_mem_compaction_level = 2;
DestroyAndReopen(options);
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_EQ(NumTableFilesAtLevel(2), 0);
ASSERT_OK(Put("max_mem_compaction_level_key", RandomString(&rnd, 8)));
dbfull()->TEST_FlushMemTable(true);
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_EQ(NumTableFilesAtLevel(2), 1);
ASSERT_TRUE(Put("max_mem_compaction_level_key",
RandomString(&rnd, 8)).ok());
// Set new value and it becomes effective in this flush
ASSERT_OK(dbfull()->SetOptions({
{"max_mem_compaction_level", "1"}
}));
dbfull()->TEST_FlushMemTable(true);
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_EQ(NumTableFilesAtLevel(1), 1);
ASSERT_EQ(NumTableFilesAtLevel(2), 1);
ASSERT_TRUE(Put("max_mem_compaction_level_key",
RandomString(&rnd, 8)).ok());
// Set new value and it becomes effective in this flush
ASSERT_OK(dbfull()->SetOptions({
{"max_mem_compaction_level", "0"}
}));
dbfull()->TEST_FlushMemTable(true);
ASSERT_EQ(NumTableFilesAtLevel(0), 1);
ASSERT_EQ(NumTableFilesAtLevel(1), 1);
ASSERT_EQ(NumTableFilesAtLevel(2), 1);
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }

@ -36,7 +36,6 @@ class DeleteFileTest : public testing::Test {
db_ = nullptr; db_ = nullptr;
env_ = Env::Default(); env_ = Env::Default();
options_.enable_thread_tracking = true; options_.enable_thread_tracking = true;
options_.max_background_flushes = 0;
options_.write_buffer_size = 1024*1024*1000; options_.write_buffer_size = 1024*1024*1000;
options_.target_file_size_base = 1024*1024*1000; options_.target_file_size_base = 1024*1024*1000;
options_.max_bytes_for_level_base = 1024*1024*1000; options_.max_bytes_for_level_base = 1024*1024*1000;
@ -117,10 +116,14 @@ class DeleteFileTest : public testing::Test {
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
ASSERT_OK(dbi->TEST_FlushMemTable()); ASSERT_OK(dbi->TEST_FlushMemTable());
ASSERT_OK(dbi->TEST_WaitForFlushMemTable()); ASSERT_OK(dbi->TEST_WaitForFlushMemTable());
for (int i = 0; i < 2; ++i) {
ASSERT_OK(dbi->TEST_CompactRange(i, nullptr, nullptr));
}
AddKeys(50000, 10000); AddKeys(50000, 10000);
ASSERT_OK(dbi->TEST_FlushMemTable()); ASSERT_OK(dbi->TEST_FlushMemTable());
ASSERT_OK(dbi->TEST_WaitForFlushMemTable()); ASSERT_OK(dbi->TEST_WaitForFlushMemTable());
ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr));
} }
void CheckFileTypeCounts(std::string& dir, void CheckFileTypeCounts(std::string& dir,

@ -276,27 +276,13 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
// Note that if file_size is zero, the file has been deleted and // Note that if file_size is zero, the file has been deleted and
// should not be added to the manifest. // should not be added to the manifest.
int level = 0;
if (s.ok() && meta->fd.GetFileSize() > 0) { if (s.ok() && meta->fd.GetFileSize() > 0) {
const Slice min_user_key = meta->smallest.user_key();
const Slice max_user_key = meta->largest.user_key();
// if we have more than 1 background thread, then we cannot // if we have more than 1 background thread, then we cannot
// insert files directly into higher levels because some other // insert files directly into higher levels because some other
// threads could be concurrently producing compacted files for // threads could be concurrently producing compacted files for
// that key range. // that key range.
if (base != nullptr && db_options_.max_background_compactions <= 1 && // Add file to L0
db_options_.max_background_flushes == 0 && edit->AddFile(0 /* level */, meta->fd.GetNumber(), meta->fd.GetPathId(),
cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
level = base->storage_info()->PickLevelForMemTableOutput(
mutable_cf_options_, min_user_key, max_user_key);
// If level does not match path id, reset level back to 0
uint32_t fdpath = LevelCompactionPicker::GetPathId(
*cfd_->ioptions(), mutable_cf_options_, level);
if (fdpath != 0) {
level = 0;
}
}
edit->AddFile(level, meta->fd.GetNumber(), meta->fd.GetPathId(),
meta->fd.GetFileSize(), meta->smallest, meta->largest, meta->fd.GetFileSize(), meta->smallest, meta->largest,
meta->smallest_seqno, meta->largest_seqno, meta->smallest_seqno, meta->largest_seqno,
meta->marked_for_compaction); meta->marked_for_compaction);
@ -305,7 +291,7 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
InternalStats::CompactionStats stats(1); InternalStats::CompactionStats stats(1);
stats.micros = db_options_.env->NowMicros() - start_micros; stats.micros = db_options_.env->NowMicros() - start_micros;
stats.bytes_written = meta->fd.GetFileSize(); stats.bytes_written = meta->fd.GetFileSize();
cfd_->internal_stats()->AddCompactionStats(level, stats); cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats);
cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED, cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
meta->fd.GetFileSize()); meta->fd.GetFileSize());
RecordTick(stats_, COMPACT_WRITE_BYTES, meta->fd.GetFileSize()); RecordTick(stats_, COMPACT_WRITE_BYTES, meta->fd.GetFileSize());

@ -1011,7 +1011,6 @@ TEST_F(PlainTableDBTest, CompactionTrigger) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.write_buffer_size = 100 << 10; //100KB options.write_buffer_size = 100 << 10; //100KB
options.num_levels = 3; options.num_levels = 3;
options.max_mem_compaction_level = 0;
options.level0_file_num_compaction_trigger = 3; options.level0_file_num_compaction_trigger = 3;
Reopen(&options); Reopen(&options);

@ -1279,38 +1279,6 @@ bool VersionStorageInfo::OverlapInLevel(int level,
largest_user_key); largest_user_key);
} }
int VersionStorageInfo::PickLevelForMemTableOutput(
const MutableCFOptions& mutable_cf_options, const Slice& smallest_user_key,
const Slice& largest_user_key) {
int level = 0;
if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) {
// Push to next level if there is no overlap in next level,
// and the #bytes overlapping in the level after that are limited.
InternalKey start;
start.SetMaxPossibleForUserKey(smallest_user_key);
InternalKey limit(largest_user_key, 0, static_cast<ValueType>(0));
std::vector<FileMetaData*> overlaps;
while (mutable_cf_options.max_mem_compaction_level > 0 &&
level < mutable_cf_options.max_mem_compaction_level) {
if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
break;
}
if (level + 2 >= num_levels_) {
level++;
break;
}
GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
const uint64_t sum = TotalFileSize(overlaps);
if (sum > mutable_cf_options.MaxGrandParentOverlapBytes(level)) {
break;
}
level++;
}
}
return level;
}
// Store in "*inputs" all files in "level" that overlap [begin,end] // Store in "*inputs" all files in "level" that overlap [begin,end]
// If hint_index is specified, then it points to a file in the // If hint_index is specified, then it points to a file in the
// overlapping range. // overlapping range.

@ -186,12 +186,6 @@ class VersionStorageInfo {
bool HasOverlappingUserKey(const std::vector<FileMetaData*>* inputs, bool HasOverlappingUserKey(const std::vector<FileMetaData*>* inputs,
int level); int level);
// Return the level at which we should place a new memtable compaction
// result that covers the range [smallest_user_key,largest_user_key].
int PickLevelForMemTableOutput(const MutableCFOptions& mutable_cf_options,
const Slice& smallest_user_key,
const Slice& largest_user_key);
int num_levels() const { return num_levels_; } int num_levels() const { return num_levels_; }
// REQUIRES: This version has been saved (see VersionSet::SaveTo) // REQUIRES: This version has been saved (see VersionSet::SaveTo)

@ -357,14 +357,7 @@ struct ColumnFamilyOptions {
// Dynamically changeable through SetOptions() API // Dynamically changeable through SetOptions() API
int level0_stop_writes_trigger; int level0_stop_writes_trigger;
// Maximum level to which a new compacted memtable is pushed if it // This does not do anything anymore. Deprecated.
// does not create overlap. We try to push to level 2 to avoid the
// relatively expensive level 0=>1 compactions and to avoid some
// expensive manifest file operations. We do not push all the way to
// the largest level since that can generate a lot of wasted disk
// space if the same key space is being repeatedly overwritten.
//
// Dynamically changeable through SetOptions() API
int max_mem_compaction_level; int max_mem_compaction_level;
// Target file size for compaction. // Target file size for compaction.

@ -1254,8 +1254,7 @@ void Java_org_rocksdb_Options_setLevelZeroStopWritesTrigger(
*/ */
jint Java_org_rocksdb_Options_maxMemCompactionLevel( jint Java_org_rocksdb_Options_maxMemCompactionLevel(
JNIEnv* env, jobject jobj, jlong jhandle) { JNIEnv* env, jobject jobj, jlong jhandle) {
return reinterpret_cast<rocksdb::Options*>( return 0;
jhandle)->max_mem_compaction_level;
} }
/* /*
@ -1264,11 +1263,7 @@ jint Java_org_rocksdb_Options_maxMemCompactionLevel(
* Signature: (JI)V * Signature: (JI)V
*/ */
void Java_org_rocksdb_Options_setMaxMemCompactionLevel( void Java_org_rocksdb_Options_setMaxMemCompactionLevel(
JNIEnv* env, jobject jobj, jlong jhandle, JNIEnv* env, jobject jobj, jlong jhandle, jint jmax_mem_compaction_level) {}
jint jmax_mem_compaction_level) {
reinterpret_cast<rocksdb::Options*>(jhandle)->max_mem_compaction_level =
static_cast<int>(jmax_mem_compaction_level);
}
/* /*
* Class: org_rocksdb_Options * Class: org_rocksdb_Options
@ -2413,8 +2408,7 @@ void Java_org_rocksdb_ColumnFamilyOptions_setLevelZeroStopWritesTrigger(
*/ */
jint Java_org_rocksdb_ColumnFamilyOptions_maxMemCompactionLevel( jint Java_org_rocksdb_ColumnFamilyOptions_maxMemCompactionLevel(
JNIEnv* env, jobject jobj, jlong jhandle) { JNIEnv* env, jobject jobj, jlong jhandle) {
return reinterpret_cast<rocksdb::ColumnFamilyOptions*>( return 0;
jhandle)->max_mem_compaction_level;
} }
/* /*
@ -2423,11 +2417,7 @@ jint Java_org_rocksdb_ColumnFamilyOptions_maxMemCompactionLevel(
* Signature: (JI)V * Signature: (JI)V
*/ */
void Java_org_rocksdb_ColumnFamilyOptions_setMaxMemCompactionLevel( void Java_org_rocksdb_ColumnFamilyOptions_setMaxMemCompactionLevel(
JNIEnv* env, jobject jobj, jlong jhandle, JNIEnv* env, jobject jobj, jlong jhandle, jint jmax_mem_compaction_level) {}
jint jmax_mem_compaction_level) {
reinterpret_cast<rocksdb::ColumnFamilyOptions*>(jhandle)->
max_mem_compaction_level = static_cast<int>(jmax_mem_compaction_level);
}
/* /*
* Class: org_rocksdb_ColumnFamilyOptions * Class: org_rocksdb_ColumnFamilyOptions

@ -1012,7 +1012,6 @@ class StressTest {
{ {
ToString(FLAGS_max_bytes_for_level_multiplier), "1", "2", ToString(FLAGS_max_bytes_for_level_multiplier), "1", "2",
}}, }},
{"max_mem_compaction_level", {"0", "1", "2"}},
{"max_sequential_skip_in_iterations", {"4", "8", "12"}}, {"max_sequential_skip_in_iterations", {"4", "8", "12"}},
}; };

@ -24,8 +24,7 @@ public:
db_ = nullptr; db_ = nullptr;
} }
Status OpenDB(bool create_if_missing, int levels, Status OpenDB(bool create_if_missing, int levels);
int mem_table_compact_level);
Status Put(const std::string& k, const std::string& v) { Status Put(const std::string& k, const std::string& v) {
return db_->Put(WriteOptions(), k, v); return db_->Put(WriteOptions(), k, v);
@ -43,7 +42,7 @@ public:
return result; return result;
} }
Status CompactMemTable() { Status Flush() {
if (db_ == nullptr) { if (db_ == nullptr) {
return Status::InvalidArgument("DB not opened."); return Status::InvalidArgument("DB not opened.");
} }
@ -51,6 +50,13 @@ public:
return db_impl->TEST_FlushMemTable(); return db_impl->TEST_FlushMemTable();
} }
void MoveL0FileToLevel(int level) {
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db_);
for (int i = 0; i < level; ++i) {
ASSERT_OK(db_impl->TEST_CompactRange(i, nullptr, nullptr));
}
}
void CloseDB() { void CloseDB() {
if (db_ != nullptr) { if (db_ != nullptr) {
delete db_; delete db_;
@ -72,13 +78,10 @@ private:
DB* db_; DB* db_;
}; };
Status ReduceLevelTest::OpenDB(bool create_if_missing, int num_levels, Status ReduceLevelTest::OpenDB(bool create_if_missing, int num_levels) {
int mem_table_compact_level) {
rocksdb::Options opt; rocksdb::Options opt;
opt.num_levels = num_levels; opt.num_levels = num_levels;
opt.create_if_missing = create_if_missing; opt.create_if_missing = create_if_missing;
opt.max_mem_compaction_level = mem_table_compact_level;
opt.max_background_flushes = 0;
rocksdb::Status st = rocksdb::DB::Open(opt, dbname_, &db_); rocksdb::Status st = rocksdb::DB::Open(opt, dbname_, &db_);
if (!st.ok()) { if (!st.ok()) {
fprintf(stderr, "Can't open the db:%s\n", st.ToString().c_str()); fprintf(stderr, "Can't open the db:%s\n", st.ToString().c_str());
@ -98,71 +101,73 @@ bool ReduceLevelTest::ReduceLevels(int target_level) {
} }
TEST_F(ReduceLevelTest, Last_Level) { TEST_F(ReduceLevelTest, Last_Level) {
// create files on all levels; ASSERT_OK(OpenDB(true, 4));
ASSERT_OK(OpenDB(true, 4, 3));
ASSERT_OK(Put("aaaa", "11111")); ASSERT_OK(Put("aaaa", "11111"));
ASSERT_OK(CompactMemTable()); Flush();
MoveL0FileToLevel(3);
ASSERT_EQ(FilesOnLevel(3), 1); ASSERT_EQ(FilesOnLevel(3), 1);
CloseDB(); CloseDB();
ASSERT_TRUE(ReduceLevels(3)); ASSERT_TRUE(ReduceLevels(3));
ASSERT_OK(OpenDB(true, 3, 1)); ASSERT_OK(OpenDB(true, 3));
ASSERT_EQ(FilesOnLevel(2), 1); ASSERT_EQ(FilesOnLevel(2), 1);
CloseDB(); CloseDB();
ASSERT_TRUE(ReduceLevels(2)); ASSERT_TRUE(ReduceLevels(2));
ASSERT_OK(OpenDB(true, 2, 1)); ASSERT_OK(OpenDB(true, 2));
ASSERT_EQ(FilesOnLevel(1), 1); ASSERT_EQ(FilesOnLevel(1), 1);
CloseDB(); CloseDB();
} }
TEST_F(ReduceLevelTest, Top_Level) { TEST_F(ReduceLevelTest, Top_Level) {
// create files on all levels; ASSERT_OK(OpenDB(true, 5));
ASSERT_OK(OpenDB(true, 5, 0));
ASSERT_OK(Put("aaaa", "11111")); ASSERT_OK(Put("aaaa", "11111"));
ASSERT_OK(CompactMemTable()); Flush();
ASSERT_EQ(FilesOnLevel(0), 1); ASSERT_EQ(FilesOnLevel(0), 1);
CloseDB(); CloseDB();
ASSERT_TRUE(ReduceLevels(4)); ASSERT_TRUE(ReduceLevels(4));
ASSERT_OK(OpenDB(true, 4, 0)); ASSERT_OK(OpenDB(true, 4));
CloseDB(); CloseDB();
ASSERT_TRUE(ReduceLevels(3)); ASSERT_TRUE(ReduceLevels(3));
ASSERT_OK(OpenDB(true, 3, 0)); ASSERT_OK(OpenDB(true, 3));
CloseDB(); CloseDB();
ASSERT_TRUE(ReduceLevels(2)); ASSERT_TRUE(ReduceLevels(2));
ASSERT_OK(OpenDB(true, 2, 0)); ASSERT_OK(OpenDB(true, 2));
CloseDB(); CloseDB();
} }
TEST_F(ReduceLevelTest, All_Levels) { TEST_F(ReduceLevelTest, All_Levels) {
// create files on all levels; ASSERT_OK(OpenDB(true, 5));
ASSERT_OK(OpenDB(true, 5, 1));
ASSERT_OK(Put("a", "a11111")); ASSERT_OK(Put("a", "a11111"));
ASSERT_OK(CompactMemTable()); ASSERT_OK(Flush());
ASSERT_EQ(FilesOnLevel(1), 1); MoveL0FileToLevel(4);
ASSERT_EQ(FilesOnLevel(4), 1);
CloseDB(); CloseDB();
ASSERT_OK(OpenDB(true, 5, 2)); ASSERT_OK(OpenDB(true, 5));
ASSERT_OK(Put("b", "b11111")); ASSERT_OK(Put("b", "b11111"));
ASSERT_OK(CompactMemTable()); ASSERT_OK(Flush());
ASSERT_EQ(FilesOnLevel(1), 1); MoveL0FileToLevel(3);
ASSERT_EQ(FilesOnLevel(2), 1); ASSERT_EQ(FilesOnLevel(3), 1);
ASSERT_EQ(FilesOnLevel(4), 1);
CloseDB(); CloseDB();
ASSERT_OK(OpenDB(true, 5, 3)); ASSERT_OK(OpenDB(true, 5));
ASSERT_OK(Put("c", "c11111")); ASSERT_OK(Put("c", "c11111"));
ASSERT_OK(CompactMemTable()); ASSERT_OK(Flush());
ASSERT_EQ(FilesOnLevel(1), 1); MoveL0FileToLevel(2);
ASSERT_EQ(FilesOnLevel(2), 1); ASSERT_EQ(FilesOnLevel(2), 1);
ASSERT_EQ(FilesOnLevel(3), 1); ASSERT_EQ(FilesOnLevel(3), 1);
ASSERT_EQ(FilesOnLevel(4), 1);
CloseDB(); CloseDB();
ASSERT_OK(OpenDB(true, 5, 4)); ASSERT_OK(OpenDB(true, 5));
ASSERT_OK(Put("d", "d11111")); ASSERT_OK(Put("d", "d11111"));
ASSERT_OK(CompactMemTable()); ASSERT_OK(Flush());
MoveL0FileToLevel(1);
ASSERT_EQ(FilesOnLevel(1), 1); ASSERT_EQ(FilesOnLevel(1), 1);
ASSERT_EQ(FilesOnLevel(2), 1); ASSERT_EQ(FilesOnLevel(2), 1);
ASSERT_EQ(FilesOnLevel(3), 1); ASSERT_EQ(FilesOnLevel(3), 1);
@ -170,7 +175,7 @@ TEST_F(ReduceLevelTest, All_Levels) {
CloseDB(); CloseDB();
ASSERT_TRUE(ReduceLevels(4)); ASSERT_TRUE(ReduceLevels(4));
ASSERT_OK(OpenDB(true, 4, 0)); ASSERT_OK(OpenDB(true, 4));
ASSERT_EQ("a11111", Get("a")); ASSERT_EQ("a11111", Get("a"));
ASSERT_EQ("b11111", Get("b")); ASSERT_EQ("b11111", Get("b"));
ASSERT_EQ("c11111", Get("c")); ASSERT_EQ("c11111", Get("c"));
@ -178,7 +183,7 @@ TEST_F(ReduceLevelTest, All_Levels) {
CloseDB(); CloseDB();
ASSERT_TRUE(ReduceLevels(3)); ASSERT_TRUE(ReduceLevels(3));
ASSERT_OK(OpenDB(true, 3, 0)); ASSERT_OK(OpenDB(true, 3));
ASSERT_EQ("a11111", Get("a")); ASSERT_EQ("a11111", Get("a"));
ASSERT_EQ("b11111", Get("b")); ASSERT_EQ("b11111", Get("b"));
ASSERT_EQ("c11111", Get("c")); ASSERT_EQ("c11111", Get("c"));
@ -186,7 +191,7 @@ TEST_F(ReduceLevelTest, All_Levels) {
CloseDB(); CloseDB();
ASSERT_TRUE(ReduceLevels(2)); ASSERT_TRUE(ReduceLevels(2));
ASSERT_OK(OpenDB(true, 2, 0)); ASSERT_OK(OpenDB(true, 2));
ASSERT_EQ("a11111", Get("a")); ASSERT_EQ("a11111", Get("a"));
ASSERT_EQ("b11111", Get("b")); ASSERT_EQ("b11111", Get("b"));
ASSERT_EQ("c11111", Get("c")); ASSERT_EQ("c11111", Get("c"));

@ -707,6 +707,7 @@ void DBTestBase::MakeTables(
ASSERT_OK(Put(cf, small, "begin")); ASSERT_OK(Put(cf, small, "begin"));
ASSERT_OK(Put(cf, large, "end")); ASSERT_OK(Put(cf, large, "end"));
ASSERT_OK(Flush(cf)); ASSERT_OK(Flush(cf));
MoveFilesToLevel(n - i - 1, cf);
} }
} }
@ -717,6 +718,16 @@ void DBTestBase::FillLevels(
MakeTables(db_->NumberLevels(handles_[cf]), smallest, largest, cf); MakeTables(db_->NumberLevels(handles_[cf]), smallest, largest, cf);
} }
void DBTestBase::MoveFilesToLevel(int level, int cf) {
for (int l = 0; l < level; ++l) {
if (cf > 0) {
dbfull()->TEST_CompactRange(l, nullptr, nullptr, handles_[cf]);
} else {
dbfull()->TEST_CompactRange(l, nullptr, nullptr);
}
}
}
void DBTestBase::DumpFileCounts(const char* label) { void DBTestBase::DumpFileCounts(const char* label) {
fprintf(stderr, "---\n%s:\n", label); fprintf(stderr, "---\n%s:\n", label);
fprintf(stderr, "maxoverlap: %" PRIu64 "\n", fprintf(stderr, "maxoverlap: %" PRIu64 "\n",

@ -581,6 +581,8 @@ class DBTestBase : public testing::Test {
void FillLevels(const std::string& smallest, const std::string& largest, void FillLevels(const std::string& smallest, const std::string& largest,
int cf); int cf);
void MoveFilesToLevel(int level, int cf = 0);
void DumpFileCounts(const char* label); void DumpFileCounts(const char* label);
std::string DumpSSTableList(); std::string DumpSSTableList();

@ -1143,7 +1143,6 @@ Options ReduceDBLevelsCommand::PrepareOptionsForOpenDB() {
// Disable size compaction // Disable size compaction
opt.max_bytes_for_level_base = 1ULL << 50; opt.max_bytes_for_level_base = 1ULL << 50;
opt.max_bytes_for_level_multiplier = 1; opt.max_bytes_for_level_multiplier = 1;
opt.max_mem_compaction_level = 0;
return opt; return opt;
} }

@ -113,8 +113,6 @@ void MutableCFOptions::Dump(Logger* log) const {
} }
result.resize(result.size() - 2); result.resize(result.size() - 2);
Log(log, "max_bytes_for_level_multiplier_additional: %s", result.c_str()); Log(log, "max_bytes_for_level_multiplier_additional: %s", result.c_str());
Log(log, " max_mem_compaction_level: %d",
max_mem_compaction_level);
Log(log, " verify_checksums_in_compaction: %d", Log(log, " verify_checksums_in_compaction: %d",
verify_checksums_in_compaction); verify_checksums_in_compaction);
Log(log, " max_sequential_skip_in_iterations: %" PRIu64, Log(log, " max_sequential_skip_in_iterations: %" PRIu64,

@ -39,7 +39,6 @@ struct MutableCFOptions {
max_bytes_for_level_multiplier(options.max_bytes_for_level_multiplier), max_bytes_for_level_multiplier(options.max_bytes_for_level_multiplier),
max_bytes_for_level_multiplier_additional( max_bytes_for_level_multiplier_additional(
options.max_bytes_for_level_multiplier_additional), options.max_bytes_for_level_multiplier_additional),
max_mem_compaction_level(options.max_mem_compaction_level),
verify_checksums_in_compaction(options.verify_checksums_in_compaction), verify_checksums_in_compaction(options.verify_checksums_in_compaction),
max_sequential_skip_in_iterations( max_sequential_skip_in_iterations(
options.max_sequential_skip_in_iterations), options.max_sequential_skip_in_iterations),
@ -70,7 +69,6 @@ struct MutableCFOptions {
target_file_size_multiplier(0), target_file_size_multiplier(0),
max_bytes_for_level_base(0), max_bytes_for_level_base(0),
max_bytes_for_level_multiplier(0), max_bytes_for_level_multiplier(0),
max_mem_compaction_level(0),
verify_checksums_in_compaction(false), verify_checksums_in_compaction(false),
max_sequential_skip_in_iterations(0), max_sequential_skip_in_iterations(0),
paranoid_file_checks(false) paranoid_file_checks(false)
@ -122,7 +120,6 @@ struct MutableCFOptions {
uint64_t max_bytes_for_level_base; uint64_t max_bytes_for_level_base;
int max_bytes_for_level_multiplier; int max_bytes_for_level_multiplier;
std::vector<int> max_bytes_for_level_multiplier_additional; std::vector<int> max_bytes_for_level_multiplier_additional;
int max_mem_compaction_level;
bool verify_checksums_in_compaction; bool verify_checksums_in_compaction;
// Misc options // Misc options

@ -88,7 +88,6 @@ ColumnFamilyOptions::ColumnFamilyOptions()
level0_file_num_compaction_trigger(4), level0_file_num_compaction_trigger(4),
level0_slowdown_writes_trigger(20), level0_slowdown_writes_trigger(20),
level0_stop_writes_trigger(24), level0_stop_writes_trigger(24),
max_mem_compaction_level(2),
target_file_size_base(2 * 1048576), target_file_size_base(2 * 1048576),
target_file_size_multiplier(1), target_file_size_multiplier(1),
max_bytes_for_level_base(10 * 1048576), max_bytes_for_level_base(10 * 1048576),
@ -145,7 +144,6 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
options.level0_file_num_compaction_trigger), options.level0_file_num_compaction_trigger),
level0_slowdown_writes_trigger(options.level0_slowdown_writes_trigger), level0_slowdown_writes_trigger(options.level0_slowdown_writes_trigger),
level0_stop_writes_trigger(options.level0_stop_writes_trigger), level0_stop_writes_trigger(options.level0_stop_writes_trigger),
max_mem_compaction_level(options.max_mem_compaction_level),
target_file_size_base(options.target_file_size_base), target_file_size_base(options.target_file_size_base),
target_file_size_multiplier(options.target_file_size_multiplier), target_file_size_multiplier(options.target_file_size_multiplier),
max_bytes_for_level_base(options.max_bytes_for_level_base), max_bytes_for_level_base(options.max_bytes_for_level_base),
@ -415,8 +413,6 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
level0_slowdown_writes_trigger); level0_slowdown_writes_trigger);
Warn(log, " Options.level0_stop_writes_trigger: %d", Warn(log, " Options.level0_stop_writes_trigger: %d",
level0_stop_writes_trigger); level0_stop_writes_trigger);
Warn(log, " Options.max_mem_compaction_level: %d",
max_mem_compaction_level);
Warn(log, " Options.target_file_size_base: %" PRIu64, Warn(log, " Options.target_file_size_base: %" PRIu64,
target_file_size_base); target_file_size_base);
Warn(log, " Options.target_file_size_multiplier: %d", Warn(log, " Options.target_file_size_multiplier: %d",

@ -228,8 +228,6 @@ bool ParseCompactionOptions(const std::string& name, const std::string& value,
start = end + 1; start = end + 1;
} }
} }
} else if (name == "max_mem_compaction_level") {
new_options->max_mem_compaction_level = ParseInt(value);
} else if (name == "verify_checksums_in_compaction") { } else if (name == "verify_checksums_in_compaction") {
new_options->verify_checksums_in_compaction = ParseBoolean(name, value); new_options->verify_checksums_in_compaction = ParseBoolean(name, value);
} else { } else {

@ -110,7 +110,6 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
{"level0_file_num_compaction_trigger", "8"}, {"level0_file_num_compaction_trigger", "8"},
{"level0_slowdown_writes_trigger", "9"}, {"level0_slowdown_writes_trigger", "9"},
{"level0_stop_writes_trigger", "10"}, {"level0_stop_writes_trigger", "10"},
{"max_mem_compaction_level", "11"},
{"target_file_size_base", "12"}, {"target_file_size_base", "12"},
{"target_file_size_multiplier", "13"}, {"target_file_size_multiplier", "13"},
{"max_bytes_for_level_base", "14"}, {"max_bytes_for_level_base", "14"},
@ -198,7 +197,6 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_cf_opt.level0_file_num_compaction_trigger, 8); ASSERT_EQ(new_cf_opt.level0_file_num_compaction_trigger, 8);
ASSERT_EQ(new_cf_opt.level0_slowdown_writes_trigger, 9); ASSERT_EQ(new_cf_opt.level0_slowdown_writes_trigger, 9);
ASSERT_EQ(new_cf_opt.level0_stop_writes_trigger, 10); ASSERT_EQ(new_cf_opt.level0_stop_writes_trigger, 10);
ASSERT_EQ(new_cf_opt.max_mem_compaction_level, 11);
ASSERT_EQ(new_cf_opt.target_file_size_base, static_cast<uint64_t>(12)); ASSERT_EQ(new_cf_opt.target_file_size_base, static_cast<uint64_t>(12));
ASSERT_EQ(new_cf_opt.target_file_size_multiplier, 13); ASSERT_EQ(new_cf_opt.target_file_size_multiplier, 13);
ASSERT_EQ(new_cf_opt.max_bytes_for_level_base, 14U); ASSERT_EQ(new_cf_opt.max_bytes_for_level_base, 14U);

@ -47,7 +47,6 @@ class TtlTest : public testing::Test {
// ensure that compaction is kicked in to always strip timestamp from kvs // ensure that compaction is kicked in to always strip timestamp from kvs
options_.max_grandparent_overlap_factor = 0; options_.max_grandparent_overlap_factor = 0;
// compaction should take place always from level0 for determinism // compaction should take place always from level0 for determinism
options_.max_mem_compaction_level = 0;
db_ttl_ = nullptr; db_ttl_ = nullptr;
DestroyDB(dbname_, Options()); DestroyDB(dbname_, Options());
} }

Loading…
Cancel
Save