diff --git a/db/db_impl.h b/db/db_impl.h index 6a39ba5fe..415344e4e 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -491,6 +491,8 @@ class DBImpl : public DB { void MarkLogAsHavingPrepSectionFlushed(uint64_t log); void MarkLogAsContainingPrepSection(uint64_t log); + Status NewDB(); + protected: Env* const env_; const std::string dbname_; @@ -559,8 +561,6 @@ class DBImpl : public DB { struct PurgeFileInfo; - Status NewDB(); - // Recover the descriptor from persistent storage. May do a significant // amount of work to recover recently logged updates. Any changes to // be made to the descriptor are added to *edit. diff --git a/db/repair.cc b/db/repair.cc index 437474cf1..baedb5320 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -94,32 +94,69 @@ class Repairer { : dbname_(dbname), env_(options.env), icmp_(options.comparator), + cf_options_(options), options_(SanitizeOptions(dbname, &icmp_, options)), ioptions_(options_), + env_options_(), raw_table_cache_( // TableCache can be small since we expect each table to be opened // once. NewLRUCache(10, options_.table_cache_numshardbits)), + table_cache_( + new TableCache(ioptions_, env_options_, raw_table_cache_.get())), + wb_(options_.db_write_buffer_size), + wc_(options_.delayed_write_rate), + vset_(dbname_, &options_, env_options_, raw_table_cache_.get(), &wb_, + &wc_), next_file_number_(1) { GetIntTblPropCollectorFactory(options, &int_tbl_prop_collector_factories_); + } - table_cache_ = - new TableCache(ioptions_, env_options_, raw_table_cache_.get()); - edit_ = new VersionEdit(); + // Adds a column family to the VersionSet with cf_options_ and updates + // manifest. + Status AddColumnFamily(const std::string& cf_name, uint32_t cf_id) { + MutableCFOptions mut_cf_opts(options_, ioptions_); + + VersionEdit edit; + edit.SetComparatorName(icmp_.user_comparator()->Name()); + edit.SetLogNumber(0); + edit.SetColumnFamily(cf_id); + ColumnFamilyData* cfd; + cfd = nullptr; + edit.AddColumnFamily(cf_name); + + mutex_.Lock(); + Status status = vset_.LogAndApply( + cfd, mut_cf_opts, &edit, &mutex_, nullptr /* db_directory */, + false /* new_descriptor_log */, &cf_options_); + mutex_.Unlock(); + return status; } ~Repairer() { delete table_cache_; - raw_table_cache_.reset(); - delete edit_; } Status Run() { Status status = FindFiles(); + if (status.ok()) { + // Discard older manifests and start a fresh one + for (size_t i = 0; i < manifests_.size(); i++) { + ArchiveFile(dbname_ + "/" + manifests_[i]); + } + // Just create a DBImpl temporarily so we can reuse NewDB() + DBImpl* db_impl = new DBImpl(options_, dbname_); + status = db_impl->NewDB(); + delete db_impl; + } + if (status.ok()) { + // Recover using the fresh manifest created by NewDB() + status = vset_.Recover({{kDefaultColumnFamilyName, cf_options_}}, false); + } if (status.ok()) { ConvertLogFilesToTables(); ExtractMetaData(); - status = WriteDescriptor(); + status = AddTables(); } if (status.ok()) { uint64_t bytes = 0; @@ -149,18 +186,22 @@ class Repairer { const InternalKeyComparator icmp_; std::vector> int_tbl_prop_collector_factories_; - const Options options_; - const ImmutableCFOptions ioptions_; + const ColumnFamilyOptions cf_options_; // unsanitized + const Options options_; // sanitized + const ImmutableCFOptions ioptions_; // sanitized + const EnvOptions env_options_; std::shared_ptr raw_table_cache_; TableCache* table_cache_; - VersionEdit* edit_; + WriteBuffer wb_; + WriteController wc_; + VersionSet vset_; + InstrumentedMutex mutex_; std::vector manifests_; std::vector table_fds_; std::vector logs_; std::vector tables_; uint64_t next_file_number_; - const EnvOptions env_options_; Status FindFiles() { std::vector filenames; @@ -246,7 +287,7 @@ class Repairer { reporter.env = env_; reporter.info_log = options_.info_log; reporter.lognum = log; - // We intentially make log::Reader do checksumming so that + // We intentionally make log::Reader do checksumming so that // corruptions cause entire commits to be skipped instead of // propagating bad information (like overly large sequence // numbers). @@ -384,62 +425,34 @@ class Repairer { return status; } - Status WriteDescriptor() { - std::string tmp = TempFileName(dbname_, 1); - unique_ptr file; - EnvOptions env_options = env_->OptimizeForManifestWrite(env_options_); - Status status = env_->NewWritableFile(tmp, &file, env_options); - if (!status.ok()) { - return status; - } - + Status AddTables() { SequenceNumber max_sequence = 0; for (size_t i = 0; i < tables_.size(); i++) { if (max_sequence < tables_[i].max_sequence) { max_sequence = tables_[i].max_sequence; } } - - edit_->SetComparatorName(icmp_.user_comparator()->Name()); - edit_->SetLogNumber(0); - edit_->SetNextFile(next_file_number_); - edit_->SetLastSequence(max_sequence); - - for (size_t i = 0; i < tables_.size(); i++) { - // TODO(opt): separate out into multiple levels - const TableInfo& t = tables_[i]; - edit_->AddFile(0, t.meta.fd.GetNumber(), t.meta.fd.GetPathId(), - t.meta.fd.GetFileSize(), t.meta.smallest, t.meta.largest, - t.min_sequence, t.max_sequence, - t.meta.marked_for_compaction); - } - - //fprintf(stderr, "NewDescriptor:\n%s\n", edit_.DebugString().c_str()); - { - unique_ptr file_writer( - new WritableFileWriter(std::move(file), env_options)); - log::Writer log(std::move(file_writer), 0, false); - std::string record; - edit_->EncodeTo(&record); - status = log.AddRecord(record); - } - - if (!status.ok()) { - env_->DeleteFile(tmp); - } else { - // Discard older manifests - for (size_t i = 0; i < manifests_.size(); i++) { - ArchiveFile(dbname_ + "/" + manifests_[i]); - } - - // Install new manifest - status = env_->RenameFile(tmp, DescriptorFileName(dbname_, 1)); - if (status.ok()) { - status = SetCurrentFile(env_, dbname_, 1, nullptr); - } else { - env_->DeleteFile(tmp); - } + vset_.SetLastSequence(max_sequence); + + auto* cfd = vset_.GetColumnFamilySet()->GetDefault(); + VersionEdit edit; + edit.SetComparatorName(cfd->user_comparator()->Name()); + edit.SetLogNumber(0); + edit.SetNextFile(next_file_number_); + edit.SetColumnFamily(cfd->GetID()); + + // TODO(opt): separate out into multiple levels + for (const auto& table : tables_) { + edit.AddFile(0, table.meta.fd.GetNumber(), table.meta.fd.GetPathId(), + table.meta.fd.GetFileSize(), table.meta.smallest, + table.meta.largest, table.min_sequence, table.max_sequence, + table.meta.marked_for_compaction); } + mutex_.Lock(); + Status status = vset_.LogAndApply( + cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_, + nullptr /* db_directory */, false /* new_descriptor_log */); + mutex_.Unlock(); return status; } @@ -464,7 +477,7 @@ class Repairer { fname.c_str(), s.ToString().c_str()); } }; -} // namespace +} // anonymous namespace Status RepairDB(const std::string& dbname, const Options& options) { Repairer repairer(dbname, options); diff --git a/db/repair_test.cc b/db/repair_test.cc index cdd46ecc9..f0c1d231d 100644 --- a/db/repair_test.cc +++ b/db/repair_test.cc @@ -49,7 +49,7 @@ TEST_F(RepairTest, LostManifest) { Close(); ASSERT_OK(env_->FileExists(manifest_path)); ASSERT_OK(env_->DeleteFile(manifest_path)); - RepairDB(dbname_, CurrentOptions()); + ASSERT_OK(RepairDB(dbname_, CurrentOptions())); Reopen(CurrentOptions()); ASSERT_EQ(Get("key"), "val"); @@ -70,7 +70,7 @@ TEST_F(RepairTest, CorruptManifest) { Close(); ASSERT_OK(env_->FileExists(manifest_path)); CreateFile(env_, manifest_path, "blah"); - RepairDB(dbname_, CurrentOptions()); + ASSERT_OK(RepairDB(dbname_, CurrentOptions())); Reopen(CurrentOptions()); ASSERT_EQ(Get("key"), "val"); @@ -96,7 +96,7 @@ TEST_F(RepairTest, IncompleteManifest) { ASSERT_OK(env_->FileExists(new_manifest_path)); // Replace the manifest with one that is only aware of the first SST file. CopyFile(orig_manifest_path + ".tmp", new_manifest_path); - RepairDB(dbname_, CurrentOptions()); + ASSERT_OK(RepairDB(dbname_, CurrentOptions())); Reopen(CurrentOptions()); ASSERT_EQ(Get("key"), "val"); @@ -115,7 +115,7 @@ TEST_F(RepairTest, LostSst) { ASSERT_OK(env_->DeleteFile(sst_path)); Close(); - RepairDB(dbname_, CurrentOptions()); + ASSERT_OK(RepairDB(dbname_, CurrentOptions())); Reopen(CurrentOptions()); // Exactly one of the key-value pairs should be in the DB now. @@ -134,7 +134,7 @@ TEST_F(RepairTest, CorruptSst) { CreateFile(env_, sst_path, "blah"); Close(); - RepairDB(dbname_, CurrentOptions()); + ASSERT_OK(RepairDB(dbname_, CurrentOptions())); Reopen(CurrentOptions()); // Exactly one of the key-value pairs should be in the DB now. @@ -159,7 +159,7 @@ TEST_F(RepairTest, UnflushedSst) { Close(); ASSERT_OK(env_->FileExists(manifest_path)); ASSERT_OK(env_->DeleteFile(manifest_path)); - RepairDB(dbname_, CurrentOptions()); + ASSERT_OK(RepairDB(dbname_, CurrentOptions())); Reopen(CurrentOptions()); ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files));