Fix overlapping check by excluding timestamp (#10615)

Summary:
When user-defined timestamps are enabled, key-range overlap checks should
exclude the timestamp portion of the key. This has already been done for
range checking of files in sstableKeyCompare(), but not yet when checking
against concurrent compactions.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10615

Test Plan:
(Will add more tests)

make check
(Repro seems easier with this commit sha: git checkout 78bbdef530)
rm -rf /dev/shm/rocksdb/* &&
mkdir /dev/shm/rocksdb/rocksdb_crashtest_expected &&
./db_stress
--allow_data_in_errors=True --clear_column_family_one_in=0
--continuous_verification_interval=0 --data_block_index_type=1
--db=/dev/shm/rocksdb//rocksdb_crashtest_blackbox --delpercent=5
--delrangepercent=0
--expected_values_dir=/dev/shm/rocksdb//rocksdb_crashtest_expected
--iterpercent=0 --max_background_compactions=20
--max_bytes_for_level_base=10485760 --max_key=25000000
--max_write_batch_group_size_bytes=1048576 --nooverwritepercent=1
--ops_per_thread=1000000 --paranoid_file_checks=1 --partition_filters=0
--prefix_size=8 --prefixpercent=5 --readpercent=30 --reopen=0
--snapshot_hold_ops=100000 --subcompactions=1 --compaction_pri=3
--target_file_size_base=65536 --target_file_size_multiplier=2
--test_batches_snapshots=0 --test_cf_consistency=0 --use_multiget=1
--user_timestamp_size=8 --value_size_mult=32 --verify_checksum=1
--write_buffer_size=65536 --writepercent=60 -disable_wal=1

Reviewed By: akankshamahajan15

Differential Revision: D39146797

Pulled By: riversand963

fbshipit-source-id: 7fca800026ca6219220100b8b6cf84d907828163
main
Yanqin Jin 2 years ago committed by Facebook GitHub Bot
parent fe56cb9aa0
commit 3d67d79154
  1. 1
      HISTORY.md
  2. 51
      db/compaction/compaction_picker.cc
  3. 7
      db/compaction/compaction_picker_level.cc
  4. 178
      db/compaction/compaction_picker_test.cc
  5. 10
      db/compaction/compaction_picker_universal.cc
  6. 155
      db/db_with_timestamp_compaction_test.cc

@ -7,6 +7,7 @@
* Fixed a bug in the rocksdb.prefetched.bytes.discarded stat. It was counting the prefetch buffer size, rather than the actual number of bytes discarded from the buffer. * Fixed a bug in the rocksdb.prefetched.bytes.discarded stat. It was counting the prefetch buffer size, rather than the actual number of bytes discarded from the buffer.
* Fix bug where the directory containing CURRENT can be left unsynced after CURRENT is updated to point to the latest MANIFEST, which leads to risk of unsync data loss of CURRENT. * Fix bug where the directory containing CURRENT can be left unsynced after CURRENT is updated to point to the latest MANIFEST, which leads to risk of unsync data loss of CURRENT.
* Update rocksdb.multiget.io.batch.size stat in non-async MultiGet as well. * Update rocksdb.multiget.io.batch.size stat in non-async MultiGet as well.
* Fix a bug in key range overlap checking with concurrent compactions when user-defined timestamp is enabled. User-defined timestamps should be EXCLUDED when checking if two ranges overlap.
### Public API changes ### Public API changes
* Add `rocksdb_column_family_handle_get_id`, `rocksdb_column_family_handle_get_name` to get name, id of column family in C API * Add `rocksdb_column_family_handle_get_id`, `rocksdb_column_family_handle_get_name` to get name, id of column family in C API

@ -288,8 +288,10 @@ bool CompactionPicker::RangeOverlapWithCompaction(
const Comparator* ucmp = icmp_->user_comparator(); const Comparator* ucmp = icmp_->user_comparator();
for (Compaction* c : compactions_in_progress_) { for (Compaction* c : compactions_in_progress_) {
if (c->output_level() == level && if (c->output_level() == level &&
ucmp->Compare(smallest_user_key, c->GetLargestUserKey()) <= 0 && ucmp->CompareWithoutTimestamp(smallest_user_key,
ucmp->Compare(largest_user_key, c->GetSmallestUserKey()) >= 0) { c->GetLargestUserKey()) <= 0 &&
ucmp->CompareWithoutTimestamp(largest_user_key,
c->GetSmallestUserKey()) >= 0) {
// Overlap // Overlap
return true; return true;
} }
@ -878,21 +880,21 @@ namespace {
// Test whether two files have overlapping key-ranges. // Test whether two files have overlapping key-ranges.
bool HaveOverlappingKeyRanges(const Comparator* c, const SstFileMetaData& a, bool HaveOverlappingKeyRanges(const Comparator* c, const SstFileMetaData& a,
const SstFileMetaData& b) { const SstFileMetaData& b) {
if (c->Compare(a.smallestkey, b.smallestkey) >= 0) { if (c->CompareWithoutTimestamp(a.smallestkey, b.smallestkey) >= 0) {
if (c->Compare(a.smallestkey, b.largestkey) <= 0) { if (c->CompareWithoutTimestamp(a.smallestkey, b.largestkey) <= 0) {
// b.smallestkey <= a.smallestkey <= b.largestkey // b.smallestkey <= a.smallestkey <= b.largestkey
return true; return true;
} }
} else if (c->Compare(a.largestkey, b.smallestkey) >= 0) { } else if (c->CompareWithoutTimestamp(a.largestkey, b.smallestkey) >= 0) {
// a.smallestkey < b.smallestkey <= a.largestkey // a.smallestkey < b.smallestkey <= a.largestkey
return true; return true;
} }
if (c->Compare(a.largestkey, b.largestkey) <= 0) { if (c->CompareWithoutTimestamp(a.largestkey, b.largestkey) <= 0) {
if (c->Compare(a.largestkey, b.smallestkey) >= 0) { if (c->CompareWithoutTimestamp(a.largestkey, b.smallestkey) >= 0) {
// b.smallestkey <= a.largestkey <= b.largestkey // b.smallestkey <= a.largestkey <= b.largestkey
return true; return true;
} }
} else if (c->Compare(a.smallestkey, b.largestkey) <= 0) { } else if (c->CompareWithoutTimestamp(a.smallestkey, b.largestkey) <= 0) {
// a.smallestkey <= b.largestkey < a.largestkey // a.smallestkey <= b.largestkey < a.largestkey
return true; return true;
} }
@ -931,8 +933,10 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
// identify the first and the last compaction input files // identify the first and the last compaction input files
// in the current level. // in the current level.
for (size_t f = 0; f < current_files.size(); ++f) { for (size_t f = 0; f < current_files.size(); ++f) {
if (input_files->find(TableFileNameToNumber(current_files[f].name)) != const uint64_t file_number = TableFileNameToNumber(current_files[f].name);
input_files->end()) { if (input_files->find(file_number) == input_files->end()) {
continue;
}
first_included = std::min(first_included, static_cast<int>(f)); first_included = std::min(first_included, static_cast<int>(f));
last_included = std::max(last_included, static_cast<int>(f)); last_included = std::max(last_included, static_cast<int>(f));
if (is_first == false) { if (is_first == false) {
@ -941,26 +945,26 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
is_first = true; is_first = true;
} }
} }
}
if (last_included == kNotFound) { if (last_included == kNotFound) {
continue; continue;
} }
if (l != 0) { if (l != 0) {
// expend the compaction input of the current level if it // expand the compaction input of the current level if it
// has overlapping key-range with other non-compaction input // has overlapping key-range with other non-compaction input
// files in the same level. // files in the same level.
while (first_included > 0) { while (first_included > 0) {
if (comparator->Compare(current_files[first_included - 1].largestkey, if (comparator->CompareWithoutTimestamp(
current_files[first_included].smallestkey) < current_files[first_included - 1].largestkey,
0) { current_files[first_included].smallestkey) < 0) {
break; break;
} }
first_included--; first_included--;
} }
while (last_included < static_cast<int>(current_files.size()) - 1) { while (last_included < static_cast<int>(current_files.size()) - 1) {
if (comparator->Compare(current_files[last_included + 1].smallestkey, if (comparator->CompareWithoutTimestamp(
current_files[last_included + 1].smallestkey,
current_files[last_included].largestkey) > 0) { current_files[last_included].largestkey) > 0) {
break; break;
} }
@ -983,21 +987,22 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
// update smallest and largest key // update smallest and largest key
if (l == 0) { if (l == 0) {
for (int f = first_included; f <= last_included; ++f) { for (int f = first_included; f <= last_included; ++f) {
if (comparator->Compare(smallestkey, current_files[f].smallestkey) > if (comparator->CompareWithoutTimestamp(
0) { smallestkey, current_files[f].smallestkey) > 0) {
smallestkey = current_files[f].smallestkey; smallestkey = current_files[f].smallestkey;
} }
if (comparator->Compare(largestkey, current_files[f].largestkey) < 0) { if (comparator->CompareWithoutTimestamp(
largestkey, current_files[f].largestkey) < 0) {
largestkey = current_files[f].largestkey; largestkey = current_files[f].largestkey;
} }
} }
} else { } else {
if (comparator->Compare(smallestkey, if (comparator->CompareWithoutTimestamp(
current_files[first_included].smallestkey) > 0) { smallestkey, current_files[first_included].smallestkey) > 0) {
smallestkey = current_files[first_included].smallestkey; smallestkey = current_files[first_included].smallestkey;
} }
if (comparator->Compare(largestkey, if (comparator->CompareWithoutTimestamp(
current_files[last_included].largestkey) < 0) { largestkey, current_files[last_included].largestkey) < 0) {
largestkey = current_files[last_included].largestkey; largestkey = current_files[last_included].largestkey;
} }
} }

@ -662,9 +662,14 @@ bool LevelCompactionBuilder::TryExtendNonL0TrivialMove(int start_index) {
break; break;
} }
if (i < static_cast<int>(level_files.size()) - 1 && if (i < static_cast<int>(level_files.size()) - 1 &&
compaction_picker_->icmp()->user_comparator()->Compare( compaction_picker_->icmp()
->user_comparator()
->CompareWithoutTimestamp(
next_file->largest.user_key(), next_file->largest.user_key(),
level_files[i + 1]->smallest.user_key()) == 0) { level_files[i + 1]->smallest.user_key()) == 0) {
TEST_SYNC_POINT_CALLBACK(
"LevelCompactionBuilder::TryExtendNonL0TrivialMove:NoCleanCut",
nullptr);
// Not a clean cut after adding the next file. Skip. // Not a clean cut after adding the next file. Skip.
break; break;
} }

@ -27,7 +27,7 @@ class CountingLogger : public Logger {
size_t log_count; size_t log_count;
}; };
class CompactionPickerTest : public testing::Test { class CompactionPickerTestBase : public testing::Test {
public: public:
const Comparator* ucmp_; const Comparator* ucmp_;
InternalKeyComparator icmp_; InternalKeyComparator icmp_;
@ -49,9 +49,10 @@ class CompactionPickerTest : public testing::Test {
std::vector<CompactionInputFiles> input_files_; std::vector<CompactionInputFiles> input_files_;
int compaction_level_start_; int compaction_level_start_;
CompactionPickerTest() explicit CompactionPickerTestBase(const Comparator* _ucmp)
: ucmp_(BytewiseComparator()), : ucmp_(_ucmp),
icmp_(ucmp_), icmp_(ucmp_),
options_(CreateOptions(ucmp_)),
ioptions_(options_), ioptions_(options_),
mutable_cf_options_(options_), mutable_cf_options_(options_),
mutable_db_options_(), mutable_db_options_(),
@ -71,7 +72,7 @@ class CompactionPickerTest : public testing::Test {
std::numeric_limits<uint64_t>::max()); std::numeric_limits<uint64_t>::max());
} }
~CompactionPickerTest() override {} ~CompactionPickerTestBase() override {}
void NewVersionStorage(int num_levels, CompactionStyle style) { void NewVersionStorage(int num_levels, CompactionStyle style) {
DeleteVersionStorage(); DeleteVersionStorage();
@ -97,12 +98,17 @@ class CompactionPickerTest : public testing::Test {
input_files_.clear(); input_files_.clear();
} }
// REQUIRES: smallest and largest are c-style strings ending with '\0'
void Add(int level, uint32_t file_number, const char* smallest, void Add(int level, uint32_t file_number, const char* smallest,
const char* largest, uint64_t file_size = 1, uint32_t path_id = 0, const char* largest, uint64_t file_size = 1, uint32_t path_id = 0,
SequenceNumber smallest_seq = 100, SequenceNumber largest_seq = 100, SequenceNumber smallest_seq = 100, SequenceNumber largest_seq = 100,
size_t compensated_file_size = 0, bool marked_for_compact = false, size_t compensated_file_size = 0, bool marked_for_compact = false,
Temperature temperature = Temperature::kUnknown, Temperature temperature = Temperature::kUnknown,
uint64_t oldest_ancestor_time = kUnknownOldestAncesterTime) { uint64_t oldest_ancestor_time = kUnknownOldestAncesterTime,
Slice ts_of_smallest = Slice(), Slice ts_of_largest = Slice()) {
assert(ts_of_smallest.size() == ucmp_->timestamp_size());
assert(ts_of_largest.size() == ucmp_->timestamp_size());
VersionStorageInfo* vstorage; VersionStorageInfo* vstorage;
if (temp_vstorage_) { if (temp_vstorage_) {
vstorage = temp_vstorage_.get(); vstorage = temp_vstorage_.get();
@ -110,19 +116,46 @@ class CompactionPickerTest : public testing::Test {
vstorage = vstorage_.get(); vstorage = vstorage_.get();
} }
assert(level < vstorage->num_levels()); assert(level < vstorage->num_levels());
char* smallest_key_buf = nullptr;
char* largest_key_buf = nullptr;
if (!ts_of_smallest.empty()) {
smallest_key_buf = new char[strlen(smallest) + ucmp_->timestamp_size()];
memcpy(smallest_key_buf, smallest, strlen(smallest));
memcpy(smallest_key_buf + strlen(smallest), ts_of_smallest.data(),
ucmp_->timestamp_size());
largest_key_buf = new char[strlen(largest) + ucmp_->timestamp_size()];
memcpy(largest_key_buf, largest, strlen(largest));
memcpy(largest_key_buf + strlen(largest), ts_of_largest.data(),
ucmp_->timestamp_size());
}
InternalKey smallest_ikey = InternalKey(
smallest_key_buf ? Slice(smallest_key_buf,
ucmp_->timestamp_size() + strlen(smallest))
: smallest,
smallest_seq, kTypeValue);
InternalKey largest_ikey = InternalKey(
largest_key_buf
? Slice(largest_key_buf, ucmp_->timestamp_size() + strlen(largest))
: largest,
largest_seq, kTypeValue);
FileMetaData* f = new FileMetaData( FileMetaData* f = new FileMetaData(
file_number, path_id, file_size, file_number, path_id, file_size, smallest_ikey, largest_ikey,
InternalKey(smallest, smallest_seq, kTypeValue), smallest_seq, largest_seq, marked_for_compact, temperature,
InternalKey(largest, largest_seq, kTypeValue), smallest_seq, kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
largest_seq, marked_for_compact, temperature, kInvalidBlobFileNumber, kUnknownFileCreationTime, kUnknownFileChecksum,
kUnknownOldestAncesterTime, kUnknownFileCreationTime, kUnknownFileChecksumFuncName, kNullUniqueId64x2);
kUnknownFileChecksum, kUnknownFileChecksumFuncName, kNullUniqueId64x2);
f->compensated_file_size = f->compensated_file_size =
(compensated_file_size != 0) ? compensated_file_size : file_size; (compensated_file_size != 0) ? compensated_file_size : file_size;
f->oldest_ancester_time = oldest_ancestor_time; f->oldest_ancester_time = oldest_ancestor_time;
vstorage->AddFile(level, f); vstorage->AddFile(level, f);
files_.emplace_back(f); files_.emplace_back(f);
file_map_.insert({file_number, {f, level}}); file_map_.insert({file_number, {f, level}});
delete[] smallest_key_buf;
delete[] largest_key_buf;
} }
void SetCompactionInputFilesLevels(int level_count, int start_level) { void SetCompactionInputFilesLevels(int level_count, int start_level) {
@ -155,9 +188,31 @@ class CompactionPickerTest : public testing::Test {
} }
private: private:
// Returns a default-constructed Options whose comparator is set to `ucmp`.
Options CreateOptions(const Comparator* ucmp) const {
  Options result;
  result.comparator = ucmp;
  return result;
}
std::unique_ptr<VersionStorageInfo> temp_vstorage_; std::unique_ptr<VersionStorageInfo> temp_vstorage_;
}; };
// Test fixture that runs the shared compaction-picker setup with
// BytewiseComparator() as the user comparator.
class CompactionPickerTest : public CompactionPickerTestBase {
 public:
  explicit CompactionPickerTest()
      : CompactionPickerTestBase(BytewiseComparator()) {}

  ~CompactionPickerTest() override = default;
};
// Test fixture that runs the shared compaction-picker setup with
// test::BytewiseComparatorWithU64TsWrapper() as the user comparator.
class CompactionPickerU64TsTest : public CompactionPickerTestBase {
 public:
  explicit CompactionPickerU64TsTest()
      : CompactionPickerTestBase(test::BytewiseComparatorWithU64TsWrapper()) {
  }

  ~CompactionPickerU64TsTest() override = default;
};
TEST_F(CompactionPickerTest, Empty) { TEST_F(CompactionPickerTest, Empty) {
NewVersionStorage(6, kCompactionStyleLevel); NewVersionStorage(6, kCompactionStyleLevel);
UpdateVersionStorageInfo(); UpdateVersionStorageInfo();
@ -3317,6 +3372,107 @@ TEST_F(CompactionPickerTest, UniversalSizeAmpTierCompactionLastLevel) {
ASSERT_EQ(compaction->input_levels(6)->num_files, 0); ASSERT_EQ(compaction->input_levels(6)->num_files, 0);
} }
// Verifies that RangeOverlapWithCompaction() ignores the timestamp suffix of
// user keys: a range that touches an in-progress compaction's boundary user
// key must be reported as overlapping even when the timestamps differ.
TEST_F(CompactionPickerU64TsTest, Overlap) {
  int num_levels = ioptions_.num_levels;
  NewVersionStorage(num_levels, kCompactionStyleLevel);

  constexpr int level = 0;
  constexpr uint64_t file_number = 20ULL;
  constexpr char smallest[4] = "500";
  constexpr char largest[4] = "600";
  constexpr uint64_t ts_of_smallest = 12345ULL;
  constexpr uint64_t ts_of_largest = 56789ULL;

  {
    // Register a single file covering ["500"@ts_of_smallest,
    // "600"@ts_of_largest] at `level`.
    std::string ts1;
    PutFixed64(&ts1, ts_of_smallest);
    std::string ts2;
    PutFixed64(&ts2, ts_of_largest);
    Add(level, file_number, smallest, largest,
        /*file_size=*/1U, /*path_id=*/0,
        /*smallest_seq=*/100, /*largest_seq=*/100, /*compensated_file_size=*/0,
        /*marked_for_compact=*/false, /*temperature=*/Temperature::kUnknown,
        /*oldest_ancestor_time=*/kUnknownOldestAncesterTime, ts1, ts2);
    UpdateVersionStorageInfo();
  }

  std::unordered_set<uint64_t> input{file_number};

  std::vector<CompactionInputFiles> input_files;
  ASSERT_OK(level_compaction_picker.GetCompactionInputsFromFileNumbers(
      &input_files, &input, vstorage_.get(), CompactionOptions()));
  // Keep the compaction alive for the rest of the test; the overlap checks
  // below are made against it.
  std::unique_ptr<Compaction> comp1(level_compaction_picker.CompactFiles(
      CompactionOptions(), input_files, level, vstorage_.get(),
      mutable_cf_options_, mutable_db_options_, /*output_path_id=*/0));
  ASSERT_TRUE(comp1 != nullptr);

  {
    // [600, ts=ts_of_largest - 1] to [600, ts=ts_of_largest - 1] is the range
    // to check.
    // ucmp->Compare(smallest_user_key, c->GetLargestUserKey()) > 0, but
    // ucmp->CompareWithoutTimestamp(smallest_user_key,
    //                               c->GetLargestUserKey()) == 0.
    // Should still be considered overlapping.
    std::string user_key_with_ts1(largest);
    PutFixed64(&user_key_with_ts1, ts_of_largest - 1);
    std::string user_key_with_ts2(largest);
    PutFixed64(&user_key_with_ts2, ts_of_largest - 1);
    ASSERT_TRUE(level_compaction_picker.RangeOverlapWithCompaction(
        user_key_with_ts1, user_key_with_ts2, level));
  }
  {
    // [500, ts=ts_of_smallest + 1] to [500, ts=ts_of_smallest + 1] is the
    // range to check.
    // ucmp->Compare(largest_user_key, c->GetSmallestUserKey()) < 0, but
    // ucmp->CompareWithoutTimestamp(largest_user_key,
    //                               c->GetSmallestUserKey()) == 0.
    // Should still be considered overlapping.
    std::string user_key_with_ts1(smallest);
    PutFixed64(&user_key_with_ts1, ts_of_smallest + 1);
    std::string user_key_with_ts2(smallest);
    PutFixed64(&user_key_with_ts2, ts_of_smallest + 1);
    ASSERT_TRUE(level_compaction_picker.RangeOverlapWithCompaction(
        user_key_with_ts1, user_key_with_ts2, level));
  }
}
// Two files at different levels hold the same user key "150" with different
// timestamps. The test expects the picked universal compaction not to be a
// trivial move, even though allow_trivial_move is enabled.
TEST_F(CompactionPickerU64TsTest, CannotTrivialMoveUniversal) {
  constexpr uint64_t kFileSize = 100000;

  mutable_cf_options_.level0_file_num_compaction_trigger = 2;
  mutable_cf_options_.compaction_options_universal.allow_trivial_move = true;
  NewVersionStorage(1, kCompactionStyleUniversal);
  UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
  UpdateVersionStorageInfo();
  // must return false when there's no files.
  ASSERT_FALSE(universal_compaction_picker.NeedsCompaction(vstorage_.get()));

  std::string ts1;
  PutFixed64(&ts1, 9000);
  std::string ts2;
  PutFixed64(&ts2, 8000);
  std::string ts3;
  PutFixed64(&ts3, 7000);
  std::string ts4;
  PutFixed64(&ts4, 6000);

  NewVersionStorage(3, kCompactionStyleUniversal);
  // A compaction should be triggered and pick file 2
  Add(1, 1U, "150", "150", kFileSize, /*path_id=*/0, /*smallest_seq=*/100,
      /*largest_seq=*/100, /*compensated_file_size=*/kFileSize,
      /*marked_for_compact=*/false, Temperature::kUnknown,
      kUnknownOldestAncesterTime, ts1, ts2);
  Add(2, 2U, "150", "150", kFileSize, /*path_id=*/0, /*smallest_seq=*/100,
      /*largest_seq=*/100, /*compensated_file_size=*/kFileSize,
      /*marked_for_compact=*/false, Temperature::kUnknown,
      kUnknownOldestAncesterTime, ts3, ts4);
  UpdateVersionStorageInfo();

  std::unique_ptr<Compaction> compaction(
      universal_compaction_picker.PickCompaction(
          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
          &log_buffer_));
  // Use a gtest assertion rather than assert(): assert() is compiled out
  // under NDEBUG, which would let a null compaction be dereferenced below.
  ASSERT_TRUE(compaction != nullptr);
  ASSERT_FALSE(compaction->is_trivial_move());
}
class PerKeyPlacementCompactionPickerTest class PerKeyPlacementCompactionPickerTest
: public CompactionPickerTest, : public CompactionPickerTest,
public testing::WithParamInterface<bool> { public testing::WithParamInterface<bool> {

@ -164,7 +164,7 @@ struct SmallestKeyHeapComparator {
explicit SmallestKeyHeapComparator(const Comparator* ucmp) { ucmp_ = ucmp; } explicit SmallestKeyHeapComparator(const Comparator* ucmp) { ucmp_ = ucmp; }
bool operator()(InputFileInfo i1, InputFileInfo i2) const { bool operator()(InputFileInfo i1, InputFileInfo i2) const {
return (ucmp_->Compare(i1.f->smallest.user_key(), return (ucmp_->CompareWithoutTimestamp(i1.f->smallest.user_key(),
i2.f->smallest.user_key()) > 0); i2.f->smallest.user_key()) > 0);
} }
@ -249,13 +249,13 @@ bool UniversalCompactionBuilder::IsInputFilesNonOverlapping(Compaction* c) {
prev = curr; prev = curr;
first_iter = 0; first_iter = 0;
} else { } else {
if (comparator->Compare(prev.f->largest.user_key(), if (comparator->CompareWithoutTimestamp(
curr.f->smallest.user_key()) >= 0) { prev.f->largest.user_key(), curr.f->smallest.user_key()) >= 0) {
// found overlapping files, return false // found overlapping files, return false
return false; return false;
} }
assert(comparator->Compare(curr.f->largest.user_key(), assert(comparator->CompareWithoutTimestamp(
prev.f->largest.user_key()) > 0); curr.f->largest.user_key(), prev.f->largest.user_key()) > 0);
prev = curr; prev = curr;
} }

@ -170,6 +170,161 @@ TEST_F(TimestampCompatibleCompactionTest, MultipleSubCompactions) {
} }
} }
// SstPartitioner for tests: every ShouldPartition() call answers kRequired,
// and CanDoTrivialMove() always answers false.
class TestFilePartitioner : public SstPartitioner {
 public:
  explicit TestFilePartitioner() = default;
  ~TestFilePartitioner() override = default;

