options.paranoid_file_checks to read all rows after writing to a file.

Summary: To further distinguish the corruption cases were caused by storage media or in memory states when writing it, add a paranoid check after writing the file to iterate all the rows.

Test Plan: Add a new unit test for it

Reviewers: rven, igor

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D37335
main
sdong 10 years ago
parent 283a042969
commit 397b6588bd
  1. 7
      db/builder.cc
  2. 2
      db/builder.h
  3. 6
      db/compaction_job.cc
  4. 44
      db/db_test.cc
  5. 3
      db/flush_job.cc
  6. 2
      db/repair.cc
  7. 4
      include/rocksdb/options.h
  8. 7
      util/mutable_cf_options.h
  9. 6
      util/options.cc
  10. 2
      util/options_helper.cc

@ -50,7 +50,7 @@ Status BuildTable(
const SequenceNumber newest_snapshot, const SequenceNumber newest_snapshot,
const SequenceNumber earliest_seqno_in_memtable, const SequenceNumber earliest_seqno_in_memtable,
const CompressionType compression, const CompressionType compression,
const CompressionOptions& compression_opts, const CompressionOptions& compression_opts, bool paranoid_file_checks,
const Env::IOPriority io_priority) { const Env::IOPriority io_priority) {
Status s; Status s;
meta->fd.file_size = 0; meta->fd.file_size = 0;
@ -227,6 +227,11 @@ Status BuildTable(
Iterator* it = table_cache->NewIterator(ReadOptions(), env_options, Iterator* it = table_cache->NewIterator(ReadOptions(), env_options,
internal_comparator, meta->fd); internal_comparator, meta->fd);
s = it->status(); s = it->status();
if (s.ok() && paranoid_file_checks) {
for (it->SeekToFirst(); it->Valid(); it->Next()) {}
s = it->status();
}
delete it; delete it;
} }
} }

@ -52,7 +52,7 @@ extern Status BuildTable(
const SequenceNumber newest_snapshot, const SequenceNumber newest_snapshot,
const SequenceNumber earliest_seqno_in_memtable, const SequenceNumber earliest_seqno_in_memtable,
const CompressionType compression, const CompressionType compression,
const CompressionOptions& compression_opts, const CompressionOptions& compression_opts, bool paranoid_file_checks,
const Env::IOPriority io_priority = Env::IO_HIGH); const Env::IOPriority io_priority = Env::IO_HIGH);
} // namespace rocksdb } // namespace rocksdb

@ -984,6 +984,12 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) {
Iterator* iter = cfd->table_cache()->NewIterator( Iterator* iter = cfd->table_cache()->NewIterator(
ReadOptions(), env_options_, cfd->internal_comparator(), fd); ReadOptions(), env_options_, cfd->internal_comparator(), fd);
s = iter->status(); s = iter->status();
if (s.ok() && mutable_cf_options_.paranoid_file_checks) {
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {}
s = iter->status();
}
delete iter; delete iter;
if (s.ok()) { if (s.ok()) {
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,

@ -1587,6 +1587,50 @@ TEST_F(DBTest, IndexAndFilterBlocksOfNewTableAddedToCache) {
TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT)); TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
} }
TEST_F(DBTest, ParanoidFileChecks) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.statistics = rocksdb::CreateDBStatistics();
options.level0_file_num_compaction_trigger = 2;
options.paranoid_file_checks = true;
BlockBasedTableOptions table_options;
table_options.cache_index_and_filter_blocks = false;
table_options.filter_policy.reset(NewBloomFilterPolicy(20));
options.table_factory.reset(new BlockBasedTableFactory(table_options));
CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_OK(Put(1, "1_key", "val"));
ASSERT_OK(Put(1, "9_key", "val"));
// Create a new table.
ASSERT_OK(Flush(1));
ASSERT_EQ(1, /* read and cache data block */
TestGetTickerCount(options, BLOCK_CACHE_ADD));
ASSERT_OK(Put(1, "1_key2", "val2"));
ASSERT_OK(Put(1, "9_key2", "val2"));
// Create a new SST file. This will further trigger a compaction
// and generate another file.
ASSERT_OK(Flush(1));
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(3, /* Totally 3 files created up to now */
TestGetTickerCount(options, BLOCK_CACHE_ADD));
// After disabling options.paranoid_file_checks. NO further block
// is added after generating a new file.
ASSERT_OK(
dbfull()->SetOptions(handles_[1], {{"paranoid_file_checks", "false"}}));
ASSERT_OK(Put(1, "1_key3", "val3"));
ASSERT_OK(Put(1, "9_key3", "val3"));
ASSERT_OK(Flush(1));
ASSERT_OK(Put(1, "1_key4", "val4"));
ASSERT_OK(Put(1, "9_key4", "val4"));
ASSERT_OK(Flush(1));
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(3, /* Totally 3 files created up to now */
TestGetTickerCount(options, BLOCK_CACHE_ADD));
}
TEST_F(DBTest, GetPropertiesOfAllTablesTest) { TEST_F(DBTest, GetPropertiesOfAllTablesTest) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.max_background_flushes = 0; options.max_background_flushes = 0;

@ -187,7 +187,8 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
cfd_->internal_comparator(), cfd_->internal_comparator(),
cfd_->int_tbl_prop_collector_factories(), newest_snapshot_, cfd_->int_tbl_prop_collector_factories(), newest_snapshot_,
earliest_seqno_in_memtable, output_compression_, earliest_seqno_in_memtable, output_compression_,
cfd_->ioptions()->compression_opts, Env::IO_HIGH); cfd_->ioptions()->compression_opts,
mutable_cf_options_.paranoid_file_checks, Env::IO_HIGH);
LogFlush(db_options_.info_log); LogFlush(db_options_.info_log);
} }
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,

