Fix a bug by setting up subcompaction bounds properly (#10658)

Summary:
When user-defined timestamp is enabled, subcompaction bounds should be set up properly. When creating InputIterator for the compaction, the `start` and `end` should have their timestamp portions set to kMaxTimestamp, which is the highest possible timestamp. This is similar to what we do with setting up their sequence numbers to `kMaxSequenceNumber`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10658

Test Plan:
```bash
make check
rm -rf /dev/shm/rocksdb/* && mkdir
/dev/shm/rocksdb/rocksdb_crashtest_expected && ./db_stress
--allow_data_in_errors=True --clear_column_family_one_in=0
--continuous_verification_interval=0 --data_block_index_type=1
--db=/dev/shm/rocksdb//rocksdb_crashtest_blackbox --delpercent=5
--delrangepercent=0
--expected_values_dir=/dev/shm/rocksdb//rocksdb_crashtest_expected
--iterpercent=0 --max_background_compactions=20
--max_bytes_for_level_base=10485760 --max_key=25000000
--max_write_batch_group_size_bytes=1048576 --nooverwritepercent=1
--ops_per_thread=300000 --paranoid_file_checks=1 --partition_filters=0
--prefix_size=8 --prefixpercent=5 --readpercent=30 --reopen=0
--snapshot_hold_ops=100000 --subcompactions=4
--target_file_size_base=65536 --target_file_size_multiplier=2
--test_batches_snapshots=0 --test_cf_consistency=0 --use_multiget=1
--user_timestamp_size=8 --value_size_mult=32 --verify_checksum=1
--write_buffer_size=65536 --writepercent=60 -disable_wal=1
-column_families=1
```

Reviewed By: akankshamahajan15

Differential Revision: D39393402

Pulled By: riversand963

fbshipit-source-id: f276e35b19fce51a175c368a502fb0718d1f3871
main
Yanqin Jin 2 years ago committed by Facebook GitHub Bot
parent be04a3b6cd
commit ce2c11d848
  1. 31
      db/compaction/compaction_job.cc
  2. 444
      db/compaction/compaction_job_test.cc
  3. 8
      db/version_set.cc

@ -254,7 +254,7 @@ void CompactionJob::Prepare() {
// timestamp part).
assert(i == 0 || i == boundaries_.size() ||
cfd->user_comparator()->CompareWithoutTimestamp(
boundaries_[i - 1], true, boundaries_[i], true) < 0);
boundaries_[i - 1], boundaries_[i]) < 0);
}
RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
compact_->sub_compact_states.size());
@ -486,8 +486,8 @@ void CompactionJob::GenSubcompactionBoundaries() {
std::sort(
all_anchors.begin(), all_anchors.end(),
[cfd_comparator](TableReader::Anchor& a, TableReader::Anchor& b) -> bool {
return cfd_comparator->CompareWithoutTimestamp(a.user_key, true,
b.user_key, true) < 0;
return cfd_comparator->CompareWithoutTimestamp(a.user_key, b.user_key) <
0;
});
// Remove duplicated entries from boundaries.
@ -496,7 +496,7 @@ void CompactionJob::GenSubcompactionBoundaries() {
[cfd_comparator](TableReader::Anchor& a,
TableReader::Anchor& b) -> bool {
return cfd_comparator->CompareWithoutTimestamp(
a.user_key, true, b.user_key, true) == 0;
a.user_key, b.user_key) == 0;
}),
all_anchors.end());
@ -1085,13 +1085,34 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
Slice start_slice;
Slice end_slice;
static constexpr char kMaxTs[] =
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff";
Slice ts_slice;
std::string max_ts;
if (ts_sz > 0) {
if (ts_sz <= strlen(kMaxTs)) {
ts_slice = Slice(kMaxTs, ts_sz);
} else {
max_ts = std::string(ts_sz, '\xff');
ts_slice = Slice(max_ts);
}
}
if (start.has_value()) {
start_ikey.SetInternalKey(start.value(), kMaxSequenceNumber,
kValueTypeForSeek);
if (ts_sz > 0) {
start_ikey.UpdateInternalKey(kMaxSequenceNumber, kValueTypeForSeek,
&ts_slice);
}
start_slice = start_ikey.GetInternalKey();
}
if (end.has_value()) {
end_ikey.SetInternalKey(end.value(), kMaxSequenceNumber, kValueTypeForSeek);
if (ts_sz > 0) {
end_ikey.UpdateInternalKey(kMaxSequenceNumber, kValueTypeForSeek,
&ts_slice);
}
end_slice = end_ikey.GetInternalKey();
}
@ -1112,7 +1133,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
}
std::unique_ptr<InternalIterator> trim_history_iter;
if (cfd->user_comparator()->timestamp_size() > 0 && !trim_ts_.empty()) {
if (ts_sz > 0 && !trim_ts_.empty()) {
trim_history_iter = std::make_unique<HistoryTrimmingIterator>(
input, cfd->user_comparator(), trim_ts_);
input = trim_history_iter.get();

@ -19,7 +19,9 @@
#include "db/db_impl/db_impl.h"
#include "db/error_handler.h"
#include "db/version_set.h"
#include "file/random_access_file_reader.h"
#include "file/writable_file_writer.h"
#include "options/options_helper.h"
#include "rocksdb/cache.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
@ -194,13 +196,15 @@ class MockTestFileSystem : public FileSystemWrapper {
Env::IOPriority write_io_priority_;
};
enum TableTypeForTest : uint8_t { kMockTable = 0, kBlockBasedTable = 1 };
} // namespace
class CompactionJobTestBase : public testing::Test {
protected:
CompactionJobTestBase(std::string dbname, const Comparator* ucmp,
std::function<std::string(uint64_t)> encode_u64_ts,
bool test_io_priority)
bool test_io_priority, TableTypeForTest table_type)
: dbname_(std::move(dbname)),
ucmp_(ucmp),
db_options_(),
@ -217,7 +221,8 @@ class CompactionJobTestBase : public testing::Test {
mock_table_factory_(new mock::MockTableFactory()),
error_handler_(nullptr, db_options_, &mutex_),
encode_u64_ts_(std::move(encode_u64_ts)),
test_io_priority_(test_io_priority) {
test_io_priority_(test_io_priority),
table_type_(table_type) {
Env* base_env = Env::Default();
EXPECT_OK(
test::CreateEnvFromSystem(ConfigOptions(), &base_env, &env_guard_));
@ -232,11 +237,13 @@ class CompactionJobTestBase : public testing::Test {
db_options_.db_paths.emplace_back(dbname_,
std::numeric_limits<uint64_t>::max());
cf_options_.comparator = ucmp_;
if (test_io_priority_) {
if (table_type_ == TableTypeForTest::kBlockBasedTable) {
BlockBasedTableOptions table_options;
cf_options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
} else {
} else if (table_type_ == TableTypeForTest::kMockTable) {
cf_options_.table_factory = mock_table_factory_;
} else {
assert(false);
}
}
@ -358,13 +365,15 @@ class CompactionJobTestBase : public testing::Test {
uint64_t file_number = versions_->NewFileNumber();
uint64_t file_size;
if (test_io_priority_) {
uint64_t file_size = 0;
if (table_type_ == TableTypeForTest::kBlockBasedTable) {
CreateTable(GenerateFileName(file_number), contents, file_size);
} else {
} else if (table_type_ == TableTypeForTest::kMockTable) {
file_size = 10;
EXPECT_OK(mock_table_factory_->CreateMockTable(
env_, GenerateFileName(file_number), std::move(contents)));
} else {
assert(false);
}
VersionEdit edit;
@ -381,6 +390,83 @@ class CompactionJobTestBase : public testing::Test {
mutex_.Unlock();
}
void VerifyTables(int output_level,
const std::vector<mock::KVVector>& expected_results,
std::vector<uint64_t> expected_oldest_blob_file_numbers) {
if (expected_results.empty()) {
ASSERT_EQ(compaction_job_stats_.num_output_files, 0U);
return;
}
int expected_output_file_num = 0;
for (const auto& e : expected_results) {
if (!e.empty()) {
++expected_output_file_num;
}
}
ASSERT_EQ(expected_output_file_num, compaction_job_stats_.num_output_files);
if (expected_output_file_num == 0) {
return;
}
if (expected_oldest_blob_file_numbers.empty()) {
expected_oldest_blob_file_numbers.resize(expected_output_file_num,
kInvalidBlobFileNumber);
}
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
if (table_type_ == TableTypeForTest::kMockTable) {
assert(expected_results.size() == 1);
mock_table_factory_->AssertLatestFile(expected_results[0]);
} else {
assert(table_type_ == TableTypeForTest::kBlockBasedTable);
}
auto output_files =
cfd->current()->storage_info()->LevelFiles(output_level);
ASSERT_EQ(expected_output_file_num, output_files.size());
if (table_type_ == TableTypeForTest::kMockTable) {
assert(output_files.size() == 1);
const FileMetaData* const output_file = output_files[0];
ASSERT_EQ(output_file->oldest_blob_file_number,
expected_oldest_blob_file_numbers[0]);
return;
}
for (size_t i = 0; i < expected_results.size(); ++i) {
const FileMetaData* const output_file = output_files[i];
std::string file_name = GenerateFileName(output_file->fd.GetNumber());
const auto& fs = env_->GetFileSystem();
std::unique_ptr<RandomAccessFileReader> freader;
IOStatus ios = RandomAccessFileReader::Create(
fs, file_name, FileOptions(), &freader, nullptr);
ASSERT_OK(ios);
std::unique_ptr<TableReader> table_reader;
uint64_t file_size = output_file->fd.GetFileSize();
ReadOptions read_opts;
Status s = cf_options_.table_factory->NewTableReader(
read_opts,
TableReaderOptions(*cfd->ioptions(), nullptr, FileOptions(),
cfd_->internal_comparator()),
std::move(freader), file_size, &table_reader, false);
ASSERT_OK(s);
assert(table_reader);
std::unique_ptr<InternalIterator> iiter(
table_reader->NewIterator(read_opts, nullptr, nullptr, true,
TableReaderCaller::kUncategorized));
assert(iiter);
mock::KVVector from_db;
for (iiter->SeekToFirst(); iiter->Valid(); iiter->Next()) {
const Slice key = iiter->key();
const Slice value = iiter->value();
from_db.emplace_back(
make_pair(key.ToString(false), value.ToString(false)));
}
ASSERT_EQ(expected_results[i], from_db);
}
}
void SetLastSequence(const SequenceNumber sequence_number) {
versions_->SetLastAllocatedSequence(sequence_number + 1);
versions_->SetLastPublishedSequence(sequence_number + 1);
@ -439,6 +525,13 @@ class CompactionJobTestBase : public testing::Test {
void NewDB() {
EXPECT_OK(DestroyDB(dbname_, Options()));
EXPECT_OK(env_->CreateDirIfMissing(dbname_));
std::shared_ptr<Logger> info_log;
DBOptions db_opts = BuildDBOptions(db_options_, mutable_db_options_);
Status s = CreateLoggerFromOptions(dbname_, db_opts, &info_log);
ASSERT_OK(s);
db_options_.info_log = info_log;
versions_.reset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
@ -455,9 +548,9 @@ class CompactionJobTestBase : public testing::Test {
const std::string manifest = DescriptorFileName(dbname_, 1);
std::unique_ptr<WritableFileWriter> file_writer;
const auto& fs = env_->GetFileSystem();
Status s = WritableFileWriter::Create(
fs, manifest, fs->OptimizeForManifestWrite(env_options_), &file_writer,
nullptr);
s = WritableFileWriter::Create(fs, manifest,
fs->OptimizeForManifestWrite(env_options_),
&file_writer, nullptr);
ASSERT_OK(s);
{
@ -481,27 +574,32 @@ class CompactionJobTestBase : public testing::Test {
cfd_ = versions_->GetColumnFamilySet()->GetDefault();
}
// input_files[i] on input_levels[i]
void RunLastLevelCompaction(
const std::vector<std::vector<FileMetaData*>>& input_files,
const std::vector<int> input_levels,
std::function<void(Compaction& comp)>&& verify_func,
const std::vector<SequenceNumber>& snapshots = {}) {
const int kLastLevel = cf_options_.num_levels - 1;
verify_per_key_placement_ = std::move(verify_func);
mock::KVVector empty_map;
RunCompaction(input_files, empty_map, snapshots, kMaxSequenceNumber,
kLastLevel, false);
RunCompaction(input_files, input_levels, {empty_map}, snapshots,
kMaxSequenceNumber, kLastLevel, false);
}
// input_files[i] on input_levels[i]
void RunCompaction(
const std::vector<std::vector<FileMetaData*>>& input_files,
const mock::KVVector& expected_results,
const std::vector<int>& input_levels,
const std::vector<mock::KVVector>& expected_results,
const std::vector<SequenceNumber>& snapshots = {},
SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber,
int output_level = 1, bool verify = true,
uint64_t expected_oldest_blob_file_number = kInvalidBlobFileNumber,
std::vector<uint64_t> expected_oldest_blob_file_numbers = {},
bool check_get_priority = false,
Env::IOPriority read_io_priority = Env::IO_TOTAL,
Env::IOPriority write_io_priority = Env::IO_TOTAL) {
Env::IOPriority write_io_priority = Env::IO_TOTAL,
int max_subcompactions = 0) {
// For compaction, set fs as MockTestFileSystem to check the io_priority.
if (test_io_priority_) {
db_options_.fs.reset(
@ -512,10 +610,10 @@ class CompactionJobTestBase : public testing::Test {
size_t num_input_files = 0;
std::vector<CompactionInputFiles> compaction_input_files;
for (size_t level = 0; level < input_files.size(); level++) {
auto level_files = input_files[level];
for (size_t i = 0; i < input_files.size(); ++i) {
auto level_files = input_files[i];
CompactionInputFiles compaction_level;
compaction_level.level = static_cast<int>(level);
compaction_level.level = input_levels[i];
compaction_level.files.insert(compaction_level.files.end(),
level_files.begin(), level_files.end());
compaction_input_files.push_back(compaction_level);
@ -527,9 +625,10 @@ class CompactionJobTestBase : public testing::Test {
*cfd->GetLatestMutableCFOptions(), mutable_db_options_,
compaction_input_files, output_level, 1024 * 1024, 10 * 1024 * 1024, 0,
kNoCompression, cfd->GetLatestMutableCFOptions()->compression_opts,
Temperature::kUnknown, 0, {}, true);
Temperature::kUnknown, max_subcompactions, {}, true);
compaction.SetInputVersion(cfd->current());
assert(db_options_.info_log);
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
mutex_.Lock();
EventLogger event_logger(db_options_.info_log.get());
@ -559,23 +658,14 @@ class CompactionJobTestBase : public testing::Test {
ASSERT_OK(compaction_job.Install(*cfd->GetLatestMutableCFOptions()));
ASSERT_OK(compaction_job.io_status());
mutex_.Unlock();
log_buffer.FlushBufferToLog();
if (verify) {
ASSERT_GE(compaction_job_stats_.elapsed_micros, 0U);
ASSERT_EQ(compaction_job_stats_.num_input_files, num_input_files);
if (expected_results.empty()) {
ASSERT_EQ(compaction_job_stats_.num_output_files, 0U);
} else {
ASSERT_EQ(compaction_job_stats_.num_output_files, 1U);
mock_table_factory_->AssertLatestFile(expected_results);
auto output_files =
cfd->current()->storage_info()->LevelFiles(output_level);
ASSERT_EQ(output_files.size(), 1);
ASSERT_EQ(output_files[0]->oldest_blob_file_number,
expected_oldest_blob_file_number);
}
VerifyTables(output_level, expected_results,
expected_oldest_blob_file_numbers);
}
if (check_get_priority) {
@ -635,8 +725,9 @@ class CompactionJobTestBase : public testing::Test {
ErrorHandler error_handler_;
std::string full_history_ts_low_;
const std::function<std::string(uint64_t)> encode_u64_ts_;
bool test_io_priority_;
const bool test_io_priority_;
std::function<void(Compaction& comp)> verify_per_key_placement_;
const TableTypeForTest table_type_ = kMockTable;
};
// TODO(icanadi) Make it simpler once we mock out VersionSet
@ -645,7 +736,8 @@ class CompactionJobTest : public CompactionJobTestBase {
CompactionJobTest()
: CompactionJobTestBase(
test::PerThreadDBPath("compaction_job_test"), BytewiseComparator(),
[](uint64_t /*ts*/) { return ""; }, false) {}
[](uint64_t /*ts*/) { return ""; }, /*test_io_priority=*/false,
TableTypeForTest::kMockTable) {}
};
TEST_F(CompactionJobTest, Simple) {
@ -653,9 +745,10 @@ TEST_F(CompactionJobTest, Simple) {
auto expected_results = CreateTwoFiles(false);
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
auto files = cfd->current()->storage_info()->LevelFiles(0);
constexpr int input_level = 0;
auto files = cfd->current()->storage_info()->LevelFiles(input_level);
ASSERT_EQ(2U, files.size());
RunCompaction({ files }, expected_results);
RunCompaction({files}, {input_level}, {expected_results});
}
TEST_F(CompactionJobTest, DISABLED_SimpleCorrupted) {
@ -663,8 +756,9 @@ TEST_F(CompactionJobTest, DISABLED_SimpleCorrupted) {
auto expected_results = CreateTwoFiles(true);
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
auto files = cfd->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results);
constexpr int input_level = 0;
auto files = cfd->current()->storage_info()->LevelFiles(input_level);
RunCompaction({files}, {input_level}, {expected_results});
ASSERT_EQ(compaction_job_stats_.num_corrupt_keys, 400U);
}
@ -683,8 +777,9 @@ TEST_F(CompactionJobTest, SimpleDeletion) {
mock::MakeMockFile({{KeyStr("b", 0U, kTypeValue), "val"}});
SetLastSequence(4U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results);
constexpr int input_level = 0;
auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
RunCompaction({files}, {input_level}, {expected_results});
}
TEST_F(CompactionJobTest, OutputNothing) {
@ -701,8 +796,10 @@ TEST_F(CompactionJobTest, OutputNothing) {
auto expected_results = mock::MakeMockFile();
SetLastSequence(4U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results);
constexpr int input_level = 0;
auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
RunCompaction({files}, {input_level}, {expected_results});
}
TEST_F(CompactionJobTest, SimpleOverwrite) {
@ -723,8 +820,9 @@ TEST_F(CompactionJobTest, SimpleOverwrite) {
{KeyStr("b", 0U, kTypeValue), "val3"}});
SetLastSequence(4U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results);
constexpr int input_level = 0;
auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
RunCompaction({files}, {input_level}, {expected_results});
}
TEST_F(CompactionJobTest, SimpleNonLastLevel) {
@ -751,9 +849,12 @@ TEST_F(CompactionJobTest, SimpleNonLastLevel) {
{KeyStr("b", 6U, kTypeValue), "val3"}});
SetLastSequence(6U);
auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0);
auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1);
RunCompaction({lvl0_files, lvl1_files}, expected_results);
const std::vector<int> input_levels = {0, 1};
auto lvl0_files =
cfd_->current()->storage_info()->LevelFiles(input_levels[0]);
auto lvl1_files =
cfd_->current()->storage_info()->LevelFiles(input_levels[1]);
RunCompaction({lvl0_files, lvl1_files}, input_levels, {expected_results});
}
TEST_F(CompactionJobTest, SimpleMerge) {
@ -776,8 +877,9 @@ TEST_F(CompactionJobTest, SimpleMerge) {
{KeyStr("b", 0U, kTypeValue), "1,2"}});
SetLastSequence(5U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results);
constexpr int input_level = 0;
auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
RunCompaction({files}, {input_level}, {expected_results});
}
TEST_F(CompactionJobTest, NonAssocMerge) {
@ -800,8 +902,9 @@ TEST_F(CompactionJobTest, NonAssocMerge) {
{KeyStr("b", 0U, kTypeValue), "1,2"}});
SetLastSequence(5U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results);
constexpr int input_level = 0;
auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
RunCompaction({files}, {input_level}, {expected_results});
}
// Filters merge operands with value 10.
@ -827,8 +930,9 @@ TEST_F(CompactionJobTest, MergeOperandFilter) {
{KeyStr("b", 0U, kTypeValue), test::EncodeInt(2U)}});
SetLastSequence(5U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results);
constexpr int input_level = 0;
auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
RunCompaction({files}, {input_level}, {expected_results});
}
TEST_F(CompactionJobTest, FilterSomeMergeOperands) {
@ -863,8 +967,9 @@ TEST_F(CompactionJobTest, FilterSomeMergeOperands) {
});
SetLastSequence(5U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results);
constexpr int input_level = 0;
auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
RunCompaction({files}, {input_level}, {expected_results});
}
// Test where all operands/merge results are filtered out.
@ -897,10 +1002,11 @@ TEST_F(CompactionJobTest, FilterAllMergeOperands) {
AddMockFile(file3, 2);
SetLastSequence(11U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
constexpr int input_level = 0;
auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
mock::KVVector empty_map;
RunCompaction({files}, empty_map);
RunCompaction({files}, {input_level}, {empty_map});
}
TEST_F(CompactionJobTest, SimpleSingleDelete) {
@ -925,8 +1031,9 @@ TEST_F(CompactionJobTest, SimpleSingleDelete) {
mock::MakeMockFile({{KeyStr("a", 5U, kTypeDeletion), ""}});
SetLastSequence(6U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results);
constexpr int input_level = 0;
auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
RunCompaction({files}, {input_level}, {expected_results});
}
TEST_F(CompactionJobTest, SingleDeleteSnapshots) {
@ -990,8 +1097,9 @@ TEST_F(CompactionJobTest, SingleDeleteSnapshots) {
});
SetLastSequence(22U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results, {10U, 20U}, 10U);
constexpr int input_level = 0;
auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
RunCompaction({files}, {input_level}, {expected_results}, {10U, 20U}, 10U);
}
TEST_F(CompactionJobTest, EarliestWriteConflictSnapshot) {
@ -1068,8 +1176,10 @@ TEST_F(CompactionJobTest, EarliestWriteConflictSnapshot) {
});
SetLastSequence(24U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results, {10U, 20U, 30U}, 20U);
constexpr int input_level = 0;
auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
RunCompaction({files}, {input_level}, {expected_results}, {10U, 20U, 30U},
20U);
}
TEST_F(CompactionJobTest, SingleDeleteZeroSeq) {
@ -1091,8 +1201,9 @@ TEST_F(CompactionJobTest, SingleDeleteZeroSeq) {
});
SetLastSequence(22U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results, {});
constexpr int input_level = 0;
auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
RunCompaction({files}, {input_level}, {expected_results}, {});
}
TEST_F(CompactionJobTest, MultiSingleDelete) {
@ -1246,8 +1357,9 @@ TEST_F(CompactionJobTest, MultiSingleDelete) {
{KeyStr("M", 3U, kTypeSingleDeletion), ""}});
SetLastSequence(22U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results, {10U}, 10U);
constexpr int input_level = 0;
auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
RunCompaction({files}, {input_level}, {expected_results}, {10U}, 10U);
}
// This test documents the behavior where a corrupt key follows a deletion or a
@ -1276,8 +1388,9 @@ TEST_F(CompactionJobTest, DISABLED_CorruptionAfterDeletion) {
{test::KeyStr("c", 0U, kTypeValue), "val2"}});
SetLastSequence(6U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results);
constexpr int input_level = 0;
auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
RunCompaction({files}, {input_level}, {expected_results});
}
TEST_F(CompactionJobTest, OldestBlobFileNumber) {
@ -1322,10 +1435,12 @@ TEST_F(CompactionJobTest, OldestBlobFileNumber) {
expected_blob4, expected_blob5, expected_blob6});
SetLastSequence(6U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results, std::vector<SequenceNumber>(),
kMaxSequenceNumber, /* output_level */ 1, /* verify */ true,
/* expected_oldest_blob_file_number */ 19);
constexpr int input_level = 0;
auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
RunCompaction({files}, {input_level}, {expected_results},
std::vector<SequenceNumber>(), kMaxSequenceNumber,
/* output_level */ 1, /* verify */ true,
/* expected_oldest_blob_file_numbers */ {19});
}
TEST_F(CompactionJobTest, VerifyPenultimateLevelOutput) {
@ -1377,13 +1492,15 @@ TEST_F(CompactionJobTest, VerifyPenultimateLevelOutput) {
AddMockFile(file3_2, 3);
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
auto files0 = cfd->current()->storage_info()->LevelFiles(0);
auto files1 = cfd->current()->storage_info()->LevelFiles(1);
auto files2 = cfd->current()->storage_info()->LevelFiles(2);
auto files3 = cfd->current()->storage_info()->LevelFiles(3);
const std::vector<int> input_levels = {0, 1, 2, 3};
auto files0 = cfd->current()->storage_info()->LevelFiles(input_levels[0]);
auto files1 = cfd->current()->storage_info()->LevelFiles(input_levels[1]);
auto files2 = cfd->current()->storage_info()->LevelFiles(input_levels[2]);
auto files3 = cfd->current()->storage_info()->LevelFiles(input_levels[3]);
RunLastLevelCompaction(
{files0, files1, files2, files3}, /*verify_func=*/[&](Compaction& comp) {
{files0, files1, files2, files3}, input_levels,
/*verify_func=*/[&](Compaction& comp) {
for (char c = 'a'; c <= 'z'; c++) {
std::string c_str;
c_str = c;
@ -1408,8 +1525,9 @@ TEST_F(CompactionJobTest, NoEnforceSingleDeleteContract) {
SetLastSequence(4U);
auto expected_results = mock::MakeMockFile();
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results);
constexpr int input_level = 0;
auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
RunCompaction({files}, {input_level}, {expected_results});
}
TEST_F(CompactionJobTest, InputSerialization) {
@ -1609,7 +1727,8 @@ class CompactionJobTimestampTest : public CompactionJobTestBase {
CompactionJobTimestampTest()
: CompactionJobTestBase(test::PerThreadDBPath("compaction_job_ts_test"),
test::BytewiseComparatorWithU64TsWrapper(),
test::EncodeInt, false) {}
test::EncodeInt, /*test_io_priority=*/false,
TableTypeForTest::kMockTable) {}
};
TEST_F(CompactionJobTimestampTest, GCDisabled) {
@ -1641,8 +1760,9 @@ TEST_F(CompactionJobTimestampTest, GCDisabled) {
{KeyStr("c", 4, ValueType::kTypeValue, 94), "c5"},
{KeyStr("d", 7, ValueType::kTypeValue, 97), "d7"},
{KeyStr("d", 3, ValueType::kTypeSingleDeletion, 93), ""}});
const auto& files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results);
constexpr int input_level = 0;
const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
RunCompaction({files}, {input_level}, {expected_results});
}
TEST_F(CompactionJobTimestampTest, NoKeyExpired) {
@ -1667,10 +1787,11 @@ TEST_F(CompactionJobTimestampTest, NoKeyExpired) {
{KeyStr("b", 7, ValueType::kTypeValue, 101), "b7"},
{KeyStr("c", 5, ValueType::kTypeValue, 99), "c5"},
{KeyStr("c", 3, ValueType::kTypeValue, 97), "c3"}});
const auto& files = cfd_->current()->storage_info()->LevelFiles(0);
constexpr int input_level = 0;
const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
full_history_ts_low_ = encode_u64_ts_(0);
RunCompaction({files}, expected_results);
RunCompaction({files}, {input_level}, {expected_results});
}
TEST_F(CompactionJobTimestampTest, AllKeysExpired) {
@ -1693,10 +1814,11 @@ TEST_F(CompactionJobTimestampTest, AllKeysExpired) {
auto expected_results =
mock::MakeMockFile({{KeyStr("c", 0, ValueType::kTypeValue, 0), "c7"}});
const auto& files = cfd_->current()->storage_info()->LevelFiles(0);
constexpr int input_level = 0;
const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
full_history_ts_low_ = encode_u64_ts_(std::numeric_limits<uint64_t>::max());
RunCompaction({files}, expected_results);
RunCompaction({files}, {input_level}, {expected_results});
}
TEST_F(CompactionJobTimestampTest, SomeKeysExpired) {
@ -1719,10 +1841,121 @@ TEST_F(CompactionJobTimestampTest, SomeKeysExpired) {
mock::MakeMockFile({{KeyStr("a", 5, ValueType::kTypeValue, 50), "a5"},
{KeyStr("a", 0, ValueType::kTypeValue, 0), "a3"},
{KeyStr("b", 6, ValueType::kTypeValue, 49), "b6"}});
const auto& files = cfd_->current()->storage_info()->LevelFiles(0);
constexpr int input_level = 0;
const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
full_history_ts_low_ = encode_u64_ts_(49);
RunCompaction({files}, expected_results);
RunCompaction({files}, {input_level}, {expected_results});
}
class CompactionJobTimestampTestWithBbTable : public CompactionJobTestBase {
public:
// Block-based table is needed if we want to test subcompaction partitioning
// with anchors.
explicit CompactionJobTimestampTestWithBbTable()
: CompactionJobTestBase(
test::PerThreadDBPath("compaction_job_ts_bbt_test"),
test::BytewiseComparatorWithU64TsWrapper(), test::EncodeInt,
/*test_io_priority=*/false, TableTypeForTest::kBlockBasedTable) {}
};
TEST_F(CompactionJobTimestampTestWithBbTable, SubcompactionAnchorL1) {
cf_options_.target_file_size_base = 20;
mutable_cf_options_.target_file_size_base = 20;
NewDB();
const std::vector<std::string> keys = {
KeyStr("a", 20, ValueType::kTypeValue, 200),
KeyStr("b", 21, ValueType::kTypeValue, 210),
KeyStr("b", 20, ValueType::kTypeValue, 200),
KeyStr("b", 18, ValueType::kTypeValue, 180),
KeyStr("c", 17, ValueType::kTypeValue, 170),
KeyStr("c", 16, ValueType::kTypeValue, 160),
KeyStr("c", 15, ValueType::kTypeValue, 150)};
const std::vector<std::string> values = {"a20", "b21", "b20", "b18",
"c17", "c16", "c15"};
constexpr int input_level = 1;
auto file1 = mock::MakeMockFile(
{{keys[0], values[0]}, {keys[1], values[1]}, {keys[2], values[2]}});
AddMockFile(file1, input_level);
auto file2 = mock::MakeMockFile(
{{keys[3], values[3]}, {keys[4], values[4]}, {keys[5], values[5]}});
AddMockFile(file2, input_level);
auto file3 = mock::MakeMockFile({{keys[6], values[6]}});
AddMockFile(file3, input_level);
SetLastSequence(20);
auto output1 = mock::MakeMockFile({{keys[0], values[0]}});
auto output2 = mock::MakeMockFile(
{{keys[1], values[1]}, {keys[2], values[2]}, {keys[3], values[3]}});
auto output3 = mock::MakeMockFile(
{{keys[4], values[4]}, {keys[5], values[5]}, {keys[6], values[6]}});
auto expected_results =
std::vector<mock::KVVector>{output1, output2, output3};
const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
constexpr int output_level = 2;
constexpr int max_subcompactions = 4;
RunCompaction({files}, {input_level}, expected_results, /*snapshots=*/{},
/*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
output_level, /*verify=*/true, {kInvalidBlobFileNumber},
/*check_get_priority=*/false, Env::IO_TOTAL, Env::IO_TOTAL,
max_subcompactions);
}
TEST_F(CompactionJobTimestampTestWithBbTable, SubcompactionL0) {
cf_options_.target_file_size_base = 20;
mutable_cf_options_.target_file_size_base = 20;
NewDB();
const std::vector<std::string> keys = {
KeyStr("a", 20, ValueType::kTypeValue, 200),
KeyStr("b", 20, ValueType::kTypeValue, 200),
KeyStr("b", 19, ValueType::kTypeValue, 190),
KeyStr("b", 18, ValueType::kTypeValue, 180),
KeyStr("c", 17, ValueType::kTypeValue, 170),
KeyStr("c", 16, ValueType::kTypeValue, 160),
KeyStr("c", 15, ValueType::kTypeValue, 150)};
const std::vector<std::string> values = {"a20", "b20", "b19", "b18",
"c17", "c16", "c15"};
constexpr int input_level = 0;
auto file1 = mock::MakeMockFile({{keys[5], values[5]}, {keys[6], values[6]}});
AddMockFile(file1, input_level);
auto file2 = mock::MakeMockFile({{keys[3], values[3]}, {keys[4], values[4]}});
AddMockFile(file2, input_level);
auto file3 = mock::MakeMockFile(
{{keys[0], values[0]}, {keys[1], values[1]}, {keys[2], values[2]}});
AddMockFile(file3, input_level);
SetLastSequence(20);
auto output1 = mock::MakeMockFile({{keys[0], values[0]}});
auto output2 = mock::MakeMockFile(
{{keys[1], values[1]}, {keys[2], values[2]}, {keys[3], values[3]}});
auto output3 = mock::MakeMockFile(
{{keys[4], values[4]}, {keys[5], values[5]}, {keys[6], values[6]}});
auto expected_results =
std::vector<mock::KVVector>{output1, output2, output3};
const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
constexpr int output_level = 1;
constexpr int max_subcompactions = 4;
RunCompaction({files}, {input_level}, expected_results, /*snapshots=*/{},
/*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
output_level, /*verify=*/true, {kInvalidBlobFileNumber},
/*check_get_priority=*/false, Env::IO_TOTAL, Env::IO_TOTAL,
max_subcompactions);
}
// The io priority of the compaction reads and writes are different from
@ -1734,7 +1967,8 @@ class CompactionJobIOPriorityTest : public CompactionJobTestBase {
CompactionJobIOPriorityTest()
: CompactionJobTestBase(
test::PerThreadDBPath("compaction_job_io_priority_test"),
BytewiseComparator(), [](uint64_t /*ts*/) { return ""; }, true) {}
BytewiseComparator(), [](uint64_t /*ts*/) { return ""; },
/*test_io_priority=*/true, TableTypeForTest::kBlockBasedTable) {}
};
TEST_F(CompactionJobIOPriorityTest, WriteControllerStateNormal) {
@ -1742,10 +1976,12 @@ TEST_F(CompactionJobIOPriorityTest, WriteControllerStateNormal) {
NewDB();
mock::KVVector expected_results = CreateTwoFiles(false);
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
auto files = cfd->current()->storage_info()->LevelFiles(0);
constexpr int input_level = 0;
auto files = cfd->current()->storage_info()->LevelFiles(input_level);
ASSERT_EQ(2U, files.size());
RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false,
kInvalidBlobFileNumber, false, Env::IO_LOW, Env::IO_LOW);
RunCompaction({files}, {input_level}, {expected_results}, {},
kMaxSequenceNumber, 1, false, {kInvalidBlobFileNumber}, false,
Env::IO_LOW, Env::IO_LOW);
}
TEST_F(CompactionJobIOPriorityTest, WriteControllerStateDelayed) {
@ -1753,13 +1989,15 @@ TEST_F(CompactionJobIOPriorityTest, WriteControllerStateDelayed) {
NewDB();
mock::KVVector expected_results = CreateTwoFiles(false);
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
auto files = cfd->current()->storage_info()->LevelFiles(0);
constexpr int input_level = 0;
auto files = cfd->current()->storage_info()->LevelFiles(input_level);
ASSERT_EQ(2U, files.size());
{
std::unique_ptr<WriteControllerToken> delay_token =
write_controller_.GetDelayToken(1000000);
RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false,
kInvalidBlobFileNumber, false, Env::IO_USER, Env::IO_USER);
RunCompaction({files}, {input_level}, {expected_results}, {},
kMaxSequenceNumber, 1, false, {kInvalidBlobFileNumber}, false,
Env::IO_USER, Env::IO_USER);
}
}
@ -1768,13 +2006,15 @@ TEST_F(CompactionJobIOPriorityTest, WriteControllerStateStalled) {
NewDB();
mock::KVVector expected_results = CreateTwoFiles(false);
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
auto files = cfd->current()->storage_info()->LevelFiles(0);
constexpr int input_level = 0;
auto files = cfd->current()->storage_info()->LevelFiles(input_level);
ASSERT_EQ(2U, files.size());
{
std::unique_ptr<WriteControllerToken> stop_token =
write_controller_.GetStopToken();
RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false,
kInvalidBlobFileNumber, false, Env::IO_USER, Env::IO_USER);
RunCompaction({files}, {input_level}, {expected_results}, {},
kMaxSequenceNumber, 1, false, {kInvalidBlobFileNumber}, false,
Env::IO_USER, Env::IO_USER);
}
}
@ -1782,10 +2022,12 @@ TEST_F(CompactionJobIOPriorityTest, GetRateLimiterPriority) {
NewDB();
mock::KVVector expected_results = CreateTwoFiles(false);
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
auto files = cfd->current()->storage_info()->LevelFiles(0);
constexpr int input_level = 0;
auto files = cfd->current()->storage_info()->LevelFiles(input_level);
ASSERT_EQ(2U, files.size());
RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false,
kInvalidBlobFileNumber, true, Env::IO_LOW, Env::IO_LOW);
RunCompaction({files}, {input_level}, {expected_results}, {},
kMaxSequenceNumber, 1, false, {kInvalidBlobFileNumber}, true,
Env::IO_LOW, Env::IO_LOW);
}
} // namespace ROCKSDB_NAMESPACE

@ -6433,16 +6433,16 @@ InternalIterator* VersionSet::MakeInputIterator(
for (size_t i = 0; i < flevel->num_files; i++) {
const FileMetaData& fmd = *flevel->files[i].file_metadata;
if (start.has_value() &&
cfd->user_comparator()->Compare(start.value(),
fmd.largest.user_key()) > 0) {
cfd->user_comparator()->CompareWithoutTimestamp(
start.value(), fmd.largest.user_key()) > 0) {
continue;
}
// We should be able to filter out the case where the end key
// equals to the end boundary, since the end key is exclusive.
// We try to be extra safe here.
if (end.has_value() &&
cfd->user_comparator()->Compare(end.value(),
fmd.smallest.user_key()) < 0) {
cfd->user_comparator()->CompareWithoutTimestamp(
end.value(), fmd.smallest.user_key()) < 0) {
continue;
}

Loading…
Cancel
Save