remove malloc when create data and index iterator in Get

Summary:
  Define Block::Iter to be an independent class to be used by block_based_table_reader
  When creating data and index iterator, update an existing iterator rather than new one
  Thus malloc and free could be reduced

Benchmark,
Base:
commit 76286ee67e
commands:
--db=/dev/shm/rocksdb --num_levels=6 --key_size=20 --prefix_size=20 --keys_per_prefix=0 --value_size=100 --write_buffer_size=134217728 --max_write_buffer_number=2 --target_file_size_base=33554432 --max_bytes_for_level_base=1073741824 --verify_checksum=false --max_background_compactions=4 --use_plain_table=0 --memtablerep=prefix_hash --open_files=-1 --mmap_read=1 --mmap_write=0 --bloom_bits=10 --bloom_locality=1 --memtable_bloom_bits=500000 --compression_type=lz4 --num=2621440 --use_hash_search=1 --block_size=1024 --block_restart_interval=1 --use_existing_db=1 --threads=1 --benchmarks=readrandom —disable_auto_compactions=1

malloc: 3.30% -> 1.42%
free: 3.59%->1.61%

Test Plan:
  make all check
  run db_stress
  valgrind ./db_test ./table_test

Reviewers: ljin, yhchiang, dhruba, igor, sdong

Reviewed By: sdong

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D20655
main
Feng Zhu 11 years ago
parent 76286ee67e
commit 8f09d53fd1
  1. 215
      table/block.cc
  2. 128
      table/block.h
  3. 82
      table/block_based_table_reader.cc
  4. 10
      table/block_based_table_reader.h

