Refactor: add LineFileReader and Status::MustCheck (#8026)

Summary:
Removed confusing, awkward, and undocumented internal API
ReadOneLine and replaced with very simple LineFileReader.

In refactoring backupable_db.cc, this has the side benefit of
removing the arbitrary cap on the size of backup metadata files.

Also added Status::MustCheck to make it easy to mark a Status as
"must check." Using this, I can ensure that after
LineFileReader::ReadLine returns false the caller checks GetStatus().

Also removed some excessive conditional compilation in status.h

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8026

Test Plan: added unit test, and running tests with ASSERT_STATUS_CHECKED

Reviewed By: mrambacher

Differential Revision: D26831687

Pulled By: pdillinger

fbshipit-source-id: ef749c265a7a26bb13cd44f6f0f97db2955f6f0f
main
Peter Dillinger 4 years ago committed by Facebook GitHub Bot
parent 847ca9f964
commit 4b18c46d10
  1. 1
      CMakeLists.txt
  2. 2
      TARGETS
  3. 10
      env/mock_env.cc
  4. 65
      file/line_file_reader.cc
  5. 59
      file/line_file_reader.h
  6. 37
      file/read_write_util.cc
  7. 4
      file/read_write_util.h
  8. 164
      include/rocksdb/status.h
  9. 19
      options/options_parser.cc
  10. 1
      src.mk
  11. 22
      tools/trace_analyzer_test.cc
  12. 19
      tools/trace_analyzer_tool.cc
  13. 99
      util/file_reader_writer_test.cc
  14. 147
      utilities/backupable/backupable_db.cc

@ -653,6 +653,7 @@ set(SOURCES
file/file_prefetch_buffer.cc
file/file_util.cc
file/filename.cc
file/line_file_reader.cc
file/random_access_file_reader.cc
file/read_write_util.cc
file/readahead_raf.cc

@ -222,6 +222,7 @@ cpp_library(
"file/file_prefetch_buffer.cc",
"file/file_util.cc",
"file/filename.cc",
"file/line_file_reader.cc",
"file/random_access_file_reader.cc",
"file/read_write_util.cc",
"file/readahead_raf.cc",
@ -528,6 +529,7 @@ cpp_library(
"file/file_prefetch_buffer.cc",
"file/file_util.cc",
"file/filename.cc",
"file/line_file_reader.cc",
"file/random_access_file_reader.cc",
"file/read_write_util.cc",
"file/readahead_raf.cc",

10
env/mock_env.cc vendored

@ -15,6 +15,7 @@
#include "file/filename.h"
#include "port/sys_time.h"
#include "rocksdb/file_system.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/hash.h"
#include "util/random.h"
@ -105,6 +106,15 @@ class MemFile {
IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*options*/,
Slice* result, char* scratch, IODebugContext* /*dbg*/) const {
{
IOStatus s;
TEST_SYNC_POINT_CALLBACK("MemFile::Read:IOStatus", &s);
if (!s.ok()) {
// with sync point only
*result = Slice();
return s;
}
}
MutexLock lock(&mutex_);
const uint64_t available = Size() - std::min(Size(), offset);
size_t offset_ = static_cast<size_t>(offset);

@ -0,0 +1,65 @@
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "file/line_file_reader.h"
#include <cstring>
namespace ROCKSDB_NAMESPACE {
Status LineFileReader::Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<LineFileReader>* reader,
IODebugContext* dbg) {
std::unique_ptr<FSSequentialFile> file;
Status s = fs->NewSequentialFile(fname, file_opts, &file, dbg);
if (s.ok()) {
reader->reset(new LineFileReader(std::move(file), fname));
}
return s;
}
bool LineFileReader::ReadLine(std::string* out) {
assert(out);
if (!status_.ok()) {
// Status should be checked (or permit unchecked) any time we return false.
status_.MustCheck();
return false;
}
out->clear();
for (;;) {
// Look for line delimiter
const char* found = static_cast<const char*>(
std::memchr(buf_begin_, '\n', buf_end_ - buf_begin_));
if (found) {
size_t len = found - buf_begin_;
out->append(buf_begin_, len);
buf_begin_ += len + /*delim*/ 1;
++line_number_;
return true;
}
if (at_eof_) {
status_.MustCheck();
return false;
}
// else flush and reload buffer
out->append(buf_begin_, buf_end_ - buf_begin_);
Slice result;
status_ = sfr_.Read(buf_.size(), &result, buf_.data());
if (!status_.ok()) {
status_.MustCheck();
return false;
}
if (result.size() != buf_.size()) {
// The obscure way of indicating EOF
at_eof_ = true;
}
buf_begin_ = result.data();
buf_end_ = result.data() + result.size();
}
}
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,59 @@
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <array>
#include "file/sequence_file_reader.h"
namespace ROCKSDB_NAMESPACE {
// A wrapper on top of Env::SequentialFile for reading text lines from a file.
// Lines are delimited by '\n'. The last line may or may not include a
// trailing newline. Uses SequentialFileReader internally.
class LineFileReader {
private:
std::array<char, 8192> buf_;
SequentialFileReader sfr_;
Status status_;
const char* buf_begin_ = buf_.data();
const char* buf_end_ = buf_.data();
size_t line_number_ = 0;
bool at_eof_ = false;
public:
// See SequentialFileReader constructors
template <typename... Args>
explicit LineFileReader(Args&&... args)
: sfr_(std::forward<Args&&>(args)...) {}
static Status Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<LineFileReader>* reader,
IODebugContext* dbg);
LineFileReader(const LineFileReader&) = delete;
LineFileReader& operator=(const LineFileReader&) = delete;
// Reads another line from the file, returning true on success and saving
// the line to `out`, without delimiter, or returning false on failure. You
// must check GetStatus() to determine whether the failure was just
// end-of-file (OK status) or an I/O error (another status).
bool ReadLine(std::string* out);
// Returns the number of the line most recently returned from ReadLine.
// Return value is unspecified if ReadLine has returned false due to
// I/O error. After ReadLine returns false due to end-of-file, return
// value is the last returned line number, or equivalently the total
// number of lines returned.
size_t GetLineNumber() const { return line_number_; }
// Returns any error encountered during read. The error is considered
// permanent and no retry or recovery is attempted with the same
// LineFileReader.
const Status& GetStatus() const { return status_; }
};
} // namespace ROCKSDB_NAMESPACE

@ -22,43 +22,6 @@ IOStatus NewWritableFile(FileSystem* fs, const std::string& fname,
return s;
}
bool ReadOneLine(std::istringstream* iss, SequentialFileReader* seq_file_reader,
std::string* output, bool* has_data, Status* result) {
const int kBufferSize = 8192;
char buffer[kBufferSize + 1];
Slice input_slice;
std::string line;
bool has_complete_line = false;
while (!has_complete_line) {
if (std::getline(*iss, line)) {
has_complete_line = !iss->eof();
} else {
has_complete_line = false;
}
if (!has_complete_line) {
// if we're not sure whether we have a complete line,
// further read from the file.
if (*has_data) {
*result = seq_file_reader->Read(kBufferSize, &input_slice, buffer);
}
if (input_slice.size() == 0) {
// meaning we have read all the data
*has_data = false;
break;
} else {
iss->str(line + input_slice.ToString());
// reset the internal state of iss so that we can keep reading it.
iss->clear();
*has_data = (input_slice.size() == kBufferSize);
continue;
}
}
}
*output = line;
return *has_data || has_complete_line;
}
#ifndef NDEBUG
bool IsFileSectorAligned(const size_t off, size_t sector_size) {
return off % sector_size == 0;

@ -24,10 +24,6 @@ extern IOStatus NewWritableFile(FileSystem* fs, const std::string& fname,
std::unique_ptr<FSWritableFile>* result,
const FileOptions& options);
// Read a single line from a file.
bool ReadOneLine(std::istringstream* iss, SequentialFileReader* seq_file_reader,
std::string* output, bool* has_data, Status* result);
#ifndef NDEBUG
bool IsFileSectorAligned(const size_t off, size_t sector_size);
#endif // NDEBUG

@ -65,9 +65,11 @@ class Status {
// In case of intentionally swallowing an error, user must explicitly call
// this function. That way we are easily able to search the code to find where
// error swallowing occurs.
void PermitUncheckedError() const {
inline void PermitUncheckedError() const { MarkChecked(); }
inline void MustCheck() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
checked_ = false;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
}
@ -92,9 +94,7 @@ class Status {
};
Code code() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return code_;
}
@ -118,9 +118,7 @@ class Status {
};
SubCode subcode() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return subcode_;
}
@ -135,17 +133,13 @@ class Status {
Status(const Status& s, Severity sev);
Severity severity() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return sev_;
}
// Returns a C style string indicating the message of the Status
const char* getState() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return state_;
}
@ -289,127 +283,95 @@ class Status {
// Returns true iff the status indicates success.
bool ok() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return code() == kOk;
}
// Returns true iff the status indicates success *with* something
// overwritten
bool IsOkOverwritten() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return code() == kOk && subcode() == kOverwritten;
}
// Returns true iff the status indicates a NotFound error.
bool IsNotFound() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return code() == kNotFound;
}
// Returns true iff the status indicates a Corruption error.
bool IsCorruption() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return code() == kCorruption;
}
// Returns true iff the status indicates a NotSupported error.
bool IsNotSupported() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return code() == kNotSupported;
}
// Returns true iff the status indicates an InvalidArgument error.
bool IsInvalidArgument() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return code() == kInvalidArgument;
}
// Returns true iff the status indicates an IOError.
bool IsIOError() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return code() == kIOError;
}
// Returns true iff the status indicates an MergeInProgress.
bool IsMergeInProgress() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return code() == kMergeInProgress;
}
// Returns true iff the status indicates Incomplete
bool IsIncomplete() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return code() == kIncomplete;
}
// Returns true iff the status indicates Shutdown In progress
bool IsShutdownInProgress() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return code() == kShutdownInProgress;
}
bool IsTimedOut() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return code() == kTimedOut;
}
bool IsAborted() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return code() == kAborted;
}
bool IsLockLimit() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return code() == kAborted && subcode() == kLockLimit;
}
// Returns true iff the status indicates that a resource is Busy and
// temporarily could not be acquired.
bool IsBusy() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return code() == kBusy;
}
bool IsDeadlock() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return code() == kBusy && subcode() == kDeadlock;
}
// Returns true iff the status indicated that the operation has Expired.
bool IsExpired() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return code() == kExpired;
}
@ -417,25 +379,19 @@ class Status {
// This usually means that the operation failed, but may succeed if
// re-attempted.
bool IsTryAgain() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return code() == kTryAgain;
}
// Returns true iff the status indicates the proposed compaction is too large
bool IsCompactionTooLarge() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return code() == kCompactionTooLarge;
}
// Returns true iff the status indicates Column Family Dropped
bool IsColumnFamilyDropped() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return code() == kColumnFamilyDropped;
}
@ -445,9 +401,7 @@ class Status {
// with a specific subcode, enabling users to take the appropriate action
// if needed
bool IsNoSpace() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return (code() == kIOError) && (subcode() == kNoSpace);
}
@ -455,9 +409,7 @@ class Status {
// cases where we limit the memory used in certain operations (eg. the size
// of a write batch) in order to avoid out of memory exceptions.
bool IsMemoryLimit() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return (code() == kAborted) && (subcode() == kMemoryLimit);
}
@ -466,9 +418,7 @@ class Status {
// directory" error condition. A PathNotFound error is an I/O error with
// a specific subcode, enabling users to take appropriate action if necessary
bool IsPathNotFound() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return (code() == kIOError || code() == kNotFound) &&
(subcode() == kPathNotFound);
}
@ -476,25 +426,19 @@ class Status {
// Returns true iff the status indicates manual compaction paused. This
// is caused by a call to PauseManualCompaction
bool IsManualCompactionPaused() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return (code() == kIncomplete) && (subcode() == kManualCompactionPaused);
}
// Returns true iff the status indicates a TxnNotPrepared error.
bool IsTxnNotPrepared() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return (code() == kInvalidArgument) && (subcode() == kTxnNotPrepared);
}
// Returns true iff the status indicates a IOFenced error.
bool IsIOFenced() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
return (code() == kIOError) && (subcode() == kIOFenced);
}
@ -524,28 +468,28 @@ class Status {
: Status(_code, kNone, msg, msg2) {}
static const char* CopyState(const char* s);
inline void MarkChecked() const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
}
};
inline Status::Status(const Status& s)
: code_(s.code_), subcode_(s.subcode_), sev_(s.sev_) {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
s.checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
s.MarkChecked();
state_ = (s.state_ == nullptr) ? nullptr : CopyState(s.state_);
}
inline Status::Status(const Status& s, Severity sev)
: code_(s.code_), subcode_(s.subcode_), sev_(sev) {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
s.checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
s.MarkChecked();
state_ = (s.state_ == nullptr) ? nullptr : CopyState(s.state_);
}
inline Status& Status::operator=(const Status& s) {
if (this != &s) {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
s.checked_ = true;
checked_ = false;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
s.MarkChecked();
MustCheck();
code_ = s.code_;
subcode_ = s.subcode_;
sev_ = s.sev_;
@ -560,9 +504,7 @@ inline Status::Status(Status&& s)
noexcept
#endif
: Status() {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
s.checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
s.MarkChecked();
*this = std::move(s);
}
@ -572,10 +514,8 @@ inline Status& Status::operator=(Status&& s)
#endif
{
if (this != &s) {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
s.checked_ = true;
checked_ = false;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
s.MarkChecked();
MustCheck();
code_ = std::move(s.code_);
s.code_ = kOk;
subcode_ = std::move(s.subcode_);
@ -590,18 +530,14 @@ inline Status& Status::operator=(Status&& s)
}
inline bool Status::operator==(const Status& rhs) const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
rhs.checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
rhs.MarkChecked();
return (code_ == rhs.code_);
}
inline bool Status::operator!=(const Status& rhs) const {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
checked_ = true;
rhs.checked_ = true;
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
MarkChecked();
rhs.MarkChecked();
return !(*this == rhs);
}

@ -13,7 +13,7 @@
#include <utility>
#include <vector>
#include "file/read_write_util.h"
#include "file/line_file_reader.h"
#include "file/writable_file_writer.h"
#include "options/cf_options.h"
#include "options/db_options.h"
@ -262,22 +262,17 @@ Status RocksDBOptionsParser::Parse(const ConfigOptions& config_options_in,
if (!s.ok()) {
return s;
}
SequentialFileReader sf_reader(std::move(seq_file), file_name,
config_options.file_readahead_size);
LineFileReader lf_reader(std::move(seq_file), file_name,
config_options.file_readahead_size);
OptionSection section = kOptionSectionUnknown;
std::string title;
std::string argument;
std::unordered_map<std::string, std::string> opt_map;
std::istringstream iss;
std::string line;
bool has_data = true;
// we only support single-lined statement.
for (int line_num = 1; ReadOneLine(&iss, &sf_reader, &line, &has_data, &s);
++line_num) {
if (!s.ok()) {
return s;
}
while (lf_reader.ReadLine(&line)) {
int line_num = static_cast<int>(lf_reader.GetLineNumber());
line = TrimAndRemoveComment(line);
if (line.empty()) {
continue;
@ -313,6 +308,10 @@ Status RocksDBOptionsParser::Parse(const ConfigOptions& config_options_in,
opt_map.insert({name, value});
}
}
s = lf_reader.GetStatus();
if (!s.ok()) {
return s;
}
s = EndSection(config_options, section, title, argument, opt_map);
opt_map.clear();

@ -93,6 +93,7 @@ LIB_SOURCES = \
file/file_prefetch_buffer.cc \
file/file_util.cc \
file/filename.cc \
file/line_file_reader.cc \
file/random_access_file_reader.cc \
file/read_write_util.cc \
file/readahead_raf.cc \

@ -23,7 +23,7 @@ int main() {
#include <thread>
#include "db/db_test_util.h"
#include "file/read_write_util.h"
#include "file/line_file_reader.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/status.h"
@ -133,21 +133,17 @@ class TraceAnalyzerTest : public testing::Test {
std::unique_ptr<FSSequentialFile> file;
ASSERT_OK(fs->NewSequentialFile(file_path, fopts, &file, nullptr));
std::string get_line;
std::istringstream iss;
bool has_data = true;
LineFileReader lf_reader(std::move(file), file_path,
4096 /* filereadahead_size */);
std::vector<std::string> result;
uint32_t count;
Status s;
SequentialFileReader sf_reader(std::move(file), file_path,
4096 /* filereadahead_size */);
for (count = 0; ReadOneLine(&iss, &sf_reader, &get_line, &has_data, &s);
++count) {
ASSERT_OK(s);
result.push_back(get_line);
std::string line;
while (lf_reader.ReadLine(&line)) {
result.push_back(line);
}
ASSERT_OK(lf_reader.GetStatus());
ASSERT_EQ(cnt.size(), result.size());
for (int i = 0; i < static_cast<int>(result.size()); i++) {
if (full_content) {

@ -26,7 +26,7 @@
#include "db/memtable.h"
#include "db/write_batch_internal.h"
#include "env/composite_env_wrapper.h"
#include "file/read_write_util.h"
#include "file/line_file_reader.h"
#include "file/writable_file_writer.h"
#include "options/cf_options.h"
#include "rocksdb/db.h"
@ -1071,8 +1071,6 @@ Status TraceAnalyzer::ReProcessing() {
FLAGS_key_space_dir + "/" + std::to_string(cf_id) + ".txt";
std::string input_key, get_key;
std::vector<std::string> prefix(kTaTypeNum);
std::istringstream iss;
bool has_data = true;
std::unique_ptr<FSSequentialFile> file;
s = env_->GetFileSystem()->NewSequentialFile(
@ -1085,17 +1083,11 @@ Status TraceAnalyzer::ReProcessing() {
if (file) {
size_t kTraceFileReadaheadSize = 2 * 1024 * 1024;
SequentialFileReader sf_reader(
LineFileReader lf_reader(
std::move(file), whole_key_path,
kTraceFileReadaheadSize /* filereadahead_size */);
for (cfs_[cf_id].w_count = 0;
ReadOneLine(&iss, &sf_reader, &get_key, &has_data, &s);
for (cfs_[cf_id].w_count = 0; lf_reader.ReadLine(&get_key);
++cfs_[cf_id].w_count) {
if (!s.ok()) {
fprintf(stderr, "Read whole key space file failed\n");
return s;
}
input_key = ROCKSDB_NAMESPACE::LDBCommand::HexToString(get_key);
for (int type = 0; type < kTaTypeNum; type++) {
if (!ta_[type].enabled) {
@ -1152,6 +1144,11 @@ Status TraceAnalyzer::ReProcessing() {
}
}
}
s = lf_reader.GetStatus();
if (!s.ok()) {
fprintf(stderr, "Read whole key space file failed\n");
return s;
}
}
}

@ -6,6 +6,8 @@
#include <algorithm>
#include <vector>
#include "env/mock_env.h"
#include "file/line_file_reader.h"
#include "file/random_access_file_reader.h"
#include "file/readahead_raf.h"
#include "file/sequence_file_reader.h"
@ -496,6 +498,103 @@ INSTANTIATE_TEST_CASE_P(
INSTANTIATE_TEST_CASE_P(
ReadExceedsReadaheadSize, ReadaheadSequentialFileTest,
::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
namespace {
std::string GenerateLine(int n) {
std::string rv;
// Multiples of 17 characters per line, for likely bad buffer alignment
for (int i = 0; i < n; ++i) {
rv.push_back(static_cast<char>('0' + (i % 10)));
rv.append("xxxxxxxxxxxxxxxx");
}
return rv;
}
} // namespace
TEST(LineFileReaderTest, LineFileReaderTest) {
const int nlines = 1000;
std::unique_ptr<MockEnv> mem_env(new MockEnv(Env::Default()));
std::shared_ptr<FileSystem> fs = mem_env->GetFileSystem();
// Create an input file
{
std::unique_ptr<FSWritableFile> file;
ASSERT_OK(
fs->NewWritableFile("testfile", FileOptions(), &file, /*dbg*/ nullptr));
for (int i = 0; i < nlines; ++i) {
std::string line = GenerateLine(i);
line.push_back('\n');
ASSERT_OK(file->Append(line, IOOptions(), /*dbg*/ nullptr));
}
}
// Verify with no I/O errors
{
std::unique_ptr<LineFileReader> reader;
ASSERT_OK(LineFileReader::Create(fs, "testfile", FileOptions(), &reader,
nullptr));
std::string line;
int count = 0;
while (reader->ReadLine(&line)) {
ASSERT_EQ(line, GenerateLine(count));
++count;
ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
}
ASSERT_OK(reader->GetStatus());
ASSERT_EQ(count, nlines);
ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
// And still
ASSERT_FALSE(reader->ReadLine(&line));
ASSERT_OK(reader->GetStatus());
ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
}
// Verify with injected I/O error
{
std::unique_ptr<LineFileReader> reader;
ASSERT_OK(LineFileReader::Create(fs, "testfile", FileOptions(), &reader,
nullptr));
std::string line;
int count = 0;
// Read part way through the file
while (count < nlines / 4) {
ASSERT_TRUE(reader->ReadLine(&line));
ASSERT_EQ(line, GenerateLine(count));
++count;
ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
}
ASSERT_OK(reader->GetStatus());
// Inject error
int callback_count = 0;
SyncPoint::GetInstance()->SetCallBack(
"MemFile::Read:IOStatus", [&](void* arg) {
IOStatus* status = static_cast<IOStatus*>(arg);
*status = IOStatus::Corruption("test");
++callback_count;
});
SyncPoint::GetInstance()->EnableProcessing();
while (reader->ReadLine(&line)) {
ASSERT_EQ(line, GenerateLine(count));
++count;
ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
}
ASSERT_TRUE(reader->GetStatus().IsCorruption());
ASSERT_LT(count, nlines / 2);
ASSERT_EQ(callback_count, 1);
// Still get error & no retry
ASSERT_FALSE(reader->ReadLine(&line));
ASSERT_TRUE(reader->GetStatus().IsCorruption());
ASSERT_EQ(callback_count, 1);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -30,6 +30,7 @@
#include "env/composite_env_wrapper.h"
#include "file/filename.h"
#include "file/line_file_reader.h"
#include "file/sequence_file_reader.h"
#include "file/writable_file_writer.h"
#include "logging/logging.h"
@ -289,8 +290,6 @@ class BackupEngineImpl : public BackupEngine {
std::vector<std::shared_ptr<FileInfo>> files_;
std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos_;
Env* env_;
static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024; // 10MB
}; // BackupMeta
inline std::string GetAbsolutePath(
@ -2140,53 +2139,54 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
const std::string& backup_dir,
const std::unordered_map<std::string, uint64_t>& abs_path_to_size) {
assert(Empty());
Status s;
std::unique_ptr<SequentialFileReader> backup_meta_reader;
s = SequentialFileReader::Create(env_->GetFileSystem(), meta_filename_,
FileOptions(), &backup_meta_reader, nullptr);
if (!s.ok()) {
return s;
std::unique_ptr<LineFileReader> backup_meta_reader;
{
Status s =
LineFileReader::Create(env_->GetFileSystem(), meta_filename_,
FileOptions(), &backup_meta_reader, nullptr);
if (!s.ok()) {
return s;
}
}
std::unique_ptr<char[]> buf(new char[max_backup_meta_file_size_ + 1]);
Slice data;
s = backup_meta_reader->Read(max_backup_meta_file_size_, &data, buf.get());
if (!s.ok() || data.size() == max_backup_meta_file_size_) {
return s.ok() ? Status::Corruption("File size too big") : s;
// Failures handled at the end
std::string line;
if (backup_meta_reader->ReadLine(&line)) {
timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
}
buf[data.size()] = 0;
uint32_t num_files = 0;
char *next;
timestamp_ = strtoull(data.data(), &next, 10);
data.remove_prefix(next - data.data() + 1); // +1 for '\n'
sequence_number_ = strtoull(data.data(), &next, 10);
data.remove_prefix(next - data.data() + 1); // +1 for '\n'
if (data.starts_with(kMetaDataPrefix)) {
// app metadata present
data.remove_prefix(kMetaDataPrefix.size());
Slice hex_encoded_metadata = GetSliceUntil(&data, '\n');
bool decode_success = hex_encoded_metadata.DecodeHex(&app_metadata_);
if (!decode_success) {
return Status::Corruption(
"Failed to decode stored hex encoded app metadata");
if (backup_meta_reader->ReadLine(&line)) {
sequence_number_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
}
if (backup_meta_reader->ReadLine(&line)) {
Slice data = line;
if (data.starts_with(kMetaDataPrefix)) {
// app metadata present
data.remove_prefix(kMetaDataPrefix.size());
bool decode_success = data.DecodeHex(&app_metadata_);
if (!decode_success) {
return Status::Corruption(
"Failed to decode stored hex encoded app metadata");
}
line.clear();
} else {
// process the line below
}
} else {
line.clear();
}
uint32_t num_files = UINT32_MAX;
if (!line.empty() || backup_meta_reader->ReadLine(&line)) {
num_files = static_cast<uint32_t>(strtoul(line.c_str(), nullptr, 10));
}
num_files = static_cast<uint32_t>(strtoul(data.data(), &next, 10));
data.remove_prefix(next - data.data() + 1); // +1 for '\n'
std::vector<std::shared_ptr<FileInfo>> files;
while (backup_meta_reader->ReadLine(&line)) {
std::vector<std::string> components = StringSplit(line, ' ');
// WART: The checksums are crc32c, not original crc32
Slice checksum_prefix("crc32 ");
if (components.size() < 1) {
return Status::Corruption("Empty line instead of file entry.");
}
for (uint32_t i = 0; s.ok() && i < num_files; ++i) {
auto line = GetSliceUntil(&data, '\n');
// filename is relative, i.e., shared/number.sst,
// shared_checksum/number.sst, or private/backup_id/number.sst
std::string filename = GetSliceUntil(&line, ' ').ToString();
const std::string& filename = components[0];
uint64_t size;
const std::shared_ptr<FileInfo> file_info = GetFile(filename);
@ -2194,52 +2194,63 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
size = file_info->size;
} else {
std::string abs_path = backup_dir + "/" + filename;
try {
size = abs_path_to_size.at(abs_path);
} catch (std::out_of_range&) {
return Status::Corruption("Size missing for pathname: " + abs_path);
auto e = abs_path_to_size.find(abs_path);
if (e == abs_path_to_size.end()) {
return Status::Corruption("Pathname in meta file not found on disk: " +
abs_path);
}
size = e->second;
}
if (line.empty()) {
if (components.size() < 3) {
return Status::Corruption("File checksum is missing for " + filename +
" in " + meta_filename_);
}
uint32_t checksum_value = 0;
if (line.starts_with(checksum_prefix)) {
line.remove_prefix(checksum_prefix.size());
checksum_value = static_cast<uint32_t>(strtoul(line.data(), nullptr, 10));
if (line != ROCKSDB_NAMESPACE::ToString(checksum_value)) {
return Status::Corruption("Invalid checksum value for " + filename +
" in " + meta_filename_);
}
} else {
// WART: The checksums are crc32c, not original crc32
if (components[1] != "crc32") {
return Status::Corruption("Unknown checksum type for " + filename +
" in " + meta_filename_);
}
uint32_t checksum_value =
static_cast<uint32_t>(strtoul(components[2].c_str(), nullptr, 10));
if (components[2] != ROCKSDB_NAMESPACE::ToString(checksum_value)) {
return Status::Corruption("Invalid checksum value for " + filename +
" in " + meta_filename_);
}
if (components.size() > 3) {
return Status::Corruption("Extra data for entry " + filename + " in " +
meta_filename_);
}
files.emplace_back(
new FileInfo(filename, size, ChecksumInt32ToHex(checksum_value)));
}
if (s.ok() && data.size() > 0) {
// file has to be read completely. if not, we count it as corruption
s = Status::Corruption("Tailing data in backup meta file in " +
meta_filename_);
{
Status s = backup_meta_reader->GetStatus();
if (!s.ok()) {
return s;
}
}
if (num_files != files.size()) {
return Status::Corruption(
"Inconsistent number of files or missing/incomplete header in " +
meta_filename_);
}
if (s.ok()) {
files_.reserve(files.size());
for (const auto& file_info : files) {
s = AddFile(file_info);
if (!s.ok()) {
break;
}
files_.reserve(files.size());
for (const auto& file_info : files) {
Status s = AddFile(file_info);
if (!s.ok()) {
return s;
}
}
return s;
return Status::OK();
}
Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) {
@ -2254,7 +2265,7 @@ Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) {
}
std::ostringstream buf;
buf << timestamp_ << "\n";
buf << static_cast<unsigned long long>(timestamp_) << "\n";
buf << sequence_number_ << "\n";
if (!app_metadata_.empty()) {

Loading…
Cancel
Save