From a693341604fd585619136a2e81e7a2d0fcdaee44 Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Mon, 6 Jul 2020 17:10:41 -0700 Subject: [PATCH] Move the blob file format related classes to the main namespace, rename reader/writer (#7086) Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/7086 Test Plan: `make check` Reviewed By: zhichao-cao Differential Revision: D22395420 Pulled By: ltamasi fbshipit-source-id: 088a20097bd6b73b0c433cd79725779f97ec04f2 --- db/blob/blob_log_format.cc | 2 -- db/blob/blob_log_format.h | 2 -- db/blob/blob_log_reader.cc | 15 +++++---- db/blob/blob_log_reader.h | 22 ++++++------- db/blob/blob_log_writer.cc | 36 +++++++++++----------- db/blob/blob_log_writer.h | 21 ++++++------- utilities/blob_db/blob_compaction_filter.h | 2 +- utilities/blob_db/blob_db_impl.cc | 21 +++++++------ utilities/blob_db/blob_db_impl.h | 6 ++-- utilities/blob_db/blob_file.cc | 4 +-- utilities/blob_db/blob_file.h | 6 ++-- 11 files changed, 64 insertions(+), 73 deletions(-) diff --git a/db/blob/blob_log_format.cc b/db/blob/blob_log_format.cc index efab39478..a69137046 100644 --- a/db/blob/blob_log_format.cc +++ b/db/blob/blob_log_format.cc @@ -11,7 +11,6 @@ #include "util/crc32c.h" namespace ROCKSDB_NAMESPACE { -namespace blob_db { void BlobLogHeader::EncodeTo(std::string* dst) { assert(dst != nullptr); @@ -144,6 +143,5 @@ Status BlobLogRecord::CheckBlobCRC() const { return Status::OK(); } -} // namespace blob_db } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE diff --git a/db/blob/blob_log_format.h b/db/blob/blob_log_format.h index f9ee3a22f..06ba35429 100644 --- a/db/blob/blob_log_format.h +++ b/db/blob/blob_log_format.h @@ -18,7 +18,6 @@ #include "rocksdb/types.h" namespace ROCKSDB_NAMESPACE { -namespace blob_db { constexpr uint32_t kMagicNumber = 2395959; // 0x00248f37 constexpr uint32_t kVersion1 = 1; @@ -126,6 +125,5 @@ struct BlobLogRecord { Status CheckBlobCRC() const; }; -} // namespace blob_db } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE diff --git a/db/blob/blob_log_reader.cc b/db/blob/blob_log_reader.cc index 8f075bb2e..000294c30 100644 --- a/db/blob/blob_log_reader.cc +++ b/db/blob/blob_log_reader.cc @@ -14,17 +14,17 @@ #include "util/stop_watch.h" namespace ROCKSDB_NAMESPACE { -namespace blob_db { -Reader::Reader(std::unique_ptr&& file_reader, Env* env, - Statistics* statistics) +BlobLogReader::BlobLogReader( + std::unique_ptr&& file_reader, Env* env, + Statistics* statistics) : file_(std::move(file_reader)), env_(env), statistics_(statistics), buffer_(), next_byte_(0) {} -Status Reader::ReadSlice(uint64_t size, Slice* slice, char* buf) { +Status BlobLogReader::ReadSlice(uint64_t size, Slice* slice, char* buf) { StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS); Status s = file_->Read(IOOptions(), next_byte_, static_cast(size), slice, buf, nullptr); @@ -39,7 +39,7 @@ Status Reader::ReadSlice(uint64_t size, Slice* slice, char* buf) { return s; } -Status Reader::ReadHeader(BlobLogHeader* header) { +Status BlobLogReader::ReadHeader(BlobLogHeader* header) { assert(file_.get() != nullptr); assert(next_byte_ == 0); Status s = ReadSlice(BlobLogHeader::kSize, &buffer_, header_buf_); @@ -54,8 +54,8 @@ Status Reader::ReadHeader(BlobLogHeader* header) { return header->DecodeFrom(buffer_); } -Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level, - uint64_t* blob_offset) { +Status BlobLogReader::ReadRecord(BlobLogRecord* record, ReadLevel level, + uint64_t* blob_offset) { Status s = ReadSlice(BlobLogRecord::kHeaderSize, &buffer_, header_buf_); if (!s.ok()) { return s; @@ -101,6 +101,5 @@ Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level, return s; } -} // namespace blob_db } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE diff --git a/db/blob/blob_log_reader.h b/db/blob/blob_log_reader.h index 1cbda4e64..937bbf3e4 100644 --- a/db/blob/blob_log_reader.h +++ b/db/blob/blob_log_reader.h @@ -22,15 +22,14 @@ namespace ROCKSDB_NAMESPACE { class SequentialFileReader; class Logger; -namespace blob_db { - /** - * Reader is a general purpose log stream reader implementation. The actual job - * of reading from the device is implemented by the SequentialFile interface. + * BlobLogReader is a general purpose log stream reader implementation. The + * actual job of reading from the device is implemented by the SequentialFile + * interface. * * Please see Writer for details on the file and record layout. */ -class Reader { +class BlobLogReader { public: enum ReadLevel { kReadHeader, @@ -39,14 +38,14 @@ class Reader { }; // Create a reader that will return log records from "*file". - // "*file" must remain live while this Reader is in use. - Reader(std::unique_ptr&& file_reader, Env* env, - Statistics* statistics); + // "*file" must remain live while this BlobLogReader is in use. + BlobLogReader(std::unique_ptr&& file_reader, Env* env, + Statistics* statistics); // No copying allowed - Reader(const Reader&) = delete; - Reader& operator=(const Reader&) = delete; + BlobLogReader(const BlobLogReader&) = delete; + BlobLogReader& operator=(const BlobLogReader&) = delete; - ~Reader() = default; + ~BlobLogReader() = default; Status ReadHeader(BlobLogHeader* header); @@ -77,6 +76,5 @@ class Reader { uint64_t next_byte_; }; -} // namespace blob_db } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE diff --git a/db/blob/blob_log_writer.cc b/db/blob/blob_log_writer.cc index a7b3201c4..8d92220cd 100644 --- a/db/blob/blob_log_writer.cc +++ b/db/blob/blob_log_writer.cc @@ -17,11 +17,11 @@ #include "util/stop_watch.h" namespace ROCKSDB_NAMESPACE { -namespace blob_db { -Writer::Writer(std::unique_ptr&& dest, Env* env, - Statistics* statistics, uint64_t log_number, uint64_t bpsync, - bool use_fs, uint64_t boffset) +BlobLogWriter::BlobLogWriter(std::unique_ptr&& dest, + Env* env, Statistics* statistics, + uint64_t log_number, uint64_t bpsync, bool use_fs, + uint64_t boffset) : dest_(std::move(dest)), env_(env), statistics_(statistics), @@ -32,14 +32,14 @@ Writer::Writer(std::unique_ptr&& dest, Env* env, use_fsync_(use_fs), last_elem_type_(kEtNone) {} -Status Writer::Sync() { +Status BlobLogWriter::Sync() { StopWatch sync_sw(env_, statistics_, BLOB_DB_BLOB_FILE_SYNC_MICROS); Status s = dest_->Sync(use_fsync_); RecordTick(statistics_, BLOB_DB_BLOB_FILE_SYNCED); return s; } -Status Writer::WriteHeader(BlobLogHeader& header) { +Status BlobLogWriter::WriteHeader(BlobLogHeader& header) { assert(block_offset_ == 0); assert(last_elem_type_ == kEtNone); std::string str; @@ -56,7 +56,7 @@ Status Writer::WriteHeader(BlobLogHeader& header) { return s; } -Status Writer::AppendFooter(BlobLogFooter& footer) { +Status BlobLogWriter::AppendFooter(BlobLogFooter& footer) { assert(block_offset_ != 0); assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord); @@ -76,9 +76,9 @@ Status Writer::AppendFooter(BlobLogFooter& footer) { return s; } -Status Writer::AddRecord(const Slice& key, const Slice& val, - uint64_t expiration, uint64_t* key_offset, - uint64_t* blob_offset) { +Status BlobLogWriter::AddRecord(const Slice& key, const Slice& val, + uint64_t expiration, uint64_t* key_offset, + uint64_t* blob_offset) { assert(block_offset_ != 0); assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord); @@ -89,8 +89,8 @@ Status Writer::AddRecord(const Slice& key, const Slice& val, return s; } -Status Writer::AddRecord(const Slice& key, const Slice& val, - uint64_t* key_offset, uint64_t* blob_offset) { +Status BlobLogWriter::AddRecord(const Slice& key, const Slice& val, + uint64_t* key_offset, uint64_t* blob_offset) { assert(block_offset_ != 0); assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord); @@ -101,8 +101,8 @@ Status Writer::AddRecord(const Slice& key, const Slice& val, return s; } -void Writer::ConstructBlobHeader(std::string* buf, const Slice& key, - const Slice& val, uint64_t expiration) { +void BlobLogWriter::ConstructBlobHeader(std::string* buf, const Slice& key, + const Slice& val, uint64_t expiration) { BlobLogRecord record; record.key = key; record.value = val; @@ -110,9 +110,10 @@ void Writer::ConstructBlobHeader(std::string* buf, const Slice& key, record.EncodeHeaderTo(buf); } -Status Writer::EmitPhysicalRecord(const std::string& headerbuf, - const Slice& key, const Slice& val, - uint64_t* key_offset, uint64_t* blob_offset) { +Status BlobLogWriter::EmitPhysicalRecord(const std::string& headerbuf, + const Slice& key, const Slice& val, + uint64_t* key_offset, + uint64_t* blob_offset) { StopWatch write_sw(env_, statistics_, BLOB_DB_BLOB_FILE_WRITE_MICROS); Status s = dest_->Append(Slice(headerbuf)); if (s.ok()) { @@ -134,6 +135,5 @@ Status Writer::EmitPhysicalRecord(const std::string& headerbuf, return s; } -} // namespace blob_db } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE diff --git a/db/blob/blob_log_writer.h b/db/blob/blob_log_writer.h index 85eb72436..d529d2771 100644 --- a/db/blob/blob_log_writer.h +++ b/db/blob/blob_log_writer.h @@ -21,29 +21,27 @@ namespace ROCKSDB_NAMESPACE { class WritableFileWriter; -namespace blob_db { - /** - * Writer is the blob log stream writer. It provides an append-only + * BlobLogWriter is the blob log stream writer. It provides an append-only * abstraction for writing blob data. * * * Look at blob_db_format.h to see the details of the record formats. */ -class Writer { +class BlobLogWriter { public: // Create a writer that will append data to "*dest". // "*dest" must be initially empty. - // "*dest" must remain live while this Writer is in use. - Writer(std::unique_ptr&& dest, Env* env, - Statistics* statistics, uint64_t log_number, uint64_t bpsync, - bool use_fsync, uint64_t boffset = 0); + // "*dest" must remain live while this BlobLogWriter is in use. + BlobLogWriter(std::unique_ptr&& dest, Env* env, + Statistics* statistics, uint64_t log_number, uint64_t bpsync, + bool use_fsync, uint64_t boffset = 0); // No copying allowed - Writer(const Writer&) = delete; - Writer& operator=(const Writer&) = delete; + BlobLogWriter(const BlobLogWriter&) = delete; + BlobLogWriter& operator=(const BlobLogWriter&) = delete; - ~Writer() = default; + ~BlobLogWriter() = default; static void ConstructBlobHeader(std::string* buf, const Slice& key, const Slice& val, uint64_t expiration); @@ -89,6 +87,5 @@ class Writer { ElemType last_elem_type_; }; -} // namespace blob_db } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE diff --git a/utilities/blob_db/blob_compaction_filter.h b/utilities/blob_db/blob_compaction_filter.h index bf54a4434..ab1be03d7 100644 --- a/utilities/blob_db/blob_compaction_filter.h +++ b/utilities/blob_db/blob_compaction_filter.h @@ -78,7 +78,7 @@ class BlobIndexCompactionFilterBase : public LayeredCompactionFilterBase { Statistics* statistics_; mutable std::shared_ptr blob_file_; - mutable std::shared_ptr writer_; + mutable std::shared_ptr writer_; // It is safe to not using std::atomic since the compaction filter, created // from a compaction filter factroy, will not be called from multiple threads. diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index e80e1f1a4..24df81f74 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -742,11 +742,11 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr& bfile) { boffset); } - Writer::ElemType et = Writer::kEtNone; + BlobLogWriter::ElemType et = BlobLogWriter::kEtNone; if (bfile->file_size_ == BlobLogHeader::kSize) { - et = Writer::kEtFileHdr; + et = BlobLogWriter::kEtFileHdr; } else if (bfile->file_size_ > BlobLogHeader::kSize) { - et = Writer::kEtRecord; + et = BlobLogWriter::kEtRecord; } else if (bfile->file_size_) { ROCKS_LOG_WARN(db_options_.info_log, "Open blob file: %s with wrong size: %" PRIu64, @@ -754,7 +754,7 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr& bfile) { return Status::Corruption("Invalid blob file size"); } - bfile->log_writer_ = std::make_shared( + bfile->log_writer_ = std::make_shared( std::move(fwriter), env_, statistics_, bfile->file_number_, bdb_options_.bytes_per_sync, db_options_.use_fsync, boffset); bfile->log_writer_->last_elem_type_ = et; @@ -798,7 +798,7 @@ std::shared_ptr BlobDBImpl::FindBlobFileLocked( Status BlobDBImpl::CheckOrCreateWriterLocked( const std::shared_ptr& blob_file, - std::shared_ptr* writer) { + std::shared_ptr* writer) { assert(writer != nullptr); *writer = blob_file->GetWriter(); if (*writer != nullptr) { @@ -814,7 +814,7 @@ Status BlobDBImpl::CheckOrCreateWriterLocked( Status BlobDBImpl::CreateBlobFileAndWriter( bool has_ttl, const ExpirationRange& expiration_range, const std::string& reason, std::shared_ptr* blob_file, - std::shared_ptr* writer) { + std::shared_ptr* writer) { TEST_SYNC_POINT("BlobDBImpl::CreateBlobFileAndWriter"); assert(has_ttl == (expiration_range.first || expiration_range.second)); assert(blob_file); @@ -871,7 +871,7 @@ Status BlobDBImpl::SelectBlobFile(std::shared_ptr* blob_file) { return Status::OK(); } - std::shared_ptr writer; + std::shared_ptr writer; const Status s = CreateBlobFileAndWriter( /* has_ttl */ false, ExpirationRange(), /* reason */ "SelectBlobFile", blob_file, &writer); @@ -917,7 +917,7 @@ Status BlobDBImpl::SelectBlobFileTTL(uint64_t expiration, std::ostringstream oss; oss << "SelectBlobFileTTL range: [" << exp_low << ',' << exp_high << ')'; - std::shared_ptr writer; + std::shared_ptr writer; const Status s = CreateBlobFileAndWriter(/* has_ttl */ true, expiration_range, /* reason */ oss.str(), blob_file, &writer); @@ -1070,7 +1070,8 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/, Slice value_compressed = GetCompressedSlice(value, &compression_output); std::string headerbuf; - Writer::ConstructBlobHeader(&headerbuf, key, value_compressed, expiration); + BlobLogWriter::ConstructBlobHeader(&headerbuf, key, value_compressed, + expiration); // Check DB size limit before selecting blob file to // Since CheckSizeAndEvictBlobFiles() can close blob files, it needs to be @@ -1342,7 +1343,7 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr& bfile, uint64_t key_offset = 0; { WriteLock lockbfile_w(&bfile->mutex_); - std::shared_ptr writer; + std::shared_ptr writer; s = CheckOrCreateWriterLocked(bfile, &writer); if (!s.ok()) { return s; diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index e53321564..dbe8584c4 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -268,7 +268,7 @@ class BlobDBImpl : public BlobDB { const ExpirationRange& expiration_range, const std::string& reason, std::shared_ptr* blob_file, - std::shared_ptr* writer); + std::shared_ptr* writer); // Get the open non-TTL blob log file, or create a new one if no such file // exists. @@ -373,10 +373,10 @@ class BlobDBImpl : public BlobDB { // creates a sequential (append) writer for this blobfile Status CreateWriterLocked(const std::shared_ptr& bfile); - // returns a Writer object for the file. If writer is not + // returns a BlobLogWriter object for the file. If writer is not // already present, creates one. Needs Write Mutex to be held Status CheckOrCreateWriterLocked(const std::shared_ptr& blob_file, - std::shared_ptr* writer); + std::shared_ptr* writer); // checks if there is no snapshot which is referencing the // blobs diff --git a/utilities/blob_db/blob_file.cc b/utilities/blob_db/blob_file.cc index af5742f15..dd83346df 100644 --- a/utilities/blob_db/blob_file.cc +++ b/utilities/blob_db/blob_file.cc @@ -61,7 +61,7 @@ std::string BlobFile::PathName() const { return BlobFileName(path_to_dir_, file_number_); } -std::shared_ptr BlobFile::OpenRandomAccessReader( +std::shared_ptr BlobFile::OpenRandomAccessReader( Env* env, const DBOptions& db_options, const EnvOptions& env_options) const { constexpr size_t kReadaheadSize = 2 * 1024 * 1024; @@ -78,7 +78,7 @@ std::shared_ptr BlobFile::OpenRandomAccessReader( sfile_reader.reset(new RandomAccessFileReader( NewLegacyRandomAccessFileWrapper(sfile), path_name)); - std::shared_ptr log_reader = std::make_shared( + std::shared_ptr log_reader = std::make_shared( std::move(sfile_reader), db_options.env, db_options.statistics.get()); return log_reader; diff --git a/utilities/blob_db/blob_file.h b/utilities/blob_db/blob_file.h index 217ba7422..15b838b00 100644 --- a/utilities/blob_db/blob_file.h +++ b/utilities/blob_db/blob_file.h @@ -86,7 +86,7 @@ class BlobFile { SequenceNumber obsolete_sequence_{0}; // Sequential/Append writer for blobs - std::shared_ptr log_writer_; + std::shared_ptr log_writer_; // random access file reader for GET calls std::shared_ptr ra_file_reader_; @@ -208,7 +208,7 @@ class BlobFile { CompressionType GetCompressionType() const { return compression_; } - std::shared_ptr GetWriter() const { return log_writer_; } + std::shared_ptr GetWriter() const { return log_writer_; } // Read blob file header and footer. Return corruption if file header is // malform or incomplete. If footer is malform or incomplete, set @@ -220,7 +220,7 @@ class BlobFile { bool* fresh_open); private: - std::shared_ptr OpenRandomAccessReader( + std::shared_ptr OpenRandomAccessReader( Env* env, const DBOptions& db_options, const EnvOptions& env_options) const;