Support readahead during compaction for blob files (#9187)

Summary:
The patch adds a new BlobDB configuration option `blob_compaction_readahead_size`
that can be used to enable prefetching data from blob files during compaction.
This is important when using storage with higher latencies like HDDs or remote filesystems.
If enabled, prefetching is used for all cases when blobs are read during compaction,
namely garbage collection, compaction filters (when the existing value has to be read from
a blob file), and `Merge` (when the value of the base `Put` is stored in a blob file).

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9187

Test Plan: Ran `make check` and the stress/crash test.

Reviewed By: riversand963

Differential Revision: D32565512

Pulled By: ltamasi

fbshipit-source-id: 87be9cebc3aa01cc227bec6b5f64d827b8164f5d
main
Levi Tamasi 3 years ago committed by Facebook GitHub Bot
parent cd4ea675e3
commit dc5de45af8
  1. 1
      CMakeLists.txt
  2. 2
      TARGETS
  3. 28
      db/blob/blob_fetcher.cc
  4. 21
      db/blob/blob_fetcher.h
  5. 24
      db/blob/blob_file_reader.cc
  6. 4
      db/blob/blob_file_reader.h
  7. 57
      db/blob/blob_file_reader_test.cc
  8. 118
      db/blob/db_blob_compaction_test.cc
  9. 21
      db/blob/prefetch_buffer_collection.cc
  10. 38
      db/blob/prefetch_buffer_collection.h
  11. 10
      db/c.cc
  12. 4
      db/c_test.c
  13. 4
      db/compaction/compaction_iteration_stats.h
  14. 105
      db/compaction/compaction_iterator.cc
  15. 25
      db/compaction/compaction_iterator.h
  16. 6
      db/compaction/compaction_iterator_test.cc
  17. 3
      db/db_iter.cc
  18. 35
      db/merge_helper.cc
  19. 19
      db/merge_helper.h
  20. 6
      db/merge_helper_test.cc
  21. 28
      db/version_set.cc
  22. 7
      db/version_set.h
  23. 1
      db_stress_tool/db_stress_common.h
  24. 5
      db_stress_tool/db_stress_gflags.cc
  25. 31
      db_stress_tool/db_stress_test_base.cc
  26. 8
      file/file_prefetch_buffer.cc
  27. 32
      file/file_prefetch_buffer.h
  28. 11
      file/file_util.cc
  29. 12
      include/rocksdb/advanced_options.h
  30. 6
      include/rocksdb/c.h
  31. 6
      options/cf_options.cc
  32. 3
      options/cf_options.h
  33. 6
      options/options.cc
  34. 2
      options/options_helper.cc
  35. 1
      options/options_settable_test.cc
  36. 4
      options/options_test.cc
  37. 1
      src.mk
  38. 14
      table/block_based/block_based_table_reader.cc
  39. 8
      table/block_based/block_based_table_reader.h
  40. 7
      table/block_fetcher.cc
  41. 2
      table/format.cc
  42. 6
      table/get_context.cc
  43. 5
      table/sst_file_dumper.cc
  44. 15
      table/table_test.cc
  45. 1
      test_util/testutil.cc
  46. 7
      tools/db_bench_tool.cc
  47. 1
      tools/db_bench_tool_test.cc
  48. 1
      tools/db_crashtest.py

@ -652,6 +652,7 @@ set(SOURCES
db/blob/blob_log_format.cc
db/blob/blob_log_sequential_reader.cc
db/blob/blob_log_writer.cc
db/blob/prefetch_buffer_collection.cc
db/builder.cc
db/c.cc
db/column_family.cc

@ -159,6 +159,7 @@ cpp_library(
"db/blob/blob_log_format.cc",
"db/blob/blob_log_sequential_reader.cc",
"db/blob/blob_log_writer.cc",
"db/blob/prefetch_buffer_collection.cc",
"db/builder.cc",
"db/c.cc",
"db/column_family.cc",
@ -487,6 +488,7 @@ cpp_library(
"db/blob/blob_log_format.cc",
"db/blob/blob_log_sequential_reader.cc",
"db/blob/blob_log_writer.cc",
"db/blob/prefetch_buffer_collection.cc",
"db/builder.cc",
"db/c.cc",
"db/column_family.cc",

@ -9,14 +9,26 @@
namespace ROCKSDB_NAMESPACE {
Status BlobFetcher::FetchBlob(const Slice& user_key, const Slice& blob_index,
PinnableSlice* blob_value) {
Status s;
Status BlobFetcher::FetchBlob(const Slice& user_key,
const Slice& blob_index_slice,
FilePrefetchBuffer* prefetch_buffer,
PinnableSlice* blob_value,
uint64_t* bytes_read) const {
assert(version_);
constexpr uint64_t* bytes_read = nullptr;
s = version_->GetBlob(read_options_, user_key, blob_index, blob_value,
bytes_read);
return s;
return version_->GetBlob(read_options_, user_key, blob_index_slice,
prefetch_buffer, blob_value, bytes_read);
}
// Fetches the blob referenced by an already decoded `blob_index`. The request
// is forwarded, together with the read options stored in this fetcher, to
// GetBlob() on the wrapped Version; `prefetch_buffer`, `blob_value` and
// `bytes_read` are passed through untouched and the resulting status is
// handed back to the caller as is.
Status BlobFetcher::FetchBlob(const Slice& user_key,
                              const BlobIndex& blob_index,
                              FilePrefetchBuffer* prefetch_buffer,
                              PinnableSlice* blob_value,
                              uint64_t* bytes_read) const {
  assert(version_);

  Status status =
      version_->GetBlob(read_options_, user_key, blob_index, prefetch_buffer,
                        blob_value, bytes_read);
  return status;
}
} // namespace ROCKSDB_NAMESPACE
} // namespace ROCKSDB_NAMESPACE

@ -9,18 +9,29 @@
#include "rocksdb/status.h"
namespace ROCKSDB_NAMESPACE {
class Version;
class Slice;
class FilePrefetchBuffer;
class PinnableSlice;
class BlobIndex;
// A thin wrapper around the blob retrieval functionality of Version.
class BlobFetcher {
public:
BlobFetcher(Version* version, const ReadOptions& read_options)
BlobFetcher(const Version* version, const ReadOptions& read_options)
: version_(version), read_options_(read_options) {}
Status FetchBlob(const Slice& user_key, const Slice& blob_index,
PinnableSlice* blob_value);
Status FetchBlob(const Slice& user_key, const Slice& blob_index_slice,
FilePrefetchBuffer* prefetch_buffer,
PinnableSlice* blob_value, uint64_t* bytes_read) const;
Status FetchBlob(const Slice& user_key, const BlobIndex& blob_index,
FilePrefetchBuffer* prefetch_buffer,
PinnableSlice* blob_value, uint64_t* bytes_read) const;
private:
Version* version_;
const Version* version_;
ReadOptions read_options_;
};
} // namespace ROCKSDB_NAMESPACE
} // namespace ROCKSDB_NAMESPACE

@ -9,6 +9,7 @@
#include <string>
#include "db/blob/blob_log_format.h"
#include "file/file_prefetch_buffer.h"
#include "file/filename.h"
#include "monitoring/statistics.h"
#include "options/cf_options.h"
@ -282,6 +283,7 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options,
const Slice& user_key, uint64_t offset,
uint64_t value_size,
CompressionType compression_type,
FilePrefetchBuffer* prefetch_buffer,
PinnableSlice* value,
uint64_t* bytes_read) const {
assert(value);
@ -313,7 +315,21 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options,
Buffer buf;
AlignedBuf aligned_buf;
{
bool prefetched = false;
if (prefetch_buffer) {
Status s;
constexpr bool for_compaction = true;
prefetched = prefetch_buffer->TryReadFromCache(
IOOptions(), file_reader_.get(), record_offset,
static_cast<size_t>(record_size), &record_slice, &s, for_compaction);
if (!s.ok()) {
return s;
}
}
if (!prefetched) {
TEST_SYNC_POINT("BlobFileReader::GetBlob:ReadFromFile");
const Status s = ReadFromFile(file_reader_.get(), record_offset,
@ -322,11 +338,11 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options,
if (!s.ok()) {
return s;
}
TEST_SYNC_POINT_CALLBACK("BlobFileReader::GetBlob:TamperWithResult",
&record_slice);
}
TEST_SYNC_POINT_CALLBACK("BlobFileReader::GetBlob:TamperWithResult",
&record_slice);
if (read_options.verify_checksums) {
const Status s = VerifyBlob(record_slice, user_key, value_size);
if (!s.ok()) {

@ -21,6 +21,7 @@ struct FileOptions;
class HistogramImpl;
struct ReadOptions;
class Slice;
class FilePrefetchBuffer;
class PinnableSlice;
class Statistics;
@ -41,7 +42,8 @@ class BlobFileReader {
Status GetBlob(const ReadOptions& read_options, const Slice& user_key,
uint64_t offset, uint64_t value_size,
CompressionType compression_type, PinnableSlice* value,
CompressionType compression_type,
FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value,
uint64_t* bytes_read) const;
// offsets must be sorted in ascending order by caller.

@ -179,13 +179,15 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
ReadOptions read_options;
read_options.verify_checksums = false;
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
{
PinnableSlice value;
uint64_t bytes_read = 0;
ASSERT_OK(reader->GetBlob(read_options, keys[0], blob_offsets[0],
blob_sizes[0], kNoCompression, &value,
&bytes_read));
blob_sizes[0], kNoCompression, prefetch_buffer,
&value, &bytes_read));
ASSERT_EQ(value, blobs[0]);
ASSERT_EQ(bytes_read, blob_sizes[0]);
@ -222,8 +224,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
uint64_t bytes_read = 0;
ASSERT_OK(reader->GetBlob(read_options, keys[1], blob_offsets[1],
blob_sizes[1], kNoCompression, &value,
&bytes_read));
blob_sizes[1], kNoCompression, prefetch_buffer,
&value, &bytes_read));
ASSERT_EQ(value, blobs[1]);
const uint64_t key_size = keys[1].size();
@ -239,8 +241,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
ASSERT_TRUE(reader
->GetBlob(read_options, keys[0], blob_offsets[0] - 1,
blob_sizes[0], kNoCompression, &value,
&bytes_read)
blob_sizes[0], kNoCompression, prefetch_buffer,
&value, &bytes_read)
.IsCorruption());
ASSERT_EQ(bytes_read, 0);
}
@ -252,8 +254,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
ASSERT_TRUE(reader
->GetBlob(read_options, keys[2], blob_offsets[2] + 1,
blob_sizes[2], kNoCompression, &value,
&bytes_read)
blob_sizes[2], kNoCompression, prefetch_buffer,
&value, &bytes_read)
.IsCorruption());
ASSERT_EQ(bytes_read, 0);
}
@ -265,7 +267,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
ASSERT_TRUE(reader
->GetBlob(read_options, keys[0], blob_offsets[0],
blob_sizes[0], kZSTD, &value, &bytes_read)
blob_sizes[0], kZSTD, prefetch_buffer, &value,
&bytes_read)
.IsCorruption());
ASSERT_EQ(bytes_read, 0);
}
@ -280,8 +283,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
->GetBlob(read_options, shorter_key,
blob_offsets[0] -
(keys[0].size() - sizeof(shorter_key) + 1),
blob_sizes[0], kNoCompression, &value,
&bytes_read)
blob_sizes[0], kNoCompression, prefetch_buffer,
&value, &bytes_read)
.IsCorruption());
ASSERT_EQ(bytes_read, 0);
@ -323,8 +326,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
ASSERT_TRUE(reader
->GetBlob(read_options, incorrect_key, blob_offsets[0],
blob_sizes[0], kNoCompression, &value,
&bytes_read)
blob_sizes[0], kNoCompression, prefetch_buffer,
&value, &bytes_read)
.IsCorruption());
ASSERT_EQ(bytes_read, 0);
@ -363,8 +366,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
ASSERT_TRUE(reader
->GetBlob(read_options, keys[1], blob_offsets[1],
blob_sizes[1] + 1, kNoCompression, &value,
&bytes_read)
blob_sizes[1] + 1, kNoCompression,
prefetch_buffer, &value, &bytes_read)
.IsCorruption());
ASSERT_EQ(bytes_read, 0);
@ -642,12 +645,14 @@ TEST_F(BlobFileReaderTest, BlobCRCError) {
SyncPoint::GetInstance()->EnableProcessing();
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
PinnableSlice value;
uint64_t bytes_read = 0;
ASSERT_TRUE(reader
->GetBlob(ReadOptions(), key, blob_offset, blob_size,
kNoCompression, &value, &bytes_read)
kNoCompression, prefetch_buffer, &value,
&bytes_read)
.IsCorruption());
ASSERT_EQ(bytes_read, 0);
@ -695,12 +700,15 @@ TEST_F(BlobFileReaderTest, Compression) {
ReadOptions read_options;
read_options.verify_checksums = false;
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
{
PinnableSlice value;
uint64_t bytes_read = 0;
ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size,
kSnappyCompression, &value, &bytes_read));
kSnappyCompression, prefetch_buffer, &value,
&bytes_read));
ASSERT_EQ(value, blob);
ASSERT_EQ(bytes_read, blob_size);
}
@ -712,7 +720,8 @@ TEST_F(BlobFileReaderTest, Compression) {
uint64_t bytes_read = 0;
ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size,
kSnappyCompression, &value, &bytes_read));
kSnappyCompression, prefetch_buffer, &value,
&bytes_read));
ASSERT_EQ(value, blob);
constexpr uint64_t key_size = sizeof(key) - 1;
@ -770,12 +779,14 @@ TEST_F(BlobFileReaderTest, UncompressionError) {
SyncPoint::GetInstance()->EnableProcessing();
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
PinnableSlice value;
uint64_t bytes_read = 0;
ASSERT_TRUE(reader
->GetBlob(ReadOptions(), key, blob_offset, blob_size,
kSnappyCompression, &value, &bytes_read)
kSnappyCompression, prefetch_buffer, &value,
&bytes_read)
.IsCorruption());
ASSERT_EQ(bytes_read, 0);
@ -854,12 +865,14 @@ TEST_P(BlobFileReaderIOErrorTest, IOError) {
} else {
ASSERT_OK(s);
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
PinnableSlice value;
uint64_t bytes_read = 0;
ASSERT_TRUE(reader
->GetBlob(ReadOptions(), key, blob_offset, blob_size,
kNoCompression, &value, &bytes_read)
kNoCompression, prefetch_buffer, &value,
&bytes_read)
.IsIOError());
ASSERT_EQ(bytes_read, 0);
}
@ -937,12 +950,14 @@ TEST_P(BlobFileReaderDecodingErrorTest, DecodingError) {
} else {
ASSERT_OK(s);
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
PinnableSlice value;
uint64_t bytes_read = 0;
ASSERT_TRUE(reader
->GetBlob(ReadOptions(), key, blob_offset, blob_size,
kNoCompression, &value, &bytes_read)
kNoCompression, prefetch_buffer, &value,
&bytes_read)
.IsCorruption());
ASSERT_EQ(bytes_read, 0);
}

