diff --git a/db/corruption_test.cc b/db/corruption_test.cc index 86d79f887..846ad5402 100644 --- a/db/corruption_test.cc +++ b/db/corruption_test.cc @@ -32,6 +32,7 @@ #include "table/mock_table.h" #include "test_util/testharness.h" #include "test_util/testutil.h" +#include "util/cast_util.h" #include "util/random.h" #include "util/string_util.h" @@ -552,7 +553,7 @@ TEST_F(CorruptionTest, FileSystemStateCorrupted) { DBImpl* dbi = static_cast_with_check(db_); std::vector metadata; dbi->GetLiveFilesMetaData(&metadata); - ASSERT_GT(metadata.size(), size_t(0)); + ASSERT_GT(metadata.size(), 0); std::string filename = dbname_ + metadata[0].name; delete db_; @@ -568,7 +569,7 @@ TEST_F(CorruptionTest, FileSystemStateCorrupted) { } else { // delete the file ASSERT_OK(env_.DeleteFile(filename)); Status x = TryReopen(&options); - ASSERT_TRUE(x.IsPathNotFound()); + ASSERT_TRUE(x.IsCorruption()); } ASSERT_OK(DestroyDB(dbname_, options_)); diff --git a/db/version_edit.h b/db/version_edit.h index 9193a8369..fed9613e3 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -533,8 +533,11 @@ class VersionEdit { private: friend class ReactiveVersionSet; + friend class VersionEditHandlerBase; + friend class ListColumnFamiliesHandler; friend class VersionEditHandler; friend class VersionEditHandlerPointInTime; + friend class DumpManifestHandler; friend class VersionSet; friend class Version; friend class AtomicGroupReadBuffer; diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index c5924f065..44ec792ae 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -9,28 +9,14 @@ #include "db/version_edit_handler.h" +#include + #include "monitoring/persistent_stats_history.h" namespace ROCKSDB_NAMESPACE { -VersionEditHandler::VersionEditHandler( - bool read_only, const std::vector& column_families, - VersionSet* version_set, bool track_missing_files, - bool no_error_if_table_files_missing, - const std::shared_ptr& io_tracer) - : read_only_(read_only), - column_families_(column_families), - status_(), - version_set_(version_set), - track_missing_files_(track_missing_files), - no_error_if_table_files_missing_(no_error_if_table_files_missing), - initialized_(false), - io_tracer_(io_tracer) { - assert(version_set_ != nullptr); -} - -void VersionEditHandler::Iterate(log::Reader& reader, Status* log_read_status, - std::string* db_id) { +void VersionEditHandlerBase::Iterate(log::Reader& reader, + Status* log_read_status) { Slice record; std::string scratch; assert(log_read_status); @@ -38,19 +24,14 @@ void VersionEditHandler::Iterate(log::Reader& reader, Status* log_read_status, size_t recovered_edits = 0; Status s = Initialize(); - while (s.ok() && reader.ReadRecord(&record, &scratch) && - log_read_status->ok()) { + while (reader.LastRecordEnd() < max_manifest_read_size_ && s.ok() && + reader.ReadRecord(&record, &scratch) && log_read_status->ok()) { VersionEdit edit; s = edit.DecodeFrom(record); if (!s.ok()) { break; } - if (edit.has_db_id_) { - version_set_->db_id_ = edit.GetDbId(); - if (db_id != nullptr) { - *db_id = version_set_->db_id_; - } - } + s = read_buffer_.AddEdit(&edit); if (!s.ok()) { break; @@ -81,11 +62,67 @@ void VersionEditHandler::Iterate(log::Reader& reader, Status* log_read_status, s = *log_read_status; } + read_buffer_.Clear(); + CheckIterationResult(reader, &s); if (!s.ok()) { status_ = s; } + TEST_SYNC_POINT_CALLBACK("VersionEditHandlerBase::Iterate:Finish", + &recovered_edits); +} + +Status ListColumnFamiliesHandler::ApplyVersionEdit( + VersionEdit& edit, ColumnFamilyData** /*unused*/) { + Status s; + if (edit.is_column_family_add_) { + if (column_family_names_.find(edit.column_family_) != + column_family_names_.end()) { + s = Status::Corruption("Manifest adding the same column family twice"); + } else { + column_family_names_.insert( + {edit.column_family_, edit.column_family_name_}); + } + } else if (edit.is_column_family_drop_) { + if (column_family_names_.find(edit.column_family_) == + column_family_names_.end()) { + s = Status::Corruption("Manifest - dropping non-existing column family"); + } else { + column_family_names_.erase(edit.column_family_); + } + } + return s; +} + +Status FileChecksumRetriever::ApplyVersionEdit(VersionEdit& edit, + ColumnFamilyData** /*unused*/) { + for (const auto& deleted_file : edit.GetDeletedFiles()) { + file_checksum_list_.RemoveOneFileChecksum(deleted_file.second); + } + for (const auto& new_file : edit.GetNewFiles()) { + file_checksum_list_.InsertOneFileChecksum( + new_file.second.fd.GetNumber(), new_file.second.file_checksum, + new_file.second.file_checksum_func_name); + } + return Status::OK(); +} + +VersionEditHandler::VersionEditHandler( + bool read_only, const std::vector& column_families, + VersionSet* version_set, bool track_missing_files, + bool no_error_if_table_files_missing, + const std::shared_ptr& io_tracer, bool skip_load_table_files) + : VersionEditHandlerBase(), + read_only_(read_only), + column_families_(column_families), + version_set_(version_set), + track_missing_files_(track_missing_files), + no_error_if_table_files_missing_(no_error_if_table_files_missing), + io_tracer_(io_tracer), + skip_load_table_files_(skip_load_table_files), + initialized_(false) { + assert(version_set_ != nullptr); } Status VersionEditHandler::Initialize() { @@ -274,7 +311,7 @@ void VersionEditHandler::CheckIterationResult(const log::Reader& reader, Status* s) { assert(s != nullptr); if (!s->ok()) { - read_buffer_.Clear(); + // Do nothing here. } else if (!version_edit_params_.has_log_number_ || !version_edit_params_.has_next_file_number_ || !version_edit_params_.has_last_sequence_) { @@ -292,6 +329,8 @@ void VersionEditHandler::CheckIterationResult(const log::Reader& reader, msg.append(" entry in MANIFEST"); *s = Status::Corruption(msg); } + // There were some column families in the MANIFEST that weren't specified + // in the argument. This is OK in read_only mode if (s->ok() && !read_only_ && !column_families_not_found_.empty()) { std::string msg; for (const auto& cf : column_families_not_found_) { @@ -330,6 +369,10 @@ void VersionEditHandler::CheckIterationResult(const log::Reader& reader, *s = LoadTables(cfd, /*prefetch_index_and_filter_in_cache=*/false, /*is_initial_load=*/true); if (!s->ok()) { + // If s is IOError::PathNotFound, then we mark the db as corrupted. + if (s->IsPathNotFound()) { + *s = Status::Corruption("Corruption: " + s->ToString()); + } break; } } @@ -426,6 +469,9 @@ Status VersionEditHandler::MaybeCreateVersion(const VersionEdit& /*edit*/, Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd, bool prefetch_index_and_filter_in_cache, bool is_initial_load) { + if (skip_load_table_files_) { + return Status::OK(); + } assert(cfd != nullptr); assert(!cfd->IsDropped()); auto builder_iter = builders_.find(cfd->GetID()); @@ -452,10 +498,11 @@ Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd, Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, const VersionEdit& edit) { Status s; + if (edit.has_db_id_) { + version_set_->db_id_ = edit.GetDbId(); + version_edit_params_.SetDBId(edit.db_id_); + } if (cfd != nullptr) { - if (edit.has_db_id_) { - version_edit_params_.SetDBId(edit.db_id_); - } if (edit.has_log_number_) { if (cfd->GetLogNumber() > edit.log_number_) { ROCKS_LOG_WARN( @@ -505,8 +552,7 @@ VersionEditHandlerPointInTime::VersionEditHandlerPointInTime( VersionSet* version_set, const std::shared_ptr& io_tracer) : VersionEditHandler(read_only, column_families, version_set, /*track_missing_files=*/true, - /*no_error_if_table_files_missing=*/true, io_tracer), - io_tracer_(io_tracer) {} + /*no_error_if_table_files_missing=*/true, io_tracer) {} VersionEditHandlerPointInTime::~VersionEditHandlerPointInTime() { for (const auto& elem : versions_) { @@ -612,4 +658,32 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion( return s; } +void DumpManifestHandler::CheckIterationResult(const log::Reader& reader, + Status* s) { + VersionEditHandler::CheckIterationResult(reader, s); + if (!s->ok()) { + fprintf(stdout, "%s\n", s->ToString().c_str()); + return; + } + for (auto* cfd : *(version_set_->column_family_set_)) { + fprintf(stdout, + "--------------- Column family \"%s\" (ID %" PRIu32 + ") --------------\n", + cfd->GetName().c_str(), cfd->GetID()); + fprintf(stdout, "log number: %" PRIu64 "\n", cfd->GetLogNumber()); + fprintf(stdout, "comparator: %s\n", cfd->user_comparator()->Name()); + assert(cfd->current()); + fprintf(stdout, "%s \n", cfd->current()->DebugString(hex_).c_str()); + } + fprintf(stdout, + "next_file_number %" PRIu64 " last_sequence %" PRIu64 + " prev_log_number %" PRIu64 " max_column_family %" PRIu32 + " min_log_number_to_keep " + "%" PRIu64 "\n", + version_set_->current_next_file_number(), + version_set_->LastSequence(), version_set_->prev_log_number(), + version_set_->column_family_set_->GetMaxColumnFamily(), + version_set_->min_log_number_to_keep_2pc()); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_edit_handler.h b/db/version_edit_handler.h index 4faf7d651..113869c3c 100644 --- a/db/version_edit_handler.h +++ b/db/version_edit_handler.h @@ -15,7 +15,73 @@ namespace ROCKSDB_NAMESPACE { -typedef std::unique_ptr VersionBuilderUPtr; +class VersionEditHandlerBase { + public: + explicit VersionEditHandlerBase() + : max_manifest_read_size_(std::numeric_limits::max()) {} + + virtual ~VersionEditHandlerBase() {} + + void Iterate(log::Reader& reader, Status* log_read_status); + + const Status& status() const { return status_; } + + protected: + explicit VersionEditHandlerBase(uint64_t max_read_size) + : max_manifest_read_size_(max_read_size) {} + virtual Status Initialize() { return Status::OK(); } + + virtual Status ApplyVersionEdit(VersionEdit& edit, + ColumnFamilyData** cfd) = 0; + + virtual void CheckIterationResult(const log::Reader& /*reader*/, + Status* /*s*/) {} + + Status status_; + + private: + AtomicGroupReadBuffer read_buffer_; + const uint64_t max_manifest_read_size_; +}; + +class ListColumnFamiliesHandler : public VersionEditHandlerBase { + public: + ListColumnFamiliesHandler() : VersionEditHandlerBase() {} + + ~ListColumnFamiliesHandler() override {} + + const std::map GetColumnFamilyNames() const { + return column_family_names_; + } + + protected: + Status ApplyVersionEdit(VersionEdit& edit, + ColumnFamilyData** /*unused*/) override; + + private: + // default column family is always implicitly there + std::map column_family_names_{ + {0, kDefaultColumnFamilyName}}; +}; + +class FileChecksumRetriever : public VersionEditHandlerBase { + public: + FileChecksumRetriever(uint64_t max_read_size, + FileChecksumList& file_checksum_list) + : VersionEditHandlerBase(max_read_size), + file_checksum_list_(file_checksum_list) {} + + ~FileChecksumRetriever() override {} + + protected: + Status ApplyVersionEdit(VersionEdit& edit, + ColumnFamilyData** /*unused*/) override; + + private: + FileChecksumList& file_checksum_list_; +}; + +using VersionBuilderUPtr = std::unique_ptr; // A class used for scanning MANIFEST file. // VersionEditHandler reads a MANIFEST file, parses the version edits, and @@ -24,31 +90,48 @@ typedef std::unique_ptr VersionBuilderUPtr; // To use this class and its subclasses, // 1. Create an object of VersionEditHandler or its subclasses. // VersionEditHandler handler(read_only, column_families, version_set, -// track_missing_files, ignore_missing_files); +// track_missing_files, +// no_error_if_table_files_missing); // 2. Status s = handler.Iterate(reader, &db_id); // 3. Check s and handle possible errors. // // Not thread-safe, external synchronization is necessary if an object of // VersionEditHandler is shared by multiple threads. -class VersionEditHandler { +class VersionEditHandler : public VersionEditHandlerBase { public: explicit VersionEditHandler( bool read_only, const std::vector& column_families, VersionSet* version_set, bool track_missing_files, - bool ignore_missing_files, const std::shared_ptr& io_tracer); - - virtual ~VersionEditHandler() {} + bool no_error_if_table_files_missing, + const std::shared_ptr& io_tracer) + : VersionEditHandler(read_only, column_families, version_set, + track_missing_files, no_error_if_table_files_missing, + io_tracer, /*skip_load_table_files=*/false) {} - void Iterate(log::Reader& reader, Status* log_read_status, - std::string* db_id); + ~VersionEditHandler() override {} - const Status& status() const { return status_; } + const VersionEditParams& GetVersionEditParams() const { + return version_edit_params_; + } bool HasMissingFiles() const; + void GetDbId(std::string* db_id) const { + if (db_id && version_edit_params_.has_db_id_) { + *db_id = version_edit_params_.db_id_; + } + } + protected: - Status ApplyVersionEdit(VersionEdit& edit, ColumnFamilyData** cfd); + explicit VersionEditHandler( + bool read_only, + const std::vector& column_families, + VersionSet* version_set, bool track_missing_files, + bool no_error_if_table_files_missing, + const std::shared_ptr& io_tracer, bool skip_load_table_files); + + Status ApplyVersionEdit(VersionEdit& edit, ColumnFamilyData** cfd) override; Status OnColumnFamilyAdd(VersionEdit& edit, ColumnFamilyData** cfd); @@ -60,12 +143,12 @@ class VersionEditHandler { Status OnWalDeletion(VersionEdit& edit); - Status Initialize(); + Status Initialize() override; void CheckColumnFamilyId(const VersionEdit& edit, bool* cf_in_not_found, bool* cf_in_builders) const; - virtual void CheckIterationResult(const log::Reader& reader, Status* s); + void CheckIterationResult(const log::Reader& reader, Status* s) override; ColumnFamilyData* CreateCfAndInit(const ColumnFamilyOptions& cf_options, const VersionEdit& edit); @@ -82,24 +165,26 @@ class VersionEditHandler { const bool read_only_; const std::vector& column_families_; - Status status_; VersionSet* version_set_; - AtomicGroupReadBuffer read_buffer_; std::unordered_map builders_; std::unordered_map name_to_options_; + // Keeps track of column families in manifest that were not found in + // column families parameters. if those column families are not dropped + // by subsequent manifest records, Recover() will return failure status. std::unordered_map column_families_not_found_; VersionEditParams version_edit_params_; const bool track_missing_files_; std::unordered_map> cf_to_missing_files_; bool no_error_if_table_files_missing_; + std::shared_ptr io_tracer_; + bool skip_load_table_files_; private: Status ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, const VersionEdit& edit); bool initialized_; - std::shared_ptr io_tracer_; }; // A class similar to its base class, i.e. VersionEditHandler. @@ -124,7 +209,44 @@ class VersionEditHandlerPointInTime : public VersionEditHandler { private: std::unordered_map versions_; - std::shared_ptr io_tracer_; +}; + +class DumpManifestHandler : public VersionEditHandler { + public: + DumpManifestHandler( + const std::vector& column_families, + VersionSet* version_set, const std::shared_ptr& io_tracer, + bool verbose, bool hex, bool json) + : VersionEditHandler( + /*read_only=*/true, column_families, version_set, + /*track_missing_files=*/false, + /*no_error_if_table_files_missing=*/false, io_tracer, + /*skip_load_table_files=*/true), + verbose_(verbose), + hex_(hex), + json_(json), + count_(0) {} + + ~DumpManifestHandler() override {} + + Status ApplyVersionEdit(VersionEdit& edit, ColumnFamilyData** cfd) override { + // Write out each individual edit + if (verbose_ && !json_) { + fprintf(stdout, "%s\n", edit.DebugString(hex_).c_str()); + } else if (json_) { + fprintf(stdout, "%s\n", edit.DebugJSON(count_, hex_).c_str()); + } + ++count_; + return VersionEditHandler::ApplyVersionEdit(edit, cfd); + } + + void CheckIterationResult(const log::Reader& reader, Status* s) override; + + private: + const bool verbose_; + const bool hex_; + const bool json_; + int count_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_set.cc b/db/version_set.cc index fda3155e5..cffc5979d 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -4713,15 +4713,6 @@ Status VersionSet::ReadAndRecover( Status VersionSet::Recover( const std::vector& column_families, bool read_only, std::string* db_id) { - std::unordered_map cf_name_to_options; - for (const auto& cf : column_families) { - cf_name_to_options.emplace(cf.name, cf.options); - } - // keeps track of column families in manifest that were not found in - // column families parameters. if those column families are not dropped - // by subsequent manifest records, Recover() will return failure status - std::unordered_map column_families_not_found; - // Read "CURRENT" file, which contains a pointer to the current manifest file std::string manifest_path; Status s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path, @@ -4746,139 +4737,30 @@ Status VersionSet::Recover( new SequentialFileReader(std::move(manifest_file), manifest_path, db_options_->log_readahead_size, io_tracer_)); } - - VersionBuilderMap builders; - - // add default column family - auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName); - if (default_cf_iter == cf_name_to_options.end()) { - return Status::InvalidArgument("Default column family not specified"); - } - VersionEdit default_cf_edit; - default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName); - default_cf_edit.SetColumnFamily(0); - ColumnFamilyData* default_cfd = - CreateColumnFamily(default_cf_iter->second, &default_cf_edit); - // In recovery, nobody else can access it, so it's fine to set it to be - // initialized earlier. - default_cfd->set_initialized(); - builders.insert( - std::make_pair(0, std::unique_ptr( - new BaseReferencedVersionBuilder(default_cfd)))); uint64_t current_manifest_file_size = 0; - VersionEditParams version_edit_params; + uint64_t log_number = 0; { VersionSet::LogReporter reporter; Status log_read_status; reporter.status = &log_read_status; log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter, true /* checksum */, 0 /* log_number */); - AtomicGroupReadBuffer read_buffer; - s = ReadAndRecover(reader, &read_buffer, cf_name_to_options, - column_families_not_found, builders, &log_read_status, - &version_edit_params, db_id); - current_manifest_file_size = reader.GetReadOffset(); - assert(current_manifest_file_size != 0); - } - - if (s.ok()) { - if (!version_edit_params.has_next_file_number_) { - s = Status::Corruption("no meta-nextfile entry in descriptor"); - } else if (!version_edit_params.has_log_number_) { - s = Status::Corruption("no meta-lognumber entry in descriptor"); - } else if (!version_edit_params.has_last_sequence_) { - s = Status::Corruption("no last-sequence-number entry in descriptor"); - } - - if (!version_edit_params.has_prev_log_number_) { - version_edit_params.SetPrevLogNumber(0); - } - - column_family_set_->UpdateMaxColumnFamily( - version_edit_params.max_column_family_); - - // When reading DB generated using old release, min_log_number_to_keep=0. - // All log files will be scanned for potential prepare entries. - MarkMinLogNumberToKeep2PC(version_edit_params.min_log_number_to_keep_); - MarkFileNumberUsed(version_edit_params.prev_log_number_); - MarkFileNumberUsed(version_edit_params.log_number_); - } - - // there were some column families in the MANIFEST that weren't specified - // in the argument. This is OK in read_only mode - if (read_only == false && !column_families_not_found.empty()) { - std::string list_of_not_found; - for (const auto& cf : column_families_not_found) { - list_of_not_found += ", " + cf.second; - } - list_of_not_found = list_of_not_found.substr(2); - s = Status::InvalidArgument( - "You have to open all column families. Column families not opened: " + - list_of_not_found); - } - - if (s.ok()) { - for (auto cfd : *column_family_set_) { - assert(builders.count(cfd->GetID()) > 0); - auto* builder = builders[cfd->GetID()]->version_builder(); - if (!builder->CheckConsistencyForNumLevels()) { - s = Status::InvalidArgument( - "db has more levels than options.num_levels"); - break; - } + VersionEditHandler handler( + read_only, column_families, const_cast(this), + /*track_missing_files=*/false, + /*no_error_if_table_files_missing=*/false, io_tracer_); + handler.Iterate(reader, &log_read_status); + s = handler.status(); + if (s.ok()) { + log_number = handler.GetVersionEditParams().log_number_; + current_manifest_file_size = reader.GetReadOffset(); + assert(current_manifest_file_size != 0); + handler.GetDbId(db_id); } } if (s.ok()) { - for (auto cfd : *column_family_set_) { - if (cfd->IsDropped()) { - continue; - } - if (read_only) { - cfd->table_cache()->SetTablesAreImmortal(); - } - assert(cfd->initialized()); - auto builders_iter = builders.find(cfd->GetID()); - assert(builders_iter != builders.end()); - auto builder = builders_iter->second->version_builder(); - - // unlimited table cache. Pre-load table handle now. - // Need to do it out of the mutex. - s = builder->LoadTableHandlers( - cfd->internal_stats(), db_options_->max_file_opening_threads, - false /* prefetch_index_and_filter_in_cache */, - true /* is_initial_load */, - cfd->GetLatestMutableCFOptions()->prefix_extractor.get(), - MaxFileSizeForL0MetaPin(*cfd->GetLatestMutableCFOptions())); - if (!s.ok()) { - if (db_options_->paranoid_checks) { - return s; - } - s = Status::OK(); - } - - Version* v = new Version(cfd, this, file_options_, - *cfd->GetLatestMutableCFOptions(), io_tracer_, - current_version_number_++); - s = builder->SaveTo(v->storage_info()); - if (!s.ok()) { - delete v; - return s; - } - - // Install recovered version - v->PrepareApply(*cfd->GetLatestMutableCFOptions(), - !(db_options_->skip_stats_update_on_db_open)); - AppendVersion(cfd, v); - } - manifest_file_size_ = current_manifest_file_size; - next_file_number_.store(version_edit_params.next_file_number_ + 1); - last_allocated_sequence_ = version_edit_params.last_sequence_; - last_published_sequence_ = version_edit_params.last_sequence_; - last_sequence_ = version_edit_params.last_sequence_; - prev_log_number_ = version_edit_params.prev_log_number_; - ROCKS_LOG_INFO( db_options_->info_log, "Recovered from manifest file:%s succeeded," @@ -4887,9 +4769,8 @@ Status VersionSet::Recover( ",prev_log_number is %" PRIu64 ",max_column_family is %" PRIu32 ",min_log_number_to_keep is %" PRIu64 "\n", manifest_path.c_str(), manifest_file_number_, next_file_number_.load(), - last_sequence_.load(), version_edit_params.log_number_, - prev_log_number_, column_family_set_->GetMaxColumnFamily(), - min_log_number_to_keep_2pc()); + last_sequence_.load(), log_number, prev_log_number_, + column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc()); for (auto cfd : *column_family_set_) { if (cfd->IsDropped()) { @@ -5037,7 +4918,9 @@ Status VersionSet::TryRecoverFromOneManifest( VersionEditHandlerPointInTime handler_pit( read_only, column_families, const_cast(this), io_tracer_); - handler_pit.Iterate(reader, &s, db_id); + handler_pit.Iterate(reader, &s); + + handler_pit.GetDbId(db_id); assert(nullptr != has_missing_table_file); *has_missing_table_file = handler_pit.HasMissingFiles(); @@ -5071,48 +4954,23 @@ Status VersionSet::ListColumnFamilies(std::vector* column_families, nullptr /*IOTracer*/)); } - std::map column_family_names; - // default column family is always implicitly there - column_family_names.insert({0, kDefaultColumnFamilyName}); VersionSet::LogReporter reporter; reporter.status = &s; log::Reader reader(nullptr, std::move(file_reader), &reporter, true /* checksum */, 0 /* log_number */); - Slice record; - std::string scratch; - while (reader.ReadRecord(&record, &scratch) && s.ok()) { - VersionEdit edit; - s = edit.DecodeFrom(record); - if (!s.ok()) { - break; - } - if (edit.is_column_family_add_) { - if (column_family_names.find(edit.column_family_) != - column_family_names.end()) { - s = Status::Corruption("Manifest adding the same column family twice"); - break; - } - column_family_names.insert( - {edit.column_family_, edit.column_family_name_}); - } else if (edit.is_column_family_drop_) { - if (column_family_names.find(edit.column_family_) == - column_family_names.end()) { - s = Status::Corruption( - "Manifest - dropping non-existing column family"); - break; - } - column_family_names.erase(edit.column_family_); - } - } + ListColumnFamiliesHandler handler; + handler.Iterate(reader, &s); + + assert(column_families); column_families->clear(); - if (s.ok()) { - for (const auto& iter : column_family_names) { + if (handler.status().ok()) { + for (const auto& iter : handler.GetColumnFamilyNames()) { column_families->push_back(iter.second); } } - return s; + return handler.status(); } #ifndef ROCKSDB_LITE @@ -5271,194 +5129,19 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, std::move(file), dscname, db_options_->log_readahead_size, io_tracer_)); } - bool have_prev_log_number = false; - bool have_next_file = false; - bool have_last_sequence = false; - uint64_t next_file = 0; - uint64_t last_sequence = 0; - uint64_t previous_log_number = 0; - int count = 0; - std::unordered_map comparators; - std::unordered_map> - builders; - - // add default column family - VersionEdit default_cf_edit; - default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName); - default_cf_edit.SetColumnFamily(0); - ColumnFamilyData* default_cfd = - CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit); - builders.insert( - std::make_pair(0, std::unique_ptr( - new BaseReferencedVersionBuilder(default_cfd)))); - + std::vector column_families( + 1, ColumnFamilyDescriptor(kDefaultColumnFamilyName, options)); + DumpManifestHandler handler(column_families, this, io_tracer_, verbose, hex, + json); { VersionSet::LogReporter reporter; reporter.status = &s; log::Reader reader(nullptr, std::move(file_reader), &reporter, true /* checksum */, 0 /* log_number */); - Slice record; - std::string scratch; - while (reader.ReadRecord(&record, &scratch) && s.ok()) { - VersionEdit edit; - s = edit.DecodeFrom(record); - if (!s.ok()) { - break; - } - - // Write out each individual edit - if (verbose && !json) { - printf("%s\n", edit.DebugString(hex).c_str()); - } else if (json) { - printf("%s\n", edit.DebugJSON(count, hex).c_str()); - } - count++; - - bool cf_in_builders = - builders.find(edit.column_family_) != builders.end(); - - if (edit.has_comparator_) { - comparators.insert({edit.column_family_, edit.comparator_}); - } - - ColumnFamilyData* cfd = nullptr; - - if (edit.is_column_family_add_) { - if (cf_in_builders) { - s = Status::Corruption( - "Manifest adding the same column family twice"); - break; - } - cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit); - cfd->set_initialized(); - builders.insert(std::make_pair( - edit.column_family_, std::unique_ptr( - new BaseReferencedVersionBuilder(cfd)))); - } else if (edit.is_column_family_drop_) { - if (!cf_in_builders) { - s = Status::Corruption( - "Manifest - dropping non-existing column family"); - break; - } - auto builder_iter = builders.find(edit.column_family_); - builders.erase(builder_iter); - comparators.erase(edit.column_family_); - cfd = column_family_set_->GetColumnFamily(edit.column_family_); - assert(cfd != nullptr); - cfd->UnrefAndTryDelete(); - cfd = nullptr; - } else { - if (!cf_in_builders) { - s = Status::Corruption( - "Manifest record referencing unknown column family"); - break; - } - - cfd = column_family_set_->GetColumnFamily(edit.column_family_); - // this should never happen since cf_in_builders is true - assert(cfd != nullptr); - - // if it is not column family add or column family drop, - // then it's a file add/delete, which should be forwarded - // to builder - auto builder = builders.find(edit.column_family_); - assert(builder != builders.end()); - s = builder->second->version_builder()->Apply(&edit); - if (!s.ok()) { - break; - } - } - - if (cfd != nullptr && edit.has_log_number_) { - cfd->SetLogNumber(edit.log_number_); - } - - - if (edit.has_prev_log_number_) { - previous_log_number = edit.prev_log_number_; - have_prev_log_number = true; - } - - if (edit.has_next_file_number_) { - next_file = edit.next_file_number_; - have_next_file = true; - } - - if (edit.has_last_sequence_) { - last_sequence = edit.last_sequence_; - have_last_sequence = true; - } - - if (edit.has_max_column_family_) { - column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_); - } - - if (edit.has_min_log_number_to_keep_) { - MarkMinLogNumberToKeep2PC(edit.min_log_number_to_keep_); - } - } - } - file_reader.reset(); - - if (s.ok()) { - if (!have_next_file) { - s = Status::Corruption("no meta-nextfile entry in descriptor"); - printf("no meta-nextfile entry in descriptor"); - } else if (!have_last_sequence) { - printf("no last-sequence-number entry in descriptor"); - s = Status::Corruption("no last-sequence-number entry in descriptor"); - } - - if (!have_prev_log_number) { - previous_log_number = 0; - } + handler.Iterate(reader, &s); } - if (s.ok()) { - for (auto cfd : *column_family_set_) { - if (cfd->IsDropped()) { - continue; - } - auto builders_iter = builders.find(cfd->GetID()); - assert(builders_iter != builders.end()); - auto builder = builders_iter->second->version_builder(); - - Version* v = new Version(cfd, this, file_options_, - *cfd->GetLatestMutableCFOptions(), io_tracer_, - current_version_number_++); - s = builder->SaveTo(v->storage_info()); - v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false); - - printf("--------------- Column family \"%s\" (ID %" PRIu32 - ") --------------\n", - cfd->GetName().c_str(), cfd->GetID()); - printf("log number: %" PRIu64 "\n", cfd->GetLogNumber()); - auto comparator = comparators.find(cfd->GetID()); - if (comparator != comparators.end()) { - printf("comparator: %s\n", comparator->second.c_str()); - } else { - printf("comparator: \n"); - } - printf("%s \n", v->DebugString(hex).c_str()); - delete v; - } - - next_file_number_.store(next_file + 1); - last_allocated_sequence_ = last_sequence; - last_published_sequence_ = last_sequence; - last_sequence_ = last_sequence; - prev_log_number_ = previous_log_number; - - printf("next_file_number %" PRIu64 " last_sequence %" PRIu64 - " prev_log_number %" PRIu64 " max_column_family %" PRIu32 - " min_log_number_to_keep " - "%" PRIu64 "\n", - next_file_number_.load(), last_sequence, previous_log_number, - column_family_set_->GetMaxColumnFamily(), - min_log_number_to_keep_2pc()); - } - - return s; + return handler.status(); } #endif // ROCKSDB_LITE diff --git a/db/version_set.h b/db/version_set.h index 613133610..09e54f6af 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1227,6 +1227,7 @@ class VersionSet { friend class Version; friend class VersionEditHandler; friend class VersionEditHandlerPointInTime; + friend class DumpManifestHandler; friend class DBImpl; friend class DBImplReadOnly; diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 4d54cec92..540885720 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -1750,6 +1750,10 @@ class VersionSetAtomicGroupTest : public VersionSetTestBase, EXPECT_TRUE(first_in_atomic_group_); last_in_atomic_group_ = true; }); + SyncPoint::GetInstance()->SetCallBack( + "VersionEditHandlerBase::Iterate:Finish", [&](void* arg) { + num_recovered_edits_ = *reinterpret_cast(arg); + }); SyncPoint::GetInstance()->SetCallBack( "VersionSet::ReadAndRecover:RecoveredEdits", [&](void* arg) { num_recovered_edits_ = *reinterpret_cast(arg); diff --git a/util/file_checksum_helper.cc b/util/file_checksum_helper.cc index 761307b3e..78ed524cb 100644 --- a/util/file_checksum_helper.cc +++ b/util/file_checksum_helper.cc @@ -13,6 +13,7 @@ #include "db/log_reader.h" #include "db/version_edit.h" +#include "db/version_edit_handler.h" #include "file/sequence_file_reader.h" namespace ROCKSDB_NAMESPACE { @@ -95,7 +96,7 @@ Status GetFileChecksumsFromManifest(Env* src_env, const std::string& abs_path, if (checksum_list == nullptr) { return Status::InvalidArgument("checksum_list is nullptr"); } - + assert(checksum_list); checksum_list->reset(); Status s; @@ -123,32 +124,13 @@ Status GetFileChecksumsFromManifest(Env* src_env, const std::string& abs_path, reporter.status_ptr = &s; log::Reader reader(nullptr, std::move(file_reader), &reporter, true /* checksum */, 0 /* log_number */); - Slice record; - std::string scratch; - while (reader.LastRecordEnd() < manifest_file_size && - reader.ReadRecord(&record, &scratch) && s.ok()) { - VersionEdit edit; - s = edit.DecodeFrom(record); - if (!s.ok()) { - break; - } - - // Remove the deleted files from the checksum_list - for (const auto& deleted_file : edit.GetDeletedFiles()) { - checksum_list->RemoveOneFileChecksum(deleted_file.second); - } - - // Add the new files to the checksum_list - for (const auto& new_file : edit.GetNewFiles()) { - checksum_list->InsertOneFileChecksum( - new_file.second.fd.GetNumber(), new_file.second.file_checksum, - new_file.second.file_checksum_func_name); - } - } - assert(!s.ok() || + FileChecksumRetriever retriever(manifest_file_size, *checksum_list); + retriever.Iterate(reader, &s); + assert(!retriever.status().ok() || manifest_file_size == std::numeric_limits::max() || reader.LastRecordEnd() == manifest_file_size); - return s; + + return retriever.status(); } } // namespace ROCKSDB_NAMESPACE