Introduce ReadOptions::pin_data (support zero copy for keys)

Summary:
This patch updates the Iterator API to introduce new functions that allow users to keep the Slices returned by key() valid as long as the Iterator is not deleted:

ReadOptions::pin_data: If true, keep the loaded blocks in memory as long as the iterator is not deleted.
Iterator::IsKeyPinned(): If true, the Slice returned by key() is valid as long as the iterator is not deleted.

Also adds a new option, BlockBasedTableOptions::use_delta_encoding, that allows users to disable delta encoding if needed.
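
To make the intended usage concrete, here is a minimal sketch (the database path, the sample key/value, and the abbreviated error handling are illustrative, not part of this patch):

```
// Minimal sketch: read keys with zero copies.
#include <cassert>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;

  // Disable delta encoding so that IsKeyPinned() is guaranteed to return
  // true for keys loaded from SST files.
  rocksdb::BlockBasedTableOptions table_options;
  table_options.use_delta_encoding = false;
  options.table_factory.reset(
      rocksdb::NewBlockBasedTableFactory(table_options));

  rocksdb::DB* db;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/pin_data_example", &db);
  assert(s.ok());
  db->Put(rocksdb::WriteOptions(), "key1", "value1");

  // Pin loaded blocks for the lifetime of the iterator.
  rocksdb::ReadOptions ro;
  ro.pin_data = true;
  rocksdb::Iterator* it = db->NewIterator(ro);

  // Collect key Slices without copying; they stay valid until `it` is deleted.
  std::vector<rocksdb::Slice> keys;
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    assert(it->IsKeyPinned());
    keys.push_back(it->key());
  }
  // ... use `keys` here ...
  delete it;  // the collected Slices become invalid after this point
  delete db;
  return 0;
}
```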

Benchmark results (using https://phabricator.fb.com/P20083553)

```
// $ du -h /home/tec/local/normal.4K.Snappy/db10077
// 6.1G    /home/tec/local/normal.4K.Snappy/db10077

// $ du -h /home/tec/local/zero.8K.LZ4/db10077
// 6.4G    /home/tec/local/zero.8K.LZ4/db10077

// Benchmarks for shard db10077
// _build/opt/rocks/benchmark/rocks_copy_benchmark \
//      --normal_db_path="/home/tec/local/normal.4K.Snappy/db10077" \
//      --zero_db_path="/home/tec/local/zero.8K.LZ4/db10077"

// First run
// ============================================================================
// rocks/benchmark/RocksCopyBenchmark.cpp          relative  time/iter  iters/s
// ============================================================================
// BM_StringCopy                                                 1.73s  576.97m
// BM_StringPiece                                   103.74%      1.67s  598.55m
// ============================================================================
// Match rate : 1000000 / 1000000

// Second run
// ============================================================================
// rocks/benchmark/RocksCopyBenchmark.cpp          relative  time/iter  iters/s
// ============================================================================
// BM_StringCopy                                              611.99ms     1.63
// BM_StringPiece                                   203.76%   300.35ms     3.33
// ============================================================================
// Match rate : 1000000 / 1000000
```

Test Plan: Unit tests

Reviewers: sdong, igor, anthony, yhchiang, rven

Reviewed By: rven

Subscribers: dhruba, lovro, adsharma

Differential Revision: https://reviews.facebook.net/D48999
Branch: main
Author: Islam AbdelRahman
Parent: e6e505a4d9
Commit: aececc209e
21 changed files (lines changed):

db/db_impl.cc | 18
db/db_iter.cc | 73
db/db_iter.h | 8
db/db_test.cc | 316
db/dbformat.h | 64
db/listener_test.cc | 2
db/memtable.cc | 15
include/rocksdb/iterator.h | 9
include/rocksdb/options.h | 7
include/rocksdb/table.h | 6
table/block.cc | 9
table/block.h | 12
table/block_based_table_builder.cc | 3
table/block_builder.cc | 15
table/block_builder.h | 4
table/internal_iterator.h | 18
table/iterator_wrapper.h | 90
table/merger.cc | 50
table/table_test.cc | 1
table/two_level_iterator.cc | 7
util/options.cc | 6

@@ -3828,7 +3828,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
env_, *cfd->ioptions(), cfd->user_comparator(), iter,
kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
read_options.iterate_upper_bound, read_options.prefix_same_as_start);
read_options.iterate_upper_bound, read_options.prefix_same_as_start,
read_options.pin_data);
#endif
} else {
SequenceNumber latest_snapshot = versions_->LastSequence();
@@ -3885,7 +3886,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
env_, *cfd->ioptions(), cfd->user_comparator(), snapshot,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
read_options.iterate_upper_bound, read_options.prefix_same_as_start);
read_options.iterate_upper_bound, read_options.prefix_same_as_start,
read_options.pin_data);
InternalIterator* internal_iter =
NewInternalIterator(read_options, cfd, sv, db_iter->GetArena());
@@ -3931,10 +3933,11 @@ Status DBImpl::NewIterators(
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
auto iter = new ForwardIterator(this, read_options, cfd, sv);
iterators->push_back(
NewDBIterator(env_, *cfd->ioptions(), cfd->user_comparator(), iter,
kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations));
iterators->push_back(NewDBIterator(
env_, *cfd->ioptions(), cfd->user_comparator(), iter,
kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations, nullptr,
false, read_options.pin_data));
}
#endif
} else {
@@ -3953,7 +3956,8 @@ Status DBImpl::NewIterators(
ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
env_, *cfd->ioptions(), cfd->user_comparator(), snapshot,
sv->mutable_cf_options.max_sequential_skip_in_iterations);
sv->mutable_cf_options.max_sequential_skip_in_iterations, nullptr,
false, read_options.pin_data);
InternalIterator* internal_iter =
NewInternalIterator(read_options, cfd, sv, db_iter->GetArena());
db_iter->SetIterUnderDBIter(internal_iter);

@@ -76,7 +76,8 @@ class DBIter: public Iterator {
current_entry_is_merged_(false),
statistics_(ioptions.statistics),
iterate_upper_bound_(iterate_upper_bound),
prefix_same_as_start_(prefix_same_as_start) {
prefix_same_as_start_(prefix_same_as_start),
iter_pinned_(false) {
RecordTick(statistics_, NO_ITERATORS);
prefix_extractor_ = ioptions.prefix_extractor;
max_skip_ = max_sequential_skip_in_iterations;
@@ -92,6 +93,9 @@ class DBIter: public Iterator {
virtual void SetIter(InternalIterator* iter) {
assert(iter_ == nullptr);
iter_ = iter;
if (iter_ && iter_pinned_) {
iter_->PinData();
}
}
virtual bool Valid() const override { return valid_; }
virtual Slice key() const override {
@@ -110,6 +114,32 @@ class DBIter: public Iterator {
return status_;
}
}
virtual Status PinData() {
Status s;
if (iter_) {
s = iter_->PinData();
}
if (s.ok()) {
// Even if iter_ is nullptr, we set iter_pinned_ to true so that when
// iter_ is updated using SetIter, we Pin it.
iter_pinned_ = true;
}
return s;
}
virtual Status ReleasePinnedData() {
Status s;
if (iter_) {
s = iter_->ReleasePinnedData();
}
if (s.ok()) {
iter_pinned_ = false;
}
return s;
}
virtual bool IsKeyPinned() const override {
assert(valid_);
return iter_pinned_ && saved_key_.IsKeyPinned();
}
virtual void Next() override;
virtual void Prev() override;
@@ -159,6 +189,7 @@ class DBIter: public Iterator {
const Slice* iterate_upper_bound_;
IterKey prefix_start_;
bool prefix_same_as_start_;
bool iter_pinned_;
// No copying allowed
DBIter(const DBIter&);
@@ -257,18 +288,21 @@ void DBIter::FindNextUserEntryInternal(bool skipping) {
case kTypeSingleDeletion:
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
saved_key_.SetKey(ikey.user_key);
saved_key_.SetKey(ikey.user_key,
!iter_->IsKeyPinned() /* copy */);
skipping = true;
num_skipped = 0;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
break;
case kTypeValue:
valid_ = true;
saved_key_.SetKey(ikey.user_key);
saved_key_.SetKey(ikey.user_key,
!iter_->IsKeyPinned() /* copy */);
return;
case kTypeMerge:
// By now, we are sure the current ikey is going to yield a value
saved_key_.SetKey(ikey.user_key);
saved_key_.SetKey(ikey.user_key,
!iter_->IsKeyPinned() /* copy */);
current_entry_is_merged_ = true;
valid_ = true;
MergeValuesNewToOld(); // Go to a different state machine
@@ -428,7 +462,8 @@ void DBIter::PrevInternal() {
ParsedInternalKey ikey;
while (iter_->Valid()) {
saved_key_.SetKey(ExtractUserKey(iter_->key()));
saved_key_.SetKey(ExtractUserKey(iter_->key()),
!iter_->IsKeyPinned() /* copy */);
if (FindValueForCurrentKey()) {
valid_ = true;
if (!iter_->Valid()) {
@@ -744,7 +779,7 @@ void DBIter::SeekToLast() {
// it will seek to the last key before the
// ReadOptions.iterate_upper_bound
if (iter_->Valid() && iterate_upper_bound_ != nullptr) {
saved_key_.SetKey(*iterate_upper_bound_);
saved_key_.SetKey(*iterate_upper_bound_, false /* copy */);
std::string last_key;
AppendInternalKey(&last_key,
ParsedInternalKey(saved_key_.GetKey(), kMaxSequenceNumber,
@@ -781,10 +816,15 @@ Iterator* NewDBIterator(Env* env, const ImmutableCFOptions& ioptions,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations,
const Slice* iterate_upper_bound,
bool prefix_same_as_start) {
return new DBIter(env, ioptions, user_key_comparator, internal_iter, sequence,
false, max_sequential_skip_in_iterations,
iterate_upper_bound, prefix_same_as_start);
bool prefix_same_as_start, bool pin_data) {
DBIter* db_iter =
new DBIter(env, ioptions, user_key_comparator, internal_iter, sequence,
false, max_sequential_skip_in_iterations, iterate_upper_bound,
prefix_same_as_start);
if (pin_data) {
db_iter->PinData();
}
return db_iter;
}
ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); }
@@ -806,6 +846,13 @@ inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); }
inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); }
inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); }
inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); }
inline Status ArenaWrappedDBIter::PinData() { return db_iter_->PinData(); }
inline Status ArenaWrappedDBIter::ReleasePinnedData() {
return db_iter_->ReleasePinnedData();
}
inline bool ArenaWrappedDBIter::IsKeyPinned() const {
return db_iter_->IsKeyPinned();
}
void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1,
void* arg2) {
db_iter_->RegisterCleanup(function, arg1, arg2);
@@ -815,7 +862,8 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ImmutableCFOptions& ioptions,
const Comparator* user_key_comparator, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations,
const Slice* iterate_upper_bound, bool prefix_same_as_start) {
const Slice* iterate_upper_bound, bool prefix_same_as_start,
bool pin_data) {
ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
Arena* arena = iter->GetArena();
auto mem = arena->AllocateAligned(sizeof(DBIter));
@@ -825,6 +873,9 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator(
iterate_upper_bound, prefix_same_as_start);
iter->SetDBIter(db_iter);
if (pin_data) {
iter->PinData();
}
return iter;
}

@@ -30,7 +30,8 @@ extern Iterator* NewDBIterator(Env* env, const ImmutableCFOptions& options,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations,
const Slice* iterate_upper_bound = nullptr,
bool prefix_same_as_start = false);
bool prefix_same_as_start = false,
bool pin_data = false);
// A wrapper iterator which wraps DB Iterator and the arena, with which the DB
// iterator is supposed to be allocated. This class is used as an entry point of
@@ -63,6 +64,9 @@ class ArenaWrappedDBIter : public Iterator {
virtual Status status() const override;
void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2);
virtual Status PinData();
virtual Status ReleasePinnedData();
virtual bool IsKeyPinned() const override;
private:
DBIter* db_iter_;
@@ -75,6 +79,6 @@ extern ArenaWrappedDBIter* NewArenaWrappedDbIterator(
const Comparator* user_key_comparator, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations,
const Slice* iterate_upper_bound = nullptr,
bool prefix_same_as_start = false);
bool prefix_same_as_start = false, bool pin_data = false);
} // namespace rocksdb

@@ -118,7 +118,6 @@ class DBTestWithParam
uint32_t max_subcompactions_;
bool exclusive_manual_compaction_;
};
#ifndef ROCKSDB_LITE
TEST_F(DBTest, Empty) {
do {
@@ -10143,6 +10142,321 @@ TEST_F(DBTest, SSTsWithLdbSuffixHandling) {
Destroy(options);
}
TEST_F(DBTest, PinnedDataIteratorRandomized) {
enum TestConfig {
NORMAL,
CLOSE_AND_OPEN,
COMPACT_BEFORE_READ,
FLUSH_EVERY_1000,
MAX
};
// Generate Random data
Random rnd(301);
int puts = 100000;
int key_pool = puts * 0.7;
int key_size = 100;
int val_size = 1000;
int seeks_percentage = 20; // 20% of keys will be used to test seek()
int delete_percentage = 20; // 20% of keys will be deleted
int merge_percentage = 20; // 20% of keys will be added using Merge()
for (int run_config = 0; run_config < TestConfig::MAX; run_config++) {
Options options = CurrentOptions();
BlockBasedTableOptions table_options;
table_options.use_delta_encoding = false;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.merge_operator = MergeOperators::CreatePutOperator();
DestroyAndReopen(options);
std::vector<std::string> generated_keys(key_pool);
for (int i = 0; i < key_pool; i++) {
generated_keys[i] = RandomString(&rnd, key_size);
}
std::map<std::string, std::string> true_data;
std::vector<std::string> random_keys;
std::vector<std::string> deleted_keys;
for (int i = 0; i < puts; i++) {
auto& k = generated_keys[rnd.Next() % key_pool];
auto v = RandomString(&rnd, val_size);
// Insert data to true_data map and to DB
true_data[k] = v;
if (rnd.OneIn(100.0 / merge_percentage)) {
ASSERT_OK(db_->Merge(WriteOptions(), k, v));
} else {
ASSERT_OK(Put(k, v));
}
// Pick random keys to be used to test Seek()
if (rnd.OneIn(100.0 / seeks_percentage)) {
random_keys.push_back(k);
}
// Delete some random keys
if (rnd.OneIn(100.0 / delete_percentage)) {
deleted_keys.push_back(k);
true_data.erase(k);
ASSERT_OK(Delete(k));
}
if (run_config == TestConfig::FLUSH_EVERY_1000) {
if (i && i % 1000 == 0) {
Flush();
}
}
}
if (run_config == TestConfig::CLOSE_AND_OPEN) {
Close();
Reopen(options);
} else if (run_config == TestConfig::COMPACT_BEFORE_READ) {
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
}
ReadOptions ro;
ro.pin_data = true;
auto iter = db_->NewIterator(ro);
{
// Test Seek to random keys
printf("Testing seek on %zu keys\n", random_keys.size());
std::vector<Slice> keys_slices;
std::vector<std::string> true_keys;
for (auto& k : random_keys) {
iter->Seek(k);
if (!iter->Valid()) {
ASSERT_EQ(true_data.lower_bound(k), true_data.end());
continue;
}
ASSERT_TRUE(iter->IsKeyPinned());
keys_slices.push_back(iter->key());
true_keys.push_back(true_data.lower_bound(k)->first);
}
for (size_t i = 0; i < keys_slices.size(); i++) {
ASSERT_EQ(keys_slices[i].ToString(), true_keys[i]);
}
}
{
// Test iterating all data forward
printf("Testing iterating forward on all keys\n");
std::vector<Slice> all_keys;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_TRUE(iter->IsKeyPinned());
all_keys.push_back(iter->key());
}
ASSERT_EQ(all_keys.size(), true_data.size());
// Verify that all keys slices are valid
auto data_iter = true_data.begin();
for (size_t i = 0; i < all_keys.size(); i++) {
ASSERT_EQ(all_keys[i].ToString(), data_iter->first);
data_iter++;
}
}
{
// Test iterating all data backward
printf("Testing iterating backward on all keys\n");
std::vector<Slice> all_keys;
for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
ASSERT_TRUE(iter->IsKeyPinned());
all_keys.push_back(iter->key());
}
ASSERT_EQ(all_keys.size(), true_data.size());
// Verify that all keys slices are valid (backward)
auto data_iter = true_data.rbegin();
for (size_t i = 0; i < all_keys.size(); i++) {
ASSERT_EQ(all_keys[i].ToString(), data_iter->first);
data_iter++;
}
}
delete iter;
}
}
#ifndef ROCKSDB_LITE
TEST_F(DBTest, PinnedDataIteratorMultipleFiles) {
Options options = CurrentOptions();
BlockBasedTableOptions table_options;
table_options.use_delta_encoding = false;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.disable_auto_compactions = true;
options.write_buffer_size = 1024 * 1024 * 10; // 10 Mb
DestroyAndReopen(options);
std::map<std::string, std::string> true_data;
// Generate 4 sst files in L2
Random rnd(301);
for (int i = 1; i <= 1000; i++) {
std::string k = Key(i * 3);
std::string v = RandomString(&rnd, 100);
ASSERT_OK(Put(k, v));
true_data[k] = v;
if (i % 250 == 0) {
ASSERT_OK(Flush());
}
}
ASSERT_EQ(FilesPerLevel(0), "4");
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ(FilesPerLevel(0), "0,4");
// Generate 4 sst files in L0
for (int i = 1; i <= 1000; i++) {
std::string k = Key(i * 2);
std::string v = RandomString(&rnd, 100);
ASSERT_OK(Put(k, v));
true_data[k] = v;
if (i % 250 == 0) {
ASSERT_OK(Flush());
}
}
ASSERT_EQ(FilesPerLevel(0), "4,4");
// Add some keys/values in memtables
for (int i = 1; i <= 1000; i++) {
std::string k = Key(i);
std::string v = RandomString(&rnd, 100);
ASSERT_OK(Put(k, v));
true_data[k] = v;
}
ASSERT_EQ(FilesPerLevel(0), "4,4");
ReadOptions ro;
ro.pin_data = true;
auto iter = db_->NewIterator(ro);
std::vector<std::pair<Slice, std::string>> results;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_TRUE(iter->IsKeyPinned());
results.emplace_back(iter->key(), iter->value().ToString());
}
ASSERT_EQ(results.size(), true_data.size());
auto data_iter = true_data.begin();
for (size_t i = 0; i < results.size(); i++, data_iter++) {
auto& kv = results[i];
ASSERT_EQ(kv.first, data_iter->first);
ASSERT_EQ(kv.second, data_iter->second);
}
delete iter;
}
#endif
TEST_F(DBTest, PinnedDataIteratorMergeOperator) {
Options options = CurrentOptions();
BlockBasedTableOptions table_options;
table_options.use_delta_encoding = false;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.merge_operator = MergeOperators::CreateUInt64AddOperator();
DestroyAndReopen(options);
std::string numbers[7];
for (int val = 0; val <= 6; val++) {
PutFixed64(numbers + val, val);
}
// +1 all keys in range [ 0 => 999]
for (int i = 0; i < 1000; i++) {
WriteOptions wo;
ASSERT_OK(db_->Merge(wo, Key(i), numbers[1]));
}
// +2 all keys divisible by 2 in range [ 0 => 999]
for (int i = 0; i < 1000; i += 2) {
WriteOptions wo;
ASSERT_OK(db_->Merge(wo, Key(i), numbers[2]));
}
// +3 all keys divisible by 5 in range [ 0 => 999]
for (int i = 0; i < 1000; i += 5) {
WriteOptions wo;
ASSERT_OK(db_->Merge(wo, Key(i), numbers[3]));
}
ReadOptions ro;
ro.pin_data = true;
auto iter = db_->NewIterator(ro);
std::vector<std::pair<Slice, std::string>> results;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_TRUE(iter->IsKeyPinned());
results.emplace_back(iter->key(), iter->value().ToString());
}
ASSERT_EQ(results.size(), 1000);
for (size_t i = 0; i < results.size(); i++) {
auto& kv = results[i];
ASSERT_EQ(kv.first, Key(static_cast<int>(i)));
int expected_val = 1;
if (i % 2 == 0) {
expected_val += 2;
}
if (i % 5 == 0) {
expected_val += 3;
}
ASSERT_EQ(kv.second, numbers[expected_val]);
}
delete iter;
}
TEST_F(DBTest, PinnedDataIteratorReadAfterUpdate) {
Options options = CurrentOptions();
BlockBasedTableOptions table_options;
table_options.use_delta_encoding = false;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.write_buffer_size = 100000;
DestroyAndReopen(options);
Random rnd(301);
std::map<std::string, std::string> true_data;
for (int i = 0; i < 1000; i++) {
std::string k = RandomString(&rnd, 10);
std::string v = RandomString(&rnd, 1000);
ASSERT_OK(Put(k, v));
true_data[k] = v;
}
ReadOptions ro;
ro.pin_data = true;
auto iter = db_->NewIterator(ro);
// Delete 50% of the keys and update the other 50%
for (auto& kv : true_data) {
if (rnd.OneIn(2)) {
ASSERT_OK(Delete(kv.first));
} else {
std::string new_val = RandomString(&rnd, 1000);
ASSERT_OK(Put(kv.first, new_val));
}
}
std::vector<std::pair<Slice, std::string>> results;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_TRUE(iter->IsKeyPinned());
results.emplace_back(iter->key(), iter->value().ToString());
}
auto data_iter = true_data.begin();
for (size_t i = 0; i < results.size(); i++, data_iter++) {
auto& kv = results[i];
ASSERT_EQ(kv.first, data_iter->first);
ASSERT_EQ(kv.second, data_iter->second);
}
delete iter;
}
INSTANTIATE_TEST_CASE_P(DBTestWithParam, DBTestWithParam,
::testing::Combine(::testing::Values(1, 4),
::testing::Bool()));

@@ -271,7 +271,8 @@ inline LookupKey::~LookupKey() {
class IterKey {
public:
IterKey() : key_(space_), buf_size_(sizeof(space_)), key_size_(0) {}
IterKey()
: buf_(space_), buf_size_(sizeof(space_)), key_(buf_), key_size_(0) {}
~IterKey() { ResetBuffer(); }
@@ -293,31 +294,41 @@ class IterKey {
void TrimAppend(const size_t shared_len, const char* non_shared_data,
const size_t non_shared_len) {
assert(shared_len <= key_size_);
size_t total_size = shared_len + non_shared_len;
if (total_size <= buf_size_) {
key_size_ = total_size;
} else {
if (IsKeyPinned() /* key is not in buf_ */) {
// Copy the key from external memory to buf_ (copy shared_len bytes)
EnlargeBufferIfNeeded(total_size);
memcpy(buf_, key_, shared_len);
} else if (total_size > buf_size_) {
// Need to allocate space, delete previous space
char* p = new char[total_size];
memcpy(p, key_, shared_len);
if (key_ != space_) {
delete[] key_;
if (buf_ != space_) {
delete[] buf_;
}
key_ = p;
key_size_ = total_size;
buf_ = p;
buf_size_ = total_size;
}
memcpy(key_ + shared_len, non_shared_data, non_shared_len);
memcpy(buf_ + shared_len, non_shared_data, non_shared_len);
key_ = buf_;
key_size_ = total_size;
}
Slice SetKey(const Slice& key) {
Slice SetKey(const Slice& key, bool copy = true) {
size_t size = key.size();
EnlargeBufferIfNeeded(size);
memcpy(key_, key.data(), size);
if (copy) {
// Copy key to buf_
EnlargeBufferIfNeeded(size);
memcpy(buf_, key.data(), size);
key_ = buf_;
} else {
// Update key_ to point to external memory
key_ = key.data();
}
key_size_ = size;
return Slice(key_, key_size_);
}
@@ -335,11 +346,14 @@ class IterKey {
// Update the sequence number in the internal key. Guarantees not to
// invalidate slices to the key (and the user key).
void UpdateInternalKey(uint64_t seq, ValueType t) {
assert(!IsKeyPinned());
assert(key_size_ >= 8);
uint64_t newval = (seq << 8) | t;
EncodeFixed64(&key_[key_size_ - 8], newval);
EncodeFixed64(&buf_[key_size_ - 8], newval);
}
bool IsKeyPinned() const { return (key_ != buf_); }
void SetInternalKey(const Slice& key_prefix, const Slice& user_key,
SequenceNumber s,
ValueType value_type = kValueTypeForSeek) {
@@ -347,10 +361,12 @@ class IterKey {
size_t usize = user_key.size();
EnlargeBufferIfNeeded(psize + usize + sizeof(uint64_t));
if (psize > 0) {
memcpy(key_, key_prefix.data(), psize);
memcpy(buf_, key_prefix.data(), psize);
}
memcpy(key_ + psize, user_key.data(), usize);
EncodeFixed64(key_ + usize + psize, PackSequenceAndType(s, value_type));
memcpy(buf_ + psize, user_key.data(), usize);
EncodeFixed64(buf_ + usize + psize, PackSequenceAndType(s, value_type));
key_ = buf_;
key_size_ = psize + usize + sizeof(uint64_t);
}
@@ -377,20 +393,22 @@ class IterKey {
void EncodeLengthPrefixedKey(const Slice& key) {
auto size = key.size();
EnlargeBufferIfNeeded(size + static_cast<size_t>(VarintLength(size)));
char* ptr = EncodeVarint32(key_, static_cast<uint32_t>(size));
char* ptr = EncodeVarint32(buf_, static_cast<uint32_t>(size));
memcpy(ptr, key.data(), size);
key_ = buf_;
}
private:
char* key_;
char* buf_;
size_t buf_size_;
const char* key_;
size_t key_size_;
char space_[32]; // Avoid allocation for short keys
void ResetBuffer() {
if (key_ != space_) {
delete[] key_;
key_ = space_;
if (buf_ != space_) {
delete[] buf_;
buf_ = space_;
}
buf_size_ = sizeof(space_);
key_size_ = 0;
@@ -407,7 +425,7 @@ class IterKey {
if (key_size > buf_size_) {
// Need to enlarge the buffer.
ResetBuffer();
key_ = new char[key_size];
buf_ = new char[key_size];
buf_size_ = key_size;
}
}

@@ -40,7 +40,7 @@ namespace rocksdb {
class EventListenerTest : public DBTestBase {
public:
EventListenerTest() : DBTestBase("listener_test") {}
EventListenerTest() : DBTestBase("/listener_test") {}
const size_t k110KB = 110 << 10;
};

@@ -274,6 +274,21 @@ class MemTableIterator : public InternalIterator {
virtual Status status() const override { return Status::OK(); }
virtual Status PinData() override {
// memtable data is always pinned
return Status::OK();
}
virtual Status ReleasePinnedData() override {
// memtable data is always pinned
return Status::OK();
}
virtual bool IsKeyPinned() const override {
// memtable data is always pinned
return true;
}
private:
DynamicBloom* bloom_;
const SliceTransform* const prefix_extractor_;

@@ -95,6 +95,15 @@ class Iterator : public Cleanable {
// satisfied without doing some IO, then this returns Status::Incomplete().
virtual Status status() const = 0;
// If true, this means that the Slice returned by key() is valid as long
// as the iterator is not deleted and ReleasePinnedData() is not called.
//
// IsKeyPinned() is guaranteed to always return true if
// - Iterator created with ReadOptions::pin_data = true
// - DB tables were created with BlockBasedTableOptions::use_delta_encoding
// set to false.
virtual bool IsKeyPinned() const { return false; }
private:
// No copying allowed
Iterator(const Iterator&);

@@ -1325,6 +1325,13 @@ struct ReadOptions {
// Default: false
bool prefix_same_as_start;
// Keep the blocks loaded by the iterator pinned in memory as long as the
// iterator is not deleted. If used when reading from tables created with
// BlockBasedTableOptions::use_delta_encoding = false, Iterator::IsKeyPinned()
// is guaranteed to return true.
// Default: false
bool pin_data;
ReadOptions();
ReadOptions(bool cksum, bool cache);
};

@@ -119,6 +119,12 @@ struct BlockBasedTableOptions {
// leave this parameter alone.
int block_restart_interval = 16;
// Use delta encoding to compress keys in blocks.
// Iterator::PinData() requires this option to be disabled.
//
// Default: true
bool use_delta_encoding = true;
// If non-nullptr, use the specified filter policy to reduce disk reads.
// Many applications will benefit from passing the result of
// NewBloomFilterPolicy() here.

@@ -153,7 +153,14 @@ bool BlockIter::ParseNextKey() {
CorruptionError();
return false;
} else {
key_.TrimAppend(shared, p, non_shared);
if (shared == 0) {
// If this key doesn't share any bytes with the prev key, then we don't
// need to decode it and can use its address in the block directly.
key_.SetKey(Slice(p, non_shared), false /* copy */);
} else {
// This key shares `shared` bytes with the prev key, so we need to decode it
key_.TrimAppend(shared, p, non_shared);
}
value_ = Slice(p + non_shared, value_length);
while (restart_index_ + 1 < num_restarts_ &&
GetRestartPoint(restart_index_ + 1) < current_) {

@@ -151,6 +151,18 @@ class BlockIter : public InternalIterator {
virtual void SeekToLast() override;
virtual Status PinData() override {
// block data is always pinned.
return Status::OK();
}
virtual Status ReleasePinnedData() override {
// block data is always pinned.
return Status::OK();
}
virtual bool IsKeyPinned() const override { return key_.IsKeyPinned(); }
private:
const Comparator* comparator_;
const char* data_; // underlying block contents

@@ -481,7 +481,8 @@ struct BlockBasedTableBuilder::Rep {
table_options(table_opt),
internal_comparator(icomparator),
file(f),
data_block(table_options.block_restart_interval),
data_block(table_options.block_restart_interval,
table_options.use_delta_encoding),
internal_prefix_transform(_ioptions.prefix_extractor),
index_builder(CreateIndexBuilder(table_options.index_type,
&internal_comparator,

@@ -41,8 +41,9 @@
namespace rocksdb {
BlockBuilder::BlockBuilder(int block_restart_interval)
BlockBuilder::BlockBuilder(int block_restart_interval, bool use_delta_encoding)
: block_restart_interval_(block_restart_interval),
use_delta_encoding_(use_delta_encoding),
restarts_(),
counter_(0),
finished_(false) {
@@ -94,17 +95,17 @@ void BlockBuilder::Add(const Slice& key, const Slice& value) {
Slice last_key_piece(last_key_);
assert(!finished_);
assert(counter_ <= block_restart_interval_);
size_t shared = 0;
if (counter_ < block_restart_interval_) {
size_t shared = 0; // number of bytes shared with prev key
if (counter_ >= block_restart_interval_) {
// Restart compression
restarts_.push_back(static_cast<uint32_t>(buffer_.size()));
counter_ = 0;
} else if (use_delta_encoding_) {
// See how much sharing to do with previous string
const size_t min_length = std::min(last_key_piece.size(), key.size());
while ((shared < min_length) && (last_key_piece[shared] == key[shared])) {
shared++;
}
} else {
// Restart compression
restarts_.push_back(static_cast<uint32_t>(buffer_.size()));
counter_ = 0;
}
const size_t non_shared = key.size() - shared;

@@ -20,7 +20,8 @@ class BlockBuilder {
BlockBuilder(const BlockBuilder&) = delete;
void operator=(const BlockBuilder&) = delete;
explicit BlockBuilder(int block_restart_interval);
explicit BlockBuilder(int block_restart_interval,
bool use_delta_encoding = true);
// Reset the contents as if the BlockBuilder was just constructed.
void Reset();
@@ -48,6 +49,7 @@
private:
const int block_restart_interval_;
const bool use_delta_encoding_;
std::string buffer_; // Destination buffer
std::vector<uint32_t> restarts_; // Restart points

@@ -60,6 +60,24 @@ class InternalIterator : public Cleanable {
// satisfied without doing some IO, then this returns Status::Incomplete().
virtual Status status() const = 0;
// Make sure that all current and future data blocks used by this iterator
// will be pinned in memory and will not be released except when
// ReleasePinnedData() is called or the iterator is deleted.
virtual Status PinData() { return Status::NotSupported(""); }
// Release all blocks that were pinned because of PinData() and no future
// blocks will be pinned.
virtual Status ReleasePinnedData() { return Status::NotSupported(""); }
// If true, this means that the Slice returned by key() is valid as long
// as the iterator is not deleted and ReleasePinnedData() is not called.
//
// IsKeyPinned() is guaranteed to always return true if
// - PinData() is called
// - DB tables were created with BlockBasedTableOptions::use_delta_encoding
// set to false.
virtual bool IsKeyPinned() const { return false; }
private:
// No copying allowed
InternalIterator(const InternalIterator&) = delete;

@@ -9,6 +9,8 @@
#pragma once
#include <set>
#include "table/internal_iterator.h"
namespace rocksdb {
@@ -19,31 +21,95 @@ namespace rocksdb {
// cache locality.
class IteratorWrapper {
public:
IteratorWrapper(): iter_(nullptr), valid_(false) { }
explicit IteratorWrapper(InternalIterator* _iter) : iter_(nullptr) {
IteratorWrapper() : iter_(nullptr), iters_pinned_(false), valid_(false) {}
explicit IteratorWrapper(InternalIterator* _iter)
: iter_(nullptr), iters_pinned_(false) {
Set(_iter);
}
~IteratorWrapper() {}
InternalIterator* iter() const { return iter_; }
// Takes ownership of "iter" and will delete it when destroyed, or
// when Set() is invoked again.
// Takes ownership of "_iter" and will delete it when destroyed.
// The next call to Set() will destroy "_iter" unless PinData() was called.
void Set(InternalIterator* _iter) {
delete iter_;
if (iters_pinned_ && iter_) {
// keep old iterator until ReleasePinnedData() is called
pinned_iters_.insert(iter_);
} else {
delete iter_;
}
iter_ = _iter;
if (iter_ == nullptr) {
valid_ = false;
} else {
Update();
if (iters_pinned_) {
// Pin new iterator
Status s = iter_->PinData();
assert(s.ok());
}
}
}
Status PinData() {
Status s;
if (iters_pinned_) {
return s;
}
if (iter_) {
s = iter_->PinData();
}
if (s.ok()) {
iters_pinned_ = true;
}
return s;
}
Status ReleasePinnedData() {
Status s;
if (!iters_pinned_) {
return s;
}
if (iter_) {
s = iter_->ReleasePinnedData();
}
if (s.ok()) {
iters_pinned_ = false;
// No need to call ReleasePinnedData() for pinned_iters_
// since we will delete them
DeletePinnedIterators(false);
}
return s;
}
bool IsKeyPinned() const {
assert(iter_);
return iters_pinned_ && iter_->IsKeyPinned();
}
void DeletePinnedIterators(bool is_arena_mode) {
for (auto it : pinned_iters_) {
if (!is_arena_mode) {
delete it;
} else {
it->~InternalIterator();
}
}
pinned_iters_.clear();
}
void DeleteIter(bool is_arena_mode) {
if (!is_arena_mode) {
delete iter_;
} else {
iter_->~InternalIterator();
if (iter_) {
pinned_iters_.insert(iter_);
}
DeletePinnedIterators(is_arena_mode);
}
// Iterator interface methods
@@ -67,6 +133,12 @@ class IteratorWrapper {
}
InternalIterator* iter_;
// If set to true, current and future iterators won't be deleted.
bool iters_pinned_;
// List of past iterators that are pinned and won't be deleted as long as
// iters_pinned_ is true. When we are pinning iterators this set will contain
// iterators of previous data blocks to keep them from being deleted.
std::set<InternalIterator*> pinned_iters_;
bool valid_;
Slice key_;
};

@@ -37,7 +37,8 @@ class MergingIterator : public InternalIterator {
public:
MergingIterator(const Comparator* comparator, InternalIterator** children,
int n, bool is_arena_mode)
: is_arena_mode_(is_arena_mode),
: data_pinned_(false),
is_arena_mode_(is_arena_mode),
comparator_(comparator),
current_(nullptr),
direction_(kForward),
@@ -57,6 +58,10 @@ class MergingIterator : public InternalIterator {
virtual void AddIterator(InternalIterator* iter) {
assert(direction_ == kForward);
children_.emplace_back(iter);
if (data_pinned_) {
Status s = iter->PinData();
assert(s.ok());
}
auto new_wrapper = children_.back();
if (new_wrapper.Valid()) {
minHeap_.push(&new_wrapper);
@@ -238,7 +243,50 @@ class MergingIterator : public InternalIterator {
return s;
}
virtual Status PinData() override {
Status s;
if (data_pinned_) {
return s;
}
for (size_t i = 0; i < children_.size(); i++) {
s = children_[i].PinData();
if (!s.ok()) {
// We failed to pin an iterator, clean up
for (size_t j = 0; j < i; j++) {
children_[j].ReleasePinnedData();
}
break;
}
}
data_pinned_ = s.ok();
return s;
}
virtual Status ReleasePinnedData() override {
Status s;
if (!data_pinned_) {
return s;
}
for (auto& child : children_) {
Status release_status = child.ReleasePinnedData();
if (s.ok() && !release_status.ok()) {
s = release_status;
}
}
data_pinned_ = false;
return s;
}
virtual bool IsKeyPinned() const override {
assert(Valid());
return current_->IsKeyPinned();
}
private:
bool data_pinned_;
// Clears heaps for both directions, used when changing direction or seeking
void ClearHeaps();
// Ensures that maxHeap_ is initialized when starting to go in the reverse

@@ -1807,6 +1807,7 @@ TEST_F(BlockBasedTableTest, BlockCacheLeak) {
iter->Next();
}
ASSERT_OK(iter->status());
iter.release();
const ImmutableCFOptions ioptions1(opt);
ASSERT_OK(c.Reopen(ioptions1));

@@ -61,6 +61,13 @@ class TwoLevelIterator : public InternalIterator {
return status_;
}
}
virtual Status PinData() override { return second_level_iter_.PinData(); }
virtual Status ReleasePinnedData() override {
return second_level_iter_.ReleasePinnedData();
}
virtual bool IsKeyPinned() const override {
return second_level_iter_.iter() ? second_level_iter_.IsKeyPinned() : false;
}
private:
void SaveError(const Status& s) {

@@ -720,7 +720,8 @@ ReadOptions::ReadOptions()
tailing(false),
managed(false),
total_order_seek(false),
prefix_same_as_start(false) {
prefix_same_as_start(false),
pin_data(false) {
XFUNC_TEST("", "managed_options", managed_options, xf_manage_options,
reinterpret_cast<ReadOptions*>(this));
}
@@ -734,7 +735,8 @@ ReadOptions::ReadOptions(bool cksum, bool cache)
tailing(false),
managed(false),
total_order_seek(false),
prefix_same_as_start(false) {
prefix_same_as_start(false),
pin_data(false) {
XFUNC_TEST("", "managed_options", managed_options, xf_manage_options,
reinterpret_cast<ReadOptions*>(this));
}
