Ensure VerifyFileChecksums reads don't exceed readahead_size (#11328)

Summary:
VerifyFileChecksums currently interprets the readahead_size as a payload of readahead_size for calculating the checksum, plus a prefetch of an additional readahead_size. Hence each read is readahead_size * 2. This change treats it as chunks of readahead_size for checksum calculation.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11328

Test Plan: Add a unit test

Reviewed By: pdillinger

Differential Revision: D44718781

Pulled By: anand1976

fbshipit-source-id: 79bae1ebaa27de2a13bc86f5910bf09356936e63
oxigraph-8.3.2
anand76 2 years ago committed by Facebook GitHub Bot
parent 7f5b9f40cb
commit 0623c5b903
  1. 3
      HISTORY.md
  2. 57
      db/db_basic_test.cc
  3. 24
      file/file_util.cc

@ -9,6 +9,9 @@
* For level compaction with `level_compaction_dynamic_level_bytes=true`, RocksDB now trivially moves levels down to fill LSM starting from bottommost level during DB open. See more in comments for option `level_compaction_dynamic_level_bytes`.
* User-provided `ReadOptions` take effect for more reads of non-`CacheEntryRole::kDataBlock` blocks.
### Bug Fixes
* In the DB::VerifyFileChecksums API, ensure that file system reads of SST files are equal to the readahead_size in ReadOptions, if specified. Previously, each read was 2x the readahead_size.
### New Features
* Add experimental `PerfContext` counters `iter_{next|prev|seek}_count` for db iterator, each counting the times of corresponding API being called.
* Allow runtime changes to whether `WriteBufferManager` allows stall or not by calling `SetAllowStall()`

@ -4494,6 +4494,63 @@ TEST_F(DBBasicTest, VerifyFileChecksums) {
ASSERT_TRUE(db_->VerifyFileChecksums(ReadOptions()).IsInvalidArgument());
}
TEST_F(DBBasicTest, VerifyFileChecksumsReadahead) {
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.env = env_;
options.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
DestroyAndReopen(options);
Random rnd(301);
int alignment = 256 * 1024;
for (int i = 0; i < 16; ++i) {
ASSERT_OK(Put("key" + std::to_string(i), rnd.RandomString(alignment)));
}
ASSERT_OK(Flush());
std::vector<std::string> filenames;
int sst_cnt = 0;
std::string sst_name;
uint64_t sst_size;
uint64_t number;
FileType type;
ASSERT_OK(env_->GetChildren(dbname_, &filenames));
for (auto name : filenames) {
if (ParseFileName(name, &number, &type)) {
if (type == kTableFile) {
sst_cnt++;
sst_name = name;
}
}
}
ASSERT_EQ(sst_cnt, 1);
ASSERT_OK(env_->GetFileSize(dbname_ + '/' + sst_name, &sst_size));
bool last_read = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"GenerateOneFileChecksum::Chunk:0", [&](void* /*arg*/) {
if (env_->random_read_bytes_counter_.load() == sst_size) {
EXPECT_FALSE(last_read);
last_read = true;
} else {
ASSERT_EQ(env_->random_read_bytes_counter_.load() & (alignment - 1),
0);
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
env_->count_random_reads_ = true;
env_->random_read_bytes_counter_ = 0;
env_->random_read_counter_.Reset();
ReadOptions ro;
ro.readahead_size = alignment;
ASSERT_OK(db_->VerifyFileChecksums(ro));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ASSERT_TRUE(last_read);
ASSERT_EQ(env_->random_read_counter_.Read(),
(sst_size + alignment - 1) / (alignment));
}
// TODO: re-enable after we provide finer-grained control for WAL tracking to
// meet the needs of different use cases, durability levels and recovery modes.
TEST_F(DBBasicTest, DISABLED_ManualWalSync) {

@ -135,7 +135,7 @@ IOStatus GenerateOneFileChecksum(
FileChecksumGenFactory* checksum_factory,
const std::string& requested_checksum_func_name, std::string* file_checksum,
std::string* file_checksum_func_name,
size_t verify_checksums_readahead_size, bool allow_mmap_reads,
size_t verify_checksums_readahead_size, bool /*allow_mmap_reads*/,
std::shared_ptr<IOTracer>& io_tracer, RateLimiter* rate_limiter,
Env::IOPriority rate_limiter_priority) {
if (checksum_factory == nullptr) {
@ -196,10 +196,12 @@ IOStatus GenerateOneFileChecksum(
size_t readahead_size = (verify_checksums_readahead_size != 0)
? verify_checksums_readahead_size
: default_max_read_ahead_size;
FilePrefetchBuffer prefetch_buffer(readahead_size /* readahead_size */,
readahead_size /* max_readahead_size */,
!allow_mmap_reads /* enable */);
std::unique_ptr<char[]> buf;
if (reader->use_direct_io()) {
size_t alignment = reader->file()->GetRequiredBufferAlignment();
readahead_size = (readahead_size + alignment - 1) & ~(alignment - 1);
}
buf.reset(new char[readahead_size]);
Slice slice;
uint64_t offset = 0;
@ -207,11 +209,11 @@ IOStatus GenerateOneFileChecksum(
while (size > 0) {
size_t bytes_to_read =
static_cast<size_t>(std::min(uint64_t{readahead_size}, size));
if (!prefetch_buffer.TryReadFromCache(
opts, reader.get(), offset, bytes_to_read, &slice,
nullptr /* status */, rate_limiter_priority,
false /* for_compaction */)) {
return IOStatus::Corruption("file read failed");
io_s = reader->Read(opts, offset, bytes_to_read, &slice, buf.get(), nullptr,
rate_limiter_priority);
if (!io_s.ok()) {
return IOStatus::Corruption("file read failed with error: " +
io_s.ToString());
}
if (slice.size() == 0) {
return IOStatus::Corruption("file too small");
@ -219,6 +221,8 @@ IOStatus GenerateOneFileChecksum(
checksum_generator->Update(slice.data(), slice.size());
size -= slice.size();
offset += slice.size();
TEST_SYNC_POINT("GenerateOneFileChecksum::Chunk:0");
}
checksum_generator->Finalize();
*file_checksum = checksum_generator->GetChecksum();

Loading…
Cancel
Save