Miscellaneous performance improvements

Summary:
I was investigating performance issues in the SstFileWriter and found all of the following:

- The SstFileWriter::Add() function created a local InternalKey every time it was called generating a allocation and free each time.  Changed to have an InternalKey member variable that can be reset with the new InternalKey::Set() function.
- In SstFileWriter::Add() the smallest_key and largest_key values were assigned the result of a ToString() call, but it is simpler to just assign them directly from the user's key.
- The Slice class had no move constructor so each time one was returned from a function a new one had to be allocated, the old data copied to the new, and the old one was freed.  I added the move constructor which also required a copy constructor and assignment operator.
- The BlockBuilder::CurrentSizeEstimate() function calculates the current estimate size, but was being called 2 or 3 times for each key added.  I changed the class to maintain a running estimate (equal to the original calculation) so that the function can return an already calculated value.
- The code in BlockBuilder::Add() that calculated the shared bytes between the last key and the new key duplicated what Slice::difference_offset does, so I replaced it with the standard function.
- BlockBuilder::Add() had code to copy just the changed portion into the last key value (and asserted that it now matched the new key).  It is more efficient just to copy the whole new key over.
- Moved this same code up into the 'if (use_delta_encoding_)' since the last key value is only needed when delta encoding is on.
- FlushBlockBySizePolicy::BlockAlmostFull calculated a standard deviation value each time it was called, but this information would only change if block_size of block_size_deviation changed, so I created a member variable to hold the value to avoid the calculation each time.
- Each PutVarint??() function has a buffer and calls std::string::append().  Two or three calls in a row could share a buffer and a single call to std::string::append().

Some of these will be helpful outside of the SstFileWriter.  I'm not 100% the addition of the move constructor is appropriate as I wonder why this wasn't done before - maybe because of compiler compatibility?  I tried it on gcc 4.8 and 4.9.

Test Plan: The changes should not affect the results so the existing tests should all still work and no new tests were added.  The value of the changes was seen by manually testing the SstFileWriter class through MyRocks and adding timing code to identify problem areas.

Reviewers: sdong, IslamAbdelRahman

Reviewed By: IslamAbdelRahman

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D59607
main
Jay Edgar 9 years ago
parent e6f68faf99
commit efd013d6d8
  1. 2
      db/db_properties_test.cc
  2. 4
      db/dbformat.h
  3. 29
      db/version_edit.cc
  4. 8
      table/block_based_table_builder.cc
  5. 39
      table/block_builder.cc
  6. 3
      table/block_builder.h
  7. 22
      table/flush_block_policy.cc
  8. 3
      table/format.cc
  9. 9
      table/sst_file_writer.cc
  10. 58
      util/coding.h

@ -236,7 +236,7 @@ void GetExpectedTableProperties(TableProperties* expected_tp,
expected_tp->data_size =
kTableCount * (kKeysPerTable * (kKeySize + 8 + kValueSize));
expected_tp->index_size =
expected_tp->num_data_blocks * (kAvgSuccessorSize + 12);
expected_tp->num_data_blocks * (kAvgSuccessorSize + 8);
expected_tp->filter_size =
kTableCount * (kKeysPerTable * kBloomBitsPerKey / 8);
}

