@ -62,7 +62,7 @@ bool blobf_compare_ttl::operator()(const std::shared_ptr<BlobFile>& lhs,
if ( lhs - > expiration_range_ . first > rhs - > expiration_range_ . first ) {
if ( lhs - > expiration_range_ . first > rhs - > expiration_range_ . first ) {
return false ;
return false ;
}
}
return lhs - > BlobFileNumber ( ) > rhs - > BlobFileNumber ( ) ;
return lhs - > BlobFileNumber ( ) < rhs - > BlobFileNumber ( ) ;
}
}
void EvictAllVersionsCompactionListener : : InternalListener : : OnCompaction (
void EvictAllVersionsCompactionListener : : InternalListener : : OnCompaction (
@ -117,7 +117,8 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
total_periods_ampl_ ( 0 ) ,
total_periods_ampl_ ( 0 ) ,
total_blob_space_ ( 0 ) ,
total_blob_space_ ( 0 ) ,
open_p1_done_ ( false ) ,
open_p1_done_ ( false ) ,
debug_level_ ( 0 ) {
debug_level_ ( 0 ) ,
oldest_file_evicted_ ( false ) {
blob_dir_ = ( bdb_options_ . path_relative )
blob_dir_ = ( bdb_options_ . path_relative )
? dbname + " / " + bdb_options_ . blob_dir
? dbname + " / " + bdb_options_ . blob_dir
: bdb_options_ . blob_dir ;
: bdb_options_ . blob_dir ;
@ -171,7 +172,8 @@ BlobDBImpl::BlobDBImpl(DB* db, const BlobDBOptions& blob_db_options)
last_period_ampl_ ( 0 ) ,
last_period_ampl_ ( 0 ) ,
total_periods_write_ ( 0 ) ,
total_periods_write_ ( 0 ) ,
total_periods_ampl_ ( 0 ) ,
total_periods_ampl_ ( 0 ) ,
total_blob_space_ ( 0 ) {
total_blob_space_ ( 0 ) ,
oldest_file_evicted_ ( false ) {
if ( ! bdb_options_ . blob_dir . empty ( ) )
if ( ! bdb_options_ . blob_dir . empty ( ) )
blob_dir_ = ( bdb_options_ . path_relative )
blob_dir_ = ( bdb_options_ . path_relative )
? db_ - > GetName ( ) + " / " + bdb_options_ . blob_dir
? db_ - > GetName ( ) + " / " + bdb_options_ . blob_dir
@ -931,20 +933,56 @@ uint64_t BlobDBImpl::ExtractExpiration(const Slice& key, const Slice& value,
return has_expiration ? expiration : kNoExpiration ;
return has_expiration ? expiration : kNoExpiration ;
}
}
std : : shared_ptr < BlobFile > BlobDBImpl : : GetOldestBlobFile ( ) {
std : : vector < std : : shared_ptr < BlobFile > > blob_files ;
CopyBlobFiles ( & blob_files , [ ] ( const std : : shared_ptr < BlobFile > & f ) {
return ! f - > Obsolete ( ) & & f - > Immutable ( ) ;
} ) ;
blobf_compare_ttl compare ;
return * std : : min_element ( blob_files . begin ( ) , blob_files . end ( ) , compare ) ;
}
bool BlobDBImpl : : EvictOldestBlobFile ( ) {
auto oldest_file = GetOldestBlobFile ( ) ;
if ( oldest_file = = nullptr ) {
return false ;
}
WriteLock wl ( & mutex_ ) ;
oldest_file - > SetCanBeDeleted ( ) ;
obsolete_files_ . push_front ( oldest_file ) ;
oldest_file_evicted_ . store ( true ) ;
return true ;
}
Status BlobDBImpl : : CheckSize ( size_t blob_size ) {
uint64_t new_space_util = total_blob_space_ . load ( ) + blob_size ;
if ( bdb_options_ . blob_dir_size > 0 ) {
if ( ! bdb_options_ . is_fifo & &
( new_space_util > bdb_options_ . blob_dir_size ) ) {
return Status : : NoSpace (
" Write failed, as writing it would exceed blob_dir_size limit. " ) ;
}
if ( bdb_options_ . is_fifo & & ! oldest_file_evicted_ . load ( ) & &
( new_space_util >
kEvictOldestFileAtSize * bdb_options_ . blob_dir_size ) ) {
EvictOldestBlobFile ( ) ;
}
}
return Status : : OK ( ) ;
}
Status BlobDBImpl : : AppendBlob ( const std : : shared_ptr < BlobFile > & bfile ,
Status BlobDBImpl : : AppendBlob ( const std : : shared_ptr < BlobFile > & bfile ,
const std : : string & headerbuf , const Slice & key ,
const std : : string & headerbuf , const Slice & key ,
const Slice & value , uint64_t expiration ,
const Slice & value , uint64_t expiration ,
std : : string * index_entry ) {
std : : string * index_entry ) {
auto size_put = BlobLogRecord : : kHeaderSize + key . size ( ) + value . size ( ) ;
auto size_put = BlobLogRecord : : kHeaderSize + key . size ( ) + value . size ( ) ;
if ( bdb_options_ . blob_dir_size > 0 & &
Status s = CheckSize ( size_put ) ;
( total_blob_space_ . load ( ) + size_put ) > bdb_options_ . blob_dir_size ) {
if ( ! s . ok ( ) ) {
if ( ! bdb_options_ . is_fifo ) {
return s ;
return Status : : NoSpace ( " Blob DB reached the maximum configured size. " ) ;
}
}
}
Status s ;
uint64_t blob_offset = 0 ;
uint64_t blob_offset = 0 ;
uint64_t key_offset = 0 ;
uint64_t key_offset = 0 ;
{
{
@ -1910,7 +1948,12 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
}
}
// directory change. Fsync
// directory change. Fsync
if ( file_deleted ) dir_ent_ - > Fsync ( ) ;
if ( file_deleted ) {
dir_ent_ - > Fsync ( ) ;
// reset oldest_file_evicted flag
oldest_file_evicted_ . store ( false ) ;
}
// put files back into obsolete if for some reason, delete failed
// put files back into obsolete if for some reason, delete failed
if ( ! tobsolete . empty ( ) ) {
if ( ! tobsolete . empty ( ) ) {
@ -1924,14 +1967,19 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
}
}
void BlobDBImpl : : CopyBlobFiles (
void BlobDBImpl : : CopyBlobFiles (
std : : vector < std : : shared_ptr < BlobFile > > * bfiles_copy ) {
std : : vector < std : : shared_ptr < BlobFile > > * bfiles_copy ,
std : : function < bool ( const std : : shared_ptr < BlobFile > & ) > predicate ) {
ReadLock rl ( & mutex_ ) ;
ReadLock rl ( & mutex_ ) ;
// take a copy
bfiles_copy - > reserve ( blob_files_ . size ( ) ) ;
for ( auto const & p : blob_files_ ) {
for ( auto const & p : blob_files_ ) {
bool pred_value = true ;
if ( predicate ) {
pred_value = predicate ( p . second ) ;
}
if ( pred_value ) {
bfiles_copy - > push_back ( p . second ) ;
bfiles_copy - > push_back ( p . second ) ;
}
}
}
}
}
void BlobDBImpl : : FilterSubsetOfFiles (
void BlobDBImpl : : FilterSubsetOfFiles (