diff --git a/db/db_impl.cc b/db/db_impl.cc
index 39ae88d7a..1b6bbfebb 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -3828,7 +3828,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
         env_, *cfd->ioptions(), cfd->user_comparator(), iter,
         kMaxSequenceNumber,
         sv->mutable_cf_options.max_sequential_skip_in_iterations,
-        read_options.iterate_upper_bound, read_options.prefix_same_as_start);
+        read_options.iterate_upper_bound, read_options.prefix_same_as_start,
+        read_options.pin_data);
 #endif
   } else {
     SequenceNumber latest_snapshot = versions_->LastSequence();
@@ -3885,7 +3886,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
     ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
         env_, *cfd->ioptions(), cfd->user_comparator(), snapshot,
         sv->mutable_cf_options.max_sequential_skip_in_iterations,
-        read_options.iterate_upper_bound, read_options.prefix_same_as_start);
+        read_options.iterate_upper_bound, read_options.prefix_same_as_start,
+        read_options.pin_data);
 
     InternalIterator* internal_iter =
         NewInternalIterator(read_options, cfd, sv, db_iter->GetArena());
@@ -3931,10 +3933,11 @@ Status DBImpl::NewIterators(
       auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
       SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
       auto iter = new ForwardIterator(this, read_options, cfd, sv);
-      iterators->push_back(
-          NewDBIterator(env_, *cfd->ioptions(), cfd->user_comparator(), iter,
-                        kMaxSequenceNumber,
-                        sv->mutable_cf_options.max_sequential_skip_in_iterations));
+      iterators->push_back(NewDBIterator(
+          env_, *cfd->ioptions(), cfd->user_comparator(), iter,
+          kMaxSequenceNumber,
+          sv->mutable_cf_options.max_sequential_skip_in_iterations, nullptr,
+          false, read_options.pin_data));
     }
 #endif
   } else {
@@ -3953,7 +3956,8 @@ Status DBImpl::NewIterators(
 
       ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
           env_, *cfd->ioptions(), cfd->user_comparator(), snapshot,
-          sv->mutable_cf_options.max_sequential_skip_in_iterations);
+          sv->mutable_cf_options.max_sequential_skip_in_iterations, nullptr,
+          false, read_options.pin_data);
       InternalIterator* internal_iter =
           NewInternalIterator(read_options, cfd, sv, db_iter->GetArena());
       db_iter->SetIterUnderDBIter(internal_iter);
diff --git a/db/db_iter.cc b/db/db_iter.cc
index 74558d56d..8f8e64df2 100644
--- a/db/db_iter.cc
+++ b/db/db_iter.cc
@@ -76,7 +76,8 @@ class DBIter: public Iterator {
         current_entry_is_merged_(false),
         statistics_(ioptions.statistics),
         iterate_upper_bound_(iterate_upper_bound),
-        prefix_same_as_start_(prefix_same_as_start) {
+        prefix_same_as_start_(prefix_same_as_start),
+        iter_pinned_(false) {
     RecordTick(statistics_, NO_ITERATORS);
     prefix_extractor_ = ioptions.prefix_extractor;
     max_skip_ = max_sequential_skip_in_iterations;
@@ -92,6 +93,9 @@ class DBIter: public Iterator {
   virtual void SetIter(InternalIterator* iter) {
     assert(iter_ == nullptr);
     iter_ = iter;
+    if (iter_ && iter_pinned_) {
+      iter_->PinData();
+    }
   }
   virtual bool Valid() const override { return valid_; }
   virtual Slice key() const override {
@@ -110,6 +114,32 @@ class DBIter: public Iterator {
       return status_;
     }
   }
+  virtual Status PinData() {
+    Status s;
+    if (iter_) {
+      s = iter_->PinData();
+    }
+    if (s.ok()) {
+      // Even if iter_ is nullptr, we set iter_pinned_ to true so that when
+      // iter_ is updated using SetIter, we Pin it.
+      iter_pinned_ = true;
+    }
+    return s;
+  }
+  virtual Status ReleasePinnedData() {
+    Status s;
+    if (iter_) {
+      s = iter_->ReleasePinnedData();
+    }
+    if (s.ok()) {
+      iter_pinned_ = false;
+    }
+    return s;
+  }
+  virtual bool IsKeyPinned() const override {
+    assert(valid_);
+    return iter_pinned_ && saved_key_.IsKeyPinned();
+  }
 
   virtual void Next() override;
   virtual void Prev() override;
@@ -159,6 +189,7 @@ class DBIter: public Iterator {
   const Slice* iterate_upper_bound_;
   IterKey prefix_start_;
   bool prefix_same_as_start_;
+  bool iter_pinned_;
 
   // No copying allowed
   DBIter(const DBIter&);
@@ -257,18 +288,21 @@ void DBIter::FindNextUserEntryInternal(bool skipping) {
       case kTypeSingleDeletion:
         // Arrange to skip all upcoming entries for this key since
        // they are hidden by this deletion.
-        saved_key_.SetKey(ikey.user_key);
+        saved_key_.SetKey(ikey.user_key,
+                          !iter_->IsKeyPinned() /* copy */);
         skipping = true;
         num_skipped = 0;
         PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
         break;
       case kTypeValue:
         valid_ = true;
-        saved_key_.SetKey(ikey.user_key);
+        saved_key_.SetKey(ikey.user_key,
+                          !iter_->IsKeyPinned() /* copy */);
         return;
       case kTypeMerge:
         // By now, we are sure the current ikey is going to yield a value
-        saved_key_.SetKey(ikey.user_key);
+        saved_key_.SetKey(ikey.user_key,
+                          !iter_->IsKeyPinned() /* copy */);
         current_entry_is_merged_ = true;
         valid_ = true;
         MergeValuesNewToOld();  // Go to a different state machine
@@ -428,7 +462,8 @@ void DBIter::PrevInternal() {
   ParsedInternalKey ikey;
 
   while (iter_->Valid()) {
-    saved_key_.SetKey(ExtractUserKey(iter_->key()));
+    saved_key_.SetKey(ExtractUserKey(iter_->key()),
+                      !iter_->IsKeyPinned() /* copy */);
     if (FindValueForCurrentKey()) {
       valid_ = true;
       if (!iter_->Valid()) {
@@ -744,7 +779,7 @@ void DBIter::SeekToLast() {
   // it will seek to the last key before the
   // ReadOptions.iterate_upper_bound
   if (iter_->Valid() && iterate_upper_bound_ != nullptr) {
-    saved_key_.SetKey(*iterate_upper_bound_);
+    saved_key_.SetKey(*iterate_upper_bound_, false /* copy */);
     std::string last_key;
     AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(),
                                                    kMaxSequenceNumber,
@@ -781,10 +816,15 @@ Iterator* NewDBIterator(Env* env, const ImmutableCFOptions& ioptions,
                         const SequenceNumber& sequence,
                         uint64_t max_sequential_skip_in_iterations,
                         const Slice* iterate_upper_bound,
-                        bool prefix_same_as_start) {
-  return new DBIter(env, ioptions, user_key_comparator, internal_iter, sequence,
-                    false, max_sequential_skip_in_iterations,
-                    iterate_upper_bound, prefix_same_as_start);
+                        bool prefix_same_as_start, bool pin_data) {
+  DBIter* db_iter =
+      new DBIter(env, ioptions, user_key_comparator, internal_iter, sequence,
+                 false, max_sequential_skip_in_iterations, iterate_upper_bound,
+                 prefix_same_as_start);
+  if (pin_data) {
+    db_iter->PinData();
+  }
+  return db_iter;
 }
 
 ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); }
@@ -806,6 +846,13 @@ inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); }
 inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); }
 inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); }
 inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); }
+inline Status ArenaWrappedDBIter::PinData() { return db_iter_->PinData(); }
+inline Status ArenaWrappedDBIter::ReleasePinnedData() {
+  return db_iter_->ReleasePinnedData();
+}
+inline bool ArenaWrappedDBIter::IsKeyPinned() const {
+  return db_iter_->IsKeyPinned();
+}
 void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function,
                                          void* arg1, void* arg2) {
   db_iter_->RegisterCleanup(function, arg1, arg2);
 }
 
@@ -815,7 +862,8 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator(
     Env* env, const ImmutableCFOptions& ioptions,
     const Comparator* user_key_comparator, const SequenceNumber& sequence,
     uint64_t max_sequential_skip_in_iterations,
-    const Slice* iterate_upper_bound, bool prefix_same_as_start) {
+    const Slice* iterate_upper_bound, bool prefix_same_as_start,
+    bool pin_data) {
   ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
   Arena* arena = iter->GetArena();
   auto mem = arena->AllocateAligned(sizeof(DBIter));
@@ -825,6 +873,9 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator(
       iterate_upper_bound, prefix_same_as_start);
 
   iter->SetDBIter(db_iter);
+  if (pin_data) {
+    iter->PinData();
+  }
 
   return iter;
 }
diff --git a/db/db_iter.h b/db/db_iter.h
index 740d8c51b..ba52c8cf9 100644
--- a/db/db_iter.h
+++ b/db/db_iter.h
@@ -30,7 +30,8 @@ extern Iterator* NewDBIterator(Env* env, const ImmutableCFOptions& options,
                                const SequenceNumber& sequence,
                                uint64_t max_sequential_skip_in_iterations,
                                const Slice* iterate_upper_bound = nullptr,
-                               bool prefix_same_as_start = false);
+                               bool prefix_same_as_start = false,
+                               bool pin_data = false);
 
 // A wrapper iterator which wraps DB Iterator and the arena, with which the DB
 // iterator is supposed to be allocated. This class is used as an entry point of
@@ -63,6 +64,9 @@ class ArenaWrappedDBIter : public Iterator {
   virtual Status status() const override;
   void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2);
 
+  virtual Status PinData();
+  virtual Status ReleasePinnedData();
+  virtual bool IsKeyPinned() const override;
 
  private:
   DBIter* db_iter_;
@@ -75,6 +79,6 @@ extern ArenaWrappedDBIter* NewArenaWrappedDbIterator(
     const Comparator* user_key_comparator, const SequenceNumber& sequence,
     uint64_t max_sequential_skip_in_iterations,
     const Slice* iterate_upper_bound = nullptr,
-    bool prefix_same_as_start = false);
+    bool prefix_same_as_start = false, bool pin_data = false);
 
 }  // namespace rocksdb
diff --git a/db/db_test.cc b/db/db_test.cc
index d3bf4b864..86bec7cf6 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -118,7 +118,6 @@ class DBTestWithParam
   uint32_t max_subcompactions_;
   bool exclusive_manual_compaction_;
 };
-
 #ifndef ROCKSDB_LITE
 TEST_F(DBTest, Empty) {
   do {
@@ -10143,6 +10142,321 @@ TEST_F(DBTest, SSTsWithLdbSuffixHandling) {
   Destroy(options);
 }
 
+TEST_F(DBTest, PinnedDataIteratorRandomized) {
+  enum TestConfig {
+    NORMAL,
+    CLOSE_AND_OPEN,
+    COMPACT_BEFORE_READ,
+    FLUSH_EVERY_1000,
+    MAX
+  };
+
+  // Generate Random data
+  Random rnd(301);
+
+  int puts = 100000;
+  int key_pool = puts * 0.7;
+  int key_size = 100;
+  int val_size = 1000;
+  int seeks_percentage = 20;   // 20% of keys will be used to test seek()
+  int delete_percentage = 20;  // 20% of keys will be deleted
+  int merge_percentage = 20;   // 20% of keys will be added using Merge()
+
+  for (int run_config = 0; run_config < TestConfig::MAX; run_config++) {
+    Options options = CurrentOptions();
+    BlockBasedTableOptions table_options;
+    table_options.use_delta_encoding = false;
+    options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+    options.merge_operator = MergeOperators::CreatePutOperator();
+    DestroyAndReopen(options);
+
+    std::vector<std::string> generated_keys(key_pool);
+    for (int i = 0; i < key_pool; i++) {
+      generated_keys[i] = RandomString(&rnd, key_size);
+    }
+
+    std::map<std::string, std::string> true_data;
+    std::vector<std::string> random_keys;
+    std::vector<std::string> deleted_keys;
+    for (int i = 0; i < puts; i++) {
+      auto& k = generated_keys[rnd.Next() % key_pool];
+      auto v = RandomString(&rnd, val_size);
+
+      // Insert data to true_data map and to DB
+      true_data[k] = v;
+      if (rnd.OneIn(100.0 / merge_percentage)) {
+        ASSERT_OK(db_->Merge(WriteOptions(), k, v));
+      } else {
+        ASSERT_OK(Put(k, v));
+      }
+
+      // Pick random keys to be used to test Seek()
+      if (rnd.OneIn(100.0 / seeks_percentage)) {
+        random_keys.push_back(k);
+      }
+
+      // Delete some random keys
+      if (rnd.OneIn(100.0 / delete_percentage)) {
+        deleted_keys.push_back(k);
+        true_data.erase(k);
+        ASSERT_OK(Delete(k));
+      }
+
+      if (run_config == TestConfig::FLUSH_EVERY_1000) {
+        if (i && i % 1000 == 0) {
+          Flush();
+        }
+      }
+    }
+
+    if (run_config == TestConfig::CLOSE_AND_OPEN) {
+      Close();
+      Reopen(options);
+    } else if (run_config == TestConfig::COMPACT_BEFORE_READ) {
+      db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+    }
+
+    ReadOptions ro;
+    ro.pin_data = true;
+    auto iter = db_->NewIterator(ro);
+
+    {
+      // Test Seek to random keys
+      printf("Testing seek on %zu keys\n", random_keys.size());
+      std::vector<Slice> keys_slices;
+      std::vector<std::string> true_keys;
+      for (auto& k : random_keys) {
+        iter->Seek(k);
+        if (!iter->Valid()) {
+          ASSERT_EQ(true_data.lower_bound(k), true_data.end());
+          continue;
+        }
+        ASSERT_TRUE(iter->IsKeyPinned());
+        keys_slices.push_back(iter->key());
+        true_keys.push_back(true_data.lower_bound(k)->first);
+      }
+
+      for (size_t i = 0; i < keys_slices.size(); i++) {
+        ASSERT_EQ(keys_slices[i].ToString(), true_keys[i]);
+      }
+    }
+
+    {
+      // Test iterating all data forward
+      printf("Testing iterating forward on all keys\n");
+      std::vector<Slice> all_keys;
+      for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+        ASSERT_TRUE(iter->IsKeyPinned());
+        all_keys.push_back(iter->key());
+      }
+      ASSERT_EQ(all_keys.size(), true_data.size());
+
+      // Verify that all key slices are valid
+      auto data_iter = true_data.begin();
+      for (size_t i = 0; i < all_keys.size(); i++) {
+        ASSERT_EQ(all_keys[i].ToString(), data_iter->first);
+        data_iter++;
+      }
+    }
+
+    {
+      // Test iterating all data backward
+      printf("Testing iterating backward on all keys\n");
+      std::vector<Slice> all_keys;
+      for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
+        ASSERT_TRUE(iter->IsKeyPinned());
+        all_keys.push_back(iter->key());
+      }
+      ASSERT_EQ(all_keys.size(), true_data.size());
+
+      // Verify that all key slices are valid (backward)
+      auto data_iter = true_data.rbegin();
+      for (size_t i = 0; i < all_keys.size(); i++) {
+        ASSERT_EQ(all_keys[i].ToString(), data_iter->first);
+        data_iter++;
+      }
+    }
+
+    delete iter;
+  }
+}
+
+#ifndef ROCKSDB_LITE
+TEST_F(DBTest, PinnedDataIteratorMultipleFiles) {
+  Options options = CurrentOptions();
+  BlockBasedTableOptions table_options;
+  table_options.use_delta_encoding = false;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  options.disable_auto_compactions = true;
+  options.write_buffer_size = 1024 * 1024 * 10;  // 10 Mb
+  DestroyAndReopen(options);
+
+  std::map<std::string, std::string> true_data;
+
+  // Generate 4 sst files in L2
+  Random rnd(301);
+  for (int i = 1; i <= 1000; i++) {
+    std::string k = Key(i * 3);
+    std::string v = RandomString(&rnd, 100);
+    ASSERT_OK(Put(k, v));
+    true_data[k] = v;
+    if (i % 250 == 0) {
+      ASSERT_OK(Flush());
+    }
+  }
+  ASSERT_EQ(FilesPerLevel(0), "4");
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  ASSERT_EQ(FilesPerLevel(0), "0,4");
+
+  // Generate 4 sst files in L0
+  for (int i = 1; i <= 1000; i++) {
+    std::string k = Key(i * 2);
+    std::string v = RandomString(&rnd, 100);
+    ASSERT_OK(Put(k, v));
+    true_data[k] = v;
+    if (i % 250 == 0) {
+      ASSERT_OK(Flush());
+    }
+  }
+  ASSERT_EQ(FilesPerLevel(0), "4,4");
+
+  // Add some keys/values in memtables
+  for (int i = 1; i <= 1000; i++) {
+    std::string k = Key(i);
+    std::string v = RandomString(&rnd, 100);
+    ASSERT_OK(Put(k, v));
+    true_data[k] = v;
+  }
+  ASSERT_EQ(FilesPerLevel(0), "4,4");
+
+  ReadOptions ro;
+  ro.pin_data = true;
+  auto iter = db_->NewIterator(ro);
+
+  std::vector<std::pair<Slice, std::string>> results;
+  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+    ASSERT_TRUE(iter->IsKeyPinned());
+    results.emplace_back(iter->key(), iter->value().ToString());
+  }
+
+  ASSERT_EQ(results.size(), true_data.size());
+  auto data_iter = true_data.begin();
+  for (size_t i = 0; i < results.size(); i++, data_iter++) {
+    auto& kv = results[i];
+    ASSERT_EQ(kv.first, data_iter->first);
+    ASSERT_EQ(kv.second, data_iter->second);
+  }
+
+  delete iter;
+}
+#endif
+
+TEST_F(DBTest, PinnedDataIteratorMergeOperator) {
+  Options options = CurrentOptions();
+  BlockBasedTableOptions table_options;
+  table_options.use_delta_encoding = false;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  options.merge_operator = MergeOperators::CreateUInt64AddOperator();
+  DestroyAndReopen(options);
+
+  std::string numbers[7];
+  for (int val = 0; val <= 6; val++) {
+    PutFixed64(numbers + val, val);
+  }
+
+  // +1 all keys in range [0 => 999]
+  for (int i = 0; i < 1000; i++) {
+    WriteOptions wo;
+    ASSERT_OK(db_->Merge(wo, Key(i), numbers[1]));
+  }
+
+  // +2 all keys divisible by 2 in range [0 => 999]
+  for (int i = 0; i < 1000; i += 2) {
+    WriteOptions wo;
+    ASSERT_OK(db_->Merge(wo, Key(i), numbers[2]));
+  }
+
+  // +3 all keys divisible by 5 in range [0 => 999]
+  for (int i = 0; i < 1000; i += 5) {
+    WriteOptions wo;
+    ASSERT_OK(db_->Merge(wo, Key(i), numbers[3]));
+  }
+
+  ReadOptions ro;
+  ro.pin_data = true;
+  auto iter = db_->NewIterator(ro);
+
+  std::vector<std::pair<Slice, std::string>> results;
+  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+    ASSERT_TRUE(iter->IsKeyPinned());
+    results.emplace_back(iter->key(), iter->value().ToString());
+  }
+
+  ASSERT_EQ(results.size(), 1000);
+  for (size_t i = 0; i < results.size(); i++) {
+    auto& kv = results[i];
+    ASSERT_EQ(kv.first, Key(static_cast<int>(i)));
+    int expected_val = 1;
+    if (i % 2 == 0) {
+      expected_val += 2;
+    }
+    if (i % 5 == 0) {
+      expected_val += 3;
+    }
+    ASSERT_EQ(kv.second, numbers[expected_val]);
+  }
+
+  delete iter;
+}
+
+TEST_F(DBTest, PinnedDataIteratorReadAfterUpdate) {
+  Options options = CurrentOptions();
+  BlockBasedTableOptions table_options;
+  table_options.use_delta_encoding = false;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  options.write_buffer_size = 100000;
+  DestroyAndReopen(options);
+
+  Random rnd(301);
+
+  std::map<std::string, std::string> true_data;
+  for (int i = 0; i < 1000; i++) {
+    std::string k = RandomString(&rnd, 10);
+    std::string v = RandomString(&rnd, 1000);
+    ASSERT_OK(Put(k, v));
+    true_data[k] = v;
+  }
+
+  ReadOptions ro;
+  ro.pin_data = true;
+  auto iter = db_->NewIterator(ro);
+
+  // Delete 50% of the keys and update the other 50%
+  for (auto& kv : true_data) {
+    if (rnd.OneIn(2)) {
+      ASSERT_OK(Delete(kv.first));
+    } else {
+      std::string new_val = RandomString(&rnd, 1000);
+      ASSERT_OK(Put(kv.first, new_val));
+    }
+  }
+
+  std::vector<std::pair<Slice, std::string>> results;
+  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+    ASSERT_TRUE(iter->IsKeyPinned());
+    results.emplace_back(iter->key(), iter->value().ToString());
+  }
+
+  auto data_iter = true_data.begin();
+  for (size_t i = 0; i < results.size(); i++, data_iter++) {
+    auto& kv = results[i];
+    ASSERT_EQ(kv.first, data_iter->first);
+    ASSERT_EQ(kv.second, data_iter->second);
+  }
+
+  delete iter;
+}
+
 INSTANTIATE_TEST_CASE_P(DBTestWithParam, DBTestWithParam,
                         ::testing::Combine(::testing::Values(1, 4),
                                            ::testing::Bool()));
diff --git a/db/dbformat.h b/db/dbformat.h
index 2f5d59e60..1e1169639 100644
--- a/db/dbformat.h
+++ b/db/dbformat.h
@@ -271,7 +271,8 @@ inline LookupKey::~LookupKey() {
 
 class IterKey {
  public:
-  IterKey() : key_(space_), buf_size_(sizeof(space_)), key_size_(0) {}
+  IterKey()
+      : buf_(space_), buf_size_(sizeof(space_)), key_(buf_), key_size_(0) {}
 
   ~IterKey() { ResetBuffer(); }
 
@@ -293,31 +294,41 @@ class IterKey {
   void TrimAppend(const size_t shared_len, const char* non_shared_data,
                   const size_t non_shared_len) {
     assert(shared_len <= key_size_);
     size_t total_size = shared_len + non_shared_len;
-    if (total_size <= buf_size_) {
-      key_size_ = total_size;
-    } else {
+
+    if (IsKeyPinned() /* key is not in buf_ */) {
+      // Copy the key from external memory to buf_ (copy shared_len bytes)
+      EnlargeBufferIfNeeded(total_size);
+      memcpy(buf_, key_, shared_len);
+    } else if (total_size > buf_size_) {
       // Need to allocate space, delete previous space
       char* p = new char[total_size];
       memcpy(p, key_, shared_len);
-      if (key_ != space_) {
-        delete[] key_;
+      if (buf_ != space_) {
+        delete[] buf_;
       }
-      key_ = p;
-      key_size_ = total_size;
+      buf_ = p;
       buf_size_ = total_size;
     }
-    memcpy(key_ + shared_len, non_shared_data, non_shared_len);
+
+    memcpy(buf_ + shared_len, non_shared_data, non_shared_len);
+    key_ = buf_;
+    key_size_ = total_size;
   }
 
-  Slice SetKey(const Slice& key) {
+  Slice SetKey(const Slice& key, bool copy = true) {
     size_t size = key.size();
-    EnlargeBufferIfNeeded(size);
-    memcpy(key_, key.data(), size);
+    if (copy) {
+      // Copy key to buf_
+      EnlargeBufferIfNeeded(size);
+      memcpy(buf_, key.data(), size);
+      key_ = buf_;
+    } else {
+      // Update key_ to point to external memory
+      key_ = key.data();
+    }
     key_size_ = size;
     return Slice(key_, key_size_);
   }
@@ -335,11 +346,14 @@ class IterKey {
   // Update the sequence number in the internal key. Guarantees not to
   // invalidate slices to the key (and the user key).
   void UpdateInternalKey(uint64_t seq, ValueType t) {
+    assert(!IsKeyPinned());
     assert(key_size_ >= 8);
     uint64_t newval = (seq << 8) | t;
-    EncodeFixed64(&key_[key_size_ - 8], newval);
+    EncodeFixed64(&buf_[key_size_ - 8], newval);
   }
 
+  bool IsKeyPinned() const { return (key_ != buf_); }
+
   void SetInternalKey(const Slice& key_prefix, const Slice& user_key,
                       SequenceNumber s,
                       ValueType value_type = kValueTypeForSeek) {
@@ -347,10 +361,12 @@ class IterKey {
     size_t usize = user_key.size();
     EnlargeBufferIfNeeded(psize + usize + sizeof(uint64_t));
     if (psize > 0) {
-      memcpy(key_, key_prefix.data(), psize);
+      memcpy(buf_, key_prefix.data(), psize);
     }
-    memcpy(key_ + psize, user_key.data(), usize);
-    EncodeFixed64(key_ + usize + psize, PackSequenceAndType(s, value_type));
+    memcpy(buf_ + psize, user_key.data(), usize);
+    EncodeFixed64(buf_ + usize + psize, PackSequenceAndType(s, value_type));
+
+    key_ = buf_;
     key_size_ = psize + usize + sizeof(uint64_t);
   }
@@ -377,20 +393,22 @@ class IterKey {
   void EncodeLengthPrefixedKey(const Slice& key) {
     auto size = key.size();
     EnlargeBufferIfNeeded(size + static_cast<size_t>(VarintLength(size)));
-    char* ptr = EncodeVarint32(key_, static_cast<uint32_t>(size));
+    char* ptr = EncodeVarint32(buf_, static_cast<uint32_t>(size));
     memcpy(ptr, key.data(), size);
+    key_ = buf_;
   }
 
  private:
-  char* key_;
+  char* buf_;
   size_t buf_size_;
+  const char* key_;
   size_t key_size_;
   char space_[32];  // Avoid allocation for short keys
 
   void ResetBuffer() {
-    if (key_ != space_) {
-      delete[] key_;
-      key_ = space_;
+    if (buf_ != space_) {
+      delete[] buf_;
+      buf_ = space_;
     }
     buf_size_ = sizeof(space_);
     key_size_ = 0;
@@ -407,7 +425,7 @@ class IterKey {
     if (key_size > buf_size_) {
       // Need to enlarge the buffer.
       ResetBuffer();
-      key_ = new char[key_size];
+      buf_ = new char[key_size];
       buf_size_ = key_size;
     }
   }
diff --git a/db/listener_test.cc b/db/listener_test.cc
index b2825dff8..50c54e65f 100644
--- a/db/listener_test.cc
+++ b/db/listener_test.cc
@@ -40,7 +40,7 @@ namespace rocksdb {
 
 class EventListenerTest : public DBTestBase {
  public:
-  EventListenerTest() : DBTestBase("listener_test") {}
+  EventListenerTest() : DBTestBase("/listener_test") {}
 
   const size_t k110KB = 110 << 10;
 };
diff --git a/db/memtable.cc b/db/memtable.cc
index e48e75e3b..484e18e91 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -274,6 +274,21 @@ class MemTableIterator : public InternalIterator {
 
   virtual Status status() const override { return Status::OK(); }
 
+  virtual Status PinData() override {
+    // memtable data is always pinned
+    return Status::OK();
+  }
+
+  virtual Status ReleasePinnedData() override {
+    // memtable data is always pinned
+    return Status::OK();
+  }
+
+  virtual bool IsKeyPinned() const override {
+    // memtable data is always pinned
+    return true;
+  }
+
  private:
   DynamicBloom* bloom_;
   const SliceTransform* const prefix_extractor_;
diff --git a/include/rocksdb/iterator.h b/include/rocksdb/iterator.h
index 1e7600d84..885232db6 100644
--- a/include/rocksdb/iterator.h
+++ b/include/rocksdb/iterator.h
@@ -95,6 +95,15 @@ class Iterator : public Cleanable {
   // satisfied without doing some IO, then this returns Status::Incomplete().
   virtual Status status() const = 0;
 
+  // If true, this means that the Slice returned by key() is valid as long
+  // as the iterator is not deleted and ReleasePinnedData() is not called.
+  //
+  // IsKeyPinned() is guaranteed to always return true if
+  //  - the Iterator was created with ReadOptions::pin_data = true
+  //  - DB tables were created with BlockBasedTableOptions::use_delta_encoding
+  //    set to false.
+  virtual bool IsKeyPinned() const { return false; }
+
  private:
   // No copying allowed
   Iterator(const Iterator&);
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 97457925d..ad33bd86a 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -1325,6 +1325,13 @@ struct ReadOptions {
   // Default: false
   bool prefix_same_as_start;
 
+  // Keep the blocks loaded by the iterator pinned in memory as long as the
+  // iterator is not deleted. If used when reading from tables created with
+  // BlockBasedTableOptions::use_delta_encoding = false,
+  // Iterator::IsKeyPinned() is guaranteed to return true.
+  // Default: false
+  bool pin_data;
+
   ReadOptions();
   ReadOptions(bool cksum, bool cache);
 };
diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h
index eafcbaeca..f522bf869 100644
--- a/include/rocksdb/table.h
+++ b/include/rocksdb/table.h
@@ -119,6 +119,12 @@ struct BlockBasedTableOptions {
   // leave this parameter alone.
   int block_restart_interval = 16;
 
+  // Use delta encoding to compress keys in blocks.
+  // Iterator::PinData() requires this option to be disabled.
+  //
+  // Default: true
+  bool use_delta_encoding = true;
+
   // If non-nullptr, use the specified filter policy to reduce disk reads.
   // Many applications will benefit from passing the result of
   // NewBloomFilterPolicy() here.
diff --git a/table/block.cc b/table/block.cc
index 9e72a0bd9..c84dc173d 100644
--- a/table/block.cc
+++ b/table/block.cc
@@ -153,7 +153,14 @@ bool BlockIter::ParseNextKey() {
       CorruptionError();
       return false;
     } else {
-      key_.TrimAppend(shared, p, non_shared);
+      if (shared == 0) {
+        // If this key doesn't share any bytes with the previous key, we don't
+        // need to decode it; we can use its address in the block directly.
+        key_.SetKey(Slice(p, non_shared), false /* copy */);
+      } else {
+        // This key shares `shared` bytes with the previous key; decode it.
+        key_.TrimAppend(shared, p, non_shared);
+      }
       value_ = Slice(p + non_shared, value_length);
       while (restart_index_ + 1 < num_restarts_ &&
              GetRestartPoint(restart_index_ + 1) < current_) {
diff --git a/table/block.h b/table/block.h
index 0a37b90fa..4fe63add6 100644
--- a/table/block.h
+++ b/table/block.h
@@ -151,6 +151,18 @@ class BlockIter : public InternalIterator {
 
   virtual void SeekToLast() override;
 
+  virtual Status PinData() override {
+    // block data is always pinned.
+    return Status::OK();
+  }
+
+  virtual Status ReleasePinnedData() override {
+    // block data is always pinned.
+    return Status::OK();
+  }
+
+  virtual bool IsKeyPinned() const override { return key_.IsKeyPinned(); }
+
  private:
   const Comparator* comparator_;
   const char* data_;  // underlying block contents
diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc
index 319235fbe..006908eaa 100644
--- a/table/block_based_table_builder.cc
+++ b/table/block_based_table_builder.cc
@@ -481,7 +481,8 @@ struct BlockBasedTableBuilder::Rep {
         table_options(table_opt),
         internal_comparator(icomparator),
         file(f),
-        data_block(table_options.block_restart_interval),
+        data_block(table_options.block_restart_interval,
+                   table_options.use_delta_encoding),
         internal_prefix_transform(_ioptions.prefix_extractor),
         index_builder(CreateIndexBuilder(table_options.index_type,
                                          &internal_comparator,
diff --git a/table/block_builder.cc b/table/block_builder.cc
index 1eee96d46..846d62369 100644
--- a/table/block_builder.cc
+++ b/table/block_builder.cc
@@ -41,8 +41,9 @@
 
 namespace rocksdb {
 
-BlockBuilder::BlockBuilder(int block_restart_interval)
+BlockBuilder::BlockBuilder(int block_restart_interval, bool use_delta_encoding)
     : block_restart_interval_(block_restart_interval),
+      use_delta_encoding_(use_delta_encoding),
       restarts_(),
       counter_(0),
       finished_(false) {
@@ -94,17 +95,17 @@ void BlockBuilder::Add(const Slice& key, const Slice& value) {
   Slice last_key_piece(last_key_);
   assert(!finished_);
   assert(counter_ <= block_restart_interval_);
-  size_t shared = 0;
-  if (counter_ < block_restart_interval_) {
+  size_t shared = 0;  // number of bytes shared with prev key
+  if (counter_ >= block_restart_interval_) {
+    // Restart compression
+    restarts_.push_back(static_cast<uint32_t>(buffer_.size()));
+    counter_ = 0;
+  } else if (use_delta_encoding_) {
     // See how much sharing to do with previous string
     const size_t min_length = std::min(last_key_piece.size(), key.size());
     while ((shared < min_length) && (last_key_piece[shared] == key[shared])) {
       shared++;
     }
-  } else {
-    // Restart compression
-    restarts_.push_back(static_cast<uint32_t>(buffer_.size()));
-    counter_ = 0;
   }
 
   const size_t non_shared = key.size() - shared;
diff --git a/table/block_builder.h b/table/block_builder.h
index c01a23bea..9eec4ce33 100644
--- a/table/block_builder.h
+++ b/table/block_builder.h
@@ -20,7 +20,8 @@ class BlockBuilder {
   BlockBuilder(const BlockBuilder&) = delete;
   void operator=(const BlockBuilder&) = delete;
 
-  explicit BlockBuilder(int block_restart_interval);
+  explicit BlockBuilder(int block_restart_interval,
+                        bool use_delta_encoding = true);
 
   // Reset the contents as if the BlockBuilder was just constructed.
   void Reset();
@@ -48,6 +49,7 @@ class BlockBuilder {
 
  private:
   const int block_restart_interval_;
+  const bool use_delta_encoding_;
   std::string buffer_;              // Destination buffer
   std::vector<uint32_t> restarts_;  // Restart points
diff --git a/table/internal_iterator.h b/table/internal_iterator.h
index 51a163256..cc2430ca0 100644
--- a/table/internal_iterator.h
+++ b/table/internal_iterator.h
@@ -60,6 +60,24 @@ class InternalIterator : public Cleanable {
   // satisfied without doing some IO, then this returns Status::Incomplete().
   virtual Status status() const = 0;
 
+  // Make sure that all current and future data blocks used by this iterator
+  // will be pinned in memory and will not be released except when
+  // ReleasePinnedData() is called or the iterator is deleted.
+  virtual Status PinData() { return Status::NotSupported(""); }
+
+  // Release all blocks that were pinned because of PinData(); no future
+  // blocks will be pinned.
+  virtual Status ReleasePinnedData() { return Status::NotSupported(""); }
+
+  // If true, this means that the Slice returned by key() is valid as long
+  // as the iterator is not deleted and ReleasePinnedData() is not called.
+  //
+  // IsKeyPinned() is guaranteed to always return true if
+  //  - PinData() is called
+  //  - DB tables were created with BlockBasedTableOptions::use_delta_encoding
+  //    set to false.
+  virtual bool IsKeyPinned() const { return false; }
+
  private:
   // No copying allowed
   InternalIterator(const InternalIterator&) = delete;
diff --git a/table/iterator_wrapper.h b/table/iterator_wrapper.h
index 2eb33b537..1998c3740 100644
--- a/table/iterator_wrapper.h
+++ b/table/iterator_wrapper.h
@@ -9,6 +9,8 @@
 
 #pragma once
 
+#include <set>
+
 #include "table/internal_iterator.h"
 
 namespace rocksdb {
@@ -19,31 +21,95 @@ namespace rocksdb {
 // cache locality.
 class IteratorWrapper {
  public:
-  IteratorWrapper(): iter_(nullptr), valid_(false) { }
-  explicit IteratorWrapper(InternalIterator* _iter) : iter_(nullptr) {
+  IteratorWrapper() : iter_(nullptr), iters_pinned_(false), valid_(false) {}
+  explicit IteratorWrapper(InternalIterator* _iter)
+      : iter_(nullptr), iters_pinned_(false) {
     Set(_iter);
   }
   ~IteratorWrapper() {}
   InternalIterator* iter() const { return iter_; }
 
-  // Takes ownership of "iter" and will delete it when destroyed, or
-  // when Set() is invoked again.
+  // Takes the ownership of "_iter" and will delete it when destroyed.
+  // Next call to Set() will destroy "_iter" except if PinData() was called.
   void Set(InternalIterator* _iter) {
-    delete iter_;
+    if (iters_pinned_ && iter_) {
+      // keep old iterator until ReleasePinnedData() is called
+      pinned_iters_.insert(iter_);
+    } else {
+      delete iter_;
+    }
+
     iter_ = _iter;
     if (iter_ == nullptr) {
       valid_ = false;
     } else {
       Update();
+      if (iters_pinned_) {
+        // Pin new iterator
+        Status s = iter_->PinData();
+        assert(s.ok());
+      }
     }
   }
+
+  Status PinData() {
+    Status s;
+    if (iters_pinned_) {
+      return s;
+    }
+
+    if (iter_) {
+      s = iter_->PinData();
+    }
+
+    if (s.ok()) {
+      iters_pinned_ = true;
+    }
+
+    return s;
+  }
+
+  Status ReleasePinnedData() {
+    Status s;
+    if (!iters_pinned_) {
+      return s;
+    }
+
+    if (iter_) {
+      s = iter_->ReleasePinnedData();
+    }
+
+    if (s.ok()) {
+      iters_pinned_ = false;
+      // No need to call ReleasePinnedData() for pinned_iters_
+      // since we will delete them
+      DeletePinnedIterators(false);
+    }
+
+    return s;
+  }
+
+  bool IsKeyPinned() const {
+    assert(iter_);
+    return iters_pinned_ && iter_->IsKeyPinned();
+  }
+
+  void DeletePinnedIterators(bool is_arena_mode) {
+    for (auto it : pinned_iters_) {
+      if (!is_arena_mode) {
+        delete it;
+      } else {
+        it->~InternalIterator();
+      }
+    }
+    pinned_iters_.clear();
+  }
 
   void DeleteIter(bool is_arena_mode) {
-    if (!is_arena_mode) {
-      delete iter_;
-    } else {
-      iter_->~InternalIterator();
+    if (iter_) {
+      pinned_iters_.insert(iter_);
     }
+    DeletePinnedIterators(is_arena_mode);
   }
 
   // Iterator interface methods
@@ -67,6 +133,12 @@ class IteratorWrapper {
   }
 
   InternalIterator* iter_;
+  // If set to true, current and future iterators won't be deleted.
+  bool iters_pinned_;
+  // List of past iterators that are pinned and won't be deleted as long as
+  // iters_pinned_ is true. When we are pinning iterators this set will contain
+  // iterators of previous data blocks to keep them from being deleted.
+  std::set<InternalIterator*> pinned_iters_;
   bool valid_;
   Slice key_;
 };
diff --git a/table/merger.cc b/table/merger.cc
index 49e512581..81eb9608c 100644
--- a/table/merger.cc
+++ b/table/merger.cc
@@ -37,7 +37,8 @@ class MergingIterator : public InternalIterator {
  public:
   MergingIterator(const Comparator* comparator, InternalIterator** children,
                   int n, bool is_arena_mode)
-      : is_arena_mode_(is_arena_mode),
+      : data_pinned_(false),
+        is_arena_mode_(is_arena_mode),
         comparator_(comparator),
         current_(nullptr),
         direction_(kForward),
@@ -57,6 +58,10 @@ class MergingIterator : public InternalIterator {
   virtual void AddIterator(InternalIterator* iter) {
     assert(direction_ == kForward);
     children_.emplace_back(iter);
+    if (data_pinned_) {
+      Status s = iter->PinData();
+      assert(s.ok());
+    }
     auto new_wrapper = children_.back();
     if (new_wrapper.Valid()) {
       minHeap_.push(&new_wrapper);
@@ -238,7 +243,50 @@ class MergingIterator : public InternalIterator {
     return s;
   }
 
+  virtual Status PinData() override {
+    Status s;
+    if (data_pinned_) {
+      return s;
+    }
+
+    for (size_t i = 0; i < children_.size(); i++) {
+      s = children_[i].PinData();
+      if (!s.ok()) {
+        // We failed to pin an iterator, clean up
+        for (size_t j = 0; j < i; j++) {
+          children_[j].ReleasePinnedData();
+        }
+        break;
+      }
+    }
+    data_pinned_ = s.ok();
+    return s;
+  }
+
+  virtual Status ReleasePinnedData() override {
+    Status s;
+    if (!data_pinned_) {
+      return s;
+    }
+
+    for (auto& child : children_) {
+      Status release_status = child.ReleasePinnedData();
+      if (s.ok() && !release_status.ok()) {
+        s = release_status;
+      }
+    }
+    data_pinned_ = false;
+
+    return s;
+  }
+
+  virtual bool IsKeyPinned() const override {
+    assert(Valid());
+    return current_->IsKeyPinned();
+  }
+
  private:
+  bool data_pinned_;
   // Clears heaps for both directions, used when changing direction or seeking
   void ClearHeaps();
   // Ensures that maxHeap_ is initialized when starting to go in the reverse
diff --git a/table/table_test.cc b/table/table_test.cc
index 58607bbb2..3f0f39c3a 100644
--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -1807,6 +1807,7 @@ TEST_F(BlockBasedTableTest, BlockCacheLeak) {
     iter->Next();
   }
   ASSERT_OK(iter->status());
+  iter.release();
 
   const ImmutableCFOptions ioptions1(opt);
   ASSERT_OK(c.Reopen(ioptions1));
diff --git a/table/two_level_iterator.cc b/table/two_level_iterator.cc
index dbc378529..a01c12007 100644
--- a/table/two_level_iterator.cc
+++ b/table/two_level_iterator.cc
@@ -61,6 +61,13 @@ class TwoLevelIterator : public InternalIterator {
       return status_;
     }
   }
+  virtual Status PinData() override { return second_level_iter_.PinData(); }
+  virtual Status ReleasePinnedData() override {
+    return second_level_iter_.ReleasePinnedData();
+  }
+  virtual bool IsKeyPinned() const override {
+    return second_level_iter_.iter() ? second_level_iter_.IsKeyPinned() : false;
+  }
 
  private:
   void SaveError(const Status& s) {
diff --git a/util/options.cc b/util/options.cc
index 2230b774e..b59cc8d79 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -720,7 +720,8 @@ ReadOptions::ReadOptions()
       tailing(false),
       managed(false),
       total_order_seek(false),
-      prefix_same_as_start(false) {
+      prefix_same_as_start(false),
+      pin_data(false) {
   XFUNC_TEST("", "managed_options", managed_options, xf_manage_options,
              reinterpret_cast<ReadOptions*>(this));
 }
@@ -734,7 +735,8 @@ ReadOptions::ReadOptions(bool cksum, bool cache)
       tailing(false),
       managed(false),
       total_order_seek(false),
-      prefix_same_as_start(false) {
+      prefix_same_as_start(false),
+      pin_data(false) {
   XFUNC_TEST("", "managed_options", managed_options, xf_manage_options,
              reinterpret_cast<ReadOptions*>(this));
 }
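
Usage sketch (reviewer note, not part of the diff): the snippet below shows how the new ReadOptions::pin_data flag is intended to be used together with BlockBasedTableOptions::use_delta_encoding = false, so that Iterator::IsKeyPinned() holds and key() Slices can be collected without copying. The DB path, key count, and key/value contents are illustrative assumptions, not code from this change.

#include <cassert>
#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"

int main() {
  // IsKeyPinned() is only guaranteed when blocks are built without
  // delta encoding (see the include/rocksdb/table.h hunk above).
  rocksdb::BlockBasedTableOptions table_options;
  table_options.use_delta_encoding = false;

  rocksdb::Options options;
  options.create_if_missing = true;
  options.table_factory.reset(
      rocksdb::NewBlockBasedTableFactory(table_options));

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/pin_data_example", &db);
  assert(s.ok());

  for (int i = 0; i < 1000; i++) {
    db->Put(rocksdb::WriteOptions(), "key" + std::to_string(i), "value");
  }

  // pin_data = true pins every data block the iterator touches until the
  // iterator is deleted, so the key() Slices below remain valid even after
  // the iterator has moved past them.
  rocksdb::ReadOptions read_options;
  read_options.pin_data = true;

  rocksdb::Iterator* it = db->NewIterator(read_options);
  std::vector<rocksdb::Slice> keys;
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    assert(it->IsKeyPinned());
    keys.push_back(it->key());  // zero-copy; Slice points into a pinned block
  }
  assert(keys.size() == 1000);  // the Slices are still readable here

  delete it;  // unpins the blocks; the collected Slices become invalid
  delete db;
  return 0;
}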
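The db/dbformat.h change is the heart of the zero-copy path: IterKey now tracks its owned buffer (buf_) separately from the key pointer (key_), and IsKeyPinned() simply reports whether key_ points outside buf_. A small sketch of that contract follows, assuming access to the internal db/dbformat.h header (test-only illustration, not part of the diff):

#include <cassert>

#include "db/dbformat.h"  // internal header; reachable from RocksDB's own tests

void IterKeyPinningContract() {
  rocksdb::IterKey k;
  rocksdb::Slice external("abcdef");

  // copy = false: key_ points directly at the caller's memory, so the key
  // is "pinned" (valid only as long as that external memory is).
  k.SetKey(external, false /* copy */);
  assert(k.IsKeyPinned());

  // copy = true (the default): the bytes are copied into buf_, so the key
  // is owned by the IterKey and is not pinned.
  k.SetKey(external, true /* copy */);
  assert(!k.IsKeyPinned());
}

This is also why DBIter passes !iter_->IsKeyPinned() as the copy flag in the db/db_iter.cc hunks: when the underlying block is pinned, saved_key_ can safely alias block memory; otherwise it falls back to copying.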