@@ -34,29 +34,66 @@
 namespace rocksdb {
 
-extern const uint64_t kPlainTableMagicNumber;
+namespace {
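+
+// GetSliceHash is applied to user-key prefixes; its result feeds both the
+// bloom filter and bucket selection for the prefix hash index.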
-static uint32_t GetSliceHash(Slice const& s) {
+inline uint32_t GetSliceHash(Slice const& s) {
   return Hash(s.data(), s.size(), 397);
 }
 
-static uint32_t getBucketIdFromHash(uint32_t hash, uint32_t num_buckets) {
+inline uint32_t GetBucketIdFromHash(uint32_t hash, uint32_t num_buckets) {
   return hash % num_buckets;
 }
+
+}  // namespace
+
+// Iterator to iterate IndexedTable
+class PlainTableIterator : public Iterator {
+ public:
+  explicit PlainTableIterator(PlainTableReader* table);
+  ~PlainTableIterator();
+
+  bool Valid() const;
+  void SeekToFirst();
+  void SeekToLast();
+  void Seek(const Slice& target);
+  void Next();
+  void Prev();
+  Slice key() const;
+  Slice value() const;
+  Status status() const;
+
+ private:
+  PlainTableReader* table_;
+  uint32_t offset_;
+  uint32_t next_offset_;
+  Slice key_;
+  Slice value_;
+  Status status_;
+
+  // No copying allowed
+  PlainTableIterator(const PlainTableIterator&) = delete;
+  void operator=(const PlainTableIterator&) = delete;
+};
+
+extern const uint64_t kPlainTableMagicNumber;
 
 PlainTableReader::PlainTableReader(const EnvOptions& storage_options,
                                    uint64_t file_size, int bloom_bits_per_key,
                                    double hash_table_ratio,
-                                   const TableProperties& table_properties) :
-    hash_table_size_(0), soptions_(storage_options), file_size_(file_size),
-    hash_table_ratio_(hash_table_ratio),
-    bloom_bits_per_key_(bloom_bits_per_key),
-    table_properties_(table_properties), data_start_offset_(0),
-    data_end_offset_(table_properties_.data_size),
-    user_key_len_(table_properties.fixed_key_len) {
-  hash_table_ = nullptr;
-  bloom_ = nullptr;
-  sub_index_ = nullptr;
-}
+                                   const TableProperties& table_properties)
+    : soptions_(storage_options),
+      file_size_(file_size),
+      kHashTableRatio(hash_table_ratio),
+      kBloomBitsPerKey(bloom_bits_per_key),
+      table_properties_(table_properties),
+      data_end_offset_(table_properties_.data_size),
+      user_key_len_(table_properties.fixed_key_len) {}
 
 PlainTableReader::~PlainTableReader() {
   delete[] hash_table_;
@@ -73,30 +110,20 @@ Status PlainTableReader::Open(const Options& options,
                              double hash_table_ratio) {
   assert(options.allow_mmap_reads);
 
-  if (file_size > 2147483646) {
+  if (file_size > kMaxFileSize) {
     return Status::NotSupported("File is too large for PlainTableReader!");
   }
 
   TableProperties table_properties;
-  auto s = ReadTableProperties(
-      file.get(),
-      file_size,
-      kPlainTableMagicNumber,
-      options.env,
-      options.info_log.get(),
-      &table_properties
-  );
+  auto s = ReadTableProperties(file.get(), file_size, kPlainTableMagicNumber,
+                               options.env, options.info_log.get(),
+                               &table_properties);
   if (!s.ok()) {
     return s;
   }
 
   std::unique_ptr<PlainTableReader> new_reader(new PlainTableReader(
-      soptions,
-      file_size,
-      bloom_num_bits,
-      hash_table_ratio,
-      table_properties
-  ));
+      soptions, file_size, bloom_num_bits, hash_table_ratio,
+      table_properties));
   new_reader->file_ = std::move(file);
   new_reader->options_ = options;
@@ -129,12 +156,11 @@ struct PlainTableReader::IndexRecord {
 
 // Helper class to track all the index records
 class PlainTableReader::IndexRecordList {
 public:
-  explicit IndexRecordList(size_t num_records_per_group) :
-      num_records_per_group_(num_records_per_group),
-      current_group_(nullptr),
-      num_records_in_current_group_(num_records_per_group) {
-  }
+  explicit IndexRecordList(size_t num_records_per_group)
+      : kNumRecordsPerGroup(num_records_per_group),
+        current_group_(nullptr),
+        num_records_in_current_group_(num_records_per_group) {}
 
   ~IndexRecordList() {
     for (size_t i = 0; i < groups_.size(); i++) {
@@ -143,65 +169,59 @@ public:
   }
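 
+  // Records are appended into fixed-size arrays ("groups"), so growing the
+  // list never relocates existing records; At() maps a flat index to its
+  // group and slot.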
   void AddRecord(murmur_t hash, uint32_t offset) {
-    if (num_records_in_current_group_ == num_records_per_group_) {
+    if (num_records_in_current_group_ == kNumRecordsPerGroup) {
       current_group_ = AllocateNewGroup();
       num_records_in_current_group_ = 0;
     }
-    auto& new_record = current_group_[num_records_in_current_group_];
+    auto& new_record = current_group_[num_records_in_current_group_++];
     new_record.hash = hash;
     new_record.offset = offset;
     new_record.next = nullptr;
-    num_records_in_current_group_++;
   }
 
-  size_t GetNumRecords() {
-    return (groups_.size() - 1) * num_records_per_group_
-        + num_records_in_current_group_;
+  size_t GetNumRecords() const {
+    return (groups_.size() - 1) * kNumRecordsPerGroup +
+           num_records_in_current_group_;
   }
+
   IndexRecord* At(size_t index) {
-    return &(groups_[index / num_records_per_group_]
-        [index % num_records_per_group_]);
+    return &(groups_[index / kNumRecordsPerGroup][index % kNumRecordsPerGroup]);
   }
 
+ private:
   IndexRecord* AllocateNewGroup() {
-    IndexRecord* result = new IndexRecord[num_records_per_group_];
+    IndexRecord* result = new IndexRecord[kNumRecordsPerGroup];
     groups_.push_back(result);
     return result;
   }
 
-private:
-  const size_t num_records_per_group_;
+  const size_t kNumRecordsPerGroup;
   IndexRecord* current_group_;
   // List of arrays allocated
   std::vector<IndexRecord*> groups_;
   size_t num_records_in_current_group_;
 };
 
-int PlainTableReader::PopulateIndexRecordList(
-    IndexRecordList& record_list) {
-  Slice key_slice;
-  Slice key_prefix_slice;
-  Slice key_suffix_slice;
-  Slice value_slice;
+int PlainTableReader::PopulateIndexRecordList(IndexRecordList* record_list) {
   Slice prev_key_prefix_slice;
   uint32_t prev_key_prefix_hash = 0;
   uint32_t pos = data_start_offset_;
   int key_index_within_prefix = 0;
-  bool first = true;
-  std::string prefix_sub_index;
+  bool is_first_record = true;
   HistogramImpl keys_per_prefix_hist;
   // Need map to be ordered to make sure sub indexes generated
   // are in order.
 
   int num_prefixes = 0;
   while (pos < data_end_offset_) {
     uint32_t key_offset = pos;
+    Slice key_slice;
+    Slice value_slice;
     status_ = Next(pos, &key_slice, &value_slice, pos);
-    key_prefix_slice = GetPrefix(key_slice);
+    Slice key_prefix_slice = GetPrefix(key_slice);
 
-    if (first || prev_key_prefix_slice != key_prefix_slice) {
-      num_prefixes++;
-      if (!first) {
+    if (is_first_record || prev_key_prefix_slice != key_prefix_slice) {
+      ++num_prefixes;
+      if (!is_first_record) {
         keys_per_prefix_hist.Add(key_index_within_prefix);
       }
       key_index_within_prefix = 0;
@@ -209,12 +229,13 @@ int PlainTableReader::PopulateIndexRecordList(
       prev_key_prefix_hash = GetSliceHash(key_prefix_slice);
     }
 
-    if (key_index_within_prefix++ % 16 == 0) {
-      // Add an index key for every 16 keys
-      record_list.AddRecord(prev_key_prefix_hash, key_offset);
+    if (key_index_within_prefix++ % kIndexIntervalForSamePrefixKeys == 0) {
+      // Add an index key for every kIndexIntervalForSamePrefixKeys keys
+      record_list->AddRecord(prev_key_prefix_hash, key_offset);
     }
-    first = false;
+    is_first_record = false;
   }
 
   keys_per_prefix_hist.Add(key_index_within_prefix);
   Log(options_.info_log, "Number of Keys per prefix Histogram: %s",
       keys_per_prefix_hist.ToString().c_str());
@@ -222,23 +243,22 @@ int PlainTableReader::PopulateIndexRecordList(
   return num_prefixes;
 }
 
-void PlainTableReader::Allocate(int num_prefixes) {
-  delete[] hash_table_;
-  if (bloom_bits_per_key_ > 0) {
-    bloom_ = new DynamicBloom(num_prefixes * bloom_bits_per_key_);
+void PlainTableReader::AllocateIndexAndBloom(int num_prefixes) {
+  if (hash_table_ != nullptr) {
+    delete[] hash_table_;
+  }
+  if (kBloomBitsPerKey > 0) {
+    bloom_ = new DynamicBloom(num_prefixes * kBloomBitsPerKey);
   }
 
   double hash_table_size_multiplier =
-      (hash_table_ratio_ > 1.0) ? 1.0 : 1.0 / hash_table_ratio_;
+      (kHashTableRatio > 1.0) ? 1.0 : 1.0 / kHashTableRatio;
   hash_table_size_ = num_prefixes * hash_table_size_multiplier + 1;
   hash_table_ = new uint32_t[hash_table_size_];
 }
 
 size_t PlainTableReader::BucketizeIndexesAndFillBloom(
     IndexRecordList& record_list, int num_prefixes,
-    std::vector<IndexRecord*>& hash2offsets,
-    std::vector<uint32_t>& bucket_count) {
+    std::vector<IndexRecord*>* hash_to_offsets,
+    std::vector<uint32_t>* bucket_count) {
   size_t sub_index_size_needed = 0;
   bool first = true;
   uint32_t prev_hash = 0;
@@ -253,32 +273,34 @@ size_t PlainTableReader::BucketizeIndexesAndFillBloom(
         bloom_->AddHash(cur_hash);
       }
     }
-    uint32_t bucket = getBucketIdFromHash(cur_hash, hash_table_size_);
-    IndexRecord* prev_bucket_head = hash2offsets[bucket];
+    uint32_t bucket = GetBucketIdFromHash(cur_hash, hash_table_size_);
+    IndexRecord* prev_bucket_head = (*hash_to_offsets)[bucket];
     index_record->next = prev_bucket_head;
-    hash2offsets[bucket] = index_record;
+    (*hash_to_offsets)[bucket] = index_record;
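+    // Space accounting for the second-level index: a bucket needs sub-index
+    // bytes only once it holds two or more records (a varint record count
+    // plus one offset per record).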
-    if (bucket_count[bucket] > 0) {
-      if (bucket_count[bucket] == 1) {
+    auto& item_count = (*bucket_count)[bucket];
+    if (item_count > 0) {
+      if (item_count == 1) {
         sub_index_size_needed += kOffsetLen + 1;
       }
-      if (bucket_count[bucket] == 127) {
+      if (item_count == 127) {
         // Need more than one byte for length
         sub_index_size_needed++;
       }
       sub_index_size_needed += kOffsetLen;
     }
-    bucket_count[bucket]++;
+    item_count++;
   }
   return sub_index_size_needed;
 }
 
-void PlainTableReader::FillIndexes(size_t sub_index_size_needed,
-    std::vector<IndexRecord*>& hash2offsets,
-    std::vector<uint32_t>& bucket_count) {
+void PlainTableReader::FillIndexes(
+    size_t sub_index_size_needed,
+    const std::vector<IndexRecord*>& hash_to_offsets,
+    const std::vector<uint32_t>& bucket_count) {
   Log(options_.info_log, "Reserving %zu bytes for sub index",
       sub_index_size_needed);
-  // 4 bytes buffer for variable length size
-  size_t buffer_size = 64;
+  // 8 bytes buffer for variable length size
+  size_t buffer_size = 8 * 8;
   size_t buffer_used = 0;
   sub_index_size_needed += buffer_size;
   sub_index_ = new char[sub_index_size_needed];
@@ -286,7 +308,6 @@ void PlainTableReader::FillIndexes(size_t sub_index_size_needed,
   char* prev_ptr;
   char* cur_ptr;
   uint32_t* sub_index_ptr;
-  IndexRecord* record;
   for (int i = 0; i < hash_table_size_; i++) {
     uint32_t num_keys_for_bucket = bucket_count[i];
     switch (num_keys_for_bucket) {
@@ -296,14 +317,14 @@ void PlainTableReader::FillIndexes(size_t sub_index_size_needed,
       break;
     case 1:
       // point directly to the file offset
-      hash_table_[i] = hash2offsets[i]->offset;
+      hash_table_[i] = hash_to_offsets[i]->offset;
       break;
     default:
       // point to second level indexes.
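+      // kSubIndexMask marks the entry as an offset into sub_index_ rather
+      // than a file offset.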
       hash_table_[i] = sub_index_offset | kSubIndexMask;
       prev_ptr = sub_index_ + sub_index_offset;
       cur_ptr = EncodeVarint32(prev_ptr, num_keys_for_bucket);
-      sub_index_offset += cur_ptr - prev_ptr;
+      sub_index_offset += (cur_ptr - prev_ptr);
       if (cur_ptr - prev_ptr > 2
           || (cur_ptr - prev_ptr == 2 && num_keys_for_bucket <= 127)) {
         // Need to resize sub_index. Exponentially grow buffer.
@@ -321,10 +342,10 @@ void PlainTableReader::FillIndexes(size_t sub_index_size_needed,
         }
       }
       sub_index_ptr = (uint32_t*)(sub_index_ + sub_index_offset);
-      record = hash2offsets[i];
+      IndexRecord* record = hash_to_offsets[i];
       int j;
-      for (j = num_keys_for_bucket - 1;
-          j >= 0 && record; j--, record = record->next) {
+      for (j = num_keys_for_bucket - 1; j >= 0 && record;
+           j--, record = record->next) {
         sub_index_ptr[j] = record->offset;
       }
       assert(j == -1 && record == nullptr);
@@ -337,24 +358,6 @@ void PlainTableReader::FillIndexes(size_t sub_index_size_needed,
       hash_table_size_, sub_index_size_needed);
 }
 
-// PopulateIndex() builds index of keys.
-// hash_table_ contains buckets size of hash_table_size_, each is a 32-bit
-// integer. The lower 31 bits contain an offset value (explained below) and
-// the first bit of the integer indicates type of the offset:
-//
-// 0 indicates that the bucket contains only one prefix (no conflict when
-// hashing this prefix), whose first row starts from this offset of the file.
-// 1 indicates that the bucket contains more than one prefixes, or there
-// are too many rows for one prefix so we need a binary search for it. In
-// this case, the offset indicates the offset of sub_index_ holding the
-// binary search indexes of keys for those rows. Those binary search indexes
-// are organized in this way:
-//
-// The first 4 bytes, indicates how many indexes (N) are stored after it. After
-// it, there are N 32-bit integers, each points of an offset of the file, which
-// points to starting of a row. Those offsets need to be guaranteed to be in
-// ascending order so the keys they are pointing to are also in ascending order
-// to make sure we can use them to do binary searches.
 Status PlainTableReader::PopulateIndex() {
   // Get mmapped memory to file_data_.
   Status s = file_->Read(0, file_size_, &file_data_, nullptr);
@@ -362,25 +365,24 @@ Status PlainTableReader::PopulateIndex() {
     return s;
   }
 
-  IndexRecordList record_list(256);
-  // First, read the whole file, for every 16 rows for a prefix (starting from
-  // the first one), generate a record of (hash, offset) and append it to
-  // IndexRecordList, which is a data structure created to store them.
-  int num_prefixes = PopulateIndexRecordList(record_list);
+  IndexRecordList record_list(kRecordsPerGroup);
+  // First, read the whole file, for every kIndexIntervalForSamePrefixKeys rows
+  // for a prefix (starting from the first one), generate a record of (hash,
+  // offset) and append it to IndexRecordList, which is a data structure created
+  // to store them.
+  int num_prefixes = PopulateIndexRecordList(&record_list);
 
-  // Calculated hash table and bloom filter size and allocate memory for indexes
+  // Calculate hash table and bloom filter sizes and allocate memory for indexes
   // and bloom filter based on the number of prefixes.
-  Allocate(num_prefixes);
+  AllocateIndexAndBloom(num_prefixes);
 
   // Bucketize all the index records to a temp data structure, in which for
   // each bucket, we generate a linked list of IndexRecord, in reversed order.
-  std::vector<IndexRecord*> hash2offsets(hash_table_size_, nullptr);
+  std::vector<IndexRecord*> hash_to_offsets(hash_table_size_, nullptr);
   std::vector<uint32_t> bucket_count(hash_table_size_, 0);
-  size_t sub_index_size_needed = BucketizeIndexesAndFillBloom(record_list,
-      num_prefixes,
-      hash2offsets,
-      bucket_count);
+  size_t sub_index_size_needed = BucketizeIndexesAndFillBloom(
+      record_list, num_prefixes, &hash_to_offsets, &bucket_count);
 
   // From the temp data structure, populate indexes.
-  FillIndexes(sub_index_size_needed, hash2offsets, bucket_count);
+  FillIndexes(sub_index_size_needed, hash_to_offsets, bucket_count);
 
   return Status::OK();
 }
@@ -389,7 +391,7 @@ Status PlainTableReader::GetOffset(const Slice& target, const Slice& prefix,
                                    uint32_t prefix_hash, bool& prefix_matched,
                                    uint32_t& ret_offset) {
   prefix_matched = false;
-  int bucket = getBucketIdFromHash(prefix_hash, hash_table_size_);
+  int bucket = GetBucketIdFromHash(prefix_hash, hash_table_size_);
   uint32_t bucket_value = hash_table_[bucket];
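+  // A bucket value of data_end_offset_ means the bucket holds no entries.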
   if (bucket_value == data_end_offset_) {
     ret_offset = data_end_offset_;