Several easy-to-add properties related to compaction and flushes

Summary: To partly address the request @nkg- raised, add three easy-to-add properties to compactions and flushes.

Test Plan: run unit tests and add a new unit test to cover new properties.

Reviewers: haobo, dhruba

Reviewed By: dhruba

CC: nkg-, leveldb

Differential Revision: https://reviews.facebook.net/D13677
main
Kai Liu 11 years ago committed by sdong
parent 758fa8c359
commit 1ec72b37b1
  1. 5
      db/db_impl.cc
  2. 88
      db/db_test.cc
  3. 101
      db/internal_stats.cc
  4. 22
      db/internal_stats.h
  5. 2
      db/memtable_list.cc
  6. 2
      db/memtable_list.h

@ -3696,9 +3696,10 @@ const Options& DBImpl::GetOptions() const {
bool DBImpl::GetProperty(const Slice& property, std::string* value) {
value->clear();
DBPropertyType property_type = GetPropertyType(property);
MutexLock l(&mutex_);
return internal_stats_.GetProperty(property, value, versions_.get(),
imm_.size());
return internal_stats_.GetProperty(property_type, property, value,
versions_.get(), imm_);
}
void DBImpl::GetApproximateSizes(

@ -2055,6 +2055,94 @@ TEST(DBTest, NumImmutableMemTable) {
} while (ChangeCompactOptions());
}
class SleepingBackgroundTask {
public:
explicit SleepingBackgroundTask(Env* env)
: env_(env), bg_cv_(&mutex_), should_sleep_(true) {}
void DoSleep() {
MutexLock l(&mutex_);
while (should_sleep_) {
bg_cv_.Wait();
}
}
void WakeUp() {
MutexLock l(&mutex_);
should_sleep_ = false;
bg_cv_.SignalAll();
}
static void DoSleepTask(void* arg) {
reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep();
}
private:
const Env* env_;
port::Mutex mutex_;
port::CondVar bg_cv_; // Signalled when background work finishes
bool should_sleep_;
};
TEST(DBTest, GetProperty) {
// Set sizes to both background thread pool to be 1 and block them.
env_->SetBackgroundThreads(1, Env::HIGH);
env_->SetBackgroundThreads(1, Env::LOW);
SleepingBackgroundTask sleeping_task_low(env_);
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW);
SleepingBackgroundTask sleeping_task_high(env_);
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high,
Env::Priority::HIGH);
Options options = CurrentOptions();
WriteOptions writeOpt = WriteOptions();
writeOpt.disableWAL = true;
options.compaction_style = kCompactionStyleUniversal;
options.level0_file_num_compaction_trigger = 1;
options.compaction_options_universal.size_ratio = 50;
options.max_background_compactions = 1;
options.max_background_flushes = 1;
options.max_write_buffer_number = 10;
options.min_write_buffer_number_to_merge = 1;
options.write_buffer_size = 1000000;
Reopen(&options);
std::string big_value(1000000 * 2, 'x');
std::string num;
SetPerfLevel(kEnableTime);
ASSERT_OK(dbfull()->Put(writeOpt, "k1", big_value));
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.num-immutable-mem-table", &num));
ASSERT_EQ(num, "0");
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.mem-table-flush-pending", &num));
ASSERT_EQ(num, "0");
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.compaction-pending", &num));
ASSERT_EQ(num, "0");
perf_context.Reset();
ASSERT_OK(dbfull()->Put(writeOpt, "k2", big_value));
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.num-immutable-mem-table", &num));
ASSERT_EQ(num, "1");
ASSERT_OK(dbfull()->Put(writeOpt, "k3", big_value));
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.num-immutable-mem-table", &num));
ASSERT_EQ(num, "2");
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.mem-table-flush-pending", &num));
ASSERT_EQ(num, "1");
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.compaction-pending", &num));
ASSERT_EQ(num, "0");
sleeping_task_high.WakeUp();
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_OK(dbfull()->Put(writeOpt, "k4", big_value));
ASSERT_OK(dbfull()->Put(writeOpt, "k5", big_value));
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.mem-table-flush-pending", &num));
ASSERT_EQ(num, "0");
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.compaction-pending", &num));
ASSERT_EQ(num, "1");
sleeping_task_low.WakeUp();
}
TEST(DBTest, FLUSH) {
do {
Options options = CurrentOptions();

@ -1,4 +1,3 @@
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
@ -8,21 +7,46 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/internal_stats.h"
#include "db/memtable_list.h"
#include <vector>
namespace rocksdb {
bool InternalStats::GetProperty(const Slice& property, std::string* value,
VersionSet* version_set, int immsize) {
Version* current = version_set->current();
DBPropertyType GetPropertyType(const Slice& property) {
Slice in = property;
Slice prefix("rocksdb.");
if (!in.starts_with(prefix)) return false;
if (!in.starts_with(prefix)) return kUnknown;
in.remove_prefix(prefix.size());
if (in.starts_with("num-files-at-level")) {
in.remove_prefix(strlen("num-files-at-level"));
return kNumFilesAtLevel;
} else if (in == "levelstats") {
return kLevelStats;
} else if (in == "stats") {
return kStats;
} else if (in == "sstables") {
return kSsTables;
} else if (in == "num-immutable-mem-table") {
return kNumImmutableMemTable;
} else if (in == "mem-table-flush-pending") {
return MemtableFlushPending;
} else if (in == "compaction-pending") {
return CompactionPending;
}
return kUnknown;
}
bool InternalStats::GetProperty(DBPropertyType property_type,
const Slice& property, std::string* value,
VersionSet* version_set,
const MemTableList& imm) {
Version* current = version_set->current();
Slice in = property;
switch (property_type) {
case kNumFilesAtLevel: {
in.remove_prefix(strlen("rocksdb.num-files-at-level"));
uint64_t level;
bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
if (!ok || (int)level >= number_levels_) {
@ -34,7 +58,8 @@ bool InternalStats::GetProperty(const Slice& property, std::string* value,
*value = buf;
return true;
}
} else if (in == "levelstats") {
}
case kLevelStats: {
char buf[1000];
snprintf(buf, sizeof(buf),
"Level Files Size(MB)\n"
@ -48,8 +73,8 @@ bool InternalStats::GetProperty(const Slice& property, std::string* value,
value->append(buf);
}
return true;
} else if (in == "stats") {
}
case kStats: {
char buf[1000];
uint64_t wal_bytes = 0;
@ -80,11 +105,19 @@ bool InternalStats::GetProperty(const Slice& property, std::string* value,
}
// Pardon the long line but I think it is easier to read this way.
snprintf(buf, sizeof(buf),
snprintf(
buf, sizeof(buf),
" Compactions\n"
"Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count msComp msStall Ln-stall Stall-cnt\n"
"------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n"
);
"Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) "
" "
"Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn "
"Rnp1 "
" Wnp1 NewW Count msComp msStall Ln-stall Stall-cnt\n"
"--------------------------------------------------------------------"
"--"
"--------------------------------------------------------------------"
"--"
"----------------------------------------------------------------\n");
value->append(buf);
for (int level = 0; level < number_levels_; level++) {
int files = current->NumLevelFiles(level);
@ -93,7 +126,8 @@ bool InternalStats::GetProperty(const Slice& property, std::string* value,
compaction_stats_[level].bytes_readnp1;
int64_t bytes_new = compaction_stats_[level].bytes_written -
compaction_stats_[level].bytes_readnp1;
double amplify = (compaction_stats_[level].bytes_readn == 0)
double amplify =
(compaction_stats_[level].bytes_readn == 0)
? 0.0
: (compaction_stats_[level].bytes_written +
compaction_stats_[level].bytes_readnp1 +
@ -120,7 +154,8 @@ bool InternalStats::GetProperty(const Slice& property, std::string* value,
level, files, current->NumLevelBytes(level) / 1048576.0,
current->NumLevelBytes(level) /
version_set->MaxBytesForLevel(level),
compaction_stats_[level].micros / 1e6, bytes_read / 1048576.0,
compaction_stats_[level].micros / 1e6,
bytes_read / 1048576.0,
compaction_stats_[level].bytes_written / 1048576.0,
compaction_stats_[level].bytes_readn / 1048576.0,
compaction_stats_[level].bytes_readnp1 / 1048576.0,
@ -138,8 +173,8 @@ bool InternalStats::GetProperty(const Slice& property, std::string* value,
compaction_stats_[level].count,
(int)((double)compaction_stats_[level].micros / 1000.0 /
(compaction_stats_[level].count + 1)),
(double)stall_us / 1000.0 / (stalls + 1), stall_us / 1000000.0,
(unsigned long)stalls);
(double)stall_us / 1000.0 / (stalls + 1),
stall_us / 1000000.0, (unsigned long)stalls);
total_slowdown += stall_leveln_slowdown_[level];
total_slowdown_count += stall_leveln_slowdown_count_[level];
value->append(buf);
@ -147,7 +182,8 @@ bool InternalStats::GetProperty(const Slice& property, std::string* value,
}
interval_bytes_new = user_bytes_written - last_stats_.ingest_bytes_;
interval_bytes_read = total_bytes_read - last_stats_.compaction_bytes_read_;
interval_bytes_read =
total_bytes_read - last_stats_.compaction_bytes_read_;
interval_bytes_written =
total_bytes_written - last_stats_.compaction_bytes_written_;
interval_seconds_up = seconds_up - last_stats_.seconds_up_;
@ -168,7 +204,8 @@ bool InternalStats::GetProperty(const Slice& property, std::string* value,
snprintf(buf, sizeof(buf),
"WAL cumulative: %llu WAL writes, %llu WAL syncs, "
"%.2f writes per sync, %.2f GB written\n",
(unsigned long long)write_with_wal, (unsigned long long)wal_synced,
(unsigned long long)write_with_wal,
(unsigned long long)wal_synced,
write_with_wal / (double)(wal_synced + 1),
wal_bytes / (1048576.0 * 1024));
value->append(buf);
@ -182,7 +219,8 @@ bool InternalStats::GetProperty(const Slice& property, std::string* value,
(total_bytes_read + total_bytes_written) / (1048576.0 * 1024));
value->append(buf);
snprintf(buf, sizeof(buf),
snprintf(
buf, sizeof(buf),
"Compaction IO cumulative (MB/sec): "
"%.1f new, %.1f read, %.1f write, %.1f read+write\n",
user_bytes_written / 1048576.0 / seconds_up,
@ -250,7 +288,8 @@ bool InternalStats::GetProperty(const Slice& property, std::string* value,
snprintf(
buf, sizeof(buf),
"Amplification interval: %.1f write, %.1f compaction\n",
(double)(interval_bytes_written + wal_bytes) / (interval_bytes_new + 1),
(double)(interval_bytes_written + wal_bytes) /
(interval_bytes_new + 1),
(double)(interval_bytes_written + interval_bytes_read + wal_bytes) /
(interval_bytes_new + 1));
value->append(buf);
@ -284,15 +323,25 @@ bool InternalStats::GetProperty(const Slice& property, std::string* value,
last_stats_.write_self_ = write_self;
return true;
} else if (in == "sstables") {
}
case kSsTables:
*value = current->DebugString();
return true;
} else if (in == "num-immutable-mem-table") {
*value = std::to_string(immsize);
case kNumImmutableMemTable:
*value = std::to_string(imm.size());
return true;
}
case MemtableFlushPending:
// Return number of mem tables that are ready to flush (made immutable)
*value = std::to_string(imm.IsFlushPending() ? 1 : 0);
return true;
case CompactionPending:
// 1 if the system already determines at least one compacdtion is needed.
// 0 otherwise,
*value = std::to_string(current->NeedsCompaction() ? 1 : 0);
return true;
default:
return false;
}
}
} // namespace rocksdb

@ -17,6 +17,23 @@
#include <string>
namespace rocksdb {
class MemTableList;
enum DBPropertyType {
kNumFilesAtLevel, // Number of files at a specific level
kLevelStats, // Return number of files and total sizes of each level
kStats, // Return general statitistics of DB
kSsTables, // Return a human readable string of current SST files
kNumImmutableMemTable, // Return number of immutable mem tables
MemtableFlushPending, // Return 1 if mem table flushing is pending, otherwise
// 0.
CompactionPending, // Return 1 if a compaction is pending. Otherwise 0.
kUnknown,
};
extern DBPropertyType GetPropertyType(const Slice& property);
class InternalStats {
public:
enum WriteStallType {
@ -99,8 +116,9 @@ class InternalStats {
stall_leveln_slowdown_count_[level] += micros;
}
bool GetProperty(const Slice& property, std::string* value,
VersionSet* version_set, int immsize);
bool GetProperty(DBPropertyType property_type, const Slice& property,
std::string* value, VersionSet* version_set,
const MemTableList& imm);
private:
std::vector<CompactionStats> compaction_stats_;

@ -92,7 +92,7 @@ void MemTableListVersion::Remove(MemTable* m) {
// Returns true if there is at least one memtable on which flush has
// not yet started.
bool MemTableList::IsFlushPending() {
bool MemTableList::IsFlushPending() const {
if ((flush_requested_ && num_flush_not_started_ >= 1) ||
(num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
assert(imm_flush_needed.NoBarrier_Load() != nullptr);

@ -86,7 +86,7 @@ class MemTableList {
// Returns true if there is at least one memtable on which flush has
// not yet started.
bool IsFlushPending();
bool IsFlushPending() const;
// Returns the earliest memtables that needs to be flushed. The returned
// memtables are guaranteed to be in the ascending order of created time.

Loading…
Cancel
Save