Pin mmap files in ReadOnlyDB (#4053)

Summary:
https://github.com/facebook/rocksdb/pull/3881 fixed a bug where PinnableSlice pinned mmap files that could be deleted by background compaction. This is, however, a non-issue for ReadOnlyDB when there is no compaction running and max_open_files is -1. This patch re-enables the pinning feature for that case.
Closes https://github.com/facebook/rocksdb/pull/4053

Differential Revision: D8662546

Pulled By: maysamyabandeh

fbshipit-source-id: 402962602eb0f644e17822748332999c3af029fd
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent e8f9d7f0d4
commit 235ab9dd32
  1. 1
      HISTORY.md
  2. 3
      db/convenience.cc
  3. 24
      db/db_test2.cc
  4. 8
      db/table_cache.cc
  5. 9
      db/table_cache.h
  6. 3
      db/version_set.cc
  7. 2
      table/block_based_table_factory.cc
  8. 14
      table/block_based_table_reader.cc
  9. 10
      table/block_based_table_reader.h
  10. 4
      table/block_fetcher.cc
  11. 8
      table/block_fetcher.h
  12. 7
      table/partitioned_filter_block_test.cc
  13. 6
      table/table_builder.h
  14. 6
      table/table_test.cc
  15. 3
      tools/sst_dump_tool.cc
  16. 3
      utilities/column_aware_encoding_util.cc

@ -10,6 +10,7 @@
* Add a new table property, "rocksdb.num.range-deletions", which counts the number of range deletion tombstones in the table. * Add a new table property, "rocksdb.num.range-deletions", which counts the number of range deletion tombstones in the table.
* Improve the performance of iterators doing long range scans by using readahead, when using direct IO. * Improve the performance of iterators doing long range scans by using readahead, when using direct IO.
* pin_top_level_index_and_filter (default true) in BlockBasedTableOptions can be used in combination with cache_index_and_filter_blocks to prefetch and pin the top-level index of partitioned index and filter blocks in cache. It has no impact when cache_index_and_filter_blocks is false. * pin_top_level_index_and_filter (default true) in BlockBasedTableOptions can be used in combination with cache_index_and_filter_blocks to prefetch and pin the top-level index of partitioned index and filter blocks in cache. It has no impact when cache_index_and_filter_blocks is false.
* Avoid memcpy when reading mmap files with OpenReadOnly and max_open_files==-1
### Bug Fixes ### Bug Fixes
* fix deadlock with enable_pipelined_write=true and max_successive_merges > 0 * fix deadlock with enable_pipelined_write=true and max_successive_merges > 0

@ -49,10 +49,11 @@ Status VerifySstFileChecksum(const Options& options,
unique_ptr<TableReader> table_reader; unique_ptr<TableReader> table_reader;
std::unique_ptr<RandomAccessFileReader> file_reader( std::unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(std::move(file), file_path)); new RandomAccessFileReader(std::move(file), file_path));
const bool kImmortal = true;
s = ioptions.table_factory->NewTableReader( s = ioptions.table_factory->NewTableReader(
TableReaderOptions(ioptions, options.prefix_extractor.get(), env_options, TableReaderOptions(ioptions, options.prefix_extractor.get(), env_options,
internal_comparator, false /* skip_filters */, internal_comparator, false /* skip_filters */,
-1 /* level */), !kImmortal, -1 /* level */),
std::move(file_reader), file_size, &table_reader, std::move(file_reader), file_size, &table_reader,
false /* prefetch_index_and_filter_in_cache */); false /* prefetch_index_and_filter_in_cache */);
if (!s.ok()) { if (!s.ok()) {

@ -2505,6 +2505,8 @@ TEST_F(DBTest2, LiveFilesOmitObsoleteFiles) {
TEST_F(DBTest2, PinnableSliceAndMmapReads) { TEST_F(DBTest2, PinnableSliceAndMmapReads) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.allow_mmap_reads = true; options.allow_mmap_reads = true;
options.max_open_files = 100;
options.compression = kNoCompression;
Reopen(options); Reopen(options);
ASSERT_OK(Put("foo", "bar")); ASSERT_OK(Put("foo", "bar"));
@ -2512,6 +2514,8 @@ TEST_F(DBTest2, PinnableSliceAndMmapReads) {
PinnableSlice pinned_value; PinnableSlice pinned_value;
ASSERT_EQ(Get("foo", &pinned_value), Status::OK()); ASSERT_EQ(Get("foo", &pinned_value), Status::OK());
// It is not safe to pin mmap files as they might disappear by compaction
ASSERT_FALSE(pinned_value.IsPinned());
ASSERT_EQ(pinned_value.ToString(), "bar"); ASSERT_EQ(pinned_value.ToString(), "bar");
dbfull()->TEST_CompactRange(0 /* level */, nullptr /* begin */, dbfull()->TEST_CompactRange(0 /* level */, nullptr /* begin */,
@ -2519,7 +2523,25 @@ TEST_F(DBTest2, PinnableSliceAndMmapReads) {
true /* disallow_trivial_move */); true /* disallow_trivial_move */);
// Ensure pinned_value doesn't rely on memory munmap'd by the above // Ensure pinned_value doesn't rely on memory munmap'd by the above
// compaction. // compaction. It crashes if it does.
ASSERT_EQ(pinned_value.ToString(), "bar");
pinned_value.Reset();
// Unsafe to pin mmap files when they could be kicked out of table cache
Close();
ReadOnlyReopen(options);
ASSERT_EQ(Get("foo", &pinned_value), Status::OK());
ASSERT_FALSE(pinned_value.IsPinned());
ASSERT_EQ(pinned_value.ToString(), "bar");
pinned_value.Reset();
// In read-only mode with infinite capacity on table cache it should pin the
// value and avoid the memcpy
Close();
options.max_open_files = -1;
ReadOnlyReopen(options);
ASSERT_EQ(Get("foo", &pinned_value), Status::OK());
ASSERT_TRUE(pinned_value.IsPinned());
ASSERT_EQ(pinned_value.ToString(), "bar"); ASSERT_EQ(pinned_value.ToString(), "bar");
} }

@ -65,7 +65,10 @@ void AppendVarint64(IterKey* key, uint64_t v) {
TableCache::TableCache(const ImmutableCFOptions& ioptions, TableCache::TableCache(const ImmutableCFOptions& ioptions,
const EnvOptions& env_options, Cache* const cache) const EnvOptions& env_options, Cache* const cache)
: ioptions_(ioptions), env_options_(env_options), cache_(cache) { : ioptions_(ioptions),
env_options_(env_options),
cache_(cache),
immortal_tables_(false) {
if (ioptions_.row_cache) { if (ioptions_.row_cache) {
// If the same cache is shared by multiple instances, we need to // If the same cache is shared by multiple instances, we need to
// disambiguate its entries. // disambiguate its entries.
@ -116,7 +119,8 @@ Status TableCache::GetTableReader(
file_read_hist, ioptions_.rate_limiter, for_compaction)); file_read_hist, ioptions_.rate_limiter, for_compaction));
s = ioptions_.table_factory->NewTableReader( s = ioptions_.table_factory->NewTableReader(
TableReaderOptions(ioptions_, prefix_extractor, env_options, TableReaderOptions(ioptions_, prefix_extractor, env_options,
internal_comparator, skip_filters, level), internal_comparator, skip_filters, immortal_tables_,
level),
std::move(file_reader), fd.GetFileSize(), table_reader, std::move(file_reader), fd.GetFileSize(), table_reader,
prefetch_index_and_filter_in_cache); prefetch_index_and_filter_in_cache);
TEST_SYNC_POINT("TableCache::GetTableReader:0"); TEST_SYNC_POINT("TableCache::GetTableReader:0");

@ -132,6 +132,14 @@ class TableCache {
// For example when max_open_files is -1 we set the backing Cache to this. // For example when max_open_files is -1 we set the backing Cache to this.
static const int kInfiniteCapacity = 0x400000; static const int kInfiniteCapacity = 0x400000;
// Requests that the tables opened with this TableCache be treated as
// immortal, i.e., their lifetime is as long as that of the DB.
// The request only takes effect when the backing cache's capacity is at
// least kInfiniteCapacity; otherwise immortal_tables_ is left unchanged.
// This method never resets the flag back to false.
void SetTablesAreImmortal() {
// NOTE(review): presumably a finite-capacity cache may drop table readers
// while the DB is still open, so immortality cannot be promised in that
// case -- confirm against the cache eviction behavior.
if (cache_->GetCapacity() >= kInfiniteCapacity) {
immortal_tables_ = true;
}
}
private: private:
// Build a table reader // Build a table reader
Status GetTableReader(const EnvOptions& env_options, Status GetTableReader(const EnvOptions& env_options,
@ -149,6 +157,7 @@ class TableCache {
const EnvOptions& env_options_; const EnvOptions& env_options_;
Cache* const cache_; Cache* const cache_;
std::string row_cache_id_; std::string row_cache_id_;
bool immortal_tables_;
}; };
} // namespace rocksdb } // namespace rocksdb

@ -3328,6 +3328,9 @@ Status VersionSet::Recover(
if (cfd->IsDropped()) { if (cfd->IsDropped()) {
continue; continue;
} }
if (read_only) {
cfd->table_cache()->SetTablesAreImmortal();
}
assert(cfd->initialized()); assert(cfd->initialized());
auto builders_iter = builders.find(cfd->GetID()); auto builders_iter = builders.find(cfd->GetID());
assert(builders_iter != builders.end()); assert(builders_iter != builders.end());

@ -71,7 +71,7 @@ Status BlockBasedTableFactory::NewTableReader(
table_options_, table_reader_options.internal_comparator, std::move(file), table_options_, table_reader_options.internal_comparator, std::move(file),
file_size, table_reader, table_reader_options.prefix_extractor, file_size, table_reader, table_reader_options.prefix_extractor,
prefetch_index_and_filter_in_cache, table_reader_options.skip_filters, prefetch_index_and_filter_in_cache, table_reader_options.skip_filters,
table_reader_options.level); table_reader_options.level, table_reader_options.immortal);
} }
TableBuilder* BlockBasedTableFactory::NewTableBuilder( TableBuilder* BlockBasedTableFactory::NewTableBuilder(

@ -79,11 +79,11 @@ Status ReadBlockFromFile(
std::unique_ptr<Block>* result, const ImmutableCFOptions& ioptions, std::unique_ptr<Block>* result, const ImmutableCFOptions& ioptions,
bool do_uncompress, const Slice& compression_dict, bool do_uncompress, const Slice& compression_dict,
const PersistentCacheOptions& cache_options, SequenceNumber global_seqno, const PersistentCacheOptions& cache_options, SequenceNumber global_seqno,
size_t read_amp_bytes_per_bit) { size_t read_amp_bytes_per_bit, const bool immortal_file = false) {
BlockContents contents; BlockContents contents;
BlockFetcher block_fetcher(file, prefetch_buffer, footer, options, handle, BlockFetcher block_fetcher(file, prefetch_buffer, footer, options, handle,
&contents, ioptions, do_uncompress, &contents, ioptions, do_uncompress,
compression_dict, cache_options); compression_dict, cache_options, immortal_file);
Status s = block_fetcher.ReadBlockContents(); Status s = block_fetcher.ReadBlockContents();
if (s.ok()) { if (s.ok()) {
result->reset(new Block(std::move(contents), global_seqno, result->reset(new Block(std::move(contents), global_seqno,
@ -694,7 +694,8 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
unique_ptr<TableReader>* table_reader, unique_ptr<TableReader>* table_reader,
const SliceTransform* prefix_extractor, const SliceTransform* prefix_extractor,
const bool prefetch_index_and_filter_in_cache, const bool prefetch_index_and_filter_in_cache,
const bool skip_filters, const int level) { const bool skip_filters, const int level,
const bool immortal_table) {
table_reader->reset(); table_reader->reset();
Footer footer; Footer footer;
@ -736,7 +737,8 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
// raw pointer will be used to create HashIndexReader, whose reset may // raw pointer will be used to create HashIndexReader, whose reset may
// access a dangling pointer. // access a dangling pointer.
Rep* rep = new BlockBasedTable::Rep(ioptions, env_options, table_options, Rep* rep = new BlockBasedTable::Rep(ioptions, env_options, table_options,
internal_comparator, skip_filters); internal_comparator, skip_filters,
immortal_table);
rep->file = std::move(file); rep->file = std::move(file);
rep->footer = footer; rep->footer = footer;
rep->index_type = table_options.index_type; rep->index_type = table_options.index_type;
@ -1622,7 +1624,7 @@ BlockIter* BlockBasedTable::NewDataBlockIterator(
&block_value, rep->ioptions, rep->blocks_maybe_compressed, &block_value, rep->ioptions, rep->blocks_maybe_compressed,
compression_dict, rep->persistent_cache_options, compression_dict, rep->persistent_cache_options,
is_index ? kDisableGlobalSequenceNumber : rep->global_seqno, is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
rep->table_options.read_amp_bytes_per_bit); rep->table_options.read_amp_bytes_per_bit, rep->immortal_table);
} }
if (s.ok()) { if (s.ok()) {
block.value = block_value.release(); block.value = block_value.release();
@ -1723,7 +1725,7 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
block_cache_compressed == nullptr && rep->blocks_maybe_compressed, block_cache_compressed == nullptr && rep->blocks_maybe_compressed,
compression_dict, rep->persistent_cache_options, compression_dict, rep->persistent_cache_options,
is_index ? kDisableGlobalSequenceNumber : rep->global_seqno, is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
rep->table_options.read_amp_bytes_per_bit); rep->table_options.read_amp_bytes_per_bit, rep->immortal_table);
} }
if (s.ok()) { if (s.ok()) {

@ -92,7 +92,8 @@ class BlockBasedTable : public TableReader {
uint64_t file_size, unique_ptr<TableReader>* table_reader, uint64_t file_size, unique_ptr<TableReader>* table_reader,
const SliceTransform* prefix_extractor = nullptr, const SliceTransform* prefix_extractor = nullptr,
bool prefetch_index_and_filter_in_cache = true, bool prefetch_index_and_filter_in_cache = true,
bool skip_filters = false, int level = -1); bool skip_filters = false, int level = -1,
const bool immortal_table = false);
bool PrefixMayMatch(const Slice& internal_key, bool PrefixMayMatch(const Slice& internal_key,
const ReadOptions& read_options, const ReadOptions& read_options,
@ -420,7 +421,8 @@ struct BlockBasedTable::CachableEntry {
struct BlockBasedTable::Rep { struct BlockBasedTable::Rep {
Rep(const ImmutableCFOptions& _ioptions, const EnvOptions& _env_options, Rep(const ImmutableCFOptions& _ioptions, const EnvOptions& _env_options,
const BlockBasedTableOptions& _table_opt, const BlockBasedTableOptions& _table_opt,
const InternalKeyComparator& _internal_comparator, bool skip_filters) const InternalKeyComparator& _internal_comparator, bool skip_filters,
const bool _immortal_table)
: ioptions(_ioptions), : ioptions(_ioptions),
env_options(_env_options), env_options(_env_options),
table_options(_table_opt), table_options(_table_opt),
@ -432,7 +434,8 @@ struct BlockBasedTable::Rep {
whole_key_filtering(_table_opt.whole_key_filtering), whole_key_filtering(_table_opt.whole_key_filtering),
prefix_filtering(true), prefix_filtering(true),
range_del_handle(BlockHandle::NullBlockHandle()), range_del_handle(BlockHandle::NullBlockHandle()),
global_seqno(kDisableGlobalSequenceNumber) {} global_seqno(kDisableGlobalSequenceNumber),
immortal_table(_immortal_table) {}
const ImmutableCFOptions& ioptions; const ImmutableCFOptions& ioptions;
const EnvOptions& env_options; const EnvOptions& env_options;
@ -510,6 +513,7 @@ struct BlockBasedTable::Rep {
bool blocks_maybe_compressed = true; bool blocks_maybe_compressed = true;
bool closed = false; bool closed = false;
const bool immortal_table;
}; };
class BlockBasedTableIterator : public InternalIterator { class BlockBasedTableIterator : public InternalIterator {

@ -163,8 +163,8 @@ inline
void BlockFetcher::GetBlockContents() { void BlockFetcher::GetBlockContents() {
if (slice_.data() != used_buf_) { if (slice_.data() != used_buf_) {
// the slice content is not the buffer provided // the slice content is not the buffer provided
*contents_ = BlockContents(Slice(slice_.data(), block_size_), false, *contents_ = BlockContents(Slice(slice_.data(), block_size_),
compression_type); immortal_source_, compression_type);
} else { } else {
// page is uncompressed, the buffer either stack or heap provided // page is uncompressed, the buffer either stack or heap provided
if (got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0]) { if (got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0]) {

@ -23,10 +23,10 @@ class BlockFetcher {
BlockFetcher(RandomAccessFileReader* file, BlockFetcher(RandomAccessFileReader* file,
FilePrefetchBuffer* prefetch_buffer, const Footer& footer, FilePrefetchBuffer* prefetch_buffer, const Footer& footer,
const ReadOptions& read_options, const BlockHandle& handle, const ReadOptions& read_options, const BlockHandle& handle,
BlockContents* contents, BlockContents* contents, const ImmutableCFOptions& ioptions,
const ImmutableCFOptions& ioptions,
bool do_uncompress, const Slice& compression_dict, bool do_uncompress, const Slice& compression_dict,
const PersistentCacheOptions& cache_options) const PersistentCacheOptions& cache_options,
const bool immortal_source = false)
: file_(file), : file_(file),
prefetch_buffer_(prefetch_buffer), prefetch_buffer_(prefetch_buffer),
footer_(footer), footer_(footer),
@ -35,6 +35,7 @@ class BlockFetcher {
contents_(contents), contents_(contents),
ioptions_(ioptions), ioptions_(ioptions),
do_uncompress_(do_uncompress), do_uncompress_(do_uncompress),
immortal_source_(immortal_source),
compression_dict_(compression_dict), compression_dict_(compression_dict),
cache_options_(cache_options) {} cache_options_(cache_options) {}
Status ReadBlockContents(); Status ReadBlockContents();
@ -50,6 +51,7 @@ class BlockFetcher {
BlockContents* contents_; BlockContents* contents_;
const ImmutableCFOptions& ioptions_; const ImmutableCFOptions& ioptions_;
bool do_uncompress_; bool do_uncompress_;
const bool immortal_source_;
const Slice& compression_dict_; const Slice& compression_dict_;
const PersistentCacheOptions& cache_options_; const PersistentCacheOptions& cache_options_;
Status status_; Status status_;

@ -136,8 +136,11 @@ class PartitionedFilterBlockTest : public testing::Test {
const ImmutableCFOptions ioptions(options); const ImmutableCFOptions ioptions(options);
const MutableCFOptions moptions(options); const MutableCFOptions moptions(options);
const EnvOptions env_options; const EnvOptions env_options;
table.reset(new MockedBlockBasedTable(new BlockBasedTable::Rep( const bool kSkipFilters = true;
ioptions, env_options, table_options_, icomp, false))); const bool kImmortal = true;
table.reset(new MockedBlockBasedTable(
new BlockBasedTable::Rep(ioptions, env_options, table_options_, icomp,
!kSkipFilters, !kImmortal)));
auto reader = new PartitionedFilterBlockReader( auto reader = new PartitionedFilterBlockReader(
prefix_extractor, true, BlockContents(slice, false, kNoCompression), prefix_extractor, true, BlockContents(slice, false, kNoCompression),
nullptr, nullptr, icomp, table.get(), pib->seperator_is_key_plus_seq()); nullptr, nullptr, icomp, table.get(), pib->seperator_is_key_plus_seq());

@ -30,12 +30,14 @@ struct TableReaderOptions {
const SliceTransform* _prefix_extractor, const SliceTransform* _prefix_extractor,
const EnvOptions& _env_options, const EnvOptions& _env_options,
const InternalKeyComparator& _internal_comparator, const InternalKeyComparator& _internal_comparator,
bool _skip_filters = false, int _level = -1) bool _skip_filters = false, bool _immortal = false,
int _level = -1)
: ioptions(_ioptions), : ioptions(_ioptions),
prefix_extractor(_prefix_extractor), prefix_extractor(_prefix_extractor),
env_options(_env_options), env_options(_env_options),
internal_comparator(_internal_comparator), internal_comparator(_internal_comparator),
skip_filters(_skip_filters), skip_filters(_skip_filters),
immortal(_immortal),
level(_level) {} level(_level) {}
const ImmutableCFOptions& ioptions; const ImmutableCFOptions& ioptions;
@ -44,6 +46,8 @@ struct TableReaderOptions {
const InternalKeyComparator& internal_comparator; const InternalKeyComparator& internal_comparator;
// This is only used for BlockBasedTable (reader) // This is only used for BlockBasedTable (reader)
bool skip_filters; bool skip_filters;
// Whether the table will be valid as long as the DB is open
bool immortal;
// what level this table/file is on, -1 for "not set, don't know" // what level this table/file is on, -1 for "not set, don't know"
int level; int level;
}; };

@ -358,10 +358,12 @@ class TableConstructor: public Constructor {
uniq_id_ = cur_uniq_id_++; uniq_id_ = cur_uniq_id_++;
file_reader_.reset(test::GetRandomAccessFileReader(new test::StringSource( file_reader_.reset(test::GetRandomAccessFileReader(new test::StringSource(
GetSink()->contents(), uniq_id_, ioptions.allow_mmap_reads))); GetSink()->contents(), uniq_id_, ioptions.allow_mmap_reads)));
const bool skip_filters = false; const bool kSkipFilters = true;
const bool kImmortal = true;
return ioptions.table_factory->NewTableReader( return ioptions.table_factory->NewTableReader(
TableReaderOptions(ioptions, moptions.prefix_extractor.get(), soptions, TableReaderOptions(ioptions, moptions.prefix_extractor.get(), soptions,
internal_comparator, skip_filters, level_), internal_comparator, !kSkipFilters, !kImmortal,
level_),
std::move(file_reader_), GetSink()->contents().size(), &table_reader_); std::move(file_reader_), GetSink()->contents().size(), &table_reader_);
} }

@ -132,8 +132,7 @@ Status SstFileReader::NewTableReader(
if (BlockBasedTableFactory::kName == options_.table_factory->Name()) { if (BlockBasedTableFactory::kName == options_.table_factory->Name()) {
return options_.table_factory->NewTableReader( return options_.table_factory->NewTableReader(
TableReaderOptions(ioptions_, moptions_.prefix_extractor.get(), TableReaderOptions(ioptions_, moptions_.prefix_extractor.get(),
soptions_, internal_comparator_, soptions_, internal_comparator_),
/*skip_filters=*/false),
std::move(file_), file_size, &table_reader_, /*enable_prefetch=*/false); std::move(file_), file_size, &table_reader_, /*enable_prefetch=*/false);
} }

@ -57,8 +57,7 @@ void ColumnAwareEncodingReader::InitTableReader(const std::string& file_path) {
std::unique_ptr<TableReader> table_reader; std::unique_ptr<TableReader> table_reader;
options_.table_factory->NewTableReader( options_.table_factory->NewTableReader(
TableReaderOptions(ioptions_, moptions_.prefix_extractor.get(), soptions_, TableReaderOptions(ioptions_, moptions_.prefix_extractor.get(), soptions_,
internal_comparator_, internal_comparator_),
/*skip_filters=*/false),
std::move(file_), file_size, &table_reader, /*enable_prefetch=*/false); std::move(file_), file_size, &table_reader, /*enable_prefetch=*/false);
table_reader_.reset(static_cast_with_check<BlockBasedTable, TableReader>( table_reader_.reset(static_cast_with_check<BlockBasedTable, TableReader>(

Loading…
Cancel
Save