Check iterator status BlockBasedTableReader::VerifyChecksumInBlocks() (#6909)

Summary:
The ```for``` loop in ```VerifyChecksumInBlocks``` only checks ```index_iter->Valid()``` which could be ```false``` either due to reaching the end of the index or, in case of partitioned index, it could be due to a checksum mismatch error when reading a 2nd level index block. Instead of throwing away the index iterator status, we need to return any errors back to the caller.

Tests:
Add a test in block_based_table_reader_test.cc.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6909

Reviewed By: pdillinger

Differential Revision: D21833922

Pulled By: anand1976

fbshipit-source-id: bc778ebf1121dbbdd768689de5183f07a9f0beae
main
anand76 4 years ago committed by Facebook GitHub Bot
parent 1bee0fca05
commit 98b0cbea88
  1. 1
      HISTORY.md
  2. 42
      db/corruption_test.cc
  3. 6
      table/block_based/block_based_table_reader.cc
  4. 1
      table/block_based/block_based_table_reader.h
  5. 120
      table/block_based/block_based_table_reader_test.cc
  6. 1
      table/block_based/partitioned_index_iterator.h
  7. 42
      test_util/testutil.cc
  8. 2
      test_util/testutil.h

@ -12,6 +12,7 @@
* Fix corrupt key read from ingested file when iterator direction switches from reverse to forward at a key that is a prefix of another key in the same file. It is only possible in files with a non-zero global seqno.
* Fix abnormally large estimate from GetApproximateSizes when a range starts near the end of one SST file and near the beginning of another. Now GetApproximateSizes consistently and fairly includes the size of SST metadata in addition to data blocks, attributing metadata proportionally among the data blocks based on their size.
* Fix potential file descriptor leakage in PosixEnv's IsDirectory() and NewRandomAccessFile().
* Fix false negative from the VerifyChecksum() API when there is a checksum mismatch in an index partition block in a BlockBasedTable format table file (index_type is kTwoLevelIndexSearch).
### Public API Change
* Flush(..., column_family) may return Status::ColumnFamilyDropped() instead of Status::InvalidArgument() if column_family is dropped while processing the flush request.

@ -157,42 +157,6 @@ class CorruptionTest : public testing::Test {
ASSERT_GE(max_expected, correct);
}
void CorruptFile(const std::string& fname, int offset, int bytes_to_corrupt) {
struct stat sbuf;
if (stat(fname.c_str(), &sbuf) != 0) {
const char* msg = strerror(errno);
FAIL() << fname << ": " << msg;
}
if (offset < 0) {
// Relative to end of file; make it absolute
if (-offset > sbuf.st_size) {
offset = 0;
} else {
offset = static_cast<int>(sbuf.st_size + offset);
}
}
if (offset > sbuf.st_size) {
offset = static_cast<int>(sbuf.st_size);
}
if (offset + bytes_to_corrupt > sbuf.st_size) {
bytes_to_corrupt = static_cast<int>(sbuf.st_size - offset);
}
// Do it
std::string contents;
Status s = ReadFileToString(Env::Default(), fname, &contents);
ASSERT_TRUE(s.ok()) << s.ToString();
for (int i = 0; i < bytes_to_corrupt; i++) {
contents[i + offset] ^= 0x80;
}
s = WriteStringToFile(Env::Default(), contents, fname);
ASSERT_TRUE(s.ok()) << s.ToString();
Options options;
EnvOptions env_options;
ASSERT_NOK(VerifySstFileChecksum(options, env_options, fname));
}
void Corrupt(FileType filetype, int offset, int bytes_to_corrupt) {
// Pick file to corrupt
std::vector<std::string> filenames;
@ -211,7 +175,7 @@ class CorruptionTest : public testing::Test {
}
ASSERT_TRUE(!fname.empty()) << filetype;
CorruptFile(fname, offset, bytes_to_corrupt);
test::CorruptFile(fname, offset, bytes_to_corrupt);
}
// corrupts exactly one file at level `level`. if no file found at level,
@ -221,7 +185,7 @@ class CorruptionTest : public testing::Test {
db_->GetLiveFilesMetaData(&metadata);
for (const auto& m : metadata) {
if (m.level == level) {
CorruptFile(dbname_ + "/" + m.name, offset, bytes_to_corrupt);
test::CorruptFile(dbname_ + "/" + m.name, offset, bytes_to_corrupt);
return;
}
}
@ -556,7 +520,7 @@ TEST_F(CorruptionTest, RangeDeletionCorrupted) {
ImmutableCFOptions(options_), kRangeDelBlock, &range_del_handle));
ASSERT_OK(TryReopen());
CorruptFile(filename, static_cast<int>(range_del_handle.offset()), 1);
test::CorruptFile(filename, static_cast<int>(range_del_handle.offset()), 1);
ASSERT_TRUE(TryReopen().IsCorruption());
}

@ -2837,6 +2837,12 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
break;
}
}
if (s.ok()) {
// In the case of two level indexes, we would have exited the above loop
// by checking index_iter->Valid(), but Valid() might have returned false
// due to an IO error. So check the index_iter status
s = index_iter->status();
}
return s;
}

