@ -49,12 +49,8 @@ class CountMergeOperator : public AssociativeMergeOperator {
return true ;
return true ;
}
}
return mergeOperator_ - > PartialMerge (
return mergeOperator_ - > PartialMerge ( key , * existing_value , value , new_value ,
key ,
logger ) ;
* existing_value ,
value ,
new_value ,
logger ) ;
}
}
bool PartialMergeMulti ( const Slice & key ,
bool PartialMergeMulti ( const Slice & key ,
@ -73,6 +69,29 @@ class CountMergeOperator : public AssociativeMergeOperator {
std : : shared_ptr < MergeOperator > mergeOperator_ ;
std : : shared_ptr < MergeOperator > mergeOperator_ ;
} ;
} ;
class EnvMergeTest : public EnvWrapper {
public :
EnvMergeTest ( ) : EnvWrapper ( Env : : Default ( ) ) { }
// ~EnvMergeTest() override {}
uint64_t NowNanos ( ) override {
+ + now_nanos_count_ ;
return target ( ) - > NowNanos ( ) ;
}
static uint64_t now_nanos_count_ ;
static std : : unique_ptr < EnvMergeTest > singleton_ ;
static EnvMergeTest * GetInstance ( ) {
if ( nullptr = = singleton_ ) singleton_ . reset ( new EnvMergeTest ) ;
return singleton_ . get ( ) ;
}
} ;
uint64_t EnvMergeTest : : now_nanos_count_ { 0 } ;
std : : unique_ptr < EnvMergeTest > EnvMergeTest : : singleton_ ;
std : : shared_ptr < DB > OpenDb ( const std : : string & dbname , const bool ttl = false ,
std : : shared_ptr < DB > OpenDb ( const std : : string & dbname , const bool ttl = false ,
const size_t max_successive_merges = 0 ) {
const size_t max_successive_merges = 0 ) {
DB * db ;
DB * db ;
@ -80,6 +99,7 @@ std::shared_ptr<DB> OpenDb(const std::string& dbname, const bool ttl = false,
options . create_if_missing = true ;
options . create_if_missing = true ;
options . merge_operator = std : : make_shared < CountMergeOperator > ( ) ;
options . merge_operator = std : : make_shared < CountMergeOperator > ( ) ;
options . max_successive_merges = max_successive_merges ;
options . max_successive_merges = max_successive_merges ;
options . env = EnvMergeTest : : GetInstance ( ) ;
EXPECT_OK ( DestroyDB ( dbname , Options ( ) ) ) ;
EXPECT_OK ( DestroyDB ( dbname , Options ( ) ) ) ;
Status s ;
Status s ;
// DBWithTTL is not supported in ROCKSDB_LITE
// DBWithTTL is not supported in ROCKSDB_LITE
@ -106,7 +126,6 @@ std::shared_ptr<DB> OpenDb(const std::string& dbname, const bool ttl = false,
// set, add, get and remove
// set, add, get and remove
// This is a quick implementation without a Merge operation.
// This is a quick implementation without a Merge operation.
class Counters {
class Counters {
protected :
protected :
std : : shared_ptr < DB > db_ ;
std : : shared_ptr < DB > db_ ;
@ -190,7 +209,6 @@ class Counters {
return get ( key , & base ) & & set ( key , base + value ) ;
return get ( key , & base ) & & set ( key , base + value ) ;
}
}
// convenience functions for testing
// convenience functions for testing
void assert_set ( const std : : string & key , uint64_t value ) {
void assert_set ( const std : : string & key , uint64_t value ) {
assert ( set ( key , value ) ) ;
assert ( set ( key , value ) ) ;
@ -202,27 +220,25 @@ class Counters {
uint64_t value = default_ ;
uint64_t value = default_ ;
int result = get ( key , & value ) ;
int result = get ( key , & value ) ;
assert ( result ) ;
assert ( result ) ;
if ( result = = 0 ) exit ( 1 ) ; // Disable unused variable warning.
if ( result = = 0 ) exit ( 1 ) ; // Disable unused variable warning.
return value ;
return value ;
}
}
void assert_add ( const std : : string & key , uint64_t value ) {
void assert_add ( const std : : string & key , uint64_t value ) {
int result = add ( key , value ) ;
int result = add ( key , value ) ;
assert ( result ) ;
assert ( result ) ;
if ( result = = 0 ) exit ( 1 ) ; // Disable unused variable warning.
if ( result = = 0 ) exit ( 1 ) ; // Disable unused variable warning.
}
}
} ;
} ;
// Implement 'add' directly with the new Merge operation
// Implement 'add' directly with the new Merge operation
class MergeBasedCounters : public Counters {
class MergeBasedCounters : public Counters {
private :
private :
WriteOptions merge_option_ ; // for merge
WriteOptions merge_option_ ; // for merge
public :
public :
explicit MergeBasedCounters ( std : : shared_ptr < DB > db , uint64_t defaultCount = 0 )
explicit MergeBasedCounters ( std : : shared_ptr < DB > db , uint64_t defaultCount = 0 )
: Counters ( db , defaultCount ) ,
: Counters ( db , defaultCount ) , merge_option_ ( ) { }
merge_option_ ( ) {
}
// mapped to a rocksdb Merge operation
// mapped to a rocksdb Merge operation
bool add ( const std : : string & key , uint64_t value ) override {
bool add ( const std : : string & key , uint64_t value ) override {
@ -243,14 +259,13 @@ class MergeBasedCounters : public Counters {
void dumpDb ( DB * db ) {
void dumpDb ( DB * db ) {
auto it = std : : unique_ptr < Iterator > ( db - > NewIterator ( ReadOptions ( ) ) ) ;
auto it = std : : unique_ptr < Iterator > ( db - > NewIterator ( ReadOptions ( ) ) ) ;
for ( it - > SeekToFirst ( ) ; it - > Valid ( ) ; it - > Next ( ) ) {
for ( it - > SeekToFirst ( ) ; it - > Valid ( ) ; it - > Next ( ) ) {
//uint64_t value = DecodeFixed64(it->value().data());
// uint64_t value = DecodeFixed64(it->value().data());
//std::cout << it->key().ToString() << ": " << value << std::endl;
// std::cout << it->key().ToString() << ": " << value << std::endl;
}
}
assert ( it - > status ( ) . ok ( ) ) ; // Check for any errors found during the scan
assert ( it - > status ( ) . ok ( ) ) ; // Check for any errors found during the scan
}
}
void testCounters ( Counters & counters , DB * db , bool test_compaction ) {
void testCounters ( Counters & counters , DB * db , bool test_compaction ) {
FlushOptions o ;
FlushOptions o ;
o . wait = true ;
o . wait = true ;
@ -392,7 +407,6 @@ void testCountersWithFlushAndCompaction(Counters& counters, DB* db) {
void testSuccessiveMerge ( Counters & counters , size_t max_num_merges ,
void testSuccessiveMerge ( Counters & counters , size_t max_num_merges ,
size_t num_merges ) {
size_t num_merges ) {
counters . assert_remove ( " z " ) ;
counters . assert_remove ( " z " ) ;
uint64_t sum = 0 ;
uint64_t sum = 0 ;
@ -449,6 +463,7 @@ void testPartialMerge(Counters* counters, DB* db, size_t max_merge,
ASSERT_OK ( db - > CompactRange ( CompactRangeOptions ( ) , nullptr , nullptr ) ) ;
ASSERT_OK ( db - > CompactRange ( CompactRangeOptions ( ) , nullptr , nullptr ) ) ;
ASSERT_EQ ( tmp_sum , counters - > assert_get ( " c " ) ) ;
ASSERT_EQ ( tmp_sum , counters - > assert_get ( " c " ) ) ;
ASSERT_EQ ( num_partial_merge_calls , 0U ) ;
ASSERT_EQ ( num_partial_merge_calls , 0U ) ;
ASSERT_EQ ( EnvMergeTest : : now_nanos_count_ , 0U ) ;
}
}
void testSingleBatchSuccessiveMerge ( DB * db , size_t max_num_merges ,
void testSingleBatchSuccessiveMerge ( DB * db , size_t max_num_merges ,
@ -486,7 +501,6 @@ void testSingleBatchSuccessiveMerge(DB* db, size_t max_num_merges,
}
}
void runTest ( const std : : string & dbname , const bool use_ttl = false ) {
void runTest ( const std : : string & dbname , const bool use_ttl = false ) {
{
{
auto db = OpenDb ( dbname , use_ttl ) ;
auto db = OpenDb ( dbname , use_ttl ) ;