level compaction expansion

Summary:
reimplement the compaction expansion on lower level.

Considering such a case:
input level file: 1[B E] 2[F G] 3[H I] 4 [J M]
output level file: 5[A C] 6[D K] 7[L O]

If we initially pick file 2, now we will compact file 2 and 6. But we can safely compact 2, 3 and 6 without expanding the output level.

The previous code is messy and wrong.

In this diff, I first determine the input range [a, b], and output range [c, d],
then we get the range [e,f] = [min(a, c), max(b, d] and put all eligible clean-cut files within [e, f] into this compaction.

**Note: clean-cut means the files don't have the same user key on the boundaries of some files that are not chosen in this compaction**.
Closes https://github.com/facebook/rocksdb/pull/1760

Differential Revision: D4395564

Pulled By: lightmark

fbshipit-source-id: 2dc2c5c
main
Aaron Gao 8 years ago committed by Facebook Github Bot
parent ebc8a79980
commit 2a0f3d0de1
  1. 80
      db/compaction_picker.cc
  2. 72
      db/compaction_picker_test.cc
  3. 206
      db/version_set.cc
  4. 32
      db/version_set.h

@ -18,7 +18,6 @@
#include <queue> #include <queue>
#include <string> #include <string>
#include <utility> #include <utility>
#include "db/column_family.h" #include "db/column_family.h"
#include "db/filename.h" #include "db/filename.h"
#include "util/log_buffer.h" #include "util/log_buffer.h"
@ -466,49 +465,66 @@ bool CompactionPicker::SetupOtherInputs(
// user key, while excluding other entries for the same user key. This // user key, while excluding other entries for the same user key. This
// can happen when one user key spans multiple files. // can happen when one user key spans multiple files.
if (!output_level_inputs->empty()) { if (!output_level_inputs->empty()) {
CompactionInputFiles expanded0; const uint64_t limit = mutable_cf_options.max_compaction_bytes;
expanded0.level = input_level; const uint64_t output_level_inputs_size =
// Get entire range covered by compaction TotalCompensatedFileSize(output_level_inputs->files);
const uint64_t inputs_size = TotalCompensatedFileSize(inputs->files);
bool expand_inputs = false;
CompactionInputFiles expanded_inputs;
expanded_inputs.level = input_level;
// Get closed interval of output level
InternalKey all_start, all_limit; InternalKey all_start, all_limit;
GetRange(*inputs, *output_level_inputs, &all_start, &all_limit); GetRange(*inputs, *output_level_inputs, &all_start, &all_limit);
bool try_overlapping_inputs = true;
vstorage->GetOverlappingInputs(input_level, &all_start, &all_limit, vstorage->GetOverlappingInputs(input_level, &all_start, &all_limit,
&expanded0.files, base_index, nullptr); &expanded_inputs.files, base_index, nullptr);
const uint64_t inputs0_size = TotalCompensatedFileSize(inputs->files); uint64_t expanded_inputs_size =
const uint64_t inputs1_size = TotalCompensatedFileSize(expanded_inputs.files);
TotalCompensatedFileSize(output_level_inputs->files); if (!ExpandWhileOverlapping(cf_name, vstorage, &expanded_inputs)) {
const uint64_t expanded0_size = TotalCompensatedFileSize(expanded0.files); try_overlapping_inputs = false;
uint64_t limit = mutable_cf_options.max_compaction_bytes; }
if (expanded0.size() > inputs->size() && if (try_overlapping_inputs && expanded_inputs.size() > inputs->size() &&
inputs1_size + expanded0_size < limit && output_level_inputs_size + expanded_inputs_size < limit &&
!FilesInCompaction(expanded0.files) && !FilesInCompaction(expanded_inputs.files)) {
!vstorage->HasOverlappingUserKey(&expanded0.files, input_level)) {
InternalKey new_start, new_limit; InternalKey new_start, new_limit;
GetRange(expanded0, &new_start, &new_limit); GetRange(expanded_inputs, &new_start, &new_limit);
CompactionInputFiles expanded1; CompactionInputFiles expanded_output_level_inputs;
expanded1.level = output_level; expanded_output_level_inputs.level = output_level;
vstorage->GetOverlappingInputs(output_level, &new_start, &new_limit, vstorage->GetOverlappingInputs(output_level, &new_start, &new_limit,
&expanded1.files, *parent_index, &expanded_output_level_inputs.files,
parent_index); *parent_index, parent_index);
assert(!expanded1.empty()); assert(!expanded_output_level_inputs.empty());
if (!FilesInCompaction(expanded1.files) && if (!FilesInCompaction(expanded_output_level_inputs.files) &&
ExpandWhileOverlapping(cf_name, vstorage, &expanded1) && ExpandWhileOverlapping(cf_name, vstorage,
expanded1.size() == output_level_inputs->size()) { &expanded_output_level_inputs) &&
expanded_output_level_inputs.size() == output_level_inputs->size()) {
expand_inputs = true;
}
}
if (!expand_inputs) {
vstorage->GetCleanInputsWithinInterval(input_level, &all_start,
&all_limit, &expanded_inputs.files,
base_index, nullptr);
expanded_inputs_size = TotalCompensatedFileSize(expanded_inputs.files);
if (expanded_inputs.size() > inputs->size() &&
output_level_inputs_size + expanded_inputs_size < limit &&
!FilesInCompaction(expanded_inputs.files)) {
expand_inputs = true;
}
}
if (expand_inputs) {
Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log, Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
"[%s] Expanding@%d %" ROCKSDB_PRIszt "+%" ROCKSDB_PRIszt "(%" PRIu64 "[%s] Expanding@%d %" ROCKSDB_PRIszt "+%" ROCKSDB_PRIszt "(%" PRIu64
"+%" PRIu64 " bytes) to %" ROCKSDB_PRIszt "+%" ROCKSDB_PRIszt "+%" PRIu64 " bytes) to %" ROCKSDB_PRIszt "+%" ROCKSDB_PRIszt
" (%" PRIu64 "+%" PRIu64 "bytes)\n", " (%" PRIu64 "+%" PRIu64 "bytes)\n",
cf_name.c_str(), input_level, inputs->size(), cf_name.c_str(), input_level, inputs->size(),
output_level_inputs->size(), inputs0_size, inputs1_size, output_level_inputs->size(), inputs_size, output_level_inputs_size,
expanded0.size(), expanded1.size(), expanded0_size, inputs1_size); expanded_inputs.size(), output_level_inputs->size(),
smallest = new_start; expanded_inputs_size, output_level_inputs_size);
largest = new_limit; inputs->files = expanded_inputs.files;
inputs->files = expanded0.files;
output_level_inputs->files = expanded1.files;
} }
} }
}
return true; return true;
} }

