You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
rocksdb/db/compaction/compaction_service_test.cc

955 lines
31 KiB

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "table/unique_id_impl.h"
namespace ROCKSDB_NAMESPACE {
// In-process stand-in for a remote compaction worker, used by the tests in
// this file. StartV2() stores the serialized compaction input keyed by job id;
// WaitForCompleteV2() takes that input back out and runs it synchronously via
// DB::OpenAndCompact(), passing "<db_path>/<job_id>" as the per-job directory
// argument. Tests can force the status returned by either call, replace the
// serialized result, or flip the cancel flag handed to OpenAndCompact().
//
// NOTE: the Override*()/ResetOverride() setters write plain (non-atomic)
// members without taking mutex_, so they should only be called while no
// compaction is in flight.
class MyTestCompactionService : public CompactionService {
 public:
  MyTestCompactionService(
      std::string db_path, Options& options,
      std::shared_ptr<Statistics>& statistics,
      std::vector<std::shared_ptr<EventListener>>& listeners,
      std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
          table_properties_collector_factories)
      : db_path_(std::move(db_path)),
        options_(options),
        statistics_(statistics),
        start_info_("na", "na", "na", 0, Env::TOTAL),
        wait_info_("na", "na", "na", 0, Env::TOTAL),
        listeners_(listeners),
        table_properties_collector_factories_(
            std::move(table_properties_collector_factories)) {}

  static const char* kClassName() { return "MyTestCompactionService"; }

  const char* Name() const override { return kClassName(); }

  // Remembers `info` and the serialized input for `info.job_id`. Returns the
  // status installed with OverrideStartStatus() if there is one, otherwise
  // kSuccess. The job is recorded even when an override status is returned.
  CompactionServiceJobStatus StartV2(
      const CompactionServiceJobInfo& info,
      const std::string& compaction_service_input) override {
    InstrumentedMutexLock l(&mutex_);
    start_info_ = info;
    assert(info.db_name == db_path_);
    jobs_.emplace(info.job_id, compaction_service_input);
    if (is_override_start_status_) {
      return override_start_status_;
    }
    return CompactionServiceJobStatus::kSuccess;
  }

  // Runs the job previously registered by StartV2() for `info.job_id`.
  // Returns kFailure if no such job exists or OpenAndCompact() fails, the
  // status installed with OverrideWaitStatus() if there is one (in which case
  // nothing is compacted), and kSuccess otherwise. On the compaction path
  // `*compaction_service_result` receives the OpenAndCompact() output, or the
  // string installed with OverrideWaitResult().
  CompactionServiceJobStatus WaitForCompleteV2(
      const CompactionServiceJobInfo& info,
      std::string* compaction_service_result) override {
    std::string compaction_input;
    assert(info.db_name == db_path_);
    {
      // Only the bookkeeping is done under the lock; the compaction itself
      // runs unlocked.
      InstrumentedMutexLock l(&mutex_);
      wait_info_ = info;
      auto i = jobs_.find(info.job_id);
      if (i == jobs_.end()) {
        return CompactionServiceJobStatus::kFailure;
      }
      compaction_input = std::move(i->second);
      jobs_.erase(i);
    }

    if (is_override_wait_status_) {
      return override_wait_status_;
    }

    // Mirror the relevant primary options onto the compactor side.
    CompactionServiceOptionsOverride options_override;
    options_override.env = options_.env;
    options_override.file_checksum_gen_factory =
        options_.file_checksum_gen_factory;
    options_override.comparator = options_.comparator;
    options_override.merge_operator = options_.merge_operator;
    options_override.compaction_filter = options_.compaction_filter;
    options_override.compaction_filter_factory =
        options_.compaction_filter_factory;
    options_override.prefix_extractor = options_.prefix_extractor;
    options_override.table_factory = options_.table_factory;
    options_override.sst_partitioner_factory = options_.sst_partitioner_factory;
    options_override.statistics = statistics_;
    if (!listeners_.empty()) {
      options_override.listeners = listeners_;
    }
    if (!table_properties_collector_factories_.empty()) {
      options_override.table_properties_collector_factories =
          table_properties_collector_factories_;
    }

    OpenAndCompactOptions options;
    options.canceled = &canceled_;

    Status s = DB::OpenAndCompact(
        options, db_path_, db_path_ + "/" + std::to_string(info.job_id),
        compaction_input, compaction_service_result, options_override);
    if (is_override_wait_result_) {
      *compaction_service_result = override_wait_result_;
    }
    // Counted for every attempt, whether or not it succeeded.
    compaction_num_.fetch_add(1);
    if (s.ok()) {
      return CompactionServiceJobStatus::kSuccess;
    } else {
      return CompactionServiceJobStatus::kFailure;
    }
  }

  // Number of OpenAndCompact() attempts made so far (successful or not).
  int GetCompactionNum() { return compaction_num_.load(); }

  // Job info passed to the most recent StartV2() call. Taken under mutex_
  // because StartV2() writes start_info_ under the same lock.
  CompactionServiceJobInfo GetCompactionInfoForStart() {
    InstrumentedMutexLock l(&mutex_);
    return start_info_;
  }

