WritePrepared Txn: duplicate keys

Summary:
With WriteCommitted, when the write batch has duplicate keys, the txn db simply inserts them to the db with different seq numbers and let the db ignore/merge the duplicate values at the read time. With WritePrepared all the entries of the batch are inserted with the same seq number which prevents us from benefiting from this simple solution.

This patch applies a hackish solution to unblock the end-to-end testing. The hack is to be replaced with a proper solution soon. The patch simply detects the duplicate key insertions, and mark the previous one as obsolete. Then before writing to the db it rewrites the batch eliminating the obsolete keys. This would incur a memcpy cost. Furthermore handing duplicate merge would require to do FullMerge instead of simply ignoring the previous value, which is not handled by this patch.
Closes https://github.com/facebook/rocksdb/pull/2969

Differential Revision: D5976337

Pulled By: maysamyabandeh

fbshipit-source-id: 114e65b66f137d8454ff2d1d782b8c05da95f989
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent 1026e794a3
commit 4e3c3d8c6a
  1. 8
      include/rocksdb/utilities/write_batch_with_index.h
  2. 4
      include/rocksdb/write_batch.h
  3. 2
      utilities/transactions/pessimistic_transaction_db.cc
  4. 66
      utilities/transactions/write_prepared_transaction_test.cc
  5. 16
      utilities/transactions/write_prepared_txn.cc
  6. 80
      utilities/write_batch_with_index/write_batch_with_index.cc

@ -228,6 +228,14 @@ class WriteBatchWithIndex : public WriteBatchBase {
private:
friend class WritePreparedTxn;
// TODO(myabandeh): this is hackish, non-efficient solution to enable the e2e
// unit tests. Replace it with a proper solution. Collapse the WriteBatch to
// remove the duplicate keys. The index will not be updated after this.
// Returns false if collapse was not necessary
bool Collapse();
void DisableDuplicateMergeKeys() { allow_dup_merge_ = false; }
bool allow_dup_merge_ = true;
Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value, ReadCallback* callback);

