fork of https://github.com/oxigraph/rocksdb and https://github.com/facebook/rocksdb for nextgraph and oxigraph
				
			
			
		
			You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							2177 lines
						
					
					
						
							69 KiB
						
					
					
				
			
		
		
	
	
							2177 lines
						
					
					
						
							69 KiB
						
					
					
				| 
 | |
| //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
 | |
| //  This source code is licensed under both the GPLv2 (found in the
 | |
| //  COPYING file in the root directory) and Apache 2.0 License
 | |
| //  (found in the LICENSE.Apache file in the root directory).
 | |
| #ifndef ROCKSDB_LITE
 | |
| 
 | |
| #include "utilities/blob_db/blob_db_impl.h"
 | |
| #include <algorithm>
 | |
| #include <cinttypes>
 | |
| #include <iomanip>
 | |
| #include <memory>
 | |
| #include <sstream>
 | |
| 
 | |
| #include "db/blob/blob_index.h"
 | |
| #include "db/db_impl/db_impl.h"
 | |
| #include "db/write_batch_internal.h"
 | |
| #include "file/file_util.h"
 | |
| #include "file/filename.h"
 | |
| #include "file/random_access_file_reader.h"
 | |
| #include "file/sst_file_manager_impl.h"
 | |
| #include "file/writable_file_writer.h"
 | |
| #include "logging/logging.h"
 | |
| #include "monitoring/instrumented_mutex.h"
 | |
| #include "monitoring/statistics.h"
 | |
| #include "rocksdb/convenience.h"
 | |
| #include "rocksdb/env.h"
 | |
| #include "rocksdb/iterator.h"
 | |
| #include "rocksdb/utilities/stackable_db.h"
 | |
| #include "rocksdb/utilities/transaction.h"
 | |
| #include "table/block_based/block.h"
 | |
| #include "table/block_based/block_based_table_builder.h"
 | |
| #include "table/block_based/block_builder.h"
 | |
| #include "table/meta_blocks.h"
 | |
| #include "test_util/sync_point.h"
 | |
| #include "util/cast_util.h"
 | |
| #include "util/crc32c.h"
 | |
| #include "util/mutexlock.h"
 | |
| #include "util/random.h"
 | |
| #include "util/stop_watch.h"
 | |
| #include "util/timer_queue.h"
 | |
| #include "utilities/blob_db/blob_compaction_filter.h"
 | |
| #include "utilities/blob_db/blob_db_iterator.h"
 | |
| #include "utilities/blob_db/blob_db_listener.h"
 | |
| 
 | |
namespace {
// File-local constant. Judging by its name this is presumably a block-based
// table format version (TODO confirm; no use is visible in this part of the
// file). Declared constexpr so that the k-prefixed "constant" cannot be
// modified at runtime and is usable in constant expressions.
constexpr int kBlockBasedTableVersionFormat = 2;
}  // end namespace
 | |
| 
 | |