  // Job info passed to the most recent WaitForCompleteV2() call. Taken under
  // mutex_ because WaitForCompleteV2() writes wait_info_ under the same lock.
  CompactionServiceJobInfo GetCompactionInfoForWait() {
    InstrumentedMutexLock l(&mutex_);
    return wait_info_;
  }

  // Makes StartV2() return `s` until ResetOverride() is called.
  void OverrideStartStatus(CompactionServiceJobStatus s) {
    is_override_start_status_ = true;
    override_start_status_ = s;
  }

  // Makes WaitForCompleteV2() return `s` without compacting, until
  // ResetOverride() is called.
  void OverrideWaitStatus(CompactionServiceJobStatus s) {
    is_override_wait_status_ = true;
    override_wait_status_ = s;
  }

  // Makes WaitForCompleteV2() hand back `str` as the serialized result in
  // place of the real OpenAndCompact() output.
  void OverrideWaitResult(std::string str) {
    is_override_wait_result_ = true;
    override_wait_result_ = std::move(str);
  }

  // Turns off all three overrides.
  void ResetOverride() {
    is_override_wait_result_ = false;
    is_override_start_status_ = false;
    is_override_wait_status_ = false;
  }

  // Sets the flag that OpenAndCompactOptions::canceled points at.
  void SetCanceled(bool canceled) { canceled_ = canceled; }

 private:
  InstrumentedMutex mutex_;
  std::atomic_int compaction_num_{0};
  // job_id -> serialized compaction input; written and read under mutex_.
  std::map<uint64_t, std::string> jobs_;
  const std::string db_path_;
  Options options_;
  std::shared_ptr<Statistics> statistics_;
  // Written under mutex_ by StartV2() / WaitForCompleteV2() respectively.
  CompactionServiceJobInfo start_info_;
  CompactionServiceJobInfo wait_info_;
  bool is_override_start_status_ = false;
  CompactionServiceJobStatus override_start_status_ =
      CompactionServiceJobStatus::kFailure;
  bool is_override_wait_status_ = false;
  CompactionServiceJobStatus override_wait_status_ =
      CompactionServiceJobStatus::kFailure;
  bool is_override_wait_result_ = false;
  std::string override_wait_result_;
  std::vector<std::shared_ptr<EventListener>> listeners_;
  std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
      table_properties_collector_factories_;
  std::atomic_bool canceled_{false};
};
// Fixture for the compaction service tests. It owns separate Statistics
// objects for the primary DB and for the compactor, plus the
// MyTestCompactionService instance installed into the DB options.
class CompactionServiceTest : public DBTestBase {
 public:
  explicit CompactionServiceTest()
      : DBTestBase("compaction_service_test", true) {}

 protected:
  // Creates fresh statistics and a fresh MyTestCompactionService, wires them
  // into `*options`, then destroys and reopens the DB with those options.
  void ReopenWithCompactionService(Options* options) {
    options->env = env_;

    primary_statistics_ = CreateDBStatistics();
    options->statistics = primary_statistics_;

    compactor_statistics_ = CreateDBStatistics();
    compaction_service_ = std::make_shared<MyTestCompactionService>(
        dbname_, *options, compactor_statistics_, remote_listeners,
        remote_table_properties_collector_factories);
    options->compaction_service = compaction_service_;

    DestroyAndReopen(*options);
  }

  Statistics* GetCompactorStatistics() { return compactor_statistics_.get(); }

  Statistics* GetPrimaryStatistics() { return primary_statistics_.get(); }

  // Returns the installed service downcast to its concrete test type.
  MyTestCompactionService* GetCompactionService() {
    CompactionService* base = compaction_service_.get();
    return static_cast_with_check<MyTestCompactionService>(base);
  }

  // Writes keys [0, 200) as "value<N>" and then rewrites every even key as
  // "value_new<N>", arranging the files as 10 at L1 on top of 20 at L2.
  void GenerateTestData() {
    // Generate 20 files @ L2
    for (int file_idx = 0; file_idx < 20; ++file_idx) {
      const int first_key = file_idx * 10;
      for (int key_id = first_key; key_id < first_key + 10; ++key_id) {
        ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
      }
      ASSERT_OK(Flush());
    }
    MoveFilesToLevel(2);

    // Generate 10 files @ L1 overlap with all 20 files @ L2
    for (int file_idx = 0; file_idx < 10; ++file_idx) {
      const int first_key = file_idx * 20;
      for (int key_id = first_key; key_id < first_key + 20; key_id += 2) {
        ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
      }
      ASSERT_OK(Flush());
    }
    MoveFilesToLevel(1);
    ASSERT_EQ(FilesPerLevel(), "0,10,20");
  }

  // Checks the contents written by GenerateTestData(): odd keys still hold
  // "value<N>", even keys hold "value_new<N>".
  void VerifyTestData() {
    for (int key_id = 0; key_id < 200; ++key_id) {
      const std::string actual = Get(Key(key_id));
      const bool is_odd = (key_id % 2) != 0;
      if (is_odd) {
        ASSERT_EQ(actual, "value" + std::to_string(key_id));
      } else {
        ASSERT_EQ(actual, "value_new" + std::to_string(key_id));
      }
    }
  }

