Allow readahead when reading option files. (#6372)

Summary:
Right, when reading from option files, no readahead is used and 8KB buffer is used. It might introduce high latency if the file system provide high latency and doesn't do readahead. Instead, introduce a readahead to the file. When calling inside DB, infer the value from options.log_readahead. Otherwise, a default 512KB readahead size is used.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6372

Test Plan: Add --log_readahead_size in db_bench. Run it with several options and observe read size from option files using strace.

Differential Revision: D19727739

fbshipit-source-id: e6d8053b0a64259abc087f1f388b9cd66fa8a583
main
sdong 5 years ago committed by Facebook Github Bot
parent b42fa1497f
commit 876c2dbff4
  1. 3
      HISTORY.md
  2. 5
      file/read_write_util.cc
  3. 3
      file/read_write_util.h
  4. 21
      options/options_parser.cc
  5. 4
      options/options_parser.h
  6. 86
      options/options_test.cc
  7. 10
      test_util/testutil.h
  8. 3
      tools/db_bench_tool.cc
  9. 5
      tools/trace_analyzer_test.cc
  10. 7
      tools/trace_analyzer_tool.cc
  11. 5
      util/file_reader_writer_test.cc
  12. 3
      utilities/options/options_util.cc

@ -10,6 +10,9 @@
* Add DBOptions::skip_checking_sst_file_sizes_on_db_open. It disables potentially expensive checking of all sst file sizes in DB::Open().
* BlobDB now ignores trivially moved files when updating the mapping between blob files and SSTs. This should mitigate issue #6338 where out of order flush/compaction notifications could trigger an assertion with the earlier code.
### Performance Improvements
* Perfom readahead when reading from option files. Inside DB, options.log_readahead_size will be used as the readahead size. In other cases, a default 512KB is used.
### Public API Change
* The BlobDB garbage collector now emits the statistics `BLOB_DB_GC_NUM_FILES` (number of blob files obsoleted during GC), `BLOB_DB_GC_NUM_NEW_FILES` (number of new blob files generated during GC), `BLOB_DB_GC_FAILURES` (number of failed GC passes), `BLOB_DB_GC_NUM_KEYS_RELOCATED` (number of blobs relocated during GC), and `BLOB_DB_GC_BYTES_RELOCATED` (total size of blobs relocated during GC). On the other hand, the following statistics, which are not relevant for the new GC implementation, are now deprecated: `BLOB_DB_GC_NUM_KEYS_OVERWRITTEN`, `BLOB_DB_GC_NUM_KEYS_EXPIRED`, `BLOB_DB_GC_BYTES_OVERWRITTEN`, `BLOB_DB_GC_BYTES_EXPIRED`, and `BLOB_DB_GC_MICROS`.
* Disable recycle_log_file_num when an inconsistent recovery modes are requested: kPointInTimeRecovery and kAbsoluteConsistency

@ -22,7 +22,7 @@ IOStatus NewWritableFile(FileSystem* fs, const std::string& fname,
return s;
}
bool ReadOneLine(std::istringstream* iss, FSSequentialFile* seq_file,
bool ReadOneLine(std::istringstream* iss, SequentialFileReader* seq_file_reader,
std::string* output, bool* has_data, Status* result) {
const int kBufferSize = 8192;
char buffer[kBufferSize + 1];
@ -40,8 +40,7 @@ bool ReadOneLine(std::istringstream* iss, FSSequentialFile* seq_file,
// if we're not sure whether we have a complete line,
// further read from the file.
if (*has_data) {
*result = seq_file->Read(kBufferSize, IOOptions(),
&input_slice, buffer, nullptr);
*result = seq_file_reader->Read(kBufferSize, &input_slice, buffer);
}
if (input_slice.size() == 0) {
// meaning we have read all the data

@ -9,6 +9,7 @@
#pragma once
#include <atomic>
#include "file/sequence_file_reader.h"
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"
@ -24,7 +25,7 @@ extern IOStatus NewWritableFile(FileSystem* fs, const std::string& fname,
const FileOptions& options);
// Read a single line from a file.
bool ReadOneLine(std::istringstream* iss, FSSequentialFile* seq_file,
bool ReadOneLine(std::istringstream* iss, SequentialFileReader* seq_file_reader,
std::string* output, bool* has_data, Status* result);
#ifndef NDEBUG

@ -203,7 +203,8 @@ Status RocksDBOptionsParser::ParseStatement(std::string* name,
}
Status RocksDBOptionsParser::Parse(const std::string& file_name, FileSystem* fs,
bool ignore_unknown_options) {
bool ignore_unknown_options,
size_t file_readahead_size) {
Reset();
std::unique_ptr<FSSequentialFile> seq_file;
@ -213,6 +214,9 @@ Status RocksDBOptionsParser::Parse(const std::string& file_name, FileSystem* fs,
return s;
}
SequentialFileReader sf_reader(std::move(seq_file), file_name,
file_readahead_size);
OptionSection section = kOptionSectionUnknown;
std::string title;
std::string argument;
@ -221,8 +225,8 @@ Status RocksDBOptionsParser::Parse(const std::string& file_name, FileSystem* fs,
std::string line;
bool has_data = true;
// we only support single-lined statement.
for (int line_num = 1;
ReadOneLine(&iss, seq_file.get(), &line, &has_data, &s); ++line_num) {
for (int line_num = 1; ReadOneLine(&iss, &sf_reader, &line, &has_data, &s);
++line_num) {
if (!s.ok()) {
return s;
}
@ -656,8 +660,17 @@ Status RocksDBOptionsParser::VerifyRocksDBOptionsFromFile(
const std::vector<ColumnFamilyOptions>& cf_opts,
const std::string& file_name, FileSystem* fs,
OptionsSanityCheckLevel sanity_check_level, bool ignore_unknown_options) {
// We infer option file readhead size from log readahead size.
// If it is not given, use 512KB.
size_t file_readahead_size = db_opt.log_readahead_size;
if (file_readahead_size == 0) {
const size_t kDefaultOptionFileReadAheadSize = 512 * 1024;
file_readahead_size = kDefaultOptionFileReadAheadSize;
}
RocksDBOptionsParser parser;
Status s = parser.Parse(file_name, fs, ignore_unknown_options);
Status s =
parser.Parse(file_name, fs, ignore_unknown_options, file_readahead_size);
if (!s.ok()) {
return s;
}

@ -48,8 +48,10 @@ class RocksDBOptionsParser {
~RocksDBOptionsParser() {}
void Reset();
// `file_readahead_size` is used for readahead for the option file.
// If 0 is given, a default value will be used.
Status Parse(const std::string& file_name, FileSystem* fs,
bool ignore_unknown_options = false);
bool ignore_unknown_options, size_t file_readahead_size);
static std::string TrimAndRemoveComment(const std::string& line,
const bool trim_only = false);

@ -1150,7 +1150,8 @@ TEST_F(OptionsParserTest, Comment) {
const std::string kTestFileName = "test-rocksdb-options.ini";
env_->WriteToNewFile(kTestFileName, options_file_content);
RocksDBOptionsParser parser;
ASSERT_OK(parser.Parse(kTestFileName, fs_.get()));
ASSERT_OK(
parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */));
ASSERT_OK(RocksDBOptionsParser::VerifyDBOptions(*parser.db_opt(), db_opt));
ASSERT_EQ(parser.NumColumnFamilies(), 1U);
@ -1176,7 +1177,8 @@ TEST_F(OptionsParserTest, ExtraSpace) {
const std::string kTestFileName = "test-rocksdb-options.ini";
env_->WriteToNewFile(kTestFileName, options_file_content);
RocksDBOptionsParser parser;
ASSERT_OK(parser.Parse(kTestFileName, fs_.get()));
ASSERT_OK(
parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */));
}
TEST_F(OptionsParserTest, MissingDBOptions) {
@ -1193,7 +1195,8 @@ TEST_F(OptionsParserTest, MissingDBOptions) {
const std::string kTestFileName = "test-rocksdb-options.ini";
env_->WriteToNewFile(kTestFileName, options_file_content);
RocksDBOptionsParser parser;
ASSERT_NOK(parser.Parse(kTestFileName, fs_.get()));
ASSERT_NOK(
parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */));
}
TEST_F(OptionsParserTest, DoubleDBOptions) {
@ -1221,7 +1224,8 @@ TEST_F(OptionsParserTest, DoubleDBOptions) {
const std::string kTestFileName = "test-rocksdb-options.ini";
env_->WriteToNewFile(kTestFileName, options_file_content);
RocksDBOptionsParser parser;
ASSERT_NOK(parser.Parse(kTestFileName, fs_.get()));
ASSERT_NOK(
parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */));
}
TEST_F(OptionsParserTest, NoDefaultCFOptions) {
@ -1248,7 +1252,8 @@ TEST_F(OptionsParserTest, NoDefaultCFOptions) {
const std::string kTestFileName = "test-rocksdb-options.ini";
env_->WriteToNewFile(kTestFileName, options_file_content);
RocksDBOptionsParser parser;
ASSERT_NOK(parser.Parse(kTestFileName, fs_.get()));
ASSERT_NOK(
parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */));
}
TEST_F(OptionsParserTest, DefaultCFOptionsMustBeTheFirst) {
@ -1277,7 +1282,8 @@ TEST_F(OptionsParserTest, DefaultCFOptionsMustBeTheFirst) {
const std::string kTestFileName = "test-rocksdb-options.ini";
env_->WriteToNewFile(kTestFileName, options_file_content);
RocksDBOptionsParser parser;
ASSERT_NOK(parser.Parse(kTestFileName, fs_.get()));
ASSERT_NOK(
parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */));
}
TEST_F(OptionsParserTest, DuplicateCFOptions) {
@ -1305,7 +1311,8 @@ TEST_F(OptionsParserTest, DuplicateCFOptions) {
const std::string kTestFileName = "test-rocksdb-options.ini";
env_->WriteToNewFile(kTestFileName, options_file_content);
RocksDBOptionsParser parser;
ASSERT_NOK(parser.Parse(kTestFileName, fs_.get()));
ASSERT_NOK(
parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */));
}
TEST_F(OptionsParserTest, IgnoreUnknownOptions) {
@ -1373,13 +1380,16 @@ TEST_F(OptionsParserTest, IgnoreUnknownOptions) {
env_->DeleteFile(kTestFileName);
env_->WriteToNewFile(kTestFileName, options_file_content);
RocksDBOptionsParser parser;
ASSERT_NOK(parser.Parse(kTestFileName, fs_.get()));
ASSERT_NOK(parser.Parse(kTestFileName, fs_.get(), false,
4096 /* readahead_size */));
if (should_ignore) {
ASSERT_OK(parser.Parse(kTestFileName, fs_.get(),
true /* ignore_unknown_options */));
true /* ignore_unknown_options */,
4096 /* readahead_size */));
} else {
ASSERT_NOK(parser.Parse(kTestFileName, fs_.get(),
true /* ignore_unknown_options */));
true /* ignore_unknown_options */,
4096 /* readahead_size */));
}
}
}
@ -1417,7 +1427,7 @@ TEST_F(OptionsParserTest, ParseVersion) {
parser.Reset();
env_->WriteToNewFile(iv, buffer);
ASSERT_NOK(parser.Parse(iv, fs_.get()));
ASSERT_NOK(parser.Parse(iv, fs_.get(), false, 0 /* readahead_size */));
}
const std::vector<std::string> valid_versions = {
@ -1426,7 +1436,7 @@ TEST_F(OptionsParserTest, ParseVersion) {
snprintf(buffer, kLength - 1, file_template.c_str(), vv.c_str());
parser.Reset();
env_->WriteToNewFile(vv, buffer);
ASSERT_OK(parser.Parse(vv, fs_.get()));
ASSERT_OK(parser.Parse(vv, fs_.get(), false, 0 /* readahead_size */));
}
}
@ -1518,6 +1528,52 @@ void VerifyCFPointerTypedOptions(
}
}
TEST_F(OptionsParserTest, Readahead) {
DBOptions base_db_opt;
std::vector<ColumnFamilyOptions> base_cf_opts;
base_cf_opts.emplace_back();
base_cf_opts.emplace_back();
std::string one_mb_string = std::string(1024 * 1024, 'x');
std::vector<std::string> cf_names = {"default", one_mb_string};
const std::string kOptionsFileName = "test-persisted-options.ini";
ASSERT_OK(PersistRocksDBOptions(base_db_opt, cf_names, base_cf_opts,
kOptionsFileName, fs_.get()));
uint64_t file_size;
ASSERT_OK(env_->GetFileSize(kOptionsFileName, &file_size));
RocksDBOptionsParser parser;
env_->num_seq_file_read_ = 0;
size_t readahead_size = 128 * 1024;
ASSERT_OK(parser.Parse(kOptionsFileName, fs_.get(), false, readahead_size));
ASSERT_EQ(env_->num_seq_file_read_.load(),
(file_size - 1) / readahead_size + 1);
env_->num_seq_file_read_.store(0);
readahead_size = 1024 * 1024;
ASSERT_OK(parser.Parse(kOptionsFileName, fs_.get(), false, readahead_size));
ASSERT_EQ(env_->num_seq_file_read_.load(),
(file_size - 1) / readahead_size + 1);
// Tiny readahead. 8 KB is read each time.
env_->num_seq_file_read_.store(0);
ASSERT_OK(
parser.Parse(kOptionsFileName, fs_.get(), false, 1 /* readahead_size */));
ASSERT_GE(env_->num_seq_file_read_.load(), file_size / (8 * 1024));
ASSERT_LT(env_->num_seq_file_read_.load(), file_size / (8 * 1024) * 2);
// Disable readahead means 512KB readahead.
env_->num_seq_file_read_.store(0);
ASSERT_OK(
parser.Parse(kOptionsFileName, fs_.get(), false, 0 /* readahead_size */));
ASSERT_GE(env_->num_seq_file_read_.load(),
(file_size - 1) / (512 * 1024) + 1);
}
TEST_F(OptionsParserTest, DumpAndParse) {
DBOptions base_db_opt;
std::vector<ColumnFamilyOptions> base_cf_opts;
@ -1554,7 +1610,8 @@ TEST_F(OptionsParserTest, DumpAndParse) {
kOptionsFileName, fs_.get()));
RocksDBOptionsParser parser;
ASSERT_OK(parser.Parse(kOptionsFileName, fs_.get()));
ASSERT_OK(
parser.Parse(kOptionsFileName, fs_.get(), false, 0 /* readahead_size */));
// Make sure block-based table factory options was deserialized correctly
std::shared_ptr<TableFactory> ttf = (*parser.cf_opts())[4].table_factory;
@ -1613,7 +1670,8 @@ TEST_F(OptionsParserTest, DifferentDefault) {
kOptionsFileName, fs_.get()));
RocksDBOptionsParser parser;
ASSERT_OK(parser.Parse(kOptionsFileName, fs_.get()));
ASSERT_OK(
parser.Parse(kOptionsFileName, fs_.get(), false, 0 /* readahead_size */));
{
Options old_default_opts;

@ -503,8 +503,8 @@ inline std::string EncodeInt(uint64_t x) {
class SeqStringSource : public SequentialFile {
public:
explicit SeqStringSource(const std::string& data)
: data_(data), offset_(0) {}
SeqStringSource(const std::string& data, std::atomic<int>* read_count)
: data_(data), offset_(0), read_count_(read_count) {}
~SeqStringSource() override {}
Status Read(size_t n, Slice* result, char* scratch) override {
std::string output;
@ -517,6 +517,7 @@ inline std::string EncodeInt(uint64_t x) {
return Status::InvalidArgument(
"Attemp to read when it already reached eof.");
}
(*read_count_)++;
return Status::OK();
}
Status Skip(uint64_t n) override {
@ -532,6 +533,7 @@ inline std::string EncodeInt(uint64_t x) {
private:
std::string data_;
size_t offset_;
std::atomic<int>* read_count_;
};
class StringEnv : public EnvWrapper {
@ -583,7 +585,7 @@ inline std::string EncodeInt(uint64_t x) {
if (iter == files_.end()) {
return Status::NotFound("The specified file does not exist", f);
}
r->reset(new SeqStringSource(iter->second));
r->reset(new SeqStringSource(iter->second, &num_seq_file_read_));
return Status::OK();
}
Status NewRandomAccessFile(const std::string& /*f*/,
@ -661,6 +663,8 @@ inline std::string EncodeInt(uint64_t x) {
return Status::NotSupported();
}
std::atomic<int> num_seq_file_read_;
protected:
std::unordered_map<std::string, std::string> files_;
};

@ -522,6 +522,8 @@ DEFINE_bool(new_table_reader_for_compaction_inputs, true,
DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size");
DEFINE_int32(log_readahead_size, 0, "WAL and manifest readahead size");
DEFINE_int32(random_access_max_buffer_size, 1024 * 1024,
"Maximum windows randomaccess buffer size");
@ -3490,6 +3492,7 @@ class Benchmark {
options.new_table_reader_for_compaction_inputs =
FLAGS_new_table_reader_for_compaction_inputs;
options.compaction_readahead_size = FLAGS_compaction_readahead_size;
options.log_readahead_size = FLAGS_log_readahead_size;
options.random_access_max_buffer_size = FLAGS_random_access_max_buffer_size;
options.writable_file_max_buffer_size = FLAGS_writable_file_max_buffer_size;
options.use_fsync = FLAGS_use_fsync;

@ -132,7 +132,10 @@ class TraceAnalyzerTest : public testing::Test {
Status s;
std::unique_ptr<FSSequentialFile> file =
NewLegacySequentialFileWrapper(f_ptr);
for (count = 0; ReadOneLine(&iss, file.get(), &get_line, &has_data, &s);
SequentialFileReader sf_reader(std::move(file), file_path,
4096 /* filereadahead_size */);
for (count = 0; ReadOneLine(&iss, &sf_reader, &get_line, &has_data, &s);
++count) {
ASSERT_OK(s);
result.push_back(get_line);

@ -1058,11 +1058,16 @@ Status TraceAnalyzer::ReProcessing() {
cf_id);
wkey_input_f.reset();
}
if (wkey_input_f) {
std::unique_ptr<FSSequentialFile> file;
file = NewLegacySequentialFileWrapper(wkey_input_f);
size_t kTraceFileReadaheadSize = 2 * 1024 * 1024;
SequentialFileReader sf_reader(
std::move(file), whole_key_path,
kTraceFileReadaheadSize /* filereadahead_size */);
for (cfs_[cf_id].w_count = 0;
ReadOneLine(&iss, file.get(), &get_key, &has_data, &s);
ReadOneLine(&iss, &sf_reader, &get_key, &has_data, &s);
++cfs_[cf_id].w_count) {
if (!s.ok()) {
fprintf(stderr, "Read whole key space file failed\n");

@ -348,8 +348,8 @@ class ReadaheadSequentialFileTest : public testing::Test,
}
void Skip(size_t n) { test_read_holder_->Skip(n); }
void ResetSourceStr(const std::string& str = "") {
auto read_holder =
std::unique_ptr<SequentialFile>(new test::SeqStringSource(str));
auto read_holder = std::unique_ptr<SequentialFile>(
new test::SeqStringSource(str, &seq_read_count_));
test_read_holder_.reset(new SequentialFileReader(
NewLegacySequentialFileWrapper(read_holder), "test", readahead_size_));
}
@ -359,6 +359,7 @@ class ReadaheadSequentialFileTest : public testing::Test,
size_t readahead_size_;
std::unique_ptr<SequentialFileReader> test_read_holder_;
std::unique_ptr<char[]> scratch_;
std::atomic<int> seq_read_count_;
};
TEST_P(ReadaheadSequentialFileTest, EmptySourceStr) {

@ -20,7 +20,8 @@ Status LoadOptionsFromFile(const std::string& file_name, Env* env,
std::shared_ptr<Cache>* cache) {
RocksDBOptionsParser parser;
LegacyFileSystemWrapper fs(env);
Status s = parser.Parse(file_name, &fs, ignore_unknown_options);
Status s = parser.Parse(file_name, &fs, ignore_unknown_options,
0 /* file_readahead_size */);
if (!s.ok()) {
return s;
}

Loading…
Cancel
Save