diff --git a/HISTORY.md b/HISTORY.md index d5d4dcc40..60e5fd75c 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -4,6 +4,9 @@ * Deprecate BlockBaseTableOptions.hash_index_allow_collision=false * options.memtable_prefix_bloom_bits changes to options.memtable_prefix_bloom_bits_ratio and deprecate options.memtable_prefix_bloom_probes +### New Features +* Add avoid_flush_during_recovery option. + ## 4.9.0 (6/9/2016) ### Public API changes * Add bottommost_compression option, This option can be used to set a specific compression algorithm for the bottommost level (Last level containing files in the DB). diff --git a/db/db_block_cache_test.cc b/db/db_block_cache_test.cc index f68dd188d..b7c67a61d 100644 --- a/db/db_block_cache_test.cc +++ b/db/db_block_cache_test.cc @@ -39,6 +39,7 @@ class DBBlockCacheTest : public DBTestBase { Options GetOptions(const BlockBasedTableOptions& table_options) { Options options = CurrentOptions(); options.create_if_missing = true; + options.avoid_flush_during_recovery = false; // options.compression = kNoCompression; options.statistics = rocksdb::CreateDBStatistics(); options.table_factory.reset(new BlockBasedTableFactory(table_options)); diff --git a/db/db_impl.cc b/db/db_impl.cc index 2511c2ddf..1dfde31bb 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -196,6 +196,13 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { result.new_table_reader_for_compaction_inputs = true; } + // Force flush on DB open if 2PC is enabled, since with 2PC we have no + // guarantee that consecutive log files have consecutive sequence id, which + // make recovery complicated. + if (result.allow_2pc) { + result.avoid_flush_during_recovery = false; + } + return result; } @@ -1342,7 +1349,10 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, } #endif - bool continue_replay_log = true; + bool stop_replay_by_wal_filter = false; + bool stop_replay_for_corruption = false; + bool flushed = false; + SequenceNumber recovered_sequence = 0; for (auto log_number : log_numbers) { // The previous incarnation may not have written any MANIFEST // records after allocating this log number. So we manually @@ -1350,6 +1360,23 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, versions_->MarkFileNumberUsedDuringRecovery(log_number); // Open the log file std::string fname = LogFileName(db_options_.wal_dir, log_number); + + Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, + "Recovering log #%" PRIu64 " mode %d", log_number, + db_options_.wal_recovery_mode); + auto logFileDropped = [this, &fname]() { + uint64_t bytes; + if (env_->GetFileSize(fname, &bytes).ok()) { + auto info_log = db_options_.info_log.get(); + Log(InfoLogLevel::WARN_LEVEL, info_log, "%s: dropping %d bytes", + fname.c_str(), static_cast(bytes)); + } + }; + if (stop_replay_by_wal_filter) { + logFileDropped(); + continue; + } + unique_ptr file_reader; { unique_ptr file; @@ -1385,9 +1412,6 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // large sequence numbers). log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter, true /*checksum*/, 0 /*initial_offset*/, log_number); - Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, - "Recovering log #%" PRIu64 " mode %d skip-recovery %d", log_number, - db_options_.wal_recovery_mode, !continue_replay_log); // Determine if we should tolerate incomplete records at the tail end of the // Read all the records and add to a memtable @@ -1395,17 +1419,8 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, Slice record; WriteBatch batch; - if (!continue_replay_log) { - uint64_t bytes; - if (env_->GetFileSize(fname, &bytes).ok()) { - auto info_log = db_options_.info_log.get(); - Log(InfoLogLevel::WARN_LEVEL, info_log, "%s: dropping %d bytes", - fname.c_str(), static_cast(bytes)); - } - } - while ( - continue_replay_log && + !stop_replay_by_wal_filter && reader.ReadRecord(&record, &scratch, db_options_.wal_recovery_mode) && status.ok()) { if (record.size() < WriteBatchInternal::kHeader) { @@ -1414,6 +1429,29 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, continue; } WriteBatchInternal::SetContents(&batch, record); + SequenceNumber sequence = WriteBatchInternal::Sequence(&batch); + + // In point-in-time recovery mode, if sequence id of log files are + // consecutive, we continue recovery despite corruption. This could happen + // when we open and write to a corrupted DB, where sequence id will start + // from the last sequence id we recovered. + if (db_options_.wal_recovery_mode == + WALRecoveryMode::kPointInTimeRecovery) { + if (sequence == recovered_sequence + 1) { + stop_replay_for_corruption = false; + } + if (stop_replay_for_corruption) { + logFileDropped(); + break; + } + } + + recovered_sequence = sequence; + if (*next_sequence == kMaxSequenceNumber) { + *next_sequence = sequence; + } else { + WriteBatchInternal::SetSequence(&batch, *next_sequence); + } #ifndef ROCKSDB_LITE if (db_options_.wal_filter != nullptr) { @@ -1433,7 +1471,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, continue; case WalFilter::WalProcessingOption::kStopReplay: // skip current record and stop replay - continue_replay_log = false; + stop_replay_by_wal_filter = true; continue; case WalFilter::WalProcessingOption::kCorruptedRecord: { status = Status::Corruption("Corruption reported by Wal Filter ", @@ -1489,11 +1527,6 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, } #endif // ROCKSDB_LITE - if (*next_sequence == kMaxSequenceNumber) { - *next_sequence = WriteBatchInternal::Sequence(&batch); - } - WriteBatchInternal::SetSequence(&batch, *next_sequence); - // If column family was not found, it might mean that the WAL write // batch references to the column family that was dropped after the // insert. We don't want to fail the whole write batch in that case -- @@ -1529,6 +1562,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // file-systems cause the DB::Open() to fail. return status; } + flushed = true; cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(), *next_sequence); @@ -1545,8 +1579,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, WALRecoveryMode::kPointInTimeRecovery) { // We should ignore the error but not continue replaying status = Status::OK(); - continue_replay_log = false; - + stop_replay_for_corruption = true; Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "Point in time recovered to log #%" PRIu64 " seq #%" PRIu64, log_number, *next_sequence); @@ -1588,14 +1621,20 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // flush the final memtable (if non-empty) if (cfd->mem()->GetFirstSequenceNumber() != 0) { - status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit); - if (!status.ok()) { - // Recovery failed - break; - } + // If flush happened in the middle of recovery (e.g. due to memtable + // being full), we flush at the end. Otherwise we'll need to record + // where we were on last flush, which make the logic complicated. + if (flushed || !db_options_.avoid_flush_during_recovery) { + status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit); + if (!status.ok()) { + // Recovery failed + break; + } + flushed = true; - cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(), - *next_sequence); + cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(), + *next_sequence); + } } // write MANIFEST with update @@ -1604,7 +1643,9 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // recovered and should be ignored on next reincarnation. // Since we already recovered max_log_number, we want all logs // with numbers `<= max_log_number` (includes this one) to be ignored - edit->SetLogNumber(max_log_number + 1); + if (flushed) { + edit->SetLogNumber(max_log_number + 1); + } // we must mark the next log number as used, even though it's // not actually used. that is because VersionSet assumes // VersionSet::next_file_number_ always to be strictly greater than any diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index d8831ef3d..3852eb334 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -9,6 +9,7 @@ #include "db/db_test_util.h" #include "port/stack_trace.h" +#include "util/options_helper.h" #include "util/sync_point.h" namespace rocksdb { @@ -156,6 +157,7 @@ TEST_F(DBWALTest, RecoverWithTableHandle) { Options options = CurrentOptions(); options.create_if_missing = true; options.disable_auto_compactions = true; + options.avoid_flush_during_recovery = false; DestroyAndReopen(options); CreateAndReopenWithCF({"pikachu"}, options); @@ -382,6 +384,7 @@ TEST_F(DBWALTest, RecoverCheckFileAmount) { Options options = CurrentOptions(); options.write_buffer_size = 100000; options.arena_block_size = 4 * 1024; + options.avoid_flush_during_recovery = false; CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options); ASSERT_OK(Put(0, Key(1), DummyString(1))); @@ -756,6 +759,212 @@ TEST_F(DBWALTest, kSkipAnyCorruptedRecords) { } } +TEST_F(DBWALTest, AvoidFlushDuringRecovery) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.avoid_flush_during_recovery = false; + + // Test with flush after recovery. + Reopen(options); + ASSERT_OK(Put("foo", "v1")); + ASSERT_OK(Put("bar", "v2")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("foo", "v3")); + ASSERT_OK(Put("bar", "v4")); + ASSERT_EQ(1, TotalTableFiles()); + // Reopen DB. Check if WAL logs flushed. + Reopen(options); + ASSERT_EQ("v3", Get("foo")); + ASSERT_EQ("v4", Get("bar")); + ASSERT_EQ(2, TotalTableFiles()); + + // Test without flush after recovery. + options.avoid_flush_during_recovery = true; + DestroyAndReopen(options); + ASSERT_OK(Put("foo", "v5")); + ASSERT_OK(Put("bar", "v6")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("foo", "v7")); + ASSERT_OK(Put("bar", "v8")); + ASSERT_EQ(1, TotalTableFiles()); + // Reopen DB. WAL logs should not be flushed this time. + Reopen(options); + ASSERT_EQ("v7", Get("foo")); + ASSERT_EQ("v8", Get("bar")); + ASSERT_EQ(1, TotalTableFiles()); + + // Force flush with allow_2pc. + options.avoid_flush_during_recovery = true; + options.allow_2pc = true; + ASSERT_OK(Put("foo", "v9")); + ASSERT_OK(Put("bar", "v10")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("foo", "v11")); + ASSERT_OK(Put("bar", "v12")); + Reopen(options); + ASSERT_EQ("v11", Get("foo")); + ASSERT_EQ("v12", Get("bar")); + ASSERT_EQ(2, TotalTableFiles()); +} + +TEST_F(DBWALTest, RecoverWithoutFlush) { + Options options = CurrentOptions(); + options.avoid_flush_during_recovery = true; + options.create_if_missing = false; + options.disable_auto_compactions = true; + options.write_buffer_size = 64 * 1024 * 1024; + + size_t count = RecoveryTestHelper::FillData(this, &options); + auto validateData = [this, count]() { + for (size_t i = 0; i < count; i++) { + ASSERT_NE(Get("key" + ToString(i)), "NOT_FOUND"); + } + }; + Reopen(options); + validateData(); + // Insert some data without flush + ASSERT_OK(Put("foo", "foo_v1")); + ASSERT_OK(Put("bar", "bar_v1")); + Reopen(options); + validateData(); + ASSERT_EQ(Get("foo"), "foo_v1"); + ASSERT_EQ(Get("bar"), "bar_v1"); + // Insert again and reopen + ASSERT_OK(Put("foo", "foo_v2")); + ASSERT_OK(Put("bar", "bar_v2")); + Reopen(options); + validateData(); + ASSERT_EQ(Get("foo"), "foo_v2"); + ASSERT_EQ(Get("bar"), "bar_v2"); + // manual flush and insert again + Flush(); + ASSERT_EQ(Get("foo"), "foo_v2"); + ASSERT_EQ(Get("bar"), "bar_v2"); + ASSERT_OK(Put("foo", "foo_v3")); + ASSERT_OK(Put("bar", "bar_v3")); + Reopen(options); + validateData(); + ASSERT_EQ(Get("foo"), "foo_v3"); + ASSERT_EQ(Get("bar"), "bar_v3"); +} + +TEST_F(DBWALTest, RecoverWithoutFlushMultipleCF) { + const std::string kSmallValue = "v"; + const std::string kLargeValue = DummyString(1024); + Options options = CurrentOptions(); + options.avoid_flush_during_recovery = true; + options.create_if_missing = false; + options.disable_auto_compactions = true; + + auto countWalFiles = [this]() { + VectorLogPtr log_files; + dbfull()->GetSortedWalFiles(log_files); + return log_files.size(); + }; + + // Create DB with multiple column families and multiple log files. + CreateAndReopenWithCF({"one", "two"}, options); + ASSERT_OK(Put(0, "key1", kSmallValue)); + ASSERT_OK(Put(1, "key2", kLargeValue)); + Flush(1); + ASSERT_EQ(1, countWalFiles()); + ASSERT_OK(Put(0, "key3", kSmallValue)); + ASSERT_OK(Put(2, "key4", kLargeValue)); + Flush(2); + ASSERT_EQ(2, countWalFiles()); + + // Reopen, insert and flush. + options.db_write_buffer_size = 64 * 1024 * 1024; + ReopenWithColumnFamilies({"default", "one", "two"}, options); + ASSERT_EQ(Get(0, "key1"), kSmallValue); + ASSERT_EQ(Get(1, "key2"), kLargeValue); + ASSERT_EQ(Get(0, "key3"), kSmallValue); + ASSERT_EQ(Get(2, "key4"), kLargeValue); + // Insert more data. + ASSERT_OK(Put(0, "key5", kLargeValue)); + ASSERT_OK(Put(1, "key6", kLargeValue)); + ASSERT_EQ(3, countWalFiles()); + Flush(1); + ASSERT_OK(Put(2, "key7", kLargeValue)); + ASSERT_EQ(4, countWalFiles()); + + // Reopen twice and validate. + for (int i = 0; i < 2; i++) { + ReopenWithColumnFamilies({"default", "one", "two"}, options); + ASSERT_EQ(Get(0, "key1"), kSmallValue); + ASSERT_EQ(Get(1, "key2"), kLargeValue); + ASSERT_EQ(Get(0, "key3"), kSmallValue); + ASSERT_EQ(Get(2, "key4"), kLargeValue); + ASSERT_EQ(Get(0, "key5"), kLargeValue); + ASSERT_EQ(Get(1, "key6"), kLargeValue); + ASSERT_EQ(Get(2, "key7"), kLargeValue); + ASSERT_EQ(4, countWalFiles()); + } +} + +// In this test we are trying to do the following: +// 1. Create a DB with corrupted WAL log; +// 2. Open with avoid_flush_during_recovery = true; +// 3. Append more data without flushing, which creates new WAL log. +// 4. Open again. See if it can correctly handle previous corruption. +TEST_F(DBWALTest, RecoverFromCorruptedWALWithoutFlush) { + const int jstart = RecoveryTestHelper::kWALFileOffset; + const int jend = jstart + RecoveryTestHelper::kWALFilesCount; + const int kAppendKeys = 100; + Options options = CurrentOptions(); + options.avoid_flush_during_recovery = true; + options.create_if_missing = false; + options.disable_auto_compactions = true; + options.write_buffer_size = 64 * 1024 * 1024; + + auto getAll = [this]() { + std::vector> data; + ReadOptions ropt; + Iterator* iter = dbfull()->NewIterator(ropt); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + data.push_back( + std::make_pair(iter->key().ToString(), iter->value().ToString())); + } + delete iter; + return data; + }; + for (auto& mode : wal_recovery_mode_string_map) { + options.wal_recovery_mode = mode.second; + for (auto trunc : {true, false}) { + for (int i = 0; i < 4; i++) { + for (int j = jstart; j < jend; j++) { + // Create corrupted WAL + RecoveryTestHelper::FillData(this, &options); + RecoveryTestHelper::CorruptWAL(this, options, /*off=*/i * .3, + /*len%=*/.1, /*wal=*/j, trunc); + // Skip the test if DB won't open. + if (!TryReopen(options).ok()) { + ASSERT_TRUE(options.wal_recovery_mode == + WALRecoveryMode::kAbsoluteConsistency || + (!trunc && + options.wal_recovery_mode == + WALRecoveryMode::kTolerateCorruptedTailRecords)); + continue; + } + ASSERT_OK(TryReopen(options)); + // Append some more data. + for (int k = 0; k < kAppendKeys; k++) { + std::string key = "extra_key" + ToString(k); + std::string value = DummyString(RecoveryTestHelper::kValueSize); + ASSERT_OK(Put(key, value)); + } + // Save data for comparision. + auto data = getAll(); + // Reopen. Verify data. + ASSERT_OK(TryReopen(options)); + auto actual_data = getAll(); + ASSERT_EQ(data, actual_data); + } + } + } + } +} + #endif // ROCKSDB_LITE } // namespace rocksdb diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 9eefd885e..8851285f0 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1341,6 +1341,15 @@ struct DBOptions { // when printing to LOG. // DEFAULT: false bool dump_malloc_stats; + + // By default RocksDB replay WAL logs and flush them on DB open, which may + // create very small SST files. If this option is enabled, RocksDB will try + // to avoid (but not guarantee not to) flush during recovery. Also, existing + // WAL logs will be kept, so that if crash happened before flush, we still + // have logs to recover from. + // + // DEFAULT: false + bool avoid_flush_during_recovery; }; // Options to control the behavior of a database (passed to DB::Open) diff --git a/util/options.cc b/util/options.cc index 7c3867087..cae77aaaf 100644 --- a/util/options.cc +++ b/util/options.cc @@ -273,7 +273,8 @@ DBOptions::DBOptions() wal_filter(nullptr), #endif // ROCKSDB_LITE fail_if_options_file_error(false), - dump_malloc_stats(false) { + dump_malloc_stats(false), + avoid_flush_during_recovery(false) { } DBOptions::DBOptions(const Options& options) @@ -343,7 +344,8 @@ DBOptions::DBOptions(const Options& options) wal_filter(options.wal_filter), #endif // ROCKSDB_LITE fail_if_options_file_error(options.fail_if_options_file_error), - dump_malloc_stats(options.dump_malloc_stats) { + dump_malloc_stats(options.dump_malloc_stats), + avoid_flush_during_recovery(options.avoid_flush_during_recovery) { } static const char* const access_hints[] = { @@ -468,6 +470,8 @@ void DBOptions::Dump(Logger* log) const { Header(log, " Options.wal_filter: %s", wal_filter ? wal_filter->Name() : "None"); #endif // ROCKDB_LITE + Header(log, " Options.avoid_flush_during_recovery: %d", + avoid_flush_during_recovery); } // DBOptions::Dump void ColumnFamilyOptions::Dump(Logger* log) const { diff --git a/util/options_helper.h b/util/options_helper.h index a7fea3c08..b28032853 100644 --- a/util/options_helper.h +++ b/util/options_helper.h @@ -319,7 +319,10 @@ static std::unordered_map db_options_type_info = { OptionVerificationType::kNormal}}, {"dump_malloc_stats", {offsetof(struct DBOptions, dump_malloc_stats), OptionType::kBoolean, - OptionVerificationType::kNormal}}}; + OptionVerificationType::kNormal}}, + {"avoid_flush_during_recovery", + {offsetof(struct DBOptions, avoid_flush_during_recovery), + OptionType::kBoolean, OptionVerificationType::kNormal}}}; static std::unordered_map cf_options_type_info = { /* not yet supported diff --git a/util/options_settable_test.cc b/util/options_settable_test.cc index f50ebf011..f0ea31048 100644 --- a/util/options_settable_test.cc +++ b/util/options_settable_test.cc @@ -283,7 +283,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "access_hint_on_compaction_start=NONE;" "info_log_level=DEBUG_LEVEL;" "dump_malloc_stats=false;" - "allow_2pc=false;", + "allow_2pc=false;" + "avoid_flush_during_recovery=false;", new_options)); ASSERT_EQ(unset_bytes_base, NumUnsetBytes(new_options_ptr, sizeof(DBOptions),