WritePrepared Txn: Return NotSupported on iterator refresh

Summary:
A proper implementation of Iterator::Refresh() for WritePreparedTxnDB would require release and acquire another snapshot. Since MyRocks don't make use of Iterator::Refresh(), we just simply mark it as not supported.
Closes https://github.com/facebook/rocksdb/pull/3290

Differential Revision: D6599931

Pulled By: yiwu-arbug

fbshipit-source-id: 4e1632d967316431424f6e458254ecf9a97567cf
main
Yi Wu 7 years ago committed by Facebook Github Bot
parent 1563801bce
commit 06149429d9
  1. 6
      db/db_impl.cc
  2. 3
      db/db_impl.h
  3. 14
      db/db_iter.cc
  4. 7
      db/db_iter.h
  5. 6
      utilities/transactions/write_prepared_transaction_test.cc
  6. 10
      utilities/transactions/write_prepared_txn_db.cc

@ -1510,7 +1510,8 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
ColumnFamilyData* cfd, ColumnFamilyData* cfd,
SequenceNumber snapshot, SequenceNumber snapshot,
ReadCallback* read_callback, ReadCallback* read_callback,
bool allow_blob) { bool allow_blob,
bool allow_refresh) {
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
// Try to generate a DB iterator tree in continuous memory area to be // Try to generate a DB iterator tree in continuous memory area to be
@ -1559,7 +1560,8 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
env_, read_options, *cfd->ioptions(), snapshot, env_, read_options, *cfd->ioptions(), snapshot,
sv->mutable_cf_options.max_sequential_skip_in_iterations, sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback, sv->version_number, read_callback,
((read_options.snapshot != nullptr) ? nullptr : this), cfd, allow_blob); ((read_options.snapshot != nullptr) ? nullptr : this), cfd, allow_blob,
allow_refresh);
InternalIterator* internal_iter = InternalIterator* internal_iter =
NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),

@ -148,7 +148,8 @@ class DBImpl : public DB {
ColumnFamilyData* cfd, ColumnFamilyData* cfd,
SequenceNumber snapshot, SequenceNumber snapshot,
ReadCallback* read_callback, ReadCallback* read_callback,
bool allow_blob = false); bool allow_blob = false,
bool allow_refresh = true);
virtual const Snapshot* GetSnapshot() override; virtual const Snapshot* GetSnapshot() override;
virtual void ReleaseSnapshot(const Snapshot* snapshot) override; virtual void ReleaseSnapshot(const Snapshot* snapshot) override;

@ -1378,17 +1378,19 @@ void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options,
const SequenceNumber& sequence, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iteration, uint64_t max_sequential_skip_in_iteration,
uint64_t version_number, uint64_t version_number,
ReadCallback* read_callback, bool allow_blob) { ReadCallback* read_callback, bool allow_blob,
bool allow_refresh) {
auto mem = arena_.AllocateAligned(sizeof(DBIter)); auto mem = arena_.AllocateAligned(sizeof(DBIter));
db_iter_ = new (mem) db_iter_ = new (mem)
DBIter(env, read_options, cf_options, cf_options.user_comparator, nullptr, DBIter(env, read_options, cf_options, cf_options.user_comparator, nullptr,
sequence, true, max_sequential_skip_in_iteration, read_callback, sequence, true, max_sequential_skip_in_iteration, read_callback,
allow_blob); allow_blob);
sv_number_ = version_number; sv_number_ = version_number;
allow_refresh_ = allow_refresh;
} }
Status ArenaWrappedDBIter::Refresh() { Status ArenaWrappedDBIter::Refresh() {
if (cfd_ == nullptr || db_impl_ == nullptr) { if (cfd_ == nullptr || db_impl_ == nullptr || !allow_refresh_) {
return Status::NotSupported("Creating renew iterator is not allowed."); return Status::NotSupported("Creating renew iterator is not allowed.");
} }
assert(db_iter_ != nullptr); assert(db_iter_ != nullptr);
@ -1406,7 +1408,7 @@ Status ArenaWrappedDBIter::Refresh() {
SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex()); SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex());
Init(env, read_options_, *(cfd_->ioptions()), latest_seq, Init(env, read_options_, *(cfd_->ioptions()), latest_seq,
sv->mutable_cf_options.max_sequential_skip_in_iterations, sv->mutable_cf_options.max_sequential_skip_in_iterations,
cur_sv_number, read_callback_, allow_blob_); cur_sv_number, read_callback_, allow_blob_, allow_refresh_);
InternalIterator* internal_iter = db_impl_->NewInternalIterator( InternalIterator* internal_iter = db_impl_->NewInternalIterator(
read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator()); read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator());
@ -1423,12 +1425,12 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator(
const ImmutableCFOptions& cf_options, const SequenceNumber& sequence, const ImmutableCFOptions& cf_options, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number, uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd, ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
bool allow_blob) { bool allow_blob, bool allow_refresh) {
ArenaWrappedDBIter* iter = new ArenaWrappedDBIter(); ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
iter->Init(env, read_options, cf_options, sequence, iter->Init(env, read_options, cf_options, sequence,
max_sequential_skip_in_iterations, version_number, read_callback, max_sequential_skip_in_iterations, version_number, read_callback,
allow_blob); allow_blob, allow_refresh);
if (db_impl != nullptr && cfd != nullptr) { if (db_impl != nullptr && cfd != nullptr && allow_refresh) {
iter->StoreRefreshInfo(read_options, db_impl, cfd, read_callback, iter->StoreRefreshInfo(read_options, db_impl, cfd, read_callback,
allow_blob); allow_blob);
} }

@ -73,7 +73,7 @@ class ArenaWrappedDBIter : public Iterator {
const ImmutableCFOptions& cf_options, const ImmutableCFOptions& cf_options,
const SequenceNumber& sequence, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number, uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, bool allow_blob); ReadCallback* read_callback, bool allow_blob, bool allow_refresh);
void StoreRefreshInfo(const ReadOptions& read_options, DBImpl* db_impl, void StoreRefreshInfo(const ReadOptions& read_options, DBImpl* db_impl,
ColumnFamilyData* cfd, ReadCallback* read_callback, ColumnFamilyData* cfd, ReadCallback* read_callback,
@ -94,6 +94,7 @@ class ArenaWrappedDBIter : public Iterator {
ReadOptions read_options_; ReadOptions read_options_;
ReadCallback* read_callback_; ReadCallback* read_callback_;
bool allow_blob_ = false; bool allow_blob_ = false;
bool allow_refresh_ = true;
}; };
// Generate the arena wrapped iterator class. // Generate the arena wrapped iterator class.
@ -104,6 +105,6 @@ extern ArenaWrappedDBIter* NewArenaWrappedDbIterator(
const ImmutableCFOptions& cf_options, const SequenceNumber& sequence, const ImmutableCFOptions& cf_options, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number, uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl = nullptr, ReadCallback* read_callback, DBImpl* db_impl = nullptr,
ColumnFamilyData* cfd = nullptr, bool allow_blob = false); ColumnFamilyData* cfd = nullptr, bool allow_blob = false,
bool allow_refresh = true);
} // namespace rocksdb } // namespace rocksdb

