diff --git a/utilities/blob_db/blob_db.cc b/utilities/blob_db/blob_db.cc index 2ba4f23cd..e686e1a4c 100644 --- a/utilities/blob_db/blob_db.cc +++ b/utilities/blob_db/blob_db.cc @@ -12,89 +12,10 @@ #include "utilities/blob_db/blob_db.h" #include - -#include "db/write_batch_internal.h" -#include "monitoring/instrumented_mutex.h" -#include "options/cf_options.h" -#include "rocksdb/compaction_filter.h" -#include "rocksdb/convenience.h" -#include "rocksdb/env.h" -#include "rocksdb/iterator.h" -#include "rocksdb/utilities/stackable_db.h" -#include "table/block.h" -#include "table/block_based_table_builder.h" -#include "table/block_builder.h" -#include "util/file_reader_writer.h" -#include "util/filename.h" -#include "utilities/blob_db/blob_compaction_filter.h" #include "utilities/blob_db/blob_db_impl.h" namespace rocksdb { - namespace blob_db { -port::Mutex listener_mutex; -typedef std::shared_ptr FlushBeginListener_t; -typedef std::shared_ptr ReconcileWalFilter_t; -typedef std::shared_ptr - CompactionListener_t; - -// to ensure the lifetime of the listeners -std::vector> all_blobdb_listeners; -std::vector all_wal_filters; - -Status BlobDB::OpenAndLoad(const Options& options, - const BlobDBOptions& bdb_options, - const std::string& dbname, BlobDB** blob_db, - Options* changed_options) { - if (options.compaction_filter != nullptr || - options.compaction_filter_factory != nullptr) { - return Status::NotSupported("Blob DB doesn't support compaction filter."); - } - - *changed_options = options; - *blob_db = nullptr; - - FlushBeginListener_t fblistener = - std::make_shared(); - ReconcileWalFilter_t rw_filter = std::make_shared(); - CompactionListener_t ce_listener = - std::make_shared(); - - { - MutexLock l(&listener_mutex); - all_blobdb_listeners.push_back(fblistener); - if (bdb_options.enable_garbage_collection) { - all_blobdb_listeners.push_back(ce_listener); - } - all_wal_filters.push_back(rw_filter); - } - - changed_options->compaction_filter_factory.reset( - new BlobIndexCompactionFilterFactory(options.env, - options.statistics.get())); - changed_options->listeners.emplace_back(fblistener); - if (bdb_options.enable_garbage_collection) { - changed_options->listeners.emplace_back(ce_listener); - } - changed_options->wal_filter = rw_filter.get(); - - DBOptions db_options(*changed_options); - - // we need to open blob db first so that recovery can happen - BlobDBImpl* bdb = new BlobDBImpl(dbname, bdb_options, db_options); - - fblistener->SetImplPtr(bdb); - if (bdb_options.enable_garbage_collection) { - ce_listener->SetImplPtr(bdb); - } - rw_filter->SetImplPtr(bdb); - - Status s = bdb->OpenPhase1(); - if (!s.ok()) return s; - - *blob_db = bdb; - return s; -} Status BlobDB::Open(const Options& options, const BlobDBOptions& bdb_options, const std::string& dbname, BlobDB** blob_db) { @@ -116,96 +37,30 @@ Status BlobDB::Open(const Options& options, const BlobDBOptions& bdb_options, return s; } -Status BlobDB::Open(const DBOptions& db_options_input, +Status BlobDB::Open(const DBOptions& db_options, const BlobDBOptions& bdb_options, const std::string& dbname, const std::vector& column_families, - std::vector* handles, BlobDB** blob_db, - bool no_base_db) { + std::vector* handles, + BlobDB** blob_db) { if (column_families.size() != 1 || column_families[0].name != kDefaultColumnFamilyName) { return Status::NotSupported( "Blob DB doesn't support non-default column family."); } - *blob_db = nullptr; - Status s; - - DBOptions db_options(db_options_input); - if (db_options.info_log == nullptr) { - s = CreateLoggerFromOptions(dbname, db_options, &db_options.info_log); - if (!s.ok()) { - return s; - } - } - - FlushBeginListener_t fblistener = - std::make_shared(); - CompactionListener_t ce_listener = - std::make_shared(); - ReconcileWalFilter_t rw_filter = std::make_shared(); - - db_options.listeners.emplace_back(fblistener); - if (bdb_options.enable_garbage_collection) { - db_options.listeners.emplace_back(ce_listener); - } - db_options.wal_filter = rw_filter.get(); - - { - MutexLock l(&listener_mutex); - all_blobdb_listeners.push_back(fblistener); - if (bdb_options.enable_garbage_collection) { - all_blobdb_listeners.push_back(ce_listener); - } - all_wal_filters.push_back(rw_filter); - } - - ColumnFamilyOptions cf_options(column_families[0].options); - if (cf_options.compaction_filter != nullptr || - cf_options.compaction_filter_factory != nullptr) { - return Status::NotSupported("Blob DB doesn't support compaction filter."); - } - cf_options.compaction_filter_factory.reset( - new BlobIndexCompactionFilterFactory(db_options.env, - db_options.statistics.get())); - ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options); - // we need to open blob db first so that recovery can happen - BlobDBImpl* bdb = new BlobDBImpl(dbname, bdb_options, db_options); - fblistener->SetImplPtr(bdb); - if (bdb_options.enable_garbage_collection) { - ce_listener->SetImplPtr(bdb); - } - rw_filter->SetImplPtr(bdb); - - s = bdb->OpenPhase1(); - if (!s.ok()) { - delete bdb; - return s; - } - - if (no_base_db) { - *blob_db = bdb; - return s; - } - - DB* db = nullptr; - s = DB::Open(db_options, dbname, {cf_descriptor}, handles, &db); - if (!s.ok()) { - delete bdb; - return s; - } - - // set the implementation pointer - s = bdb->LinkToBaseDB(db); - if (!s.ok()) { - delete bdb; - bdb = nullptr; + BlobDBImpl* blob_db_impl = new BlobDBImpl(dbname, bdb_options, db_options, + column_families[0].options); + Status s = blob_db_impl->Open(handles); + if (s.ok()) { + *blob_db = static_cast(blob_db_impl); + } else { + delete blob_db_impl; + *blob_db = nullptr; } - *blob_db = bdb; - bdb_options.Dump(db_options.info_log.get()); return s; } -BlobDB::BlobDB(DB* db) : StackableDB(db) {} +BlobDB::BlobDB() : StackableDB(nullptr) {} void BlobDBOptions::Dump(Logger* log) const { ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir: %s", diff --git a/utilities/blob_db/blob_db.h b/utilities/blob_db/blob_db.h index 3ade460eb..ecf5fcf7e 100644 --- a/utilities/blob_db/blob_db.h +++ b/utilities/blob_db/blob_db.h @@ -190,23 +190,7 @@ class BlobDB : public StackableDB { return NewIterator(options); } - // Starting point for opening a Blob DB. - // changed_options - critical. Blob DB loads and inserts listeners - // into options which are necessary for recovery and atomicity - // Use this pattern if you need control on step 2, i.e. your - // BaseDB is not just a simple rocksdb but a stacked DB - // 1. ::OpenAndLoad - // 2. Open Base DB with the changed_options - // 3. ::LinkToBaseDB - static Status OpenAndLoad(const Options& options, - const BlobDBOptions& bdb_options, - const std::string& dbname, BlobDB** blob_db, - Options* changed_options); - - // This is another way to open BLOB DB which do not have other - // Stackable DB's in play - // Steps. - // 1. ::Open + // Opening blob db. static Status Open(const Options& options, const BlobDBOptions& bdb_options, const std::string& dbname, BlobDB** blob_db); @@ -215,16 +199,14 @@ class BlobDB : public StackableDB { const std::string& dbname, const std::vector& column_families, std::vector* handles, - BlobDB** blob_db, bool no_base_db = false); + BlobDB** blob_db); virtual BlobDBOptions GetBlobDBOptions() const = 0; virtual ~BlobDB() {} - virtual Status LinkToBaseDB(DB* db_base) = 0; - protected: - explicit BlobDB(DB* db); + explicit BlobDB(); }; // Destroy the content of the database. diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 0a51acdc7..f78210bb9 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -8,7 +8,6 @@ #include #include #include -#include #include #include "db/db_impl.h" @@ -34,6 +33,7 @@ #include "util/stop_watch.h" #include "util/sync_point.h" #include "util/timer_queue.h" +#include "utilities/blob_db/blob_compaction_filter.h" #include "utilities/blob_db/blob_db_iterator.h" #include "utilities/blob_db/blob_index.h" @@ -44,10 +44,9 @@ int kBlockBasedTableVersionFormat = 2; namespace rocksdb { namespace blob_db { -Random blob_rgen(static_cast(time(nullptr))); - void BlobDBFlushBeginListener::OnFlushBegin(DB* db, const FlushJobInfo& info) { - if (impl_) impl_->OnFlushBeginHandler(db, info); + assert(blob_db_impl_ != nullptr); + blob_db_impl_->SyncBlobFiles(); } WalFilter::WalProcessingOption BlobReconcileWalFilter::LogRecordFound( @@ -100,13 +99,16 @@ void EvictAllVersionsCompactionListener::InternalListener::OnCompaction( BlobDBImpl::BlobDBImpl(const std::string& dbname, const BlobDBOptions& blob_db_options, - const DBOptions& db_options) - : BlobDB(nullptr), + const DBOptions& db_options, + const ColumnFamilyOptions& cf_options) + : BlobDB(), + dbname_(dbname), db_impl_(nullptr), env_(db_options.env), ttl_extractor_(blob_db_options.ttl_extractor.get()), bdb_options_(blob_db_options), db_options_(db_options), + cf_options_(cf_options), env_options_(db_options), statistics_(db_options_.statistics.get()), dir_change_(false), @@ -124,86 +126,82 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname, : bdb_options_.blob_dir; } -Status BlobDBImpl::LinkToBaseDB(DB* db) { +BlobDBImpl::~BlobDBImpl() { + // CancelAllBackgroundWork(db_, true); + + Shutdown(); +} + +BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; } + +Status BlobDBImpl::Open(std::vector* handles) { + assert(handles != nullptr); assert(db_ == nullptr); - assert(open_p1_done_); + if (blob_dir_.empty()) { + return Status::NotSupported("No blob directory in options"); + } + if (cf_options_.compaction_filter != nullptr || + cf_options_.compaction_filter_factory != nullptr) { + return Status::NotSupported("Blob DB doesn't support compaction filter."); + } - db_ = db; + Status s; - // the Base DB in-itself can be a stackable DB - db_impl_ = static_cast_with_check(db_->GetRootDB()); + // Create info log. + if (db_options_.info_log == nullptr) { + s = CreateLoggerFromOptions(dbname_, db_options_, &db_options_.info_log); + if (!s.ok()) { + return s; + } + } - env_ = db_->GetEnv(); + ROCKS_LOG_INFO(db_options_.info_log, "Opening BlobDB..."); - Status s = env_->CreateDirIfMissing(blob_dir_); + // Open blob directory. + s = env_->CreateDirIfMissing(blob_dir_); if (!s.ok()) { - ROCKS_LOG_WARN(db_options_.info_log, - "Failed to create blob directory: %s status: '%s'", - blob_dir_.c_str(), s.ToString().c_str()); + ROCKS_LOG_ERROR(db_options_.info_log, + "Failed to create blob_dir %s, status: %s", + blob_dir_.c_str(), s.ToString().c_str()); } s = env_->NewDirectory(blob_dir_, &dir_ent_); if (!s.ok()) { - ROCKS_LOG_WARN(db_options_.info_log, - "Failed to open blob directory: %s status: '%s'", - blob_dir_.c_str(), s.ToString().c_str()); + ROCKS_LOG_ERROR(db_options_.info_log, + "Failed to open blob_dir %s, status: %s", blob_dir_.c_str(), + s.ToString().c_str()); + return s; } - if (!bdb_options_.disable_background_tasks) { - StartBackgroundTasks(); + // Open blob files. + s = OpenAllBlobFiles(); + if (!s.ok()) { + return s; } - return s; -} - -BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; } -BlobDBImpl::BlobDBImpl(DB* db, const BlobDBOptions& blob_db_options) - : BlobDB(db), - db_impl_(static_cast_with_check(db)), - env_(nullptr), - ttl_extractor_(nullptr), - bdb_options_(blob_db_options), - db_options_(db->GetOptions()), - env_options_(db_->GetOptions()), - statistics_(db_options_.statistics.get()), - dir_change_(false), - next_file_number_(1), - epoch_of_(0), - shutdown_(false), - current_epoch_(0), - open_file_count_(0), - total_blob_space_(0), - open_p1_done_(false), - debug_level_(0), - oldest_file_evicted_(false) { - if (!bdb_options_.blob_dir.empty()) - blob_dir_ = (bdb_options_.path_relative) - ? db_->GetName() + "/" + bdb_options_.blob_dir - : bdb_options_.blob_dir; -} - -BlobDBImpl::~BlobDBImpl() { - // CancelAllBackgroundWork(db_, true); - - Shutdown(); -} - -Status BlobDBImpl::OpenPhase1() { - assert(db_ == nullptr); - if (blob_dir_.empty()) - return Status::NotSupported("No blob directory in options"); + // Update options + db_options_.listeners.push_back( + std::shared_ptr(new BlobDBFlushBeginListener(this))); + if (bdb_options_.enable_garbage_collection) { + db_options_.listeners.push_back(std::shared_ptr( + new EvictAllVersionsCompactionListener(this))); + } + cf_options_.compaction_filter_factory.reset( + new BlobIndexCompactionFilterFactory(env_, statistics_)); - std::unique_ptr dir_ent; - Status s = env_->NewDirectory(blob_dir_, &dir_ent); + // Open base db. + ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_); + s = DB::Open(db_options_, dbname_, {cf_descriptor}, handles, &db_); if (!s.ok()) { - ROCKS_LOG_WARN(db_options_.info_log, - "Failed to open blob directory: %s status: '%s'", - blob_dir_.c_str(), s.ToString().c_str()); - open_p1_done_ = true; - return Status::OK(); + return s; + } + db_impl_ = static_cast_with_check(db_->GetRootDB()); + + // Start background jobs. + if (!bdb_options_.disable_background_tasks) { + StartBackgroundTasks(); } - s = OpenAllFiles(); - open_p1_done_ = true; + ROCKS_LOG_INFO(db_options_.info_log, "BlobDB pointer %p", this); return s; } @@ -236,196 +234,91 @@ void BlobDBImpl::StartBackgroundTasks() { void BlobDBImpl::Shutdown() { shutdown_.store(true); } -void BlobDBImpl::OnFlushBeginHandler(DB* db, const FlushJobInfo& info) { - if (shutdown_.load()) return; - - // a callback that happens too soon needs to be ignored - if (!db_) return; - - FsyncFiles(false); -} - -Status BlobDBImpl::GetAllLogFiles( - std::set>* file_nums) { +Status BlobDBImpl::GetAllBlobFiles(std::set* file_numbers) { + assert(file_numbers != nullptr); std::vector all_files; - Status status = env_->GetChildren(blob_dir_, &all_files); - if (!status.ok()) { - return status; + Status s = env_->GetChildren(blob_dir_, &all_files); + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "Failed to get list of blob files, status: %s", + s.ToString().c_str()); + return s; } - for (const auto& f : all_files) { - uint64_t number; + for (const auto& file_name : all_files) { + uint64_t file_number; FileType type; - bool psucc = ParseFileName(f, &number, &type); - if (psucc && type == kBlobFile) { - file_nums->insert(std::make_pair(number, f)); + bool success = ParseFileName(file_name, &file_number, &type); + if (success && type == kBlobFile) { + file_numbers->insert(file_number); } else { ROCKS_LOG_WARN(db_options_.info_log, - "Skipping file in blob directory %s parse: %d type: %d", - f.c_str(), psucc, ((psucc) ? type : -1)); + "Skipping file in blob directory: %s", file_name.c_str()); } } - return status; + return s; } -Status BlobDBImpl::OpenAllFiles() { - WriteLock wl(&mutex_); - - std::set> file_nums; - Status status = GetAllLogFiles(&file_nums); +Status BlobDBImpl::OpenAllBlobFiles() { + std::set file_numbers; + Status s = GetAllBlobFiles(&file_numbers); + if (!s.ok()) { + return s; + } - if (!status.ok()) { - ROCKS_LOG_ERROR(db_options_.info_log, - "Failed to collect files from blob dir: %s status: '%s'", - blob_dir_.c_str(), status.ToString().c_str()); - return status; + if (!file_numbers.empty()) { + next_file_number_.store(*file_numbers.rbegin() + 1); } - ROCKS_LOG_INFO(db_options_.info_log, - "BlobDir files path: %s count: %d min: %" PRIu64 - " max: %" PRIu64, - blob_dir_.c_str(), static_cast(file_nums.size()), - (file_nums.empty()) ? -1 : file_nums.cbegin()->first, - (file_nums.empty()) ? -1 : file_nums.crbegin()->first); - - if (!file_nums.empty()) - next_file_number_.store((file_nums.rbegin())->first + 1); - - for (auto& f_iter : file_nums) { - std::string bfpath = BlobFileName(blob_dir_, f_iter.first); - uint64_t size_bytes; - Status s1 = env_->GetFileSize(bfpath, &size_bytes); - if (!s1.ok()) { - ROCKS_LOG_WARN( - db_options_.info_log, - "Unable to get size of %s. File skipped from open status: '%s'", - bfpath.c_str(), s1.ToString().c_str()); - continue; - } + std::string blob_file_list; + std::string obsolete_file_list; - if (debug_level_ >= 1) - ROCKS_LOG_INFO(db_options_.info_log, "Blob File open: %s size: %" PRIu64, - bfpath.c_str(), size_bytes); + for (auto& file_number : file_numbers) { + std::shared_ptr blob_file = std::make_shared( + this, blob_dir_, file_number, db_options_.info_log.get()); + blob_file->MarkImmutable(); - std::shared_ptr bfptr = - std::make_shared(this, blob_dir_, f_iter.first); - bfptr->SetFileSize(size_bytes); + // Read file header and footer + Status read_metadata_status = blob_file->ReadMetadata(env_, env_options_); + if (read_metadata_status.IsCorruption()) { + // Remove incomplete file. + blob_file->MarkObsolete(0 /*sequence number*/); + obsolete_files_.push_back(blob_file); + if (!obsolete_file_list.empty()) { + obsolete_file_list.append(", "); + } + obsolete_file_list.append(ToString(file_number)); + continue; + } else if (!read_metadata_status.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "Unable to read metadata of blob file % " PRIu64 + ", status: '%s'", + file_number, read_metadata_status.ToString().c_str()); + return read_metadata_status; + } // since this file already existed, we will try to reconcile // deleted count with LSM - bfptr->gc_once_after_open_ = true; - - // read header - std::shared_ptr reader; - reader = bfptr->OpenSequentialReader(env_, db_options_, env_options_); - s1 = reader->ReadHeader(&bfptr->header_); - if (!s1.ok()) { - ROCKS_LOG_ERROR(db_options_.info_log, - "Failure to read header for blob-file %s " - "status: '%s' size: %" PRIu64, - bfpath.c_str(), s1.ToString().c_str(), size_bytes); - continue; + if (bdb_options_.enable_garbage_collection) { + blob_file->gc_once_after_open_ = true; } - bfptr->SetHasTTL(bfptr->header_.has_ttl); - bfptr->SetCompression(bfptr->header_.compression); - bfptr->header_valid_ = true; - std::shared_ptr ra_reader = - GetOrOpenRandomAccessReader(bfptr, env_, env_options_); - - BlobLogFooter bf; - s1 = bfptr->ReadFooter(&bf); - - bfptr->CloseRandomAccessLocked(); - if (s1.ok()) { - s1 = bfptr->SetFromFooterLocked(bf); - if (!s1.ok()) { - ROCKS_LOG_ERROR(db_options_.info_log, - "Header Footer mismatch for blob-file %s " - "status: '%s' size: %" PRIu64, - bfpath.c_str(), s1.ToString().c_str(), size_bytes); - continue; - } - } else { - ROCKS_LOG_INFO(db_options_.info_log, - "File found incomplete (w/o footer) %s", bfpath.c_str()); - - // sequentially iterate over the file and read all the records - ExpirationRange expiration_range(std::numeric_limits::max(), - std::numeric_limits::min()); - - uint64_t blob_count = 0; - BlobLogRecord record; - Reader::ReadLevel shallow = Reader::kReadHeaderKey; - - uint64_t record_start = reader->GetNextByte(); - // TODO(arahut) - when we detect corruption, we should truncate - while (reader->ReadRecord(&record, shallow).ok()) { - ++blob_count; - if (bfptr->HasTTL()) { - expiration_range.first = - std::min(expiration_range.first, record.expiration); - expiration_range.second = - std::max(expiration_range.second, record.expiration); - } - record_start = reader->GetNextByte(); - } - - if (record_start != bfptr->GetFileSize()) { - ROCKS_LOG_ERROR(db_options_.info_log, - "Blob file is corrupted or crashed during write %s" - " good_size: %" PRIu64 " file_size: %" PRIu64, - bfpath.c_str(), record_start, bfptr->GetFileSize()); - } - - if (!blob_count) { - ROCKS_LOG_INFO(db_options_.info_log, "BlobCount = 0 in file %s", - bfpath.c_str()); - continue; - } - - bfptr->SetBlobCount(blob_count); - bfptr->SetSequenceRange({0, 0}); - - ROCKS_LOG_INFO(db_options_.info_log, - "Blob File: %s blob_count: %" PRIu64 - " size_bytes: %" PRIu64 " has_ttl: %d", - bfpath.c_str(), blob_count, size_bytes, bfptr->HasTTL()); - - if (bfptr->HasTTL()) { - expiration_range.second = std::max( - expiration_range.second, - expiration_range.first + (uint32_t)bdb_options_.ttl_range_secs); - bfptr->set_expiration_range(expiration_range); - - uint64_t now = EpochNow(); - if (expiration_range.second < now) { - Status fstatus = CreateWriterLocked(bfptr); - if (fstatus.ok()) fstatus = bfptr->WriteFooterAndCloseLocked(); - if (!fstatus.ok()) { - ROCKS_LOG_ERROR( - db_options_.info_log, - "Failed to close Blob File: %s status: '%s'. Skipped", - bfpath.c_str(), fstatus.ToString().c_str()); - continue; - } else { - ROCKS_LOG_ERROR( - db_options_.info_log, - "Blob File Closed: %s now: %d expiration_range: (%d, %d)", - bfpath.c_str(), now, expiration_range.first, - expiration_range.second); - } - } else { - open_ttl_files_.insert(bfptr); - } - } + blob_files_[file_number] = blob_file; + if (!blob_file_list.empty()) { + blob_file_list.append(", "); } - - blob_files_.insert(std::make_pair(f_iter.first, bfptr)); + blob_file_list.append(ToString(file_number)); } - return status; + ROCKS_LOG_INFO(db_options_.info_log, + "Found %" ROCKSDB_PRIszt " blob files: %s", blob_files_.size(), + blob_file_list.c_str()); + ROCKS_LOG_INFO(db_options_.info_log, + "Found %" ROCKSDB_PRIszt + " incomplete or corrupted blob files: %s", + obsolete_files_.size(), obsolete_file_list.c_str()); + return s; } void BlobDBImpl::CloseRandomAccessLocked( @@ -445,7 +338,8 @@ std::shared_ptr BlobDBImpl::GetOrOpenRandomAccessReader( std::shared_ptr BlobDBImpl::NewBlobFile(const std::string& reason) { uint64_t file_num = next_file_number_++; - auto bfile = std::make_shared(this, blob_dir_, file_num); + auto bfile = std::make_shared(this, blob_dir_, file_num, + db_options_.info_log.get()); ROCKS_LOG_DEBUG(db_options_.info_log, "New blob file created: %s reason='%s'", bfile->PathName().c_str(), reason.c_str()); LogFlush(db_options_.info_log); @@ -565,6 +459,7 @@ std::shared_ptr BlobDBImpl::SelectBlobFile() { bfile->header_.column_family_id = reinterpret_cast(DefaultColumnFamily())->GetID(); bfile->header_valid_ = true; + bfile->SetColumnFamilyId(bfile->header_.column_family_id); bfile->SetHasTTL(false); bfile->SetCompression(bdb_options_.compression); @@ -626,6 +521,7 @@ std::shared_ptr BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) { reinterpret_cast(DefaultColumnFamily())->GetID(); ; bfile->header_valid_ = true; + bfile->SetColumnFamilyId(bfile->header_.column_family_id); bfile->SetHasTTL(true); bfile->SetCompression(bdb_options_.compression); bfile->file_size_ = BlobLogHeader::kSize; @@ -1536,8 +1432,14 @@ std::pair BlobDBImpl::CheckSeqFiles(bool aborted) { } std::pair BlobDBImpl::FsyncFiles(bool aborted) { - if (aborted) return std::make_pair(false, -1); + if (aborted || shutdown_) { + return std::make_pair(false, -1); + } + SyncBlobFiles(); + return std::make_pair(true, -1); +} +Status BlobDBImpl::SyncBlobFiles() { MutexLock l(&write_mutex_); std::vector> process_files; @@ -1554,14 +1456,26 @@ std::pair BlobDBImpl::FsyncFiles(bool aborted) { } } - for (auto fitr : process_files) { - if (fitr->NeedsFsync(true, bdb_options_.bytes_per_sync)) fitr->Fsync(); + Status s; + + for (auto& blob_file : process_files) { + if (blob_file->NeedsFsync(true, bdb_options_.bytes_per_sync)) { + s = blob_file->Fsync(); + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "Failed to sync blob file %" PRIu64 ", status: %s", + blob_file->BlobFileNumber(), s.ToString().c_str()); + return s; + } + } } bool expected = true; - if (dir_change_.compare_exchange_weak(expected, false)) dir_ent_->Fsync(); + if (dir_change_.compare_exchange_weak(expected, false)) { + s = dir_ent_->Fsync(); + } - return std::make_pair(true, -1); + return s; } std::pair BlobDBImpl::ReclaimOpenFiles(bool aborted) { diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index 2a2eb83a2..3e361cd18 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -49,20 +49,20 @@ class BlobDBImpl; class BlobDBFlushBeginListener : public EventListener { public: - explicit BlobDBFlushBeginListener() : impl_(nullptr) {} + explicit BlobDBFlushBeginListener(BlobDBImpl* blob_db_impl) + : blob_db_impl_(blob_db_impl) {} void OnFlushBegin(DB* db, const FlushJobInfo& info) override; - void SetImplPtr(BlobDBImpl* p) { impl_ = p; } - - protected: - BlobDBImpl* impl_; + private: + BlobDBImpl* blob_db_impl_; }; // this implements the callback from the WAL which ensures that the // blob record is present in the blob log. If fsync/fdatasync in not // happening on every write, there is the probability that keys in the // blob log can lag the keys in blobs +// TODO(yiwu): implement the WAL filter. class BlobReconcileWalFilter : public WalFilter { public: virtual WalFilter::WalProcessingOption LogRecordFound( @@ -71,11 +71,6 @@ class BlobReconcileWalFilter : public WalFilter { bool* batch_changed) override; virtual const char* Name() const override { return "BlobDBWalReconciler"; } - - void SetImplPtr(BlobDBImpl* p) { impl_ = p; } - - protected: - BlobDBImpl* impl_; }; class EvictAllVersionsCompactionListener : public EventListener { @@ -84,49 +79,28 @@ class EvictAllVersionsCompactionListener : public EventListener { friend class BlobDBImpl; public: + explicit InternalListener(BlobDBImpl* blob_db_impl) : impl_(blob_db_impl) {} + virtual void OnCompaction(int level, const Slice& key, CompactionListenerValueType value_type, const Slice& existing_value, const SequenceNumber& sn, bool is_new) override; - void SetImplPtr(BlobDBImpl* p) { impl_ = p; } - private: BlobDBImpl* impl_; }; - explicit EvictAllVersionsCompactionListener() - : internal_listener_(new InternalListener()) {} + explicit EvictAllVersionsCompactionListener(BlobDBImpl* blob_db_impl) + : internal_listener_(new InternalListener(blob_db_impl)) {} virtual CompactionEventListener* GetCompactionEventListener() override { return internal_listener_.get(); } - void SetImplPtr(BlobDBImpl* p) { internal_listener_->SetImplPtr(p); } - private: std::unique_ptr internal_listener_; }; -#if 0 -class EvictAllVersionsFilterFactory : public CompactionFilterFactory { - private: - BlobDBImpl* impl_; - - public: - EvictAllVersionsFilterFactory() : impl_(nullptr) {} - - void SetImplPtr(BlobDBImpl* p) { impl_ = p; } - - virtual std::unique_ptr CreateCompactionFilter( - const CompactionFilter::Context& context) override; - - virtual const char* Name() const override { - return "EvictAllVersionsFilterFactory"; - } -}; -#endif - // Comparator to sort "TTL" aware Blob files based on the lower value of // TTL range. struct blobf_compare_ttl { @@ -150,9 +124,7 @@ struct GCStats { * Garbage Collected. */ class BlobDBImpl : public BlobDB { - friend class BlobDBFlushBeginListener; friend class EvictAllVersionsCompactionListener; - friend class BlobDB; friend class BlobFile; friend class BlobDBIterator; @@ -246,17 +218,18 @@ class BlobDBImpl : public BlobDB { Status PutUntil(const WriteOptions& options, const Slice& key, const Slice& value, uint64_t expiration) override; - Status LinkToBaseDB(DB* db) override; - BlobDBOptions GetBlobDBOptions() const override; - BlobDBImpl(DB* db, const BlobDBOptions& bdb_options); - BlobDBImpl(const std::string& dbname, const BlobDBOptions& bdb_options, - const DBOptions& db_options); + const DBOptions& db_options, + const ColumnFamilyOptions& cf_options); ~BlobDBImpl(); + Status Open(std::vector* handles); + + Status SyncBlobFiles(); + #ifndef NDEBUG Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry, PinnableSlice* value); @@ -279,8 +252,6 @@ class BlobDBImpl : public BlobDB { class GarbageCollectionWriteCallback; class BlobInserter; - Status OpenPhase1(); - // Create a snapshot if there isn't one in read options. // Return true if a snapshot is created. bool SetSnapshotIfNeeded(ReadOptions* read_options); @@ -295,10 +266,6 @@ class BlobDBImpl : public BlobDB { Slice GetCompressedSlice(const Slice& raw, std::string* compression_output) const; - // Just before flush starts acting on memtable files, - // this handler is called. - void OnFlushBeginHandler(DB* db, const FlushJobInfo& info); - // is this file ready for Garbage collection. if the TTL of the file // has expired or if threshold of the file has been evicted // tt - current time @@ -306,9 +273,6 @@ class BlobDBImpl : public BlobDB { bool ShouldGCFile(std::shared_ptr bfile, uint64_t now, bool is_oldest_non_ttl_file, std::string* reason); - // collect all the blob log files from the blob directory - Status GetAllLogFiles(std::set>* file_nums); - // Close a file by appending a footer, and removes file from open files list. Status CloseBlobFile(std::shared_ptr bfile); @@ -374,7 +338,11 @@ class BlobDBImpl : public BlobDB { // add a new Blob File std::shared_ptr NewBlobFile(const std::string& reason); - Status OpenAllFiles(); + // collect all the blob log files from the blob directory + Status GetAllBlobFiles(std::set* file_numbers); + + // Open all blob files found in blob_dir. + Status OpenAllBlobFiles(); // hold write mutex on file and call // creates a Random Access reader for GET call @@ -428,6 +396,9 @@ class BlobDBImpl : public BlobDB { bool EvictOldestBlobFile(); + // name of the database directory + std::string dbname_; + // the base DB DBImpl* db_impl_; Env* env_; @@ -436,14 +407,12 @@ class BlobDBImpl : public BlobDB { // the options that govern the behavior of Blob Storage BlobDBOptions bdb_options_; DBOptions db_options_; + ColumnFamilyOptions cf_options_; EnvOptions env_options_; // Raw pointer of statistic. db_options_ has a shared_ptr to hold ownership. Statistics* statistics_; - // name of the database directory - std::string dbname_; - // by default this is "blob_dir" under dbname_ // but can be configured std::string blob_dir_; diff --git a/utilities/blob_db/blob_file.cc b/utilities/blob_db/blob_file.cc index 3a7812132..18c17516c 100644 --- a/utilities/blob_db/blob_file.cc +++ b/utilities/blob_db/blob_file.cc @@ -13,6 +13,7 @@ #include #include +#include #include #include "db/column_family.h" @@ -29,8 +30,10 @@ namespace blob_db { BlobFile::BlobFile() : parent_(nullptr), file_number_(0), - has_ttl_(false), + info_log_(nullptr), + column_family_id_(std::numeric_limits::max()), compression_(kNoCompression), + has_ttl_(false), blob_count_(0), gc_epoch_(-1), file_size_(0), @@ -43,14 +46,18 @@ BlobFile::BlobFile() sequence_range_({kMaxSequenceNumber, 0}), last_access_(-1), last_fsync_(0), - header_valid_(false) {} + header_valid_(false), + footer_valid_(false) {} -BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn) +BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn, + Logger* info_log) : parent_(p), path_to_dir_(bdir), file_number_(fn), - has_ttl_(false), + info_log_(info_log), + column_family_id_(std::numeric_limits::max()), compression_(kNoCompression), + has_ttl_(false), blob_count_(0), gc_epoch_(-1), file_size_(0), @@ -63,7 +70,8 @@ BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn) sequence_range_({kMaxSequenceNumber, 0}), last_access_(-1), last_fsync_(0), - header_valid_(false) {} + header_valid_(false), + footer_valid_(false) {} BlobFile::~BlobFile() { if (obsolete_) { @@ -76,12 +84,7 @@ BlobFile::~BlobFile() { } } -uint32_t BlobFile::column_family_id() const { - // TODO(yiwu): Should return column family id encoded in blob file after - // we add blob db column family support. - return reinterpret_cast(parent_->DefaultColumnFamily()) - ->GetID(); -} +uint32_t BlobFile::column_family_id() const { return column_family_id_; } std::string BlobFile::PathName() const { return BlobFileName(path_to_dir_, file_number_); @@ -123,6 +126,7 @@ std::string BlobFile::DumpState() const { } void BlobFile::MarkObsolete(SequenceNumber sequence) { + assert(Immutable()); obsolete_sequence_ = sequence; obsolete_.store(true); } @@ -186,11 +190,13 @@ Status BlobFile::SetFromFooterLocked(const BlobLogFooter& footer) { return Status::OK(); } -void BlobFile::Fsync() { +Status BlobFile::Fsync() { + Status s; if (log_writer_.get()) { - log_writer_->Sync(); + s = log_writer_->Sync(); last_fsync_.store(file_size_.load()); } + return s; } void BlobFile::CloseRandomAccessLocked() { @@ -216,7 +222,7 @@ std::shared_ptr BlobFile::GetOrOpenRandomAccessReader( std::unique_ptr rfile; Status s = env->NewRandomAccessFile(PathName(), &rfile, env_options); if (!s.ok()) { - ROCKS_LOG_ERROR(parent_->db_options_.info_log, + ROCKS_LOG_ERROR(info_log_, "Failed to open blob file for random-read: %s status: '%s'" " exists: '%s'", PathName().c_str(), s.ToString().c_str(), @@ -230,6 +236,102 @@ std::shared_ptr BlobFile::GetOrOpenRandomAccessReader( return ra_file_reader_; } +Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) { + assert(Immutable()); + // Get file size. + uint64_t file_size = 0; + Status s = env->GetFileSize(PathName(), &file_size); + if (s.ok()) { + file_size_ = file_size; + } else { + ROCKS_LOG_ERROR(info_log_, + "Failed to get size of blob file %" ROCKSDB_PRIszt + ", status: %s", + file_number_, s.ToString().c_str()); + return s; + } + if (file_size < BlobLogHeader::kSize) { + ROCKS_LOG_ERROR(info_log_, + "Incomplete blob file blob file %" ROCKSDB_PRIszt + ", size: %" PRIu64, + file_number_, file_size); + return Status::Corruption("Incomplete blob file header."); + } + + // Create file reader. + std::unique_ptr file; + s = env->NewRandomAccessFile(PathName(), &file, env_options); + if (!s.ok()) { + ROCKS_LOG_ERROR(info_log_, + "Failed to open blob file %" ROCKSDB_PRIszt ", status: %s", + file_number_, s.ToString().c_str()); + return s; + } + std::unique_ptr file_reader( + new RandomAccessFileReader(std::move(file), PathName())); + + // Read file header. + char header_buf[BlobLogHeader::kSize]; + Slice header_slice; + s = file_reader->Read(0, BlobLogHeader::kSize, &header_slice, header_buf); + if (!s.ok()) { + ROCKS_LOG_ERROR(info_log_, + "Failed to read header of blob file %" ROCKSDB_PRIszt + ", status: %s", + file_number_, s.ToString().c_str()); + return s; + } + BlobLogHeader header; + s = header.DecodeFrom(header_slice); + if (!s.ok()) { + ROCKS_LOG_ERROR(info_log_, + "Failed to decode header of blob file %" ROCKSDB_PRIszt + ", status: %s", + file_number_, s.ToString().c_str()); + return s; + } + column_family_id_ = header.column_family_id; + compression_ = header.compression; + has_ttl_ = header.has_ttl; + if (has_ttl_) { + expiration_range_ = header.expiration_range; + } + header_valid_ = true; + + // Read file footer. + if (file_size_ < BlobLogHeader::kSize + BlobLogFooter::kSize) { + // OK not to have footer. + assert(!footer_valid_); + return Status::OK(); + } + char footer_buf[BlobLogFooter::kSize]; + Slice footer_slice; + s = file_reader->Read(file_size - BlobLogFooter::kSize, BlobLogFooter::kSize, + &footer_slice, footer_buf); + if (!s.ok()) { + ROCKS_LOG_ERROR(info_log_, + "Failed to read footer of blob file %" ROCKSDB_PRIszt + ", status: %s", + file_number_, s.ToString().c_str()); + return s; + } + BlobLogFooter footer; + s = footer.DecodeFrom(footer_slice); + if (!s.ok()) { + // OK not to have footer. + assert(!footer_valid_); + return Status::OK(); + } + blob_count_ = footer.blob_count; + if (has_ttl_) { + assert(header.expiration_range.first <= footer.expiration_range.first); + assert(header.expiration_range.second >= footer.expiration_range.second); + expiration_range_ = footer.expiration_range; + } + footer_valid_ = true; + return Status::OK(); +} + } // namespace blob_db } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/blob_db/blob_file.h b/utilities/blob_db/blob_file.h index 4085cfef0..cc5a8c3e3 100644 --- a/utilities/blob_db/blob_file.h +++ b/utilities/blob_db/blob_file.h @@ -37,13 +37,19 @@ class BlobFile { // after that uint64_t file_number_; - // If true, the keys in this file all has TTL. Otherwise all keys don't - // have TTL. - bool has_ttl_; + // Info log. + Logger* info_log_; + + // Column family id. + uint32_t column_family_id_; // Compression type of blobs in the file CompressionType compression_; + // If true, the keys in this file all has TTL. Otherwise all keys don't + // have TTL. + bool has_ttl_; + // number of blobs in the file std::atomic blob_count_; @@ -98,19 +104,25 @@ class BlobFile { bool header_valid_; + bool footer_valid_; + SequenceNumber garbage_collection_finish_sequence_; public: BlobFile(); - BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum); + BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum, + Logger* info_log); ~BlobFile(); uint32_t column_family_id() const; - // Returns log file's pathname relative to the main db dir - // Eg. For a live-log-file = blob_dir/000003.blob + void SetColumnFamilyId(uint32_t cf_id) { + column_family_id_ = cf_id; + } + + // Returns log file's absolute pathname. std::string PathName() const; // Primary identifier for blob file. @@ -125,6 +137,13 @@ class BlobFile { std::string DumpState() const; + // if the file is not taking any more appends. + bool Immutable() const { return closed_.load(); } + + // Mark the file as immutable. + // REQUIRES: write lock held, or access from single thread (on DB open). + void MarkImmutable() { closed_ = true; } + // if the file has gone through GC and blobs have been relocated bool Obsolete() const { assert(Immutable() || !obsolete_.load()); @@ -140,13 +159,10 @@ class BlobFile { return obsolete_sequence_; } - // if the file is not taking any more appends. - bool Immutable() const { return closed_.load(); } - // we will assume this is atomic bool NeedsFsync(bool hard, uint64_t bytes_per_sync) const; - void Fsync(); + Status Fsync(); uint64_t GetFileSize() const { return file_size_.load(std::memory_order_acquire); @@ -184,6 +200,11 @@ class BlobFile { std::shared_ptr GetWriter() const { return log_writer_; } + // Read blob file header and footer. Return corruption if file header is + // malform or incomplete. If footer is malform or incomplete, set + // footer_valid_ to false and return Status::OK. + Status ReadMetadata(Env* env, const EnvOptions& env_options); + private: std::shared_ptr OpenSequentialReader( Env* env, const DBOptions& db_options, diff --git a/utilities/blob_db/blob_log_writer.cc b/utilities/blob_db/blob_log_writer.cc index 90e648290..9b0ca74f7 100644 --- a/utilities/blob_db/blob_log_writer.cc +++ b/utilities/blob_db/blob_log_writer.cc @@ -32,10 +32,11 @@ Writer::Writer(unique_ptr&& dest, Env* env, use_fsync_(use_fs), last_elem_type_(kEtNone) {} -void Writer::Sync() { +Status Writer::Sync() { StopWatch sync_sw(env_, statistics_, BLOB_DB_BLOB_FILE_SYNC_MICROS); - dest_->Sync(use_fsync_); + Status s = dest_->Sync(use_fsync_); RecordTick(statistics_, BLOB_DB_BLOB_FILE_SYNCED); + return s; } Status Writer::WriteHeader(BlobLogHeader& header) { diff --git a/utilities/blob_db/blob_log_writer.h b/utilities/blob_db/blob_log_writer.h index a41bd2c5a..dccac355c 100644 --- a/utilities/blob_db/blob_log_writer.h +++ b/utilities/blob_db/blob_log_writer.h @@ -71,7 +71,7 @@ class Writer { bool ShouldSync() const { return block_offset_ > next_sync_offset_; } - void Sync(); + Status Sync(); void ResetSyncPointer() { next_sync_offset_ += bytes_per_sync_; }