From f6082d194439c0fc3992ab343d63e2389f867ee5 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Tue, 31 Oct 2017 16:33:55 -0700 Subject: [PATCH] Blob DB: cleanup unused options Summary: * Clean up num_concurrent_simple_blobs. We don't do concurrent writes (writers take write_mutex_), so it doesn't make sense to have multiple non-TTL files open. We can revisit this later when we want to improve writes. * Clean up the eviction callback (gc_evict_cb_fn); we have no plans to use it now. * Rename s/open_simple_files_/open_non_ttl_file_/ and s/open_blob_files_/open_ttl_files_/ to avoid confusion. Closes https://github.com/facebook/rocksdb/pull/3088 Differential Revision: D6182598 Pulled By: yiwu-arbug fbshipit-source-id: 99e6f5e01fa66d31309cdb06ce48502464bac6ad --- utilities/blob_db/blob_db.cc | 2 - utilities/blob_db/blob_db.h | 9 -- utilities/blob_db/blob_db_impl.cc | 162 ++++++++---------------------- utilities/blob_db/blob_db_impl.h | 17 +--- utilities/blob_db/blob_db_test.cc | 1 - 5 files changed, 44 insertions(+), 147 deletions(-) diff --git a/utilities/blob_db/blob_db.cc b/utilities/blob_db/blob_db.cc index 947840751..1fd926141 100644 --- a/utilities/blob_db/blob_db.cc +++ b/utilities/blob_db/blob_db.cc @@ -186,8 +186,6 @@ void BlobDBOptions::Dump(Logger* log) const { bytes_per_sync); ROCKS_LOG_HEADER(log, " blob_db_options.blob_file_size: %" PRIu64, blob_file_size); - ROCKS_LOG_HEADER(log, "blob_db_options.num_concurrent_simple_blobs: %" PRIu32, - num_concurrent_simple_blobs); ROCKS_LOG_HEADER(log, " blob_db_options.ttl_extractor: %p", ttl_extractor.get()); ROCKS_LOG_HEADER(log, " blob_db_options.compression: %d", diff --git a/utilities/blob_db/blob_db.h b/utilities/blob_db/blob_db.h index 76ab95555..1ef382ab8 100644 --- a/utilities/blob_db/blob_db.h +++ b/utilities/blob_db/blob_db.h @@ -63,20 +63,11 @@ struct BlobDBOptions { // after it exceeds that size uint64_t blob_file_size = 256 * 1024 * 1024; - // how many files to use for simple blobs at one time - uint32_t num_concurrent_simple_blobs = 
1; - // Instead of setting TTL explicitly by calling PutWithTTL or PutUntil, // applications can set a TTLExtractor which can extract TTL from key-value // pairs. std::shared_ptr ttl_extractor = nullptr; - // eviction callback. - // this function will be called for every blob that is getting - // evicted. - std::function - gc_evict_cb_fn; - // what compression to use for Blob's CompressionType compression = kNoCompression; diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 602fc171c..af64b4188 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -413,7 +413,7 @@ Status BlobDBImpl::OpenAllFiles() { expiration_range.second); } } else { - open_blob_files_.insert(bfptr); + open_ttl_files_.insert(bfptr); } } } @@ -493,23 +493,23 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr& bfile) { std::shared_ptr BlobDBImpl::FindBlobFileLocked( uint64_t expiration) const { - if (open_blob_files_.empty()) return nullptr; + if (open_ttl_files_.empty()) return nullptr; std::shared_ptr tmp = std::make_shared(); tmp->expiration_range_ = std::make_pair(expiration, 0); - auto citr = open_blob_files_.equal_range(tmp); - if (citr.first == open_blob_files_.end()) { - assert(citr.second == open_blob_files_.end()); + auto citr = open_ttl_files_.equal_range(tmp); + if (citr.first == open_ttl_files_.end()) { + assert(citr.second == open_ttl_files_.end()); - std::shared_ptr check = *(open_blob_files_.rbegin()); + std::shared_ptr check = *(open_ttl_files_.rbegin()); return (check->expiration_range_.second < expiration) ? 
nullptr : check; } if (citr.first != citr.second) return *(citr.first); auto finditr = citr.second; - if (finditr != open_blob_files_.begin()) --finditr; + if (finditr != open_ttl_files_.begin()) --finditr; bool b2 = (*finditr)->expiration_range_.second < expiration; bool b1 = (*finditr)->expiration_range_.first > expiration; @@ -530,11 +530,17 @@ std::shared_ptr BlobDBImpl::CheckOrCreateWriterLocked( } std::shared_ptr BlobDBImpl::SelectBlobFile() { - uint32_t val = blob_rgen.Next(); { ReadLock rl(&mutex_); - if (open_simple_files_.size() == bdb_options_.num_concurrent_simple_blobs) - return open_simple_files_[val % bdb_options_.num_concurrent_simple_blobs]; + if (open_non_ttl_file_ != nullptr) { + return open_non_ttl_file_; + } + } + + // CHECK again + WriteLock wl(&mutex_); + if (open_non_ttl_file_ != nullptr) { + return open_non_ttl_file_; } std::shared_ptr bfile = NewBlobFile("SelectBlobFile"); @@ -557,12 +563,6 @@ std::shared_ptr BlobDBImpl::SelectBlobFile() { bfile->header_valid_ = true; bfile->SetHasTTL(false); - // CHECK again - WriteLock wl(&mutex_); - if (open_simple_files_.size() == bdb_options_.num_concurrent_simple_blobs) { - return open_simple_files_[val % bdb_options_.num_concurrent_simple_blobs]; - } - Status s = writer->WriteHeader(bfile->header_); if (!s.ok()) { ROCKS_LOG_ERROR(db_options_.info_log, @@ -574,7 +574,7 @@ std::shared_ptr BlobDBImpl::SelectBlobFile() { dir_change_.store(true); blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile)); - open_simple_files_.push_back(bfile); + open_non_ttl_file_ = bfile; return bfile; } @@ -625,7 +625,7 @@ std::shared_ptr BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) { bfile->file_size_ = BlobLogHeader::kSize; // set the first value of the range, since that is - // concrete at this time. also necessary to add to open_blob_files_ + // concrete at this time. 
also necessary to add to open_ttl_files_ bfile->expiration_range_ = expiration_range; WriteLock wl(&mutex_); @@ -647,7 +647,7 @@ std::shared_ptr BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) { dir_change_.store(true); blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile)); - open_blob_files_.insert(bfile); + open_ttl_files_.insert(bfile); epoch_of_++; return bfile; @@ -1192,9 +1192,9 @@ std::pair BlobDBImpl::SanityCheck(bool aborted) { blob_files_.size()); ROCKS_LOG_INFO(db_options_.info_log, "Number of open files %" PRIu64, - open_blob_files_.size()); + open_ttl_files_.size()); - for (auto bfile : open_blob_files_) { + for (auto bfile : open_ttl_files_) { assert(!bfile->Immutable()); } @@ -1215,6 +1215,7 @@ std::pair BlobDBImpl::SanityCheck(bool aborted) { } Status BlobDBImpl::CloseBlobFile(std::shared_ptr bfile) { + assert(bfile != nullptr); Status s; ROCKS_LOG_INFO(db_options_.info_log, "Close blob file %" PRIu64, bfile->BlobFileNumber()); @@ -1223,13 +1224,11 @@ Status BlobDBImpl::CloseBlobFile(std::shared_ptr bfile) { if (bfile->HasTTL()) { size_t erased __attribute__((__unused__)); - erased = open_blob_files_.erase(bfile); + erased = open_ttl_files_.erase(bfile); assert(erased == 1); } else { - auto iter = std::find(open_simple_files_.begin(), - open_simple_files_.end(), bfile); - assert(iter != open_simple_files_.end()); - open_simple_files_.erase(iter); + assert(bfile == open_non_ttl_file_); + open_non_ttl_file_ = nullptr; } } @@ -1412,7 +1411,7 @@ std::pair BlobDBImpl::CheckSeqFiles(bool aborted) { uint64_t epoch_now = EpochNow(); ReadLock rl(&mutex_); - for (auto bfile : open_blob_files_) { + for (auto bfile : open_ttl_files_) { { ReadLock lockbfile_r(&bfile->mutex_); @@ -1437,14 +1436,14 @@ std::pair BlobDBImpl::FsyncFiles(bool aborted) { std::vector> process_files; { ReadLock rl(&mutex_); - for (auto fitr : open_blob_files_) { + for (auto fitr : open_ttl_files_) { if (fitr->NeedsFsync(true, bdb_options_.bytes_per_sync)) 
process_files.push_back(fitr); } - for (auto fitr : open_simple_files_) { - if (fitr->NeedsFsync(true, bdb_options_.bytes_per_sync)) - process_files.push_back(fitr); + if (open_non_ttl_file_ != nullptr && + open_non_ttl_file_->NeedsFsync(true, bdb_options_.bytes_per_sync)) { + process_files.push_back(open_non_ttl_file_); } } @@ -1800,7 +1799,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, // but under the asusmption that this is only called when a // file is Immutable, we can reduce the critical section bool BlobDBImpl::ShouldGCFile(std::shared_ptr bfile, uint64_t now, - bool is_oldest_simple_blob_file, + bool is_oldest_non_ttl_file, std::string* reason) { if (bfile->HasTTL()) { ExpirationRange expiration_range = bfile->GetExpirationRange(); @@ -1858,7 +1857,7 @@ bool BlobDBImpl::ShouldGCFile(std::shared_ptr bfile, uint64_t now, return false; } - if (is_oldest_simple_blob_file) { + if (is_oldest_non_ttl_file) { *reason = "out of space and is the oldest simple blob file"; return true; } @@ -1924,72 +1923,6 @@ std::pair BlobDBImpl::DeleteObsoleteFiles(bool aborted) { return std::make_pair(!aborted, -1); } -bool BlobDBImpl::CallbackEvictsImpl(std::shared_ptr bfile) { - std::shared_ptr reader = - bfile->OpenSequentialReader(env_, db_options_, env_options_); - if (!reader) { - ROCKS_LOG_ERROR( - db_options_.info_log, - "File sequential reader could not be opened for evict callback: %s", - bfile->PathName().c_str()); - return false; - } - - ReadLock lockbfile_r(&bfile->mutex_); - - BlobLogHeader header; - Status s = reader->ReadHeader(&header); - if (!s.ok()) { - ROCKS_LOG_ERROR( - db_options_.info_log, - "Failure to read header for blob-file during evict callback %s", - bfile->PathName().c_str()); - return false; - } - - ColumnFamilyHandle* cfh = - db_impl_->GetColumnFamilyHandleUnlocked(bfile->column_family_id()); - BlobLogRecord record; - Reader::ReadLevel full = Reader::kReadHeaderKeyBlob; - while (reader->ReadRecord(&record, full).ok()) { - 
bdb_options_.gc_evict_cb_fn(cfh, record.key, record.value); - } - - return true; -} - -std::pair BlobDBImpl::RemoveTimerQ(TimerQueue* tq, - bool aborted) { - WriteLock wl(&mutex_); - for (auto itr = cb_threads_.begin(); itr != cb_threads_.end(); ++itr) { - if ((*itr).get() != tq) continue; - - cb_threads_.erase(itr); - break; - } - return std::make_pair(false, -1); -} - -std::pair BlobDBImpl::CallbackEvicts( - TimerQueue* tq, std::shared_ptr bfile, bool aborted) { - if (aborted) return std::make_pair(false, -1); - bool succ = CallbackEvictsImpl(bfile); - if (succ) { - ROCKS_LOG_DEBUG(db_options_.info_log, "Eviction callbacks completed %s", - bfile->PathName().c_str()); - } - - WriteLock wl(&mutex_); - bfile->SetCanBeDeleted(); - obsolete_files_.push_front(bfile); - if (tq) { - // all of the callbacks have been processed - tqueue_.add(0, std::bind(&BlobDBImpl::RemoveTimerQ, this, tq, - std::placeholders::_1)); - } - return std::make_pair(false, -1); -} - void BlobDBImpl::CopyBlobFiles( std::vector>* bfiles_copy) { ReadLock rl(&mutex_); @@ -2011,7 +1944,7 @@ void BlobDBImpl::FilterSubsetOfFiles( uint64_t now = EpochNow(); size_t files_processed = 0; - bool simple_blob_file_found = false; + bool non_ttl_file_found = false; for (auto bfile : blob_files) { if (files_processed >= files_to_collect) break; // if this is the first time processing the file @@ -2031,15 +1964,14 @@ void BlobDBImpl::FilterSubsetOfFiles( // then it should not be GC'd if (bfile->Obsolete() || !bfile->Immutable()) continue; - bool is_oldest_simple_blob_file = false; - if (!simple_blob_file_found && !bfile->HasTTL()) { - is_oldest_simple_blob_file = true; - simple_blob_file_found = true; + bool is_oldest_non_ttl_file = false; + if (!non_ttl_file_found && !bfile->HasTTL()) { + is_oldest_non_ttl_file = true; + non_ttl_file_found = true; } std::string reason; - bool shouldgc = - ShouldGCFile(bfile, now, is_oldest_simple_blob_file, &reason); + bool shouldgc = ShouldGCFile(bfile, now, 
is_oldest_non_ttl_file, &reason); if (!shouldgc) { ROCKS_LOG_DEBUG(db_options_.info_log, "File has been skipped for GC ttl %s %" PRIu64 " %" PRIu64 @@ -2097,25 +2029,11 @@ std::pair BlobDBImpl::RunGC(bool aborted) { } if (!obsoletes.empty()) { - bool evict_cb = (!!bdb_options_.gc_evict_cb_fn); - std::shared_ptr tq; - if (evict_cb) tq = std::make_shared(); - - // if evict callback is present, first schedule the callback thread WriteLock wl(&mutex_); for (auto bfile : obsoletes) { - bool last_file = (bfile == obsoletes.back()); - - if (!evict_cb) { - bfile->SetCanBeDeleted(); - obsolete_files_.push_front(bfile); - } else { - tq->add(0, std::bind(&BlobDBImpl::CallbackEvicts, this, - (last_file) ? tq.get() : nullptr, bfile, - std::placeholders::_1)); - } + bfile->SetCanBeDeleted(); + obsolete_files_.push_front(bfile); } - if (evict_cb) cb_threads_.emplace_back(tq); } // reschedule diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index b18d26e1f..f90363008 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -305,7 +305,7 @@ class BlobDBImpl : public BlobDB { // tt - current time // last_id - the id of the non-TTL file to evict bool ShouldGCFile(std::shared_ptr bfile, uint64_t now, - bool is_oldest_simple_blob_file, std::string* reason); + bool is_oldest_non_ttl_file, std::string* reason); // collect all the blob log files from the blob directory Status GetAllLogFiles(std::set>* file_nums); @@ -370,14 +370,8 @@ class BlobDBImpl : public BlobDB { std::pair EvictCompacted(bool aborted); - bool CallbackEvictsImpl(std::shared_ptr bfile); - std::pair RemoveTimerQ(TimerQueue* tq, bool aborted); - std::pair CallbackEvicts(TimerQueue* tq, - std::shared_ptr bfile, - bool aborted); - // Adds the background tasks to the timer queue void StartBackgroundTasks(); @@ -467,12 +461,12 @@ class BlobDBImpl : public BlobDB { // epoch or version of the open files. std::atomic epoch_of_; - // All opened non-TTL blob files. 
- std::vector> open_simple_files_; + // opened non-TTL blob file. + std::shared_ptr open_non_ttl_file_; // all the blob files which are currently being appended to based // on variety of incoming TTL's - std::multiset, blobf_compare_ttl> open_blob_files_; + std::multiset, blobf_compare_ttl> open_ttl_files_; // packet of information to put in lockess delete(s) queue struct delete_packet_t { @@ -505,9 +499,6 @@ class BlobDBImpl : public BlobDB { // timer based queue to execute tasks TimerQueue tqueue_; - // timer queues to call eviction callbacks. - std::vector> cb_threads_; - // only accessed in GC thread, hence not atomic. The epoch of the // GC task. Each execution is one epoch. Helps us in allocating // files to one execution diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 85507eb5f..4dcf1a752 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -268,7 +268,6 @@ TEST_F(BlobDBTest, TTLExtrator_NoTTL) { bdb_options.ttl_range_secs = 1000; bdb_options.min_blob_size = 0; bdb_options.blob_file_size = 256 * 1000 * 1000; - bdb_options.num_concurrent_simple_blobs = 1; bdb_options.ttl_extractor = ttl_extractor_; bdb_options.disable_background_tasks = true; Open(bdb_options, options);