@ -758,6 +758,70 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys7) {
ASSERT_EQ(5U, compaction->inputs(1)->back()->fd.GetNumber()); ASSERT_EQ(5U, compaction->inputs(1)->back()->fd.GetNumber());
} }
TEST_F(CompactionPickerTest, OverlappingUserKeys8) {
NewVersionStorage(6, kCompactionStyleLevel);
mutable_cf_options_.max_compaction_bytes = 100000000000u;
// grow the number of inputs in "level" without
// changing the number of "level+1" files we pick up
// Expand input level as much as possible
// no overlapping case
Add(1, 1U, "101", "150", 1U);
Add(1, 2U, "151", "200", 1U);
Add(1, 3U, "201", "300", 1000000000U);
Add(1, 4U, "301", "400", 1U);
Add(1, 5U, "401", "500", 1U);
Add(2, 6U, "150", "200", 1U);
Add(2, 7U, "200", "450", 1U, 0, 0);
Add(2, 8U, "500", "600", 1U);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_EQ(3U, compaction->num_input_files(0));
ASSERT_EQ(2U, compaction->num_input_files(1));
ASSERT_EQ(2U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(3U, compaction->input(0, 1)->fd.GetNumber());
ASSERT_EQ(4U, compaction->input(0, 2)->fd.GetNumber());
ASSERT_EQ(6U, compaction->input(1, 0)->fd.GetNumber());
ASSERT_EQ(7U, compaction->input(1, 1)->fd.GetNumber());
}
TEST_F(CompactionPickerTest, OverlappingUserKeys9) {
NewVersionStorage(6, kCompactionStyleLevel);
mutable_cf_options_.max_compaction_bytes = 100000000000u;
// grow the number of inputs in "level" without
// changing the number of "level+1" files we pick up
// Expand input level as much as possible
// overlapping case
Add(1, 1U, "121", "150", 1U);
Add(1, 2U, "151", "200", 1U);
Add(1, 3U, "201", "300", 1000000000U);
Add(1, 4U, "301", "400", 1U);
Add(1, 5U, "401", "500", 1U);
Add(2, 6U, "100", "120", 1U);
Add(2, 7U, "150", "200", 1U);
Add(2, 8U, "200", "450", 1U, 0, 0);
Add(2, 9U, "501", "600", 1U);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_EQ(5U, compaction->num_input_files(0));
ASSERT_EQ(2U, compaction->num_input_files(1));
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber());
ASSERT_EQ(3U, compaction->input(0, 2)->fd.GetNumber());
ASSERT_EQ(4U, compaction->input(0, 3)->fd.GetNumber());
ASSERT_EQ(7U, compaction->input(1, 0)->fd.GetNumber());
ASSERT_EQ(8U, compaction->input(1, 1)->fd.GetNumber());
}
TEST_F(CompactionPickerTest, NotScheduleL1IfL0WithHigherPri1) { TEST_F(CompactionPickerTest, NotScheduleL1IfL0WithHigherPri1) {
NewVersionStorage(6, kCompactionStyleLevel); NewVersionStorage(6, kCompactionStyleLevel);
mutable_cf_options_.level0_file_num_compaction_trigger = 2; mutable_cf_options_.level0_file_num_compaction_trigger = 2;
@ -1132,13 +1196,13 @@ TEST_F(CompactionPickerTest, MaxCompactionBytesHit) {
ioptions_.level_compaction_dynamic_level_bytes = false; ioptions_.level_compaction_dynamic_level_bytes = false;
NewVersionStorage(6, kCompactionStyleLevel); NewVersionStorage(6, kCompactionStyleLevel);
// A compaction should be triggered and pick file 2 and 5. // A compaction should be triggered and pick file 2 and 5.
// It cannot expand because adding file 1 and 3, the compaction size will // It can expand because adding file 1 and 3, the compaction size will
// exceed mutable_cf_options_.max_bytes_for_level_base. // exceed mutable_cf_options_.max_bytes_for_level_base.
Add(1, 1U, "100", "150", 300000U); Add(1, 1U, "100", "150", 300000U);
Add(1, 2U, "151", "200", 300001U, 0, 0); Add(1, 2U, "151", "200", 300001U, 0, 0);
Add(1, 3U, "201", "250", 300000U, 0, 0); Add(1, 3U, "201", "250", 300000U, 0, 0);
Add(1, 4U, "251", "300", 300000U, 0, 0); Add(1, 4U, "251", "300", 300000U, 0, 0);
Add(2, 5U, "160", "256", 1U); Add(2, 5U, "100", "256", 1U);
UpdateVersionStorageInfo(); UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction( std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
@ -1152,7 +1216,7 @@ TEST_F(CompactionPickerTest, MaxCompactionBytesHit) {
} }
TEST_F(CompactionPickerTest, MaxCompactionBytesNotHit) { TEST_F(CompactionPickerTest, MaxCompactionBytesNotHit) {
mutable_cf_options_.max_bytes_for_level_base = 1000000u; mutable_cf_options_.max_bytes_for_level_base = 800000u;
mutable_cf_options_.max_compaction_bytes = 1000000u; mutable_cf_options_.max_compaction_bytes = 1000000u;
ioptions_.level_compaction_dynamic_level_bytes = false; ioptions_.level_compaction_dynamic_level_bytes = false;
NewVersionStorage(6, kCompactionStyleLevel); NewVersionStorage(6, kCompactionStyleLevel);
@ -1162,7 +1226,7 @@ TEST_F(CompactionPickerTest, MaxCompactionBytesNotHit) {
Add(1, 2U, "151", "200", 300001U, 0, 0); Add(1, 2U, "151", "200", 300001U, 0, 0);
Add(1, 3U, "201", "250", 300000U, 0, 0); Add(1, 3U, "201", "250", 300000U, 0, 0);
Add(1, 4U, "251", "300", 300000U, 0, 0); Add(1, 4U, "251", "300", 300000U, 0, 0);
Add(2, 5U, "000", "233", 1U); Add(2, 5U, "000", "251", 1U);
UpdateVersionStorageInfo(); UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction( std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(

@ -22,7 +22,6 @@
#include <unordered_map> #include <unordered_map>
#include <vector> #include <vector>
#include <string> #include <string>
#include "db/compaction.h" #include "db/compaction.h"
#include "db/filename.h" #include "db/filename.h"
#include "db/internal_stats.h" #include "db/internal_stats.h"
@ -1640,10 +1639,11 @@ void VersionStorageInfo::GetOverlappingInputs(
} }
const Comparator* user_cmp = user_comparator_; const Comparator* user_cmp = user_comparator_;
if (begin != nullptr && end != nullptr && level > 0) { if (begin != nullptr && end != nullptr && level > 0) {
GetOverlappingInputsBinarySearch(level, user_begin, user_end, inputs, GetOverlappingInputsRangeBinarySearch(level, user_begin, user_end, inputs,
hint_index, file_index); hint_index, file_index);
return; return;
} }
for (size_t i = 0; i < level_files_brief_[level].num_files; ) { for (size_t i = 0; i < level_files_brief_[level].num_files; ) {
FdWithKeyRange* f = &(level_files_brief_[level].files[i++]); FdWithKeyRange* f = &(level_files_brief_[level].files[i++]);
const Slice file_start = ExtractUserKey(f->smallest_key); const Slice file_start = ExtractUserKey(f->smallest_key);
@ -1674,13 +1674,49 @@ void VersionStorageInfo::GetOverlappingInputs(
} }
} }
// Store in "*inputs" files in "level" that within range [begin,end]
// Guarantee a "clean cut" boundary between the files in inputs
// and the surrounding files and the maxinum number of files.
// This will ensure that no parts of a key are lost during compaction.
// If hint_index is specified, then it points to a file in the range.
// The file_index returns a pointer to any file in an overlapping range.
void VersionStorageInfo::GetCleanInputsWithinInterval(
int level, const InternalKey* begin, const InternalKey* end,
std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) const {
if (level >= num_non_empty_levels_) {
// this level is empty, no inputs within range
return;
}
inputs->clear();
Slice user_begin, user_end;
if (begin != nullptr) {
user_begin = begin->user_key();
}
if (end != nullptr) {
user_end = end->user_key();
}
if (file_index) {
*file_index = -1;
}
if (begin != nullptr && end != nullptr && level > 0) {
GetOverlappingInputsRangeBinarySearch(level, user_begin, user_end, inputs,
hint_index, file_index,
true /* within_interval */);
}
}
// Store in "*inputs" all files in "level" that overlap [begin,end] // Store in "*inputs" all files in "level" that overlap [begin,end]
// Employ binary search to find at least one file that overlaps the // Employ binary search to find at least one file that overlaps the
// specified range. From that file, iterate backwards and // specified range. From that file, iterate backwards and
// forwards to find all overlapping files. // forwards to find all overlapping files.
void VersionStorageInfo::GetOverlappingInputsBinarySearch( // if within_range is set, then only store the maximum clean inputs
// within range [begin, end]. "clean" means there is a boudnary
// between the files in "*inputs" and the surrounding files
void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
int level, const Slice& user_begin, const Slice& user_end, int level, const Slice& user_begin, const Slice& user_end,
std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) const { std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
bool within_interval) const {
assert(level > 0); assert(level > 0);
int min = 0; int min = 0;
int mid = 0; int mid = 0;
@ -1700,9 +1736,13 @@ void VersionStorageInfo::GetOverlappingInputsBinarySearch(
FdWithKeyRange* f = &(level_files_brief_[level].files[mid]); FdWithKeyRange* f = &(level_files_brief_[level].files[mid]);
const Slice file_start = ExtractUserKey(f->smallest_key); const Slice file_start = ExtractUserKey(f->smallest_key);
const Slice file_limit = ExtractUserKey(f->largest_key); const Slice file_limit = ExtractUserKey(f->largest_key);
if (user_cmp->Compare(file_limit, user_begin) < 0) { if ((!within_interval && user_cmp->Compare(file_limit, user_begin) < 0) ||
(within_interval && user_cmp->Compare(file_start, user_begin) < 0)) {
min = mid + 1; min = mid + 1;
} else if (user_cmp->Compare(user_end, file_start) < 0) { } else if ((!within_interval &&
user_cmp->Compare(user_end, file_start) < 0) ||
(within_interval &&
user_cmp->Compare(user_end, file_limit) < 0)) {
max = mid - 1; max = mid - 1;
} else { } else {
foundOverlap = true; foundOverlap = true;
@ -1718,24 +1758,38 @@ void VersionStorageInfo::GetOverlappingInputsBinarySearch(
if (file_index) { if (file_index) {
*file_index = mid; *file_index = mid;
} }
ExtendOverlappingInputs(level, user_begin, user_end, inputs, mid);
int start_index, end_index;
if (within_interval) {
ExtendFileRangeWithinInterval(level, user_begin, user_end, mid, &start_index,
&end_index);
} else {
ExtendFileRangeOverlappingInterval(level, user_begin, user_end, mid,
&start_index, &end_index);
}
assert(end_index >= start_index);
// insert overlapping files into vector
for (int i = start_index; i <= end_index; i++) {
inputs->push_back(files_[level][i]);
}
} }
// Store in "*inputs" all files in "level" that overlap [begin,end] // Store in *start_index and *end_index the range of all files in
// The midIndex specifies the index of at least one file that // "level" that overlap [begin,end]
// The mid_index specifies the index of at least one file that
// overlaps the specified range. From that file, iterate backward // overlaps the specified range. From that file, iterate backward
// and forward to find all overlapping files. // and forward to find all overlapping files.
// Use FileLevel in searching, make it faster // Use FileLevel in searching, make it faster
void VersionStorageInfo::ExtendOverlappingInputs( void VersionStorageInfo::ExtendFileRangeOverlappingInterval(
int level, const Slice& user_begin, const Slice& user_end, int level, const Slice& user_begin, const Slice& user_end,
std::vector<FileMetaData*>* inputs, unsigned int midIndex) const { unsigned int mid_index, int* start_index, int* end_index) const {
const Comparator* user_cmp = user_comparator_; const Comparator* user_cmp = user_comparator_;
const FdWithKeyRange* files = level_files_brief_[level].files; const FdWithKeyRange* files = level_files_brief_[level].files;
#ifndef NDEBUG #ifndef NDEBUG
{ {
// assert that the file at midIndex overlaps with the range // assert that the file at mid_index overlaps with the range
assert(midIndex < level_files_brief_[level].num_files); assert(mid_index < level_files_brief_[level].num_files);
const FdWithKeyRange* f = &files[midIndex]; const FdWithKeyRange* f = &files[mid_index];
const Slice fstart = ExtractUserKey(f->smallest_key); const Slice fstart = ExtractUserKey(f->smallest_key);
const Slice flimit = ExtractUserKey(f->largest_key); const Slice flimit = ExtractUserKey(f->largest_key);
if (user_cmp->Compare(fstart, user_begin) >= 0) { if (user_cmp->Compare(fstart, user_begin) >= 0) {
@ -1745,91 +1799,105 @@ void VersionStorageInfo::ExtendOverlappingInputs(
} }
} }
#endif #endif
int startIndex = midIndex + 1; *start_index = mid_index + 1;
int endIndex = midIndex; *end_index = mid_index;
int count __attribute__((unused)) = 0; int count __attribute__((unused)) = 0;
// check backwards from 'mid' to lower indices // check backwards from 'mid' to lower indices
for (int i = midIndex; i >= 0 ; i--) { for (int i = mid_index; i >= 0 ; i--) {
const FdWithKeyRange* f = &files[i]; const FdWithKeyRange* f = &files[i];
const Slice file_limit = ExtractUserKey(f->largest_key); const Slice file_limit = ExtractUserKey(f->largest_key);
if (user_cmp->Compare(file_limit, user_begin) >= 0) { if (user_cmp->Compare(file_limit, user_begin) >= 0) {
startIndex = i; *start_index = i;
assert((count++, true)); assert((count++, true));
} else { } else {
break; break;
} }
} }
// check forward from 'mid+1' to higher indices // check forward from 'mid+1' to higher indices
for (unsigned int i = midIndex+1; for (unsigned int i = mid_index+1;
i < level_files_brief_[level].num_files; i++) { i < level_files_brief_[level].num_files; i++) {
const FdWithKeyRange* f = &files[i]; const FdWithKeyRange* f = &files[i];
const Slice file_start = ExtractUserKey(f->smallest_key); const Slice file_start = ExtractUserKey(f->smallest_key);
if (user_cmp->Compare(file_start, user_end) <= 0) { if (user_cmp->Compare(file_start, user_end) <= 0) {
assert((count++, true)); assert((count++, true));
endIndex = i; *end_index = i;
} else { } else {
break; break;
} }
} }
assert(count == endIndex - startIndex + 1); assert(count == *end_index - *start_index + 1);
// insert overlapping files into vector
for (int i = startIndex; i <= endIndex; i++) {
FileMetaData* f = files_[level][i];
inputs->push_back(f);
}
}
// Returns true iff the first or last file in inputs contains
// an overlapping user key to the file "just outside" of it (i.e.
// just after the last file, or just before the first file)
// REQUIRES: "*inputs" is a sorted list of non-overlapping files
bool VersionStorageInfo::HasOverlappingUserKey(
const std::vector<FileMetaData*>* inputs, int level) {
// If inputs empty, there is no overlap.
// If level == 0, it is assumed that all needed files were already included.
if (inputs->empty() || level == 0){
return false;
} }
// Store in *start_index and *end_index the clean range of all files in
// "level" within [begin,end]
// The mid_index specifies the index of at least one file within
// the specified range. From that file, iterate backward
// and forward to find all overlapping files and then "shrink" to
// the clean range required.
// Use FileLevel in searching, make it faster
void VersionStorageInfo::ExtendFileRangeWithinInterval(
int level, const Slice& user_begin, const Slice& user_end,
unsigned int mid_index, int* start_index, int* end_index) const {
assert(level != 0);
const Comparator* user_cmp = user_comparator_; const Comparator* user_cmp = user_comparator_;
const rocksdb::LevelFilesBrief& file_level = level_files_brief_[level];
const FdWithKeyRange* files = level_files_brief_[level].files; const FdWithKeyRange* files = level_files_brief_[level].files;
const size_t kNumFiles = file_level.num_files; #ifndef NDEBUG
{
// Check the last file in inputs against the file after it // assert that the file at mid_index is within the range
size_t last_file = FindFile(*internal_comparator_, file_level, assert(mid_index < level_files_brief_[level].num_files);
inputs->back()->largest.Encode()); const FdWithKeyRange* f = &files[mid_index];
assert(last_file < kNumFiles); // File should exist! const Slice fstart = ExtractUserKey(f->smallest_key);
if (last_file < kNumFiles-1) { // If not the last file const Slice flimit = ExtractUserKey(f->largest_key);
const Slice last_key_in_input = ExtractUserKey( assert(user_cmp->Compare(fstart, user_begin) >= 0 &&
files[last_file].largest_key); user_cmp->Compare(flimit, user_end) <= 0);
const Slice first_key_after = ExtractUserKey(
files[last_file+1].smallest_key);
if (user_cmp->Equal(last_key_in_input, first_key_after)) {
// The last user key in input overlaps with the next file's first key
return true;
} }
#endif
ExtendFileRangeOverlappingInterval(level, user_begin, user_end, mid_index,
start_index, end_index);
int left = *start_index;
int right = *end_index;
// shrink from left to right
while (left <= right) {
const Slice& first_key_in_range = ExtractUserKey(files[left].smallest_key);
if (user_cmp->Compare(first_key_in_range, user_begin) < 0) {
left++;
continue;
}
if (left > 0) { // If not first file
const Slice& last_key_before =
ExtractUserKey(files[left - 1].largest_key);
if (user_cmp->Equal(first_key_in_range, last_key_before)) {
// The first user key in range overlaps with the previous file's last
// key
left++;
continue;
} }
// Check the first file in inputs against the file just before it
size_t first_file = FindFile(*internal_comparator_, file_level,
inputs->front()->smallest.Encode());
assert(first_file <= last_file); // File should exist!
if (first_file > 0) { // If not first file
const Slice& first_key_in_input = ExtractUserKey(
files[first_file].smallest_key);
const Slice& last_key_before = ExtractUserKey(
files[first_file-1].largest_key);
if (user_cmp->Equal(first_key_in_input, last_key_before)) {
// The first user key in input overlaps with the previous file's last key
return true;
} }
break;
}
// shrink from right to left
while (left <= right) {
const Slice last_key_in_range = ExtractUserKey(files[right].largest_key);
if (user_cmp->Compare(last_key_in_range, user_end) > 0) {
right--;
continue;
}
if (right < static_cast<int>(level_files_brief_[level].num_files) -
1) { // If not the last file
const Slice first_key_after =
ExtractUserKey(files[right + 1].smallest_key);
if (user_cmp->Equal(last_key_in_range, first_key_after)) {
// The last user key in range overlaps with the next file's first key
right--;
continue;
}
}
break;
} }
return false; *start_index = left;
*end_index = right;
} }
uint64_t VersionStorageInfo::NumLevelBytes(int level) const { uint64_t VersionStorageInfo::NumLevelBytes(int level) const {

@ -163,21 +163,41 @@ class VersionStorageInfo {
bool expand_range = true) // if set, returns files which overlap the bool expand_range = true) // if set, returns files which overlap the
const; // range and overlap each other. If false, const; // range and overlap each other. If false,
// then just files intersecting the range // then just files intersecting the range
void GetCleanInputsWithinInterval(
int level, const InternalKey* begin, // nullptr means before all keys
const InternalKey* end, // nullptr means after all keys
std::vector<FileMetaData*>* inputs,
int hint_index = -1, // index of overlap file
int* file_index = nullptr) // return index of overlap file
const;
void GetOverlappingInputsBinarySearch( void GetOverlappingInputsRangeBinarySearch(
int level, int level, // level > 0
const Slice& begin, // nullptr means before all keys const Slice& begin, // nullptr means before all keys
const Slice& end, // nullptr means after all keys const Slice& end, // nullptr means after all keys
std::vector<FileMetaData*>* inputs, std::vector<FileMetaData*>* inputs,
int hint_index, // index of overlap file int hint_index, // index of overlap file
int* file_index) const; // return index of overlap file int* file_index, // return index of overlap file
bool within_interval = false) // if set, force the inputs within interval
const;
void ExtendOverlappingInputs( void ExtendFileRangeOverlappingInterval(
int level, int level,
const Slice& begin, // nullptr means before all keys const Slice& begin, // nullptr means before all keys
const Slice& end, // nullptr means after all keys const Slice& end, // nullptr means after all keys
std::vector<FileMetaData*>* inputs, unsigned int index, // start extending from this index
unsigned int index) const; // start extending from this index int* startIndex, // return the startIndex of input range
int* endIndex) // return the endIndex of input range
const;
void ExtendFileRangeWithinInterval(
int level,
const Slice& begin, // nullptr means before all keys
const Slice& end, // nullptr means after all keys
unsigned int index, // start extending from this index
int* startIndex, // return the startIndex of input range
int* endIndex) // return the endIndex of input range
const;
// Returns true iff some file in the specified level overlaps // Returns true iff some file in the specified level overlaps
// some part of [*smallest_user_key,*largest_user_key]. // some part of [*smallest_user_key,*largest_user_key].

Loading…
Cancel
Save