WriteUnPrepared: Add new variable write_batch_flush_threshold (#5633)

Summary:
Instead of reusing `TransactionOptions::max_write_batch_size` for determining when to flush a write batch for write unprepared, add a new variable called `write_batch_flush_threshold` for this use case instead.

Also add `TransactionDBOptions::default_write_batch_flush_threshold` which sets the default value if `TransactionOptions::write_batch_flush_threshold` is unspecified.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5633

Differential Revision: D16520364

Pulled By: lth

fbshipit-source-id: d75ae5a2141ce7708982d5069dc3f0b58d250e8c
main
Manuel Ung 6 years ago committed by Facebook Github Bot
parent 3617287e0e
commit 41df734830
  1. 10
      include/rocksdb/utilities/transaction_db.h
  2. 12
      utilities/transactions/transaction_test.cc
  3. 12
      utilities/transactions/write_unprepared_transaction_test.cc
  4. 27
      utilities/transactions/write_unprepared_txn.cc
  5. 4
      utilities/transactions/write_unprepared_txn.h

@ -101,6 +101,11 @@ struct TransactionDBOptions {
// ordering rather than concurrency control.
bool skip_concurrency_control = false;
// This option is only valid for write unprepared. If a write batch exceeds
// this threshold, then the transaction will implicitly flush the currently
// pending writes into the database. A value of 0 or less means no limit.
ssize_t default_write_batch_flush_threshold = 0;
private:
// 128 entries
size_t wp_snapshot_cache_bits = static_cast<size_t>(7);
@ -162,6 +167,11 @@ struct TransactionOptions {
// back/commit before new transactions start.
// Default: false
bool skip_concurrency_control = false;
// See TransactionDBOptions::default_write_batch_flush_threshold for
// description. If a negative value is specified, then the default value from
// TransactionDBOptions is used.
ssize_t write_batch_flush_threshold = -1;
};
// The per-write optimizations that do not involve transactions. TransactionDB

@ -5303,16 +5303,8 @@ TEST_P(TransactionTest, MemoryLimitTest) {
ASSERT_EQ(2, txn->GetNumPuts());
s = txn->Put(Slice("b"), Slice("...."));
auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db);
// For write unprepared, write batches exceeding max_write_batch_size will
// just flush to DB instead of returning a memory limit error.
if (pdb->GetTxnDBOptions().write_policy != WRITE_UNPREPARED) {
ASSERT_TRUE(s.IsMemoryLimit());
ASSERT_EQ(2, txn->GetNumPuts());
} else {
ASSERT_OK(s);
ASSERT_EQ(3, txn->GetNumPuts());
}
ASSERT_TRUE(s.IsMemoryLimit());
ASSERT_EQ(2, txn->GetNumPuts());
txn->Rollback();
delete txn;

@ -157,7 +157,7 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWriteStress) {
Transaction* txn;
TransactionOptions txn_options;
// batch_size of 1 causes writes to DB for every marker.
txn_options.max_write_batch_size = 1;
txn_options.write_batch_flush_threshold = 1;
ReadOptions read_options;
for (uint32_t i = 0; i < kNumIter; i++) {
@ -311,7 +311,7 @@ TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) {
// batch_size of 1 causes writes to DB for every marker.
for (size_t batch_size : {1, 1000000}) {
txn_options.max_write_batch_size = batch_size;
txn_options.write_batch_flush_threshold = batch_size;
for (bool empty : {true, false}) {
for (Action a : {UNPREPARED, ROLLBACK, COMMIT}) {
for (int num_batches = 1; num_batches < 10; num_batches++) {
@ -332,7 +332,7 @@ TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) {
txn->SetName("xid");
for (int i = 0; i < num_batches; i++) {
ASSERT_OK(txn->Put("k" + ToString(i), "value" + ToString(i)));
if (txn_options.max_write_batch_size == 1) {
if (txn_options.write_batch_flush_threshold == 1) {
ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i + 1);
} else {
ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0);
@ -398,7 +398,7 @@ TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) {
// batch_size of 1 causes writes to DB for every marker.
for (size_t batch_size : {1, 1000000}) {
txn_options.max_write_batch_size = batch_size;
txn_options.write_batch_flush_threshold = batch_size;
for (bool prepare : {false, true}) {
for (bool commit : {false, true}) {
ReOpen();
@ -408,7 +408,7 @@ TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) {
for (int i = 0; i < kNumKeys; i++) {
txn->Put("k" + ToString(i), "v" + ToString(i));
if (txn_options.max_write_batch_size == 1) {
if (txn_options.write_batch_flush_threshold == 1) {
ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i + 1);
} else {
ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0);
@ -457,7 +457,7 @@ TEST_P(WriteUnpreparedTransactionTest, MarkLogWithPrepSection) {
WriteOptions write_options;
TransactionOptions txn_options;
// batch_size of 1 causes writes to DB for every marker.
txn_options.max_write_batch_size = 1;
txn_options.write_batch_flush_threshold = 1;
const int kNumKeys = 10;
WriteOptions wopts;

@ -35,13 +35,12 @@ WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db,
wupt_db_(txn_db),
recovered_txn_(false),
largest_validated_seq_(0) {
max_write_batch_size_ = txn_options.max_write_batch_size;
// We set max bytes to zero so that we don't get a memory limit error.
// Instead of trying to keep write batch strictly under the size limit, we
// just flush to DB when the limit is exceeded in write unprepared, to avoid
// having retry logic. This also allows very big key-value pairs that exceed
// max bytes to succeed.
write_batch_.SetMaxBytes(0);
if (txn_options.write_batch_flush_threshold < 0) {
write_batch_flush_threshold_ =
txn_db_impl_->GetTxnDBOptions().default_write_batch_flush_threshold;
} else {
write_batch_flush_threshold_ = txn_options.write_batch_flush_threshold;
}
}
WriteUnpreparedTxn::~WriteUnpreparedTxn() {
@ -71,8 +70,13 @@ WriteUnpreparedTxn::~WriteUnpreparedTxn() {
void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) {
PessimisticTransaction::Initialize(txn_options);
max_write_batch_size_ = txn_options.max_write_batch_size;
write_batch_.SetMaxBytes(0);
if (txn_options.write_batch_flush_threshold < 0) {
write_batch_flush_threshold_ =
txn_db_impl_->GetTxnDBOptions().default_write_batch_flush_threshold;
} else {
write_batch_flush_threshold_ = txn_options.write_batch_flush_threshold;
}
unprep_seqs_.clear();
recovered_txn_ = false;
largest_validated_seq_ = 0;
@ -222,8 +226,9 @@ Status WriteUnpreparedTxn::RebuildFromWriteBatch(WriteBatch* wb) {
Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {
const bool kPrepared = true;
Status s;
if (max_write_batch_size_ != 0 &&
write_batch_.GetDataSize() > max_write_batch_size_) {
if (write_batch_flush_threshold_ > 0 &&
write_batch_.GetDataSize() >
static_cast<size_t>(write_batch_flush_threshold_)) {
assert(GetState() != PREPARED);
s = FlushWriteBatchToDB(!kPrepared);
}

@ -164,10 +164,10 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
Status HandleWrite(std::function<Status()> do_write);
// For write unprepared, we check on every writebatch append to see if
// max_write_batch_size_ has been exceeded, and then call
// write_batch_flush_threshold_ has been exceeded, and then call
// FlushWriteBatchToDB if so. This logic is encapsulated in
// MaybeFlushWriteBatchToDB.
size_t max_write_batch_size_;
ssize_t write_batch_flush_threshold_;
WriteUnpreparedTxnDB* wupt_db_;
// Ordered list of unprep_seq sequence numbers that we have already written

Loading…
Cancel
Save