Make BlockBasedTableIterator compaction-aware (#4048)

Summary:
Pass in `for_compaction` to `BlockBasedTableIterator` via `BlockBasedTableReader::NewIterator`.

In 7103559f49, `for_compaction` was set in `BlockBasedTable::Rep` via `BlockBasedTable::SetupForCompaction`. In hindsight it was not the right decision; it also caused TSAN to complain.
Closes https://github.com/facebook/rocksdb/pull/4048

Differential Revision: D8601056

Pulled By: sagar0

fbshipit-source-id: 30127e898c15c38c1080d57710b8c5a6d64a0ab3
main
Sagar Vemuri 7 years ago committed by Facebook Github Bot
parent a71e467381
commit 189f0c27aa
  1. 2
      db/table_cache.cc
  2. 11
      table/block_based_table_reader.cc
  3. 11
      table/block_based_table_reader.h
  4. 2
      table/cuckoo_table_reader.cc
  5. 3
      table/cuckoo_table_reader.h
  6. 2
      table/mock_table.cc
  7. 3
      table/mock_table.h
  8. 2
      table/plain_table_reader.cc
  9. 3
      table/plain_table_reader.h
  10. 3
      table/table_reader.h

@ -236,7 +236,7 @@ InternalIterator* TableCache::NewIterator(
result = NewEmptyInternalIterator(arena); result = NewEmptyInternalIterator(arena);
} else { } else {
result = table_reader->NewIterator(options, prefix_extractor, arena, result = table_reader->NewIterator(options, prefix_extractor, arena,
skip_filters); skip_filters, for_compaction);
} }
if (create_new_table_reader) { if (create_new_table_reader) {
assert(handle == nullptr); assert(handle == nullptr);

@ -1028,7 +1028,6 @@ void BlockBasedTable::SetupForCompaction() {
default: default:
assert(false); assert(false);
} }
rep_->for_compaction = true;
} }
std::shared_ptr<const TableProperties> BlockBasedTable::GetTableProperties() std::shared_ptr<const TableProperties> BlockBasedTable::GetTableProperties()
@ -2004,7 +2003,7 @@ void BlockBasedTableIterator::InitDataBlock() {
// Automatically prefetch additional data when a range scan (iterator) does // Automatically prefetch additional data when a range scan (iterator) does
// more than 2 sequential IOs. This is enabled only for user reads and when // more than 2 sequential IOs. This is enabled only for user reads and when
// ReadOptions.readahead_size is 0. // ReadOptions.readahead_size is 0.
if (!rep->for_compaction && read_options_.readahead_size == 0) { if (!for_compaction_ && read_options_.readahead_size == 0) {
num_file_reads_++; num_file_reads_++;
if (num_file_reads_ > 2) { if (num_file_reads_ > 2) {
if (!rep->file->use_direct_io() && if (!rep->file->use_direct_io() &&
@ -2101,7 +2100,7 @@ void BlockBasedTableIterator::FindKeyBackward() {
InternalIterator* BlockBasedTable::NewIterator( InternalIterator* BlockBasedTable::NewIterator(
const ReadOptions& read_options, const SliceTransform* prefix_extractor, const ReadOptions& read_options, const SliceTransform* prefix_extractor,
Arena* arena, bool skip_filters) { Arena* arena, bool skip_filters, bool for_compaction) {
bool prefix_extractor_changed = bool prefix_extractor_changed =
PrefixExtractorChanged(rep_->table_properties.get(), prefix_extractor); PrefixExtractorChanged(rep_->table_properties.get(), prefix_extractor);
const bool kIsNotIndex = false; const bool kIsNotIndex = false;
@ -2114,7 +2113,8 @@ InternalIterator* BlockBasedTable::NewIterator(
rep_->index_type == BlockBasedTableOptions::kHashSearch), rep_->index_type == BlockBasedTableOptions::kHashSearch),
!skip_filters && !read_options.total_order_seek && !skip_filters && !read_options.total_order_seek &&
prefix_extractor != nullptr && !prefix_extractor_changed, prefix_extractor != nullptr && !prefix_extractor_changed,
prefix_extractor, kIsNotIndex); prefix_extractor, kIsNotIndex, true /*key_includes_seq*/,
for_compaction);
} else { } else {
auto* mem = arena->AllocateAligned(sizeof(BlockBasedTableIterator)); auto* mem = arena->AllocateAligned(sizeof(BlockBasedTableIterator));
return new (mem) BlockBasedTableIterator( return new (mem) BlockBasedTableIterator(
@ -2122,7 +2122,8 @@ InternalIterator* BlockBasedTable::NewIterator(
NewIndexIterator(read_options, prefix_extractor_changed), NewIndexIterator(read_options, prefix_extractor_changed),
!skip_filters && !read_options.total_order_seek && !skip_filters && !read_options.total_order_seek &&
prefix_extractor != nullptr && !prefix_extractor_changed, prefix_extractor != nullptr && !prefix_extractor_changed,
prefix_extractor, kIsNotIndex); prefix_extractor, kIsNotIndex, true /*key_includes_seq*/,
for_compaction);
} }
} }

@ -104,7 +104,8 @@ class BlockBasedTable : public TableReader {
InternalIterator* NewIterator(const ReadOptions&, InternalIterator* NewIterator(const ReadOptions&,
const SliceTransform* prefix_extractor, const SliceTransform* prefix_extractor,
Arena* arena = nullptr, Arena* arena = nullptr,
bool skip_filters = false) override; bool skip_filters = false,
bool for_compaction = false) override;
InternalIterator* NewRangeTombstoneIterator( InternalIterator* NewRangeTombstoneIterator(
const ReadOptions& read_options) override; const ReadOptions& read_options) override;
@ -506,8 +507,6 @@ struct BlockBasedTable::Rep {
bool blocks_maybe_compressed = true; bool blocks_maybe_compressed = true;
bool closed = false; bool closed = false;
bool for_compaction = false;
}; };
class BlockBasedTableIterator : public InternalIterator { class BlockBasedTableIterator : public InternalIterator {
@ -517,7 +516,8 @@ class BlockBasedTableIterator : public InternalIterator {
const InternalKeyComparator& icomp, const InternalKeyComparator& icomp,
InternalIterator* index_iter, bool check_filter, InternalIterator* index_iter, bool check_filter,
const SliceTransform* prefix_extractor, bool is_index, const SliceTransform* prefix_extractor, bool is_index,
bool key_includes_seq = true) bool key_includes_seq = true,
bool for_compaction = false)
: table_(table), : table_(table),
read_options_(read_options), read_options_(read_options),
icomp_(icomp), icomp_(icomp),
@ -527,6 +527,7 @@ class BlockBasedTableIterator : public InternalIterator {
check_filter_(check_filter), check_filter_(check_filter),
is_index_(is_index), is_index_(is_index),
key_includes_seq_(key_includes_seq), key_includes_seq_(key_includes_seq),
for_compaction_(for_compaction),
prefix_extractor_(prefix_extractor) {} prefix_extractor_(prefix_extractor) {}
~BlockBasedTableIterator() { delete index_iter_; } ~BlockBasedTableIterator() { delete index_iter_; }
@ -624,6 +625,8 @@ class BlockBasedTableIterator : public InternalIterator {
bool is_index_; bool is_index_;
// If the keys in the blocks over which we iterate include 8 byte sequence // If the keys in the blocks over which we iterate include 8 byte sequence
bool key_includes_seq_; bool key_includes_seq_;
// If this iterator is created for compaction
bool for_compaction_;
// TODO use block offset instead // TODO use block offset instead
std::string prev_index_value_; std::string prev_index_value_;
const SliceTransform* prefix_extractor_; const SliceTransform* prefix_extractor_;

@ -380,7 +380,7 @@ extern InternalIterator* NewErrorInternalIterator(const Status& status,
InternalIterator* CuckooTableReader::NewIterator( InternalIterator* CuckooTableReader::NewIterator(
const ReadOptions& /*read_options*/, const ReadOptions& /*read_options*/,
const SliceTransform* /* prefix_extractor */, Arena* arena, const SliceTransform* /* prefix_extractor */, Arena* arena,
bool /*skip_filters*/) { bool /*skip_filters*/, bool /*for_compaction*/) {
if (!status().ok()) { if (!status().ok()) {
return NewErrorInternalIterator( return NewErrorInternalIterator(
Status::Corruption("CuckooTableReader status is not okay."), arena); Status::Corruption("CuckooTableReader status is not okay."), arena);

@ -49,7 +49,8 @@ class CuckooTableReader: public TableReader {
InternalIterator* NewIterator(const ReadOptions&, InternalIterator* NewIterator(const ReadOptions&,
const SliceTransform* prefix_extractor, const SliceTransform* prefix_extractor,
Arena* arena = nullptr, Arena* arena = nullptr,
bool skip_filters = false) override; bool skip_filters = false,
bool for_compaction = false) override;
void Prepare(const Slice& target) override; void Prepare(const Slice& target) override;
// Report an approximation of how much memory has been used. // Report an approximation of how much memory has been used.

@ -28,7 +28,7 @@ stl_wrappers::KVMap MakeMockFile(
InternalIterator* MockTableReader::NewIterator( InternalIterator* MockTableReader::NewIterator(
const ReadOptions&, const SliceTransform* /* prefix_extractor */, const ReadOptions&, const SliceTransform* /* prefix_extractor */,
Arena* /*arena*/, bool /*skip_filters*/) { Arena* /*arena*/, bool /*skip_filters*/, bool /*for_compaction*/) {
return new MockTableIterator(table_); return new MockTableIterator(table_);
} }

@ -41,7 +41,8 @@ class MockTableReader : public TableReader {
InternalIterator* NewIterator(const ReadOptions&, InternalIterator* NewIterator(const ReadOptions&,
const SliceTransform* prefix_extractor, const SliceTransform* prefix_extractor,
Arena* arena = nullptr, Arena* arena = nullptr,
bool skip_filters = false) override; bool skip_filters = false,
bool for_compaction = false) override;
Status Get(const ReadOptions& readOptions, const Slice& key, Status Get(const ReadOptions& readOptions, const Slice& key,
GetContext* get_context, const SliceTransform* prefix_extractor, GetContext* get_context, const SliceTransform* prefix_extractor,

@ -191,7 +191,7 @@ void PlainTableReader::SetupForCompaction() {
InternalIterator* PlainTableReader::NewIterator( InternalIterator* PlainTableReader::NewIterator(
const ReadOptions& options, const SliceTransform* /* prefix_extractor */, const ReadOptions& options, const SliceTransform* /* prefix_extractor */,
Arena* arena, bool /*skip_filters*/) { Arena* arena, bool /*skip_filters*/, bool /*for_compaction*/) {
bool use_prefix_seek = !IsTotalOrderMode() && !options.total_order_seek; bool use_prefix_seek = !IsTotalOrderMode() && !options.total_order_seek;
if (arena == nullptr) { if (arena == nullptr) {
return new PlainTableIterator(this, use_prefix_seek); return new PlainTableIterator(this, use_prefix_seek);

@ -82,7 +82,8 @@ class PlainTableReader: public TableReader {
InternalIterator* NewIterator(const ReadOptions&, InternalIterator* NewIterator(const ReadOptions&,
const SliceTransform* prefix_extractor, const SliceTransform* prefix_extractor,
Arena* arena = nullptr, Arena* arena = nullptr,
bool skip_filters = false) override; bool skip_filters = false,
bool for_compaction = false) override;
void Prepare(const Slice& target) override; void Prepare(const Slice& target) override;

@ -42,7 +42,8 @@ class TableReader {
virtual InternalIterator* NewIterator(const ReadOptions&, virtual InternalIterator* NewIterator(const ReadOptions&,
const SliceTransform* prefix_extractor, const SliceTransform* prefix_extractor,
Arena* arena = nullptr, Arena* arena = nullptr,
bool skip_filters = false) = 0; bool skip_filters = false,
bool for_compaction = false) = 0;
virtual InternalIterator* NewRangeTombstoneIterator( virtual InternalIterator* NewRangeTombstoneIterator(
const ReadOptions& /*read_options*/) { const ReadOptions& /*read_options*/) {

Loading…
Cancel
Save