From 6ee38bb15c5421f4089e680ba67054cc14540b9d Mon Sep 17 00:00:00 2001 From: sdong Date: Thu, 28 Jan 2016 11:43:28 -0800 Subject: [PATCH 01/16] Slowdown of writing to the last memtable should not override stopping Summary: Now slowing down for the last mem table takes priority against some stopping conditions. This is logically confusing. Fix it. Test Plan: Run all existing tests. Reviewers: yhchiang, IslamAbdelRahman, kradhakrishnan, andrewkr, anthony Reviewed By: anthony Subscribers: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D53529 --- db/column_family.cc | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index 408f53831..4c12a35bd 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -531,21 +531,6 @@ void ColumnFamilyData::RecalculateWriteStallConditions( "(waiting for flush), max_write_buffer_number is set to %d", name_.c_str(), imm()->NumNotFlushed(), mutable_cf_options.max_write_buffer_number); - } else if (mutable_cf_options.max_write_buffer_number > 3 && - imm()->NumNotFlushed() >= - mutable_cf_options.max_write_buffer_number - 1) { - write_controller_token_ = - SetupDelay(ioptions_.delayed_write_rate, write_controller, - compaction_needed_bytes, prev_compaction_needed_bytes_, - mutable_cf_options.disable_auto_compactions); - internal_stats_->AddCFStats(InternalStats::MEMTABLE_SLOWDOWN, 1); - Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, - "[%s] Stalling writes because we have %d immutable memtables " - "(waiting for flush), max_write_buffer_number is set to %d " - "rate %" PRIu64, - name_.c_str(), imm()->NumNotFlushed(), - mutable_cf_options.max_write_buffer_number, - write_controller->delayed_write_rate()); } else if (vstorage->l0_delay_trigger_count() >= mutable_cf_options.level0_stop_writes_trigger) { write_controller_token_ = write_controller->GetStopToken(); @@ -567,6 +552,21 @@ void ColumnFamilyData::RecalculateWriteStallConditions( "[%s] Stopping writes because of estimated pending compaction " "bytes %" PRIu64, name_.c_str(), compaction_needed_bytes); + } else if (mutable_cf_options.max_write_buffer_number > 3 && + imm()->NumNotFlushed() >= + mutable_cf_options.max_write_buffer_number - 1) { + write_controller_token_ = + SetupDelay(ioptions_.delayed_write_rate, write_controller, + compaction_needed_bytes, prev_compaction_needed_bytes_, + mutable_cf_options.disable_auto_compactions); + internal_stats_->AddCFStats(InternalStats::MEMTABLE_SLOWDOWN, 1); + Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, + "[%s] Stalling writes because we have %d immutable memtables " + "(waiting for flush), max_write_buffer_number is set to %d " + "rate %" PRIu64, + name_.c_str(), imm()->NumNotFlushed(), + mutable_cf_options.max_write_buffer_number, + write_controller->delayed_write_rate()); } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 && vstorage->l0_delay_trigger_count() >= mutable_cf_options.level0_slowdown_writes_trigger) { From 3b2a1ddd2e9ca0998aa711644258675324febf6a Mon Sep 17 00:00:00 2001 From: Venkatesh Radhakrishnan Date: Thu, 28 Jan 2016 11:56:16 -0800 Subject: [PATCH 02/16] Add options.base_background_compactions as a number of compaction threads for low compaction debt Summary: If options.base_background_compactions is given, we try to schedule number of compactions not existing this number, only when L0 files increase to certain number, or pending compaction bytes more than certain threshold, we schedule compactions based on options.max_background_compactions. The watermarks are calculated based on slowdown thresholds. Test Plan: Add new test cases in column_family_test. Adding more unit tests. Reviewers: IslamAbdelRahman, yhchiang, kradhakrishnan, rven, anthony Reviewed By: anthony Subscribers: leveldb, dhruba, yoshinorim Differential Revision: https://reviews.facebook.net/D53409 --- db/column_family.cc | 49 +++++++++++++ db/column_family_test.cc | 142 ++++++++++++++++++++++++++++++++++++++ db/db_compaction_test.cc | 117 +++++++++++++++++++++++++++++++ db/db_impl.cc | 28 ++++++-- db/db_impl.h | 4 ++ db/write_controller.cc | 12 ++++ db/write_controller.h | 18 ++++- include/rocksdb/options.h | 11 +++ util/options.cc | 5 ++ util/options_helper.h | 5 +- util/options_test.cc | 45 +----------- 11 files changed, 384 insertions(+), 52 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index 4c12a35bd..ca3be7855 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -239,6 +239,17 @@ ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options, result.level0_slowdown_writes_trigger, result.level0_file_num_compaction_trigger); } + + if (result.soft_pending_compaction_bytes_limit == 0) { + result.soft_pending_compaction_bytes_limit = + result.hard_pending_compaction_bytes_limit; + } else if (result.hard_pending_compaction_bytes_limit > 0 && + result.soft_pending_compaction_bytes_limit > + result.hard_pending_compaction_bytes_limit) { + result.soft_pending_compaction_bytes_limit = + result.hard_pending_compaction_bytes_limit; + } + if (result.level_compaction_dynamic_level_bytes) { if (result.compaction_style != kCompactionStyleLevel || db_options.db_paths.size() > 1U) { @@ -513,6 +524,21 @@ std::unique_ptr SetupDelay( } return write_controller->GetDelayToken(write_rate); } + +int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger, + int level0_slowdown_writes_trigger) { + // SanitizeOptions() ensures it. + assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger); + + // 1/4 of the way between L0 compaction trigger threshold and slowdown + // condition. + // Or twice as compaction trigger, if it is smaller. + return std::min(level0_file_num_compaction_trigger * 2, + level0_file_num_compaction_trigger + + (level0_slowdown_writes_trigger - + level0_file_num_compaction_trigger) / + 4); +} } // namespace void ColumnFamilyData::RecalculateWriteStallConditions( @@ -598,6 +624,29 @@ void ColumnFamilyData::RecalculateWriteStallConditions( "bytes %" PRIu64 " rate %" PRIu64, name_.c_str(), vstorage->estimated_compaction_needed_bytes(), write_controller->delayed_write_rate()); + } else if (vstorage->l0_delay_trigger_count() >= + GetL0ThresholdSpeedupCompaction( + mutable_cf_options.level0_file_num_compaction_trigger, + mutable_cf_options.level0_slowdown_writes_trigger)) { + write_controller_token_ = write_controller->GetCompactionPressureToken(); + Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, + "[%s] Increasing compaction threads because we have %d level-0 " + "files ", + name_.c_str(), vstorage->l0_delay_trigger_count()); + } else if (vstorage->estimated_compaction_needed_bytes() >= + mutable_cf_options.soft_pending_compaction_bytes_limit / 4) { + // Increase compaction threads if bytes needed for compaction exceeds + // 1/4 of threshold for slowing down. + // If soft pending compaction byte limit is not set, always speed up + // compaction. + write_controller_token_ = write_controller->GetCompactionPressureToken(); + if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) { + Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, + "[%s] Increasing compaction threads because of estimated pending " + "compaction " + "bytes %" PRIu64, + name_.c_str(), vstorage->estimated_compaction_needed_bytes()); + } } else { write_controller_token_.reset(); } diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 62fadbbee..114451464 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -2137,6 +2137,9 @@ TEST_F(ColumnFamilyTest, CreateAndDropRace) { TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { const uint64_t kBaseRate = 810000u; db_options_.delayed_write_rate = kBaseRate; + db_options_.base_background_compactions = 2; + db_options_.max_background_compactions = 6; + Open({"default"}); ColumnFamilyData* cfd = static_cast(db_->DefaultColumnFamily())->cfd(); @@ -2162,6 +2165,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); vstorage->TEST_set_estimated_compaction_needed_bytes(400); cfd->RecalculateWriteStallConditions(mutable_cf_options); @@ -2169,6 +2173,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate / 1.2, dbfull()->TEST_write_controler().delayed_write_rate()); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); vstorage->TEST_set_estimated_compaction_needed_bytes(500); cfd->RecalculateWriteStallConditions(mutable_cf_options); @@ -2224,6 +2229,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); vstorage->TEST_set_estimated_compaction_needed_bytes(3001); cfd->RecalculateWriteStallConditions(mutable_cf_options); @@ -2248,6 +2254,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate / 1.2, dbfull()->TEST_write_controler().delayed_write_rate()); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); vstorage->set_l0_delay_trigger_count(101); cfd->RecalculateWriteStallConditions(mutable_cf_options); @@ -2320,6 +2327,73 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { dbfull()->TEST_write_controler().delayed_write_rate()); } +TEST_F(ColumnFamilyTest, CompactionSpeedupSingleColumnFamily) { + db_options_.base_background_compactions = 2; + db_options_.max_background_compactions = 6; + Open({"default"}); + ColumnFamilyData* cfd = + static_cast(db_->DefaultColumnFamily())->cfd(); + + VersionStorageInfo* vstorage = cfd->current()->storage_info(); + + MutableCFOptions mutable_cf_options( + Options(db_options_, column_family_options_), + ImmutableCFOptions(Options(db_options_, column_family_options_))); + + // Speed up threshold = min(4 * 2, 4 + (36 - 4)/4) = 8 + mutable_cf_options.level0_file_num_compaction_trigger = 4; + mutable_cf_options.level0_slowdown_writes_trigger = 36; + mutable_cf_options.level0_stop_writes_trigger = 50; + // Speedup threshold = 200 / 4 = 50 + mutable_cf_options.soft_pending_compaction_bytes_limit = 200; + mutable_cf_options.hard_pending_compaction_bytes_limit = 2000; + + vstorage->TEST_set_estimated_compaction_needed_bytes(40); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(50); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(300); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(45); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + + vstorage->set_l0_delay_trigger_count(7); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + + vstorage->set_l0_delay_trigger_count(9); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage->set_l0_delay_trigger_count(6); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + + // Speed up threshold = min(4 * 2, 4 + (12 - 4)/4) = 6 + mutable_cf_options.level0_file_num_compaction_trigger = 4; + mutable_cf_options.level0_slowdown_writes_trigger = 16; + mutable_cf_options.level0_stop_writes_trigger = 30; + + vstorage->set_l0_delay_trigger_count(5); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + + vstorage->set_l0_delay_trigger_count(7); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage->set_l0_delay_trigger_count(3); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); +} + TEST_F(ColumnFamilyTest, WriteStallTwoColumnFamilies) { const uint64_t kBaseRate = 810000u; db_options_.delayed_write_rate = kBaseRate; @@ -2401,6 +2475,74 @@ TEST_F(ColumnFamilyTest, WriteStallTwoColumnFamilies) { ASSERT_EQ(kBaseRate / 1.2, dbfull()->TEST_write_controler().delayed_write_rate()); } + +TEST_F(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) { + db_options_.base_background_compactions = 2; + db_options_.max_background_compactions = 6; + column_family_options_.soft_pending_compaction_bytes_limit = 200; + column_family_options_.hard_pending_compaction_bytes_limit = 2000; + Open(); + CreateColumnFamilies({"one"}); + ColumnFamilyData* cfd = + static_cast(db_->DefaultColumnFamily())->cfd(); + VersionStorageInfo* vstorage = cfd->current()->storage_info(); + + ColumnFamilyData* cfd1 = + static_cast(handles_[1])->cfd(); + VersionStorageInfo* vstorage1 = cfd1->current()->storage_info(); + + MutableCFOptions mutable_cf_options( + Options(db_options_, column_family_options_), + ImmutableCFOptions(Options(db_options_, column_family_options_))); + // Speed up threshold = min(4 * 2, 4 + (36 - 4)/4) = 8 + mutable_cf_options.level0_file_num_compaction_trigger = 4; + mutable_cf_options.level0_slowdown_writes_trigger = 36; + mutable_cf_options.level0_stop_writes_trigger = 30; + // Speedup threshold = 200 / 4 = 50 + mutable_cf_options.soft_pending_compaction_bytes_limit = 200; + mutable_cf_options.hard_pending_compaction_bytes_limit = 2000; + + MutableCFOptions mutable_cf_options1 = mutable_cf_options; + mutable_cf_options1.level0_slowdown_writes_trigger = 16; + + vstorage->TEST_set_estimated_compaction_needed_bytes(40); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(60); + cfd1->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage1->TEST_set_estimated_compaction_needed_bytes(30); + cfd1->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage1->TEST_set_estimated_compaction_needed_bytes(70); + cfd1->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(20); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage1->TEST_set_estimated_compaction_needed_bytes(3); + cfd1->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + + vstorage->set_l0_delay_trigger_count(9); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage1->set_l0_delay_trigger_count(2); + cfd1->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage->set_l0_delay_trigger_count(0); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); +} } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index d29b50e7f..5d9e0536f 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -533,6 +533,104 @@ TEST_P(DBCompactionTestWithParam, CompactionTrigger) { ASSERT_EQ(NumTableFilesAtLevel(1, 1), 1); } +TEST_F(DBCompactionTest, BGCompactionsAllowed) { + // Create several column families. Make compaction triggers in all of them + // and see number of compactions scheduled to be less than allowed. + const int kNumKeysPerFile = 100; + + Options options; + options.write_buffer_size = 110 << 10; // 110KB + options.arena_block_size = 4 << 10; + options.num_levels = 3; + // Should speed up compaction when there are 4 files. + options.level0_file_num_compaction_trigger = 2; + options.level0_slowdown_writes_trigger = 20; + options.soft_pending_compaction_bytes_limit = 1 << 30; // Infinitely large + options.base_background_compactions = 1; + options.max_background_compactions = 3; + options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); + options = CurrentOptions(options); + + // Block all threads in thread pool. + const size_t kTotalTasks = 4; + env_->SetBackgroundThreads(4, Env::LOW); + test::SleepingBackgroundTask sleeping_tasks[kTotalTasks]; + for (size_t i = 0; i < kTotalTasks; i++) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_tasks[i], Env::Priority::LOW); + sleeping_tasks[i].WaitUntilSleeping(); + } + + CreateAndReopenWithCF({"one", "two", "three"}, options); + + Random rnd(301); + for (int cf = 0; cf < 4; cf++) { + for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) { + for (int i = 0; i < kNumKeysPerFile; i++) { + ASSERT_OK(Put(cf, Key(i), "")); + } + // put extra key to trigger flush + ASSERT_OK(Put(cf, "", "")); + dbfull()->TEST_WaitForFlushMemTable(handles_[cf]); + ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1); + } + } + + // Now all column families qualify compaction but only one should be + // scheduled, because no column family hits speed up condition. + ASSERT_EQ(1, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + + // Create two more files for one column family, which triggers speed up + // condition, three compactions will be scheduled. + for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) { + for (int i = 0; i < kNumKeysPerFile; i++) { + ASSERT_OK(Put(2, Key(i), "")); + } + // put extra key to trigger flush + ASSERT_OK(Put(2, "", "")); + dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + ASSERT_EQ(options.level0_file_num_compaction_trigger + num + 1, + NumTableFilesAtLevel(0, 2)); + } + ASSERT_EQ(3, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + + // Unblock all threads to unblock all compactions. + for (size_t i = 0; i < kTotalTasks; i++) { + sleeping_tasks[i].WakeUp(); + sleeping_tasks[i].WaitUntilDone(); + } + dbfull()->TEST_WaitForCompact(); + + // Verify number of compactions allowed will come back to 1. + + for (size_t i = 0; i < kTotalTasks; i++) { + sleeping_tasks[i].Reset(); + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_tasks[i], Env::Priority::LOW); + sleeping_tasks[i].WaitUntilSleeping(); + } + for (int cf = 0; cf < 4; cf++) { + for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) { + for (int i = 0; i < kNumKeysPerFile; i++) { + ASSERT_OK(Put(cf, Key(i), "")); + } + // put extra key to trigger flush + ASSERT_OK(Put(cf, "", "")); + dbfull()->TEST_WaitForFlushMemTable(handles_[cf]); + ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1); + } + } + + // Now all column families qualify compaction but only one should be + // scheduled, because no column family hits speed up condition. + ASSERT_EQ(1, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + + for (size_t i = 0; i < kTotalTasks; i++) { + sleeping_tasks[i].WakeUp(); + sleeping_tasks[i].WaitUntilDone(); + } +} + TEST_P(DBCompactionTestWithParam, CompactionsGenerateMultipleFiles) { Options options; options.write_buffer_size = 100000000; // Large write buffer @@ -2198,6 +2296,25 @@ TEST_P(DBCompactionTestWithParam, CompressLevelCompaction) { Destroy(options); } +TEST_F(DBCompactionTest, SanitizeCompactionOptionsTest) { + Options options = CurrentOptions(); + options.max_background_compactions = 5; + options.soft_pending_compaction_bytes_limit = 0; + options.hard_pending_compaction_bytes_limit = 100; + options.create_if_missing = true; + DestroyAndReopen(options); + ASSERT_EQ(5, db_->GetOptions().base_background_compactions); + ASSERT_EQ(100, db_->GetOptions().soft_pending_compaction_bytes_limit); + + options.base_background_compactions = 4; + options.max_background_compactions = 3; + options.soft_pending_compaction_bytes_limit = 200; + options.hard_pending_compaction_bytes_limit = 150; + DestroyAndReopen(options); + ASSERT_EQ(3, db_->GetOptions().base_background_compactions); + ASSERT_EQ(150, db_->GetOptions().soft_pending_compaction_bytes_limit); +} + // This tests for a bug that could cause two level0 compactions running // concurrently // TODO(aekmekji): Make sure that the reason this fails when run with diff --git a/db/db_impl.cc b/db/db_impl.cc index 6db05ae66..2afab389d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -146,6 +146,12 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { result.info_log = nullptr; } } + if (result.base_background_compactions == -1) { + result.base_background_compactions = result.max_background_compactions; + } + if (result.base_background_compactions > result.max_background_compactions) { + result.base_background_compactions = result.max_background_compactions; + } result.env->IncBackgroundThreadsIfNeeded(src.max_background_compactions, Env::Priority::LOW); result.env->IncBackgroundThreadsIfNeeded(src.max_background_flushes, @@ -2448,12 +2454,14 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this); } + auto bg_compactions_allowed = BGCompactionsAllowed(); + // special case -- if max_background_flushes == 0, then schedule flush on a // compaction thread if (db_options_.max_background_flushes == 0) { while (unscheduled_flushes_ > 0 && bg_flush_scheduled_ + bg_compaction_scheduled_ < - db_options_.max_background_compactions) { + bg_compactions_allowed) { unscheduled_flushes_--; bg_flush_scheduled_++; env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::LOW, this); @@ -2466,7 +2474,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { return; } - while (bg_compaction_scheduled_ < db_options_.max_background_compactions && + while (bg_compaction_scheduled_ < bg_compactions_allowed && unscheduled_compactions_ > 0) { CompactionArg* ca = new CompactionArg; ca->db = this; @@ -2478,6 +2486,14 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { } } +int DBImpl::BGCompactionsAllowed() const { + if (write_controller_.NeedSpeedupCompaction()) { + return db_options_.max_background_compactions; + } else { + return db_options_.base_background_compactions; + } +} + void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) { assert(!cfd->pending_compaction()); cfd->Ref(); @@ -2590,10 +2606,10 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, LogToBuffer( log_buffer, "Calling FlushMemTableToOutputFile with column " - "family [%s], flush slots available %d, compaction slots available %d", - cfd->GetName().c_str(), - db_options_.max_background_flushes - bg_flush_scheduled_, - db_options_.max_background_compactions - bg_compaction_scheduled_); + "family [%s], flush slots available %d, compaction slots allowed %d, " + "compaction slots scheduled %d", + cfd->GetName().c_str(), db_options_.max_background_flushes, + bg_flush_scheduled_, BGCompactionsAllowed() - bg_compaction_scheduled_); status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress, job_context, log_buffer); if (cfd->Unref()) { diff --git a/db/db_impl.h b/db/db_impl.h index 683fd49dc..d09d645d7 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -347,6 +347,10 @@ class DBImpl : public DB { #endif // NDEBUG + // Return maximum background compaction alowed to be scheduled based on + // compaction status. + int BGCompactionsAllowed() const; + // Returns the list of live files in 'live' and the list // of all files in the filesystem in 'candidate_files'. // If force == false and the last call was less than diff --git a/db/write_controller.cc b/db/write_controller.cc index 7a933ec42..a0c18835f 100644 --- a/db/write_controller.cc +++ b/db/write_controller.cc @@ -26,6 +26,13 @@ std::unique_ptr WriteController::GetDelayToken( return std::unique_ptr(new DelayWriteToken(this)); } +std::unique_ptr +WriteController::GetCompactionPressureToken() { + ++total_compaction_pressure_; + return std::unique_ptr( + new CompactionPressureToken(this)); +} + bool WriteController::IsStopped() const { return total_stopped_ > 0; } // This is inside DB mutex, so we can't sleep and need to minimize // frequency to get time. @@ -106,4 +113,9 @@ DelayWriteToken::~DelayWriteToken() { assert(controller_->total_delayed_ >= 0); } +CompactionPressureToken::~CompactionPressureToken() { + controller_->total_compaction_pressure_--; + assert(controller_->total_compaction_pressure_ >= 0); +} + } // namespace rocksdb diff --git a/db/write_controller.h b/db/write_controller.h index a5d498c3a..6cba2c643 100644 --- a/db/write_controller.h +++ b/db/write_controller.h @@ -23,6 +23,7 @@ class WriteController { explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u) : total_stopped_(0), total_delayed_(0), + total_compaction_pressure_(0), bytes_left_(0), last_refill_time_(0) { set_delayed_write_rate(_delayed_write_rate); @@ -38,10 +39,16 @@ class WriteController { // which returns number of microseconds to sleep. std::unique_ptr GetDelayToken( uint64_t delayed_write_rate); + // When an actor (column family) requests a moderate token, compaction + // threads will be increased + std::unique_ptr GetCompactionPressureToken(); - // these two metods are querying the state of the WriteController + // these three metods are querying the state of the WriteController bool IsStopped() const; bool NeedsDelay() const { return total_delayed_ > 0; } + bool NeedSpeedupCompaction() const { + return IsStopped() || NeedsDelay() || total_compaction_pressure_ > 0; + } // return how many microseconds the caller needs to sleep after the call // num_bytes: how many number of bytes to put into the DB. // Prerequisite: DB mutex held. @@ -59,9 +66,11 @@ class WriteController { friend class WriteControllerToken; friend class StopWriteToken; friend class DelayWriteToken; + friend class CompactionPressureToken; int total_stopped_; int total_delayed_; + int total_compaction_pressure_; uint64_t bytes_left_; uint64_t last_refill_time_; uint64_t delayed_write_rate_; @@ -96,4 +105,11 @@ class DelayWriteToken : public WriteControllerToken { virtual ~DelayWriteToken(); }; +class CompactionPressureToken : public WriteControllerToken { + public: + explicit CompactionPressureToken(WriteController* controller) + : WriteControllerToken(controller) {} + virtual ~CompactionPressureToken(); +}; + } // namespace rocksdb diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index a3f410422..9eee07ac0 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -933,8 +933,19 @@ struct DBOptions { // regardless of this setting uint64_t delete_obsolete_files_period_micros; + // Suggested number of concurrent background compaction jobs, submitted to + // the default LOW priority thread pool. + // + // Default: max_background_compactions + int base_background_compactions; + // Maximum number of concurrent background compaction jobs, submitted to // the default LOW priority thread pool. + // We first try to schedule compactions based on + // `base_background_compactions`. If the compaction cannot catch up , we + // will increase number of compaction threads up to + // `max_background_compactions`. + // // If you're increasing this, also consider increasing number of threads in // LOW priority thread pool. For more information, see // Env::SetBackgroundThreads diff --git a/util/options.cc b/util/options.cc index d21d2a24b..00d797167 100644 --- a/util/options.cc +++ b/util/options.cc @@ -229,6 +229,7 @@ DBOptions::DBOptions() db_log_dir(""), wal_dir(""), delete_obsolete_files_period_micros(6ULL * 60 * 60 * 1000000), + base_background_compactions(-1), max_background_compactions(1), max_subcompactions(1), max_background_flushes(1), @@ -295,6 +296,7 @@ DBOptions::DBOptions(const Options& options) wal_dir(options.wal_dir), delete_obsolete_files_period_micros( options.delete_obsolete_files_period_micros), + base_background_compactions(options.base_background_compactions), max_background_compactions(options.max_background_compactions), max_subcompactions(options.max_subcompactions), max_background_flushes(options.max_background_flushes), @@ -383,6 +385,8 @@ void DBOptions::Dump(Logger* log) const { table_cache_numshardbits); Header(log, " Options.delete_obsolete_files_period_micros: %" PRIu64, delete_obsolete_files_period_micros); + Header(log, " Options.base_background_compactions: %d", + base_background_compactions); Header(log, " Options.max_background_compactions: %d", max_background_compactions); Header(log, " Options.max_subcompactions: %" PRIu32, @@ -652,6 +656,7 @@ Options::PrepareForBulkLoad() // to L1. This is helpful so that all files that are // input to the manual compaction are all at L0. max_background_compactions = 2; + base_background_compactions = 2; // The compaction would create large files in L1. target_file_size_base = 256 * 1024 * 1024; diff --git a/util/options_helper.h b/util/options_helper.h index 84d547cfc..4c4555aca 100644 --- a/util/options_helper.h +++ b/util/options_helper.h @@ -208,7 +208,7 @@ static std::unordered_map db_options_type_info = { {offsetof(struct DBOptions, random_access_max_buffer_size), OptionType::kSizeT, OptionVerificationType::kNormal}}, {"writable_file_max_buffer_size", - {offsetof(struct DBOptions, writable_file_max_buffer_size), + {offsetof(struct DBOptions, writable_file_max_buffer_size), OptionType::kSizeT, OptionVerificationType::kNormal}}, {"use_adaptive_mutex", {offsetof(struct DBOptions, use_adaptive_mutex), OptionType::kBoolean, @@ -219,6 +219,9 @@ static std::unordered_map db_options_type_info = { {"max_background_compactions", {offsetof(struct DBOptions, max_background_compactions), OptionType::kInt, OptionVerificationType::kNormal}}, + {"base_background_compactions", + {offsetof(struct DBOptions, base_background_compactions), OptionType::kInt, + OptionVerificationType::kNormal}}, {"max_background_flushes", {offsetof(struct DBOptions, max_background_flushes), OptionType::kInt, OptionVerificationType::kNormal}}, diff --git a/util/options_test.cc b/util/options_test.cc index 65c45c2b0..961593bd0 100644 --- a/util/options_test.cc +++ b/util/options_test.cc @@ -1669,50 +1669,7 @@ TEST_F(OptionsParserTest, DBOptionsAllFieldsSettable) { "table_cache_numshardbits=28;" "max_open_files=72;" "max_file_opening_threads=35;" - "max_background_compactions=33;" - "use_fsync=true;" - "use_adaptive_mutex=false;" - "max_total_wal_size=4295005604;" - "compaction_readahead_size=0;" - "new_table_reader_for_compaction_inputs=false;" - "keep_log_file_num=4890;" - "skip_stats_update_on_db_open=false;" - "max_manifest_file_size=4295009941;" - "db_log_dir=path/to/db_log_dir;" - "skip_log_error_on_recovery=true;" - "writable_file_max_buffer_size=1048576;" - "paranoid_checks=true;" - "is_fd_close_on_exec=false;" - "bytes_per_sync=4295013613;" - "enable_thread_tracking=false;" - "disable_data_sync=false;" - "recycle_log_file_num=0;" - "disableDataSync=false;" - "create_missing_column_families=true;" - "log_file_time_to_roll=3097;" - "max_background_flushes=35;" - "create_if_missing=false;" - "error_if_exists=true;" - "allow_os_buffer=false;" - "delayed_write_rate=4294976214;" - "manifest_preallocation_size=1222;" - "allow_mmap_writes=false;" - "stats_dump_period_sec=70127;" - "allow_fallocate=true;" - "allow_mmap_reads=false;" - "max_log_file_size=4607;" - "random_access_max_buffer_size=1048576;" - "advise_random_on_open=true;" - "wal_bytes_per_sync=4295048118;" - "delete_obsolete_files_period_micros=4294967758;" - "WAL_ttl_seconds=4295008036;" - "WAL_size_limit_MB=4295036161;" - "wal_dir=path/to/wal_dir;" - "db_write_buffer_size=2587;" - "max_subcompactions=64330;" - "table_cache_numshardbits=28;" - "max_open_files=72;" - "max_file_opening_threads=35;" + "base_background_compactions=3;" "max_background_compactions=33;" "use_fsync=true;" "use_adaptive_mutex=false;" From 37159a6448f64fac976a9c96ced440bd6bfb15e0 Mon Sep 17 00:00:00 2001 From: SherlockNoMad Date: Sun, 31 Jan 2016 18:09:24 -0800 Subject: [PATCH 03/16] Add histogram for value size per operation --- db/db_impl.cc | 3 +++ include/rocksdb/statistics.h | 7 +++++++ 2 files changed, 10 insertions(+) diff --git a/db/db_impl.cc b/db/db_impl.cc index 2afab389d..51ed9a3a0 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3327,6 +3327,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, RecordTick(stats_, NUMBER_KEYS_READ); RecordTick(stats_, BYTES_READ, value->size()); + MeasureTime(stats_, BYTES_PER_READ, value->size()); } return s; } @@ -3437,6 +3438,7 @@ std::vector DBImpl::MultiGet( RecordTick(stats_, NUMBER_MULTIGET_CALLS); RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys); RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read); + MeasureTime(stats_, BYTES_PER_MULTIGET, bytes_read); PERF_TIMER_STOP(get_post_process_time); return stat_list; @@ -4321,6 +4323,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // Record statistics RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); RecordTick(stats_, BYTES_WRITTEN, total_byte_size); + MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size); PERF_TIMER_STOP(write_pre_and_post_process_time); if (write_options.disableWAL) { diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 813104b99..35de93334 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -280,6 +280,10 @@ enum Histograms : uint32_t { SST_READ_MICROS, // The number of subcompactions actually scheduled during a compaction NUM_SUBCOMPACTIONS_SCHEDULED, + // Value size distribution in each operation + BYTES_PER_READ, + BYTES_PER_WRITE, + BYTES_PER_MULTIGET, HISTOGRAM_ENUM_MAX, // TODO(ldemailly): enforce HistogramsNameMap match }; @@ -307,6 +311,9 @@ const std::vector> HistogramsNameMap = { {WRITE_STALL, "rocksdb.db.write.stall"}, {SST_READ_MICROS, "rocksdb.sst.read.micros"}, {NUM_SUBCOMPACTIONS_SCHEDULED, "rocksdb.num.subcompactions.scheduled"}, + {BYTES_PER_READ, "rocksdb.bytes.per.read"}, + {BYTES_PER_WRITE, "rocksdb.bytes.per.write"}, + {BYTES_PER_MULTIGET, "rocksdb.bytes.per.multiget"}, }; struct HistogramData { From 1d854fa3d46cc8da984b5f70ec7856917bb52e70 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Mon, 1 Feb 2016 12:45:45 -0800 Subject: [PATCH 04/16] Fixed the asan error on column_family_test Summary: Fixed the asan error on column_family_test caused by not disabling SyncPoint. Test Plan: column_family_test Reviewers: anthony, rven, kradhakrishnan, sdong, IslamAbdelRahman Reviewed By: IslamAbdelRahman Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D53505 --- db/column_family_test.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 114451464..e3b51fc85 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -65,6 +65,7 @@ class ColumnFamilyTest : public testing::Test { ~ColumnFamilyTest() { Close(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); Destroy(); delete env_; } @@ -2047,7 +2048,6 @@ TEST_F(ColumnFamilyTest, FlushAndDropRaceCondition) { Close(); Destroy(); - rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } #ifndef ROCKSDB_LITE @@ -2125,7 +2125,6 @@ TEST_F(ColumnFamilyTest, CreateAndDropRace) { drop_cf_thread.join(); Close(); Destroy(); - rocksdb::SyncPoint::GetInstance()->DisableProcessing(); for (auto* comparator : comparators) { if (comparator) { delete comparator; From 36300fbbe3a7fde402152e4a57bbf5cda614d53c Mon Sep 17 00:00:00 2001 From: Dmitri Smirnov Date: Mon, 1 Feb 2016 13:14:37 -0800 Subject: [PATCH 05/16] Enable per-request buffer allocation in RandomAccessFile This change impacts only non-buffered I/O on Windows. Currently, there is a buffer per RandomAccessFile instance that is protected by a lock. The reason we maintain the buffer is non-buffered I/O requires an aligned buffer to work. XPerf traces demonstrate that we accumulate a considerable wait time while waiting for that lock. This change enables to set random access buffer size to zero which would indicate a per request allocation. We are expecting that allocation expense would be much less than I/O costs plus wait time due to the fact that the memory heap would tend to re-use page aligned allocations especially with the use of Jemalloc. This change does not affect buffer use as a read_ahead_buffer for compaction purposes. --- include/rocksdb/options.h | 3 + port/win/env_win.cc | 115 ++++++++++++++++++++++++-------------- 2 files changed, 77 insertions(+), 41 deletions(-) diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 9eee07ac0..a26ed7d81 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1121,6 +1121,9 @@ struct DBOptions { // This option is currently honored only on Windows // // Default: 1 Mb + // + // Special value: 0 - means do not maintain per instance buffer. Allocate + // per request buffer and avoid locking. size_t random_access_max_buffer_size; // This is the maximum buffer size that is used by WritableFileWriter. diff --git a/port/win/env_win.cc b/port/win/env_win.cc index 977c80b88..50059a98f 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -766,6 +766,18 @@ class WinRandomAccessFile : public RandomAccessFile { return read; } + void CalculateReadParameters(uint64_t offset, size_t bytes_requested, + size_t& actual_bytes_toread, + uint64_t& first_page_start) const { + + const size_t alignment = buffer_.Alignment(); + + first_page_start = TruncateToPageBoundary(alignment, offset); + const uint64_t last_page_start = + TruncateToPageBoundary(alignment, offset + bytes_requested - 1); + actual_bytes_toread = (last_page_start - first_page_start) + alignment; + } + public: WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment, const EnvOptions& options) @@ -797,66 +809,87 @@ class WinRandomAccessFile : public RandomAccessFile { virtual Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const override { + Status s; SSIZE_T r = -1; size_t left = n; char* dest = scratch; + if (n == 0) { + *result = Slice(scratch, 0); + return s; + } + // When in unbuffered mode we need to do the following changes: // - use our own aligned buffer // - always read at the offset of that is a multiple of alignment if (!use_os_buffer_) { - std::unique_lock lock(buffer_mut_); - // Let's see if at least some of the requested data is already - // in the buffer - if (offset >= buffered_start_ && - offset < (buffered_start_ + buffer_.CurrentSize())) { - size_t buffer_offset = offset - buffered_start_; - r = buffer_.Read(dest, buffer_offset, left); - assert(r >= 0); + uint64_t first_page_start = 0; + size_t actual_bytes_toread = 0; + size_t bytes_requested = left; - left -= size_t(r); - offset += r; - dest += r; - } + if (!read_ahead_ && random_access_max_buffer_size_ == 0) { + CalculateReadParameters(offset, bytes_requested, actual_bytes_toread, + first_page_start); + + assert(actual_bytes_toread > 0); - // Still some left or none was buffered - if (left > 0) { - // Figure out the start/end offset for reading and amount to read - const size_t alignment = buffer_.Alignment(); - const size_t first_page_start = - TruncateToPageBoundary(alignment, offset); + r = ReadIntoOneShotBuffer(offset, first_page_start, + actual_bytes_toread, left, dest); + } else { + + std::unique_lock lock(buffer_mut_); - size_t bytes_requested = left; - if (read_ahead_ && bytes_requested < compaction_readahead_size_) { - bytes_requested = compaction_readahead_size_; + // Let's see if at least some of the requested data is already + // in the buffer + if (offset >= buffered_start_ && + offset < (buffered_start_ + buffer_.CurrentSize())) { + size_t buffer_offset = offset - buffered_start_; + r = buffer_.Read(dest, buffer_offset, left); + assert(r >= 0); + + left -= size_t(r); + offset += r; + dest += r; } - const size_t last_page_start = - TruncateToPageBoundary(alignment, offset + bytes_requested - 1); - const size_t actual_bytes_toread = - (last_page_start - first_page_start) + alignment; + // Still some left or none was buffered + if (left > 0) { + // Figure out the start/end offset for reading and amount to read + bytes_requested = left; + + if (read_ahead_ && bytes_requested < compaction_readahead_size_) { + bytes_requested = compaction_readahead_size_; + } + + CalculateReadParameters(offset, bytes_requested, actual_bytes_toread, + first_page_start); + + assert(actual_bytes_toread > 0); - if (buffer_.Capacity() < actual_bytes_toread) { - // If we are in read-ahead mode or the requested size - // exceeds max buffer size then use one-shot - // big buffer otherwise reallocate main buffer - if (read_ahead_ || + if (buffer_.Capacity() < actual_bytes_toread) { + // If we are in read-ahead mode or the requested size + // exceeds max buffer size then use one-shot + // big buffer otherwise reallocate main buffer + if (read_ahead_ || (actual_bytes_toread > random_access_max_buffer_size_)) { - // Unlock the mutex since we are not using instance buffer - lock.unlock(); - r = ReadIntoOneShotBuffer(offset, first_page_start, - actual_bytes_toread, left, dest); - } else { - buffer_.AllocateNewBuffer(actual_bytes_toread); + // Unlock the mutex since we are not using instance buffer + lock.unlock(); + r = ReadIntoOneShotBuffer(offset, first_page_start, + actual_bytes_toread, left, dest); + } + else { + buffer_.AllocateNewBuffer(actual_bytes_toread); + r = ReadIntoInstanceBuffer(offset, first_page_start, + actual_bytes_toread, left, dest); + } + } + else { + buffer_.Clear(); r = ReadIntoInstanceBuffer(offset, first_page_start, - actual_bytes_toread, left, dest); + actual_bytes_toread, left, dest); } - } else { - buffer_.Clear(); - r = ReadIntoInstanceBuffer(offset, first_page_start, - actual_bytes_toread, left, dest); } } } else { From aa5e3b7c04b7116a177b7fb955b8e2fe76c5d64b Mon Sep 17 00:00:00 2001 From: Dmytro Ivchenko Date: Mon, 1 Feb 2016 13:41:13 -0800 Subject: [PATCH 06/16] PerfContext::ToString() add option to exclude zero counters Test Plan: Added unit test to check w/ w/o zeros scenarios Reviewers: yhchiang Reviewed By: yhchiang Subscribers: sdong, dhruba Differential Revision: https://reviews.facebook.net/D52809 --- db/perf_context_test.cc | 13 +++++++ include/rocksdb/perf_context.h | 2 +- util/perf_context.cc | 64 +++++++++++++++++++++++----------- 3 files changed, 57 insertions(+), 22 deletions(-) diff --git a/db/perf_context_test.cc b/db/perf_context_test.cc index 9494ac92b..8a345e5bb 100644 --- a/db/perf_context_test.cc +++ b/db/perf_context_test.cc @@ -589,6 +589,19 @@ TEST_F(PerfContextTest, FalseDBMutexWait) { } } } + +TEST_F(PerfContextTest, ToString) { + perf_context.Reset(); + perf_context.block_read_count = 12345; + + std::string zero_included = perf_context.ToString(); + ASSERT_NE(std::string::npos, zero_included.find("= 0")); + ASSERT_NE(std::string::npos, zero_included.find("= 12345")); + + std::string zero_excluded = perf_context.ToString(true); + ASSERT_EQ(std::string::npos, zero_excluded.find("= 0")); + ASSERT_NE(std::string::npos, zero_excluded.find("= 12345")); +} } int main(int argc, char** argv) { diff --git a/include/rocksdb/perf_context.h b/include/rocksdb/perf_context.h index c2af729e3..7a6b6f367 100644 --- a/include/rocksdb/perf_context.h +++ b/include/rocksdb/perf_context.h @@ -21,7 +21,7 @@ struct PerfContext { void Reset(); // reset all performance counters to zero - std::string ToString() const; + std::string ToString(bool exclude_zero_counters = false) const; uint64_t user_key_comparison_count; // total number of user key comparisons uint64_t block_cache_hit_count; // total number of block cache hits diff --git a/util/perf_context.cc b/util/perf_context.cc index 282516590..07bad40f2 100644 --- a/util/perf_context.cc +++ b/util/perf_context.cc @@ -61,32 +61,54 @@ void PerfContext::Reset() { #endif } -#define OUTPUT(counter) #counter << " = " << counter << ", " +#define OUTPUT(counter) \ + if (!exclude_zero_counters || (counter > 0)) { \ + ss << #counter << " = " << counter << ", "; \ + } -std::string PerfContext::ToString() const { +std::string PerfContext::ToString(bool exclude_zero_counters) const { #if defined(NPERF_CONTEXT) || defined(IOS_CROSS_COMPILE) return ""; #else std::ostringstream ss; - ss << OUTPUT(user_key_comparison_count) << OUTPUT(block_cache_hit_count) - << OUTPUT(block_read_count) << OUTPUT(block_read_byte) - << OUTPUT(block_read_time) << OUTPUT(block_checksum_time) - << OUTPUT(block_decompress_time) << OUTPUT(internal_key_skipped_count) - << OUTPUT(internal_delete_skipped_count) << OUTPUT(write_wal_time) - << OUTPUT(get_snapshot_time) << OUTPUT(get_from_memtable_time) - << OUTPUT(get_from_memtable_count) << OUTPUT(get_post_process_time) - << OUTPUT(get_from_output_files_time) << OUTPUT(seek_on_memtable_time) - << OUTPUT(seek_on_memtable_count) << OUTPUT(seek_child_seek_time) - << OUTPUT(seek_child_seek_count) << OUTPUT(seek_min_heap_time) - << OUTPUT(seek_internal_seek_time) << OUTPUT(find_next_user_entry_time) - << OUTPUT(write_pre_and_post_process_time) << OUTPUT(write_memtable_time) - << OUTPUT(db_mutex_lock_nanos) << OUTPUT(db_condition_wait_nanos) - << OUTPUT(merge_operator_time_nanos) << OUTPUT(write_delay_time) - << OUTPUT(read_index_block_nanos) << OUTPUT(read_filter_block_nanos) - << OUTPUT(new_table_block_iter_nanos) << OUTPUT(new_table_iterator_nanos) - << OUTPUT(block_seek_nanos) << OUTPUT(find_table_nanos) - << OUTPUT(bloom_memtable_hit_count) << OUTPUT(bloom_memtable_miss_count) - << OUTPUT(bloom_sst_hit_count) << OUTPUT(bloom_sst_miss_count); + OUTPUT(user_key_comparison_count); + OUTPUT(block_cache_hit_count); + OUTPUT(block_read_count); + OUTPUT(block_read_byte); + OUTPUT(block_read_time); + OUTPUT(block_checksum_time); + OUTPUT(block_decompress_time); + OUTPUT(internal_key_skipped_count); + OUTPUT(internal_delete_skipped_count); + OUTPUT(write_wal_time); + OUTPUT(get_snapshot_time); + OUTPUT(get_from_memtable_time); + OUTPUT(get_from_memtable_count); + OUTPUT(get_post_process_time); + OUTPUT(get_from_output_files_time); + OUTPUT(seek_on_memtable_time); + OUTPUT(seek_on_memtable_count); + OUTPUT(seek_child_seek_time); + OUTPUT(seek_child_seek_count); + OUTPUT(seek_min_heap_time); + OUTPUT(seek_internal_seek_time); + OUTPUT(find_next_user_entry_time); + OUTPUT(write_pre_and_post_process_time); + OUTPUT(write_memtable_time); + OUTPUT(db_mutex_lock_nanos); + OUTPUT(db_condition_wait_nanos); + OUTPUT(merge_operator_time_nanos); + OUTPUT(write_delay_time); + OUTPUT(read_index_block_nanos); + OUTPUT(read_filter_block_nanos); + OUTPUT(new_table_block_iter_nanos); + OUTPUT(new_table_iterator_nanos); + OUTPUT(block_seek_nanos); + OUTPUT(find_table_nanos); + OUTPUT(bloom_memtable_hit_count); + OUTPUT(bloom_memtable_miss_count); + OUTPUT(bloom_sst_hit_count); + OUTPUT(bloom_sst_miss_count); return ss.str(); #endif } From fdd70d14955e521d1b6bdb42838b44569f40c942 Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Mon, 1 Feb 2016 14:58:46 -0800 Subject: [PATCH 07/16] Skip filters for last L0 file if hit-optimized Summary: Following up on D53493, we can still enable the filter-skipping optimization for last file in L0. It's correct to assume the key will be present in the last L0 file when we're hit-optimized and L0 is deepest. The FilePicker encapsulates the state for traversing each level's files, so I needed to make it expose whether the returned file is last in its level. Test Plan: verified below test fails before this patch and passes afterwards. The change to how the test memtable is populated is needed so file 1 has keys (0, 30, 60), file 2 has keys (10, 40, 70), etc. $ ./db_universal_compaction_test --gtest_filter=UniversalCompactionNumLevels/DBTestUniversalCompaction.OptimizeFiltersForHits/* Reviewers: sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D53583 --- db/db_universal_compaction_test.cc | 15 ++++++++++++--- db/version_set.cc | 30 ++++++++++++++++++------------ db/version_set.h | 2 +- 3 files changed, 31 insertions(+), 16 deletions(-) diff --git a/db/db_universal_compaction_test.cc b/db/db_universal_compaction_test.cc index 9d3cca83c..a4cf6657f 100644 --- a/db/db_universal_compaction_test.cc +++ b/db/db_universal_compaction_test.cc @@ -187,14 +187,16 @@ TEST_P(DBTestUniversalCompaction, OptimizeFiltersForHits) { env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); - Put("", ""); for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) { Put(Key(num * 10), "val"); + if (num) { + dbfull()->TEST_WaitForFlushMemTable(); + } Put(Key(30 + num * 10), "val"); Put(Key(60 + num * 10), "val"); - - dbfull()->TEST_WaitForFlushMemTable(); } + Put("", ""); + dbfull()->TEST_WaitForFlushMemTable(); // Query set of non existing keys for (int i = 5; i < 90; i += 10) { @@ -205,6 +207,13 @@ TEST_P(DBTestUniversalCompaction, OptimizeFiltersForHits) { ASSERT_GT(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 0); auto prev_counter = TestGetTickerCount(options, BLOOM_FILTER_USEFUL); + // Make sure bloom filter is used for all but the last L0 file when looking + // up a non-existent key that's in the range of all L0 files. + ASSERT_EQ(Get(Key(35)), "NOT_FOUND"); + ASSERT_EQ(prev_counter + NumTableFilesAtLevel(0) - 1, + TestGetTickerCount(options, BLOOM_FILTER_USEFUL)); + prev_counter = TestGetTickerCount(options, BLOOM_FILTER_USEFUL); + // Unblock compaction and wait it for happening. sleeping_task_low.WakeUp(); dbfull()->TEST_WaitForCompact(); diff --git a/db/version_set.cc b/db/version_set.cc index 3679bfbb4..6804730d7 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -84,15 +84,11 @@ int FindFileInRange(const InternalKeyComparator& icmp, // are MergeInProgress). class FilePicker { public: - FilePicker( - std::vector* files, - const Slice& user_key, - const Slice& ikey, - autovector* file_levels, - unsigned int num_levels, - FileIndexer* file_indexer, - const Comparator* user_comparator, - const InternalKeyComparator* internal_comparator) + FilePicker(std::vector* files, const Slice& user_key, + const Slice& ikey, autovector* file_levels, + unsigned int num_levels, FileIndexer* file_indexer, + const Comparator* user_comparator, + const InternalKeyComparator* internal_comparator) : num_levels_(num_levels), curr_level_(-1), hit_file_level_(-1), @@ -102,6 +98,7 @@ class FilePicker { files_(files), #endif level_files_brief_(file_levels), + is_hit_file_last_in_level_(false), user_key_(user_key), ikey_(ikey), file_indexer_(file_indexer), @@ -126,6 +123,8 @@ class FilePicker { // Loops over all files in current level. FdWithKeyRange* f = &curr_file_level_->files[curr_index_in_curr_level_]; hit_file_level_ = curr_level_; + is_hit_file_last_in_level_ = + curr_index_in_curr_level_ == curr_file_level_->num_files - 1; int cmp_largest = -1; // Do key range filtering of files or/and fractional cascading if: @@ -209,6 +208,10 @@ class FilePicker { // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts unsigned int GetHitFileLevel() { return hit_file_level_; } + // Returns true if the most recent "hit file" (i.e., one returned by + // GetNextFile()) is at the last index in its level. + bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; } + private: unsigned int num_levels_; unsigned int curr_level_; @@ -220,6 +223,7 @@ class FilePicker { #endif autovector* level_files_brief_; bool search_ended_; + bool is_hit_file_last_in_level_; LevelFilesBrief* curr_file_level_; unsigned int curr_index_in_curr_level_; unsigned int start_index_in_curr_level_; @@ -903,7 +907,8 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, *status = table_cache_->Get( read_options, *internal_comparator(), f->fd, ikey, &get_context, cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()), - IsFilterSkipped(static_cast(fp.GetHitFileLevel()))); + IsFilterSkipped(static_cast(fp.GetHitFileLevel()), + fp.IsHitFileLastInLevel())); // TODO: examine the behavior for corrupted key if (!status->ok()) { return; @@ -960,10 +965,11 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, } } -bool Version::IsFilterSkipped(int level) { +bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) { // Reaching the bottom level implies misses at all upper levels, so we'll // skip checking the filters when we predict a hit. - return cfd_->ioptions()->optimize_filters_for_hits && level > 0 && + return cfd_->ioptions()->optimize_filters_for_hits && + (level > 0 || is_file_last_in_level) && level == storage_info_.num_non_empty_levels() - 1; } diff --git a/db/version_set.h b/db/version_set.h index 097109fd4..7ce4a6bdf 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -530,7 +530,7 @@ class Version { // checked during read operations. In certain cases (trivial move or preload), // the filter block may already be cached, but we still do not access it such // that it eventually expires from the cache. - bool IsFilterSkipped(int level); + bool IsFilterSkipped(int level, bool is_file_last_in_level = false); // The helper function of UpdateAccumulatedStats, which may fill the missing // fields of file_mata from its associated TableProperties. From ad7ecca72dc0b282fbf62ca0b43595787d278e48 Mon Sep 17 00:00:00 2001 From: sdong Date: Mon, 1 Feb 2016 11:03:28 -0800 Subject: [PATCH 08/16] Add unit tests to verify large key/value Summary: Add unit tests: (1) insert entries of 8MB key and 3GB value to DB (2) insert entry of 3GB key and 3GB value into write batch and make sure we can read it. (3) insert 3 billions of key-value pairs into write batch and make sure we can read it. Disable them because not all platform can run it. Test Plan: Run the tests Reviewers: IslamAbdelRahman, yhchiang, kradhakrishnan, andrewkr, anthony Reviewed By: anthony Subscribers: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D53619 --- db/db_test.cc | 55 ++++++++++++++++++++ db/write_batch_test.cc | 114 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 169 insertions(+) diff --git a/db/db_test.cc b/db/db_test.cc index 76e64d484..dfc231969 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -606,6 +606,61 @@ TEST_F(DBTest, EmptyFlush) { kSkipUniversalCompaction | kSkipMergePut)); } +// Disable because not all platform can run it. +// It requires more than 9GB memory to run it, With single allocation +// of more than 3GB. +TEST_F(DBTest, DISABLED_VeryLargeValue) { + const size_t kValueSize = 3221225472u; // 3GB value + const size_t kKeySize = 8388608u; // 8MB key + std::string raw(kValueSize, 'v'); + std::string key1(kKeySize, 'c'); + std::string key2(kKeySize, 'd'); + + Options options; + options.env = env_; + options.write_buffer_size = 100000; // Small write buffer + options.paranoid_checks = true; + options = CurrentOptions(options); + DestroyAndReopen(options); + + ASSERT_OK(Put("boo", "v1")); + ASSERT_OK(Put("foo", "v1")); + ASSERT_OK(Put(key1, raw)); + raw[0] = 'w'; + ASSERT_OK(Put(key2, raw)); + dbfull()->TEST_WaitForFlushMemTable(); + + ASSERT_EQ(1, NumTableFilesAtLevel(0)); + + std::string value; + Status s = db_->Get(ReadOptions(), key1, &value); + ASSERT_OK(s); + ASSERT_EQ(kValueSize, value.size()); + ASSERT_EQ('v', value[0]); + + s = db_->Get(ReadOptions(), key2, &value); + ASSERT_OK(s); + ASSERT_EQ(kValueSize, value.size()); + ASSERT_EQ('w', value[0]); + + // Compact all files. + Flush(); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + + // Check DB is not in read-only state. + ASSERT_OK(Put("boo", "v1")); + + s = db_->Get(ReadOptions(), key1, &value); + ASSERT_OK(s); + ASSERT_EQ(kValueSize, value.size()); + ASSERT_EQ('v', value[0]); + + s = db_->Get(ReadOptions(), key2, &value); + ASSERT_OK(s); + ASSERT_EQ(kValueSize, value.size()); + ASSERT_EQ('w', value[0]); +} + TEST_F(DBTest, GetFromImmutableLayer) { do { Options options; diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 5d008b3a4..cd981fd60 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -308,6 +308,120 @@ TEST_F(WriteBatchTest, Blob) { handler.seen); } +// It requires more than 30GB of memory to run the test. With single memory +// allocation of more than 30GB. +// Not all platform can run it. Also it runs a long time. So disable it. +TEST_F(WriteBatchTest, DISABLED_ManyUpdates) { + // Insert key and value of 3GB and push total batch size to 12GB. + const size_t kKeyValueSize = 4u; + const uint32_t kNumUpdates = 3 << 30; + std::string raw(kKeyValueSize, 'A'); + WriteBatch batch(kNumUpdates * (4 + kKeyValueSize * 2) + 1024u); + char c = 'A'; + for (uint32_t i = 0; i < kNumUpdates; i++) { + if (c > 'Z') { + c = 'A'; + } + raw[0] = c; + raw[raw.length() - 1] = c; + c++; + batch.Put(raw, raw); + } + + ASSERT_EQ(kNumUpdates, batch.Count()); + + struct NoopHandler : public WriteBatch::Handler { + uint32_t num_seen = 0; + char expected_char = 'A'; + virtual Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + EXPECT_EQ(kKeyValueSize, key.size()); + EXPECT_EQ(kKeyValueSize, value.size()); + EXPECT_EQ(expected_char, key[0]); + EXPECT_EQ(expected_char, value[0]); + EXPECT_EQ(expected_char, key[kKeyValueSize - 1]); + EXPECT_EQ(expected_char, value[kKeyValueSize - 1]); + expected_char++; + if (expected_char > 'Z') { + expected_char = 'A'; + } + ++num_seen; + return Status::OK(); + } + virtual Status DeleteCF(uint32_t column_family_id, + const Slice& key) override { + EXPECT_TRUE(false); + return Status::OK(); + } + virtual Status SingleDeleteCF(uint32_t column_family_id, + const Slice& key) override { + EXPECT_TRUE(false); + return Status::OK(); + } + virtual Status MergeCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + EXPECT_TRUE(false); + return Status::OK(); + } + virtual void LogData(const Slice& blob) override { EXPECT_TRUE(false); } + virtual bool Continue() override { return num_seen < kNumUpdates; } + } handler; + + batch.Iterate(&handler); + ASSERT_EQ(kNumUpdates, handler.num_seen); +} + +// The test requires more than 18GB memory to run it, with single memory +// allocation of more than 12GB. Not all the platform can run it. So disable it. +TEST_F(WriteBatchTest, DISABLED_LargeKeyValue) { + // Insert key and value of 3GB and push total batch size to 12GB. + const size_t kKeyValueSize = 3221225472u; + std::string raw(kKeyValueSize, 'A'); + WriteBatch batch(12884901888u + 1024u); + for (char i = 0; i < 2; i++) { + raw[0] = 'A' + i; + raw[raw.length() - 1] = 'A' - i; + batch.Put(raw, raw); + } + + ASSERT_EQ(2, batch.Count()); + + struct NoopHandler : public WriteBatch::Handler { + int num_seen = 0; + virtual Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + EXPECT_EQ(kKeyValueSize, key.size()); + EXPECT_EQ(kKeyValueSize, value.size()); + EXPECT_EQ('A' + num_seen, key[0]); + EXPECT_EQ('A' + num_seen, value[0]); + EXPECT_EQ('A' - num_seen, key[kKeyValueSize - 1]); + EXPECT_EQ('A' - num_seen, value[kKeyValueSize - 1]); + ++num_seen; + return Status::OK(); + } + virtual Status DeleteCF(uint32_t column_family_id, + const Slice& key) override { + EXPECT_TRUE(false); + return Status::OK(); + } + virtual Status SingleDeleteCF(uint32_t column_family_id, + const Slice& key) override { + EXPECT_TRUE(false); + return Status::OK(); + } + virtual Status MergeCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + EXPECT_TRUE(false); + return Status::OK(); + } + virtual void LogData(const Slice& blob) override { EXPECT_TRUE(false); } + virtual bool Continue() override { return num_seen < 2; } + } handler; + + batch.Iterate(&handler); + ASSERT_EQ(2, handler.num_seen); +} + TEST_F(WriteBatchTest, Continue) { WriteBatch batch; From 1ad8182950f8423134f8c0a92213bced34cbf2df Mon Sep 17 00:00:00 2001 From: Islam AbdelRahman Date: Mon, 1 Feb 2016 16:07:53 -0800 Subject: [PATCH 09/16] Fix WriteBatchTest.ManyUpdates, WriteBatchTest.LargeKeyValue under clang Summary: Fix current clang failure https://ci-builds.fb.com/view/rocksdb/job/rocksdb_clang_build/1398/console Test Plan: make sure that both clang and g++ compilation succeed USE_CLANG=1 make check -j64 make check -j64 Reviewers: anthony, yhchiang, sdong Reviewed By: sdong Subscribers: dhruba Differential Revision: https://reviews.facebook.net/D53667 --- db/write_batch_test.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index cd981fd60..c475dbeb6 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -313,8 +313,8 @@ TEST_F(WriteBatchTest, Blob) { // Not all platform can run it. Also it runs a long time. So disable it. TEST_F(WriteBatchTest, DISABLED_ManyUpdates) { // Insert key and value of 3GB and push total batch size to 12GB. - const size_t kKeyValueSize = 4u; - const uint32_t kNumUpdates = 3 << 30; + static const size_t kKeyValueSize = 4u; + static const uint32_t kNumUpdates = 3 << 30; std::string raw(kKeyValueSize, 'A'); WriteBatch batch(kNumUpdates * (4 + kKeyValueSize * 2) + 1024u); char c = 'A'; @@ -375,7 +375,7 @@ TEST_F(WriteBatchTest, DISABLED_ManyUpdates) { // allocation of more than 12GB. Not all the platform can run it. So disable it. TEST_F(WriteBatchTest, DISABLED_LargeKeyValue) { // Insert key and value of 3GB and push total batch size to 12GB. - const size_t kKeyValueSize = 3221225472u; + static const size_t kKeyValueSize = 3221225472u; std::string raw(kKeyValueSize, 'A'); WriteBatch batch(12884901888u + 1024u); for (char i = 0; i < 2; i++) { From ac3fa9a6fe3684d3d1234c8a505f5741db06681a Mon Sep 17 00:00:00 2001 From: sdong Date: Mon, 1 Feb 2016 18:20:34 -0800 Subject: [PATCH 10/16] Travis CI to disable ROCKSDB_LITE tests Summary: Travis CI fails most of the times because of timing out. To unblock it, disable LITE tests in Travis CI. Test Plan: Will see. Reviewers: anthony, yhchiang, kradhakrishnan, IslamAbdelRahman, andrewkr Reviewed By: andrewkr Subscribers: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D53679 --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index b6fa63c5d..b045d259e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -34,7 +34,7 @@ before_script: # as EnvPosixTest::AllocateTest expects within the Travis OpenVZ environment. script: - if [[ "${TRAVIS_OS_NAME}" == 'linux' ]]; then OPT=-DTRAVIS CLANG_FORMAT_DIFF=/tmp/clang-format-diff.py make format || true; fi - - OPT=-DTRAVIS V=1 make -j4 check && OPT=-DTRAVIS V=1 make clean jclean rocksdbjava jtest && make clean && OPT="-DTRAVIS -DROCKSDB_LITE" V=1 make -j4 check + - OPT=-DTRAVIS V=1 make -j4 check && OPT=-DTRAVIS V=1 make clean jclean rocksdbjava jtest && make clean && OPT="-DTRAVIS -DROCKSDB_LITE" V=1 make -j4 static_lib notifications: email: From 9c2cf9479b1b59b22562a4ee673caf9826199ef3 Mon Sep 17 00:00:00 2001 From: Nathan Bronson Date: Sun, 31 Jan 2016 10:48:43 -0800 Subject: [PATCH 11/16] Fix for --allow_concurrent_memtable_write with batching Summary: Concurrent memtable adds were incorrectly computing the last sequence number for a write batch group when the write batches were not solitary. This is the cause of https://github.com/facebook/mysql-5.6/issues/155 Test Plan: 1. unit tests 2. new unit test 3. parallel db_bench stress tests with batch size of 10 and asserts enabled Reviewers: igor, sdong Reviewed By: sdong Subscribers: IslamAbdelRahman, MarkCallaghan, dhruba Differential Revision: https://reviews.facebook.net/D53595 --- db/db_impl.cc | 8 ++++++-- db/db_test_util.cc | 8 +++++++- db/db_test_util.h | 7 ++++--- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 51ed9a3a0..f625c775e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -4137,7 +4137,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (write_thread_.CompleteParallelWorker(&w)) { // we're responsible for early exit - auto last_sequence = w.parallel_group->last_writer->sequence; + auto last_sequence = + w.parallel_group->last_writer->sequence + + WriteBatchInternal::Count(w.parallel_group->last_writer->batch) - 1; SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence); versions_->SetLastSequence(last_sequence); write_thread_.EarlyExitParallelGroup(&w); @@ -4437,7 +4439,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, this, true /*dont_filter_deletes*/, true /*concurrent_memtable_writes*/); - assert(last_writer->sequence == last_sequence); + assert(last_writer->sequence + + WriteBatchInternal::Count(last_writer->batch) - 1 == + last_sequence); // CompleteParallelWorker returns true if this thread should // handle exit, false means somebody else did exit_completed_early = !write_thread_.CompleteParallelWorker(&w); diff --git a/db/db_test_util.cc b/db/db_test_util.cc index e6ee304a5..f2906c7ca 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -85,7 +85,8 @@ bool DBTestBase::ShouldSkipOptions(int option_config, int skip_mask) { option_config == kHashCuckoo || option_config == kUniversalCompaction || option_config == kUniversalCompactionMultiLevel || option_config == kUniversalSubcompactions || - option_config == kFIFOCompaction) { + option_config == kFIFOCompaction || + option_config == kConcurrentSkipList) { return true; } #endif @@ -361,6 +362,11 @@ Options DBTestBase::CurrentOptions( options.max_subcompactions = 4; break; } + case kConcurrentSkipList: { + options.allow_concurrent_memtable_write = true; + options.enable_write_thread_adaptive_yield = true; + break; + } default: break; diff --git a/db/db_test_util.h b/db/db_test_util.h index 031057bbb..b993af8cb 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -525,9 +525,10 @@ class DBTestBase : public testing::Test { kOptimizeFiltersForHits = 27, kRowCache = 28, kRecycleLogFiles = 29, - kLevelSubcompactions = 30, - kUniversalSubcompactions = 31, - kEnd = 30 + kConcurrentSkipList = 30, + kEnd = 31, + kLevelSubcompactions = 31, + kUniversalSubcompactions = 32, }; int option_config_; From 502d41f1503d7d65e91a47b6d6ba9faf1c0471dd Mon Sep 17 00:00:00 2001 From: Tomas Kolda Date: Wed, 27 Jan 2016 13:36:22 +0100 Subject: [PATCH 12/16] Making use of GetSystemTimePreciseAsFileTime dynamic to not break compatibility with Windows 7. The issue with rotated logs was fixed other way. --- db/auto_roll_logger.cc | 13 ++++++++-- port/win/env_win.cc | 59 ++++++++++++++++++++++++++++-------------- 2 files changed, 51 insertions(+), 21 deletions(-) diff --git a/db/auto_roll_logger.cc b/db/auto_roll_logger.cc index cf92f34c8..c984b4810 100644 --- a/db/auto_roll_logger.cc +++ b/db/auto_roll_logger.cc @@ -32,8 +32,17 @@ Status AutoRollLogger::ResetLogger() { } void AutoRollLogger::RollLogFile() { - std::string old_fname = OldInfoLogFileName( - dbname_, env_->NowMicros(), db_absolute_path_, db_log_dir_); + uint64_t now = env_->NowMicros(); + std::string old_fname; + // Try to check target name only 10 times at most + for (int i = 0; i < 10; i++) { + old_fname = OldInfoLogFileName( + dbname_, now, db_absolute_path_, db_log_dir_); + if (!env_->FileExists(old_fname).ok()) { + break; + } + now++; + }; env_->RenameFile(log_fname_, old_fname); } diff --git a/port/win/env_win.cc b/port/win/env_win.cc index 50059a98f..87a25569c 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -1138,6 +1138,8 @@ void WinthreadCall(const char* label, std::error_code result) { } } +typedef VOID(WINAPI * FnGetSystemTimePreciseAsFileTime)(LPFILETIME); + class WinEnv : public Env { public: WinEnv(); @@ -1676,25 +1678,29 @@ class WinEnv : public Env { } virtual uint64_t NowMicros() override { - // all std::chrono clocks on windows proved to return - // values that may repeat that is not good enough for some uses. - const int64_t c_UnixEpochStartTicks = 116444736000000000i64; - const int64_t c_FtToMicroSec = 10; - - // This interface needs to return system time and not - // just any microseconds because it is often used as an argument - // to TimedWait() on condition variable - FILETIME ftSystemTime; - GetSystemTimePreciseAsFileTime(&ftSystemTime); - - LARGE_INTEGER li; - li.LowPart = ftSystemTime.dwLowDateTime; - li.HighPart = ftSystemTime.dwHighDateTime; - // Subtract unix epoch start - li.QuadPart -= c_UnixEpochStartTicks; - // Convert to microsecs - li.QuadPart /= c_FtToMicroSec; - return li.QuadPart; + if (GetSystemTimePreciseAsFileTime_ != NULL) { + // all std::chrono clocks on windows proved to return + // values that may repeat that is not good enough for some uses. + const int64_t c_UnixEpochStartTicks = 116444736000000000i64; + const int64_t c_FtToMicroSec = 10; + + // This interface needs to return system time and not + // just any microseconds because it is often used as an argument + // to TimedWait() on condition variable + FILETIME ftSystemTime; + GetSystemTimePreciseAsFileTime_(&ftSystemTime); + + LARGE_INTEGER li; + li.LowPart = ftSystemTime.dwLowDateTime; + li.HighPart = ftSystemTime.dwHighDateTime; + // Subtract unix epoch start + li.QuadPart -= c_UnixEpochStartTicks; + // Convert to microsecs + li.QuadPart /= c_FtToMicroSec; + return li.QuadPart; + } + using namespace std::chrono; + return duration_cast(system_clock::now().time_since_epoch()).count(); } virtual uint64_t NowNanos() override { @@ -2104,8 +2110,13 @@ class WinEnv : public Env { std::vector thread_pools_; mutable std::mutex mu_; std::vector threads_to_join_; + static FnGetSystemTimePreciseAsFileTime GetSystemTimePreciseAsFileTime_; + static bool GetSystemTimePreciseAsFileTimeInitialized_; }; +FnGetSystemTimePreciseAsFileTime WinEnv::GetSystemTimePreciseAsFileTime_ = NULL; +bool WinEnv::GetSystemTimePreciseAsFileTimeInitialized_ = false; + WinEnv::WinEnv() : checkedDiskForMmap_(false), forceMmapOff(false), @@ -2113,6 +2124,16 @@ WinEnv::WinEnv() allocation_granularity_(page_size_), perf_counter_frequency_(0), thread_pools_(Priority::TOTAL) { + + if (!GetSystemTimePreciseAsFileTimeInitialized_) { + HMODULE module = GetModuleHandle("kernel32.dll"); + if (module != NULL) { + GetSystemTimePreciseAsFileTime_ = (FnGetSystemTimePreciseAsFileTime)GetProcAddress( + module, "GetSystemTimePreciseAsFileTime"); + } + GetSystemTimePreciseAsFileTimeInitialized_ = true; + } + SYSTEM_INFO sinfo; GetSystemInfo(&sinfo); From 57a95a700155345c658936a3b2ca492f36192691 Mon Sep 17 00:00:00 2001 From: Tomas Kolda Date: Wed, 27 Jan 2016 21:00:42 +0100 Subject: [PATCH 13/16] Making use of GetSystemTimePreciseAsFileTime dynamic - code review fixes --- db/auto_roll_logger.cc | 4 +++- port/win/env_win.cc | 20 +++++++------------- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/db/auto_roll_logger.cc b/db/auto_roll_logger.cc index c984b4810..b8ba14c83 100644 --- a/db/auto_roll_logger.cc +++ b/db/auto_roll_logger.cc @@ -32,9 +32,11 @@ Status AutoRollLogger::ResetLogger() { } void AutoRollLogger::RollLogFile() { + // This function is called when log is rotating. Two rotations + // can happen quickly (NowMicro returns same value). To not overwrite + // previous log file we increment by one micro second and try again. uint64_t now = env_->NowMicros(); std::string old_fname; - // Try to check target name only 10 times at most for (int i = 0; i < 10; i++) { old_fname = OldInfoLogFileName( dbname_, now, db_absolute_path_, db_log_dir_); diff --git a/port/win/env_win.cc b/port/win/env_win.cc index 87a25569c..1ae0b1932 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -2110,28 +2110,22 @@ class WinEnv : public Env { std::vector thread_pools_; mutable std::mutex mu_; std::vector threads_to_join_; - static FnGetSystemTimePreciseAsFileTime GetSystemTimePreciseAsFileTime_; - static bool GetSystemTimePreciseAsFileTimeInitialized_; + FnGetSystemTimePreciseAsFileTime GetSystemTimePreciseAsFileTime_; }; -FnGetSystemTimePreciseAsFileTime WinEnv::GetSystemTimePreciseAsFileTime_ = NULL; -bool WinEnv::GetSystemTimePreciseAsFileTimeInitialized_ = false; - WinEnv::WinEnv() : checkedDiskForMmap_(false), forceMmapOff(false), page_size_(4 * 1012), allocation_granularity_(page_size_), perf_counter_frequency_(0), - thread_pools_(Priority::TOTAL) { + thread_pools_(Priority::TOTAL), + GetSystemTimePreciseAsFileTime_(NULL) { - if (!GetSystemTimePreciseAsFileTimeInitialized_) { - HMODULE module = GetModuleHandle("kernel32.dll"); - if (module != NULL) { - GetSystemTimePreciseAsFileTime_ = (FnGetSystemTimePreciseAsFileTime)GetProcAddress( - module, "GetSystemTimePreciseAsFileTime"); - } - GetSystemTimePreciseAsFileTimeInitialized_ = true; + HMODULE module = GetModuleHandle("kernel32.dll"); + if (module != NULL) { + GetSystemTimePreciseAsFileTime_ = (FnGetSystemTimePreciseAsFileTime)GetProcAddress( + module, "GetSystemTimePreciseAsFileTime"); } SYSTEM_INFO sinfo; From a62c519bb6129bce4f2300cb07c305ad6c0c945b Mon Sep 17 00:00:00 2001 From: Tomas Kolda Date: Tue, 2 Feb 2016 10:33:49 +0100 Subject: [PATCH 14/16] RollLogFile tries to find non conflicting file until there is no conflict. --- db/auto_roll_logger.cc | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/db/auto_roll_logger.cc b/db/auto_roll_logger.cc index b8ba14c83..2349bd0c0 100644 --- a/db/auto_roll_logger.cc +++ b/db/auto_roll_logger.cc @@ -37,14 +37,11 @@ void AutoRollLogger::RollLogFile() { // previous log file we increment by one micro second and try again. uint64_t now = env_->NowMicros(); std::string old_fname; - for (int i = 0; i < 10; i++) { + do { old_fname = OldInfoLogFileName( dbname_, now, db_absolute_path_, db_log_dir_); - if (!env_->FileExists(old_fname).ok()) { - break; - } now++; - }; + } while (env_->FileExists(old_fname).ok()); env_->RenameFile(log_fname_, old_fname); } From 466c2c1bf71762752a1c1aa94f74e2918f9aed68 Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Tue, 2 Feb 2016 14:52:49 -0800 Subject: [PATCH 15/16] Generate tags for *.c files Summary: db/c_test.c uses the functions in db/c.cc. If we have tags generated for one but not the other, it's easy to make mistakes like updating a function signature and missing a call site. Test Plan: $ make tags in vim: :cscope find s rocksdb_options_set_compression_options ... 3 325 db/c_test.c <
> rocksdb_options_set_compression_options(options, -14, -1, 0); Reviewers: sdong, yhchiang, IslamAbdelRahman Reviewed By: IslamAbdelRahman Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D53685 --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 38fe1f5c1..c2642ce0f 100644 --- a/Makefile +++ b/Makefile @@ -700,7 +700,7 @@ clean: tags: ctags * -R - cscope -b `find . -name '*.cc'` `find . -name '*.h'` + cscope -b `find . -name '*.cc'` `find . -name '*.h'` `find . -name '*.c'` format: build_tools/format-diff.sh From 5fcd1ba30a2432a0d1241f4f8d52a97b362ec31f Mon Sep 17 00:00:00 2001 From: Nathan Bronson Date: Tue, 2 Feb 2016 18:19:07 -0800 Subject: [PATCH 16/16] disable kConcurrentSkipList multithreaded test Summary: Disable test that is intermittently failing Test Plan: unit tests Reviewers: igor, andrewkr, sdong Reviewed By: sdong Subscribers: dhruba Differential Revision: https://reviews.facebook.net/D53715 --- db/db_test_util.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/db_test_util.h b/db/db_test_util.h index b993af8cb..48fa5430e 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -525,8 +525,8 @@ class DBTestBase : public testing::Test { kOptimizeFiltersForHits = 27, kRowCache = 28, kRecycleLogFiles = 29, + kEnd = 30, kConcurrentSkipList = 30, - kEnd = 31, kLevelSubcompactions = 31, kUniversalSubcompactions = 32, };