@ -894,6 +894,74 @@ TEST_F(DBRangeDelTest, MemtableBloomFilter) {
}
}
TEST_F ( DBRangeDelTest , CompactionTreatsSplitInputLevelDeletionAtomically ) {
// make sure compaction treats files containing a split range deletion in the
// input level as an atomic unit. I.e., compacting any input-level file(s)
// containing a portion of the range deletion causes all other input-level
// files containing portions of that same range deletion to be included in the
// compaction.
const int kNumFilesPerLevel = 4 , kValueBytes = 4 < < 10 ;
Options options = CurrentOptions ( ) ;
options . compression = kNoCompression ;
options . level0_file_num_compaction_trigger = kNumFilesPerLevel ;
options . memtable_factory . reset (
new SpecialSkipListFactory ( 2 /* num_entries_flush */ ) ) ;
options . target_file_size_base = kValueBytes ;
// i == 0: CompactFiles
// i == 1: CompactRange
// i == 2: automatic compaction
for ( int i = 0 ; i < 3 ; + + i ) {
DestroyAndReopen ( options ) ;
ASSERT_OK ( Put ( Key ( 0 ) , " " ) ) ;
ASSERT_OK ( db_ - > Flush ( FlushOptions ( ) ) ) ;
MoveFilesToLevel ( 2 ) ;
ASSERT_EQ ( 1 , NumTableFilesAtLevel ( 2 ) ) ;
// snapshot protects range tombstone from dropping due to becoming obsolete.
const Snapshot * snapshot = db_ - > GetSnapshot ( ) ;
db_ - > DeleteRange ( WriteOptions ( ) , db_ - > DefaultColumnFamily ( ) , Key ( 0 ) ,
Key ( 2 * kNumFilesPerLevel ) ) ;
Random rnd ( 301 ) ;
std : : string value = RandomString ( & rnd , kValueBytes ) ;
for ( int j = 0 ; j < kNumFilesPerLevel ; + + j ) {
// give files overlapping key-ranges to prevent trivial move
ASSERT_OK ( Put ( Key ( j ) , value ) ) ;
ASSERT_OK ( Put ( Key ( 2 * kNumFilesPerLevel - 1 - j ) , value ) ) ;
if ( j > 0 ) {
dbfull ( ) - > TEST_WaitForFlushMemTable ( ) ;
ASSERT_EQ ( j , NumTableFilesAtLevel ( 0 ) ) ;
}
}
// put extra key to trigger final flush
ASSERT_OK ( Put ( " " , " " ) ) ;
dbfull ( ) - > TEST_WaitForFlushMemTable ( ) ;
dbfull ( ) - > TEST_WaitForCompact ( ) ;
ASSERT_EQ ( 0 , NumTableFilesAtLevel ( 0 ) ) ;
ASSERT_EQ ( kNumFilesPerLevel , NumTableFilesAtLevel ( 1 ) ) ;
ColumnFamilyMetaData meta ;
db_ - > GetColumnFamilyMetaData ( & meta ) ;
if ( i = = 0 ) {
ASSERT_OK ( db_ - > CompactFiles (
CompactionOptions ( ) , { meta . levels [ 1 ] . files [ 0 ] . name } , 2 /* level */ ) ) ;
} else if ( i = = 1 ) {
auto begin_str = Key ( 0 ) , end_str = Key ( 1 ) ;
Slice begin = begin_str , end = end_str ;
ASSERT_OK ( db_ - > CompactRange ( CompactRangeOptions ( ) , & begin , & end ) ) ;
} else if ( i = = 2 ) {
ASSERT_OK ( db_ - > SetOptions ( db_ - > DefaultColumnFamily ( ) ,
{ { " max_bytes_for_level_base " , " 10000 " } } ) ) ;
dbfull ( ) - > TEST_WaitForCompact ( ) ;
}
ASSERT_EQ ( 0 , NumTableFilesAtLevel ( 1 ) ) ;
ASSERT_GT ( NumTableFilesAtLevel ( 2 ) , 0 ) ;
db_ - > ReleaseSnapshot ( snapshot ) ;
}
}
# endif // ROCKSDB_LITE
} // namespace rocksdb