diff --git a/HISTORY.md b/HISTORY.md index e2baea202..a48a77a6a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -9,6 +9,9 @@ * For level compaction with `level_compaction_dynamic_level_bytes=true`, RocksDB now trivially moves levels down to fill LSM starting from bottommost level during DB open. See more in comments for option `level_compaction_dynamic_level_bytes`. * User-provided `ReadOptions` take effect for more reads of non-`CacheEntryRole::kDataBlock` blocks. +### Bug Fixes +* In the DB::VerifyFileChecksums API, ensure that file system reads of SST files are equal to the readahead_size in ReadOptions, if specified. Previously, each read was 2x the readahead_size. + ### New Features * Add experimental `PerfContext` counters `iter_{next|prev|seek}_count` for db iterator, each counting the times of corresponding API being called. * Allow runtime changes to whether `WriteBufferManager` allows stall or not by calling `SetAllowStall()` diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index 0178fe480..063b99839 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -4494,6 +4494,63 @@ TEST_F(DBBasicTest, VerifyFileChecksums) { ASSERT_TRUE(db_->VerifyFileChecksums(ReadOptions()).IsInvalidArgument()); } +TEST_F(DBBasicTest, VerifyFileChecksumsReadahead) { + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.env = env_; + options.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory(); + DestroyAndReopen(options); + + Random rnd(301); + int alignment = 256 * 1024; + for (int i = 0; i < 16; ++i) { + ASSERT_OK(Put("key" + std::to_string(i), rnd.RandomString(alignment))); + } + ASSERT_OK(Flush()); + + std::vector filenames; + int sst_cnt = 0; + std::string sst_name; + uint64_t sst_size; + uint64_t number; + FileType type; + ASSERT_OK(env_->GetChildren(dbname_, &filenames)); + for (auto name : filenames) { + if (ParseFileName(name, &number, &type)) { + if (type == kTableFile) { + sst_cnt++; + sst_name = name; + } + } + } + ASSERT_EQ(sst_cnt, 1); + ASSERT_OK(env_->GetFileSize(dbname_ + '/' + sst_name, &sst_size)); + + bool last_read = false; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "GenerateOneFileChecksum::Chunk:0", [&](void* /*arg*/) { + if (env_->random_read_bytes_counter_.load() == sst_size) { + EXPECT_FALSE(last_read); + last_read = true; + } else { + ASSERT_EQ(env_->random_read_bytes_counter_.load() & (alignment - 1), + 0); + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + env_->count_random_reads_ = true; + env_->random_read_bytes_counter_ = 0; + env_->random_read_counter_.Reset(); + + ReadOptions ro; + ro.readahead_size = alignment; + ASSERT_OK(db_->VerifyFileChecksums(ro)); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ASSERT_TRUE(last_read); + ASSERT_EQ(env_->random_read_counter_.Read(), + (sst_size + alignment - 1) / (alignment)); +} + // TODO: re-enable after we provide finer-grained control for WAL tracking to // meet the needs of different use cases, durability levels and recovery modes. TEST_F(DBBasicTest, DISABLED_ManualWalSync) { diff --git a/file/file_util.cc b/file/file_util.cc index 4b36ea138..43608fcdc 100644 --- a/file/file_util.cc +++ b/file/file_util.cc @@ -135,7 +135,7 @@ IOStatus GenerateOneFileChecksum( FileChecksumGenFactory* checksum_factory, const std::string& requested_checksum_func_name, std::string* file_checksum, std::string* file_checksum_func_name, - size_t verify_checksums_readahead_size, bool allow_mmap_reads, + size_t verify_checksums_readahead_size, bool /*allow_mmap_reads*/, std::shared_ptr& io_tracer, RateLimiter* rate_limiter, Env::IOPriority rate_limiter_priority) { if (checksum_factory == nullptr) { @@ -196,10 +196,12 @@ IOStatus GenerateOneFileChecksum( size_t readahead_size = (verify_checksums_readahead_size != 0) ? verify_checksums_readahead_size : default_max_read_ahead_size; - - FilePrefetchBuffer prefetch_buffer(readahead_size /* readahead_size */, - readahead_size /* max_readahead_size */, - !allow_mmap_reads /* enable */); + std::unique_ptr buf; + if (reader->use_direct_io()) { + size_t alignment = reader->file()->GetRequiredBufferAlignment(); + readahead_size = (readahead_size + alignment - 1) & ~(alignment - 1); + } + buf.reset(new char[readahead_size]); Slice slice; uint64_t offset = 0; @@ -207,11 +209,11 @@ IOStatus GenerateOneFileChecksum( while (size > 0) { size_t bytes_to_read = static_cast(std::min(uint64_t{readahead_size}, size)); - if (!prefetch_buffer.TryReadFromCache( - opts, reader.get(), offset, bytes_to_read, &slice, - nullptr /* status */, rate_limiter_priority, - false /* for_compaction */)) { - return IOStatus::Corruption("file read failed"); + io_s = reader->Read(opts, offset, bytes_to_read, &slice, buf.get(), nullptr, + rate_limiter_priority); + if (!io_s.ok()) { + return IOStatus::Corruption("file read failed with error: " + + io_s.ToString()); } if (slice.size() == 0) { return IOStatus::Corruption("file too small"); @@ -219,6 +221,8 @@ IOStatus GenerateOneFileChecksum( checksum_generator->Update(slice.data(), slice.size()); size -= slice.size(); offset += slice.size(); + + TEST_SYNC_POINT("GenerateOneFileChecksum::Chunk:0"); } checksum_generator->Finalize(); *file_checksum = checksum_generator->GetChecksum();