@ -1737,6 +1737,12 @@ TEST_P(WritePreparedTransactionTest, Iterate) {
delete transaction; delete transaction;
} }
TEST_P(WritePreparedTransactionTest, IteratorRefreshNotSupported) {
Iterator* iter = db->NewIterator(ReadOptions());
ASSERT_TRUE(iter->Refresh().IsNotSupported());
delete iter;
}
// Test that updating the commit map will not affect the existing snapshots // Test that updating the commit map will not affect the existing snapshots
TEST_P(WritePreparedTransactionTest, AtomicCommit) { TEST_P(WritePreparedTransactionTest, AtomicCommit) {
for (bool skip_prepare : {true, false}) { for (bool skip_prepare : {true, false}) {

@ -110,6 +110,8 @@ static void CleanupWritePreparedTxnDBIterator(void* arg1, void* arg2) {
Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options, Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family) { ColumnFamilyHandle* column_family) {
constexpr bool ALLOW_BLOB = true;
constexpr bool ALLOW_REFRESH = true;
std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr; std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
SequenceNumber snapshot_seq = kMaxSequenceNumber; SequenceNumber snapshot_seq = kMaxSequenceNumber;
if (options.snapshot != nullptr) { if (options.snapshot != nullptr) {
@ -125,7 +127,8 @@ Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd(); auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
auto* state = new IteratorState(this, snapshot_seq, own_snapshot); auto* state = new IteratorState(this, snapshot_seq, own_snapshot);
auto* db_iter = auto* db_iter =
db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback); db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
!ALLOW_BLOB, !ALLOW_REFRESH);
db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr); db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
return db_iter; return db_iter;
} }
@ -134,6 +137,8 @@ Status WritePreparedTxnDB::NewIterators(
const ReadOptions& options, const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families, const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) { std::vector<Iterator*>* iterators) {
constexpr bool ALLOW_BLOB = true;
constexpr bool ALLOW_REFRESH = true;
std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr; std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
SequenceNumber snapshot_seq = kMaxSequenceNumber; SequenceNumber snapshot_seq = kMaxSequenceNumber;
if (options.snapshot != nullptr) { if (options.snapshot != nullptr) {
@ -151,7 +156,8 @@ Status WritePreparedTxnDB::NewIterators(
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd(); auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
auto* state = new IteratorState(this, snapshot_seq, own_snapshot); auto* state = new IteratorState(this, snapshot_seq, own_snapshot);
auto* db_iter = auto* db_iter =
db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback); db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
!ALLOW_BLOB, !ALLOW_REFRESH);
db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr); db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
iterators->push_back(db_iter); iterators->push_back(db_iter);
} }

Loading…
Cancel
Save