From 7785f61132afdea73d2bfef5b9ccb79dfc7a0968 Mon Sep 17 00:00:00 2001 From: Manuel Ung Date: Wed, 14 Aug 2019 16:08:38 -0700 Subject: [PATCH] WriteUnPrepared: Fix bug in savepoints (#5703) Summary: Fix a bug in write unprepared savepoints. When flushing the write batch according to savepoint boundaries, we were forgetting to flush the last write batch after the last savepoint, meaning that some data was not written to DB. Also, add a small optimization where we avoid flushing empty batches. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5703 Differential Revision: D16811996 Pulled By: lth fbshipit-source-id: 600c7e0e520ad7a8fad32d77e11d932453e68e3f --- .../write_unprepared_transaction_test.cc | 20 +++++++++++++ .../transactions/write_unprepared_txn.cc | 30 +++++++++++-------- 2 files changed, 38 insertions(+), 12 deletions(-) diff --git a/utilities/transactions/write_unprepared_transaction_test.cc b/utilities/transactions/write_unprepared_transaction_test.cc index 51e860e63..7257c9880 100644 --- a/utilities/transactions/write_unprepared_transaction_test.cc +++ b/utilities/transactions/write_unprepared_transaction_test.cc @@ -621,6 +621,26 @@ TEST_P(WriteUnpreparedTransactionTest, IterateAndWrite) { } } +TEST_P(WriteUnpreparedTransactionTest, SavePoint) { + WriteOptions woptions; + TransactionOptions txn_options; + txn_options.write_batch_flush_threshold = 1; + + Transaction* txn = db->BeginTransaction(woptions, txn_options); + txn->SetSavePoint(); + ASSERT_OK(txn->Put("a", "a")); + ASSERT_OK(txn->Put("b", "b")); + ASSERT_OK(txn->Commit()); + + ReadOptions roptions; + std::string value; + ASSERT_OK(txn->Get(roptions, "a", &value)); + ASSERT_EQ(value, "a"); + ASSERT_OK(txn->Get(roptions, "b", &value)); + ASSERT_EQ(value, "b"); + delete txn; +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index 18ebc3700..d4e5abad5 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -407,10 +407,13 @@ Status WriteUnpreparedTxn::FlushWriteBatchWithSavePointToDB() { size_t prev_boundary = WriteBatchInternal::kHeader; const bool kPrepared = true; - for (size_t i = 0; i < unflushed_save_points_->size(); i++) { + for (size_t i = 0; i < unflushed_save_points_->size() + 1; i++) { + bool trailing_batch = i == unflushed_save_points_->size(); SavePointBatchHandler sp_handler(&write_batch_, *wupt_db_->GetCFHandleMap().get()); - size_t curr_boundary = (*unflushed_save_points_)[i]; + size_t curr_boundary = trailing_batch + ? wb.GetWriteBatch()->GetDataSize() + : (*unflushed_save_points_)[i]; // Construct the partial write batch up to the savepoint. // @@ -424,18 +427,22 @@ Status WriteUnpreparedTxn::FlushWriteBatchWithSavePointToDB() { return s; } - // Flush the write batch. - s = FlushWriteBatchToDBInternal(!kPrepared); - if (!s.ok()) { - return s; + if (write_batch_.GetWriteBatch()->Count() > 0) { + // Flush the write batch. + s = FlushWriteBatchToDBInternal(!kPrepared); + if (!s.ok()) { + return s; + } } - if (flushed_save_points_ == nullptr) { - flushed_save_points_.reset( - new autovector()); + if (!trailing_batch) { + if (flushed_save_points_ == nullptr) { + flushed_save_points_.reset( + new autovector()); + } + flushed_save_points_->emplace_back( + unprep_seqs_, new ManagedSnapshot(db_impl_, wupt_db_->GetSnapshot())); } - flushed_save_points_->emplace_back( - unprep_seqs_, new ManagedSnapshot(db_impl_, wupt_db_->GetSnapshot())); prev_boundary = curr_boundary; const bool kClear = true; @@ -736,7 +743,6 @@ Status WriteUnpreparedTxn::RollbackToSavePointInternal() { assert(flushed_save_points_->size() > 0); WriteUnpreparedTxn::SavePoint& top = flushed_save_points_->back(); - assert(top.unprep_seqs_.size() > 0); assert(save_points_ != nullptr && save_points_->size() > 0); const TransactionKeyMap& tracked_keys = save_points_->top().new_keys_;