Add extra information to RemoteCompaction APIs (#8680)

Summary:
Currently, we only provide job_id in RemoteCompaction APIs, the
main problem of `job_id` is it cannot uniquely identify a compaction job
between DB instances or between sessions.
Providing DB and session id to the user, which will make building cross
DB compaction service easier.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8680

Test Plan: unittest

Reviewed By: ajkr

Differential Revision: D30444859

Pulled By: jay-zhuang

fbshipit-source-id: fdf107f4286564049637f154193c6d94c3c59448
main
Jay Zhuang 3 years ago committed by Facebook GitHub Bot
parent 1a5eb33d91
commit 249b1078c9
  1. 4
      HISTORY.md
  2. 9
      db/compaction/compaction_job.cc
  3. 333
      db/compaction/compaction_service_test.cc
  4. 59
      include/rocksdb/options.h

@ -1,4 +1,8 @@
# Rocksdb Change Log
## Unreleased
### New Features
* RemoteCompaction's interface now includes `db_name`, `db_id`, `session_id`, which could help the user uniquely identify compaction job between db instances and sessions.
## 6.24.0 (2021-08-20)
### Bug Fixes
* If the primary's CURRENT file is missing or inaccessible, the secondary instance should not hang repeatedly trying to switch to a new MANIFEST. It should instead return the error code encountered while accessing the file.

@ -984,9 +984,10 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
"[%s] [JOB %d] Starting remote compaction (output level: %d): %s",
compaction_input.column_family.name.c_str(), job_id_,
compaction_input.output_level, input_files_oss.str().c_str());
CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_,
GetCompactionId(sub_compact));
CompactionServiceJobStatus compaction_status =
db_options_.compaction_service->Start(compaction_input_binary,
GetCompactionId(sub_compact));
db_options_.compaction_service->StartV2(info, compaction_input_binary);
if (compaction_status != CompactionServiceJobStatus::kSuccess) {
sub_compact->status =
Status::Incomplete("CompactionService failed to start compaction job.");
@ -994,8 +995,8 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
}
std::string compaction_result_binary;
compaction_status = db_options_.compaction_service->WaitForComplete(
GetCompactionId(sub_compact), &compaction_result_binary);
compaction_status = db_options_.compaction_service->WaitForCompleteV2(
info, &compaction_result_binary);
CompactionServiceResult compaction_result;
s = CompactionServiceResult::Read(compaction_result_binary,

@ -10,13 +10,45 @@
namespace ROCKSDB_NAMESPACE {
class MyTestCompactionService : public CompactionService {
class TestCompactionServiceBase {
public:
MyTestCompactionService(const std::string& db_path, Options& options,
std::shared_ptr<Statistics> statistics = nullptr)
: db_path_(db_path), options_(options), statistics_(statistics) {}
virtual int GetCompactionNum() = 0;
static const char* kClassName() { return "MyTestCompactionService"; }
void OverrideStartStatus(CompactionServiceJobStatus s) {
is_override_start_status = true;
override_start_status = s;
}
void OverrideWaitResult(std::string str) {
is_override_wait_result = true;
override_wait_result = std::move(str);
}
void ResetOverride() {
is_override_wait_result = false;
is_override_start_status = false;
}
virtual ~TestCompactionServiceBase() = default;
protected:
bool is_override_start_status = false;
CompactionServiceJobStatus override_start_status =
CompactionServiceJobStatus::kFailure;
bool is_override_wait_result = false;
std::string override_wait_result;
};
class MyTestCompactionServiceLegacy : public CompactionService,
public TestCompactionServiceBase {
public:
MyTestCompactionServiceLegacy(std::string db_path, Options& options,
std::shared_ptr<Statistics>& statistics)
: db_path_(std::move(db_path)),
options_(options),
statistics_(statistics) {}
static const char* kClassName() { return "MyTestCompactionServiceLegacy"; }
const char* Name() const override { return kClassName(); }
@ -25,7 +57,9 @@ class MyTestCompactionService : public CompactionService {
InstrumentedMutexLock l(&mutex_);
jobs_.emplace(job_id, compaction_service_input);
CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
TEST_SYNC_POINT_CALLBACK("MyTestCompactionService::Start::End", &s);
if (is_override_start_status) {
return override_start_status;
}
return s;
}
@ -59,8 +93,9 @@ class MyTestCompactionService : public CompactionService {
Status s = DB::OpenAndCompact(
db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(job_id),
compaction_input, compaction_service_result, options_override);
TEST_SYNC_POINT_CALLBACK("MyTestCompactionService::WaitForComplete::End",
compaction_service_result);
if (is_override_wait_result) {
*compaction_service_result = override_wait_result;
}
compaction_num_.fetch_add(1);
if (s.ok()) {
return CompactionServiceJobStatus::kSuccess;
@ -69,7 +104,7 @@ class MyTestCompactionService : public CompactionService {
}
}
int GetCompactionNum() { return compaction_num_.load(); }
int GetCompactionNum() override { return compaction_num_.load(); }
private:
InstrumentedMutex mutex_;
@ -80,12 +115,140 @@ class MyTestCompactionService : public CompactionService {
std::shared_ptr<Statistics> statistics_;
};
class CompactionServiceTest : public DBTestBase {
class MyTestCompactionService : public CompactionService,
public TestCompactionServiceBase {
public:
MyTestCompactionService(std::string db_path, Options& options,
std::shared_ptr<Statistics>& statistics)
: db_path_(std::move(db_path)),
options_(options),
statistics_(statistics),
start_info_("na", "na", "na", 0),
wait_info_("na", "na", "na", 0) {}
static const char* kClassName() { return "MyTestCompactionService"; }
const char* Name() const override { return kClassName(); }
CompactionServiceJobStatus StartV2(
const CompactionServiceJobInfo& info,
const std::string& compaction_service_input) override {
InstrumentedMutexLock l(&mutex_);
start_info_ = info;
assert(info.db_name == db_path_);
jobs_.emplace(info.job_id, compaction_service_input);
CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
if (is_override_start_status) {
return override_start_status;
}
return s;
}
CompactionServiceJobStatus WaitForCompleteV2(
const CompactionServiceJobInfo& info,
std::string* compaction_service_result) override {
std::string compaction_input;
assert(info.db_name == db_path_);
{
InstrumentedMutexLock l(&mutex_);
wait_info_ = info;
auto i = jobs_.find(info.job_id);
if (i == jobs_.end()) {
return CompactionServiceJobStatus::kFailure;
}
compaction_input = std::move(i->second);
jobs_.erase(i);
}
CompactionServiceOptionsOverride options_override;
options_override.env = options_.env;
options_override.file_checksum_gen_factory =
options_.file_checksum_gen_factory;
options_override.comparator = options_.comparator;
options_override.merge_operator = options_.merge_operator;
options_override.compaction_filter = options_.compaction_filter;
options_override.compaction_filter_factory =
options_.compaction_filter_factory;
options_override.prefix_extractor = options_.prefix_extractor;
options_override.table_factory = options_.table_factory;
options_override.sst_partitioner_factory = options_.sst_partitioner_factory;
options_override.statistics = statistics_;
Status s = DB::OpenAndCompact(
db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(info.job_id),
compaction_input, compaction_service_result, options_override);
if (is_override_wait_result) {
*compaction_service_result = override_wait_result;
}
compaction_num_.fetch_add(1);
if (s.ok()) {
return CompactionServiceJobStatus::kSuccess;
} else {
return CompactionServiceJobStatus::kFailure;
}
}
int GetCompactionNum() override { return compaction_num_.load(); }
CompactionServiceJobInfo GetCompactionInfoForStart() { return start_info_; }
CompactionServiceJobInfo GetCompactionInfoForWait() { return wait_info_; }
private:
InstrumentedMutex mutex_;
std::atomic_int compaction_num_{0};
std::map<uint64_t, std::string> jobs_;
const std::string db_path_;
Options options_;
std::shared_ptr<Statistics> statistics_;
CompactionServiceJobInfo start_info_;
CompactionServiceJobInfo wait_info_;
};
// This is only for listing test classes
enum TestCompactionServiceType {
MyTestCompactionServiceType,
MyTestCompactionServiceLegacyType,
};
class CompactionServiceTest
: public DBTestBase,
public testing::WithParamInterface<TestCompactionServiceType> {
public:
explicit CompactionServiceTest()
: DBTestBase("compaction_service_test", true) {}
protected:
void ReopenWithCompactionService(Options* options) {
options->env = env_;
primary_statistics_ = CreateDBStatistics();
options->statistics = primary_statistics_;
compactor_statistics_ = CreateDBStatistics();
TestCompactionServiceType cs_type = GetParam();
switch (cs_type) {
case MyTestCompactionServiceType:
compaction_service_ = std::make_shared<MyTestCompactionService>(
dbname_, *options, compactor_statistics_);
break;
case MyTestCompactionServiceLegacyType:
compaction_service_ = std::make_shared<MyTestCompactionServiceLegacy>(
dbname_, *options, compactor_statistics_);
break;
default:
assert(false);
}
options->compaction_service = compaction_service_;
DestroyAndReopen(*options);
}
Statistics* GetCompactorStatistics() { return compactor_statistics_.get(); }
Statistics* GetPrimaryStatistics() { return primary_statistics_.get(); }
TestCompactionServiceBase* GetCompactionService() {
CompactionService* cs = compaction_service_.get();
return dynamic_cast<TestCompactionServiceBase*>(cs);
}
void GenerateTestData() {
// Generate 20 files @ L2
for (int i = 0; i < 20; i++) {
@ -119,17 +282,16 @@ class CompactionServiceTest : public DBTestBase {
}
}
}
private:
std::shared_ptr<Statistics> compactor_statistics_;
std::shared_ptr<Statistics> primary_statistics_;
std::shared_ptr<CompactionService> compaction_service_;
};
TEST_F(CompactionServiceTest, BasicCompactions) {
TEST_P(CompactionServiceTest, BasicCompactions) {
Options options = CurrentOptions();
options.env = env_;
options.statistics = CreateDBStatistics();
std::shared_ptr<Statistics> compactor_statistics = CreateDBStatistics();
options.compaction_service = std::make_shared<MyTestCompactionService>(
dbname_, options, compactor_statistics);
DestroyAndReopen(options);
ReopenWithCompactionService(&options);
for (int i = 0; i < 20; i++) {
for (int j = 0; j < 10; j++) {
@ -157,21 +319,22 @@ TEST_F(CompactionServiceTest, BasicCompactions) {
ASSERT_EQ(result, "value_new" + ToString(i));
}
}
auto my_cs =
dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
auto my_cs = GetCompactionService();
Statistics* compactor_statistics = GetCompactorStatistics();
ASSERT_GE(my_cs->GetCompactionNum(), 1);
// make sure the compaction statistics is only recorded on remote side
ASSERT_GE(
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), 1);
ASSERT_EQ(options.statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
Statistics* primary_statistics = GetPrimaryStatistics();
ASSERT_EQ(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
0);
// Test failed compaction
SyncPoint::GetInstance()->SetCallBack(
"DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) {
// override job status
Status* s = static_cast<Status*>(status);
auto s = static_cast<Status*>(status);
*s = Status::Aborted("MyTestCompactionService failed to compact!");
});
SyncPoint::GetInstance()->EnableProcessing();
@ -200,17 +363,13 @@ TEST_F(CompactionServiceTest, BasicCompactions) {
ASSERT_TRUE(s.IsAborted());
}
TEST_F(CompactionServiceTest, ManualCompaction) {
TEST_P(CompactionServiceTest, ManualCompaction) {
Options options = CurrentOptions();
options.env = env_;
options.disable_auto_compactions = true;
options.compaction_service =
std::make_shared<MyTestCompactionService>(dbname_, options);
DestroyAndReopen(options);
ReopenWithCompactionService(&options);
GenerateTestData();
auto my_cs =
dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
auto my_cs = GetCompactionService();
std::string start_str = Key(15);
std::string end_str = Key(45);
@ -241,22 +400,15 @@ TEST_F(CompactionServiceTest, ManualCompaction) {
VerifyTestData();
}
TEST_F(CompactionServiceTest, FailedToStart) {
TEST_P(CompactionServiceTest, FailedToStart) {
Options options = CurrentOptions();
options.env = env_;
options.disable_auto_compactions = true;
options.compaction_service =
std::make_shared<MyTestCompactionService>(dbname_, options);
DestroyAndReopen(options);
ReopenWithCompactionService(&options);
GenerateTestData();
SyncPoint::GetInstance()->SetCallBack(
"MyTestCompactionService::Start::End", [&](void* status) {
// override job status
auto s = static_cast<CompactionServiceJobStatus*>(status);
*s = CompactionServiceJobStatus::kFailure;
});
SyncPoint::GetInstance()->EnableProcessing();
auto my_cs = GetCompactionService();
my_cs->OverrideStartStatus(CompactionServiceJobStatus::kFailure);
std::string start_str = Key(15);
std::string end_str = Key(45);
@ -266,22 +418,15 @@ TEST_F(CompactionServiceTest, FailedToStart) {
ASSERT_TRUE(s.IsIncomplete());
}
TEST_F(CompactionServiceTest, InvalidResult) {
TEST_P(CompactionServiceTest, InvalidResult) {
Options options = CurrentOptions();
options.env = env_;
options.disable_auto_compactions = true;
options.compaction_service =
std::make_shared<MyTestCompactionService>(dbname_, options);
DestroyAndReopen(options);
ReopenWithCompactionService(&options);
GenerateTestData();
SyncPoint::GetInstance()->SetCallBack(
"MyTestCompactionService::WaitForComplete::End", [&](void* result) {
// override job status
auto result_str = static_cast<std::string*>(result);
*result_str = "Invalid Str";
});
SyncPoint::GetInstance()->EnableProcessing();
auto my_cs = GetCompactionService();
my_cs->OverrideWaitResult("Invalid Str");
std::string start_str = Key(15);
std::string end_str = Key(45);
@ -291,21 +436,17 @@ TEST_F(CompactionServiceTest, InvalidResult) {
ASSERT_FALSE(s.ok());
}
TEST_F(CompactionServiceTest, SubCompaction) {
TEST_P(CompactionServiceTest, SubCompaction) {
Options options = CurrentOptions();
options.env = env_;
options.max_subcompactions = 10;
options.target_file_size_base = 1 << 10; // 1KB
options.disable_auto_compactions = true;
options.compaction_service =
std::make_shared<MyTestCompactionService>(dbname_, options);
ReopenWithCompactionService(&options);
DestroyAndReopen(options);
GenerateTestData();
VerifyTestData();
auto my_cs =
dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
auto my_cs = GetCompactionService();
int compaction_num_before = my_cs->GetCompactionNum();
auto cro = CompactRangeOptions();
@ -334,16 +475,12 @@ class PartialDeleteCompactionFilter : public CompactionFilter {
const char* Name() const override { return "PartialDeleteCompactionFilter"; }
};
TEST_F(CompactionServiceTest, CompactionFilter) {
TEST_P(CompactionServiceTest, CompactionFilter) {
Options options = CurrentOptions();
options.env = env_;
std::unique_ptr<CompactionFilter> delete_comp_filter(
new PartialDeleteCompactionFilter());
options.compaction_filter = delete_comp_filter.get();
options.compaction_service =
std::make_shared<MyTestCompactionService>(dbname_, options);
DestroyAndReopen(options);
ReopenWithCompactionService(&options);
for (int i = 0; i < 20; i++) {
for (int j = 0; j < 10; j++) {
@ -375,18 +512,13 @@ TEST_F(CompactionServiceTest, CompactionFilter) {
ASSERT_EQ(result, "value_new" + ToString(i));
}
}
auto my_cs =
dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
auto my_cs = GetCompactionService();
ASSERT_GE(my_cs->GetCompactionNum(), 1);
}
TEST_F(CompactionServiceTest, Snapshot) {
TEST_P(CompactionServiceTest, Snapshot) {
Options options = CurrentOptions();
options.env = env_;
options.compaction_service =
std::make_shared<MyTestCompactionService>(dbname_, options);
DestroyAndReopen(options);
ReopenWithCompactionService(&options);
ASSERT_OK(Put(Key(1), "value1"));
ASSERT_OK(Put(Key(2), "value1"));
@ -398,23 +530,18 @@ TEST_F(CompactionServiceTest, Snapshot) {
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
auto my_cs =
dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
auto my_cs = GetCompactionService();
ASSERT_GE(my_cs->GetCompactionNum(), 1);
ASSERT_EQ("value1", Get(Key(1), s1));
ASSERT_EQ("value2", Get(Key(1)));
db_->ReleaseSnapshot(s1);
}
TEST_F(CompactionServiceTest, ConcurrentCompaction) {
TEST_P(CompactionServiceTest, ConcurrentCompaction) {
Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = 100;
options.env = env_;
options.compaction_service =
std::make_shared<MyTestCompactionService>(dbname_, options);
options.max_background_jobs = 20;
DestroyAndReopen(options);
ReopenWithCompactionService(&options);
GenerateTestData();
ColumnFamilyMetaData meta;
@ -442,12 +569,50 @@ TEST_F(CompactionServiceTest, ConcurrentCompaction) {
ASSERT_EQ(result, "value_new" + ToString(i));
}
}
auto my_cs =
dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
auto my_cs = GetCompactionService();
ASSERT_EQ(my_cs->GetCompactionNum(), 10);
ASSERT_EQ(FilesPerLevel(), "0,0,10");
}
TEST_P(CompactionServiceTest, CompactionInfo) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
ReopenWithCompactionService(&options);
GenerateTestData();
auto my_cs = GetCompactionService();
std::string start_str = Key(15);
std::string end_str = Key(45);
Slice start(start_str);
Slice end(end_str);
uint64_t comp_num = my_cs->GetCompactionNum();
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
VerifyTestData();
// only test compaction info for new compaction service interface
if (GetParam() == MyTestCompactionServiceType) {
auto cs = static_cast_with_check<MyTestCompactionService>(my_cs);
CompactionServiceJobInfo info = cs->GetCompactionInfoForStart();
ASSERT_EQ(dbname_, info.db_name);
std::string db_id, db_session_id;
ASSERT_OK(db_->GetDbIdentity(db_id));
ASSERT_EQ(db_id, info.db_id);
ASSERT_OK(db_->GetDbSessionId(db_session_id));
ASSERT_EQ(db_session_id, info.db_session_id);
info = cs->GetCompactionInfoForWait();
ASSERT_EQ(dbname_, info.db_name);
ASSERT_EQ(db_id, info.db_id);
ASSERT_EQ(db_session_id, info.db_session_id);
}
}
INSTANTIATE_TEST_CASE_P(
CompactionServiceTest, CompactionServiceTest,
::testing::Values(
TestCompactionServiceType::MyTestCompactionServiceType,
TestCompactionServiceType::MyTestCompactionServiceLegacyType));
} // namespace ROCKSDB_NAMESPACE
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS

@ -375,27 +375,70 @@ enum class CompactionServiceJobStatus : char {
kUseLocal, // TODO: Add support for use local compaction
};
struct CompactionServiceJobInfo {
std::string db_name;
std::string db_id;
std::string db_session_id;
uint64_t job_id; // job_id is only unique within the current DB and session,
// restart DB will reset the job_id. `db_id` and
// `db_session_id` could help you build unique id across
// different DBs and sessions.
// TODO: Add priority information
CompactionServiceJobInfo(std::string db_name_, std::string db_id_,
std::string db_session_id_, uint64_t job_id_)
: db_name(std::move(db_name_)),
db_id(std::move(db_id_)),
db_session_id(std::move(db_session_id_)),
job_id(job_id_) {}
};
class CompactionService : public Customizable {
public:
static const char* Type() { return "CompactionService"; }
// Returns the name of this compaction service.
virtual const char* Name() const = 0;
const char* Name() const override = 0;
// Start the compaction with input information, which can be passed to
// `DB::OpenAndCompact()`.
// job_id is pre-assigned, it will be reset after DB re-open.
// TODO: sub-compaction is not supported, as they will have the same job_id, a
// sub-compaction id might be added
// Warning: deprecated, please use the new interface
// `StartV2(CompactionServiceJobInfo, ...)` instead.
virtual CompactionServiceJobStatus Start(
const std::string& compaction_service_input, uint64_t job_id) = 0;
const std::string& /*compaction_service_input*/, uint64_t /*job_id*/) {
return CompactionServiceJobStatus::kUseLocal;
}
// Start the remote compaction with `compaction_service_input`, which can be
// passed to `DB::OpenAndCompact()` on the remote side. `info` provides the
// information the user might want to know, which includes `job_id`.
virtual CompactionServiceJobStatus StartV2(
const CompactionServiceJobInfo& info,
const std::string& compaction_service_input) {
// Default implementation to call legacy interface, please override and
// replace the legacy implementation
return Start(compaction_service_input, info.job_id);
}
// Wait compaction to be finish.
// TODO: Add output path override
// Warning: deprecated, please use the new interface
// `WaitForCompleteV2(CompactionServiceJobInfo, ...)` instead.
virtual CompactionServiceJobStatus WaitForComplete(
uint64_t job_id, std::string* compaction_service_result) = 0;
virtual ~CompactionService() {}
uint64_t /*job_id*/, std::string* /*compaction_service_result*/) {
return CompactionServiceJobStatus::kUseLocal;
}
// Wait for remote compaction to finish.
virtual CompactionServiceJobStatus WaitForCompleteV2(
const CompactionServiceJobInfo& info,
std::string* compaction_service_result) {
// Default implementation to call legacy interface, please override and
// replace the legacy implementation
return WaitForComplete(info.job_id, compaction_service_result);
}
~CompactionService() override = default;
};
struct DBOptions {

Loading…
Cancel
Save