@@ -21,16 +21,33 @@
 #include "util/sync_point.h"
 namespace rocksdb {
+namespace {
+// RAII Timer
+class StatsTimer {
+ public:
+  explicit StatsTimer(Env* env, uint64_t* counter)
+      : env_(env), counter_(counter), start_micros_(env_->NowMicros()) {}
+  ~StatsTimer() { *counter_ += env_->NowMicros() - start_micros_; }
+ private:
+  Env* env_;
+  uint64_t* counter_;
+  uint64_t start_micros_;
+};
+}  // anonymous namespace
 #ifndef ROCKSDB_LITE
-Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
-                       const std::string& file_path, bool move_file) {
+Status DBImpl::ReadExternalSstFileInfo(ColumnFamilyHandle* column_family,
+                                       const std::string& file_path,
+                                       ExternalSstFileInfo* file_info) {
   Status status;
   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
-  ColumnFamilyData* cfd = cfh->cfd();
+  auto cfd = cfh->cfd();
-  ExternalSstFileInfo file_info;
-  file_info.file_path = file_path;
-  status = env_->GetFileSize(file_path, &file_info.file_size);
+  file_info->file_path = file_path;
+  status = env_->GetFileSize(file_path, &file_info->file_size);
   if (!status.ok()) {
     return status;
   }
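Note: the StatsTimer introduced above is a plain RAII helper; each timed scope in the multi-file path below charges its elapsed wall-clock time to a per-file slot in micro_list, which later feeds the per-file compaction stats. A minimal, self-contained sketch of the same pattern, using std::chrono in place of rocksdb::Env (ScopedMicrosTimer and NowMicros here are illustrative names, not RocksDB API):

#include <chrono>
#include <cstdint>
#include <cstdio>
#include <thread>

// Hypothetical stand-in for rocksdb::Env::NowMicros().
static uint64_t NowMicros() {
  return std::chrono::duration_cast<std::chrono::microseconds>(
             std::chrono::steady_clock::now().time_since_epoch())
      .count();
}

// Same shape as the StatsTimer above: the destructor adds the elapsed
// microseconds to an external counter when the timer leaves its scope.
class ScopedMicrosTimer {
 public:
  explicit ScopedMicrosTimer(uint64_t* counter)
      : counter_(counter), start_micros_(NowMicros()) {}
  ~ScopedMicrosTimer() { *counter_ += NowMicros() - start_micros_; }

 private:
  uint64_t* counter_;
  uint64_t start_micros_;
};

int main() {
  uint64_t micros = 0;
  {
    ScopedMicrosTimer t(&micros);  // starts timing
    std::this_thread::sleep_for(std::chrono::milliseconds(5));
  }  // destructor runs here and adds the elapsed time to `micros`
  std::printf("timed section took %llu us\n",
              static_cast<unsigned long long>(micros));
  return 0;
}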
@@ -49,7 +66,7 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
   status = cfd->ioptions()->table_factory->NewTableReader(
       TableReaderOptions(*cfd->ioptions(), env_options_,
                          cfd->internal_comparator()),
-      std::move(sst_file_reader), file_info.file_size, &table_reader);
+      std::move(sst_file_reader), file_info->file_size, &table_reader);
   if (!status.ok()) {
     return status;
   }
@@ -63,17 +80,16 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
     return Status::InvalidArgument("Generated table version not found");
   }
-  file_info.version =
+  file_info->version =
       DecodeFixed32(external_sst_file_version_iter->second.c_str());
-  if (file_info.version == 1) {
+  if (file_info->version == 1) {
     // version 1 imply that all sequence numbers in table equal 0
-    file_info.sequence_number = 0;
+    file_info->sequence_number = 0;
   } else {
     return Status::InvalidArgument("Generated table version is not supported");
   }
   // Get number of entries in table
-  file_info.num_entries = table_reader->GetTableProperties()->num_entries;
+  file_info->num_entries = table_reader->GetTableProperties()->num_entries;
   ParsedInternalKey key;
   std::unique_ptr<InternalIterator> iter(
@@ -87,7 +103,7 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
   if (key.sequence != 0) {
     return Status::Corruption("Generated table have non zero sequence number");
   }
-  file_info.smallest_key = key.user_key.ToString();
+  file_info->smallest_key = key.user_key.ToString();
   // Get last (largest) key from file
   iter->SeekToLast();
@@ -97,70 +113,145 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
   if (key.sequence != 0) {
     return Status::Corruption("Generated table have non zero sequence number");
   }
-  file_info.largest_key = key.user_key.ToString();
+  file_info->largest_key = key.user_key.ToString();
-  return AddFile(column_family, &file_info, move_file);
+  return Status::OK();
 }
 Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
-                       const ExternalSstFileInfo* file_info, bool move_file) {
+                       const std::vector<std::string>& file_path_list,
+                       bool move_file) {
   Status status;
+  auto num_files = file_path_list.size();
+  if (num_files == 0) {
+    return Status::InvalidArgument("The list of files is empty");
+  }
+  std::vector<ExternalSstFileInfo> file_info_list(num_files);
+  for (size_t i = 0; i < num_files; i++) {
+    status = ReadExternalSstFileInfo(column_family, file_path_list[i],
+                                     &file_info_list[i]);
+    if (!status.ok()) {
+      return status;
+    }
+  }
+  return AddFile(column_family, file_info_list, move_file);
+}
+Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
+                       const std::vector<ExternalSstFileInfo>& file_info_list,
+                       bool move_file) {
+  Status status;
   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
   ColumnFamilyData* cfd = cfh->cfd();
-  const uint64_t start_micros = env_->NowMicros();
-  if (file_info->num_entries == 0) {
-    return Status::InvalidArgument("File contain no entries");
-  }
-  if (file_info->version != 1) {
-    return Status::InvalidArgument("Generated table version is not supported");
+  auto num_files = file_info_list.size();
+  if (num_files == 0) {
+    return Status::InvalidArgument("The list of files is empty");
   }
-  // version 1 imply that file have only Put Operations with Sequence Number = 0
-  FileMetaData meta;
-  meta.smallest =
-      InternalKey(file_info->smallest_key, file_info->sequence_number,
-                  ValueType::kTypeValue);
-  meta.largest = InternalKey(file_info->largest_key, file_info->sequence_number,
-                             ValueType::kTypeValue);
-  if (!meta.smallest.Valid() || !meta.largest.Valid()) {
-    return Status::Corruption("Generated table have corrupted keys");
+  // Verify that passed files dont have overlapping ranges
+  if (num_files > 1) {
+    std::vector<const ExternalSstFileInfo*> sorted_file_info_list(num_files);
+    for (size_t i = 0; i < num_files; i++) {
+      sorted_file_info_list[i] = &file_info_list[i];
+    }
+    auto* vstorage = cfd->current()->storage_info();
+    std::sort(
+        sorted_file_info_list.begin(), sorted_file_info_list.end(),
+        [&vstorage, &file_info_list](const ExternalSstFileInfo* info1,
+                                     const ExternalSstFileInfo* info2) {
+          return vstorage->InternalComparator()->user_comparator()->Compare(
+                     info1->smallest_key, info2->smallest_key) < 0;
+        });
+    for (size_t i = 0; i < num_files - 1; i++) {
+      if (sorted_file_info_list[i]->largest_key >=
+          sorted_file_info_list[i + 1]->smallest_key) {
+        return Status::NotSupported("Cannot add overlapping range among files");
+      }
+    }
   }
-  meta.smallest_seqno = file_info->sequence_number;
-  meta.largest_seqno = file_info->sequence_number;
-  if (meta.smallest_seqno != 0 || meta.largest_seqno != 0) {
-    return Status::InvalidArgument(
-        "Non zero sequence numbers are not supported");
+  std::vector<uint64_t> micro_list(num_files, 0);
+  std::vector<FileMetaData> meta_list(num_files);
+  for (size_t i = 0; i < num_files; i++) {
+    StatsTimer t(env_, &micro_list[i]);
+    if (file_info_list[i].num_entries == 0) {
+      return Status::InvalidArgument("File contain no entries");
+    }
+    if (file_info_list[i].version != 1) {
+      return Status::InvalidArgument(
+          "Generated table version is not supported");
+    }
+    // version 1 imply that file have only Put Operations with Sequence Number =
+    // 0
+    meta_list[i].smallest =
+        InternalKey(file_info_list[i].smallest_key,
+                    file_info_list[i].sequence_number, ValueType::kTypeValue);
+    meta_list[i].largest =
+        InternalKey(file_info_list[i].largest_key,
+                    file_info_list[i].sequence_number, ValueType::kTypeValue);
+    if (!meta_list[i].smallest.Valid() || !meta_list[i].largest.Valid()) {
+      return Status::Corruption("Generated table have corrupted keys");
+    }
+    meta_list[i].smallest_seqno = file_info_list[i].sequence_number;
+    meta_list[i].largest_seqno = file_info_list[i].sequence_number;
+    if (meta_list[i].smallest_seqno != 0 || meta_list[i].largest_seqno != 0) {
+      return Status::InvalidArgument(
+          "Non zero sequence numbers are not supported");
+    }
   }
-  // Generate a location for the new table
-  std::list<uint64_t>::iterator pending_outputs_inserted_elem;
+  std::vector<std::list<uint64_t>::iterator> pending_outputs_inserted_elem_list(
+      num_files);
+  // Generate locations for the new tables
   {
     InstrumentedMutexLock l(&mutex_);
-    pending_outputs_inserted_elem = CaptureCurrentFileNumberInPendingOutputs();
-    meta.fd =
-        FileDescriptor(versions_->NewFileNumber(), 0, file_info->file_size);
+    for (size_t i = 0; i < num_files; i++) {
+      StatsTimer t(env_, &micro_list[i]);
+      pending_outputs_inserted_elem_list[i] =
+          CaptureCurrentFileNumberInPendingOutputs();
+      meta_list[i].fd = FileDescriptor(versions_->NewFileNumber(), 0,
+                                       file_info_list[i].file_size);
+    }
   }
-  std::string db_fname = TableFileName(
-      db_options_.db_paths, meta.fd.GetNumber(), meta.fd.GetPathId());
-  if (move_file) {
-    status = env_->LinkFile(file_info->file_path, db_fname);
-    if (status.IsNotSupported()) {
-      // Original file is on a different FS, use copy instead of hard linking
-      status = CopyFile(env_, file_info->file_path, db_fname, 0);
+  // Copy/Move external files into DB
+  std::vector<std::string> db_fname_list(num_files);
+  size_t j = 0;
+  for (; j < num_files; j++) {
+    StatsTimer t(env_, &micro_list[j]);
+    db_fname_list[j] =
+        TableFileName(db_options_.db_paths, meta_list[j].fd.GetNumber(),
+                      meta_list[j].fd.GetPathId());
+    if (move_file) {
+      status = env_->LinkFile(file_info_list[j].file_path, db_fname_list[j]);
+      if (status.IsNotSupported()) {
+        // Original file is on a different FS, use copy instead of hard linking
+        status =
+            CopyFile(env_, file_info_list[j].file_path, db_fname_list[j], 0);
+      }
+    } else {
+      status = CopyFile(env_, file_info_list[j].file_path, db_fname_list[j], 0);
+    }
+    TEST_SYNC_POINT("DBImpl::AddFile:FileCopied");
+    if (!status.ok()) {
+      for (size_t i = 0; i < j; i++) {
+        Status s = env_->DeleteFile(db_fname_list[i]);
+        if (!s.ok()) {
+          Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
              "AddFile() clean up for file %s failed : %s",
+              db_fname_list[i].c_str(), s.ToString().c_str());
+        }
+      }
+      return status;
+    }
-  } else {
-    status = CopyFile(env_, file_info->file_path, db_fname, 0);
-  }
-  TEST_SYNC_POINT("DBImpl::AddFile:FileCopied");
-  if (!status.ok()) {
-    return status;
   }
-  // The level the file will be ingested into
-  int target_level = 0;
   {
     InstrumentedMutexLock l(&mutex_);
     const MutableCFOptions mutable_cf_options =
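Note: the cross-file overlap check added in this hunk uses a standard trick: once the file ranges are sorted by their smallest key, any overlap between two files must also show up between adjacent files, so a single pass over neighbours is sufficient. A self-contained sketch of that check, using plain byte-wise string comparison where the patch sorts with the column family's user comparator (FileRange and HasOverlap are illustrative names, not RocksDB API):

#include <algorithm>
#include <cstdio>
#include <string>
#include <vector>

// Hypothetical flattened view of the fields the check above cares about.
struct FileRange {
  std::string smallest_key;
  std::string largest_key;
};

// Returns true if any two ranges overlap. Mirrors the logic in the hunk:
// sort by smallest key, then any overlap must appear between neighbours,
// i.e. sorted[i].largest_key >= sorted[i + 1].smallest_key.
static bool HasOverlap(std::vector<FileRange> files) {
  std::sort(files.begin(), files.end(),
            [](const FileRange& a, const FileRange& b) {
              return a.smallest_key < b.smallest_key;
            });
  for (size_t i = 0; i + 1 < files.size(); i++) {
    if (files[i].largest_key >= files[i + 1].smallest_key) {
      return true;
    }
  }
  return false;
}

int main() {
  std::vector<FileRange> disjoint = {{"a", "c"}, {"m", "p"}, {"d", "k"}};
  std::vector<FileRange> overlapping = {{"a", "f"}, {"d", "k"}};
  std::printf("disjoint: %d\n", HasOverlap(disjoint));        // prints 0
  std::printf("overlapping: %d\n", HasOverlap(overlapping));  // prints 1
  return 0;
}

The sort costs O(n log n) and the scan is O(n), which is negligible next to opening and reading the SST files themselves.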
@@ -183,35 +274,46 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
     ro.total_order_seek = true;
     ScopedArenaIterator iter(NewInternalIterator(ro, cfd, sv, &arena));
-    InternalKey range_start(file_info->smallest_key, kMaxSequenceNumber,
-                            kTypeValue);
-    iter->Seek(range_start.Encode());
-    status = iter->status();
-    if (status.ok() && iter->Valid()) {
-      ParsedInternalKey seek_result;
-      if (ParseInternalKey(iter->key(), &seek_result)) {
-        auto* vstorage = cfd->current()->storage_info();
-        if (vstorage->InternalComparator()->user_comparator()->Compare(
-                seek_result.user_key, file_info->largest_key) <= 0) {
-          status = Status::NotSupported("Cannot add overlapping range");
+    for (size_t i = 0; i < num_files; i++) {
+      StatsTimer t(env_, &micro_list[i]);
+      InternalKey range_start(file_info_list[i].smallest_key,
+                              kMaxSequenceNumber, kTypeValue);
+      iter->Seek(range_start.Encode());
+      status = iter->status();
+      if (status.ok() && iter->Valid()) {
+        ParsedInternalKey seek_result;
+        if (ParseInternalKey(iter->key(), &seek_result)) {
+          auto* vstorage = cfd->current()->storage_info();
+          if (vstorage->InternalComparator()->user_comparator()->Compare(
+                  seek_result.user_key, file_info_list[i].largest_key) <= 0) {
+            status = Status::NotSupported("Cannot add overlapping range");
+            break;
+          }
+        } else {
+          status = Status::Corruption("DB have corrupted keys");
+          break;
+        }
-      } else {
-        status = Status::Corruption("DB have corrupted keys");
-      }
+      }
     }
+    // The level the file will be ingested into
+    std::vector<int> target_level_list(num_files, 0);
     if (status.ok()) {
-      // Add file to the lowest possible level
-      target_level = PickLevelForIngestedFile(cfd, file_info);
+      // Add files to L0
       VersionEdit edit;
       edit.SetColumnFamily(cfd->GetID());
-      edit.AddFile(target_level, meta.fd.GetNumber(), meta.fd.GetPathId(),
-                   meta.fd.GetFileSize(), meta.smallest, meta.largest,
-                   meta.smallest_seqno, meta.largest_seqno,
-                   meta.marked_for_compaction);
+      for (size_t i = 0; i < num_files; i++) {
+        StatsTimer t(env_, &micro_list[i]);
+        // Add file to the lowest possible level
+        target_level_list[i] = PickLevelForIngestedFile(cfd, file_info_list[i]);
+        edit.AddFile(target_level_list[i], meta_list[i].fd.GetNumber(),
+                     meta_list[i].fd.GetPathId(), meta_list[i].fd.GetFileSize(),
+                     meta_list[i].smallest, meta_list[i].largest,
+                     meta_list[i].smallest_seqno, meta_list[i].largest_seqno,
+                     meta_list[i].marked_for_compaction);
+      }
       status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
                                       directories_.GetDbDir());
     }
@@ -220,45 +322,41 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
     if (status.ok()) {
       delete InstallSuperVersionAndScheduleWork(cfd, nullptr,
                                                 mutable_cf_options);
+    }
+    for (size_t i = 0; i < num_files; i++) {
       // Update internal stats
       InternalStats::CompactionStats stats(1);
-      stats.micros = env_->NowMicros() - start_micros;
-      stats.bytes_written = meta.fd.GetFileSize();
+      stats.micros = micro_list[i];
+      stats.bytes_written = meta_list[i].fd.GetFileSize();
       stats.num_output_files = 1;
-      cfd->internal_stats()->AddCompactionStats(target_level, stats);
+      cfd->internal_stats()->AddCompactionStats(target_level_list[i], stats);
       cfd->internal_stats()->AddCFStats(InternalStats::BYTES_INGESTED_ADD_FILE,
-                                        meta.fd.GetFileSize());
+                                        meta_list[i].fd.GetFileSize());
+      ReleaseFileNumberFromPendingOutputs(
+          pending_outputs_inserted_elem_list[i]);
     }
-    ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
   }
   if (!status.ok()) {
-    // We failed to add the file to the database
-    Status s = env_->DeleteFile(db_fname);
-    if (!s.ok()) {
-      Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
-          "AddFile() clean up for file %s failed : %s", db_fname.c_str(),
-          s.ToString().c_str());
+    // We failed to add the files to the database
+    for (size_t i = 0; i < num_files; i++) {
+      Status s = env_->DeleteFile(db_fname_list[i]);
+      if (!s.ok()) {
+        Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
+            "AddFile() clean up for file %s failed : %s",
+            db_fname_list[i].c_str(), s.ToString().c_str());
+      }
     }
-  } else {
-    // File was ingested successfully
-    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
-        "New file %" PRIu64
-        " was added to L%d (Size: %.2f MB, "
-        "entries: %" PRIu64 ")",
-        meta.fd.GetNumber(), target_level,
-        static_cast<double>(meta.fd.GetFileSize()) / 1048576.0,
-        file_info->num_entries);
-    if (move_file) {
-      // The file was moved and added successfully, remove original file link
-      Status s = env_->DeleteFile(file_info->file_path);
+  } else if (status.ok() && move_file) {
+    // The files were moved and added successfully, remove original file links
+    for (size_t i = 0; i < num_files; i++) {
+      Status s = env_->DeleteFile(file_info_list[i].file_path);
       if (!s.ok()) {
         Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
             "%s was added to DB successfully but failed to remove original "
-            "file link : %s",
-            file_info->file_path.c_str(), s.ToString().c_str());
+            "file "
+            "link : %s",
+            file_info_list[i].file_path.c_str(), s.ToString().c_str());
       }
     }
   }
@@ -267,15 +365,15 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
 // Finds the lowest level in the DB that the ingested file can be added to
 int DBImpl::PickLevelForIngestedFile(ColumnFamilyData* cfd,
-                                     const ExternalSstFileInfo* file_info) {
+                                     const ExternalSstFileInfo& file_info) {
   mutex_.AssertHeld();
   int target_level = 0;
   auto* vstorage = cfd->current()->storage_info();
   auto* ucmp = vstorage->InternalComparator()->user_comparator();
-  Slice file_smallest_user_key(file_info->smallest_key);
-  Slice file_largest_user_key(file_info->largest_key);
+  Slice file_smallest_user_key(file_info.smallest_key);
+  Slice file_largest_user_key(file_info.largest_key);
   for (int lvl = cfd->NumberLevels() - 1; lvl >= vstorage->base_level();
        lvl--) {
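Note: the last hunk is cut off mid-loop; the visible part shows PickLevelForIngestedFile now taking the file info by const reference and walking the levels from the bottommost one up to the base level, looking for the lowest level that can hold the file. A rough sketch of that shape, returning the deepest level whose existing key ranges do not overlap the ingested file and falling back to level 0 (OverlapsLevel is an assumed callback standing in for the version-storage query, not RocksDB API):

#include <cstdio>
#include <functional>
#include <string>

// Assumed callback: returns true if [smallest, largest] overlaps any key
// range already stored in `level`. In the patch this information comes from
// the column family's current VersionStorageInfo.
using OverlapsLevel =
    std::function<bool(int level, const std::string& smallest,
                       const std::string& largest)>;

// Walk from the bottom level towards the top and return the lowest level
// whose existing files do not overlap the ingested file's key range.
// `base_level` is the lowest level that level-0 files compact into.
int PickLowestNonOverlappingLevel(int num_levels, int base_level,
                                  const std::string& smallest,
                                  const std::string& largest,
                                  const OverlapsLevel& overlaps) {
  for (int lvl = num_levels - 1; lvl >= base_level; lvl--) {
    if (!overlaps(lvl, smallest, largest)) {
      return lvl;  // deepest level that can safely hold the file
    }
  }
  return 0;  // everything below overlapped; fall back to L0
}

int main() {
  // Toy setup: pretend levels 5 and 6 already contain keys overlapping ["d","k"].
  auto overlaps = [](int level, const std::string&, const std::string&) {
    return level >= 5;
  };
  int lvl = PickLowestNonOverlappingLevel(/*num_levels=*/7, /*base_level=*/1,
                                          "d", "k", overlaps);
  std::printf("picked level %d\n", lvl);  // prints 4
  return 0;
}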