@ -14,7 +14,9 @@
# include <algorithm>
# include <memory>
# include <mutex>
# include <sstream>
# include "db/dbformat.h"
# include "port/port.h"
# include "rocksdb/convenience.h"
# include "rocksdb/slice.h"
@ -216,6 +218,75 @@ class ReverseBytewiseComparatorImpl : public BytewiseComparatorImpl {
return - a . compare ( b ) ;
}
} ;
// EXPERIMENTAL
// Comparator with 64-bit integer timestamp.
// We did not performance test this yet.
template < typename TComparator >
class ComparatorWithU64TsImpl : public Comparator {
static_assert ( std : : is_base_of < Comparator , TComparator > : : value ,
" template type must be a inherited type of comparator " ) ;
public :
explicit ComparatorWithU64TsImpl ( ) : Comparator ( /*ts_sz=*/ sizeof ( uint64_t ) ) {
assert ( cmp_without_ts_ . timestamp_size ( ) = = 0 ) ;
}
static const char * kClassName ( ) {
static std : : string class_name = kClassNameInternal ( ) ;
return class_name . c_str ( ) ;
}
const char * Name ( ) const override { return kClassName ( ) ; }
void FindShortSuccessor ( std : : string * ) const override { }
void FindShortestSeparator ( std : : string * , const Slice & ) const override { }
int Compare ( const Slice & a , const Slice & b ) const override {
int ret = CompareWithoutTimestamp ( a , b ) ;
size_t ts_sz = timestamp_size ( ) ;
if ( ret ! = 0 ) {
return ret ;
}
// Compare timestamp.
// For the same user key with different timestamps, larger (newer) timestamp
// comes first.
return - CompareTimestamp ( ExtractTimestampFromUserKey ( a , ts_sz ) ,
ExtractTimestampFromUserKey ( b , ts_sz ) ) ;
}
using Comparator : : CompareWithoutTimestamp ;
int CompareWithoutTimestamp ( const Slice & a , bool a_has_ts , const Slice & b ,
bool b_has_ts ) const override {
const size_t ts_sz = timestamp_size ( ) ;
assert ( ! a_has_ts | | a . size ( ) > = ts_sz ) ;
assert ( ! b_has_ts | | b . size ( ) > = ts_sz ) ;
Slice lhs = a_has_ts ? StripTimestampFromUserKey ( a , ts_sz ) : a ;
Slice rhs = b_has_ts ? StripTimestampFromUserKey ( b , ts_sz ) : b ;
return cmp_without_ts_ . Compare ( lhs , rhs ) ;
}
int CompareTimestamp ( const Slice & ts1 , const Slice & ts2 ) const override {
assert ( ts1 . size ( ) = = sizeof ( uint64_t ) ) ;
assert ( ts2 . size ( ) = = sizeof ( uint64_t ) ) ;
uint64_t lhs = DecodeFixed64 ( ts1 . data ( ) ) ;
uint64_t rhs = DecodeFixed64 ( ts2 . data ( ) ) ;
if ( lhs < rhs ) {
return - 1 ;
} else if ( lhs > rhs ) {
return 1 ;
} else {
return 0 ;
}
}
private :
static std : : string kClassNameInternal ( ) {
std : : stringstream ss ;
ss < < TComparator : : kClassName ( ) < < " .u64ts " ;
return ss . str ( ) ;
}
TComparator cmp_without_ts_ ;
} ;
} // namespace
const Comparator * BytewiseComparator ( ) {
@ -228,6 +299,11 @@ const Comparator* ReverseBytewiseComparator() {
return & rbytewise ;
}
const Comparator * BytewiseComparatorWithU64Ts ( ) {
static ComparatorWithU64TsImpl < BytewiseComparatorImpl > comp_with_u64_ts ;
return & comp_with_u64_ts ;
}
# ifndef ROCKSDB_LITE
static int RegisterBuiltinComparators ( ObjectLibrary & library ,
const std : : string & /*arg*/ ) {
@ -241,7 +317,12 @@ static int RegisterBuiltinComparators(ObjectLibrary& library,
[ ] ( const std : : string & /*uri*/ ,
std : : unique_ptr < const Comparator > * /*guard */ ,
std : : string * /* errmsg */ ) { return ReverseBytewiseComparator ( ) ; } ) ;
return 2 ;
library . AddFactory < const Comparator > (
ComparatorWithU64TsImpl < BytewiseComparatorImpl > : : kClassName ( ) ,
[ ] ( const std : : string & /*uri*/ ,
std : : unique_ptr < const Comparator > * /*guard */ ,
std : : string * /* errmsg */ ) { return BytewiseComparatorWithU64Ts ( ) ; } ) ;
return 3 ;
}
# endif // ROCKSDB_LITE
@ -265,6 +346,9 @@ Status Comparator::CreateFromString(const ConfigOptions& config_options,
* result = BytewiseComparator ( ) ;
} else if ( id = = ReverseBytewiseComparatorImpl : : kClassName ( ) ) {
* result = ReverseBytewiseComparator ( ) ;
} else if ( id = =
ComparatorWithU64TsImpl < BytewiseComparatorImpl > : : kClassName ( ) ) {
* result = BytewiseComparatorWithU64Ts ( ) ;
} else if ( value . empty ( ) ) {
// No Id and no options. Clear the object
* result = nullptr ;