  // Handed to MyTestCompactionService on the next
  // ReopenWithCompactionService(); tests fill these before reopening.
  std::vector<std::shared_ptr<EventListener>> remote_listeners;
  std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
      remote_table_properties_collector_factories;

 private:
  std::shared_ptr<Statistics> compactor_statistics_;
  std::shared_ptr<Statistics> primary_statistics_;
  std::shared_ptr<CompactionService> compaction_service_;
};
TEST_F(CompactionServiceTest, BasicCompactions) {
Options options = CurrentOptions();
ReopenWithCompactionService(&options);
Statistics* primary_statistics = GetPrimaryStatistics();
Statistics* compactor_statistics = GetCompactorStatistics();
for (int i = 0; i < 20; i++) {
for (int j = 0; j < 10; j++) {
int key_id = i * 10 + j;
ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
}
ASSERT_OK(Flush());
}
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
int key_id = i * 20 + j * 2;
ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
}
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// verify result
for (int i = 0; i < 200; i++) {
auto result = Get(Key(i));
if (i % 2) {
ASSERT_EQ(result, "value" + std::to_string(i));
} else {
ASSERT_EQ(result, "value_new" + std::to_string(i));
}
}
auto my_cs = GetCompactionService();
ASSERT_GE(my_cs->GetCompactionNum(), 1);
// make sure the compaction statistics is only recorded on the remote side
ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), 1);
ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_READ_BYTES), 1);
ASSERT_EQ(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES), 0);
// even with remote compaction, primary host still needs to read SST files to
// `verify_table()`.
ASSERT_GE(primary_statistics->getTickerCount(COMPACT_READ_BYTES), 1);
// all the compaction write happens on the remote side
ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES));
ASSERT_GE(primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 1);
ASSERT_GT(primary_statistics->getTickerCount(COMPACT_READ_BYTES),
primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES));
// compactor is already the remote side, which doesn't have remote
ASSERT_EQ(compactor_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 0);
ASSERT_EQ(compactor_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
0);
// Test failed compaction
SyncPoint::GetInstance()->SetCallBack(
"DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) {
// override job status
auto s = static_cast<Status*>(status);
*s = Status::Aborted("MyTestCompactionService failed to compact!");
});
SyncPoint::GetInstance()->EnableProcessing();
Status s;
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
int key_id = i * 20 + j * 2;
s = Put(Key(key_id), "value_new" + std::to_string(key_id));
if (s.IsAborted()) {
break;
}
}
if (s.IsAborted()) {
break;
}
s = Flush();
if (s.IsAborted()) {
break;
}
s = dbfull()->TEST_WaitForCompact();
if (s.IsAborted()) {
break;
}
}
ASSERT_TRUE(s.IsAborted());
Always verify SST unique IDs on SST file open (#10532) Summary: Although we've been tracking SST unique IDs in the DB manifest unconditionally, checking has been opt-in and with an extra pass at DB::Open time. This changes the behavior of `verify_sst_unique_id_in_manifest` to check unique ID against manifest every time an SST file is opened through table cache (normal DB operations), replacing the explicit pass over files at DB::Open time. This change also enables the option by default and removes the "EXPERIMENTAL" designation. One possible criticism is that the option no longer ensures the integrity of a DB at Open time. This is far from an all-or-nothing issue. Verifying the IDs of all SST files hardly ensures all the data in the DB is readable. (VerifyChecksum is supposed to do that.) Also, with max_open_files=-1 (default, extremely common), all SST files are opened at DB::Open time anyway. Implementation details: * `VerifySstUniqueIdInManifest()` functions are the extra/explicit pass that is now removed. * Unit tests that manipulate/corrupt table properties have to opt out of this check, because that corrupts the "actual" unique id. (And even for testing we don't currently have a mechanism to set "no unique id" in the in-memory file metadata for new files.) * A lot of other unit test churn relates to (a) default checking on, and (b) checking on SST open even without DB::Open (e.g. on flush) * Use `FileMetaData` for more `TableCache` operations (in place of `FileDescriptor`) so that we have access to the unique_id whenever we might need to open an SST file. **There is the possibility of performance impact because we can no longer use the more localized `fd` part of an `FdWithKeyRange` but instead follow the `file_metadata` pointer. 
However, this change (possible regression) is only done for `GetMemoryUsageByTableReaders`.** * Removed a completely unnecessary constructor overload of `TableReaderOptions` Possible follow-up: * Verification only happens when opening through table cache. Are there more places where this should happen? * Improve error message when there is a file size mismatch vs. manifest (FIXME added in the appropriate place). * I'm not sure there's a justification for `FileDescriptor` to be distinct from `FileMetaData`. * I'm skeptical that `FdWithKeyRange` really still makes sense for optimizing some data locality by duplicating some data in memory, but I could be wrong. * An unnecessary overload of NewTableReader was recently added, in the public API nonetheless (though unusable there). It should be cleaned up to put most things under `TableReaderOptions`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10532 Test Plan: updated unit tests Performance test showing no significant difference (just noise I think): `./db_bench -benchmarks=readwhilewriting[-X10] -num=3000000 -disable_wal=1 -bloom_bits=8 -write_buffer_size=1000000 -target_file_size_base=1000000` Before: readwhilewriting [AVG 10 runs] : 68702 (± 6932) ops/sec After: readwhilewriting [AVG 10 runs] : 68239 (± 7198) ops/sec Reviewed By: jay-zhuang Differential Revision: D38765551 Pulled By: pdillinger fbshipit-source-id: a827a708155f12344ab2a5c16e7701c7636da4c2
2 years ago
// Test re-open and successful unique id verification
std::atomic_int verify_passed{0};
SyncPoint::GetInstance()->SetCallBack(
"BlockBasedTable::Open::PassedVerifyUniqueId", [&](void* arg) {
// override job status
auto id = static_cast<UniqueId64x2*>(arg);
assert(*id != kNullUniqueId64x2);
verify_passed++;
});
Reopen(options);
ASSERT_GT(verify_passed, 0);
Close();
}
// Issues manual CompactRange() calls over a bounded range, two half-open
// ranges and the full key space; each one must go through the compaction
// service at least once and leave the data intact.
TEST_F(CompactionServiceTest, ManualCompaction) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  ReopenWithCompactionService(&options);
  GenerateTestData();

  auto service = GetCompactionService();

  // [Key(15), Key(45)]
  std::string begin_key = Key(15);
  std::string end_key = Key(45);
  Slice begin(begin_key);
  Slice end(end_key);
  uint64_t num_before = service->GetCompactionNum();
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &begin, &end));
  ASSERT_GE(service->GetCompactionNum(), num_before + 1);
  VerifyTestData();

  // [Key(120), +inf)
  begin_key = Key(120);
  begin = begin_key;
  num_before = service->GetCompactionNum();
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &begin, nullptr));
  ASSERT_GE(service->GetCompactionNum(), num_before + 1);
  VerifyTestData();

  // (-inf, Key(92)]
  end_key = Key(92);
  end = end_key;
  num_before = service->GetCompactionNum();
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, &end));
  ASSERT_GE(service->GetCompactionNum(), num_before + 1);
  VerifyTestData();

  // everything
  num_before = service->GetCompactionNum();
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_GE(service->GetCompactionNum(), num_before + 1);
  VerifyTestData();
}
// Cancels a remote compaction, first before it starts and then from a sync
// point while it is running; in both cases CompactRange() must report
// Incomplete and the data must be unchanged.
TEST_F(CompactionServiceTest, CancelCompactionOnRemoteSide) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  ReopenWithCompactionService(&options);
  GenerateTestData();

  auto service = GetCompactionService();

  std::string begin_key = Key(15);
  std::string end_key = Key(45);
  Slice begin(begin_key);
  Slice end(end_key);
  uint64_t num_before = service->GetCompactionNum();

  // Test cancel compaction at the beginning
  service->SetCanceled(true);
  auto status = db_->CompactRange(CompactRangeOptions(), &begin, &end);
  ASSERT_TRUE(status.IsIncomplete());
  // The service counts every OpenAndCompact() attempt, so only a lower bound
  // on the counter is checked here.
  ASSERT_GE(service->GetCompactionNum(), num_before);
  VerifyTestData();

  // Test cancel compaction in progress
  ReopenWithCompactionService(&options);
  GenerateTestData();
  service = GetCompactionService();
  service->SetCanceled(false);

  std::atomic_bool cancel_issued{false};
  SyncPoint::GetInstance()->SetCallBack("CompactionJob::Run():Inprogress",
                                        [&](void* /*arg*/) {
                                          cancel_issued = true;
                                          service->SetCanceled(true);
                                        });
  SyncPoint::GetInstance()->EnableProcessing();

  status = db_->CompactRange(CompactRangeOptions(), &begin, &end);
  ASSERT_TRUE(status.IsIncomplete());
  ASSERT_TRUE(cancel_issued);
  // Same lower-bound check as above, against the freshly created service.
  ASSERT_GE(service->GetCompactionNum(), num_before);
  VerifyTestData();
}
// When the service's StartV2() reports kFailure, a manual compaction must
// come back as Incomplete.
TEST_F(CompactionServiceTest, FailedToStart) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  ReopenWithCompactionService(&options);
  GenerateTestData();

  auto service = GetCompactionService();
  service->OverrideStartStatus(CompactionServiceJobStatus::kFailure);

  const std::string begin_key = Key(15);
  const std::string end_key = Key(45);
  Slice begin(begin_key);
  Slice end(end_key);
  Status status = db_->CompactRange(CompactRangeOptions(), &begin, &end);
  ASSERT_TRUE(status.IsIncomplete());
}
// When the service hands back a result string that is not a valid serialized
// compaction result, the manual compaction must fail.
TEST_F(CompactionServiceTest, InvalidResult) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  ReopenWithCompactionService(&options);
  GenerateTestData();

  auto service = GetCompactionService();
  service->OverrideWaitResult("Invalid Str");

  const std::string begin_key = Key(15);
  const std::string end_key = Key(45);
  Slice begin(begin_key);
  Slice end(end_key);
  Status status = db_->CompactRange(CompactRangeOptions(), &begin, &end);
  ASSERT_FALSE(status.ok());
}
// With max_subcompactions set to 10, a full manual compaction is expected to
// reach the compaction service at least twice.
TEST_F(CompactionServiceTest, SubCompaction) {
  Options options = CurrentOptions();
  options.max_subcompactions = 10;
  options.target_file_size_base = 1 << 10;  // 1KB
  options.disable_auto_compactions = true;
  ReopenWithCompactionService(&options);

  GenerateTestData();
  VerifyTestData();

  auto service = GetCompactionService();
  const int num_before = service->GetCompactionNum();

  CompactRangeOptions compact_options = CompactRangeOptions();
  compact_options.max_subcompactions = 10;
  Status status = db_->CompactRange(compact_options, nullptr, nullptr);
  ASSERT_OK(status);
  VerifyTestData();

  // make sure there's sub-compaction by checking the compaction number
  const int num_remote_runs = service->GetCompactionNum() - num_before;
  ASSERT_GE(num_remote_runs, 2);
}
// Compaction filter that removes every key whose numeric id lies in (5, 105]
// and keeps all others.
class PartialDeleteCompactionFilter : public CompactionFilter {
 public:
  CompactionFilter::Decision FilterV2(
      int /*level*/, const Slice& key, ValueType /*value_type*/,
      const Slice& /*existing_value*/, std::string* /*new_value*/,
      std::string* /*skip_until*/) const override {
    // The id is parsed from everything after the first three characters of
    // the key (presumably the textual prefix produced by Key() - confirm).
    const std::string numeric_part = key.ToString().substr(3);
    const int key_id = std::stoi(numeric_part);
    const bool in_removed_range = key_id > 5 && key_id <= 105;
    return in_removed_range ? CompactionFilter::Decision::kRemove
                            : CompactionFilter::Decision::kKeep;
  }

  const char* Name() const override { return "PartialDeleteCompactionFilter"; }
};
// Installs PartialDeleteCompactionFilter and checks that, after compactions
// run through the compaction service, keys in (5, 105] are gone and the rest
// hold their latest values.
TEST_F(CompactionServiceTest, CompactionFilter) {
  Options options = CurrentOptions();
  auto delete_comp_filter = std::make_unique<PartialDeleteCompactionFilter>();
  options.compaction_filter = delete_comp_filter.get();
  ReopenWithCompactionService(&options);

  // 20 flushed files covering keys [0, 200).
  for (int file_idx = 0; file_idx < 20; ++file_idx) {
    const int first_key = file_idx * 10;
    for (int key_id = first_key; key_id < first_key + 10; ++key_id) {
      ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }

  // 10 more flushed files rewriting every even key.
  for (int file_idx = 0; file_idx < 10; ++file_idx) {
    const int first_key = file_idx * 20;
    for (int key_id = first_key; key_id < first_key + 20; key_id += 2) {
      ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  // verify result
  for (int key_id = 0; key_id < 200; ++key_id) {
    const std::string actual = Get(Key(key_id));
    const bool filtered_out = key_id > 5 && key_id <= 105;
    if (filtered_out) {
      ASSERT_EQ(actual, "NOT_FOUND");
    } else if (key_id % 2) {
      ASSERT_EQ(actual, "value" + std::to_string(key_id));
    } else {
      ASSERT_EQ(actual, "value_new" + std::to_string(key_id));
    }
  }

  auto service = GetCompactionService();
  ASSERT_GE(service->GetCompactionNum(), 1);
}
// A value visible through a snapshot taken before an overwrite must still be
// readable after the data has been compacted via the compaction service.
TEST_F(CompactionServiceTest, Snapshot) {
  Options options = CurrentOptions();
  ReopenWithCompactionService(&options);

  // First file: Key(1) and Key(2), with a snapshot pinned on this state.
  ASSERT_OK(Put(Key(1), "value1"));
  ASSERT_OK(Put(Key(2), "value1"));
  const Snapshot* pinned = db_->GetSnapshot();
  ASSERT_OK(Flush());

  // Second file: overwrites Key(1) and adds Key(3).
  ASSERT_OK(Put(Key(1), "value2"));
  ASSERT_OK(Put(Key(3), "value2"));
  ASSERT_OK(Flush());

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  auto service = GetCompactionService();
  ASSERT_GE(service->GetCompactionNum(), 1);

  ASSERT_EQ("value1", Get(Key(1), pinned));
  ASSERT_EQ("value2", Get(Key(1)));
  db_->ReleaseSnapshot(pinned);
}
// Compacts each L1 file into L2 from its own thread, then checks that the
// service saw exactly 10 compactions and that L1 is empty afterwards.
TEST_F(CompactionServiceTest, ConcurrentCompaction) {
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 100;
  options.max_background_jobs = 20;
  ReopenWithCompactionService(&options);
  GenerateTestData();

  ColumnFamilyMetaData meta;
  db_->GetColumnFamilyMetaData(&meta);

  // One CompactFiles() call per L1 file, all running concurrently. `meta`
  // outlives the workers, which are joined right below.
  std::vector<std::thread> workers;
  for (const auto& file : meta.levels[1].files) {
    workers.emplace_back([&]() {
      const std::string fname = file.db_path + "/" + file.name;
      ASSERT_OK(db_->CompactFiles(CompactionOptions(), {fname}, 2));
    });
  }
  for (auto& worker : workers) {
    worker.join();
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // verify result
  for (int key_id = 0; key_id < 200; ++key_id) {
    const std::string actual = Get(Key(key_id));
    if (key_id % 2) {
      ASSERT_EQ(actual, "value" + std::to_string(key_id));
    } else {
      ASSERT_EQ(actual, "value_new" + std::to_string(key_id));
    }
  }

  auto service = GetCompactionService();
  ASSERT_EQ(service->GetCompactionNum(), 10);
  ASSERT_EQ(FilesPerLevel(), "0,0,10");
}
// Checks the CompactionServiceJobInfo seen by StartV2()/WaitForCompleteV2():
// db name, db id, session id, and the priority for automatic (LOW), manual
// CompactFiles (USER) and BOTTOM-priority compactions.
TEST_F(CompactionServiceTest, CompactionInfo) {
  Options options = CurrentOptions();
  ReopenWithCompactionService(&options);

  // 20 flushed files covering keys [0, 200).
  for (int file_idx = 0; file_idx < 20; ++file_idx) {
    const int first_key = file_idx * 10;
    for (int key_id = first_key; key_id < first_key + 10; ++key_id) {
      ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }

  // 10 more flushed files rewriting every even key.
  for (int file_idx = 0; file_idx < 10; ++file_idx) {
    const int first_key = file_idx * 20;
    for (int key_id = first_key; key_id < first_key + 20; key_id += 2) {
      ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // GetCompactionService() already returns the concrete test type.
  auto service = GetCompactionService();
  uint64_t comp_num = service->GetCompactionNum();
  ASSERT_GE(comp_num, 1);

  // Automatic compaction: identity fields plus LOW priority.
  CompactionServiceJobInfo info = service->GetCompactionInfoForStart();
  ASSERT_EQ(dbname_, info.db_name);
  std::string db_id;
  std::string db_session_id;
  ASSERT_OK(db_->GetDbIdentity(db_id));
  ASSERT_EQ(db_id, info.db_id);
  ASSERT_OK(db_->GetDbSessionId(db_session_id));
  ASSERT_EQ(db_session_id, info.db_session_id);
  ASSERT_EQ(Env::LOW, info.priority);

  info = service->GetCompactionInfoForWait();
  ASSERT_EQ(dbname_, info.db_name);
  ASSERT_EQ(db_id, info.db_id);
  ASSERT_EQ(db_session_id, info.db_session_id);
  ASSERT_EQ(Env::LOW, info.priority);

  // Test priority USER
  ColumnFamilyMetaData meta;
  db_->GetColumnFamilyMetaData(&meta);
  const SstFileMetaData first_l1_file = meta.levels[1].files[0];
  ASSERT_OK(db_->CompactFiles(
      CompactionOptions(),
      {first_l1_file.db_path + "/" + first_l1_file.name}, 2));
  info = service->GetCompactionInfoForStart();
  ASSERT_EQ(Env::USER, info.priority);
  info = service->GetCompactionInfoForWait();
  ASSERT_EQ(Env::USER, info.priority);

  // Test priority BOTTOM
  env_->SetBackgroundThreads(1, Env::BOTTOM);
  options.num_levels = 2;
  ReopenWithCompactionService(&options);
  service = GetCompactionService();

  for (int file_idx = 0; file_idx < 20; ++file_idx) {
    const int first_key = file_idx * 10;
    for (int key_id = first_key; key_id < first_key + 10; ++key_id) {
      ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }

  for (int file_idx = 0; file_idx < 4; ++file_idx) {
    const int first_key = file_idx * 20;
    for (int key_id = first_key; key_id < first_key + 20; key_id += 2) {
      ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  info = service->GetCompactionInfoForStart();
  ASSERT_EQ(Env::BOTTOM, info.priority);
  info = service->GetCompactionInfoForWait();
  ASSERT_EQ(Env::BOTTOM, info.priority);
}
// When StartV2() answers kUseLocal, automatic compactions must run on the
// primary: no remote compaction is counted and only the primary's write
// ticker grows.
TEST_F(CompactionServiceTest, FallbackLocalAuto) {
  Options options = CurrentOptions();
  ReopenWithCompactionService(&options);

  auto service = GetCompactionService();
  Statistics* compactor_statistics = GetCompactorStatistics();
  Statistics* primary_statistics = GetPrimaryStatistics();

  // Baselines taken before any data is written.
  const uint64_t compactor_write_bytes =
      compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
  const uint64_t primary_write_bytes =
      primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);

  service->OverrideStartStatus(CompactionServiceJobStatus::kUseLocal);

  // 20 flushed files covering keys [0, 200).
  for (int file_idx = 0; file_idx < 20; ++file_idx) {
    const int first_key = file_idx * 10;
    for (int key_id = first_key; key_id < first_key + 10; ++key_id) {
      ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }

  // 10 more flushed files rewriting every even key.
  for (int file_idx = 0; file_idx < 10; ++file_idx) {
    const int first_key = file_idx * 20;
    for (int key_id = first_key; key_id < first_key + 20; key_id += 2) {
      ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // verify result
  for (int key_id = 0; key_id < 200; ++key_id) {
    const std::string actual = Get(Key(key_id));
    if (key_id % 2) {
      ASSERT_EQ(actual, "value" + std::to_string(key_id));
    } else {
      ASSERT_EQ(actual, "value_new" + std::to_string(key_id));
    }
  }

  ASSERT_EQ(service->GetCompactionNum(), 0);

  // make sure the compaction statistics is only recorded on the local side
  ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
            compactor_write_bytes);
  ASSERT_GT(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
            primary_write_bytes);
  ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 0);
  ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES), 0);
}
// Runs one manual compaction remotely, then a second one for which
// WaitForCompleteV2() answers kUseLocal, and checks after each which side's
// write tickers moved.
TEST_F(CompactionServiceTest, FallbackLocalManual) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  ReopenWithCompactionService(&options);

  GenerateTestData();
  VerifyTestData();

  auto service = GetCompactionService();
  Statistics* compactor_statistics = GetCompactorStatistics();
  Statistics* primary_statistics = GetPrimaryStatistics();

  uint64_t compactor_write_bytes =
      compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
  uint64_t primary_write_bytes =
      primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);

  // re-enable remote compaction
  service->ResetOverride();

  std::string begin_key = Key(15);
  std::string end_key = Key(45);
  Slice begin(begin_key);
  Slice end(end_key);
  uint64_t num_before = service->GetCompactionNum();
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &begin, &end));
  ASSERT_GE(service->GetCompactionNum(), num_before + 1);

  // make sure the compaction statistics is only recorded on the remote side
  ASSERT_GT(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
            compactor_write_bytes);
  ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
            compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES));
  ASSERT_EQ(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
            primary_write_bytes);

  // return run local again with API WaitForComplete
  service->OverrideWaitStatus(CompactionServiceJobStatus::kUseLocal);
  begin_key = Key(120);
  begin = begin_key;
  num_before = service->GetCompactionNum();
  // Refresh both baselines before the second compaction.
  compactor_write_bytes =
      compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
  primary_write_bytes = primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &begin, nullptr));
  ASSERT_EQ(service->GetCompactionNum(),
            num_before);  // no remote compaction is run

  // make sure the compaction statistics is only recorded on the local side
  ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
            compactor_write_bytes);
  ASSERT_GT(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
            primary_write_bytes);
  ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
            compactor_write_bytes);

  // verify result after 2 manual compactions
  VerifyTestData();
}
// Registers an EventListener on the compactor side only and checks that its
// subcompaction and table-file callbacks fire for compactions run through the
// compaction service.
TEST_F(CompactionServiceTest, RemoteEventListener) {
  // Counts callbacks and tracks which job ids currently have a subcompaction
  // in flight.
  class RemoteEventListenerTest : public EventListener {
   public:
    const char* Name() const override { return "RemoteEventListenerTest"; }

    void OnSubcompactionBegin(const SubcompactionJobInfo& info) override {
      auto result = on_going_compactions.emplace(info.job_id);
      ASSERT_TRUE(result.second);  // make sure there's no duplication
      compaction_num++;
      EventListener::OnSubcompactionBegin(info);
    }

    void OnSubcompactionCompleted(const SubcompactionJobInfo& info) override {
      auto num = on_going_compactions.erase(info.job_id);
      ASSERT_TRUE(num == 1);  // make sure the compaction id exists
      EventListener::OnSubcompactionCompleted(info);
    }

    // File creation must belong to a job that is currently in flight.
    void OnTableFileCreated(const TableFileCreationInfo& info) override {
      ASSERT_EQ(on_going_compactions.count(info.job_id), 1);
      file_created++;
      EventListener::OnTableFileCreated(info);
    }

    void OnTableFileCreationStarted(
        const TableFileCreationBriefInfo& info) override {
      ASSERT_EQ(on_going_compactions.count(info.job_id), 1);
      file_creation_started++;
      EventListener::OnTableFileCreationStarted(info);
    }

    bool ShouldBeNotifiedOnFileIO() override {
      file_io_notified++;
      return EventListener::ShouldBeNotifiedOnFileIO();
    }

    std::atomic_uint64_t file_io_notified{0};
    std::atomic_uint64_t file_creation_started{0};
    std::atomic_uint64_t file_created{0};

    std::set<int> on_going_compactions;  // store the job_id
    std::atomic_uint64_t compaction_num{0};
  };

  // Owned through a shared_ptr from the start so the listener cannot leak if
  // growing `remote_listeners` throws; the local copy keeps typed access to
  // the counters.
  auto listener = std::make_shared<RemoteEventListenerTest>();
  remote_listeners.emplace_back(listener);

  Options options = CurrentOptions();
  ReopenWithCompactionService(&options);

  // 20 flushed files covering keys [0, 200).
  for (int file_idx = 0; file_idx < 20; ++file_idx) {
    const int first_key = file_idx * 10;
    for (int key_id = first_key; key_id < first_key + 10; ++key_id) {
      ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }

  // 10 more flushed files rewriting every even key.
  for (int file_idx = 0; file_idx < 10; ++file_idx) {
    const int first_key = file_idx * 20;
    for (int key_id = first_key; key_id < first_key + 20; key_id += 2) {
      ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // check the events are triggered
  ASSERT_TRUE(listener->file_io_notified > 0);
  ASSERT_TRUE(listener->file_creation_started > 0);
  ASSERT_TRUE(listener->file_created > 0);
  ASSERT_TRUE(listener->compaction_num > 0);
  ASSERT_TRUE(listener->on_going_compactions.empty());

  // verify result
  for (int key_id = 0; key_id < 200; ++key_id) {
    auto result = Get(Key(key_id));
    if (key_id % 2) {
      ASSERT_EQ(result, "value" + std::to_string(key_id));
    } else {
      ASSERT_EQ(result, "value_new" + std::to_string(key_id));
    }
  }
}
// Registers a TablePropertiesCollectorFactory on the compactor side only and
// checks that the user property it emits is absent from locally flushed SSTs
// but present on at least one SST after a compaction has run.
TEST_F(CompactionServiceTest, TablePropertiesCollector) {
  const static std::string kUserPropertyName = "TestCount";

  // Counts AddUserKey() calls and reports the count under kUserPropertyName.
  class TablePropertiesCollectorTest : public TablePropertiesCollector {
   public:
    Status Finish(UserCollectedProperties* properties) override {
      *properties = UserCollectedProperties{
          {kUserPropertyName, std::to_string(count_)},
      };
      return Status::OK();
    }

    UserCollectedProperties GetReadableProperties() const override {
      return UserCollectedProperties();
    }

    const char* Name() const override { return "TablePropertiesCollectorTest"; }

    Status AddUserKey(const Slice& /*user_key*/, const Slice& /*value*/,
                      EntryType /*type*/, SequenceNumber /*seq*/,
                      uint64_t /*file_size*/) override {
      count_++;
      return Status::OK();
    }

   private:
    uint32_t count_ = 0;
  };

  class TablePropertiesCollectorFactoryTest
      : public TablePropertiesCollectorFactory {
   public:
    TablePropertiesCollector* CreateTablePropertiesCollector(
        TablePropertiesCollectorFactory::Context /*context*/) override {
      return new TablePropertiesCollectorTest();
    }

    const char* Name() const override {
      return "TablePropertiesCollectorFactoryTest";
    }
  };

  // Created as a shared_ptr up front so the factory cannot leak if growing
  // the vector throws.
  remote_table_properties_collector_factories.emplace_back(
      std::make_shared<TablePropertiesCollectorFactoryTest>());

  const int kNumSst = 3;
  const int kLevel0Trigger = 4;
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = kLevel0Trigger;
  ReopenWithCompactionService(&options);

  // generate a few SSTs locally which should not have user property
  for (int i = 0; i < kNumSst; i++) {
    for (int j = 0; j < 100; j++) {
      ASSERT_OK(Put(Key(i * 10 + j), "value"));
    }
    ASSERT_OK(Flush());
  }

  TablePropertiesCollection fname_to_props;
  ASSERT_OK(db_->GetPropertiesOfAllTables(&fname_to_props));
  for (const auto& file_props : fname_to_props) {
    // Bind by reference: no need to copy the whole property map per file.
    const auto& properties = file_props.second->user_collected_properties;
    auto it = properties.find(kUserPropertyName);
    ASSERT_EQ(it, properties.end());
  }

  // trigger compaction
  for (int i = kNumSst; i < kLevel0Trigger; i++) {
    for (int j = 0; j < 100; j++) {
      ASSERT_OK(Put(Key(i * 10 + j), "value"));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_OK(db_->GetPropertiesOfAllTables(&fname_to_props));

  // At least one SST must now carry the property, with a positive count.
  bool has_user_property = false;
  for (const auto& file_props : fname_to_props) {
    const auto& properties = file_props.second->user_collected_properties;
    auto it = properties.find(kUserPropertyName);
    if (it != properties.end()) {
      has_user_property = true;
      ASSERT_GT(std::stoi(it->second), 0);
    }
  }
  ASSERT_TRUE(has_user_property);
}
} // namespace ROCKSDB_NAMESPACE
// Test binary entry point: installs the stack trace handler, initializes
// GoogleTest from the command line, registers custom objects, then runs every
// test and returns the GoogleTest result as the exit code.
int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  RegisterCustomObjects(argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}