Extend MultiGet batching to Transactions (#5210)

Summary:
MultiGet batching was implemented in #5011 in order to reduce CPU utilization when looking up multiple keys at once. This PR implements corresponding ```MultiGet``` and ```MultiGetSingleCFForUpdate``` in ```rocksdb::Transaction``` that call the underlying batching implementation.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5210

Differential Revision: D15048164

Pulled By: anand1976

fbshipit-source-id: c52f6043102ab0cbc723f4cba2a7b7d1767f6f52
main
anand76 6 years ago committed by Facebook Github Bot
parent a7d103198e
commit 1c8cbf315f
  1. 13
      include/rocksdb/utilities/transaction.h
  2. 12
      include/rocksdb/utilities/write_batch_with_index.h
  3. 10
      utilities/transactions/transaction_base.cc
  4. 6
      utilities/transactions/transaction_base.h
  5. 87
      utilities/transactions/transaction_test.cc
  6. 103
      utilities/write_batch_with_index/write_batch_with_index.cc

@ -205,6 +205,19 @@ class Transaction {
const std::vector<Slice>& keys, const std::vector<Slice>& keys,
std::vector<std::string>* values) = 0; std::vector<std::string>* values) = 0;
// Batched version of MultiGet - see DBImpl::MultiGet(). Sub-classes are
// expected to override this with an implementation that calls
// DBImpl::MultiGet()
virtual void MultiGet(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
const bool /*sorted_input*/ = false) {
for (size_t i = 0; i < num_keys; ++i) {
statuses[i] = Get(options, column_family, keys[i], &values[i]);
}
}
// Read this key and ensure that this transaction will only // Read this key and ensure that this transaction will only
// be able to be committed if this key is not written outside this // be able to be committed if this key is not written outside this
// transaction after it has first been read (or after the snapshot if a // transaction after it has first been read (or after the snapshot if a

@ -14,6 +14,7 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include <vector>
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
@ -207,6 +208,12 @@ class WriteBatchWithIndex : public WriteBatchBase {
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value); PinnableSlice* value);
void MultiGetFromBatchAndDB(DB* db, const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
bool sorted_input);
// Records the state of the batch for future calls to RollbackToSavePoint(). // Records the state of the batch for future calls to RollbackToSavePoint().
// May be called multiple times to set multiple save points. // May be called multiple times to set multiple save points.
void SetSavePoint() override; void SetSavePoint() override;
@ -246,6 +253,11 @@ class WriteBatchWithIndex : public WriteBatchBase {
Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options, Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value, ReadCallback* callback); PinnableSlice* value, ReadCallback* callback);
void MultiGetFromBatchAndDB(DB* db, const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
bool sorted_input, ReadCallback* callback);
struct Rep; struct Rep;
std::unique_ptr<Rep> rep; std::unique_ptr<Rep> rep;
}; };

