From 229350ef48ac4ee3c696bda1a60ea157e98da63f Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Tue, 24 Aug 2021 15:39:31 -0700 Subject: [PATCH] Allow iterate refresh for secondary instance (#8700) Summary: Test plan make check Pull Request resolved: https://github.com/facebook/rocksdb/pull/8700 Reviewed By: zhichao-cao Differential Revision: D30523907 Pulled By: riversand963 fbshipit-source-id: 68928ab4dafb64ce80ab7bc69d83727a4713ab91 --- HISTORY.md | 3 +++ db/db_impl/db_impl_secondary.cc | 11 +++++--- db/db_impl/db_impl_secondary.h | 4 ++- db/db_secondary_test.cc | 45 +++++++++++++++++++++++++++++++++ 4 files changed, 59 insertions(+), 4 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index d8953b190..ba89774c5 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,8 @@ # Rocksdb Change Log ## Unreleased +### Bug Fixes +* Allow secondary instance to refresh iterator. Assign read seq after referencing SuperVersion. + ### New Features * RemoteCompaction's interface now includes `db_name`, `db_id`, `session_id`, which could help the user uniquely identify compaction job between db instances and sessions. diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index d9e0ad2f0..baf385c9b 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -415,7 +415,7 @@ Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options, return NewErrorIterator( Status::NotSupported("snapshot not supported in secondary mode")); } else { - auto snapshot = versions_->LastSequence(); + SequenceNumber snapshot(kMaxSequenceNumber); result = NewIteratorImpl(read_options, cfd, snapshot, read_callback); } return result; @@ -423,14 +423,19 @@ Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options, ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl( const ReadOptions& read_options, ColumnFamilyData* cfd, - SequenceNumber snapshot, ReadCallback* read_callback) { + SequenceNumber snapshot, ReadCallback* read_callback, + bool expose_blob_index, bool allow_refresh) { assert(nullptr != cfd); SuperVersion* super_version = cfd->GetReferencedSuperVersion(this); + assert(snapshot == kMaxSequenceNumber); + snapshot = versions_->LastSequence(); + assert(snapshot != kMaxSequenceNumber); auto db_iter = NewArenaWrappedDbIterator( env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options, super_version->current, snapshot, super_version->mutable_cf_options.max_sequential_skip_in_iterations, - super_version->version_number, read_callback); + super_version->version_number, read_callback, this, cfd, + expose_blob_index, read_options.snapshot ? false : allow_refresh); auto internal_iter = NewInternalIterator( db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(), db_iter->GetRangeDelAggregator(), snapshot, diff --git a/db/db_impl/db_impl_secondary.h b/db/db_impl/db_impl_secondary.h index eedb5100e..397d22abb 100644 --- a/db/db_impl/db_impl_secondary.h +++ b/db/db_impl/db_impl_secondary.h @@ -97,7 +97,9 @@ class DBImplSecondary : public DBImpl { ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& read_options, ColumnFamilyData* cfd, SequenceNumber snapshot, - ReadCallback* read_callback); + ReadCallback* read_callback, + bool expose_blob_index = false, + bool allow_refresh = true); Status NewIterators(const ReadOptions& options, const std::vector& column_families, diff --git a/db/db_secondary_test.cc b/db/db_secondary_test.cc index ee270f7aa..178844ff7 100644 --- a/db/db_secondary_test.cc +++ b/db/db_secondary_test.cc @@ -560,6 +560,51 @@ TEST_F(DBSecondaryTest, OpenAsSecondaryWALTailing) { verify_db_func("new_foo_value_1", "new_bar_value"); } +TEST_F(DBSecondaryTest, RefreshIterator) { + Options options; + options.env = env_; + Reopen(options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + + std::unique_ptr it(db_secondary_->NewIterator(ReadOptions())); + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); + + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + if (0 == i) { + it->Seek("foo"); + ASSERT_FALSE(it->Valid()); + ASSERT_OK(it->status()); + + ASSERT_OK(it->Refresh()); + + it->Seek("foo"); + ASSERT_OK(it->status()); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ("foo", it->key()); + ASSERT_EQ("foo_value0", it->value()); + } else { + it->Seek("foo"); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ("foo", it->key()); + ASSERT_EQ("foo_value" + std::to_string(i - 1), it->value()); + ASSERT_OK(it->status()); + + ASSERT_OK(it->Refresh()); + + it->Seek("foo"); + ASSERT_OK(it->status()); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ("foo", it->key()); + ASSERT_EQ("foo_value" + std::to_string(i), it->value()); + } + } +} + TEST_F(DBSecondaryTest, OpenWithNonExistColumnFamily) { Options options; options.env = env_;