Added log_readahead_size option to control prefetching for Log::Reader (#5592)

Summary:
Added log_readahead_size option to control prefetching for Log::Reader.
This is mostly useful for reading a remotely located log, as it can reduce the number of round-trips needed to read it.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5592

Differential Revision: D16362989

Pulled By: elipoz

fbshipit-source-id: c5d4d5245a44008cd59879640efff70c091ad3e8
main
Eli Pozniansky 5 years ago committed by Facebook Github Bot
parent 6bb3b4b567
commit c129c75fb7
  1. 3
      db/db_impl/db_impl_open.cc
  2. 3
      db/db_impl/db_impl_secondary.cc
  3. 9
      db/version_set.cc
  4. 4
      include/rocksdb/env.h
  5. 11
      include/rocksdb/options.h
  6. 6
      options/db_options.cc
  7. 1
      options/db_options.h
  8. 5
      options/options_helper.cc
  9. 3
      options/options_settable_test.cc

@ -721,7 +721,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
continue; continue;
} }
} }
file_reader.reset(new SequentialFileReader(std::move(file), fname)); file_reader.reset(new SequentialFileReader(
std::move(file), fname, immutable_db_options_.log_readahead_size));
} }
// Create the log reader. // Create the log reader.

@ -150,7 +150,8 @@ Status DBImplSecondary::MaybeInitLogReader(
*log_reader = nullptr; *log_reader = nullptr;
return status; return status;
} }
file_reader.reset(new SequentialFileReader(std::move(file), fname)); file_reader.reset(new SequentialFileReader(
std::move(file), fname, immutable_db_options_.log_readahead_size));
} }
// Create the log reader. // Create the log reader.

