From 3786181a90bd2daeff22bc0f20e0c06adca95bd2 Mon Sep 17 00:00:00 2001 From: Jay Zhuang Date: Wed, 19 May 2021 21:40:43 -0700 Subject: [PATCH] Add remote compaction public API (#8300) Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/8300 Reviewed By: ajkr Differential Revision: D28464726 Pulled By: jay-zhuang fbshipit-source-id: 49e9f4fb791808a6cbf39a7b1a331373f645fc5e --- CMakeLists.txt | 1 + HISTORY.md | 1 + Makefile | 3 + TARGETS | 7 + db/compaction/compaction_job.cc | 264 ++++++++++++- db/compaction/compaction_job.h | 7 + db/compaction/compaction_job_test.cc | 17 +- db/compaction/compaction_service_test.cc | 454 +++++++++++++++++++++++ db/db_impl/db_impl_compaction_flush.cc | 11 +- db/db_impl/db_impl_secondary.cc | 84 ++++- db/db_secondary_test.cc | 7 + db/output_validator.h | 6 +- include/rocksdb/db.h | 10 + include/rocksdb/options.h | 49 +++ include/rocksdb/status.h | 7 +- include/rocksdb/utilities/options_type.h | 1 + options/db_options.cc | 3 +- options/db_options.h | 1 + options/options_helper.cc | 8 + options/options_settable_test.cc | 2 + src.mk | 1 + util/status.cc | 5 +- util/string_util.cc | 9 + util/string_util.h | 2 + 24 files changed, 927 insertions(+), 33 deletions(-) create mode 100644 db/compaction/compaction_service_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 7da762f25..1ae550dbe 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1113,6 +1113,7 @@ if(WITH_TESTS) db/compaction/compaction_job_test.cc db/compaction/compaction_iterator_test.cc db/compaction/compaction_picker_test.cc + db/compaction/compaction_service_test.cc db/comparator_db_test.cc db/corruption_test.cc db/cuckoo_table_db_test.cc diff --git a/HISTORY.md b/HISTORY.md index 8d0cca346..bb248d371 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -17,6 +17,7 @@ * Allow `CompactionFilter`s to apply in more table file creation scenarios such as flush and recovery. For compatibility, `CompactionFilter`s by default apply during compaction. Users can customize this behavior by overriding `CompactionFilterFactory::ShouldFilterTableFileCreation()`. * Added more fields to FilterBuildingContext with LSM details, for custom filter policies that vary behavior based on where they are in the LSM-tree. * Added DB::Properties::kBlockCacheEntryStats for querying statistics on what percentage of block cache is used by various kinds of blocks, etc. using DB::GetProperty and DB::GetMapProperty. The same information is now dumped to info LOG periodically according to `stats_dump_period_sec`. +* Add an experimental Remote Compaction feature, which allows the user to run Compaction on a different host or process. The feature is still under development, currently only works on some basic use cases. The interface will be changed without backward/forward compatibility support. ### Performance Improvements * BlockPrefetcher is used by iterators to prefetch data if they anticipate more data to be used in future. It is enabled implicitly by rocksdb. Added change to take in account read pattern if reads are sequential. This would disable prefetching for random reads in MultiGet and iterators as readahead_size is increased exponential doing large prefetches. diff --git a/Makefile b/Makefile index 8bf806f38..c0e3cc969 100644 --- a/Makefile +++ b/Makefile @@ -1540,6 +1540,9 @@ compaction_job_test: $(OBJ_DIR)/db/compaction/compaction_job_test.o $(TEST_LIBRA compaction_job_stats_test: $(OBJ_DIR)/db/compaction/compaction_job_stats_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +compaction_service_test: $(OBJ_DIR)/db/compaction/compaction_service_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + compact_on_deletion_collector_test: $(OBJ_DIR)/utilities/table_properties_collectors/compact_on_deletion_collector_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 838877164..226c850c6 100644 --- a/TARGETS +++ b/TARGETS @@ -1107,6 +1107,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "compaction_service_test", + "db/compaction/compaction_service_test.cc", + "parallel", + [], + [], + ], [ "comparator_db_test", "db/comparator_db_test.cc", diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 4bbc36247..c1819af10 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -131,10 +131,12 @@ struct CompactionJob::SubcompactionState { // Files produced by this subcompaction struct Output { Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp, - bool _enable_order_check, bool _enable_hash) + bool _enable_order_check, bool _enable_hash, bool _finished = false, + uint64_t precalculated_hash = 0) : meta(std::move(_meta)), - validator(_icmp, _enable_order_check, _enable_hash), - finished(false) {} + validator(_icmp, _enable_order_check, _enable_hash, + precalculated_hash), + finished(_finished) {} FileMetaData meta; OutputValidator validator; bool finished; @@ -299,8 +301,8 @@ void CompactionJob::AggregateStatistics() { CompactionJob::CompactionJob( int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, - const FileOptions& file_options, VersionSet* versions, - const std::atomic* shutting_down, + const MutableDBOptions& mutable_db_options, const FileOptions& file_options, + VersionSet* versions, const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer, FSDirectory* db_directory, FSDirectory* output_directory, FSDirectory* blob_output_directory, Statistics* stats, @@ -317,6 +319,7 @@ CompactionJob::CompactionJob( : compact_(new CompactionState(compaction)), compaction_stats_(compaction->compaction_reason(), 1), db_options_(db_options), + mutable_db_options_copy_(mutable_db_options), log_buffer_(log_buffer), output_directory_(output_directory), stats_(stats), @@ -901,10 +904,163 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { return status; } +#ifndef ROCKSDB_LITE +void CompactionJob::ProcessKeyValueCompactionWithCompactionService( + SubcompactionState* sub_compact) { + assert(sub_compact); + assert(sub_compact->compaction); + assert(db_options_.compaction_service); + + const Compaction* compaction = sub_compact->compaction; + CompactionServiceInput compaction_input; + compaction_input.output_level = compaction->output_level(); + + const std::vector& inputs = + *(compact_->compaction->inputs()); + for (const auto& files_per_level : inputs) { + for (const auto& file : files_per_level.files) { + compaction_input.input_files.emplace_back( + MakeTableFileName(file->fd.GetNumber())); + } + } + compaction_input.column_family.name = + compaction->column_family_data()->GetName(); + compaction_input.column_family.options = + compaction->column_family_data()->GetLatestCFOptions(); + compaction_input.db_options = + BuildDBOptions(db_options_, mutable_db_options_copy_); + compaction_input.snapshots = existing_snapshots_; + compaction_input.has_begin = sub_compact->start; + compaction_input.begin = + compaction_input.has_begin ? sub_compact->start->ToString() : ""; + compaction_input.has_end = sub_compact->end; + compaction_input.end = + compaction_input.has_end ? sub_compact->end->ToString() : ""; + compaction_input.approx_size = sub_compact->approx_size; + + std::string compaction_input_binary; + Status s = compaction_input.Write(&compaction_input_binary); + if (!s.ok()) { + sub_compact->status = s; + return; + } + + std::ostringstream input_files_oss; + bool is_first_one = true; + for (const auto& file : compaction_input.input_files) { + input_files_oss << (is_first_one ? "" : ", ") << file; + is_first_one = false; + } + + ROCKS_LOG_INFO( + db_options_.info_log, + "[%s] [JOB %d] Starting remote compaction (output level: %d): %s", + compaction_input.column_family.name.c_str(), job_id_, + compaction_input.output_level, input_files_oss.str().c_str()); + CompactionServiceJobStatus compaction_status = + db_options_.compaction_service->Start(compaction_input_binary, job_id_); + if (compaction_status != CompactionServiceJobStatus::kSuccess) { + sub_compact->status = + Status::Incomplete("CompactionService failed to start compaction job."); + return; + } + + std::string compaction_result_binary; + compaction_status = db_options_.compaction_service->WaitForComplete( + job_id_, &compaction_result_binary); + + CompactionServiceResult compaction_result; + s = CompactionServiceResult::Read(compaction_result_binary, + &compaction_result); + if (compaction_status != CompactionServiceJobStatus::kSuccess) { + sub_compact->status = + s.ok() ? compaction_result.status + : Status::Incomplete( + "CompactionService failed to run compaction job."); + compaction_result.status.PermitUncheckedError(); + ROCKS_LOG_WARN(db_options_.info_log, + "[%s] [JOB %d] Remote compaction failed, status: %s", + compaction_input.column_family.name.c_str(), job_id_, + s.ToString().c_str()); + return; + } + + if (!s.ok()) { + sub_compact->status = s; + compaction_result.status.PermitUncheckedError(); + return; + } + sub_compact->status = compaction_result.status; + + std::ostringstream output_files_oss; + is_first_one = true; + for (const auto& file : compaction_result.output_files) { + output_files_oss << (is_first_one ? "" : ", ") << file.file_name; + is_first_one = false; + } + + ROCKS_LOG_INFO(db_options_.info_log, + "[%s] [JOB %d] Receive remote compaction result, output path: " + "%s, files: %s", + compaction_input.column_family.name.c_str(), job_id_, + compaction_result.output_path.c_str(), + output_files_oss.str().c_str()); + + if (!s.ok()) { + sub_compact->status = s; + return; + } + + for (const auto& file : compaction_result.output_files) { + uint64_t file_num = versions_->NewFileNumber(); + auto src_file = compaction_result.output_path + "/" + file.file_name; + auto tgt_file = TableFileName(compaction->immutable_cf_options()->cf_paths, + file_num, compaction->output_path_id()); + s = fs_->RenameFile(src_file, tgt_file, IOOptions(), nullptr); + if (!s.ok()) { + sub_compact->status = s; + return; + } + + FileMetaData meta; + uint64_t file_size; + s = fs_->GetFileSize(tgt_file, IOOptions(), &file_size, nullptr); + if (!s.ok()) { + sub_compact->status = s; + return; + } + meta.fd = FileDescriptor(file_num, compaction->output_path_id(), file_size, + file.smallest_seqno, file.largest_seqno); + meta.smallest.DecodeFrom(file.smallest_internal_key); + meta.largest.DecodeFrom(file.largest_internal_key); + meta.oldest_ancester_time = file.oldest_ancester_time; + meta.file_creation_time = file.file_creation_time; + meta.marked_for_compaction = file.marked_for_compaction; + + auto cfd = compaction->column_family_data(); + sub_compact->outputs.emplace_back(std::move(meta), + cfd->internal_comparator(), false, false, + true, file.paranoid_hash); + } + sub_compact->compaction_job_stats = compaction_result.stats; + sub_compact->num_output_records = compaction_result.num_output_records; + sub_compact->approx_size = compaction_input.approx_size; // is this used? + sub_compact->total_bytes = compaction_result.total_bytes; + IOSTATS_ADD(bytes_written, compaction_result.bytes_written); + IOSTATS_ADD(bytes_read, compaction_result.bytes_read); +} +#endif // !ROCKSDB_LITE + void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { assert(sub_compact); assert(sub_compact->compaction); +#ifndef ROCKSDB_LITE + if (db_options_.compaction_service) { + return ProcessKeyValueCompactionWithCompactionService(sub_compact); + } +#endif // !ROCKSDB_LITE + uint64_t prev_cpu_micros = db_options_.clock->CPUNanos() / 1000; ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); @@ -1951,9 +2107,9 @@ std::string CompactionServiceCompactionJob::GetTableFileName( CompactionServiceCompactionJob::CompactionServiceCompactionJob( int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, - const FileOptions& file_options, VersionSet* versions, - const std::atomic* shutting_down, LogBuffer* log_buffer, - FSDirectory* output_directory, Statistics* stats, + const MutableDBOptions& mutable_db_options, const FileOptions& file_options, + VersionSet* versions, const std::atomic* shutting_down, + LogBuffer* log_buffer, FSDirectory* output_directory, Statistics* stats, InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler, std::vector existing_snapshots, std::shared_ptr table_cache, EventLogger* event_logger, @@ -1963,10 +2119,10 @@ CompactionServiceCompactionJob::CompactionServiceCompactionJob( const CompactionServiceInput& compaction_service_input, CompactionServiceResult* compaction_service_result) : CompactionJob( - job_id, compaction, db_options, file_options, versions, shutting_down, - 0, log_buffer, nullptr, output_directory, nullptr, stats, db_mutex, - db_error_handler, existing_snapshots, kMaxSequenceNumber, nullptr, - table_cache, event_logger, + job_id, compaction, db_options, mutable_db_options, file_options, + versions, shutting_down, 0, log_buffer, nullptr, output_directory, + nullptr, stats, db_mutex, db_error_handler, existing_snapshots, + kMaxSequenceNumber, nullptr, table_cache, event_logger, compaction->mutable_cf_options()->paranoid_file_checks, compaction->mutable_cf_options()->report_bg_io_stats, dbname, &(compaction_service_result->stats), Env::Priority::USER, io_tracer, @@ -2357,7 +2513,85 @@ static std::unordered_map OptionTypeFlags::kNone}}, }; +namespace { +// this is a helper struct to serialize and deserialize class Status, because +// Status's members are not public. +struct StatusSerializationAdapter { + uint8_t code; + uint8_t subcode; + uint8_t severity; + std::string message; + + StatusSerializationAdapter() {} + explicit StatusSerializationAdapter(const Status& s) { + code = s.code(); + subcode = s.subcode(); + severity = s.severity(); + auto msg = s.getState(); + message = msg ? msg : ""; + } + + Status GetStatus() { + return Status(static_cast(code), + static_cast(subcode), + static_cast(severity), message); + } +}; +} // namespace + +static std::unordered_map + status_adapter_type_info = { + {"code", + {offsetof(struct StatusSerializationAdapter, code), + OptionType::kUInt8T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"subcode", + {offsetof(struct StatusSerializationAdapter, subcode), + OptionType::kUInt8T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"severity", + {offsetof(struct StatusSerializationAdapter, severity), + OptionType::kUInt8T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"message", + {offsetof(struct StatusSerializationAdapter, message), + OptionType::kEncodedString, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, +}; + static std::unordered_map cs_result_type_info = { + {"status", + {offsetof(struct CompactionServiceResult, status), + OptionType::kCustomizable, OptionVerificationType::kNormal, + OptionTypeFlags::kNone, + [](const ConfigOptions& opts, const std::string& /*name*/, + const std::string& value, void* addr) { + auto status_obj = static_cast(addr); + StatusSerializationAdapter adapter; + Status s = OptionTypeInfo::ParseType( + opts, value, status_adapter_type_info, &adapter); + *status_obj = adapter.GetStatus(); + return s; + }, + [](const ConfigOptions& opts, const std::string& /*name*/, + const void* addr, std::string* value) { + const auto status_obj = static_cast(addr); + StatusSerializationAdapter adapter(*status_obj); + std::string result; + Status s = OptionTypeInfo::SerializeType(opts, status_adapter_type_info, + &adapter, &result); + *value = "{" + result + "}"; + return s; + }, + [](const ConfigOptions& opts, const std::string& /*name*/, + const void* addr1, const void* addr2, std::string* mismatch) { + const auto status1 = static_cast(addr1); + const auto status2 = static_cast(addr2); + StatusSerializationAdapter adatper1(*status1); + StatusSerializationAdapter adapter2(*status2); + return OptionTypeInfo::TypesAreEqual(opts, status_adapter_type_info, + &adatper1, &adapter2, mismatch); + }}}, {"output_files", OptionTypeInfo::Vector( offsetof(struct CompactionServiceResult, output_files), @@ -2396,6 +2630,9 @@ static std::unordered_map cs_result_type_info = { Status CompactionServiceInput::Read(const std::string& data_str, CompactionServiceInput* obj) { + if (data_str.size() <= sizeof(BinaryFormatVersion)) { + return Status::InvalidArgument("Invalid CompactionServiceInput string"); + } auto format_version = DecodeFixed32(data_str.data()); if (format_version == kOptionsString) { ConfigOptions cf; @@ -2422,6 +2659,9 @@ Status CompactionServiceInput::Write(std::string* output) { Status CompactionServiceResult::Read(const std::string& data_str, CompactionServiceResult* obj) { + if (data_str.size() <= sizeof(BinaryFormatVersion)) { + return Status::InvalidArgument("Invalid CompactionServiceResult string"); + } auto format_version = DecodeFixed32(data_str.data()); if (format_version == kOptionsString) { ConfigOptions cf; diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 1b11292ef..197f7e93b 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -66,6 +66,7 @@ class CompactionJob { public: CompactionJob( int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, + const MutableDBOptions& mutable_db_options, const FileOptions& file_options, VersionSet* versions, const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer, @@ -125,6 +126,7 @@ class CompactionJob { CompactionState* compact_; InternalStats::CompactionStats compaction_stats_; const ImmutableDBOptions& db_options_; + const MutableDBOptions mutable_db_options_copy_; LogBuffer* log_buffer_; FSDirectory* output_directory_; Statistics* stats_; @@ -143,6 +145,9 @@ class CompactionJob { // consecutive groups such that each group has a similar size. void GenSubcompactionBoundaries(); + void ProcessKeyValueCompactionWithCompactionService( + SubcompactionState* sub_compact); + // update the thread status for starting a compaction. void ReportStartedCompaction(Compaction* compaction); void AllocateCompactionOutputFileNumbers(); @@ -287,6 +292,7 @@ struct CompactionServiceOutputFile { // instance, with these information, the primary db instance with write // permission is able to install the result to the DB. struct CompactionServiceResult { + Status status; std::vector output_files; int output_level; @@ -317,6 +323,7 @@ class CompactionServiceCompactionJob : private CompactionJob { public: CompactionServiceCompactionJob( int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, + const MutableDBOptions& mutable_db_options, const FileOptions& file_options, VersionSet* versions, const std::atomic* shutting_down, LogBuffer* log_buffer, FSDirectory* output_directory, Statistics* stats, diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index 3b43cbbff..062aa7d15 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -344,9 +344,9 @@ class CompactionJobTestBase : public testing::Test { ASSERT_TRUE(full_history_ts_low_.empty() || ucmp_->timestamp_size() == full_history_ts_low_.size()); CompactionJob compaction_job( - 0, &compaction, db_options_, env_options_, versions_.get(), - &shutting_down_, preserve_deletes_seqnum_, &log_buffer, nullptr, - nullptr, nullptr, nullptr, &mutex_, &error_handler_, snapshots, + 0, &compaction, db_options_, mutable_db_options_, env_options_, + versions_.get(), &shutting_down_, preserve_deletes_seqnum_, &log_buffer, + nullptr, nullptr, nullptr, nullptr, &mutex_, &error_handler_, snapshots, earliest_write_conflict_snapshot, snapshot_checker, table_cache_, &event_logger, false, false, dbname_, &compaction_job_stats_, Env::Priority::USER, nullptr /* IOTracer */, @@ -1190,6 +1190,14 @@ TEST_F(CompactionJobTest, ResultSerialization) { const int kStrMaxLen = 1000; Random rnd(static_cast(time(nullptr))); Random64 rnd64(time(nullptr)); + std::vector status_list = { + Status::OK(), + Status::InvalidArgument("invalid option"), + Status::Aborted("failed to run"), + Status::NotSupported("not supported option"), + }; + result.status = + status_list.at(rnd.Uniform(static_cast(status_list.size()))); while (!rnd.OneIn(10)) { result.output_files.emplace_back( rnd.RandomString(rnd.Uniform(kStrMaxLen)), rnd64.Uniform(UINT64_MAX), @@ -1262,6 +1270,9 @@ TEST_F(CompactionJobTest, ResultSerialization) { output.replace(0, kDataVersionSize, buf, kDataVersionSize); Status s = CompactionServiceResult::Read(output, &deserialized3); ASSERT_TRUE(s.IsNotSupported()); + for (const auto& item : status_list) { + item.PermitUncheckedError(); + } } class CompactionJobTimestampTest : public CompactionJobTestBase { diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc new file mode 100644 index 000000000..f68b53683 --- /dev/null +++ b/db/compaction/compaction_service_test.cc @@ -0,0 +1,454 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "db/db_test_util.h" +#include "port/stack_trace.h" + +namespace ROCKSDB_NAMESPACE { + +class MyTestCompactionService : public CompactionService { + public: + MyTestCompactionService(const std::string& db_path, + std::shared_ptr fs, Options& options) + : db_path_(db_path), fs_(fs), options_(options) {} + + CompactionServiceJobStatus Start(const std::string& compaction_service_input, + int job_id) override { + InstrumentedMutexLock l(&mutex_); + jobs_.emplace(job_id, compaction_service_input); + CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess; + TEST_SYNC_POINT_CALLBACK("MyTestCompactionService::Start::End", &s); + return s; + } + + CompactionServiceJobStatus WaitForComplete( + int job_id, std::string* compaction_service_result) override { + std::string compaction_input; + { + InstrumentedMutexLock l(&mutex_); + auto i = jobs_.find(job_id); + if (i == jobs_.end()) { + return CompactionServiceJobStatus::kFailure; + } + compaction_input = std::move(i->second); + jobs_.erase(i); + } + + CompactionServiceOptionsOverride options_override; + options_override.env = options_.env; + options_override.file_checksum_gen_factory = + options_.file_checksum_gen_factory; + options_override.comparator = options_.comparator; + options_override.merge_operator = options_.merge_operator; + options_override.compaction_filter = options_.compaction_filter; + options_override.compaction_filter_factory = + options_.compaction_filter_factory; + options_override.prefix_extractor = options_.prefix_extractor; + options_override.table_factory = options_.table_factory; + options_override.sst_partitioner_factory = options_.sst_partitioner_factory; + + Status s = DB::OpenAndCompact(db_path_, db_path_ + "/" + ToString(job_id), + compaction_input, compaction_service_result, + options_override); + TEST_SYNC_POINT_CALLBACK("MyTestCompactionService::WaitForComplete::End", + compaction_service_result); + compaction_num_.fetch_add(1); + if (s.ok()) { + return CompactionServiceJobStatus::kSuccess; + } else { + return CompactionServiceJobStatus::kFailure; + } + } + + int GetCompactionNum() { return compaction_num_.load(); } + + private: + InstrumentedMutex mutex_; + std::atomic_int compaction_num_{0}; + std::map jobs_; + const std::string db_path_; + std::shared_ptr fs_; + Options options_; +}; + +class CompactionServiceTest : public DBTestBase { + public: + explicit CompactionServiceTest() + : DBTestBase("compaction_service_test", true) {} + + protected: + void GenerateTestData() { + // Generate 20 files @ L2 + for (int i = 0; i < 20; i++) { + for (int j = 0; j < 10; j++) { + int key_id = i * 10 + j; + ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id))); + } + ASSERT_OK(Flush()); + } + MoveFilesToLevel(2); + + // Generate 10 files @ L1 overlap with all 20 files @ L2 + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + int key_id = i * 20 + j * 2; + ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id))); + } + ASSERT_OK(Flush()); + } + MoveFilesToLevel(1); + ASSERT_EQ(FilesPerLevel(), "0,10,20"); + } + + void VerifyTestData() { + for (int i = 0; i < 200; i++) { + auto result = Get(Key(i)); + if (i % 2) { + ASSERT_EQ(result, "value" + ToString(i)); + } else { + ASSERT_EQ(result, "value_new" + ToString(i)); + } + } + } +}; + +TEST_F(CompactionServiceTest, BasicCompactions) { + Options options = CurrentOptions(); + options.env = env_; + options.compaction_service = std::make_shared( + dbname_, env_->GetFileSystem(), options); + + DestroyAndReopen(options); + + for (int i = 0; i < 20; i++) { + for (int j = 0; j < 10; j++) { + int key_id = i * 10 + j; + ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id))); + } + ASSERT_OK(Flush()); + } + + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + int key_id = i * 20 + j * 2; + ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id))); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + // verify result + for (int i = 0; i < 200; i++) { + auto result = Get(Key(i)); + if (i % 2) { + ASSERT_EQ(result, "value" + ToString(i)); + } else { + ASSERT_EQ(result, "value_new" + ToString(i)); + } + } + auto my_cs = + dynamic_cast(options.compaction_service.get()); + ASSERT_GE(my_cs->GetCompactionNum(), 1); + + // Test failed compaction + SyncPoint::GetInstance()->SetCallBack( + "DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) { + // override job status + Status* s = static_cast(status); + *s = Status::Aborted("MyTestCompactionService failed to compact!"); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + Status s; + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + int key_id = i * 20 + j * 2; + s = Put(Key(key_id), "value_new" + ToString(key_id)); + if (s.IsAborted()) { + break; + } + } + if (s.IsAborted()) { + break; + } + s = Flush(); + if (s.IsAborted()) { + break; + } + s = dbfull()->TEST_WaitForCompact(); + if (s.IsAborted()) { + break; + } + } + ASSERT_TRUE(s.IsAborted()); +} + +TEST_F(CompactionServiceTest, ManualCompaction) { + Options options = CurrentOptions(); + options.env = env_; + options.disable_auto_compactions = true; + options.compaction_service = std::make_shared( + dbname_, env_->GetFileSystem(), options); + DestroyAndReopen(options); + GenerateTestData(); + + auto my_cs = + dynamic_cast(options.compaction_service.get()); + + std::string start_str = Key(15); + std::string end_str = Key(45); + Slice start(start_str); + Slice end(end_str); + uint64_t comp_num = my_cs->GetCompactionNum(); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end)); + ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1); + VerifyTestData(); + + start_str = Key(120); + start = start_str; + comp_num = my_cs->GetCompactionNum(); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr)); + ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1); + VerifyTestData(); + + end_str = Key(92); + end = end_str; + comp_num = my_cs->GetCompactionNum(); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, &end)); + ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1); + VerifyTestData(); + + comp_num = my_cs->GetCompactionNum(); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1); + VerifyTestData(); +} + +TEST_F(CompactionServiceTest, FailedToStart) { + Options options = CurrentOptions(); + options.env = env_; + options.disable_auto_compactions = true; + options.compaction_service = std::make_shared( + dbname_, env_->GetFileSystem(), options); + DestroyAndReopen(options); + GenerateTestData(); + + SyncPoint::GetInstance()->SetCallBack( + "MyTestCompactionService::Start::End", [&](void* status) { + // override job status + auto s = static_cast(status); + *s = CompactionServiceJobStatus::kFailure; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + std::string start_str = Key(15); + std::string end_str = Key(45); + Slice start(start_str); + Slice end(end_str); + Status s = db_->CompactRange(CompactRangeOptions(), &start, &end); + ASSERT_TRUE(s.IsIncomplete()); +} + +TEST_F(CompactionServiceTest, InvalidResult) { + Options options = CurrentOptions(); + options.env = env_; + options.disable_auto_compactions = true; + options.compaction_service = std::make_shared( + dbname_, env_->GetFileSystem(), options); + DestroyAndReopen(options); + GenerateTestData(); + + SyncPoint::GetInstance()->SetCallBack( + "MyTestCompactionService::WaitForComplete::End", [&](void* result) { + // override job status + auto result_str = static_cast(result); + *result_str = "Invalid Str"; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + std::string start_str = Key(15); + std::string end_str = Key(45); + Slice start(start_str); + Slice end(end_str); + Status s = db_->CompactRange(CompactRangeOptions(), &start, &end); + ASSERT_FALSE(s.ok()); +} + +// TODO: support sub-compaction +TEST_F(CompactionServiceTest, DISABLED_SubCompaction) { + Options options = CurrentOptions(); + options.env = env_; + options.max_subcompactions = 10; + options.target_file_size_base = 1 << 10; // 1KB + options.disable_auto_compactions = true; + options.compaction_service = std::make_shared( + dbname_, env_->GetFileSystem(), options); + + DestroyAndReopen(options); + GenerateTestData(); + + auto cro = CompactRangeOptions(); + cro.max_subcompactions = 10; + db_->CompactRange(cro, nullptr, nullptr); +} + +class PartialDeleteCompactionFilter : public CompactionFilter { + public: + CompactionFilter::Decision FilterV2( + int /*level*/, const Slice& key, ValueType /*value_type*/, + const Slice& /*existing_value*/, std::string* /*new_value*/, + std::string* /*skip_until*/) const override { + int i = std::stoi(key.ToString().substr(3)); + if (i > 5 && i <= 105) { + return CompactionFilter::Decision::kRemove; + } + return CompactionFilter::Decision::kKeep; + } + + const char* Name() const override { return "PartialDeleteCompactionFilter"; } +}; + +TEST_F(CompactionServiceTest, CompactionFilter) { + Options options = CurrentOptions(); + options.env = env_; + auto delete_comp_filter = PartialDeleteCompactionFilter(); + options.compaction_filter = &delete_comp_filter; + options.compaction_service = std::make_shared( + dbname_, env_->GetFileSystem(), options); + + DestroyAndReopen(options); + + for (int i = 0; i < 20; i++) { + for (int j = 0; j < 10; j++) { + int key_id = i * 10 + j; + ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id))); + } + ASSERT_OK(Flush()); + } + + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + int key_id = i * 20 + j * 2; + ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id))); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + + // verify result + for (int i = 0; i < 200; i++) { + auto result = Get(Key(i)); + if (i > 5 && i <= 105) { + ASSERT_EQ(result, "NOT_FOUND"); + } else if (i % 2) { + ASSERT_EQ(result, "value" + ToString(i)); + } else { + ASSERT_EQ(result, "value_new" + ToString(i)); + } + } + auto my_cs = + dynamic_cast(options.compaction_service.get()); + ASSERT_GE(my_cs->GetCompactionNum(), 1); +} + +TEST_F(CompactionServiceTest, Snapshot) { + Options options = CurrentOptions(); + options.env = env_; + options.compaction_service = std::make_shared( + dbname_, env_->GetFileSystem(), options); + + DestroyAndReopen(options); + + ASSERT_OK(Put(Key(1), "value1")); + ASSERT_OK(Put(Key(2), "value1")); + const Snapshot* s1 = db_->GetSnapshot(); + ASSERT_OK(Flush()); + + ASSERT_OK(Put(Key(1), "value2")); + ASSERT_OK(Put(Key(3), "value2")); + ASSERT_OK(Flush()); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + auto my_cs = + dynamic_cast(options.compaction_service.get()); + ASSERT_GE(my_cs->GetCompactionNum(), 1); + ASSERT_EQ("value1", Get(Key(1), s1)); + ASSERT_EQ("value2", Get(Key(1))); + db_->ReleaseSnapshot(s1); +} + +TEST_F(CompactionServiceTest, ConcurrentCompaction) { + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = 100; + options.env = env_; + options.compaction_service = std::make_shared( + dbname_, env_->GetFileSystem(), options); + options.max_background_jobs = 20; + + DestroyAndReopen(options); + GenerateTestData(); + + ColumnFamilyMetaData meta; + db_->GetColumnFamilyMetaData(&meta); + + std::vector threads; + for (const auto& file : meta.levels[1].files) { + threads.push_back(std::thread([&]() { + std::string fname = file.db_path + "/" + file.name; + ASSERT_OK(db_->CompactFiles(CompactionOptions(), {fname}, 2)); + })); + } + + for (auto& thread : threads) { + thread.join(); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + // verify result + for (int i = 0; i < 200; i++) { + auto result = Get(Key(i)); + if (i % 2) { + ASSERT_EQ(result, "value" + ToString(i)); + } else { + ASSERT_EQ(result, "value_new" + ToString(i)); + } + } + auto my_cs = + dynamic_cast(options.compaction_service.get()); + ASSERT_EQ(my_cs->GetCompactionNum(), 10); + ASSERT_EQ(FilesPerLevel(), "0,0,10"); +} + +} // namespace ROCKSDB_NAMESPACE + +#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS +extern "C" { +void RegisterCustomObjects(int argc, char** argv); +} +#else +void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {} +#endif // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + RegisterCustomObjects(argc, argv); + return RUN_ALL_TESTS(); +} + +#else +#include + +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, + "SKIPPED as CompactionService is not supported in ROCKSDB_LITE\n"); + return 0; +} + +#endif // ROCKSDB_LITE diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 290e323e1..6ec2f1c63 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1092,7 +1092,7 @@ Status DBImpl::CompactFiles(const CompactionOptions& compact_options, assert(cfd); Status s; - JobContext job_context(0, true); + JobContext job_context(next_job_id_.fetch_add(1), true); LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log.get()); @@ -1243,7 +1243,7 @@ Status DBImpl::CompactFilesImpl( assert(is_snapshot_supported_ || snapshots_.empty()); CompactionJobStats compaction_job_stats; CompactionJob compaction_job( - job_context->job_id, c.get(), immutable_db_options_, + job_context->job_id, c.get(), immutable_db_options_, mutable_db_options_, file_options_for_compaction_, versions_.get(), &shutting_down_, preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(), GetDataDir(c->column_family_data(), c->output_path_id()), @@ -3130,8 +3130,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, assert(is_snapshot_supported_ || snapshots_.empty()); CompactionJob compaction_job( job_context->job_id, c.get(), immutable_db_options_, - file_options_for_compaction_, versions_.get(), &shutting_down_, - preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(), + mutable_db_options_, file_options_for_compaction_, versions_.get(), + &shutting_down_, preserve_deletes_seqnum_.load(), log_buffer, + directories_.GetDbDir(), GetDataDir(c->column_family_data(), c->output_path_id()), GetDataDir(c->column_family_data(), 0), stats_, &mutex_, &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot, @@ -3360,7 +3361,7 @@ bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) { if (m->cfd != m1->cfd) { return false; } - return true; + return false; } #ifndef ROCKSDB_LITE diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index 8a845fca9..10c04b3e1 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -11,6 +11,7 @@ #include "db/merge_context.h" #include "logging/auto_roll_logger.h" #include "monitoring/perf_context_imp.h" +#include "rocksdb/configurable.h" #include "util/cast_util.h" namespace ROCKSDB_NAMESPACE { @@ -726,11 +727,11 @@ Status DBImplSecondary::CompactWithoutInstallation( const int job_id = next_job_id_.fetch_add(1); CompactionServiceCompactionJob compaction_job( - job_id, c.get(), immutable_db_options_, file_options_for_compaction_, - versions_.get(), &shutting_down_, &log_buffer, output_dir.get(), stats_, - &mutex_, &error_handler_, input.snapshots, table_cache_, &event_logger_, - dbname_, io_tracer_, db_id_, db_session_id_, secondary_path_, input, - result); + job_id, c.get(), immutable_db_options_, mutable_db_options_, + file_options_for_compaction_, versions_.get(), &shutting_down_, + &log_buffer, output_dir.get(), stats_, &mutex_, &error_handler_, + input.snapshots, table_cache_, &event_logger_, dbname_, io_tracer_, + db_id_, db_session_id_, secondary_path_, input, result); mutex_.Unlock(); s = compaction_job.Run(); @@ -742,6 +743,79 @@ Status DBImplSecondary::CompactWithoutInstallation( c->ReleaseCompactionFiles(s); c.reset(); + TEST_SYNC_POINT_CALLBACK("DBImplSecondary::CompactWithoutInstallation::End", + &s); + result->status = s; + return s; +} + +Status DB::OpenAndCompact( + const std::string& name, const std::string& output_directory, + const std::string& input, std::string* result, + const CompactionServiceOptionsOverride& override_options) { + CompactionServiceInput compaction_input; + Status s = CompactionServiceInput::Read(input, &compaction_input); + if (!s.ok()) { + return s; + } + + compaction_input.db_options.max_open_files = -1; + compaction_input.db_options.compaction_service = nullptr; + if (compaction_input.db_options.statistics) { + compaction_input.db_options.statistics.reset(); + } + compaction_input.db_options.env = override_options.env; + compaction_input.db_options.file_checksum_gen_factory = + override_options.file_checksum_gen_factory; + compaction_input.column_family.options.comparator = + override_options.comparator; + compaction_input.column_family.options.merge_operator = + override_options.merge_operator; + compaction_input.column_family.options.compaction_filter = + override_options.compaction_filter; + compaction_input.column_family.options.compaction_filter_factory = + override_options.compaction_filter_factory; + compaction_input.column_family.options.prefix_extractor = + override_options.prefix_extractor; + compaction_input.column_family.options.table_factory = + override_options.table_factory; + compaction_input.column_family.options.sst_partitioner_factory = + override_options.sst_partitioner_factory; + + std::vector column_families; + column_families.push_back(compaction_input.column_family); + // TODO: we have to open default CF, because of an implementation limitation, + // currently we just use the same CF option from input, which is not collect + // and open may fail. + if (compaction_input.column_family.name != kDefaultColumnFamilyName) { + column_families.emplace_back(kDefaultColumnFamilyName, + compaction_input.column_family.options); + } + + DB* db; + std::vector handles; + + s = DB::OpenAsSecondary(compaction_input.db_options, name, output_directory, + column_families, &handles, &db); + if (!s.ok()) { + return s; + } + + CompactionServiceResult compaction_result; + DBImplSecondary* db_secondary = static_cast_with_check(db); + assert(handles.size() > 0); + s = db_secondary->CompactWithoutInstallation(handles[0], compaction_input, + &compaction_result); + + Status serialization_status = compaction_result.Write(result); + + for (auto& handle : handles) { + delete handle; + } + delete db; + if (s.ok()) { + return serialization_status; + } return s; } diff --git a/db/db_secondary_test.cc b/db/db_secondary_test.cc index 9de935204..13ec1eec0 100644 --- a/db/db_secondary_test.cc +++ b/db/db_secondary_test.cc @@ -187,6 +187,7 @@ TEST_F(DBSecondaryTest, SimpleInternalCompaction) { ASSERT_EQ(result.output_path, this->secondary_path_); ASSERT_EQ(result.num_output_records, 2); ASSERT_GT(result.bytes_written, 0); + ASSERT_OK(result.status); } TEST_F(DBSecondaryTest, InternalCompactionMultiLevels) { @@ -235,6 +236,7 @@ TEST_F(DBSecondaryTest, InternalCompactionMultiLevels) { CompactionServiceResult result; ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input1, &result)); + ASSERT_OK(result.status); // pick 2 files on level 1 for compaction, which has 6 overlap files on L2 CompactionServiceInput input2; @@ -247,6 +249,7 @@ TEST_F(DBSecondaryTest, InternalCompactionMultiLevels) { input2.output_level = 2; ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input2, &result)); + ASSERT_OK(result.status); CloseSecondary(); @@ -259,6 +262,7 @@ TEST_F(DBSecondaryTest, InternalCompactionMultiLevels) { Status s = db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input2, &result); ASSERT_TRUE(s.IsInvalidArgument()); + ASSERT_OK(result.status); // TODO: L0 -> L1 compaction should success, currently version is not built // if files is missing. @@ -304,6 +308,7 @@ TEST_F(DBSecondaryTest, InternalCompactionCompactedFiles) { Status s = db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input, &result); ASSERT_TRUE(s.IsInvalidArgument()); + ASSERT_OK(result.status); } TEST_F(DBSecondaryTest, InternalCompactionMissingFiles) { @@ -340,11 +345,13 @@ TEST_F(DBSecondaryTest, InternalCompactionMissingFiles) { Status s = db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input, &result); ASSERT_TRUE(s.IsInvalidArgument()); + ASSERT_OK(result.status); input.input_files.erase(input.input_files.begin()); ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input, &result)); + ASSERT_OK(result.status); } TEST_F(DBSecondaryTest, OpenAsSecondary) { diff --git a/db/output_validator.h b/db/output_validator.h index 644c3f8c0..ad9000d5e 100644 --- a/db/output_validator.h +++ b/db/output_validator.h @@ -17,8 +17,10 @@ namespace ROCKSDB_NAMESPACE { class OutputValidator { public: explicit OutputValidator(const InternalKeyComparator& icmp, - bool enable_order_check, bool enable_hash) + bool enable_order_check, bool enable_hash, + uint64_t precalculated_hash = 0) : icmp_(icmp), + paranoid_hash_(precalculated_hash), enable_order_check_(enable_order_check), enable_hash_(enable_hash) {} @@ -37,8 +39,6 @@ class OutputValidator { // without notice between releases. uint64_t GetHash() const { return paranoid_hash_; } - void SetHash(uint64_t hash) { paranoid_hash_ = hash; } - private: const InternalKeyComparator& icmp_; std::string prev_key_; diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 330313e35..4815c75e0 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -239,6 +239,16 @@ class DB { const std::vector& column_families, std::vector* handles, DB** dbptr); + // Open DB and run the compaction. + // It's a read-only operation, the result won't be installed to the DB, it + // will be output to the `output_directory`. The API should only be used with + // `options.CompactionService` to run compaction triggered by + // `CompactionService`. + static Status OpenAndCompact( + const std::string& name, const std::string& output_directory, + const std::string& input, std::string* output, + const CompactionServiceOptionsOverride& override_options); + virtual Status Resume() { return Status::NotSupported(); } // Close the DB by releasing resources, closing files etc. This should be diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 1d5bcc0db..e6e212098 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -364,6 +364,30 @@ struct DbPath { extern const char* kHostnameForDbHostId; +enum class CompactionServiceJobStatus : char { + kSuccess, + kFailure, + kUseLocal, // TODO: Add support for use local compaction +}; + +class CompactionService { + public: + // Start the compaction with input information, which can be passed to + // `DB::OpenAndCompact()`. + // job_id is pre-assigned, it will be reset after DB re-open. + // TODO: sub-compaction is not supported, as they will have the same job_id, a + // sub-compaction id might be added + virtual CompactionServiceJobStatus Start( + const std::string& compaction_service_input, int job_id) = 0; + + // Wait compaction to be finish. + // TODO: Add output path override + virtual CompactionServiceJobStatus WaitForComplete( + int job_id, std::string* compaction_service_result) = 0; + + virtual ~CompactionService() {} +}; + struct DBOptions { // The function recovers options to the option as in version 4.6. DBOptions* OldDefaults(int rocksdb_major_version = 4, @@ -1215,6 +1239,15 @@ struct DBOptions { // should enble this set as empty. Otherwise,it may cause unexpected // write failures. FileTypeSet checksum_handoff_file_types; + + // EXPERIMENTAL + // CompactionService is a feature allows the user to run compactions on a + // different host or process, which offloads the background load from the + // primary host. + // It's an experimental feature, the interface will be changed without + // backward/forward compatibility support for now. Some known issues are still + // under development. + std::shared_ptr compaction_service = nullptr; }; // Options to control the behavior of a database (passed to DB::Open) @@ -1725,4 +1758,20 @@ struct SizeApproximationOptions { double files_size_error_margin = -1.0; }; +struct CompactionServiceOptionsOverride { + // Currently pointer configurations are not passed to compaction service + // compaction so the user needs to set it. It will be removed once pointer + // configuration passing is supported. + Env* env = Env::Default(); + std::shared_ptr file_checksum_gen_factory = nullptr; + + const Comparator* comparator = BytewiseComparator(); + std::shared_ptr merge_operator = nullptr; + const CompactionFilter* compaction_filter = nullptr; + std::shared_ptr compaction_filter_factory = nullptr; + std::shared_ptr prefix_extractor = nullptr; + std::shared_ptr table_factory; + std::shared_ptr sst_partitioner_factory = nullptr; +}; + } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index 235cea15d..1de2ebcb0 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -132,6 +132,10 @@ class Status { }; Status(const Status& s, Severity sev); + + Status(Code _code, SubCode _subcode, Severity _sev, const Slice& msg) + : Status(_code, _subcode, msg, "", _sev) {} + Severity severity() const { MarkChecked(); return sev_; @@ -463,7 +467,8 @@ class Status { explicit Status(Code _code, SubCode _subcode = kNone) : code_(_code), subcode_(_subcode), sev_(kNoError), state_(nullptr) {} - Status(Code _code, SubCode _subcode, const Slice& msg, const Slice& msg2); + Status(Code _code, SubCode _subcode, const Slice& msg, const Slice& msg2, + Severity sev = kNoError); Status(Code _code, const Slice& msg, const Slice& msg2) : Status(_code, kNone, msg, msg2) {} diff --git a/include/rocksdb/utilities/options_type.h b/include/rocksdb/utilities/options_type.h index 96e0ef8cf..7057c78ac 100644 --- a/include/rocksdb/utilities/options_type.h +++ b/include/rocksdb/utilities/options_type.h @@ -25,6 +25,7 @@ enum class OptionType { kInt32T, kInt64T, kUInt, + kUInt8T, kUInt32T, kUInt64T, kSizeT, diff --git a/options/db_options.cc b/options/db_options.cc index ede1d80e6..b08402599 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -578,7 +578,8 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) bgerror_resume_retry_interval(options.bgerror_resume_retry_interval), allow_data_in_errors(options.allow_data_in_errors), db_host_id(options.db_host_id), - checksum_handoff_file_types(options.checksum_handoff_file_types) { + checksum_handoff_file_types(options.checksum_handoff_file_types), + compaction_service(options.compaction_service) { stats = statistics.get(); fs = env->GetFileSystem(); if (env != nullptr) { diff --git a/options/db_options.h b/options/db_options.h index af1707a5a..85968ffcf 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -99,6 +99,7 @@ struct ImmutableDBOptions { SystemClock* clock; Statistics* stats; Logger* logger; + std::shared_ptr compaction_service; }; struct MutableDBOptions { diff --git a/options/options_helper.cc b/options/options_helper.cc index 0b5b07b7a..68bd6a4c9 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -435,6 +435,9 @@ static bool ParseOptionHelper(void* opt_address, const OptionType& opt_type, case OptionType::kUInt: *static_cast(opt_address) = ParseUint32(value); break; + case OptionType::kUInt8T: + *static_cast(opt_address) = ParseUint8(value); + break; case OptionType::kUInt32T: *static_cast(opt_address) = ParseUint32(value); break; @@ -510,6 +513,9 @@ bool SerializeSingleOptionHelper(const void* opt_address, case OptionType::kUInt: *value = ToString(*(static_cast(opt_address))); break; + case OptionType::kUInt8T: + *value = ToString(*(static_cast(opt_address))); + break; case OptionType::kUInt32T: *value = ToString(*(static_cast(opt_address))); break; @@ -1224,6 +1230,8 @@ static bool AreOptionsEqual(OptionType type, const void* this_offset, GetUnaligned(static_cast(that_offset), &v2); return (v1 == v2); } + case OptionType::kUInt8T: + return IsOptionEqual(this_offset, that_offset); case OptionType::kUInt32T: return IsOptionEqual(this_offset, that_offset); case OptionType::kUInt64T: { diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 3ecdf952e..a9f5d7bcd 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -230,6 +230,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { {offsetof(struct DBOptions, db_host_id), sizeof(std::string)}, {offsetof(struct DBOptions, checksum_handoff_file_types), sizeof(FileTypeSet)}, + {offsetof(struct DBOptions, compaction_service), + sizeof(std::shared_ptr)}, }; char* options_ptr = new char[sizeof(DBOptions)]; diff --git a/src.mk b/src.mk index 09ba4f04a..1a99b55a8 100644 --- a/src.mk +++ b/src.mk @@ -392,6 +392,7 @@ TEST_MAIN_SOURCES = \ db/compaction/compaction_job_test.cc \ db/compaction/compaction_job_stats_test.cc \ db/compaction/compaction_picker_test.cc \ + db/compaction/compaction_service_test.cc \ db/comparator_db_test.cc \ db/corruption_test.cc \ db/cuckoo_table_db_test.cc \ diff --git a/util/status.cc b/util/status.cc index 857f16dcd..efd83b0ae 100644 --- a/util/status.cc +++ b/util/status.cc @@ -58,9 +58,8 @@ static const char* msgs[static_cast(Status::kMaxSubCode)] = { }; Status::Status(Code _code, SubCode _subcode, const Slice& msg, - const Slice& msg2) - : code_(_code), subcode_(_subcode), sev_(kNoError) { - assert(code_ != kOk); + const Slice& msg2, Severity sev) + : code_(_code), subcode_(_subcode), sev_(sev) { assert(subcode_ != kMaxSubCode); const size_t len1 = msg.size(); const size_t len2 = msg2.size(); diff --git a/util/string_util.cc b/util/string_util.cc index c5a7a7cae..5e1149119 100644 --- a/util/string_util.cc +++ b/util/string_util.cc @@ -301,6 +301,15 @@ bool ParseBoolean(const std::string& type, const std::string& value) { throw std::invalid_argument(type); } +uint8_t ParseUint8(const std::string& value) { + uint64_t num = ParseUint64(value); + if ((num >> 8LL) == 0) { + return static_cast(num); + } else { + throw std::out_of_range(value); + } +} + uint32_t ParseUint32(const std::string& value) { uint64_t num = ParseUint64(value); if ((num >> 32LL) == 0) { diff --git a/util/string_util.h b/util/string_util.h index 83fa5781d..195ae8b0b 100644 --- a/util/string_util.h +++ b/util/string_util.h @@ -120,6 +120,8 @@ bool StartsWith(const std::string& string, const std::string& pattern); #ifndef ROCKSDB_LITE bool ParseBoolean(const std::string& type, const std::string& value); +uint8_t ParseUint8(const std::string& value); + uint32_t ParseUint32(const std::string& value); int32_t ParseInt32(const std::string& value);