@ -5,6 +5,7 @@
# include "table/plain_table_reader.h"
# include <unordered_map>
# include <map>
# include "db/dbformat.h"
@ -23,21 +24,35 @@
# include "util/coding.h"
# include "util/hash.h"
# include "util/histogram.h"
# include "util/murmurhash.h"
# include "util/perf_context_imp.h"
# include "util/stop_watch.h"
namespace std {
template < >
struct hash < rocksdb : : Slice > {
public :
std : : size_t operator ( ) ( rocksdb : : Slice const & s ) const {
return rocksdb : : Hash ( s . data ( ) , s . size ( ) , 397 ) ;
return MurmurHash ( s . data ( ) , s . size ( ) , 397 ) ;
}
} ;
class slice_comparator {
public :
bool operator ( ) ( rocksdb : : Slice const & s1 , rocksdb : : Slice const & s2 ) {
return s1 . compare ( s2 ) < 0 ;
}
} ;
}
namespace rocksdb {
static uint32_t getBucketId ( Slice const & s , size_t prefix_len ,
uint32_t num_buckets ) {
return MurmurHash ( s . data ( ) , prefix_len , 397 ) % num_buckets ;
}
PlainTableReader : : PlainTableReader ( const EnvOptions & storage_options ,
uint64_t file_size , int user_key_size ,
int key_prefix_len , int bloom_bits_per_key ,
@ -51,6 +66,8 @@ PlainTableReader::PlainTableReader(const EnvOptions& storage_options,
filter_policy_ = nullptr ;
}
hash_table_ = nullptr ;
data_start_offset_ = 0 ;
data_end_offset_ = file_size ;
}
PlainTableReader : : ~ PlainTableReader ( ) {
@ -73,6 +90,10 @@ Status PlainTableReader::Open(const Options& options,
double hash_table_ratio ) {
assert ( options . allow_mmap_reads ) ;
if ( file_size > 2147483646 ) {
return Status : : NotSupported ( " File is too large for PlainTableReader! " ) ;
}
PlainTableReader * t = new PlainTableReader ( soptions , file_size ,
user_key_size ,
key_prefix_len ,
@ -101,104 +122,111 @@ Iterator* PlainTableReader::NewIterator(const ReadOptions& options) {
}
Status PlainTableReader : : PopulateIndex ( uint64_t file_size ) {
// Get mmapped memory to file_data_.
Status s = file_ - > Read ( 0 , file_size_ , & file_data_ , nullptr ) ;
if ( ! s . ok ( ) ) {
return s ;
}
version_ = DecodeFixed32 ( file_data_ . data ( ) ) ;
version_ ^ = 0x80000000 ;
assert ( version_ = = 1 ) ;
data_start_offset_ = 4 ;
data_end_offset_ = file_size ;
Slice key_slice ;
Slice key_prefix_slice ;
Slice key_suffix_slice ;
Slice value_slice ;
Slice tmp_slice ;
Slice prev_key_prefix_slice ;
uint64_t pos = 0 ;
uint64_t data_offset_for_cur_prefix = 0 ;
int count_prefix = 0 ;
uint32_t pos = data_start_offset_ ;
int key_index_within_prefix = 0 ;
bool first = true ;
std : : string prefix_sub_index ;
HistogramImpl keys_per_prefix_hist ;
std : : unordered_map < Slice , uint64_t > tmp_index ;
// Need map to be ordered to make sure sub indexes generated
// are in order.
std : : map < Slice , std : : string , std : : slice_comparator > prefix2map ;
while ( pos < file_size ) {
uint64 _t key_offset = pos ;
po s = Next ( pos , & key_slice , & value_slice , & tmp_slice ) ;
uint32 _t key_offset = pos ;
status_ = Next ( pos , & key_slice , & value_slice , pos ) ;
key_prefix_slice = Slice ( key_slice . data ( ) , key_prefix_len_ ) ;
if ( first | | prev_key_prefix_slice ! = key_prefix_slice ) {
if ( ! first ) {
if ( count_prefix < 8 | | key_prefix_len_ = = user_key_size_ ) {
tmp_index [ prev_key_prefix_slice ] = data_offset_for_cur_prefix ;
} else {
tmp_index [ prev_key_prefix_slice ] = sub_index_ . length ( )
| kSubIndexMask ;
PutFixed32 ( & sub_index_ , ( count_prefix - 1 ) / 8 + 1 ) ;
sub_index_ . append ( prefix_sub_index ) ;
}
prefix_sub_index . clear ( ) ;
data_offset_for_cur_prefix = key_offset ;
keys_per_prefix_hist . Add ( count_prefix ) ;
keys_per_prefix_hist . Add ( key_index_within_prefix ) ;
}
key_index_within_prefix = 0 ;
prev_key_prefix_slice = key_prefix_slice ;
count_prefix = 1 ;
} else {
count_prefix + + ;
}
if ( key_prefix_len_ < user_key_size_ & & count_prefix % 8 = = 1 ) {
prefix_sub_index . append ( key_slice . data ( ) + key_prefix_len_ ,
user_key_size_ - key_prefix_len_ ) ;
PutFixed64 ( & prefix_sub_index , key_offset ) ;
}
first = false ;
if ( key_index_within_prefix + + % 8 = = 0 ) {
// Add an index key for every 8 keys
std : : string & prefix_index = prefix2map [ key_prefix_slice ] ;
PutFixed32 ( & prefix_index , key_offset ) ;
}
keys_per_prefix_hist . Add ( count_prefix ) ;
if ( count_prefix < = 2 | | key_prefix_len_ = = user_key_size_ ) {
tmp_index [ prev_key_prefix_slice ] = data_offset_for_cur_prefix ;
} else {
tmp_index [ prev_key_prefix_slice ] = sub_index_ . length ( ) | kSubIndexMask ;
PutFixed32 ( & sub_index_ , ( count_prefix - 1 ) / 8 + 1 ) ;
sub_index_ . append ( prefix_sub_index ) ;
first = false ;
}
keys_per_prefix_hist . Add ( key_index_within_prefix ) ;
if ( hash_table_ ! = nullptr ) {
delete [ ] hash_table_ ;
}
// Make the hash table 3/5 full
std : : vector < Slice > filter_entries ( 0 ) ; // for creating bloom filter;
if ( filter_policy_ ! = nullptr ) {
filter_entries . reserve ( tmp_index . size ( ) ) ;
filter_entries . reserve ( prefix2map . size ( ) ) ;
}
double hash_table_size_multipier =
( hash_table_ratio_ > 1.0 ) ? 1.0 : 1.0 / hash_table_ratio_ ;
hash_table_size_ = tmp_index . size ( ) * hash_table_size_multipier + 1 ;
hash_table_ = new char [ GetHashTableRecordLen ( ) * hash_table_size_ ] ;
for ( int i = 0 ; i < hash_table_size_ ; i + + ) {
memcpy ( GetHashTableBucketPtr ( i ) + key_prefix_len_ , & file_size_ ,
kOffsetLen ) ;
}
hash_table_size_ = prefix2map . size ( ) * hash_table_size_multipier + 1 ;
hash_table_ = new uint32_t [ hash_table_size_ ] ;
std : : vector < std : : string > hash2map ( hash_table_size_ ) ;
for ( auto it = tmp_index . begin ( ) ; it ! = tmp_index . end ( ) ; + + it ) {
size_t sub_index_size_needed = 0 ;
for ( auto & p : prefix2map ) {
auto & sub_index = hash2map [ getBucketId ( p . first , key_prefix_len_ ,
hash_table_size_ ) ] ;
if ( sub_index . length ( ) > 0 | | p . second . length ( ) > kOffsetLen ) {
if ( sub_index . length ( ) < = kOffsetLen ) {
sub_index_size_needed + = sub_index . length ( ) + 4 ;
}
sub_index_size_needed + = p . second . length ( ) ;
}
sub_index . append ( p . second ) ;
if ( filter_policy_ ! = nullptr ) {
filter_entries . push_back ( it - > first ) ;
filter_entries . push_back ( p . first ) ;
}
}
int bucket = GetHashTableBucket ( it - > first ) ;
uint64_t * hash_value ;
while ( true ) {
GetHashValue ( bucket , & hash_value ) ;
if ( * hash_value = = file_size_ ) {
sub_index_ . clear ( ) ;
Log ( options_ . info_log , " Reserving %zu bytes for sub index " ,
sub_index_size_needed ) ;
sub_index_ . reserve ( sub_index_size_needed ) ;
for ( int i = 0 ; i < hash_table_size_ ; i + + ) {
uint32_t num_keys_for_bucket = hash2map [ i ] . length ( ) / kOffsetLen ;
switch ( num_keys_for_bucket ) {
case 0 :
// No key for bucket
hash_table_ [ i ] = data_end_offset_ ;
break ;
case 1 :
// point directly to the file offset
hash_table_ [ i ] = DecodeFixed32 ( hash2map [ i ] . data ( ) ) ;
break ;
default :
// point to index block
hash_table_ [ i ] = sub_index_ . length ( ) | kSubIndexMask ;
PutFixed32 ( & sub_index_ , num_keys_for_bucket ) ;
sub_index_ . append ( hash2map [ i ] ) ;
}
bucket = ( bucket + 1 ) % hash_table_size_ ;
}
char * bucket_ptr = GetHashTableBucketPtr ( bucket ) ;
memcpy ( bucket_ptr , it - > first . data ( ) , key_prefix_len_ ) ;
memcpy ( bucket_ptr + key_prefix_len_ , & it - > second , kOffsetLen ) ;
}
if ( filter_policy_ ! = nullptr ) {
filter_str_ . clear ( ) ;
filter_policy_ - > CreateFilter ( & filter_entries [ 0 ] , filter_entries . size ( ) ,
& filter_str_ ) ;
filter_slice_ = Slice ( filter_str_ . data ( ) , filter_str_ . size ( ) ) ;
}
Log ( options_ . info_log , " Number of prefixes: %d, suffix_map length %ld " ,
Log ( options_ . info_log , " hash table size: %d, suffix_map length %zu " ,
hash_table_size_ , sub_index_ . length ( ) ) ;
Log ( options_ . info_log , " Number of Keys per prefix Histogram: %s " ,
keys_per_prefix_hist . ToString ( ) . c_str ( ) ) ;
@ -206,51 +234,33 @@ Status PlainTableReader::PopulateIndex(uint64_t file_size) {
return Status : : OK ( ) ;
}
inline int PlainTableReader : : GetHashTableBucket ( Slice key ) {
return rocksdb : : Hash ( key . data ( ) , key_prefix_len_ , 397 ) % hash_table_size_ ;
}
inline void PlainTableReader : : GetHashValue ( int bucket , uint64_t * * ret_value ) {
* ret_value = ( uint64_t * ) ( GetHashTableBucketPtr ( bucket ) + key_prefix_len_ ) ;
}
Status PlainTableReader : : GetOffset ( const Slice & target , uint64_t * offset ) {
Status s ;
int bucket = GetHashTableBucket ( target ) ;
uint64_t * found_value ;
Slice hash_key ;
while ( true ) {
GetHashValue ( bucket , & found_value ) ;
if ( * found_value = = file_size_ ) {
break ;
}
GetHashKey ( bucket , & hash_key ) ;
if ( target . starts_with ( hash_key ) ) {
break ;
}
bucket = ( bucket + 1 ) % hash_table_size_ ;
}
if ( * found_value = = file_size_ | | ( * found_value & kSubIndexMask ) = = 0 ) {
* offset = * found_value ;
return Status : : OK ( ) ;
uint32_t PlainTableReader : : GetOffset ( const Slice & target ,
bool & prefix_matched ) {
prefix_matched = false ;
int bucket = getBucketId ( target , key_prefix_len_ , hash_table_size_ ) ;
uint32_t bucket_value = hash_table_ [ bucket ] ;
if ( bucket_value = = data_end_offset_ ) {
return data_end_offset_ ;
} else if ( ( bucket_value & kSubIndexMask ) = = 0 ) {
// point directly to the file
return bucket_value ;
}
// point to sub-index, need to do a binary search
uint32_t low = 0 ;
uint64_t prefix_index_offset = * found_value ^ kSubIndexMask ;
uint32_t high = DecodeFixed32 ( sub_index_ . data ( ) + prefix_index_offset ) ;
uint64_t prefix_index_offset = bucket_value ^ kSubIndexMask ;
uint32_t upper_bound = DecodeFixed32 ( sub_index_ . data ( ) + prefix_index_offset ) ;
uint32_t high = upper_bound ;
uint64_t base_offset = prefix_index_offset + 4 ;
char * mid_key_str = new char [ target . size ( ) ] ;
memcpy ( mid_key_str , target . data ( ) , target . size ( ) ) ;
Slice mid_key = Slice ( mid_key_str , target . size ( ) ) ;
Slice mid_key ;
// The key is between ( low, high). Do a binary search between it.
// The key is between [low, high). Do a binary search between it.
while ( high - low > 1 ) {
uint32_t mid = ( high + low ) / 2 ;
const char * base = sub_index_ . data ( ) + base_offset
+ ( user_key_size_ - key_prefix_len_ + kOffsetLen ) * mid ;
memcpy ( mid_key_str + key_prefix_len_ , base ,
user_key_size_ - key_prefix_len _ ) ;
const char * index_offset = sub_index_ . data ( ) + base_offset
+ kOffsetLen * mid ;
uint32_t file_offset = DecodeFixed32 ( index_offset ) ;
mid_key = Slice ( file_data_ . data ( ) + file_offset , user_key_size_ ) ;
int cmp_result = options_ . comparator - > Compare ( target , mid_key ) ;
if ( cmp_result > 0 ) {
@ -259,21 +269,32 @@ Status PlainTableReader::GetOffset(const Slice& target, uint64_t* offset) {
if ( cmp_result = = 0 ) {
// Happen to have found the exact key or target is smaller than the
// first key after base_offset.
* offset = DecodeFixed64 ( base + user_key_size_ - key_prefix_len_ ) ;
delete [ ] mid_key_str ;
return s ;
prefix_matched = true ;
return file_offset ;
} else {
high = mid ;
}
}
}
const char * base = sub_index_ . data ( ) + base_offset
+ ( user_key_size_ - key_prefix_len_ + kOffsetLen ) * low ;
* offset = DecodeFixed64 ( base + user_key_size_ - key_prefix_len_ ) ;
delete [ ] mid_key_str ;
return s ;
// The key is between low and low+1 (if exists). Both of them can have the
// correct prefix. Need to rule out at least one, to avoid to miss the
// correct one.
uint32_t low_key_offset = DecodeFixed32 (
sub_index_ . data ( ) + base_offset + kOffsetLen * low ) ;
if ( low + 1 < upper_bound ) {
if ( Slice ( file_data_ . data ( ) + low_key_offset , key_prefix_len_ )
= = Slice ( target . data ( ) , key_prefix_len_ ) ) {
prefix_matched = true ;
} else {
prefix_matched = false ;
return DecodeFixed32 (
sub_index_ . data ( ) + base_offset + kOffsetLen * ( low + 1 ) ) ;
}
} else {
prefix_matched = false ;
}
return low_key_offset ;
}
bool PlainTableReader : : MayHavePrefix ( const Slice & target_prefix ) {
@ -282,46 +303,74 @@ bool PlainTableReader::MayHavePrefix(const Slice& target_prefix) {
}
uint64_t PlainTableReader : : Next ( uint64_t offset , Slice * key , Slice * value ,
Slice * tmp_slice ) {
if ( offset > = file_size_ ) {
return file_size_ ;
Status PlainTableReader : : Next ( uint32_t offset , Slice * key , Slice * value ,
uint32_t & next_offset ) {
if ( offset = = data_end_offset_ ) {
next_offset = data_end_offset_ ;
return Status : : OK ( ) ;
}
int internal_key_size = GetInternalKeyLength ( ) ;
Status s = file_ - > Read ( offset , internal_key_size , key , nullptr ) ;
offset + = internal_key_size ;
if ( offset > data_end_offset_ ) {
return Status : : Corruption ( " Offset is out of file size " ) ;
}
s = file_ - > Read ( offset , 4 , tmp_slice , nullptr ) ;
offset + = 4 ;
uint32_t value_size = DecodeFixed32 ( tmp_slice - > data ( ) ) ;
int internal_key_size = GetInternalKeyLength ( ) ;
if ( offset + internal_key_size > = data_end_offset_ ) {
return Status : : Corruption ( " Un able to read the next key " ) ;
}
s = file_ - > Read ( offset , value_size , value , nullptr ) ;
offset + = value_size ;
const char * key_ptr = file_data_ . data ( ) + offset ;
* key = Slice ( key_ptr , internal_key_size ) ;
return offset ;
uint32_t value_size ;
const char * value_ptr = GetVarint32Ptr ( key_ptr + internal_key_size ,
file_data_ . data ( ) + data_end_offset_ ,
& value_size ) ;
if ( value_ptr = = nullptr ) {
return Status : : Corruption ( " Error reading value length. " ) ;
}
next_offset = offset + ( value_ptr - key_ptr ) + value_size ;
if ( next_offset > data_end_offset_ ) {
return Status : : Corruption ( " Reach end of file when reading value " ) ;
}
* value = Slice ( value_ptr , value_size ) ;
return Status : : OK ( ) ;
}
Status PlainTableReader : : Get (
const ReadOptions & ro , const Slice & target , void * arg ,
bool ( * saver ) ( void * , const Slice & , const Slice & , bool ) ,
void ( * mark_key_may_exist ) ( void * ) ) {
uint64_t offset ;
Status s = GetOffset ( target , & offset ) ;
if ( ! s . ok ( ) ) {
return s ;
// Check bloom filter first.
if ( ! MayHavePrefix ( Slice ( target . data ( ) , key_prefix_len_ ) ) ) {
return Status : : OK ( ) ;
}
uint32_t offset ;
bool prefix_match ;
offset = GetOffset ( target , prefix_match ) ;
Slice found_key ;
Slice found_value ;
Slice tmp_slice ;
while ( offset < file_size_ ) {
offset = Next ( offset , & found_key , & found_value , & tmp_slice ) ;
while ( offset < data_end_offset_ ) {
Status s = Next ( offset , & found_key , & found_value , offset ) ;
if ( ! s . ok ( ) ) {
return s ;
}
if ( ! prefix_match ) {
// Need to verify prefix for the first key found if it is not yet
// checked.
if ( ! target . starts_with ( Slice ( found_key . data ( ) , key_prefix_len_ ) ) ) {
break ;
}
prefix_match = true ;
}
if ( options_ . comparator - > Compare ( found_key , target ) > = 0
& & ! ( * saver ) ( arg , found_key , found_value , true ) ) {
break ;
}
}
return s ;
return Statu s: : OK ( ) ;
}
bool PlainTableReader : : TEST_KeyInCache ( const ReadOptions & options ,
@ -342,11 +391,12 @@ PlainTableIterator::~PlainTableIterator() {
}
bool PlainTableIterator : : Valid ( ) const {
return offset_ < table_ - > file_size_ & & offset_ > = 0 ;
return offset_ < table_ - > data_end_offset_
& & offset_ > = table_ - > data_start_offset_ ;
}
void PlainTableIterator : : SeekToFirst ( ) {
next_offset_ = 0 ;
next_offset_ = table_ - > data_start_offset_ ;
Next ( ) ;
}
@ -356,26 +406,35 @@ void PlainTableIterator::SeekToLast() {
void PlainTableIterator : : Seek ( const Slice & target ) {
if ( ! table_ - > MayHavePrefix ( Slice ( target . data ( ) , table_ - > key_prefix_len_ ) ) ) {
offset_ = next_offset_ = table_ - > file_size _;
offset_ = next_offset_ = table_ - > data_end_offset _;
return ;
}
bool prefix_match ;
next_offset_ = table_ - > GetOffset ( target , prefix_match ) ;
Status s = table_ - > GetOffset ( target , & next_offset_ ) ;
if ( ! s . ok ( ) ) {
status_ = s ;
if ( next_offset_ < table_ - > data_end_offset_ ) {
for ( Next ( ) ; status_ . ok ( ) & & Valid ( ) ; Next ( ) ) {
if ( ! prefix_match ) {
// Need to verify the first key's prefix
if ( ! target . starts_with ( Slice ( key ( ) . data ( ) , table_ - > key_prefix_len_ ) ) ) {
offset_ = next_offset_ = table_ - > data_end_offset_ ;
break ;
}
if ( next_offset_ < table_ - > file_size_ ) {
for ( Next ( ) ;
Valid ( ) & & table_ - > options_ . comparator - > Compare ( key ( ) , target ) < 0 ;
Next ( ) ) {
prefix_match = true ;
}
if ( table_ - > options_ . comparator - > Compare ( key ( ) , target ) > = 0 ) {
break ;
}
}
} else {
offset_ = table_ - > data_end_offset_ ;
}
}
void PlainTableIterator : : Next ( ) {
offset_ = next_offset_ ;
Slice tmp_slice ;
next_offset _ = table_ - > Next ( next_offset_ , & key_ , & value_ , & tmp_slice ) ;
status _ = table_ - > Next ( next_offset_ , & key_ , & value_ , next_offset_ ) ;
}
void PlainTableIterator : : Prev ( ) {