WriteUnPrepared: support iterating while writing to transaction (#5699)

Summary:
In MyRocks, there are cases where we write while iterating through keys. This currently breaks WBWIIterator, because if a write batch flushes during iteration, the delta iterator would point to invalid memory.

For now, fix this by disallowing write batch flushes while the transaction has active iterators. In the future, we will loop through all the iterators on a transaction and refresh them when a write batch is flushed.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5699

Differential Revision: D16794157

Pulled By: lth

fbshipit-source-id: 5d5bf70688bd68fe58e8a766475ae88fd1be3190
main
Manuel Ung 6 years ago committed by Facebook Github Bot
parent 90cd6c2bb1
commit 4c70cb7306
  1. 57
      utilities/transactions/write_unprepared_transaction_test.cc
  2. 22
      utilities/transactions/write_unprepared_txn.cc
  3. 13
      utilities/transactions/write_unprepared_txn.h

@ -564,6 +564,63 @@ TEST_P(WriteUnpreparedTransactionTest, NoSnapshotWrite) {
delete txn; delete txn;
} }
// Test whether writing to a transaction while iterating over it is supported.
//
// For each action (delete every key / overwrite every key with "b"), the test
// seeds the DB with 100 keys, opens a transaction iterator, mutates every key
// it visits through the same transaction, commits, and then verifies the
// committed state through a plain DB iterator.
TEST_P(WriteUnpreparedTransactionTest, IterateAndWrite) {
  WriteOptions woptions;
  TransactionOptions txn_options;
  // Smallest non-zero threshold; presumably this makes every write below a
  // candidate for flushing the write batch -- confirm against
  // TransactionOptions.
  txn_options.write_batch_flush_threshold = 1;

  enum Action { DO_DELETE, DO_UPDATE };

  for (Action a : {DO_DELETE, DO_UPDATE}) {
    // Seed keys "0".."99", each mapped to its own name.
    for (int i = 0; i < 100; i++) {
      ASSERT_OK(db->Put(woptions, ToString(i), ToString(i)));
    }

    Transaction* txn = db->BeginTransaction(woptions, txn_options);
    // write_batch_ now contains 1 key.
    ASSERT_OK(txn->Put("9", "a"));

    ReadOptions roptions;
    auto iter = txn->GetIterator(roptions);
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      ASSERT_OK(iter->status());
      // Key "9" was overwritten inside the transaction before the iterator
      // was created; every other key still carries its seeded value.
      if (iter->key() == "9") {
        ASSERT_EQ(iter->value().ToString(), "a");
      } else {
        ASSERT_EQ(iter->key().ToString(), iter->value().ToString());
      }

      // Write to the transaction while the iterator is still live.
      if (a == DO_DELETE) {
        ASSERT_OK(txn->Delete(iter->key()));
      } else {
        ASSERT_OK(txn->Put(iter->key(), "b"));
      }
    }
    // An iterator can stop being Valid() because of an error rather than
    // because it ran out of keys, so the loop ending is not proof of success.
    ASSERT_OK(iter->status());
    delete iter;

    ASSERT_OK(txn->Commit());

    iter = db->NewIterator(roptions);
    if (a == DO_DELETE) {
      // Check that db is empty.
      iter->SeekToFirst();
      ASSERT_FALSE(iter->Valid());
    } else {
      int keys = 0;
      // Check that all values are updated to b.
      for (iter->SeekToFirst(); iter->Valid(); iter->Next(), keys++) {
        ASSERT_OK(iter->status());
        ASSERT_EQ(iter->value().ToString(), "b");
      }
      ASSERT_EQ(keys, 100);
    }
    // Without this, a failed iterator would be indistinguishable from an
    // empty DB in the DO_DELETE branch above.
    ASSERT_OK(iter->status());
    delete iter;
    delete txn;
  }
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -93,13 +93,18 @@ void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) {
unflushed_save_points_.reset(nullptr); unflushed_save_points_.reset(nullptr);
recovered_txn_ = false; recovered_txn_ = false;
largest_validated_seq_ = 0; largest_validated_seq_ = 0;
assert(active_iterators_.empty());
active_iterators_.clear();
} }
Status WriteUnpreparedTxn::HandleWrite(std::function<Status()> do_write) { Status WriteUnpreparedTxn::HandleWrite(std::function<Status()> do_write) {
Status s = MaybeFlushWriteBatchToDB(); Status s;
if (active_iterators_.empty()) {
s = MaybeFlushWriteBatchToDB();
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
}
s = do_write(); s = do_write();
if (s.ok()) { if (s.ok()) {
if (snapshot_) { if (snapshot_) {
@ -688,6 +693,8 @@ void WriteUnpreparedTxn::Clear() {
unflushed_save_points_.reset(nullptr); unflushed_save_points_.reset(nullptr);
recovered_txn_ = false; recovered_txn_ = false;
largest_validated_seq_ = 0; largest_validated_seq_ = 0;
assert(active_iterators_.empty());
active_iterators_.clear();
TransactionBaseImpl::Clear(); TransactionBaseImpl::Clear();
} }
@ -862,6 +869,14 @@ Status WriteUnpreparedTxn::Get(const ReadOptions& options,
} }
} }
namespace {
// Cleanup callback attached (via Iterator::RegisterCleanup) to iterators
// returned by WriteUnpreparedTxn::GetIterator.
//
// arg1: the WriteUnpreparedTxn that created the iterator.
// arg2: the Iterator being cleaned up.
//
// Unregisters the iterator from the transaction's list of active iterators.
// Presumably invoked when the iterator is destroyed -- confirm against the
// RegisterCleanup contract.
void CleanupWriteUnpreparedWBWIIterator(void* arg1, void* arg2) {
  // The arguments were passed in as void*, so static_cast is the right tool
  // to recover the original pointer types.
  auto* txn = static_cast<WriteUnpreparedTxn*>(arg1);
  auto* iter = static_cast<Iterator*>(arg2);
  txn->RemoveActiveIterator(iter);
}
}  // anonymous namespace
Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options) { Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options) {
return GetIterator(options, wupt_db_->DefaultColumnFamily()); return GetIterator(options, wupt_db_->DefaultColumnFamily());
} }
@ -872,7 +887,10 @@ Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options,
Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this); Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this);
assert(db_iter); assert(db_iter);
return write_batch_.NewIteratorWithBase(column_family, db_iter); auto iter = write_batch_.NewIteratorWithBase(column_family, db_iter);
active_iterators_.push_back(iter);
iter->RegisterCleanup(CleanupWriteUnpreparedWBWIIterator, this, iter);
return iter;
} }
Status WriteUnpreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family, Status WriteUnpreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,

