Refactor BlockBasedTable::Open (#4636)

Summary:
Refactored and simplified `BlockBasedTable::Open` to be similar to `BlockBasedTableBuilder::Finish` as both these functions complement each other. Also added `BlockBasedTableBuilder::WriteFooter` along the way.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4636

Differential Revision: D12933319

Pulled By: sagar0

fbshipit-source-id: 1ff1d02f6d80a63b5ba720a1fc75e71c7344137b
main
Sagar Vemuri 6 years ago committed by Facebook Github Bot
parent c41c60be13
commit 0463f61837
  1. 59
      table/block_based_table_builder.cc
  2. 2
      table/block_based_table_builder.h
  3. 306
      table/block_based_table_reader.cc
  4. 23
      table/block_based_table_reader.h

@ -875,6 +875,35 @@ void BlockBasedTableBuilder::WriteRangeDelBlock(
}
}
void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
BlockHandle& index_block_handle) {
Rep* r = rep_;
// No need to write out new footer if we're using default checksum.
// We're writing legacy magic number because we want old versions of RocksDB
// be able to read files generated with new release (just in case if
// somebody wants to roll back after an upgrade)
// TODO(icanadi) at some point in the future, when we're absolutely sure
// nobody will roll back to RocksDB 2.x versions, retire the legacy magic
// number and always write new table files with new magic number
bool legacy = (r->table_options.format_version == 0);
// this is guaranteed by BlockBasedTableBuilder's constructor
assert(r->table_options.checksum == kCRC32c ||
r->table_options.format_version != 0);
Footer footer(
legacy ? kLegacyBlockBasedTableMagicNumber : kBlockBasedTableMagicNumber,
r->table_options.format_version);
footer.set_metaindex_handle(metaindex_block_handle);
footer.set_index_handle(index_block_handle);
footer.set_checksum(r->table_options.checksum);
std::string footer_encoding;
footer.EncodeTo(&footer_encoding);
assert(r->status.ok());
r->status = r->file->Append(footer_encoding);
if (r->status.ok()) {
r->offset += footer_encoding.size();
}
}
Status BlockBasedTableBuilder::Finish() {
Rep* r = rep_;
bool empty_data_block = r->data_block.empty();
@ -889,13 +918,14 @@ Status BlockBasedTableBuilder::Finish() {
&r->last_key, nullptr /* no next data block */, r->pending_handle);
}
// Write meta blocks and metaindex block with the following order.
// Write meta blocks, metaindex block and footer in the following order.
// 1. [meta block: filter]
// 2. [meta block: index]
// 3. [meta block: compression dictionary]
// 4. [meta block: range deletion tombstone]
// 5. [meta block: properties]
// 6. [metaindex block]
// 7. Footer
BlockHandle metaindex_block_handle, index_block_handle;
MetaIndexBuilder meta_index_builder;
WriteFilterBlock(&meta_index_builder);
@ -908,33 +938,8 @@ Status BlockBasedTableBuilder::Finish() {
WriteRawBlock(meta_index_builder.Finish(), kNoCompression,
&metaindex_block_handle);
}
// Write footer
if (ok()) {
// No need to write out new footer if we're using default checksum.
// We're writing legacy magic number because we want old versions of RocksDB
// be able to read files generated with new release (just in case if
// somebody wants to roll back after an upgrade)
// TODO(icanadi) at some point in the future, when we're absolutely sure
// nobody will roll back to RocksDB 2.x versions, retire the legacy magic
// number and always write new table files with new magic number
bool legacy = (r->table_options.format_version == 0);
// this is guaranteed by BlockBasedTableBuilder's constructor
assert(r->table_options.checksum == kCRC32c ||
r->table_options.format_version != 0);
Footer footer(legacy ? kLegacyBlockBasedTableMagicNumber
: kBlockBasedTableMagicNumber,
r->table_options.format_version);
footer.set_metaindex_handle(metaindex_block_handle);
footer.set_index_handle(index_block_handle);
footer.set_checksum(r->table_options.checksum);
std::string footer_encoding;
footer.EncodeTo(&footer_encoding);
assert(r->status.ok());
r->status = r->file->Append(footer_encoding);
if (r->status.ok()) {
r->offset += footer_encoding.size();
}
WriteFooter(metaindex_block_handle, index_block_handle);
}
return r->status;

