From 72daa92d3a47d6cc19c553bb51052d991f965125 Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Tue, 26 Nov 2019 13:16:39 -0800 Subject: [PATCH] Refactor blob file creation logic (#6066) Summary: The patch refactors and cleans up the logic around creating new blob files by moving the common code of `SelectBlobFile` and `SelectBlobFileTTL` to a new helper method `CreateBlobFileAndWriter`, bringing the implementation of `SelectBlobFile` and `SelectBlobFileTTL` into sync, and increasing encapsulation by adding new constructors for `BlobFile` and `BlobLogHeader`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6066 Test Plan: Ran `make check` and used the BlobDB mode of `db_bench` to sanity test both the TTL and the non-TTL code paths. Differential Revision: D18646921 Pulled By: ltamasi fbshipit-source-id: e5705a84807932e31dccab4f49b3e64369cea26d --- utilities/blob_db/blob_db_impl.cc | 193 +++++++++++++--------------- utilities/blob_db/blob_db_impl.h | 25 ++-- utilities/blob_db/blob_file.cc | 46 ++----- utilities/blob_db/blob_file.h | 53 ++++---- utilities/blob_db/blob_log_format.h | 10 +- 5 files changed, 151 insertions(+), 176 deletions(-) diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 4d259eab1..3403c345c 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -84,7 +84,6 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname, statistics_(db_options_.statistics.get()), next_file_number_(1), flush_sequence_(0), - epoch_of_(0), closed_(true), open_file_count_(0), total_blob_size_(0), @@ -584,14 +583,24 @@ Status BlobDBImpl::GetBlobFileReader( return s; } -std::shared_ptr BlobDBImpl::NewBlobFile(const std::string& reason) { +std::shared_ptr BlobDBImpl::NewBlobFile( + bool has_ttl, const ExpirationRange& expiration_range, + const std::string& reason) { + assert(has_ttl == (expiration_range.first || expiration_range.second)); + uint64_t file_num = next_file_number_++; - auto bfile = std::make_shared(this, blob_dir_, file_num, - db_options_.info_log.get()); + + const uint32_t column_family_id = + static_cast(DefaultColumnFamily())->GetID(); + auto blob_file = std::make_shared( + this, blob_dir_, file_num, db_options_.info_log.get(), column_family_id, + bdb_options_.compression, has_ttl, expiration_range); + ROCKS_LOG_DEBUG(db_options_.info_log, "New blob file created: %s reason='%s'", - bfile->PathName().c_str(), reason.c_str()); + blob_file->PathName().c_str(), reason.c_str()); LogFlush(db_options_.info_log); - return bfile; + + return blob_file; } Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr& bfile) { @@ -687,47 +696,29 @@ Status BlobDBImpl::CheckOrCreateWriterLocked( return s; } -Status BlobDBImpl::SelectBlobFile(std::shared_ptr* blob_file) { - assert(blob_file != nullptr); - { - ReadLock rl(&mutex_); - if (open_non_ttl_file_ != nullptr) { - *blob_file = open_non_ttl_file_; - return Status::OK(); - } - } - - // CHECK again - WriteLock wl(&mutex_); - if (open_non_ttl_file_ != nullptr) { - *blob_file = open_non_ttl_file_; - return Status::OK(); - } +Status BlobDBImpl::CreateBlobFileAndWriter( + bool has_ttl, const ExpirationRange& expiration_range, + const std::string& reason, std::shared_ptr* blob_file, + std::shared_ptr* writer) { + assert(has_ttl == (expiration_range.first || expiration_range.second)); + assert(blob_file); + assert(writer); - *blob_file = NewBlobFile("SelectBlobFile"); - assert(*blob_file != nullptr); + *blob_file = NewBlobFile(has_ttl, expiration_range, reason); + assert(*blob_file); // file not visible, hence no lock - std::shared_ptr writer; - Status s = CheckOrCreateWriterLocked(*blob_file, &writer); + Status s = CheckOrCreateWriterLocked(*blob_file, writer); if (!s.ok()) { ROCKS_LOG_ERROR(db_options_.info_log, - "Failed to get writer from blob file: %s, error: %s", + "Failed to get writer for blob file: %s, error: %s", (*blob_file)->PathName().c_str(), s.ToString().c_str()); return s; } - (*blob_file)->file_size_ = BlobLogHeader::kSize; - (*blob_file)->header_.compression = bdb_options_.compression; - (*blob_file)->header_.has_ttl = false; - (*blob_file)->header_.column_family_id = - reinterpret_cast(DefaultColumnFamily())->GetID(); - (*blob_file)->header_valid_ = true; - (*blob_file)->SetColumnFamilyId((*blob_file)->header_.column_family_id); - (*blob_file)->SetHasTTL(false); - (*blob_file)->SetCompression(bdb_options_.compression); + assert(*writer); - s = writer->WriteHeader((*blob_file)->header_); + s = (*writer)->WriteHeader((*blob_file)->header_); if (!s.ok()) { ROCKS_LOG_ERROR(db_options_.info_log, "Failed to write header to new blob file: %s" @@ -736,93 +727,92 @@ Status BlobDBImpl::SelectBlobFile(std::shared_ptr* blob_file) { return s; } - blob_files_.insert( - std::make_pair((*blob_file)->BlobFileNumber(), *blob_file)); - open_non_ttl_file_ = *blob_file; + (*blob_file)->SetFileSize(BlobLogHeader::kSize); total_blob_size_ += BlobLogHeader::kSize; + return s; } -Status BlobDBImpl::SelectBlobFileTTL(uint64_t expiration, - std::shared_ptr* blob_file) { - assert(blob_file != nullptr); - assert(expiration != kNoExpiration); - uint64_t epoch_read = 0; +Status BlobDBImpl::SelectBlobFile(std::shared_ptr* blob_file) { + assert(blob_file); + { ReadLock rl(&mutex_); - *blob_file = FindBlobFileLocked(expiration); - epoch_read = epoch_of_.load(); - } - if (*blob_file != nullptr) { - assert(!(*blob_file)->Immutable()); - return Status::OK(); + if (open_non_ttl_file_) { + assert(!open_non_ttl_file_->Immutable()); + *blob_file = open_non_ttl_file_; + return Status::OK(); + } } - uint64_t exp_low = - (expiration / bdb_options_.ttl_range_secs) * bdb_options_.ttl_range_secs; - uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs; - ExpirationRange expiration_range = std::make_pair(exp_low, exp_high); - - *blob_file = NewBlobFile("SelectBlobFileTTL"); - assert(*blob_file != nullptr); + // Check again + WriteLock wl(&mutex_); - ROCKS_LOG_INFO(db_options_.info_log, - "New blob file TTL range: %s %" PRIu64 " %" PRIu64, - (*blob_file)->PathName().c_str(), exp_low, exp_high); - LogFlush(db_options_.info_log); + if (open_non_ttl_file_) { + assert(!open_non_ttl_file_->Immutable()); + *blob_file = open_non_ttl_file_; + return Status::OK(); + } - // we don't need to take lock as no other thread is seeing bfile yet std::shared_ptr writer; - Status s = CheckOrCreateWriterLocked(*blob_file, &writer); + const Status s = CreateBlobFileAndWriter( + /* has_ttl */ false, ExpirationRange(), + /* reason */ "SelectBlobFile", blob_file, &writer); if (!s.ok()) { - ROCKS_LOG_ERROR( - db_options_.info_log, - "Failed to get writer from blob file with TTL: %s, error: %s", - (*blob_file)->PathName().c_str(), s.ToString().c_str()); return s; } - (*blob_file)->header_.expiration_range = expiration_range; - (*blob_file)->header_.compression = bdb_options_.compression; - (*blob_file)->header_.has_ttl = true; - (*blob_file)->header_.column_family_id = - reinterpret_cast(DefaultColumnFamily())->GetID(); - (*blob_file)->header_valid_ = true; - (*blob_file)->SetColumnFamilyId((*blob_file)->header_.column_family_id); - (*blob_file)->SetHasTTL(true); - (*blob_file)->SetCompression(bdb_options_.compression); - (*blob_file)->file_size_ = BlobLogHeader::kSize; + blob_files_.insert(std::map>::value_type( + (*blob_file)->BlobFileNumber(), *blob_file)); + open_non_ttl_file_ = *blob_file; - // set the first value of the range, since that is - // concrete at this time. also necessary to add to open_ttl_files_ - (*blob_file)->expiration_range_ = expiration_range; + return s; +} - WriteLock wl(&mutex_); - // in case the epoch has shifted in the interim, then check - // check condition again - should be rare. - if (epoch_of_.load() != epoch_read) { - std::shared_ptr blob_file2 = FindBlobFileLocked(expiration); - if (blob_file2 != nullptr) { - *blob_file = std::move(blob_file2); +Status BlobDBImpl::SelectBlobFileTTL(uint64_t expiration, + std::shared_ptr* blob_file) { + assert(blob_file); + assert(expiration != kNoExpiration); + + { + ReadLock rl(&mutex_); + + *blob_file = FindBlobFileLocked(expiration); + if (*blob_file != nullptr) { + assert(!(*blob_file)->Immutable()); return Status::OK(); } } - s = writer->WriteHeader((*blob_file)->header_); + // Check again + WriteLock wl(&mutex_); + + *blob_file = FindBlobFileLocked(expiration); + if (*blob_file != nullptr) { + assert(!(*blob_file)->Immutable()); + return Status::OK(); + } + + const uint64_t exp_low = + (expiration / bdb_options_.ttl_range_secs) * bdb_options_.ttl_range_secs; + const uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs; + const ExpirationRange expiration_range(exp_low, exp_high); + + std::ostringstream oss; + oss << "SelectBlobFileTTL range: [" << exp_low << ',' << exp_high << ')'; + + std::shared_ptr writer; + const Status s = + CreateBlobFileAndWriter(/* has_ttl */ true, expiration_range, + /* reason */ oss.str(), blob_file, &writer); if (!s.ok()) { - ROCKS_LOG_ERROR(db_options_.info_log, - "Failed to write header to new blob file: %s" - " status: '%s'", - (*blob_file)->PathName().c_str(), s.ToString().c_str()); return s; } - blob_files_.insert( - std::make_pair((*blob_file)->BlobFileNumber(), *blob_file)); + blob_files_.insert(std::map>::value_type( + (*blob_file)->BlobFileNumber(), *blob_file)); open_ttl_files_.insert(*blob_file); - total_blob_size_ += BlobLogHeader::kSize; - epoch_of_++; return s; } @@ -1954,7 +1944,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, // new file std::string reason("GC of "); reason += bfptr->PathName(); - newfile = NewBlobFile(reason); + newfile = NewBlobFile(bfptr->HasTTL(), bfptr->expiration_range_, reason); s = CheckOrCreateWriterLocked(newfile, &new_writer); if (!s.ok()) { @@ -1963,14 +1953,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, newfile->PathName().c_str(), s.ToString().c_str()); break; } - // Can't use header beyond this point - newfile->header_ = std::move(header); - newfile->header_valid_ = true; newfile->file_size_ = BlobLogHeader::kSize; - newfile->SetColumnFamilyId(bfptr->column_family_id()); - newfile->SetHasTTL(bfptr->HasTTL()); - newfile->SetCompression(bfptr->compression()); - newfile->expiration_range_ = bfptr->expiration_range_; s = new_writer->WriteHeader(newfile->header_); if (!s.ok()) { diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index 17c97abf9..7e9d57204 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -264,14 +264,22 @@ class BlobDBImpl : public BlobDB { const Slice& value, uint64_t expiration, std::string* index_entry); - // find an existing blob log file based on the expiration unix epoch - // if such a file does not exist, return nullptr + // Create a new blob file and associated writer. + Status CreateBlobFileAndWriter(bool has_ttl, + const ExpirationRange& expiration_range, + const std::string& reason, + std::shared_ptr* blob_file, + std::shared_ptr* writer); + + // Get the open non-TTL blob log file, or create a new one if no such file + // exists. + Status SelectBlobFile(std::shared_ptr* blob_file); + + // Get the open TTL blob log file for a certain expiration, or create a new + // one if no such file exists. Status SelectBlobFileTTL(uint64_t expiration, std::shared_ptr* blob_file); - // find an existing blob log file to append the value to - Status SelectBlobFile(std::shared_ptr* blob_file); - std::shared_ptr FindBlobFileLocked(uint64_t expiration) const; // periodic sanity check. Bunch of checks @@ -300,7 +308,9 @@ class BlobDBImpl : public BlobDB { void StartBackgroundTasks(); // add a new Blob File - std::shared_ptr NewBlobFile(const std::string& reason); + std::shared_ptr NewBlobFile(bool has_ttl, + const ExpirationRange& expiration_range, + const std::string& reason); // collect all the blob log files from the blob directory Status GetAllBlobFiles(std::set* file_numbers); @@ -434,9 +444,6 @@ class BlobDBImpl : public BlobDB { // The largest sequence number that has been flushed. SequenceNumber flush_sequence_; - // epoch or version of the open files. - std::atomic epoch_of_; - // opened non-TTL blob file. std::shared_ptr open_non_ttl_file_; diff --git a/utilities/blob_db/blob_file.cc b/utilities/blob_db/blob_file.cc index a3a037ac8..f7a0e7de8 100644 --- a/utilities/blob_db/blob_file.cc +++ b/utilities/blob_db/blob_file.cc @@ -10,7 +10,6 @@ #include #include -#include #include #include "db/column_family.h" @@ -25,45 +24,24 @@ namespace rocksdb { namespace blob_db { -BlobFile::BlobFile() - : parent_(nullptr), - file_number_(0), - info_log_(nullptr), - column_family_id_(std::numeric_limits::max()), - compression_(kNoCompression), - has_ttl_(false), - blob_count_(0), - file_size_(0), - closed_(false), - immutable_sequence_(0), - obsolete_(false), - obsolete_sequence_(0), - expiration_range_({0, 0}), - last_access_(-1), - last_fsync_(0), - header_valid_(false), - footer_valid_(false) {} - BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn, Logger* info_log) + : parent_(p), path_to_dir_(bdir), file_number_(fn), info_log_(info_log) {} + +BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn, + Logger* info_log, uint32_t column_family_id, + CompressionType compression, bool has_ttl, + const ExpirationRange& expiration_range) : parent_(p), path_to_dir_(bdir), file_number_(fn), info_log_(info_log), - column_family_id_(std::numeric_limits::max()), - compression_(kNoCompression), - has_ttl_(false), - blob_count_(0), - file_size_(0), - closed_(false), - immutable_sequence_(0), - obsolete_(false), - obsolete_sequence_(0), - expiration_range_({0, 0}), - last_access_(-1), - last_fsync_(0), - header_valid_(false), - footer_valid_(false) {} + column_family_id_(column_family_id), + compression_(compression), + has_ttl_(has_ttl), + expiration_range_(expiration_range), + header_(column_family_id, compression, has_ttl, expiration_range), + header_valid_(true) {} BlobFile::~BlobFile() { if (obsolete_) { diff --git a/utilities/blob_db/blob_file.h b/utilities/blob_db/blob_file.h index 06ada8635..485b2e985 100644 --- a/utilities/blob_db/blob_file.h +++ b/utilities/blob_db/blob_file.h @@ -6,6 +6,7 @@ #ifndef ROCKSDB_LITE #include +#include #include #include @@ -29,7 +30,7 @@ class BlobFile { private: // access to parent - const BlobDBImpl* parent_; + const BlobDBImpl* parent_{nullptr}; // path to blob directory std::string path_to_dir_; @@ -37,49 +38,50 @@ class BlobFile { // the id of the file. // the above 2 are created during file creation and never changed // after that - uint64_t file_number_; + uint64_t file_number_{0}; // The file numbers of the SST files whose oldest blob file reference // points to this blob file. std::unordered_set linked_sst_files_; // Info log. - Logger* info_log_; + Logger* info_log_{nullptr}; // Column family id. - uint32_t column_family_id_; + uint32_t column_family_id_{std::numeric_limits::max()}; // Compression type of blobs in the file - CompressionType compression_; + CompressionType compression_{kNoCompression}; // If true, the keys in this file all has TTL. Otherwise all keys don't // have TTL. - bool has_ttl_; + bool has_ttl_{false}; + + // TTL range of blobs in the file. + ExpirationRange expiration_range_; // number of blobs in the file - std::atomic blob_count_; + std::atomic blob_count_{0}; // size of the file - std::atomic file_size_; + std::atomic file_size_{0}; BlobLogHeader header_; // closed_ = true implies the file is no more mutable // no more blobs will be appended and the footer has been written out - std::atomic closed_; + std::atomic closed_{false}; // The latest sequence number when the file was closed/made immutable. - SequenceNumber immutable_sequence_; + SequenceNumber immutable_sequence_{0}; // has a pass of garbage collection successfully finished on this file // obsolete_ still needs to do iterator/snapshot checks - std::atomic obsolete_; + std::atomic obsolete_{false}; // The last sequence number by the time the file marked as obsolete. // Data in this file is visible to a snapshot taken before the sequence. - SequenceNumber obsolete_sequence_; - - ExpirationRange expiration_range_; + SequenceNumber obsolete_sequence_{0}; // Sequential/Append writer for blobs std::shared_ptr log_writer_; @@ -92,29 +94,30 @@ class BlobFile { mutable port::RWMutex mutex_; // time when the random access reader was last created. - std::atomic last_access_; + std::atomic last_access_{-1}; // last time file was fsync'd/fdatasyncd - std::atomic last_fsync_; + std::atomic last_fsync_{0}; - bool header_valid_; + bool header_valid_{false}; - bool footer_valid_; + bool footer_valid_{false}; public: - BlobFile(); + BlobFile() = default; BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum, Logger* info_log); + BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum, + Logger* info_log, uint32_t column_family_id, + CompressionType compression, bool has_ttl, + const ExpirationRange& expiration_range); + ~BlobFile(); uint32_t column_family_id() const; - void SetColumnFamilyId(uint32_t cf_id) { - column_family_id_ = cf_id; - } - // Returns log file's absolute pathname. std::string PathName() const; @@ -203,10 +206,6 @@ class BlobFile { CompressionType compression() const { return compression_; } - void SetCompression(CompressionType c) { - compression_ = c; - } - std::shared_ptr GetWriter() const { return log_writer_; } // Read blob file header and footer. Return corruption if file header is diff --git a/utilities/blob_db/blob_log_format.h b/utilities/blob_db/blob_log_format.h index fcc042f06..34eacd5e0 100644 --- a/utilities/blob_db/blob_log_format.h +++ b/utilities/blob_db/blob_log_format.h @@ -43,11 +43,19 @@ using ExpirationRange = std::pair; struct BlobLogHeader { static constexpr size_t kSize = 30; + BlobLogHeader() = default; + BlobLogHeader(uint32_t _column_family_id, CompressionType _compression, + bool _has_ttl, const ExpirationRange& _expiration_range) + : column_family_id(_column_family_id), + compression(_compression), + has_ttl(_has_ttl), + expiration_range(_expiration_range) {} + uint32_t version = kVersion1; uint32_t column_family_id = 0; CompressionType compression = kNoCompression; bool has_ttl = false; - ExpirationRange expiration_range = std::make_pair(0, 0); + ExpirationRange expiration_range; void EncodeTo(std::string* dst);