[RocksDB] Refactor table.cc to reduce code duplication and improve readability.

Summary: In table.cc, the code section that reads in BlockContent and then put it into a Block, appears at least 4 times. This is too much duplication. BlockReader is much shorter after the change and reads way better. D10077 attempted that for index block read. This is a complete cleanup.

Test Plan: make check; ./db_stress

Reviewers: dhruba, sheki

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D10527
main
Haobo Xu 12 years ago
parent fb96ec1686
commit 49fbd5531b
  1. 3
      table/block.cc
  2. 2
      table/block.h
  3. 8
      table/format.cc
  4. 8
      table/format.h
  5. 68
      table/table.cc

@ -23,7 +23,8 @@ inline uint32_t Block::NumRestarts() const {
Block::Block(const BlockContents& contents) Block::Block(const BlockContents& contents)
: data_(contents.data.data()), : data_(contents.data.data()),
size_(contents.data.size()), size_(contents.data.size()),
owned_(contents.heap_allocated) { owned_(contents.heap_allocated),
cachable_(contents.cachable) {
if (size_ < sizeof(uint32_t)) { if (size_ < sizeof(uint32_t)) {
size_ = 0; // Error marker size_ = 0; // Error marker
} else { } else {

@ -22,6 +22,7 @@ class Block {
~Block(); ~Block();
size_t size() const { return size_; } size_t size() const { return size_; }
bool isCachable() const { return cachable_; }
Iterator* NewIterator(const Comparator* comparator); Iterator* NewIterator(const Comparator* comparator);
private: private:
@ -31,6 +32,7 @@ class Block {
size_t size_; size_t size_;
uint32_t restart_offset_; // Offset in data_ of restart array uint32_t restart_offset_; // Offset in data_ of restart array
bool owned_; // Block owns data_[] bool owned_; // Block owns data_[]
bool cachable_;
// No copying allowed // No copying allowed
Block(const Block&); Block(const Block&);

@ -66,10 +66,10 @@ Status Footer::DecodeFrom(Slice* input) {
return result; return result;
} }
Status ReadBlock(RandomAccessFile* file, Status ReadBlockContents(RandomAccessFile* file,
const ReadOptions& options, const ReadOptions& options,
const BlockHandle& handle, const BlockHandle& handle,
BlockContents* result) { BlockContents* result) {
result->data = Slice(); result->data = Slice();
result->cachable = false; result->cachable = false;
result->heap_allocated = false; result->heap_allocated = false;

@ -91,10 +91,10 @@ struct BlockContents {
// Read the block identified by "handle" from "file". On failure // Read the block identified by "handle" from "file". On failure
// return non-OK. On success fill *result and return OK. // return non-OK. On success fill *result and return OK.
extern Status ReadBlock(RandomAccessFile* file, extern Status ReadBlockContents(RandomAccessFile* file,
const ReadOptions& options, const ReadOptions& options,
const BlockHandle& handle, const BlockHandle& handle,
BlockContents* result); BlockContents* result);
// Implementation details follow. Clients should ignore, // Implementation details follow. Clients should ignore,

@ -64,6 +64,32 @@ void Table::SetupCacheKeyPrefix(Rep* rep) {
} }
} }
namespace { // anonymous namespace, not visible externally
// Read the block identified by "handle" from "file".
// The only relevant option is options.verify_checksums for now.
// Set *didIO to true if didIO is not null.
// On failure return non-OK.
// On success fill *result and return OK - caller owns *result
Status ReadBlock(RandomAccessFile* file,
const ReadOptions& options,
const BlockHandle& handle,
Block** result,
bool* didIO = nullptr) {
BlockContents contents;
Status s = ReadBlockContents(file, options, handle, &contents);
if (s.ok()) {
*result = new Block(contents);
}
if (didIO) {
*didIO = true;
}
return s;
}
} // end of anonymous namespace
Status Table::Open(const Options& options, Status Table::Open(const Options& options,
const EnvOptions& soptions, const EnvOptions& soptions,
unique_ptr<RandomAccessFile>&& file, unique_ptr<RandomAccessFile>&& file,
@ -91,15 +117,9 @@ Status Table::Open(const Options& options,
s = footer.DecodeFrom(&footer_input); s = footer.DecodeFrom(&footer_input);
if (!s.ok()) return s; if (!s.ok()) return s;
// Read the index block
BlockContents contents;
Block* index_block = nullptr; Block* index_block = nullptr;
if (s.ok()) { // TODO: we never really verify check sum for index block
s = ReadBlock(file.get(), ReadOptions(), footer.index_handle(), &contents); s = ReadBlock(file.get(), ReadOptions(), footer.index_handle(), &index_block);
if (s.ok()) {
index_block = new Block(contents);
}
}
if (s.ok()) { if (s.ok()) {
// We've successfully read the footer and the index block: we're // We've successfully read the footer and the index block: we're
@ -128,14 +148,13 @@ void Table::ReadMeta(const Footer& footer) {
// TODO(sanjay): Skip this if footer.metaindex_handle() size indicates // TODO(sanjay): Skip this if footer.metaindex_handle() size indicates
// it is an empty block. // it is an empty block.
ReadOptions opt; // TODO: we never really verify check sum for meta index block
BlockContents contents; Block* meta = nullptr;
if (!ReadBlock(rep_->file.get(), opt, footer.metaindex_handle(), if (!ReadBlock(rep_->file.get(), ReadOptions(), footer.metaindex_handle(),
&contents).ok()) { &meta).ok()) {
// Do not propagate errors since meta info is not needed for operation // Do not propagate errors since meta info is not needed for operation
return; return;
} }
Block* meta = new Block(contents);
Iterator* iter = meta->NewIterator(BytewiseComparator()); Iterator* iter = meta->NewIterator(BytewiseComparator());
std::string key = "filter."; std::string key = "filter.";
@ -155,11 +174,11 @@ void Table::ReadFilter(const Slice& filter_handle_value) {
return; return;
} }
// We might want to unify with ReadBlock() if we start // TODO: We might want to unify with ReadBlock() if we start
// requiring checksum verification in Table::Open. // requiring checksum verification in Table::Open.
ReadOptions opt; ReadOptions opt;
BlockContents block; BlockContents block;
if (!ReadBlock(rep_->file.get(), opt, filter_handle, &block).ok()) { if (!ReadBlockContents(rep_->file.get(), opt, filter_handle, &block).ok()) {
return; return;
} }
if (block.heap_allocated) { if (block.heap_allocated) {
@ -206,7 +225,6 @@ Iterator* Table::BlockReader(void* arg,
// can add more features in the future. // can add more features in the future.
if (s.ok()) { if (s.ok()) {
BlockContents contents;
if (block_cache != nullptr) { if (block_cache != nullptr) {
char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
const size_t cache_key_prefix_size = table->rep_->cache_key_prefix_size; const size_t cache_key_prefix_size = table->rep_->cache_key_prefix_size;
@ -223,28 +241,18 @@ Iterator* Table::BlockReader(void* arg,
RecordTick(statistics, BLOCK_CACHE_HIT); RecordTick(statistics, BLOCK_CACHE_HIT);
} else { } else {
s = ReadBlock(table->rep_->file.get(), options, handle, &contents); s = ReadBlock(table->rep_->file.get(), options, handle, &block, didIO);
if (s.ok()) { if (s.ok()) {
block = new Block(contents); if (block->isCachable() && options.fill_cache) {
if (contents.cachable && options.fill_cache) {
cache_handle = block_cache->Insert( cache_handle = block_cache->Insert(
key, block, block->size(), &DeleteCachedBlock); key, block, block->size(), &DeleteCachedBlock);
} }
} }
if (didIO != nullptr) {
*didIO = true; // we did some io from storage
}
RecordTick(statistics, BLOCK_CACHE_MISS); RecordTick(statistics, BLOCK_CACHE_MISS);
} }
} else { } else {
s = ReadBlock(table->rep_->file.get(), options, handle, &contents); s = ReadBlock(table->rep_->file.get(), options, handle, &block, didIO);
if (s.ok()) {
block = new Block(contents);
}
if (didIO != nullptr) {
*didIO = true; // we did some io from storage
}
} }
} }

Loading…
Cancel
Save