fork of https://github.com/oxigraph/rocksdb and https://github.com/facebook/rocksdb for nextgraph and oxigraph
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
428 lines
15 KiB
428 lines
15 KiB
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under the BSD-style license found in the
|
|
// LICENSE file in the root directory of this source tree. An additional grant
|
|
// of patent rights can be found in the PATENTS file in the same directory.
|
|
|
|
#include "db/db_impl.h"
|
|
#include "db/db_test_util.h"
|
|
#include "db/dbformat.h"
|
|
#include "db/filename.h"
|
|
#include "db/version_set.h"
|
|
#include "db/write_batch_internal.h"
|
|
#include "memtable/hash_linklist_rep.h"
|
|
#include "rocksdb/cache.h"
|
|
#include "rocksdb/compaction_filter.h"
|
|
#include "rocksdb/db.h"
|
|
#include "rocksdb/env.h"
|
|
#include "rocksdb/filter_policy.h"
|
|
#include "rocksdb/options.h"
|
|
#include "rocksdb/perf_context.h"
|
|
#include "rocksdb/slice.h"
|
|
#include "rocksdb/slice_transform.h"
|
|
#include "rocksdb/table.h"
|
|
#include "rocksdb/table_properties.h"
|
|
#include "table/block_based_table_factory.h"
|
|
#include "table/plain_table_factory.h"
|
|
#include "util/hash.h"
|
|
#include "util/logging.h"
|
|
#include "util/mutexlock.h"
|
|
#include "util/rate_limiter.h"
|
|
#include "util/statistics.h"
|
|
#include "util/string_util.h"
|
|
#include "util/sync_point.h"
|
|
#include "util/testharness.h"
|
|
#include "util/testutil.h"
|
|
#include "utilities/merge_operators.h"
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
namespace rocksdb {
|
|
|
|
class EventListenerTest : public DBTestBase {
|
|
public:
|
|
EventListenerTest() : DBTestBase("listener_test") {}
|
|
|
|
const size_t k110KB = 110 << 10;
|
|
};
|
|
|
|
struct TestPropertiesCollector : public rocksdb::TablePropertiesCollector {
|
|
virtual rocksdb::Status AddUserKey(const rocksdb::Slice& key,
|
|
const rocksdb::Slice& value,
|
|
rocksdb::EntryType type,
|
|
rocksdb::SequenceNumber seq,
|
|
uint64_t file_size) override {
|
|
return Status::OK();
|
|
}
|
|
virtual rocksdb::Status Finish(
|
|
rocksdb::UserCollectedProperties* properties) override {
|
|
properties->insert({"0", "1"});
|
|
return Status::OK();
|
|
}
|
|
|
|
virtual const char* Name() const override {
|
|
return "TestTablePropertiesCollector";
|
|
}
|
|
|
|
rocksdb::UserCollectedProperties GetReadableProperties() const override {
|
|
rocksdb::UserCollectedProperties ret;
|
|
ret["2"] = "3";
|
|
return ret;
|
|
}
|
|
};
|
|
|
|
class TestPropertiesCollectorFactory : public TablePropertiesCollectorFactory {
|
|
public:
|
|
virtual TablePropertiesCollector* CreateTablePropertiesCollector(
|
|
TablePropertiesCollectorFactory::Context context) override {
|
|
return new TestPropertiesCollector;
|
|
}
|
|
const char* Name() const override { return "TestTablePropertiesCollector"; }
|
|
};
|
|
|
|
class TestCompactionListener : public EventListener {
|
|
public:
|
|
void OnCompactionCompleted(DB *db, const CompactionJobInfo& ci) override {
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
|
compacted_dbs_.push_back(db);
|
|
ASSERT_GT(ci.input_files.size(), 0U);
|
|
ASSERT_GT(ci.output_files.size(), 0U);
|
|
ASSERT_EQ(db->GetEnv()->GetThreadID(), ci.thread_id);
|
|
ASSERT_GT(ci.thread_id, 0U);
|
|
|
|
for (auto fl : {ci.input_files, ci.output_files}) {
|
|
for (auto fn : fl) {
|
|
auto it = ci.table_properties.find(fn);
|
|
ASSERT_NE(it, ci.table_properties.end());
|
|
auto tp = it->second;
|
|
ASSERT_TRUE(tp != nullptr);
|
|
ASSERT_EQ(tp->user_collected_properties.find("0")->second, "1");
|
|
}
|
|
}
|
|
}
|
|
|
|
std::vector<DB*> compacted_dbs_;
|
|
std::mutex mutex_;
|
|
};
|
|
|
|
TEST_F(EventListenerTest, OnSingleDBCompactionTest) {
|
|
const int kTestKeySize = 16;
|
|
const int kTestValueSize = 984;
|
|
const int kEntrySize = kTestKeySize + kTestValueSize;
|
|
const int kEntriesPerBuffer = 100;
|
|
const int kNumL0Files = 4;
|
|
|
|
Options options;
|
|
options.create_if_missing = true;
|
|
options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
|
|
options.compaction_style = kCompactionStyleLevel;
|
|
options.target_file_size_base = options.write_buffer_size;
|
|
options.max_bytes_for_level_base = options.target_file_size_base * 2;
|
|
options.max_bytes_for_level_multiplier = 2;
|
|
options.compression = kNoCompression;
|
|
#if ROCKSDB_USING_THREAD_STATUS
|
|
options.enable_thread_tracking = true;
|
|
#endif // ROCKSDB_USING_THREAD_STATUS
|
|
options.level0_file_num_compaction_trigger = kNumL0Files;
|
|
options.table_properties_collector_factories.push_back(
|
|
std::make_shared<TestPropertiesCollectorFactory>());
|
|
|
|
TestCompactionListener* listener = new TestCompactionListener();
|
|
options.listeners.emplace_back(listener);
|
|
std::vector<std::string> cf_names = {
|
|
"pikachu", "ilya", "muromec", "dobrynia",
|
|
"nikitich", "alyosha", "popovich"};
|
|
CreateAndReopenWithCF(cf_names, options);
|
|
ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
|
|
ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
|
|
ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
|
|
ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
|
|
ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
|
|
ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
|
|
ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
|
|
for (int i = 1; i < 8; ++i) {
|
|
ASSERT_OK(Flush(i));
|
|
const Slice kRangeStart = "a";
|
|
const Slice kRangeEnd = "z";
|
|
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[i],
|
|
&kRangeStart, &kRangeEnd));
|
|
dbfull()->TEST_WaitForFlushMemTable();
|
|
dbfull()->TEST_WaitForCompact();
|
|
}
|
|
|
|
ASSERT_EQ(listener->compacted_dbs_.size(), cf_names.size());
|
|
for (size_t i = 0; i < cf_names.size(); ++i) {
|
|
ASSERT_EQ(listener->compacted_dbs_[i], db_);
|
|
}
|
|
}
|
|
|
|
// This simple Listener can only handle one flush at a time.
|
|
class TestFlushListener : public EventListener {
|
|
public:
|
|
explicit TestFlushListener(Env* env)
|
|
: slowdown_count(0), stop_count(0), db_closed(), env_(env) {
|
|
db_closed = false;
|
|
}
|
|
void OnTableFileCreated(
|
|
const TableFileCreationInfo& info) override {
|
|
// remember the info for later checking the FlushJobInfo.
|
|
prev_fc_info_ = info;
|
|
ASSERT_GT(info.db_name.size(), 0U);
|
|
ASSERT_GT(info.cf_name.size(), 0U);
|
|
ASSERT_GT(info.file_path.size(), 0U);
|
|
ASSERT_GT(info.job_id, 0);
|
|
ASSERT_GT(info.table_properties.data_size, 0U);
|
|
ASSERT_GT(info.table_properties.raw_key_size, 0U);
|
|
ASSERT_GT(info.table_properties.raw_value_size, 0U);
|
|
ASSERT_GT(info.table_properties.num_data_blocks, 0U);
|
|
ASSERT_GT(info.table_properties.num_entries, 0U);
|
|
|
|
#if ROCKSDB_USING_THREAD_STATUS
|
|
// Verify the id of the current thread that created this table
|
|
// file matches the id of any active flush or compaction thread.
|
|
uint64_t thread_id = env_->GetThreadID();
|
|
std::vector<ThreadStatus> thread_list;
|
|
ASSERT_OK(env_->GetThreadList(&thread_list));
|
|
bool found_match = false;
|
|
for (auto thread_status : thread_list) {
|
|
if (thread_status.operation_type == ThreadStatus::OP_FLUSH ||
|
|
thread_status.operation_type == ThreadStatus::OP_COMPACTION) {
|
|
if (thread_id == thread_status.thread_id) {
|
|
found_match = true;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
ASSERT_TRUE(found_match);
|
|
#endif // ROCKSDB_USING_THREAD_STATUS
|
|
}
|
|
|
|
void OnFlushCompleted(
|
|
DB* db, const FlushJobInfo& info) override {
|
|
flushed_dbs_.push_back(db);
|
|
flushed_column_family_names_.push_back(info.cf_name);
|
|
if (info.triggered_writes_slowdown) {
|
|
slowdown_count++;
|
|
}
|
|
if (info.triggered_writes_stop) {
|
|
stop_count++;
|
|
}
|
|
// verify whether the previously created file matches the flushed file.
|
|
ASSERT_EQ(prev_fc_info_.db_name, db->GetName());
|
|
ASSERT_EQ(prev_fc_info_.cf_name, info.cf_name);
|
|
ASSERT_EQ(prev_fc_info_.job_id, info.job_id);
|
|
ASSERT_EQ(prev_fc_info_.file_path, info.file_path);
|
|
ASSERT_EQ(db->GetEnv()->GetThreadID(), info.thread_id);
|
|
ASSERT_GT(info.thread_id, 0U);
|
|
ASSERT_EQ(info.table_properties.user_collected_properties.find("0")->second,
|
|
"1");
|
|
}
|
|
|
|
std::vector<std::string> flushed_column_family_names_;
|
|
std::vector<DB*> flushed_dbs_;
|
|
int slowdown_count;
|
|
int stop_count;
|
|
bool db_closing;
|
|
std::atomic_bool db_closed;
|
|
TableFileCreationInfo prev_fc_info_;
|
|
|
|
protected:
|
|
Env* env_;
|
|
};
|
|
|
|
TEST_F(EventListenerTest, OnSingleDBFlushTest) {
|
|
Options options;
|
|
options.write_buffer_size = k110KB;
|
|
#if ROCKSDB_USING_THREAD_STATUS
|
|
options.enable_thread_tracking = true;
|
|
#endif // ROCKSDB_USING_THREAD_STATUS
|
|
TestFlushListener* listener = new TestFlushListener(options.env);
|
|
options.listeners.emplace_back(listener);
|
|
std::vector<std::string> cf_names = {
|
|
"pikachu", "ilya", "muromec", "dobrynia",
|
|
"nikitich", "alyosha", "popovich"};
|
|
options.table_properties_collector_factories.push_back(
|
|
std::make_shared<TestPropertiesCollectorFactory>());
|
|
CreateAndReopenWithCF(cf_names, options);
|
|
|
|
ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
|
|
ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
|
|
ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
|
|
ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
|
|
ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
|
|
ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
|
|
ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
|
|
for (int i = 1; i < 8; ++i) {
|
|
ASSERT_OK(Flush(i));
|
|
dbfull()->TEST_WaitForFlushMemTable();
|
|
ASSERT_EQ(listener->flushed_dbs_.size(), i);
|
|
ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
|
|
}
|
|
|
|
// make sure call-back functions are called in the right order
|
|
for (size_t i = 0; i < cf_names.size(); ++i) {
|
|
ASSERT_EQ(listener->flushed_dbs_[i], db_);
|
|
ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
|
|
}
|
|
}
|
|
|
|
TEST_F(EventListenerTest, MultiCF) {
|
|
Options options;
|
|
options.write_buffer_size = k110KB;
|
|
#if ROCKSDB_USING_THREAD_STATUS
|
|
options.enable_thread_tracking = true;
|
|
#endif // ROCKSDB_USING_THREAD_STATUS
|
|
TestFlushListener* listener = new TestFlushListener(options.env);
|
|
options.listeners.emplace_back(listener);
|
|
options.table_properties_collector_factories.push_back(
|
|
std::make_shared<TestPropertiesCollectorFactory>());
|
|
std::vector<std::string> cf_names = {
|
|
"pikachu", "ilya", "muromec", "dobrynia",
|
|
"nikitich", "alyosha", "popovich"};
|
|
CreateAndReopenWithCF(cf_names, options);
|
|
|
|
ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
|
|
ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
|
|
ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
|
|
ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
|
|
ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
|
|
ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
|
|
ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
|
|
for (int i = 1; i < 8; ++i) {
|
|
ASSERT_OK(Flush(i));
|
|
ASSERT_EQ(listener->flushed_dbs_.size(), i);
|
|
ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
|
|
}
|
|
|
|
// make sure call-back functions are called in the right order
|
|
for (size_t i = 0; i < cf_names.size(); i++) {
|
|
ASSERT_EQ(listener->flushed_dbs_[i], db_);
|
|
ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
|
|
}
|
|
}
|
|
|
|
TEST_F(EventListenerTest, MultiDBMultiListeners) {
|
|
Options options;
|
|
#if ROCKSDB_USING_THREAD_STATUS
|
|
options.enable_thread_tracking = true;
|
|
#endif // ROCKSDB_USING_THREAD_STATUS
|
|
options.table_properties_collector_factories.push_back(
|
|
std::make_shared<TestPropertiesCollectorFactory>());
|
|
std::vector<TestFlushListener*> listeners;
|
|
const int kNumDBs = 5;
|
|
const int kNumListeners = 10;
|
|
for (int i = 0; i < kNumListeners; ++i) {
|
|
listeners.emplace_back(new TestFlushListener(options.env));
|
|
}
|
|
|
|
std::vector<std::string> cf_names = {
|
|
"pikachu", "ilya", "muromec", "dobrynia",
|
|
"nikitich", "alyosha", "popovich"};
|
|
|
|
options.create_if_missing = true;
|
|
for (int i = 0; i < kNumListeners; ++i) {
|
|
options.listeners.emplace_back(listeners[i]);
|
|
}
|
|
DBOptions db_opts(options);
|
|
ColumnFamilyOptions cf_opts(options);
|
|
|
|
std::vector<DB*> dbs;
|
|
std::vector<std::vector<ColumnFamilyHandle *>> vec_handles;
|
|
|
|
for (int d = 0; d < kNumDBs; ++d) {
|
|
ASSERT_OK(DestroyDB(dbname_ + ToString(d), options));
|
|
DB* db;
|
|
std::vector<ColumnFamilyHandle*> handles;
|
|
ASSERT_OK(DB::Open(options, dbname_ + ToString(d), &db));
|
|
for (size_t c = 0; c < cf_names.size(); ++c) {
|
|
ColumnFamilyHandle* handle;
|
|
db->CreateColumnFamily(cf_opts, cf_names[c], &handle);
|
|
handles.push_back(handle);
|
|
}
|
|
|
|
vec_handles.push_back(std::move(handles));
|
|
dbs.push_back(db);
|
|
}
|
|
|
|
for (int d = 0; d < kNumDBs; ++d) {
|
|
for (size_t c = 0; c < cf_names.size(); ++c) {
|
|
ASSERT_OK(dbs[d]->Put(WriteOptions(), vec_handles[d][c],
|
|
cf_names[c], cf_names[c]));
|
|
}
|
|
}
|
|
|
|
for (size_t c = 0; c < cf_names.size(); ++c) {
|
|
for (int d = 0; d < kNumDBs; ++d) {
|
|
ASSERT_OK(dbs[d]->Flush(FlushOptions(), vec_handles[d][c]));
|
|
reinterpret_cast<DBImpl*>(dbs[d])->TEST_WaitForFlushMemTable();
|
|
}
|
|
}
|
|
|
|
for (auto* listener : listeners) {
|
|
int pos = 0;
|
|
for (size_t c = 0; c < cf_names.size(); ++c) {
|
|
for (int d = 0; d < kNumDBs; ++d) {
|
|
ASSERT_EQ(listener->flushed_dbs_[pos], dbs[d]);
|
|
ASSERT_EQ(listener->flushed_column_family_names_[pos], cf_names[c]);
|
|
pos++;
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
for (auto handles : vec_handles) {
|
|
for (auto h : handles) {
|
|
delete h;
|
|
}
|
|
handles.clear();
|
|
}
|
|
vec_handles.clear();
|
|
|
|
for (auto db : dbs) {
|
|
delete db;
|
|
}
|
|
}
|
|
|
|
TEST_F(EventListenerTest, DisableBGCompaction) {
|
|
Options options;
|
|
#if ROCKSDB_USING_THREAD_STATUS
|
|
options.enable_thread_tracking = true;
|
|
#endif // ROCKSDB_USING_THREAD_STATUS
|
|
TestFlushListener* listener = new TestFlushListener(options.env);
|
|
const int kCompactionTrigger = 1;
|
|
const int kSlowdownTrigger = 5;
|
|
const int kStopTrigger = 100;
|
|
options.level0_file_num_compaction_trigger = kCompactionTrigger;
|
|
options.level0_slowdown_writes_trigger = kSlowdownTrigger;
|
|
options.level0_stop_writes_trigger = kStopTrigger;
|
|
options.max_write_buffer_number = 10;
|
|
options.listeners.emplace_back(listener);
|
|
// BG compaction is disabled. Number of L0 files will simply keeps
|
|
// increasing in this test.
|
|
options.compaction_style = kCompactionStyleNone;
|
|
options.compression = kNoCompression;
|
|
options.write_buffer_size = 100000; // Small write buffer
|
|
options.table_properties_collector_factories.push_back(
|
|
std::make_shared<TestPropertiesCollectorFactory>());
|
|
|
|
CreateAndReopenWithCF({"pikachu"}, options);
|
|
ColumnFamilyMetaData cf_meta;
|
|
db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
|
|
|
|
// keep writing until writes are forced to stop.
|
|
for (int i = 0; static_cast<int>(cf_meta.file_count) < kSlowdownTrigger * 10;
|
|
++i) {
|
|
Put(1, ToString(i), std::string(10000, 'x'), WriteOptions());
|
|
db_->Flush(FlushOptions(), handles_[1]);
|
|
db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
|
|
}
|
|
ASSERT_GE(listener->slowdown_count, kSlowdownTrigger * 9);
|
|
}
|
|
|
|
} // namespace rocksdb
|
|
|
|
#endif // ROCKSDB_LITE
|
|
|
|
int main(int argc, char** argv) {
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|
|
|