@ -3,6 +3,8 @@
// COPYING file in the root directory) and Apache 2.0 License
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// (found in the LICENSE.Apache file in the root directory).
# include <cinttypes>
# include "db/compaction/compaction_iterator.h"
# include "db/compaction/compaction_iterator.h"
# include "db/snapshot_checker.h"
# include "db/snapshot_checker.h"
# include "port/likely.h"
# include "port/likely.h"
@ -38,7 +40,8 @@ CompactionIterator::CompactionIterator(
const CompactionFilter * compaction_filter ,
const CompactionFilter * compaction_filter ,
const std : : atomic < bool > * shutting_down ,
const std : : atomic < bool > * shutting_down ,
const SequenceNumber preserve_deletes_seqnum ,
const SequenceNumber preserve_deletes_seqnum ,
const std : : atomic < bool > * manual_compaction_paused )
const std : : atomic < bool > * manual_compaction_paused ,
const std : : shared_ptr < Logger > info_log )
: CompactionIterator (
: CompactionIterator (
input , cmp , merge_helper , last_sequence , snapshots ,
input , cmp , merge_helper , last_sequence , snapshots ,
earliest_write_conflict_snapshot , snapshot_checker , env ,
earliest_write_conflict_snapshot , snapshot_checker , env ,
@ -46,7 +49,7 @@ CompactionIterator::CompactionIterator(
std : : unique_ptr < CompactionProxy > (
std : : unique_ptr < CompactionProxy > (
compaction ? new CompactionProxy ( compaction ) : nullptr ) ,
compaction ? new CompactionProxy ( compaction ) : nullptr ) ,
compaction_filter , shutting_down , preserve_deletes_seqnum ,
compaction_filter , shutting_down , preserve_deletes_seqnum ,
manual_compaction_paused ) { }
manual_compaction_paused , info_log ) { }
CompactionIterator : : CompactionIterator (
CompactionIterator : : CompactionIterator (
InternalIterator * input , const Comparator * cmp , MergeHelper * merge_helper ,
InternalIterator * input , const Comparator * cmp , MergeHelper * merge_helper ,
@ -59,7 +62,8 @@ CompactionIterator::CompactionIterator(
const CompactionFilter * compaction_filter ,
const CompactionFilter * compaction_filter ,
const std : : atomic < bool > * shutting_down ,
const std : : atomic < bool > * shutting_down ,
const SequenceNumber preserve_deletes_seqnum ,
const SequenceNumber preserve_deletes_seqnum ,
const std : : atomic < bool > * manual_compaction_paused )
const std : : atomic < bool > * manual_compaction_paused ,
const std : : shared_ptr < Logger > info_log )
: input_ ( input ) ,
: input_ ( input ) ,
cmp_ ( cmp ) ,
cmp_ ( cmp ) ,
merge_helper_ ( merge_helper ) ,
merge_helper_ ( merge_helper ) ,
@ -78,7 +82,8 @@ CompactionIterator::CompactionIterator(
current_user_key_sequence_ ( 0 ) ,
current_user_key_sequence_ ( 0 ) ,
current_user_key_snapshot_ ( 0 ) ,
current_user_key_snapshot_ ( 0 ) ,
merge_out_iter_ ( merge_helper_ ) ,
merge_out_iter_ ( merge_helper_ ) ,
current_key_committed_ ( false ) {
current_key_committed_ ( false ) ,
info_log_ ( info_log ) {
assert ( compaction_filter_ = = nullptr | | compaction_ ! = nullptr ) ;
assert ( compaction_filter_ = = nullptr | | compaction_ ! = nullptr ) ;
assert ( snapshots_ ! = nullptr ) ;
assert ( snapshots_ ! = nullptr ) ;
bottommost_level_ =
bottommost_level_ =
@ -142,6 +147,11 @@ void CompactionIterator::Next() {
// MergeUntil stops when it encounters a corrupt key and does not
// MergeUntil stops when it encounters a corrupt key and does not
// include them in the result, so we expect the keys here to be valid.
// include them in the result, so we expect the keys here to be valid.
assert ( valid_key ) ;
assert ( valid_key ) ;
if ( ! valid_key ) {
ROCKS_LOG_FATAL ( info_log_ , " Invalid key (%s) in compaction " ,
key_ . ToString ( true ) . c_str ( ) ) ;
}
// Keep current_key_ in sync.
// Keep current_key_ in sync.
current_key_ . UpdateInternalKey ( ikey_ . sequence , ikey_ . type ) ;
current_key_ . UpdateInternalKey ( ikey_ . sequence , ikey_ . type ) ;
key_ = current_key_ . GetInternalKey ( ) ;
key_ = current_key_ . GetInternalKey ( ) ;
@ -338,7 +348,18 @@ void CompactionIterator::NextFromInput() {
// not compact out. We will keep this Put, but can drop its data.
// not compact out. We will keep this Put, but can drop its data.
// (See Optimization 3, below.)
// (See Optimization 3, below.)
assert ( ikey_ . type = = kTypeValue ) ;
assert ( ikey_ . type = = kTypeValue ) ;
if ( ikey_ . type ! = kTypeValue ) {
ROCKS_LOG_FATAL ( info_log_ ,
" Unexpected key type %d for compaction output " ,
ikey_ . type ) ;
}
assert ( current_user_key_snapshot_ = = last_snapshot ) ;
assert ( current_user_key_snapshot_ = = last_snapshot ) ;
if ( current_user_key_snapshot_ ! = last_snapshot ) {
ROCKS_LOG_FATAL ( info_log_ ,
" current_user_key_snapshot_ (% " PRIu64
" ) != last_snapshot (% " PRIu64 " ) " ,
current_user_key_snapshot_ , last_snapshot ) ;
}
value_ . clear ( ) ;
value_ . clear ( ) ;
valid_ = true ;
valid_ = true ;
@ -480,6 +501,12 @@ void CompactionIterator::NextFromInput() {
// checking since there has already been a record returned for this key
// checking since there has already been a record returned for this key
// in this snapshot.
// in this snapshot.
assert ( last_sequence > = current_user_key_sequence_ ) ;
assert ( last_sequence > = current_user_key_sequence_ ) ;
if ( last_sequence < current_user_key_sequence_ ) {
ROCKS_LOG_FATAL ( info_log_ ,
" last_sequence (% " PRIu64
" ) < current_user_key_sequence_ (% " PRIu64 " ) " ,
last_sequence , current_user_key_sequence_ ) ;
}
+ + iter_stats_ . num_record_drop_hidden ; // (A)
+ + iter_stats_ . num_record_drop_hidden ; // (A)
input_ - > Next ( ) ;
input_ - > Next ( ) ;
@ -563,6 +590,10 @@ void CompactionIterator::NextFromInput() {
// MergeUntil stops when it encounters a corrupt key and does not
// MergeUntil stops when it encounters a corrupt key and does not
// include them in the result, so we expect the keys here to be valid.
// include them in the result, so we expect the keys here to be valid.
assert ( valid_key ) ;
assert ( valid_key ) ;
if ( ! valid_key ) {
ROCKS_LOG_FATAL ( info_log_ , " Invalid key (%s) in compaction " ,
key_ . ToString ( true ) . c_str ( ) ) ;
}
// Keep current_key_ in sync.
// Keep current_key_ in sync.
current_key_ . UpdateInternalKey ( ikey_ . sequence , ikey_ . type ) ;
current_key_ . UpdateInternalKey ( ikey_ . sequence , ikey_ . type ) ;
key_ = current_key_ . GetInternalKey ( ) ;
key_ = current_key_ . GetInternalKey ( ) ;
@ -623,6 +654,11 @@ void CompactionIterator::PrepareOutput() {
ikeyNotNeededForIncrementalSnapshot ( ) & & bottommost_level_ & & valid_ & &
ikeyNotNeededForIncrementalSnapshot ( ) & & bottommost_level_ & & valid_ & &
IN_EARLIEST_SNAPSHOT ( ikey_ . sequence ) & & ikey_ . type ! = kTypeMerge ) {
IN_EARLIEST_SNAPSHOT ( ikey_ . sequence ) & & ikey_ . type ! = kTypeMerge ) {
assert ( ikey_ . type ! = kTypeDeletion & & ikey_ . type ! = kTypeSingleDeletion ) ;
assert ( ikey_ . type ! = kTypeDeletion & & ikey_ . type ! = kTypeSingleDeletion ) ;
if ( ikey_ . type = = kTypeDeletion | | ikey_ . type = = kTypeSingleDeletion ) {
ROCKS_LOG_FATAL ( info_log_ ,
" Unexpected key type %d for seq-zero optimization " ,
ikey_ . type ) ;
}
ikey_ . sequence = 0 ;
ikey_ . sequence = 0 ;
current_key_ . UpdateInternalKey ( 0 , ikey_ . type ) ;
current_key_ . UpdateInternalKey ( 0 , ikey_ . type ) ;
}
}
@ -631,6 +667,10 @@ void CompactionIterator::PrepareOutput() {
inline SequenceNumber CompactionIterator : : findEarliestVisibleSnapshot (
inline SequenceNumber CompactionIterator : : findEarliestVisibleSnapshot (
SequenceNumber in , SequenceNumber * prev_snapshot ) {
SequenceNumber in , SequenceNumber * prev_snapshot ) {
assert ( snapshots_ - > size ( ) ) ;
assert ( snapshots_ - > size ( ) ) ;
if ( snapshots_ - > size ( ) = = 0 ) {
ROCKS_LOG_FATAL ( info_log_ ,
" No snapshot left in findEarliestVisibleSnapshot " ) ;
}
auto snapshots_iter = std : : lower_bound (
auto snapshots_iter = std : : lower_bound (
snapshots_ - > begin ( ) , snapshots_ - > end ( ) , in ) ;
snapshots_ - > begin ( ) , snapshots_ - > end ( ) , in ) ;
if ( snapshots_iter = = snapshots_ - > begin ( ) ) {
if ( snapshots_iter = = snapshots_ - > begin ( ) ) {
@ -638,6 +678,10 @@ inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
} else {
} else {
* prev_snapshot = * std : : prev ( snapshots_iter ) ;
* prev_snapshot = * std : : prev ( snapshots_iter ) ;
assert ( * prev_snapshot < in ) ;
assert ( * prev_snapshot < in ) ;
if ( * prev_snapshot > = in ) {
ROCKS_LOG_FATAL ( info_log_ ,
" *prev_snapshot >= in in findEarliestVisibleSnapshot " ) ;
}
}
}
if ( snapshot_checker_ = = nullptr ) {
if ( snapshot_checker_ = = nullptr ) {
return snapshots_iter ! = snapshots_ - > end ( )
return snapshots_iter ! = snapshots_ - > end ( )
@ -647,6 +691,9 @@ inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
for ( ; snapshots_iter ! = snapshots_ - > end ( ) ; + + snapshots_iter ) {
for ( ; snapshots_iter ! = snapshots_ - > end ( ) ; + + snapshots_iter ) {
auto cur = * snapshots_iter ;
auto cur = * snapshots_iter ;
assert ( in < = cur ) ;
assert ( in < = cur ) ;
if ( in > cur ) {
ROCKS_LOG_FATAL ( info_log_ , " in > cur in findEarliestVisibleSnapshot " ) ;
}
// Skip if cur is in released_snapshots.
// Skip if cur is in released_snapshots.
if ( has_released_snapshot & & released_snapshots_ . count ( cur ) > 0 ) {
if ( has_released_snapshot & & released_snapshots_ . count ( cur ) > 0 ) {
continue ;
continue ;
@ -671,9 +718,14 @@ inline bool CompactionIterator::ikeyNotNeededForIncrementalSnapshot() {
bool CompactionIterator : : IsInEarliestSnapshot ( SequenceNumber sequence ) {
bool CompactionIterator : : IsInEarliestSnapshot ( SequenceNumber sequence ) {
assert ( snapshot_checker_ ! = nullptr ) ;
assert ( snapshot_checker_ ! = nullptr ) ;
assert ( earliest_snapshot_ = = kMaxSequenceNumber | |
bool pre_condition = ( earliest_snapshot_ = = kMaxSequenceNumber | |
( earliest_snapshot_iter_ ! = snapshots_ - > end ( ) & &
( earliest_snapshot_iter_ ! = snapshots_ - > end ( ) & &
* earliest_snapshot_iter_ = = earliest_snapshot_ ) ) ;
* earliest_snapshot_iter_ = = earliest_snapshot_ ) ) ;
assert ( pre_condition ) ;
if ( ! pre_condition ) {
ROCKS_LOG_FATAL ( info_log_ ,
" Pre-Condition is not hold in IsInEarliestSnapshot " ) ;
}
auto in_snapshot =
auto in_snapshot =
snapshot_checker_ - > CheckInSnapshot ( sequence , earliest_snapshot_ ) ;
snapshot_checker_ - > CheckInSnapshot ( sequence , earliest_snapshot_ ) ;
while ( UNLIKELY ( in_snapshot = = SnapshotCheckerResult : : kSnapshotReleased ) ) {
while ( UNLIKELY ( in_snapshot = = SnapshotCheckerResult : : kSnapshotReleased ) ) {
@ -692,6 +744,10 @@ bool CompactionIterator::IsInEarliestSnapshot(SequenceNumber sequence) {
snapshot_checker_ - > CheckInSnapshot ( sequence , earliest_snapshot_ ) ;
snapshot_checker_ - > CheckInSnapshot ( sequence , earliest_snapshot_ ) ;
}
}
assert ( in_snapshot ! = SnapshotCheckerResult : : kSnapshotReleased ) ;
assert ( in_snapshot ! = SnapshotCheckerResult : : kSnapshotReleased ) ;
if ( in_snapshot = = SnapshotCheckerResult : : kSnapshotReleased ) {
ROCKS_LOG_FATAL ( info_log_ ,
" Unexpected released snapshot in IsInEarliestSnapshot " ) ;
}
return in_snapshot = = SnapshotCheckerResult : : kInSnapshot ;
return in_snapshot = = SnapshotCheckerResult : : kInSnapshot ;
}
}