Fix MultiGet with PinnableSlices and Merge for WBWI (#8299)

Summary:
The MultiGetFromBatchAndDB would fail if the PinnableSlice value being returned was pinned.  This could happen if the value was retrieved from the DB (not memtable) or potentially if the values were reused (and a previous iteration returned a slice that was pinned).

This change resets the pinnable value to clear it prior to attempting to use it, thereby eliminating the problem with the value already being pinned.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8299

Reviewed By: jay-zhuang

Differential Revision: D28455426

Pulled By: mrambacher

fbshipit-source-id: a34d7d983ec9b6bb4c8a2b4892f72858d43e6972
main
mrambacher 4 years ago committed by Facebook GitHub Bot
parent 83d1a66598
commit 6b0a22a4b0
  1. 15
      utilities/write_batch_with_index/write_batch_with_index.cc
  2. 9
      utilities/write_batch_with_index/write_batch_with_index_internal.cc
  3. 9
      utilities/write_batch_with_index/write_batch_with_index_internal.h
  4. 141
      utilities/write_batch_with_index/write_batch_with_index_test.cc

@ -539,13 +539,15 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB(
// after the transaction finishes. // after the transaction finishes.
for (size_t i = 0; i < num_keys; ++i) { for (size_t i = 0; i < num_keys; ++i) {
MergeContext merge_context; MergeContext merge_context;
PinnableSlice* pinnable_val = &values[i]; std::string batch_value;
std::string& batch_value = *pinnable_val->GetSelf();
Status* s = &statuses[i]; Status* s = &statuses[i];
PinnableSlice* pinnable_val = &values[i];
pinnable_val->Reset();
auto result = auto result =
wbwii.GetFromBatch(this, keys[i], &merge_context, &batch_value, s); wbwii.GetFromBatch(this, keys[i], &merge_context, &batch_value, s);
if (result == WBWIIteratorImpl::kFound) { if (result == WBWIIteratorImpl::kFound) {
*pinnable_val->GetSelf() = std::move(batch_value);
pinnable_val->PinSelf(); pinnable_val->PinSelf();
continue; continue;
} }
@ -581,19 +583,24 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB(
std::pair<WBWIIteratorImpl::Result, MergeContext>& merge_result = std::pair<WBWIIteratorImpl::Result, MergeContext>& merge_result =
merges[index]; merges[index];
if (merge_result.first == WBWIIteratorImpl::kMergeInProgress) { if (merge_result.first == WBWIIteratorImpl::kMergeInProgress) {
std::string merged_value;
// Merge result from DB with merges in Batch // Merge result from DB with merges in Batch
if (key.s->ok()) { if (key.s->ok()) {
*key.s = wbwii.MergeKey(*key.key, iter->value, merge_result.second, *key.s = wbwii.MergeKey(*key.key, iter->value, merge_result.second,
key.value->GetSelf()); &merged_value);
} else { // Key not present in db (s.IsNotFound()) } else { // Key not present in db (s.IsNotFound())
*key.s = wbwii.MergeKey(*key.key, nullptr, merge_result.second, *key.s = wbwii.MergeKey(*key.key, nullptr, merge_result.second,
key.value->GetSelf()); &merged_value);
} }
if (key.s->ok()) {
key.value->Reset();
*key.value->GetSelf() = std::move(merged_value);
key.value->PinSelf(); key.value->PinSelf();
} }
} }
} }
} }
}
void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); } void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); }

