From 3b7ed677de402f72168db441099b5525697e375f Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Fri, 24 Jun 2016 16:29:43 -0700 Subject: [PATCH] ColumnFamilyOptions API [CF + RepairDB part 3/3] Summary: Overload RepairDB to take vector-of-ColumnFamilyDescriptor, which tells us CF name + options. Also takes a ColumnFamilyOptions for unspecified column families encountered during the repair. One potentially confusing thing is that we store options in the constructor and don't invoke AddColumnFamily() until discovering the CF in ScanTable. This is because we don't know the CF ID until we find a table belonging to that CF. Depends on D59781. Test Plan: $ ./repair_test Reviewers: yhchiang, IslamAbdelRahman, sdong Reviewed By: sdong Subscribers: andrewkr, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D59853 --- db/repair.cc | 173 ++++++++++++++++++++++++++++++------------- db/repair_test.cc | 60 +++++++++++++++ include/rocksdb/db.h | 17 +++++ 3 files changed, 199 insertions(+), 51 deletions(-) diff --git a/db/repair.cc b/db/repair.cc index f676fa4d9..6e4f0e372 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -74,15 +74,16 @@ #include "db/memtable.h" #include "db/table_cache.h" #include "db/version_edit.h" -#include "db/writebuffer.h" #include "db/write_batch_internal.h" +#include "db/writebuffer.h" #include "rocksdb/comparator.h" #include "rocksdb/db.h" #include "rocksdb/env.h" -#include "rocksdb/options.h" #include "rocksdb/immutable_options.h" +#include "rocksdb/options.h" #include "table/scoped_arena_iterator.h" #include "util/file_reader_writer.h" +#include "util/string_util.h" namespace rocksdb { @@ -90,35 +91,60 @@ namespace { class Repairer { public: - Repairer(const std::string& dbname, const Options& options) + Repairer(const std::string& dbname, const DBOptions& db_options, + const std::vector<ColumnFamilyDescriptor>& column_families, + const ColumnFamilyOptions& default_cf_opts, + const ColumnFamilyOptions& unknown_cf_opts, bool create_unknown_cfs) : 
dbname_(dbname), - env_(options.env), - icmp_(options.comparator), - cf_options_(options), - options_(SanitizeOptions(dbname, &icmp_, options)), - ioptions_(options_), + env_(db_options.env), env_options_(), + db_options_(SanitizeOptions(dbname_, db_options)), + icmp_(default_cf_opts.comparator), + default_cf_opts_(default_cf_opts), + default_cf_iopts_( + ImmutableCFOptions(Options(db_options_, default_cf_opts))), + unknown_cf_opts_(unknown_cf_opts), + create_unknown_cfs_(create_unknown_cfs), raw_table_cache_( // TableCache can be small since we expect each table to be opened // once. - NewLRUCache(10, options_.table_cache_numshardbits)), - table_cache_( - new TableCache(ioptions_, env_options_, raw_table_cache_.get())), - wb_(options_.db_write_buffer_size), - wc_(options_.delayed_write_rate), - vset_(dbname_, &options_, env_options_, raw_table_cache_.get(), &wb_, + NewLRUCache(10, db_options_.table_cache_numshardbits)), + table_cache_(new TableCache(default_cf_iopts_, env_options_, + raw_table_cache_.get())), + wb_(db_options_.db_write_buffer_size), + wc_(db_options_.delayed_write_rate), + vset_(dbname_, &db_options_, env_options_, raw_table_cache_.get(), &wb_, &wc_), next_file_number_(1) { - GetIntTblPropCollectorFactory(options, &int_tbl_prop_collector_factories_); + for (const auto& cfd : column_families) { + cf_name_to_opts_[cfd.name] = cfd.options; + } + } + + const ColumnFamilyOptions* GetColumnFamilyOptions( + const std::string& cf_name) { + if (cf_name_to_opts_.find(cf_name) == cf_name_to_opts_.end()) { + if (create_unknown_cfs_) { + return &unknown_cf_opts_; + } + return nullptr; + } + return &cf_name_to_opts_[cf_name]; } // Adds a column family to the VersionSet with cf_options_ and updates // manifest. 
Status AddColumnFamily(const std::string& cf_name, uint32_t cf_id) { - MutableCFOptions mut_cf_opts(options_, ioptions_); + const auto* cf_opts = GetColumnFamilyOptions(cf_name); + if (cf_opts == nullptr) { + return Status::Corruption("Encountered unknown column family with name=" + + cf_name + ", id=" + ToString(cf_id)); + } + Options opts(db_options_, *cf_opts); + MutableCFOptions mut_cf_opts(opts, ImmutableCFOptions(opts)); VersionEdit edit; - edit.SetComparatorName(icmp_.user_comparator()->Name()); + edit.SetComparatorName(opts.comparator->Name()); edit.SetLogNumber(0); edit.SetColumnFamily(cf_id); ColumnFamilyData* cfd; @@ -126,9 +152,9 @@ class Repairer { edit.AddColumnFamily(cf_name); mutex_.Lock(); - Status status = vset_.LogAndApply( - cfd, mut_cf_opts, &edit, &mutex_, nullptr /* db_directory */, - false /* new_descriptor_log */, &cf_options_); + Status status = vset_.LogAndApply(cfd, mut_cf_opts, &edit, &mutex_, + nullptr /* db_directory */, + false /* new_descriptor_log */, cf_opts); mutex_.Unlock(); return status; } @@ -145,13 +171,14 @@ class Repairer { ArchiveFile(dbname_ + "/" + manifests_[i]); } // Just create a DBImpl temporarily so we can reuse NewDB() - DBImpl* db_impl = new DBImpl(options_, dbname_); + DBImpl* db_impl = new DBImpl(db_options_, dbname_); status = db_impl->NewDB(); delete db_impl; } if (status.ok()) { // Recover using the fresh manifest created by NewDB() - status = vset_.Recover({{kDefaultColumnFamilyName, cf_options_}}, false); + status = + vset_.Recover({{kDefaultColumnFamilyName, default_cf_opts_}}, false); } if (status.ok()) { // Need to scan existing SST files first so the column families are @@ -171,7 +198,7 @@ class Repairer { for (size_t i = 0; i < tables_.size(); i++) { bytes += tables_[i].meta.fd.GetFileSize(); } - Log(InfoLogLevel::WARN_LEVEL, options_.info_log, + Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, "**** Repaired rocksdb %s; " "recovered %" ROCKSDB_PRIszt " files; %" PRIu64 "bytes. 
" @@ -193,18 +220,19 @@ class Repairer { std::string const dbname_; Env* const env_; - const InternalKeyComparator icmp_; - std::vector> - int_tbl_prop_collector_factories_; - const ColumnFamilyOptions cf_options_; // unsanitized - const Options options_; // sanitized - const ImmutableCFOptions ioptions_; // sanitized const EnvOptions env_options_; + const DBOptions db_options_; + const InternalKeyComparator icmp_; + const ColumnFamilyOptions default_cf_opts_; + const ImmutableCFOptions default_cf_iopts_; // table_cache_ holds reference + const ColumnFamilyOptions unknown_cf_opts_; + const bool create_unknown_cfs_; std::shared_ptr raw_table_cache_; TableCache* table_cache_; WriteBuffer wb_; WriteController wc_; VersionSet vset_; + std::unordered_map cf_name_to_opts_; InstrumentedMutex mutex_; std::vector manifests_; @@ -216,9 +244,9 @@ class Repairer { Status FindFiles() { std::vector filenames; bool found_file = false; - for (size_t path_id = 0; path_id < options_.db_paths.size(); path_id++) { + for (size_t path_id = 0; path_id < db_options_.db_paths.size(); path_id++) { Status status = - env_->GetChildren(options_.db_paths[path_id].path, &filenames); + env_->GetChildren(db_options_.db_paths[path_id].path, &filenames); if (!status.ok()) { return status; } @@ -261,7 +289,7 @@ class Repairer { std::string logname = LogFileName(dbname_, logs_[i]); Status status = ConvertLogToTable(logs_[i]); if (!status.ok()) { - Log(InfoLogLevel::WARN_LEVEL, options_.info_log, + Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, "Log #%" PRIu64 ": ignoring conversion error: %s", logs_[i], status.ToString().c_str()); } @@ -295,13 +323,13 @@ class Repairer { // Create the log reader. 
LogReporter reporter; reporter.env = env_; - reporter.info_log = options_.info_log; + reporter.info_log = db_options_.info_log; reporter.lognum = log; // We intentionally make log::Reader do checksumming so that // corruptions cause entire commits to be skipped instead of // propagating bad information (like overly large sequence // numbers). - log::Reader reader(options_.info_log, std::move(lfile_reader), &reporter, + log::Reader reader(db_options_.info_log, std::move(lfile_reader), &reporter, true /*enable checksum*/, 0 /*initial_offset*/, log); // Initialize per-column family memtables @@ -327,9 +355,8 @@ class Repairer { if (status.ok()) { counter += WriteBatchInternal::Count(&batch); } else { - Log(InfoLogLevel::WARN_LEVEL, - options_.info_log, "Log #%" PRIu64 ": ignoring %s", log, - status.ToString().c_str()); + Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, + "Log #%" PRIu64 ": ignoring %s", log, status.ToString().c_str()); status = Status::OK(); // Keep going with rest of file } } @@ -356,7 +383,7 @@ class Repairer { cfd->GetID(), cfd->GetName(), {}, kMaxSequenceNumber, kNoCompression, CompressionOptions(), false, nullptr /* internal_stats */, TableFileCreationReason::kRecovery); - Log(InfoLogLevel::INFO_LEVEL, options_.info_log, + Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s", log, counter, meta.fd.GetNumber(), status.ToString().c_str()); if (status.ok()) { @@ -378,13 +405,12 @@ class Repairer { Status status = ScanTable(&t); if (!status.ok()) { std::string fname = TableFileName( - options_.db_paths, t.meta.fd.GetNumber(), t.meta.fd.GetPathId()); + db_options_.db_paths, t.meta.fd.GetNumber(), t.meta.fd.GetPathId()); char file_num_buf[kFormatFileNumberBufSize]; FormatFileNumber(t.meta.fd.GetNumber(), t.meta.fd.GetPathId(), file_num_buf, sizeof(file_num_buf)); - Log(InfoLogLevel::WARN_LEVEL, options_.info_log, - "Table #%s: ignoring %s", file_num_buf, - status.ToString().c_str()); + 
Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, + "Table #%s: ignoring %s", file_num_buf, status.ToString().c_str()); ArchiveFile(fname); } else { tables_.push_back(t); @@ -393,8 +419,8 @@ class Repairer { } Status ScanTable(TableInfo* t) { - std::string fname = TableFileName(options_.db_paths, t->meta.fd.GetNumber(), - t->meta.fd.GetPathId()); + std::string fname = TableFileName( + db_options_.db_paths, t->meta.fd.GetNumber(), t->meta.fd.GetPathId()); int counter = 0; uint64_t file_size; Status status = env_->GetFileSize(fname, &file_size); @@ -409,7 +435,7 @@ class Repairer { t->column_family_id = static_cast(props->column_family_id); if (t->column_family_id == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) { - Log(InfoLogLevel::WARN_LEVEL, options_.info_log, + Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, "Table #%" PRIu64 ": column family unknown (probably due to legacy format); " "adding to default column family id 0.", @@ -427,7 +453,7 @@ class Repairer { if (status.ok()) { cfd = vset_.GetColumnFamilySet()->GetColumnFamily(t->column_family_id); if (cfd->GetName() != props->column_family_name) { - Log(InfoLogLevel::ERROR_LEVEL, options_.info_log, + Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, "Table #%" PRIu64 ": inconsistent column family name '%s'; expected '%s' for column " "family id %" PRIu32 ".", @@ -446,7 +472,7 @@ class Repairer { for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { Slice key = iter->key(); if (!ParseInternalKey(key, &parsed)) { - Log(InfoLogLevel::ERROR_LEVEL, options_.info_log, + Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, "Table #%" PRIu64 ": unparsable key %s", t->meta.fd.GetNumber(), EscapeString(key).c_str()); continue; @@ -470,7 +496,7 @@ class Repairer { } delete iter; - Log(InfoLogLevel::INFO_LEVEL, options_.info_log, + Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "Table #%" PRIu64 ": %d entries %s", t->meta.fd.GetNumber(), counter, status.ToString().c_str()); } @@ -532,15 
+558,60 @@ class Repairer { new_file.append("/"); new_file.append((slash == nullptr) ? fname.c_str() : slash + 1); Status s = env_->RenameFile(fname, new_file); - Log(InfoLogLevel::INFO_LEVEL, - options_.info_log, "Archiving %s: %s\n", + Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "Archiving %s: %s\n", fname.c_str(), s.ToString().c_str()); } }; + +Status GetDefaultCFOptions( + const std::vector<ColumnFamilyDescriptor>& column_families, + ColumnFamilyOptions* res) { + assert(res != nullptr); + auto iter = std::find_if(column_families.begin(), column_families.end(), + [](const ColumnFamilyDescriptor& cfd) { + return cfd.name == kDefaultColumnFamilyName; + }); + if (iter == column_families.end()) { + return Status::InvalidArgument( + "column_families", "Must contain entry for default column family"); + } + *res = iter->options; + return Status::OK(); +} } // anonymous namespace +Status RepairDB(const std::string& dbname, const DBOptions& db_options, + const std::vector<ColumnFamilyDescriptor>& column_families) { + ColumnFamilyOptions default_cf_opts; + Status status = GetDefaultCFOptions(column_families, &default_cf_opts); + if (status.ok()) { + Repairer repairer(dbname, db_options, column_families, default_cf_opts, + ColumnFamilyOptions() /* unknown_cf_opts */, + false /* create_unknown_cfs */); + status = repairer.Run(); + } + return status; +} + +Status RepairDB(const std::string& dbname, const DBOptions& db_options, + const std::vector<ColumnFamilyDescriptor>& column_families, + const ColumnFamilyOptions& unknown_cf_opts) { + ColumnFamilyOptions default_cf_opts; + Status status = GetDefaultCFOptions(column_families, &default_cf_opts); + if (status.ok()) { + Repairer repairer(dbname, db_options, column_families, default_cf_opts, + unknown_cf_opts, true /* create_unknown_cfs */); + status = repairer.Run(); + } + return status; +} + Status RepairDB(const std::string& dbname, const Options& options) { - Repairer repairer(dbname, options); + DBOptions db_options(options); + ColumnFamilyOptions cf_options(options); + Repairer 
repairer(dbname, db_options, {}, cf_options /* default_cf_opts */, + cf_options /* unknown_cf_opts */, + true /* create_unknown_cfs */); return repairer.Run(); } diff --git a/db/repair_test.cc b/db/repair_test.cc index 86bda9053..93e5113b3 100644 --- a/db/repair_test.cc +++ b/db/repair_test.cc @@ -9,6 +9,7 @@ #include "db/db_impl.h" #include "db/db_test_util.h" +#include "rocksdb/comparator.h" #include "rocksdb/db.h" #include "rocksdb/transaction_log.h" #include "util/file_util.h" @@ -208,6 +209,65 @@ TEST_F(RepairTest, RepairMultipleColumnFamilies) { } } +TEST_F(RepairTest, RepairColumnFamilyOptions) { + // Verify repair logic uses correct ColumnFamilyOptions when repairing a + // database with different options for column families. + const int kNumCfs = 2; + const int kEntriesPerCf = 2; + + Options opts(CurrentOptions()), rev_opts(CurrentOptions()); + opts.comparator = BytewiseComparator(); + rev_opts.comparator = ReverseBytewiseComparator(); + + DestroyAndReopen(opts); + CreateColumnFamilies({"reverse"}, rev_opts); + ReopenWithColumnFamilies({"default", "reverse"}, + std::vector{opts, rev_opts}); + for (int i = 0; i < kNumCfs; ++i) { + for (int j = 0; j < kEntriesPerCf; ++j) { + Put(i, "key" + ToString(j), "val" + ToString(j)); + if (i == kNumCfs - 1 && j == kEntriesPerCf - 1) { + // Leave one unflushed so we can verify RepairDB's flush logic + continue; + } + Flush(i); + } + } + Close(); + + // RepairDB() records the comparator in the manifest, and DB::Open would fail + // if a different comparator were used. 
+ ASSERT_OK(RepairDB(dbname_, opts, {{"default", opts}, {"reverse", rev_opts}}, + opts /* unknown_cf_opts */)); + ASSERT_OK(TryReopenWithColumnFamilies({"default", "reverse"}, + std::vector{opts, rev_opts})); + for (int i = 0; i < kNumCfs; ++i) { + for (int j = 0; j < kEntriesPerCf; ++j) { + ASSERT_EQ(Get(i, "key" + ToString(j)), "val" + ToString(j)); + } + } + + // Examine table properties to verify RepairDB() used the right options when + // converting WAL->SST + TablePropertiesCollection fname_to_props; + db_->GetPropertiesOfAllTables(handles_[1], &fname_to_props); + ASSERT_EQ(fname_to_props.size(), 2U); + for (const auto& fname_and_props : fname_to_props) { + ASSERT_EQ(InternalKeyComparator(rev_opts.comparator).Name(), + fname_and_props.second->comparator_name); + } + + // Also check comparator when it's provided via "unknown" CF options + ASSERT_OK(RepairDB(dbname_, opts, {{"default", opts}}, + rev_opts /* unknown_cf_opts */)); + ASSERT_OK(TryReopenWithColumnFamilies({"default", "reverse"}, + std::vector{opts, rev_opts})); + for (int i = 0; i < kNumCfs; ++i) { + for (int j = 0; j < kEntriesPerCf; ++j) { + ASSERT_EQ(Get(i, "key" + ToString(j)), "val" + ToString(j)); + } + } +} } // namespace rocksdb int main(int argc, char** argv) { diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 5338f88fd..e644303b0 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -859,7 +859,24 @@ Status DestroyDB(const std::string& name, const Options& options); // resurrect as much of the contents of the database as possible. // Some data may be lost, so be careful when calling this function // on a database that contains important information. +// +// With this API, we will warn and skip data associated with column families not +// specified in column_families. 
+// +// @param column_families Descriptors for known column families +Status RepairDB(const std::string& dbname, const DBOptions& db_options, + const std::vector<ColumnFamilyDescriptor>& column_families); + +// @param unknown_cf_opts Options for column families encountered during the +// repair that were not specified in column_families. +Status RepairDB(const std::string& dbname, const DBOptions& db_options, + const std::vector<ColumnFamilyDescriptor>& column_families, + const ColumnFamilyOptions& unknown_cf_opts); + +// @param options These options will be used for the database and for ALL column +// families encountered during the repair Status RepairDB(const std::string& dbname, const Options& options); + #endif } // namespace rocksdb