@ -2125,6 +2125,117 @@ class DBBasicTestWithTimestamp : public DBBasicTestWithTimestampBase {
} ;
} ;
# ifndef ROCKSDB_LITE
// A class which remembers the name of each flushed file.
class FlushedFileCollector : public EventListener {
public :
FlushedFileCollector ( ) { }
~ FlushedFileCollector ( ) override { }
void OnFlushCompleted ( DB * /*db*/ , const FlushJobInfo & info ) override {
InstrumentedMutexLock lock ( & mutex_ ) ;
flushed_files_ . push_back ( info . file_path ) ;
}
std : : vector < std : : string > GetFlushedFiles ( ) {
std : : vector < std : : string > result ;
{
InstrumentedMutexLock lock ( & mutex_ ) ;
result = flushed_files_ ;
}
return result ;
}
void ClearFlushedFiles ( ) {
InstrumentedMutexLock lock ( & mutex_ ) ;
flushed_files_ . clear ( ) ;
}
private :
std : : vector < std : : string > flushed_files_ ;
InstrumentedMutex mutex_ ;
} ;
TEST_F ( DBBasicTestWithTimestamp , PutAndGetWithCompaction ) {
const int kNumKeysPerFile = 8192 ;
const size_t kNumTimestamps = 2 ;
const size_t kNumKeysPerTimestamp = ( kNumKeysPerFile - 1 ) / kNumTimestamps ;
const size_t kSplitPosBase = kNumKeysPerTimestamp / 2 ;
Options options = CurrentOptions ( ) ;
options . create_if_missing = true ;
options . env = env_ ;
options . memtable_factory . reset ( new SpecialSkipListFactory ( kNumKeysPerFile ) ) ;
FlushedFileCollector * collector = new FlushedFileCollector ( ) ;
options . listeners . emplace_back ( collector ) ;
std : : string tmp ;
size_t ts_sz = EncodeTimestamp ( 0 , 0 , & tmp ) . size ( ) ;
TestComparator test_cmp ( ts_sz ) ;
options . comparator = & test_cmp ;
BlockBasedTableOptions bbto ;
bbto . filter_policy . reset ( NewBloomFilterPolicy (
10 /*bits_per_key*/ , false /*use_block_based_builder*/ ) ) ;
bbto . whole_key_filtering = true ;
options . table_factory . reset ( NewBlockBasedTableFactory ( bbto ) ) ;
DestroyAndReopen ( options ) ;
CreateAndReopenWithCF ( { " pikachu " } , options ) ;
size_t num_cfs = handles_ . size ( ) ;
ASSERT_EQ ( 2 , num_cfs ) ;
std : : vector < std : : string > write_ts_strs ( kNumTimestamps ) ;
std : : vector < std : : string > read_ts_strs ( kNumTimestamps ) ;
std : : vector < Slice > write_ts_list ;
std : : vector < Slice > read_ts_list ;
for ( size_t i = 0 ; i ! = kNumTimestamps ; + + i ) {
write_ts_list . emplace_back ( EncodeTimestamp ( i * 2 , 0 , & write_ts_strs [ i ] ) ) ;
read_ts_list . emplace_back ( EncodeTimestamp ( 1 + i * 2 , 0 , & read_ts_strs [ i ] ) ) ;
const Slice & write_ts = write_ts_list . back ( ) ;
WriteOptions wopts ;
wopts . timestamp = & write_ts ;
for ( int cf = 0 ; cf ! = static_cast < int > ( num_cfs ) ; + + cf ) {
for ( size_t j = 0 ; j ! = kNumKeysPerTimestamp ; + + j ) {
ASSERT_OK ( Put ( cf , " key " + std : : to_string ( j ) ,
" value_ " + std : : to_string ( j ) + " _ " + std : : to_string ( i ) ,
wopts ) ) ;
if ( j = = kSplitPosBase + i | | j = = kNumKeysPerTimestamp - 1 ) {
// flush all keys with the same timestamp to two sst files, split at
// incremental positions such that lowerlevel[1].smallest.userkey ==
// higherlevel[0].largest.userkey
ASSERT_OK ( Flush ( cf ) ) ;
// compact files (2 at each level) to a lower level such that all keys
// with the same timestamp is at one level, with newer versions at
// higher levels.
CompactionOptions compact_opt ;
compact_opt . compression = kNoCompression ;
db_ - > CompactFiles ( compact_opt , handles_ [ cf ] ,
collector - > GetFlushedFiles ( ) ,
static_cast < int > ( kNumTimestamps - i ) ) ;
collector - > ClearFlushedFiles ( ) ;
}
}
}
}
const auto & verify_db_func = [ & ] ( ) {
for ( size_t i = 0 ; i ! = kNumTimestamps ; + + i ) {
ReadOptions ropts ;
ropts . timestamp = & read_ts_list [ i ] ;
for ( int cf = 0 ; cf ! = static_cast < int > ( num_cfs ) ; + + cf ) {
ColumnFamilyHandle * cfh = handles_ [ cf ] ;
for ( size_t j = 0 ; j ! = kNumKeysPerTimestamp ; + + j ) {
std : : string value ;
ASSERT_OK ( db_ - > Get ( ropts , cfh , " key " + std : : to_string ( j ) , & value ) ) ;
ASSERT_EQ ( " value_ " + std : : to_string ( j ) + " _ " + std : : to_string ( i ) ,
value ) ;
}
}
}
} ;
verify_db_func ( ) ;
}
# endif // !ROCKSDB_LITE
class DBBasicTestWithTimestampWithParam
: public DBBasicTestWithTimestampBase ,
public testing : : WithParamInterface < bool > {
@ -2247,115 +2358,6 @@ TEST_P(DBBasicTestWithTimestampWithParam, PutAndGet) {
INSTANTIATE_TEST_CASE_P ( Timestamp , DBBasicTestWithTimestampWithParam ,
: : testing : : Bool ( ) ) ;
// A class which remembers the name of each flushed file.
class FlushedFileCollector : public EventListener {
public :
FlushedFileCollector ( ) { }
~ FlushedFileCollector ( ) override { }
void OnFlushCompleted ( DB * /*db*/ , const FlushJobInfo & info ) override {
InstrumentedMutexLock lock ( & mutex_ ) ;
flushed_files_ . push_back ( info . file_path ) ;
}
std : : vector < std : : string > GetFlushedFiles ( ) {
std : : vector < std : : string > result ;
{
InstrumentedMutexLock lock ( & mutex_ ) ;
result = flushed_files_ ;
}
return result ;
}
void ClearFlushedFiles ( ) {
InstrumentedMutexLock lock ( & mutex_ ) ;
flushed_files_ . clear ( ) ;
}
private :
std : : vector < std : : string > flushed_files_ ;
InstrumentedMutex mutex_ ;
} ;
TEST_F ( DBBasicTestWithTimestamp , PutAndGetWithCompaction ) {
const int kNumKeysPerFile = 8192 ;
const size_t kNumTimestamps = 2 ;
const size_t kNumKeysPerTimestamp = ( kNumKeysPerFile - 1 ) / kNumTimestamps ;
const size_t kSplitPosBase = kNumKeysPerTimestamp / 2 ;
Options options = CurrentOptions ( ) ;
options . create_if_missing = true ;
options . env = env_ ;
options . memtable_factory . reset ( new SpecialSkipListFactory ( kNumKeysPerFile ) ) ;
FlushedFileCollector * collector = new FlushedFileCollector ( ) ;
options . listeners . emplace_back ( collector ) ;
std : : string tmp ;
size_t ts_sz = EncodeTimestamp ( 0 , 0 , & tmp ) . size ( ) ;
TestComparator test_cmp ( ts_sz ) ;
options . comparator = & test_cmp ;
BlockBasedTableOptions bbto ;
bbto . filter_policy . reset ( NewBloomFilterPolicy (
10 /*bits_per_key*/ , false /*use_block_based_builder*/ ) ) ;
bbto . whole_key_filtering = true ;
options . table_factory . reset ( NewBlockBasedTableFactory ( bbto ) ) ;
DestroyAndReopen ( options ) ;
CreateAndReopenWithCF ( { " pikachu " } , options ) ;
size_t num_cfs = handles_ . size ( ) ;
ASSERT_EQ ( 2 , num_cfs ) ;
std : : vector < std : : string > write_ts_strs ( kNumTimestamps ) ;
std : : vector < std : : string > read_ts_strs ( kNumTimestamps ) ;
std : : vector < Slice > write_ts_list ;
std : : vector < Slice > read_ts_list ;
for ( size_t i = 0 ; i ! = kNumTimestamps ; + + i ) {
write_ts_list . emplace_back ( EncodeTimestamp ( i * 2 , 0 , & write_ts_strs [ i ] ) ) ;
read_ts_list . emplace_back ( EncodeTimestamp ( 1 + i * 2 , 0 , & read_ts_strs [ i ] ) ) ;
const Slice & write_ts = write_ts_list . back ( ) ;
WriteOptions wopts ;
wopts . timestamp = & write_ts ;
for ( int cf = 0 ; cf ! = static_cast < int > ( num_cfs ) ; + + cf ) {
for ( size_t j = 0 ; j ! = kNumKeysPerTimestamp ; + + j ) {
ASSERT_OK ( Put ( cf , " key " + std : : to_string ( j ) ,
" value_ " + std : : to_string ( j ) + " _ " + std : : to_string ( i ) ,
wopts ) ) ;
if ( j = = kSplitPosBase + i | | j = = kNumKeysPerTimestamp - 1 ) {
// flush all keys with the same timestamp to two sst files, split at
// incremental positions such that lowerlevel[1].smallest.userkey ==
// higherlevel[0].largest.userkey
ASSERT_OK ( Flush ( cf ) ) ;
// compact files (2 at each level) to a lower level such that all keys
// with the same timestamp is at one level, with newer versions at
// higher levels.
CompactionOptions compact_opt ;
compact_opt . compression = kNoCompression ;
db_ - > CompactFiles ( compact_opt , handles_ [ cf ] ,
collector - > GetFlushedFiles ( ) ,
static_cast < int > ( kNumTimestamps - i ) ) ;
collector - > ClearFlushedFiles ( ) ;
}
}
}
}
const auto & verify_db_func = [ & ] ( ) {
for ( size_t i = 0 ; i ! = kNumTimestamps ; + + i ) {
ReadOptions ropts ;
ropts . timestamp = & read_ts_list [ i ] ;
for ( int cf = 0 ; cf ! = static_cast < int > ( num_cfs ) ; + + cf ) {
ColumnFamilyHandle * cfh = handles_ [ cf ] ;
for ( size_t j = 0 ; j ! = kNumKeysPerTimestamp ; + + j ) {
std : : string value ;
ASSERT_OK ( db_ - > Get ( ropts , cfh , " key " + std : : to_string ( j ) , & value ) ) ;
ASSERT_EQ ( " value_ " + std : : to_string ( j ) + " _ " + std : : to_string ( i ) ,
value ) ;
}
}
}
} ;
verify_db_func ( ) ;
}
} // namespace rocksdb
# ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS