@ -28,6 +28,7 @@
# include "util/crc32c.h"
# include "util/file_reader_writer.h"
# include "util/filename.h"
# include "util/logging.h"
# include "util/random.h"
# include "util/timer_queue.h"
# include "utilities/transactions/optimistic_transaction_db_impl.h"
@ -174,20 +175,21 @@ void EvictAllVersionsCompactionListener::InternalListener::OnCompaction(
Status s = handle . DecodeFrom ( & lsmval ) ;
if ( s . ok ( ) ) {
if ( impl_ - > debug_level_ > = 3 )
Log ( InfoLogLevel : : INFO_LEVEL , impl_ - > db_options_ . info_log ,
" CALLBACK COMPACTED OUT KEY: %s SN: %d "
" NEW: %d FN: % " PRIu64 " OFFSET: % " PRIu64 " SIZE: % " PRIu64 ,
key . ToString ( ) . c_str ( ) , sn , is_new , handle . filenumber ( ) ,
handle . offset ( ) , handle . size ( ) ) ;
ROCKS_LOG_INFO ( impl_ - > db_options_ . info_log ,
" CALLBACK COMPACTED OUT KEY: %s SN: %d "
" NEW: %d FN: % " PRIu64 " OFFSET: % " PRIu64
" SIZE: % " PRIu64 ,
key . ToString ( ) . c_str ( ) , sn , is_new , handle . filenumber ( ) ,
handle . offset ( ) , handle . size ( ) ) ;
impl_ - > override_vals_q_ . enqueue ( { handle . filenumber ( ) , key . size ( ) ,
handle . offset ( ) , handle . size ( ) , sn } ) ;
}
} else {
if ( impl_ - > debug_level_ > = 3 )
Log ( InfoLogLevel : : INFO_LEVEL , impl_ - > db_options_ . info_log ,
" CALLBACK NEW KEY: %s SN: %d NEW: %d " , key . ToString ( ) . c_str ( ) , sn ,
is_new ) ;
ROCKS_LOG_INFO ( impl_ - > db_options_ . info_log ,
" CALLBACK NEW KEY: %s SN: %d NEW: %d " ,
key . ToString ( ) . c_str ( ) , sn , is_new ) ;
}
}
@ -248,15 +250,15 @@ Status BlobDBImpl::LinkToBaseDB(DB* db) {
Status s = myenv_ - > CreateDirIfMissing ( blob_dir_ ) ;
if ( ! s . ok ( ) ) {
Log ( InfoLogLevel : : WARN_LEVEL , db_options_ . info_log ,
" Failed to create blob directory: %s status: '%s' " , blob_dir_ . c_str ( ) ,
s . ToString ( ) . c_str ( ) ) ;
ROCKS_LOG_WARN ( db_options_ . info_log ,
" Failed to create blob directory: %s status: '%s' " ,
blob_dir_ . c_str ( ) , s . ToString ( ) . c_str ( ) ) ;
}
s = myenv_ - > NewDirectory ( blob_dir_ , & dir_ent_ ) ;
if ( ! s . ok ( ) ) {
Log ( InfoLogLevel : : WARN_LEVEL , db_options_ . info_log ,
" Failed to open blob directory: %s status: '%s' " , blob_dir_ . c_str ( ) ,
s . ToString ( ) . c_str ( ) ) ;
ROCKS_LOG_WARN ( db_options_ . info_log ,
" Failed to open blob directory: %s status: '%s' " ,
blob_dir_ . c_str ( ) , s . ToString ( ) . c_str ( ) ) ;
}
if ( ! bdb_options_ . disable_background_tasks ) {
@ -317,9 +319,9 @@ Status BlobDBImpl::OpenPhase1() {
std : : unique_ptr < Directory > dir_ent ;
Status s = myenv_ - > NewDirectory ( blob_dir_ , & dir_ent ) ;
if ( ! s . ok ( ) ) {
Log ( InfoLogLevel : : WARN_LEVEL , db_options_ . info_log ,
" Failed to open blob directory: %s status: '%s' " , blob_dir_ . c_str ( ) ,
s . ToString ( ) . c_str ( ) ) ;
ROCKS_LOG_WARN ( db_options_ . info_log ,
" Failed to open blob directory: %s status: '%s' " ,
blob_dir_ . c_str ( ) , s . ToString ( ) . c_str ( ) ) ;
open_p1_done_ = true ;
return Status : : OK ( ) ;
}
@ -382,9 +384,9 @@ Status BlobDBImpl::GetAllLogFiles(
if ( psucc & & type = = kBlobFile ) {
file_nums - > insert ( std : : make_pair ( number , f ) ) ;
} else {
Log ( InfoLogLevel : : WARN_LEVEL , db_options_ . info_log ,
" Skipping file in blob directory %s parse: %d type: %d " , f . c_str ( ) ,
psucc , ( ( psucc ) ? type : - 1 ) ) ;
ROCKS_LOG_WARN ( db_options_ . info_log ,
" Skipping file in blob directory %s parse: %d type: %d " ,
f . c_str ( ) , psucc , ( ( psucc ) ? type : - 1 ) ) ;
}
}
@ -398,17 +400,18 @@ Status BlobDBImpl::OpenAllFiles() {
Status status = GetAllLogFiles ( & file_nums ) ;
if ( ! status . ok ( ) ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
" Failed to collect files from blob dir: %s status: '%s' " ,
blob_dir_ . c_str ( ) , status . ToString ( ) . c_str ( ) ) ;
ROCKS_LOG_ERROR ( db_options_ . info_log ,
" Failed to collect files from blob dir: %s status: '%s' " ,
blob_dir_ . c_str ( ) , status . ToString ( ) . c_str ( ) ) ;
return status ;
}
Log ( InfoLogLevel : : INFO_LEVEL , db_options_ . info_log ,
" BlobDir files path: %s count: %d min: % " PRIu64 " max: % " PRIu64 ,
blob_dir_ . c_str ( ) , static_cast < int > ( file_nums . size ( ) ) ,
( file_nums . empty ( ) ) ? - 1 : ( file_nums . begin ( ) ) - > first ,
( file_nums . empty ( ) ) ? - 1 : ( file_nums . end ( ) ) - > first ) ;
ROCKS_LOG_INFO ( db_options_ . info_log ,
" BlobDir files path: %s count: %d min: % " PRIu64
" max: % " PRIu64 ,
blob_dir_ . c_str ( ) , static_cast < int > ( file_nums . size ( ) ) ,
( file_nums . empty ( ) ) ? - 1 : ( file_nums . begin ( ) ) - > first ,
( file_nums . empty ( ) ) ? - 1 : ( file_nums . end ( ) ) - > first ) ;
if ( ! file_nums . empty ( ) )
next_file_number_ . store ( ( file_nums . rbegin ( ) ) - > first + 1 ) ;
@ -418,15 +421,16 @@ Status BlobDBImpl::OpenAllFiles() {
uint64_t size_bytes ;
Status s1 = myenv_ - > GetFileSize ( bfpath , & size_bytes ) ;
if ( ! s1 . ok ( ) ) {
Log ( InfoLogLevel : : WARN_LEVEL , db_options_ . info_log ,
ROCKS_LOG_WARN (
db_options_ . info_log ,
" Unable to get size of %s. File skipped from open status: '%s' " ,
bfpath . c_str ( ) , s1 . ToString ( ) . c_str ( ) ) ;
continue ;
}
if ( debug_level_ > = 1 )
Log ( InfoLogLevel : : INFO_LEVEL , db_options_ . info_log ,
" Blob File open: %s size: % " PRIu64 , bfpath . c_str ( ) , size_bytes ) ;
ROCKS_LOG_INFO ( db_options_ . info_log , " Blob File open: %s size: % " PRIu64 ,
bfpath . c_str ( ) , size_bytes ) ;
std : : shared_ptr < BlobFile > bfptr =
std : : make_shared < BlobFile > ( this , blob_dir_ , f_iter . first ) ;
@ -441,10 +445,10 @@ Status BlobDBImpl::OpenAllFiles() {
reader = bfptr - > OpenSequentialReader ( myenv_ , db_options_ , env_options_ ) ;
s1 = reader - > ReadHeader ( & bfptr - > header_ ) ;
if ( ! s1 . ok ( ) ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
" Failure to read header for blob-file %s "
" status: '%s' size: % " PRIu64 ,
bfpath . c_str ( ) , s1 . ToString ( ) . c_str ( ) , size_bytes ) ;
ROCKS_LOG_ERROR ( db_options_ . info_log ,
" Failure to read header for blob-file %s "
" status: '%s' size: % " PRIu64 ,
bfpath . c_str ( ) , s1 . ToString ( ) . c_str ( ) , size_bytes ) ;
continue ;
}
bfptr - > header_valid_ = true ;
@ -459,15 +463,15 @@ Status BlobDBImpl::OpenAllFiles() {
if ( s1 . ok ( ) ) {
s1 = bfptr - > SetFromFooterLocked ( bf ) ;
if ( ! s1 . ok ( ) ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
" Header Footer mismatch for blob-file %s "
" status: '%s' size: % " PRIu64 ,
bfpath . c_str ( ) , s1 . ToString ( ) . c_str ( ) , size_bytes ) ;
ROCKS_LOG_ERROR ( db_options_ . info_log ,
" Header Footer mismatch for blob-file %s "
" status: '%s' size: % " PRIu64 ,
bfpath . c_str ( ) , s1 . ToString ( ) . c_str ( ) , size_bytes ) ;
continue ;
}
} else {
Log ( InfoLogLevel : : INFO_LEVEL , db_options_ . info_log ,
" File found incomplete (w/o footer) %s " , bfpath . c_str ( ) ) ;
ROCKS_LOG_INFO ( db_options_ . info_log ,
" File found incomplete (w/o footer) %s " , bfpath . c_str ( ) ) ;
// sequentially iterate over the file and read all the records
ttlrange_t ttl_range ( std : : numeric_limits < uint32_t > : : max ( ) ,
@ -496,15 +500,15 @@ Status BlobDBImpl::OpenAllFiles() {
}
if ( record_start ! = bfptr - > GetFileSize ( ) ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
" Blob file is corrupted or crashed during write %s "
" good_size: % " PRIu64 " file_size: % " PRIu64 ,
bfpath . c_str ( ) , record_start , bfptr - > GetFileSize ( ) ) ;
ROCKS_LOG_ERROR ( db_options_ . info_log ,
" Blob file is corrupted or crashed during write %s "
" good_size: % " PRIu64 " file_size: % " PRIu64 ,
bfpath . c_str ( ) , record_start , bfptr - > GetFileSize ( ) ) ;
}
if ( ! blob_count ) {
Log ( InfoLogLevel : : INFO_LEVEL , db_options_ . info_log ,
" BlobCount = 0 in file %s " , bfpath . c_str ( ) ) ;
ROCKS_LOG_INFO ( db_options_ . info_log , " BlobCount = 0 in file %s " ,
bfpath . c_str ( ) ) ;
continue ;
}
@ -513,11 +517,12 @@ Status BlobDBImpl::OpenAllFiles() {
if ( bfptr - > HasTimestamp ( ) ) bfptr - > set_time_range ( ts_range ) ;
Log ( InfoLogLevel : : INFO_LEVEL , db_options_ . info_log ,
" Blob File: %s blob_count: % " PRIu64 " size_bytes: % " PRIu64
" sn_range: (%d, %d) ts: %d ttl: %d " ,
bfpath . c_str ( ) , blob_count , size_bytes , sn_range . first ,
sn_range . second , bfptr - > HasTimestamp ( ) , bfptr - > HasTTL ( ) ) ;
ROCKS_LOG_INFO ( db_options_ . info_log ,
" Blob File: %s blob_count: % " PRIu64
" size_bytes: % " PRIu64
" sn_range: (%d, %d) ts: %d ttl: %d " ,
bfpath . c_str ( ) , blob_count , size_bytes , sn_range . first ,
sn_range . second , bfptr - > HasTimestamp ( ) , bfptr - > HasTTL ( ) ) ;
if ( bfptr - > HasTTL ( ) ) {
ttl_range . second =
@ -531,14 +536,16 @@ Status BlobDBImpl::OpenAllFiles() {
Status fstatus = CreateWriterLocked ( bfptr ) ;
if ( fstatus . ok ( ) ) fstatus = bfptr - > WriteFooterAndCloseLocked ( ) ;
if ( ! fstatus . ok ( ) ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
ROCKS_LOG_ERROR (
db_options_ . info_log ,
" Failed to close Blob File: %s status: '%s'. Skipped " ,
bfpath . c_str ( ) , fstatus . ToString ( ) . c_str ( ) ) ;
continue ;
} else {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
" Blob File Closed: %s now: %d ttl_range: (%d, %d) " ,
bfpath . c_str ( ) , epoch_now , ttl_range . first , ttl_range . second ) ;
ROCKS_LOG_ERROR ( db_options_ . info_log ,
" Blob File Closed: %s now: %d ttl_range: (%d, %d) " ,
bfpath . c_str ( ) , epoch_now , ttl_range . first ,
ttl_range . second ) ;
}
} else {
open_blob_files_ . insert ( bfptr ) ;
@ -570,9 +577,8 @@ std::shared_ptr<RandomAccessFileReader> BlobDBImpl::GetOrOpenRandomAccessReader(
std : : shared_ptr < BlobFile > BlobDBImpl : : NewBlobFile ( const std : : string & reason ) {
uint64_t file_num = next_file_number_ + + ;
auto bfile = std : : make_shared < BlobFile > ( this , blob_dir_ , file_num ) ;
Log ( InfoLogLevel : : DEBUG_LEVEL , db_options_ . info_log ,
" New blob file created: %s reason='%s' " , bfile - > PathName ( ) . c_str ( ) ,
reason . c_str ( ) ) ;
ROCKS_LOG_DEBUG ( db_options_ . info_log , " New blob file created: %s reason='%s' " ,
bfile - > PathName ( ) . c_str ( ) , reason . c_str ( ) ) ;
LogFlush ( db_options_ . info_log ) ;
return bfile ;
}
@ -588,11 +594,11 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
Status s = myenv_ - > ReopenWritableFile ( fpath , & wfile , env_options ) ;
if ( ! s . ok ( ) ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
" Failed to open blob file for write: %s status: '%s' "
" exists: '%s' " ,
fpath . c_str ( ) , s . ToString ( ) . c_str ( ) ,
myenv_ - > FileExists ( fpath ) . ToString ( ) . c_str ( ) ) ;
ROCKS_LOG_ERROR ( db_options_ . info_log ,
" Failed to open blob file for write: %s status: '%s' "
" exists: '%s' " ,
fpath . c_str ( ) , s . ToString ( ) . c_str ( ) ,
myenv_ - > FileExists ( fpath ) . ToString ( ) . c_str ( ) ) ;
return s ;
}
@ -601,8 +607,8 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
uint64_t boffset = bfile - > GetFileSize ( ) ;
if ( debug_level_ > = 2 & & boffset ) {
Log ( InfoLogLevel : : DEBUG_LEVEL , db_options_ . info_log ,
" Open blob file: %s with offset: %d " , fpath . c_str ( ) , boffset ) ;
ROCKS_LOG_DEBUG ( db_options_ . info_log , " Open blob file: %s with offset: %d " ,
fpath . c_str ( ) , boffset ) ;
}
Writer : : ElemType et = Writer : : kEtNone ;
@ -611,8 +617,9 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
else if ( bfile - > file_size_ > BlobLogHeader : : kHeaderSize )
et = Writer : : kEtFooter ;
else if ( bfile - > file_size_ ) {
Log ( InfoLogLevel : : WARN_LEVEL , db_options_ . info_log ,
" Open blob file: %s with wrong size: %d " , fpath . c_str ( ) , boffset ) ;
ROCKS_LOG_WARN ( db_options_ . info_log ,
" Open blob file: %s with wrong size: %d " , fpath . c_str ( ) ,
boffset ) ;
return Status : : Corruption ( " Invalid blob file size " ) ;
}
@ -687,8 +694,9 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFile() {
// file not visible, hence no lock
std : : shared_ptr < Writer > writer = CheckOrCreateWriterLocked ( bfile ) ;
if ( ! writer ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
" Failed to get writer from blob file: %s " , bfile - > PathName ( ) . c_str ( ) ) ;
ROCKS_LOG_ERROR ( db_options_ . info_log ,
" Failed to get writer from blob file: %s " ,
bfile - > PathName ( ) . c_str ( ) ) ;
return nullptr ;
}
@ -704,10 +712,10 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFile() {
Status s = writer - > WriteHeader ( bfile - > header_ ) ;
if ( ! s . ok ( ) ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
" Failed to write header to new blob file: %s "
" status: '%s' " ,
bfile - > PathName ( ) . c_str ( ) , s . ToString ( ) . c_str ( ) ) ;
ROCKS_LOG_ERROR ( db_options_ . info_log ,
" Failed to write header to new blob file: %s "
" status: '%s' " ,
bfile - > PathName ( ) . c_str ( ) , s . ToString ( ) . c_str ( ) ) ;
return nullptr ;
}
@ -739,17 +747,16 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint32_t expiration) {
bfile = NewBlobFile ( " SelectBlobFileTTL " ) ;
assert ( bfile ) ;
Log ( InfoLogLevel : : INFO_LEVEL , db_options_ . info_log ,
" New blob file TTL range: %s %d %d " , bfile - > PathName ( ) . c_str ( ) , exp_low ,
exp_high ) ;
ROCKS_LOG_INFO ( db_options_ . info_log , " New blob file TTL range: %s %d %d " ,
bfile - > PathName ( ) . c_str ( ) , exp_low , exp_high ) ;
LogFlush ( db_options_ . info_log ) ;
// we don't need to take lock as no other thread is seeing bfile yet
std : : shared_ptr < Writer > writer = CheckOrCreateWriterLocked ( bfile ) ;
if ( ! writer ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
" Failed to get writer from blob file with TTL: %s " ,
bfile - > PathName ( ) . c_str ( ) ) ;
ROCKS_LOG_ERROR ( db_options_ . info_log ,
" Failed to get writer from blob file with TTL: %s " ,
bfile - > PathName ( ) . c_str ( ) ) ;
return nullptr ;
}
@ -772,10 +779,10 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint32_t expiration) {
Status s = writer - > WriteHeader ( bfile - > header_ ) ;
if ( ! s . ok ( ) ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
" Failed to write header to new blob file: %s "
" status: '%s' " ,
bfile - > PathName ( ) . c_str ( ) , s . ToString ( ) . c_str ( ) ) ;
ROCKS_LOG_ERROR ( db_options_ . info_log ,
" Failed to write header to new blob file: %s "
" status: '%s' " ,
bfile - > PathName ( ) . c_str ( ) , s . ToString ( ) . c_str ( ) ) ;
return nullptr ;
}
@ -1047,18 +1054,19 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options,
// CheckOrCreateWriterLocked(bfile);
if ( debug_level_ > = 3 )
Log ( InfoLogLevel : : DEBUG_LEVEL , db_options_ . info_log ,
" >Adding KEY FILE: %s: KEY: %s VALSZ: %d " , bfile - > PathName ( ) . c_str ( ) ,
key . ToString ( ) . c_str ( ) , value . size ( ) ) ;
ROCKS_LOG_DEBUG (
db_options_ . info_log , " >Adding KEY FILE: %s: KEY: %s VALSZ: %d " ,
bfile - > PathName ( ) . c_str ( ) , key . ToString ( ) . c_str ( ) , value . size ( ) ) ;
std : : string index_entry ;
Status s = AppendBlob ( bfile , headerbuf , key , value , & index_entry ) ;
if ( ! s . ok ( ) ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
" Failed to append blob to FILE: %s: KEY: %s VALSZ: %d "
" status: '%s' blob_file: '%s' " ,
bfile - > PathName ( ) . c_str ( ) , key . ToString ( ) . c_str ( ) , value . size ( ) ,
s . ToString ( ) . c_str ( ) , bfile - > DumpState ( ) . c_str ( ) ) ;
ROCKS_LOG_ERROR ( db_options_ . info_log ,
" Failed to append blob to FILE: %s: KEY: %s VALSZ: %d "
" status: '%s' blob_file: '%s' " ,
bfile - > PathName ( ) . c_str ( ) , key . ToString ( ) . c_str ( ) ,
value . size ( ) , s . ToString ( ) . c_str ( ) ,
bfile - > DumpState ( ) . c_str ( ) ) ;
// Fallback just write to the LSM and get going
WriteBatch batch ;
batch . Put ( column_family , key , value ) ;
@ -1075,17 +1083,17 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options,
SequenceNumber sn = WriteBatchInternal : : Sequence ( & batch ) ;
if ( debug_level_ > = 3 )
Log ( InfoLogLevel : : INFO_LEVEL , db_options_ . info_log ,
" <Adding KEY FILE: %s: KEY: %s SN: %d " , bfile - > PathName ( ) . c_str ( ) ,
key . ToString ( ) . c_str ( ) , sn ) ;
ROCKS_LOG_INFO ( db_options_ . info_log , " <Adding KEY FILE: %s: KEY: %s SN: %d " ,
bfile - > PathName ( ) . c_str ( ) , key . ToString ( ) . c_str ( ) , sn ) ;
s = AppendSN ( bfile , sn ) ;
if ( ! s . ok ( ) ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
" Failed to append SN to FILE: %s: KEY: %s VALSZ: %d "
" status: '%s' blob_file: '%s' " ,
bfile - > PathName ( ) . c_str ( ) , key . ToString ( ) . c_str ( ) , value . size ( ) ,
s . ToString ( ) . c_str ( ) , bfile - > DumpState ( ) . c_str ( ) ) ;
ROCKS_LOG_ERROR ( db_options_ . info_log ,
" Failed to append SN to FILE: %s: KEY: %s VALSZ: %d "
" status: '%s' blob_file: '%s' " ,
bfile - > PathName ( ) . c_str ( ) , key . ToString ( ) . c_str ( ) ,
value . size ( ) , s . ToString ( ) . c_str ( ) ,
bfile - > DumpState ( ) . c_str ( ) ) ;
}
if ( expiration ! = - 1 ) extendTTL ( & ( bfile - > ttl_range_ ) , ( uint32_t ) expiration ) ;
@ -1113,9 +1121,9 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
}
if ( ! s . ok ( ) ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
" Invalid status in AppendBlob: %s status: '%s' " ,
bfile - > PathName ( ) . c_str ( ) , s . ToString ( ) . c_str ( ) ) ;
ROCKS_LOG_ERROR ( db_options_ . info_log ,
" Invalid status in AppendBlob: %s status: '%s' " ,
bfile - > PathName ( ) . c_str ( ) , s . ToString ( ) . c_str ( ) ) ;
return s ;
}
@ -1135,10 +1143,10 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
handle . EncodeTo ( index_entry ) ;
if ( debug_level_ > = 3 )
Log ( InfoLogLevel : : INFO_LEVEL , db_options_ . info_log ,
" >Adding KEY FILE: %s: BC: %d OFFSET: %d SZ: %d " ,
bfile - > PathName ( ) . c_str ( ) , bfile - > blob_count_ . load ( ) , blob_offset ,
value . size ( ) ) ;
ROCKS_LOG_INFO ( db_options_ . info_log ,
" >Adding KEY FILE: %s: BC: %d OFFSET: %d SZ: %d " ,
bfile - > PathName ( ) . c_str ( ) , bfile - > blob_count_ . load ( ) ,
blob_offset , value . size ( ) ) ;
return s ;
}
@ -1153,9 +1161,9 @@ Status BlobDBImpl::AppendSN(const std::shared_ptr<BlobFile>& bfile,
s = writer - > AddRecordFooter ( sn ) ;
if ( ! s . ok ( ) ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
" Invalid status in AppendSN: %s status: '%s' " ,
bfile - > PathName ( ) . c_str ( ) , s . ToString ( ) . c_str ( ) ) ;
ROCKS_LOG_ERROR ( db_options_ . info_log ,
" Invalid status in AppendSN: %s status: '%s' " ,
bfile - > PathName ( ) . c_str ( ) , s . ToString ( ) . c_str ( ) ) ;
return s ;
}
@ -1203,7 +1211,8 @@ Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key,
if ( handle . offset ( ) <
( BlobLogHeader : : kHeaderSize + BlobLogRecord : : kHeaderSize + key . size ( ) ) ) {
if ( debug_level_ > = 2 ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
ROCKS_LOG_ERROR (
db_options_ . info_log ,
" Invalid blob handle file_number: % " PRIu64 " blob_offset: % " PRIu64
" blob_size: % " PRIu64 " key: %s " ,
handle . filenumber ( ) , handle . offset ( ) , handle . size ( ) , key . data ( ) ) ;
@ -1254,7 +1263,8 @@ Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key,
s = reader - > Read ( handle . offset ( ) , handle . size ( ) , & blob_value , buffer ) ;
if ( ! s . ok ( ) | | blob_value . size ( ) ! = handle . size ( ) ) {
if ( debug_level_ > = 2 ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
ROCKS_LOG_ERROR (
db_options_ . info_log ,
" Failed to read blob from file: %s blob_offset: % " PRIu64
" blob_size: % " PRIu64 " read: %d key: %s status: '%s' " ,
bfile - > PathName ( ) . c_str ( ) , handle . offset ( ) , handle . size ( ) ,
@ -1273,7 +1283,8 @@ Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key,
sizeof ( uint32_t ) , & crc_slice , crc_buffer ) ;
if ( ! s . ok ( ) | | ! GetFixed32 ( & crc_slice , & crc_exp ) ) {
if ( debug_level_ > = 2 ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
ROCKS_LOG_ERROR (
db_options_ . info_log ,
" Failed to fetch blob crc file: %s blob_offset: % " PRIu64
" blob_size: % " PRIu64 " key: %s status: '%s' " ,
bfile - > PathName ( ) . c_str ( ) , handle . offset ( ) , handle . size ( ) ,
@ -1286,11 +1297,11 @@ Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key,
crc = crc32c : : Mask ( crc ) ; // Adjust for storage
if ( crc ! = crc_exp ) {
if ( debug_level_ > = 2 ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
" Blob crc mismatch file: %s blob_offset: % " PRIu64
" blob_size: % " PRIu64 " key: %s status: '%s' " ,
bfile - > PathName ( ) . c_str ( ) , handle . offset ( ) , handle . size ( ) ,
key . data ( ) , s . ToString ( ) . c_str ( ) ) ;
ROCKS_LOG_ERROR ( db_options_ . info_log ,
" Blob crc mismatch file: %s blob_offset: % " PRIu64
" blob_size: % " PRIu64 " key: %s status: '%s' " ,
bfile - > PathName ( ) . c_str ( ) , handle . offset ( ) ,
handle . size ( ) , key . data ( ) , s . ToString ( ) . c_str ( ) ) ;
}
return Status : : Corruption ( " Corruption. Blob CRC mismatch " ) ;
}
@ -1335,9 +1346,9 @@ Status BlobDBImpl::Get(const ReadOptions& options,
s = db_ - > Get ( options , column_family , key , & index_entry ) ;
if ( ! s . ok ( ) ) {
if ( debug_level_ > = 3 )
Log ( InfoLogLevel : : WARN_LEVEL , db_options_ . info_log ,
" Get Failed on LSM KEY: %s status: '%s' " , key . ToString ( ) . c_str ( ) ,
s . ToString ( ) . c_str ( ) ) ;
ROCKS_LOG_WARN ( db_options_ . info_log ,
" Get Failed on LSM KEY: %s status: '%s' " ,
key . ToString ( ) . c_str ( ) , s . ToString ( ) . c_str ( ) ) ;
return s ;
}
@ -1360,13 +1371,13 @@ Slice BlobDBIterator::value() const {
std : : pair < bool , int64_t > BlobDBImpl : : SanityCheck ( bool aborted ) {
if ( aborted ) return std : : make_pair ( false , - 1 ) ;
Log ( InfoLogLevel : : INFO_LEVEL , db_options_ . info_log , " Starting Sanity Check " ) ;
ROCKS_LOG_INFO ( db_options_ . info_log , " Starting Sanity Check " ) ;
Log ( InfoLogLevel : : INFO_LEVEL , db_options_ . info_log ,
" Number of files % " PRIu64 , blob_files_ . size ( ) ) ;
ROCKS_LOG_INFO ( db_options_ . info_log , " Number of files % " PRIu64 ,
blob_files_ . size ( ) ) ;
Log ( InfoLogLevel : : INFO_LEVEL , db_options_ . info_log ,
" Number of open files % " PRIu64 , open_blob_files_ . size ( ) ) ;
ROCKS_LOG_INFO ( db_options_ . info_log , " Number of open files % " PRIu64 ,
open_blob_files_ . size ( ) ) ;
for ( auto bfile : open_blob_files_ ) {
assert ( ! bfile - > Immutable ( ) ) ;
@ -1377,7 +1388,8 @@ std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
for ( auto bfile_pair : blob_files_ ) {
auto bfile = bfile_pair . second ;
Log ( InfoLogLevel : : INFO_LEVEL , db_options_ . info_log ,
ROCKS_LOG_INFO (
db_options_ . info_log ,
" Blob File %s % " PRIu64 " % " PRIu64 " % " PRIu64 " % " PRIu64 " %d " ,
bfile - > PathName ( ) . c_str ( ) , bfile - > GetFileSize ( ) , bfile - > BlobCount ( ) ,
bfile - > deleted_count_ , bfile - > deleted_size_ ,
@ -1415,10 +1427,11 @@ void BlobDBImpl::CloseIf(const std::shared_ptr<BlobFile>& bfile) {
if ( ! close ) return ;
if ( debug_level_ > = 2 ) {
Log ( InfoLogLevel : : DEBUG_LEVEL , db_options_ . info_log ,
" Scheduling file for close %s fsize: % " PRIu64 " limit: % " PRIu64 ,
bfile - > PathName ( ) . c_str ( ) , bfile - > GetFileSize ( ) ,
bdb_options_ . blob_file_size ) ;
ROCKS_LOG_DEBUG ( db_options_ . info_log ,
" Scheduling file for close %s fsize: % " PRIu64
" limit: % " PRIu64 ,
bfile - > PathName ( ) . c_str ( ) , bfile - > GetFileSize ( ) ,
bdb_options_ . blob_file_size ) ;
}
{
@ -1430,10 +1443,10 @@ void BlobDBImpl::CloseIf(const std::shared_ptr<BlobFile>& bfile) {
if ( findit ! = open_simple_files_ . end ( ) ) {
open_simple_files_ . erase ( findit ) ;
} else {
Log ( InfoLogLevel : : WARN_LEVEL , db_options_ . info_log ,
" File not found while closing %s fsize: % " PRIu64
" Multithreaded Writes? " ,
bfile - > PathName ( ) . c_str ( ) , bfile - > GetFileSize ( ) ) ;
ROCKS_LOG_WARN ( db_options_ . info_log ,
" File not found while closing %s fsize: % " PRIu64
" Multithreaded Writes? " ,
bfile - > PathName ( ) . c_str ( ) , bfile - > GetFileSize ( ) ) ;
}
}
@ -1451,14 +1464,14 @@ bool BlobDBImpl::FileDeleteOk_SnapshotCheckLocked(
// you want to check that there are no snapshots in the
bool notok = db_impl_ - > HasActiveSnapshotLaterThanSN ( esn ) ;
if ( notok ) {
Log ( InfoLogLevel : : INFO_LEVEL , db_options_ . info_log ,
" Could not delete file due to snapshot failure %s " ,
bfile - > PathName ( ) . c_str ( ) ) ;
ROCKS_LOG_INFO ( db_options_ . info_log ,
" Could not delete file due to snapshot failure %s " ,
bfile - > PathName ( ) . c_str ( ) ) ;
return false ;
} else {
Log ( InfoLogLevel : : INFO_LEVEL , db_options_ . info_log ,
" Will delete file due to snapshot success %s " ,
bfile - > PathName ( ) . c_str ( ) ) ;
ROCKS_LOG_INFO ( db_options_ . info_log ,
" Will delete file due to snapshot success %s " ,
bfile - > PathName ( ) . c_str ( ) ) ;
return true ;
}
}
@ -1474,8 +1487,8 @@ bool BlobDBImpl::FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size,
// file was deleted
if ( hitr = = blob_files_ . end ( ) ) {
Log ( InfoLogLevel : : INFO_LEVEL , db_options_ . info_log ,
" Could not find file_number % " PRIu64 , file_number ) ;
ROCKS_LOG_INFO ( db_options_ . info_log ,
" Could not find file_number % " PRIu64 , file_number ) ;
return false ;
}
@ -1495,9 +1508,9 @@ bool BlobDBImpl::MarkBlobDeleted(const Slice& key, const Slice& lsmValue) {
BlobHandle handle ;
Status s = handle . DecodeFrom ( & val ) ;
if ( ! s . ok ( ) ) {
Log ( InfoLogLevel : : INFO_LEVEL , db_options_ . info_log ,
" Could not parse lsm val in MarkBlobDeleted %s " ,
lsmValue . ToString ( ) . c_str ( ) ) ;
ROCKS_LOG_INFO ( db_options_ . info_log ,
" Could not parse lsm val in MarkBlobDeleted %s " ,
lsmValue . ToString ( ) . c_str ( ) ) ;
return false ;
}
bool succ = FindFileAndEvictABlob ( handle . filenumber ( ) , key . size ( ) ,
@ -1514,13 +1527,15 @@ std::pair<bool, int64_t> BlobDBImpl::EvictCompacted(bool aborted) {
packet . blob_offset_ , packet . blob_size_ ) ;
if ( ! succ )
Log ( InfoLogLevel : : DEBUG_LEVEL , db_options_ . info_log ,
ROCKS_LOG_DEBUG (
db_options_ . info_log ,
" EVICT COMPACTION FAILURE SN: %d FN: %d OFFSET: %d SIZE: %d " ,
packet . dsn_ , packet . file_number_ , packet . blob_offset_ ,
packet . blob_size_ ) ;
if ( debug_level_ > = 3 )
Log ( InfoLogLevel : : INFO_LEVEL , db_options_ . info_log ,
ROCKS_LOG_INFO (
db_options_ . info_log ,
" EVICT COMPACTED SN: %d FN: %d OFFSET: %d SIZE: %d SUCC: %d " ,
packet . dsn_ , packet . file_number_ , packet . blob_offset_ ,
packet . blob_size_ , succ ) ;
@ -1565,8 +1580,8 @@ std::pair<bool, int64_t> BlobDBImpl::EvictDeletions(bool aborted) {
iter - > Seek ( eslice ) ;
if ( ! iter - > status ( ) . ok ( ) ) {
Log ( InfoLogLevel : : INFO_LEVEL , db_options_ . info_log ,
" Invalid iterator seek %s " , dpacket . key_ . c_str ( ) ) ;
ROCKS_LOG_INFO ( db_options_ . info_log , " Invalid iterator seek %s " ,
dpacket . key_ . c_str ( ) ) ;
continue ;
}
@ -1714,17 +1729,18 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
std : : shared_ptr < Reader > reader =
bfptr - > OpenSequentialReader ( myenv_ , db_options_ , env_options_ ) ;
if ( ! reader ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
" File sequential reader could not be opened " ,
bfptr - > PathName ( ) . c_str ( ) ) ;
ROCKS_LOG_ERROR ( db_options_ . info_log ,
" File sequential reader could not be opened " ,
bfptr - > PathName ( ) . c_str ( ) ) ;
return Status : : IOError ( " failed to create sequential reader " ) ;
}
BlobLogHeader header ;
Status s = reader - > ReadHeader ( & header ) ;
if ( ! s . ok ( ) ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
" Failure to read header for blob-file %s " , bfptr - > PathName ( ) . c_str ( ) ) ;
ROCKS_LOG_ERROR ( db_options_ . info_log ,
" Failure to read header for blob-file %s " ,
bfptr - > PathName ( ) . c_str ( ) ) ;
return s ;
}
@ -1798,11 +1814,12 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
// a new version of the key came in at this time, which will override
// the current version being iterated on.
if ( s1 . IsBusy ( ) ) {
Log ( InfoLogLevel : : INFO_LEVEL , db_options_ . info_log ,
" Optimistic transaction failed delete: %s bn: % " PRIu32 ,
bfptr - > PathName ( ) . c_str ( ) , gcstats - > blob_count ) ;
ROCKS_LOG_INFO ( db_options_ . info_log ,
" Optimistic transaction failed delete: %s bn: % " PRIu32 ,
bfptr - > PathName ( ) . c_str ( ) , gcstats - > blob_count ) ;
} else {
Log ( InfoLogLevel : : DEBUG_LEVEL , db_options_ . info_log ,
ROCKS_LOG_DEBUG (
db_options_ . info_log ,
" Successfully added delete back into LSM: %s bn: % " PRIu32 ,
bfptr - > PathName ( ) . c_str ( ) , gcstats - > blob_count ) ;
@ -1830,8 +1847,9 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
s = new_writer - > WriteHeader ( newfile - > header_ ) ;
if ( ! s . ok ( ) ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
" File: %s - header writing failed " , newfile - > PathName ( ) . c_str ( ) ) ;
ROCKS_LOG_ERROR ( db_options_ . info_log ,
" File: %s - header writing failed " ,
newfile - > PathName ( ) . c_str ( ) ) ;
return s ;
}
@ -1870,25 +1888,26 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
// a new version of the key came in at this time, which will override
// the current version being iterated on.
if ( s1 . IsBusy ( ) ) {
Log ( InfoLogLevel : : INFO_LEVEL , db_options_ . info_log ,
" Optimistic transaction failed: %s put bn: % " PRIu32 ,
bfptr - > PathName ( ) . c_str ( ) , gcstats - > blob_count ) ;
ROCKS_LOG_INFO ( db_options_ . info_log ,
" Optimistic transaction failed: %s put bn: % " PRIu32 ,
bfptr - > PathName ( ) . c_str ( ) , gcstats - > blob_count ) ;
} else {
gcstats - > succ_relocs + + ;
Log ( InfoLogLevel : : DEBUG_LEVEL , db_options_ . info_log ,
" Successfully added put back into LSM: %s bn: % " PRIu32 ,
bfptr - > PathName ( ) . c_str ( ) , gcstats - > blob_count ) ;
ROCKS_LOG_DEBUG ( db_options_ . info_log ,
" Successfully added put back into LSM: %s bn: % " PRIu32 ,
bfptr - > PathName ( ) . c_str ( ) , gcstats - > blob_count ) ;
}
delete txn ;
}
if ( gcstats - > newfile ) total_blob_space_ + = newfile - > file_size_ ;
Log ( InfoLogLevel : : INFO_LEVEL , db_options_ . info_log ,
" File: %s Num deletes % " PRIu32 " Num relocs: % " PRIu32
" Succ Deletes: % " PRIu32 " Succ relocs: % " PRIu32 ,
bfptr - > PathName ( ) . c_str ( ) , gcstats - > num_deletes , gcstats - > num_relocs ,
gcstats - > succ_deletes_lsm , gcstats - > succ_relocs ) ;
ROCKS_LOG_INFO ( db_options_ . info_log ,
" File: %s Num deletes % " PRIu32 " Num relocs: % " PRIu32
" Succ Deletes: % " PRIu32 " Succ relocs: % " PRIu32 ,
bfptr - > PathName ( ) . c_str ( ) , gcstats - > num_deletes ,
gcstats - > num_relocs , gcstats - > succ_deletes_lsm ,
gcstats - > succ_relocs ) ;
return s ;
}
@ -1906,8 +1925,8 @@ bool BlobDBImpl::ShouldGCFile(std::shared_ptr<BlobFile> bfile, std::time_t tt,
}
if ( ! bfile - > file_size_ . load ( ) ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
" Invalid file size = 0 %s " , bfile - > PathName ( ) . c_str ( ) ) ;
ROCKS_LOG_ERROR ( db_options_ . info_log , " Invalid file size = 0 %s " ,
bfile - > PathName ( ) . c_str ( ) ) ;
* reason = " file is empty " ;
return false ;
}
@ -1990,17 +2009,18 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsFiles(bool aborted) {
Status s = myenv_ - > DeleteFile ( bfile - > PathName ( ) ) ;
if ( ! s . ok ( ) ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
" File failed to be deleted as obsolete %s " ,
bfile - > PathName ( ) . c_str ( ) ) ;
ROCKS_LOG_ERROR ( db_options_ . info_log ,
" File failed to be deleted as obsolete %s " ,
bfile - > PathName ( ) . c_str ( ) ) ;
+ + iter ;
continue ;
}
file_deleted = true ;
total_blob_space_ - = bfile - > file_size_ ;
Log ( InfoLogLevel : : INFO_LEVEL , db_options_ . info_log ,
" File deleted as obsolete from blob dir %s " , bfile - > PathName ( ) . c_str ( ) ) ;
ROCKS_LOG_INFO ( db_options_ . info_log ,
" File deleted as obsolete from blob dir %s " ,
bfile - > PathName ( ) . c_str ( ) ) ;
iter = tobsolete . erase ( iter ) ;
}
@ -2021,7 +2041,8 @@ bool BlobDBImpl::CallbackEvictsImpl(std::shared_ptr<BlobFile> bfile) {
std : : shared_ptr < Reader > reader =
bfile - > OpenSequentialReader ( myenv_ , db_options_ , env_options_ ) ;
if ( ! reader ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
ROCKS_LOG_ERROR (
db_options_ . info_log ,
" File sequential reader could not be opened for evict callback: %s " ,
bfile - > PathName ( ) . c_str ( ) ) ;
return false ;
@ -2032,7 +2053,8 @@ bool BlobDBImpl::CallbackEvictsImpl(std::shared_ptr<BlobFile> bfile) {
BlobLogHeader header ;
Status s = reader - > ReadHeader ( & header ) ;
if ( ! s . ok ( ) ) {
Log ( InfoLogLevel : : ERROR_LEVEL , db_options_ . info_log ,
ROCKS_LOG_ERROR (
db_options_ . info_log ,
" Failure to read header for blob-file during evict callback %s " ,
bfile - > PathName ( ) . c_str ( ) ) ;
return false ;
@ -2065,8 +2087,8 @@ std::pair<bool, int64_t> BlobDBImpl::CallbackEvicts(
if ( aborted ) return std : : make_pair ( false , - 1 ) ;
bool succ = CallbackEvictsImpl ( bfile ) ;
if ( succ ) {
Log ( InfoLogLevel : : DEBUG_LEVEL , db_options_ . info_log ,
" Eviction callbacks completed %s " , bfile - > PathName ( ) . c_str ( ) ) ;
ROCKS_LOG_DEBUG ( db_options_ . info_log , " Eviction callbacks completed %s " ,
bfile - > PathName ( ) . c_str ( ) ) ;
}
WriteLock wl ( & mutex_ ) ;
@ -2129,17 +2151,17 @@ void BlobDBImpl::FilterSubsetOfFiles(
std : : string reason ;
bool shouldgc = ShouldGCFile ( bfile , tt , last_id , & reason ) ;
if ( ! shouldgc ) {
Log ( InfoLogLevel : : DEBUG_LEVEL , db_options_ . info_log ,
" File has been skipped for GC ttl %s %d %d reason='%s' " ,
bfile - > PathName ( ) . c_str ( ) , tt , bfile - > GetTTLRange ( ) . second ,
reason . c_str ( ) ) ;
ROCKS_LOG_DEBUG ( db_options_ . info_log ,
" File has been skipped for GC ttl %s %d %d reason='%s' " ,
bfile - > PathName ( ) . c_str ( ) , tt ,
bfile - > GetTTLRange ( ) . second , reason . c_str ( ) ) ;
continue ;
}
Log ( InfoLogLevel : : INFO_LEVEL , db_options_ . info_log ,
" File has been chosen for GC ttl %s %d %d reason='%s' " ,
bfile - > PathName ( ) . c_str ( ) , tt , bfile - > GetTTLRange ( ) . second ,
reason . c_str ( ) ) ;
ROCKS_LOG_INFO ( db_options_ . info_log ,
" File has been chosen for GC ttl %s %d %d reason='%s' " ,
bfile - > PathName ( ) . c_str ( ) , tt , bfile - > GetTTLRange ( ) . second ,
reason . c_str ( ) ) ;
to_process - > push_back ( bfile ) ;
}
}