Add wide-column support to iterators (#10670)

Summary:
The patch extends the iterator API with a new `columns` method which
can be used to retrieve all wide columns for the current key. Similarly to
the `Get` and `GetEntity` APIs, the classic `value` API returns the value
of the default (anonymous) column for wide-column entities, and `columns`
returns an entity with a single default column for plain old key-values.
(The goal here is to maintain the invariant that `value()` is the same as
the value of the default column in `columns()`.) The patch also involves a
smaller refactoring. Historically, `value()` was implemented using a series
of conditions: the `Slice` to be returned was decided at call time, based on
the direction of the iteration, whether a merge had been performed, and so
on. With the patch, the value to be exposed is stored in a member
`Slice value_` when the iterator lands on a new key, and `value()` simply
returns this `Slice`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10670

Test Plan: Ran `make check` and a simple blackbox crash test.

Reviewed By: riversand963

Differential Revision: D39475551

Pulled By: ltamasi

fbshipit-source-id: 29e7a6ed9ef340841aab36803b832b7c8f668b0b
main
Levi Tamasi 2 years ago committed by Facebook GitHub Bot
parent f291eefb02
commit 06ab0a8b40
  1. 1
      db/arena_wrapped_db_iter.h
  2. 131
      db/db_iter.cc
  3. 47
      db/db_iter.h
  4. 12
      db/wide/db_wide_basic_test.cc
  5. 2
      db/wide/wide_column_serialization.cc
  6. 22
      include/rocksdb/iterator.h
  7. 3
      include/rocksdb/wide_columns.h

@ -70,6 +70,7 @@ class ArenaWrappedDBIter : public Iterator {
// Each of the accessors below simply forwards the call to the wrapped
// db_iter_ and returns its result unchanged.
void Prev() override { db_iter_->Prev(); }
Slice key() const override { return db_iter_->key(); }
Slice value() const override { return db_iter_->value(); }
const WideColumns& columns() const override { return db_iter_->columns(); }
Status status() const override { return db_iter_->status(); }
Slice timestamp() const override { return db_iter_->timestamp(); }
bool IsBlob() const { return db_iter_->IsBlob(); }

@ -76,7 +76,6 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options,
verify_checksums_(read_options.verify_checksums),
expose_blob_index_(expose_blob_index),
is_blob_(false),
is_wide_(false),
arena_mode_(arena_mode),
db_impl_(db_impl),
cfd_(cfd),
@ -134,7 +133,7 @@ void DBIter::Next() {
// Release temporarily pinned blocks from last operation
ReleaseTempPinnedData();
ResetBlobValue();
ResetWideColumnValue();
ResetValueAndColumns();
local_stats_.skip_count_ += num_internal_keys_skipped_;
local_stats_.skip_count_--;
num_internal_keys_skipped_ = 0;
@ -178,8 +177,6 @@ bool DBIter::SetBlobValueIfNeeded(const Slice& user_key,
const Slice& blob_index) {
assert(!is_blob_);
assert(blob_value_.empty());
assert(!is_wide_);
assert(value_of_default_column_.empty());
if (expose_blob_index_) { // Stacked BlobDB implementation
is_blob_ = true;
@ -215,16 +212,11 @@ bool DBIter::SetBlobValueIfNeeded(const Slice& user_key,
return true;
}
bool DBIter::SetWideColumnValueIfNeeded(const Slice& wide_columns_slice) {
assert(!is_blob_);
assert(blob_value_.empty());
assert(!is_wide_);
assert(value_of_default_column_.empty());
Slice wide_columns_copy = wide_columns_slice;
bool DBIter::SetValueAndColumnsFromEntity(Slice slice) {
assert(value_.empty());
assert(wide_columns_.empty());
const Status s = WideColumnSerialization::GetValueOfDefaultColumn(
wide_columns_copy, value_of_default_column_);
const Status s = WideColumnSerialization::Deserialize(slice, wide_columns_);
if (!s.ok()) {
status_ = s;
@ -232,7 +224,11 @@ bool DBIter::SetWideColumnValueIfNeeded(const Slice& wide_columns_slice) {
return false;
}
is_wide_ = true;
if (!wide_columns_.empty() &&
wide_columns_[0].name() == kDefaultWideColumnName) {
value_ = wide_columns_[0].value();
}
return true;
}
@ -282,11 +278,6 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
// to one.
bool reseek_done = false;
assert(!is_blob_);
assert(blob_value_.empty());
assert(!is_wide_);
assert(value_of_default_column_.empty());
do {
// Will update is_key_seqnum_zero_ as soon as we parsed the current key
// but we need to save the previous value to be used in the loop.
@ -376,35 +367,30 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
case kTypeWideColumnEntity:
if (timestamp_lb_) {
saved_key_.SetInternalKey(ikey_);
if (ikey_.type == kTypeBlobIndex) {
if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
return false;
}
} else if (ikey_.type == kTypeWideColumnEntity) {
if (!SetWideColumnValueIfNeeded(iter_.value())) {
return false;
}
}
valid_ = true;
return true;
} else {
saved_key_.SetUserKey(
ikey_.user_key, !pin_thru_lifetime_ ||
!iter_.iter()->IsKeyPinned() /* copy */);
if (ikey_.type == kTypeBlobIndex) {
if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
return false;
}
} else if (ikey_.type == kTypeWideColumnEntity) {
if (!SetWideColumnValueIfNeeded(iter_.value())) {
return false;
}
}
if (ikey_.type == kTypeBlobIndex) {
if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
return false;
}
valid_ = true;
return true;
SetValueAndColumnsFromPlain(expose_blob_index_ ? iter_.value()
: blob_value_);
} else if (ikey_.type == kTypeWideColumnEntity) {
if (!SetValueAndColumnsFromEntity(iter_.value())) {
return false;
}
} else {
assert(ikey_.type == kTypeValue);
SetValueAndColumnsFromPlain(iter_.value());
}
valid_ = true;
return true;
break;
case kTypeMerge:
saved_key_.SetUserKey(
@ -584,15 +570,12 @@ bool DBIter::MergeValuesNewToOld() {
return false;
}
valid_ = true;
const Slice blob_value = value();
Status s = Merge(&blob_value, ikey.user_key);
Status s = Merge(&blob_value_, ikey.user_key);
if (!s.ok()) {
return false;
}
ResetBlobValue();
assert(!is_wide_);
assert(value_of_default_column_.empty());
// iter_ is positioned after put
iter_.Next();
@ -640,7 +623,7 @@ void DBIter::Prev() {
PERF_CPU_TIMER_GUARD(iter_prev_cpu_nanos, clock_);
ReleaseTempPinnedData();
ResetBlobValue();
ResetWideColumnValue();
ResetValueAndColumns();
ResetInternalKeysSkippedCounter();
bool ok = true;
if (direction_ == kForward) {
@ -957,11 +940,6 @@ bool DBIter::FindValueForCurrentKey() {
Status s;
s.PermitUncheckedError();
assert(!is_blob_);
assert(blob_value_.empty());
assert(!is_wide_);
assert(value_of_default_column_.empty());
switch (last_key_entry_type) {
case kTypeDeletion:
case kTypeDeletionWithTimestamp:
@ -993,15 +971,12 @@ bool DBIter::FindValueForCurrentKey() {
return false;
}
valid_ = true;
const Slice blob_value = value();
s = Merge(&blob_value, saved_key_.GetUserKey());
s = Merge(&blob_value_, saved_key_.GetUserKey());
if (!s.ok()) {
return false;
}
ResetBlobValue();
assert(!is_wide_);
assert(value_of_default_column_.empty());
return true;
} else if (last_not_merge_type == kTypeWideColumnEntity) {
@ -1020,18 +995,24 @@ bool DBIter::FindValueForCurrentKey() {
}
break;
case kTypeValue:
// do nothing - we already have the value in pinned_value_
if (timestamp_lb_ != nullptr) {
saved_key_.SetInternalKey(saved_ikey_);
}
SetValueAndColumnsFromPlain(pinned_value_);
break;
case kTypeBlobIndex:
if (!SetBlobValueIfNeeded(saved_key_.GetUserKey(), pinned_value_)) {
return false;
}
SetValueAndColumnsFromPlain(expose_blob_index_ ? pinned_value_
: blob_value_);
break;
case kTypeWideColumnEntity:
if (!SetWideColumnValueIfNeeded(pinned_value_)) {
if (!SetValueAndColumnsFromEntity(pinned_value_)) {
return false;
}
break;
@ -1078,11 +1059,6 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
// Find the next value that's visible.
ParsedInternalKey ikey;
assert(!is_blob_);
assert(blob_value_.empty());
assert(!is_wide_);
assert(value_of_default_column_.empty());
while (true) {
if (!iter_.Valid()) {
valid_ = false;
@ -1141,10 +1117,16 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
if (!SetBlobValueIfNeeded(ikey.user_key, pinned_value_)) {
return false;
}
} else if (ikey_.type == kTypeWideColumnEntity) {
if (!SetWideColumnValueIfNeeded(pinned_value_)) {
SetValueAndColumnsFromPlain(expose_blob_index_ ? pinned_value_
: blob_value_);
} else if (ikey.type == kTypeWideColumnEntity) {
if (!SetValueAndColumnsFromEntity(pinned_value_)) {
return false;
}
} else {
assert(ikey.type == kTypeValue);
SetValueAndColumnsFromPlain(pinned_value_);
}
if (timestamp_lb_ != nullptr) {
@ -1208,15 +1190,12 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
return false;
}
valid_ = true;
const Slice blob_value = value();
Status s = Merge(&blob_value, saved_key_.GetUserKey());
Status s = Merge(&blob_value_, saved_key_.GetUserKey());
if (!s.ok()) {
return false;
}
ResetBlobValue();
assert(!is_wide_);
assert(value_of_default_column_.empty());
return true;
} else if (ikey.type == kTypeWideColumnEntity) {
@ -1267,6 +1246,10 @@ Status DBIter::Merge(const Slice* val, const Slice& user_key) {
status_ = s;
return s;
}
SetValueAndColumnsFromPlain(pinned_value_.data() ? pinned_value_
: saved_value_);
valid_ = true;
return s;
}
@ -1443,7 +1426,7 @@ void DBIter::Seek(const Slice& target) {
status_ = Status::OK();
ReleaseTempPinnedData();
ResetBlobValue();
ResetWideColumnValue();
ResetValueAndColumns();
ResetInternalKeysSkippedCounter();
// Seek the inner iterator based on the target key.
@ -1520,7 +1503,7 @@ void DBIter::SeekForPrev(const Slice& target) {
status_ = Status::OK();
ReleaseTempPinnedData();
ResetBlobValue();
ResetWideColumnValue();
ResetValueAndColumns();
ResetInternalKeysSkippedCounter();
// Seek the inner iterator based on the target key.
@ -1580,7 +1563,7 @@ void DBIter::SeekToFirst() {
direction_ = kForward;
ReleaseTempPinnedData();
ResetBlobValue();
ResetWideColumnValue();
ResetValueAndColumns();
ResetInternalKeysSkippedCounter();
ClearSavedValue();
is_key_seqnum_zero_ = false;
@ -1628,7 +1611,7 @@ void DBIter::SeekToLast() {
/*b_has_ts=*/false)) {
ReleaseTempPinnedData();
ResetBlobValue();
ResetWideColumnValue();
ResetValueAndColumns();
PrevInternal(nullptr);
k = key();
@ -1651,7 +1634,7 @@ void DBIter::SeekToLast() {
direction_ = kReverse;
ReleaseTempPinnedData();
ResetBlobValue();
ResetWideColumnValue();
ResetValueAndColumns();
ResetInternalKeysSkippedCounter();
ClearSavedValue();
is_key_seqnum_zero_ = false;

@ -17,6 +17,7 @@
#include "options/cf_options.h"
#include "rocksdb/db.h"
#include "rocksdb/iterator.h"
#include "rocksdb/wide_columns.h"
#include "table/iterator_wrapper.h"
#include "util/autovector.h"
@ -159,22 +160,16 @@ class DBIter final : public Iterator {
}
Slice value() const override {
assert(valid_);
assert(!is_blob_ || !is_wide_);
if (!expose_blob_index_ && is_blob_) {
return blob_value_;
} else if (is_wide_) {
return value_of_default_column_;
} else if (current_entry_is_merged_) {
// If pinned_value_ is set then the result of merge operator is one of
// the merge operands and we should return it.
return pinned_value_.data() ? pinned_value_ : saved_value_;
} else if (direction_ == kReverse) {
return pinned_value_;
} else {
return iter_.value();
}
return value_;
}
// Returns the wide columns stored in wide_columns_ for the current entry.
// Asserts that the iterator is valid.
const WideColumns& columns() const override {
assert(valid_);
return wide_columns_;
}
Status status() const override {
if (status_.ok()) {
return iter_.status();
@ -307,11 +302,19 @@ class DBIter final : public Iterator {
blob_value_.Reset();
}
bool SetWideColumnValueIfNeeded(const Slice& wide_columns_slice);
// Exposes a plain (non-entity) value: stores `slice` in value_ and appends it
// to wide_columns_ as a single column named kDefaultWideColumnName, so that
// value_ and the default column hold the same slice. Both members are
// asserted to be empty on entry.
void SetValueAndColumnsFromPlain(const Slice& slice) {
assert(value_.empty());
assert(wide_columns_.empty());
value_ = slice;
wide_columns_.emplace_back(kDefaultWideColumnName, slice);
}
bool SetValueAndColumnsFromEntity(Slice slice);
void ResetWideColumnValue() {
is_wide_ = false;
value_of_default_column_.clear();
// Clears both value_ and wide_columns_, restoring the empty state that
// SetValueAndColumnsFromPlain and SetValueAndColumnsFromEntity assert on
// entry.
void ResetValueAndColumns() {
value_.clear();
wide_columns_.clear();
}
Status Merge(const Slice* val, const Slice& user_key);
@ -338,7 +341,10 @@ class DBIter final : public Iterator {
Slice pinned_value_;
// for prefix seek mode to support prev()
PinnableSlice blob_value_;
Slice value_of_default_column_;
// Value of the default column
Slice value_;
// All columns (i.e. name-value pairs)
WideColumns wide_columns_;
Statistics* statistics_;
uint64_t max_skip_;
uint64_t max_skippable_internal_keys_;
@ -375,7 +381,6 @@ class DBIter final : public Iterator {
// the stacked BlobDB implementation is used, false otherwise.
bool expose_blob_index_;
bool is_blob_;
bool is_wide_;
bool arena_mode_;
// List of operands for merge operator.
MergeContext merge_context_;

@ -38,6 +38,9 @@ TEST_F(DBWideBasicTest, PutEntity) {
constexpr char third_value[] = "baz";
auto verify = [&]() {
const WideColumns expected_third_columns{
{kDefaultWideColumnName, third_value}};
{
PinnableSlice result;
ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), first_key,
@ -78,8 +81,7 @@ TEST_F(DBWideBasicTest, PutEntity) {
ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
third_key, &result));
const WideColumns expected_columns{{kDefaultWideColumnName, third_value}};
ASSERT_EQ(result.columns(), expected_columns);
ASSERT_EQ(result.columns(), expected_third_columns);
}
{
@ -110,18 +112,21 @@ TEST_F(DBWideBasicTest, PutEntity) {
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), first_key);
ASSERT_EQ(iter->value(), first_value_of_default_column);
ASSERT_EQ(iter->columns(), first_columns);
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), second_key);
ASSERT_TRUE(iter->value().empty());
ASSERT_EQ(iter->columns(), second_columns);
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), third_key);
ASSERT_EQ(iter->value(), third_value);
ASSERT_EQ(iter->columns(), expected_third_columns);
iter->Next();
ASSERT_FALSE(iter->Valid());
@ -132,18 +137,21 @@ TEST_F(DBWideBasicTest, PutEntity) {
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), third_key);
ASSERT_EQ(iter->value(), third_value);
ASSERT_EQ(iter->columns(), expected_third_columns);
iter->Prev();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), second_key);
ASSERT_TRUE(iter->value().empty());
ASSERT_EQ(iter->columns(), second_columns);
iter->Prev();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), first_key);
ASSERT_EQ(iter->value(), first_value_of_default_column);
ASSERT_EQ(iter->columns(), first_columns);
iter->Prev();
ASSERT_FALSE(iter->Valid());

@ -17,6 +17,8 @@ namespace ROCKSDB_NAMESPACE {
const Slice kDefaultWideColumnName;
const WideColumns kNoWideColumns;
Status WideColumnSerialization::Serialize(const WideColumns& columns,
std::string& output) {
if (columns.size() >

@ -19,9 +19,11 @@
#pragma once
#include <string>
#include "rocksdb/cleanable.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/wide_columns.h"
namespace ROCKSDB_NAMESPACE {
@ -73,17 +75,29 @@ class Iterator : public Cleanable {
virtual void Prev() = 0;
// Return the key for the current entry. The underlying storage for
// the returned slice is valid only until the next modification of
// the iterator.
// the returned slice is valid only until the next modification of the
// iterator (i.e. the next SeekToFirst/SeekToLast/Seek/SeekForPrev/Next/Prev
// operation).
// REQUIRES: Valid()
virtual Slice key() const = 0;
// Return the value for the current entry. The underlying storage for
// the returned slice is valid only until the next modification of
// the iterator.
// the returned slice is valid only until the next modification of the
// iterator (i.e. the next SeekToFirst/SeekToLast/Seek/SeekForPrev/Next/Prev
// operation).
// REQUIRES: Valid()
virtual Slice value() const = 0;
// Return the wide columns for the current entry. The underlying storage for
// the returned structure is valid only until the next modification of the
// iterator (i.e. the next SeekToFirst/SeekToLast/Seek/SeekForPrev/Next/Prev
// operation).
// REQUIRES: Valid()
virtual const WideColumns& columns() const {
// Default for subclasses that do not override this method: the assertion
// fires when assertions are enabled; otherwise the empty kNoWideColumns is
// returned.
assert(false);
return kNoWideColumns;
}
// If an error has occurred, return it. Else return an ok status.
// If non-blocking IO is requested and this operation cannot be
// satisfied without doing some IO, then this returns Status::Incomplete().

@ -76,6 +76,9 @@ using WideColumns = std::vector<WideColumn>;
// The anonymous default wide column (an empty Slice).
extern const Slice kDefaultWideColumnName;
// An empty set of wide columns.
extern const WideColumns kNoWideColumns;
// A self-contained collection of wide columns. Used for the results of
// wide-column queries.
class PinnableWideColumns {

Loading…
Cancel
Save