Fix interaction between CompactionFilter::Decision::kRemoveAndSkipUnt…

Summary:
Fixes the following scenario:
 1. Set prefix extractor. Enable bloom filters, with `whole_key_filtering = false`. Use compaction filter that sometimes returns `kRemoveAndSkipUntil`.
 2. Do a compaction.
 3. Compaction creates an iterator with `total_order_seek = false`, calls `SeekToFirst()` on it, then repeatedly calls `Next()`.
 4. At some point compaction filter returns `kRemoveAndSkipUntil`.
 5. Compaction calls `Seek(skip_until)` on the iterator. The key that it seeks to happens to have prefix that doesn't match the bloom filter. Since `total_order_seek = false`, iterator becomes invalid, and compaction thinks that it has reached the end. The rest of the compaction input is silently discarded.

The fix is to make compaction iterator use `total_order_seek = true`.

The implementation for PlainTable is quite awkward. I've made `kRemoveAndSkipUntil` officially incompatible with PlainTable. If you try to use them together, compaction will fail, and DB will enter read-only mode (`bg_error_`). That's not a very graceful way to communicate a misconfiguration, but the alternatives don't seem worth the implementation time and complexity. To be able to check in advance that `kRemoveAndSkipUntil` is not going to be used with PlainTable, we'd need to extend the interface of either `CompactionFilter` or `InternalIterator`. It seems unlikely that anyone will ever want to use `kRemoveAndSkipUntil` with PlainTable: PlainTable probably has very few users, and `kRemoveAndSkipUntil` has only one user so far: us (logdevice).
Closes https://github.com/facebook/rocksdb/pull/2349

Differential Revision: D5110388

Pulled By: lightmark

fbshipit-source-id: ec29101a99d9dcd97db33923b87f72bce56cc17a
main
Mike Kolupaev 8 years ago committed by Facebook Github Bot
parent 95b0e89b5d
commit 138b87eae4
  1. 39
      db/db_compaction_filter_test.cc
  2. 6
      db/version_set.cc
  3. 14
      include/rocksdb/compaction_filter.h
  4. 1
      table/cuckoo_table_factory.h
  5. 4
      table/cuckoo_table_reader.cc
  6. 24
      table/plain_table_reader.cc