@ -255,6 +255,7 @@ class BlockBasedTable : public TableReader {
private:
friend class MockedBlockBasedTable;
friend class BlockBasedTableReaderTestVerifyChecksum_ChecksumMismatch_Test;
static std::atomic<uint64_t> next_cache_key_id_;
BlockCacheTracer* const block_cache_tracer_;

@ -4,6 +4,8 @@
// (found in the LICENSE.Apache file in the root directory).
#include "table/block_based/block_based_table_reader.h"
#include "rocksdb/file_system.h"
#include "table/block_based/partitioned_index_iterator.h"
#include "db/table_properties_collector.h"
#include "options/options_helper.h"
@ -19,19 +21,29 @@ namespace ROCKSDB_NAMESPACE {
class BlockBasedTableReaderTest
: public testing::Test,
public testing::WithParamInterface<std::tuple<CompressionType, bool>> {
public testing::WithParamInterface<std::tuple<
CompressionType, bool, BlockBasedTableOptions::IndexType, bool>> {
protected:
CompressionType compression_type_;
bool use_direct_reads_;
void SetUp() override {
std::tie(compression_type_, use_direct_reads_) = GetParam();
BlockBasedTableOptions::IndexType index_type;
bool no_block_cache;
std::tie(compression_type_, use_direct_reads_, index_type, no_block_cache) =
GetParam();
test::SetupSyncPointsToMockDirectIO();
test_dir_ = test::PerThreadDBPath("block_based_table_reader_test");
env_ = Env::Default();
fs_ = FileSystem::Default();
ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr));
BlockBasedTableOptions opts;
opts.index_type = index_type;
opts.no_block_cache = no_block_cache;
table_factory_.reset(
static_cast<BlockBasedTableFactory*>(NewBlockBasedTableFactory(opts)));
}
void TearDown() override { EXPECT_OK(test::DestroyDir(env_, test_dir_)); }
@ -50,7 +62,7 @@ class BlockBasedTableReaderTest
ColumnFamilyOptions cf_options;
MutableCFOptions moptions(cf_options);
std::vector<std::unique_ptr<IntTblPropCollectorFactory>> factories;
std::unique_ptr<TableBuilder> table_builder(table_factory_.NewTableBuilder(
std::unique_ptr<TableBuilder> table_builder(table_factory_->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, comparator, &factories,
compression_type, 0 /* sample_for_compression */,
CompressionOptions(), false /* skip_filters */,
@ -79,19 +91,21 @@ class BlockBasedTableReaderTest
std::unique_ptr<TableReader> table_reader;
ASSERT_OK(BlockBasedTable::Open(ioptions, EnvOptions(),
table_factory_.table_options(), comparator,
table_factory_->table_options(), comparator,
std::move(file), file_size, &table_reader));
table->reset(reinterpret_cast<BlockBasedTable*>(table_reader.release()));
}
std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; }
const std::shared_ptr<FileSystem>& fs() const { return fs_; }
private:
std::string test_dir_;
Env* env_;
std::shared_ptr<FileSystem> fs_;
BlockBasedTableFactory table_factory_;
std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; }
std::unique_ptr<BlockBasedTableFactory> table_factory_;
void WriteToFile(const std::string& content, const std::string& filename) {
std::unique_ptr<FSWritableFile> f;
@ -219,20 +233,104 @@ TEST_P(BlockBasedTableReaderTest, MultiGet) {
}
}
class BlockBasedTableReaderTestVerifyChecksum
: public BlockBasedTableReaderTest {
public:
BlockBasedTableReaderTestVerifyChecksum() : BlockBasedTableReaderTest() {}
};
TEST_P(BlockBasedTableReaderTestVerifyChecksum, ChecksumMismatch) {
// Prepare key-value pairs to occupy multiple blocks.
// Each value is 256B, every 16 pairs constitute 1 block.
// Adjacent blocks contain values with different compression complexity:
// human readable strings are easier to compress than random strings.
Random rnd(101);
std::map<std::string, std::string> kv;
{
uint32_t key = 0;
for (int block = 0; block < 800; block++) {
for (int i = 0; i < 16; i++) {
char k[9] = {0};
// Internal key is constructed directly from this key,
// and internal key size is required to be >= 8 bytes,
// so use %08u as the format string.
sprintf(k, "%08u", key);
std::string v;
test::RandomString(&rnd, 256, &v);
kv[std::string(k)] = v;
key++;
}
}
}
std::string table_name =
"BlockBasedTableReaderTest" + CompressionTypeToString(compression_type_);
CreateTable(table_name, compression_type_, kv);
std::unique_ptr<BlockBasedTable> table;
Options options;
ImmutableCFOptions ioptions(options);
FileOptions foptions;
foptions.use_direct_reads = use_direct_reads_;
InternalKeyComparator comparator(options.comparator);
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table);
// Use the top level iterator to find the offset/size of the first
// 2nd level index block and corrupt the block
IndexBlockIter iiter_on_stack;
BlockCacheLookupContext context{TableReaderCaller::kUserVerifyChecksum};
InternalIteratorBase<IndexValue>* iiter = table->NewIndexIterator(
ReadOptions(), /*disable_prefix_seek=*/false, &iiter_on_stack,
/*get_context=*/nullptr, &context);
std::unique_ptr<InternalIteratorBase<IndexValue>> iiter_unique_ptr;
if (iiter != &iiter_on_stack) {
iiter_unique_ptr = std::unique_ptr<InternalIteratorBase<IndexValue>>(iiter);
}
ASSERT_OK(iiter->status());
iiter->SeekToFirst();
BlockHandle handle = static_cast<ParititionedIndexIterator*>(iiter)
->index_iter_->value()
.handle;
table.reset();
// Corrupt the block pointed to by handle
test::CorruptFile(Path(table_name), static_cast<int>(handle.offset()), 128);
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table);
Status s = table->VerifyChecksum(ReadOptions(),
TableReaderCaller::kUserVerifyChecksum);
ASSERT_EQ(s.code(), Status::kCorruption);
}
// Param 1: compression type
// Param 2: whether to use direct reads
// Param 3: Block Based Table Index type
// Param 4: BBTO no_block_cache option
#ifdef ROCKSDB_LITE
// Skip direct I/O tests in lite mode since direct I/O is unsupported.
INSTANTIATE_TEST_CASE_P(
MultiGet, BlockBasedTableReaderTest,
::testing::Combine(::testing::ValuesIn(GetSupportedCompressions()),
::testing::Values(false)));
::testing::Combine(
::testing::ValuesIn(GetSupportedCompressions()),
::testing::Values(false),
::testing::Values(BlockBasedTableOptions::IndexType::kBinarySearch),
::testing::Values(false)));
#else // ROCKSDB_LITE
INSTANTIATE_TEST_CASE_P(
MultiGet, BlockBasedTableReaderTest,
::testing::Combine(::testing::ValuesIn(GetSupportedCompressions()),
::testing::Bool()));
::testing::Combine(
::testing::ValuesIn(GetSupportedCompressions()), ::testing::Bool(),
::testing::Values(BlockBasedTableOptions::IndexType::kBinarySearch),
::testing::Values(false)));
#endif // ROCKSDB_LITE
INSTANTIATE_TEST_CASE_P(
VerifyChecksum, BlockBasedTableReaderTestVerifyChecksum,
::testing::Combine(
::testing::ValuesIn(GetSupportedCompressions()),
::testing::Values(false),
::testing::Values(
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch),
::testing::Values(true)));
} // namespace ROCKSDB_NAMESPACE

