diff --git a/CMakeLists.txt b/CMakeLists.txt index f2d60fa42..23a068ee7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -565,6 +565,7 @@ set(SOURCES db/trim_history_scheduler.cc db/version_builder.cc db/version_edit.cc + db/version_edit_handler.cc db/version_set.cc db/wal_manager.cc db/write_batch.cc diff --git a/HISTORY.md b/HISTORY.md index 7208b07e9..af2d631cf 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -13,6 +13,7 @@ ### New Features * Basic support for user timestamp in iterator. Seek/SeekToFirst/Next and lower/upper bounds are supported. Reverse iteration is not supported. Merge is not considered. * When file lock failure when the lock is held by the current process, return acquiring time and thread ID in the error message. +* Added a new option, best_efforts_recovery (default: false), to allow database to open in a db dir with missing table files. During best efforts recovery, missing table files are ignored, and database recovers to the most recent state without missing table file. Cross-column-family consistency is not guaranteed even if WAL is enabled. ## 6.8.0 (02/24/2020) ### Java API Changes diff --git a/TARGETS b/TARGETS index d4ea69b00..429566eb6 100644 --- a/TARGETS +++ b/TARGETS @@ -171,6 +171,7 @@ cpp_library( "db/trim_history_scheduler.cc", "db/version_builder.cc", "db/version_edit.cc", + "db/version_edit_handler.cc", "db/version_set.cc", "db/wal_manager.cc", "db/write_batch.cc", diff --git a/db/column_family.cc b/db/column_family.cc index 407e684a1..299c97b79 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -1397,8 +1397,8 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname, const ImmutableDBOptions* db_options, const FileOptions& file_options, Cache* table_cache, - WriteBufferManager* write_buffer_manager, - WriteController* write_controller, + WriteBufferManager* _write_buffer_manager, + WriteController* _write_controller, BlockCacheTracer* const block_cache_tracer) : max_column_family_(0), dummy_cfd_(new ColumnFamilyData( @@ -1410,8 +1410,8 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname, db_options_(db_options), file_options_(file_options), table_cache_(table_cache), - write_buffer_manager_(write_buffer_manager), - write_controller_(write_controller), + write_buffer_manager_(_write_buffer_manager), + write_controller_(_write_controller), block_cache_tracer_(block_cache_tracer) { // initialize linked list dummy_cfd_->prev_ = dummy_cfd_; diff --git a/db/column_family.h b/db/column_family.h index 342f6ba8c..cd2546d72 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -647,8 +647,8 @@ class ColumnFamilySet { ColumnFamilySet(const std::string& dbname, const ImmutableDBOptions* db_options, const FileOptions& file_options, Cache* table_cache, - WriteBufferManager* write_buffer_manager, - WriteController* write_controller, + WriteBufferManager* _write_buffer_manager, + WriteController* _write_controller, BlockCacheTracer* const block_cache_tracer); ~ColumnFamilySet(); @@ -678,6 +678,10 @@ class ColumnFamilySet { Cache* get_table_cache() { return table_cache_; } + WriteBufferManager* write_buffer_manager() { return write_buffer_manager_; } + + WriteController* write_controller() { return write_controller_; } + private: friend class ColumnFamilyData; // helper function that gets called from cfd destructor diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index c33690e27..f67f04425 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -1734,6 +1734,158 @@ TEST_F(DBBasicTest, MultiGetIOBufferOverrun) { keys.data(), values.data(), statuses.data(), true); } +TEST_F(DBBasicTest, IncrementalRecoveryNoCorrupt) { + Options options = CurrentOptions(); + DestroyAndReopen(options); + CreateAndReopenWithCF({"pikachu", "eevee"}, options); + size_t num_cfs = handles_.size(); + ASSERT_EQ(3, num_cfs); + WriteOptions write_opts; + write_opts.disableWAL = true; + for (size_t cf = 0; cf != num_cfs; ++cf) { + for (size_t i = 0; i != 10000; ++i) { + std::string key_str = Key(static_cast(i)); + std::string value_str = std::to_string(cf) + "_" + std::to_string(i); + + ASSERT_OK(Put(static_cast(cf), key_str, value_str)); + if (0 == (i % 1000)) { + ASSERT_OK(Flush(static_cast(cf))); + } + } + } + for (size_t cf = 0; cf != num_cfs; ++cf) { + ASSERT_OK(Flush(static_cast(cf))); + } + Close(); + options.best_efforts_recovery = true; + ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"}, + options); + num_cfs = handles_.size(); + ASSERT_EQ(3, num_cfs); + for (size_t cf = 0; cf != num_cfs; ++cf) { + for (int i = 0; i != 10000; ++i) { + std::string key_str = Key(static_cast(i)); + std::string expected_value_str = + std::to_string(cf) + "_" + std::to_string(i); + ASSERT_EQ(expected_value_str, Get(static_cast(cf), key_str)); + } + } +} + +namespace { +class TableFileListener : public EventListener { + public: + void OnTableFileCreated(const TableFileCreationInfo& info) override { + InstrumentedMutexLock lock(&mutex_); + cf_to_paths_[info.cf_name].push_back(info.file_path); + } + std::vector& GetFiles(const std::string& cf_name) { + InstrumentedMutexLock lock(&mutex_); + return cf_to_paths_[cf_name]; + } + + private: + InstrumentedMutex mutex_; + std::unordered_map> cf_to_paths_; +}; +} // namespace + +TEST_F(DBBasicTest, RecoverWithMissingFiles) { + Options options = CurrentOptions(); + DestroyAndReopen(options); + TableFileListener* listener = new TableFileListener(); + // Disable auto compaction to simplify SST file name tracking. + options.disable_auto_compactions = true; + options.listeners.emplace_back(listener); + CreateAndReopenWithCF({"pikachu", "eevee"}, options); + std::vector all_cf_names = {kDefaultColumnFamilyName, "pikachu", + "eevee"}; + size_t num_cfs = handles_.size(); + ASSERT_EQ(3, num_cfs); + for (size_t cf = 0; cf != num_cfs; ++cf) { + ASSERT_OK(Put(static_cast(cf), "a", "0_value")); + ASSERT_OK(Flush(static_cast(cf))); + ASSERT_OK(Put(static_cast(cf), "b", "0_value")); + ASSERT_OK(Flush(static_cast(cf))); + ASSERT_OK(Put(static_cast(cf), "c", "0_value")); + ASSERT_OK(Flush(static_cast(cf))); + } + + // Delete files + for (size_t i = 0; i < all_cf_names.size(); ++i) { + std::vector& files = listener->GetFiles(all_cf_names[i]); + ASSERT_EQ(3, files.size()); + for (int j = static_cast(files.size() - 1); j >= static_cast(i); + --j) { + ASSERT_OK(env_->DeleteFile(files[j])); + } + } + options.best_efforts_recovery = true; + ReopenWithColumnFamilies(all_cf_names, options); + // Verify data + ReadOptions read_opts; + read_opts.total_order_seek = true; + { + std::unique_ptr iter(db_->NewIterator(read_opts, handles_[0])); + iter->SeekToFirst(); + ASSERT_FALSE(iter->Valid()); + iter.reset(db_->NewIterator(read_opts, handles_[1])); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("a", iter->key()); + iter->Next(); + ASSERT_FALSE(iter->Valid()); + iter.reset(db_->NewIterator(read_opts, handles_[2])); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("a", iter->key()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("b", iter->key()); + iter->Next(); + ASSERT_FALSE(iter->Valid()); + } +} + +TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) { + Options options = CurrentOptions(); + DestroyAndReopen(options); + TableFileListener* listener = new TableFileListener(); + options.listeners.emplace_back(listener); + CreateAndReopenWithCF({"pikachu"}, options); + std::vector kAllCfNames = {kDefaultColumnFamilyName, "pikachu"}; + size_t num_cfs = handles_.size(); + ASSERT_EQ(2, num_cfs); + for (int cf = 0; cf < static_cast(kAllCfNames.size()); ++cf) { + ASSERT_OK(Put(cf, "a", "0_value")); + ASSERT_OK(Flush(cf)); + ASSERT_OK(Put(cf, "b", "0_value")); + } + // Delete files + for (size_t i = 0; i < kAllCfNames.size(); ++i) { + std::vector& files = listener->GetFiles(kAllCfNames[i]); + ASSERT_EQ(1, files.size()); + for (int j = static_cast(files.size() - 1); j >= static_cast(i); + --j) { + ASSERT_OK(env_->DeleteFile(files[j])); + } + } + options.best_efforts_recovery = true; + ReopenWithColumnFamilies(kAllCfNames, options); + // Verify WAL is not applied + ReadOptions read_opts; + read_opts.total_order_seek = true; + std::unique_ptr iter(db_->NewIterator(read_opts, handles_[0])); + iter->SeekToFirst(); + ASSERT_FALSE(iter->Valid()); + iter.reset(db_->NewIterator(read_opts, handles_[1])); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("a", iter->key()); + iter->Next(); + ASSERT_FALSE(iter->Valid()); +} + class DBBasicTestWithParallelIO : public DBTestBase, public testing::WithParamInterface> { diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index a22b20981..978ee8c9c 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1124,6 +1124,17 @@ class DBImpl : public DB { virtual bool OwnTablesAndLogs() const { return true; } + // REQUIRES: db mutex held when calling this function, but the db mutex can + // be released and re-acquired. Db mutex will be held when the function + // returns. + // Currently, this function should be called only in best-efforts recovery + // mode. + // After best-efforts recovery, there may be SST files in db/cf paths that are + // not referenced in the MANIFEST. We delete these SST files. In the + // meantime, we find out the largest file number present in the paths, and + // bump up the version set's next_file_number_ to be 1 + largest_file_number. + Status CleanupFilesAfterRecovery(); + private: friend class DB; friend class ErrorHandler; diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index c5d07dd01..20381eaa9 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -14,6 +14,7 @@ #include "db/event_helpers.h" #include "db/memtable_list.h" #include "file/file_util.h" +#include "file/filename.h" #include "file/sst_file_manager_impl.h" #include "util/autovector.h" @@ -664,4 +665,55 @@ uint64_t PrecomputeMinLogNumberToKeep( return min_log_number_to_keep; } +Status DBImpl::CleanupFilesAfterRecovery() { + mutex_.AssertHeld(); + std::vector paths; + paths.push_back(dbname_); + for (const auto& db_path : immutable_db_options_.db_paths) { + paths.push_back(db_path.path); + } + for (const auto* cfd : *versions_->GetColumnFamilySet()) { + for (const auto& cf_path : cfd->ioptions()->cf_paths) { + paths.push_back(cf_path.path); + } + } + // Dedup paths + std::sort(paths.begin(), paths.end()); + paths.erase(std::unique(paths.begin(), paths.end()), paths.end()); + + uint64_t next_file_number = versions_->current_next_file_number(); + uint64_t largest_file_number = next_file_number; + std::set files_to_delete; + for (const auto& path : paths) { + std::vector files; + env_->GetChildren(path, &files); + for (const auto& fname : files) { + uint64_t number = 0; + FileType type; + if (!ParseFileName(fname, &number, &type)) { + continue; + } + const std::string normalized_fpath = NormalizePath(path + fname); + largest_file_number = std::max(largest_file_number, number); + if (type == kTableFile && number >= next_file_number && + files_to_delete.find(normalized_fpath) == files_to_delete.end()) { + files_to_delete.insert(normalized_fpath); + } + } + } + if (largest_file_number > next_file_number) { + versions_->next_file_number_.store(largest_file_number + 1); + } + mutex_.Unlock(); + Status s; + for (const auto& fname : files_to_delete) { + s = env_->DeleteFile(fname); + if (!s.ok()) { + break; + } + } + mutex_.Lock(); + return s; +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 4318e5b75..09728b6ab 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -252,6 +252,12 @@ Status DBImpl::ValidateOptions(const DBOptions& db_options) { "atomic_flush is incompatible with enable_pipelined_write"); } + // TODO remove this restriction + if (db_options.atomic_flush && db_options.best_efforts_recovery) { + return Status::InvalidArgument( + "atomic_flush is currently incompatible with best-efforts recovery"); + } + return Status::OK(); } @@ -419,7 +425,17 @@ Status DBImpl::Recover( } } assert(db_id_.empty()); - Status s = versions_->Recover(column_families, read_only, &db_id_); + Status s; + bool missing_table_file = false; + if (!immutable_db_options_.best_efforts_recovery) { + s = versions_->Recover(column_families, read_only, &db_id_); + } else { + s = versions_->TryRecover(column_families, read_only, &db_id_, + &missing_table_file); + if (s.ok()) { + s = CleanupFilesAfterRecovery(); + } + } if (!s.ok()) { return s; } @@ -499,7 +515,9 @@ Status DBImpl::Recover( // attention to it in case we are recovering a database // produced by an older version of rocksdb. std::vector filenames; - s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames); + if (!immutable_db_options_.best_efforts_recovery) { + s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames); + } if (s.IsNotFound()) { return Status::InvalidArgument("wal_dir not found", immutable_db_options_.wal_dir); diff --git a/db/version_builder.cc b/db/version_builder.cc index d0ea16f0a..cb4c5a246 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -528,4 +528,26 @@ Status VersionBuilder::LoadTableHandlers( is_initial_load, prefix_extractor); } +BaseReferencedVersionBuilder::BaseReferencedVersionBuilder( + ColumnFamilyData* cfd) + : version_builder_(new VersionBuilder( + cfd->current()->version_set()->file_options(), cfd->table_cache(), + cfd->current()->storage_info(), cfd->ioptions()->info_log)), + version_(cfd->current()) { + version_->Ref(); +} + +BaseReferencedVersionBuilder::BaseReferencedVersionBuilder( + ColumnFamilyData* cfd, Version* v) + : version_builder_(new VersionBuilder( + cfd->current()->version_set()->file_options(), cfd->table_cache(), + v->storage_info(), cfd->ioptions()->info_log)), + version_(v) { + assert(version_ != cfd->current()); +} + +BaseReferencedVersionBuilder::~BaseReferencedVersionBuilder() { + version_->Unref(); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_builder.h b/db/version_builder.h index a2c8e0294..1c75fee6d 100644 --- a/db/version_builder.h +++ b/db/version_builder.h @@ -21,6 +21,8 @@ class VersionStorageInfo; class VersionEdit; struct FileMetaData; class InternalStats; +class Version; +class ColumnFamilyData; // A helper class so we can efficiently apply a whole sequence // of edits to a particular state without creating intermediate @@ -44,5 +46,20 @@ class VersionBuilder { std::unique_ptr rep_; }; +// A wrapper of version builder which references the current version in +// constructor and unref it in the destructor. +// Both of the constructor and destructor need to be called inside DB Mutex. +class BaseReferencedVersionBuilder { + public: + explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd); + BaseReferencedVersionBuilder(ColumnFamilyData* cfd, Version* v); + ~BaseReferencedVersionBuilder(); + VersionBuilder* version_builder() const { return version_builder_.get(); } + + private: + std::unique_ptr version_builder_; + Version* version_; +}; + extern bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b); } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_edit.h b/db/version_edit.h index 221a1efdb..1ea38cf68 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -438,6 +438,8 @@ class VersionEdit { private: friend class ReactiveVersionSet; + friend class VersionEditHandler; + friend class VersionEditHandlerPointInTime; friend class VersionSet; friend class Version; friend class AtomicGroupReadBuffer; diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc new file mode 100644 index 000000000..44e21efa6 --- /dev/null +++ b/db/version_edit_handler.cc @@ -0,0 +1,576 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/version_edit_handler.h" + +#include "monitoring/persistent_stats_history.h" + +namespace ROCKSDB_NAMESPACE { + +VersionEditHandler::VersionEditHandler( + bool read_only, const std::vector& column_families, + VersionSet* version_set, bool track_missing_files, + bool no_error_if_table_files_missing) + : read_only_(read_only), + column_families_(column_families), + status_(), + version_set_(version_set), + track_missing_files_(track_missing_files), + no_error_if_table_files_missing_(no_error_if_table_files_missing), + initialized_(false) { + assert(version_set_ != nullptr); +} + +Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) { + Slice record; + std::string scratch; + size_t recovered_edits = 0; + Status s = Initialize(); + while (reader.ReadRecord(&record, &scratch) && s.ok()) { + VersionEdit edit; + s = edit.DecodeFrom(record); + if (!s.ok()) { + break; + } + if (edit.has_db_id_) { + version_set_->db_id_ = edit.GetDbId(); + if (db_id != nullptr) { + *db_id = version_set_->db_id_; + } + } + s = read_buffer_.AddEdit(&edit); + if (!s.ok()) { + break; + } + ColumnFamilyData* cfd = nullptr; + if (edit.is_in_atomic_group_) { + if (read_buffer_.IsFull()) { + for (auto& e : read_buffer_.replay_buffer()) { + s = ApplyVersionEdit(e, &cfd); + if (!s.ok()) { + break; + } + ++recovered_edits; + } + if (!s.ok()) { + break; + } + read_buffer_.Clear(); + } + } else { + s = ApplyVersionEdit(edit, &cfd); + if (s.ok()) { + ++recovered_edits; + } + } + } + + CheckIterationResult(reader, &s); + + if (!s.ok()) { + status_ = s; + } + return s; +} + +Status VersionEditHandler::Initialize() { + Status s; + if (!initialized_) { + for (const auto& cf_desc : column_families_) { + name_to_options_.emplace(cf_desc.name, cf_desc.options); + } + auto default_cf_iter = name_to_options_.find(kDefaultColumnFamilyName); + if (default_cf_iter == name_to_options_.end()) { + s = Status::InvalidArgument("Default column family not specified"); + } + if (s.ok()) { + VersionEdit default_cf_edit; + default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName); + default_cf_edit.SetColumnFamily(0); + ColumnFamilyData* cfd = + CreateCfAndInit(default_cf_iter->second, default_cf_edit); + assert(cfd != nullptr); +#ifdef NDEBUG + (void)cfd; +#endif + initialized_ = true; + } + } + return s; +} + +Status VersionEditHandler::ApplyVersionEdit(VersionEdit& edit, + ColumnFamilyData** cfd) { + Status s; + if (edit.is_column_family_add_) { + s = OnColumnFamilyAdd(edit, cfd); + } else if (edit.is_column_family_drop_) { + s = OnColumnFamilyDrop(edit, cfd); + } else { + s = OnNonCfOperation(edit, cfd); + } + if (s.ok()) { + assert(cfd != nullptr); + s = ExtractInfoFromVersionEdit(*cfd, edit); + } + return s; +} + +Status VersionEditHandler::OnColumnFamilyAdd(VersionEdit& edit, + ColumnFamilyData** cfd) { + bool cf_in_not_found = false; + bool cf_in_builders = false; + CheckColumnFamilyId(edit, &cf_in_not_found, &cf_in_builders); + + assert(cfd != nullptr); + *cfd = nullptr; + Status s; + if (cf_in_builders || cf_in_not_found) { + s = Status::Corruption("MANIFEST adding the same column family twice: " + + edit.column_family_name_); + } + if (s.ok()) { + auto cf_options = name_to_options_.find(edit.column_family_name_); + // implicitly add persistent_stats column family without requiring user + // to specify + ColumnFamilyData* tmp_cfd = nullptr; + bool is_persistent_stats_column_family = + edit.column_family_name_.compare(kPersistentStatsColumnFamilyName) == 0; + if (cf_options == name_to_options_.end() && + !is_persistent_stats_column_family) { + column_families_not_found_.emplace(edit.column_family_, + edit.column_family_name_); + } else { + if (is_persistent_stats_column_family) { + ColumnFamilyOptions cfo; + OptimizeForPersistentStats(&cfo); + tmp_cfd = CreateCfAndInit(cfo, edit); + } else { + tmp_cfd = CreateCfAndInit(cf_options->second, edit); + } + *cfd = tmp_cfd; + } + } + return s; +} + +Status VersionEditHandler::OnColumnFamilyDrop(VersionEdit& edit, + ColumnFamilyData** cfd) { + bool cf_in_not_found = false; + bool cf_in_builders = false; + CheckColumnFamilyId(edit, &cf_in_not_found, &cf_in_builders); + + assert(cfd != nullptr); + *cfd = nullptr; + ColumnFamilyData* tmp_cfd = nullptr; + Status s; + if (cf_in_builders) { + tmp_cfd = DestroyCfAndCleanup(edit); + } else if (cf_in_not_found) { + column_families_not_found_.erase(edit.column_family_); + } else { + s = Status::Corruption("MANIFEST - dropping non-existing column family"); + } + *cfd = tmp_cfd; + return s; +} + +Status VersionEditHandler::OnNonCfOperation(VersionEdit& edit, + ColumnFamilyData** cfd) { + bool cf_in_not_found = false; + bool cf_in_builders = false; + CheckColumnFamilyId(edit, &cf_in_not_found, &cf_in_builders); + + assert(cfd != nullptr); + *cfd = nullptr; + Status s; + if (!cf_in_not_found) { + if (!cf_in_builders) { + s = Status::Corruption( + "MANIFEST record referencing unknown column family"); + } + ColumnFamilyData* tmp_cfd = nullptr; + if (s.ok()) { + auto builder_iter = builders_.find(edit.column_family_); + assert(builder_iter != builders_.end()); + tmp_cfd = version_set_->GetColumnFamilySet()->GetColumnFamily( + edit.column_family_); + assert(tmp_cfd != nullptr); + s = MaybeCreateVersion(edit, tmp_cfd, /*force_create_version=*/false); + if (s.ok()) { + s = builder_iter->second->version_builder()->Apply(&edit); + } + } + *cfd = tmp_cfd; + } + return s; +} + +// TODO maybe cache the computation result +bool VersionEditHandler::HasMissingFiles() const { + bool ret = false; + for (const auto& elem : cf_to_missing_files_) { + const auto& missing_files = elem.second; + if (!missing_files.empty()) { + ret = true; + break; + } + } + return ret; +} + +void VersionEditHandler::CheckColumnFamilyId(const VersionEdit& edit, + bool* cf_in_not_found, + bool* cf_in_builders) const { + assert(cf_in_not_found != nullptr); + assert(cf_in_builders != nullptr); + // Not found means that user didn't supply that column + // family option AND we encountered column family add + // record. Once we encounter column family drop record, + // we will delete the column family from + // column_families_not_found. + bool in_not_found = column_families_not_found_.find(edit.column_family_) != + column_families_not_found_.end(); + // in builders means that user supplied that column family + // option AND that we encountered column family add record + bool in_builders = builders_.find(edit.column_family_) != builders_.end(); + // They cannot both be true + assert(!(in_not_found && in_builders)); + *cf_in_not_found = in_not_found; + *cf_in_builders = in_builders; +} + +void VersionEditHandler::CheckIterationResult(const log::Reader& reader, + Status* s) { + assert(s != nullptr); + if (!s->ok()) { + read_buffer_.Clear(); + } else if (!version_edit_params_.has_log_number_ || + !version_edit_params_.has_next_file_number_ || + !version_edit_params_.has_last_sequence_) { + std::string msg("no "); + if (!version_edit_params_.has_log_number_) { + msg.append("log_file_number, "); + } + if (!version_edit_params_.has_next_file_number_) { + msg.append("next_file_number, "); + } + if (!version_edit_params_.has_last_sequence_) { + msg.append("last_sequence, "); + } + msg = msg.substr(0, msg.size() - 2); + msg.append(" entry in MANIFEST"); + *s = Status::Corruption(msg); + } + if (s->ok() && !read_only_ && !column_families_not_found_.empty()) { + std::string msg; + for (const auto& cf : column_families_not_found_) { + msg.append(", "); + msg.append(cf.second); + } + msg = msg.substr(2); + *s = Status::InvalidArgument("Column families not opened: " + msg); + } + if (s->ok()) { + version_set_->GetColumnFamilySet()->UpdateMaxColumnFamily( + version_edit_params_.max_column_family_); + version_set_->MarkMinLogNumberToKeep2PC( + version_edit_params_.min_log_number_to_keep_); + version_set_->MarkFileNumberUsed(version_edit_params_.prev_log_number_); + version_set_->MarkFileNumberUsed(version_edit_params_.log_number_); + for (auto* cfd : *(version_set_->GetColumnFamilySet())) { + auto builder_iter = builders_.find(cfd->GetID()); + assert(builder_iter != builders_.end()); + auto* builder = builder_iter->second->version_builder(); + if (!builder->CheckConsistencyForNumLevels()) { + *s = Status::InvalidArgument( + "db has more levels than options.num_levels"); + break; + } + } + } + if (s->ok()) { + for (auto* cfd : *(version_set_->GetColumnFamilySet())) { + if (cfd->IsDropped()) { + continue; + } + if (read_only_) { + cfd->table_cache()->SetTablesAreImmortal(); + } + *s = LoadTables(cfd, /*prefetch_index_and_filter_in_cache=*/false, + /*is_initial_load=*/true); + if (!s->ok()) { + break; + } + } + } + if (s->ok()) { + for (auto* cfd : *(version_set_->column_family_set_)) { + if (cfd->IsDropped()) { + continue; + } + assert(cfd->initialized()); + VersionEdit edit; + *s = MaybeCreateVersion(edit, cfd, /*force_create_version=*/true); + if (!s->ok()) { + break; + } + } + } + if (s->ok()) { + version_set_->manifest_file_size_ = reader.GetReadOffset(); + assert(version_set_->manifest_file_size_ > 0); + version_set_->next_file_number_.store( + version_edit_params_.next_file_number_ + 1); + version_set_->last_allocated_sequence_ = + version_edit_params_.last_sequence_; + version_set_->last_published_sequence_ = + version_edit_params_.last_sequence_; + version_set_->last_sequence_ = version_edit_params_.last_sequence_; + version_set_->prev_log_number_ = version_edit_params_.prev_log_number_; + } +} + +ColumnFamilyData* VersionEditHandler::CreateCfAndInit( + const ColumnFamilyOptions& cf_options, const VersionEdit& edit) { + ColumnFamilyData* cfd = version_set_->CreateColumnFamily(cf_options, &edit); + assert(cfd != nullptr); + cfd->set_initialized(); + assert(builders_.find(edit.column_family_) == builders_.end()); + builders_.emplace(edit.column_family_, + VersionBuilderUPtr(new BaseReferencedVersionBuilder(cfd))); + if (track_missing_files_) { + cf_to_missing_files_.emplace(edit.column_family_, + std::unordered_set()); + } + return cfd; +} + +ColumnFamilyData* VersionEditHandler::DestroyCfAndCleanup( + const VersionEdit& edit) { + auto builder_iter = builders_.find(edit.column_family_); + assert(builder_iter != builders_.end()); + builders_.erase(builder_iter); + if (track_missing_files_) { + auto missing_files_iter = cf_to_missing_files_.find(edit.column_family_); + assert(missing_files_iter != cf_to_missing_files_.end()); + cf_to_missing_files_.erase(missing_files_iter); + } + ColumnFamilyData* ret = + version_set_->GetColumnFamilySet()->GetColumnFamily(edit.column_family_); + assert(ret != nullptr); + if (ret->UnrefAndTryDelete()) { + ret = nullptr; + } else { + assert(false); + } + return ret; +} + +Status VersionEditHandler::MaybeCreateVersion(const VersionEdit& /*edit*/, + ColumnFamilyData* cfd, + bool force_create_version) { + assert(cfd->initialized()); + if (force_create_version) { + auto builder_iter = builders_.find(cfd->GetID()); + assert(builder_iter != builders_.end()); + auto* builder = builder_iter->second->version_builder(); + auto* v = new Version(cfd, version_set_, version_set_->file_options_, + *cfd->GetLatestMutableCFOptions(), + version_set_->current_version_number_++); + builder->SaveTo(v->storage_info()); + // Install new version + v->PrepareApply(*cfd->GetLatestMutableCFOptions(), + !(version_set_->db_options_->skip_stats_update_on_db_open)); + version_set_->AppendVersion(cfd, v); + } + return Status::OK(); +} + +Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd, + bool prefetch_index_and_filter_in_cache, + bool is_initial_load) { + assert(cfd != nullptr); + assert(!cfd->IsDropped()); + Status s; + auto builder_iter = builders_.find(cfd->GetID()); + assert(builder_iter != builders_.end()); + assert(builder_iter->second != nullptr); + VersionBuilder* builder = builder_iter->second->version_builder(); + assert(builder); + s = builder->LoadTableHandlers( + cfd->internal_stats(), + version_set_->db_options_->max_file_opening_threads, + prefetch_index_and_filter_in_cache, is_initial_load, + cfd->GetLatestMutableCFOptions()->prefix_extractor.get()); + if (s.IsPathNotFound() && no_error_if_table_files_missing_) { + s = Status::OK(); + } + if (!s.ok() && !version_set_->db_options_->paranoid_checks) { + s = Status::OK(); + } + return s; +} + +Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, + const VersionEdit& edit) { + Status s; + if (cfd != nullptr) { + if (edit.has_db_id_) { + version_edit_params_.SetDBId(edit.db_id_); + } + if (edit.has_log_number_) { + if (cfd->GetLogNumber() > edit.log_number_) { + ROCKS_LOG_WARN( + version_set_->db_options()->info_log, + "MANIFEST corruption detected, but ignored - Log numbers in " + "records NOT monotonically increasing"); + } else { + cfd->SetLogNumber(edit.log_number_); + version_edit_params_.SetLogNumber(edit.log_number_); + } + } + if (edit.has_comparator_ && + edit.comparator_ != cfd->user_comparator()->Name()) { + s = Status::InvalidArgument( + cfd->user_comparator()->Name(), + "does not match existing comparator " + edit.comparator_); + } + } + + if (s.ok()) { + if (edit.has_prev_log_number_) { + version_edit_params_.SetPrevLogNumber(edit.prev_log_number_); + } + if (edit.has_next_file_number_) { + version_edit_params_.SetNextFile(edit.next_file_number_); + } + if (edit.has_max_column_family_) { + version_edit_params_.SetMaxColumnFamily(edit.max_column_family_); + } + if (edit.has_min_log_number_to_keep_) { + version_edit_params_.min_log_number_to_keep_ = + std::max(version_edit_params_.min_log_number_to_keep_, + edit.min_log_number_to_keep_); + } + if (edit.has_last_sequence_) { + version_edit_params_.SetLastSequence(edit.last_sequence_); + } + if (!version_edit_params_.has_prev_log_number_) { + version_edit_params_.SetPrevLogNumber(0); + } + } + return s; +} + +VersionEditHandlerPointInTime::VersionEditHandlerPointInTime( + bool read_only, const std::vector& column_families, + VersionSet* version_set) + : VersionEditHandler(read_only, column_families, version_set, + /*track_missing_files=*/true, + /*no_error_if_table_files_missing=*/true) {} + +VersionEditHandlerPointInTime::~VersionEditHandlerPointInTime() { + for (const auto& elem : versions_) { + delete elem.second; + } + versions_.clear(); +} + +void VersionEditHandlerPointInTime::CheckIterationResult( + const log::Reader& reader, Status* s) { + VersionEditHandler::CheckIterationResult(reader, s); + assert(s != nullptr); + if (s->ok()) { + for (auto* cfd : *(version_set_->column_family_set_)) { + if (cfd->IsDropped()) { + continue; + } + assert(cfd->initialized()); + auto v_iter = versions_.find(cfd->GetID()); + if (v_iter != versions_.end()) { + assert(v_iter->second != nullptr); + + version_set_->AppendVersion(cfd, v_iter->second); + versions_.erase(v_iter); + } + } + } +} + +ColumnFamilyData* VersionEditHandlerPointInTime::DestroyCfAndCleanup( + const VersionEdit& edit) { + ColumnFamilyData* cfd = VersionEditHandler::DestroyCfAndCleanup(edit); + auto v_iter = versions_.find(edit.column_family_); + if (v_iter != versions_.end()) { + delete v_iter->second; + versions_.erase(v_iter); + } + return cfd; +} + +Status VersionEditHandlerPointInTime::MaybeCreateVersion( + const VersionEdit& edit, ColumnFamilyData* cfd, bool force_create_version) { + assert(cfd != nullptr); + if (!force_create_version) { + assert(edit.column_family_ == cfd->GetID()); + } + auto missing_files_iter = cf_to_missing_files_.find(cfd->GetID()); + assert(missing_files_iter != cf_to_missing_files_.end()); + std::unordered_set& missing_files = missing_files_iter->second; + const bool prev_has_missing_files = !missing_files.empty(); + for (const auto& file : edit.GetDeletedFiles()) { + uint64_t file_num = file.second; + auto fiter = missing_files.find(file_num); + if (fiter != missing_files.end()) { + missing_files.erase(fiter); + } + } + Status s; + for (const auto& elem : edit.GetNewFiles()) { + const FileMetaData& meta = elem.second; + const FileDescriptor& fd = meta.fd; + uint64_t file_num = fd.GetNumber(); + const std::string fpath = + MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_num); + s = version_set_->VerifyFileMetadata(fpath, meta); + if (s.IsPathNotFound() || s.IsNotFound()) { + missing_files.insert(file_num); + s = Status::OK(); + } + } + bool missing_info = !version_edit_params_.has_log_number_ || + !version_edit_params_.has_next_file_number_ || + !version_edit_params_.has_last_sequence_; + + // Create version before apply edit + if (!missing_info && ((!missing_files.empty() && !prev_has_missing_files) || + (missing_files.empty() && force_create_version))) { + auto builder_iter = builders_.find(cfd->GetID()); + assert(builder_iter != builders_.end()); + auto* builder = builder_iter->second->version_builder(); + auto* version = new Version(cfd, version_set_, version_set_->file_options_, + *cfd->GetLatestMutableCFOptions(), + version_set_->current_version_number_++); + builder->SaveTo(version->storage_info()); + version->PrepareApply( + *cfd->GetLatestMutableCFOptions(), + !version_set_->db_options_->skip_stats_update_on_db_open); + auto v_iter = versions_.find(cfd->GetID()); + if (v_iter != versions_.end()) { + delete v_iter->second; + v_iter->second = version; + } else { + versions_.emplace(cfd->GetID(), version); + } + } + return s; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/version_edit_handler.h b/db/version_edit_handler.h new file mode 100644 index 000000000..7df940d6f --- /dev/null +++ b/db/version_edit_handler.h @@ -0,0 +1,123 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include "db/version_builder.h" +#include "db/version_edit.h" +#include "db/version_set.h" + +namespace ROCKSDB_NAMESPACE { + +typedef std::unique_ptr VersionBuilderUPtr; + +// A class used for scanning MANIFEST file. +// VersionEditHandler reads a MANIFEST file, parses the version edits, and +// builds the version set's in-memory state, e.g. the version storage info for +// the versions of column families. +// To use this class and its subclasses, +// 1. Create an object of VersionEditHandler or its subclasses. +// VersionEditHandler handler(read_only, column_families, version_set, +// track_missing_files, ignore_missing_files); +// 2. Status s = handler.Iterate(reader, &db_id); +// 3. Check s and handle possible errors. +// +// Not thread-safe, external synchronization is necessary if an object of +// VersionEditHandler is shared by multiple threads. +class VersionEditHandler { + public: + explicit VersionEditHandler( + bool read_only, + const std::vector& column_families, + VersionSet* version_set, bool track_missing_files, + bool ignore_missing_files); + + virtual ~VersionEditHandler() {} + + Status Iterate(log::Reader& reader, std::string* db_id); + + const Status& status() const { return status_; } + + bool HasMissingFiles() const; + + protected: + Status ApplyVersionEdit(VersionEdit& edit, ColumnFamilyData** cfd); + + Status OnColumnFamilyAdd(VersionEdit& edit, ColumnFamilyData** cfd); + + Status OnColumnFamilyDrop(VersionEdit& edit, ColumnFamilyData** cfd); + + Status OnNonCfOperation(VersionEdit& edit, ColumnFamilyData** cfd); + + Status Initialize(); + + void CheckColumnFamilyId(const VersionEdit& edit, bool* cf_in_not_found, + bool* cf_in_builders) const; + + virtual void CheckIterationResult(const log::Reader& reader, Status* s); + + ColumnFamilyData* CreateCfAndInit(const ColumnFamilyOptions& cf_options, + const VersionEdit& edit); + + virtual ColumnFamilyData* DestroyCfAndCleanup(const VersionEdit& edit); + + virtual Status MaybeCreateVersion(const VersionEdit& edit, + ColumnFamilyData* cfd, + bool force_create_version); + + Status LoadTables(ColumnFamilyData* cfd, + bool prefetch_index_and_filter_in_cache, + bool is_initial_load); + + const bool read_only_; + const std::vector& column_families_; + Status status_; + VersionSet* version_set_; + AtomicGroupReadBuffer read_buffer_; + std::unordered_map builders_; + std::unordered_map name_to_options_; + std::unordered_map column_families_not_found_; + VersionEditParams version_edit_params_; + const bool track_missing_files_; + std::unordered_map> + cf_to_missing_files_; + bool no_error_if_table_files_missing_; + + private: + Status ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, + const VersionEdit& edit); + + bool initialized_; +}; + +// A class similar to its base class, i.e. VersionEditHandler. +// VersionEditHandlerPointInTime restores the versions to the most recent point +// in time such that at this point, the version does not have missing files. +// +// Not thread-safe, external synchronization is necessary if an object of +// VersionEditHandlerPointInTime is shared by multiple threads. +class VersionEditHandlerPointInTime : public VersionEditHandler { + public: + VersionEditHandlerPointInTime( + bool read_only, + const std::vector& column_families, + VersionSet* version_set); + ~VersionEditHandlerPointInTime() override; + + protected: + void CheckIterationResult(const log::Reader& reader, Status* s) override; + ColumnFamilyData* DestroyCfAndCleanup(const VersionEdit& edit) override; + Status MaybeCreateVersion(const VersionEdit& edit, ColumnFamilyData* cfd, + bool force_create_version) override; + + private: + std::unordered_map versions_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/version_set.cc b/db/version_set.cc index 99bc12cba..6753a58d0 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -29,6 +29,7 @@ #include "db/pinned_iterators_manager.h" #include "db/table_cache.h" #include "db/version_builder.h" +#include "db/version_edit_handler.h" #include "file/filename.h" #include "file/random_access_file_reader.h" #include "file/read_write_util.h" @@ -1202,28 +1203,6 @@ void LevelIterator::InitFileIterator(size_t new_file_index) { } } // anonymous namespace -// A wrapper of version builder which references the current version in -// constructor and unref it in the destructor. -// Both of the constructor and destructor need to be called inside DB Mutex. -class BaseReferencedVersionBuilder { - public: - explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd) - : version_builder_(new VersionBuilder( - cfd->current()->version_set()->file_options(), cfd->table_cache(), - cfd->current()->storage_info(), cfd->ioptions()->info_log)), - version_(cfd->current()) { - version_->Ref(); - } - ~BaseReferencedVersionBuilder() { - version_->Unref(); - } - VersionBuilder* version_builder() { return version_builder_.get(); } - - private: - std::unique_ptr version_builder_; - Version* version_; -}; - Status Version::GetTableProperties(std::shared_ptr* tp, const FileMetaData* file_meta, const std::string* fname) const { @@ -3570,6 +3549,33 @@ VersionSet::~VersionSet() { obsolete_files_.clear(); } +void VersionSet::Reset() { + if (column_family_set_) { + Cache* table_cache = column_family_set_->get_table_cache(); + WriteBufferManager* wbm = column_family_set_->write_buffer_manager(); + WriteController* wc = column_family_set_->write_controller(); + column_family_set_.reset(new ColumnFamilySet(dbname_, db_options_, + file_options_, table_cache, + wbm, wc, block_cache_tracer_)); + } + db_id_.clear(); + next_file_number_.store(2); + min_log_number_to_keep_2pc_.store(0); + manifest_file_number_ = 0; + options_file_number_ = 0; + pending_manifest_file_number_ = 0; + last_sequence_.store(0); + last_allocated_sequence_.store(0); + last_published_sequence_.store(0); + prev_log_number_ = 0; + descriptor_log_.reset(); + current_version_number_ = 0; + manifest_writers_.clear(); + manifest_file_size_ = 0; + obsolete_files_.clear(); + obsolete_manifests_.clear(); +} + void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, Version* v) { // compute new compaction score @@ -4452,8 +4458,6 @@ Status VersionSet::Recover( reporter.status = &s; log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter, true /* checksum */, 0 /* log_number */); - Slice record; - std::string scratch; AtomicGroupReadBuffer read_buffer; s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options, column_families_not_found, builders, @@ -4581,6 +4585,159 @@ Status VersionSet::Recover( return s; } +namespace { +class ManifestPicker { + public: + explicit ManifestPicker(const std::string& dbname, FileSystem* fs); + void SeekToFirstManifest(); + // REQUIRES Valid() == true + std::string GetNextManifest(uint64_t* file_number, std::string* file_name); + bool Valid() const { return manifest_file_iter_ != manifest_files_.end(); } + const Status& status() const { return status_; } + + private: + const std::string& dbname_; + FileSystem* const fs_; + // MANIFEST file names(s) + std::vector manifest_files_; + std::vector::const_iterator manifest_file_iter_; + Status status_; +}; + +ManifestPicker::ManifestPicker(const std::string& dbname, FileSystem* fs) + : dbname_(dbname), fs_(fs) {} + +void ManifestPicker::SeekToFirstManifest() { + assert(fs_ != nullptr); + std::vector children; + Status s = fs_->GetChildren(dbname_, IOOptions(), &children, /*dbg=*/nullptr); + if (!s.ok()) { + status_ = s; + return; + } + for (const auto& fname : children) { + uint64_t file_num = 0; + FileType file_type; + bool parse_ok = ParseFileName(fname, &file_num, &file_type); + if (parse_ok && file_type == kDescriptorFile) { + manifest_files_.push_back(fname); + } + } + std::sort(manifest_files_.begin(), manifest_files_.end(), + [](const std::string& lhs, const std::string& rhs) { + uint64_t num1 = 0; + uint64_t num2 = 0; + FileType type1; + FileType type2; + bool parse_ok1 = ParseFileName(lhs, &num1, &type1); + bool parse_ok2 = ParseFileName(rhs, &num2, &type2); +#ifndef NDEBUG + assert(parse_ok1); + assert(parse_ok2); +#else + (void)parse_ok1; + (void)parse_ok2; +#endif + return num1 > num2; + }); + manifest_file_iter_ = manifest_files_.begin(); +} + +std::string ManifestPicker::GetNextManifest(uint64_t* number, + std::string* file_name) { + assert(status_.ok()); + assert(Valid()); + std::string ret; + if (manifest_file_iter_ != manifest_files_.end()) { + ret.assign(dbname_); + if (ret.back() != kFilePathSeparator) { + ret.push_back(kFilePathSeparator); + } + ret.append(*manifest_file_iter_); + if (number) { + FileType type; + bool parse = ParseFileName(*manifest_file_iter_, number, &type); + assert(type == kDescriptorFile); +#ifndef NDEBUG + assert(parse); +#else + (void)parse; +#endif + } + if (file_name) { + *file_name = *manifest_file_iter_; + } + ++manifest_file_iter_; + } + return ret; +} +} // namespace + +Status VersionSet::TryRecover( + const std::vector& column_families, bool read_only, + std::string* db_id, bool* has_missing_table_file) { + ManifestPicker manifest_picker(dbname_, fs_); + manifest_picker.SeekToFirstManifest(); + Status s = manifest_picker.status(); + if (!s.ok()) { + return s; + } + if (!manifest_picker.Valid()) { + return Status::Corruption("Cannot locate MANIFEST file in " + dbname_); + } + std::string manifest_path = + manifest_picker.GetNextManifest(&manifest_file_number_, nullptr); + while (!manifest_path.empty()) { + s = TryRecoverFromOneManifest(manifest_path, column_families, read_only, + db_id, has_missing_table_file); + if (s.ok() || !manifest_picker.Valid()) { + break; + } + Reset(); + manifest_path = + manifest_picker.GetNextManifest(&manifest_file_number_, nullptr); + } + return s; +} + +Status VersionSet::TryRecoverFromOneManifest( + const std::string& manifest_path, + const std::vector& column_families, bool read_only, + std::string* db_id, bool* has_missing_table_file) { + ROCKS_LOG_INFO(db_options_->info_log, "Trying to recover from manifest: %s\n", + manifest_path.c_str()); + std::unique_ptr manifest_file_reader; + Status s; + { + std::unique_ptr manifest_file; + s = fs_->NewSequentialFile(manifest_path, + fs_->OptimizeForManifestRead(file_options_), + &manifest_file, nullptr); + if (!s.ok()) { + return s; + } + manifest_file_reader.reset( + new SequentialFileReader(std::move(manifest_file), manifest_path, + db_options_->log_readahead_size)); + } + + VersionSet::LogReporter reporter; + reporter.status = &s; + log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter, + /*checksum=*/true, /*log_num=*/0); + { + VersionEditHandlerPointInTime handler_pit(read_only, column_families, + const_cast(this)); + + s = handler_pit.Iterate(reader, db_id); + + assert(nullptr != has_missing_table_file); + *has_missing_table_file = handler_pit.HasMissingFiles(); + } + + return s; +} + Status VersionSet::ListColumnFamilies(std::vector* column_families, const std::string& dbname, FileSystem* fs) { @@ -5518,7 +5675,7 @@ void VersionSet::GetObsoleteFiles(std::vector* files, } ColumnFamilyData* VersionSet::CreateColumnFamily( - const ColumnFamilyOptions& cf_options, VersionEdit* edit) { + const ColumnFamilyOptions& cf_options, const VersionEdit* edit) { assert(edit->is_column_family_add_); MutableCFOptions dummy_cf_options; @@ -5573,6 +5730,18 @@ uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) { return total_files_size; } +Status VersionSet::VerifyFileMetadata(const std::string& fpath, + const FileMetaData& meta) const { + uint64_t fsize = 0; + Status status = fs_->GetFileSize(fpath, IOOptions(), &fsize, nullptr); + if (status.ok()) { + if (fsize != meta.fd.GetFileSize()) { + status = Status::Corruption("File size mismatch: " + fpath); + } + } + return status; +} + ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname, const ImmutableDBOptions* _db_options, const FileOptions& _file_options, diff --git a/db/version_set.h b/db/version_set.h index a6629e89d..b33a7c6e8 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -688,6 +688,8 @@ class Version { FileSystem* fs_; friend class ReactiveVersionSet; friend class VersionSet; + friend class VersionEditHandler; + friend class VersionEditHandlerPointInTime; const InternalKeyComparator* internal_comparator() const { return storage_info_.internal_comparator_; @@ -876,6 +878,17 @@ class VersionSet { Status Recover(const std::vector& column_families, bool read_only = false, std::string* db_id = nullptr); + Status TryRecover(const std::vector& column_families, + bool read_only, std::string* db_id, + bool* has_missing_table_file); + + // Try to recover the version set to the most recent consistent state + // recorded in the specified manifest. + Status TryRecoverFromOneManifest( + const std::string& manifest_path, + const std::vector& column_families, + bool read_only, std::string* db_id, bool* has_missing_table_file); + // Reads a manifest file and returns a list of column families in // column_families. static Status ListColumnFamilies(std::vector* column_families, @@ -1059,6 +1072,8 @@ class VersionSet { struct ManifestWriter; friend class Version; + friend class VersionEditHandler; + friend class VersionEditHandlerPointInTime; friend class DBImpl; friend class DBImplReadOnly; @@ -1069,6 +1084,8 @@ class VersionSet { } }; + void Reset(); + // Returns approximated offset of a key in a file for a given version. uint64_t ApproximateOffsetOf(Version* v, const FdWithKeyRange& f, const Slice& key, TableReaderCaller caller); @@ -1091,7 +1108,7 @@ class VersionSet { void AppendVersion(ColumnFamilyData* column_family_data, Version* v); ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& cf_options, - VersionEdit* edit); + const VersionEdit* edit); Status ReadAndRecover( log::Reader* reader, AtomicGroupReadBuffer* read_buffer, @@ -1115,8 +1132,10 @@ class VersionSet { const VersionEdit& from_edit, VersionEditParams* version_edit_params); - std::unique_ptr column_family_set_; + Status VerifyFileMetadata(const std::string& fpath, + const FileMetaData& meta) const; + std::unique_ptr column_family_set_; Env* const env_; FileSystem* const fs_; const std::string dbname_; diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 03e0e26d2..e877273d2 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -10,6 +10,7 @@ #include "db/version_set.h" #include "db/db_impl/db_impl.h" #include "db/log_writer.h" +#include "env/mock_env.h" #include "logging/logging.h" #include "table/mock_table.h" #include "test_util/testharness.h" @@ -611,16 +612,37 @@ class VersionSetTestBase { const static std::string kColumnFamilyName3; int num_initial_edits_; - VersionSetTestBase() - : env_(Env::Default()), - fs_(std::make_shared(env_)), - dbname_(test::PerThreadDBPath("version_set_test")), - db_options_(), + explicit VersionSetTestBase(const std::string& name) + : mem_env_(nullptr), + env_(nullptr), + env_guard_(), + fs_(), + dbname_(test::PerThreadDBPath(name)), + options_(), + db_options_(options_), + cf_options_(options_), + immutable_cf_options_(db_options_, cf_options_), mutable_cf_options_(cf_options_), table_cache_(NewLRUCache(50000, 16)), write_buffer_manager_(db_options_.db_write_buffer_size), shutting_down_(false), mock_table_factory_(std::make_shared()) { + const char* test_env_uri = getenv("TEST_ENV_URI"); + Env* base_env = nullptr; + if (test_env_uri) { + Status s = Env::LoadEnv(test_env_uri, &base_env, &env_guard_); + EXPECT_OK(s); + EXPECT_NE(Env::Default(), base_env); + } else { + base_env = Env::Default(); + } + EXPECT_NE(nullptr, base_env); + if (getenv("MEM_ENV")) { + mem_env_ = new MockEnv(base_env); + } + env_ = mem_env_ ? mem_env_ : base_env; + + fs_ = std::make_shared(env_); EXPECT_OK(env_->CreateDirIfMissing(dbname_)); db_options_.env = env_; @@ -628,23 +650,38 @@ class VersionSetTestBase { versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, &write_controller_, - /*block_cache_tracer=*/nullptr)), - reactive_versions_ = std::make_shared( - dbname_, &db_options_, env_options_, table_cache_.get(), - &write_buffer_manager_, &write_controller_); + /*block_cache_tracer=*/nullptr)); + reactive_versions_ = std::make_shared( + dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_); db_options_.db_paths.emplace_back(dbname_, std::numeric_limits::max()); } - void PrepareManifest(std::vector* column_families, - SequenceNumber* last_seqno, - std::unique_ptr* log_writer) { + virtual ~VersionSetTestBase() { + if (getenv("KEEP_DB")) { + fprintf(stdout, "DB is still at %s\n", dbname_.c_str()); + } else { + Options options; + options.env = env_; + EXPECT_OK(DestroyDB(dbname_, options)); + } + if (mem_env_) { + delete mem_env_; + mem_env_ = nullptr; + } + } + + protected: + virtual void PrepareManifest( + std::vector* column_families, + SequenceNumber* last_seqno, std::unique_ptr* log_writer) { assert(column_families != nullptr); assert(last_seqno != nullptr); assert(log_writer != nullptr); VersionEdit new_db; if (db_options_.write_dbid_to_manifest) { - DBImpl* impl = new DBImpl(DBOptions(), dbname_); + std::unique_ptr impl(new DBImpl(DBOptions(), dbname_)); std::string db_id; impl->GetDbIdentityFromIdentityFile(&db_id); new_db.SetDBId(db_id); @@ -715,12 +752,25 @@ class VersionSetTestBase { versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); } + void VerifyManifest(std::string* manifest_path) const { + assert(manifest_path != nullptr); + uint64_t manifest_file_number = 0; + Status s = versions_->GetCurrentManifestPath( + dbname_, fs_.get(), manifest_path, &manifest_file_number); + ASSERT_OK(s); + ASSERT_EQ(1, manifest_file_number); + } + + MockEnv* mem_env_; Env* env_; + std::shared_ptr env_guard_; std::shared_ptr fs_; const std::string dbname_; EnvOptions env_options_; + Options options_; ImmutableDBOptions db_options_; ColumnFamilyOptions cf_options_; + ImmutableCFOptions immutable_cf_options_; MutableCFOptions mutable_cf_options_; std::shared_ptr table_cache_; WriteController write_controller_; @@ -738,7 +788,7 @@ const std::string VersionSetTestBase::kColumnFamilyName3 = "charles"; class VersionSetTest : public VersionSetTestBase, public testing::Test { public: - VersionSetTest() : VersionSetTestBase() {} + VersionSetTest() : VersionSetTestBase("version_set_test") {} }; TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) { @@ -780,7 +830,8 @@ TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) { class VersionSetAtomicGroupTest : public VersionSetTestBase, public testing::Test { public: - VersionSetAtomicGroupTest() : VersionSetTestBase() {} + VersionSetAtomicGroupTest() + : VersionSetTestBase("version_set_atomic_group_test") {} void SetUp() override { PrepareManifest(&column_families_, &last_seqno_, &log_writer_); @@ -1164,7 +1215,8 @@ TEST_F(VersionSetAtomicGroupTest, class VersionSetTestDropOneCF : public VersionSetTestBase, public testing::TestWithParam { public: - VersionSetTestDropOneCF() : VersionSetTestBase() {} + VersionSetTestDropOneCF() + : VersionSetTestBase("version_set_test_drop_one_cf") {} }; // This test simulates the following execution sequence @@ -1279,6 +1331,706 @@ INSTANTIATE_TEST_CASE_P( testing::Values(VersionSetTestBase::kColumnFamilyName1, VersionSetTestBase::kColumnFamilyName2, VersionSetTestBase::kColumnFamilyName3)); + +class EmptyDefaultCfNewManifest : public VersionSetTestBase, + public testing::Test { + public: + EmptyDefaultCfNewManifest() : VersionSetTestBase("version_set_new_db_test") {} + // Emulate DBImpl::NewDB() + void PrepareManifest(std::vector* /*column_families*/, + SequenceNumber* /*last_seqno*/, + std::unique_ptr* log_writer) override { + assert(log_writer != nullptr); + VersionEdit new_db; + new_db.SetLogNumber(0); + std::unique_ptr file; + const std::string manifest_path = DescriptorFileName(dbname_, 1); + Status s = env_->NewWritableFile( + manifest_path, &file, env_->OptimizeForManifestWrite(env_options_)); + ASSERT_OK(s); + std::unique_ptr file_writer( + new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(file)), + manifest_path, env_options_)); + log_writer->reset(new log::Writer(std::move(file_writer), 0, true)); + std::string record; + ASSERT_TRUE(new_db.EncodeTo(&record)); + s = (*log_writer)->AddRecord(record); + ASSERT_OK(s); + // Create new column family + VersionEdit new_cf; + new_cf.AddColumnFamily(VersionSetTestBase::kColumnFamilyName1); + new_cf.SetColumnFamily(1); + new_cf.SetLastSequence(2); + new_cf.SetNextFile(2); + record.clear(); + ASSERT_TRUE(new_cf.EncodeTo(&record)); + s = (*log_writer)->AddRecord(record); + ASSERT_OK(s); + } + + protected: + bool write_dbid_to_manifest_ = false; + std::unique_ptr log_writer_; +}; + +// Create db, create column family. Cf creation will switch to a new MANIFEST. +// Then reopen db, trying to recover. +TEST_F(EmptyDefaultCfNewManifest, Recover) { + PrepareManifest(nullptr, nullptr, &log_writer_); + log_writer_.reset(); + Status s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr); + ASSERT_OK(s); + std::string manifest_path; + VerifyManifest(&manifest_path); + std::vector column_families; + column_families.emplace_back(kDefaultColumnFamilyName, cf_options_); + column_families.emplace_back(VersionSetTestBase::kColumnFamilyName1, + cf_options_); + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest( + manifest_path, column_families, false, &db_id, &has_missing_table_file); + ASSERT_OK(s); + ASSERT_FALSE(has_missing_table_file); +} + +class VersionSetTestEmptyDb + : public VersionSetTestBase, + public testing::TestWithParam< + std::tuple>> { + public: + static const std::string kUnknownColumnFamilyName; + VersionSetTestEmptyDb() : VersionSetTestBase("version_set_test_empty_db") {} + + protected: + void PrepareManifest(std::vector* /*column_families*/, + SequenceNumber* /*last_seqno*/, + std::unique_ptr* log_writer) override { + assert(nullptr != log_writer); + VersionEdit new_db; + if (db_options_.write_dbid_to_manifest) { + std::unique_ptr impl(new DBImpl(DBOptions(), dbname_)); + std::string db_id; + impl->GetDbIdentityFromIdentityFile(&db_id); + new_db.SetDBId(db_id); + } + const std::string manifest_path = DescriptorFileName(dbname_, 1); + std::unique_ptr file; + Status s = env_->NewWritableFile( + manifest_path, &file, env_->OptimizeForManifestWrite(env_options_)); + ASSERT_OK(s); + std::unique_ptr file_writer( + new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(file)), + manifest_path, env_options_)); + { + log_writer->reset(new log::Writer(std::move(file_writer), 0, false)); + std::string record; + new_db.EncodeTo(&record); + s = (*log_writer)->AddRecord(record); + ASSERT_OK(s); + } + } + + std::unique_ptr log_writer_; +}; + +const std::string VersionSetTestEmptyDb::kUnknownColumnFamilyName = "unknown"; + +TEST_P(VersionSetTestEmptyDb, OpenFromIncompleteManifest0) { + db_options_.write_dbid_to_manifest = std::get<0>(GetParam()); + PrepareManifest(nullptr, nullptr, &log_writer_); + log_writer_.reset(); + Status s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr); + ASSERT_OK(s); + + std::string manifest_path; + VerifyManifest(&manifest_path); + + bool read_only = std::get<1>(GetParam()); + const std::vector cf_names = std::get<2>(GetParam()); + + std::vector column_families; + for (const auto& cf_name : cf_names) { + column_families.emplace_back(cf_name, cf_options_); + } + + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families, + read_only, &db_id, + &has_missing_table_file); + auto iter = + std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName); + if (iter == cf_names.end()) { + ASSERT_TRUE(s.IsInvalidArgument()); + } else { + ASSERT_TRUE(s.IsCorruption()); + } +} + +TEST_P(VersionSetTestEmptyDb, OpenFromIncompleteManifest1) { + db_options_.write_dbid_to_manifest = std::get<0>(GetParam()); + PrepareManifest(nullptr, nullptr, &log_writer_); + // Only a subset of column families in the MANIFEST. + VersionEdit new_cf1; + new_cf1.AddColumnFamily(VersionSetTestBase::kColumnFamilyName1); + new_cf1.SetColumnFamily(1); + Status s; + { + std::string record; + new_cf1.EncodeTo(&record); + s = log_writer_->AddRecord(record); + ASSERT_OK(s); + } + log_writer_.reset(); + s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr); + ASSERT_OK(s); + + std::string manifest_path; + VerifyManifest(&manifest_path); + + bool read_only = std::get<1>(GetParam()); + const std::vector& cf_names = std::get<2>(GetParam()); + std::vector column_families; + for (const auto& cf_name : cf_names) { + column_families.emplace_back(cf_name, cf_options_); + } + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families, + read_only, &db_id, + &has_missing_table_file); + auto iter = + std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName); + if (iter == cf_names.end()) { + ASSERT_TRUE(s.IsInvalidArgument()); + } else { + ASSERT_TRUE(s.IsCorruption()); + } +} + +TEST_P(VersionSetTestEmptyDb, OpenFromInCompleteManifest2) { + db_options_.write_dbid_to_manifest = std::get<0>(GetParam()); + PrepareManifest(nullptr, nullptr, &log_writer_); + // Write all column families but no log_number, next_file_number and + // last_sequence. + const std::vector all_cf_names = { + kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2, + kColumnFamilyName3}; + uint32_t cf_id = 1; + Status s; + for (size_t i = 1; i != all_cf_names.size(); ++i) { + VersionEdit new_cf; + new_cf.AddColumnFamily(all_cf_names[i]); + new_cf.SetColumnFamily(cf_id++); + std::string record; + ASSERT_TRUE(new_cf.EncodeTo(&record)); + s = log_writer_->AddRecord(record); + ASSERT_OK(s); + } + log_writer_.reset(); + s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr); + ASSERT_OK(s); + + std::string manifest_path; + VerifyManifest(&manifest_path); + + bool read_only = std::get<1>(GetParam()); + const std::vector& cf_names = std::get<2>(GetParam()); + std::vector column_families; + for (const auto& cf_name : cf_names) { + column_families.emplace_back(cf_name, cf_options_); + } + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families, + read_only, &db_id, + &has_missing_table_file); + auto iter = + std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName); + if (iter == cf_names.end()) { + ASSERT_TRUE(s.IsInvalidArgument()); + } else { + ASSERT_TRUE(s.IsCorruption()); + } +} + +TEST_P(VersionSetTestEmptyDb, OpenManifestWithUnknownCF) { + db_options_.write_dbid_to_manifest = std::get<0>(GetParam()); + PrepareManifest(nullptr, nullptr, &log_writer_); + // Write all column families but no log_number, next_file_number and + // last_sequence. + const std::vector all_cf_names = { + kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2, + kColumnFamilyName3}; + uint32_t cf_id = 1; + Status s; + for (size_t i = 1; i != all_cf_names.size(); ++i) { + VersionEdit new_cf; + new_cf.AddColumnFamily(all_cf_names[i]); + new_cf.SetColumnFamily(cf_id++); + std::string record; + ASSERT_TRUE(new_cf.EncodeTo(&record)); + s = log_writer_->AddRecord(record); + ASSERT_OK(s); + } + { + VersionEdit tmp_edit; + tmp_edit.SetColumnFamily(4); + tmp_edit.SetLogNumber(0); + tmp_edit.SetNextFile(2); + tmp_edit.SetLastSequence(0); + std::string record; + ASSERT_TRUE(tmp_edit.EncodeTo(&record)); + s = log_writer_->AddRecord(record); + ASSERT_OK(s); + } + log_writer_.reset(); + s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr); + ASSERT_OK(s); + + std::string manifest_path; + VerifyManifest(&manifest_path); + + bool read_only = std::get<1>(GetParam()); + const std::vector& cf_names = std::get<2>(GetParam()); + std::vector column_families; + for (const auto& cf_name : cf_names) { + column_families.emplace_back(cf_name, cf_options_); + } + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families, + read_only, &db_id, + &has_missing_table_file); + auto iter = + std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName); + if (iter == cf_names.end()) { + ASSERT_TRUE(s.IsInvalidArgument()); + } else { + ASSERT_TRUE(s.IsCorruption()); + } +} + +TEST_P(VersionSetTestEmptyDb, OpenCompleteManifest) { + db_options_.write_dbid_to_manifest = std::get<0>(GetParam()); + PrepareManifest(nullptr, nullptr, &log_writer_); + // Write all column families but no log_number, next_file_number and + // last_sequence. + const std::vector all_cf_names = { + kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2, + kColumnFamilyName3}; + uint32_t cf_id = 1; + Status s; + for (size_t i = 1; i != all_cf_names.size(); ++i) { + VersionEdit new_cf; + new_cf.AddColumnFamily(all_cf_names[i]); + new_cf.SetColumnFamily(cf_id++); + std::string record; + ASSERT_TRUE(new_cf.EncodeTo(&record)); + s = log_writer_->AddRecord(record); + ASSERT_OK(s); + } + { + VersionEdit tmp_edit; + tmp_edit.SetLogNumber(0); + tmp_edit.SetNextFile(2); + tmp_edit.SetLastSequence(0); + std::string record; + ASSERT_TRUE(tmp_edit.EncodeTo(&record)); + s = log_writer_->AddRecord(record); + ASSERT_OK(s); + } + log_writer_.reset(); + s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr); + ASSERT_OK(s); + + std::string manifest_path; + VerifyManifest(&manifest_path); + + bool read_only = std::get<1>(GetParam()); + const std::vector& cf_names = std::get<2>(GetParam()); + std::vector column_families; + for (const auto& cf_name : cf_names) { + column_families.emplace_back(cf_name, cf_options_); + } + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families, + read_only, &db_id, + &has_missing_table_file); + auto iter = + std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName); + if (iter == cf_names.end()) { + ASSERT_TRUE(s.IsInvalidArgument()); + } else if (read_only) { + ASSERT_OK(s); + ASSERT_FALSE(has_missing_table_file); + } else if (cf_names.size() == all_cf_names.size()) { + ASSERT_OK(s); + ASSERT_FALSE(has_missing_table_file); + } else if (cf_names.size() < all_cf_names.size()) { + ASSERT_TRUE(s.IsInvalidArgument()); + } else { + ASSERT_OK(s); + ASSERT_FALSE(has_missing_table_file); + ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetColumnFamily( + kUnknownColumnFamilyName); + ASSERT_EQ(nullptr, cfd); + } +} + +INSTANTIATE_TEST_CASE_P( + BestEffortRecovery, VersionSetTestEmptyDb, + testing::Combine( + /*write_dbid_to_manifest=*/testing::Bool(), + /*read_only=*/testing::Bool(), + /*cf_names=*/ + testing::Values( + std::vector(), + std::vector({kDefaultColumnFamilyName}), + std::vector({VersionSetTestBase::kColumnFamilyName1, + VersionSetTestBase::kColumnFamilyName2, + VersionSetTestBase::kColumnFamilyName3}), + std::vector({kDefaultColumnFamilyName, + VersionSetTestBase::kColumnFamilyName1}), + std::vector({kDefaultColumnFamilyName, + VersionSetTestBase::kColumnFamilyName1, + VersionSetTestBase::kColumnFamilyName2, + VersionSetTestBase::kColumnFamilyName3}), + std::vector( + {kDefaultColumnFamilyName, + VersionSetTestBase::kColumnFamilyName1, + VersionSetTestBase::kColumnFamilyName2, + VersionSetTestBase::kColumnFamilyName3, + VersionSetTestEmptyDb::kUnknownColumnFamilyName})))); + +class VersionSetTestMissingFiles : public VersionSetTestBase, + public testing::Test { + public: + VersionSetTestMissingFiles() + : VersionSetTestBase("version_set_test_missing_files"), + block_based_table_options_(), + table_factory_(std::make_shared( + block_based_table_options_)), + internal_comparator_( + std::make_shared(options_.comparator)) {} + + protected: + void PrepareManifest(std::vector* column_families, + SequenceNumber* last_seqno, + std::unique_ptr* log_writer) override { + assert(column_families != nullptr); + assert(last_seqno != nullptr); + assert(log_writer != nullptr); + const std::string manifest = DescriptorFileName(dbname_, 1); + std::unique_ptr file; + Status s = env_->NewWritableFile( + manifest, &file, env_->OptimizeForManifestWrite(env_options_)); + ASSERT_OK(s); + std::unique_ptr file_writer(new WritableFileWriter( + NewLegacyWritableFileWrapper(std::move(file)), manifest, env_options_)); + log_writer->reset(new log::Writer(std::move(file_writer), 0, false)); + VersionEdit new_db; + if (db_options_.write_dbid_to_manifest) { + std::unique_ptr impl(new DBImpl(DBOptions(), dbname_)); + std::string db_id; + impl->GetDbIdentityFromIdentityFile(&db_id); + new_db.SetDBId(db_id); + } + { + std::string record; + ASSERT_TRUE(new_db.EncodeTo(&record)); + s = (*log_writer)->AddRecord(record); + ASSERT_OK(s); + } + const std::vector cf_names = { + kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2, + kColumnFamilyName3}; + uint32_t cf_id = 1; // default cf id is 0 + cf_options_.table_factory = table_factory_; + for (const auto& cf_name : cf_names) { + column_families->emplace_back(cf_name, cf_options_); + if (cf_name == kDefaultColumnFamilyName) { + continue; + } + VersionEdit new_cf; + new_cf.AddColumnFamily(cf_name); + new_cf.SetColumnFamily(cf_id); + std::string record; + ASSERT_TRUE(new_cf.EncodeTo(&record)); + s = (*log_writer)->AddRecord(record); + ASSERT_OK(s); + + VersionEdit cf_files; + cf_files.SetColumnFamily(cf_id); + cf_files.SetLogNumber(0); + record.clear(); + ASSERT_TRUE(cf_files.EncodeTo(&record)); + s = (*log_writer)->AddRecord(record); + ASSERT_OK(s); + ++cf_id; + } + SequenceNumber seq = 2; + { + VersionEdit edit; + edit.SetNextFile(7); + edit.SetLastSequence(seq); + std::string record; + ASSERT_TRUE(edit.EncodeTo(&record)); + s = (*log_writer)->AddRecord(record); + ASSERT_OK(s); + } + *last_seqno = seq + 1; + } + + struct SstInfo { + uint64_t file_number; + std::string column_family; + std::string key; // the only key + int level = 0; + SstInfo(uint64_t file_num, const std::string& cf_name, + const std::string& _key) + : SstInfo(file_num, cf_name, _key, 0) {} + SstInfo(uint64_t file_num, const std::string& cf_name, + const std::string& _key, int lvl) + : file_number(file_num), + column_family(cf_name), + key(_key), + level(lvl) {} + }; + + // Create dummy sst, return their metadata. Note that only file name and size + // are used. + void CreateDummyTableFiles(const std::vector& file_infos, + std::vector* file_metas) { + assert(file_metas != nullptr); + for (const auto& info : file_infos) { + uint64_t file_num = info.file_number; + std::string fname = MakeTableFileName(dbname_, file_num); + std::unique_ptr file; + Status s = fs_->NewWritableFile(fname, FileOptions(), &file, nullptr); + ASSERT_OK(s); + std::unique_ptr fwriter( + new WritableFileWriter(std::move(file), fname, FileOptions(), env_)); + std::vector> + int_tbl_prop_collector_factories; + + std::unique_ptr builder(table_factory_->NewTableBuilder( + TableBuilderOptions( + immutable_cf_options_, mutable_cf_options_, *internal_comparator_, + &int_tbl_prop_collector_factories, kNoCompression, + /*_sample_for_compression=*/0, CompressionOptions(), + /*_skip_filters=*/false, info.column_family, info.level), + TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, + fwriter.get())); + InternalKey ikey(info.key, 0, ValueType::kTypeValue); + builder->Add(ikey.Encode(), "value"); + ASSERT_OK(builder->Finish()); + fwriter->Flush(); + uint64_t file_size = 0; + s = fs_->GetFileSize(fname, IOOptions(), &file_size, nullptr); + ASSERT_OK(s); + ASSERT_NE(0, file_size); + FileMetaData meta; + meta = FileMetaData(file_num, /*file_path_id=*/0, file_size, ikey, ikey, + 0, 0, false, 0, 0, 0, kUnknownFileChecksum, + kUnknownFileChecksumFuncName); + file_metas->emplace_back(meta); + } + } + + // This method updates last_sequence_. + void WriteFileAdditionAndDeletionToManifest( + uint32_t cf, const std::vector>& added_files, + const std::vector>& deleted_files) { + VersionEdit edit; + edit.SetColumnFamily(cf); + for (const auto& elem : added_files) { + int level = elem.first; + edit.AddFile(level, elem.second); + } + for (const auto& elem : deleted_files) { + int level = elem.first; + edit.DeleteFile(level, elem.second); + } + edit.SetLastSequence(last_seqno_); + ++last_seqno_; + assert(log_writer_.get() != nullptr); + std::string record; + ASSERT_TRUE(edit.EncodeTo(&record)); + Status s = log_writer_->AddRecord(record); + ASSERT_OK(s); + } + + BlockBasedTableOptions block_based_table_options_; + std::shared_ptr table_factory_; + std::shared_ptr internal_comparator_; + std::vector column_families_; + SequenceNumber last_seqno_; + std::unique_ptr log_writer_; +}; + +TEST_F(VersionSetTestMissingFiles, ManifestFarBehindSst) { + std::vector existing_files = { + SstInfo(100, kDefaultColumnFamilyName, "a"), + SstInfo(102, kDefaultColumnFamilyName, "b"), + SstInfo(103, kDefaultColumnFamilyName, "c"), + SstInfo(107, kDefaultColumnFamilyName, "d"), + SstInfo(110, kDefaultColumnFamilyName, "e")}; + std::vector file_metas; + CreateDummyTableFiles(existing_files, &file_metas); + + PrepareManifest(&column_families_, &last_seqno_, &log_writer_); + std::vector> added_files; + for (uint64_t file_num = 10; file_num < 15; ++file_num) { + std::string smallest_ukey = "a"; + std::string largest_ukey = "b"; + InternalKey smallest_ikey(smallest_ukey, 1, ValueType::kTypeValue); + InternalKey largest_ikey(largest_ukey, 1, ValueType::kTypeValue); + FileMetaData meta = + FileMetaData(file_num, /*file_path_id=*/0, /*file_size=*/12, + smallest_ikey, largest_ikey, 0, 0, false, 0, 0, 0, + kUnknownFileChecksum, kUnknownFileChecksumFuncName); + added_files.emplace_back(0, meta); + } + WriteFileAdditionAndDeletionToManifest( + /*cf=*/0, added_files, std::vector>()); + std::vector> deleted_files; + deleted_files.emplace_back(0, 10); + WriteFileAdditionAndDeletionToManifest( + /*cf=*/0, std::vector>(), deleted_files); + log_writer_.reset(); + Status s = SetCurrentFile(env_, dbname_, 1, nullptr); + ASSERT_OK(s); + std::string manifest_path; + VerifyManifest(&manifest_path); + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families_, + /*read_only=*/false, &db_id, + &has_missing_table_file); + ASSERT_OK(s); + ASSERT_TRUE(has_missing_table_file); + for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) { + VersionStorageInfo* vstorage = cfd->current()->storage_info(); + const std::vector& files = vstorage->LevelFiles(0); + ASSERT_TRUE(files.empty()); + } +} + +TEST_F(VersionSetTestMissingFiles, ManifestAheadofSst) { + std::vector existing_files = { + SstInfo(100, kDefaultColumnFamilyName, "a"), + SstInfo(102, kDefaultColumnFamilyName, "b"), + SstInfo(103, kDefaultColumnFamilyName, "c"), + SstInfo(107, kDefaultColumnFamilyName, "d"), + SstInfo(110, kDefaultColumnFamilyName, "e")}; + std::vector file_metas; + CreateDummyTableFiles(existing_files, &file_metas); + + PrepareManifest(&column_families_, &last_seqno_, &log_writer_); + std::vector> added_files; + for (size_t i = 3; i != 5; ++i) { + added_files.emplace_back(0, file_metas[i]); + } + WriteFileAdditionAndDeletionToManifest( + /*cf=*/0, added_files, std::vector>()); + + added_files.clear(); + for (uint64_t file_num = 120; file_num < 130; ++file_num) { + std::string smallest_ukey = "a"; + std::string largest_ukey = "b"; + InternalKey smallest_ikey(smallest_ukey, 1, ValueType::kTypeValue); + InternalKey largest_ikey(largest_ukey, 1, ValueType::kTypeValue); + FileMetaData meta = + FileMetaData(file_num, /*file_path_id=*/0, /*file_size=*/12, + smallest_ikey, largest_ikey, 0, 0, false, 0, 0, 0, + kUnknownFileChecksum, kUnknownFileChecksumFuncName); + added_files.emplace_back(0, meta); + } + WriteFileAdditionAndDeletionToManifest( + /*cf=*/0, added_files, std::vector>()); + log_writer_.reset(); + Status s = SetCurrentFile(env_, dbname_, 1, nullptr); + ASSERT_OK(s); + std::string manifest_path; + VerifyManifest(&manifest_path); + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families_, + /*read_only=*/false, &db_id, + &has_missing_table_file); + ASSERT_OK(s); + ASSERT_TRUE(has_missing_table_file); + for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) { + VersionStorageInfo* vstorage = cfd->current()->storage_info(); + const std::vector& files = vstorage->LevelFiles(0); + if (cfd->GetName() == kDefaultColumnFamilyName) { + ASSERT_EQ(2, files.size()); + for (const auto* fmeta : files) { + if (fmeta->fd.GetNumber() != 107 && fmeta->fd.GetNumber() != 110) { + ASSERT_FALSE(true); + } + } + } else { + ASSERT_TRUE(files.empty()); + } + } +} + +TEST_F(VersionSetTestMissingFiles, NoFileMissing) { + std::vector existing_files = { + SstInfo(100, kDefaultColumnFamilyName, "a"), + SstInfo(102, kDefaultColumnFamilyName, "b"), + SstInfo(103, kDefaultColumnFamilyName, "c"), + SstInfo(107, kDefaultColumnFamilyName, "d"), + SstInfo(110, kDefaultColumnFamilyName, "e")}; + std::vector file_metas; + CreateDummyTableFiles(existing_files, &file_metas); + + PrepareManifest(&column_families_, &last_seqno_, &log_writer_); + std::vector> added_files; + for (const auto& meta : file_metas) { + added_files.emplace_back(0, meta); + } + WriteFileAdditionAndDeletionToManifest( + /*cf=*/0, added_files, std::vector>()); + std::vector> deleted_files; + deleted_files.emplace_back(/*level=*/0, 100); + WriteFileAdditionAndDeletionToManifest( + /*cf=*/0, std::vector>(), deleted_files); + log_writer_.reset(); + Status s = SetCurrentFile(env_, dbname_, 1, nullptr); + ASSERT_OK(s); + std::string manifest_path; + VerifyManifest(&manifest_path); + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families_, + /*read_only=*/false, &db_id, + &has_missing_table_file); + ASSERT_OK(s); + ASSERT_FALSE(has_missing_table_file); + for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) { + VersionStorageInfo* vstorage = cfd->current()->storage_info(); + const std::vector& files = vstorage->LevelFiles(0); + if (cfd->GetName() == kDefaultColumnFamilyName) { + ASSERT_EQ(existing_files.size() - deleted_files.size(), files.size()); + bool has_deleted_file = false; + for (const auto* fmeta : files) { + if (fmeta->fd.GetNumber() == 100) { + has_deleted_file = true; + break; + } + } + ASSERT_FALSE(has_deleted_file); + } else { + ASSERT_TRUE(files.empty()); + } + } +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/env/mock_env.cc b/env/mock_env.cc index ab249bbb8..ea63cd16a 100644 --- a/env/mock_env.cc +++ b/env/mock_env.cc @@ -10,6 +10,7 @@ #include "env/mock_env.h" #include #include +#include "file/filename.h" #include "port/sys_time.h" #include "util/cast_util.h" #include "util/murmurhash.h" @@ -747,17 +748,6 @@ Status MockEnv::CorruptBuffer(const std::string& fname) { return Status::OK(); } -std::string MockEnv::NormalizePath(const std::string path) { - std::string dst; - for (auto c : path) { - if (!dst.empty() && c == '/' && dst.back() == '/') { - continue; - } - dst.push_back(c); - } - return dst; -} - void MockEnv::FakeSleepForMicroseconds(int64_t micros) { fake_sleep_micros_.fetch_add(micros); } diff --git a/env/mock_env.h b/env/mock_env.h index 5cd9d499d..1ed5c0b1f 100644 --- a/env/mock_env.h +++ b/env/mock_env.h @@ -106,8 +106,6 @@ class MockEnv : public EnvWrapper { void FakeSleepForMicroseconds(int64_t micros); private: - std::string NormalizePath(const std::string path); - // Map from filenames to MemFile objects, representing a simple file system. typedef std::map FileSystem; port::Mutex mutex_; diff --git a/file/filename.cc b/file/filename.cc index a68e14c90..07cc08369 100644 --- a/file/filename.cc +++ b/file/filename.cc @@ -453,4 +453,16 @@ Status GetInfoLogFiles(Env* env, const std::string& db_log_dir, return Status::OK(); } +std::string NormalizePath(const std::string& path) { + std::string dst; + for (auto c : path) { + if (!dst.empty() && c == kFilePathSeparator && + dst.back() == kFilePathSeparator) { + continue; + } + dst.push_back(c); + } + return dst; +} + } // namespace ROCKSDB_NAMESPACE diff --git a/file/filename.h b/file/filename.h index 201429cf5..9a4be0a53 100644 --- a/file/filename.h +++ b/file/filename.h @@ -29,6 +29,12 @@ class Env; class Directory; class WritableFileWriter; +#ifdef OS_WIN +const char kFilePathSeparator = '\\'; +#else +const char kFilePathSeparator = '/'; +#endif + enum FileType { kLogFile, kDBLockFile, @@ -183,4 +189,6 @@ extern Status GetInfoLogFiles(Env* env, const std::string& db_log_dir, const std::string& dbname, std::string* parent_dir, std::vector* file_names); + +extern std::string NormalizePath(const std::string& path); } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index f5d44fb74..1976cdd1c 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1130,6 +1130,16 @@ struct DBOptions { // // Default: nullptr std::shared_ptr sst_file_checksum_func = nullptr; + + // By default, RocksDB recovery fails if any table file referenced in + // MANIFEST are missing after scanning the MANIFEST. + // Best-efforts recovery is another recovery mode that + // tries to restore the database to the most recent point in time without + // missing file. + // Currently not compatible with atomic flush. Furthermore, WAL files will + // not be used for recovery if best_efforts_recovery is true. + // Default: false + bool best_efforts_recovery = false; }; // Options to control the behavior of a database (passed to DB::Open) diff --git a/options/db_options.cc b/options/db_options.cc index 0dcc8ef79..478897786 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -95,7 +95,8 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) persist_stats_to_disk(options.persist_stats_to_disk), write_dbid_to_manifest(options.write_dbid_to_manifest), log_readahead_size(options.log_readahead_size), - sst_file_checksum_func(options.sst_file_checksum_func) { + sst_file_checksum_func(options.sst_file_checksum_func), + best_efforts_recovery(options.best_efforts_recovery) { } void ImmutableDBOptions::Dump(Logger* log) const { @@ -250,6 +251,8 @@ void ImmutableDBOptions::Dump(Logger* log) const { sst_file_checksum_func ? sst_file_checksum_func->Name() : kUnknownFileChecksumFuncName.c_str()); + ROCKS_LOG_HEADER(log, " Options.best_efforts_recovery: %d", + static_cast(best_efforts_recovery)); } MutableDBOptions::MutableDBOptions() diff --git a/options/db_options.h b/options/db_options.h index cba298a21..3ab6b73eb 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -88,6 +88,7 @@ struct ImmutableDBOptions { bool write_dbid_to_manifest; size_t log_readahead_size; std::shared_ptr sst_file_checksum_func; + bool best_efforts_recovery; }; struct MutableDBOptions { diff --git a/options/options_helper.cc b/options/options_helper.cc index bc36dbcfe..3ebbcee4e 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -145,6 +145,7 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, immutable_db_options.avoid_unnecessary_blocking_io; options.log_readahead_size = immutable_db_options.log_readahead_size; options.sst_file_checksum_func = immutable_db_options.sst_file_checksum_func; + options.best_efforts_recovery = immutable_db_options.best_efforts_recovery; return options; } @@ -1681,6 +1682,9 @@ std::unordered_map {"log_readahead_size", {offsetof(struct DBOptions, log_readahead_size), OptionType::kSizeT, OptionVerificationType::kNormal, false, 0}}, + {"best_efforts_recovery", + {offsetof(struct DBOptions, best_efforts_recovery), + OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}}, }; std::unordered_map diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index c5fee7b3e..057d772bf 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -303,7 +303,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "atomic_flush=false;" "avoid_unnecessary_blocking_io=false;" "log_readahead_size=0;" - "write_dbid_to_manifest=false", + "write_dbid_to_manifest=false;" + "best_efforts_recovery=false", new_options)); ASSERT_EQ(unset_bytes_base, NumUnsetBytes(new_options_ptr, sizeof(DBOptions), diff --git a/src.mk b/src.mk index 89558cf96..dd3218051 100644 --- a/src.mk +++ b/src.mk @@ -59,6 +59,7 @@ LIB_SOURCES = \ db/trim_history_scheduler.cc \ db/version_builder.cc \ db/version_edit.cc \ + db/version_edit_handler.cc \ db/version_set.cc \ db/wal_manager.cc \ db/write_batch.cc \