From 41df7348308fe74fb92bbfa0e330d863524a381a Mon Sep 17 00:00:00 2001 From: Manuel Ung Date: Fri, 26 Jul 2019 12:52:07 -0700 Subject: [PATCH] WriteUnPrepared: Add new variable write_batch_flush_threshold (#5633) Summary: Instead of reusing `TransactionOptions::max_write_batch_size` for determining when to flush a write batch for write unprepared, add a new variable called `write_batch_flush_threshold` for this use case instead. Also add `TransactionDBOptions::default_write_batch_flush_threshold` which sets the default value if `TransactionOptions::write_batch_flush_threshold` is unspecified. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5633 Differential Revision: D16520364 Pulled By: lth fbshipit-source-id: d75ae5a2141ce7708982d5069dc3f0b58d250e8c --- include/rocksdb/utilities/transaction_db.h | 10 +++++++ utilities/transactions/transaction_test.cc | 12 ++------- .../write_unprepared_transaction_test.cc | 12 ++++----- .../transactions/write_unprepared_txn.cc | 27 +++++++++++-------- utilities/transactions/write_unprepared_txn.h | 4 +-- 5 files changed, 36 insertions(+), 29 deletions(-) diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index db32ba0bc..33826bab8 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -101,6 +101,11 @@ struct TransactionDBOptions { // ordering rather than concurrency control. bool skip_concurrency_control = false; + // This option is only valid for write unprepared. If a write batch exceeds + // this threshold, then the transaction will implicitly flush the currently + // pending writes into the database. A value of 0 or less means no limit. + ssize_t default_write_batch_flush_threshold = 0; + private: // 128 entries size_t wp_snapshot_cache_bits = static_cast(7); @@ -162,6 +167,11 @@ struct TransactionOptions { // back/commit before new transactions start. // Default: false bool skip_concurrency_control = false; + + // See TransactionDBOptions::default_write_batch_flush_threshold for + // description. If a negative value is specified, then the default value from + // TransactionDBOptions is used. + ssize_t write_batch_flush_threshold = -1; }; // The per-write optimizations that do not involve transactions. TransactionDB diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 534103a54..98548dd95 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -5303,16 +5303,8 @@ TEST_P(TransactionTest, MemoryLimitTest) { ASSERT_EQ(2, txn->GetNumPuts()); s = txn->Put(Slice("b"), Slice("....")); - auto pdb = reinterpret_cast(db); - // For write unprepared, write batches exceeding max_write_batch_size will - // just flush to DB instead of returning a memory limit error. - if (pdb->GetTxnDBOptions().write_policy != WRITE_UNPREPARED) { - ASSERT_TRUE(s.IsMemoryLimit()); - ASSERT_EQ(2, txn->GetNumPuts()); - } else { - ASSERT_OK(s); - ASSERT_EQ(3, txn->GetNumPuts()); - } + ASSERT_TRUE(s.IsMemoryLimit()); + ASSERT_EQ(2, txn->GetNumPuts()); txn->Rollback(); delete txn; diff --git a/utilities/transactions/write_unprepared_transaction_test.cc b/utilities/transactions/write_unprepared_transaction_test.cc index a2546229e..feaedea06 100644 --- a/utilities/transactions/write_unprepared_transaction_test.cc +++ b/utilities/transactions/write_unprepared_transaction_test.cc @@ -157,7 +157,7 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWriteStress) { Transaction* txn; TransactionOptions txn_options; // batch_size of 1 causes writes to DB for every marker. - txn_options.max_write_batch_size = 1; + txn_options.write_batch_flush_threshold = 1; ReadOptions read_options; for (uint32_t i = 0; i < kNumIter; i++) { @@ -311,7 +311,7 @@ TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) { // batch_size of 1 causes writes to DB for every marker. for (size_t batch_size : {1, 1000000}) { - txn_options.max_write_batch_size = batch_size; + txn_options.write_batch_flush_threshold = batch_size; for (bool empty : {true, false}) { for (Action a : {UNPREPARED, ROLLBACK, COMMIT}) { for (int num_batches = 1; num_batches < 10; num_batches++) { @@ -332,7 +332,7 @@ TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) { txn->SetName("xid"); for (int i = 0; i < num_batches; i++) { ASSERT_OK(txn->Put("k" + ToString(i), "value" + ToString(i))); - if (txn_options.max_write_batch_size == 1) { + if (txn_options.write_batch_flush_threshold == 1) { ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i + 1); } else { ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0); @@ -398,7 +398,7 @@ TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) { // batch_size of 1 causes writes to DB for every marker. for (size_t batch_size : {1, 1000000}) { - txn_options.max_write_batch_size = batch_size; + txn_options.write_batch_flush_threshold = batch_size; for (bool prepare : {false, true}) { for (bool commit : {false, true}) { ReOpen(); @@ -408,7 +408,7 @@ TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) { for (int i = 0; i < kNumKeys; i++) { txn->Put("k" + ToString(i), "v" + ToString(i)); - if (txn_options.max_write_batch_size == 1) { + if (txn_options.write_batch_flush_threshold == 1) { ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i + 1); } else { ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0); @@ -457,7 +457,7 @@ TEST_P(WriteUnpreparedTransactionTest, MarkLogWithPrepSection) { WriteOptions write_options; TransactionOptions txn_options; // batch_size of 1 causes writes to DB for every marker. - txn_options.max_write_batch_size = 1; + txn_options.write_batch_flush_threshold = 1; const int kNumKeys = 10; WriteOptions wopts; diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index 9265c3d4a..c677013aa 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -35,13 +35,12 @@ WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db, wupt_db_(txn_db), recovered_txn_(false), largest_validated_seq_(0) { - max_write_batch_size_ = txn_options.max_write_batch_size; - // We set max bytes to zero so that we don't get a memory limit error. - // Instead of trying to keep write batch strictly under the size limit, we - // just flush to DB when the limit is exceeded in write unprepared, to avoid - // having retry logic. This also allows very big key-value pairs that exceed - // max bytes to succeed. - write_batch_.SetMaxBytes(0); + if (txn_options.write_batch_flush_threshold < 0) { + write_batch_flush_threshold_ = + txn_db_impl_->GetTxnDBOptions().default_write_batch_flush_threshold; + } else { + write_batch_flush_threshold_ = txn_options.write_batch_flush_threshold; + } } WriteUnpreparedTxn::~WriteUnpreparedTxn() { @@ -71,8 +70,13 @@ WriteUnpreparedTxn::~WriteUnpreparedTxn() { void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) { PessimisticTransaction::Initialize(txn_options); - max_write_batch_size_ = txn_options.max_write_batch_size; - write_batch_.SetMaxBytes(0); + if (txn_options.write_batch_flush_threshold < 0) { + write_batch_flush_threshold_ = + txn_db_impl_->GetTxnDBOptions().default_write_batch_flush_threshold; + } else { + write_batch_flush_threshold_ = txn_options.write_batch_flush_threshold; + } + unprep_seqs_.clear(); recovered_txn_ = false; largest_validated_seq_ = 0; @@ -222,8 +226,9 @@ Status WriteUnpreparedTxn::RebuildFromWriteBatch(WriteBatch* wb) { Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() { const bool kPrepared = true; Status s; - if (max_write_batch_size_ != 0 && - write_batch_.GetDataSize() > max_write_batch_size_) { + if (write_batch_flush_threshold_ > 0 && + write_batch_.GetDataSize() > + static_cast(write_batch_flush_threshold_)) { assert(GetState() != PREPARED); s = FlushWriteBatchToDB(!kPrepared); } diff --git a/utilities/transactions/write_unprepared_txn.h b/utilities/transactions/write_unprepared_txn.h index d81c30217..feac749ee 100644 --- a/utilities/transactions/write_unprepared_txn.h +++ b/utilities/transactions/write_unprepared_txn.h @@ -164,10 +164,10 @@ class WriteUnpreparedTxn : public WritePreparedTxn { Status HandleWrite(std::function do_write); // For write unprepared, we check on every writebatch append to see if - // max_write_batch_size_ has been exceeded, and then call + // write_batch_flush_threshold_ has been exceeded, and then call // FlushWriteBatchToDB if so. This logic is encapsulated in // MaybeFlushWriteBatchToDB. - size_t max_write_batch_size_; + ssize_t write_batch_flush_threshold_; WriteUnpreparedTxnDB* wupt_db_; // Ordered list of unprep_seq sequence numbers that we have already written