diff --git a/.travis.yml b/.travis.yml index b6fa63c5d..b045d259e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -34,7 +34,7 @@ before_script: # as EnvPosixTest::AllocateTest expects within the Travis OpenVZ environment. script: - if [[ "${TRAVIS_OS_NAME}" == 'linux' ]]; then OPT=-DTRAVIS CLANG_FORMAT_DIFF=/tmp/clang-format-diff.py make format || true; fi - - OPT=-DTRAVIS V=1 make -j4 check && OPT=-DTRAVIS V=1 make clean jclean rocksdbjava jtest && make clean && OPT="-DTRAVIS -DROCKSDB_LITE" V=1 make -j4 check + - OPT=-DTRAVIS V=1 make -j4 check && OPT=-DTRAVIS V=1 make clean jclean rocksdbjava jtest && make clean && OPT="-DTRAVIS -DROCKSDB_LITE" V=1 make -j4 static_lib notifications: email: diff --git a/Makefile b/Makefile index 38fe1f5c1..c2642ce0f 100644 --- a/Makefile +++ b/Makefile @@ -700,7 +700,7 @@ clean: tags: ctags * -R - cscope -b `find . -name '*.cc'` `find . -name '*.h'` + cscope -b `find . -name '*.cc'` `find . -name '*.h'` `find . -name '*.c'` format: build_tools/format-diff.sh diff --git a/db/auto_roll_logger.cc b/db/auto_roll_logger.cc index cf92f34c8..2349bd0c0 100644 --- a/db/auto_roll_logger.cc +++ b/db/auto_roll_logger.cc @@ -32,8 +32,16 @@ Status AutoRollLogger::ResetLogger() { } void AutoRollLogger::RollLogFile() { - std::string old_fname = OldInfoLogFileName( - dbname_, env_->NowMicros(), db_absolute_path_, db_log_dir_); + // This function is called when log is rotating. Two rotations + // can happen quickly (NowMicro returns same value). To not overwrite + // previous log file we increment by one micro second and try again. + uint64_t now = env_->NowMicros(); + std::string old_fname; + do { + old_fname = OldInfoLogFileName( + dbname_, now, db_absolute_path_, db_log_dir_); + now++; + } while (env_->FileExists(old_fname).ok()); env_->RenameFile(log_fname_, old_fname); } diff --git a/db/column_family.cc b/db/column_family.cc index 408f53831..ca3be7855 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -239,6 +239,17 @@ ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options, result.level0_slowdown_writes_trigger, result.level0_file_num_compaction_trigger); } + + if (result.soft_pending_compaction_bytes_limit == 0) { + result.soft_pending_compaction_bytes_limit = + result.hard_pending_compaction_bytes_limit; + } else if (result.hard_pending_compaction_bytes_limit > 0 && + result.soft_pending_compaction_bytes_limit > + result.hard_pending_compaction_bytes_limit) { + result.soft_pending_compaction_bytes_limit = + result.hard_pending_compaction_bytes_limit; + } + if (result.level_compaction_dynamic_level_bytes) { if (result.compaction_style != kCompactionStyleLevel || db_options.db_paths.size() > 1U) { @@ -513,6 +524,21 @@ std::unique_ptr SetupDelay( } return write_controller->GetDelayToken(write_rate); } + +int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger, + int level0_slowdown_writes_trigger) { + // SanitizeOptions() ensures it. + assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger); + + // 1/4 of the way between L0 compaction trigger threshold and slowdown + // condition. + // Or twice as compaction trigger, if it is smaller. + return std::min(level0_file_num_compaction_trigger * 2, + level0_file_num_compaction_trigger + + (level0_slowdown_writes_trigger - + level0_file_num_compaction_trigger) / + 4); +} } // namespace void ColumnFamilyData::RecalculateWriteStallConditions( @@ -531,21 +557,6 @@ void ColumnFamilyData::RecalculateWriteStallConditions( "(waiting for flush), max_write_buffer_number is set to %d", name_.c_str(), imm()->NumNotFlushed(), mutable_cf_options.max_write_buffer_number); - } else if (mutable_cf_options.max_write_buffer_number > 3 && - imm()->NumNotFlushed() >= - mutable_cf_options.max_write_buffer_number - 1) { - write_controller_token_ = - SetupDelay(ioptions_.delayed_write_rate, write_controller, - compaction_needed_bytes, prev_compaction_needed_bytes_, - mutable_cf_options.disable_auto_compactions); - internal_stats_->AddCFStats(InternalStats::MEMTABLE_SLOWDOWN, 1); - Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, - "[%s] Stalling writes because we have %d immutable memtables " - "(waiting for flush), max_write_buffer_number is set to %d " - "rate %" PRIu64, - name_.c_str(), imm()->NumNotFlushed(), - mutable_cf_options.max_write_buffer_number, - write_controller->delayed_write_rate()); } else if (vstorage->l0_delay_trigger_count() >= mutable_cf_options.level0_stop_writes_trigger) { write_controller_token_ = write_controller->GetStopToken(); @@ -567,6 +578,21 @@ void ColumnFamilyData::RecalculateWriteStallConditions( "[%s] Stopping writes because of estimated pending compaction " "bytes %" PRIu64, name_.c_str(), compaction_needed_bytes); + } else if (mutable_cf_options.max_write_buffer_number > 3 && + imm()->NumNotFlushed() >= + mutable_cf_options.max_write_buffer_number - 1) { + write_controller_token_ = + SetupDelay(ioptions_.delayed_write_rate, write_controller, + compaction_needed_bytes, prev_compaction_needed_bytes_, + mutable_cf_options.disable_auto_compactions); + internal_stats_->AddCFStats(InternalStats::MEMTABLE_SLOWDOWN, 1); + Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, + "[%s] Stalling writes because we have %d immutable memtables " + "(waiting for flush), max_write_buffer_number is set to %d " + "rate %" PRIu64, + name_.c_str(), imm()->NumNotFlushed(), + mutable_cf_options.max_write_buffer_number, + write_controller->delayed_write_rate()); } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 && vstorage->l0_delay_trigger_count() >= mutable_cf_options.level0_slowdown_writes_trigger) { @@ -598,6 +624,29 @@ void ColumnFamilyData::RecalculateWriteStallConditions( "bytes %" PRIu64 " rate %" PRIu64, name_.c_str(), vstorage->estimated_compaction_needed_bytes(), write_controller->delayed_write_rate()); + } else if (vstorage->l0_delay_trigger_count() >= + GetL0ThresholdSpeedupCompaction( + mutable_cf_options.level0_file_num_compaction_trigger, + mutable_cf_options.level0_slowdown_writes_trigger)) { + write_controller_token_ = write_controller->GetCompactionPressureToken(); + Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, + "[%s] Increasing compaction threads because we have %d level-0 " + "files ", + name_.c_str(), vstorage->l0_delay_trigger_count()); + } else if (vstorage->estimated_compaction_needed_bytes() >= + mutable_cf_options.soft_pending_compaction_bytes_limit / 4) { + // Increase compaction threads if bytes needed for compaction exceeds + // 1/4 of threshold for slowing down. + // If soft pending compaction byte limit is not set, always speed up + // compaction. + write_controller_token_ = write_controller->GetCompactionPressureToken(); + if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) { + Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, + "[%s] Increasing compaction threads because of estimated pending " + "compaction " + "bytes %" PRIu64, + name_.c_str(), vstorage->estimated_compaction_needed_bytes()); + } } else { write_controller_token_.reset(); } diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 62fadbbee..e3b51fc85 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -65,6 +65,7 @@ class ColumnFamilyTest : public testing::Test { ~ColumnFamilyTest() { Close(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); Destroy(); delete env_; } @@ -2047,7 +2048,6 @@ TEST_F(ColumnFamilyTest, FlushAndDropRaceCondition) { Close(); Destroy(); - rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } #ifndef ROCKSDB_LITE @@ -2125,7 +2125,6 @@ TEST_F(ColumnFamilyTest, CreateAndDropRace) { drop_cf_thread.join(); Close(); Destroy(); - rocksdb::SyncPoint::GetInstance()->DisableProcessing(); for (auto* comparator : comparators) { if (comparator) { delete comparator; @@ -2137,6 +2136,9 @@ TEST_F(ColumnFamilyTest, CreateAndDropRace) { TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { const uint64_t kBaseRate = 810000u; db_options_.delayed_write_rate = kBaseRate; + db_options_.base_background_compactions = 2; + db_options_.max_background_compactions = 6; + Open({"default"}); ColumnFamilyData* cfd = static_cast(db_->DefaultColumnFamily())->cfd(); @@ -2162,6 +2164,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); vstorage->TEST_set_estimated_compaction_needed_bytes(400); cfd->RecalculateWriteStallConditions(mutable_cf_options); @@ -2169,6 +2172,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate / 1.2, dbfull()->TEST_write_controler().delayed_write_rate()); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); vstorage->TEST_set_estimated_compaction_needed_bytes(500); cfd->RecalculateWriteStallConditions(mutable_cf_options); @@ -2224,6 +2228,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); vstorage->TEST_set_estimated_compaction_needed_bytes(3001); cfd->RecalculateWriteStallConditions(mutable_cf_options); @@ -2248,6 +2253,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate / 1.2, dbfull()->TEST_write_controler().delayed_write_rate()); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); vstorage->set_l0_delay_trigger_count(101); cfd->RecalculateWriteStallConditions(mutable_cf_options); @@ -2320,6 +2326,73 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { dbfull()->TEST_write_controler().delayed_write_rate()); } +TEST_F(ColumnFamilyTest, CompactionSpeedupSingleColumnFamily) { + db_options_.base_background_compactions = 2; + db_options_.max_background_compactions = 6; + Open({"default"}); + ColumnFamilyData* cfd = + static_cast(db_->DefaultColumnFamily())->cfd(); + + VersionStorageInfo* vstorage = cfd->current()->storage_info(); + + MutableCFOptions mutable_cf_options( + Options(db_options_, column_family_options_), + ImmutableCFOptions(Options(db_options_, column_family_options_))); + + // Speed up threshold = min(4 * 2, 4 + (36 - 4)/4) = 8 + mutable_cf_options.level0_file_num_compaction_trigger = 4; + mutable_cf_options.level0_slowdown_writes_trigger = 36; + mutable_cf_options.level0_stop_writes_trigger = 50; + // Speedup threshold = 200 / 4 = 50 + mutable_cf_options.soft_pending_compaction_bytes_limit = 200; + mutable_cf_options.hard_pending_compaction_bytes_limit = 2000; + + vstorage->TEST_set_estimated_compaction_needed_bytes(40); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(50); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(300); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(45); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + + vstorage->set_l0_delay_trigger_count(7); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + + vstorage->set_l0_delay_trigger_count(9); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage->set_l0_delay_trigger_count(6); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + + // Speed up threshold = min(4 * 2, 4 + (12 - 4)/4) = 6 + mutable_cf_options.level0_file_num_compaction_trigger = 4; + mutable_cf_options.level0_slowdown_writes_trigger = 16; + mutable_cf_options.level0_stop_writes_trigger = 30; + + vstorage->set_l0_delay_trigger_count(5); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + + vstorage->set_l0_delay_trigger_count(7); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage->set_l0_delay_trigger_count(3); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); +} + TEST_F(ColumnFamilyTest, WriteStallTwoColumnFamilies) { const uint64_t kBaseRate = 810000u; db_options_.delayed_write_rate = kBaseRate; @@ -2401,6 +2474,74 @@ TEST_F(ColumnFamilyTest, WriteStallTwoColumnFamilies) { ASSERT_EQ(kBaseRate / 1.2, dbfull()->TEST_write_controler().delayed_write_rate()); } + +TEST_F(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) { + db_options_.base_background_compactions = 2; + db_options_.max_background_compactions = 6; + column_family_options_.soft_pending_compaction_bytes_limit = 200; + column_family_options_.hard_pending_compaction_bytes_limit = 2000; + Open(); + CreateColumnFamilies({"one"}); + ColumnFamilyData* cfd = + static_cast(db_->DefaultColumnFamily())->cfd(); + VersionStorageInfo* vstorage = cfd->current()->storage_info(); + + ColumnFamilyData* cfd1 = + static_cast(handles_[1])->cfd(); + VersionStorageInfo* vstorage1 = cfd1->current()->storage_info(); + + MutableCFOptions mutable_cf_options( + Options(db_options_, column_family_options_), + ImmutableCFOptions(Options(db_options_, column_family_options_))); + // Speed up threshold = min(4 * 2, 4 + (36 - 4)/4) = 8 + mutable_cf_options.level0_file_num_compaction_trigger = 4; + mutable_cf_options.level0_slowdown_writes_trigger = 36; + mutable_cf_options.level0_stop_writes_trigger = 30; + // Speedup threshold = 200 / 4 = 50 + mutable_cf_options.soft_pending_compaction_bytes_limit = 200; + mutable_cf_options.hard_pending_compaction_bytes_limit = 2000; + + MutableCFOptions mutable_cf_options1 = mutable_cf_options; + mutable_cf_options1.level0_slowdown_writes_trigger = 16; + + vstorage->TEST_set_estimated_compaction_needed_bytes(40); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(60); + cfd1->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage1->TEST_set_estimated_compaction_needed_bytes(30); + cfd1->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage1->TEST_set_estimated_compaction_needed_bytes(70); + cfd1->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(20); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage1->TEST_set_estimated_compaction_needed_bytes(3); + cfd1->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + + vstorage->set_l0_delay_trigger_count(9); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage1->set_l0_delay_trigger_count(2); + cfd1->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage->set_l0_delay_trigger_count(0); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); +} } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index d29b50e7f..5d9e0536f 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -533,6 +533,104 @@ TEST_P(DBCompactionTestWithParam, CompactionTrigger) { ASSERT_EQ(NumTableFilesAtLevel(1, 1), 1); } +TEST_F(DBCompactionTest, BGCompactionsAllowed) { + // Create several column families. Make compaction triggers in all of them + // and see number of compactions scheduled to be less than allowed. + const int kNumKeysPerFile = 100; + + Options options; + options.write_buffer_size = 110 << 10; // 110KB + options.arena_block_size = 4 << 10; + options.num_levels = 3; + // Should speed up compaction when there are 4 files. + options.level0_file_num_compaction_trigger = 2; + options.level0_slowdown_writes_trigger = 20; + options.soft_pending_compaction_bytes_limit = 1 << 30; // Infinitely large + options.base_background_compactions = 1; + options.max_background_compactions = 3; + options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); + options = CurrentOptions(options); + + // Block all threads in thread pool. + const size_t kTotalTasks = 4; + env_->SetBackgroundThreads(4, Env::LOW); + test::SleepingBackgroundTask sleeping_tasks[kTotalTasks]; + for (size_t i = 0; i < kTotalTasks; i++) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_tasks[i], Env::Priority::LOW); + sleeping_tasks[i].WaitUntilSleeping(); + } + + CreateAndReopenWithCF({"one", "two", "three"}, options); + + Random rnd(301); + for (int cf = 0; cf < 4; cf++) { + for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) { + for (int i = 0; i < kNumKeysPerFile; i++) { + ASSERT_OK(Put(cf, Key(i), "")); + } + // put extra key to trigger flush + ASSERT_OK(Put(cf, "", "")); + dbfull()->TEST_WaitForFlushMemTable(handles_[cf]); + ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1); + } + } + + // Now all column families qualify compaction but only one should be + // scheduled, because no column family hits speed up condition. + ASSERT_EQ(1, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + + // Create two more files for one column family, which triggers speed up + // condition, three compactions will be scheduled. + for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) { + for (int i = 0; i < kNumKeysPerFile; i++) { + ASSERT_OK(Put(2, Key(i), "")); + } + // put extra key to trigger flush + ASSERT_OK(Put(2, "", "")); + dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + ASSERT_EQ(options.level0_file_num_compaction_trigger + num + 1, + NumTableFilesAtLevel(0, 2)); + } + ASSERT_EQ(3, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + + // Unblock all threads to unblock all compactions. + for (size_t i = 0; i < kTotalTasks; i++) { + sleeping_tasks[i].WakeUp(); + sleeping_tasks[i].WaitUntilDone(); + } + dbfull()->TEST_WaitForCompact(); + + // Verify number of compactions allowed will come back to 1. + + for (size_t i = 0; i < kTotalTasks; i++) { + sleeping_tasks[i].Reset(); + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_tasks[i], Env::Priority::LOW); + sleeping_tasks[i].WaitUntilSleeping(); + } + for (int cf = 0; cf < 4; cf++) { + for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) { + for (int i = 0; i < kNumKeysPerFile; i++) { + ASSERT_OK(Put(cf, Key(i), "")); + } + // put extra key to trigger flush + ASSERT_OK(Put(cf, "", "")); + dbfull()->TEST_WaitForFlushMemTable(handles_[cf]); + ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1); + } + } + + // Now all column families qualify compaction but only one should be + // scheduled, because no column family hits speed up condition. + ASSERT_EQ(1, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + + for (size_t i = 0; i < kTotalTasks; i++) { + sleeping_tasks[i].WakeUp(); + sleeping_tasks[i].WaitUntilDone(); + } +} + TEST_P(DBCompactionTestWithParam, CompactionsGenerateMultipleFiles) { Options options; options.write_buffer_size = 100000000; // Large write buffer @@ -2198,6 +2296,25 @@ TEST_P(DBCompactionTestWithParam, CompressLevelCompaction) { Destroy(options); } +TEST_F(DBCompactionTest, SanitizeCompactionOptionsTest) { + Options options = CurrentOptions(); + options.max_background_compactions = 5; + options.soft_pending_compaction_bytes_limit = 0; + options.hard_pending_compaction_bytes_limit = 100; + options.create_if_missing = true; + DestroyAndReopen(options); + ASSERT_EQ(5, db_->GetOptions().base_background_compactions); + ASSERT_EQ(100, db_->GetOptions().soft_pending_compaction_bytes_limit); + + options.base_background_compactions = 4; + options.max_background_compactions = 3; + options.soft_pending_compaction_bytes_limit = 200; + options.hard_pending_compaction_bytes_limit = 150; + DestroyAndReopen(options); + ASSERT_EQ(3, db_->GetOptions().base_background_compactions); + ASSERT_EQ(150, db_->GetOptions().soft_pending_compaction_bytes_limit); +} + // This tests for a bug that could cause two level0 compactions running // concurrently // TODO(aekmekji): Make sure that the reason this fails when run with diff --git a/db/db_impl.cc b/db/db_impl.cc index 6db05ae66..f625c775e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -146,6 +146,12 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { result.info_log = nullptr; } } + if (result.base_background_compactions == -1) { + result.base_background_compactions = result.max_background_compactions; + } + if (result.base_background_compactions > result.max_background_compactions) { + result.base_background_compactions = result.max_background_compactions; + } result.env->IncBackgroundThreadsIfNeeded(src.max_background_compactions, Env::Priority::LOW); result.env->IncBackgroundThreadsIfNeeded(src.max_background_flushes, @@ -2448,12 +2454,14 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this); } + auto bg_compactions_allowed = BGCompactionsAllowed(); + // special case -- if max_background_flushes == 0, then schedule flush on a // compaction thread if (db_options_.max_background_flushes == 0) { while (unscheduled_flushes_ > 0 && bg_flush_scheduled_ + bg_compaction_scheduled_ < - db_options_.max_background_compactions) { + bg_compactions_allowed) { unscheduled_flushes_--; bg_flush_scheduled_++; env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::LOW, this); @@ -2466,7 +2474,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { return; } - while (bg_compaction_scheduled_ < db_options_.max_background_compactions && + while (bg_compaction_scheduled_ < bg_compactions_allowed && unscheduled_compactions_ > 0) { CompactionArg* ca = new CompactionArg; ca->db = this; @@ -2478,6 +2486,14 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { } } +int DBImpl::BGCompactionsAllowed() const { + if (write_controller_.NeedSpeedupCompaction()) { + return db_options_.max_background_compactions; + } else { + return db_options_.base_background_compactions; + } +} + void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) { assert(!cfd->pending_compaction()); cfd->Ref(); @@ -2590,10 +2606,10 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, LogToBuffer( log_buffer, "Calling FlushMemTableToOutputFile with column " - "family [%s], flush slots available %d, compaction slots available %d", - cfd->GetName().c_str(), - db_options_.max_background_flushes - bg_flush_scheduled_, - db_options_.max_background_compactions - bg_compaction_scheduled_); + "family [%s], flush slots available %d, compaction slots allowed %d, " + "compaction slots scheduled %d", + cfd->GetName().c_str(), db_options_.max_background_flushes, + bg_flush_scheduled_, BGCompactionsAllowed() - bg_compaction_scheduled_); status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress, job_context, log_buffer); if (cfd->Unref()) { @@ -3311,6 +3327,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, RecordTick(stats_, NUMBER_KEYS_READ); RecordTick(stats_, BYTES_READ, value->size()); + MeasureTime(stats_, BYTES_PER_READ, value->size()); } return s; } @@ -3421,6 +3438,7 @@ std::vector DBImpl::MultiGet( RecordTick(stats_, NUMBER_MULTIGET_CALLS); RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys); RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read); + MeasureTime(stats_, BYTES_PER_MULTIGET, bytes_read); PERF_TIMER_STOP(get_post_process_time); return stat_list; @@ -4119,7 +4137,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (write_thread_.CompleteParallelWorker(&w)) { // we're responsible for early exit - auto last_sequence = w.parallel_group->last_writer->sequence; + auto last_sequence = + w.parallel_group->last_writer->sequence + + WriteBatchInternal::Count(w.parallel_group->last_writer->batch) - 1; SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence); versions_->SetLastSequence(last_sequence); write_thread_.EarlyExitParallelGroup(&w); @@ -4305,6 +4325,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // Record statistics RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); RecordTick(stats_, BYTES_WRITTEN, total_byte_size); + MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size); PERF_TIMER_STOP(write_pre_and_post_process_time); if (write_options.disableWAL) { @@ -4418,7 +4439,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, this, true /*dont_filter_deletes*/, true /*concurrent_memtable_writes*/); - assert(last_writer->sequence == last_sequence); + assert(last_writer->sequence + + WriteBatchInternal::Count(last_writer->batch) - 1 == + last_sequence); // CompleteParallelWorker returns true if this thread should // handle exit, false means somebody else did exit_completed_early = !write_thread_.CompleteParallelWorker(&w); diff --git a/db/db_impl.h b/db/db_impl.h index 683fd49dc..d09d645d7 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -347,6 +347,10 @@ class DBImpl : public DB { #endif // NDEBUG + // Return maximum background compaction alowed to be scheduled based on + // compaction status. + int BGCompactionsAllowed() const; + // Returns the list of live files in 'live' and the list // of all files in the filesystem in 'candidate_files'. // If force == false and the last call was less than diff --git a/db/db_test.cc b/db/db_test.cc index 76e64d484..dfc231969 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -606,6 +606,61 @@ TEST_F(DBTest, EmptyFlush) { kSkipUniversalCompaction | kSkipMergePut)); } +// Disable because not all platform can run it. +// It requires more than 9GB memory to run it, With single allocation +// of more than 3GB. +TEST_F(DBTest, DISABLED_VeryLargeValue) { + const size_t kValueSize = 3221225472u; // 3GB value + const size_t kKeySize = 8388608u; // 8MB key + std::string raw(kValueSize, 'v'); + std::string key1(kKeySize, 'c'); + std::string key2(kKeySize, 'd'); + + Options options; + options.env = env_; + options.write_buffer_size = 100000; // Small write buffer + options.paranoid_checks = true; + options = CurrentOptions(options); + DestroyAndReopen(options); + + ASSERT_OK(Put("boo", "v1")); + ASSERT_OK(Put("foo", "v1")); + ASSERT_OK(Put(key1, raw)); + raw[0] = 'w'; + ASSERT_OK(Put(key2, raw)); + dbfull()->TEST_WaitForFlushMemTable(); + + ASSERT_EQ(1, NumTableFilesAtLevel(0)); + + std::string value; + Status s = db_->Get(ReadOptions(), key1, &value); + ASSERT_OK(s); + ASSERT_EQ(kValueSize, value.size()); + ASSERT_EQ('v', value[0]); + + s = db_->Get(ReadOptions(), key2, &value); + ASSERT_OK(s); + ASSERT_EQ(kValueSize, value.size()); + ASSERT_EQ('w', value[0]); + + // Compact all files. + Flush(); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + + // Check DB is not in read-only state. + ASSERT_OK(Put("boo", "v1")); + + s = db_->Get(ReadOptions(), key1, &value); + ASSERT_OK(s); + ASSERT_EQ(kValueSize, value.size()); + ASSERT_EQ('v', value[0]); + + s = db_->Get(ReadOptions(), key2, &value); + ASSERT_OK(s); + ASSERT_EQ(kValueSize, value.size()); + ASSERT_EQ('w', value[0]); +} + TEST_F(DBTest, GetFromImmutableLayer) { do { Options options; diff --git a/db/db_test_util.cc b/db/db_test_util.cc index e6ee304a5..f2906c7ca 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -85,7 +85,8 @@ bool DBTestBase::ShouldSkipOptions(int option_config, int skip_mask) { option_config == kHashCuckoo || option_config == kUniversalCompaction || option_config == kUniversalCompactionMultiLevel || option_config == kUniversalSubcompactions || - option_config == kFIFOCompaction) { + option_config == kFIFOCompaction || + option_config == kConcurrentSkipList) { return true; } #endif @@ -361,6 +362,11 @@ Options DBTestBase::CurrentOptions( options.max_subcompactions = 4; break; } + case kConcurrentSkipList: { + options.allow_concurrent_memtable_write = true; + options.enable_write_thread_adaptive_yield = true; + break; + } default: break; diff --git a/db/db_test_util.h b/db/db_test_util.h index 031057bbb..48fa5430e 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -525,9 +525,10 @@ class DBTestBase : public testing::Test { kOptimizeFiltersForHits = 27, kRowCache = 28, kRecycleLogFiles = 29, - kLevelSubcompactions = 30, - kUniversalSubcompactions = 31, - kEnd = 30 + kEnd = 30, + kConcurrentSkipList = 30, + kLevelSubcompactions = 31, + kUniversalSubcompactions = 32, }; int option_config_; diff --git a/db/db_universal_compaction_test.cc b/db/db_universal_compaction_test.cc index 9d3cca83c..a4cf6657f 100644 --- a/db/db_universal_compaction_test.cc +++ b/db/db_universal_compaction_test.cc @@ -187,14 +187,16 @@ TEST_P(DBTestUniversalCompaction, OptimizeFiltersForHits) { env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); - Put("", ""); for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) { Put(Key(num * 10), "val"); + if (num) { + dbfull()->TEST_WaitForFlushMemTable(); + } Put(Key(30 + num * 10), "val"); Put(Key(60 + num * 10), "val"); - - dbfull()->TEST_WaitForFlushMemTable(); } + Put("", ""); + dbfull()->TEST_WaitForFlushMemTable(); // Query set of non existing keys for (int i = 5; i < 90; i += 10) { @@ -205,6 +207,13 @@ TEST_P(DBTestUniversalCompaction, OptimizeFiltersForHits) { ASSERT_GT(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 0); auto prev_counter = TestGetTickerCount(options, BLOOM_FILTER_USEFUL); + // Make sure bloom filter is used for all but the last L0 file when looking + // up a non-existent key that's in the range of all L0 files. + ASSERT_EQ(Get(Key(35)), "NOT_FOUND"); + ASSERT_EQ(prev_counter + NumTableFilesAtLevel(0) - 1, + TestGetTickerCount(options, BLOOM_FILTER_USEFUL)); + prev_counter = TestGetTickerCount(options, BLOOM_FILTER_USEFUL); + // Unblock compaction and wait it for happening. sleeping_task_low.WakeUp(); dbfull()->TEST_WaitForCompact(); diff --git a/db/perf_context_test.cc b/db/perf_context_test.cc index 9494ac92b..8a345e5bb 100644 --- a/db/perf_context_test.cc +++ b/db/perf_context_test.cc @@ -589,6 +589,19 @@ TEST_F(PerfContextTest, FalseDBMutexWait) { } } } + +TEST_F(PerfContextTest, ToString) { + perf_context.Reset(); + perf_context.block_read_count = 12345; + + std::string zero_included = perf_context.ToString(); + ASSERT_NE(std::string::npos, zero_included.find("= 0")); + ASSERT_NE(std::string::npos, zero_included.find("= 12345")); + + std::string zero_excluded = perf_context.ToString(true); + ASSERT_EQ(std::string::npos, zero_excluded.find("= 0")); + ASSERT_NE(std::string::npos, zero_excluded.find("= 12345")); +} } int main(int argc, char** argv) { diff --git a/db/version_set.cc b/db/version_set.cc index 3679bfbb4..6804730d7 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -84,15 +84,11 @@ int FindFileInRange(const InternalKeyComparator& icmp, // are MergeInProgress). class FilePicker { public: - FilePicker( - std::vector* files, - const Slice& user_key, - const Slice& ikey, - autovector* file_levels, - unsigned int num_levels, - FileIndexer* file_indexer, - const Comparator* user_comparator, - const InternalKeyComparator* internal_comparator) + FilePicker(std::vector* files, const Slice& user_key, + const Slice& ikey, autovector* file_levels, + unsigned int num_levels, FileIndexer* file_indexer, + const Comparator* user_comparator, + const InternalKeyComparator* internal_comparator) : num_levels_(num_levels), curr_level_(-1), hit_file_level_(-1), @@ -102,6 +98,7 @@ class FilePicker { files_(files), #endif level_files_brief_(file_levels), + is_hit_file_last_in_level_(false), user_key_(user_key), ikey_(ikey), file_indexer_(file_indexer), @@ -126,6 +123,8 @@ class FilePicker { // Loops over all files in current level. FdWithKeyRange* f = &curr_file_level_->files[curr_index_in_curr_level_]; hit_file_level_ = curr_level_; + is_hit_file_last_in_level_ = + curr_index_in_curr_level_ == curr_file_level_->num_files - 1; int cmp_largest = -1; // Do key range filtering of files or/and fractional cascading if: @@ -209,6 +208,10 @@ class FilePicker { // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts unsigned int GetHitFileLevel() { return hit_file_level_; } + // Returns true if the most recent "hit file" (i.e., one returned by + // GetNextFile()) is at the last index in its level. + bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; } + private: unsigned int num_levels_; unsigned int curr_level_; @@ -220,6 +223,7 @@ class FilePicker { #endif autovector* level_files_brief_; bool search_ended_; + bool is_hit_file_last_in_level_; LevelFilesBrief* curr_file_level_; unsigned int curr_index_in_curr_level_; unsigned int start_index_in_curr_level_; @@ -903,7 +907,8 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, *status = table_cache_->Get( read_options, *internal_comparator(), f->fd, ikey, &get_context, cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()), - IsFilterSkipped(static_cast(fp.GetHitFileLevel()))); + IsFilterSkipped(static_cast(fp.GetHitFileLevel()), + fp.IsHitFileLastInLevel())); // TODO: examine the behavior for corrupted key if (!status->ok()) { return; @@ -960,10 +965,11 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, } } -bool Version::IsFilterSkipped(int level) { +bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) { // Reaching the bottom level implies misses at all upper levels, so we'll // skip checking the filters when we predict a hit. - return cfd_->ioptions()->optimize_filters_for_hits && level > 0 && + return cfd_->ioptions()->optimize_filters_for_hits && + (level > 0 || is_file_last_in_level) && level == storage_info_.num_non_empty_levels() - 1; } diff --git a/db/version_set.h b/db/version_set.h index 097109fd4..7ce4a6bdf 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -530,7 +530,7 @@ class Version { // checked during read operations. In certain cases (trivial move or preload), // the filter block may already be cached, but we still do not access it such // that it eventually expires from the cache. - bool IsFilterSkipped(int level); + bool IsFilterSkipped(int level, bool is_file_last_in_level = false); // The helper function of UpdateAccumulatedStats, which may fill the missing // fields of file_mata from its associated TableProperties. diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 5d008b3a4..c475dbeb6 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -308,6 +308,120 @@ TEST_F(WriteBatchTest, Blob) { handler.seen); } +// It requires more than 30GB of memory to run the test. With single memory +// allocation of more than 30GB. +// Not all platform can run it. Also it runs a long time. So disable it. +TEST_F(WriteBatchTest, DISABLED_ManyUpdates) { + // Insert key and value of 3GB and push total batch size to 12GB. + static const size_t kKeyValueSize = 4u; + static const uint32_t kNumUpdates = 3 << 30; + std::string raw(kKeyValueSize, 'A'); + WriteBatch batch(kNumUpdates * (4 + kKeyValueSize * 2) + 1024u); + char c = 'A'; + for (uint32_t i = 0; i < kNumUpdates; i++) { + if (c > 'Z') { + c = 'A'; + } + raw[0] = c; + raw[raw.length() - 1] = c; + c++; + batch.Put(raw, raw); + } + + ASSERT_EQ(kNumUpdates, batch.Count()); + + struct NoopHandler : public WriteBatch::Handler { + uint32_t num_seen = 0; + char expected_char = 'A'; + virtual Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + EXPECT_EQ(kKeyValueSize, key.size()); + EXPECT_EQ(kKeyValueSize, value.size()); + EXPECT_EQ(expected_char, key[0]); + EXPECT_EQ(expected_char, value[0]); + EXPECT_EQ(expected_char, key[kKeyValueSize - 1]); + EXPECT_EQ(expected_char, value[kKeyValueSize - 1]); + expected_char++; + if (expected_char > 'Z') { + expected_char = 'A'; + } + ++num_seen; + return Status::OK(); + } + virtual Status DeleteCF(uint32_t column_family_id, + const Slice& key) override { + EXPECT_TRUE(false); + return Status::OK(); + } + virtual Status SingleDeleteCF(uint32_t column_family_id, + const Slice& key) override { + EXPECT_TRUE(false); + return Status::OK(); + } + virtual Status MergeCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + EXPECT_TRUE(false); + return Status::OK(); + } + virtual void LogData(const Slice& blob) override { EXPECT_TRUE(false); } + virtual bool Continue() override { return num_seen < kNumUpdates; } + } handler; + + batch.Iterate(&handler); + ASSERT_EQ(kNumUpdates, handler.num_seen); +} + +// The test requires more than 18GB memory to run it, with single memory +// allocation of more than 12GB. Not all the platform can run it. So disable it. +TEST_F(WriteBatchTest, DISABLED_LargeKeyValue) { + // Insert key and value of 3GB and push total batch size to 12GB. + static const size_t kKeyValueSize = 3221225472u; + std::string raw(kKeyValueSize, 'A'); + WriteBatch batch(12884901888u + 1024u); + for (char i = 0; i < 2; i++) { + raw[0] = 'A' + i; + raw[raw.length() - 1] = 'A' - i; + batch.Put(raw, raw); + } + + ASSERT_EQ(2, batch.Count()); + + struct NoopHandler : public WriteBatch::Handler { + int num_seen = 0; + virtual Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + EXPECT_EQ(kKeyValueSize, key.size()); + EXPECT_EQ(kKeyValueSize, value.size()); + EXPECT_EQ('A' + num_seen, key[0]); + EXPECT_EQ('A' + num_seen, value[0]); + EXPECT_EQ('A' - num_seen, key[kKeyValueSize - 1]); + EXPECT_EQ('A' - num_seen, value[kKeyValueSize - 1]); + ++num_seen; + return Status::OK(); + } + virtual Status DeleteCF(uint32_t column_family_id, + const Slice& key) override { + EXPECT_TRUE(false); + return Status::OK(); + } + virtual Status SingleDeleteCF(uint32_t column_family_id, + const Slice& key) override { + EXPECT_TRUE(false); + return Status::OK(); + } + virtual Status MergeCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + EXPECT_TRUE(false); + return Status::OK(); + } + virtual void LogData(const Slice& blob) override { EXPECT_TRUE(false); } + virtual bool Continue() override { return num_seen < 2; } + } handler; + + batch.Iterate(&handler); + ASSERT_EQ(2, handler.num_seen); +} + TEST_F(WriteBatchTest, Continue) { WriteBatch batch; diff --git a/db/write_controller.cc b/db/write_controller.cc index 7a933ec42..a0c18835f 100644 --- a/db/write_controller.cc +++ b/db/write_controller.cc @@ -26,6 +26,13 @@ std::unique_ptr WriteController::GetDelayToken( return std::unique_ptr(new DelayWriteToken(this)); } +std::unique_ptr +WriteController::GetCompactionPressureToken() { + ++total_compaction_pressure_; + return std::unique_ptr( + new CompactionPressureToken(this)); +} + bool WriteController::IsStopped() const { return total_stopped_ > 0; } // This is inside DB mutex, so we can't sleep and need to minimize // frequency to get time. @@ -106,4 +113,9 @@ DelayWriteToken::~DelayWriteToken() { assert(controller_->total_delayed_ >= 0); } +CompactionPressureToken::~CompactionPressureToken() { + controller_->total_compaction_pressure_--; + assert(controller_->total_compaction_pressure_ >= 0); +} + } // namespace rocksdb diff --git a/db/write_controller.h b/db/write_controller.h index a5d498c3a..6cba2c643 100644 --- a/db/write_controller.h +++ b/db/write_controller.h @@ -23,6 +23,7 @@ class WriteController { explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u) : total_stopped_(0), total_delayed_(0), + total_compaction_pressure_(0), bytes_left_(0), last_refill_time_(0) { set_delayed_write_rate(_delayed_write_rate); @@ -38,10 +39,16 @@ class WriteController { // which returns number of microseconds to sleep. std::unique_ptr GetDelayToken( uint64_t delayed_write_rate); + // When an actor (column family) requests a moderate token, compaction + // threads will be increased + std::unique_ptr GetCompactionPressureToken(); - // these two metods are querying the state of the WriteController + // these three metods are querying the state of the WriteController bool IsStopped() const; bool NeedsDelay() const { return total_delayed_ > 0; } + bool NeedSpeedupCompaction() const { + return IsStopped() || NeedsDelay() || total_compaction_pressure_ > 0; + } // return how many microseconds the caller needs to sleep after the call // num_bytes: how many number of bytes to put into the DB. // Prerequisite: DB mutex held. @@ -59,9 +66,11 @@ class WriteController { friend class WriteControllerToken; friend class StopWriteToken; friend class DelayWriteToken; + friend class CompactionPressureToken; int total_stopped_; int total_delayed_; + int total_compaction_pressure_; uint64_t bytes_left_; uint64_t last_refill_time_; uint64_t delayed_write_rate_; @@ -96,4 +105,11 @@ class DelayWriteToken : public WriteControllerToken { virtual ~DelayWriteToken(); }; +class CompactionPressureToken : public WriteControllerToken { + public: + explicit CompactionPressureToken(WriteController* controller) + : WriteControllerToken(controller) {} + virtual ~CompactionPressureToken(); +}; + } // namespace rocksdb diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index a3f410422..a26ed7d81 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -933,8 +933,19 @@ struct DBOptions { // regardless of this setting uint64_t delete_obsolete_files_period_micros; + // Suggested number of concurrent background compaction jobs, submitted to + // the default LOW priority thread pool. + // + // Default: max_background_compactions + int base_background_compactions; + // Maximum number of concurrent background compaction jobs, submitted to // the default LOW priority thread pool. + // We first try to schedule compactions based on + // `base_background_compactions`. If the compaction cannot catch up , we + // will increase number of compaction threads up to + // `max_background_compactions`. + // // If you're increasing this, also consider increasing number of threads in // LOW priority thread pool. For more information, see // Env::SetBackgroundThreads @@ -1110,6 +1121,9 @@ struct DBOptions { // This option is currently honored only on Windows // // Default: 1 Mb + // + // Special value: 0 - means do not maintain per instance buffer. Allocate + // per request buffer and avoid locking. size_t random_access_max_buffer_size; // This is the maximum buffer size that is used by WritableFileWriter. diff --git a/include/rocksdb/perf_context.h b/include/rocksdb/perf_context.h index c2af729e3..7a6b6f367 100644 --- a/include/rocksdb/perf_context.h +++ b/include/rocksdb/perf_context.h @@ -21,7 +21,7 @@ struct PerfContext { void Reset(); // reset all performance counters to zero - std::string ToString() const; + std::string ToString(bool exclude_zero_counters = false) const; uint64_t user_key_comparison_count; // total number of user key comparisons uint64_t block_cache_hit_count; // total number of block cache hits diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 813104b99..35de93334 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -280,6 +280,10 @@ enum Histograms : uint32_t { SST_READ_MICROS, // The number of subcompactions actually scheduled during a compaction NUM_SUBCOMPACTIONS_SCHEDULED, + // Value size distribution in each operation + BYTES_PER_READ, + BYTES_PER_WRITE, + BYTES_PER_MULTIGET, HISTOGRAM_ENUM_MAX, // TODO(ldemailly): enforce HistogramsNameMap match }; @@ -307,6 +311,9 @@ const std::vector> HistogramsNameMap = { {WRITE_STALL, "rocksdb.db.write.stall"}, {SST_READ_MICROS, "rocksdb.sst.read.micros"}, {NUM_SUBCOMPACTIONS_SCHEDULED, "rocksdb.num.subcompactions.scheduled"}, + {BYTES_PER_READ, "rocksdb.bytes.per.read"}, + {BYTES_PER_WRITE, "rocksdb.bytes.per.write"}, + {BYTES_PER_MULTIGET, "rocksdb.bytes.per.multiget"}, }; struct HistogramData { diff --git a/port/win/env_win.cc b/port/win/env_win.cc index 977c80b88..1ae0b1932 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -766,6 +766,18 @@ class WinRandomAccessFile : public RandomAccessFile { return read; } + void CalculateReadParameters(uint64_t offset, size_t bytes_requested, + size_t& actual_bytes_toread, + uint64_t& first_page_start) const { + + const size_t alignment = buffer_.Alignment(); + + first_page_start = TruncateToPageBoundary(alignment, offset); + const uint64_t last_page_start = + TruncateToPageBoundary(alignment, offset + bytes_requested - 1); + actual_bytes_toread = (last_page_start - first_page_start) + alignment; + } + public: WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment, const EnvOptions& options) @@ -797,66 +809,87 @@ class WinRandomAccessFile : public RandomAccessFile { virtual Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const override { + Status s; SSIZE_T r = -1; size_t left = n; char* dest = scratch; + if (n == 0) { + *result = Slice(scratch, 0); + return s; + } + // When in unbuffered mode we need to do the following changes: // - use our own aligned buffer // - always read at the offset of that is a multiple of alignment if (!use_os_buffer_) { - std::unique_lock lock(buffer_mut_); - // Let's see if at least some of the requested data is already - // in the buffer - if (offset >= buffered_start_ && - offset < (buffered_start_ + buffer_.CurrentSize())) { - size_t buffer_offset = offset - buffered_start_; - r = buffer_.Read(dest, buffer_offset, left); - assert(r >= 0); + uint64_t first_page_start = 0; + size_t actual_bytes_toread = 0; + size_t bytes_requested = left; - left -= size_t(r); - offset += r; - dest += r; - } + if (!read_ahead_ && random_access_max_buffer_size_ == 0) { + CalculateReadParameters(offset, bytes_requested, actual_bytes_toread, + first_page_start); - // Still some left or none was buffered - if (left > 0) { - // Figure out the start/end offset for reading and amount to read - const size_t alignment = buffer_.Alignment(); - const size_t first_page_start = - TruncateToPageBoundary(alignment, offset); + assert(actual_bytes_toread > 0); + + r = ReadIntoOneShotBuffer(offset, first_page_start, + actual_bytes_toread, left, dest); + } else { + + std::unique_lock lock(buffer_mut_); + + // Let's see if at least some of the requested data is already + // in the buffer + if (offset >= buffered_start_ && + offset < (buffered_start_ + buffer_.CurrentSize())) { + size_t buffer_offset = offset - buffered_start_; + r = buffer_.Read(dest, buffer_offset, left); + assert(r >= 0); - size_t bytes_requested = left; - if (read_ahead_ && bytes_requested < compaction_readahead_size_) { - bytes_requested = compaction_readahead_size_; + left -= size_t(r); + offset += r; + dest += r; } - const size_t last_page_start = - TruncateToPageBoundary(alignment, offset + bytes_requested - 1); - const size_t actual_bytes_toread = - (last_page_start - first_page_start) + alignment; + // Still some left or none was buffered + if (left > 0) { + // Figure out the start/end offset for reading and amount to read + bytes_requested = left; - if (buffer_.Capacity() < actual_bytes_toread) { - // If we are in read-ahead mode or the requested size - // exceeds max buffer size then use one-shot - // big buffer otherwise reallocate main buffer - if (read_ahead_ || + if (read_ahead_ && bytes_requested < compaction_readahead_size_) { + bytes_requested = compaction_readahead_size_; + } + + CalculateReadParameters(offset, bytes_requested, actual_bytes_toread, + first_page_start); + + assert(actual_bytes_toread > 0); + + if (buffer_.Capacity() < actual_bytes_toread) { + // If we are in read-ahead mode or the requested size + // exceeds max buffer size then use one-shot + // big buffer otherwise reallocate main buffer + if (read_ahead_ || (actual_bytes_toread > random_access_max_buffer_size_)) { - // Unlock the mutex since we are not using instance buffer - lock.unlock(); - r = ReadIntoOneShotBuffer(offset, first_page_start, - actual_bytes_toread, left, dest); - } else { - buffer_.AllocateNewBuffer(actual_bytes_toread); + // Unlock the mutex since we are not using instance buffer + lock.unlock(); + r = ReadIntoOneShotBuffer(offset, first_page_start, + actual_bytes_toread, left, dest); + } + else { + buffer_.AllocateNewBuffer(actual_bytes_toread); + r = ReadIntoInstanceBuffer(offset, first_page_start, + actual_bytes_toread, left, dest); + } + } + else { + buffer_.Clear(); r = ReadIntoInstanceBuffer(offset, first_page_start, - actual_bytes_toread, left, dest); + actual_bytes_toread, left, dest); } - } else { - buffer_.Clear(); - r = ReadIntoInstanceBuffer(offset, first_page_start, - actual_bytes_toread, left, dest); } } } else { @@ -1105,6 +1138,8 @@ void WinthreadCall(const char* label, std::error_code result) { } } +typedef VOID(WINAPI * FnGetSystemTimePreciseAsFileTime)(LPFILETIME); + class WinEnv : public Env { public: WinEnv(); @@ -1643,25 +1678,29 @@ class WinEnv : public Env { } virtual uint64_t NowMicros() override { - // all std::chrono clocks on windows proved to return - // values that may repeat that is not good enough for some uses. - const int64_t c_UnixEpochStartTicks = 116444736000000000i64; - const int64_t c_FtToMicroSec = 10; - - // This interface needs to return system time and not - // just any microseconds because it is often used as an argument - // to TimedWait() on condition variable - FILETIME ftSystemTime; - GetSystemTimePreciseAsFileTime(&ftSystemTime); - - LARGE_INTEGER li; - li.LowPart = ftSystemTime.dwLowDateTime; - li.HighPart = ftSystemTime.dwHighDateTime; - // Subtract unix epoch start - li.QuadPart -= c_UnixEpochStartTicks; - // Convert to microsecs - li.QuadPart /= c_FtToMicroSec; - return li.QuadPart; + if (GetSystemTimePreciseAsFileTime_ != NULL) { + // all std::chrono clocks on windows proved to return + // values that may repeat that is not good enough for some uses. + const int64_t c_UnixEpochStartTicks = 116444736000000000i64; + const int64_t c_FtToMicroSec = 10; + + // This interface needs to return system time and not + // just any microseconds because it is often used as an argument + // to TimedWait() on condition variable + FILETIME ftSystemTime; + GetSystemTimePreciseAsFileTime_(&ftSystemTime); + + LARGE_INTEGER li; + li.LowPart = ftSystemTime.dwLowDateTime; + li.HighPart = ftSystemTime.dwHighDateTime; + // Subtract unix epoch start + li.QuadPart -= c_UnixEpochStartTicks; + // Convert to microsecs + li.QuadPart /= c_FtToMicroSec; + return li.QuadPart; + } + using namespace std::chrono; + return duration_cast(system_clock::now().time_since_epoch()).count(); } virtual uint64_t NowNanos() override { @@ -2071,6 +2110,7 @@ class WinEnv : public Env { std::vector thread_pools_; mutable std::mutex mu_; std::vector threads_to_join_; + FnGetSystemTimePreciseAsFileTime GetSystemTimePreciseAsFileTime_; }; WinEnv::WinEnv() @@ -2079,7 +2119,15 @@ WinEnv::WinEnv() page_size_(4 * 1012), allocation_granularity_(page_size_), perf_counter_frequency_(0), - thread_pools_(Priority::TOTAL) { + thread_pools_(Priority::TOTAL), + GetSystemTimePreciseAsFileTime_(NULL) { + + HMODULE module = GetModuleHandle("kernel32.dll"); + if (module != NULL) { + GetSystemTimePreciseAsFileTime_ = (FnGetSystemTimePreciseAsFileTime)GetProcAddress( + module, "GetSystemTimePreciseAsFileTime"); + } + SYSTEM_INFO sinfo; GetSystemInfo(&sinfo); diff --git a/util/options.cc b/util/options.cc index d21d2a24b..00d797167 100644 --- a/util/options.cc +++ b/util/options.cc @@ -229,6 +229,7 @@ DBOptions::DBOptions() db_log_dir(""), wal_dir(""), delete_obsolete_files_period_micros(6ULL * 60 * 60 * 1000000), + base_background_compactions(-1), max_background_compactions(1), max_subcompactions(1), max_background_flushes(1), @@ -295,6 +296,7 @@ DBOptions::DBOptions(const Options& options) wal_dir(options.wal_dir), delete_obsolete_files_period_micros( options.delete_obsolete_files_period_micros), + base_background_compactions(options.base_background_compactions), max_background_compactions(options.max_background_compactions), max_subcompactions(options.max_subcompactions), max_background_flushes(options.max_background_flushes), @@ -383,6 +385,8 @@ void DBOptions::Dump(Logger* log) const { table_cache_numshardbits); Header(log, " Options.delete_obsolete_files_period_micros: %" PRIu64, delete_obsolete_files_period_micros); + Header(log, " Options.base_background_compactions: %d", + base_background_compactions); Header(log, " Options.max_background_compactions: %d", max_background_compactions); Header(log, " Options.max_subcompactions: %" PRIu32, @@ -652,6 +656,7 @@ Options::PrepareForBulkLoad() // to L1. This is helpful so that all files that are // input to the manual compaction are all at L0. max_background_compactions = 2; + base_background_compactions = 2; // The compaction would create large files in L1. target_file_size_base = 256 * 1024 * 1024; diff --git a/util/options_helper.h b/util/options_helper.h index 84d547cfc..4c4555aca 100644 --- a/util/options_helper.h +++ b/util/options_helper.h @@ -208,7 +208,7 @@ static std::unordered_map db_options_type_info = { {offsetof(struct DBOptions, random_access_max_buffer_size), OptionType::kSizeT, OptionVerificationType::kNormal}}, {"writable_file_max_buffer_size", - {offsetof(struct DBOptions, writable_file_max_buffer_size), + {offsetof(struct DBOptions, writable_file_max_buffer_size), OptionType::kSizeT, OptionVerificationType::kNormal}}, {"use_adaptive_mutex", {offsetof(struct DBOptions, use_adaptive_mutex), OptionType::kBoolean, @@ -219,6 +219,9 @@ static std::unordered_map db_options_type_info = { {"max_background_compactions", {offsetof(struct DBOptions, max_background_compactions), OptionType::kInt, OptionVerificationType::kNormal}}, + {"base_background_compactions", + {offsetof(struct DBOptions, base_background_compactions), OptionType::kInt, + OptionVerificationType::kNormal}}, {"max_background_flushes", {offsetof(struct DBOptions, max_background_flushes), OptionType::kInt, OptionVerificationType::kNormal}}, diff --git a/util/options_test.cc b/util/options_test.cc index 65c45c2b0..961593bd0 100644 --- a/util/options_test.cc +++ b/util/options_test.cc @@ -1669,50 +1669,7 @@ TEST_F(OptionsParserTest, DBOptionsAllFieldsSettable) { "table_cache_numshardbits=28;" "max_open_files=72;" "max_file_opening_threads=35;" - "max_background_compactions=33;" - "use_fsync=true;" - "use_adaptive_mutex=false;" - "max_total_wal_size=4295005604;" - "compaction_readahead_size=0;" - "new_table_reader_for_compaction_inputs=false;" - "keep_log_file_num=4890;" - "skip_stats_update_on_db_open=false;" - "max_manifest_file_size=4295009941;" - "db_log_dir=path/to/db_log_dir;" - "skip_log_error_on_recovery=true;" - "writable_file_max_buffer_size=1048576;" - "paranoid_checks=true;" - "is_fd_close_on_exec=false;" - "bytes_per_sync=4295013613;" - "enable_thread_tracking=false;" - "disable_data_sync=false;" - "recycle_log_file_num=0;" - "disableDataSync=false;" - "create_missing_column_families=true;" - "log_file_time_to_roll=3097;" - "max_background_flushes=35;" - "create_if_missing=false;" - "error_if_exists=true;" - "allow_os_buffer=false;" - "delayed_write_rate=4294976214;" - "manifest_preallocation_size=1222;" - "allow_mmap_writes=false;" - "stats_dump_period_sec=70127;" - "allow_fallocate=true;" - "allow_mmap_reads=false;" - "max_log_file_size=4607;" - "random_access_max_buffer_size=1048576;" - "advise_random_on_open=true;" - "wal_bytes_per_sync=4295048118;" - "delete_obsolete_files_period_micros=4294967758;" - "WAL_ttl_seconds=4295008036;" - "WAL_size_limit_MB=4295036161;" - "wal_dir=path/to/wal_dir;" - "db_write_buffer_size=2587;" - "max_subcompactions=64330;" - "table_cache_numshardbits=28;" - "max_open_files=72;" - "max_file_opening_threads=35;" + "base_background_compactions=3;" "max_background_compactions=33;" "use_fsync=true;" "use_adaptive_mutex=false;" diff --git a/util/perf_context.cc b/util/perf_context.cc index 282516590..07bad40f2 100644 --- a/util/perf_context.cc +++ b/util/perf_context.cc @@ -61,32 +61,54 @@ void PerfContext::Reset() { #endif } -#define OUTPUT(counter) #counter << " = " << counter << ", " +#define OUTPUT(counter) \ + if (!exclude_zero_counters || (counter > 0)) { \ + ss << #counter << " = " << counter << ", "; \ + } -std::string PerfContext::ToString() const { +std::string PerfContext::ToString(bool exclude_zero_counters) const { #if defined(NPERF_CONTEXT) || defined(IOS_CROSS_COMPILE) return ""; #else std::ostringstream ss; - ss << OUTPUT(user_key_comparison_count) << OUTPUT(block_cache_hit_count) - << OUTPUT(block_read_count) << OUTPUT(block_read_byte) - << OUTPUT(block_read_time) << OUTPUT(block_checksum_time) - << OUTPUT(block_decompress_time) << OUTPUT(internal_key_skipped_count) - << OUTPUT(internal_delete_skipped_count) << OUTPUT(write_wal_time) - << OUTPUT(get_snapshot_time) << OUTPUT(get_from_memtable_time) - << OUTPUT(get_from_memtable_count) << OUTPUT(get_post_process_time) - << OUTPUT(get_from_output_files_time) << OUTPUT(seek_on_memtable_time) - << OUTPUT(seek_on_memtable_count) << OUTPUT(seek_child_seek_time) - << OUTPUT(seek_child_seek_count) << OUTPUT(seek_min_heap_time) - << OUTPUT(seek_internal_seek_time) << OUTPUT(find_next_user_entry_time) - << OUTPUT(write_pre_and_post_process_time) << OUTPUT(write_memtable_time) - << OUTPUT(db_mutex_lock_nanos) << OUTPUT(db_condition_wait_nanos) - << OUTPUT(merge_operator_time_nanos) << OUTPUT(write_delay_time) - << OUTPUT(read_index_block_nanos) << OUTPUT(read_filter_block_nanos) - << OUTPUT(new_table_block_iter_nanos) << OUTPUT(new_table_iterator_nanos) - << OUTPUT(block_seek_nanos) << OUTPUT(find_table_nanos) - << OUTPUT(bloom_memtable_hit_count) << OUTPUT(bloom_memtable_miss_count) - << OUTPUT(bloom_sst_hit_count) << OUTPUT(bloom_sst_miss_count); + OUTPUT(user_key_comparison_count); + OUTPUT(block_cache_hit_count); + OUTPUT(block_read_count); + OUTPUT(block_read_byte); + OUTPUT(block_read_time); + OUTPUT(block_checksum_time); + OUTPUT(block_decompress_time); + OUTPUT(internal_key_skipped_count); + OUTPUT(internal_delete_skipped_count); + OUTPUT(write_wal_time); + OUTPUT(get_snapshot_time); + OUTPUT(get_from_memtable_time); + OUTPUT(get_from_memtable_count); + OUTPUT(get_post_process_time); + OUTPUT(get_from_output_files_time); + OUTPUT(seek_on_memtable_time); + OUTPUT(seek_on_memtable_count); + OUTPUT(seek_child_seek_time); + OUTPUT(seek_child_seek_count); + OUTPUT(seek_min_heap_time); + OUTPUT(seek_internal_seek_time); + OUTPUT(find_next_user_entry_time); + OUTPUT(write_pre_and_post_process_time); + OUTPUT(write_memtable_time); + OUTPUT(db_mutex_lock_nanos); + OUTPUT(db_condition_wait_nanos); + OUTPUT(merge_operator_time_nanos); + OUTPUT(write_delay_time); + OUTPUT(read_index_block_nanos); + OUTPUT(read_filter_block_nanos); + OUTPUT(new_table_block_iter_nanos); + OUTPUT(new_table_iterator_nanos); + OUTPUT(block_seek_nanos); + OUTPUT(find_table_nanos); + OUTPUT(bloom_memtable_hit_count); + OUTPUT(bloom_memtable_miss_count); + OUTPUT(bloom_sst_hit_count); + OUTPUT(bloom_sst_miss_count); return ss.str(); #endif }