diff --git a/db/compacted_db_impl.cc b/db/compacted_db_impl.cc index e6a8a9e38..49ca61e3c 100644 --- a/db/compacted_db_impl.cc +++ b/db/compacted_db_impl.cc @@ -42,8 +42,8 @@ size_t CompactedDBImpl::FindFile(const Slice& key) { return right; } -Status CompactedDBImpl::Get(const ReadOptions& options, - ColumnFamilyHandle*, const Slice& key, std::string* value) { +Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*, + const Slice& key, PinnableSlice* value) { GetContext get_context(user_comparator_, nullptr, nullptr, nullptr, GetContext::kNotFound, key, value, nullptr, nullptr, nullptr, nullptr); @@ -75,11 +75,14 @@ std::vector CompactedDBImpl::MultiGet(const ReadOptions& options, int idx = 0; for (auto* r : reader_list) { if (r != nullptr) { + PinnableSlice pinnable_val; + std::string& value = (*values)[idx]; GetContext get_context(user_comparator_, nullptr, nullptr, nullptr, - GetContext::kNotFound, keys[idx], &(*values)[idx], + GetContext::kNotFound, keys[idx], &pinnable_val, nullptr, nullptr, nullptr, nullptr); LookupKey lkey(keys[idx], kMaxSequenceNumber); r->Get(options, lkey.internal_key(), &get_context); + value.assign(pinnable_val.data(), pinnable_val.size()); if (get_context.State() == GetContext::kFound) { statuses[idx] = Status::OK(); } diff --git a/db/compacted_db_impl.h b/db/compacted_db_impl.h index 6d6d512fd..906ba9579 100644 --- a/db/compacted_db_impl.h +++ b/db/compacted_db_impl.h @@ -23,7 +23,7 @@ class CompactedDBImpl : public DBImpl { using DB::Get; virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override; + PinnableSlice* value) override; using DB::MultiGet; virtual std::vector MultiGet( const ReadOptions& options, diff --git a/db/db_impl.cc b/db/db_impl.cc index a6c00fa3c..4158aa08b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3940,7 +3940,7 @@ ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const { Status DBImpl::Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) { + PinnableSlice* value) { return GetImpl(read_options, column_family, key, value); } @@ -3998,7 +3998,8 @@ SuperVersion* DBImpl::InstallSuperVersionAndScheduleWork( Status DBImpl::GetImpl(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value, bool* value_found) { + PinnableSlice* pinnable_val, bool* value_found) { + assert(pinnable_val != nullptr); StopWatch sw(env_, stats_, DB_GET); PERF_TIMER_GUARD(get_snapshot_time); @@ -4046,14 +4047,16 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, has_unpersisted_data_.load(std::memory_order_relaxed)); bool done = false; if (!skip_memtable) { - if (sv->mem->Get(lkey, value, &s, &merge_context, &range_del_agg, - read_options)) { + if (sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, + &range_del_agg, read_options)) { done = true; + pinnable_val->PinSelf(); RecordTick(stats_, MEMTABLE_HIT); } else if ((s.ok() || s.IsMergeInProgress()) && - sv->imm->Get(lkey, value, &s, &merge_context, &range_del_agg, - read_options)) { + sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, + &range_del_agg, read_options)) { done = true; + pinnable_val->PinSelf(); RecordTick(stats_, MEMTABLE_HIT); } if (!done && !s.ok() && !s.IsMergeInProgress()) { @@ -4062,7 +4065,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, } if (!done) { PERF_TIMER_GUARD(get_from_output_files_time); - sv->current->Get(read_options, lkey, value, &s, &merge_context, + sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context, &range_del_agg, value_found); RecordTick(stats_, MEMTABLE_MISS); } @@ -4073,8 +4076,9 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, ReturnAndCleanupSuperVersion(cfd, sv); RecordTick(stats_, NUMBER_KEYS_READ); - RecordTick(stats_, BYTES_READ, value->size()); - MeasureTime(stats_, BYTES_PER_READ, value->size()); + size_t size = pinnable_val->size(); + RecordTick(stats_, BYTES_READ, size); + MeasureTime(stats_, BYTES_PER_READ, size); } return s; } @@ -4163,9 +4167,11 @@ std::vector DBImpl::MultiGet( } } if (!done) { + PinnableSlice pinnable_val; PERF_TIMER_GUARD(get_from_output_files_time); - super_version->current->Get(read_options, lkey, value, &s, &merge_context, - &range_del_agg); + super_version->current->Get(read_options, lkey, &pinnable_val, &s, + &merge_context, &range_del_agg); + value->assign(pinnable_val.data(), pinnable_val.size()); // TODO(?): RecordTick(stats_, MEMTABLE_MISS)? } @@ -4377,13 +4383,16 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { bool DBImpl::KeyMayExist(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value, bool* value_found) { + assert(value != nullptr); if (value_found != nullptr) { // falsify later if key-may-exist but can't fetch value *value_found = true; } ReadOptions roptions = read_options; roptions.read_tier = kBlockCacheTier; // read from block cache only - auto s = GetImpl(roptions, column_family, key, value, value_found); + PinnableSlice pinnable_val; + auto s = GetImpl(roptions, column_family, key, &pinnable_val, value_found); + value->assign(pinnable_val.data(), pinnable_val.size()); // If block_cache is enabled and the index block of the table didn't // not present in block_cache, the return value will be Status::Incomplete. diff --git a/db/db_impl.h b/db/db_impl.h index 54562f7e9..3ae37adae 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -91,7 +91,7 @@ class DBImpl : public DB { using DB::Get; virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override; + PinnableSlice* value) override; using DB::MultiGet; virtual std::vector MultiGet( const ReadOptions& options, @@ -1104,7 +1104,7 @@ class DBImpl : public DB { // Function that Get and KeyMayExist call with no_io true or false // Note: 'value_found' from KeyMayExist propagates here Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family, - const Slice& key, std::string* value, + const Slice& key, PinnableSlice* value, bool* value_found = nullptr); bool GetIntPropertyInternal(ColumnFamilyData* cfd, diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index f185209c1..f92ee7c3c 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -31,7 +31,8 @@ DBImplReadOnly::~DBImplReadOnly() { // Implementations of the DB interface Status DBImplReadOnly::Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) { + PinnableSlice* pinnable_val) { + assert(pinnable_val != nullptr); Status s; SequenceNumber snapshot = versions_->LastSequence(); auto cfh = reinterpret_cast(column_family); @@ -40,12 +41,13 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options, MergeContext merge_context; RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot); LookupKey lkey(key, snapshot); - if (super_version->mem->Get(lkey, value, &s, &merge_context, &range_del_agg, - read_options)) { + if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, + &range_del_agg, read_options)) { + pinnable_val->PinSelf(); } else { PERF_TIMER_GUARD(get_from_output_files_time); - super_version->current->Get(read_options, lkey, value, &s, &merge_context, - &range_del_agg); + super_version->current->Get(read_options, lkey, pinnable_val, &s, + &merge_context, &range_del_agg); } return s; } diff --git a/db/db_impl_readonly.h b/db/db_impl_readonly.h index 56d207e6d..b46652060 100644 --- a/db/db_impl_readonly.h +++ b/db/db_impl_readonly.h @@ -22,7 +22,7 @@ class DBImplReadOnly : public DBImpl { using DB::Get; virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override; + PinnableSlice* value) override; // TODO: Implement ReadOnly MultiGet? diff --git a/db/db_test.cc b/db/db_test.cc index 0083b37b8..d282de7c0 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2212,7 +2212,7 @@ class ModelDB : public DB { } using DB::Get; virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* cf, - const Slice& key, std::string* value) override { + const Slice& key, PinnableSlice* value) override { return Status::NotSupported(key); } diff --git a/db/version_set.cc b/db/version_set.cc index 31606c396..42aeac039 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -927,7 +927,7 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, version_number_(version_number) {} void Version::Get(const ReadOptions& read_options, const LookupKey& k, - std::string* value, Status* status, + PinnableSlice* value, Status* status, MergeContext* merge_context, RangeDelAggregator* range_del_agg, bool* value_found, bool* key_exists, SequenceNumber* seq) { @@ -1004,9 +1004,13 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, } // merge_operands are in saver and we hit the beginning of the key history // do a final merge of nullptr and operands; - *status = MergeHelper::TimedFullMerge(merge_operator_, user_key, nullptr, - merge_context->GetOperands(), value, - info_log_, db_statistics_, env_); + std::string* str_value = value != nullptr ? value->GetSelf() : nullptr; + *status = MergeHelper::TimedFullMerge( + merge_operator_, user_key, nullptr, merge_context->GetOperands(), + str_value, info_log_, db_statistics_, env_); + if (LIKELY(value != nullptr)) { + value->PinSelf(); + } } else { if (key_exists != nullptr) { *key_exists = false; diff --git a/db/version_set.h b/db/version_set.h index bcf6951a7..818a86fea 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -477,7 +477,7 @@ class Version { // for the key if a key was found. // // REQUIRES: lock is not held - void Get(const ReadOptions&, const LookupKey& key, std::string* val, + void Get(const ReadOptions&, const LookupKey& key, PinnableSlice* value, Status* status, MergeContext* merge_context, RangeDelAggregator* range_del_agg, bool* value_found = nullptr, bool* key_exists = nullptr, SequenceNumber* seq = nullptr); diff --git a/include/rocksdb/cleanable.h b/include/rocksdb/cleanable.h new file mode 100644 index 000000000..5df585560 --- /dev/null +++ b/include/rocksdb/cleanable.h @@ -0,0 +1,73 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// An iterator yields a sequence of key/value pairs from a source. +// The following class defines the interface. Multiple implementations +// are provided by this library. In particular, iterators are provided +// to access the contents of a Table or a DB. +// +// Multiple threads can invoke const methods on an Iterator without +// external synchronization, but if any of the threads may call a +// non-const method, all threads accessing the same Iterator must use +// external synchronization. + +#ifndef INCLUDE_ROCKSDB_CLEANABLE_H_ +#define INCLUDE_ROCKSDB_CLEANABLE_H_ + +namespace rocksdb { + +class Cleanable { + public: + Cleanable(); + ~Cleanable(); + // Clients are allowed to register function/arg1/arg2 triples that + // will be invoked when this iterator is destroyed. + // + // Note that unlike all of the preceding methods, this method is + // not abstract and therefore clients should not override it. + typedef void (*CleanupFunction)(void* arg1, void* arg2); + void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2); + void DelegateCleanupsTo(Cleanable* other); + // DoCkeanup and also resets the pointers for reuse + inline void Reset() { + DoCleanup(); + cleanup_.function = nullptr; + cleanup_.next = nullptr; + } + + protected: + struct Cleanup { + CleanupFunction function; + void* arg1; + void* arg2; + Cleanup* next; + }; + Cleanup cleanup_; + // It also becomes the owner of c + void RegisterCleanup(Cleanup* c); + + private: + // Performs all the cleanups. It does not reset the pointers. Making it + // private + // to prevent misuse + inline void DoCleanup() { + if (cleanup_.function != nullptr) { + (*cleanup_.function)(cleanup_.arg1, cleanup_.arg2); + for (Cleanup* c = cleanup_.next; c != nullptr;) { + (*c->function)(c->arg1, c->arg2); + Cleanup* next = c->next; + delete c; + c = next; + } + } + } +}; + +} // namespace rocksdb + +#endif // INCLUDE_ROCKSDB_CLEANABLE_H_ diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index cdbb8abf1..e74f7adef 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -16,6 +16,7 @@ #include #include #include +#include "port/likely.h" #include "rocksdb/iterator.h" #include "rocksdb/listener.h" #include "rocksdb/metadata.h" @@ -280,9 +281,21 @@ class DB { // a status for which Status::IsNotFound() returns true. // // May return some other Status on an error. + inline Status Get(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value) { + assert(value != nullptr); + PinnableSlice pinnable_val(value); + assert(!pinnable_val.IsPinned()); + auto s = Get(options, column_family, key, &pinnable_val); + if (LIKELY(s.ok()) && pinnable_val.IsPinned()) { + value->assign(pinnable_val.data(), pinnable_val.size()); + } // else value is already assigned + return s; + } virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) = 0; + PinnableSlice* value) = 0; virtual Status Get(const ReadOptions& options, const Slice& key, std::string* value) { return Get(options, DefaultColumnFamily(), key, value); } diff --git a/include/rocksdb/iterator.h b/include/rocksdb/iterator.h index 9d17989a7..9bfb0e3d6 100644 --- a/include/rocksdb/iterator.h +++ b/include/rocksdb/iterator.h @@ -20,43 +20,12 @@ #define STORAGE_ROCKSDB_INCLUDE_ITERATOR_H_ #include +#include "rocksdb/cleanable.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" namespace rocksdb { -class Cleanable { - public: - Cleanable(); - ~Cleanable(); - // Clients are allowed to register function/arg1/arg2 triples that - // will be invoked when this iterator is destroyed. - // - // Note that unlike all of the preceding methods, this method is - // not abstract and therefore clients should not override it. - typedef void (*CleanupFunction)(void* arg1, void* arg2); - void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2); - void DelegateCleanupsTo(Cleanable* other); - // DoCleanup and also resets the pointers for reuse - void Reset(); - - protected: - struct Cleanup { - CleanupFunction function; - void* arg1; - void* arg2; - Cleanup* next; - }; - Cleanup cleanup_; - // It also becomes the owner of c - void RegisterCleanup(Cleanup* c); - - private: - // Performs all the cleanups. It does not reset the pointers. Making it - // private to prevent misuse - inline void DoCleanup(); -}; - class Iterator : public Cleanable { public: Iterator() {} diff --git a/include/rocksdb/slice.h b/include/rocksdb/slice.h index 38d494ed9..2c858a506 100644 --- a/include/rocksdb/slice.h +++ b/include/rocksdb/slice.h @@ -25,6 +25,8 @@ #include #include +#include "rocksdb/cleanable.h" + namespace rocksdb { class Slice { @@ -116,6 +118,81 @@ class Slice { // Intentionally copyable }; +/** + * A Slice that can be pinned with some cleanup tasks, which will be run upon + * ::Reset() or object destruction, whichever is invoked first. This can be used + * to avoid memcpy by having the PinnsableSlice object referring to the data + * that is locked in the memory and release them after the data is consuned. + */ +class PinnableSlice : public Slice, public Cleanable { + public: + PinnableSlice() { buf_ = &self_space_; } + explicit PinnableSlice(std::string* buf) { buf_ = buf; } + + inline void PinSlice(const Slice& s, CleanupFunction f, void* arg1, + void* arg2) { + assert(!pinned_); + pinned_ = true; + data_ = s.data(); + size_ = s.size(); + RegisterCleanup(f, arg1, arg2); + assert(pinned_); + } + + inline void PinSlice(const Slice& s, Cleanable* cleanable) { + assert(!pinned_); + pinned_ = true; + data_ = s.data(); + size_ = s.size(); + cleanable->DelegateCleanupsTo(this); + assert(pinned_); + } + + inline void PinSelf(const Slice& slice) { + assert(!pinned_); + buf_->assign(slice.data(), slice.size()); + data_ = buf_->data(); + size_ = buf_->size(); + assert(!pinned_); + } + + inline void PinSelf() { + assert(!pinned_); + data_ = buf_->data(); + size_ = buf_->size(); + assert(!pinned_); + } + + void remove_suffix(size_t n) { + assert(n <= size()); + if (pinned_) { + size_ -= n; + } else { + buf_->erase(size() - n, n); + PinSelf(); + } + } + + void remove_prefix(size_t n) { + assert(0); // Not implemented + } + + void Reset() { + Cleanable::Reset(); + pinned_ = false; + } + + inline std::string* GetSelf() { return buf_; } + + inline bool IsPinned() { return pinned_; } + + private: + friend class PinnableSlice4Test; + std::string self_space_; + std::string* buf_; + bool pinned_ = false; +}; + // A set of Slices that are virtually concatenated together. 'parts' points // to an array of Slices. The number of elements in the array is 'num_parts'. struct SliceParts { diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index eca800c09..c61e1d9dc 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -56,7 +56,7 @@ class StackableDB : public DB { using DB::Get; virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override { + PinnableSlice* value) override { return db_->Get(options, column_family, key, value); } diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 6efba78d0..f60a8c4af 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -1470,9 +1470,6 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, iiter_unique_ptr = std::unique_ptr(iiter); } - PinnedIteratorsManager* pinned_iters_mgr = get_context->pinned_iters_mgr(); - bool pin_blocks = pinned_iters_mgr && pinned_iters_mgr->PinningEnabled(); - bool done = false; for (iiter->Seek(key); iiter->Valid() && !done; iiter->Next()) { Slice handle_value = iiter->value(); @@ -1513,17 +1510,12 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, s = Status::Corruption(Slice()); } - if (!get_context->SaveValue(parsed_key, biter.value(), pin_blocks)) { + if (!get_context->SaveValue(parsed_key, biter.value(), &biter)) { done = true; break; } } s = biter.status(); - - if (pin_blocks && get_context->State() == GetContext::kMerge) { - // Pin blocks as long as we are merging - biter.DelegateCleanupsTo(pinned_iters_mgr); - } } } if (s.ok()) { diff --git a/table/cleanable_test.cc b/table/cleanable_test.cc index 717e20ea6..631586ad0 100644 --- a/table/cleanable_test.cc +++ b/table/cleanable_test.cc @@ -47,6 +47,30 @@ TEST_F(CleanableTest, Register) { } // ~Cleanable ASSERT_EQ(6, res); + + // Test the Reset does cleanup + res = 1; + { + Cleanable c1; + c1.RegisterCleanup(Multiplier, &res, &n2); // res = 2; + c1.RegisterCleanup(Multiplier, &res, &n3); // res = 2 * 3; + c1.Reset(); + ASSERT_EQ(6, res); + } + // ~Cleanable + ASSERT_EQ(6, res); + + // Test Clenable is usable after Reset + res = 1; + { + Cleanable c1; + c1.RegisterCleanup(Multiplier, &res, &n2); // res = 2; + c1.Reset(); + ASSERT_EQ(2, res); + c1.RegisterCleanup(Multiplier, &res, &n3); // res = 2 * 3; + } + // ~Cleanable + ASSERT_EQ(6, res); } // the first Cleanup is on stack and the rest on heap, @@ -174,6 +198,76 @@ TEST_F(CleanableTest, Delegation) { ASSERT_EQ(5, res); } +static void ReleaseStringHeap(void* s, void*) { + delete reinterpret_cast(s); +} + +class PinnableSlice4Test : public PinnableSlice { + public: + void TestStringIsRegistered(std::string* s) { + ASSERT_TRUE(cleanup_.function == ReleaseStringHeap); + ASSERT_EQ(cleanup_.arg1, s); + ASSERT_EQ(cleanup_.arg2, nullptr); + ASSERT_EQ(cleanup_.next, nullptr); + } +}; + +// Putting the PinnableSlice tests here due to similarity to Cleanable tests +TEST_F(CleanableTest, PinnableSlice) { + int n2 = 2; + int res = 1; + const std::string const_str = "123"; + + { + res = 1; + PinnableSlice4Test value; + Slice slice(const_str); + value.PinSlice(slice, Multiplier, &res, &n2); + std::string str; + str.assign(value.data(), value.size()); + ASSERT_EQ(const_str, str); + } + // ~Cleanable + ASSERT_EQ(2, res); + + { + res = 1; + PinnableSlice4Test value; + Slice slice(const_str); + { + Cleanable c1; + c1.RegisterCleanup(Multiplier, &res, &n2); // res = 2; + value.PinSlice(slice, &c1); + } + // ~Cleanable + ASSERT_EQ(1, res); // cleanups must have be delegated to value + std::string str; + str.assign(value.data(), value.size()); + ASSERT_EQ(const_str, str); + } + // ~Cleanable + ASSERT_EQ(2, res); + + { + PinnableSlice4Test value; + Slice slice(const_str); + value.PinSelf(slice); + std::string str; + str.assign(value.data(), value.size()); + ASSERT_EQ(const_str, str); + } + + { + PinnableSlice4Test value; + std::string* self_str_ptr = value.GetSelf(); + self_str_ptr->assign(const_str); + value.PinSelf(); + std::string str; + str.assign(value.data(), value.size()); + ASSERT_EQ(const_str, str); + } +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/table/cuckoo_table_reader_test.cc b/table/cuckoo_table_reader_test.cc index e207c62bd..5d82d58fc 100644 --- a/table/cuckoo_table_reader_test.cc +++ b/table/cuckoo_table_reader_test.cc @@ -123,12 +123,12 @@ class CuckooReaderTest : public testing::Test { ASSERT_OK(reader.status()); // Assume no merge/deletion for (uint32_t i = 0; i < num_items; ++i) { - std::string value; + PinnableSlice value; GetContext get_context(ucomp, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(user_keys[i]), &value, nullptr, nullptr, nullptr, nullptr); ASSERT_OK(reader.Get(ReadOptions(), Slice(keys[i]), &get_context)); - ASSERT_EQ(values[i], value); + ASSERT_STREQ(values[i].c_str(), value.data()); } } void UpdateKeys(bool with_zero_seqno) { @@ -333,7 +333,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) { AddHashLookups(not_found_user_key, 0, kNumHashFunc); ParsedInternalKey ikey(not_found_user_key, 1000, kTypeValue); AppendInternalKey(¬_found_key, ikey); - std::string value; + PinnableSlice value; GetContext get_context(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(not_found_key), &value, nullptr, nullptr, nullptr, nullptr); @@ -346,6 +346,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) { ParsedInternalKey ikey2(not_found_user_key2, 1000, kTypeValue); std::string not_found_key2; AppendInternalKey(¬_found_key2, ikey2); + value.Reset(); GetContext get_context2(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(not_found_key2), &value, nullptr, nullptr, nullptr, nullptr); @@ -360,6 +361,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) { // Add hash values that map to empty buckets. AddHashLookups(ExtractUserKey(unused_key).ToString(), kNumHashFunc, kNumHashFunc); + value.Reset(); GetContext get_context3(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(unused_key), &value, nullptr, nullptr, nullptr, nullptr); @@ -433,12 +435,13 @@ void WriteFile(const std::vector& keys, test::Uint64Comparator(), nullptr); ASSERT_OK(reader.status()); ReadOptions r_options; - std::string value; + PinnableSlice value; // Assume only the fast path is triggered GetContext get_context(nullptr, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(), &value, nullptr, nullptr, nullptr, nullptr); for (uint64_t i = 0; i < num; ++i) { + value.Reset(); value.clear(); ASSERT_OK(reader.Get(r_options, Slice(keys[i]), &get_context)); ASSERT_TRUE(Slice(keys[i]) == Slice(&keys[i][0], 4)); @@ -480,7 +483,7 @@ void ReadKeys(uint64_t num, uint32_t batch_size) { } std::random_shuffle(keys.begin(), keys.end()); - std::string value; + PinnableSlice value; // Assume only the fast path is triggered GetContext get_context(nullptr, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(), &value, nullptr, diff --git a/table/get_context.cc b/table/get_context.cc index 280206c54..4b517a50b 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -35,7 +35,7 @@ void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) { GetContext::GetContext(const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger, Statistics* statistics, GetState init_state, - const Slice& user_key, std::string* ret_value, + const Slice& user_key, PinnableSlice* pinnable_val, bool* value_found, MergeContext* merge_context, RangeDelAggregator* _range_del_agg, Env* env, SequenceNumber* seq, @@ -46,7 +46,7 @@ GetContext::GetContext(const Comparator* ucmp, statistics_(statistics), state_(init_state), user_key_(user_key), - value_(ret_value), + pinnable_val_(pinnable_val), value_found_(value_found), merge_context_(merge_context), range_del_agg_(_range_del_agg), @@ -76,13 +76,13 @@ void GetContext::SaveValue(const Slice& value, SequenceNumber seq) { appendToReplayLog(replay_log_, kTypeValue, value); state_ = kFound; - if (value_ != nullptr) { - value_->assign(value.data(), value.size()); + if (LIKELY(pinnable_val_ != nullptr)) { + pinnable_val_->PinSelf(value); } } bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, - const Slice& value, bool value_pinned) { + const Slice& value, Cleanable* value_pinner) { assert((state_ != kMerge && parsed_key.type != kTypeMerge) || merge_context_ != nullptr); if (ucmp_->Equal(parsed_key.user_key, user_key_)) { @@ -106,17 +106,24 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, assert(state_ == kNotFound || state_ == kMerge); if (kNotFound == state_) { state_ = kFound; - if (value_ != nullptr) { - value_->assign(value.data(), value.size()); + if (LIKELY(pinnable_val_ != nullptr)) { + if (LIKELY(value_pinner != nullptr)) { + // If the backing resources for the value are provided, pin them + pinnable_val_->PinSlice(value, value_pinner); + } else { + // Otherwise copy the value + pinnable_val_->PinSelf(value); + } } } else if (kMerge == state_) { assert(merge_operator_ != nullptr); state_ = kFound; - if (value_ != nullptr) { + if (LIKELY(pinnable_val_ != nullptr)) { Status merge_status = MergeHelper::TimedFullMerge( merge_operator_, user_key_, &value, - merge_context_->GetOperands(), value_, logger_, statistics_, - env_); + merge_context_->GetOperands(), pinnable_val_->GetSelf(), + logger_, statistics_, env_); + pinnable_val_->PinSelf(); if (!merge_status.ok()) { state_ = kCorrupt; } @@ -134,12 +141,12 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, state_ = kDeleted; } else if (kMerge == state_) { state_ = kFound; - if (value_ != nullptr) { - Status merge_status = - MergeHelper::TimedFullMerge(merge_operator_, user_key_, nullptr, - merge_context_->GetOperands(), - value_, logger_, statistics_, env_); - + if (LIKELY(pinnable_val_ != nullptr)) { + Status merge_status = MergeHelper::TimedFullMerge( + merge_operator_, user_key_, nullptr, + merge_context_->GetOperands(), pinnable_val_->GetSelf(), + logger_, statistics_, env_); + pinnable_val_->PinSelf(); if (!merge_status.ok()) { state_ = kCorrupt; } @@ -150,7 +157,14 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, case kTypeMerge: assert(state_ == kNotFound || state_ == kMerge); state_ = kMerge; - merge_context_->PushOperand(value, value_pinned); + // value_pinner is not set from plain_table_reader.cc for example. + if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() && + value_pinner != nullptr) { + value_pinner->DelegateCleanupsTo(pinned_iters_mgr()); + merge_context_->PushOperand(value, true /*value_pinned*/); + } else { + merge_context_->PushOperand(value, false); + } return true; default: @@ -166,6 +180,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, void replayGetContextLog(const Slice& replay_log, const Slice& user_key, GetContext* get_context) { #ifndef ROCKSDB_LITE + static Cleanable nonToClean; Slice s = replay_log; while (s.size()) { auto type = static_cast(*s.data()); @@ -178,7 +193,8 @@ void replayGetContextLog(const Slice& replay_log, const Slice& user_key, // Since SequenceNumber is not stored and unknown, we will use // kMaxSequenceNumber. get_context->SaveValue( - ParsedInternalKey(user_key, kMaxSequenceNumber, type), value, true); + ParsedInternalKey(user_key, kMaxSequenceNumber, type), value, + &nonToClean); } #else // ROCKSDB_LITE assert(false); diff --git a/table/get_context.h b/table/get_context.h index e57c7352c..9f65156f7 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -9,6 +9,7 @@ #include "db/range_del_aggregator.h" #include "rocksdb/env.h" #include "rocksdb/types.h" +#include "table/block.h" namespace rocksdb { class MergeContext; @@ -26,7 +27,7 @@ class GetContext { GetContext(const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger, Statistics* statistics, GetState init_state, - const Slice& user_key, std::string* ret_value, bool* value_found, + const Slice& user_key, PinnableSlice* value, bool* value_found, MergeContext* merge_context, RangeDelAggregator* range_del_agg, Env* env, SequenceNumber* seq = nullptr, PinnedIteratorsManager* _pinned_iters_mgr = nullptr); @@ -39,7 +40,7 @@ class GetContext { // Returns True if more keys need to be read (due to merges) or // False if the complete value has been found. bool SaveValue(const ParsedInternalKey& parsed_key, const Slice& value, - bool value_pinned = false); + Cleanable* value_pinner = nullptr); // Simplified version of the previous function. Should only be used when we // know that the operation is a Put. @@ -68,7 +69,7 @@ class GetContext { GetState state_; Slice user_key_; - std::string* value_; + PinnableSlice* pinnable_val_; bool* value_found_; // Is value set correctly? Used by KeyMayExist MergeContext* merge_context_; RangeDelAggregator* range_del_agg_; diff --git a/table/iterator.cc b/table/iterator.cc index a90c720d6..91f7135c0 100644 --- a/table/iterator.cc +++ b/table/iterator.cc @@ -21,24 +21,6 @@ Cleanable::Cleanable() { Cleanable::~Cleanable() { DoCleanup(); } -void Cleanable::Reset() { - DoCleanup(); - cleanup_.function = nullptr; - cleanup_.next = nullptr; -} - -void Cleanable::DoCleanup() { - if (cleanup_.function != nullptr) { - (*cleanup_.function)(cleanup_.arg1, cleanup_.arg2); - for (Cleanup* c = cleanup_.next; c != nullptr;) { - (*c->function)(c->arg1, c->arg2); - Cleanup* next = c->next; - delete c; - c = next; - } - } -} - // If the entire linked list was on heap we could have simply add attach one // link list to another. However the head is an embeded object to avoid the cost // of creating objects for most of the use cases when the Cleanable has only one diff --git a/table/table_reader_bench.cc b/table/table_reader_bench.cc index 77adb8877..7984cdb1f 100644 --- a/table/table_reader_bench.cc +++ b/table/table_reader_bench.cc @@ -166,7 +166,7 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options, std::string key = MakeKey(r1, r2, through_db); uint64_t start_time = Now(env, measured_by_nanosecond); if (!through_db) { - std::string value; + PinnableSlice value; MergeContext merge_context; RangeDelAggregator range_del_agg(ikc, {} /* snapshots */); GetContext get_context(ioptions.user_comparator, diff --git a/table/table_test.cc b/table/table_test.cc index 4c2fe43ad..4a3b049f6 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -1994,12 +1994,12 @@ TEST_F(BlockBasedTableTest, FilterBlockInBlockCache) { ASSERT_OK(c3.Reopen(ioptions4)); reader = dynamic_cast(c3.GetTableReader()); ASSERT_TRUE(!reader->TEST_filter_block_preloaded()); - std::string value; + PinnableSlice value; GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, nullptr, nullptr, nullptr); ASSERT_OK(reader->Get(ReadOptions(), user_key, &get_context)); - ASSERT_EQ(value, "hello"); + ASSERT_STREQ(value.data(), "hello"); BlockCachePropertiesSnapshot props(options.statistics.get()); props.AssertFilterBlockStat(0, 0); c3.ResetTableReader(); @@ -2077,7 +2077,7 @@ TEST_F(BlockBasedTableTest, BlockReadCountTest) { c.Finish(options, ioptions, table_options, GetPlainInternalComparator(options.comparator), &keys, &kvmap); auto reader = c.GetTableReader(); - std::string value; + PinnableSlice value; GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, nullptr, nullptr, nullptr); @@ -2091,13 +2091,14 @@ TEST_F(BlockBasedTableTest, BlockReadCountTest) { ASSERT_EQ(perf_context.block_read_count, 1); } ASSERT_EQ(get_context.State(), GetContext::kFound); - ASSERT_EQ(value, "hello"); + ASSERT_STREQ(value.data(), "hello"); // Get non-existing key user_key = "does-not-exist"; internal_key = InternalKey(user_key, 0, kTypeValue); encoded_key = internal_key.Encode().ToString(); + value.Reset(); get_context = GetContext(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, nullptr, nullptr, nullptr); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index fd7c3c63f..faa60d38b 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -230,6 +230,8 @@ DEFINE_bool(reverse_iterator, false, DEFINE_bool(use_uint64_comparator, false, "use Uint64 user comparator"); +DEFINE_bool(pin_slice, true, "use pinnable slice for point lookup"); + DEFINE_int64(batch_size, 1, "Batch size"); static bool ValidateKeySize(const char* flagname, int32_t value) { @@ -3821,6 +3823,7 @@ class Benchmark { std::unique_ptr key_guard; Slice key = AllocateKey(&key_guard); std::string value; + PinnableSlice pinnable_val; Duration duration(FLAGS_duration, reads_); while (!duration.Done(1)) { @@ -3836,11 +3839,20 @@ class Benchmark { s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key, &value); } else { - s = db_with_cfh->db->Get(options, key, &value); + if (LIKELY(FLAGS_pin_slice == 1)) { + pinnable_val.Reset(); + s = db_with_cfh->db->Get(options, + db_with_cfh->db->DefaultColumnFamily(), key, + &pinnable_val); + } else { + s = db_with_cfh->db->Get( + options, db_with_cfh->db->DefaultColumnFamily(), key, &value); + } } if (s.ok()) { found++; - bytes += key.size() + value.size(); + bytes += key.size() + + (FLAGS_pin_slice == 1 ? pinnable_val.size() : value.size()); } else if (!s.IsNotFound()) { fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str()); abort(); diff --git a/util/testutil.cc b/util/testutil.cc index 7b432ab59..fef81f406 100644 --- a/util/testutil.cc +++ b/util/testutil.cc @@ -12,6 +12,7 @@ #include #include +#include "db/memtable_list.h" #include "port/port.h" #include "util/file_reader_writer.h" diff --git a/utilities/document/document_db.cc b/utilities/document/document_db.cc index 85330b123..dacf58205 100644 --- a/utilities/document/document_db.cc +++ b/utilities/document/document_db.cc @@ -826,7 +826,7 @@ class DocumentDBImpl : public DocumentDB { // Lock now, since we're starting DB operations MutexLock l(&write_mutex_); // check if there is already a document with the same primary key - std::string value; + PinnableSlice value; Status s = DocumentDB::Get(ReadOptions(), primary_key_column_family_, primary_key_slice, &value); if (!s.IsNotFound()) { @@ -1039,7 +1039,7 @@ class DocumentDBImpl : public DocumentDB { // RocksDB functions virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override { + PinnableSlice* value) override { return Status::NotSupported(""); } virtual Status Get(const ReadOptions& options, const Slice& key, diff --git a/utilities/ttl/db_ttl_impl.cc b/utilities/ttl/db_ttl_impl.cc index b9edb3cf3..975fbf387 100644 --- a/utilities/ttl/db_ttl_impl.cc +++ b/utilities/ttl/db_ttl_impl.cc @@ -170,6 +170,17 @@ bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl, Env* env) { return (timestamp_value + ttl) < curtime; } +// Strips the TS from the end of the slice +Status DBWithTTLImpl::StripTS(PinnableSlice* pinnable_val) { + Status st; + if (pinnable_val->size() < kTSLength) { + return Status::Corruption("Bad timestamp in key-value"); + } + // Erasing characters which hold the TS + pinnable_val->remove_suffix(kTSLength); + return st; +} + // Strips the TS from the end of the string Status DBWithTTLImpl::StripTS(std::string* str) { Status st; @@ -191,7 +202,7 @@ Status DBWithTTLImpl::Put(const WriteOptions& options, Status DBWithTTLImpl::Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) { + PinnableSlice* value) { Status st = db_->Get(options, column_family, key, value); if (!st.ok()) { return st; diff --git a/utilities/ttl/db_ttl_impl.h b/utilities/ttl/db_ttl_impl.h index a9b445e65..1023b87bd 100644 --- a/utilities/ttl/db_ttl_impl.h +++ b/utilities/ttl/db_ttl_impl.h @@ -51,7 +51,7 @@ class DBWithTTLImpl : public DBWithTTL { using StackableDB::Get; virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override; + PinnableSlice* value) override; using StackableDB::MultiGet; virtual std::vector MultiGet( @@ -87,6 +87,8 @@ class DBWithTTLImpl : public DBWithTTL { static Status StripTS(std::string* str); + static Status StripTS(PinnableSlice* str); + static const uint32_t kTSLength = sizeof(int32_t); // size of timestamp static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8