From 138b87eae4ce6a4ba0852f3bf3bfd8632b9bdb64 Mon Sep 17 00:00:00 2001 From: Mike Kolupaev Date: Fri, 2 Jun 2017 14:56:31 -0700 Subject: [PATCH] =?UTF-8?q?Fix=20interaction=20between=20CompactionFilter:?= =?UTF-8?q?:Decision::kRemoveAndSkipUnt=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: Fixes the following scenario: 1. Set prefix extractor. Enable bloom filters, with `whole_key_filtering = false`. Use compaction filter that sometimes returns `kRemoveAndSkipUntil`. 2. Do a compaction. 3. Compaction creates an iterator with `total_order_seek = false`, calls `SeekToFirst()` on it, then repeatedly calls `Next()`. 4. At some point compaction filter returns `kRemoveAndSkipUntil`. 5. Compaction calls `Seek(skip_until)` on the iterator. The key that it seeks to happens to have prefix that doesn't match the bloom filter. Since `total_order_seek = false`, iterator becomes invalid, and compaction thinks that it has reached the end. The rest of the compaction input is silently discarded. The fix is to make compaction iterator use `total_order_seek = true`. The implementation for PlainTable is quite awkward. I've made `kRemoveAndSkipUntil` officially incompatible with PlainTable. If you try to use them together, compaction will fail, and DB will enter read-only mode (`bg_error_`). That's not a very graceful way to communicate a misconfiguration, but the alternatives don't seem worth the implementation time and complexity. To be able to check in advance that `kRemoveAndSkipUntil` is not going to be used with PlainTable, we'd need to extend the interface of either `CompactionFilter` or `InternalIterator`. It seems unlikely that anyone will ever want to use `kRemoveAndSkipUntil` with PlainTable: PlainTable probably has very few users, and `kRemoveAndSkipUntil` has only one user so far: us (logdevice). Closes https://github.com/facebook/rocksdb/pull/2349 Differential Revision: D5110388 Pulled By: lightmark fbshipit-source-id: ec29101a99d9dcd97db33923b87f72bce56cc17a --- db/db_compaction_filter_test.cc | 39 ++++++++++++++++++++++++++++- db/version_set.cc | 8 +++--- include/rocksdb/compaction_filter.h | 26 +++++++++---------- table/cuckoo_table_factory.h | 1 + table/cuckoo_table_reader.cc | 4 --- table/plain_table_reader.cc | 24 ++++++++++++------ 6 files changed, 74 insertions(+), 28 deletions(-) diff --git a/db/db_compaction_filter_test.cc b/db/db_compaction_filter_test.cc index b1197c226..a2534d259 100644 --- a/db/db_compaction_filter_test.cc +++ b/db/db_compaction_filter_test.cc @@ -780,7 +780,7 @@ TEST_F(DBTestCompactionFilter, SkipUntil) { cfilter_skips = 0; ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); - // Numberof skips in tables: 2, 3, 3, 3. + // Number of skips in tables: 2, 3, 3, 3. ASSERT_EQ(11, cfilter_skips); for (int table = 0; table < 4; ++table) { @@ -801,6 +801,43 @@ TEST_F(DBTestCompactionFilter, SkipUntil) { } } +TEST_F(DBTestCompactionFilter, SkipUntilWithBloomFilter) { + BlockBasedTableOptions table_options; + table_options.whole_key_filtering = false; + table_options.filter_policy.reset(NewBloomFilterPolicy(100, false)); + + Options options = CurrentOptions(); + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + options.prefix_extractor.reset(NewCappedPrefixTransform(9)); + options.compaction_filter_factory = std::make_shared(); + options.disable_auto_compactions = true; + options.create_if_missing = true; + DestroyAndReopen(options); + + Put("0000000010", "v10"); + Put("0000000020", "v20"); // skipped + Put("0000000050", "v50"); + Flush(); + + cfilter_skips = 0; + EXPECT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + EXPECT_EQ(1, cfilter_skips); + + Status s; + std::string val; + + s = db_->Get(ReadOptions(), "0000000010", &val); + ASSERT_OK(s); + EXPECT_EQ("v10", val); + + s = db_->Get(ReadOptions(), "0000000020", &val); + EXPECT_TRUE(s.IsNotFound()); + + s = db_->Get(ReadOptions(), "0000000050", &val); + ASSERT_OK(s); + EXPECT_EQ("v50", val); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/version_set.cc b/db/version_set.cc index 49981741b..ae284c085 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3495,9 +3495,11 @@ InternalIterator* VersionSet::MakeInputIterator( ReadOptions read_options; read_options.verify_checksums = true; read_options.fill_cache = false; - if (c->ShouldFormSubcompactions()) { - read_options.total_order_seek = true; - } + // Compaction iterators shouldn't be confined to a single prefix. + // Compactions use Seek() for + // (a) concurrent compactions, + // (b) CompactionFilter::Decision::kRemoveAndSkipUntil. + read_options.total_order_seek = true; // Level-0 files have to be merged together. For other levels, // we will make a concatenating iterator per level. diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index 209d59d89..030165528 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -135,19 +135,19 @@ class CompactionFilter { // *skip_until <= key is treated the same as Decision::kKeep // (since the range [key, *skip_until) is empty). // - // The keys are skipped even if there are snapshots containing them, - // as if IgnoreSnapshots() was true; i.e. values removed - // by kRemoveAndSkipUntil can disappear from a snapshot - beware - // if you're using TransactionDB or DB::GetSnapshot(). - // - // Another warning: if value for a key was overwritten or merged into - // (multiple Put()s or Merge()s), and compaction filter skips this key - // with kRemoveAndSkipUntil, it's possible that it will remove only - // the new value, exposing the old value that was supposed to be - // overwritten. - // - // If you use kRemoveAndSkipUntil, consider also reducing - // compaction_readahead_size option. + // Caveats: + // - The keys are skipped even if there are snapshots containing them, + // as if IgnoreSnapshots() was true; i.e. values removed + // by kRemoveAndSkipUntil can disappear from a snapshot - beware + // if you're using TransactionDB or DB::GetSnapshot(). + // - If value for a key was overwritten or merged into (multiple Put()s + // or Merge()s), and compaction filter skips this key with + // kRemoveAndSkipUntil, it's possible that it will remove only + // the new value, exposing the old value that was supposed to be + // overwritten. + // - Doesn't work with PlainTableFactory in prefix mode. + // - If you use kRemoveAndSkipUntil, consider also reducing + // compaction_readahead_size option. // // Note: If you are using a TransactionDB, it is not recommended to filter // out or modify merge operands (ValueType::kMergeOperand). diff --git a/table/cuckoo_table_factory.h b/table/cuckoo_table_factory.h index ff67ae247..acf0da9d3 100644 --- a/table/cuckoo_table_factory.h +++ b/table/cuckoo_table_factory.h @@ -47,6 +47,7 @@ static inline uint64_t CuckooHash( // - Key length and Value length are fixed. // - Does not support Snapshot. // - Does not support Merge operations. +// - Does not support prefix bloom filters. class CuckooTableFactory : public TableFactory { public: explicit CuckooTableFactory(const CuckooTableOptions& table_options) diff --git a/table/cuckoo_table_reader.cc b/table/cuckoo_table_reader.cc index 3b6819dce..7a886229b 100644 --- a/table/cuckoo_table_reader.cc +++ b/table/cuckoo_table_reader.cc @@ -372,10 +372,6 @@ InternalIterator* CuckooTableReader::NewIterator( return NewErrorInternalIterator( Status::Corruption("CuckooTableReader status is not okay."), arena); } - if (read_options.total_order_seek) { - return NewErrorInternalIterator( - Status::InvalidArgument("total_order_seek is not supported."), arena); - } CuckooTableIterator* iter; if (arena == nullptr) { iter = new CuckooTableIterator(this); diff --git a/table/plain_table_reader.cc b/table/plain_table_reader.cc index 86dc2b5fe..0f9449e86 100644 --- a/table/plain_table_reader.cc +++ b/table/plain_table_reader.cc @@ -193,15 +193,12 @@ InternalIterator* PlainTableReader::NewIterator(const ReadOptions& options, Arena* arena, const InternalKeyComparator*, bool skip_filters) { - if (options.total_order_seek && !IsTotalOrderMode()) { - return NewErrorInternalIterator( - Status::InvalidArgument("total_order_seek not supported"), arena); - } + bool use_prefix_seek = !IsTotalOrderMode() && !options.total_order_seek; if (arena == nullptr) { - return new PlainTableIterator(this, prefix_extractor_ != nullptr); + return new PlainTableIterator(this, use_prefix_seek); } else { auto mem = arena->AllocateAligned(sizeof(PlainTableIterator)); - return new (mem) PlainTableIterator(this, prefix_extractor_ != nullptr); + return new (mem) PlainTableIterator(this, use_prefix_seek); } } @@ -641,9 +638,22 @@ void PlainTableIterator::SeekToLast() { } void PlainTableIterator::Seek(const Slice& target) { + if (use_prefix_seek_ != !table_->IsTotalOrderMode()) { + // This check is done here instead of NewIterator() to permit creating an + // iterator with total_order_seek = true even if we won't be able to Seek() + // it. This is needed for compaction: it creates iterator with + // total_order_seek = true but usually never does Seek() on it, + // only SeekToFirst(). + status_ = + Status::InvalidArgument( + "total_order_seek not implemented for PlainTable."); + offset_ = next_offset_ = table_->file_info_.data_end_offset; + return; + } + // If the user doesn't set prefix seek option and we are not able to do a // total Seek(). assert failure. - if (!use_prefix_seek_) { + if (table_->IsTotalOrderMode()) { if (table_->full_scan_mode_) { status_ = Status::InvalidArgument("Seek() is not allowed in full scan mode.");