|
|
|
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
//
|
|
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
|
|
|
|
#include <algorithm>
|
|
|
|
#include <cinttypes>
|
|
|
|
#include <iostream>
|
|
|
|
#include <mutex>
|
|
|
|
#include <queue>
|
|
|
|
#include <set>
|
|
|
|
#include <thread>
|
|
|
|
#include <unordered_set>
|
|
|
|
#include <utility>
|
|
|
|
|
|
|
|
#include "db/db_impl/db_impl.h"
|
|
|
|
#include "db/dbformat.h"
|
|
|
|
#include "db/job_context.h"
|
|
|
|
#include "db/version_set.h"
|
|
|
|
#include "db/write_batch_internal.h"
|
|
|
|
#include "env/mock_env.h"
|
|
|
|
#include "file/filename.h"
|
|
|
|
#include "monitoring/statistics_impl.h"
|
|
|
|
#include "monitoring/thread_status_util.h"
|
|
|
|
#include "port/stack_trace.h"
|
|
|
|
#include "rocksdb/cache.h"
|
|
|
|
#include "rocksdb/compaction_filter.h"
|
|
|
|
#include "rocksdb/convenience.h"
|
|
|
|
#include "rocksdb/db.h"
|
|
|
|
#include "rocksdb/env.h"
|
|
|
|
#include "rocksdb/experimental.h"
|
|
|
|
#include "rocksdb/filter_policy.h"
|
|
|
|
#include "rocksdb/options.h"
|
|
|
|
#include "rocksdb/perf_context.h"
|
|
|
|
#include "rocksdb/slice.h"
|
|
|
|
#include "rocksdb/slice_transform.h"
|
|
|
|
#include "rocksdb/table.h"
|
|
|
|
#include "rocksdb/table_properties.h"
|
|
|
|
#include "rocksdb/thread_status.h"
|
|
|
|
#include "rocksdb/utilities/checkpoint.h"
|
|
|
|
#include "rocksdb/utilities/write_batch_with_index.h"
|
|
|
|
#include "table/block_based/block_based_table_factory.h"
|
|
|
|
#include "table/mock_table.h"
|
|
|
|
#include "table/plain/plain_table_factory.h"
|
|
|
|
#include "table/scoped_arena_iterator.h"
|
|
|
|
#include "test_util/sync_point.h"
|
|
|
|
#include "test_util/testharness.h"
|
|
|
|
#include "test_util/testutil.h"
|
|
|
|
#include "util/cast_util.h"
|
|
|
|
#include "util/compression.h"
|
|
|
|
#include "util/hash.h"
|
|
|
|
#include "util/mutexlock.h"
|
|
|
|
#include "util/rate_limiter_impl.h"
|
|
|
|
#include "util/string_util.h"
|
|
|
|
#include "utilities/merge_operators.h"
|
|
|
|
|
|
|
|
#if !defined(IOS_CROSS_COMPILE)
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
|
|
|
|
// Returns a string produced by test::CompressibleString(rnd, ratio, len, ...).
// NOTE(review): presumably `len` is the output length and `ratio` the target
// compression ratio -- confirm against test::CompressibleString.
static std::string RandomString(Random* rnd, int len, double ratio) {
  std::string generated;
  test::CompressibleString(rnd, ratio, len, &generated);
  return generated;
}
|
|
|
|
|
|
|
|
// Formats `key` as a decimal string, zero-padded on the left to `length`
// characters. A key whose decimal form is already longer than `length` is
// returned unpadded.
//
// The result is built in a fixed 1000-byte buffer, so the padded width is
// capped at 999 characters (one byte is needed for the terminating NUL).
std::string Key(uint64_t key, int length) {
  const int kBufSize = 1000;
  char buf[kBufSize];
  // Cap the width at kBufSize - 1 rather than kBufSize: with a width of
  // kBufSize, snprintf would truncate the trailing digits of the key itself
  // and every key would collapse to the same all-zero string.
  if (length > kBufSize - 1) {
    length = kBufSize - 1;
  }
  snprintf(buf, kBufSize, "%0*" PRIu64, length, key);
  return std::string(buf);
}
|
|
|
|
|
Parallelize L0-L1 Compaction: Restructure Compaction Job
Summary:
As of now compactions involving files from Level 0 and Level 1 are single
threaded because the files in L0, although sorted, are not range partitioned like
the other levels. This means that during L0-L1 compaction each file from L1
needs to be merged with potentially all the files from L0.
This attempt to parallelize the L0-L1 compaction assigns a thread and a
corresponding iterator to each L1 file that then considers only the key range
found in that L1 file and only the L0 files that have those keys (and only the
specific portion of those L0 files in which those keys are found). In this way
the overlap is minimized and potentially eliminated between different iterators
focusing on the same files.
The first step is to restructure the compaction logic to break L0-L1 compactions
into multiple, smaller, sequential compactions. Eventually each of these smaller
jobs will be run simultaneously. Areas to pay extra attention to are
# Correct aggregation of compaction job statistics across multiple threads
# Proper opening/closing of output files (make sure each thread's is unique)
# Keys that span multiple L1 files
# Skewed distributions of keys within L0 files
Test Plan: Make and run db_test (newer version has separate compaction tests) and compaction_job_stats_test
Reviewers: igor, noetzli, anthony, sdong, yhchiang
Reviewed By: yhchiang
Subscribers: MarkCallaghan, dhruba, leveldb
Differential Revision: https://reviews.facebook.net/D42699
9 years ago
|
|
|
class CompactionJobStatsTest : public testing::Test,
|
|
|
|
public testing::WithParamInterface<bool> {
|
|
|
|
public:
|
|
|
|
std::string dbname_;
|
|
|
|
std::string alternative_wal_dir_;
|
|
|
|
Env* env_;
|
|
|
|
DB* db_;
|
|
|
|
std::vector<ColumnFamilyHandle*> handles_;
|
|
|
|
uint32_t max_subcompactions_;
|
|
|
|
|
|
|
|
Options last_options_;
|
|
|
|
|
|
|
|
CompactionJobStatsTest() : env_(Env::Default()) {
|
|
|
|
env_->SetBackgroundThreads(1, Env::LOW);
|
|
|
|
env_->SetBackgroundThreads(1, Env::HIGH);
|
|
|
|
dbname_ = test::PerThreadDBPath("compaction_job_stats_test");
|
|
|
|
alternative_wal_dir_ = dbname_ + "/wal";
|
|
|
|
Options options;
|
|
|
|
options.create_if_missing = true;
|
|
|
|
max_subcompactions_ = GetParam();
|
|
|
|
options.max_subcompactions = max_subcompactions_;
|
|
|
|
auto delete_options = options;
|
|
|
|
delete_options.wal_dir = alternative_wal_dir_;
|
|
|
|
EXPECT_OK(DestroyDB(dbname_, delete_options));
|
|
|
|
// Destroy it for not alternative WAL dir is used.
|
|
|
|
EXPECT_OK(DestroyDB(dbname_, options));
|
|
|
|
db_ = nullptr;
|
|
|
|
Reopen(options);
|
|
|
|
}
|
|
|
|
|
|
|
|
~CompactionJobStatsTest() override {
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({});
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
|
|
Close();
|
|
|
|
Options options;
|
|
|
|
options.db_paths.emplace_back(dbname_, 0);
|
|
|
|
options.db_paths.emplace_back(dbname_ + "_2", 0);
|
|
|
|
options.db_paths.emplace_back(dbname_ + "_3", 0);
|
|
|
|
options.db_paths.emplace_back(dbname_ + "_4", 0);
|
|
|
|
EXPECT_OK(DestroyDB(dbname_, options));
|
|
|
|
}
|
|
|
|
|
|
|
|
// Required if inheriting from testing::WithParamInterface<>
|
Parallelize L0-L1 Compaction: Restructure Compaction Job
Summary:
As of now compactions involving files from Level 0 and Level 1 are single
threaded because the files in L0, although sorted, are not range partitioned like
the other levels. This means that during L0-L1 compaction each file from L1
needs to be merged with potentially all the files from L0.
This attempt to parallelize the L0-L1 compaction assigns a thread and a
corresponding iterator to each L1 file that then considers only the key range
found in that L1 file and only the L0 files that have those keys (and only the
specific portion of those L0 files in which those keys are found). In this way
the overlap is minimized and potentially eliminated between different iterators
focusing on the same files.
The first step is to restructure the compaction logic to break L0-L1 compactions
into multiple, smaller, sequential compactions. Eventually each of these smaller
jobs will be run simultaneously. Areas to pay extra attention to are
# Correct aggregation of compaction job statistics across multiple threads
# Proper opening/closing of output files (make sure each thread's is unique)
# Keys that span multiple L1 files
# Skewed distributions of keys within L0 files
Test Plan: Make and run db_test (newer version has separate compaction tests) and compaction_job_stats_test
Reviewers: igor, noetzli, anthony, sdong, yhchiang
Reviewed By: yhchiang
Subscribers: MarkCallaghan, dhruba, leveldb
Differential Revision: https://reviews.facebook.net/D42699
9 years ago
|
|
|
static void SetUpTestCase() {}
|
|
|
|
static void TearDownTestCase() {}
|
|
|
|
|
|
|
|
DBImpl* dbfull() { return static_cast_with_check<DBImpl>(db_); }
|
|
|
|
|
|
|
|
void CreateColumnFamilies(const std::vector<std::string>& cfs,
|
|
|
|
const Options& options) {
|
|
|
|
ColumnFamilyOptions cf_opts(options);
|
|
|
|
size_t cfi = handles_.size();
|
|
|
|
handles_.resize(cfi + cfs.size());
|
|
|
|
for (auto cf : cfs) {
|
|
|
|
ASSERT_OK(db_->CreateColumnFamily(cf_opts, cf, &handles_[cfi++]));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void CreateAndReopenWithCF(const std::vector<std::string>& cfs,
|
|
|
|
const Options& options) {
|
|
|
|
CreateColumnFamilies(cfs, options);
|
|
|
|
std::vector<std::string> cfs_plus_default = cfs;
|
|
|
|
cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
|
|
|
|
ReopenWithColumnFamilies(cfs_plus_default, options);
|
|
|
|
}
|
|
|
|
|
|
|
|
void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
|
|
|
|
const std::vector<Options>& options) {
|
|
|
|
ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
|
|
|
|
}
|
|
|
|
|
|
|
|
void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
|
|
|
|
const Options& options) {
|
|
|
|
ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
|
|
|
|
}
|
|
|
|
|
|
|
|
Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
|
|
|
|
const std::vector<Options>& options) {
|
|
|
|
Close();
|
|
|
|
EXPECT_EQ(cfs.size(), options.size());
|
|
|
|
std::vector<ColumnFamilyDescriptor> column_families;
|
|
|
|
for (size_t i = 0; i < cfs.size(); ++i) {
|
|
|
|
column_families.push_back(ColumnFamilyDescriptor(cfs[i], options[i]));
|
|
|
|
}
|
|
|
|
DBOptions db_opts = DBOptions(options[0]);
|
|
|
|
return DB::Open(db_opts, dbname_, column_families, &handles_, &db_);
|
|
|
|
}
|
|
|
|
|
|
|
|
Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
|
|
|
|
const Options& options) {
|
|
|
|
Close();
|
|
|
|
std::vector<Options> v_opts(cfs.size(), options);
|
|
|
|
return TryReopenWithColumnFamilies(cfs, v_opts);
|
|
|
|
}
|
|
|
|
|
|
|
|
void Reopen(const Options& options) { ASSERT_OK(TryReopen(options)); }
|
|
|
|
|
|
|
|
void Close() {
|
|
|
|
for (auto h : handles_) {
|
|
|
|
delete h;
|
|
|
|
}
|
|
|
|
handles_.clear();
|
|
|
|
delete db_;
|
|
|
|
db_ = nullptr;
|
|
|
|
}
|
|
|
|
|
|
|
|
void DestroyAndReopen(const Options& options) {
|
|
|
|
// Destroy using last options
|
|
|
|
Destroy(last_options_);
|
|
|
|
ASSERT_OK(TryReopen(options));
|
|
|
|
}
|
|
|
|
|
|
|
|
void Destroy(const Options& options) {
|
|
|
|
Close();
|
|
|
|
ASSERT_OK(DestroyDB(dbname_, options));
|
|
|
|
}
|
|
|
|
|
|
|
|
Status ReadOnlyReopen(const Options& options) {
|
|
|
|
return DB::OpenForReadOnly(options, dbname_, &db_);
|
|
|
|
}
|
|
|
|
|
|
|
|
Status TryReopen(const Options& options) {
|
|
|
|
Close();
|
|
|
|
last_options_ = options;
|
|
|
|
return DB::Open(options, dbname_, &db_);
|
|
|
|
}
|
|
|
|
|
|
|
|
Status Flush(int cf = 0) {
|
|
|
|
if (cf == 0) {
|
|
|
|
return db_->Flush(FlushOptions());
|
|
|
|
} else {
|
|
|
|
return db_->Flush(FlushOptions(), handles_[cf]);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Status Put(const Slice& k, const Slice& v, WriteOptions wo = WriteOptions()) {
|
|
|
|
return db_->Put(wo, k, v);
|
|
|
|
}
|
|
|
|
|
|
|
|
Status Put(int cf, const Slice& k, const Slice& v,
|
|
|
|
WriteOptions wo = WriteOptions()) {
|
|
|
|
return db_->Put(wo, handles_[cf], k, v);
|
|
|
|
}
|
|
|
|
|
|
|
|
Status Delete(const std::string& k) { return db_->Delete(WriteOptions(), k); }
|
|
|
|
|
|
|
|
Status Delete(int cf, const std::string& k) {
|
|
|
|
return db_->Delete(WriteOptions(), handles_[cf], k);
|
|
|
|
}
|
|
|
|
|
|
|
|
std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) {
|
|
|
|
ReadOptions options;
|
|
|
|
options.verify_checksums = true;
|
|
|
|
options.snapshot = snapshot;
|
|
|
|
std::string result;
|
|
|
|
Status s = db_->Get(options, k, &result);
|
|
|
|
if (s.IsNotFound()) {
|
|
|
|
result = "NOT_FOUND";
|
|
|
|
} else if (!s.ok()) {
|
|
|
|
result = s.ToString();
|
|
|
|
}
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
|
|
|
std::string Get(int cf, const std::string& k,
|
|
|
|
const Snapshot* snapshot = nullptr) {
|
|
|
|
ReadOptions options;
|
|
|
|
options.verify_checksums = true;
|
|
|
|
options.snapshot = snapshot;
|
|
|
|
std::string result;
|
|
|
|
Status s = db_->Get(options, handles_[cf], k, &result);
|
|
|
|
if (s.IsNotFound()) {
|
|
|
|
result = "NOT_FOUND";
|
|
|
|
} else if (!s.ok()) {
|
|
|
|
result = s.ToString();
|
|
|
|
}
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
|
|
|
int NumTableFilesAtLevel(int level, int cf = 0) {
|
|
|
|
std::string property;
|
|
|
|
if (cf == 0) {
|
|
|
|
// default cfd
|
|
|
|
EXPECT_TRUE(db_->GetProperty(
|
|
|
|
"rocksdb.num-files-at-level" + std::to_string(level), &property));
|
|
|
|
} else {
|
|
|
|
EXPECT_TRUE(db_->GetProperty(
|
|
|
|
handles_[cf], "rocksdb.num-files-at-level" + std::to_string(level),
|
|
|
|
&property));
|
|
|
|
}
|
|
|
|
return atoi(property.c_str());
|
|
|
|
}
|
|
|
|
|
|
|
|
// Return spread of files per level
|
|
|
|
std::string FilesPerLevel(int cf = 0) {
|
|
|
|
int num_levels =
|
|
|
|
(cf == 0) ? db_->NumberLevels() : db_->NumberLevels(handles_[1]);
|
|
|
|
std::string result;
|
|
|
|
size_t last_non_zero_offset = 0;
|
|
|
|
for (int level = 0; level < num_levels; level++) {
|
|
|
|
int f = NumTableFilesAtLevel(level, cf);
|
|
|
|
char buf[100];
|
|
|
|
snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
|
|
|
|
result += buf;
|
|
|
|
if (f > 0) {
|
|
|
|
last_non_zero_offset = result.size();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
result.resize(last_non_zero_offset);
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
|
|
|
Status Size(uint64_t* size, const Slice& start, const Slice& limit,
|
|
|
|
int cf = 0) {
|
|
|
|
Range r(start, limit);
|
|
|
|
if (cf == 0) {
|
|
|
|
return db_->GetApproximateSizes(&r, 1, size);
|
|
|
|
} else {
|
|
|
|
return db_->GetApproximateSizes(handles_[1], &r, 1, size);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void Compact(int cf, const Slice& start, const Slice& limit,
|
|
|
|
uint32_t target_path_id) {
|
|
|
|
CompactRangeOptions compact_options;
|
|
|
|
compact_options.target_path_id = target_path_id;
|
|
|
|
ASSERT_OK(db_->CompactRange(compact_options, handles_[cf], &start, &limit));
|
|
|
|
}
|
|
|
|
|
|
|
|
void Compact(int cf, const Slice& start, const Slice& limit) {
|
|
|
|
ASSERT_OK(
|
|
|
|
db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit));
|
|
|
|
}
|
|
|
|
|
|
|
|
void Compact(const Slice& start, const Slice& limit) {
|
|
|
|
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &limit));
|
|
|
|
}
|
|
|
|
|
Allowing L0 -> L1 trivial move on sorted data
Summary:
This diff updates the logic of how we do trivial move, now trivial move can run on any number of files in input level as long as they are not overlapping
The conditions for trivial move have been updated
Introduced conditions:
- Trivial move cannot happen if we have a compaction filter (except if the compaction is not manual)
- Input level files cannot be overlapping
Removed conditions:
- Trivial move only run when the compaction is not manual
- Input level should can contain only 1 file
More context on what tests failed because of Trivial move
```
DBTest.CompactionsGenerateMultipleFiles
This test is expecting compaction on a file in L0 to generate multiple files in L1, this test will fail with trivial move because we end up with one file in L1
```
```
DBTest.NoSpaceCompactRange
This test expect compaction to fail when we force environment to report running out of space, of course this is not valid in trivial move situation
because trivial move does not need any extra space, and did not check for that
```
```
DBTest.DropWrites
Similar to DBTest.NoSpaceCompactRange
```
```
DBTest.DeleteObsoleteFilesPendingOutputs
This test expect that a file in L2 is deleted after it's moved to L3, this is not valid with trivial move because although the file was moved it is now used by L3
```
```
CuckooTableDBTest.CompactionIntoMultipleFiles
Same as DBTest.CompactionsGenerateMultipleFiles
```
This diff is based on a work by @sdong https://reviews.facebook.net/D34149
Test Plan: make -j64 check
Reviewers: rven, sdong, igor
Reviewed By: igor
Subscribers: yhchiang, ott, march, dhruba, sdong
Differential Revision: https://reviews.facebook.net/D34797
10 years ago
|
|
|
void TEST_Compact(int level, int cf, const Slice& start, const Slice& limit) {
|
|
|
|
ASSERT_OK(dbfull()->TEST_CompactRange(level, &start, &limit, handles_[cf],
|
|
|
|
true /* disallow trivial move */));
|
|
|
|
}
|
|
|
|
|
|
|
|
// Do n memtable compactions, each of which produces an sstable
|
|
|
|
// covering the range [small,large].
|
|
|
|
void MakeTables(int n, const std::string& small, const std::string& large,
|
|
|
|
int cf = 0) {
|
|
|
|
for (int i = 0; i < n; i++) {
|
|
|
|
ASSERT_OK(Put(cf, small, "begin"));
|
|
|
|
ASSERT_OK(Put(cf, large, "end"));
|
|
|
|
ASSERT_OK(Flush(cf));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
static void SetDeletionCompactionStats(CompactionJobStats* stats,
|
|
|
|
uint64_t input_deletions,
|
|
|
|
uint64_t expired_deletions,
|
|
|
|
uint64_t records_replaced) {
|
|
|
|
stats->num_input_deletion_records = input_deletions;
|
|
|
|
stats->num_expired_deletion_records = expired_deletions;
|
|
|
|
stats->num_records_replaced = records_replaced;
|
|
|
|
}
|
|
|
|
|
|
|
|
void MakeTableWithKeyValues(Random* rnd, uint64_t smallest, uint64_t largest,
|
|
|
|
int key_size, int value_size, uint64_t interval,
|
|
|
|
double ratio, int cf = 0) {
|
|
|
|
for (auto key = smallest; key < largest; key += interval) {
|
|
|
|
ASSERT_OK(Put(cf, Slice(Key(key, key_size)),
|
|
|
|
Slice(RandomString(rnd, value_size, ratio))));
|
|
|
|
}
|
|
|
|
ASSERT_OK(Flush(cf));
|
|
|
|
}
|
|
|
|
|
|
|
|
// This function behaves with the implicit understanding that two
|
|
|
|
// rounds of keys are inserted into the database, as per the behavior
|
|
|
|
// of the DeletionStatsTest.
|
|
|
|
void SelectivelyDeleteKeys(uint64_t smallest, uint64_t largest,
|
|
|
|
uint64_t interval, int deletion_interval,
|
|
|
|
int key_size, uint64_t cutoff_key_num,
|
|
|
|
CompactionJobStats* stats, int cf = 0) {
|
|
|
|
// interval needs to be >= 2 so that deletion entries can be inserted
|
|
|
|
// that are intended to not result in an actual key deletion by using
|
|
|
|
// an offset of 1 from another existing key
|
|
|
|
ASSERT_GE(interval, 2);
|
|
|
|
|
|
|
|
uint64_t ctr = 1;
|
|
|
|
uint32_t deletions_made = 0;
|
|
|
|
uint32_t num_deleted = 0;
|
|
|
|
uint32_t num_expired = 0;
|
|
|
|
for (auto key = smallest; key <= largest; key += interval, ctr++) {
|
|
|
|
if (ctr % deletion_interval == 0) {
|
|
|
|
ASSERT_OK(Delete(cf, Key(key, key_size)));
|
|
|
|
deletions_made++;
|
|
|
|
num_deleted++;
|
|
|
|
|
|
|
|
if (key > cutoff_key_num) {
|
|
|
|
num_expired++;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Insert some deletions for keys that don't exist that
|
|
|
|
// are both in and out of the key range
|
|
|
|
ASSERT_OK(Delete(cf, Key(smallest + 1, key_size)));
|
|
|
|
deletions_made++;
|
|
|
|
|
|
|
|
ASSERT_OK(Delete(cf, Key(smallest - 1, key_size)));
|
|
|
|
deletions_made++;
|
|
|
|
num_expired++;
|
|
|
|
|
|
|
|
ASSERT_OK(Delete(cf, Key(smallest - 9, key_size)));
|
|
|
|
deletions_made++;
|
|
|
|
num_expired++;
|
|
|
|
|
|
|
|
ASSERT_OK(Flush(cf));
|
|
|
|
SetDeletionCompactionStats(stats, deletions_made, num_expired, num_deleted);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
// An EventListener which helps verify the compaction results in
|
|
|
|
// test CompactionJobStatsTest.
|
|
|
|
class CompactionJobStatsChecker : public EventListener {
|
|
|
|
public:
|
Add options.compaction_measure_io_stats to print write I/O stats in compactions
Summary:
Add options.compaction_measure_io_stats to print out / pass to listener accumulated time spent on write calls. Example outputs in info logs:
2015/08/12-16:27:59.463944 7fd428bff700 (Original Log Time 2015/08/12-16:27:59.463922) EVENT_LOG_v1 {"time_micros": 1439422079463897, "job": 6, "event": "compaction_finished", "output_level": 1, "num_output_files": 4, "total_output_size": 6900525, "num_input_records": 111483, "num_output_records": 106877, "file_write_nanos": 15663206, "file_range_sync_nanos": 649588, "file_fsync_nanos": 349614797, "file_prepare_write_nanos": 1505812, "lsm_state": [2, 4, 0, 0, 0, 0, 0]}
Add two more counters in iostats_context.
Also add a parameter of db_bench.
Test Plan: Add a unit test. Also manually verify LOG outputs in db_bench
Subscribers: leveldb, dhruba
Differential Revision: https://reviews.facebook.net/D44115
9 years ago
|
|
|
CompactionJobStatsChecker()
|
|
|
|
: compression_enabled_(false), verify_next_comp_io_stats_(false) {}
|
|
|
|
|
|
|
|
size_t NumberOfUnverifiedStats() { return expected_stats_.size(); }
|
|
|
|
|
Add options.compaction_measure_io_stats to print write I/O stats in compactions
Summary:
Add options.compaction_measure_io_stats to print out / pass to listener accumulated time spent on write calls. Example outputs in info logs:
2015/08/12-16:27:59.463944 7fd428bff700 (Original Log Time 2015/08/12-16:27:59.463922) EVENT_LOG_v1 {"time_micros": 1439422079463897, "job": 6, "event": "compaction_finished", "output_level": 1, "num_output_files": 4, "total_output_size": 6900525, "num_input_records": 111483, "num_output_records": 106877, "file_write_nanos": 15663206, "file_range_sync_nanos": 649588, "file_fsync_nanos": 349614797, "file_prepare_write_nanos": 1505812, "lsm_state": [2, 4, 0, 0, 0, 0, 0]}
Add two more counters in iostats_context.
Also add a parameter of db_bench.
Test Plan: Add a unit test. Also manually verify LOG outputs in db_bench
Subscribers: leveldb, dhruba
Differential Revision: https://reviews.facebook.net/D44115
9 years ago
|
|
|
void set_verify_next_comp_io_stats(bool v) { verify_next_comp_io_stats_ = v; }
|
|
|
|
|
|
|
|
// Once a compaction completed, this function will verify the returned
|
|
|
|
// CompactionJobInfo with the oldest CompactionJobInfo added earlier
|
|
|
|
// in "expected_stats_" which has not yet being used for verification.
|
|
|
|
void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
|
Add options.compaction_measure_io_stats to print write I/O stats in compactions
Summary:
Add options.compaction_measure_io_stats to print out / pass to listener accumulated time spent on write calls. Example outputs in info logs:
2015/08/12-16:27:59.463944 7fd428bff700 (Original Log Time 2015/08/12-16:27:59.463922) EVENT_LOG_v1 {"time_micros": 1439422079463897, "job": 6, "event": "compaction_finished", "output_level": 1, "num_output_files": 4, "total_output_size": 6900525, "num_input_records": 111483, "num_output_records": 106877, "file_write_nanos": 15663206, "file_range_sync_nanos": 649588, "file_fsync_nanos": 349614797, "file_prepare_write_nanos": 1505812, "lsm_state": [2, 4, 0, 0, 0, 0, 0]}
Add two more counters in iostats_context.
Also add a parameter of db_bench.
Test Plan: Add a unit test. Also manually verify LOG outputs in db_bench
Subscribers: leveldb, dhruba
Differential Revision: https://reviews.facebook.net/D44115
9 years ago
|
|
|
if (verify_next_comp_io_stats_) {
|
|
|
|
ASSERT_GT(ci.stats.file_write_nanos, 0);
|
|
|
|
ASSERT_GT(ci.stats.file_range_sync_nanos, 0);
|
|
|
|
ASSERT_GT(ci.stats.file_fsync_nanos, 0);
|
|
|
|
ASSERT_GT(ci.stats.file_prepare_write_nanos, 0);
|
|
|
|
verify_next_comp_io_stats_ = false;
|
|
|
|
}
|
|
|
|
|
|
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
|
|
|
if (expected_stats_.size()) {
|
|
|
|
Verify(ci.stats, expected_stats_.front());
|
|
|
|
expected_stats_.pop();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// A helper function which verifies whether two CompactionJobStats
|
|
|
|
// match. The verification of all compaction stats are done by
|
|
|
|
// ASSERT_EQ except for the total input / output bytes, which we
|
|
|
|
// use ASSERT_GE and ASSERT_LE with a reasonable bias ---
|
|
|
|
// 10% in uncompressed case and 20% when compression is used.
|
|
|
|
virtual void Verify(const CompactionJobStats& current_stats,
|
|
|
|
const CompactionJobStats& stats) {
|
|
|
|
// time
|
|
|
|
ASSERT_GT(current_stats.elapsed_micros, 0U);
|
|
|
|
|
|
|
|
ASSERT_EQ(current_stats.num_input_records, stats.num_input_records);
|
|
|
|
ASSERT_EQ(current_stats.num_input_files, stats.num_input_files);
|
|
|
|
ASSERT_EQ(current_stats.num_input_files_at_output_level,
|
|
|
|
stats.num_input_files_at_output_level);
|
|
|
|
|
|
|
|
ASSERT_EQ(current_stats.num_output_records, stats.num_output_records);
|
|
|
|
ASSERT_EQ(current_stats.num_output_files, stats.num_output_files);
|
|
|
|
|
|
|
|
ASSERT_EQ(current_stats.is_full_compaction, stats.is_full_compaction);
|
|
|
|
ASSERT_EQ(current_stats.is_manual_compaction, stats.is_manual_compaction);
|
|
|
|
|
|
|
|
// file size
|
|
|
|
double kFileSizeBias = compression_enabled_ ? 0.20 : 0.10;
|
|
|
|
ASSERT_GE(current_stats.total_input_bytes * (1.00 + kFileSizeBias),
|
|
|
|
stats.total_input_bytes);
|
|
|
|
ASSERT_LE(current_stats.total_input_bytes,
|
|
|
|
stats.total_input_bytes * (1.00 + kFileSizeBias));
|
|
|
|
ASSERT_GE(current_stats.total_output_bytes * (1.00 + kFileSizeBias),
|
|
|
|
stats.total_output_bytes);
|
|
|
|
ASSERT_LE(current_stats.total_output_bytes,
|
|
|
|
stats.total_output_bytes * (1.00 + kFileSizeBias));
|
|
|
|
ASSERT_EQ(current_stats.total_input_raw_key_bytes,
|
|
|
|
stats.total_input_raw_key_bytes);
|
|
|
|
ASSERT_EQ(current_stats.total_input_raw_value_bytes,
|
|
|
|
stats.total_input_raw_value_bytes);
|
|
|
|
|
|
|
|
ASSERT_EQ(current_stats.num_records_replaced, stats.num_records_replaced);
|
|
|
|
|
|
|
|
ASSERT_EQ(current_stats.num_corrupt_keys, stats.num_corrupt_keys);
|
|
|
|
|
|
|
|
ASSERT_EQ(std::string(current_stats.smallest_output_key_prefix),
|
|
|
|
std::string(stats.smallest_output_key_prefix));
|
|
|
|
ASSERT_EQ(std::string(current_stats.largest_output_key_prefix),
|
|
|
|
std::string(stats.largest_output_key_prefix));
|
|
|
|
}
|
|
|
|
|
|
|
|
// Add an expected compaction stats, which will be used to
|
|
|
|
// verify the CompactionJobStats returned by the OnCompactionCompleted()
|
|
|
|
// callback.
|
|
|
|
void AddExpectedStats(const CompactionJobStats& stats) {
|
|
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
|
|
|
expected_stats_.push(stats);
|
|
|
|
}
|
|
|
|
|
|
|
|
void EnableCompression(bool flag) { compression_enabled_ = flag; }
|
|
|
|
|
Add options.compaction_measure_io_stats to print write I/O stats in compactions
Summary:
Add options.compaction_measure_io_stats to print out / pass to listener accumulated time spent on write calls. Example outputs in info logs:
2015/08/12-16:27:59.463944 7fd428bff700 (Original Log Time 2015/08/12-16:27:59.463922) EVENT_LOG_v1 {"time_micros": 1439422079463897, "job": 6, "event": "compaction_finished", "output_level": 1, "num_output_files": 4, "total_output_size": 6900525, "num_input_records": 111483, "num_output_records": 106877, "file_write_nanos": 15663206, "file_range_sync_nanos": 649588, "file_fsync_nanos": 349614797, "file_prepare_write_nanos": 1505812, "lsm_state": [2, 4, 0, 0, 0, 0, 0]}
Add two more counters in iostats_context.
Also add a parameter of db_bench.
Test Plan: Add a unit test. Also manually verify LOG outputs in db_bench
Subscribers: leveldb, dhruba
Differential Revision: https://reviews.facebook.net/D44115
9 years ago
|
|
|
bool verify_next_comp_io_stats() const { return verify_next_comp_io_stats_; }
|
|
|
|
|
|
|
|
private:
|
|
|
|
std::mutex mutex_;
|
|
|
|
std::queue<CompactionJobStats> expected_stats_;
|
|
|
|
bool compression_enabled_;
|
Add options.compaction_measure_io_stats to print write I/O stats in compactions
Summary:
Add options.compaction_measure_io_stats to print out / pass to listener accumulated time spent on write calls. Example outputs in info logs:
2015/08/12-16:27:59.463944 7fd428bff700 (Original Log Time 2015/08/12-16:27:59.463922) EVENT_LOG_v1 {"time_micros": 1439422079463897, "job": 6, "event": "compaction_finished", "output_level": 1, "num_output_files": 4, "total_output_size": 6900525, "num_input_records": 111483, "num_output_records": 106877, "file_write_nanos": 15663206, "file_range_sync_nanos": 649588, "file_fsync_nanos": 349614797, "file_prepare_write_nanos": 1505812, "lsm_state": [2, 4, 0, 0, 0, 0, 0]}
Add two more counters in iostats_context.
Also add a parameter of db_bench.
Test Plan: Add a unit test. Also manually verify LOG outputs in db_bench
Subscribers: leveldb, dhruba
Differential Revision: https://reviews.facebook.net/D44115
9 years ago
|
|
|
bool verify_next_comp_io_stats_;
|
|
|
|
};
|
|
|
|
|
|
|
|
// An EventListener which helps verify the compaction statistics in
|
|
|
|
// the test DeletionStatsTest.
|
|
|
|
class CompactionJobDeletionStatsChecker : public CompactionJobStatsChecker {
|
|
|
|
public:
|
|
|
|
// Verifies whether two CompactionJobStats match.
|
|
|
|
void Verify(const CompactionJobStats& current_stats,
|
|
|
|
const CompactionJobStats& stats) override {
|
|
|
|
ASSERT_EQ(current_stats.num_input_deletion_records,
|
|
|
|
stats.num_input_deletion_records);
|
|
|
|
ASSERT_EQ(current_stats.num_expired_deletion_records,
|
|
|
|
stats.num_expired_deletion_records);
|
|
|
|
ASSERT_EQ(current_stats.num_records_replaced, stats.num_records_replaced);
|
|
|
|
|
|
|
|
ASSERT_EQ(current_stats.num_corrupt_keys, stats.num_corrupt_keys);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
namespace {
|
|
|
|
|
|
|
|
// Estimates the on-disk size in bytes of a table file holding `num_records`
// entries, as the sum of four terms:
//   data block   = num_records * (key_size + value_size * compression_ratio
//                                 + 8 bytes of per-key overhead)
//   footer       = 512 bytes
//   filter block = num_records * bloom_bits_per_key / 8
//   index block  = data block * (key_size + 8) / block_size
// All divisions are integer divisions.
uint64_t EstimatedFileSize(uint64_t num_records, size_t key_size,
                           size_t value_size, double compression_ratio = 1.0,
                           size_t block_size = 4096,
                           int bloom_bits_per_key = 10) {
  const size_t kPerKeyOverhead = 8;
  const size_t kFooterSize = 512;

  const double bytes_per_record =
      key_size + value_size * compression_ratio + kPerKeyOverhead;
  const uint64_t data_size =
      static_cast<uint64_t>(num_records * bytes_per_record);

  const uint64_t filter_block_size = num_records * bloom_bits_per_key / 8;
  const uint64_t index_block_size = data_size * (key_size + 8) / block_size;

  return data_size + kFooterSize + filter_block_size + index_block_size;
}
|
|
|
|
|
|
|
|
namespace {
|
|
|
|
|
|
|
|
void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) {
|
|
|
|
assert(prefix_length > 0);
|
|
|
|
size_t length = src.size() > prefix_length ? prefix_length : src.size();
|
|
|
|
dst->assign(src.data(), length);
|
|
|
|
}
|
|
|
|
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
// Builds the CompactionJobStats a test expects a compaction to report.
//
// Record and file counts, the replaced-record count and the full/manual
// flags are copied from the arguments. Byte totals are derived:
//   - total input/output bytes: EstimatedFileSize() of an average file
//     (records divided evenly across files) times the number of files; a
//     file count of zero yields zero bytes;
//   - raw key bytes:   num_input_records * (key_size + 8);
//   - raw value bytes: num_input_records * value_size.
// The smallest/largest output key prefixes are the given keys truncated to
// CompactionJobStats::kMaxPrefixLength.
CompactionJobStats NewManualCompactionJobStats(
    const std::string& smallest_key, const std::string& largest_key,
    size_t num_input_files, size_t num_input_files_at_output_level,
    uint64_t num_input_records, size_t key_size, size_t value_size,
    size_t num_output_files, uint64_t num_output_records,
    double compression_ratio, uint64_t num_records_replaced,
    bool is_full = false, bool is_manual = true) {
  CompactionJobStats stats;
  stats.Reset();

  stats.num_input_records = num_input_records;
  stats.num_input_files = num_input_files;
  stats.num_input_files_at_output_level = num_input_files_at_output_level;

  stats.num_output_records = num_output_records;
  stats.num_output_files = num_output_files;

  // Guard the per-file averages: dividing by a zero file count is undefined
  // behavior, and zero files occupy zero bytes.
  stats.total_input_bytes =
      num_input_files == 0
          ? 0
          : EstimatedFileSize(num_input_records / num_input_files, key_size,
                              value_size, compression_ratio) *
                num_input_files;
  stats.total_output_bytes =
      num_output_files == 0
          ? 0
          : EstimatedFileSize(num_output_records / num_output_files, key_size,
                              value_size, compression_ratio) *
                num_output_files;
  stats.total_input_raw_key_bytes = num_input_records * (key_size + 8);
  stats.total_input_raw_value_bytes = num_input_records * value_size;

  stats.is_full_compaction = is_full;
  stats.is_manual_compaction = is_manual;

  stats.num_records_replaced = num_records_replaced;

  CopyPrefix(smallest_key, CompactionJobStats::kMaxPrefixLength,
             &stats.smallest_output_key_prefix);
  CopyPrefix(largest_key, CompactionJobStats::kMaxPrefixLength,
             &stats.largest_output_key_prefix);

  return stats;
}
|
|
|
|
|
|
|
|
// Returns the first compression type supported by this build, probing in the
// order Snappy, Zlib, BZip2, LZ4, Xpress. Falls back to kNoCompression when
// none of them is available.
CompressionType GetAnyCompression() {
  if (Snappy_Supported()) {
    return kSnappyCompression;
  }
  if (Zlib_Supported()) {
    return kZlibCompression;
  }
  if (BZip2_Supported()) {
    return kBZip2Compression;
  }
  if (LZ4_Supported()) {
    return kLZ4Compression;
  }
  if (XPRESS_Supported()) {
    return kXpressCompression;
  }
  return kNoCompression;
}
|
|
|
|
|
|
|
|
} // namespace
|
|
|
|
|
Parallelize L0-L1 Compaction: Restructure Compaction Job
Summary:
As of now compactions involving files from Level 0 and Level 1 are single
threaded because the files in L0, although sorted, are not range partitioned like
the other levels. This means that during L0-L1 compaction each file from L1
needs to be merged with potentially all the files from L0.
This attempt to parallelize the L0-L1 compaction assigns a thread and a
corresponding iterator to each L1 file that then considers only the key range
found in that L1 file and only the L0 files that have those keys (and only the
specific portion of those L0 files in which those keys are found). In this way
the overlap is minimized and potentially eliminated between different iterators
focusing on the same files.
The first step is to restructure the compaction logic to break L0-L1 compactions
into multiple, smaller, sequential compactions. Eventually each of these smaller
jobs will be run simultaneously. Areas to pay extra attention to are
# Correct aggregation of compaction job statistics across multiple threads
# Proper opening/closing of output files (make sure each thread's is unique)
# Keys that span multiple L1 files
# Skewed distributions of keys within L0 files
Test Plan: Make and run db_test (newer version has separate compaction tests) and compaction_job_stats_test
Reviewers: igor, noetzli, anthony, sdong, yhchiang
Reviewed By: yhchiang
Subscribers: MarkCallaghan, dhruba, leveldb
Differential Revision: https://reviews.facebook.net/D42699
9 years ago
|
|
|
TEST_P(CompactionJobStatsTest, CompactionJobStatsTest) {
|
|
|
|
Random rnd(301);
|
|
|
|
const int kBufSize = 100;
|
|
|
|
char buf[kBufSize];
|
|
|
|
uint64_t key_base = 100000000l;
|
|
|
|
// Note: key_base must be multiple of num_keys_per_L0_file
|
|
|
|
int num_keys_per_L0_file = 100;
|
|
|
|
const int kTestScale = 8;
|
|
|
|
const int kKeySize = 10;
|
|
|
|
const int kValueSize = 1000;
|
|
|
|
const double kCompressionRatio = 0.5;
|
|
|
|
double compression_ratio = 1.0;
|
|
|
|
uint64_t key_interval = key_base / num_keys_per_L0_file;
|
|
|
|
|
|
|
|
// Whenever a compaction completes, this listener will try to
|
|
|
|
// verify whether the returned CompactionJobStats matches
|
|
|
|
// what we expect. The expected CompactionJobStats is added
|
|
|
|
// via AddExpectedStats().
|
|
|
|
auto* stats_checker = new CompactionJobStatsChecker();
|
|
|
|
Options options;
|
|
|
|
options.level_compaction_dynamic_level_bytes = false;
|
|
|
|
options.listeners.emplace_back(stats_checker);
|
|
|
|
options.create_if_missing = true;
|
|
|
|
// just enough setting to hold off auto-compaction.
|
|
|
|
options.level0_file_num_compaction_trigger = kTestScale + 1;
|
|
|
|
options.num_levels = 3;
|
|
|
|
options.compression = kNoCompression;
|
|
|
|
options.max_subcompactions = max_subcompactions_;
|
Add options.compaction_measure_io_stats to print write I/O stats in compactions
Summary:
Add options.compaction_measure_io_stats to print out / pass to listener accumulated time spent on write calls. Example outputs in info logs:
2015/08/12-16:27:59.463944 7fd428bff700 (Original Log Time 2015/08/12-16:27:59.463922) EVENT_LOG_v1 {"time_micros": 1439422079463897, "job": 6, "event": "compaction_finished", "output_level": 1, "num_output_files": 4, "total_output_size": 6900525, "num_input_records": 111483, "num_output_records": 106877, "file_write_nanos": 15663206, "file_range_sync_nanos": 649588, "file_fsync_nanos": 349614797, "file_prepare_write_nanos": 1505812, "lsm_state": [2, 4, 0, 0, 0, 0, 0]}
Add two more counters in iostats_context.
Also add a parameter of db_bench.
Test Plan: Add a unit test. Also manually verify LOG outputs in db_bench
Subscribers: leveldb, dhruba
Differential Revision: https://reviews.facebook.net/D44115
9 years ago
|
|
|
options.bytes_per_sync = 512 * 1024;
|
|
|
|
|
|
|
|
options.report_bg_io_stats = true;
|
|
|
|
for (int test = 0; test < 2; ++test) {
|
|
|
|
DestroyAndReopen(options);
|
|
|
|
CreateAndReopenWithCF({"pikachu"}, options);
|
|
|
|
|
|
|
|
// 1st Phase: generate "num_L0_files" L0 files.
|
|
|
|
int num_L0_files = 0;
|
|
|
|
for (uint64_t start_key = key_base; start_key <= key_base * kTestScale;
|
|
|
|
start_key += key_base) {
|
|
|
|
MakeTableWithKeyValues(&rnd, start_key, start_key + key_base - 1,
|
|
|
|
kKeySize, kValueSize, key_interval,
|
|
|
|
compression_ratio, 1);
|
|
|
|
snprintf(buf, kBufSize, "%d", ++num_L0_files);
|
|
|
|
ASSERT_EQ(std::string(buf), FilesPerLevel(1));
|
|
|
|
}
|
|
|
|
ASSERT_EQ(std::to_string(num_L0_files), FilesPerLevel(1));
|
|
|
|
|
|
|
|
// 2nd Phase: perform L0 -> L1 compaction.
|
|
|
|
int L0_compaction_count = 6;
|
|
|
|
int count = 1;
|
|
|
|
std::string smallest_key;
|
|
|
|
std::string largest_key;
|
|
|
|
for (uint64_t start_key = key_base;
|
|
|
|
start_key <= key_base * L0_compaction_count;
|
|
|
|
start_key += key_base, count++) {
|
|
|
|
smallest_key = Key(start_key, 10);
|
|
|
|
largest_key = Key(start_key + key_base - key_interval, 10);
|
|
|
|
stats_checker->AddExpectedStats(NewManualCompactionJobStats(
|
|
|
|
smallest_key, largest_key, 1, 0, num_keys_per_L0_file, kKeySize,
|
|
|
|
kValueSize, 1, num_keys_per_L0_file, compression_ratio, 0));
|
|
|
|
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
|
Allowing L0 -> L1 trivial move on sorted data
Summary:
This diff updates the logic of how we do trivial move, now trivial move can run on any number of files in input level as long as they are not overlapping
The conditions for trivial move have been updated
Introduced conditions:
- Trivial move cannot happen if we have a compaction filter (except if the compaction is not manual)
- Input level files cannot be overlapping
Removed conditions:
- Trivial move only run when the compaction is not manual
- Input level should can contain only 1 file
More context on what tests failed because of Trivial move
```
DBTest.CompactionsGenerateMultipleFiles
This test is expecting compaction on a file in L0 to generate multiple files in L1, this test will fail with trivial move because we end up with one file in L1
```
```
DBTest.NoSpaceCompactRange
This test expect compaction to fail when we force environment to report running out of space, of course this is not valid in trivial move situation
because trivial move does not need any extra space, and did not check for that
```
```
DBTest.DropWrites
Similar to DBTest.NoSpaceCompactRange
```
```
DBTest.DeleteObsoleteFilesPendingOutputs
This test expect that a file in L2 is deleted after it's moved to L3, this is not valid with trivial move because although the file was moved it is now used by L3
```
```
CuckooTableDBTest.CompactionIntoMultipleFiles
Same as DBTest.CompactionsGenerateMultipleFiles
```
This diff is based on a work by @sdong https://reviews.facebook.net/D34149
Test Plan: make -j64 check
Reviewers: rven, sdong, igor
Reviewed By: igor
Subscribers: yhchiang, ott, march, dhruba, sdong
Differential Revision: https://reviews.facebook.net/D34797
10 years ago
|
|
|
TEST_Compact(0, 1, smallest_key, largest_key);
|
|
|
|
snprintf(buf, kBufSize, "%d,%d", num_L0_files - count, count);
|
|
|
|
ASSERT_EQ(std::string(buf), FilesPerLevel(1));
|
|
|
|
}
|
|
|
|
|
|
|
|
// compact two files into one in the last L0 -> L1 compaction
|
|
|
|
int num_remaining_L0 = num_L0_files - L0_compaction_count;
|
|
|
|
smallest_key = Key(key_base * (L0_compaction_count + 1), 10);
|
|
|
|
largest_key = Key(key_base * (kTestScale + 1) - key_interval, 10);
|
|
|
|
stats_checker->AddExpectedStats(NewManualCompactionJobStats(
|
|
|
|
smallest_key, largest_key, num_remaining_L0, 0,
|
|
|
|
num_keys_per_L0_file * num_remaining_L0, kKeySize, kValueSize, 1,
|
|
|
|
num_keys_per_L0_file * num_remaining_L0, compression_ratio, 0));
|
|
|
|
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
|
Allowing L0 -> L1 trivial move on sorted data
Summary:
This diff updates the logic of how we do trivial move, now trivial move can run on any number of files in input level as long as they are not overlapping
The conditions for trivial move have been updated
Introduced conditions:
- Trivial move cannot happen if we have a compaction filter (except if the compaction is not manual)
- Input level files cannot be overlapping
Removed conditions:
- Trivial move only run when the compaction is not manual
- Input level should can contain only 1 file
More context on what tests failed because of Trivial move
```
DBTest.CompactionsGenerateMultipleFiles
This test is expecting compaction on a file in L0 to generate multiple files in L1, this test will fail with trivial move because we end up with one file in L1
```
```
DBTest.NoSpaceCompactRange
This test expect compaction to fail when we force environment to report running out of space, of course this is not valid in trivial move situation
because trivial move does not need any extra space, and did not check for that
```
```
DBTest.DropWrites
Similar to DBTest.NoSpaceCompactRange
```
```
DBTest.DeleteObsoleteFilesPendingOutputs
This test expect that a file in L2 is deleted after it's moved to L3, this is not valid with trivial move because although the file was moved it is now used by L3
```
```
CuckooTableDBTest.CompactionIntoMultipleFiles
Same as DBTest.CompactionsGenerateMultipleFiles
```
This diff is based on a work by @sdong https://reviews.facebook.net/D34149
Test Plan: make -j64 check
Reviewers: rven, sdong, igor
Reviewed By: igor
Subscribers: yhchiang, ott, march, dhruba, sdong
Differential Revision: https://reviews.facebook.net/D34797
10 years ago
|
|
|
TEST_Compact(0, 1, smallest_key, largest_key);
|
|
|
|
|
|
|
|
int num_L1_files = num_L0_files - num_remaining_L0 + 1;
|
|
|
|
num_L0_files = 0;
|
|
|
|
snprintf(buf, kBufSize, "%d,%d", num_L0_files, num_L1_files);
|
|
|
|
ASSERT_EQ(std::string(buf), FilesPerLevel(1));
|
|
|
|
|
|
|
|
// 3rd Phase: generate sparse L0 files (wider key-range, same num of keys)
|
|
|
|
int sparseness = 2;
|
|
|
|
for (uint64_t start_key = key_base; start_key <= key_base * kTestScale;
|
|
|
|
start_key += key_base * sparseness) {
|
|
|
|
MakeTableWithKeyValues(
|
|
|
|
&rnd, start_key, start_key + key_base * sparseness - 1, kKeySize,
|
|
|
|
kValueSize, key_base * sparseness / num_keys_per_L0_file,
|
|
|
|
compression_ratio, 1);
|
|
|
|
snprintf(buf, kBufSize, "%d,%d", ++num_L0_files, num_L1_files);
|
|
|
|
ASSERT_EQ(std::string(buf), FilesPerLevel(1));
|
|
|
|
}
|
|
|
|
|
|
|
|
// 4th Phase: perform L0 -> L1 compaction again, expect higher write amp
|
Parallelize L0-L1 Compaction: Restructure Compaction Job
Summary:
As of now compactions involving files from Level 0 and Level 1 are single
threaded because the files in L0, although sorted, are not range partitioned like
the other levels. This means that during L0-L1 compaction each file from L1
needs to be merged with potentially all the files from L0.
This attempt to parallelize the L0-L1 compaction assigns a thread and a
corresponding iterator to each L1 file that then considers only the key range
found in that L1 file and only the L0 files that have those keys (and only the
specific portion of those L0 files in which those keys are found). In this way
the overlap is minimized and potentially eliminated between different iterators
focusing on the same files.
The first step is to restructure the compaction logic to break L0-L1 compactions
into multiple, smaller, sequential compactions. Eventually each of these smaller
jobs will be run simultaneously. Areas to pay extra attention to are
# Correct aggregation of compaction job statistics across multiple threads
# Proper opening/closing of output files (make sure each thread's is unique)
# Keys that span multiple L1 files
# Skewed distributions of keys within L0 files
Test Plan: Make and run db_test (newer version has separate compaction tests) and compaction_job_stats_test
Reviewers: igor, noetzli, anthony, sdong, yhchiang
Reviewed By: yhchiang
Subscribers: MarkCallaghan, dhruba, leveldb
Differential Revision: https://reviews.facebook.net/D42699
9 years ago
|
|
|
// When subcompactions are enabled, the number of output files increases
|
|
|
|
// by 1 because multiple threads are consuming the input and generating
|
|
|
|
// output files without coordinating to see if the output could fit into
|
|
|
|
// a smaller number of files like it does when it runs sequentially
|
|
|
|
int num_output_files = options.max_subcompactions > 1 ? 2 : 1;
|
|
|
|
for (uint64_t start_key = key_base; num_L0_files > 1;
|
|
|
|
start_key += key_base * sparseness) {
|
|
|
|
smallest_key = Key(start_key, 10);
|
|
|
|
largest_key = Key(start_key + key_base * sparseness - key_interval, 10);
|
|
|
|
stats_checker->AddExpectedStats(NewManualCompactionJobStats(
|
|
|
|
smallest_key, largest_key, 3, 2, num_keys_per_L0_file * 3, kKeySize,
|
|
|
|
kValueSize, num_output_files,
|
|
|
|
num_keys_per_L0_file * 2, // 1/3 of the data will be updated.
|
|
|
|
compression_ratio, num_keys_per_L0_file));
|
|
|
|
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
|
|
|
|
Compact(1, smallest_key, largest_key);
|
|
|
|
if (options.max_subcompactions == 1) {
|
Parallelize L0-L1 Compaction: Restructure Compaction Job
Summary:
As of now compactions involving files from Level 0 and Level 1 are single
threaded because the files in L0, although sorted, are not range partitioned like
the other levels. This means that during L0-L1 compaction each file from L1
needs to be merged with potentially all the files from L0.
This attempt to parallelize the L0-L1 compaction assigns a thread and a
corresponding iterator to each L1 file that then considers only the key range
found in that L1 file and only the L0 files that have those keys (and only the
specific portion of those L0 files in which those keys are found). In this way
the overlap is minimized and potentially eliminated between different iterators
focusing on the same files.
The first step is to restructure the compaction logic to break L0-L1 compactions
into multiple, smaller, sequential compactions. Eventually each of these smaller
jobs will be run simultaneously. Areas to pay extra attention to are
# Correct aggregation of compaction job statistics across multiple threads
# Proper opening/closing of output files (make sure each thread's is unique)
# Keys that span multiple L1 files
# Skewed distributions of keys within L0 files
Test Plan: Make and run db_test (newer version has separate compaction tests) and compaction_job_stats_test
Reviewers: igor, noetzli, anthony, sdong, yhchiang
Reviewed By: yhchiang
Subscribers: MarkCallaghan, dhruba, leveldb
Differential Revision: https://reviews.facebook.net/D42699
9 years ago
|
|
|
--num_L1_files;
|
|
|
|
}
|
|
|
|
snprintf(buf, kBufSize, "%d,%d", --num_L0_files, num_L1_files);
|
|
|
|
ASSERT_EQ(std::string(buf), FilesPerLevel(1));
|
|
|
|
}
|
|
|
|
|
|
|
|
// 5th Phase: Do a full compaction, which involves in two sub-compactions.
|
|
|
|
// Here we expect to have 1 L0 files and 4 L1 files
|
|
|
|
// In the first sub-compaction, we expect L0 compaction.
|
|
|
|
smallest_key = Key(key_base, 10);
|
|
|
|
largest_key = Key(key_base * (kTestScale + 1) - key_interval, 10);
|
|
|
|
stats_checker->AddExpectedStats(NewManualCompactionJobStats(
|
|
|
|
Key(key_base * (kTestScale + 1 - sparseness), 10), largest_key, 2, 1,
|
|
|
|
num_keys_per_L0_file * 3, kKeySize, kValueSize, 1,
|
|
|
|
num_keys_per_L0_file * 2, compression_ratio, num_keys_per_L0_file));
|
|
|
|
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
|
|
|
|
Compact(1, smallest_key, largest_key);
|
Add options.compaction_measure_io_stats to print write I/O stats in compactions
Summary:
Add options.compaction_measure_io_stats to print out / pass to listener accumulated time spent on write calls. Example outputs in info logs:
2015/08/12-16:27:59.463944 7fd428bff700 (Original Log Time 2015/08/12-16:27:59.463922) EVENT_LOG_v1 {"time_micros": 1439422079463897, "job": 6, "event": "compaction_finished", "output_level": 1, "num_output_files": 4, "total_output_size": 6900525, "num_input_records": 111483, "num_output_records": 106877, "file_write_nanos": 15663206, "file_range_sync_nanos": 649588, "file_fsync_nanos": 349614797, "file_prepare_write_nanos": 1505812, "lsm_state": [2, 4, 0, 0, 0, 0, 0]}
Add two more counters in iostats_context.
Also add a parameter of db_bench.
Test Plan: Add a unit test. Also manually verify LOG outputs in db_bench
Subscribers: leveldb, dhruba
Differential Revision: https://reviews.facebook.net/D44115
9 years ago
|
|
|
|
|
|
|
num_L1_files = options.max_subcompactions > 1 ? 7 : 4;
|
Parallelize L0-L1 Compaction: Restructure Compaction Job
Summary:
As of now compactions involving files from Level 0 and Level 1 are single
threaded because the files in L0, although sorted, are not range partitioned like
the other levels. This means that during L0-L1 compaction each file from L1
needs to be merged with potentially all the files from L0.
This attempt to parallelize the L0-L1 compaction assigns a thread and a
corresponding iterator to each L1 file that then considers only the key range
found in that L1 file and only the L0 files that have those keys (and only the
specific portion of those L0 files in which those keys are found). In this way
the overlap is minimized and potentially eliminated between different iterators
focusing on the same files.
The first step is to restructure the compaction logic to break L0-L1 compactions
into multiple, smaller, sequential compactions. Eventually each of these smaller
jobs will be run simultaneously. Areas to pay extra attention to are
# Correct aggregation of compaction job statistics across multiple threads
# Proper opening/closing of output files (make sure each thread's is unique)
# Keys that span multiple L1 files
# Skewed distributions of keys within L0 files
Test Plan: Make and run db_test (newer version has separate compaction tests) and compaction_job_stats_test
Reviewers: igor, noetzli, anthony, sdong, yhchiang
Reviewed By: yhchiang
Subscribers: MarkCallaghan, dhruba, leveldb
Differential Revision: https://reviews.facebook.net/D42699
9 years ago
|
|
|
char L1_buf[4];
|
|
|
|
snprintf(L1_buf, sizeof(L1_buf), "0,%d", num_L1_files);
|
|
|
|
std::string L1_files(L1_buf);
|
|
|
|
ASSERT_EQ(L1_files, FilesPerLevel(1));
|
|
|
|
options.compression = GetAnyCompression();
|
|
|
|
if (options.compression == kNoCompression) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
stats_checker->EnableCompression(true);
|
|
|
|
compression_ratio = kCompressionRatio;
|
Add options.compaction_measure_io_stats to print write I/O stats in compactions
Summary:
Add options.compaction_measure_io_stats to print out / pass to listener accumulated time spent on write calls. Example outputs in info logs:
2015/08/12-16:27:59.463944 7fd428bff700 (Original Log Time 2015/08/12-16:27:59.463922) EVENT_LOG_v1 {"time_micros": 1439422079463897, "job": 6, "event": "compaction_finished", "output_level": 1, "num_output_files": 4, "total_output_size": 6900525, "num_input_records": 111483, "num_output_records": 106877, "file_write_nanos": 15663206, "file_range_sync_nanos": 649588, "file_fsync_nanos": 349614797, "file_prepare_write_nanos": 1505812, "lsm_state": [2, 4, 0, 0, 0, 0, 0]}
Add two more counters in iostats_context.
Also add a parameter of db_bench.
Test Plan: Add a unit test. Also manually verify LOG outputs in db_bench
Subscribers: leveldb, dhruba
Differential Revision: https://reviews.facebook.net/D44115
9 years ago
|
|
|
|
|
|
|
for (int i = 0; i < 5; i++) {
|
|
|
|
ASSERT_OK(Put(1, Slice(Key(key_base + i, 10)),
|
|
|
|
Slice(RandomString(&rnd, 512 * 1024, 1))));
|
|
|
|
}
|
|
|
|
|
|
|
|
ASSERT_OK(Flush(1));
|
|
|
|
ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_WaitForCompact());
|
Add options.compaction_measure_io_stats to print write I/O stats in compactions
Summary:
Add options.compaction_measure_io_stats to print out / pass to listener accumulated time spent on write calls. Example outputs in info logs:
2015/08/12-16:27:59.463944 7fd428bff700 (Original Log Time 2015/08/12-16:27:59.463922) EVENT_LOG_v1 {"time_micros": 1439422079463897, "job": 6, "event": "compaction_finished", "output_level": 1, "num_output_files": 4, "total_output_size": 6900525, "num_input_records": 111483, "num_output_records": 106877, "file_write_nanos": 15663206, "file_range_sync_nanos": 649588, "file_fsync_nanos": 349614797, "file_prepare_write_nanos": 1505812, "lsm_state": [2, 4, 0, 0, 0, 0, 0]}
Add two more counters in iostats_context.
Also add a parameter of db_bench.
Test Plan: Add a unit test. Also manually verify LOG outputs in db_bench
Subscribers: leveldb, dhruba
Differential Revision: https://reviews.facebook.net/D44115
9 years ago
|
|
|
|
|
|
|
stats_checker->set_verify_next_comp_io_stats(true);
|
|
|
|
std::atomic<bool> first_prepare_write(true);
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
|
|
|
"WritableFileWriter::Append:BeforePrepareWrite", [&](void* /*arg*/) {
|
Add options.compaction_measure_io_stats to print write I/O stats in compactions
Summary:
Add options.compaction_measure_io_stats to print out / pass to listener accumulated time spent on write calls. Example outputs in info logs:
2015/08/12-16:27:59.463944 7fd428bff700 (Original Log Time 2015/08/12-16:27:59.463922) EVENT_LOG_v1 {"time_micros": 1439422079463897, "job": 6, "event": "compaction_finished", "output_level": 1, "num_output_files": 4, "total_output_size": 6900525, "num_input_records": 111483, "num_output_records": 106877, "file_write_nanos": 15663206, "file_range_sync_nanos": 649588, "file_fsync_nanos": 349614797, "file_prepare_write_nanos": 1505812, "lsm_state": [2, 4, 0, 0, 0, 0, 0]}
Add two more counters in iostats_context.
Also add a parameter of db_bench.
Test Plan: Add a unit test. Also manually verify LOG outputs in db_bench
Subscribers: leveldb, dhruba
Differential Revision: https://reviews.facebook.net/D44115
9 years ago
|
|
|
if (first_prepare_write.load()) {
|
|
|
|
options.env->SleepForMicroseconds(3);
|
|
|
|
first_prepare_write.store(false);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
std::atomic<bool> first_flush(true);
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
|
|
|
"WritableFileWriter::Flush:BeforeAppend", [&](void* /*arg*/) {
|
Add options.compaction_measure_io_stats to print write I/O stats in compactions
Summary:
Add options.compaction_measure_io_stats to print out / pass to listener accumulated time spent on write calls. Example outputs in info logs:
2015/08/12-16:27:59.463944 7fd428bff700 (Original Log Time 2015/08/12-16:27:59.463922) EVENT_LOG_v1 {"time_micros": 1439422079463897, "job": 6, "event": "compaction_finished", "output_level": 1, "num_output_files": 4, "total_output_size": 6900525, "num_input_records": 111483, "num_output_records": 106877, "file_write_nanos": 15663206, "file_range_sync_nanos": 649588, "file_fsync_nanos": 349614797, "file_prepare_write_nanos": 1505812, "lsm_state": [2, 4, 0, 0, 0, 0, 0]}
Add two more counters in iostats_context.
Also add a parameter of db_bench.
Test Plan: Add a unit test. Also manually verify LOG outputs in db_bench
Subscribers: leveldb, dhruba
Differential Revision: https://reviews.facebook.net/D44115
9 years ago
|
|
|
if (first_flush.load()) {
|
|
|
|
options.env->SleepForMicroseconds(3);
|
|
|
|
first_flush.store(false);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
std::atomic<bool> first_sync(true);
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
|
|
|
"WritableFileWriter::SyncInternal:0", [&](void* /*arg*/) {
|
Add options.compaction_measure_io_stats to print write I/O stats in compactions
Summary:
Add options.compaction_measure_io_stats to print out / pass to listener accumulated time spent on write calls. Example outputs in info logs:
2015/08/12-16:27:59.463944 7fd428bff700 (Original Log Time 2015/08/12-16:27:59.463922) EVENT_LOG_v1 {"time_micros": 1439422079463897, "job": 6, "event": "compaction_finished", "output_level": 1, "num_output_files": 4, "total_output_size": 6900525, "num_input_records": 111483, "num_output_records": 106877, "file_write_nanos": 15663206, "file_range_sync_nanos": 649588, "file_fsync_nanos": 349614797, "file_prepare_write_nanos": 1505812, "lsm_state": [2, 4, 0, 0, 0, 0, 0]}
Add two more counters in iostats_context.
Also add a parameter of db_bench.
Test Plan: Add a unit test. Also manually verify LOG outputs in db_bench
Subscribers: leveldb, dhruba
Differential Revision: https://reviews.facebook.net/D44115
9 years ago
|
|
|
if (first_sync.load()) {
|
|
|
|
options.env->SleepForMicroseconds(3);
|
|
|
|
first_sync.store(false);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
std::atomic<bool> first_range_sync(true);
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
|
|
|
"WritableFileWriter::RangeSync:0", [&](void* /*arg*/) {
|
Add options.compaction_measure_io_stats to print write I/O stats in compactions
Summary:
Add options.compaction_measure_io_stats to print out / pass to listener accumulated time spent on write calls. Example outputs in info logs:
2015/08/12-16:27:59.463944 7fd428bff700 (Original Log Time 2015/08/12-16:27:59.463922) EVENT_LOG_v1 {"time_micros": 1439422079463897, "job": 6, "event": "compaction_finished", "output_level": 1, "num_output_files": 4, "total_output_size": 6900525, "num_input_records": 111483, "num_output_records": 106877, "file_write_nanos": 15663206, "file_range_sync_nanos": 649588, "file_fsync_nanos": 349614797, "file_prepare_write_nanos": 1505812, "lsm_state": [2, 4, 0, 0, 0, 0, 0]}
Add two more counters in iostats_context.
Also add a parameter of db_bench.
Test Plan: Add a unit test. Also manually verify LOG outputs in db_bench
Subscribers: leveldb, dhruba
Differential Revision: https://reviews.facebook.net/D44115
9 years ago
|
|
|
if (first_range_sync.load()) {
|
|
|
|
options.env->SleepForMicroseconds(3);
|
|
|
|
first_range_sync.store(false);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
Add options.compaction_measure_io_stats to print write I/O stats in compactions
Summary:
Add options.compaction_measure_io_stats to print out / pass to listener accumulated time spent on write calls. Example outputs in info logs:
2015/08/12-16:27:59.463944 7fd428bff700 (Original Log Time 2015/08/12-16:27:59.463922) EVENT_LOG_v1 {"time_micros": 1439422079463897, "job": 6, "event": "compaction_finished", "output_level": 1, "num_output_files": 4, "total_output_size": 6900525, "num_input_records": 111483, "num_output_records": 106877, "file_write_nanos": 15663206, "file_range_sync_nanos": 649588, "file_fsync_nanos": 349614797, "file_prepare_write_nanos": 1505812, "lsm_state": [2, 4, 0, 0, 0, 0, 0]}
Add two more counters in iostats_context.
Also add a parameter of db_bench.
Test Plan: Add a unit test. Also manually verify LOG outputs in db_bench
Subscribers: leveldb, dhruba
Differential Revision: https://reviews.facebook.net/D44115
9 years ago
|
|
|
|
|
|
|
Compact(1, smallest_key, largest_key);
|
|
|
|
|
|
|
|
ASSERT_TRUE(!stats_checker->verify_next_comp_io_stats());
|
|
|
|
ASSERT_TRUE(!first_prepare_write.load());
|
|
|
|
ASSERT_TRUE(!first_flush.load());
|
|
|
|
ASSERT_TRUE(!first_sync.load());
|
|
|
|
ASSERT_TRUE(!first_range_sync.load());
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
|
|
|
}
|
|
|
|
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
|
|
|
|
}
|
|
|
|
|
Parallelize L0-L1 Compaction: Restructure Compaction Job
Summary:
As of now compactions involving files from Level 0 and Level 1 are single
threaded because the files in L0, although sorted, are not range partitioned like
the other levels. This means that during L0-L1 compaction each file from L1
needs to be merged with potentially all the files from L0.
This attempt to parallelize the L0-L1 compaction assigns a thread and a
corresponding iterator to each L1 file that then considers only the key range
found in that L1 file and only the L0 files that have those keys (and only the
specific portion of those L0 files in which those keys are found). In this way
the overlap is minimized and potentially eliminated between different iterators
focusing on the same files.
The first step is to restructure the compaction logic to break L0-L1 compactions
into multiple, smaller, sequential compactions. Eventually each of these smaller
jobs will be run simultaneously. Areas to pay extra attention to are
# Correct aggregation of compaction job statistics across multiple threads
# Proper opening/closing of output files (make sure each thread's is unique)
# Keys that span multiple L1 files
# Skewed distributions of keys within L0 files
Test Plan: Make and run db_test (newer version has separate compaction tests) and compaction_job_stats_test
Reviewers: igor, noetzli, anthony, sdong, yhchiang
Reviewed By: yhchiang
Subscribers: MarkCallaghan, dhruba, leveldb
Differential Revision: https://reviews.facebook.net/D42699
9 years ago
|
|
|
// Lays down files covering the same key range in L2, L1 and L0 of column
// family 1, issues selective deletions, and then triggers a final compaction
// whose CompactionJobStats are checked by a listener.
TEST_P(CompactionJobStatsTest, DeletionStatsTest) {
  Random rng(301);

  // Key-space layout.
  // Note: base_key must be multiple of keys_per_l0_file
  uint64_t base_key = 100000;
  int keys_per_l0_file = 20;
  const int kTestScale = 8;  // make sure this is even
  const int kKeySize = 10;
  const int kValueSize = 100;
  double ratio = 1.0;
  uint64_t interval = base_key / keys_per_l0_file;
  uint64_t max_key_num = base_key * (kTestScale + 1) - interval;
  uint64_t cutoff_num = base_key * (kTestScale / 2 + 1) - interval;
  // Compaction bounds that sit slightly outside the generated key range.
  const std::string range_begin = Key(base_key - 10, kKeySize);
  const std::string range_end = Key(max_key_num + 10, kKeySize);

  // Whenever a compaction completes, this listener will try to verify
  // whether the returned CompactionJobStats matches what we expect.
  auto* checker = new CompactionJobDeletionStatsChecker();

  Options opts;
  opts.create_if_missing = true;
  opts.num_levels = 3;
  opts.compression = kNoCompression;
  opts.level_compaction_dynamic_level_bytes = false;
  opts.level0_file_num_compaction_trigger = kTestScale + 1;
  opts.max_bytes_for_level_multiplier = 2;
  opts.max_subcompactions = max_subcompactions_;
  opts.listeners.emplace_back(checker);

  DestroyAndReopen(opts);
  CreateAndReopenWithCF({"pikachu"}, opts);

  // Writes one table per base_key-sized slice, for every slice whose first
  // key lies in [base_key, last_start_key], into column family 1.
  auto write_tables_up_to = [&](uint64_t last_start_key) {
    for (uint64_t first = base_key; first <= last_start_key;
         first += base_key) {
      MakeTableWithKeyValues(&rng, first, first + base_key - 1, kKeySize,
                             kValueSize, interval, ratio, 1);
    }
  };

  // Stage 1: Generate several L0 files and then send them to L2 by
  // using CompactRangeOptions and CompactRange(). These files will
  // have a strict subset of the keys from the full key-range
  write_tables_up_to(base_key * kTestScale / 2);

  CompactRangeOptions to_l2;
  to_l2.change_level = true;
  to_l2.target_level = 2;
  ASSERT_OK(db_->CompactRange(to_l2, handles_[1], nullptr, nullptr));
  ASSERT_GT(NumTableFilesAtLevel(2, 1), 0);

  // Stage 2: Generate files including keys from the entire key range
  write_tables_up_to(base_key * kTestScale);

  // Send these L0 files to L1
  TEST_Compact(0, 1, range_begin, range_end);
  ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);

  // Add a new record and flush so now there is a L0 file
  // with a value too (not just deletions from the next step)
  ASSERT_OK(Put(1, Key(base_key - 6, kKeySize), "test"));
  ASSERT_OK(Flush(1));

  // Stage 3: Generate L0 files with some deletions so now
  // there are files with the same key range in L0, L1, and L2
  int delete_every = 3;
  CompactionJobStats expected_stats;
  // NOTE(review): SelectivelyDeleteKeys presumably fills expected_stats with
  // the stats the next compaction should report -- confirm against its
  // definition.
  SelectivelyDeleteKeys(base_key, max_key_num, interval, delete_every,
                        kKeySize, cutoff_num, &expected_stats, 1);
  checker->AddExpectedStats(expected_stats);

  // Stage 4: Trigger compaction and verify the stats
  TEST_Compact(0, 1, range_begin, range_end);
}
|
|
|
|
|
|
|
|
namespace {
|
|
|
|
// Returns the value of the lowest set bit of `num_flushes` when that bit is
// greater than 1, and 0 otherwise (i.e. for 0 and for every odd count).
//
// The caller in this file treats the result as the number of "input units"
// (one unit == one newly flushed file) of the compaction expected after
// `num_flushes` flushes, with 0 meaning that no compaction is expected.
int GetUniversalCompactionInputUnits(uint32_t num_flushes) {
  // Two's-complement trick: x & -x keeps only the least significant set bit
  // and evaluates to 0 when x is 0.
  const uint32_t lowest_set_bit = num_flushes & (~num_flushes + 1u);
  if (lowest_set_bit <= 1) {
    return 0;
  }
  return static_cast<int>(lowest_set_bit);
}
|
|
|
|
} // namespace
|
|
|
|
|
Parallelize L0-L1 Compaction: Restructure Compaction Job
Summary:
As of now compactions involving files from Level 0 and Level 1 are single
threaded because the files in L0, although sorted, are not range partitioned like
the other levels. This means that during L0-L1 compaction each file from L1
needs to be merged with potentially all the files from L0.
This attempt to parallelize the L0-L1 compaction assigns a thread and a
corresponding iterator to each L1 file that then considers only the key range
found in that L1 file and only the L0 files that have those keys (and only the
specific portion of those L0 files in which those keys are found). In this way
the overlap is minimized and potentially eliminated between different iterators
focusing on the same files.
The first step is to restructure the compaction logic to break L0-L1 compactions
into multiple, smaller, sequential compactions. Eventually each of these smaller
jobs will be run simultaneously. Areas to pay extra attention to are
# Correct aggregation of compaction job statistics across multiple threads
# Proper opening/closing of output files (make sure each thread's output file is unique)
# Keys that span multiple L1 files
# Skewed distributions of keys within L0 files
Test Plan: Make and run db_test (newer version has separate compaction tests) and compaction_job_stats_test
Reviewers: igor, noetzli, anthony, sdong, yhchiang
Reviewed By: yhchiang
Subscribers: MarkCallaghan, dhruba, leveldb
Differential Revision: https://reviews.facebook.net/D42699
9 years ago
|
|
|
// Registers the CompactionJobStats expected from a sequence of flushes under
// universal compaction, then performs the flushes and checks that the
// listener has consumed every expectation.
TEST_P(CompactionJobStatsTest, UniversalCompactionTest) {
  Random rng(301);

  // Note: base_key must be a multiple of keys_per_table
  uint64_t base_key = 100000000;
  int keys_per_table = 100;
  const uint32_t kTestScale = 6;
  const int kKeySize = 10;
  const int kValueSize = 900;
  double ratio = 1.0;
  uint64_t interval = base_key / keys_per_table;

  auto* checker = new CompactionJobStatsChecker();

  Options opts;
  opts.create_if_missing = true;
  opts.num_levels = 3;
  opts.compression = kNoCompression;
  opts.level0_file_num_compaction_trigger = 2;
  opts.target_file_size_base = keys_per_table * 1000;
  opts.compaction_style = kCompactionStyleUniversal;
  opts.compaction_options_universal.size_ratio = 1;
  opts.compaction_options_universal.max_size_amplification_percent = 1000;
  opts.max_subcompactions = max_subcompactions_;
  opts.listeners.emplace_back(checker);

  DestroyAndReopen(opts);
  CreateAndReopenWithCF({"pikachu"}, opts);

  // Generates the expected CompactionJobStats for each compaction
  for (uint32_t flush_count = 2; flush_count <= kTestScale; ++flush_count) {
    // Here we treat one newly flushed file as a unit.
    //
    // For example, if a newly flushed file is 100k, and a compaction has
    // 4 input units, then this compaction inputs 400k.
    uint32_t units = GetUniversalCompactionInputUnits(flush_count);
    if (units != 0) {
      // A full compaction only happens when the number of flushes equals to
      // the number of compaction input runs.
      bool full_compaction = flush_count == units;
      // The expected smallest key depends on whether it is a full compaction.
      uint64_t first_key_num =
          full_compaction ? base_key : base_key * (flush_count - 1);

      const auto first_key = Key(first_key_num, kKeySize);
      const auto last_key =
          Key(first_key_num + base_key * units - interval, kKeySize);
      // Total number of keys across all input units.
      const auto total_keys = keys_per_table * units;

      checker->AddExpectedStats(NewManualCompactionJobStats(
          first_key, last_key, units, units > 2 ? units / 2 : 0, total_keys,
          kKeySize, kValueSize, units, total_keys, 1.0, 0, full_compaction,
          false));
      ASSERT_OK(dbfull()->TEST_WaitForCompact());
    }
  }
  // Flush counts 2, 4 and 6 yield a non-zero unit count, hence three entries.
  ASSERT_EQ(checker->NumberOfUnverifiedStats(), 3U);

  // Write one table per base_key-sized slice into column family 1, waiting
  // for compactions to finish after each one.
  const uint64_t last_start_key = base_key * kTestScale;
  for (uint64_t first = base_key; first <= last_start_key;
       first += base_key) {
    MakeTableWithKeyValues(&rng, first, first + base_key - 1, kKeySize,
                           kValueSize, interval, ratio, 1);
    ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_WaitForCompact());
  }
  // Every registered expectation must have been consumed by now.
  ASSERT_EQ(checker->NumberOfUnverifiedStats(), 0U);
}
|
|
|
|
|
|
|
|
// Instantiate every CompactionJobStatsTest case with the parameter values
// 1 and 4.
// NOTE(review): the parameter presumably feeds max_subcompactions_ used by
// the tests above -- confirm against the fixture definition.
INSTANTIATE_TEST_CASE_P(CompactionJobStatsTest, CompactionJobStatsTest,
                        ::testing::Values(1, 4));
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
|
|
|
|
|
|
int main(int argc, char** argv) {
|
|
|
|
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
|
|
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
|
|
return RUN_ALL_TESTS();
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
#else
|
|
|
|
|
|
|
|
// Fallback entry point for the #else branch of the guard that the closing
// #endif annotates as !defined(IOS_CROSS_COMPILE): no tests are run and the
// program exits with status 0.
int main(int /*argc*/, char** /*argv*/) { return 0; }
|
|
|
|
#endif // !defined(IOS_CROSS_COMPILE)
|