|
|
|
@ -138,8 +138,6 @@ struct DBImpl::CompactionState { |
|
|
|
|
return context; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
std::vector<Slice> key_buf_; |
|
|
|
|
std::vector<Slice> existing_value_buf_; |
|
|
|
|
std::vector<std::string> key_str_buf_; |
|
|
|
|
std::vector<std::string> existing_value_str_buf_; |
|
|
|
|
// new_value_buf_ will only be appended if a value changes
|
|
|
|
@ -149,12 +147,7 @@ struct DBImpl::CompactionState { |
|
|
|
|
std::vector<bool> value_changed_buf_; |
|
|
|
|
// to_delete_buf_[i] is true iff key_buf_[i] is deleted
|
|
|
|
|
std::vector<bool> to_delete_buf_; |
|
|
|
|
// buffer for the parsed internal keys, the string buffer is backed
|
|
|
|
|
// by key_str_buf_
|
|
|
|
|
std::vector<ParsedInternalKey> ikey_buf_; |
|
|
|
|
|
|
|
|
|
std::vector<Slice> other_key_buf_; |
|
|
|
|
std::vector<Slice> other_value_buf_; |
|
|
|
|
std::vector<std::string> other_key_str_buf_; |
|
|
|
|
std::vector<std::string> other_value_str_buf_; |
|
|
|
|
|
|
|
|
@ -168,12 +161,6 @@ struct DBImpl::CompactionState { |
|
|
|
|
void BufferKeyValueSlices(const Slice& key, const Slice& value) { |
|
|
|
|
key_str_buf_.emplace_back(key.ToString()); |
|
|
|
|
existing_value_str_buf_.emplace_back(value.ToString()); |
|
|
|
|
key_buf_.emplace_back(Slice(key_str_buf_.back())); |
|
|
|
|
existing_value_buf_.emplace_back(Slice(existing_value_str_buf_.back())); |
|
|
|
|
|
|
|
|
|
ParsedInternalKey ikey; |
|
|
|
|
ParseInternalKey(key_buf_.back(), &ikey); |
|
|
|
|
ikey_buf_.emplace_back(ikey); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Buffers the kv-pair that will not be run through compaction filter V2
|
|
|
|
@ -181,8 +168,6 @@ struct DBImpl::CompactionState { |
|
|
|
|
void BufferOtherKeyValueSlices(const Slice& key, const Slice& value) { |
|
|
|
|
other_key_str_buf_.emplace_back(key.ToString()); |
|
|
|
|
other_value_str_buf_.emplace_back(value.ToString()); |
|
|
|
|
other_key_buf_.emplace_back(Slice(other_key_str_buf_.back())); |
|
|
|
|
other_value_buf_.emplace_back(Slice(other_value_str_buf_.back())); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Add a kv-pair to the combined buffer
|
|
|
|
@ -196,24 +181,24 @@ struct DBImpl::CompactionState { |
|
|
|
|
void MergeKeyValueSliceBuffer(const InternalKeyComparator* comparator) { |
|
|
|
|
size_t i = 0; |
|
|
|
|
size_t j = 0; |
|
|
|
|
size_t total_size = key_buf_.size() + other_key_buf_.size(); |
|
|
|
|
size_t total_size = key_str_buf_.size() + other_key_str_buf_.size(); |
|
|
|
|
combined_key_buf_.reserve(total_size); |
|
|
|
|
combined_value_buf_.reserve(total_size); |
|
|
|
|
|
|
|
|
|
while (i + j < total_size) { |
|
|
|
|
int comp_res = 0; |
|
|
|
|
if (i < key_buf_.size() && j < other_key_buf_.size()) { |
|
|
|
|
comp_res = comparator->Compare(key_buf_[i], other_key_buf_[j]); |
|
|
|
|
} else if (i >= key_buf_.size() && j < other_key_buf_.size()) { |
|
|
|
|
if (i < key_str_buf_.size() && j < other_key_str_buf_.size()) { |
|
|
|
|
comp_res = comparator->Compare(key_str_buf_[i], other_key_str_buf_[j]); |
|
|
|
|
} else if (i >= key_str_buf_.size() && j < other_key_str_buf_.size()) { |
|
|
|
|
comp_res = 1; |
|
|
|
|
} else if (j >= other_key_buf_.size() && i < key_buf_.size()) { |
|
|
|
|
} else if (j >= other_key_str_buf_.size() && i < key_str_buf_.size()) { |
|
|
|
|
comp_res = -1; |
|
|
|
|
} |
|
|
|
|
if (comp_res > 0) { |
|
|
|
|
AddToCombinedKeyValueSlices(other_key_buf_[j], other_value_buf_[j]); |
|
|
|
|
AddToCombinedKeyValueSlices(other_key_str_buf_[j], other_value_str_buf_[j]); |
|
|
|
|
j++; |
|
|
|
|
} else if (comp_res < 0) { |
|
|
|
|
AddToCombinedKeyValueSlices(key_buf_[i], existing_value_buf_[i]); |
|
|
|
|
AddToCombinedKeyValueSlices(key_str_buf_[i], existing_value_str_buf_[i]); |
|
|
|
|
i++; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -221,29 +206,19 @@ struct DBImpl::CompactionState { |
|
|
|
|
|
|
|
|
|
void CleanupBatchBuffer() { |
|
|
|
|
to_delete_buf_.clear(); |
|
|
|
|
key_buf_.clear(); |
|
|
|
|
existing_value_buf_.clear(); |
|
|
|
|
key_str_buf_.clear(); |
|
|
|
|
existing_value_str_buf_.clear(); |
|
|
|
|
new_value_buf_.clear(); |
|
|
|
|
value_changed_buf_.clear(); |
|
|
|
|
ikey_buf_.clear(); |
|
|
|
|
|
|
|
|
|
to_delete_buf_.shrink_to_fit(); |
|
|
|
|
key_buf_.shrink_to_fit(); |
|
|
|
|
existing_value_buf_.shrink_to_fit(); |
|
|
|
|
key_str_buf_.shrink_to_fit(); |
|
|
|
|
existing_value_str_buf_.shrink_to_fit(); |
|
|
|
|
new_value_buf_.shrink_to_fit(); |
|
|
|
|
value_changed_buf_.shrink_to_fit(); |
|
|
|
|
ikey_buf_.shrink_to_fit(); |
|
|
|
|
|
|
|
|
|
other_key_buf_.clear(); |
|
|
|
|
other_value_buf_.clear(); |
|
|
|
|
other_key_str_buf_.clear(); |
|
|
|
|
other_value_str_buf_.clear(); |
|
|
|
|
other_key_buf_.shrink_to_fit(); |
|
|
|
|
other_value_buf_.shrink_to_fit(); |
|
|
|
|
other_key_str_buf_.shrink_to_fit(); |
|
|
|
|
other_value_str_buf_.shrink_to_fit(); |
|
|
|
|
} |
|
|
|
@ -2862,9 +2837,22 @@ void DBImpl::CallCompactionFilterV2(CompactionState* compact, |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Assemble slice vectors for user keys and existing values.
|
|
|
|
|
// We also keep track of our parsed internal key structs because
|
|
|
|
|
// we may need to access the sequence number in the event that
|
|
|
|
|
// keys are garbage collected during the filter process.
|
|
|
|
|
std::vector<ParsedInternalKey> ikey_buf; |
|
|
|
|
std::vector<Slice> user_key_buf; |
|
|
|
|
for (const auto& key : compact->ikey_buf_) { |
|
|
|
|
user_key_buf.emplace_back(key.user_key); |
|
|
|
|
std::vector<Slice> existing_value_buf; |
|
|
|
|
|
|
|
|
|
for (const auto& key : compact->key_str_buf_) { |
|
|
|
|
ParsedInternalKey ikey; |
|
|
|
|
ParseInternalKey(Slice(key), &ikey); |
|
|
|
|
ikey_buf.emplace_back(ikey); |
|
|
|
|
user_key_buf.emplace_back(ikey.user_key); |
|
|
|
|
} |
|
|
|
|
for (const auto& value : compact->existing_value_str_buf_) { |
|
|
|
|
existing_value_buf.emplace_back(Slice(value)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// If the user has specified a compaction filter and the sequence
|
|
|
|
@ -2874,16 +2862,16 @@ void DBImpl::CallCompactionFilterV2(CompactionState* compact, |
|
|
|
|
// the entry with a delete marker.
|
|
|
|
|
compact->to_delete_buf_ = compaction_filter_v2->Filter( |
|
|
|
|
compact->compaction->level(), |
|
|
|
|
user_key_buf, compact->existing_value_buf_, |
|
|
|
|
user_key_buf, existing_value_buf, |
|
|
|
|
&compact->new_value_buf_, |
|
|
|
|
&compact->value_changed_buf_); |
|
|
|
|
|
|
|
|
|
// new_value_buf_.size() <= to_delete__buf_.size(). "=" iff all
|
|
|
|
|
// kv-pairs in this compaction run needs to be deleted.
|
|
|
|
|
assert(compact->to_delete_buf_.size() == |
|
|
|
|
compact->key_buf_.size()); |
|
|
|
|
compact->key_str_buf_.size()); |
|
|
|
|
assert(compact->to_delete_buf_.size() == |
|
|
|
|
compact->existing_value_buf_.size()); |
|
|
|
|
compact->existing_value_str_buf_.size()); |
|
|
|
|
assert(compact->to_delete_buf_.size() == |
|
|
|
|
compact->value_changed_buf_.size()); |
|
|
|
|
|
|
|
|
@ -2894,15 +2882,15 @@ void DBImpl::CallCompactionFilterV2(CompactionState* compact, |
|
|
|
|
// the Slice buffer points to the updated buffer
|
|
|
|
|
UpdateInternalKey(&compact->key_str_buf_[i][0], |
|
|
|
|
compact->key_str_buf_[i].size(), |
|
|
|
|
compact->ikey_buf_[i].sequence, |
|
|
|
|
ikey_buf[i].sequence, |
|
|
|
|
kTypeDeletion); |
|
|
|
|
|
|
|
|
|
// no value associated with delete
|
|
|
|
|
compact->existing_value_buf_[i].clear(); |
|
|
|
|
compact->existing_value_str_buf_[i].clear(); |
|
|
|
|
RecordTick(stats_, COMPACTION_KEY_DROP_USER); |
|
|
|
|
} else if (compact->value_changed_buf_[i]) { |
|
|
|
|
compact->existing_value_buf_[i] = |
|
|
|
|
Slice(compact->new_value_buf_[new_value_idx++]); |
|
|
|
|
compact->existing_value_str_buf_[i] = |
|
|
|
|
compact->new_value_buf_[new_value_idx++]; |
|
|
|
|
} |
|
|
|
|
} // for
|
|
|
|
|
} |
|
|
|
@ -3031,7 +3019,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, |
|
|
|
|
} else { |
|
|
|
|
// Now prefix changes, this batch is done.
|
|
|
|
|
// Call compaction filter on the buffered values to change the value
|
|
|
|
|
if (compact->key_buf_.size() > 0) { |
|
|
|
|
if (compact->key_str_buf_.size() > 0) { |
|
|
|
|
CallCompactionFilterV2(compact, compaction_filter_v2); |
|
|
|
|
} |
|
|
|
|
compact->cur_prefix_ = key_prefix.ToString(); |
|
|
|
@ -3074,7 +3062,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, |
|
|
|
|
backup_input->Next(); |
|
|
|
|
if (!backup_input->Valid()) { |
|
|
|
|
// If this is the single last value, we need to merge it.
|
|
|
|
|
if (compact->key_buf_.size() > 0) { |
|
|
|
|
if (compact->key_str_buf_.size() > 0) { |
|
|
|
|
CallCompactionFilterV2(compact, compaction_filter_v2); |
|
|
|
|
} |
|
|
|
|
compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator()); |
|
|
|
@ -3097,7 +3085,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, |
|
|
|
|
} |
|
|
|
|
} // done processing all prefix batches
|
|
|
|
|
// finish the last batch
|
|
|
|
|
if (compact->key_buf_.size() > 0) { |
|
|
|
|
if (compact->key_str_buf_.size() > 0) { |
|
|
|
|
CallCompactionFilterV2(compact, compaction_filter_v2); |
|
|
|
|
} |
|
|
|
|
compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator()); |
|
|
|
|