@ -621,8 +621,7 @@ WriteBatchWithIndexInternal::WriteBatchWithIndexInternal(
Status WriteBatchWithIndexInternal::MergeKey(const Slice& key, Status WriteBatchWithIndexInternal::MergeKey(const Slice& key,
const Slice* value, const Slice* value,
const MergeContext& context, const MergeContext& context,
std::string* result, std::string* result) const {
Slice* result_operand) const {
if (column_family_ != nullptr) { if (column_family_ != nullptr) {
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family_); auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family_);
const auto merge_operator = cfh->cfd()->ioptions()->merge_operator.get(); const auto merge_operator = cfh->cfd()->ioptions()->merge_operator.get();
@ -638,7 +637,7 @@ Status WriteBatchWithIndexInternal::MergeKey(const Slice& key,
SystemClock* clock = immutable_db_options.clock; SystemClock* clock = immutable_db_options.clock;
return MergeHelper::TimedFullMerge(merge_operator, key, value, return MergeHelper::TimedFullMerge(merge_operator, key, value,
context.GetOperands(), result, logger, context.GetOperands(), result, logger,
statistics, clock, result_operand); statistics, clock);
} else if (db_options_ != nullptr) { } else if (db_options_ != nullptr) {
Statistics* statistics = db_options_->statistics.get(); Statistics* statistics = db_options_->statistics.get();
Env* env = db_options_->env; Env* env = db_options_->env;
@ -646,12 +645,12 @@ Status WriteBatchWithIndexInternal::MergeKey(const Slice& key,
SystemClock* clock = env->GetSystemClock().get(); SystemClock* clock = env->GetSystemClock().get();
return MergeHelper::TimedFullMerge(merge_operator, key, value, return MergeHelper::TimedFullMerge(merge_operator, key, value,
context.GetOperands(), result, logger, context.GetOperands(), result, logger,
statistics, clock, result_operand); statistics, clock);
} else { } else {
const auto cf_opts = cfh->cfd()->ioptions(); const auto cf_opts = cfh->cfd()->ioptions();
return MergeHelper::TimedFullMerge( return MergeHelper::TimedFullMerge(
merge_operator, key, value, context.GetOperands(), result, merge_operator, key, value, context.GetOperands(), result,
cf_opts->logger, cf_opts->stats, cf_opts->clock, result_operand); cf_opts->logger, cf_opts->stats, cf_opts->clock);
} }
} else { } else {
return Status::InvalidArgument("Must provide a column_family"); return Status::InvalidArgument("Must provide a column_family");

@ -306,13 +306,12 @@ class WriteBatchWithIndexInternal {
const Slice& key, const Slice& key,
MergeContext* merge_context, MergeContext* merge_context,
std::string* value, Status* s); std::string* value, Status* s);
Status MergeKey(const Slice& key, const Slice* value, std::string* result, Status MergeKey(const Slice& key, const Slice* value,
Slice* result_operand = nullptr) const { std::string* result) const {
return MergeKey(key, value, merge_context_, result, result_operand); return MergeKey(key, value, merge_context_, result);
} }
Status MergeKey(const Slice& key, const Slice* value, Status MergeKey(const Slice& key, const Slice* value,
const MergeContext& context, std::string* result, const MergeContext& context, std::string* result) const;
Slice* result_operand = nullptr) const;
size_t GetNumOperands() const { return merge_context_.GetNumOperands(); } size_t GetNumOperands() const { return merge_context_.GetNumOperands(); }
MergeContext* GetMergeContext() { return &merge_context_; } MergeContext* GetMergeContext() { return &merge_context_; }
Slice GetOperand(int index) const { return merge_context_.GetOperand(index); } Slice GetOperand(int index) const { return merge_context_.GetOperand(index); }

@ -1412,6 +1412,49 @@ TEST_P(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge3) {
ASSERT_EQ(value, "1,2"); ASSERT_EQ(value, "1,2");
} }
TEST_P(WriteBatchWithIndexTest, TestPinnedGetFromBatchAndDB) {
Status s = OpenDB();
ASSERT_OK(s);
PinnableSlice value;
ASSERT_OK(db_->Put(write_opts_, "a", "a0"));
ASSERT_OK(db_->Put(write_opts_, "b", "b0"));
ASSERT_OK(db_->Merge(write_opts_, "b", "b1"));
ASSERT_OK(db_->Merge(write_opts_, "c", "c0"));
ASSERT_OK(db_->Merge(write_opts_, "d", "d0"));
ASSERT_OK(batch_->Merge("a", "a1"));
ASSERT_OK(batch_->Merge("a", "a2"));
ASSERT_OK(batch_->Merge("b", "b2"));
ASSERT_OK(batch_->Merge("d", "d1"));
ASSERT_OK(batch_->Merge("e", "e0"));
for (int i = 0; i < 2; i++) {
if (i == 1) {
// Do it again with a flushed DB...
ASSERT_OK(db_->Flush(FlushOptions(), db_->DefaultColumnFamily()));
}
ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "a", &value));
ASSERT_EQ("a0,a1,a2", value.ToString());
ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "b", &value));
ASSERT_EQ("b0,b1,b2", value.ToString());
ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "c", &value));
ASSERT_EQ("c0", value.ToString());
ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "d", &value));
ASSERT_EQ("d0,d1", value.ToString());
ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "e", &value));
ASSERT_EQ("e0", value.ToString());
ASSERT_OK(db_->Delete(write_opts_, "x"));
s = batch_->GetFromBatchAndDB(db_, read_opts_, "x", &value);
ASSERT_TRUE(s.IsNotFound());
}
}
void AssertKey(std::string key, WBWIIterator* iter) { void AssertKey(std::string key, WBWIIterator* iter) {
ASSERT_TRUE(iter->Valid()); ASSERT_TRUE(iter->Valid());
ASSERT_EQ(key, iter->Entry().key.ToString()); ASSERT_EQ(key, iter->Entry().key.ToString());
@ -1808,12 +1851,12 @@ TEST_P(WriteBatchWithIndexTest, MultiGetTest) {
// Write some data to the db for the even numbered keys // Write some data to the db for the even numbered keys
{ {
WriteBatch wb; WriteBatch wb;
for (size_t i = 1; i < keys.size(); ++i) { for (size_t i = 0; i < keys.size(); i += 2) {
std::string val = "val" + std::to_string(i); std::string val = "val" + std::to_string(i);
ASSERT_OK(wb.Put(cf0, keys[i], val)); ASSERT_OK(wb.Put(cf0, keys[i], val));
} }
ASSERT_OK(db_->Write(write_opts_, &wb)); ASSERT_OK(db_->Write(write_opts_, &wb));
for (size_t i = 1; i < keys.size(); ++i) { for (size_t i = 0; i < keys.size(); i += 2) {
std::string value; std::string value;
ASSERT_OK(db_->Get(read_opts_, cf0, keys[i], &value)); ASSERT_OK(db_->Get(read_opts_, cf0, keys[i], &value));
} }
@ -1852,20 +1895,110 @@ TEST_P(WriteBatchWithIndexTest, MultiGetTest) {
} else if ((i % 7) == 0) { // Merge after Put } else if ((i % 7) == 0) { // Merge after Put
std::string val = "new" + std::to_string(i); std::string val = "new" + std::to_string(i);
ASSERT_EQ(values[i], val + ",merge"); ASSERT_EQ(values[i], val + ",merge");
} else { } else if ((i % 2) == 0) {
std::string val = "val" + std::to_string(i); std::string val = "val" + std::to_string(i);
ASSERT_EQ(values[i], val + ",merge"); ASSERT_EQ(values[i], val + ",merge");
} else {
ASSERT_EQ(values[i], "merge");
} }
} else if ((i % 5) == 0) { } else if ((i % 5) == 0) {
ASSERT_TRUE(statuses[i].IsNotFound()); ASSERT_TRUE(statuses[i].IsNotFound());
} else if ((i % 7) == 0) { } else if ((i % 7) == 0) {
ASSERT_OK(statuses[i]); ASSERT_OK(statuses[i]);
ASSERT_EQ(values[i], "new" + std::to_string(i)); ASSERT_EQ(values[i], "new" + std::to_string(i));
} else { } else if ((i % 2) == 0) {
ASSERT_OK(statuses[i]); ASSERT_OK(statuses[i]);
ASSERT_EQ(values[i], "val" + std::to_string(i)); ASSERT_EQ(values[i], "val" + std::to_string(i));
} else {
ASSERT_TRUE(statuses[i].IsNotFound());
}
} }
} }
TEST_P(WriteBatchWithIndexTest, MultiGetTest2) {
// MultiGet a lot of keys in order to force std::vector reallocations
const int num_keys = 700;
const int keys_per_pass = 100;
std::vector<std::string> keys;
for (size_t i = 0; i < num_keys; ++i) {
keys.emplace_back(std::to_string(i));
}
ASSERT_OK(OpenDB());
ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily();
// Keys 0- 99 have a PUT in the batch but not DB
// Keys 100-199 have a PUT in the DB
// Keys 200-299 Have a PUT/DELETE
// Keys 300-399 Have a PUT/DELETE/MERGE
// Keys 400-499 have a PUT/MERGE
// Keys 500-599 have a MERGE only
// Keys 600-699 were never written
{
WriteBatch wb;
for (size_t i = 100; i < 500; i++) {
std::string val = std::to_string(i);
ASSERT_OK(wb.Put(cf0, keys[i], val));
}
ASSERT_OK(db_->Write(write_opts_, &wb));
}
ASSERT_OK(db_->Flush(FlushOptions(), cf0));
for (size_t i = 0; i < 100; i++) {
ASSERT_OK(batch_->Put(cf0, keys[i], keys[i]));
}
for (size_t i = 200; i < 400; i++) {
ASSERT_OK(batch_->Delete(cf0, keys[i]));
}
for (size_t i = 300; i < 600; i++) {
std::string val = std::to_string(i) + "m";
ASSERT_OK(batch_->Merge(cf0, keys[i], val));
}
Random rnd(301);
std::vector<PinnableSlice> values(keys_per_pass);
std::vector<Status> statuses(keys_per_pass);
for (int pass = 0; pass < 40; pass++) {
std::vector<Slice> key_slices;
for (size_t i = 0; i < keys_per_pass; i++) {
int random = rnd.Uniform(num_keys);
key_slices.emplace_back(keys[random]);
}
batch_->MultiGetFromBatchAndDB(db_, read_opts_, cf0, keys_per_pass,
key_slices.data(), values.data(),
statuses.data(), false);
for (size_t i = 0; i < keys_per_pass; i++) {
int key = ParseInt(key_slices[i].ToString());
switch (key / 100) {
case 0: // 0-99 PUT only
ASSERT_OK(statuses[i]);
ASSERT_EQ(values[i], key_slices[i].ToString());
break;
case 1: // 100-199 PUT only
ASSERT_OK(statuses[i]);
ASSERT_EQ(values[i], key_slices[i].ToString());
break;
case 2: // 200-299 Deleted
ASSERT_TRUE(statuses[i].IsNotFound());
break;
case 3: // 300-399 Delete+Merge
ASSERT_OK(statuses[i]);
ASSERT_EQ(values[i], key_slices[i].ToString() + "m");
break;
case 4: // 400-400 Put+ Merge
ASSERT_OK(statuses[i]);
ASSERT_EQ(values[i], key_slices[i].ToString() + "," +
key_slices[i].ToString() + "m");
break;
case 5: // Merge only
ASSERT_OK(statuses[i]);
ASSERT_EQ(values[i], key_slices[i].ToString() + "m");
break;
case 6: // Never written
ASSERT_TRUE(statuses[i].IsNotFound());
break;
default:
assert(false);
} // end switch
} // End for each key
} // end for passes
} }
// This test has merges, but the merge does not play into the final result // This test has merges, but the merge does not play into the final result

Loading…
Cancel
Save