// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
|
|
|
|
|
|
|
|
#include "db/db_test_util.h"
|
|
|
|
#include "port/stack_trace.h"
|
|
|
|
#include "table/unique_id_impl.h"
|
|
|
|
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
|
|
|
|
class MyTestCompactionService : public CompactionService {
|
|
|
|
public:
|
|
|
|
MyTestCompactionService(
|
|
|
|
std::string db_path, Options& options,
|
|
|
|
std::shared_ptr<Statistics>& statistics,
|
|
|
|
std::vector<std::shared_ptr<EventListener>>& listeners,
|
|
|
|
std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
|
|
|
|
table_properties_collector_factories)
|
|
|
|
: db_path_(std::move(db_path)),
|
|
|
|
options_(options),
|
|
|
|
statistics_(statistics),
|
|
|
|
start_info_("na", "na", "na", 0, Env::TOTAL),
|
|
|
|
wait_info_("na", "na", "na", 0, Env::TOTAL),
|
|
|
|
listeners_(listeners),
|
|
|
|
table_properties_collector_factories_(
|
|
|
|
std::move(table_properties_collector_factories)) {}
|
|
|
|
|
|
|
|
static const char* kClassName() { return "MyTestCompactionService"; }
|
|
|
|
|
|
|
|
const char* Name() const override { return kClassName(); }
|
|
|
|
|
|
|
|
CompactionServiceJobStatus StartV2(
|
|
|
|
const CompactionServiceJobInfo& info,
|
|
|
|
const std::string& compaction_service_input) override {
|
|
|
|
InstrumentedMutexLock l(&mutex_);
|
|
|
|
start_info_ = info;
|
|
|
|
assert(info.db_name == db_path_);
|
|
|
|
jobs_.emplace(info.job_id, compaction_service_input);
|
|
|
|
CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
|
|
|
|
if (is_override_start_status_) {
|
|
|
|
return override_start_status_;
|
|
|
|
}
|
|
|
|
return s;
|
|
|
|
}
|
|
|
|
|
|
|
|
CompactionServiceJobStatus WaitForCompleteV2(
|
|
|
|
const CompactionServiceJobInfo& info,
|
|
|
|
std::string* compaction_service_result) override {
|
|
|
|
std::string compaction_input;
|
|
|
|
assert(info.db_name == db_path_);
|
|
|
|
{
|
|
|
|
InstrumentedMutexLock l(&mutex_);
|
|
|
|
wait_info_ = info;
|
|
|
|
auto i = jobs_.find(info.job_id);
|
|
|
|
if (i == jobs_.end()) {
|
|
|
|
return CompactionServiceJobStatus::kFailure;
|
|
|
|
}
|
|
|
|
compaction_input = std::move(i->second);
|
|
|
|
jobs_.erase(i);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (is_override_wait_status_) {
|
|
|
|
return override_wait_status_;
|
|
|
|
}
|
|
|
|
|
|
|
|
CompactionServiceOptionsOverride options_override;
|
|
|
|
options_override.env = options_.env;
|
|
|
|
options_override.file_checksum_gen_factory =
|
|
|
|
options_.file_checksum_gen_factory;
|
|
|
|
options_override.comparator = options_.comparator;
|
|
|
|
options_override.merge_operator = options_.merge_operator;
|
|
|
|
options_override.compaction_filter = options_.compaction_filter;
|
|
|
|
options_override.compaction_filter_factory =
|
|
|
|
options_.compaction_filter_factory;
|
|
|
|
options_override.prefix_extractor = options_.prefix_extractor;
|
|
|
|
options_override.table_factory = options_.table_factory;
|
|
|
|
options_override.sst_partitioner_factory = options_.sst_partitioner_factory;
|
|
|
|
options_override.statistics = statistics_;
|
|
|
|
if (!listeners_.empty()) {
|
|
|
|
options_override.listeners = listeners_;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!table_properties_collector_factories_.empty()) {
|
|
|
|
options_override.table_properties_collector_factories =
|
|
|
|
table_properties_collector_factories_;
|
|
|
|
}
|
|
|
|
|
|
|
|
OpenAndCompactOptions options;
|
|
|
|
options.canceled = &canceled_;
|
|
|
|
|
|
|
|
Status s = DB::OpenAndCompact(
|
|
|
|
options, db_path_, db_path_ + "/" + std::to_string(info.job_id),
|
|
|
|
compaction_input, compaction_service_result, options_override);
|
|
|
|
if (is_override_wait_result_) {
|
|
|
|
*compaction_service_result = override_wait_result_;
|
|
|
|
}
|
|
|
|
compaction_num_.fetch_add(1);
|
|
|
|
if (s.ok()) {
|
|
|
|
return CompactionServiceJobStatus::kSuccess;
|
|
|
|
} else {
|
|
|
|
return CompactionServiceJobStatus::kFailure;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
int GetCompactionNum() { return compaction_num_.load(); }
|
|
|
|
|
|
|
|
CompactionServiceJobInfo GetCompactionInfoForStart() { return start_info_; }
|
|
|
|
CompactionServiceJobInfo GetCompactionInfoForWait() { return wait_info_; }
|
|
|
|
|
|
|
|
void OverrideStartStatus(CompactionServiceJobStatus s) {
|
|
|
|
is_override_start_status_ = true;
|
|
|
|
override_start_status_ = s;
|
|
|
|
}
|
|
|
|
|
|
|
|
void OverrideWaitStatus(CompactionServiceJobStatus s) {
|
|
|
|
is_override_wait_status_ = true;
|
|
|
|
override_wait_status_ = s;
|
|
|
|
}
|
|
|
|
|
|
|
|
void OverrideWaitResult(std::string str) {
|
|
|
|
is_override_wait_result_ = true;
|
|
|
|
override_wait_result_ = std::move(str);
|
|
|
|
}
|
|
|
|
|
|
|
|
void ResetOverride() {
|
|
|
|
is_override_wait_result_ = false;
|
|
|
|
is_override_start_status_ = false;
|
|
|
|
is_override_wait_status_ = false;
|
|
|
|
}
|
|
|
|
|
|
|
|
void SetCanceled(bool canceled) { canceled_ = canceled; }
|
|
|
|
|
|
|
|
private:
|
|
|
|
InstrumentedMutex mutex_;
|
|
|
|
std::atomic_int compaction_num_{0};
|
|
|
|
std::map<uint64_t, std::string> jobs_;
|
|
|
|
const std::string db_path_;
|
|
|
|
Options options_;
|
|
|
|
std::shared_ptr<Statistics> statistics_;
|
|
|
|
CompactionServiceJobInfo start_info_;
|
|
|
|
CompactionServiceJobInfo wait_info_;
|
|
|
|
bool is_override_start_status_ = false;
|
|
|
|
CompactionServiceJobStatus override_start_status_ =
|
|
|
|
CompactionServiceJobStatus::kFailure;
|
|
|
|
bool is_override_wait_status_ = false;
|
|
|
|
CompactionServiceJobStatus override_wait_status_ =
|
|
|
|
CompactionServiceJobStatus::kFailure;
|
|
|
|
bool is_override_wait_result_ = false;
|
|
|
|
std::string override_wait_result_;
|
|
|
|
std::vector<std::shared_ptr<EventListener>> listeners_;
|
|
|
|
std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
|
|
|
|
table_properties_collector_factories_;
|
|
|
|
std::atomic_bool canceled_{false};
|
|
|
|
};
|
|
|
|
|
|
|
|
class CompactionServiceTest : public DBTestBase {
|
|
|
|
public:
|
|
|
|
explicit CompactionServiceTest()
|
|
|
|
: DBTestBase("compaction_service_test", true) {}
|
|
|
|
|
|
|
|
protected:
|
|
|
|
void ReopenWithCompactionService(Options* options) {
|
|
|
|
options->env = env_;
|
|
|
|
primary_statistics_ = CreateDBStatistics();
|
|
|
|
options->statistics = primary_statistics_;
|
|
|
|
compactor_statistics_ = CreateDBStatistics();
|
|
|
|
|
|
|
|
compaction_service_ = std::make_shared<MyTestCompactionService>(
|
|
|
|
dbname_, *options, compactor_statistics_, remote_listeners,
|
|
|
|
remote_table_properties_collector_factories);
|
|
|
|
options->compaction_service = compaction_service_;
|
|
|
|
DestroyAndReopen(*options);
|
|
|
|
}
|
|
|
|
|
|
|
|
Statistics* GetCompactorStatistics() { return compactor_statistics_.get(); }
|
|
|
|
|
|
|
|
Statistics* GetPrimaryStatistics() { return primary_statistics_.get(); }
|
|
|
|
|
|
|
|
MyTestCompactionService* GetCompactionService() {
|
|
|
|
CompactionService* cs = compaction_service_.get();
|
|
|
|
return static_cast_with_check<MyTestCompactionService>(cs);
|
|
|
|
}
|
|
|
|
|
|
|
|
void GenerateTestData() {
|
|
|
|
// Generate 20 files @ L2
|
|
|
|
for (int i = 0; i < 20; i++) {
|
|
|
|
for (int j = 0; j < 10; j++) {
|
|
|
|
int key_id = i * 10 + j;
|
|
|
|
ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
|
|
|
|
}
|
|
|
|
ASSERT_OK(Flush());
|
|
|
|
}
|
|
|
|
MoveFilesToLevel(2);
|
|
|
|
|
|
|
|
// Generate 10 files @ L1 overlap with all 20 files @ L2
|
|
|
|
for (int i = 0; i < 10; i++) {
|
|
|
|
for (int j = 0; j < 10; j++) {
|
|
|
|
int key_id = i * 20 + j * 2;
|
|
|
|
ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
|
|
|
|
}
|
|
|
|
ASSERT_OK(Flush());
|
|
|
|
}
|
|
|
|
MoveFilesToLevel(1);
|
|
|
|
ASSERT_EQ(FilesPerLevel(), "0,10,20");
|
|
|
|
}
|
|
|
|
|
|
|
|
void VerifyTestData() {
|
|
|
|
for (int i = 0; i < 200; i++) {
|
|
|
|
auto result = Get(Key(i));
|
|
|
|
if (i % 2) {
|
|
|
|
ASSERT_EQ(result, "value" + std::to_string(i));
|
|
|
|
} else {
|
|
|
|
ASSERT_EQ(result, "value_new" + std::to_string(i));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
std::vector<std::shared_ptr<EventListener>> remote_listeners;
|
|
|
|
std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
|
|
|
|
remote_table_properties_collector_factories;
|
|
|
|
|
|
|
|
private:
|
|
|
|
std::shared_ptr<Statistics> compactor_statistics_;
|
|
|
|
std::shared_ptr<Statistics> primary_statistics_;
|
|
|
|
std::shared_ptr<CompactionService> compaction_service_;
|
|
|
|
};
|
|
|
|
|
|
|
|
// Exercises automatic compactions through the compaction service:
//  1. data written with overlapping flushes is compacted remotely and reads
//     back correctly, with tickers attributed to the right side;
//  2. a failure injected at the end of the remote job surfaces as Aborted;
//  3. reopening the DB passes SST unique id verification.
TEST_F(CompactionServiceTest, BasicCompactions) {
  Options options = CurrentOptions();
  ReopenWithCompactionService(&options);

  Statistics* primary_statistics = GetPrimaryStatistics();
  Statistics* compactor_statistics = GetCompactorStatistics();

  // Keys 0..199, one flush per 10 keys.
  for (int i = 0; i < 20; i++) {
    for (int j = 0; j < 10; j++) {
      int key_id = i * 10 + j;
      ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }

  // Overwrite every even key, one flush per 10 keys.
  for (int i = 0; i < 10; i++) {
    for (int j = 0; j < 10; j++) {
      int key_id = i * 20 + j * 2;
      ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // verify result
  for (int i = 0; i < 200; i++) {
    auto result = Get(Key(i));
    if (i % 2) {
      ASSERT_EQ(result, "value" + std::to_string(i));
    } else {
      ASSERT_EQ(result, "value_new" + std::to_string(i));
    }
  }
  auto my_cs = GetCompactionService();
  ASSERT_GE(my_cs->GetCompactionNum(), 1);

  // make sure the compaction statistics is only recorded on the remote side
  ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), 1);
  ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_READ_BYTES), 1);
  ASSERT_EQ(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES), 0);
  // even with remote compaction, primary host still needs to read SST files to
  // `verify_table()`.
  ASSERT_GE(primary_statistics->getTickerCount(COMPACT_READ_BYTES), 1);
  // all the compaction write happens on the remote side
  ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
            compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES));
  ASSERT_GE(primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 1);
  ASSERT_GT(primary_statistics->getTickerCount(COMPACT_READ_BYTES),
            primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES));
  // compactor is already the remote side, so it records no REMOTE_* tickers
  ASSERT_EQ(compactor_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 0);
  ASSERT_EQ(compactor_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
            0);

  // Test failed compaction
  SyncPoint::GetInstance()->SetCallBack(
      "DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) {
        // override job status
        auto s = static_cast<Status*>(status);
        *s = Status::Aborted("MyTestCompactionService failed to compact!");
      });
  SyncPoint::GetInstance()->EnableProcessing();

  // Keep writing until the injected failure shows up. It may be reported by
  // Put(), Flush() or TEST_WaitForCompact(), so each one is checked.
  Status s;
  for (int i = 0; i < 10; i++) {
    for (int j = 0; j < 10; j++) {
      int key_id = i * 20 + j * 2;
      s = Put(Key(key_id), "value_new" + std::to_string(key_id));
      if (s.IsAborted()) {
        break;
      }
    }
    if (s.IsAborted()) {
      break;
    }
    s = Flush();
    if (s.IsAborted()) {
      break;
    }
    s = dbfull()->TEST_WaitForCompact();
    if (s.IsAborted()) {
      break;
    }
  }
  ASSERT_TRUE(s.IsAborted());

  // Test re-open and successful unique id verification
  std::atomic_int verify_passed{0};
  SyncPoint::GetInstance()->SetCallBack(
      "BlockBasedTable::Open::PassedVerifyUniqueId", [&](void* arg) {
        // count each SST whose (non-null) unique id passed verification
        auto id = static_cast<UniqueId64x2*>(arg);
        assert(*id != kNullUniqueId64x2);
        verify_passed++;
      });
  Reopen(options);
  ASSERT_GT(verify_passed, 0);
  Close();
}
|
|
|
|
|
|
|
|
// Manual CompactRange() calls with every combination of bounded/unbounded
// endpoints must each go through the compaction service and leave the data
// unchanged.
TEST_F(CompactionServiceTest, ManualCompaction) {
  Options opts = CurrentOptions();
  opts.disable_auto_compactions = true;
  ReopenWithCompactionService(&opts);
  GenerateTestData();

  MyTestCompactionService* const service = GetCompactionService();
  const CompactRangeOptions cro;

  // [Key(15), Key(45)]
  std::string lower_key = Key(15);
  std::string upper_key = Key(45);
  Slice lower(lower_key);
  Slice upper(upper_key);
  uint64_t jobs_before = service->GetCompactionNum();
  ASSERT_OK(db_->CompactRange(cro, &lower, &upper));
  ASSERT_GE(service->GetCompactionNum(), jobs_before + 1);
  VerifyTestData();

  // [Key(120), end of key space)
  lower_key = Key(120);
  lower = lower_key;
  jobs_before = service->GetCompactionNum();
  ASSERT_OK(db_->CompactRange(cro, &lower, nullptr));
  ASSERT_GE(service->GetCompactionNum(), jobs_before + 1);
  VerifyTestData();

  // (start of key space, Key(92)]
  upper_key = Key(92);
  upper = upper_key;
  jobs_before = service->GetCompactionNum();
  ASSERT_OK(db_->CompactRange(cro, nullptr, &upper));
  ASSERT_GE(service->GetCompactionNum(), jobs_before + 1);
  VerifyTestData();

  // whole key space
  jobs_before = service->GetCompactionNum();
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_GE(service->GetCompactionNum(), jobs_before + 1);
  VerifyTestData();
}
|
|
|
|
|
|
|
|
// A compaction canceled on the service side, either before it starts or
// while it is running, must surface as Incomplete and leave the data intact.
TEST_F(CompactionServiceTest, CancelCompactionOnRemoteSide) {
  Options opts = CurrentOptions();
  opts.disable_auto_compactions = true;
  ReopenWithCompactionService(&opts);
  GenerateTestData();

  MyTestCompactionService* service = GetCompactionService();

  std::string lower_key = Key(15);
  std::string upper_key = Key(45);
  Slice lower(lower_key);
  Slice upper(upper_key);
  uint64_t jobs_before = service->GetCompactionNum();

  // Test cancel compaction at the beginning
  service->SetCanceled(true);
  Status status = db_->CompactRange(CompactRangeOptions(), &lower, &upper);
  ASSERT_TRUE(status.IsIncomplete());
  // The service counts every OpenAndCompact() attempt, canceled or not, so
  // only a lower bound can be asserted here.
  ASSERT_GE(service->GetCompactionNum(), jobs_before);
  VerifyTestData();

  // Test cancel compaction in progress
  ReopenWithCompactionService(&opts);
  GenerateTestData();
  service = GetCompactionService();
  service->SetCanceled(false);

  // Flip the cancel flag from inside the running compaction job.
  std::atomic_bool cancel_issued{false};
  SyncPoint::GetInstance()->SetCallBack("CompactionJob::Run():Inprogress",
                                        [&](void* /*arg*/) {
                                          cancel_issued = true;
                                          service->SetCanceled(true);
                                        });

  SyncPoint::GetInstance()->EnableProcessing();

  status = db_->CompactRange(CompactRangeOptions(), &lower, &upper);
  ASSERT_TRUE(status.IsIncomplete());
  ASSERT_TRUE(cancel_issued);
  // Same lower-bound check as above.
  ASSERT_GE(service->GetCompactionNum(), jobs_before);
  VerifyTestData();
}
|
|
|
|
|
|
|
|
// When the service reports kFailure from StartV2(), a manual compaction
// returns Incomplete.
TEST_F(CompactionServiceTest, FailedToStart) {
  Options opts = CurrentOptions();
  opts.disable_auto_compactions = true;
  ReopenWithCompactionService(&opts);

  GenerateTestData();

  GetCompactionService()->OverrideStartStatus(
      CompactionServiceJobStatus::kFailure);

  const std::string lower_key = Key(15);
  const std::string upper_key = Key(45);
  const Slice lower(lower_key);
  const Slice upper(upper_key);
  const Status status =
      db_->CompactRange(CompactRangeOptions(), &lower, &upper);
  ASSERT_TRUE(status.IsIncomplete());
}
|
|
|
|
|
|
|
|
// A serialized result that cannot be parsed must make the manual compaction
// fail on the primary side.
TEST_F(CompactionServiceTest, InvalidResult) {
  Options opts = CurrentOptions();
  opts.disable_auto_compactions = true;
  ReopenWithCompactionService(&opts);

  GenerateTestData();

  GetCompactionService()->OverrideWaitResult("Invalid Str");

  const std::string lower_key = Key(15);
  const std::string upper_key = Key(45);
  const Slice lower(lower_key);
  const Slice upper(upper_key);
  const Status status =
      db_->CompactRange(CompactRangeOptions(), &lower, &upper);
  ASSERT_FALSE(status.ok());
}
|
|
|
|
|
|
|
|
// With sub-compactions enabled, a single full-range manual compaction is
// expected to issue at least two jobs to the service.
TEST_F(CompactionServiceTest, SubCompaction) {
  Options opts = CurrentOptions();
  opts.max_subcompactions = 10;
  opts.target_file_size_base = 1 << 10;  // 1KB
  opts.disable_auto_compactions = true;
  ReopenWithCompactionService(&opts);

  GenerateTestData();
  VerifyTestData();

  MyTestCompactionService* const service = GetCompactionService();
  const int jobs_before = service->GetCompactionNum();

  CompactRangeOptions cro;
  cro.max_subcompactions = 10;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  VerifyTestData();

  // make sure there's sub-compaction by checking the compaction number
  const int jobs_run = service->GetCompactionNum() - jobs_before;
  ASSERT_GE(jobs_run, 2);
}
|
|
|
|
|
|
|
|
class PartialDeleteCompactionFilter : public CompactionFilter {
|
|
|
|
public:
|
|
|
|
CompactionFilter::Decision FilterV2(
|
|
|
|
int /*level*/, const Slice& key, ValueType /*value_type*/,
|
|
|
|
const Slice& /*existing_value*/, std::string* /*new_value*/,
|
|
|
|
std::string* /*skip_until*/) const override {
|
|
|
|
int i = std::stoi(key.ToString().substr(3));
|
|
|
|
if (i > 5 && i <= 105) {
|
|
|
|
return CompactionFilter::Decision::kRemove;
|
|
|
|
}
|
|
|
|
return CompactionFilter::Decision::kKeep;
|
|
|
|
}
|
|
|
|
|
|
|
|
const char* Name() const override { return "PartialDeleteCompactionFilter"; }
|
|
|
|
};
|
|
|
|
|
|
|
|
// The compaction filter configured on the primary must also be applied by
// the compaction service: keys in (5, 105] disappear, the rest survive.
TEST_F(CompactionServiceTest, CompactionFilter) {
  Options opts = CurrentOptions();
  auto partial_delete_filter =
      std::make_unique<PartialDeleteCompactionFilter>();
  opts.compaction_filter = partial_delete_filter.get();
  ReopenWithCompactionService(&opts);

  // Keys 0..199, one flush per 10 keys.
  for (int batch = 0; batch < 20; batch++) {
    for (int k = 0; k < 10; k++) {
      const int key_id = batch * 10 + k;
      ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }

  // Overwrite every even key, one flush per 10 keys.
  for (int batch = 0; batch < 10; batch++) {
    for (int k = 0; k < 10; k++) {
      const int key_id = batch * 20 + k * 2;
      ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  // verify result
  for (int i = 0; i < 200; i++) {
    std::string expected;
    if (i > 5 && i <= 105) {
      expected = "NOT_FOUND";
    } else if (i % 2) {
      expected = "value" + std::to_string(i);
    } else {
      expected = "value_new" + std::to_string(i);
    }
    const std::string result = Get(Key(i));
    ASSERT_EQ(result, expected);
  }
  ASSERT_GE(GetCompactionService()->GetCompactionNum(), 1);
}
|
|
|
|
|
|
|
|
// A value visible through a snapshot must survive a compaction done by the
// service, even after the key has been overwritten.
TEST_F(CompactionServiceTest, Snapshot) {
  Options opts = CurrentOptions();
  ReopenWithCompactionService(&opts);

  // First version, pinned by the snapshot.
  ASSERT_OK(Put(Key(1), "value1"));
  ASSERT_OK(Put(Key(2), "value1"));
  const Snapshot* const pinned = db_->GetSnapshot();
  ASSERT_OK(Flush());

  // Second version overwrites Key(1).
  ASSERT_OK(Put(Key(1), "value2"));
  ASSERT_OK(Put(Key(3), "value2"));
  ASSERT_OK(Flush());

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_GE(GetCompactionService()->GetCompactionNum(), 1);
  ASSERT_EQ("value1", Get(Key(1), pinned));
  ASSERT_EQ("value2", Get(Key(1)));
  db_->ReleaseSnapshot(pinned);
}
|
|
|
|
|
|
|
|
// Compacts each of the 10 L1 files into L2 from its own thread and expects
// exactly 10 service jobs and a final shape of "0,0,10".
TEST_F(CompactionServiceTest, ConcurrentCompaction) {
  Options opts = CurrentOptions();
  // High trigger so that no automatic L0 compaction interferes.
  opts.level0_file_num_compaction_trigger = 100;
  opts.max_background_jobs = 20;
  ReopenWithCompactionService(&opts);
  GenerateTestData();

  ColumnFamilyMetaData cf_meta;
  db_->GetColumnFamilyMetaData(&cf_meta);

  // One CompactFiles() call per L1 file; `cf_meta` outlives every thread.
  std::vector<std::thread> workers;
  for (const auto& sst : cf_meta.levels[1].files) {
    workers.emplace_back([&]() {
      const std::string path = sst.db_path + "/" + sst.name;
      ASSERT_OK(db_->CompactFiles(CompactionOptions(), {path}, 2));
    });
  }

  for (std::thread& worker : workers) {
    worker.join();
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // verify result
  for (int i = 0; i < 200; i++) {
    const std::string result = Get(Key(i));
    const std::string expected =
        ((i % 2) != 0 ? "value" : "value_new") + std::to_string(i);
    ASSERT_EQ(result, expected);
  }
  ASSERT_EQ(GetCompactionService()->GetCompactionNum(), 10);
  ASSERT_EQ(FilesPerLevel(), "0,0,10");
}
|
|
|
|
|
|
|
|
// Checks the CompactionServiceJobInfo handed to StartV2()/WaitForCompleteV2():
// db name, db id, session id, and the priority for automatic (LOW), manual
// CompactFiles (USER) and bottommost (BOTTOM) compactions.
TEST_F(CompactionServiceTest, CompactionInfo) {
  Options opts = CurrentOptions();
  ReopenWithCompactionService(&opts);

  // Keys 0..199, one flush per 10 keys.
  for (int batch = 0; batch < 20; batch++) {
    for (int k = 0; k < 10; k++) {
      const int key_id = batch * 10 + k;
      ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }

  // Overwrite every even key, one flush per 10 keys.
  for (int batch = 0; batch < 10; batch++) {
    for (int k = 0; k < 10; k++) {
      const int key_id = batch * 20 + k * 2;
      ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // GetCompactionService() already yields the concrete test type.
  MyTestCompactionService* service = GetCompactionService();
  const uint64_t jobs_run = service->GetCompactionNum();
  ASSERT_GE(jobs_run, 1);

  // Automatic compaction: identity fields match the DB, priority is LOW.
  std::string db_id, db_session_id;
  CompactionServiceJobInfo job = service->GetCompactionInfoForStart();
  ASSERT_EQ(dbname_, job.db_name);
  ASSERT_OK(db_->GetDbIdentity(db_id));
  ASSERT_EQ(db_id, job.db_id);
  ASSERT_OK(db_->GetDbSessionId(db_session_id));
  ASSERT_EQ(db_session_id, job.db_session_id);
  ASSERT_EQ(Env::LOW, job.priority);

  job = service->GetCompactionInfoForWait();
  ASSERT_EQ(dbname_, job.db_name);
  ASSERT_EQ(db_id, job.db_id);
  ASSERT_EQ(db_session_id, job.db_session_id);
  ASSERT_EQ(Env::LOW, job.priority);

  // Test priority USER
  ColumnFamilyMetaData cf_meta;
  db_->GetColumnFamilyMetaData(&cf_meta);
  const SstFileMetaData first_l1_file = cf_meta.levels[1].files[0];
  const std::string first_l1_path =
      first_l1_file.db_path + "/" + first_l1_file.name;
  ASSERT_OK(db_->CompactFiles(CompactionOptions(), {first_l1_path}, 2));
  job = service->GetCompactionInfoForStart();
  ASSERT_EQ(Env::USER, job.priority);
  job = service->GetCompactionInfoForWait();
  ASSERT_EQ(Env::USER, job.priority);

  // Test priority BOTTOM
  env_->SetBackgroundThreads(1, Env::BOTTOM);
  opts.num_levels = 2;
  ReopenWithCompactionService(&opts);
  // The reopen installed a new service instance.
  service = GetCompactionService();

  for (int batch = 0; batch < 20; batch++) {
    for (int k = 0; k < 10; k++) {
      const int key_id = batch * 10 + k;
      ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }

  for (int batch = 0; batch < 4; batch++) {
    for (int k = 0; k < 10; k++) {
      const int key_id = batch * 20 + k * 2;
      ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  job = service->GetCompactionInfoForStart();
  ASSERT_EQ(Env::BOTTOM, job.priority);
  job = service->GetCompactionInfoForWait();
  ASSERT_EQ(Env::BOTTOM, job.priority);
}
|
|
|
|
|
|
|
|
// When StartV2() answers kUseLocal, automatic compactions must run on the
// primary: the service runs no job and only primary tickers advance.
TEST_F(CompactionServiceTest, FallbackLocalAuto) {
  Options opts = CurrentOptions();
  ReopenWithCompactionService(&opts);

  MyTestCompactionService* const service = GetCompactionService();
  Statistics* const compactor_stats = GetCompactorStatistics();
  Statistics* const primary_stats = GetPrimaryStatistics();
  // Baselines taken before any compaction runs.
  const uint64_t compactor_bytes_before =
      compactor_stats->getTickerCount(COMPACT_WRITE_BYTES);
  const uint64_t primary_bytes_before =
      primary_stats->getTickerCount(COMPACT_WRITE_BYTES);

  service->OverrideStartStatus(CompactionServiceJobStatus::kUseLocal);

  // Keys 0..199, one flush per 10 keys.
  for (int batch = 0; batch < 20; batch++) {
    for (int k = 0; k < 10; k++) {
      const int key_id = batch * 10 + k;
      ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }

  // Overwrite every even key, one flush per 10 keys.
  for (int batch = 0; batch < 10; batch++) {
    for (int k = 0; k < 10; k++) {
      const int key_id = batch * 20 + k * 2;
      ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // verify result
  for (int i = 0; i < 200; i++) {
    const std::string result = Get(Key(i));
    const std::string expected =
        ((i % 2) != 0 ? "value" : "value_new") + std::to_string(i);
    ASSERT_EQ(result, expected);
  }

  ASSERT_EQ(service->GetCompactionNum(), 0);

  // make sure the compaction statistics is only recorded on the local side
  ASSERT_EQ(compactor_stats->getTickerCount(COMPACT_WRITE_BYTES),
            compactor_bytes_before);
  ASSERT_GT(primary_stats->getTickerCount(COMPACT_WRITE_BYTES),
            primary_bytes_before);
  ASSERT_EQ(primary_stats->getTickerCount(REMOTE_COMPACT_READ_BYTES), 0);
  ASSERT_EQ(primary_stats->getTickerCount(REMOTE_COMPACT_WRITE_BYTES), 0);
}
|
|
|
|
|
|
|
|
TEST_F(CompactionServiceTest, FallbackLocalManual) {
|
|
|
|
Options options = CurrentOptions();
|
|
|
|
options.disable_auto_compactions = true;
|
|
|
|
ReopenWithCompactionService(&options);
|
|
|
|
|
|
|
|
GenerateTestData();
|
|
|
|
VerifyTestData();
|
|
|
|
|
|
|
|
auto my_cs = GetCompactionService();
|
|
|
|
Statistics* compactor_statistics = GetCompactorStatistics();
|
|
|
|
Statistics* primary_statistics = GetPrimaryStatistics();
|
|
|
|
uint64_t compactor_write_bytes =
|
|
|
|
compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
|
|
|
|
uint64_t primary_write_bytes =
|
|
|
|
primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);
|
|
|
|
|
|
|
|
// re-enable remote compaction
|
|
|
|
my_cs->ResetOverride();
|
|
|
|
std::string start_str = Key(15);
|
|
|
|
std::string end_str = Key(45);
|
|
|
|
Slice start(start_str);
|
|
|
|
Slice end(end_str);
|
|
|
|
uint64_t comp_num = my_cs->GetCompactionNum();
|
|
|
|
|
|
|
|
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
|
|
|
|
ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
|
|
|
|
// make sure the compaction statistics is only recorded on the remote side
|
|
|
|
ASSERT_GT(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
|
|
|
|
compactor_write_bytes);
|
|
|
|
ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
|
|
|
|
compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES));
|
|
|
|
ASSERT_EQ(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
|
|
|
|
primary_write_bytes);
|
|
|
|
|
|
|
|
// return run local again with API WaitForComplete
|
|
|
|
my_cs->OverrideWaitStatus(CompactionServiceJobStatus::kUseLocal);
|
|
|
|
start_str = Key(120);
|
|
|
|
start = start_str;
|
|
|
|
comp_num = my_cs->GetCompactionNum();
|
|
|
|
compactor_write_bytes =
|
|
|
|
compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
|
|
|
|
primary_write_bytes = primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);
|
|
|
|
|
|
|
|
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr));
|
|
|
|
ASSERT_EQ(my_cs->GetCompactionNum(),
|
|
|
|
comp_num); // no remote compaction is run
|
|
|
|
// make sure the compaction statistics are only recorded on the local side
|
|
|
|
ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
|
|
|
|
compactor_write_bytes);
|
|
|
|
ASSERT_GT(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
|
|
|
|
primary_write_bytes);
|
|
|
|
ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
|
|
|
|
compactor_write_bytes);
|
|
|
|
|
|
|
|
// verify result after 2 manual compactions
|
|
|
|
VerifyTestData();
|
|
|
|
}
|
|
|
|
|
|
|
|
// Exercises EventListener callbacks while compactions run through the
// compaction service. A counting listener is appended to the fixture's
// remote_listeners, overlapping data is flushed so that compactions happen,
// and the test then checks that every tracked callback fired at least once
// and that the stored data is intact.
TEST_F(CompactionServiceTest, RemoteEventListener) {
  // Listener that counts callbacks and tracks the job ids of subcompactions
  // that have begun but not yet completed.
  class RemoteEventListenerTest : public EventListener {
   public:
    const char* Name() const override { return "RemoteEventListenerTest"; }

    // A job id may only begin once while it is still tracked.
    void OnSubcompactionBegin(const SubcompactionJobInfo& info) override {
      const auto inserted = on_going_compactions.insert(info.job_id);
      ASSERT_TRUE(inserted.second);  // make sure there's no duplication
      ++compaction_num;
      EventListener::OnSubcompactionBegin(info);
    }

    // Completion must match a previously recorded begin event.
    void OnSubcompactionCompleted(const SubcompactionJobInfo& info) override {
      const auto erased = on_going_compactions.erase(info.job_id);
      ASSERT_TRUE(erased == 1);  // make sure the compaction id exists
      EventListener::OnSubcompactionCompleted(info);
    }

    // File creation is only expected for a job that is currently tracked.
    void OnTableFileCreationStarted(
        const TableFileCreationBriefInfo& info) override {
      ASSERT_EQ(on_going_compactions.count(info.job_id), 1);
      ++file_creation_started;
      EventListener::OnTableFileCreationStarted(info);
    }

    void OnTableFileCreated(const TableFileCreationInfo& info) override {
      ASSERT_EQ(on_going_compactions.count(info.job_id), 1);
      ++file_created;
      EventListener::OnTableFileCreated(info);
    }

    // Counts how often the query is made, then defers to the base class.
    bool ShouldBeNotifiedOnFileIO() override {
      ++file_io_notified;
      return EventListener::ShouldBeNotifiedOnFileIO();
    }

    std::atomic_uint64_t file_io_notified{0};
    std::atomic_uint64_t file_creation_started{0};
    std::atomic_uint64_t file_created{0};
    std::atomic_uint64_t compaction_num{0};

    std::set<int> on_going_compactions;  // store the job_id
  };

  // remote_listeners takes ownership; the raw pointer is kept only to read
  // the counters afterwards.
  auto event_listener = new RemoteEventListenerTest();
  remote_listeners.emplace_back(event_listener);

  Options options = CurrentOptions();
  ReopenWithCompactionService(&options);

  // 20 files of 10 consecutive keys each: keys [0, 200) -> "value<key>".
  for (int file = 0; file < 20; ++file) {
    const int first_key = file * 10;
    for (int key = first_key; key < first_key + 10; ++key) {
      ASSERT_OK(Put(Key(key), "value" + std::to_string(key)));
    }
    ASSERT_OK(Flush());
  }

  // 10 files overwriting every even key in [0, 200) with "value_new<key>".
  for (int file = 0; file < 10; ++file) {
    const int first_key = file * 20;
    for (int key = first_key; key < first_key + 20; key += 2) {
      ASSERT_OK(Put(Key(key), "value_new" + std::to_string(key)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // check the events are triggered
  ASSERT_TRUE(event_listener->file_io_notified > 0);
  ASSERT_TRUE(event_listener->file_creation_started > 0);
  ASSERT_TRUE(event_listener->file_created > 0);
  ASSERT_TRUE(event_listener->compaction_num > 0);
  // every subcompaction that began must also have completed
  ASSERT_TRUE(event_listener->on_going_compactions.empty());

  // verify result: odd keys keep the first value, even keys were overwritten
  for (int key = 0; key < 200; ++key) {
    const std::string prefix = (key % 2 != 0) ? "value" : "value_new";
    ASSERT_EQ(Get(Key(key)), prefix + std::to_string(key));
  }
}
|
|
|
|
|
|
|
|
TEST_F(CompactionServiceTest, TablePropertiesCollector) {
|
|
|
|
const static std::string kUserPropertyName = "TestCount";
|
|
|
|
|
|
|
|
class TablePropertiesCollectorTest : public TablePropertiesCollector {
|
|
|
|
public:
|
|
|
|
Status Finish(UserCollectedProperties* properties) override {
|
|
|
|
*properties = UserCollectedProperties{
|
|
|
|
{kUserPropertyName, std::to_string(count_)},
|
|
|
|
};
|
|
|
|
return Status::OK();
|
|
|
|
}
|
|
|
|
|
|
|
|
UserCollectedProperties GetReadableProperties() const override {
|
|
|
|
return UserCollectedProperties();
|
|
|
|
}
|
|
|
|
|
|
|
|
const char* Name() const override { return "TablePropertiesCollectorTest"; }
|
|
|
|
|
|
|
|
Status AddUserKey(const Slice& /*user_key*/, const Slice& /*value*/,
|
|
|
|
EntryType /*type*/, SequenceNumber /*seq*/,
|
|
|
|
uint64_t /*file_size*/) override {
|
|
|
|
count_++;
|
|
|
|
return Status::OK();
|
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
|
|
|
uint32_t count_ = 0;
|
|
|
|
};
|
|
|
|
|
|
|
|
class TablePropertiesCollectorFactoryTest
|
|
|
|
: public TablePropertiesCollectorFactory {
|
|
|
|
public:
|
|
|
|
TablePropertiesCollector* CreateTablePropertiesCollector(
|
|
|
|
TablePropertiesCollectorFactory::Context /*context*/) override {
|
|
|
|
return new TablePropertiesCollectorTest();
|
|
|
|
}
|
|
|
|
|
|
|
|
const char* Name() const override {
|
|
|
|
return "TablePropertiesCollectorFactoryTest";
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
auto factory = new TablePropertiesCollectorFactoryTest();
|
|
|
|
remote_table_properties_collector_factories.emplace_back(factory);
|
|
|
|
|
|
|
|
const int kNumSst = 3;
|
|
|
|
const int kLevel0Trigger = 4;
|
|
|
|
Options options = CurrentOptions();
|
|
|
|
options.level0_file_num_compaction_trigger = kLevel0Trigger;
|
|
|
|
ReopenWithCompactionService(&options);
|
|
|
|
|
|
|
|
// generate a few SSTs locally which should not have user property
|
|
|
|
for (int i = 0; i < kNumSst; i++) {
|
|
|
|
for (int j = 0; j < 100; j++) {
|
|
|
|
ASSERT_OK(Put(Key(i * 10 + j), "value"));
|
|
|
|
}
|
|
|
|
ASSERT_OK(Flush());
|
|
|
|
}
|
|
|
|
|
|
|
|
TablePropertiesCollection fname_to_props;
|
|
|
|
ASSERT_OK(db_->GetPropertiesOfAllTables(&fname_to_props));
|
|
|
|
for (const auto& file_props : fname_to_props) {
|
|
|
|
auto properties = file_props.second->user_collected_properties;
|
|
|
|
auto it = properties.find(kUserPropertyName);
|
|
|
|
ASSERT_EQ(it, properties.end());
|
|
|
|
}
|
|
|
|
|
|
|
|
// trigger compaction
|
|
|
|
for (int i = kNumSst; i < kLevel0Trigger; i++) {
|
|
|
|
for (int j = 0; j < 100; j++) {
|
|
|
|
ASSERT_OK(Put(Key(i * 10 + j), "value"));
|
|
|
|
}
|
|
|
|
ASSERT_OK(Flush());
|
|
|
|
}
|
Remove wait_unscheduled from waitForCompact internal API (#11443)
Summary:
Context:
In pull request https://github.com/facebook/rocksdb/issues/11436, we are introducing a new public API `waitForCompact(const WaitForCompactOptions& wait_for_compact_options)`. This API invokes the internal implementation `waitForCompact(bool wait_unscheduled=false)`. The unscheduled parameter indicates the compactions that are not yet scheduled but are required to process items in the queue.
In certain cases, we are unable to wait for compactions, such as during a shutdown or when background jobs are paused. It is important to return the appropriate status in these scenarios. For all other cases, we should wait for all compaction and flush jobs, including the unscheduled ones. The primary purpose of this new API is to wait until the system has resolved its compaction debt. Currently, the usage of `wait_unscheduled` is limited to test code.
This pull request eliminates the usage of wait_unscheduled. The internal `waitForCompact()` API now waits for unscheduled compactions unless the db is undergoing a shutdown. In the event of a shutdown, the API returns `Status::ShutdownInProgress()`.
Additionally, a new parameter, `abort_on_pause`, has been introduced with a default value of `false`. This parameter addresses the possibility of waiting indefinitely for unscheduled jobs if `PauseBackgroundWork()` was called before `waitForCompact()` is invoked. By setting `abort_on_pause` to `true`, the API will immediately return `Status::Aborted`.
Furthermore, all tests that previously called `waitForCompact(true)` have been fixed.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/11443
Test Plan:
Existing tests that involve a shutdown in progress:
- DBCompactionTest::CompactRangeShutdownWhileDelayed
- DBTestWithParam::PreShutdownMultipleCompaction
- DBTestWithParam::PreShutdownCompactionMiddle
Reviewed By: pdillinger
Differential Revision: D45923426
Pulled By: jaykorean
fbshipit-source-id: 7dc93fe6a6841a7d9d2d72866fa647090dba8eae
2 years ago
|
|
|
ASSERT_OK(dbfull()->TEST_WaitForCompact());
|
|
|
|
|
|
|
|
ASSERT_OK(db_->GetPropertiesOfAllTables(&fname_to_props));
|
|
|
|
|
|
|
|
bool has_user_property = false;
|
|
|
|
for (const auto& file_props : fname_to_props) {
|
|
|
|
auto properties = file_props.second->user_collected_properties;
|
|
|
|
auto it = properties.find(kUserPropertyName);
|
|
|
|
if (it != properties.end()) {
|
|
|
|
has_user_property = true;
|
|
|
|
ASSERT_GT(std::stoi(it->second), 0);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
ASSERT_TRUE(has_user_property);
|
|
|
|
}
|
|
|
|
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
|
|
|
|
|
|
int main(int argc, char** argv) {
|
|
|
|
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
|
|
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
|
|
RegisterCustomObjects(argc, argv);
|
|
|
|
return RUN_ALL_TESTS();
|
|
|
|
}
|