@ -3,25 +3,29 @@
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
# include <algorithm>
# include <map>
# include <string>
# include <tuple>
# include "db/compaction_job.h"
# include "db/column_family.h"
# include "db/version_set.h"
# include "db/writebuffer.h"
# include "rocksdb/cache.h"
# include "rocksdb/options.h"
# include "rocksdb/db.h"
# include "rocksdb/options.h"
# include "table/mock_table.h"
# include "util/file_reader_writer.h"
# include "util/string_util.h"
# include "util/testharness.h"
# include "util/testutil.h"
# include "table/mock_table .h"
# include "utilities/merge_operators .h"
namespace rocksdb {
namespace {
void VerifyInitializationOfCompactionJobStats (
const CompactionJobStats & compaction_job_stats ) {
# if !defined(IOS_CROSS_COMPILE)
@ -73,12 +77,6 @@ class CompactionJobTest : public testing::Test {
EXPECT_OK ( env_ - > CreateDirIfMissing ( dbname_ ) ) ;
db_options_ . db_paths . emplace_back ( dbname_ ,
std : : numeric_limits < uint64_t > : : max ( ) ) ;
NewDB ( ) ;
std : : vector < ColumnFamilyDescriptor > column_families ;
cf_options_ . table_factory = mock_table_factory_ ;
column_families . emplace_back ( kDefaultColumnFamilyName , cf_options_ ) ;
EXPECT_OK ( versions_ - > Recover ( column_families , false ) ) ;
}
std : : string GenerateFileName ( uint64_t file_number ) {
@ -89,13 +87,68 @@ class CompactionJobTest : public testing::Test {
return TableFileName ( db_paths , meta . fd . GetNumber ( ) , meta . fd . GetPathId ( ) ) ;
}
std : : string KeyStr ( const std : : string & user_key , const SequenceNumber seq_num ,
const ValueType t ) {
return InternalKey ( user_key , seq_num , t ) . Encode ( ) . ToString ( ) ;
}
// Corrupts key by changing the type
void CorruptKey ( InternalKey * ikey ) {
void CorruptKeyType ( InternalKey * ikey ) {
std : : string keystr = ikey - > Encode ( ) . ToString ( ) ;
keystr [ keystr . size ( ) - 8 ] = kTypeLogData ;
ikey - > DecodeFrom ( Slice ( keystr . data ( ) , keystr . size ( ) ) ) ;
}
void AddMockFile ( const mock : : MockFileContents & contents , int level = 0 ) {
assert ( contents . size ( ) > 0 ) ;
bool first_key = true ;
std : : string smallest , largest ;
InternalKey smallest_key , largest_key ;
SequenceNumber smallest_seqno = kMaxSequenceNumber ;
SequenceNumber largest_seqno = 0 ;
for ( auto kv : contents ) {
ParsedInternalKey key ;
std : : string skey ;
std : : string value ;
std : : tie ( skey , value ) = kv ;
ParseInternalKey ( skey , & key ) ;
smallest_seqno = std : : min ( smallest_seqno , key . sequence ) ;
largest_seqno = std : : max ( largest_seqno , key . sequence ) ;
if ( first_key | |
cfd_ - > user_comparator ( ) - > Compare ( key . user_key , smallest ) < 0 ) {
smallest . assign ( key . user_key . data ( ) , key . user_key . size ( ) ) ;
smallest_key . DecodeFrom ( skey ) ;
}
if ( first_key | |
cfd_ - > user_comparator ( ) - > Compare ( key . user_key , largest ) > 0 ) {
largest . assign ( key . user_key . data ( ) , key . user_key . size ( ) ) ;
largest_key . DecodeFrom ( skey ) ;
}
first_key = false ;
}
uint64_t file_number = versions_ - > NewFileNumber ( ) ;
EXPECT_OK ( mock_table_factory_ - > CreateMockTable (
env_ , GenerateFileName ( file_number ) , std : : move ( contents ) ) ) ;
VersionEdit edit ;
edit . AddFile ( level , file_number , 0 , 10 , smallest_key , largest_key ,
smallest_seqno , largest_seqno , false ) ;
mutex_ . Lock ( ) ;
versions_ - > LogAndApply ( versions_ - > GetColumnFamilySet ( ) - > GetDefault ( ) ,
mutable_cf_options_ , & edit , & mutex_ ) ;
mutex_ . Unlock ( ) ;
}
void SetLastSequence ( const SequenceNumber sequence_number ) {
versions_ - > SetLastSequence ( sequence_number + 1 ) ;
}
// returns expected result after compaction
mock : : MockFileContents CreateTwoFiles ( bool gen_corrupted_keys ) {
mock : : MockFileContents expected_results ;
@ -110,8 +163,6 @@ class CompactionJobTest : public testing::Test {
for ( int i = 0 ; i < 2 ; + + i ) {
mock : : MockFileContents contents ;
SequenceNumber smallest_seqno = 0 , largest_seqno = 0 ;
InternalKey smallest , largest ;
for ( int k = 0 ; k < kKeysPerFile ; + + k ) {
auto key = ToString ( i * kMatchingKeys + k ) ;
auto value = ToString ( i * kKeysPerFile + k ) ;
@ -120,15 +171,8 @@ class CompactionJobTest : public testing::Test {
// file
InternalKey bottommost_internal_key ( key , 0 , kTypeValue ) ;
if ( corrupt_id ( k ) ) {
CorruptKey ( & internal_key ) ;
CorruptKey ( & bottommost_internal_key ) ;
}
if ( k = = 0 ) {
smallest = internal_key ;
smallest_seqno = sequence_number ;
} else if ( k = = kKeysPerFile - 1 ) {
largest = internal_key ;
largest_seqno = sequence_number ;
CorruptKeyType ( & internal_key ) ;
CorruptKeyType ( & bottommost_internal_key ) ;
}
contents . insert ( { internal_key . Encode ( ) . ToString ( ) , value } ) ;
if ( i = = 1 | | k < kMatchingKeys | | corrupt_id ( k - kMatchingKeys ) ) {
@ -137,24 +181,15 @@ class CompactionJobTest : public testing::Test {
}
}
uint64_t file_number = versions_ - > NewFileNumber ( ) ;
EXPECT_OK ( mock_table_factory_ - > CreateMockTable (
env_ , GenerateFileName ( file_number ) , std : : move ( contents ) ) ) ;
AddMockFile ( contents ) ;
}
VersionEdit edit ;
edit . AddFile ( 0 , file_number , 0 , 10 , smallest , largest , smallest_seqno ,
largest_seqno , false ) ;
SetLastSequence ( sequence_number ) ;
mutex_ . Lock ( ) ;
versions_ - > LogAndApply ( versions_ - > GetColumnFamilySet ( ) - > GetDefault ( ) ,
mutable_cf_options_ , & edit , & mutex_ ) ;
mutex_ . Unlock ( ) ;
}
versions_ - > SetLastSequence ( sequence_number + 1 ) ;
return expected_results ;
}
void NewDB ( ) {
void NewDB ( std : : shared_ptr < MergeOperator > merge_operator = nullptr ) {
VersionEdit new_db ;
new_db . SetLogNumber ( 0 ) ;
new_db . SetNextFile ( 2 ) ;
@ -166,7 +201,7 @@ class CompactionJobTest : public testing::Test {
manifest , & file , env_ - > OptimizeForManifestWrite ( env_options_ ) ) ;
ASSERT_OK ( s ) ;
unique_ptr < WritableFileWriter > file_writer (
new WritableFileWriter ( std : : move ( file ) , EnvOptions ( ) ) ) ;
new WritableFileWriter ( std : : move ( file ) , env_options_ ) ) ;
{
log : : Writer log ( std : : move ( file_writer ) ) ;
std : : string record ;
@ -176,19 +211,35 @@ class CompactionJobTest : public testing::Test {
ASSERT_OK ( s ) ;
// Make "CURRENT" file that points to the new manifest file.
s = SetCurrentFile ( env_ , dbname_ , 1 , nullptr ) ;
std : : vector < ColumnFamilyDescriptor > column_families ;
cf_options_ . table_factory = mock_table_factory_ ;
cf_options_ . merge_operator = merge_operator ;
column_families . emplace_back ( kDefaultColumnFamilyName , cf_options_ ) ;
EXPECT_OK ( versions_ - > Recover ( column_families , false ) ) ;
cfd_ = versions_ - > GetColumnFamilySet ( ) - > GetDefault ( ) ;
}
void RunCompaction ( const std : : vector < FileMetaData * > & files ) {
void RunCompaction ( const std : : vector < std : : vector < FileMetaData * > > & input_files ,
const mock : : MockFileContents & expected_results ) {
auto cfd = versions_ - > GetColumnFamilySet ( ) - > GetDefault ( ) ;
CompactionInputFiles compaction_input_files ;
compaction_input_files . level = 0 ;
for ( auto file : files ) {
compaction_input_files . files . push_back ( file ) ;
size_t num_input_files = 0 ;
std : : vector < CompactionInputFiles > compaction_input_files ;
for ( size_t level = 0 ; level < input_files . size ( ) ; level + + ) {
auto level_files = input_files [ level ] ;
CompactionInputFiles compaction_level ;
compaction_level . level = level ;
compaction_level . files . insert ( compaction_level . files . end ( ) ,
level_files . begin ( ) , level_files . end ( ) ) ;
compaction_input_files . push_back ( compaction_level ) ;
num_input_files + = level_files . size ( ) ;
}
Compaction compaction ( cfd - > current ( ) - > storage_info ( ) ,
* cfd - > GetLatestMutableCFOptions ( ) ,
{ compaction_input_files } , 1 , 1024 * 1024 , 10 , 0 ,
compaction_input_files , 1 , 1024 * 1024 , 10 , 0 ,
kNoCompression , { } ) ;
compaction . SetInputVersion ( cfd - > current ( ) ) ;
@ -204,16 +255,18 @@ class CompactionJobTest : public testing::Test {
compaction_job . Prepare ( ) ;
mutex_ . Unlock ( ) ;
ASSERT_OK ( compaction_job . Run ( ) ) ;
mutex_ . Lock ( ) ;
Status s ;
s = compaction_job . Run ( ) ;
ASSERT_OK ( s ) ;
mutex_ . Lock ( ) ;
compaction_job . Install ( & s , * cfd - > GetLatestMutableCFOptions ( ) , & mutex_ ) ;
ASSERT_OK ( s ) ;
mutex_ . Unlock ( ) ;
ASSERT_GE ( compaction_job_stats_ . elapsed_micros , 0U ) ;
ASSERT_EQ ( compaction_job_stats_ . num_input_files , files . size ( ) ) ;
ASSERT_EQ ( compaction_job_stats_ . num_input_files , num_input_files ) ;
ASSERT_EQ ( compaction_job_stats_ . num_output_files , 1U ) ;
mock_table_factory_ - > AssertLatestFile ( expected_results ) ;
}
Env * env_ ;
@ -230,26 +283,165 @@ class CompactionJobTest : public testing::Test {
std : : atomic < bool > shutting_down_ ;
std : : shared_ptr < mock : : MockTableFactory > mock_table_factory_ ;
CompactionJobStats compaction_job_stats_ ;
ColumnFamilyData * cfd_ ;
} ;
TEST_F ( CompactionJobTest , Simple ) {
NewDB ( ) ;
auto expected_results = CreateTwoFiles ( false ) ;
auto cfd = versions_ - > GetColumnFamilySet ( ) - > GetDefault ( ) ;
auto files = cfd - > current ( ) - > storage_info ( ) - > LevelFiles ( 0 ) ;
ASSERT_EQ ( 2U , files . size ( ) ) ;
RunCompaction ( files ) ;
mock_table_factory_ - > AssertLatestFile ( expected_results ) ;
RunCompaction ( { files } , expected_results ) ;
}
TEST_F ( CompactionJobTest , SimpleCorrupted ) {
NewDB ( ) ;
auto expected_results = CreateTwoFiles ( true ) ;
auto cfd = versions_ - > GetColumnFamilySet ( ) - > GetDefault ( ) ;
auto files = cfd - > current ( ) - > storage_info ( ) - > LevelFiles ( 0 ) ;
RunCompaction ( files ) ;
RunCompaction ( { files } , expected_results ) ;
ASSERT_EQ ( compaction_job_stats_ . num_corrupt_keys , 400U ) ;
mock_table_factory_ - > AssertLatestFile ( expected_results ) ;
}
TEST_F ( CompactionJobTest , SimpleDeletion ) {
NewDB ( ) ;
mock : : MockFileContents file1 = {
{ KeyStr ( " c " , 4U , kTypeDeletion ) , " " } ,
{ KeyStr ( " c " , 3U , kTypeValue ) , " val " }
} ;
AddMockFile ( file1 ) ;
mock : : MockFileContents file2 = {
{ KeyStr ( " b " , 2U , kTypeValue ) , " val " } ,
{ KeyStr ( " b " , 1U , kTypeValue ) , " val " }
} ;
AddMockFile ( file2 ) ;
mock : : MockFileContents expected_results = {
{ KeyStr ( " b " , 0U , kTypeValue ) , " val " }
} ;
SetLastSequence ( 4U ) ;
auto files = cfd_ - > current ( ) - > storage_info ( ) - > LevelFiles ( 0 ) ;
RunCompaction ( { files } , expected_results ) ;
}
TEST_F ( CompactionJobTest , SimpleOverwrite ) {
NewDB ( ) ;
mock : : MockFileContents file1 = {
{ KeyStr ( " a " , 3U , kTypeValue ) , " val2 " } ,
{ KeyStr ( " b " , 4U , kTypeValue ) , " val3 " } ,
} ;
AddMockFile ( file1 ) ;
mock : : MockFileContents file2 = {
{ KeyStr ( " a " , 1U , kTypeValue ) , " val " } ,
{ KeyStr ( " b " , 2U , kTypeValue ) , " val " }
} ;
AddMockFile ( file2 ) ;
mock : : MockFileContents expected_results = {
{ KeyStr ( " a " , 0U , kTypeValue ) , " val2 " } ,
{ KeyStr ( " b " , 0U , kTypeValue ) , " val3 " }
} ;
SetLastSequence ( 4U ) ;
auto files = cfd_ - > current ( ) - > storage_info ( ) - > LevelFiles ( 0 ) ;
RunCompaction ( { files } , expected_results ) ;
}
TEST_F ( CompactionJobTest , SimpleNonLastLevel ) {
NewDB ( ) ;
mock : : MockFileContents file1 = {
{ KeyStr ( " a " , 5U , kTypeValue ) , " val2 " } ,
{ KeyStr ( " b " , 6U , kTypeValue ) , " val3 " } ,
} ;
AddMockFile ( file1 ) ;
mock : : MockFileContents file2 = {
{ KeyStr ( " a " , 3U , kTypeValue ) , " val " } ,
{ KeyStr ( " b " , 4U , kTypeValue ) , " val " }
} ;
AddMockFile ( file2 , 1 ) ;
mock : : MockFileContents file3 = {
{ KeyStr ( " a " , 1U , kTypeValue ) , " val " } ,
{ KeyStr ( " b " , 2U , kTypeValue ) , " val " }
} ;
AddMockFile ( file3 , 2 ) ;
// Because level 1 is not the last level, the sequence numbers of a and b
// cannot be set to 0
mock : : MockFileContents expected_results = {
{ KeyStr ( " a " , 5U , kTypeValue ) , " val2 " } ,
{ KeyStr ( " b " , 6U , kTypeValue ) , " val3 " }
} ;
SetLastSequence ( 6U ) ;
auto lvl0_files = cfd_ - > current ( ) - > storage_info ( ) - > LevelFiles ( 0 ) ;
auto lvl1_files = cfd_ - > current ( ) - > storage_info ( ) - > LevelFiles ( 1 ) ;
RunCompaction ( { lvl0_files , lvl1_files } , expected_results ) ;
}
TEST_F ( CompactionJobTest , SimpleMerge ) {
auto merge_op = MergeOperators : : CreateStringAppendOperator ( ) ;
NewDB ( merge_op ) ;
mock : : MockFileContents file1 = {
{ KeyStr ( " a " , 5U , kTypeMerge ) , " 5 " } ,
{ KeyStr ( " a " , 4U , kTypeMerge ) , " 4 " } ,
{ KeyStr ( " a " , 3U , kTypeValue ) , " 3 " } ,
} ;
AddMockFile ( file1 ) ;
mock : : MockFileContents file2 = {
{ KeyStr ( " b " , 2U , kTypeMerge ) , " 2 " } ,
{ KeyStr ( " b " , 1U , kTypeValue ) , " 1 " }
} ;
AddMockFile ( file2 ) ;
mock : : MockFileContents expected_results = {
{ KeyStr ( " a " , 0U , kTypeValue ) , " 3,4,5 " } ,
{ KeyStr ( " b " , 0U , kTypeValue ) , " 1,2 " }
} ;
SetLastSequence ( 5U ) ;
auto files = cfd_ - > current ( ) - > storage_info ( ) - > LevelFiles ( 0 ) ;
RunCompaction ( { files } , expected_results ) ;
}
TEST_F ( CompactionJobTest , NonAssocMerge ) {
auto merge_op = MergeOperators : : CreateStringAppendTESTOperator ( ) ;
NewDB ( merge_op ) ;
mock : : MockFileContents file1 = {
{ KeyStr ( " a " , 5U , kTypeMerge ) , " 5 " } ,
{ KeyStr ( " a " , 4U , kTypeMerge ) , " 4 " } ,
{ KeyStr ( " a " , 3U , kTypeMerge ) , " 3 " } ,
} ;
AddMockFile ( file1 ) ;
mock : : MockFileContents file2 = {
{ KeyStr ( " b " , 2U , kTypeMerge ) , " 2 " } ,
{ KeyStr ( " b " , 1U , kTypeMerge ) , " 1 " }
} ;
AddMockFile ( file2 ) ;
mock : : MockFileContents expected_results = {
{ KeyStr ( " a " , 0U , kTypeValue ) , " 3,4,5 " } ,
{ KeyStr ( " b " , 2U , kTypeMerge ) , " 2 " } ,
{ KeyStr ( " b " , 1U , kTypeMerge ) , " 1 " }
} ;
SetLastSequence ( 5U ) ;
auto files = cfd_ - > current ( ) - > storage_info ( ) - > LevelFiles ( 0 ) ;
RunCompaction ( { files } , expected_results ) ;
}
} // namespace rocksdb