From 6a6723ee1eeab356bfb6a8e7a0608acaf0b3dcd5 Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Tue, 11 Apr 2017 16:11:27 -0700 Subject: [PATCH] Move MergeOperatorPinning tests to be with other merge operator tests Summary: Moved MergeOperatorPinning tests from db_test2.cc to db_merge_operator_test.cc. [This is the same code as PR #2104 , which has already been reviewed, but I am creating a new PR as I cannot import from #2104 onto phabricator anymore even after rebasing. I'll close and discard #2104.] Closes https://github.com/facebook/rocksdb/pull/2125 Differential Revision: D4863312 Pulled By: sagar0 fbshipit-source-id: 0f71a7690aa09c1d03ee85ce2bc1d2d89e4f4399 --- db/db_merge_operator_test.cc | 288 +++++++++++++++++++++++++++++++++++ db/db_test2.cc | 287 ---------------------------------- 2 files changed, 288 insertions(+), 287 deletions(-) diff --git a/db/db_merge_operator_test.cc b/db/db_merge_operator_test.cc index 75e337da8..45f2b5f97 100644 --- a/db/db_merge_operator_test.cc +++ b/db/db_merge_operator_test.cc @@ -92,6 +92,294 @@ TEST_F(DBMergeOperatorTest, MergeErrorOnIteration) { VerifyDBInternal({{"k1", "v1"}, {"k2", "corrupted"}, {"k2", "v2"}}); } + +class MergeOperatorPinningTest : public DBMergeOperatorTest, + public testing::WithParamInterface { + public: + MergeOperatorPinningTest() { disable_block_cache_ = GetParam(); } + + bool disable_block_cache_; +}; + +INSTANTIATE_TEST_CASE_P(MergeOperatorPinningTest, MergeOperatorPinningTest, + ::testing::Bool()); + +#ifndef ROCKSDB_LITE +TEST_P(MergeOperatorPinningTest, OperandsMultiBlocks) { + Options options = CurrentOptions(); + BlockBasedTableOptions table_options; + table_options.block_size = 1; // every block will contain one entry + table_options.no_block_cache = disable_block_cache_; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + options.merge_operator = MergeOperators::CreateStringAppendTESTOperator(); + options.level0_slowdown_writes_trigger = (1 << 30); + options.level0_stop_writes_trigger = (1 << 30); + options.disable_auto_compactions = true; + DestroyAndReopen(options); + + const int kKeysPerFile = 10; + const int kOperandsPerKeyPerFile = 7; + const int kOperandSize = 100; + // Filse to write in L0 before compacting to lower level + const int kFilesPerLevel = 3; + + Random rnd(301); + std::map true_data; + int batch_num = 1; + int lvl_to_fill = 4; + int key_id = 0; + while (true) { + for (int j = 0; j < kKeysPerFile; j++) { + std::string key = Key(key_id % 35); + key_id++; + for (int k = 0; k < kOperandsPerKeyPerFile; k++) { + std::string val = RandomString(&rnd, kOperandSize); + ASSERT_OK(db_->Merge(WriteOptions(), key, val)); + if (true_data[key].size() == 0) { + true_data[key] = val; + } else { + true_data[key] += "," + val; + } + } + } + + if (lvl_to_fill == -1) { + // Keep last batch in memtable and stop + break; + } + + ASSERT_OK(Flush()); + if (batch_num % kFilesPerLevel == 0) { + if (lvl_to_fill != 0) { + MoveFilesToLevel(lvl_to_fill); + } + lvl_to_fill--; + } + batch_num++; + } + + // 3 L0 files + // 1 L1 file + // 3 L2 files + // 1 L3 file + // 3 L4 Files + ASSERT_EQ(FilesPerLevel(), "3,1,3,1,3"); + + VerifyDBFromMap(true_data); +} + +TEST_P(MergeOperatorPinningTest, Randomized) { + do { + Options options = CurrentOptions(); + options.merge_operator = MergeOperators::CreateMaxOperator(); + BlockBasedTableOptions table_options; + table_options.no_block_cache = disable_block_cache_; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + DestroyAndReopen(options); + + Random rnd(301); + std::map true_data; + + const int kTotalMerges = 10000; + // Every key gets ~10 operands + const int kKeyRange = kTotalMerges / 10; + const int kOperandSize = 20; + const int kNumPutBefore = kKeyRange / 10; // 10% value + const int kNumPutAfter = kKeyRange / 10; // 10% overwrite + const int kNumDelete = kKeyRange / 10; // 10% delete + + // kNumPutBefore keys will have base values + for (int i = 0; i < kNumPutBefore; i++) { + std::string key = Key(rnd.Next() % kKeyRange); + std::string value = RandomString(&rnd, kOperandSize); + ASSERT_OK(db_->Put(WriteOptions(), key, value)); + + true_data[key] = value; + } + + // Do kTotalMerges merges + for (int i = 0; i < kTotalMerges; i++) { + std::string key = Key(rnd.Next() % kKeyRange); + std::string value = RandomString(&rnd, kOperandSize); + ASSERT_OK(db_->Merge(WriteOptions(), key, value)); + + if (true_data[key] < value) { + true_data[key] = value; + } + } + + // Overwrite random kNumPutAfter keys + for (int i = 0; i < kNumPutAfter; i++) { + std::string key = Key(rnd.Next() % kKeyRange); + std::string value = RandomString(&rnd, kOperandSize); + ASSERT_OK(db_->Put(WriteOptions(), key, value)); + + true_data[key] = value; + } + + // Delete random kNumDelete keys + for (int i = 0; i < kNumDelete; i++) { + std::string key = Key(rnd.Next() % kKeyRange); + ASSERT_OK(db_->Delete(WriteOptions(), key)); + + true_data.erase(key); + } + + VerifyDBFromMap(true_data); + + // Skip HashCuckoo since it does not support merge operators + } while (ChangeOptions(kSkipMergePut | kSkipHashCuckoo)); +} + +class MergeOperatorHook : public MergeOperator { + public: + explicit MergeOperatorHook(std::shared_ptr _merge_op) + : merge_op_(_merge_op) {} + + virtual bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override { + before_merge_(); + bool res = merge_op_->FullMergeV2(merge_in, merge_out); + after_merge_(); + return res; + } + + virtual const char* Name() const override { return merge_op_->Name(); } + + std::shared_ptr merge_op_; + std::function before_merge_ = []() {}; + std::function after_merge_ = []() {}; +}; + +TEST_P(MergeOperatorPinningTest, EvictCacheBeforeMerge) { + Options options = CurrentOptions(); + + auto merge_hook = + std::make_shared(MergeOperators::CreateMaxOperator()); + options.merge_operator = merge_hook; + options.disable_auto_compactions = true; + options.level0_slowdown_writes_trigger = (1 << 30); + options.level0_stop_writes_trigger = (1 << 30); + options.max_open_files = 20; + BlockBasedTableOptions bbto; + bbto.no_block_cache = disable_block_cache_; + if (bbto.no_block_cache == false) { + bbto.block_cache = NewLRUCache(64 * 1024 * 1024); + } else { + bbto.block_cache = nullptr; + } + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + DestroyAndReopen(options); + + const int kNumOperands = 30; + const int kNumKeys = 1000; + const int kOperandSize = 100; + Random rnd(301); + + // 1000 keys every key have 30 operands, every operand is in a different file + std::map true_data; + for (int i = 0; i < kNumOperands; i++) { + for (int j = 0; j < kNumKeys; j++) { + std::string k = Key(j); + std::string v = RandomString(&rnd, kOperandSize); + ASSERT_OK(db_->Merge(WriteOptions(), k, v)); + + true_data[k] = std::max(true_data[k], v); + } + ASSERT_OK(Flush()); + } + + std::vector file_numbers = ListTableFiles(env_, dbname_); + ASSERT_EQ(file_numbers.size(), kNumOperands); + int merge_cnt = 0; + + // Code executed before merge operation + merge_hook->before_merge_ = [&]() { + // Evict all tables from cache before every merge operation + for (uint64_t num : file_numbers) { + TableCache::Evict(dbfull()->TEST_table_cache(), num); + } + // Decrease cache capacity to force all unrefed blocks to be evicted + if (bbto.block_cache) { + bbto.block_cache->SetCapacity(1); + } + merge_cnt++; + }; + + // Code executed after merge operation + merge_hook->after_merge_ = [&]() { + // Increase capacity again after doing the merge + if (bbto.block_cache) { + bbto.block_cache->SetCapacity(64 * 1024 * 1024); + } + }; + + size_t total_reads; + VerifyDBFromMap(true_data, &total_reads); + ASSERT_EQ(merge_cnt, total_reads); + + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + + VerifyDBFromMap(true_data, &total_reads); +} + +TEST_P(MergeOperatorPinningTest, TailingIterator) { + Options options = CurrentOptions(); + options.merge_operator = MergeOperators::CreateMaxOperator(); + BlockBasedTableOptions bbto; + bbto.no_block_cache = disable_block_cache_; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + DestroyAndReopen(options); + + const int kNumOperands = 100; + const int kNumWrites = 100000; + + std::function writer_func = [&]() { + int k = 0; + for (int i = 0; i < kNumWrites; i++) { + db_->Merge(WriteOptions(), Key(k), Key(k)); + + if (i && i % kNumOperands == 0) { + k++; + } + if (i && i % 127 == 0) { + ASSERT_OK(Flush()); + } + if (i && i % 317 == 0) { + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + } + } + }; + + std::function reader_func = [&]() { + ReadOptions ro; + ro.tailing = true; + Iterator* iter = db_->NewIterator(ro); + + iter->SeekToFirst(); + for (int i = 0; i < (kNumWrites / kNumOperands); i++) { + while (!iter->Valid()) { + // wait for the key to be written + env_->SleepForMicroseconds(100); + iter->Seek(Key(i)); + } + ASSERT_EQ(iter->key(), Key(i)); + ASSERT_EQ(iter->value(), Key(i)); + + iter->Next(); + } + + delete iter; + }; + + rocksdb::port::Thread writer_thread(writer_func); + rocksdb::port::Thread reader_thread(reader_func); + + writer_thread.join(); + reader_thread.join(); +} +#endif // ROCKSDB_LITE + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_test2.cc b/db/db_test2.cc index bd22ba228..f673d24b6 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -1627,293 +1627,6 @@ TEST_F(DBTest2, SyncPointMarker) { } #endif -class MergeOperatorPinningTest : public DBTest2, - public testing::WithParamInterface { - public: - MergeOperatorPinningTest() { disable_block_cache_ = GetParam(); } - - bool disable_block_cache_; -}; - -INSTANTIATE_TEST_CASE_P(MergeOperatorPinningTest, MergeOperatorPinningTest, - ::testing::Bool()); - -#ifndef ROCKSDB_LITE -TEST_P(MergeOperatorPinningTest, OperandsMultiBlocks) { - Options options = CurrentOptions(); - BlockBasedTableOptions table_options; - table_options.block_size = 1; // every block will contain one entry - table_options.no_block_cache = disable_block_cache_; - options.table_factory.reset(NewBlockBasedTableFactory(table_options)); - options.merge_operator = MergeOperators::CreateStringAppendTESTOperator(); - options.level0_slowdown_writes_trigger = (1 << 30); - options.level0_stop_writes_trigger = (1 << 30); - options.disable_auto_compactions = true; - DestroyAndReopen(options); - - const int kKeysPerFile = 10; - const int kOperandsPerKeyPerFile = 7; - const int kOperandSize = 100; - // Filse to write in L0 before compacting to lower level - const int kFilesPerLevel = 3; - - Random rnd(301); - std::map true_data; - int batch_num = 1; - int lvl_to_fill = 4; - int key_id = 0; - while (true) { - for (int j = 0; j < kKeysPerFile; j++) { - std::string key = Key(key_id % 35); - key_id++; - for (int k = 0; k < kOperandsPerKeyPerFile; k++) { - std::string val = RandomString(&rnd, kOperandSize); - ASSERT_OK(db_->Merge(WriteOptions(), key, val)); - if (true_data[key].size() == 0) { - true_data[key] = val; - } else { - true_data[key] += "," + val; - } - } - } - - if (lvl_to_fill == -1) { - // Keep last batch in memtable and stop - break; - } - - ASSERT_OK(Flush()); - if (batch_num % kFilesPerLevel == 0) { - if (lvl_to_fill != 0) { - MoveFilesToLevel(lvl_to_fill); - } - lvl_to_fill--; - } - batch_num++; - } - - // 3 L0 files - // 1 L1 file - // 3 L2 files - // 1 L3 file - // 3 L4 Files - ASSERT_EQ(FilesPerLevel(), "3,1,3,1,3"); - - VerifyDBFromMap(true_data); -} - -TEST_P(MergeOperatorPinningTest, Randomized) { - do { - Options options = CurrentOptions(); - options.merge_operator = MergeOperators::CreateMaxOperator(); - BlockBasedTableOptions table_options; - table_options.no_block_cache = disable_block_cache_; - options.table_factory.reset(NewBlockBasedTableFactory(table_options)); - DestroyAndReopen(options); - - Random rnd(301); - std::map true_data; - - const int kTotalMerges = 10000; - // Every key gets ~10 operands - const int kKeyRange = kTotalMerges / 10; - const int kOperandSize = 20; - const int kNumPutBefore = kKeyRange / 10; // 10% value - const int kNumPutAfter = kKeyRange / 10; // 10% overwrite - const int kNumDelete = kKeyRange / 10; // 10% delete - - // kNumPutBefore keys will have base values - for (int i = 0; i < kNumPutBefore; i++) { - std::string key = Key(rnd.Next() % kKeyRange); - std::string value = RandomString(&rnd, kOperandSize); - ASSERT_OK(db_->Put(WriteOptions(), key, value)); - - true_data[key] = value; - } - - // Do kTotalMerges merges - for (int i = 0; i < kTotalMerges; i++) { - std::string key = Key(rnd.Next() % kKeyRange); - std::string value = RandomString(&rnd, kOperandSize); - ASSERT_OK(db_->Merge(WriteOptions(), key, value)); - - if (true_data[key] < value) { - true_data[key] = value; - } - } - - // Overwrite random kNumPutAfter keys - for (int i = 0; i < kNumPutAfter; i++) { - std::string key = Key(rnd.Next() % kKeyRange); - std::string value = RandomString(&rnd, kOperandSize); - ASSERT_OK(db_->Put(WriteOptions(), key, value)); - - true_data[key] = value; - } - - // Delete random kNumDelete keys - for (int i = 0; i < kNumDelete; i++) { - std::string key = Key(rnd.Next() % kKeyRange); - ASSERT_OK(db_->Delete(WriteOptions(), key)); - - true_data.erase(key); - } - - VerifyDBFromMap(true_data); - - // Skip HashCuckoo since it does not support merge operators - } while (ChangeOptions(kSkipMergePut | kSkipHashCuckoo)); -} - -class MergeOperatorHook : public MergeOperator { - public: - explicit MergeOperatorHook(std::shared_ptr _merge_op) - : merge_op_(_merge_op) {} - - virtual bool FullMergeV2(const MergeOperationInput& merge_in, - MergeOperationOutput* merge_out) const override { - before_merge_(); - bool res = merge_op_->FullMergeV2(merge_in, merge_out); - after_merge_(); - return res; - } - - virtual const char* Name() const override { return merge_op_->Name(); } - - std::shared_ptr merge_op_; - std::function before_merge_ = []() {}; - std::function after_merge_ = []() {}; -}; - -TEST_P(MergeOperatorPinningTest, EvictCacheBeforeMerge) { - Options options = CurrentOptions(); - - auto merge_hook = - std::make_shared(MergeOperators::CreateMaxOperator()); - options.merge_operator = merge_hook; - options.disable_auto_compactions = true; - options.level0_slowdown_writes_trigger = (1 << 30); - options.level0_stop_writes_trigger = (1 << 30); - options.max_open_files = 20; - BlockBasedTableOptions bbto; - bbto.no_block_cache = disable_block_cache_; - if (bbto.no_block_cache == false) { - bbto.block_cache = NewLRUCache(64 * 1024 * 1024); - } else { - bbto.block_cache = nullptr; - } - options.table_factory.reset(NewBlockBasedTableFactory(bbto)); - DestroyAndReopen(options); - - const int kNumOperands = 30; - const int kNumKeys = 1000; - const int kOperandSize = 100; - Random rnd(301); - - // 1000 keys every key have 30 operands, every operand is in a different file - std::map true_data; - for (int i = 0; i < kNumOperands; i++) { - for (int j = 0; j < kNumKeys; j++) { - std::string k = Key(j); - std::string v = RandomString(&rnd, kOperandSize); - ASSERT_OK(db_->Merge(WriteOptions(), k, v)); - - true_data[k] = std::max(true_data[k], v); - } - ASSERT_OK(Flush()); - } - - std::vector file_numbers = ListTableFiles(env_, dbname_); - ASSERT_EQ(file_numbers.size(), kNumOperands); - int merge_cnt = 0; - - // Code executed before merge operation - merge_hook->before_merge_ = [&]() { - // Evict all tables from cache before every merge operation - for (uint64_t num : file_numbers) { - TableCache::Evict(dbfull()->TEST_table_cache(), num); - } - // Decrease cache capacity to force all unrefed blocks to be evicted - if (bbto.block_cache) { - bbto.block_cache->SetCapacity(1); - } - merge_cnt++; - }; - - // Code executed after merge operation - merge_hook->after_merge_ = [&]() { - // Increase capacity again after doing the merge - if (bbto.block_cache) { - bbto.block_cache->SetCapacity(64 * 1024 * 1024); - } - }; - - size_t total_reads; - VerifyDBFromMap(true_data, &total_reads); - ASSERT_EQ(merge_cnt, total_reads); - - db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); - - VerifyDBFromMap(true_data, &total_reads); -} - -TEST_P(MergeOperatorPinningTest, TailingIterator) { - Options options = CurrentOptions(); - options.merge_operator = MergeOperators::CreateMaxOperator(); - BlockBasedTableOptions bbto; - bbto.no_block_cache = disable_block_cache_; - options.table_factory.reset(NewBlockBasedTableFactory(bbto)); - DestroyAndReopen(options); - - const int kNumOperands = 100; - const int kNumWrites = 100000; - - std::function writer_func = [&]() { - int k = 0; - for (int i = 0; i < kNumWrites; i++) { - db_->Merge(WriteOptions(), Key(k), Key(k)); - - if (i && i % kNumOperands == 0) { - k++; - } - if (i && i % 127 == 0) { - ASSERT_OK(Flush()); - } - if (i && i % 317 == 0) { - ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); - } - } - }; - - std::function reader_func = [&]() { - ReadOptions ro; - ro.tailing = true; - Iterator* iter = db_->NewIterator(ro); - - iter->SeekToFirst(); - for (int i = 0; i < (kNumWrites / kNumOperands); i++) { - while (!iter->Valid()) { - // wait for the key to be written - env_->SleepForMicroseconds(100); - iter->Seek(Key(i)); - } - ASSERT_EQ(iter->key(), Key(i)); - ASSERT_EQ(iter->value(), Key(i)); - - iter->Next(); - } - - delete iter; - }; - - rocksdb::port::Thread writer_thread(writer_func); - rocksdb::port::Thread reader_thread(reader_func); - - writer_thread.join(); - reader_thread.join(); -} -#endif // ROCKSDB_LITE - size_t GetEncodedEntrySize(size_t key_size, size_t value_size) { std::string buffer;