@ -1415,20 +1415,22 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
pending_outputs_ [ meta . fd . GetNumber ( ) ] = 0 ; // path 0 for level 0 file.
pending_outputs_ [ meta . fd . GetNumber ( ) ] = 0 ; // path 0 for level 0 file.
ReadOptions ro ;
ReadOptions ro ;
ro . total_order_seek = true ;
ro . total_order_seek = true ;
Iterator * iter = mem - > NewIterator ( ro ) ;
Arena arena ;
Status s ;
{
ScopedArenaIterator iter ( mem - > NewIterator ( ro , & arena ) ) ;
const SequenceNumber newest_snapshot = snapshots_ . GetNewest ( ) ;
const SequenceNumber newest_snapshot = snapshots_ . GetNewest ( ) ;
const SequenceNumber earliest_seqno_in_memtable =
const SequenceNumber earliest_seqno_in_memtable =
mem - > GetFirstSequenceNumber ( ) ;
mem - > GetFirstSequenceNumber ( ) ;
Log ( options_ . info_log , " [%s] Level-0 table #% " PRIu64 " : started " ,
Log ( options_ . info_log , " [%s] Level-0 table #% " PRIu64 " : started " ,
cfd - > GetName ( ) . c_str ( ) , meta . fd . GetNumber ( ) ) ;
cfd - > GetName ( ) . c_str ( ) , meta . fd . GetNumber ( ) ) ;
Status s ;
{
{
mutex_ . Unlock ( ) ;
mutex_ . Unlock ( ) ;
s = BuildTable ( dbname_ , env_ , * cfd - > ioptions ( ) , env_options_ ,
s = BuildTable (
cfd - > table_cache ( ) , iter , & meta , cfd - > internal_comparator ( ) ,
dbname_ , env_ , * cfd - > ioptions ( ) , env_options_ , cfd - > table_cache ( ) ,
newest_snapshot , earliest_seqno_in_memtable ,
iter . get ( ) , & meta , cfd - > internal_comparator ( ) , newest_snapshot ,
GetCompressionFlush ( * cfd - > options ( ) ) ,
earliest_seqno_in_memtable , GetCompressionFlush ( * cfd - > options ( ) ) ,
cfd - > options ( ) - > compression_opts , Env : : IO_HIGH ) ;
cfd - > options ( ) - > compression_opts , Env : : IO_HIGH ) ;
LogFlush ( options_ . info_log ) ;
LogFlush ( options_ . info_log ) ;
mutex_ . Lock ( ) ;
mutex_ . Lock ( ) ;
@ -1438,8 +1440,7 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
" [%s] Level-0 table #% " PRIu64 " : % " PRIu64 " bytes %s " ,
" [%s] Level-0 table #% " PRIu64 " : % " PRIu64 " bytes %s " ,
cfd - > GetName ( ) . c_str ( ) , meta . fd . GetNumber ( ) , meta . fd . GetFileSize ( ) ,
cfd - > GetName ( ) . c_str ( ) , meta . fd . GetNumber ( ) , meta . fd . GetFileSize ( ) ,
s . ToString ( ) . c_str ( ) ) ;
s . ToString ( ) . c_str ( ) ) ;
delete iter ;
}
pending_outputs_ . erase ( meta . fd . GetNumber ( ) ) ;
pending_outputs_ . erase ( meta . fd . GetNumber ( ) ) ;
// Note that if file_size is zero, the file has been deleted and
// Note that if file_size is zero, the file has been deleted and
@ -1485,24 +1486,27 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
std : : vector < Iterator * > memtables ;
std : : vector < Iterator * > memtables ;
ReadOptions ro ;
ReadOptions ro ;
ro . total_order_seek = true ;
ro . total_order_seek = true ;
Arena arena ;
for ( MemTable * m : mems ) {
for ( MemTable * m : mems ) {
Log ( options_ . info_log ,
Log ( options_ . info_log ,
" [%s] Flushing memtable with next log file: % " PRIu64 " \n " ,
" [%s] Flushing memtable with next log file: % " PRIu64 " \n " ,
cfd - > GetName ( ) . c_str ( ) , m - > GetNextLogNumber ( ) ) ;
cfd - > GetName ( ) . c_str ( ) , m - > GetNextLogNumber ( ) ) ;
memtables . push_back ( m - > NewIterator ( ro ) ) ;
memtables . push_back ( m - > NewIterator ( ro , & arena ) ) ;
}
}
Iterator * iter = NewMergingIterator ( & cfd - > internal_comparator ( ) ,
{
& memtables [ 0 ] , memtables . size ( ) ) ;
ScopedArenaIterator iter ( NewMergingIterator ( & cfd - > internal_comparator ( ) ,
& memtables [ 0 ] ,
memtables . size ( ) , & arena ) ) ;
Log ( options_ . info_log , " [%s] Level-0 flush table #% " PRIu64 " : started " ,
Log ( options_ . info_log , " [%s] Level-0 flush table #% " PRIu64 " : started " ,
cfd - > GetName ( ) . c_str ( ) , meta . fd . GetNumber ( ) ) ;
cfd - > GetName ( ) . c_str ( ) , meta . fd . GetNumber ( ) ) ;
s = BuildTable ( dbname_ , env_ , * cfd - > ioptions ( ) , env_options_ ,
s = BuildTable (
cfd - > table_cache ( ) , iter , & meta , cfd - > internal_comparator ( ) ,
dbname_ , env_ , * cfd - > ioptions ( ) , env_options_ , cfd - > table_cache ( ) ,
newest_snapshot , earliest_seqno_in_memtable ,
iter . get ( ) , & meta , cfd - > internal_comparator ( ) , newest_snapshot ,
GetCompressionFlush ( * cfd - > options ( ) ) ,
earliest_seqno_in_memtable , GetCompressionFlush ( * cfd - > options ( ) ) ,
cfd - > options ( ) - > compression_opts , Env : : IO_HIGH ) ;
cfd - > options ( ) - > compression_opts , Env : : IO_HIGH ) ;
LogFlush ( options_ . info_log ) ;
LogFlush ( options_ . info_log ) ;
delete iter ;
}
Log ( options_ . info_log ,
Log ( options_ . info_log ,
" [%s] Level-0 flush table #% " PRIu64 " : % " PRIu64 " bytes %s " ,
" [%s] Level-0 flush table #% " PRIu64 " : % " PRIu64 " bytes %s " ,
cfd - > GetName ( ) . c_str ( ) , meta . fd . GetNumber ( ) , meta . fd . GetFileSize ( ) ,
cfd - > GetName ( ) . c_str ( ) , meta . fd . GetNumber ( ) , meta . fd . GetFileSize ( ) ,
@ -3349,7 +3353,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
SuperVersion * super_version ,
SuperVersion * super_version ,
Arena * arena ) {
Arena * arena ) {
Iterator * internal_iter ;
Iterator * internal_iter ;
if ( arena ! = nullptr ) {
assert ( arena ! = nullptr ) ;
// Need to create internal iterator from the arena.
// Need to create internal iterator from the arena.
MergeIteratorBuilder merge_iter_builder ( & cfd - > internal_comparator ( ) , arena ) ;
MergeIteratorBuilder merge_iter_builder ( & cfd - > internal_comparator ( ) , arena ) ;
// Collect iterator for mutable mem
// Collect iterator for mutable mem
@ -3361,19 +3365,6 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
super_version - > current - > AddIterators ( options , env_options_ ,
super_version - > current - > AddIterators ( options , env_options_ ,
& merge_iter_builder ) ;
& merge_iter_builder ) ;
internal_iter = merge_iter_builder . Finish ( ) ;
internal_iter = merge_iter_builder . Finish ( ) ;
} else {
// Need to create internal iterator using malloc.
std : : vector < Iterator * > iterator_list ;
// Collect iterator for mutable mem
iterator_list . push_back ( super_version - > mem - > NewIterator ( options ) ) ;
// Collect all needed child iterators for immutable memtables
super_version - > imm - > AddIterators ( options , & iterator_list ) ;
// Collect iterators for files in L0 - Ln
super_version - > current - > AddIterators ( options , env_options_ ,
& iterator_list ) ;
internal_iter = NewMergingIterator ( & cfd - > internal_comparator ( ) ,
& iterator_list [ 0 ] , iterator_list . size ( ) ) ;
}
IterState * cleanup = new IterState ( this , & mutex_ , super_version ) ;
IterState * cleanup = new IterState ( this , & mutex_ , super_version ) ;
internal_iter - > RegisterCleanup ( CleanupIteratorState , cleanup , nullptr ) ;
internal_iter - > RegisterCleanup ( CleanupIteratorState , cleanup , nullptr ) ;
@ -3790,10 +3781,12 @@ Status DBImpl::NewIterators(
? reinterpret_cast < const SnapshotImpl * > ( options . snapshot ) - > number_
? reinterpret_cast < const SnapshotImpl * > ( options . snapshot ) - > number_
: latest_snapshot ;
: latest_snapshot ;
auto iter = NewInternalIterator ( options , cfd , super_versions [ i ] ) ;
ArenaWrappedDBIter * db_iter = NewArenaWrappedDbIterator (
iter = NewDBIterator ( env_ , * cfd - > options ( ) ,
env_ , * cfd - > options ( ) , cfd - > user_comparator ( ) , snapshot ) ;
cfd - > user_comparator ( ) , iter , snapshot ) ;
Iterator * internal_iter = NewInternalIterator (
iterators - > push_back ( iter ) ;
options , cfd , super_versions [ i ] , db_iter - > GetArena ( ) ) ;
db_iter - > SetIterUnderDBIter ( internal_iter ) ;
iterators - > push_back ( db_iter ) ;
}
}
}
}