diff --git a/HISTORY.md b/HISTORY.md index c9318e027..9b05f8a49 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### Public API Change * RocksDBOptionsParser::Parse()'s `ignore_unknown_options` argument will only be effective if the option file shows it is generated using a higher version of RocksDB than the current version. +* Remove CompactionEventListener. ### New Features * Avoid unnecessarily flushing in `CompactRange()` when the range specified by the user does not overlap unflushed memtables. diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index 8a40cd405..79348fb6e 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -12,31 +12,6 @@ namespace rocksdb { -#ifndef ROCKSDB_LITE -CompactionEventListener::CompactionListenerValueType fromInternalValueType( - ValueType vt) { - switch (vt) { - case kTypeDeletion: - return CompactionEventListener::CompactionListenerValueType::kDelete; - case kTypeValue: - return CompactionEventListener::CompactionListenerValueType::kValue; - case kTypeMerge: - return CompactionEventListener::CompactionListenerValueType:: - kMergeOperand; - case kTypeSingleDeletion: - return CompactionEventListener::CompactionListenerValueType:: - kSingleDelete; - case kTypeRangeDeletion: - return CompactionEventListener::CompactionListenerValueType::kRangeDelete; - case kTypeBlobIndex: - return CompactionEventListener::CompactionListenerValueType::kBlobIndex; - default: - assert(false); - return CompactionEventListener::CompactionListenerValueType::kInvalid; - } -} -#endif // ROCKSDB_LITE - CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, SequenceNumber last_sequence, std::vector* snapshots, @@ -44,7 +19,6 @@ CompactionIterator::CompactionIterator( const SnapshotChecker* snapshot_checker, Env* env, bool expect_valid_internal_key, RangeDelAggregator* range_del_agg, const Compaction* compaction, const CompactionFilter* compaction_filter, - CompactionEventListener* compaction_listener, const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum) : CompactionIterator( @@ -53,8 +27,7 @@ CompactionIterator::CompactionIterator( expect_valid_internal_key, range_del_agg, std::unique_ptr( compaction ? new CompactionProxy(compaction) : nullptr), - compaction_filter, compaction_listener, shutting_down, - preserve_deletes_seqnum) {} + compaction_filter, shutting_down, preserve_deletes_seqnum) {} CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, @@ -64,7 +37,6 @@ CompactionIterator::CompactionIterator( bool expect_valid_internal_key, RangeDelAggregator* range_del_agg, std::unique_ptr compaction, const CompactionFilter* compaction_filter, - CompactionEventListener* compaction_listener, const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum ) @@ -79,9 +51,6 @@ CompactionIterator::CompactionIterator( range_del_agg_(range_del_agg), compaction_(std::move(compaction)), compaction_filter_(compaction_filter), -#ifndef ROCKSDB_LITE - compaction_listener_(compaction_listener), -#endif // ROCKSDB_LITE shutting_down_(shutting_down), preserve_deletes_seqnum_(preserve_deletes_seqnum), ignore_snapshots_(false), @@ -293,28 +262,12 @@ void CompactionIterator::NextFromInput() { (snapshot_checker_ == nullptr || snapshot_checker_->IsInSnapshot(ikey_.sequence, kMaxSequenceNumber)); -#ifndef ROCKSDB_LITE - if (compaction_listener_) { - compaction_listener_->OnCompaction(compaction_->level(), ikey_.user_key, - fromInternalValueType(ikey_.type), - value_, ikey_.sequence, true); - } -#endif // !ROCKSDB_LITE - // Apply the compaction filter to the first committed version of the user // key. if (current_key_committed_) { InvokeFilterIfNeeded(&need_skip, &skip_until); } } else { -#ifndef ROCKSDB_LITE - if (compaction_listener_) { - compaction_listener_->OnCompaction(compaction_->level(), ikey_.user_key, - fromInternalValueType(ikey_.type), - value_, ikey_.sequence, false); - } -#endif // ROCKSDB_LITE - // Update the current key to reflect the new sequence number/type without // copying the user key. // TODO(rven): Compaction filter does not process keys in this path diff --git a/db/compaction_iterator.h b/db/compaction_iterator.h index 8222f6d54..cc5cf0cfd 100644 --- a/db/compaction_iterator.h +++ b/db/compaction_iterator.h @@ -20,8 +20,6 @@ namespace rocksdb { -class CompactionEventListener; - class CompactionIterator { public: // A wrapper around Compaction. Has a much smaller interface, only what @@ -69,7 +67,6 @@ class CompactionIterator { RangeDelAggregator* range_del_agg, const Compaction* compaction = nullptr, const CompactionFilter* compaction_filter = nullptr, - CompactionEventListener* compaction_listener = nullptr, const std::atomic* shutting_down = nullptr, const SequenceNumber preserve_deletes_seqnum = 0); @@ -83,7 +80,6 @@ class CompactionIterator { RangeDelAggregator* range_del_agg, std::unique_ptr compaction, const CompactionFilter* compaction_filter = nullptr, - CompactionEventListener* compaction_listener = nullptr, const std::atomic* shutting_down = nullptr, const SequenceNumber preserve_deletes_seqnum = 0); @@ -147,9 +143,6 @@ class CompactionIterator { RangeDelAggregator* range_del_agg_; std::unique_ptr compaction_; const CompactionFilter* compaction_filter_; -#ifndef ROCKSDB_LITE - CompactionEventListener* compaction_listener_; -#endif // !ROCKSDB_LITE const std::atomic* shutting_down_; const SequenceNumber preserve_deletes_seqnum_; bool bottommost_level_; diff --git a/db/compaction_iterator_test.cc b/db/compaction_iterator_test.cc index 223798064..6e653a9c0 100644 --- a/db/compaction_iterator_test.cc +++ b/db/compaction_iterator_test.cc @@ -245,7 +245,7 @@ class CompactionIteratorTest : public testing::TestWithParam { iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_, earliest_write_conflict_snapshot, snapshot_checker_.get(), Env::Default(), false, range_del_agg_.get(), std::move(compaction), - filter, nullptr, &shutting_down_)); + filter, &shutting_down_)); } void AddSnapshot(SequenceNumber snapshot, diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 440d64879..be99ff283 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -762,24 +762,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { input->SeekToFirst(); } - // we allow only 1 compaction event listener. Used by blob storage - CompactionEventListener* comp_event_listener = nullptr; -#ifndef ROCKSDB_LITE - for (auto& celitr : cfd->ioptions()->listeners) { - comp_event_listener = celitr->GetCompactionEventListener(); - if (comp_event_listener != nullptr) { - break; - } - } -#endif // ROCKSDB_LITE - Status status; sub_compact->c_iter.reset(new CompactionIterator( input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(), &existing_snapshots_, earliest_write_conflict_snapshot_, snapshot_checker_, env_, false, range_del_agg.get(), - sub_compact->compaction, compaction_filter, comp_event_listener, - shutting_down_, preserve_deletes_seqnum_)); + sub_compact->compaction, compaction_filter, shutting_down_, + preserve_deletes_seqnum_)); auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); if (c_iter->Valid() && diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index c32bd1cb6..d17d71c5f 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -227,30 +227,6 @@ struct ExternalFileIngestionInfo { TableProperties table_properties; }; -// A call-back function to RocksDB which will be called when the compaction -// iterator is compacting values. It is meant to be returned from -// EventListner::GetCompactionEventListner() at the beginning of compaction -// job. -class CompactionEventListener { - public: - enum CompactionListenerValueType { - kValue, - kMergeOperand, - kDelete, - kSingleDelete, - kRangeDelete, - kBlobIndex, - kInvalid, - }; - - virtual void OnCompaction(int level, const Slice& key, - CompactionListenerValueType value_type, - const Slice& existing_value, - const SequenceNumber& sn, bool is_new) = 0; - - virtual ~CompactionEventListener() = default; -}; - // EventListener class contains a set of call-back functions that will // be called when specific RocksDB event happens such as flush. It can // be used as a building block for developing custom features such as @@ -413,12 +389,6 @@ class EventListener { // returns. Otherwise, RocksDB may be blocked. virtual void OnStallConditionsChanged(const WriteStallInfo& /*info*/) {} - // Factory method to return CompactionEventListener. If multiple listeners - // provides CompactionEventListner, only the first one will be used. - virtual CompactionEventListener* GetCompactionEventListener() { - return nullptr; - } - virtual ~EventListener() {} }; diff --git a/util/mpsc.h b/util/mpsc.h deleted file mode 100644 index 7449fd350..000000000 --- a/util/mpsc.h +++ /dev/null @@ -1,158 +0,0 @@ -// Portions Copyright (c) 2011-present, Facebook, Inc. All rights reserved. -// This source code is licensed under both the GPLv2 (found in the -// COPYING file in the root directory) and Apache 2.0 License -// (found in the LICENSE.Apache file in the root directory). -// -// Large parts of this file is borrowed from the public domain code below. -// from https://github.com/mstump/queues - -// C++ implementation of Dmitry Vyukov's non-intrusive -// lock free unbound MPSC queue -// http://www.1024cores.net/home/ -// lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue - -// License from mstump/queues -// This is free and unencumbered software released into the public domain. -// -// Anyone is free to copy, modify, publish, use, compile, sell, or -// distribute this software, either in source code form or as a compiled -// binary, for any purpose, commercial or non-commercial, and by any -// means. -// -// In jurisdictions that recognize copyright laws, the author or authors -// of this software dedicate any and all copyright interest in the -// software to the public domain. We make this dedication for the benefit -// of the public at large and to the detriment of our heirs and -// successors. We intend this dedication to be an overt act of -// relinquishment in perpetuity of all present and future rights to this -// software under copyright law. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. -// IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR -// OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, -// ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR -// OTHER DEALINGS IN THE SOFTWARE. -// -// For more information, please refer to - -// License from http://www.1024cores.net/home/ -// lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue -// Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. -// Redistribution and use in source and binary forms, with or -// without modification, are permitted provided that the following -// conditions are met: -// 1. Redistributions of source code must retain the above copyright notice, -// this list of conditions and the following disclaimer. -// -// 2. Redistributions in binary form must reproduce the above copyright notice, -// this list of conditions and the following disclaimer in the documentation -// and/or other materials provided with the distribution. -// -// THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR -// IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES -// OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO -// EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, -// INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, -// BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF -// USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -// -// The views and conclusions contained in the software and documentation -// are those of the authors and should not be interpreted as representing -// official policies, either expressed or implied, of Dmitry Vyukov. -// - -#ifndef UTIL_MPSC_H_ -#define UTIL_MPSC_H_ - -#include -#include -#include - -/** - * Multiple Producer Single Consumer Lockless Q - */ -template -class mpsc_queue_t { - public: - struct buffer_node_t { - T data; - std::atomic next; - }; - - mpsc_queue_t() { - buffer_node_aligned_t* al_st = new buffer_node_aligned_t; - buffer_node_t* node = new (al_st) buffer_node_t(); - _head.store(node); - _tail.store(node); - - node->next.store(nullptr, std::memory_order_relaxed); - } - - ~mpsc_queue_t() { - T output; - while (this->dequeue(&output)) { - } - buffer_node_t* front = _head.load(std::memory_order_relaxed); - front->~buffer_node_t(); - - ::operator delete(front); - } - - void enqueue(const T& input) { - buffer_node_aligned_t* al_st = new buffer_node_aligned_t; - buffer_node_t* node = new (al_st) buffer_node_t(); - - node->data = input; - node->next.store(nullptr, std::memory_order_relaxed); - - buffer_node_t* prev_head = _head.exchange(node, std::memory_order_acq_rel); - prev_head->next.store(node, std::memory_order_release); - } - - bool dequeue(T* output) { - buffer_node_t* tail = _tail.load(std::memory_order_relaxed); - buffer_node_t* next = tail->next.load(std::memory_order_acquire); - - if (next == nullptr) { - return false; - } - - *output = next->data; - _tail.store(next, std::memory_order_release); - - tail->~buffer_node_t(); - - ::operator delete(tail); - return true; - } - - // you can only use pop_all if the queue is SPSC - buffer_node_t* pop_all() { - // nobody else can move the tail pointer. - buffer_node_t* tptr = _tail.load(std::memory_order_relaxed); - buffer_node_t* next = - tptr->next.exchange(nullptr, std::memory_order_acquire); - _head.exchange(tptr, std::memory_order_acquire); - - // there is a race condition here - return next; - } - - private: - typedef typename std::aligned_storage< - sizeof(buffer_node_t), std::alignment_of::value>::type - buffer_node_aligned_t; - - std::atomic _head; - std::atomic _tail; - - mpsc_queue_t(const mpsc_queue_t&) = delete; - mpsc_queue_t& operator=(const mpsc_queue_t&) = delete; -}; - -#endif // UTIL_MPSC_H_ diff --git a/utilities/blob_db/blob_db.h b/utilities/blob_db/blob_db.h index 961f1728b..8f7371116 100644 --- a/utilities/blob_db/blob_db.h +++ b/utilities/blob_db/blob_db.h @@ -107,8 +107,6 @@ class BlobDB : public StackableDB { } using rocksdb::StackableDB::Delete; - virtual Status Delete(const WriteOptions& options, - const Slice& key) override = 0; virtual Status Delete(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key) override { @@ -116,7 +114,8 @@ class BlobDB : public StackableDB { return Status::NotSupported( "Blob DB doesn't support non-default column family."); } - return Delete(options, key); + assert(db_ != nullptr); + return db_->Delete(options, column_family, key); } virtual Status PutWithTTL(const WriteOptions& options, const Slice& key, diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index f22a79205..c8914ebf0 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -66,37 +66,6 @@ bool blobf_compare_ttl::operator()(const std::shared_ptr& lhs, return lhs->BlobFileNumber() < rhs->BlobFileNumber(); } -void EvictAllVersionsCompactionListener::InternalListener::OnCompaction( - int level, const Slice& key, - CompactionEventListener::CompactionListenerValueType value_type, - const Slice& existing_value, const SequenceNumber& sn, bool is_new) { - assert(impl_->bdb_options_.enable_garbage_collection); - if (!is_new && - value_type == - CompactionEventListener::CompactionListenerValueType::kValue) { - BlobIndex blob_index; - Status s = blob_index.DecodeFrom(existing_value); - if (s.ok()) { - if (impl_->debug_level_ >= 3) - ROCKS_LOG_INFO( - impl_->db_options_.info_log, - "CALLBACK COMPACTED OUT KEY: %s SN: %d " - "NEW: %d FN: %" PRIu64 " OFFSET: %" PRIu64 " SIZE: %" PRIu64, - key.ToString().c_str(), sn, is_new, blob_index.file_number(), - blob_index.offset(), blob_index.size()); - - impl_->override_vals_q_.enqueue({blob_index.file_number(), key.size(), - blob_index.offset(), blob_index.size(), - sn}); - } - } else { - if (impl_->debug_level_ >= 3) - ROCKS_LOG_INFO(impl_->db_options_.info_log, - "CALLBACK NEW KEY: %s SN: %d NEW: %d", - key.ToString().c_str(), sn, is_new); - } -} - BlobDBImpl::BlobDBImpl(const std::string& dbname, const BlobDBOptions& blob_db_options, const DBOptions& db_options, @@ -114,7 +83,6 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname, next_file_number_(1), epoch_of_(0), shutdown_(false), - current_epoch_(0), open_file_count_(0), total_blob_space_(0), open_p1_done_(false), @@ -181,10 +149,6 @@ Status BlobDBImpl::Open(std::vector* handles) { // Update options db_options_.listeners.push_back( std::shared_ptr(new BlobDBFlushBeginListener(this))); - if (bdb_options_.enable_garbage_collection) { - db_options_.listeners.push_back(std::shared_ptr( - new EvictAllVersionsCompactionListener(this))); - } cf_options_.compaction_filter_factory.reset( new BlobIndexCompactionFilterFactory(env_, statistics_)); @@ -214,14 +178,6 @@ void BlobDBImpl::StartBackgroundTasks() { tqueue_.add(static_cast( bdb_options_.garbage_collection_interval_secs * 1000), std::bind(&BlobDBImpl::RunGC, this, std::placeholders::_1)); - if (bdb_options_.enable_garbage_collection) { - tqueue_.add( - kDeleteCheckPeriodMillisecs, - std::bind(&BlobDBImpl::EvictDeletions, this, std::placeholders::_1)); - tqueue_.add( - kDeleteCheckPeriodMillisecs, - std::bind(&BlobDBImpl::EvictCompacted, this, std::placeholders::_1)); - } tqueue_.add( kDeleteObsoleteFilesPeriodMillisecs, std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1)); @@ -298,12 +254,6 @@ Status BlobDBImpl::OpenAllBlobFiles() { return read_metadata_status; } - // since this file already existed, we will try to reconcile - // deleted count with LSM - if (bdb_options_.enable_garbage_collection) { - blob_file->gc_once_after_open_ = true; - } - blob_files_[file_number] = blob_file; if (!blob_file_list.empty()) { blob_file_list.append(", "); @@ -553,17 +503,6 @@ std::shared_ptr BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) { return bfile; } -Status BlobDBImpl::Delete(const WriteOptions& options, const Slice& key) { - SequenceNumber lsn = db_impl_->GetLatestSequenceNumber(); - Status s = db_->Delete(options, key); - - if (bdb_options_.enable_garbage_collection) { - // add deleted key to list of keys that have been deleted for book-keeping - delete_keys_q_.enqueue({DefaultColumnFamily(), key.ToString(), lsn}); - } - return s; -} - class BlobDBImpl::BlobInserter : public WriteBatch::Handler { private: const WriteOptions& options_; @@ -646,47 +585,7 @@ Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) { if (!s.ok()) { return s; } - s = db_->Write(options, blob_inserter.batch()); - if (!s.ok()) { - return s; - } - - // add deleted key to list of keys that have been deleted for book-keeping - class DeleteBookkeeper : public WriteBatch::Handler { - public: - explicit DeleteBookkeeper(BlobDBImpl* impl, const SequenceNumber& seq) - : impl_(impl), sequence_(seq) {} - - virtual Status PutCF(uint32_t /*column_family_id*/, const Slice& /*key*/, - const Slice& /*value*/) override { - sequence_++; - return Status::OK(); - } - - virtual Status DeleteCF(uint32_t column_family_id, - const Slice& key) override { - ColumnFamilyHandle* cfh = - impl_->db_impl_->GetColumnFamilyHandleUnlocked(column_family_id); - - impl_->delete_keys_q_.enqueue({cfh, key.ToString(), sequence_}); - sequence_++; - return Status::OK(); - } - - private: - BlobDBImpl* impl_; - SequenceNumber sequence_; - }; - - if (bdb_options_.enable_garbage_collection) { - // add deleted key to list of keys that have been deleted for book-keeping - SequenceNumber current_seq = - WriteBatchInternal::Sequence(blob_inserter.batch()); - DeleteBookkeeper delete_bookkeeper(this, current_seq); - s = updates->Iterate(&delete_bookkeeper); - } - - return s; + return db_->Write(options, blob_inserter.batch()); } Status BlobDBImpl::GetLiveFiles(std::vector& ret, @@ -1204,10 +1103,8 @@ std::pair BlobDBImpl::SanityCheck(bool aborted) { for (auto bfile_pair : blob_files_) { auto bfile = bfile_pair.second; ROCKS_LOG_INFO( - db_options_.info_log, - "Blob File %s %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64, + db_options_.info_log, "Blob File %s %" PRIu64 " %" PRIu64 " %" PRIu64, bfile->PathName().c_str(), bfile->GetFileSize(), bfile->BlobCount(), - bfile->deleted_count_, bfile->deleted_size_, (bfile->expiration_range_.second - epoch_now)); } @@ -1281,136 +1178,6 @@ bool BlobDBImpl::VisibleToActiveSnapshot( return oldest_snapshot < obsolete_sequence; } -bool BlobDBImpl::FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size, - uint64_t blob_offset, - uint64_t blob_size) { - assert(bdb_options_.enable_garbage_collection); - (void)blob_offset; - std::shared_ptr bfile; - { - ReadLock rl(&mutex_); - auto hitr = blob_files_.find(file_number); - - // file was deleted - if (hitr == blob_files_.end()) { - return false; - } - - bfile = hitr->second; - } - - WriteLock lockbfile_w(&bfile->mutex_); - - bfile->deleted_count_++; - bfile->deleted_size_ += key_size + blob_size + BlobLogRecord::kHeaderSize; - return true; -} - -bool BlobDBImpl::MarkBlobDeleted(const Slice& key, const Slice& index_entry) { - assert(bdb_options_.enable_garbage_collection); - BlobIndex blob_index; - Status s = blob_index.DecodeFrom(index_entry); - if (!s.ok()) { - ROCKS_LOG_INFO(db_options_.info_log, - "Could not parse lsm val in MarkBlobDeleted %s", - index_entry.ToString().c_str()); - return false; - } - bool succ = FindFileAndEvictABlob(blob_index.file_number(), key.size(), - blob_index.offset(), blob_index.size()); - return succ; -} - -std::pair BlobDBImpl::EvictCompacted(bool aborted) { - assert(bdb_options_.enable_garbage_collection); - if (aborted) return std::make_pair(false, -1); - - override_packet_t packet; - size_t total_vals = 0; - size_t mark_evicted = 0; - while (override_vals_q_.dequeue(&packet)) { - bool succeeded = - FindFileAndEvictABlob(packet.file_number_, packet.key_size_, - packet.blob_offset_, packet.blob_size_); - total_vals++; - if (succeeded) { - mark_evicted++; - } - } - ROCKS_LOG_INFO(db_options_.info_log, - "Mark %" ROCKSDB_PRIszt - " values to evict, out of %" ROCKSDB_PRIszt - " compacted values.", - mark_evicted, total_vals); - return std::make_pair(true, -1); -} - -std::pair BlobDBImpl::EvictDeletions(bool aborted) { - assert(bdb_options_.enable_garbage_collection); - if (aborted) return std::make_pair(false, -1); - - ColumnFamilyHandle* last_cfh = nullptr; - Options last_op; - - Arena arena; - ScopedArenaIterator iter; - - // we will use same RangeDelAggregator for all cf's. - // essentially we do not support Range Deletes now - std::unique_ptr range_del_agg; - delete_packet_t dpacket; - while (delete_keys_q_.dequeue(&dpacket)) { - if (last_cfh != dpacket.cfh_) { - if (!range_del_agg) { - auto cfhi = reinterpret_cast(dpacket.cfh_); - auto cfd = cfhi->cfd(); - range_del_agg.reset(new RangeDelAggregator(cfd->internal_comparator(), - kMaxSequenceNumber)); - } - - // this can be expensive - last_cfh = dpacket.cfh_; - last_op = db_impl_->GetOptions(last_cfh); - iter.set(db_impl_->NewInternalIterator(&arena, range_del_agg.get(), - dpacket.cfh_)); - // this will not work for multiple CF's. - } - - Slice user_key(dpacket.key_); - InternalKey target(user_key, dpacket.dsn_, kTypeValue); - - Slice eslice = target.Encode(); - iter->Seek(eslice); - - if (!iter->status().ok()) { - ROCKS_LOG_INFO(db_options_.info_log, "Invalid iterator seek %s", - dpacket.key_.c_str()); - continue; - } - - const Comparator* bwc = BytewiseComparator(); - while (iter->Valid()) { - if (!bwc->Equal(ExtractUserKey(iter->key()), ExtractUserKey(eslice))) - break; - - ParsedInternalKey ikey(Slice(), 0, kTypeValue); - if (!ParseInternalKey(iter->key(), &ikey)) { - continue; - } - - // once you hit a DELETE, assume the keys below have been - // processed previously - if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion) break; - - Slice val = iter->value(); - MarkBlobDeleted(ikey.user_key, val); - - iter->Next(); - } - } - return std::make_pair(true, -1); -} - std::pair BlobDBImpl::CheckSeqFiles(bool aborted) { if (aborted) return std::make_pair(false, -1); @@ -1572,8 +1339,6 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, return s; } - bool first_gc = bfptr->gc_once_after_open_; - auto* cfh = db_impl_->GetColumnFamilyHandleUnlocked(bfptr->column_family_id()); auto* cfd = reinterpret_cast(cfh)->cfd(); @@ -1583,19 +1348,9 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, // this reads the key but skips the blob Reader::ReadLevel shallow = Reader::kReadHeaderKey; - bool no_relocation_ttl = - (has_ttl && now >= bfptr->GetExpirationRange().second); + bool file_expired = has_ttl && now >= bfptr->GetExpirationRange().second; - bool no_relocation_lsmdel = false; - { - ReadLock lockbfile_r(&bfptr->mutex_); - no_relocation_lsmdel = - (bfptr->GetFileSize() == - (BlobLogHeader::kSize + bfptr->deleted_size_ + BlobLogFooter::kSize)); - } - - bool no_relocation = no_relocation_ttl || no_relocation_lsmdel; - if (!no_relocation) { + if (!file_expired) { // read the blob because you have to write it back to new file shallow = Reader::kReadHeaderKeyBlob; } @@ -1671,7 +1426,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, // If key has expired, remove it from base DB. // TODO(yiwu): Blob indexes will be remove by BlobIndexCompactionFilter. // We can just drop the blob record. - if (no_relocation_ttl || (has_ttl && now >= record.expiration)) { + if (file_expired || (has_ttl && now >= record.expiration)) { gc_stats->num_keys_expired++; gc_stats->bytes_expired += record.record_size(); TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete"); @@ -1693,11 +1448,6 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, continue; } - if (first_gc) { - // Do not relocate blob record for initial GC. - continue; - } - // Relocate the blob record to new file. if (!newfile) { // new file @@ -1765,7 +1515,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, if (s.ok()) { bfptr->MarkObsolete(GetLatestSequenceNumber()); - if (!first_gc) { + { WriteLock wl(&mutex_); obsolete_files_.push_back(bfptr); } @@ -1806,73 +1556,6 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, return s; } -// Ideally we should hold the lock during the entire function, -// but under the asusmption that this is only called when a -// file is Immutable, we can reduce the critical section -bool BlobDBImpl::ShouldGCFile(std::shared_ptr bfile, uint64_t now, - bool is_oldest_non_ttl_file, - std::string* reason) { - if (bfile->HasTTL()) { - ExpirationRange expiration_range = bfile->GetExpirationRange(); - if (now > expiration_range.second) { - *reason = "entire file ttl expired"; - return true; - } - - if (!bfile->file_size_.load()) { - ROCKS_LOG_ERROR(db_options_.info_log, "Invalid file size = 0 %s", - bfile->PathName().c_str()); - *reason = "file is empty"; - return false; - } - - if (bfile->gc_once_after_open_.load()) { - return true; - } - - ReadLock lockbfile_r(&bfile->mutex_); - bool ret = ((bfile->deleted_size_ / bfile->file_size_.load()) > - bdb_options_.garbage_collection_deletion_size_threshold); - if (ret) { - *reason = "deleted blobs beyond threshold"; - } else { - *reason = "deleted blobs below threshold"; - } - return ret; - } - - // when crash happens, we lose the in-memory account of deleted blobs. - // we are therefore forced to do one GC to make sure delete accounting - // is OK - if (bfile->gc_once_after_open_.load()) { - return true; - } - - ReadLock lockbfile_r(&bfile->mutex_); - - if (bdb_options_.enable_garbage_collection) { - if ((bfile->deleted_size_ / bfile->file_size_.load()) > - bdb_options_.garbage_collection_deletion_size_threshold) { - *reason = "deleted simple blobs beyond threshold"; - return true; - } - } - - // if we haven't reached limits of disk space, don't DELETE - if (bdb_options_.blob_dir_size == 0 || - total_blob_space_.load() < bdb_options_.blob_dir_size) { - *reason = "disk space not exceeded"; - return false; - } - - if (is_oldest_non_ttl_file) { - *reason = "out of space and is the oldest simple blob file"; - return true; - } - *reason = "out of space but is not the oldest simple blob file"; - return false; -} - std::pair BlobDBImpl::DeleteObsoleteFiles(bool aborted) { if (aborted) return std::make_pair(false, -1); @@ -1958,98 +1641,13 @@ void BlobDBImpl::CopyBlobFiles( } } -void BlobDBImpl::FilterSubsetOfFiles( - const std::vector>& blob_files, - std::vector>* to_process, uint64_t epoch, - size_t files_to_collect) { - // 100.0 / 15.0 = 7 - uint64_t next_epoch_increment = static_cast( - std::ceil(100 / static_cast(kGCFilePercentage))); - uint64_t now = EpochNow(); - - size_t files_processed = 0; - bool non_ttl_file_found = false; - for (auto bfile : blob_files) { - if (files_processed >= files_to_collect) break; - // if this is the first time processing the file - // i.e. gc_epoch == -1, process it. - // else process the file if its processing epoch matches - // the current epoch. Typically the #of epochs should be - // around 5-10 - if (bfile->gc_epoch_ != -1 && (uint64_t)bfile->gc_epoch_ != epoch) { - continue; - } - - files_processed++; - // reset the epoch - bfile->gc_epoch_ = epoch + next_epoch_increment; - - // file has already been GC'd or is still open for append, - // then it should not be GC'd - if (bfile->Obsolete() || !bfile->Immutable()) continue; - - bool is_oldest_non_ttl_file = false; - if (!non_ttl_file_found && !bfile->HasTTL()) { - is_oldest_non_ttl_file = true; - non_ttl_file_found = true; - } - - std::string reason; - bool shouldgc = ShouldGCFile(bfile, now, is_oldest_non_ttl_file, &reason); - if (!shouldgc) { - ROCKS_LOG_DEBUG(db_options_.info_log, - "File has been skipped for GC ttl %s %" PRIu64 " %" PRIu64 - " reason='%s'", - bfile->PathName().c_str(), now, - bfile->GetExpirationRange().second, reason.c_str()); - continue; - } - - ROCKS_LOG_INFO(db_options_.info_log, - "File has been chosen for GC ttl %s %" PRIu64 " %" PRIu64 - " reason='%s'", - bfile->PathName().c_str(), now, - bfile->GetExpirationRange().second, reason.c_str()); - to_process->push_back(bfile); - } -} - std::pair BlobDBImpl::RunGC(bool aborted) { - if (aborted) return std::make_pair(false, -1); - - current_epoch_++; - - std::vector> blob_files; - CopyBlobFiles(&blob_files); - - if (!blob_files.size()) return std::make_pair(true, -1); - - // 15% of files are collected each call to space out the IO and CPU - // consumption. - size_t files_to_collect = (kGCFilePercentage * blob_files.size()) / 100; - - std::vector> to_process; - FilterSubsetOfFiles(blob_files, &to_process, current_epoch_, - files_to_collect); - - for (auto bfile : to_process) { - GCStats gc_stats; - Status s = GCFileAndUpdateLSM(bfile, &gc_stats); - if (!s.ok()) { - continue; - } - - if (bfile->gc_once_after_open_.load()) { - WriteLock lockbfile_w(&bfile->mutex_); - - bfile->deleted_size_ = - gc_stats.bytes_overwritten + gc_stats.bytes_expired; - bfile->deleted_count_ = - gc_stats.num_keys_overwritten + gc_stats.num_keys_expired; - bfile->gc_once_after_open_ = false; - } + if (aborted) { + return std::make_pair(false, -1); } + // TODO(yiwu): Garbage collection implementation. + // reschedule return std::make_pair(true, -1); } diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index 328087c9c..d120ea258 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -26,7 +26,6 @@ #include "rocksdb/options.h" #include "rocksdb/statistics.h" #include "rocksdb/wal_filter.h" -#include "util/mpsc.h" #include "util/mutexlock.h" #include "util/timer_queue.h" #include "utilities/blob_db/blob_db.h" @@ -73,34 +72,6 @@ class BlobReconcileWalFilter : public WalFilter { virtual const char* Name() const override { return "BlobDBWalReconciler"; } }; -class EvictAllVersionsCompactionListener : public EventListener { - public: - class InternalListener : public CompactionEventListener { - friend class BlobDBImpl; - - public: - explicit InternalListener(BlobDBImpl* blob_db_impl) : impl_(blob_db_impl) {} - - virtual void OnCompaction(int level, const Slice& key, - CompactionListenerValueType value_type, - const Slice& existing_value, - const SequenceNumber& sn, bool is_new) override; - - private: - BlobDBImpl* impl_; - }; - - explicit EvictAllVersionsCompactionListener(BlobDBImpl* blob_db_impl) - : internal_listener_(new InternalListener(blob_db_impl)) {} - - virtual CompactionEventListener* GetCompactionEventListener() override { - return internal_listener_.get(); - } - - private: - std::unique_ptr internal_listener_; -}; - // Comparator to sort "TTL" aware Blob files based on the lower value of // TTL range. struct blobf_compare_ttl { @@ -124,7 +95,6 @@ struct GCStats { * Garbage Collected. */ class BlobDBImpl : public BlobDB { - friend class EvictAllVersionsCompactionListener; friend class BlobFile; friend class BlobDBIterator; @@ -161,9 +131,6 @@ class BlobDBImpl : public BlobDB { Status Put(const WriteOptions& options, const Slice& key, const Slice& value) override; - using BlobDB::Delete; - Status Delete(const WriteOptions& options, const Slice& key) override; - using BlobDB::Get; Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) override; @@ -249,13 +216,6 @@ class BlobDBImpl : public BlobDB { Slice GetCompressedSlice(const Slice& raw, std::string* compression_output) const; - // is this file ready for Garbage collection. if the TTL of the file - // has expired or if threshold of the file has been evicted - // tt - current time - // last_id - the id of the non-TTL file to evict - bool ShouldGCFile(std::shared_ptr bfile, uint64_t now, - bool is_oldest_non_ttl_file, std::string* reason); - // Close a file by appending a footer, and removes file from open files list. Status CloseBlobFile(std::shared_ptr bfile); @@ -305,11 +265,6 @@ class BlobDBImpl : public BlobDB { // efficiency std::pair ReclaimOpenFiles(bool aborted); - // background task to do book-keeping of deleted keys - std::pair EvictDeletions(bool aborted); - - std::pair EvictCompacted(bool aborted); - std::pair RemoveTimerQ(TimerQueue* tq, bool aborted); // Adds the background tasks to the timer queue @@ -354,20 +309,10 @@ class BlobDBImpl : public BlobDB { bool VisibleToActiveSnapshot(const std::shared_ptr& file); bool FileDeleteOk_SnapshotCheckLocked(const std::shared_ptr& bfile); - bool MarkBlobDeleted(const Slice& key, const Slice& lsmValue); - - bool FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size, - uint64_t blob_offset, uint64_t blob_size); - void CopyBlobFiles( std::vector>* bfiles_copy, std::function&)> predicate = {}); - void FilterSubsetOfFiles( - const std::vector>& blob_files, - std::vector>* to_process, uint64_t epoch, - size_t files_to_collect); - uint64_t EpochNow() { return env_->NowMicros() / 1000000; } Status CheckSize(size_t blob_size); @@ -423,42 +368,12 @@ class BlobDBImpl : public BlobDB { // on variety of incoming TTL's std::multiset, blobf_compare_ttl> open_ttl_files_; - // packet of information to put in lockess delete(s) queue - struct delete_packet_t { - ColumnFamilyHandle* cfh_; - std::string key_; - SequenceNumber dsn_; - }; - - struct override_packet_t { - uint64_t file_number_; - uint64_t key_size_; - uint64_t blob_offset_; - uint64_t blob_size_; - SequenceNumber dsn_; - }; - - // LOCKLESS multiple producer single consumer queue to quickly append - // deletes without taking lock. Can rapidly grow in size!! - // deletes happen in LSM, but minor book-keeping needs to happen on - // BLOB side (for triggering eviction) - mpsc_queue_t delete_keys_q_; - - // LOCKLESS multiple producer single consumer queue for values - // that are being compacted - mpsc_queue_t override_vals_q_; - // atomic bool to represent shutdown std::atomic shutdown_; // timer based queue to execute tasks TimerQueue tqueue_; - // only accessed in GC thread, hence not atomic. The epoch of the - // GC task. Each execution is one epoch. Helps us in allocating - // files to one execution - uint64_t current_epoch_; - // number of files opened for random access/GET // counter is used to monitor and close excess RA files. std::atomic open_file_count_; diff --git a/utilities/blob_db/blob_file.cc b/utilities/blob_db/blob_file.cc index 324a9521d..287e62a40 100644 --- a/utilities/blob_db/blob_file.cc +++ b/utilities/blob_db/blob_file.cc @@ -35,13 +35,9 @@ BlobFile::BlobFile() compression_(kNoCompression), has_ttl_(false), blob_count_(0), - gc_epoch_(-1), file_size_(0), - deleted_count_(0), - deleted_size_(0), closed_(false), obsolete_(false), - gc_once_after_open_(false), expiration_range_({0, 0}), last_access_(-1), last_fsync_(0), @@ -58,13 +54,9 @@ BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn, compression_(kNoCompression), has_ttl_(false), blob_count_(0), - gc_epoch_(-1), file_size_(0), - deleted_count_(0), - deleted_size_(0), closed_(false), obsolete_(false), - gc_once_after_open_(false), expiration_range_({0, 0}), last_access_(-1), last_fsync_(0), @@ -109,16 +101,14 @@ std::shared_ptr BlobFile::OpenSequentialReader( std::string BlobFile::DumpState() const { char str[1000]; - snprintf(str, sizeof(str), - "path: %s fn: %" PRIu64 " blob_count: %" PRIu64 " gc_epoch: %" PRIu64 - " file_size: %" PRIu64 " deleted_count: %" PRIu64 - " deleted_size: %" PRIu64 - " closed: %d obsolete: %d expiration_range: (%" PRIu64 ", %" PRIu64 - "), writer: %d reader: %d", - path_to_dir_.c_str(), file_number_, blob_count_.load(), - gc_epoch_.load(), file_size_.load(), deleted_count_, deleted_size_, - closed_.load(), obsolete_.load(), expiration_range_.first, - expiration_range_.second, (!!log_writer_), (!!ra_file_reader_)); + snprintf( + str, sizeof(str), + "path: %s fn: %" PRIu64 " blob_count: %" PRIu64 " file_size: %" PRIu64 + " closed: %d obsolete: %d expiration_range: (%" PRIu64 ", %" PRIu64 + "), writer: %d reader: %d", + path_to_dir_.c_str(), file_number_, blob_count_.load(), file_size_.load(), + closed_.load(), obsolete_.load(), expiration_range_.first, + expiration_range_.second, (!!log_writer_), (!!ra_file_reader_)); return str; } diff --git a/utilities/blob_db/blob_file.h b/utilities/blob_db/blob_file.h index 0dac911c0..b64f29ad0 100644 --- a/utilities/blob_db/blob_file.h +++ b/utilities/blob_db/blob_file.h @@ -53,18 +53,9 @@ class BlobFile { // number of blobs in the file std::atomic blob_count_; - // the file will be selected for GC in this future epoch - std::atomic gc_epoch_; - // size of the file std::atomic file_size_; - // number of blobs in this particular file which have been evicted - uint64_t deleted_count_; - - // size of deleted blobs (used by heuristic to select file for GC) - uint64_t deleted_size_; - BlobLogHeader header_; // closed_ = true implies the file is no more mutable @@ -79,9 +70,6 @@ class BlobFile { // Data in this file is visible to a snapshot taken before the sequence. SequenceNumber obsolete_sequence_; - // should this file been gc'd once to reconcile lost deletes/compactions - std::atomic gc_once_after_open_; - ExpirationRange expiration_range_; // Sequential/Append writer for blobs