From 06149429d91c7e766675c43ae41da695bbd2ced9 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Mon, 18 Dec 2017 22:13:08 -0800 Subject: [PATCH] WritePrepared Txn: Return NotSupported on iterator refresh Summary: A proper implementation of Iterator::Refresh() for WritePreparedTxnDB would require release and acquire another snapshot. Since MyRocks don't make use of Iterator::Refresh(), we just simply mark it as not supported. Closes https://github.com/facebook/rocksdb/pull/3290 Differential Revision: D6599931 Pulled By: yiwu-arbug fbshipit-source-id: 4e1632d967316431424f6e458254ecf9a97567cf --- db/db_impl.cc | 6 ++++-- db/db_impl.h | 3 ++- db/db_iter.cc | 14 ++++++++------ db/db_iter.h | 7 ++++--- .../write_prepared_transaction_test.cc | 6 ++++++ utilities/transactions/write_prepared_txn_db.cc | 10 ++++++++-- 6 files changed, 32 insertions(+), 14 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 900e6b053..6a13fbbc1 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1510,7 +1510,8 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options, ColumnFamilyData* cfd, SequenceNumber snapshot, ReadCallback* read_callback, - bool allow_blob) { + bool allow_blob, + bool allow_refresh) { SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); // Try to generate a DB iterator tree in continuous memory area to be @@ -1559,7 +1560,8 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options, env_, read_options, *cfd->ioptions(), snapshot, sv->mutable_cf_options.max_sequential_skip_in_iterations, sv->version_number, read_callback, - ((read_options.snapshot != nullptr) ? nullptr : this), cfd, allow_blob); + ((read_options.snapshot != nullptr) ? nullptr : this), cfd, allow_blob, + allow_refresh); InternalIterator* internal_iter = NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), diff --git a/db/db_impl.h b/db/db_impl.h index 140720946..3c64d2589 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -148,7 +148,8 @@ class DBImpl : public DB { ColumnFamilyData* cfd, SequenceNumber snapshot, ReadCallback* read_callback, - bool allow_blob = false); + bool allow_blob = false, + bool allow_refresh = true); virtual const Snapshot* GetSnapshot() override; virtual void ReleaseSnapshot(const Snapshot* snapshot) override; diff --git a/db/db_iter.cc b/db/db_iter.cc index f2b737536..6ce6010ce 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -1378,17 +1378,19 @@ void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iteration, uint64_t version_number, - ReadCallback* read_callback, bool allow_blob) { + ReadCallback* read_callback, bool allow_blob, + bool allow_refresh) { auto mem = arena_.AllocateAligned(sizeof(DBIter)); db_iter_ = new (mem) DBIter(env, read_options, cf_options, cf_options.user_comparator, nullptr, sequence, true, max_sequential_skip_in_iteration, read_callback, allow_blob); sv_number_ = version_number; + allow_refresh_ = allow_refresh; } Status ArenaWrappedDBIter::Refresh() { - if (cfd_ == nullptr || db_impl_ == nullptr) { + if (cfd_ == nullptr || db_impl_ == nullptr || !allow_refresh_) { return Status::NotSupported("Creating renew iterator is not allowed."); } assert(db_iter_ != nullptr); @@ -1406,7 +1408,7 @@ Status ArenaWrappedDBIter::Refresh() { SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex()); Init(env, read_options_, *(cfd_->ioptions()), latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations, - cur_sv_number, read_callback_, allow_blob_); + cur_sv_number, read_callback_, allow_blob_, allow_refresh_); InternalIterator* internal_iter = db_impl_->NewInternalIterator( read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator()); @@ -1423,12 +1425,12 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator( const ImmutableCFOptions& cf_options, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd, - bool allow_blob) { + bool allow_blob, bool allow_refresh) { ArenaWrappedDBIter* iter = new ArenaWrappedDBIter(); iter->Init(env, read_options, cf_options, sequence, max_sequential_skip_in_iterations, version_number, read_callback, - allow_blob); - if (db_impl != nullptr && cfd != nullptr) { + allow_blob, allow_refresh); + if (db_impl != nullptr && cfd != nullptr && allow_refresh) { iter->StoreRefreshInfo(read_options, db_impl, cfd, read_callback, allow_blob); } diff --git a/db/db_iter.h b/db/db_iter.h index 7047f5197..df7c1e1da 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -73,7 +73,7 @@ class ArenaWrappedDBIter : public Iterator { const ImmutableCFOptions& cf_options, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, - ReadCallback* read_callback, bool allow_blob); + ReadCallback* read_callback, bool allow_blob, bool allow_refresh); void StoreRefreshInfo(const ReadOptions& read_options, DBImpl* db_impl, ColumnFamilyData* cfd, ReadCallback* read_callback, @@ -94,6 +94,7 @@ class ArenaWrappedDBIter : public Iterator { ReadOptions read_options_; ReadCallback* read_callback_; bool allow_blob_ = false; + bool allow_refresh_ = true; }; // Generate the arena wrapped iterator class. @@ -104,6 +105,6 @@ extern ArenaWrappedDBIter* NewArenaWrappedDbIterator( const ImmutableCFOptions& cf_options, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, ReadCallback* read_callback, DBImpl* db_impl = nullptr, - ColumnFamilyData* cfd = nullptr, bool allow_blob = false); - + ColumnFamilyData* cfd = nullptr, bool allow_blob = false, + bool allow_refresh = true); } // namespace rocksdb diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 4c77162d5..ca1ec8da5 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -1737,6 +1737,12 @@ TEST_P(WritePreparedTransactionTest, Iterate) { delete transaction; } +TEST_P(WritePreparedTransactionTest, IteratorRefreshNotSupported) { + Iterator* iter = db->NewIterator(ReadOptions()); + ASSERT_TRUE(iter->Refresh().IsNotSupported()); + delete iter; +} + // Test that updating the commit map will not affect the existing snapshots TEST_P(WritePreparedTransactionTest, AtomicCommit) { for (bool skip_prepare : {true, false}) { diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index 54b86c1c8..5334a4543 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -110,6 +110,8 @@ static void CleanupWritePreparedTxnDBIterator(void* arg1, void* arg2) { Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options, ColumnFamilyHandle* column_family) { + constexpr bool ALLOW_BLOB = true; + constexpr bool ALLOW_REFRESH = true; std::shared_ptr own_snapshot = nullptr; SequenceNumber snapshot_seq = kMaxSequenceNumber; if (options.snapshot != nullptr) { @@ -125,7 +127,8 @@ Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options, auto* cfd = reinterpret_cast(column_family)->cfd(); auto* state = new IteratorState(this, snapshot_seq, own_snapshot); auto* db_iter = - db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback); + db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback, + !ALLOW_BLOB, !ALLOW_REFRESH); db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr); return db_iter; } @@ -134,6 +137,8 @@ Status WritePreparedTxnDB::NewIterators( const ReadOptions& options, const std::vector& column_families, std::vector* iterators) { + constexpr bool ALLOW_BLOB = true; + constexpr bool ALLOW_REFRESH = true; std::shared_ptr own_snapshot = nullptr; SequenceNumber snapshot_seq = kMaxSequenceNumber; if (options.snapshot != nullptr) { @@ -151,7 +156,8 @@ Status WritePreparedTxnDB::NewIterators( auto* cfd = reinterpret_cast(column_family)->cfd(); auto* state = new IteratorState(this, snapshot_seq, own_snapshot); auto* db_iter = - db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback); + db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback, + !ALLOW_BLOB, !ALLOW_REFRESH); db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr); iterators->push_back(db_iter); }