@ -179,6 +179,10 @@ class InternalKey {
Slice user_key() const { return ExtractUserKey(rep_); }
size_t size() { return rep_.size(); }
void Set(const Slice& _user_key, SequenceNumber s, ValueType t) {
SetFrom(ParsedInternalKey(_user_key, s, t));
}
void SetFrom(const ParsedInternalKey& p) {
rep_.clear();
AppendInternalKey(&rep_, p);

@ -82,30 +82,24 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
PutLengthPrefixedSlice(dst, comparator_);
}
if (has_log_number_) {
PutVarint32(dst, kLogNumber);
PutVarint64(dst, log_number_);
PutVarint32Varint64(dst, kLogNumber, log_number_);
}
if (has_prev_log_number_) {
PutVarint32(dst, kPrevLogNumber);
PutVarint64(dst, prev_log_number_);
PutVarint32Varint64(dst, kPrevLogNumber, prev_log_number_);
}
if (has_next_file_number_) {
PutVarint32(dst, kNextFileNumber);
PutVarint64(dst, next_file_number_);
PutVarint32Varint64(dst, kNextFileNumber, next_file_number_);
}
if (has_last_sequence_) {
PutVarint32(dst, kLastSequence);
PutVarint64(dst, last_sequence_);
PutVarint32Varint64(dst, kLastSequence, last_sequence_);
}
if (has_max_column_family_) {
PutVarint32(dst, kMaxColumnFamily);
PutVarint32(dst, max_column_family_);
PutVarint32Varint32(dst, kMaxColumnFamily, max_column_family_);
}
for (const auto& deleted : deleted_files_) {
PutVarint32(dst, kDeletedFile);
PutVarint32(dst, deleted.first /* level */);
PutVarint64(dst, deleted.second /* file number */);
PutVarint32Varint32Varint64(dst, kDeletedFile, deleted.first /* level */,
deleted.second /* file number */);
}
for (size_t i = 0; i < new_files_.size(); i++) {
@ -124,8 +118,7 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
} else {
PutVarint32(dst, kNewFile3);
}
PutVarint32(dst, new_files_[i].first); // level
PutVarint64(dst, f.fd.GetNumber());
PutVarint32Varint64(dst, new_files_[i].first /* level */, f.fd.GetNumber());
if (f.fd.GetPathId() != 0 && !has_customized_fields) {
// kNewFile3
PutVarint32(dst, f.fd.GetPathId());
@ -133,8 +126,7 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
PutVarint64(dst, f.fd.GetFileSize());
PutLengthPrefixedSlice(dst, f.smallest.Encode());
PutLengthPrefixedSlice(dst, f.largest.Encode());
PutVarint64(dst, f.smallest_seqno);
PutVarint64(dst, f.largest_seqno);
PutVarint64Varint64(dst, f.smallest_seqno, f.largest_seqno);
if (has_customized_fields) {
// Customized fields' format:
// +-----------------------------+
@ -181,8 +173,7 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
// 0 is default and does not need to be explicitly written
if (column_family_ != 0) {
PutVarint32(dst, kColumnFamily);
PutVarint32(dst, column_family_);
PutVarint32Varint32(dst, kColumnFamily, column_family_);
}
if (is_column_family_add_) {

@ -241,10 +241,10 @@ class HashIndexBuilder : public IndexBuilder {
void FlushPendingPrefix() {
prefix_block_.append(pending_entry_prefix_.data(),
pending_entry_prefix_.size());
PutVarint32(&prefix_meta_block_,
static_cast<uint32_t>(pending_entry_prefix_.size()));
PutVarint32(&prefix_meta_block_, pending_entry_index_);
PutVarint32(&prefix_meta_block_, pending_block_num_);
PutVarint32Varint32Varint32(
&prefix_meta_block_,
static_cast<uint32_t>(pending_entry_prefix_.size()),
pending_entry_index_, pending_block_num_);
}
ShortenedIndexBuilder primary_index_builder_;

@ -49,23 +49,19 @@ BlockBuilder::BlockBuilder(int block_restart_interval, bool use_delta_encoding)
finished_(false) {
assert(block_restart_interval_ >= 1);
restarts_.push_back(0); // First restart point is at offset 0
estimate_ = sizeof(uint32_t) + sizeof(uint32_t);
}
void BlockBuilder::Reset() {
buffer_.clear();
restarts_.clear();
restarts_.push_back(0); // First restart point is at offset 0
estimate_ = sizeof(uint32_t) + sizeof(uint32_t);
counter_ = 0;
finished_ = false;
last_key_.clear();
}
size_t BlockBuilder::CurrentSizeEstimate() const {
return (buffer_.size() + // Raw data buffer
restarts_.size() * sizeof(uint32_t) + // Restart array
sizeof(uint32_t)); // Restart array length
}
size_t BlockBuilder::EstimateSizeAfterKV(const Slice& key, const Slice& value)
const {
size_t estimate = CurrentSizeEstimate();
@ -92,37 +88,44 @@ Slice BlockBuilder::Finish() {
}
void BlockBuilder::Add(const Slice& key, const Slice& value) {
Slice last_key_piece(last_key_);
assert(!finished_);
assert(counter_ <= block_restart_interval_);
size_t shared = 0; // number of bytes shared with prev key
if (counter_ >= block_restart_interval_) {
// Restart compression
restarts_.push_back(static_cast<uint32_t>(buffer_.size()));
estimate_ += sizeof(uint32_t);
counter_ = 0;
if (use_delta_encoding_) {
// Update state
last_key_.assign(key.data(), key.size());
}
} else if (use_delta_encoding_) {
Slice last_key_piece(last_key_);
// See how much sharing to do with previous string
const size_t min_length = std::min(last_key_piece.size(), key.size());
while ((shared < min_length) && (last_key_piece[shared] == key[shared])) {
shared++;
}
shared = key.difference_offset(last_key_piece);
// Update state
// We used to just copy the changed data here, but it appears to be
// faster to just copy the whole thing.
last_key_.assign(key.data(), key.size());
}
const size_t non_shared = key.size() - shared;
const size_t curr_size = buffer_.size();
// Add "<shared><non_shared><value_size>" to buffer_
PutVarint32(&buffer_, static_cast<uint32_t>(shared));
PutVarint32(&buffer_, static_cast<uint32_t>(non_shared));
PutVarint32(&buffer_, static_cast<uint32_t>(value.size()));
PutVarint32Varint32Varint32(&buffer_, static_cast<uint32_t>(shared),
static_cast<uint32_t>(non_shared),
static_cast<uint32_t>(value.size()));
// Add string delta to buffer_ followed by value
buffer_.append(key.data() + shared, non_shared);
buffer_.append(value.data(), value.size());
// Update state
last_key_.resize(shared);
last_key_.append(key.data() + shared, non_shared);
assert(Slice(last_key_) == key);
counter_++;
estimate_ += buffer_.size() - curr_size;
}
} // namespace rocksdb

@ -37,7 +37,7 @@ class BlockBuilder {
// Returns an estimate of the current (uncompressed) size of the block
// we are building.
size_t CurrentSizeEstimate() const;
inline size_t CurrentSizeEstimate() const { return estimate_; }
// Returns an estimated block size after appending key and value.
size_t EstimateSizeAfterKV(const Slice& key, const Slice& value) const;
@ -53,6 +53,7 @@ class BlockBuilder {
std::string buffer_; // Destination buffer
std::vector<uint32_t> restarts_; // Restart points
size_t estimate_;
int counter_; // Number of entries emitted since restart
bool finished_; // Has Finish() been called?
std::string last_key_;

@ -21,11 +21,11 @@ class FlushBlockBySizePolicy : public FlushBlockPolicy {
// reaches the configured
FlushBlockBySizePolicy(const uint64_t block_size,
const uint64_t block_size_deviation,
const BlockBuilder& data_block_builder) :
block_size_(block_size),
block_size_deviation_(block_size_deviation),
data_block_builder_(data_block_builder) {
}
const BlockBuilder& data_block_builder)
: block_size_(block_size),
block_size_deviation_limit_(
((block_size * (100 - block_size_deviation)) + 99) / 100),
data_block_builder_(data_block_builder) {}
virtual bool Update(const Slice& key,
const Slice& value) override {
@ -46,18 +46,20 @@ class FlushBlockBySizePolicy : public FlushBlockPolicy {
private:
bool BlockAlmostFull(const Slice& key, const Slice& value) const {
if (block_size_deviation_limit_ == 0) {
return false;
}
const auto curr_size = data_block_builder_.CurrentSizeEstimate();
const auto estimated_size_after =
data_block_builder_.EstimateSizeAfterKV(key, value);
return
estimated_size_after > block_size_ &&
block_size_deviation_ > 0 &&
curr_size * 100 > block_size_ * (100 - block_size_deviation_);
return estimated_size_after > block_size_ &&
curr_size > block_size_deviation_limit_;
}
const uint64_t block_size_;
const uint64_t block_size_deviation_;
const uint64_t block_size_deviation_limit_;
const BlockBuilder& data_block_builder_;
};

@ -43,8 +43,7 @@ void BlockHandle::EncodeTo(std::string* dst) const {
// Sanity check that all fields have been set
assert(offset_ != ~static_cast<uint64_t>(0));
assert(size_ != ~static_cast<uint64_t>(0));
PutVarint64(dst, offset_);
PutVarint64(dst, size_);
PutVarint64Varint64(dst, offset_, size_);
}
Status BlockHandle::DecodeFrom(Slice* input) {

@ -86,6 +86,7 @@ struct SstFileWriter::Rep {
InternalKeyComparator internal_comparator;
ExternalSstFileInfo file_info;
std::string column_family_name;
InternalKey ikey;
};
SstFileWriter::SstFileWriter(const EnvOptions& env_options,
@ -146,7 +147,7 @@ Status SstFileWriter::Add(const Slice& user_key, const Slice& value) {
}
if (r->file_info.num_entries == 0) {
r->file_info.smallest_key = user_key.ToString();
r->file_info.smallest_key.assign(user_key.data(), user_key.size());
} else {
if (r->internal_comparator.user_comparator()->Compare(
user_key, r->file_info.largest_key) <= 0) {
@ -157,12 +158,12 @@ Status SstFileWriter::Add(const Slice& user_key, const Slice& value) {
// update file info
r->file_info.num_entries++;
r->file_info.largest_key = user_key.ToString();
r->file_info.largest_key.assign(user_key.data(), user_key.size());
r->file_info.file_size = r->builder->FileSize();
InternalKey ikey(user_key, 0 /* Sequence Number */,
r->ikey.Set(user_key, 0 /* Sequence Number */,
ValueType::kTypeValue /* Put */);
r->builder->Add(ikey.Encode(), value);
r->builder->Add(r->ikey.Encode(), value);
return Status::OK();
}

@ -30,7 +30,17 @@ const unsigned int kMaxVarint64Length = 10;
extern void PutFixed32(std::string* dst, uint32_t value);
extern void PutFixed64(std::string* dst, uint64_t value);
extern void PutVarint32(std::string* dst, uint32_t value);
extern void PutVarint32Varint32(std::string* dst, uint32_t value1,
uint32_t value2);
extern void PutVarint32Varint32Varint32(std::string* dst, uint32_t value1,
uint32_t value2, uint32_t value3);
extern void PutVarint64(std::string* dst, uint64_t value);
extern void PutVarint64Varint64(std::string* dst, uint64_t value1,
uint64_t value2);
extern void PutVarint32Varint64(std::string* dst, uint32_t value1,
uint64_t value2);
extern void PutVarint32Varint32Varint64(std::string* dst, uint32_t value1,
uint32_t value2, uint64_t value3);
extern void PutLengthPrefixedSlice(std::string* dst, const Slice& value);
extern void PutLengthPrefixedSliceParts(std::string* dst,
const SliceParts& slice_parts);
@ -141,16 +151,25 @@ inline void EncodeFixed64(char* buf, uint64_t value) {
#endif
}
// Pull the last 8 bits and cast it to a character
inline void PutFixed32(std::string* dst, uint32_t value) {
#if __BYTE_ORDER__ == __LITTLE_ENDIAN__
dst->append(static_cast<const char*>(&value), sizeof(value));
#else
char buf[sizeof(value)];
EncodeFixed32(buf, value);
dst->append(buf, sizeof(buf));
#endif
}
inline void PutFixed64(std::string* dst, uint64_t value) {
#if __BYTE_ORDER__ == __LITTLE_ENDIAN__
dst->append(static_const<const char*>(&value), sizeof(value));
#else
char buf[sizeof(value)];
EncodeFixed64(buf, value);
dst->append(buf, sizeof(buf));
#endif
}
inline void PutVarint32(std::string* dst, uint32_t v) {
@ -159,6 +178,22 @@ inline void PutVarint32(std::string* dst, uint32_t v) {
dst->append(buf, static_cast<size_t>(ptr - buf));
}
inline void PutVarint32Varint32(std::string* dst, uint32_t v1, uint32_t v2) {
char buf[10];
char* ptr = EncodeVarint32(buf, v1);
ptr = EncodeVarint32(ptr, v2);
dst->append(buf, static_cast<size_t>(ptr - buf));
}
inline void PutVarint32Varint32Varint32(std::string* dst, uint32_t v1,
uint32_t v2, uint32_t v3) {
char buf[15];
char* ptr = EncodeVarint32(buf, v1);
ptr = EncodeVarint32(ptr, v2);
ptr = EncodeVarint32(ptr, v3);
dst->append(buf, static_cast<size_t>(ptr - buf));
}
inline char* EncodeVarint64(char* dst, uint64_t v) {
static const unsigned int B = 128;
unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
@ -176,6 +211,29 @@ inline void PutVarint64(std::string* dst, uint64_t v) {
dst->append(buf, static_cast<size_t>(ptr - buf));
}
inline void PutVarint64Varint64(std::string* dst, uint64_t v1, uint64_t v2) {
char buf[20];
char* ptr = EncodeVarint64(buf, v1);
ptr = EncodeVarint64(ptr, v2);
dst->append(buf, static_cast<size_t>(ptr - buf));
}
inline void PutVarint32Varint64(std::string* dst, uint32_t v1, uint64_t v2) {
char buf[15];
char* ptr = EncodeVarint32(buf, v1);
ptr = EncodeVarint64(ptr, v2);
dst->append(buf, static_cast<size_t>(ptr - buf));
}
inline void PutVarint32Varint32Varint64(std::string* dst, uint32_t v1,
uint32_t v2, uint64_t v3) {
char buf[20];
char* ptr = EncodeVarint32(buf, v1);
ptr = EncodeVarint32(ptr, v2);
ptr = EncodeVarint64(ptr, v3);
dst->append(buf, static_cast<size_t>(ptr - buf));
}
inline void PutLengthPrefixedSlice(std::string* dst, const Slice& value) {
PutVarint32(dst, static_cast<uint32_t>(value.size()));
dst->append(value.data(), value.size());

Loading…
Cancel
Save