From 4036d58dc904a7a552280e6151f73679c9d7bc15 Mon Sep 17 00:00:00 2001 From: kailiu Date: Tue, 21 Jan 2014 17:51:36 -0800 Subject: [PATCH 1/3] Fix a Statistics-related unit test faulure Summary: In my MacOS, the member variables are populated with random numbers after initialization. This diff fixes it by fill these arrays with 0. Test Plan: make && ./table_test Reviewers: igor CC: leveldb Differential Revision: https://reviews.facebook.net/D15315 --- util/statistics.cc | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/util/statistics.cc b/util/statistics.cc index f19a777c1..a850445ed 100644 --- a/util/statistics.cc +++ b/util/statistics.cc @@ -5,6 +5,7 @@ // #include "util/statistics.h" #include "rocksdb/statistics.h" +#include #include namespace rocksdb { @@ -13,7 +14,11 @@ std::shared_ptr CreateDBStatistics() { return std::make_shared(); } -StatisticsImpl::StatisticsImpl() {} +StatisticsImpl::StatisticsImpl() { + // Fill tickers_ with "zero". To ensure plasform indepedent, we used + // uint_fast64_t() instead literal `0` to represent zero. + std::fill(tickers_, tickers_ + TICKER_ENUM_MAX, uint_fast64_t()); +} StatisticsImpl::~StatisticsImpl() {} From 6fe9b5774886a64d749ed76127e1d23424d3a406 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 22 Jan 2014 10:45:26 -0800 Subject: [PATCH 2/3] Refactor Recover() code Summary: This diff does two things: * Rethinks how we call Recover() with read_only option. Before, we call it with pointer to memtable where we'd like to apply those changes to. This memtable is set in db_impl_readonly.cc and it's actually DBImpl::mem_. Why don't we just apply updates to mem_ right away? It seems more intuitive. * Changes when we apply updates to manifest. Before, the process is to recover all the logs, flush it to sst files and then do one giant commit that atomically adds all recovered sst files and sets the next log number. This works good enough, but causes some small troubles for my column family approach, since I can't have one VersionEdit apply to more than single column family[1]. The change here is to commit the files recovered from logs right away. Here is the state of the world before the change: 1. Recover log 5, add new sst files to edit 2. Recover log 7, add new sst files to edit 3. Recover log 8, add new sst files to edit 4. Commit all added sst files to manifest and mark log files 5, 7 and 8 as recoverd (via SetLogNumber(9) function) After the change, we'll do: 1. Recover log 5, commit the new sst files and set log 5 as recovered 2. Recover log 7, commit the new sst files and set log 7 as recovered 3. Recover log 8, commit the new sst files and set log 8 as recovered The added (small) benefit is that if we fail after (2), the new recovery will only have to recover log 8. In previous case, we'll have to restart the recovery from the beginning. The bigger benefit will be to enable easier integration of multiple column families in Recovery code path. [1] I'm happy to dicuss this decison, but I believe this is the cleanest way to go. It also makes backward compatibility much easier. We don't have a requirement of adding multiple column families atomically. Test Plan: make check Reviewers: dhruba, haobo, kailiu, sdong Reviewed By: kailiu CC: leveldb Differential Revision: https://reviews.facebook.net/D15237 --- db/db_impl.cc | 79 +++++++++++++++------------- db/db_impl.h | 12 ++--- db/db_impl_readonly.cc | 4 +- db/db_test.cc | 117 +++++++++++++++++++++++++++++++++++------ 4 files changed, 150 insertions(+), 62 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 43f21505b..35517c22b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -860,14 +860,11 @@ void DBImpl::PurgeObsoleteWALFiles() { } } -// If externalTable is set, then apply recovered transactions -// to that table. This is used for readonly mode. -Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table, - bool error_if_log_file_exist) { +Status DBImpl::Recover(bool read_only, bool error_if_log_file_exist) { mutex_.AssertHeld(); assert(db_lock_ == nullptr); - if (!external_table) { + if (!read_only) { // We call CreateDirIfMissing() as the directory may already exist (if we // are reopening a DB), when this happens we don't want creating the // directory to cause an error. However, we need to check if creating the @@ -948,12 +945,12 @@ Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table, // Recover in the order in which the logs were generated std::sort(logs.begin(), logs.end()); - for (size_t i = 0; i < logs.size(); i++) { - s = RecoverLogFile(logs[i], edit, &max_sequence, external_table); + for (size_t i = 0; s.ok() && i < logs.size(); i++) { // The previous incarnation may not have written any MANIFEST // records after allocating this log number. So we manually // update the file number allocation counter in VersionSet. versions_->MarkFileNumberUsed(logs[i]); + s = RecoverLogFile(logs[i], &max_sequence, read_only); } if (s.ok()) { @@ -968,10 +965,8 @@ Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table, return s; } -Status DBImpl::RecoverLogFile(uint64_t log_number, - VersionEdit* edit, - SequenceNumber* max_sequence, - MemTable* external_table) { +Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, + bool read_only) { struct LogReporter : public log::Reader::Reporter { Env* env; Logger* info_log; @@ -988,6 +983,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, mutex_.AssertHeld(); + VersionEdit edit; + // Open the log file std::string fname = LogFileName(options_.wal_dir, log_number); unique_ptr file; @@ -1017,11 +1014,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, std::string scratch; Slice record; WriteBatch batch; - MemTable* mem = nullptr; - if (external_table) { - mem = external_table; - } - while (reader.ReadRecord(&record, &scratch) && status.ok()) { + bool memtable_empty = true; + while (reader.ReadRecord(&record, &scratch)) { if (record.size() < 12) { reporter.Corruption( record.size(), Status::Corruption("log record too small")); @@ -1029,14 +1023,11 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, } WriteBatchInternal::SetContents(&batch, record); - if (mem == nullptr) { - mem = new MemTable(internal_comparator_, options_); - mem->Ref(); - } - status = WriteBatchInternal::InsertInto(&batch, mem, &options_); + status = WriteBatchInternal::InsertInto(&batch, mem_, &options_); + memtable_empty = false; MaybeIgnoreError(&status); if (!status.ok()) { - break; + return status; } const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) + @@ -1045,28 +1036,44 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, *max_sequence = last_seq; } - if (!external_table && - mem->ApproximateMemoryUsage() > options_.write_buffer_size) { - status = WriteLevel0TableForRecovery(mem, edit); + if (!read_only && + mem_->ApproximateMemoryUsage() > options_.write_buffer_size) { + status = WriteLevel0TableForRecovery(mem_, &edit); + // we still want to clear memtable, even if the recovery failed + delete mem_->Unref(); + mem_ = new MemTable(internal_comparator_, options_); + mem_->Ref(); + memtable_empty = true; if (!status.ok()) { // Reflect errors immediately so that conditions like full // file-systems cause the DB::Open() to fail. - break; + return status; } - delete mem->Unref(); - mem = nullptr; } } - if (status.ok() && mem != nullptr && !external_table) { - status = WriteLevel0TableForRecovery(mem, edit); - // Reflect errors immediately so that conditions like full - // file-systems cause the DB::Open() to fail. + if (!memtable_empty && !read_only) { + status = WriteLevel0TableForRecovery(mem_, &edit); + delete mem_->Unref(); + mem_ = new MemTable(internal_comparator_, options_); + mem_->Ref(); + if (!status.ok()) { + return status; + } } - if (mem != nullptr && !external_table) { - delete mem->Unref(); + if (edit.NumEntries() > 0) { + // if read_only, NumEntries() will be 0 + assert(!read_only); + // writing log number in the manifest means that any log file + // with number strongly less than (log_number + 1) is already + // recovered and should be ignored on next reincarnation. + // Since we already recovered log_number, we want all logs + // with numbers `<= log_number` (includes this one) to be ignored + edit.SetLogNumber(log_number + 1); + status = versions_->LogAndApply(&edit, &mutex_); } + return status; } @@ -3826,8 +3833,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { return s; } impl->mutex_.Lock(); - VersionEdit edit; - s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists + s = impl->Recover(); // Handles create_if_missing, error_if_exists if (s.ok()) { uint64_t new_log_number = impl->versions_->NewFileNumber(); unique_ptr lfile; @@ -3839,6 +3845,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { ); if (s.ok()) { lfile->SetPreallocationBlockSize(1.1 * impl->options_.write_buffer_size); + VersionEdit edit; edit.SetLogNumber(new_log_number); impl->logfile_number_ = new_log_number; impl->log_.reset(new log::Writer(std::move(lfile))); diff --git a/db/db_impl.h b/db/db_impl.h index 3eebaf4a7..da888e4b4 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -262,10 +262,8 @@ class DBImpl : public DB { Status NewDB(); // Recover the descriptor from persistent storage. May do a significant - // amount of work to recover recently logged updates. Any changes to - // be made to the descriptor are added to *edit. - Status Recover(VersionEdit* edit, MemTable* external_table = nullptr, - bool error_if_log_file_exist = false); + // amount of work to recover recently logged updates. + Status Recover(bool read_only = false, bool error_if_log_file_exist = false); void MaybeIgnoreError(Status* s) const; @@ -279,10 +277,8 @@ class DBImpl : public DB { Status FlushMemTableToOutputFile(bool* madeProgress, DeletionState& deletion_state); - Status RecoverLogFile(uint64_t log_number, - VersionEdit* edit, - SequenceNumber* max_sequence, - MemTable* external_table); + Status RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, + bool read_only); // The following two methods are used to flush a memtable to // storage. The first one is used atdatabase RecoveryTime (when the diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index 04033b2fa..f1ffe3ca3 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -86,9 +86,7 @@ Status DB::OpenForReadOnly(const Options& options, const std::string& dbname, DBImplReadOnly* impl = new DBImplReadOnly(options, dbname); impl->mutex_.Lock(); - VersionEdit edit; - Status s = impl->Recover(&edit, impl->GetMemTable(), - error_if_log_file_exist); + Status s = impl->Recover(true /* read only */, error_if_log_file_exist); impl->mutex_.Unlock(); if (s.ok()) { *dbptr = impl; diff --git a/db/db_test.cc b/db/db_test.cc index 133bcb5b4..2445d585a 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -669,6 +669,31 @@ class DBTest { ASSERT_EQ(IterStatus(iter), expected_key); delete iter; } + + void CopyFile(const std::string& source, const std::string& destination, + uint64_t size = 0) { + const EnvOptions soptions; + unique_ptr srcfile; + ASSERT_OK(env_->NewSequentialFile(source, &srcfile, soptions)); + unique_ptr destfile; + ASSERT_OK(env_->NewWritableFile(destination, &destfile, soptions)); + + if (size == 0) { + // default argument means copy everything + ASSERT_OK(env_->GetFileSize(source, &size)); + } + + char buffer[4096]; + Slice slice; + while (size > 0) { + uint64_t one = std::min(uint64_t(sizeof(buffer)), size); + ASSERT_OK(srcfile->Read(one, &slice, buffer)); + ASSERT_OK(destfile->Append(slice)); + size -= slice.size(); + } + ASSERT_OK(destfile->Close()); + } + }; static std::string Key(int i) { @@ -1471,6 +1496,82 @@ TEST(DBTest, Recover) { } while (ChangeOptions()); } +TEST(DBTest, IgnoreRecoveredLog) { + std::string backup_logs = dbname_ + "/backup_logs"; + + // delete old files in backup_logs directory + env_->CreateDirIfMissing(backup_logs); + std::vector old_files; + env_->GetChildren(backup_logs, &old_files); + for (auto& file : old_files) { + if (file != "." && file != "..") { + env_->DeleteFile(backup_logs + "/" + file); + } + } + + do { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.merge_operator = MergeOperators::CreateUInt64AddOperator(); + options.wal_dir = dbname_ + "/logs"; + DestroyAndReopen(&options); + + // fill up the DB + std::string one, two; + PutFixed64(&one, 1); + PutFixed64(&two, 2); + ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one))); + ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one))); + ASSERT_OK(db_->Merge(WriteOptions(), Slice("bar"), Slice(one))); + + // copy the logs to backup + std::vector logs; + env_->GetChildren(options.wal_dir, &logs); + for (auto& log : logs) { + if (log != ".." && log != ".") { + CopyFile(options.wal_dir + "/" + log, backup_logs + "/" + log); + } + } + + // recover the DB + Reopen(&options); + ASSERT_EQ(two, Get("foo")); + ASSERT_EQ(one, Get("bar")); + Close(); + + // copy the logs from backup back to wal dir + for (auto& log : logs) { + if (log != ".." && log != ".") { + CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log); + } + } + // this should ignore the log files, recovery should not happen again + // if the recovery happens, the same merge operator would be called twice, + // leading to incorrect results + Reopen(&options); + ASSERT_EQ(two, Get("foo")); + ASSERT_EQ(one, Get("bar")); + Close(); + Destroy(&options); + + // copy the logs from backup back to wal dir + env_->CreateDirIfMissing(options.wal_dir); + for (auto& log : logs) { + if (log != ".." && log != ".") { + CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log); + // we won't be needing this file no more + env_->DeleteFile(backup_logs + "/" + log); + } + } + // assert that we successfully recovered only from logs, even though we + // destroyed the DB + Reopen(&options); + ASSERT_EQ(two, Get("foo")); + ASSERT_EQ(one, Get("bar")); + Close(); + } while (ChangeOptions()); +} + TEST(DBTest, RollLog) { do { ASSERT_OK(Put("foo", "v1")); @@ -3613,7 +3714,6 @@ TEST(DBTest, BloomFilter) { TEST(DBTest, SnapshotFiles) { do { Options options = CurrentOptions(); - const EnvOptions soptions; options.write_buffer_size = 100000000; // Large write buffer Reopen(&options); @@ -3669,20 +3769,7 @@ TEST(DBTest, SnapshotFiles) { } } } - unique_ptr srcfile; - ASSERT_OK(env_->NewSequentialFile(src, &srcfile, soptions)); - unique_ptr destfile; - ASSERT_OK(env_->NewWritableFile(dest, &destfile, soptions)); - - char buffer[4096]; - Slice slice; - while (size > 0) { - uint64_t one = std::min(uint64_t(sizeof(buffer)), size); - ASSERT_OK(srcfile->Read(one, &slice, buffer)); - ASSERT_OK(destfile->Append(slice)); - size -= slice.size(); - } - ASSERT_OK(destfile->Close()); + CopyFile(src, dest, size); } // release file snapshot From fb01755aa4921ec58571e87ab4de16ad610544ba Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 22 Jan 2014 10:55:16 -0800 Subject: [PATCH 3/3] Unfriending classes Summary: In this diff I made some effort to reduce usage of friending. To do that, I had to expose Compaction::inputs_ through a method inputs(). Not sure if this is a good idea, there is a trade-off. I think it's less confusing than having lots of friends. I also thought about other friendship relationships, but they are too much tangled at this point. Once you friend two classes, it's very hard to unfriend them :) Test Plan: make check Reviewers: haobo, kailiu, sdong, dhruba Reviewed By: kailiu CC: leveldb Differential Revision: https://reviews.facebook.net/D15267 --- db/compaction.cc | 6 +++--- db/compaction.h | 9 ++++++--- db/version_set.cc | 16 +++++++--------- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/db/compaction.cc b/db/compaction.cc index 703e7aeae..536b7e233 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -26,7 +26,7 @@ Compaction::Compaction(Version* input_version, int level, int out_level, : level_(level), out_level_(out_level), max_output_file_size_(target_file_size), - maxGrandParentOverlapBytes_(max_grandparent_overlap_bytes), + max_grandparent_overlap_bytes_(max_grandparent_overlap_bytes), input_version_(input_version), number_levels_(input_version_->NumberLevels()), seek_compaction_(seek_compaction), @@ -64,7 +64,7 @@ bool Compaction::IsTrivialMove() const { return (level_ != out_level_ && num_input_files(0) == 1 && num_input_files(1) == 0 && - TotalFileSize(grandparents_) <= maxGrandParentOverlapBytes_); + TotalFileSize(grandparents_) <= max_grandparent_overlap_bytes_); } void Compaction::AddInputDeletions(VersionEdit* edit) { @@ -117,7 +117,7 @@ bool Compaction::ShouldStopBefore(const Slice& internal_key) { } seen_key_ = true; - if (overlapped_bytes_ > maxGrandParentOverlapBytes_) { + if (overlapped_bytes_ > max_grandparent_overlap_bytes_) { // Too much overlap for current output; start new output overlapped_bytes_ = 0; return true; diff --git a/db/compaction.h b/db/compaction.h index 5e696a053..efd6ef71f 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -33,9 +33,14 @@ class Compaction { // "which" must be either 0 or 1 int num_input_files(int which) const { return inputs_[which].size(); } + // Returns input version of the compaction + Version* input_version() const { return input_version_; } + // Return the ith input file at "level()+which" ("which" must be 0 or 1). FileMetaData* input(int which, int i) const { return inputs_[which][i]; } + std::vector* inputs(int which) { return &inputs_[which]; } + // Maximum size of files to build during this compaction. uint64_t MaxOutputFileSize() const { return max_output_file_size_; } @@ -74,8 +79,6 @@ class Compaction { bool IsFullCompaction() { return is_full_compaction_; } private: - friend class Version; - friend class VersionSet; friend class CompactionPicker; friend class UniversalCompactionPicker; friend class LevelCompactionPicker; @@ -87,7 +90,7 @@ class Compaction { int level_; int out_level_; // levels to which output files are stored uint64_t max_output_file_size_; - uint64_t maxGrandParentOverlapBytes_; + uint64_t max_grandparent_overlap_bytes_; Version* input_version_; VersionEdit* edit_; int number_levels_; diff --git a/db/version_set.cc b/db/version_set.cc index ebd2805bc..a08feb875 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1994,23 +1994,21 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) { // Level-0 files have to be merged together. For other levels, // we will make a concatenating iterator per level. // TODO(opt): use concatenating iterator for level-0 if there is no overlap - const int space = (c->level() == 0 ? c->inputs_[0].size() + 1 : 2); + const int space = (c->level() == 0 ? c->inputs(0)->size() + 1 : 2); Iterator** list = new Iterator*[space]; int num = 0; for (int which = 0; which < 2; which++) { - if (!c->inputs_[which].empty()) { + if (!c->inputs(which)->empty()) { if (c->level() + which == 0) { - const std::vector& files = c->inputs_[which]; - for (size_t i = 0; i < files.size(); i++) { + for (const auto& file : *c->inputs(which)) { list[num++] = table_cache_->NewIterator( - options, storage_options_compactions_, - files[i]->number, files[i]->file_size, nullptr, - true /* for compaction */); + options, storage_options_compactions_, file->number, + file->file_size, nullptr, true /* for compaction */); } } else { // Create concatenating iterator for the files from this level list[num++] = NewTwoLevelIterator( - new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]), + new Version::LevelFileNumIterator(icmp_, c->inputs(which)), &GetFileIterator, table_cache_, options, storage_options_, true /* for compaction */); } @@ -2034,7 +2032,7 @@ uint64_t VersionSet::MaxFileSizeForLevel(int level) { // in the current version bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) { #ifndef NDEBUG - if (c->input_version_ != current_) { + if (c->input_version() != current_) { Log(options_->info_log, "VerifyCompactionFileConsistency version mismatch"); }