From dc5de45af8fc1bcc90edea5cd9a5695bd85a8baf Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Fri, 19 Nov 2021 17:52:42 -0800 Subject: [PATCH] Support readahead during compaction for blob files (#9187) Summary: The patch adds a new BlobDB configuration option `blob_compaction_readahead_size` that can be used to enable prefetching data from blob files during compaction. This is important when using storage with higher latencies like HDDs or remote filesystems. If enabled, prefetching is used for all cases when blobs are read during compaction, namely garbage collection, compaction filters (when the existing value has to be read from a blob file), and `Merge` (when the value of the base `Put` is stored in a blob file). Pull Request resolved: https://github.com/facebook/rocksdb/pull/9187 Test Plan: Ran `make check` and the stress/crash test. Reviewed By: riversand963 Differential Revision: D32565512 Pulled By: ltamasi fbshipit-source-id: 87be9cebc3aa01cc227bec6b5f64d827b8164f5d --- CMakeLists.txt | 1 + TARGETS | 2 + db/blob/blob_fetcher.cc | 28 +++-- db/blob/blob_fetcher.h | 21 +++- db/blob/blob_file_reader.cc | 24 +++- db/blob/blob_file_reader.h | 4 +- db/blob/blob_file_reader_test.cc | 57 +++++---- db/blob/db_blob_compaction_test.cc | 118 ++++++++++++++++++ db/blob/prefetch_buffer_collection.cc | 21 ++++ db/blob/prefetch_buffer_collection.h | 38 ++++++ db/c.cc | 10 ++ db/c_test.c | 4 + db/compaction/compaction_iteration_stats.h | 4 + db/compaction/compaction_iterator.cc | 105 +++++++++++----- db/compaction/compaction_iterator.h | 25 +++- db/compaction/compaction_iterator_test.cc | 6 +- db/db_iter.cc | 3 +- db/merge_helper.cc | 35 +++++- db/merge_helper.h | 19 ++- db/merge_helper_test.cc | 6 +- db/version_set.cc | 28 +++-- db/version_set.h | 7 +- db_stress_tool/db_stress_common.h | 1 + db_stress_tool/db_stress_gflags.cc | 5 + db_stress_tool/db_stress_test_base.cc | 31 ++--- file/file_prefetch_buffer.cc | 8 +- file/file_prefetch_buffer.h | 32 ++--- file/file_util.cc 
| 11 +- include/rocksdb/advanced_options.h | 12 +- include/rocksdb/c.h | 6 + options/cf_options.cc | 6 + options/cf_options.h | 3 + options/options.cc | 6 +- options/options_helper.cc | 2 + options/options_settable_test.cc | 1 + options/options_test.cc | 4 + src.mk | 1 + table/block_based/block_based_table_reader.cc | 14 ++- table/block_based/block_based_table_reader.h | 8 +- table/block_fetcher.cc | 7 +- table/format.cc | 2 +- table/get_context.cc | 6 +- table/sst_file_dumper.cc | 5 +- table/table_test.cc | 15 ++- test_util/testutil.cc | 1 + tools/db_bench_tool.cc | 7 ++ tools/db_bench_tool_test.cc | 1 + tools/db_crashtest.py | 1 + 48 files changed, 599 insertions(+), 163 deletions(-) create mode 100644 db/blob/prefetch_buffer_collection.cc create mode 100644 db/blob/prefetch_buffer_collection.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 9ac79153a..894c39987 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -652,6 +652,7 @@ set(SOURCES db/blob/blob_log_format.cc db/blob/blob_log_sequential_reader.cc db/blob/blob_log_writer.cc + db/blob/prefetch_buffer_collection.cc db/builder.cc db/c.cc db/column_family.cc diff --git a/TARGETS b/TARGETS index a842673a8..903d7e5f3 100644 --- a/TARGETS +++ b/TARGETS @@ -159,6 +159,7 @@ cpp_library( "db/blob/blob_log_format.cc", "db/blob/blob_log_sequential_reader.cc", "db/blob/blob_log_writer.cc", + "db/blob/prefetch_buffer_collection.cc", "db/builder.cc", "db/c.cc", "db/column_family.cc", @@ -487,6 +488,7 @@ cpp_library( "db/blob/blob_log_format.cc", "db/blob/blob_log_sequential_reader.cc", "db/blob/blob_log_writer.cc", + "db/blob/prefetch_buffer_collection.cc", "db/builder.cc", "db/c.cc", "db/column_family.cc", diff --git a/db/blob/blob_fetcher.cc b/db/blob/blob_fetcher.cc index a42a4be5f..124429f93 100644 --- a/db/blob/blob_fetcher.cc +++ b/db/blob/blob_fetcher.cc @@ -9,14 +9,26 @@ namespace ROCKSDB_NAMESPACE { -Status BlobFetcher::FetchBlob(const Slice& user_key, const Slice& blob_index, - PinnableSlice* blob_value) { 
- Status s; +Status BlobFetcher::FetchBlob(const Slice& user_key, + const Slice& blob_index_slice, + FilePrefetchBuffer* prefetch_buffer, + PinnableSlice* blob_value, + uint64_t* bytes_read) const { assert(version_); - constexpr uint64_t* bytes_read = nullptr; - s = version_->GetBlob(read_options_, user_key, blob_index, blob_value, - bytes_read); - return s; + + return version_->GetBlob(read_options_, user_key, blob_index_slice, + prefetch_buffer, blob_value, bytes_read); +} + +Status BlobFetcher::FetchBlob(const Slice& user_key, + const BlobIndex& blob_index, + FilePrefetchBuffer* prefetch_buffer, + PinnableSlice* blob_value, + uint64_t* bytes_read) const { + assert(version_); + + return version_->GetBlob(read_options_, user_key, blob_index, prefetch_buffer, + blob_value, bytes_read); } -} // namespace ROCKSDB_NAMESPACE \ No newline at end of file +} // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/blob_fetcher.h b/db/blob/blob_fetcher.h index 747057f09..8aeaf965d 100644 --- a/db/blob/blob_fetcher.h +++ b/db/blob/blob_fetcher.h @@ -9,18 +9,29 @@ #include "rocksdb/status.h" namespace ROCKSDB_NAMESPACE { + class Version; +class Slice; +class FilePrefetchBuffer; +class PinnableSlice; +class BlobIndex; +// A thin wrapper around the blob retrieval functionality of Version. 
class BlobFetcher { public: - BlobFetcher(Version* version, const ReadOptions& read_options) + BlobFetcher(const Version* version, const ReadOptions& read_options) : version_(version), read_options_(read_options) {} - Status FetchBlob(const Slice& user_key, const Slice& blob_index, - PinnableSlice* blob_value); + Status FetchBlob(const Slice& user_key, const Slice& blob_index_slice, + FilePrefetchBuffer* prefetch_buffer, + PinnableSlice* blob_value, uint64_t* bytes_read) const; + + Status FetchBlob(const Slice& user_key, const BlobIndex& blob_index, + FilePrefetchBuffer* prefetch_buffer, + PinnableSlice* blob_value, uint64_t* bytes_read) const; private: - Version* version_; + const Version* version_; ReadOptions read_options_; }; -} // namespace ROCKSDB_NAMESPACE \ No newline at end of file +} // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/blob_file_reader.cc b/db/blob/blob_file_reader.cc index 0cb49ae62..981261001 100644 --- a/db/blob/blob_file_reader.cc +++ b/db/blob/blob_file_reader.cc @@ -9,6 +9,7 @@ #include #include "db/blob/blob_log_format.h" +#include "file/file_prefetch_buffer.h" #include "file/filename.h" #include "monitoring/statistics.h" #include "options/cf_options.h" @@ -282,6 +283,7 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options, const Slice& user_key, uint64_t offset, uint64_t value_size, CompressionType compression_type, + FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value, uint64_t* bytes_read) const { assert(value); @@ -313,7 +315,21 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options, Buffer buf; AlignedBuf aligned_buf; - { + bool prefetched = false; + + if (prefetch_buffer) { + Status s; + constexpr bool for_compaction = true; + + prefetched = prefetch_buffer->TryReadFromCache( + IOOptions(), file_reader_.get(), record_offset, + static_cast(record_size), &record_slice, &s, for_compaction); + if (!s.ok()) { + return s; + } + } + + if (!prefetched) { 
TEST_SYNC_POINT("BlobFileReader::GetBlob:ReadFromFile"); const Status s = ReadFromFile(file_reader_.get(), record_offset, @@ -322,11 +338,11 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options, if (!s.ok()) { return s; } - - TEST_SYNC_POINT_CALLBACK("BlobFileReader::GetBlob:TamperWithResult", - &record_slice); } + TEST_SYNC_POINT_CALLBACK("BlobFileReader::GetBlob:TamperWithResult", + &record_slice); + if (read_options.verify_checksums) { const Status s = VerifyBlob(record_slice, user_key, value_size); if (!s.ok()) { diff --git a/db/blob/blob_file_reader.h b/db/blob/blob_file_reader.h index b7da393f3..ffd1d11d5 100644 --- a/db/blob/blob_file_reader.h +++ b/db/blob/blob_file_reader.h @@ -21,6 +21,7 @@ struct FileOptions; class HistogramImpl; struct ReadOptions; class Slice; +class FilePrefetchBuffer; class PinnableSlice; class Statistics; @@ -41,7 +42,8 @@ class BlobFileReader { Status GetBlob(const ReadOptions& read_options, const Slice& user_key, uint64_t offset, uint64_t value_size, - CompressionType compression_type, PinnableSlice* value, + CompressionType compression_type, + FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value, uint64_t* bytes_read) const; // offsets must be sorted in ascending order by caller. 
diff --git a/db/blob/blob_file_reader_test.cc b/db/blob/blob_file_reader_test.cc index ebafef834..8cbb982a8 100644 --- a/db/blob/blob_file_reader_test.cc +++ b/db/blob/blob_file_reader_test.cc @@ -179,13 +179,15 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { ReadOptions read_options; read_options.verify_checksums = false; + constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; + { PinnableSlice value; uint64_t bytes_read = 0; ASSERT_OK(reader->GetBlob(read_options, keys[0], blob_offsets[0], - blob_sizes[0], kNoCompression, &value, - &bytes_read)); + blob_sizes[0], kNoCompression, prefetch_buffer, + &value, &bytes_read)); ASSERT_EQ(value, blobs[0]); ASSERT_EQ(bytes_read, blob_sizes[0]); @@ -222,8 +224,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { uint64_t bytes_read = 0; ASSERT_OK(reader->GetBlob(read_options, keys[1], blob_offsets[1], - blob_sizes[1], kNoCompression, &value, - &bytes_read)); + blob_sizes[1], kNoCompression, prefetch_buffer, + &value, &bytes_read)); ASSERT_EQ(value, blobs[1]); const uint64_t key_size = keys[1].size(); @@ -239,8 +241,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { ASSERT_TRUE(reader ->GetBlob(read_options, keys[0], blob_offsets[0] - 1, - blob_sizes[0], kNoCompression, &value, - &bytes_read) + blob_sizes[0], kNoCompression, prefetch_buffer, + &value, &bytes_read) .IsCorruption()); ASSERT_EQ(bytes_read, 0); } @@ -252,8 +254,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { ASSERT_TRUE(reader ->GetBlob(read_options, keys[2], blob_offsets[2] + 1, - blob_sizes[2], kNoCompression, &value, - &bytes_read) + blob_sizes[2], kNoCompression, prefetch_buffer, + &value, &bytes_read) .IsCorruption()); ASSERT_EQ(bytes_read, 0); } @@ -265,7 +267,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { ASSERT_TRUE(reader ->GetBlob(read_options, keys[0], blob_offsets[0], - blob_sizes[0], kZSTD, &value, &bytes_read) + blob_sizes[0], kZSTD, prefetch_buffer, &value, + &bytes_read) .IsCorruption()); 
ASSERT_EQ(bytes_read, 0); } @@ -280,8 +283,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { ->GetBlob(read_options, shorter_key, blob_offsets[0] - (keys[0].size() - sizeof(shorter_key) + 1), - blob_sizes[0], kNoCompression, &value, - &bytes_read) + blob_sizes[0], kNoCompression, prefetch_buffer, + &value, &bytes_read) .IsCorruption()); ASSERT_EQ(bytes_read, 0); @@ -323,8 +326,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { ASSERT_TRUE(reader ->GetBlob(read_options, incorrect_key, blob_offsets[0], - blob_sizes[0], kNoCompression, &value, - &bytes_read) + blob_sizes[0], kNoCompression, prefetch_buffer, + &value, &bytes_read) .IsCorruption()); ASSERT_EQ(bytes_read, 0); @@ -363,8 +366,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { ASSERT_TRUE(reader ->GetBlob(read_options, keys[1], blob_offsets[1], - blob_sizes[1] + 1, kNoCompression, &value, - &bytes_read) + blob_sizes[1] + 1, kNoCompression, + prefetch_buffer, &value, &bytes_read) .IsCorruption()); ASSERT_EQ(bytes_read, 0); @@ -642,12 +645,14 @@ TEST_F(BlobFileReaderTest, BlobCRCError) { SyncPoint::GetInstance()->EnableProcessing(); + constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; PinnableSlice value; uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(ReadOptions(), key, blob_offset, blob_size, - kNoCompression, &value, &bytes_read) + kNoCompression, prefetch_buffer, &value, + &bytes_read) .IsCorruption()); ASSERT_EQ(bytes_read, 0); @@ -695,12 +700,15 @@ TEST_F(BlobFileReaderTest, Compression) { ReadOptions read_options; read_options.verify_checksums = false; + constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; + { PinnableSlice value; uint64_t bytes_read = 0; ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size, - kSnappyCompression, &value, &bytes_read)); + kSnappyCompression, prefetch_buffer, &value, + &bytes_read)); ASSERT_EQ(value, blob); ASSERT_EQ(bytes_read, blob_size); } @@ -712,7 +720,8 @@ TEST_F(BlobFileReaderTest, Compression) { uint64_t 
bytes_read = 0; ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size, - kSnappyCompression, &value, &bytes_read)); + kSnappyCompression, prefetch_buffer, &value, + &bytes_read)); ASSERT_EQ(value, blob); constexpr uint64_t key_size = sizeof(key) - 1; @@ -770,12 +779,14 @@ TEST_F(BlobFileReaderTest, UncompressionError) { SyncPoint::GetInstance()->EnableProcessing(); + constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; PinnableSlice value; uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(ReadOptions(), key, blob_offset, blob_size, - kSnappyCompression, &value, &bytes_read) + kSnappyCompression, prefetch_buffer, &value, + &bytes_read) .IsCorruption()); ASSERT_EQ(bytes_read, 0); @@ -854,12 +865,14 @@ TEST_P(BlobFileReaderIOErrorTest, IOError) { } else { ASSERT_OK(s); + constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; PinnableSlice value; uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(ReadOptions(), key, blob_offset, blob_size, - kNoCompression, &value, &bytes_read) + kNoCompression, prefetch_buffer, &value, + &bytes_read) .IsIOError()); ASSERT_EQ(bytes_read, 0); } @@ -937,12 +950,14 @@ TEST_P(BlobFileReaderDecodingErrorTest, DecodingError) { } else { ASSERT_OK(s); + constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; PinnableSlice value; uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(ReadOptions(), key, blob_offset, blob_size, - kNoCompression, &value, &bytes_read) + kNoCompression, prefetch_buffer, &value, + &bytes_read) .IsCorruption()); ASSERT_EQ(bytes_read, 0); } diff --git a/db/blob/db_blob_compaction_test.cc b/db/blob/db_blob_compaction_test.cc index ee4acc4bf..aff47896d 100644 --- a/db/blob/db_blob_compaction_test.cc +++ b/db/blob/db_blob_compaction_test.cc @@ -590,6 +590,124 @@ TEST_F(DBBlobCompactionTest, MergeBlobWithBase) { Close(); } +TEST_F(DBBlobCompactionTest, CompactionReadaheadGarbageCollection) { + Options options = GetDefaultOptions(); + options.enable_blob_files = true; + options.min_blob_size = 0; 
+ options.enable_blob_garbage_collection = true; + options.blob_garbage_collection_age_cutoff = 1.0; + options.blob_compaction_readahead_size = 1 << 10; + options.disable_auto_compactions = true; + + Reopen(options); + + ASSERT_OK(Put("key", "lime")); + ASSERT_OK(Put("foo", "bar")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("key", "pie")); + ASSERT_OK(Put("foo", "baz")); + ASSERT_OK(Flush()); + + size_t num_non_prefetch_reads = 0; + SyncPoint::GetInstance()->SetCallBack( + "BlobFileReader::GetBlob:ReadFromFile", + [&num_non_prefetch_reads](void* /* arg */) { ++num_non_prefetch_reads; }); + SyncPoint::GetInstance()->EnableProcessing(); + + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + ASSERT_EQ(Get("key"), "pie"); + ASSERT_EQ(Get("foo"), "baz"); + ASSERT_EQ(num_non_prefetch_reads, 0); + + Close(); +} + +TEST_F(DBBlobCompactionTest, CompactionReadaheadFilter) { + Options options = GetDefaultOptions(); + + std::unique_ptr compaction_filter_guard( + new ValueMutationFilter("pie")); + + options.compaction_filter = compaction_filter_guard.get(); + options.enable_blob_files = true; + options.min_blob_size = 0; + options.blob_compaction_readahead_size = 1 << 10; + options.disable_auto_compactions = true; + + Reopen(options); + + ASSERT_OK(Put("key", "lime")); + ASSERT_OK(Put("foo", "bar")); + ASSERT_OK(Flush()); + + size_t num_non_prefetch_reads = 0; + SyncPoint::GetInstance()->SetCallBack( + "BlobFileReader::GetBlob:ReadFromFile", + [&num_non_prefetch_reads](void* /* arg */) { ++num_non_prefetch_reads; }); + SyncPoint::GetInstance()->EnableProcessing(); + + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); + + SyncPoint::GetInstance()->DisableProcessing(); + 
SyncPoint::GetInstance()->ClearAllCallBacks(); + + ASSERT_EQ(Get("key"), "limepie"); + ASSERT_EQ(Get("foo"), "barpie"); + ASSERT_EQ(num_non_prefetch_reads, 0); + + Close(); +} + +TEST_F(DBBlobCompactionTest, CompactionReadaheadMerge) { + Options options = GetDefaultOptions(); + options.enable_blob_files = true; + options.min_blob_size = 0; + options.blob_compaction_readahead_size = 1 << 10; + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + options.disable_auto_compactions = true; + + Reopen(options); + + ASSERT_OK(Put("key", "lime")); + ASSERT_OK(Put("foo", "bar")); + ASSERT_OK(Flush()); + + ASSERT_OK(Merge("key", "pie")); + ASSERT_OK(Merge("foo", "baz")); + ASSERT_OK(Flush()); + + size_t num_non_prefetch_reads = 0; + SyncPoint::GetInstance()->SetCallBack( + "BlobFileReader::GetBlob:ReadFromFile", + [&num_non_prefetch_reads](void* /* arg */) { ++num_non_prefetch_reads; }); + SyncPoint::GetInstance()->EnableProcessing(); + + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + ASSERT_EQ(Get("key"), "lime,pie"); + ASSERT_EQ(Get("foo"), "bar,baz"); + ASSERT_EQ(num_non_prefetch_reads, 0); + + Close(); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/blob/prefetch_buffer_collection.cc b/db/blob/prefetch_buffer_collection.cc new file mode 100644 index 000000000..079576f51 --- /dev/null +++ b/db/blob/prefetch_buffer_collection.cc @@ -0,0 +1,21 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#include "db/blob/prefetch_buffer_collection.h" + +namespace ROCKSDB_NAMESPACE { + +FilePrefetchBuffer* PrefetchBufferCollection::GetOrCreatePrefetchBuffer( + uint64_t file_number) { + auto& prefetch_buffer = prefetch_buffers_[file_number]; + if (!prefetch_buffer) { + prefetch_buffer.reset( + new FilePrefetchBuffer(readahead_size_, readahead_size_)); + } + + return prefetch_buffer.get(); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/prefetch_buffer_collection.h b/db/blob/prefetch_buffer_collection.h new file mode 100644 index 000000000..b973eddc0 --- /dev/null +++ b/db/blob/prefetch_buffer_collection.h @@ -0,0 +1,38 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include +#include +#include +#include + +#include "file/file_prefetch_buffer.h" +#include "rocksdb/rocksdb_namespace.h" + +namespace ROCKSDB_NAMESPACE { + +// A class that owns a collection of FilePrefetchBuffers using the file number +// as key. Used for implementing compaction readahead for blob files. Designed +// to be accessed by a single thread only: every (sub)compaction needs its own +// buffers since they are guaranteed to read different blobs from different +// positions even when reading the same file. 
+class PrefetchBufferCollection { + public: + explicit PrefetchBufferCollection(uint64_t readahead_size) + : readahead_size_(readahead_size) { + assert(readahead_size_ > 0); + } + + FilePrefetchBuffer* GetOrCreatePrefetchBuffer(uint64_t file_number); + + private: + uint64_t readahead_size_; + std::unordered_map> + prefetch_buffers_; // maps file number to prefetch buffer +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/c.cc b/db/c.cc index 89dfa5303..dda5f5560 100644 --- a/db/c.cc +++ b/db/c.cc @@ -2760,6 +2760,16 @@ double rocksdb_options_get_blob_gc_force_threshold(rocksdb_options_t* opt) { return opt->rep.blob_garbage_collection_force_threshold; } +void rocksdb_options_set_blob_compaction_readahead_size(rocksdb_options_t* opt, + uint64_t val) { + opt->rep.blob_compaction_readahead_size = val; +} + +uint64_t rocksdb_options_get_blob_compaction_readahead_size( + rocksdb_options_t* opt) { + return opt->rep.blob_compaction_readahead_size; +} + void rocksdb_options_set_num_levels(rocksdb_options_t* opt, int n) { opt->rep.num_levels = n; } diff --git a/db/c_test.c b/db/c_test.c index fb8f65635..8f7e05ca2 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -1799,6 +1799,10 @@ int main(int argc, char** argv) { rocksdb_options_set_blob_gc_force_threshold(o, 0.75); CheckCondition(0.75 == rocksdb_options_get_blob_gc_force_threshold(o)); + rocksdb_options_set_blob_compaction_readahead_size(o, 262144); + CheckCondition(262144 == + rocksdb_options_get_blob_compaction_readahead_size(o)); + // Create a copy that should be equal to the original. 
rocksdb_options_t* copy; copy = rocksdb_options_create_copy(o); diff --git a/db/compaction/compaction_iteration_stats.h b/db/compaction/compaction_iteration_stats.h index 910c4469a..1b1c28b57 100644 --- a/db/compaction/compaction_iteration_stats.h +++ b/db/compaction/compaction_iteration_stats.h @@ -9,6 +9,8 @@ #include "rocksdb/rocksdb_namespace.h" +namespace ROCKSDB_NAMESPACE { + struct CompactionIterationStats { // Compaction statistics @@ -43,3 +45,5 @@ struct CompactionIterationStats { uint64_t num_blobs_relocated = 0; uint64_t total_blob_bytes_relocated = 0; }; + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index cd6a28d8f..5ccd1a922 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -8,8 +8,10 @@ #include #include +#include "db/blob/blob_fetcher.h" #include "db/blob/blob_file_builder.h" #include "db/blob/blob_index.h" +#include "db/blob/prefetch_buffer_collection.h" #include "db/snapshot_checker.h" #include "logging/logging.h" #include "port/likely.h" @@ -88,6 +90,9 @@ CompactionIterator::CompactionIterator( merge_out_iter_(merge_helper_), blob_garbage_collection_cutoff_file_number_( ComputeBlobGarbageCollectionCutoffFileNumber(compaction_.get())), + blob_fetcher_(CreateBlobFetcherIfNeeded(compaction_.get())), + prefetch_buffers_( + CreatePrefetchBufferCollectionIfNeeded(compaction_.get())), current_key_committed_(false), cmp_with_history_ts_low_(0), level_(compaction_ == nullptr ? 
0 : compaction_->level()) { @@ -225,6 +230,13 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, compaction_filter_skip_until_.rep()); if (CompactionFilter::Decision::kUndetermined == filter && !compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) { + if (compaction_ == nullptr) { + status_ = + Status::Corruption("Unexpected blob index outside of compaction"); + valid_ = false; + return false; + } + // For integrated BlobDB impl, CompactionIterator reads blob value. // For Stacked BlobDB impl, the corresponding CompactionFilter's // FilterV2 method should read the blob value. @@ -235,23 +247,19 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, valid_ = false; return false; } - if (blob_index.HasTTL() || blob_index.IsInlined()) { - status_ = Status::Corruption("Unexpected TTL/inlined blob index"); - valid_ = false; - return false; - } - if (compaction_ == nullptr) { - status_ = - Status::Corruption("Unexpected blob index outside of compaction"); - valid_ = false; - return false; - } - const Version* const version = compaction_->input_version(); - assert(version); + + FilePrefetchBuffer* prefetch_buffer = + prefetch_buffers_ ? prefetch_buffers_->GetOrCreatePrefetchBuffer( + blob_index.file_number()) + : nullptr; uint64_t bytes_read = 0; - s = version->GetBlob(ReadOptions(), ikey_.user_key, blob_index, - &blob_value_, &bytes_read); + + assert(blob_fetcher_); + + s = blob_fetcher_->FetchBlob(ikey_.user_key, blob_index, + prefetch_buffer, &blob_value_, + &bytes_read); if (!s.ok()) { status_ = s; valid_ = false; @@ -831,15 +839,15 @@ void CompactionIterator::NextFromInput() { } pinned_iters_mgr_.StartPinning(); - Version* version = compaction_ ? compaction_->input_version() : nullptr; // We know the merge type entry is not hidden, otherwise we would // have hit (A) // We encapsulate the merge related state machine in a different // object to minimize change to the existing flow. 
- Status s = merge_helper_->MergeUntil(&input_, range_del_agg_, - prev_snapshot, bottommost_level_, - allow_data_in_errors_, version); + Status s = merge_helper_->MergeUntil( + &input_, range_del_agg_, prev_snapshot, bottommost_level_, + allow_data_in_errors_, blob_fetcher_.get(), prefetch_buffers_.get(), + &iter_stats_); merge_out_iter_.SeekToFirst(); if (!s.ok() && !s.IsMergeInProgress()) { @@ -959,26 +967,23 @@ void CompactionIterator::GarbageCollectBlobIfNeeded() { } } - if (blob_index.IsInlined() || blob_index.HasTTL()) { - status_ = Status::Corruption("Unexpected TTL/inlined blob index"); - valid_ = false; - - return; - } - if (blob_index.file_number() >= blob_garbage_collection_cutoff_file_number_) { return; } - const Version* const version = compaction_->input_version(); - assert(version); + FilePrefetchBuffer* prefetch_buffer = + prefetch_buffers_ ? prefetch_buffers_->GetOrCreatePrefetchBuffer( + blob_index.file_number()) + : nullptr; uint64_t bytes_read = 0; { - const Status s = version->GetBlob(ReadOptions(), user_key(), blob_index, - &blob_value_, &bytes_read); + assert(blob_fetcher_); + + const Status s = blob_fetcher_->FetchBlob( + user_key(), blob_index, prefetch_buffer, &blob_value_, &bytes_read); if (!s.ok()) { status_ = s; @@ -1151,7 +1156,7 @@ uint64_t CompactionIterator::ComputeBlobGarbageCollectionCutoffFileNumber( return 0; } - Version* const version = compaction->input_version(); + const Version* const version = compaction->input_version(); assert(version); const VersionStorageInfo* const storage_info = version->storage_info(); @@ -1167,4 +1172,42 @@ uint64_t CompactionIterator::ComputeBlobGarbageCollectionCutoffFileNumber( : std::numeric_limits::max(); } +std::unique_ptr CompactionIterator::CreateBlobFetcherIfNeeded( + const CompactionProxy* compaction) { + if (!compaction) { + return nullptr; + } + + const Version* const version = compaction->input_version(); + if (!version) { + return nullptr; + } + + return std::unique_ptr(new 
BlobFetcher(version, ReadOptions())); +} + +std::unique_ptr +CompactionIterator::CreatePrefetchBufferCollectionIfNeeded( + const CompactionProxy* compaction) { + if (!compaction) { + return nullptr; + } + + if (!compaction->input_version()) { + return nullptr; + } + + if (compaction->allow_mmap_reads()) { + return nullptr; + } + + const uint64_t readahead_size = compaction->blob_compaction_readahead_size(); + if (!readahead_size) { + return nullptr; + } + + return std::unique_ptr( + new PrefetchBufferCollection(readahead_size)); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index c3785a893..ed74ba061 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -23,6 +23,8 @@ namespace ROCKSDB_NAMESPACE { class BlobFileBuilder; +class BlobFetcher; +class PrefetchBufferCollection; // A wrapper of internal iterator whose purpose is to count how // many entries there are in the iterator. 
@@ -92,11 +94,15 @@ class CompactionIterator { virtual bool preserve_deletes() const = 0; + virtual bool allow_mmap_reads() const = 0; + virtual bool enable_blob_garbage_collection() const = 0; virtual double blob_garbage_collection_age_cutoff() const = 0; - virtual Version* input_version() const = 0; + virtual uint64_t blob_compaction_readahead_size() const = 0; + + virtual const Version* input_version() const = 0; virtual bool DoesInputReferenceBlobFiles() const = 0; @@ -137,6 +143,10 @@ class CompactionIterator { return compaction_->immutable_options()->preserve_deletes; } + bool allow_mmap_reads() const override { + return compaction_->immutable_options()->allow_mmap_reads; + } + bool enable_blob_garbage_collection() const override { return compaction_->mutable_cf_options()->enable_blob_garbage_collection; } @@ -146,7 +156,11 @@ class CompactionIterator { ->blob_garbage_collection_age_cutoff; } - Version* input_version() const override { + uint64_t blob_compaction_readahead_size() const override { + return compaction_->mutable_cf_options()->blob_compaction_readahead_size; + } + + const Version* input_version() const override { return compaction_->input_version(); } @@ -291,6 +305,10 @@ class CompactionIterator { static uint64_t ComputeBlobGarbageCollectionCutoffFileNumber( const CompactionProxy* compaction); + static std::unique_ptr CreateBlobFetcherIfNeeded( + const CompactionProxy* compaction); + static std::unique_ptr + CreatePrefetchBufferCollectionIfNeeded(const CompactionProxy* compaction); SequenceIterWrapper input_; const Comparator* cmp_; @@ -379,6 +397,9 @@ class CompactionIterator { uint64_t blob_garbage_collection_cutoff_file_number_; + std::unique_ptr blob_fetcher_; + std::unique_ptr prefetch_buffers_; + std::string blob_index_; PinnableSlice blob_value_; std::string compaction_filter_value_; diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc index d6cc899f5..7dd50bf0e 100644 --- 
a/db/compaction/compaction_iterator_test.cc +++ b/db/compaction/compaction_iterator_test.cc @@ -168,11 +168,15 @@ class FakeCompaction : public CompactionIterator::CompactionProxy { bool preserve_deletes() const override { return false; } + bool allow_mmap_reads() const override { return false; } + bool enable_blob_garbage_collection() const override { return false; } double blob_garbage_collection_age_cutoff() const override { return 0.0; } - Version* input_version() const override { return nullptr; } + uint64_t blob_compaction_readahead_size() const override { return 0; } + + const Version* input_version() const override { return nullptr; } bool DoesInputReferenceBlobFiles() const override { return false; } diff --git a/db/db_iter.cc b/db/db_iter.cc index 75a196e4d..fc8ddfc04 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -192,10 +192,11 @@ bool DBIter::SetBlobValueIfNeeded(const Slice& user_key, read_options.read_tier = read_tier_; read_options.verify_checksums = verify_checksums_; + constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; constexpr uint64_t* bytes_read = nullptr; const Status s = version_->GetBlob(read_options, user_key, blob_index, - &blob_value_, bytes_read); + prefetch_buffer, &blob_value_, bytes_read); if (!s.ok()) { status_ = s; diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 388f4f1f2..217876849 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -8,6 +8,9 @@ #include #include "db/blob/blob_fetcher.h" +#include "db/blob/blob_index.h" +#include "db/blob/prefetch_buffer_collection.h" +#include "db/compaction/compaction_iteration_stats.h" #include "db/dbformat.h" #include "monitoring/perf_context_imp.h" #include "monitoring/statistics.h" @@ -121,7 +124,9 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, const SequenceNumber stop_before, const bool at_bottom, const bool allow_data_in_errors, - Version* version) { + const BlobFetcher* blob_fetcher, + PrefetchBufferCollection* prefetch_buffers, + 
CompactionIterationStats* c_iter_stats) { // Get a copy of the internal key, before it's invalidated by iter->Next() // Also maintain the list of merge operands seen. assert(HasOperator()); @@ -212,13 +217,35 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, !range_del_agg->ShouldDelete( ikey, RangeDelPositioningMode::kForwardTraversal))) { if (ikey.type == kTypeBlobIndex) { - assert(version); - BlobFetcher blob_fetcher(version, ReadOptions()); - s = blob_fetcher.FetchBlob(ikey.user_key, val, &blob_value); + BlobIndex blob_index; + + s = blob_index.DecodeFrom(val); if (!s.ok()) { return s; } + + FilePrefetchBuffer* prefetch_buffer = + prefetch_buffers ? prefetch_buffers->GetOrCreatePrefetchBuffer( + blob_index.file_number()) + : nullptr; + + uint64_t bytes_read = 0; + + assert(blob_fetcher); + + s = blob_fetcher->FetchBlob(ikey.user_key, blob_index, + prefetch_buffer, &blob_value, + &bytes_read); + if (!s.ok()) { + return s; + } + val_ptr = &blob_value; + + if (c_iter_stats) { + ++c_iter_stats->num_blobs_read; + c_iter_stats->total_blob_bytes_read += bytes_read; + } } else { val_ptr = &val; } diff --git a/db/merge_helper.h b/db/merge_helper.h index 392934bfb..ae4262806 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -26,7 +26,9 @@ class Logger; class MergeOperator; class Statistics; class SystemClock; -class Version; +class BlobFetcher; +class PrefetchBufferCollection; +struct CompactionIterationStats; class MergeHelper { public: @@ -70,6 +72,10 @@ class MergeHelper { // we could reach the start of the history of this user key. // allow_data_in_errors: (IN) if true, data details will be displayed in // error/log messages. + // blob_fetcher: (IN) blob fetcher object for the compaction's input version. + // prefetch_buffers: (IN/OUT) a collection of blob file prefetch buffers + // used for compaction readahead. + // c_iter_stats: (OUT) compaction iteration statistics. 
// // Returns one of the following statuses: // - OK: Entries were successfully merged. @@ -82,11 +88,12 @@ class MergeHelper { // // REQUIRED: The first key in the input is not corrupted. Status MergeUntil(InternalIterator* iter, - CompactionRangeDelAggregator* range_del_agg = nullptr, - const SequenceNumber stop_before = 0, - const bool at_bottom = false, - const bool allow_data_in_errors = false, - Version* version = nullptr); + CompactionRangeDelAggregator* range_del_agg, + const SequenceNumber stop_before, const bool at_bottom, + const bool allow_data_in_errors, + const BlobFetcher* blob_fetcher, + PrefetchBufferCollection* prefetch_buffers, + CompactionIterationStats* c_iter_stats); // Filters a merge operand using the compaction filter specified // in the constructor. Returns the decision that the filter made. diff --git a/db/merge_helper_test.cc b/db/merge_helper_test.cc index 597eb5931..f458a1af7 100644 --- a/db/merge_helper_test.cc +++ b/db/merge_helper_test.cc @@ -32,8 +32,10 @@ class MergeHelperTest : public testing::Test { merge_helper_.reset(new MergeHelper(env_, icmp_.user_comparator(), merge_op_.get(), filter_.get(), nullptr, false, latest_snapshot)); - return merge_helper_->MergeUntil(iter_.get(), nullptr /* range_del_agg */, - stop_before, at_bottom); + return merge_helper_->MergeUntil( + iter_.get(), nullptr /* range_del_agg */, stop_before, at_bottom, + false /* allow_data_in_errors */, nullptr /* blob_fetcher */, + nullptr /* prefetch_buffers */, nullptr /* c_iter_stats */); } void AddKeyVal(const std::string& user_key, const SequenceNumber& seq, diff --git a/db/version_set.cc b/db/version_set.cc index 557698e9f..0da8c8751 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1790,12 +1790,9 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, io_tracer_(io_tracer) {} Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key, - const Slice& blob_index_slice, PinnableSlice* value, - uint64_t* 
bytes_read) const { - if (read_options.read_tier == kBlockCacheTier) { - return Status::Incomplete("Cannot read blob: no disk I/O allowed"); - } - + const Slice& blob_index_slice, + FilePrefetchBuffer* prefetch_buffer, + PinnableSlice* value, uint64_t* bytes_read) const { BlobIndex blob_index; { @@ -1805,14 +1802,20 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key, } } - return GetBlob(read_options, user_key, blob_index, value, bytes_read); + return GetBlob(read_options, user_key, blob_index, prefetch_buffer, value, + bytes_read); } Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key, - const BlobIndex& blob_index, PinnableSlice* value, - uint64_t* bytes_read) const { + const BlobIndex& blob_index, + FilePrefetchBuffer* prefetch_buffer, + PinnableSlice* value, uint64_t* bytes_read) const { assert(value); + if (read_options.read_tier == kBlockCacheTier) { + return Status::Incomplete("Cannot read blob: no disk I/O allowed"); + } + if (blob_index.HasTTL() || blob_index.IsInlined()) { return Status::Corruption("Unexpected TTL/inlined blob index"); } @@ -1840,7 +1843,7 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key, assert(blob_file_reader.GetValue()); const Status s = blob_file_reader.GetValue()->GetBlob( read_options, user_key, blob_index.offset(), blob_index.size(), - blob_index.compression(), value, bytes_read); + blob_index.compression(), prefetch_buffer, value, bytes_read); return s; } @@ -2067,10 +2070,11 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, if (is_blob_index) { if (do_merge && value) { + constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; constexpr uint64_t* bytes_read = nullptr; - *status = - GetBlob(read_options, user_key, *value, value, bytes_read); + *status = GetBlob(read_options, user_key, *value, prefetch_buffer, + value, bytes_read); if (!status->ok()) { if (status->IsIncomplete()) { get_context.MarkKeyMayExist(); diff 
--git a/db/version_set.h b/db/version_set.h index 63874b2f9..45efd6f37 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -723,13 +723,15 @@ class Version { // saves it in *value. // REQUIRES: blob_index_slice stores an encoded blob reference Status GetBlob(const ReadOptions& read_options, const Slice& user_key, - const Slice& blob_index_slice, PinnableSlice* value, + const Slice& blob_index_slice, + FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value, uint64_t* bytes_read) const; // Retrieves a blob using a blob reference and saves it in *value, // assuming the corresponding blob file is part of this Version. Status GetBlob(const ReadOptions& read_options, const Slice& user_key, - const BlobIndex& blob_index, PinnableSlice* value, + const BlobIndex& blob_index, + FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value, uint64_t* bytes_read) const; using BlobReadRequest = @@ -804,6 +806,7 @@ class Version { int TEST_refs() const { return refs_; } VersionStorageInfo* storage_info() { return &storage_info_; } + const VersionStorageInfo* storage_info() const { return &storage_info_; } VersionSet* version_set() { return vset_; } diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 1c415785f..387aa0204 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -253,6 +253,7 @@ DECLARE_string(blob_compression_type); DECLARE_bool(enable_blob_garbage_collection); DECLARE_double(blob_garbage_collection_age_cutoff); DECLARE_double(blob_garbage_collection_force_threshold); +DECLARE_uint64(blob_compaction_readahead_size); DECLARE_int32(approximate_size_one_in); DECLARE_bool(sync_fault_injection); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index a9bc883f5..d0db73eb3 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -405,6 +405,11 @@ DEFINE_double(blob_garbage_collection_force_threshold, "[Integrated BlobDB] The 
threshold for the ratio of garbage in " "the oldest blob files for forcing garbage collection."); +DEFINE_uint64(blob_compaction_readahead_size, + ROCKSDB_NAMESPACE::AdvancedColumnFamilyOptions() + .blob_compaction_readahead_size, + "[Integrated BlobDB] Compaction readahead for blob files."); + static const bool FLAGS_subcompactions_dummy __attribute__((__unused__)) = RegisterFlagValidator(&FLAGS_subcompactions, &ValidateUint32Range); diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index d16fcd326..0dbc7c3e1 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -269,6 +269,8 @@ bool StressTest::BuildOptionsTable() { std::vector{"0.0", "0.25", "0.5", "0.75", "1.0"}); options_tbl.emplace("blob_garbage_collection_force_threshold", std::vector{"0.5", "0.75", "1.0"}); + options_tbl.emplace("blob_compaction_readahead_size", + std::vector{"0", "1M", "4M"}); } options_table_ = std::move(options_tbl); @@ -2323,6 +2325,8 @@ void StressTest::Open() { FLAGS_blob_garbage_collection_age_cutoff; options_.blob_garbage_collection_force_threshold = FLAGS_blob_garbage_collection_force_threshold; + options_.blob_compaction_readahead_size = + FLAGS_blob_compaction_readahead_size; } else { #ifdef ROCKSDB_LITE fprintf(stderr, "--options_file not supported in lite mode\n"); @@ -2422,21 +2426,18 @@ void StressTest::Open() { exit(1); } - if (options_.enable_blob_files) { - fprintf(stdout, - "Integrated BlobDB: blob files enabled, min blob size %" PRIu64 - ", blob file size %" PRIu64 ", blob compression type %s\n", - options_.min_blob_size, options_.blob_file_size, - CompressionTypeToString(options_.blob_compression_type).c_str()); - } - - if (options_.enable_blob_garbage_collection) { - fprintf( - stdout, - "Integrated BlobDB: blob GC enabled, cutoff %f, force threshold %f\n", - options_.blob_garbage_collection_age_cutoff, - options_.blob_garbage_collection_force_threshold); - } + fprintf(stdout, + 
"Integrated BlobDB: blob files enabled %d, min blob size %" PRIu64 + ", blob file size %" PRIu64 + ", blob compression type %s, blob GC enabled %d, cutoff %f, force " + "threshold %f, blob compaction readahead size %" PRIu64 "\n", + options_.enable_blob_files, options_.min_blob_size, + options_.blob_file_size, + CompressionTypeToString(options_.blob_compression_type).c_str(), + options_.enable_blob_garbage_collection, + options_.blob_garbage_collection_age_cutoff, + options_.blob_garbage_collection_force_threshold, + options_.blob_compaction_readahead_size); fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str()); diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc index a04d66a2a..a3d6e0727 100644 --- a/file/file_prefetch_buffer.cc +++ b/file/file_prefetch_buffer.cc @@ -108,6 +108,7 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts, } bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts, + RandomAccessFileReader* reader, uint64_t offset, size_t n, Slice* result, Status* status, bool for_compaction) { @@ -124,11 +125,11 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts, // If readahead is not enabled: return false. 
if (offset + n > buffer_offset_ + buffer_.CurrentSize()) { if (readahead_size_ > 0) { - assert(file_reader_ != nullptr); + assert(reader != nullptr); assert(max_readahead_size_ >= readahead_size_); Status s; if (for_compaction) { - s = Prefetch(opts, file_reader_, offset, std::max(n, readahead_size_), + s = Prefetch(opts, reader, offset, std::max(n, readahead_size_), for_compaction); } else { if (implicit_auto_readahead_) { @@ -149,8 +150,7 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts, return false; } } - s = Prefetch(opts, file_reader_, offset, n + readahead_size_, - for_compaction); + s = Prefetch(opts, reader, offset, n + readahead_size_, for_compaction); } if (!s.ok()) { if (status) { diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h index e741a2cba..e91ee41ce 100644 --- a/file/file_prefetch_buffer.h +++ b/file/file_prefetch_buffer.h @@ -13,7 +13,6 @@ #include #include -#include "file/random_access_file_reader.h" #include "file/readahead_file_info.h" #include "port/port.h" #include "rocksdb/env.h" @@ -24,6 +23,9 @@ namespace ROCKSDB_NAMESPACE { #define DEAFULT_DECREMENT 8 * 1024 +struct IOOptions; +class RandomAccessFileReader; + // FilePrefetchBuffer is a smart buffer to store and read data from a file. class FilePrefetchBuffer { public: @@ -31,7 +33,6 @@ class FilePrefetchBuffer { // Constructor. // // All arguments are optional. - // file_reader : the file reader to use. Can be a nullptr. // readahead_size : the initial readahead size. // max_readahead_size : the maximum readahead size. // If max_readahead_size > readahead_size, the readahead size will be @@ -46,18 +47,14 @@ class FilePrefetchBuffer { // implicit_auto_readahead : Readahead is enabled implicitly by rocksdb after // doing sequential scans for two times. // - // Automatic readhead is enabled for a file if file_reader, readahead_size, + // Automatic readhead is enabled for a file if readahead_size // and max_readahead_size are passed in. 
- // If file_reader is a nullptr, setting readahead_size and max_readahead_size - // does not make any sense. So it does nothing. // A user can construct a FilePrefetchBuffer without any arguments, but use // `Prefetch` to load data into the buffer. - FilePrefetchBuffer(RandomAccessFileReader* file_reader = nullptr, - size_t readahead_size = 0, size_t max_readahead_size = 0, + FilePrefetchBuffer(size_t readahead_size = 0, size_t max_readahead_size = 0, bool enable = true, bool track_min_offset = false, bool implicit_auto_readahead = false) : buffer_offset_(0), - file_reader_(file_reader), readahead_size_(readahead_size), max_readahead_size_(max_readahead_size), initial_readahead_size_(readahead_size), @@ -77,18 +74,22 @@ class FilePrefetchBuffer { Status Prefetch(const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, size_t n, bool for_compaction = false); - // Tries returning the data for a file raed from this buffer, if that data is + // Tries returning the data for a file read from this buffer if that data is // in the buffer. // It handles tracking the minimum read offset if track_min_offset = true. // It also does the exponential readahead when readahead_size is set as part // of the constructor. // - // offset : the file offset. - // n : the number of bytes. - // result : output buffer to put the data into. - // for_compaction : if cache read is done for compaction read. - bool TryReadFromCache(const IOOptions& opts, uint64_t offset, size_t n, - Slice* result, Status* s, bool for_compaction = false); + // opts : the IO options to use. + // reader : the file reader. + // offset : the file offset. + // n : the number of bytes. + // result : output buffer to put the data into. + // s : output status. + // for_compaction : true if cache read is done for compaction read. 
+ bool TryReadFromCache(const IOOptions& opts, RandomAccessFileReader* reader, + uint64_t offset, size_t n, Slice* result, Status* s, + bool for_compaction = false); // The minimum `offset` ever passed to TryReadFromCache(). This will only be // tracked if track_min_offset = true. @@ -145,7 +146,6 @@ private: AlignedBuffer buffer_; uint64_t buffer_offset_; - RandomAccessFileReader* file_reader_; size_t readahead_size_; // FilePrefetchBuffer object won't be created from Iterator flow if // max_readahead_size_ = 0. diff --git a/file/file_util.cc b/file/file_util.cc index 547cf80ed..87e343a53 100644 --- a/file/file_util.cc +++ b/file/file_util.cc @@ -183,9 +183,9 @@ IOStatus GenerateOneFileChecksum( ? verify_checksums_readahead_size : default_max_read_ahead_size; - FilePrefetchBuffer prefetch_buffer( - reader.get(), readahead_size /* readahead_size */, - readahead_size /* max_readahead_size */, !allow_mmap_reads /* enable */); + FilePrefetchBuffer prefetch_buffer(readahead_size /* readahead_size */, + readahead_size /* max_readahead_size */, + !allow_mmap_reads /* enable */); Slice slice; uint64_t offset = 0; @@ -193,8 +193,9 @@ IOStatus GenerateOneFileChecksum( while (size > 0) { size_t bytes_to_read = static_cast<size_t>(std::min(uint64_t{readahead_size}, size)); - if (!prefetch_buffer.TryReadFromCache(opts, offset, bytes_to_read, &slice, - nullptr, false)) { + if (!prefetch_buffer.TryReadFromCache( + opts, reader.get(), offset, bytes_to_read, &slice, + nullptr /* status */, false /* for_compaction */)) { return IOStatus::Corruption("file read failed"); } if (slice.size() == 0) { diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index 75236472c..1ee09bb4b 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -819,8 +819,9 @@ struct AdvancedColumnFamilyOptions { // amplification for large-value use cases at the cost of introducing a level // of indirection for reads. 
See also the options min_blob_size, // blob_file_size, blob_compression_type, enable_blob_garbage_collection, - // blob_garbage_collection_age_cutoff, and - // blob_garbage_collection_force_threshold below. + // blob_garbage_collection_age_cutoff, + // blob_garbage_collection_force_threshold, and blob_compaction_readahead_size + // below. // // Default: false // @@ -893,6 +894,13 @@ struct AdvancedColumnFamilyOptions { // Dynamically changeable through the SetOptions() API double blob_garbage_collection_force_threshold = 1.0; + // Compaction readahead for blob files. + // + // Default: 0 + // + // Dynamically changeable through the SetOptions() API + uint64_t blob_compaction_readahead_size = 0; + // Create ColumnFamilyOptions with default values for all fields AdvancedColumnFamilyOptions(); // Create ColumnFamilyOptions from Options diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 730bef9f2..37107e250 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -1122,6 +1122,12 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_blob_gc_force_threshold( extern ROCKSDB_LIBRARY_API double rocksdb_options_get_blob_gc_force_threshold( rocksdb_options_t* opt); +extern ROCKSDB_LIBRARY_API void +rocksdb_options_set_blob_compaction_readahead_size(rocksdb_options_t* opt, + uint64_t val); +extern ROCKSDB_LIBRARY_API uint64_t +rocksdb_options_get_blob_compaction_readahead_size(rocksdb_options_t* opt); + /* returns a pointer to a malloc()-ed, null terminated string */ extern ROCKSDB_LIBRARY_API char* rocksdb_options_statistics_get_string( rocksdb_options_t* opt); diff --git a/options/cf_options.cc b/options/cf_options.cc index 57c965cc3..aef949d11 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -434,6 +434,10 @@ static std::unordered_map blob_garbage_collection_force_threshold), OptionType::kDouble, OptionVerificationType::kNormal, OptionTypeFlags::kMutable}}, + {"blob_compaction_readahead_size", + {offsetof(struct MutableCFOptions, 
blob_compaction_readahead_size), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kMutable}}, {"sample_for_compression", {offsetof(struct MutableCFOptions, sample_for_compression), OptionType::kUInt64T, OptionVerificationType::kNormal, @@ -1055,6 +1059,8 @@ void MutableCFOptions::Dump(Logger* log) const { blob_garbage_collection_age_cutoff); ROCKS_LOG_INFO(log, " blob_garbage_collection_force_threshold: %f", blob_garbage_collection_force_threshold); + ROCKS_LOG_INFO(log, " blob_compaction_readahead_size: %" PRIu64, + blob_compaction_readahead_size); } MutableCFOptions::MutableCFOptions(const Options& options) diff --git a/options/cf_options.h b/options/cf_options.h index d08096da1..ca2b59fca 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -143,6 +143,7 @@ struct MutableCFOptions { options.blob_garbage_collection_age_cutoff), blob_garbage_collection_force_threshold( options.blob_garbage_collection_force_threshold), + blob_compaction_readahead_size(options.blob_compaction_readahead_size), max_sequential_skip_in_iterations( options.max_sequential_skip_in_iterations), check_flush_compaction_key_order( @@ -190,6 +191,7 @@ struct MutableCFOptions { enable_blob_garbage_collection(false), blob_garbage_collection_age_cutoff(0.0), blob_garbage_collection_force_threshold(0.0), + blob_compaction_readahead_size(0), max_sequential_skip_in_iterations(0), check_flush_compaction_key_order(true), paranoid_file_checks(false), @@ -255,6 +257,7 @@ struct MutableCFOptions { bool enable_blob_garbage_collection; double blob_garbage_collection_age_cutoff; double blob_garbage_collection_force_threshold; + uint64_t blob_compaction_readahead_size; // Misc options uint64_t max_sequential_skip_in_iterations; diff --git a/options/options.cc b/options/options.cc index aa16663f9..969bc31a8 100644 --- a/options/options.cc +++ b/options/options.cc @@ -99,7 +99,8 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options) 
blob_garbage_collection_age_cutoff( options.blob_garbage_collection_age_cutoff), blob_garbage_collection_force_threshold( - options.blob_garbage_collection_force_threshold) { + options.blob_garbage_collection_force_threshold), + blob_compaction_readahead_size(options.blob_compaction_readahead_size) { assert(memtable_factory.get() != nullptr); if (max_bytes_for_level_multiplier_additional.size() < static_cast(num_levels)) { @@ -405,6 +406,9 @@ void ColumnFamilyOptions::Dump(Logger* log) const { blob_garbage_collection_age_cutoff); ROCKS_LOG_HEADER(log, "Options.blob_garbage_collection_force_threshold: %f", blob_garbage_collection_force_threshold); + ROCKS_LOG_HEADER( + log, " Options.blob_compaction_readahead_size: %" PRIu64, + blob_compaction_readahead_size); } // ColumnFamilyOptions::Dump void Options::Dump(Logger* log) const { diff --git a/options/options_helper.cc b/options/options_helper.cc index be70463f7..2df838de9 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -254,6 +254,8 @@ void UpdateColumnFamilyOptions(const MutableCFOptions& moptions, moptions.blob_garbage_collection_age_cutoff; cf_opts->blob_garbage_collection_force_threshold = moptions.blob_garbage_collection_force_threshold; + cf_opts->blob_compaction_readahead_size = + moptions.blob_compaction_readahead_size; // Misc options cf_opts->max_sequential_skip_in_iterations = diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index a95936967..b0ba1a540 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -518,6 +518,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { "enable_blob_garbage_collection=true;" "blob_garbage_collection_age_cutoff=0.5;" "blob_garbage_collection_force_threshold=0.75;" + "blob_compaction_readahead_size=262144;" "compaction_options_fifo={max_table_files_size=3;allow_" "compaction=false;age_for_warm=1;};", new_options)); diff --git a/options/options_test.cc 
b/options/options_test.cc index 3514420a3..0392320a8 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -109,6 +109,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"enable_blob_garbage_collection", "true"}, {"blob_garbage_collection_age_cutoff", "0.5"}, {"blob_garbage_collection_force_threshold", "0.75"}, + {"blob_compaction_readahead_size", "256K"}, }; std::unordered_map db_options_map = { @@ -241,6 +242,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_cf_opt.enable_blob_garbage_collection, true); ASSERT_EQ(new_cf_opt.blob_garbage_collection_age_cutoff, 0.5); ASSERT_EQ(new_cf_opt.blob_garbage_collection_force_threshold, 0.75); + ASSERT_EQ(new_cf_opt.blob_compaction_readahead_size, 262144); cf_options_map["write_buffer_size"] = "hello"; ASSERT_NOK(GetColumnFamilyOptionsFromMap(exact, base_cf_opt, cf_options_map, @@ -2269,6 +2271,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { {"enable_blob_garbage_collection", "true"}, {"blob_garbage_collection_age_cutoff", "0.5"}, {"blob_garbage_collection_force_threshold", "0.75"}, + {"blob_compaction_readahead_size", "256K"}, }; std::unordered_map db_options_map = { @@ -2393,6 +2396,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { ASSERT_EQ(new_cf_opt.enable_blob_garbage_collection, true); ASSERT_EQ(new_cf_opt.blob_garbage_collection_age_cutoff, 0.5); ASSERT_EQ(new_cf_opt.blob_garbage_collection_force_threshold, 0.75); + ASSERT_EQ(new_cf_opt.blob_compaction_readahead_size, 262144); cf_options_map["write_buffer_size"] = "hello"; ASSERT_NOK(GetColumnFamilyOptionsFromMap( diff --git a/src.mk b/src.mk index 46b1afcd9..12718c66b 100644 --- a/src.mk +++ b/src.mk @@ -18,6 +18,7 @@ LIB_SOURCES = \ db/blob/blob_log_format.cc \ db/blob/blob_log_sequential_reader.cc \ db/blob/blob_log_writer.cc \ + db/blob/prefetch_buffer_collection.cc \ db/builder.cc \ db/c.cc \ db/column_family.cc \ diff --git a/table/block_based/block_based_table_reader.cc 
b/table/block_based/block_based_table_reader.cc index 348fd3009..1012b02a7 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -579,7 +579,8 @@ Status BlockBasedTable::Open( } else { // Should not prefetch for mmap mode. prefetch_buffer.reset(new FilePrefetchBuffer( - nullptr, 0, 0, false /* enable */, true /* track_min_offset */)); + 0 /* readahead_size */, 0 /* max_readahead_size */, false /* enable */, + true /* track_min_offset */)); } // Read in the following order: @@ -732,14 +733,17 @@ Status BlockBasedTable::PrefetchTail( // Try file system prefetch if (!file->use_direct_io() && !force_direct_prefetch) { if (!file->Prefetch(prefetch_off, prefetch_len).IsNotSupported()) { - prefetch_buffer->reset( - new FilePrefetchBuffer(nullptr, 0, 0, false, true)); + prefetch_buffer->reset(new FilePrefetchBuffer( + 0 /* readahead_size */, 0 /* max_readahead_size */, + false /* enable */, true /* track_min_offset */)); return Status::OK(); } } // Use `FilePrefetchBuffer` - prefetch_buffer->reset(new FilePrefetchBuffer(nullptr, 0, 0, true, true)); + prefetch_buffer->reset( + new FilePrefetchBuffer(0 /* readahead_size */, 0 /* max_readahead_size */, + true /* enable */, true /* track_min_offset */)); IOOptions opts; Status s = file->PrepareIOOptions(ro, opts); if (s.ok()) { @@ -2966,7 +2970,7 @@ Status BlockBasedTable::VerifyChecksumInBlocks( // FilePrefetchBuffer doesn't work in mmap mode and readahead is not // needed there. 
FilePrefetchBuffer prefetch_buffer( - rep_->file.get(), readahead_size /* readahead_size */, + readahead_size /* readahead_size */, readahead_size /* max_readahead_size */, !rep_->ioptions.allow_mmap_reads /* enable */); diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index 45c8a7e73..8efcd7e09 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -683,10 +683,10 @@ struct BlockBasedTable::Rep { size_t max_readahead_size, std::unique_ptr* fpb, bool implicit_auto_readahead) const { - fpb->reset(new FilePrefetchBuffer( - file.get(), readahead_size, max_readahead_size, - !ioptions.allow_mmap_reads /* enable */, false /* track_min_offset*/, - implicit_auto_readahead)); + fpb->reset(new FilePrefetchBuffer(readahead_size, max_readahead_size, + !ioptions.allow_mmap_reads /* enable */, + false /* track_min_offset */, + implicit_auto_readahead)); } void CreateFilePrefetchBufferIfNotExists( diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc index 03ca5ce47..5283b1aa5 100644 --- a/table/block_fetcher.cc +++ b/table/block_fetcher.cc @@ -69,9 +69,10 @@ inline bool BlockFetcher::TryGetFromPrefetchBuffer() { if (prefetch_buffer_ != nullptr) { IOOptions opts; IOStatus io_s = file_->PrepareIOOptions(read_options_, opts); - if (io_s.ok() && prefetch_buffer_->TryReadFromCache( - opts, handle_.offset(), block_size_with_trailer_, - &slice_, &io_s, for_compaction_)) { + if (io_s.ok() && + prefetch_buffer_->TryReadFromCache(opts, file_, handle_.offset(), + block_size_with_trailer_, &slice_, + &io_s, for_compaction_)) { ProcessTrailerIfPresent(); if (!io_status_.ok()) { return true; diff --git a/table/format.cc b/table/format.cc index c68fbb132..10dbd3f14 100644 --- a/table/format.cc +++ b/table/format.cc @@ -330,7 +330,7 @@ Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file, // for iterator, TryReadFromCache might do a readahead. 
Revisit to see if we // need to pass a timeout at that point if (prefetch_buffer == nullptr || - !prefetch_buffer->TryReadFromCache(IOOptions(), read_offset, + !prefetch_buffer->TryReadFromCache(IOOptions(), file, read_offset, Footer::kMaxEncodedLength, &footer_input, nullptr)) { if (file->use_direct_io()) { diff --git a/table/get_context.cc b/table/get_context.cc index 80906f795..c61f82b1a 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -382,7 +382,11 @@ void GetContext::Merge(const Slice* value) { bool GetContext::GetBlobValue(const Slice& blob_index, PinnableSlice* blob_value) { - Status status = blob_fetcher_->FetchBlob(user_key_, blob_index, blob_value); + constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; + constexpr uint64_t* bytes_read = nullptr; + + Status status = blob_fetcher_->FetchBlob( + user_key_, blob_index, prefetch_buffer, blob_value, bytes_read); if (!status.ok()) { if (status.IsIncomplete()) { MarkKeyMayExist(); diff --git a/table/sst_file_dumper.cc b/table/sst_file_dumper.cc index ff9c42f6c..8a2be95a6 100644 --- a/table/sst_file_dumper.cc +++ b/table/sst_file_dumper.cc @@ -96,8 +96,9 @@ Status SstFileDumper::GetTableReader(const std::string& file_path) { file_.reset(new RandomAccessFileReader(std::move(file), file_path)); - FilePrefetchBuffer prefetch_buffer(nullptr, 0, 0, true /* enable */, - false /* track_min_offset */); + FilePrefetchBuffer prefetch_buffer( + 0 /* readahead_size */, 0 /* max_readahead_size */, true /* enable */, + false /* track_min_offset */); if (s.ok()) { const uint64_t kSstDumpTailPrefetchSize = 512 * 1024; uint64_t prefetch_size = (file_size > kSstDumpTailPrefetchSize) diff --git a/table/table_test.cc b/table/table_test.cc index b845b976f..c0b254137 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -4937,11 +4937,18 @@ TEST_F(BBTTailPrefetchTest, TestTailPrefetchStats) { TEST_F(BBTTailPrefetchTest, FilePrefetchBufferMinOffset) { TailPrefetchStats tpstats; - FilePrefetchBuffer 
buffer(nullptr, 0, 0, false, true); + FilePrefetchBuffer buffer(0 /* readahead_size */, 0 /* max_readahead_size */, + false /* enable */, true /* track_min_offset */); IOOptions opts; - buffer.TryReadFromCache(opts, 500, 10, nullptr, nullptr); - buffer.TryReadFromCache(opts, 480, 10, nullptr, nullptr); - buffer.TryReadFromCache(opts, 490, 10, nullptr, nullptr); + buffer.TryReadFromCache(opts, nullptr /* reader */, 500 /* offset */, + 10 /* n */, nullptr /* result */, + nullptr /* status */); + buffer.TryReadFromCache(opts, nullptr /* reader */, 480 /* offset */, + 10 /* n */, nullptr /* result */, + nullptr /* status */); + buffer.TryReadFromCache(opts, nullptr /* reader */, 490 /* offset */, + 10 /* n */, nullptr /* result */, + nullptr /* status */); ASSERT_EQ(480, buffer.min_offset_read()); } diff --git a/test_util/testutil.cc b/test_util/testutil.cc index 986b8d184..78d09ee0f 100644 --- a/test_util/testutil.cc +++ b/test_util/testutil.cc @@ -446,6 +446,7 @@ void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, DBOptions& db_options, uint_max + rnd->Uniform(10000); cf_opt->min_blob_size = uint_max + rnd->Uniform(10000); cf_opt->blob_file_size = uint_max + rnd->Uniform(10000); + cf_opt->blob_compaction_readahead_size = uint_max + rnd->Uniform(10000); // unsigned int options cf_opt->rate_limit_delay_max_milliseconds = rnd->Uniform(10000); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 8e6795e94..16019dd29 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1002,6 +1002,11 @@ DEFINE_double(blob_garbage_collection_force_threshold, "[Integrated BlobDB] The threshold for the ratio of garbage in " "the oldest blob files for forcing garbage collection."); +DEFINE_uint64(blob_compaction_readahead_size, + ROCKSDB_NAMESPACE::AdvancedColumnFamilyOptions() + .blob_compaction_readahead_size, + "[Integrated BlobDB] Compaction readahead for blob files."); + #ifndef ROCKSDB_LITE // Secondary DB instance Options @@ -4365,6 +4370,8 @@ class 
Benchmark { FLAGS_blob_garbage_collection_age_cutoff; options.blob_garbage_collection_force_threshold = FLAGS_blob_garbage_collection_force_threshold; + options.blob_compaction_readahead_size = + FLAGS_blob_compaction_readahead_size; #ifndef ROCKSDB_LITE if (FLAGS_readonly && FLAGS_transaction_db) { diff --git a/tools/db_bench_tool_test.cc b/tools/db_bench_tool_test.cc index bad4ae5c0..4b4284934 100644 --- a/tools/db_bench_tool_test.cc +++ b/tools/db_bench_tool_test.cc @@ -278,6 +278,7 @@ const std::string options_file_content = R"OPTIONS_FILE( enable_blob_garbage_collection=true blob_garbage_collection_age_cutoff=0.5 blob_garbage_collection_force_threshold=0.75 + blob_compaction_readahead_size=262144 [TableOptions/BlockBasedTable "default"] format_version=0 diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 71f44923b..4498fac76 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -294,6 +294,7 @@ blob_params = { "enable_blob_garbage_collection": lambda: random.choice([0] + [1] * 3), "blob_garbage_collection_age_cutoff": lambda: random.choice([0.0, 0.25, 0.5, 0.75, 1.0]), "blob_garbage_collection_force_threshold": lambda: random.choice([0.5, 0.75, 1.0]), + "blob_compaction_readahead_size": lambda: random.choice([0, 1048576, 4194304]), } ts_params = {