From be41c61f22ad6195e6b3a004762d54ee6c19c3ed Mon Sep 17 00:00:00 2001 From: Zitan Chen <11285749+gg814@users.noreply.github.com> Date: Wed, 24 Jun 2020 19:30:15 -0700 Subject: [PATCH] Add a new option for BackupEngine to store table files under shared_checksum using DB session id in the backup filenames (#6997) Summary: `BackupableDBOptions::new_naming_for_backup_files` is added. This option is false by default. When it is true, backup table filenames under directory shared_checksum are of the form `__.sst`. Note that when this option is true, it comes into effect only when both `share_files_with_checksum` and `share_table_files` are true. Three new test cases are added. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6997 Test Plan: Passed make check. Reviewed By: ajkr Differential Revision: D22098895 Pulled By: gg814 fbshipit-source-id: a1d9145e7fe562d71cde7ac995e17cb24fd42e76 --- CMakeLists.txt | 1 + HISTORY.md | 1 + TARGETS | 1 + db/db_impl/db_impl.cc | 2 +- include/rocksdb/utilities/backupable_db.h | 31 +- src.mk | 1 + table/sst_file_dumper.cc | 478 ++++++++++++++++++ .../sst_file_dumper.h | 7 +- tools/ldb_cmd.cc | 21 +- tools/sst_dump_tool.cc | 458 +---------------- utilities/backupable/backupable_db.cc | 207 ++++++-- utilities/backupable/backupable_db_test.cc | 138 +++++ 12 files changed, 839 insertions(+), 507 deletions(-) create mode 100644 table/sst_file_dumper.cc rename tools/sst_dump_tool_imp.h => table/sst_file_dumper.h (97%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 7170dcc53..eacaed215 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -694,6 +694,7 @@ set(SOURCES table/plain/plain_table_index.cc table/plain/plain_table_key_coding.cc table/plain/plain_table_reader.cc + table/sst_file_dumper.cc table/sst_file_reader.cc table/sst_file_writer.cc table/table_properties.cc diff --git a/HISTORY.md b/HISTORY.md index 2a432ff73..ea1ae1e2f 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -16,6 +16,7 @@ ### New Features * DB identity (`db_id`) and DB session identity (`db_session_id`) are added to table properties and stored in SST files. SST files generated from SstFileWriter and Repairer have DB identity “SST Writer” and “DB Repairer”, respectively. Their DB session IDs are generated in the same way as `DB::GetDbSessionId`. The session ID for SstFileWriter (resp., Repairer) resets every time `SstFileWriter::Open` (resp., `Repairer::Run`) is called. * Added experimental option BlockBasedTableOptions::optimize_filters_for_memory for reducing allocated memory size of Bloom filters (~10% savings with Jemalloc) while preserving the same general accuracy. To have an effect, the option requires format_version=5 and malloc_usable_size. Enabling this option is forward and backward compatible with existing format_version=5. +* `BackupableDBOptions::new_naming_for_backup_files` is added. This option is true by default. When it is true, backup table filenames are of the form `__.sst` as opposed to `__.sst`. When there is no `db_session_id` available in the table file, we use `file_size` as a fallback. Note that when this option is true, it comes into effect only when both `share_files_with_checksum` and `share_table_files` are true. ### Bug Fixes * Fail recovery and report once hitting a physical log record checksum mismatch, while reading MANIFEST. RocksDB should not continue processing the MANIFEST any further. diff --git a/TARGETS b/TARGETS index 132e5ff53..df285d497 100644 --- a/TARGETS +++ b/TARGETS @@ -282,6 +282,7 @@ cpp_library( "table/plain/plain_table_index.cc", "table/plain/plain_table_key_coding.cc", "table/plain/plain_table_reader.cc", + "table/sst_file_dumper.cc", "table/sst_file_reader.cc", "table/sst_file_writer.cc", "table/table_properties.cc", diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 3a5ccc831..1c3ffb752 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -87,10 +87,10 @@ #include "table/get_context.h" #include "table/merging_iterator.h" #include "table/multiget_context.h" +#include "table/sst_file_dumper.h" #include "table/table_builder.h" #include "table/two_level_iterator.h" #include "test_util/sync_point.h" -#include "tools/sst_dump_tool_imp.h" #include "util/autovector.h" #include "util/build_version.h" #include "util/cast_util.h" diff --git a/include/rocksdb/utilities/backupable_db.h b/include/rocksdb/utilities/backupable_db.h index 3f7ec9965..f3c3780e7 100644 --- a/include/rocksdb/utilities/backupable_db.h +++ b/include/rocksdb/utilities/backupable_db.h @@ -88,11 +88,17 @@ struct BackupableDBOptions { std::shared_ptr restore_rate_limiter{nullptr}; // Only used if share_table_files is set to true. If true, will consider that - // backups can come from different databases, hence a sst is not uniquely - // identifed by its name, but by the triple (file name, crc32c, file length) - // Default: false - // Note: this is an experimental option, and you'll need to set it manually + // backups can come from different databases, hence an sst is not uniquely + // identifed by its name, but by the triple + // (file name, crc32c, db session id or file length) + // + // Note: If this option is set to true, we recommend setting + // new_naming_for_backup_files to true as well, which is also our default + // option. Otherwise, there is a non-negligible chance of filename collision + // when sharing tables in shared_checksum among several DBs. // *turn it on only if you know what you're doing* + // + // Default: false bool share_files_with_checksum; // Up to this many background threads will copy files for CreateNewBackup() @@ -116,6 +122,17 @@ struct BackupableDBOptions { // Default: INT_MAX int max_valid_backups_to_open; + // If true, backup SST filenames consist of file_number, crc32c, db_session_id + // if false, backup SST filenames consist of file_number, crc32c, file_size + // + // Default: true + // + // Note: This option comes into effect only if both share_files_with_checksum + // and share_table_files are true. In the cases of old table files where no + // db_session_id is stored, we use the file_size to replace the empty + // db_session_id as a fallback. + bool new_naming_for_backup_files; + void Dump(Logger* logger) const; explicit BackupableDBOptions( @@ -125,7 +142,8 @@ struct BackupableDBOptions { bool _backup_log_files = true, uint64_t _backup_rate_limit = 0, uint64_t _restore_rate_limit = 0, int _max_background_operations = 1, uint64_t _callback_trigger_interval_size = 4 * 1024 * 1024, - int _max_valid_backups_to_open = INT_MAX) + int _max_valid_backups_to_open = INT_MAX, + bool _new_naming_for_backup_files = true) : backup_dir(_backup_dir), backup_env(_backup_env), share_table_files(_share_table_files), @@ -138,7 +156,8 @@ struct BackupableDBOptions { share_files_with_checksum(false), max_background_operations(_max_background_operations), callback_trigger_interval_size(_callback_trigger_interval_size), - max_valid_backups_to_open(_max_valid_backups_to_open) { + max_valid_backups_to_open(_max_valid_backups_to_open), + new_naming_for_backup_files(_new_naming_for_backup_files) { assert(share_table_files || !share_files_with_checksum); } }; diff --git a/src.mk b/src.mk index 218ef0ab6..3f92d6f89 100644 --- a/src.mk +++ b/src.mk @@ -166,6 +166,7 @@ LIB_SOURCES = \ table/plain/plain_table_index.cc \ table/plain/plain_table_key_coding.cc \ table/plain/plain_table_reader.cc \ + table/sst_file_dumper.cc \ table/sst_file_reader.cc \ table/sst_file_writer.cc \ table/table_properties.cc \ diff --git a/table/sst_file_dumper.cc b/table/sst_file_dumper.cc new file mode 100644 index 000000000..aea13661d --- /dev/null +++ b/table/sst_file_dumper.cc @@ -0,0 +1,478 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#ifndef ROCKSDB_LITE + +#include "table/sst_file_dumper.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "db/blob/blob_index.h" +#include "db/memtable.h" +#include "db/write_batch_internal.h" +#include "env/composite_env_wrapper.h" +#include "options/cf_options.h" +#include "port/port.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/iterator.h" +#include "rocksdb/slice_transform.h" +#include "rocksdb/status.h" +#include "rocksdb/table_properties.h" +#include "rocksdb/utilities/ldb_cmd.h" +#include "table/block_based/block.h" +#include "table/block_based/block_based_table_builder.h" +#include "table/block_based/block_based_table_factory.h" +#include "table/block_based/block_builder.h" +#include "table/format.h" +#include "table/meta_blocks.h" +#include "table/plain/plain_table_factory.h" +#include "table/table_reader.h" +#include "util/compression.h" +#include "util/random.h" + +namespace ROCKSDB_NAMESPACE { + +SstFileDumper::SstFileDumper(const Options& options, + const std::string& file_path, + size_t readahead_size, bool verify_checksum, + bool output_hex, bool decode_blob_index, + bool silent) + : file_name_(file_path), + read_num_(0), + output_hex_(output_hex), + decode_blob_index_(decode_blob_index), + silent_(silent), + options_(options), + ioptions_(options_), + moptions_(ColumnFamilyOptions(options_)), + read_options_(verify_checksum, false), + internal_comparator_(BytewiseComparator()) { + read_options_.readahead_size = readahead_size; + if (!silent_) { + fprintf(stdout, "Process %s\n", file_path.c_str()); + } + init_result_ = GetTableReader(file_name_); +} + +extern const uint64_t kBlockBasedTableMagicNumber; +extern const uint64_t kLegacyBlockBasedTableMagicNumber; +extern const uint64_t kPlainTableMagicNumber; +extern const uint64_t kLegacyPlainTableMagicNumber; + +const char* testFileName = "test_file_name"; + +Status SstFileDumper::GetTableReader(const std::string& file_path) { + // Warning about 'magic_number' being uninitialized shows up only in UBsan + // builds. Though access is guarded by 's.ok()' checks, fix the issue to + // avoid any warnings. + uint64_t magic_number = Footer::kInvalidTableMagicNumber; + + // read table magic number + Footer footer; + + std::unique_ptr file; + uint64_t file_size = 0; + Status s = options_.env->NewRandomAccessFile(file_path, &file, soptions_); + if (s.ok()) { + s = options_.env->GetFileSize(file_path, &file_size); + } + + // check empty file + // if true, skip further processing of this file + if (file_size == 0) { + return Status::Aborted(file_path, "Empty file"); + } + + file_.reset(new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file), + file_path)); + + FilePrefetchBuffer prefetch_buffer(nullptr, 0, 0, true /* enable */, + false /* track_min_offset */); + if (s.ok()) { + const uint64_t kSstDumpTailPrefetchSize = 512 * 1024; + uint64_t prefetch_size = (file_size > kSstDumpTailPrefetchSize) + ? kSstDumpTailPrefetchSize + : file_size; + uint64_t prefetch_off = file_size - prefetch_size; + prefetch_buffer.Prefetch(file_.get(), prefetch_off, + static_cast(prefetch_size)); + + s = ReadFooterFromFile(file_.get(), &prefetch_buffer, file_size, &footer); + } + if (s.ok()) { + magic_number = footer.table_magic_number(); + } + + if (s.ok()) { + if (magic_number == kPlainTableMagicNumber || + magic_number == kLegacyPlainTableMagicNumber) { + soptions_.use_mmap_reads = true; + options_.env->NewRandomAccessFile(file_path, &file, soptions_); + file_.reset(new RandomAccessFileReader( + NewLegacyRandomAccessFileWrapper(file), file_path)); + } + options_.comparator = &internal_comparator_; + // For old sst format, ReadTableProperties might fail but file can be read + if (ReadTableProperties(magic_number, file_.get(), file_size, + (magic_number == kBlockBasedTableMagicNumber) + ? &prefetch_buffer + : nullptr) + .ok()) { + SetTableOptionsByMagicNumber(magic_number); + } else { + SetOldTableOptions(); + } + } + + if (s.ok()) { + s = NewTableReader(ioptions_, soptions_, internal_comparator_, file_size, + &table_reader_); + } + return s; +} + +Status SstFileDumper::NewTableReader( + const ImmutableCFOptions& /*ioptions*/, const EnvOptions& /*soptions*/, + const InternalKeyComparator& /*internal_comparator*/, uint64_t file_size, + std::unique_ptr* /*table_reader*/) { + auto t_opt = + TableReaderOptions(ioptions_, moptions_.prefix_extractor.get(), soptions_, + internal_comparator_, false /* skip_filters */, + false /* imortal */, true /* force_direct_prefetch */); + // Allow open file with global sequence number for backward compatibility. + t_opt.largest_seqno = kMaxSequenceNumber; + + // We need to turn off pre-fetching of index and filter nodes for + // BlockBasedTable + if (BlockBasedTableFactory::kName == options_.table_factory->Name()) { + return options_.table_factory->NewTableReader(t_opt, std::move(file_), + file_size, &table_reader_, + /*enable_prefetch=*/false); + } + + // For all other factory implementation + return options_.table_factory->NewTableReader(t_opt, std::move(file_), + file_size, &table_reader_); +} + +Status SstFileDumper::VerifyChecksum() { + // We could pass specific readahead setting into read options if needed. + return table_reader_->VerifyChecksum(read_options_, + TableReaderCaller::kSSTDumpTool); +} + +Status SstFileDumper::DumpTable(const std::string& out_filename) { + std::unique_ptr out_file; + Env* env = options_.env; + env->NewWritableFile(out_filename, &out_file, soptions_); + Status s = table_reader_->DumpTable(out_file.get()); + out_file->Close(); + return s; +} + +uint64_t SstFileDumper::CalculateCompressedTableSize( + const TableBuilderOptions& tb_options, size_t block_size, + uint64_t* num_data_blocks) { + std::unique_ptr out_file; + std::unique_ptr env(NewMemEnv(options_.env)); + env->NewWritableFile(testFileName, &out_file, soptions_); + std::unique_ptr dest_writer; + dest_writer.reset( + new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(out_file)), + testFileName, soptions_)); + BlockBasedTableOptions table_options; + table_options.block_size = block_size; + BlockBasedTableFactory block_based_tf(table_options); + std::unique_ptr table_builder; + table_builder.reset(block_based_tf.NewTableBuilder( + tb_options, + TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, + dest_writer.get())); + std::unique_ptr iter(table_reader_->NewIterator( + read_options_, moptions_.prefix_extractor.get(), /*arena=*/nullptr, + /*skip_filters=*/false, TableReaderCaller::kSSTDumpTool)); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + table_builder->Add(iter->key(), iter->value()); + } + if (!iter->status().ok()) { + fputs(iter->status().ToString().c_str(), stderr); + exit(1); + } + Status s = table_builder->Finish(); + if (!s.ok()) { + fputs(s.ToString().c_str(), stderr); + exit(1); + } + uint64_t size = table_builder->FileSize(); + assert(num_data_blocks != nullptr); + *num_data_blocks = table_builder->GetTableProperties().num_data_blocks; + env->DeleteFile(testFileName); + return size; +} + +int SstFileDumper::ShowAllCompressionSizes( + size_t block_size, + const std::vector>& + compression_types, + int32_t compress_level_from, int32_t compress_level_to) { + fprintf(stdout, "Block Size: %" ROCKSDB_PRIszt "\n", block_size); + for (auto& i : compression_types) { + if (CompressionTypeSupported(i.first)) { + fprintf(stdout, "Compression: %-24s\n", i.second); + CompressionOptions compress_opt; + for (int32_t j = compress_level_from; j <= compress_level_to; j++) { + fprintf(stdout, "Compression level: %d", j); + compress_opt.level = j; + ShowCompressionSize(block_size, i.first, compress_opt); + } + } else { + fprintf(stdout, "Unsupported compression type: %s.\n", i.second); + } + } + return 0; +} + +int SstFileDumper::ShowCompressionSize(size_t block_size, + CompressionType compress_type, + const CompressionOptions& compress_opt) { + Options opts; + opts.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); + opts.statistics->set_stats_level(StatsLevel::kAll); + const ImmutableCFOptions imoptions(opts); + const ColumnFamilyOptions cfo(opts); + const MutableCFOptions moptions(cfo); + ROCKSDB_NAMESPACE::InternalKeyComparator ikc(opts.comparator); + std::vector> + block_based_table_factories; + + std::string column_family_name; + int unknown_level = -1; + TableBuilderOptions tb_opts( + imoptions, moptions, ikc, &block_based_table_factories, compress_type, + 0 /* sample_for_compression */, compress_opt, false /* skip_filters */, + column_family_name, unknown_level); + uint64_t num_data_blocks = 0; + std::chrono::steady_clock::time_point start = + std::chrono::steady_clock::now(); + uint64_t file_size = + CalculateCompressedTableSize(tb_opts, block_size, &num_data_blocks); + std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now(); + fprintf(stdout, " Size: %10" PRIu64, file_size); + fprintf(stdout, " Blocks: %6" PRIu64, num_data_blocks); + fprintf(stdout, " Time Taken: %10s microsecs", + std::to_string( + std::chrono::duration_cast(end - start) + .count()) + .c_str()); + const uint64_t compressed_blocks = + opts.statistics->getAndResetTickerCount(NUMBER_BLOCK_COMPRESSED); + const uint64_t not_compressed_blocks = + opts.statistics->getAndResetTickerCount(NUMBER_BLOCK_NOT_COMPRESSED); + // When the option enable_index_compression is true, + // NUMBER_BLOCK_COMPRESSED is incremented for index block(s). + if ((compressed_blocks + not_compressed_blocks) > num_data_blocks) { + num_data_blocks = compressed_blocks + not_compressed_blocks; + } + + const uint64_t ratio_not_compressed_blocks = + (num_data_blocks - compressed_blocks) - not_compressed_blocks; + const double compressed_pcnt = + (0 == num_data_blocks) ? 0.0 + : ((static_cast(compressed_blocks) / + static_cast(num_data_blocks)) * + 100.0); + const double ratio_not_compressed_pcnt = + (0 == num_data_blocks) + ? 0.0 + : ((static_cast(ratio_not_compressed_blocks) / + static_cast(num_data_blocks)) * + 100.0); + const double not_compressed_pcnt = + (0 == num_data_blocks) ? 0.0 + : ((static_cast(not_compressed_blocks) / + static_cast(num_data_blocks)) * + 100.0); + fprintf(stdout, " Compressed: %6" PRIu64 " (%5.1f%%)", compressed_blocks, + compressed_pcnt); + fprintf(stdout, " Not compressed (ratio): %6" PRIu64 " (%5.1f%%)", + ratio_not_compressed_blocks, ratio_not_compressed_pcnt); + fprintf(stdout, " Not compressed (abort): %6" PRIu64 " (%5.1f%%)\n", + not_compressed_blocks, not_compressed_pcnt); + return 0; +} + +Status SstFileDumper::ReadTableProperties(uint64_t table_magic_number, + RandomAccessFileReader* file, + uint64_t file_size, + FilePrefetchBuffer* prefetch_buffer) { + TableProperties* table_properties = nullptr; + Status s = ROCKSDB_NAMESPACE::ReadTableProperties( + file, file_size, table_magic_number, ioptions_, &table_properties, + /* compression_type_missing= */ false, + /* memory_allocator= */ nullptr, prefetch_buffer); + if (s.ok()) { + table_properties_.reset(table_properties); + } else { + if (!silent_) { + fprintf(stdout, "Not able to read table properties\n"); + } + } + return s; +} + +Status SstFileDumper::SetTableOptionsByMagicNumber( + uint64_t table_magic_number) { + assert(table_properties_); + if (table_magic_number == kBlockBasedTableMagicNumber || + table_magic_number == kLegacyBlockBasedTableMagicNumber) { + BlockBasedTableFactory* bbtf = new BlockBasedTableFactory(); + // To force tail prefetching, we fake reporting two useful reads of 512KB + // from the tail. + // It needs at least two data points to warm up the stats. + bbtf->tail_prefetch_stats()->RecordEffectiveSize(512 * 1024); + bbtf->tail_prefetch_stats()->RecordEffectiveSize(512 * 1024); + + options_.table_factory.reset(bbtf); + if (!silent_) { + fprintf(stdout, "Sst file format: block-based\n"); + } + + auto& props = table_properties_->user_collected_properties; + auto pos = props.find(BlockBasedTablePropertyNames::kIndexType); + if (pos != props.end()) { + auto index_type_on_file = static_cast( + DecodeFixed32(pos->second.c_str())); + if (index_type_on_file == + BlockBasedTableOptions::IndexType::kHashSearch) { + options_.prefix_extractor.reset(NewNoopTransform()); + } + } + } else if (table_magic_number == kPlainTableMagicNumber || + table_magic_number == kLegacyPlainTableMagicNumber) { + options_.allow_mmap_reads = true; + + PlainTableOptions plain_table_options; + plain_table_options.user_key_len = kPlainTableVariableLength; + plain_table_options.bloom_bits_per_key = 0; + plain_table_options.hash_table_ratio = 0; + plain_table_options.index_sparseness = 1; + plain_table_options.huge_page_tlb_size = 0; + plain_table_options.encoding_type = kPlain; + plain_table_options.full_scan_mode = true; + + options_.table_factory.reset(NewPlainTableFactory(plain_table_options)); + if (!silent_) { + fprintf(stdout, "Sst file format: plain table\n"); + } + } else { + char error_msg_buffer[80]; + snprintf(error_msg_buffer, sizeof(error_msg_buffer) - 1, + "Unsupported table magic number --- %lx", + (long)table_magic_number); + return Status::InvalidArgument(error_msg_buffer); + } + + return Status::OK(); +} + +Status SstFileDumper::SetOldTableOptions() { + assert(table_properties_ == nullptr); + options_.table_factory = std::make_shared(); + if (!silent_) { + fprintf(stdout, "Sst file format: block-based(old version)\n"); + } + + return Status::OK(); +} + +Status SstFileDumper::ReadSequential(bool print_kv, uint64_t read_num, + bool has_from, const std::string& from_key, + bool has_to, const std::string& to_key, + bool use_from_as_prefix) { + if (!table_reader_) { + return init_result_; + } + + InternalIterator* iter = table_reader_->NewIterator( + read_options_, moptions_.prefix_extractor.get(), + /*arena=*/nullptr, /*skip_filters=*/false, + TableReaderCaller::kSSTDumpTool); + uint64_t i = 0; + if (has_from) { + InternalKey ikey; + ikey.SetMinPossibleForUserKey(from_key); + iter->Seek(ikey.Encode()); + } else { + iter->SeekToFirst(); + } + for (; iter->Valid(); iter->Next()) { + Slice key = iter->key(); + Slice value = iter->value(); + ++i; + if (read_num > 0 && i > read_num) break; + + ParsedInternalKey ikey; + if (!ParseInternalKey(key, &ikey)) { + std::cerr << "Internal Key [" << key.ToString(true /* in hex*/) + << "] parse error!\n"; + continue; + } + + // the key returned is not prefixed with out 'from' key + if (use_from_as_prefix && !ikey.user_key.starts_with(from_key)) { + break; + } + + // If end marker was specified, we stop before it + if (has_to && BytewiseComparator()->Compare(ikey.user_key, to_key) >= 0) { + break; + } + + if (print_kv) { + if (!decode_blob_index_ || ikey.type != kTypeBlobIndex) { + fprintf(stdout, "%s => %s\n", ikey.DebugString(output_hex_).c_str(), + value.ToString(output_hex_).c_str()); + } else { + BlobIndex blob_index; + + const Status s = blob_index.DecodeFrom(value); + if (!s.ok()) { + fprintf(stderr, "%s => error decoding blob index\n", + ikey.DebugString(output_hex_).c_str()); + continue; + } + + fprintf(stdout, "%s => %s\n", ikey.DebugString(output_hex_).c_str(), + blob_index.DebugString(output_hex_).c_str()); + } + } + } + + read_num_ += i; + + Status ret = iter->status(); + delete iter; + return ret; +} + +Status SstFileDumper::ReadTableProperties( + std::shared_ptr* table_properties) { + if (!table_reader_) { + return init_result_; + } + + *table_properties = table_reader_->GetTableProperties(); + return init_result_; +} +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE diff --git a/tools/sst_dump_tool_imp.h b/table/sst_file_dumper.h similarity index 97% rename from tools/sst_dump_tool_imp.h rename to table/sst_file_dumper.h index 1d96f46a7..0bc64e3ae 100644 --- a/tools/sst_dump_tool_imp.h +++ b/table/sst_file_dumper.h @@ -5,8 +5,6 @@ #pragma once #ifndef ROCKSDB_LITE -#include "rocksdb/sst_dump_tool.h" - #include #include #include "db/dbformat.h" @@ -19,7 +17,8 @@ class SstFileDumper { public: explicit SstFileDumper(const Options& options, const std::string& file_name, size_t readahead_size, bool verify_checksum, - bool output_hex, bool decode_blob_index); + bool output_hex, bool decode_blob_index, + bool silent = false); Status ReadSequential(bool print_kv, uint64_t read_num, bool has_from, const std::string& from_key, bool has_to, @@ -74,6 +73,8 @@ class SstFileDumper { bool output_hex_; bool decode_blob_index_; EnvOptions soptions_; + // less verbose in stdout/stderr + bool silent_; // options_ and internal_comparator_ will also be used in // ReadSequential internally (specifically, seek-related operations) diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 0859f7d15..210669ec4 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -8,6 +8,15 @@ #include "rocksdb/utilities/ldb_cmd.h" #include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include "db/db_impl/db_impl.h" #include "db/dbformat.h" @@ -26,8 +35,8 @@ #include "rocksdb/write_batch.h" #include "rocksdb/write_buffer_manager.h" #include "table/scoped_arena_iterator.h" +#include "table/sst_file_dumper.h" #include "tools/ldb_cmd_impl.h" -#include "tools/sst_dump_tool_imp.h" #include "util/cast_util.h" #include "util/coding.h" #include "util/file_checksum_helper.h" @@ -36,16 +45,6 @@ #include "utilities/merge_operators.h" #include "utilities/ttl/db_ttl_impl.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include - namespace ROCKSDB_NAMESPACE { class FileChecksumGenCrc32c; diff --git a/tools/sst_dump_tool.cc b/tools/sst_dump_tool.cc index cff5d0552..70ce6913f 100644 --- a/tools/sst_dump_tool.cc +++ b/tools/sst_dump_tool.cc @@ -1,4 +1,3 @@ - // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License @@ -6,68 +5,17 @@ // #ifndef ROCKSDB_LITE -#include "tools/sst_dump_tool_imp.h" +#include "rocksdb/sst_dump_tool.h" #include -#include #include -#include -#include -#include -#include - -#include "db/blob/blob_index.h" -#include "db/memtable.h" -#include "db/write_batch_internal.h" -#include "env/composite_env_wrapper.h" -#include "options/cf_options.h" -#include "rocksdb/db.h" -#include "rocksdb/env.h" -#include "rocksdb/iterator.h" -#include "rocksdb/slice_transform.h" -#include "rocksdb/status.h" -#include "rocksdb/table_properties.h" -#include "rocksdb/utilities/ldb_cmd.h" -#include "table/block_based/block.h" -#include "table/block_based/block_based_table_builder.h" -#include "table/block_based/block_based_table_factory.h" -#include "table/block_based/block_builder.h" -#include "table/format.h" -#include "table/meta_blocks.h" -#include "table/plain/plain_table_factory.h" -#include "table/table_reader.h" -#include "util/compression.h" -#include "util/random.h" #include "port/port.h" +#include "rocksdb/utilities/ldb_cmd.h" +#include "table/sst_file_dumper.h" namespace ROCKSDB_NAMESPACE { -SstFileDumper::SstFileDumper(const Options& options, - const std::string& file_path, - size_t readahead_size, bool verify_checksum, - bool output_hex, bool decode_blob_index) - : file_name_(file_path), - read_num_(0), - output_hex_(output_hex), - decode_blob_index_(decode_blob_index), - options_(options), - ioptions_(options_), - moptions_(ColumnFamilyOptions(options_)), - read_options_(verify_checksum, false), - internal_comparator_(BytewiseComparator()) { - read_options_.readahead_size = readahead_size; - fprintf(stdout, "Process %s\n", file_path.c_str()); - init_result_ = GetTableReader(file_name_); -} - -extern const uint64_t kBlockBasedTableMagicNumber; -extern const uint64_t kLegacyBlockBasedTableMagicNumber; -extern const uint64_t kPlainTableMagicNumber; -extern const uint64_t kLegacyPlainTableMagicNumber; - -const char* testFileName = "test_file_name"; - static const std::vector> kCompressions = { {CompressionType::kNoCompression, "kNoCompression"}, @@ -79,406 +27,6 @@ static const std::vector> {CompressionType::kXpressCompression, "kXpressCompression"}, {CompressionType::kZSTD, "kZSTD"}}; -Status SstFileDumper::GetTableReader(const std::string& file_path) { - // Warning about 'magic_number' being uninitialized shows up only in UBsan - // builds. Though access is guarded by 's.ok()' checks, fix the issue to - // avoid any warnings. - uint64_t magic_number = Footer::kInvalidTableMagicNumber; - - // read table magic number - Footer footer; - - std::unique_ptr file; - uint64_t file_size = 0; - Status s = options_.env->NewRandomAccessFile(file_path, &file, soptions_); - if (s.ok()) { - s = options_.env->GetFileSize(file_path, &file_size); - } - - // check empty file - // if true, skip further processing of this file - if (file_size == 0) { - return Status::Aborted(file_path, "Empty file"); - } - - file_.reset(new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file), - file_path)); - - FilePrefetchBuffer prefetch_buffer(nullptr, 0, 0, true /* enable */, - false /* track_min_offset */); - if (s.ok()) { - const uint64_t kSstDumpTailPrefetchSize = 512 * 1024; - uint64_t prefetch_size = (file_size > kSstDumpTailPrefetchSize) - ? kSstDumpTailPrefetchSize - : file_size; - uint64_t prefetch_off = file_size - prefetch_size; - prefetch_buffer.Prefetch(file_.get(), prefetch_off, - static_cast(prefetch_size)); - - s = ReadFooterFromFile(file_.get(), &prefetch_buffer, file_size, &footer); - } - if (s.ok()) { - magic_number = footer.table_magic_number(); - } - - if (s.ok()) { - if (magic_number == kPlainTableMagicNumber || - magic_number == kLegacyPlainTableMagicNumber) { - soptions_.use_mmap_reads = true; - options_.env->NewRandomAccessFile(file_path, &file, soptions_); - file_.reset(new RandomAccessFileReader( - NewLegacyRandomAccessFileWrapper(file), file_path)); - } - options_.comparator = &internal_comparator_; - // For old sst format, ReadTableProperties might fail but file can be read - if (ReadTableProperties(magic_number, file_.get(), file_size, - (magic_number == kBlockBasedTableMagicNumber) - ? &prefetch_buffer - : nullptr) - .ok()) { - SetTableOptionsByMagicNumber(magic_number); - } else { - SetOldTableOptions(); - } - } - - if (s.ok()) { - s = NewTableReader(ioptions_, soptions_, internal_comparator_, file_size, - &table_reader_); - } - return s; -} - -Status SstFileDumper::NewTableReader( - const ImmutableCFOptions& /*ioptions*/, const EnvOptions& /*soptions*/, - const InternalKeyComparator& /*internal_comparator*/, uint64_t file_size, - std::unique_ptr* /*table_reader*/) { - auto t_opt = - TableReaderOptions(ioptions_, moptions_.prefix_extractor.get(), soptions_, - internal_comparator_, false /* skip_filters */, - false /* imortal */, true /* force_direct_prefetch */); - // Allow open file with global sequence number for backward compatibility. - t_opt.largest_seqno = kMaxSequenceNumber; - - // We need to turn off pre-fetching of index and filter nodes for - // BlockBasedTable - if (BlockBasedTableFactory::kName == options_.table_factory->Name()) { - return options_.table_factory->NewTableReader(t_opt, std::move(file_), - file_size, &table_reader_, - /*enable_prefetch=*/false); - } - - // For all other factory implementation - return options_.table_factory->NewTableReader(t_opt, std::move(file_), - file_size, &table_reader_); -} - -Status SstFileDumper::VerifyChecksum() { - // We could pass specific readahead setting into read options if needed. - return table_reader_->VerifyChecksum(read_options_, - TableReaderCaller::kSSTDumpTool); -} - -Status SstFileDumper::DumpTable(const std::string& out_filename) { - std::unique_ptr out_file; - Env* env = options_.env; - env->NewWritableFile(out_filename, &out_file, soptions_); - Status s = table_reader_->DumpTable(out_file.get()); - out_file->Close(); - return s; -} - -uint64_t SstFileDumper::CalculateCompressedTableSize( - const TableBuilderOptions& tb_options, size_t block_size, - uint64_t* num_data_blocks) { - std::unique_ptr out_file; - std::unique_ptr env(NewMemEnv(options_.env)); - env->NewWritableFile(testFileName, &out_file, soptions_); - std::unique_ptr dest_writer; - dest_writer.reset( - new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(out_file)), - testFileName, soptions_)); - BlockBasedTableOptions table_options; - table_options.block_size = block_size; - BlockBasedTableFactory block_based_tf(table_options); - std::unique_ptr table_builder; - table_builder.reset(block_based_tf.NewTableBuilder( - tb_options, - TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, - dest_writer.get())); - std::unique_ptr iter(table_reader_->NewIterator( - read_options_, moptions_.prefix_extractor.get(), /*arena=*/nullptr, - /*skip_filters=*/false, TableReaderCaller::kSSTDumpTool)); - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { - table_builder->Add(iter->key(), iter->value()); - } - if (!iter->status().ok()) { - fputs(iter->status().ToString().c_str(), stderr); - exit(1); - } - Status s = table_builder->Finish(); - if (!s.ok()) { - fputs(s.ToString().c_str(), stderr); - exit(1); - } - uint64_t size = table_builder->FileSize(); - assert(num_data_blocks != nullptr); - *num_data_blocks = table_builder->GetTableProperties().num_data_blocks; - env->DeleteFile(testFileName); - return size; -} - -int SstFileDumper::ShowAllCompressionSizes( - size_t block_size, - const std::vector>& - compression_types, - int32_t compress_level_from, - int32_t compress_level_to) { - - fprintf(stdout, "Block Size: %" ROCKSDB_PRIszt "\n", block_size); - for (auto& i : compression_types) { - if (CompressionTypeSupported(i.first)) { - fprintf(stdout, "Compression: %-24s\n", i.second); - CompressionOptions compress_opt; - for(int32_t j = compress_level_from; j <= compress_level_to; - j++) { - fprintf(stdout, "Compression level: %d", j); - compress_opt.level = j; - ShowCompressionSize(block_size, i.first, compress_opt); - } - } else { - fprintf(stdout, "Unsupported compression type: %s.\n", i.second); - } - } - return 0; -} - -int SstFileDumper::ShowCompressionSize( - size_t block_size, - CompressionType compress_type, - const CompressionOptions& compress_opt) { - Options opts; - opts.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); - opts.statistics->set_stats_level(StatsLevel::kAll); - const ImmutableCFOptions imoptions(opts); - const ColumnFamilyOptions cfo(opts); - const MutableCFOptions moptions(cfo); - ROCKSDB_NAMESPACE::InternalKeyComparator ikc(opts.comparator); - std::vector > - block_based_table_factories; - - std::string column_family_name; - int unknown_level = -1; - TableBuilderOptions tb_opts( - imoptions, moptions, ikc, &block_based_table_factories, compress_type, - 0 /* sample_for_compression */, compress_opt, - false /* skip_filters */, column_family_name, unknown_level); - uint64_t num_data_blocks = 0; - std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now(); - uint64_t file_size = - CalculateCompressedTableSize(tb_opts, block_size, &num_data_blocks); - std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now(); - fprintf(stdout, " Size: %10" PRIu64, file_size); - fprintf(stdout, " Blocks: %6" PRIu64, num_data_blocks); - fprintf(stdout, " Time Taken: %10s microsecs", - std::to_string(std::chrono::duration_cast - (end-start).count()).c_str()); - const uint64_t compressed_blocks = - opts.statistics->getAndResetTickerCount(NUMBER_BLOCK_COMPRESSED); - const uint64_t not_compressed_blocks = - opts.statistics->getAndResetTickerCount(NUMBER_BLOCK_NOT_COMPRESSED); - // When the option enable_index_compression is true, - // NUMBER_BLOCK_COMPRESSED is incremented for index block(s). - if ((compressed_blocks + not_compressed_blocks) > num_data_blocks) { - num_data_blocks = compressed_blocks + not_compressed_blocks; - } - - const uint64_t ratio_not_compressed_blocks = - (num_data_blocks - compressed_blocks) - not_compressed_blocks; - const double compressed_pcnt = - (0 == num_data_blocks) ? 0.0 - : ((static_cast(compressed_blocks) / - static_cast(num_data_blocks)) * - 100.0); - const double ratio_not_compressed_pcnt = - (0 == num_data_blocks) - ? 0.0 - : ((static_cast(ratio_not_compressed_blocks) / - static_cast(num_data_blocks)) * - 100.0); - const double not_compressed_pcnt = - (0 == num_data_blocks) - ? 0.0 - : ((static_cast(not_compressed_blocks) / - static_cast(num_data_blocks)) * - 100.0); - fprintf(stdout, " Compressed: %6" PRIu64 " (%5.1f%%)", compressed_blocks, - compressed_pcnt); - fprintf(stdout, " Not compressed (ratio): %6" PRIu64 " (%5.1f%%)", - ratio_not_compressed_blocks, ratio_not_compressed_pcnt); - fprintf(stdout, " Not compressed (abort): %6" PRIu64 " (%5.1f%%)\n", - not_compressed_blocks, not_compressed_pcnt); - return 0; -} - -Status SstFileDumper::ReadTableProperties(uint64_t table_magic_number, - RandomAccessFileReader* file, - uint64_t file_size, - FilePrefetchBuffer* prefetch_buffer) { - TableProperties* table_properties = nullptr; - Status s = ROCKSDB_NAMESPACE::ReadTableProperties( - file, file_size, table_magic_number, ioptions_, &table_properties, - /* compression_type_missing= */ false, - /* memory_allocator= */ nullptr, prefetch_buffer); - if (s.ok()) { - table_properties_.reset(table_properties); - } else { - fprintf(stdout, "Not able to read table properties\n"); - } - return s; -} - -Status SstFileDumper::SetTableOptionsByMagicNumber( - uint64_t table_magic_number) { - assert(table_properties_); - if (table_magic_number == kBlockBasedTableMagicNumber || - table_magic_number == kLegacyBlockBasedTableMagicNumber) { - BlockBasedTableFactory* bbtf = new BlockBasedTableFactory(); - // To force tail prefetching, we fake reporting two useful reads of 512KB - // from the tail. - // It needs at least two data points to warm up the stats. - bbtf->tail_prefetch_stats()->RecordEffectiveSize(512 * 1024); - bbtf->tail_prefetch_stats()->RecordEffectiveSize(512 * 1024); - - options_.table_factory.reset(bbtf); - fprintf(stdout, "Sst file format: block-based\n"); - - auto& props = table_properties_->user_collected_properties; - auto pos = props.find(BlockBasedTablePropertyNames::kIndexType); - if (pos != props.end()) { - auto index_type_on_file = static_cast( - DecodeFixed32(pos->second.c_str())); - if (index_type_on_file == - BlockBasedTableOptions::IndexType::kHashSearch) { - options_.prefix_extractor.reset(NewNoopTransform()); - } - } - } else if (table_magic_number == kPlainTableMagicNumber || - table_magic_number == kLegacyPlainTableMagicNumber) { - options_.allow_mmap_reads = true; - - PlainTableOptions plain_table_options; - plain_table_options.user_key_len = kPlainTableVariableLength; - plain_table_options.bloom_bits_per_key = 0; - plain_table_options.hash_table_ratio = 0; - plain_table_options.index_sparseness = 1; - plain_table_options.huge_page_tlb_size = 0; - plain_table_options.encoding_type = kPlain; - plain_table_options.full_scan_mode = true; - - options_.table_factory.reset(NewPlainTableFactory(plain_table_options)); - fprintf(stdout, "Sst file format: plain table\n"); - } else { - char error_msg_buffer[80]; - snprintf(error_msg_buffer, sizeof(error_msg_buffer) - 1, - "Unsupported table magic number --- %lx", - (long)table_magic_number); - return Status::InvalidArgument(error_msg_buffer); - } - - return Status::OK(); -} - -Status SstFileDumper::SetOldTableOptions() { - assert(table_properties_ == nullptr); - options_.table_factory = std::make_shared(); - fprintf(stdout, "Sst file format: block-based(old version)\n"); - - return Status::OK(); -} - -Status SstFileDumper::ReadSequential(bool print_kv, uint64_t read_num, - bool has_from, const std::string& from_key, - bool has_to, const std::string& to_key, - bool use_from_as_prefix) { - if (!table_reader_) { - return init_result_; - } - - InternalIterator* iter = table_reader_->NewIterator( - read_options_, moptions_.prefix_extractor.get(), - /*arena=*/nullptr, /*skip_filters=*/false, - TableReaderCaller::kSSTDumpTool); - uint64_t i = 0; - if (has_from) { - InternalKey ikey; - ikey.SetMinPossibleForUserKey(from_key); - iter->Seek(ikey.Encode()); - } else { - iter->SeekToFirst(); - } - for (; iter->Valid(); iter->Next()) { - Slice key = iter->key(); - Slice value = iter->value(); - ++i; - if (read_num > 0 && i > read_num) - break; - - ParsedInternalKey ikey; - if (!ParseInternalKey(key, &ikey)) { - std::cerr << "Internal Key [" - << key.ToString(true /* in hex*/) - << "] parse error!\n"; - continue; - } - - // the key returned is not prefixed with out 'from' key - if (use_from_as_prefix && !ikey.user_key.starts_with(from_key)) { - break; - } - - // If end marker was specified, we stop before it - if (has_to && BytewiseComparator()->Compare(ikey.user_key, to_key) >= 0) { - break; - } - - if (print_kv) { - if (!decode_blob_index_ || ikey.type != kTypeBlobIndex) { - fprintf(stdout, "%s => %s\n", ikey.DebugString(output_hex_).c_str(), - value.ToString(output_hex_).c_str()); - } else { - BlobIndex blob_index; - - const Status s = blob_index.DecodeFrom(value); - if (!s.ok()) { - fprintf(stderr, "%s => error decoding blob index\n", - ikey.DebugString(output_hex_).c_str()); - continue; - } - - fprintf(stdout, "%s => %s\n", ikey.DebugString(output_hex_).c_str(), - blob_index.DebugString(output_hex_).c_str()); - } - } - } - - read_num_ += i; - - Status ret = iter->status(); - delete iter; - return ret; -} - -Status SstFileDumper::ReadTableProperties( - std::shared_ptr* table_properties) { - if (!table_reader_) { - return init_result_; - } - - *table_properties = table_reader_->GetTableProperties(); - return init_result_; -} - namespace { void print_help(bool to_stderr) { diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index 2183ef553..50830cc0e 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -9,7 +9,10 @@ #ifndef ROCKSDB_LITE +#include "rocksdb/utilities/backupable_db.h" + #include + #include #include #include @@ -33,7 +36,7 @@ #include "port/port.h" #include "rocksdb/rate_limiter.h" #include "rocksdb/transaction_log.h" -#include "rocksdb/utilities/backupable_db.h" +#include "table/sst_file_dumper.h" #include "test_util/sync_point.h" #include "util/channel.h" #include "util/coding.h" @@ -129,6 +132,9 @@ class BackupEngineImpl : public BackupEngine { Status Initialize(); + // Whether new naming for backup files is used or not + bool UseNewNaming() const { return options_.new_naming_for_backup_files; } + private: void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0); Status DeleteBackupInternal(BackupID backup_id); @@ -140,8 +146,14 @@ class BackupEngineImpl : public BackupEngine { std::unordered_map* result); struct FileInfo { - FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum) - : refs(0), filename(fname), size(sz), checksum_value(checksum) {} + FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum, + const std::string& id = "", const std::string& sid = "") + : refs(0), + filename(fname), + size(sz), + checksum_value(checksum), + db_id(id), + db_session_id(sid) {} FileInfo(const FileInfo&) = delete; FileInfo& operator=(const FileInfo&) = delete; @@ -150,6 +162,11 @@ class BackupEngineImpl : public BackupEngine { const std::string filename; const uint64_t size; const uint32_t checksum_value; + // DB identities + // db_id is obtained for potential usage in the future but not used + // currently; db_session_id appears in the backup SST filename + const std::string db_id; + const std::string db_session_id; }; class BackupMeta { @@ -281,14 +298,20 @@ class BackupEngineImpl : public BackupEngine { return GetSharedChecksumDirRel() + "/" + (tmp ? "." : "") + file + (tmp ? ".tmp" : ""); } - inline std::string GetSharedFileWithChecksum(const std::string& file, - const uint32_t checksum_value, - const uint64_t file_size) const { + // If UseNewNaming() && !db_session_id.empty(), backup SST filenames consist + // of file_number, crc32c, db_session_id. + // Otherwise, backup SST filenames consist of file_number, crc32c, file_size + inline std::string GetSharedFileWithChecksum( + const std::string& file, const uint32_t checksum_value, + const uint64_t file_size, const std::string& db_session_id) const { assert(file.size() == 0 || file[0] != '/'); std::string file_copy = file; - return file_copy.insert(file_copy.find_last_of('.'), - "_" + ROCKSDB_NAMESPACE::ToString(checksum_value) + - "_" + ROCKSDB_NAMESPACE::ToString(file_size)); + const std::string suffix = UseNewNaming() && !db_session_id.empty() + ? db_session_id + : ROCKSDB_NAMESPACE::ToString(file_size); + return file_copy.insert( + file_copy.find_last_of('.'), + "_" + ROCKSDB_NAMESPACE::ToString(checksum_value) + "_" + suffix); } inline std::string GetFileFromChecksumFile(const std::string& file) const { assert(file.size() == 0 || file[0] != '/'); @@ -304,6 +327,9 @@ class BackupEngineImpl : public BackupEngine { return GetBackupMetaDir() + "/" + (tmp ? "." : "") + ROCKSDB_NAMESPACE::ToString(backup_id) + (tmp ? ".tmp" : ""); } + inline bool IsSstFile(const std::string& fname) const { + return fname.length() > 4 && fname.rfind(".sst") == fname.length() - 4; + } // If size_limit == 0, there is no size limit, copy everything. // @@ -311,22 +337,28 @@ class BackupEngineImpl : public BackupEngine { // // @param src If non-empty, the file is copied from this pathname. // @param contents If non-empty, the file will be created with these contents. - Status CopyOrCreateFile(const std::string& src, const std::string& dst, - const std::string& contents, Env* src_env, - Env* dst_env, const EnvOptions& src_env_options, - bool sync, RateLimiter* rate_limiter, - uint64_t* size = nullptr, - uint32_t* checksum_value = nullptr, - uint64_t size_limit = 0, - std::function progress_callback = []() {}); + Status CopyOrCreateFile( + const std::string& src, const std::string& dst, + const std::string& contents, Env* src_env, Env* dst_env, + const EnvOptions& src_env_options, bool sync, RateLimiter* rate_limiter, + uint64_t* size = nullptr, uint32_t* checksum_value = nullptr, + uint64_t size_limit = 0, + std::function progress_callback = []() {}, + std::string* db_id = nullptr, std::string* db_session_id = nullptr); Status CalculateChecksum(const std::string& src, Env* src_env, const EnvOptions& src_env_options, uint64_t size_limit, uint32_t* checksum_value); + // Obtain db_id and db_session_id from the table properties of file_path + Status GetFileDbIdentities(Env* src_env, const std::string& file_path, + std::string* db_id, std::string* db_session_id); + struct CopyOrCreateResult { uint64_t size; uint32_t checksum_value; + std::string db_id; + std::string db_session_id; Status status; }; @@ -644,6 +676,9 @@ Status BackupEngineImpl::Initialize() { continue; } assert(backups_.find(backup_id) == backups_.end()); + // Insert all the (backup_id, BackupMeta) that will be loaded later + // The loading performed later will check whether there are corrupt backups + // and move the corrupt backups to corrupt_backups_ backups_.insert(std::make_pair( backup_id, std::unique_ptr(new BackupMeta( GetBackupMetaFile(backup_id, false /* tmp */), @@ -667,7 +702,11 @@ Status BackupEngineImpl::Initialize() { return s; } } else { // Load data from storage + // abs_path_to_size: maps absolute paths of files in backup directory to + // their corresponding sizes std::unordered_map abs_path_to_size; + // Insert files and their sizes in backup sub-directories (shared and + // shared_checksum) to abs_path_to_size for (const auto& rel_dir : {GetSharedFileRel(), GetSharedFileWithChecksumRel()}) { const auto abs_dir = GetAbsolutePath(rel_dir); @@ -687,6 +726,8 @@ Status BackupEngineImpl::Initialize() { break; } + // Insert files and their sizes in backup sub-directories + // (private/backup_id) to abs_path_to_size InsertPathnameToSizeBytes( GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_env_, &abs_path_to_size); @@ -765,7 +806,7 @@ Status BackupEngineImpl::Initialize() { work_item.src_env, work_item.dst_env, work_item.src_env_options, work_item.sync, work_item.rate_limiter, &result.size, &result.checksum_value, work_item.size_limit, - work_item.progress_callback); + work_item.progress_callback, &result.db_id, &result.db_session_id); work_item.result.set_value(std::move(result)); } }); @@ -924,10 +965,9 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata( item.dst_path); } if (item_status.ok()) { - item_status = new_backup.get()->AddFile( - std::make_shared(item.dst_relative, - result.size, - result.checksum_value)); + item_status = new_backup.get()->AddFile(std::make_shared( + item.dst_relative, result.size, result.checksum_value, result.db_id, + result.db_session_id)); } if (!item_status.ok()) { s = item_status; @@ -1199,12 +1239,13 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options, // 1. extract the filename size_t slash = file.find_last_of('/'); // file will either be shared/, shared_checksum/ - // or private// + // shared_checksum/, or private// assert(slash != std::string::npos); dst = file.substr(slash + 1); // if the file was in shared_checksum, extract the real file name // in this case the file is __. + // or __. if new naming is used if (file.substr(0, slash) == GetSharedChecksumDirRel()) { dst = GetFileFromChecksumFile(dst); } @@ -1214,7 +1255,8 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options, FileType type; bool ok = ParseFileName(dst, &number, &type); if (!ok) { - return Status::Corruption("Backup corrupted"); + return Status::Corruption("Backup corrupted: Fail to parse filename " + + dst); } // 3. Construct the final path // kLogFile lives in wal_dir and all the rest live in db_dir @@ -1297,7 +1339,8 @@ Status BackupEngineImpl::CopyOrCreateFile( const std::string& src, const std::string& dst, const std::string& contents, Env* src_env, Env* dst_env, const EnvOptions& src_env_options, bool sync, RateLimiter* rate_limiter, uint64_t* size, uint32_t* checksum_value, - uint64_t size_limit, std::function progress_callback) { + uint64_t size_limit, std::function progress_callback, + std::string* db_id, std::string* db_session_id) { assert(src.empty() != contents.empty()); Status s; std::unique_ptr dst_file; @@ -1311,6 +1354,12 @@ Status BackupEngineImpl::CopyOrCreateFile( if (checksum_value != nullptr) { *checksum_value = 0; } + if (db_id != nullptr) { + *db_id = ""; + } + if (db_session_id != nullptr) { + *db_session_id = ""; + } // Check if size limit is set. if not, set it to very big number if (size_limit == 0) { @@ -1381,6 +1430,22 @@ Status BackupEngineImpl::CopyOrCreateFile( if (s.ok()) { s = dest_writer->Close(); } + if (s.ok() && UseNewNaming()) { + // When copying SST files and using db_session_id in the name, + // try to get DB identities + // Note that when CopyOrCreateFile() is called while restoring, we still + // try obtaining the ids but as for now we do not use ids to verify + // the restored file + if (!src.empty()) { + // copying + if (IsSstFile(src)) { + // SST file + // Ignore the returned status + // In the failed cases, db_id and db_session_id will be empty + GetFileDbIdentities(src_env, src, db_id, db_session_id); + } + } + } return s; } @@ -1400,19 +1465,28 @@ Status BackupEngineImpl::AddBackupFileWorkItem( std::string dst_relative_tmp; Status s; uint32_t checksum_value = 0; + std::string db_id; + std::string db_session_id; + // Step 1: Prepare the relative path to destination if (shared && shared_checksum) { - // add checksum and file length to the file name + // Prepare checksum to add to file name s = CalculateChecksum(src_dir + fname, db_env_, src_env_options, size_limit, &checksum_value); if (!s.ok()) { return s; } + if (UseNewNaming()) { + // Prepare db_session_id to add to the file name + // Ignore the returned status + // In the failed cases, db_id and db_session_id will be empty + GetFileDbIdentities(db_env_, src_dir + fname, &db_id, &db_session_id); + } if (size_bytes == port::kMaxUint64) { return Status::NotFound("File missing: " + src_dir + fname); } - dst_relative = - GetSharedFileWithChecksum(dst_relative, checksum_value, size_bytes); + dst_relative = GetSharedFileWithChecksum(dst_relative, checksum_value, + size_bytes, db_session_id); dst_relative_tmp = GetSharedFileWithChecksumRel(dst_relative, true); dst_relative = GetSharedFileWithChecksumRel(dst_relative, false); } else if (shared) { @@ -1436,6 +1510,7 @@ Status BackupEngineImpl::AddBackupFileWorkItem( copy_dest_path = &final_dest_path; } + // Step 2: Determine whether to copy or not // if it's shared, we also need to check if it exists -- if it does, no need // to copy it again. bool need_to_copy = true; @@ -1445,6 +1520,8 @@ Status BackupEngineImpl::AddBackupFileWorkItem( bool file_exists = false; if (shared && !same_path) { + // Should be in shared directory but not a live path, check existence in + // shared directory Status exist = backup_env_->FileExists(final_dest_path); if (exist.ok()) { file_exists = true; @@ -1461,9 +1538,17 @@ Status BackupEngineImpl::AddBackupFileWorkItem( } else if (shared && (same_path || file_exists)) { need_to_copy = false; if (shared_checksum) { - ROCKS_LOG_INFO(options_.info_log, - "%s already present, with checksum %u and size %" PRIu64, - fname.c_str(), checksum_value, size_bytes); + if (UseNewNaming() && !db_session_id.empty()) { + ROCKS_LOG_INFO(options_.info_log, + "%s already present, with checksum %u, size %" PRIu64 + " and DB session identity %s", + fname.c_str(), checksum_value, size_bytes, + db_session_id.c_str()); + } else { + ROCKS_LOG_INFO(options_.info_log, + "%s already present, with checksum %u and size %" PRIu64, + fname.c_str(), checksum_value, size_bytes); + } } else if (backuped_file_infos_.find(dst_relative) == backuped_file_infos_.end() && !same_path) { // file already exists, but it's not referenced by any backup. overwrite @@ -1481,10 +1566,24 @@ Status BackupEngineImpl::AddBackupFileWorkItem( "%s already present, calculate checksum", fname.c_str()); s = CalculateChecksum(src_dir + fname, db_env_, src_env_options, size_limit, &checksum_value); + if (!s.ok()) { + return s; + } + // try to get the db identities as they are also members of + // the class CopyOrCreateResult + if (UseNewNaming()) { + assert(IsSstFile(fname)); + ROCKS_LOG_INFO(options_.info_log, + "%s checksum checksum calculated, try to obtain DB " + "session identity", + fname.c_str()); + GetFileDbIdentities(db_env_, src_dir + fname, &db_id, &db_session_id); + } } } live_dst_paths.insert(final_dest_path); + // Step 3: Add work item if (!contents.empty() || need_to_copy) { ROCKS_LOG_INFO(options_.info_log, "Copying %s to %s", fname.c_str(), copy_dest_path->c_str()); @@ -1507,6 +1606,8 @@ Status BackupEngineImpl::AddBackupFileWorkItem( result.status = s; result.size = size_bytes; result.checksum_value = checksum_value; + result.db_id = db_id; + result.db_session_id = db_session_id; promise_result.set_value(std::move(result)); } return s; @@ -1551,6 +1652,48 @@ Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env, return s; } +Status BackupEngineImpl::GetFileDbIdentities(Env* src_env, + const std::string& file_path, + std::string* db_id, + std::string* db_session_id) { + // Prepare the full_path of file_path under src_env for SstFileDumper + std::string full_path; + src_env->GetAbsolutePath(file_path, &full_path); + + SstFileDumper sst_reader(Options(), full_path, + 2 * 1024 * 1024 + /* readahead_size */, + false /* verify_checksum */, false /* output_hex */, + false /* decode_blob_index */, true /* silent */); + + const TableProperties* table_properties = nullptr; + std::shared_ptr tp; + Status s = sst_reader.getStatus(); + + if (s.ok()) { + // Try to get table properties from the table reader of sst_reader + if (!sst_reader.ReadTableProperties(&tp).ok()) { + // Try to use table properites from the initialization of sst_reader + table_properties = sst_reader.GetInitTableProperties(); + } else { + table_properties = tp.get(); + } + } else { + return s; + } + + if (table_properties != nullptr) { + db_id->assign(table_properties->db_id); + db_session_id->assign(table_properties->db_session_id); + if (db_session_id->empty()) { + return Status::NotFound("DB session identity not found in " + file_path); + } + return Status::OK(); + } else { + return Status::Corruption("Table properties missing in " + file_path); + } +} + void BackupEngineImpl::DeleteChildren(const std::string& dir, uint32_t file_type_filter) { std::vector children; @@ -1814,6 +1957,8 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile( for (uint32_t i = 0; s.ok() && i < num_files; ++i) { auto line = GetSliceUntil(&data, '\n'); + // filename is relative, i.e., shared/number.sst, + // shared_checksum/number.sst, or private/backup_id/number.sst std::string filename = GetSliceUntil(&line, ' ').ToString(); uint64_t size; diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index 1f60fe79f..217e27676 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -1283,6 +1283,144 @@ TEST_F(BackupableDBTest, ShareTableFilesWithChecksumsTransition) { } } +// Verify backup and restore with share_files_with_checksum and +// new_naming_for_backup_files on +TEST_F(BackupableDBTest, ShareTableFilesWithChecksumsNewNaming) { + // Use session id in the name of SST file backup + ASSERT_TRUE(backupable_options_->new_naming_for_backup_files); + const int keys_iteration = 5000; + OpenDBAndBackupEngine(true, false, kShareWithChecksum); + for (int i = 0; i < 5; ++i) { + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), !!(i % 2))); + } + CloseDBAndBackupEngine(); + + for (int i = 0; i < 5; ++i) { + AssertBackupConsistency(i + 1, 0, keys_iteration * (i + 1), + keys_iteration * 6); + } +} + +// Verify backup and restore with share_files_with_checksum off and then +// transition this option and new_naming_for_backup_files to be on +TEST_F(BackupableDBTest, ShareTableFilesWithChecksumsNewNamingTransition) { + const int keys_iteration = 5000; + // We may set new_naming_for_backup_files to false here + // but even if it is true, it should have no effect when + // share_files_with_checksum is false + ASSERT_TRUE(backupable_options_->new_naming_for_backup_files); + // set share_files_with_checksum to false + OpenDBAndBackupEngine(true, false, kShareNoChecksum); + for (int i = 0; i < 5; ++i) { + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + } + CloseDBAndBackupEngine(); + + for (int i = 0; i < 5; ++i) { + AssertBackupConsistency(i + 1, 0, keys_iteration * (i + 1), + keys_iteration * 6); + } + + // set share_files_with_checksum to true and do some more backups + // and use session id in the name of SST file backup + ASSERT_TRUE(backupable_options_->new_naming_for_backup_files); + OpenDBAndBackupEngine(false /* destroy_old_data */, false, + kShareWithChecksum); + for (int i = 5; i < 10; ++i) { + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + } + CloseDBAndBackupEngine(); + + // Verify first (about to delete) + AssertBackupConsistency(1, 0, keys_iteration, keys_iteration * 11); + + // For an extra challenge, make sure that GarbageCollect / DeleteBackup + // is OK even if we open without share_table_files but with + // new_naming_for_backup_files on + ASSERT_TRUE(backupable_options_->new_naming_for_backup_files); + OpenDBAndBackupEngine(false /* destroy_old_data */, false, kNoShare); + backup_engine_->DeleteBackup(1); + backup_engine_->GarbageCollect(); + CloseDBAndBackupEngine(); + + // Verify second (about to delete) + AssertBackupConsistency(2, 0, keys_iteration * 2, keys_iteration * 11); + + // Turn off new_naming_for_backup_files and open without share_table_files + // Again, make sure that GarbageCollect / DeleteBackup is OK + backupable_options_->new_naming_for_backup_files = false; + OpenDBAndBackupEngine(false /* destroy_old_data */, false, kNoShare); + backup_engine_->DeleteBackup(2); + backup_engine_->GarbageCollect(); + CloseDBAndBackupEngine(); + + // Verify rest (not deleted) + for (int i = 1; i < 9; ++i) { + AssertBackupConsistency(i + 2, 0, keys_iteration * (i + 2), + keys_iteration * 11); + } +} + +// Verify backup and restore with share_files_with_checksum on but +// new_naming_for_backup_files off, then transition new_naming_for_backup_files +// to be on +TEST_F(BackupableDBTest, ShareTableFilesWithChecksumsNewNamingUpgrade) { + backupable_options_->new_naming_for_backup_files = false; + const int keys_iteration = 5000; + // set share_files_with_checksum to true + OpenDBAndBackupEngine(true, false, kShareWithChecksum); + for (int i = 0; i < 5; ++i) { + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + } + CloseDBAndBackupEngine(); + + for (int i = 0; i < 5; ++i) { + AssertBackupConsistency(i + 1, 0, keys_iteration * (i + 1), + keys_iteration * 6); + } + + // set new_naming_for_backup_files to true and do some more backups + backupable_options_->new_naming_for_backup_files = true; + OpenDBAndBackupEngine(false /* destroy_old_data */, false, + kShareWithChecksum); + for (int i = 5; i < 10; ++i) { + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + } + CloseDBAndBackupEngine(); + + // Verify first (about to delete) + AssertBackupConsistency(1, 0, keys_iteration, keys_iteration * 11); + + // For an extra challenge, make sure that GarbageCollect / DeleteBackup + // is OK even if we open without share_table_files + OpenDBAndBackupEngine(false /* destroy_old_data */, false, kNoShare); + backup_engine_->DeleteBackup(1); + backup_engine_->GarbageCollect(); + CloseDBAndBackupEngine(); + + // Verify second (about to delete) + AssertBackupConsistency(2, 0, keys_iteration * 2, keys_iteration * 11); + + // Turn off new_naming_for_backup_files and open without share_table_files + // Again, make sure that GarbageCollect / DeleteBackup is OK + backupable_options_->new_naming_for_backup_files = false; + OpenDBAndBackupEngine(false /* destroy_old_data */, false, kNoShare); + backup_engine_->DeleteBackup(2); + backup_engine_->GarbageCollect(); + CloseDBAndBackupEngine(); + + // Verify rest (not deleted) + for (int i = 2; i < 10; ++i) { + AssertBackupConsistency(i + 1, 0, keys_iteration * (i + 1), + keys_iteration * 11); + } +} + // This test simulates cleaning up after aborted or incomplete creation // of a new backup. TEST_F(BackupableDBTest, DeleteTmpFiles) {