@ -232,7 +232,7 @@ enum SaverState {
kFound ,
kFound ,
kDeleted ,
kDeleted ,
kCorrupt ,
kCorrupt ,
kMerge // value contains the current merge result (the operand )
kMerge // saver contains the current merge result (the operands )
} ;
} ;
struct Saver {
struct Saver {
SaverState state ;
SaverState state ;
@ -241,6 +241,7 @@ struct Saver {
bool * value_found ; // Is value set correctly? Used by KeyMayExist
bool * value_found ; // Is value set correctly? Used by KeyMayExist
std : : string * value ;
std : : string * value ;
const MergeOperator * merge_operator ;
const MergeOperator * merge_operator ;
std : : deque < std : : string > * merge_operands ; // the merge operations encountered
Logger * logger ;
Logger * logger ;
bool didIO ; // did we do any disk io?
bool didIO ; // did we do any disk io?
} ;
} ;
@ -261,6 +262,11 @@ static void MarkKeyMayExist(void* arg) {
static bool SaveValue ( void * arg , const Slice & ikey , const Slice & v , bool didIO ) {
static bool SaveValue ( void * arg , const Slice & ikey , const Slice & v , bool didIO ) {
Saver * s = reinterpret_cast < Saver * > ( arg ) ;
Saver * s = reinterpret_cast < Saver * > ( arg ) ;
std : : deque < std : : string > * const ops = s - > merge_operands ; // shorter alias
std : : string merge_result ; // temporary area for merge results later
assert ( s ! = nullptr & & ops ! = nullptr ) ;
ParsedInternalKey parsed_key ;
ParsedInternalKey parsed_key ;
// TODO: didIO and Merge?
// TODO: didIO and Merge?
s - > didIO = didIO ;
s - > didIO = didIO ;
@ -269,49 +275,57 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
s - > state = kCorrupt ;
s - > state = kCorrupt ;
} else {
} else {
if ( s - > ucmp - > Compare ( parsed_key . user_key , s - > user_key ) = = 0 ) {
if ( s - > ucmp - > Compare ( parsed_key . user_key , s - > user_key ) = = 0 ) {
// Key matches. Process it
switch ( parsed_key . type ) {
switch ( parsed_key . type ) {
case kTypeValue :
case kTypeValue :
if ( kNotFound = = s - > state ) {
if ( kNotFound = = s - > state ) {
s - > value - > assign ( v . data ( ) , v . size ( ) ) ;
} else if ( kMerge = = s - > state ) {
std : : string operand ;
swap ( operand , * s - > value ) ;
s - > merge_operator - > Merge ( s - > user_key , & v , operand ,
s - > value , s - > logger ) ;
} else {
assert ( false ) ;
}
s - > state = kFound ;
s - > state = kFound ;
return false ;
case kTypeMerge :
if ( kNotFound = = s - > state ) {
s - > state = kMerge ;
s - > value - > assign ( v . data ( ) , v . size ( ) ) ;
s - > value - > assign ( v . data ( ) , v . size ( ) ) ;
} else if ( kMerge = = s - > state ) {
} else if ( kMerge = = s - > state ) {
std : : string operand ;
assert ( s - > merge_operator ! = nullptr ) ;
swap ( operand , * s - > value ) ;
s - > state = kFound ;
s - > merge_operator - > Merge ( s - > user_key , & v , operand ,
if ( ! s - > merge_operator - > Merge ( s - > user_key , & v , * ops ,
s - > value , s - > logger ) ;
s - > value , s - > logger ) ) {
s - > state = kCorrupt ;
}
} else {
} else {
assert ( false ) ;
assert ( false ) ;
}
}
return true ;
return false ;
case kTypeDeletion :
case kTypeDeletion :
if ( kNotFound = = s - > state ) {
if ( kNotFound = = s - > state ) {
s - > state = kDeleted ;
s - > state = kDeleted ;
} else if ( kMerge = = s - > state ) {
} else if ( kMerge = = s - > state ) {
std : : string operand ;
swap ( operand , * s - > value ) ;
s - > merge_operator - > Merge ( s - > user_key , nullptr , operand ,
s - > value , s - > logger ) ;
s - > state = kFound ;
s - > state = kFound ;
if ( ! s - > merge_operator - > Merge ( s - > user_key , nullptr , * ops ,
s - > value , s - > logger ) ) {
s - > state = kCorrupt ;
}
} else {
} else {
assert ( false ) ;
assert ( false ) ;
}
}
return false ;
return false ;
case kTypeMerge :
assert ( s - > state = = kNotFound | | s - > state = = kMerge ) ;
s - > state = kMerge ;
ops - > push_front ( v . ToString ( ) ) ;
while ( ops - > size ( ) > = 2 ) {
// Attempt to merge operands together via user associative merge
if ( s - > merge_operator - > PartialMerge ( s - > user_key ,
Slice ( ( * ops ) [ 0 ] ) ,
Slice ( ( * ops ) [ 1 ] ) ,
& merge_result ,
s - > logger ) ) {
ops - > pop_front ( ) ;
swap ( ops - > front ( ) , merge_result ) ;
} else {
// Associative merge returns false ==> stack the operands
break ;
}
}
return true ;
}
}
}
}
}
}
@ -341,7 +355,8 @@ Version::Version(VersionSet* vset, uint64_t version_number)
void Version : : Get ( const ReadOptions & options ,
void Version : : Get ( const ReadOptions & options ,
const LookupKey & k ,
const LookupKey & k ,
std : : string * value ,
std : : string * value ,
Status * status ,
Status * status ,
std : : deque < std : : string > * operands ,
GetStats * stats ,
GetStats * stats ,
const Options & db_options ,
const Options & db_options ,
const bool no_io ,
const bool no_io ,
@ -364,6 +379,7 @@ void Version::Get(const ReadOptions& options,
saver . value_found = value_found ;
saver . value_found = value_found ;
saver . value = value ;
saver . value = value ;
saver . merge_operator = merge_operator ;
saver . merge_operator = merge_operator ;
saver . merge_operands = operands ;
saver . logger = logger . get ( ) ;
saver . logger = logger . get ( ) ;
saver . didIO = false ;
saver . didIO = false ;
@ -374,55 +390,68 @@ void Version::Get(const ReadOptions& options,
// We can search level-by-level since entries never hop across
// We can search level-by-level since entries never hop across
// levels. Therefore we are guaranteed that if we find data
// levels. Therefore we are guaranteed that if we find data
// in a smaller level, later levels are irrelevant .
// in a smaller level, later levels are irrelevant (unless we
std : : vector < FileMetaData * > tmp ;
// are MergeInProgress).
FileMetaData * tmp2 ;
std : : vector < FileMetaData * > important_files ;
for ( int level = 0 ; level < vset_ - > NumberLevels ( ) ; level + + ) {
for ( int level = 0 ; level < vset_ - > NumberLevels ( ) ; level + + ) {
size_t num_files = files_ [ level ] . size ( ) ;
size_t num_files = files_ [ level ] . size ( ) ;
if ( num_files = = 0 ) continue ;
if ( num_files = = 0 ) continue ;
// Get the list of files to search in this level
// Get the list of files to search in this level
FileMetaData * const * files = & files_ [ level ] [ 0 ] ;
FileMetaData * const * files = & files_ [ level ] [ 0 ] ;
important_files . clear ( ) ;
important_files . reserve ( num_files ) ;
// Some files may overlap each other. We find
// all files that overlap user_key and process them in order from
// newest to oldest. In the context of merge-operator,
// this can occur at any level. Otherwise, it only occurs
// at Level-0 (since Put/Deletes are always compacted into a single entry).
uint32_t start_index ;
if ( level = = 0 ) {
if ( level = = 0 ) {
// Level-0 files may overlap each other. Find all files that
// On Level-0, we read through all files to check for overlap.
// overlap user_key and process them in order from newest to oldest.
start_index = 0 ;
tmp . reserve ( num_files ) ;
} else {
for ( uint32_t i = 0 ; i < num_files ; i + + ) {
// On Level-n (n>=1), files are sorted.
// Binary search to find earliest index whose largest key >= ikey.
// We will also stop when the file no longer overlaps ikey
start_index = FindFile ( vset_ - > icmp_ , files_ [ level ] , ikey ) ;
}
// Traverse the list, finding all overlapping files.
for ( uint32_t i = start_index ; i < num_files ; i + + ) {
FileMetaData * f = files [ i ] ;
FileMetaData * f = files [ i ] ;
if ( ucmp - > Compare ( user_key , f - > smallest . user_key ( ) ) > = 0 & &
if ( ucmp - > Compare ( user_key , f - > smallest . user_key ( ) ) > = 0 & &
ucmp - > Compare ( user_key , f - > largest . user_key ( ) ) < = 0 ) {
ucmp - > Compare ( user_key , f - > largest . user_key ( ) ) < = 0 ) {
tmp . push_back ( f ) ;
important_files . push_back ( f ) ;
} else if ( level > 0 ) {
// If on Level-n (n>=1) then the files are sorted.
// So we can stop looking when we are past the ikey.
break ;
}
}
}
}
if ( tmp . empty ( ) ) continue ;
std : : sort ( tmp . begin ( ) , tmp . end ( ) , NewestFirst ) ;
if ( important_files . empty ( ) ) continue ;
files = & tmp [ 0 ] ;
num_files = tmp . size ( ) ;
if ( level = = 0 ) {
} else {
std : : sort ( important_files . begin ( ) , important_files . end ( ) , NewestFirst ) ;
// Binary search to find earliest index whose largest key >= ikey.
uint32_t index = FindFile ( vset_ - > icmp_ , files_ [ level ] , ikey ) ;
if ( index > = num_files ) {
files = nullptr ;
num_files = 0 ;
} else {
tmp2 = files [ index ] ;
if ( ucmp - > Compare ( user_key , tmp2 - > smallest . user_key ( ) ) < 0 ) {
// All of "tmp2" is past any data for user_key
files = nullptr ;
num_files = 0 ;
} else {
} else {
files = & tmp2 ;
// Sanity check to make sure that the files are correctly sorted
num_files = 1 ;
# ifndef NDEBUG
// TODO, is level 1-n files all disjoint in user key space?
num_files = important_files . size ( ) ;
}
for ( uint32_t i = 1 ; i < num_files ; + + i ) {
FileMetaData * a = important_files [ i - 1 ] ;
FileMetaData * b = important_files [ i ] ;
int comp_sign = vset_ - > icmp_ . Compare ( a - > largest , b - > smallest ) ;
assert ( comp_sign < 0 ) ;
}
}
# endif
}
}
// Traverse each relevant file to find the desired key
num_files = important_files . size ( ) ;
for ( uint32_t i = 0 ; i < num_files ; + + i ) {
for ( uint32_t i = 0 ; i < num_files ; + + i ) {
FileMetaData * f = files [ i ] ;
FileMetaData * f = important_files [ i ] ;
bool tableIO = false ;
bool tableIO = false ;
* status = vset_ - > table_cache_ - > Get ( options , f - > number , f - > file_size ,
* status = vset_ - > table_cache_ - > Get ( options , f - > number , f - > file_size ,
ikey , & saver , SaveValue , & tableIO ,
ikey , & saver , SaveValue , & tableIO ,
@ -469,17 +498,17 @@ void Version::Get(const ReadOptions& options,
if ( kMerge = = saver . state ) {
if ( kMerge = = saver . state ) {
// merge operand is in *value and we hit the beginning of the key history
// merge_operands are in saver and we hit the beginning of the key history
// do a final merge of nullptr and operand;
// do a final merge of nullptr and operands;
std : : string operand ;
if ( merge_operator - > Merge ( user_key , nullptr , * saver . merge_operands ,
swap ( operand , * value ) ;
value , logger . get ( ) ) ) {
merge_operator - > Merge ( user_key , nullptr , operand ,
value , logger . get ( ) ) ;
* status = Status : : OK ( ) ;
* status = Status : : OK ( ) ;
return ;
} else {
* status = Status : : Corruption ( " could not perform end-of-key merge for " ,
user_key ) ;
}
} else {
} else {
* status = Status : : NotFound ( Slice ( ) ) ; // Use an empty error message for speed
* status = Status : : NotFound ( Slice ( ) ) ; // Use an empty error message for speed
return ;
}
}
}
}
@ -717,6 +746,53 @@ void Version::ExtendOverlappingInputs(
}
}
}
}
// Returns true iff the first or last file in inputs contains
// an overlapping user key to the file "just outside" of it (i.e.
// just after the last file, or just before the first file)
// REQUIRES: "*inputs" is a sorted list of non-overlapping files
bool Version : : HasOverlappingUserKey (
const std : : vector < FileMetaData * > * inputs ,
int level ) {
// If inputs empty, there is no overlap.
// If level == 0, it is assumed that all needed files were already included.
if ( inputs - > empty ( ) | | level = = 0 ) {
return false ;
}
const Comparator * user_cmp = vset_ - > icmp_ . user_comparator ( ) ;
const std : : vector < FileMetaData * > & files = files_ [ level ] ;
const size_t kNumFiles = files . size ( ) ;
// Check the last file in inputs against the file after it
size_t last_file = FindFile ( vset_ - > icmp_ , files ,
inputs - > back ( ) - > largest . Encode ( ) ) ;
assert ( 0 < = last_file & & last_file < kNumFiles ) ; // File should exist!
if ( last_file < kNumFiles - 1 ) { // If not the last file
const Slice last_key_in_input = files [ last_file ] - > largest . user_key ( ) ;
const Slice first_key_after = files [ last_file + 1 ] - > smallest . user_key ( ) ;
if ( user_cmp - > Compare ( last_key_in_input , first_key_after ) = = 0 ) {
// The last user key in input overlaps with the next file's first key
return true ;
}
}
// Check the first file in inputs against the file just before it
size_t first_file = FindFile ( vset_ - > icmp_ , files ,
inputs - > front ( ) - > smallest . Encode ( ) ) ;
assert ( 0 < = first_file & & first_file < = last_file ) ; // File should exist!
if ( first_file > 0 ) { // If not first file
const Slice & first_key_in_input = files [ first_file ] - > smallest . user_key ( ) ;
const Slice & last_key_before = files [ first_file - 1 ] - > largest . user_key ( ) ;
if ( user_cmp - > Compare ( first_key_in_input , last_key_before ) = = 0 ) {
// The first user key in input overlaps with the previous file's last key
return true ;
}
}
return false ;
}
std : : string Version : : DebugString ( bool hex ) const {
std : : string Version : : DebugString ( bool hex ) const {
std : : string r ;
std : : string r ;
for ( int level = 0 ; level < vset_ - > NumberLevels ( ) ; level + + ) {
for ( int level = 0 ; level < vset_ - > NumberLevels ( ) ; level + + ) {
@ -1566,7 +1642,7 @@ void VersionSet::Finalize(Version* v,
// sort all the levels based on their score. Higher scores get listed
// sort all the levels based on their score. Higher scores get listed
// first. Use bubble sort because the number of entries are small.
// first. Use bubble sort because the number of entries are small.
for ( int i = 0 ; i < NumberLevels ( ) - 2 ; i + + ) {
for ( int i = 0 ; i < NumberLevels ( ) - 2 ; i + + ) {
for ( int j = i + 1 ; j < NumberLevels ( ) - 1 ; j + + ) {
for ( int j = i + 1 ; j < NumberLevels ( ) - 1 ; j + + ) {
if ( v - > compaction_score_ [ i ] < v - > compaction_score_ [ j ] ) {
if ( v - > compaction_score_ [ i ] < v - > compaction_score_ [ j ] ) {
double score = v - > compaction_score_ [ i ] ;
double score = v - > compaction_score_ [ i ] ;
@ -2060,7 +2136,7 @@ Compaction* VersionSet::PickCompaction() {
Compaction * c = nullptr ;
Compaction * c = nullptr ;
int level = - 1 ;
int level = - 1 ;
// compute the compactions needed. It is better to do it here
// Compute the compactions needed. It is better to do it here
// and also in LogAndApply(), otherwise the values could be stale.
// and also in LogAndApply(), otherwise the values could be stale.
std : : vector < uint64_t > size_being_compacted ( NumberLevels ( ) - 1 ) ;
std : : vector < uint64_t > size_being_compacted ( NumberLevels ( ) - 1 ) ;
current_ - > vset_ - > SizeBeingCompacted ( size_being_compacted ) ;
current_ - > vset_ - > SizeBeingCompacted ( size_being_compacted ) ;
@ -2076,6 +2152,7 @@ Compaction* VersionSet::PickCompaction() {
level = current_ - > compaction_level_ [ i ] ;
level = current_ - > compaction_level_ [ i ] ;
if ( ( current_ - > compaction_score_ [ i ] > = 1 ) ) {
if ( ( current_ - > compaction_score_ [ i ] > = 1 ) ) {
c = PickCompactionBySize ( level , current_ - > compaction_score_ [ i ] ) ;
c = PickCompactionBySize ( level , current_ - > compaction_score_ [ i ] ) ;
ExpandWhileOverlapping ( c ) ;
if ( c ! = nullptr ) {
if ( c ! = nullptr ) {
break ;
break ;
}
}
@ -2092,13 +2169,14 @@ Compaction* VersionSet::PickCompaction() {
// Only allow one level 0 compaction at a time.
// Only allow one level 0 compaction at a time.
// Do not pick this file if its parents at level+1 are being compacted.
// Do not pick this file if its parents at level+1 are being compacted.
if ( level ! = 0 | | compactions_in_progress_ [ 0 ] . empty ( ) ) {
if ( level ! = 0 | | compactions_in_progress_ [ 0 ] . empty ( ) ) {
if ( ! ParentRangeInCompaction ( & f - > smallest , & f - > largest , level ,
if ( ! ParentRangeInCompaction ( & f - > smallest , & f - > largest , level ,
& parent_index ) ) {
& parent_index ) ) {
c = new Compaction ( level , MaxFileSizeForLevel ( level + 1 ) ,
c = new Compaction ( level , MaxFileSizeForLevel ( level + 1 ) ,
MaxGrandParentOverlapBytes ( level ) , NumberLevels ( ) , true ) ;
MaxGrandParentOverlapBytes ( level ) , NumberLevels ( ) , true ) ;
c - > inputs_ [ 0 ] . push_back ( f ) ;
c - > inputs_ [ 0 ] . push_back ( f ) ;
c - > parent_index_ = parent_index ;
c - > parent_index_ = parent_index ;
current_ - > file_to_compact_ = nullptr ;
current_ - > file_to_compact_ = nullptr ;
ExpandWhileOverlapping ( c ) ;
}
}
}
}
}
}
@ -2110,7 +2188,6 @@ Compaction* VersionSet::PickCompaction() {
c - > input_version_ = current_ ;
c - > input_version_ = current_ ;
c - > input_version_ - > Ref ( ) ;
c - > input_version_ - > Ref ( ) ;
// Files in level 0 may overlap each other, so pick up all overlapping ones
// Two level 0 compactions won't run at the same time, so don't need to worry
// Two level 0 compactions won't run at the same time, so don't need to worry
// about files on level 0 being compacted.
// about files on level 0 being compacted.
if ( level = = 0 ) {
if ( level = = 0 ) {
@ -2135,6 +2212,8 @@ Compaction* VersionSet::PickCompaction() {
assert ( ! c - > inputs_ [ 0 ] . empty ( ) ) ;
assert ( ! c - > inputs_ [ 0 ] . empty ( ) ) ;
}
}
// Setup "level+1" files (inputs_[1])
SetupOtherInputs ( c ) ;
SetupOtherInputs ( c ) ;
// mark all the files that are being compacted
// mark all the files that are being compacted
@ -2166,11 +2245,76 @@ bool VersionSet::FilesInCompaction(std::vector<FileMetaData*>& files) {
return false ;
return false ;
}
}
// Add more files to the inputs on "level" to make sure that
// no newer version of a key is compacted to "level+1" while leaving an older
// version in a "level". Otherwise, any Get() will search "level" first,
// and will likely return an old/stale value for the key, since it always
// searches in increasing order of level to find the value. This could
// also scramble the order of merge operands. This function should be
// called any time a new Compaction is created, and its inputs_[0] are
// populated.
//
// Will set c to nullptr if it is impossible to apply this compaction.
void VersionSet : : ExpandWhileOverlapping ( Compaction * c ) {
// If inputs are empty then there is nothing to expand.
if ( ! c | | c - > inputs_ [ 0 ] . empty ( ) ) {
return ;
}
// GetOverlappingInputs will always do the right thing for level-0.
// So we don't need to do any expansion if level == 0.
if ( c - > level ( ) = = 0 ) {
return ;
}
const int level = c - > level ( ) ;
InternalKey smallest , largest ;
// Keep expanding c->inputs_[0] until we are sure that there is a
// "clean cut" boundary between the files in input and the surrounding files.
// This will ensure that no parts of a key are lost during compaction.
int hint_index = - 1 ;
size_t old_size ;
do {
old_size = c - > inputs_ [ 0 ] . size ( ) ;
GetRange ( c - > inputs_ [ 0 ] , & smallest , & largest ) ;
c - > inputs_ [ 0 ] . clear ( ) ;
current_ - > GetOverlappingInputs ( level , & smallest , & largest , & c - > inputs_ [ 0 ] ,
hint_index , & hint_index ) ;
} while ( c - > inputs_ [ 0 ] . size ( ) > old_size ) ;
// Get the new range
GetRange ( c - > inputs_ [ 0 ] , & smallest , & largest ) ;
// If, after the expansion, there are files that are already under
// compaction, then we must drop/cancel this compaction.
int parent_index = - 1 ;
if ( FilesInCompaction ( c - > inputs_ [ 0 ] ) | |
ParentRangeInCompaction ( & smallest , & largest , level , & parent_index ) ) {
c - > inputs_ [ 0 ] . clear ( ) ;
c - > inputs_ [ 1 ] . clear ( ) ;
delete c ;
c = nullptr ;
}
}
// Populates the set of inputs from "level+1" that overlap with "level".
// Will also attempt to expand "level" if that doesn't expand "level+1"
// or cause "level" to include a file for compaction that has an overlapping
// user-key with another file.
void VersionSet : : SetupOtherInputs ( Compaction * c ) {
void VersionSet : : SetupOtherInputs ( Compaction * c ) {
// If inputs are empty, then there is nothing to expand.
if ( c - > inputs_ [ 0 ] . empty ( ) ) {
return ;
}
const int level = c - > level ( ) ;
const int level = c - > level ( ) ;
InternalKey smallest , largest ;
InternalKey smallest , largest ;
// Get the range one last time.
GetRange ( c - > inputs_ [ 0 ] , & smallest , & largest ) ;
GetRange ( c - > inputs_ [ 0 ] , & smallest , & largest ) ;
// Populate the set of next-level files (inputs_[1]) to include in compaction
current_ - > GetOverlappingInputs ( level + 1 , & smallest , & largest , & c - > inputs_ [ 1 ] ,
current_ - > GetOverlappingInputs ( level + 1 , & smallest , & largest , & c - > inputs_ [ 1 ] ,
c - > parent_index_ , & c - > parent_index_ ) ;
c - > parent_index_ , & c - > parent_index_ ) ;
@ -2178,8 +2322,11 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
InternalKey all_start , all_limit ;
InternalKey all_start , all_limit ;
GetRange2 ( c - > inputs_ [ 0 ] , c - > inputs_ [ 1 ] , & all_start , & all_limit ) ;
GetRange2 ( c - > inputs_ [ 0 ] , c - > inputs_ [ 1 ] , & all_start , & all_limit ) ;
// See if we can grow the number of inputs in "level" without
// See if we can further grow the number of inputs in "level" without
// changing the number of "level+1" files we pick up.
// changing the number of "level+1" files we pick up. We also choose NOT
// to expand if this would cause "level" to include some entries for some
// user key, while excluding other entries for the same user key. This
// can happen when one user key spans multiple files.
if ( ! c - > inputs_ [ 1 ] . empty ( ) ) {
if ( ! c - > inputs_ [ 1 ] . empty ( ) ) {
std : : vector < FileMetaData * > expanded0 ;
std : : vector < FileMetaData * > expanded0 ;
current_ - > GetOverlappingInputs ( level , & all_start , & all_limit , & expanded0 ,
current_ - > GetOverlappingInputs ( level , & all_start , & all_limit , & expanded0 ,
@ -2190,7 +2337,8 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
int64_t limit = ExpandedCompactionByteSizeLimit ( level ) ;
int64_t limit = ExpandedCompactionByteSizeLimit ( level ) ;
if ( expanded0 . size ( ) > c - > inputs_ [ 0 ] . size ( ) & &
if ( expanded0 . size ( ) > c - > inputs_ [ 0 ] . size ( ) & &
inputs1_size + expanded0_size < limit & &
inputs1_size + expanded0_size < limit & &
! FilesInCompaction ( expanded0 ) ) {
! FilesInCompaction ( expanded0 ) & &
! current_ - > HasOverlappingUserKey ( & expanded0 , level ) ) {
InternalKey new_start , new_limit ;
InternalKey new_start , new_limit ;
GetRange ( expanded0 , & new_start , & new_limit ) ;
GetRange ( expanded0 , & new_start , & new_limit ) ;
std : : vector < FileMetaData * > expanded1 ;
std : : vector < FileMetaData * > expanded1 ;
@ -2249,13 +2397,11 @@ Compaction* VersionSet::CompactRange(
return nullptr ;
return nullptr ;
}
}
// Avoid compacting too much in one shot in case the range is large.
// Avoid compacting too much in one shot in case the range is large.
// But we cannot do this for level-0 since level-0 files can overlap
// But we cannot do this for level-0 since level-0 files can overlap
// and we must not pick one file and drop another older file if the
// and we must not pick one file and drop another older file if the
// two files overlap.
// two files overlap.
if ( level > 0 ) {
if ( level > 0 ) {
const uint64_t limit = MaxFileSizeForLevel ( level ) *
const uint64_t limit = MaxFileSizeForLevel ( level ) *
options_ - > source_compaction_factor ;
options_ - > source_compaction_factor ;
uint64_t total = 0 ;
uint64_t total = 0 ;
@ -2268,12 +2414,18 @@ Compaction* VersionSet::CompactRange(
}
}
}
}
}
}
Compaction * c = new Compaction ( level , MaxFileSizeForLevel ( level + 1 ) ,
Compaction * c = new Compaction ( level , MaxFileSizeForLevel ( level + 1 ) ,
MaxGrandParentOverlapBytes ( level ) , NumberLevels ( ) ) ;
MaxGrandParentOverlapBytes ( level ) , NumberLevels ( ) ) ;
c - > inputs_ [ 0 ] = inputs ;
ExpandWhileOverlapping ( c ) ;
if ( c = = nullptr ) {
Log ( options_ - > info_log , " Could not compact due to expansion failure. \n " ) ;
return nullptr ;
}
c - > input_version_ = current_ ;
c - > input_version_ = current_ ;
c - > input_version_ - > Ref ( ) ;
c - > input_version_ - > Ref ( ) ;
c - > inputs_ [ 0 ] = inputs ;
SetupOtherInputs ( c ) ;
SetupOtherInputs ( c ) ;
// These files that are to be manually compacted do not trample
// These files that are to be manually compacted do not trample
@ -2418,8 +2570,9 @@ void Compaction::Summary(char* output, int len) {
int write = snprintf ( output , len ,
int write = snprintf ( output , len ,
" Base version %ld Base level %d, seek compaction:%d, inputs: " ,
" Base version %ld Base level %d, seek compaction:%d, inputs: " ,
input_version_ - > GetVersionNumber ( ) , level_ , seek_compaction_ ) ;
input_version_ - > GetVersionNumber ( ) , level_ , seek_compaction_ ) ;
if ( write < 0 | | write > len )
if ( write < 0 | | write > len ) {
return ;
return ;
}
char level_low_summary [ 100 ] ;
char level_low_summary [ 100 ] ;
InputSummary ( inputs_ [ 0 ] , level_low_summary , sizeof ( level_low_summary ) ) ;
InputSummary ( inputs_ [ 0 ] , level_low_summary , sizeof ( level_low_summary ) ) ;