From a79b46c50316b580dfd628828d246a79304af254 Mon Sep 17 00:00:00 2001 From: Jay Zhuang Date: Wed, 12 May 2021 12:34:22 -0700 Subject: [PATCH] Add De/Serialization for CompactionInput/Result (#8247) Summary: The functions will be used for remote compaction parameter input and result. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8247 Test Plan: `make check` Reviewed By: ajkr Differential Revision: D28104680 Pulled By: jay-zhuang fbshipit-source-id: c0a5178e6277125118384278efea2acbf90aa6cb --- db/compaction/compaction_job.cc | 420 ++++++++++++++++++++++- db/compaction/compaction_job.h | 27 +- db/compaction/compaction_job_test.cc | 168 +++++++++ include/rocksdb/utilities/options_type.h | 1 + options/options_helper.cc | 12 + util/random.cc | 9 + util/random.h | 3 + 7 files changed, 635 insertions(+), 5 deletions(-) diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 5aefac430..97b47a6f5 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -46,6 +46,7 @@ #include "monitoring/iostats_context_imp.h" #include "monitoring/perf_context_imp.h" #include "monitoring/thread_status_util.h" +#include "options/configurable_helper.h" #include "options/options_helper.h" #include "port/port.h" #include "rocksdb/db.h" @@ -54,6 +55,7 @@ #include "rocksdb/statistics.h" #include "rocksdb/status.h" #include "rocksdb/table.h" +#include "rocksdb/utilities/options_type.h" #include "table/block_based/block.h" #include "table/block_based/block_based_table_factory.h" #include "table/merging_iterator.h" @@ -1939,6 +1941,7 @@ std::string CompactionJob::GetTableFileName(uint64_t file_number) { file_number, compact_->compaction->output_path_id()); } +#ifndef ROCKSDB_LITE std::string CompactionServiceCompactionJob::GetTableFileName( uint64_t file_number) { return MakeTableFileName(output_path_, file_number); @@ -1984,9 +1987,12 @@ Status CompactionServiceCompactionJob::Run() { c->column_family_data()->CalculateSSTWriteHint(c->output_level()); bottommost_level_ = c->bottommost_level(); - compact_->sub_compact_states.emplace_back(c, compaction_input_.begin, - compaction_input_.end, - compaction_input_.approx_size); + Slice begin = compaction_input_.begin; + Slice end = compaction_input_.end; + compact_->sub_compact_states.emplace_back( + c, compaction_input_.has_begin ? &begin : nullptr, + compaction_input_.has_end ? &end : nullptr, + compaction_input_.approx_size); log_buffer_->FlushBufferToLog(); LogCompaction(); @@ -2062,4 +2068,412 @@ void CompactionServiceCompactionJob::CleanupCompaction() { CompactionJob::CleanupCompaction(); } +// Internal binary format for the input and result data +enum BinaryFormatVersion : uint32_t { + kOptionsString = 1, // Use string format similar to Option string format +}; + +// offset_of is used to get the offset of a class data member +// ex: offset_of(&ColumnFamilyDescriptor::options) +// This call will return the offset of options in ColumnFamilyDescriptor class +// +// This is the same as offsetof() but allow us to work with non standard-layout +// classes and structures +// refs: +// http://en.cppreference.com/w/cpp/concept/StandardLayoutType +// https://gist.github.com/graphitemaster/494f21190bb2c63c5516 +static ColumnFamilyDescriptor dummy_cfd("", ColumnFamilyOptions()); +template +int offset_of(T1 ColumnFamilyDescriptor::*member) { + return int(size_t(&(dummy_cfd.*member)) - size_t(&dummy_cfd)); +} + +static CompactionServiceInput dummy_cs_input; +template +int offset_of(T1 CompactionServiceInput::*member) { + return int(size_t(&(dummy_cs_input.*member)) - size_t(&dummy_cs_input)); +} + +static std::unordered_map cfd_type_info = { + {"name", + {offset_of(&ColumnFamilyDescriptor::name), OptionType::kEncodedString, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"options", + {offset_of(&ColumnFamilyDescriptor::options), OptionType::kConfigurable, + OptionVerificationType::kNormal, OptionTypeFlags::kNone, + [](const ConfigOptions& opts, const std::string& /*name*/, + const std::string& value, char* addr) { + auto cf_options = reinterpret_cast(addr); + return GetColumnFamilyOptionsFromString(opts, ColumnFamilyOptions(), + value, cf_options); + }, + [](const ConfigOptions& opts, const std::string& /*name*/, + const char* addr, std::string* value) { + const auto cf_options = + reinterpret_cast(addr); + std::string result; + auto status = + GetStringFromColumnFamilyOptions(opts, *cf_options, &result); + *value = "{" + result + "}"; + return status; + }, + [](const ConfigOptions& opts, const std::string& name, const char* addr1, + const char* addr2, std::string* mismatch) { + const auto this_one = + reinterpret_cast(addr1); + const auto that_one = + reinterpret_cast(addr2); + auto this_conf = CFOptionsAsConfigurable(*this_one); + auto that_conf = CFOptionsAsConfigurable(*that_one); + std::string mismatch_opt; + bool result = + this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt); + if (!result) { + *mismatch = name + "." + mismatch_opt; + } + return result; + }}}, +}; + +static std::unordered_map cs_input_type_info = { + {"column_family", + OptionTypeInfo::Struct("column_family", &cfd_type_info, + offset_of(&CompactionServiceInput::column_family), + OptionVerificationType::kNormal, + OptionTypeFlags::kNone)}, + {"db_options", + {offset_of(&CompactionServiceInput::db_options), OptionType::kConfigurable, + OptionVerificationType::kNormal, OptionTypeFlags::kNone, + [](const ConfigOptions& opts, const std::string& /*name*/, + const std::string& value, char* addr) { + auto options = reinterpret_cast(addr); + return GetDBOptionsFromString(opts, DBOptions(), value, options); + }, + [](const ConfigOptions& opts, const std::string& /*name*/, + const char* addr, std::string* value) { + const auto options = reinterpret_cast(addr); + std::string result; + auto status = GetStringFromDBOptions(opts, *options, &result); + *value = "{" + result + "}"; + return status; + }, + [](const ConfigOptions& opts, const std::string& name, const char* addr1, + const char* addr2, std::string* mismatch) { + const auto this_one = reinterpret_cast(addr1); + const auto that_one = reinterpret_cast(addr2); + auto this_conf = DBOptionsAsConfigurable(*this_one); + auto that_conf = DBOptionsAsConfigurable(*that_one); + std::string mismatch_opt; + bool result = + this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt); + if (!result) { + *mismatch = name + "." + mismatch_opt; + } + return result; + }}}, + {"snapshots", OptionTypeInfo::Vector( + offset_of(&CompactionServiceInput::snapshots), + OptionVerificationType::kNormal, OptionTypeFlags::kNone, + {0, OptionType::kUInt64T})}, + {"input_files", OptionTypeInfo::Vector( + offset_of(&CompactionServiceInput::input_files), + OptionVerificationType::kNormal, OptionTypeFlags::kNone, + {0, OptionType::kEncodedString})}, + {"output_level", + {offset_of(&CompactionServiceInput::output_level), OptionType::kInt, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"has_begin", + {offset_of(&CompactionServiceInput::has_begin), OptionType::kBoolean, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"begin", + {offset_of(&CompactionServiceInput::begin), OptionType::kEncodedString, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"has_end", + {offset_of(&CompactionServiceInput::has_end), OptionType::kBoolean, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"end", + {offset_of(&CompactionServiceInput::end), OptionType::kEncodedString, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"approx_size", + {offset_of(&CompactionServiceInput::approx_size), OptionType::kUInt64T, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, +}; + +static std::unordered_map + cs_output_file_type_info = { + {"file_name", + {offsetof(struct CompactionServiceOutputFile, file_name), + OptionType::kEncodedString, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"smallest_seqno", + {offsetof(struct CompactionServiceOutputFile, smallest_seqno), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"largest_seqno", + {offsetof(struct CompactionServiceOutputFile, largest_seqno), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"smallest_internal_key", + {offsetof(struct CompactionServiceOutputFile, smallest_internal_key), + OptionType::kEncodedString, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"largest_internal_key", + {offsetof(struct CompactionServiceOutputFile, largest_internal_key), + OptionType::kEncodedString, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"oldest_ancester_time", + {offsetof(struct CompactionServiceOutputFile, oldest_ancester_time), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"file_creation_time", + {offsetof(struct CompactionServiceOutputFile, file_creation_time), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"paranoid_hash", + {offsetof(struct CompactionServiceOutputFile, paranoid_hash), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"marked_for_compaction", + {offsetof(struct CompactionServiceOutputFile, marked_for_compaction), + OptionType::kBoolean, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, +}; + +static std::unordered_map + compaction_job_stats_type_info = { + {"elapsed_micros", + {offsetof(struct CompactionJobStats, elapsed_micros), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"cpu_micros", + {offsetof(struct CompactionJobStats, cpu_micros), OptionType::kUInt64T, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"num_input_records", + {offsetof(struct CompactionJobStats, num_input_records), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"num_blobs_read", + {offsetof(struct CompactionJobStats, num_blobs_read), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"num_input_files", + {offsetof(struct CompactionJobStats, num_input_files), + OptionType::kSizeT, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"num_input_files_at_output_level", + {offsetof(struct CompactionJobStats, num_input_files_at_output_level), + OptionType::kSizeT, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"num_output_records", + {offsetof(struct CompactionJobStats, num_output_records), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"num_output_files", + {offsetof(struct CompactionJobStats, num_output_files), + OptionType::kSizeT, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"num_output_files_blob", + {offsetof(struct CompactionJobStats, num_output_files_blob), + OptionType::kSizeT, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"is_full_compaction", + {offsetof(struct CompactionJobStats, is_full_compaction), + OptionType::kBoolean, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"is_manual_compaction", + {offsetof(struct CompactionJobStats, is_manual_compaction), + OptionType::kBoolean, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"total_input_bytes", + {offsetof(struct CompactionJobStats, total_input_bytes), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"total_blob_bytes_read", + {offsetof(struct CompactionJobStats, total_blob_bytes_read), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"total_output_bytes", + {offsetof(struct CompactionJobStats, total_output_bytes), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"total_output_bytes_blob", + {offsetof(struct CompactionJobStats, total_output_bytes_blob), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"num_records_replaced", + {offsetof(struct CompactionJobStats, num_records_replaced), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"total_input_raw_key_bytes", + {offsetof(struct CompactionJobStats, total_input_raw_key_bytes), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"total_input_raw_value_bytes", + {offsetof(struct CompactionJobStats, total_input_raw_value_bytes), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"num_input_deletion_records", + {offsetof(struct CompactionJobStats, num_input_deletion_records), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"num_expired_deletion_records", + {offsetof(struct CompactionJobStats, num_expired_deletion_records), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"num_corrupt_keys", + {offsetof(struct CompactionJobStats, num_corrupt_keys), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"file_write_nanos", + {offsetof(struct CompactionJobStats, file_write_nanos), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"file_range_sync_nanos", + {offsetof(struct CompactionJobStats, file_range_sync_nanos), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"file_fsync_nanos", + {offsetof(struct CompactionJobStats, file_fsync_nanos), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"file_prepare_write_nanos", + {offsetof(struct CompactionJobStats, file_prepare_write_nanos), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"smallest_output_key_prefix", + {offsetof(struct CompactionJobStats, smallest_output_key_prefix), + OptionType::kEncodedString, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"largest_output_key_prefix", + {offsetof(struct CompactionJobStats, largest_output_key_prefix), + OptionType::kEncodedString, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"num_single_del_fallthru", + {offsetof(struct CompactionJobStats, num_single_del_fallthru), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"num_single_del_mismatch", + {offsetof(struct CompactionJobStats, num_single_del_mismatch), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, +}; + +static std::unordered_map cs_result_type_info = { + {"output_files", + OptionTypeInfo::Vector( + offsetof(struct CompactionServiceResult, output_files), + OptionVerificationType::kNormal, OptionTypeFlags::kNone, + OptionTypeInfo::Struct("output_files", &cs_output_file_type_info, 0, + OptionVerificationType::kNormal, + OptionTypeFlags::kNone))}, + {"output_level", + {offsetof(struct CompactionServiceResult, output_level), OptionType::kInt, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"output_path", + {offsetof(struct CompactionServiceResult, output_path), + OptionType::kEncodedString, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"num_output_records", + {offsetof(struct CompactionServiceResult, num_output_records), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"total_bytes", + {offsetof(struct CompactionServiceResult, total_bytes), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"bytes_read", + {offsetof(struct CompactionServiceResult, bytes_read), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"bytes_written", + {offsetof(struct CompactionServiceResult, bytes_written), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"stats", OptionTypeInfo::Struct( + "stats", &compaction_job_stats_type_info, + offsetof(struct CompactionServiceResult, stats), + OptionVerificationType::kNormal, OptionTypeFlags::kNone)}, +}; + +Status CompactionServiceInput::Read(const std::string& data_str, + CompactionServiceInput* obj) { + auto format_version = DecodeFixed32(data_str.data()); + if (format_version == kOptionsString) { + ConfigOptions cf; + cf.invoke_prepare_options = false; + cf.ignore_unknown_options = true; + return OptionTypeInfo::ParseType( + cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_input_type_info, + obj); + } else { + return Status::NotSupported( + "Compaction Service Input data version not supported: " + + ToString(format_version)); + } +} + +Status CompactionServiceInput::Write(std::string* output) { + char buf[sizeof(BinaryFormatVersion)]; + EncodeFixed32(buf, kOptionsString); + output->append(buf, sizeof(BinaryFormatVersion)); + ConfigOptions cf; + cf.invoke_prepare_options = false; + return OptionTypeInfo::SerializeType(cf, cs_input_type_info, this, output); +} + +Status CompactionServiceResult::Read(const std::string& data_str, + CompactionServiceResult* obj) { + auto format_version = DecodeFixed32(data_str.data()); + if (format_version == kOptionsString) { + ConfigOptions cf; + cf.invoke_prepare_options = false; + cf.ignore_unknown_options = true; + return OptionTypeInfo::ParseType( + cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_result_type_info, + obj); + } else { + return Status::NotSupported( + "Compaction Service Result data version not supported: " + + ToString(format_version)); + } +} + +Status CompactionServiceResult::Write(std::string* output) { + char buf[sizeof(BinaryFormatVersion)]; + EncodeFixed32(buf, kOptionsString); + output->append(buf, sizeof(BinaryFormatVersion)); + ConfigOptions cf; + cf.invoke_prepare_options = false; + return OptionTypeInfo::SerializeType(cf, cs_result_type_info, this, output); +} + +#ifndef NDEBUG +bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other) { + std::string mismatch; + return TEST_Equals(other, &mismatch); +} + +bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other, + std::string* mismatch) { + ConfigOptions cf; + cf.invoke_prepare_options = false; + return OptionTypeInfo::TypesAreEqual(cf, cs_result_type_info, this, other, + mismatch); +} + +bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other) { + std::string mismatch; + return TEST_Equals(other, &mismatch); +} + +bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other, + std::string* mismatch) { + ConfigOptions cf; + cf.invoke_prepare_options = false; + return OptionTypeInfo::TypesAreEqual(cf, cs_input_type_info, this, other, + mismatch); +} +#endif // NDEBUG +#endif // !ROCKSDB_LITE + } // namespace ROCKSDB_NAMESPACE diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 2a0e15e17..1b11292ef 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -235,9 +235,23 @@ struct CompactionServiceInput { int output_level; // information for subcompaction - Slice* begin = nullptr; - Slice* end = nullptr; + bool has_begin = false; + std::string begin; + bool has_end = false; + std::string end; uint64_t approx_size = 0; + + // serialization interface to read and write the object + static Status Read(const std::string& data_str, CompactionServiceInput* obj); + Status Write(std::string* output); + + // Initialize a dummy ColumnFamilyDescriptor + CompactionServiceInput() : column_family("", ColumnFamilyOptions()) {} + +#ifndef NDEBUG + bool TEST_Equals(CompactionServiceInput* other); + bool TEST_Equals(CompactionServiceInput* other, std::string* mismatch); +#endif // NDEBUG }; // CompactionServiceOutputFile is the metadata for the output SST file @@ -285,6 +299,15 @@ struct CompactionServiceResult { uint64_t bytes_read; uint64_t bytes_written; CompactionJobStats stats; + + // serialization interface to read and write the object + static Status Read(const std::string& data_str, CompactionServiceResult* obj); + Status Write(std::string* output); + +#ifndef NDEBUG + bool TEST_Equals(CompactionServiceResult* other); + bool TEST_Equals(CompactionServiceResult* other, std::string* mismatch); +#endif // NDEBUG }; // CompactionServiceCompactionJob is an read-only compaction job, it takes diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index 839ba35c8..3b43cbbff 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -1096,6 +1096,174 @@ TEST_F(CompactionJobTest, OldestBlobFileNumber) { /* expected_oldest_blob_file_number */ 19); } +TEST_F(CompactionJobTest, InputSerialization) { + // Setup a random CompactionServiceInput + CompactionServiceInput input; + const int kStrMaxLen = 1000; + Random rnd(static_cast(time(nullptr))); + Random64 rnd64(time(nullptr)); + input.column_family.name = rnd.RandomString(rnd.Uniform(kStrMaxLen)); + input.column_family.options.comparator = ReverseBytewiseComparator(); + input.column_family.options.max_bytes_for_level_base = + rnd64.Uniform(UINT64_MAX); + input.column_family.options.disable_auto_compactions = rnd.OneIn(2); + input.column_family.options.compression = kZSTD; + input.column_family.options.compression_opts.level = 4; + input.db_options.max_background_flushes = 10; + input.db_options.paranoid_checks = rnd.OneIn(2); + input.db_options.statistics = CreateDBStatistics(); + input.db_options.env = env_; + while (!rnd.OneIn(10)) { + input.snapshots.emplace_back(rnd64.Uniform(UINT64_MAX)); + } + while (!rnd.OneIn(10)) { + input.input_files.emplace_back(rnd.RandomString(rnd.Uniform(kStrMaxLen))); + } + input.output_level = 4; + input.has_begin = rnd.OneIn(2); + if (input.has_begin) { + input.begin = rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen)); + } + input.has_end = rnd.OneIn(2); + if (input.has_end) { + input.end = rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen)); + } + input.approx_size = rnd64.Uniform(UINT64_MAX); + + std::string output; + ASSERT_OK(input.Write(&output)); + + // Test deserialization + CompactionServiceInput deserialized1; + ASSERT_OK(CompactionServiceInput::Read(output, &deserialized1)); + ASSERT_TRUE(deserialized1.TEST_Equals(&input)); + + // Test mismatch + deserialized1.db_options.max_background_flushes += 10; + std::string mismatch; + ASSERT_FALSE(deserialized1.TEST_Equals(&input, &mismatch)); + ASSERT_EQ(mismatch, "db_options.max_background_flushes"); + + // Test unknown field + CompactionServiceInput deserialized2; + output.clear(); + ASSERT_OK(input.Write(&output)); + output.append("new_field=123;"); + + ASSERT_OK(CompactionServiceInput::Read(output, &deserialized2)); + ASSERT_TRUE(deserialized2.TEST_Equals(&input)); + + // Test missing field + CompactionServiceInput deserialized3; + deserialized3.output_level = 0; + std::string to_remove = "output_level=4;"; + size_t pos = output.find(to_remove); + ASSERT_TRUE(pos != std::string::npos); + output.erase(pos, to_remove.length()); + ASSERT_OK(CompactionServiceInput::Read(output, &deserialized3)); + mismatch.clear(); + ASSERT_FALSE(deserialized3.TEST_Equals(&input, &mismatch)); + ASSERT_EQ(mismatch, "output_level"); + + // manually set the value back, should match the original structure + deserialized3.output_level = 4; + ASSERT_TRUE(deserialized3.TEST_Equals(&input)); + + // Test invalid version + output.clear(); + ASSERT_OK(input.Write(&output)); + + uint32_t data_version = DecodeFixed32(output.data()); + const size_t kDataVersionSize = sizeof(data_version); + ASSERT_EQ(data_version, + 1U); // Update once the default data version is changed + char buf[kDataVersionSize]; + EncodeFixed32(buf, data_version + 10); // make sure it's not valid + output.replace(0, kDataVersionSize, buf, kDataVersionSize); + Status s = CompactionServiceInput::Read(output, &deserialized3); + ASSERT_TRUE(s.IsNotSupported()); +} + +TEST_F(CompactionJobTest, ResultSerialization) { + // Setup a random CompactionServiceResult + CompactionServiceResult result; + const int kStrMaxLen = 1000; + Random rnd(static_cast(time(nullptr))); + Random64 rnd64(time(nullptr)); + while (!rnd.OneIn(10)) { + result.output_files.emplace_back( + rnd.RandomString(rnd.Uniform(kStrMaxLen)), rnd64.Uniform(UINT64_MAX), + rnd64.Uniform(UINT64_MAX), + rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen)), + rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen)), + rnd64.Uniform(UINT64_MAX), rnd64.Uniform(UINT64_MAX), + rnd64.Uniform(UINT64_MAX), rnd.OneIn(2)); + } + result.output_level = rnd.Uniform(10); + result.output_path = rnd.RandomString(rnd.Uniform(kStrMaxLen)); + result.num_output_records = rnd64.Uniform(UINT64_MAX); + result.total_bytes = rnd64.Uniform(UINT64_MAX); + result.bytes_read = 123; + result.bytes_written = rnd64.Uniform(UINT64_MAX); + result.stats.elapsed_micros = rnd64.Uniform(UINT64_MAX); + result.stats.num_output_files = rnd.Uniform(1000); + result.stats.is_full_compaction = rnd.OneIn(2); + result.stats.num_single_del_mismatch = rnd64.Uniform(UINT64_MAX); + result.stats.num_input_files = 9; + + std::string output; + ASSERT_OK(result.Write(&output)); + + // Test deserialization + CompactionServiceResult deserialized1; + ASSERT_OK(CompactionServiceResult::Read(output, &deserialized1)); + ASSERT_TRUE(deserialized1.TEST_Equals(&result)); + + // Test mismatch + deserialized1.stats.num_input_files += 10; + std::string mismatch; + ASSERT_FALSE(deserialized1.TEST_Equals(&result, &mismatch)); + ASSERT_EQ(mismatch, "stats.num_input_files"); + + // Test unknown field + CompactionServiceResult deserialized2; + output.clear(); + ASSERT_OK(result.Write(&output)); + output.append("new_field=123;"); + + ASSERT_OK(CompactionServiceResult::Read(output, &deserialized2)); + ASSERT_TRUE(deserialized2.TEST_Equals(&result)); + + // Test missing field + CompactionServiceResult deserialized3; + deserialized3.bytes_read = 0; + std::string to_remove = "bytes_read=123;"; + size_t pos = output.find(to_remove); + ASSERT_TRUE(pos != std::string::npos); + output.erase(pos, to_remove.length()); + ASSERT_OK(CompactionServiceResult::Read(output, &deserialized3)); + mismatch.clear(); + ASSERT_FALSE(deserialized3.TEST_Equals(&result, &mismatch)); + ASSERT_EQ(mismatch, "bytes_read"); + + deserialized3.bytes_read = 123; + ASSERT_TRUE(deserialized3.TEST_Equals(&result)); + + // Test invalid version + output.clear(); + ASSERT_OK(result.Write(&output)); + + uint32_t data_version = DecodeFixed32(output.data()); + const size_t kDataVersionSize = sizeof(data_version); + ASSERT_EQ(data_version, + 1U); // Update once the default data version is changed + char buf[kDataVersionSize]; + EncodeFixed32(buf, data_version + 10); // make sure it's not valid + output.replace(0, kDataVersionSize, buf, kDataVersionSize); + Status s = CompactionServiceResult::Read(output, &deserialized3); + ASSERT_TRUE(s.IsNotSupported()); +} + class CompactionJobTimestampTest : public CompactionJobTestBase { public: CompactionJobTimestampTest() diff --git a/include/rocksdb/utilities/options_type.h b/include/rocksdb/utilities/options_type.h index 3ac0b553e..cdab9df3f 100644 --- a/include/rocksdb/utilities/options_type.h +++ b/include/rocksdb/utilities/options_type.h @@ -50,6 +50,7 @@ enum class OptionType { kVector, kConfigurable, kCustomizable, + kEncodedString, kUnknown, }; diff --git a/options/options_helper.cc b/options/options_helper.cc index e6b4d7545..670e6e9c4 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -478,6 +478,11 @@ static bool ParseOptionHelper(char* opt_address, const OptionType& opt_type, return ParseEnum( compaction_stop_style_string_map, value, reinterpret_cast(opt_address)); + case OptionType::kEncodedString: { + std::string* output_addr = reinterpret_cast(opt_address); + (Slice(value)).DecodeHex(output_addr); + break; + } default: return false; } @@ -622,6 +627,11 @@ bool SerializeSingleOptionHelper(const char* opt_address, return SerializeEnum( compaction_stop_style_string_map, *reinterpret_cast(opt_address), value); + case OptionType::kEncodedString: { + const auto* ptr = reinterpret_cast(opt_address); + *value = (Slice(*ptr)).ToString(true); + break; + } default: return false; } @@ -1251,6 +1261,8 @@ static bool AreOptionsEqual(OptionType type, const char* this_offset, return IsOptionEqual(this_offset, that_offset); case OptionType::kEncodingType: return IsOptionEqual(this_offset, that_offset); + case OptionType::kEncodedString: + return IsOptionEqual(this_offset, that_offset); default: return false; } // End switch diff --git a/util/random.cc b/util/random.cc index 68624ad43..f1ab02939 100644 --- a/util/random.cc +++ b/util/random.cc @@ -53,4 +53,13 @@ std::string Random::RandomString(int len) { return ret; } +std::string Random::RandomBinaryString(int len) { + std::string ret; + ret.resize(len); + for (int i = 0; i < len; i++) { + ret[i] = static_cast(Uniform(CHAR_MAX)); + } + return ret; +} + } // namespace ROCKSDB_NAMESPACE diff --git a/util/random.h b/util/random.h index 5f6eaf51e..7e1350f06 100644 --- a/util/random.h +++ b/util/random.h @@ -92,6 +92,9 @@ class Random { // Generates a random string of len bytes using human-readable characters std::string HumanReadableString(int len); + // Generates a random binary data + std::string RandomBinaryString(int len); + // Returns a Random instance for use by the current thread without // additional locking static Random* GetTLSInstance();