Blob DB: Improve FIFO eviction

Summary:
Improving blob db FIFO eviction with the following changes,
* Change blob_dir_size to max_db_size. Take into account SST file size when computing DB size.
* FIFO now only take into account live sst files and live blob files. It is normal for disk usage to go over max_db_size because there are obsolete sst files and blob files pending deletion.
* FIFO eviction now also evict TTL blob files that's still open. It doesn't evict non-TTL blob files.
* If FIFO is triggered, it will pass an expiration and the current sequence number to compaction filter. Compaction filter will then filter inlined keys to evict those with an earlier expiration and smaller sequence number. So call LSM FIFO.
* Compaction filter also filter those blob indexes where corresponding blob file is gone.
* Add an event listener to listen compaction/flush event and update sst file size.
* Implement DB::Close() to make sure base db, as well as event listener and compaction filter, destruct before blob db.
* More blob db statistics around FIFO.
* Fix some locking issue when accessing a blob file.
Closes https://github.com/facebook/rocksdb/pull/3556

Differential Revision: D7139328

Pulled By: yiwu-arbug

fbshipit-source-id: ea5edb07b33dfceacb2682f4789bea61de28bbfa
main
Yi Wu 7 years ago committed by Facebook Github Bot
parent 0a2354ca8f
commit b864bc9b5b
  1. 1
      CMakeLists.txt
  2. 1
      TARGETS
  3. 5
      db/compaction_iterator.cc
  4. 18
      include/rocksdb/statistics.h
  5. 1
      src.mk
  6. 4
      tools/db_bench_tool.cc
  7. 117
      utilities/blob_db/blob_compaction_filter.cc
  8. 69
      utilities/blob_db/blob_compaction_filter.h
  9. 44
      utilities/blob_db/blob_db.cc
  10. 17
      utilities/blob_db/blob_db.h
  11. 308
      utilities/blob_db/blob_db_impl.cc
  12. 74
      utilities/blob_db/blob_db_impl.h
  13. 42
      utilities/blob_db/blob_db_iterator.h
  14. 46
      utilities/blob_db/blob_db_listener.h
  15. 302
      utilities/blob_db/blob_db_test.cc