@ -327,6 +327,10 @@ class WriteBatch : public WriteBatchBase {
private:
friend class WriteBatchInternal;
friend class LocalSavePoint;
// TODO(myabandeh): this is needed for a hack to collapse the write batch and
// remove duplicate keys. Remove it when the hack is replaced with a propper
// solution.
friend class WriteBatchWithIndex;
SavePoints* save_points_;
// When sending a WriteBatch through WriteImpl we might want to

@ -465,6 +465,8 @@ Status PessimisticTransactionDB::Write(const WriteOptions& opts,
// concurrent transactions.
Transaction* txn = BeginInternalTransaction(opts);
txn->DisableIndexing();
// TODO(myabandeh): indexing being disabled we need another machanism to
// detect duplicattes in the input patch
auto txn_impl =
static_cast_with_check<PessimisticTransaction, Transaction>(txn);

@ -1238,6 +1238,72 @@ TEST_P(WritePreparedTransactionTest, RollbackTest) {
}
}
// TODO(myabandeh): move it to transaction_test when it is extended to
// WROTE_PREPARED.
// Test that the transactional db can handle duplicate keys in the write batch
TEST_P(WritePreparedTransactionTest, DuplicateKeyTest) {
for (bool do_prepare : {true, false}) {
TransactionOptions txn_options;
WriteOptions write_options;
Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
auto s = txn0->SetName("xid");
ASSERT_OK(s);
s = txn0->Put(Slice("foo0"), Slice("bar0a"));
ASSERT_OK(s);
s = txn0->Put(Slice("foo0"), Slice("bar0b"));
ASSERT_OK(s);
s = txn0->Put(Slice("foo1"), Slice("bar1"));
ASSERT_OK(s);
s = txn0->Merge(Slice("foo2"), Slice("bar2a"));
ASSERT_OK(s);
// TODO(myabandeh): enable this after duplicatae merge keys are supported
// s = txn0->Merge(Slice("foo2"), Slice("bar2a"));
// ASSERT_OK(s);
s = txn0->Put(Slice("foo2"), Slice("bar2b"));
ASSERT_OK(s);
s = txn0->Put(Slice("foo3"), Slice("bar3"));
ASSERT_OK(s);
// TODO(myabandeh): enable this after duplicatae merge keys are supported
// s = txn0->Merge(Slice("foo3"), Slice("bar3"));
// ASSERT_OK(s);
s = txn0->Put(Slice("foo4"), Slice("bar4"));
ASSERT_OK(s);
s = txn0->Delete(Slice("foo4"));
ASSERT_OK(s);
s = txn0->SingleDelete(Slice("foo4"));
ASSERT_OK(s);
if (do_prepare) {
s = txn0->Prepare();
ASSERT_OK(s);
}
s = txn0->Commit();
ASSERT_OK(s);
if (!do_prepare) {
auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db);
pdb->UnregisterTransaction(txn0);
}
delete txn0;
ReadOptions ropt;
PinnableSlice pinnable_val;
s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
ASSERT_OK(s);
ASSERT_TRUE(pinnable_val == ("bar0b"));
s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
ASSERT_OK(s);
ASSERT_TRUE(pinnable_val == ("bar1"));
s = db->Get(ropt, db->DefaultColumnFamily(), "foo2", &pinnable_val);
ASSERT_OK(s);
ASSERT_TRUE(pinnable_val == ("bar2b"));
s = db->Get(ropt, db->DefaultColumnFamily(), "foo3", &pinnable_val);
ASSERT_OK(s);
ASSERT_TRUE(pinnable_val == ("bar3"));
s = db->Get(ropt, db->DefaultColumnFamily(), "foo4", &pinnable_val);
ASSERT_TRUE(s.IsNotFound());
}
}
} // namespace rocksdb
int main(int argc, char** argv) {

@ -27,6 +27,7 @@ WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db,
: PessimisticTransaction(txn_db, write_options, txn_options),
wpt_db_(txn_db) {
PessimisticTransaction::Initialize(txn_options);
GetWriteBatch()->DisableDuplicateMergeKeys();
}
Status WritePreparedTxn::Get(const ReadOptions& read_options,
@ -47,6 +48,11 @@ Status WritePreparedTxn::PrepareInternal() {
WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_);
const bool disable_memtable = true;
uint64_t seq_used = kMaxSequenceNumber;
bool collapsed = GetWriteBatch()->Collapse();
if (collapsed) {
ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
"Collapse overhead due to duplicate keys");
}
Status s =
db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, &log_number_, /*log ref*/ 0,
@ -59,11 +65,17 @@ Status WritePreparedTxn::PrepareInternal() {
}
Status WritePreparedTxn::CommitWithoutPrepareInternal() {
bool collapsed = GetWriteBatch()->Collapse();
if (collapsed) {
ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
"Collapse overhead due to duplicate keys");
}
return CommitBatchInternal(GetWriteBatch()->GetWriteBatch());
}
Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) {
// In the absense of Prepare markers, use Noop as a batch separator
// TODO(myabandeh): handle the duplicate keys in the batch
// In the absence of Prepare markers, use Noop as a batch separator
WriteBatchInternal::InsertNoop(batch);
const bool disable_memtable = true;
const uint64_t no_log_ref = 0;
@ -112,7 +124,7 @@ Status WritePreparedTxn::RollbackInternal() {
WriteBatch rollback_batch;
assert(GetId() != kMaxSequenceNumber);
assert(GetId() > 0);
// In the absense of Prepare markers, use Noop as a batch separator
// In the absence of Prepare markers, use Noop as a batch separator
WriteBatchInternal::InsertNoop(&rollback_batch);
// In WritePrepared, the txn is is the same as prepare seq
auto last_visible_txn = GetId() - 1;

@ -7,8 +7,8 @@
#include "rocksdb/utilities/write_batch_with_index.h"
#include <limits>
#include <memory>
#include <vector>
#include "db/column_family.h"
#include "db/db_impl.h"
@ -399,6 +399,7 @@ struct WriteBatchWithIndex::Rep {
WriteBatchEntrySkipList skip_list;
bool overwrite_key;
size_t last_entry_offset;
std::vector<size_t> obsolete_offsets;
// Remember current offset of internal write batch, which is used as
// the starting offset of the next record.
@ -450,6 +451,7 @@ bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId(
}
WriteBatchIndexEntry* non_const_entry =
const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry());
obsolete_offsets.push_back(non_const_entry->offset);
non_const_entry->offset = last_entry_offset;
return true;
}
@ -576,6 +578,66 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
bool WriteBatchWithIndex::Collapse() {
if (rep->obsolete_offsets.size() == 0) {
return false;
}
WriteBatch& write_batch = rep->write_batch;
assert(write_batch.Count() != 0);
size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch);
Slice input(write_batch.Data());
input.remove_prefix(offset);
std::string collapsed_buf;
collapsed_buf.resize(WriteBatchInternal::kHeader);
size_t count = 0;
Status s;
// Loop through all entries in the write batch and add keep them if they are
// not obsolete by a newere entry.
while (s.ok() && !input.empty()) {
Slice key, value, blob, xid;
uint32_t column_family_id = 0; // default
char tag = 0;
// set offset of current entry for call to AddNewEntry()
size_t last_entry_offset = input.data() - write_batch.Data().data();
s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key,
&value, &blob, &xid);
if (rep->obsolete_offsets.front() == last_entry_offset) {
rep->obsolete_offsets.erase(rep->obsolete_offsets.begin());
continue;
}
switch (tag) {
case kTypeColumnFamilyValue:
case kTypeValue:
case kTypeColumnFamilyDeletion:
case kTypeDeletion:
case kTypeColumnFamilySingleDeletion:
case kTypeSingleDeletion:
case kTypeColumnFamilyMerge:
case kTypeMerge:
count++;
break;
case kTypeLogData:
case kTypeBeginPrepareXID:
case kTypeEndPrepareXID:
case kTypeCommitXID:
case kTypeRollbackXID:
case kTypeNoop:
break;
default:
assert(0);
}
size_t entry_offset = input.data() - write_batch.Data().data();
const std::string& wb_data = write_batch.Data();
Slice entry_ptr = Slice(wb_data.data() + last_entry_offset,
entry_offset - last_entry_offset);
collapsed_buf.append(entry_ptr.data(), entry_ptr.size());
}
write_batch.rep_ = std::move(collapsed_buf);
WriteBatchInternal::SetCount(&write_batch, static_cast<int>(count));
return true;
}
WBWIIterator* WriteBatchWithIndex::NewIterator() {
return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch);
}
@ -689,7 +751,15 @@ Status WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
rep->SetLastEntryOffset();
auto s = rep->write_batch.Merge(column_family, key, value);
if (s.ok()) {
auto size_before = rep->obsolete_offsets.size();
rep->AddOrUpdateIndex(column_family, key);
auto size_after = rep->obsolete_offsets.size();
bool duplicate_key = size_before != size_after;
if (!allow_dup_merge_ && duplicate_key) {
assert(0);
return Status::NotSupported(
"Duplicate key with merge value is not supported yet");
}
}
return s;
}
@ -698,7 +768,15 @@ Status WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
rep->SetLastEntryOffset();
auto s = rep->write_batch.Merge(key, value);
if (s.ok()) {
auto size_before = rep->obsolete_offsets.size();
rep->AddOrUpdateIndex(key);
auto size_after = rep->obsolete_offsets.size();
bool duplicate_key = size_before != size_after;
if (!allow_dup_merge_ && duplicate_key) {
assert(0);
return Status::NotSupported(
"Duplicate key with merge value is not supported yet");
}
}
return s;
}

Loading…
Cancel
Save