@ -281,6 +281,16 @@ std::vector<Status> TransactionBaseImpl::MultiGet(
return stat_list; return stat_list;
} }
void TransactionBaseImpl::MultiGet(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
bool sorted_input) {
write_batch_.MultiGetFromBatchAndDB(db_, read_options, column_family,
num_keys, keys, values, statuses,
sorted_input);
}
std::vector<Status> TransactionBaseImpl::MultiGetForUpdate( std::vector<Status> TransactionBaseImpl::MultiGetForUpdate(
const ReadOptions& read_options, const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_family, const std::vector<ColumnFamilyHandle*>& column_family,

@ -80,6 +80,7 @@ class TransactionBaseImpl : public Transaction {
exclusive, do_validate); exclusive, do_validate);
} }
using Transaction::MultiGet;
std::vector<Status> MultiGet( std::vector<Status> MultiGet(
const ReadOptions& options, const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family, const std::vector<ColumnFamilyHandle*>& column_family,
@ -94,6 +95,11 @@ class TransactionBaseImpl : public Transaction {
keys, values); keys, values);
} }
void MultiGet(const ReadOptions& options, ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys, PinnableSlice* values,
Status* statuses, bool sorted_input = false) override;
using Transaction::MultiGetForUpdate;
std::vector<Status> MultiGetForUpdate( std::vector<Status> MultiGetForUpdate(
const ReadOptions& options, const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family, const std::vector<ColumnFamilyHandle*>& column_family,

@ -2676,7 +2676,6 @@ TEST_P(TransactionTest, ColumnFamiliesTest) {
handles[0], handles[2]}; handles[0], handles[2]};
std::vector<Slice> multiget_keys = {"AAA", "AAAZZZ", "foo", "foo"}; std::vector<Slice> multiget_keys = {"AAA", "AAAZZZ", "foo", "foo"};
std::vector<std::string> values(4); std::vector<std::string> values(4);
std::vector<Status> results = txn->MultiGetForUpdate( std::vector<Status> results = txn->MultiGetForUpdate(
snapshot_read_options, multiget_cfh, multiget_keys, &values); snapshot_read_options, multiget_cfh, multiget_keys, &values);
ASSERT_OK(results[0]); ASSERT_OK(results[0]);
@ -2736,6 +2735,92 @@ TEST_P(TransactionTest, ColumnFamiliesTest) {
} }
} }
TEST_P(TransactionTest, MultiGetBatchedTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
TransactionOptions txn_options;
string value;
Status s;
ColumnFamilyHandle* cf;
ColumnFamilyOptions cf_options;
// Create a new column families
s = db->CreateColumnFamily(cf_options, "CF", &cf);
ASSERT_OK(s);
delete cf;
delete db;
db = nullptr;
// open DB with three column families
std::vector<ColumnFamilyDescriptor> column_families;
// have to open default column family
column_families.push_back(
ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
// open the new column families
cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
column_families.push_back(ColumnFamilyDescriptor("CF", cf_options));
std::vector<ColumnFamilyHandle*> handles;
options.merge_operator = MergeOperators::CreateStringAppendOperator();
s = TransactionDB::Open(options, txn_db_options, dbname, column_families,
&handles, &db);
assert(db != nullptr);
ASSERT_OK(s);
// Write some data to the db
WriteBatch batch;
batch.Put(handles[1], "aaa", "val1");
batch.Put(handles[1], "bbb", "val2");
batch.Put(handles[1], "ccc", "val3");
batch.Put(handles[1], "ddd", "foo");
batch.Put(handles[1], "eee", "val5");
batch.Put(handles[1], "fff", "val6");
batch.Merge(handles[1], "ggg", "foo");
s = db->Write(write_options, &batch);
ASSERT_OK(s);
Transaction* txn = db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
txn->SetSnapshot();
snapshot_read_options.snapshot = txn->GetSnapshot();
txn_options.set_snapshot = true;
// Write some data to the db
s = txn->Delete(handles[1], "bbb");
ASSERT_OK(s);
s = txn->Put(handles[1], "ccc", "val3_new");
ASSERT_OK(s);
s = txn->Merge(handles[1], "ddd", "bar");
ASSERT_OK(s);
std::vector<Slice> keys = {"aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg"};
std::vector<PinnableSlice> values(keys.size());
std::vector<Status> statuses(keys.size());
txn->MultiGet(snapshot_read_options, handles[1], keys.size(), keys.data(),
values.data(), statuses.data());
ASSERT_TRUE(statuses[0].ok());
ASSERT_EQ(values[0], "val1");
ASSERT_TRUE(statuses[1].IsNotFound());
ASSERT_TRUE(statuses[2].ok());
ASSERT_EQ(values[2], "val3_new");
ASSERT_TRUE(statuses[3].IsMergeInProgress());
ASSERT_TRUE(statuses[4].ok());
ASSERT_EQ(values[4], "val5");
ASSERT_TRUE(statuses[5].ok());
ASSERT_EQ(values[5], "val6");
ASSERT_TRUE(statuses[6].ok());
ASSERT_EQ(values[6], "foo");
delete txn;
for (auto handle : handles) {
delete handle;
}
}
TEST_P(TransactionTest, ColumnFamiliesTest2) { TEST_P(TransactionTest, ColumnFamiliesTest2) {
WriteOptions write_options; WriteOptions write_options;
ReadOptions read_options, snapshot_read_options; ReadOptions read_options, snapshot_read_options;

@ -922,6 +922,109 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(
return s; return s;
} }
void WriteBatchWithIndex::MultiGetFromBatchAndDB(
DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys, PinnableSlice* values,
Status* statuses, bool sorted_input) {
MultiGetFromBatchAndDB(db, read_options, column_family, num_keys, keys,
values, statuses, sorted_input, nullptr);
}
void WriteBatchWithIndex::MultiGetFromBatchAndDB(
DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys, PinnableSlice* values,
Status* statuses, bool sorted_input, ReadCallback* callback) {
const ImmutableDBOptions& immuable_db_options =
static_cast_with_check<DBImpl, DB>(db->GetRootDB())
->immutable_db_options();
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
// To hold merges from the write batch
autovector<std::pair<WriteBatchWithIndexInternal::Result, MergeContext>,
MultiGetContext::MAX_BATCH_SIZE>
merges;
// Since the lifetime of the WriteBatch is the same as that of the transaction
// we cannot pin it as otherwise the returned value will not be available
// after the transaction finishes.
for (size_t i = 0; i < num_keys; ++i) {
MergeContext merge_context;
PinnableSlice* pinnable_val = &values[i];
std::string& batch_value = *pinnable_val->GetSelf();
Status* s = &statuses[i];
WriteBatchWithIndexInternal::Result result =
WriteBatchWithIndexInternal::GetFromBatch(
immuable_db_options, this, column_family, keys[i], &merge_context,
&rep->comparator, &batch_value, rep->overwrite_key, s);
if (result == WriteBatchWithIndexInternal::Result::kFound) {
pinnable_val->PinSelf();
continue;
}
if (result == WriteBatchWithIndexInternal::Result::kDeleted) {
*s = Status::NotFound();
continue;
}
if (result == WriteBatchWithIndexInternal::Result::kError) {
continue;
}
if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
rep->overwrite_key == true) {
// Since we've overwritten keys, we do not know what other operations are
// in this batch for this key, so we cannot do a Merge to compute the
// result. Instead, we will simply return MergeInProgress.
*s = Status::MergeInProgress();
continue;
}
assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
result == WriteBatchWithIndexInternal::Result::kNotFound);
key_context.emplace_back(keys[i], &values[i], &statuses[i]);
merges.emplace_back(result, std::move(merge_context));
}
// Did not find key in batch OR could not resolve Merges. Try DB.
static_cast_with_check<DBImpl, DB>(db->GetRootDB())
->MultiGetImpl(read_options, column_family, key_context, sorted_input,
callback);
ColumnFamilyHandleImpl* cfh =
reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
const MergeOperator* merge_operator = cfh->cfd()->ioptions()->merge_operator;
for (auto iter = key_context.begin(); iter != key_context.end(); ++iter) {
KeyContext& key = *iter;
if (key.s->ok() || key.s->IsNotFound()) { // DB Get Succeeded
size_t index = iter - key_context.begin();
std::pair<WriteBatchWithIndexInternal::Result, MergeContext>&
merge_result = merges[index];
if (merge_result.first ==
WriteBatchWithIndexInternal::Result::kMergeInProgress) {
// Merge result from DB with merges in Batch
Statistics* statistics = immuable_db_options.statistics.get();
Env* env = immuable_db_options.env;
Logger* logger = immuable_db_options.info_log.get();
Slice* merge_data;
if (key.s->ok()) {
merge_data = iter->value;
} else { // Key not present in db (s.IsNotFound())
merge_data = nullptr;
}
if (merge_operator) {
*key.s = MergeHelper::TimedFullMerge(
merge_operator, *key.key, merge_data,
merge_result.second.GetOperands(), key.value->GetSelf(), logger,
statistics, env);
key.value->PinSelf();
} else {
*key.s =
Status::InvalidArgument("Options::merge_operator must be set");
}
}
}
}
}
void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); } void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); }
Status WriteBatchWithIndex::RollbackToSavePoint() { Status WriteBatchWithIndex::RollbackToSavePoint() {

Loading…
Cancel
Save