[RocksDB] Add OnCompactionStart to CompactionFilter class

Summary: This is to give application compaction filter a chance to access context information of a specific compaction run. For example, depending on whether a compaction goes through all data files, the application could do things differently.

Test Plan: make check

Reviewers: dhruba, kailiu, sdong

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D13683
main
Haobo Xu 11 years ago
parent b4fab3be2a
commit 8cbe5bb56b
  1. 19
      db/db_impl.cc
  2. 6
      db/db_test.cc
  3. 5
      db/version_set.cc
  4. 5
      db/version_set.h
  5. 38
      include/rocksdb/compaction_filter.h
  6. 13
      utilities/ttl/db_ttl.h
  7. 3
      utilities/ttl/ttl_test.cc

@ -101,6 +101,13 @@ struct DBImpl::CompactionState {
: compaction(c),
total_bytes(0) {
}
// Create a client visible context of this compaction
CompactionFilter::Context GetFilterContext() {
CompactionFilter::Context context;
context.is_full_compaction = compaction->IsFullCompaction();
return context;
}
};
struct DBImpl::DeletionState {
@ -166,9 +173,8 @@ Options SanitizeOptions(const std::string& dbname,
if (result.soft_rate_limit > result.hard_rate_limit) {
result.soft_rate_limit = result.hard_rate_limit;
}
if (result.compaction_filter &&
result.compaction_filter_factory->CreateCompactionFilter().get()) {
Log(result.info_log, "Both filter and factory specified. Using filter");
if (result.compaction_filter) {
Log(result.info_log, "Compaction filter specified, ignore factory");
}
if (result.prefix_extractor) {
// If a prefix extractor has been supplied and a PrefixHashRepFactory is
@ -1927,6 +1933,7 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot(
}
Status DBImpl::DoCompactionWork(CompactionState* compact) {
assert(compact);
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
Log(options_.info_log,
"Compacting %d@%d + %d@%d files, score %.2f slots available %d",
@ -1987,10 +1994,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
auto compaction_filter = options_.compaction_filter;
std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
if (!compaction_filter) {
compaction_filter_from_factory = std::move(
options_.compaction_filter_factory->CreateCompactionFilter());
auto context = compact->GetFilterContext();
compaction_filter_from_factory =
options_.compaction_filter_factory->CreateCompactionFilter(context);
compaction_filter = compaction_filter_from_factory.get();
}
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
// Prioritize immutable compaction work
// TODO: remove memtable flush from normal compaction work

@ -2430,7 +2430,7 @@ class ChangeFilter : public CompactionFilter {
class KeepFilterFactory : public CompactionFilterFactory {
public:
virtual std::unique_ptr<CompactionFilter>
CreateCompactionFilter() override {
CreateCompactionFilter(const CompactionFilter::Context& context) override {
return std::unique_ptr<CompactionFilter>(new KeepFilter());
}
@ -2442,7 +2442,7 @@ class KeepFilterFactory : public CompactionFilterFactory {
class DeleteFilterFactory : public CompactionFilterFactory {
public:
virtual std::unique_ptr<CompactionFilter>
CreateCompactionFilter() override {
CreateCompactionFilter(const CompactionFilter::Context& context) override {
return std::unique_ptr<CompactionFilter>(new DeleteFilter());
}
@ -2456,7 +2456,7 @@ class ChangeFilterFactory : public CompactionFilterFactory {
explicit ChangeFilterFactory(int argv) : argv_(argv) {}
virtual std::unique_ptr<CompactionFilter>
CreateCompactionFilter() override {
CreateCompactionFilter(const CompactionFilter::Context& context) override {
return std::unique_ptr<CompactionFilter>(new ChangeFilter(argv_));
}

@ -2467,6 +2467,10 @@ Compaction* VersionSet::PickCompactionUniversal(int level, double score) {
// remember this currently undergoing compaction
compactions_in_progress_[level].insert(c);
// Record whether this compaction includes all sst files.
// For now, it is only relevant in universal compaction mode.
c->is_full_compaction_ = (c->inputs_[0].size() == current_->files_[0].size());
return c;
}
@ -2921,6 +2925,7 @@ Compaction::Compaction(int level, int out_level, uint64_t target_file_size,
parent_index_(-1),
score_(0),
bottommost_level_(false),
is_full_compaction_(false),
level_ptrs_(std::vector<size_t>(number_levels)) {
edit_ = new VersionEdit(number_levels_);
for (int i = 0; i < number_levels_; i++) {

@ -589,6 +589,9 @@ class Compaction {
// Is this compaction creating a file in the bottom most level?
bool BottomMostLevel() { return bottommost_level_; }
// Does this compaction include all sst files?
bool IsFullCompaction() { return is_full_compaction_; }
private:
friend class Version;
friend class VersionSet;
@ -624,6 +627,8 @@ class Compaction {
// Is this compaction creating a file in the bottom most level?
bool bottommost_level_;
// Does this compaction include all sst files?
bool is_full_compaction_;
// level_ptrs_ holds indices into input_version_->levels_: our state
// is that we are positioned at one of the file ranges for each

@ -16,8 +16,14 @@ class Slice;
class CompactionFilter {
public:
virtual ~CompactionFilter() {}
// Context information of a compaction run
struct Context {
// Does this compaction run include all data files
bool is_full_compaction;
};
virtual ~CompactionFilter() {}
// The compaction process invokes this
// method for kv that is being compacted. A return value
@ -44,26 +50,28 @@ class CompactionFilter {
// Each compaction will create a new CompactionFilter allowing the
// application to know about different campactions
class CompactionFilterFactory {
public:
virtual ~CompactionFilterFactory() { };
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter() = 0;
public:
virtual ~CompactionFilterFactory() { };
// Returns a name that identifies this compaction filter factory.
virtual const char* Name() const = 0;
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) = 0;
// Returns a name that identifies this compaction filter factory.
virtual const char* Name() const = 0;
};
// Default implementaion of CompactionFilterFactory which does not
// return any filter
class DefaultCompactionFilterFactory : public CompactionFilterFactory {
public:
virtual std::unique_ptr<CompactionFilter>
CreateCompactionFilter() override {
return std::unique_ptr<CompactionFilter>(nullptr);
}
virtual const char* Name() const override {
return "DefaultCompactionFilterFactory";
}
public:
virtual std::unique_ptr<CompactionFilter>
CreateCompactionFilter(const CompactionFilter::Context& context) override {
return std::unique_ptr<CompactionFilter>(nullptr);
}
virtual const char* Name() const override {
return "DefaultCompactionFilterFactory";
}
};
} // namespace rocksdb

@ -236,12 +236,15 @@ class TtlCompactionFilterFactory : public CompactionFilterFactory {
: ttl_(ttl),
user_comp_filter_factory_(comp_filter_factory) { }
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter() {
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) {
return std::unique_ptr<TtlCompactionFilter>(
new TtlCompactionFilter(
ttl_,
nullptr,
std::move(user_comp_filter_factory_->CreateCompactionFilter())));
new TtlCompactionFilter(
ttl_,
nullptr,
std::move(user_comp_filter_factory_->CreateCompactionFilter(context))
)
);
}
virtual const char* Name() const override {

@ -284,7 +284,8 @@ class TtlTest {
}
virtual std::unique_ptr<CompactionFilter>
CreateCompactionFilter() override {
CreateCompactionFilter(
const CompactionFilter::Context& context) override {
return std::unique_ptr<CompactionFilter>(
new TestFilter(kSampleSize_, kNewValue_));
}

Loading…
Cancel
Save