Add unit test to verify that the dynamic priority can be passed from compaction to FS (#10088)

Summary:
**Summary:**
Add unit tests to verify that the dynamic priority can be passed from compaction to FS. Compaction reads&writes and other DB reads&writes share the same read&write paths to FSRandomAccessFile or FSWritableFile, so a MockTestFileSystem is added to replace the default filesystem from Env to intercept and verify the io_priority. To prepare the compaction input files, use the default filesystem from Env. To test the io priority of the compaction reads and writes, db_options_.fs is set as MockTestFileSystem.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10088

Test Plan: Add unit tests.

Reviewed By: anand1976

Differential Revision: D36882528

Pulled By: gitbw95

fbshipit-source-id: 120adc15801966f2b8c9fc45285f590a3fff96d1
main
gitbw95 3 years ago committed by Facebook GitHub Bot
parent b6de139df5
commit 5cbee1f609
  1. 270
      db/compaction/compaction_job_test.cc

@ -69,12 +69,138 @@ void VerifyInitializationOfCompactionJobStats(
#endif // !defined(IOS_CROSS_COMPILE) #endif // !defined(IOS_CROSS_COMPILE)
} }
// Mock FSWritableFile for testing io priority.
// Only override the essential functions for testing compaction io priority.
class MockTestWritableFile : public FSWritableFileOwnerWrapper {
public:
MockTestWritableFile(std::unique_ptr<FSWritableFile>&& file,
Env::IOPriority io_priority)
: FSWritableFileOwnerWrapper(std::move(file)),
write_io_priority_(io_priority) {}
IOStatus Append(const Slice& data, const IOOptions& options,
IODebugContext* dbg) override {
EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
return target()->Append(data, options, dbg);
}
IOStatus Append(const Slice& data, const IOOptions& options,
const DataVerificationInfo& verification_info,
IODebugContext* dbg) override {
EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
return target()->Append(data, options, verification_info, dbg);
}
IOStatus Close(const IOOptions& options, IODebugContext* dbg) override {
EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
return target()->Close(options, dbg);
}
IOStatus Flush(const IOOptions& options, IODebugContext* dbg) override {
EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
return target()->Flush(options, dbg);
}
IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override {
EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
return target()->Sync(options, dbg);
}
IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override {
EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
return target()->Fsync(options, dbg);
}
uint64_t GetFileSize(const IOOptions& options, IODebugContext* dbg) override {
EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
return target()->GetFileSize(options, dbg);
}
IOStatus RangeSync(uint64_t offset, uint64_t nbytes, const IOOptions& options,
IODebugContext* dbg) override {
EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
return target()->RangeSync(offset, nbytes, options, dbg);
}
void PrepareWrite(size_t offset, size_t len, const IOOptions& options,
IODebugContext* dbg) override {
EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
target()->PrepareWrite(offset, len, options, dbg);
}
IOStatus Allocate(uint64_t offset, uint64_t len, const IOOptions& options,
IODebugContext* dbg) override {
EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
return target()->Allocate(offset, len, options, dbg);
}
private:
Env::IOPriority write_io_priority_;
};
// Mock FSRandomAccessFile for testing io priority.
// Only override the essential functions for testing compaction io priority.
class MockTestRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
public:
MockTestRandomAccessFile(std::unique_ptr<FSRandomAccessFile>&& file,
Env::IOPriority io_priority)
: FSRandomAccessFileOwnerWrapper(std::move(file)),
read_io_priority_(io_priority) {}
IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) const override {
EXPECT_EQ(options.rate_limiter_priority, read_io_priority_);
return target()->Read(offset, n, options, result, scratch, dbg);
}
IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options,
IODebugContext* dbg) override {
EXPECT_EQ(options.rate_limiter_priority, read_io_priority_);
return target()->Prefetch(offset, n, options, dbg);
}
private:
Env::IOPriority read_io_priority_;
};
// Mock FileSystem for testing io priority.
class MockTestFileSystem : public FileSystemWrapper {
public:
explicit MockTestFileSystem(const std::shared_ptr<FileSystem>& base,
Env::IOPriority read_io_priority,
Env::IOPriority write_io_priority)
: FileSystemWrapper(base),
read_io_priority_(read_io_priority),
write_io_priority_(write_io_priority) {}
static const char* kClassName() { return "MockTestFileSystem"; }
const char* Name() const override { return kClassName(); }
IOStatus NewRandomAccessFile(const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<FSRandomAccessFile>* result,
IODebugContext* dbg) override {
IOStatus s = target()->NewRandomAccessFile(fname, file_opts, result, dbg);
EXPECT_OK(s);
result->reset(
new MockTestRandomAccessFile(std::move(*result), read_io_priority_));
return s;
}
IOStatus NewWritableFile(const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* result,
IODebugContext* dbg) override {
IOStatus s = target()->NewWritableFile(fname, file_opts, result, dbg);
EXPECT_OK(s);
result->reset(
new MockTestWritableFile(std::move(*result), write_io_priority_));
return s;
}
private:
Env::IOPriority read_io_priority_;
Env::IOPriority write_io_priority_;
};
} // namespace } // namespace
class CompactionJobTestBase : public testing::Test { class CompactionJobTestBase : public testing::Test {
protected: protected:
CompactionJobTestBase(std::string dbname, const Comparator* ucmp, CompactionJobTestBase(std::string dbname, const Comparator* ucmp,
std::function<std::string(uint64_t)> encode_u64_ts) std::function<std::string(uint64_t)> encode_u64_ts,
bool test_io_priority)
: dbname_(std::move(dbname)), : dbname_(std::move(dbname)),
ucmp_(ucmp), ucmp_(ucmp),
db_options_(), db_options_(),
@ -90,7 +216,8 @@ class CompactionJobTestBase : public testing::Test {
shutting_down_(false), shutting_down_(false),
mock_table_factory_(new mock::MockTableFactory()), mock_table_factory_(new mock::MockTableFactory()),
error_handler_(nullptr, db_options_, &mutex_), error_handler_(nullptr, db_options_, &mutex_),
encode_u64_ts_(std::move(encode_u64_ts)) { encode_u64_ts_(std::move(encode_u64_ts)),
test_io_priority_(test_io_priority) {
Env* base_env = Env::Default(); Env* base_env = Env::Default();
EXPECT_OK( EXPECT_OK(
test::CreateEnvFromSystem(ConfigOptions(), &base_env, &env_guard_)); test::CreateEnvFromSystem(ConfigOptions(), &base_env, &env_guard_));
@ -105,7 +232,12 @@ class CompactionJobTestBase : public testing::Test {
db_options_.db_paths.emplace_back(dbname_, db_options_.db_paths.emplace_back(dbname_,
std::numeric_limits<uint64_t>::max()); std::numeric_limits<uint64_t>::max());
cf_options_.comparator = ucmp_; cf_options_.comparator = ucmp_;
cf_options_.table_factory = mock_table_factory_; if (test_io_priority_) {
BlockBasedTableOptions table_options;
cf_options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
} else {
cf_options_.table_factory = mock_table_factory_;
}
} }
std::string GenerateFileName(uint64_t file_number) { std::string GenerateFileName(uint64_t file_number) {
@ -145,6 +277,33 @@ class CompactionJobTestBase : public testing::Test {
return blob_index; return blob_index;
} }
// Creates a table with the specificied key value pairs.
void CreateTable(const std::string& table_name,
const mock::KVVector& contents, uint64_t& file_size) {
std::unique_ptr<WritableFileWriter> file_writer;
Status s = WritableFileWriter::Create(fs_, table_name, FileOptions(),
&file_writer, nullptr);
ASSERT_OK(s);
std::unique_ptr<TableBuilder> table_builder(
cf_options_.table_factory->NewTableBuilder(
TableBuilderOptions(*cfd_->ioptions(), mutable_cf_options_,
cfd_->internal_comparator(),
cfd_->int_tbl_prop_collector_factories(),
CompressionType::kNoCompression,
CompressionOptions(), 0 /* column_family_id */,
kDefaultColumnFamilyName, -1 /* level */),
file_writer.get()));
// Build table.
for (auto kv : contents) {
std::string key;
std::string value;
std::tie(key, value) = kv;
table_builder->Add(key, value);
}
ASSERT_OK(table_builder->Finish());
file_size = table_builder->FileSize();
}
void AddMockFile(const mock::KVVector& contents, int level = 0) { void AddMockFile(const mock::KVVector& contents, int level = 0) {
assert(contents.size() > 0); assert(contents.size() > 0);
@ -198,11 +357,18 @@ class CompactionJobTestBase : public testing::Test {
} }
uint64_t file_number = versions_->NewFileNumber(); uint64_t file_number = versions_->NewFileNumber();
EXPECT_OK(mock_table_factory_->CreateMockTable(
env_, GenerateFileName(file_number), std::move(contents))); uint64_t file_size;
if (test_io_priority_) {
CreateTable(GenerateFileName(file_number), contents, file_size);
} else {
file_size = 10;
EXPECT_OK(mock_table_factory_->CreateMockTable(
env_, GenerateFileName(file_number), std::move(contents)));
}
VersionEdit edit; VersionEdit edit;
edit.AddFile(level, file_number, 0, 10, smallest_key, largest_key, edit.AddFile(level, file_number, 0, file_size, smallest_key, largest_key,
smallest_seqno, largest_seqno, false, Temperature::kUnknown, smallest_seqno, largest_seqno, false, Temperature::kUnknown,
oldest_blob_file_number, kUnknownOldestAncesterTime, oldest_blob_file_number, kUnknownOldestAncesterTime,
kUnknownFileCreationTime, kUnknownFileChecksum, kUnknownFileCreationTime, kUnknownFileChecksum,
@ -323,7 +489,15 @@ class CompactionJobTestBase : public testing::Test {
SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber, SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber,
int output_level = 1, bool verify = true, int output_level = 1, bool verify = true,
uint64_t expected_oldest_blob_file_number = kInvalidBlobFileNumber, uint64_t expected_oldest_blob_file_number = kInvalidBlobFileNumber,
bool check_get_priority = false) { bool check_get_priority = false,
Env::IOPriority read_io_priority = Env::IO_TOTAL,
Env::IOPriority write_io_priority = Env::IO_TOTAL) {
// For compaction, set fs as MockTestFileSystem to check the io_priority.
if (test_io_priority_) {
db_options_.fs.reset(
new MockTestFileSystem(fs_, read_io_priority, write_io_priority));
}
auto cfd = versions_->GetColumnFamilySet()->GetDefault(); auto cfd = versions_->GetColumnFamilySet()->GetDefault();
size_t num_input_files = 0; size_t num_input_files = 0;
@ -445,15 +619,16 @@ class CompactionJobTestBase : public testing::Test {
ErrorHandler error_handler_; ErrorHandler error_handler_;
std::string full_history_ts_low_; std::string full_history_ts_low_;
const std::function<std::string(uint64_t)> encode_u64_ts_; const std::function<std::string(uint64_t)> encode_u64_ts_;
bool test_io_priority_;
}; };
// TODO(icanadi) Make it simpler once we mock out VersionSet // TODO(icanadi) Make it simpler once we mock out VersionSet
class CompactionJobTest : public CompactionJobTestBase { class CompactionJobTest : public CompactionJobTestBase {
public: public:
CompactionJobTest() CompactionJobTest()
: CompactionJobTestBase(test::PerThreadDBPath("compaction_job_test"), : CompactionJobTestBase(
BytewiseComparator(), test::PerThreadDBPath("compaction_job_test"), BytewiseComparator(),
[](uint64_t /*ts*/) { return ""; }) {} [](uint64_t /*ts*/) { return ""; }, false) {}
}; };
TEST_F(CompactionJobTest, Simple) { TEST_F(CompactionJobTest, Simple) {
@ -1343,23 +1518,13 @@ TEST_F(CompactionJobTest, ResultSerialization) {
} }
} }
TEST_F(CompactionJobTest, GetRateLimiterPriority) {
NewDB();
auto expected_results = CreateTwoFiles(false);
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
auto files = cfd->current()->storage_info()->LevelFiles(0);
ASSERT_EQ(2U, files.size());
RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, true,
kInvalidBlobFileNumber, true);
}
class CompactionJobTimestampTest : public CompactionJobTestBase { class CompactionJobTimestampTest : public CompactionJobTestBase {
public: public:
CompactionJobTimestampTest() CompactionJobTimestampTest()
: CompactionJobTestBase(test::PerThreadDBPath("compaction_job_ts_test"), : CompactionJobTestBase(test::PerThreadDBPath("compaction_job_ts_test"),
test::BytewiseComparatorWithU64TsWrapper(), test::BytewiseComparatorWithU64TsWrapper(),
test::EncodeInt) {} test::EncodeInt, false) {}
}; };
TEST_F(CompactionJobTimestampTest, GCDisabled) { TEST_F(CompactionJobTimestampTest, GCDisabled) {
@ -1475,6 +1640,69 @@ TEST_F(CompactionJobTimestampTest, SomeKeysExpired) {
RunCompaction({files}, expected_results); RunCompaction({files}, expected_results);
} }
// The io priority of the compaction reads and writes are different from
// other DB reads and writes. To prepare the compaction input files, use the
// default filesystem from Env. To test the io priority of the compaction
// reads and writes, db_options_.fs is set as MockTestFileSystem.
class CompactionJobIOPriorityTest : public CompactionJobTestBase {
public:
CompactionJobIOPriorityTest()
: CompactionJobTestBase(
test::PerThreadDBPath("compaction_job_io_priority_test"),
BytewiseComparator(), [](uint64_t /*ts*/) { return ""; }, true) {}
};
TEST_F(CompactionJobIOPriorityTest, WriteControllerStateNormal) {
// When the state from WriteController is normal.
NewDB();
mock::KVVector expected_results = CreateTwoFiles(false);
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
auto files = cfd->current()->storage_info()->LevelFiles(0);
ASSERT_EQ(2U, files.size());
RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false,
kInvalidBlobFileNumber, false, Env::IO_LOW, Env::IO_LOW);
}
TEST_F(CompactionJobIOPriorityTest, WriteControllerStateDelayed) {
// When the state from WriteController is Delayed.
NewDB();
mock::KVVector expected_results = CreateTwoFiles(false);
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
auto files = cfd->current()->storage_info()->LevelFiles(0);
ASSERT_EQ(2U, files.size());
{
std::unique_ptr<WriteControllerToken> delay_token =
write_controller_.GetDelayToken(1000000);
RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false,
kInvalidBlobFileNumber, false, Env::IO_USER, Env::IO_USER);
}
}
TEST_F(CompactionJobIOPriorityTest, WriteControllerStateStalled) {
// When the state from WriteController is Stalled.
NewDB();
mock::KVVector expected_results = CreateTwoFiles(false);
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
auto files = cfd->current()->storage_info()->LevelFiles(0);
ASSERT_EQ(2U, files.size());
{
std::unique_ptr<WriteControllerToken> stop_token =
write_controller_.GetStopToken();
RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false,
kInvalidBlobFileNumber, false, Env::IO_USER, Env::IO_USER);
}
}
TEST_F(CompactionJobIOPriorityTest, GetRateLimiterPriority) {
NewDB();
mock::KVVector expected_results = CreateTwoFiles(false);
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
auto files = cfd->current()->storage_info()->LevelFiles(0);
ASSERT_EQ(2U, files.size());
RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false,
kInvalidBlobFileNumber, true, Env::IO_LOW, Env::IO_LOW);
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {

Loading…
Cancel
Save