[RocksDB] fix compaction filter trigger condition

Summary:
Currently, the compaction filter is run on internal keys older than the oldest snapshot, which is incorrect.
The compaction filter should instead be run on the most recent internal key when there are no external snapshots.

Test Plan: make check; db_stress

Reviewers: dhruba

Reviewed By: dhruba

Differential Revision: https://reviews.facebook.net/D10641
main
Haobo Xu 12 years ago
parent 3102628873
commit 73c0a33346
  1. 9
      db/db_impl.cc
  2. 28
      db/db_test.cc
  3. 5
      db/snapshot.h

@ -1703,10 +1703,13 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
value = merge.value(); value = merge.value();
} else if (options_.CompactionFilter != nullptr && } else if (options_.CompactionFilter != nullptr &&
ikey.type != kTypeDeletion && ikey.type != kTypeDeletion &&
ikey.sequence < earliest_snapshot) { visible_at_tip) {
// If the user has specified a compaction filter, then invoke // If the user has specified a compaction filter and there are no
// it. If the return value of the compaction filter is true, // outstanding external snapshots, then invoke the filter.
// If the return value of the compaction filter is true,
// drop this key from the output. // drop this key from the output.
assert(ikey.type == kTypeValue);
assert(!drop);
bool value_changed = false; bool value_changed = false;
compaction_filter_value.clear(); compaction_filter_value.clear();
drop = options_.CompactionFilter(options_.compaction_filter_args, drop = options_.CompactionFilter(options_.compaction_filter_args,

@ -1400,13 +1400,9 @@ TEST(DBTest, CompactionFilter) {
options.CompactionFilter = keep_filter; options.CompactionFilter = keep_filter;
Reopen(&options); Reopen(&options);
// Write 100K+1 keys, these are written to a few files // Write 100K keys, these are written to a few files in L0.
// in L0. We do this so that the current snapshot points
// to the 100001 key.The compaction filter is not invoked
// on keys that are visible via a snapshot because we
// anyways cannot delete it.
const std::string value(10, 'x'); const std::string value(10, 'x');
for (int i = 0; i < 100001; i++) { for (int i = 0; i < 100000; i++) {
char key[100]; char key[100];
snprintf(key, sizeof(key), "B%010d", i); snprintf(key, sizeof(key), "B%010d", i);
Put(key, value); Put(key, value);
@ -1433,6 +1429,7 @@ TEST(DBTest, CompactionFilter) {
// has sequence number zero. The 100001st record // has sequence number zero. The 100001st record
// is at the tip of this snapshot and cannot // is at the tip of this snapshot and cannot
// be zeroed out. // be zeroed out.
// TODO: figure out sequence number squashtoo
int count = 0; int count = 0;
int total = 0; int total = 0;
Iterator* iter = dbfull()->TEST_NewInternalIterator(); Iterator* iter = dbfull()->TEST_NewInternalIterator();
@ -1448,12 +1445,12 @@ TEST(DBTest, CompactionFilter) {
} }
iter->Next(); iter->Next();
} }
ASSERT_EQ(total, 100001); ASSERT_EQ(total, 100000);
ASSERT_EQ(count, 1); ASSERT_EQ(count, 1);
delete iter; delete iter;
// overwrite all the 100K+1 keys once again. // overwrite all the 100K keys once again.
for (int i = 0; i < 100001; i++) { for (int i = 0; i < 100000; i++) {
char key[100]; char key[100];
snprintf(key, sizeof(key), "B%010d", i); snprintf(key, sizeof(key), "B%010d", i);
Put(key, value); Put(key, value);
@ -1480,7 +1477,7 @@ TEST(DBTest, CompactionFilter) {
DestroyAndReopen(&options); DestroyAndReopen(&options);
// write all the keys once again. // write all the keys once again.
for (int i = 0; i < 100001; i++) { for (int i = 0; i < 100000; i++) {
char key[100]; char key[100];
snprintf(key, sizeof(key), "B%010d", i); snprintf(key, sizeof(key), "B%010d", i);
Put(key, value); Put(key, value);
@ -1503,10 +1500,7 @@ TEST(DBTest, CompactionFilter) {
ASSERT_EQ(NumTableFilesAtLevel(0), 0); ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_EQ(NumTableFilesAtLevel(1), 0); ASSERT_EQ(NumTableFilesAtLevel(1), 0);
// Scan the entire database to ensure that only the // Scan the entire database to ensure that nothing is left
// 100001th key is left in the db. The 100001th key
// is part of the default-most-current snapshot and
// cannot be deleted.
iter = db_->NewIterator(ReadOptions()); iter = db_->NewIterator(ReadOptions());
iter->SeekToFirst(); iter->SeekToFirst();
count = 0; count = 0;
@ -1514,12 +1508,14 @@ TEST(DBTest, CompactionFilter) {
count++; count++;
iter->Next(); iter->Next();
} }
ASSERT_EQ(count, 1); ASSERT_EQ(count, 0);
delete iter; delete iter;
// The sequence number of the remaining record // The sequence number of the remaining record
// is not zeroed out even though it is at the // is not zeroed out even though it is at the
// level Lmax because this record is at the tip // level Lmax because this record is at the tip
// TODO: remove the following or design a different
// test
count = 0; count = 0;
iter = dbfull()->TEST_NewInternalIterator(); iter = dbfull()->TEST_NewInternalIterator();
iter->SeekToFirst(); iter->SeekToFirst();
@ -1531,7 +1527,7 @@ TEST(DBTest, CompactionFilter) {
count++; count++;
iter->Next(); iter->Next();
} }
ASSERT_EQ(count, 1); ASSERT_EQ(count, 0);
delete iter; delete iter;
} }

@ -59,12 +59,9 @@ class SnapshotList {
// retrieve all snapshot numbers. They are sorted in ascending order. // retrieve all snapshot numbers. They are sorted in ascending order.
void getAll(std::vector<SequenceNumber>& ret) { void getAll(std::vector<SequenceNumber>& ret) {
SnapshotImpl* s = &list_;
if (empty()) return; if (empty()) return;
SequenceNumber prev __attribute__((unused)) = 0; SnapshotImpl* s = &list_;
while (s->next_ != &list_) { while (s->next_ != &list_) {
assert(prev <= s->next_->number_);
assert(prev = s->next_->number_); // assignment
ret.push_back(s->next_->number_); ret.push_back(s->next_->number_);
s = s ->next_; s = s ->next_;
} }

Loading…
Cancel
Save