@ -122,6 +122,7 @@ class ParititionedIndexIterator : public InternalIteratorBase<IndexValue> {
}
private:
friend class BlockBasedTableReaderTestVerifyChecksum_ChecksumMismatch_Test;
const BlockBasedTable* table_;
const ReadOptions read_options_;
#ifndef NDEBUG

@ -10,6 +10,7 @@
#include "test_util/testutil.h"
#include <fcntl.h>
#include <sys/stat.h>
#include <array>
#include <cctype>
#include <fstream>
@ -536,5 +537,46 @@ void SetupSyncPointsToMockDirectIO() {
#endif
}
void CorruptFile(const std::string& fname, int offset, int bytes_to_corrupt) {
struct stat sbuf;
if (stat(fname.c_str(), &sbuf) != 0) {
// strerror is not thread-safe so should not be used in the "passing" path
// of unit tests (sometimes parallelized) but is OK here where test fails
const char* msg = strerror(errno);
fprintf(stderr, "%s:%s\n", fname.c_str(), msg);
assert(false);
}
if (offset < 0) {
// Relative to end of file; make it absolute
if (-offset > sbuf.st_size) {
offset = 0;
} else {
offset = static_cast<int>(sbuf.st_size + offset);
}
}
if (offset > sbuf.st_size) {
offset = static_cast<int>(sbuf.st_size);
}
if (offset + bytes_to_corrupt > sbuf.st_size) {
bytes_to_corrupt = static_cast<int>(sbuf.st_size - offset);
}
// Do it
std::string contents;
Status s = ReadFileToString(Env::Default(), fname, &contents);
assert(s.ok());
for (int i = 0; i < bytes_to_corrupt; i++) {
contents[i + offset] ^= 0x80;
}
s = WriteStringToFile(Env::Default(), contents, fname);
assert(s.ok());
Options options;
EnvOptions env_options;
#ifndef ROCKSDB_LITE
assert(!VerifySstFileChecksum(options, env_options, fname).ok());
#endif
}
} // namespace test
} // namespace ROCKSDB_NAMESPACE

@ -812,5 +812,7 @@ void ResetTmpDirForDirectIO();
// to the file system.
void SetupSyncPointsToMockDirectIO();
void CorruptFile(const std::string& fname, int offset, int bytes_to_corrupt);
} // namespace test
} // namespace ROCKSDB_NAMESPACE

Loading…
Cancel
Save