@ -85,11 +85,11 @@ struct DBImpl::Writer {
struct DBImpl : : CompactionState {
struct DBImpl : : CompactionState {
Compaction * const compaction ;
Compaction * const compaction ;
// Sequence numbers < smallest_snapshot are not significant since we
// If there were two snapshots with seq numbers s1 and
// will never have to service a snapshot below smallest_snapshot.
// s2 and s1 < s2, and if we find two instances of a key k1 then lies
// Therefore if we have seen a sequence number S <= smallest_snapshot,
// entirely within s1 and s2, then the earlier version of k1 can be safely
// we can drop all entries for the same key with sequence numbers < S .
// deleted because that version is not visible in any snapshot .
SequenceNumber smallest_snapshot ;
std : : vector < SequenceNumber > existing_snapshots ;
// Files produced by compaction
// Files produced by compaction
struct Output {
struct Output {
@ -1262,6 +1262,32 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
return versions_ - > LogAndApply ( compact - > compaction - > edit ( ) , & mutex_ ) ;
return versions_ - > LogAndApply ( compact - > compaction - > edit ( ) , & mutex_ ) ;
}
}
//
// Given a sequence number, return the sequence number of the
// earliest snapshot that this sequence number is visible in.
// The snapshots themselves are arranged in ascending order of
// sequence numbers.
// Employ a sequential search because the total number of
// snapshots are typically small.
inline SequenceNumber DBImpl : : findEarliestVisibleSnapshot (
SequenceNumber in , std : : vector < SequenceNumber > & snapshots ) {
SequenceNumber prev ;
prev = 0 ;
for ( std : : vector < SequenceNumber > : : iterator it = snapshots . begin ( ) ;
it < snapshots . end ( ) ; it + + ) {
assert ( prev < = * it ) ;
if ( * it > = in ) {
return * it ;
}
assert ( prev = * it ) ; // assignment
}
Log ( options_ . info_log ,
" Looking for seqid %ld but maxseqid is %ld " , in ,
snapshots [ snapshots . size ( ) - 1 ] ) ;
assert ( 0 ) ;
return 0 ;
}
Status DBImpl : : DoCompactionWork ( CompactionState * compact ) {
Status DBImpl : : DoCompactionWork ( CompactionState * compact ) {
int64_t imm_micros = 0 ; // Micros spent doing imm_ compactions
int64_t imm_micros = 0 ; // Micros spent doing imm_ compactions
@ -1279,10 +1305,19 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
assert ( versions_ - > NumLevelFiles ( compact - > compaction - > level ( ) ) > 0 ) ;
assert ( versions_ - > NumLevelFiles ( compact - > compaction - > level ( ) ) > 0 ) ;
assert ( compact - > builder = = NULL ) ;
assert ( compact - > builder = = NULL ) ;
assert ( compact - > outfile = = NULL ) ;
assert ( compact - > outfile = = NULL ) ;
if ( snapshots_ . empty ( ) ) {
compact - > smallest_snapshot = versions_ - > LastSequence ( ) ;
SequenceNumber visible_at_tip = 0 ;
SequenceNumber earliest_snapshot ;
snapshots_ . getAll ( compact - > existing_snapshots ) ;
if ( compact - > existing_snapshots . size ( ) = = 0 ) {
// optimize for fast path if there are no snapshots
visible_at_tip = versions_ - > LastSequence ( ) ;
earliest_snapshot = visible_at_tip ;
} else {
} else {
compact - > smallest_snapshot = snapshots_ . oldest ( ) - > number_ ;
// Add the current seqno as the 'latest' virtual
// snapshot to the end of this list.
compact - > existing_snapshots . push_back ( versions_ - > LastSequence ( ) ) ;
earliest_snapshot = compact - > existing_snapshots [ 0 ] ;
}
}
// Allocate the output file numbers before we release the lock
// Allocate the output file numbers before we release the lock
@ -1299,6 +1334,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
std : : string current_user_key ;
std : : string current_user_key ;
bool has_current_user_key = false ;
bool has_current_user_key = false ;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber ;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber ;
SequenceNumber visible_in_snapshot = kMaxSequenceNumber ;
for ( ; input - > Valid ( ) & & ! shutting_down_ . Acquire_Load ( ) ; ) {
for ( ; input - > Valid ( ) & & ! shutting_down_ . Acquire_Load ( ) ; ) {
// Prioritize immutable compaction work
// Prioritize immutable compaction work
if ( imm_ . imm_flush_needed . NoBarrier_Load ( ) ! = NULL ) {
if ( imm_ . imm_flush_needed . NoBarrier_Load ( ) ! = NULL ) {
@ -1330,6 +1366,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
current_user_key . clear ( ) ;
current_user_key . clear ( ) ;
has_current_user_key = false ;
has_current_user_key = false ;
last_sequence_for_key = kMaxSequenceNumber ;
last_sequence_for_key = kMaxSequenceNumber ;
visible_in_snapshot = kMaxSequenceNumber ;
} else {
} else {
if ( ! has_current_user_key | |
if ( ! has_current_user_key | |
user_comparator ( ) - > Compare ( ikey . user_key ,
user_comparator ( ) - > Compare ( ikey . user_key ,
@ -1338,14 +1375,26 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
current_user_key . assign ( ikey . user_key . data ( ) , ikey . user_key . size ( ) ) ;
current_user_key . assign ( ikey . user_key . data ( ) , ikey . user_key . size ( ) ) ;
has_current_user_key = true ;
has_current_user_key = true ;
last_sequence_for_key = kMaxSequenceNumber ;
last_sequence_for_key = kMaxSequenceNumber ;
visible_in_snapshot = kMaxSequenceNumber ;
}
}
if ( last_sequence_for_key < = compact - > smallest_snapshot ) {
// If there are no snapshots, then this kv affect visibility at tip.
// Otherwise, search though all existing snapshots to find
// the earlist snapshot that is affected by this kv.
SequenceNumber visible = visible_at_tip ? visible_at_tip :
findEarliestVisibleSnapshot ( ikey . sequence ,
compact - > existing_snapshots ) ;
if ( visible_in_snapshot = = visible ) {
// If the earliest snapshot is which this key is visible in
// is the same as the visibily of a previous instance of the
// same key, then this kv is not visible in any snapshot.
// Hidden by an newer entry for same user key
// Hidden by an newer entry for same user key
assert ( last_sequence_for_key > = ikey . sequence ) ;
drop = true ; // (A)
drop = true ; // (A)
RecordTick ( options_ . statistics , COMPACTION_KEY_DROP_NEWER_ENTRY ) ;
RecordTick ( options_ . statistics , COMPACTION_KEY_DROP_NEWER_ENTRY ) ;
} else if ( ikey . type = = kTypeDeletion & &
} else if ( ikey . type = = kTypeDeletion & &
ikey . sequence < = compact - > smallest_snapshot & &
ikey . sequence < = earli est_snapshot & &
compact - > compaction - > IsBaseLevelForKey ( ikey . user_key ) ) {
compact - > compaction - > IsBaseLevelForKey ( ikey . user_key ) ) {
// For this user key:
// For this user key:
// (1) there is no data in higher levels
// (1) there is no data in higher levels
@ -1358,7 +1407,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
RecordTick ( options_ . statistics , COMPACTION_KEY_DROP_OBSOLETE ) ;
RecordTick ( options_ . statistics , COMPACTION_KEY_DROP_OBSOLETE ) ;
} else if ( options_ . CompactionFilter ! = NULL & &
} else if ( options_ . CompactionFilter ! = NULL & &
ikey . type ! = kTypeDeletion & &
ikey . type ! = kTypeDeletion & &
ikey . sequence < compact - > small est_snapshot) {
ikey . sequence < earli est_snapshot) {
// If the user has specified a compaction filter, then invoke
// If the user has specified a compaction filter, then invoke
// it. If this key is not visible via any snapshot and the
// it. If this key is not visible via any snapshot and the
// return value of the compaction filter is true and then
// return value of the compaction filter is true and then
@ -1378,6 +1427,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
}
}
last_sequence_for_key = ikey . sequence ;
last_sequence_for_key = ikey . sequence ;
visible_in_snapshot = visible ;
}
}
#if 0
#if 0
Log ( options_ . info_log ,
Log ( options_ . info_log ,
@ -1762,7 +1812,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
} else if (
} else if (
allow_delay & &
allow_delay & &
versions_ - > NumLevelFiles ( 0 ) > =
versions_ - > NumLevelFiles ( 0 ) > =
options_ . level0_slowdown_writes_trigger ) {
options_ . level0_slowdown_writes_trigger ) {
// We are getting close to hitting a hard limit on the number of
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
// L0 files. Rather than delaying a single write by several
// seconds when we hit the hard limit, start delaying each
// seconds when we hit the hard limit, start delaying each
@ -1796,7 +1846,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
bg_cv_ . Wait ( ) ;
bg_cv_ . Wait ( ) ;
stall_memtable_compaction_ + = env_ - > NowMicros ( ) - t1 ;
stall_memtable_compaction_ + = env_ - > NowMicros ( ) - t1 ;
} else if ( versions_ - > NumLevelFiles ( 0 ) > =
} else if ( versions_ - > NumLevelFiles ( 0 ) > =
options_ . level0_stop_writes_trigger ) {
options_ . level0_stop_writes_trigger ) {
// There are too many level-0 files.
// There are too many level-0 files.
DelayLoggingAndReset ( ) ;
DelayLoggingAndReset ( ) ;
uint64_t t1 = env_ - > NowMicros ( ) ;
uint64_t t1 = env_ - > NowMicros ( ) ;