Eliminate the creation of ImmutableDBOptions in WBWI::GetFromBatch (#6851)

Summary:
1. Made `WriteBatchWithIndexInternal` into a class that stores the `DB*` or `DBOptions*`.

2. Changed the `GetFromBatch` method to be non-static and use an instance of the class.  Added `MergeKey` methods to perform the merge itself and return any status.

This change unifies the multiple calls to the `MergeHelper` under a single wrapped API.

Closes https://github.com/facebook/rocksdb/issues/6683

Pull Request resolved: https://github.com/facebook/rocksdb/pull/6851

Reviewed By: ajkr

Differential Revision: D21706574

Pulled By: pdillinger

fbshipit-source-id: 6860bd64d62669aaa591846e914eed3b674e68b1
main
mrambacher 4 years ago committed by Facebook GitHub Bot
parent 30cd38c687
commit 81367a4616
  1. 3
      include/rocksdb/utilities/write_batch_with_index.h
  2. 180
      utilities/write_batch_with_index/write_batch_with_index.cc
  3. 126
      utilities/write_batch_with_index/write_batch_with_index_internal.cc
  4. 116
      utilities/write_batch_with_index/write_batch_with_index_internal.h

@ -40,12 +40,13 @@ enum WriteType {
kDeleteRangeRecord,
kLogDataRecord,
kXIDRecord,
kUnknownRecord,
};
// A Put, Merge, Delete, or SingleDelete entry in a write batch.
// Used in WBWIIterator.
struct WriteEntry {
WriteType type;
WriteType type = kUnknownRecord;
Slice key;
Slice value;
};

@ -23,99 +23,6 @@
#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
namespace ROCKSDB_NAMESPACE {
typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&>
WriteBatchEntrySkipList;
class WBWIIteratorImpl : public WBWIIterator {
public:
WBWIIteratorImpl(uint32_t column_family_id,
WriteBatchEntrySkipList* skip_list,
const ReadableWriteBatch* write_batch)
: column_family_id_(column_family_id),
skip_list_iter_(skip_list),
write_batch_(write_batch) {}
~WBWIIteratorImpl() override {}
bool Valid() const override {
if (!skip_list_iter_.Valid()) {
return false;
}
const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
return (iter_entry != nullptr &&
iter_entry->column_family == column_family_id_);
}
void SeekToFirst() override {
WriteBatchIndexEntry search_entry(
nullptr /* search_key */, column_family_id_,
true /* is_forward_direction */, true /* is_seek_to_first */);
skip_list_iter_.Seek(&search_entry);
}
void SeekToLast() override {
WriteBatchIndexEntry search_entry(
nullptr /* search_key */, column_family_id_ + 1,
true /* is_forward_direction */, true /* is_seek_to_first */);
skip_list_iter_.Seek(&search_entry);
if (!skip_list_iter_.Valid()) {
skip_list_iter_.SeekToLast();
} else {
skip_list_iter_.Prev();
}
}
void Seek(const Slice& key) override {
WriteBatchIndexEntry search_entry(&key, column_family_id_,
true /* is_forward_direction */,
false /* is_seek_to_first */);
skip_list_iter_.Seek(&search_entry);
}
void SeekForPrev(const Slice& key) override {
WriteBatchIndexEntry search_entry(&key, column_family_id_,
false /* is_forward_direction */,
false /* is_seek_to_first */);
skip_list_iter_.SeekForPrev(&search_entry);
}
void Next() override { skip_list_iter_.Next(); }
void Prev() override { skip_list_iter_.Prev(); }
WriteEntry Entry() const override {
WriteEntry ret;
Slice blob, xid;
const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
// this is guaranteed with Valid()
assert(iter_entry != nullptr &&
iter_entry->column_family == column_family_id_);
auto s = write_batch_->GetEntryFromDataOffset(
iter_entry->offset, &ret.type, &ret.key, &ret.value, &blob, &xid);
assert(s.ok());
assert(ret.type == kPutRecord || ret.type == kDeleteRecord ||
ret.type == kSingleDeleteRecord || ret.type == kDeleteRangeRecord ||
ret.type == kMergeRecord);
return ret;
}
Status status() const override {
// this is in-memory data structure, so the only way status can be non-ok is
// through memory corruption
return Status::OK();
}
const WriteBatchIndexEntry* GetRawEntry() const {
return skip_list_iter_.key();
}
private:
uint32_t column_family_id_;
WriteBatchEntrySkipList::Iterator skip_list_iter_;
const ReadableWriteBatch* write_batch_;
};
struct WriteBatchWithIndex::Rep {
explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
size_t max_bytes = 0, bool _overwrite_key = false)
@ -179,12 +86,13 @@ bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId(
return false;
}
WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch);
WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch,
&comparator);
iter.Seek(key);
if (!iter.Valid()) {
return false;
}
if (comparator.CompareKey(column_family_id, key, iter.Entry().key) != 0) {
if (!iter.MatchesKey(column_family_id, key)) {
return false;
}
WriteBatchIndexEntry* non_const_entry =
@ -333,13 +241,15 @@ WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
size_t WriteBatchWithIndex::SubBatchCnt() { return rep->sub_batch_cnt; }
WBWIIterator* WriteBatchWithIndex::NewIterator() {
return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch);
return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch,
&(rep->comparator));
}
WBWIIterator* WriteBatchWithIndex::NewIterator(
ColumnFamilyHandle* column_family) {
return new WBWIIteratorImpl(GetColumnFamilyID(column_family),
&(rep->skip_list), &rep->write_batch);
&(rep->skip_list), &rep->write_batch,
&(rep->comparator));
}
Iterator* WriteBatchWithIndex::NewIteratorWithBase(
@ -450,13 +360,8 @@ Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family,
const DBOptions& options,
const Slice& key, std::string* value) {
Status s;
MergeContext merge_context;
const ImmutableDBOptions immuable_db_options(options);
WriteBatchWithIndexInternal::Result result =
WriteBatchWithIndexInternal::GetFromBatch(
immuable_db_options, this, column_family, key, &merge_context,
&rep->comparator, value, rep->overwrite_key, &s);
WriteBatchWithIndexInternal wbwii(&options, column_family);
auto result = wbwii.GetFromBatch(this, key, value, rep->overwrite_key, &s);
switch (result) {
case WriteBatchWithIndexInternal::Result::kFound:
@ -529,18 +434,14 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(
DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* pinnable_val, ReadCallback* callback) {
Status s;
MergeContext merge_context;
const ImmutableDBOptions& immuable_db_options =
static_cast_with_check<DBImpl>(db->GetRootDB())->immutable_db_options();
WriteBatchWithIndexInternal wbwii(db, column_family);
// Since the lifetime of the WriteBatch is the same as that of the transaction
// we cannot pin it as otherwise the returned value will not be available
// after the transaction finishes.
std::string& batch_value = *pinnable_val->GetSelf();
WriteBatchWithIndexInternal::Result result =
WriteBatchWithIndexInternal::GetFromBatch(
immuable_db_options, this, column_family, key, &merge_context,
&rep->comparator, &batch_value, rep->overwrite_key, &s);
auto result =
wbwii.GetFromBatch(this, key, &batch_value, rep->overwrite_key, &s);
if (result == WriteBatchWithIndexInternal::Result::kFound) {
pinnable_val->PinSelf();
@ -578,30 +479,16 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(
if (s.ok() || s.IsNotFound()) { // DB Get Succeeded
if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress) {
// Merge result from DB with merges in Batch
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
const MergeOperator* merge_operator =
cfh->cfd()->ioptions()->merge_operator;
Statistics* statistics = immuable_db_options.statistics.get();
Env* env = immuable_db_options.env;
Logger* logger = immuable_db_options.info_log.get();
Slice* merge_data;
std::string merge_result;
if (s.ok()) {
merge_data = pinnable_val;
s = wbwii.MergeKey(key, pinnable_val, &merge_result);
} else { // Key not present in db (s.IsNotFound())
merge_data = nullptr;
s = wbwii.MergeKey(key, nullptr, &merge_result);
}
if (merge_operator) {
std::string merge_result;
s = MergeHelper::TimedFullMerge(merge_operator, key, merge_data,
merge_context.GetOperands(),
&merge_result, logger, statistics, env);
if (s.ok()) {
pinnable_val->Reset();
*pinnable_val->GetSelf() = std::move(merge_result);
pinnable_val->PinSelf();
} else {
s = Status::InvalidArgument("Options::merge_operator must be set");
}
}
}
@ -621,8 +508,7 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB(
DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys, PinnableSlice* values,
Status* statuses, bool sorted_input, ReadCallback* callback) {
const ImmutableDBOptions& immuable_db_options =
static_cast_with_check<DBImpl>(db->GetRootDB())->immutable_db_options();
WriteBatchWithIndexInternal wbwii(db, column_family);
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
@ -638,10 +524,8 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB(
PinnableSlice* pinnable_val = &values[i];
std::string& batch_value = *pinnable_val->GetSelf();
Status* s = &statuses[i];
WriteBatchWithIndexInternal::Result result =
WriteBatchWithIndexInternal::GetFromBatch(
immuable_db_options, this, column_family, keys[i], &merge_context,
&rep->comparator, &batch_value, rep->overwrite_key, s);
auto result = wbwii.GetFromBatch(this, keys[i], &merge_context,
&batch_value, rep->overwrite_key, s);
if (result == WriteBatchWithIndexInternal::Result::kFound) {
pinnable_val->PinSelf();
@ -681,9 +565,6 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB(
->MultiGetWithCallback(read_options, column_family, callback,
&sorted_keys);
ColumnFamilyHandleImpl* cfh =
static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
const MergeOperator* merge_operator = cfh->cfd()->ioptions()->merge_operator;
for (auto iter = key_context.begin(); iter != key_context.end(); ++iter) {
KeyContext& key = *iter;
if (key.s->ok() || key.s->IsNotFound()) { // DB Get Succeeded
@ -693,27 +574,14 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB(
if (merge_result.first ==
WriteBatchWithIndexInternal::Result::kMergeInProgress) {
// Merge result from DB with merges in Batch
Statistics* statistics = immuable_db_options.statistics.get();
Env* env = immuable_db_options.env;
Logger* logger = immuable_db_options.info_log.get();
Slice* merge_data;
if (key.s->ok()) {
merge_data = iter->value;
*key.s = wbwii.MergeKey(*key.key, iter->value, merge_result.second,
key.value->GetSelf());
} else { // Key not present in db (s.IsNotFound())
merge_data = nullptr;
}
if (merge_operator) {
*key.s = MergeHelper::TimedFullMerge(
merge_operator, *key.key, merge_data,
merge_result.second.GetOperands(), key.value->GetSelf(), logger,
statistics, env);
key.value->PinSelf();
} else {
*key.s =
Status::InvalidArgument("Options::merge_operator must be set");
*key.s = wbwii.MergeKey(*key.key, nullptr, merge_result.second,
key.value->GetSelf());
}
key.value->PinSelf();
}
}
}

@ -8,6 +8,7 @@
#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "rocksdb/comparator.h"
@ -155,34 +156,102 @@ int WriteBatchEntryComparator::CompareKey(uint32_t column_family,
}
}
// Decodes the write-batch record the iterator currently points at and returns
// its type, key and value. The asserts encode the precondition that Valid()
// is true and that the record is a Put, Delete, SingleDelete, DeleteRange or
// Merge.
WriteEntry WBWIIteratorImpl::Entry() const {
  const WriteBatchIndexEntry* current = skip_list_iter_.key();
  // Valid() guarantees both halves of this condition.
  assert(current != nullptr && current->column_family == column_family_id_);

  WriteEntry decoded;
  // Blob and XID outputs are required by the decoder but not used here.
  Slice blob;
  Slice xid;
  auto s = write_batch_->GetEntryFromDataOffset(
      current->offset, &decoded.type, &decoded.key, &decoded.value, &blob,
      &xid);
  assert(s.ok());
  assert(decoded.type == kPutRecord || decoded.type == kDeleteRecord ||
         decoded.type == kSingleDeleteRecord ||
         decoded.type == kDeleteRangeRecord || decoded.type == kMergeRecord);
  return decoded;
}
// Returns true when the iterator is valid and the key of its current entry
// compares equal to `key` under the comparator for column family `cf_id`.
// An invalid iterator never matches.
bool WBWIIteratorImpl::MatchesKey(uint32_t cf_id, const Slice& key) {
  // Short-circuit keeps Entry() from being called on an invalid iterator.
  return Valid() && comparator_->CompareKey(cf_id, key, Entry().key) == 0;
}
// Builds a helper bound to a live DB. When `db` is non-null and no column
// family is supplied, the DB's default column family is used instead.
// `db_options_` stays null for instances built this way.
WriteBatchWithIndexInternal::WriteBatchWithIndexInternal(
    DB* db, ColumnFamilyHandle* column_family)
    : db_(db),
      db_options_(nullptr),
      column_family_((db != nullptr && column_family == nullptr)
                         ? db->DefaultColumnFamily()
                         : column_family) {}
// Builds a helper that has no DB and instead keeps a pointer to the caller's
// DBOptions; MergeKey reads statistics, env and info_log from it when db_ is
// null. The DBOptions object is not copied. Unlike the DB* constructor, a
// null column_family is stored as-is (there is no DB to supply a default).
WriteBatchWithIndexInternal::WriteBatchWithIndexInternal(
const DBOptions* db_options, ColumnFamilyHandle* column_family)
: db_(nullptr), db_options_(db_options), column_family_(column_family) {}
// Runs a full merge for `key` using the merge operator of this helper's
// column family.
//
//   key            - key being merged.
//   value          - existing value to merge onto, or nullptr when there is
//                    none.
//   merge_context  - supplies the merge operands.
//   result         - receives the merged value.
//   result_operand - forwarded unchanged to MergeHelper::TimedFullMerge.
//
// Returns InvalidArgument when no column family is set or the column family
// has no merge operator; otherwise returns the status of TimedFullMerge.
// Logger, statistics and env come from the DB when one is set, else from the
// DBOptions when set, else default to (nullptr, nullptr, Env::Default()).
Status WriteBatchWithIndexInternal::MergeKey(const Slice& key,
                                             const Slice* value,
                                             MergeContext& merge_context,
                                             std::string* result,
                                             Slice* result_operand) {
  if (column_family_ == nullptr) {
    return Status::InvalidArgument("Must provide a column_family");
  }

  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family_);
  const auto merge_operator = cfh->cfd()->ioptions()->merge_operator;
  if (merge_operator == nullptr) {
    return Status::InvalidArgument(
        "Merge_operator must be set for column_family");
  }

  // Pick the environment for the merge from the best available source.
  Logger* logger = nullptr;
  Statistics* statistics = nullptr;
  Env* env = nullptr;
  if (db_ != nullptr) {
    const ImmutableDBOptions& immutable_db_options =
        static_cast_with_check<DBImpl>(db_->GetRootDB())
            ->immutable_db_options();
    statistics = immutable_db_options.statistics.get();
    env = immutable_db_options.env;
    logger = immutable_db_options.info_log.get();
  } else if (db_options_ != nullptr) {
    statistics = db_options_->statistics.get();
    env = db_options_->env;
    logger = db_options_->info_log.get();
  } else {
    env = Env::Default();
  }

  return MergeHelper::TimedFullMerge(merge_operator, key, value,
                                     merge_context.GetOperands(), result,
                                     logger, statistics, env, result_operand);
}
WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
const ImmutableDBOptions& immuable_db_options, WriteBatchWithIndex* batch,
ColumnFamilyHandle* column_family, const Slice& key,
MergeContext* merge_context, WriteBatchEntryComparator* cmp,
WriteBatchWithIndex* batch, const Slice& key, MergeContext* merge_context,
std::string* value, bool overwrite_key, Status* s) {
uint32_t cf_id = GetColumnFamilyID(column_family);
uint32_t cf_id = GetColumnFamilyID(column_family_);
*s = Status::OK();
WriteBatchWithIndexInternal::Result result =
WriteBatchWithIndexInternal::Result::kNotFound;
Result result = kNotFound;
std::unique_ptr<WBWIIterator> iter =
std::unique_ptr<WBWIIterator>(batch->NewIterator(column_family));
std::unique_ptr<WBWIIteratorImpl> iter(
static_cast_with_check<WBWIIteratorImpl>(
batch->NewIterator(column_family_)));
// We want to iterate in the reverse order that the writes were added to the
// batch. Since we don't have a reverse iterator, we must seek past the end.
// TODO(agiardullo): consider adding support for reverse iteration
iter->Seek(key);
while (iter->Valid()) {
const WriteEntry entry = iter->Entry();
if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
break;
}
while (iter->Valid() && iter->MatchesKey(cf_id, key)) {
iter->Next();
}
if (!(*s).ok()) {
return WriteBatchWithIndexInternal::Result::kError;
return WriteBatchWithIndexInternal::kError;
}
if (!iter->Valid()) {
@ -194,12 +263,12 @@ WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
Slice entry_value;
while (iter->Valid()) {
const WriteEntry entry = iter->Entry();
if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
if (!iter->MatchesKey(cf_id, key)) {
// Unexpected error or we've reached a different next key
break;
}
const WriteEntry entry = iter->Entry();
switch (entry.type) {
case kPutRecord: {
result = WriteBatchWithIndexInternal::Result::kFound;
@ -250,27 +319,10 @@ WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
result == WriteBatchWithIndexInternal::Result::kDeleted) {
// Found a Put or Delete. Merge if necessary.
if (merge_context->GetNumOperands() > 0) {
const MergeOperator* merge_operator;
if (column_family != nullptr) {
auto cfh =
static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
merge_operator = cfh->cfd()->ioptions()->merge_operator;
} else {
*s = Status::InvalidArgument("Must provide a column_family");
result = WriteBatchWithIndexInternal::Result::kError;
return result;
}
Statistics* statistics = immuable_db_options.statistics.get();
Env* env = immuable_db_options.env;
Logger* logger = immuable_db_options.info_log.get();
if (merge_operator) {
*s = MergeHelper::TimedFullMerge(merge_operator, key, &entry_value,
merge_context->GetOperands(), value,
logger, statistics, env);
if (result == WriteBatchWithIndexInternal::Result::kFound) {
*s = MergeKey(key, &entry_value, *merge_context, value);
} else {
*s = Status::InvalidArgument("Options::merge_operator must be set");
*s = MergeKey(key, nullptr, *merge_context, value);
}
if ((*s).ok()) {
result = WriteBatchWithIndexInternal::Result::kFound;

@ -10,6 +10,8 @@
#include <string>
#include <vector>
#include "db/merge_context.h"
#include "memtable/skiplist.h"
#include "options/db_options.h"
#include "port/port.h"
#include "rocksdb/comparator.h"
@ -440,8 +442,98 @@ class WriteBatchEntryComparator {
const ReadableWriteBatch* write_batch_;
};
typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&>
WriteBatchEntrySkipList;
class WBWIIteratorImpl : public WBWIIterator {
public:
WBWIIteratorImpl(uint32_t column_family_id,
WriteBatchEntrySkipList* skip_list,
const ReadableWriteBatch* write_batch,
WriteBatchEntryComparator* comparator)
: column_family_id_(column_family_id),
skip_list_iter_(skip_list),
write_batch_(write_batch),
comparator_(comparator) {}
~WBWIIteratorImpl() override {}
bool Valid() const override {
if (!skip_list_iter_.Valid()) {
return false;
}
const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
return (iter_entry != nullptr &&
iter_entry->column_family == column_family_id_);
}
void SeekToFirst() override {
WriteBatchIndexEntry search_entry(
nullptr /* search_key */, column_family_id_,
true /* is_forward_direction */, true /* is_seek_to_first */);
skip_list_iter_.Seek(&search_entry);
}
void SeekToLast() override {
WriteBatchIndexEntry search_entry(
nullptr /* search_key */, column_family_id_ + 1,
true /* is_forward_direction */, true /* is_seek_to_first */);
skip_list_iter_.Seek(&search_entry);
if (!skip_list_iter_.Valid()) {
skip_list_iter_.SeekToLast();
} else {
skip_list_iter_.Prev();
}
}
void Seek(const Slice& key) override {
WriteBatchIndexEntry search_entry(&key, column_family_id_,
true /* is_forward_direction */,
false /* is_seek_to_first */);
skip_list_iter_.Seek(&search_entry);
}
void SeekForPrev(const Slice& key) override {
WriteBatchIndexEntry search_entry(&key, column_family_id_,
false /* is_forward_direction */,
false /* is_seek_to_first */);
skip_list_iter_.SeekForPrev(&search_entry);
}
void Next() override { skip_list_iter_.Next(); }
void Prev() override { skip_list_iter_.Prev(); }
WriteEntry Entry() const override;
Status status() const override {
// this is in-memory data structure, so the only way status can be non-ok is
// through memory corruption
return Status::OK();
}
const WriteBatchIndexEntry* GetRawEntry() const {
return skip_list_iter_.key();
}
bool MatchesKey(uint32_t cf_id, const Slice& key);
private:
uint32_t column_family_id_;
WriteBatchEntrySkipList::Iterator skip_list_iter_;
const ReadableWriteBatch* write_batch_;
WriteBatchEntryComparator* comparator_;
};
class WriteBatchWithIndexInternal {
public:
// For GetFromBatchAndDB or similar
explicit WriteBatchWithIndexInternal(DB* db,
ColumnFamilyHandle* column_family);
// For GetFromBatch or similar
explicit WriteBatchWithIndexInternal(const DBOptions* db_options,
ColumnFamilyHandle* column_family);
enum Result { kFound, kDeleted, kNotFound, kMergeInProgress, kError };
// If batch contains a value for key, store it in *value and return kFound.
@ -452,11 +544,25 @@ class WriteBatchWithIndexInternal {
// and return kMergeInProgress
// If batch does not contain this key, return kNotFound
// Else, return kError on error with error Status stored in *s.
static WriteBatchWithIndexInternal::Result GetFromBatch(
const ImmutableDBOptions& ioptions, WriteBatchWithIndex* batch,
ColumnFamilyHandle* column_family, const Slice& key,
MergeContext* merge_context, WriteBatchEntryComparator* cmp,
std::string* value, bool overwrite_key, Status* s);
Result GetFromBatch(WriteBatchWithIndex* batch, const Slice& key,
std::string* value, bool overwrite_key, Status* s) {
return GetFromBatch(batch, key, &merge_context_, value, overwrite_key, s);
}
Result GetFromBatch(WriteBatchWithIndex* batch, const Slice& key,
MergeContext* merge_context, std::string* value,
bool overwrite_key, Status* s);
Status MergeKey(const Slice& key, const Slice* value, std::string* result,
Slice* result_operand = nullptr) {
return MergeKey(key, value, merge_context_, result, result_operand);
}
Status MergeKey(const Slice& key, const Slice* value, MergeContext& context,
std::string* result, Slice* result_operand = nullptr);
private:
DB* db_;
const DBOptions* db_options_;
ColumnFamilyHandle* column_family_;
MergeContext merge_context_;
};
} // namespace ROCKSDB_NAMESPACE

Loading…
Cancel
Save