add option to not flush memtable on open()

Summary:
Add option to not flush memtable on open()
In case the option is enabled, don't delete existing log files by not updating log numbers to MANIFEST.
Will still flush if we need to (e.g. memtable full in the middle). In that case we also flush final memtable.
If wal_recovery_mode = kPointInTimeRecovery, do not halt immediately after encounter corruption. Instead, check if seq id of next log file is last_log_sequence + 1. In that case we continue recovery.

Test Plan: See unit test.

Reviewers: dhruba, horuff, sdong

Reviewed By: sdong

Subscribers: benj, yhchiang, andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D57813
main
Yi Wu 10 years ago
parent 8100ec2cd1
commit bc8af90e8c
  1. 3
      HISTORY.md
  2. 1
      db/db_block_cache_test.cc
  3. 85
      db/db_impl.cc
  4. 209
      db/db_wal_test.cc
  5. 9
      include/rocksdb/options.h
  6. 8
      util/options.cc
  7. 5
      util/options_helper.h
  8. 3
      util/options_settable_test.cc

@ -4,6 +4,9 @@
* Deprecate BlockBaseTableOptions.hash_index_allow_collision=false
* options.memtable_prefix_bloom_bits changes to options.memtable_prefix_bloom_bits_ratio and deprecate options.memtable_prefix_bloom_probes
### New Features
* Add avoid_flush_during_recovery option.
## 4.9.0 (6/9/2016)
### Public API changes
* Add bottommost_compression option, This option can be used to set a specific compression algorithm for the bottommost level (Last level containing files in the DB).

@ -39,6 +39,7 @@ class DBBlockCacheTest : public DBTestBase {
Options GetOptions(const BlockBasedTableOptions& table_options) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.avoid_flush_during_recovery = false;
// options.compression = kNoCompression;
options.statistics = rocksdb::CreateDBStatistics();
options.table_factory.reset(new BlockBasedTableFactory(table_options));

@ -196,6 +196,13 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
result.new_table_reader_for_compaction_inputs = true;
}
// Force flush on DB open if 2PC is enabled, since with 2PC we have no
// guarantee that consecutive log files have consecutive sequence id, which
// make recovery complicated.
if (result.allow_2pc) {
result.avoid_flush_during_recovery = false;
}
return result;
}
@ -1342,7 +1349,10 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
}
#endif
bool continue_replay_log = true;
bool stop_replay_by_wal_filter = false;
bool stop_replay_for_corruption = false;
bool flushed = false;
SequenceNumber recovered_sequence = 0;
for (auto log_number : log_numbers) {
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
@ -1350,6 +1360,23 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
versions_->MarkFileNumberUsedDuringRecovery(log_number);
// Open the log file
std::string fname = LogFileName(db_options_.wal_dir, log_number);
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"Recovering log #%" PRIu64 " mode %d", log_number,
db_options_.wal_recovery_mode);
auto logFileDropped = [this, &fname]() {
uint64_t bytes;
if (env_->GetFileSize(fname, &bytes).ok()) {
auto info_log = db_options_.info_log.get();
Log(InfoLogLevel::WARN_LEVEL, info_log, "%s: dropping %d bytes",
fname.c_str(), static_cast<int>(bytes));
}
};
if (stop_replay_by_wal_filter) {
logFileDropped();
continue;
}
unique_ptr<SequentialFileReader> file_reader;
{
unique_ptr<SequentialFile> file;
@ -1385,9 +1412,6 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// large sequence numbers).
log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
true /*checksum*/, 0 /*initial_offset*/, log_number);
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"Recovering log #%" PRIu64 " mode %d skip-recovery %d", log_number,
db_options_.wal_recovery_mode, !continue_replay_log);
// Determine if we should tolerate incomplete records at the tail end of the
// Read all the records and add to a memtable
@ -1395,17 +1419,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
Slice record;
WriteBatch batch;
if (!continue_replay_log) {
uint64_t bytes;
if (env_->GetFileSize(fname, &bytes).ok()) {
auto info_log = db_options_.info_log.get();
Log(InfoLogLevel::WARN_LEVEL, info_log, "%s: dropping %d bytes",
fname.c_str(), static_cast<int>(bytes));
}
}
while (
continue_replay_log &&
!stop_replay_by_wal_filter &&
reader.ReadRecord(&record, &scratch, db_options_.wal_recovery_mode) &&
status.ok()) {
if (record.size() < WriteBatchInternal::kHeader) {
@ -1414,6 +1429,29 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
continue;
}
WriteBatchInternal::SetContents(&batch, record);
SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
// In point-in-time recovery mode, if sequence id of log files are
// consecutive, we continue recovery despite corruption. This could happen
// when we open and write to a corrupted DB, where sequence id will start
// from the last sequence id we recovered.
if (db_options_.wal_recovery_mode ==
WALRecoveryMode::kPointInTimeRecovery) {
if (sequence == recovered_sequence + 1) {
stop_replay_for_corruption = false;
}
if (stop_replay_for_corruption) {
logFileDropped();
break;
}
}
recovered_sequence = sequence;
if (*next_sequence == kMaxSequenceNumber) {
*next_sequence = sequence;
} else {
WriteBatchInternal::SetSequence(&batch, *next_sequence);
}
#ifndef ROCKSDB_LITE
if (db_options_.wal_filter != nullptr) {
@ -1433,7 +1471,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
continue;
case WalFilter::WalProcessingOption::kStopReplay:
// skip current record and stop replay
continue_replay_log = false;
stop_replay_by_wal_filter = true;
continue;
case WalFilter::WalProcessingOption::kCorruptedRecord: {
status = Status::Corruption("Corruption reported by Wal Filter ",
@ -1489,11 +1527,6 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
}
#endif // ROCKSDB_LITE
if (*next_sequence == kMaxSequenceNumber) {
*next_sequence = WriteBatchInternal::Sequence(&batch);
}
WriteBatchInternal::SetSequence(&batch, *next_sequence);
// If column family was not found, it might mean that the WAL write
// batch references to the column family that was dropped after the
// insert. We don't want to fail the whole write batch in that case --
@ -1529,6 +1562,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// file-systems cause the DB::Open() to fail.
return status;
}
flushed = true;
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
*next_sequence);
@ -1545,8 +1579,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
WALRecoveryMode::kPointInTimeRecovery) {
// We should ignore the error but not continue replaying
status = Status::OK();
continue_replay_log = false;
stop_replay_for_corruption = true;
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"Point in time recovered to log #%" PRIu64 " seq #%" PRIu64,
log_number, *next_sequence);
@ -1588,15 +1621,21 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// flush the final memtable (if non-empty)
if (cfd->mem()->GetFirstSequenceNumber() != 0) {
// If flush happened in the middle of recovery (e.g. due to memtable
// being full), we flush at the end. Otherwise we'll need to record
// where we were on last flush, which make the logic complicated.
if (flushed || !db_options_.avoid_flush_during_recovery) {
status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
if (!status.ok()) {
// Recovery failed
break;
}
flushed = true;
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
*next_sequence);
}
}
// write MANIFEST with update
// writing log_number in the manifest means that any log file
@ -1604,7 +1643,9 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// recovered and should be ignored on next reincarnation.
// Since we already recovered max_log_number, we want all logs
// with numbers `<= max_log_number` (includes this one) to be ignored
if (flushed) {
edit->SetLogNumber(max_log_number + 1);
}
// we must mark the next log number as used, even though it's
// not actually used. that is because VersionSet assumes
// VersionSet::next_file_number_ always to be strictly greater than any