| namespace ROCKSDB_NAMESPACE {
 | |
| namespace blob_db {
 | |
| 
 | |
| bool BlobFileComparator::operator()(
 | |
|     const std::shared_ptr<BlobFile>& lhs,
 | |
|     const std::shared_ptr<BlobFile>& rhs) const {
 | |
|   return lhs->BlobFileNumber() > rhs->BlobFileNumber();
 | |
| }
 | |
| 
 | |
| bool BlobFileComparatorTTL::operator()(
 | |
|     const std::shared_ptr<BlobFile>& lhs,
 | |
|     const std::shared_ptr<BlobFile>& rhs) const {
 | |
|   assert(lhs->HasTTL() && rhs->HasTTL());
 | |
|   if (lhs->expiration_range_.first < rhs->expiration_range_.first) {
 | |
|     return true;
 | |
|   }
 | |
|   if (lhs->expiration_range_.first > rhs->expiration_range_.first) {
 | |
|     return false;
 | |
|   }
 | |
|   return lhs->BlobFileNumber() < rhs->BlobFileNumber();
 | |
| }
 | |
| 
 | |
| BlobDBImpl::BlobDBImpl(const std::string& dbname,
 | |
|                        const BlobDBOptions& blob_db_options,
 | |
|                        const DBOptions& db_options,
 | |
|                        const ColumnFamilyOptions& cf_options)
 | |
|     : BlobDB(),
 | |
|       dbname_(dbname),
 | |
|       db_impl_(nullptr),
 | |
|       env_(db_options.env),
 | |
|       bdb_options_(blob_db_options),
 | |
|       db_options_(db_options),
 | |
|       cf_options_(cf_options),
 | |
|       file_options_(db_options),
 | |
|       statistics_(db_options_.statistics.get()),
 | |
|       next_file_number_(1),
 | |
|       flush_sequence_(0),
 | |
|       closed_(true),
 | |
|       open_file_count_(0),
 | |
|       total_blob_size_(0),
 | |
|       live_sst_size_(0),
 | |
|       fifo_eviction_seq_(0),
 | |
|       evict_expiration_up_to_(0),
 | |
|       debug_level_(0) {
 | |
|   clock_ = env_->GetSystemClock().get();
 | |
|   blob_dir_ = (bdb_options_.path_relative)
 | |
|                   ? dbname + "/" + bdb_options_.blob_dir
 | |
|                   : bdb_options_.blob_dir;
 | |
|   file_options_.bytes_per_sync = blob_db_options.bytes_per_sync;
 | |
| }
 | |
| 
 | |
| BlobDBImpl::~BlobDBImpl() {
 | |
|   tqueue_.shutdown();
 | |
|   // CancelAllBackgroundWork(db_, true);
 | |
|   Status s __attribute__((__unused__)) = Close();
 | |
|   assert(s.ok());
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::Close() {
 | |
|   if (closed_) {
 | |
|     return Status::OK();
 | |
|   }
 | |
|   closed_ = true;
 | |
| 
 | |
|   // Close base DB before BlobDBImpl destructs to stop event listener and
 | |
|   // compaction filter call.
 | |
|   Status s = db_->Close();
 | |
|   // delete db_ anyway even if close failed.
 | |
|   delete db_;
 | |
|   // Reset pointers to avoid StackableDB delete the pointer again.
 | |
|   db_ = nullptr;
 | |
|   db_impl_ = nullptr;
 | |
|   if (!s.ok()) {
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   s = SyncBlobFiles();
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; }
 | |
| 
 | |
| Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
 | |
|   assert(handles != nullptr);
 | |
|   assert(db_ == nullptr);
 | |
| 
 | |
|   if (blob_dir_.empty()) {
 | |
|     return Status::NotSupported("No blob directory in options");
 | |
|   }
 | |
| 
 | |
|   if (bdb_options_.garbage_collection_cutoff < 0.0 ||
 | |
|       bdb_options_.garbage_collection_cutoff > 1.0) {
 | |
|     return Status::InvalidArgument(
 | |
|         "Garbage collection cutoff must be in the interval [0.0, 1.0]");
 | |
|   }
 | |
| 
 | |
|   // Temporarily disable compactions in the base DB during open; save the user
 | |
|   // defined value beforehand so we can restore it once BlobDB is initialized.
 | |
|   // Note: this is only needed if garbage collection is enabled.
 | |
|   const bool disable_auto_compactions = cf_options_.disable_auto_compactions;
 | |
| 
 | |
|   if (bdb_options_.enable_garbage_collection) {
 | |
|     cf_options_.disable_auto_compactions = true;
 | |
|   }
 | |
| 
 | |
|   Status s;
 | |
| 
 | |
|   // Create info log.
 | |
|   if (db_options_.info_log == nullptr) {
 | |
|     s = CreateLoggerFromOptions(dbname_, db_options_, &db_options_.info_log);
 | |
|     if (!s.ok()) {
 | |
|       return s;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   ROCKS_LOG_INFO(db_options_.info_log, "Opening BlobDB...");
 | |
| 
 | |
|   if ((cf_options_.compaction_filter != nullptr ||
 | |
|        cf_options_.compaction_filter_factory != nullptr)) {
 | |
|     ROCKS_LOG_INFO(db_options_.info_log,
 | |
|                    "BlobDB only support compaction filter on non-TTL values.");
 | |
|   }
 | |
| 
 | |
|   // Open blob directory.
 | |
|   s = env_->CreateDirIfMissing(blob_dir_);
 | |
|   if (!s.ok()) {
 | |
|     ROCKS_LOG_ERROR(db_options_.info_log,
 | |
|                     "Failed to create blob_dir %s, status: %s",
 | |
|                     blob_dir_.c_str(), s.ToString().c_str());
 | |
|   }
 | |
|   s = env_->GetFileSystem()->NewDirectory(blob_dir_, IOOptions(), &dir_ent_,
 | |
|                                           nullptr);
 | |
|   if (!s.ok()) {
 | |
|     ROCKS_LOG_ERROR(db_options_.info_log,
 | |
|                     "Failed to open blob_dir %s, status: %s", blob_dir_.c_str(),
 | |
|                     s.ToString().c_str());
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   // Open blob files.
 | |
|   s = OpenAllBlobFiles();
 | |
|   if (!s.ok()) {
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   // Update options
 | |
|   if (bdb_options_.enable_garbage_collection) {
 | |
|     db_options_.listeners.push_back(std::make_shared<BlobDBListenerGC>(this));
 | |
|     cf_options_.compaction_filter_factory =
 | |
|         std::make_shared<BlobIndexCompactionFilterFactoryGC>(
 | |
|             this, clock_, cf_options_, statistics_);
 | |
|   } else {
 | |
|     db_options_.listeners.push_back(std::make_shared<BlobDBListener>(this));
 | |
|     cf_options_.compaction_filter_factory =
 | |
|         std::make_shared<BlobIndexCompactionFilterFactory>(
 | |
|             this, clock_, cf_options_, statistics_);
 | |
|   }
 | |
| 
 | |
|   // Reset user compaction filter after building into compaction factory.
 | |
|   cf_options_.compaction_filter = nullptr;
 | |
| 
 | |
|   // Open base db.
 | |
|   ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_);
 | |
|   s = DB::Open(db_options_, dbname_, {cf_descriptor}, handles, &db_);
 | |
|   if (!s.ok()) {
 | |
|     return s;
 | |
|   }
 | |
|   db_impl_ = static_cast_with_check<DBImpl>(db_->GetRootDB());
 | |
| 
 | |
|   // Sanitize the blob_dir provided. Using a directory where the
 | |
|   // base DB stores its files for the default CF is not supported.
 | |
|   const ColumnFamilyData* const cfd =
 | |
|       static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->cfd();
 | |
|   assert(cfd);
 | |
| 
 | |
|   const ImmutableCFOptions* const ioptions = cfd->ioptions();
 | |
|   assert(ioptions);
 | |
| 
 | |
|   assert(env_);
 | |
| 
 | |
|   for (const auto& cf_path : ioptions->cf_paths) {
 | |
|     bool blob_dir_same_as_cf_dir = false;
 | |
|     s = env_->AreFilesSame(blob_dir_, cf_path.path, &blob_dir_same_as_cf_dir);
 | |
|     if (!s.ok()) {
 | |
|       ROCKS_LOG_ERROR(db_options_.info_log,
 | |
|                       "Error while sanitizing blob_dir %s, status: %s",
 | |
|                       blob_dir_.c_str(), s.ToString().c_str());
 | |
|       return s;
 | |
|     }
 | |
| 
 | |
|     if (blob_dir_same_as_cf_dir) {
 | |
|       return Status::NotSupported(
 | |
|           "Using the base DB's storage directories for BlobDB files is not "
 | |
|           "supported.");
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   // Initialize SST file <-> oldest blob file mapping if garbage collection
 | |
|   // is enabled.
 | |
|   if (bdb_options_.enable_garbage_collection) {
 | |
|     std::vector<LiveFileMetaData> live_files;
 | |
|     db_->GetLiveFilesMetaData(&live_files);
 | |
| 
 | |
|     InitializeBlobFileToSstMapping(live_files);
 | |
| 
 | |
|     MarkUnreferencedBlobFilesObsoleteDuringOpen();
 | |
| 
 | |
|     if (!disable_auto_compactions) {
 | |
|       s = db_->EnableAutoCompaction(*handles);
 | |
|       if (!s.ok()) {
 | |
|         ROCKS_LOG_ERROR(
 | |
|             db_options_.info_log,
 | |
|             "Failed to enable automatic compactions during open, status: %s",
 | |
|             s.ToString().c_str());
 | |
|         return s;
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   // Add trash files in blob dir to file delete scheduler.
 | |
|   SstFileManagerImpl* sfm = static_cast<SstFileManagerImpl*>(
 | |
|       db_impl_->immutable_db_options().sst_file_manager.get());
 | |
|   DeleteScheduler::CleanupDirectory(env_, sfm, blob_dir_);
 | |
| 
 | |
|   UpdateLiveSSTSize();
 | |
| 
 | |
|   // Start background jobs.
 | |
|   if (!bdb_options_.disable_background_tasks) {
 | |
|     StartBackgroundTasks();
 | |
|   }
 | |
| 
 | |
|   ROCKS_LOG_INFO(db_options_.info_log, "BlobDB pointer %p", this);
 | |
|   bdb_options_.Dump(db_options_.info_log.get());
 | |
|   closed_ = false;
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| void BlobDBImpl::StartBackgroundTasks() {
 | |
|   // store a call to a member function and object
 | |
|   tqueue_.add(
 | |
|       kReclaimOpenFilesPeriodMillisecs,
 | |
|       std::bind(&BlobDBImpl::ReclaimOpenFiles, this, std::placeholders::_1));
 | |
|   tqueue_.add(
 | |
|       kDeleteObsoleteFilesPeriodMillisecs,
 | |
|       std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1));
 | |
|   tqueue_.add(kSanityCheckPeriodMillisecs,
 | |
|               std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1));
 | |
|   tqueue_.add(
 | |
|       kEvictExpiredFilesPeriodMillisecs,
 | |
|       std::bind(&BlobDBImpl::EvictExpiredFiles, this, std::placeholders::_1));
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::GetAllBlobFiles(std::set<uint64_t>* file_numbers) {
 | |
|   assert(file_numbers != nullptr);
 | |
|   std::vector<std::string> all_files;
 | |
|   Status s = env_->GetChildren(blob_dir_, &all_files);
 | |
|   if (!s.ok()) {
 | |
|     ROCKS_LOG_ERROR(db_options_.info_log,
 | |
|                     "Failed to get list of blob files, status: %s",
 | |
|                     s.ToString().c_str());
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   for (const auto& file_name : all_files) {
 | |
|     uint64_t file_number;
 | |
|     FileType type;
 | |
|     bool success = ParseFileName(file_name, &file_number, &type);
 | |
|     if (success && type == kBlobFile) {
 | |
|       file_numbers->insert(file_number);
 | |
|     } else {
 | |
|       ROCKS_LOG_WARN(db_options_.info_log,
 | |
|                      "Skipping file in blob directory: %s", file_name.c_str());
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::OpenAllBlobFiles() {
 | |
|   std::set<uint64_t> file_numbers;
 | |
|   Status s = GetAllBlobFiles(&file_numbers);
 | |
|   if (!s.ok()) {
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   if (!file_numbers.empty()) {
 | |
|     next_file_number_.store(*file_numbers.rbegin() + 1);
 | |
|   }
 | |
| 
 | |
|   std::ostringstream blob_file_oss;
 | |
|   std::ostringstream live_imm_oss;
 | |
|   std::ostringstream obsolete_file_oss;
 | |
| 
 | |
|   for (auto& file_number : file_numbers) {
 | |
|     std::shared_ptr<BlobFile> blob_file = std::make_shared<BlobFile>(
 | |
|         this, blob_dir_, file_number, db_options_.info_log.get());
 | |
|     blob_file->MarkImmutable(/* sequence */ 0);
 | |
| 
 | |
|     // Read file header and footer
 | |
|     Status read_metadata_status =
 | |
|         blob_file->ReadMetadata(env_->GetFileSystem(), file_options_);
 | |
|     if (read_metadata_status.IsCorruption()) {
 | |
|       // Remove incomplete file.
 | |
|       if (!obsolete_files_.empty()) {
 | |
|         obsolete_file_oss << ", ";
 | |
|       }
 | |
|       obsolete_file_oss << file_number;
 | |
| 
 | |
|       ObsoleteBlobFile(blob_file, 0 /*obsolete_seq*/, false /*update_size*/);
 | |
|       continue;
 | |
|     } else if (!read_metadata_status.ok()) {
 | |
|       ROCKS_LOG_ERROR(db_options_.info_log,
 | |
|                       "Unable to read metadata of blob file %" PRIu64
 | |
|                       ", status: '%s'",
 | |
|                       file_number, read_metadata_status.ToString().c_str());
 | |
|       return read_metadata_status;
 | |
|     }
 | |
| 
 | |
|     total_blob_size_ += blob_file->GetFileSize();
 | |
| 
 | |
|     if (!blob_files_.empty()) {
 | |
|       blob_file_oss << ", ";
 | |
|     }
 | |
|     blob_file_oss << file_number;
 | |
| 
 | |
|     blob_files_[file_number] = blob_file;
 | |
| 
 | |
|     if (!blob_file->HasTTL()) {
 | |
|       if (!live_imm_non_ttl_blob_files_.empty()) {
 | |
|         live_imm_oss << ", ";
 | |
|       }
 | |
|       live_imm_oss << file_number;
 | |
| 
 | |
|       live_imm_non_ttl_blob_files_[file_number] = blob_file;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   ROCKS_LOG_INFO(db_options_.info_log,
 | |
|                  "Found %" ROCKSDB_PRIszt " blob files: %s", blob_files_.size(),
 | |
|                  blob_file_oss.str().c_str());
 | |
|   ROCKS_LOG_INFO(
 | |
|       db_options_.info_log, "Found %" ROCKSDB_PRIszt " non-TTL blob files: %s",
 | |
|       live_imm_non_ttl_blob_files_.size(), live_imm_oss.str().c_str());
 | |
|   ROCKS_LOG_INFO(db_options_.info_log,
 | |
|                  "Found %" ROCKSDB_PRIszt
 | |
|                  " incomplete or corrupted blob files: %s",
 | |
|                  obsolete_files_.size(), obsolete_file_oss.str().c_str());
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| template <typename Linker>
 | |
| void BlobDBImpl::LinkSstToBlobFileImpl(uint64_t sst_file_number,
 | |
|                                        uint64_t blob_file_number,
 | |
|                                        Linker linker) {
 | |
|   assert(bdb_options_.enable_garbage_collection);
 | |
|   assert(blob_file_number != kInvalidBlobFileNumber);
 | |
| 
 | |
|   auto it = blob_files_.find(blob_file_number);
 | |
|   if (it == blob_files_.end()) {
 | |
|     ROCKS_LOG_WARN(db_options_.info_log,
 | |
|                    "Blob file %" PRIu64
 | |
|                    " not found while trying to link "
 | |
|                    "SST file %" PRIu64,
 | |
|                    blob_file_number, sst_file_number);
 | |
|     return;
 | |
|   }
 | |
| 
 | |
|   BlobFile* const blob_file = it->second.get();
 | |
|   assert(blob_file);
 | |
| 
 | |
|   linker(blob_file, sst_file_number);
 | |
| 
 | |
|   ROCKS_LOG_INFO(db_options_.info_log,
 | |
|                  "Blob file %" PRIu64 " linked to SST file %" PRIu64,
 | |
|                  blob_file_number, sst_file_number);
 | |
| }
 | |
| 
 | |
| void BlobDBImpl::LinkSstToBlobFile(uint64_t sst_file_number,
 | |
|                                    uint64_t blob_file_number) {
 | |
|   auto linker = [](BlobFile* blob_file, uint64_t sst_file) {
 | |
|     WriteLock file_lock(&blob_file->mutex_);
 | |
|     blob_file->LinkSstFile(sst_file);
 | |
|   };
 | |
| 
 | |
|   LinkSstToBlobFileImpl(sst_file_number, blob_file_number, linker);
 | |
| }
 | |
| 
 | |
| void BlobDBImpl::LinkSstToBlobFileNoLock(uint64_t sst_file_number,
 | |
|                                          uint64_t blob_file_number) {
 | |
|   auto linker = [](BlobFile* blob_file, uint64_t sst_file) {
 | |
|     blob_file->LinkSstFile(sst_file);
 | |
|   };
 | |
| 
 | |
|   LinkSstToBlobFileImpl(sst_file_number, blob_file_number, linker);
 | |
| }
 | |
| 
 | |
| void BlobDBImpl::UnlinkSstFromBlobFile(uint64_t sst_file_number,
 | |
|                                        uint64_t blob_file_number) {
 | |
|   assert(bdb_options_.enable_garbage_collection);
 | |
|   assert(blob_file_number != kInvalidBlobFileNumber);
 | |
| 
 | |
|   auto it = blob_files_.find(blob_file_number);
 | |
|   if (it == blob_files_.end()) {
 | |
|     ROCKS_LOG_WARN(db_options_.info_log,
 | |
|                    "Blob file %" PRIu64
 | |
|                    " not found while trying to unlink "
 | |
|                    "SST file %" PRIu64,
 | |
|                    blob_file_number, sst_file_number);
 | |
|     return;
 | |
|   }
 | |
| 
 | |
|   BlobFile* const blob_file = it->second.get();
 | |
|   assert(blob_file);
 | |
| 
 | |
|   {
 | |
|     WriteLock file_lock(&blob_file->mutex_);
 | |
|     blob_file->UnlinkSstFile(sst_file_number);
 | |
|   }
 | |
| 
 | |
|   ROCKS_LOG_INFO(db_options_.info_log,
 | |
|                  "Blob file %" PRIu64 " unlinked from SST file %" PRIu64,
 | |
|                  blob_file_number, sst_file_number);
 | |
| }
 | |
| 
 | |
| void BlobDBImpl::InitializeBlobFileToSstMapping(
 | |
|     const std::vector<LiveFileMetaData>& live_files) {
 | |
|   assert(bdb_options_.enable_garbage_collection);
 | |
| 
 | |
|   for (const auto& live_file : live_files) {
 | |
|     const uint64_t sst_file_number = live_file.file_number;
 | |
|     const uint64_t blob_file_number = live_file.oldest_blob_file_number;
 | |
| 
 | |
|     if (blob_file_number == kInvalidBlobFileNumber) {
 | |
|       continue;
 | |
|     }
 | |
| 
 | |
|     LinkSstToBlobFileNoLock(sst_file_number, blob_file_number);
 | |
|   }
 | |
| }
 | |
| 
 | |
| void BlobDBImpl::ProcessFlushJobInfo(const FlushJobInfo& info) {
 | |
|   assert(bdb_options_.enable_garbage_collection);
 | |
| 
 | |
|   WriteLock lock(&mutex_);
 | |
| 
 | |
|   if (info.oldest_blob_file_number != kInvalidBlobFileNumber) {
 | |
|     LinkSstToBlobFile(info.file_number, info.oldest_blob_file_number);
 | |
|   }
 | |
| 
 | |
|   assert(flush_sequence_ < info.largest_seqno);
 | |
|   flush_sequence_ = info.largest_seqno;
 | |
| 
 | |
|   MarkUnreferencedBlobFilesObsolete();
 | |
| }
 | |
| 
 | |
| void BlobDBImpl::ProcessCompactionJobInfo(const CompactionJobInfo& info) {
 | |
|   assert(bdb_options_.enable_garbage_collection);
 | |
| 
 | |
|   if (!info.status.ok()) {
 | |
|     return;
 | |
|   }
 | |
| 
 | |
|   // Note: the same SST file may appear in both the input and the output
 | |
|   // file list in case of a trivial move. We walk through the two lists
 | |
|   // below in a fashion that's similar to merge sort to detect this.
 | |
| 
 | |
|   auto cmp = [](const CompactionFileInfo& lhs, const CompactionFileInfo& rhs) {
 | |
|     return lhs.file_number < rhs.file_number;
 | |
|   };
 | |
| 
 | |
|   auto inputs = info.input_file_infos;
 | |
|   auto iit = inputs.begin();
 | |
|   const auto iit_end = inputs.end();
 | |
| 
 | |
|   std::sort(iit, iit_end, cmp);
 | |
| 
 | |
|   auto outputs = info.output_file_infos;
 | |
|   auto oit = outputs.begin();
 | |
|   const auto oit_end = outputs.end();
 | |
| 
 | |
|   std::sort(oit, oit_end, cmp);
 | |
| 
 | |
|   WriteLock lock(&mutex_);
 | |
| 
 | |
|   while (iit != iit_end && oit != oit_end) {
 | |
|     const auto& input = *iit;
 | |
|     const auto& output = *oit;
 | |
| 
 | |
|     if (input.file_number == output.file_number) {
 | |
|       ++iit;
 | |
|       ++oit;
 | |
|     } else if (input.file_number < output.file_number) {
 | |
|       if (input.oldest_blob_file_number != kInvalidBlobFileNumber) {
 | |
|         UnlinkSstFromBlobFile(input.file_number, input.oldest_blob_file_number);
 | |
|       }
 | |
| 
 | |
|       ++iit;
 | |
|     } else {
 | |
|       assert(output.file_number < input.file_number);
 | |
| 
 | |
|       if (output.oldest_blob_file_number != kInvalidBlobFileNumber) {
 | |
|         LinkSstToBlobFile(output.file_number, output.oldest_blob_file_number);
 | |
|       }
 | |
| 
 | |
|       ++oit;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   while (iit != iit_end) {
 | |
|     const auto& input = *iit;
 | |
| 
 | |
|     if (input.oldest_blob_file_number != kInvalidBlobFileNumber) {
 | |
|       UnlinkSstFromBlobFile(input.file_number, input.oldest_blob_file_number);
 | |
|     }
 | |
| 
 | |
|     ++iit;
 | |
|   }
 | |
| 
 | |
|   while (oit != oit_end) {
 | |
|     const auto& output = *oit;
 | |
| 
 | |
|     if (output.oldest_blob_file_number != kInvalidBlobFileNumber) {
 | |
|       LinkSstToBlobFile(output.file_number, output.oldest_blob_file_number);
 | |
|     }
 | |
| 
 | |
|     ++oit;
 | |
|   }
 | |
| 
 | |
|   MarkUnreferencedBlobFilesObsolete();
 | |
| }
 | |
| 
 | |
| bool BlobDBImpl::MarkBlobFileObsoleteIfNeeded(
 | |
|     const std::shared_ptr<BlobFile>& blob_file, SequenceNumber obsolete_seq) {
 | |
|   assert(blob_file);
 | |
|   assert(!blob_file->HasTTL());
 | |
|   assert(blob_file->Immutable());
 | |
|   assert(bdb_options_.enable_garbage_collection);
 | |
| 
 | |
|   // Note: FIFO eviction could have marked this file obsolete already.
 | |
|   if (blob_file->Obsolete()) {
 | |
|     return true;
 | |
|   }
 | |
| 
 | |
|   // We cannot mark this file (or any higher-numbered files for that matter)
 | |
|   // obsolete if it is referenced by any memtables or SSTs. We keep track of
 | |
|   // the SSTs explicitly. To account for memtables, we keep track of the highest
 | |
|   // sequence number received in flush notifications, and we do not mark the
 | |
|   // blob file obsolete if there are still unflushed memtables from before
 | |
|   // the time the blob file was closed.
 | |
|   if (blob_file->GetImmutableSequence() > flush_sequence_ ||
 | |
|       !blob_file->GetLinkedSstFiles().empty()) {
 | |
|     return false;
 | |
|   }
 | |
| 
 | |
|   ROCKS_LOG_INFO(db_options_.info_log,
 | |
|                  "Blob file %" PRIu64 " is no longer needed, marking obsolete",
 | |
|                  blob_file->BlobFileNumber());
 | |
| 
 | |
|   ObsoleteBlobFile(blob_file, obsolete_seq, /* update_size */ true);
 | |
|   return true;
 | |
| }
 | |
| 
 | |
| template <class Functor>
 | |
| void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed) {
 | |
|   assert(bdb_options_.enable_garbage_collection);
 | |
| 
 | |
|   // Iterate through all live immutable non-TTL blob files, and mark them
 | |
|   // obsolete assuming no SST files or memtables rely on the blobs in them.
 | |
|   // Note: we need to stop as soon as we find a blob file that has any
 | |
|   // linked SSTs (or one potentially referenced by memtables).
 | |
| 
 | |
|   uint64_t obsoleted_files = 0;
 | |
| 
 | |
|   auto it = live_imm_non_ttl_blob_files_.begin();
 | |
|   while (it != live_imm_non_ttl_blob_files_.end()) {
 | |
|     const auto& blob_file = it->second;
 | |
|     assert(blob_file);
 | |
|     assert(blob_file->BlobFileNumber() == it->first);
 | |
|     assert(!blob_file->HasTTL());
 | |
|     assert(blob_file->Immutable());
 | |
| 
 | |
|     // Small optimization: Obsolete() does an atomic read, so we can do
 | |
|     // this check without taking a lock on the blob file's mutex.
 | |
|     if (blob_file->Obsolete()) {
 | |
|       it = live_imm_non_ttl_blob_files_.erase(it);
 | |
|       continue;
 | |
|     }
 | |
| 
 | |
|     if (!mark_if_needed(blob_file)) {
 | |
|       break;
 | |
|     }
 | |
| 
 | |
|     it = live_imm_non_ttl_blob_files_.erase(it);
 | |
| 
 | |
|     ++obsoleted_files;
 | |
|   }
 | |
| 
 | |
|   if (obsoleted_files > 0) {
 | |
|     ROCKS_LOG_INFO(db_options_.info_log,
 | |
|                    "%" PRIu64 " blob file(s) marked obsolete by GC",
 | |
|                    obsoleted_files);
 | |
|     RecordTick(statistics_, BLOB_DB_GC_NUM_FILES, obsoleted_files);
 | |
|   }
 | |
| }
 | |
| 
 | |
| void BlobDBImpl::MarkUnreferencedBlobFilesObsolete() {
 | |
|   const SequenceNumber obsolete_seq = GetLatestSequenceNumber();
 | |
| 
 | |
|   MarkUnreferencedBlobFilesObsoleteImpl(
 | |
|       [this, obsolete_seq](const std::shared_ptr<BlobFile>& blob_file) {
 | |
|         WriteLock file_lock(&blob_file->mutex_);
 | |
|         return MarkBlobFileObsoleteIfNeeded(blob_file, obsolete_seq);
 | |
|       });
 | |
| }
 | |
| 
 | |
| void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteDuringOpen() {
 | |
|   MarkUnreferencedBlobFilesObsoleteImpl(
 | |
|       [this](const std::shared_ptr<BlobFile>& blob_file) {
 | |
|         return MarkBlobFileObsoleteIfNeeded(blob_file, /* obsolete_seq */ 0);
 | |
|       });
 | |
| }
 | |
| 
 | |
| void BlobDBImpl::CloseRandomAccessLocked(
 | |
|     const std::shared_ptr<BlobFile>& bfile) {
 | |
|   bfile->CloseRandomAccessLocked();
 | |
|   open_file_count_--;
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::GetBlobFileReader(
 | |
|     const std::shared_ptr<BlobFile>& blob_file,
 | |
|     std::shared_ptr<RandomAccessFileReader>* reader) {
 | |
|   assert(reader != nullptr);
 | |
|   bool fresh_open = false;
 | |
|   Status s = blob_file->GetReader(env_, file_options_, reader, &fresh_open);
 | |
|   if (s.ok() && fresh_open) {
 | |
|     assert(*reader != nullptr);
 | |
|     open_file_count_++;
 | |
|   }
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(
 | |
|     bool has_ttl, const ExpirationRange& expiration_range,
 | |
|     const std::string& reason) {
 | |
|   assert(has_ttl == (expiration_range.first || expiration_range.second));
 | |
| 
 | |
|   uint64_t file_num = next_file_number_++;
 | |
| 
 | |
|   const uint32_t column_family_id =
 | |
|       static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
 | |
|   auto blob_file = std::make_shared<BlobFile>(
 | |
|       this, blob_dir_, file_num, db_options_.info_log.get(), column_family_id,
 | |
|       bdb_options_.compression, has_ttl, expiration_range);
 | |
| 
 | |
|   ROCKS_LOG_DEBUG(db_options_.info_log, "New blob file created: %s reason='%s'",
 | |
|                   blob_file->PathName().c_str(), reason.c_str());
 | |
|   LogFlush(db_options_.info_log);
 | |
| 
 | |
|   return blob_file;
 | |
| }
 | |
| 
 | |
| void BlobDBImpl::RegisterBlobFile(std::shared_ptr<BlobFile> blob_file) {
 | |
|   const uint64_t blob_file_number = blob_file->BlobFileNumber();
 | |
| 
 | |
|   auto it = blob_files_.lower_bound(blob_file_number);
 | |
|   assert(it == blob_files_.end() || it->first != blob_file_number);
 | |
| 
 | |
|   blob_files_.insert(it,
 | |
|                      std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
 | |
|                          blob_file_number, std::move(blob_file)));
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
 | |
|   std::string fpath(bfile->PathName());
 | |
|   std::unique_ptr<FSWritableFile> wfile;
 | |
|   const auto& fs = env_->GetFileSystem();
 | |
| 
 | |
|   Status s = fs->ReopenWritableFile(fpath, file_options_, &wfile, nullptr);
 | |
|   if (!s.ok()) {
 | |
|     ROCKS_LOG_ERROR(db_options_.info_log,
 | |
|                     "Failed to open blob file for write: %s status: '%s'"
 | |
|                     " exists: '%s'",
 | |
|                     fpath.c_str(), s.ToString().c_str(),
 | |
|                     fs->FileExists(fpath, file_options_.io_options, nullptr)
 | |
|                         .ToString()
 | |
|                         .c_str());
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   std::unique_ptr<WritableFileWriter> fwriter;
 | |
|   fwriter.reset(new WritableFileWriter(std::move(wfile), fpath, file_options_));
 | |
| 
 | |
|   uint64_t boffset = bfile->GetFileSize();
 | |
|   if (debug_level_ >= 2 && boffset) {
 | |
|     ROCKS_LOG_DEBUG(db_options_.info_log,
 | |
|                     "Open blob file: %s with offset: %" PRIu64, fpath.c_str(),
 | |
|                     boffset);
 | |
|   }
 | |
| 
 | |
|   BlobLogWriter::ElemType et = BlobLogWriter::kEtNone;
 | |
|   if (bfile->file_size_ == BlobLogHeader::kSize) {
 | |
|     et = BlobLogWriter::kEtFileHdr;
 | |
|   } else if (bfile->file_size_ > BlobLogHeader::kSize) {
 | |
|     et = BlobLogWriter::kEtRecord;
 | |
|   } else if (bfile->file_size_) {
 | |
|     ROCKS_LOG_WARN(db_options_.info_log,
 | |
|                    "Open blob file: %s with wrong size: %" PRIu64,
 | |
|                    fpath.c_str(), boffset);
 | |
|     return Status::Corruption("Invalid blob file size");
 | |
|   }
 | |
| 
 | |
|   constexpr bool do_flush = true;
 | |
| 
 | |
|   bfile->log_writer_ = std::make_shared<BlobLogWriter>(
 | |
|       std::move(fwriter), clock_, statistics_, bfile->file_number_,
 | |
|       db_options_.use_fsync, do_flush, boffset);
 | |
|   bfile->log_writer_->last_elem_type_ = et;
 | |
| 
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| std::shared_ptr<BlobFile> BlobDBImpl::FindBlobFileLocked(
 | |
|     uint64_t expiration) const {
 | |
|   if (open_ttl_files_.empty()) {
 | |
|     return nullptr;
 | |
|   }
 | |
| 
 | |
|   std::shared_ptr<BlobFile> tmp = std::make_shared<BlobFile>();
 | |
|   tmp->SetHasTTL(true);
 | |
|   tmp->expiration_range_ = std::make_pair(expiration, 0);
 | |
|   tmp->file_number_ = std::numeric_limits<uint64_t>::max();
 | |
| 
 | |
|   auto citr = open_ttl_files_.equal_range(tmp);
 | |
|   if (citr.first == open_ttl_files_.end()) {
 | |
|     assert(citr.second == open_ttl_files_.end());
 | |
| 
 | |
|     std::shared_ptr<BlobFile> check = *(open_ttl_files_.rbegin());
 | |
|     return (check->expiration_range_.second <= expiration) ? nullptr : check;
 | |
|   }
 | |
| 
 | |
|   if (citr.first != citr.second) {
 | |
|     return *(citr.first);
 | |
|   }
 | |
| 
 | |
|   auto finditr = citr.second;
 | |
|   if (finditr != open_ttl_files_.begin()) {
 | |
|     --finditr;
 | |
|   }
 | |
| 
 | |
|   bool b2 = (*finditr)->expiration_range_.second <= expiration;
 | |
|   bool b1 = (*finditr)->expiration_range_.first > expiration;
 | |
| 
 | |
|   return (b1 || b2) ? nullptr : (*finditr);
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::CheckOrCreateWriterLocked(
 | |
|     const std::shared_ptr<BlobFile>& blob_file,
 | |
|     std::shared_ptr<BlobLogWriter>* writer) {
 | |
|   assert(writer != nullptr);
 | |
|   *writer = blob_file->GetWriter();
 | |
|   if (*writer != nullptr) {
 | |
|     return Status::OK();
 | |
|   }
 | |
|   Status s = CreateWriterLocked(blob_file);
 | |
|   if (s.ok()) {
 | |
|     *writer = blob_file->GetWriter();
 | |
|   }
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::CreateBlobFileAndWriter(
 | |
|     bool has_ttl, const ExpirationRange& expiration_range,
 | |
|     const std::string& reason, std::shared_ptr<BlobFile>* blob_file,
 | |
|     std::shared_ptr<BlobLogWriter>* writer) {
 | |
|   TEST_SYNC_POINT("BlobDBImpl::CreateBlobFileAndWriter");
 | |
|   assert(has_ttl == (expiration_range.first || expiration_range.second));
 | |
|   assert(blob_file);
 | |
|   assert(writer);
 | |
| 
 | |
|   *blob_file = NewBlobFile(has_ttl, expiration_range, reason);
 | |
|   assert(*blob_file);
 | |
| 
 | |
|   // file not visible, hence no lock
 | |
|   Status s = CheckOrCreateWriterLocked(*blob_file, writer);
 | |
|   if (!s.ok()) {
 | |
|     ROCKS_LOG_ERROR(db_options_.info_log,
 | |
|                     "Failed to get writer for blob file: %s, error: %s",
 | |
|                     (*blob_file)->PathName().c_str(), s.ToString().c_str());
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   assert(*writer);
 | |
| 
 | |
|   s = (*writer)->WriteHeader((*blob_file)->header_);
 | |
|   if (!s.ok()) {
 | |
|     ROCKS_LOG_ERROR(db_options_.info_log,
 | |
|                     "Failed to write header to new blob file: %s"
 | |
|                     " status: '%s'",
 | |
|                     (*blob_file)->PathName().c_str(), s.ToString().c_str());
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   (*blob_file)->SetFileSize(BlobLogHeader::kSize);
 | |
|   total_blob_size_ += BlobLogHeader::kSize;
 | |
| 
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::SelectBlobFile(std::shared_ptr<BlobFile>* blob_file) {
 | |
|   assert(blob_file);
 | |
| 
 | |
|   {
 | |
|     ReadLock rl(&mutex_);
 | |
| 
 | |
|     if (open_non_ttl_file_) {
 | |
|       assert(!open_non_ttl_file_->Immutable());
 | |
|       *blob_file = open_non_ttl_file_;
 | |
|       return Status::OK();
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   // Check again
 | |
|   WriteLock wl(&mutex_);
 | |
| 
 | |
|   if (open_non_ttl_file_) {
 | |
|     assert(!open_non_ttl_file_->Immutable());
 | |
|     *blob_file = open_non_ttl_file_;
 | |
|     return Status::OK();
 | |
|   }
 | |
| 
 | |
|   std::shared_ptr<BlobLogWriter> writer;
 | |
|   const Status s = CreateBlobFileAndWriter(
 | |
|       /* has_ttl */ false, ExpirationRange(),
 | |
|       /* reason */ "SelectBlobFile", blob_file, &writer);
 | |
|   if (!s.ok()) {
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   RegisterBlobFile(*blob_file);
 | |
|   open_non_ttl_file_ = *blob_file;
 | |
| 
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::SelectBlobFileTTL(uint64_t expiration,
 | |
|                                      std::shared_ptr<BlobFile>* blob_file) {
 | |
|   assert(blob_file);
 | |
|   assert(expiration != kNoExpiration);
 | |
| 
 | |
|   {
 | |
|     ReadLock rl(&mutex_);
 | |
| 
 | |
|     *blob_file = FindBlobFileLocked(expiration);
 | |
|     if (*blob_file != nullptr) {
 | |
|       assert(!(*blob_file)->Immutable());
 | |
|       return Status::OK();
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   // Check again
 | |
|   WriteLock wl(&mutex_);
 | |
| 
 | |
|   *blob_file = FindBlobFileLocked(expiration);
 | |
|   if (*blob_file != nullptr) {
 | |
|     assert(!(*blob_file)->Immutable());
 | |
|     return Status::OK();
 | |
|   }
 | |
| 
 | |
|   const uint64_t exp_low =
 | |
|       (expiration / bdb_options_.ttl_range_secs) * bdb_options_.ttl_range_secs;
 | |
|   const uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs;
 | |
|   const ExpirationRange expiration_range(exp_low, exp_high);
 | |
| 
 | |
|   std::ostringstream oss;
 | |
|   oss << "SelectBlobFileTTL range: [" << exp_low << ',' << exp_high << ')';
 | |
| 
 | |
|   std::shared_ptr<BlobLogWriter> writer;
 | |
|   const Status s =
 | |
|       CreateBlobFileAndWriter(/* has_ttl */ true, expiration_range,
 | |
|                               /* reason */ oss.str(), blob_file, &writer);
 | |
|   if (!s.ok()) {
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   RegisterBlobFile(*blob_file);
 | |
|   open_ttl_files_.insert(*blob_file);
 | |
| 
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
 | |
|  private:
 | |
|   const WriteOptions& options_;
 | |
|   BlobDBImpl* blob_db_impl_;
 | |
|   uint32_t default_cf_id_;
 | |
|   WriteBatch batch_;
 | |
| 
 | |
|  public:
 | |
|   BlobInserter(const WriteOptions& options, BlobDBImpl* blob_db_impl,
 | |
|                uint32_t default_cf_id)
 | |
|       : options_(options),
 | |
|         blob_db_impl_(blob_db_impl),
 | |
|         default_cf_id_(default_cf_id) {}
 | |
| 
 | |
|   WriteBatch* batch() { return &batch_; }
 | |
| 
 | |
|   Status PutCF(uint32_t column_family_id, const Slice& key,
 | |
|                const Slice& value) override {
 | |
|     if (column_family_id != default_cf_id_) {
 | |
|       return Status::NotSupported(
 | |
|           "Blob DB doesn't support non-default column family.");
 | |
|     }
 | |
|     Status s = blob_db_impl_->PutBlobValue(options_, key, value, kNoExpiration,
 | |
|                                            &batch_);
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
 | |
|     if (column_family_id != default_cf_id_) {
 | |
|       return Status::NotSupported(
 | |
|           "Blob DB doesn't support non-default column family.");
 | |
|     }
 | |
|     Status s = WriteBatchInternal::Delete(&batch_, column_family_id, key);
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   virtual Status DeleteRange(uint32_t column_family_id, const Slice& begin_key,
 | |
|                              const Slice& end_key) {
 | |
|     if (column_family_id != default_cf_id_) {
 | |
|       return Status::NotSupported(
 | |
|           "Blob DB doesn't support non-default column family.");
 | |
|     }
 | |
|     Status s = WriteBatchInternal::DeleteRange(&batch_, column_family_id,
 | |
|                                                begin_key, end_key);
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   Status SingleDeleteCF(uint32_t /*column_family_id*/,
 | |
|                         const Slice& /*key*/) override {
 | |
|     return Status::NotSupported("Not supported operation in blob db.");
 | |
|   }
 | |
| 
 | |
|   Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
 | |
|                  const Slice& /*value*/) override {
 | |
|     return Status::NotSupported("Not supported operation in blob db.");
 | |
|   }
 | |
| 
 | |
|   void LogData(const Slice& blob) override { batch_.PutLogData(blob); }
 | |
| };
 | |
| 
 | |
| Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
 | |
|   StopWatch write_sw(clock_, statistics_, BLOB_DB_WRITE_MICROS);
 | |
|   RecordTick(statistics_, BLOB_DB_NUM_WRITE);
 | |
|   uint32_t default_cf_id =
 | |
|       static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
 | |
|           ->GetID();
 | |
|   Status s;
 | |
|   BlobInserter blob_inserter(options, this, default_cf_id);
 | |
|   {
 | |
|     // Release write_mutex_ before DB write to avoid race condition with
 | |
|     // flush begin listener, which also require write_mutex_ to sync
 | |
|     // blob files.
 | |
|     MutexLock l(&write_mutex_);
 | |
|     s = updates->Iterate(&blob_inserter);
 | |
|   }
 | |
|   if (!s.ok()) {
 | |
|     return s;
 | |
|   }
 | |
|   return db_->Write(options, blob_inserter.batch());
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
 | |
|                        const Slice& value) {
 | |
|   return PutUntil(options, key, value, kNoExpiration);
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
 | |
|                               const Slice& key, const Slice& value,
 | |
|                               uint64_t ttl) {
 | |
|   uint64_t now = EpochNow();
 | |
|   uint64_t expiration = kNoExpiration - now > ttl ? now + ttl : kNoExpiration;
 | |
|   return PutUntil(options, key, value, expiration);
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
 | |
|                             const Slice& value, uint64_t expiration) {
 | |
|   StopWatch write_sw(clock_, statistics_, BLOB_DB_WRITE_MICROS);
 | |
|   RecordTick(statistics_, BLOB_DB_NUM_PUT);
 | |
|   Status s;
 | |
|   WriteBatch batch;
 | |
|   {
 | |
|     // Release write_mutex_ before DB write to avoid race condition with
 | |
|     // flush begin listener, which also require write_mutex_ to sync
 | |
|     // blob files.
 | |
|     MutexLock l(&write_mutex_);
 | |
|     s = PutBlobValue(options, key, value, expiration, &batch);
 | |
|   }
 | |
|   if (s.ok()) {
 | |
|     s = db_->Write(options, &batch);
 | |
|   }
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/,
 | |
|                                 const Slice& key, const Slice& value,
 | |
|                                 uint64_t expiration, WriteBatch* batch) {
 | |
|   write_mutex_.AssertHeld();
 | |
|   Status s;
 | |
|   std::string index_entry;
 | |
|   uint32_t column_family_id =
 | |
|       static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
 | |
|           ->GetID();
 | |
|   if (value.size() < bdb_options_.min_blob_size) {
 | |
|     if (expiration == kNoExpiration) {
 | |
|       // Put as normal value
 | |
|       s = batch->Put(key, value);
 | |
|       RecordTick(statistics_, BLOB_DB_WRITE_INLINED);
 | |
|     } else {
 | |
|       // Inlined with TTL
 | |
|       BlobIndex::EncodeInlinedTTL(&index_entry, expiration, value);
 | |
|       s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
 | |
|                                            index_entry);
 | |
|       RecordTick(statistics_, BLOB_DB_WRITE_INLINED_TTL);
 | |
|     }
 | |
|   } else {
 | |
|     std::string compression_output;
 | |
|     Slice value_compressed = GetCompressedSlice(value, &compression_output);
 | |
| 
 | |
|     std::string headerbuf;
 | |
|     BlobLogWriter::ConstructBlobHeader(&headerbuf, key, value_compressed,
 | |
|                                        expiration);
 | |
| 
 | |
|     // Check DB size limit before selecting blob file to
 | |
|     // Since CheckSizeAndEvictBlobFiles() can close blob files, it needs to be
 | |
|     // done before calling SelectBlobFile().
 | |
|     s = CheckSizeAndEvictBlobFiles(headerbuf.size() + key.size() +
 | |
|                                    value_compressed.size());
 | |
|     if (!s.ok()) {
 | |
|       return s;
 | |
|     }
 | |
| 
 | |
|     std::shared_ptr<BlobFile> blob_file;
 | |
|     if (expiration != kNoExpiration) {
 | |
|       s = SelectBlobFileTTL(expiration, &blob_file);
 | |
|     } else {
 | |
|       s = SelectBlobFile(&blob_file);
 | |
|     }
 | |
|     if (s.ok()) {
 | |
|       assert(blob_file != nullptr);
 | |
|       assert(blob_file->GetCompressionType() == bdb_options_.compression);
 | |
|       s = AppendBlob(blob_file, headerbuf, key, value_compressed, expiration,
 | |
|                      &index_entry);
 | |
|     }
 | |
|     if (s.ok()) {
 | |
|       if (expiration != kNoExpiration) {
 | |
|         WriteLock file_lock(&blob_file->mutex_);
 | |
|         blob_file->ExtendExpirationRange(expiration);
 | |
|       }
 | |
|       s = CloseBlobFileIfNeeded(blob_file);
 | |
|     }
 | |
|     if (s.ok()) {
 | |
|       s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
 | |
|                                            index_entry);
 | |
|     }
 | |
|     if (s.ok()) {
 | |
|       if (expiration == kNoExpiration) {
 | |
|         RecordTick(statistics_, BLOB_DB_WRITE_BLOB);
 | |
|       } else {
 | |
|         RecordTick(statistics_, BLOB_DB_WRITE_BLOB_TTL);
 | |
|       }
 | |
|     } else {
 | |
|       ROCKS_LOG_ERROR(
 | |
|           db_options_.info_log,
 | |
|           "Failed to append blob to FILE: %s: KEY: %s VALSZ: %" ROCKSDB_PRIszt
 | |
|           " status: '%s' blob_file: '%s'",
 | |
|           blob_file->PathName().c_str(), key.ToString().c_str(), value.size(),
 | |
|           s.ToString().c_str(), blob_file->DumpState().c_str());
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   RecordTick(statistics_, BLOB_DB_NUM_KEYS_WRITTEN);
 | |
|   RecordTick(statistics_, BLOB_DB_BYTES_WRITTEN, key.size() + value.size());
 | |
|   RecordInHistogram(statistics_, BLOB_DB_KEY_SIZE, key.size());
 | |
|   RecordInHistogram(statistics_, BLOB_DB_VALUE_SIZE, value.size());
 | |
| 
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
 | |
|                                      std::string* compression_output) const {
 | |
|   if (bdb_options_.compression == kNoCompression) {
 | |
|     return raw;
 | |
|   }
 | |
|   StopWatch compression_sw(clock_, statistics_, BLOB_DB_COMPRESSION_MICROS);
 | |
|   CompressionType type = bdb_options_.compression;
 | |
|   CompressionOptions opts;
 | |
|   CompressionContext context(type);
 | |
|   CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), type,
 | |
|                        0 /* sample_for_compression */);
 | |
|   CompressBlock(raw, info, &type, kBlockBasedTableVersionFormat, false,
 | |
|                 compression_output, nullptr, nullptr);
 | |
|   return *compression_output;
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::DecompressSlice(const Slice& compressed_value,
 | |
|                                    CompressionType compression_type,
 | |
|                                    PinnableSlice* value_output) const {
 | |
|   assert(compression_type != kNoCompression);
 | |
| 
 | |
|   BlockContents contents;
 | |
|   auto cfh = static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
 | |
| 
 | |
|   {
 | |
|     StopWatch decompression_sw(clock_, statistics_,
 | |
|                                BLOB_DB_DECOMPRESSION_MICROS);
 | |
|     UncompressionContext context(compression_type);
 | |
|     UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
 | |
|                            compression_type);
 | |
|     Status s = UncompressBlockContentsForCompressionType(
 | |
|         info, compressed_value.data(), compressed_value.size(), &contents,
 | |
|         kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
 | |
|     if (!s.ok()) {
 | |
|       return Status::Corruption("Unable to decompress blob.");
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   value_output->PinSelf(contents.data);
 | |
| 
 | |
|   return Status::OK();
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::CompactFiles(
 | |
|     const CompactionOptions& compact_options,
 | |
|     const std::vector<std::string>& input_file_names, const int output_level,
 | |
|     const int output_path_id, std::vector<std::string>* const output_file_names,
 | |
|     CompactionJobInfo* compaction_job_info) {
 | |
|   // Note: we need CompactionJobInfo to be able to track updates to the
 | |
|   // blob file <-> SST mappings, so we provide one if the user hasn't,
 | |
|   // assuming that GC is enabled.
 | |
|   CompactionJobInfo info{};
 | |
|   if (bdb_options_.enable_garbage_collection && !compaction_job_info) {
 | |
|     compaction_job_info = &info;
 | |
|   }
 | |
| 
 | |
|   const Status s =
 | |
|       db_->CompactFiles(compact_options, input_file_names, output_level,
 | |
|                         output_path_id, output_file_names, compaction_job_info);
 | |
|   if (!s.ok()) {
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   if (bdb_options_.enable_garbage_collection) {
 | |
|     assert(compaction_job_info);
 | |
|     ProcessCompactionJobInfo(*compaction_job_info);
 | |
|   }
 | |
| 
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| void BlobDBImpl::GetCompactionContextCommon(BlobCompactionContext* context) {
 | |
|   assert(context);
 | |
| 
 | |
|   context->blob_db_impl = this;
 | |
|   context->next_file_number = next_file_number_.load();
 | |
|   context->current_blob_files.clear();
 | |
|   for (auto& p : blob_files_) {
 | |
|     context->current_blob_files.insert(p.first);
 | |
|   }
 | |
|   context->fifo_eviction_seq = fifo_eviction_seq_;
 | |
|   context->evict_expiration_up_to = evict_expiration_up_to_;
 | |
| }
 | |
| 
 | |
| void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) {
 | |
|   assert(context);
 | |
| 
 | |
|   ReadLock l(&mutex_);
 | |
|   GetCompactionContextCommon(context);
 | |
| }
 | |
| 
 | |
| void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context,
 | |
|                                       BlobCompactionContextGC* context_gc) {
 | |
|   assert(context);
 | |
|   assert(context_gc);
 | |
| 
 | |
|   ReadLock l(&mutex_);
 | |
|   GetCompactionContextCommon(context);
 | |
| 
 | |
|   if (!live_imm_non_ttl_blob_files_.empty()) {
 | |
|     auto it = live_imm_non_ttl_blob_files_.begin();
 | |
|     std::advance(it, bdb_options_.garbage_collection_cutoff *
 | |
|                          live_imm_non_ttl_blob_files_.size());
 | |
|     context_gc->cutoff_file_number = it != live_imm_non_ttl_blob_files_.end()
 | |
|                                          ? it->first
 | |
|                                          : std::numeric_limits<uint64_t>::max();
 | |
|   }
 | |
| }
 | |
| 
 | |
| void BlobDBImpl::UpdateLiveSSTSize() {
 | |
|   uint64_t live_sst_size = 0;
 | |
|   bool ok = GetIntProperty(DB::Properties::kLiveSstFilesSize, &live_sst_size);
 | |
|   if (ok) {
 | |
|     live_sst_size_.store(live_sst_size);
 | |
|     ROCKS_LOG_INFO(db_options_.info_log,
 | |
|                    "Updated total SST file size: %" PRIu64 " bytes.",
 | |
|                    live_sst_size);
 | |
|   } else {
 | |
|     ROCKS_LOG_ERROR(
 | |
|         db_options_.info_log,
 | |
|         "Failed to update total SST file size after flush or compaction.");
 | |
|   }
 | |
|   {
 | |
|     // Trigger FIFO eviction if needed.
 | |
|     MutexLock l(&write_mutex_);
 | |
|     Status s = CheckSizeAndEvictBlobFiles(0, true /*force*/);
 | |
|     if (s.IsNoSpace()) {
 | |
|       ROCKS_LOG_WARN(db_options_.info_log,
 | |
|                      "DB grow out-of-space after SST size updated. Current live"
 | |
|                      " SST size: %" PRIu64
 | |
|                      " , current blob files size: %" PRIu64 ".",
 | |
|                      live_sst_size_.load(), total_blob_size_.load());
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::CheckSizeAndEvictBlobFiles(uint64_t blob_size,
 | |
|                                               bool force_evict) {
 | |
|   write_mutex_.AssertHeld();
 | |
| 
 | |
|   uint64_t live_sst_size = live_sst_size_.load();
 | |
|   if (bdb_options_.max_db_size == 0 ||
 | |
|       live_sst_size + total_blob_size_.load() + blob_size <=
 | |
|           bdb_options_.max_db_size) {
 | |
|     return Status::OK();
 | |
|   }
 | |
| 
 | |
|   if (bdb_options_.is_fifo == false ||
 | |
|       (!force_evict && live_sst_size + blob_size > bdb_options_.max_db_size)) {
 | |
|     // FIFO eviction is disabled, or no space to insert new blob even we evict
 | |
|     // all blob files.
 | |
|     return Status::NoSpace(
 | |
|         "Write failed, as writing it would exceed max_db_size limit.");
 | |
|   }
 | |
| 
 | |
|   std::vector<std::shared_ptr<BlobFile>> candidate_files;
 | |
|   CopyBlobFiles(&candidate_files);
 | |
|   std::sort(candidate_files.begin(), candidate_files.end(),
 | |
|             BlobFileComparator());
 | |
|   fifo_eviction_seq_ = GetLatestSequenceNumber();
 | |
| 
 | |
|   WriteLock l(&mutex_);
 | |
| 
 | |
|   while (!candidate_files.empty() &&
 | |
|          live_sst_size + total_blob_size_.load() + blob_size >
 | |
|              bdb_options_.max_db_size) {
 | |
|     std::shared_ptr<BlobFile> blob_file = candidate_files.back();
 | |
|     candidate_files.pop_back();
 | |
|     WriteLock file_lock(&blob_file->mutex_);
 | |
|     if (blob_file->Obsolete()) {
 | |
|       // File already obsoleted by someone else.
 | |
|       assert(blob_file->Immutable());
 | |
|       continue;
 | |
|     }
 | |
|     // FIFO eviction can evict open blob files.
 | |
|     if (!blob_file->Immutable()) {
 | |
|       Status s = CloseBlobFile(blob_file);
 | |
|       if (!s.ok()) {
 | |
|         return s;
 | |
|       }
 | |
|     }
 | |
|     assert(blob_file->Immutable());
 | |
|     auto expiration_range = blob_file->GetExpirationRange();
 | |
|     ROCKS_LOG_INFO(db_options_.info_log,
 | |
|                    "Evict oldest blob file since DB out of space. Current "
 | |
|                    "live SST file size: %" PRIu64 ", total blob size: %" PRIu64
 | |
|                    ", max db size: %" PRIu64 ", evicted blob file #%" PRIu64
 | |
|                    ".",
 | |
|                    live_sst_size, total_blob_size_.load(),
 | |
|                    bdb_options_.max_db_size, blob_file->BlobFileNumber());
 | |
|     ObsoleteBlobFile(blob_file, fifo_eviction_seq_, true /*update_size*/);
 | |
|     evict_expiration_up_to_ = expiration_range.first;
 | |
|     RecordTick(statistics_, BLOB_DB_FIFO_NUM_FILES_EVICTED);
 | |
|     RecordTick(statistics_, BLOB_DB_FIFO_NUM_KEYS_EVICTED,
 | |
|                blob_file->BlobCount());
 | |
|     RecordTick(statistics_, BLOB_DB_FIFO_BYTES_EVICTED,
 | |
|                blob_file->GetFileSize());
 | |
|     TEST_SYNC_POINT("BlobDBImpl::EvictOldestBlobFile:Evicted");
 | |
|   }
 | |
|   if (live_sst_size + total_blob_size_.load() + blob_size >
 | |
|       bdb_options_.max_db_size) {
 | |
|     return Status::NoSpace(
 | |
|         "Write failed, as writing it would exceed max_db_size limit.");
 | |
|   }
 | |
|   return Status::OK();
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
 | |
|                               const std::string& headerbuf, const Slice& key,
 | |
|                               const Slice& value, uint64_t expiration,
 | |
|                               std::string* index_entry) {
 | |
|   Status s;
 | |
|   uint64_t blob_offset = 0;
 | |
|   uint64_t key_offset = 0;
 | |
|   {
 | |
|     WriteLock lockbfile_w(&bfile->mutex_);
 | |
|     std::shared_ptr<BlobLogWriter> writer;
 | |
|     s = CheckOrCreateWriterLocked(bfile, &writer);
 | |
|     if (!s.ok()) {
 | |
|       return s;
 | |
|     }
 | |
| 
 | |
|     // write the blob to the blob log.
 | |
|     s = writer->EmitPhysicalRecord(headerbuf, key, value, &key_offset,
 | |
|                                    &blob_offset);
 | |
|   }
 | |
| 
 | |
|   if (!s.ok()) {
 | |
|     ROCKS_LOG_ERROR(db_options_.info_log,
 | |
|                     "Invalid status in AppendBlob: %s status: '%s'",
 | |
|                     bfile->PathName().c_str(), s.ToString().c_str());
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   uint64_t size_put = headerbuf.size() + key.size() + value.size();
 | |
|   bfile->BlobRecordAdded(size_put);
 | |
|   total_blob_size_ += size_put;
 | |
| 
 | |
|   if (expiration == kNoExpiration) {
 | |
|     BlobIndex::EncodeBlob(index_entry, bfile->BlobFileNumber(), blob_offset,
 | |
|                           value.size(), bdb_options_.compression);
 | |
|   } else {
 | |
|     BlobIndex::EncodeBlobTTL(index_entry, expiration, bfile->BlobFileNumber(),
 | |
|                              blob_offset, value.size(),
 | |
|                              bdb_options_.compression);
 | |
|   }
 | |
| 
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| std::vector<Status> BlobDBImpl::MultiGet(
 | |
|     const ReadOptions& read_options,
 | |
|     const std::vector<Slice>& keys, std::vector<std::string>* values) {
 | |
|   StopWatch multiget_sw(clock_, statistics_, BLOB_DB_MULTIGET_MICROS);
 | |
|   RecordTick(statistics_, BLOB_DB_NUM_MULTIGET);
 | |
|   // Get a snapshot to avoid blob file get deleted between we
 | |
|   // fetch and index entry and reading from the file.
 | |
|   ReadOptions ro(read_options);
 | |
|   bool snapshot_created = SetSnapshotIfNeeded(&ro);
 | |
| 
 | |
|   std::vector<Status> statuses;
 | |
|   statuses.reserve(keys.size());
 | |
|   values->clear();
 | |
|   values->reserve(keys.size());
 | |
|   PinnableSlice value;
 | |
|   for (size_t i = 0; i < keys.size(); i++) {
 | |
|     statuses.push_back(Get(ro, DefaultColumnFamily(), keys[i], &value));
 | |
|     values->push_back(value.ToString());
 | |
|     value.Reset();
 | |
|   }
 | |
|   if (snapshot_created) {
 | |
|     db_->ReleaseSnapshot(ro.snapshot);
 | |
|   }
 | |
|   return statuses;
 | |
| }
 | |
| 
 | |
| bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
 | |
|   assert(read_options != nullptr);
 | |
|   if (read_options->snapshot != nullptr) {
 | |
|     return false;
 | |
|   }
 | |
|   read_options->snapshot = db_->GetSnapshot();
 | |
|   return true;
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
 | |
|                                 PinnableSlice* value, uint64_t* expiration) {
 | |
|   assert(value);
 | |
| 
 | |
|   BlobIndex blob_index;
 | |
|   Status s = blob_index.DecodeFrom(index_entry);
 | |
|   if (!s.ok()) {
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   if (blob_index.HasTTL() && blob_index.expiration() <= EpochNow()) {
 | |
|     return Status::NotFound("Key expired");
 | |
|   }
 | |
| 
 | |
|   if (expiration != nullptr) {
 | |
|     if (blob_index.HasTTL()) {
 | |
|       *expiration = blob_index.expiration();
 | |
|     } else {
 | |
|       *expiration = kNoExpiration;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   if (blob_index.IsInlined()) {
 | |
|     // TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same
 | |
|     // memory buffer to avoid extra copy.
 | |
|     value->PinSelf(blob_index.value());
 | |
|     return Status::OK();
 | |
|   }
 | |
| 
 | |
|   CompressionType compression_type = kNoCompression;
 | |
|   s = GetRawBlobFromFile(key, blob_index.file_number(), blob_index.offset(),
 | |
|                          blob_index.size(), value, &compression_type);
 | |
|   if (!s.ok()) {
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   if (compression_type != kNoCompression) {
 | |
|     s = DecompressSlice(*value, compression_type, value);
 | |
|     if (!s.ok()) {
 | |
|       if (debug_level_ >= 2) {
 | |
|         ROCKS_LOG_ERROR(
 | |
|             db_options_.info_log,
 | |
|             "Uncompression error during blob read from file: %" PRIu64
 | |
|             " blob_offset: %" PRIu64 " blob_size: %" PRIu64
 | |
|             " key: %s status: '%s'",
 | |
|             blob_index.file_number(), blob_index.offset(), blob_index.size(),
 | |
|             key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
 | |
|       }
 | |
|       return s;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return Status::OK();
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number,
 | |
|                                       uint64_t offset, uint64_t size,
 | |
|                                       PinnableSlice* value,
 | |
|                                       CompressionType* compression_type) {
 | |
|   assert(value);
 | |
|   assert(compression_type);
 | |
|   assert(*compression_type == kNoCompression);
 | |
| 
 | |
|   if (!size) {
 | |
|     value->PinSelf("");
 | |
|     return Status::OK();
 | |
|   }
 | |
| 
 | |
|   // offset has to have certain min, as we will read CRC
 | |
|   // later from the Blob Header, which needs to be also a
 | |
|   // valid offset.
 | |
|   if (offset <
 | |
|       (BlobLogHeader::kSize + BlobLogRecord::kHeaderSize + key.size())) {
 | |
|     if (debug_level_ >= 2) {
 | |
|       ROCKS_LOG_ERROR(db_options_.info_log,
 | |
|                       "Invalid blob index file_number: %" PRIu64
 | |
|                       " blob_offset: %" PRIu64 " blob_size: %" PRIu64
 | |
|                       " key: %s",
 | |
|                       file_number, offset, size,
 | |
|                       key.ToString(/* output_hex */ true).c_str());
 | |
|     }
 | |
| 
 | |
|     return Status::NotFound("Invalid blob offset");
 | |
|   }
 | |
| 
 | |
|   std::shared_ptr<BlobFile> blob_file;
 | |
| 
 | |
|   {
 | |
|     ReadLock rl(&mutex_);
 | |
|     auto it = blob_files_.find(file_number);
 | |
| 
 | |
|     // file was deleted
 | |
|     if (it == blob_files_.end()) {
 | |
|       return Status::NotFound("Blob Not Found as blob file missing");
 | |
|     }
 | |
| 
 | |
|     blob_file = it->second;
 | |
|   }
 | |
| 
 | |
|   *compression_type = blob_file->GetCompressionType();
 | |
| 
 | |
|   // takes locks when called
 | |
|   std::shared_ptr<RandomAccessFileReader> reader;
 | |
|   Status s = GetBlobFileReader(blob_file, &reader);
 | |
|   if (!s.ok()) {
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   assert(offset >= key.size() + sizeof(uint32_t));
 | |
|   const uint64_t record_offset = offset - key.size() - sizeof(uint32_t);
 | |
|   const uint64_t record_size = sizeof(uint32_t) + key.size() + size;
 | |
| 
 | |
|   // Allocate the buffer. This is safe in C++11
 | |
|   std::string buf;
 | |
|   AlignedBuf aligned_buf;
 | |
| 
 | |
|   // A partial blob record contain checksum, key and value.
 | |
|   Slice blob_record;
 | |
| 
 | |
|   {
 | |
|     StopWatch read_sw(clock_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
 | |
|     // TODO: rate limit old blob DB file reads.
 | |
|     if (reader->use_direct_io()) {
 | |
|       s = reader->Read(IOOptions(), record_offset,
 | |
|                        static_cast<size_t>(record_size), &blob_record, nullptr,
 | |
|                        &aligned_buf, Env::IO_TOTAL /* rate_limiter_priority */);
 | |
|     } else {
 | |
|       buf.reserve(static_cast<size_t>(record_size));
 | |
|       s = reader->Read(IOOptions(), record_offset,
 | |
|                        static_cast<size_t>(record_size), &blob_record, &buf[0],
 | |
|                        nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
 | |
|     }
 | |
|     RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size());
 | |
|   }
 | |
| 
 | |
|   if (!s.ok()) {
 | |
|     ROCKS_LOG_DEBUG(
 | |
|         db_options_.info_log,
 | |
|         "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
 | |
|         ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt ", status: '%s'",
 | |
|         file_number, offset, size, key.size(), s.ToString().c_str());
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   if (blob_record.size() != record_size) {
 | |
|     ROCKS_LOG_DEBUG(
 | |
|         db_options_.info_log,
 | |
|         "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
 | |
|         ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt
 | |
|         ", read %" ROCKSDB_PRIszt " bytes, expected %" PRIu64 " bytes",
 | |
|         file_number, offset, size, key.size(), blob_record.size(), record_size);
 | |
| 
 | |
|     return Status::Corruption("Failed to retrieve blob from blob index.");
 | |
|   }
 | |
| 
 | |
|   Slice crc_slice(blob_record.data(), sizeof(uint32_t));
 | |
|   Slice blob_value(blob_record.data() + sizeof(uint32_t) + key.size(),
 | |
|                    static_cast<size_t>(size));
 | |
| 
 | |
|   uint32_t crc_exp = 0;
 | |
|   if (!GetFixed32(&crc_slice, &crc_exp)) {
 | |
|     ROCKS_LOG_DEBUG(
 | |
|         db_options_.info_log,
 | |
|         "Unable to decode CRC from blob file %" PRIu64 ", blob_offset: %" PRIu64
 | |
|         ", blob_size: %" PRIu64 ", key size: %" ROCKSDB_PRIszt ", status: '%s'",
 | |
|         file_number, offset, size, key.size(), s.ToString().c_str());
 | |
|     return Status::Corruption("Unable to decode checksum.");
 | |
|   }
 | |
| 
 | |
|   uint32_t crc = crc32c::Value(blob_record.data() + sizeof(uint32_t),
 | |
|                                blob_record.size() - sizeof(uint32_t));
 | |
|   crc = crc32c::Mask(crc);  // Adjust for storage
 | |
|   if (crc != crc_exp) {
 | |
|     if (debug_level_ >= 2) {
 | |
|       ROCKS_LOG_ERROR(
 | |
|           db_options_.info_log,
 | |
|           "Blob crc mismatch file: %" PRIu64 " blob_offset: %" PRIu64
 | |
|           " blob_size: %" PRIu64 " key: %s status: '%s'",
 | |
|           file_number, offset, size,
 | |
|           key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
 | |
|     }
 | |
| 
 | |
|     return Status::Corruption("Corruption. Blob CRC mismatch");
 | |
|   }
 | |
| 
 | |
|   value->PinSelf(blob_value);
 | |
| 
 | |
|   return Status::OK();
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::Get(const ReadOptions& read_options,
 | |
|                        ColumnFamilyHandle* column_family, const Slice& key,
 | |
|                        PinnableSlice* value) {
 | |
|   return Get(read_options, column_family, key, value,
 | |
|              static_cast<uint64_t*>(nullptr) /*expiration*/);
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::Get(const ReadOptions& read_options,
 | |
|                        ColumnFamilyHandle* column_family, const Slice& key,
 | |
|                        PinnableSlice* value, uint64_t* expiration) {
 | |
|   StopWatch get_sw(clock_, statistics_, BLOB_DB_GET_MICROS);
 | |
|   RecordTick(statistics_, BLOB_DB_NUM_GET);
 | |
|   return GetImpl(read_options, column_family, key, value, expiration);
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::GetImpl(const ReadOptions& read_options,
 | |
|                            ColumnFamilyHandle* column_family, const Slice& key,
 | |
|                            PinnableSlice* value, uint64_t* expiration) {
 | |
|   if (column_family->GetID() != DefaultColumnFamily()->GetID()) {
 | |
|     return Status::NotSupported(
 | |
|         "Blob DB doesn't support non-default column family.");
 | |
|   }
 | |
|   // Get a snapshot to avoid blob file get deleted between we
 | |
|   // fetch and index entry and reading from the file.
 | |
|   // TODO(yiwu): For Get() retry if file not found would be a simpler strategy.
 | |
|   ReadOptions ro(read_options);
 | |
|   bool snapshot_created = SetSnapshotIfNeeded(&ro);
 | |
| 
 | |
|   PinnableSlice index_entry;
 | |
|   Status s;
 | |
|   bool is_blob_index = false;
 | |
|   DBImpl::GetImplOptions get_impl_options;
 | |
|   get_impl_options.column_family = column_family;
 | |
|   get_impl_options.value = &index_entry;
 | |
|   get_impl_options.is_blob_index = &is_blob_index;
 | |
|   s = db_impl_->GetImpl(ro, key, get_impl_options);
 | |
|   if (expiration != nullptr) {
 | |
|     *expiration = kNoExpiration;
 | |
|   }
 | |
|   RecordTick(statistics_, BLOB_DB_NUM_KEYS_READ);
 | |
|   if (s.ok()) {
 | |
|     if (is_blob_index) {
 | |
|       s = GetBlobValue(key, index_entry, value, expiration);
 | |
|     } else {
 | |
|       // The index entry is the value itself in this case.
 | |
|       value->PinSelf(index_entry);
 | |
|     }
 | |
|     RecordTick(statistics_, BLOB_DB_BYTES_READ, value->size());
 | |
|   }
 | |
|   if (snapshot_created) {
 | |
|     db_->ReleaseSnapshot(ro.snapshot);
 | |
|   }
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
 | |
|   if (aborted) {
 | |
|     return std::make_pair(false, -1);
 | |
|   }
 | |
| 
 | |
|   ReadLock rl(&mutex_);
 | |
| 
 | |
|   ROCKS_LOG_INFO(db_options_.info_log, "Starting Sanity Check");
 | |
|   ROCKS_LOG_INFO(db_options_.info_log, "Number of files %" ROCKSDB_PRIszt,
 | |
|                  blob_files_.size());
 | |
|   ROCKS_LOG_INFO(db_options_.info_log, "Number of open files %" ROCKSDB_PRIszt,
 | |
|                  open_ttl_files_.size());
 | |
| 
 | |
|   for (const auto& blob_file : open_ttl_files_) {
 | |
|     (void)blob_file;
 | |
|     assert(!blob_file->Immutable());
 | |
|   }
 | |
| 
 | |
|   for (const auto& pair : live_imm_non_ttl_blob_files_) {
 | |
|     const auto& blob_file = pair.second;
 | |
|     (void)blob_file;
 | |
|     assert(!blob_file->HasTTL());
 | |
|     assert(blob_file->Immutable());
 | |
|   }
 | |
| 
 | |
|   uint64_t now = EpochNow();
 | |
| 
 | |
|   for (auto blob_file_pair : blob_files_) {
 | |
|     auto blob_file = blob_file_pair.second;
 | |
|     std::ostringstream buf;
 | |
| 
 | |
|     buf << "Blob file " << blob_file->BlobFileNumber() << ", size "
 | |
|         << blob_file->GetFileSize() << ", blob count " << blob_file->BlobCount()
 | |
|         << ", immutable " << blob_file->Immutable();
 | |
| 
 | |
|     if (blob_file->HasTTL()) {
 | |
|       ExpirationRange expiration_range;
 | |
|       {
 | |
|         ReadLock file_lock(&blob_file->mutex_);
 | |
|         expiration_range = blob_file->GetExpirationRange();
 | |
|       }
 | |
|       buf << ", expiration range (" << expiration_range.first << ", "
 | |
|           << expiration_range.second << ")";
 | |
| 
 | |
|       if (!blob_file->Obsolete()) {
 | |
|         buf << ", expire in " << (expiration_range.second - now) << "seconds";
 | |
|       }
 | |
|     }
 | |
|     if (blob_file->Obsolete()) {
 | |
|       buf << ", obsolete at " << blob_file->GetObsoleteSequence();
 | |
|     }
 | |
|     buf << ".";
 | |
|     ROCKS_LOG_INFO(db_options_.info_log, "%s", buf.str().c_str());
 | |
|   }
 | |
| 
 | |
|   // reschedule
 | |
|   return std::make_pair(true, -1);
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile) {
 | |
|   TEST_SYNC_POINT("BlobDBImpl::CloseBlobFile");
 | |
|   assert(bfile);
 | |
|   assert(!bfile->Immutable());
 | |
|   assert(!bfile->Obsolete());
 | |
| 
 | |
|   if (bfile->HasTTL() || bfile == open_non_ttl_file_) {
 | |
|     write_mutex_.AssertHeld();
 | |
|   }
 | |
| 
 | |
|   ROCKS_LOG_INFO(db_options_.info_log,
 | |
|                  "Closing blob file %" PRIu64 ". Path: %s",
 | |
|                  bfile->BlobFileNumber(), bfile->PathName().c_str());
 | |
| 
 | |
|   const SequenceNumber sequence = GetLatestSequenceNumber();
 | |
| 
 | |
|   const Status s = bfile->WriteFooterAndCloseLocked(sequence);
 | |
| 
 | |
|   if (s.ok()) {
 | |
|     total_blob_size_ += BlobLogFooter::kSize;
 | |
|   } else {
 | |
|     bfile->MarkImmutable(sequence);
 | |
| 
 | |
|     ROCKS_LOG_ERROR(db_options_.info_log,
 | |
|                     "Failed to close blob file %" PRIu64 "with error: %s",
 | |
|                     bfile->BlobFileNumber(), s.ToString().c_str());
 | |
|   }
 | |
| 
 | |
|   if (bfile->HasTTL()) {
 | |
|     size_t erased __attribute__((__unused__));
 | |
|     erased = open_ttl_files_.erase(bfile);
 | |
|   } else {
 | |
|     if (bfile == open_non_ttl_file_) {
 | |
|       open_non_ttl_file_ = nullptr;
 | |
|     }
 | |
| 
 | |
|     const uint64_t blob_file_number = bfile->BlobFileNumber();
 | |
|     auto it = live_imm_non_ttl_blob_files_.lower_bound(blob_file_number);
 | |
|     assert(it == live_imm_non_ttl_blob_files_.end() ||
 | |
|            it->first != blob_file_number);
 | |
|     live_imm_non_ttl_blob_files_.insert(
 | |
|         it, std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
 | |
|                 blob_file_number, bfile));
 | |
|   }
 | |
| 
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile) {
 | |
|   write_mutex_.AssertHeld();
 | |
| 
 | |
|   // atomic read
 | |
|   if (bfile->GetFileSize() < bdb_options_.blob_file_size) {
 | |
|     return Status::OK();
 | |
|   }
 | |
| 
 | |
|   WriteLock lock(&mutex_);
 | |
|   WriteLock file_lock(&bfile->mutex_);
 | |
| 
 | |
|   assert(!bfile->Obsolete() || bfile->Immutable());
 | |
|   if (bfile->Immutable()) {
 | |
|     return Status::OK();
 | |
|   }
 | |
| 
 | |
|   return CloseBlobFile(bfile);
 | |
| }
 | |
| 
 | |
| void BlobDBImpl::ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
 | |
|                                   SequenceNumber obsolete_seq,
 | |
|                                   bool update_size) {
 | |
|   assert(blob_file->Immutable());
 | |
|   assert(!blob_file->Obsolete());
 | |
| 
 | |
|   // Should hold write lock of mutex_ or during DB open.
 | |
|   blob_file->MarkObsolete(obsolete_seq);
 | |
|   obsolete_files_.push_back(blob_file);
 | |
|   assert(total_blob_size_.load() >= blob_file->GetFileSize());
 | |
|   if (update_size) {
 | |
|     total_blob_size_ -= blob_file->GetFileSize();
 | |
|   }
 | |
| }
 | |
| 
 | |
| bool BlobDBImpl::VisibleToActiveSnapshot(
 | |
|     const std::shared_ptr<BlobFile>& bfile) {
 | |
|   assert(bfile->Obsolete());
 | |
| 
 | |
|   // We check whether the oldest snapshot is no less than the last sequence
 | |
|   // by the time the blob file become obsolete. If so, the blob file is not
 | |
|   // visible to all existing snapshots.
 | |
|   //
 | |
|   // If we keep track of the earliest sequence of the keys in the blob file,
 | |
|   // we could instead check if there's a snapshot falls in range
 | |
|   // [earliest_sequence, obsolete_sequence). But doing so will make the
 | |
|   // implementation more complicated.
 | |
|   SequenceNumber obsolete_sequence = bfile->GetObsoleteSequence();
 | |
|   SequenceNumber oldest_snapshot = kMaxSequenceNumber;
 | |
|   {
 | |
|     // Need to lock DBImpl mutex before access snapshot list.
 | |
|     InstrumentedMutexLock l(db_impl_->mutex());
 | |
|     auto& snapshots = db_impl_->snapshots();
 | |
|     if (!snapshots.empty()) {
 | |
|       oldest_snapshot = snapshots.oldest()->GetSequenceNumber();
 | |
|     }
 | |
|   }
 | |
|   bool visible = oldest_snapshot < obsolete_sequence;
 | |
|   if (visible) {
 | |
|     ROCKS_LOG_INFO(db_options_.info_log,
 | |
|                    "Obsolete blob file %" PRIu64 " (obsolete at %" PRIu64
 | |
|                    ") visible to oldest snapshot %" PRIu64 ".",
 | |
|                    bfile->BlobFileNumber(), obsolete_sequence, oldest_snapshot);
 | |
|   }
 | |
|   return visible;
 | |
| }
 | |
| 
 | |
| std::pair<bool, int64_t> BlobDBImpl::EvictExpiredFiles(bool aborted) {
 | |
|   if (aborted) {
 | |
|     return std::make_pair(false, -1);
 | |
|   }
 | |
| 
 | |
|   TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:0");
 | |
|   TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:1");
 | |
| 
 | |
|   std::vector<std::shared_ptr<BlobFile>> process_files;
 | |
|   uint64_t now = EpochNow();
 | |
|   {
 | |
|     ReadLock rl(&mutex_);
 | |
|     for (auto p : blob_files_) {
 | |
|       auto& blob_file = p.second;
 | |
|       ReadLock file_lock(&blob_file->mutex_);
 | |
|       if (blob_file->HasTTL() && !blob_file->Obsolete() &&
 | |
|           blob_file->GetExpirationRange().second <= now) {
 | |
|         process_files.push_back(blob_file);
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:2");
 | |
|   TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:3");
 | |
|   TEST_SYNC_POINT_CALLBACK("BlobDBImpl::EvictExpiredFiles:cb", nullptr);
 | |
| 
 | |
|   SequenceNumber seq = GetLatestSequenceNumber();
 | |
|   {
 | |
|     MutexLock l(&write_mutex_);
 | |
|     WriteLock lock(&mutex_);
 | |
|     for (auto& blob_file : process_files) {
 | |
|       WriteLock file_lock(&blob_file->mutex_);
 | |
| 
 | |
|       // Need to double check if the file is obsolete.
 | |
|       if (blob_file->Obsolete()) {
 | |
|         assert(blob_file->Immutable());
 | |
|         continue;
 | |
|       }
 | |
| 
 | |
|       if (!blob_file->Immutable()) {
 | |
|         CloseBlobFile(blob_file);
 | |
|       }
 | |
| 
 | |
|       assert(blob_file->Immutable());
 | |
| 
 | |
|       ObsoleteBlobFile(blob_file, seq, true /*update_size*/);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return std::make_pair(true, -1);
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::SyncBlobFiles() {
 | |
|   MutexLock l(&write_mutex_);
 | |
| 
 | |
|   std::vector<std::shared_ptr<BlobFile>> process_files;
 | |
|   {
 | |
|     ReadLock rl(&mutex_);
 | |
|     for (auto fitr : open_ttl_files_) {
 | |
|       process_files.push_back(fitr);
 | |
|     }
 | |
|     if (open_non_ttl_file_ != nullptr) {
 | |
|       process_files.push_back(open_non_ttl_file_);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   Status s;
 | |
|   for (auto& blob_file : process_files) {
 | |
|     s = blob_file->Fsync();
 | |
|     if (!s.ok()) {
 | |
|       ROCKS_LOG_ERROR(db_options_.info_log,
 | |
|                       "Failed to sync blob file %" PRIu64 ", status: %s",
 | |
|                       blob_file->BlobFileNumber(), s.ToString().c_str());
 | |
|       return s;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   s = dir_ent_->FsyncWithDirOptions(IOOptions(), nullptr, DirFsyncOptions());
 | |
|   if (!s.ok()) {
 | |
|     ROCKS_LOG_ERROR(db_options_.info_log,
 | |
|                     "Failed to sync blob directory, status: %s",
 | |
|                     s.ToString().c_str());
 | |
|   }
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| std::pair<bool, int64_t> BlobDBImpl::ReclaimOpenFiles(bool aborted) {
 | |
|   if (aborted) return std::make_pair(false, -1);
 | |
| 
 | |
|   if (open_file_count_.load() < kOpenFilesTrigger) {
 | |
|     return std::make_pair(true, -1);
 | |
|   }
 | |
| 
 | |
|   // in the future, we should sort by last_access_
 | |
|   // instead of closing every file
 | |
|   ReadLock rl(&mutex_);
 | |
|   for (auto const& ent : blob_files_) {
 | |
|     auto bfile = ent.second;
 | |
|     if (bfile->last_access_.load() == -1) continue;
 | |
| 
 | |
|     WriteLock lockbfile_w(&bfile->mutex_);
 | |
|     CloseRandomAccessLocked(bfile);
 | |
|   }
 | |
| 
 | |
|   return std::make_pair(true, -1);
 | |
| }
 | |
| 
 | |
| std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
 | |
|   if (aborted) {
 | |
|     return std::make_pair(false, -1);
 | |
|   }
 | |
| 
 | |
|   MutexLock delete_file_lock(&delete_file_mutex_);
 | |
|   if (disable_file_deletions_ > 0) {
 | |
|     return std::make_pair(true, -1);
 | |
|   }
 | |
| 
 | |
|   std::list<std::shared_ptr<BlobFile>> tobsolete;
 | |
|   {
 | |
|     WriteLock wl(&mutex_);
 | |
|     if (obsolete_files_.empty()) {
 | |
|       return std::make_pair(true, -1);
 | |
|     }
 | |
|     tobsolete.swap(obsolete_files_);
 | |
|   }
 | |
| 
 | |
|   bool file_deleted = false;
 | |
|   for (auto iter = tobsolete.begin(); iter != tobsolete.end();) {
 | |
|     auto bfile = *iter;
 | |
|     {
 | |
|       ReadLock lockbfile_r(&bfile->mutex_);
 | |
|       if (VisibleToActiveSnapshot(bfile)) {
 | |
|         ROCKS_LOG_INFO(db_options_.info_log,
 | |
|                        "Could not delete file due to snapshot failure %s",
 | |
|                        bfile->PathName().c_str());
 | |
|         ++iter;
 | |
|         continue;
 | |
|       }
 | |
|     }
 | |
|     ROCKS_LOG_INFO(db_options_.info_log,
 | |
|                    "Will delete file due to snapshot success %s",
 | |
|                    bfile->PathName().c_str());
 | |
| 
 | |
|     {
 | |
|       WriteLock wl(&mutex_);
 | |
|       blob_files_.erase(bfile->BlobFileNumber());
 | |
|     }
 | |
| 
 | |
|     Status s = DeleteDBFile(&(db_impl_->immutable_db_options()),
 | |
|                             bfile->PathName(), blob_dir_, true,
 | |
|                             /*force_fg=*/false);
 | |
|     if (!s.ok()) {
 | |
|       ROCKS_LOG_ERROR(db_options_.info_log,
 | |
|                       "File failed to be deleted as obsolete %s",
 | |
|                       bfile->PathName().c_str());
 | |
|       ++iter;
 | |
|       continue;
 | |
|     }
 | |
| 
 | |
|     file_deleted = true;
 | |
|     ROCKS_LOG_INFO(db_options_.info_log,
 | |
|                    "File deleted as obsolete from blob dir %s",
 | |
|                    bfile->PathName().c_str());
 | |
| 
 | |
|     iter = tobsolete.erase(iter);
 | |
|   }
 | |
| 
 | |
|   // directory change. Fsync
 | |
|   if (file_deleted) {
 | |
|     Status s = dir_ent_->FsyncWithDirOptions(
 | |
|         IOOptions(), nullptr,
 | |
|         DirFsyncOptions(DirFsyncOptions::FsyncReason::kFileDeleted));
 | |
|     if (!s.ok()) {
 | |
|       ROCKS_LOG_ERROR(db_options_.info_log, "Failed to sync dir %s: %s",
 | |
|                       blob_dir_.c_str(), s.ToString().c_str());
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   // put files back into obsolete if for some reason, delete failed
 | |
|   if (!tobsolete.empty()) {
 | |
|     WriteLock wl(&mutex_);
 | |
|     for (auto bfile : tobsolete) {
 | |
|       blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile));
 | |
|       obsolete_files_.push_front(bfile);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return std::make_pair(!aborted, -1);
 | |
| }
 | |
| 
 | |
| void BlobDBImpl::CopyBlobFiles(
 | |
|     std::vector<std::shared_ptr<BlobFile>>* bfiles_copy) {
 | |
|   ReadLock rl(&mutex_);
 | |
|   for (auto const& p : blob_files_) {
 | |
|     bfiles_copy->push_back(p.second);
 | |
|   }
 | |
| }
 | |
| 
 | |
| Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) {
 | |
|   auto* cfd =
 | |
|       static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
 | |
|           ->cfd();
 | |
|   // Get a snapshot to avoid blob file get deleted between we
 | |
|   // fetch and index entry and reading from the file.
 | |
|   ManagedSnapshot* own_snapshot = nullptr;
 | |
|   const Snapshot* snapshot = read_options.snapshot;
 | |
|   if (snapshot == nullptr) {
 | |
|     own_snapshot = new ManagedSnapshot(db_);
 | |
|     snapshot = own_snapshot->snapshot();
 | |
|   }
 | |
|   auto* iter = db_impl_->NewIteratorImpl(
 | |
|       read_options, cfd, snapshot->GetSequenceNumber(),
 | |
|       nullptr /*read_callback*/, true /*expose_blob_index*/);
 | |
|   return new BlobDBIterator(own_snapshot, iter, this, clock_, statistics_);
 | |
| }
 | |
| 
 | |
| Status DestroyBlobDB(const std::string& dbname, const Options& options,
 | |
|                      const BlobDBOptions& bdb_options) {
 | |
|   const ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
 | |
|   Env* env = soptions.env;
 | |
| 
 | |
|   Status status;
 | |
|   std::string blobdir;
 | |
|   blobdir = (bdb_options.path_relative) ? dbname + "/" + bdb_options.blob_dir
 | |
|                                         : bdb_options.blob_dir;
 | |
| 
 | |
|   std::vector<std::string> filenames;
 | |
|   if (env->GetChildren(blobdir, &filenames).ok()) {
 | |
|     for (const auto& f : filenames) {
 | |
|       uint64_t number;
 | |
|       FileType type;
 | |
|       if (ParseFileName(f, &number, &type) && type == kBlobFile) {
 | |
|         Status del = DeleteDBFile(&soptions, blobdir + "/" + f, blobdir, true,
 | |
|                                   /*force_fg=*/false);
 | |
|         if (status.ok() && !del.ok()) {
 | |
|           status = del;
 | |
|         }
 | |
|       }
 | |
|     }
 | |
|     // TODO: What to do if we cannot delete the directory?
 | |
|     env->DeleteDir(blobdir).PermitUncheckedError();
 | |
|   }
 | |
|   Status destroy = DestroyDB(dbname, options);
 | |
|   if (status.ok() && !destroy.ok()) {
 | |
|     status = destroy;
 | |
|   }
 | |
| 
 | |
|   return status;
 | |
| }
 | |
| 
 | |
| #ifndef NDEBUG
 | |
| Status BlobDBImpl::TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
 | |
|                                      PinnableSlice* value) {
 | |
|   return GetBlobValue(key, index_entry, value);
 | |
| }
 | |
| 
 | |
| void BlobDBImpl::TEST_AddDummyBlobFile(uint64_t blob_file_number,
 | |
|                                        SequenceNumber immutable_sequence) {
 | |
|   auto blob_file = std::make_shared<BlobFile>(this, blob_dir_, blob_file_number,
 | |
|                                               db_options_.info_log.get());
 | |
|   blob_file->MarkImmutable(immutable_sequence);
 | |
| 
 | |
|   blob_files_[blob_file_number] = blob_file;
 | |
|   live_imm_non_ttl_blob_files_[blob_file_number] = blob_file;
 | |
| }
 | |
| 
 | |
| std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
 | |
|   ReadLock l(&mutex_);
 | |
|   std::vector<std::shared_ptr<BlobFile>> blob_files;
 | |
|   for (auto& p : blob_files_) {
 | |
|     blob_files.emplace_back(p.second);
 | |
|   }
 | |
|   return blob_files;
 | |
| }
 | |
| 
 | |
| std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetLiveImmNonTTLFiles()
 | |
|     const {
 | |
|   ReadLock l(&mutex_);
 | |
|   std::vector<std::shared_ptr<BlobFile>> live_imm_non_ttl_files;
 | |
|   for (const auto& pair : live_imm_non_ttl_blob_files_) {
 | |
|     live_imm_non_ttl_files.emplace_back(pair.second);
 | |
|   }
 | |
|   return live_imm_non_ttl_files;
 | |
| }
 | |
| 
 | |
| std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetObsoleteFiles()
 | |
|     const {
 | |
|   ReadLock l(&mutex_);
 | |
|   std::vector<std::shared_ptr<BlobFile>> obsolete_files;
 | |
|   for (auto& bfile : obsolete_files_) {
 | |
|     obsolete_files.emplace_back(bfile);
 | |
|   }
 | |
|   return obsolete_files;
 | |
| }
 | |
| 
 | |
| void BlobDBImpl::TEST_DeleteObsoleteFiles() {
 | |
|   DeleteObsoleteFiles(false /*abort*/);
 | |
| }
 | |
| 
 | |
| Status BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {
 | |
|   MutexLock l(&write_mutex_);
 | |
|   WriteLock lock(&mutex_);
 | |
|   WriteLock file_lock(&bfile->mutex_);
 | |
| 
 | |
|   return CloseBlobFile(bfile);
 | |
| }
 | |
| 
 | |
| void BlobDBImpl::TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file,
 | |
|                                        SequenceNumber obsolete_seq,
 | |
|                                        bool update_size) {
 | |
|   return ObsoleteBlobFile(blob_file, obsolete_seq, update_size);
 | |
| }
 | |
| 
 | |
| void BlobDBImpl::TEST_EvictExpiredFiles() {
 | |
|   EvictExpiredFiles(false /*abort*/);
 | |
| }
 | |
| 
 | |
| uint64_t BlobDBImpl::TEST_live_sst_size() { return live_sst_size_.load(); }
 | |
| 
 | |
| void BlobDBImpl::TEST_InitializeBlobFileToSstMapping(
 | |
|     const std::vector<LiveFileMetaData>& live_files) {
 | |
|   InitializeBlobFileToSstMapping(live_files);
 | |
| }
 | |
| 
 | |
| void BlobDBImpl::TEST_ProcessFlushJobInfo(const FlushJobInfo& info) {
 | |
|   ProcessFlushJobInfo(info);
 | |
| }
 | |
| 
 | |
| void BlobDBImpl::TEST_ProcessCompactionJobInfo(const CompactionJobInfo& info) {
 | |
|   ProcessCompactionJobInfo(info);
 | |
| }
 | |
| 
 | |
| #endif  //  !NDEBUG
 | |
| 
 | |
| }  // namespace blob_db
 | |
| }  // namespace ROCKSDB_NAMESPACE
 | |
| #endif  // ROCKSDB_LITE
 | |
| 
 |