Update column families' log number altogether after flushing during recovery (#5856)

Summary:
A bug occasionally shows up in crash test, and https://github.com/facebook/rocksdb/issues/5851 reproduces it.
The bug can surface in the following way.
1. Database has multiple column families.
2. Between one DB restart, the last log file is corrupted in the middle (not the tail)
3. During restart, DB crashes between flushing between two column families.

Then DB will fail to be opened again with error "SST file is ahead of WALs".
Solution is to update the log number associated with each column family altogether after flushing all column families' memtables. The version edits should be written to a new MANIFEST. Only after writing to all these version edits succeed does RocksDB (atomically) points the CURRENT file to the new MANIFEST.

Test plan (on devserver):
```
$make all && make check
```
Specifically
```
$make db_test2
$./db_test2 --gtest_filter=DBTest2.CrashInRecoveryMultipleCF
```
Also checked for compatibility as follows.
Use this branch, run DBTest2.CrashInRecoveryMultipleCF and preserve the db directory.
Then checkout 5.4, build ldb, and dump the MANIFEST.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5856

Differential Revision: D17620818

Pulled By: riversand963

fbshipit-source-id: b52ce5969c9a8052cacec2bd805fcfb373589039
main
Yanqin Jin 5 years ago committed by Facebook Github Bot
parent ca7ccbe2ea
commit 2309fd63bf
  1. 2
      HISTORY.md
  2. 30
      db/db_impl/db_impl_open.cc
  3. 106
      db/db_test2.cc
  4. 5
      db/version_set.cc

@ -12,6 +12,8 @@
* Fix OnFlushCompleted fired before flush result persisted in MANIFEST when there's concurrent flush job. The bug exists since OnFlushCompleted was introduced in rocksdb 3.8. * Fix OnFlushCompleted fired before flush result persisted in MANIFEST when there's concurrent flush job. The bug exists since OnFlushCompleted was introduced in rocksdb 3.8.
* Fixed an sst_dump crash on some plain table SST files. * Fixed an sst_dump crash on some plain table SST files.
* Fixed a memory leak in some error cases of opening plain table SST files. * Fixed a memory leak in some error cases of opening plain table SST files.
* Fix a bug when a crash happens while calling WriteLevel0TableForRecovery for multiple column families, leading to a column family's log number greater than the first corrutped log number when the DB is being opened in PointInTime recovery mode during next recovery attempt (#5856).
### New Features ### New Features
* Introduced DBOptions::max_write_batch_group_size_bytes to configure maximum limit on number of bytes that are written in a single batch of WAL or memtable write. It is followed when the leader write size is larger than 1/8 of this limit. * Introduced DBOptions::max_write_batch_group_size_bytes to configure maximum limit on number of bytes that are written in a single batch of WAL or memtable write. It is followed when the leader write size is larger than 1/8 of this limit.
* VerifyChecksum() by default will issue readahead. Allow ReadOptions to be passed in to those functions to override the readhead size. For checksum verifying before external SST file ingestion, a new option IngestExternalFileOptions.verify_checksums_readahead_size, is added for this readahead setting. * VerifyChecksum() by default will issue readahead. Allow ReadOptions to be passed in to those functions to override the readhead size. For checksum verifying before external SST file ingestion, a new option IngestExternalFileOptions.verify_checksums_readahead_size, is added for this readahead setting.

@ -1023,6 +1023,9 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
continue; continue;
} }
TEST_SYNC_POINT_CALLBACK(
"DBImpl::RecoverLogFiles:BeforeFlushFinalMemtable", /*arg=*/nullptr);
// flush the final memtable (if non-empty) // flush the final memtable (if non-empty)
if (cfd->mem()->GetFirstSequenceNumber() != 0) { if (cfd->mem()->GetFirstSequenceNumber() != 0) {
// If flush happened in the middle of recovery (e.g. due to memtable // If flush happened in the middle of recovery (e.g. due to memtable
@ -1042,7 +1045,9 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
data_seen = true; data_seen = true;
} }
// write MANIFEST with update // Update the log number info in the version edit corresponding to this
// column family. Note that the version edits will be written to MANIFEST
// together later.
// writing log_number in the manifest means that any log file // writing log_number in the manifest means that any log file
// with number strongly less than (log_number + 1) is already // with number strongly less than (log_number + 1) is already
// recovered and should be ignored on next reincarnation. // recovered and should be ignored on next reincarnation.
@ -1051,19 +1056,28 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) { if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
edit->SetLogNumber(max_log_number + 1); edit->SetLogNumber(max_log_number + 1);
} }
}
if (status.ok()) {
// we must mark the next log number as used, even though it's // we must mark the next log number as used, even though it's
// not actually used. that is because VersionSet assumes // not actually used. that is because VersionSet assumes
// VersionSet::next_file_number_ always to be strictly greater than any // VersionSet::next_file_number_ always to be strictly greater than any
// log number // log number
versions_->MarkFileNumberUsed(max_log_number + 1); versions_->MarkFileNumberUsed(max_log_number + 1);
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
edit, &mutex_); autovector<ColumnFamilyData*> cfds;
TEST_SYNC_POINT_CALLBACK("DBImpl::RecoverLogFiles:AfterLogAndApply", autovector<const MutableCFOptions*> cf_opts;
nullptr); autovector<autovector<VersionEdit*>> edit_lists;
if (!status.ok()) { for (auto* cfd : *versions_->GetColumnFamilySet()) {
// Recovery failed cfds.push_back(cfd);
break; cf_opts.push_back(cfd->GetLatestMutableCFOptions());
auto iter = version_edits.find(cfd->GetID());
assert(iter != version_edits.end());
edit_lists.push_back({&iter->second});
} }
// write MANIFEST with update
status = versions_->LogAndApply(cfds, cf_opts, edit_lists, &mutex_,
directories_.GetDbDir(),
/*new_descriptor_log=*/true);
} }
} }

