@ -37,16 +37,6 @@
namespace {
int kBlockBasedTableVersionFormat = 2 ;
void extendTTL ( rocksdb : : blob_db : : ttlrange_t * ttl_range , uint64_t ttl ) {
ttl_range - > first = std : : min ( ttl_range - > first , ttl ) ;
ttl_range - > second = std : : max ( ttl_range - > second , ttl ) ;
}
void extendTimestamps ( rocksdb : : blob_db : : tsrange_t * ts_range , uint64_t ts ) {
ts_range - > first = std : : min ( ts_range - > first , ts ) ;
ts_range - > second = std : : max ( ts_range - > second , ts ) ;
}
} // end namespace
namespace rocksdb {
@ -66,10 +56,12 @@ WalFilter::WalProcessingOption BlobReconcileWalFilter::LogRecordFound(
// Ordering predicate for blob files: returns true when `lhs` should be placed
// before `rhs`.
// Keys are compared in this order:
//   1. ttl_range_.first, ascending
//   2. expiration_range_.first, ascending
//   3. BlobFileNumber(), descending (the larger file number sorts first)
// NOTE(review): the ttl_range_ and expiration_range_ checks look like the
// before/after sides of a field rename shown together -- confirm which pair
// is meant to remain.
bool blobf_compare_ttl : : operator ( ) ( const std : : shared_ptr < BlobFile > & lhs ,
const std : : shared_ptr < BlobFile > & rhs ) const {
// Primary key: start of the TTL range.
if ( lhs - > ttl_range_ . first < rhs - > ttl_range_ . first ) return true ;
if ( lhs - > ttl_range_ . first > rhs - > ttl_range_ . first ) return false ;
// Next key: start of the expiration range.
if ( lhs - > expiration_range_ . first < rhs - > expiration_range_ . first ) {
return true ;
}
if ( lhs - > expiration_range_ . first > rhs - > expiration_range_ . first ) {
return false ;
}
// All range starts are equal: break the tie on file number, larger first.
return lhs - > BlobFileNumber ( ) > rhs - > BlobFileNumber ( ) ;
}
@ -332,6 +324,7 @@ Status BlobDBImpl::OpenAllFiles() {
bfpath . c_str ( ) , s1 . ToString ( ) . c_str ( ) , size_bytes ) ;
continue ;
}
bfptr - > SetHasTTL ( bfptr - > header_ . has_ttl ) ;
bfptr - > header_valid_ = true ;
std : : shared_ptr < RandomAccessFileReader > ra_reader =
@ -355,9 +348,7 @@ Status BlobDBImpl::OpenAllFiles() {
" File found incomplete (w/o footer) %s " , bfpath . c_str ( ) ) ;
// sequentially iterate over the file and read all the records
ttlrange_t ttl_range ( std : : numeric_limits < uint32_t > : : max ( ) ,
std : : numeric_limits < uint32_t > : : min ( ) ) ;
tsrange_t ts_range ( std : : numeric_limits < uint32_t > : : max ( ) ,
ExpirationRange expiration_range ( std : : numeric_limits < uint32_t > : : max ( ) ,
std : : numeric_limits < uint32_t > : : min ( ) ) ;
uint64_t blob_count = 0 ;
@ -369,10 +360,10 @@ Status BlobDBImpl::OpenAllFiles() {
while ( reader - > ReadRecord ( & record , shallow ) . ok ( ) ) {
+ + blob_count ;
if ( bfptr - > HasTTL ( ) ) {
extendTTL ( & ttl_range , record . GetTTL ( ) ) ;
}
if ( bfptr - > HasTimestamp ( ) ) {
extendTimestamps ( & ts_range , record . GetTimeVal ( ) ) ;
expiration_range . first =
std : : min ( expiration_range . first , record . expiration ) ;
expiration_range . second =
std : : max ( expiration_range . second , record . expiration ) ;
}
record_start = reader - > GetNextByte ( ) ;
}
@ -391,24 +382,21 @@ Status BlobDBImpl::OpenAllFiles() {
}
bfptr - > SetBlobCount ( blob_count ) ;
bfptr - > SetSNRange ( { 0 , 0 } ) ;
if ( bfptr - > HasTimestamp ( ) ) bfptr - > set_time_range ( ts_range ) ;
bfptr - > SetSequenceRange ( { 0 , 0 } ) ;
ROCKS_LOG_INFO ( db_options_ . info_log ,
" Blob File: %s blob_count: % " PRIu64
" size_bytes: % " PRIu64 " ts: %d ttl: %d " ,
bfpath . c_str ( ) , blob_count , size_bytes ,
bfptr - > HasTimestamp ( ) , bfptr - > HasTTL ( ) ) ;
" size_bytes: % " PRIu64 " has_ttl: %d " ,
bfpath . c_str ( ) , blob_count , size_bytes , bfptr - > HasTTL ( ) ) ;
if ( bfptr - > HasTTL ( ) ) {
ttl _range. second =
std : : max ( ttl _range. second ,
ttl _range . first + ( uint32_t ) bdb_options_ . ttl_range_secs ) ;
bfptr - > set_ttl_range ( ttl _range ) ;
expiration _range. second = std : : max (
expiration _range. second ,
expiration _range. first + ( uint32_t ) bdb_options_ . ttl_range_secs ) ;
bfptr - > set_expiration_range ( expiration _range ) ;
uint64_t now = EpochNow ( ) ;
if ( ttl _range. second < now ) {
if ( expiration _range. second < now ) {
Status fstatus = CreateWriterLocked ( bfptr ) ;
if ( fstatus . ok ( ) ) fstatus = bfptr - > WriteFooterAndCloseLocked ( ) ;
if ( ! fstatus . ok ( ) ) {
@ -418,10 +406,11 @@ Status BlobDBImpl::OpenAllFiles() {
bfpath . c_str ( ) , fstatus . ToString ( ) . c_str ( ) ) ;
continue ;
} else {
ROCKS_LOG_ERROR ( db_options_ . info_log ,
" Blob File Closed: %s now: %d ttl_range: (%d, %d) " ,
bfpath . c_str ( ) , now , ttl_range . first ,
ttl_range . second ) ;
ROCKS_LOG_ERROR (
db_options_ . info_log ,
" Blob File Closed: %s now: %d expiration_range: (%d, %d) " ,
bfpath . c_str ( ) , now , expiration_range . first ,
expiration_range . second ) ;
}
} else {
open_blob_files_ . insert ( bfptr ) ;
@ -483,9 +472,9 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
}
Writer : : ElemType et = Writer : : kEtNone ;
if ( bfile - > file_size_ = = BlobLogHeader : : kHeader Size ) {
if ( bfile - > file_size_ = = BlobLogHeader : : kSize ) {
et = Writer : : kEtFileHdr ;
} else if ( bfile - > file_size_ > BlobLogHeader : : kHeader Size ) {
} else if ( bfile - > file_size_ > BlobLogHeader : : kSize ) {
et = Writer : : kEtRecord ;
} else if ( bfile - > file_size_ ) {
ROCKS_LOG_WARN ( db_options_ . info_log ,
@ -507,14 +496,14 @@ std::shared_ptr<BlobFile> BlobDBImpl::FindBlobFileLocked(
if ( open_blob_files_ . empty ( ) ) return nullptr ;
std : : shared_ptr < BlobFile > tmp = std : : make_shared < BlobFile > ( ) ;
tmp - > ttl _range_ = std : : make_pair ( expiration , 0 ) ;
tmp - > expiration _range_ = std : : make_pair ( expiration , 0 ) ;
auto citr = open_blob_files_ . equal_range ( tmp ) ;
if ( citr . first = = open_blob_files_ . end ( ) ) {
assert ( citr . second = = open_blob_files_ . end ( ) ) ;
std : : shared_ptr < BlobFile > check = * ( open_blob_files_ . rbegin ( ) ) ;
return ( check - > ttl _range_. second < expiration ) ? nullptr : check ;
return ( check - > expiration _range_. second < expiration ) ? nullptr : check ;
}
if ( citr . first ! = citr . second ) return * ( citr . first ) ;
@ -522,8 +511,8 @@ std::shared_ptr<BlobFile> BlobDBImpl::FindBlobFileLocked(
auto finditr = citr . second ;
if ( finditr ! = open_blob_files_ . begin ( ) ) - - finditr ;
bool b2 = ( * finditr ) - > ttl _range_. second < expiration ;
bool b1 = ( * finditr ) - > ttl _range_. first > expiration ;
bool b2 = ( * finditr ) - > expiration _range_. second < expiration ;
bool b1 = ( * finditr ) - > expiration _range_. first > expiration ;
return ( b1 | | b2 ) ? nullptr : ( * finditr ) ;
}
@ -560,9 +549,11 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFile() {
return nullptr ;
}
bfile - > file_size_ = BlobLogHeader : : kHeaderSize ;
bfile - > header_ . compression_ = bdb_options_ . compression ;
bfile - > file_size_ = BlobLogHeader : : kSize ;
bfile - > header_ . compression = bdb_options_ . compression ;
bfile - > header_ . has_ttl = false ;
bfile - > header_valid_ = true ;
bfile - > SetHasTTL ( false ) ;
// CHECK again
WriteLock wl ( & mutex_ ) ;
@ -603,7 +594,7 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) {
uint64_t exp_low =
( expiration / bdb_options_ . ttl_range_secs ) * bdb_options_ . ttl_range_secs ;
uint64_t exp_high = exp_low + bdb_options_ . ttl_range_secs ;
ttlrange_t ttl_guess = std : : make_pair ( exp_low , exp_high ) ;
ExpirationRange expiration_range = std : : make_pair ( exp_low , exp_high ) ;
bfile = NewBlobFile ( " SelectBlobFileTTL " ) ;
assert ( bfile ) ;
@ -621,14 +612,16 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) {
return nullptr ;
}
bfile - > header_ . set_ttl_guess ( ttl_guess ) ;
bfile - > header_ . compression_ = bdb_options_ . compression ;
bfile - > header_ . expiration_range = expiration_range ;
bfile - > header_ . compression = bdb_options_ . compression ;
bfile - > header_ . has_ttl = true ;
bfile - > header_valid_ = true ;
bfile - > file_size_ = BlobLogHeader : : kHeaderSize ;
bfile - > SetHasTTL ( true ) ;
bfile - > file_size_ = BlobLogHeader : : kSize ;
// set the first value of the range, since that is
// concrete at this time. also necessary to add to open_blob_files_
bfile - > ttl_range_ = ttl_guess ;
bfile - > expiration_range_ = expiration_range ;
WriteLock wl ( & mutex_ ) ;
// in case the epoch has shifted in the interim, then check
@ -878,8 +871,7 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& options, const Slice& key,
Slice value_compressed = GetCompressedSlice ( value , & compression_output ) ;
std : : string headerbuf ;
Writer : : ConstructBlobHeader ( & headerbuf , key , value_compressed , expiration ,
- 1 ) ;
Writer : : ConstructBlobHeader ( & headerbuf , key , value_compressed , expiration ) ;
s = AppendBlob ( bfile , headerbuf , key , value_compressed , expiration ,
& index_entry ) ;
@ -887,7 +879,7 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& options, const Slice& key,
if ( s . ok ( ) ) {
bfile - > ExtendSequenceRange ( sequence ) ;
if ( expiration ! = kNoExpiration ) {
extendTTL ( & ( bfile - > ttl_range_ ) , expiration ) ;
bfile - > ExtendExpirationRange ( expiration ) ;
}
s = CloseBlobFileIfNeeded ( bfile ) ;
if ( s . ok ( ) ) {
@ -1045,7 +1037,7 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
// later from the Blob Header, which needs to be also a
// valid offset.
if ( blob_index . offset ( ) <
( BlobLogHeader : : kHeader Size + BlobLogRecord : : kHeaderSize + key . size ( ) ) ) {
( BlobLogHeader : : kSize + BlobLogRecord : : kHeaderSize + key . size ( ) ) ) {
if ( debug_level_ > = 2 ) {
ROCKS_LOG_ERROR ( db_options_ . info_log ,
" Invalid blob index file_number: % " PRIu64
@ -1085,7 +1077,9 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
valueptr = & value_c ;
}
// allocate the buffer. This is safe in C++11
// Allocate the buffer. This is safe in C++11
// Note that std::string::reserve() does not work, since previous value
// of the buffer can be larger than blob_index.size().
valueptr - > resize ( blob_index . size ( ) ) ;
char * buffer = & ( * valueptr ) [ 0 ] ;
@ -1103,6 +1097,7 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
return Status : : NotFound ( " Blob Not Found as couldnt retrieve Blob " ) ;
}
// TODO(yiwu): Add an option to skip crc checking.
Slice crc_slice ;
uint32_t crc_exp ;
std : : string crc_str ;
@ -1121,7 +1116,8 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
return Status : : NotFound ( " Blob Not Found as couldnt retrieve CRC " ) ;
}
uint32_t crc = crc32c : : Extend ( 0 , blob_value . data ( ) , blob_value . size ( ) ) ;
uint32_t crc = crc32c : : Value ( key . data ( ) , key . size ( ) ) ;
crc = crc32c : : Extend ( crc , blob_value . data ( ) , blob_value . size ( ) ) ;
crc = crc32c : : Mask ( crc ) ; // Adjust for storage
if ( crc ! = crc_exp ) {
if ( debug_level_ > = 2 ) {
@ -1134,6 +1130,8 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
return Status : : Corruption ( " Corruption. Blob CRC mismatch " ) ;
}
// TODO(yiwu): Should use compression flag in the blob file instead of
// current compression option.
if ( bdb_options_ . compression ! = kNoCompression ) {
BlockContents contents ;
auto cfh = reinterpret_cast < ColumnFamilyHandleImpl * > ( DefaultColumnFamily ( ) ) ;
@ -1204,7 +1202,7 @@ std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
" Blob File %s % " PRIu64 " % " PRIu64 " % " PRIu64 " % " PRIu64 " % " PRIu64 ,
bfile - > PathName ( ) . c_str ( ) , bfile - > GetFileSize ( ) , bfile - > BlobCount ( ) ,
bfile - > deleted_count_ , bfile - > deleted_size_ ,
( bfile - > ttl _range_. second - epoch_now ) ) ;
( bfile - > expiration _range_. second - epoch_now ) ) ;
}
// reschedule
@ -1256,7 +1254,7 @@ bool BlobDBImpl::FileDeleteOk_SnapshotCheckLocked(
const std : : shared_ptr < BlobFile > & bfile ) {
assert ( bfile - > Obsolete ( ) ) ;
SequenceNumber esn = bfile - > GetSN Range ( ) . first ;
SequenceNumber esn = bfile - > GetSequence Range ( ) . first ;
// TODO(yiwu): Here we should check instead if there is an active snapshot
// lies between the first sequence in the file, and the last sequence by
@ -1413,7 +1411,7 @@ std::pair<bool, int64_t> BlobDBImpl::CheckSeqFiles(bool aborted) {
{
ReadLock lockbfile_r ( & bfile - > mutex_ ) ;
if ( bfile - > ttl _range_. second > epoch_now ) continue ;
if ( bfile - > expiration _range_. second > epoch_now ) continue ;
process_files . push_back ( bfile ) ;
}
}
@ -1587,22 +1585,24 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
bool first_gc = bfptr - > gc_once_after_open_ ;
auto * cfh = bfptr - > GetColumnFamily ( db_ ) ;
auto * cfh =
db_impl_ - > GetColumnFamilyHandleUnlocked ( bfptr - > column_family_id ( ) ) ;
auto * cfd = reinterpret_cast < ColumnFamilyHandleImpl * > ( cfh ) - > cfd ( ) ;
auto column_family_id = cfd - > GetID ( ) ;
bool has_ttl = header . HasTTL ( ) ;
bool has_ttl = header . has_ttl ;
// this reads the key but skips the blob
Reader : : ReadLevel shallow = Reader : : kReadHeaderKey ;
bool no_relocation_ttl = ( has_ttl & & now > = bfptr - > GetTTLRange ( ) . second ) ;
bool no_relocation_ttl =
( has_ttl & & now > = bfptr - > GetExpirationRange ( ) . second ) ;
bool no_relocation_lsmdel = false ;
{
ReadLock lockbfile_r ( & bfptr - > mutex_ ) ;
no_relocation_lsmdel = ( bfptr - > GetFileSize ( ) = =
( BlobLogHeader : : kHeaderSize + bfptr - > deleted_size_ +
BlobLogFooter : : kFooter Size ) ) ;
no_relocation_lsmdel =
( bfptr - > GetFileSize ( ) = =
( BlobLogHeader : : kSize + bfptr - > deleted_size_ + BlobLogFooter : : kSize ) ) ;
}
bool no_relocation = no_relocation_ttl | | no_relocation_lsmdel ;
@ -1641,7 +1641,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
bool is_blob_index = false ;
PinnableSlice index_entry ;
Status get_status = db_impl_ - > GetImpl (
ReadOptions ( ) , cfh , record . Key ( ) , & index_entry , nullptr /*value_found*/ ,
ReadOptions ( ) , cfh , record . key , & index_entry , nullptr /*value_found*/ ,
nullptr /*read_callback*/ , & is_blob_index ) ;
TEST_SYNC_POINT ( " BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB " ) ;
if ( ! get_status . ok ( ) & & ! get_status . ok ( ) ) {
@ -1672,15 +1672,15 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
continue ;
}
GarbageCollectionWriteCallback callback ( cfd , record . Key ( ) , latest_seq ) ;
GarbageCollectionWriteCallback callback ( cfd , record . key , latest_seq ) ;
// If key has expired, remove it from base DB.
if ( no_relocation_ttl | | ( has_ttl & & now > = record . GetTTL ( ) ) ) {
if ( no_relocation_ttl | | ( has_ttl & & now > = record . expiration ) ) {
gc_stats - > num_deletes + + ;
gc_stats - > deleted_size + = record . GetBlobSize ( ) ;
gc_stats - > deleted_size + = record . value_size ;
TEST_SYNC_POINT ( " BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete " ) ;
WriteBatch delete_batch ;
Status delete_status = delete_batch . Delete ( record . Key ( ) ) ;
Status delete_status = delete_batch . Delete ( record . key ) ;
if ( delete_status . ok ( ) ) {
delete_status = db_impl_ - > WriteWithCallback ( WriteOptions ( ) ,
& delete_batch , & callback ) ;
@ -1719,7 +1719,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
newfile - > header_ = std : : move ( header ) ;
// Can't use header beyond this point
newfile - > header_valid_ = true ;
newfile - > file_size_ = BlobLogHeader : : kHeader Size ;
newfile - > file_size_ = BlobLogHeader : : kSize ;
s = new_writer - > WriteHeader ( newfile - > header_ ) ;
if ( ! s . ok ( ) ) {
@ -1741,21 +1741,21 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
uint64_t new_blob_offset = 0 ;
uint64_t new_key_offset = 0 ;
// write the blob to the blob log.
s = new_writer - > AddRecord ( record . Key ( ) , record . Blob ( ) , & new_key_offset ,
& new_blob_offset , record . GetTTL ( ) ) ;
s = new_writer - > AddRecord ( record . key , record . value , record . expiration ,
& new_key_offset , & new_blob_offset ) ;
BlobIndex : : EncodeBlob ( & new_index_entry , newfile - > BlobFileNumber ( ) ,
new_blob_offset , record . Blob ( ) . size ( ) ,
new_blob_offset , record . value . size ( ) ,
bdb_options_ . compression ) ;
newfile - > blob_count_ + + ;
newfile - > file_size_ + =
BlobLogRecord : : kHeaderSize + record . Key ( ) . size ( ) + record . Blob ( ) . size ( ) ;
BlobLogRecord : : kHeaderSize + record . key . size ( ) + record . value . size ( ) ;
TEST_SYNC_POINT ( " BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate " ) ;
WriteBatch rewrite_batch ;
Status rewrite_status = WriteBatchInternal : : PutBlobIndex (
& rewrite_batch , column_family_id , record . Key ( ) , new_index_entry ) ;
& rewrite_batch , column_family_id , record . key , new_index_entry ) ;
if ( rewrite_status . ok ( ) ) {
rewrite_status = db_impl_ - > WriteWithCallback ( WriteOptions ( ) ,
& rewrite_batch , & callback ) ;
@ -1798,8 +1798,8 @@ bool BlobDBImpl::ShouldGCFile(std::shared_ptr<BlobFile> bfile, uint64_t now,
bool is_oldest_simple_blob_file ,
std : : string * reason ) {
if ( bfile - > HasTTL ( ) ) {
ttlrange_t ttl _range = bfile - > GetTTL Range ( ) ;
if ( now > ttl _range. second ) {
ExpirationRange expiration _range = bfile - > GetExpiration Range ( ) ;
if ( now > expiration _range. second ) {
* reason = " entire file ttl expired " ;
return true ;
}
@ -1942,11 +1942,12 @@ bool BlobDBImpl::CallbackEvictsImpl(std::shared_ptr<BlobFile> bfile) {
return false ;
}
ColumnFamilyHandle * cfh = bfile - > GetColumnFamily ( db_ ) ;
ColumnFamilyHandle * cfh =
db_impl_ - > GetColumnFamilyHandleUnlocked ( bfile - > column_family_id ( ) ) ;
BlobLogRecord record ;
Reader : : ReadLevel full = Reader : : kReadHeaderKeyBlob ;
while ( reader - > ReadRecord ( & record , full ) . ok ( ) ) {
bdb_options_ . gc_evict_cb_fn ( cfh , record . Key ( ) , record . Blob ( ) ) ;
bdb_options_ . gc_evict_cb_fn ( cfh , record . key , record . value ) ;
}
return true ;
@ -2039,15 +2040,15 @@ void BlobDBImpl::FilterSubsetOfFiles(
" File has been skipped for GC ttl %s % " PRIu64 " % " PRIu64
" reason='%s' " ,
bfile - > PathName ( ) . c_str ( ) , now ,
bfile - > GetTTL Range ( ) . second , reason . c_str ( ) ) ;
bfile - > GetExpiration Range ( ) . second , reason . c_str ( ) ) ;
continue ;
}
ROCKS_LOG_INFO ( db_options_ . info_log ,
" File has been chosen for GC ttl %s % " PRIu64 " % " PRIu64
" reason='%s' " ,
bfile - > PathName ( ) . c_str ( ) , now , bfile - > GetTTLRange ( ) . second ,
reason . c_str ( ) ) ;
bfile - > PathName ( ) . c_str ( ) , now ,
bfile - > GetExpirationRange ( ) . second , reason . c_str ( ) ) ;
to_process - > push_back ( bfile ) ;
}
}