[rocksdb] new CompactionFilterV2 API

Summary:
This diff adds a new CompactionFilterV2 API that roll up the
decisions of kv pairs during compactions. These kv pairs must share the
same key prefix. They are buffered inside the db.

    typedef std::vector<Slice> SliceVector;
    virtual std::vector<bool> Filter(int level,
                                 const SliceVector& keys,
                                 const SliceVector& existing_values,
                                 std::vector<std::string>* new_values,
                                 std::vector<bool>* values_changed
                                 ) const = 0;

Application can override the Filter() function to operate
on the buffered kv pairs. More details in the inline documentation.

Test Plan:
make check. Added unit tests to make sure Keep, Delete,
Change all works.

Reviewers: haobo

CCs: leveldb

Differential Revision: https://reviews.facebook.net/D15087
main
Danny Guo 11 years ago
parent cda4006e87
commit b47812fba6
  1. 2
      HISTORY.md
  2. 495
      db/db_impl.cc
  3. 19
      db/db_impl.h
  4. 254
      db/db_test.cc
  5. 14
      db/merge_helper.cc
  6. 3
      db/merge_helper.h
  7. 113
      include/rocksdb/compaction_filter.h
  8. 2
      include/rocksdb/env.h
  9. 5
      include/rocksdb/options.h
  10. 5
      util/options.cc
  11. 2
      utilities/ttl/db_ttl.h
  12. 2
      utilities/ttl/ttl_test.cc

@ -15,11 +15,13 @@
* Added Env::GetThreadPoolQueueLen(), which returns the waiting queue length of thread pools * Added Env::GetThreadPoolQueueLen(), which returns the waiting queue length of thread pools
* Added a command "checkconsistency" in ldb tool, which checks * Added a command "checkconsistency" in ldb tool, which checks
if file system state matches DB state (file existence and file sizes) if file system state matches DB state (file existence and file sizes)
* CompactionFilter::Context is now CompactionFilterContext. It is shared by CompactionFilter and CompactionFilterV2
### New Features ### New Features
* If we find one truncated record at the end of the MANIFEST or WAL files, * If we find one truncated record at the end of the MANIFEST or WAL files,
we will ignore it. We assume that writers of these records were interrupted we will ignore it. We assume that writers of these records were interrupted
and that we can safely ignore it. and that we can safely ignore it.
* Now compaction filter has a V2 interface. It buffers the kv-pairs sharing the same key prefix, process them in batches, and return the batched results back to DB.
## 2.7.0 (01/28/2014) ## 2.7.0 (01/28/2014)