  const char* Name() const override { return "TestFilePartitioner"; }

  // Unconditionally requests a partition, regardless of the request.
  PartitionerResult ShouldPartition(
      const PartitionerRequest& /*request*/) override {
    return PartitionerResult::kRequired;
  }

  // Never allows a trivial move, regardless of the key range.
  bool CanDoTrivialMove(const Slice& /*smallest_user_key*/,
                        const Slice& /*largest_user_key*/) override {
    return false;
  }
};
// Factory that hands out a fresh TestFilePartitioner on every request.
class TestFilePartitionerFactory : public SstPartitionerFactory {
 public:
  explicit TestFilePartitionerFactory() = default;

  const char* Name() const override { return "TestFilePartitionerFactory"; }

  // The context is ignored; each call creates a new TestFilePartitioner.
  std::unique_ptr<SstPartitioner> CreatePartitioner(
      const SstPartitioner::Context& /*context*/) const override {
    return std::make_unique<TestFilePartitioner>();
  }
};
#ifndef ROCKSDB_LITE
// Writes the same user key with increasing timestamps into kNumFiles L0
// files, then asks CompactFiles() to compact only the middle file. The
// expectation is that the compaction input is expanded to kNumFiles / 2 + 1
// files.
TEST_F(TimestampCompatibleCompactionTest, CompactFilesRangeCheckL0) {
  Options options = CurrentOptions();
  options.env = env_;
  options.sst_partitioner_factory =
      std::make_shared<TestFilePartitionerFactory>();
  options.comparator = test::BytewiseComparatorWithU64TsWrapper();
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);

  constexpr int kNumFiles = 10;
  constexpr int kKeysPerFile = 2;
  const std::string user_key = "foo";
  constexpr uint64_t start_ts = 10000;

  uint64_t cur_ts = start_ts;
  for (int k = 0; k < kNumFiles; ++k) {
    for (int i = 0; i < kKeysPerFile; ++i) {
      ASSERT_OK(db_->Put(WriteOptions(), user_key, Timestamp(cur_ts),
                         "v" + std::to_string(i)));
      ++cur_ts;
    }
    ASSERT_OK(db_->Flush(FlushOptions()));
  }

  std::vector<std::string> input_files{};
  {
    // Collect every table file in the DB directory.
    std::vector<std::string> files;
    ASSERT_OK(env_->GetChildren(dbname_, &files));
    for (const auto& f : files) {
      uint64_t file_num = 0;
      FileType file_type = FileType::kWalFile;
      if (!ParseFileName(f, &file_num, &file_type) ||
          file_type != FileType::kTableFile) {
        continue;
      }
      input_files.emplace_back(f);
    }
    // sorting here by name, which also happens to sort by generation date.
    std::sort(input_files.begin(), input_files.end());
    // Checked with a gtest assertion (not assert()) so that release builds
    // never index into an unexpectedly short vector below.
    ASSERT_EQ(static_cast<size_t>(kNumFiles), input_files.size());
    // Keep only the middle file as the explicit compaction input.
    std::vector<std::string> tmp;
    tmp.emplace_back(input_files[input_files.size() / 2]);
    input_files.swap(tmp);
  }

  {
    std::vector<std::string> output_file_names;
    CompactionJobInfo compaction_job_info;
    ASSERT_OK(db_->CompactFiles(CompactionOptions(), input_files,
                                /*output_level=*/1, /*output_path_id=*/-1,
                                &output_file_names, &compaction_job_info));
    // We expect the L0 files older than the original provided input were all
    // included in the compaction.
    ASSERT_EQ(static_cast<size_t>(kNumFiles / 2 + 1),
              compaction_job_info.input_files.size());
  }
}
// Fills L1 via automatic compaction, adds `additional_l0s` more L0 files for
// the same user key, then calls CompactFiles() with all L0 files plus a
// single L1 file. The expectation is that the compaction input is expanded
// to every file, and that one output file is produced per written key.
TEST_F(TimestampCompatibleCompactionTest, CompactFilesRangeCheckL1) {
  Options options = CurrentOptions();
  options.env = env_;
  options.sst_partitioner_factory =
      std::make_shared<TestFilePartitionerFactory>();
  options.comparator = test::BytewiseComparatorWithU64TsWrapper();

  constexpr int kNumFiles = 4;
  options.level0_file_num_compaction_trigger = kNumFiles;

  DestroyAndReopen(options);

  constexpr int kKeysPerFile = 2;
  const std::string user_key = "foo";
  constexpr uint64_t start_ts = 10000;

  uint64_t cur_ts = start_ts;
  // Generate some initial files in both L0 and L1.
  for (int k = 0; k < kNumFiles; ++k) {
    for (int i = 0; i < kKeysPerFile; ++i) {
      ASSERT_OK(db_->Put(WriteOptions(), user_key, Timestamp(cur_ts),
                         "v" + std::to_string(i)));
      ++cur_ts;
    }
    ASSERT_OK(db_->Flush(FlushOptions()));
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/0, /*cf=*/0));
  ASSERT_EQ(kNumFiles * kKeysPerFile,
            NumTableFilesAtLevel(/*level=*/1, /*cf=*/0));

  constexpr int additional_l0s = 2;
  for (int i = 0; i < additional_l0s; ++i, ++cur_ts) {
    ASSERT_OK(db_->Put(WriteOptions(), user_key, Timestamp(cur_ts), "v"));
    ASSERT_OK(db_->Flush(FlushOptions()));
  }
  ASSERT_EQ(additional_l0s, NumTableFilesAtLevel(/*level=*/0, /*cf=*/0));

  std::vector<std::string> inputs;
  {
    // Pick every L0 file, plus the first non-L0 file reported.
    std::vector<LiveFileMetaData> fmetas;
    db_->GetLiveFilesMetaData(&fmetas);
    bool included_one_l1 = false;
    for (const auto& meta : fmetas) {
      if (meta.level == 0) {
        inputs.emplace_back(meta.relative_filename);
      } else if (!included_one_l1) {
        inputs.emplace_back(meta.relative_filename);
        included_one_l1 = true;
      }
    }
  }
  // All the new L0 files and exactly one L1 file.
  ASSERT_EQ(static_cast<size_t>(additional_l0s + 1), inputs.size());
  {
    // Every L1 file plus every additional L0 file.
    constexpr size_t kExpectedNumFiles =
        static_cast<size_t>(kNumFiles * kKeysPerFile + additional_l0s);
    std::vector<std::string> output_file_names;
    CompactionJobInfo compaction_job_info;

    ASSERT_OK(db_->CompactFiles(CompactionOptions(), inputs, /*output_level=*/1,
                                /*output_path_id=*/-1, &output_file_names,
                                &compaction_job_info));
    ASSERT_EQ(kExpectedNumFiles, output_file_names.size());
    ASSERT_EQ(kExpectedNumFiles, compaction_job_info.input_files.size());
  }
}
#endif // !ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {

Loading…
Cancel
Save