@ -590,6 +590,124 @@ TEST_F(DBBlobCompactionTest, MergeBlobWithBase) {
Close();
}
// Blob garbage collection with compaction readahead enabled: every blob read
// issued while the manual compaction runs is expected to be satisfied without
// hitting the "BlobFileReader::GetBlob:ReadFromFile" sync point, i.e. the
// counter bumped by that sync point must still be zero afterwards.
TEST_F(DBBlobCompactionTest, CompactionReadaheadGarbageCollection) {
  Options options = GetDefaultOptions();
  options.disable_auto_compactions = true;
  options.enable_blob_files = true;
  options.min_blob_size = 0;
  options.blob_compaction_readahead_size = 1 << 10;
  options.enable_blob_garbage_collection = true;
  options.blob_garbage_collection_age_cutoff = 1.0;

  Reopen(options);

  // First generation of values, flushed on its own.
  ASSERT_OK(Put("key", "lime"));
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Flush());

  // Second generation overwriting both keys, flushed separately.
  ASSERT_OK(Put("key", "pie"));
  ASSERT_OK(Put("foo", "baz"));
  ASSERT_OK(Flush());

  // Count every blob read that falls through to the file read sync point.
  size_t direct_file_reads = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "BlobFileReader::GetBlob:ReadFromFile",
      [&direct_file_reads](void* /* arg */) { ++direct_file_reads; });
  SyncPoint::GetInstance()->EnableProcessing();

  // Null bounds: compact the whole key range.
  constexpr Slice* range_begin = nullptr;
  constexpr Slice* range_end = nullptr;
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), range_begin, range_end));

  // Stop counting before the verification reads below.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  ASSERT_EQ(Get("key"), "pie");
  ASSERT_EQ(Get("foo"), "baz");
  ASSERT_EQ(direct_file_reads, 0);

  Close();
}
// Compaction filter with compaction readahead enabled: a ValueMutationFilter
// is installed so that the compaction has to read the existing blob values.
// None of those reads may reach the "BlobFileReader::GetBlob:ReadFromFile"
// sync point, and the values read back afterwards must carry the "pie" suffix.
TEST_F(DBBlobCompactionTest, CompactionReadaheadFilter) {
  Options options = GetDefaultOptions();

  // The options only hold a raw pointer; this guard keeps the filter alive
  // for the duration of the test.
  std::unique_ptr<CompactionFilter> filter_owner(
      new ValueMutationFilter("pie"));

  options.compaction_filter = filter_owner.get();
  options.disable_auto_compactions = true;
  options.enable_blob_files = true;
  options.min_blob_size = 0;
  options.blob_compaction_readahead_size = 1 << 10;

  Reopen(options);

  ASSERT_OK(Put("key", "lime"));
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Flush());

  // Count every blob read that falls through to the file read sync point.
  size_t direct_file_reads = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "BlobFileReader::GetBlob:ReadFromFile",
      [&direct_file_reads](void* /* arg */) { ++direct_file_reads; });
  SyncPoint::GetInstance()->EnableProcessing();

  // Null bounds: compact the whole key range.
  constexpr Slice* range_begin = nullptr;
  constexpr Slice* range_end = nullptr;
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), range_begin, range_end));

  // Stop counting before the verification reads below.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  ASSERT_EQ(Get("key"), "limepie");
  ASSERT_EQ(Get("foo"), "barpie");
  ASSERT_EQ(direct_file_reads, 0);

  Close();
}
// Merge with compaction readahead enabled: base values are written with Put
// and flushed, merge operands are flushed separately, and the manual
// compaction then has to combine them. No blob read during that compaction
// may reach the "BlobFileReader::GetBlob:ReadFromFile" sync point.
TEST_F(DBBlobCompactionTest, CompactionReadaheadMerge) {
  Options options = GetDefaultOptions();
  options.disable_auto_compactions = true;
  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  options.enable_blob_files = true;
  options.min_blob_size = 0;
  options.blob_compaction_readahead_size = 1 << 10;

  Reopen(options);

  // Base values.
  ASSERT_OK(Put("key", "lime"));
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Flush());

  // Merge operands on top of the base values.
  ASSERT_OK(Merge("key", "pie"));
  ASSERT_OK(Merge("foo", "baz"));
  ASSERT_OK(Flush());

  // Count every blob read that falls through to the file read sync point.
  size_t direct_file_reads = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "BlobFileReader::GetBlob:ReadFromFile",
      [&direct_file_reads](void* /* arg */) { ++direct_file_reads; });
  SyncPoint::GetInstance()->EnableProcessing();

  // Null bounds: compact the whole key range.
  constexpr Slice* range_begin = nullptr;
  constexpr Slice* range_end = nullptr;
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), range_begin, range_end));

  // Stop counting before the verification reads below.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  ASSERT_EQ(Get("key"), "lime,pie");
  ASSERT_EQ(Get("foo"), "bar,baz");
  ASSERT_EQ(direct_file_reads, 0);

  Close();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -0,0 +1,21 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/blob/prefetch_buffer_collection.h"
namespace ROCKSDB_NAMESPACE {
// Returns the prefetch buffer registered for `file_number`. If the map holds
// no buffer for that file yet, one is constructed (passing `readahead_size_`
// for both constructor arguments) and stored first. The returned pointer is
// non-owning; the buffer stays owned by the map.
FilePrefetchBuffer* PrefetchBufferCollection::GetOrCreatePrefetchBuffer(
    uint64_t file_number) {
  // operator[] inserts an empty slot for file numbers seen for the first time.
  std::unique_ptr<FilePrefetchBuffer>& slot = prefetch_buffers_[file_number];

  if (slot) {
    return slot.get();
  }

  slot.reset(new FilePrefetchBuffer(readahead_size_, readahead_size_));
  return slot.get();
}
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,38 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <cassert>
#include <cstdint>
#include <memory>
#include <unordered_map>
#include "file/file_prefetch_buffer.h"
#include "rocksdb/rocksdb_namespace.h"
namespace ROCKSDB_NAMESPACE {
// A class that owns a collection of FilePrefetchBuffers using the file number
// as key. Used for implementing compaction readahead for blob files. Designed
// to be accessed by a single thread only: every (sub)compaction needs its own
// buffers since they are guaranteed to read different blobs from different
// positions even when reading the same file.
class PrefetchBufferCollection {
public:
explicit PrefetchBufferCollection(uint64_t readahead_size)
: readahead_size_(readahead_size) {
assert(readahead_size_ > 0);
}
FilePrefetchBuffer* GetOrCreatePrefetchBuffer(uint64_t file_number);
private:
uint64_t readahead_size_;
std::unordered_map<uint64_t, std::unique_ptr<FilePrefetchBuffer>>
prefetch_buffers_; // maps file number to prefetch buffer
};
} // namespace ROCKSDB_NAMESPACE

@ -2760,6 +2760,16 @@ double rocksdb_options_get_blob_gc_force_threshold(rocksdb_options_t* opt) {
return opt->rep.blob_garbage_collection_force_threshold;
}
// C API setter: stores `val` in the `blob_compaction_readahead_size` field of
// the options object wrapped by `opt`.
void rocksdb_options_set_blob_compaction_readahead_size(rocksdb_options_t* opt,
                                                        uint64_t val) {
  auto& wrapped_options = opt->rep;
  wrapped_options.blob_compaction_readahead_size = val;
}
// C API getter: returns the `blob_compaction_readahead_size` field of the
// options object wrapped by `opt`.
uint64_t rocksdb_options_get_blob_compaction_readahead_size(
    rocksdb_options_t* opt) {
  const auto& wrapped_options = opt->rep;
  return wrapped_options.blob_compaction_readahead_size;
}
// C API setter: stores `n` in the `num_levels` field of the options object
// wrapped by `opt`.
void rocksdb_options_set_num_levels(rocksdb_options_t* opt, int n) {
  auto& wrapped_options = opt->rep;
  wrapped_options.num_levels = n;
}

@ -1799,6 +1799,10 @@ int main(int argc, char** argv) {
rocksdb_options_set_blob_gc_force_threshold(o, 0.75);
CheckCondition(0.75 == rocksdb_options_get_blob_gc_force_threshold(o));
rocksdb_options_set_blob_compaction_readahead_size(o, 262144);
CheckCondition(262144 ==
rocksdb_options_get_blob_compaction_readahead_size(o));
// Create a copy that should be equal to the original.
rocksdb_options_t* copy;
copy = rocksdb_options_create_copy(o);

@ -9,6 +9,8 @@
#include "rocksdb/rocksdb_namespace.h"
namespace ROCKSDB_NAMESPACE {
struct CompactionIterationStats {
// Compaction statistics
@ -43,3 +45,5 @@ struct CompactionIterationStats {
uint64_t num_blobs_relocated = 0;
uint64_t total_blob_bytes_relocated = 0;
};
} // namespace ROCKSDB_NAMESPACE

@ -8,8 +8,10 @@
#include <iterator>
#include <limits>
#include "db/blob/blob_fetcher.h"
#include "db/blob/blob_file_builder.h"
#include "db/blob/blob_index.h"
#include "db/blob/prefetch_buffer_collection.h"
#include "db/snapshot_checker.h"
#include "logging/logging.h"
#include "port/likely.h"
@ -88,6 +90,9 @@ CompactionIterator::CompactionIterator(
merge_out_iter_(merge_helper_),
blob_garbage_collection_cutoff_file_number_(
ComputeBlobGarbageCollectionCutoffFileNumber(compaction_.get())),
blob_fetcher_(CreateBlobFetcherIfNeeded(compaction_.get())),
prefetch_buffers_(
CreatePrefetchBufferCollectionIfNeeded(compaction_.get())),
current_key_committed_(false),
cmp_with_history_ts_low_(0),
level_(compaction_ == nullptr ? 0 : compaction_->level()) {
@ -225,6 +230,13 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
compaction_filter_skip_until_.rep());
if (CompactionFilter::Decision::kUndetermined == filter &&
!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
if (compaction_ == nullptr) {
status_ =
Status::Corruption("Unexpected blob index outside of compaction");
valid_ = false;
return false;
}
// For integrated BlobDB impl, CompactionIterator reads blob value.
// For Stacked BlobDB impl, the corresponding CompactionFilter's
// FilterV2 method should read the blob value.
@ -235,23 +247,19 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
valid_ = false;
return false;
}
if (blob_index.HasTTL() || blob_index.IsInlined()) {
status_ = Status::Corruption("Unexpected TTL/inlined blob index");
valid_ = false;
return false;
}
if (compaction_ == nullptr) {
status_ =
Status::Corruption("Unexpected blob index outside of compaction");
valid_ = false;
return false;
}
const Version* const version = compaction_->input_version();
assert(version);
FilePrefetchBuffer* prefetch_buffer =
prefetch_buffers_ ? prefetch_buffers_->GetOrCreatePrefetchBuffer(
blob_index.file_number())
: nullptr;
uint64_t bytes_read = 0;
s = version->GetBlob(ReadOptions(), ikey_.user_key, blob_index,
&blob_value_, &bytes_read);
assert(blob_fetcher_);
s = blob_fetcher_->FetchBlob(ikey_.user_key, blob_index,
prefetch_buffer, &blob_value_,
&bytes_read);
if (!s.ok()) {
status_ = s;
valid_ = false;
@ -831,15 +839,15 @@ void CompactionIterator::NextFromInput() {
}
pinned_iters_mgr_.StartPinning();
Version* version = compaction_ ? compaction_->input_version() : nullptr;
// We know the merge type entry is not hidden, otherwise we would
// have hit (A)
// We encapsulate the merge related state machine in a different
// object to minimize change to the existing flow.
Status s = merge_helper_->MergeUntil(&input_, range_del_agg_,
prev_snapshot, bottommost_level_,
allow_data_in_errors_, version);
Status s = merge_helper_->MergeUntil(
&input_, range_del_agg_, prev_snapshot, bottommost_level_,
allow_data_in_errors_, blob_fetcher_.get(), prefetch_buffers_.get(),
&iter_stats_);
merge_out_iter_.SeekToFirst();
if (!s.ok() && !s.IsMergeInProgress()) {
@ -959,26 +967,23 @@ void CompactionIterator::GarbageCollectBlobIfNeeded() {
}
}
if (blob_index.IsInlined() || blob_index.HasTTL()) {
status_ = Status::Corruption("Unexpected TTL/inlined blob index");
valid_ = false;
return;
}
if (blob_index.file_number() >=
blob_garbage_collection_cutoff_file_number_) {
return;
}
const Version* const version = compaction_->input_version();
assert(version);
FilePrefetchBuffer* prefetch_buffer =
prefetch_buffers_ ? prefetch_buffers_->GetOrCreatePrefetchBuffer(
blob_index.file_number())
: nullptr;
uint64_t bytes_read = 0;
{
const Status s = version->GetBlob(ReadOptions(), user_key(), blob_index,
&blob_value_, &bytes_read);
assert(blob_fetcher_);
const Status s = blob_fetcher_->FetchBlob(
user_key(), blob_index, prefetch_buffer, &blob_value_, &bytes_read);
if (!s.ok()) {
status_ = s;
@ -1151,7 +1156,7 @@ uint64_t CompactionIterator::ComputeBlobGarbageCollectionCutoffFileNumber(
return 0;
}
Version* const version = compaction->input_version();
const Version* const version = compaction->input_version();
assert(version);
const VersionStorageInfo* const storage_info = version->storage_info();
@ -1167,4 +1172,42 @@ uint64_t CompactionIterator::ComputeBlobGarbageCollectionCutoffFileNumber(
: std::numeric_limits<uint64_t>::max();
}
std::unique_ptr<BlobFetcher> CompactionIterator::CreateBlobFetcherIfNeeded(
const CompactionProxy* compaction) {
if (!compaction) {
return nullptr;
}
const Version* const version = compaction->input_version();
if (!version) {
return nullptr;
}
return std::unique_ptr<BlobFetcher>(new BlobFetcher(version, ReadOptions()));
}
// Builds the collection of blob file prefetch buffers used for compaction
// readahead. Returns nullptr (no readahead) when any of the following holds:
// there is no compaction proxy, the proxy has no input version, mmap reads
// are allowed, or the configured blob compaction readahead size is zero.
// Otherwise a collection configured with that readahead size is returned.
std::unique_ptr<PrefetchBufferCollection>
CompactionIterator::CreatePrefetchBufferCollectionIfNeeded(
    const CompactionProxy* compaction) {
  // Short-circuit evaluation keeps the checks in their required order and
  // never dereferences a null proxy.
  const bool readahead_unavailable = compaction == nullptr ||
                                     compaction->input_version() == nullptr ||
                                     compaction->allow_mmap_reads();
  if (readahead_unavailable) {
    return nullptr;
  }

  const uint64_t configured_size = compaction->blob_compaction_readahead_size();
  if (configured_size == 0) {
    return nullptr;
  }

  std::unique_ptr<PrefetchBufferCollection> buffers(
      new PrefetchBufferCollection(configured_size));
  return buffers;
}
} // namespace ROCKSDB_NAMESPACE

@ -23,6 +23,8 @@
namespace ROCKSDB_NAMESPACE {
class BlobFileBuilder;
class BlobFetcher;
class PrefetchBufferCollection;
// A wrapper of internal iterator whose purpose is to count how
// many entries there are in the iterator.
@ -92,11 +94,15 @@ class CompactionIterator {
virtual bool preserve_deletes() const = 0;
virtual bool allow_mmap_reads() const = 0;
virtual bool enable_blob_garbage_collection() const = 0;
virtual double blob_garbage_collection_age_cutoff() const = 0;
virtual Version* input_version() const = 0;
virtual uint64_t blob_compaction_readahead_size() const = 0;
virtual const Version* input_version() const = 0;
virtual bool DoesInputReferenceBlobFiles() const = 0;
@ -137,6 +143,10 @@ class CompactionIterator {
return compaction_->immutable_options()->preserve_deletes;
}
// Reports the `allow_mmap_reads` setting from the immutable options of the
// wrapped compaction.
bool allow_mmap_reads() const override {
  const auto& immutable_opts = *compaction_->immutable_options();
  return immutable_opts.allow_mmap_reads;
}
// Reports the `enable_blob_garbage_collection` setting from the mutable
// column family options of the wrapped compaction.
bool enable_blob_garbage_collection() const override {
  const auto& mutable_cf_opts = *compaction_->mutable_cf_options();
  return mutable_cf_opts.enable_blob_garbage_collection;
}
@ -146,7 +156,11 @@ class CompactionIterator {
->blob_garbage_collection_age_cutoff;
}
Version* input_version() const override {
// Reports the `blob_compaction_readahead_size` setting from the mutable
// column family options of the wrapped compaction.
uint64_t blob_compaction_readahead_size() const override {
  const auto& mutable_cf_opts = *compaction_->mutable_cf_options();
  return mutable_cf_opts.blob_compaction_readahead_size;
}
// Exposes the input version of the wrapped compaction as a pointer-to-const.
const Version* input_version() const override {
  const Version* const version = compaction_->input_version();
  return version;
}
@ -291,6 +305,10 @@ class CompactionIterator {
static uint64_t ComputeBlobGarbageCollectionCutoffFileNumber(
const CompactionProxy* compaction);
static std::unique_ptr<BlobFetcher> CreateBlobFetcherIfNeeded(
const CompactionProxy* compaction);
static std::unique_ptr<PrefetchBufferCollection>
CreatePrefetchBufferCollectionIfNeeded(const CompactionProxy* compaction);
SequenceIterWrapper input_;
const Comparator* cmp_;
@ -379,6 +397,9 @@ class CompactionIterator {
uint64_t blob_garbage_collection_cutoff_file_number_;
std::unique_ptr<BlobFetcher> blob_fetcher_;
std::unique_ptr<PrefetchBufferCollection> prefetch_buffers_;
std::string blob_index_;
PinnableSlice blob_value_;
std::string compaction_filter_value_;

@ -168,11 +168,15 @@ class FakeCompaction : public CompactionIterator::CompactionProxy {
bool preserve_deletes() const override { return false; }
bool allow_mmap_reads() const override { return false; }
bool enable_blob_garbage_collection() const override { return false; }
double blob_garbage_collection_age_cutoff() const override { return 0.0; }
Version* input_version() const override { return nullptr; }
uint64_t blob_compaction_readahead_size() const override { return 0; }
const Version* input_version() const override { return nullptr; }
bool DoesInputReferenceBlobFiles() const override { return false; }

@ -192,10 +192,11 @@ bool DBIter::SetBlobValueIfNeeded(const Slice& user_key,
read_options.read_tier = read_tier_;
read_options.verify_checksums = verify_checksums_;
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
constexpr uint64_t* bytes_read = nullptr;
const Status s = version_->GetBlob(read_options, user_key, blob_index,
&blob_value_, bytes_read);
prefetch_buffer, &blob_value_, bytes_read);
if (!s.ok()) {
status_ = s;

@ -8,6 +8,9 @@
#include <string>
#include "db/blob/blob_fetcher.h"
#include "db/blob/blob_index.h"
#include "db/blob/prefetch_buffer_collection.h"
#include "db/compaction/compaction_iteration_stats.h"
#include "db/dbformat.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
@ -121,7 +124,9 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
const SequenceNumber stop_before,
const bool at_bottom,
const bool allow_data_in_errors,
Version* version) {
const BlobFetcher* blob_fetcher,
PrefetchBufferCollection* prefetch_buffers,
CompactionIterationStats* c_iter_stats) {
// Get a copy of the internal key, before it's invalidated by iter->Next()
// Also maintain the list of merge operands seen.
assert(HasOperator());
@ -212,13 +217,35 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
!range_del_agg->ShouldDelete(
ikey, RangeDelPositioningMode::kForwardTraversal))) {
if (ikey.type == kTypeBlobIndex) {
assert(version);
BlobFetcher blob_fetcher(version, ReadOptions());
s = blob_fetcher.FetchBlob(ikey.user_key, val, &blob_value);
BlobIndex blob_index;
s = blob_index.DecodeFrom(val);
if (!s.ok()) {
return s;
}
FilePrefetchBuffer* prefetch_buffer =
prefetch_buffers ? prefetch_buffers->GetOrCreatePrefetchBuffer(
blob_index.file_number())
: nullptr;
uint64_t bytes_read = 0;
assert(blob_fetcher);
s = blob_fetcher->FetchBlob(ikey.user_key, blob_index,
prefetch_buffer, &blob_value,
&bytes_read);
if (!s.ok()) {
return s;
}
val_ptr = &blob_value;
if (c_iter_stats) {
++c_iter_stats->num_blobs_read;
c_iter_stats->total_blob_bytes_read += bytes_read;
}
} else {
val_ptr = &val;
}

@ -26,7 +26,9 @@ class Logger;
class MergeOperator;
class Statistics;
class SystemClock;
class Version;
class BlobFetcher;
class PrefetchBufferCollection;
struct CompactionIterationStats;
class MergeHelper {
public:
@ -70,6 +72,10 @@ class MergeHelper {
// we could reach the start of the history of this user key.
// allow_data_in_errors: (IN) if true, data details will be displayed in
// error/log messages.
// blob_fetcher: (IN) blob fetcher object for the compaction's input version.
// prefetch_buffers: (IN/OUT) a collection of blob file prefetch buffers
// used for compaction readahead.
// c_iter_stats: (OUT) compaction iteration statistics.
//
// Returns one of the following statuses:
// - OK: Entries were successfully merged.
@ -82,11 +88,12 @@ class MergeHelper {
//
// REQUIRED: The first key in the input is not corrupted.
Status MergeUntil(InternalIterator* iter,
CompactionRangeDelAggregator* range_del_agg = nullptr,
const SequenceNumber stop_before = 0,
const bool at_bottom = false,
const bool allow_data_in_errors = false,
Version* version = nullptr);
CompactionRangeDelAggregator* range_del_agg,
const SequenceNumber stop_before, const bool at_bottom,
const bool allow_data_in_errors,
const BlobFetcher* blob_fetcher,
PrefetchBufferCollection* prefetch_buffers,
CompactionIterationStats* c_iter_stats);
// Filters a merge operand using the compaction filter specified
// in the constructor. Returns the decision that the filter made.

@ -32,8 +32,10 @@ class MergeHelperTest : public testing::Test {
merge_helper_.reset(new MergeHelper(env_, icmp_.user_comparator(),
merge_op_.get(), filter_.get(), nullptr,
false, latest_snapshot));
return merge_helper_->MergeUntil(iter_.get(), nullptr /* range_del_agg */,
stop_before, at_bottom);
return merge_helper_->MergeUntil(
iter_.get(), nullptr /* range_del_agg */, stop_before, at_bottom,
false /* allow_data_in_errors */, nullptr /* blob_fetcher */,
nullptr /* prefetch_buffers */, nullptr /* c_iter_stats */);
}
void AddKeyVal(const std::string& user_key, const SequenceNumber& seq,

@ -1790,12 +1790,9 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
io_tracer_(io_tracer) {}
Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
const Slice& blob_index_slice, PinnableSlice* value,
uint64_t* bytes_read) const {
if (read_options.read_tier == kBlockCacheTier) {
return Status::Incomplete("Cannot read blob: no disk I/O allowed");
}
const Slice& blob_index_slice,
FilePrefetchBuffer* prefetch_buffer,
PinnableSlice* value, uint64_t* bytes_read) const {
BlobIndex blob_index;
{
@ -1805,14 +1802,20 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
}
}
return GetBlob(read_options, user_key, blob_index, value, bytes_read);
return GetBlob(read_options, user_key, blob_index, prefetch_buffer, value,
bytes_read);
}
Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
const BlobIndex& blob_index, PinnableSlice* value,
uint64_t* bytes_read) const {
const BlobIndex& blob_index,
FilePrefetchBuffer* prefetch_buffer,
PinnableSlice* value, uint64_t* bytes_read) const {
assert(value);
if (read_options.read_tier == kBlockCacheTier) {
return Status::Incomplete("Cannot read blob: no disk I/O allowed");
}
if (blob_index.HasTTL() || blob_index.IsInlined()) {
return Status::Corruption("Unexpected TTL/inlined blob index");
}
@ -1840,7 +1843,7 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
assert(blob_file_reader.GetValue());
const Status s = blob_file_reader.GetValue()->GetBlob(
read_options, user_key, blob_index.offset(), blob_index.size(),
blob_index.compression(), value, bytes_read);
blob_index.compression(), prefetch_buffer, value, bytes_read);
return s;
}
@ -2067,10 +2070,11 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
if (is_blob_index) {
if (do_merge && value) {
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
constexpr uint64_t* bytes_read = nullptr;
*status =
GetBlob(read_options, user_key, *value, value, bytes_read);
*status = GetBlob(read_options, user_key, *value, prefetch_buffer,
value, bytes_read);
if (!status->ok()) {
if (status->IsIncomplete()) {
get_context.MarkKeyMayExist();

@ -723,13 +723,15 @@ class Version {
// saves it in *value.
// REQUIRES: blob_index_slice stores an encoded blob reference
Status GetBlob(const ReadOptions& read_options, const Slice& user_key,
const Slice& blob_index_slice, PinnableSlice* value,
const Slice& blob_index_slice,
FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value,
uint64_t* bytes_read) const;
// Retrieves a blob using a blob reference and saves it in *value,
// assuming the corresponding blob file is part of this Version.
Status GetBlob(const ReadOptions& read_options, const Slice& user_key,
const BlobIndex& blob_index, PinnableSlice* value,
const BlobIndex& blob_index,
FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value,
uint64_t* bytes_read) const;
using BlobReadRequest =
@ -804,6 +806,7 @@ class Version {
int TEST_refs() const { return refs_; }
VersionStorageInfo* storage_info() { return &storage_info_; }
const VersionStorageInfo* storage_info() const { return &storage_info_; }
VersionSet* version_set() { return vset_; }

@ -253,6 +253,7 @@ DECLARE_string(blob_compression_type);
DECLARE_bool(enable_blob_garbage_collection);
DECLARE_double(blob_garbage_collection_age_cutoff);
DECLARE_double(blob_garbage_collection_force_threshold);
DECLARE_uint64(blob_compaction_readahead_size);
DECLARE_int32(approximate_size_one_in);
DECLARE_bool(sync_fault_injection);

@ -405,6 +405,11 @@ DEFINE_double(blob_garbage_collection_force_threshold,
"[Integrated BlobDB] The threshold for the ratio of garbage in "
"the oldest blob files for forcing garbage collection.");
DEFINE_uint64(blob_compaction_readahead_size,
ROCKSDB_NAMESPACE::AdvancedColumnFamilyOptions()
.blob_compaction_readahead_size,
"[Integrated BlobDB] Compaction readahead for blob files.");
static const bool FLAGS_subcompactions_dummy __attribute__((__unused__)) =
RegisterFlagValidator(&FLAGS_subcompactions, &ValidateUint32Range);

@ -269,6 +269,8 @@ bool StressTest::BuildOptionsTable() {
std::vector<std::string>{"0.0", "0.25", "0.5", "0.75", "1.0"});
options_tbl.emplace("blob_garbage_collection_force_threshold",
std::vector<std::string>{"0.5", "0.75", "1.0"});
options_tbl.emplace("blob_compaction_readahead_size",
std::vector<std::string>{"0", "1M", "4M"});
}
options_table_ = std::move(options_tbl);
@ -2323,6 +2325,8 @@ void StressTest::Open() {
FLAGS_blob_garbage_collection_age_cutoff;
options_.blob_garbage_collection_force_threshold =
FLAGS_blob_garbage_collection_force_threshold;
options_.blob_compaction_readahead_size =
FLAGS_blob_compaction_readahead_size;
} else {
#ifdef ROCKSDB_LITE
fprintf(stderr, "--options_file not supported in lite mode\n");
@ -2422,21 +2426,18 @@ void StressTest::Open() {
exit(1);
}
if (options_.enable_blob_files) {
fprintf(stdout,
"Integrated BlobDB: blob files enabled, min blob size %" PRIu64
", blob file size %" PRIu64 ", blob compression type %s\n",
options_.min_blob_size, options_.blob_file_size,
CompressionTypeToString(options_.blob_compression_type).c_str());
}
if (options_.enable_blob_garbage_collection) {
fprintf(
stdout,
"Integrated BlobDB: blob GC enabled, cutoff %f, force threshold %f\n",
options_.blob_garbage_collection_age_cutoff,
options_.blob_garbage_collection_force_threshold);
}
fprintf(stdout,
"Integrated BlobDB: blob files enabled %d, min blob size %" PRIu64
", blob file size %" PRIu64
", blob compression type %s, blob GC enabled %d, cutoff %f, force "
"threshold %f, blob compaction readahead size %" PRIu64 "\n",
options_.enable_blob_files, options_.min_blob_size,
options_.blob_file_size,
CompressionTypeToString(options_.blob_compression_type).c_str(),
options_.enable_blob_garbage_collection,
options_.blob_garbage_collection_age_cutoff,
options_.blob_garbage_collection_force_threshold,
options_.blob_compaction_readahead_size);
fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());

@ -108,6 +108,7 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
}
bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t offset, size_t n,
Slice* result, Status* status,
bool for_compaction) {
@ -124,11 +125,11 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
// If readahead is not enabled: return false.
if (offset + n > buffer_offset_ + buffer_.CurrentSize()) {
if (readahead_size_ > 0) {
assert(file_reader_ != nullptr);
assert(reader != nullptr);
assert(max_readahead_size_ >= readahead_size_);
Status s;
if (for_compaction) {
s = Prefetch(opts, file_reader_, offset, std::max(n, readahead_size_),
s = Prefetch(opts, reader, offset, std::max(n, readahead_size_),
for_compaction);
} else {
if (implicit_auto_readahead_) {
@ -149,8 +150,7 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
return false;
}
}
s = Prefetch(opts, file_reader_, offset, n + readahead_size_,
for_compaction);
s = Prefetch(opts, reader, offset, n + readahead_size_, for_compaction);
}
if (!s.ok()) {
if (status) {

@ -13,7 +13,6 @@
#include <sstream>
#include <string>
#include "file/random_access_file_reader.h"
#include "file/readahead_file_info.h"
#include "port/port.h"
#include "rocksdb/env.h"
@ -24,6 +23,9 @@ namespace ROCKSDB_NAMESPACE {
#define DEAFULT_DECREMENT 8 * 1024
struct IOOptions;
class RandomAccessFileReader;
// FilePrefetchBuffer is a smart buffer to store and read data from a file.
class FilePrefetchBuffer {
public:
@ -31,7 +33,6 @@ class FilePrefetchBuffer {
// Constructor.
//
// All arguments are optional.
// file_reader : the file reader to use. Can be a nullptr.
// readahead_size : the initial readahead size.
// max_readahead_size : the maximum readahead size.
// If max_readahead_size > readahead_size, the readahead size will be
@ -46,18 +47,14 @@ class FilePrefetchBuffer {
// implicit_auto_readahead : Readahead is enabled implicitly by rocksdb after
// doing sequential scans for two times.
//
// Automatic readhead is enabled for a file if file_reader, readahead_size,
// Automatic readahead is enabled for a file if readahead_size
// and max_readahead_size are passed in.
// If file_reader is a nullptr, setting readahead_size and max_readahead_size
// does not make any sense. So it does nothing.
// A user can construct a FilePrefetchBuffer without any arguments, but use
// `Prefetch` to load data into the buffer.
FilePrefetchBuffer(RandomAccessFileReader* file_reader = nullptr,
size_t readahead_size = 0, size_t max_readahead_size = 0,
FilePrefetchBuffer(size_t readahead_size = 0, size_t max_readahead_size = 0,
bool enable = true, bool track_min_offset = false,
bool implicit_auto_readahead = false)
: buffer_offset_(0),
file_reader_(file_reader),
readahead_size_(readahead_size),
max_readahead_size_(max_readahead_size),
initial_readahead_size_(readahead_size),
@ -77,18 +74,22 @@ class FilePrefetchBuffer {
Status Prefetch(const IOOptions& opts, RandomAccessFileReader* reader,
uint64_t offset, size_t n, bool for_compaction = false);
// Tries returning the data for a file raed from this buffer, if that data is
// Tries returning the data for a file read from this buffer if that data is
// in the buffer.
// It handles tracking the minimum read offset if track_min_offset = true.
// It also does the exponential readahead when readahead_size is set as part
// of the constructor.
//
// offset : the file offset.
// n : the number of bytes.
// result : output buffer to put the data into.
// for_compaction : if cache read is done for compaction read.
bool TryReadFromCache(const IOOptions& opts, uint64_t offset, size_t n,
Slice* result, Status* s, bool for_compaction = false);
// opts : the IO options to use.
// reader : the file reader.
// offset : the file offset.
// n : the number of bytes.
// result : output buffer to put the data into.
// s : output status.
// for_compaction : true if cache read is done for compaction read.
bool TryReadFromCache(const IOOptions& opts, RandomAccessFileReader* reader,
uint64_t offset, size_t n, Slice* result, Status* s,
bool for_compaction = false);
// The minimum `offset` ever passed to TryReadFromCache(). This will only be
// tracked if track_min_offset = true.
@ -145,7 +146,6 @@ class FilePrefetchBuffer {
private:
AlignedBuffer buffer_;
uint64_t buffer_offset_;
RandomAccessFileReader* file_reader_;
size_t readahead_size_;
// FilePrefetchBuffer object won't be created from Iterator flow if
// max_readahead_size_ = 0.

@ -183,9 +183,9 @@ IOStatus GenerateOneFileChecksum(
? verify_checksums_readahead_size
: default_max_read_ahead_size;
FilePrefetchBuffer prefetch_buffer(
reader.get(), readahead_size /* readahead_size */,
readahead_size /* max_readahead_size */, !allow_mmap_reads /* enable */);
FilePrefetchBuffer prefetch_buffer(readahead_size /* readahead_size */,
readahead_size /* max_readahead_size */,
!allow_mmap_reads /* enable */);
Slice slice;
uint64_t offset = 0;
@ -193,8 +193,9 @@ IOStatus GenerateOneFileChecksum(
while (size > 0) {
size_t bytes_to_read =
static_cast<size_t>(std::min(uint64_t{readahead_size}, size));
if (!prefetch_buffer.TryReadFromCache(opts, offset, bytes_to_read, &slice,
nullptr, false)) {
if (!prefetch_buffer.TryReadFromCache(
opts, reader.get(), offset, bytes_to_read, &slice,
nullptr /* status */, false /* for_compaction */)) {
return IOStatus::Corruption("file read failed");
}
if (slice.size() == 0) {

@ -819,8 +819,9 @@ struct AdvancedColumnFamilyOptions {
// amplification for large-value use cases at the cost of introducing a level
// of indirection for reads. See also the options min_blob_size,
// blob_file_size, blob_compression_type, enable_blob_garbage_collection,
// blob_garbage_collection_age_cutoff, and
// blob_garbage_collection_force_threshold below.
// blob_garbage_collection_age_cutoff,
// blob_garbage_collection_force_threshold, and blob_compaction_readahead_size
// below.
//
// Default: false
//
@ -893,6 +894,13 @@ struct AdvancedColumnFamilyOptions {
// Dynamically changeable through the SetOptions() API
double blob_garbage_collection_force_threshold = 1.0;
// Compaction readahead for blob files.
//
// Default: 0
//
// Dynamically changeable through the SetOptions() API
uint64_t blob_compaction_readahead_size = 0;
// Create ColumnFamilyOptions with default values for all fields
AdvancedColumnFamilyOptions();
// Create ColumnFamilyOptions from Options

@ -1122,6 +1122,12 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_blob_gc_force_threshold(
extern ROCKSDB_LIBRARY_API double rocksdb_options_get_blob_gc_force_threshold(
rocksdb_options_t* opt);
extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_blob_compaction_readahead_size(rocksdb_options_t* opt,
uint64_t val);
extern ROCKSDB_LIBRARY_API uint64_t
rocksdb_options_get_blob_compaction_readahead_size(rocksdb_options_t* opt);
/* returns a pointer to a malloc()-ed, null terminated string */
extern ROCKSDB_LIBRARY_API char* rocksdb_options_statistics_get_string(
rocksdb_options_t* opt);

@ -434,6 +434,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
blob_garbage_collection_force_threshold),
OptionType::kDouble, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"blob_compaction_readahead_size",
{offsetof(struct MutableCFOptions, blob_compaction_readahead_size),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"sample_for_compression",
{offsetof(struct MutableCFOptions, sample_for_compression),
OptionType::kUInt64T, OptionVerificationType::kNormal,
@ -1055,6 +1059,8 @@ void MutableCFOptions::Dump(Logger* log) const {
blob_garbage_collection_age_cutoff);
ROCKS_LOG_INFO(log, " blob_garbage_collection_force_threshold: %f",
blob_garbage_collection_force_threshold);
ROCKS_LOG_INFO(log, " blob_compaction_readahead_size: %" PRIu64,
blob_compaction_readahead_size);
}
MutableCFOptions::MutableCFOptions(const Options& options)

@ -143,6 +143,7 @@ struct MutableCFOptions {
options.blob_garbage_collection_age_cutoff),
blob_garbage_collection_force_threshold(
options.blob_garbage_collection_force_threshold),
blob_compaction_readahead_size(options.blob_compaction_readahead_size),
max_sequential_skip_in_iterations(
options.max_sequential_skip_in_iterations),
check_flush_compaction_key_order(
@ -190,6 +191,7 @@ struct MutableCFOptions {
enable_blob_garbage_collection(false),
blob_garbage_collection_age_cutoff(0.0),
blob_garbage_collection_force_threshold(0.0),
blob_compaction_readahead_size(0),
max_sequential_skip_in_iterations(0),
check_flush_compaction_key_order(true),
paranoid_file_checks(false),
@ -255,6 +257,7 @@ struct MutableCFOptions {
bool enable_blob_garbage_collection;
double blob_garbage_collection_age_cutoff;
double blob_garbage_collection_force_threshold;
uint64_t blob_compaction_readahead_size;
// Misc options
uint64_t max_sequential_skip_in_iterations;

@ -99,7 +99,8 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options)
blob_garbage_collection_age_cutoff(
options.blob_garbage_collection_age_cutoff),
blob_garbage_collection_force_threshold(
options.blob_garbage_collection_force_threshold) {
options.blob_garbage_collection_force_threshold),
blob_compaction_readahead_size(options.blob_compaction_readahead_size) {
assert(memtable_factory.get() != nullptr);
if (max_bytes_for_level_multiplier_additional.size() <
static_cast<unsigned int>(num_levels)) {
@ -405,6 +406,9 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
blob_garbage_collection_age_cutoff);
ROCKS_LOG_HEADER(log, "Options.blob_garbage_collection_force_threshold: %f",
blob_garbage_collection_force_threshold);
ROCKS_LOG_HEADER(
log, " Options.blob_compaction_readahead_size: %" PRIu64,
blob_compaction_readahead_size);
} // ColumnFamilyOptions::Dump
void Options::Dump(Logger* log) const {

@ -254,6 +254,8 @@ void UpdateColumnFamilyOptions(const MutableCFOptions& moptions,
moptions.blob_garbage_collection_age_cutoff;
cf_opts->blob_garbage_collection_force_threshold =
moptions.blob_garbage_collection_force_threshold;
cf_opts->blob_compaction_readahead_size =
moptions.blob_compaction_readahead_size;
// Misc options
cf_opts->max_sequential_skip_in_iterations =

@ -518,6 +518,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"enable_blob_garbage_collection=true;"
"blob_garbage_collection_age_cutoff=0.5;"
"blob_garbage_collection_force_threshold=0.75;"
"blob_compaction_readahead_size=262144;"
"compaction_options_fifo={max_table_files_size=3;allow_"
"compaction=false;age_for_warm=1;};",
new_options));

@ -109,6 +109,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
{"enable_blob_garbage_collection", "true"},
{"blob_garbage_collection_age_cutoff", "0.5"},
{"blob_garbage_collection_force_threshold", "0.75"},
{"blob_compaction_readahead_size", "256K"},
};
std::unordered_map<std::string, std::string> db_options_map = {
@ -241,6 +242,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_cf_opt.enable_blob_garbage_collection, true);
ASSERT_EQ(new_cf_opt.blob_garbage_collection_age_cutoff, 0.5);
ASSERT_EQ(new_cf_opt.blob_garbage_collection_force_threshold, 0.75);
ASSERT_EQ(new_cf_opt.blob_compaction_readahead_size, 262144);
cf_options_map["write_buffer_size"] = "hello";
ASSERT_NOK(GetColumnFamilyOptionsFromMap(exact, base_cf_opt, cf_options_map,
@ -2269,6 +2271,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) {
{"enable_blob_garbage_collection", "true"},
{"blob_garbage_collection_age_cutoff", "0.5"},
{"blob_garbage_collection_force_threshold", "0.75"},
{"blob_compaction_readahead_size", "256K"},
};
std::unordered_map<std::string, std::string> db_options_map = {
@ -2393,6 +2396,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_cf_opt.enable_blob_garbage_collection, true);
ASSERT_EQ(new_cf_opt.blob_garbage_collection_age_cutoff, 0.5);
ASSERT_EQ(new_cf_opt.blob_garbage_collection_force_threshold, 0.75);
ASSERT_EQ(new_cf_opt.blob_compaction_readahead_size, 262144);
cf_options_map["write_buffer_size"] = "hello";
ASSERT_NOK(GetColumnFamilyOptionsFromMap(

@ -18,6 +18,7 @@ LIB_SOURCES = \
db/blob/blob_log_format.cc \
db/blob/blob_log_sequential_reader.cc \
db/blob/blob_log_writer.cc \
db/blob/prefetch_buffer_collection.cc \
db/builder.cc \
db/c.cc \
db/column_family.cc \

@ -579,7 +579,8 @@ Status BlockBasedTable::Open(
} else {
// Should not prefetch for mmap mode.
prefetch_buffer.reset(new FilePrefetchBuffer(
nullptr, 0, 0, false /* enable */, true /* track_min_offset */));
0 /* readahead_size */, 0 /* max_readahead_size */, false /* enable */,
true /* track_min_offset */));
}
// Read in the following order:
@ -732,14 +733,17 @@ Status BlockBasedTable::PrefetchTail(
// Try file system prefetch
if (!file->use_direct_io() && !force_direct_prefetch) {
if (!file->Prefetch(prefetch_off, prefetch_len).IsNotSupported()) {
prefetch_buffer->reset(
new FilePrefetchBuffer(nullptr, 0, 0, false, true));
prefetch_buffer->reset(new FilePrefetchBuffer(
0 /* readahead_size */, 0 /* max_readahead_size */,
false /* enable */, true /* track_min_offset */));
return Status::OK();
}
}
// Use `FilePrefetchBuffer`
prefetch_buffer->reset(new FilePrefetchBuffer(nullptr, 0, 0, true, true));
prefetch_buffer->reset(
new FilePrefetchBuffer(0 /* readahead_size */, 0 /* max_readahead_size */,
true /* enable */, true /* track_min_offset */));
IOOptions opts;
Status s = file->PrepareIOOptions(ro, opts);
if (s.ok()) {
@ -2966,7 +2970,7 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
// FilePrefetchBuffer doesn't work in mmap mode and readahead is not
// needed there.
FilePrefetchBuffer prefetch_buffer(
rep_->file.get(), readahead_size /* readahead_size */,
readahead_size /* readahead_size */,
readahead_size /* max_readahead_size */,
!rep_->ioptions.allow_mmap_reads /* enable */);

@ -683,10 +683,10 @@ struct BlockBasedTable::Rep {
size_t max_readahead_size,
std::unique_ptr<FilePrefetchBuffer>* fpb,
bool implicit_auto_readahead) const {
fpb->reset(new FilePrefetchBuffer(
file.get(), readahead_size, max_readahead_size,
!ioptions.allow_mmap_reads /* enable */, false /* track_min_offset*/,
implicit_auto_readahead));
fpb->reset(new FilePrefetchBuffer(readahead_size, max_readahead_size,
!ioptions.allow_mmap_reads /* enable */,
false /* track_min_offset */,
implicit_auto_readahead));
}
void CreateFilePrefetchBufferIfNotExists(

@ -69,9 +69,10 @@ inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
if (prefetch_buffer_ != nullptr) {
IOOptions opts;
IOStatus io_s = file_->PrepareIOOptions(read_options_, opts);
if (io_s.ok() && prefetch_buffer_->TryReadFromCache(
opts, handle_.offset(), block_size_with_trailer_,
&slice_, &io_s, for_compaction_)) {
if (io_s.ok() &&
prefetch_buffer_->TryReadFromCache(opts, file_, handle_.offset(),
block_size_with_trailer_, &slice_,
&io_s, for_compaction_)) {
ProcessTrailerIfPresent();
if (!io_status_.ok()) {
return true;

@ -330,7 +330,7 @@ Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
// for iterator, TryReadFromCache might do a readahead. Revisit to see if we
// need to pass a timeout at that point
if (prefetch_buffer == nullptr ||
!prefetch_buffer->TryReadFromCache(IOOptions(), read_offset,
!prefetch_buffer->TryReadFromCache(IOOptions(), file, read_offset,
Footer::kMaxEncodedLength,
&footer_input, nullptr)) {
if (file->use_direct_io()) {

@ -382,7 +382,11 @@ void GetContext::Merge(const Slice* value) {
bool GetContext::GetBlobValue(const Slice& blob_index,
PinnableSlice* blob_value) {
Status status = blob_fetcher_->FetchBlob(user_key_, blob_index, blob_value);
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
constexpr uint64_t* bytes_read = nullptr;
Status status = blob_fetcher_->FetchBlob(
user_key_, blob_index, prefetch_buffer, blob_value, bytes_read);
if (!status.ok()) {
if (status.IsIncomplete()) {
MarkKeyMayExist();

@ -96,8 +96,9 @@ Status SstFileDumper::GetTableReader(const std::string& file_path) {
file_.reset(new RandomAccessFileReader(std::move(file), file_path));
FilePrefetchBuffer prefetch_buffer(nullptr, 0, 0, true /* enable */,
false /* track_min_offset */);
FilePrefetchBuffer prefetch_buffer(
0 /* readahead_size */, 0 /* max_readahead_size */, true /* enable */,
false /* track_min_offset */);
if (s.ok()) {
const uint64_t kSstDumpTailPrefetchSize = 512 * 1024;
uint64_t prefetch_size = (file_size > kSstDumpTailPrefetchSize)

@ -4937,11 +4937,18 @@ TEST_F(BBTTailPrefetchTest, TestTailPrefetchStats) {
TEST_F(BBTTailPrefetchTest, FilePrefetchBufferMinOffset) {
TailPrefetchStats tpstats;
FilePrefetchBuffer buffer(nullptr, 0, 0, false, true);
FilePrefetchBuffer buffer(0 /* readahead_size */, 0 /* max_readahead_size */,
false /* enable */, true /* track_min_offset */);
IOOptions opts;
buffer.TryReadFromCache(opts, 500, 10, nullptr, nullptr);
buffer.TryReadFromCache(opts, 480, 10, nullptr, nullptr);
buffer.TryReadFromCache(opts, 490, 10, nullptr, nullptr);
buffer.TryReadFromCache(opts, nullptr /* reader */, 500 /* offset */,
10 /* n */, nullptr /* result */,
nullptr /* status */);
buffer.TryReadFromCache(opts, nullptr /* reader */, 480 /* offset */,
10 /* n */, nullptr /* result */,
nullptr /* status */);
buffer.TryReadFromCache(opts, nullptr /* reader */, 490 /* offset */,
10 /* n */, nullptr /* result */,
nullptr /* status */);
ASSERT_EQ(480, buffer.min_offset_read());
}

@ -446,6 +446,7 @@ void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, DBOptions& db_options,
uint_max + rnd->Uniform(10000);
cf_opt->min_blob_size = uint_max + rnd->Uniform(10000);
cf_opt->blob_file_size = uint_max + rnd->Uniform(10000);
cf_opt->blob_compaction_readahead_size = uint_max + rnd->Uniform(10000);
// unsigned int options
cf_opt->rate_limit_delay_max_milliseconds = rnd->Uniform(10000);

@ -1002,6 +1002,11 @@ DEFINE_double(blob_garbage_collection_force_threshold,
"[Integrated BlobDB] The threshold for the ratio of garbage in "
"the oldest blob files for forcing garbage collection.");
DEFINE_uint64(blob_compaction_readahead_size,
ROCKSDB_NAMESPACE::AdvancedColumnFamilyOptions()
.blob_compaction_readahead_size,
"[Integrated BlobDB] Compaction readahead for blob files.");
#ifndef ROCKSDB_LITE
// Secondary DB instance Options
@ -4365,6 +4370,8 @@ class Benchmark {
FLAGS_blob_garbage_collection_age_cutoff;
options.blob_garbage_collection_force_threshold =
FLAGS_blob_garbage_collection_force_threshold;
options.blob_compaction_readahead_size =
FLAGS_blob_compaction_readahead_size;
#ifndef ROCKSDB_LITE
if (FLAGS_readonly && FLAGS_transaction_db) {

@ -278,6 +278,7 @@ const std::string options_file_content = R"OPTIONS_FILE(
enable_blob_garbage_collection=true
blob_garbage_collection_age_cutoff=0.5
blob_garbage_collection_force_threshold=0.75
blob_compaction_readahead_size=262144
[TableOptions/BlockBasedTable "default"]
format_version=0

@ -294,6 +294,7 @@ blob_params = {
"enable_blob_garbage_collection": lambda: random.choice([0] + [1] * 3),
"blob_garbage_collection_age_cutoff": lambda: random.choice([0.0, 0.25, 0.5, 0.75, 1.0]),
"blob_garbage_collection_force_threshold": lambda: random.choice([0.5, 0.75, 1.0]),
"blob_compaction_readahead_size": lambda: random.choice([0, 1048576, 4194304]),
}
ts_params = {

Loading…
Cancel
Save