@ -575,6 +575,7 @@ set(SOURCES
util/transaction_test_util.cc
util/xxhash.cc
utilities/backupable/backupable_db.cc
utilities/blob_db/blob_compaction_filter.cc
utilities/blob_db/blob_db.cc
utilities/blob_db/blob_db_impl.cc
utilities/blob_db/blob_dump_tool.cc

@ -217,6 +217,7 @@ cpp_library(
"util/transaction_test_util.cc",
"util/xxhash.cc",
"utilities/backupable/backupable_db.cc",
"utilities/blob_db/blob_compaction_filter.cc",
"utilities/blob_db/blob_db.cc",
"utilities/blob_db/blob_db_impl.cc",
"utilities/blob_db/blob_dump_tool.cc",

@ -167,10 +167,13 @@ void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
CompactionFilter::ValueType value_type =
ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
: CompactionFilter::ValueType::kBlobIndex;
// Hack: pass internal key to BlobIndexCompactionFilter since it needs
// to get sequence number.
Slice& filter_key = ikey_.type == kTypeValue ? ikey_.user_key : key_;
{
StopWatchNano timer(env_, true);
filter = compaction_filter_->FilterV2(
compaction_->level(), ikey_.user_key, value_type, value_,
compaction_->level(), filter_key, value_type, value_,
&compaction_filter_value_, compaction_filter_skip_until_.rep());
iter_stats_.total_filter_time +=
env_ != nullptr ? timer.ElapsedNanos() : 0;

@ -265,7 +265,16 @@ enum Tickers : uint32_t {
BLOB_DB_BLOB_FILE_SYNCED,
// # of blob index evicted from base DB by BlobDB compaction filter because
// of expiration.
BLOB_DB_BLOB_INDEX_EXPIRED,
BLOB_DB_BLOB_INDEX_EXPIRED_COUNT,
// size of blob index evicted from base DB by BlobDB compaction filter
// because of expiration.
BLOB_DB_BLOB_INDEX_EXPIRED_SIZE,
// # of blob index evicted from base DB by BlobDB compaction filter because
// of corresponding file deleted.
BLOB_DB_BLOB_INDEX_EVICTED_COUNT,
// size of blob index evicted from base DB by BlobDB compaction filter
// because of corresponding file deleted.
BLOB_DB_BLOB_INDEX_EVICTED_SIZE,
// # of blob files being garbage collected.
BLOB_DB_GC_NUM_FILES,
// # of blob files generated by garbage collection.
@ -417,7 +426,12 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{BLOB_DB_BLOB_FILE_BYTES_WRITTEN, "rocksdb.blobdb.blob.file.bytes.written"},
{BLOB_DB_BLOB_FILE_BYTES_READ, "rocksdb.blobdb.blob.file.bytes.read"},
{BLOB_DB_BLOB_FILE_SYNCED, "rocksdb.blobdb.blob.file.synced"},
{BLOB_DB_BLOB_INDEX_EXPIRED, "rocksdb.blobdb.blob.index.expired"},
{BLOB_DB_BLOB_INDEX_EXPIRED_COUNT,
"rocksdb.blobdb.blob.index.expired.count"},
{BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, "rocksdb.blobdb.blob.index.expired.size"},
{BLOB_DB_BLOB_INDEX_EVICTED_COUNT,
"rocksdb.blobdb.blob.index.evicted.count"},
{BLOB_DB_BLOB_INDEX_EVICTED_SIZE, "rocksdb.blobdb.blob.index.evicted.size"},
{BLOB_DB_GC_NUM_FILES, "rocksdb.blobdb.gc.num.files"},
{BLOB_DB_GC_NUM_NEW_FILES, "rocksdb.blobdb.gc.num.new.files"},
{BLOB_DB_GC_FAILURES, "rocksdb.blobdb.gc.failures"},

@ -153,6 +153,7 @@ LIB_SOURCES = \
util/transaction_test_util.cc \
util/xxhash.cc \
utilities/backupable/backupable_db.cc \
utilities/blob_db/blob_compaction_filter.cc \
utilities/blob_db/blob_db.cc \
utilities/blob_db/blob_db_impl.cc \
utilities/blob_db/blob_file.cc \

@ -686,7 +686,7 @@ DEFINE_bool(blob_db_enable_gc, false, "Enable BlobDB garbage collection.");
DEFINE_bool(blob_db_is_fifo, false, "Enable FIFO eviction strategy in BlobDB.");
DEFINE_uint64(blob_db_dir_size, 0,
DEFINE_uint64(blob_db_max_db_size, 0,
"Max size limit of the directory where blob files are stored.");
DEFINE_uint64(blob_db_max_ttl_range, 86400,
@ -3446,7 +3446,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
blob_db::BlobDBOptions blob_db_options;
blob_db_options.enable_garbage_collection = FLAGS_blob_db_enable_gc;
blob_db_options.is_fifo = FLAGS_blob_db_is_fifo;
blob_db_options.blob_dir_size = FLAGS_blob_db_dir_size;
blob_db_options.max_db_size = FLAGS_blob_db_max_db_size;
blob_db_options.ttl_range_secs = FLAGS_blob_db_ttl_range_secs;
blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size;
blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync;

@ -0,0 +1,117 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_compaction_filter.h"
#include "db/dbformat.h"
namespace rocksdb {
namespace blob_db {
namespace {
// CompactionFilter to delete expired blob index from base DB.
class BlobIndexCompactionFilter : public CompactionFilter {
public:
BlobIndexCompactionFilter(BlobCompactionContext context,
uint64_t current_time, Statistics* statistics)
: context_(context),
current_time_(current_time),
statistics_(statistics) {}
virtual ~BlobIndexCompactionFilter() {
RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_);
RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, expired_size_);
RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_COUNT, evicted_count_);
RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_SIZE, evicted_size_);
}
virtual const char* Name() const override {
return "BlobIndexCompactionFilter";
}
// Filter expired blob indexes regardless of snapshots.
virtual bool IgnoreSnapshots() const override { return true; }
virtual Decision FilterV2(int /*level*/, const Slice& key,
ValueType value_type, const Slice& value,
std::string* /*new_value*/,
std::string* /*skip_until*/) const override {
if (value_type != kBlobIndex) {
return Decision::kKeep;
}
BlobIndex blob_index;
Status s = blob_index.DecodeFrom(value);
if (!s.ok()) {
// Unable to decode blob index. Keeping the value.
return Decision::kKeep;
}
if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) {
// Expired
expired_count_++;
expired_size_ += key.size() + value.size();
return Decision::kRemove;
}
if (!blob_index.IsInlined() &&
blob_index.file_number() < context_.next_file_number &&
context_.current_blob_files.count(blob_index.file_number()) == 0) {
// Corresponding blob file gone. Could have been garbage collected or
// evicted by FIFO eviction.
evicted_count_++;
evicted_size_ += key.size() + value.size();
return Decision::kRemove;
}
if (context_.fifo_eviction_seq > 0 && blob_index.HasTTL() &&
blob_index.expiration() < context_.evict_expiration_up_to) {
// Hack: Internal key is passed to BlobIndexCompactionFilter for it to
// get sequence number.
ParsedInternalKey ikey;
bool ok = ParseInternalKey(key, &ikey);
// Remove keys that could have been remove by last FIFO eviction.
// If get error while parsing key, ignore and continue.
if (ok && ikey.sequence < context_.fifo_eviction_seq) {
evicted_count_++;
evicted_size_ += key.size() + value.size();
return Decision::kRemove;
}
}
return Decision::kKeep;
}
private:
BlobCompactionContext context_;
const uint64_t current_time_;
Statistics* statistics_;
// It is safe to not using std::atomic since the compaction filter, created
// from a compaction filter factroy, will not be called from multiple threads.
mutable uint64_t expired_count_ = 0;
mutable uint64_t expired_size_ = 0;
mutable uint64_t evicted_count_ = 0;
mutable uint64_t evicted_size_ = 0;
};
} // anonymous namespace
std::unique_ptr<CompactionFilter>
BlobIndexCompactionFilterFactory::CreateCompactionFilter(
const CompactionFilter::Context& /*context*/) {
int64_t current_time = 0;
Status s = env_->GetCurrentTime(&current_time);
if (!s.ok()) {
return nullptr;
}
assert(current_time >= 0);
BlobCompactionContext context;
blob_db_impl_->GetCompactionContext(&context);
return std::unique_ptr<CompactionFilter>(new BlobIndexCompactionFilter(
context, static_cast<uint64_t>(current_time), statistics_));
}
} // namespace blob_db
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -5,82 +5,39 @@
#pragma once
#ifndef ROCKSDB_LITE
#include <unordered_set>
#include "monitoring/statistics.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/env.h"
#include "utilities/blob_db/blob_db_impl.h"
#include "utilities/blob_db/blob_index.h"
namespace rocksdb {
namespace blob_db {
// CompactionFilter to delete expired blob index from base DB.
class BlobIndexCompactionFilter : public CompactionFilter {
public:
BlobIndexCompactionFilter(uint64_t current_time, Statistics* statistics)
: current_time_(current_time), statistics_(statistics) {}
virtual ~BlobIndexCompactionFilter() {
RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED, expired_count_);
}
virtual const char* Name() const override {
return "BlobIndexCompactionFilter";
}
// Filter expired blob indexes regardless of snapshots.
virtual bool IgnoreSnapshots() const override { return true; }
virtual Decision FilterV2(int /*level*/, const Slice& /*key*/,
ValueType value_type, const Slice& value,
std::string* /*new_value*/,
std::string* /*skip_until*/) const override {
if (value_type != kBlobIndex) {
return Decision::kKeep;
}
BlobIndex blob_index;
Status s = blob_index.DecodeFrom(value);
if (!s.ok()) {
// Unable to decode blob index. Keeping the value.
return Decision::kKeep;
}
if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) {
// Expired
expired_count_++;
return Decision::kRemove;
}
return Decision::kKeep;
}
private:
const uint64_t current_time_;
Statistics* statistics_;
// It is safe to not using std::atomic since the compaction filter, created
// from a compaction filter factroy, will not be called from multiple threads.
mutable uint64_t expired_count_ = 0;
struct BlobCompactionContext {
uint64_t next_file_number;
std::unordered_set<uint64_t> current_blob_files;
SequenceNumber fifo_eviction_seq;
uint64_t evict_expiration_up_to;
};
class BlobIndexCompactionFilterFactory : public CompactionFilterFactory {
public:
BlobIndexCompactionFilterFactory(Env* env, Statistics* statistics)
: env_(env), statistics_(statistics) {}
BlobIndexCompactionFilterFactory(BlobDBImpl* blob_db_impl, Env* env,
Statistics* statistics)
: blob_db_impl_(blob_db_impl), env_(env), statistics_(statistics) {}
virtual const char* Name() const override {
return "BlobIndexCompactionFilterFactory";
}
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& /*context*/) override {
int64_t current_time = 0;
Status s = env_->GetCurrentTime(&current_time);
if (!s.ok()) {
return nullptr;
}
assert(current_time >= 0);
return std::unique_ptr<CompactionFilter>(new BlobIndexCompactionFilter(
static_cast<uint64_t>(current_time), statistics_));
}
const CompactionFilter::Context& /*context*/) override;
private:
BlobDBImpl* blob_db_impl_;
Env* env_;
Statistics* statistics_;
};

@ -63,29 +63,47 @@ Status BlobDB::Open(const DBOptions& db_options,
BlobDB::BlobDB() : StackableDB(nullptr) {}
void BlobDBOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir: %s",
ROCKS_LOG_HEADER(
log, " BlobDBOptions.blob_dir: %s",
blob_dir.c_str());
ROCKS_LOG_HEADER(log, " blob_db_options.path_relative: %d",
ROCKS_LOG_HEADER(
log, " BlobDBOptions.path_relative: %d",
path_relative);
ROCKS_LOG_HEADER(log, " blob_db_options.is_fifo: %d",
ROCKS_LOG_HEADER(
log, " BlobDBOptions.is_fifo: %d",
is_fifo);
ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir_size: %" PRIu64,
blob_dir_size);
ROCKS_LOG_HEADER(log, " blob_db_options.ttl_range_secs: %" PRIu32,
ROCKS_LOG_HEADER(
log, " BlobDBOptions.max_db_size: %" PRIu64,
max_db_size);
ROCKS_LOG_HEADER(
log, " BlobDBOptions.ttl_range_secs: %" PRIu32,
ttl_range_secs);
ROCKS_LOG_HEADER(log, " blob_db_options.min_blob_size: %" PRIu64,
ROCKS_LOG_HEADER(
log, " BlobDBOptions.min_blob_size: %" PRIu64,
min_blob_size);
ROCKS_LOG_HEADER(log, " blob_db_options.bytes_per_sync: %" PRIu64,
ROCKS_LOG_HEADER(
log, " BlobDBOptions.bytes_per_sync: %" PRIu64,
bytes_per_sync);
ROCKS_LOG_HEADER(log, " blob_db_options.blob_file_size: %" PRIu64,
ROCKS_LOG_HEADER(
log, " BlobDBOptions.blob_file_size: %" PRIu64,
blob_file_size);
ROCKS_LOG_HEADER(log, " blob_db_options.ttl_extractor: %p",
ROCKS_LOG_HEADER(
log, " BlobDBOptions.ttl_extractor: %p",
ttl_extractor.get());
ROCKS_LOG_HEADER(log, " blob_db_options.compression: %d",
ROCKS_LOG_HEADER(
log, " BlobDBOptions.compression: %d",
static_cast<int>(compression));
ROCKS_LOG_HEADER(log, "blob_db_options.enable_garbage_collection: %d",
ROCKS_LOG_HEADER(
log, " BlobDBOptions.enable_garbage_collection: %d",
enable_garbage_collection);
ROCKS_LOG_HEADER(log, " blob_db_options.disable_background_tasks: %d",
ROCKS_LOG_HEADER(
log, " BlobDBOptions.garbage_collection_interval_secs: %" PRIu64,
garbage_collection_interval_secs);
ROCKS_LOG_HEADER(
log, "BlobDBOptions.garbage_collection_deletion_size_threshold: %lf",
garbage_collection_deletion_size_threshold);
ROCKS_LOG_HEADER(
log, " BlobDBOptions.disable_background_tasks: %d",
disable_background_tasks);
}

@ -36,13 +36,17 @@ struct BlobDBOptions {
// whether the blob_dir path is relative or absolute.
bool path_relative = true;
// is the eviction strategy fifo based
// When max_db_size is reached, evict blob files to free up space
// instead of returnning NoSpace error on write. Blob files will be
// evicted in this order until enough space is free up:
// * the TTL blob file cloeset to expire,
// * the oldest non-TTL blob file.
bool is_fifo = false;
// maximum size of the blob dir. Once this gets used, up
// evict the blob file which is oldest (is_fifo )
// 0 means no limits
uint64_t blob_dir_size = 0;
// Maximum size of the database (including SST files and blob files).
//
// Default: 0 (no limits)
uint64_t max_db_size = 0;
// a new bucket is opened, for ttl_range. So if ttl_range is 600seconds
// (10 minutes), and the first bucket starts at 1471542000
@ -198,6 +202,9 @@ class BlobDB : public StackableDB {
return NewIterator(options);
}
using rocksdb::StackableDB::Close;
virtual Status Close() override = 0;
// Opening blob db.
static Status Open(const Options& options, const BlobDBOptions& bdb_options,
const std::string& dbname, BlobDB** blob_db);

@ -35,6 +35,7 @@
#include "util/timer_queue.h"
#include "utilities/blob_db/blob_compaction_filter.h"
#include "utilities/blob_db/blob_db_iterator.h"
#include "utilities/blob_db/blob_db_listener.h"
#include "utilities/blob_db/blob_index.h"
namespace {
@ -44,12 +45,6 @@ int kBlockBasedTableVersionFormat = 2;
namespace rocksdb {
namespace blob_db {
void BlobDBFlushBeginListener::OnFlushBegin(DB* /*db*/,
const FlushJobInfo& /*info*/) {
assert(blob_db_impl_ != nullptr);
blob_db_impl_->SyncBlobFiles();
}
WalFilter::WalProcessingOption BlobReconcileWalFilter::LogRecordFound(
unsigned long long /*log_number*/, const std::string& /*log_file_name*/,
const WriteBatch& /*batch*/, WriteBatch* /*new_batch*/,
@ -59,6 +54,7 @@ WalFilter::WalProcessingOption BlobReconcileWalFilter::LogRecordFound(
bool blobf_compare_ttl::operator()(const std::shared_ptr<BlobFile>& lhs,
const std::shared_ptr<BlobFile>& rhs) const {
assert(lhs->HasTTL() && rhs->HasTTL());
if (lhs->expiration_range_.first < rhs->expiration_range_.first) {
return true;
}
@ -84,12 +80,13 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
statistics_(db_options_.statistics.get()),
next_file_number_(1),
epoch_of_(0),
shutdown_(false),
closed_(true),
open_file_count_(0),
total_blob_space_(0),
open_p1_done_(false),
debug_level_(0),
oldest_file_evicted_(false) {
total_blob_size_(0),
live_sst_size_(0),
fifo_eviction_seq_(0),
evict_expiration_up_to_(0),
debug_level_(0) {
blob_dir_ = (bdb_options_.path_relative)
? dbname + "/" + bdb_options_.blob_dir
: bdb_options_.blob_dir;
@ -98,8 +95,30 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
BlobDBImpl::~BlobDBImpl() {
// CancelAllBackgroundWork(db_, true);
Status s __attribute__((__unused__)) = Close();
assert(s.ok());
}
Shutdown();
Status BlobDBImpl::Close() {
if (closed_) {
return Status::OK();
}
closed_ = true;
// Close base DB before BlobDBImpl destructs to stop event listener and
// compaction filter call.
Status s = db_->Close();
// delete db_ anyway even if close failed.
delete db_;
// Reset pointers to avoid StackableDB delete the pointer again.
db_ = nullptr;
db_impl_ = nullptr;
if (!s.ok()) {
return s;
}
s = SyncBlobFiles();
return s;
}
BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; }
@ -149,10 +168,9 @@ Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
}
// Update options
db_options_.listeners.push_back(
std::shared_ptr<EventListener>(new BlobDBFlushBeginListener(this)));
db_options_.listeners.push_back(std::make_shared<BlobDBListener>(this));
cf_options_.compaction_filter_factory.reset(
new BlobIndexCompactionFilterFactory(env_, statistics_));
new BlobIndexCompactionFilterFactory(this, env_, statistics_));
// Open base db.
ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_);
@ -161,6 +179,7 @@ Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
return s;
}
db_impl_ = static_cast_with_check<DBImpl, DB>(db_->GetRootDB());
UpdateLiveSSTSize();
// Start background jobs.
if (!bdb_options_.disable_background_tasks) {
@ -169,6 +188,7 @@ Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
ROCKS_LOG_INFO(db_options_.info_log, "BlobDB pointer %p", this);
bdb_options_.Dump(db_options_.info_log.get());
closed_ = false;
return s;
}
@ -190,8 +210,6 @@ void BlobDBImpl::StartBackgroundTasks() {
std::bind(&BlobDBImpl::CheckSeqFiles, this, std::placeholders::_1));
}
void BlobDBImpl::Shutdown() { shutdown_.store(true); }
Status BlobDBImpl::GetAllBlobFiles(std::set<uint64_t>* file_numbers) {
assert(file_numbers != nullptr);
std::vector<std::string> all_files;
@ -241,8 +259,7 @@ Status BlobDBImpl::OpenAllBlobFiles() {
Status read_metadata_status = blob_file->ReadMetadata(env_, env_options_);
if (read_metadata_status.IsCorruption()) {
// Remove incomplete file.
blob_file->MarkObsolete(0 /*sequence number*/);
obsolete_files_.push_back(blob_file);
ObsoleteBlobFile(blob_file, 0 /*obsolete_seq*/, false /*update_size*/);
if (!obsolete_file_list.empty()) {
obsolete_file_list.append(", ");
}
@ -256,6 +273,8 @@ Status BlobDBImpl::OpenAllBlobFiles() {
return read_metadata_status;
}
total_blob_size_ += blob_file->GetFileSize();
blob_files_[file_number] = blob_file;
if (!blob_file_list.empty()) {
blob_file_list.append(", ");
@ -343,25 +362,33 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
std::shared_ptr<BlobFile> BlobDBImpl::FindBlobFileLocked(
uint64_t expiration) const {
if (open_ttl_files_.empty()) return nullptr;
if (open_ttl_files_.empty()) {
return nullptr;
}
std::shared_ptr<BlobFile> tmp = std::make_shared<BlobFile>();
tmp->SetHasTTL(true);
tmp->expiration_range_ = std::make_pair(expiration, 0);
tmp->file_number_ = std::numeric_limits<uint64_t>::max();
auto citr = open_ttl_files_.equal_range(tmp);
if (citr.first == open_ttl_files_.end()) {
assert(citr.second == open_ttl_files_.end());
std::shared_ptr<BlobFile> check = *(open_ttl_files_.rbegin());
return (check->expiration_range_.second < expiration) ? nullptr : check;
return (check->expiration_range_.second <= expiration) ? nullptr : check;
}
if (citr.first != citr.second) return *(citr.first);
if (citr.first != citr.second) {
return *(citr.first);
}
auto finditr = citr.second;
if (finditr != open_ttl_files_.begin()) --finditr;
if (finditr != open_ttl_files_.begin()) {
--finditr;
}
bool b2 = (*finditr)->expiration_range_.second < expiration;
bool b2 = (*finditr)->expiration_range_.second <= expiration;
bool b1 = (*finditr)->expiration_range_.first > expiration;
return (b1 || b2) ? nullptr : (*finditr);
@ -426,6 +453,7 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFile() {
blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile));
open_non_ttl_file_ = bfile;
total_blob_size_ += BlobLogHeader::kSize;
return bfile;
}
@ -500,6 +528,7 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) {
blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile));
open_ttl_files_.insert(bfile);
total_blob_size_ += BlobLogHeader::kSize;
epoch_of_++;
return bfile;
@ -663,6 +692,7 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/,
const Slice& key, const Slice& value,
uint64_t expiration, WriteBatch* batch) {
write_mutex_.AssertHeld();
Status s;
std::string index_entry;
uint32_t column_family_id =
@ -680,20 +710,27 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/,
RecordTick(statistics_, BLOB_DB_WRITE_INLINED_TTL);
}
} else {
std::shared_ptr<BlobFile> bfile = (expiration != kNoExpiration)
? SelectBlobFileTTL(expiration)
: SelectBlobFile();
if (!bfile) {
return Status::NotFound("Blob file not found");
}
assert(bfile->compression() == bdb_options_.compression);
std::string compression_output;
Slice value_compressed = GetCompressedSlice(value, &compression_output);
std::string headerbuf;
Writer::ConstructBlobHeader(&headerbuf, key, value_compressed, expiration);
// Check DB size limit before selecting blob file to
// Since CheckSizeAndEvictBlobFiles() can close blob files, it needs to be
// done before calling SelectBlobFile().
s = CheckSizeAndEvictBlobFiles(headerbuf.size() + key.size() +
value_compressed.size());
if (!s.ok()) {
return s;
}
std::shared_ptr<BlobFile> bfile = (expiration != kNoExpiration)
? SelectBlobFileTTL(expiration)
: SelectBlobFile();
assert(bfile != nullptr);
assert(bfile->compression() == bdb_options_.compression);
s = AppendBlob(bfile, headerbuf, key, value_compressed, expiration,
&index_entry);
if (expiration == kNoExpiration) {
@ -756,66 +793,118 @@ uint64_t BlobDBImpl::ExtractExpiration(const Slice& key, const Slice& value,
return has_expiration ? expiration : kNoExpiration;
}
std::shared_ptr<BlobFile> BlobDBImpl::GetOldestBlobFile() {
std::vector<std::shared_ptr<BlobFile>> blob_files;
CopyBlobFiles(&blob_files, [](const std::shared_ptr<BlobFile>& f) {
return !f->Obsolete() && f->Immutable();
});
if (blob_files.empty()) {
return nullptr;
void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) {
ReadLock l(&mutex_);
context->next_file_number = next_file_number_.load();
context->current_blob_files.clear();
for (auto& p : blob_files_) {
context->current_blob_files.insert(p.first);
}
blobf_compare_ttl compare;
return *std::min_element(blob_files.begin(), blob_files.end(), compare);
context->fifo_eviction_seq = fifo_eviction_seq_;
context->evict_expiration_up_to = evict_expiration_up_to_;
}
bool BlobDBImpl::EvictOldestBlobFile() {
auto oldest_file = GetOldestBlobFile();
if (oldest_file == nullptr) {
return false;
void BlobDBImpl::UpdateLiveSSTSize() {
uint64_t live_sst_size = 0;
bool ok = GetIntProperty(DB::Properties::kLiveSstFilesSize, &live_sst_size);
if (ok) {
live_sst_size_.store(live_sst_size);
ROCKS_LOG_INFO(db_options_.info_log,
"Updated total SST file size: %" PRIu64 " bytes.",
live_sst_size);
} else {
ROCKS_LOG_ERROR(
db_options_.info_log,
"Failed to update total SST file size after flush or compaction.");
}
{
// Trigger FIFO eviction if needed.
MutexLock l(&write_mutex_);
Status s = CheckSizeAndEvictBlobFiles(0, true /*force*/);
if (s.IsNoSpace()) {
ROCKS_LOG_WARN(db_options_.info_log,
"DB grow out-of-space after SST size updated. Current live"
" SST size: %" PRIu64
" , current blob files size: %" PRIu64 ".",
live_sst_size_.load(), total_blob_size_.load());
}
}
}
WriteLock wl(&mutex_);
// Double check the file is not obsolete by others
if (oldest_file_evicted_ == false && !oldest_file->Obsolete()) {
auto expiration_range = oldest_file->GetExpirationRange();
Status BlobDBImpl::CheckSizeAndEvictBlobFiles(uint64_t blob_size,
bool force_evict) {
write_mutex_.AssertHeld();
uint64_t live_sst_size = live_sst_size_.load();
if (bdb_options_.max_db_size == 0 ||
live_sst_size + total_blob_size_.load() + blob_size <=
bdb_options_.max_db_size) {
return Status::OK();
}
if (bdb_options_.is_fifo == false ||
(!force_evict && live_sst_size + blob_size > bdb_options_.max_db_size)) {
// FIFO eviction is disabled, or no space to insert new blob even we evict
// all blob files.
return Status::NoSpace(
"Write failed, as writing it would exceed max_db_size limit.");
}
std::vector<std::shared_ptr<BlobFile>> candidate_files;
CopyBlobFiles(&candidate_files,
[&](const std::shared_ptr<BlobFile>& blob_file) {
// Only evict TTL files
return blob_file->HasTTL();
});
std::sort(candidate_files.begin(), candidate_files.end(),
blobf_compare_ttl());
std::reverse(candidate_files.begin(), candidate_files.end());
fifo_eviction_seq_ = GetLatestSequenceNumber();
WriteLock l(&mutex_);
while (!candidate_files.empty() &&
live_sst_size + total_blob_size_.load() + blob_size >
bdb_options_.max_db_size) {
std::shared_ptr<BlobFile> blob_file = candidate_files.back();
candidate_files.pop_back();
WriteLock file_lock(&blob_file->mutex_);
if (blob_file->Obsolete()) {
// File already obsoleted by someone else.
continue;
}
// FIFO eviction can evict open blob files.
if (!blob_file->Immutable()) {
Status s = CloseBlobFile(blob_file, false /*need_lock*/);
if (!s.ok()) {
return s;
}
}
assert(blob_file->Immutable());
auto expiration_range = blob_file->GetExpirationRange();
ROCKS_LOG_INFO(db_options_.info_log,
"Evict oldest blob file since DB out of space. Current "
"space used: %" PRIu64 ", blob dir size: %" PRIu64
", evicted blob file #%" PRIu64
"live SST file size: %" PRIu64 ", total blob size: %" PRIu64
", max db size: %" PRIu64 ", evicted blob file #%" PRIu64
" with expiration range (%" PRIu64 ", %" PRIu64 ").",
total_blob_space_.load(), bdb_options_.blob_dir_size,
oldest_file->BlobFileNumber(), expiration_range.first,
expiration_range.second);
oldest_file->MarkObsolete(GetLatestSequenceNumber());
obsolete_files_.push_back(oldest_file);
oldest_file_evicted_.store(true);
live_sst_size, total_blob_size_.load(),
bdb_options_.max_db_size, blob_file->BlobFileNumber(),
expiration_range.first, expiration_range.second);
ObsoleteBlobFile(blob_file, fifo_eviction_seq_, true /*update_size*/);
evict_expiration_up_to_ = expiration_range.first;
RecordTick(statistics_, BLOB_DB_FIFO_NUM_FILES_EVICTED);
RecordTick(statistics_, BLOB_DB_FIFO_NUM_KEYS_EVICTED,
oldest_file->BlobCount());
blob_file->BlobCount());
RecordTick(statistics_, BLOB_DB_FIFO_BYTES_EVICTED,
oldest_file->GetFileSize());
blob_file->GetFileSize());
TEST_SYNC_POINT("BlobDBImpl::EvictOldestBlobFile:Evicted");
return true;
}
return false;
}
Status BlobDBImpl::CheckSize(size_t blob_size) {
uint64_t new_space_util = total_blob_space_.load() + blob_size;
if (bdb_options_.blob_dir_size > 0) {
if (!bdb_options_.is_fifo &&
(new_space_util > bdb_options_.blob_dir_size)) {
if (live_sst_size + total_blob_size_.load() + blob_size >
bdb_options_.max_db_size) {
return Status::NoSpace(
"Write failed, as writing it would exceed blob_dir_size limit.");
}
if (bdb_options_.is_fifo && !oldest_file_evicted_.load() &&
(new_space_util >
kEvictOldestFileAtSize * bdb_options_.blob_dir_size)) {
EvictOldestBlobFile();
"Write failed, as writing it would exceed max_db_size limit.");
}
}
return Status::OK();
}
@ -823,18 +912,15 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
const std::string& headerbuf, const Slice& key,
const Slice& value, uint64_t expiration,
std::string* index_entry) {
auto size_put = BlobLogRecord::kHeaderSize + key.size() + value.size();
Status s = CheckSize(size_put);
if (!s.ok()) {
return s;
}
Status s;
uint64_t blob_offset = 0;
uint64_t key_offset = 0;
{
WriteLock lockbfile_w(&bfile->mutex_);
std::shared_ptr<Writer> writer = CheckOrCreateWriterLocked(bfile);
if (!writer) return Status::IOError("Failed to create blob writer");
if (!writer) {
return Status::IOError("Failed to create blob writer");
}
// write the blob to the blob log.
s = writer->EmitPhysicalRecord(headerbuf, key, value, &key_offset,
@ -851,8 +937,9 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
// increment blob count
bfile->blob_count_++;
uint64_t size_put = headerbuf.size() + key.size() + value.size();
bfile->file_size_ += size_put;
total_blob_space_ += size_put;
total_blob_size_ += size_put;
if (expiration == kNoExpiration) {
BlobIndex::EncodeBlob(index_entry, bfile->BlobFileNumber(), blob_offset,
@ -1114,14 +1201,19 @@ std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
return std::make_pair(true, -1);
}
Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile) {
Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile,
bool need_lock) {
assert(bfile != nullptr);
write_mutex_.AssertHeld();
Status s;
ROCKS_LOG_INFO(db_options_.info_log,
"Closing blob file %" PRIu64 ". Path: %s",
bfile->BlobFileNumber(), bfile->PathName().c_str());
{
WriteLock wl(&mutex_);
std::unique_ptr<WriteLock> lock;
if (need_lock) {
lock.reset(new WriteLock(&mutex_));
}
if (bfile->HasTTL()) {
size_t erased __attribute__((__unused__));
@ -1134,11 +1226,16 @@ Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile) {
}
if (!bfile->closed_.load()) {
WriteLock lockbfile_w(&bfile->mutex_);
std::unique_ptr<WriteLock> file_lock;
if (need_lock) {
file_lock.reset(new WriteLock(&bfile->mutex_));
}
s = bfile->WriteFooterAndCloseLocked();
}
if (!s.ok()) {
if (s.ok()) {
total_blob_size_ += BlobLogFooter::kSize;
} else {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to close blob file %" PRIu64 "with error: %s",
bfile->BlobFileNumber(), s.ToString().c_str());
@ -1155,6 +1252,18 @@ Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile) {
return CloseBlobFile(bfile);
}
void BlobDBImpl::ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
SequenceNumber obsolete_seq,
bool update_size) {
// Should hold write lock of mutex_ or during DB open.
blob_file->MarkObsolete(obsolete_seq);
obsolete_files_.push_back(blob_file);
assert(total_blob_size_.load() >= blob_file->GetFileSize());
if (update_size) {
total_blob_size_ -= blob_file->GetFileSize();
}
}
bool BlobDBImpl::VisibleToActiveSnapshot(
const std::shared_ptr<BlobFile>& bfile) {
assert(bfile->Obsolete());
@ -1198,6 +1307,7 @@ std::pair<bool, int64_t> BlobDBImpl::CheckSeqFiles(bool aborted) {
}
}
MutexLock l(&write_mutex_);
for (auto bfile : process_files) {
CloseBlobFile(bfile);
}
@ -1515,12 +1625,9 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
}
} // end of ReadRecord loop
if (s.ok()) {
bfptr->MarkObsolete(GetLatestSequenceNumber());
{
WriteLock wl(&mutex_);
obsolete_files_.push_back(bfptr);
}
ObsoleteBlobFile(bfptr, GetLatestSequenceNumber(), true /*update_size*/);
}
ROCKS_LOG_INFO(
@ -1543,7 +1650,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
gc_stats->bytes_overwritten);
RecordTick(statistics_, BLOB_DB_GC_BYTES_EXPIRED, gc_stats->bytes_expired);
if (newfile != nullptr) {
total_blob_space_ += newfile->file_size_;
total_blob_size_ += newfile->file_size_;
ROCKS_LOG_INFO(db_options_.info_log, "New blob file %" PRIu64 ".",
newfile->BlobFileNumber());
RecordTick(statistics_, BLOB_DB_GC_NUM_NEW_FILES);
@ -1600,7 +1707,6 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
}
file_deleted = true;
total_blob_space_ -= bfile->file_size_;
ROCKS_LOG_INFO(db_options_.info_log,
"File deleted as obsolete from blob dir %s",
bfile->PathName().c_str());
@ -1611,9 +1717,6 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
// directory change. Fsync
if (file_deleted) {
dir_ent_->Fsync();
// reset oldest_file_evicted flag
oldest_file_evicted_.store(false);
}
// put files back into obsolete if for some reason, delete failed
@ -1734,15 +1837,24 @@ void BlobDBImpl::TEST_DeleteObsoleteFiles() {
}
Status BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {
MutexLock l(&write_mutex_);
return CloseBlobFile(bfile);
}
void BlobDBImpl::TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file,
SequenceNumber obsolete_seq,
bool update_size) {
return ObsoleteBlobFile(blob_file, obsolete_seq, update_size);
}
Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
GCStats* gc_stats) {
return GCFileAndUpdateLSM(bfile, gc_stats);
}
void BlobDBImpl::TEST_RunGC() { RunGC(false /*abort*/); }
uint64_t BlobDBImpl::TEST_live_sst_size() { return live_sst_size_.load(); }
#endif // !NDEBUG
} // namespace blob_db

@ -43,19 +43,9 @@ struct FlushJobInfo;
namespace blob_db {
class BlobFile;
struct BlobCompactionContext;
class BlobDBImpl;
class BlobDBFlushBeginListener : public EventListener {
public:
explicit BlobDBFlushBeginListener(BlobDBImpl* blob_db_impl)
: blob_db_impl_(blob_db_impl) {}
void OnFlushBegin(DB* db, const FlushJobInfo& info) override;
private:
BlobDBImpl* blob_db_impl_;
};
class BlobFile;
// this implements the callback from the WAL which ensures that the
// blob record is present in the blob log. If fsync/fdatasync in not
@ -154,6 +144,8 @@ class BlobDBImpl : public BlobDB {
virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
virtual Status Close() override;
virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size,
bool flush_memtable = true) override;
@ -180,6 +172,10 @@ class BlobDBImpl : public BlobDB {
Status SyncBlobFiles() override;
void UpdateLiveSSTSize();
void GetCompactionContext(BlobCompactionContext* context);
#ifndef NDEBUG
Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
PinnableSlice* value);
@ -190,12 +186,18 @@ class BlobDBImpl : public BlobDB {
Status TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile);
void TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file,
SequenceNumber obsolete_seq = 0,
bool update_size = true);
Status TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
GCStats* gc_stats);
void TEST_RunGC();
void TEST_DeleteObsoleteFiles();
uint64_t TEST_live_sst_size();
#endif // !NDEBUG
private:
@ -217,11 +219,17 @@ class BlobDBImpl : public BlobDB {
std::string* compression_output) const;
// Close a file by appending a footer, and removes file from open files list.
Status CloseBlobFile(std::shared_ptr<BlobFile> bfile);
Status CloseBlobFile(std::shared_ptr<BlobFile> bfile, bool need_lock = true);
// Close a file if its size exceeds blob_file_size
Status CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile);
// Mark file as obsolete and move the file to obsolete file list.
//
// REQUIRED: hold write lock of mutex_ or during DB open.
void ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
SequenceNumber obsolete_seq, bool update_size);
uint64_t ExtractExpiration(const Slice& key, const Slice& value,
Slice* value_slice, std::string* new_value);
@ -243,8 +251,6 @@ class BlobDBImpl : public BlobDB {
std::shared_ptr<BlobFile> FindBlobFileLocked(uint64_t expiration) const;
void Shutdown();
// periodic sanity check. Bunch of checks
std::pair<bool, int64_t> SanityCheck(bool aborted);
@ -315,11 +321,12 @@ class BlobDBImpl : public BlobDB {
uint64_t EpochNow() { return env_->NowMicros() / 1000000; }
Status CheckSize(size_t blob_size);
std::shared_ptr<BlobFile> GetOldestBlobFile();
bool EvictOldestBlobFile();
// Check if inserting a new blob will make DB grow out of space.
// If is_fifo = true, FIFO eviction will be triggered to make room for the
// new blob. If force_evict = true, FIFO eviction will evict blob files
// even eviction will not make enough room for the new blob.
Status CheckSizeAndEvictBlobFiles(uint64_t blob_size,
bool force_evict = false);
// name of the database directory
std::string dbname_;
@ -366,10 +373,10 @@ class BlobDBImpl : public BlobDB {
// all the blob files which are currently being appended to based
// on variety of incoming TTL's
std::multiset<std::shared_ptr<BlobFile>, blobf_compare_ttl> open_ttl_files_;
std::set<std::shared_ptr<BlobFile>, blobf_compare_ttl> open_ttl_files_;
// atomic bool to represent shutdown
std::atomic<bool> shutdown_;
// Flag to check whether Close() has been called on this DB
bool closed_;
// timer based queue to execute tasks
TimerQueue tqueue_;
@ -378,14 +385,25 @@ class BlobDBImpl : public BlobDB {
// counter is used to monitor and close excess RA files.
std::atomic<uint32_t> open_file_count_;
// total size of all blob files at a given time
std::atomic<uint64_t> total_blob_space_;
// Total size of all live blob files (i.e. exclude obsolete files).
std::atomic<uint64_t> total_blob_size_;
// total size of SST files.
std::atomic<uint64_t> live_sst_size_;
// Latest FIFO eviction timestamp
//
// REQUIRES: access with metex_ lock held.
uint64_t fifo_eviction_seq_;
// The expiration up to which latest FIFO eviction evicts.
//
// REQUIRES: access with metex_ lock held.
uint64_t evict_expiration_up_to_;
std::list<std::shared_ptr<BlobFile>> obsolete_files_;
bool open_p1_done_;
uint32_t debug_level_;
std::atomic<bool> oldest_file_evicted_;
};
} // namespace blob_db

@ -46,28 +46,36 @@ class BlobDBIterator : public Iterator {
StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS);
RecordTick(statistics_, BLOB_DB_NUM_SEEK);
iter_->SeekToFirst();
UpdateBlobValue();
while (UpdateBlobValue()) {
iter_->Next();
}
}
void SeekToLast() override {
StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS);
RecordTick(statistics_, BLOB_DB_NUM_SEEK);
iter_->SeekToLast();
UpdateBlobValue();
while (UpdateBlobValue()) {
iter_->Prev();
}
}
void Seek(const Slice& target) override {
StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS);
RecordTick(statistics_, BLOB_DB_NUM_SEEK);
iter_->Seek(target);
UpdateBlobValue();
while (UpdateBlobValue()) {
iter_->Next();
}
}
void SeekForPrev(const Slice& target) override {
StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS);
RecordTick(statistics_, BLOB_DB_NUM_SEEK);
iter_->SeekForPrev(target);
UpdateBlobValue();
while (UpdateBlobValue()) {
iter_->Prev();
}
}
void Next() override {
@ -75,7 +83,9 @@ class BlobDBIterator : public Iterator {
StopWatch next_sw(env_, statistics_, BLOB_DB_NEXT_MICROS);
RecordTick(statistics_, BLOB_DB_NUM_NEXT);
iter_->Next();
UpdateBlobValue();
while (UpdateBlobValue()) {
iter_->Next();
}
}
void Prev() override {
@ -83,7 +93,9 @@ class BlobDBIterator : public Iterator {
StopWatch prev_sw(env_, statistics_, BLOB_DB_PREV_MICROS);
RecordTick(statistics_, BLOB_DB_NUM_PREV);
iter_->Prev();
UpdateBlobValue();
while (UpdateBlobValue()) {
iter_->Prev();
}
}
Slice key() const override {
@ -102,12 +114,24 @@ class BlobDBIterator : public Iterator {
// Iterator::Refresh() not supported.
private:
void UpdateBlobValue() {
// Return true if caller should continue to next value.
bool UpdateBlobValue() {
TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:1");
TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:2");
value_.Reset();
if (iter_->Valid() && iter_->IsBlob()) {
status_ = blob_db_->GetBlobValue(iter_->key(), iter_->value(), &value_);
if (iter_->Valid() && iter_->status().ok() && iter_->IsBlob()) {
Status s = blob_db_->GetBlobValue(iter_->key(), iter_->value(), &value_);
if (s.IsNotFound()) {
return true;
} else {
if (!s.ok()) {
status_ = s;
}
return false;
}
return status_.IsNotFound();
} else {
return false;
}
}

@ -0,0 +1,46 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include <atomic>
#include "rocksdb/listener.h"
#include "util/mutexlock.h"
#include "utilities/blob_db/blob_db_impl.h"
namespace rocksdb {
namespace blob_db {
class BlobDBListener : public EventListener {
public:
explicit BlobDBListener(BlobDBImpl* blob_db_impl)
: blob_db_impl_(blob_db_impl) {}
void OnFlushBegin(DB* /*db*/, const FlushJobInfo& /*info*/) override {
assert(blob_db_impl_ != nullptr);
blob_db_impl_->SyncBlobFiles();
}
void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& /*info*/) override {
assert(blob_db_impl_ != nullptr);
blob_db_impl_->UpdateLiveSSTSize();
}
void OnCompactionCompleted(DB* /*db*/,
const CompactionJobInfo& /*info*/) override {
assert(blob_db_impl_ != nullptr);
blob_db_impl_->UpdateLiveSSTSize();
}
private:
BlobDBImpl* blob_db_impl_;
};
} // namespace blob_db
} // namespace rocksdb
#endif // !ROCKSDB_LITE

@ -45,7 +45,10 @@ class BlobDBTest : public testing::Test {
assert(s.ok());
}
~BlobDBTest() { Destroy(); }
~BlobDBTest() {
SyncPoint::GetInstance()->ClearAllCallBacks();
Destroy();
}
Status TryOpen(BlobDBOptions bdb_options = BlobDBOptions(),
Options options = Options()) {
@ -80,8 +83,13 @@ class BlobDBTest : public testing::Test {
return reinterpret_cast<BlobDBImpl *>(blob_db_);
}
Status Put(const Slice &key, const Slice &value) {
return blob_db_->Put(WriteOptions(), key, value);
Status Put(const Slice &key, const Slice &value,
std::map<std::string, std::string> *data = nullptr) {
Status s = blob_db_->Put(WriteOptions(), key, value);
if (data != nullptr) {
(*data)[key.ToString()] = value.ToString();
}
return s;
}
void Delete(const std::string &key,
@ -92,6 +100,15 @@ class BlobDBTest : public testing::Test {
}
}
Status PutWithTTL(const Slice &key, const Slice &value, uint64_t ttl,
std::map<std::string, std::string> *data = nullptr) {
Status s = blob_db_->PutWithTTL(WriteOptions(), key, value, ttl);
if (data != nullptr) {
(*data)[key.ToString()] = value.ToString();
}
return s;
}
Status PutUntil(const Slice &key, const Slice &value, uint64_t expiration) {
return blob_db_->PutUntil(WriteOptions(), key, value, expiration);
}
@ -747,7 +764,7 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) {
}
// This test is no longer valid since we now return an error when we go
// over the configured blob_dir_size.
// over the configured max_db_size.
// The test needs to be re-written later in such a way that writes continue
// after a GC happens.
TEST_F(BlobDBTest, DISABLED_GCOldestSimpleBlobFileWhenOutOfSpace) {
@ -755,7 +772,7 @@ TEST_F(BlobDBTest, DISABLED_GCOldestSimpleBlobFileWhenOutOfSpace) {
Options options;
options.env = mock_env_.get();
BlobDBOptions bdb_options;
bdb_options.blob_dir_size = 100;
bdb_options.max_db_size = 100;
bdb_options.blob_file_size = 100;
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
@ -1038,13 +1055,14 @@ TEST_F(BlobDBTest, MigrateFromPlainRocksDB) {
}
// Test to verify that a NoSpace IOError Status is returned on reaching
// blob_dir_size limit.
// max_db_size limit.
TEST_F(BlobDBTest, OutOfSpace) {
// Use mock env to stop wall clock.
Options options;
options.env = mock_env_.get();
BlobDBOptions bdb_options;
bdb_options.blob_dir_size = 150;
bdb_options.max_db_size = 200;
bdb_options.is_fifo = false;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
@ -1053,16 +1071,16 @@ TEST_F(BlobDBTest, OutOfSpace) {
std::string value(100, 'v');
ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 60));
// Putting another blob should fail as ading it would exceed the blob_dir_size
// Putting another blob should fail as ading it would exceed the max_db_size
// limit.
Status s = blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60);
ASSERT_TRUE(s.IsIOError());
ASSERT_TRUE(s.IsNoSpace());
}
TEST_F(BlobDBTest, EvictOldestFileWhenCloseToSpaceLimit) {
TEST_F(BlobDBTest, FIFOEviction) {
BlobDBOptions bdb_options;
bdb_options.blob_dir_size = 270;
bdb_options.max_db_size = 200;
bdb_options.blob_file_size = 100;
bdb_options.is_fifo = true;
bdb_options.disable_background_tasks = true;
@ -1078,32 +1096,36 @@ TEST_F(BlobDBTest, EvictOldestFileWhenCloseToSpaceLimit) {
// So a 100 byte blob should take up 132 bytes.
std::string value(100, 'v');
ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 10));
VerifyDB({{"key1", value}});
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
// Adding another 100 byte blob would take the total size to 264 bytes
// (2*132), which is more than 90% of blob_dir_size. So, the oldest file
// should be evicted and put in obsolete files list.
// (2*132). max_db_size will be exceeded
// than max_db_size and trigger FIFO eviction.
ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60));
ASSERT_EQ(1, evict_count);
// key1 will exist until corresponding file be deleted.
VerifyDB({{"key1", value}, {"key2", value}});
auto obsolete_files = bdb_impl->TEST_GetObsoleteFiles();
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(2, blob_files.size());
ASSERT_TRUE(blob_files[0]->Obsolete());
ASSERT_FALSE(blob_files[1]->Obsolete());
auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
ASSERT_EQ(1, obsolete_files.size());
ASSERT_TRUE(obsolete_files[0]->Immutable());
ASSERT_EQ(blob_files[0]->BlobFileNumber(),
obsolete_files[0]->BlobFileNumber());
ASSERT_EQ(blob_files[0], obsolete_files[0]);
bdb_impl->TEST_DeleteObsoleteFiles();
obsolete_files = bdb_impl->TEST_GetObsoleteFiles();
blob_db_impl()->TEST_DeleteObsoleteFiles();
obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
ASSERT_TRUE(obsolete_files.empty());
ASSERT_EQ(1, evict_count);
VerifyDB({{"key2", value}});
}
TEST_F(BlobDBTest, NoOldestFileToEvict) {
TEST_F(BlobDBTest, FIFOEviction_NoOldestFileToEvict) {
Options options;
BlobDBOptions bdb_options;
bdb_options.blob_dir_size = 1000;
bdb_options.max_db_size = 1000;
bdb_options.blob_file_size = 5000;
bdb_options.is_fifo = true;
bdb_options.disable_background_tasks = true;
@ -1116,11 +1138,97 @@ TEST_F(BlobDBTest, NoOldestFileToEvict) {
SyncPoint::GetInstance()->EnableProcessing();
std::string value(2000, 'v');
ASSERT_OK(Put("foo", std::string(2000, 'v')));
ASSERT_OK(Put("bar", std::string(2000, 'v')));
ASSERT_TRUE(Put("foo", std::string(2000, 'v')).IsNoSpace());
ASSERT_EQ(0, evict_count);
}
TEST_F(BlobDBTest, FIFOEviction_NoEnoughBlobFilesToEvict) {
BlobDBOptions bdb_options;
bdb_options.is_fifo = true;
bdb_options.min_blob_size = 100;
bdb_options.disable_background_tasks = true;
Options options;
// Use mock env to stop wall clock.
options.env = mock_env_.get();
options.disable_auto_compactions = true;
auto statistics = CreateDBStatistics();
options.statistics = statistics;
Open(bdb_options, options);
ASSERT_EQ(0, blob_db_impl()->TEST_live_sst_size());
std::string small_value(50, 'v');
std::map<std::string, std::string> data;
// Insert some data into LSM tree to make sure FIFO eviction take SST
// file size into account.
for (int i = 0; i < 1000; i++) {
ASSERT_OK(Put("key" + ToString(i), small_value, &data));
}
ASSERT_OK(blob_db_->Flush(FlushOptions()));
uint64_t live_sst_size = 0;
ASSERT_TRUE(blob_db_->GetIntProperty(DB::Properties::kTotalSstFilesSize,
&live_sst_size));
ASSERT_TRUE(live_sst_size > 0);
ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size());
bdb_options.max_db_size = live_sst_size + 2000;
Reopen(bdb_options, options);
ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size());
std::string value_1k(1000, 'v');
ASSERT_OK(PutWithTTL("large_key1", value_1k, 60, &data));
ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
VerifyDB(data);
// large_key2 evicts large_key1
ASSERT_OK(PutWithTTL("large_key2", value_1k, 60, &data));
ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
blob_db_impl()->TEST_DeleteObsoleteFiles();
data.erase("large_key1");
VerifyDB(data);
// large_key3 get no enough space even after evicting large_key2, so it
// instead return no space error.
std::string value_2k(2000, 'v');
ASSERT_TRUE(PutWithTTL("large_key3", value_2k, 60).IsNoSpace());
ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
// Verify large_key2 still exists.
VerifyDB(data);
}
// Test flush or compaction will trigger FIFO eviction since they update
// total SST file size.
TEST_F(BlobDBTest, FIFOEviction_TriggerOnSSTSizeChange) {
BlobDBOptions bdb_options;
bdb_options.max_db_size = 1000;
bdb_options.is_fifo = true;
bdb_options.min_blob_size = 100;
bdb_options.disable_background_tasks = true;
Options options;
// Use mock env to stop wall clock.
options.env = mock_env_.get();
auto statistics = CreateDBStatistics();
options.statistics = statistics;
options.compression = kNoCompression;
Open(bdb_options, options);
std::string value(800, 'v');
ASSERT_OK(PutWithTTL("large_key", value, 60));
ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
VerifyDB({{"large_key", value}});
// Insert some small keys and flush to bring DB out of space.
std::map<std::string, std::string> data;
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put("key" + ToString(i), "v", &data));
}
ASSERT_OK(blob_db_->Flush(FlushOptions()));
// Verify large_key is deleted by FIFO eviction.
blob_db_impl()->TEST_DeleteObsoleteFiles();
ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
VerifyDB(data);
}
TEST_F(BlobDBTest, InlineSmallValues) {
constexpr uint64_t kMaxExpiration = 1000;
Random rnd(301);
@ -1197,6 +1305,7 @@ TEST_F(BlobDBTest, CompactionFilterNotSupported) {
}
}
// Test comapction filter should remove any expired blob index.
TEST_F(BlobDBTest, FilterExpiredBlobIndex) {
constexpr size_t kNumKeys = 100;
constexpr size_t kNumPuts = 1000;
@ -1262,6 +1371,147 @@ TEST_F(BlobDBTest, FilterExpiredBlobIndex) {
VerifyDB(data_after_compact);
}
// Test compaction filter should remove any blob index where corresponding
// blob file has been removed (either by FIFO or garbage collection).
TEST_F(BlobDBTest, FilterFileNotAvailable) {
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
Options options;
options.disable_auto_compactions = true;
Open(bdb_options, options);
ASSERT_OK(Put("foo", "v1"));
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_EQ(1, blob_files[0]->BlobFileNumber());
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
ASSERT_OK(Put("bar", "v2"));
blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(2, blob_files.size());
ASSERT_EQ(2, blob_files[1]->BlobFileNumber());
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[1]));
DB *base_db = blob_db_->GetRootDB();
std::vector<KeyVersion> versions;
ASSERT_OK(GetAllKeyVersions(base_db, "", "", &versions));
ASSERT_EQ(2, versions.size());
ASSERT_EQ("bar", versions[0].user_key);
ASSERT_EQ("foo", versions[1].user_key);
VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
ASSERT_OK(blob_db_->Flush(FlushOptions()));
ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_OK(GetAllKeyVersions(base_db, "", "", &versions));
ASSERT_EQ(2, versions.size());
ASSERT_EQ("bar", versions[0].user_key);
ASSERT_EQ("foo", versions[1].user_key);
VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
// Remove the first blob file and compact. foo should be remove from base db.
blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[0]);
blob_db_impl()->TEST_DeleteObsoleteFiles();
ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_OK(GetAllKeyVersions(base_db, "", "", &versions));
ASSERT_EQ(1, versions.size());
ASSERT_EQ("bar", versions[0].user_key);
VerifyDB({{"bar", "v2"}});
// Remove the second blob file and compact. bar should be remove from base db.
blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[1]);
blob_db_impl()->TEST_DeleteObsoleteFiles();
ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_OK(GetAllKeyVersions(base_db, "", "", &versions));
ASSERT_EQ(0, versions.size());
VerifyDB({});
}
// Test compaction filter should filter any inlined TTL keys that would have
// been dropped by last FIFO eviction if they are store out-of-line.
TEST_F(BlobDBTest, FilterForFIFOEviction) {
Random rnd(215);
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 100;
bdb_options.ttl_range_secs = 60;
bdb_options.max_db_size = 0;
bdb_options.disable_background_tasks = true;
Options options;
// Use mock env to stop wall clock.
mock_env_->set_current_time(0);
options.env = mock_env_.get();
auto statistics = CreateDBStatistics();
options.statistics = statistics;
options.disable_auto_compactions = true;
Open(bdb_options, options);
std::map<std::string, std::string> data;
std::map<std::string, std::string> data_after_compact;
// Insert some small values that will be inlined.
for (int i = 0; i < 1000; i++) {
std::string key = "key" + ToString(i);
std::string value = test::RandomHumanReadableString(&rnd, 50);
uint64_t ttl = rnd.Next() % 120 + 1;
ASSERT_OK(PutWithTTL(key, value, ttl, &data));
if (ttl >= 60) {
data_after_compact[key] = value;
}
}
uint64_t num_keys_to_evict = data.size() - data_after_compact.size();
ASSERT_OK(blob_db_->Flush(FlushOptions()));
uint64_t live_sst_size = blob_db_impl()->TEST_live_sst_size();
ASSERT_GT(live_sst_size, 0);
VerifyDB(data);
bdb_options.max_db_size = live_sst_size + 30000;
bdb_options.is_fifo = true;
Reopen(bdb_options, options);
VerifyDB(data);
// Put two large values, each on a different blob file.
std::string large_value(10000, 'v');
ASSERT_OK(PutWithTTL("large_key1", large_value, 90));
ASSERT_OK(PutWithTTL("large_key2", large_value, 150));
ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
data["large_key1"] = large_value;
data["large_key2"] = large_value;
VerifyDB(data);
// Put a third large value which will bring the DB out of space.
// FIFO eviction will evict the file of large_key1.
ASSERT_OK(PutWithTTL("large_key3", large_value, 150));
ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
blob_db_impl()->TEST_DeleteObsoleteFiles();
ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
data.erase("large_key1");
data["large_key3"] = large_value;
VerifyDB(data);
// Putting some more small values. These values shouldn't be evicted by
// compaction filter since they are inserted after FIFO eviction.
ASSERT_OK(PutWithTTL("foo", "v", 30, &data_after_compact));
ASSERT_OK(PutWithTTL("bar", "v", 30, &data_after_compact));
// FIFO eviction doesn't trigger again since there enough room for the flush.
ASSERT_OK(blob_db_->Flush(FlushOptions()));
ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
// Manual compact and check if compaction filter evict those keys with
// expiration < 60.
ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// All keys with expiration < 60, plus large_key1 is filtered by
// compaction filter.
ASSERT_EQ(num_keys_to_evict + 1,
statistics->getTickerCount(BLOB_DB_BLOB_INDEX_EVICTED_COUNT));
ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
data_after_compact["large_key2"] = large_value;
data_after_compact["large_key3"] = large_value;
VerifyDB(data_after_compact);
}
} // namespace blob_db
} // namespace rocksdb

Loading…
Cancel
Save