Cut output files at compaction cursors (#10227)

Summary:
The files behind the compaction cursor contain newer data than the files ahead of it. If a compaction writes a file that spans from before its output level’s cursor to after it, then data before the cursor will be contaminated with the old timestamp from the data after the cursor. To avoid this, we can split the output file into two – one entirely before the cursor and one entirely after the cursor. Note that, in rare cases, we **DO NOT** need to cut the file if it is a trivial move since the file will not be contaminated by older files. In such case, the compact cursor is not guaranteed to be the boundary of the file, but it does not hurt the round-robin selection process.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10227

Test Plan:
Add 'RoundRobinCutOutputAtCompactCursor' unit test in `db_compaction_test`

Task: [T122216351](https://www.internalfb.com/intern/tasks/?t=122216351)

Reviewed By: jay-zhuang

Differential Revision: D37388088

Pulled By: littlepig2013

fbshipit-source-id: 9246a6a084b6037b90d6ab3183ba4dfb75a3378d
main
zczhu 2 years ago committed by Facebook GitHub Bot
parent ba1f62ddfb
commit 17a1d65e3a
  1. 22
      db/compaction/compaction.cc
  2. 8
      db/compaction/compaction.h
  3. 37
      db/compaction/compaction_job.cc
  4. 57
      db/db_compaction_test.cc

@ -279,6 +279,28 @@ Compaction::Compaction(
}
GetBoundaryKeys(vstorage, inputs_, &smallest_user_key_, &largest_user_key_);
// Every compaction regardless of any compaction reason may respect the
// existing compact cursor in the output level to split output files
InternalKey temp_split_key = InternalKey();
if (immutable_options_.compaction_style == kCompactionStyleLevel &&
immutable_options_.compaction_pri == kRoundRobin) {
const InternalKey cursor =
input_vstorage_->GetCompactCursors()[output_level_];
if (cursor.Valid()) {
const Slice& cursor_user_key = ExtractUserKey(cursor.Encode());
auto ucmp = vstorage->InternalComparator()->user_comparator();
// May split output files according to the cursor if it is in the
// user-key range
if (ucmp->CompareWithoutTimestamp(cursor_user_key, smallest_user_key_) >
0 &&
ucmp->CompareWithoutTimestamp(cursor_user_key, largest_user_key_) <=
0) {
temp_split_key = cursor;
}
}
}
output_split_key_ = temp_split_key;
}
Compaction::~Compaction() {

@ -177,6 +177,12 @@ class Compaction {
// moving a single input file to the next level (no merging or splitting)
bool IsTrivialMove() const;
// The split user key in the output level if this compaction is required to
// split the output files according to the existing cursor in the output
// level under round-robin compaction policy. Empty indicates no required
// splitting key
const InternalKey GetOutputSplitKey() const { return output_split_key_; }
// If true, then the compaction can be done by simply deleting input files.
bool deletion_compaction() const { return deletion_compaction_; }
@ -379,6 +385,8 @@ class Compaction {
Temperature output_temperature_;
// If true, then the compaction can be done by simply deleting input files.
const bool deletion_compaction_;
// should it split the output file using the compact cursor?
InternalKey output_split_key_;
// Compaction input files organized by level. Constant after construction
const std::vector<CompactionInputFiles> inputs_;

@ -191,7 +191,7 @@ struct CompactionJob::SubcompactionState {
// The number of bytes overlapping between the current output and
// grandparent files used in ShouldStopBefore().
uint64_t overlapped_bytes = 0;
// A flag determine whether the key has been seen in ShouldStopBefore()
// A flag determines whether the key has been seen in ShouldStopBefore()
bool seen_key = false;
// sub compaction job id, which is used to identify different sub-compaction
// within the same compaction job.
@ -201,6 +201,9 @@ struct CompactionJob::SubcompactionState {
// sub-compaction begin.
bool notify_on_subcompaction_completion = false;
// A flag determines if this subcompaction has been split by the cursor
bool is_split = false;
SubcompactionState(Compaction* c, Slice* _start, Slice* _end, uint64_t size,
uint32_t _sub_job_id)
: compaction(c),
@ -234,6 +237,23 @@ struct CompactionJob::SubcompactionState {
&compaction->column_family_data()->internal_comparator();
const std::vector<FileMetaData*>& grandparents = compaction->grandparents();
const InternalKey output_split_key = compaction->GetOutputSplitKey();
if (output_split_key.Valid() && !is_split) {
// Invalid output_split_key indicates that we do not need to split
if ((end == nullptr || icmp->user_comparator()->Compare(
ExtractUserKey(output_split_key.Encode()),
ExtractUserKey(*end)) < 0) &&
(start == nullptr || icmp->user_comparator()->Compare(
ExtractUserKey(output_split_key.Encode()),
ExtractUserKey(*start)) > 0)) {
// We may only split the output when the cursor is in the range. Split
// occurs when the next key is larger than/equal to the cursor
if (icmp->Compare(internal_key, output_split_key.Encode()) >= 0) {
is_split = true;
return true;
}
}
}
bool grandparant_file_switched = false;
// Scan to find earliest grandparent file that contains key.
while (grandparent_index < grandparents.size() &&
@ -621,6 +641,16 @@ void CompactionJob::GenSubcompactionBoundaries() {
}
}
Slice output_split_user_key;
const InternalKey output_split_key = c->GetOutputSplitKey();
if (output_split_key.Valid()) {
output_split_user_key = ExtractUserKey(output_split_key.Encode());
bounds.emplace_back(output_split_key.Encode());
} else {
// Empty user key indicates that splitting is not required here
output_split_user_key = Slice();
}
std::sort(bounds.begin(), bounds.end(),
[cfd_comparator](const Slice& a, const Slice& b) -> bool {
return cfd_comparator->Compare(ExtractUserKey(a),
@ -691,7 +721,10 @@ void CompactionJob::GenSubcompactionBoundaries() {
// need to put an end boundary
continue;
}
if (sum >= mean) {
if (sum >= mean ||
(!output_split_user_key.empty() &&
cfd_comparator->Compare(ExtractUserKey(ranges[i].range.limit),
output_split_user_key) == 0)) {
boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit));
sizes_.emplace_back(sum);
subcompactions--;

@ -5290,6 +5290,63 @@ TEST_F(DBCompactionTest, PersistRoundRobinCompactCursor) {
}
}
TEST_F(DBCompactionTest, RoundRobinCutOutputAtCompactCursor) {
  // Under round-robin compaction priority, an output file written to the
  // cursor's level must be cut at the compact cursor so that no single file
  // spans the cursor. This test plants a cursor in L2, forces non-trivial
  // compactions into L2, and checks every resulting L2 file lies entirely
  // on one side of the cursor.
  Options options = CurrentOptions();
  options.num_levels = 3;
  options.compression = kNoCompression;
  options.write_buffer_size = 4 * 1024;
  options.max_bytes_for_level_base = 64 * 1024;
  options.max_bytes_for_level_multiplier = 4;
  options.level0_file_num_compaction_trigger = 4;
  options.compaction_pri = CompactionPri::kRoundRobin;
  DestroyAndReopen(options);
  // Inject the compact cursor for L2 directly into the current version's
  // storage info so the next compaction into L2 must respect it.
  VersionSet* const versions = dbfull()->GetVersionSet();
  // Use gtest assertions consistently (a bare assert() is compiled out in
  // release builds and would abort rather than fail the test in debug).
  ASSERT_NE(versions, nullptr);
  ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
  ASSERT_NE(cfd, nullptr);
  Version* const current = cfd->current();
  ASSERT_NE(current, nullptr);
  VersionStorageInfo* storage_info = current->storage_info();
  ASSERT_NE(storage_info, nullptr);
  const InternalKey split_cursor = InternalKey(Key(600), 100, kTypeValue);
  storage_info->AddCursorForOneLevel(2, split_cursor);
  Random rnd(301);
  // First pass: even keys only.
  for (int i = 0; i < 50; i++) {
    for (int j = 0; j < 50; j++) {
      ASSERT_OK(Put(Key(j * 2 + i * 100), rnd.RandomString(102)));
    }
  }
  // Add more overlapping files (avoid trivial move) to trigger compaction that
  // output files in L2. Note that trivial move does not trigger compaction and
  // in that case the cursor is not necessarily the boundary of file.
  for (int i = 0; i < 50; i++) {
    for (int j = 0; j < 50; j++) {
      ASSERT_OK(Put(Key(j * 2 + 1 + i * 100), rnd.RandomString(1014)));
    }
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  std::vector<std::vector<FileMetaData>> level_to_files;
  dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
                                  &level_to_files);
  // Re-fetch the comparator from the post-compaction current version.
  const auto icmp = cfd->current()->storage_info()->InternalComparator();
  // Files in level 2 should be split by the cursor: each file is either
  // entirely at/after the cursor or entirely before it.
  for (const auto& file : level_to_files[2]) {
    ASSERT_TRUE(
        icmp->Compare(file.smallest.Encode(), split_cursor.Encode()) >= 0 ||
        icmp->Compare(file.largest.Encode(), split_cursor.Encode()) < 0);
  }
}
class NoopMergeOperator : public MergeOperator {
public:
NoopMergeOperator() {}

Loading…
Cancel
Save