@ -54,13 +54,13 @@ namespace rocksdb {
template < class Comparator >
class InlineSkipList {
public :
struct InsertHint ;
private :
struct Node ;
struct Splice ;
public :
static const uint16_t kMaxPossibleHeight = 32 ;
// Create a new InlineSkipList object that will use "cmp" for comparing
// keys, and will allocate memory using "*allocator". Objects allocated
// in the allocator must remain allocated for the lifetime of the
@ -74,29 +74,49 @@ class InlineSkipList {
// is thread-safe.
char * AllocateKey ( size_t key_size ) ;
// Allocate a splice using allocator.
Splice * AllocateSplice ( ) ;
// Inserts a key allocated by AllocateKey, after the actual key value
// has been filled in.
//
// REQUIRES: nothing that compares equal to key is currently in the list.
// REQUIRES: no concurrent calls to INSERT
// REQUIRES: no concurrent calls to any of inserts.
void Insert ( const char * key ) ;
// Inserts a key allocated by AllocateKey with a hint. It can be used to
// optimize sequential inserts, or inserting a key close to the largest
// key inserted previously with the same hint .
// Inserts a key allocated by AllocateKey with a hint of last insert
// position in the skip-list. If hint points to nullptr, a new hint will be
// populated, which can be used in subsequent calls .
//
// If hint points to nullptr, a new hint will be populated, which can be
// used in subsequent calls.
// It can be used to optimize the workload where there are multiple groups
// of keys, and each key is likely to insert to a location close to the last
// inserted key in the same group. One example is sequential inserts.
//
// REQUIRES: All keys inserted with the same hint must be consecutive in the
// skip-list, i.e. let [k1..k2] be the range of keys inserted with hint h,
// there shouldn't be a key k in the skip-list with k1 < k < k2, unless k is
// also inserted with the same hint.
void InsertWithHint ( const char * key , InsertHint * * hint ) ;
// REQUIRES: nothing that compares equal to key is currently in the list.
// REQUIRES: no concurrent calls to any of inserts.
void InsertWithHint ( const char * key , void * * hint ) ;
// Like Insert, but external synchronization is not required.
void InsertConcurrently ( const char * key ) ;
// Inserts a node into the skip list. key must have been allocated by
// AllocateKey and then filled in by the caller. If UseCAS is true,
// then external synchronization is not required, otherwise this method
// may not be called concurrently with any other insertions.
//
// Regardless of whether UseCAS is true, the splice must be owned
// exclusively by the current thread. If allow_partial_splice_fix is
// true, then the cost of insertion is amortized O(log D), where D is
// the distance from the splice to the inserted key (measured as the
// number of intervening nodes). Note that this bound is very good for
// sequential insertions! If allow_partial_splice_fix is false then
// the existing splice will be ignored unless the current key is being
// inserted immediately after the splice. allow_partial_splice_fix ==
// false has worse running time for the non-sequential case O(log N),
// but a better constant factor.
template < bool UseCAS >
void Insert ( const char * key , Splice * splice , bool allow_partial_splice_fix ) ;
// Returns true iff an entry that compares equal to key is in the list.
bool Contains ( const char * key ) const ;
@ -154,8 +174,6 @@ class InlineSkipList {
} ;
private :
static const uint16_t kMaxPossibleHeight = 32 ;
const uint16_t kMaxHeight_ ;
const uint16_t kBranching_ ;
const uint32_t kScaledInverseBranching_ ;
@ -170,13 +188,10 @@ class InlineSkipList {
// values are ok.
std : : atomic < int > max_height_ ; // Height of the entire list
// Used for optimizing sequential insert patterns. Tricky. prev_height_
// of zero means prev_ is undefined. Otherwise: prev_[i] for i up
// to max_height_ - 1 (inclusive) is the predecessor of prev_[0], and
// prev_height_ is the height of prev_[0]. prev_[0] can only be equal
// to head when max_height_ and prev_height_ are both 1.
Node * * prev_ ;
std : : atomic < uint16_t > prev_height_ ;
// seq_splice_ is a Splice used for insertions in the non-concurrent
// case. It caches the prev and next found during the most recent
// non-concurrent insertion.
Splice * seq_splice_ ;
inline int GetMaxHeight ( ) const {
return max_height_ . load ( std : : memory_order_relaxed ) ;
@ -186,13 +201,6 @@ class InlineSkipList {
Node * AllocateNode ( size_t key_size , int height ) ;
// Allocate a hint used by InsertWithHint().
InsertHint * AllocateInsertHint ( ) ;
// Extract the node from a key allocated by AllocateKey(), and populate
// height of the node.
Node * GetNodeForInsert ( const char * key , int * height ) ;
bool Equal ( const char * a , const char * b ) const {
return ( compare_ ( a , b ) = = 0 ) ;
}
@ -202,7 +210,7 @@ class InlineSkipList {
}
// Return true if key is greater than the data stored in "n". Null n
// is considered infinite.
// is considered infinite. n should not be head_.
bool KeyIsAfterNode ( const char * key , Node * n ) const ;
// Returns the earliest node with a key >= key.
@ -232,12 +240,13 @@ class InlineSkipList {
// point to a node that is before the key, and after should point to
// a node that is after the key. after should be nullptr if a good after
// node isn't conveniently available.
void FindLevel Splice ( const char * key , Node * before , Node * after , int level ,
void FindSpliceForLevel ( const char * key , Node * before , Node * after , int level ,
Node * * out_prev , Node * * out_next ) ;
// Check if we need to invalidate prev_ cache after inserting a node of
// given height.
void MaybeInvalidatePrev ( int height ) ;
// Recomputes Splice levels from highest_level (inclusive) down to
// lowest_level (inclusive).
void RecomputeSpliceLevels ( const char * key , Splice * splice ,
int recompute_level ) ;
// No copying allowed
InlineSkipList ( const InlineSkipList & ) ;
@ -246,6 +255,19 @@ class InlineSkipList {
// Implementation details follow
template < class Comparator >
struct InlineSkipList < Comparator > : : Splice {
// The invariant of a Splice is that prev_[i+1].key <= prev_[i].key <
// next_[i].key <= next_[i+1].key for all i. That means that if a
// key is bracketed by prev_[i] and next_[i] then it is bracketed by
// all higher levels. It is _not_ required that prev_[i]->Next(i) ==
// next_[i] (it probably did at some point in the past, but intervening
// or concurrent operations might have inserted nodes in between).
int height_ = 0 ;
Node * * prev_ ;
Node * * next_ ;
} ;
// The Node data type is more of a pointer into custom-managed memory than
// a traditional C++ struct. The key is stored in the bytes immediately
// after the struct, and the next_ pointers for nodes with height > 1 are
@ -317,17 +339,6 @@ struct InlineSkipList<Comparator>::Node {
std : : atomic < Node * > next_ [ 1 ] ;
} ;
//
//
// Hint to insert position to speed-up inserts. See implementation of
// InsertWithHint() for more details.
template < class Comparator >
struct InlineSkipList < Comparator > : : InsertHint {
Node * * prev ;
uint8_t * prev_height ;
int num_levels ;
} ;
template < class Comparator >
inline InlineSkipList < Comparator > : : Iterator : : Iterator (
const InlineSkipList * list ) {
@ -419,6 +430,7 @@ template <class Comparator>
bool InlineSkipList < Comparator > : : KeyIsAfterNode ( const char * key ,
Node * n ) const {
// nullptr n is considered infinite
assert ( n ! = head_ ) ;
return ( n ! = nullptr ) & & ( compare_ ( n - > Key ( ) , key ) < 0 ) ;
}
@ -549,19 +561,14 @@ InlineSkipList<Comparator>::InlineSkipList(const Comparator cmp,
allocator_ ( allocator ) ,
head_ ( AllocateNode ( 0 , max_height ) ) ,
max_height_ ( 1 ) ,
prev_height_ ( 1 ) {
seq_splice_ ( AllocateSplice ( ) ) {
assert ( max_height > 0 & & kMaxHeight_ = = static_cast < uint32_t > ( max_height ) ) ;
assert ( branching_factor > 1 & &
kBranching_ = = static_cast < uint32_t > ( branching_factor ) ) ;
assert ( kScaledInverseBranching_ > 0 ) ;
// Allocate the prev_ Node* array, directly from the passed-in allocator.
// prev_ does not need to be freed, as its life cycle is tied up with
// the allocator as a whole.
prev_ = reinterpret_cast < Node * * > (
allocator_ - > AllocateAligned ( sizeof ( Node * ) * kMaxHeight_ ) ) ;
for ( int i = 0 ; i < kMaxHeight_ ; i + + ) {
for ( int i = 0 ; i < kMaxHeight_ ; + + i ) {
head_ - > SetNext ( i , nullptr ) ;
prev_ [ i ] = head_ ;
}
}
@ -595,225 +602,48 @@ InlineSkipList<Comparator>::AllocateNode(size_t key_size, int height) {
}
template < class Comparator >
typename InlineSkipList < Comparator > : : InsertHint *
InlineSkipList < Comparator > : : AllocateInsertHint ( ) {
InsertHint * hint = reinterpret_cast < InsertHint * > (
allocator_ - > AllocateAligned ( sizeof ( InsertHint ) ) ) ;
// Allocate an extra level on kMaxHeight_, to make boundary cases easier to
// handle.
hint - > prev = reinterpret_cast < Node * * > (
allocator_ - > AllocateAligned ( sizeof ( Node * ) * ( kMaxHeight_ + 1 ) ) ) ;
hint - > prev_height = reinterpret_cast < uint8_t * > (
allocator_ - > AllocateAligned ( sizeof ( uint8_t * ) * kMaxHeight_ ) ) ;
for ( int i = 0 ; i < = kMaxHeight_ ; i + + ) {
hint - > prev [ i ] = head_ ;
}
hint - > num_levels = 0 ;
return hint ;
typename InlineSkipList < Comparator > : : Splice *
InlineSkipList < Comparator > : : AllocateSplice ( ) {
// size of prev_ and next_
size_t array_size = sizeof ( Node * ) * ( kMaxHeight_ + 1 ) ;
char * raw = allocator_ - > AllocateAligned ( sizeof ( Splice ) + array_size * 2 ) ;
Splice * splice = reinterpret_cast < Splice * > ( raw ) ;
splice - > height_ = 0 ;
splice - > prev_ = reinterpret_cast < Node * * > ( raw + sizeof ( Splice ) ) ;
splice - > next_ = reinterpret_cast < Node * * > ( raw + sizeof ( Splice ) + array_size ) ;
return splice ;
}
template < class Comparator >
typename InlineSkipList < Comparator > : : Node *
InlineSkipList < Comparator > : : GetNodeForInsert ( const char * key , int * height ) {
// Find the Node that we placed before the key in AllocateKey
Node * x = reinterpret_cast < Node * > ( const_cast < char * > ( key ) ) - 1 ;
assert ( height ! = nullptr ) ;
* height = x - > UnstashHeight ( ) ;
assert ( * height > = 1 & & * height < = kMaxHeight_ ) ;
if ( * height > GetMaxHeight ( ) ) {
// It is ok to mutate max_height_ without any synchronization
// with concurrent readers. A concurrent reader that observes
// the new value of max_height_ will see either the old value of
// new level pointers from head_ (nullptr), or a new value set in
// the loop below. In the former case the reader will
// immediately drop to the next level since nullptr sorts after all
// keys. In the latter case the reader will use the new node.
max_height_ . store ( * height , std : : memory_order_relaxed ) ;
}
return x ;
void InlineSkipList < Comparator > : : Insert ( const char * key ) {
Insert < false > ( key , seq_splice_ , false ) ;
}
template < class Comparator >
void InlineSkipList < Comparator > : : MaybeInvalidatePrev ( int height ) {
// We don't have a lock-free algorithm for updating prev_, but we do have
// the option of invalidating the entire sequential-insertion cache.
// prev_'s invariant is that prev_[i] (i > 0) is the predecessor of
// prev_[0] at that level. We're only going to violate that if height
// > 1 and key lands after prev_[height - 1] but before prev_[0].
// Comparisons are pretty expensive, so an easier version is to just
// clear the cache if height > 1. We only write to prev_height_ if the
// nobody else has, to avoid invalidating the root of the skip list in
// all of the other CPU caches.
if ( height > 1 & & prev_height_ . load ( std : : memory_order_relaxed ) ! = 0 ) {
prev_height_ . store ( 0 , std : : memory_order_relaxed ) ;
}
void InlineSkipList < Comparator > : : InsertConcurrently ( const char * key ) {
Node * prev [ kMaxPossibleHeight ] ;
Node * next [ kMaxPossibleHeight ] ;
Splice splice ;
splice . prev_ = prev ;
splice . next_ = next ;
Insert < true > ( key , & splice , false ) ;
}
template < class Comparator >
void InlineSkipList < Comparator > : : Insert ( const char * key ) {
// InsertConcurrently often can't maintain the prev_ invariants, so
// it just sets prev_height_ to zero, letting us know that we should
// ignore it. A relaxed load suffices here because write thread
// synchronization separates Insert calls from InsertConcurrently calls.
auto prev_height = prev_height_ . load ( std : : memory_order_relaxed ) ;
// fast path for sequential insertion
if ( prev_height > 0 & & ! KeyIsAfterNode ( key , prev_ [ 0 ] - > NoBarrier_Next ( 0 ) ) & &
( prev_ [ 0 ] = = head_ | | KeyIsAfterNode ( key , prev_ [ 0 ] ) ) ) {
assert ( prev_ [ 0 ] ! = head_ | | ( prev_height = = 1 & & GetMaxHeight ( ) = = 1 ) ) ;
// Outside of this method prev_[1..max_height_] is the predecessor
// of prev_[0], and prev_height_ refers to prev_[0]. Inside Insert
// prev_[0..max_height - 1] is the predecessor of key. Switch from
// the external state to the internal
for ( int i = 1 ; i < prev_height ; i + + ) {
prev_ [ i ] = prev_ [ 0 ] ;
}
} else {
// TODO(opt): we could use a NoBarrier predecessor search as an
// optimization for architectures where memory_order_acquire needs
// a synchronization instruction. Doesn't matter on x86
FindLessThan ( key , prev_ ) ;
}
// Our data structure does not allow duplicate insertion
assert ( prev_ [ 0 ] - > Next ( 0 ) = = nullptr | | ! Equal ( key , prev_ [ 0 ] - > Next ( 0 ) - > Key ( ) ) ) ;
int height = 0 ;
Node * x = GetNodeForInsert ( key , & height ) ;
for ( int i = 0 ; i < height ; i + + ) {
x - > InsertAfter ( prev_ [ i ] , i ) ;
}
prev_ [ 0 ] = x ;
prev_height_ . store ( height , std : : memory_order_relaxed ) ;
}
// The goal here is to reduce the number of key comparisons, as it can be
// expensive. We maintain a hint which help us to find a insert position
// between or next to previously inserted keys with the same hint.
// Note that we require all keys inserted with the same hint are consecutive
// in the skip-list.
//
// The hint keeps a list of nodes previous inserted with the same hint:
// * The first level, prev[0], points to the largest key of them.
// * For 0 < i < num_levels, prev[i] is the previous node of prev[i-1]
// on level i, i.e.
// prev[i] < prev[i-1] <= prev[i]->Next(i)
// (prev[i-1] and prev[i]->Next(i) could be the same node.)
// In addition prev_height keeps the height of prev[i].
//
// When inserting a new key, we look for the lowest level L where
// prev[L] < key < prev[L-1]. Let
// M = max(prev_height[i]..prev_height[num_levels-1])
// For each level between in [L, M), the previous node of
// the new key must be one of prev[i]. For levels below L and above M
// we do normal skip-list search if needed.
//
// The optimization is suitable for stream of keys where new inserts are next
// to or close to the largest key ever inserted, e.g. sequential inserts.
template < class Comparator >
void InlineSkipList < Comparator > : : InsertWithHint ( const char * key ,
InsertHint * * hint_ptr ) {
int height = 0 ;
Node * x = GetNodeForInsert ( key , & height ) ;
// InsertWithHint() is not compatible with prev_ optimization used by
// Insert().
MaybeInvalidatePrev ( height ) ;
assert ( hint_ptr ! = nullptr ) ;
InsertHint * hint = * hint_ptr ;
if ( hint = = nullptr ) {
// AllocateInsertHint will initialize hint with num_levels = 0 and
// prev[i] = head_ for all i.
hint = AllocateInsertHint ( ) ;
* hint_ptr = hint ;
}
// Look for the first level i < num_levels with prev[i] < key.
int level = 0 ;
for ( ; level < hint - > num_levels ; level + + ) {
if ( KeyIsAfterNode ( key , hint - > prev [ level ] ) ) {
assert ( ! KeyIsAfterNode ( key , hint - > prev [ level ] - > Next ( level ) ) ) ;
break ;
}
}
Node * tmp_prev [ kMaxPossibleHeight ] ;
if ( level > = hint - > num_levels ) {
// The hint is not useful in this case. Fallback to full search.
FindLessThan ( key , tmp_prev ) ;
for ( int i = 0 ; i < height ; i + + ) {
assert ( tmp_prev [ i ] = = head_ | | KeyIsAfterNode ( key , tmp_prev [ i ] ) ) ;
assert ( ! KeyIsAfterNode ( key , tmp_prev [ i ] - > Next ( i ) ) ) ;
x - > InsertAfter ( tmp_prev [ i ] , i ) ;
}
} else {
// Search on levels below "level", using prev[level] as root.
if ( level > 0 ) {
FindLessThan ( key , tmp_prev , hint - > prev [ level ] , level , 0 ) ;
for ( int i = 0 ; i < level & & i < height ; i + + ) {
assert ( tmp_prev [ i ] = = head_ | | KeyIsAfterNode ( key , tmp_prev [ i ] ) ) ;
assert ( ! KeyIsAfterNode ( key , tmp_prev [ i ] - > Next ( i ) ) ) ;
x - > InsertAfter ( tmp_prev [ i ] , i ) ;
}
}
// The current level where the new node is to insert into skip-list.
int current_level = level ;
for ( int i = level ; i < hint - > num_levels ; i + + ) {
while ( current_level < height & & current_level < hint - > prev_height [ i ] ) {
// In this case, prev[i] is the previous node of key on current_level,
// since:
// * prev[i] < key;
// * no other nodes less than prev[level-1] has height greater than
// current_level, and prev[level-1] > key.
assert ( KeyIsAfterNode ( key , hint - > prev [ i ] ) ) ;
assert ( ! KeyIsAfterNode ( key , hint - > prev [ i ] - > Next ( current_level ) ) ) ;
x - > InsertAfter ( hint - > prev [ i ] , current_level ) ;
current_level + + ;
}
}
// Full search on levels above current_level if needed.
if ( current_level < height ) {
FindLessThan ( key , tmp_prev , head_ , GetMaxHeight ( ) , current_level ) ;
for ( int i = current_level ; i < height ; i + + ) {
assert ( tmp_prev [ i ] = = head_ | | KeyIsAfterNode ( key , tmp_prev [ i ] ) ) ;
assert ( ! KeyIsAfterNode ( key , tmp_prev [ i ] - > Next ( i ) ) ) ;
x - > InsertAfter ( tmp_prev [ i ] , i ) ;
}
}
}
// The last step is update the new node into the hint.
// * If "height" <= "level", prev[level] is still the previous node of
// prev[level-1] on level "level". Stop.
// * Otherwise, the new node becomes the new previous node of
// prev[level-1], or if level=0, the new node becomes the largest node
// inserted with the same hint. Replace prev[level] with the new node.
// * If prev[i] is replaced by another node, check if it can replace
// prev[i+1] using a similar rule, up till "num_levels" level.
Node * p = x ;
uint8_t h = static_cast < uint8_t > ( height ) ;
for ( int i = level ; i < hint - > num_levels ; i + + ) {
if ( h < = i ) {
p = nullptr ;
break ;
}
std : : swap ( p , hint - > prev [ i ] ) ;
std : : swap ( h , hint - > prev_height [ i ] ) ;
}
if ( p ! = nullptr & & h > hint - > num_levels ) {
hint - > prev [ hint - > num_levels ] = p ;
hint - > prev_height [ hint - > num_levels ] = h ;
hint - > num_levels + + ;
void InlineSkipList < Comparator > : : InsertWithHint ( const char * key , void * * hint ) {
assert ( hint ! = nullptr ) ;
Splice * splice = reinterpret_cast < Splice * > ( * hint ) ;
if ( splice = = nullptr ) {
splice = AllocateSplice ( ) ;
* hint = reinterpret_cast < void * > ( splice ) ;
}
Insert < false > ( key , splice , true ) ;
}
template < class Comparator >
void InlineSkipList < Comparator > : : FindLevel Splice ( const char * key , Node * before ,
Node * after , int level ,
Node * * out_prev ,
void InlineSkipList < Comparator > : : FindSpliceForLevel ( const char * key ,
Node * before , Node * after ,
int level , Node * * out_prev ,
Node * * out_next ) {
while ( true ) {
Node * next = before - > Next ( level ) ;
@ -831,15 +661,28 @@ void InlineSkipList<Comparator>::FindLevelSplice(const char* key, Node* before,
}
template < class Comparator >
void InlineSkipList < Comparator > : : InsertConcurrently ( const char * key ) {
void InlineSkipList < Comparator > : : RecomputeSpliceLevels ( const char * key ,
Splice * splice ,
int recompute_level ) {
assert ( recompute_level > 0 ) ;
assert ( recompute_level < = splice - > height_ ) ;
for ( int i = recompute_level - 1 ; i > = 0 ; - - i ) {
FindSpliceForLevel ( key , splice - > prev_ [ i + 1 ] , splice - > next_ [ i + 1 ] , i ,
& splice - > prev_ [ i ] , & splice - > next_ [ i ] ) ;
}
}
template < class Comparator >
template < bool UseCAS >
void InlineSkipList < Comparator > : : Insert ( const char * key , Splice * splice ,
bool allow_partial_splice_fix ) {
Node * x = reinterpret_cast < Node * > ( const_cast < char * > ( key ) ) - 1 ;
int height = x - > UnstashHeight ( ) ;
assert ( height > = 1 & & height < = kMaxHeight_ ) ;
MaybeInvalidatePrev ( height ) ;
int max_height = max_height_ . load ( std : : memory_order_relaxed ) ;
while ( height > max_height ) {
if ( max_height_ . compare_exchange_strong ( max_height , height ) ) {
if ( max_height_ . compare_exchange_weak ( max_height , height ) ) {
// successfully updated it
max_height = height ;
break ;
@ -849,17 +692,101 @@ void InlineSkipList<Comparator>::InsertConcurrently(const char* key) {
}
assert ( max_height < = kMaxPossibleHeight ) ;
Node * prev [ kMaxPossibleHeight + 1 ] ;
Node * next [ kMaxPossibleHeight + 1 ] ;
prev [ max_height ] = head_ ;
next [ max_height ] = nullptr ;
for ( int i = max_height - 1 ; i > = 0 ; - - i ) {
FindLevelSplice ( key , prev [ i + 1 ] , next [ i + 1 ] , i , & prev [ i ] , & next [ i ] ) ;
int recompute_height = 0 ;
if ( splice - > height_ < max_height ) {
// Either splice has never been used or max_height has grown since
// last use. We could potentially fix it in the latter case, but
// that is tricky.
splice - > prev_ [ max_height ] = head_ ;
splice - > next_ [ max_height ] = nullptr ;
splice - > height_ = max_height ;
recompute_height = max_height ;
} else {
// Splice is a valid proper-height splice that brackets some
// key, but does it bracket this one? We need to validate it and
// recompute a portion of the splice (levels 0..recompute_height-1)
// that is a superset of all levels that don't bracket the new key.
// Several choices are reasonable, because we have to balance the work
// saved against the extra comparisons required to validate the Splice.
//
// One strategy is just to recompute all of orig_splice_height if the
// bottom level isn't bracketing. This pessimistically assumes that
// we will either get a perfect Splice hit (increasing sequential
// inserts) or have no locality.
//
// Another strategy is to walk up the Splice's levels until we find
// a level that brackets the key. This strategy lets the Splice
// hint help for other cases: it turns insertion from O(log N) into
// O(log D), where D is the number of nodes in between the key that
// produced the Splice and the current insert (insertion is aided
// whether the new key is before or after the splice). If you have
// a way of using a prefix of the key to map directly to the closest
// Splice out of O(sqrt(N)) Splices and we make it so that splices
// can also be used as hints during read, then we end up with Oshman's
// and Shavit's SkipTrie, which has O(log log N) lookup and insertion
// (compare to O(log N) for skip list).
//
// We control the pessimistic strategy with allow_partial_splice_fix.
// A good strategy is probably to be pessimistic for seq_splice_,
// optimistic if the caller actually went to the work of providing
// a Splice.
while ( recompute_height < max_height ) {
if ( splice - > prev_ [ recompute_height ] - > Next ( recompute_height ) ! =
splice - > next_ [ recompute_height ] ) {
// splice isn't tight at this level, there must have been some inserts
// to this
// location that didn't update the splice. We might only be a little
// stale, but if
// the splice is very stale it would be O(N) to fix it. We haven't used
// up any of
// our budget of comparisons, so always move up even if we are
// pessimistic about
// our chances of success.
+ + recompute_height ;
} else if ( splice - > prev_ [ recompute_height ] ! = head_ & &
! KeyIsAfterNode ( key , splice - > prev_ [ recompute_height ] ) ) {
// key is from before splice
if ( allow_partial_splice_fix ) {
// skip all levels with the same node without more comparisons
Node * bad = splice - > prev_ [ recompute_height ] ;
while ( splice - > prev_ [ recompute_height ] = = bad ) {
+ + recompute_height ;
}
} else {
// we're pessimistic, recompute everything
recompute_height = max_height ;
}
} else if ( KeyIsAfterNode ( key , splice - > next_ [ recompute_height ] ) ) {
// key is from after splice
if ( allow_partial_splice_fix ) {
Node * bad = splice - > next_ [ recompute_height ] ;
while ( splice - > next_ [ recompute_height ] = = bad ) {
+ + recompute_height ;
}
} else {
recompute_height = max_height ;
}
} else {
// this level brackets the key, we won!
break ;
}
}
}
assert ( recompute_height < = max_height ) ;
if ( recompute_height > 0 ) {
RecomputeSpliceLevels ( key , splice , recompute_height ) ;
}
bool splice_is_valid = true ;
if ( UseCAS ) {
for ( int i = 0 ; i < height ; + + i ) {
while ( true ) {
x - > NoBarrier_SetNext ( i , next [ i ] ) ;
if ( prev [ i ] - > CASNext ( i , next [ i ] , x ) ) {
assert ( splice - > next_ [ i ] = = nullptr | |
compare_ ( x - > Key ( ) , splice - > next_ [ i ] - > Key ( ) ) < 0 ) ;
assert ( splice - > prev_ [ i ] = = head_ | |
compare_ ( splice - > prev_ [ i ] - > Key ( ) , x - > Key ( ) ) < 0 ) ;
x - > NoBarrier_SetNext ( i , splice - > next_ [ i ] ) ;
if ( splice - > prev_ [ i ] - > CASNext ( i , splice - > next_ [ i ] , x ) ) {
// success
break ;
}
@ -868,9 +795,56 @@ void InlineSkipList<Comparator>::InsertConcurrently(const char* key) {
// search, because it should be unlikely that lots of nodes have
// been inserted between prev[i] and next[i]. No point in using
// next[i] as the after hint, because we know it is stale.
FindLevelSplice ( key , prev [ i ] , nullptr , i , & prev [ i ] , & next [ i ] ) ;
FindSpliceForLevel ( key , splice - > prev_ [ i ] , nullptr , i , & splice - > prev_ [ i ] ,
& splice - > next_ [ i ] ) ;
// Since we've narrowed the bracket for level i, we might have
// violated the Splice constraint between i and i-1. Make sure
// we recompute the whole thing next time.
if ( i > 0 ) {
splice_is_valid = false ;
}
}
}
} else {
for ( int i = 0 ; i < height ; + + i ) {
if ( i > = recompute_height & &
splice - > prev_ [ i ] - > Next ( i ) ! = splice - > next_ [ i ] ) {
FindSpliceForLevel ( key , splice - > prev_ [ i ] , nullptr , i , & splice - > prev_ [ i ] ,
& splice - > next_ [ i ] ) ;
}
assert ( splice - > next_ [ i ] = = nullptr | |
compare_ ( x - > Key ( ) , splice - > next_ [ i ] - > Key ( ) ) < 0 ) ;
assert ( splice - > prev_ [ i ] = = head_ | |
compare_ ( splice - > prev_ [ i ] - > Key ( ) , x - > Key ( ) ) < 0 ) ;
assert ( splice - > prev_ [ i ] - > Next ( i ) = = splice - > next_ [ i ] ) ;
x - > NoBarrier_SetNext ( i , splice - > next_ [ i ] ) ;
splice - > prev_ [ i ] - > SetNext ( i , x ) ;
}
}
if ( splice_is_valid ) {
for ( int i = 0 ; i < height ; + + i ) {
splice - > prev_ [ i ] = x ;
}
assert ( splice - > prev_ [ splice - > height_ ] = = head_ ) ;
assert ( splice - > next_ [ splice - > height_ ] = = nullptr ) ;
for ( int i = 0 ; i < splice - > height_ ; + + i ) {
assert ( splice - > next_ [ i ] = = nullptr | |
compare_ ( key , splice - > next_ [ i ] - > Key ( ) ) < 0 ) ;
assert ( splice - > prev_ [ i ] = = head_ | |
compare_ ( splice - > prev_ [ i ] - > Key ( ) , key ) < = 0 ) ;
assert ( splice - > prev_ [ i + 1 ] = = splice - > prev_ [ i ] | |
splice - > prev_ [ i + 1 ] = = head_ | |
compare_ ( splice - > prev_ [ i + 1 ] - > Key ( ) , splice - > prev_ [ i ] - > Key ( ) ) <
0 ) ;
assert ( splice - > next_ [ i + 1 ] = = splice - > next_ [ i ] | |
splice - > next_ [ i + 1 ] = = nullptr | |
compare_ ( splice - > next_ [ i ] - > Key ( ) , splice - > next_ [ i + 1 ] - > Key ( ) ) <
0 ) ;
}
} else {
splice - > height_ = 0 ;
}
}
template < class Comparator >