Add Merge Operator support to WriteBatchWithIndex (#8135)

Summary:
The WBWI has two differing modes of operation dependent on the value
of the constructor parameter `overwrite_key`.
Currently, regardless of the parameter, neither mode performs as
expected when using Merge. This PR remedies this by correctly invoking
the appropriate Merge Operator before returning results from the WBWI.

Examples of issues that exist which are solved by this PR:

## Example 1 with `overwrite_key=false`
Currently, from an empty database, the following sequence:
```
Put('k1', 'v1')
Merge('k1', 'v2')
Get('k1')
```
Incorrectly yields `v2`, that is to say that the Merge behaves like a Put.

## Example 2 with `overwrite_key=true`
Currently, from an empty database, the following sequence:
```
Put('k1', 'v1')
Merge('k1', 'v2')
Get('k1')
```
Incorrectly yields `ERROR: kMergeInProgress`.

## Example 3 with `overwrite_key=false`
Currently, with a database containing `('k1' -> 'v1')`, the following sequence:
```
Merge('k1', 'v2')
GetFromBatchAndDB('k1')
```
Incorrectly yields `v1,v2`

## Example 4 with `overwrite_key=true`
Currently, with a database containing `('k1' -> 'v1')`, the following sequence:
```
Merge('k1', 'v1')
GetFromBatchAndDB('k1')
```
Incorrectly yields `ERROR: kMergeInProgress`.

## Example 5 with `overwrite_key=false`
Currently, from an empty database, the following sequence:
```
Put('k1', 'v1')
Merge('k1', 'v2')
GetFromBatchAndDB('k1')
```
Incorrectly yields `v1,v2`

## Example 6 with `overwrite_key=true`
Currently, from an empty database, the following sequence:
```
Put('k1', 'v1')
Merge('k1', 'v2')
GetFromBatchAndDB('k1')
```
Incorrectly yields `ERROR: kMergeInProgress`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8135

Reviewed By: pdillinger

Differential Revision: D27657938

Pulled By: mrambacher

fbshipit-source-id: 0fbda6bbc66bedeba96a84786d90141d776297df
Branch: main
Author: mrambacher, 3 years ago; committed by Facebook GitHub Bot
Parent: f89a53655d
Commit: ff463742b5

Files changed (change count per file):
  1. HISTORY.md (2)
  2. db/merge_context.h (28)
  3. utilities/transactions/transaction_base.cc (3)
  4. utilities/transactions/transaction_test.cc (9)
  5. utilities/write_batch_with_index/write_batch_with_index.cc (168)
  6. utilities/write_batch_with_index/write_batch_with_index_internal.cc (302)
  7. utilities/write_batch_with_index/write_batch_with_index_internal.h (60)
  8. utilities/write_batch_with_index/write_batch_with_index_test.cc (1616)

HISTORY.md
@ -34,6 +34,8 @@
* `CompactFiles()` can no longer compact files from lower level to up level, which has the risk to corrupt DB (details: #8063). The validation is also added to all compactions.
* Fixed some cases in which DB::OpenForReadOnly() could write to the filesystem. If you want a Logger with a read-only DB, you must now set DBOptions::info_log yourself, such as using CreateLoggerFromOptions().
* get_iostats_context() will never return nullptr. If thread-local support is not available, and user does not opt-out iostats context, then compilation will fail. The same applies to perf context as well.
* Added support for WriteBatchWithIndex::NewIteratorWithBase when overwrite_key=false. Previously, this combination was not supported and would assert or return nullptr.
* Improve the behavior of WriteBatchWithIndex for Merge operations. Now more operations may be stored in order to return the correct merged result.
### Bug Fixes
* Use thread-safe `strerror_r()` to get error messages.

db/merge_context.h
@ -68,7 +68,7 @@ class MergeContext {
}
// Get the operand at the index.
Slice GetOperand(int index) {
Slice GetOperand(int index) const {
assert(operand_list_);
SetDirectionForward();
@ -76,13 +76,21 @@ class MergeContext {
}
// Same as GetOperandsDirectionForward
const std::vector<Slice>& GetOperands() {
//
// Note that the returned reference is only good until another call
// to this MergeContext. If the returned value is needed for longer,
// a copy must be made.
const std::vector<Slice>& GetOperands() const {
return GetOperandsDirectionForward();
}
// Return all the operands in the order as they were merged (passed to
// FullMerge or FullMergeV2)
const std::vector<Slice>& GetOperandsDirectionForward() {
//
// Note that the returned reference is only good until another call
// to this MergeContext. If the returned value is needed for longer,
// a copy must be made.
const std::vector<Slice>& GetOperandsDirectionForward() const {
if (!operand_list_) {
return empty_operand_list;
}
@ -93,7 +101,11 @@ class MergeContext {
// Return all the operands in the reversed order relative to how they were
// merged (passed to FullMerge or FullMergeV2)
const std::vector<Slice>& GetOperandsDirectionBackward() {
//
// Note that the returned reference is only good until another call
// to this MergeContext. If the returned value is needed for longer,
// a copy must be made.
const std::vector<Slice>& GetOperandsDirectionBackward() const {
if (!operand_list_) {
return empty_operand_list;
}
@ -110,14 +122,14 @@ class MergeContext {
}
}
void SetDirectionForward() {
void SetDirectionForward() const {
if (operands_reversed_ == true) {
std::reverse(operand_list_->begin(), operand_list_->end());
operands_reversed_ = false;
}
}
void SetDirectionBackward() {
void SetDirectionBackward() const {
if (operands_reversed_ == false) {
std::reverse(operand_list_->begin(), operand_list_->end());
operands_reversed_ = true;
@ -125,10 +137,10 @@ class MergeContext {
}
// List of operands
std::unique_ptr<std::vector<Slice>> operand_list_;
mutable std::unique_ptr<std::vector<Slice>> operand_list_;
// Copy of operands that are not pinned.
std::unique_ptr<std::vector<std::unique_ptr<std::string>>> copied_operands_;
bool operands_reversed_ = true;
mutable bool operands_reversed_ = true;
};
} // namespace ROCKSDB_NAMESPACE
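
The accessors in this file become `const` while still reordering the operand list on demand, which is why `operand_list_` and `operands_reversed_` are now `mutable`. The pattern can be sketched in isolation as follows. `OperandList` is a hypothetical stand-in that stores `std::string` values directly; it is not the RocksDB `MergeContext` and omits pinning and copying.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for MergeContext (illustration only).
class OperandList {
 public:
  // Operands arrive newest-first, so they are stored in "backward" order.
  void PushOperand(const std::string& op) {
    SetDirectionBackward();
    operands_.push_back(op);
  }

  // Logically read-only, hence const, but may reverse the storage in place.
  const std::vector<std::string>& GetOperandsDirectionForward() const {
    SetDirectionForward();
    return operands_;
  }

  const std::vector<std::string>& GetOperandsDirectionBackward() const {
    SetDirectionBackward();
    return operands_;
  }

 private:
  void SetDirectionForward() const {
    if (reversed_) {
      std::reverse(operands_.begin(), operands_.end());
      reversed_ = false;
    }
  }

  void SetDirectionBackward() const {
    if (!reversed_) {
      std::reverse(operands_.begin(), operands_.end());
      reversed_ = true;
    }
  }

  // mutable: const accessors are allowed to flip the storage order.
  mutable std::vector<std::string> operands_;
  mutable bool reversed_ = true;
};
```

Because both accessors return a reference to the same underlying storage, a reference obtained from the forward accessor observes the reversed order after a later call to the backward accessor. This is the hazard the new "only good until another call" comments warn about.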

utilities/transactions/transaction_base.cc
@ -306,7 +306,8 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options) {
Iterator* db_iter = db_->NewIterator(read_options);
assert(db_iter);
return write_batch_.NewIteratorWithBase(db_iter);
return write_batch_.NewIteratorWithBase(db_->DefaultColumnFamily(), db_iter,
&read_options);
}
Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,

utilities/transactions/transaction_test.cc
@ -2814,7 +2814,8 @@ TEST_P(TransactionTest, MultiGetBatchedTest) {
ASSERT_TRUE(statuses[1].IsNotFound());
ASSERT_TRUE(statuses[2].ok());
ASSERT_EQ(values[2], "val3_new");
ASSERT_TRUE(statuses[3].IsMergeInProgress());
ASSERT_TRUE(statuses[3].ok());
ASSERT_EQ(values[3], "foo,bar");
ASSERT_TRUE(statuses[4].ok());
ASSERT_EQ(values[4], "val5");
ASSERT_TRUE(statuses[5].ok());
@ -4839,7 +4840,8 @@ TEST_P(TransactionTest, MergeTest) {
ASSERT_OK(s);
s = txn->Get(read_options, "A", &value);
ASSERT_TRUE(s.IsMergeInProgress());
ASSERT_OK(s);
ASSERT_EQ("a0,1,2", value);
s = txn->Put("A", "a");
ASSERT_OK(s);
@ -4852,7 +4854,8 @@ TEST_P(TransactionTest, MergeTest) {
ASSERT_OK(s);
s = txn->Get(read_options, "A", &value);
ASSERT_TRUE(s.IsMergeInProgress());
ASSERT_OK(s);
ASSERT_EQ("a,3", value);
TransactionOptions txn_options;
txn_options.lock_timeout = 1; // 1 ms
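
The updated assertions expect a merged value where the tests previously accepted `MergeInProgress`. A minimal sketch of that resolution step is given below, under stated assumptions: the merge operator appends operands with a `,` delimiter (which matches the asserted strings but is not shown in this diff), the database is a `std::map`, and `BatchResult`, `AppendMerge` and `GetFromBatchAndDb` are hypothetical names rather than RocksDB APIs.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

enum class BatchResult { kFound, kDeleted, kNotFound, kMergeInProgress };

// Assumed merge operator: append operands to the base value with ','.
// base == nullptr means "no existing value".
std::string AppendMerge(const std::string* base,
                        const std::vector<std::string>& operands) {
  std::string out;
  bool need_delim = false;
  if (base != nullptr) {
    out = *base;
    need_delim = true;
  }
  for (const auto& op : operands) {
    if (need_delim) out += ',';
    out += op;
    need_delim = true;
  }
  return out;
}

// Returns true and fills *value when the key resolves to a value.
bool GetFromBatchAndDb(BatchResult result, const std::string& batch_value,
                       const std::vector<std::string>& batch_operands,
                       const std::map<std::string, std::string>& db,
                       const std::string& key, std::string* value) {
  if (result == BatchResult::kFound) {
    *value = batch_value;  // batch already resolved the key
    return true;
  }
  if (result == BatchResult::kDeleted) {
    return false;  // deleted in the batch, DB is not consulted
  }
  // kNotFound or kMergeInProgress: fall through to the DB.
  auto it = db.find(key);
  const std::string* db_value = (it != db.end()) ? &it->second : nullptr;
  if (result == BatchResult::kMergeInProgress) {
    // Combine the DB value (if any) with the operands held in the batch.
    *value = AppendMerge(db_value, batch_operands);
    return true;
  }
  if (db_value == nullptr) return false;
  *value = *db_value;
  return true;
}
```

With a database holding `A -> a0` and batch operands `1` and `2`, this sketch produces `a0,1,2`. That sequence of writes is hypothetical and was chosen only because it agrees with the value asserted in `MergeTest`.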

utilities/write_batch_with_index/write_batch_with_index.cc
@ -53,13 +53,16 @@ struct WriteBatchWithIndex::Rep {
// In overwrite mode, find the existing entry for the same key and update it
// to point to the current entry.
// Return true if the key is found and updated.
bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key);
bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key);
bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key,
WriteType type);
bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key,
WriteType type);
// Add the recent entry to the update.
// In overwrite mode, if key already exists in the index, update it.
void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key);
void AddOrUpdateIndex(const Slice& key);
void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key,
WriteType type);
void AddOrUpdateIndex(const Slice& key, WriteType type);
// Allocate an index entry pointing to the last entry in the write batch and
// put it to skip list.
@ -75,13 +78,13 @@ struct WriteBatchWithIndex::Rep {
};
bool WriteBatchWithIndex::Rep::UpdateExistingEntry(
ColumnFamilyHandle* column_family, const Slice& key) {
ColumnFamilyHandle* column_family, const Slice& key, WriteType type) {
uint32_t cf_id = GetColumnFamilyID(column_family);
return UpdateExistingEntryWithCfId(cf_id, key);
return UpdateExistingEntryWithCfId(cf_id, key, type);
}
bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId(
uint32_t column_family_id, const Slice& key) {
uint32_t column_family_id, const Slice& key, WriteType type) {
if (!overwrite_key) {
return false;
}
@ -91,9 +94,16 @@ bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId(
iter.Seek(key);
if (!iter.Valid()) {
return false;
}
if (!iter.MatchesKey(column_family_id, key)) {
} else if (!iter.MatchesKey(column_family_id, key)) {
return false;
} else {
// Move to the end of this key (NextKey-Prev)
iter.NextKey(); // Move to the next key
if (iter.Valid()) {
iter.Prev(); // Move back one entry
} else {
iter.SeekToLast();
}
}
WriteBatchIndexEntry* non_const_entry =
const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry());
@ -101,13 +111,17 @@ bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId(
last_sub_batch_offset = last_entry_offset;
sub_batch_cnt++;
}
non_const_entry->offset = last_entry_offset;
return true;
if (type == kMergeRecord) {
return false;
} else {
non_const_entry->offset = last_entry_offset;
return true;
}
}
void WriteBatchWithIndex::Rep::AddOrUpdateIndex(
ColumnFamilyHandle* column_family, const Slice& key) {
if (!UpdateExistingEntry(column_family, key)) {
ColumnFamilyHandle* column_family, const Slice& key, WriteType type) {
if (!UpdateExistingEntry(column_family, key, type)) {
uint32_t cf_id = GetColumnFamilyID(column_family);
const auto* cf_cmp = GetColumnFamilyUserComparator(column_family);
if (cf_cmp != nullptr) {
@ -117,8 +131,9 @@ void WriteBatchWithIndex::Rep::AddOrUpdateIndex(
}
}
void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key) {
if (!UpdateExistingEntryWithCfId(0, key)) {
void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key,
WriteType type) {
if (!UpdateExistingEntryWithCfId(0, key, type)) {
AddNewEntry(0);
}
}
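
The effect of passing `WriteType` into `UpdateExistingEntryWithCfId` can be illustrated with a toy index. In overwrite mode a Put or Delete repoints the newest index entry for the key at the new write, while a Merge always gets its own entry so that its operand stays reachable. The sketch below assumes a flat vector in place of the skip list, and `ToyIndex` and its members are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

enum WriteType { kPutRecord, kMergeRecord, kDeleteRecord };

struct IndexEntry {
  std::string key;
  std::size_t offset;  // position of the write inside the batch
};

// Hypothetical, simplified index kept in insertion order.
class ToyIndex {
 public:
  explicit ToyIndex(bool overwrite_key) : overwrite_key_(overwrite_key) {}

  void AddOrUpdate(const std::string& key, WriteType type) {
    const std::size_t offset = next_offset_++;
    if (overwrite_key_ && type != kMergeRecord) {
      // Repoint the newest existing entry for this key, if any.
      for (auto it = entries_.rbegin(); it != entries_.rend(); ++it) {
        if (it->key == key) {
          it->offset = offset;
          return;
        }
      }
    }
    // Merge records, first writes, and non-overwrite mode add a new entry.
    entries_.push_back(IndexEntry{key, offset});
  }

  // Batch offsets currently indexed for a key, oldest entry first.
  std::vector<std::size_t> OffsetsFor(const std::string& key) const {
    std::vector<std::size_t> out;
    for (const auto& e : entries_) {
      if (e.key == key) out.push_back(e.offset);
    }
    return out;
  }

 private:
  bool overwrite_key_;
  std::size_t next_offset_ = 0;
  std::vector<IndexEntry> entries_;
};
```

For the sequence Put, Put, Merge, Merge, Put on one key, the overwrite-mode sketch ends with three index entries at offsets 1, 2 and 4, whereas the non-overwrite sketch keeps all five. This is in line with the HISTORY.md note that more operations may now be stored.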
@ -190,14 +205,31 @@ Status WriteBatchWithIndex::Rep::ReBuildIndex() {
switch (tag) {
case kTypeColumnFamilyValue:
case kTypeValue:
found++;
if (!UpdateExistingEntryWithCfId(column_family_id, key, kPutRecord)) {
AddNewEntry(column_family_id);
}
break;
case kTypeColumnFamilyDeletion:
case kTypeDeletion:
found++;
if (!UpdateExistingEntryWithCfId(column_family_id, key,
kDeleteRecord)) {
AddNewEntry(column_family_id);
}
break;
case kTypeColumnFamilySingleDeletion:
case kTypeSingleDeletion:
found++;
if (!UpdateExistingEntryWithCfId(column_family_id, key,
kSingleDeleteRecord)) {
AddNewEntry(column_family_id);
}
break;
case kTypeColumnFamilyMerge:
case kTypeMerge:
found++;
if (!UpdateExistingEntryWithCfId(column_family_id, key)) {
if (!UpdateExistingEntryWithCfId(column_family_id, key, kMergeRecord)) {
AddNewEntry(column_family_id);
}
break;
@ -255,22 +287,19 @@ WBWIIterator* WriteBatchWithIndex::NewIterator(
Iterator* WriteBatchWithIndex::NewIteratorWithBase(
ColumnFamilyHandle* column_family, Iterator* base_iterator,
const ReadOptions* read_options) {
if (rep->overwrite_key == false) {
assert(false);
return nullptr;
}
return new BaseDeltaIterator(base_iterator, NewIterator(column_family),
auto wbwiii =
new WBWIIteratorImpl(GetColumnFamilyID(column_family), &(rep->skip_list),
&rep->write_batch, &rep->comparator);
return new BaseDeltaIterator(column_family, base_iterator, wbwiii,
GetColumnFamilyUserComparator(column_family),
read_options);
}
Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) {
if (rep->overwrite_key == false) {
assert(false);
return nullptr;
}
// default column family's comparator
return new BaseDeltaIterator(base_iterator, NewIterator(),
auto wbwiii = new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch,
&rep->comparator);
return new BaseDeltaIterator(nullptr, base_iterator, wbwiii,
rep->comparator.default_comparator());
}
@ -279,7 +308,7 @@ Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
rep->SetLastEntryOffset();
auto s = rep->write_batch.Put(column_family, key, value);
if (s.ok()) {
rep->AddOrUpdateIndex(column_family, key);
rep->AddOrUpdateIndex(column_family, key, kPutRecord);
}
return s;
}
@ -288,7 +317,7 @@ Status WriteBatchWithIndex::Put(const Slice& key, const Slice& value) {
rep->SetLastEntryOffset();
auto s = rep->write_batch.Put(key, value);
if (s.ok()) {
rep->AddOrUpdateIndex(key);
rep->AddOrUpdateIndex(key, kPutRecord);
}
return s;
}
@ -298,7 +327,7 @@ Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
rep->SetLastEntryOffset();
auto s = rep->write_batch.Delete(column_family, key);
if (s.ok()) {
rep->AddOrUpdateIndex(column_family, key);
rep->AddOrUpdateIndex(column_family, key, kDeleteRecord);
}
return s;
}
@ -307,7 +336,7 @@ Status WriteBatchWithIndex::Delete(const Slice& key) {
rep->SetLastEntryOffset();
auto s = rep->write_batch.Delete(key);
if (s.ok()) {
rep->AddOrUpdateIndex(key);
rep->AddOrUpdateIndex(key, kDeleteRecord);
}
return s;
}
@ -317,7 +346,7 @@ Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family,
rep->SetLastEntryOffset();
auto s = rep->write_batch.SingleDelete(column_family, key);
if (s.ok()) {
rep->AddOrUpdateIndex(column_family, key);
rep->AddOrUpdateIndex(column_family, key, kSingleDeleteRecord);
}
return s;
}
@ -326,7 +355,7 @@ Status WriteBatchWithIndex::SingleDelete(const Slice& key) {
rep->SetLastEntryOffset();
auto s = rep->write_batch.SingleDelete(key);
if (s.ok()) {
rep->AddOrUpdateIndex(key);
rep->AddOrUpdateIndex(key, kSingleDeleteRecord);
}
return s;
}
@ -336,7 +365,7 @@ Status WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
rep->SetLastEntryOffset();
auto s = rep->write_batch.Merge(column_family, key, value);
if (s.ok()) {
rep->AddOrUpdateIndex(column_family, key);
rep->AddOrUpdateIndex(column_family, key, kMergeRecord);
}
return s;
}
@ -345,7 +374,7 @@ Status WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
rep->SetLastEntryOffset();
auto s = rep->write_batch.Merge(key, value);
if (s.ok()) {
rep->AddOrUpdateIndex(key);
rep->AddOrUpdateIndex(key, kMergeRecord);
}
return s;
}
@ -361,18 +390,18 @@ Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) {
Status s;
WriteBatchWithIndexInternal wbwii(&options, column_family);
auto result = wbwii.GetFromBatch(this, key, value, rep->overwrite_key, &s);
auto result = wbwii.GetFromBatch(this, key, value, &s);
switch (result) {
case WriteBatchWithIndexInternal::Result::kFound:
case WriteBatchWithIndexInternal::Result::kError:
case WBWIIteratorImpl::kFound:
case WBWIIteratorImpl::kError:
// use returned status
break;
case WriteBatchWithIndexInternal::Result::kDeleted:
case WriteBatchWithIndexInternal::Result::kNotFound:
case WBWIIteratorImpl::kDeleted:
case WBWIIteratorImpl::kNotFound:
s = Status::NotFound();
break;
case WriteBatchWithIndexInternal::Result::kMergeInProgress:
case WBWIIteratorImpl::kMergeInProgress:
s = Status::MergeInProgress();
break;
default:
@ -440,29 +469,18 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(
// we cannot pin it as otherwise the returned value will not be available
// after the transaction finishes.
std::string& batch_value = *pinnable_val->GetSelf();
auto result =
wbwii.GetFromBatch(this, key, &batch_value, rep->overwrite_key, &s);
auto result = wbwii.GetFromBatch(this, key, &batch_value, &s);
if (result == WriteBatchWithIndexInternal::Result::kFound) {
if (result == WBWIIteratorImpl::kFound) {
pinnable_val->PinSelf();
return s;
}
if (result == WriteBatchWithIndexInternal::Result::kDeleted) {
return Status::NotFound();
}
if (result == WriteBatchWithIndexInternal::Result::kError) {
} else if (!s.ok() || result == WBWIIteratorImpl::kError) {
return s;
} else if (result == WBWIIteratorImpl::kDeleted) {
return Status::NotFound();
}
if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
rep->overwrite_key == true) {
// Since we've overwritten keys, we do not know what other operations are
// in this batch for this key, so we cannot do a Merge to compute the
// result. Instead, we will simply return MergeInProgress.
return Status::MergeInProgress();
}
assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
result == WriteBatchWithIndexInternal::Result::kNotFound);
assert(result == WBWIIteratorImpl::kMergeInProgress ||
result == WBWIIteratorImpl::kNotFound);
// Did not find key in batch OR could not resolve Merges. Try DB.
if (!callback) {
@ -477,7 +495,7 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(
}
if (s.ok() || s.IsNotFound()) { // DB Get Succeeded
if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress) {
if (result == WBWIIteratorImpl::kMergeInProgress) {
// Merge result from DB with merges in Batch
std::string merge_result;
if (s.ok()) {
@ -513,7 +531,7 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB(
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
// To hold merges from the write batch
autovector<std::pair<WriteBatchWithIndexInternal::Result, MergeContext>,
autovector<std::pair<WBWIIteratorImpl::Result, MergeContext>,
MultiGetContext::MAX_BATCH_SIZE>
merges;
// Since the lifetime of the WriteBatch is the same as that of the transaction
@ -524,31 +542,22 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB(
PinnableSlice* pinnable_val = &values[i];
std::string& batch_value = *pinnable_val->GetSelf();
Status* s = &statuses[i];
auto result = wbwii.GetFromBatch(this, keys[i], &merge_context,
&batch_value, rep->overwrite_key, s);
auto result =
wbwii.GetFromBatch(this, keys[i], &merge_context, &batch_value, s);
if (result == WriteBatchWithIndexInternal::Result::kFound) {
if (result == WBWIIteratorImpl::kFound) {
pinnable_val->PinSelf();
continue;
}
if (result == WriteBatchWithIndexInternal::Result::kDeleted) {
if (result == WBWIIteratorImpl::kDeleted) {
*s = Status::NotFound();
continue;
}
if (result == WriteBatchWithIndexInternal::Result::kError) {
if (result == WBWIIteratorImpl::kError) {
continue;
}
if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
rep->overwrite_key == true) {
// Since we've overwritten keys, we do not know what other operations are
// in this batch for this key, so we cannot do a Merge to compute the
// result. Instead, we will simply return MergeInProgress.
*s = Status::MergeInProgress();
continue;
}
assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
result == WriteBatchWithIndexInternal::Result::kNotFound);
assert(result == WBWIIteratorImpl::kMergeInProgress ||
result == WBWIIteratorImpl::kNotFound);
key_context.emplace_back(column_family, keys[i], &values[i],
/*timestamp*/ nullptr, &statuses[i]);
merges.emplace_back(result, std::move(merge_context));
@ -569,10 +578,9 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB(
KeyContext& key = *iter;
if (key.s->ok() || key.s->IsNotFound()) { // DB Get Succeeded
size_t index = iter - key_context.begin();
std::pair<WriteBatchWithIndexInternal::Result, MergeContext>&
merge_result = merges[index];
if (merge_result.first ==
WriteBatchWithIndexInternal::Result::kMergeInProgress) {
std::pair<WBWIIteratorImpl::Result, MergeContext>& merge_result =
merges[index];
if (merge_result.first == WBWIIteratorImpl::kMergeInProgress) {
// Merge result from DB with merges in Batch
if (key.s->ok()) {
*key.s = wbwii.MergeKey(*key.key, iter->value, merge_result.second,

utilities/write_batch_with_index/write_batch_with_index_internal.cc
@ -19,8 +19,9 @@
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {
BaseDeltaIterator::BaseDeltaIterator(Iterator* base_iterator,
WBWIIterator* delta_iterator,
BaseDeltaIterator::BaseDeltaIterator(ColumnFamilyHandle* column_family,
Iterator* base_iterator,
WBWIIteratorImpl* delta_iterator,
const Comparator* comparator,
const ReadOptions* read_options)
: forward_(true),
@ -31,7 +32,9 @@ BaseDeltaIterator::BaseDeltaIterator(Iterator* base_iterator,
delta_iterator_(delta_iterator),
comparator_(comparator),
iterate_upper_bound_(read_options ? read_options->iterate_upper_bound
: nullptr) {}
: nullptr) {
wbwii_.reset(new WriteBatchWithIndexInternal(column_family));
}
bool BaseDeltaIterator::Valid() const {
return status_.ok() ? (current_at_base_ ? BaseValid() : DeltaValid()) : false;
@ -144,8 +147,32 @@ Slice BaseDeltaIterator::key() const {
}
Slice BaseDeltaIterator::value() const {
return current_at_base_ ? base_iterator_->value()
: delta_iterator_->Entry().value;
if (current_at_base_) {
return base_iterator_->value();
} else {
WriteEntry delta_entry = delta_iterator_->Entry();
if (wbwii_->GetNumOperands() == 0) {
return delta_entry.value;
} else if (delta_entry.type == kDeleteRecord ||
delta_entry.type == kSingleDeleteRecord) {
status_ =
wbwii_->MergeKey(delta_entry.key, nullptr, merge_result_.GetSelf());
} else if (delta_entry.type == kPutRecord) {
status_ = wbwii_->MergeKey(delta_entry.key, &delta_entry.value,
merge_result_.GetSelf());
} else if (delta_entry.type == kMergeRecord) {
if (equal_keys_) {
Slice base_value = base_iterator_->value();
status_ = wbwii_->MergeKey(delta_entry.key, &base_value,
merge_result_.GetSelf());
} else {
status_ =
wbwii_->MergeKey(delta_entry.key, nullptr, merge_result_.GetSelf());
}
}
merge_result_.PinSelf();
return merge_result_;
}
}
Status BaseDeltaIterator::status() const {
@ -228,12 +255,11 @@ void BaseDeltaIterator::Advance() {
void BaseDeltaIterator::AdvanceDelta() {
if (forward_) {
delta_iterator_->Next();
delta_iterator_->NextKey();
} else {
delta_iterator_->Prev();
delta_iterator_->PrevKey();
}
}
void BaseDeltaIterator::AdvanceBase() {
if (forward_) {
base_iterator_->Next();
@ -243,17 +269,18 @@ void BaseDeltaIterator::AdvanceBase() {
}
bool BaseDeltaIterator::BaseValid() const { return base_iterator_->Valid(); }
bool BaseDeltaIterator::DeltaValid() const { return delta_iterator_->Valid(); }
void BaseDeltaIterator::UpdateCurrent() {
// Suppress false positive clang analyzer warnings.
#ifndef __clang_analyzer__
status_ = Status::OK();
while (true) {
auto delta_result = WBWIIteratorImpl::kNotFound;
WriteEntry delta_entry;
if (DeltaValid()) {
assert(delta_iterator_->status().ok());
delta_result =
delta_iterator_->FindLatestUpdate(wbwii_->GetMergeContext());
delta_entry = delta_iterator_->Entry();
} else if (!delta_iterator_->status().ok()) {
// Expose the error status and stop.
@ -279,8 +306,8 @@ void BaseDeltaIterator::UpdateCurrent() {
return;
}
}
if (delta_entry.type == kDeleteRecord ||
delta_entry.type == kSingleDeleteRecord) {
if (delta_result == WBWIIteratorImpl::kDeleted &&
wbwii_->GetNumOperands() == 0) {
AdvanceDelta();
} else {
current_at_base_ = false;
@ -298,8 +325,8 @@ void BaseDeltaIterator::UpdateCurrent() {
if (compare == 0) {
equal_keys_ = true;
}
if (delta_entry.type != kDeleteRecord &&
delta_entry.type != kSingleDeleteRecord) {
if (delta_result != WBWIIteratorImpl::kDeleted ||
wbwii_->GetNumOperands() > 0) {
current_at_base_ = false;
return;
}
@ -319,9 +346,105 @@ void BaseDeltaIterator::UpdateCurrent() {
#endif // __clang_analyzer__
}
class Env;
class Logger;
class Statistics;
void WBWIIteratorImpl::AdvanceKey(bool forward) {
if (Valid()) {
Slice key = Entry().key;
do {
if (forward) {
Next();
} else {
Prev();
}
} while (MatchesKey(column_family_id_, key));
}
}
void WBWIIteratorImpl::NextKey() { AdvanceKey(true); }
void WBWIIteratorImpl::PrevKey() {
AdvanceKey(false); // Move to the tail of the previous key
if (Valid()) {
AdvanceKey(false); // Move back another key. Now we are at the start of
// the previous key
if (Valid()) { // Still a valid
Next(); // Move forward one onto this key
} else {
SeekToFirst(); // Not valid, move to the start
}
}
}
WBWIIteratorImpl::Result WBWIIteratorImpl::FindLatestUpdate(
MergeContext* merge_context) {
if (Valid()) {
Slice key = Entry().key;
return FindLatestUpdate(key, merge_context);
} else {
merge_context->Clear(); // Clear any entries in the MergeContext
return WBWIIteratorImpl::kNotFound;
}
}
WBWIIteratorImpl::Result WBWIIteratorImpl::FindLatestUpdate(
const Slice& key, MergeContext* merge_context) {
Result result = WBWIIteratorImpl::kNotFound;
merge_context->Clear(); // Clear any entries in the MergeContext
// TODO(agiardullo): consider adding support for reverse iteration
if (!Valid()) {
return result;
} else if (comparator_->CompareKey(column_family_id_, Entry().key, key) !=
0) {
return result;
} else {
// We want to iterate in the reverse order that the writes were added to the
// batch. Since we don't have a reverse iterator, we must seek past the
// end. We do this by seeking to the next key, and then back one step
NextKey();
if (Valid()) {
Prev();
} else {
SeekToLast();
}
// We are at the end of the iterator for this key. Search backwards for the
// last Put or Delete, accumulating merges along the way.
while (Valid()) {
const WriteEntry entry = Entry();
if (comparator_->CompareKey(column_family_id_, entry.key, key) != 0) {
break; // Unexpected error or we've reached a different next key
}
switch (entry.type) {
case kPutRecord:
return WBWIIteratorImpl::kFound;
case kDeleteRecord:
return WBWIIteratorImpl::kDeleted;
case kSingleDeleteRecord:
return WBWIIteratorImpl::kDeleted;
case kMergeRecord:
result = WBWIIteratorImpl::kMergeInProgress;
merge_context->PushOperand(entry.value);
break;
case kLogDataRecord:
break; // ignore
case kXIDRecord:
break; // ignore
default:
return WBWIIteratorImpl::kError;
} // end switch statement
Prev();
} // End while Valid()
// At this point, we have been through the whole list and found no Puts or
// Deletes. The iterator points to the previous key. Move the iterator back
// onto this one.
if (Valid()) {
Next();
} else {
SeekToFirst();
}
}
return result;
}
Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
WriteType* type, Slice* Key,
@ -479,6 +602,10 @@ bool WBWIIteratorImpl::MatchesKey(uint32_t cf_id, const Slice& key) {
}
}
WriteBatchWithIndexInternal::WriteBatchWithIndexInternal(
ColumnFamilyHandle* column_family)
: db_(nullptr), db_options_(nullptr), column_family_(column_family) {}
WriteBatchWithIndexInternal::WriteBatchWithIndexInternal(
DB* db, ColumnFamilyHandle* column_family)
: db_(db), db_options_(nullptr), column_family_(column_family) {
@ -493,9 +620,9 @@ WriteBatchWithIndexInternal::WriteBatchWithIndexInternal(
Status WriteBatchWithIndexInternal::MergeKey(const Slice& key,
const Slice* value,
MergeContext& merge_context,
const MergeContext& context,
std::string* result,
Slice* result_operand) {
Slice* result_operand) const {
if (column_family_ != nullptr) {
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family_);
const auto merge_operator = cfh->cfd()->ioptions()->merge_operator.get();
@ -509,133 +636,66 @@ Status WriteBatchWithIndexInternal::MergeKey(const Slice& key,
Statistics* statistics = immutable_db_options.statistics.get();
Logger* logger = immutable_db_options.info_log.get();
SystemClock* clock = immutable_db_options.clock;
return MergeHelper::TimedFullMerge(
merge_operator, key, value, merge_context.GetOperands(), result,
logger, statistics, clock, result_operand);
return MergeHelper::TimedFullMerge(merge_operator, key, value,
context.GetOperands(), result, logger,
statistics, clock, result_operand);
} else if (db_options_ != nullptr) {
Statistics* statistics = db_options_->statistics.get();
Env* env = db_options_->env;
Logger* logger = db_options_->info_log.get();
SystemClock* clock = env->GetSystemClock().get();
return MergeHelper::TimedFullMerge(
merge_operator, key, value, merge_context.GetOperands(), result,
logger, statistics, clock, result_operand);
return MergeHelper::TimedFullMerge(merge_operator, key, value,
context.GetOperands(), result, logger,
statistics, clock, result_operand);
} else {
const auto cf_opts = cfh->cfd()->ioptions();
return MergeHelper::TimedFullMerge(
merge_operator, key, value, merge_context.GetOperands(), result,
nullptr, nullptr, SystemClock::Default().get(), result_operand);
merge_operator, key, value, context.GetOperands(), result,
cf_opts->logger, cf_opts->stats, cf_opts->clock, result_operand);
}
} else {
return Status::InvalidArgument("Must provide a column_family");
}
}
WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
WriteBatchWithIndex* batch, const Slice& key, MergeContext* merge_context,
std::string* value, bool overwrite_key, Status* s) {
uint32_t cf_id = GetColumnFamilyID(column_family_);
WBWIIteratorImpl::Result WriteBatchWithIndexInternal::GetFromBatch(
WriteBatchWithIndex* batch, const Slice& key, MergeContext* context,
std::string* value, Status* s) {
*s = Status::OK();
Result result = kNotFound;
std::unique_ptr<WBWIIteratorImpl> iter(
static_cast_with_check<WBWIIteratorImpl>(
batch->NewIterator(column_family_)));
// We want to iterate in the reverse order that the writes were added to the
// batch. Since we don't have a reverse iterator, we must seek past the end.
// TODO(agiardullo): consider adding support for reverse iteration
// Search the iterator for this key, and updates/merges to it.
iter->Seek(key);
while (iter->Valid() && iter->MatchesKey(cf_id, key)) {
iter->Next();
}
if (!(*s).ok()) {
return WriteBatchWithIndexInternal::kError;
}
if (!iter->Valid()) {
// Read past end of results. Reposition on last result.
iter->SeekToLast();
} else {
iter->Prev();
}
Slice entry_value;
while (iter->Valid()) {
if (!iter->MatchesKey(cf_id, key)) {
// Unexpected error or we've reached a different next key
break;
}
const WriteEntry entry = iter->Entry();
switch (entry.type) {
case kPutRecord: {
result = WriteBatchWithIndexInternal::Result::kFound;
entry_value = entry.value;
break;
}
case kMergeRecord: {
result = WriteBatchWithIndexInternal::Result::kMergeInProgress;
merge_context->PushOperand(entry.value);
break;
}
case kDeleteRecord:
case kSingleDeleteRecord: {
result = WriteBatchWithIndexInternal::Result::kDeleted;
break;
}
case kLogDataRecord:
case kXIDRecord: {
// ignore
break;
}
default: {
result = WriteBatchWithIndexInternal::Result::kError;
(*s) = Status::Corruption("Unexpected entry in WriteBatchWithIndex:",
ToString(entry.type));
break;
auto result = iter->FindLatestUpdate(key, context);
if (result == WBWIIteratorImpl::kError) {
(*s) = Status::Corruption("Unexpected entry in WriteBatchWithIndex:",
ToString(iter->Entry().type));
return result;
} else if (result == WBWIIteratorImpl::kNotFound) {
return result;
} else if (result == WBWIIteratorImpl::Result::kFound) { // PUT
Slice entry_value = iter->Entry().value;
if (context->GetNumOperands() > 0) {
*s = MergeKey(key, &entry_value, *context, value);
if (!s->ok()) {
result = WBWIIteratorImpl::Result::kError;
}
} else {
value->assign(entry_value.data(), entry_value.size());
}
if (result == WriteBatchWithIndexInternal::Result::kFound ||
result == WriteBatchWithIndexInternal::Result::kDeleted ||
result == WriteBatchWithIndexInternal::Result::kError) {
// We can stop iterating once we find a PUT or DELETE
break;
}
if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
overwrite_key == true) {
// Since we've overwritten keys, we do not know what other operations are
// in this batch for this key, so we cannot do a Merge to compute the
// result. Instead, we will simply return MergeInProgress.
break;
}
iter->Prev();
}
if ((*s).ok()) {
if (result == WriteBatchWithIndexInternal::Result::kFound ||
result == WriteBatchWithIndexInternal::Result::kDeleted) {
// Found a Put or Delete. Merge if necessary.
if (merge_context->GetNumOperands() > 0) {
if (result == WriteBatchWithIndexInternal::Result::kFound) {
*s = MergeKey(key, &entry_value, *merge_context, value);
} else {
*s = MergeKey(key, nullptr, *merge_context, value);
}
if ((*s).ok()) {
result = WriteBatchWithIndexInternal::Result::kFound;
} else {
result = WriteBatchWithIndexInternal::Result::kError;
}
} else { // nothing to merge
if (result == WriteBatchWithIndexInternal::Result::kFound) { // PUT
value->assign(entry_value.data(), entry_value.size());
}
} else if (result == WBWIIteratorImpl::kDeleted) {
if (context->GetNumOperands() > 0) {
*s = MergeKey(key, nullptr, *context, value);
if (s->ok()) {
result = WBWIIteratorImpl::Result::kFound;
} else {
result = WBWIIteratorImpl::Result::kError;
}
}
}
return result;
}
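
The resolution logic in the hunk above (find the latest Put or Delete for a key, collect any Merge operands written after it, then apply the merge operator) can be sketched in isolation. This is a minimal illustration under stated assumptions, not the RocksDB implementation: `Entry`, `EntryType`, `AppendMerge`, and `GetFromBatchSketch` are hypothetical names, the merge operator is assumed to be a comma-separated string append, and the per-key entry list stands in for the skip-list iterator.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-ins for illustration only; these are not RocksDB types.
enum class EntryType { kPut, kDelete, kMerge };
enum class Result { kFound, kDeleted, kNotFound, kMergeInProgress, kError };

struct Entry {
  EntryType type;
  std::string value;  // unused for kDelete
};

// Assumed merge operator: append operands to the base value, comma-separated.
// `base` is nullptr when there is no existing value (for example after a
// Delete). `operands` is ordered oldest to newest.
std::string AppendMerge(const std::string* base,
                        const std::vector<std::string>& operands) {
  std::string out = base ? *base : std::string();
  for (const auto& op : operands) {
    if (!out.empty()) out += ",";
    out += op;
  }
  return out;
}

// `entries` holds the batch entries for ONE key, oldest first.
Result GetFromBatchSketch(const std::vector<Entry>& entries,
                          std::string* value) {
  if (entries.empty()) return Result::kNotFound;

  // Walk from the newest entry backwards, collecting merge operands until
  // a Put or Delete is reached.
  std::vector<std::string> newest_first;
  const std::string* base = nullptr;
  Result result = Result::kMergeInProgress;
  for (auto it = entries.rbegin(); it != entries.rend(); ++it) {
    if (it->type == EntryType::kMerge) {
      newest_first.push_back(it->value);
    } else if (it->type == EntryType::kPut) {
      base = &it->value;
      result = Result::kFound;
      break;
    } else {
      result = Result::kDeleted;
      break;
    }
  }

  // Only merges were found: the batch alone cannot produce a final value.
  if (result == Result::kMergeInProgress) return result;

  if (!newest_first.empty()) {
    // Apply operands oldest to newest on top of the Put value (or on top of
    // nothing, if the latest update was a Delete).
    std::vector<std::string> operands(newest_first.rbegin(),
                                      newest_first.rend());
    *value = AppendMerge(base, operands);
    return Result::kFound;
  }
  if (result == Result::kFound) *value = *base;
  return result;
}
```

With this sketch, a Put of `v1` followed by a Merge of `v2` resolves to `v1,v2`, and a Delete followed by a Merge of `v2` resolves to `v2`, mirroring the `kFound` and `kDeleted` branches above.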

@@ -23,6 +23,8 @@
namespace ROCKSDB_NAMESPACE {
class MergeContext;
class WBWIIteratorImpl;
class WriteBatchWithIndexInternal;
struct Options;
// when direction == forward
@@ -33,7 +35,8 @@ struct Options;
// * equal_keys_ <=> base_iterator == delta_iterator
class BaseDeltaIterator : public Iterator {
public:
BaseDeltaIterator(Iterator* base_iterator, WBWIIterator* delta_iterator,
BaseDeltaIterator(ColumnFamilyHandle* column_family, Iterator* base_iterator,
WBWIIteratorImpl* delta_iterator,
const Comparator* comparator,
const ReadOptions* read_options = nullptr);
@@ -60,14 +63,16 @@ class BaseDeltaIterator : public Iterator {
bool DeltaValid() const;
void UpdateCurrent();
std::unique_ptr<WriteBatchWithIndexInternal> wbwii_;
bool forward_;
bool current_at_base_;
bool equal_keys_;
Status status_;
mutable Status status_;
std::unique_ptr<Iterator> base_iterator_;
std::unique_ptr<WBWIIterator> delta_iterator_;
std::unique_ptr<WBWIIteratorImpl> delta_iterator_;
const Comparator* comparator_; // not owned
const Slice* iterate_upper_bound_;
mutable PinnableSlice merge_result_;
};
// Key used by skip list, as the binary searchable index of WriteBatchWithIndex.
@@ -174,6 +179,7 @@ typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&>
class WBWIIteratorImpl : public WBWIIterator {
public:
enum Result { kFound, kDeleted, kNotFound, kMergeInProgress, kError };
WBWIIteratorImpl(uint32_t column_family_id,
WriteBatchEntrySkipList* skip_list,
const ReadableWriteBatch* write_batch,
@@ -245,6 +251,26 @@ class WBWIIteratorImpl : public WBWIIterator {
bool MatchesKey(uint32_t cf_id, const Slice& key);
// Moves the iterator to the first entry of the previous key.
void PrevKey();
// Moves the iterator to the first entry of the next key.
void NextKey();
// Moves the iterator to the latest update (Put or Delete) for the current key.
// If there is no Put or Delete, the iterator will point to the first entry for
// this key.
// @return kFound if a Put was found for the key
// @return kDeleted if a delete was found for the key
// @return kMergeInProgress if only merges were found for the key
// @return kError if an unsupported operation was found for the key
// @return kNotFound if no operations were found for this key
//
Result FindLatestUpdate(const Slice& key, MergeContext* merge_context);
Result FindLatestUpdate(MergeContext* merge_context);
protected:
void AdvanceKey(bool forward);
private:
uint32_t column_family_id_;
WriteBatchEntrySkipList::Iterator skip_list_iter_;
@@ -257,12 +283,12 @@ class WriteBatchWithIndexInternal {
// For GetFromBatchAndDB or similar
explicit WriteBatchWithIndexInternal(DB* db,
ColumnFamilyHandle* column_family);
// For GetFromBatchAndDB or similar
explicit WriteBatchWithIndexInternal(ColumnFamilyHandle* column_family);
// For GetFromBatch or similar
explicit WriteBatchWithIndexInternal(const DBOptions* db_options,
ColumnFamilyHandle* column_family);
enum Result { kFound, kDeleted, kNotFound, kMergeInProgress, kError };
// If batch contains a value for key, store it in *value and return kFound.
// If batch contains a deletion for key, return kDeleted.
// If batch contains Merge operations as the most recent entry for a key,
@@ -271,19 +297,25 @@ class WriteBatchWithIndexInternal {
// and return kMergeInProgress
// If batch does not contain this key, return kNotFound
// Else, return kError on error with error Status stored in *s.
Result GetFromBatch(WriteBatchWithIndex* batch, const Slice& key,
std::string* value, bool overwrite_key, Status* s) {
return GetFromBatch(batch, key, &merge_context_, value, overwrite_key, s);
WBWIIteratorImpl::Result GetFromBatch(WriteBatchWithIndex* batch,
const Slice& key, std::string* value,
Status* s) {
return GetFromBatch(batch, key, &merge_context_, value, s);
}
Result GetFromBatch(WriteBatchWithIndex* batch, const Slice& key,
MergeContext* merge_context, std::string* value,
bool overwrite_key, Status* s);
WBWIIteratorImpl::Result GetFromBatch(WriteBatchWithIndex* batch,
const Slice& key,
MergeContext* merge_context,
std::string* value, Status* s);
Status MergeKey(const Slice& key, const Slice* value, std::string* result,
Slice* result_operand = nullptr) {
Slice* result_operand = nullptr) const {
return MergeKey(key, value, merge_context_, result, result_operand);
}
Status MergeKey(const Slice& key, const Slice* value, MergeContext& context,
std::string* result, Slice* result_operand = nullptr);
Status MergeKey(const Slice& key, const Slice* value,
const MergeContext& context, std::string* result,
Slice* result_operand = nullptr) const;
size_t GetNumOperands() const { return merge_context_.GetNumOperands(); }
MergeContext* GetMergeContext() { return &merge_context_; }
Slice GetOperand(int index) const { return merge_context_.GetOperand(index); }
private:
DB* db_;