@ -9,6 +9,7 @@
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "util/options_helper.h"
#include "util/sync_point.h"
namespace rocksdb {
@ -156,6 +157,7 @@ TEST_F(DBWALTest, RecoverWithTableHandle) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.disable_auto_compactions = true;
options.avoid_flush_during_recovery = false;
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
@ -382,6 +384,7 @@ TEST_F(DBWALTest, RecoverCheckFileAmount) {
Options options = CurrentOptions();
options.write_buffer_size = 100000;
options.arena_block_size = 4 * 1024;
options.avoid_flush_during_recovery = false;
CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
ASSERT_OK(Put(0, Key(1), DummyString(1)));
@ -756,6 +759,212 @@ TEST_F(DBWALTest, kSkipAnyCorruptedRecords) {
}
}
TEST_F(DBWALTest, AvoidFlushDuringRecovery) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.avoid_flush_during_recovery = false;
// Test with flush after recovery.
Reopen(options);
ASSERT_OK(Put("foo", "v1"));
ASSERT_OK(Put("bar", "v2"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo", "v3"));
ASSERT_OK(Put("bar", "v4"));
ASSERT_EQ(1, TotalTableFiles());
// Reopen DB. Check if WAL logs flushed.
Reopen(options);
ASSERT_EQ("v3", Get("foo"));
ASSERT_EQ("v4", Get("bar"));
ASSERT_EQ(2, TotalTableFiles());
// Test without flush after recovery.
options.avoid_flush_during_recovery = true;
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "v5"));
ASSERT_OK(Put("bar", "v6"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo", "v7"));
ASSERT_OK(Put("bar", "v8"));
ASSERT_EQ(1, TotalTableFiles());
// Reopen DB. WAL logs should not be flushed this time.
Reopen(options);
ASSERT_EQ("v7", Get("foo"));
ASSERT_EQ("v8", Get("bar"));
ASSERT_EQ(1, TotalTableFiles());
// Force flush with allow_2pc.
options.avoid_flush_during_recovery = true;
options.allow_2pc = true;
ASSERT_OK(Put("foo", "v9"));
ASSERT_OK(Put("bar", "v10"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo", "v11"));
ASSERT_OK(Put("bar", "v12"));
Reopen(options);
ASSERT_EQ("v11", Get("foo"));
ASSERT_EQ("v12", Get("bar"));
ASSERT_EQ(2, TotalTableFiles());
}
TEST_F(DBWALTest, RecoverWithoutFlush) {
Options options = CurrentOptions();
options.avoid_flush_during_recovery = true;
options.create_if_missing = false;
options.disable_auto_compactions = true;
options.write_buffer_size = 64 * 1024 * 1024;
size_t count = RecoveryTestHelper::FillData(this, &options);
auto validateData = [this, count]() {
for (size_t i = 0; i < count; i++) {
ASSERT_NE(Get("key" + ToString(i)), "NOT_FOUND");
}
};
Reopen(options);
validateData();
// Insert some data without flush
ASSERT_OK(Put("foo", "foo_v1"));
ASSERT_OK(Put("bar", "bar_v1"));
Reopen(options);
validateData();
ASSERT_EQ(Get("foo"), "foo_v1");
ASSERT_EQ(Get("bar"), "bar_v1");
// Insert again and reopen
ASSERT_OK(Put("foo", "foo_v2"));
ASSERT_OK(Put("bar", "bar_v2"));
Reopen(options);
validateData();
ASSERT_EQ(Get("foo"), "foo_v2");
ASSERT_EQ(Get("bar"), "bar_v2");
// manual flush and insert again
Flush();
ASSERT_EQ(Get("foo"), "foo_v2");
ASSERT_EQ(Get("bar"), "bar_v2");
ASSERT_OK(Put("foo", "foo_v3"));
ASSERT_OK(Put("bar", "bar_v3"));
Reopen(options);
validateData();
ASSERT_EQ(Get("foo"), "foo_v3");
ASSERT_EQ(Get("bar"), "bar_v3");
}
TEST_F(DBWALTest, RecoverWithoutFlushMultipleCF) {
const std::string kSmallValue = "v";
const std::string kLargeValue = DummyString(1024);
Options options = CurrentOptions();
options.avoid_flush_during_recovery = true;
options.create_if_missing = false;
options.disable_auto_compactions = true;
auto countWalFiles = [this]() {
VectorLogPtr log_files;
dbfull()->GetSortedWalFiles(log_files);
return log_files.size();
};
// Create DB with multiple column families and multiple log files.
CreateAndReopenWithCF({"one", "two"}, options);
ASSERT_OK(Put(0, "key1", kSmallValue));
ASSERT_OK(Put(1, "key2", kLargeValue));
Flush(1);
ASSERT_EQ(1, countWalFiles());
ASSERT_OK(Put(0, "key3", kSmallValue));
ASSERT_OK(Put(2, "key4", kLargeValue));
Flush(2);
ASSERT_EQ(2, countWalFiles());
// Reopen, insert and flush.
options.db_write_buffer_size = 64 * 1024 * 1024;
ReopenWithColumnFamilies({"default", "one", "two"}, options);
ASSERT_EQ(Get(0, "key1"), kSmallValue);
ASSERT_EQ(Get(1, "key2"), kLargeValue);
ASSERT_EQ(Get(0, "key3"), kSmallValue);
ASSERT_EQ(Get(2, "key4"), kLargeValue);
// Insert more data.
ASSERT_OK(Put(0, "key5", kLargeValue));
ASSERT_OK(Put(1, "key6", kLargeValue));
ASSERT_EQ(3, countWalFiles());
Flush(1);
ASSERT_OK(Put(2, "key7", kLargeValue));
ASSERT_EQ(4, countWalFiles());
// Reopen twice and validate.
for (int i = 0; i < 2; i++) {
ReopenWithColumnFamilies({"default", "one", "two"}, options);
ASSERT_EQ(Get(0, "key1"), kSmallValue);
ASSERT_EQ(Get(1, "key2"), kLargeValue);
ASSERT_EQ(Get(0, "key3"), kSmallValue);
ASSERT_EQ(Get(2, "key4"), kLargeValue);
ASSERT_EQ(Get(0, "key5"), kLargeValue);
ASSERT_EQ(Get(1, "key6"), kLargeValue);
ASSERT_EQ(Get(2, "key7"), kLargeValue);
ASSERT_EQ(4, countWalFiles());
}
}
// In this test we are trying to do the following:
// 1. Create a DB with corrupted WAL log;
// 2. Open with avoid_flush_during_recovery = true;
// 3. Append more data without flushing, which creates new WAL log.
// 4. Open again. See if it can correctly handle previous corruption.
TEST_F(DBWALTest, RecoverFromCorruptedWALWithoutFlush) {
const int jstart = RecoveryTestHelper::kWALFileOffset;
const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
const int kAppendKeys = 100;
Options options = CurrentOptions();
options.avoid_flush_during_recovery = true;
options.create_if_missing = false;
options.disable_auto_compactions = true;
options.write_buffer_size = 64 * 1024 * 1024;
auto getAll = [this]() {
std::vector<std::pair<std::string, std::string>> data;
ReadOptions ropt;
Iterator* iter = dbfull()->NewIterator(ropt);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
data.push_back(
std::make_pair(iter->key().ToString(), iter->value().ToString()));
}
delete iter;
return data;
};
for (auto& mode : wal_recovery_mode_string_map) {
options.wal_recovery_mode = mode.second;
for (auto trunc : {true, false}) {
for (int i = 0; i < 4; i++) {
for (int j = jstart; j < jend; j++) {
// Create corrupted WAL
RecoveryTestHelper::FillData(this, &options);
RecoveryTestHelper::CorruptWAL(this, options, /*off=*/i * .3,
/*len%=*/.1, /*wal=*/j, trunc);
// Skip the test if DB won't open.
if (!TryReopen(options).ok()) {
ASSERT_TRUE(options.wal_recovery_mode ==
WALRecoveryMode::kAbsoluteConsistency ||
(!trunc &&
options.wal_recovery_mode ==
WALRecoveryMode::kTolerateCorruptedTailRecords));
continue;
}
ASSERT_OK(TryReopen(options));
// Append some more data.
for (int k = 0; k < kAppendKeys; k++) {
std::string key = "extra_key" + ToString(k);
std::string value = DummyString(RecoveryTestHelper::kValueSize);
ASSERT_OK(Put(key, value));
}
// Save data for comparision.
auto data = getAll();
// Reopen. Verify data.
ASSERT_OK(TryReopen(options));
auto actual_data = getAll();
ASSERT_EQ(data, actual_data);
}
}
}
}
}
#endif // ROCKSDB_LITE
} // namespace rocksdb

@ -1341,6 +1341,15 @@ struct DBOptions {
// when printing to LOG.
// DEFAULT: false
bool dump_malloc_stats;
// By default RocksDB replay WAL logs and flush them on DB open, which may
// create very small SST files. If this option is enabled, RocksDB will try
// to avoid (but not guarantee not to) flush during recovery. Also, existing
// WAL logs will be kept, so that if crash happened before flush, we still
// have logs to recover from.
//
// DEFAULT: false
bool avoid_flush_during_recovery;
};
// Options to control the behavior of a database (passed to DB::Open)

@ -273,7 +273,8 @@ DBOptions::DBOptions()
wal_filter(nullptr),
#endif // ROCKSDB_LITE
fail_if_options_file_error(false),
dump_malloc_stats(false) {
dump_malloc_stats(false),
avoid_flush_during_recovery(false) {
}
DBOptions::DBOptions(const Options& options)
@ -343,7 +344,8 @@ DBOptions::DBOptions(const Options& options)
wal_filter(options.wal_filter),
#endif // ROCKSDB_LITE
fail_if_options_file_error(options.fail_if_options_file_error),
dump_malloc_stats(options.dump_malloc_stats) {
dump_malloc_stats(options.dump_malloc_stats),
avoid_flush_during_recovery(options.avoid_flush_during_recovery) {
}
static const char* const access_hints[] = {
@ -468,6 +470,8 @@ void DBOptions::Dump(Logger* log) const {
Header(log, " Options.wal_filter: %s",
wal_filter ? wal_filter->Name() : "None");
#endif // ROCKDB_LITE
Header(log, " Options.avoid_flush_during_recovery: %d",
avoid_flush_during_recovery);
} // DBOptions::Dump
void ColumnFamilyOptions::Dump(Logger* log) const {

@ -319,7 +319,10 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
OptionVerificationType::kNormal}},
{"dump_malloc_stats",
{offsetof(struct DBOptions, dump_malloc_stats), OptionType::kBoolean,
OptionVerificationType::kNormal}}};
OptionVerificationType::kNormal}},
{"avoid_flush_during_recovery",
{offsetof(struct DBOptions, avoid_flush_during_recovery),
OptionType::kBoolean, OptionVerificationType::kNormal}}};
static std::unordered_map<std::string, OptionTypeInfo> cf_options_type_info = {
/* not yet supported

@ -283,7 +283,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
"access_hint_on_compaction_start=NONE;"
"info_log_level=DEBUG_LEVEL;"
"dump_malloc_stats=false;"
"allow_2pc=false;",
"allow_2pc=false;"
"avoid_flush_during_recovery=false;",
new_options));
ASSERT_EQ(unset_bytes_base, NumUnsetBytes(new_options_ptr, sizeof(DBOptions),

Loading…
Cancel
Save