@ -114,6 +114,8 @@ class BlockBasedTableBuilder : public TableBuilder {
void WritePropertiesBlock(MetaIndexBuilder* meta_index_builder);
void WriteCompressionDictBlock(MetaIndexBuilder* meta_index_builder);
void WriteRangeDelBlock(MetaIndexBuilder* meta_index_builder);
void WriteFooter(BlockHandle& metaindex_block_handle,
BlockHandle& index_block_handle);
struct Rep;
class BlockBasedTablePropertiesCollectorFactory;

@ -780,49 +780,25 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
TailPrefetchStats* tail_prefetch_stats) {
table_reader->reset();
Status s;
Footer footer;
std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
// prefetch both index and filters, down to all partitions
const bool prefetch_all = prefetch_index_and_filter_in_cache || level == 0;
const bool preload_all = !table_options.cache_index_and_filter_blocks;
size_t tail_prefetch_size = 0;
if (tail_prefetch_stats != nullptr) {
// Multiple threads may get a 0 (no history) when running in parallel,
// but it will get cleared after the first of them finishes.
tail_prefetch_size = tail_prefetch_stats->GetSuggestedPrefetchSize();
}
if (tail_prefetch_size == 0) {
// Before read footer, readahead backwards to prefetch data. Do more
// readahead if we're going to read index/filter.
// TODO: This may incorrectly select small readahead in case partitioned
// index/filter is enabled and top-level partition pinning is enabled.
// That's because we need to issue readahead before we read the properties,
// at which point we don't yet know the index type.
tail_prefetch_size = prefetch_all || preload_all ? 512 * 1024 : 4 * 1024;
}
size_t prefetch_off;
size_t prefetch_len;
if (file_size < tail_prefetch_size) {
prefetch_off = 0;
prefetch_len = static_cast<size_t>(file_size);
} else {
prefetch_off = static_cast<size_t>(file_size - tail_prefetch_size);
prefetch_len = tail_prefetch_size;
}
TEST_SYNC_POINT_CALLBACK("BlockBasedTable::Open::TailPrefetchLen",
&tail_prefetch_size);
Status s;
// TODO should not have this special logic in the future.
if (!file->use_direct_io()) {
prefetch_buffer.reset(new FilePrefetchBuffer(nullptr, 0, 0, false, true));
s = file->Prefetch(prefetch_off, prefetch_len);
} else {
prefetch_buffer.reset(new FilePrefetchBuffer(nullptr, 0, 0, true, true));
s = prefetch_buffer->Prefetch(file.get(), prefetch_off, prefetch_len);
}
s = PrefetchTail(file.get(), file_size, tail_prefetch_stats, prefetch_all,
preload_all, &prefetch_buffer);
// Read in the following order:
// 1. Footer
// 2. [metaindex block]
// 3. [meta block: properties]
// 4. [meta block: range deletion tombstone]
// 5. [meta block: compression dictionary]
// 6. [meta block: index]
// 7. [meta block: filter]
s = ReadFooterFromFile(file.get(), prefetch_buffer.get(), file_size, &footer,
kBlockBasedTableMagicNumber);
if (!s.ok()) {
@ -857,9 +833,9 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
PersistentCacheOptions(rep->table_options.persistent_cache,
std::string(rep->persistent_cache_key_prefix,
rep->persistent_cache_key_prefix_size),
rep->ioptions.statistics);
rep->ioptions.statistics);
// Read meta index
// Read metaindex
std::unique_ptr<Block> meta;
std::unique_ptr<InternalIterator> meta_iter;
s = ReadMetaBlock(rep, prefetch_buffer.get(), &meta, &meta_iter);
@ -867,38 +843,85 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
return s;
}
// Find filter handle and filter type
if (rep->filter_policy) {
for (auto filter_type :
{Rep::FilterType::kFullFilter, Rep::FilterType::kPartitionedFilter,
Rep::FilterType::kBlockFilter}) {
std::string prefix;
switch (filter_type) {
case Rep::FilterType::kFullFilter:
prefix = kFullFilterBlockPrefix;
break;
case Rep::FilterType::kPartitionedFilter:
prefix = kPartitionedFilterBlockPrefix;
break;
case Rep::FilterType::kBlockFilter:
prefix = kFilterBlockPrefix;
break;
default:
assert(0);
}
std::string filter_block_key = prefix;
filter_block_key.append(rep->filter_policy->Name());
if (FindMetaBlock(meta_iter.get(), filter_block_key, &rep->filter_handle)
.ok()) {
rep->filter_type = filter_type;
break;
}
s = ReadPropertiesBlock(rep, prefetch_buffer.get(), meta_iter.get(),
largest_seqno);
if (!s.ok()) {
return s;
}
// Disregard return status of ReadRangeDelBlock and ReadCompressionDictBlock.
s = ReadRangeDelBlock(rep, prefetch_buffer.get(), meta_iter.get(),
new_table.get(), internal_comparator);
s = ReadCompressionDictBlock(rep, prefetch_buffer.get(), meta_iter.get());
s = PrefetchIndexAndFilterBlocks(rep, prefetch_buffer.get(), meta_iter.get(),
new_table.get(), prefix_extractor,
prefetch_all, table_options, level,
prefetch_index_and_filter_in_cache);
if (s.ok()) {
// Update tail prefetch stats
assert(prefetch_buffer.get() != nullptr);
if (tail_prefetch_stats != nullptr) {
assert(prefetch_buffer->min_offset_read() < file_size);
tail_prefetch_stats->RecordEffectiveSize(
static_cast<size_t>(file_size) - prefetch_buffer->min_offset_read());
}
*table_reader = std::move(new_table);
}
// Read the properties
return s;
}
Status BlockBasedTable::PrefetchTail(
RandomAccessFileReader* file, uint64_t file_size,
TailPrefetchStats* tail_prefetch_stats, const bool prefetch_all,
const bool preload_all,
std::unique_ptr<FilePrefetchBuffer>* prefetch_buffer) {
size_t tail_prefetch_size = 0;
if (tail_prefetch_stats != nullptr) {
// Multiple threads may get a 0 (no history) when running in parallel,
// but it will get cleared after the first of them finishes.
tail_prefetch_size = tail_prefetch_stats->GetSuggestedPrefetchSize();
}
if (tail_prefetch_size == 0) {
// Before read footer, readahead backwards to prefetch data. Do more
// readahead if we're going to read index/filter.
// TODO: This may incorrectly select small readahead in case partitioned
// index/filter is enabled and top-level partition pinning is enabled.
// That's because we need to issue readahead before we read the properties,
// at which point we don't yet know the index type.
tail_prefetch_size = prefetch_all || preload_all ? 512 * 1024 : 4 * 1024;
}
size_t prefetch_off;
size_t prefetch_len;
if (file_size < tail_prefetch_size) {
prefetch_off = 0;
prefetch_len = static_cast<size_t>(file_size);
} else {
prefetch_off = static_cast<size_t>(file_size - tail_prefetch_size);
prefetch_len = tail_prefetch_size;
}
TEST_SYNC_POINT_CALLBACK("BlockBasedTable::Open::TailPrefetchLen",
&tail_prefetch_size);
Status s;
// TODO should not have this special logic in the future.
if (!file->use_direct_io()) {
prefetch_buffer->reset(new FilePrefetchBuffer(nullptr, 0, 0, false, true));
s = file->Prefetch(prefetch_off, prefetch_len);
} else {
prefetch_buffer->reset(new FilePrefetchBuffer(nullptr, 0, 0, true, true));
s = (*prefetch_buffer)->Prefetch(file, prefetch_off, prefetch_len);
}
return s;
}
Status BlockBasedTable::ReadPropertiesBlock(
Rep* rep, FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter,
const SequenceNumber largest_seqno) {
bool found_properties_block = true;
s = SeekToPropertiesBlock(meta_iter.get(), &found_properties_block);
Status s;
s = SeekToPropertiesBlock(meta_iter, &found_properties_block);
if (!s.ok()) {
ROCKS_LOG_WARN(rep->ioptions.info_log,
@ -908,9 +931,8 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
s = meta_iter->status();
TableProperties* table_properties = nullptr;
if (s.ok()) {
s = ReadProperties(meta_iter->value(), rep->file.get(),
prefetch_buffer.get(), rep->footer, rep->ioptions,
&table_properties,
s = ReadProperties(meta_iter->value(), rep->file.get(), prefetch_buffer,
rep->footer, rep->ioptions, &table_properties,
false /* compression_type_missing */,
nullptr /* memory_allocator */);
}
@ -937,41 +959,6 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
}
#endif // ROCKSDB_LITE
// Read the compression dictionary meta block
bool found_compression_dict;
BlockHandle compression_dict_handle;
s = SeekToCompressionDictBlock(meta_iter.get(), &found_compression_dict,
&compression_dict_handle);
if (!s.ok()) {
ROCKS_LOG_WARN(
rep->ioptions.info_log,
"Error when seeking to compression dictionary block from file: %s",
s.ToString().c_str());
} else if (found_compression_dict && !compression_dict_handle.IsNull()) {
// TODO(andrewkr): Add to block cache if cache_index_and_filter_blocks is
// true.
std::unique_ptr<BlockContents> compression_dict_cont{new BlockContents()};
PersistentCacheOptions cache_options;
ReadOptions read_options;
read_options.verify_checksums = false;
BlockFetcher compression_block_fetcher(
rep->file.get(), prefetch_buffer.get(), rep->footer, read_options,
compression_dict_handle, compression_dict_cont.get(), rep->ioptions,
false /* decompress */, false /*maybe_compressed*/,
Slice() /*compression dict*/, cache_options);
s = compression_block_fetcher.ReadBlockContents();
if (!s.ok()) {
ROCKS_LOG_WARN(
rep->ioptions.info_log,
"Encountered error while reading data from compression dictionary "
"block %s",
s.ToString().c_str());
} else {
rep->compression_dict_block = std::move(compression_dict_cont);
}
}
// Read the table properties, if provided.
if (rep->table_properties) {
rep->whole_key_filtering &=
@ -986,13 +973,18 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
&(rep->global_seqno));
if (!s.ok()) {
ROCKS_LOG_ERROR(rep->ioptions.info_log, "%s", s.ToString().c_str());
return s;
}
}
return s;
}
// Read the range del meta block
Status BlockBasedTable::ReadRangeDelBlock(
Rep* rep, FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter,
BlockBasedTable* new_table,
const InternalKeyComparator& internal_comparator) {
Status s;
bool found_range_del_block;
s = SeekToRangeDelBlock(meta_iter.get(), &found_range_del_block,
s = SeekToRangeDelBlock(meta_iter, &found_range_del_block,
&rep->range_del_handle);
if (!s.ok()) {
ROCKS_LOG_WARN(
@ -1002,7 +994,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
} else if (found_range_del_block && !rep->range_del_handle.IsNull()) {
ReadOptions read_options;
s = MaybeReadBlockAndLoadToCache(
prefetch_buffer.get(), rep, read_options, rep->range_del_handle,
prefetch_buffer, rep, read_options, rep->range_del_handle,
Slice() /* compression_dict */, &rep->range_del_entry,
false /* is_index */, nullptr /* get_context */);
if (!s.ok()) {
@ -1016,6 +1008,84 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
rep->fragmented_range_dels = std::make_shared<FragmentedRangeTombstoneList>(
std::move(iter), internal_comparator);
}
return s;
}
Status BlockBasedTable::ReadCompressionDictBlock(
Rep* rep, FilePrefetchBuffer* prefetch_buffer,
InternalIterator* meta_iter) {
Status s;
bool found_compression_dict;
BlockHandle compression_dict_handle;
s = SeekToCompressionDictBlock(meta_iter, &found_compression_dict,
&compression_dict_handle);
if (!s.ok()) {
ROCKS_LOG_WARN(
rep->ioptions.info_log,
"Error when seeking to compression dictionary block from file: %s",
s.ToString().c_str());
} else if (found_compression_dict && !compression_dict_handle.IsNull()) {
// TODO(andrewkr): Add to block cache if cache_index_and_filter_blocks is
// true.
std::unique_ptr<BlockContents> compression_dict_cont{new BlockContents()};
PersistentCacheOptions cache_options;
ReadOptions read_options;
read_options.verify_checksums = false;
BlockFetcher compression_block_fetcher(
rep->file.get(), prefetch_buffer, rep->footer, read_options,
compression_dict_handle, compression_dict_cont.get(), rep->ioptions,
false /* decompress */, false /*maybe_compressed*/,
Slice() /*compression dict*/, cache_options);
s = compression_block_fetcher.ReadBlockContents();
if (!s.ok()) {
ROCKS_LOG_WARN(
rep->ioptions.info_log,
"Encountered error while reading data from compression dictionary "
"block %s",
s.ToString().c_str());
} else {
rep->compression_dict_block = std::move(compression_dict_cont);
}
}
return s;
}
Status BlockBasedTable::PrefetchIndexAndFilterBlocks(
Rep* rep, FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter,
BlockBasedTable* new_table, const SliceTransform* prefix_extractor,
bool prefetch_all, const BlockBasedTableOptions& table_options,
const int level, const bool prefetch_index_and_filter_in_cache) {
Status s;
// Find filter handle and filter type
if (rep->filter_policy) {
for (auto filter_type :
{Rep::FilterType::kFullFilter, Rep::FilterType::kPartitionedFilter,
Rep::FilterType::kBlockFilter}) {
std::string prefix;
switch (filter_type) {
case Rep::FilterType::kFullFilter:
prefix = kFullFilterBlockPrefix;
break;
case Rep::FilterType::kPartitionedFilter:
prefix = kPartitionedFilterBlockPrefix;
break;
case Rep::FilterType::kBlockFilter:
prefix = kFilterBlockPrefix;
break;
default:
assert(0);
}
std::string filter_block_key = prefix;
filter_block_key.append(rep->filter_policy->Name());
if (FindMetaBlock(meta_iter, filter_block_key, &rep->filter_handle)
.ok()) {
rep->filter_type = filter_type;
break;
}
}
}
bool need_upper_bound_check =
PrefixExtractorChanged(rep->table_properties.get(), prefix_extractor);
@ -1097,8 +1167,8 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
// pre-load these blocks, which will kept in member variables in Rep
// and with a same life-time as this table object.
IndexReader* index_reader = nullptr;
s = new_table->CreateIndexReader(prefetch_buffer.get(), &index_reader,
meta_iter.get(), level);
s = new_table->CreateIndexReader(prefetch_buffer, &index_reader, meta_iter,
level);
if (s.ok()) {
rep->index_reader.reset(index_reader);
// The partitions of partitioned index are always stored in cache. They
@ -1111,9 +1181,9 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
// Set filter block
if (rep->filter_policy) {
const bool is_a_filter_partition = true;
auto filter = new_table->ReadFilter(
prefetch_buffer.get(), rep->filter_handle, !is_a_filter_partition,
rep->table_prefix_extractor.get());
auto filter = new_table->ReadFilter(prefetch_buffer, rep->filter_handle,
!is_a_filter_partition,
rep->table_prefix_extractor.get());
rep->filter.reset(filter);
// Refer to the comment above about paritioned indexes always being
// cached
@ -1126,16 +1196,6 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
}
}
if (s.ok()) {
assert(prefetch_buffer.get() != nullptr);
if (tail_prefetch_stats != nullptr) {
assert(prefetch_buffer->min_offset_read() < file_size);
tail_prefetch_stats->RecordEffectiveSize(
static_cast<size_t>(file_size) - prefetch_buffer->min_offset_read());
}
*table_reader = std::move(new_table);
}
return s;
}

@ -351,10 +351,31 @@ class BlockBasedTable : public TableReader {
const Slice& user_key, const bool no_io,
const SliceTransform* prefix_extractor = nullptr) const;
// Read the meta block from sst.
static Status PrefetchTail(
RandomAccessFileReader* file, uint64_t file_size,
TailPrefetchStats* tail_prefetch_stats, const bool prefetch_all,
const bool preload_all,
std::unique_ptr<FilePrefetchBuffer>* prefetch_buffer);
static Status ReadMetaBlock(Rep* rep, FilePrefetchBuffer* prefetch_buffer,
std::unique_ptr<Block>* meta_block,
std::unique_ptr<InternalIterator>* iter);
static Status ReadPropertiesBlock(Rep* rep,
FilePrefetchBuffer* prefetch_buffer,
InternalIterator* meta_iter,
const SequenceNumber largest_seqno);
static Status ReadRangeDelBlock(
Rep* rep, FilePrefetchBuffer* prefetch_buffer,
InternalIterator* meta_iter, BlockBasedTable* new_table,
const InternalKeyComparator& internal_comparator);
static Status ReadCompressionDictBlock(Rep* rep,
FilePrefetchBuffer* prefetch_buffer,
InternalIterator* meta_iter);
static Status PrefetchIndexAndFilterBlocks(
Rep* rep, FilePrefetchBuffer* prefetch_buffer,
InternalIterator* meta_iter, BlockBasedTable* new_table,
const SliceTransform* prefix_extractor, bool prefetch_all,
const BlockBasedTableOptions& table_options, const int level,
const bool prefetch_index_and_filter_in_cache);
Status VerifyChecksumInBlocks(InternalIteratorBase<Slice>* index_iter);
Status VerifyChecksumInBlocks(InternalIteratorBase<BlockHandle>* index_iter);

Loading…
Cancel
Save