diff --git a/CMakeLists.txt b/CMakeLists.txt index f499ecd99..77b6f310c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -653,6 +653,7 @@ set(SOURCES file/file_prefetch_buffer.cc file/file_util.cc file/filename.cc + file/line_file_reader.cc file/random_access_file_reader.cc file/read_write_util.cc file/readahead_raf.cc diff --git a/TARGETS b/TARGETS index c2f658eae..16356a862 100644 --- a/TARGETS +++ b/TARGETS @@ -222,6 +222,7 @@ cpp_library( "file/file_prefetch_buffer.cc", "file/file_util.cc", "file/filename.cc", + "file/line_file_reader.cc", "file/random_access_file_reader.cc", "file/read_write_util.cc", "file/readahead_raf.cc", @@ -528,6 +529,7 @@ cpp_library( "file/file_prefetch_buffer.cc", "file/file_util.cc", "file/filename.cc", + "file/line_file_reader.cc", "file/random_access_file_reader.cc", "file/read_write_util.cc", "file/readahead_raf.cc", diff --git a/env/mock_env.cc b/env/mock_env.cc index d83e78ffc..9a917e21a 100644 --- a/env/mock_env.cc +++ b/env/mock_env.cc @@ -15,6 +15,7 @@ #include "file/filename.h" #include "port/sys_time.h" #include "rocksdb/file_system.h" +#include "test_util/sync_point.h" #include "util/cast_util.h" #include "util/hash.h" #include "util/random.h" @@ -105,6 +106,15 @@ class MemFile { IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*options*/, Slice* result, char* scratch, IODebugContext* /*dbg*/) const { + { + IOStatus s; + TEST_SYNC_POINT_CALLBACK("MemFile::Read:IOStatus", &s); + if (!s.ok()) { + // with sync point only + *result = Slice(); + return s; + } + } MutexLock lock(&mutex_); const uint64_t available = Size() - std::min(Size(), offset); size_t offset_ = static_cast(offset); diff --git a/file/line_file_reader.cc b/file/line_file_reader.cc new file mode 100644 index 000000000..8a56a09b2 --- /dev/null +++ b/file/line_file_reader.cc @@ -0,0 +1,65 @@ +// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "file/line_file_reader.h" + +#include + +namespace ROCKSDB_NAMESPACE { + +Status LineFileReader::Create(const std::shared_ptr& fs, + const std::string& fname, + const FileOptions& file_opts, + std::unique_ptr* reader, + IODebugContext* dbg) { + std::unique_ptr file; + Status s = fs->NewSequentialFile(fname, file_opts, &file, dbg); + if (s.ok()) { + reader->reset(new LineFileReader(std::move(file), fname)); + } + return s; +} + +bool LineFileReader::ReadLine(std::string* out) { + assert(out); + if (!status_.ok()) { + // Status should be checked (or permit unchecked) any time we return false. + status_.MustCheck(); + return false; + } + out->clear(); + for (;;) { + // Look for line delimiter + const char* found = static_cast( + std::memchr(buf_begin_, '\n', buf_end_ - buf_begin_)); + if (found) { + size_t len = found - buf_begin_; + out->append(buf_begin_, len); + buf_begin_ += len + /*delim*/ 1; + ++line_number_; + return true; + } + if (at_eof_) { + status_.MustCheck(); + return false; + } + // else flush and reload buffer + out->append(buf_begin_, buf_end_ - buf_begin_); + Slice result; + status_ = sfr_.Read(buf_.size(), &result, buf_.data()); + if (!status_.ok()) { + status_.MustCheck(); + return false; + } + if (result.size() != buf_.size()) { + // The obscure way of indicating EOF + at_eof_ = true; + } + buf_begin_ = result.data(); + buf_end_ = result.data() + result.size(); + } +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/file/line_file_reader.h b/file/line_file_reader.h new file mode 100644 index 000000000..48d79f327 --- /dev/null +++ b/file/line_file_reader.h @@ -0,0 +1,59 @@ +// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#include + +#include "file/sequence_file_reader.h" + +namespace ROCKSDB_NAMESPACE { + +// A wrapper on top of Env::SequentialFile for reading text lines from a file. +// Lines are delimited by '\n'. The last line may or may not include a +// trailing newline. Uses SequentialFileReader internally. +class LineFileReader { + private: + std::array buf_; + SequentialFileReader sfr_; + Status status_; + const char* buf_begin_ = buf_.data(); + const char* buf_end_ = buf_.data(); + size_t line_number_ = 0; + bool at_eof_ = false; + + public: + // See SequentialFileReader constructors + template + explicit LineFileReader(Args&&... args) + : sfr_(std::forward(args)...) {} + + static Status Create(const std::shared_ptr& fs, + const std::string& fname, const FileOptions& file_opts, + std::unique_ptr* reader, + IODebugContext* dbg); + + LineFileReader(const LineFileReader&) = delete; + LineFileReader& operator=(const LineFileReader&) = delete; + + // Reads another line from the file, returning true on success and saving + // the line to `out`, without delimiter, or returning false on failure. You + // must check GetStatus() to determine whether the failure was just + // end-of-file (OK status) or an I/O error (another status). + bool ReadLine(std::string* out); + + // Returns the number of the line most recently returned from ReadLine. + // Return value is unspecified if ReadLine has returned false due to + // I/O error. After ReadLine returns false due to end-of-file, return + // value is the last returned line number, or equivalently the total + // number of lines returned. + size_t GetLineNumber() const { return line_number_; } + + // Returns any error encountered during read. The error is considered + // permanent and no retry or recovery is attempted with the same + // LineFileReader. + const Status& GetStatus() const { return status_; } +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/file/read_write_util.cc b/file/read_write_util.cc index b4854e110..6b9379c42 100644 --- a/file/read_write_util.cc +++ b/file/read_write_util.cc @@ -22,43 +22,6 @@ IOStatus NewWritableFile(FileSystem* fs, const std::string& fname, return s; } -bool ReadOneLine(std::istringstream* iss, SequentialFileReader* seq_file_reader, - std::string* output, bool* has_data, Status* result) { - const int kBufferSize = 8192; - char buffer[kBufferSize + 1]; - Slice input_slice; - - std::string line; - bool has_complete_line = false; - while (!has_complete_line) { - if (std::getline(*iss, line)) { - has_complete_line = !iss->eof(); - } else { - has_complete_line = false; - } - if (!has_complete_line) { - // if we're not sure whether we have a complete line, - // further read from the file. - if (*has_data) { - *result = seq_file_reader->Read(kBufferSize, &input_slice, buffer); - } - if (input_slice.size() == 0) { - // meaning we have read all the data - *has_data = false; - break; - } else { - iss->str(line + input_slice.ToString()); - // reset the internal state of iss so that we can keep reading it. - iss->clear(); - *has_data = (input_slice.size() == kBufferSize); - continue; - } - } - } - *output = line; - return *has_data || has_complete_line; -} - #ifndef NDEBUG bool IsFileSectorAligned(const size_t off, size_t sector_size) { return off % sector_size == 0; diff --git a/file/read_write_util.h b/file/read_write_util.h index 22f4076b3..718135c98 100644 --- a/file/read_write_util.h +++ b/file/read_write_util.h @@ -24,10 +24,6 @@ extern IOStatus NewWritableFile(FileSystem* fs, const std::string& fname, std::unique_ptr* result, const FileOptions& options); -// Read a single line from a file. -bool ReadOneLine(std::istringstream* iss, SequentialFileReader* seq_file_reader, - std::string* output, bool* has_data, Status* result); - #ifndef NDEBUG bool IsFileSectorAligned(const size_t off, size_t sector_size); #endif // NDEBUG diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index bcc55e4fd..235cea15d 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -65,9 +65,11 @@ class Status { // In case of intentionally swallowing an error, user must explicitly call // this function. That way we are easily able to search the code to find where // error swallowing occurs. - void PermitUncheckedError() const { + inline void PermitUncheckedError() const { MarkChecked(); } + + inline void MustCheck() const { #ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; + checked_ = false; #endif // ROCKSDB_ASSERT_STATUS_CHECKED } @@ -92,9 +94,7 @@ class Status { }; Code code() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return code_; } @@ -118,9 +118,7 @@ class Status { }; SubCode subcode() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return subcode_; } @@ -135,17 +133,13 @@ class Status { Status(const Status& s, Severity sev); Severity severity() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return sev_; } // Returns a C style string indicating the message of the Status const char* getState() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return state_; } @@ -289,127 +283,95 @@ class Status { // Returns true iff the status indicates success. bool ok() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return code() == kOk; } // Returns true iff the status indicates success *with* something // overwritten bool IsOkOverwritten() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return code() == kOk && subcode() == kOverwritten; } // Returns true iff the status indicates a NotFound error. bool IsNotFound() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return code() == kNotFound; } // Returns true iff the status indicates a Corruption error. bool IsCorruption() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return code() == kCorruption; } // Returns true iff the status indicates a NotSupported error. bool IsNotSupported() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return code() == kNotSupported; } // Returns true iff the status indicates an InvalidArgument error. bool IsInvalidArgument() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return code() == kInvalidArgument; } // Returns true iff the status indicates an IOError. bool IsIOError() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return code() == kIOError; } // Returns true iff the status indicates an MergeInProgress. bool IsMergeInProgress() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return code() == kMergeInProgress; } // Returns true iff the status indicates Incomplete bool IsIncomplete() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return code() == kIncomplete; } // Returns true iff the status indicates Shutdown In progress bool IsShutdownInProgress() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return code() == kShutdownInProgress; } bool IsTimedOut() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return code() == kTimedOut; } bool IsAborted() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return code() == kAborted; } bool IsLockLimit() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return code() == kAborted && subcode() == kLockLimit; } // Returns true iff the status indicates that a resource is Busy and // temporarily could not be acquired. bool IsBusy() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return code() == kBusy; } bool IsDeadlock() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return code() == kBusy && subcode() == kDeadlock; } // Returns true iff the status indicated that the operation has Expired. bool IsExpired() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return code() == kExpired; } @@ -417,25 +379,19 @@ class Status { // This usually means that the operation failed, but may succeed if // re-attempted. bool IsTryAgain() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return code() == kTryAgain; } // Returns true iff the status indicates the proposed compaction is too large bool IsCompactionTooLarge() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return code() == kCompactionTooLarge; } // Returns true iff the status indicates Column Family Dropped bool IsColumnFamilyDropped() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return code() == kColumnFamilyDropped; } @@ -445,9 +401,7 @@ class Status { // with a specific subcode, enabling users to take the appropriate action // if needed bool IsNoSpace() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return (code() == kIOError) && (subcode() == kNoSpace); } @@ -455,9 +409,7 @@ class Status { // cases where we limit the memory used in certain operations (eg. the size // of a write batch) in order to avoid out of memory exceptions. bool IsMemoryLimit() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return (code() == kAborted) && (subcode() == kMemoryLimit); } @@ -466,9 +418,7 @@ class Status { // directory" error condition. A PathNotFound error is an I/O error with // a specific subcode, enabling users to take appropriate action if necessary bool IsPathNotFound() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return (code() == kIOError || code() == kNotFound) && (subcode() == kPathNotFound); } @@ -476,25 +426,19 @@ class Status { // Returns true iff the status indicates manual compaction paused. This // is caused by a call to PauseManualCompaction bool IsManualCompactionPaused() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return (code() == kIncomplete) && (subcode() == kManualCompactionPaused); } // Returns true iff the status indicates a TxnNotPrepared error. bool IsTxnNotPrepared() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return (code() == kInvalidArgument) && (subcode() == kTxnNotPrepared); } // Returns true iff the status indicates a IOFenced error. bool IsIOFenced() const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); return (code() == kIOError) && (subcode() == kIOFenced); } @@ -524,28 +468,28 @@ class Status { : Status(_code, kNone, msg, msg2) {} static const char* CopyState(const char* s); + + inline void MarkChecked() const { +#ifdef ROCKSDB_ASSERT_STATUS_CHECKED + checked_ = true; +#endif // ROCKSDB_ASSERT_STATUS_CHECKED + } }; inline Status::Status(const Status& s) : code_(s.code_), subcode_(s.subcode_), sev_(s.sev_) { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - s.checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + s.MarkChecked(); state_ = (s.state_ == nullptr) ? nullptr : CopyState(s.state_); } inline Status::Status(const Status& s, Severity sev) : code_(s.code_), subcode_(s.subcode_), sev_(sev) { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - s.checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + s.MarkChecked(); state_ = (s.state_ == nullptr) ? nullptr : CopyState(s.state_); } inline Status& Status::operator=(const Status& s) { if (this != &s) { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - s.checked_ = true; - checked_ = false; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + s.MarkChecked(); + MustCheck(); code_ = s.code_; subcode_ = s.subcode_; sev_ = s.sev_; @@ -560,9 +504,7 @@ inline Status::Status(Status&& s) noexcept #endif : Status() { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - s.checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + s.MarkChecked(); *this = std::move(s); } @@ -572,10 +514,8 @@ inline Status& Status::operator=(Status&& s) #endif { if (this != &s) { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - s.checked_ = true; - checked_ = false; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + s.MarkChecked(); + MustCheck(); code_ = std::move(s.code_); s.code_ = kOk; subcode_ = std::move(s.subcode_); @@ -590,18 +530,14 @@ inline Status& Status::operator=(Status&& s) } inline bool Status::operator==(const Status& rhs) const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; - rhs.checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); + rhs.MarkChecked(); return (code_ == rhs.code_); } inline bool Status::operator!=(const Status& rhs) const { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - checked_ = true; - rhs.checked_ = true; -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + MarkChecked(); + rhs.MarkChecked(); return !(*this == rhs); } diff --git a/options/options_parser.cc b/options/options_parser.cc index e5f6106ab..42cde218a 100644 --- a/options/options_parser.cc +++ b/options/options_parser.cc @@ -13,7 +13,7 @@ #include #include -#include "file/read_write_util.h" +#include "file/line_file_reader.h" #include "file/writable_file_writer.h" #include "options/cf_options.h" #include "options/db_options.h" @@ -262,22 +262,17 @@ Status RocksDBOptionsParser::Parse(const ConfigOptions& config_options_in, if (!s.ok()) { return s; } - SequentialFileReader sf_reader(std::move(seq_file), file_name, - config_options.file_readahead_size); + LineFileReader lf_reader(std::move(seq_file), file_name, + config_options.file_readahead_size); OptionSection section = kOptionSectionUnknown; std::string title; std::string argument; std::unordered_map opt_map; - std::istringstream iss; std::string line; - bool has_data = true; // we only support single-lined statement. - for (int line_num = 1; ReadOneLine(&iss, &sf_reader, &line, &has_data, &s); - ++line_num) { - if (!s.ok()) { - return s; - } + while (lf_reader.ReadLine(&line)) { + int line_num = static_cast(lf_reader.GetLineNumber()); line = TrimAndRemoveComment(line); if (line.empty()) { continue; @@ -313,6 +308,10 @@ Status RocksDBOptionsParser::Parse(const ConfigOptions& config_options_in, opt_map.insert({name, value}); } } + s = lf_reader.GetStatus(); + if (!s.ok()) { + return s; + } s = EndSection(config_options, section, title, argument, opt_map); opt_map.clear(); diff --git a/src.mk b/src.mk index 9925260f1..d8201f018 100644 --- a/src.mk +++ b/src.mk @@ -93,6 +93,7 @@ LIB_SOURCES = \ file/file_prefetch_buffer.cc \ file/file_util.cc \ file/filename.cc \ + file/line_file_reader.cc \ file/random_access_file_reader.cc \ file/read_write_util.cc \ file/readahead_raf.cc \ diff --git a/tools/trace_analyzer_test.cc b/tools/trace_analyzer_test.cc index f0deb12aa..6a035b24a 100644 --- a/tools/trace_analyzer_test.cc +++ b/tools/trace_analyzer_test.cc @@ -23,7 +23,7 @@ int main() { #include #include "db/db_test_util.h" -#include "file/read_write_util.h" +#include "file/line_file_reader.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/status.h" @@ -133,21 +133,17 @@ class TraceAnalyzerTest : public testing::Test { std::unique_ptr file; ASSERT_OK(fs->NewSequentialFile(file_path, fopts, &file, nullptr)); - std::string get_line; - std::istringstream iss; - bool has_data = true; + LineFileReader lf_reader(std::move(file), file_path, + 4096 /* filereadahead_size */); + std::vector result; - uint32_t count; - Status s; - SequentialFileReader sf_reader(std::move(file), file_path, - 4096 /* filereadahead_size */); - - for (count = 0; ReadOneLine(&iss, &sf_reader, &get_line, &has_data, &s); - ++count) { - ASSERT_OK(s); - result.push_back(get_line); + std::string line; + while (lf_reader.ReadLine(&line)) { + result.push_back(line); } + ASSERT_OK(lf_reader.GetStatus()); + ASSERT_EQ(cnt.size(), result.size()); for (int i = 0; i < static_cast(result.size()); i++) { if (full_content) { diff --git a/tools/trace_analyzer_tool.cc b/tools/trace_analyzer_tool.cc index a19947af2..4d8b3b4a6 100644 --- a/tools/trace_analyzer_tool.cc +++ b/tools/trace_analyzer_tool.cc @@ -26,7 +26,7 @@ #include "db/memtable.h" #include "db/write_batch_internal.h" #include "env/composite_env_wrapper.h" -#include "file/read_write_util.h" +#include "file/line_file_reader.h" #include "file/writable_file_writer.h" #include "options/cf_options.h" #include "rocksdb/db.h" @@ -1071,8 +1071,6 @@ Status TraceAnalyzer::ReProcessing() { FLAGS_key_space_dir + "/" + std::to_string(cf_id) + ".txt"; std::string input_key, get_key; std::vector prefix(kTaTypeNum); - std::istringstream iss; - bool has_data = true; std::unique_ptr file; s = env_->GetFileSystem()->NewSequentialFile( @@ -1085,17 +1083,11 @@ Status TraceAnalyzer::ReProcessing() { if (file) { size_t kTraceFileReadaheadSize = 2 * 1024 * 1024; - SequentialFileReader sf_reader( + LineFileReader lf_reader( std::move(file), whole_key_path, kTraceFileReadaheadSize /* filereadahead_size */); - for (cfs_[cf_id].w_count = 0; - ReadOneLine(&iss, &sf_reader, &get_key, &has_data, &s); + for (cfs_[cf_id].w_count = 0; lf_reader.ReadLine(&get_key); ++cfs_[cf_id].w_count) { - if (!s.ok()) { - fprintf(stderr, "Read whole key space file failed\n"); - return s; - } - input_key = ROCKSDB_NAMESPACE::LDBCommand::HexToString(get_key); for (int type = 0; type < kTaTypeNum; type++) { if (!ta_[type].enabled) { @@ -1152,6 +1144,11 @@ Status TraceAnalyzer::ReProcessing() { } } } + s = lf_reader.GetStatus(); + if (!s.ok()) { + fprintf(stderr, "Read whole key space file failed\n"); + return s; + } } } diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index cf0b4718f..449747024 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -6,6 +6,8 @@ #include #include +#include "env/mock_env.h" +#include "file/line_file_reader.h" #include "file/random_access_file_reader.h" #include "file/readahead_raf.h" #include "file/sequence_file_reader.h" @@ -496,6 +498,103 @@ INSTANTIATE_TEST_CASE_P( INSTANTIATE_TEST_CASE_P( ReadExceedsReadaheadSize, ReadaheadSequentialFileTest, ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList())); + +namespace { +std::string GenerateLine(int n) { + std::string rv; + // Multiples of 17 characters per line, for likely bad buffer alignment + for (int i = 0; i < n; ++i) { + rv.push_back(static_cast('0' + (i % 10))); + rv.append("xxxxxxxxxxxxxxxx"); + } + return rv; +} +} // namespace + +TEST(LineFileReaderTest, LineFileReaderTest) { + const int nlines = 1000; + + std::unique_ptr mem_env(new MockEnv(Env::Default())); + std::shared_ptr fs = mem_env->GetFileSystem(); + // Create an input file + { + std::unique_ptr file; + ASSERT_OK( + fs->NewWritableFile("testfile", FileOptions(), &file, /*dbg*/ nullptr)); + + for (int i = 0; i < nlines; ++i) { + std::string line = GenerateLine(i); + line.push_back('\n'); + ASSERT_OK(file->Append(line, IOOptions(), /*dbg*/ nullptr)); + } + } + + // Verify with no I/O errors + { + std::unique_ptr reader; + ASSERT_OK(LineFileReader::Create(fs, "testfile", FileOptions(), &reader, + nullptr)); + std::string line; + int count = 0; + while (reader->ReadLine(&line)) { + ASSERT_EQ(line, GenerateLine(count)); + ++count; + ASSERT_EQ(static_cast(reader->GetLineNumber()), count); + } + ASSERT_OK(reader->GetStatus()); + ASSERT_EQ(count, nlines); + ASSERT_EQ(static_cast(reader->GetLineNumber()), count); + // And still + ASSERT_FALSE(reader->ReadLine(&line)); + ASSERT_OK(reader->GetStatus()); + ASSERT_EQ(static_cast(reader->GetLineNumber()), count); + } + + // Verify with injected I/O error + { + std::unique_ptr reader; + ASSERT_OK(LineFileReader::Create(fs, "testfile", FileOptions(), &reader, + nullptr)); + std::string line; + int count = 0; + // Read part way through the file + while (count < nlines / 4) { + ASSERT_TRUE(reader->ReadLine(&line)); + ASSERT_EQ(line, GenerateLine(count)); + ++count; + ASSERT_EQ(static_cast(reader->GetLineNumber()), count); + } + ASSERT_OK(reader->GetStatus()); + + // Inject error + int callback_count = 0; + SyncPoint::GetInstance()->SetCallBack( + "MemFile::Read:IOStatus", [&](void* arg) { + IOStatus* status = static_cast(arg); + *status = IOStatus::Corruption("test"); + ++callback_count; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + while (reader->ReadLine(&line)) { + ASSERT_EQ(line, GenerateLine(count)); + ++count; + ASSERT_EQ(static_cast(reader->GetLineNumber()), count); + } + ASSERT_TRUE(reader->GetStatus().IsCorruption()); + ASSERT_LT(count, nlines / 2); + ASSERT_EQ(callback_count, 1); + + // Still get error & no retry + ASSERT_FALSE(reader->ReadLine(&line)); + ASSERT_TRUE(reader->GetStatus().IsCorruption()); + ASSERT_EQ(callback_count, 1); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + } +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index bfd562316..876ffa91a 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -30,6 +30,7 @@ #include "env/composite_env_wrapper.h" #include "file/filename.h" +#include "file/line_file_reader.h" #include "file/sequence_file_reader.h" #include "file/writable_file_writer.h" #include "logging/logging.h" @@ -289,8 +290,6 @@ class BackupEngineImpl : public BackupEngine { std::vector> files_; std::unordered_map>* file_infos_; Env* env_; - - static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024; // 10MB }; // BackupMeta inline std::string GetAbsolutePath( @@ -2140,53 +2139,54 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile( const std::string& backup_dir, const std::unordered_map& abs_path_to_size) { assert(Empty()); - Status s; - std::unique_ptr backup_meta_reader; - s = SequentialFileReader::Create(env_->GetFileSystem(), meta_filename_, - FileOptions(), &backup_meta_reader, nullptr); - if (!s.ok()) { - return s; + std::unique_ptr backup_meta_reader; + { + Status s = + LineFileReader::Create(env_->GetFileSystem(), meta_filename_, + FileOptions(), &backup_meta_reader, nullptr); + if (!s.ok()) { + return s; + } } - std::unique_ptr buf(new char[max_backup_meta_file_size_ + 1]); - Slice data; - s = backup_meta_reader->Read(max_backup_meta_file_size_, &data, buf.get()); - if (!s.ok() || data.size() == max_backup_meta_file_size_) { - return s.ok() ? Status::Corruption("File size too big") : s; + // Failures handled at the end + std::string line; + if (backup_meta_reader->ReadLine(&line)) { + timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10); } - buf[data.size()] = 0; - - uint32_t num_files = 0; - char *next; - timestamp_ = strtoull(data.data(), &next, 10); - data.remove_prefix(next - data.data() + 1); // +1 for '\n' - sequence_number_ = strtoull(data.data(), &next, 10); - data.remove_prefix(next - data.data() + 1); // +1 for '\n' - - if (data.starts_with(kMetaDataPrefix)) { - // app metadata present - data.remove_prefix(kMetaDataPrefix.size()); - Slice hex_encoded_metadata = GetSliceUntil(&data, '\n'); - bool decode_success = hex_encoded_metadata.DecodeHex(&app_metadata_); - if (!decode_success) { - return Status::Corruption( - "Failed to decode stored hex encoded app metadata"); + if (backup_meta_reader->ReadLine(&line)) { + sequence_number_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10); + } + if (backup_meta_reader->ReadLine(&line)) { + Slice data = line; + if (data.starts_with(kMetaDataPrefix)) { + // app metadata present + data.remove_prefix(kMetaDataPrefix.size()); + bool decode_success = data.DecodeHex(&app_metadata_); + if (!decode_success) { + return Status::Corruption( + "Failed to decode stored hex encoded app metadata"); + } + line.clear(); + } else { + // process the line below } + } else { + line.clear(); + } + uint32_t num_files = UINT32_MAX; + if (!line.empty() || backup_meta_reader->ReadLine(&line)) { + num_files = static_cast(strtoul(line.c_str(), nullptr, 10)); } - - num_files = static_cast(strtoul(data.data(), &next, 10)); - data.remove_prefix(next - data.data() + 1); // +1 for '\n' - std::vector> files; + while (backup_meta_reader->ReadLine(&line)) { + std::vector components = StringSplit(line, ' '); - // WART: The checksums are crc32c, not original crc32 - Slice checksum_prefix("crc32 "); + if (components.size() < 1) { + return Status::Corruption("Empty line instead of file entry."); + } - for (uint32_t i = 0; s.ok() && i < num_files; ++i) { - auto line = GetSliceUntil(&data, '\n'); - // filename is relative, i.e., shared/number.sst, - // shared_checksum/number.sst, or private/backup_id/number.sst - std::string filename = GetSliceUntil(&line, ' ').ToString(); + const std::string& filename = components[0]; uint64_t size; const std::shared_ptr file_info = GetFile(filename); @@ -2194,52 +2194,63 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile( size = file_info->size; } else { std::string abs_path = backup_dir + "/" + filename; - try { - size = abs_path_to_size.at(abs_path); - } catch (std::out_of_range&) { - return Status::Corruption("Size missing for pathname: " + abs_path); + auto e = abs_path_to_size.find(abs_path); + if (e == abs_path_to_size.end()) { + return Status::Corruption("Pathname in meta file not found on disk: " + + abs_path); } + size = e->second; } - if (line.empty()) { + if (components.size() < 3) { return Status::Corruption("File checksum is missing for " + filename + " in " + meta_filename_); } - uint32_t checksum_value = 0; - if (line.starts_with(checksum_prefix)) { - line.remove_prefix(checksum_prefix.size()); - checksum_value = static_cast(strtoul(line.data(), nullptr, 10)); - if (line != ROCKSDB_NAMESPACE::ToString(checksum_value)) { - return Status::Corruption("Invalid checksum value for " + filename + - " in " + meta_filename_); - } - } else { + // WART: The checksums are crc32c, not original crc32 + if (components[1] != "crc32") { return Status::Corruption("Unknown checksum type for " + filename + " in " + meta_filename_); } + uint32_t checksum_value = + static_cast(strtoul(components[2].c_str(), nullptr, 10)); + if (components[2] != ROCKSDB_NAMESPACE::ToString(checksum_value)) { + return Status::Corruption("Invalid checksum value for " + filename + + " in " + meta_filename_); + } + + if (components.size() > 3) { + return Status::Corruption("Extra data for entry " + filename + " in " + + meta_filename_); + } + files.emplace_back( new FileInfo(filename, size, ChecksumInt32ToHex(checksum_value))); } - if (s.ok() && data.size() > 0) { - // file has to be read completely. if not, we count it as corruption - s = Status::Corruption("Tailing data in backup meta file in " + - meta_filename_); + { + Status s = backup_meta_reader->GetStatus(); + if (!s.ok()) { + return s; + } + } + + if (num_files != files.size()) { + return Status::Corruption( + "Inconsistent number of files or missing/incomplete header in " + + meta_filename_); } - if (s.ok()) { - files_.reserve(files.size()); - for (const auto& file_info : files) { - s = AddFile(file_info); - if (!s.ok()) { - break; - } + files_.reserve(files.size()); + for (const auto& file_info : files) { + Status s = AddFile(file_info); + if (!s.ok()) { + return s; } } - return s; + return Status::OK(); } Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) { @@ -2254,7 +2265,7 @@ Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) { } std::ostringstream buf; - buf << timestamp_ << "\n"; + buf << static_cast(timestamp_) << "\n"; buf << sequence_number_ << "\n"; if (!app_metadata_.empty()) {