@ -44,6 +44,7 @@
# pragma once
# include <assert.h>
# include <stdlib.h>
# include <algorithm>
# include <atomic>
# include "port/port.h"
# include "util/allocator.h"
@ -53,6 +54,9 @@ namespace rocksdb {
template < class Comparator >
class InlineSkipList {
public :
struct InsertHint ;
private :
struct Node ;
@ -77,6 +81,19 @@ class InlineSkipList {
// REQUIRES: no concurrent calls to INSERT
void Insert ( const char * key ) ;
// Inserts a key allocated by AllocateKey with a hint. It can be used to
// optimize sequential inserts, or inserting a key close to the largest
// key inserted previously with the same hint.
//
// If hint points to nullptr, a new hint will be populated, which can be
// used in subsequent calls.
//
// REQUIRES: All keys inserted with the same hint must be consecutive in the
// skip-list, i.e. let [k1..k2] be the range of keys inserted with hint h,
// there shouldn't be a key k in the skip-list with k1 < k < k2, unless k is
// also inserted with the same hint.
void InsertWithHint ( const char * key , InsertHint * * hint ) ;
// Like Insert, but external synchronization is not required.
void InsertConcurrently ( const char * key ) ;
@ -86,6 +103,9 @@ class InlineSkipList {
// Return estimated number of entries smaller than `key`.
uint64_t EstimateCount ( const char * key ) const ;
// Validate correctness of the skip-list.
void TEST_Validate ( ) const ;
// Iteration over the contents of a skip list
class Iterator {
public :
@ -134,7 +154,7 @@ class InlineSkipList {
} ;
private :
enum MaxPossibleHeightEnum : uint16_t { kMaxPossibleHeight = 32 } ;
static const uint16_t kMaxPossibleHeight = 32 ;
const uint16_t kMaxHeight_ ;
const uint16_t kBranching_ ;
@ -156,7 +176,7 @@ class InlineSkipList {
// prev_height_ is the height of prev_[0]. prev_[0] can only be equal
// to head when max_height_ and prev_height_ are both 1.
Node * * prev_ ;
std : : atomic < int32 _t> prev_height_ ;
std : : atomic < uint16 _t> prev_height_ ;
inline int GetMaxHeight ( ) const {
return max_height_ . load ( std : : memory_order_relaxed ) ;
@ -166,6 +186,13 @@ class InlineSkipList {
Node * AllocateNode ( size_t key_size , int height ) ;
// Allocate a hint used by InsertWithHint().
InsertHint * AllocateInsertHint ( ) ;
// Extract the node from a key allocated by AllocateKey(), and populate
// height of the node.
Node * GetNodeForInsert ( const char * key , int * height ) ;
bool Equal ( const char * a , const char * b ) const {
return ( compare_ ( a , b ) = = 0 ) ;
}
@ -188,6 +215,13 @@ class InlineSkipList {
// level in [0..max_height_-1], if prev is non-null.
Node * FindLessThan ( const char * key , Node * * prev = nullptr ) const ;
// Return the latest node with a key < key on bottom_level. Start searching
// from root node on the level below top_level.
// Fills prev[level] with pointer to previous node at "level" for every
// level in [bottom_level..top_level-1], if prev is non-null.
Node * FindLessThan ( const char * key , Node * * prev , Node * root , int top_level ,
int bottom_level ) const ;
// Return the last node in the list.
// Return head_ if list is empty.
Node * FindLast ( ) const ;
@ -201,6 +235,10 @@ class InlineSkipList {
void FindLevelSplice ( const char * key , Node * before , Node * after , int level ,
Node * * out_prev , Node * * out_next ) ;
// Check if we need to invalidate prev_ cache after inserting a node of
// given height.
void MaybeInvalidatePrev ( int height ) ;
// No copying allowed
InlineSkipList ( const InlineSkipList & ) ;
InlineSkipList & operator = ( const InlineSkipList & ) ;
@ -265,12 +303,31 @@ struct InlineSkipList<Comparator>::Node {
next_ [ - n ] . store ( x , std : : memory_order_relaxed ) ;
}
// Insert node after prev on specific level.
void InsertAfter ( Node * prev , int level ) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "this" in prev.
NoBarrier_SetNext ( level , prev - > NoBarrier_Next ( level ) ) ;
prev - > SetNext ( level , this ) ;
}
private :
// next_[0] is the lowest level link (level 0). Higher levels are
// stored _earlier_, so level 1 is at next_[-1].
std : : atomic < Node * > next_ [ 1 ] ;
} ;
//
//
// Hint to insert position to speed-up inserts. See implementation of
// InsertWithHint() for more details.
template < class Comparator >
struct InlineSkipList < Comparator > : : InsertHint {
Node * * prev ;
uint8_t * prev_height ;
int num_levels ;
} ;
template < class Comparator >
inline InlineSkipList < Comparator > : : Iterator : : Iterator (
const InlineSkipList * list ) {
@ -401,8 +458,17 @@ InlineSkipList<Comparator>::FindGreaterOrEqual(const char* key) const {
template < class Comparator >
typename InlineSkipList < Comparator > : : Node *
InlineSkipList < Comparator > : : FindLessThan ( const char * key , Node * * prev ) const {
Node * x = head_ ;
int level = GetMaxHeight ( ) - 1 ;
return FindLessThan ( key , prev , head_ , GetMaxHeight ( ) , 0 ) ;
}
template < class Comparator >
typename InlineSkipList < Comparator > : : Node *
InlineSkipList < Comparator > : : FindLessThan ( const char * key , Node * * prev ,
Node * root , int top_level ,
int bottom_level ) const {
assert ( top_level > bottom_level ) ;
int level = top_level - 1 ;
Node * x = root ;
// KeyIsAfter(key, last_not_after) is definitely false
Node * last_not_after = nullptr ;
while ( true ) {
@ -416,10 +482,10 @@ InlineSkipList<Comparator>::FindLessThan(const char* key, Node** prev) const {
if ( prev ! = nullptr ) {
prev [ level ] = x ;
}
if ( level = = 0 ) {
if ( level = = bottom_level ) {
return x ;
} else {
// Switch to next list, reuse KeyIU sAfterNode() result
// Switch to next list, reuse KeyIsAfterNode() result
last_not_after = next ;
level - - ;
}
@ -528,6 +594,63 @@ InlineSkipList<Comparator>::AllocateNode(size_t key_size, int height) {
return x ;
}
template < class Comparator >
typename InlineSkipList < Comparator > : : InsertHint *
InlineSkipList < Comparator > : : AllocateInsertHint ( ) {
InsertHint * hint = reinterpret_cast < InsertHint * > (
allocator_ - > AllocateAligned ( sizeof ( InsertHint ) ) ) ;
// Allocate an extra level on kMaxHeight_, to make boundary cases easier to
// handle.
hint - > prev = reinterpret_cast < Node * * > (
allocator_ - > AllocateAligned ( sizeof ( Node * ) * ( kMaxHeight_ + 1 ) ) ) ;
hint - > prev_height = reinterpret_cast < uint8_t * > (
allocator_ - > AllocateAligned ( sizeof ( uint8_t * ) * kMaxHeight_ ) ) ;
for ( int i = 0 ; i < = kMaxHeight_ ; i + + ) {
hint - > prev [ i ] = head_ ;
}
hint - > num_levels = 0 ;
return hint ;
}
template < class Comparator >
typename InlineSkipList < Comparator > : : Node *
InlineSkipList < Comparator > : : GetNodeForInsert ( const char * key , int * height ) {
// Find the Node that we placed before the key in AllocateKey
Node * x = reinterpret_cast < Node * > ( const_cast < char * > ( key ) ) - 1 ;
assert ( height ! = nullptr ) ;
* height = x - > UnstashHeight ( ) ;
assert ( * height > = 1 & & * height < = kMaxHeight_ ) ;
if ( * height > GetMaxHeight ( ) ) {
// It is ok to mutate max_height_ without any synchronization
// with concurrent readers. A concurrent reader that observes
// the new value of max_height_ will see either the old value of
// new level pointers from head_ (nullptr), or a new value set in
// the loop below. In the former case the reader will
// immediately drop to the next level since nullptr sorts after all
// keys. In the latter case the reader will use the new node.
max_height_ . store ( * height , std : : memory_order_relaxed ) ;
}
return x ;
}
template < class Comparator >
void InlineSkipList < Comparator > : : MaybeInvalidatePrev ( int height ) {
// We don't have a lock-free algorithm for updating prev_, but we do have
// the option of invalidating the entire sequential-insertion cache.
// prev_'s invariant is that prev_[i] (i > 0) is the predecessor of
// prev_[0] at that level. We're only going to violate that if height
// > 1 and key lands after prev_[height - 1] but before prev_[0].
// Comparisons are pretty expensive, so an easier version is to just
// clear the cache if height > 1. We only write to prev_height_ if the
// nobody else has, to avoid invalidating the root of the skip list in
// all of the other CPU caches.
if ( height > 1 & & prev_height_ . load ( std : : memory_order_relaxed ) ! = 0 ) {
prev_height_ . store ( 0 , std : : memory_order_relaxed ) ;
}
}
template < class Comparator >
void InlineSkipList < Comparator > : : Insert ( const char * key ) {
// InsertConcurrently often can't maintain the prev_ invariants, so
@ -558,36 +681,135 @@ void InlineSkipList<Comparator>::Insert(const char* key) {
// Our data structure does not allow duplicate insertion
assert ( prev_ [ 0 ] - > Next ( 0 ) = = nullptr | | ! Equal ( key , prev_ [ 0 ] - > Next ( 0 ) - > Key ( ) ) ) ;
// Find the Node that we placed before the key in AllocateKey
Node * x = reinterpret_cast < Node * > ( const_cast < char * > ( key ) ) - 1 ;
int height = x - > UnstashHeight ( ) ;
assert ( height > = 1 & & height < = kMaxHeight_ ) ;
if ( height > GetMaxHeight ( ) ) {
for ( int i = GetMaxHeight ( ) ; i < height ; i + + ) {
prev_ [ i ] = head_ ;
}
// It is ok to mutate max_height_ without any synchronization
// with concurrent readers. A concurrent reader that observes
// the new value of max_height_ will see either the old value of
// new level pointers from head_ (nullptr), or a new value set in
// the loop below. In the former case the reader will
// immediately drop to the next level since nullptr sorts after all
// keys. In the latter case the reader will use the new node.
max_height_ . store ( height , std : : memory_order_relaxed ) ;
}
int height = 0 ;
Node * x = GetNodeForInsert ( key , & height ) ;
for ( int i = 0 ; i < height ; i + + ) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
x - > NoBarrier_SetNext ( i , prev_ [ i ] - > NoBarrier_Next ( i ) ) ;
prev_ [ i ] - > SetNext ( i , x ) ;
x - > InsertAfter ( prev_ [ i ] , i ) ;
}
prev_ [ 0 ] = x ;
prev_height_ . store ( height , std : : memory_order_relaxed ) ;
}
// The goal here is to reduce the number of key comparisons, as it can be
// expensive. We maintain a hint which help us to find a insert position
// between or next to previously inserted keys with the same hint.
// Note that we require all keys inserted with the same hint are consecutive
// in the skip-list.
//
// The hint keeps a list of nodes previous inserted with the same hint:
// * The first level, prev[0], points to the largest key of them.
// * For 0 < i < num_levels, prev[i] is the previous node of prev[i-1]
// on level i, i.e.
// prev[i] < prev[i-1] <= prev[i]->Next(i)
// (prev[i-1] and prev[i]->Next(i) could be the same node.)
// In addition prev_height keeps the height of prev[i].
//
// When inserting a new key, we look for the lowest level L where
// prev[L] < key < prev[L-1]. Let
// M = max(prev_height[i]..prev_height[num_levels-1])
// For each level between in [L, M), the previous node of
// the new key must be one of prev[i]. For levels below L and above M
// we do normal skip-list search if needed.
//
// The optimization is suitable for stream of keys where new inserts are next
// to or close to the largest key ever inserted, e.g. sequential inserts.
template < class Comparator >
void InlineSkipList < Comparator > : : InsertWithHint ( const char * key ,
InsertHint * * hint_ptr ) {
int height = 0 ;
Node * x = GetNodeForInsert ( key , & height ) ;
// InsertWithHint() is not compatible with prev_ optimization used by
// Insert().
MaybeInvalidatePrev ( height ) ;
assert ( hint_ptr ! = nullptr ) ;
InsertHint * hint = * hint_ptr ;
if ( hint = = nullptr ) {
// AllocateInsertHint will initialize hint with num_levels = 0 and
// prev[i] = head_ for all i.
hint = AllocateInsertHint ( ) ;
* hint_ptr = hint ;
}
// Look for the first level i < num_levels with prev[i] < key.
int level = 0 ;
for ( ; level < hint - > num_levels ; level + + ) {
if ( KeyIsAfterNode ( key , hint - > prev [ level ] ) ) {
assert ( ! KeyIsAfterNode ( key , hint - > prev [ level ] - > Next ( level ) ) ) ;
break ;
}
}
Node * tmp_prev [ kMaxPossibleHeight ] ;
if ( level > = hint - > num_levels ) {
// The hint is not useful in this case. Fallback to full search.
FindLessThan ( key , tmp_prev ) ;
for ( int i = 0 ; i < height ; i + + ) {
assert ( tmp_prev [ i ] = = head_ | | KeyIsAfterNode ( key , tmp_prev [ i ] ) ) ;
assert ( ! KeyIsAfterNode ( key , tmp_prev [ i ] - > Next ( i ) ) ) ;
x - > InsertAfter ( tmp_prev [ i ] , i ) ;
}
} else {
// Search on levels below "level", using prev[level] as root.
if ( level > 0 ) {
FindLessThan ( key , tmp_prev , hint - > prev [ level ] , level , 0 ) ;
for ( int i = 0 ; i < level & & i < height ; i + + ) {
assert ( tmp_prev [ i ] = = head_ | | KeyIsAfterNode ( key , tmp_prev [ i ] ) ) ;
assert ( ! KeyIsAfterNode ( key , tmp_prev [ i ] - > Next ( i ) ) ) ;
x - > InsertAfter ( tmp_prev [ i ] , i ) ;
}
}
// The current level where the new node is to insert into skip-list.
int current_level = level ;
for ( int i = level ; i < hint - > num_levels ; i + + ) {
while ( current_level < height & & current_level < hint - > prev_height [ i ] ) {
// In this case, prev[i] is the previous node of key on current_level,
// since:
// * prev[i] < key;
// * no other nodes less than prev[level-1] has height greater than
// current_level, and prev[level-1] > key.
assert ( KeyIsAfterNode ( key , hint - > prev [ i ] ) ) ;
assert ( ! KeyIsAfterNode ( key , hint - > prev [ i ] - > Next ( current_level ) ) ) ;
x - > InsertAfter ( hint - > prev [ i ] , current_level ) ;
current_level + + ;
}
}
// Full search on levels above current_level if needed.
if ( current_level < height ) {
FindLessThan ( key , tmp_prev , head_ , GetMaxHeight ( ) , current_level ) ;
for ( int i = current_level ; i < height ; i + + ) {
assert ( tmp_prev [ i ] = = head_ | | KeyIsAfterNode ( key , tmp_prev [ i ] ) ) ;
assert ( ! KeyIsAfterNode ( key , tmp_prev [ i ] - > Next ( i ) ) ) ;
x - > InsertAfter ( tmp_prev [ i ] , i ) ;
}
}
}
// The last step is update the new node into the hint.
// * If "height" <= "level", prev[level] is still the previous node of
// prev[level-1] on level "level". Stop.
// * Otherwise, the new node becomes the new previous node of
// prev[level-1], or if level=0, the new node becomes the largest node
// inserted with the same hint. Replace prev[level] with the new node.
// * If prev[i] is replaced by another node, check if it can replace
// prev[i+1] using a similar rule, up till "num_levels" level.
Node * p = x ;
uint8_t h = static_cast < uint8_t > ( height ) ;
for ( int i = level ; i < hint - > num_levels ; i + + ) {
if ( h < = i ) {
p = nullptr ;
break ;
}
std : : swap ( p , hint - > prev [ i ] ) ;
std : : swap ( h , hint - > prev_height [ i ] ) ;
}
if ( p ! = nullptr & & h > hint - > num_levels ) {
hint - > prev [ hint - > num_levels ] = p ;
hint - > prev_height [ hint - > num_levels ] = h ;
hint - > num_levels + + ;
}
}
template < class Comparator >
void InlineSkipList < Comparator > : : FindLevelSplice ( const char * key , Node * before ,
Node * after , int level ,
@ -613,19 +835,7 @@ void InlineSkipList<Comparator>::InsertConcurrently(const char* key) {
Node * x = reinterpret_cast < Node * > ( const_cast < char * > ( key ) ) - 1 ;
int height = x - > UnstashHeight ( ) ;
assert ( height > = 1 & & height < = kMaxHeight_ ) ;
// We don't have a lock-free algorithm for updating prev_, but we do have
// the option of invalidating the entire sequential-insertion cache.
// prev_'s invariant is that prev_[i] (i > 0) is the predecessor of
// prev_[0] at that level. We're only going to violate that if height
// > 1 and key lands after prev_[height - 1] but before prev_[0].
// Comparisons are pretty expensive, so an easier version is to just
// clear the cache if height > 1. We only write to prev_height_ if the
// nobody else has, to avoid invalidating the root of the skip list in
// all of the other CPU caches.
if ( height > 1 & & prev_height_ . load ( std : : memory_order_relaxed ) ! = 0 ) {
prev_height_ . store ( 0 , std : : memory_order_relaxed ) ;
}
MaybeInvalidatePrev ( height ) ;
int max_height = max_height_ . load ( std : : memory_order_relaxed ) ;
while ( height > max_height ) {
@ -673,4 +883,44 @@ bool InlineSkipList<Comparator>::Contains(const char* key) const {
}
}
template < class Comparator >
void InlineSkipList < Comparator > : : TEST_Validate ( ) const {
// Interate over all levels at the same time, and verify nodes appear in
// the right order, and nodes appear in upper level also appear in lower
// levels.
Node * nodes [ kMaxPossibleHeight ] ;
int max_height = GetMaxHeight ( ) ;
for ( int i = 0 ; i < max_height ; i + + ) {
nodes [ i ] = head_ ;
}
while ( nodes [ 0 ] ! = nullptr ) {
Node * l0_next = nodes [ 0 ] - > Next ( 0 ) ;
if ( l0_next = = nullptr ) {
break ;
}
assert ( nodes [ 0 ] = = head_ | | compare_ ( nodes [ 0 ] - > Key ( ) , l0_next - > Key ( ) ) < 0 ) ;
nodes [ 0 ] = l0_next ;
int i = 1 ;
while ( i < max_height ) {
Node * next = nodes [ i ] - > Next ( i ) ;
if ( next = = nullptr ) {
break ;
}
auto cmp = compare_ ( nodes [ 0 ] - > Key ( ) , next - > Key ( ) ) ;
assert ( cmp < = 0 ) ;
if ( cmp = = 0 ) {
assert ( next = = nodes [ 0 ] ) ;
nodes [ i ] = next ;
} else {
break ;
}
i + + ;
}
}
for ( int i = 1 ; i < max_height ; i + + ) {
assert ( nodes [ i ] - > Next ( i ) = = nullptr ) ;
}
}
} // namespace rocksdb