RemoteCompaction support Fallback to local compaction (#8709)

Summary:
Add support for fallback to local compaction, the user can
return `CompactionServiceJobStatus::kUseLocal` to instruct RocksDB to
run the compaction locally instead of waiting for the remote compaction
result.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8709

Test Plan: unittest

Reviewed By: ajkr

Differential Revision: D30560163

Pulled By: jay-zhuang

fbshipit-source-id: 65d8905a4a1bc185a68daa120997f21d3198dbe1
main
Jay Zhuang 3 years ago committed by Facebook GitHub Bot
parent b512f4bc76
commit 1c290c785d
  1. 1
      HISTORY.md
  2. 86
      db/compaction/compaction_job.cc
  3. 2
      db/compaction/compaction_job.h
  4. 127
      db/compaction/compaction_service_test.cc
  5. 2
      include/rocksdb/options.h

@ -24,6 +24,7 @@
* Add compaction priority information in RemoteCompaction, which can be used to schedule high priority job first. * Add compaction priority information in RemoteCompaction, which can be used to schedule high priority job first.
* Added new callback APIs `OnBlobFileCreationStarted`,`OnBlobFileCreated`and `OnBlobFileDeleted` in `EventListener` class of listener.h. It notifies listeners during creation/deletion of individual blob files in Integrated BlobDB. It also log blob file creation finished event and deletion event in LOG file. * Added new callback APIs `OnBlobFileCreationStarted`,`OnBlobFileCreated`and `OnBlobFileDeleted` in `EventListener` class of listener.h. It notifies listeners during creation/deletion of individual blob files in Integrated BlobDB. It also log blob file creation finished event and deletion event in LOG file.
* Batch blob read requests for `DB::MultiGet` using `MultiRead`. * Batch blob read requests for `DB::MultiGet` using `MultiRead`.
* Add support for fallback to local compaction, the user can return `CompactionServiceJobStatus::kUseLocal` to instruct RocksDB to run the compaction locally instead of waiting for the remote compaction result.
### Public API change ### Public API change
* Remove obsolete implementation details FullKey and ParseFullKey from public API * Remove obsolete implementation details FullKey and ParseFullKey from public API

@ -932,7 +932,8 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
} }
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
void CompactionJob::ProcessKeyValueCompactionWithCompactionService( CompactionServiceJobStatus
CompactionJob::ProcessKeyValueCompactionWithCompactionService(
SubcompactionState* sub_compact) { SubcompactionState* sub_compact) {
assert(sub_compact); assert(sub_compact);
assert(sub_compact->compaction); assert(sub_compact->compaction);
@ -969,7 +970,7 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
Status s = compaction_input.Write(&compaction_input_binary); Status s = compaction_input.Write(&compaction_input_binary);
if (!s.ok()) { if (!s.ok()) {
sub_compact->status = s; sub_compact->status = s;
return; return CompactionServiceJobStatus::kFailure;
} }
std::ostringstream input_files_oss; std::ostringstream input_files_oss;
@ -988,36 +989,73 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
GetCompactionId(sub_compact), thread_pri_); GetCompactionId(sub_compact), thread_pri_);
CompactionServiceJobStatus compaction_status = CompactionServiceJobStatus compaction_status =
db_options_.compaction_service->StartV2(info, compaction_input_binary); db_options_.compaction_service->StartV2(info, compaction_input_binary);
if (compaction_status != CompactionServiceJobStatus::kSuccess) { switch (compaction_status) {
sub_compact->status = case CompactionServiceJobStatus::kSuccess:
Status::Incomplete("CompactionService failed to start compaction job."); break;
return; case CompactionServiceJobStatus::kFailure:
sub_compact->status = Status::Incomplete(
"CompactionService failed to start compaction job.");
ROCKS_LOG_WARN(db_options_.info_log,
"[%s] [JOB %d] Remote compaction failed to start.",
compaction_input.column_family.name.c_str(), job_id_);
return compaction_status;
case CompactionServiceJobStatus::kUseLocal:
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API Start.",
compaction_input.column_family.name.c_str(), job_id_);
return compaction_status;
default:
assert(false); // unknown status
break;
} }
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Waiting for remote compaction...",
compaction_input.column_family.name.c_str(), job_id_);
std::string compaction_result_binary; std::string compaction_result_binary;
compaction_status = db_options_.compaction_service->WaitForCompleteV2( compaction_status = db_options_.compaction_service->WaitForCompleteV2(
info, &compaction_result_binary); info, &compaction_result_binary);
if (compaction_status == CompactionServiceJobStatus::kUseLocal) {
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API "
"WaitForComplete.",
compaction_input.column_family.name.c_str(), job_id_);
return compaction_status;
}
CompactionServiceResult compaction_result; CompactionServiceResult compaction_result;
s = CompactionServiceResult::Read(compaction_result_binary, s = CompactionServiceResult::Read(compaction_result_binary,
&compaction_result); &compaction_result);
if (compaction_status != CompactionServiceJobStatus::kSuccess) {
sub_compact->status = if (compaction_status == CompactionServiceJobStatus::kFailure) {
s.ok() ? compaction_result.status if (s.ok()) {
: Status::Incomplete( if (compaction_result.status.ok()) {
"CompactionService failed to run compaction job."); sub_compact->status = Status::Incomplete(
"CompactionService failed to run the compaction job (even though "
"the internal status is okay).");
} else {
// set the current sub compaction status with the status returned from
// remote
sub_compact->status = compaction_result.status;
}
} else {
sub_compact->status = Status::Incomplete(
"CompactionService failed to run the compaction job (and no valid "
"result is returned).");
compaction_result.status.PermitUncheckedError(); compaction_result.status.PermitUncheckedError();
}
ROCKS_LOG_WARN(db_options_.info_log, ROCKS_LOG_WARN(db_options_.info_log,
"[%s] [JOB %d] Remote compaction failed, status: %s", "[%s] [JOB %d] Remote compaction failed.",
compaction_input.column_family.name.c_str(), job_id_, compaction_input.column_family.name.c_str(), job_id_);
s.ToString().c_str()); return compaction_status;
return;
} }
if (!s.ok()) { if (!s.ok()) {
sub_compact->status = s; sub_compact->status = s;
compaction_result.status.PermitUncheckedError(); compaction_result.status.PermitUncheckedError();
return; return CompactionServiceJobStatus::kFailure;
} }
sub_compact->status = compaction_result.status; sub_compact->status = compaction_result.status;
@ -1037,7 +1075,7 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
if (!s.ok()) { if (!s.ok()) {
sub_compact->status = s; sub_compact->status = s;
return; return CompactionServiceJobStatus::kFailure;
} }
for (const auto& file : compaction_result.output_files) { for (const auto& file : compaction_result.output_files) {
@ -1048,7 +1086,7 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
s = fs_->RenameFile(src_file, tgt_file, IOOptions(), nullptr); s = fs_->RenameFile(src_file, tgt_file, IOOptions(), nullptr);
if (!s.ok()) { if (!s.ok()) {
sub_compact->status = s; sub_compact->status = s;
return; return CompactionServiceJobStatus::kFailure;
} }
FileMetaData meta; FileMetaData meta;
@ -1056,7 +1094,7 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
s = fs_->GetFileSize(tgt_file, IOOptions(), &file_size, nullptr); s = fs_->GetFileSize(tgt_file, IOOptions(), &file_size, nullptr);
if (!s.ok()) { if (!s.ok()) {
sub_compact->status = s; sub_compact->status = s;
return; return CompactionServiceJobStatus::kFailure;
} }
meta.fd = FileDescriptor(file_num, compaction->output_path_id(), file_size, meta.fd = FileDescriptor(file_num, compaction->output_path_id(), file_size,
file.smallest_seqno, file.largest_seqno); file.smallest_seqno, file.largest_seqno);
@ -1077,6 +1115,7 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
sub_compact->total_bytes = compaction_result.total_bytes; sub_compact->total_bytes = compaction_result.total_bytes;
IOSTATS_ADD(bytes_written, compaction_result.bytes_written); IOSTATS_ADD(bytes_written, compaction_result.bytes_written);
IOSTATS_ADD(bytes_read, compaction_result.bytes_read); IOSTATS_ADD(bytes_read, compaction_result.bytes_read);
return CompactionServiceJobStatus::kSuccess;
} }
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
@ -1086,7 +1125,14 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (db_options_.compaction_service) { if (db_options_.compaction_service) {
return ProcessKeyValueCompactionWithCompactionService(sub_compact); CompactionServiceJobStatus comp_status =
ProcessKeyValueCompactionWithCompactionService(sub_compact);
if (comp_status == CompactionServiceJobStatus::kSuccess ||
comp_status == CompactionServiceJobStatus::kFailure) {
return;
}
// fallback to local compaction
assert(comp_status == CompactionServiceJobStatus::kUseLocal);
} }
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE

@ -146,7 +146,7 @@ class CompactionJob {
// consecutive groups such that each group has a similar size. // consecutive groups such that each group has a similar size.
void GenSubcompactionBoundaries(); void GenSubcompactionBoundaries();
void ProcessKeyValueCompactionWithCompactionService( CompactionServiceJobStatus ProcessKeyValueCompactionWithCompactionService(
SubcompactionState* sub_compact); SubcompactionState* sub_compact);
// update the thread status for starting a compaction. // update the thread status for starting a compaction.

@ -19,6 +19,11 @@ class TestCompactionServiceBase {
override_start_status = s; override_start_status = s;
} }
void OverrideWaitStatus(CompactionServiceJobStatus s) {
is_override_wait_status = true;
override_wait_status = s;
}
void OverrideWaitResult(std::string str) { void OverrideWaitResult(std::string str) {
is_override_wait_result = true; is_override_wait_result = true;
override_wait_result = std::move(str); override_wait_result = std::move(str);
@ -27,6 +32,7 @@ class TestCompactionServiceBase {
void ResetOverride() { void ResetOverride() {
is_override_wait_result = false; is_override_wait_result = false;
is_override_start_status = false; is_override_start_status = false;
is_override_wait_status = false;
} }
virtual ~TestCompactionServiceBase() = default; virtual ~TestCompactionServiceBase() = default;
@ -35,6 +41,9 @@ class TestCompactionServiceBase {
bool is_override_start_status = false; bool is_override_start_status = false;
CompactionServiceJobStatus override_start_status = CompactionServiceJobStatus override_start_status =
CompactionServiceJobStatus::kFailure; CompactionServiceJobStatus::kFailure;
bool is_override_wait_status = false;
CompactionServiceJobStatus override_wait_status =
CompactionServiceJobStatus::kFailure;
bool is_override_wait_result = false; bool is_override_wait_result = false;
std::string override_wait_result; std::string override_wait_result;
}; };
@ -76,6 +85,10 @@ class MyTestCompactionServiceLegacy : public CompactionService,
jobs_.erase(i); jobs_.erase(i);
} }
if (is_override_wait_status) {
return override_wait_status;
}
CompactionServiceOptionsOverride options_override; CompactionServiceOptionsOverride options_override;
options_override.env = options_.env; options_override.env = options_.env;
options_override.file_checksum_gen_factory = options_override.file_checksum_gen_factory =
@ -160,6 +173,10 @@ class MyTestCompactionService : public CompactionService,
jobs_.erase(i); jobs_.erase(i);
} }
if (is_override_wait_status) {
return override_wait_status;
}
CompactionServiceOptionsOverride options_override; CompactionServiceOptionsOverride options_override;
options_override.env = options_.env; options_override.env = options_.env;
options_override.file_checksum_gen_factory = options_override.file_checksum_gen_factory =
@ -323,7 +340,7 @@ TEST_P(CompactionServiceTest, BasicCompactions) {
Statistics* compactor_statistics = GetCompactorStatistics(); Statistics* compactor_statistics = GetCompactorStatistics();
ASSERT_GE(my_cs->GetCompactionNum(), 1); ASSERT_GE(my_cs->GetCompactionNum(), 1);
// make sure the compaction statistics is only recorded on remote side // make sure the compaction statistics is only recorded on the remote side
ASSERT_GE( ASSERT_GE(
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), 1); compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), 1);
Statistics* primary_statistics = GetPrimaryStatistics(); Statistics* primary_statistics = GetPrimaryStatistics();
@ -658,6 +675,114 @@ TEST_P(CompactionServiceTest, CompactionInfo) {
ASSERT_EQ(Env::BOTTOM, info.priority); ASSERT_EQ(Env::BOTTOM, info.priority);
} }
TEST_P(CompactionServiceTest, FallbackLocalAuto) {
Options options = CurrentOptions();
ReopenWithCompactionService(&options);
auto my_cs = GetCompactionService();
Statistics* compactor_statistics = GetCompactorStatistics();
Statistics* primary_statistics = GetPrimaryStatistics();
uint64_t compactor_new_key =
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);
uint64_t primary_new_key =
primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);
my_cs->OverrideStartStatus(CompactionServiceJobStatus::kUseLocal);
for (int i = 0; i < 20; i++) {
for (int j = 0; j < 10; j++) {
int key_id = i * 10 + j;
ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
}
ASSERT_OK(Flush());
}
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
int key_id = i * 20 + j * 2;
ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
}
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// verify result
for (int i = 0; i < 200; i++) {
auto result = Get(Key(i));
if (i % 2) {
ASSERT_EQ(result, "value" + ToString(i));
} else {
ASSERT_EQ(result, "value_new" + ToString(i));
}
}
ASSERT_EQ(my_cs->GetCompactionNum(), 0);
// make sure the compaction statistics is only recorded on the local side
ASSERT_EQ(
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
compactor_new_key);
ASSERT_GT(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
primary_new_key);
}
TEST_P(CompactionServiceTest, FallbackLocalManual) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
ReopenWithCompactionService(&options);
GenerateTestData();
VerifyTestData();
auto my_cs = GetCompactionService();
Statistics* compactor_statistics = GetCompactorStatistics();
Statistics* primary_statistics = GetPrimaryStatistics();
uint64_t compactor_new_key =
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);
uint64_t primary_new_key =
primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);
// re-enable remote compaction
my_cs->ResetOverride();
std::string start_str = Key(15);
std::string end_str = Key(45);
Slice start(start_str);
Slice end(end_str);
uint64_t comp_num = my_cs->GetCompactionNum();
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
// make sure the compaction statistics is only recorded on the remote side
ASSERT_GT(
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
compactor_new_key);
ASSERT_EQ(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
primary_new_key);
// return run local again with API WaitForComplete
my_cs->OverrideWaitStatus(CompactionServiceJobStatus::kUseLocal);
start_str = Key(120);
start = start_str;
comp_num = my_cs->GetCompactionNum();
compactor_new_key =
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);
primary_new_key =
primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr));
ASSERT_EQ(my_cs->GetCompactionNum(),
comp_num); // no remote compaction is run
// make sure the compaction statistics is only recorded on the local side
ASSERT_EQ(
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
compactor_new_key);
ASSERT_GT(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
primary_new_key);
// verify result after 2 manual compactions
VerifyTestData();
}
INSTANTIATE_TEST_CASE_P( INSTANTIATE_TEST_CASE_P(
CompactionServiceTest, CompactionServiceTest, CompactionServiceTest, CompactionServiceTest,
::testing::Values( ::testing::Values(

@ -372,7 +372,7 @@ extern const char* kHostnameForDbHostId;
enum class CompactionServiceJobStatus : char { enum class CompactionServiceJobStatus : char {
kSuccess, kSuccess,
kFailure, kFailure,
kUseLocal, // TODO: Add support for use local compaction kUseLocal,
}; };
struct CompactionServiceJobInfo { struct CompactionServiceJobInfo {

Loading…
Cancel
Save