diff --git a/utilities/transactions/write_unprepared_transaction_test.cc b/utilities/transactions/write_unprepared_transaction_test.cc index 51e860e63..7257c9880 100644 --- a/utilities/transactions/write_unprepared_transaction_test.cc +++ b/utilities/transactions/write_unprepared_transaction_test.cc @@ -621,6 +621,26 @@ TEST_P(WriteUnpreparedTransactionTest, IterateAndWrite) { } } +TEST_P(WriteUnpreparedTransactionTest, SavePoint) { + WriteOptions woptions; + TransactionOptions txn_options; + txn_options.write_batch_flush_threshold = 1; + + Transaction* txn = db->BeginTransaction(woptions, txn_options); + txn->SetSavePoint(); + ASSERT_OK(txn->Put("a", "a")); + ASSERT_OK(txn->Put("b", "b")); + ASSERT_OK(txn->Commit()); + + ReadOptions roptions; + std::string value; + ASSERT_OK(txn->Get(roptions, "a", &value)); + ASSERT_EQ(value, "a"); + ASSERT_OK(txn->Get(roptions, "b", &value)); + ASSERT_EQ(value, "b"); + delete txn; +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index 18ebc3700..d4e5abad5 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -407,10 +407,13 @@ Status WriteUnpreparedTxn::FlushWriteBatchWithSavePointToDB() { size_t prev_boundary = WriteBatchInternal::kHeader; const bool kPrepared = true; - for (size_t i = 0; i < unflushed_save_points_->size(); i++) { + for (size_t i = 0; i < unflushed_save_points_->size() + 1; i++) { + bool trailing_batch = i == unflushed_save_points_->size(); SavePointBatchHandler sp_handler(&write_batch_, *wupt_db_->GetCFHandleMap().get()); - size_t curr_boundary = (*unflushed_save_points_)[i]; + size_t curr_boundary = trailing_batch + ? wb.GetWriteBatch()->GetDataSize() + : (*unflushed_save_points_)[i]; // Construct the partial write batch up to the savepoint. // @@ -424,18 +427,22 @@ Status WriteUnpreparedTxn::FlushWriteBatchWithSavePointToDB() { return s; } - // Flush the write batch. - s = FlushWriteBatchToDBInternal(!kPrepared); - if (!s.ok()) { - return s; + if (write_batch_.GetWriteBatch()->Count() > 0) { + // Flush the write batch. + s = FlushWriteBatchToDBInternal(!kPrepared); + if (!s.ok()) { + return s; + } } - if (flushed_save_points_ == nullptr) { - flushed_save_points_.reset( - new autovector()); + if (!trailing_batch) { + if (flushed_save_points_ == nullptr) { + flushed_save_points_.reset( + new autovector()); + } + flushed_save_points_->emplace_back( + unprep_seqs_, new ManagedSnapshot(db_impl_, wupt_db_->GetSnapshot())); } - flushed_save_points_->emplace_back( - unprep_seqs_, new ManagedSnapshot(db_impl_, wupt_db_->GetSnapshot())); prev_boundary = curr_boundary; const bool kClear = true; @@ -736,7 +743,6 @@ Status WriteUnpreparedTxn::RollbackToSavePointInternal() { assert(flushed_save_points_->size() > 0); WriteUnpreparedTxn::SavePoint& top = flushed_save_points_->back(); - assert(top.unprep_seqs_.size() > 0); assert(save_points_ != nullptr && save_points_->size() > 0); const TransactionKeyMap& tracked_keys = save_points_->top().new_keys_;