// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. // // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include #include #include #include "db/db_test_util.h" #include "port/stack_trace.h" #include "rocksdb/persistent_cache.h" #include "rocksdb/wal_filter.h" namespace rocksdb { class DBTest2 : public DBTestBase { public: DBTest2() : DBTestBase("/db_test2") {} }; class PrefixFullBloomWithReverseComparator : public DBTestBase, public ::testing::WithParamInterface { public: PrefixFullBloomWithReverseComparator() : DBTestBase("/prefix_bloom_reverse") {} virtual void SetUp() override { if_cache_filter_ = GetParam(); } bool if_cache_filter_; }; TEST_P(PrefixFullBloomWithReverseComparator, PrefixFullBloomWithReverseComparator) { Options options = last_options_; options.comparator = ReverseBytewiseComparator(); options.prefix_extractor.reset(NewCappedPrefixTransform(3)); options.statistics = rocksdb::CreateDBStatistics(); BlockBasedTableOptions bbto; if (if_cache_filter_) { bbto.no_block_cache = false; bbto.cache_index_and_filter_blocks = true; bbto.block_cache = NewLRUCache(1); } bbto.filter_policy.reset(NewBloomFilterPolicy(10, false)); bbto.whole_key_filtering = false; options.table_factory.reset(NewBlockBasedTableFactory(bbto)); DestroyAndReopen(options); ASSERT_OK(dbfull()->Put(WriteOptions(), "bar123", "foo")); ASSERT_OK(dbfull()->Put(WriteOptions(), "bar234", "foo2")); ASSERT_OK(dbfull()->Put(WriteOptions(), "foo123", "foo3")); dbfull()->Flush(FlushOptions()); if (bbto.block_cache) { bbto.block_cache->EraseUnRefEntries(); } unique_ptr iter(db_->NewIterator(ReadOptions())); iter->Seek("bar345"); ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); ASSERT_EQ("bar234", iter->key().ToString()); ASSERT_EQ("foo2", iter->value().ToString()); iter->Next(); ASSERT_TRUE(iter->Valid()); ASSERT_EQ("bar123", iter->key().ToString()); ASSERT_EQ("foo", iter->value().ToString()); iter->Seek("foo234"); ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); ASSERT_EQ("foo123", iter->key().ToString()); ASSERT_EQ("foo3", iter->value().ToString()); iter->Seek("bar"); ASSERT_OK(iter->status()); ASSERT_TRUE(!iter->Valid()); } INSTANTIATE_TEST_CASE_P(PrefixFullBloomWithReverseComparator, PrefixFullBloomWithReverseComparator, testing::Bool()); TEST_F(DBTest2, IteratorPropertyVersionNumber) { Put("", ""); Iterator* iter1 = db_->NewIterator(ReadOptions()); std::string prop_value; ASSERT_OK( iter1->GetProperty("rocksdb.iterator.super-version-number", &prop_value)); uint64_t version_number1 = static_cast(std::atoi(prop_value.c_str())); Put("", ""); Flush(); Iterator* iter2 = db_->NewIterator(ReadOptions()); ASSERT_OK( iter2->GetProperty("rocksdb.iterator.super-version-number", &prop_value)); uint64_t version_number2 = static_cast(std::atoi(prop_value.c_str())); ASSERT_GT(version_number2, version_number1); Put("", ""); Iterator* iter3 = db_->NewIterator(ReadOptions()); ASSERT_OK( iter3->GetProperty("rocksdb.iterator.super-version-number", &prop_value)); uint64_t version_number3 = static_cast(std::atoi(prop_value.c_str())); ASSERT_EQ(version_number2, version_number3); iter1->SeekToFirst(); ASSERT_OK( iter1->GetProperty("rocksdb.iterator.super-version-number", &prop_value)); uint64_t version_number1_new = static_cast(std::atoi(prop_value.c_str())); ASSERT_EQ(version_number1, version_number1_new); delete iter1; delete iter2; delete iter3; } TEST_F(DBTest2, CacheIndexAndFilterWithDBRestart) { Options options = CurrentOptions(); options.create_if_missing = true; options.statistics = rocksdb::CreateDBStatistics(); BlockBasedTableOptions table_options; table_options.cache_index_and_filter_blocks = true; table_options.filter_policy.reset(NewBloomFilterPolicy(20)); options.table_factory.reset(new BlockBasedTableFactory(table_options)); CreateAndReopenWithCF({"pikachu"}, options); Put(1, "a", "begin"); Put(1, "z", "end"); ASSERT_OK(Flush(1)); TryReopenWithColumnFamilies({"default", "pikachu"}, options); std::string value; value = Get(1, "a"); } #ifndef ROCKSDB_LITE class DBTestSharedWriteBufferAcrossCFs : public DBTestBase, public testing::WithParamInterface { public: DBTestSharedWriteBufferAcrossCFs() : DBTestBase("/db_test_shared_write_buffer") {} void SetUp() override { use_old_interface_ = GetParam(); } bool use_old_interface_; }; TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) { Options options = CurrentOptions(); if (use_old_interface_) { options.db_write_buffer_size = 100000; // this is the real limit } else { options.write_buffer_manager.reset(new WriteBufferManager(100000)); } options.write_buffer_size = 500000; // this is never hit CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options); // Trigger a flush on CF "nikitich" ASSERT_OK(Put(0, Key(1), DummyString(1))); ASSERT_OK(Put(1, Key(1), DummyString(1))); ASSERT_OK(Put(3, Key(1), DummyString(90000))); ASSERT_OK(Put(2, Key(2), DummyString(20000))); ASSERT_OK(Put(2, Key(1), DummyString(1))); dbfull()->TEST_WaitForFlushMemTable(handles_[0]); dbfull()->TEST_WaitForFlushMemTable(handles_[1]); dbfull()->TEST_WaitForFlushMemTable(handles_[2]); dbfull()->TEST_WaitForFlushMemTable(handles_[3]); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), static_cast(1)); } // "dobrynia": 20KB // Flush 'dobrynia' ASSERT_OK(Put(3, Key(2), DummyString(40000))); ASSERT_OK(Put(2, Key(2), DummyString(70000))); ASSERT_OK(Put(0, Key(1), DummyString(1))); dbfull()->TEST_WaitForFlushMemTable(handles_[1]); dbfull()->TEST_WaitForFlushMemTable(handles_[2]); dbfull()->TEST_WaitForFlushMemTable(handles_[3]); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), static_cast(1)); } // "nikitich" still has data of 80KB // Inserting Data in "dobrynia" triggers "nikitich" flushing. ASSERT_OK(Put(3, Key(2), DummyString(40000))); ASSERT_OK(Put(2, Key(2), DummyString(40000))); ASSERT_OK(Put(0, Key(1), DummyString(1))); dbfull()->TEST_WaitForFlushMemTable(handles_[1]); dbfull()->TEST_WaitForFlushMemTable(handles_[2]); dbfull()->TEST_WaitForFlushMemTable(handles_[3]); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), static_cast(2)); } // "dobrynia" still has 40KB ASSERT_OK(Put(1, Key(2), DummyString(20000))); ASSERT_OK(Put(0, Key(1), DummyString(10000))); ASSERT_OK(Put(0, Key(1), DummyString(1))); dbfull()->TEST_WaitForFlushMemTable(handles_[0]); dbfull()->TEST_WaitForFlushMemTable(handles_[1]); dbfull()->TEST_WaitForFlushMemTable(handles_[2]); dbfull()->TEST_WaitForFlushMemTable(handles_[3]); // This should triggers no flush { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), static_cast(2)); } // "default": 10KB, "pikachu": 20KB, "dobrynia": 40KB ASSERT_OK(Put(1, Key(2), DummyString(40000))); ASSERT_OK(Put(0, Key(1), DummyString(1))); dbfull()->TEST_WaitForFlushMemTable(handles_[0]); dbfull()->TEST_WaitForFlushMemTable(handles_[1]); dbfull()->TEST_WaitForFlushMemTable(handles_[2]); dbfull()->TEST_WaitForFlushMemTable(handles_[3]); // This should triggers flush of "pikachu" { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), static_cast(2)); } // "default": 10KB, "dobrynia": 40KB // Some remaining writes so 'default', 'dobrynia' and 'nikitich' flush on // closure. ASSERT_OK(Put(3, Key(1), DummyString(1))); ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"}, options); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), static_cast(2)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), static_cast(3)); } } INSTANTIATE_TEST_CASE_P(DBTestSharedWriteBufferAcrossCFs, DBTestSharedWriteBufferAcrossCFs, ::testing::Bool()); TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) { std::string dbname2 = test::TmpDir(env_) + "/db_shared_wb_db2"; Options options = CurrentOptions(); options.write_buffer_size = 500000; // this is never hit options.write_buffer_manager.reset(new WriteBufferManager(100000)); CreateAndReopenWithCF({"cf1", "cf2"}, options); ASSERT_OK(DestroyDB(dbname2, options)); DB* db2 = nullptr; ASSERT_OK(DB::Open(options, dbname2, &db2)); WriteOptions wo; // Trigger a flush on cf2 ASSERT_OK(Put(0, Key(1), DummyString(1))); ASSERT_OK(Put(1, Key(1), DummyString(1))); ASSERT_OK(Put(2, Key(1), DummyString(90000))); // Insert to DB2 ASSERT_OK(db2->Put(wo, Key(2), DummyString(20000))); ASSERT_OK(Put(2, Key(1), DummyString(1))); dbfull()->TEST_WaitForFlushMemTable(handles_[0]); dbfull()->TEST_WaitForFlushMemTable(handles_[1]); dbfull()->TEST_WaitForFlushMemTable(handles_[2]); static_cast(db2)->TEST_WaitForFlushMemTable(); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"), static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"), static_cast(0)); } // db2: 20KB ASSERT_OK(db2->Put(wo, Key(2), DummyString(40000))); ASSERT_OK(db2->Put(wo, Key(3), DummyString(70000))); ASSERT_OK(db2->Put(wo, Key(1), DummyString(1))); dbfull()->TEST_WaitForFlushMemTable(handles_[0]); dbfull()->TEST_WaitForFlushMemTable(handles_[1]); dbfull()->TEST_WaitForFlushMemTable(handles_[2]); static_cast(db2)->TEST_WaitForFlushMemTable(); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"), static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"), static_cast(1)); } // // Inserting Data in db2 and db_ triggers flushing in db_. ASSERT_OK(db2->Put(wo, Key(3), DummyString(70000))); ASSERT_OK(Put(2, Key(2), DummyString(45000))); ASSERT_OK(Put(0, Key(1), DummyString(1))); dbfull()->TEST_WaitForFlushMemTable(handles_[0]); dbfull()->TEST_WaitForFlushMemTable(handles_[1]); dbfull()->TEST_WaitForFlushMemTable(handles_[2]); static_cast(db2)->TEST_WaitForFlushMemTable(); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"), static_cast(2)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"), static_cast(1)); } delete db2; ASSERT_OK(DestroyDB(dbname2, options)); } namespace { void ValidateKeyExistence(DB* db, const std::vector& keys_must_exist, const std::vector& keys_must_not_exist) { // Ensure that expected keys exist std::vector values; if (keys_must_exist.size() > 0) { std::vector status_list = db->MultiGet(ReadOptions(), keys_must_exist, &values); for (size_t i = 0; i < keys_must_exist.size(); i++) { ASSERT_OK(status_list[i]); } } // Ensure that given keys don't exist if (keys_must_not_exist.size() > 0) { std::vector status_list = db->MultiGet(ReadOptions(), keys_must_not_exist, &values); for (size_t i = 0; i < keys_must_not_exist.size(); i++) { ASSERT_TRUE(status_list[i].IsNotFound()); } } } } // namespace TEST_F(DBTest2, WalFilterTest) { class TestWalFilter : public WalFilter { private: // Processing option that is requested to be applied at the given index WalFilter::WalProcessingOption wal_processing_option_; // Index at which to apply wal_processing_option_ // At other indexes default wal_processing_option::kContinueProcessing is // returned. size_t apply_option_at_record_index_; // Current record index, incremented with each record encountered. size_t current_record_index_; public: TestWalFilter(WalFilter::WalProcessingOption wal_processing_option, size_t apply_option_for_record_index) : wal_processing_option_(wal_processing_option), apply_option_at_record_index_(apply_option_for_record_index), current_record_index_(0) {} virtual WalProcessingOption LogRecord(const WriteBatch& batch, WriteBatch* new_batch, bool* batch_changed) const override { WalFilter::WalProcessingOption option_to_return; if (current_record_index_ == apply_option_at_record_index_) { option_to_return = wal_processing_option_; } else { option_to_return = WalProcessingOption::kContinueProcessing; } // Filter is passed as a const object for RocksDB to not modify the // object, however we modify it for our own purpose here and hence // cast the constness away. (const_cast(this)->current_record_index_)++; return option_to_return; } virtual const char* Name() const override { return "TestWalFilter"; } }; // Create 3 batches with two keys each std::vector> batch_keys(3); batch_keys[0].push_back("key1"); batch_keys[0].push_back("key2"); batch_keys[1].push_back("key3"); batch_keys[1].push_back("key4"); batch_keys[2].push_back("key5"); batch_keys[2].push_back("key6"); // Test with all WAL processing options for (int option = 0; option < static_cast( WalFilter::WalProcessingOption::kWalProcessingOptionMax); option++) { Options options = OptionsForLogIterTest(); DestroyAndReopen(options); CreateAndReopenWithCF({ "pikachu" }, options); // Write given keys in given batches for (size_t i = 0; i < batch_keys.size(); i++) { WriteBatch batch; for (size_t j = 0; j < batch_keys[i].size(); j++) { batch.Put(handles_[0], batch_keys[i][j], DummyString(1024)); } dbfull()->Write(WriteOptions(), &batch); } WalFilter::WalProcessingOption wal_processing_option = static_cast(option); // Create a test filter that would apply wal_processing_option at the first // record size_t apply_option_for_record_index = 1; TestWalFilter test_wal_filter(wal_processing_option, apply_option_for_record_index); // Reopen database with option to use WAL filter options = OptionsForLogIterTest(); options.wal_filter = &test_wal_filter; Status status = TryReopenWithColumnFamilies({ "default", "pikachu" }, options); if (wal_processing_option == WalFilter::WalProcessingOption::kCorruptedRecord) { assert(!status.ok()); // In case of corruption we can turn off paranoid_checks to reopen // databse options.paranoid_checks = false; ReopenWithColumnFamilies({ "default", "pikachu" }, options); } else { assert(status.ok()); } // Compute which keys we expect to be found // and which we expect not to be found after recovery. std::vector keys_must_exist; std::vector keys_must_not_exist; switch (wal_processing_option) { case WalFilter::WalProcessingOption::kCorruptedRecord: case WalFilter::WalProcessingOption::kContinueProcessing: { fprintf(stderr, "Testing with complete WAL processing\n"); // we expect all records to be processed for (size_t i = 0; i < batch_keys.size(); i++) { for (size_t j = 0; j < batch_keys[i].size(); j++) { keys_must_exist.push_back(Slice(batch_keys[i][j])); } } break; } case WalFilter::WalProcessingOption::kIgnoreCurrentRecord: { fprintf(stderr, "Testing with ignoring record %" ROCKSDB_PRIszt " only\n", apply_option_for_record_index); // We expect the record with apply_option_for_record_index to be not // found. for (size_t i = 0; i < batch_keys.size(); i++) { for (size_t j = 0; j < batch_keys[i].size(); j++) { if (i == apply_option_for_record_index) { keys_must_not_exist.push_back(Slice(batch_keys[i][j])); } else { keys_must_exist.push_back(Slice(batch_keys[i][j])); } } } break; } case WalFilter::WalProcessingOption::kStopReplay: { fprintf(stderr, "Testing with stopping replay from record %" ROCKSDB_PRIszt "\n", apply_option_for_record_index); // We expect records beyond apply_option_for_record_index to be not // found. for (size_t i = 0; i < batch_keys.size(); i++) { for (size_t j = 0; j < batch_keys[i].size(); j++) { if (i >= apply_option_for_record_index) { keys_must_not_exist.push_back(Slice(batch_keys[i][j])); } else { keys_must_exist.push_back(Slice(batch_keys[i][j])); } } } break; } default: assert(false); // unhandled case } bool checked_after_reopen = false; while (true) { // Ensure that expected keys exists // and not expected keys don't exist after recovery ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist); if (checked_after_reopen) { break; } // reopen database again to make sure previous log(s) are not used //(even if they were skipped) // reopn database with option to use WAL filter options = OptionsForLogIterTest(); ReopenWithColumnFamilies({ "default", "pikachu" }, options); checked_after_reopen = true; } } } TEST_F(DBTest2, WalFilterTestWithChangeBatch) { class ChangeBatchHandler : public WriteBatch::Handler { private: // Batch to insert keys in WriteBatch* new_write_batch_; // Number of keys to add in the new batch size_t num_keys_to_add_in_new_batch_; // Number of keys added to new batch size_t num_keys_added_; public: ChangeBatchHandler(WriteBatch* new_write_batch, size_t num_keys_to_add_in_new_batch) : new_write_batch_(new_write_batch), num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch), num_keys_added_(0) {} virtual void Put(const Slice& key, const Slice& value) override { if (num_keys_added_ < num_keys_to_add_in_new_batch_) { new_write_batch_->Put(key, value); ++num_keys_added_; } } }; class TestWalFilterWithChangeBatch : public WalFilter { private: // Index at which to start changing records size_t change_records_from_index_; // Number of keys to add in the new batch size_t num_keys_to_add_in_new_batch_; // Current record index, incremented with each record encountered. size_t current_record_index_; public: TestWalFilterWithChangeBatch(size_t change_records_from_index, size_t num_keys_to_add_in_new_batch) : change_records_from_index_(change_records_from_index), num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch), current_record_index_(0) {} virtual WalProcessingOption LogRecord(const WriteBatch& batch, WriteBatch* new_batch, bool* batch_changed) const override { if (current_record_index_ >= change_records_from_index_) { ChangeBatchHandler handler(new_batch, num_keys_to_add_in_new_batch_); batch.Iterate(&handler); *batch_changed = true; } // Filter is passed as a const object for RocksDB to not modify the // object, however we modify it for our own purpose here and hence // cast the constness away. (const_cast(this) ->current_record_index_)++; return WalProcessingOption::kContinueProcessing; } virtual const char* Name() const override { return "TestWalFilterWithChangeBatch"; } }; std::vector> batch_keys(3); batch_keys[0].push_back("key1"); batch_keys[0].push_back("key2"); batch_keys[1].push_back("key3"); batch_keys[1].push_back("key4"); batch_keys[2].push_back("key5"); batch_keys[2].push_back("key6"); Options options = OptionsForLogIterTest(); DestroyAndReopen(options); CreateAndReopenWithCF({ "pikachu" }, options); // Write given keys in given batches for (size_t i = 0; i < batch_keys.size(); i++) { WriteBatch batch; for (size_t j = 0; j < batch_keys[i].size(); j++) { batch.Put(handles_[0], batch_keys[i][j], DummyString(1024)); } dbfull()->Write(WriteOptions(), &batch); } // Create a test filter that would apply wal_processing_option at the first // record size_t change_records_from_index = 1; size_t num_keys_to_add_in_new_batch = 1; TestWalFilterWithChangeBatch test_wal_filter_with_change_batch( change_records_from_index, num_keys_to_add_in_new_batch); // Reopen database with option to use WAL filter options = OptionsForLogIterTest(); options.wal_filter = &test_wal_filter_with_change_batch; ReopenWithColumnFamilies({ "default", "pikachu" }, options); // Ensure that all keys exist before change_records_from_index_ // And after that index only single key exists // as our filter adds only single key for each batch std::vector keys_must_exist; std::vector keys_must_not_exist; for (size_t i = 0; i < batch_keys.size(); i++) { for (size_t j = 0; j < batch_keys[i].size(); j++) { if (i >= change_records_from_index && j >= num_keys_to_add_in_new_batch) { keys_must_not_exist.push_back(Slice(batch_keys[i][j])); } else { keys_must_exist.push_back(Slice(batch_keys[i][j])); } } } bool checked_after_reopen = false; while (true) { // Ensure that expected keys exists // and not expected keys don't exist after recovery ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist); if (checked_after_reopen) { break; } // reopen database again to make sure previous log(s) are not used //(even if they were skipped) // reopn database with option to use WAL filter options = OptionsForLogIterTest(); ReopenWithColumnFamilies({ "default", "pikachu" }, options); checked_after_reopen = true; } } TEST_F(DBTest2, WalFilterTestWithChangeBatchExtraKeys) { class TestWalFilterWithChangeBatchAddExtraKeys : public WalFilter { public: virtual WalProcessingOption LogRecord(const WriteBatch& batch, WriteBatch* new_batch, bool* batch_changed) const override { *new_batch = batch; new_batch->Put("key_extra", "value_extra"); *batch_changed = true; return WalProcessingOption::kContinueProcessing; } virtual const char* Name() const override { return "WalFilterTestWithChangeBatchExtraKeys"; } }; std::vector> batch_keys(3); batch_keys[0].push_back("key1"); batch_keys[0].push_back("key2"); batch_keys[1].push_back("key3"); batch_keys[1].push_back("key4"); batch_keys[2].push_back("key5"); batch_keys[2].push_back("key6"); Options options = OptionsForLogIterTest(); DestroyAndReopen(options); CreateAndReopenWithCF({ "pikachu" }, options); // Write given keys in given batches for (size_t i = 0; i < batch_keys.size(); i++) { WriteBatch batch; for (size_t j = 0; j < batch_keys[i].size(); j++) { batch.Put(handles_[0], batch_keys[i][j], DummyString(1024)); } dbfull()->Write(WriteOptions(), &batch); } // Create a test filter that would add extra keys TestWalFilterWithChangeBatchAddExtraKeys test_wal_filter_extra_keys; // Reopen database with option to use WAL filter options = OptionsForLogIterTest(); options.wal_filter = &test_wal_filter_extra_keys; Status status = TryReopenWithColumnFamilies({"default", "pikachu"}, options); ASSERT_TRUE(status.IsNotSupported()); // Reopen without filter, now reopen should succeed - previous // attempt to open must not have altered the db. options = OptionsForLogIterTest(); ReopenWithColumnFamilies({ "default", "pikachu" }, options); std::vector keys_must_exist; std::vector keys_must_not_exist; // empty vector for (size_t i = 0; i < batch_keys.size(); i++) { for (size_t j = 0; j < batch_keys[i].size(); j++) { keys_must_exist.push_back(Slice(batch_keys[i][j])); } } ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist); } TEST_F(DBTest2, WalFilterTestWithColumnFamilies) { class TestWalFilterWithColumnFamilies : public WalFilter { private: // column_family_id -> log_number map (provided to WALFilter) std::map cf_log_number_map_; // column_family_name -> column_family_id map (provided to WALFilter) std::map cf_name_id_map_; // column_family_name -> keys_found_in_wal map // We store keys that are applicable to the column_family // during recovery (i.e. aren't already flushed to SST file(s)) // for verification against the keys we expect. std::map> cf_wal_keys_; public: virtual void ColumnFamilyLogNumberMap( const std::map& cf_lognumber_map, const std::map& cf_name_id_map) override { cf_log_number_map_ = cf_lognumber_map; cf_name_id_map_ = cf_name_id_map; } virtual WalProcessingOption LogRecordFound(unsigned long long log_number, const std::string& log_file_name, const WriteBatch& batch, WriteBatch* new_batch, bool* batch_changed) override { class LogRecordBatchHandler : public WriteBatch::Handler { private: const std::map & cf_log_number_map_; std::map> & cf_wal_keys_; unsigned long long log_number_; public: LogRecordBatchHandler(unsigned long long current_log_number, const std::map & cf_log_number_map, std::map> & cf_wal_keys) : cf_log_number_map_(cf_log_number_map), cf_wal_keys_(cf_wal_keys), log_number_(current_log_number){} virtual Status PutCF(uint32_t column_family_id, const Slice& key, const Slice& /*value*/) override { auto it = cf_log_number_map_.find(column_family_id); assert(it != cf_log_number_map_.end()); unsigned long long log_number_for_cf = it->second; // If the current record is applicable for column_family_id // (i.e. isn't flushed to SST file(s) for column_family_id) // add it to the cf_wal_keys_ map for verification. if (log_number_ >= log_number_for_cf) { cf_wal_keys_[column_family_id].push_back(std::string(key.data(), key.size())); } return Status::OK(); } } handler(log_number, cf_log_number_map_, cf_wal_keys_); batch.Iterate(&handler); return WalProcessingOption::kContinueProcessing; } virtual const char* Name() const override { return "WalFilterTestWithColumnFamilies"; } const std::map>& GetColumnFamilyKeys() { return cf_wal_keys_; } const std::map & GetColumnFamilyNameIdMap() { return cf_name_id_map_; } }; std::vector> batch_keys_pre_flush(3); batch_keys_pre_flush[0].push_back("key1"); batch_keys_pre_flush[0].push_back("key2"); batch_keys_pre_flush[1].push_back("key3"); batch_keys_pre_flush[1].push_back("key4"); batch_keys_pre_flush[2].push_back("key5"); batch_keys_pre_flush[2].push_back("key6"); Options options = OptionsForLogIterTest(); DestroyAndReopen(options); CreateAndReopenWithCF({ "pikachu" }, options); // Write given keys in given batches for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) { WriteBatch batch; for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) { batch.Put(handles_[0], batch_keys_pre_flush[i][j], DummyString(1024)); batch.Put(handles_[1], batch_keys_pre_flush[i][j], DummyString(1024)); } dbfull()->Write(WriteOptions(), &batch); } //Flush default column-family db_->Flush(FlushOptions(), handles_[0]); // Do some more writes std::vector> batch_keys_post_flush(3); batch_keys_post_flush[0].push_back("key7"); batch_keys_post_flush[0].push_back("key8"); batch_keys_post_flush[1].push_back("key9"); batch_keys_post_flush[1].push_back("key10"); batch_keys_post_flush[2].push_back("key11"); batch_keys_post_flush[2].push_back("key12"); // Write given keys in given batches for (size_t i = 0; i < batch_keys_post_flush.size(); i++) { WriteBatch batch; for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) { batch.Put(handles_[0], batch_keys_post_flush[i][j], DummyString(1024)); batch.Put(handles_[1], batch_keys_post_flush[i][j], DummyString(1024)); } dbfull()->Write(WriteOptions(), &batch); } // On Recovery we should only find the second batch applicable to default CF // But both batches applicable to pikachu CF // Create a test filter that would add extra keys TestWalFilterWithColumnFamilies test_wal_filter_column_families; // Reopen database with option to use WAL filter options = OptionsForLogIterTest(); options.wal_filter = &test_wal_filter_column_families; Status status = TryReopenWithColumnFamilies({ "default", "pikachu" }, options); ASSERT_TRUE(status.ok()); // verify that handles_[0] only has post_flush keys // while handles_[1] has pre and post flush keys auto cf_wal_keys = test_wal_filter_column_families.GetColumnFamilyKeys(); auto name_id_map = test_wal_filter_column_families.GetColumnFamilyNameIdMap(); size_t index = 0; auto keys_cf = cf_wal_keys[name_id_map[kDefaultColumnFamilyName]]; //default column-family, only post_flush keys are expected for (size_t i = 0; i < batch_keys_post_flush.size(); i++) { for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) { Slice key_from_the_log(keys_cf[index++]); Slice batch_key(batch_keys_post_flush[i][j]); ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0); } } ASSERT_TRUE(index == keys_cf.size()); index = 0; keys_cf = cf_wal_keys[name_id_map["pikachu"]]; //pikachu column-family, all keys are expected for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) { for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) { Slice key_from_the_log(keys_cf[index++]); Slice batch_key(batch_keys_pre_flush[i][j]); ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0); } } for (size_t i = 0; i < batch_keys_post_flush.size(); i++) { for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) { Slice key_from_the_log(keys_cf[index++]); Slice batch_key(batch_keys_post_flush[i][j]); ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0); } } ASSERT_TRUE(index == keys_cf.size()); } TEST_F(DBTest2, PresetCompressionDict) { const size_t kBlockSizeBytes = 4 << 10; const size_t kL0FileBytes = 128 << 10; const size_t kApproxPerBlockOverheadBytes = 50; const int kNumL0Files = 5; Options options; options.arena_block_size = kBlockSizeBytes; options.compaction_style = kCompactionStyleUniversal; options.create_if_missing = true; options.disable_auto_compactions = true; options.level0_file_num_compaction_trigger = kNumL0Files; options.memtable_factory.reset( new SpecialSkipListFactory(kL0FileBytes / kBlockSizeBytes)); options.num_levels = 2; options.target_file_size_base = kL0FileBytes; options.target_file_size_multiplier = 2; options.write_buffer_size = kL0FileBytes; BlockBasedTableOptions table_options; table_options.block_size = kBlockSizeBytes; std::vector compression_types; if (Zlib_Supported()) { compression_types.push_back(kZlibCompression); } #if LZ4_VERSION_NUMBER >= 10400 // r124+ compression_types.push_back(kLZ4Compression); compression_types.push_back(kLZ4HCCompression); #endif // LZ4_VERSION_NUMBER >= 10400 #if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+ compression_types.push_back(kZSTDNotFinalCompression); #endif // ZSTD_VERSION_NUMBER >= 500 for (auto compression_type : compression_types) { options.compression = compression_type; size_t prev_out_bytes; for (int i = 0; i < 2; ++i) { // First iteration: compress without preset dictionary // Second iteration: compress with preset dictionary // To make sure the compression dictionary was actually used, we verify // the compressed size is smaller in the second iteration. Also in the // second iteration, verify the data we get out is the same data we put // in. if (i) { options.compression_opts.max_dict_bytes = kBlockSizeBytes; } else { options.compression_opts.max_dict_bytes = 0; } options.statistics = rocksdb::CreateDBStatistics(); options.table_factory.reset(NewBlockBasedTableFactory(table_options)); CreateAndReopenWithCF({"pikachu"}, options); Random rnd(301); std::string seq_data = RandomString(&rnd, kBlockSizeBytes - kApproxPerBlockOverheadBytes); ASSERT_EQ(0, NumTableFilesAtLevel(0, 1)); for (int j = 0; j < kNumL0Files; ++j) { for (size_t k = 0; k < kL0FileBytes / kBlockSizeBytes + 1; ++k) { ASSERT_OK(Put(1, Key(static_cast( j * (kL0FileBytes / kBlockSizeBytes) + k)), seq_data)); } dbfull()->TEST_WaitForFlushMemTable(handles_[1]); ASSERT_EQ(j + 1, NumTableFilesAtLevel(0, 1)); } db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr); ASSERT_EQ(0, NumTableFilesAtLevel(0, 1)); ASSERT_GT(NumTableFilesAtLevel(1, 1), 0); size_t out_bytes = 0; std::vector files; GetSstFiles(dbname_, &files); for (const auto& file : files) { uint64_t curr_bytes; env_->GetFileSize(dbname_ + "/" + file, &curr_bytes); out_bytes += static_cast(curr_bytes); } for (size_t j = 0; j < kNumL0Files * (kL0FileBytes / kBlockSizeBytes); j++) { ASSERT_EQ(seq_data, Get(1, Key(static_cast(j)))); } if (i) { ASSERT_GT(prev_out_bytes, out_bytes); } prev_out_bytes = out_bytes; DestroyAndReopen(options); } } } class CompactionCompressionListener : public EventListener { public: explicit CompactionCompressionListener(Options* db_options) : db_options_(db_options) {} void OnCompactionCompleted(DB* db, const CompactionJobInfo& ci) override { // Figure out last level with files int bottommost_level = 0; for (int level = 0; level < db->NumberLevels(); level++) { std::string files_at_level; ASSERT_TRUE( db->GetProperty("rocksdb.num-files-at-level" + NumberToString(level), &files_at_level)); if (files_at_level != "0") { bottommost_level = level; } } if (db_options_->bottommost_compression != kDisableCompressionOption && ci.output_level == bottommost_level && ci.output_level >= 2) { ASSERT_EQ(ci.compression, db_options_->bottommost_compression); } else if (db_options_->compression_per_level.size() != 0) { ASSERT_EQ(ci.compression, db_options_->compression_per_level[ci.output_level]); } else { ASSERT_EQ(ci.compression, db_options_->compression); } max_level_checked = std::max(max_level_checked, ci.output_level); } int max_level_checked = 0; const Options* db_options_; }; TEST_F(DBTest2, CompressionOptions) { if (!Zlib_Supported() || !Snappy_Supported()) { return; } Options options = CurrentOptions(); options.level0_file_num_compaction_trigger = 2; options.max_bytes_for_level_base = 100; options.max_bytes_for_level_multiplier = 2; options.num_levels = 7; options.max_background_compactions = 1; options.base_background_compactions = 1; CompactionCompressionListener* listener = new CompactionCompressionListener(&options); options.listeners.emplace_back(listener); const int kKeySize = 5; const int kValSize = 20; Random rnd(301); for (int iter = 0; iter <= 2; iter++) { listener->max_level_checked = 0; if (iter == 0) { // Use different compression algorithms for different levels but // always use Zlib for bottommost level options.compression_per_level = {kNoCompression, kNoCompression, kNoCompression, kSnappyCompression, kSnappyCompression, kSnappyCompression, kZlibCompression}; options.compression = kNoCompression; options.bottommost_compression = kZlibCompression; } else if (iter == 1) { // Use Snappy except for bottommost level use ZLib options.compression_per_level = {}; options.compression = kSnappyCompression; options.bottommost_compression = kZlibCompression; } else if (iter == 2) { // Use Snappy everywhere options.compression_per_level = {}; options.compression = kSnappyCompression; options.bottommost_compression = kDisableCompressionOption; } DestroyAndReopen(options); // Write 10 random files for (int i = 0; i < 10; i++) { for (int j = 0; j < 5; j++) { ASSERT_OK( Put(RandomString(&rnd, kKeySize), RandomString(&rnd, kValSize))); } ASSERT_OK(Flush()); dbfull()->TEST_WaitForCompact(); } // Make sure that we wrote enough to check all 7 levels ASSERT_EQ(listener->max_level_checked, 6); } } class CompactionStallTestListener : public EventListener { public: CompactionStallTestListener() : compacted_files_cnt_(0) {} void OnCompactionCompleted(DB* db, const CompactionJobInfo& ci) override { ASSERT_EQ(ci.cf_name, "default"); ASSERT_EQ(ci.base_input_level, 0); ASSERT_EQ(ci.compaction_reason, CompactionReason::kLevelL0FilesNum); compacted_files_cnt_ += ci.input_files.size(); } std::atomic compacted_files_cnt_; }; TEST_F(DBTest2, CompactionStall) { rocksdb::SyncPoint::GetInstance()->LoadDependency( {{"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:0"}, {"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:1"}, {"DBTest2::CompactionStall:2", "DBImpl::NotifyOnCompactionCompleted::UnlockMutex"}}); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); Options options = CurrentOptions(); options.level0_file_num_compaction_trigger = 4; options.max_background_compactions = 40; CompactionStallTestListener* listener = new CompactionStallTestListener(); options.listeners.emplace_back(listener); DestroyAndReopen(options); Random rnd(301); // 4 Files in L0 for (int i = 0; i < 4; i++) { for (int j = 0; j < 10; j++) { ASSERT_OK(Put(RandomString(&rnd, 10), RandomString(&rnd, 10))); } ASSERT_OK(Flush()); } // Wait for compaction to be triggered TEST_SYNC_POINT("DBTest2::CompactionStall:0"); // Clear "DBImpl::BGWorkCompaction" SYNC_POINT since we want to hold it again // at DBTest2::CompactionStall::1 rocksdb::SyncPoint::GetInstance()->ClearTrace(); // Another 6 L0 files to trigger compaction again for (int i = 0; i < 6; i++) { for (int j = 0; j < 10; j++) { ASSERT_OK(Put(RandomString(&rnd, 10), RandomString(&rnd, 10))); } ASSERT_OK(Flush()); } // Wait for another compaction to be triggered TEST_SYNC_POINT("DBTest2::CompactionStall:1"); // Hold NotifyOnCompactionCompleted in the unlock mutex section TEST_SYNC_POINT("DBTest2::CompactionStall:2"); dbfull()->TEST_WaitForCompact(); ASSERT_LT(NumTableFilesAtLevel(0), options.level0_file_num_compaction_trigger); ASSERT_GT(listener->compacted_files_cnt_.load(), 10 - options.level0_file_num_compaction_trigger); rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } #endif // ROCKSDB_LITE TEST_F(DBTest2, FirstSnapshotTest) { Options options; options.write_buffer_size = 100000; // Small write buffer options = CurrentOptions(options); CreateAndReopenWithCF({"pikachu"}, options); // This snapshot will have sequence number 0 what is expected behaviour. const Snapshot* s1 = db_->GetSnapshot(); Put(1, "k1", std::string(100000, 'x')); // Fill memtable Put(1, "k2", std::string(100000, 'y')); // Trigger flush db_->ReleaseSnapshot(s1); } class PinL0IndexAndFilterBlocksTest : public DBTestBase, public testing::WithParamInterface { public: PinL0IndexAndFilterBlocksTest() : DBTestBase("/db_pin_l0_index_bloom_test") {} virtual void SetUp() override { infinite_max_files_ = GetParam(); } bool infinite_max_files_; }; TEST_P(PinL0IndexAndFilterBlocksTest, IndexAndFilterBlocksOfNewTableAddedToCacheWithPinning) { Options options = CurrentOptions(); if (infinite_max_files_) { options.max_open_files = -1; } options.create_if_missing = true; options.statistics = rocksdb::CreateDBStatistics(); BlockBasedTableOptions table_options; table_options.cache_index_and_filter_blocks = true; table_options.pin_l0_filter_and_index_blocks_in_cache = true; table_options.filter_policy.reset(NewBloomFilterPolicy(20)); options.table_factory.reset(new BlockBasedTableFactory(table_options)); CreateAndReopenWithCF({"pikachu"}, options); ASSERT_OK(Put(1, "key", "val")); // Create a new table. ASSERT_OK(Flush(1)); // index/filter blocks added to block cache right after table creation. ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT)); ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT)); // only index/filter were added ASSERT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_ADD)); ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_DATA_MISS)); std::string value; // Miss and hit count should remain the same, they're all pinned. db_->KeyMayExist(ReadOptions(), handles_[1], "key", &value); ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT)); ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT)); // Miss and hit count should remain the same, they're all pinned. value = Get(1, "key"); ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT)); ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT)); } TEST_P(PinL0IndexAndFilterBlocksTest, MultiLevelIndexAndFilterBlocksCachedWithPinning) { Options options = CurrentOptions(); if (infinite_max_files_) { options.max_open_files = -1; } options.create_if_missing = true; options.statistics = rocksdb::CreateDBStatistics(); BlockBasedTableOptions table_options; table_options.cache_index_and_filter_blocks = true; table_options.pin_l0_filter_and_index_blocks_in_cache = true; table_options.filter_policy.reset(NewBloomFilterPolicy(20)); options.table_factory.reset(new BlockBasedTableFactory(table_options)); CreateAndReopenWithCF({"pikachu"}, options); Put(1, "a", "begin"); Put(1, "z", "end"); ASSERT_OK(Flush(1)); // move this table to L1 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); // reset block cache table_options.block_cache = NewLRUCache(64 * 1024); options.table_factory.reset(NewBlockBasedTableFactory(table_options)); TryReopenWithColumnFamilies({"default", "pikachu"}, options); // create new table at L0 Put(1, "a2", "begin2"); Put(1, "z2", "end2"); ASSERT_OK(Flush(1)); table_options.block_cache->EraseUnRefEntries(); // get base cache values uint64_t fm = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS); uint64_t fh = TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT); uint64_t im = TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS); uint64_t ih = TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT); std::string value; // this should be read from L0 // so cache values don't change value = Get(1, "a2"); ASSERT_EQ(fm, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT)); ASSERT_EQ(im, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT)); // this should be read from L1 // the file is opened, prefetching results in a cache filter miss // the block is loaded and added to the cache, // then the get results in a cache hit for L1 value = Get(1, "a"); ASSERT_EQ(fm + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); ASSERT_EQ(im + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); } INSTANTIATE_TEST_CASE_P(PinL0IndexAndFilterBlocksTest, PinL0IndexAndFilterBlocksTest, ::testing::Bool()); #ifndef ROCKSDB_LITE static void UniqueIdCallback(void* arg) { int* result = reinterpret_cast(arg); if (*result == -1) { *result = 0; } rocksdb::SyncPoint::GetInstance()->ClearTrace(); rocksdb::SyncPoint::GetInstance()->SetCallBack( "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback); } class MockPersistentCache : public PersistentCache { public: explicit MockPersistentCache(const bool is_compressed, const size_t max_size) : is_compressed_(is_compressed), max_size_(max_size) { rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->SetCallBack( "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback); } virtual ~MockPersistentCache() {} Status Insert(const Slice& page_key, const char* data, const size_t size) override { MutexLock _(&lock_); if (size_ > max_size_) { size_ -= data_.begin()->second.size(); data_.erase(data_.begin()); } data_.insert(std::make_pair(page_key.ToString(), std::string(data, size))); size_ += size; return Status::OK(); } Status Lookup(const Slice& page_key, std::unique_ptr* data, size_t* size) override { MutexLock _(&lock_); auto it = data_.find(page_key.ToString()); if (it == data_.end()) { return Status::NotFound(); } assert(page_key.ToString() == it->first); data->reset(new char[it->second.size()]); memcpy(data->get(), it->second.c_str(), it->second.size()); *size = it->second.size(); return Status::OK(); } bool IsCompressed() override { return is_compressed_; } port::Mutex lock_; std::map data_; const bool is_compressed_ = true; size_t size_ = 0; const size_t max_size_ = 10 * 1024; // 10KiB }; TEST_F(DBTest2, PersistentCache) { int num_iter = 80; Options options; options.write_buffer_size = 64 * 1024; // small write buffer options.statistics = rocksdb::CreateDBStatistics(); options = CurrentOptions(options); auto bsizes = {/*no block cache*/ 0, /*1M*/ 1 * 1024 * 1024}; auto types = {/*compressed*/ 1, /*uncompressed*/ 0}; for (auto bsize : bsizes) { for (auto type : types) { BlockBasedTableOptions table_options; table_options.persistent_cache.reset( new MockPersistentCache(type, 10 * 1024)); table_options.no_block_cache = true; table_options.block_cache = bsize ? NewLRUCache(bsize) : nullptr; table_options.block_cache_compressed = nullptr; options.table_factory.reset(NewBlockBasedTableFactory(table_options)); DestroyAndReopen(options); CreateAndReopenWithCF({"pikachu"}, options); // default column family doesn't have block cache Options no_block_cache_opts; no_block_cache_opts.statistics = options.statistics; no_block_cache_opts = CurrentOptions(no_block_cache_opts); BlockBasedTableOptions table_options_no_bc; table_options_no_bc.no_block_cache = true; no_block_cache_opts.table_factory.reset( NewBlockBasedTableFactory(table_options_no_bc)); ReopenWithColumnFamilies( {"default", "pikachu"}, std::vector({no_block_cache_opts, options})); Random rnd(301); // Write 8MB (80 values, each 100K) ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0); std::vector values; std::string str; for (int i = 0; i < num_iter; i++) { if (i % 4 == 0) { // high compression ratio str = RandomString(&rnd, 1000); } values.push_back(str); ASSERT_OK(Put(1, Key(i), values[i])); } // flush all data from memtable so that reads are from block cache ASSERT_OK(Flush(1)); for (int i = 0; i < num_iter; i++) { ASSERT_EQ(Get(1, Key(i)), values[i]); } auto hit = options.statistics->getTickerCount(PERSISTENT_CACHE_HIT); auto miss = options.statistics->getTickerCount(PERSISTENT_CACHE_MISS); ASSERT_GT(hit, 0); ASSERT_GT(miss, 0); } } } namespace { void CountSyncPoint() { TEST_SYNC_POINT_CALLBACK("DBTest2::MarkedPoint", nullptr /* arg */); } } // namespace TEST_F(DBTest2, SyncPointMarker) { std::atomic sync_point_called(0); rocksdb::SyncPoint::GetInstance()->SetCallBack( "DBTest2::MarkedPoint", [&](void* arg) { sync_point_called.fetch_add(1); }); // The first dependency enforces Marker can be loaded before MarkedPoint. // The second checks that thread 1's MarkedPoint should be disabled here. // Execution order: // | Thread 1 | Thread 2 | // | | Marker | // | MarkedPoint | | // | Thread1First | | // | | MarkedPoint | rocksdb::SyncPoint::GetInstance()->LoadDependencyAndMarkers( {{"DBTest2::SyncPointMarker:Thread1First", "DBTest2::MarkedPoint"}}, {{"DBTest2::SyncPointMarker:Marker", "DBTest2::MarkedPoint"}}); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); std::function func1 = [&]() { CountSyncPoint(); TEST_SYNC_POINT("DBTest2::SyncPointMarker:Thread1First"); }; std::function func2 = [&]() { TEST_SYNC_POINT("DBTest2::SyncPointMarker:Marker"); CountSyncPoint(); }; auto thread1 = std::thread(func1); auto thread2 = std::thread(func2); thread1.join(); thread2.join(); // Callback is only executed once ASSERT_EQ(sync_point_called.load(), 1); rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } #endif } // namespace rocksdb int main(int argc, char** argv) { rocksdb::port::InstallStackTraceHandler(); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }