PinnableSlice

Summary:
Currently the point lookup values are copied to a string provided by the user.
This incurs an extra memcpy cost. This patch allows doing point lookup
via a PinnableSlice which pins the source memory locations (instead of
copying their content) and releases them after the content is consumed
by the user. The old API of Get(string) is translated to the new API
underneath.

 Here is the summary for improvements:
 1. value 100 byte: 1.8%  regular, 1.2% merge values
 2. value 1k   byte: 11.5% regular, 7.5% merge values
 3. value 10k byte: 26% regular,    29.9% merge values

 The improvement for merge could be more if we extend this approach to
 pin the merge output and delay the full merge operation until the user
 actually needs it. We have put that for future work.

PS:
Sometimes we observe a small decrease in performance when switching from
t5452014 to this patch but with the old Get(string) API. The difference
is small and could be noise. More importantly it is safely
cancelled
Closes https://github.com/facebook/rocksdb/pull/1732

Differential Revision: D4374613

Pulled By: maysamyabandeh

fbshipit-source-id: a077f1a
main
Maysam Yabandeh 8 years ago committed by Facebook Github Bot
parent e04480faed
commit 54d94e9c2c
  1. 13
      db/compacted_db_impl.cc
  2. 2
      db/compacted_db_impl.h
  3. 43
      db/db_impl.cc
  4. 4
      db/db_impl.h
  5. 6
      db/db_impl_readonly.cc
  6. 2
      db/db_impl_readonly.h
  7. 2
      db/db_test.cc
  8. 41
      db/memtable.cc
  9. 20
      db/memtable.h
  10. 26
      db/memtable_list.cc
  11. 31
      db/memtable_list.h
  12. 14
      db/version_set.cc
  13. 2
      db/version_set.h
  14. 73
      include/rocksdb/cleanable.h
  15. 13
      include/rocksdb/db.h
  16. 33
      include/rocksdb/iterator.h
  17. 57
      include/rocksdb/slice.h
  18. 4
      include/rocksdb/utilities/stackable_db.h
  19. 10
      table/block_based_table_reader.cc
  20. 120
      table/cleanable_test.cc
  21. 13
      table/cuckoo_table_reader_test.cc
  22. 50
      table/get_context.cc
  23. 7
      table/get_context.h
  24. 18
      table/iterator.cc
  25. 2
      table/table_reader_bench.cc
  26. 9
      table/table_test.cc
  27. 15
      tools/db_bench_tool.cc
  28. 49
      util/testutil.cc
  29. 2
      utilities/document/document_db.cc
  30. 19
      utilities/ttl/db_ttl_impl.cc
  31. 4
      utilities/ttl/db_ttl_impl.h

