BlobDB: refactor DB open logic

Summary:
Refactor BlobDB open logic. List of changes:

Major:
* On reopen, mark blob files found as immutable, do not use them for writing new keys.
* Not to scan the whole file to find file footer. Instead just seek to the end of the file and try to read footer.

Minor:
* Move most of the real logic from blob_db.cc to blob_db_impl.cc.
* Not to hold shared_ptr of event listeners in global maps in blob_db.cc
* Some changes to BlobFile interface.
* Improve logging and error handling.
Closes https://github.com/facebook/rocksdb/pull/3246

Differential Revision: D6526147

Pulled By: yiwu-arbug

fbshipit-source-id: 9dc4cdd63359a2f9b696af817086949da8d06952
main
Yi Wu 7 years ago committed by Facebook Github Bot
parent 6a183d1ae8
commit 250a51a3f9
  1. 169
      utilities/blob_db/blob_db.cc
  2. 24
      utilities/blob_db/blob_db.h
  3. 396
      utilities/blob_db/blob_db_impl.cc
  4. 79
      utilities/blob_db/blob_db_impl.h
  5. 130
      utilities/blob_db/blob_file.cc
  6. 41
      utilities/blob_db/blob_file.h
  7. 5
      utilities/blob_db/blob_log_writer.cc
  8. 2
      utilities/blob_db/blob_log_writer.h

