Fail recovery when MANIFEST record checksum mismatch (#6996)

Summary:
https://github.com/facebook/rocksdb/issues/5411 refactored `VersionSet::Recover` but introduced a bug, explained as follows.
Before, once a checksum mismatch happens, `reporter` will set `s` to be non-ok. Therefore, Recover will stop processing the MANIFEST any further.
```
// Correct
// Inside Recover
LogReporter reporter;
reporter.status = &s;
log::Reader reader(..., reporter);
while (reader.ReadRecord() && s.ok()) {
...
}
```
The bug is that, the local variable `s` in `ReadAndRecover` won't be updated by `reporter` while reading the MANIFEST. It is possible that the reader sees a checksum mismatch in a record, but `ReadRecord` retries internally read and finds the next valid record. The mismatched record will be ignored and no error is reported.
```
// Incorrect
// Inside Recover
LogReporter reporter;
reporter.status = &s;
log::Reader reader(..., reporter);
s = ReadAndRecover(reader, ...);

// Inside ReadAndRecover
  Status s;  // Shadows the s in Recover.
  while (reader.ReadRecord() && s.ok()) {
   ...
  }
```
`LogReporter` can use a separate `log_read_status` to track the errors while reading the MANIFEST. RocksDB can process more MANIFEST entries only if `log_read_status.ok()`.
Test plan (devserver):
make check
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6996

Reviewed By: ajkr

Differential Revision: D22105746

Pulled By: riversand963

fbshipit-source-id: b22f717a423457a41ca152a242abbb64cf91fc38
main
Yanqin Jin 4 years ago committed by Facebook GitHub Bot
parent 775dc623ad
commit 569b87e8c7
  1. 3
      HISTORY.md
  2. 27
      db/db_basic_test.cc
  3. 2
      db/log_writer.cc
  4. 13
      db/version_edit_handler.cc
  5. 3
      db/version_edit_handler.h
  6. 55
      db/version_set.cc
  7. 16
      db/version_set.h

@ -12,6 +12,9 @@
### New Features ### New Features
* DB identity (`db_id`) and DB session identity (`db_session_id`) are added to table properties and stored in SST files. SST files generated from SstFileWriter and Repairer have DB identity “SST Writer” and “DB Repairer”, respectively. Their DB session IDs are generated in the same way as `DB::GetDbSessionId`. The session ID for SstFileWriter (resp., Repairer) resets every time `SstFileWriter::Open` (resp., `Repairer::Run`) is called. * DB identity (`db_id`) and DB session identity (`db_session_id`) are added to table properties and stored in SST files. SST files generated from SstFileWriter and Repairer have DB identity “SST Writer” and “DB Repairer”, respectively. Their DB session IDs are generated in the same way as `DB::GetDbSessionId`. The session ID for SstFileWriter (resp., Repairer) resets every time `SstFileWriter::Open` (resp., `Repairer::Run`) is called.
### Bug Fixes
* Fail recovery and report once hitting a physical log record checksum mismatch, while reading MANIFEST. RocksDB should not continue processing the MANIFEST any further.
## 6.11 (6/12/2020) ## 6.11 (6/12/2020)
### Bug Fixes ### Bug Fixes
* Fix consistency checking error swallowing in some cases when options.force_consistency_checks = true. * Fix consistency checking error swallowing in some cases when options.force_consistency_checks = true.

@ -2318,6 +2318,33 @@ TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) {
} }
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
TEST_F(DBBasicTest, ManifestChecksumMismatch) {
Options options = CurrentOptions();
DestroyAndReopen(options);
ASSERT_OK(Put("bar", "value"));
ASSERT_OK(Flush());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum", [&](void* arg) {
auto* crc = reinterpret_cast<uint32_t*>(arg);
*crc = *crc + 1;
});
SyncPoint::GetInstance()->EnableProcessing();
WriteOptions write_opts;
write_opts.disableWAL = true;
Status s = db_->Put(write_opts, "foo", "value");
ASSERT_OK(s);
ASSERT_OK(Flush());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
ASSERT_OK(Put("foo", "value1"));
ASSERT_OK(Flush());
s = TryReopen(options);
ASSERT_TRUE(s.IsCorruption());
}
class DBBasicTestMultiGet : public DBTestBase { class DBBasicTestMultiGet : public DBTestBase {
public: public:
DBBasicTestMultiGet(std::string test_dir, int num_cfs, bool compressed_cache, DBBasicTestMultiGet(std::string test_dir, int num_cfs, bool compressed_cache,

@ -147,6 +147,8 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
// Compute the crc of the record type and the payload. // Compute the crc of the record type and the payload.
crc = crc32c::Extend(crc, ptr, n); crc = crc32c::Extend(crc, ptr, n);
crc = crc32c::Mask(crc); // Adjust for storage crc = crc32c::Mask(crc); // Adjust for storage
TEST_SYNC_POINT_CALLBACK("LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum",
&crc);
EncodeFixed32(buf, crc); EncodeFixed32(buf, crc);
// Write the header and the payload // Write the header and the payload

@ -27,12 +27,17 @@ VersionEditHandler::VersionEditHandler(
assert(version_set_ != nullptr); assert(version_set_ != nullptr);
} }
Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) { void VersionEditHandler::Iterate(log::Reader& reader, Status* log_read_status,
std::string* db_id) {
Slice record; Slice record;
std::string scratch; std::string scratch;
assert(log_read_status);
assert(log_read_status->ok());
size_t recovered_edits = 0; size_t recovered_edits = 0;
Status s = Initialize(); Status s = Initialize();
while (reader.ReadRecord(&record, &scratch) && s.ok()) { while (s.ok() && reader.ReadRecord(&record, &scratch) &&
log_read_status->ok()) {
VersionEdit edit; VersionEdit edit;
s = edit.DecodeFrom(record); s = edit.DecodeFrom(record);
if (!s.ok()) { if (!s.ok()) {
@ -70,13 +75,15 @@ Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) {
} }
} }
} }
if (!log_read_status->ok()) {
s = *log_read_status;
}
CheckIterationResult(reader, &s); CheckIterationResult(reader, &s);
if (!s.ok()) { if (!s.ok()) {
status_ = s; status_ = s;
} }
return s;
} }
Status VersionEditHandler::Initialize() { Status VersionEditHandler::Initialize() {

@ -40,7 +40,8 @@ class VersionEditHandler {
virtual ~VersionEditHandler() {} virtual ~VersionEditHandler() {}
Status Iterate(log::Reader& reader, std::string* db_id); void Iterate(log::Reader& reader, Status* log_read_status,
std::string* db_id);
const Status& status() const { return status_; } const Status& status() const { return status_; }

@ -10,6 +10,7 @@
#include "db/version_set.h" #include "db/version_set.h"
#include <stdio.h> #include <stdio.h>
#include <algorithm> #include <algorithm>
#include <array> #include <array>
#include <cinttypes> #include <cinttypes>
@ -19,6 +20,7 @@
#include <string> #include <string>
#include <unordered_map> #include <unordered_map>
#include <vector> #include <vector>
#include "compaction/compaction.h" #include "compaction/compaction.h"
#include "db/internal_stats.h" #include "db/internal_stats.h"
#include "db/log_reader.h" #include "db/log_reader.h"
@ -50,6 +52,7 @@
#include "table/table_reader.h" #include "table/table_reader.h"
#include "table/two_level_iterator.h" #include "table/two_level_iterator.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/string_util.h" #include "util/string_util.h"
@ -4444,24 +4447,26 @@ Status VersionSet::GetCurrentManifestPath(const std::string& dbname,
if (dbname.back() != '/') { if (dbname.back() != '/') {
manifest_path->push_back('/'); manifest_path->push_back('/');
} }
*manifest_path += fname; manifest_path->append(fname);
return Status::OK(); return Status::OK();
} }
Status VersionSet::ReadAndRecover( Status VersionSet::ReadAndRecover(
log::Reader* reader, AtomicGroupReadBuffer* read_buffer, log::Reader& reader, AtomicGroupReadBuffer* read_buffer,
const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options, const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
std::unordered_map<int, std::string>& column_families_not_found, std::unordered_map<int, std::string>& column_families_not_found,
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>& std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
builders, builders,
VersionEditParams* version_edit_params, std::string* db_id) { Status* log_read_status, VersionEditParams* version_edit_params,
assert(reader != nullptr); std::string* db_id) {
assert(read_buffer != nullptr); assert(read_buffer != nullptr);
assert(log_read_status != nullptr);
Status s; Status s;
Slice record; Slice record;
std::string scratch; std::string scratch;
size_t recovered_edits = 0; size_t recovered_edits = 0;
while (reader->ReadRecord(&record, &scratch) && s.ok()) { while (s.ok() && reader.ReadRecord(&record, &scratch) &&
log_read_status->ok()) {
VersionEdit edit; VersionEdit edit;
s = edit.DecodeFrom(record); s = edit.DecodeFrom(record);
if (!s.ok()) { if (!s.ok()) {
@ -4505,6 +4510,9 @@ Status VersionSet::ReadAndRecover(
} }
} }
} }
if (!log_read_status->ok()) {
s = *log_read_status;
}
if (!s.ok()) { if (!s.ok()) {
// Clear the buffer if we fail to decode/apply an edit. // Clear the buffer if we fail to decode/apply an edit.
read_buffer->Clear(); read_buffer->Clear();
@ -4551,8 +4559,7 @@ Status VersionSet::Recover(
db_options_->log_readahead_size)); db_options_->log_readahead_size));
} }
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>> VersionBuilderMap builders;
builders;
// add default column family // add default column family
auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName); auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
@ -4574,12 +4581,13 @@ Status VersionSet::Recover(
VersionEditParams version_edit_params; VersionEditParams version_edit_params;
{ {
VersionSet::LogReporter reporter; VersionSet::LogReporter reporter;
reporter.status = &s; Status log_read_status;
reporter.status = &log_read_status;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter, log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
true /* checksum */, 0 /* log_number */); true /* checksum */, 0 /* log_number */);
AtomicGroupReadBuffer read_buffer; AtomicGroupReadBuffer read_buffer;
s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options, s = ReadAndRecover(reader, &read_buffer, cf_name_to_options,
column_families_not_found, builders, column_families_not_found, builders, &log_read_status,
&version_edit_params, db_id); &version_edit_params, db_id);
current_manifest_file_size = reader.GetReadOffset(); current_manifest_file_size = reader.GetReadOffset();
assert(current_manifest_file_size != 0); assert(current_manifest_file_size != 0);
@ -4845,21 +4853,20 @@ Status VersionSet::TryRecoverFromOneManifest(
db_options_->log_readahead_size)); db_options_->log_readahead_size));
} }
assert(s.ok());
VersionSet::LogReporter reporter; VersionSet::LogReporter reporter;
reporter.status = &s; reporter.status = &s;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter, log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
/*checksum=*/true, /*log_num=*/0); /*checksum=*/true, /*log_num=*/0);
{ VersionEditHandlerPointInTime handler_pit(read_only, column_families,
VersionEditHandlerPointInTime handler_pit(read_only, column_families, const_cast<VersionSet*>(this));
const_cast<VersionSet*>(this));
s = handler_pit.Iterate(reader, db_id); handler_pit.Iterate(reader, &s, db_id);
assert(nullptr != has_missing_table_file); assert(nullptr != has_missing_table_file);
*has_missing_table_file = handler_pit.HasMissingFiles(); *has_missing_table_file = handler_pit.HasMissingFiles();
}
return s; return handler_pit.status();
} }
Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families, Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
@ -5980,8 +5987,7 @@ Status ReactiveVersionSet::Recover(
// In recovery, nobody else can access it, so it's fine to set it to be // In recovery, nobody else can access it, so it's fine to set it to be
// initialized earlier. // initialized earlier.
default_cfd->set_initialized(); default_cfd->set_initialized();
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>> VersionBuilderMap builders;
builders;
std::unordered_map<int, std::string> column_families_not_found; std::unordered_map<int, std::string> column_families_not_found;
builders.insert( builders.insert(
std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>( std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
@ -5989,7 +5995,7 @@ Status ReactiveVersionSet::Recover(
manifest_reader_status->reset(new Status()); manifest_reader_status->reset(new Status());
manifest_reporter->reset(new LogReporter()); manifest_reporter->reset(new LogReporter());
static_cast<LogReporter*>(manifest_reporter->get())->status = static_cast_with_check<LogReporter>(manifest_reporter->get())->status =
manifest_reader_status->get(); manifest_reader_status->get();
Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader); Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
log::Reader* reader = manifest_reader->get(); log::Reader* reader = manifest_reader->get();
@ -5998,10 +6004,9 @@ Status ReactiveVersionSet::Recover(
VersionEdit version_edit; VersionEdit version_edit;
while (s.ok() && retry < 1) { while (s.ok() && retry < 1) {
assert(reader != nullptr); assert(reader != nullptr);
Slice record; s = ReadAndRecover(*reader, &read_buffer_, cf_name_to_options,
std::string scratch; column_families_not_found, builders,
s = ReadAndRecover(reader, &read_buffer_, cf_name_to_options, manifest_reader_status->get(), &version_edit);
column_families_not_found, builders, &version_edit);
if (s.ok()) { if (s.ok()) {
bool enough = version_edit.has_next_file_number_ && bool enough = version_edit.has_next_file_number_ &&
version_edit.has_log_number_ && version_edit.has_log_number_ &&

@ -1165,6 +1165,10 @@ class VersionSet {
void SetIOStatusOK() { io_status_ = IOStatus::OK(); } void SetIOStatusOK() { io_status_ = IOStatus::OK(); }
protected: protected:
using VersionBuilderMap =
std::unordered_map<uint32_t,
std::unique_ptr<BaseReferencedVersionBuilder>>;
struct ManifestWriter; struct ManifestWriter;
friend class Version; friend class Version;
@ -1176,7 +1180,9 @@ class VersionSet {
struct LogReporter : public log::Reader::Reporter { struct LogReporter : public log::Reader::Reporter {
Status* status; Status* status;
virtual void Corruption(size_t /*bytes*/, const Status& s) override { virtual void Corruption(size_t /*bytes*/, const Status& s) override {
if (this->status->ok()) *this->status = s; if (status->ok()) {
*status = s;
}
} }
}; };
@ -1207,13 +1213,14 @@ class VersionSet {
const VersionEdit* edit); const VersionEdit* edit);
Status ReadAndRecover( Status ReadAndRecover(
log::Reader* reader, AtomicGroupReadBuffer* read_buffer, log::Reader& reader, AtomicGroupReadBuffer* read_buffer,
const std::unordered_map<std::string, ColumnFamilyOptions>& const std::unordered_map<std::string, ColumnFamilyOptions>&
name_to_options, name_to_options,
std::unordered_map<int, std::string>& column_families_not_found, std::unordered_map<int, std::string>& column_families_not_found,
std::unordered_map< std::unordered_map<
uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>& builders, uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>& builders,
VersionEditParams* version_edit, std::string* db_id = nullptr); Status* log_read_status, VersionEditParams* version_edit,
std::string* db_id = nullptr);
// REQUIRES db mutex // REQUIRES db mutex
Status ApplyOneVersionEditToBuilder( Status ApplyOneVersionEditToBuilder(
@ -1342,8 +1349,7 @@ class ReactiveVersionSet : public VersionSet {
std::unique_ptr<log::FragmentBufferedReader>* manifest_reader); std::unique_ptr<log::FragmentBufferedReader>* manifest_reader);
private: private:
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>> VersionBuilderMap active_version_builders_;
active_version_builders_;
AtomicGroupReadBuffer read_buffer_; AtomicGroupReadBuffer read_buffer_;
// Number of version edits to skip by ReadAndApply at the beginning of a new // Number of version edits to skip by ReadAndApply at the beginning of a new
// MANIFEST created by primary. // MANIFEST created by primary.

Loading…
Cancel
Save