Refactor wal filter processing during recovery (#10214)

Summary:
So that DBImpl::RecoverLogFiles do not have to deal with implementation
details of WalFilter.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10214

Test Plan: make check

Reviewed By: ajkr

Differential Revision: D37299122

Pulled By: riversand963

fbshipit-source-id: acf1a80f1ef75da393d375f55968b2f3ac189816
main
Yanqin Jin 3 years ago committed by Facebook GitHub Bot
parent f7605ec655
commit d654888b8f
  1. 10
      db/db_impl/db_impl.h
  2. 223
      db/db_impl/db_impl_open.cc

@ -1517,6 +1517,16 @@ class DBImpl : public DB {
// recovery.
Status LogAndApplyForRecovery(const RecoveryContext& recovery_ctx);
void InvokeWalFilterIfNeededOnColumnFamilyToWalNumberMap();
// Return true to proceed with current WAL record whose content is stored in
// `batch`. Return false to skip current WAL record.
bool InvokeWalFilterIfNeededOnWalRecord(uint64_t wal_number,
const std::string& wal_fname,
log::Reader::Reporter& reporter,
Status& status, bool& stop_replay,
WriteBatch& batch);
private:
friend class DB;
friend class ErrorHandler;

@ -845,6 +845,131 @@ Status DBImpl::LogAndApplyForRecovery(const RecoveryContext& recovery_ctx) {
return s;
}
void DBImpl::InvokeWalFilterIfNeededOnColumnFamilyToWalNumberMap() {
#ifndef ROCKSDB_LITE
if (immutable_db_options_.wal_filter == nullptr) {
return;
}
assert(immutable_db_options_.wal_filter != nullptr);
WalFilter& wal_filter = *(immutable_db_options_.wal_filter);
std::map<std::string, uint32_t> cf_name_id_map;
std::map<uint32_t, uint64_t> cf_lognumber_map;
assert(versions_);
assert(versions_->GetColumnFamilySet());
for (auto cfd : *versions_->GetColumnFamilySet()) {
assert(cfd);
cf_name_id_map.insert(std::make_pair(cfd->GetName(), cfd->GetID()));
cf_lognumber_map.insert(std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
}
wal_filter.ColumnFamilyLogNumberMap(cf_lognumber_map, cf_name_id_map);
#endif // !ROCKSDB_LITE
}
bool DBImpl::InvokeWalFilterIfNeededOnWalRecord(uint64_t wal_number,
const std::string& wal_fname,
log::Reader::Reporter& reporter,
Status& status,
bool& stop_replay,
WriteBatch& batch) {
#ifndef ROCKSDB_LITE
if (immutable_db_options_.wal_filter == nullptr) {
return true;
}
assert(immutable_db_options_.wal_filter != nullptr);
WalFilter& wal_filter = *(immutable_db_options_.wal_filter);
WriteBatch new_batch;
bool batch_changed = false;
bool process_current_record = true;
WalFilter::WalProcessingOption wal_processing_option =
wal_filter.LogRecordFound(wal_number, wal_fname, batch, &new_batch,
&batch_changed);
switch (wal_processing_option) {
case WalFilter::WalProcessingOption::kContinueProcessing:
// do nothing, proceeed normally
break;
case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
// skip current record
process_current_record = false;
break;
case WalFilter::WalProcessingOption::kStopReplay:
// skip current record and stop replay
process_current_record = false;
stop_replay = true;
break;
case WalFilter::WalProcessingOption::kCorruptedRecord: {
status = Status::Corruption("Corruption reported by Wal Filter ",
wal_filter.Name());
MaybeIgnoreError(&status);
if (!status.ok()) {
process_current_record = false;
reporter.Corruption(batch.GetDataSize(), status);
}
break;
}
default: {
// logical error which should not happen. If RocksDB throws, we would
// just do `throw std::logic_error`.
assert(false);
status = Status::NotSupported(
"Unknown WalProcessingOption returned by Wal Filter ",
wal_filter.Name());
MaybeIgnoreError(&status);
if (!status.ok()) {
// Ignore the error with current record processing.
stop_replay = true;
}
break;
}
}
if (!process_current_record) {
return false;
}
if (batch_changed) {
// Make sure that the count in the new batch is
// within the orignal count.
int new_count = WriteBatchInternal::Count(&new_batch);
int original_count = WriteBatchInternal::Count(&batch);
if (new_count > original_count) {
ROCKS_LOG_FATAL(
immutable_db_options_.info_log,
"Recovering log #%" PRIu64
" mode %d log filter %s returned "
"more records (%d) than original (%d) which is not allowed. "
"Aborting recovery.",
wal_number, static_cast<int>(immutable_db_options_.wal_recovery_mode),
wal_filter.Name(), new_count, original_count);
status = Status::NotSupported(
"More than original # of records "
"returned by Wal Filter ",
wal_filter.Name());
return false;
}
// Set the same sequence number in the new_batch
// as the original batch.
WriteBatchInternal::SetSequence(&new_batch,
WriteBatchInternal::Sequence(&batch));
batch = new_batch;
}
return true;
#else // !ROCKSDB_LITE
(void)wal_number;
(void)wal_fname;
(void)reporter;
(void)status;
(void)stop_replay;
(void)batch;
return true;
#endif // ROCKSDB_LITE
}
// REQUIRES: wal_numbers are sorted in ascending order
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
SequenceNumber* next_sequence, bool read_only,
@ -887,20 +1012,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
stream.EndArray();
}
#ifndef ROCKSDB_LITE
if (immutable_db_options_.wal_filter != nullptr) {
std::map<std::string, uint32_t> cf_name_id_map;
std::map<uint32_t, uint64_t> cf_lognumber_map;
for (auto cfd : *versions_->GetColumnFamilySet()) {
cf_name_id_map.insert(std::make_pair(cfd->GetName(), cfd->GetID()));
cf_lognumber_map.insert(
std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
}
immutable_db_options_.wal_filter->ColumnFamilyLogNumberMap(cf_lognumber_map,
cf_name_id_map);
}
#endif
// No-op for immutable_db_options_.wal_filter == nullptr.
InvokeWalFilterIfNeededOnColumnFamilyToWalNumberMap();
bool stop_replay_by_wal_filter = false;
bool stop_replay_for_corruption = false;
@ -1032,83 +1145,13 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
}
}
#ifndef ROCKSDB_LITE
if (immutable_db_options_.wal_filter != nullptr) {
WriteBatch new_batch;
bool batch_changed = false;
WalFilter::WalProcessingOption wal_processing_option =
immutable_db_options_.wal_filter->LogRecordFound(
wal_number, fname, batch, &new_batch, &batch_changed);
switch (wal_processing_option) {
case WalFilter::WalProcessingOption::kContinueProcessing:
// do nothing, proceeed normally
break;
case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
// skip current record
continue;
case WalFilter::WalProcessingOption::kStopReplay:
// skip current record and stop replay
stop_replay_by_wal_filter = true;
continue;
case WalFilter::WalProcessingOption::kCorruptedRecord: {
status =
Status::Corruption("Corruption reported by Wal Filter ",
immutable_db_options_.wal_filter->Name());
MaybeIgnoreError(&status);
if (!status.ok()) {
reporter.Corruption(record.size(), status);
continue;
}
break;
}
default: {
assert(false); // unhandled case
status = Status::NotSupported(
"Unknown WalProcessingOption returned"
" by Wal Filter ",
immutable_db_options_.wal_filter->Name());
MaybeIgnoreError(&status);
if (!status.ok()) {
return status;
} else {
// Ignore the error with current record processing.
continue;
}
}
}
if (batch_changed) {
// Make sure that the count in the new batch is
// within the orignal count.
int new_count = WriteBatchInternal::Count(&new_batch);
int original_count = WriteBatchInternal::Count(&batch);
if (new_count > original_count) {
ROCKS_LOG_FATAL(
immutable_db_options_.info_log,
"Recovering log #%" PRIu64
" mode %d log filter %s returned "
"more records (%d) than original (%d) which is not allowed. "
"Aborting recovery.",
wal_number,
static_cast<int>(immutable_db_options_.wal_recovery_mode),
immutable_db_options_.wal_filter->Name(), new_count,
original_count);
status = Status::NotSupported(
"More than original # of records "
"returned by Wal Filter ",
immutable_db_options_.wal_filter->Name());
return status;
}
// Set the same sequence number in the new_batch
// as the original batch.
WriteBatchInternal::SetSequence(&new_batch,
WriteBatchInternal::Sequence(&batch));
batch = new_batch;
}
// For the default case of wal_filter == nullptr, always performs no-op
// and returns true.
if (!InvokeWalFilterIfNeededOnWalRecord(wal_number, fname, reporter,
status, stop_replay_by_wal_filter,
batch)) {
continue;
}
#endif // ROCKSDB_LITE
// If column family was not found, it might mean that the WAL write
// batch references to the column family that was dropped after the

Loading…
Cancel
Save