From 9d212d3f0ecee2e6f72286497dc2b7a3e2d3ba58 Mon Sep 17 00:00:00 2001 From: Akanksha Mahajan Date: Tue, 29 Sep 2020 23:16:12 -0700 Subject: [PATCH] Provide users with option to opt-in to get corrupt data in logs/messages (#7420) Summary: Add a new Option "allow_data_in_errors". When it's set by users, it allows them to opt-in to get error messages containing corrupted keys/values. Corrupt keys, values will be logged in the messages, logs, status etc. that will help users with the useful information regarding affected data. By default value is set false to prevent users data to be exposed in the messages. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7420 Test Plan: 1. make check -j64 2. Add a new test case Reviewed By: ajkr Differential Revision: D23835028 Pulled By: akankshamahajan15 fbshipit-source-id: 8d2eba8fb898e79fcf1fccc07295065a75eb59b1 --- HISTORY.md | 1 + db/builder.cc | 2 +- db/compaction/compaction_iterator.cc | 24 ++++++++++++++-------- db/compaction/compaction_iterator.h | 4 ++++ db/compaction/compaction_iterator_test.cc | 3 ++- db/compaction/compaction_job.cc | 7 ++++--- db/corruption_test.cc | 25 +++++++++++++++++++++++ db/memtable.cc | 21 +++++++++++-------- db/memtable.h | 1 + include/rocksdb/options.h | 9 ++++++++ options/cf_options.cc | 3 ++- options/cf_options.h | 2 ++ options/db_options.cc | 9 +++++++- options/db_options.h | 1 + 14 files changed, 89 insertions(+), 23 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 791ceb98c..6c367de76 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,7 @@ * The methods to create and manage EncrypedEnv have been changed. The EncryptionProvider is now passed to NewEncryptedEnv as a shared pointer, rather than a raw pointer. Comparably, the CTREncryptedProvider now takes a shared pointer, rather than a reference, to a BlockCipher. CreateFromString methods have been added to BlockCipher and EncryptionProvider to provide a single API by which different ciphers and providers can be created, respectively. * The internal classes (CTREncryptionProvider, ROT13BlockCipher, CTRCipherStream) associated with the EncryptedEnv have been moved out of the public API. To create a CTREncryptionProvider, one can either use EncryptionProvider::NewCTRProvider, or EncryptionProvider::CreateFromString("CTR"). To create a new ROT13BlockCipher, one can either use BlockCipher::NewROT13Cipher or BlockCipher::CreateFromString("ROT13"). * The EncryptionProvider::AddCipher method has been added to allow keys to be added to an EncryptionProvider. This API will allow future providers to support multiple cipher keys. +* Add a new option "allow_data_in_errors". When this new option is set by users, it allows users to opt-in to get error messages containing corrupted keys/values. Corrupt keys, values will be logged in the messages, logs, status etc. that will help users with the useful information regarding affected data. By default value of this option is set false to prevent users data to be exposed in the messages so currently, data will be redacted from logs, messages, status by default. ### General Improvements * The settings of the DBOptions and ColumnFamilyOptions are now managed by Configurable objects (see New Features). The same convenience methods to configure these options still exist but the backend implementation has been unified under a common implementation. diff --git a/db/builder.cc b/db/builder.cc index da668d604..f4fd717a4 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -180,7 +180,7 @@ Status BuildTable( &snapshots, earliest_write_conflict_snapshot, snapshot_checker, env, ShouldReportDetailedTime(env, ioptions.statistics), true /* internal key corruption is not ok */, range_del_agg.get(), - blob_file_builder.get()); + blob_file_builder.get(), ioptions.allow_data_in_errors); c_iter.SeekToFirst(); for (; c_iter.Valid(); c_iter.Next()) { diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 7f249632f..9655d8a99 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -39,8 +39,8 @@ CompactionIterator::CompactionIterator( const SnapshotChecker* snapshot_checker, Env* env, bool report_detailed_time, bool expect_valid_internal_key, CompactionRangeDelAggregator* range_del_agg, - BlobFileBuilder* blob_file_builder, const Compaction* compaction, - const CompactionFilter* compaction_filter, + BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, + const Compaction* compaction, const CompactionFilter* compaction_filter, const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, const std::atomic* manual_compaction_paused, @@ -49,7 +49,7 @@ CompactionIterator::CompactionIterator( input, cmp, merge_helper, last_sequence, snapshots, earliest_write_conflict_snapshot, snapshot_checker, env, report_detailed_time, expect_valid_internal_key, range_del_agg, - blob_file_builder, + blob_file_builder, allow_data_in_errors, std::unique_ptr( compaction ? new CompactionProxy(compaction) : nullptr), compaction_filter, shutting_down, preserve_deletes_seqnum, @@ -62,7 +62,7 @@ CompactionIterator::CompactionIterator( const SnapshotChecker* snapshot_checker, Env* env, bool report_detailed_time, bool expect_valid_internal_key, CompactionRangeDelAggregator* range_del_agg, - BlobFileBuilder* blob_file_builder, + BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, std::unique_ptr compaction, const CompactionFilter* compaction_filter, const std::atomic* shutting_down, @@ -89,7 +89,8 @@ CompactionIterator::CompactionIterator( current_user_key_snapshot_(0), merge_out_iter_(merge_helper_), current_key_committed_(false), - info_log_(info_log) { + info_log_(info_log), + allow_data_in_errors_(allow_data_in_errors) { assert(compaction_filter_ == nullptr || compaction_ != nullptr); assert(snapshots_ != nullptr); bottommost_level_ = compaction_ == nullptr @@ -271,13 +272,20 @@ void CompactionIterator::NextFromInput() { if (!ParseInternalKey(key_, &ikey_)) { iter_stats_.num_input_corrupt_records++; + // If `expect_valid_internal_key_` is false, return the corrupted key // and let the caller decide what to do with it. // TODO(noetzli): We should have a more elegant solution for this. if (expect_valid_internal_key_) { - assert(!"Corrupted internal key not expected."); - status_ = Status::Corruption("Corrupted internal key not expected."); - break; + std::string msg("Corrupted internal key not expected."); + if (allow_data_in_errors_) { + msg.append(" Corrupt key: " + ikey_.user_key.ToString(/*hex=*/true) + + ". "); + msg.append("key type: " + std::to_string(ikey_.type) + "."); + msg.append("seq: " + std::to_string(ikey_.sequence) + "."); + } + status_ = Status::Corruption(msg.c_str()); + return; } key_ = current_key_.SetInternalKey(key_); has_current_user_key_ = false; diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index e5b1cc8b1..29dedd3c7 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -69,6 +69,7 @@ class CompactionIterator { bool report_detailed_time, bool expect_valid_internal_key, CompactionRangeDelAggregator* range_del_agg, BlobFileBuilder* blob_file_builder, + bool allow_data_in_errors, const Compaction* compaction = nullptr, const CompactionFilter* compaction_filter = nullptr, const std::atomic* shutting_down = nullptr, @@ -85,6 +86,7 @@ class CompactionIterator { bool report_detailed_time, bool expect_valid_internal_key, CompactionRangeDelAggregator* range_del_agg, BlobFileBuilder* blob_file_builder, + bool allow_data_in_errors, std::unique_ptr compaction, const CompactionFilter* compaction_filter = nullptr, const std::atomic* shutting_down = nullptr, @@ -233,6 +235,8 @@ class CompactionIterator { bool current_key_committed_; std::shared_ptr info_log_; + bool allow_data_in_errors_; + bool IsShuttingDown() { // This is a best-effort facility, so memory_order_relaxed is sufficient. return shutting_down_ && shutting_down_->load(std::memory_order_relaxed); diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc index 9b4e92929..57db42489 100644 --- a/db/compaction/compaction_iterator_test.cc +++ b/db/compaction/compaction_iterator_test.cc @@ -259,7 +259,8 @@ class CompactionIteratorTest : public testing::TestWithParam { earliest_write_conflict_snapshot, snapshot_checker_.get(), Env::Default(), false /* report_detailed_time */, false, range_del_agg_.get(), nullptr /* blob_file_builder */, - std::move(compaction), filter, &shutting_down_)); + false /*allow_data_in_errors*/, std::move(compaction), filter, + &shutting_down_)); } void AddSnapshot(SequenceNumber snapshot, diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 10610b6f6..457e57ebb 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -914,9 +914,10 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { &existing_snapshots_, earliest_write_conflict_snapshot_, snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), /*expect_valid_internal_key=*/true, &range_del_agg, - /* blob_file_builder */ nullptr, sub_compact->compaction, - compaction_filter, shutting_down_, preserve_deletes_seqnum_, - manual_compaction_paused_, db_options_.info_log)); + /* blob_file_builder */ nullptr, db_options_.allow_data_in_errors, + sub_compact->compaction, compaction_filter, shutting_down_, + preserve_deletes_seqnum_, manual_compaction_paused_, + db_options_.info_log)); auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) { diff --git a/db/corruption_test.cc b/db/corruption_test.cc index 1adff85b9..0633e23cf 100644 --- a/db/corruption_test.cc +++ b/db/corruption_test.cc @@ -619,6 +619,31 @@ TEST_F(CorruptionTest, ParanoidFileChecksOnCompact) { } } +TEST_F(CorruptionTest, LogCorruptionErrorsInCompactionIterator) { + Options options; + options.create_if_missing = true; + options.allow_data_in_errors = true; + auto mode = mock::MockTableFactory::kCorruptKey; + delete db_; + db_ = nullptr; + ASSERT_OK(DestroyDB(dbname_, options)); + + std::shared_ptr mock = + std::make_shared(); + mock->SetCorruptionMode(mode); + options.table_factory = mock; + + ASSERT_OK(DB::Open(options, dbname_, &db_)); + assert(db_ != nullptr); + Build(100, 2); + + DBImpl* dbi = static_cast_with_check(db_); + ASSERT_OK(dbi->TEST_FlushMemTable()); + Status s = dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true); + ASSERT_NOK(s); + ASSERT_TRUE(s.IsCorruption()); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/memtable.cc b/db/memtable.cc index a7d20d5fa..6aa7faea5 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -58,7 +58,8 @@ ImmutableMemTableOptions::ImmutableMemTableOptions( max_successive_merges(mutable_cf_options.max_successive_merges), statistics(ioptions.statistics), merge_operator(ioptions.merge_operator), - info_log(ioptions.info_log) {} + info_log(ioptions.info_log), + allow_data_in_errors(ioptions.allow_data_in_errors) {} MemTable::MemTable(const InternalKeyComparator& cmp, const ImmutableCFOptions& ioptions, @@ -623,7 +624,7 @@ struct Saver { Env* env_; ReadCallback* callback_; bool* is_blob_index; - + bool allow_data_in_errors; bool CheckCallback(SequenceNumber _seq) { if (callback_) { return callback_->IsVisible(_seq); @@ -778,14 +779,17 @@ static bool SaveValue(void* arg, const char* entry) { return true; } default: { - std::string msg("Unrecognized value type: " + - std::to_string(static_cast(type)) + ". "); - msg.append("User key: " + user_key_slice.ToString(/*hex=*/true) + ". "); - msg.append("seq: " + std::to_string(seq) + "."); + std::string msg("Corrupted value not expected."); + if (s->allow_data_in_errors) { + msg.append("Unrecognized value type: " + + std::to_string(static_cast(type)) + ". "); + msg.append("User key: " + user_key_slice.ToString(/*hex=*/true) + + ". "); + msg.append("seq: " + std::to_string(seq) + "."); + } *(s->status) = Status::Corruption(msg.c_str()); + return false; } - assert(false); - return true; } } @@ -881,6 +885,7 @@ void MemTable::GetFromTable(const LookupKey& key, saver.callback_ = callback; saver.is_blob_index = is_blob_index; saver.do_merge = do_merge; + saver.allow_data_in_errors = moptions_.allow_data_in_errors; table_->Get(key, &saver, SaveValue); *seq = saver.seq; } diff --git a/db/memtable.h b/db/memtable.h index e78d66424..d5bd4e95a 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -54,6 +54,7 @@ struct ImmutableMemTableOptions { Statistics* statistics; MergeOperator* merge_operator; Logger* info_log; + bool allow_data_in_errors; }; // Batched counters to updated when inserting keys in one write batch. diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index ccc68c8a7..53c444a30 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1157,6 +1157,15 @@ struct DBOptions { // // Default: 1000000 (microseconds). uint64_t bgerror_resume_retry_interval = 1000000; + + // It allows user to opt-in to get error messages containing corrupted + // keys/values. Corrupt keys, values will be logged in the + // messages/logs/status that will help users with the useful information + // regarding affected data. By default value is set false to prevent users + // data to be exposed in the logs/messages etc. + // + // Default: false + bool allow_data_in_errors = false; }; // Options to control the behavior of a database (passed to DB::Open) diff --git a/options/cf_options.cc b/options/cf_options.cc index 6fadec962..76eca44df 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -848,7 +848,8 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options, cf_paths(cf_options.cf_paths), compaction_thread_limiter(cf_options.compaction_thread_limiter), file_checksum_gen_factory(db_options.file_checksum_gen_factory.get()), - sst_partitioner_factory(cf_options.sst_partitioner_factory) {} + sst_partitioner_factory(cf_options.sst_partitioner_factory), + allow_data_in_errors(db_options.allow_data_in_errors) {} // Multiple two operands. If they overflow, return op1. uint64_t MultiplyCheckOverflow(uint64_t op1, double op2) { diff --git a/options/cf_options.h b/options/cf_options.h index ae5f6d08d..8d5b649e8 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -122,6 +122,8 @@ struct ImmutableCFOptions { FileChecksumGenFactory* file_checksum_gen_factory; std::shared_ptr sst_partitioner_factory; + + bool allow_data_in_errors; }; struct MutableCFOptions { diff --git a/options/db_options.cc b/options/db_options.cc index 4f97644c7..2b97c37db 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -415,6 +415,10 @@ static std::unordered_map } return s; }}}, + {"allow_data_in_errors", + {offsetof(struct ImmutableDBOptions, allow_data_in_errors), + OptionType::kBoolean, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, }; const std::string OptionsHelper::kDBOptionsName = "DBOptions"; @@ -564,7 +568,8 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) file_checksum_gen_factory(options.file_checksum_gen_factory), best_efforts_recovery(options.best_efforts_recovery), max_bgerror_resume_count(options.max_bgerror_resume_count), - bgerror_resume_retry_interval(options.bgerror_resume_retry_interval) { + bgerror_resume_retry_interval(options.bgerror_resume_retry_interval), + allow_data_in_errors(options.allow_data_in_errors) { } void ImmutableDBOptions::Dump(Logger* log) const { @@ -720,6 +725,8 @@ void ImmutableDBOptions::Dump(Logger* log) const { ROCKS_LOG_HEADER(log, " Options.bgerror_resume_retry_interval: %" PRIu64, bgerror_resume_retry_interval); + ROCKS_LOG_HEADER(log, " Options.allow_data_in_errors: %d", + allow_data_in_errors); } MutableDBOptions::MutableDBOptions() diff --git a/options/db_options.h b/options/db_options.h index 4a3e73677..2684e01b5 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -90,6 +90,7 @@ struct ImmutableDBOptions { bool best_efforts_recovery; int max_bgerror_resume_count; uint64_t bgerror_resume_retry_interval; + bool allow_data_in_errors; }; struct MutableDBOptions {