@ -10,6 +10,7 @@
# include <cinttypes>
# include <iomanip>
# include <memory>
# include <sstream>
# include "db/blob_index.h"
# include "db/db_impl/db_impl.h"
@ -82,6 +83,7 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
env_options_ ( db_options ) ,
statistics_ ( db_options_ . statistics . get ( ) ) ,
next_file_number_ ( 1 ) ,
flush_sequence_ ( 0 ) ,
epoch_of_ ( 0 ) ,
closed_ ( true ) ,
open_file_count_ ( 0 ) ,
@ -206,6 +208,8 @@ Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
InitializeBlobFileToSstMapping ( live_files ) ;
MarkUnreferencedBlobFilesObsoleteDuringOpen ( ) ;
if ( ! disable_auto_compactions ) {
s = db_ - > EnableAutoCompaction ( * handles ) ;
if ( ! s . ok ( ) ) {
@ -288,23 +292,25 @@ Status BlobDBImpl::OpenAllBlobFiles() {
next_file_number_ . store ( * file_numbers . rbegin ( ) + 1 ) ;
}
std : : string blob_file_list ;
std : : string obsolete_file_list ;
std : : ostringstream blob_file_oss ;
std : : ostringstream live_imm_oss ;
std : : ostringstream obsolete_file_oss ;
for ( auto & file_number : file_numbers ) {
std : : shared_ptr < BlobFile > blob_file = std : : make_shared < BlobFile > (
this , blob_dir_ , file_number , db_options_ . info_log . get ( ) ) ;
blob_file - > MarkImmutable ( ) ;
blob_file - > MarkImmutable ( /* sequence */ 0 ) ;
// Read file header and footer
Status read_metadata_status = blob_file - > ReadMetadata ( env_ , env_options_ ) ;
if ( read_metadata_status . IsCorruption ( ) ) {
// Remove incomplete file.
ObsoleteBlobFile ( blob_file , 0 /*obsolete_seq*/ , false /*update_size*/ ) ;
if ( ! obsolete_file_list . empty ( ) ) {
obsolete_file_list . append ( " , " ) ;
if ( ! obsolete_files_ . empty ( ) ) {
obsolete_file_oss < < " , " ;
}
obsolete_file_list . append ( ToString ( file_number ) ) ;
obsolete_file_oss < < file_number ;
ObsoleteBlobFile ( blob_file , 0 /*obsolete_seq*/ , false /*update_size*/ ) ;
continue ;
} else if ( ! read_metadata_status . ok ( ) ) {
ROCKS_LOG_ERROR ( db_options_ . info_log ,
@ -316,20 +322,33 @@ Status BlobDBImpl::OpenAllBlobFiles() {
total_blob_size_ + = blob_file - > GetFileSize ( ) ;
if ( ! blob_files_ . empty ( ) ) {
blob_file_oss < < " , " ;
}
blob_file_oss < < file_number ;
blob_files_ [ file_number ] = blob_file ;
if ( ! blob_file_list . empty ( ) ) {
blob_file_list . append ( " , " ) ;
if ( ! blob_file - > HasTTL ( ) ) {
if ( ! live_imm_non_ttl_blob_files_ . empty ( ) ) {
live_imm_oss < < " , " ;
}
live_imm_oss < < file_number ;
live_imm_non_ttl_blob_files_ [ file_number ] = blob_file ;
}
blob_file_list . append ( ToString ( file_number ) ) ;
}
ROCKS_LOG_INFO ( db_options_ . info_log ,
" Found % " ROCKSDB_PRIszt " blob files: %s " , blob_files_ . size ( ) ,
blob_file_list . c_str ( ) ) ;
blob_file_oss . str ( ) . c_str ( ) ) ;
ROCKS_LOG_INFO (
db_options_ . info_log , " Found % " ROCKSDB_PRIszt " non-TTL blob files: %s " ,
live_imm_non_ttl_blob_files_ . size ( ) , live_imm_oss . str ( ) . c_str ( ) ) ;
ROCKS_LOG_INFO ( db_options_ . info_log ,
" Found % " ROCKSDB_PRIszt
" incomplete or corrupted blob files: %s " ,
obsolete_files_ . size ( ) , obsolete_file_list . c_str ( ) ) ;
obsolete_files_ . size ( ) , obsolete_file_oss . str ( ) . c_str ( ) ) ;
return s ;
}
@ -426,14 +445,16 @@ void BlobDBImpl::InitializeBlobFileToSstMapping(
void BlobDBImpl : : ProcessFlushJobInfo ( const FlushJobInfo & info ) {
assert ( bdb_options_ . enable_garbage_collection ) ;
if ( info . oldest_blob_file_number = = kInvalidBlobFileNumber ) {
return ;
}
WriteLock lock ( & mutex_ ) ;
{
ReadLock lock ( & mutex_ ) ;
if ( info . oldest_blob_file_number ! = kInvalidBlobFileNumber ) {
LinkSstToBlobFile ( info . file_number , info . oldest_blob_file_number ) ;
}
assert ( flush_sequence_ < info . largest_seqno ) ;
flush_sequence_ = info . largest_seqno ;
MarkUnreferencedBlobFilesObsolete ( ) ;
}
void BlobDBImpl : : ProcessCompactionJobInfo ( const CompactionJobInfo & info ) {
@ -443,27 +464,107 @@ void BlobDBImpl::ProcessCompactionJobInfo(const CompactionJobInfo& info) {
// file list in case of a trivial move. We process the inputs first
// to ensure the blob file still has a link after processing all updates.
{
ReadLock lock ( & mutex_ ) ;
WriteLock lock ( & mutex_ ) ;
for ( const auto & input : info . input_file_infos ) {
if ( input . oldest_blob_file_number = = kInvalidBlobFileNumber ) {
continue ;
}
for ( const auto & input : info . input_file_infos ) {
if ( input . oldest_blob_file_number = = kInvalidBlobFileNumber ) {
continue ;
}
UnlinkSstFromBlobFile ( input . file_number , input . oldest_blob_file_number ) ;
UnlinkSstFromBlobFile ( input . file_number , input . oldest_blob_file_number ) ;
}
for ( const auto & output : info . output_file_infos ) {
if ( output . oldest_blob_file_number = = kInvalidBlobFileNumber ) {
continue ;
}
for ( const auto & output : info . output_file_infos ) {
if ( output . oldest_blob_file_number = = kInvalidBlobFileNumber ) {
continue ;
}
LinkSstToBlobFile ( output . file_number , output . oldest_blob_file_number ) ;
}
LinkSstToBlobFile ( output . file_number , output . oldest_blob_file_number ) ;
MarkUnreferencedBlobFilesObsolete ( ) ;
}
bool BlobDBImpl : : MarkBlobFileObsoleteIfNeeded (
const std : : shared_ptr < BlobFile > & blob_file , SequenceNumber obsolete_seq ) {
assert ( blob_file ) ;
assert ( ! blob_file - > HasTTL ( ) ) ;
assert ( blob_file - > Immutable ( ) ) ;
assert ( bdb_options_ . enable_garbage_collection ) ;
// Note: FIFO eviction could have marked this file obsolete already.
if ( blob_file - > Obsolete ( ) ) {
return true ;
}
// We cannot mark this file (or any higher-numbered files for that matter)
// obsolete if it is referenced by any memtables or SSTs. We keep track of
// the SSTs explicitly. To account for memtables, we keep track of the highest
// sequence number received in flush notifications, and we do not mark the
// blob file obsolete if there are still unflushed memtables from before
// the time the blob file was closed.
if ( blob_file - > GetImmutableSequence ( ) > flush_sequence_ | |
! blob_file - > GetLinkedSstFiles ( ) . empty ( ) ) {
return false ;
}
ROCKS_LOG_INFO ( db_options_ . info_log ,
" Blob file % " PRIu64 " is no longer needed, marking obsolete " ,
blob_file - > BlobFileNumber ( ) ) ;
ObsoleteBlobFile ( blob_file , obsolete_seq , /* update_size */ true ) ;
return true ;
}
template < class Functor >
void BlobDBImpl : : MarkUnreferencedBlobFilesObsoleteImpl ( Functor mark_if_needed ) {
assert ( bdb_options_ . enable_garbage_collection ) ;
// Iterate through all live immutable non-TTL blob files, and mark them
// obsolete assuming no SST files or memtables rely on the blobs in them.
// Note: we need to stop as soon as we find a blob file that has any
// linked SSTs (or one potentially referenced by memtables).
auto it = live_imm_non_ttl_blob_files_ . begin ( ) ;
while ( it ! = live_imm_non_ttl_blob_files_ . end ( ) ) {
const auto & blob_file = it - > second ;
assert ( blob_file ) ;
assert ( blob_file - > BlobFileNumber ( ) = = it - > first ) ;
assert ( ! blob_file - > HasTTL ( ) ) ;
assert ( blob_file - > Immutable ( ) ) ;
// Small optimization: Obsolete() does an atomic read, so we can do
// this check without taking a lock on the blob file's mutex.
if ( blob_file - > Obsolete ( ) ) {
it = live_imm_non_ttl_blob_files_ . erase ( it ) ;
continue ;
}
if ( ! mark_if_needed ( blob_file ) ) {
break ;
}
it = live_imm_non_ttl_blob_files_ . erase ( it ) ;
}
}
void BlobDBImpl : : MarkUnreferencedBlobFilesObsolete ( ) {
const SequenceNumber obsolete_seq = GetLatestSequenceNumber ( ) ;
MarkUnreferencedBlobFilesObsoleteImpl (
[ = ] ( const std : : shared_ptr < BlobFile > & blob_file ) {
WriteLock file_lock ( & blob_file - > mutex_ ) ;
return MarkBlobFileObsoleteIfNeeded ( blob_file , obsolete_seq ) ;
} ) ;
}
void BlobDBImpl : : MarkUnreferencedBlobFilesObsoleteDuringOpen ( ) {
MarkUnreferencedBlobFilesObsoleteImpl (
[ = ] ( const std : : shared_ptr < BlobFile > & blob_file ) {
return MarkBlobFileObsoleteIfNeeded ( blob_file , /* obsolete_seq */ 0 ) ;
} ) ;
}
void BlobDBImpl : : CloseRandomAccessLocked (
const std : : shared_ptr < BlobFile > & bfile ) {
bfile - > CloseRandomAccessLocked ( ) ;
@ -1041,11 +1142,12 @@ Status BlobDBImpl::CheckSizeAndEvictBlobFiles(uint64_t blob_size,
WriteLock file_lock ( & blob_file - > mutex_ ) ;
if ( blob_file - > Obsolete ( ) ) {
// File already obsoleted by someone else.
assert ( blob_file - > Immutable ( ) ) ;
continue ;
}
// FIFO eviction can evict open blob files.
if ( ! blob_file - > Immutable ( ) ) {
Status s = CloseBlobFile ( blob_file , false /*need_lock*/ ) ;
Status s = CloseBlobFile ( blob_file ) ;
if ( ! s . ok ( ) ) {
return s ;
}
@ -1380,8 +1482,16 @@ std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
ROCKS_LOG_INFO ( db_options_ . info_log , " Number of open files % " ROCKSDB_PRIszt ,
open_ttl_files_ . size ( ) ) ;
for ( auto bfile : open_ttl_files_ ) {
assert ( ! bfile - > Immutable ( ) ) ;
for ( const auto & blob_file : open_ttl_files_ ) {
( void ) blob_file ;
assert ( ! blob_file - > Immutable ( ) ) ;
}
for ( const auto & pair : live_imm_non_ttl_blob_files_ ) {
const auto & blob_file = pair . second ;
( void ) blob_file ;
assert ( ! blob_file - > HasTTL ( ) ) ;
assert ( blob_file - > Immutable ( ) ) ;
}
uint64_t now = EpochNow ( ) ;
@ -1423,58 +1533,75 @@ std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
return std : : make_pair ( true , - 1 ) ;
}
Status BlobDBImpl : : CloseBlobFile ( std : : shared_ptr < BlobFile > bfile ,
bool need_lock ) {
assert ( bfile ! = nullptr ) ;
Status BlobDBImpl : : CloseBlobFile ( std : : shared_ptr < BlobFile > bfile ) {
assert ( bfile ) ;
assert ( ! bfile - > Immutable ( ) ) ;
assert ( ! bfile - > Obsolete ( ) ) ;
write_mutex_ . AssertHeld ( ) ;
Status s ;
ROCKS_LOG_INFO ( db_options_ . info_log ,
" Closing blob file % " PRIu64 " . Path: %s " ,
bfile - > BlobFileNumber ( ) , bfile - > PathName ( ) . c_str ( ) ) ;
{
std : : unique_ptr < WriteLock > lock ;
if ( need_lock ) {
lock . reset ( new WriteLock ( & mutex_ ) ) ;
}
if ( bfile - > HasTTL ( ) ) {
size_t erased __attribute__ ( ( __unused__ ) ) ;
erased = open_ttl_files_ . erase ( bfile ) ;
} else if ( bfile = = open_non_ttl_file_ ) {
open_non_ttl_file_ = nullptr ;
}
}
const SequenceNumber sequence = GetLatestSequenceNumber ( ) ;
if ( ! bfile - > closed_ . load ( ) ) {
std : : unique_ptr < WriteLock > file_lock ;
if ( need_lock ) {
file_lock . reset ( new WriteLock ( & bfile - > mutex_ ) ) ;
}
s = bfile - > WriteFooterAndCloseLocked ( ) ;
}
const Status s = bfile - > WriteFooterAndCloseLocked ( sequence ) ;
if ( s . ok ( ) ) {
total_blob_size_ + = BlobLogFooter : : kSize ;
} else {
bfile - > MarkImmutable ( sequence ) ;
ROCKS_LOG_ERROR ( db_options_ . info_log ,
" Failed to close blob file % " PRIu64 " with error: %s " ,
bfile - > BlobFileNumber ( ) , s . ToString ( ) . c_str ( ) ) ;
}
if ( bfile - > HasTTL ( ) ) {
size_t erased __attribute__ ( ( __unused__ ) ) ;
erased = open_ttl_files_ . erase ( bfile ) ;
} else {
if ( bfile = = open_non_ttl_file_ ) {
open_non_ttl_file_ = nullptr ;
}
const uint64_t blob_file_number = bfile - > BlobFileNumber ( ) ;
auto it = live_imm_non_ttl_blob_files_ . lower_bound ( blob_file_number ) ;
assert ( it = = live_imm_non_ttl_blob_files_ . end ( ) | |
it - > first ! = blob_file_number ) ;
live_imm_non_ttl_blob_files_ . insert (
it , std : : map < uint64_t , std : : shared_ptr < BlobFile > > : : value_type (
blob_file_number , bfile ) ) ;
}
return s ;
}
Status BlobDBImpl : : CloseBlobFileIfNeeded ( std : : shared_ptr < BlobFile > & bfile ) {
write_mutex_ . AssertHeld ( ) ;
// atomic read
if ( bfile - > GetFileSize ( ) < bdb_options_ . blob_file_size ) {
return Status : : OK ( ) ;
}
WriteLock lock ( & mutex_ ) ;
WriteLock file_lock ( & bfile - > mutex_ ) ;
assert ( ! bfile - > Obsolete ( ) | | bfile - > Immutable ( ) ) ;
if ( bfile - > Immutable ( ) ) {
return Status : : OK ( ) ;
}
return CloseBlobFile ( bfile ) ;
}
void BlobDBImpl : : ObsoleteBlobFile ( std : : shared_ptr < BlobFile > blob_file ,
SequenceNumber obsolete_seq ,
bool update_size ) {
assert ( blob_file - > Immutable ( ) ) ;
assert ( ! blob_file - > Obsolete ( ) ) ;
// Should hold write lock of mutex_ or during DB open.
blob_file - > MarkObsolete ( obsolete_seq ) ;
obsolete_files_ . push_back ( blob_file ) ;
@ -1545,15 +1672,23 @@ std::pair<bool, int64_t> BlobDBImpl::EvictExpiredFiles(bool aborted) {
SequenceNumber seq = GetLatestSequenceNumber ( ) ;
{
MutexLock l ( & write_mutex_ ) ;
WriteLock lock ( & mutex_ ) ;
for ( auto & blob_file : process_files ) {
WriteLock file_lock ( & blob_file - > mutex_ ) ;
if ( ! blob_file - > Immutable ( ) ) {
CloseBlobFile ( blob_file , false /*need_lock*/ ) ;
}
// Need to double check if the file is obsolete.
if ( ! blob_file - > Obsolete ( ) ) {
ObsoleteBlobFile ( blob_file , seq , true /*update_size*/ ) ;
if ( blob_file - > Obsolete ( ) ) {
assert ( blob_file - > Immutable ( ) ) ;
continue ;
}
if ( ! blob_file - > Immutable ( ) ) {
CloseBlobFile ( blob_file ) ;
}
assert ( blob_file - > Immutable ( ) ) ;
ObsoleteBlobFile ( blob_file , seq , true /*update_size*/ ) ;
}
}
@ -1918,6 +2053,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
if ( newfile ! = nullptr ) {
{
MutexLock l ( & write_mutex_ ) ;
WriteLock lock ( & mutex_ ) ;
WriteLock file_lock ( & newfile - > mutex_ ) ;
CloseBlobFile ( newfile ) ;
}
total_blob_size_ + = newfile - > file_size_ ;
@ -2092,9 +2229,14 @@ Status BlobDBImpl::TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
return GetBlobValue ( key , index_entry , value ) ;
}
void BlobDBImpl : : TEST_AddDummyBlobFile ( uint64_t blob_file_number ) {
blob_files_ [ blob_file_number ] = std : : make_shared < BlobFile > (
this , blob_dir_ , blob_file_number , db_options_ . info_log . get ( ) ) ;
void BlobDBImpl : : TEST_AddDummyBlobFile ( uint64_t blob_file_number ,
SequenceNumber immutable_sequence ) {
auto blob_file = std : : make_shared < BlobFile > ( this , blob_dir_ , blob_file_number ,
db_options_ . info_log . get ( ) ) ;
blob_file - > MarkImmutable ( immutable_sequence ) ;
blob_files_ [ blob_file_number ] = blob_file ;
live_imm_non_ttl_blob_files_ [ blob_file_number ] = blob_file ;
}
std : : vector < std : : shared_ptr < BlobFile > > BlobDBImpl : : TEST_GetBlobFiles ( ) const {
@ -2106,6 +2248,16 @@ std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
return blob_files ;
}
std : : vector < std : : shared_ptr < BlobFile > > BlobDBImpl : : TEST_GetLiveImmNonTTLFiles ( )
const {
ReadLock l ( & mutex_ ) ;
std : : vector < std : : shared_ptr < BlobFile > > live_imm_non_ttl_files ;
for ( const auto & pair : live_imm_non_ttl_blob_files_ ) {
live_imm_non_ttl_files . emplace_back ( pair . second ) ;
}
return live_imm_non_ttl_files ;
}
std : : vector < std : : shared_ptr < BlobFile > > BlobDBImpl : : TEST_GetObsoleteFiles ( )
const {
ReadLock l ( & mutex_ ) ;
@ -2122,6 +2274,9 @@ void BlobDBImpl::TEST_DeleteObsoleteFiles() {
Status BlobDBImpl : : TEST_CloseBlobFile ( std : : shared_ptr < BlobFile > & bfile ) {
MutexLock l ( & write_mutex_ ) ;
WriteLock lock ( & mutex_ ) ;
WriteLock file_lock ( & bfile - > mutex_ ) ;
return CloseBlobFile ( bfile ) ;
}