Support Merge with wide-column entities in iterator (#10941)

Summary:
The patch adds `Merge` support for wide-column entities in `DBIter`. As before, the `Merge` operation is applied to the default column of the entity; any other columns are unchanged. As a small cleanup, the PR also changes the signature of `DBIter::Merge` to simply return a boolean instead of the `Merge` operation's `Status` since the actual `Status` is already stored in a member variable.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10941

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D41195471

Pulled By: ltamasi

fbshipit-source-id: 362cf555897296e252c3de5ddfbd569ef34f85ef
main
Levi Tamasi 2 years ago committed by Facebook GitHub Bot
parent 9460d4b77e
commit dbc4101b89
  1. 89
      db/db_iter.cc
  2. 3
      db/db_iter.h
  3. 62
      db/wide/db_wide_basic_test.cc

@ -546,8 +546,7 @@ bool DBIter::MergeValuesNewToOld() {
// hit a put, merge the put value with operands and store the // hit a put, merge the put value with operands and store the
// final result in saved_value_. We are done! // final result in saved_value_. We are done!
const Slice val = iter_.value(); const Slice val = iter_.value();
Status s = Merge(&val, ikey.user_key); if (!Merge(&val, ikey.user_key)) {
if (!s.ok()) {
return false; return false;
} }
// iter_ is positioned after put // iter_ is positioned after put
@ -576,8 +575,7 @@ bool DBIter::MergeValuesNewToOld() {
return false; return false;
} }
valid_ = true; valid_ = true;
Status s = Merge(&blob_value_, ikey.user_key); if (!Merge(&blob_value_, ikey.user_key)) {
if (!s.ok()) {
return false; return false;
} }
@ -591,11 +589,18 @@ bool DBIter::MergeValuesNewToOld() {
} }
return true; return true;
} else if (kTypeWideColumnEntity == ikey.type) { } else if (kTypeWideColumnEntity == ikey.type) {
// TODO: support wide-column entities if (!MergeEntity(iter_.value(), ikey.user_key)) {
status_ = Status::NotSupported( return false;
"Merge currently not supported for wide-column entities"); }
valid_ = false;
return false; // iter_ is positioned after put
iter_.Next();
if (!iter_.status().ok()) {
valid_ = false;
return false;
}
return true;
} else { } else {
valid_ = false; valid_ = false;
status_ = Status::Corruption( status_ = Status::Corruption(
@ -614,8 +619,7 @@ bool DBIter::MergeValuesNewToOld() {
// a deletion marker. // a deletion marker.
// feed null as the existing value to the merge operator, such that // feed null as the existing value to the merge operator, such that
// client can differentiate this scenario and do things accordingly. // client can differentiate this scenario and do things accordingly.
Status s = Merge(nullptr, saved_key_.GetUserKey()); if (!Merge(nullptr, saved_key_.GetUserKey())) {
if (!s.ok()) {
return false; return false;
} }
assert(status_.ok()); assert(status_.ok());
@ -964,8 +968,7 @@ bool DBIter::FindValueForCurrentKey() {
if (last_not_merge_type == kTypeDeletion || if (last_not_merge_type == kTypeDeletion ||
last_not_merge_type == kTypeSingleDeletion || last_not_merge_type == kTypeSingleDeletion ||
last_not_merge_type == kTypeDeletionWithTimestamp) { last_not_merge_type == kTypeDeletionWithTimestamp) {
s = Merge(nullptr, saved_key_.GetUserKey()); if (!Merge(nullptr, saved_key_.GetUserKey())) {
if (!s.ok()) {
return false; return false;
} }
return true; return true;
@ -980,8 +983,7 @@ bool DBIter::FindValueForCurrentKey() {
return false; return false;
} }
valid_ = true; valid_ = true;
s = Merge(&blob_value_, saved_key_.GetUserKey()); if (!Merge(&blob_value_, saved_key_.GetUserKey())) {
if (!s.ok()) {
return false; return false;
} }
@ -989,15 +991,14 @@ bool DBIter::FindValueForCurrentKey() {
return true; return true;
} else if (last_not_merge_type == kTypeWideColumnEntity) { } else if (last_not_merge_type == kTypeWideColumnEntity) {
// TODO: support wide-column entities if (!MergeEntity(pinned_value_, saved_key_.GetUserKey())) {
status_ = Status::NotSupported( return false;
"Merge currently not supported for wide-column entities"); }
valid_ = false;
return false; return true;
} else { } else {
assert(last_not_merge_type == kTypeValue); assert(last_not_merge_type == kTypeValue);
s = Merge(&pinned_value_, saved_key_.GetUserKey()); if (!Merge(&pinned_value_, saved_key_.GetUserKey())) {
if (!s.ok()) {
return false; return false;
} }
return true; return true;
@ -1181,8 +1182,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
if (ikey.type == kTypeValue) { if (ikey.type == kTypeValue) {
const Slice val = iter_.value(); const Slice val = iter_.value();
Status s = Merge(&val, saved_key_.GetUserKey()); if (!Merge(&val, saved_key_.GetUserKey())) {
if (!s.ok()) {
return false; return false;
} }
return true; return true;
@ -1201,8 +1201,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
return false; return false;
} }
valid_ = true; valid_ = true;
Status s = Merge(&blob_value_, saved_key_.GetUserKey()); if (!Merge(&blob_value_, saved_key_.GetUserKey())) {
if (!s.ok()) {
return false; return false;
} }
@ -1210,11 +1209,11 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
return true; return true;
} else if (ikey.type == kTypeWideColumnEntity) { } else if (ikey.type == kTypeWideColumnEntity) {
// TODO: support wide-column entities if (!MergeEntity(iter_.value(), saved_key_.GetUserKey())) {
status_ = Status::NotSupported( return false;
"Merge currently not supported for wide-column entities"); }
valid_ = false;
return false; return true;
} else { } else {
valid_ = false; valid_ = false;
status_ = Status::Corruption( status_ = Status::Corruption(
@ -1224,8 +1223,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
} }
} }
Status s = Merge(nullptr, saved_key_.GetUserKey()); if (!Merge(nullptr, saved_key_.GetUserKey())) {
if (!s.ok()) {
return false; return false;
} }
@ -1248,7 +1246,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
return true; return true;
} }
Status DBIter::Merge(const Slice* val, const Slice& user_key) { bool DBIter::Merge(const Slice* val, const Slice& user_key) {
Status s = MergeHelper::TimedFullMerge( Status s = MergeHelper::TimedFullMerge(
merge_operator_, user_key, val, merge_context_.GetOperands(), merge_operator_, user_key, val, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, clock_, &pinned_value_, &saved_value_, logger_, statistics_, clock_, &pinned_value_,
@ -1256,14 +1254,33 @@ Status DBIter::Merge(const Slice* val, const Slice& user_key) {
if (!s.ok()) { if (!s.ok()) {
valid_ = false; valid_ = false;
status_ = s; status_ = s;
return s; return false;
} }
SetValueAndColumnsFromPlain(pinned_value_.data() ? pinned_value_ SetValueAndColumnsFromPlain(pinned_value_.data() ? pinned_value_
: saved_value_); : saved_value_);
valid_ = true; valid_ = true;
return s; return true;
}
bool DBIter::MergeEntity(const Slice& entity, const Slice& user_key) {
Status s = MergeHelper::TimedFullMergeWithEntity(
merge_operator_, user_key, entity, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, clock_,
/* update_num_ops_stats */ true);
if (!s.ok()) {
valid_ = false;
status_ = s;
return false;
}
if (!SetValueAndColumnsFromEntity(saved_value_)) {
return false;
}
valid_ = true;
return true;
} }
// Move backwards until the key smaller than saved_key_. // Move backwards until the key smaller than saved_key_.

@ -318,7 +318,8 @@ class DBIter final : public Iterator {
} }
// If user-defined timestamp is enabled, `user_key` includes timestamp. // If user-defined timestamp is enabled, `user_key` includes timestamp.
Status Merge(const Slice* val, const Slice& user_key); bool Merge(const Slice* val, const Slice& user_key);
bool MergeEntity(const Slice& entity, const Slice& user_key);
const SliceTransform* prefix_extractor_; const SliceTransform* prefix_extractor_;
Env* const env_; Env* const env_;

@ -399,6 +399,16 @@ TEST_F(DBWideBasicTest, MergeEntity) {
auto verify = [&]() { auto verify = [&]() {
const std::string first_expected_default( const std::string first_expected_default(
first_columns[0].value().ToString() + delim + first_merge_operand); first_columns[0].value().ToString() + delim + first_merge_operand);
WideColumns first_expected_columns{
{kDefaultWideColumnName, first_expected_default},
first_columns[1],
first_columns[2]};
const std::string second_expected_default(delim + second_merge_operand);
WideColumns second_expected_columns{
{kDefaultWideColumnName, second_expected_default},
second_columns[0],
second_columns[1]};
{ {
PinnableSlice result; PinnableSlice result;
@ -411,13 +421,7 @@ TEST_F(DBWideBasicTest, MergeEntity) {
PinnableWideColumns result; PinnableWideColumns result;
ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
first_key, &result)); first_key, &result));
ASSERT_EQ(result.columns(), first_expected_columns);
WideColumns expected_columns{
{kDefaultWideColumnName, first_expected_default},
first_columns[1],
first_columns[2]};
ASSERT_EQ(result.columns(), expected_columns);
} }
{ {
@ -438,8 +442,6 @@ TEST_F(DBWideBasicTest, MergeEntity) {
ASSERT_EQ(merge_operands[1], first_merge_operand); ASSERT_EQ(merge_operands[1], first_merge_operand);
} }
const std::string second_expected_default(delim + second_merge_operand);
{ {
PinnableSlice result; PinnableSlice result;
ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), second_key, ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), second_key,
@ -451,13 +453,7 @@ TEST_F(DBWideBasicTest, MergeEntity) {
PinnableWideColumns result; PinnableWideColumns result;
ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
second_key, &result)); second_key, &result));
ASSERT_EQ(result.columns(), second_expected_columns);
WideColumns expected_columns{
{kDefaultWideColumnName, second_expected_default},
second_columns[0],
second_columns[1]};
ASSERT_EQ(result.columns(), expected_columns);
} }
{ {
@ -495,18 +491,44 @@ TEST_F(DBWideBasicTest, MergeEntity) {
ASSERT_OK(statuses[1]); ASSERT_OK(statuses[1]);
} }
// Note: Merge is currently not supported for wide-column entities in
// iterator
{ {
std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions())); std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
iter->SeekToFirst(); iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), first_key);
ASSERT_EQ(iter->value(), first_expected_default);
ASSERT_EQ(iter->columns(), first_expected_columns);
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), second_key);
ASSERT_EQ(iter->value(), second_expected_default);
ASSERT_EQ(iter->columns(), second_expected_columns);
iter->Next();
ASSERT_FALSE(iter->Valid()); ASSERT_FALSE(iter->Valid());
ASSERT_TRUE(iter->status().IsNotSupported()); ASSERT_OK(iter->status());
iter->SeekToLast(); iter->SeekToLast();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), second_key);
ASSERT_EQ(iter->value(), second_expected_default);
ASSERT_EQ(iter->columns(), second_expected_columns);
iter->Prev();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), first_key);
ASSERT_EQ(iter->value(), first_expected_default);
ASSERT_EQ(iter->columns(), first_expected_columns);
iter->Prev();
ASSERT_FALSE(iter->Valid()); ASSERT_FALSE(iter->Valid());
ASSERT_TRUE(iter->status().IsNotSupported()); ASSERT_OK(iter->status());
} }
}; };

Loading…
Cancel
Save