@ -70,6 +70,7 @@ namespace rocksdb {
int DBImpl::SuperVersion::dummy = 0; int DBImpl::SuperVersion::dummy = 0;
void* const DBImpl::SuperVersion::kSVInUse = &DBImpl::SuperVersion::dummy; void* const DBImpl::SuperVersion::kSVInUse = &DBImpl::SuperVersion::dummy;
void* const DBImpl::SuperVersion::kSVObsolete = nullptr; void* const DBImpl::SuperVersion::kSVObsolete = nullptr;
const std::string kNullString = "NULL";
void DumpLeveldbBuildVersion(Logger * log); void DumpLeveldbBuildVersion(Logger * log);
@ -118,12 +119,129 @@ struct DBImpl::CompactionState {
} }
// Create a client visible context of this compaction // Create a client visible context of this compaction
CompactionFilter::Context GetFilterContext() { CompactionFilterContext GetFilterContext() {
CompactionFilter::Context context; CompactionFilterContext context;
context.is_full_compaction = compaction->IsFullCompaction(); context.is_full_compaction = compaction->IsFullCompaction();
context.is_manual_compaction = compaction->IsManualCompaction(); context.is_manual_compaction = compaction->IsManualCompaction();
return context; return context;
} }
std::vector<Slice> key_buf_;
std::vector<Slice> existing_value_buf_;
std::vector<std::string> key_str_buf_;
std::vector<std::string> existing_value_str_buf_;
// new_value_buf_ will only be appended if a value changes
std::vector<std::string> new_value_buf_;
// if values_changed_buf_[i] is true
// new_value_buf_ will add a new entry with the changed value
std::vector<bool> value_changed_buf_;
// to_delete_buf_[i] is true iff key_buf_[i] is deleted
std::vector<bool> to_delete_buf_;
// buffer for the parsed internal keys, the string buffer is backed
// by key_str_buf_
std::vector<ParsedInternalKey> ikey_buf_;
std::vector<Slice> other_key_buf_;
std::vector<Slice> other_value_buf_;
std::vector<std::string> other_key_str_buf_;
std::vector<std::string> other_value_str_buf_;
std::vector<Slice> combined_key_buf_;
std::vector<Slice> combined_value_buf_;
std::string cur_prefix_;
// Buffers the kv-pair that will be run through compaction filter V2
// in the future.
void BufferKeyValueSlices(const Slice& key, const Slice& value) {
key_str_buf_.emplace_back(key.ToString());
existing_value_str_buf_.emplace_back(value.ToString());
key_buf_.emplace_back(Slice(key_str_buf_.back()));
existing_value_buf_.emplace_back(Slice(existing_value_str_buf_.back()));
ParsedInternalKey ikey;
ParseInternalKey(key_buf_.back(), &ikey);
ikey_buf_.emplace_back(ikey);
}
// Buffers the kv-pair that will not be run through compaction filter V2
// in the future.
void BufferOtherKeyValueSlices(const Slice& key, const Slice& value) {
other_key_str_buf_.emplace_back(key.ToString());
other_value_str_buf_.emplace_back(value.ToString());
other_key_buf_.emplace_back(Slice(other_key_str_buf_.back()));
other_value_buf_.emplace_back(Slice(other_value_str_buf_.back()));
}
// Add a kv-pair to the combined buffer
void AddToCombinedKeyValueSlices(const Slice& key, const Slice& value) {
// The real strings are stored in the batch buffers
combined_key_buf_.emplace_back(key);
combined_value_buf_.emplace_back(value);
}
// Merging the two buffers
void MergeKeyValueSliceBuffer(const InternalKeyComparator* comparator) {
size_t i = 0;
size_t j = 0;
size_t total_size = key_buf_.size() + other_key_buf_.size();
combined_key_buf_.reserve(total_size);
combined_value_buf_.reserve(total_size);
while (i + j < total_size) {
int comp_res = 0;
if (i < key_buf_.size() && j < other_key_buf_.size()) {
comp_res = comparator->Compare(key_buf_[i], other_key_buf_[j]);
} else if (i >= key_buf_.size() && j < other_key_buf_.size()) {
comp_res = 1;
} else if (j >= other_key_buf_.size() && i < key_buf_.size()) {
comp_res = -1;
}
if (comp_res > 0) {
AddToCombinedKeyValueSlices(other_key_buf_[j], other_value_buf_[j]);
j++;
} else if (comp_res < 0) {
AddToCombinedKeyValueSlices(key_buf_[i], existing_value_buf_[i]);
i++;
}
}
}
void CleanupBatchBuffer() {
to_delete_buf_.clear();
key_buf_.clear();
existing_value_buf_.clear();
key_str_buf_.clear();
existing_value_str_buf_.clear();
new_value_buf_.clear();
value_changed_buf_.clear();
ikey_buf_.clear();
to_delete_buf_.shrink_to_fit();
key_buf_.shrink_to_fit();
existing_value_buf_.shrink_to_fit();
key_str_buf_.shrink_to_fit();
existing_value_str_buf_.shrink_to_fit();
new_value_buf_.shrink_to_fit();
value_changed_buf_.shrink_to_fit();
ikey_buf_.shrink_to_fit();
other_key_buf_.clear();
other_value_buf_.clear();
other_key_str_buf_.clear();
other_value_str_buf_.clear();
other_key_buf_.shrink_to_fit();
other_value_buf_.shrink_to_fit();
other_key_str_buf_.shrink_to_fit();
other_value_str_buf_.shrink_to_fit();
}
void CleanupMergedBuffer() {
combined_key_buf_.clear();
combined_value_buf_.clear();
combined_key_buf_.shrink_to_fit();
combined_value_buf_.shrink_to_fit();
}
}; };
// Fix user-supplied options to be reasonable // Fix user-supplied options to be reasonable
@ -2401,66 +2519,27 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot(
return 0; return 0;
} }
Status DBImpl::DoCompactionWork(CompactionState* compact, Status DBImpl::ProcessKeyValueCompaction(
SequenceNumber visible_at_tip,
SequenceNumber earliest_snapshot,
SequenceNumber latest_snapshot,
DeletionState& deletion_state, DeletionState& deletion_state,
bool bottommost_level,
int64_t& imm_micros,
Iterator* input,
CompactionState* compact,
bool is_compaction_v2,
LogBuffer* log_buffer) { LogBuffer* log_buffer) {
assert(compact); size_t combined_idx = 0;
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
Log(options_.info_log,
"Compacting %d@%d + %d@%d files, score %.2f slots available %d",
compact->compaction->num_input_files(0),
compact->compaction->level(),
compact->compaction->num_input_files(1),
compact->compaction->output_level(),
compact->compaction->score(),
options_.max_background_compactions - bg_compaction_scheduled_);
char scratch[2345];
compact->compaction->Summary(scratch, sizeof(scratch));
Log(options_.info_log, "Compaction start summary: %s\n", scratch);
assert(versions_->current()->NumLevelFiles(compact->compaction->level()) > 0);
assert(compact->builder == nullptr);
assert(!compact->outfile);
SequenceNumber visible_at_tip = 0;
SequenceNumber earliest_snapshot;
SequenceNumber latest_snapshot = 0;
snapshots_.getAll(compact->existing_snapshots);
if (compact->existing_snapshots.size() == 0) {
// optimize for fast path if there are no snapshots
visible_at_tip = versions_->LastSequence();
earliest_snapshot = visible_at_tip;
} else {
latest_snapshot = compact->existing_snapshots.back();
// Add the current seqno as the 'latest' virtual
// snapshot to the end of this list.
compact->existing_snapshots.push_back(versions_->LastSequence());
earliest_snapshot = compact->existing_snapshots[0];
}
// Is this compaction producing files at the bottommost level?
bool bottommost_level = compact->compaction->BottomMostLevel();
// Allocate the output file numbers before we release the lock
AllocateCompactionOutputFileNumbers(compact);
// Release mutex while we're actually doing the compaction work
mutex_.Unlock();
// flush log buffer immediately after releasing the mutex
log_buffer->FlushBufferToLog();
const uint64_t start_micros = env_->NowMicros();
unique_ptr<Iterator> input(versions_->MakeInputIterator(compact->compaction));
input->SeekToFirst();
Status status; Status status;
std::string compaction_filter_value;
ParsedInternalKey ikey; ParsedInternalKey ikey;
std::string current_user_key; std::string current_user_key;
bool has_current_user_key = false; bool has_current_user_key = false;
std::vector<char> delete_key; // for compaction filter
SequenceNumber last_sequence_for_key __attribute__((unused)) = SequenceNumber last_sequence_for_key __attribute__((unused)) =
kMaxSequenceNumber; kMaxSequenceNumber;
SequenceNumber visible_in_snapshot = kMaxSequenceNumber; SequenceNumber visible_in_snapshot = kMaxSequenceNumber;
std::string compaction_filter_value;
std::vector<char> delete_key; // for compaction filter
MergeHelper merge(user_comparator(), options_.merge_operator.get(), MergeHelper merge(user_comparator(), options_.merge_operator.get(),
options_.info_log.get(), options_.info_log.get(),
options_.min_partial_merge_operands, options_.min_partial_merge_operands,
@ -2490,12 +2569,31 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
imm_micros += (env_->NowMicros() - imm_start); imm_micros += (env_->NowMicros() - imm_start);
} }
Slice key = input->key(); Slice key;
Slice value = input->value(); Slice value;
// If is_compaction_v2 is on, kv-pairs are reset to the prefix batch.
// This prefix batch should contain results after calling
// compaction_filter_v2.
//
// If is_compaction_v2 is off, this function will go through all the
// kv-pairs in input.
if (!is_compaction_v2) {
key = input->key();
value = input->value();
} else {
if (combined_idx >= compact->combined_key_buf_.size()) {
break;
}
assert(combined_idx < compact->combined_key_buf_.size());
key = compact->combined_key_buf_[combined_idx];
value = compact->combined_value_buf_[combined_idx];
++combined_idx;
}
if (compact->compaction->ShouldStopBefore(key) && if (compact->compaction->ShouldStopBefore(key) &&
compact->builder != nullptr) { compact->builder != nullptr) {
status = FinishCompactionOutputFile(compact, input.get()); status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) { if (!status.ok()) {
break; break;
} }
@ -2521,9 +2619,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
has_current_user_key = true; has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber; last_sequence_for_key = kMaxSequenceNumber;
visible_in_snapshot = kMaxSequenceNumber; visible_in_snapshot = kMaxSequenceNumber;
// apply the compaction filter to the first occurrence of the user key // apply the compaction filter to the first occurrence of the user key
if (compaction_filter && if (compaction_filter && !is_compaction_v2 &&
ikey.type == kTypeValue && ikey.type == kTypeValue &&
(visible_at_tip || ikey.sequence > latest_snapshot)) { (visible_at_tip || ikey.sequence > latest_snapshot)) {
// If the user has specified a compaction filter and the sequence // If the user has specified a compaction filter and the sequence
@ -2596,8 +2693,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
// object to minimize change to the existing flow. Turn out this // object to minimize change to the existing flow. Turn out this
// logic could also be nicely re-used for memtable flush purge // logic could also be nicely re-used for memtable flush purge
// optimization in BuildTable. // optimization in BuildTable.
merge.MergeUntil(input.get(), prev_snapshot, bottommost_level, int steps = 0;
options_.statistics.get()); merge.MergeUntil(input, prev_snapshot, bottommost_level,
options_.statistics.get(), &steps);
// Skip the Merge ops
combined_idx = combined_idx - 1 + steps;
current_entry_is_merging = true; current_entry_is_merging = true;
if (merge.IsSuccess()) { if (merge.IsSuccess()) {
// Successfully found Put/Delete/(end-of-key-range) while merging // Successfully found Put/Delete/(end-of-key-range) while merging
@ -2699,7 +2800,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
// Close output file if it is big enough // Close output file if it is big enough
if (compact->builder->FileSize() >= if (compact->builder->FileSize() >=
compact->compaction->MaxOutputFileSize()) { compact->compaction->MaxOutputFileSize()) {
status = FinishCompactionOutputFile(compact, input.get()); status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) { if (!status.ok()) {
break; break;
} }
@ -2736,6 +2837,278 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
} }
} }
return status;
}
void DBImpl::CallCompactionFilterV2(CompactionState* compact,
CompactionFilterV2* compaction_filter_v2) {
if (compact == nullptr || compaction_filter_v2 == nullptr) {
return;
}
std::vector<Slice> user_key_buf;
for (const auto& key : compact->ikey_buf_) {
user_key_buf.emplace_back(key.user_key);
}
// If the user has specified a compaction filter and the sequence
// number is greater than any external snapshot, then invoke the
// filter.
// If the return value of the compaction filter is true, replace
// the entry with a delete marker.
compact->to_delete_buf_ = compaction_filter_v2->Filter(
compact->compaction->level(),
user_key_buf, compact->existing_value_buf_,
&compact->new_value_buf_,
&compact->value_changed_buf_);
// new_value_buf_.size() <= to_delete__buf_.size(). "=" iff all
// kv-pairs in this compaction run needs to be deleted.
assert(compact->to_delete_buf_.size() ==
compact->key_buf_.size());
assert(compact->to_delete_buf_.size() ==
compact->existing_value_buf_.size());
assert(compact->to_delete_buf_.size() ==
compact->value_changed_buf_.size());
int new_value_idx = 0;
for (unsigned int i = 0; i < compact->to_delete_buf_.size(); ++i) {
if (compact->to_delete_buf_[i]) {
// update the string buffer directly
// the Slice buffer points to the updated buffer
UpdateInternalKey(&compact->key_str_buf_[i][0],
compact->key_str_buf_[i].size(),
compact->ikey_buf_[i].sequence,
kTypeDeletion);
// no value associated with delete
compact->existing_value_buf_[i].clear();
RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_USER);
} else if (compact->value_changed_buf_[i]) {
compact->existing_value_buf_[i] =
Slice(compact->new_value_buf_[new_value_idx++]);
}
} // for
}
Status DBImpl::DoCompactionWork(CompactionState* compact,
DeletionState& deletion_state,
LogBuffer* log_buffer) {
assert(compact);
compact->CleanupBatchBuffer();
compact->CleanupMergedBuffer();
compact->cur_prefix_ = kNullString;
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
Log(options_.info_log,
"Compacting %d@%d + %d@%d files, score %.2f slots available %d",
compact->compaction->num_input_files(0),
compact->compaction->level(),
compact->compaction->num_input_files(1),
compact->compaction->output_level(),
compact->compaction->score(),
options_.max_background_compactions - bg_compaction_scheduled_);
char scratch[2345];
compact->compaction->Summary(scratch, sizeof(scratch));
Log(options_.info_log, "Compaction start summary: %s\n", scratch);
assert(versions_->current()->NumLevelFiles(compact->compaction->level()) > 0);
assert(compact->builder == nullptr);
assert(!compact->outfile);
SequenceNumber visible_at_tip = 0;
SequenceNumber earliest_snapshot;
SequenceNumber latest_snapshot = 0;
snapshots_.getAll(compact->existing_snapshots);
if (compact->existing_snapshots.size() == 0) {
// optimize for fast path if there are no snapshots
visible_at_tip = versions_->LastSequence();
earliest_snapshot = visible_at_tip;
} else {
latest_snapshot = compact->existing_snapshots.back();
// Add the current seqno as the 'latest' virtual
// snapshot to the end of this list.
compact->existing_snapshots.push_back(versions_->LastSequence());
earliest_snapshot = compact->existing_snapshots[0];
}
// Is this compaction producing files at the bottommost level?
bool bottommost_level = compact->compaction->BottomMostLevel();
// Allocate the output file numbers before we release the lock
AllocateCompactionOutputFileNumbers(compact);
// Release mutex while we're actually doing the compaction work
mutex_.Unlock();
const uint64_t start_micros = env_->NowMicros();
unique_ptr<Iterator> input(versions_->MakeInputIterator(compact->compaction));
input->SeekToFirst();
shared_ptr<Iterator> backup_input(
versions_->MakeInputIterator(compact->compaction));
backup_input->SeekToFirst();
Status status;
ParsedInternalKey ikey;
std::unique_ptr<CompactionFilterV2> compaction_filter_from_factory_v2
= nullptr;
auto context = compact->GetFilterContext();
compaction_filter_from_factory_v2 =
options_.compaction_filter_factory_v2->CreateCompactionFilterV2(context);
auto compaction_filter_v2 =
compaction_filter_from_factory_v2.get();
// temp_backup_input always point to the start of the current buffer
// temp_backup_input = backup_input;
// iterate through input,
// 1) buffer ineligible keys and value keys into 2 separate buffers;
// 2) send value_buffer to compaction filter and alternate the values;
// 3) merge value_buffer with ineligible_value_buffer;
// 4) run the modified "compaction" using the old for loop.
if (compaction_filter_v2) {
for (; backup_input->Valid() && !shutting_down_.Acquire_Load(); ) {
// Prioritize immutable compaction work
if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) {
const uint64_t imm_start = env_->NowMicros();
LogFlush(options_.info_log);
mutex_.Lock();
if (imm_.IsFlushPending()) {
FlushMemTableToOutputFile(nullptr, deletion_state, log_buffer);
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
}
mutex_.Unlock();
imm_micros += (env_->NowMicros() - imm_start);
}
Slice key = backup_input->key();
Slice value = backup_input->value();
const SliceTransform* transformer =
options_.compaction_filter_factory_v2->GetPrefixExtractor();
std::string key_prefix = transformer->Transform(key).ToString();
if (compact->cur_prefix_ == kNullString) {
compact->cur_prefix_ = key_prefix;
}
if (!ParseInternalKey(key, &ikey)) {
// log error
Log(options_.info_log, "Failed to parse key: %s",
key.ToString().c_str());
continue;
} else {
// If the prefix remains the same, keep buffering
if (key_prefix == compact->cur_prefix_) {
// Apply the compaction filter V2 to all the kv pairs sharing
// the same prefix
if (ikey.type == kTypeValue &&
(visible_at_tip || ikey.sequence > latest_snapshot)) {
// Buffer all keys sharing the same prefix for CompactionFilterV2
// Iterate through keys to check prefix
compact->BufferKeyValueSlices(key, value);
} else {
// buffer ineligible keys
compact->BufferOtherKeyValueSlices(key, value);
}
backup_input->Next();
continue;
// finish changing values for eligible keys
} else {
// Now prefix changes, this batch is done.
// Call compaction filter on the buffered values to change the value
if (compact->key_buf_.size() > 0) {
CallCompactionFilterV2(compact, compaction_filter_v2);
}
compact->cur_prefix_ = key_prefix;
}
}
// Merge this batch of data (values + ineligible keys)
compact->MergeKeyValueSliceBuffer(&internal_comparator_);
// Done buffering for the current prefix. Spit it out to disk
// Now just iterate through all the kv-pairs
status = ProcessKeyValueCompaction(
visible_at_tip,
earliest_snapshot,
latest_snapshot,
deletion_state,
bottommost_level,
imm_micros,
input.get(),
compact,
true,
log_buffer);
if (!status.ok()) {
break;
}
// After writing the kv-pairs, we can safely remove the reference
// to the string buffer and clean them up
compact->CleanupBatchBuffer();
compact->CleanupMergedBuffer();
// Buffer the key that triggers the mismatch in prefix
if (ikey.type == kTypeValue &&
(visible_at_tip || ikey.sequence > latest_snapshot)) {
compact->BufferKeyValueSlices(key, value);
} else {
compact->BufferOtherKeyValueSlices(key, value);
}
backup_input->Next();
if (!backup_input->Valid()) {
// If this is the single last value, we need to merge it.
if (compact->key_buf_.size() > 0) {
CallCompactionFilterV2(compact, compaction_filter_v2);
}
compact->MergeKeyValueSliceBuffer(&internal_comparator_);
status = ProcessKeyValueCompaction(
visible_at_tip,
earliest_snapshot,
latest_snapshot,
deletion_state,
bottommost_level,
imm_micros,
input.get(),
compact,
true,
log_buffer);
compact->CleanupBatchBuffer();
compact->CleanupMergedBuffer();
}
} // done processing all prefix batches
// finish the last batch
if (compact->key_buf_.size() > 0) {
CallCompactionFilterV2(compact, compaction_filter_v2);
}
compact->MergeKeyValueSliceBuffer(&internal_comparator_);
status = ProcessKeyValueCompaction(
visible_at_tip,
earliest_snapshot,
latest_snapshot,
deletion_state,
bottommost_level,
imm_micros,
input.get(),
compact,
true,
log_buffer);
} // checking for compaction filter v2
if (!compaction_filter_v2) {
status = ProcessKeyValueCompaction(
visible_at_tip,
earliest_snapshot,
latest_snapshot,
deletion_state,
bottommost_level,
imm_micros,
input.get(),
compact,
false,
log_buffer);
}
if (status.ok() && shutting_down_.Acquire_Load()) { if (status.ok() && shutting_down_.Acquire_Load()) {
status = Status::ShutdownInProgress( status = Status::ShutdownInProgress(
"Database shutdown started during compaction"); "Database shutdown started during compaction");

@ -36,6 +36,7 @@ class TableCache;
class Version; class Version;
class VersionEdit; class VersionEdit;
class VersionSet; class VersionSet;
class CompactionFilterV2;
class DBImpl : public DB { class DBImpl : public DB {
public: public:
@ -366,6 +367,24 @@ class DBImpl : public DB {
DeletionState& deletion_state, DeletionState& deletion_state,
LogBuffer* log_buffer); LogBuffer* log_buffer);
// Call compaction filter if is_compaction_v2 is not true. Then iterate
// through input and compact the kv-pairs
Status ProcessKeyValueCompaction(
SequenceNumber visible_at_tip,
SequenceNumber earliest_snapshot,
SequenceNumber latest_snapshot,
DeletionState& deletion_state,
bool bottommost_level,
int64_t& imm_micros,
Iterator* input,
CompactionState* compact,
bool is_compaction_v2,
LogBuffer* log_buffer);
// Call compaction_filter_v2->Filter() on kv-pairs in compact
void CallCompactionFilterV2(CompactionState* compact,
CompactionFilterV2* compaction_filter_v2);
Status OpenCompactionOutputFile(CompactionState* compact); Status OpenCompactionOutputFile(CompactionState* compact);
Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input); Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input);
Status InstallCompactionResults(CompactionState* compact); Status InstallCompactionResults(CompactionState* compact);

@ -2434,7 +2434,7 @@ class KeepFilterFactory : public CompactionFilterFactory {
: check_context_(check_context) {} : check_context_(check_context) {}
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override { const CompactionFilterContext& context) override {
if (check_context_) { if (check_context_) {
ASSERT_EQ(expect_full_compaction_.load(), context.is_full_compaction); ASSERT_EQ(expect_full_compaction_.load(), context.is_full_compaction);
ASSERT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction); ASSERT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction);
@ -2451,7 +2451,7 @@ class KeepFilterFactory : public CompactionFilterFactory {
class DeleteFilterFactory : public CompactionFilterFactory { class DeleteFilterFactory : public CompactionFilterFactory {
public: public:
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override { const CompactionFilterContext& context) override {
if (context.is_manual_compaction) { if (context.is_manual_compaction) {
return std::unique_ptr<CompactionFilter>(new DeleteFilter()); return std::unique_ptr<CompactionFilter>(new DeleteFilter());
} else { } else {
@ -2467,7 +2467,7 @@ class ChangeFilterFactory : public CompactionFilterFactory {
explicit ChangeFilterFactory() {} explicit ChangeFilterFactory() {}
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override { const CompactionFilterContext& context) override {
return std::unique_ptr<CompactionFilter>(new ChangeFilter()); return std::unique_ptr<CompactionFilter>(new ChangeFilter());
} }
@ -3507,7 +3507,7 @@ TEST(DBTest, CompactionFilterWithValueChange) {
// verify that all keys now have the new value that // verify that all keys now have the new value that
// was set by the compaction process. // was set by the compaction process.
for (int i = 0; i < 100000; i++) { for (int i = 0; i < 100001; i++) {
char key[100]; char key[100];
snprintf(key, sizeof(key), "B%010d", i); snprintf(key, sizeof(key), "B%010d", i);
std::string newvalue = Get(key); std::string newvalue = Get(key);
@ -3570,6 +3570,252 @@ TEST(DBTest, CompactionFilterContextManual) {
delete iter; delete iter;
} }
class KeepFilterV2 : public CompactionFilterV2 {
public:
virtual std::vector<bool> Filter(int level,
const SliceVector& keys,
const SliceVector& existing_values,
std::vector<std::string>* new_values,
std::vector<bool>* values_changed)
const override {
cfilter_count++;
std::vector<bool> ret;
new_values->clear();
values_changed->clear();
for (unsigned int i = 0; i < keys.size(); ++i) {
values_changed->push_back(false);
ret.push_back(false);
}
return ret;
}
virtual const char* Name() const override {
return "KeepFilterV2";
}
};
class DeleteFilterV2 : public CompactionFilterV2 {
public:
virtual std::vector<bool> Filter(int level,
const SliceVector& keys,
const SliceVector& existing_values,
std::vector<std::string>* new_values,
std::vector<bool>* values_changed)
const override {
cfilter_count++;
new_values->clear();
values_changed->clear();
std::vector<bool> ret;
for (unsigned int i = 0; i < keys.size(); ++i) {
values_changed->push_back(false);
ret.push_back(true);
}
return ret;
}
virtual const char* Name() const override {
return "DeleteFilterV2";
}
};
class ChangeFilterV2 : public CompactionFilterV2 {
public:
virtual std::vector<bool> Filter(int level,
const SliceVector& keys,
const SliceVector& existing_values,
std::vector<std::string>* new_values,
std::vector<bool>* values_changed)
const override {
std::vector<bool> ret;
new_values->clear();
values_changed->clear();
for (unsigned int i = 0; i < keys.size(); ++i) {
values_changed->push_back(true);
new_values->push_back(NEW_VALUE);
ret.push_back(false);
}
return ret;
}
virtual const char* Name() const override {
return "ChangeFilterV2";
}
};
class KeepFilterFactoryV2 : public CompactionFilterFactoryV2 {
public:
explicit KeepFilterFactoryV2(const SliceTransform* prefix_extractor)
: CompactionFilterFactoryV2(prefix_extractor) { }
virtual std::unique_ptr<CompactionFilterV2>
CreateCompactionFilterV2(
const CompactionFilterContext& context) override {
return std::unique_ptr<CompactionFilterV2>(new KeepFilterV2());
}
virtual const char* Name() const override {
return "KeepFilterFactoryV2";
}
};
class DeleteFilterFactoryV2 : public CompactionFilterFactoryV2 {
public:
explicit DeleteFilterFactoryV2(const SliceTransform* prefix_extractor)
: CompactionFilterFactoryV2(prefix_extractor) { }
virtual std::unique_ptr<CompactionFilterV2>
CreateCompactionFilterV2(
const CompactionFilterContext& context) override {
return std::unique_ptr<CompactionFilterV2>(new DeleteFilterV2());
}
virtual const char* Name() const override {
return "DeleteFilterFactoryV2";
}
};
class ChangeFilterFactoryV2 : public CompactionFilterFactoryV2 {
public:
explicit ChangeFilterFactoryV2(const SliceTransform* prefix_extractor)
: CompactionFilterFactoryV2(prefix_extractor) { }
virtual std::unique_ptr<CompactionFilterV2>
CreateCompactionFilterV2(
const CompactionFilterContext& context) override {
return std::unique_ptr<CompactionFilterV2>(new ChangeFilterV2());
}
virtual const char* Name() const override {
return "ChangeFilterFactoryV2";
}
};
TEST(DBTest, CompactionFilterV2) {
Options options = CurrentOptions();
options.num_levels = 3;
options.max_mem_compaction_level = 0;
// extract prefix
auto prefix_extractor = NewFixedPrefixTransform(8);
options.compaction_filter_factory_v2
= std::make_shared<KeepFilterFactoryV2>(prefix_extractor);
// In a testing environment, we can only flush the application
// compaction filter buffer using universal compaction
option_config_ = kUniversalCompaction;
options.compaction_style = (rocksdb::CompactionStyle)1;
Reopen(&options);
// Write 100K keys, these are written to a few files in L0.
const std::string value(10, 'x');
for (int i = 0; i < 100000; i++) {
char key[100];
snprintf(key, sizeof(key), "B%08d%010d", i , i);
Put(key, value);
}
dbfull()->TEST_FlushMemTable();
dbfull()->TEST_CompactRange(0, nullptr, nullptr);
dbfull()->TEST_CompactRange(1, nullptr, nullptr);
ASSERT_EQ(NumTableFilesAtLevel(0), 1);
// All the files are in the lowest level.
int count = 0;
int total = 0;
Iterator* iter = dbfull()->TEST_NewInternalIterator();
iter->SeekToFirst();
ASSERT_OK(iter->status());
while (iter->Valid()) {
ParsedInternalKey ikey(Slice(), 0, kTypeValue);
ikey.sequence = -1;
ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
total++;
if (ikey.sequence != 0) {
count++;
}
iter->Next();
}
ASSERT_EQ(total, 100000);
// 1 snapshot only. Since we are using universal compacton,
// the sequence no is cleared for better compression
ASSERT_EQ(count, 1);
delete iter;
// create a new database with the compaction
// filter in such a way that it deletes all keys
options.compaction_filter_factory_v2 =
std::make_shared<DeleteFilterFactoryV2>(prefix_extractor);
options.create_if_missing = true;
DestroyAndReopen(&options);
// write all the keys once again.
for (int i = 0; i < 100000; i++) {
char key[100];
snprintf(key, sizeof(key), "B%08d%010d", i, i);
Put(key, value);
}
dbfull()->TEST_FlushMemTable();
ASSERT_NE(NumTableFilesAtLevel(0), 0);
dbfull()->TEST_CompactRange(0, nullptr, nullptr);
dbfull()->TEST_CompactRange(1, nullptr, nullptr);
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
// Scan the entire database to ensure that nothing is left
iter = db_->NewIterator(ReadOptions());
iter->SeekToFirst();
count = 0;
while (iter->Valid()) {
count++;
iter->Next();
}
ASSERT_EQ(count, 0);
delete iter;
}
TEST(DBTest, CompactionFilterV2WithValueChange) {
Options options = CurrentOptions();
options.num_levels = 3;
options.max_mem_compaction_level = 0;
auto prefix_extractor = NewFixedPrefixTransform(8);
options.compaction_filter_factory_v2 =
std::make_shared<ChangeFilterFactoryV2>(prefix_extractor);
// In a testing environment, we can only flush the application
// compaction filter buffer using universal compaction
option_config_ = kUniversalCompaction;
options.compaction_style = (rocksdb::CompactionStyle)1;
Reopen(&options);
// Write 100K+1 keys, these are written to a few files
// in L0. We do this so that the current snapshot points
// to the 100001 key.The compaction filter is not invoked
// on keys that are visible via a snapshot because we
// anyways cannot delete it.
const std::string value(10, 'x');
for (int i = 0; i < 100001; i++) {
char key[100];
snprintf(key, sizeof(key), "B%08d%010d", i, i);
Put(key, value);
}
// push all files to lower levels
dbfull()->TEST_FlushMemTable();
dbfull()->TEST_CompactRange(0, nullptr, nullptr);
dbfull()->TEST_CompactRange(1, nullptr, nullptr);
// verify that all keys now have the new value that
// was set by the compaction process.
for (int i = 0; i < 100001; i++) {
char key[100];
snprintf(key, sizeof(key), "B%08d%010d", i, i);
std::string newvalue = Get(key);
ASSERT_EQ(newvalue.compare(NEW_VALUE), 0);
}
}
TEST(DBTest, SparseMerge) { TEST(DBTest, SparseMerge) {
do { do {
Options options = CurrentOptions(); Options options = CurrentOptions();

@ -21,7 +21,7 @@ namespace rocksdb {
// operands_ stores the list of merge operands encountered while merging. // operands_ stores the list of merge operands encountered while merging.
// keys_[i] corresponds to operands_[i] for each i. // keys_[i] corresponds to operands_[i] for each i.
void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
bool at_bottom, Statistics* stats) { bool at_bottom, Statistics* stats, int* steps) {
// Get a copy of the internal key, before it's invalidated by iter->Next() // Get a copy of the internal key, before it's invalidated by iter->Next()
// Also maintain the list of merge operands seen. // Also maintain the list of merge operands seen.
keys_.clear(); keys_.clear();
@ -42,6 +42,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
bool hit_the_next_user_key = false; bool hit_the_next_user_key = false;
ParsedInternalKey ikey; ParsedInternalKey ikey;
std::string merge_result; // Temporary value for merge results std::string merge_result; // Temporary value for merge results
if (steps) {
++(*steps);
}
for (iter->Next(); iter->Valid(); iter->Next()) { for (iter->Next(); iter->Valid(); iter->Next()) {
assert(operands_.size() >= 1); // Should be invariants! assert(operands_.size() >= 1); // Should be invariants!
assert(keys_.size() == operands_.size()); assert(keys_.size() == operands_.size());
@ -91,6 +94,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
// move iter to the next entry (before doing anything else) // move iter to the next entry (before doing anything else)
iter->Next(); iter->Next();
if (steps) {
++(*steps);
}
return; return;
} }
@ -119,6 +125,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
// move iter to the next entry // move iter to the next entry
iter->Next(); iter->Next();
if (steps) {
++(*steps);
}
return; return;
} }
@ -133,6 +142,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
// request or later did a partial merge. // request or later did a partial merge.
keys_.push_front(iter->key().ToString()); keys_.push_front(iter->key().ToString());
operands_.push_front(iter->value().ToString()); operands_.push_front(iter->value().ToString());
if (steps) {
++(*steps);
}
} }
} }

@ -47,7 +47,8 @@ class MergeHelper {
// at_bottom: (IN) true if the iterator covers the bottem level, which means // at_bottom: (IN) true if the iterator covers the bottem level, which means
// we could reach the start of the history of this user key. // we could reach the start of the history of this user key.
void MergeUntil(Iterator* iter, SequenceNumber stop_before = 0, void MergeUntil(Iterator* iter, SequenceNumber stop_before = 0,
bool at_bottom = false, Statistics* stats = nullptr); bool at_bottom = false, Statistics* stats = nullptr,
int* steps = nullptr);
// Query the merge result // Query the merge result
// These are valid until the next MergeUntil call // These are valid until the next MergeUntil call

@ -10,19 +10,15 @@
#define STORAGE_ROCKSDB_INCLUDE_COMPACTION_FILTER_H_ #define STORAGE_ROCKSDB_INCLUDE_COMPACTION_FILTER_H_
#include <string> #include <string>
#include <vector>
namespace rocksdb { namespace rocksdb {
class Slice; class Slice;
class SliceTransform;
// CompactionFilter allows an application to modify/delete a key-value at
// the time of compaction.
class CompactionFilter {
public:
// Context information of a compaction run // Context information of a compaction run
struct Context { struct CompactionFilterContext {
// Does this compaction run include all data files // Does this compaction run include all data files
bool is_full_compaction; bool is_full_compaction;
// Is this compaction requested by the client (true), // Is this compaction requested by the client (true),
@ -30,6 +26,11 @@ class CompactionFilter {
bool is_manual_compaction; bool is_manual_compaction;
}; };
// CompactionFilter allows an application to modify/delete a key-value at
// the time of compaction.
class CompactionFilter {
public:
virtual ~CompactionFilter() {} virtual ~CompactionFilter() {}
// The compaction process invokes this // The compaction process invokes this
@ -64,14 +65,47 @@ class CompactionFilter {
virtual const char* Name() const = 0; virtual const char* Name() const = 0;
}; };
// CompactionFilterV2 that buffers kv pairs sharing the same prefix and let
// application layer to make individual decisions for all the kv pairs in the
// buffer.
class CompactionFilterV2 {
public:
virtual ~CompactionFilterV2() {}
// The compaction process invokes this method for all the kv pairs
// sharing the same prefix. It is a "roll-up" version of CompactionFilter.
//
// Each entry in the return vector indicates if the corresponding kv should
// be preserved in the output of this compaction run. The application can
// inspect the exisitng values of the keys and make decision based on it.
//
// When a value is to be preserved, the application has the option
// to modify the entry in existing_values and pass it back through an entry
// in new_values. A corresponding values_changed entry needs to be set to
// true in this case. Note that the new_values vector contains only changed
// values, i.e. new_values.size() <= values_changed.size().
//
typedef std::vector<Slice> SliceVector;
virtual std::vector<bool> Filter(int level,
const SliceVector& keys,
const SliceVector& existing_values,
std::vector<std::string>* new_values,
std::vector<bool>* values_changed)
const = 0;
// Returns a name that identifies this compaction filter.
// The name will be printed to LOG file on start up for diagnosis.
virtual const char* Name() const = 0;
};
// Each compaction will create a new CompactionFilter allowing the // Each compaction will create a new CompactionFilter allowing the
// application to know about different campactions // application to know about different campactions
class CompactionFilterFactory { class CompactionFilterFactory {
public: public:
virtual ~CompactionFilterFactory() { }; virtual ~CompactionFilterFactory() { }
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) = 0; const CompactionFilterContext& context) = 0;
// Returns a name that identifies this compaction filter factory. // Returns a name that identifies this compaction filter factory.
virtual const char* Name() const = 0; virtual const char* Name() const = 0;
@ -82,7 +116,7 @@ class CompactionFilterFactory {
class DefaultCompactionFilterFactory : public CompactionFilterFactory { class DefaultCompactionFilterFactory : public CompactionFilterFactory {
public: public:
virtual std::unique_ptr<CompactionFilter> virtual std::unique_ptr<CompactionFilter>
CreateCompactionFilter(const CompactionFilter::Context& context) override { CreateCompactionFilter(const CompactionFilterContext& context) override {
return std::unique_ptr<CompactionFilter>(nullptr); return std::unique_ptr<CompactionFilter>(nullptr);
} }
@ -91,6 +125,65 @@ class DefaultCompactionFilterFactory : public CompactionFilterFactory {
} }
}; };
// Each compaction will create a new CompactionFilterV2
//
// CompactionFilterFactoryV2 enables application to specify a prefix and use
// CompactionFilterV2 to filter kv-pairs in batches. Each batch contains all
// the kv-pairs sharing the same prefix.
//
// This is useful for applications that require grouping kv-pairs in
// compaction filter to make a purge/no-purge decision. For example, if the
// key prefix is user id and the rest of key represents the type of value.
// This batching filter will come in handy if the application's compaction
// filter requires knowledge of all types of values for any user id.
//
class CompactionFilterFactoryV2 {
public:
explicit CompactionFilterFactoryV2(const SliceTransform* prefix_extractor)
: prefix_extractor_(prefix_extractor) { }
virtual ~CompactionFilterFactoryV2() { }
virtual std::unique_ptr<CompactionFilterV2> CreateCompactionFilterV2(
const CompactionFilterContext& context) = 0;
// Returns a name that identifies this compaction filter factory.
virtual const char* Name() const = 0;
const SliceTransform* GetPrefixExtractor() const {
return prefix_extractor_;
}
void SetPrefixExtractor(const SliceTransform* prefix_extractor) {
prefix_extractor_ = prefix_extractor;
}
private:
// Prefix extractor for compaction filter v2
// Keys sharing the same prefix will be buffered internally.
// Client can implement a Filter callback function to operate on the buffer
const SliceTransform* prefix_extractor_;
};
// Default implementaion of CompactionFilterFactoryV2 which does not
// return any filter
class DefaultCompactionFilterFactoryV2 : public CompactionFilterFactoryV2 {
public:
explicit DefaultCompactionFilterFactoryV2(
const SliceTransform* prefix_extractor)
: CompactionFilterFactoryV2(prefix_extractor) { }
virtual std::unique_ptr<CompactionFilterV2>
CreateCompactionFilterV2(
const CompactionFilterContext& context) override {
return std::unique_ptr<CompactionFilterV2>(nullptr);
}
virtual const char* Name() const override {
return "DefaultCompactionFilterFactoryV2";
}
};
} // namespace rocksdb } // namespace rocksdb
#endif // STORAGE_ROCKSDB_INCLUDE_COMPACTION_FILTER_H_ #endif // STORAGE_ROCKSDB_INCLUDE_COMPACTION_FILTER_H_

@ -214,7 +214,7 @@ class Env {
virtual void StartThread(void (*function)(void* arg), void* arg) = 0; virtual void StartThread(void (*function)(void* arg), void* arg) = 0;
// Wait for all threads started by StartThread to terminate. // Wait for all threads started by StartThread to terminate.
virtual void WaitForJoin() = 0; virtual void WaitForJoin() {}
// Get thread pool queue length for specific thrad pool. // Get thread pool queue length for specific thrad pool.
virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const { virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const {

@ -22,6 +22,7 @@ namespace rocksdb {
class Cache; class Cache;
class CompactionFilter; class CompactionFilter;
class CompactionFilterFactory; class CompactionFilterFactory;
class CompactionFilterFactoryV2;
class Comparator; class Comparator;
class Env; class Env;
enum InfoLogLevel : unsigned char; enum InfoLogLevel : unsigned char;
@ -123,6 +124,10 @@ struct Options {
// Default: a factory that doesn't provide any object // Default: a factory that doesn't provide any object
std::shared_ptr<CompactionFilterFactory> compaction_filter_factory; std::shared_ptr<CompactionFilterFactory> compaction_filter_factory;
// Version TWO of the compaction_filter_factory
// It supports rolling compaction
std::shared_ptr<CompactionFilterFactoryV2> compaction_filter_factory_v2;
// If true, the database will be created if it is missing. // If true, the database will be created if it is missing.
// Default: false // Default: false
bool create_if_missing; bool create_if_missing;

@ -32,6 +32,9 @@ Options::Options()
compaction_filter(nullptr), compaction_filter(nullptr),
compaction_filter_factory(std::shared_ptr<CompactionFilterFactory>( compaction_filter_factory(std::shared_ptr<CompactionFilterFactory>(
new DefaultCompactionFilterFactory())), new DefaultCompactionFilterFactory())),
compaction_filter_factory_v2(
new DefaultCompactionFilterFactoryV2(
NewFixedPrefixTransform(8))),
create_if_missing(false), create_if_missing(false),
error_if_exists(false), error_if_exists(false),
paranoid_checks(false), paranoid_checks(false),
@ -131,6 +134,8 @@ Options::Dump(Logger* log) const
compaction_filter? compaction_filter->Name() : "None"); compaction_filter? compaction_filter->Name() : "None");
Log(log," Options.compaction_filter_factory: %s", Log(log," Options.compaction_filter_factory: %s",
compaction_filter_factory->Name()); compaction_filter_factory->Name());
Log(log, " Options.compaction_filter_factory_v2: %s",
compaction_filter_factory_v2->Name());
Log(log," Options.memtable_factory: %s", Log(log," Options.memtable_factory: %s",
memtable_factory->Name()); memtable_factory->Name());
Log(log," Options.table_factory: %s", table_factory->Name()); Log(log," Options.table_factory: %s", table_factory->Name());

@ -192,7 +192,7 @@ class TtlCompactionFilterFactory : public CompactionFilterFactory {
user_comp_filter_factory_(comp_filter_factory) { } user_comp_filter_factory_(comp_filter_factory) { }
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) { const CompactionFilterContext& context) {
return std::unique_ptr<TtlCompactionFilter>( return std::unique_ptr<TtlCompactionFilter>(
new TtlCompactionFilter( new TtlCompactionFilter(
ttl_, ttl_,

@ -285,7 +285,7 @@ class TtlTest {
virtual std::unique_ptr<CompactionFilter> virtual std::unique_ptr<CompactionFilter>
CreateCompactionFilter( CreateCompactionFilter(
const CompactionFilter::Context& context) override { const CompactionFilterContext& context) override {
return std::unique_ptr<CompactionFilter>( return std::unique_ptr<CompactionFilter>(
new TestFilter(kSampleSize_, kNewValue_)); new TestFilter(kSampleSize_, kNewValue_));
} }

Loading…
Cancel
Save