Add remote compaction public API (#8300)

Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/8300

Reviewed By: ajkr

Differential Revision: D28464726

Pulled By: jay-zhuang

fbshipit-source-id: 49e9f4fb791808a6cbf39a7b1a331373f645fc5e
Branch: main
Commit: 3786181a90 (parent: 311a544c2a)
Author: Jay Zhuang, committed by Facebook GitHub Bot
24 changed files (lines changed):

  CMakeLists.txt (1)
  HISTORY.md (1)
  Makefile (3)
  TARGETS (7)
  db/compaction/compaction_job.cc (264)
  db/compaction/compaction_job.h (7)
  db/compaction/compaction_job_test.cc (17)
  db/compaction/compaction_service_test.cc (454)
  db/db_impl/db_impl_compaction_flush.cc (11)
  db/db_impl/db_impl_secondary.cc (84)
  db/db_secondary_test.cc (7)
  db/output_validator.h (6)
  include/rocksdb/db.h (10)
  include/rocksdb/options.h (49)
  include/rocksdb/status.h (7)
  include/rocksdb/utilities/options_type.h (1)
  options/db_options.cc (3)
  options/db_options.h (1)
  options/options_helper.cc (8)
  options/options_settable_test.cc (2)
  src.mk (1)
  util/status.cc (5)
  util/string_util.cc (9)
  util/string_util.h (2)

diff --git a/CMakeLists.txt b/CMakeLists.txt
@@ -1113,6 +1113,7 @@ if(WITH_TESTS)
         db/compaction/compaction_job_test.cc
         db/compaction/compaction_iterator_test.cc
         db/compaction/compaction_picker_test.cc
+        db/compaction/compaction_service_test.cc
         db/comparator_db_test.cc
         db/corruption_test.cc
         db/cuckoo_table_db_test.cc

diff --git a/HISTORY.md b/HISTORY.md
@@ -17,6 +17,7 @@
 * Allow `CompactionFilter`s to apply in more table file creation scenarios such as flush and recovery. For compatibility, `CompactionFilter`s by default apply during compaction. Users can customize this behavior by overriding `CompactionFilterFactory::ShouldFilterTableFileCreation()`.
 * Added more fields to FilterBuildingContext with LSM details, for custom filter policies that vary behavior based on where they are in the LSM-tree.
 * Added DB::Properties::kBlockCacheEntryStats for querying statistics on what percentage of block cache is used by various kinds of blocks, etc. using DB::GetProperty and DB::GetMapProperty. The same information is now dumped to info LOG periodically according to `stats_dump_period_sec`.
+* Added an experimental Remote Compaction feature, which allows the user to run compactions on a different host or process. The feature is still under development and currently only works for some basic use cases. The interface will be changed without backward/forward compatibility support.

 ### Performance Improvements
 * BlockPrefetcher is used by iterators to prefetch data if they anticipate more data to be used in future. It is enabled implicitly by rocksdb. Added change to take in account read pattern if reads are sequential. This would disable prefetching for random reads in MultiGet and iterators as readahead_size is increased exponential doing large prefetches.
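For illustration, a minimal self-contained sketch of how the feature described in the HISTORY entry above is wired up end to end. This is not part of the diff: `InProcessCompactionService` and the paths are made-up names, and it runs each job in-process via DB::OpenAndCompact() purely for demonstration; a real deployment would ship the serialized input to a remote worker instead.

#include <map>
#include <memory>
#include <mutex>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

using namespace ROCKSDB_NAMESPACE;

// Hypothetical CompactionService that runs each job in the same process.
class InProcessCompactionService : public CompactionService {
 public:
  InProcessCompactionService(std::string db_path, Options options)
      : db_path_(std::move(db_path)), options_(std::move(options)) {}

  CompactionServiceJobStatus Start(const std::string& compaction_service_input,
                                   int job_id) override {
    // Remember the serialized CompactionServiceInput for this job; a real
    // service would enqueue it for a remote worker here.
    std::lock_guard<std::mutex> lock(mu_);
    jobs_[job_id] = compaction_service_input;
    return CompactionServiceJobStatus::kSuccess;
  }

  CompactionServiceJobStatus WaitForComplete(
      int job_id, std::string* compaction_service_result) override {
    std::string input;
    {
      std::lock_guard<std::mutex> lock(mu_);
      input = jobs_[job_id];
      jobs_.erase(job_id);
    }
    // Pointer-typed options are not serialized, so they must be re-supplied.
    CompactionServiceOptionsOverride options_override;
    options_override.env = options_.env;
    options_override.comparator = options_.comparator;
    options_override.table_factory = options_.table_factory;
    Status s = DB::OpenAndCompact(
        db_path_, db_path_ + "/compaction-" + std::to_string(job_id), input,
        compaction_service_result, options_override);
    return s.ok() ? CompactionServiceJobStatus::kSuccess
                  : CompactionServiceJobStatus::kFailure;
  }

 private:
  std::mutex mu_;
  std::string db_path_;
  Options options_;
  std::map<int, std::string> jobs_;
};

int main() {
  const std::string kDbPath = "/tmp/remote_compaction_demo";  // example path
  Options options;
  options.create_if_missing = true;
  // Background compactions are now delegated to the service.
  options.compaction_service =
      std::make_shared<InProcessCompactionService>(kDbPath, options);

  DB* db = nullptr;
  Status s = DB::Open(options, kDbPath, &db);
  // ... normal writes/flushes here; compactions go through the service ...
  delete db;
  return s.ok() ? 0 : 1;
}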

diff --git a/Makefile b/Makefile
@@ -1540,6 +1540,9 @@ compaction_job_test: $(OBJ_DIR)/db/compaction/compaction_job_test.o $(TEST_LIBRA
 compaction_job_stats_test: $(OBJ_DIR)/db/compaction/compaction_job_stats_test.o $(TEST_LIBRARY) $(LIBRARY)
 	$(AM_LINK)

+compaction_service_test: $(OBJ_DIR)/db/compaction/compaction_service_test.o $(TEST_LIBRARY) $(LIBRARY)
+	$(AM_LINK)
+
 compact_on_deletion_collector_test: $(OBJ_DIR)/utilities/table_properties_collectors/compact_on_deletion_collector_test.o $(TEST_LIBRARY) $(LIBRARY)
 	$(AM_LINK)

diff --git a/TARGETS b/TARGETS
@@ -1107,6 +1107,13 @@ ROCKS_TESTS = [
         [],
         [],
     ],
+    [
+        "compaction_service_test",
+        "db/compaction/compaction_service_test.cc",
+        "parallel",
+        [],
+        [],
+    ],
     [
         "comparator_db_test",
         "db/comparator_db_test.cc",

diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc
@@ -131,10 +131,12 @@ struct CompactionJob::SubcompactionState {
   // Files produced by this subcompaction
   struct Output {
     Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp,
-           bool _enable_order_check, bool _enable_hash)
+           bool _enable_order_check, bool _enable_hash, bool _finished = false,
+           uint64_t precalculated_hash = 0)
         : meta(std::move(_meta)),
-          validator(_icmp, _enable_order_check, _enable_hash),
-          finished(false) {}
+          validator(_icmp, _enable_order_check, _enable_hash,
+                    precalculated_hash),
+          finished(_finished) {}
     FileMetaData meta;
     OutputValidator validator;
     bool finished;
@@ -299,8 +301,8 @@ void CompactionJob::AggregateStatistics() {
 CompactionJob::CompactionJob(
     int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
-    const FileOptions& file_options, VersionSet* versions,
-    const std::atomic<bool>* shutting_down,
+    const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
+    VersionSet* versions, const std::atomic<bool>* shutting_down,
     const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
     FSDirectory* db_directory, FSDirectory* output_directory,
     FSDirectory* blob_output_directory, Statistics* stats,
@@ -317,6 +319,7 @@ CompactionJob::CompactionJob(
     : compact_(new CompactionState(compaction)),
       compaction_stats_(compaction->compaction_reason(), 1),
       db_options_(db_options),
+      mutable_db_options_copy_(mutable_db_options),
       log_buffer_(log_buffer),
       output_directory_(output_directory),
       stats_(stats),
@@ -901,10 +904,163 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
   return status;
 }

+#ifndef ROCKSDB_LITE
+void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
+    SubcompactionState* sub_compact) {
+  assert(sub_compact);
+  assert(sub_compact->compaction);
+  assert(db_options_.compaction_service);
+
+  const Compaction* compaction = sub_compact->compaction;
+  CompactionServiceInput compaction_input;
+  compaction_input.output_level = compaction->output_level();
+
+  const std::vector<CompactionInputFiles>& inputs =
+      *(compact_->compaction->inputs());
+  for (const auto& files_per_level : inputs) {
+    for (const auto& file : files_per_level.files) {
+      compaction_input.input_files.emplace_back(
+          MakeTableFileName(file->fd.GetNumber()));
+    }
+  }
+  compaction_input.column_family.name =
+      compaction->column_family_data()->GetName();
+  compaction_input.column_family.options =
+      compaction->column_family_data()->GetLatestCFOptions();
+  compaction_input.db_options =
+      BuildDBOptions(db_options_, mutable_db_options_copy_);
+  compaction_input.snapshots = existing_snapshots_;
+  compaction_input.has_begin = sub_compact->start;
+  compaction_input.begin =
+      compaction_input.has_begin ? sub_compact->start->ToString() : "";
+  compaction_input.has_end = sub_compact->end;
+  compaction_input.end =
+      compaction_input.has_end ? sub_compact->end->ToString() : "";
+  compaction_input.approx_size = sub_compact->approx_size;
+
+  std::string compaction_input_binary;
+  Status s = compaction_input.Write(&compaction_input_binary);
+  if (!s.ok()) {
+    sub_compact->status = s;
+    return;
+  }
+
+  std::ostringstream input_files_oss;
+  bool is_first_one = true;
+  for (const auto& file : compaction_input.input_files) {
+    input_files_oss << (is_first_one ? "" : ", ") << file;
+    is_first_one = false;
+  }
+
+  ROCKS_LOG_INFO(
+      db_options_.info_log,
+      "[%s] [JOB %d] Starting remote compaction (output level: %d): %s",
+      compaction_input.column_family.name.c_str(), job_id_,
+      compaction_input.output_level, input_files_oss.str().c_str());
+  CompactionServiceJobStatus compaction_status =
+      db_options_.compaction_service->Start(compaction_input_binary, job_id_);
+  if (compaction_status != CompactionServiceJobStatus::kSuccess) {
+    sub_compact->status =
+        Status::Incomplete("CompactionService failed to start compaction job.");
+    return;
+  }
+
+  std::string compaction_result_binary;
+  compaction_status = db_options_.compaction_service->WaitForComplete(
+      job_id_, &compaction_result_binary);
+
+  CompactionServiceResult compaction_result;
+  s = CompactionServiceResult::Read(compaction_result_binary,
+                                    &compaction_result);
+  if (compaction_status != CompactionServiceJobStatus::kSuccess) {
+    sub_compact->status =
+        s.ok() ? compaction_result.status
+               : Status::Incomplete(
+                     "CompactionService failed to run compaction job.");
+    compaction_result.status.PermitUncheckedError();
+    ROCKS_LOG_WARN(db_options_.info_log,
+                   "[%s] [JOB %d] Remote compaction failed, status: %s",
+                   compaction_input.column_family.name.c_str(), job_id_,
+                   s.ToString().c_str());
+    return;
+  }
+
+  if (!s.ok()) {
+    sub_compact->status = s;
+    compaction_result.status.PermitUncheckedError();
+    return;
+  }
+  sub_compact->status = compaction_result.status;
+
+  std::ostringstream output_files_oss;
+  is_first_one = true;
+  for (const auto& file : compaction_result.output_files) {
+    output_files_oss << (is_first_one ? "" : ", ") << file.file_name;
+    is_first_one = false;
+  }
+
+  ROCKS_LOG_INFO(db_options_.info_log,
+                 "[%s] [JOB %d] Receive remote compaction result, output path: "
+                 "%s, files: %s",
+                 compaction_input.column_family.name.c_str(), job_id_,
+                 compaction_result.output_path.c_str(),
+                 output_files_oss.str().c_str());
+
+  if (!s.ok()) {
+    sub_compact->status = s;
+    return;
+  }
+
+  for (const auto& file : compaction_result.output_files) {
+    uint64_t file_num = versions_->NewFileNumber();
+    auto src_file = compaction_result.output_path + "/" + file.file_name;
+    auto tgt_file = TableFileName(compaction->immutable_cf_options()->cf_paths,
+                                  file_num, compaction->output_path_id());
+    s = fs_->RenameFile(src_file, tgt_file, IOOptions(), nullptr);
+    if (!s.ok()) {
+      sub_compact->status = s;
+      return;
+    }
+
+    FileMetaData meta;
+    uint64_t file_size;
+    s = fs_->GetFileSize(tgt_file, IOOptions(), &file_size, nullptr);
+    if (!s.ok()) {
+      sub_compact->status = s;
+      return;
+    }
+    meta.fd = FileDescriptor(file_num, compaction->output_path_id(), file_size,
+                             file.smallest_seqno, file.largest_seqno);
+    meta.smallest.DecodeFrom(file.smallest_internal_key);
+    meta.largest.DecodeFrom(file.largest_internal_key);
+    meta.oldest_ancester_time = file.oldest_ancester_time;
+    meta.file_creation_time = file.file_creation_time;
+    meta.marked_for_compaction = file.marked_for_compaction;
+
+    auto cfd = compaction->column_family_data();
+    sub_compact->outputs.emplace_back(std::move(meta),
+                                      cfd->internal_comparator(), false, false,
+                                      true, file.paranoid_hash);
+  }
+  sub_compact->compaction_job_stats = compaction_result.stats;
+  sub_compact->num_output_records = compaction_result.num_output_records;
+  sub_compact->approx_size = compaction_input.approx_size;  // is this used?
+  sub_compact->total_bytes = compaction_result.total_bytes;
+  IOSTATS_ADD(bytes_written, compaction_result.bytes_written);
+  IOSTATS_ADD(bytes_read, compaction_result.bytes_read);
+}
+#endif  // !ROCKSDB_LITE
+
 void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   assert(sub_compact);
   assert(sub_compact->compaction);
+
+#ifndef ROCKSDB_LITE
+  if (db_options_.compaction_service) {
+    return ProcessKeyValueCompactionWithCompactionService(sub_compact);
+  }
+#endif  // !ROCKSDB_LITE
+
   uint64_t prev_cpu_micros = db_options_.clock->CPUNanos() / 1000;

   ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
@@ -1951,9 +2107,9 @@ std::string CompactionServiceCompactionJob::GetTableFileName(
 CompactionServiceCompactionJob::CompactionServiceCompactionJob(
     int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
-    const FileOptions& file_options, VersionSet* versions,
-    const std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
-    FSDirectory* output_directory, Statistics* stats,
+    const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
+    VersionSet* versions, const std::atomic<bool>* shutting_down,
+    LogBuffer* log_buffer, FSDirectory* output_directory, Statistics* stats,
     InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
     std::vector<SequenceNumber> existing_snapshots,
     std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
@@ -1963,10 +2119,10 @@ CompactionServiceCompactionJob::CompactionServiceCompactionJob(
     const CompactionServiceInput& compaction_service_input,
     CompactionServiceResult* compaction_service_result)
     : CompactionJob(
-          job_id, compaction, db_options, file_options, versions, shutting_down,
-          0, log_buffer, nullptr, output_directory, nullptr, stats, db_mutex,
-          db_error_handler, existing_snapshots, kMaxSequenceNumber, nullptr,
-          table_cache, event_logger,
+          job_id, compaction, db_options, mutable_db_options, file_options,
+          versions, shutting_down, 0, log_buffer, nullptr, output_directory,
+          nullptr, stats, db_mutex, db_error_handler, existing_snapshots,
+          kMaxSequenceNumber, nullptr, table_cache, event_logger,
           compaction->mutable_cf_options()->paranoid_file_checks,
           compaction->mutable_cf_options()->report_bg_io_stats, dbname,
           &(compaction_service_result->stats), Env::Priority::USER, io_tracer,
@@ -2357,7 +2513,85 @@ static std::unordered_map<std::string, OptionTypeInfo>
          OptionTypeFlags::kNone}},
 };

+namespace {
+// This is a helper struct to serialize and deserialize class Status, because
+// Status's members are not public.
+struct StatusSerializationAdapter {
+  uint8_t code;
+  uint8_t subcode;
+  uint8_t severity;
+  std::string message;
+
+  StatusSerializationAdapter() {}
+  explicit StatusSerializationAdapter(const Status& s) {
+    code = s.code();
+    subcode = s.subcode();
+    severity = s.severity();
+    auto msg = s.getState();
+    message = msg ? msg : "";
+  }
+
+  Status GetStatus() {
+    return Status(static_cast<Status::Code>(code),
+                  static_cast<Status::SubCode>(subcode),
+                  static_cast<Status::Severity>(severity), message);
+  }
+};
+}  // namespace
+
+static std::unordered_map<std::string, OptionTypeInfo>
+    status_adapter_type_info = {
+        {"code",
+         {offsetof(struct StatusSerializationAdapter, code),
+          OptionType::kUInt8T, OptionVerificationType::kNormal,
+          OptionTypeFlags::kNone}},
+        {"subcode",
+         {offsetof(struct StatusSerializationAdapter, subcode),
+          OptionType::kUInt8T, OptionVerificationType::kNormal,
+          OptionTypeFlags::kNone}},
+        {"severity",
+         {offsetof(struct StatusSerializationAdapter, severity),
+          OptionType::kUInt8T, OptionVerificationType::kNormal,
+          OptionTypeFlags::kNone}},
+        {"message",
+         {offsetof(struct StatusSerializationAdapter, message),
+          OptionType::kEncodedString, OptionVerificationType::kNormal,
+          OptionTypeFlags::kNone}},
+};
+
 static std::unordered_map<std::string, OptionTypeInfo> cs_result_type_info = {
+    {"status",
+     {offsetof(struct CompactionServiceResult, status),
+      OptionType::kCustomizable, OptionVerificationType::kNormal,
+      OptionTypeFlags::kNone,
+      [](const ConfigOptions& opts, const std::string& /*name*/,
+         const std::string& value, void* addr) {
+        auto status_obj = static_cast<Status*>(addr);
+        StatusSerializationAdapter adapter;
+        Status s = OptionTypeInfo::ParseType(
+            opts, value, status_adapter_type_info, &adapter);
+        *status_obj = adapter.GetStatus();
+        return s;
+      },
+      [](const ConfigOptions& opts, const std::string& /*name*/,
+         const void* addr, std::string* value) {
+        const auto status_obj = static_cast<const Status*>(addr);
+        StatusSerializationAdapter adapter(*status_obj);
+        std::string result;
+        Status s = OptionTypeInfo::SerializeType(opts, status_adapter_type_info,
+                                                 &adapter, &result);
+        *value = "{" + result + "}";
+        return s;
+      },
+      [](const ConfigOptions& opts, const std::string& /*name*/,
+         const void* addr1, const void* addr2, std::string* mismatch) {
+        const auto status1 = static_cast<const Status*>(addr1);
+        const auto status2 = static_cast<const Status*>(addr2);
+        StatusSerializationAdapter adapter1(*status1);
+        StatusSerializationAdapter adapter2(*status2);
+        return OptionTypeInfo::TypesAreEqual(opts, status_adapter_type_info,
+                                             &adapter1, &adapter2, mismatch);
+      }}},
     {"output_files",
      OptionTypeInfo::Vector<CompactionServiceOutputFile>(
          offsetof(struct CompactionServiceResult, output_files),
@@ -2396,6 +2630,9 @@ static std::unordered_map<std::string, OptionTypeInfo> cs_result_type_info = {

 Status CompactionServiceInput::Read(const std::string& data_str,
                                     CompactionServiceInput* obj) {
+  if (data_str.size() <= sizeof(BinaryFormatVersion)) {
+    return Status::InvalidArgument("Invalid CompactionServiceInput string");
+  }
   auto format_version = DecodeFixed32(data_str.data());
   if (format_version == kOptionsString) {
     ConfigOptions cf;
@@ -2422,6 +2659,9 @@ Status CompactionServiceInput::Write(std::string* output) {

 Status CompactionServiceResult::Read(const std::string& data_str,
                                      CompactionServiceResult* obj) {
+  if (data_str.size() <= sizeof(BinaryFormatVersion)) {
+    return Status::InvalidArgument("Invalid CompactionServiceResult string");
+  }
   auto format_version = DecodeFixed32(data_str.data());
   if (format_version == kOptionsString) {
     ConfigOptions cf;

diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h
@@ -66,6 +66,7 @@ class CompactionJob {
  public:
   CompactionJob(
       int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
+      const MutableDBOptions& mutable_db_options,
       const FileOptions& file_options, VersionSet* versions,
       const std::atomic<bool>* shutting_down,
       const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
@@ -125,6 +126,7 @@ class CompactionJob {
   CompactionState* compact_;
   InternalStats::CompactionStats compaction_stats_;
   const ImmutableDBOptions& db_options_;
+  const MutableDBOptions mutable_db_options_copy_;
   LogBuffer* log_buffer_;
   FSDirectory* output_directory_;
   Statistics* stats_;
@@ -143,6 +145,9 @@ class CompactionJob {
   // consecutive groups such that each group has a similar size.
   void GenSubcompactionBoundaries();

+  void ProcessKeyValueCompactionWithCompactionService(
+      SubcompactionState* sub_compact);
+
   // update the thread status for starting a compaction.
   void ReportStartedCompaction(Compaction* compaction);
   void AllocateCompactionOutputFileNumbers();
@@ -287,6 +292,7 @@ struct CompactionServiceOutputFile {
 // instance, with these information, the primary db instance with write
 // permission is able to install the result to the DB.
 struct CompactionServiceResult {
+  Status status;
   std::vector<CompactionServiceOutputFile> output_files;
   int output_level;
@@ -317,6 +323,7 @@ class CompactionServiceCompactionJob : private CompactionJob {
  public:
   CompactionServiceCompactionJob(
       int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
+      const MutableDBOptions& mutable_db_options,
       const FileOptions& file_options, VersionSet* versions,
       const std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
       FSDirectory* output_directory, Statistics* stats,

diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc
@@ -344,9 +344,9 @@ class CompactionJobTestBase : public testing::Test {
     ASSERT_TRUE(full_history_ts_low_.empty() ||
                 ucmp_->timestamp_size() == full_history_ts_low_.size());
     CompactionJob compaction_job(
-        0, &compaction, db_options_, env_options_, versions_.get(),
-        &shutting_down_, preserve_deletes_seqnum_, &log_buffer, nullptr,
-        nullptr, nullptr, nullptr, &mutex_, &error_handler_, snapshots,
+        0, &compaction, db_options_, mutable_db_options_, env_options_,
+        versions_.get(), &shutting_down_, preserve_deletes_seqnum_, &log_buffer,
+        nullptr, nullptr, nullptr, nullptr, &mutex_, &error_handler_, snapshots,
         earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
         &event_logger, false, false, dbname_, &compaction_job_stats_,
         Env::Priority::USER, nullptr /* IOTracer */,
@@ -1190,6 +1190,14 @@ TEST_F(CompactionJobTest, ResultSerialization) {
   const int kStrMaxLen = 1000;
   Random rnd(static_cast<uint32_t>(time(nullptr)));
   Random64 rnd64(time(nullptr));
+  std::vector<Status> status_list = {
+      Status::OK(),
+      Status::InvalidArgument("invalid option"),
+      Status::Aborted("failed to run"),
+      Status::NotSupported("not supported option"),
+  };
+  result.status =
+      status_list.at(rnd.Uniform(static_cast<int>(status_list.size())));
   while (!rnd.OneIn(10)) {
     result.output_files.emplace_back(
         rnd.RandomString(rnd.Uniform(kStrMaxLen)), rnd64.Uniform(UINT64_MAX),
@@ -1262,6 +1270,9 @@ TEST_F(CompactionJobTest, ResultSerialization) {
   output.replace(0, kDataVersionSize, buf, kDataVersionSize);
   Status s = CompactionServiceResult::Read(output, &deserialized3);
   ASSERT_TRUE(s.IsNotSupported());
+  for (const auto& item : status_list) {
+    item.PermitUncheckedError();
+  }
 }

 class CompactionJobTimestampTest : public CompactionJobTestBase {
class CompactionJobTimestampTest : public CompactionJobTestBase { class CompactionJobTimestampTest : public CompactionJobTestBase {

diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc
@@ -0,0 +1,454 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#include "db/db_test_util.h"
+#include "port/stack_trace.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class MyTestCompactionService : public CompactionService {
+ public:
+  MyTestCompactionService(const std::string& db_path,
+                          std::shared_ptr<FileSystem> fs, Options& options)
+      : db_path_(db_path), fs_(fs), options_(options) {}
+
+  CompactionServiceJobStatus Start(const std::string& compaction_service_input,
+                                   int job_id) override {
+    InstrumentedMutexLock l(&mutex_);
+    jobs_.emplace(job_id, compaction_service_input);
+    CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
+    TEST_SYNC_POINT_CALLBACK("MyTestCompactionService::Start::End", &s);
+    return s;
+  }
+
+  CompactionServiceJobStatus WaitForComplete(
+      int job_id, std::string* compaction_service_result) override {
+    std::string compaction_input;
+    {
+      InstrumentedMutexLock l(&mutex_);
+      auto i = jobs_.find(job_id);
+      if (i == jobs_.end()) {
+        return CompactionServiceJobStatus::kFailure;
+      }
+      compaction_input = std::move(i->second);
+      jobs_.erase(i);
+    }
+
+    CompactionServiceOptionsOverride options_override;
+    options_override.env = options_.env;
+    options_override.file_checksum_gen_factory =
+        options_.file_checksum_gen_factory;
+    options_override.comparator = options_.comparator;
+    options_override.merge_operator = options_.merge_operator;
+    options_override.compaction_filter = options_.compaction_filter;
+    options_override.compaction_filter_factory =
+        options_.compaction_filter_factory;
+    options_override.prefix_extractor = options_.prefix_extractor;
+    options_override.table_factory = options_.table_factory;
+    options_override.sst_partitioner_factory = options_.sst_partitioner_factory;
+
+    Status s = DB::OpenAndCompact(db_path_, db_path_ + "/" + ToString(job_id),
+                                  compaction_input, compaction_service_result,
+                                  options_override);
+    TEST_SYNC_POINT_CALLBACK("MyTestCompactionService::WaitForComplete::End",
+                             compaction_service_result);
+    compaction_num_.fetch_add(1);
+    if (s.ok()) {
+      return CompactionServiceJobStatus::kSuccess;
+    } else {
+      return CompactionServiceJobStatus::kFailure;
+    }
+  }
+
+  int GetCompactionNum() { return compaction_num_.load(); }
+
+ private:
+  InstrumentedMutex mutex_;
+  std::atomic_int compaction_num_{0};
+  std::map<int, std::string> jobs_;
+  const std::string db_path_;
+  std::shared_ptr<FileSystem> fs_;
+  Options options_;
+};
+
+class CompactionServiceTest : public DBTestBase {
+ public:
+  explicit CompactionServiceTest()
+      : DBTestBase("compaction_service_test", true) {}
+
+ protected:
+  void GenerateTestData() {
+    // Generate 20 files @ L2
+    for (int i = 0; i < 20; i++) {
+      for (int j = 0; j < 10; j++) {
+        int key_id = i * 10 + j;
+        ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
+      }
+      ASSERT_OK(Flush());
+    }
+    MoveFilesToLevel(2);
+
+    // Generate 10 files @ L1 overlap with all 20 files @ L2
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < 10; j++) {
+        int key_id = i * 20 + j * 2;
+        ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
+      }
+      ASSERT_OK(Flush());
+    }
+    MoveFilesToLevel(1);
+    ASSERT_EQ(FilesPerLevel(), "0,10,20");
+  }
+
+  void VerifyTestData() {
+    for (int i = 0; i < 200; i++) {
+      auto result = Get(Key(i));
+      if (i % 2) {
+        ASSERT_EQ(result, "value" + ToString(i));
+      } else {
+        ASSERT_EQ(result, "value_new" + ToString(i));
+      }
+    }
+  }
+};
+
+TEST_F(CompactionServiceTest, BasicCompactions) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.compaction_service = std::make_shared<MyTestCompactionService>(
+      dbname_, env_->GetFileSystem(), options);
+  DestroyAndReopen(options);
+
+  for (int i = 0; i < 20; i++) {
+    for (int j = 0; j < 10; j++) {
+      int key_id = i * 10 + j;
+      ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  for (int i = 0; i < 10; i++) {
+    for (int j = 0; j < 10; j++) {
+      int key_id = i * 20 + j * 2;
+      ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
+    }
+    ASSERT_OK(Flush());
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  // verify result
+  for (int i = 0; i < 200; i++) {
+    auto result = Get(Key(i));
+    if (i % 2) {
+      ASSERT_EQ(result, "value" + ToString(i));
+    } else {
+      ASSERT_EQ(result, "value_new" + ToString(i));
+    }
+  }
+  auto my_cs =
+      dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
+  ASSERT_GE(my_cs->GetCompactionNum(), 1);
+
+  // Test failed compaction
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) {
+        // override job status
+        Status* s = static_cast<Status*>(status);
+        *s = Status::Aborted("MyTestCompactionService failed to compact!");
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  Status s;
+  for (int i = 0; i < 10; i++) {
+    for (int j = 0; j < 10; j++) {
+      int key_id = i * 20 + j * 2;
+      s = Put(Key(key_id), "value_new" + ToString(key_id));
+      if (s.IsAborted()) {
+        break;
+      }
+    }
+    if (s.IsAborted()) {
+      break;
+    }
+    s = Flush();
+    if (s.IsAborted()) {
+      break;
+    }
+    s = dbfull()->TEST_WaitForCompact();
+    if (s.IsAborted()) {
+      break;
+    }
+  }
+  ASSERT_TRUE(s.IsAborted());
+}
+
+TEST_F(CompactionServiceTest, ManualCompaction) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.disable_auto_compactions = true;
+  options.compaction_service = std::make_shared<MyTestCompactionService>(
+      dbname_, env_->GetFileSystem(), options);
+  DestroyAndReopen(options);
+  GenerateTestData();
+
+  auto my_cs =
+      dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
+
+  std::string start_str = Key(15);
+  std::string end_str = Key(45);
+  Slice start(start_str);
+  Slice end(end_str);
+  uint64_t comp_num = my_cs->GetCompactionNum();
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
+  ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
+  VerifyTestData();
+
+  start_str = Key(120);
+  start = start_str;
+  comp_num = my_cs->GetCompactionNum();
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr));
+  ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
+  VerifyTestData();
+
+  end_str = Key(92);
+  end = end_str;
+  comp_num = my_cs->GetCompactionNum();
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, &end));
+  ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
+  VerifyTestData();
+
+  comp_num = my_cs->GetCompactionNum();
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
+  VerifyTestData();
+}
+
+TEST_F(CompactionServiceTest, FailedToStart) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.disable_auto_compactions = true;
+  options.compaction_service = std::make_shared<MyTestCompactionService>(
+      dbname_, env_->GetFileSystem(), options);
+  DestroyAndReopen(options);
+  GenerateTestData();
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "MyTestCompactionService::Start::End", [&](void* status) {
+        // override job status
+        auto s = static_cast<CompactionServiceJobStatus*>(status);
+        *s = CompactionServiceJobStatus::kFailure;
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  std::string start_str = Key(15);
+  std::string end_str = Key(45);
+  Slice start(start_str);
+  Slice end(end_str);
+  Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
+  ASSERT_TRUE(s.IsIncomplete());
+}
+
+TEST_F(CompactionServiceTest, InvalidResult) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.disable_auto_compactions = true;
+  options.compaction_service = std::make_shared<MyTestCompactionService>(
+      dbname_, env_->GetFileSystem(), options);
+  DestroyAndReopen(options);
+  GenerateTestData();
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "MyTestCompactionService::WaitForComplete::End", [&](void* result) {
+        // override job status
+        auto result_str = static_cast<std::string*>(result);
+        *result_str = "Invalid Str";
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  std::string start_str = Key(15);
+  std::string end_str = Key(45);
+  Slice start(start_str);
+  Slice end(end_str);
+  Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
+  ASSERT_FALSE(s.ok());
+}
+
+// TODO: support sub-compaction
+TEST_F(CompactionServiceTest, DISABLED_SubCompaction) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.max_subcompactions = 10;
+  options.target_file_size_base = 1 << 10;  // 1KB
+  options.disable_auto_compactions = true;
+  options.compaction_service = std::make_shared<MyTestCompactionService>(
+      dbname_, env_->GetFileSystem(), options);
+  DestroyAndReopen(options);
+  GenerateTestData();
+
+  auto cro = CompactRangeOptions();
+  cro.max_subcompactions = 10;
+  db_->CompactRange(cro, nullptr, nullptr);
+}
+
+class PartialDeleteCompactionFilter : public CompactionFilter {
+ public:
+  CompactionFilter::Decision FilterV2(
+      int /*level*/, const Slice& key, ValueType /*value_type*/,
+      const Slice& /*existing_value*/, std::string* /*new_value*/,
+      std::string* /*skip_until*/) const override {
+    int i = std::stoi(key.ToString().substr(3));
+    if (i > 5 && i <= 105) {
+      return CompactionFilter::Decision::kRemove;
+    }
+    return CompactionFilter::Decision::kKeep;
+  }
+
+  const char* Name() const override { return "PartialDeleteCompactionFilter"; }
+};
+
+TEST_F(CompactionServiceTest, CompactionFilter) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  auto delete_comp_filter = PartialDeleteCompactionFilter();
+  options.compaction_filter = &delete_comp_filter;
+  options.compaction_service = std::make_shared<MyTestCompactionService>(
+      dbname_, env_->GetFileSystem(), options);
+  DestroyAndReopen(options);
+
+  for (int i = 0; i < 20; i++) {
+    for (int j = 0; j < 10; j++) {
+      int key_id = i * 10 + j;
+      ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  for (int i = 0; i < 10; i++) {
+    for (int j = 0; j < 10; j++) {
+      int key_id = i * 20 + j * 2;
+      ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
+    }
+    ASSERT_OK(Flush());
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+
+  // verify result
+  for (int i = 0; i < 200; i++) {
+    auto result = Get(Key(i));
+    if (i > 5 && i <= 105) {
+      ASSERT_EQ(result, "NOT_FOUND");
+    } else if (i % 2) {
+      ASSERT_EQ(result, "value" + ToString(i));
+    } else {
+      ASSERT_EQ(result, "value_new" + ToString(i));
+    }
+  }
+  auto my_cs =
+      dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
+  ASSERT_GE(my_cs->GetCompactionNum(), 1);
+}
+
+TEST_F(CompactionServiceTest, Snapshot) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.compaction_service = std::make_shared<MyTestCompactionService>(
+      dbname_, env_->GetFileSystem(), options);
+  DestroyAndReopen(options);
+
+  ASSERT_OK(Put(Key(1), "value1"));
+  ASSERT_OK(Put(Key(2), "value1"));
+  const Snapshot* s1 = db_->GetSnapshot();
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put(Key(1), "value2"));
+  ASSERT_OK(Put(Key(3), "value2"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  auto my_cs =
+      dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
+  ASSERT_GE(my_cs->GetCompactionNum(), 1);
+  ASSERT_EQ("value1", Get(Key(1), s1));
+  ASSERT_EQ("value2", Get(Key(1)));
+  db_->ReleaseSnapshot(s1);
+}
+
+TEST_F(CompactionServiceTest, ConcurrentCompaction) {
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = 100;
+  options.env = env_;
+  options.compaction_service = std::make_shared<MyTestCompactionService>(
+      dbname_, env_->GetFileSystem(), options);
+  options.max_background_jobs = 20;
+  DestroyAndReopen(options);
+  GenerateTestData();
+
+  ColumnFamilyMetaData meta;
+  db_->GetColumnFamilyMetaData(&meta);
+
+  std::vector<std::thread> threads;
+  for (const auto& file : meta.levels[1].files) {
+    threads.push_back(std::thread([&]() {
+      std::string fname = file.db_path + "/" + file.name;
+      ASSERT_OK(db_->CompactFiles(CompactionOptions(), {fname}, 2));
+    }));
+  }
+
+  for (auto& thread : threads) {
+    thread.join();
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  // verify result
+  for (int i = 0; i < 200; i++) {
+    auto result = Get(Key(i));
+    if (i % 2) {
+      ASSERT_EQ(result, "value" + ToString(i));
+    } else {
+      ASSERT_EQ(result, "value_new" + ToString(i));
+    }
+  }
+  auto my_cs =
+      dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
+  ASSERT_EQ(my_cs->GetCompactionNum(), 10);
+  ASSERT_EQ(FilesPerLevel(), "0,0,10");
+}
+
+}  // namespace ROCKSDB_NAMESPACE
+
+#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
+extern "C" {
+void RegisterCustomObjects(int argc, char** argv);
+}
+#else
+void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
+#endif  // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
+
+int main(int argc, char** argv) {
+  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+  ::testing::InitGoogleTest(&argc, argv);
+  RegisterCustomObjects(argc, argv);
+  return RUN_ALL_TESTS();
+}
+
+#else
+#include <stdio.h>
+
+int main(int /*argc*/, char** /*argv*/) {
+  fprintf(stderr,
+          "SKIPPED as CompactionService is not supported in ROCKSDB_LITE\n");
+  return 0;
+}
+
+#endif  // ROCKSDB_LITE

diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
@@ -1092,7 +1092,7 @@ Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
   assert(cfd);

   Status s;
-  JobContext job_context(0, true);
+  JobContext job_context(next_job_id_.fetch_add(1), true);
   LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                        immutable_db_options_.info_log.get());
@@ -1243,7 +1243,7 @@ Status DBImpl::CompactFilesImpl(
   assert(is_snapshot_supported_ || snapshots_.empty());
   CompactionJobStats compaction_job_stats;
   CompactionJob compaction_job(
-      job_context->job_id, c.get(), immutable_db_options_,
+      job_context->job_id, c.get(), immutable_db_options_, mutable_db_options_,
       file_options_for_compaction_, versions_.get(), &shutting_down_,
       preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
       GetDataDir(c->column_family_data(), c->output_path_id()),
@@ -3130,8 +3130,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
     assert(is_snapshot_supported_ || snapshots_.empty());
     CompactionJob compaction_job(
         job_context->job_id, c.get(), immutable_db_options_,
-        file_options_for_compaction_, versions_.get(), &shutting_down_,
-        preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
+        mutable_db_options_, file_options_for_compaction_, versions_.get(),
+        &shutting_down_, preserve_deletes_seqnum_.load(), log_buffer,
+        directories_.GetDbDir(),
         GetDataDir(c->column_family_data(), c->output_path_id()),
         GetDataDir(c->column_family_data(), 0), stats_, &mutex_,
         &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
@@ -3360,7 +3361,7 @@ bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
   if (m->cfd != m1->cfd) {
     return false;
   }
-  return true;
+  return false;
 }

 #ifndef ROCKSDB_LITE

diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc
@@ -11,6 +11,7 @@
 #include "db/merge_context.h"
 #include "logging/auto_roll_logger.h"
 #include "monitoring/perf_context_imp.h"
+#include "rocksdb/configurable.h"
 #include "util/cast_util.h"

 namespace ROCKSDB_NAMESPACE {
@@ -726,11 +727,11 @@ Status DBImplSecondary::CompactWithoutInstallation(
   const int job_id = next_job_id_.fetch_add(1);

   CompactionServiceCompactionJob compaction_job(
-      job_id, c.get(), immutable_db_options_, file_options_for_compaction_,
-      versions_.get(), &shutting_down_, &log_buffer, output_dir.get(), stats_,
-      &mutex_, &error_handler_, input.snapshots, table_cache_, &event_logger_,
-      dbname_, io_tracer_, db_id_, db_session_id_, secondary_path_, input,
-      result);
+      job_id, c.get(), immutable_db_options_, mutable_db_options_,
+      file_options_for_compaction_, versions_.get(), &shutting_down_,
+      &log_buffer, output_dir.get(), stats_, &mutex_, &error_handler_,
+      input.snapshots, table_cache_, &event_logger_, dbname_, io_tracer_,
+      db_id_, db_session_id_, secondary_path_, input, result);

   mutex_.Unlock();
   s = compaction_job.Run();
@@ -742,6 +743,79 @@ Status DBImplSecondary::CompactWithoutInstallation(
   c->ReleaseCompactionFiles(s);
   c.reset();

+  TEST_SYNC_POINT_CALLBACK("DBImplSecondary::CompactWithoutInstallation::End",
+                           &s);
+  result->status = s;
+  return s;
+}
+
+Status DB::OpenAndCompact(
+    const std::string& name, const std::string& output_directory,
+    const std::string& input, std::string* result,
+    const CompactionServiceOptionsOverride& override_options) {
+  CompactionServiceInput compaction_input;
+  Status s = CompactionServiceInput::Read(input, &compaction_input);
+  if (!s.ok()) {
+    return s;
+  }
+
+  compaction_input.db_options.max_open_files = -1;
+  compaction_input.db_options.compaction_service = nullptr;
+  if (compaction_input.db_options.statistics) {
+    compaction_input.db_options.statistics.reset();
+  }
+  compaction_input.db_options.env = override_options.env;
+  compaction_input.db_options.file_checksum_gen_factory =
+      override_options.file_checksum_gen_factory;
+  compaction_input.column_family.options.comparator =
+      override_options.comparator;
+  compaction_input.column_family.options.merge_operator =
+      override_options.merge_operator;
+  compaction_input.column_family.options.compaction_filter =
+      override_options.compaction_filter;
+  compaction_input.column_family.options.compaction_filter_factory =
+      override_options.compaction_filter_factory;
+  compaction_input.column_family.options.prefix_extractor =
+      override_options.prefix_extractor;
+  compaction_input.column_family.options.table_factory =
+      override_options.table_factory;
+  compaction_input.column_family.options.sst_partitioner_factory =
+      override_options.sst_partitioner_factory;
+
+  std::vector<ColumnFamilyDescriptor> column_families;
+  column_families.push_back(compaction_input.column_family);
+  // TODO: we have to open the default CF because of an implementation
+  // limitation. Currently we just reuse the CF options from the input, which
+  // is not correct, so open may fail.
+  if (compaction_input.column_family.name != kDefaultColumnFamilyName) {
+    column_families.emplace_back(kDefaultColumnFamilyName,
+                                 compaction_input.column_family.options);
+  }
+
+  DB* db;
+  std::vector<ColumnFamilyHandle*> handles;
+
+  s = DB::OpenAsSecondary(compaction_input.db_options, name, output_directory,
+                          column_families, &handles, &db);
+  if (!s.ok()) {
+    return s;
+  }
+
+  CompactionServiceResult compaction_result;
+  DBImplSecondary* db_secondary = static_cast_with_check<DBImplSecondary>(db);
+  assert(handles.size() > 0);
+  s = db_secondary->CompactWithoutInstallation(handles[0], compaction_input,
+                                               &compaction_result);
+
+  Status serialization_status = compaction_result.Write(result);
+
+  for (auto& handle : handles) {
+    delete handle;
+  }
+  delete db;
+  if (s.ok()) {
+    return serialization_status;
+  }
   return s;
 }

diff --git a/db/db_secondary_test.cc b/db/db_secondary_test.cc
@@ -187,6 +187,7 @@ TEST_F(DBSecondaryTest, SimpleInternalCompaction) {
   ASSERT_EQ(result.output_path, this->secondary_path_);
   ASSERT_EQ(result.num_output_records, 2);
   ASSERT_GT(result.bytes_written, 0);
+  ASSERT_OK(result.status);
 }

 TEST_F(DBSecondaryTest, InternalCompactionMultiLevels) {
@@ -235,6 +236,7 @@ TEST_F(DBSecondaryTest, InternalCompactionMultiLevels) {
   CompactionServiceResult result;
   ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input1,
                                                                  &result));
+  ASSERT_OK(result.status);

   // pick 2 files on level 1 for compaction, which has 6 overlap files on L2
   CompactionServiceInput input2;
@@ -247,6 +249,7 @@ TEST_F(DBSecondaryTest, InternalCompactionMultiLevels) {
   input2.output_level = 2;
   ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input2,
                                                                  &result));
+  ASSERT_OK(result.status);

   CloseSecondary();
@@ -259,6 +262,7 @@ TEST_F(DBSecondaryTest, InternalCompactionMultiLevels) {
   Status s = db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input2,
                                                                   &result);
   ASSERT_TRUE(s.IsInvalidArgument());
+  ASSERT_OK(result.status);

   // TODO: L0 -> L1 compaction should success, currently version is not built
   // if files is missing.
@@ -304,6 +308,7 @@ TEST_F(DBSecondaryTest, InternalCompactionCompactedFiles) {
   Status s =
       db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input, &result);
   ASSERT_TRUE(s.IsInvalidArgument());
+  ASSERT_OK(result.status);
 }

 TEST_F(DBSecondaryTest, InternalCompactionMissingFiles) {
@@ -340,11 +345,13 @@ TEST_F(DBSecondaryTest, InternalCompactionMissingFiles) {
   Status s =
       db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input, &result);
   ASSERT_TRUE(s.IsInvalidArgument());
+  ASSERT_OK(result.status);

   input.input_files.erase(input.input_files.begin());

   ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input,
                                                                  &result));
+  ASSERT_OK(result.status);
 }

 TEST_F(DBSecondaryTest, OpenAsSecondary) {

diff --git a/db/output_validator.h b/db/output_validator.h
@@ -17,8 +17,10 @@ namespace ROCKSDB_NAMESPACE {
 class OutputValidator {
  public:
   explicit OutputValidator(const InternalKeyComparator& icmp,
-                           bool enable_order_check, bool enable_hash)
+                           bool enable_order_check, bool enable_hash,
+                           uint64_t precalculated_hash = 0)
       : icmp_(icmp),
+        paranoid_hash_(precalculated_hash),
         enable_order_check_(enable_order_check),
         enable_hash_(enable_hash) {}
@@ -37,8 +39,6 @@ class OutputValidator {
   // without notice between releases.
   uint64_t GetHash() const { return paranoid_hash_; }

-  void SetHash(uint64_t hash) { paranoid_hash_ = hash; }
-
  private:
   const InternalKeyComparator& icmp_;
   std::string prev_key_;

diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
@@ -239,6 +239,16 @@ class DB {
       const std::vector<ColumnFamilyDescriptor>& column_families,
       std::vector<ColumnFamilyHandle*>* handles, DB** dbptr);

+  // Open DB and run the compaction.
+  // It's a read-only operation: the result is not installed into the DB, it
+  // is written to the `output_directory`. The API should only be used with
+  // `options.compaction_service` to run a compaction triggered by a
+  // `CompactionService`.
+  static Status OpenAndCompact(
+      const std::string& name, const std::string& output_directory,
+      const std::string& input, std::string* output,
+      const CompactionServiceOptionsOverride& override_options);
+
   virtual Status Resume() { return Status::NotSupported(); }

   // Close the DB by releasing resources, closing files etc. This should be
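For context, a hedged sketch of a worker-side handler built on this entry point. `RunRemoteCompaction` is a hypothetical helper, not from the diff: `job_input` is the string the primary handed to CompactionService::Start(), and `primary_options` is assumed to mirror the primary's Options so pointer-typed settings can be re-injected through the override.

#include <string>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

using namespace ROCKSDB_NAMESPACE;

// Hypothetical worker-side handler: runs the compaction described by
// `job_input` against a read-only (secondary) view of the primary DB and
// leaves the output SSTs under `scratch_dir`; nothing is installed into
// the primary's MANIFEST.
Status RunRemoteCompaction(const std::string& primary_db_path,
                           const std::string& scratch_dir,
                           const std::string& job_input,
                           const Options& primary_options,
                           std::string* serialized_result) {
  CompactionServiceOptionsOverride options_override;
  // Pointer-typed configuration does not survive serialization, so it is
  // copied from the (assumed identical) primary options.
  options_override.env = primary_options.env;
  options_override.comparator = primary_options.comparator;
  options_override.merge_operator = primary_options.merge_operator;
  options_override.compaction_filter = primary_options.compaction_filter;
  options_override.table_factory = primary_options.table_factory;
  return DB::OpenAndCompact(primary_db_path, scratch_dir, job_input,
                            serialized_result, options_override);
}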

diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
@@ -364,6 +364,30 @@ struct DbPath {

 extern const char* kHostnameForDbHostId;

+enum class CompactionServiceJobStatus : char {
+  kSuccess,
+  kFailure,
+  kUseLocal,  // TODO: Add support for using local compaction
+};
+
+class CompactionService {
+ public:
+  // Start the compaction with input information, which can be passed to
+  // `DB::OpenAndCompact()`.
+  // job_id is pre-assigned; it will be reset after the DB is re-opened.
+  // TODO: sub-compaction is not supported. As sub-compactions would share the
+  // same job_id, a sub-compaction id might be added.
+  virtual CompactionServiceJobStatus Start(
+      const std::string& compaction_service_input, int job_id) = 0;
+
+  // Wait for the compaction to finish.
+  // TODO: Add output path override
+  virtual CompactionServiceJobStatus WaitForComplete(
+      int job_id, std::string* compaction_service_result) = 0;
+
+  virtual ~CompactionService() {}
+};
+
 struct DBOptions {
   // The function recovers options to the option as in version 4.6.
   DBOptions* OldDefaults(int rocksdb_major_version = 4,
@@ -1215,6 +1239,15 @@ struct DBOptions {
   // should enble this set as empty. Otherwise,it may cause unexpected
   // write failures.
   FileTypeSet checksum_handoff_file_types;
+
+  // EXPERIMENTAL
+  // CompactionService is a feature that allows the user to run compactions on
+  // a different host or process, which offloads the background load from the
+  // primary host.
+  // It's an experimental feature; the interface will be changed without
+  // backward/forward compatibility support for now. Some known issues are
+  // still under development.
+  std::shared_ptr<CompactionService> compaction_service = nullptr;
 };

 // Options to control the behavior of a database (passed to DB::Open)
@@ -1725,4 +1758,20 @@ struct SizeApproximationOptions {
   double files_size_error_margin = -1.0;
 };

+struct CompactionServiceOptionsOverride {
+  // Currently pointer configurations are not passed to the compaction service
+  // compaction, so the user needs to set them here. This will be removed once
+  // pointer configuration passing is supported.
+  Env* env = Env::Default();
+  std::shared_ptr<FileChecksumGenFactory> file_checksum_gen_factory = nullptr;
+  const Comparator* comparator = BytewiseComparator();
+  std::shared_ptr<MergeOperator> merge_operator = nullptr;
+  const CompactionFilter* compaction_filter = nullptr;
+  std::shared_ptr<CompactionFilterFactory> compaction_filter_factory = nullptr;
+  std::shared_ptr<const SliceTransform> prefix_extractor = nullptr;
+  std::shared_ptr<TableFactory> table_factory;
+  std::shared_ptr<SstPartitionerFactory> sst_partitioner_factory = nullptr;
+};
+
 }  // namespace ROCKSDB_NAMESPACE
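As a usage note, a worker whose primary DB diverges from the defaults must mirror the primary's pointer-typed settings in this struct, since they are not serialized into CompactionServiceInput. A small hedged sketch (not from the diff; the comparator and table factory values are examples only, using existing RocksDB factory functions):

#include "rocksdb/comparator.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"

using namespace ROCKSDB_NAMESPACE;

void ConfigureOverride(CompactionServiceOptionsOverride* override_opts) {
  // Must match what the primary DB uses; example values for illustration.
  override_opts->comparator = ReverseBytewiseComparator();
  override_opts->table_factory.reset(NewBlockBasedTableFactory());
}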

diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h
@@ -132,6 +132,10 @@ class Status {
   };

   Status(const Status& s, Severity sev);
+
+  Status(Code _code, SubCode _subcode, Severity _sev, const Slice& msg)
+      : Status(_code, _subcode, msg, "", _sev) {}
+
   Severity severity() const {
     MarkChecked();
     return sev_;
@@ -463,7 +467,8 @@ class Status {
   explicit Status(Code _code, SubCode _subcode = kNone)
       : code_(_code), subcode_(_subcode), sev_(kNoError), state_(nullptr) {}

-  Status(Code _code, SubCode _subcode, const Slice& msg, const Slice& msg2);
+  Status(Code _code, SubCode _subcode, const Slice& msg, const Slice& msg2,
+         Severity sev = kNoError);

   Status(Code _code, const Slice& msg, const Slice& msg2)
       : Status(_code, kNone, msg, msg2) {}

diff --git a/include/rocksdb/utilities/options_type.h b/include/rocksdb/utilities/options_type.h
@@ -25,6 +25,7 @@ enum class OptionType {
   kInt32T,
   kInt64T,
   kUInt,
+  kUInt8T,
   kUInt32T,
   kUInt64T,
   kSizeT,

diff --git a/options/db_options.cc b/options/db_options.cc
@@ -578,7 +578,8 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
       bgerror_resume_retry_interval(options.bgerror_resume_retry_interval),
       allow_data_in_errors(options.allow_data_in_errors),
       db_host_id(options.db_host_id),
-      checksum_handoff_file_types(options.checksum_handoff_file_types) {
+      checksum_handoff_file_types(options.checksum_handoff_file_types),
+      compaction_service(options.compaction_service) {
   stats = statistics.get();
   fs = env->GetFileSystem();
   if (env != nullptr) {

diff --git a/options/db_options.h b/options/db_options.h
@@ -99,6 +99,7 @@ struct ImmutableDBOptions {
   SystemClock* clock;
   Statistics* stats;
   Logger* logger;
+  std::shared_ptr<CompactionService> compaction_service;
 };

 struct MutableDBOptions {

diff --git a/options/options_helper.cc b/options/options_helper.cc
@@ -435,6 +435,9 @@ static bool ParseOptionHelper(void* opt_address, const OptionType& opt_type,
     case OptionType::kUInt:
       *static_cast<unsigned int*>(opt_address) = ParseUint32(value);
       break;
+    case OptionType::kUInt8T:
+      *static_cast<uint8_t*>(opt_address) = ParseUint8(value);
+      break;
     case OptionType::kUInt32T:
       *static_cast<uint32_t*>(opt_address) = ParseUint32(value);
       break;
@@ -510,6 +513,9 @@ bool SerializeSingleOptionHelper(const void* opt_address,
     case OptionType::kUInt:
       *value = ToString(*(static_cast<const unsigned int*>(opt_address)));
       break;
+    case OptionType::kUInt8T:
+      *value = ToString(*(static_cast<const uint8_t*>(opt_address)));
+      break;
     case OptionType::kUInt32T:
       *value = ToString(*(static_cast<const uint32_t*>(opt_address)));
       break;
@@ -1224,6 +1230,8 @@ static bool AreOptionsEqual(OptionType type, const void* this_offset,
       GetUnaligned(static_cast<const int64_t*>(that_offset), &v2);
       return (v1 == v2);
     }
+    case OptionType::kUInt8T:
+      return IsOptionEqual<uint8_t>(this_offset, that_offset);
     case OptionType::kUInt32T:
       return IsOptionEqual<uint32_t>(this_offset, that_offset);
     case OptionType::kUInt64T: {

diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc
@@ -230,6 +230,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
       {offsetof(struct DBOptions, db_host_id), sizeof(std::string)},
       {offsetof(struct DBOptions, checksum_handoff_file_types),
        sizeof(FileTypeSet)},
+      {offsetof(struct DBOptions, compaction_service),
+       sizeof(std::shared_ptr<CompactionService>)},
   };

   char* options_ptr = new char[sizeof(DBOptions)];

diff --git a/src.mk b/src.mk
@@ -392,6 +392,7 @@ TEST_MAIN_SOURCES = \
   db/compaction/compaction_job_test.cc \
   db/compaction/compaction_job_stats_test.cc \
   db/compaction/compaction_picker_test.cc \
+  db/compaction/compaction_service_test.cc \
   db/comparator_db_test.cc \
   db/corruption_test.cc \
   db/cuckoo_table_db_test.cc \

diff --git a/util/status.cc b/util/status.cc
@@ -58,9 +58,8 @@ static const char* msgs[static_cast<int>(Status::kMaxSubCode)] = {
 };

 Status::Status(Code _code, SubCode _subcode, const Slice& msg,
-               const Slice& msg2)
-    : code_(_code), subcode_(_subcode), sev_(kNoError) {
-  assert(code_ != kOk);
+               const Slice& msg2, Severity sev)
+    : code_(_code), subcode_(_subcode), sev_(sev) {
   assert(subcode_ != kMaxSubCode);
   const size_t len1 = msg.size();
   const size_t len2 = msg2.size();

diff --git a/util/string_util.cc b/util/string_util.cc
@@ -301,6 +301,15 @@ bool ParseBoolean(const std::string& type, const std::string& value) {
   throw std::invalid_argument(type);
 }

+uint8_t ParseUint8(const std::string& value) {
+  uint64_t num = ParseUint64(value);
+  if ((num >> 8LL) == 0) {
+    return static_cast<uint8_t>(num);
+  } else {
+    throw std::out_of_range(value);
+  }
+}
+
 uint32_t ParseUint32(const std::string& value) {
   uint64_t num = ParseUint64(value);
   if ((num >> 32LL) == 0) {

diff --git a/util/string_util.h b/util/string_util.h
@@ -120,6 +120,8 @@ bool StartsWith(const std::string& string, const std::string& pattern);
 #ifndef ROCKSDB_LITE
 bool ParseBoolean(const std::string& type, const std::string& value);

+uint8_t ParseUint8(const std::string& value);
+
 uint32_t ParseUint32(const std::string& value);
 int32_t ParseInt32(const std::string& value);