@ -42,10 +42,10 @@ size_t CompactedDBImpl::FindFile(const Slice& key) {
return right; return right;
} }
Status CompactedDBImpl::Get(const ReadOptions& options, Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
ColumnFamilyHandle*, const Slice& key, std::string* value) { const Slice& key, PinnableSlice* pSlice) {
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr, GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, key, value, nullptr, nullptr, GetContext::kNotFound, key, pSlice, nullptr, nullptr,
nullptr, nullptr); nullptr, nullptr);
LookupKey lkey(key, kMaxSequenceNumber); LookupKey lkey(key, kMaxSequenceNumber);
files_.files[FindFile(key)].fd.table_reader->Get( files_.files[FindFile(key)].fd.table_reader->Get(
@ -75,11 +75,14 @@ std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
int idx = 0; int idx = 0;
for (auto* r : reader_list) { for (auto* r : reader_list) {
if (r != nullptr) { if (r != nullptr) {
PinnableSlice pSlice;
std::string& value = (*values)[idx];
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr, GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, keys[idx], &(*values)[idx], GetContext::kNotFound, keys[idx], &pSlice, nullptr,
nullptr, nullptr, nullptr, nullptr); nullptr, nullptr, nullptr);
LookupKey lkey(keys[idx], kMaxSequenceNumber); LookupKey lkey(keys[idx], kMaxSequenceNumber);
r->Get(options, lkey.internal_key(), &get_context); r->Get(options, lkey.internal_key(), &get_context);
value.assign(pSlice.data(), pSlice.size());
if (get_context.State() == GetContext::kFound) { if (get_context.State() == GetContext::kFound) {
statuses[idx] = Status::OK(); statuses[idx] = Status::OK();
} }

@ -23,7 +23,7 @@ class CompactedDBImpl : public DBImpl {
using DB::Get; using DB::Get;
virtual Status Get(const ReadOptions& options, virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) override; PinnableSlice* value) override;
using DB::MultiGet; using DB::MultiGet;
virtual std::vector<Status> MultiGet( virtual std::vector<Status> MultiGet(
const ReadOptions& options, const ReadOptions& options,

@ -3907,8 +3907,8 @@ ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
Status DBImpl::Get(const ReadOptions& read_options, Status DBImpl::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) { PinnableSlice* pSlice) {
return GetImpl(read_options, column_family, key, value); return GetImpl(read_options, column_family, key, pSlice);
} }
// JobContext gets created and destructed outside of the lock -- // JobContext gets created and destructed outside of the lock --
@ -3965,7 +3965,7 @@ SuperVersion* DBImpl::InstallSuperVersionAndScheduleWork(
Status DBImpl::GetImpl(const ReadOptions& read_options, Status DBImpl::GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
std::string* value, bool* value_found) { PinnableSlice* pSlice, bool* value_found) {
StopWatch sw(env_, stats_, DB_GET); StopWatch sw(env_, stats_, DB_GET);
PERF_TIMER_GUARD(get_snapshot_time); PERF_TIMER_GUARD(get_snapshot_time);
@ -3996,12 +3996,12 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
(read_options.read_tier == kPersistedTier && has_unpersisted_data_); (read_options.read_tier == kPersistedTier && has_unpersisted_data_);
bool done = false; bool done = false;
if (!skip_memtable) { if (!skip_memtable) {
if (sv->mem->Get(lkey, value, &s, &merge_context, &range_del_agg, if (sv->mem->Get(lkey, pSlice, &s, &merge_context, &range_del_agg,
read_options)) { read_options)) {
done = true; done = true;
RecordTick(stats_, MEMTABLE_HIT); RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) && } else if ((s.ok() || s.IsMergeInProgress()) &&
sv->imm->Get(lkey, value, &s, &merge_context, &range_del_agg, sv->imm->Get(lkey, pSlice, &s, &merge_context, &range_del_agg,
read_options)) { read_options)) {
done = true; done = true;
RecordTick(stats_, MEMTABLE_HIT); RecordTick(stats_, MEMTABLE_HIT);
@ -4012,7 +4012,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
} }
if (!done) { if (!done) {
PERF_TIMER_GUARD(get_from_output_files_time); PERF_TIMER_GUARD(get_from_output_files_time);
sv->current->Get(read_options, lkey, value, &s, &merge_context, sv->current->Get(read_options, lkey, pSlice, &s, &merge_context,
&range_del_agg, value_found); &range_del_agg, value_found);
RecordTick(stats_, MEMTABLE_MISS); RecordTick(stats_, MEMTABLE_MISS);
} }
@ -4023,8 +4023,9 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
ReturnAndCleanupSuperVersion(cfd, sv); ReturnAndCleanupSuperVersion(cfd, sv);
RecordTick(stats_, NUMBER_KEYS_READ); RecordTick(stats_, NUMBER_KEYS_READ);
RecordTick(stats_, BYTES_READ, value->size()); size_t size = pSlice->size();
MeasureTime(stats_, BYTES_PER_READ, value->size()); RecordTick(stats_, BYTES_READ, size);
MeasureTime(stats_, BYTES_PER_READ, size);
} }
return s; return s;
} }
@ -4100,26 +4101,30 @@ std::vector<Status> DBImpl::MultiGet(
bool skip_memtable = bool skip_memtable =
(read_options.read_tier == kPersistedTier && has_unpersisted_data_); (read_options.read_tier == kPersistedTier && has_unpersisted_data_);
bool done = false; bool done = false;
PinnableSlice pSlice;
if (!skip_memtable) { if (!skip_memtable) {
if (super_version->mem->Get(lkey, value, &s, &merge_context, if (super_version->mem->Get(lkey, &pSlice, &s, &merge_context,
&range_del_agg, read_options)) { &range_del_agg, read_options)) {
value->assign(pSlice.data(), pSlice.size());
done = true; done = true;
// TODO(?): RecordTick(stats_, MEMTABLE_HIT)? // TODO(?): RecordTick(stats_, MEMTABLE_HIT)?
} else if (super_version->imm->Get(lkey, value, &s, &merge_context, } else if (super_version->imm->Get(lkey, &pSlice, &s, &merge_context,
&range_del_agg, read_options)) { &range_del_agg, read_options)) {
value->assign(pSlice.data(), pSlice.size());
done = true; done = true;
// TODO(?): RecordTick(stats_, MEMTABLE_HIT)? // TODO(?): RecordTick(stats_, MEMTABLE_HIT)?
} }
} }
if (!done) { if (!done) {
PERF_TIMER_GUARD(get_from_output_files_time); PERF_TIMER_GUARD(get_from_output_files_time);
super_version->current->Get(read_options, lkey, value, &s, &merge_context, super_version->current->Get(read_options, lkey, &pSlice, &s,
&range_del_agg); &merge_context, &range_del_agg);
value->assign(pSlice.data(), pSlice.size());
// TODO(?): RecordTick(stats_, MEMTABLE_MISS)? // TODO(?): RecordTick(stats_, MEMTABLE_MISS)?
} }
if (s.ok()) { if (s.ok()) {
bytes_read += value->size(); bytes_read += pSlice.size();
} }
} }
@ -4332,7 +4337,12 @@ bool DBImpl::KeyMayExist(const ReadOptions& read_options,
} }
ReadOptions roptions = read_options; ReadOptions roptions = read_options;
roptions.read_tier = kBlockCacheTier; // read from block cache only roptions.read_tier = kBlockCacheTier; // read from block cache only
auto s = GetImpl(roptions, column_family, key, value, value_found); PinnableSlice pSlice;
PinnableSlice* pSlicePtr = value != nullptr ? &pSlice : nullptr;
auto s = GetImpl(roptions, column_family, key, pSlicePtr, value_found);
if (value != nullptr) {
value->assign(pSlice.data(), pSlice.size());
}
// If block_cache is enabled and the index block of the table didn't // If block_cache is enabled and the index block of the table didn't
// not present in block_cache, the return value will be Status::Incomplete. // not present in block_cache, the return value will be Status::Incomplete.
@ -6395,7 +6405,8 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
*found_record_for_key = false; *found_record_for_key = false;
// Check if there is a record for this key in the latest memtable // Check if there is a record for this key in the latest memtable
sv->mem->Get(lkey, nullptr, &s, &merge_context, &range_del_agg, seq, PinnableSlice* pSliceNullPtr = nullptr;
sv->mem->Get(lkey, pSliceNullPtr, &s, &merge_context, &range_del_agg, seq,
read_options); read_options);
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
@ -6414,7 +6425,7 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
} }
// Check if there is a record for this key in the immutable memtables // Check if there is a record for this key in the immutable memtables
sv->imm->Get(lkey, nullptr, &s, &merge_context, &range_del_agg, seq, sv->imm->Get(lkey, pSliceNullPtr, &s, &merge_context, &range_del_agg, seq,
read_options); read_options);
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {

@ -91,7 +91,7 @@ class DBImpl : public DB {
using DB::Get; using DB::Get;
virtual Status Get(const ReadOptions& options, virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) override; PinnableSlice* value) override;
using DB::MultiGet; using DB::MultiGet;
virtual std::vector<Status> MultiGet( virtual std::vector<Status> MultiGet(
const ReadOptions& options, const ReadOptions& options,
@ -1088,7 +1088,7 @@ class DBImpl : public DB {
// Function that Get and KeyMayExist call with no_io true or false // Function that Get and KeyMayExist call with no_io true or false
// Note: 'value_found' from KeyMayExist propagates here // Note: 'value_found' from KeyMayExist propagates here
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family, Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, std::string* value, const Slice& key, PinnableSlice* pSlice,
bool* value_found = nullptr); bool* value_found = nullptr);
bool GetIntPropertyInternal(ColumnFamilyData* cfd, bool GetIntPropertyInternal(ColumnFamilyData* cfd,

@ -31,7 +31,7 @@ DBImplReadOnly::~DBImplReadOnly() {
// Implementations of the DB interface // Implementations of the DB interface
Status DBImplReadOnly::Get(const ReadOptions& read_options, Status DBImplReadOnly::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) { PinnableSlice* pSlice) {
Status s; Status s;
SequenceNumber snapshot = versions_->LastSequence(); SequenceNumber snapshot = versions_->LastSequence();
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
@ -40,11 +40,11 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options,
MergeContext merge_context; MergeContext merge_context;
RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot); RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot);
LookupKey lkey(key, snapshot); LookupKey lkey(key, snapshot);
if (super_version->mem->Get(lkey, value, &s, &merge_context, &range_del_agg, if (super_version->mem->Get(lkey, pSlice, &s, &merge_context, &range_del_agg,
read_options)) { read_options)) {
} else { } else {
PERF_TIMER_GUARD(get_from_output_files_time); PERF_TIMER_GUARD(get_from_output_files_time);
super_version->current->Get(read_options, lkey, value, &s, &merge_context, super_version->current->Get(read_options, lkey, pSlice, &s, &merge_context,
&range_del_agg); &range_del_agg);
} }
return s; return s;

@ -22,7 +22,7 @@ class DBImplReadOnly : public DBImpl {
using DB::Get; using DB::Get;
virtual Status Get(const ReadOptions& options, virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) override; PinnableSlice* value) override;
// TODO: Implement ReadOnly MultiGet? // TODO: Implement ReadOnly MultiGet?

@ -2691,7 +2691,7 @@ class ModelDB : public DB {
} }
using DB::Get; using DB::Get;
virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* cf, virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* cf,
const Slice& key, std::string* value) override { const Slice& key, PinnableSlice* pSlice) override {
return Status::NotSupported(key); return Status::NotSupported(key);
} }

@ -520,7 +520,7 @@ struct Saver {
const LookupKey* key; const LookupKey* key;
bool* found_final_value; // Is value set correctly? Used by KeyMayExist bool* found_final_value; // Is value set correctly? Used by KeyMayExist
bool* merge_in_progress; bool* merge_in_progress;
std::string* value; PinnableSlice* pSlice;
SequenceNumber seq; SequenceNumber seq;
const MergeOperator* merge_operator; const MergeOperator* merge_operator;
// the merge operations encountered; // the merge operations encountered;
@ -534,6 +534,10 @@ struct Saver {
}; };
} // namespace } // namespace
//static void UnrefMemTable(void* s, void*) {
// reinterpret_cast<MemTable*>(s)->Unref();
//}
static bool SaveValue(void* arg, const char* entry) { static bool SaveValue(void* arg, const char* entry) {
Saver* s = reinterpret_cast<Saver*>(arg); Saver* s = reinterpret_cast<Saver*>(arg);
MergeContext* merge_context = s->merge_context; MergeContext* merge_context = s->merge_context;
@ -571,13 +575,18 @@ static bool SaveValue(void* arg, const char* entry) {
} }
Slice v = GetLengthPrefixedSlice(key_ptr + key_length); Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
*(s->status) = Status::OK(); *(s->status) = Status::OK();
if (*(s->merge_in_progress)) { if (LIKELY(s->pSlice != nullptr)) {
*(s->status) = MergeHelper::TimedFullMerge( if (*(s->merge_in_progress)) {
merge_operator, s->key->user_key(), &v, *(s->status) = MergeHelper::TimedFullMerge(
merge_context->GetOperands(), s->value, s->logger, s->statistics, merge_operator, s->key->user_key(), &v,
s->env_); merge_context->GetOperands(), s->pSlice->GetSelf(), s->logger,
} else if (s->value != nullptr) { s->statistics, s->env_);
s->value->assign(v.data(), v.size()); s->pSlice->PinSelf();
} else {
//s->mem->Ref();
//s->pSlice->PinSlice(v, UnrefMemTable, s->mem, nullptr);
s->pSlice->PinSelf(v);
}
} }
if (s->inplace_update_support) { if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadUnlock(); s->mem->GetLock(s->key->user_key())->ReadUnlock();
@ -589,10 +598,14 @@ static bool SaveValue(void* arg, const char* entry) {
case kTypeSingleDeletion: case kTypeSingleDeletion:
case kTypeRangeDeletion: { case kTypeRangeDeletion: {
if (*(s->merge_in_progress)) { if (*(s->merge_in_progress)) {
*(s->status) = MergeHelper::TimedFullMerge( *(s->status) = Status::OK();
merge_operator, s->key->user_key(), nullptr, if (s->pSlice != nullptr) {
merge_context->GetOperands(), s->value, s->logger, s->statistics, *(s->status) = MergeHelper::TimedFullMerge(
s->env_); merge_operator, s->key->user_key(), nullptr,
merge_context->GetOperands(), s->pSlice->GetSelf(), s->logger,
s->statistics, s->env_);
s->pSlice->PinSelf();
}
} else { } else {
*(s->status) = Status::NotFound(); *(s->status) = Status::NotFound();
} }
@ -626,7 +639,7 @@ static bool SaveValue(void* arg, const char* entry) {
return false; return false;
} }
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, bool MemTable::Get(const LookupKey& key, PinnableSlice* pSlice, Status* s,
MergeContext* merge_context, MergeContext* merge_context,
RangeDelAggregator* range_del_agg, SequenceNumber* seq, RangeDelAggregator* range_del_agg, SequenceNumber* seq,
const ReadOptions& read_opts) { const ReadOptions& read_opts) {
@ -664,7 +677,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
saver.found_final_value = &found_final_value; saver.found_final_value = &found_final_value;
saver.merge_in_progress = &merge_in_progress; saver.merge_in_progress = &merge_in_progress;
saver.key = &key; saver.key = &key;
saver.value = value; saver.pSlice = pSlice;
saver.seq = kMaxSequenceNumber; saver.seq = kMaxSequenceNumber;
saver.mem = this; saver.mem = this;
saver.merge_context = merge_context; saver.merge_context = merge_context;

@ -186,17 +186,27 @@ class MemTable {
// returned). Otherwise, *seq will be set to kMaxSequenceNumber. // returned). Otherwise, *seq will be set to kMaxSequenceNumber.
// On success, *s may be set to OK, NotFound, or MergeInProgress. Any other // On success, *s may be set to OK, NotFound, or MergeInProgress. Any other
// status returned indicates a corruption or other unexpected error. // status returned indicates a corruption or other unexpected error.
bool Get(const LookupKey& key, std::string* value, Status* s, bool Get(const LookupKey& key, PinnableSlice* pSlice, Status* s,
MergeContext* merge_context, RangeDelAggregator* range_del_agg, MergeContext* merge_context, RangeDelAggregator* range_del_agg,
SequenceNumber* seq, const ReadOptions& read_opts); SequenceNumber* seq, const ReadOptions& read_opts);
bool Get(const LookupKey& key, std::string* value, Status* s, inline bool Get(const LookupKey& key, PinnableSlice* pSlice, Status* s,
MergeContext* merge_context, RangeDelAggregator* range_del_agg, MergeContext* merge_context,
const ReadOptions& read_opts) { RangeDelAggregator* range_del_agg,
const ReadOptions& read_opts) {
SequenceNumber seq; SequenceNumber seq;
return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts); return Get(key, pSlice, s, merge_context, range_del_agg, &seq, read_opts);
} }
// deprecated. Use Get with PinnableSlice
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
SequenceNumber* seq, const ReadOptions& read_opts);
// deprecated. Use Get with PinnableSlice
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
const ReadOptions& read_opts);
// Attempts to update the new_value inplace, else does normal Add // Attempts to update the new_value inplace, else does normal Add
// Pseudocode // Pseudocode
// if key exists in current memtable && prev_value is of type kTypeValue // if key exists in current memtable && prev_value is of type kTypeValue

@ -100,12 +100,12 @@ int MemTableList::NumFlushed() const {
// Search all the memtables starting from the most recent one. // Search all the memtables starting from the most recent one.
// Return the most recent value found, if any. // Return the most recent value found, if any.
// Operands stores the list of merge operations to apply, so far. // Operands stores the list of merge operations to apply, so far.
bool MemTableListVersion::Get(const LookupKey& key, std::string* value, bool MemTableListVersion::Get(const LookupKey& key, PinnableSlice* pSlice,
Status* s, MergeContext* merge_context, Status* s, MergeContext* merge_context,
RangeDelAggregator* range_del_agg, RangeDelAggregator* range_del_agg,
SequenceNumber* seq, SequenceNumber* seq,
const ReadOptions& read_opts) { const ReadOptions& read_opts) {
return GetFromList(&memlist_, key, value, s, merge_context, range_del_agg, return GetFromList(&memlist_, key, pSlice, s, merge_context, range_del_agg,
seq, read_opts); seq, read_opts);
} }
@ -115,22 +115,26 @@ bool MemTableListVersion::GetFromHistory(const LookupKey& key,
RangeDelAggregator* range_del_agg, RangeDelAggregator* range_del_agg,
SequenceNumber* seq, SequenceNumber* seq,
const ReadOptions& read_opts) { const ReadOptions& read_opts) {
return GetFromList(&memlist_history_, key, value, s, merge_context, PinnableSlice pSlice;
range_del_agg, seq, read_opts); PinnableSlice* pSlicePtr = value != nullptr ? &pSlice : nullptr;
auto res = GetFromList(&memlist_history_, key, pSlicePtr, s, merge_context,
range_del_agg, seq, read_opts);
if (value != nullptr) {
value->assign(pSlice.data(), pSlice.size());
}
return res;
} }
bool MemTableListVersion::GetFromList(std::list<MemTable*>* list, bool MemTableListVersion::GetFromList(
const LookupKey& key, std::string* value, std::list<MemTable*>* list, const LookupKey& key, PinnableSlice* pSlice,
Status* s, MergeContext* merge_context, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg,
RangeDelAggregator* range_del_agg, SequenceNumber* seq, const ReadOptions& read_opts) {
SequenceNumber* seq,
const ReadOptions& read_opts) {
*seq = kMaxSequenceNumber; *seq = kMaxSequenceNumber;
for (auto& memtable : *list) { for (auto& memtable : *list) {
SequenceNumber current_seq = kMaxSequenceNumber; SequenceNumber current_seq = kMaxSequenceNumber;
bool done = memtable->Get(key, value, s, merge_context, range_del_agg, bool done = memtable->Get(key, pSlice, s, merge_context, range_del_agg,
&current_seq, read_opts); &current_seq, read_opts);
if (*seq == kMaxSequenceNumber) { if (*seq == kMaxSequenceNumber) {
// Store the most recent sequence number of any operation on this key. // Store the most recent sequence number of any operation on this key.

@ -53,17 +53,27 @@ class MemTableListVersion {
// If any operation was found for this key, its most recent sequence number // If any operation was found for this key, its most recent sequence number
// will be stored in *seq on success (regardless of whether true/false is // will be stored in *seq on success (regardless of whether true/false is
// returned). Otherwise, *seq will be set to kMaxSequenceNumber. // returned). Otherwise, *seq will be set to kMaxSequenceNumber.
bool Get(const LookupKey& key, std::string* value, Status* s, bool Get(const LookupKey& key, PinnableSlice* pSlice, Status* s,
MergeContext* merge_context, RangeDelAggregator* range_del_agg, MergeContext* merge_context, RangeDelAggregator* range_del_agg,
SequenceNumber* seq, const ReadOptions& read_opts); SequenceNumber* seq, const ReadOptions& read_opts);
bool Get(const LookupKey& key, std::string* value, Status* s, inline bool Get(const LookupKey& key, PinnableSlice* pSlice, Status* s,
MergeContext* merge_context, RangeDelAggregator* range_del_agg, MergeContext* merge_context,
const ReadOptions& read_opts) { RangeDelAggregator* range_del_agg,
const ReadOptions& read_opts) {
SequenceNumber seq; SequenceNumber seq;
return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts); return Get(key, pSlice, s, merge_context, range_del_agg, &seq, read_opts);
} }
// deprecated. Use Get with PinnableSlice
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
SequenceNumber* seq, const ReadOptions& read_opts);
// deprecated. Use Get with PinnableSlice
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
const ReadOptions& read_opts);
// Similar to Get(), but searches the Memtable history of memtables that // Similar to Get(), but searches the Memtable history of memtables that
// have already been flushed. Should only be used from in-memory only // have already been flushed. Should only be used from in-memory only
// queries (such as Transaction validation) as the history may contain // queries (such as Transaction validation) as the history may contain
@ -72,10 +82,10 @@ class MemTableListVersion {
MergeContext* merge_context, MergeContext* merge_context,
RangeDelAggregator* range_del_agg, SequenceNumber* seq, RangeDelAggregator* range_del_agg, SequenceNumber* seq,
const ReadOptions& read_opts); const ReadOptions& read_opts);
bool GetFromHistory(const LookupKey& key, std::string* value, Status* s, inline bool GetFromHistory(const LookupKey& key, std::string* value,
MergeContext* merge_context, Status* s, MergeContext* merge_context,
RangeDelAggregator* range_del_agg, RangeDelAggregator* range_del_agg,
const ReadOptions& read_opts) { const ReadOptions& read_opts) {
SequenceNumber seq; SequenceNumber seq;
return GetFromHistory(key, value, s, merge_context, range_del_agg, &seq, return GetFromHistory(key, value, s, merge_context, range_del_agg, &seq,
read_opts); read_opts);
@ -112,7 +122,8 @@ class MemTableListVersion {
void TrimHistory(autovector<MemTable*>* to_delete); void TrimHistory(autovector<MemTable*>* to_delete);
bool GetFromList(std::list<MemTable*>* list, const LookupKey& key, bool GetFromList(std::list<MemTable*>* list, const LookupKey& key,
std::string* value, Status* s, MergeContext* merge_context, PinnableSlice* pSlice, Status* s,
MergeContext* merge_context,
RangeDelAggregator* range_del_agg, SequenceNumber* seq, RangeDelAggregator* range_del_agg, SequenceNumber* seq,
const ReadOptions& read_opts); const ReadOptions& read_opts);

@ -928,7 +928,7 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
version_number_(version_number) {} version_number_(version_number) {}
void Version::Get(const ReadOptions& read_options, const LookupKey& k, void Version::Get(const ReadOptions& read_options, const LookupKey& k,
std::string* value, Status* status, PinnableSlice* pSlice, Status* status,
MergeContext* merge_context, MergeContext* merge_context,
RangeDelAggregator* range_del_agg, bool* value_found, RangeDelAggregator* range_del_agg, bool* value_found,
bool* key_exists, SequenceNumber* seq) { bool* key_exists, SequenceNumber* seq) {
@ -946,7 +946,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
GetContext get_context( GetContext get_context(
user_comparator(), merge_operator_, info_log_, db_statistics_, user_comparator(), merge_operator_, info_log_, db_statistics_,
status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key, status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
value, value_found, merge_context, range_del_agg, this->env_, seq, pSlice, value_found, merge_context, range_del_agg, this->env_, seq,
merge_operator_ ? &pinned_iters_mgr : nullptr); merge_operator_ ? &pinned_iters_mgr : nullptr);
// Pin blocks that we read to hold merge operands // Pin blocks that we read to hold merge operands
@ -1005,9 +1005,13 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
} }
// merge_operands are in saver and we hit the beginning of the key history // merge_operands are in saver and we hit the beginning of the key history
// do a final merge of nullptr and operands; // do a final merge of nullptr and operands;
*status = MergeHelper::TimedFullMerge(merge_operator_, user_key, nullptr, std::string* str_value = pSlice != nullptr ? pSlice->GetSelf() : nullptr;
merge_context->GetOperands(), value, *status = MergeHelper::TimedFullMerge(
info_log_, db_statistics_, env_); merge_operator_, user_key, nullptr, merge_context->GetOperands(),
str_value, info_log_, db_statistics_, env_);
if (LIKELY(pSlice != nullptr)) {
pSlice->PinSelf();
}
} else { } else {
if (key_exists != nullptr) { if (key_exists != nullptr) {
*key_exists = false; *key_exists = false;

@ -457,7 +457,7 @@ class Version {
// for the key if a key was found. // for the key if a key was found.
// //
// REQUIRES: lock is not held // REQUIRES: lock is not held
void Get(const ReadOptions&, const LookupKey& key, std::string* val, void Get(const ReadOptions&, const LookupKey& key, PinnableSlice* pSlice,
Status* status, MergeContext* merge_context, Status* status, MergeContext* merge_context,
RangeDelAggregator* range_del_agg, bool* value_found = nullptr, RangeDelAggregator* range_del_agg, bool* value_found = nullptr,
bool* key_exists = nullptr, SequenceNumber* seq = nullptr); bool* key_exists = nullptr, SequenceNumber* seq = nullptr);

@ -0,0 +1,73 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// An iterator yields a sequence of key/value pairs from a source.
// The following class defines the interface. Multiple implementations
// are provided by this library. In particular, iterators are provided
// to access the contents of a Table or a DB.
//
// Multiple threads can invoke const methods on an Iterator without
// external synchronization, but if any of the threads may call a
// non-const method, all threads accessing the same Iterator must use
// external synchronization.
#ifndef INCLUDE_ROCKSDB_CLEANABLE_H_
#define INCLUDE_ROCKSDB_CLEANABLE_H_
namespace rocksdb {
// Holds a chain of cleanup callbacks (function plus two opaque arguments)
// and runs them on demand. The first entry is stored inline in `cleanup_`;
// any further entries are linked through `Cleanup::next`.
//
// NOTE(review): a destructor is declared but no copy operations are, so the
// compiler-generated copies would duplicate `cleanup_` (including its raw
// `next` pointer). Confirm that no caller copies a Cleanable.
class Cleanable {
 public:
  Cleanable();
  ~Cleanable();

  // Signature of a cleanup callback: it receives the two opaque arguments
  // that accompany it in the registration.
  using CleanupFunction = void (*)(void* arg1, void* arg2);

  // Registers a function/arg1/arg2 triple that will be invoked when this
  // object is destroyed.
  //
  // This method is not abstract, so clients should not override it.
  void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2);

  // Defined outside this header; presumably hands the cleanups registered on
  // this object over to `other` -- TODO confirm against the implementation.
  void DelegateCleanupsTo(Cleanable* other);

  // Runs DoCleanup() and then clears the head entry's `function` and `next`
  // pointers so that the object can be reused. `arg1`/`arg2` of the head
  // entry are left as they were.
  inline void Reset() {
    DoCleanup();
    cleanup_.next = nullptr;
    cleanup_.function = nullptr;
  }

 protected:
  // One entry of the cleanup chain.
  struct Cleanup {
    CleanupFunction function;
    void* arg1;
    void* arg2;
    Cleanup* next;
  };

  // Head of the chain, stored by value inside the object.
  Cleanup cleanup_;

  // Links an already-built entry into this object, which also becomes the
  // owner of `c`.
  void RegisterCleanup(Cleanup* c);

 private:
  // Invokes every cleanup in the chain and deletes the entries that follow
  // the inline head. It does NOT reset the head's pointers afterwards, which
  // is why it is kept private: use Reset() to avoid misuse.
  inline void DoCleanup() {
    // When the head function is null the chain is not walked at all.
    if (cleanup_.function == nullptr) {
      return;
    }
    (*cleanup_.function)(cleanup_.arg1, cleanup_.arg2);
    Cleanup* node = cleanup_.next;
    while (node != nullptr) {
      (*node->function)(node->arg1, node->arg2);
      // Read the successor before the current entry is freed.
      Cleanup* following = node->next;
      delete node;
      node = following;
    }
  }
};
} // namespace rocksdb
#endif // INCLUDE_ROCKSDB_CLEANABLE_H_

@ -280,9 +280,20 @@ class DB {
// a status for which Status::IsNotFound() returns true. // a status for which Status::IsNotFound() returns true.
// //
// May return some other Status on an error. // May return some other Status on an error.
// Convenience overload that hands the value back as a copy in *value.
// The lookup itself is done by the PinnableSlice overload; its status is
// returned unchanged. *value is only written when that status is ok().
// A null `value` is forwarded as a null PinnableSlice.
inline Status Get(const ReadOptions& options,
                  ColumnFamilyHandle* column_family, const Slice& key,
                  std::string* value) {
  if (value == nullptr) {
    // The caller does not want the value: nothing to copy afterwards.
    return Get(options, column_family, key,
               static_cast<PinnableSlice*>(nullptr));
  }
  PinnableSlice pinned;
  auto status = Get(options, column_family, key, &pinned);
  if (status.ok()) {
    // Copy out of the pinned memory before `pinned` goes out of scope.
    value->assign(pinned.data(), pinned.size());
  }
  return status;
}
virtual Status Get(const ReadOptions& options, virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) = 0; PinnableSlice* value) = 0;
virtual Status Get(const ReadOptions& options, const Slice& key, std::string* value) { virtual Status Get(const ReadOptions& options, const Slice& key, std::string* value) {
return Get(options, DefaultColumnFamily(), key, value); return Get(options, DefaultColumnFamily(), key, value);
} }

@ -20,43 +20,12 @@
#define STORAGE_ROCKSDB_INCLUDE_ITERATOR_H_ #define STORAGE_ROCKSDB_INCLUDE_ITERATOR_H_
#include <string> #include <string>
#include "rocksdb/cleanable.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
namespace rocksdb { namespace rocksdb {
// Holds cleanup callbacks that are run when the object is destroyed or
// Reset() is called.
class Cleanable {
 public:
  Cleanable();
  ~Cleanable();

  // Signature of a cleanup callback; it receives the two opaque arguments
  // that were supplied at registration time.
  using CleanupFunction = void (*)(void* arg1, void* arg2);

  // Registers a function/arg1/arg2 triple to be invoked when this object
  // runs its cleanups. This method is not virtual; subclasses should not
  // try to override it.
  void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2);

  // Hands the cleanups registered on this object over to `other`.
  void DelegateCleanupsTo(Cleanable* other);

  // DoCleanup and also resets the pointers for reuse
  void Reset();

 protected:
  // One registered callback plus the link to the next entry in the chain.
  struct Cleanup {
    CleanupFunction function;
    void* arg1;
    void* arg2;
    Cleanup* next;
  };

  // Head of the chain of registered cleanups.
  Cleanup cleanup_;

  // Registers an already-built entry. This object becomes the owner of c.
  void RegisterCleanup(Cleanup* c);

 private:
  // Performs all the cleanups. It does not reset the pointers, so it is kept
  // private to prevent misuse.
  inline void DoCleanup();
};
class Iterator : public Cleanable { class Iterator : public Cleanable {
public: public:
Iterator() {} Iterator() {}

@ -25,6 +25,8 @@
#include <string.h> #include <string.h>
#include <string> #include <string>
#include "rocksdb/cleanable.h"
namespace rocksdb { namespace rocksdb {
class Slice { class Slice {
@ -116,6 +118,61 @@ class Slice {
// Intentionally copyable // Intentionally copyable
}; };
// A Slice that can keep the memory it points at alive. The memory is either
// "pinned" (a cleanup is registered through Cleanable and runs when this
// object is destroyed or Reset()), or it lives in the object's own string
// buffer (PinSelf), in which case no cleanup is registered.
//
// NOTE(review): copying is not disabled here; a copy would carry the same
// registered cleanups as the original. Avoid copying a pinned instance.
class PinnableSlice : public Slice, public Cleanable {
 public:
  PinnableSlice() {}

  // Points at s and registers f(arg1, arg2) as the cleanup that releases
  // whatever keeps s valid.
  inline void PinSlice(const Slice& s, CleanupFunction f, void* arg1,
                       void* arg2) {
    data_ = s.data();
    size_ = s.size();
    RegisterCleanup(f, arg1, arg2);
  }

  // Points at s and takes over the cleanups currently registered on
  // `cleanable`.
  inline void PinSlice(const Slice& s, Cleanable* cleanable) {
    data_ = s.data();
    size_ = s.size();
    cleanable->DelegateCleanupsTo(this);
  }

  // Takes ownership of the n-byte heap buffer s. The buffer is released with
  // delete[], so it must have been allocated with new char[...].
  inline void PinHeap(const char* s, const size_t n) {
    data_ = s;
    size_ = n;
    RegisterCleanup(ReleaseCharStrHeap, const_cast<char*>(data_), nullptr);
  }

  // Takes ownership of the heap-allocated string *s (released with delete)
  // and points at its contents.
  inline void PinHeap(std::string* s) {
    data_ = s->data();
    size_ = s->size();
    RegisterCleanup(ReleaseStringHeap, s, nullptr);
  }

  // Copies `slice` into the internal buffer and points at that copy.
  inline void PinSelf(const Slice& slice) {
    self_space.assign(slice.data(), slice.size());
    data_ = self_space.data();
    size_ = self_space.size();
  }

  // Points at the current contents of the internal buffer; use after filling
  // the buffer through GetSelf().
  inline void PinSelf() {
    data_ = self_space.data();
    size_ = self_space.size();
  }

  // Gives access to the internal buffer so that it can be filled in place.
  inline std::string* GetSelf() { return &self_space; }

  // True when a cleanup is registered, i.e. external memory is pinned.
  // Self-pinned content does not count.
  inline bool IsPinned() const { return cleanup_.function != nullptr; }

 private:
  friend class PinnableSlice4Test;

  // Backing storage for PinSelf().
  std::string self_space;

  // Cleanup for PinHeap(const char*, size_t): the buffer is an array, so it
  // has to be released with the array form of delete.
  static void ReleaseCharStrHeap(void* s, void*) {
    delete[] static_cast<char*>(s);
  }

  // Cleanup for PinHeap(std::string*).
  static void ReleaseStringHeap(void* s, void*) {
    delete static_cast<std::string*>(s);
  }
};
// A set of Slices that are virtually concatenated together. 'parts' points // A set of Slices that are virtually concatenated together. 'parts' points
// to an array of Slices. The number of elements in the array is 'num_parts'. // to an array of Slices. The number of elements in the array is 'num_parts'.
struct SliceParts { struct SliceParts {

@ -56,8 +56,8 @@ class StackableDB : public DB {
using DB::Get; using DB::Get;
virtual Status Get(const ReadOptions& options, virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) override { PinnableSlice* pSlice) override {
return db_->Get(options, column_family, key, value); return db_->Get(options, column_family, key, pSlice);
} }
using DB::MultiGet; using DB::MultiGet;

@ -1537,9 +1537,6 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
BlockIter iiter; BlockIter iiter;
NewIndexIterator(read_options, &iiter); NewIndexIterator(read_options, &iiter);
PinnedIteratorsManager* pinned_iters_mgr = get_context->pinned_iters_mgr();
bool pin_blocks = pinned_iters_mgr && pinned_iters_mgr->PinningEnabled();
bool done = false; bool done = false;
for (iiter.Seek(key); iiter.Valid() && !done; iiter.Next()) { for (iiter.Seek(key); iiter.Valid() && !done; iiter.Next()) {
Slice handle_value = iiter.value(); Slice handle_value = iiter.value();
@ -1580,17 +1577,12 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
s = Status::Corruption(Slice()); s = Status::Corruption(Slice());
} }
if (!get_context->SaveValue(parsed_key, biter.value(), pin_blocks)) { if (!get_context->SaveValue(parsed_key, biter.value(), &biter)) {
done = true; done = true;
break; break;
} }
} }
s = biter.status(); s = biter.status();
if (pin_blocks && get_context->State() == GetContext::kMerge) {
// Pin blocks as long as we are merging
biter.DelegateCleanupsTo(pinned_iters_mgr);
}
} }
} }
if (s.ok()) { if (s.ok()) {

@ -47,6 +47,30 @@ TEST_F(CleanableTest, Register) {
} }
// ~Cleanable // ~Cleanable
ASSERT_EQ(6, res); ASSERT_EQ(6, res);
// Test that Reset does the cleanup
res = 1;
{
Cleanable c1;
c1.RegisterCleanup(Multiplier, &res, &n2); // res = 2;
c1.RegisterCleanup(Multiplier, &res, &n3); // res = 2 * 3;
c1.Reset();
ASSERT_EQ(6, res);
}
// ~Cleanable
ASSERT_EQ(6, res);
// Test that a Cleanable is usable after Reset
res = 1;
{
Cleanable c1;
c1.RegisterCleanup(Multiplier, &res, &n2); // res = 2;
c1.Reset();
ASSERT_EQ(2, res);
c1.RegisterCleanup(Multiplier, &res, &n3); // res = 2 * 3;
}
// ~Cleanable
ASSERT_EQ(6, res);
} }
// the first Cleanup is on stack and the rest on heap, // the first Cleanup is on stack and the rest on heap,
@ -174,6 +198,102 @@ TEST_F(CleanableTest, Delegation) {
ASSERT_EQ(5, res); ASSERT_EQ(5, res);
} }
class PinnableSlice4Test : public PinnableSlice {
public:
void TestCharStrIsRegistered(char* s) {
ASSERT_EQ(cleanup_.function, ReleaseCharStrHeap);
ASSERT_EQ(cleanup_.arg1, s);
ASSERT_EQ(cleanup_.arg2, nullptr);
ASSERT_EQ(cleanup_.next, nullptr);
}
void TestStringIsRegistered(std::string* s) {
ASSERT_EQ(cleanup_.function, ReleaseStringHeap);
ASSERT_EQ(cleanup_.arg1, s);
ASSERT_EQ(cleanup_.arg2, nullptr);
ASSERT_EQ(cleanup_.next, nullptr);
}
};
// Putting the PinnableSlice tests here due to similarity to Cleanable tests
// Exercises every way of giving content to a PinnableSlice: PinHeap (char
// buffer and std::string), PinSlice (explicit cleanup and delegated
// cleanups) and PinSelf (copy and in-place). Each case lives in its own
// scope so that the PinnableSlice destructor runs at the closing brace.
TEST_F(CleanableTest, PinnableSlice) {
int n2 = 2;
int res = 1;
const std::string const_str = "123";
const char* const_s = "123";
// Case 1: PinHeap with a raw char buffer.
// NOTE(review): strdup() memory is malloc-allocated, while the cleanup that
// PinHeap(const char*, size_t) registers releases the buffer with a C++
// delete expression - confirm which allocator PinHeap is meant to accept.
{
PinnableSlice4Test pSlice;
char* s = strdup(const_s);
pSlice.PinHeap(s, strlen(const_s));
std::string str;
str.assign(pSlice.data(), pSlice.size());
ASSERT_EQ(const_s, str);
pSlice.TestCharStrIsRegistered(s);
}
// Case 2: PinHeap with a heap-allocated std::string.
{
PinnableSlice4Test pSlice;
std::string* heap_str = new std::string(const_str);
pSlice.PinHeap(heap_str);
std::string str;
str.assign(pSlice.data(), pSlice.size());
ASSERT_EQ(const_str, str);
pSlice.TestStringIsRegistered(heap_str);
}
// Case 3: PinSlice with an explicit cleanup function; Multiplier must run
// exactly once, when pSlice is destroyed.
{
res = 1;
PinnableSlice4Test pSlice;
Slice slice(const_str);
pSlice.PinSlice(slice, Multiplier, &res, &n2);
std::string str;
str.assign(pSlice.data(), pSlice.size());
ASSERT_EQ(const_str, str);
}
// ~Cleanable
ASSERT_EQ(2, res);
// Case 4: PinSlice taking over the cleanups of another Cleanable; the
// cleanup must not run when c1 dies, only when pSlice does.
{
res = 1;
PinnableSlice4Test pSlice;
Slice slice(const_str);
{
Cleanable c1;
c1.RegisterCleanup(Multiplier, &res, &n2); // res = 2;
pSlice.PinSlice(slice, &c1);
}
// ~Cleanable
ASSERT_EQ(1, res); // cleanups must have been delegated to pSlice
std::string str;
str.assign(pSlice.data(), pSlice.size());
ASSERT_EQ(const_str, str);
}
// ~Cleanable
ASSERT_EQ(2, res);
// Case 5: PinSelf copies the slice into the internal buffer; no cleanup is
// registered, so IsPinned() stays false.
{
PinnableSlice4Test pSlice;
Slice slice(const_str);
pSlice.PinSelf(slice);
std::string str;
str.assign(pSlice.data(), pSlice.size());
ASSERT_EQ(const_str, str);
ASSERT_EQ(false, pSlice.IsPinned()); // self pinned
}
// Case 6: the internal buffer is filled through GetSelf() and then exposed
// with the argument-less PinSelf().
{
PinnableSlice4Test pSlice;
std::string* self_str_ptr = pSlice.GetSelf();
self_str_ptr->assign(const_str);
pSlice.PinSelf();
std::string str;
str.assign(pSlice.data(), pSlice.size());
ASSERT_EQ(const_str, str);
ASSERT_EQ(false, pSlice.IsPinned()); // self pinned
}
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -123,12 +123,12 @@ class CuckooReaderTest : public testing::Test {
ASSERT_OK(reader.status()); ASSERT_OK(reader.status());
// Assume no merge/deletion // Assume no merge/deletion
for (uint32_t i = 0; i < num_items; ++i) { for (uint32_t i = 0; i < num_items; ++i) {
std::string value; PinnableSlice value;
GetContext get_context(ucomp, nullptr, nullptr, nullptr, GetContext get_context(ucomp, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(user_keys[i]), &value, GetContext::kNotFound, Slice(user_keys[i]), &value,
nullptr, nullptr, nullptr, nullptr); nullptr, nullptr, nullptr, nullptr);
ASSERT_OK(reader.Get(ReadOptions(), Slice(keys[i]), &get_context)); ASSERT_OK(reader.Get(ReadOptions(), Slice(keys[i]), &get_context));
ASSERT_EQ(values[i], value); ASSERT_STREQ(values[i].c_str(), value.data());
} }
} }
void UpdateKeys(bool with_zero_seqno) { void UpdateKeys(bool with_zero_seqno) {
@ -333,7 +333,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) {
AddHashLookups(not_found_user_key, 0, kNumHashFunc); AddHashLookups(not_found_user_key, 0, kNumHashFunc);
ParsedInternalKey ikey(not_found_user_key, 1000, kTypeValue); ParsedInternalKey ikey(not_found_user_key, 1000, kTypeValue);
AppendInternalKey(&not_found_key, ikey); AppendInternalKey(&not_found_key, ikey);
std::string value; PinnableSlice value;
GetContext get_context(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound, GetContext get_context(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound,
Slice(not_found_key), &value, nullptr, nullptr, Slice(not_found_key), &value, nullptr, nullptr,
nullptr, nullptr); nullptr, nullptr);
@ -346,6 +346,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) {
ParsedInternalKey ikey2(not_found_user_key2, 1000, kTypeValue); ParsedInternalKey ikey2(not_found_user_key2, 1000, kTypeValue);
std::string not_found_key2; std::string not_found_key2;
AppendInternalKey(&not_found_key2, ikey2); AppendInternalKey(&not_found_key2, ikey2);
value.Reset();
GetContext get_context2(ucmp, nullptr, nullptr, nullptr, GetContext get_context2(ucmp, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(not_found_key2), &value, GetContext::kNotFound, Slice(not_found_key2), &value,
nullptr, nullptr, nullptr, nullptr); nullptr, nullptr, nullptr, nullptr);
@ -360,6 +361,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) {
// Add hash values that map to empty buckets. // Add hash values that map to empty buckets.
AddHashLookups(ExtractUserKey(unused_key).ToString(), AddHashLookups(ExtractUserKey(unused_key).ToString(),
kNumHashFunc, kNumHashFunc); kNumHashFunc, kNumHashFunc);
value.Reset();
GetContext get_context3(ucmp, nullptr, nullptr, nullptr, GetContext get_context3(ucmp, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(unused_key), &value, GetContext::kNotFound, Slice(unused_key), &value,
nullptr, nullptr, nullptr, nullptr); nullptr, nullptr, nullptr, nullptr);
@ -433,12 +435,13 @@ void WriteFile(const std::vector<std::string>& keys,
test::Uint64Comparator(), nullptr); test::Uint64Comparator(), nullptr);
ASSERT_OK(reader.status()); ASSERT_OK(reader.status());
ReadOptions r_options; ReadOptions r_options;
std::string value; PinnableSlice value;
// Assume only the fast path is triggered // Assume only the fast path is triggered
GetContext get_context(nullptr, nullptr, nullptr, nullptr, GetContext get_context(nullptr, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(), &value, nullptr, GetContext::kNotFound, Slice(), &value, nullptr,
nullptr, nullptr, nullptr); nullptr, nullptr, nullptr);
for (uint64_t i = 0; i < num; ++i) { for (uint64_t i = 0; i < num; ++i) {
value.Reset();
value.clear(); value.clear();
ASSERT_OK(reader.Get(r_options, Slice(keys[i]), &get_context)); ASSERT_OK(reader.Get(r_options, Slice(keys[i]), &get_context));
ASSERT_TRUE(Slice(keys[i]) == Slice(&keys[i][0], 4)); ASSERT_TRUE(Slice(keys[i]) == Slice(&keys[i][0], 4));
@ -480,7 +483,7 @@ void ReadKeys(uint64_t num, uint32_t batch_size) {
} }
std::random_shuffle(keys.begin(), keys.end()); std::random_shuffle(keys.begin(), keys.end());
std::string value; PinnableSlice value;
// Assume only the fast path is triggered // Assume only the fast path is triggered
GetContext get_context(nullptr, nullptr, nullptr, nullptr, GetContext get_context(nullptr, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(), &value, nullptr, GetContext::kNotFound, Slice(), &value, nullptr,

@ -35,7 +35,7 @@ void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) {
GetContext::GetContext(const Comparator* ucmp, GetContext::GetContext(const Comparator* ucmp,
const MergeOperator* merge_operator, Logger* logger, const MergeOperator* merge_operator, Logger* logger,
Statistics* statistics, GetState init_state, Statistics* statistics, GetState init_state,
const Slice& user_key, std::string* ret_value, const Slice& user_key, PinnableSlice* pSlice,
bool* value_found, MergeContext* merge_context, bool* value_found, MergeContext* merge_context,
RangeDelAggregator* _range_del_agg, Env* env, RangeDelAggregator* _range_del_agg, Env* env,
SequenceNumber* seq, SequenceNumber* seq,
@ -46,7 +46,7 @@ GetContext::GetContext(const Comparator* ucmp,
statistics_(statistics), statistics_(statistics),
state_(init_state), state_(init_state),
user_key_(user_key), user_key_(user_key),
value_(ret_value), pSlice_(pSlice),
value_found_(value_found), value_found_(value_found),
merge_context_(merge_context), merge_context_(merge_context),
range_del_agg_(_range_del_agg), range_del_agg_(_range_del_agg),
@ -76,13 +76,13 @@ void GetContext::SaveValue(const Slice& value, SequenceNumber seq) {
appendToReplayLog(replay_log_, kTypeValue, value); appendToReplayLog(replay_log_, kTypeValue, value);
state_ = kFound; state_ = kFound;
if (value_ != nullptr) { if (LIKELY(pSlice_ != nullptr)) {
value_->assign(value.data(), value.size()); pSlice_->PinSelf(value);
} }
} }
bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
const Slice& value, bool value_pinned) { const Slice& value, Cleanable* value_pinner) {
assert((state_ != kMerge && parsed_key.type != kTypeMerge) || assert((state_ != kMerge && parsed_key.type != kTypeMerge) ||
merge_context_ != nullptr); merge_context_ != nullptr);
if (ucmp_->Equal(parsed_key.user_key, user_key_)) { if (ucmp_->Equal(parsed_key.user_key, user_key_)) {
@ -106,17 +106,22 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
assert(state_ == kNotFound || state_ == kMerge); assert(state_ == kNotFound || state_ == kMerge);
if (kNotFound == state_) { if (kNotFound == state_) {
state_ = kFound; state_ = kFound;
if (value_ != nullptr) { if (LIKELY(pSlice_ != nullptr)) {
value_->assign(value.data(), value.size()); if (LIKELY(value_pinner != nullptr)) {
pSlice_->PinSlice(value, value_pinner);
} else {
pSlice_->PinSelf(value);
}
} }
} else if (kMerge == state_) { } else if (kMerge == state_) {
assert(merge_operator_ != nullptr); assert(merge_operator_ != nullptr);
state_ = kFound; state_ = kFound;
if (value_ != nullptr) { if (LIKELY(pSlice_ != nullptr)) {
Status merge_status = MergeHelper::TimedFullMerge( Status merge_status = MergeHelper::TimedFullMerge(
merge_operator_, user_key_, &value, merge_operator_, user_key_, &value,
merge_context_->GetOperands(), value_, logger_, statistics_, merge_context_->GetOperands(), pSlice_->GetSelf(), logger_,
env_); statistics_, env_);
pSlice_->PinSelf();
if (!merge_status.ok()) { if (!merge_status.ok()) {
state_ = kCorrupt; state_ = kCorrupt;
} }
@ -134,12 +139,12 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
state_ = kDeleted; state_ = kDeleted;
} else if (kMerge == state_) { } else if (kMerge == state_) {
state_ = kFound; state_ = kFound;
if (value_ != nullptr) { if (LIKELY(pSlice_ != nullptr)) {
Status merge_status = Status merge_status = MergeHelper::TimedFullMerge(
MergeHelper::TimedFullMerge(merge_operator_, user_key_, nullptr, merge_operator_, user_key_, nullptr,
merge_context_->GetOperands(), merge_context_->GetOperands(), pSlice_->GetSelf(), logger_,
value_, logger_, statistics_, env_); statistics_, env_);
pSlice_->PinSelf();
if (!merge_status.ok()) { if (!merge_status.ok()) {
state_ = kCorrupt; state_ = kCorrupt;
} }
@ -150,7 +155,14 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
case kTypeMerge: case kTypeMerge:
assert(state_ == kNotFound || state_ == kMerge); assert(state_ == kNotFound || state_ == kMerge);
state_ = kMerge; state_ = kMerge;
merge_context_->PushOperand(value, value_pinned); // value_pinner is not set from plain_table_reader.cc for example.
if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() &&
value_pinner != nullptr) {
value_pinner->DelegateCleanupsTo(pinned_iters_mgr());
merge_context_->PushOperand(value, true /*value_pinned*/);
} else {
merge_context_->PushOperand(value, false);
}
return true; return true;
default: default:
@ -166,6 +178,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
void replayGetContextLog(const Slice& replay_log, const Slice& user_key, void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
GetContext* get_context) { GetContext* get_context) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
static Cleanable nonToClean;
Slice s = replay_log; Slice s = replay_log;
while (s.size()) { while (s.size()) {
auto type = static_cast<ValueType>(*s.data()); auto type = static_cast<ValueType>(*s.data());
@ -178,7 +191,8 @@ void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
// Since SequenceNumber is not stored and unknown, we will use // Since SequenceNumber is not stored and unknown, we will use
// kMaxSequenceNumber. // kMaxSequenceNumber.
get_context->SaveValue( get_context->SaveValue(
ParsedInternalKey(user_key, kMaxSequenceNumber, type), value, true); ParsedInternalKey(user_key, kMaxSequenceNumber, type), value,
&nonToClean);
} }
#else // ROCKSDB_LITE #else // ROCKSDB_LITE
assert(false); assert(false);

@ -9,6 +9,7 @@
#include "db/range_del_aggregator.h" #include "db/range_del_aggregator.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/types.h" #include "rocksdb/types.h"
#include "table/block.h"
namespace rocksdb { namespace rocksdb {
class MergeContext; class MergeContext;
@ -26,7 +27,7 @@ class GetContext {
GetContext(const Comparator* ucmp, const MergeOperator* merge_operator, GetContext(const Comparator* ucmp, const MergeOperator* merge_operator,
Logger* logger, Statistics* statistics, GetState init_state, Logger* logger, Statistics* statistics, GetState init_state,
const Slice& user_key, std::string* ret_value, bool* value_found, const Slice& user_key, PinnableSlice* pSlice, bool* value_found,
MergeContext* merge_context, RangeDelAggregator* range_del_agg, MergeContext* merge_context, RangeDelAggregator* range_del_agg,
Env* env, SequenceNumber* seq = nullptr, Env* env, SequenceNumber* seq = nullptr,
PinnedIteratorsManager* _pinned_iters_mgr = nullptr); PinnedIteratorsManager* _pinned_iters_mgr = nullptr);
@ -39,7 +40,7 @@ class GetContext {
// Returns True if more keys need to be read (due to merges) or // Returns True if more keys need to be read (due to merges) or
// False if the complete value has been found. // False if the complete value has been found.
bool SaveValue(const ParsedInternalKey& parsed_key, const Slice& value, bool SaveValue(const ParsedInternalKey& parsed_key, const Slice& value,
bool value_pinned = false); Cleanable* value_pinner = nullptr);
// Simplified version of the previous function. Should only be used when we // Simplified version of the previous function. Should only be used when we
// know that the operation is a Put. // know that the operation is a Put.
@ -68,7 +69,7 @@ class GetContext {
GetState state_; GetState state_;
Slice user_key_; Slice user_key_;
std::string* value_; PinnableSlice* pSlice_;
bool* value_found_; // Is value set correctly? Used by KeyMayExist bool* value_found_; // Is value set correctly? Used by KeyMayExist
MergeContext* merge_context_; MergeContext* merge_context_;
RangeDelAggregator* range_del_agg_; RangeDelAggregator* range_del_agg_;

@ -21,24 +21,6 @@ Cleanable::Cleanable() {
Cleanable::~Cleanable() { DoCleanup(); } Cleanable::~Cleanable() { DoCleanup(); }
// Runs all registered cleanups, then clears the head entry so that the
// object can accept new registrations.
void Cleanable::Reset() {
  DoCleanup();
  // DoCleanup leaves the pointers untouched; clear them here. The two
  // stores are independent of each other.
  cleanup_.next = nullptr;
  cleanup_.function = nullptr;
}
void Cleanable::DoCleanup() {
if (cleanup_.function != nullptr) {
(*cleanup_.function)(cleanup_.arg1, cleanup_.arg2);
for (Cleanup* c = cleanup_.next; c != nullptr;) {
(*c->function)(c->arg1, c->arg2);
Cleanup* next = c->next;
delete c;
c = next;
}
}
}
// If the entire linked list was on heap we could have simply add attach one // If the entire linked list was on heap we could have simply add attach one
// link list to another. However the head is an embeded object to avoid the cost // link list to another. However the head is an embeded object to avoid the cost
// of creating objects for most of the use cases when the Cleanable has only one // of creating objects for most of the use cases when the Cleanable has only one

@ -166,7 +166,7 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options,
std::string key = MakeKey(r1, r2, through_db); std::string key = MakeKey(r1, r2, through_db);
uint64_t start_time = Now(env, measured_by_nanosecond); uint64_t start_time = Now(env, measured_by_nanosecond);
if (!through_db) { if (!through_db) {
std::string value; PinnableSlice value;
MergeContext merge_context; MergeContext merge_context;
RangeDelAggregator range_del_agg(ikc, {} /* snapshots */); RangeDelAggregator range_del_agg(ikc, {} /* snapshots */);
GetContext get_context(ioptions.user_comparator, GetContext get_context(ioptions.user_comparator,

@ -1968,12 +1968,12 @@ TEST_F(BlockBasedTableTest, FilterBlockInBlockCache) {
ASSERT_OK(c3.Reopen(ioptions4)); ASSERT_OK(c3.Reopen(ioptions4));
reader = dynamic_cast<BlockBasedTable*>(c3.GetTableReader()); reader = dynamic_cast<BlockBasedTable*>(c3.GetTableReader());
ASSERT_TRUE(!reader->TEST_filter_block_preloaded()); ASSERT_TRUE(!reader->TEST_filter_block_preloaded());
std::string value; PinnableSlice value;
GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
GetContext::kNotFound, user_key, &value, nullptr, GetContext::kNotFound, user_key, &value, nullptr,
nullptr, nullptr, nullptr); nullptr, nullptr, nullptr);
ASSERT_OK(reader->Get(ReadOptions(), user_key, &get_context)); ASSERT_OK(reader->Get(ReadOptions(), user_key, &get_context));
ASSERT_EQ(value, "hello"); ASSERT_STREQ(value.data(), "hello");
BlockCachePropertiesSnapshot props(options.statistics.get()); BlockCachePropertiesSnapshot props(options.statistics.get());
props.AssertFilterBlockStat(0, 0); props.AssertFilterBlockStat(0, 0);
c3.ResetTableReader(); c3.ResetTableReader();
@ -2051,7 +2051,7 @@ TEST_F(BlockBasedTableTest, BlockReadCountTest) {
c.Finish(options, ioptions, table_options, c.Finish(options, ioptions, table_options,
GetPlainInternalComparator(options.comparator), &keys, &kvmap); GetPlainInternalComparator(options.comparator), &keys, &kvmap);
auto reader = c.GetTableReader(); auto reader = c.GetTableReader();
std::string value; PinnableSlice value;
GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
GetContext::kNotFound, user_key, &value, nullptr, GetContext::kNotFound, user_key, &value, nullptr,
nullptr, nullptr, nullptr); nullptr, nullptr, nullptr);
@ -2065,13 +2065,14 @@ TEST_F(BlockBasedTableTest, BlockReadCountTest) {
ASSERT_EQ(perf_context.block_read_count, 1); ASSERT_EQ(perf_context.block_read_count, 1);
} }
ASSERT_EQ(get_context.State(), GetContext::kFound); ASSERT_EQ(get_context.State(), GetContext::kFound);
ASSERT_EQ(value, "hello"); ASSERT_STREQ(value.data(), "hello");
// Get non-existing key // Get non-existing key
user_key = "does-not-exist"; user_key = "does-not-exist";
internal_key = InternalKey(user_key, 0, kTypeValue); internal_key = InternalKey(user_key, 0, kTypeValue);
encoded_key = internal_key.Encode().ToString(); encoded_key = internal_key.Encode().ToString();
value.Reset();
get_context = GetContext(options.comparator, nullptr, nullptr, nullptr, get_context = GetContext(options.comparator, nullptr, nullptr, nullptr,
GetContext::kNotFound, user_key, &value, nullptr, GetContext::kNotFound, user_key, &value, nullptr,
nullptr, nullptr, nullptr); nullptr, nullptr, nullptr);

@ -229,6 +229,8 @@ DEFINE_bool(reverse_iterator, false,
DEFINE_bool(use_uint64_comparator, false, "use Uint64 user comparator"); DEFINE_bool(use_uint64_comparator, false, "use Uint64 user comparator");
DEFINE_bool(pin_slice, false, "use pinnable slice for point lookup");
DEFINE_int64(batch_size, 1, "Batch size"); DEFINE_int64(batch_size, 1, "Batch size");
static bool ValidateKeySize(const char* flagname, int32_t value) { static bool ValidateKeySize(const char* flagname, int32_t value) {
@ -3772,6 +3774,7 @@ class Benchmark {
std::unique_ptr<const char[]> key_guard; std::unique_ptr<const char[]> key_guard;
Slice key = AllocateKey(&key_guard); Slice key = AllocateKey(&key_guard);
std::string value; std::string value;
PinnableSlice pSlice;
Duration duration(FLAGS_duration, reads_); Duration duration(FLAGS_duration, reads_);
while (!duration.Done(1)) { while (!duration.Done(1)) {
@ -3787,11 +3790,19 @@ class Benchmark {
s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key, s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key,
&value); &value);
} else { } else {
s = db_with_cfh->db->Get(options, key, &value); if (FLAGS_pin_slice == 1) {
pSlice.Reset();
s = db_with_cfh->db->Get(
options, db_with_cfh->db->DefaultColumnFamily(), key, &pSlice);
} else {
s = db_with_cfh->db->Get(
options, db_with_cfh->db->DefaultColumnFamily(), key, &value);
}
} }
if (s.ok()) { if (s.ok()) {
found++; found++;
bytes += key.size() + value.size(); bytes +=
key.size() + (FLAGS_pin_slice == 1 ? pSlice.size() : value.size());
} else if (!s.IsNotFound()) { } else if (!s.IsNotFound()) {
fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str()); fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
abort(); abort();

@ -12,10 +12,59 @@
#include <cctype> #include <cctype>
#include <sstream> #include <sstream>
#include "db/memtable_list.h"
#include "port/port.h" #include "port/port.h"
#include "util/file_reader_writer.h" #include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {
// These methods are deprecated and only being used in tests for backward
// compatibility
bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
Status* s, MergeContext* merge_context,
RangeDelAggregator* range_del_agg,
SequenceNumber* seq,
const ReadOptions& read_opts) {
PinnableSlice pSlice;
PinnableSlice* pSlicePtr = value != nullptr ? &pSlice : nullptr;
auto res =
Get(key, pSlicePtr, s, merge_context, range_del_agg, seq, read_opts);
if (value != nullptr) {
value->assign(pSlice.data(), pSlice.size());
}
return res;
}
bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
Status* s, MergeContext* merge_context,
RangeDelAggregator* range_del_agg,
const ReadOptions& read_opts) {
SequenceNumber seq;
return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts);
}
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context,
RangeDelAggregator* range_del_agg, SequenceNumber* seq,
const ReadOptions& read_opts) {
PinnableSlice pSlice;
PinnableSlice* pSlicePtr = value != nullptr ? &pSlice : nullptr;
auto res =
Get(key, pSlicePtr, s, merge_context, range_del_agg, seq, read_opts);
if (value != nullptr) {
value->assign(pSlice.data(), pSlice.size());
}
return res;
}
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context,
RangeDelAggregator* range_del_agg,
const ReadOptions& read_opts) {
SequenceNumber seq;
return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts);
}
namespace test { namespace test {
Slice RandomString(Random* rnd, int len, std::string* dst) { Slice RandomString(Random* rnd, int len, std::string* dst) {

@ -1039,7 +1039,7 @@ class DocumentDBImpl : public DocumentDB {
// RocksDB functions // RocksDB functions
virtual Status Get(const ReadOptions& options, virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) override { PinnableSlice* pSlice) override {
return Status::NotSupported(""); return Status::NotSupported("");
} }
virtual Status Get(const ReadOptions& options, const Slice& key, virtual Status Get(const ReadOptions& options, const Slice& key,

@ -170,6 +170,17 @@ bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl, Env* env) {
return (timestamp_value + ttl) < curtime; return (timestamp_value + ttl) < curtime;
} }
// Removes the trailing timestamp (the last kTSLength bytes) from *slice.
// Only the slice's view is shortened. Returns Corruption when the slice is
// too short to contain a timestamp, otherwise an OK status.
Status DBWithTTLImpl::StripTS(Slice* slice) {
  const bool holds_timestamp = slice->size() >= kTSLength;
  if (!holds_timestamp) {
    return Status::Corruption("Bad timestamp in key-value");
  }
  // Drop the bytes which hold the TS
  slice->remove_suffix(kTSLength);
  return Status();
}
// Strips the TS from the end of the string // Strips the TS from the end of the string
Status DBWithTTLImpl::StripTS(std::string* str) { Status DBWithTTLImpl::StripTS(std::string* str) {
Status st; Status st;
@ -191,16 +202,16 @@ Status DBWithTTLImpl::Put(const WriteOptions& options,
Status DBWithTTLImpl::Get(const ReadOptions& options, Status DBWithTTLImpl::Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) { PinnableSlice* pSlice) {
Status st = db_->Get(options, column_family, key, value); Status st = db_->Get(options, column_family, key, pSlice);
if (!st.ok()) { if (!st.ok()) {
return st; return st;
} }
st = SanityCheckTimestamp(*value); st = SanityCheckTimestamp(*pSlice);
if (!st.ok()) { if (!st.ok()) {
return st; return st;
} }
return StripTS(value); return StripTS(pSlice);
} }
std::vector<Status> DBWithTTLImpl::MultiGet( std::vector<Status> DBWithTTLImpl::MultiGet(

@ -51,7 +51,7 @@ class DBWithTTLImpl : public DBWithTTL {
using StackableDB::Get; using StackableDB::Get;
virtual Status Get(const ReadOptions& options, virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) override; PinnableSlice* pSlice) override;
using StackableDB::MultiGet; using StackableDB::MultiGet;
virtual std::vector<Status> MultiGet( virtual std::vector<Status> MultiGet(
@ -87,6 +87,8 @@ class DBWithTTLImpl : public DBWithTTL {
static Status StripTS(std::string* str); static Status StripTS(std::string* str);
static Status StripTS(Slice* str);
static const uint32_t kTSLength = sizeof(int32_t); // size of timestamp static const uint32_t kTSLength = sizeof(int32_t); // size of timestamp
static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8 static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8

Loading…
Cancel
Save