diff --git a/db/repair.cc b/db/repair.cc index f676fa4d9..6e4f0e372 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -74,15 +74,16 @@ #include "db/memtable.h" #include "db/table_cache.h" #include "db/version_edit.h" -#include "db/writebuffer.h" #include "db/write_batch_internal.h" +#include "db/writebuffer.h" #include "rocksdb/comparator.h" #include "rocksdb/db.h" #include "rocksdb/env.h" -#include "rocksdb/options.h" #include "rocksdb/immutable_options.h" +#include "rocksdb/options.h" #include "table/scoped_arena_iterator.h" #include "util/file_reader_writer.h" +#include "util/string_util.h" namespace rocksdb { @@ -90,35 +91,60 @@ namespace { class Repairer { public: - Repairer(const std::string& dbname, const Options& options) + Repairer(const std::string& dbname, const DBOptions& db_options, + const std::vector& column_families, + const ColumnFamilyOptions& default_cf_opts, + const ColumnFamilyOptions& unknown_cf_opts, bool create_unknown_cfs) : dbname_(dbname), - env_(options.env), - icmp_(options.comparator), - cf_options_(options), - options_(SanitizeOptions(dbname, &icmp_, options)), - ioptions_(options_), + env_(db_options.env), env_options_(), + db_options_(SanitizeOptions(dbname_, db_options)), + icmp_(default_cf_opts.comparator), + default_cf_opts_(default_cf_opts), + default_cf_iopts_( + ImmutableCFOptions(Options(db_options_, default_cf_opts))), + unknown_cf_opts_(unknown_cf_opts), + create_unknown_cfs_(create_unknown_cfs), raw_table_cache_( // TableCache can be small since we expect each table to be opened // once. - NewLRUCache(10, options_.table_cache_numshardbits)), - table_cache_( - new TableCache(ioptions_, env_options_, raw_table_cache_.get())), - wb_(options_.db_write_buffer_size), - wc_(options_.delayed_write_rate), - vset_(dbname_, &options_, env_options_, raw_table_cache_.get(), &wb_, + NewLRUCache(10, db_options_.table_cache_numshardbits)), + table_cache_(new TableCache(default_cf_iopts_, env_options_, + raw_table_cache_.get())), + wb_(db_options_.db_write_buffer_size), + wc_(db_options_.delayed_write_rate), + vset_(dbname_, &db_options_, env_options_, raw_table_cache_.get(), &wb_, &wc_), next_file_number_(1) { - GetIntTblPropCollectorFactory(options, &int_tbl_prop_collector_factories_); + for (const auto& cfd : column_families) { + cf_name_to_opts_[cfd.name] = cfd.options; + } + } + + const ColumnFamilyOptions* GetColumnFamilyOptions( + const std::string& cf_name) { + if (cf_name_to_opts_.find(cf_name) == cf_name_to_opts_.end()) { + if (create_unknown_cfs_) { + return &unknown_cf_opts_; + } + return nullptr; + } + return &cf_name_to_opts_[cf_name]; } // Adds a column family to the VersionSet with cf_options_ and updates // manifest. Status AddColumnFamily(const std::string& cf_name, uint32_t cf_id) { - MutableCFOptions mut_cf_opts(options_, ioptions_); + const auto* cf_opts = GetColumnFamilyOptions(cf_name); + if (cf_opts == nullptr) { + return Status::Corruption("Encountered unknown column family with name=" + + cf_name + ", id=" + ToString(cf_id)); + } + Options opts(db_options_, *cf_opts); + MutableCFOptions mut_cf_opts(opts, ImmutableCFOptions(opts)); VersionEdit edit; - edit.SetComparatorName(icmp_.user_comparator()->Name()); + edit.SetComparatorName(opts.comparator->Name()); edit.SetLogNumber(0); edit.SetColumnFamily(cf_id); ColumnFamilyData* cfd; @@ -126,9 +152,9 @@ class Repairer { edit.AddColumnFamily(cf_name); mutex_.Lock(); - Status status = vset_.LogAndApply( - cfd, mut_cf_opts, &edit, &mutex_, nullptr /* db_directory */, - false /* new_descriptor_log */, &cf_options_); + Status status = vset_.LogAndApply(cfd, mut_cf_opts, &edit, &mutex_, + nullptr /* db_directory */, + false /* new_descriptor_log */, cf_opts); mutex_.Unlock(); return status; } @@ -145,13 +171,14 @@ class Repairer { ArchiveFile(dbname_ + "/" + manifests_[i]); } // Just create a DBImpl temporarily so we can reuse NewDB() - DBImpl* db_impl = new DBImpl(options_, dbname_); + DBImpl* db_impl = new DBImpl(db_options_, dbname_); status = db_impl->NewDB(); delete db_impl; } if (status.ok()) { // Recover using the fresh manifest created by NewDB() - status = vset_.Recover({{kDefaultColumnFamilyName, cf_options_}}, false); + status = + vset_.Recover({{kDefaultColumnFamilyName, default_cf_opts_}}, false); } if (status.ok()) { // Need to scan existing SST files first so the column families are @@ -171,7 +198,7 @@ class Repairer { for (size_t i = 0; i < tables_.size(); i++) { bytes += tables_[i].meta.fd.GetFileSize(); } - Log(InfoLogLevel::WARN_LEVEL, options_.info_log, + Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, "**** Repaired rocksdb %s; " "recovered %" ROCKSDB_PRIszt " files; %" PRIu64 "bytes. " @@ -193,18 +220,19 @@ class Repairer { std::string const dbname_; Env* const env_; - const InternalKeyComparator icmp_; - std::vector> - int_tbl_prop_collector_factories_; - const ColumnFamilyOptions cf_options_; // unsanitized - const Options options_; // sanitized - const ImmutableCFOptions ioptions_; // sanitized const EnvOptions env_options_; + const DBOptions db_options_; + const InternalKeyComparator icmp_; + const ColumnFamilyOptions default_cf_opts_; + const ImmutableCFOptions default_cf_iopts_; // table_cache_ holds reference + const ColumnFamilyOptions unknown_cf_opts_; + const bool create_unknown_cfs_; std::shared_ptr raw_table_cache_; TableCache* table_cache_; WriteBuffer wb_; WriteController wc_; VersionSet vset_; + std::unordered_map cf_name_to_opts_; InstrumentedMutex mutex_; std::vector manifests_; @@ -216,9 +244,9 @@ class Repairer { Status FindFiles() { std::vector filenames; bool found_file = false; - for (size_t path_id = 0; path_id < options_.db_paths.size(); path_id++) { + for (size_t path_id = 0; path_id < db_options_.db_paths.size(); path_id++) { Status status = - env_->GetChildren(options_.db_paths[path_id].path, &filenames); + env_->GetChildren(db_options_.db_paths[path_id].path, &filenames); if (!status.ok()) { return status; } @@ -261,7 +289,7 @@ class Repairer { std::string logname = LogFileName(dbname_, logs_[i]); Status status = ConvertLogToTable(logs_[i]); if (!status.ok()) { - Log(InfoLogLevel::WARN_LEVEL, options_.info_log, + Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, "Log #%" PRIu64 ": ignoring conversion error: %s", logs_[i], status.ToString().c_str()); } @@ -295,13 +323,13 @@ class Repairer { // Create the log reader. LogReporter reporter; reporter.env = env_; - reporter.info_log = options_.info_log; + reporter.info_log = db_options_.info_log; reporter.lognum = log; // We intentionally make log::Reader do checksumming so that // corruptions cause entire commits to be skipped instead of // propagating bad information (like overly large sequence // numbers). - log::Reader reader(options_.info_log, std::move(lfile_reader), &reporter, + log::Reader reader(db_options_.info_log, std::move(lfile_reader), &reporter, true /*enable checksum*/, 0 /*initial_offset*/, log); // Initialize per-column family memtables @@ -327,9 +355,8 @@ class Repairer { if (status.ok()) { counter += WriteBatchInternal::Count(&batch); } else { - Log(InfoLogLevel::WARN_LEVEL, - options_.info_log, "Log #%" PRIu64 ": ignoring %s", log, - status.ToString().c_str()); + Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, + "Log #%" PRIu64 ": ignoring %s", log, status.ToString().c_str()); status = Status::OK(); // Keep going with rest of file } } @@ -356,7 +383,7 @@ class Repairer { cfd->GetID(), cfd->GetName(), {}, kMaxSequenceNumber, kNoCompression, CompressionOptions(), false, nullptr /* internal_stats */, TableFileCreationReason::kRecovery); - Log(InfoLogLevel::INFO_LEVEL, options_.info_log, + Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s", log, counter, meta.fd.GetNumber(), status.ToString().c_str()); if (status.ok()) { @@ -378,13 +405,12 @@ class Repairer { Status status = ScanTable(&t); if (!status.ok()) { std::string fname = TableFileName( - options_.db_paths, t.meta.fd.GetNumber(), t.meta.fd.GetPathId()); + db_options_.db_paths, t.meta.fd.GetNumber(), t.meta.fd.GetPathId()); char file_num_buf[kFormatFileNumberBufSize]; FormatFileNumber(t.meta.fd.GetNumber(), t.meta.fd.GetPathId(), file_num_buf, sizeof(file_num_buf)); - Log(InfoLogLevel::WARN_LEVEL, options_.info_log, - "Table #%s: ignoring %s", file_num_buf, - status.ToString().c_str()); + Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, + "Table #%s: ignoring %s", file_num_buf, status.ToString().c_str()); ArchiveFile(fname); } else { tables_.push_back(t); @@ -393,8 +419,8 @@ class Repairer { } Status ScanTable(TableInfo* t) { - std::string fname = TableFileName(options_.db_paths, t->meta.fd.GetNumber(), - t->meta.fd.GetPathId()); + std::string fname = TableFileName( + db_options_.db_paths, t->meta.fd.GetNumber(), t->meta.fd.GetPathId()); int counter = 0; uint64_t file_size; Status status = env_->GetFileSize(fname, &file_size); @@ -409,7 +435,7 @@ class Repairer { t->column_family_id = static_cast(props->column_family_id); if (t->column_family_id == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) { - Log(InfoLogLevel::WARN_LEVEL, options_.info_log, + Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, "Table #%" PRIu64 ": column family unknown (probably due to legacy format); " "adding to default column family id 0.", @@ -427,7 +453,7 @@ class Repairer { if (status.ok()) { cfd = vset_.GetColumnFamilySet()->GetColumnFamily(t->column_family_id); if (cfd->GetName() != props->column_family_name) { - Log(InfoLogLevel::ERROR_LEVEL, options_.info_log, + Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, "Table #%" PRIu64 ": inconsistent column family name '%s'; expected '%s' for column " "family id %" PRIu32 ".", @@ -446,7 +472,7 @@ class Repairer { for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { Slice key = iter->key(); if (!ParseInternalKey(key, &parsed)) { - Log(InfoLogLevel::ERROR_LEVEL, options_.info_log, + Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, "Table #%" PRIu64 ": unparsable key %s", t->meta.fd.GetNumber(), EscapeString(key).c_str()); continue; @@ -470,7 +496,7 @@ class Repairer { } delete iter; - Log(InfoLogLevel::INFO_LEVEL, options_.info_log, + Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "Table #%" PRIu64 ": %d entries %s", t->meta.fd.GetNumber(), counter, status.ToString().c_str()); } @@ -532,15 +558,60 @@ class Repairer { new_file.append("/"); new_file.append((slash == nullptr) ? fname.c_str() : slash + 1); Status s = env_->RenameFile(fname, new_file); - Log(InfoLogLevel::INFO_LEVEL, - options_.info_log, "Archiving %s: %s\n", + Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "Archiving %s: %s\n", fname.c_str(), s.ToString().c_str()); } }; + +Status GetDefaultCFOptions( + const std::vector& column_families, + ColumnFamilyOptions* res) { + assert(res != nullptr); + auto iter = std::find_if(column_families.begin(), column_families.end(), + [](const ColumnFamilyDescriptor& cfd) { + return cfd.name == kDefaultColumnFamilyName; + }); + if (iter == column_families.end()) { + return Status::InvalidArgument( + "column_families", "Must contain entry for default column family"); + } + *res = iter->options; + return Status::OK(); +} } // anonymous namespace +Status RepairDB(const std::string& dbname, const DBOptions& db_options, + const std::vector& column_families) { + ColumnFamilyOptions default_cf_opts; + Status status = GetDefaultCFOptions(column_families, &default_cf_opts); + if (status.ok()) { + Repairer repairer(dbname, db_options, column_families, default_cf_opts, + ColumnFamilyOptions() /* unknown_cf_opts */, + false /* create_unknown_cfs */); + status = repairer.Run(); + } + return status; +} + +Status RepairDB(const std::string& dbname, const DBOptions& db_options, + const std::vector& column_families, + const ColumnFamilyOptions& unknown_cf_opts) { + ColumnFamilyOptions default_cf_opts; + Status status = GetDefaultCFOptions(column_families, &default_cf_opts); + if (status.ok()) { + Repairer repairer(dbname, db_options, column_families, default_cf_opts, + unknown_cf_opts, true /* create_unknown_cfs */); + status = repairer.Run(); + } + return status; +} + Status RepairDB(const std::string& dbname, const Options& options) { - Repairer repairer(dbname, options); + DBOptions db_options(options); + ColumnFamilyOptions cf_options(options); + Repairer repairer(dbname, db_options, {}, cf_options /* default_cf_opts */, + cf_options /* unknown_cf_opts */, + true /* create_unknown_cfs */); return repairer.Run(); } diff --git a/db/repair_test.cc b/db/repair_test.cc index 86bda9053..93e5113b3 100644 --- a/db/repair_test.cc +++ b/db/repair_test.cc @@ -9,6 +9,7 @@ #include "db/db_impl.h" #include "db/db_test_util.h" +#include "rocksdb/comparator.h" #include "rocksdb/db.h" #include "rocksdb/transaction_log.h" #include "util/file_util.h" @@ -208,6 +209,65 @@ TEST_F(RepairTest, RepairMultipleColumnFamilies) { } } +TEST_F(RepairTest, RepairColumnFamilyOptions) { + // Verify repair logic uses correct ColumnFamilyOptions when repairing a + // database with different options for column families. + const int kNumCfs = 2; + const int kEntriesPerCf = 2; + + Options opts(CurrentOptions()), rev_opts(CurrentOptions()); + opts.comparator = BytewiseComparator(); + rev_opts.comparator = ReverseBytewiseComparator(); + + DestroyAndReopen(opts); + CreateColumnFamilies({"reverse"}, rev_opts); + ReopenWithColumnFamilies({"default", "reverse"}, + std::vector{opts, rev_opts}); + for (int i = 0; i < kNumCfs; ++i) { + for (int j = 0; j < kEntriesPerCf; ++j) { + Put(i, "key" + ToString(j), "val" + ToString(j)); + if (i == kNumCfs - 1 && j == kEntriesPerCf - 1) { + // Leave one unflushed so we can verify RepairDB's flush logic + continue; + } + Flush(i); + } + } + Close(); + + // RepairDB() records the comparator in the manifest, and DB::Open would fail + // if a different comparator were used. + ASSERT_OK(RepairDB(dbname_, opts, {{"default", opts}, {"reverse", rev_opts}}, + opts /* unknown_cf_opts */)); + ASSERT_OK(TryReopenWithColumnFamilies({"default", "reverse"}, + std::vector{opts, rev_opts})); + for (int i = 0; i < kNumCfs; ++i) { + for (int j = 0; j < kEntriesPerCf; ++j) { + ASSERT_EQ(Get(i, "key" + ToString(j)), "val" + ToString(j)); + } + } + + // Examine table properties to verify RepairDB() used the right options when + // converting WAL->SST + TablePropertiesCollection fname_to_props; + db_->GetPropertiesOfAllTables(handles_[1], &fname_to_props); + ASSERT_EQ(fname_to_props.size(), 2U); + for (const auto& fname_and_props : fname_to_props) { + ASSERT_EQ(InternalKeyComparator(rev_opts.comparator).Name(), + fname_and_props.second->comparator_name); + } + + // Also check comparator when it's provided via "unknown" CF options + ASSERT_OK(RepairDB(dbname_, opts, {{"default", opts}}, + rev_opts /* unknown_cf_opts */)); + ASSERT_OK(TryReopenWithColumnFamilies({"default", "reverse"}, + std::vector{opts, rev_opts})); + for (int i = 0; i < kNumCfs; ++i) { + for (int j = 0; j < kEntriesPerCf; ++j) { + ASSERT_EQ(Get(i, "key" + ToString(j)), "val" + ToString(j)); + } + } +} } // namespace rocksdb int main(int argc, char** argv) { diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 5338f88fd..e644303b0 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -859,7 +859,24 @@ Status DestroyDB(const std::string& name, const Options& options); // resurrect as much of the contents of the database as possible. // Some data may be lost, so be careful when calling this function // on a database that contains important information. +// +// With this API, we will warn and skip data associated with column families not +// specified in column_families. +// +// @param column_families Descriptors for known column families +Status RepairDB(const std::string& dbname, const DBOptions& db_options, + const std::vector& column_families); + +// @param unknown_cf_opts Options for column families encountered during the +// repair that were not specified in column_families. +Status RepairDB(const std::string& dbname, const DBOptions& db_options, + const std::vector& column_families, + const ColumnFamilyOptions& unknown_cf_opts); + +// @param options These options will be used for the database and for ALL column +// families encountered during the repair Status RepairDB(const std::string& dbname, const Options& options); + #endif } // namespace rocksdb