@ -289,7 +289,7 @@ class Repairer {
status = BuildTable(dbname_, env_, ioptions_, env_options_, table_cache_, status = BuildTable(dbname_, env_, ioptions_, env_options_, table_cache_,
iter.get(), &meta, icmp_, iter.get(), &meta, icmp_,
&int_tbl_prop_collector_factories_, 0, 0, &int_tbl_prop_collector_factories_, 0, 0,
kNoCompression, CompressionOptions()); kNoCompression, CompressionOptions(), false);
} }
delete mem->Unref(); delete mem->Unref();
delete cf_mems_default; delete cf_mems_default;

@ -705,6 +705,10 @@ struct ColumnFamilyOptions {
// Default: false // Default: false
bool optimize_filters_for_hits; bool optimize_filters_for_hits;
// After writing every SST file, reopen it and read all the keys.
// Default: false
bool paranoid_file_checks;
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
// A vector of EventListeners which call-back functions will be called // A vector of EventListeners which call-back functions will be called
// when specific RocksDB event happens. // when specific RocksDB event happens.

@ -42,7 +42,8 @@ struct MutableCFOptions {
max_mem_compaction_level(options.max_mem_compaction_level), max_mem_compaction_level(options.max_mem_compaction_level),
verify_checksums_in_compaction(options.verify_checksums_in_compaction), verify_checksums_in_compaction(options.verify_checksums_in_compaction),
max_sequential_skip_in_iterations( max_sequential_skip_in_iterations(
options.max_sequential_skip_in_iterations) options.max_sequential_skip_in_iterations),
paranoid_file_checks(options.paranoid_file_checks)
{ {
RefreshDerivedOptions(ioptions); RefreshDerivedOptions(ioptions);
} }
@ -71,7 +72,8 @@ struct MutableCFOptions {
max_bytes_for_level_multiplier(0), max_bytes_for_level_multiplier(0),
max_mem_compaction_level(0), max_mem_compaction_level(0),
verify_checksums_in_compaction(false), verify_checksums_in_compaction(false),
max_sequential_skip_in_iterations(0) max_sequential_skip_in_iterations(0),
paranoid_file_checks(false)
{} {}
// Must be called after any change to MutableCFOptions // Must be called after any change to MutableCFOptions
@ -125,6 +127,7 @@ struct MutableCFOptions {
// Misc options // Misc options
uint64_t max_sequential_skip_in_iterations; uint64_t max_sequential_skip_in_iterations;
bool paranoid_file_checks;
// Derived options // Derived options
// Per-level target file size. // Per-level target file size.

@ -128,7 +128,8 @@ ColumnFamilyOptions::ColumnFamilyOptions()
bloom_locality(0), bloom_locality(0),
max_successive_merges(0), max_successive_merges(0),
min_partial_merge_operands(2), min_partial_merge_operands(2),
optimize_filters_for_hits(false) optimize_filters_for_hits(false),
paranoid_file_checks(false)
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
, ,
listeners() { listeners() {
@ -197,7 +198,8 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
bloom_locality(options.bloom_locality), bloom_locality(options.bloom_locality),
max_successive_merges(options.max_successive_merges), max_successive_merges(options.max_successive_merges),
min_partial_merge_operands(options.min_partial_merge_operands), min_partial_merge_operands(options.min_partial_merge_operands),
optimize_filters_for_hits(options.optimize_filters_for_hits) optimize_filters_for_hits(options.optimize_filters_for_hits),
paranoid_file_checks(options.paranoid_file_checks)
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
, ,
listeners(options.listeners) { listeners(options.listeners) {

@ -225,6 +225,8 @@ bool ParseMiscOptions(const std::string& name, const std::string& value,
OptionsType* new_options) { OptionsType* new_options) {
if (name == "max_sequential_skip_in_iterations") { if (name == "max_sequential_skip_in_iterations") {
new_options->max_sequential_skip_in_iterations = ParseUint64(value); new_options->max_sequential_skip_in_iterations = ParseUint64(value);
} else if (name == "paranoid_file_checks") {
new_options->paranoid_file_checks = ParseBoolean(name, value);
} else { } else {
return false; return false;
} }

Loading…
Cancel
Save