@ -160,6 +160,12 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
return last_log_number_; return last_log_number_;
} }
// Removes every occurrence of `iter` from active_iterators_. Does nothing if
// `iter` is not currently tracked.
void RemoveActiveIterator(Iterator* iter) {
  // std::remove compacts the surviving entries to the front and returns the
  // new logical end; erase then trims the leftover tail.
  auto new_logical_end =
      std::remove(active_iterators_.begin(), active_iterators_.end(), iter);
  active_iterators_.erase(new_logical_end, active_iterators_.end());
}
protected: protected:
void Initialize(const TransactionOptions& txn_options) override; void Initialize(const TransactionOptions& txn_options) override;
@ -302,6 +308,13 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
std::unique_ptr<autovector<WriteUnpreparedTxn::SavePoint>> std::unique_ptr<autovector<WriteUnpreparedTxn::SavePoint>>
flushed_save_points_; flushed_save_points_;
std::unique_ptr<autovector<size_t>> unflushed_save_points_; std::unique_ptr<autovector<size_t>> unflushed_save_points_;
// It is currently unsafe to flush a write batch while there are active
// iterators created from this transaction. This is because we use
// WriteBatchWithIndex to merge reads from the DB with reads from the write
// batch. If we flush the write batch, the delta iterator inside such an
// iterator may be left pointing to invalid memory.
std::vector<Iterator*> active_iterators_;
}; };
} // namespace rocksdb } // namespace rocksdb

Loading…
Cancel
Save