@@ -1712,10 +1712,17 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
       gcstats->blob_count++;
 
       bool del_this = false;
+      bool reloc_this = false;
+
+      // TODO(yiwu): The following logic should use GetForUpdate() from
+      // optimistic transaction to check if the key is current, otherwise
+      // there can be another writer sneak in between sequence number of
+      // and the deletion.
+
       // this particular TTL has expired
       if (no_relocation_ttl || (has_ttl && tt > record.GetTTL())) {
         del_this = true;
-      } else {
+      } else if (!first_gc) {
         SequenceNumber seq = kMaxSequenceNumber;
         bool found_record_for_key = false;
         SuperVersion* sv = db_impl_->GetAndRefSuperVersion(cfd);
@@ -1726,8 +1733,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
         }
         Status s1 = db_impl_->GetLatestSequenceForKey(
             sv, record.Key(), false, &seq, &found_record_for_key);
-        if (s1.IsNotFound() || (!found_record_for_key || seq != record.GetSN())) {
-          del_this = true;
+        if (found_record_for_key && seq == record.GetSN()) {
+          reloc_this = true;
         }
         db_impl_->ReturnAndCleanupSuperVersion(cfd, sv);
       }
@@ -1749,77 +1756,76 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
           gcstats->overrided_while_delete++;
         }
         delete txn;
-        continue;
+      } else if (first_gc) {
+        continue;
       }
 
-      if (!newfile) {
-        // new file
-        std::string reason("GC of ");
-        reason += bfptr->PathName();
-        newfile = NewBlobFile(reason);
-        gcstats->newfile = newfile;
-
-        new_writer = CheckOrCreateWriterLocked(newfile);
-        newfile->header_ = std::move(header);
-        // Can't use header beyond this point
-        newfile->header_valid_ = true;
-        newfile->file_size_ = BlobLogHeader::kHeaderSize;
-        s = new_writer->WriteHeader(newfile->header_);
-
-        if (!s.ok()) {
-          ROCKS_LOG_ERROR(db_options_.info_log,
-                          "File: %s - header writing failed",
-                          newfile->PathName().c_str());
-          return s;
+      if (reloc_this) {
+        if (!newfile) {
+          // new file
+          std::string reason("GC of ");
+          reason += bfptr->PathName();
+          newfile = NewBlobFile(reason);
+          gcstats->newfile = newfile;
+
+          new_writer = CheckOrCreateWriterLocked(newfile);
+          newfile->header_ = std::move(header);
+          // Can't use header beyond this point
+          newfile->header_valid_ = true;
+          newfile->file_size_ = BlobLogHeader::kHeaderSize;
+          s = new_writer->WriteHeader(newfile->header_);
+
+          if (!s.ok()) {
+            ROCKS_LOG_ERROR(db_options_.info_log,
+                            "File: %s - header writing failed",
+                            newfile->PathName().c_str());
+            return s;
+          }
+
+          WriteLock wl(&mutex_);
+          dir_change_.store(true);
+          blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile));
         }
 
-        WriteLock wl(&mutex_);
+        gcstats->num_relocs++;
+        std::string index_entry;
 
-        dir_change_.store(true);
-        blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile));
-      }
+        uint64_t blob_offset = 0;
+        uint64_t key_offset = 0;
+        // write the blob to the blob log.
+        s = new_writer->AddRecord(record.Key(), record.Blob(), &key_offset,
+                                  &blob_offset, record.GetTTL());
 
-      gcstats->num_relocs++;
-      std::string index_entry;
+        BlobHandle handle;
+        handle.set_filenumber(newfile->BlobFileNumber());
+        handle.set_size(record.Blob().size());
+        handle.set_offset(blob_offset);
+        handle.set_compression(bdb_options_.compression);
+        handle.EncodeTo(&index_entry);
 
-      uint64_t blob_offset = 0;
-      uint64_t key_offset = 0;
-      // write the blob to the blob log.
-      s = new_writer->AddRecord(record.Key(), record.Blob(), &key_offset,
-                                &blob_offset, record.GetTTL());
+        new_writer->AddRecordFooter(record.GetSN());
+        newfile->blob_count_++;
+        newfile->file_size_ += BlobLogRecord::kHeaderSize + record.Key().size() +
+                               record.Blob().size() + BlobLogRecord::kFooterSize;
 
-      BlobHandle handle;
-      handle.set_filenumber(newfile->BlobFileNumber());
-      handle.set_size(record.Blob().size());
-      handle.set_offset(blob_offset);
-      handle.set_compression(bdb_options_.compression);
-      handle.EncodeTo(&index_entry);
-
-      new_writer->AddRecordFooter(record.GetSN());
-      newfile->blob_count_++;
-      newfile->file_size_ += BlobLogRecord::kHeaderSize + record.Key().size() +
-                             record.Blob().size() + BlobLogRecord::kFooterSize;
-
-      Transaction* txn = opt_db_->BeginTransaction(
-          write_options_, OptimisticTransactionOptions(), nullptr);
-      txn->Put(cfh, record.Key(), index_entry);
-      Status s1 = txn->Commit();
-
-      // chances that this Put will fail is low. If it fails, it would be because
-      // a new version of the key came in at this time, which will override
-      // the current version being iterated on.
-      if (s1.IsBusy()) {
-        ROCKS_LOG_INFO(db_options_.info_log,
-                       "Optimistic transaction failed: %s put bn: %" PRIu32,
-                       bfptr->PathName().c_str(), gcstats->blob_count);
-      } else {
-        gcstats->succ_relocs++;
-        ROCKS_LOG_DEBUG(db_options_.info_log,
-                        "Successfully added put back into LSM: %s bn: %" PRIu32,
-                        bfptr->PathName().c_str(), gcstats->blob_count);
+        Transaction* txn = opt_db_->BeginTransaction(
+            write_options_, OptimisticTransactionOptions(), nullptr);
+        txn->Put(cfh, record.Key(), index_entry);
+        Status s1 = txn->Commit();
+
+        // chances that this Put will fail is low. If it fails, it would be
+        // because a new version of the key came in at this time, which will
+        // override the current version being iterated on.
+        if (s1.IsBusy()) {
+          ROCKS_LOG_INFO(db_options_.info_log,
+                         "Optimistic transaction failed: %s put bn: %" PRIu32,
+                         bfptr->PathName().c_str(), gcstats->blob_count);
+        } else {
+          gcstats->succ_relocs++;
+          ROCKS_LOG_DEBUG(db_options_.info_log,
+                          "Successfully added put back into LSM: %s bn: %" PRIu32,
+                          bfptr->PathName().c_str(), gcstats->blob_count);
+        }
+        delete txn;
       }
-      delete txn;
     }
 
     if (gcstats->newfile) total_blob_space_ += newfile->file_size_;