From 876c2dbff4f68f884ee1fda2e8e07ca1b3aa9cf2 Mon Sep 17 00:00:00 2001 From: sdong Date: Fri, 7 Feb 2020 15:16:29 -0800 Subject: [PATCH] Allow readahead when reading option files. (#6372) Summary: Right, when reading from option files, no readahead is used and 8KB buffer is used. It might introduce high latency if the file system provide high latency and doesn't do readahead. Instead, introduce a readahead to the file. When calling inside DB, infer the value from options.log_readahead. Otherwise, a default 512KB readahead size is used. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6372 Test Plan: Add --log_readahead_size in db_bench. Run it with several options and observe read size from option files using strace. Differential Revision: D19727739 fbshipit-source-id: e6d8053b0a64259abc087f1f388b9cd66fa8a583 --- HISTORY.md | 3 ++ file/read_write_util.cc | 5 +- file/read_write_util.h | 3 +- options/options_parser.cc | 21 ++++++-- options/options_parser.h | 4 +- options/options_test.cc | 86 ++++++++++++++++++++++++++----- test_util/testutil.h | 10 ++-- tools/db_bench_tool.cc | 3 ++ tools/trace_analyzer_test.cc | 5 +- tools/trace_analyzer_tool.cc | 7 ++- util/file_reader_writer_test.cc | 5 +- utilities/options/options_util.cc | 3 +- 12 files changed, 124 insertions(+), 31 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 0dde06d15..01d3f2e3a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -10,6 +10,9 @@ * Add DBOptions::skip_checking_sst_file_sizes_on_db_open. It disables potentially expensive checking of all sst file sizes in DB::Open(). * BlobDB now ignores trivially moved files when updating the mapping between blob files and SSTs. This should mitigate issue #6338 where out of order flush/compaction notifications could trigger an assertion with the earlier code. +### Performance Improvements +* Perfom readahead when reading from option files. Inside DB, options.log_readahead_size will be used as the readahead size. In other cases, a default 512KB is used. + ### Public API Change * The BlobDB garbage collector now emits the statistics `BLOB_DB_GC_NUM_FILES` (number of blob files obsoleted during GC), `BLOB_DB_GC_NUM_NEW_FILES` (number of new blob files generated during GC), `BLOB_DB_GC_FAILURES` (number of failed GC passes), `BLOB_DB_GC_NUM_KEYS_RELOCATED` (number of blobs relocated during GC), and `BLOB_DB_GC_BYTES_RELOCATED` (total size of blobs relocated during GC). On the other hand, the following statistics, which are not relevant for the new GC implementation, are now deprecated: `BLOB_DB_GC_NUM_KEYS_OVERWRITTEN`, `BLOB_DB_GC_NUM_KEYS_EXPIRED`, `BLOB_DB_GC_BYTES_OVERWRITTEN`, `BLOB_DB_GC_BYTES_EXPIRED`, and `BLOB_DB_GC_MICROS`. * Disable recycle_log_file_num when an inconsistent recovery modes are requested: kPointInTimeRecovery and kAbsoluteConsistency diff --git a/file/read_write_util.cc b/file/read_write_util.cc index de39008ba..c092fb1be 100644 --- a/file/read_write_util.cc +++ b/file/read_write_util.cc @@ -22,7 +22,7 @@ IOStatus NewWritableFile(FileSystem* fs, const std::string& fname, return s; } -bool ReadOneLine(std::istringstream* iss, FSSequentialFile* seq_file, +bool ReadOneLine(std::istringstream* iss, SequentialFileReader* seq_file_reader, std::string* output, bool* has_data, Status* result) { const int kBufferSize = 8192; char buffer[kBufferSize + 1]; @@ -40,8 +40,7 @@ bool ReadOneLine(std::istringstream* iss, FSSequentialFile* seq_file, // if we're not sure whether we have a complete line, // further read from the file. if (*has_data) { - *result = seq_file->Read(kBufferSize, IOOptions(), - &input_slice, buffer, nullptr); + *result = seq_file_reader->Read(kBufferSize, &input_slice, buffer); } if (input_slice.size() == 0) { // meaning we have read all the data diff --git a/file/read_write_util.h b/file/read_write_util.h index 3abb3388e..1f315f032 100644 --- a/file/read_write_util.h +++ b/file/read_write_util.h @@ -9,6 +9,7 @@ #pragma once #include +#include "file/sequence_file_reader.h" #include "rocksdb/env.h" #include "rocksdb/file_system.h" @@ -24,7 +25,7 @@ extern IOStatus NewWritableFile(FileSystem* fs, const std::string& fname, const FileOptions& options); // Read a single line from a file. -bool ReadOneLine(std::istringstream* iss, FSSequentialFile* seq_file, +bool ReadOneLine(std::istringstream* iss, SequentialFileReader* seq_file_reader, std::string* output, bool* has_data, Status* result); #ifndef NDEBUG diff --git a/options/options_parser.cc b/options/options_parser.cc index 53bc56760..f133aab5e 100644 --- a/options/options_parser.cc +++ b/options/options_parser.cc @@ -203,7 +203,8 @@ Status RocksDBOptionsParser::ParseStatement(std::string* name, } Status RocksDBOptionsParser::Parse(const std::string& file_name, FileSystem* fs, - bool ignore_unknown_options) { + bool ignore_unknown_options, + size_t file_readahead_size) { Reset(); std::unique_ptr seq_file; @@ -213,6 +214,9 @@ Status RocksDBOptionsParser::Parse(const std::string& file_name, FileSystem* fs, return s; } + SequentialFileReader sf_reader(std::move(seq_file), file_name, + file_readahead_size); + OptionSection section = kOptionSectionUnknown; std::string title; std::string argument; @@ -221,8 +225,8 @@ Status RocksDBOptionsParser::Parse(const std::string& file_name, FileSystem* fs, std::string line; bool has_data = true; // we only support single-lined statement. - for (int line_num = 1; - ReadOneLine(&iss, seq_file.get(), &line, &has_data, &s); ++line_num) { + for (int line_num = 1; ReadOneLine(&iss, &sf_reader, &line, &has_data, &s); + ++line_num) { if (!s.ok()) { return s; } @@ -656,8 +660,17 @@ Status RocksDBOptionsParser::VerifyRocksDBOptionsFromFile( const std::vector& cf_opts, const std::string& file_name, FileSystem* fs, OptionsSanityCheckLevel sanity_check_level, bool ignore_unknown_options) { + // We infer option file readhead size from log readahead size. + // If it is not given, use 512KB. + size_t file_readahead_size = db_opt.log_readahead_size; + if (file_readahead_size == 0) { + const size_t kDefaultOptionFileReadAheadSize = 512 * 1024; + file_readahead_size = kDefaultOptionFileReadAheadSize; + } + RocksDBOptionsParser parser; - Status s = parser.Parse(file_name, fs, ignore_unknown_options); + Status s = + parser.Parse(file_name, fs, ignore_unknown_options, file_readahead_size); if (!s.ok()) { return s; } diff --git a/options/options_parser.h b/options/options_parser.h index b675b28fe..13601d967 100644 --- a/options/options_parser.h +++ b/options/options_parser.h @@ -48,8 +48,10 @@ class RocksDBOptionsParser { ~RocksDBOptionsParser() {} void Reset(); + // `file_readahead_size` is used for readahead for the option file. + // If 0 is given, a default value will be used. Status Parse(const std::string& file_name, FileSystem* fs, - bool ignore_unknown_options = false); + bool ignore_unknown_options, size_t file_readahead_size); static std::string TrimAndRemoveComment(const std::string& line, const bool trim_only = false); diff --git a/options/options_test.cc b/options/options_test.cc index ceffc483a..4fd6c9e9a 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -1150,7 +1150,8 @@ TEST_F(OptionsParserTest, Comment) { const std::string kTestFileName = "test-rocksdb-options.ini"; env_->WriteToNewFile(kTestFileName, options_file_content); RocksDBOptionsParser parser; - ASSERT_OK(parser.Parse(kTestFileName, fs_.get())); + ASSERT_OK( + parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); ASSERT_OK(RocksDBOptionsParser::VerifyDBOptions(*parser.db_opt(), db_opt)); ASSERT_EQ(parser.NumColumnFamilies(), 1U); @@ -1176,7 +1177,8 @@ TEST_F(OptionsParserTest, ExtraSpace) { const std::string kTestFileName = "test-rocksdb-options.ini"; env_->WriteToNewFile(kTestFileName, options_file_content); RocksDBOptionsParser parser; - ASSERT_OK(parser.Parse(kTestFileName, fs_.get())); + ASSERT_OK( + parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); } TEST_F(OptionsParserTest, MissingDBOptions) { @@ -1193,7 +1195,8 @@ TEST_F(OptionsParserTest, MissingDBOptions) { const std::string kTestFileName = "test-rocksdb-options.ini"; env_->WriteToNewFile(kTestFileName, options_file_content); RocksDBOptionsParser parser; - ASSERT_NOK(parser.Parse(kTestFileName, fs_.get())); + ASSERT_NOK( + parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); } TEST_F(OptionsParserTest, DoubleDBOptions) { @@ -1221,7 +1224,8 @@ TEST_F(OptionsParserTest, DoubleDBOptions) { const std::string kTestFileName = "test-rocksdb-options.ini"; env_->WriteToNewFile(kTestFileName, options_file_content); RocksDBOptionsParser parser; - ASSERT_NOK(parser.Parse(kTestFileName, fs_.get())); + ASSERT_NOK( + parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); } TEST_F(OptionsParserTest, NoDefaultCFOptions) { @@ -1248,7 +1252,8 @@ TEST_F(OptionsParserTest, NoDefaultCFOptions) { const std::string kTestFileName = "test-rocksdb-options.ini"; env_->WriteToNewFile(kTestFileName, options_file_content); RocksDBOptionsParser parser; - ASSERT_NOK(parser.Parse(kTestFileName, fs_.get())); + ASSERT_NOK( + parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); } TEST_F(OptionsParserTest, DefaultCFOptionsMustBeTheFirst) { @@ -1277,7 +1282,8 @@ TEST_F(OptionsParserTest, DefaultCFOptionsMustBeTheFirst) { const std::string kTestFileName = "test-rocksdb-options.ini"; env_->WriteToNewFile(kTestFileName, options_file_content); RocksDBOptionsParser parser; - ASSERT_NOK(parser.Parse(kTestFileName, fs_.get())); + ASSERT_NOK( + parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); } TEST_F(OptionsParserTest, DuplicateCFOptions) { @@ -1305,7 +1311,8 @@ TEST_F(OptionsParserTest, DuplicateCFOptions) { const std::string kTestFileName = "test-rocksdb-options.ini"; env_->WriteToNewFile(kTestFileName, options_file_content); RocksDBOptionsParser parser; - ASSERT_NOK(parser.Parse(kTestFileName, fs_.get())); + ASSERT_NOK( + parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); } TEST_F(OptionsParserTest, IgnoreUnknownOptions) { @@ -1373,13 +1380,16 @@ TEST_F(OptionsParserTest, IgnoreUnknownOptions) { env_->DeleteFile(kTestFileName); env_->WriteToNewFile(kTestFileName, options_file_content); RocksDBOptionsParser parser; - ASSERT_NOK(parser.Parse(kTestFileName, fs_.get())); + ASSERT_NOK(parser.Parse(kTestFileName, fs_.get(), false, + 4096 /* readahead_size */)); if (should_ignore) { ASSERT_OK(parser.Parse(kTestFileName, fs_.get(), - true /* ignore_unknown_options */)); + true /* ignore_unknown_options */, + 4096 /* readahead_size */)); } else { ASSERT_NOK(parser.Parse(kTestFileName, fs_.get(), - true /* ignore_unknown_options */)); + true /* ignore_unknown_options */, + 4096 /* readahead_size */)); } } } @@ -1417,7 +1427,7 @@ TEST_F(OptionsParserTest, ParseVersion) { parser.Reset(); env_->WriteToNewFile(iv, buffer); - ASSERT_NOK(parser.Parse(iv, fs_.get())); + ASSERT_NOK(parser.Parse(iv, fs_.get(), false, 0 /* readahead_size */)); } const std::vector valid_versions = { @@ -1426,7 +1436,7 @@ TEST_F(OptionsParserTest, ParseVersion) { snprintf(buffer, kLength - 1, file_template.c_str(), vv.c_str()); parser.Reset(); env_->WriteToNewFile(vv, buffer); - ASSERT_OK(parser.Parse(vv, fs_.get())); + ASSERT_OK(parser.Parse(vv, fs_.get(), false, 0 /* readahead_size */)); } } @@ -1518,6 +1528,52 @@ void VerifyCFPointerTypedOptions( } } +TEST_F(OptionsParserTest, Readahead) { + DBOptions base_db_opt; + std::vector base_cf_opts; + base_cf_opts.emplace_back(); + base_cf_opts.emplace_back(); + + std::string one_mb_string = std::string(1024 * 1024, 'x'); + std::vector cf_names = {"default", one_mb_string}; + const std::string kOptionsFileName = "test-persisted-options.ini"; + + ASSERT_OK(PersistRocksDBOptions(base_db_opt, cf_names, base_cf_opts, + kOptionsFileName, fs_.get())); + + uint64_t file_size; + ASSERT_OK(env_->GetFileSize(kOptionsFileName, &file_size)); + + RocksDBOptionsParser parser; + + env_->num_seq_file_read_ = 0; + size_t readahead_size = 128 * 1024; + + ASSERT_OK(parser.Parse(kOptionsFileName, fs_.get(), false, readahead_size)); + ASSERT_EQ(env_->num_seq_file_read_.load(), + (file_size - 1) / readahead_size + 1); + + env_->num_seq_file_read_.store(0); + readahead_size = 1024 * 1024; + ASSERT_OK(parser.Parse(kOptionsFileName, fs_.get(), false, readahead_size)); + ASSERT_EQ(env_->num_seq_file_read_.load(), + (file_size - 1) / readahead_size + 1); + + // Tiny readahead. 8 KB is read each time. + env_->num_seq_file_read_.store(0); + ASSERT_OK( + parser.Parse(kOptionsFileName, fs_.get(), false, 1 /* readahead_size */)); + ASSERT_GE(env_->num_seq_file_read_.load(), file_size / (8 * 1024)); + ASSERT_LT(env_->num_seq_file_read_.load(), file_size / (8 * 1024) * 2); + + // Disable readahead means 512KB readahead. + env_->num_seq_file_read_.store(0); + ASSERT_OK( + parser.Parse(kOptionsFileName, fs_.get(), false, 0 /* readahead_size */)); + ASSERT_GE(env_->num_seq_file_read_.load(), + (file_size - 1) / (512 * 1024) + 1); +} + TEST_F(OptionsParserTest, DumpAndParse) { DBOptions base_db_opt; std::vector base_cf_opts; @@ -1554,7 +1610,8 @@ TEST_F(OptionsParserTest, DumpAndParse) { kOptionsFileName, fs_.get())); RocksDBOptionsParser parser; - ASSERT_OK(parser.Parse(kOptionsFileName, fs_.get())); + ASSERT_OK( + parser.Parse(kOptionsFileName, fs_.get(), false, 0 /* readahead_size */)); // Make sure block-based table factory options was deserialized correctly std::shared_ptr ttf = (*parser.cf_opts())[4].table_factory; @@ -1613,7 +1670,8 @@ TEST_F(OptionsParserTest, DifferentDefault) { kOptionsFileName, fs_.get())); RocksDBOptionsParser parser; - ASSERT_OK(parser.Parse(kOptionsFileName, fs_.get())); + ASSERT_OK( + parser.Parse(kOptionsFileName, fs_.get(), false, 0 /* readahead_size */)); { Options old_default_opts; diff --git a/test_util/testutil.h b/test_util/testutil.h index 996649c1b..5fec97e02 100644 --- a/test_util/testutil.h +++ b/test_util/testutil.h @@ -503,8 +503,8 @@ inline std::string EncodeInt(uint64_t x) { class SeqStringSource : public SequentialFile { public: - explicit SeqStringSource(const std::string& data) - : data_(data), offset_(0) {} + SeqStringSource(const std::string& data, std::atomic* read_count) + : data_(data), offset_(0), read_count_(read_count) {} ~SeqStringSource() override {} Status Read(size_t n, Slice* result, char* scratch) override { std::string output; @@ -517,6 +517,7 @@ inline std::string EncodeInt(uint64_t x) { return Status::InvalidArgument( "Attemp to read when it already reached eof."); } + (*read_count_)++; return Status::OK(); } Status Skip(uint64_t n) override { @@ -532,6 +533,7 @@ inline std::string EncodeInt(uint64_t x) { private: std::string data_; size_t offset_; + std::atomic* read_count_; }; class StringEnv : public EnvWrapper { @@ -583,7 +585,7 @@ inline std::string EncodeInt(uint64_t x) { if (iter == files_.end()) { return Status::NotFound("The specified file does not exist", f); } - r->reset(new SeqStringSource(iter->second)); + r->reset(new SeqStringSource(iter->second, &num_seq_file_read_)); return Status::OK(); } Status NewRandomAccessFile(const std::string& /*f*/, @@ -661,6 +663,8 @@ inline std::string EncodeInt(uint64_t x) { return Status::NotSupported(); } + std::atomic num_seq_file_read_; + protected: std::unordered_map files_; }; diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index f0c447262..d9e6c1af9 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -522,6 +522,8 @@ DEFINE_bool(new_table_reader_for_compaction_inputs, true, DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size"); +DEFINE_int32(log_readahead_size, 0, "WAL and manifest readahead size"); + DEFINE_int32(random_access_max_buffer_size, 1024 * 1024, "Maximum windows randomaccess buffer size"); @@ -3490,6 +3492,7 @@ class Benchmark { options.new_table_reader_for_compaction_inputs = FLAGS_new_table_reader_for_compaction_inputs; options.compaction_readahead_size = FLAGS_compaction_readahead_size; + options.log_readahead_size = FLAGS_log_readahead_size; options.random_access_max_buffer_size = FLAGS_random_access_max_buffer_size; options.writable_file_max_buffer_size = FLAGS_writable_file_max_buffer_size; options.use_fsync = FLAGS_use_fsync; diff --git a/tools/trace_analyzer_test.cc b/tools/trace_analyzer_test.cc index 1ef100fe4..e453a4e9d 100644 --- a/tools/trace_analyzer_test.cc +++ b/tools/trace_analyzer_test.cc @@ -132,7 +132,10 @@ class TraceAnalyzerTest : public testing::Test { Status s; std::unique_ptr file = NewLegacySequentialFileWrapper(f_ptr); - for (count = 0; ReadOneLine(&iss, file.get(), &get_line, &has_data, &s); + SequentialFileReader sf_reader(std::move(file), file_path, + 4096 /* filereadahead_size */); + + for (count = 0; ReadOneLine(&iss, &sf_reader, &get_line, &has_data, &s); ++count) { ASSERT_OK(s); result.push_back(get_line); diff --git a/tools/trace_analyzer_tool.cc b/tools/trace_analyzer_tool.cc index 27d097c2c..915f2727d 100644 --- a/tools/trace_analyzer_tool.cc +++ b/tools/trace_analyzer_tool.cc @@ -1058,11 +1058,16 @@ Status TraceAnalyzer::ReProcessing() { cf_id); wkey_input_f.reset(); } + if (wkey_input_f) { std::unique_ptr file; file = NewLegacySequentialFileWrapper(wkey_input_f); + size_t kTraceFileReadaheadSize = 2 * 1024 * 1024; + SequentialFileReader sf_reader( + std::move(file), whole_key_path, + kTraceFileReadaheadSize /* filereadahead_size */); for (cfs_[cf_id].w_count = 0; - ReadOneLine(&iss, file.get(), &get_key, &has_data, &s); + ReadOneLine(&iss, &sf_reader, &get_key, &has_data, &s); ++cfs_[cf_id].w_count) { if (!s.ok()) { fprintf(stderr, "Read whole key space file failed\n"); diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index 92f1004bc..7192c19f4 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -348,8 +348,8 @@ class ReadaheadSequentialFileTest : public testing::Test, } void Skip(size_t n) { test_read_holder_->Skip(n); } void ResetSourceStr(const std::string& str = "") { - auto read_holder = - std::unique_ptr(new test::SeqStringSource(str)); + auto read_holder = std::unique_ptr( + new test::SeqStringSource(str, &seq_read_count_)); test_read_holder_.reset(new SequentialFileReader( NewLegacySequentialFileWrapper(read_holder), "test", readahead_size_)); } @@ -359,6 +359,7 @@ class ReadaheadSequentialFileTest : public testing::Test, size_t readahead_size_; std::unique_ptr test_read_holder_; std::unique_ptr scratch_; + std::atomic seq_read_count_; }; TEST_P(ReadaheadSequentialFileTest, EmptySourceStr) { diff --git a/utilities/options/options_util.cc b/utilities/options/options_util.cc index 53acf7bc1..fb5b3c2e7 100644 --- a/utilities/options/options_util.cc +++ b/utilities/options/options_util.cc @@ -20,7 +20,8 @@ Status LoadOptionsFromFile(const std::string& file_name, Env* env, std::shared_ptr* cache) { RocksDBOptionsParser parser; LegacyFileSystemWrapper fs(env); - Status s = parser.Parse(file_name, &fs, ignore_unknown_options); + Status s = parser.Parse(file_name, &fs, ignore_unknown_options, + 0 /* file_readahead_size */); if (!s.ok()) { return s; }