Provide users with option to opt-in to get corrupt data in logs/messages (#7420)

Summary:
Add a new Option "allow_data_in_errors". When it's set by users, it allows them to opt-in to get error messages containing corrupted keys/values. Corrupt keys, values will be logged in the messages, logs, status etc. that will help users with the useful information regarding affected data.
By default value is set false to prevent users data to be exposed in the messages.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7420

Test Plan:
1. make check -j64
           2. Add a new test case

Reviewed By: ajkr

Differential Revision: D23835028

Pulled By: akankshamahajan15

fbshipit-source-id: 8d2eba8fb898e79fcf1fccc07295065a75eb59b1
main
Akanksha Mahajan 4 years ago committed by Facebook GitHub Bot
parent 1bdaef7a06
commit 9d212d3f0e
  1. 1
      HISTORY.md
  2. 2
      db/builder.cc
  3. 24
      db/compaction/compaction_iterator.cc
  4. 4
      db/compaction/compaction_iterator.h
  5. 3
      db/compaction/compaction_iterator_test.cc
  6. 7
      db/compaction/compaction_job.cc
  7. 25
      db/corruption_test.cc
  8. 17
      db/memtable.cc
  9. 1
      db/memtable.h
  10. 9
      include/rocksdb/options.h
  11. 3
      options/cf_options.cc
  12. 2
      options/cf_options.h
  13. 9
      options/db_options.cc
  14. 1
      options/db_options.h

@ -6,6 +6,7 @@
* The methods to create and manage EncrypedEnv have been changed. The EncryptionProvider is now passed to NewEncryptedEnv as a shared pointer, rather than a raw pointer. Comparably, the CTREncryptedProvider now takes a shared pointer, rather than a reference, to a BlockCipher. CreateFromString methods have been added to BlockCipher and EncryptionProvider to provide a single API by which different ciphers and providers can be created, respectively.
* The internal classes (CTREncryptionProvider, ROT13BlockCipher, CTRCipherStream) associated with the EncryptedEnv have been moved out of the public API. To create a CTREncryptionProvider, one can either use EncryptionProvider::NewCTRProvider, or EncryptionProvider::CreateFromString("CTR"). To create a new ROT13BlockCipher, one can either use BlockCipher::NewROT13Cipher or BlockCipher::CreateFromString("ROT13").
* The EncryptionProvider::AddCipher method has been added to allow keys to be added to an EncryptionProvider. This API will allow future providers to support multiple cipher keys.
* Add a new option "allow_data_in_errors". When this new option is set by users, it allows users to opt-in to get error messages containing corrupted keys/values. Corrupt keys, values will be logged in the messages, logs, status etc. that will help users with the useful information regarding affected data. By default value of this option is set false to prevent users data to be exposed in the messages so currently, data will be redacted from logs, messages, status by default.
### General Improvements
* The settings of the DBOptions and ColumnFamilyOptions are now managed by Configurable objects (see New Features). The same convenience methods to configure these options still exist but the backend implementation has been unified under a common implementation.

@ -180,7 +180,7 @@ Status BuildTable(
&snapshots, earliest_write_conflict_snapshot, snapshot_checker, env,
ShouldReportDetailedTime(env, ioptions.statistics),
true /* internal key corruption is not ok */, range_del_agg.get(),
blob_file_builder.get());
blob_file_builder.get(), ioptions.allow_data_in_errors);
c_iter.SeekToFirst();
for (; c_iter.Valid(); c_iter.Next()) {

@ -39,8 +39,8 @@ CompactionIterator::CompactionIterator(
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder, const Compaction* compaction,
const CompactionFilter* compaction_filter,
BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
const Compaction* compaction, const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum,
const std::atomic<int>* manual_compaction_paused,
@ -49,7 +49,7 @@ CompactionIterator::CompactionIterator(
input, cmp, merge_helper, last_sequence, snapshots,
earliest_write_conflict_snapshot, snapshot_checker, env,
report_detailed_time, expect_valid_internal_key, range_del_agg,
blob_file_builder,
blob_file_builder, allow_data_in_errors,
std::unique_ptr<CompactionProxy>(
compaction ? new CompactionProxy(compaction) : nullptr),
compaction_filter, shutting_down, preserve_deletes_seqnum,
@ -62,7 +62,7 @@ CompactionIterator::CompactionIterator(
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder,
BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
@ -89,7 +89,8 @@ CompactionIterator::CompactionIterator(
current_user_key_snapshot_(0),
merge_out_iter_(merge_helper_),
current_key_committed_(false),
info_log_(info_log) {
info_log_(info_log),
allow_data_in_errors_(allow_data_in_errors) {
assert(compaction_filter_ == nullptr || compaction_ != nullptr);
assert(snapshots_ != nullptr);
bottommost_level_ = compaction_ == nullptr
@ -271,13 +272,20 @@ void CompactionIterator::NextFromInput() {
if (!ParseInternalKey(key_, &ikey_)) {
iter_stats_.num_input_corrupt_records++;
// If `expect_valid_internal_key_` is false, return the corrupted key
// and let the caller decide what to do with it.
// TODO(noetzli): We should have a more elegant solution for this.
if (expect_valid_internal_key_) {
assert(!"Corrupted internal key not expected.");
status_ = Status::Corruption("Corrupted internal key not expected.");
break;
std::string msg("Corrupted internal key not expected.");
if (allow_data_in_errors_) {
msg.append(" Corrupt key: " + ikey_.user_key.ToString(/*hex=*/true) +
". ");
msg.append("key type: " + std::to_string(ikey_.type) + ".");
msg.append("seq: " + std::to_string(ikey_.sequence) + ".");
}
status_ = Status::Corruption(msg.c_str());
return;
}
key_ = current_key_.SetInternalKey(key_);
has_current_user_key_ = false;

@ -69,6 +69,7 @@ class CompactionIterator {
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder,
bool allow_data_in_errors,
const Compaction* compaction = nullptr,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
@ -85,6 +86,7 @@ class CompactionIterator {
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder,
bool allow_data_in_errors,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
@ -233,6 +235,8 @@ class CompactionIterator {
bool current_key_committed_;
std::shared_ptr<Logger> info_log_;
bool allow_data_in_errors_;
bool IsShuttingDown() {
// This is a best-effort facility, so memory_order_relaxed is sufficient.
return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);

@ -259,7 +259,8 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
earliest_write_conflict_snapshot, snapshot_checker_.get(),
Env::Default(), false /* report_detailed_time */, false,
range_del_agg_.get(), nullptr /* blob_file_builder */,
std::move(compaction), filter, &shutting_down_));
false /*allow_data_in_errors*/, std::move(compaction), filter,
&shutting_down_));
}
void AddSnapshot(SequenceNumber snapshot,

@ -914,9 +914,10 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
&existing_snapshots_, earliest_write_conflict_snapshot_,
snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_),
/*expect_valid_internal_key=*/true, &range_del_agg,
/* blob_file_builder */ nullptr, sub_compact->compaction,
compaction_filter, shutting_down_, preserve_deletes_seqnum_,
manual_compaction_paused_, db_options_.info_log));
/* blob_file_builder */ nullptr, db_options_.allow_data_in_errors,
sub_compact->compaction, compaction_filter, shutting_down_,
preserve_deletes_seqnum_, manual_compaction_paused_,
db_options_.info_log));
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();
if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {

@ -619,6 +619,31 @@ TEST_F(CorruptionTest, ParanoidFileChecksOnCompact) {
}
}
TEST_F(CorruptionTest, LogCorruptionErrorsInCompactionIterator) {
Options options;
options.create_if_missing = true;
options.allow_data_in_errors = true;
auto mode = mock::MockTableFactory::kCorruptKey;
delete db_;
db_ = nullptr;
ASSERT_OK(DestroyDB(dbname_, options));
std::shared_ptr<mock::MockTableFactory> mock =
std::make_shared<mock::MockTableFactory>();
mock->SetCorruptionMode(mode);
options.table_factory = mock;
ASSERT_OK(DB::Open(options, dbname_, &db_));
assert(db_ != nullptr);
Build(100, 2);
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
ASSERT_OK(dbi->TEST_FlushMemTable());
Status s = dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true);
ASSERT_NOK(s);
ASSERT_TRUE(s.IsCorruption());
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -58,7 +58,8 @@ ImmutableMemTableOptions::ImmutableMemTableOptions(
max_successive_merges(mutable_cf_options.max_successive_merges),
statistics(ioptions.statistics),
merge_operator(ioptions.merge_operator),
info_log(ioptions.info_log) {}
info_log(ioptions.info_log),
allow_data_in_errors(ioptions.allow_data_in_errors) {}
MemTable::MemTable(const InternalKeyComparator& cmp,
const ImmutableCFOptions& ioptions,
@ -623,7 +624,7 @@ struct Saver {
Env* env_;
ReadCallback* callback_;
bool* is_blob_index;
bool allow_data_in_errors;
bool CheckCallback(SequenceNumber _seq) {
if (callback_) {
return callback_->IsVisible(_seq);
@ -778,14 +779,17 @@ static bool SaveValue(void* arg, const char* entry) {
return true;
}
default: {
std::string msg("Unrecognized value type: " +
std::string msg("Corrupted value not expected.");
if (s->allow_data_in_errors) {
msg.append("Unrecognized value type: " +
std::to_string(static_cast<int>(type)) + ". ");
msg.append("User key: " + user_key_slice.ToString(/*hex=*/true) + ". ");
msg.append("User key: " + user_key_slice.ToString(/*hex=*/true) +
". ");
msg.append("seq: " + std::to_string(seq) + ".");
}
*(s->status) = Status::Corruption(msg.c_str());
return false;
}
assert(false);
return true;
}
}
@ -881,6 +885,7 @@ void MemTable::GetFromTable(const LookupKey& key,
saver.callback_ = callback;
saver.is_blob_index = is_blob_index;
saver.do_merge = do_merge;
saver.allow_data_in_errors = moptions_.allow_data_in_errors;
table_->Get(key, &saver, SaveValue);
*seq = saver.seq;
}

@ -54,6 +54,7 @@ struct ImmutableMemTableOptions {
Statistics* statistics;
MergeOperator* merge_operator;
Logger* info_log;
bool allow_data_in_errors;
};
// Batched counters to updated when inserting keys in one write batch.

@ -1157,6 +1157,15 @@ struct DBOptions {
//
// Default: 1000000 (microseconds).
uint64_t bgerror_resume_retry_interval = 1000000;
// It allows user to opt-in to get error messages containing corrupted
// keys/values. Corrupt keys, values will be logged in the
// messages/logs/status that will help users with the useful information
// regarding affected data. By default value is set false to prevent users
// data to be exposed in the logs/messages etc.
//
// Default: false
bool allow_data_in_errors = false;
};
// Options to control the behavior of a database (passed to DB::Open)

@ -848,7 +848,8 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options,
cf_paths(cf_options.cf_paths),
compaction_thread_limiter(cf_options.compaction_thread_limiter),
file_checksum_gen_factory(db_options.file_checksum_gen_factory.get()),
sst_partitioner_factory(cf_options.sst_partitioner_factory) {}
sst_partitioner_factory(cf_options.sst_partitioner_factory),
allow_data_in_errors(db_options.allow_data_in_errors) {}
// Multiple two operands. If they overflow, return op1.
uint64_t MultiplyCheckOverflow(uint64_t op1, double op2) {

@ -122,6 +122,8 @@ struct ImmutableCFOptions {
FileChecksumGenFactory* file_checksum_gen_factory;
std::shared_ptr<SstPartitionerFactory> sst_partitioner_factory;
bool allow_data_in_errors;
};
struct MutableCFOptions {

@ -415,6 +415,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
}
return s;
}}},
{"allow_data_in_errors",
{offsetof(struct ImmutableDBOptions, allow_data_in_errors),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
};
const std::string OptionsHelper::kDBOptionsName = "DBOptions";
@ -564,7 +568,8 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
file_checksum_gen_factory(options.file_checksum_gen_factory),
best_efforts_recovery(options.best_efforts_recovery),
max_bgerror_resume_count(options.max_bgerror_resume_count),
bgerror_resume_retry_interval(options.bgerror_resume_retry_interval) {
bgerror_resume_retry_interval(options.bgerror_resume_retry_interval),
allow_data_in_errors(options.allow_data_in_errors) {
}
void ImmutableDBOptions::Dump(Logger* log) const {
@ -720,6 +725,8 @@ void ImmutableDBOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER(log,
" Options.bgerror_resume_retry_interval: %" PRIu64,
bgerror_resume_retry_interval);
ROCKS_LOG_HEADER(log, " Options.allow_data_in_errors: %d",
allow_data_in_errors);
}
MutableDBOptions::MutableDBOptions()

@ -90,6 +90,7 @@ struct ImmutableDBOptions {
bool best_efforts_recovery;
int max_bgerror_resume_count;
uint64_t bgerror_resume_retry_interval;
bool allow_data_in_errors;
};
struct MutableDBOptions {

Loading…
Cancel
Save