// Copyright (c) 2013, Facebook, Inc. All rights reserved. // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. #include "db/db_impl.h" #include "db/db_test_util.h" #include "db/dbformat.h" #include "db/filename.h" #include "db/version_set.h" #include "db/write_batch_internal.h" #include "memtable/hash_linklist_rep.h" #include "rocksdb/cache.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/filter_policy.h" #include "rocksdb/options.h" #include "rocksdb/perf_context.h" #include "rocksdb/slice.h" #include "rocksdb/slice_transform.h" #include "rocksdb/table.h" #include "rocksdb/table_properties.h" #include "table/block_based_table_factory.h" #include "table/plain_table_factory.h" #include "util/hash.h" #include "util/logging.h" #include "util/mutexlock.h" #include "util/rate_limiter.h" #include "util/statistics.h" #include "util/string_util.h" #include "util/sync_point.h" #include "util/testharness.h" #include "util/testutil.h" #include "utilities/merge_operators.h" #ifndef ROCKSDB_LITE namespace rocksdb { class EventListenerTest : public DBTestBase { public: EventListenerTest() : DBTestBase("/listener_test") {} const size_t k110KB = 110 << 10; }; struct TestPropertiesCollector : public rocksdb::TablePropertiesCollector { virtual rocksdb::Status AddUserKey(const rocksdb::Slice& key, const rocksdb::Slice& value, rocksdb::EntryType type, rocksdb::SequenceNumber seq, uint64_t file_size) override { return Status::OK(); } virtual rocksdb::Status Finish( rocksdb::UserCollectedProperties* properties) override { properties->insert({"0", "1"}); return Status::OK(); } virtual const char* Name() const override { return "TestTablePropertiesCollector"; } rocksdb::UserCollectedProperties GetReadableProperties() const override { rocksdb::UserCollectedProperties ret; ret["2"] = "3"; return ret; } }; class TestPropertiesCollectorFactory : public TablePropertiesCollectorFactory { public: virtual TablePropertiesCollector* CreateTablePropertiesCollector( TablePropertiesCollectorFactory::Context context) override { return new TestPropertiesCollector; } const char* Name() const override { return "TestTablePropertiesCollector"; } }; class TestCompactionListener : public EventListener { public: void OnCompactionCompleted(DB *db, const CompactionJobInfo& ci) override { std::lock_guard lock(mutex_); compacted_dbs_.push_back(db); ASSERT_GT(ci.input_files.size(), 0U); ASSERT_GT(ci.output_files.size(), 0U); ASSERT_EQ(db->GetEnv()->GetThreadID(), ci.thread_id); ASSERT_GT(ci.thread_id, 0U); for (auto fl : {ci.input_files, ci.output_files}) { for (auto fn : fl) { auto it = ci.table_properties.find(fn); ASSERT_NE(it, ci.table_properties.end()); auto tp = it->second; ASSERT_TRUE(tp != nullptr); ASSERT_EQ(tp->user_collected_properties.find("0")->second, "1"); } } } std::vector compacted_dbs_; std::mutex mutex_; }; TEST_F(EventListenerTest, OnSingleDBCompactionTest) { const int kTestKeySize = 16; const int kTestValueSize = 984; const int kEntrySize = kTestKeySize + kTestValueSize; const int kEntriesPerBuffer = 100; const int kNumL0Files = 4; Options options; options.create_if_missing = true; options.write_buffer_size = kEntrySize * kEntriesPerBuffer; options.compaction_style = kCompactionStyleLevel; options.target_file_size_base = options.write_buffer_size; options.max_bytes_for_level_base = options.target_file_size_base * 2; options.max_bytes_for_level_multiplier = 2; options.compression = kNoCompression; #if ROCKSDB_USING_THREAD_STATUS options.enable_thread_tracking = true; #endif // ROCKSDB_USING_THREAD_STATUS options.level0_file_num_compaction_trigger = kNumL0Files; options.table_properties_collector_factories.push_back( std::make_shared()); TestCompactionListener* listener = new TestCompactionListener(); options.listeners.emplace_back(listener); std::vector cf_names = { "pikachu", "ilya", "muromec", "dobrynia", "nikitich", "alyosha", "popovich"}; CreateAndReopenWithCF(cf_names, options); ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p'))); ASSERT_OK(Put(2, "ilya", std::string(90000, 'i'))); ASSERT_OK(Put(3, "muromec", std::string(90000, 'm'))); ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd'))); ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n'))); ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a'))); ASSERT_OK(Put(7, "popovich", std::string(90000, 'p'))); for (int i = 1; i < 8; ++i) { ASSERT_OK(Flush(i)); const Slice kRangeStart = "a"; const Slice kRangeEnd = "z"; ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[i], &kRangeStart, &kRangeEnd)); dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForCompact(); } ASSERT_EQ(listener->compacted_dbs_.size(), cf_names.size()); for (size_t i = 0; i < cf_names.size(); ++i) { ASSERT_EQ(listener->compacted_dbs_[i], db_); } } // This simple Listener can only handle one flush at a time. class TestFlushListener : public EventListener { public: explicit TestFlushListener(Env* env) : slowdown_count(0), stop_count(0), db_closed(), env_(env) { db_closed = false; } void OnTableFileCreated( const TableFileCreationInfo& info) override { // remember the info for later checking the FlushJobInfo. prev_fc_info_ = info; ASSERT_GT(info.db_name.size(), 0U); ASSERT_GT(info.cf_name.size(), 0U); ASSERT_GT(info.file_path.size(), 0U); ASSERT_GT(info.job_id, 0); ASSERT_GT(info.table_properties.data_size, 0U); ASSERT_GT(info.table_properties.raw_key_size, 0U); ASSERT_GT(info.table_properties.raw_value_size, 0U); ASSERT_GT(info.table_properties.num_data_blocks, 0U); ASSERT_GT(info.table_properties.num_entries, 0U); #if ROCKSDB_USING_THREAD_STATUS // Verify the id of the current thread that created this table // file matches the id of any active flush or compaction thread. uint64_t thread_id = env_->GetThreadID(); std::vector thread_list; ASSERT_OK(env_->GetThreadList(&thread_list)); bool found_match = false; for (auto thread_status : thread_list) { if (thread_status.operation_type == ThreadStatus::OP_FLUSH || thread_status.operation_type == ThreadStatus::OP_COMPACTION) { if (thread_id == thread_status.thread_id) { found_match = true; break; } } } ASSERT_TRUE(found_match); #endif // ROCKSDB_USING_THREAD_STATUS } void OnFlushCompleted( DB* db, const FlushJobInfo& info) override { flushed_dbs_.push_back(db); flushed_column_family_names_.push_back(info.cf_name); if (info.triggered_writes_slowdown) { slowdown_count++; } if (info.triggered_writes_stop) { stop_count++; } // verify whether the previously created file matches the flushed file. ASSERT_EQ(prev_fc_info_.db_name, db->GetName()); ASSERT_EQ(prev_fc_info_.cf_name, info.cf_name); ASSERT_EQ(prev_fc_info_.job_id, info.job_id); ASSERT_EQ(prev_fc_info_.file_path, info.file_path); ASSERT_EQ(db->GetEnv()->GetThreadID(), info.thread_id); ASSERT_GT(info.thread_id, 0U); ASSERT_EQ(info.table_properties.user_collected_properties.find("0")->second, "1"); } std::vector flushed_column_family_names_; std::vector flushed_dbs_; int slowdown_count; int stop_count; bool db_closing; std::atomic_bool db_closed; TableFileCreationInfo prev_fc_info_; protected: Env* env_; }; TEST_F(EventListenerTest, OnSingleDBFlushTest) { Options options; options.write_buffer_size = k110KB; #if ROCKSDB_USING_THREAD_STATUS options.enable_thread_tracking = true; #endif // ROCKSDB_USING_THREAD_STATUS TestFlushListener* listener = new TestFlushListener(options.env); options.listeners.emplace_back(listener); std::vector cf_names = { "pikachu", "ilya", "muromec", "dobrynia", "nikitich", "alyosha", "popovich"}; options.table_properties_collector_factories.push_back( std::make_shared()); CreateAndReopenWithCF(cf_names, options); ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p'))); ASSERT_OK(Put(2, "ilya", std::string(90000, 'i'))); ASSERT_OK(Put(3, "muromec", std::string(90000, 'm'))); ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd'))); ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n'))); ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a'))); ASSERT_OK(Put(7, "popovich", std::string(90000, 'p'))); for (int i = 1; i < 8; ++i) { ASSERT_OK(Flush(i)); dbfull()->TEST_WaitForFlushMemTable(); ASSERT_EQ(listener->flushed_dbs_.size(), i); ASSERT_EQ(listener->flushed_column_family_names_.size(), i); } // make sure call-back functions are called in the right order for (size_t i = 0; i < cf_names.size(); ++i) { ASSERT_EQ(listener->flushed_dbs_[i], db_); ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]); } } TEST_F(EventListenerTest, MultiCF) { Options options; options.write_buffer_size = k110KB; #if ROCKSDB_USING_THREAD_STATUS options.enable_thread_tracking = true; #endif // ROCKSDB_USING_THREAD_STATUS TestFlushListener* listener = new TestFlushListener(options.env); options.listeners.emplace_back(listener); options.table_properties_collector_factories.push_back( std::make_shared()); std::vector cf_names = { "pikachu", "ilya", "muromec", "dobrynia", "nikitich", "alyosha", "popovich"}; CreateAndReopenWithCF(cf_names, options); ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p'))); ASSERT_OK(Put(2, "ilya", std::string(90000, 'i'))); ASSERT_OK(Put(3, "muromec", std::string(90000, 'm'))); ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd'))); ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n'))); ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a'))); ASSERT_OK(Put(7, "popovich", std::string(90000, 'p'))); for (int i = 1; i < 8; ++i) { ASSERT_OK(Flush(i)); ASSERT_EQ(listener->flushed_dbs_.size(), i); ASSERT_EQ(listener->flushed_column_family_names_.size(), i); } // make sure call-back functions are called in the right order for (size_t i = 0; i < cf_names.size(); i++) { ASSERT_EQ(listener->flushed_dbs_[i], db_); ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]); } } TEST_F(EventListenerTest, MultiDBMultiListeners) { Options options; #if ROCKSDB_USING_THREAD_STATUS options.enable_thread_tracking = true; #endif // ROCKSDB_USING_THREAD_STATUS options.table_properties_collector_factories.push_back( std::make_shared()); std::vector listeners; const int kNumDBs = 5; const int kNumListeners = 10; for (int i = 0; i < kNumListeners; ++i) { listeners.emplace_back(new TestFlushListener(options.env)); } std::vector cf_names = { "pikachu", "ilya", "muromec", "dobrynia", "nikitich", "alyosha", "popovich"}; options.create_if_missing = true; for (int i = 0; i < kNumListeners; ++i) { options.listeners.emplace_back(listeners[i]); } DBOptions db_opts(options); ColumnFamilyOptions cf_opts(options); std::vector dbs; std::vector> vec_handles; for (int d = 0; d < kNumDBs; ++d) { ASSERT_OK(DestroyDB(dbname_ + ToString(d), options)); DB* db; std::vector handles; ASSERT_OK(DB::Open(options, dbname_ + ToString(d), &db)); for (size_t c = 0; c < cf_names.size(); ++c) { ColumnFamilyHandle* handle; db->CreateColumnFamily(cf_opts, cf_names[c], &handle); handles.push_back(handle); } vec_handles.push_back(std::move(handles)); dbs.push_back(db); } for (int d = 0; d < kNumDBs; ++d) { for (size_t c = 0; c < cf_names.size(); ++c) { ASSERT_OK(dbs[d]->Put(WriteOptions(), vec_handles[d][c], cf_names[c], cf_names[c])); } } for (size_t c = 0; c < cf_names.size(); ++c) { for (int d = 0; d < kNumDBs; ++d) { ASSERT_OK(dbs[d]->Flush(FlushOptions(), vec_handles[d][c])); reinterpret_cast(dbs[d])->TEST_WaitForFlushMemTable(); } } for (auto* listener : listeners) { int pos = 0; for (size_t c = 0; c < cf_names.size(); ++c) { for (int d = 0; d < kNumDBs; ++d) { ASSERT_EQ(listener->flushed_dbs_[pos], dbs[d]); ASSERT_EQ(listener->flushed_column_family_names_[pos], cf_names[c]); pos++; } } } for (auto handles : vec_handles) { for (auto h : handles) { delete h; } handles.clear(); } vec_handles.clear(); for (auto db : dbs) { delete db; } } TEST_F(EventListenerTest, DisableBGCompaction) { Options options; #if ROCKSDB_USING_THREAD_STATUS options.enable_thread_tracking = true; #endif // ROCKSDB_USING_THREAD_STATUS TestFlushListener* listener = new TestFlushListener(options.env); const int kCompactionTrigger = 1; const int kSlowdownTrigger = 5; const int kStopTrigger = 100; options.level0_file_num_compaction_trigger = kCompactionTrigger; options.level0_slowdown_writes_trigger = kSlowdownTrigger; options.level0_stop_writes_trigger = kStopTrigger; options.max_write_buffer_number = 10; options.listeners.emplace_back(listener); // BG compaction is disabled. Number of L0 files will simply keeps // increasing in this test. options.compaction_style = kCompactionStyleNone; options.compression = kNoCompression; options.write_buffer_size = 100000; // Small write buffer options.table_properties_collector_factories.push_back( std::make_shared()); CreateAndReopenWithCF({"pikachu"}, options); ColumnFamilyMetaData cf_meta; db_->GetColumnFamilyMetaData(handles_[1], &cf_meta); // keep writing until writes are forced to stop. for (int i = 0; static_cast(cf_meta.file_count) < kSlowdownTrigger * 10; ++i) { Put(1, ToString(i), std::string(10000, 'x'), WriteOptions()); db_->Flush(FlushOptions(), handles_[1]); db_->GetColumnFamilyMetaData(handles_[1], &cf_meta); } ASSERT_GE(listener->slowdown_count, kSlowdownTrigger * 9); } } // namespace rocksdb #endif // ROCKSDB_LITE int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }