Include EventListener in stress test.

Summary: Include EventListener in stress test.

Test Plan: make blackbox_crash_test whitebox_crash_test

Reviewers: anthony, igor, rven, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D39105
main
Yueh-Hsuan Chiang 9 years ago
parent a3da590226
commit 9ffc8ba024
  1. 7
      include/rocksdb/listener.h
  2. 2
      include/rocksdb/options.h
  3. 112
      tools/db_stress.cc
  4. 12
      util/options.cc

@ -104,4 +104,11 @@ class EventListener {
} // namespace rocksdb
#else
namespace rocksdb {
class EventListener {
};
}
#endif // ROCKSDB_LITE

@ -1018,11 +1018,9 @@ struct DBOptions {
// Default: 0, turned off
uint64_t wal_bytes_per_sync;
#ifndef ROCKSDB_LITE
// A vector of EventListeners which call-back functions will be called
// when specific RocksDB event happens.
std::vector<std::shared_ptr<EventListener>> listeners;
#endif // ROCKSDB_LITE
// If true, then the status of the threads involved in this DB will
// be tracked and available via GetThreadList() API.

@ -30,10 +30,13 @@ int main() {
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <chrono>
#include <exception>
#include <thread>
#include <gflags/gflags.h>
#include "db/db_impl.h"
#include "db/version_set.h"
@ -777,6 +780,110 @@ struct ThreadState {
: tid(index), rand(1000 + index + _shared->GetSeed()), shared(_shared) {}
};
class DbStressListener : public EventListener {
public:
DbStressListener(
const std::string& db_name,
const std::vector<DbPath>& db_paths,
const std::vector<ColumnFamilyDescriptor>& cf_descs) :
db_name_(db_name),
db_paths_(db_paths),
cf_descs_(cf_descs),
rand_(301) {}
virtual ~DbStressListener() {}
#ifndef ROCKSDB_LITE
virtual void OnFlushCompleted(
DB* db, const std::string& column_family_name,
const std::string& file_path,
bool triggered_writes_slowdown,
bool triggered_writes_stop) override {
assert(db);
assert(db->GetName() == db_name_);
assert(IsValidColumnFamilyName(column_family_name));
VerifyFilePath(file_path);
// pretending doing some work here
std::this_thread::sleep_for(
std::chrono::microseconds(rand_.Uniform(5000)));
}
virtual void OnCompactionCompleted(
DB *db, const CompactionJobInfo& ci) {
assert(db);
assert(db->GetName() == db_name_);
assert(IsValidColumnFamilyName(ci.cf_name));
assert(ci.input_files.size() + ci.output_files.size() > 0U);
for (const auto& file_path : ci.input_files) {
VerifyFilePath(file_path);
}
for (const auto& file_path : ci.output_files) {
VerifyFilePath(file_path);
}
// pretending doing some work here
std::this_thread::sleep_for(
std::chrono::microseconds(rand_.Uniform(5000)));
}
protected:
bool IsValidColumnFamilyName(const std::string& cf_name) const {
if (cf_name == kDefaultColumnFamilyName) {
return true;
}
for (const auto& cf_desc : cf_descs_) {
if (cf_desc.name == cf_name) {
return true;
}
}
fprintf(stderr,
"Unable to find the matched column family name "
"for CF: %s. Existing CF names are:\n",
cf_name.c_str());
for (const auto& cf_desc : cf_descs_) {
fprintf(stderr, " %s\n", cf_desc.name.c_str());
}
fflush(stderr);
return false;
}
void VerifyFileDir(const std::string& file_dir) {
if (db_name_ == file_dir) {
return;
}
for (const auto& db_path : db_paths_) {
if (db_path.path == file_dir) {
return;
}
}
assert(false);
}
void VerifyFileName(const std::string& file_name) {
uint64_t file_number;
FileType file_type;
bool result = ParseFileName(file_name, &file_number, &file_type);
assert(result);
assert(file_type == kTableFile);
}
void VerifyFilePath(const std::string& file_path) {
size_t pos = file_path.find_last_of("/");
if (pos == std::string::npos) {
VerifyFileName(file_path);
} else {
if (pos > 0) {
VerifyFileDir(file_path.substr(0, pos));
}
VerifyFileName(file_path.substr(pos));
}
}
#endif // !ROCKSDB_LITE
private:
std::string db_name_;
std::vector<DbPath> db_paths_;
std::vector<ColumnFamilyDescriptor> cf_descs_;
Random rand_;
};
} // namespace
class StressTest {
@ -1913,6 +2020,9 @@ class StressTest {
cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
column_family_names_.push_back(name);
}
options_.listeners.clear();
options_.listeners.emplace_back(
new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors));
options_.create_missing_column_families = true;
s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
&column_families_, &db_);

@ -70,15 +70,9 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options)
options.level_compaction_dynamic_level_bytes),
access_hint_on_compaction_start(options.access_hint_on_compaction_start),
num_levels(options.num_levels),
optimize_filters_for_hits(options.optimize_filters_for_hits)
#ifndef ROCKSDB_LITE
,
optimize_filters_for_hits(options.optimize_filters_for_hits),
listeners(options.listeners) {
}
#else // ROCKSDB_LITE
{
}
#endif // ROCKSDB_LITE
ColumnFamilyOptions::ColumnFamilyOptions()
: comparator(BytewiseComparator()),
@ -247,9 +241,7 @@ DBOptions::DBOptions()
use_adaptive_mutex(false),
bytes_per_sync(0),
wal_bytes_per_sync(0),
#ifndef ROCKSDB_LITE
listeners(),
#endif
enable_thread_tracking(false) {
}
@ -294,9 +286,7 @@ DBOptions::DBOptions(const Options& options)
use_adaptive_mutex(options.use_adaptive_mutex),
bytes_per_sync(options.bytes_per_sync),
wal_bytes_per_sync(options.wal_bytes_per_sync),
#ifndef ROCKSDB_LITE
listeners(options.listeners),
#endif
enable_thread_tracking(options.enable_thread_tracking) {}
static const char* const access_hints[] = {

Loading…
Cancel
Save