@ -4267,7 +4267,8 @@ Status VersionSet::Recover(
return s; return s;
} }
manifest_file_reader.reset( manifest_file_reader.reset(
new SequentialFileReader(std::move(manifest_file), manifest_path)); new SequentialFileReader(std::move(manifest_file), manifest_path,
db_options_->log_readahead_size));
} }
uint64_t current_manifest_file_size; uint64_t current_manifest_file_size;
s = env_->GetFileSize(manifest_path, &current_manifest_file_size); s = env_->GetFileSize(manifest_path, &current_manifest_file_size);
@ -4597,7 +4598,8 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
file_reader.reset(new SequentialFileReader(std::move(file), dscname)); file_reader.reset(new SequentialFileReader(
std::move(file), dscname, db_options_->log_readahead_size));
} }
bool have_prev_log_number = false; bool have_prev_log_number = false;
@ -5721,7 +5723,8 @@ Status ReactiveVersionSet::MaybeSwitchManifest(
std::unique_ptr<SequentialFileReader> manifest_file_reader; std::unique_ptr<SequentialFileReader> manifest_file_reader;
if (s.ok()) { if (s.ok()) {
manifest_file_reader.reset( manifest_file_reader.reset(
new SequentialFileReader(std::move(manifest_file), manifest_path)); new SequentialFileReader(std::move(manifest_file), manifest_path,
db_options_->log_readahead_size));
manifest_reader->reset(new log::FragmentBufferedReader( manifest_reader->reset(new log::FragmentBufferedReader(
nullptr, std::move(manifest_file_reader), reporter, nullptr, std::move(manifest_file_reader), reporter,
true /* checksum */, 0 /* log_number */)); true /* checksum */, 0 /* log_number */));

@ -118,10 +118,10 @@ struct EnvOptions {
bool fallocate_with_keep_size = true; bool fallocate_with_keep_size = true;
// See DBOptions doc // See DBOptions doc
size_t compaction_readahead_size; size_t compaction_readahead_size = 0;
// See DBOptions doc // See DBOptions doc
size_t random_access_max_buffer_size; size_t random_access_max_buffer_size = 0;
// See DBOptions doc // See DBOptions doc
size_t writable_file_max_buffer_size = 1024 * 1024; size_t writable_file_max_buffer_size = 1024 * 1024;

@ -1087,6 +1087,17 @@ struct DBOptions {
// If set to true, takes precedence over // If set to true, takes precedence over
// ReadOptions::background_purge_on_iterator_cleanup. // ReadOptions::background_purge_on_iterator_cleanup.
bool avoid_unnecessary_blocking_io = false; bool avoid_unnecessary_blocking_io = false;
// The number of bytes to prefetch when reading the log. This is mostly
// useful for reading a remotely located log, as it can reduce the number
// of round-trips. If 0, then prefetching is disabled; if non-zero, bigger
// reads are performed when reading the log.
//
// Default: 0
size_t log_readahead_size = 0;
}; };
// Options to control the behavior of a database (passed to DB::Open) // Options to control the behavior of a database (passed to DB::Open)

@ -85,7 +85,8 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
manual_wal_flush(options.manual_wal_flush), manual_wal_flush(options.manual_wal_flush),
atomic_flush(options.atomic_flush), atomic_flush(options.atomic_flush),
avoid_unnecessary_blocking_io(options.avoid_unnecessary_blocking_io), avoid_unnecessary_blocking_io(options.avoid_unnecessary_blocking_io),
persist_stats_to_disk(options.persist_stats_to_disk) { persist_stats_to_disk(options.persist_stats_to_disk),
log_readahead_size(options.log_readahead_size) {
} }
void ImmutableDBOptions::Dump(Logger* log) const { void ImmutableDBOptions::Dump(Logger* log) const {
@ -225,6 +226,9 @@ void ImmutableDBOptions::Dump(Logger* log) const {
avoid_unnecessary_blocking_io); avoid_unnecessary_blocking_io);
ROCKS_LOG_HEADER(log, " Options.persist_stats_to_disk: %u", ROCKS_LOG_HEADER(log, " Options.persist_stats_to_disk: %u",
persist_stats_to_disk); persist_stats_to_disk);
ROCKS_LOG_HEADER(
log, " Options.log_readahead_size: %" ROCKSDB_PRIszt,
log_readahead_size);
} }
MutableDBOptions::MutableDBOptions() MutableDBOptions::MutableDBOptions()

@ -82,6 +82,7 @@ struct ImmutableDBOptions {
bool atomic_flush; bool atomic_flush;
bool avoid_unnecessary_blocking_io; bool avoid_unnecessary_blocking_io;
bool persist_stats_to_disk; bool persist_stats_to_disk;
size_t log_readahead_size;
}; };
struct MutableDBOptions { struct MutableDBOptions {

@ -138,7 +138,7 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
options.atomic_flush = immutable_db_options.atomic_flush; options.atomic_flush = immutable_db_options.atomic_flush;
options.avoid_unnecessary_blocking_io = options.avoid_unnecessary_blocking_io =
immutable_db_options.avoid_unnecessary_blocking_io; immutable_db_options.avoid_unnecessary_blocking_io;
options.log_readahead_size = immutable_db_options.log_readahead_size;
return options; return options;
} }
@ -1664,6 +1664,9 @@ std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct DBOptions, avoid_unnecessary_blocking_io), {offsetof(struct DBOptions, avoid_unnecessary_blocking_io),
OptionType::kBoolean, OptionVerificationType::kNormal, false, OptionType::kBoolean, OptionVerificationType::kNormal, false,
offsetof(struct ImmutableDBOptions, avoid_unnecessary_blocking_io)}}, offsetof(struct ImmutableDBOptions, avoid_unnecessary_blocking_io)}},
{"log_readahead_size",
{offsetof(struct DBOptions, log_readahead_size), OptionType::kSizeT,
OptionVerificationType::kNormal, false, 0}},
}; };
std::unordered_map<std::string, BlockBasedTableOptions::IndexType> std::unordered_map<std::string, BlockBasedTableOptions::IndexType>

@ -295,7 +295,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
"manual_wal_flush=false;" "manual_wal_flush=false;"
"seq_per_batch=false;" "seq_per_batch=false;"
"atomic_flush=false;" "atomic_flush=false;"
"avoid_unnecessary_blocking_io=false", "avoid_unnecessary_blocking_io=false;"
"log_readahead_size=0",
new_options)); new_options));
ASSERT_EQ(unset_bytes_base, NumUnsetBytes(new_options_ptr, sizeof(DBOptions), ASSERT_EQ(unset_bytes_base, NumUnsetBytes(new_options_ptr, sizeof(DBOptions),

Loading…
Cancel
Save