Allow EventListener::OnCompactionCompleted to return CompactionJobStats.

Summary:
Allow EventListener::OnCompactionCompleted to return CompactionJobStats,
which contains useful information about a compaction.

Example CompactionJobStats returned by OnCompactionCompleted()
(field names match the new include/rocksdb/compaction_job_stats.h):
    smallest_output_key_prefix 05000000
    largest_output_key_prefix 06990000
    elapsed_micros 42419
    num_input_records 300
    num_input_files 3
    num_input_files_at_output_level 2
    num_output_records 200
    num_output_files 1
    total_input_bytes 167200
    total_output_bytes 110688
    total_input_raw_key_bytes 5400
    total_input_raw_value_bytes 300000
    num_records_replaced 100
    is_manual_compaction 1
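
For illustration, a minimal listener that consumes these stats could look like
the sketch below (the class name CompactionStatsLogger is hypothetical; the
callback and field names come from the headers changed in this diff):

    #include <cstdio>

    #include "rocksdb/compaction_job_stats.h"
    #include "rocksdb/db.h"
    #include "rocksdb/listener.h"

    // Hypothetical listener that logs a few CompactionJobStats fields
    // after each compaction completes.
    class CompactionStatsLogger : public rocksdb::EventListener {
     public:
      void OnCompactionCompleted(
          rocksdb::DB* db, const rocksdb::CompactionJobInfo& ci) override {
        const rocksdb::CompactionJobStats& s = ci.stats;
        std::fprintf(stderr,
                     "compaction job %d: %llu input records in %zu files -> "
                     "%llu output records in %zu files (manual=%d)\n",
                     ci.job_id,
                     static_cast<unsigned long long>(s.num_input_records),
                     s.num_input_files,
                     static_cast<unsigned long long>(s.num_output_records),
                     s.num_output_files,
                     s.is_manual_compaction ? 1 : 0);
      }
    };

Such a listener would be registered the same way as the checker in the new
test, e.g. options.listeners.emplace_back(new CompactionStatsLogger()).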

Test Plan: Developed a mega test in compaction_job_stats_test which covers 20 variables in CompactionJobStats.

Reviewers: rven, igor, anthony, sdong

Reviewed By: sdong

Subscribers: tnovak, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D38463
Branch: main
Author: Yueh-Hsuan Chiang
Parent: 3083ed2129
Commit: fe5c6321cb
Changed files (lines changed):

  Makefile                                |   6
  db/column_family.cc                     |   2
  db/column_family.h                      |   7
  db/compaction_job.cc                    | 122
  db/compaction_job.h                     |  10
  db/compaction_job_stats_test.cc         | 691
  db/compaction_job_test.cc               |  49
  db/db_impl.cc                           |  37
  db/db_impl.h                            |   4
  db/merge_helper.cc                      |   7
  include/rocksdb/compaction_job_stats.h  |  51
  include/rocksdb/iostats_context.h       |   6
  include/rocksdb/listener.h              |  15
  src.mk                                  |   2
  util/compaction_job_stats_impl.cc       |  44
  util/thread_status_updater.cc           |   9
  util/thread_status_updater.h            |   2
  util/thread_status_util.cc              |  14
  util/thread_status_util.h               |   2

--- a/Makefile
+++ b/Makefile
@@ -289,7 +289,8 @@ TESTS = \
 	compact_files_test \
 	perf_context_test \
 	optimistic_transaction_test \
-	write_callback_test
+	write_callback_test \
+	compaction_job_stats_test

 SUBSET := $(shell echo $(TESTS) |sed s/^.*$(ROCKSDBTESTS_START)/$(ROCKSDBTESTS_START)/)
@@ -714,6 +715,9 @@ flush_job_test: db/flush_job_test.o $(LIBOBJECTS) $(TESTHARNESS)
 compaction_job_test: db/compaction_job_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)

+compaction_job_stats_test: db/compaction_job_stats_test.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(AM_LINK)
+
 wal_manager_test: db/wal_manager_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)

--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -25,13 +25,13 @@
 #include "db/version_set.h"
 #include "db/writebuffer.h"
 #include "db/internal_stats.h"
+#include "db/job_context.h"
 #include "db/table_properties_collector.h"
 #include "db/version_set.h"
 #include "db/write_controller.h"
 #include "util/autovector.h"
 #include "util/hash_skiplist_rep.h"
 #include "util/options_helper.h"
+#include "util/thread_status_util.h"
 #include "util/xfunc.h"

 namespace rocksdb {

--- a/db/column_family.h
+++ b/db/column_family.h
@@ -14,15 +14,16 @@
 #include <vector>
 #include <atomic>

-#include "rocksdb/options.h"
-#include "rocksdb/db.h"
-#include "rocksdb/env.h"
 #include "db/memtable_list.h"
 #include "db/write_batch_internal.h"
 #include "db/write_controller.h"
 #include "db/table_cache.h"
 #include "db/table_properties_collector.h"
 #include "db/flush_scheduler.h"
+#include "rocksdb/compaction_job_stats.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/options.h"
 #include "util/instrumented_mutex.h"
 #include "util/mutable_cf_options.h"
 #include "util/thread_local.h"

--- a/db/compaction_job.cc
+++ b/db/compaction_job.cc
@@ -205,9 +205,11 @@ CompactionJob::CompactionJob(
     std::vector<SequenceNumber> existing_snapshots,
     std::shared_ptr<Cache> table_cache,
     std::function<uint64_t()> yield_callback, EventLogger* event_logger,
-    bool paranoid_file_checks, const std::string& dbname)
+    bool paranoid_file_checks, const std::string& dbname,
+    CompactionJobStats* compaction_job_stats)
     : job_id_(job_id),
       compact_(new CompactionState(compaction)),
+      compaction_job_stats_(compaction_job_stats),
       compaction_stats_(1),
       dbname_(dbname),
       db_options_(db_options),
@@ -248,11 +250,15 @@ void CompactionJob::ReportStartedCompaction(
       (static_cast<uint64_t>(compact_->compaction->start_level()) << 32) +
           compact_->compaction->output_level());

+  // In the current design, a CompactionJob is always created
+  // for non-trivial compaction.
+  assert(compaction->IsTrivialMove() == false ||
+         compaction->IsManualCompaction() == true);
+
   ThreadStatusUtil::SetThreadOperationProperty(
       ThreadStatus::COMPACTION_PROP_FLAGS,
       compaction->IsManualCompaction() +
-          (compaction->IsDeletionCompaction() << 1) +
-          (compaction->IsTrivialMove() << 2));
+          (compaction->IsDeletionCompaction() << 1));

   ThreadStatusUtil::SetThreadOperationProperty(
       ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES,
@@ -269,6 +275,11 @@ void CompactionJob::ReportStartedCompaction(
   // to ensure GetThreadList() can always show them all together.
   ThreadStatusUtil::SetThreadOperation(
       ThreadStatus::OP_COMPACTION);
+
+  if (compaction_job_stats_) {
+    compaction_job_stats_->is_manual_compaction =
+        compaction->IsManualCompaction();
+  }
 }

 void CompactionJob::Prepare() {
@@ -575,6 +586,8 @@ void CompactionJob::Install(Status* status,
       status->ToString().c_str(), stats.num_input_records,
       stats.num_dropped_records);

+  UpdateCompactionJobStats(stats);
+
   auto stream = event_logger_->LogToBuffer(log_buffer_);
   stream << "job" << job_id_ << "event"
          << "compaction_finished"
@@ -636,19 +649,8 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
          !cfd->IsDropped() && status.ok()) {
     compact_->num_input_records++;
     if (++loop_cnt > 1000) {
-      if (key_drop_user > 0) {
-        RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
-        key_drop_user = 0;
-      }
-      if (key_drop_newer_entry > 0) {
-        RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
-                   key_drop_newer_entry);
-        key_drop_newer_entry = 0;
-      }
-      if (key_drop_obsolete > 0) {
-        RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, key_drop_obsolete);
-        key_drop_obsolete = 0;
-      }
+      RecordDroppedKeys(
+          &key_drop_user, &key_drop_newer_entry, &key_drop_obsolete);
       RecordCompactionIOStats();
       loop_cnt = 0;
     }
@@ -680,6 +682,13 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
       ++combined_idx;
     }

+    if (compaction_job_stats_ != nullptr) {
+      compaction_job_stats_->total_input_raw_key_bytes +=
+          input->key().size();
+      compaction_job_stats_->total_input_raw_value_bytes +=
+          input->value().size();
+    }
+
     if (compact_->compaction->ShouldStopBefore(key) &&
         compact_->builder != nullptr) {
       status = FinishCompactionOutputFile(input);
@@ -922,20 +931,33 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
     }
   }
   RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time);
-  if (key_drop_user > 0) {
-    RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
-  }
-  if (key_drop_newer_entry > 0) {
-    RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY, key_drop_newer_entry);
-  }
-  if (key_drop_obsolete > 0) {
-    RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, key_drop_obsolete);
-  }
+  RecordDroppedKeys(&key_drop_user, &key_drop_newer_entry, &key_drop_obsolete);
   RecordCompactionIOStats();

   return status;
 }

+void CompactionJob::RecordDroppedKeys(
+    int64_t* key_drop_user,
+    int64_t* key_drop_newer_entry,
+    int64_t* key_drop_obsolete) {
+  if (*key_drop_user > 0) {
+    RecordTick(stats_, COMPACTION_KEY_DROP_USER, *key_drop_user);
+    *key_drop_user = 0;
+  }
+  if (*key_drop_newer_entry > 0) {
+    RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY, *key_drop_newer_entry);
+    if (compaction_job_stats_) {
+      compaction_job_stats_->num_records_replaced += *key_drop_newer_entry;
+    }
+    *key_drop_newer_entry = 0;
+  }
+  if (*key_drop_obsolete > 0) {
+    RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, *key_drop_obsolete);
+    *key_drop_obsolete = 0;
+  }
+}
+
 void CompactionJob::CallCompactionFilterV2(
     CompactionFilterV2* compaction_filter_v2, uint64_t* time) {
   if (compact_ == nullptr || compaction_filter_v2 == nullptr) {
@@ -1227,4 +1249,52 @@ void CompactionJob::CleanupCompaction(const Status& status) {
   compact_ = nullptr;
 }

+#ifndef ROCKSDB_LITE
+namespace {
+void CopyPrefix(
+    char* dst, size_t dst_length, const Slice& src) {
+  assert(dst_length > 0);
+  size_t length = src.size() > dst_length - 1 ? dst_length - 1 : src.size();
+  memcpy(dst, src.data(), length);
+  dst[length] = 0;
+}
+}  // namespace
+#endif  // !ROCKSDB_LITE
+
+void CompactionJob::UpdateCompactionJobStats(
+    const InternalStats::CompactionStats& stats) const {
+#ifndef ROCKSDB_LITE
+  if (compaction_job_stats_) {
+    compaction_job_stats_->elapsed_micros = stats.micros;
+
+    // input information
+    compaction_job_stats_->total_input_bytes =
+        stats.bytes_readn + stats.bytes_readnp1;
+    compaction_job_stats_->num_input_records = compact_->num_input_records;
+    compaction_job_stats_->num_input_files =
+        stats.files_in_leveln + stats.files_in_levelnp1;
+    compaction_job_stats_->num_input_files_at_output_level =
+        stats.files_in_levelnp1;
+
+    // output information
+    compaction_job_stats_->total_output_bytes = stats.bytes_written;
+    compaction_job_stats_->num_output_records =
+        compact_->num_output_records;
+    compaction_job_stats_->num_output_files = stats.files_out_levelnp1;
+
+    if (compact_->outputs.size() > 0U) {
+      CopyPrefix(
+          compaction_job_stats_->smallest_output_key_prefix,
+          sizeof(compaction_job_stats_->smallest_output_key_prefix),
+          compact_->outputs[0].smallest.user_key().ToString());
+      CopyPrefix(
+          compaction_job_stats_->largest_output_key_prefix,
+          sizeof(compaction_job_stats_->largest_output_key_prefix),
+          compact_->current_output()->largest.user_key().ToString());
+    }
+  }
+#endif  // !ROCKSDB_LITE
+}
+
 }  // namespace rocksdb

--- a/db/compaction_job.h
+++ b/db/compaction_job.h
@@ -27,6 +27,7 @@
 #include "rocksdb/env.h"
 #include "rocksdb/memtablerep.h"
 #include "rocksdb/compaction_filter.h"
+#include "rocksdb/compaction_job_stats.h"
 #include "rocksdb/transaction_log.h"
 #include "util/autovector.h"
 #include "util/event_logger.h"
@@ -59,7 +60,8 @@ class CompactionJob {
                 std::shared_ptr<Cache> table_cache,
                 std::function<uint64_t()> yield_callback,
                 EventLogger* event_logger, bool paranoid_file_checks,
-                const std::string& dbname);
+                const std::string& dbname,
+                CompactionJobStats* compaction_job_stats);

   ~CompactionJob();
@@ -97,12 +99,18 @@ class CompactionJob {
   void RecordCompactionIOStats();
   Status OpenCompactionOutputFile();
   void CleanupCompaction(const Status& status);
+  void UpdateCompactionJobStats(
+      const InternalStats::CompactionStats& stats) const;
+  void RecordDroppedKeys(int64_t* key_drop_user,
+                         int64_t* key_drop_newer_entry,
+                         int64_t* key_drop_obsolete);

   int job_id_;

   // CompactionJob state
   struct CompactionState;
   CompactionState* compact_;
+  CompactionJobStats* compaction_job_stats_;
   bool bottommost_level_;
   SequenceNumber earliest_snapshot_;

new file: db/compaction_job_stats_test.cc
@@ -0,0 +1,691 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <algorithm>
#include <iostream>
#include <mutex>
#include <queue>
#include <set>
#include <thread>
#include <unordered_set>
#include <utility>
#include "db/db_impl.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/job_context.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "port/stack_trace.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/experimental.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/thread_status.h"
#include "rocksdb/utilities/checkpoint.h"
#include "rocksdb/utilities/convenience.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "table/block_based_table_factory.h"
#include "table/mock_table.h"
#include "table/plain_table_factory.h"
#include "util/compression.h"
#include "util/hash.h"
#include "util/hash_linklist_rep.h"
#include "util/logging.h"
#include "util/mock_env.h"
#include "util/mutexlock.h"
#include "util/rate_limiter.h"
#include "util/scoped_arena_iterator.h"
#include "util/statistics.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "util/thread_status_util.h"
#include "util/xfunc.h"
#include "utilities/merge_operators.h"
#if !defined(IOS_CROSS_COMPILE)
#ifndef ROCKSDB_LITE
namespace rocksdb {
static std::string RandomString(Random* rnd, int len, double ratio) {
std::string r;
test::CompressibleString(rnd, ratio, len, &r);
return r;
}
std::string Key(uint64_t key, int length) {
const int kBufSize = 1000;
char buf[kBufSize];
if (length > kBufSize) {
length = kBufSize;
}
snprintf(buf, kBufSize, "%0*lu", length, key);
return std::string(buf);
}
class CompactionJobStatsTest : public testing::Test {
public:
std::string dbname_;
std::string alternative_wal_dir_;
Env* env_;
DB* db_;
std::vector<ColumnFamilyHandle*> handles_;
Options last_options_;
CompactionJobStatsTest() : env_(Env::Default()) {
env_->SetBackgroundThreads(1, Env::LOW);
env_->SetBackgroundThreads(1, Env::HIGH);
dbname_ = test::TmpDir(env_) + "/compaction_job_stats_test";
alternative_wal_dir_ = dbname_ + "/wal";
Options options;
options.create_if_missing = true;
auto delete_options = options;
delete_options.wal_dir = alternative_wal_dir_;
EXPECT_OK(DestroyDB(dbname_, delete_options));
// Destroy the DB again in case the alternative WAL dir was not used.
EXPECT_OK(DestroyDB(dbname_, options));
db_ = nullptr;
Reopen(options);
}
~CompactionJobStatsTest() {
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->LoadDependency({});
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
Close();
Options options;
options.db_paths.emplace_back(dbname_, 0);
options.db_paths.emplace_back(dbname_ + "_2", 0);
options.db_paths.emplace_back(dbname_ + "_3", 0);
options.db_paths.emplace_back(dbname_ + "_4", 0);
EXPECT_OK(DestroyDB(dbname_, options));
}
DBImpl* dbfull() {
return reinterpret_cast<DBImpl*>(db_);
}
void CreateColumnFamilies(const std::vector<std::string>& cfs,
const Options& options) {
ColumnFamilyOptions cf_opts(options);
size_t cfi = handles_.size();
handles_.resize(cfi + cfs.size());
for (auto cf : cfs) {
ASSERT_OK(db_->CreateColumnFamily(cf_opts, cf, &handles_[cfi++]));
}
}
void CreateAndReopenWithCF(const std::vector<std::string>& cfs,
const Options& options) {
CreateColumnFamilies(cfs, options);
std::vector<std::string> cfs_plus_default = cfs;
cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
ReopenWithColumnFamilies(cfs_plus_default, options);
}
void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
const std::vector<Options>& options) {
ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
}
void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
const Options& options) {
ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
}
Status TryReopenWithColumnFamilies(
const std::vector<std::string>& cfs,
const std::vector<Options>& options) {
Close();
EXPECT_EQ(cfs.size(), options.size());
std::vector<ColumnFamilyDescriptor> column_families;
for (size_t i = 0; i < cfs.size(); ++i) {
column_families.push_back(ColumnFamilyDescriptor(cfs[i], options[i]));
}
DBOptions db_opts = DBOptions(options[0]);
return DB::Open(db_opts, dbname_, column_families, &handles_, &db_);
}
Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
const Options& options) {
Close();
std::vector<Options> v_opts(cfs.size(), options);
return TryReopenWithColumnFamilies(cfs, v_opts);
}
void Reopen(const Options& options) {
ASSERT_OK(TryReopen(options));
}
void Close() {
for (auto h : handles_) {
delete h;
}
handles_.clear();
delete db_;
db_ = nullptr;
}
void DestroyAndReopen(const Options& options) {
// Destroy using last options
Destroy(last_options_);
ASSERT_OK(TryReopen(options));
}
void Destroy(const Options& options) {
Close();
ASSERT_OK(DestroyDB(dbname_, options));
}
Status ReadOnlyReopen(const Options& options) {
return DB::OpenForReadOnly(options, dbname_, &db_);
}
Status TryReopen(const Options& options) {
Close();
last_options_ = options;
return DB::Open(options, dbname_, &db_);
}
Status Flush(int cf = 0) {
if (cf == 0) {
return db_->Flush(FlushOptions());
} else {
return db_->Flush(FlushOptions(), handles_[cf]);
}
}
Status Put(const Slice& k, const Slice& v, WriteOptions wo = WriteOptions()) {
return db_->Put(wo, k, v);
}
Status Put(int cf, const Slice& k, const Slice& v,
WriteOptions wo = WriteOptions()) {
return db_->Put(wo, handles_[cf], k, v);
}
Status Delete(const std::string& k) {
return db_->Delete(WriteOptions(), k);
}
Status Delete(int cf, const std::string& k) {
return db_->Delete(WriteOptions(), handles_[cf], k);
}
std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) {
ReadOptions options;
options.verify_checksums = true;
options.snapshot = snapshot;
std::string result;
Status s = db_->Get(options, k, &result);
if (s.IsNotFound()) {
result = "NOT_FOUND";
} else if (!s.ok()) {
result = s.ToString();
}
return result;
}
std::string Get(int cf, const std::string& k,
const Snapshot* snapshot = nullptr) {
ReadOptions options;
options.verify_checksums = true;
options.snapshot = snapshot;
std::string result;
Status s = db_->Get(options, handles_[cf], k, &result);
if (s.IsNotFound()) {
result = "NOT_FOUND";
} else if (!s.ok()) {
result = s.ToString();
}
return result;
}
int NumTableFilesAtLevel(int level, int cf = 0) {
std::string property;
if (cf == 0) {
// default cfd
EXPECT_TRUE(db_->GetProperty(
"rocksdb.num-files-at-level" + NumberToString(level), &property));
} else {
EXPECT_TRUE(db_->GetProperty(
handles_[cf], "rocksdb.num-files-at-level" + NumberToString(level),
&property));
}
return atoi(property.c_str());
}
// Return spread of files per level
std::string FilesPerLevel(int cf = 0) {
int num_levels =
(cf == 0) ? db_->NumberLevels() : db_->NumberLevels(handles_[1]);
std::string result;
size_t last_non_zero_offset = 0;
for (int level = 0; level < num_levels; level++) {
int f = NumTableFilesAtLevel(level, cf);
char buf[100];
snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
result += buf;
if (f > 0) {
last_non_zero_offset = result.size();
}
}
result.resize(last_non_zero_offset);
return result;
}
uint64_t Size(const Slice& start, const Slice& limit, int cf = 0) {
Range r(start, limit);
uint64_t size;
if (cf == 0) {
db_->GetApproximateSizes(&r, 1, &size);
} else {
db_->GetApproximateSizes(handles_[1], &r, 1, &size);
}
return size;
}
void Compact(int cf, const Slice& start, const Slice& limit,
uint32_t target_path_id) {
ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit, false, -1,
target_path_id));
}
void Compact(int cf, const Slice& start, const Slice& limit) {
ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit));
}
void Compact(const Slice& start, const Slice& limit) {
ASSERT_OK(db_->CompactRange(&start, &limit));
}
// Do n memtable compactions, each of which produces an sstable
// covering the range [small,large].
void MakeTables(int n, const std::string& small, const std::string& large,
int cf = 0) {
for (int i = 0; i < n; i++) {
ASSERT_OK(Put(cf, small, "begin"));
ASSERT_OK(Put(cf, large, "end"));
ASSERT_OK(Flush(cf));
}
}
void MakeTableWithKeyValues(
Random* rnd, uint64_t smallest, uint64_t largest,
int key_size, int value_size, uint64_t interval,
double ratio, int cf = 0) {
for (auto key = smallest; key < largest; key += interval) {
ASSERT_OK(Put(cf, Slice(Key(key, key_size)),
Slice(RandomString(rnd, value_size, ratio))));
}
ASSERT_OK(Flush(cf));
}
};
// An EventListener which helps verify the compaction results in
// test CompactionJobStatsTest.
class CompactionJobStatsChecker : public EventListener {
public:
CompactionJobStatsChecker() : compression_enabled_(false) {}
size_t NumberOfUnverifiedStats() { return expected_stats_.size(); }
// Once a compaction completes, this function verifies the returned
// CompactionJobStats against the oldest CompactionJobStats added earlier
// to "expected_stats_" which has not yet been used for verification.
virtual void OnCompactionCompleted(DB *db, const CompactionJobInfo& ci) {
std::lock_guard<std::mutex> lock(mutex_);
if (expected_stats_.size()) {
Verify(ci.stats, expected_stats_.front());
expected_stats_.pop();
}
}
// A helper function which verifies whether two CompactionJobStats
// match.  The verification of all compaction stats is done by
// ASSERT_EQ except for the following stats, for which we use ASSERT_GE
// and ASSERT_LE with a reasonable (< 15%) bias:
// 1. write amplification
// 2. actual bytes input and output, which depend on the compression
//    ratio and the implementation of table formats.
void Verify(const CompactionJobStats& current_stats,
const CompactionJobStats& stats) {
// time
ASSERT_GT(current_stats.elapsed_micros, 0U);
ASSERT_EQ(current_stats.num_input_records,
stats.num_input_records);
ASSERT_EQ(current_stats.num_input_files,
stats.num_input_files);
ASSERT_EQ(current_stats.num_input_files_at_output_level,
stats.num_input_files_at_output_level);
ASSERT_EQ(current_stats.num_output_records,
stats.num_output_records);
ASSERT_EQ(current_stats.num_output_files,
stats.num_output_files);
ASSERT_EQ(current_stats.is_manual_compaction,
stats.is_manual_compaction);
// file size
double kFileSizeBias = 0.15;
ASSERT_GE(current_stats.total_input_bytes * (1.00 + kFileSizeBias),
stats.total_input_bytes);
ASSERT_LE(current_stats.total_input_bytes,
stats.total_input_bytes * (1.00 + kFileSizeBias));
ASSERT_GE(current_stats.total_output_bytes * (1.00 + kFileSizeBias),
stats.total_output_bytes);
ASSERT_LE(current_stats.total_output_bytes,
stats.total_output_bytes * (1.00 + kFileSizeBias));
ASSERT_EQ(current_stats.total_input_raw_key_bytes,
stats.total_input_raw_key_bytes);
ASSERT_EQ(current_stats.total_input_raw_value_bytes,
stats.total_input_raw_value_bytes);
ASSERT_EQ(current_stats.num_records_replaced,
stats.num_records_replaced);
ASSERT_EQ(
std::string(current_stats.smallest_output_key_prefix),
std::string(stats.smallest_output_key_prefix));
ASSERT_EQ(
std::string(current_stats.largest_output_key_prefix),
std::string(stats.largest_output_key_prefix));
}
// Add an expected compaction stats, which will be used to
// verify the CompactionJobStats returned by the OnCompactionCompleted()
// callback.
void AddExpectedStats(const CompactionJobStats& stats) {
std::lock_guard<std::mutex> lock(mutex_);
expected_stats_.push(stats);
}
void EnableCompression(bool flag) {
compression_enabled_ = flag;
}
private:
std::mutex mutex_;
std::queue<CompactionJobStats> expected_stats_;
bool compression_enabled_;
};
namespace {
uint64_t EstimatedFileSize(
uint64_t num_records, size_t key_size, size_t value_size,
double compression_ratio = 1.0,
size_t block_size = 4096,
int bloom_bits_per_key = 10) {
const size_t kPerKeyOverhead = 8;
const size_t kFooterSize = 512;
uint64_t data_size =
num_records * (key_size + value_size * compression_ratio +
kPerKeyOverhead);
return data_size + kFooterSize
+ num_records * bloom_bits_per_key / 8 // filter block
+ data_size * (key_size + 8) / block_size; // index block
}
namespace {
void CopyPrefix(
char* dst, size_t dst_length, const Slice& src) {
assert(dst_length > 0);
size_t length = src.size() > dst_length - 1 ? dst_length - 1 : src.size();
memcpy(dst, src.data(), length);
dst[length] = 0;
}
} // namespace
CompactionJobStats NewManualCompactionJobStats(
const std::string& smallest_key, const std::string& largest_key,
size_t num_input_files, size_t num_input_files_at_output_level,
uint64_t num_input_records, size_t key_size, size_t value_size,
size_t num_output_files, uint64_t num_output_records,
double compression_ratio, uint64_t num_records_replaced) {
CompactionJobStats stats;
stats.Reset();
stats.num_input_records = num_input_records;
stats.num_input_files = num_input_files;
stats.num_input_files_at_output_level = num_input_files_at_output_level;
stats.num_output_records = num_output_records;
stats.num_output_files = num_output_files;
stats.total_input_bytes =
EstimatedFileSize(
num_input_records / num_input_files,
key_size, value_size, compression_ratio) * num_input_files;
stats.total_output_bytes =
EstimatedFileSize(
num_output_records / num_output_files,
key_size, value_size, compression_ratio) * num_output_files;
stats.total_input_raw_key_bytes =
num_input_records * (key_size + 8);
stats.total_input_raw_value_bytes =
num_input_records * value_size;
stats.is_manual_compaction = true;
stats.num_records_replaced = num_records_replaced;
CopyPrefix(stats.smallest_output_key_prefix,
sizeof(stats.smallest_output_key_prefix),
smallest_key);
CopyPrefix(stats.largest_output_key_prefix,
sizeof(stats.largest_output_key_prefix),
largest_key);
return stats;
}
CompressionType GetAnyCompression() {
if (Snappy_Supported()) {
return kSnappyCompression;
} else if (Zlib_Supported()) {
return kZlibCompression;
} else if (BZip2_Supported()) {
return kBZip2Compression;
} else if (LZ4_Supported()) {
return kLZ4Compression;
}
return kNoCompression;
}
} // namespace
TEST_F(CompactionJobStatsTest, CompactionJobStatsTest) {
Random rnd(301);
const int kBufSize = 100;
char buf[kBufSize];
uint64_t key_base = 100000000l;
// Note: key_base must be multiple of num_keys_per_L0_file
int num_keys_per_L0_file = 100;
const int kTestScale = 8;
const int kKeySize = 10;
const int kValueSize = 1000;
const double kCompressionRatio = 0.5;
double compression_ratio = 1.0;
uint64_t key_interval = key_base / num_keys_per_L0_file;
// Whenever a compaction completes, this listener will try to
// verify whether the returned CompactionJobStats matches
// what we expect. The expected CompactionJobStats is added
// via AddExpectedStats().
auto* stats_checker = new CompactionJobStatsChecker();
Options options;
options.listeners.emplace_back(stats_checker);
options.create_if_missing = true;
options.max_background_flushes = 0;
options.max_mem_compaction_level = 0;
// just enough settings to hold off auto-compaction.
options.level0_file_num_compaction_trigger = kTestScale + 1;
options.num_levels = 3;
options.compression = kNoCompression;
for (int test = 0; test < 2; ++test) {
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
// 1st Phase: generate "num_L0_files" L0 files.
int num_L0_files = 0;
for (uint64_t start_key = key_base;
start_key <= key_base * kTestScale;
start_key += key_base) {
MakeTableWithKeyValues(
&rnd, start_key, start_key + key_base - 1,
kKeySize, kValueSize, key_interval,
kCompressionRatio, 1);
snprintf(buf, kBufSize, "%d", ++num_L0_files);
ASSERT_EQ(std::string(buf), FilesPerLevel(1));
}
ASSERT_EQ(std::to_string(num_L0_files), FilesPerLevel(1));
// 2nd Phase: perform L0 -> L1 compaction.
int L0_compaction_count = 6;
int count = 1;
std::string smallest_key;
std::string largest_key;
for (uint64_t start_key = key_base;
start_key <= key_base * L0_compaction_count;
start_key += key_base, count++) {
smallest_key = Key(start_key, 10);
largest_key = Key(start_key + key_base - key_interval, 10);
stats_checker->AddExpectedStats(
NewManualCompactionJobStats(
smallest_key, largest_key,
1, 0, num_keys_per_L0_file,
kKeySize, kValueSize,
1, num_keys_per_L0_file,
compression_ratio, 0));
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
Compact(1, smallest_key, largest_key);
snprintf(buf, kBufSize, "%d,%d", num_L0_files - count, count);
ASSERT_EQ(std::string(buf), FilesPerLevel(1));
}
// compact two files into one in the last L0 -> L1 compaction
int num_remaining_L0 = num_L0_files - L0_compaction_count;
smallest_key = Key(key_base * (L0_compaction_count + 1), 10);
largest_key = Key(key_base * (kTestScale + 1) - key_interval, 10);
stats_checker->AddExpectedStats(
NewManualCompactionJobStats(
smallest_key, largest_key,
num_remaining_L0,
0, num_keys_per_L0_file * num_remaining_L0,
kKeySize, kValueSize,
1, num_keys_per_L0_file * num_remaining_L0,
compression_ratio, 0));
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
Compact(1, smallest_key, largest_key);
int num_L1_files = num_L0_files - num_remaining_L0 + 1;
num_L0_files = 0;
snprintf(buf, kBufSize, "%d,%d", num_L0_files, num_L1_files);
ASSERT_EQ(std::string(buf), FilesPerLevel(1));
// 3rd Phase: generate sparse L0 files (wider key-range, same num of keys)
int sparseness = 2;
for (uint64_t start_key = key_base;
start_key <= key_base * kTestScale;
start_key += key_base * sparseness) {
MakeTableWithKeyValues(
&rnd, start_key, start_key + key_base * sparseness - 1,
kKeySize, kValueSize,
key_base * sparseness / num_keys_per_L0_file,
kCompressionRatio, 1);
snprintf(buf, kBufSize, "%d,%d", ++num_L0_files, num_L1_files);
ASSERT_EQ(std::string(buf), FilesPerLevel(1));
}
// 4th Phase: perform L0 -> L1 compaction again, expect higher write amp
for (uint64_t start_key = key_base;
num_L0_files > 1;
start_key += key_base * sparseness) {
smallest_key = Key(start_key, 10);
largest_key =
Key(start_key + key_base * sparseness - key_interval, 10);
stats_checker->AddExpectedStats(
NewManualCompactionJobStats(
smallest_key, largest_key,
3, 2, num_keys_per_L0_file * 3,
kKeySize, kValueSize,
1, num_keys_per_L0_file * 2, // 1/3 of the data will be updated.
compression_ratio,
num_keys_per_L0_file));
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
Compact(1, smallest_key, largest_key);
snprintf(buf, kBufSize, "%d,%d",
--num_L0_files, --num_L1_files);
ASSERT_EQ(std::string(buf), FilesPerLevel(1));
}
// 5th Phase: Do a full compaction, which involves two sub-compactions.
// Here we expect to have 1 L0 file and 4 L1 files.
// In the first sub-compaction, we expect an L0 compaction.
smallest_key = Key(key_base, 10);
largest_key = Key(key_base * (kTestScale + 1) - key_interval, 10);
stats_checker->AddExpectedStats(
NewManualCompactionJobStats(
Key(key_base * (kTestScale + 1 - sparseness), 10), largest_key,
2, 1, num_keys_per_L0_file * 3,
kKeySize, kValueSize,
1, num_keys_per_L0_file * 2,
compression_ratio,
num_keys_per_L0_file));
// In the second sub-compaction, we expect L1 compaction.
stats_checker->AddExpectedStats(
NewManualCompactionJobStats(
smallest_key, largest_key,
4, 0, num_keys_per_L0_file * 8,
kKeySize, kValueSize,
1, num_keys_per_L0_file * 8,
compression_ratio, 0));
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 2U);
Compact(1, smallest_key, largest_key);
ASSERT_EQ("0,1", FilesPerLevel(1));
options.compression = GetAnyCompression();
if (options.compression == kNoCompression) {
break;
}
stats_checker->EnableCompression(true);
compression_ratio = kCompressionRatio;
}
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
}
} // namespace rocksdb
int main(int argc, char** argv) {
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#endif // !ROCKSDB_LITE
#endif // !defined(IOS_CROSS_COMPILE)

--- a/db/compaction_job_test.cc
+++ b/db/compaction_job_test.cc
@@ -138,6 +138,45 @@ class CompactionJobTest : public testing::Test {
   std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
 };

+namespace {
+void VerifyInitializationOfCompactionJobStats(
+    const CompactionJobStats& compaction_job_stats) {
+#if !defined(IOS_CROSS_COMPILE)
+  ASSERT_EQ(compaction_job_stats.elapsed_micros, 0U);
+
+  ASSERT_EQ(compaction_job_stats.num_input_records, 0U);
+  ASSERT_EQ(compaction_job_stats.num_input_files, 0U);
+  ASSERT_EQ(compaction_job_stats.num_input_files_at_output_level, 0U);
+
+  ASSERT_EQ(compaction_job_stats.num_output_records, 0U);
+  ASSERT_EQ(compaction_job_stats.num_output_files, 0U);
+
+  ASSERT_EQ(compaction_job_stats.is_manual_compaction, 0U);
+
+  ASSERT_EQ(compaction_job_stats.total_input_bytes, 0U);
+  ASSERT_EQ(compaction_job_stats.total_output_bytes, 0U);
+
+  ASSERT_EQ(compaction_job_stats.total_input_raw_key_bytes, 0U);
+  ASSERT_EQ(compaction_job_stats.total_input_raw_value_bytes, 0U);
+
+  ASSERT_EQ(compaction_job_stats.smallest_output_key_prefix[0], 0);
+  ASSERT_EQ(compaction_job_stats.largest_output_key_prefix[0], 0);
+
+  ASSERT_EQ(compaction_job_stats.num_records_replaced, 0U);
+#endif  // !defined(IOS_CROSS_COMPILE)
+}
+
+void VerifyCompactionJobStats(
+    const CompactionJobStats& compaction_job_stats,
+    const std::vector<FileMetaData*>& files,
+    size_t num_output_files,
+    uint64_t min_elapsed_time) {
+  ASSERT_GE(compaction_job_stats.elapsed_micros, min_elapsed_time);
+  ASSERT_EQ(compaction_job_stats.num_input_files, files.size());
+  ASSERT_EQ(compaction_job_stats.num_output_files, num_output_files);
+}
+}  // namespace
+
 TEST_F(CompactionJobTest, Simple) {
   auto cfd = versions_->GetColumnFamilySet()->GetDefault();
@@ -164,11 +203,15 @@ TEST_F(CompactionJobTest, Simple) {
   mutex_.Lock();
   EventLogger event_logger(db_options_.info_log.get());
   std::string db_name = "dbname";
+  CompactionJobStats compaction_job_stats;
   CompactionJob compaction_job(0, compaction.get(), db_options_, env_options_,
                                versions_.get(), &shutting_down_, &log_buffer,
                                nullptr, nullptr, nullptr, {}, table_cache_,
                                std::move(yield_callback), &event_logger, false,
-                               db_name);
+                               db_name, &compaction_job_stats);
+
+  auto start_micros = Env::Default()->NowMicros();
+  VerifyInitializationOfCompactionJobStats(compaction_job_stats);

   compaction_job.Prepare();
   mutex_.Unlock();
@@ -179,6 +222,10 @@ TEST_F(CompactionJobTest, Simple) {
   ASSERT_OK(s);
   mutex_.Unlock();

+  VerifyCompactionJobStats(
+      compaction_job_stats,
+      files, 1, (Env::Default()->NowMicros() - start_micros) / 2);
+
   mock_table_factory_->AssertLatestFile(expected_results);
   ASSERT_EQ(yield_callback_called, 20000);
 }

--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -1530,8 +1530,20 @@ Status DBImpl::CompactFilesImpl(
       &shutting_down_, log_buffer, directories_.GetDbDir(),
       directories_.GetDataDir(c->GetOutputPathId()), stats_,
       snapshots_.GetAll(), table_cache_, std::move(yield_callback),
-      &event_logger_, c->mutable_cf_options()->paranoid_file_checks,
-      dbname_);
+      &event_logger_, c->mutable_cf_options()->paranoid_file_checks, dbname_,
+      nullptr);  // Here we pass a nullptr for CompactionJobStats because
+                 // CompactFiles does not trigger OnCompactionCompleted(),
+                 // which is the only place where CompactionJobStats is
+                 // returned.  The idea of not triggering
+                 // OnCompactionCompleted() is that CompactFiles runs in the
+                 // caller thread, so the user should always know when it
+                 // completes.  As a result, it makes less sense to notify
+                 // users of something they should already know.
+                 //
+                 // In the future, if we would like to add CompactionJobStats
+                 // support for CompactFiles, we should have the CompactFiles
+                 // API pass a pointer of CompactionJobStats as the out-value
+                 // instead of using EventListener.
   compaction_job.Prepare();

   mutex_.Unlock();
@@ -1570,7 +1582,9 @@ Status DBImpl::CompactFilesImpl(
 #endif  // ROCKSDB_LITE

 void DBImpl::NotifyOnCompactionCompleted(
-    ColumnFamilyData* cfd, Compaction *c, const Status &st) {
+    ColumnFamilyData* cfd, Compaction *c, const Status &st,
+    const CompactionJobStats& compaction_job_stats,
+    const uint64_t job_id) {
 #ifndef ROCKSDB_LITE
   if (db_options_.listeners.size() == 0U) {
     return;
@@ -1585,7 +1599,11 @@ void DBImpl::NotifyOnCompactionCompleted(
     CompactionJobInfo info;
     info.cf_name = cfd->GetName();
     info.status = st;
+    info.thread_id = ThreadStatusUtil::GetThreadID();
+    info.job_id = job_id;
+    info.base_input_level = c->start_level();
     info.output_level = c->output_level();
+    info.stats = compaction_job_stats;
     for (size_t i = 0; i < c->num_input_levels(); ++i) {
       for (const auto fmd : *c->inputs(i)) {
         info.input_files.push_back(
@@ -2246,6 +2264,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
   bool is_manual = (manual_compaction_ != nullptr) &&
                    (manual_compaction_->in_progress == false);

+  CompactionJobStats compaction_job_stats;
   Status status = bg_error_;
   if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
     status = Status::ShutdownInProgress();
@@ -2389,6 +2408,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
     assert(c->level() == 0);
     assert(c->column_family_data()->ioptions()->compaction_style ==
            kCompactionStyleFIFO);
+
+    compaction_job_stats.num_input_files = c->num_input_files(0);
+
     for (const auto& f : *c->inputs(0)) {
       c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
     }
@@ -2408,6 +2430,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
     ThreadStatusUtil::SetColumnFamily(c->column_family_data());
     ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);

+    compaction_job_stats.num_input_files = c->num_input_files(0);
+
     // Move file to next level
     assert(c->num_input_files(0) == 1);
     FileMetaData* f = c->input(0, 0);
@@ -2456,7 +2480,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
         directories_.GetDataDir(c->GetOutputPathId()), stats_,
         snapshots_.GetAll(), table_cache_, std::move(yield_callback),
         &event_logger_, c->mutable_cf_options()->paranoid_file_checks,
-        dbname_);
+        dbname_, &compaction_job_stats);
     compaction_job.Prepare();

     mutex_.Unlock();
@@ -2470,9 +2494,10 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
     }
     *madeProgress = true;
   }
-  // FIXME(orib): should I check if column family data is null?
   if (c != nullptr) {
-    NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status);
+    NotifyOnCompactionCompleted(
+        c->column_family_data(), c.get(), status,
+        compaction_job_stats, job_context->job_id);
     c->ReleaseCompactionFiles(status);
     *madeProgress = true;
   }

--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -351,7 +351,9 @@ class DBImpl : public DB {
                                  const MutableCFOptions& mutable_cf_options);

   void NotifyOnCompactionCompleted(ColumnFamilyData* cfd,
-                                   Compaction *c, const Status &st);
+                                   Compaction *c, const Status &st,
+                                   const CompactionJobStats& job_stats,
+                                   uint64_t job_id);

   void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const;

--- a/db/merge_helper.cc
+++ b/db/merge_helper.cc
@@ -3,14 +3,16 @@
 // LICENSE file in the root directory of this source tree. An additional grant
 // of patent rights can be found in the PATENTS file in the same directory.
 //
+#include <stdio.h>
+#include <string>
+
 #include "merge_helper.h"
 #include "db/dbformat.h"
 #include "rocksdb/comparator.h"
 #include "rocksdb/db.h"
 #include "rocksdb/merge_operator.h"
 #include "util/statistics.h"
-#include <string>
-#include <stdio.h>
 #include "util/perf_context_imp.h"
 #include "util/stop_watch.h"
@@ -222,7 +224,6 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
         UpdateInternalKey(&original_key[0], original_key.size(),
                           orig_ikey.sequence, orig_ikey.type);

-        // The final value() is always stored in operands_.back()
         swap(operands_.back(), merge_result);
       } else {
         RecordTick(stats, NUMBER_MERGE_FAILURES);

new file: include/rocksdb/compaction_job_stats.h
@@ -0,0 +1,51 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#pragma once

#include <stddef.h>
#include <stdint.h>

namespace rocksdb {

struct CompactionJobStats {
  CompactionJobStats() { Reset(); }
  void Reset();

  // the elapsed time of this compaction in microseconds.
  uint64_t elapsed_micros;

  // the number of compaction input records.
  uint64_t num_input_records;
  // the number of compaction input files.
  size_t num_input_files;
  // the number of compaction input files at the output level.
  size_t num_input_files_at_output_level;

  // the number of compaction output records.
  uint64_t num_output_records;
  // the number of compaction output files.
  size_t num_output_files;

  // true if the compaction is a manual compaction.
  bool is_manual_compaction;

  // the size of the compaction input in bytes.
  uint64_t total_input_bytes;
  // the size of the compaction output in bytes.
  uint64_t total_output_bytes;

  // the number of records replaced by a newer record with the same key.
  uint64_t num_records_replaced;

  // the sum of the uncompressed input keys in bytes.
  uint64_t total_input_raw_key_bytes;
  // the sum of the uncompressed input values in bytes.
  uint64_t total_input_raw_value_bytes;

  // 0-terminated strings storing the first 8 bytes of the smallest and
  // largest key in the output.
  char smallest_output_key_prefix[9];
  char largest_output_key_prefix[9];
};

}  // namespace rocksdb

--- a/include/rocksdb/iostats_context.h
+++ b/include/rocksdb/iostats_context.h
@@ -2,9 +2,7 @@
 // This source code is licensed under the BSD-style license found in the
 // LICENSE file in the root directory of this source tree. An additional grant
 // of patent rights can be found in the PATENTS file in the same directory.
-
-#ifndef INCLUDE_ROCKSDB_IOSTATS_CONTEXT_H_
-#define INCLUDE_ROCKSDB_IOSTATS_CONTEXT_H_
+#pragma once

 #include <stdint.h>
 #include <string>
@@ -48,5 +46,3 @@ extern __thread IOStatsContext iostats_context;
 #endif  // IOS_CROSS_COMPILE

 }  // namespace rocksdb
-
-#endif  // INCLUDE_ROCKSDB_IOSTATS_CONTEXT_H_

--- a/include/rocksdb/listener.h
+++ b/include/rocksdb/listener.h
@@ -7,6 +7,7 @@
 #include <string>
 #include <vector>
+#include "rocksdb/compaction_job_stats.h"
 #include "rocksdb/status.h"
 #include "rocksdb/table_properties.h"
@@ -14,6 +15,7 @@ namespace rocksdb {

 class DB;
 class Status;
+struct CompactionJobStats;

 struct TableFileCreationInfo {
   TableFileCreationInfo() = default;
@@ -38,16 +40,29 @@ struct TableFileCreationInfo {
 #ifndef ROCKSDB_LITE

 struct CompactionJobInfo {
+  CompactionJobInfo() = default;
+  explicit CompactionJobInfo(const CompactionJobStats& _stats) :
+      stats(_stats) {}
+
   // the name of the column family where the compaction happened.
   std::string cf_name;
   // the status indicating whether the compaction was successful or not.
   Status status;
+  // the id of the thread that completed this compaction job.
+  uint64_t thread_id;
+  // the job id, which is unique in the same thread.
+  int job_id;
+  // the smallest input level of the compaction.
+  int base_input_level;
   // the output level of the compaction.
   int output_level;
   // the names of the compaction input files.
   std::vector<std::string> input_files;
   // the names of the compaction output files.
   std::vector<std::string> output_files;
+  // Detailed information about this compaction.
+  CompactionJobStats stats;
 };

 // EventListener class contains a set of call-back functions that will

--- a/src.mk
+++ b/src.mk
@@ -78,6 +78,7 @@ LIB_SOURCES = \
   util/cache.cc \
   util/coding.cc \
   util/comparator.cc \
+  util/compaction_job_stats_impl.cc \
   util/crc32c.cc \
   util/db_info_dumper.cc \
   util/dynamic_bloom.cc \
@@ -153,6 +154,7 @@ TEST_BENCH_SOURCES = \
   third-party/gtest-1.7.0/fused-src/gtest/gtest-all.cc \
   db/column_family_test.cc \
   db/compaction_job_test.cc \
+  db/compaction_job_stats_test.cc \
   db/compaction_picker_test.cc \
   db/comparator_db_test.cc \
   db/corruption_test.cc \

new file: util/compaction_job_stats_impl.cc
@@ -0,0 +1,44 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include <cstring>

#include "include/rocksdb/compaction_job_stats.h"

namespace rocksdb {

#ifndef ROCKSDB_LITE

void CompactionJobStats::Reset() {
  elapsed_micros = 0;

  num_input_files = 0;
  num_input_files_at_output_level = 0;
  num_output_files = 0;

  num_input_records = 0;
  num_output_records = 0;

  total_input_bytes = 0;
  total_output_bytes = 0;

  total_input_raw_key_bytes = 0;
  total_input_raw_value_bytes = 0;

  num_records_replaced = 0;

  is_manual_compaction = 0;

  smallest_output_key_prefix[0] = 0;
  largest_output_key_prefix[0] = 0;
}

#else

void CompactionJobStats::Reset() {
}

#endif  // !ROCKSDB_LITE

}  // namespace rocksdb

--- a/util/thread_status_updater.cc
+++ b/util/thread_status_updater.cc
@@ -15,6 +15,11 @@ namespace rocksdb {

 __thread ThreadStatusData* ThreadStatusUpdater::thread_status_data_ = nullptr;

+uint64_t ThreadStatusUpdater::GetThreadID() {
+  auto* data = InitAndGet();
+  return data->thread_id;
+}
+
 void ThreadStatusUpdater::UnregisterThread() {
   if (thread_status_data_ != nullptr) {
     std::lock_guard<std::mutex> lck(thread_list_mutex_);
@@ -292,6 +297,10 @@ void ThreadStatusUpdater::UnregisterThread() {
 void ThreadStatusUpdater::ResetThreadStatus() {
 }

+uint64_t ThreadStatusUpdater::GetThreadID() {
+  return 0;
+}
+
 void ThreadStatusUpdater::SetThreadType(
     ThreadStatus::ThreadType ttype) {
 }

--- a/util/thread_status_updater.h
+++ b/util/thread_status_updater.h
@@ -115,6 +115,8 @@ class ThreadStatusUpdater {
   // ColumnFamilyInfoKey, ThreadOperation, and ThreadState.
   void ResetThreadStatus();

+  uint64_t GetThreadID();
+
   // Set the thread type of the current thread.
   void SetThreadType(ThreadStatus::ThreadType ttype);

--- a/util/thread_status_util.cc
+++ b/util/thread_status_util.cc
@@ -32,6 +32,16 @@ void ThreadStatusUtil::UnregisterThread() {
   }
 }

+uint64_t ThreadStatusUtil::GetThreadID() {
+  if (thread_updater_local_cache_ == nullptr) {
+    // thread_updater_local_cache_ must be set in SetColumnFamily
+    // or other ThreadStatusUtil functions.
+    return 0;
+  }
+
+  return thread_updater_local_cache_->GetThreadID();
+}
+
 void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* cfd) {
   if (!MaybeInitThreadLocalUpdater(cfd->ioptions()->env)) {
     return;
@@ -170,6 +180,10 @@ bool ThreadStatusUtil::MaybeInitThreadLocalUpdater(const Env* env) {
   return false;
 }

+uint64_t ThreadStatusUtil::GetThreadID() {
+  return 0;
+}
+
 void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* cfd) {
 }

--- a/util/thread_status_util.h
+++ b/util/thread_status_util.h
@@ -27,6 +27,8 @@ class ColumnFamilyData;
 // all function calls to ThreadStatusUtil will be no-op.
 class ThreadStatusUtil {
  public:
+  static uint64_t GetThreadID();
+
   // Set the thread type of the current thread.
   static void SetThreadType(
       const Env* env, ThreadStatus::ThreadType thread_type);