@ -4109,63 +4109,73 @@ TEST_F(DBTest2, RowCacheSnapshot) {
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
// Disabled but the test is failing.
// When DB is reopened with multiple column families, the manifest file // When DB is reopened with multiple column families, the manifest file
// is written after the first CF is flushed, and it is written again // is written after the first CF is flushed, and it is written again
// after each flush. If DB crashes between the flushes, the flushed CF // after each flush. If DB crashes between the flushes, the flushed CF
// flushed will pass the latest log file, and now we require it not // flushed will pass the latest log file, and now we require it not
// to be corrupted, and triggering a corruption report. // to be corrupted, and triggering a corruption report.
// We need to fix the bug and enable the test. // We need to fix the bug and enable the test.
TEST_F(DBTest2, DISABLED_CrashInRecoveryMultipleCF) { TEST_F(DBTest2, CrashInRecoveryMultipleCF) {
Options options = CurrentOptions(); const std::vector<std::string> sync_points = {
options.create_if_missing = true; "DBImpl::RecoverLogFiles:BeforeFlushFinalMemtable",
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0"};
CreateAndReopenWithCF({"pikachu"}, options); for (const auto& test_sync_point : sync_points) {
ASSERT_OK(Put("foo", "bar")); Options options = CurrentOptions();
ASSERT_OK(Flush()); // First destroy original db to ensure a clean start.
ASSERT_OK(Put(1, "foo", "bar")); DestroyAndReopen(options);
ASSERT_OK(Flush(1)); options.create_if_missing = true;
ASSERT_OK(Put("foo", "bar")); options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
ASSERT_OK(Put(1, "foo", "bar")); CreateAndReopenWithCF({"pikachu"}, options);
// The value is large enough to be divided to two blocks. ASSERT_OK(Put("foo", "bar"));
std::string large_value(400, ' '); ASSERT_OK(Flush());
ASSERT_OK(Put("foo1", large_value)); ASSERT_OK(Put(1, "foo", "bar"));
ASSERT_OK(Put("foo2", large_value)); ASSERT_OK(Flush(1));
Close(); ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Put(1, "foo", "bar"));
// The value is large enough to be divided to two blocks.
std::string large_value(400, ' ');
ASSERT_OK(Put("foo1", large_value));
ASSERT_OK(Put("foo2", large_value));
Close();
// Corrupt the log file in the middle, so that it is not corrupted // Corrupt the log file in the middle, so that it is not corrupted
// in the tail. // in the tail.
std::vector<std::string> filenames; std::vector<std::string> filenames;
ASSERT_OK(env_->GetChildren(dbname_, &filenames)); ASSERT_OK(env_->GetChildren(dbname_, &filenames));
for (const auto& f : filenames) { for (const auto& f : filenames) {
uint64_t number; uint64_t number;
FileType type; FileType type;
if (ParseFileName(f, &number, &type) && type == FileType::kLogFile) { if (ParseFileName(f, &number, &type) && type == FileType::kLogFile) {
std::string fname = dbname_ + "/" + f; std::string fname = dbname_ + "/" + f;
std::string file_content; std::string file_content;
ASSERT_OK(ReadFileToString(env_, fname, &file_content)); ASSERT_OK(ReadFileToString(env_, fname, &file_content));
file_content[400] = 'h'; file_content[400] = 'h';
file_content[401] = 'a'; file_content[401] = 'a';
ASSERT_OK(WriteStringToFile(env_, file_content, fname)); ASSERT_OK(WriteStringToFile(env_, file_content, fname));
break; break;
}
} }
}
// Reopen and freeze the file system after the first manifest write. // Reopen and freeze the file system after the first manifest write.
FaultInjectionTestEnv fit_env(options.env); FaultInjectionTestEnv fit_env(options.env);
options.env = &fit_env; options.env = &fit_env;
rocksdb::SyncPoint::GetInstance()->SetCallBack( rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
"DBImpl::RecoverLogFiles:AfterLogAndApply", rocksdb::SyncPoint::GetInstance()->SetCallBack(
[&](void* /*arg*/) { fit_env.SetFilesystemActive(false); }); test_sync_point,
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); [&](void* /*arg*/) { fit_env.SetFilesystemActive(false); });
ASSERT_NOK(TryReopenWithColumnFamilies({"default", "pikachu"}, options)); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); ASSERT_NOK(TryReopenWithColumnFamilies(
{kDefaultColumnFamilyName, "pikachu"}, options));
fit_env.SetFilesystemActive(true); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
// If we continue using failure ingestion Env, it will conplain something
// when renaming current file, which is not expected. Need to investigate why. fit_env.SetFilesystemActive(true);
options.env = env_; // If we continue using failure ingestion Env, it will conplain something
ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options)); // when renaming current file, which is not expected. Need to investigate
// why.
options.env = env_;
ASSERT_OK(TryReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"},
options));
}
} }
} // namespace rocksdb } // namespace rocksdb

@ -3807,8 +3807,9 @@ Status VersionSet::ProcessManifestWrites(
rocksdb_kill_odds * REDUCE_ODDS2); rocksdb_kill_odds * REDUCE_ODDS2);
#ifndef NDEBUG #ifndef NDEBUG
if (batch_edits.size() > 1 && batch_edits.size() - 1 == idx) { if (batch_edits.size() > 1 && batch_edits.size() - 1 == idx) {
TEST_SYNC_POINT( TEST_SYNC_POINT_CALLBACK(
"VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0"); "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
nullptr);
TEST_SYNC_POINT( TEST_SYNC_POINT(
"VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1"); "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1");
} }

Loading…
Cancel
Save