From 1ec72b37b1e8b001c61e71f532ee6a10213c5e0c Mon Sep 17 00:00:00 2001 From: Kai Liu Date: Tue, 18 Mar 2014 12:37:42 -0700 Subject: [PATCH 1/8] Several easy-to-add properties related to compaction and flushes Summary: To partly address the request @nkg- raised, add three easy-to-add properties to compactions and flushes. Test Plan: run unit tests and add a new unit test to cover new properties. Reviewers: haobo, dhruba Reviewed By: dhruba CC: nkg-, leveldb Differential Revision: https://reviews.facebook.net/D13677 --- db/db_impl.cc | 5 +- db/db_test.cc | 88 +++++++ db/internal_stats.cc | 583 +++++++++++++++++++++++-------------------- db/internal_stats.h | 22 +- db/memtable_list.cc | 2 +- db/memtable_list.h | 2 +- 6 files changed, 429 insertions(+), 273 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 4c161be1e..7b77cbaaa 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3696,9 +3696,10 @@ const Options& DBImpl::GetOptions() const { bool DBImpl::GetProperty(const Slice& property, std::string* value) { value->clear(); + DBPropertyType property_type = GetPropertyType(property); MutexLock l(&mutex_); - return internal_stats_.GetProperty(property, value, versions_.get(), - imm_.size()); + return internal_stats_.GetProperty(property_type, property, value, + versions_.get(), imm_); } void DBImpl::GetApproximateSizes( diff --git a/db/db_test.cc b/db/db_test.cc index 19b95540b..8e99bcec9 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2055,6 +2055,94 @@ TEST(DBTest, NumImmutableMemTable) { } while (ChangeCompactOptions()); } +class SleepingBackgroundTask { + public: + explicit SleepingBackgroundTask(Env* env) + : env_(env), bg_cv_(&mutex_), should_sleep_(true) {} + void DoSleep() { + MutexLock l(&mutex_); + while (should_sleep_) { + bg_cv_.Wait(); + } + } + void WakeUp() { + MutexLock l(&mutex_); + should_sleep_ = false; + bg_cv_.SignalAll(); + } + + static void DoSleepTask(void* arg) { + reinterpret_cast(arg)->DoSleep(); + } + + private: + const Env* env_; + port::Mutex mutex_; + port::CondVar bg_cv_; // Signalled when background work finishes + bool should_sleep_; +}; + +TEST(DBTest, GetProperty) { + // Set sizes to both background thread pool to be 1 and block them. 
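+  // Each pool keeps its single thread parked in SleepingBackgroundTask, so
+  // flushes (HIGH pool) and compactions (LOW pool) stay queued until WakeUp()
+  // is called and the pending-work properties can be observed deterministically.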
+ env_->SetBackgroundThreads(1, Env::HIGH); + env_->SetBackgroundThreads(1, Env::LOW); + SleepingBackgroundTask sleeping_task_low(env_); + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + Env::Priority::LOW); + SleepingBackgroundTask sleeping_task_high(env_); + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high, + Env::Priority::HIGH); + + Options options = CurrentOptions(); + WriteOptions writeOpt = WriteOptions(); + writeOpt.disableWAL = true; + options.compaction_style = kCompactionStyleUniversal; + options.level0_file_num_compaction_trigger = 1; + options.compaction_options_universal.size_ratio = 50; + options.max_background_compactions = 1; + options.max_background_flushes = 1; + options.max_write_buffer_number = 10; + options.min_write_buffer_number_to_merge = 1; + options.write_buffer_size = 1000000; + Reopen(&options); + + std::string big_value(1000000 * 2, 'x'); + std::string num; + SetPerfLevel(kEnableTime); + + ASSERT_OK(dbfull()->Put(writeOpt, "k1", big_value)); + ASSERT_TRUE(dbfull()->GetProperty("rocksdb.num-immutable-mem-table", &num)); + ASSERT_EQ(num, "0"); + ASSERT_TRUE(dbfull()->GetProperty("rocksdb.mem-table-flush-pending", &num)); + ASSERT_EQ(num, "0"); + ASSERT_TRUE(dbfull()->GetProperty("rocksdb.compaction-pending", &num)); + ASSERT_EQ(num, "0"); + perf_context.Reset(); + + ASSERT_OK(dbfull()->Put(writeOpt, "k2", big_value)); + ASSERT_TRUE(dbfull()->GetProperty("rocksdb.num-immutable-mem-table", &num)); + ASSERT_EQ(num, "1"); + ASSERT_OK(dbfull()->Put(writeOpt, "k3", big_value)); + ASSERT_TRUE(dbfull()->GetProperty("rocksdb.num-immutable-mem-table", &num)); + ASSERT_EQ(num, "2"); + ASSERT_TRUE(dbfull()->GetProperty("rocksdb.mem-table-flush-pending", &num)); + ASSERT_EQ(num, "1"); + ASSERT_TRUE(dbfull()->GetProperty("rocksdb.compaction-pending", &num)); + ASSERT_EQ(num, "0"); + + sleeping_task_high.WakeUp(); + dbfull()->TEST_WaitForFlushMemTable(); + + ASSERT_OK(dbfull()->Put(writeOpt, "k4", big_value)); + ASSERT_OK(dbfull()->Put(writeOpt, "k5", big_value)); + dbfull()->TEST_WaitForFlushMemTable(); + ASSERT_TRUE(dbfull()->GetProperty("rocksdb.mem-table-flush-pending", &num)); + ASSERT_EQ(num, "0"); + ASSERT_TRUE(dbfull()->GetProperty("rocksdb.compaction-pending", &num)); + ASSERT_EQ(num, "1"); + sleeping_task_low.WakeUp(); +} + TEST(DBTest, FLUSH) { do { Options options = CurrentOptions(); diff --git a/db/internal_stats.cc b/db/internal_stats.cc index 7dcfd86c3..1f81023f6 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -1,4 +1,3 @@ - // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. @@ -8,291 +7,341 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. 
#include "db/internal_stats.h" +#include "db/memtable_list.h" #include namespace rocksdb { -bool InternalStats::GetProperty(const Slice& property, std::string* value, - VersionSet* version_set, int immsize) { - Version* current = version_set->current(); +DBPropertyType GetPropertyType(const Slice& property) { Slice in = property; Slice prefix("rocksdb."); - if (!in.starts_with(prefix)) return false; + if (!in.starts_with(prefix)) return kUnknown; in.remove_prefix(prefix.size()); if (in.starts_with("num-files-at-level")) { - in.remove_prefix(strlen("num-files-at-level")); - uint64_t level; - bool ok = ConsumeDecimalNumber(&in, &level) && in.empty(); - if (!ok || (int)level >= number_levels_) { - return false; - } else { - char buf[100]; - snprintf(buf, sizeof(buf), "%d", - current->NumLevelFiles(static_cast(level))); - *value = buf; - return true; - } + return kNumFilesAtLevel; } else if (in == "levelstats") { - char buf[1000]; - snprintf(buf, sizeof(buf), - "Level Files Size(MB)\n" - "--------------------\n"); - value->append(buf); - - for (int level = 0; level < number_levels_; level++) { - snprintf(buf, sizeof(buf), "%3d %8d %8.0f\n", level, - current->NumLevelFiles(level), - current->NumLevelBytes(level) / 1048576.0); - value->append(buf); - } - return true; - + return kLevelStats; } else if (in == "stats") { - char buf[1000]; - - uint64_t wal_bytes = 0; - uint64_t wal_synced = 0; - uint64_t user_bytes_written = 0; - uint64_t write_other = 0; - uint64_t write_self = 0; - uint64_t write_with_wal = 0; - uint64_t total_bytes_written = 0; - uint64_t total_bytes_read = 0; - uint64_t micros_up = env_->NowMicros() - started_at_; - // Add "+1" to make sure seconds_up is > 0 and avoid NaN later - double seconds_up = (micros_up + 1) / 1000000.0; - uint64_t total_slowdown = 0; - uint64_t total_slowdown_count = 0; - uint64_t interval_bytes_written = 0; - uint64_t interval_bytes_read = 0; - uint64_t interval_bytes_new = 0; - double interval_seconds_up = 0; - - if (statistics_) { - wal_bytes = statistics_->getTickerCount(WAL_FILE_BYTES); - wal_synced = statistics_->getTickerCount(WAL_FILE_SYNCED); - user_bytes_written = statistics_->getTickerCount(BYTES_WRITTEN); - write_other = statistics_->getTickerCount(WRITE_DONE_BY_OTHER); - write_self = statistics_->getTickerCount(WRITE_DONE_BY_SELF); - write_with_wal = statistics_->getTickerCount(WRITE_WITH_WAL); + return kStats; + } else if (in == "sstables") { + return kSsTables; + } else if (in == "num-immutable-mem-table") { + return kNumImmutableMemTable; + } else if (in == "mem-table-flush-pending") { + return MemtableFlushPending; + } else if (in == "compaction-pending") { + return CompactionPending; + } + return kUnknown; +} + +bool InternalStats::GetProperty(DBPropertyType property_type, + const Slice& property, std::string* value, + VersionSet* version_set, + const MemTableList& imm) { + Version* current = version_set->current(); + Slice in = property; + + switch (property_type) { + case kNumFilesAtLevel: { + in.remove_prefix(strlen("rocksdb.num-files-at-level")); + uint64_t level; + bool ok = ConsumeDecimalNumber(&in, &level) && in.empty(); + if (!ok || (int)level >= number_levels_) { + return false; + } else { + char buf[100]; + snprintf(buf, sizeof(buf), "%d", + current->NumLevelFiles(static_cast(level))); + *value = buf; + return true; + } } + case kLevelStats: { + char buf[1000]; + snprintf(buf, sizeof(buf), + "Level Files Size(MB)\n" + "--------------------\n"); + value->append(buf); - // Pardon the long line but I think it is easier to read 
this way. - snprintf(buf, sizeof(buf), - " Compactions\n" - "Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count msComp msStall Ln-stall Stall-cnt\n" - "------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n" - ); - value->append(buf); - for (int level = 0; level < number_levels_; level++) { - int files = current->NumLevelFiles(level); - if (compaction_stats_[level].micros > 0 || files > 0) { - int64_t bytes_read = compaction_stats_[level].bytes_readn + - compaction_stats_[level].bytes_readnp1; - int64_t bytes_new = compaction_stats_[level].bytes_written - - compaction_stats_[level].bytes_readnp1; - double amplify = (compaction_stats_[level].bytes_readn == 0) - ? 0.0 - : (compaction_stats_[level].bytes_written + - compaction_stats_[level].bytes_readnp1 + - compaction_stats_[level].bytes_readn) / - (double)compaction_stats_[level].bytes_readn; - - total_bytes_read += bytes_read; - total_bytes_written += compaction_stats_[level].bytes_written; - - uint64_t stalls = level == 0 ? (stall_counts_[LEVEL0_SLOWDOWN] + - stall_counts_[LEVEL0_NUM_FILES] + - stall_counts_[MEMTABLE_COMPACTION]) - : stall_leveln_slowdown_count_[level]; - - double stall_us = level == 0 ? (stall_micros_[LEVEL0_SLOWDOWN] + - stall_micros_[LEVEL0_NUM_FILES] + - stall_micros_[MEMTABLE_COMPACTION]) - : stall_leveln_slowdown_[level]; - - snprintf(buf, sizeof(buf), - "%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f " - "%10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %8d %9.1f %9.1f " - "%9lu\n", - level, files, current->NumLevelBytes(level) / 1048576.0, - current->NumLevelBytes(level) / - version_set->MaxBytesForLevel(level), - compaction_stats_[level].micros / 1e6, bytes_read / 1048576.0, - compaction_stats_[level].bytes_written / 1048576.0, - compaction_stats_[level].bytes_readn / 1048576.0, - compaction_stats_[level].bytes_readnp1 / 1048576.0, - bytes_new / 1048576.0, amplify, - // +1 to avoid division by 0 - (bytes_read / 1048576.0) / - ((compaction_stats_[level].micros + 1) / 1000000.0), - (compaction_stats_[level].bytes_written / 1048576.0) / - ((compaction_stats_[level].micros + 1) / 1000000.0), - compaction_stats_[level].files_in_leveln, - compaction_stats_[level].files_in_levelnp1, - compaction_stats_[level].files_out_levelnp1, - compaction_stats_[level].files_out_levelnp1 - - compaction_stats_[level].files_in_levelnp1, - compaction_stats_[level].count, - (int)((double)compaction_stats_[level].micros / 1000.0 / - (compaction_stats_[level].count + 1)), - (double)stall_us / 1000.0 / (stalls + 1), stall_us / 1000000.0, - (unsigned long)stalls); - total_slowdown += stall_leveln_slowdown_[level]; - total_slowdown_count += stall_leveln_slowdown_count_[level]; + for (int level = 0; level < number_levels_; level++) { + snprintf(buf, sizeof(buf), "%3d %8d %8.0f\n", level, + current->NumLevelFiles(level), + current->NumLevelBytes(level) / 1048576.0); value->append(buf); } + return true; } + case kStats: { + char buf[1000]; + + uint64_t wal_bytes = 0; + uint64_t wal_synced = 0; + uint64_t user_bytes_written = 0; + uint64_t write_other = 0; + uint64_t write_self = 0; + uint64_t write_with_wal = 0; + uint64_t total_bytes_written = 0; + uint64_t total_bytes_read = 0; + uint64_t micros_up = env_->NowMicros() - started_at_; + // Add "+1" to make sure seconds_up is > 0 and avoid NaN later + double 
seconds_up = (micros_up + 1) / 1000000.0; + uint64_t total_slowdown = 0; + uint64_t total_slowdown_count = 0; + uint64_t interval_bytes_written = 0; + uint64_t interval_bytes_read = 0; + uint64_t interval_bytes_new = 0; + double interval_seconds_up = 0; + + if (statistics_) { + wal_bytes = statistics_->getTickerCount(WAL_FILE_BYTES); + wal_synced = statistics_->getTickerCount(WAL_FILE_SYNCED); + user_bytes_written = statistics_->getTickerCount(BYTES_WRITTEN); + write_other = statistics_->getTickerCount(WRITE_DONE_BY_OTHER); + write_self = statistics_->getTickerCount(WRITE_DONE_BY_SELF); + write_with_wal = statistics_->getTickerCount(WRITE_WITH_WAL); + } - interval_bytes_new = user_bytes_written - last_stats_.ingest_bytes_; - interval_bytes_read = total_bytes_read - last_stats_.compaction_bytes_read_; - interval_bytes_written = - total_bytes_written - last_stats_.compaction_bytes_written_; - interval_seconds_up = seconds_up - last_stats_.seconds_up_; - - snprintf(buf, sizeof(buf), "Uptime(secs): %.1f total, %.1f interval\n", - seconds_up, interval_seconds_up); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Writes cumulative: %llu total, %llu batches, " - "%.1f per batch, %.2f ingest GB\n", - (unsigned long long)(write_other + write_self), - (unsigned long long)write_self, - (write_other + write_self) / (double)(write_self + 1), - user_bytes_written / (1048576.0 * 1024)); - value->append(buf); - - snprintf(buf, sizeof(buf), - "WAL cumulative: %llu WAL writes, %llu WAL syncs, " - "%.2f writes per sync, %.2f GB written\n", - (unsigned long long)write_with_wal, (unsigned long long)wal_synced, - write_with_wal / (double)(wal_synced + 1), - wal_bytes / (1048576.0 * 1024)); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Compaction IO cumulative (GB): " - "%.2f new, %.2f read, %.2f write, %.2f read+write\n", - user_bytes_written / (1048576.0 * 1024), - total_bytes_read / (1048576.0 * 1024), - total_bytes_written / (1048576.0 * 1024), - (total_bytes_read + total_bytes_written) / (1048576.0 * 1024)); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Compaction IO cumulative (MB/sec): " - "%.1f new, %.1f read, %.1f write, %.1f read+write\n", - user_bytes_written / 1048576.0 / seconds_up, - total_bytes_read / 1048576.0 / seconds_up, - total_bytes_written / 1048576.0 / seconds_up, - (total_bytes_read + total_bytes_written) / 1048576.0 / seconds_up); - value->append(buf); - - // +1 to avoid divide by 0 and NaN - snprintf( - buf, sizeof(buf), - "Amplification cumulative: %.1f write, %.1f compaction\n", - (double)(total_bytes_written + wal_bytes) / (user_bytes_written + 1), - (double)(total_bytes_written + total_bytes_read + wal_bytes) / - (user_bytes_written + 1)); - value->append(buf); - - uint64_t interval_write_other = write_other - last_stats_.write_other_; - uint64_t interval_write_self = write_self - last_stats_.write_self_; - - snprintf(buf, sizeof(buf), - "Writes interval: %llu total, %llu batches, " - "%.1f per batch, %.1f ingest MB\n", - (unsigned long long)(interval_write_other + interval_write_self), - (unsigned long long)interval_write_self, - (double)(interval_write_other + interval_write_self) / - (interval_write_self + 1), - (user_bytes_written - last_stats_.ingest_bytes_) / 1048576.0); - value->append(buf); - - uint64_t interval_write_with_wal = - write_with_wal - last_stats_.write_with_wal_; - - uint64_t interval_wal_synced = wal_synced - last_stats_.wal_synced_; - uint64_t interval_wal_bytes = wal_bytes - last_stats_.wal_bytes_; - - snprintf(buf, sizeof(buf), - 
"WAL interval: %llu WAL writes, %llu WAL syncs, " - "%.2f writes per sync, %.2f MB written\n", - (unsigned long long)interval_write_with_wal, - (unsigned long long)interval_wal_synced, - interval_write_with_wal / (double)(interval_wal_synced + 1), - interval_wal_bytes / (1048576.0 * 1024)); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Compaction IO interval (MB): " - "%.2f new, %.2f read, %.2f write, %.2f read+write\n", - interval_bytes_new / 1048576.0, interval_bytes_read / 1048576.0, - interval_bytes_written / 1048576.0, - (interval_bytes_read + interval_bytes_written) / 1048576.0); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Compaction IO interval (MB/sec): " - "%.1f new, %.1f read, %.1f write, %.1f read+write\n", - interval_bytes_new / 1048576.0 / interval_seconds_up, - interval_bytes_read / 1048576.0 / interval_seconds_up, - interval_bytes_written / 1048576.0 / interval_seconds_up, - (interval_bytes_read + interval_bytes_written) / 1048576.0 / - interval_seconds_up); - value->append(buf); - - // +1 to avoid divide by 0 and NaN - snprintf( - buf, sizeof(buf), - "Amplification interval: %.1f write, %.1f compaction\n", - (double)(interval_bytes_written + wal_bytes) / (interval_bytes_new + 1), - (double)(interval_bytes_written + interval_bytes_read + wal_bytes) / - (interval_bytes_new + 1)); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Stalls(secs): %.3f level0_slowdown, %.3f level0_numfiles, " - "%.3f memtable_compaction, %.3f leveln_slowdown\n", - stall_micros_[LEVEL0_SLOWDOWN] / 1000000.0, - stall_micros_[LEVEL0_NUM_FILES] / 1000000.0, - stall_micros_[MEMTABLE_COMPACTION] / 1000000.0, - total_slowdown / 1000000.0); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Stalls(count): %lu level0_slowdown, %lu level0_numfiles, " - "%lu memtable_compaction, %lu leveln_slowdown\n", - (unsigned long)stall_counts_[LEVEL0_SLOWDOWN], - (unsigned long)stall_counts_[LEVEL0_NUM_FILES], - (unsigned long)stall_counts_[MEMTABLE_COMPACTION], - (unsigned long)total_slowdown_count); - value->append(buf); - - last_stats_.compaction_bytes_read_ = total_bytes_read; - last_stats_.compaction_bytes_written_ = total_bytes_written; - last_stats_.ingest_bytes_ = user_bytes_written; - last_stats_.seconds_up_ = seconds_up; - last_stats_.wal_bytes_ = wal_bytes; - last_stats_.wal_synced_ = wal_synced; - last_stats_.write_with_wal_ = write_with_wal; - last_stats_.write_other_ = write_other; - last_stats_.write_self_ = write_self; - - return true; - } else if (in == "sstables") { - *value = current->DebugString(); - return true; - } else if (in == "num-immutable-mem-table") { - *value = std::to_string(immsize); - return true; - } + // Pardon the long line but I think it is easier to read this way. 
+ snprintf( + buf, sizeof(buf), + " Compactions\n" + "Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) " + " " + "Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn " + "Rnp1 " + " Wnp1 NewW Count msComp msStall Ln-stall Stall-cnt\n" + "--------------------------------------------------------------------" + "--" + "--------------------------------------------------------------------" + "--" + "----------------------------------------------------------------\n"); + value->append(buf); + for (int level = 0; level < number_levels_; level++) { + int files = current->NumLevelFiles(level); + if (compaction_stats_[level].micros > 0 || files > 0) { + int64_t bytes_read = compaction_stats_[level].bytes_readn + + compaction_stats_[level].bytes_readnp1; + int64_t bytes_new = compaction_stats_[level].bytes_written - + compaction_stats_[level].bytes_readnp1; + double amplify = + (compaction_stats_[level].bytes_readn == 0) + ? 0.0 + : (compaction_stats_[level].bytes_written + + compaction_stats_[level].bytes_readnp1 + + compaction_stats_[level].bytes_readn) / + (double)compaction_stats_[level].bytes_readn; + + total_bytes_read += bytes_read; + total_bytes_written += compaction_stats_[level].bytes_written; + + uint64_t stalls = level == 0 ? (stall_counts_[LEVEL0_SLOWDOWN] + + stall_counts_[LEVEL0_NUM_FILES] + + stall_counts_[MEMTABLE_COMPACTION]) + : stall_leveln_slowdown_count_[level]; + + double stall_us = level == 0 ? (stall_micros_[LEVEL0_SLOWDOWN] + + stall_micros_[LEVEL0_NUM_FILES] + + stall_micros_[MEMTABLE_COMPACTION]) + : stall_leveln_slowdown_[level]; + + snprintf(buf, sizeof(buf), + "%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f " + "%10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %8d %9.1f %9.1f " + "%9lu\n", + level, files, current->NumLevelBytes(level) / 1048576.0, + current->NumLevelBytes(level) / + version_set->MaxBytesForLevel(level), + compaction_stats_[level].micros / 1e6, + bytes_read / 1048576.0, + compaction_stats_[level].bytes_written / 1048576.0, + compaction_stats_[level].bytes_readn / 1048576.0, + compaction_stats_[level].bytes_readnp1 / 1048576.0, + bytes_new / 1048576.0, amplify, + // +1 to avoid division by 0 + (bytes_read / 1048576.0) / + ((compaction_stats_[level].micros + 1) / 1000000.0), + (compaction_stats_[level].bytes_written / 1048576.0) / + ((compaction_stats_[level].micros + 1) / 1000000.0), + compaction_stats_[level].files_in_leveln, + compaction_stats_[level].files_in_levelnp1, + compaction_stats_[level].files_out_levelnp1, + compaction_stats_[level].files_out_levelnp1 - + compaction_stats_[level].files_in_levelnp1, + compaction_stats_[level].count, + (int)((double)compaction_stats_[level].micros / 1000.0 / + (compaction_stats_[level].count + 1)), + (double)stall_us / 1000.0 / (stalls + 1), + stall_us / 1000000.0, (unsigned long)stalls); + total_slowdown += stall_leveln_slowdown_[level]; + total_slowdown_count += stall_leveln_slowdown_count_[level]; + value->append(buf); + } + } + + interval_bytes_new = user_bytes_written - last_stats_.ingest_bytes_; + interval_bytes_read = + total_bytes_read - last_stats_.compaction_bytes_read_; + interval_bytes_written = + total_bytes_written - last_stats_.compaction_bytes_written_; + interval_seconds_up = seconds_up - last_stats_.seconds_up_; + + snprintf(buf, sizeof(buf), "Uptime(secs): %.1f total, %.1f interval\n", + seconds_up, interval_seconds_up); + value->append(buf); + + snprintf(buf, sizeof(buf), + "Writes cumulative: %llu total, %llu batches, " + "%.1f per batch, %.2f ingest GB\n", + (unsigned long 
long)(write_other + write_self), + (unsigned long long)write_self, + (write_other + write_self) / (double)(write_self + 1), + user_bytes_written / (1048576.0 * 1024)); + value->append(buf); + + snprintf(buf, sizeof(buf), + "WAL cumulative: %llu WAL writes, %llu WAL syncs, " + "%.2f writes per sync, %.2f GB written\n", + (unsigned long long)write_with_wal, + (unsigned long long)wal_synced, + write_with_wal / (double)(wal_synced + 1), + wal_bytes / (1048576.0 * 1024)); + value->append(buf); - return false; + snprintf(buf, sizeof(buf), + "Compaction IO cumulative (GB): " + "%.2f new, %.2f read, %.2f write, %.2f read+write\n", + user_bytes_written / (1048576.0 * 1024), + total_bytes_read / (1048576.0 * 1024), + total_bytes_written / (1048576.0 * 1024), + (total_bytes_read + total_bytes_written) / (1048576.0 * 1024)); + value->append(buf); + + snprintf( + buf, sizeof(buf), + "Compaction IO cumulative (MB/sec): " + "%.1f new, %.1f read, %.1f write, %.1f read+write\n", + user_bytes_written / 1048576.0 / seconds_up, + total_bytes_read / 1048576.0 / seconds_up, + total_bytes_written / 1048576.0 / seconds_up, + (total_bytes_read + total_bytes_written) / 1048576.0 / seconds_up); + value->append(buf); + + // +1 to avoid divide by 0 and NaN + snprintf( + buf, sizeof(buf), + "Amplification cumulative: %.1f write, %.1f compaction\n", + (double)(total_bytes_written + wal_bytes) / (user_bytes_written + 1), + (double)(total_bytes_written + total_bytes_read + wal_bytes) / + (user_bytes_written + 1)); + value->append(buf); + + uint64_t interval_write_other = write_other - last_stats_.write_other_; + uint64_t interval_write_self = write_self - last_stats_.write_self_; + + snprintf(buf, sizeof(buf), + "Writes interval: %llu total, %llu batches, " + "%.1f per batch, %.1f ingest MB\n", + (unsigned long long)(interval_write_other + interval_write_self), + (unsigned long long)interval_write_self, + (double)(interval_write_other + interval_write_self) / + (interval_write_self + 1), + (user_bytes_written - last_stats_.ingest_bytes_) / 1048576.0); + value->append(buf); + + uint64_t interval_write_with_wal = + write_with_wal - last_stats_.write_with_wal_; + + uint64_t interval_wal_synced = wal_synced - last_stats_.wal_synced_; + uint64_t interval_wal_bytes = wal_bytes - last_stats_.wal_bytes_; + + snprintf(buf, sizeof(buf), + "WAL interval: %llu WAL writes, %llu WAL syncs, " + "%.2f writes per sync, %.2f MB written\n", + (unsigned long long)interval_write_with_wal, + (unsigned long long)interval_wal_synced, + interval_write_with_wal / (double)(interval_wal_synced + 1), + interval_wal_bytes / (1048576.0 * 1024)); + value->append(buf); + + snprintf(buf, sizeof(buf), + "Compaction IO interval (MB): " + "%.2f new, %.2f read, %.2f write, %.2f read+write\n", + interval_bytes_new / 1048576.0, interval_bytes_read / 1048576.0, + interval_bytes_written / 1048576.0, + (interval_bytes_read + interval_bytes_written) / 1048576.0); + value->append(buf); + + snprintf(buf, sizeof(buf), + "Compaction IO interval (MB/sec): " + "%.1f new, %.1f read, %.1f write, %.1f read+write\n", + interval_bytes_new / 1048576.0 / interval_seconds_up, + interval_bytes_read / 1048576.0 / interval_seconds_up, + interval_bytes_written / 1048576.0 / interval_seconds_up, + (interval_bytes_read + interval_bytes_written) / 1048576.0 / + interval_seconds_up); + value->append(buf); + + // +1 to avoid divide by 0 and NaN + snprintf( + buf, sizeof(buf), + "Amplification interval: %.1f write, %.1f compaction\n", + (double)(interval_bytes_written + wal_bytes) / + 
             (interval_bytes_new + 1),
+          (double)(interval_bytes_written + interval_bytes_read + wal_bytes) /
+              (interval_bytes_new + 1));
+      value->append(buf);
+
+      snprintf(buf, sizeof(buf),
+               "Stalls(secs): %.3f level0_slowdown, %.3f level0_numfiles, "
+               "%.3f memtable_compaction, %.3f leveln_slowdown\n",
+               stall_micros_[LEVEL0_SLOWDOWN] / 1000000.0,
+               stall_micros_[LEVEL0_NUM_FILES] / 1000000.0,
+               stall_micros_[MEMTABLE_COMPACTION] / 1000000.0,
+               total_slowdown / 1000000.0);
+      value->append(buf);
+
+      snprintf(buf, sizeof(buf),
+               "Stalls(count): %lu level0_slowdown, %lu level0_numfiles, "
+               "%lu memtable_compaction, %lu leveln_slowdown\n",
+               (unsigned long)stall_counts_[LEVEL0_SLOWDOWN],
+               (unsigned long)stall_counts_[LEVEL0_NUM_FILES],
+               (unsigned long)stall_counts_[MEMTABLE_COMPACTION],
+               (unsigned long)total_slowdown_count);
+      value->append(buf);
+
+      last_stats_.compaction_bytes_read_ = total_bytes_read;
+      last_stats_.compaction_bytes_written_ = total_bytes_written;
+      last_stats_.ingest_bytes_ = user_bytes_written;
+      last_stats_.seconds_up_ = seconds_up;
+      last_stats_.wal_bytes_ = wal_bytes;
+      last_stats_.wal_synced_ = wal_synced;
+      last_stats_.write_with_wal_ = write_with_wal;
+      last_stats_.write_other_ = write_other;
+      last_stats_.write_self_ = write_self;
+
+      return true;
+    }
+    case kSsTables:
+      *value = current->DebugString();
+      return true;
+    case kNumImmutableMemTable:
+      *value = std::to_string(imm.size());
+      return true;
+    case MemtableFlushPending:
+      // Return number of mem tables that are ready to flush (made immutable)
+      *value = std::to_string(imm.IsFlushPending() ? 1 : 0);
+      return true;
+    case CompactionPending:
+      // 1 if the system already determines at least one compaction is needed.
+      // 0 otherwise.
+      *value = std::to_string(current->NeedsCompaction() ? 1 : 0);
+      return true;
+    default:
+      return false;
+  }
 }
 }  // namespace rocksdb
diff --git a/db/internal_stats.h b/db/internal_stats.h
index 8e99a9d7e..5f1a6263a 100644
--- a/db/internal_stats.h
+++ b/db/internal_stats.h
@@ -17,6 +17,23 @@
 #include
 
 namespace rocksdb {
+
+class MemTableList;
+
+enum DBPropertyType {
+  kNumFilesAtLevel,      // Number of files at a specific level
+  kLevelStats,           // Return number of files and total sizes of each level
+  kStats,                // Return general statistics of DB
+  kSsTables,             // Return a human readable string of current SST files
+  kNumImmutableMemTable, // Return number of immutable mem tables
+  MemtableFlushPending,  // Return 1 if mem table flushing is pending, otherwise
+                         // 0.
+  CompactionPending,     // Return 1 if a compaction is pending. Otherwise 0.
+  kUnknown,
+};
+
+extern DBPropertyType GetPropertyType(const Slice& property);
+
 class InternalStats {
  public:
   enum WriteStallType {
@@ -99,8 +116,9 @@ class InternalStats {
     stall_leveln_slowdown_count_[level] += micros;
   }
 
-  bool GetProperty(const Slice& property, std::string* value,
-                   VersionSet* version_set, int immsize);
+  bool GetProperty(DBPropertyType property_type, const Slice& property,
+                   std::string* value, VersionSet* version_set,
+                   const MemTableList& imm);
 
  private:
   std::vector<CompactionStats> compaction_stats_;
diff --git a/db/memtable_list.cc b/db/memtable_list.cc
index d58fe5048..ebda34802 100644
--- a/db/memtable_list.cc
+++ b/db/memtable_list.cc
@@ -92,7 +92,7 @@ void MemTableListVersion::Remove(MemTable* m) {
 
 // Returns true if there is at least one memtable on which flush has
 // not yet started.
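+// Const-qualified below so that read-only callers such as
+// InternalStats::GetProperty(), which receives a const MemTableList&, can
+// invoke it while holding the DB mutex.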
-bool MemTableList::IsFlushPending() { +bool MemTableList::IsFlushPending() const { if ((flush_requested_ && num_flush_not_started_ >= 1) || (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) { assert(imm_flush_needed.NoBarrier_Load() != nullptr); diff --git a/db/memtable_list.h b/db/memtable_list.h index 0bd54235a..3c87d4eee 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -86,7 +86,7 @@ class MemTableList { // Returns true if there is at least one memtable on which flush has // not yet started. - bool IsFlushPending(); + bool IsFlushPending() const; // Returns the earliest memtables that needs to be flushed. The returned // memtables are guaranteed to be in the ascending order of created time. From 71e6a34271c691f67747c14a74ffaa587f7c797d Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 18 Mar 2014 12:25:08 -0700 Subject: [PATCH 2/8] Add a DB property to indicate number of background errors encountered Summary: Add a property to calculate number of background errors encountered to help users build their monitoring Test Plan: Add a unit test. make all check Reviewers: haobo, igor, dhruba Reviewed By: igor CC: ljin, nkg-, yhchiang, leveldb Differential Revision: https://reviews.facebook.net/D16959 --- db/db_impl.cc | 22 ++++++++++++++++------ db/db_impl.h | 2 +- db/db_test.cc | 42 ++++++++++++++++++++++++++++++++++++++++++ db/internal_stats.cc | 16 ++++++++++++---- db/internal_stats.h | 20 +++++++++++++++++--- 5 files changed, 88 insertions(+), 14 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 7b77cbaaa..70652b4c7 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -9,6 +9,8 @@ #include "db/db_impl.h" +#define __STDC_FORMAT_MACROS +#include #include #include #include @@ -1806,8 +1808,10 @@ Status DBImpl::WaitForFlushMemTable() { return s; } -Status DBImpl::TEST_FlushMemTable() { - return FlushMemTable(FlushOptions()); +Status DBImpl::TEST_FlushMemTable(bool wait) { + FlushOptions fo; + fo.wait = wait; + return FlushMemTable(fo); } Status DBImpl::TEST_WaitForFlushMemTable() { @@ -1904,10 +1908,13 @@ void DBImpl::BackgroundCallFlush() { // case this is an environmental problem and we do not want to // chew up resources for failed compactions for the duration of // the problem. + uint64_t error_cnt = internal_stats_.BumpAndGetBackgroundErrorCount(); bg_cv_.SignalAll(); // In case a waiter can proceed despite the error - Log(options_.info_log, "Waiting after background flush error: %s", - s.ToString().c_str()); mutex_.Unlock(); + Log(options_.info_log, + "Waiting after background flush error: %s" + "Accumulated background error counts: %" PRIu64, + s.ToString().c_str(), error_cnt); log_buffer.FlushBufferToLog(); LogFlush(options_.info_log); env_->SleepForMicroseconds(1000000); @@ -1978,11 +1985,14 @@ void DBImpl::BackgroundCallCompaction() { // case this is an environmental problem and we do not want to // chew up resources for failed compactions for the duration of // the problem. 
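+      // Count the failure so it becomes visible through the
+      // "rocksdb.background-errors" property.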
+ uint64_t error_cnt = internal_stats_.BumpAndGetBackgroundErrorCount(); bg_cv_.SignalAll(); // In case a waiter can proceed despite the error mutex_.Unlock(); log_buffer.FlushBufferToLog(); - Log(options_.info_log, "Waiting after background compaction error: %s", - s.ToString().c_str()); + Log(options_.info_log, + "Waiting after background compaction error: %s, " + "Accumulated background error counts: %" PRIu64, + s.ToString().c_str(), error_cnt); LogFlush(options_.info_log); env_->SleepForMicroseconds(1000000); mutex_.Lock(); diff --git a/db/db_impl.h b/db/db_impl.h index 6e6dc425a..dbcfb39aa 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -109,7 +109,7 @@ class DBImpl : public DB { const Slice* end); // Force current memtable contents to be flushed. - Status TEST_FlushMemTable(); + Status TEST_FlushMemTable(bool wait = true); // Wait for memtable compaction Status TEST_WaitForFlushMemTable(); diff --git a/db/db_test.cc b/db/db_test.cc index 8e99bcec9..5c076ce5b 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -4188,6 +4188,11 @@ TEST(DBTest, NoSpace) { dbfull()->TEST_CompactRange(level, nullptr, nullptr); } } + + std::string property_value; + ASSERT_TRUE(db_->GetProperty("rocksdb.background-errors", &property_value)); + ASSERT_EQ("5", property_value); + env_->no_space_.Release_Store(nullptr); ASSERT_LT(CountFiles(), num_files + 3); @@ -4196,6 +4201,43 @@ TEST(DBTest, NoSpace) { } while (ChangeCompactOptions()); } +// Check background error counter bumped on flush failures. +TEST(DBTest, NoSpaceFlush) { + do { + Options options = CurrentOptions(); + options.env = env_; + options.max_background_flushes = 1; + Reopen(&options); + + ASSERT_OK(Put("foo", "v1")); + env_->no_space_.Release_Store(env_); // Force out-of-space errors + + std::string property_value; + // Background error count is 0 now. + ASSERT_TRUE(db_->GetProperty("rocksdb.background-errors", &property_value)); + ASSERT_EQ("0", property_value); + + dbfull()->TEST_FlushMemTable(false); + + // Wait 300 milliseconds or background-errors turned 1 from 0. + int time_to_sleep_limit = 300000; + while (time_to_sleep_limit > 0) { + int to_sleep = (time_to_sleep_limit > 1000) ? 1000 : time_to_sleep_limit; + time_to_sleep_limit -= to_sleep; + env_->SleepForMicroseconds(to_sleep); + + ASSERT_TRUE( + db_->GetProperty("rocksdb.background-errors", &property_value)); + if (property_value == "1") { + break; + } + } + ASSERT_EQ("1", property_value); + + env_->no_space_.Release_Store(nullptr); + } while (ChangeCompactOptions()); +} + TEST(DBTest, NonWritableFileSystem) { do { Options options = CurrentOptions(); diff --git a/db/internal_stats.cc b/db/internal_stats.cc index 1f81023f6..629941c88 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -30,9 +30,11 @@ DBPropertyType GetPropertyType(const Slice& property) { } else if (in == "num-immutable-mem-table") { return kNumImmutableMemTable; } else if (in == "mem-table-flush-pending") { - return MemtableFlushPending; + return kMemtableFlushPending; } else if (in == "compaction-pending") { - return CompactionPending; + return kCompactionPending; + } else if (in == "background-errors") { + return kBackgroundErrors; } return kUnknown; } @@ -330,15 +332,21 @@ bool InternalStats::GetProperty(DBPropertyType property_type, case kNumImmutableMemTable: *value = std::to_string(imm.size()); return true; - case MemtableFlushPending: + case kMemtableFlushPending: // Return number of mem tables that are ready to flush (made immutable) *value = std::to_string(imm.IsFlushPending() ? 
1 : 0);
       return true;
-    case CompactionPending:
+    case kCompactionPending:
       // 1 if the system already determines at least one compaction is needed.
       // 0 otherwise.
       *value = std::to_string(current->NeedsCompaction() ? 1 : 0);
       return true;
+    case kBackgroundErrors:
+      // Accumulated number of errors in background flushes or compactions.
+      *value = std::to_string(GetBackgroundErrorCount());
+      return true;
     default:
       return false;
   }
diff --git a/db/internal_stats.h b/db/internal_stats.h
index 5f1a6263a..b6032d014 100644
--- a/db/internal_stats.h
+++ b/db/internal_stats.h
@@ -26,9 +26,11 @@ enum DBPropertyType {
   kStats,                // Return general statistics of DB
   kSsTables,             // Return a human readable string of current SST files
   kNumImmutableMemTable, // Return number of immutable mem tables
-  MemtableFlushPending,  // Return 1 if mem table flushing is pending, otherwise
-                         // 0.
-  CompactionPending,     // Return 1 if a compaction is pending. Otherwise 0.
+  kMemtableFlushPending, // Return 1 if mem table flushing is pending,
+                         // otherwise
+                         // 0.
+  kCompactionPending,    // Return 1 if a compaction is pending. Otherwise 0.
+  kBackgroundErrors,     // Return accumulated background errors encountered.
   kUnknown,
 };
 
@@ -49,6 +51,7 @@ class InternalStats {
         stall_counts_(WRITE_STALLS_ENUM_MAX, 0),
         stall_leveln_slowdown_(num_levels, 0),
         stall_leveln_slowdown_count_(num_levels, 0),
+        bg_error_count_(0),
         number_levels_(num_levels),
         statistics_(statistics),
         env_(env),
@@ -116,6 +119,10 @@ class InternalStats {
     stall_leveln_slowdown_count_[level] += micros;
   }
 
+  uint64_t GetBackgroundErrorCount() const { return bg_error_count_; }
+
+  uint64_t BumpAndGetBackgroundErrorCount() { return ++bg_error_count_; }
+
   bool GetProperty(DBPropertyType property_type, const Slice& property,
                    std::string* value, VersionSet* version_set,
                    const MemTableList& imm);
@@ -158,6 +165,13 @@ class InternalStats {
   std::vector<uint64_t> stall_leveln_slowdown_;
   std::vector<uint64_t> stall_leveln_slowdown_count_;
 
+  // Total number of background errors encountered. Every time a flush task
+  // or compaction task fails, this counter is incremented. The failure can
+  // be caused by any possible reason, including file system errors, out of
+  // resources, or input file corruption. Failing when retrying the same flush
+  // or compaction will cause the counter to increase too.
+  uint64_t bg_error_count_;
+
   int number_levels_;
   Statistics* statistics_;
   Env* env_;

From 6dc940d4c9951487281677b77d4d3e18f7032bc8 Mon Sep 17 00:00:00 2001
From: Lei Jin
Date: Wed, 19 Mar 2014 10:54:32 -0700
Subject: [PATCH 3/8] avoid shared_ptr assignment in Version::Get()

Summary:
This is a 500ns operation while the whole Get() call takes only a few
microseconds!
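(Aside: the ~500ns is the price of the atomic reference-count increment that
every std::shared_ptr copy performs; caching the raw pointer once per Get(),
as the patch does with info_log.get(), avoids it. A minimal self-contained
sketch of the pattern follows -- InfoLogger and HotPath are illustrative
names, not code from this diff.)

    #include <cstdio>
    #include <memory>

    struct InfoLogger {
      void Log(const char* msg) { std::printf("%s\n", msg); }
    };

    // Taking the shared_ptr by const reference and caching the raw pointer
    // performs no ref-count update; a by-value copy such as
    // "std::shared_ptr<InfoLogger> logger = info_log;" would pay an atomic
    // increment (and later a decrement) on every call.
    void HotPath(const std::shared_ptr<InfoLogger>& info_log) {
      InfoLogger* logger = info_log.get();
      logger->Log("logged without touching the ref count");
    }

    // The raw pointer stays valid because the caller's shared_ptr keeps the
    // logger alive for the duration of the call.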
Test Plan: ran db_bench, for a DB with 50M keys, QPS jumps from 5.2M/s to 7.2M/s Reviewers: haobo, igor, dhruba Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D17007 --- db/version_set.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/db/version_set.cc b/db/version_set.cc index 3b7a74afd..f8cbbf52f 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -485,7 +485,7 @@ void Version::Get(const ReadOptions& options, const Comparator* ucmp = vset_->icmp_.user_comparator(); auto merge_operator = db_options.merge_operator.get(); - auto logger = db_options.info_log; + auto logger = db_options.info_log.get(); assert(status->ok() || status->IsMergeInProgress()); Saver saver; @@ -496,7 +496,7 @@ void Version::Get(const ReadOptions& options, saver.value = value; saver.merge_operator = merge_operator; saver.merge_context = merge_context; - saver.logger = logger.get(); + saver.logger = logger; saver.didIO = false; saver.statistics = db_options.statistics.get(); @@ -618,7 +618,7 @@ void Version::Get(const ReadOptions& options, // do a final merge of nullptr and operands; if (merge_operator->FullMerge(user_key, nullptr, saver.merge_context->GetOperands(), - value, logger.get())) { + value, logger)) { *status = Status::OK(); } else { RecordTick(db_options.statistics.get(), NUMBER_MERGE_FAILURES); From 22507aff6c852b73e2d98ede5367984b79e63516 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 19 Mar 2014 15:40:12 -0700 Subject: [PATCH 4/8] Fix compile issue in Mac OS Summary: Compile issues are: * Unused variable env_ * Unused fallocate_with_keep_size_ Test Plan: compiles Reviewers: dhruba, haobo, sdong Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D17043 --- db/db_test.cc | 8 +++----- util/env_posix.cc | 24 ++++++++++++++++++------ 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/db/db_test.cc b/db/db_test.cc index 5c076ce5b..d126bb530 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2057,8 +2057,7 @@ TEST(DBTest, NumImmutableMemTable) { class SleepingBackgroundTask { public: - explicit SleepingBackgroundTask(Env* env) - : env_(env), bg_cv_(&mutex_), should_sleep_(true) {} + SleepingBackgroundTask() : bg_cv_(&mutex_), should_sleep_(true) {} void DoSleep() { MutexLock l(&mutex_); while (should_sleep_) { @@ -2076,7 +2075,6 @@ class SleepingBackgroundTask { } private: - const Env* env_; port::Mutex mutex_; port::CondVar bg_cv_; // Signalled when background work finishes bool should_sleep_; @@ -2086,10 +2084,10 @@ TEST(DBTest, GetProperty) { // Set sizes to both background thread pool to be 1 and block them. env_->SetBackgroundThreads(1, Env::HIGH); env_->SetBackgroundThreads(1, Env::LOW); - SleepingBackgroundTask sleeping_task_low(env_); + SleepingBackgroundTask sleeping_task_low; env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); - SleepingBackgroundTask sleeping_task_high(env_); + SleepingBackgroundTask sleeping_task_high; env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high, Env::Priority::HIGH); diff --git a/util/env_posix.cc b/util/env_posix.cc index c610c1546..856d49250 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -356,7 +356,9 @@ class PosixMmapFile : public WritableFile { uint64_t file_offset_; // Offset of base_ in file // Have we done an munmap of unsynced data? 
bool pending_sync_; +#ifdef ROCKSDB_FALLOCATE_PRESENT bool fallocate_with_keep_size_; +#endif // Roundup x to a multiple of y static size_t Roundup(size_t x, size_t y) { @@ -441,8 +443,10 @@ class PosixMmapFile : public WritableFile { dst_(nullptr), last_sync_(nullptr), file_offset_(0), - pending_sync_(false), - fallocate_with_keep_size_(options.fallocate_with_keep_size) { + pending_sync_(false) { +#ifdef ROCKSDB_FALLOCATE_PRESENT + fallocate_with_keep_size_ = options.fallocate_with_keep_size; +#endif assert((page_size & (page_size - 1)) == 0); assert(options.use_mmap_writes); } @@ -614,7 +618,9 @@ class PosixWritableFile : public WritableFile { bool pending_fsync_; uint64_t last_sync_size_; uint64_t bytes_per_sync_; +#ifdef ROCKSDB_FALLOCATE_PRESENT bool fallocate_with_keep_size_; +#endif public: PosixWritableFile(const std::string& fname, int fd, size_t capacity, @@ -628,8 +634,10 @@ class PosixWritableFile : public WritableFile { pending_sync_(false), pending_fsync_(false), last_sync_size_(0), - bytes_per_sync_(options.bytes_per_sync), - fallocate_with_keep_size_(options.fallocate_with_keep_size) { + bytes_per_sync_(options.bytes_per_sync) { +#ifdef ROCKSDB_FALLOCATE_PRESENT + fallocate_with_keep_size_ = options.fallocate_with_keep_size; +#endif assert(!options.use_mmap_writes); } @@ -809,15 +817,19 @@ class PosixRandomRWFile : public RandomRWFile { int fd_; bool pending_sync_; bool pending_fsync_; +#ifdef ROCKSDB_FALLOCATE_PRESENT bool fallocate_with_keep_size_; +#endif public: PosixRandomRWFile(const std::string& fname, int fd, const EnvOptions& options) : filename_(fname), fd_(fd), pending_sync_(false), - pending_fsync_(false), - fallocate_with_keep_size_(options.fallocate_with_keep_size) { + pending_fsync_(false) { +#ifdef ROCKSDB_FALLOCATE_PRESENT + fallocate_with_keep_size_ = options.fallocate_with_keep_size; +#endif assert(!options.use_mmap_writes && !options.use_mmap_reads); } From 1ad0c2f9db9ea52520d5b231979e82b2c5d53004 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 19 Mar 2014 15:40:33 -0700 Subject: [PATCH 5/8] add tags to gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index a4cddf141..974991fd8 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,4 @@ build_tools/VALGRIND_LOGS/ coverage/COVERAGE_REPORT .gdbhistory .phutil_module_cache +tags From e493f2f54e90bc6aa42c4708e32f88af6e4e4e1e Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 19 Mar 2014 16:01:25 -0700 Subject: [PATCH 6/8] Don't compact with zero input files MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: We have an issue with internal service trying to run compaction with zero input files: 2014/02/07-02:26:58.386531 7f79117ec700 Compaction start summary: Base version 1420 Base level 3, seek compaction:0, inputs:[ϛ~^Qy^?],[] 2014/02/07-02:26:58.386539 7f79117ec700 Compacted 0@3 + 0@4 files => 0 bytes There are two issues: * inputsummary is printing out junk * it's constantly retrying (since I guess madeProgress is true), so it prints out a lot of data in the LOG file (40GB in one day). I read through the Level compaction picker and added some failure condition if input[0] is empty. I think PickCompaction() should not return compaction with zero input files with this change. 
I'm not confident enough to add an assertion though :)

Test Plan: make check

Reviewers: dhruba, haobo, sdong, kailiu

Reviewed By: haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D16005
---
 db/compaction.cc        | 5 ++---
 db/compaction_picker.cc | 6 +++++-
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/db/compaction.cc b/db/compaction.cc
index 8faa89f67..8bb4f9c61 100644
--- a/db/compaction.cc
+++ b/db/compaction.cc
@@ -197,6 +197,7 @@ static void FileSizeSummary(unsigned long long sz, char* output, int len) {
 
 static int InputSummary(std::vector<FileMetaData*>& files, char* output,
                         int len) {
+  *output = '\0';
   int write = 0;
   for (unsigned int i = 0; i < files.size(); i++) {
     int sz = len - write;
@@ -233,9 +234,7 @@ void Compaction::Summary(char* output, int len) {
     return;
   }
 
-  if (inputs_[1].size()) {
-    write += InputSummary(inputs_[1], output+write, len-write);
-  }
+  write += InputSummary(inputs_[1], output+write, len-write);
   if (write < 0 || write >= len) {
     return;
   }
diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc
index d585e41ec..e1b61d7e7 100644
--- a/db/compaction_picker.cc
+++ b/db/compaction_picker.cc
@@ -178,7 +178,11 @@ bool CompactionPicker::ExpandWhileOverlapping(Compaction* c) {
   // If, after the expansion, there are files that are already under
   // compaction, then we must drop/cancel this compaction.
   int parent_index = -1;
-  if (FilesInCompaction(c->inputs_[0]) ||
+  if (c->inputs_[0].empty()) {
+    Log(options_->info_log,
+        "ExpandWhileOverlapping() failure because zero input files");
+  }
+  if (c->inputs_[0].empty() || FilesInCompaction(c->inputs_[0]) ||
       (c->level() != c->output_level() &&
       ParentRangeInCompaction(c->input_version_, &smallest, &largest, level,
                               &parent_index))) {

From 69f6cf431da185a10a10128d01c27a4ab19d6a17 Mon Sep 17 00:00:00 2001
From: Kai Liu
Date: Wed, 19 Mar 2014 16:04:51 -0700
Subject: [PATCH 7/8] Fix two bugs in table format

Summary:
Previous code had two bugs:

* didn't initialize the table_magic_number_ explicitly -- as a result a
  random junk number is stored for table_magic_number_, making
  HasInitializedMagicNumber() always return true.
* the if condition is incorrect in set_table_magic_number(), and the return
  value is not checked.

I replaced the if-else with a stronger requirement enforced by assert().

Test Plan: Previous sst_dump failed to work. After the fix, things are back
to normal.

Reviewers: yhchiang

CC: haobo, sdong, igor, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D17055
---
 table/format.h | 14 +++++---------
 1 file changed, 5 insertions(+), 9 deletions(-)

diff --git a/table/format.h b/table/format.h
index ed292347e..f05fbf890 100644
--- a/table/format.h
+++ b/table/format.h
@@ -112,14 +112,10 @@ class Footer {
   static const uint64_t kInvalidTableMagicNumber = 0;
 
  private:
-  // Set the table_magic_number only when it was not previously
-  // initialized. Return true on success.
-  bool set_table_magic_number(uint64_t magic_number) {
-    if (HasInitializedTableMagicNumber()) {
-      table_magic_number_ = magic_number;
-      return true;
-    }
-    return false;
+  // REQUIRES: magic number wasn't initialized.
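+  // (Misuse now fails fast via the assert below; the old version returned
+  // false silently and its condition was inverted.)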
+ void set_table_magic_number(uint64_t magic_number) { + assert(!HasInitializedTableMagicNumber()); + table_magic_number_ = magic_number; } // return true if @table_magic_number_ is set to a value different @@ -130,7 +126,7 @@ class Footer { BlockHandle metaindex_handle_; BlockHandle index_handle_; - uint64_t table_magic_number_; + uint64_t table_magic_number_ = 0; }; // Read the footer from file From fcd5c5e8289d88f1100d24e7331020ee3d280464 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 19 Mar 2014 16:52:26 -0700 Subject: [PATCH 8/8] ComputeCompactionScore in CompactionPicker Summary: As it turns out, we need the call to ComputeCompactionScore (previously: Finalize) in CompactionPicker. The issue caused a deadlock in db_stress: http://ci-builds.fb.com/job/rocksdb_crashtest/290/console The last two lines before a deadlock were: 2014/03/18-22:43:41.481029 7facafbee700 (Original Log Time 2014/03/18-22:43:41.480989) Compaction nothing to do 2014/03/18-22:43:41.481041 7faccf7fc700 wait for fewer level0 files... "Compaction nothing to do" and other thread waiting for fewer level0 files. Hm hm. I moved the pre-sorting to SaveTo, which should fix both the original and the new issue. Test Plan: make check for now, will run db_stress in jenkins Reviewers: dhruba, haobo, sdong Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D17037 --- db/compaction_picker.cc | 6 ++++++ db/version_set.cc | 29 +++++++++++++---------------- db/version_set.h | 6 +++--- 3 files changed, 22 insertions(+), 19 deletions(-) diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index e1b61d7e7..ccdbce72b 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -377,6 +377,12 @@ Compaction* LevelCompactionPicker::PickCompaction(Version* version, Compaction* c = nullptr; int level = -1; + // Compute the compactions needed. It is better to do it here + // and also in LogAndApply(), otherwise the values could be stale. + std::vector size_being_compacted(NumberLevels() - 1); + SizeBeingCompacted(size_being_compacted); + version->ComputeCompactionScore(size_being_compacted); + // We prefer compactions triggered by too much data in a level over // the compactions triggered by seeks. 
// diff --git a/db/version_set.cc b/db/version_set.cc index f8cbbf52f..70da5b9d5 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -461,7 +461,6 @@ Version::Version(VersionSet* vset, uint64_t version_number) prev_(this), refs_(0), num_levels_(vset->num_levels_), - finalized_(false), files_(new std::vector[num_levels_]), files_by_size_(num_levels_), next_file_to_compact_by_size_(num_levels_), @@ -479,7 +478,6 @@ void Version::Get(const ReadOptions& options, GetStats* stats, const Options& db_options, bool* value_found) { - assert(finalized_); Slice ikey = k.internal_key(); Slice user_key = k.user_key(); const Comparator* ucmp = vset_->icmp_.user_comparator(); @@ -643,16 +641,8 @@ bool Version::UpdateStats(const GetStats& stats) { return false; } -void Version::Finalize(std::vector& size_being_compacted) { - assert(!finalized_); - finalized_ = true; - // Pre-sort level0 for Get() - if (vset_->options_->compaction_style == kCompactionStyleUniversal) { - std::sort(files_[0].begin(), files_[0].end(), NewestFirstBySeqNo); - } else { - std::sort(files_[0].begin(), files_[0].end(), NewestFirst); - } - +void Version::ComputeCompactionScore( + std::vector& size_being_compacted) { double max_score = 0; int max_score_level = 0; @@ -1398,6 +1388,13 @@ class VersionSet::Builder { } } + // TODO(icanadi) do it in the loop above, which already sorts the files + // Pre-sort level0 for Get() + if (v->vset_->options_->compaction_style == kCompactionStyleUniversal) { + std::sort(v->files_[0].begin(), v->files_[0].end(), NewestFirstBySeqNo); + } else { + std::sort(v->files_[0].begin(), v->files_[0].end(), NewestFirst); + } CheckConsistency(v); } @@ -1575,9 +1572,9 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, } } - // The calls to Finalize and UpdateFilesBySize are cpu-heavy + // The calls to ComputeCompactionScore and UpdateFilesBySize are cpu-heavy // and is best called outside the mutex. - v->Finalize(size_being_compacted); + v->ComputeCompactionScore(size_being_compacted); v->UpdateFilesBySize(); // Write new record to MANIFEST log @@ -1870,7 +1867,7 @@ Status VersionSet::Recover() { // Install recovered version std::vector size_being_compacted(v->NumberLevels() - 1); compaction_picker_->SizeBeingCompacted(size_being_compacted); - v->Finalize(size_being_compacted); + v->ComputeCompactionScore(size_being_compacted); manifest_file_size_ = manifest_file_size; AppendVersion(v); @@ -2074,7 +2071,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, // Install recovered version std::vector size_being_compacted(v->NumberLevels() - 1); compaction_picker_->SizeBeingCompacted(size_being_compacted); - v->Finalize(size_being_compacted); + v->ComputeCompactionScore(size_being_compacted); AppendVersion(v); manifest_file_number_ = next_file; diff --git a/db/version_set.h b/db/version_set.h index 39bb7d414..7d7cdf4fc 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -94,8 +94,9 @@ class Version { // Updates internal structures that keep track of compaction scores // We use compaction scores to figure out which compaction to do next - // Also pre-sorts level0 files for Get() - void Finalize(std::vector& size_being_compacted); + // REQUIRES: If Version is not yet saved to current_, it can be called without + // a lock. 
Once a version is saved to current_, call only with mutex held + void ComputeCompactionScore(std::vector& size_being_compacted); // Reference count management (so Versions do not disappear out from // under live iterators) @@ -227,7 +228,6 @@ class Version { Version* prev_; // Previous version in linked list int refs_; // Number of live refs to this version int num_levels_; // Number of levels - bool finalized_; // True if Finalized is called // List of files per level, files in each level are arranged // in increasing order of keys