Merge branch 'master' into columnfamilies

Conflicts:
	db/db_impl.cc
	db/internal_stats.cc
	db/internal_stats.h
	db/version_set.cc
main
Igor Canadi 10 years ago
commit e20fa3f8a4
  1. 1
      .gitignore
  2. 5
      db/compaction.cc
  3. 12
      db/compaction_picker.cc
  4. 29
      db/db_impl.cc
  5. 2
      db/db_impl.h
  6. 128
      db/db_test.cc
  7. 587
      db/internal_stats.cc
  8. 35
      db/internal_stats.h
  9. 2
      db/memtable_list.cc
  10. 2
      db/memtable_list.h
  11. 37
      db/version_set.cc
  12. 6
      db/version_set.h
  13. 14
      table/format.h
  14. 24
      util/env_posix.cc

1
.gitignore vendored

@ -22,3 +22,4 @@ build_tools/VALGRIND_LOGS/
coverage/COVERAGE_REPORT
.gdbhistory
.phutil_module_cache
tags

@ -213,6 +213,7 @@ static void FileSizeSummary(unsigned long long sz, char* output, int len) {
static int InputSummary(std::vector<FileMetaData*>& files, char* output,
int len) {
*output = '\0';
int write = 0;
for (unsigned int i = 0; i < files.size(); i++) {
int sz = len - write;
@ -249,9 +250,7 @@ void Compaction::Summary(char* output, int len) {
return;
}
if (inputs_[1].size()) {
write += InputSummary(inputs_[1], output+write, len-write);
}
write += InputSummary(inputs_[1], output+write, len-write);
if (write < 0 || write >= len) {
return;
}

@ -178,7 +178,11 @@ bool CompactionPicker::ExpandWhileOverlapping(Compaction* c) {
// If, after the expansion, there are files that are already under
// compaction, then we must drop/cancel this compaction.
int parent_index = -1;
if (FilesInCompaction(c->inputs_[0]) ||
if (c->inputs_[0].empty()) {
Log(options_->info_log,
"ExpandWhileOverlapping() failure because zero input files");
}
if (c->inputs_[0].empty() || FilesInCompaction(c->inputs_[0]) ||
(c->level() != c->output_level() &&
ParentRangeInCompaction(c->input_version_, &smallest, &largest, level,
&parent_index))) {
@ -369,6 +373,12 @@ Compaction* LevelCompactionPicker::PickCompaction(Version* version,
Compaction* c = nullptr;
int level = -1;
// Compute the compactions needed. It is better to do it here
// and also in LogAndApply(), otherwise the values could be stale.
std::vector<uint64_t> size_being_compacted(NumberLevels() - 1);
SizeBeingCompacted(size_being_compacted);
version->ComputeCompactionScore(size_being_compacted);
// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
//

@ -10,7 +10,6 @@
#include "db/db_impl.h"
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include <algorithm>
#include <climits>
@ -1711,8 +1710,10 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) {
return s;
}
Status DBImpl::TEST_FlushMemTable() {
return FlushMemTable(default_cf_handle_->cfd(), FlushOptions());
Status DBImpl::TEST_FlushMemTable(bool wait) {
FlushOptions fo;
fo.wait = wait;
return FlushMemTable(default_cf_handle_->cfd(), fo);
}
Status DBImpl::TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family) {
@ -1851,10 +1852,15 @@ void DBImpl::BackgroundCallFlush() {
// case this is an environmental problem and we do not want to
// chew up resources for failed compactions for the duration of
// the problem.
uint64_t error_cnt = default_cf_handle_->cfd()
->internal_stats()
->BumpAndGetBackgroundErrorCount();
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
Log(options_.info_log, "Waiting after background flush error: %s",
s.ToString().c_str());
mutex_.Unlock();
Log(options_.info_log,
"Waiting after background flush error: %s"
"Accumulated background error counts: %" PRIu64,
s.ToString().c_str(), error_cnt);
log_buffer.FlushBufferToLog();
LogFlush(options_.info_log);
env_->SleepForMicroseconds(1000000);
@ -1925,11 +1931,16 @@ void DBImpl::BackgroundCallCompaction() {
// case this is an environmental problem and we do not want to
// chew up resources for failed compactions for the duration of
// the problem.
uint64_t error_cnt = default_cf_handle_->cfd()
->internal_stats()
->BumpAndGetBackgroundErrorCount();
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
mutex_.Unlock();
log_buffer.FlushBufferToLog();
Log(options_.info_log, "Waiting after background compaction error: %s",
s.ToString().c_str());
Log(options_.info_log,
"Waiting after background compaction error: %s, "
"Accumulated background error counts: %" PRIu64,
s.ToString().c_str(), error_cnt);
LogFlush(options_.info_log);
env_->SleepForMicroseconds(1000000);
mutex_.Lock();
@ -3820,8 +3831,10 @@ bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
value->clear();
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
DBPropertyType property_type = GetPropertyType(property);
MutexLock l(&mutex_);
return cfd->internal_stats()->GetProperty(property, value, cfd);
return cfd->internal_stats()->GetProperty(property_type, property, value,
cfd);
}
void DBImpl::GetApproximateSizes(ColumnFamilyHandle* column_family,

@ -143,7 +143,7 @@ class DBImpl : public DB {
ColumnFamilyHandle* column_family = nullptr);
// Force current memtable contents to be flushed.
Status TEST_FlushMemTable();
Status TEST_FlushMemTable(bool wait = true);
// Wait for memtable compaction
Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr);

@ -2206,6 +2206,92 @@ TEST(DBTest, NumImmutableMemTable) {
} while (ChangeCompactOptions());
}
class SleepingBackgroundTask {
public:
SleepingBackgroundTask() : bg_cv_(&mutex_), should_sleep_(true) {}
void DoSleep() {
MutexLock l(&mutex_);
while (should_sleep_) {
bg_cv_.Wait();
}
}
void WakeUp() {
MutexLock l(&mutex_);
should_sleep_ = false;
bg_cv_.SignalAll();
}
static void DoSleepTask(void* arg) {
reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep();
}
private:
port::Mutex mutex_;
port::CondVar bg_cv_; // Signalled when background work finishes
bool should_sleep_;
};
TEST(DBTest, GetProperty) {
// Set sizes to both background thread pool to be 1 and block them.
env_->SetBackgroundThreads(1, Env::HIGH);
env_->SetBackgroundThreads(1, Env::LOW);
SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW);
SleepingBackgroundTask sleeping_task_high;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high,
Env::Priority::HIGH);
Options options = CurrentOptions();
WriteOptions writeOpt = WriteOptions();
writeOpt.disableWAL = true;
options.compaction_style = kCompactionStyleUniversal;
options.level0_file_num_compaction_trigger = 1;
options.compaction_options_universal.size_ratio = 50;
options.max_background_compactions = 1;
options.max_background_flushes = 1;
options.max_write_buffer_number = 10;
options.min_write_buffer_number_to_merge = 1;
options.write_buffer_size = 1000000;
Reopen(&options);
std::string big_value(1000000 * 2, 'x');
std::string num;
SetPerfLevel(kEnableTime);
ASSERT_OK(dbfull()->Put(writeOpt, "k1", big_value));
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.num-immutable-mem-table", &num));
ASSERT_EQ(num, "0");
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.mem-table-flush-pending", &num));
ASSERT_EQ(num, "0");
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.compaction-pending", &num));
ASSERT_EQ(num, "0");
perf_context.Reset();
ASSERT_OK(dbfull()->Put(writeOpt, "k2", big_value));
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.num-immutable-mem-table", &num));
ASSERT_EQ(num, "1");
ASSERT_OK(dbfull()->Put(writeOpt, "k3", big_value));
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.num-immutable-mem-table", &num));
ASSERT_EQ(num, "2");
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.mem-table-flush-pending", &num));
ASSERT_EQ(num, "1");
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.compaction-pending", &num));
ASSERT_EQ(num, "0");
sleeping_task_high.WakeUp();
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_OK(dbfull()->Put(writeOpt, "k4", big_value));
ASSERT_OK(dbfull()->Put(writeOpt, "k5", big_value));
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.mem-table-flush-pending", &num));
ASSERT_EQ(num, "0");
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.compaction-pending", &num));
ASSERT_EQ(num, "1");
sleeping_task_low.WakeUp();
}
TEST(DBTest, FLUSH) {
do {
CreateAndReopenWithCF({"pikachu"});
@ -4286,6 +4372,11 @@ TEST(DBTest, NoSpace) {
dbfull()->TEST_CompactRange(level, nullptr, nullptr);
}
}
std::string property_value;
ASSERT_TRUE(db_->GetProperty("rocksdb.background-errors", &property_value));
ASSERT_EQ("5", property_value);
env_->no_space_.Release_Store(nullptr);
ASSERT_LT(CountFiles(), num_files + 3);
@ -4294,6 +4385,43 @@ TEST(DBTest, NoSpace) {
} while (ChangeCompactOptions());
}
// Check background error counter bumped on flush failures.
TEST(DBTest, NoSpaceFlush) {
do {
Options options = CurrentOptions();
options.env = env_;
options.max_background_flushes = 1;
Reopen(&options);
ASSERT_OK(Put("foo", "v1"));
env_->no_space_.Release_Store(env_); // Force out-of-space errors
std::string property_value;
// Background error count is 0 now.
ASSERT_TRUE(db_->GetProperty("rocksdb.background-errors", &property_value));
ASSERT_EQ("0", property_value);
dbfull()->TEST_FlushMemTable(false);
// Wait 300 milliseconds or background-errors turned 1 from 0.
int time_to_sleep_limit = 300000;
while (time_to_sleep_limit > 0) {
int to_sleep = (time_to_sleep_limit > 1000) ? 1000 : time_to_sleep_limit;
time_to_sleep_limit -= to_sleep;
env_->SleepForMicroseconds(to_sleep);
ASSERT_TRUE(
db_->GetProperty("rocksdb.background-errors", &property_value));
if (property_value == "1") {
break;
}
}
ASSERT_EQ("1", property_value);
env_->no_space_.Release_Store(nullptr);
} while (ChangeCompactOptions());
}
TEST(DBTest, NonWritableFileSystem) {
do {
Options options = CurrentOptions();

@ -1,4 +1,3 @@
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
@ -14,285 +13,341 @@
namespace rocksdb {
bool InternalStats::GetProperty(const Slice& property, std::string* value,
ColumnFamilyData* cfd) {
DBPropertyType GetPropertyType(const Slice& property) {
Slice in = property;
Slice prefix("rocksdb.");
if (!in.starts_with(prefix)) return false;
if (!in.starts_with(prefix)) return kUnknown;
in.remove_prefix(prefix.size());
if (in.starts_with("num-files-at-level")) {
in.remove_prefix(strlen("num-files-at-level"));
uint64_t level;
bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
if (!ok || (int)level >= number_levels_) {
return false;
} else {
char buf[100];
snprintf(buf, sizeof(buf), "%d",
cfd->current()->NumLevelFiles(static_cast<int>(level)));
*value = buf;
return true;
}
return kNumFilesAtLevel;
} else if (in == "levelstats") {
char buf[1000];
snprintf(buf, sizeof(buf),
"Level Files Size(MB)\n"
"--------------------\n");
value->append(buf);
for (int level = 0; level < number_levels_; level++) {
snprintf(buf, sizeof(buf), "%3d %8d %8.0f\n", level,
cfd->current()->NumLevelFiles(level),
cfd->current()->NumLevelBytes(level) / 1048576.0);
value->append(buf);
}
return true;
return kLevelStats;
} else if (in == "stats") {
char buf[1000];
uint64_t wal_bytes = 0;
uint64_t wal_synced = 0;
uint64_t user_bytes_written = 0;
uint64_t write_other = 0;
uint64_t write_self = 0;
uint64_t write_with_wal = 0;
uint64_t total_bytes_written = 0;
uint64_t total_bytes_read = 0;
uint64_t micros_up = env_->NowMicros() - started_at_;
// Add "+1" to make sure seconds_up is > 0 and avoid NaN later
double seconds_up = (micros_up + 1) / 1000000.0;
uint64_t total_slowdown = 0;
uint64_t total_slowdown_count = 0;
uint64_t interval_bytes_written = 0;
uint64_t interval_bytes_read = 0;
uint64_t interval_bytes_new = 0;
double interval_seconds_up = 0;
if (statistics_) {
wal_bytes = statistics_->getTickerCount(WAL_FILE_BYTES);
wal_synced = statistics_->getTickerCount(WAL_FILE_SYNCED);
user_bytes_written = statistics_->getTickerCount(BYTES_WRITTEN);
write_other = statistics_->getTickerCount(WRITE_DONE_BY_OTHER);
write_self = statistics_->getTickerCount(WRITE_DONE_BY_SELF);
write_with_wal = statistics_->getTickerCount(WRITE_WITH_WAL);
return kStats;
} else if (in == "sstables") {
return kSsTables;
} else if (in == "num-immutable-mem-table") {
return kNumImmutableMemTable;
} else if (in == "mem-table-flush-pending") {
return kMemtableFlushPending;
} else if (in == "compaction-pending") {
return kCompactionPending;
} else if (in == "background-errors") {
return kBackgroundErrors;
}
return kUnknown;
}
bool InternalStats::GetProperty(DBPropertyType property_type,
const Slice& property, std::string* value,
ColumnFamilyData* cfd) {
Version* current = cfd->current();
Slice in = property;
switch (property_type) {
case kNumFilesAtLevel: {
in.remove_prefix(strlen("rocksdb.num-files-at-level"));
uint64_t level;
bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
if (!ok || (int)level >= number_levels_) {
return false;
} else {
char buf[100];
snprintf(buf, sizeof(buf), "%d",
current->NumLevelFiles(static_cast<int>(level)));
*value = buf;
return true;
}
}
case kLevelStats: {
char buf[1000];
snprintf(buf, sizeof(buf),
"Level Files Size(MB)\n"
"--------------------\n");
value->append(buf);
// Pardon the long line but I think it is easier to read this way.
snprintf(buf, sizeof(buf),
" Compactions\n"
"Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count msComp msStall Ln-stall Stall-cnt\n"
"------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n"
);
value->append(buf);
for (int level = 0; level < number_levels_; level++) {
int files = cfd->current()->NumLevelFiles(level);
if (compaction_stats_[level].micros > 0 || files > 0) {
int64_t bytes_read = compaction_stats_[level].bytes_readn +
compaction_stats_[level].bytes_readnp1;
int64_t bytes_new = compaction_stats_[level].bytes_written -
compaction_stats_[level].bytes_readnp1;
double amplify = (compaction_stats_[level].bytes_readn == 0)
? 0.0
: (compaction_stats_[level].bytes_written +
compaction_stats_[level].bytes_readnp1 +
compaction_stats_[level].bytes_readn) /
(double)compaction_stats_[level].bytes_readn;
total_bytes_read += bytes_read;
total_bytes_written += compaction_stats_[level].bytes_written;
uint64_t stalls = level == 0 ? (stall_counts_[LEVEL0_SLOWDOWN] +
stall_counts_[LEVEL0_NUM_FILES] +
stall_counts_[MEMTABLE_COMPACTION])
: stall_leveln_slowdown_count_[level];
double stall_us = level == 0 ? (stall_micros_[LEVEL0_SLOWDOWN] +
stall_micros_[LEVEL0_NUM_FILES] +
stall_micros_[MEMTABLE_COMPACTION])
: stall_leveln_slowdown_[level];
snprintf(buf, sizeof(buf),
"%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f "
"%10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %8d %9.1f %9.1f "
"%9lu\n",
level, files, cfd->current()->NumLevelBytes(level) / 1048576.0,
cfd->current()->NumLevelBytes(level) /
cfd->compaction_picker()->MaxBytesForLevel(level),
compaction_stats_[level].micros / 1e6, bytes_read / 1048576.0,
compaction_stats_[level].bytes_written / 1048576.0,
compaction_stats_[level].bytes_readn / 1048576.0,
compaction_stats_[level].bytes_readnp1 / 1048576.0,
bytes_new / 1048576.0, amplify,
// +1 to avoid division by 0
(bytes_read / 1048576.0) /
((compaction_stats_[level].micros + 1) / 1000000.0),
(compaction_stats_[level].bytes_written / 1048576.0) /
((compaction_stats_[level].micros + 1) / 1000000.0),
compaction_stats_[level].files_in_leveln,
compaction_stats_[level].files_in_levelnp1,
compaction_stats_[level].files_out_levelnp1,
compaction_stats_[level].files_out_levelnp1 -
compaction_stats_[level].files_in_levelnp1,
compaction_stats_[level].count,
(int)((double)compaction_stats_[level].micros / 1000.0 /
(compaction_stats_[level].count + 1)),
(double)stall_us / 1000.0 / (stalls + 1), stall_us / 1000000.0,
(unsigned long)stalls);
total_slowdown += stall_leveln_slowdown_[level];
total_slowdown_count += stall_leveln_slowdown_count_[level];
for (int level = 0; level < number_levels_; level++) {
snprintf(buf, sizeof(buf), "%3d %8d %8.0f\n", level,
current->NumLevelFiles(level),
current->NumLevelBytes(level) / 1048576.0);
value->append(buf);
}
return true;
}
case kStats: {
char buf[1000];
uint64_t wal_bytes = 0;
uint64_t wal_synced = 0;
uint64_t user_bytes_written = 0;
uint64_t write_other = 0;
uint64_t write_self = 0;
uint64_t write_with_wal = 0;
uint64_t total_bytes_written = 0;
uint64_t total_bytes_read = 0;
uint64_t micros_up = env_->NowMicros() - started_at_;
// Add "+1" to make sure seconds_up is > 0 and avoid NaN later
double seconds_up = (micros_up + 1) / 1000000.0;
uint64_t total_slowdown = 0;
uint64_t total_slowdown_count = 0;
uint64_t interval_bytes_written = 0;
uint64_t interval_bytes_read = 0;
uint64_t interval_bytes_new = 0;
double interval_seconds_up = 0;
if (statistics_) {
wal_bytes = statistics_->getTickerCount(WAL_FILE_BYTES);
wal_synced = statistics_->getTickerCount(WAL_FILE_SYNCED);
user_bytes_written = statistics_->getTickerCount(BYTES_WRITTEN);
write_other = statistics_->getTickerCount(WRITE_DONE_BY_OTHER);
write_self = statistics_->getTickerCount(WRITE_DONE_BY_SELF);
write_with_wal = statistics_->getTickerCount(WRITE_WITH_WAL);
}
interval_bytes_new = user_bytes_written - last_stats_.ingest_bytes_;
interval_bytes_read = total_bytes_read - last_stats_.compaction_bytes_read_;
interval_bytes_written =
total_bytes_written - last_stats_.compaction_bytes_written_;
interval_seconds_up = seconds_up - last_stats_.seconds_up_;
snprintf(buf, sizeof(buf), "Uptime(secs): %.1f total, %.1f interval\n",
seconds_up, interval_seconds_up);
value->append(buf);
snprintf(buf, sizeof(buf),
"Writes cumulative: %llu total, %llu batches, "
"%.1f per batch, %.2f ingest GB\n",
(unsigned long long)(write_other + write_self),
(unsigned long long)write_self,
(write_other + write_self) / (double)(write_self + 1),
user_bytes_written / (1048576.0 * 1024));
value->append(buf);
snprintf(buf, sizeof(buf),
"WAL cumulative: %llu WAL writes, %llu WAL syncs, "
"%.2f writes per sync, %.2f GB written\n",
(unsigned long long)write_with_wal, (unsigned long long)wal_synced,
write_with_wal / (double)(wal_synced + 1),
wal_bytes / (1048576.0 * 1024));
value->append(buf);
snprintf(buf, sizeof(buf),
"Compaction IO cumulative (GB): "
"%.2f new, %.2f read, %.2f write, %.2f read+write\n",
user_bytes_written / (1048576.0 * 1024),
total_bytes_read / (1048576.0 * 1024),
total_bytes_written / (1048576.0 * 1024),
(total_bytes_read + total_bytes_written) / (1048576.0 * 1024));
value->append(buf);
snprintf(buf, sizeof(buf),
"Compaction IO cumulative (MB/sec): "
"%.1f new, %.1f read, %.1f write, %.1f read+write\n",
user_bytes_written / 1048576.0 / seconds_up,
total_bytes_read / 1048576.0 / seconds_up,
total_bytes_written / 1048576.0 / seconds_up,
(total_bytes_read + total_bytes_written) / 1048576.0 / seconds_up);
value->append(buf);
// +1 to avoid divide by 0 and NaN
snprintf(
buf, sizeof(buf),
"Amplification cumulative: %.1f write, %.1f compaction\n",
(double)(total_bytes_written + wal_bytes) / (user_bytes_written + 1),
(double)(total_bytes_written + total_bytes_read + wal_bytes) /
(user_bytes_written + 1));
value->append(buf);
uint64_t interval_write_other = write_other - last_stats_.write_other_;
uint64_t interval_write_self = write_self - last_stats_.write_self_;
snprintf(buf, sizeof(buf),
"Writes interval: %llu total, %llu batches, "
"%.1f per batch, %.1f ingest MB\n",
(unsigned long long)(interval_write_other + interval_write_self),
(unsigned long long)interval_write_self,
(double)(interval_write_other + interval_write_self) /
(interval_write_self + 1),
(user_bytes_written - last_stats_.ingest_bytes_) / 1048576.0);
value->append(buf);
uint64_t interval_write_with_wal =
write_with_wal - last_stats_.write_with_wal_;
uint64_t interval_wal_synced = wal_synced - last_stats_.wal_synced_;
uint64_t interval_wal_bytes = wal_bytes - last_stats_.wal_bytes_;
snprintf(buf, sizeof(buf),
"WAL interval: %llu WAL writes, %llu WAL syncs, "
"%.2f writes per sync, %.2f MB written\n",
(unsigned long long)interval_write_with_wal,
(unsigned long long)interval_wal_synced,
interval_write_with_wal / (double)(interval_wal_synced + 1),
interval_wal_bytes / (1048576.0 * 1024));
value->append(buf);
snprintf(buf, sizeof(buf),
"Compaction IO interval (MB): "
"%.2f new, %.2f read, %.2f write, %.2f read+write\n",
interval_bytes_new / 1048576.0, interval_bytes_read / 1048576.0,
interval_bytes_written / 1048576.0,
(interval_bytes_read + interval_bytes_written) / 1048576.0);
value->append(buf);
snprintf(buf, sizeof(buf),
"Compaction IO interval (MB/sec): "
"%.1f new, %.1f read, %.1f write, %.1f read+write\n",
interval_bytes_new / 1048576.0 / interval_seconds_up,
interval_bytes_read / 1048576.0 / interval_seconds_up,
interval_bytes_written / 1048576.0 / interval_seconds_up,
(interval_bytes_read + interval_bytes_written) / 1048576.0 /
interval_seconds_up);
value->append(buf);
// +1 to avoid divide by 0 and NaN
snprintf(
buf, sizeof(buf),
"Amplification interval: %.1f write, %.1f compaction\n",
(double)(interval_bytes_written + wal_bytes) / (interval_bytes_new + 1),
(double)(interval_bytes_written + interval_bytes_read + wal_bytes) /
(interval_bytes_new + 1));
value->append(buf);
snprintf(buf, sizeof(buf),
"Stalls(secs): %.3f level0_slowdown, %.3f level0_numfiles, "
"%.3f memtable_compaction, %.3f leveln_slowdown\n",
stall_micros_[LEVEL0_SLOWDOWN] / 1000000.0,
stall_micros_[LEVEL0_NUM_FILES] / 1000000.0,
stall_micros_[MEMTABLE_COMPACTION] / 1000000.0,
total_slowdown / 1000000.0);
value->append(buf);
snprintf(buf, sizeof(buf),
"Stalls(count): %lu level0_slowdown, %lu level0_numfiles, "
"%lu memtable_compaction, %lu leveln_slowdown\n",
(unsigned long)stall_counts_[LEVEL0_SLOWDOWN],
(unsigned long)stall_counts_[LEVEL0_NUM_FILES],
(unsigned long)stall_counts_[MEMTABLE_COMPACTION],
(unsigned long)total_slowdown_count);
value->append(buf);
last_stats_.compaction_bytes_read_ = total_bytes_read;
last_stats_.compaction_bytes_written_ = total_bytes_written;
last_stats_.ingest_bytes_ = user_bytes_written;
last_stats_.seconds_up_ = seconds_up;
last_stats_.wal_bytes_ = wal_bytes;
last_stats_.wal_synced_ = wal_synced;
last_stats_.write_with_wal_ = write_with_wal;
last_stats_.write_other_ = write_other;
last_stats_.write_self_ = write_self;
return true;
} else if (in == "sstables") {
*value = cfd->current()->DebugString();
return true;
} else if (in == "num-immutable-mem-table") {
*value = std::to_string(cfd->imm()->size());
return true;
}
snprintf(
buf, sizeof(buf),
" Compactions\n"
"Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) "
" "
"Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn "
"Rnp1 "
" Wnp1 NewW Count msComp msStall Ln-stall Stall-cnt\n"
"--------------------------------------------------------------------"
"--"
"--------------------------------------------------------------------"
"--"
"----------------------------------------------------------------\n");
value->append(buf);
for (int level = 0; level < number_levels_; level++) {
int files = current->NumLevelFiles(level);
if (compaction_stats_[level].micros > 0 || files > 0) {
int64_t bytes_read = compaction_stats_[level].bytes_readn +
compaction_stats_[level].bytes_readnp1;
int64_t bytes_new = compaction_stats_[level].bytes_written -
compaction_stats_[level].bytes_readnp1;
double amplify =
(compaction_stats_[level].bytes_readn == 0)
? 0.0
: (compaction_stats_[level].bytes_written +
compaction_stats_[level].bytes_readnp1 +
compaction_stats_[level].bytes_readn) /
(double)compaction_stats_[level].bytes_readn;
total_bytes_read += bytes_read;
total_bytes_written += compaction_stats_[level].bytes_written;
uint64_t stalls = level == 0 ? (stall_counts_[LEVEL0_SLOWDOWN] +
stall_counts_[LEVEL0_NUM_FILES] +
stall_counts_[MEMTABLE_COMPACTION])
: stall_leveln_slowdown_count_[level];
double stall_us = level == 0 ? (stall_micros_[LEVEL0_SLOWDOWN] +
stall_micros_[LEVEL0_NUM_FILES] +
stall_micros_[MEMTABLE_COMPACTION])
: stall_leveln_slowdown_[level];
snprintf(buf, sizeof(buf),
"%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f "
"%10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %8d %9.1f %9.1f "
"%9lu\n",
level, files, current->NumLevelBytes(level) / 1048576.0,
current->NumLevelBytes(level) /
cfd->compaction_picker()->MaxBytesForLevel(level),
compaction_stats_[level].micros / 1e6,
bytes_read / 1048576.0,
compaction_stats_[level].bytes_written / 1048576.0,
compaction_stats_[level].bytes_readn / 1048576.0,
compaction_stats_[level].bytes_readnp1 / 1048576.0,
bytes_new / 1048576.0, amplify,
// +1 to avoid division by 0
(bytes_read / 1048576.0) /
((compaction_stats_[level].micros + 1) / 1000000.0),
(compaction_stats_[level].bytes_written / 1048576.0) /
((compaction_stats_[level].micros + 1) / 1000000.0),
compaction_stats_[level].files_in_leveln,
compaction_stats_[level].files_in_levelnp1,
compaction_stats_[level].files_out_levelnp1,
compaction_stats_[level].files_out_levelnp1 -
compaction_stats_[level].files_in_levelnp1,
compaction_stats_[level].count,
(int)((double)compaction_stats_[level].micros / 1000.0 /
(compaction_stats_[level].count + 1)),
(double)stall_us / 1000.0 / (stalls + 1),
stall_us / 1000000.0, (unsigned long)stalls);
total_slowdown += stall_leveln_slowdown_[level];
total_slowdown_count += stall_leveln_slowdown_count_[level];
value->append(buf);
}
}
interval_bytes_new = user_bytes_written - last_stats_.ingest_bytes_;
interval_bytes_read =
total_bytes_read - last_stats_.compaction_bytes_read_;
interval_bytes_written =
total_bytes_written - last_stats_.compaction_bytes_written_;
interval_seconds_up = seconds_up - last_stats_.seconds_up_;
snprintf(buf, sizeof(buf), "Uptime(secs): %.1f total, %.1f interval\n",
seconds_up, interval_seconds_up);
value->append(buf);
snprintf(buf, sizeof(buf),
"Writes cumulative: %llu total, %llu batches, "
"%.1f per batch, %.2f ingest GB\n",
(unsigned long long)(write_other + write_self),
(unsigned long long)write_self,
(write_other + write_self) / (double)(write_self + 1),
user_bytes_written / (1048576.0 * 1024));
value->append(buf);
snprintf(buf, sizeof(buf),
"WAL cumulative: %llu WAL writes, %llu WAL syncs, "
"%.2f writes per sync, %.2f GB written\n",
(unsigned long long)write_with_wal,
(unsigned long long)wal_synced,
write_with_wal / (double)(wal_synced + 1),
wal_bytes / (1048576.0 * 1024));
value->append(buf);
return false;
snprintf(buf, sizeof(buf),
"Compaction IO cumulative (GB): "
"%.2f new, %.2f read, %.2f write, %.2f read+write\n",
user_bytes_written / (1048576.0 * 1024),
total_bytes_read / (1048576.0 * 1024),
total_bytes_written / (1048576.0 * 1024),
(total_bytes_read + total_bytes_written) / (1048576.0 * 1024));
value->append(buf);
snprintf(
buf, sizeof(buf),
"Compaction IO cumulative (MB/sec): "
"%.1f new, %.1f read, %.1f write, %.1f read+write\n",
user_bytes_written / 1048576.0 / seconds_up,
total_bytes_read / 1048576.0 / seconds_up,
total_bytes_written / 1048576.0 / seconds_up,
(total_bytes_read + total_bytes_written) / 1048576.0 / seconds_up);
value->append(buf);
// +1 to avoid divide by 0 and NaN
snprintf(
buf, sizeof(buf),
"Amplification cumulative: %.1f write, %.1f compaction\n",
(double)(total_bytes_written + wal_bytes) / (user_bytes_written + 1),
(double)(total_bytes_written + total_bytes_read + wal_bytes) /
(user_bytes_written + 1));
value->append(buf);
uint64_t interval_write_other = write_other - last_stats_.write_other_;
uint64_t interval_write_self = write_self - last_stats_.write_self_;
snprintf(buf, sizeof(buf),
"Writes interval: %llu total, %llu batches, "
"%.1f per batch, %.1f ingest MB\n",
(unsigned long long)(interval_write_other + interval_write_self),
(unsigned long long)interval_write_self,
(double)(interval_write_other + interval_write_self) /
(interval_write_self + 1),
(user_bytes_written - last_stats_.ingest_bytes_) / 1048576.0);
value->append(buf);
uint64_t interval_write_with_wal =
write_with_wal - last_stats_.write_with_wal_;
uint64_t interval_wal_synced = wal_synced - last_stats_.wal_synced_;
uint64_t interval_wal_bytes = wal_bytes - last_stats_.wal_bytes_;
snprintf(buf, sizeof(buf),
"WAL interval: %llu WAL writes, %llu WAL syncs, "
"%.2f writes per sync, %.2f MB written\n",
(unsigned long long)interval_write_with_wal,
(unsigned long long)interval_wal_synced,
interval_write_with_wal / (double)(interval_wal_synced + 1),
interval_wal_bytes / (1048576.0 * 1024));
value->append(buf);
snprintf(buf, sizeof(buf),
"Compaction IO interval (MB): "
"%.2f new, %.2f read, %.2f write, %.2f read+write\n",
interval_bytes_new / 1048576.0, interval_bytes_read / 1048576.0,
interval_bytes_written / 1048576.0,
(interval_bytes_read + interval_bytes_written) / 1048576.0);
value->append(buf);
snprintf(buf, sizeof(buf),
"Compaction IO interval (MB/sec): "
"%.1f new, %.1f read, %.1f write, %.1f read+write\n",
interval_bytes_new / 1048576.0 / interval_seconds_up,
interval_bytes_read / 1048576.0 / interval_seconds_up,
interval_bytes_written / 1048576.0 / interval_seconds_up,
(interval_bytes_read + interval_bytes_written) / 1048576.0 /
interval_seconds_up);
value->append(buf);
// +1 to avoid divide by 0 and NaN
snprintf(
buf, sizeof(buf),
"Amplification interval: %.1f write, %.1f compaction\n",
(double)(interval_bytes_written + wal_bytes) /
(interval_bytes_new + 1),
(double)(interval_bytes_written + interval_bytes_read + wal_bytes) /
(interval_bytes_new + 1));
value->append(buf);
snprintf(buf, sizeof(buf),
"Stalls(secs): %.3f level0_slowdown, %.3f level0_numfiles, "
"%.3f memtable_compaction, %.3f leveln_slowdown\n",
stall_micros_[LEVEL0_SLOWDOWN] / 1000000.0,
stall_micros_[LEVEL0_NUM_FILES] / 1000000.0,
stall_micros_[MEMTABLE_COMPACTION] / 1000000.0,
total_slowdown / 1000000.0);
value->append(buf);
snprintf(buf, sizeof(buf),
"Stalls(count): %lu level0_slowdown, %lu level0_numfiles, "
"%lu memtable_compaction, %lu leveln_slowdown\n",
(unsigned long)stall_counts_[LEVEL0_SLOWDOWN],
(unsigned long)stall_counts_[LEVEL0_NUM_FILES],
(unsigned long)stall_counts_[MEMTABLE_COMPACTION],
(unsigned long)total_slowdown_count);
value->append(buf);
last_stats_.compaction_bytes_read_ = total_bytes_read;
last_stats_.compaction_bytes_written_ = total_bytes_written;
last_stats_.ingest_bytes_ = user_bytes_written;
last_stats_.seconds_up_ = seconds_up;
last_stats_.wal_bytes_ = wal_bytes;
last_stats_.wal_synced_ = wal_synced;
last_stats_.write_with_wal_ = write_with_wal;
last_stats_.write_other_ = write_other;
last_stats_.write_self_ = write_self;
return true;
}
case kSsTables:
*value = current->DebugString();
return true;
case kNumImmutableMemTable:
*value = std::to_string(cfd->imm()->size());
return true;
case kMemtableFlushPending:
// Return number of mem tables that are ready to flush (made immutable)
*value = std::to_string(cfd->imm()->IsFlushPending() ? 1 : 0);
return true;
case kCompactionPending:
// 1 if the system already determines at least one compacdtion is needed.
// 0 otherwise,
*value = std::to_string(current->NeedsCompaction() ? 1 : 0);
return true;
/////////////
case kBackgroundErrors:
// Accumulated number of errors in background flushes or compactions.
*value = std::to_string(GetBackgroundErrorCount());
return true;
/////////
default:
return false;
}
}
} // namespace rocksdb

@ -19,6 +19,25 @@
class ColumnFamilyData;
namespace rocksdb {
class MemTableList;
enum DBPropertyType {
kNumFilesAtLevel, // Number of files at a specific level
kLevelStats, // Return number of files and total sizes of each level
kStats, // Return general statitistics of DB
kSsTables, // Return a human readable string of current SST files
kNumImmutableMemTable, // Return number of immutable mem tables
kMemtableFlushPending, // Return 1 if mem table flushing is pending,
// otherwise
// 0.
kCompactionPending, // Return 1 if a compaction is pending. Otherwise 0.
kBackgroundErrors, // Return accumulated background errors encountered.
kUnknown,
};
extern DBPropertyType GetPropertyType(const Slice& property);
class InternalStats {
public:
enum WriteStallType {
@ -34,6 +53,7 @@ class InternalStats {
stall_counts_(WRITE_STALLS_ENUM_MAX, 0),
stall_leveln_slowdown_(num_levels, 0),
stall_leveln_slowdown_count_(num_levels, 0),
bg_error_count_(0),
number_levels_(num_levels),
statistics_(statistics),
env_(env),
@ -101,8 +121,12 @@ class InternalStats {
stall_leveln_slowdown_count_[level] += micros;
}
bool GetProperty(const Slice& property, std::string* value,
ColumnFamilyData* cfd);
uint64_t GetBackgroundErrorCount() const { return bg_error_count_; }
uint64_t BumpAndGetBackgroundErrorCount() { return ++bg_error_count_; }
bool GetProperty(DBPropertyType property_type, const Slice& property,
std::string* value, ColumnFamilyData* cfd);
private:
std::vector<CompactionStats> compaction_stats_;
@ -142,6 +166,13 @@ class InternalStats {
std::vector<uint64_t> stall_leveln_slowdown_;
std::vector<uint64_t> stall_leveln_slowdown_count_;
// Total number of background errors encountered. Every time a flush task
// or compaction task fails, this counter is incremented. The failure can
// be caused by any possible reason, including file system errors, out of
// resources, or input file corruption. Failing when retrying the same flush
// or compaction will cause the counter to increase too.
uint64_t bg_error_count_;
int number_levels_;
Statistics* statistics_;
Env* env_;

@ -93,7 +93,7 @@ void MemTableListVersion::Remove(MemTable* m) {
// Returns true if there is at least one memtable on which flush has
// not yet started.
bool MemTableList::IsFlushPending() {
bool MemTableList::IsFlushPending() const {
if ((flush_requested_ && num_flush_not_started_ >= 1) ||
(num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
assert(imm_flush_needed.NoBarrier_Load() != nullptr);

@ -91,7 +91,7 @@ class MemTableList {
// Returns true if there is at least one memtable on which flush has
// not yet started.
bool IsFlushPending();
bool IsFlushPending() const;
// Returns the earliest memtables that needs to be flushed. The returned
// memtables are guaranteed to be in the ascending order of created time.

@ -469,7 +469,6 @@ Version::Version(ColumnFamilyData* cfd, VersionSet* vset,
refs_(0),
// cfd is nullptr if Version is dummy
num_levels_(cfd == nullptr ? 0 : cfd->NumberLevels()),
finalized_(false),
files_(new std::vector<FileMetaData*>[num_levels_]),
files_by_size_(num_levels_),
next_file_to_compact_by_size_(num_levels_),
@ -487,13 +486,12 @@ void Version::Get(const ReadOptions& options,
GetStats* stats,
const Options& db_options,
bool* value_found) {
assert(finalized_);
Slice ikey = k.internal_key();
Slice user_key = k.user_key();
const Comparator* ucmp = cfd_->internal_comparator().user_comparator();
auto merge_operator = db_options.merge_operator.get();
auto logger = db_options.info_log;
auto logger = db_options.info_log.get();
assert(status->ok() || status->IsMergeInProgress());
Saver saver;
@ -504,7 +502,7 @@ void Version::Get(const ReadOptions& options,
saver.value = value;
saver.merge_operator = merge_operator;
saver.merge_context = merge_context;
saver.logger = logger.get();
saver.logger = logger;
saver.didIO = false;
saver.statistics = db_options.statistics.get();
@ -627,7 +625,7 @@ void Version::Get(const ReadOptions& options,
// do a final merge of nullptr and operands;
if (merge_operator->FullMerge(user_key, nullptr,
saver.merge_context->GetOperands(),
value, logger.get())) {
value, logger)) {
*status = Status::OK();
} else {
RecordTick(db_options.statistics.get(), NUMBER_MERGE_FAILURES);
@ -652,16 +650,8 @@ bool Version::UpdateStats(const GetStats& stats) {
return false;
}
void Version::Finalize(std::vector<uint64_t>& size_being_compacted) {
assert(!finalized_);
finalized_ = true;
// Pre-sort level0 for Get()
if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
std::sort(files_[0].begin(), files_[0].end(), NewestFirstBySeqNo);
} else {
std::sort(files_[0].begin(), files_[0].end(), NewestFirst);
}
void Version::ComputeCompactionScore(
std::vector<uint64_t>& size_being_compacted) {
double max_score = 0;
int max_score_level = 0;
@ -1408,6 +1398,13 @@ class VersionSet::Builder {
}
}
// TODO(icanadi) do it in the loop above, which already sorts the files
// Pre-sort level0 for Get()
if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
std::sort(v->files_[0].begin(), v->files_[0].end(), NewestFirstBySeqNo);
} else {
std::sort(v->files_[0].begin(), v->files_[0].end(), NewestFirst);
}
CheckConsistency(v);
}
@ -1605,9 +1602,9 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
}
if (!edit->IsColumnFamilyManipulation()) {
// The calls to Finalize and UpdateFilesBySize are cpu-heavy
// The calls to ComputeCompactionScore and UpdateFilesBySize are cpu-heavy
// and is best called outside the mutex.
v->Finalize(size_being_compacted);
v->ComputeCompactionScore(size_being_compacted);
v->UpdateFilesBySize();
}
@ -2040,7 +2037,7 @@ Status VersionSet::Recover(
// Install recovered version
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted);
v->Finalize(size_being_compacted);
v->ComputeCompactionScore(size_being_compacted);
v->UpdateFilesBySize();
AppendVersion(cfd, v);
}
@ -2373,7 +2370,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
builder->SaveTo(v);
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted);
v->Finalize(size_being_compacted);
v->ComputeCompactionScore(size_being_compacted);
v->UpdateFilesBySize();
delete builder;
@ -2709,8 +2706,6 @@ ColumnFamilyData* VersionSet::CreateColumnFamily(
edit->column_family_name_, edit->column_family_, dummy_versions, options);
Version* v = new Version(new_cfd, this, current_version_number_++);
std::vector<uint64_t> size_being_compacted(options.num_levels - 1, 0);
v->Finalize(size_being_compacted);
AppendVersion(new_cfd, v);
new_cfd->CreateNewMemtable();

@ -99,8 +99,9 @@ class Version {
// Updates internal structures that keep track of compaction scores
// We use compaction scores to figure out which compaction to do next
// Also pre-sorts level0 files for Get()
void Finalize(std::vector<uint64_t>& size_being_compacted);
// REQUIRES: If Version is not yet saved to current_, it can be called without
// a lock. Once a version is saved to current_, call only with mutex held
void ComputeCompactionScore(std::vector<uint64_t>& size_being_compacted);
// Reference count management (so Versions do not disappear out from
// under live iterators)
@ -234,7 +235,6 @@ class Version {
Version* prev_; // Previous version in linked list
int refs_; // Number of live refs to this version
int num_levels_; // Number of levels
bool finalized_; // True if Finalized is called
// List of files per level, files in each level are arranged
// in increasing order of keys

@ -112,14 +112,10 @@ class Footer {
static const uint64_t kInvalidTableMagicNumber = 0;
private:
// Set the table_magic_number only when it was not previously
// initialized. Return true on success.
bool set_table_magic_number(uint64_t magic_number) {
if (HasInitializedTableMagicNumber()) {
table_magic_number_ = magic_number;
return true;
}
return false;
// REQUIRES: magic number wasn't initialized.
void set_table_magic_number(uint64_t magic_number) {
assert(!HasInitializedTableMagicNumber());
table_magic_number_ = magic_number;
}
// return true if @table_magic_number_ is set to a value different
@ -130,7 +126,7 @@ class Footer {
BlockHandle metaindex_handle_;
BlockHandle index_handle_;
uint64_t table_magic_number_;
uint64_t table_magic_number_ = 0;
};
// Read the footer from file

@ -356,7 +356,9 @@ class PosixMmapFile : public WritableFile {
uint64_t file_offset_; // Offset of base_ in file
// Have we done an munmap of unsynced data?
bool pending_sync_;
#ifdef ROCKSDB_FALLOCATE_PRESENT
bool fallocate_with_keep_size_;
#endif
// Roundup x to a multiple of y
static size_t Roundup(size_t x, size_t y) {
@ -441,8 +443,10 @@ class PosixMmapFile : public WritableFile {
dst_(nullptr),
last_sync_(nullptr),
file_offset_(0),
pending_sync_(false),
fallocate_with_keep_size_(options.fallocate_with_keep_size) {
pending_sync_(false) {
#ifdef ROCKSDB_FALLOCATE_PRESENT
fallocate_with_keep_size_ = options.fallocate_with_keep_size;
#endif
assert((page_size & (page_size - 1)) == 0);
assert(options.use_mmap_writes);
}
@ -614,7 +618,9 @@ class PosixWritableFile : public WritableFile {
bool pending_fsync_;
uint64_t last_sync_size_;
uint64_t bytes_per_sync_;
#ifdef ROCKSDB_FALLOCATE_PRESENT
bool fallocate_with_keep_size_;
#endif
public:
PosixWritableFile(const std::string& fname, int fd, size_t capacity,
@ -628,8 +634,10 @@ class PosixWritableFile : public WritableFile {
pending_sync_(false),
pending_fsync_(false),
last_sync_size_(0),
bytes_per_sync_(options.bytes_per_sync),
fallocate_with_keep_size_(options.fallocate_with_keep_size) {
bytes_per_sync_(options.bytes_per_sync) {
#ifdef ROCKSDB_FALLOCATE_PRESENT
fallocate_with_keep_size_ = options.fallocate_with_keep_size;
#endif
assert(!options.use_mmap_writes);
}
@ -809,15 +817,19 @@ class PosixRandomRWFile : public RandomRWFile {
int fd_;
bool pending_sync_;
bool pending_fsync_;
#ifdef ROCKSDB_FALLOCATE_PRESENT
bool fallocate_with_keep_size_;
#endif
public:
PosixRandomRWFile(const std::string& fname, int fd, const EnvOptions& options)
: filename_(fname),
fd_(fd),
pending_sync_(false),
pending_fsync_(false),
fallocate_with_keep_size_(options.fallocate_with_keep_size) {
pending_fsync_(false) {
#ifdef ROCKSDB_FALLOCATE_PRESENT
fallocate_with_keep_size_ = options.fallocate_with_keep_size;
#endif
assert(!options.use_mmap_writes && !options.use_mmap_reads);
}

Loading…
Cancel
Save