@ -157,10 +157,12 @@ void CompactionIterator::Next() {
Status s = ParseInternalKey ( key_ , & ikey_ , allow_data_in_errors_ ) ;
// MergeUntil stops when it encounters a corrupt key and does not
// include them in the result, so we expect the keys here to be valid.
assert ( s . ok ( ) ) ;
if ( ! s . ok ( ) ) {
ROCKS_LOG_FATAL ( info_log_ , " Invalid key in compaction. %s " ,
s . getState ( ) ) ;
ROCKS_LOG_FATAL (
info_log_ , " Invalid ikey %s in compaction. %s " ,
allow_data_in_errors_ ? key_ . ToString ( true ) . c_str ( ) : " hidden " ,
s . getState ( ) ) ;
assert ( false ) ;
}
// Keep current_key_ in sync.
@ -517,18 +519,18 @@ void CompactionIterator::NextFromInput() {
// In the previous iteration we encountered a single delete that we could
// not compact out. We will keep this Put, but can drop it's data.
// (See Optimization 3, below.)
assert ( ikey_ . type = = kTypeValue | | ikey_ . type = = kTypeBlobIndex ) ;
if ( ikey_ . type ! = kTypeValue & & ikey_ . type ! = kTypeBlobIndex ) {
ROCKS_LOG_FATAL ( info_log_ ,
" Unexpected key type %d for compaction output " ,
ikey_ . typ e ) ;
ROCKS_LOG_FATAL ( info_log_ , " Unexpected key %s for compaction output " ,
ikey_ . DebugString ( allow_data_in_errors_ , true ) . c_str ( ) ) ;
assert ( fals e) ;
}
assert ( current_user_key_snapshot_ > = last_snapshot ) ;
if ( current_user_key_snapshot_ < last_snapshot ) {
ROCKS_LOG_FATAL ( info_log_ ,
" current_user_key_snapshot_ (% " PRIu64
" key %s, current_user_key_snapshot_ (%" PRIu64
" ) < last_snapshot (% " PRIu64 " ) " ,
ikey_ . DebugString ( allow_data_in_errors_ , true ) . c_str ( ) ,
current_user_key_snapshot_ , last_snapshot ) ;
assert ( false ) ;
}
if ( ikey_ . type = = kTypeBlobIndex ) {
@ -767,12 +769,13 @@ void CompactionIterator::NextFromInput() {
// Note: Dropping this key will not affect TransactionDB write-conflict
// checking since there has already been a record returned for this key
// in this snapshot.
assert ( last_sequence > = current_user_key_sequence_ ) ;
if ( last_sequence < current_user_key_sequence_ ) {
ROCKS_LOG_FATAL ( info_log_ ,
" last_sequence (% " PRIu64
" key %s, last_sequence (%" PRIu64
" ) < current_user_key_sequence_ (% " PRIu64 " ) " ,
ikey_ . DebugString ( allow_data_in_errors_ , true ) . c_str ( ) ,
last_sequence , current_user_key_sequence_ ) ;
assert ( false ) ;
}
+ + iter_stats_ . num_record_drop_hidden ; // rule (A)
@ -884,10 +887,12 @@ void CompactionIterator::NextFromInput() {
pik_status = ParseInternalKey ( key_ , & ikey_ , allow_data_in_errors_ ) ;
// MergeUntil stops when it encounters a corrupt key and does not
// include them in the result, so we expect the keys here to valid.
assert ( pik_status . ok ( ) ) ;
if ( ! pik_status . ok ( ) ) {
ROCKS_LOG_FATAL ( info_log_ , " Invalid key in compaction. %s " ,
pik_status . getState ( ) ) ;
ROCKS_LOG_FATAL (
info_log_ , " Invalid key %s in compaction. %s " ,
allow_data_in_errors_ ? key_ . ToString ( true ) . c_str ( ) : " hidden " ,
pik_status . getState ( ) ) ;
assert ( false ) ;
}
// Keep current_key_ in sync.
current_key_ . UpdateInternalKey ( ikey_ . sequence , ikey_ . type ) ;
@ -1090,15 +1095,22 @@ void CompactionIterator::PrepareOutput() {
! compaction_ - > allow_ingest_behind ( ) & & bottommost_level_ & &
DefinitelyInSnapshot ( ikey_ . sequence , earliest_snapshot_ ) & &
ikey_ . type ! = kTypeMerge & & current_key_committed_ ) {
assert ( ikey_ . type ! = kTypeDeletion ) ;
assert ( ikey_ . type ! = kTypeSingleDeletion | |
( timestamp_size_ | | full_history_ts_low_ ) ) ;
if ( ikey_ . type = = kTypeDeletion | |
( ikey_ . type = = kTypeSingleDeletion & &
( ! timestamp_size_ | | ! full_history_ts_low_ ) ) ) {
ROCKS_LOG_FATAL ( info_log_ ,
" Unexpected key type %d for seq-zero optimization " ,
ikey_ . type ) ;
( ikey_ . type = = kTypeSingleDeletion & & timestamp_size_ = = 0 ) ) {
ROCKS_LOG_FATAL (
info_log_ ,
" Unexpected key %s for seq-zero optimization. "
" earliest_snapshot % " PRIu64
" , earliest_write_conflict_snapshot % " PRIu64
" job_snapshot % " PRIu64
" . timestamp_size: %d full_history_ts_low_ %s " ,
ikey_ . DebugString ( allow_data_in_errors_ , true ) . c_str ( ) ,
earliest_snapshot_ , earliest_write_conflict_snapshot_ ,
job_snapshot_ , static_cast < int > ( timestamp_size_ ) ,
full_history_ts_low_ ! = nullptr
? Slice ( * full_history_ts_low_ ) . ToString ( true ) . c_str ( )
: " null " ) ;
assert ( false ) ;
}
ikey_ . sequence = 0 ;
last_key_seq_zeroed_ = true ;
@ -1129,14 +1141,17 @@ inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
}
auto snapshots_iter = std : : lower_bound (
snapshots_ - > begin ( ) , snapshots_ - > end ( ) , in ) ;
assert ( prev_snapshot ! = nullptr ) ;
if ( snapshots_iter = = snapshots_ - > begin ( ) ) {
* prev_snapshot = 0 ;
} else {
* prev_snapshot = * std : : prev ( snapshots_iter ) ;
assert ( * prev_snapshot < in ) ;
if ( * prev_snapshot > = in ) {
ROCKS_LOG_FATAL ( info_log_ ,
" *prev_snapshot >= in in findEarliestVisibleSnapshot " ) ;
" *prev_snapshot (% " PRIu64 " ) >= in (% " PRIu64
" ) in findEarliestVisibleSnapshot " ,
* prev_snapshot , in ) ;
assert ( false ) ;
}
}
if ( snapshot_checker_ = = nullptr ) {
@ -1146,9 +1161,12 @@ inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
bool has_released_snapshot = ! released_snapshots_ . empty ( ) ;
for ( ; snapshots_iter ! = snapshots_ - > end ( ) ; + + snapshots_iter ) {
auto cur = * snapshots_iter ;
assert ( in < = cur ) ;
if ( in > cur ) {
ROCKS_LOG_FATAL ( info_log_ , " in > cur in findEarliestVisibleSnapshot " ) ;
ROCKS_LOG_FATAL ( info_log_ ,
" in (% " PRIu64 " ) > cur (% " PRIu64
" ) in findEarliestVisibleSnapshot " ,
in , cur ) ;
assert ( false ) ;
}
// Skip if cur is in released_snapshots.
if ( has_released_snapshot & & released_snapshots_ . count ( cur ) > 0 ) {