From b864bc9b5ba8394cd1e55c9c57ab65d7de0732c2 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Tue, 6 Mar 2018 11:46:20 -0800 Subject: [PATCH] Blob DB: Improve FIFO eviction Summary: Improving blob db FIFO eviction with the following changes, * Change blob_dir_size to max_db_size. Take into account SST file size when computing DB size. * FIFO now only take into account live sst files and live blob files. It is normal for disk usage to go over max_db_size because there are obsolete sst files and blob files pending deletion. * FIFO eviction now also evict TTL blob files that's still open. It doesn't evict non-TTL blob files. * If FIFO is triggered, it will pass an expiration and the current sequence number to compaction filter. Compaction filter will then filter inlined keys to evict those with an earlier expiration and smaller sequence number. So call LSM FIFO. * Compaction filter also filter those blob indexes where corresponding blob file is gone. * Add an event listener to listen compaction/flush event and update sst file size. * Implement DB::Close() to make sure base db, as well as event listener and compaction filter, destruct before blob db. * More blob db statistics around FIFO. * Fix some locking issue when accessing a blob file. Closes https://github.com/facebook/rocksdb/pull/3556 Differential Revision: D7139328 Pulled By: yiwu-arbug fbshipit-source-id: ea5edb07b33dfceacb2682f4789bea61de28bbfa --- CMakeLists.txt | 1 + TARGETS | 1 + db/compaction_iterator.cc | 5 +- include/rocksdb/statistics.h | 18 +- src.mk | 1 + tools/db_bench_tool.cc | 4 +- utilities/blob_db/blob_compaction_filter.cc | 117 ++++++++ utilities/blob_db/blob_compaction_filter.h | 69 +---- utilities/blob_db/blob_db.cc | 66 ++-- utilities/blob_db/blob_db.h | 17 +- utilities/blob_db/blob_db_impl.cc | 314 +++++++++++++------- utilities/blob_db/blob_db_impl.h | 74 +++-- utilities/blob_db/blob_db_iterator.h | 42 ++- utilities/blob_db/blob_db_listener.h | 46 +++ utilities/blob_db/blob_db_test.cc | 302 +++++++++++++++++-- 15 files changed, 823 insertions(+), 254 deletions(-) create mode 100644 utilities/blob_db/blob_compaction_filter.cc create mode 100644 utilities/blob_db/blob_db_listener.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 5a9bd5318..739da75b4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -575,6 +575,7 @@ set(SOURCES util/transaction_test_util.cc util/xxhash.cc utilities/backupable/backupable_db.cc + utilities/blob_db/blob_compaction_filter.cc utilities/blob_db/blob_db.cc utilities/blob_db/blob_db_impl.cc utilities/blob_db/blob_dump_tool.cc diff --git a/TARGETS b/TARGETS index ca0ab48d0..b7dae2caa 100644 --- a/TARGETS +++ b/TARGETS @@ -217,6 +217,7 @@ cpp_library( "util/transaction_test_util.cc", "util/xxhash.cc", "utilities/backupable/backupable_db.cc", + "utilities/blob_db/blob_compaction_filter.cc", "utilities/blob_db/blob_db.cc", "utilities/blob_db/blob_db_impl.cc", "utilities/blob_db/blob_dump_tool.cc", diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index 45d786f0d..3d62f43a4 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -167,10 +167,13 @@ void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, CompactionFilter::ValueType value_type = ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue : CompactionFilter::ValueType::kBlobIndex; + // Hack: pass internal key to BlobIndexCompactionFilter since it needs + // to get sequence number. + Slice& filter_key = ikey_.type == kTypeValue ? ikey_.user_key : key_; { StopWatchNano timer(env_, true); filter = compaction_filter_->FilterV2( - compaction_->level(), ikey_.user_key, value_type, value_, + compaction_->level(), filter_key, value_type, value_, &compaction_filter_value_, compaction_filter_skip_until_.rep()); iter_stats_.total_filter_time += env_ != nullptr ? timer.ElapsedNanos() : 0; diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 57c8daf33..8cf8b7d3e 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -265,7 +265,16 @@ enum Tickers : uint32_t { BLOB_DB_BLOB_FILE_SYNCED, // # of blob index evicted from base DB by BlobDB compaction filter because // of expiration. - BLOB_DB_BLOB_INDEX_EXPIRED, + BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, + // size of blob index evicted from base DB by BlobDB compaction filter + // because of expiration. + BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, + // # of blob index evicted from base DB by BlobDB compaction filter because + // of corresponding file deleted. + BLOB_DB_BLOB_INDEX_EVICTED_COUNT, + // size of blob index evicted from base DB by BlobDB compaction filter + // because of corresponding file deleted. + BLOB_DB_BLOB_INDEX_EVICTED_SIZE, // # of blob files being garbage collected. BLOB_DB_GC_NUM_FILES, // # of blob files generated by garbage collection. @@ -417,7 +426,12 @@ const std::vector> TickersNameMap = { {BLOB_DB_BLOB_FILE_BYTES_WRITTEN, "rocksdb.blobdb.blob.file.bytes.written"}, {BLOB_DB_BLOB_FILE_BYTES_READ, "rocksdb.blobdb.blob.file.bytes.read"}, {BLOB_DB_BLOB_FILE_SYNCED, "rocksdb.blobdb.blob.file.synced"}, - {BLOB_DB_BLOB_INDEX_EXPIRED, "rocksdb.blobdb.blob.index.expired"}, + {BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, + "rocksdb.blobdb.blob.index.expired.count"}, + {BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, "rocksdb.blobdb.blob.index.expired.size"}, + {BLOB_DB_BLOB_INDEX_EVICTED_COUNT, + "rocksdb.blobdb.blob.index.evicted.count"}, + {BLOB_DB_BLOB_INDEX_EVICTED_SIZE, "rocksdb.blobdb.blob.index.evicted.size"}, {BLOB_DB_GC_NUM_FILES, "rocksdb.blobdb.gc.num.files"}, {BLOB_DB_GC_NUM_NEW_FILES, "rocksdb.blobdb.gc.num.new.files"}, {BLOB_DB_GC_FAILURES, "rocksdb.blobdb.gc.failures"}, diff --git a/src.mk b/src.mk index c2ad87c54..a0fcd80f8 100644 --- a/src.mk +++ b/src.mk @@ -153,6 +153,7 @@ LIB_SOURCES = \ util/transaction_test_util.cc \ util/xxhash.cc \ utilities/backupable/backupable_db.cc \ + utilities/blob_db/blob_compaction_filter.cc \ utilities/blob_db/blob_db.cc \ utilities/blob_db/blob_db_impl.cc \ utilities/blob_db/blob_file.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 40e033167..855e0764f 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -686,7 +686,7 @@ DEFINE_bool(blob_db_enable_gc, false, "Enable BlobDB garbage collection."); DEFINE_bool(blob_db_is_fifo, false, "Enable FIFO eviction strategy in BlobDB."); -DEFINE_uint64(blob_db_dir_size, 0, +DEFINE_uint64(blob_db_max_db_size, 0, "Max size limit of the directory where blob files are stored."); DEFINE_uint64(blob_db_max_ttl_range, 86400, @@ -3446,7 +3446,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { blob_db::BlobDBOptions blob_db_options; blob_db_options.enable_garbage_collection = FLAGS_blob_db_enable_gc; blob_db_options.is_fifo = FLAGS_blob_db_is_fifo; - blob_db_options.blob_dir_size = FLAGS_blob_db_dir_size; + blob_db_options.max_db_size = FLAGS_blob_db_max_db_size; blob_db_options.ttl_range_secs = FLAGS_blob_db_ttl_range_secs; blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size; blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync; diff --git a/utilities/blob_db/blob_compaction_filter.cc b/utilities/blob_db/blob_compaction_filter.cc new file mode 100644 index 000000000..cbc76a98d --- /dev/null +++ b/utilities/blob_db/blob_compaction_filter.cc @@ -0,0 +1,117 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "utilities/blob_db/blob_compaction_filter.h" +#include "db/dbformat.h" + +namespace rocksdb { +namespace blob_db { + +namespace { + +// CompactionFilter to delete expired blob index from base DB. +class BlobIndexCompactionFilter : public CompactionFilter { + public: + BlobIndexCompactionFilter(BlobCompactionContext context, + uint64_t current_time, Statistics* statistics) + : context_(context), + current_time_(current_time), + statistics_(statistics) {} + + virtual ~BlobIndexCompactionFilter() { + RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_); + RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, expired_size_); + RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_COUNT, evicted_count_); + RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_SIZE, evicted_size_); + } + + virtual const char* Name() const override { + return "BlobIndexCompactionFilter"; + } + + // Filter expired blob indexes regardless of snapshots. + virtual bool IgnoreSnapshots() const override { return true; } + + virtual Decision FilterV2(int /*level*/, const Slice& key, + ValueType value_type, const Slice& value, + std::string* /*new_value*/, + std::string* /*skip_until*/) const override { + if (value_type != kBlobIndex) { + return Decision::kKeep; + } + BlobIndex blob_index; + Status s = blob_index.DecodeFrom(value); + if (!s.ok()) { + // Unable to decode blob index. Keeping the value. + return Decision::kKeep; + } + if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) { + // Expired + expired_count_++; + expired_size_ += key.size() + value.size(); + return Decision::kRemove; + } + if (!blob_index.IsInlined() && + blob_index.file_number() < context_.next_file_number && + context_.current_blob_files.count(blob_index.file_number()) == 0) { + // Corresponding blob file gone. Could have been garbage collected or + // evicted by FIFO eviction. + evicted_count_++; + evicted_size_ += key.size() + value.size(); + return Decision::kRemove; + } + if (context_.fifo_eviction_seq > 0 && blob_index.HasTTL() && + blob_index.expiration() < context_.evict_expiration_up_to) { + // Hack: Internal key is passed to BlobIndexCompactionFilter for it to + // get sequence number. + ParsedInternalKey ikey; + bool ok = ParseInternalKey(key, &ikey); + // Remove keys that could have been remove by last FIFO eviction. + // If get error while parsing key, ignore and continue. + if (ok && ikey.sequence < context_.fifo_eviction_seq) { + evicted_count_++; + evicted_size_ += key.size() + value.size(); + return Decision::kRemove; + } + } + return Decision::kKeep; + } + + private: + BlobCompactionContext context_; + const uint64_t current_time_; + Statistics* statistics_; + // It is safe to not using std::atomic since the compaction filter, created + // from a compaction filter factroy, will not be called from multiple threads. + mutable uint64_t expired_count_ = 0; + mutable uint64_t expired_size_ = 0; + mutable uint64_t evicted_count_ = 0; + mutable uint64_t evicted_size_ = 0; +}; + +} // anonymous namespace + +std::unique_ptr +BlobIndexCompactionFilterFactory::CreateCompactionFilter( + const CompactionFilter::Context& /*context*/) { + int64_t current_time = 0; + Status s = env_->GetCurrentTime(¤t_time); + if (!s.ok()) { + return nullptr; + } + assert(current_time >= 0); + + BlobCompactionContext context; + blob_db_impl_->GetCompactionContext(&context); + + return std::unique_ptr(new BlobIndexCompactionFilter( + context, static_cast(current_time), statistics_)); +} + +} // namespace blob_db +} // namespace rocksdb +#endif // ROCKSDB_LITE diff --git a/utilities/blob_db/blob_compaction_filter.h b/utilities/blob_db/blob_compaction_filter.h index 192a338ff..7a8ea6135 100644 --- a/utilities/blob_db/blob_compaction_filter.h +++ b/utilities/blob_db/blob_compaction_filter.h @@ -5,82 +5,39 @@ #pragma once #ifndef ROCKSDB_LITE +#include + #include "monitoring/statistics.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/env.h" +#include "utilities/blob_db/blob_db_impl.h" #include "utilities/blob_db/blob_index.h" namespace rocksdb { namespace blob_db { -// CompactionFilter to delete expired blob index from base DB. -class BlobIndexCompactionFilter : public CompactionFilter { - public: - BlobIndexCompactionFilter(uint64_t current_time, Statistics* statistics) - : current_time_(current_time), statistics_(statistics) {} - - virtual ~BlobIndexCompactionFilter() { - RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED, expired_count_); - } - - virtual const char* Name() const override { - return "BlobIndexCompactionFilter"; - } - - // Filter expired blob indexes regardless of snapshots. - virtual bool IgnoreSnapshots() const override { return true; } - - virtual Decision FilterV2(int /*level*/, const Slice& /*key*/, - ValueType value_type, const Slice& value, - std::string* /*new_value*/, - std::string* /*skip_until*/) const override { - if (value_type != kBlobIndex) { - return Decision::kKeep; - } - BlobIndex blob_index; - Status s = blob_index.DecodeFrom(value); - if (!s.ok()) { - // Unable to decode blob index. Keeping the value. - return Decision::kKeep; - } - if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) { - // Expired - expired_count_++; - return Decision::kRemove; - } - return Decision::kKeep; - } - - private: - const uint64_t current_time_; - Statistics* statistics_; - // It is safe to not using std::atomic since the compaction filter, created - // from a compaction filter factroy, will not be called from multiple threads. - mutable uint64_t expired_count_ = 0; +struct BlobCompactionContext { + uint64_t next_file_number; + std::unordered_set current_blob_files; + SequenceNumber fifo_eviction_seq; + uint64_t evict_expiration_up_to; }; class BlobIndexCompactionFilterFactory : public CompactionFilterFactory { public: - BlobIndexCompactionFilterFactory(Env* env, Statistics* statistics) - : env_(env), statistics_(statistics) {} + BlobIndexCompactionFilterFactory(BlobDBImpl* blob_db_impl, Env* env, + Statistics* statistics) + : blob_db_impl_(blob_db_impl), env_(env), statistics_(statistics) {} virtual const char* Name() const override { return "BlobIndexCompactionFilterFactory"; } virtual std::unique_ptr CreateCompactionFilter( - const CompactionFilter::Context& /*context*/) override { - int64_t current_time = 0; - Status s = env_->GetCurrentTime(¤t_time); - if (!s.ok()) { - return nullptr; - } - assert(current_time >= 0); - return std::unique_ptr(new BlobIndexCompactionFilter( - static_cast(current_time), statistics_)); - } + const CompactionFilter::Context& /*context*/) override; private: + BlobDBImpl* blob_db_impl_; Env* env_; Statistics* statistics_; }; diff --git a/utilities/blob_db/blob_db.cc b/utilities/blob_db/blob_db.cc index f042db76e..523324a76 100644 --- a/utilities/blob_db/blob_db.cc +++ b/utilities/blob_db/blob_db.cc @@ -63,30 +63,48 @@ Status BlobDB::Open(const DBOptions& db_options, BlobDB::BlobDB() : StackableDB(nullptr) {} void BlobDBOptions::Dump(Logger* log) const { - ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir: %s", - blob_dir.c_str()); - ROCKS_LOG_HEADER(log, " blob_db_options.path_relative: %d", - path_relative); - ROCKS_LOG_HEADER(log, " blob_db_options.is_fifo: %d", - is_fifo); - ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir_size: %" PRIu64, - blob_dir_size); - ROCKS_LOG_HEADER(log, " blob_db_options.ttl_range_secs: %" PRIu32, - ttl_range_secs); - ROCKS_LOG_HEADER(log, " blob_db_options.min_blob_size: %" PRIu64, - min_blob_size); - ROCKS_LOG_HEADER(log, " blob_db_options.bytes_per_sync: %" PRIu64, - bytes_per_sync); - ROCKS_LOG_HEADER(log, " blob_db_options.blob_file_size: %" PRIu64, - blob_file_size); - ROCKS_LOG_HEADER(log, " blob_db_options.ttl_extractor: %p", - ttl_extractor.get()); - ROCKS_LOG_HEADER(log, " blob_db_options.compression: %d", - static_cast(compression)); - ROCKS_LOG_HEADER(log, "blob_db_options.enable_garbage_collection: %d", - enable_garbage_collection); - ROCKS_LOG_HEADER(log, " blob_db_options.disable_background_tasks: %d", - disable_background_tasks); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.blob_dir: %s", + blob_dir.c_str()); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.path_relative: %d", + path_relative); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.is_fifo: %d", + is_fifo); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.max_db_size: %" PRIu64, + max_db_size); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.ttl_range_secs: %" PRIu32, + ttl_range_secs); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.min_blob_size: %" PRIu64, + min_blob_size); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.bytes_per_sync: %" PRIu64, + bytes_per_sync); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.blob_file_size: %" PRIu64, + blob_file_size); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.ttl_extractor: %p", + ttl_extractor.get()); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.compression: %d", + static_cast(compression)); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.enable_garbage_collection: %d", + enable_garbage_collection); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.garbage_collection_interval_secs: %" PRIu64, + garbage_collection_interval_secs); + ROCKS_LOG_HEADER( + log, "BlobDBOptions.garbage_collection_deletion_size_threshold: %lf", + garbage_collection_deletion_size_threshold); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.disable_background_tasks: %d", + disable_background_tasks); } } // namespace blob_db diff --git a/utilities/blob_db/blob_db.h b/utilities/blob_db/blob_db.h index 8f7371116..183d23a8c 100644 --- a/utilities/blob_db/blob_db.h +++ b/utilities/blob_db/blob_db.h @@ -36,13 +36,17 @@ struct BlobDBOptions { // whether the blob_dir path is relative or absolute. bool path_relative = true; - // is the eviction strategy fifo based + // When max_db_size is reached, evict blob files to free up space + // instead of returnning NoSpace error on write. Blob files will be + // evicted in this order until enough space is free up: + // * the TTL blob file cloeset to expire, + // * the oldest non-TTL blob file. bool is_fifo = false; - // maximum size of the blob dir. Once this gets used, up - // evict the blob file which is oldest (is_fifo ) - // 0 means no limits - uint64_t blob_dir_size = 0; + // Maximum size of the database (including SST files and blob files). + // + // Default: 0 (no limits) + uint64_t max_db_size = 0; // a new bucket is opened, for ttl_range. So if ttl_range is 600seconds // (10 minutes), and the first bucket starts at 1471542000 @@ -198,6 +202,9 @@ class BlobDB : public StackableDB { return NewIterator(options); } + using rocksdb::StackableDB::Close; + virtual Status Close() override = 0; + // Opening blob db. static Status Open(const Options& options, const BlobDBOptions& bdb_options, const std::string& dbname, BlobDB** blob_db); diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 7a6874c95..1a1a22392 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -35,6 +35,7 @@ #include "util/timer_queue.h" #include "utilities/blob_db/blob_compaction_filter.h" #include "utilities/blob_db/blob_db_iterator.h" +#include "utilities/blob_db/blob_db_listener.h" #include "utilities/blob_db/blob_index.h" namespace { @@ -44,12 +45,6 @@ int kBlockBasedTableVersionFormat = 2; namespace rocksdb { namespace blob_db { -void BlobDBFlushBeginListener::OnFlushBegin(DB* /*db*/, - const FlushJobInfo& /*info*/) { - assert(blob_db_impl_ != nullptr); - blob_db_impl_->SyncBlobFiles(); -} - WalFilter::WalProcessingOption BlobReconcileWalFilter::LogRecordFound( unsigned long long /*log_number*/, const std::string& /*log_file_name*/, const WriteBatch& /*batch*/, WriteBatch* /*new_batch*/, @@ -59,6 +54,7 @@ WalFilter::WalProcessingOption BlobReconcileWalFilter::LogRecordFound( bool blobf_compare_ttl::operator()(const std::shared_ptr& lhs, const std::shared_ptr& rhs) const { + assert(lhs->HasTTL() && rhs->HasTTL()); if (lhs->expiration_range_.first < rhs->expiration_range_.first) { return true; } @@ -84,12 +80,13 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname, statistics_(db_options_.statistics.get()), next_file_number_(1), epoch_of_(0), - shutdown_(false), + closed_(true), open_file_count_(0), - total_blob_space_(0), - open_p1_done_(false), - debug_level_(0), - oldest_file_evicted_(false) { + total_blob_size_(0), + live_sst_size_(0), + fifo_eviction_seq_(0), + evict_expiration_up_to_(0), + debug_level_(0) { blob_dir_ = (bdb_options_.path_relative) ? dbname + "/" + bdb_options_.blob_dir : bdb_options_.blob_dir; @@ -98,8 +95,30 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname, BlobDBImpl::~BlobDBImpl() { // CancelAllBackgroundWork(db_, true); + Status s __attribute__((__unused__)) = Close(); + assert(s.ok()); +} + +Status BlobDBImpl::Close() { + if (closed_) { + return Status::OK(); + } + closed_ = true; - Shutdown(); + // Close base DB before BlobDBImpl destructs to stop event listener and + // compaction filter call. + Status s = db_->Close(); + // delete db_ anyway even if close failed. + delete db_; + // Reset pointers to avoid StackableDB delete the pointer again. + db_ = nullptr; + db_impl_ = nullptr; + if (!s.ok()) { + return s; + } + + s = SyncBlobFiles(); + return s; } BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; } @@ -149,10 +168,9 @@ Status BlobDBImpl::Open(std::vector* handles) { } // Update options - db_options_.listeners.push_back( - std::shared_ptr(new BlobDBFlushBeginListener(this))); + db_options_.listeners.push_back(std::make_shared(this)); cf_options_.compaction_filter_factory.reset( - new BlobIndexCompactionFilterFactory(env_, statistics_)); + new BlobIndexCompactionFilterFactory(this, env_, statistics_)); // Open base db. ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_); @@ -161,6 +179,7 @@ Status BlobDBImpl::Open(std::vector* handles) { return s; } db_impl_ = static_cast_with_check(db_->GetRootDB()); + UpdateLiveSSTSize(); // Start background jobs. if (!bdb_options_.disable_background_tasks) { @@ -169,6 +188,7 @@ Status BlobDBImpl::Open(std::vector* handles) { ROCKS_LOG_INFO(db_options_.info_log, "BlobDB pointer %p", this); bdb_options_.Dump(db_options_.info_log.get()); + closed_ = false; return s; } @@ -190,8 +210,6 @@ void BlobDBImpl::StartBackgroundTasks() { std::bind(&BlobDBImpl::CheckSeqFiles, this, std::placeholders::_1)); } -void BlobDBImpl::Shutdown() { shutdown_.store(true); } - Status BlobDBImpl::GetAllBlobFiles(std::set* file_numbers) { assert(file_numbers != nullptr); std::vector all_files; @@ -241,8 +259,7 @@ Status BlobDBImpl::OpenAllBlobFiles() { Status read_metadata_status = blob_file->ReadMetadata(env_, env_options_); if (read_metadata_status.IsCorruption()) { // Remove incomplete file. - blob_file->MarkObsolete(0 /*sequence number*/); - obsolete_files_.push_back(blob_file); + ObsoleteBlobFile(blob_file, 0 /*obsolete_seq*/, false /*update_size*/); if (!obsolete_file_list.empty()) { obsolete_file_list.append(", "); } @@ -256,6 +273,8 @@ Status BlobDBImpl::OpenAllBlobFiles() { return read_metadata_status; } + total_blob_size_ += blob_file->GetFileSize(); + blob_files_[file_number] = blob_file; if (!blob_file_list.empty()) { blob_file_list.append(", "); @@ -343,25 +362,33 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr& bfile) { std::shared_ptr BlobDBImpl::FindBlobFileLocked( uint64_t expiration) const { - if (open_ttl_files_.empty()) return nullptr; + if (open_ttl_files_.empty()) { + return nullptr; + } std::shared_ptr tmp = std::make_shared(); + tmp->SetHasTTL(true); tmp->expiration_range_ = std::make_pair(expiration, 0); + tmp->file_number_ = std::numeric_limits::max(); auto citr = open_ttl_files_.equal_range(tmp); if (citr.first == open_ttl_files_.end()) { assert(citr.second == open_ttl_files_.end()); std::shared_ptr check = *(open_ttl_files_.rbegin()); - return (check->expiration_range_.second < expiration) ? nullptr : check; + return (check->expiration_range_.second <= expiration) ? nullptr : check; } - if (citr.first != citr.second) return *(citr.first); + if (citr.first != citr.second) { + return *(citr.first); + } auto finditr = citr.second; - if (finditr != open_ttl_files_.begin()) --finditr; + if (finditr != open_ttl_files_.begin()) { + --finditr; + } - bool b2 = (*finditr)->expiration_range_.second < expiration; + bool b2 = (*finditr)->expiration_range_.second <= expiration; bool b1 = (*finditr)->expiration_range_.first > expiration; return (b1 || b2) ? nullptr : (*finditr); @@ -426,6 +453,7 @@ std::shared_ptr BlobDBImpl::SelectBlobFile() { blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile)); open_non_ttl_file_ = bfile; + total_blob_size_ += BlobLogHeader::kSize; return bfile; } @@ -500,6 +528,7 @@ std::shared_ptr BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) { blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile)); open_ttl_files_.insert(bfile); + total_blob_size_ += BlobLogHeader::kSize; epoch_of_++; return bfile; @@ -663,6 +692,7 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key, Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/, const Slice& key, const Slice& value, uint64_t expiration, WriteBatch* batch) { + write_mutex_.AssertHeld(); Status s; std::string index_entry; uint32_t column_family_id = @@ -680,20 +710,27 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/, RecordTick(statistics_, BLOB_DB_WRITE_INLINED_TTL); } } else { - std::shared_ptr bfile = (expiration != kNoExpiration) - ? SelectBlobFileTTL(expiration) - : SelectBlobFile(); - if (!bfile) { - return Status::NotFound("Blob file not found"); - } - - assert(bfile->compression() == bdb_options_.compression); std::string compression_output; Slice value_compressed = GetCompressedSlice(value, &compression_output); std::string headerbuf; Writer::ConstructBlobHeader(&headerbuf, key, value_compressed, expiration); + // Check DB size limit before selecting blob file to + // Since CheckSizeAndEvictBlobFiles() can close blob files, it needs to be + // done before calling SelectBlobFile(). + s = CheckSizeAndEvictBlobFiles(headerbuf.size() + key.size() + + value_compressed.size()); + if (!s.ok()) { + return s; + } + + std::shared_ptr bfile = (expiration != kNoExpiration) + ? SelectBlobFileTTL(expiration) + : SelectBlobFile(); + assert(bfile != nullptr); + assert(bfile->compression() == bdb_options_.compression); + s = AppendBlob(bfile, headerbuf, key, value_compressed, expiration, &index_entry); if (expiration == kNoExpiration) { @@ -756,66 +793,118 @@ uint64_t BlobDBImpl::ExtractExpiration(const Slice& key, const Slice& value, return has_expiration ? expiration : kNoExpiration; } -std::shared_ptr BlobDBImpl::GetOldestBlobFile() { - std::vector> blob_files; - CopyBlobFiles(&blob_files, [](const std::shared_ptr& f) { - return !f->Obsolete() && f->Immutable(); - }); - if (blob_files.empty()) { - return nullptr; +void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) { + ReadLock l(&mutex_); + + context->next_file_number = next_file_number_.load(); + context->current_blob_files.clear(); + for (auto& p : blob_files_) { + context->current_blob_files.insert(p.first); } - blobf_compare_ttl compare; - return *std::min_element(blob_files.begin(), blob_files.end(), compare); + context->fifo_eviction_seq = fifo_eviction_seq_; + context->evict_expiration_up_to = evict_expiration_up_to_; } -bool BlobDBImpl::EvictOldestBlobFile() { - auto oldest_file = GetOldestBlobFile(); - if (oldest_file == nullptr) { - return false; +void BlobDBImpl::UpdateLiveSSTSize() { + uint64_t live_sst_size = 0; + bool ok = GetIntProperty(DB::Properties::kLiveSstFilesSize, &live_sst_size); + if (ok) { + live_sst_size_.store(live_sst_size); + ROCKS_LOG_INFO(db_options_.info_log, + "Updated total SST file size: %" PRIu64 " bytes.", + live_sst_size); + } else { + ROCKS_LOG_ERROR( + db_options_.info_log, + "Failed to update total SST file size after flush or compaction."); } + { + // Trigger FIFO eviction if needed. + MutexLock l(&write_mutex_); + Status s = CheckSizeAndEvictBlobFiles(0, true /*force*/); + if (s.IsNoSpace()) { + ROCKS_LOG_WARN(db_options_.info_log, + "DB grow out-of-space after SST size updated. Current live" + " SST size: %" PRIu64 + " , current blob files size: %" PRIu64 ".", + live_sst_size_.load(), total_blob_size_.load()); + } + } +} - WriteLock wl(&mutex_); - // Double check the file is not obsolete by others - if (oldest_file_evicted_ == false && !oldest_file->Obsolete()) { - auto expiration_range = oldest_file->GetExpirationRange(); +Status BlobDBImpl::CheckSizeAndEvictBlobFiles(uint64_t blob_size, + bool force_evict) { + write_mutex_.AssertHeld(); + + uint64_t live_sst_size = live_sst_size_.load(); + if (bdb_options_.max_db_size == 0 || + live_sst_size + total_blob_size_.load() + blob_size <= + bdb_options_.max_db_size) { + return Status::OK(); + } + + if (bdb_options_.is_fifo == false || + (!force_evict && live_sst_size + blob_size > bdb_options_.max_db_size)) { + // FIFO eviction is disabled, or no space to insert new blob even we evict + // all blob files. + return Status::NoSpace( + "Write failed, as writing it would exceed max_db_size limit."); + } + + std::vector> candidate_files; + CopyBlobFiles(&candidate_files, + [&](const std::shared_ptr& blob_file) { + // Only evict TTL files + return blob_file->HasTTL(); + }); + std::sort(candidate_files.begin(), candidate_files.end(), + blobf_compare_ttl()); + std::reverse(candidate_files.begin(), candidate_files.end()); + fifo_eviction_seq_ = GetLatestSequenceNumber(); + + WriteLock l(&mutex_); + + while (!candidate_files.empty() && + live_sst_size + total_blob_size_.load() + blob_size > + bdb_options_.max_db_size) { + std::shared_ptr blob_file = candidate_files.back(); + candidate_files.pop_back(); + WriteLock file_lock(&blob_file->mutex_); + if (blob_file->Obsolete()) { + // File already obsoleted by someone else. + continue; + } + // FIFO eviction can evict open blob files. + if (!blob_file->Immutable()) { + Status s = CloseBlobFile(blob_file, false /*need_lock*/); + if (!s.ok()) { + return s; + } + } + assert(blob_file->Immutable()); + auto expiration_range = blob_file->GetExpirationRange(); ROCKS_LOG_INFO(db_options_.info_log, "Evict oldest blob file since DB out of space. Current " - "space used: %" PRIu64 ", blob dir size: %" PRIu64 - ", evicted blob file #%" PRIu64 + "live SST file size: %" PRIu64 ", total blob size: %" PRIu64 + ", max db size: %" PRIu64 ", evicted blob file #%" PRIu64 " with expiration range (%" PRIu64 ", %" PRIu64 ").", - total_blob_space_.load(), bdb_options_.blob_dir_size, - oldest_file->BlobFileNumber(), expiration_range.first, - expiration_range.second); - oldest_file->MarkObsolete(GetLatestSequenceNumber()); - obsolete_files_.push_back(oldest_file); - oldest_file_evicted_.store(true); + live_sst_size, total_blob_size_.load(), + bdb_options_.max_db_size, blob_file->BlobFileNumber(), + expiration_range.first, expiration_range.second); + ObsoleteBlobFile(blob_file, fifo_eviction_seq_, true /*update_size*/); + evict_expiration_up_to_ = expiration_range.first; RecordTick(statistics_, BLOB_DB_FIFO_NUM_FILES_EVICTED); RecordTick(statistics_, BLOB_DB_FIFO_NUM_KEYS_EVICTED, - oldest_file->BlobCount()); + blob_file->BlobCount()); RecordTick(statistics_, BLOB_DB_FIFO_BYTES_EVICTED, - oldest_file->GetFileSize()); + blob_file->GetFileSize()); TEST_SYNC_POINT("BlobDBImpl::EvictOldestBlobFile:Evicted"); - return true; } - - return false; -} - -Status BlobDBImpl::CheckSize(size_t blob_size) { - uint64_t new_space_util = total_blob_space_.load() + blob_size; - if (bdb_options_.blob_dir_size > 0) { - if (!bdb_options_.is_fifo && - (new_space_util > bdb_options_.blob_dir_size)) { - return Status::NoSpace( - "Write failed, as writing it would exceed blob_dir_size limit."); - } - if (bdb_options_.is_fifo && !oldest_file_evicted_.load() && - (new_space_util > - kEvictOldestFileAtSize * bdb_options_.blob_dir_size)) { - EvictOldestBlobFile(); - } + if (live_sst_size + total_blob_size_.load() + blob_size > + bdb_options_.max_db_size) { + return Status::NoSpace( + "Write failed, as writing it would exceed max_db_size limit."); } - return Status::OK(); } @@ -823,18 +912,15 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr& bfile, const std::string& headerbuf, const Slice& key, const Slice& value, uint64_t expiration, std::string* index_entry) { - auto size_put = BlobLogRecord::kHeaderSize + key.size() + value.size(); - Status s = CheckSize(size_put); - if (!s.ok()) { - return s; - } - + Status s; uint64_t blob_offset = 0; uint64_t key_offset = 0; { WriteLock lockbfile_w(&bfile->mutex_); std::shared_ptr writer = CheckOrCreateWriterLocked(bfile); - if (!writer) return Status::IOError("Failed to create blob writer"); + if (!writer) { + return Status::IOError("Failed to create blob writer"); + } // write the blob to the blob log. s = writer->EmitPhysicalRecord(headerbuf, key, value, &key_offset, @@ -851,8 +937,9 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr& bfile, // increment blob count bfile->blob_count_++; + uint64_t size_put = headerbuf.size() + key.size() + value.size(); bfile->file_size_ += size_put; - total_blob_space_ += size_put; + total_blob_size_ += size_put; if (expiration == kNoExpiration) { BlobIndex::EncodeBlob(index_entry, bfile->BlobFileNumber(), blob_offset, @@ -1114,14 +1201,19 @@ std::pair BlobDBImpl::SanityCheck(bool aborted) { return std::make_pair(true, -1); } -Status BlobDBImpl::CloseBlobFile(std::shared_ptr bfile) { +Status BlobDBImpl::CloseBlobFile(std::shared_ptr bfile, + bool need_lock) { assert(bfile != nullptr); + write_mutex_.AssertHeld(); Status s; ROCKS_LOG_INFO(db_options_.info_log, "Closing blob file %" PRIu64 ". Path: %s", bfile->BlobFileNumber(), bfile->PathName().c_str()); { - WriteLock wl(&mutex_); + std::unique_ptr lock; + if (need_lock) { + lock.reset(new WriteLock(&mutex_)); + } if (bfile->HasTTL()) { size_t erased __attribute__((__unused__)); @@ -1134,11 +1226,16 @@ Status BlobDBImpl::CloseBlobFile(std::shared_ptr bfile) { } if (!bfile->closed_.load()) { - WriteLock lockbfile_w(&bfile->mutex_); + std::unique_ptr file_lock; + if (need_lock) { + file_lock.reset(new WriteLock(&bfile->mutex_)); + } s = bfile->WriteFooterAndCloseLocked(); } - if (!s.ok()) { + if (s.ok()) { + total_blob_size_ += BlobLogFooter::kSize; + } else { ROCKS_LOG_ERROR(db_options_.info_log, "Failed to close blob file %" PRIu64 "with error: %s", bfile->BlobFileNumber(), s.ToString().c_str()); @@ -1155,6 +1252,18 @@ Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr& bfile) { return CloseBlobFile(bfile); } +void BlobDBImpl::ObsoleteBlobFile(std::shared_ptr blob_file, + SequenceNumber obsolete_seq, + bool update_size) { + // Should hold write lock of mutex_ or during DB open. + blob_file->MarkObsolete(obsolete_seq); + obsolete_files_.push_back(blob_file); + assert(total_blob_size_.load() >= blob_file->GetFileSize()); + if (update_size) { + total_blob_size_ -= blob_file->GetFileSize(); + } +} + bool BlobDBImpl::VisibleToActiveSnapshot( const std::shared_ptr& bfile) { assert(bfile->Obsolete()); @@ -1198,6 +1307,7 @@ std::pair BlobDBImpl::CheckSeqFiles(bool aborted) { } } + MutexLock l(&write_mutex_); for (auto bfile : process_files) { CloseBlobFile(bfile); } @@ -1515,12 +1625,9 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, } } // end of ReadRecord loop - if (s.ok()) { - bfptr->MarkObsolete(GetLatestSequenceNumber()); - { - WriteLock wl(&mutex_); - obsolete_files_.push_back(bfptr); - } + { + WriteLock wl(&mutex_); + ObsoleteBlobFile(bfptr, GetLatestSequenceNumber(), true /*update_size*/); } ROCKS_LOG_INFO( @@ -1543,7 +1650,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, gc_stats->bytes_overwritten); RecordTick(statistics_, BLOB_DB_GC_BYTES_EXPIRED, gc_stats->bytes_expired); if (newfile != nullptr) { - total_blob_space_ += newfile->file_size_; + total_blob_size_ += newfile->file_size_; ROCKS_LOG_INFO(db_options_.info_log, "New blob file %" PRIu64 ".", newfile->BlobFileNumber()); RecordTick(statistics_, BLOB_DB_GC_NUM_NEW_FILES); @@ -1600,7 +1707,6 @@ std::pair BlobDBImpl::DeleteObsoleteFiles(bool aborted) { } file_deleted = true; - total_blob_space_ -= bfile->file_size_; ROCKS_LOG_INFO(db_options_.info_log, "File deleted as obsolete from blob dir %s", bfile->PathName().c_str()); @@ -1611,9 +1717,6 @@ std::pair BlobDBImpl::DeleteObsoleteFiles(bool aborted) { // directory change. Fsync if (file_deleted) { dir_ent_->Fsync(); - - // reset oldest_file_evicted flag - oldest_file_evicted_.store(false); } // put files back into obsolete if for some reason, delete failed @@ -1734,15 +1837,24 @@ void BlobDBImpl::TEST_DeleteObsoleteFiles() { } Status BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr& bfile) { + MutexLock l(&write_mutex_); return CloseBlobFile(bfile); } +void BlobDBImpl::TEST_ObsoleteBlobFile(std::shared_ptr& blob_file, + SequenceNumber obsolete_seq, + bool update_size) { + return ObsoleteBlobFile(blob_file, obsolete_seq, update_size); +} + Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr& bfile, GCStats* gc_stats) { return GCFileAndUpdateLSM(bfile, gc_stats); } void BlobDBImpl::TEST_RunGC() { RunGC(false /*abort*/); } + +uint64_t BlobDBImpl::TEST_live_sst_size() { return live_sst_size_.load(); } #endif // !NDEBUG } // namespace blob_db diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index 38d1580bb..d3e810deb 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -43,19 +43,9 @@ struct FlushJobInfo; namespace blob_db { -class BlobFile; +struct BlobCompactionContext; class BlobDBImpl; - -class BlobDBFlushBeginListener : public EventListener { - public: - explicit BlobDBFlushBeginListener(BlobDBImpl* blob_db_impl) - : blob_db_impl_(blob_db_impl) {} - - void OnFlushBegin(DB* db, const FlushJobInfo& info) override; - - private: - BlobDBImpl* blob_db_impl_; -}; +class BlobFile; // this implements the callback from the WAL which ensures that the // blob record is present in the blob log. If fsync/fdatasync in not @@ -154,6 +144,8 @@ class BlobDBImpl : public BlobDB { virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; + virtual Status Close() override; + virtual Status GetLiveFiles(std::vector&, uint64_t* manifest_file_size, bool flush_memtable = true) override; @@ -180,6 +172,10 @@ class BlobDBImpl : public BlobDB { Status SyncBlobFiles() override; + void UpdateLiveSSTSize(); + + void GetCompactionContext(BlobCompactionContext* context); + #ifndef NDEBUG Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry, PinnableSlice* value); @@ -190,12 +186,18 @@ class BlobDBImpl : public BlobDB { Status TEST_CloseBlobFile(std::shared_ptr& bfile); + void TEST_ObsoleteBlobFile(std::shared_ptr& blob_file, + SequenceNumber obsolete_seq = 0, + bool update_size = true); + Status TEST_GCFileAndUpdateLSM(std::shared_ptr& bfile, GCStats* gc_stats); void TEST_RunGC(); void TEST_DeleteObsoleteFiles(); + + uint64_t TEST_live_sst_size(); #endif // !NDEBUG private: @@ -217,11 +219,17 @@ class BlobDBImpl : public BlobDB { std::string* compression_output) const; // Close a file by appending a footer, and removes file from open files list. - Status CloseBlobFile(std::shared_ptr bfile); + Status CloseBlobFile(std::shared_ptr bfile, bool need_lock = true); // Close a file if its size exceeds blob_file_size Status CloseBlobFileIfNeeded(std::shared_ptr& bfile); + // Mark file as obsolete and move the file to obsolete file list. + // + // REQUIRED: hold write lock of mutex_ or during DB open. + void ObsoleteBlobFile(std::shared_ptr blob_file, + SequenceNumber obsolete_seq, bool update_size); + uint64_t ExtractExpiration(const Slice& key, const Slice& value, Slice* value_slice, std::string* new_value); @@ -243,8 +251,6 @@ class BlobDBImpl : public BlobDB { std::shared_ptr FindBlobFileLocked(uint64_t expiration) const; - void Shutdown(); - // periodic sanity check. Bunch of checks std::pair SanityCheck(bool aborted); @@ -315,11 +321,12 @@ class BlobDBImpl : public BlobDB { uint64_t EpochNow() { return env_->NowMicros() / 1000000; } - Status CheckSize(size_t blob_size); - - std::shared_ptr GetOldestBlobFile(); - - bool EvictOldestBlobFile(); + // Check if inserting a new blob will make DB grow out of space. + // If is_fifo = true, FIFO eviction will be triggered to make room for the + // new blob. If force_evict = true, FIFO eviction will evict blob files + // even eviction will not make enough room for the new blob. + Status CheckSizeAndEvictBlobFiles(uint64_t blob_size, + bool force_evict = false); // name of the database directory std::string dbname_; @@ -366,10 +373,10 @@ class BlobDBImpl : public BlobDB { // all the blob files which are currently being appended to based // on variety of incoming TTL's - std::multiset, blobf_compare_ttl> open_ttl_files_; + std::set, blobf_compare_ttl> open_ttl_files_; - // atomic bool to represent shutdown - std::atomic shutdown_; + // Flag to check whether Close() has been called on this DB + bool closed_; // timer based queue to execute tasks TimerQueue tqueue_; @@ -378,14 +385,25 @@ class BlobDBImpl : public BlobDB { // counter is used to monitor and close excess RA files. std::atomic open_file_count_; - // total size of all blob files at a given time - std::atomic total_blob_space_; + // Total size of all live blob files (i.e. exclude obsolete files). + std::atomic total_blob_size_; + + // total size of SST files. + std::atomic live_sst_size_; + + // Latest FIFO eviction timestamp + // + // REQUIRES: access with metex_ lock held. + uint64_t fifo_eviction_seq_; + + // The expiration up to which latest FIFO eviction evicts. + // + // REQUIRES: access with metex_ lock held. + uint64_t evict_expiration_up_to_; + std::list> obsolete_files_; - bool open_p1_done_; uint32_t debug_level_; - - std::atomic oldest_file_evicted_; }; } // namespace blob_db diff --git a/utilities/blob_db/blob_db_iterator.h b/utilities/blob_db/blob_db_iterator.h index f901df366..38bbfac62 100644 --- a/utilities/blob_db/blob_db_iterator.h +++ b/utilities/blob_db/blob_db_iterator.h @@ -46,28 +46,36 @@ class BlobDBIterator : public Iterator { StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS); RecordTick(statistics_, BLOB_DB_NUM_SEEK); iter_->SeekToFirst(); - UpdateBlobValue(); + while (UpdateBlobValue()) { + iter_->Next(); + } } void SeekToLast() override { StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS); RecordTick(statistics_, BLOB_DB_NUM_SEEK); iter_->SeekToLast(); - UpdateBlobValue(); + while (UpdateBlobValue()) { + iter_->Prev(); + } } void Seek(const Slice& target) override { StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS); RecordTick(statistics_, BLOB_DB_NUM_SEEK); iter_->Seek(target); - UpdateBlobValue(); + while (UpdateBlobValue()) { + iter_->Next(); + } } void SeekForPrev(const Slice& target) override { StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS); RecordTick(statistics_, BLOB_DB_NUM_SEEK); iter_->SeekForPrev(target); - UpdateBlobValue(); + while (UpdateBlobValue()) { + iter_->Prev(); + } } void Next() override { @@ -75,7 +83,9 @@ class BlobDBIterator : public Iterator { StopWatch next_sw(env_, statistics_, BLOB_DB_NEXT_MICROS); RecordTick(statistics_, BLOB_DB_NUM_NEXT); iter_->Next(); - UpdateBlobValue(); + while (UpdateBlobValue()) { + iter_->Next(); + } } void Prev() override { @@ -83,7 +93,9 @@ class BlobDBIterator : public Iterator { StopWatch prev_sw(env_, statistics_, BLOB_DB_PREV_MICROS); RecordTick(statistics_, BLOB_DB_NUM_PREV); iter_->Prev(); - UpdateBlobValue(); + while (UpdateBlobValue()) { + iter_->Prev(); + } } Slice key() const override { @@ -102,12 +114,24 @@ class BlobDBIterator : public Iterator { // Iterator::Refresh() not supported. private: - void UpdateBlobValue() { + // Return true if caller should continue to next value. + bool UpdateBlobValue() { TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:1"); TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:2"); value_.Reset(); - if (iter_->Valid() && iter_->IsBlob()) { - status_ = blob_db_->GetBlobValue(iter_->key(), iter_->value(), &value_); + if (iter_->Valid() && iter_->status().ok() && iter_->IsBlob()) { + Status s = blob_db_->GetBlobValue(iter_->key(), iter_->value(), &value_); + if (s.IsNotFound()) { + return true; + } else { + if (!s.ok()) { + status_ = s; + } + return false; + } + return status_.IsNotFound(); + } else { + return false; } } diff --git a/utilities/blob_db/blob_db_listener.h b/utilities/blob_db/blob_db_listener.h new file mode 100644 index 000000000..f096d238b --- /dev/null +++ b/utilities/blob_db/blob_db_listener.h @@ -0,0 +1,46 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#ifndef ROCKSDB_LITE + +#include + +#include "rocksdb/listener.h" +#include "util/mutexlock.h" +#include "utilities/blob_db/blob_db_impl.h" + +namespace rocksdb { +namespace blob_db { + +class BlobDBListener : public EventListener { + public: + explicit BlobDBListener(BlobDBImpl* blob_db_impl) + : blob_db_impl_(blob_db_impl) {} + + void OnFlushBegin(DB* /*db*/, const FlushJobInfo& /*info*/) override { + assert(blob_db_impl_ != nullptr); + blob_db_impl_->SyncBlobFiles(); + } + + void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& /*info*/) override { + assert(blob_db_impl_ != nullptr); + blob_db_impl_->UpdateLiveSSTSize(); + } + + void OnCompactionCompleted(DB* /*db*/, + const CompactionJobInfo& /*info*/) override { + assert(blob_db_impl_ != nullptr); + blob_db_impl_->UpdateLiveSSTSize(); + } + + private: + BlobDBImpl* blob_db_impl_; +}; + +} // namespace blob_db +} // namespace rocksdb +#endif // !ROCKSDB_LITE diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 2fa8fe12a..32e5effbb 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -45,7 +45,10 @@ class BlobDBTest : public testing::Test { assert(s.ok()); } - ~BlobDBTest() { Destroy(); } + ~BlobDBTest() { + SyncPoint::GetInstance()->ClearAllCallBacks(); + Destroy(); + } Status TryOpen(BlobDBOptions bdb_options = BlobDBOptions(), Options options = Options()) { @@ -80,8 +83,13 @@ class BlobDBTest : public testing::Test { return reinterpret_cast(blob_db_); } - Status Put(const Slice &key, const Slice &value) { - return blob_db_->Put(WriteOptions(), key, value); + Status Put(const Slice &key, const Slice &value, + std::map *data = nullptr) { + Status s = blob_db_->Put(WriteOptions(), key, value); + if (data != nullptr) { + (*data)[key.ToString()] = value.ToString(); + } + return s; } void Delete(const std::string &key, @@ -92,6 +100,15 @@ class BlobDBTest : public testing::Test { } } + Status PutWithTTL(const Slice &key, const Slice &value, uint64_t ttl, + std::map *data = nullptr) { + Status s = blob_db_->PutWithTTL(WriteOptions(), key, value, ttl); + if (data != nullptr) { + (*data)[key.ToString()] = value.ToString(); + } + return s; + } + Status PutUntil(const Slice &key, const Slice &value, uint64_t expiration) { return blob_db_->PutUntil(WriteOptions(), key, value, expiration); } @@ -747,7 +764,7 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) { } // This test is no longer valid since we now return an error when we go -// over the configured blob_dir_size. +// over the configured max_db_size. // The test needs to be re-written later in such a way that writes continue // after a GC happens. TEST_F(BlobDBTest, DISABLED_GCOldestSimpleBlobFileWhenOutOfSpace) { @@ -755,7 +772,7 @@ TEST_F(BlobDBTest, DISABLED_GCOldestSimpleBlobFileWhenOutOfSpace) { Options options; options.env = mock_env_.get(); BlobDBOptions bdb_options; - bdb_options.blob_dir_size = 100; + bdb_options.max_db_size = 100; bdb_options.blob_file_size = 100; bdb_options.min_blob_size = 0; bdb_options.disable_background_tasks = true; @@ -1038,13 +1055,14 @@ TEST_F(BlobDBTest, MigrateFromPlainRocksDB) { } // Test to verify that a NoSpace IOError Status is returned on reaching -// blob_dir_size limit. +// max_db_size limit. TEST_F(BlobDBTest, OutOfSpace) { // Use mock env to stop wall clock. Options options; options.env = mock_env_.get(); BlobDBOptions bdb_options; - bdb_options.blob_dir_size = 150; + bdb_options.max_db_size = 200; + bdb_options.is_fifo = false; bdb_options.disable_background_tasks = true; Open(bdb_options); @@ -1053,16 +1071,16 @@ TEST_F(BlobDBTest, OutOfSpace) { std::string value(100, 'v'); ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 60)); - // Putting another blob should fail as ading it would exceed the blob_dir_size + // Putting another blob should fail as ading it would exceed the max_db_size // limit. Status s = blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60); ASSERT_TRUE(s.IsIOError()); ASSERT_TRUE(s.IsNoSpace()); } -TEST_F(BlobDBTest, EvictOldestFileWhenCloseToSpaceLimit) { +TEST_F(BlobDBTest, FIFOEviction) { BlobDBOptions bdb_options; - bdb_options.blob_dir_size = 270; + bdb_options.max_db_size = 200; bdb_options.blob_file_size = 100; bdb_options.is_fifo = true; bdb_options.disable_background_tasks = true; @@ -1078,32 +1096,36 @@ TEST_F(BlobDBTest, EvictOldestFileWhenCloseToSpaceLimit) { // So a 100 byte blob should take up 132 bytes. std::string value(100, 'v'); ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 10)); + VerifyDB({{"key1", value}}); - auto *bdb_impl = static_cast(blob_db_); - auto blob_files = bdb_impl->TEST_GetBlobFiles(); - ASSERT_EQ(1, blob_files.size()); + ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size()); // Adding another 100 byte blob would take the total size to 264 bytes - // (2*132), which is more than 90% of blob_dir_size. So, the oldest file - // should be evicted and put in obsolete files list. + // (2*132). max_db_size will be exceeded + // than max_db_size and trigger FIFO eviction. ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60)); + ASSERT_EQ(1, evict_count); + // key1 will exist until corresponding file be deleted. + VerifyDB({{"key1", value}, {"key2", value}}); - auto obsolete_files = bdb_impl->TEST_GetObsoleteFiles(); + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(2, blob_files.size()); + ASSERT_TRUE(blob_files[0]->Obsolete()); + ASSERT_FALSE(blob_files[1]->Obsolete()); + auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles(); ASSERT_EQ(1, obsolete_files.size()); - ASSERT_TRUE(obsolete_files[0]->Immutable()); - ASSERT_EQ(blob_files[0]->BlobFileNumber(), - obsolete_files[0]->BlobFileNumber()); + ASSERT_EQ(blob_files[0], obsolete_files[0]); - bdb_impl->TEST_DeleteObsoleteFiles(); - obsolete_files = bdb_impl->TEST_GetObsoleteFiles(); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles(); ASSERT_TRUE(obsolete_files.empty()); - ASSERT_EQ(1, evict_count); + VerifyDB({{"key2", value}}); } -TEST_F(BlobDBTest, NoOldestFileToEvict) { +TEST_F(BlobDBTest, FIFOEviction_NoOldestFileToEvict) { Options options; BlobDBOptions bdb_options; - bdb_options.blob_dir_size = 1000; + bdb_options.max_db_size = 1000; bdb_options.blob_file_size = 5000; bdb_options.is_fifo = true; bdb_options.disable_background_tasks = true; @@ -1116,11 +1138,97 @@ TEST_F(BlobDBTest, NoOldestFileToEvict) { SyncPoint::GetInstance()->EnableProcessing(); std::string value(2000, 'v'); - ASSERT_OK(Put("foo", std::string(2000, 'v'))); - ASSERT_OK(Put("bar", std::string(2000, 'v'))); + ASSERT_TRUE(Put("foo", std::string(2000, 'v')).IsNoSpace()); ASSERT_EQ(0, evict_count); } +TEST_F(BlobDBTest, FIFOEviction_NoEnoughBlobFilesToEvict) { + BlobDBOptions bdb_options; + bdb_options.is_fifo = true; + bdb_options.min_blob_size = 100; + bdb_options.disable_background_tasks = true; + Options options; + // Use mock env to stop wall clock. + options.env = mock_env_.get(); + options.disable_auto_compactions = true; + auto statistics = CreateDBStatistics(); + options.statistics = statistics; + Open(bdb_options, options); + + ASSERT_EQ(0, blob_db_impl()->TEST_live_sst_size()); + std::string small_value(50, 'v'); + std::map data; + // Insert some data into LSM tree to make sure FIFO eviction take SST + // file size into account. + for (int i = 0; i < 1000; i++) { + ASSERT_OK(Put("key" + ToString(i), small_value, &data)); + } + ASSERT_OK(blob_db_->Flush(FlushOptions())); + uint64_t live_sst_size = 0; + ASSERT_TRUE(blob_db_->GetIntProperty(DB::Properties::kTotalSstFilesSize, + &live_sst_size)); + ASSERT_TRUE(live_sst_size > 0); + ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size()); + + bdb_options.max_db_size = live_sst_size + 2000; + Reopen(bdb_options, options); + ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size()); + + std::string value_1k(1000, 'v'); + ASSERT_OK(PutWithTTL("large_key1", value_1k, 60, &data)); + ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + VerifyDB(data); + // large_key2 evicts large_key1 + ASSERT_OK(PutWithTTL("large_key2", value_1k, 60, &data)); + ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + data.erase("large_key1"); + VerifyDB(data); + // large_key3 get no enough space even after evicting large_key2, so it + // instead return no space error. + std::string value_2k(2000, 'v'); + ASSERT_TRUE(PutWithTTL("large_key3", value_2k, 60).IsNoSpace()); + ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + // Verify large_key2 still exists. + VerifyDB(data); +} + +// Test flush or compaction will trigger FIFO eviction since they update +// total SST file size. +TEST_F(BlobDBTest, FIFOEviction_TriggerOnSSTSizeChange) { + BlobDBOptions bdb_options; + bdb_options.max_db_size = 1000; + bdb_options.is_fifo = true; + bdb_options.min_blob_size = 100; + bdb_options.disable_background_tasks = true; + Options options; + // Use mock env to stop wall clock. + options.env = mock_env_.get(); + auto statistics = CreateDBStatistics(); + options.statistics = statistics; + options.compression = kNoCompression; + Open(bdb_options, options); + + std::string value(800, 'v'); + ASSERT_OK(PutWithTTL("large_key", value, 60)); + ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size()); + ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + VerifyDB({{"large_key", value}}); + + // Insert some small keys and flush to bring DB out of space. + std::map data; + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put("key" + ToString(i), "v", &data)); + } + ASSERT_OK(blob_db_->Flush(FlushOptions())); + + // Verify large_key is deleted by FIFO eviction. + blob_db_impl()->TEST_DeleteObsoleteFiles(); + ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size()); + ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + VerifyDB(data); +} + TEST_F(BlobDBTest, InlineSmallValues) { constexpr uint64_t kMaxExpiration = 1000; Random rnd(301); @@ -1197,6 +1305,7 @@ TEST_F(BlobDBTest, CompactionFilterNotSupported) { } } +// Test comapction filter should remove any expired blob index. TEST_F(BlobDBTest, FilterExpiredBlobIndex) { constexpr size_t kNumKeys = 100; constexpr size_t kNumPuts = 1000; @@ -1262,6 +1371,147 @@ TEST_F(BlobDBTest, FilterExpiredBlobIndex) { VerifyDB(data_after_compact); } +// Test compaction filter should remove any blob index where corresponding +// blob file has been removed (either by FIFO or garbage collection). +TEST_F(BlobDBTest, FilterFileNotAvailable) { + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + Options options; + options.disable_auto_compactions = true; + Open(bdb_options, options); + + ASSERT_OK(Put("foo", "v1")); + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + ASSERT_EQ(1, blob_files[0]->BlobFileNumber()); + ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0])); + + ASSERT_OK(Put("bar", "v2")); + blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(2, blob_files.size()); + ASSERT_EQ(2, blob_files[1]->BlobFileNumber()); + ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[1])); + + DB *base_db = blob_db_->GetRootDB(); + std::vector versions; + ASSERT_OK(GetAllKeyVersions(base_db, "", "", &versions)); + ASSERT_EQ(2, versions.size()); + ASSERT_EQ("bar", versions[0].user_key); + ASSERT_EQ("foo", versions[1].user_key); + VerifyDB({{"bar", "v2"}, {"foo", "v1"}}); + + ASSERT_OK(blob_db_->Flush(FlushOptions())); + ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(GetAllKeyVersions(base_db, "", "", &versions)); + ASSERT_EQ(2, versions.size()); + ASSERT_EQ("bar", versions[0].user_key); + ASSERT_EQ("foo", versions[1].user_key); + VerifyDB({{"bar", "v2"}, {"foo", "v1"}}); + + // Remove the first blob file and compact. foo should be remove from base db. + blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[0]); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(GetAllKeyVersions(base_db, "", "", &versions)); + ASSERT_EQ(1, versions.size()); + ASSERT_EQ("bar", versions[0].user_key); + VerifyDB({{"bar", "v2"}}); + + // Remove the second blob file and compact. bar should be remove from base db. + blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[1]); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(GetAllKeyVersions(base_db, "", "", &versions)); + ASSERT_EQ(0, versions.size()); + VerifyDB({}); +} + +// Test compaction filter should filter any inlined TTL keys that would have +// been dropped by last FIFO eviction if they are store out-of-line. +TEST_F(BlobDBTest, FilterForFIFOEviction) { + Random rnd(215); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 100; + bdb_options.ttl_range_secs = 60; + bdb_options.max_db_size = 0; + bdb_options.disable_background_tasks = true; + Options options; + // Use mock env to stop wall clock. + mock_env_->set_current_time(0); + options.env = mock_env_.get(); + auto statistics = CreateDBStatistics(); + options.statistics = statistics; + options.disable_auto_compactions = true; + Open(bdb_options, options); + + std::map data; + std::map data_after_compact; + // Insert some small values that will be inlined. + for (int i = 0; i < 1000; i++) { + std::string key = "key" + ToString(i); + std::string value = test::RandomHumanReadableString(&rnd, 50); + uint64_t ttl = rnd.Next() % 120 + 1; + ASSERT_OK(PutWithTTL(key, value, ttl, &data)); + if (ttl >= 60) { + data_after_compact[key] = value; + } + } + uint64_t num_keys_to_evict = data.size() - data_after_compact.size(); + ASSERT_OK(blob_db_->Flush(FlushOptions())); + uint64_t live_sst_size = blob_db_impl()->TEST_live_sst_size(); + ASSERT_GT(live_sst_size, 0); + VerifyDB(data); + + bdb_options.max_db_size = live_sst_size + 30000; + bdb_options.is_fifo = true; + Reopen(bdb_options, options); + VerifyDB(data); + + // Put two large values, each on a different blob file. + std::string large_value(10000, 'v'); + ASSERT_OK(PutWithTTL("large_key1", large_value, 90)); + ASSERT_OK(PutWithTTL("large_key2", large_value, 150)); + ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size()); + ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + data["large_key1"] = large_value; + data["large_key2"] = large_value; + VerifyDB(data); + + // Put a third large value which will bring the DB out of space. + // FIFO eviction will evict the file of large_key1. + ASSERT_OK(PutWithTTL("large_key3", large_value, 150)); + ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size()); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size()); + data.erase("large_key1"); + data["large_key3"] = large_value; + VerifyDB(data); + + // Putting some more small values. These values shouldn't be evicted by + // compaction filter since they are inserted after FIFO eviction. + ASSERT_OK(PutWithTTL("foo", "v", 30, &data_after_compact)); + ASSERT_OK(PutWithTTL("bar", "v", 30, &data_after_compact)); + + // FIFO eviction doesn't trigger again since there enough room for the flush. + ASSERT_OK(blob_db_->Flush(FlushOptions())); + ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + + // Manual compact and check if compaction filter evict those keys with + // expiration < 60. + ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + // All keys with expiration < 60, plus large_key1 is filtered by + // compaction filter. + ASSERT_EQ(num_keys_to_evict + 1, + statistics->getTickerCount(BLOB_DB_BLOB_INDEX_EVICTED_COUNT)); + ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size()); + data_after_compact["large_key2"] = large_value; + data_after_compact["large_key3"] = large_value; + VerifyDB(data_after_compact); +} + } // namespace blob_db } // namespace rocksdb