@ -780,7 +780,7 @@ TEST_F(DBTestCompactionFilter, SkipUntil) {
cfilter_skips = 0; cfilter_skips = 0;
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// Numberof skips in tables: 2, 3, 3, 3. // Number of skips in tables: 2, 3, 3, 3.
ASSERT_EQ(11, cfilter_skips); ASSERT_EQ(11, cfilter_skips);
for (int table = 0; table < 4; ++table) { for (int table = 0; table < 4; ++table) {
@ -801,6 +801,43 @@ TEST_F(DBTestCompactionFilter, SkipUntil) {
} }
} }
TEST_F(DBTestCompactionFilter, SkipUntilWithBloomFilter) {
BlockBasedTableOptions table_options;
table_options.whole_key_filtering = false;
table_options.filter_policy.reset(NewBloomFilterPolicy(100, false));
Options options = CurrentOptions();
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.prefix_extractor.reset(NewCappedPrefixTransform(9));
options.compaction_filter_factory = std::make_shared<SkipEvenFilterFactory>();
options.disable_auto_compactions = true;
options.create_if_missing = true;
DestroyAndReopen(options);
Put("0000000010", "v10");
Put("0000000020", "v20"); // skipped
Put("0000000050", "v50");
Flush();
cfilter_skips = 0;
EXPECT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
EXPECT_EQ(1, cfilter_skips);
Status s;
std::string val;
s = db_->Get(ReadOptions(), "0000000010", &val);
ASSERT_OK(s);
EXPECT_EQ("v10", val);
s = db_->Get(ReadOptions(), "0000000020", &val);
EXPECT_TRUE(s.IsNotFound());
s = db_->Get(ReadOptions(), "0000000050", &val);
ASSERT_OK(s);
EXPECT_EQ("v50", val);
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -3495,9 +3495,11 @@ InternalIterator* VersionSet::MakeInputIterator(
ReadOptions read_options; ReadOptions read_options;
read_options.verify_checksums = true; read_options.verify_checksums = true;
read_options.fill_cache = false; read_options.fill_cache = false;
if (c->ShouldFormSubcompactions()) { // Compaction iterators shouldn't be confined to a single prefix.
// Compactions use Seek() for
// (a) concurrent compactions,
// (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
read_options.total_order_seek = true; read_options.total_order_seek = true;
}
// Level-0 files have to be merged together. For other levels, // Level-0 files have to be merged together. For other levels,
// we will make a concatenating iterator per level. // we will make a concatenating iterator per level.

@ -135,18 +135,18 @@ class CompactionFilter {
// *skip_until <= key is treated the same as Decision::kKeep // *skip_until <= key is treated the same as Decision::kKeep
// (since the range [key, *skip_until) is empty). // (since the range [key, *skip_until) is empty).
// //
// The keys are skipped even if there are snapshots containing them, // Caveats:
// - The keys are skipped even if there are snapshots containing them,
// as if IgnoreSnapshots() was true; i.e. values removed // as if IgnoreSnapshots() was true; i.e. values removed
// by kRemoveAndSkipUntil can disappear from a snapshot - beware // by kRemoveAndSkipUntil can disappear from a snapshot - beware
// if you're using TransactionDB or DB::GetSnapshot(). // if you're using TransactionDB or DB::GetSnapshot().
// // - If value for a key was overwritten or merged into (multiple Put()s
// Another warning: if value for a key was overwritten or merged into // or Merge()s), and compaction filter skips this key with
// (multiple Put()s or Merge()s), and compaction filter skips this key // kRemoveAndSkipUntil, it's possible that it will remove only
// with kRemoveAndSkipUntil, it's possible that it will remove only
// the new value, exposing the old value that was supposed to be // the new value, exposing the old value that was supposed to be
// overwritten. // overwritten.
// // - Doesn't work with PlainTableFactory in prefix mode.
// If you use kRemoveAndSkipUntil, consider also reducing // - If you use kRemoveAndSkipUntil, consider also reducing
// compaction_readahead_size option. // compaction_readahead_size option.
// //
// Note: If you are using a TransactionDB, it is not recommended to filter // Note: If you are using a TransactionDB, it is not recommended to filter

@ -47,6 +47,7 @@ static inline uint64_t CuckooHash(
// - Key length and Value length are fixed. // - Key length and Value length are fixed.
// - Does not support Snapshot. // - Does not support Snapshot.
// - Does not support Merge operations. // - Does not support Merge operations.
// - Does not support prefix bloom filters.
class CuckooTableFactory : public TableFactory { class CuckooTableFactory : public TableFactory {
public: public:
explicit CuckooTableFactory(const CuckooTableOptions& table_options) explicit CuckooTableFactory(const CuckooTableOptions& table_options)

@ -372,10 +372,6 @@ InternalIterator* CuckooTableReader::NewIterator(
return NewErrorInternalIterator( return NewErrorInternalIterator(
Status::Corruption("CuckooTableReader status is not okay."), arena); Status::Corruption("CuckooTableReader status is not okay."), arena);
} }
if (read_options.total_order_seek) {
return NewErrorInternalIterator(
Status::InvalidArgument("total_order_seek is not supported."), arena);
}
CuckooTableIterator* iter; CuckooTableIterator* iter;
if (arena == nullptr) { if (arena == nullptr) {
iter = new CuckooTableIterator(this); iter = new CuckooTableIterator(this);

@ -193,15 +193,12 @@ InternalIterator* PlainTableReader::NewIterator(const ReadOptions& options,
Arena* arena, Arena* arena,
const InternalKeyComparator*, const InternalKeyComparator*,
bool skip_filters) { bool skip_filters) {
if (options.total_order_seek && !IsTotalOrderMode()) { bool use_prefix_seek = !IsTotalOrderMode() && !options.total_order_seek;
return NewErrorInternalIterator(
Status::InvalidArgument("total_order_seek not supported"), arena);
}
if (arena == nullptr) { if (arena == nullptr) {
return new PlainTableIterator(this, prefix_extractor_ != nullptr); return new PlainTableIterator(this, use_prefix_seek);
} else { } else {
auto mem = arena->AllocateAligned(sizeof(PlainTableIterator)); auto mem = arena->AllocateAligned(sizeof(PlainTableIterator));
return new (mem) PlainTableIterator(this, prefix_extractor_ != nullptr); return new (mem) PlainTableIterator(this, use_prefix_seek);
} }
} }
@ -641,9 +638,22 @@ void PlainTableIterator::SeekToLast() {
} }
void PlainTableIterator::Seek(const Slice& target) { void PlainTableIterator::Seek(const Slice& target) {
if (use_prefix_seek_ != !table_->IsTotalOrderMode()) {
// This check is done here instead of NewIterator() to permit creating an
// iterator with total_order_seek = true even if we won't be able to Seek()
// it. This is needed for compaction: it creates iterator with
// total_order_seek = true but usually never does Seek() on it,
// only SeekToFirst().
status_ =
Status::InvalidArgument(
"total_order_seek not implemented for PlainTable.");
offset_ = next_offset_ = table_->file_info_.data_end_offset;
return;
}
// If the user doesn't set prefix seek option and we are not able to do a // If the user doesn't set prefix seek option and we are not able to do a
// total Seek(). assert failure. // total Seek(). assert failure.
if (!use_prefix_seek_) { if (table_->IsTotalOrderMode()) {
if (table_->full_scan_mode_) { if (table_->full_scan_mode_) {
status_ = status_ =
Status::InvalidArgument("Seek() is not allowed in full scan mode."); Status::InvalidArgument("Seek() is not allowed in full scan mode.");

Loading…
Cancel
Save