Move the blob file format related classes to the main namespace, rename reader/writer (#7086)

Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/7086

Test Plan: `make check`

Reviewed By: zhichao-cao

Differential Revision: D22395420

Pulled By: ltamasi

fbshipit-source-id: 088a20097bd6b73b0c433cd79725779f97ec04f2
main
Levi Tamasi 4 years ago committed by Facebook GitHub Bot
parent 4b107ceb7e
commit a693341604
  1. 2
      db/blob/blob_log_format.cc
  2. 2
      db/blob/blob_log_format.h
  3. 15
      db/blob/blob_log_reader.cc
  4. 22
      db/blob/blob_log_reader.h
  5. 36
      db/blob/blob_log_writer.cc
  6. 21
      db/blob/blob_log_writer.h
  7. 2
      utilities/blob_db/blob_compaction_filter.h
  8. 21
      utilities/blob_db/blob_db_impl.cc
  9. 6
      utilities/blob_db/blob_db_impl.h
  10. 4
      utilities/blob_db/blob_file.cc
  11. 6
      utilities/blob_db/blob_file.h

@ -11,7 +11,6 @@
#include "util/crc32c.h"
namespace ROCKSDB_NAMESPACE {
namespace blob_db {
void BlobLogHeader::EncodeTo(std::string* dst) {
assert(dst != nullptr);
@ -144,6 +143,5 @@ Status BlobLogRecord::CheckBlobCRC() const {
return Status::OK();
}
} // namespace blob_db
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

@ -18,7 +18,6 @@
#include "rocksdb/types.h"
namespace ROCKSDB_NAMESPACE {
namespace blob_db {
constexpr uint32_t kMagicNumber = 2395959; // 0x00248f37
constexpr uint32_t kVersion1 = 1;
@ -126,6 +125,5 @@ struct BlobLogRecord {
Status CheckBlobCRC() const;
};
} // namespace blob_db
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

@ -14,17 +14,17 @@
#include "util/stop_watch.h"
namespace ROCKSDB_NAMESPACE {
namespace blob_db {
Reader::Reader(std::unique_ptr<RandomAccessFileReader>&& file_reader, Env* env,
Statistics* statistics)
BlobLogReader::BlobLogReader(
std::unique_ptr<RandomAccessFileReader>&& file_reader, Env* env,
Statistics* statistics)
: file_(std::move(file_reader)),
env_(env),
statistics_(statistics),
buffer_(),
next_byte_(0) {}
Status Reader::ReadSlice(uint64_t size, Slice* slice, char* buf) {
Status BlobLogReader::ReadSlice(uint64_t size, Slice* slice, char* buf) {
StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
Status s = file_->Read(IOOptions(), next_byte_, static_cast<size_t>(size),
slice, buf, nullptr);
@ -39,7 +39,7 @@ Status Reader::ReadSlice(uint64_t size, Slice* slice, char* buf) {
return s;
}
Status Reader::ReadHeader(BlobLogHeader* header) {
Status BlobLogReader::ReadHeader(BlobLogHeader* header) {
assert(file_.get() != nullptr);
assert(next_byte_ == 0);
Status s = ReadSlice(BlobLogHeader::kSize, &buffer_, header_buf_);
@ -54,8 +54,8 @@ Status Reader::ReadHeader(BlobLogHeader* header) {
return header->DecodeFrom(buffer_);
}
Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level,
uint64_t* blob_offset) {
Status BlobLogReader::ReadRecord(BlobLogRecord* record, ReadLevel level,
uint64_t* blob_offset) {
Status s = ReadSlice(BlobLogRecord::kHeaderSize, &buffer_, header_buf_);
if (!s.ok()) {
return s;
@ -101,6 +101,5 @@ Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level,
return s;
}
} // namespace blob_db
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

@ -22,15 +22,14 @@ namespace ROCKSDB_NAMESPACE {
class SequentialFileReader;
class Logger;
namespace blob_db {
/**
* Reader is a general purpose log stream reader implementation. The actual job
* of reading from the device is implemented by the SequentialFile interface.
* BlobLogReader is a general purpose log stream reader implementation. The
* actual job of reading from the device is implemented by the SequentialFile
* interface.
*
* Please see Writer for details on the file and record layout.
*/
class Reader {
class BlobLogReader {
public:
enum ReadLevel {
kReadHeader,
@ -39,14 +38,14 @@ class Reader {
};
// Create a reader that will return log records from "*file".
// "*file" must remain live while this Reader is in use.
Reader(std::unique_ptr<RandomAccessFileReader>&& file_reader, Env* env,
Statistics* statistics);
// "*file" must remain live while this BlobLogReader is in use.
BlobLogReader(std::unique_ptr<RandomAccessFileReader>&& file_reader, Env* env,
Statistics* statistics);
// No copying allowed
Reader(const Reader&) = delete;
Reader& operator=(const Reader&) = delete;
BlobLogReader(const BlobLogReader&) = delete;
BlobLogReader& operator=(const BlobLogReader&) = delete;
~Reader() = default;
~BlobLogReader() = default;
Status ReadHeader(BlobLogHeader* header);
@ -77,6 +76,5 @@ class Reader {
uint64_t next_byte_;
};
} // namespace blob_db
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

@ -17,11 +17,11 @@
#include "util/stop_watch.h"
namespace ROCKSDB_NAMESPACE {
namespace blob_db {
Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, Env* env,
Statistics* statistics, uint64_t log_number, uint64_t bpsync,
bool use_fs, uint64_t boffset)
BlobLogWriter::BlobLogWriter(std::unique_ptr<WritableFileWriter>&& dest,
Env* env, Statistics* statistics,
uint64_t log_number, uint64_t bpsync, bool use_fs,
uint64_t boffset)
: dest_(std::move(dest)),
env_(env),
statistics_(statistics),
@ -32,14 +32,14 @@ Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, Env* env,
use_fsync_(use_fs),
last_elem_type_(kEtNone) {}
Status Writer::Sync() {
Status BlobLogWriter::Sync() {
StopWatch sync_sw(env_, statistics_, BLOB_DB_BLOB_FILE_SYNC_MICROS);
Status s = dest_->Sync(use_fsync_);
RecordTick(statistics_, BLOB_DB_BLOB_FILE_SYNCED);
return s;
}
Status Writer::WriteHeader(BlobLogHeader& header) {
Status BlobLogWriter::WriteHeader(BlobLogHeader& header) {
assert(block_offset_ == 0);
assert(last_elem_type_ == kEtNone);
std::string str;
@ -56,7 +56,7 @@ Status Writer::WriteHeader(BlobLogHeader& header) {
return s;
}
Status Writer::AppendFooter(BlobLogFooter& footer) {
Status BlobLogWriter::AppendFooter(BlobLogFooter& footer) {
assert(block_offset_ != 0);
assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
@ -76,9 +76,9 @@ Status Writer::AppendFooter(BlobLogFooter& footer) {
return s;
}
Status Writer::AddRecord(const Slice& key, const Slice& val,
uint64_t expiration, uint64_t* key_offset,
uint64_t* blob_offset) {
Status BlobLogWriter::AddRecord(const Slice& key, const Slice& val,
uint64_t expiration, uint64_t* key_offset,
uint64_t* blob_offset) {
assert(block_offset_ != 0);
assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
@ -89,8 +89,8 @@ Status Writer::AddRecord(const Slice& key, const Slice& val,
return s;
}
Status Writer::AddRecord(const Slice& key, const Slice& val,
uint64_t* key_offset, uint64_t* blob_offset) {
Status BlobLogWriter::AddRecord(const Slice& key, const Slice& val,
uint64_t* key_offset, uint64_t* blob_offset) {
assert(block_offset_ != 0);
assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
@ -101,8 +101,8 @@ Status Writer::AddRecord(const Slice& key, const Slice& val,
return s;
}
void Writer::ConstructBlobHeader(std::string* buf, const Slice& key,
const Slice& val, uint64_t expiration) {
void BlobLogWriter::ConstructBlobHeader(std::string* buf, const Slice& key,
const Slice& val, uint64_t expiration) {
BlobLogRecord record;
record.key = key;
record.value = val;
@ -110,9 +110,10 @@ void Writer::ConstructBlobHeader(std::string* buf, const Slice& key,
record.EncodeHeaderTo(buf);
}
Status Writer::EmitPhysicalRecord(const std::string& headerbuf,
const Slice& key, const Slice& val,
uint64_t* key_offset, uint64_t* blob_offset) {
Status BlobLogWriter::EmitPhysicalRecord(const std::string& headerbuf,
const Slice& key, const Slice& val,
uint64_t* key_offset,
uint64_t* blob_offset) {
StopWatch write_sw(env_, statistics_, BLOB_DB_BLOB_FILE_WRITE_MICROS);
Status s = dest_->Append(Slice(headerbuf));
if (s.ok()) {
@ -134,6 +135,5 @@ Status Writer::EmitPhysicalRecord(const std::string& headerbuf,
return s;
}
} // namespace blob_db
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

@ -21,29 +21,27 @@ namespace ROCKSDB_NAMESPACE {
class WritableFileWriter;
namespace blob_db {
/**
* Writer is the blob log stream writer. It provides an append-only
* BlobLogWriter is the blob log stream writer. It provides an append-only
* abstraction for writing blob data.
*
*
* Look at blob_db_format.h to see the details of the record formats.
*/
class Writer {
class BlobLogWriter {
public:
// Create a writer that will append data to "*dest".
// "*dest" must be initially empty.
// "*dest" must remain live while this Writer is in use.
Writer(std::unique_ptr<WritableFileWriter>&& dest, Env* env,
Statistics* statistics, uint64_t log_number, uint64_t bpsync,
bool use_fsync, uint64_t boffset = 0);
// "*dest" must remain live while this BlobLogWriter is in use.
BlobLogWriter(std::unique_ptr<WritableFileWriter>&& dest, Env* env,
Statistics* statistics, uint64_t log_number, uint64_t bpsync,
bool use_fsync, uint64_t boffset = 0);
// No copying allowed
Writer(const Writer&) = delete;
Writer& operator=(const Writer&) = delete;
BlobLogWriter(const BlobLogWriter&) = delete;
BlobLogWriter& operator=(const BlobLogWriter&) = delete;
~Writer() = default;
~BlobLogWriter() = default;
static void ConstructBlobHeader(std::string* buf, const Slice& key,
const Slice& val, uint64_t expiration);
@ -89,6 +87,5 @@ class Writer {
ElemType last_elem_type_;
};
} // namespace blob_db
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

@ -78,7 +78,7 @@ class BlobIndexCompactionFilterBase : public LayeredCompactionFilterBase {
Statistics* statistics_;
mutable std::shared_ptr<BlobFile> blob_file_;
mutable std::shared_ptr<Writer> writer_;
mutable std::shared_ptr<BlobLogWriter> writer_;
// It is safe to not using std::atomic since the compaction filter, created
// from a compaction filter factroy, will not be called from multiple threads.

@ -742,11 +742,11 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
boffset);
}
Writer::ElemType et = Writer::kEtNone;
BlobLogWriter::ElemType et = BlobLogWriter::kEtNone;
if (bfile->file_size_ == BlobLogHeader::kSize) {
et = Writer::kEtFileHdr;
et = BlobLogWriter::kEtFileHdr;
} else if (bfile->file_size_ > BlobLogHeader::kSize) {
et = Writer::kEtRecord;
et = BlobLogWriter::kEtRecord;
} else if (bfile->file_size_) {
ROCKS_LOG_WARN(db_options_.info_log,
"Open blob file: %s with wrong size: %" PRIu64,
@ -754,7 +754,7 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
return Status::Corruption("Invalid blob file size");
}
bfile->log_writer_ = std::make_shared<Writer>(
bfile->log_writer_ = std::make_shared<BlobLogWriter>(
std::move(fwriter), env_, statistics_, bfile->file_number_,
bdb_options_.bytes_per_sync, db_options_.use_fsync, boffset);
bfile->log_writer_->last_elem_type_ = et;
@ -798,7 +798,7 @@ std::shared_ptr<BlobFile> BlobDBImpl::FindBlobFileLocked(
Status BlobDBImpl::CheckOrCreateWriterLocked(
const std::shared_ptr<BlobFile>& blob_file,
std::shared_ptr<Writer>* writer) {
std::shared_ptr<BlobLogWriter>* writer) {
assert(writer != nullptr);
*writer = blob_file->GetWriter();
if (*writer != nullptr) {
@ -814,7 +814,7 @@ Status BlobDBImpl::CheckOrCreateWriterLocked(
Status BlobDBImpl::CreateBlobFileAndWriter(
bool has_ttl, const ExpirationRange& expiration_range,
const std::string& reason, std::shared_ptr<BlobFile>* blob_file,
std::shared_ptr<Writer>* writer) {
std::shared_ptr<BlobLogWriter>* writer) {
TEST_SYNC_POINT("BlobDBImpl::CreateBlobFileAndWriter");
assert(has_ttl == (expiration_range.first || expiration_range.second));
assert(blob_file);
@ -871,7 +871,7 @@ Status BlobDBImpl::SelectBlobFile(std::shared_ptr<BlobFile>* blob_file) {
return Status::OK();
}
std::shared_ptr<Writer> writer;
std::shared_ptr<BlobLogWriter> writer;
const Status s = CreateBlobFileAndWriter(
/* has_ttl */ false, ExpirationRange(),
/* reason */ "SelectBlobFile", blob_file, &writer);
@ -917,7 +917,7 @@ Status BlobDBImpl::SelectBlobFileTTL(uint64_t expiration,
std::ostringstream oss;
oss << "SelectBlobFileTTL range: [" << exp_low << ',' << exp_high << ')';
std::shared_ptr<Writer> writer;
std::shared_ptr<BlobLogWriter> writer;
const Status s =
CreateBlobFileAndWriter(/* has_ttl */ true, expiration_range,
/* reason */ oss.str(), blob_file, &writer);
@ -1070,7 +1070,8 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/,
Slice value_compressed = GetCompressedSlice(value, &compression_output);
std::string headerbuf;
Writer::ConstructBlobHeader(&headerbuf, key, value_compressed, expiration);
BlobLogWriter::ConstructBlobHeader(&headerbuf, key, value_compressed,
expiration);
// Check DB size limit before selecting blob file to
// Since CheckSizeAndEvictBlobFiles() can close blob files, it needs to be
@ -1342,7 +1343,7 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
uint64_t key_offset = 0;
{
WriteLock lockbfile_w(&bfile->mutex_);
std::shared_ptr<Writer> writer;
std::shared_ptr<BlobLogWriter> writer;
s = CheckOrCreateWriterLocked(bfile, &writer);
if (!s.ok()) {
return s;

@ -268,7 +268,7 @@ class BlobDBImpl : public BlobDB {
const ExpirationRange& expiration_range,
const std::string& reason,
std::shared_ptr<BlobFile>* blob_file,
std::shared_ptr<Writer>* writer);
std::shared_ptr<BlobLogWriter>* writer);
// Get the open non-TTL blob log file, or create a new one if no such file
// exists.
@ -373,10 +373,10 @@ class BlobDBImpl : public BlobDB {
// creates a sequential (append) writer for this blobfile
Status CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile);
// returns a Writer object for the file. If writer is not
// returns a BlobLogWriter object for the file. If writer is not
// already present, creates one. Needs Write Mutex to be held
Status CheckOrCreateWriterLocked(const std::shared_ptr<BlobFile>& blob_file,
std::shared_ptr<Writer>* writer);
std::shared_ptr<BlobLogWriter>* writer);
// checks if there is no snapshot which is referencing the
// blobs

@ -61,7 +61,7 @@ std::string BlobFile::PathName() const {
return BlobFileName(path_to_dir_, file_number_);
}
std::shared_ptr<Reader> BlobFile::OpenRandomAccessReader(
std::shared_ptr<BlobLogReader> BlobFile::OpenRandomAccessReader(
Env* env, const DBOptions& db_options,
const EnvOptions& env_options) const {
constexpr size_t kReadaheadSize = 2 * 1024 * 1024;
@ -78,7 +78,7 @@ std::shared_ptr<Reader> BlobFile::OpenRandomAccessReader(
sfile_reader.reset(new RandomAccessFileReader(
NewLegacyRandomAccessFileWrapper(sfile), path_name));
std::shared_ptr<Reader> log_reader = std::make_shared<Reader>(
std::shared_ptr<BlobLogReader> log_reader = std::make_shared<BlobLogReader>(
std::move(sfile_reader), db_options.env, db_options.statistics.get());
return log_reader;

@ -86,7 +86,7 @@ class BlobFile {
SequenceNumber obsolete_sequence_{0};
// Sequential/Append writer for blobs
std::shared_ptr<Writer> log_writer_;
std::shared_ptr<BlobLogWriter> log_writer_;
// random access file reader for GET calls
std::shared_ptr<RandomAccessFileReader> ra_file_reader_;
@ -208,7 +208,7 @@ class BlobFile {
CompressionType GetCompressionType() const { return compression_; }
std::shared_ptr<Writer> GetWriter() const { return log_writer_; }
std::shared_ptr<BlobLogWriter> GetWriter() const { return log_writer_; }
// Read blob file header and footer. Return corruption if file header is
// malform or incomplete. If footer is malform or incomplete, set
@ -220,7 +220,7 @@ class BlobFile {
bool* fresh_open);
private:
std::shared_ptr<Reader> OpenRandomAccessReader(
std::shared_ptr<BlobLogReader> OpenRandomAccessReader(
Env* env, const DBOptions& db_options,
const EnvOptions& env_options) const;

Loading…
Cancel
Save