diff --git a/db/compaction.cc b/db/compaction.cc index 703e7aeae..536b7e233 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -26,7 +26,7 @@ Compaction::Compaction(Version* input_version, int level, int out_level, : level_(level), out_level_(out_level), max_output_file_size_(target_file_size), - maxGrandParentOverlapBytes_(max_grandparent_overlap_bytes), + max_grandparent_overlap_bytes_(max_grandparent_overlap_bytes), input_version_(input_version), number_levels_(input_version_->NumberLevels()), seek_compaction_(seek_compaction), @@ -64,7 +64,7 @@ bool Compaction::IsTrivialMove() const { return (level_ != out_level_ && num_input_files(0) == 1 && num_input_files(1) == 0 && - TotalFileSize(grandparents_) <= maxGrandParentOverlapBytes_); + TotalFileSize(grandparents_) <= max_grandparent_overlap_bytes_); } void Compaction::AddInputDeletions(VersionEdit* edit) { @@ -117,7 +117,7 @@ bool Compaction::ShouldStopBefore(const Slice& internal_key) { } seen_key_ = true; - if (overlapped_bytes_ > maxGrandParentOverlapBytes_) { + if (overlapped_bytes_ > max_grandparent_overlap_bytes_) { // Too much overlap for current output; start new output overlapped_bytes_ = 0; return true; diff --git a/db/compaction.h b/db/compaction.h index 5e696a053..efd6ef71f 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -33,9 +33,14 @@ class Compaction { // "which" must be either 0 or 1 int num_input_files(int which) const { return inputs_[which].size(); } + // Returns input version of the compaction + Version* input_version() const { return input_version_; } + // Return the ith input file at "level()+which" ("which" must be 0 or 1). FileMetaData* input(int which, int i) const { return inputs_[which][i]; } + std::vector* inputs(int which) { return &inputs_[which]; } + // Maximum size of files to build during this compaction. 
uint64_t MaxOutputFileSize() const { return max_output_file_size_; } @@ -74,8 +79,6 @@ class Compaction { bool IsFullCompaction() { return is_full_compaction_; } private: - friend class Version; - friend class VersionSet; friend class CompactionPicker; friend class UniversalCompactionPicker; friend class LevelCompactionPicker; @@ -87,7 +90,7 @@ class Compaction { int level_; int out_level_; // levels to which output files are stored uint64_t max_output_file_size_; - uint64_t maxGrandParentOverlapBytes_; + uint64_t max_grandparent_overlap_bytes_; Version* input_version_; VersionEdit* edit_; int number_levels_; diff --git a/db/db_impl.cc b/db/db_impl.cc index cb23c979e..5cdb8ae2d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -863,16 +863,13 @@ void DBImpl::PurgeObsoleteWALFiles() { } } -// If externalTable is set, then apply recovered transactions -// to that table. This is used for readonly mode. Status DBImpl::Recover( - VersionEdit* edit, - const std::vector& column_families, - MemTable* external_table, bool error_if_log_file_exist) { + const std::vector& column_families, bool read_only, + bool error_if_log_file_exist) { mutex_.AssertHeld(); assert(db_lock_ == nullptr); - if (!external_table) { + if (!read_only) { // We call CreateDirIfMissing() as the directory may already exist (if we // are reopening a DB), when this happens we don't want creating the // directory to cause an error. However, we need to check if creating the @@ -966,12 +963,12 @@ Status DBImpl::Recover( // Recover in the order in which the logs were generated std::sort(logs.begin(), logs.end()); - for (size_t i = 0; i < logs.size(); i++) { - s = RecoverLogFile(logs[i], edit, &max_sequence, external_table); + for (size_t i = 0; s.ok() && i < logs.size(); i++) { // The previous incarnation may not have written any MANIFEST // records after allocating this log number. So we manually // update the file number allocation counter in VersionSet. 
versions_->MarkFileNumberUsed(logs[i]); + s = RecoverLogFile(logs[i], &max_sequence, read_only); } if (s.ok()) { @@ -986,10 +983,8 @@ Status DBImpl::Recover( return s; } -Status DBImpl::RecoverLogFile(uint64_t log_number, - VersionEdit* edit, - SequenceNumber* max_sequence, - MemTable* external_table) { +Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, + bool read_only) { struct LogReporter : public log::Reader::Reporter { Env* env; Logger* info_log; @@ -1006,6 +1001,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, mutex_.AssertHeld(); + VersionEdit edit; + // Open the log file std::string fname = LogFileName(options_.wal_dir, log_number); unique_ptr file; @@ -1035,11 +1032,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, std::string scratch; Slice record; WriteBatch batch; - MemTable* mem = nullptr; - if (external_table) { - mem = external_table; - } - while (reader.ReadRecord(&record, &scratch) && status.ok()) { + bool memtable_empty = true; + while (reader.ReadRecord(&record, &scratch)) { if (record.size() < 12) { reporter.Corruption( record.size(), Status::Corruption("log record too small")); @@ -1047,14 +1041,11 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, } WriteBatchInternal::SetContents(&batch, record); - if (mem == nullptr) { - mem = new MemTable(internal_comparator_, options_); - mem->Ref(); - } - status = WriteBatchInternal::InsertInto(&batch, mem, &options_); + status = WriteBatchInternal::InsertInto(&batch, mem_, &options_); + memtable_empty = false; MaybeIgnoreError(&status); if (!status.ok()) { - break; + return status; } const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) + @@ -1063,28 +1054,44 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, *max_sequence = last_seq; } - if (!external_table && - mem->ApproximateMemoryUsage() > options_.write_buffer_size) { - status = WriteLevel0TableForRecovery(mem, edit); + if (!read_only && + mem_->ApproximateMemoryUsage() > 
options_.write_buffer_size) { + status = WriteLevel0TableForRecovery(mem_, &edit); + // we still want to clear memtable, even if the recovery failed + delete mem_->Unref(); + mem_ = new MemTable(internal_comparator_, options_); + mem_->Ref(); + memtable_empty = true; if (!status.ok()) { // Reflect errors immediately so that conditions like full // file-systems cause the DB::Open() to fail. - break; + return status; } - delete mem->Unref(); - mem = nullptr; } } - if (status.ok() && mem != nullptr && !external_table) { - status = WriteLevel0TableForRecovery(mem, edit); - // Reflect errors immediately so that conditions like full - // file-systems cause the DB::Open() to fail. + if (!memtable_empty && !read_only) { + status = WriteLevel0TableForRecovery(mem_, &edit); + delete mem_->Unref(); + mem_ = new MemTable(internal_comparator_, options_); + mem_->Ref(); + if (!status.ok()) { + return status; + } } - if (mem != nullptr && !external_table) { - delete mem->Unref(); + if (edit.NumEntries() > 0) { + // if read_only, NumEntries() will be 0 + assert(!read_only); + // writing log number in the manifest means that any log file + // with number strictly less than (log_number + 1) is already + // recovered and should be ignored on next reincarnation. 
+ // Since we already recovered log_number, we want all logs + // with numbers `<= log_number` (includes this one) to be ignored + edit.SetLogNumber(log_number + 1); + status = versions_->LogAndApply(&edit, &mutex_); } + return status; } @@ -3939,9 +3946,7 @@ Status DB::OpenWithColumnFamilies( return s; } impl->mutex_.Lock(); - VersionEdit edit; - // Handles create_if_missing, error_if_exists - s = impl->Recover(&edit, column_families); + s = impl->Recover(); // Handles create_if_missing, error_if_exists if (s.ok()) { uint64_t new_log_number = impl->versions_->NewFileNumber(); unique_ptr lfile; @@ -3953,6 +3958,7 @@ Status DB::OpenWithColumnFamilies( ); if (s.ok()) { lfile->SetPreallocationBlockSize(1.1 * impl->options_.write_buffer_size); + VersionEdit edit; edit.SetLogNumber(new_log_number); impl->logfile_number_ = new_log_number; impl->log_.reset(new log::Writer(std::move(lfile))); diff --git a/db/db_impl.h b/db/db_impl.h index 9146df7bd..19cd3977a 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -301,10 +301,8 @@ class DBImpl : public DB { // Recover the descriptor from persistent storage. May do a significant // amount of work to recover recently logged updates. Any changes to // be made to the descriptor are added to *edit. - Status Recover(VersionEdit* edit, - const std::vector& column_families, - MemTable* external_table = nullptr, - bool error_if_log_file_exist = false); + Status Recover(const std::vector& column_families, + bool read_only = false, bool error_if_log_file_exist = false); void MaybeIgnoreError(Status* s) const; @@ -318,10 +316,8 @@ class DBImpl : public DB { Status FlushMemTableToOutputFile(bool* madeProgress, DeletionState& deletion_state); - Status RecoverLogFile(uint64_t log_number, - VersionEdit* edit, - SequenceNumber* max_sequence, - MemTable* external_table); + Status RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, + bool read_only); // The following two methods are used to flush a memtable to // storage. 
The first one is used atdatabase RecoveryTime (when the diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index ad3395778..c94440170 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -85,14 +85,12 @@ Status DB::OpenForReadOnly(const Options& options, const std::string& dbname, DBImplReadOnly* impl = new DBImplReadOnly(options, dbname); impl->mutex_.Lock(); - VersionEdit edit; DBOptions db_options(options); ColumnFamilyOptions cf_options(options); std::vector column_families; column_families.push_back( ColumnFamilyDescriptor(default_column_family_name, cf_options)); - Status s = impl->Recover(&edit, column_families, impl->GetMemTable(), - error_if_log_file_exist); + Status s = impl->Recover(column_families, true /* read only */, error_if_log_file_exist); impl->mutex_.Unlock(); if (s.ok()) { *dbptr = impl; diff --git a/db/db_test.cc b/db/db_test.cc index 44ce16d60..30ec6c26a 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -672,6 +672,31 @@ class DBTest { ASSERT_EQ(IterStatus(iter), expected_key); delete iter; } + + void CopyFile(const std::string& source, const std::string& destination, + uint64_t size = 0) { + const EnvOptions soptions; + unique_ptr srcfile; + ASSERT_OK(env_->NewSequentialFile(source, &srcfile, soptions)); + unique_ptr destfile; + ASSERT_OK(env_->NewWritableFile(destination, &destfile, soptions)); + + if (size == 0) { + // default argument means copy everything + ASSERT_OK(env_->GetFileSize(source, &size)); + } + + char buffer[4096]; + Slice slice; + while (size > 0) { + uint64_t one = std::min(uint64_t(sizeof(buffer)), size); + ASSERT_OK(srcfile->Read(one, &slice, buffer)); + ASSERT_OK(destfile->Append(slice)); + size -= slice.size(); + } + ASSERT_OK(destfile->Close()); + } + }; static std::string Key(int i) { @@ -1474,6 +1499,82 @@ TEST(DBTest, Recover) { } while (ChangeOptions()); } +TEST(DBTest, IgnoreRecoveredLog) { + std::string backup_logs = dbname_ + "/backup_logs"; + + // delete old files in backup_logs 
directory + env_->CreateDirIfMissing(backup_logs); + std::vector old_files; + env_->GetChildren(backup_logs, &old_files); + for (auto& file : old_files) { + if (file != "." && file != "..") { + env_->DeleteFile(backup_logs + "/" + file); + } + } + + do { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.merge_operator = MergeOperators::CreateUInt64AddOperator(); + options.wal_dir = dbname_ + "/logs"; + DestroyAndReopen(&options); + + // fill up the DB + std::string one, two; + PutFixed64(&one, 1); + PutFixed64(&two, 2); + ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one))); + ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one))); + ASSERT_OK(db_->Merge(WriteOptions(), Slice("bar"), Slice(one))); + + // copy the logs to backup + std::vector logs; + env_->GetChildren(options.wal_dir, &logs); + for (auto& log : logs) { + if (log != ".." && log != ".") { + CopyFile(options.wal_dir + "/" + log, backup_logs + "/" + log); + } + } + + // recover the DB + Reopen(&options); + ASSERT_EQ(two, Get("foo")); + ASSERT_EQ(one, Get("bar")); + Close(); + + // copy the logs from backup back to wal dir + for (auto& log : logs) { + if (log != ".." && log != ".") { + CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log); + } + } + // this should ignore the log files, recovery should not happen again + // if the recovery happens, the same merge operator would be called twice, + // leading to incorrect results + Reopen(&options); + ASSERT_EQ(two, Get("foo")); + ASSERT_EQ(one, Get("bar")); + Close(); + Destroy(&options); + + // copy the logs from backup back to wal dir + env_->CreateDirIfMissing(options.wal_dir); + for (auto& log : logs) { + if (log != ".." 
&& log != ".") { + CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log); + // we won't be needing this file no more + env_->DeleteFile(backup_logs + "/" + log); + } + } + // assert that we successfully recovered only from logs, even though we + // destroyed the DB + Reopen(&options); + ASSERT_EQ(two, Get("foo")); + ASSERT_EQ(one, Get("bar")); + Close(); + } while (ChangeOptions()); +} + TEST(DBTest, RollLog) { do { ASSERT_OK(Put("foo", "v1")); @@ -3616,7 +3717,6 @@ TEST(DBTest, BloomFilter) { TEST(DBTest, SnapshotFiles) { do { Options options = CurrentOptions(); - const EnvOptions soptions; options.write_buffer_size = 100000000; // Large write buffer Reopen(&options); @@ -3672,20 +3772,7 @@ TEST(DBTest, SnapshotFiles) { } } } - unique_ptr srcfile; - ASSERT_OK(env_->NewSequentialFile(src, &srcfile, soptions)); - unique_ptr destfile; - ASSERT_OK(env_->NewWritableFile(dest, &destfile, soptions)); - - char buffer[4096]; - Slice slice; - while (size > 0) { - uint64_t one = std::min(uint64_t(sizeof(buffer)), size); - ASSERT_OK(srcfile->Read(one, &slice, buffer)); - ASSERT_OK(destfile->Append(slice)); - size -= slice.size(); - } - ASSERT_OK(destfile->Close()); + CopyFile(src, dest, size); } // release file snapshot diff --git a/db/version_set.cc b/db/version_set.cc index f9d04bf37..e2c41ee63 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2073,23 +2073,21 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) { // Level-0 files have to be merged together. For other levels, // we will make a concatenating iterator per level. // TODO(opt): use concatenating iterator for level-0 if there is no overlap - const int space = (c->level() == 0 ? c->inputs_[0].size() + 1 : 2); + const int space = (c->level() == 0 ? 
c->inputs(0)->size() + 1 : 2); Iterator** list = new Iterator*[space]; int num = 0; for (int which = 0; which < 2; which++) { - if (!c->inputs_[which].empty()) { + if (!c->inputs(which)->empty()) { if (c->level() + which == 0) { - const std::vector<FileMetaData*>& files = c->inputs_[which]; - for (size_t i = 0; i < files.size(); i++) { + for (const auto& file : *c->inputs(which)) { list[num++] = table_cache_->NewIterator( - options, storage_options_compactions_, - files[i]->number, files[i]->file_size, nullptr, - true /* for compaction */); + options, storage_options_compactions_, file->number, + file->file_size, nullptr, true /* for compaction */); } } else { // Create concatenating iterator for the files from this level list[num++] = NewTwoLevelIterator( - new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]), + new Version::LevelFileNumIterator(icmp_, c->inputs(which)), &GetFileIterator, table_cache_, options, storage_options_, true /* for compaction */); } @@ -2115,7 +2113,7 @@ bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) { #ifndef NDEBUG // TODO this only works for default column family now Version* version = column_family_data_.find(0)->second->current; - if (c->input_version_ != version) { + if (c->input_version() != version) { Log(options_->info_log, "VerifyCompactionFileConsistency version mismatch"); } diff --git a/util/statistics.cc b/util/statistics.cc index f19a777c1..a850445ed 100644 --- a/util/statistics.cc +++ b/util/statistics.cc @@ -5,6 +5,7 @@ // #include "util/statistics.h" #include "rocksdb/statistics.h" +#include <algorithm> #include <cstdio> namespace rocksdb { @@ -13,7 +14,11 @@ std::shared_ptr<Statistics> CreateDBStatistics() { return std::make_shared<StatisticsImpl>(); } -StatisticsImpl::StatisticsImpl() {} +StatisticsImpl::StatisticsImpl() { + // Fill tickers_ with "zero". To ensure platform independence, we use + // uint_fast64_t() instead of the literal `0` to represent zero. + std::fill(tickers_, tickers_ + TICKER_ENUM_MAX, uint_fast64_t()); +} StatisticsImpl::~StatisticsImpl() {}