@ -17,44 +17,14 @@
#include <vector>
#include "rocksdb/comparator.h"
#include "table/format.h"
#include "table/block_hash_index.h"
#include "table/block_prefix_index.h"
#include "table/format.h"
#include "util/coding.h"
#include "util/logging.h"
#include "db/dbformat.h"
namespace rocksdb {
uint32_t Block::NumRestarts() const {
assert(size_ >= 2*sizeof(uint32_t));
return DecodeFixed32(data_ + size_ - sizeof(uint32_t));
}
Block::Block(const BlockContents& contents)
: data_(contents.data.data()),
size_(contents.data.size()),
owned_(contents.heap_allocated),
cachable_(contents.cachable),
compression_type_(contents.compression_type) {
if (size_ < sizeof(uint32_t)) {
size_ = 0; // Error marker
} else {
restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32_t);
if (restart_offset_ > size_ - sizeof(uint32_t)) {
// The size is too small for NumRestarts() and therefore
// restart_offset_ wrapped around.
size_ = 0;
}
}
}
Block::~Block() {
if (owned_) {
delete[] data_;
}
}
// Helper routine: decode the next block entry starting at "p",
// storing the number of shared key bytes, non_shared key bytes,
// and the length of the value in "*shared", "*non_shared", and
@ -85,78 +55,12 @@ static inline const char* DecodeEntry(const char* p, const char* limit,
return p;
}
class Block::Iter : public Iterator {
private:
const Comparator* const comparator_;
const char* const data_; // underlying block contents
uint32_t const restarts_; // Offset of restart array (list of fixed32)
uint32_t const num_restarts_; // Number of uint32_t entries in restart array
// current_ is offset in data_ of current entry. >= restarts_ if !Valid
uint32_t current_;
uint32_t restart_index_; // Index of restart block in which current_ falls
IterKey key_;
Slice value_;
Status status_;
BlockHashIndex* hash_index_;
BlockPrefixIndex* prefix_index_;
inline int Compare(const Slice& a, const Slice& b) const {
return comparator_->Compare(a, b);
}
// Return the offset in data_ just past the end of the current entry.
inline uint32_t NextEntryOffset() const {
return (value_.data() + value_.size()) - data_;
}
uint32_t GetRestartPoint(uint32_t index) {
assert(index < num_restarts_);
return DecodeFixed32(data_ + restarts_ + index * sizeof(uint32_t));
}
void SeekToRestartPoint(uint32_t index) {
key_.Clear();
restart_index_ = index;
// current_ will be fixed by ParseNextKey();
// ParseNextKey() starts at the end of value_, so set value_ accordingly
uint32_t offset = GetRestartPoint(index);
value_ = Slice(data_ + offset, 0);
}
public:
Iter(const Comparator* comparator, const char* data, uint32_t restarts,
uint32_t num_restarts, BlockHashIndex* hash_index,
BlockPrefixIndex* prefix_index)
: comparator_(comparator),
data_(data),
restarts_(restarts),
num_restarts_(num_restarts),
current_(restarts_),
restart_index_(num_restarts_),
hash_index_(hash_index),
prefix_index_(prefix_index) {
assert(num_restarts_ > 0);
}
virtual bool Valid() const { return current_ < restarts_; }
virtual Status status() const { return status_; }
virtual Slice key() const {
assert(Valid());
return key_.GetKey();
}
virtual Slice value() const {
assert(Valid());
return value_;
}
virtual void Next() {
void BlockIter::Next() {
assert(Valid());
ParseNextKey();
}
}
virtual void Prev() {
void BlockIter::Prev() {
assert(Valid());
// Scan backwards to a restart point before current_
@ -175,9 +79,12 @@ class Block::Iter : public Iterator {
do {
// Loop until end of current entry hits the start of original entry
} while (ParseNextKey() && NextEntryOffset() < original);
}
}
virtual void Seek(const Slice& target) {
void BlockIter::Seek(const Slice& target) {
if (data_ == nullptr) { // Not init yet
return;
}
uint32_t index = 0;
bool ok = false;
if (prefix_index_) {
@ -198,29 +105,35 @@ class Block::Iter : public Iterator {
return;
}
}
}
void BlockIter::SeekToFirst() {
if (data_ == nullptr) { // Not init yet
return;
}
virtual void SeekToFirst() {
SeekToRestartPoint(0);
ParseNextKey();
}
}
virtual void SeekToLast() {
void BlockIter::SeekToLast() {
if (data_ == nullptr) { // Not init yet
return;
}
SeekToRestartPoint(num_restarts_ - 1);
while (ParseNextKey() && NextEntryOffset() < restarts_) {
// Keep skipping
}
}
}
private:
void CorruptionError() {
void BlockIter::CorruptionError() {
current_ = restarts_;
restart_index_ = num_restarts_;
status_ = Status::Corruption("bad entry in block");
key_.Clear();
value_.clear();
}
}
bool ParseNextKey() {
bool BlockIter::ParseNextKey() {
current_ = NextEntryOffset();
const char* p = data_ + current_;
const char* limit = data_ + restarts_; // Restarts come right after data
@ -248,9 +161,9 @@ class Block::Iter : public Iterator {
}
}
// Binary search in restart array to find the first restart point
// with a key >= target (TODO: this comment is inaccurate)
bool BinarySeek(const Slice& target, uint32_t left, uint32_t right,
// Binary search in restart array to find the first restart point
// with a key >= target (TODO: this comment is inaccurate)
bool BlockIter::BinarySeek(const Slice& target, uint32_t left, uint32_t right,
uint32_t* index) {
assert(left <= right);
@ -282,11 +195,11 @@ class Block::Iter : public Iterator {
*index = left;
return true;
}
}
// Compare target key and the block key of the block of `block_index`.
// Return -1 if error.
int CompareBlockKey(uint32_t block_index, const Slice& target) {
// Compare target key and the block key of the block of `block_index`.
// Return -1 if error.
int BlockIter::CompareBlockKey(uint32_t block_index, const Slice& target) {
uint32_t region_offset = GetRestartPoint(block_index);
uint32_t shared, non_shared, value_length;
const char* key_ptr = DecodeEntry(data_ + region_offset, data_ + restarts_,
@ -297,11 +210,11 @@ class Block::Iter : public Iterator {
}
Slice block_key(key_ptr, non_shared);
return Compare(block_key, target);
}
}
// Binary search in block_ids to find the first block
// with a key >= target
bool BinaryBlockIndexSeek(const Slice& target, uint32_t* block_ids,
// Binary search in block_ids to find the first block
// with a key >= target
bool BlockIter::BinaryBlockIndexSeek(const Slice& target, uint32_t* block_ids,
uint32_t left, uint32_t right,
uint32_t* index) {
assert(left <= right);
@ -349,9 +262,9 @@ class Block::Iter : public Iterator {
current_ = restarts_;
return false;
}
}
}
bool HashSeek(const Slice& target, uint32_t* index) {
bool BlockIter::HashSeek(const Slice& target, uint32_t* index) {
assert(hash_index_);
auto restart_index = hash_index_->GetRestartIndex(target);
if (restart_index == nullptr) {
@ -364,34 +277,78 @@ class Block::Iter : public Iterator {
auto left = restart_index->first_index;
auto right = restart_index->first_index + restart_index->num_blocks - 1;
return BinarySeek(target, left, right, index);
}
}
bool PrefixSeek(const Slice& target, uint32_t* index) {
bool BlockIter::PrefixSeek(const Slice& target, uint32_t* index) {
assert(prefix_index_);
uint32_t* block_ids = nullptr;
uint32_t num_blocks = prefix_index_->GetBlocks(target, &block_ids);
if (num_blocks == 0) {
current_ = restarts_;
return false;
} else {
return BinaryBlockIndexSeek(target, block_ids, 0, num_blocks - 1, index);
}
}
uint32_t Block::NumRestarts() const {
assert(size_ >= 2*sizeof(uint32_t));
return DecodeFixed32(data_ + size_ - sizeof(uint32_t));
}
Block::Block(const BlockContents& contents)
: data_(contents.data.data()),
size_(contents.data.size()),
owned_(contents.heap_allocated),
cachable_(contents.cachable),
compression_type_(contents.compression_type) {
if (size_ < sizeof(uint32_t)) {
size_ = 0; // Error marker
} else {
restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32_t);
if (restart_offset_ > size_ - sizeof(uint32_t)) {
// The size is too small for NumRestarts() and therefore
// restart_offset_ wrapped around.
size_ = 0;
}
}
};
}
Iterator* Block::NewIterator(const Comparator* cmp) {
Block::~Block() {
if (owned_) {
delete[] data_;
}
}
Iterator* Block::NewIterator(const Comparator* cmp, BlockIter* iter) {
if (size_ < 2*sizeof(uint32_t)) {
if (iter != nullptr) {
iter->SetStatus(Status::Corruption("bad block contents"));
return iter;
} else {
return NewErrorIterator(Status::Corruption("bad block contents"));
}
}
const uint32_t num_restarts = NumRestarts();
if (num_restarts == 0) {
if (iter != nullptr) {
iter->SetStatus(Status::OK());
return iter;
} else {
return NewEmptyIterator();
}
} else {
if (iter != nullptr) {
iter->Initialize(cmp, data_, restart_offset_, num_restarts,
hash_index_.get(), prefix_index_.get());
} else {
return new Iter(cmp, data_, restart_offset_, num_restarts,
iter = new BlockIter(cmp, data_, restart_offset_, num_restarts,
hash_index_.get(), prefix_index_.get());
}
}
return iter;
}
void Block::SetBlockHashIndex(BlockHashIndex* hash_index) {

@ -13,11 +13,13 @@
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "db/dbformat.h"
namespace rocksdb {
struct BlockContents;
class Comparator;
class BlockIter;
class BlockHashIndex;
class BlockPrefixIndex;
@ -40,7 +42,11 @@ class Block {
// NOTE: for the hash based lookup, if a key prefix doesn't match any key,
// the iterator will simply be set as "invalid", rather than returning
// the key that is just pass the target key.
Iterator* NewIterator(const Comparator* comparator);
//
// If iter is null, return new Iterator
// If iter is not null, update this one and return it as Iterator*
Iterator* NewIterator(const Comparator* comparator,
BlockIter* iter = nullptr);
void SetBlockHashIndex(BlockHashIndex* hash_index);
void SetBlockPrefixIndex(BlockPrefixIndex* prefix_index);
@ -57,8 +63,126 @@ class Block {
// No copying allowed
Block(const Block&);
void operator=(const Block&);
};
class BlockIter : public Iterator {
public:
BlockIter()
: comparator_(nullptr),
data_(nullptr),
restarts_(0),
num_restarts_(0),
current_(0),
restart_index_(0),
status_(Status::OK()),
hash_index_(nullptr),
prefix_index_(nullptr) {}
BlockIter(const Comparator* comparator, const char* data, uint32_t restarts,
uint32_t num_restarts, BlockHashIndex* hash_index,
BlockPrefixIndex* prefix_index)
: BlockIter() {
Initialize(comparator, data, restarts, num_restarts,
hash_index, prefix_index);
}
void Initialize(const Comparator* comparator, const char* data,
uint32_t restarts, uint32_t num_restarts, BlockHashIndex* hash_index,
BlockPrefixIndex* prefix_index) {
assert(data_ == nullptr); // Ensure it is called only once
assert(num_restarts > 0); // Ensure the param is valid
comparator_ = comparator;
data_ = data;
restarts_ = restarts;
num_restarts_ = num_restarts;
current_ = restarts_;
restart_index_ = num_restarts_;
hash_index_ = hash_index;
prefix_index_ = prefix_index;
}
void SetStatus(Status s) {
status_ = s;
}
virtual bool Valid() const override { return current_ < restarts_; }
virtual Status status() const override { return status_; }
virtual Slice key() const override {
assert(Valid());
return key_.GetKey();
}
virtual Slice value() const override {
assert(Valid());
return value_;
}
virtual void Next() override;
virtual void Prev() override;
virtual void Seek(const Slice& target) override;
virtual void SeekToFirst() override;
virtual void SeekToLast() override;
private:
const Comparator* comparator_;
const char* data_; // underlying block contents
uint32_t restarts_; // Offset of restart array (list of fixed32)
uint32_t num_restarts_; // Number of uint32_t entries in restart array
// current_ is offset in data_ of current entry. >= restarts_ if !Valid
uint32_t current_;
uint32_t restart_index_; // Index of restart block in which current_ falls
IterKey key_;
Slice value_;
Status status_;
BlockHashIndex* hash_index_;
BlockPrefixIndex* prefix_index_;
inline int Compare(const Slice& a, const Slice& b) const {
return comparator_->Compare(a, b);
}
// Return the offset in data_ just past the end of the current entry.
inline uint32_t NextEntryOffset() const {
return (value_.data() + value_.size()) - data_;
}
uint32_t GetRestartPoint(uint32_t index) {
assert(index < num_restarts_);
return DecodeFixed32(data_ + restarts_ + index * sizeof(uint32_t));
}
void SeekToRestartPoint(uint32_t index) {
key_.Clear();
restart_index_ = index;
// current_ will be fixed by ParseNextKey();
// ParseNextKey() starts at the end of value_, so set value_ accordingly
uint32_t offset = GetRestartPoint(index);
value_ = Slice(data_ + offset, 0);
}
void CorruptionError();
bool ParseNextKey();
bool BinarySeek(const Slice& target, uint32_t left, uint32_t right,
uint32_t* index);
int CompareBlockKey(uint32_t block_index, const Slice& target);
bool BinaryBlockIndexSeek(const Slice& target, uint32_t* block_ids,
uint32_t left, uint32_t right,
uint32_t* index);
bool HashSeek(const Slice& target, uint32_t* index);
bool PrefixSeek(const Slice& target, uint32_t* index);
class Iter;
};
} // namespace rocksdb

@ -135,7 +135,9 @@ class BlockBasedTable::IndexReader {
virtual ~IndexReader() {}
// Create an iterator for index access.
virtual Iterator* NewIterator() = 0;
// An iter is passed in, if it is not null, update this one and return it
// If it is null, create a new Iterator
virtual Iterator* NewIterator(BlockIter* iter = nullptr) = 0;
// The size of the index.
virtual size_t size() const = 0;
@ -168,8 +170,8 @@ class BinarySearchIndexReader : public IndexReader {
return s;
}
virtual Iterator* NewIterator() override {
return index_block_->NewIterator(comparator_);
virtual Iterator* NewIterator(BlockIter* iter = nullptr) override {
return index_block_->NewIterator(comparator_, iter);
}
virtual size_t size() const override { return index_block_->size(); }
@ -284,8 +286,8 @@ class HashIndexReader : public IndexReader {
return Status::OK();
}
virtual Iterator* NewIterator() override {
return index_block_->NewIterator(comparator_);
virtual Iterator* NewIterator(BlockIter* iter = nullptr) override {
return index_block_->NewIterator(comparator_, iter);
}
virtual size_t size() const override { return index_block_->size(); }
@ -779,10 +781,11 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
return { filter, cache_handle };
}
Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options) {
Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options,
BlockIter* input_iter) {
// index reader has already been pre-populated.
if (rep_->index_reader) {
return rep_->index_reader->NewIterator();
return rep_->index_reader->NewIterator(input_iter);
}
bool no_io = read_options.read_tier == kBlockCacheTier;
@ -796,8 +799,13 @@ Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options) {
BLOCK_CACHE_INDEX_HIT, statistics);
if (cache_handle == nullptr && no_io) {
if (input_iter != nullptr) {
input_iter->SetStatus(Status::Incomplete("no blocking io"));
return input_iter;
} else {
return NewErrorIterator(Status::Incomplete("no blocking io"));
}
}
IndexReader* index_reader = nullptr;
if (cache_handle != nullptr) {
@ -811,8 +819,13 @@ Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options) {
if (!s.ok()) {
// make sure if something goes wrong, index_reader shall remain intact.
assert(index_reader == nullptr);
if (input_iter != nullptr) {
input_iter->SetStatus(s);
return input_iter;
} else {
return NewErrorIterator(s);
}
}
cache_handle = block_cache->Insert(key, index_reader, index_reader->size(),
&DeleteCachedEntry<IndexReader>);
@ -820,7 +833,8 @@ Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options) {
}
assert(cache_handle);
auto iter = index_reader->NewIterator();
Iterator* iter;
iter = index_reader->NewIterator(input_iter);
iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, cache_handle);
return iter;
@ -828,8 +842,11 @@ Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options) {
// Convert an index iterator value (i.e., an encoded BlockHandle)
// into an iterator over the contents of the corresponding block.
// If input_iter is null, new a iterator
// If input_iter is not null, update this iter and return it
Iterator* BlockBasedTable::NewDataBlockIterator(Rep* rep,
const ReadOptions& ro, const Slice& index_value) {
const ReadOptions& ro, const Slice& index_value,
BlockIter* input_iter) {
const bool no_io = (ro.read_tier == kBlockCacheTier);
Cache* block_cache = rep->options.block_cache.get();
Cache* block_cache_compressed = rep->options.
@ -843,8 +860,13 @@ Iterator* BlockBasedTable::NewDataBlockIterator(Rep* rep,
Status s = handle.DecodeFrom(&input);
if (!s.ok()) {
if (input_iter != nullptr) {
input_iter->SetStatus(s);
return input_iter;
} else {
return NewErrorIterator(s);
}
}
// If either block cache is enabled, we'll try to read from it.
if (block_cache != nullptr || block_cache_compressed != nullptr) {
@ -889,24 +911,34 @@ Iterator* BlockBasedTable::NewDataBlockIterator(Rep* rep,
if (block.value == nullptr) {
if (no_io) {
// Could not read from block_cache and can't do IO
if (input_iter != nullptr) {
input_iter->SetStatus(Status::Incomplete("no blocking io"));
return input_iter;
} else {
return NewErrorIterator(Status::Incomplete("no blocking io"));
}
}
s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle,
&block.value, rep->options.env);
}
Iterator* iter;
if (block.value != nullptr) {
iter = block.value->NewIterator(&rep->internal_comparator);
iter = block.value->NewIterator(&rep->internal_comparator, input_iter);
if (block.cache_handle != nullptr) {
iter->RegisterCleanup(&ReleaseCachedEntry, block_cache,
block.cache_handle);
} else {
iter->RegisterCleanup(&DeleteHeldResource<Block>, block.value, nullptr);
}
} else {
if (input_iter != nullptr) {
input_iter->SetStatus(s);
iter = input_iter;
} else {
iter = NewErrorIterator(s);
}
}
return iter;
}
@ -1023,12 +1055,14 @@ Status BlockBasedTable::Get(
const Slice& v),
void (*mark_key_may_exist_handler)(void* handle_context)) {
Status s;
Iterator* iiter = NewIndexIterator(read_options);
BlockIter iiter;
NewIndexIterator(read_options, &iiter);
auto filter_entry = GetFilter(read_options.read_tier == kBlockCacheTier);
FilterBlockReader* filter = filter_entry.value;
bool done = false;
for (iiter->Seek(key); iiter->Valid() && !done; iiter->Next()) {
Slice handle_value = iiter->value();
for (iiter.Seek(key); iiter.Valid() && !done; iiter.Next()) {
Slice handle_value = iiter.value();
BlockHandle handle;
bool may_not_exist_in_filter =
@ -1043,39 +1077,43 @@ Status BlockBasedTable::Get(
RecordTick(rep_->options.statistics.get(), BLOOM_FILTER_USEFUL);
break;
} else {
unique_ptr<Iterator> block_iter(
NewDataBlockIterator(rep_, read_options, iiter->value()));
BlockIter biter;
NewDataBlockIterator(rep_, read_options, iiter.value(), &biter);
if (read_options.read_tier && block_iter->status().IsIncomplete()) {
if (read_options.read_tier && biter.status().IsIncomplete()) {
// couldn't get block from block_cache
// Update Saver.state to Found because we are only looking for whether
// we can guarantee the key is not there when "no_io" is set
(*mark_key_may_exist_handler)(handle_context);
break;
}
if (!biter.status().ok()) {
s = biter.status();
break;
}
// Call the *saver function on each entry/block until it returns false
for (block_iter->Seek(key); block_iter->Valid(); block_iter->Next()) {
for (biter.Seek(key); biter.Valid(); biter.Next()) {
ParsedInternalKey parsed_key;
if (!ParseInternalKey(block_iter->key(), &parsed_key)) {
if (!ParseInternalKey(biter.key(), &parsed_key)) {
s = Status::Corruption(Slice());
}
if (!(*result_handler)(handle_context, parsed_key,
block_iter->value())) {
biter.value())) {
done = true;
break;
}
}
s = block_iter->status();
s = biter.status();
}
}
filter_entry.Release(rep_->options.block_cache.get());
if (s.ok()) {
s = iiter->status();
s = iiter.status();
}
delete iiter;
return s;
}

@ -23,6 +23,7 @@
namespace rocksdb {
class Block;
class BlockIter;
class BlockHandle;
class Cache;
class FilterBlockReader;
@ -111,8 +112,10 @@ class BlockBasedTable : public TableReader {
bool compaction_optimized_;
class BlockEntryIteratorState;
// input_iter: if it is not null, update this one and return it as Iterator
static Iterator* NewDataBlockIterator(Rep* rep, const ReadOptions& ro,
const Slice& index_value);
const Slice& index_value,
BlockIter* input_iter = nullptr);
// For the following two functions:
// if `no_io == true`, we will not try to read filter/index from sst file
@ -120,6 +123,8 @@ class BlockBasedTable : public TableReader {
CachableEntry<FilterBlockReader> GetFilter(bool no_io = false) const;
// Get the iterator from the index reader.
// If input_iter is not set, return new Iterator
// If input_iter is set, update it and return it as Iterator
//
// Note: ErrorIterator with Status::Incomplete shall be returned if all the
// following conditions are met:
@ -127,7 +132,8 @@ class BlockBasedTable : public TableReader {
// 2. index is not present in block cache.
// 3. We disallowed any io to be performed, that is, read_options ==
// kBlockCacheTier
Iterator* NewIndexIterator(const ReadOptions& read_options);
Iterator* NewIndexIterator(const ReadOptions& read_options,
BlockIter* input_iter = nullptr);
// Read block cache from block caches (if set): block_cache and
// block_cache_compressed.

Loading…
Cancel
Save