@ -12,89 +12,10 @@
#include "utilities/blob_db/blob_db.h"
#include <inttypes.h>
#include "db/write_batch_internal.h"
#include "monitoring/instrumented_mutex.h"
#include "options/cf_options.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/convenience.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/utilities/stackable_db.h"
#include "table/block.h"
#include "table/block_based_table_builder.h"
#include "table/block_builder.h"
#include "util/file_reader_writer.h"
#include "util/filename.h"
#include "utilities/blob_db/blob_compaction_filter.h"
#include "utilities/blob_db/blob_db_impl.h"
namespace rocksdb {
namespace blob_db {
port::Mutex listener_mutex;
typedef std::shared_ptr<BlobDBFlushBeginListener> FlushBeginListener_t;
typedef std::shared_ptr<BlobReconcileWalFilter> ReconcileWalFilter_t;
typedef std::shared_ptr<EvictAllVersionsCompactionListener>
CompactionListener_t;
// to ensure the lifetime of the listeners
std::vector<std::shared_ptr<EventListener>> all_blobdb_listeners;
std::vector<ReconcileWalFilter_t> all_wal_filters;
Status BlobDB::OpenAndLoad(const Options& options,
const BlobDBOptions& bdb_options,
const std::string& dbname, BlobDB** blob_db,
Options* changed_options) {
if (options.compaction_filter != nullptr ||
options.compaction_filter_factory != nullptr) {
return Status::NotSupported("Blob DB doesn't support compaction filter.");
}
*changed_options = options;
*blob_db = nullptr;
FlushBeginListener_t fblistener =
std::make_shared<BlobDBFlushBeginListener>();
ReconcileWalFilter_t rw_filter = std::make_shared<BlobReconcileWalFilter>();
CompactionListener_t ce_listener =
std::make_shared<EvictAllVersionsCompactionListener>();
{
MutexLock l(&listener_mutex);
all_blobdb_listeners.push_back(fblistener);
if (bdb_options.enable_garbage_collection) {
all_blobdb_listeners.push_back(ce_listener);
}
all_wal_filters.push_back(rw_filter);
}
changed_options->compaction_filter_factory.reset(
new BlobIndexCompactionFilterFactory(options.env,
options.statistics.get()));
changed_options->listeners.emplace_back(fblistener);
if (bdb_options.enable_garbage_collection) {
changed_options->listeners.emplace_back(ce_listener);
}
changed_options->wal_filter = rw_filter.get();
DBOptions db_options(*changed_options);
// we need to open blob db first so that recovery can happen
BlobDBImpl* bdb = new BlobDBImpl(dbname, bdb_options, db_options);
fblistener->SetImplPtr(bdb);
if (bdb_options.enable_garbage_collection) {
ce_listener->SetImplPtr(bdb);
}
rw_filter->SetImplPtr(bdb);
Status s = bdb->OpenPhase1();
if (!s.ok()) return s;
*blob_db = bdb;
return s;
}
Status BlobDB::Open(const Options& options, const BlobDBOptions& bdb_options,
const std::string& dbname, BlobDB** blob_db) {
@ -116,96 +37,30 @@ Status BlobDB::Open(const Options& options, const BlobDBOptions& bdb_options,
return s;
}
Status BlobDB::Open(const DBOptions& db_options_input,
Status BlobDB::Open(const DBOptions& db_options,
const BlobDBOptions& bdb_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, BlobDB** blob_db,
bool no_base_db) {
std::vector<ColumnFamilyHandle*>* handles,
BlobDB** blob_db) {
if (column_families.size() != 1 ||
column_families[0].name != kDefaultColumnFamilyName) {
return Status::NotSupported(
"Blob DB doesn't support non-default column family.");
}
*blob_db = nullptr;
Status s;
DBOptions db_options(db_options_input);
if (db_options.info_log == nullptr) {
s = CreateLoggerFromOptions(dbname, db_options, &db_options.info_log);
if (!s.ok()) {
return s;
}
}
FlushBeginListener_t fblistener =
std::make_shared<BlobDBFlushBeginListener>();
CompactionListener_t ce_listener =
std::make_shared<EvictAllVersionsCompactionListener>();
ReconcileWalFilter_t rw_filter = std::make_shared<BlobReconcileWalFilter>();
db_options.listeners.emplace_back(fblistener);
if (bdb_options.enable_garbage_collection) {
db_options.listeners.emplace_back(ce_listener);
}
db_options.wal_filter = rw_filter.get();
{
MutexLock l(&listener_mutex);
all_blobdb_listeners.push_back(fblistener);
if (bdb_options.enable_garbage_collection) {
all_blobdb_listeners.push_back(ce_listener);
}
all_wal_filters.push_back(rw_filter);
}
ColumnFamilyOptions cf_options(column_families[0].options);
if (cf_options.compaction_filter != nullptr ||
cf_options.compaction_filter_factory != nullptr) {
return Status::NotSupported("Blob DB doesn't support compaction filter.");
}
cf_options.compaction_filter_factory.reset(
new BlobIndexCompactionFilterFactory(db_options.env,
db_options.statistics.get()));
ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options);
// we need to open blob db first so that recovery can happen
BlobDBImpl* bdb = new BlobDBImpl(dbname, bdb_options, db_options);
fblistener->SetImplPtr(bdb);
if (bdb_options.enable_garbage_collection) {
ce_listener->SetImplPtr(bdb);
}
rw_filter->SetImplPtr(bdb);
s = bdb->OpenPhase1();
if (!s.ok()) {
delete bdb;
return s;
}
if (no_base_db) {
*blob_db = bdb;
return s;
}
DB* db = nullptr;
s = DB::Open(db_options, dbname, {cf_descriptor}, handles, &db);
if (!s.ok()) {
delete bdb;
return s;
}
// set the implementation pointer
s = bdb->LinkToBaseDB(db);
if (!s.ok()) {
delete bdb;
bdb = nullptr;
BlobDBImpl* blob_db_impl = new BlobDBImpl(dbname, bdb_options, db_options,
column_families[0].options);
Status s = blob_db_impl->Open(handles);
if (s.ok()) {
*blob_db = static_cast<BlobDB*>(blob_db_impl);
} else {
delete blob_db_impl;
*blob_db = nullptr;
}
*blob_db = bdb;
bdb_options.Dump(db_options.info_log.get());
return s;
}
BlobDB::BlobDB(DB* db) : StackableDB(db) {}
BlobDB::BlobDB() : StackableDB(nullptr) {}
void BlobDBOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir: %s",

@ -190,23 +190,7 @@ class BlobDB : public StackableDB {
return NewIterator(options);
}
// Starting point for opening a Blob DB.
// changed_options - critical. Blob DB loads and inserts listeners
// into options which are necessary for recovery and atomicity
// Use this pattern if you need control on step 2, i.e. your
// BaseDB is not just a simple rocksdb but a stacked DB
// 1. ::OpenAndLoad
// 2. Open Base DB with the changed_options
// 3. ::LinkToBaseDB
static Status OpenAndLoad(const Options& options,
const BlobDBOptions& bdb_options,
const std::string& dbname, BlobDB** blob_db,
Options* changed_options);
// This is another way to open BLOB DB which do not have other
// Stackable DB's in play
// Steps.
// 1. ::Open
// Opening blob db.
static Status Open(const Options& options, const BlobDBOptions& bdb_options,
const std::string& dbname, BlobDB** blob_db);
@ -215,16 +199,14 @@ class BlobDB : public StackableDB {
const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles,
BlobDB** blob_db, bool no_base_db = false);
BlobDB** blob_db);
virtual BlobDBOptions GetBlobDBOptions() const = 0;
virtual ~BlobDB() {}
virtual Status LinkToBaseDB(DB* db_base) = 0;
protected:
explicit BlobDB(DB* db);
explicit BlobDB();
};
// Destroy the content of the database.

@ -8,7 +8,6 @@
#include <algorithm>
#include <cinttypes>
#include <iomanip>
#include <limits>
#include <memory>
#include "db/db_impl.h"
@ -34,6 +33,7 @@
#include "util/stop_watch.h"
#include "util/sync_point.h"
#include "util/timer_queue.h"
#include "utilities/blob_db/blob_compaction_filter.h"
#include "utilities/blob_db/blob_db_iterator.h"
#include "utilities/blob_db/blob_index.h"
@ -44,10 +44,9 @@ int kBlockBasedTableVersionFormat = 2;
namespace rocksdb {
namespace blob_db {
Random blob_rgen(static_cast<uint32_t>(time(nullptr)));
void BlobDBFlushBeginListener::OnFlushBegin(DB* db, const FlushJobInfo& info) {
if (impl_) impl_->OnFlushBeginHandler(db, info);
assert(blob_db_impl_ != nullptr);
blob_db_impl_->SyncBlobFiles();
}
WalFilter::WalProcessingOption BlobReconcileWalFilter::LogRecordFound(
@ -100,13 +99,16 @@ void EvictAllVersionsCompactionListener::InternalListener::OnCompaction(
BlobDBImpl::BlobDBImpl(const std::string& dbname,
const BlobDBOptions& blob_db_options,
const DBOptions& db_options)
: BlobDB(nullptr),
const DBOptions& db_options,
const ColumnFamilyOptions& cf_options)
: BlobDB(),
dbname_(dbname),
db_impl_(nullptr),
env_(db_options.env),
ttl_extractor_(blob_db_options.ttl_extractor.get()),
bdb_options_(blob_db_options),
db_options_(db_options),
cf_options_(cf_options),
env_options_(db_options),
statistics_(db_options_.statistics.get()),
dir_change_(false),
@ -124,86 +126,82 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
: bdb_options_.blob_dir;
}
Status BlobDBImpl::LinkToBaseDB(DB* db) {
BlobDBImpl::~BlobDBImpl() {
// CancelAllBackgroundWork(db_, true);
Shutdown();
}
BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; }
Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
assert(handles != nullptr);
assert(db_ == nullptr);
assert(open_p1_done_);
if (blob_dir_.empty()) {
return Status::NotSupported("No blob directory in options");
}
if (cf_options_.compaction_filter != nullptr ||
cf_options_.compaction_filter_factory != nullptr) {
return Status::NotSupported("Blob DB doesn't support compaction filter.");
}
db_ = db;
Status s;
// the Base DB in-itself can be a stackable DB
db_impl_ = static_cast_with_check<DBImpl, DB>(db_->GetRootDB());
// Create info log.
if (db_options_.info_log == nullptr) {
s = CreateLoggerFromOptions(dbname_, db_options_, &db_options_.info_log);
if (!s.ok()) {
return s;
}
}
env_ = db_->GetEnv();
ROCKS_LOG_INFO(db_options_.info_log, "Opening BlobDB...");
Status s = env_->CreateDirIfMissing(blob_dir_);
// Open blob directory.
s = env_->CreateDirIfMissing(blob_dir_);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Failed to create blob directory: %s status: '%s'",
blob_dir_.c_str(), s.ToString().c_str());
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to create blob_dir %s, status: %s",
blob_dir_.c_str(), s.ToString().c_str());
}
s = env_->NewDirectory(blob_dir_, &dir_ent_);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Failed to open blob directory: %s status: '%s'",
blob_dir_.c_str(), s.ToString().c_str());
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to open blob_dir %s, status: %s", blob_dir_.c_str(),
s.ToString().c_str());
return s;
}
if (!bdb_options_.disable_background_tasks) {
StartBackgroundTasks();
// Open blob files.
s = OpenAllBlobFiles();
if (!s.ok()) {
return s;
}
return s;
}
BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; }
BlobDBImpl::BlobDBImpl(DB* db, const BlobDBOptions& blob_db_options)
: BlobDB(db),
db_impl_(static_cast_with_check<DBImpl, DB>(db)),
env_(nullptr),
ttl_extractor_(nullptr),
bdb_options_(blob_db_options),
db_options_(db->GetOptions()),
env_options_(db_->GetOptions()),
statistics_(db_options_.statistics.get()),
dir_change_(false),
next_file_number_(1),
epoch_of_(0),
shutdown_(false),
current_epoch_(0),
open_file_count_(0),
total_blob_space_(0),
open_p1_done_(false),
debug_level_(0),
oldest_file_evicted_(false) {
if (!bdb_options_.blob_dir.empty())
blob_dir_ = (bdb_options_.path_relative)
? db_->GetName() + "/" + bdb_options_.blob_dir
: bdb_options_.blob_dir;
}
BlobDBImpl::~BlobDBImpl() {
// CancelAllBackgroundWork(db_, true);
Shutdown();
}
Status BlobDBImpl::OpenPhase1() {
assert(db_ == nullptr);
if (blob_dir_.empty())
return Status::NotSupported("No blob directory in options");
// Update options
db_options_.listeners.push_back(
std::shared_ptr<EventListener>(new BlobDBFlushBeginListener(this)));
if (bdb_options_.enable_garbage_collection) {
db_options_.listeners.push_back(std::shared_ptr<EventListener>(
new EvictAllVersionsCompactionListener(this)));
}
cf_options_.compaction_filter_factory.reset(
new BlobIndexCompactionFilterFactory(env_, statistics_));
std::unique_ptr<Directory> dir_ent;
Status s = env_->NewDirectory(blob_dir_, &dir_ent);
// Open base db.
ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_);
s = DB::Open(db_options_, dbname_, {cf_descriptor}, handles, &db_);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Failed to open blob directory: %s status: '%s'",
blob_dir_.c_str(), s.ToString().c_str());
open_p1_done_ = true;
return Status::OK();
return s;
}
db_impl_ = static_cast_with_check<DBImpl, DB>(db_->GetRootDB());
// Start background jobs.
if (!bdb_options_.disable_background_tasks) {
StartBackgroundTasks();
}
s = OpenAllFiles();
open_p1_done_ = true;
ROCKS_LOG_INFO(db_options_.info_log, "BlobDB pointer %p", this);
return s;
}
@ -236,196 +234,91 @@ void BlobDBImpl::StartBackgroundTasks() {
void BlobDBImpl::Shutdown() { shutdown_.store(true); }
void BlobDBImpl::OnFlushBeginHandler(DB* db, const FlushJobInfo& info) {
if (shutdown_.load()) return;
// a callback that happens too soon needs to be ignored
if (!db_) return;
FsyncFiles(false);
}
Status BlobDBImpl::GetAllLogFiles(
std::set<std::pair<uint64_t, std::string>>* file_nums) {
Status BlobDBImpl::GetAllBlobFiles(std::set<uint64_t>* file_numbers) {
assert(file_numbers != nullptr);
std::vector<std::string> all_files;
Status status = env_->GetChildren(blob_dir_, &all_files);
if (!status.ok()) {
return status;
Status s = env_->GetChildren(blob_dir_, &all_files);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to get list of blob files, status: %s",
s.ToString().c_str());
return s;
}
for (const auto& f : all_files) {
uint64_t number;
for (const auto& file_name : all_files) {
uint64_t file_number;
FileType type;
bool psucc = ParseFileName(f, &number, &type);
if (psucc && type == kBlobFile) {
file_nums->insert(std::make_pair(number, f));
bool success = ParseFileName(file_name, &file_number, &type);
if (success && type == kBlobFile) {
file_numbers->insert(file_number);
} else {
ROCKS_LOG_WARN(db_options_.info_log,
"Skipping file in blob directory %s parse: %d type: %d",
f.c_str(), psucc, ((psucc) ? type : -1));
"Skipping file in blob directory: %s", file_name.c_str());
}
}
return status;
return s;
}
Status BlobDBImpl::OpenAllFiles() {
WriteLock wl(&mutex_);
std::set<std::pair<uint64_t, std::string>> file_nums;
Status status = GetAllLogFiles(&file_nums);
Status BlobDBImpl::OpenAllBlobFiles() {
std::set<uint64_t> file_numbers;
Status s = GetAllBlobFiles(&file_numbers);
if (!s.ok()) {
return s;
}
if (!status.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to collect files from blob dir: %s status: '%s'",
blob_dir_.c_str(), status.ToString().c_str());
return status;
if (!file_numbers.empty()) {
next_file_number_.store(*file_numbers.rbegin() + 1);
}
ROCKS_LOG_INFO(db_options_.info_log,
"BlobDir files path: %s count: %d min: %" PRIu64
" max: %" PRIu64,
blob_dir_.c_str(), static_cast<int>(file_nums.size()),
(file_nums.empty()) ? -1 : file_nums.cbegin()->first,
(file_nums.empty()) ? -1 : file_nums.crbegin()->first);
if (!file_nums.empty())
next_file_number_.store((file_nums.rbegin())->first + 1);
for (auto& f_iter : file_nums) {
std::string bfpath = BlobFileName(blob_dir_, f_iter.first);
uint64_t size_bytes;
Status s1 = env_->GetFileSize(bfpath, &size_bytes);
if (!s1.ok()) {
ROCKS_LOG_WARN(
db_options_.info_log,
"Unable to get size of %s. File skipped from open status: '%s'",
bfpath.c_str(), s1.ToString().c_str());
continue;
}
std::string blob_file_list;
std::string obsolete_file_list;
if (debug_level_ >= 1)
ROCKS_LOG_INFO(db_options_.info_log, "Blob File open: %s size: %" PRIu64,
bfpath.c_str(), size_bytes);
for (auto& file_number : file_numbers) {
std::shared_ptr<BlobFile> blob_file = std::make_shared<BlobFile>(
this, blob_dir_, file_number, db_options_.info_log.get());
blob_file->MarkImmutable();
std::shared_ptr<BlobFile> bfptr =
std::make_shared<BlobFile>(this, blob_dir_, f_iter.first);
bfptr->SetFileSize(size_bytes);
// Read file header and footer
Status read_metadata_status = blob_file->ReadMetadata(env_, env_options_);
if (read_metadata_status.IsCorruption()) {
// Remove incomplete file.
blob_file->MarkObsolete(0 /*sequence number*/);
obsolete_files_.push_back(blob_file);
if (!obsolete_file_list.empty()) {
obsolete_file_list.append(", ");
}
obsolete_file_list.append(ToString(file_number));
continue;
} else if (!read_metadata_status.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Unable to read metadata of blob file % " PRIu64
", status: '%s'",
file_number, read_metadata_status.ToString().c_str());
return read_metadata_status;
}
// since this file already existed, we will try to reconcile
// deleted count with LSM
bfptr->gc_once_after_open_ = true;
// read header
std::shared_ptr<Reader> reader;
reader = bfptr->OpenSequentialReader(env_, db_options_, env_options_);
s1 = reader->ReadHeader(&bfptr->header_);
if (!s1.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failure to read header for blob-file %s "
"status: '%s' size: %" PRIu64,
bfpath.c_str(), s1.ToString().c_str(), size_bytes);
continue;
if (bdb_options_.enable_garbage_collection) {
blob_file->gc_once_after_open_ = true;
}
bfptr->SetHasTTL(bfptr->header_.has_ttl);
bfptr->SetCompression(bfptr->header_.compression);
bfptr->header_valid_ = true;
std::shared_ptr<RandomAccessFileReader> ra_reader =
GetOrOpenRandomAccessReader(bfptr, env_, env_options_);
BlobLogFooter bf;
s1 = bfptr->ReadFooter(&bf);
bfptr->CloseRandomAccessLocked();
if (s1.ok()) {
s1 = bfptr->SetFromFooterLocked(bf);
if (!s1.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Header Footer mismatch for blob-file %s "
"status: '%s' size: %" PRIu64,
bfpath.c_str(), s1.ToString().c_str(), size_bytes);
continue;
}
} else {
ROCKS_LOG_INFO(db_options_.info_log,
"File found incomplete (w/o footer) %s", bfpath.c_str());
// sequentially iterate over the file and read all the records
ExpirationRange expiration_range(std::numeric_limits<uint32_t>::max(),
std::numeric_limits<uint32_t>::min());
uint64_t blob_count = 0;
BlobLogRecord record;
Reader::ReadLevel shallow = Reader::kReadHeaderKey;
uint64_t record_start = reader->GetNextByte();
// TODO(arahut) - when we detect corruption, we should truncate
while (reader->ReadRecord(&record, shallow).ok()) {
++blob_count;
if (bfptr->HasTTL()) {
expiration_range.first =
std::min(expiration_range.first, record.expiration);
expiration_range.second =
std::max(expiration_range.second, record.expiration);
}
record_start = reader->GetNextByte();
}
if (record_start != bfptr->GetFileSize()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Blob file is corrupted or crashed during write %s"
" good_size: %" PRIu64 " file_size: %" PRIu64,
bfpath.c_str(), record_start, bfptr->GetFileSize());
}
if (!blob_count) {
ROCKS_LOG_INFO(db_options_.info_log, "BlobCount = 0 in file %s",
bfpath.c_str());
continue;
}
bfptr->SetBlobCount(blob_count);
bfptr->SetSequenceRange({0, 0});
ROCKS_LOG_INFO(db_options_.info_log,
"Blob File: %s blob_count: %" PRIu64
" size_bytes: %" PRIu64 " has_ttl: %d",
bfpath.c_str(), blob_count, size_bytes, bfptr->HasTTL());
if (bfptr->HasTTL()) {
expiration_range.second = std::max(
expiration_range.second,
expiration_range.first + (uint32_t)bdb_options_.ttl_range_secs);
bfptr->set_expiration_range(expiration_range);
uint64_t now = EpochNow();
if (expiration_range.second < now) {
Status fstatus = CreateWriterLocked(bfptr);
if (fstatus.ok()) fstatus = bfptr->WriteFooterAndCloseLocked();
if (!fstatus.ok()) {
ROCKS_LOG_ERROR(
db_options_.info_log,
"Failed to close Blob File: %s status: '%s'. Skipped",
bfpath.c_str(), fstatus.ToString().c_str());
continue;
} else {
ROCKS_LOG_ERROR(
db_options_.info_log,
"Blob File Closed: %s now: %d expiration_range: (%d, %d)",
bfpath.c_str(), now, expiration_range.first,
expiration_range.second);
}
} else {
open_ttl_files_.insert(bfptr);
}
}
blob_files_[file_number] = blob_file;
if (!blob_file_list.empty()) {
blob_file_list.append(", ");
}
blob_files_.insert(std::make_pair(f_iter.first, bfptr));
blob_file_list.append(ToString(file_number));
}
return status;
ROCKS_LOG_INFO(db_options_.info_log,
"Found %" ROCKSDB_PRIszt " blob files: %s", blob_files_.size(),
blob_file_list.c_str());
ROCKS_LOG_INFO(db_options_.info_log,
"Found %" ROCKSDB_PRIszt
" incomplete or corrupted blob files: %s",
obsolete_files_.size(), obsolete_file_list.c_str());
return s;
}
void BlobDBImpl::CloseRandomAccessLocked(
@ -445,7 +338,8 @@ std::shared_ptr<RandomAccessFileReader> BlobDBImpl::GetOrOpenRandomAccessReader(
std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(const std::string& reason) {
uint64_t file_num = next_file_number_++;
auto bfile = std::make_shared<BlobFile>(this, blob_dir_, file_num);
auto bfile = std::make_shared<BlobFile>(this, blob_dir_, file_num,
db_options_.info_log.get());
ROCKS_LOG_DEBUG(db_options_.info_log, "New blob file created: %s reason='%s'",
bfile->PathName().c_str(), reason.c_str());
LogFlush(db_options_.info_log);
@ -565,6 +459,7 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFile() {
bfile->header_.column_family_id =
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
bfile->header_valid_ = true;
bfile->SetColumnFamilyId(bfile->header_.column_family_id);
bfile->SetHasTTL(false);
bfile->SetCompression(bdb_options_.compression);
@ -626,6 +521,7 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) {
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
;
bfile->header_valid_ = true;
bfile->SetColumnFamilyId(bfile->header_.column_family_id);
bfile->SetHasTTL(true);
bfile->SetCompression(bdb_options_.compression);
bfile->file_size_ = BlobLogHeader::kSize;
@ -1536,8 +1432,14 @@ std::pair<bool, int64_t> BlobDBImpl::CheckSeqFiles(bool aborted) {
}
std::pair<bool, int64_t> BlobDBImpl::FsyncFiles(bool aborted) {
if (aborted) return std::make_pair(false, -1);
if (aborted || shutdown_) {
return std::make_pair(false, -1);
}
SyncBlobFiles();
return std::make_pair(true, -1);
}
Status BlobDBImpl::SyncBlobFiles() {
MutexLock l(&write_mutex_);
std::vector<std::shared_ptr<BlobFile>> process_files;
@ -1554,14 +1456,26 @@ std::pair<bool, int64_t> BlobDBImpl::FsyncFiles(bool aborted) {
}
}
for (auto fitr : process_files) {
if (fitr->NeedsFsync(true, bdb_options_.bytes_per_sync)) fitr->Fsync();
Status s;
for (auto& blob_file : process_files) {
if (blob_file->NeedsFsync(true, bdb_options_.bytes_per_sync)) {
s = blob_file->Fsync();
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to sync blob file %" PRIu64 ", status: %s",
blob_file->BlobFileNumber(), s.ToString().c_str());
return s;
}
}
}
bool expected = true;
if (dir_change_.compare_exchange_weak(expected, false)) dir_ent_->Fsync();
if (dir_change_.compare_exchange_weak(expected, false)) {
s = dir_ent_->Fsync();
}
return std::make_pair(true, -1);
return s;
}
std::pair<bool, int64_t> BlobDBImpl::ReclaimOpenFiles(bool aborted) {

@ -49,20 +49,20 @@ class BlobDBImpl;
class BlobDBFlushBeginListener : public EventListener {
public:
explicit BlobDBFlushBeginListener() : impl_(nullptr) {}
explicit BlobDBFlushBeginListener(BlobDBImpl* blob_db_impl)
: blob_db_impl_(blob_db_impl) {}
void OnFlushBegin(DB* db, const FlushJobInfo& info) override;
void SetImplPtr(BlobDBImpl* p) { impl_ = p; }
protected:
BlobDBImpl* impl_;
private:
BlobDBImpl* blob_db_impl_;
};
// this implements the callback from the WAL which ensures that the
// blob record is present in the blob log. If fsync/fdatasync in not
// happening on every write, there is the probability that keys in the
// blob log can lag the keys in blobs
// TODO(yiwu): implement the WAL filter.
class BlobReconcileWalFilter : public WalFilter {
public:
virtual WalFilter::WalProcessingOption LogRecordFound(
@ -71,11 +71,6 @@ class BlobReconcileWalFilter : public WalFilter {
bool* batch_changed) override;
virtual const char* Name() const override { return "BlobDBWalReconciler"; }
void SetImplPtr(BlobDBImpl* p) { impl_ = p; }
protected:
BlobDBImpl* impl_;
};
class EvictAllVersionsCompactionListener : public EventListener {
@ -84,49 +79,28 @@ class EvictAllVersionsCompactionListener : public EventListener {
friend class BlobDBImpl;
public:
explicit InternalListener(BlobDBImpl* blob_db_impl) : impl_(blob_db_impl) {}
virtual void OnCompaction(int level, const Slice& key,
CompactionListenerValueType value_type,
const Slice& existing_value,
const SequenceNumber& sn, bool is_new) override;
void SetImplPtr(BlobDBImpl* p) { impl_ = p; }
private:
BlobDBImpl* impl_;
};
explicit EvictAllVersionsCompactionListener()
: internal_listener_(new InternalListener()) {}
explicit EvictAllVersionsCompactionListener(BlobDBImpl* blob_db_impl)
: internal_listener_(new InternalListener(blob_db_impl)) {}
virtual CompactionEventListener* GetCompactionEventListener() override {
return internal_listener_.get();
}
void SetImplPtr(BlobDBImpl* p) { internal_listener_->SetImplPtr(p); }
private:
std::unique_ptr<InternalListener> internal_listener_;
};
#if 0
class EvictAllVersionsFilterFactory : public CompactionFilterFactory {
private:
BlobDBImpl* impl_;
public:
EvictAllVersionsFilterFactory() : impl_(nullptr) {}
void SetImplPtr(BlobDBImpl* p) { impl_ = p; }
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override;
virtual const char* Name() const override {
return "EvictAllVersionsFilterFactory";
}
};
#endif
// Comparator to sort "TTL" aware Blob files based on the lower value of
// TTL range.
struct blobf_compare_ttl {
@ -150,9 +124,7 @@ struct GCStats {
* Garbage Collected.
*/
class BlobDBImpl : public BlobDB {
friend class BlobDBFlushBeginListener;
friend class EvictAllVersionsCompactionListener;
friend class BlobDB;
friend class BlobFile;
friend class BlobDBIterator;
@ -246,17 +218,18 @@ class BlobDBImpl : public BlobDB {
Status PutUntil(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t expiration) override;
Status LinkToBaseDB(DB* db) override;
BlobDBOptions GetBlobDBOptions() const override;
BlobDBImpl(DB* db, const BlobDBOptions& bdb_options);
BlobDBImpl(const std::string& dbname, const BlobDBOptions& bdb_options,
const DBOptions& db_options);
const DBOptions& db_options,
const ColumnFamilyOptions& cf_options);
~BlobDBImpl();
Status Open(std::vector<ColumnFamilyHandle*>* handles);
Status SyncBlobFiles();
#ifndef NDEBUG
Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
PinnableSlice* value);
@ -279,8 +252,6 @@ class BlobDBImpl : public BlobDB {
class GarbageCollectionWriteCallback;
class BlobInserter;
Status OpenPhase1();
// Create a snapshot if there isn't one in read options.
// Return true if a snapshot is created.
bool SetSnapshotIfNeeded(ReadOptions* read_options);
@ -295,10 +266,6 @@ class BlobDBImpl : public BlobDB {
Slice GetCompressedSlice(const Slice& raw,
std::string* compression_output) const;
// Just before flush starts acting on memtable files,
// this handler is called.
void OnFlushBeginHandler(DB* db, const FlushJobInfo& info);
// is this file ready for Garbage collection. if the TTL of the file
// has expired or if threshold of the file has been evicted
// tt - current time
@ -306,9 +273,6 @@ class BlobDBImpl : public BlobDB {
bool ShouldGCFile(std::shared_ptr<BlobFile> bfile, uint64_t now,
bool is_oldest_non_ttl_file, std::string* reason);
// collect all the blob log files from the blob directory
Status GetAllLogFiles(std::set<std::pair<uint64_t, std::string>>* file_nums);
// Close a file by appending a footer, and removes file from open files list.
Status CloseBlobFile(std::shared_ptr<BlobFile> bfile);
@ -374,7 +338,11 @@ class BlobDBImpl : public BlobDB {
// add a new Blob File
std::shared_ptr<BlobFile> NewBlobFile(const std::string& reason);
Status OpenAllFiles();
// collect all the blob log files from the blob directory
Status GetAllBlobFiles(std::set<uint64_t>* file_numbers);
// Open all blob files found in blob_dir.
Status OpenAllBlobFiles();
// hold write mutex on file and call
// creates a Random Access reader for GET call
@ -428,6 +396,9 @@ class BlobDBImpl : public BlobDB {
bool EvictOldestBlobFile();
// name of the database directory
std::string dbname_;
// the base DB
DBImpl* db_impl_;
Env* env_;
@ -436,14 +407,12 @@ class BlobDBImpl : public BlobDB {
// the options that govern the behavior of Blob Storage
BlobDBOptions bdb_options_;
DBOptions db_options_;
ColumnFamilyOptions cf_options_;
EnvOptions env_options_;
// Raw pointer of statistic. db_options_ has a shared_ptr to hold ownership.
Statistics* statistics_;
// name of the database directory
std::string dbname_;
// by default this is "blob_dir" under dbname_
// but can be configured
std::string blob_dir_;

@ -13,6 +13,7 @@
#include <stdio.h>
#include <algorithm>
#include <limits>
#include <memory>
#include "db/column_family.h"
@ -29,8 +30,10 @@ namespace blob_db {
BlobFile::BlobFile()
: parent_(nullptr),
file_number_(0),
has_ttl_(false),
info_log_(nullptr),
column_family_id_(std::numeric_limits<uint32_t>::max()),
compression_(kNoCompression),
has_ttl_(false),
blob_count_(0),
gc_epoch_(-1),
file_size_(0),
@ -43,14 +46,18 @@ BlobFile::BlobFile()
sequence_range_({kMaxSequenceNumber, 0}),
last_access_(-1),
last_fsync_(0),
header_valid_(false) {}
header_valid_(false),
footer_valid_(false) {}
BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn)
BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn,
Logger* info_log)
: parent_(p),
path_to_dir_(bdir),
file_number_(fn),
has_ttl_(false),
info_log_(info_log),
column_family_id_(std::numeric_limits<uint32_t>::max()),
compression_(kNoCompression),
has_ttl_(false),
blob_count_(0),
gc_epoch_(-1),
file_size_(0),
@ -63,7 +70,8 @@ BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn)
sequence_range_({kMaxSequenceNumber, 0}),
last_access_(-1),
last_fsync_(0),
header_valid_(false) {}
header_valid_(false),
footer_valid_(false) {}
BlobFile::~BlobFile() {
if (obsolete_) {
@ -76,12 +84,7 @@ BlobFile::~BlobFile() {
}
}
uint32_t BlobFile::column_family_id() const {
// TODO(yiwu): Should return column family id encoded in blob file after
// we add blob db column family support.
return reinterpret_cast<ColumnFamilyHandle*>(parent_->DefaultColumnFamily())
->GetID();
}
uint32_t BlobFile::column_family_id() const { return column_family_id_; }
std::string BlobFile::PathName() const {
return BlobFileName(path_to_dir_, file_number_);
@ -123,6 +126,7 @@ std::string BlobFile::DumpState() const {
}
void BlobFile::MarkObsolete(SequenceNumber sequence) {
assert(Immutable());
obsolete_sequence_ = sequence;
obsolete_.store(true);
}
@ -186,11 +190,13 @@ Status BlobFile::SetFromFooterLocked(const BlobLogFooter& footer) {
return Status::OK();
}
void BlobFile::Fsync() {
Status BlobFile::Fsync() {
Status s;
if (log_writer_.get()) {
log_writer_->Sync();
s = log_writer_->Sync();
last_fsync_.store(file_size_.load());
}
return s;
}
void BlobFile::CloseRandomAccessLocked() {
@ -216,7 +222,7 @@ std::shared_ptr<RandomAccessFileReader> BlobFile::GetOrOpenRandomAccessReader(
std::unique_ptr<RandomAccessFile> rfile;
Status s = env->NewRandomAccessFile(PathName(), &rfile, env_options);
if (!s.ok()) {
ROCKS_LOG_ERROR(parent_->db_options_.info_log,
ROCKS_LOG_ERROR(info_log_,
"Failed to open blob file for random-read: %s status: '%s'"
" exists: '%s'",
PathName().c_str(), s.ToString().c_str(),
@ -230,6 +236,102 @@ std::shared_ptr<RandomAccessFileReader> BlobFile::GetOrOpenRandomAccessReader(
return ra_file_reader_;
}
Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) {
assert(Immutable());
// Get file size.
uint64_t file_size = 0;
Status s = env->GetFileSize(PathName(), &file_size);
if (s.ok()) {
file_size_ = file_size;
} else {
ROCKS_LOG_ERROR(info_log_,
"Failed to get size of blob file %" ROCKSDB_PRIszt
", status: %s",
file_number_, s.ToString().c_str());
return s;
}
if (file_size < BlobLogHeader::kSize) {
ROCKS_LOG_ERROR(info_log_,
"Incomplete blob file blob file %" ROCKSDB_PRIszt
", size: %" PRIu64,
file_number_, file_size);
return Status::Corruption("Incomplete blob file header.");
}
// Create file reader.
std::unique_ptr<RandomAccessFile> file;
s = env->NewRandomAccessFile(PathName(), &file, env_options);
if (!s.ok()) {
ROCKS_LOG_ERROR(info_log_,
"Failed to open blob file %" ROCKSDB_PRIszt ", status: %s",
file_number_, s.ToString().c_str());
return s;
}
std::unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(std::move(file), PathName()));
// Read file header.
char header_buf[BlobLogHeader::kSize];
Slice header_slice;
s = file_reader->Read(0, BlobLogHeader::kSize, &header_slice, header_buf);
if (!s.ok()) {
ROCKS_LOG_ERROR(info_log_,
"Failed to read header of blob file %" ROCKSDB_PRIszt
", status: %s",
file_number_, s.ToString().c_str());
return s;
}
BlobLogHeader header;
s = header.DecodeFrom(header_slice);
if (!s.ok()) {
ROCKS_LOG_ERROR(info_log_,
"Failed to decode header of blob file %" ROCKSDB_PRIszt
", status: %s",
file_number_, s.ToString().c_str());
return s;
}
column_family_id_ = header.column_family_id;
compression_ = header.compression;
has_ttl_ = header.has_ttl;
if (has_ttl_) {
expiration_range_ = header.expiration_range;
}
header_valid_ = true;
// Read file footer.
if (file_size_ < BlobLogHeader::kSize + BlobLogFooter::kSize) {
// OK not to have footer.
assert(!footer_valid_);
return Status::OK();
}
char footer_buf[BlobLogFooter::kSize];
Slice footer_slice;
s = file_reader->Read(file_size - BlobLogFooter::kSize, BlobLogFooter::kSize,
&footer_slice, footer_buf);
if (!s.ok()) {
ROCKS_LOG_ERROR(info_log_,
"Failed to read footer of blob file %" ROCKSDB_PRIszt
", status: %s",
file_number_, s.ToString().c_str());
return s;
}
BlobLogFooter footer;
s = footer.DecodeFrom(footer_slice);
if (!s.ok()) {
// OK not to have footer.
assert(!footer_valid_);
return Status::OK();
}
blob_count_ = footer.blob_count;
if (has_ttl_) {
assert(header.expiration_range.first <= footer.expiration_range.first);
assert(header.expiration_range.second >= footer.expiration_range.second);
expiration_range_ = footer.expiration_range;
}
footer_valid_ = true;
return Status::OK();
}
} // namespace blob_db
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -37,13 +37,19 @@ class BlobFile {
// after that
uint64_t file_number_;
// If true, the keys in this file all has TTL. Otherwise all keys don't
// have TTL.
bool has_ttl_;
// Info log.
Logger* info_log_;
// Column family id.
uint32_t column_family_id_;
// Compression type of blobs in the file
CompressionType compression_;
// If true, the keys in this file all has TTL. Otherwise all keys don't
// have TTL.
bool has_ttl_;
// number of blobs in the file
std::atomic<uint64_t> blob_count_;
@ -98,19 +104,25 @@ class BlobFile {
bool header_valid_;
bool footer_valid_;
SequenceNumber garbage_collection_finish_sequence_;
public:
BlobFile();
BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum);
BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum,
Logger* info_log);
~BlobFile();
uint32_t column_family_id() const;
// Returns log file's pathname relative to the main db dir
// Eg. For a live-log-file = blob_dir/000003.blob
void SetColumnFamilyId(uint32_t cf_id) {
column_family_id_ = cf_id;
}
// Returns log file's absolute pathname.
std::string PathName() const;
// Primary identifier for blob file.
@ -125,6 +137,13 @@ class BlobFile {
std::string DumpState() const;
// if the file is not taking any more appends.
bool Immutable() const { return closed_.load(); }
// Mark the file as immutable.
// REQUIRES: write lock held, or access from single thread (on DB open).
void MarkImmutable() { closed_ = true; }
// if the file has gone through GC and blobs have been relocated
bool Obsolete() const {
assert(Immutable() || !obsolete_.load());
@ -140,13 +159,10 @@ class BlobFile {
return obsolete_sequence_;
}
// if the file is not taking any more appends.
bool Immutable() const { return closed_.load(); }
// we will assume this is atomic
bool NeedsFsync(bool hard, uint64_t bytes_per_sync) const;
void Fsync();
Status Fsync();
uint64_t GetFileSize() const {
return file_size_.load(std::memory_order_acquire);
@ -184,6 +200,11 @@ class BlobFile {
std::shared_ptr<Writer> GetWriter() const { return log_writer_; }
// Read blob file header and footer. Return corruption if file header is
// malform or incomplete. If footer is malform or incomplete, set
// footer_valid_ to false and return Status::OK.
Status ReadMetadata(Env* env, const EnvOptions& env_options);
private:
std::shared_ptr<Reader> OpenSequentialReader(
Env* env, const DBOptions& db_options,

@ -32,10 +32,11 @@ Writer::Writer(unique_ptr<WritableFileWriter>&& dest, Env* env,
use_fsync_(use_fs),
last_elem_type_(kEtNone) {}
void Writer::Sync() {
Status Writer::Sync() {
StopWatch sync_sw(env_, statistics_, BLOB_DB_BLOB_FILE_SYNC_MICROS);
dest_->Sync(use_fsync_);
Status s = dest_->Sync(use_fsync_);
RecordTick(statistics_, BLOB_DB_BLOB_FILE_SYNCED);
return s;
}
Status Writer::WriteHeader(BlobLogHeader& header) {

@ -71,7 +71,7 @@ class Writer {
bool ShouldSync() const { return block_offset_ > next_sync_offset_; }
void Sync();
Status Sync();
void ResetSyncPointer() { next_sync_offset_ += bytes_per_sync_; }

Loading…
Cancel
Save