Clean up BlobLogReader and rename it to BlobLogSequentialReader (#7517)

Summary:
The patch does some cleanup in and around the legacy `BlobLogReader` class:
* It renames the class to `BlobLogSequentialReader` to emphasize that it is for
sequentially iterating through blobs in a blob file, as opposed to doing random
point reads using `BlobIndex`es (which is `BlobFileReader`'s jurisdiction).
* It removes some dead code from the old BlobDB implementation that references
`BlobLogReader` (namely the method `BlobFile::OpenRandomAccessReader`).
* It cleans up some `#include`s and forward declarations.
* It fixes some incorrect/outdated comments related to the reader class.
* It adds a few assertions to the `Read` methods of the class.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7517

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D24172611

Pulled By: ltamasi

fbshipit-source-id: 43e2ae1eba5c3dd30c1070cb00f217edc45bd64f
main
Levi Tamasi 4 years ago committed by Facebook GitHub Bot
parent 22655a398b
commit 1f84611e5d
  1. 2
      CMakeLists.txt
  2. 4
      TARGETS
  3. 8
      db/blob/blob_file_builder_test.cc
  4. 23
      db/blob/blob_log_sequential_reader.cc
  5. 45
      db/blob/blob_log_sequential_reader.h
  6. 2
      src.mk
  7. 1
      utilities/blob_db/blob_db_impl.h
  8. 23
      utilities/blob_db/blob_file.cc
  9. 5
      utilities/blob_db/blob_file.h

@ -575,7 +575,7 @@ set(SOURCES
db/blob/blob_file_meta.cc db/blob/blob_file_meta.cc
db/blob/blob_file_reader.cc db/blob/blob_file_reader.cc
db/blob/blob_log_format.cc db/blob/blob_log_format.cc
db/blob/blob_log_reader.cc db/blob/blob_log_sequential_reader.cc
db/blob/blob_log_writer.cc db/blob/blob_log_writer.cc
db/builder.cc db/builder.cc
db/c.cc db/c.cc

@ -139,7 +139,7 @@ cpp_library(
"db/blob/blob_file_meta.cc", "db/blob/blob_file_meta.cc",
"db/blob/blob_file_reader.cc", "db/blob/blob_file_reader.cc",
"db/blob/blob_log_format.cc", "db/blob/blob_log_format.cc",
"db/blob/blob_log_reader.cc", "db/blob/blob_log_sequential_reader.cc",
"db/blob/blob_log_writer.cc", "db/blob/blob_log_writer.cc",
"db/builder.cc", "db/builder.cc",
"db/c.cc", "db/c.cc",
@ -427,7 +427,7 @@ cpp_library(
"db/blob/blob_file_meta.cc", "db/blob/blob_file_meta.cc",
"db/blob/blob_file_reader.cc", "db/blob/blob_file_reader.cc",
"db/blob/blob_log_format.cc", "db/blob/blob_log_format.cc",
"db/blob/blob_log_reader.cc", "db/blob/blob_log_sequential_reader.cc",
"db/blob/blob_log_writer.cc", "db/blob/blob_log_writer.cc",
"db/builder.cc", "db/builder.cc",
"db/c.cc", "db/c.cc",

@ -14,7 +14,7 @@
#include "db/blob/blob_file_addition.h" #include "db/blob/blob_file_addition.h"
#include "db/blob/blob_index.h" #include "db/blob/blob_index.h"
#include "db/blob/blob_log_format.h" #include "db/blob/blob_log_format.h"
#include "db/blob/blob_log_reader.h" #include "db/blob/blob_log_sequential_reader.h"
#include "env/composite_env_wrapper.h" #include "env/composite_env_wrapper.h"
#include "env/mock_env.h" #include "env/mock_env.h"
#include "file/filename.h" #include "file/filename.h"
@ -61,8 +61,8 @@ class BlobFileBuilderTest : public testing::Test {
&mock_env_)); &mock_env_));
constexpr Statistics* statistics = nullptr; constexpr Statistics* statistics = nullptr;
BlobLogReader blob_log_reader(std::move(file_reader), &mock_env_, BlobLogSequentialReader blob_log_reader(std::move(file_reader), &mock_env_,
statistics); statistics);
BlobLogHeader header; BlobLogHeader header;
ASSERT_OK(blob_log_reader.ReadHeader(&header)); ASSERT_OK(blob_log_reader.ReadHeader(&header));
@ -77,7 +77,7 @@ class BlobFileBuilderTest : public testing::Test {
uint64_t blob_offset = 0; uint64_t blob_offset = 0;
ASSERT_OK(blob_log_reader.ReadRecord( ASSERT_OK(blob_log_reader.ReadRecord(
&record, BlobLogReader::kReadHeaderKeyBlob, &blob_offset)); &record, BlobLogSequentialReader::kReadHeaderKeyBlob, &blob_offset));
// Check the contents of the blob file // Check the contents of the blob file
const auto& expected_key_value = expected_key_value_pairs[i]; const auto& expected_key_value = expected_key_value_pairs[i];

@ -4,7 +4,7 @@
// (found in the LICENSE.Apache file in the root directory). // (found in the LICENSE.Apache file in the root directory).
// //
#include "db/blob/blob_log_reader.h" #include "db/blob/blob_log_sequential_reader.h"
#include <algorithm> #include <algorithm>
@ -14,16 +14,19 @@
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
BlobLogReader::BlobLogReader( BlobLogSequentialReader::BlobLogSequentialReader(
std::unique_ptr<RandomAccessFileReader>&& file_reader, Env* env, std::unique_ptr<RandomAccessFileReader>&& file_reader, Env* env,
Statistics* statistics) Statistics* statistics)
: file_(std::move(file_reader)), : file_(std::move(file_reader)),
env_(env), env_(env),
statistics_(statistics), statistics_(statistics),
buffer_(),
next_byte_(0) {} next_byte_(0) {}
Status BlobLogReader::ReadSlice(uint64_t size, Slice* slice, char* buf) { BlobLogSequentialReader::~BlobLogSequentialReader() = default;
Status BlobLogSequentialReader::ReadSlice(uint64_t size, Slice* slice,
char* buf) {
assert(slice);
assert(file_); assert(file_);
StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS); StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
@ -40,7 +43,8 @@ Status BlobLogReader::ReadSlice(uint64_t size, Slice* slice, char* buf) {
return s; return s;
} }
Status BlobLogReader::ReadHeader(BlobLogHeader* header) { Status BlobLogSequentialReader::ReadHeader(BlobLogHeader* header) {
assert(header);
assert(next_byte_ == 0); assert(next_byte_ == 0);
static_assert(BlobLogHeader::kSize <= sizeof(header_buf_), static_assert(BlobLogHeader::kSize <= sizeof(header_buf_),
@ -58,8 +62,10 @@ Status BlobLogReader::ReadHeader(BlobLogHeader* header) {
return header->DecodeFrom(buffer_); return header->DecodeFrom(buffer_);
} }
Status BlobLogReader::ReadRecord(BlobLogRecord* record, ReadLevel level, Status BlobLogSequentialReader::ReadRecord(BlobLogRecord* record,
uint64_t* blob_offset) { ReadLevel level,
uint64_t* blob_offset) {
assert(record);
static_assert(BlobLogRecord::kHeaderSize <= sizeof(header_buf_), static_assert(BlobLogRecord::kHeaderSize <= sizeof(header_buf_),
"Buffer is smaller than BlobLogRecord::kHeaderSize"); "Buffer is smaller than BlobLogRecord::kHeaderSize");
@ -108,7 +114,8 @@ Status BlobLogReader::ReadRecord(BlobLogRecord* record, ReadLevel level,
return s; return s;
} }
Status BlobLogReader::ReadFooter(BlobLogFooter* footer) { Status BlobLogSequentialReader::ReadFooter(BlobLogFooter* footer) {
assert(footer);
static_assert(BlobLogFooter::kSize <= sizeof(header_buf_), static_assert(BlobLogFooter::kSize <= sizeof(header_buf_),
"Buffer is smaller than BlobLogFooter::kSize"); "Buffer is smaller than BlobLogFooter::kSize");

@ -6,28 +6,26 @@
#pragma once #pragma once
#include <memory> #include <memory>
#include <string>
#include "db/blob/blob_log_format.h" #include "db/blob/blob_log_format.h"
#include "file/random_access_file_reader.h"
#include "rocksdb/env.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
class SequentialFileReader; class RandomAccessFileReader;
class Logger; class Env;
class Statistics;
class Status;
/** /**
* BlobLogReader is a general purpose log stream reader implementation. The * BlobLogSequentialReader is a general purpose log stream reader
* actual job of reading from the device is implemented by the SequentialFile * implementation. The actual job of reading from the device is implemented by
* interface. * the RandomAccessFileReader interface.
* *
* Please see Writer for details on the file and record layout. * Please see BlobLogWriter for details on the file and record layout.
*/ */
class BlobLogReader {
class BlobLogSequentialReader {
public: public:
enum ReadLevel { enum ReadLevel {
kReadHeader, kReadHeader,
@ -35,23 +33,22 @@ class BlobLogReader {
kReadHeaderKeyBlob, kReadHeaderKeyBlob,
}; };
// Create a reader that will return log records from "*file". // Create a reader that will return log records from "*file_reader".
// "*file" must remain live while this BlobLogReader is in use. BlobLogSequentialReader(std::unique_ptr<RandomAccessFileReader>&& file_reader,
BlobLogReader(std::unique_ptr<RandomAccessFileReader>&& file_reader, Env* env, Env* env, Statistics* statistics);
Statistics* statistics);
// No copying allowed // No copying allowed
BlobLogReader(const BlobLogReader&) = delete; BlobLogSequentialReader(const BlobLogSequentialReader&) = delete;
BlobLogReader& operator=(const BlobLogReader&) = delete; BlobLogSequentialReader& operator=(const BlobLogSequentialReader&) = delete;
~BlobLogReader() = default; ~BlobLogSequentialReader();
Status ReadHeader(BlobLogHeader* header); Status ReadHeader(BlobLogHeader* header);
// Read the next record into *record. Returns true if read // Read the next record into *record. Returns true if read
// successfully, false if we hit end of the input. May use // successfully, false if we hit end of the input. The contents filled in
// "*scratch" as temporary storage. The contents filled in *record // *record will only be valid until the next mutating operation on this
// will only be valid until the next mutating operation on this // reader.
// reader or the next mutation to *scratch.
// If blob_offset is non-null, return offset of the blob through it. // If blob_offset is non-null, return offset of the blob through it.
Status ReadRecord(BlobLogRecord* record, ReadLevel level = kReadHeader, Status ReadRecord(BlobLogRecord* record, ReadLevel level = kReadHeader,
uint64_t* blob_offset = nullptr); uint64_t* blob_offset = nullptr);
@ -72,7 +69,7 @@ class BlobLogReader {
Slice buffer_; Slice buffer_;
char header_buf_[BlobLogRecord::kHeaderSize]; char header_buf_[BlobLogRecord::kHeaderSize];
// which byte to read next. For asserting proper usage // which byte to read next
uint64_t next_byte_; uint64_t next_byte_;
}; };

@ -11,7 +11,7 @@ LIB_SOURCES = \
db/blob/blob_file_meta.cc \ db/blob/blob_file_meta.cc \
db/blob/blob_file_reader.cc \ db/blob/blob_file_reader.cc \
db/blob/blob_log_format.cc \ db/blob/blob_log_format.cc \
db/blob/blob_log_reader.cc \ db/blob/blob_log_sequential_reader.cc \
db/blob/blob_log_writer.cc \ db/blob/blob_log_writer.cc \
db/builder.cc \ db/builder.cc \
db/c.cc \ db/c.cc \

@ -20,7 +20,6 @@
#include <vector> #include <vector>
#include "db/blob/blob_log_format.h" #include "db/blob/blob_log_format.h"
#include "db/blob/blob_log_reader.h"
#include "db/blob/blob_log_writer.h" #include "db/blob/blob_log_writer.h"
#include "db/db_iter.h" #include "db/db_iter.h"
#include "rocksdb/compaction_filter.h" #include "rocksdb/compaction_filter.h"

@ -61,29 +61,6 @@ std::string BlobFile::PathName() const {
return BlobFileName(path_to_dir_, file_number_); return BlobFileName(path_to_dir_, file_number_);
} }
std::shared_ptr<BlobLogReader> BlobFile::OpenRandomAccessReader(
Env* env, const DBOptions& db_options,
const EnvOptions& env_options) const {
constexpr size_t kReadaheadSize = 2 * 1024 * 1024;
std::unique_ptr<RandomAccessFile> sfile;
std::string path_name(PathName());
Status s = env->NewRandomAccessFile(path_name, &sfile, env_options);
if (!s.ok()) {
// report something here.
return nullptr;
}
sfile = NewReadaheadRandomAccessFile(std::move(sfile), kReadaheadSize);
std::unique_ptr<RandomAccessFileReader> sfile_reader;
sfile_reader.reset(new RandomAccessFileReader(
NewLegacyRandomAccessFileWrapper(sfile), path_name));
std::shared_ptr<BlobLogReader> log_reader = std::make_shared<BlobLogReader>(
std::move(sfile_reader), db_options.env, db_options.statistics.get());
return log_reader;
}
std::string BlobFile::DumpState() const { std::string BlobFile::DumpState() const {
char str[1000]; char str[1000];
snprintf( snprintf(

@ -11,7 +11,6 @@
#include <unordered_set> #include <unordered_set>
#include "db/blob/blob_log_format.h" #include "db/blob/blob_log_format.h"
#include "db/blob/blob_log_reader.h"
#include "db/blob/blob_log_writer.h" #include "db/blob/blob_log_writer.h"
#include "file/random_access_file_reader.h" #include "file/random_access_file_reader.h"
#include "port/port.h" #include "port/port.h"
@ -216,10 +215,6 @@ class BlobFile {
bool* fresh_open); bool* fresh_open);
private: private:
std::shared_ptr<BlobLogReader> OpenRandomAccessReader(
Env* env, const DBOptions& db_options,
const EnvOptions& env_options) const;
Status ReadFooter(BlobLogFooter* footer); Status ReadFooter(BlobLogFooter* footer);
Status WriteFooterAndCloseLocked(SequenceNumber sequence); Status WriteFooterAndCloseLocked(SequenceNumber sequence);

Loading…
Cancel
Save