@ -453,11 +453,16 @@ int Version::PickLevelForMemTableOutput(
}
// Store in "*inputs" all files in "level" that overlap [begin,end]
// If hint_index is specified, then it points to a file in the
// overlapping range.
// The file_index returns a pointer to any file in an overlapping range.
void Version : : GetOverlappingInputs (
int level ,
const InternalKey * begin ,
const InternalKey * end ,
std : : vector < FileMetaData * > * inputs ) {
std : : vector < FileMetaData * > * inputs ,
int hint_index ,
int * file_index ) {
inputs - > clear ( ) ;
Slice user_begin , user_end ;
if ( begin ! = NULL ) {
@ -468,7 +473,8 @@ void Version::GetOverlappingInputs(
}
const Comparator * user_cmp = vset_ - > icmp_ . user_comparator ( ) ;
if ( begin ! = NULL & & end ! = NULL & & level > 0 ) {
GetOverlappingInputsBinarySearch ( level , user_begin , user_end , inputs ) ;
GetOverlappingInputsBinarySearch ( level , user_begin , user_end , inputs ,
hint_index , file_index ) ;
return ;
}
for ( size_t i = 0 ; i < files_ [ level ] . size ( ) ; ) {
@ -493,6 +499,8 @@ void Version::GetOverlappingInputs(
inputs - > clear ( ) ;
i = 0 ;
}
} else if ( file_index ) {
* file_index = i ;
}
}
}
@ -506,14 +514,24 @@ void Version::GetOverlappingInputsBinarySearch(
int level ,
const Slice & user_begin ,
const Slice & user_end ,
std : : vector < FileMetaData * > * inputs ) {
std : : vector < FileMetaData * > * inputs ,
int hint_index ,
int * file_index ) {
assert ( level > 0 ) ;
int min = 0 ;
int mid = 0 ;
int max = files_ [ level ] . size ( ) - 1 ;
bool foundOverlap = false ;
const Comparator * user_cmp = vset_ - > icmp_ . user_comparator ( ) ;
while ( min < = max ) {
// if the caller already knows the index of a file that has overlap,
// then we can skip the binary search.
if ( hint_index ! = - 1 ) {
mid = hint_index ;
foundOverlap = true ;
}
while ( ! foundOverlap & & min < = max ) {
mid = ( min + max ) / 2 ;
FileMetaData * f = files_ [ level ] [ mid ] ;
const Slice file_start = f - > smallest . user_key ( ) ;
@ -532,6 +550,10 @@ void Version::GetOverlappingInputsBinarySearch(
if ( ! foundOverlap ) {
return ;
}
// returns the index where an overlap is found
if ( file_index ) {
* file_index = mid ;
}
ExtendOverlappingInputs ( level , user_begin , user_end , inputs , mid ) ;
}
@ -546,13 +568,21 @@ void Version::ExtendOverlappingInputs(
std : : vector < FileMetaData * > * inputs ,
int midIndex ) {
// assert that the file at midIndex overlaps with the range
const Comparator * user_cmp = vset_ - > icmp_ . user_comparator ( ) ;
assert ( midIndex < files_ [ level ] . size ( ) ) ;
assert ( ( user_cmp - > Compare ( files_ [ level ] [ midIndex ] - > largest . user_key ( ) ,
user_begin ) > = 0 ) | |
( user_cmp - > Compare ( files_ [ level ] [ midIndex ] - > smallest . user_key ( ) ,
user_end ) < = 0 ) ) ;
# ifndef NDEBUG
{
// assert that the file at midIndex overlaps with the range
assert ( midIndex < files_ [ level ] . size ( ) ) ;
FileMetaData * f = files_ [ level ] [ midIndex ] ;
const Slice fstart = f - > smallest . user_key ( ) ;
const Slice flimit = f - > largest . user_key ( ) ;
if ( user_cmp - > Compare ( fstart , user_begin ) > = 0 ) {
assert ( user_cmp - > Compare ( fstart , user_end ) < = 0 ) ;
} else {
assert ( user_cmp - > Compare ( flimit , user_begin ) > = 0 ) ;
}
}
# endif
// check backwards from 'mid' to lower indices
for ( size_t i = midIndex ; i < files_ [ level ] . size ( ) ; i - - ) {
@ -1487,12 +1517,14 @@ Compaction* VersionSet::PickCompaction() {
if ( compact_pointer_ [ level ] . empty ( ) | |
icmp_ . Compare ( f - > largest . Encode ( ) , compact_pointer_ [ level ] ) > 0 ) {
c - > inputs_ [ 0 ] . push_back ( f ) ;
c - > base_index_ = i ;
break ;
}
}
if ( c - > inputs_ [ 0 ] . empty ( ) ) {
// Wrap-around to the beginning of the key space
c - > inputs_ [ 0 ] . push_back ( current_ - > files_ [ level ] [ 0 ] ) ;
c - > base_index_ = 0 ;
}
} else if ( seek_compaction ) {
level = current_ - > file_to_compact_level_ ;
@ -1527,7 +1559,8 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
InternalKey smallest , largest ;
GetRange ( c - > inputs_ [ 0 ] , & smallest , & largest ) ;
current_ - > GetOverlappingInputs ( level + 1 , & smallest , & largest , & c - > inputs_ [ 1 ] ) ;
current_ - > GetOverlappingInputs ( level + 1 , & smallest , & largest , & c - > inputs_ [ 1 ] ,
c - > parent_index_ , & c - > parent_index_ ) ;
// Get entire range covered by compaction
InternalKey all_start , all_limit ;
@ -1537,7 +1570,8 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
// changing the number of "level+1" files we pick up.
if ( ! c - > inputs_ [ 1 ] . empty ( ) ) {
std : : vector < FileMetaData * > expanded0 ;
current_ - > GetOverlappingInputs ( level , & all_start , & all_limit , & expanded0 ) ;
current_ - > GetOverlappingInputs ( level , & all_start , & all_limit , & expanded0 ,
c - > base_index_ , NULL ) ;
const int64_t inputs0_size = TotalFileSize ( c - > inputs_ [ 0 ] ) ;
const int64_t inputs1_size = TotalFileSize ( c - > inputs_ [ 1 ] ) ;
const int64_t expanded0_size = TotalFileSize ( expanded0 ) ;
@ -1548,7 +1582,8 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
GetRange ( expanded0 , & new_start , & new_limit ) ;
std : : vector < FileMetaData * > expanded1 ;
current_ - > GetOverlappingInputs ( level + 1 , & new_start , & new_limit ,
& expanded1 ) ;
& expanded1 , c - > parent_index_ ,
& c - > parent_index_ ) ;
if ( expanded1 . size ( ) = = c - > inputs_ [ 1 ] . size ( ) ) {
Log ( options_ - > info_log ,
" Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes) \n " ,
@ -1632,7 +1667,9 @@ Compaction::Compaction(int level, uint64_t target_file_size,
seek_compaction_ ( seek_compaction ) ,
grandparent_index_ ( 0 ) ,
seen_key_ ( false ) ,
overlapped_bytes_ ( 0 ) {
overlapped_bytes_ ( 0 ) ,
base_index_ ( - 1 ) ,
parent_index_ ( - 1 ) {
edit_ = new VersionEdit ( number_levels_ ) ;
level_ptrs_ = new size_t [ number_levels_ ] ;
for ( int i = 0 ; i < number_levels_ ; i + + ) {