@ -413,7 +413,7 @@ Status BlobDBImpl::OpenAllFiles() {
expiration_range . second ) ;
}
} else {
open_blob _files_ . insert ( bfptr ) ;
open_ttl _files_ . insert ( bfptr ) ;
}
}
}
@ -493,23 +493,23 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
std : : shared_ptr < BlobFile > BlobDBImpl : : FindBlobFileLocked (
uint64_t expiration ) const {
if ( open_blob _files_ . empty ( ) ) return nullptr ;
if ( open_ttl _files_ . empty ( ) ) return nullptr ;
std : : shared_ptr < BlobFile > tmp = std : : make_shared < BlobFile > ( ) ;
tmp - > expiration_range_ = std : : make_pair ( expiration , 0 ) ;
auto citr = open_blob _files_ . equal_range ( tmp ) ;
if ( citr . first = = open_blob _files_ . end ( ) ) {
assert ( citr . second = = open_blob _files_ . end ( ) ) ;
auto citr = open_ttl _files_ . equal_range ( tmp ) ;
if ( citr . first = = open_ttl _files_ . end ( ) ) {
assert ( citr . second = = open_ttl _files_ . end ( ) ) ;
std : : shared_ptr < BlobFile > check = * ( open_blob _files_ . rbegin ( ) ) ;
std : : shared_ptr < BlobFile > check = * ( open_ttl _files_ . rbegin ( ) ) ;
return ( check - > expiration_range_ . second < expiration ) ? nullptr : check ;
}
if ( citr . first ! = citr . second ) return * ( citr . first ) ;
auto finditr = citr . second ;
if ( finditr ! = open_blob _files_ . begin ( ) ) - - finditr ;
if ( finditr ! = open_ttl _files_ . begin ( ) ) - - finditr ;
bool b2 = ( * finditr ) - > expiration_range_ . second < expiration ;
bool b1 = ( * finditr ) - > expiration_range_ . first > expiration ;
@ -530,11 +530,17 @@ std::shared_ptr<Writer> BlobDBImpl::CheckOrCreateWriterLocked(
}
std : : shared_ptr < BlobFile > BlobDBImpl : : SelectBlobFile ( ) {
uint32_t val = blob_rgen . Next ( ) ;
{
ReadLock rl ( & mutex_ ) ;
if ( open_simple_files_ . size ( ) = = bdb_options_ . num_concurrent_simple_blobs )
return open_simple_files_ [ val % bdb_options_ . num_concurrent_simple_blobs ] ;
if ( open_non_ttl_file_ ! = nullptr ) {
return open_non_ttl_file_ ;
}
}
// CHECK again
WriteLock wl ( & mutex_ ) ;
if ( open_non_ttl_file_ ! = nullptr ) {
return open_non_ttl_file_ ;
}
std : : shared_ptr < BlobFile > bfile = NewBlobFile ( " SelectBlobFile " ) ;
@ -557,12 +563,6 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFile() {
bfile - > header_valid_ = true ;
bfile - > SetHasTTL ( false ) ;
// CHECK again
WriteLock wl ( & mutex_ ) ;
if ( open_simple_files_ . size ( ) = = bdb_options_ . num_concurrent_simple_blobs ) {
return open_simple_files_ [ val % bdb_options_ . num_concurrent_simple_blobs ] ;
}
Status s = writer - > WriteHeader ( bfile - > header_ ) ;
if ( ! s . ok ( ) ) {
ROCKS_LOG_ERROR ( db_options_ . info_log ,
@ -574,7 +574,7 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFile() {
dir_change_ . store ( true ) ;
blob_files_ . insert ( std : : make_pair ( bfile - > BlobFileNumber ( ) , bfile ) ) ;
open_simple_files_ . push_back ( bfile ) ;
open_non_ttl_file_ = bfile ;
return bfile ;
}
@ -625,7 +625,7 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) {
bfile - > file_size_ = BlobLogHeader : : kSize ;
// set the first value of the range, since that is
// concrete at this time. also necessary to add to open_blob _files_
// concrete at this time. also necessary to add to open_ttl _files_
bfile - > expiration_range_ = expiration_range ;
WriteLock wl ( & mutex_ ) ;
@ -647,7 +647,7 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) {
dir_change_ . store ( true ) ;
blob_files_ . insert ( std : : make_pair ( bfile - > BlobFileNumber ( ) , bfile ) ) ;
open_blob _files_ . insert ( bfile ) ;
open_ttl _files_ . insert ( bfile ) ;
epoch_of_ + + ;
return bfile ;
@ -1192,9 +1192,9 @@ std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
blob_files_ . size ( ) ) ;
ROCKS_LOG_INFO ( db_options_ . info_log , " Number of open files % " PRIu64 ,
open_blob _files_ . size ( ) ) ;
open_ttl _files_ . size ( ) ) ;
for ( auto bfile : open_blob _files_ ) {
for ( auto bfile : open_ttl _files_ ) {
assert ( ! bfile - > Immutable ( ) ) ;
}
@ -1215,6 +1215,7 @@ std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
}
Status BlobDBImpl : : CloseBlobFile ( std : : shared_ptr < BlobFile > bfile ) {
assert ( bfile ! = nullptr ) ;
Status s ;
ROCKS_LOG_INFO ( db_options_ . info_log , " Close blob file % " PRIu64 ,
bfile - > BlobFileNumber ( ) ) ;
@ -1223,13 +1224,11 @@ Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile) {
if ( bfile - > HasTTL ( ) ) {
size_t erased __attribute__ ( ( __unused__ ) ) ;
erased = open_blob _files_ . erase ( bfile ) ;
erased = open_ttl _files_ . erase ( bfile ) ;
assert ( erased = = 1 ) ;
} else {
auto iter = std : : find ( open_simple_files_ . begin ( ) ,
open_simple_files_ . end ( ) , bfile ) ;
assert ( iter ! = open_simple_files_ . end ( ) ) ;
open_simple_files_ . erase ( iter ) ;
assert ( bfile = = open_non_ttl_file_ ) ;
open_non_ttl_file_ = nullptr ;
}
}
@ -1412,7 +1411,7 @@ std::pair<bool, int64_t> BlobDBImpl::CheckSeqFiles(bool aborted) {
uint64_t epoch_now = EpochNow ( ) ;
ReadLock rl ( & mutex_ ) ;
for ( auto bfile : open_blob _files_ ) {
for ( auto bfile : open_ttl _files_ ) {
{
ReadLock lockbfile_r ( & bfile - > mutex_ ) ;
@ -1437,14 +1436,14 @@ std::pair<bool, int64_t> BlobDBImpl::FsyncFiles(bool aborted) {
std : : vector < std : : shared_ptr < BlobFile > > process_files ;
{
ReadLock rl ( & mutex_ ) ;
for ( auto fitr : open_blob _files_ ) {
for ( auto fitr : open_ttl _files_ ) {
if ( fitr - > NeedsFsync ( true , bdb_options_ . bytes_per_sync ) )
process_files . push_back ( fitr ) ;
}
for ( auto fitr : open_simple_files_ ) {
if ( fitr - > NeedsFsync ( true , bdb_options_ . bytes_per_sync ) )
process_files . push_back ( fitr ) ;
if ( open_non_ttl_file_ ! = nullptr & &
open_non_ttl_file_ - > NeedsFsync ( true , bdb_options_ . bytes_per_sync ) ) {
process_files . push_back ( open_non_ttl_file_ ) ;
}
}
@ -1800,7 +1799,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
// but under the asusmption that this is only called when a
// file is Immutable, we can reduce the critical section
bool BlobDBImpl : : ShouldGCFile ( std : : shared_ptr < BlobFile > bfile , uint64_t now ,
bool is_oldest_simple_blob _file ,
bool is_oldest_non_ttl _file ,
std : : string * reason ) {
if ( bfile - > HasTTL ( ) ) {
ExpirationRange expiration_range = bfile - > GetExpirationRange ( ) ;
@ -1858,7 +1857,7 @@ bool BlobDBImpl::ShouldGCFile(std::shared_ptr<BlobFile> bfile, uint64_t now,
return false ;
}
if ( is_oldest_simple_blob _file ) {
if ( is_oldest_non_ttl _file ) {
* reason = " out of space and is the oldest simple blob file " ;
return true ;
}
@ -1924,72 +1923,6 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
return std : : make_pair ( ! aborted , - 1 ) ;
}
bool BlobDBImpl : : CallbackEvictsImpl ( std : : shared_ptr < BlobFile > bfile ) {
std : : shared_ptr < Reader > reader =
bfile - > OpenSequentialReader ( env_ , db_options_ , env_options_ ) ;
if ( ! reader ) {
ROCKS_LOG_ERROR (
db_options_ . info_log ,
" File sequential reader could not be opened for evict callback: %s " ,
bfile - > PathName ( ) . c_str ( ) ) ;
return false ;
}
ReadLock lockbfile_r ( & bfile - > mutex_ ) ;
BlobLogHeader header ;
Status s = reader - > ReadHeader ( & header ) ;
if ( ! s . ok ( ) ) {
ROCKS_LOG_ERROR (
db_options_ . info_log ,
" Failure to read header for blob-file during evict callback %s " ,
bfile - > PathName ( ) . c_str ( ) ) ;
return false ;
}
ColumnFamilyHandle * cfh =
db_impl_ - > GetColumnFamilyHandleUnlocked ( bfile - > column_family_id ( ) ) ;
BlobLogRecord record ;
Reader : : ReadLevel full = Reader : : kReadHeaderKeyBlob ;
while ( reader - > ReadRecord ( & record , full ) . ok ( ) ) {
bdb_options_ . gc_evict_cb_fn ( cfh , record . key , record . value ) ;
}
return true ;
}
std : : pair < bool , int64_t > BlobDBImpl : : RemoveTimerQ ( TimerQueue * tq ,
bool aborted ) {
WriteLock wl ( & mutex_ ) ;
for ( auto itr = cb_threads_ . begin ( ) ; itr ! = cb_threads_ . end ( ) ; + + itr ) {
if ( ( * itr ) . get ( ) ! = tq ) continue ;
cb_threads_ . erase ( itr ) ;
break ;
}
return std : : make_pair ( false , - 1 ) ;
}
std : : pair < bool , int64_t > BlobDBImpl : : CallbackEvicts (
TimerQueue * tq , std : : shared_ptr < BlobFile > bfile , bool aborted ) {
if ( aborted ) return std : : make_pair ( false , - 1 ) ;
bool succ = CallbackEvictsImpl ( bfile ) ;
if ( succ ) {
ROCKS_LOG_DEBUG ( db_options_ . info_log , " Eviction callbacks completed %s " ,
bfile - > PathName ( ) . c_str ( ) ) ;
}
WriteLock wl ( & mutex_ ) ;
bfile - > SetCanBeDeleted ( ) ;
obsolete_files_ . push_front ( bfile ) ;
if ( tq ) {
// all of the callbacks have been processed
tqueue_ . add ( 0 , std : : bind ( & BlobDBImpl : : RemoveTimerQ , this , tq ,
std : : placeholders : : _1 ) ) ;
}
return std : : make_pair ( false , - 1 ) ;
}
void BlobDBImpl : : CopyBlobFiles (
std : : vector < std : : shared_ptr < BlobFile > > * bfiles_copy ) {
ReadLock rl ( & mutex_ ) ;
@ -2011,7 +1944,7 @@ void BlobDBImpl::FilterSubsetOfFiles(
uint64_t now = EpochNow ( ) ;
size_t files_processed = 0 ;
bool simple_blob _file_found = false ;
bool non_ttl _file_found = false ;
for ( auto bfile : blob_files ) {
if ( files_processed > = files_to_collect ) break ;
// if this is the first time processing the file
@ -2031,15 +1964,14 @@ void BlobDBImpl::FilterSubsetOfFiles(
// then it should not be GC'd
if ( bfile - > Obsolete ( ) | | ! bfile - > Immutable ( ) ) continue ;
bool is_oldest_simple_blob _file = false ;
if ( ! simple_blob _file_found & & ! bfile - > HasTTL ( ) ) {
is_oldest_simple_blob _file = true ;
simple_blob _file_found = true ;
bool is_oldest_non_ttl _file = false ;
if ( ! non_ttl _file_found & & ! bfile - > HasTTL ( ) ) {
is_oldest_non_ttl _file = true ;
non_ttl _file_found = true ;
}
std : : string reason ;
bool shouldgc =
ShouldGCFile ( bfile , now , is_oldest_simple_blob_file , & reason ) ;
bool shouldgc = ShouldGCFile ( bfile , now , is_oldest_non_ttl_file , & reason ) ;
if ( ! shouldgc ) {
ROCKS_LOG_DEBUG ( db_options_ . info_log ,
" File has been skipped for GC ttl %s % " PRIu64 " % " PRIu64
@ -2097,25 +2029,11 @@ std::pair<bool, int64_t> BlobDBImpl::RunGC(bool aborted) {
}
if ( ! obsoletes . empty ( ) ) {
bool evict_cb = ( ! ! bdb_options_ . gc_evict_cb_fn ) ;
std : : shared_ptr < TimerQueue > tq ;
if ( evict_cb ) tq = std : : make_shared < TimerQueue > ( ) ;
// if evict callback is present, first schedule the callback thread
WriteLock wl ( & mutex_ ) ;
for ( auto bfile : obsoletes ) {
bool last_file = ( bfile = = obsoletes . back ( ) ) ;
if ( ! evict_cb ) {
bfile - > SetCanBeDeleted ( ) ;
obsolete_files_ . push_front ( bfile ) ;
} else {
tq - > add ( 0 , std : : bind ( & BlobDBImpl : : CallbackEvicts , this ,
( last_file ) ? tq . get ( ) : nullptr , bfile ,
std : : placeholders : : _1 ) ) ;
}
bfile - > SetCanBeDeleted ( ) ;
obsolete_files_ . push_front ( bfile ) ;
}
if ( evict_cb ) cb_threads_ . emplace_back ( tq ) ;
}
// reschedule