From d758273cebb320c3a188c3dc7ac4293698fd1c5b Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Fri, 14 Aug 2020 19:19:36 -0700 Subject: [PATCH] Get() with timestamp should respect snapshot (#7227) Summary: If user-defined timestamp is enabled, current implementation can expose newer data to queries even if an older sequence number is specified via read_options.snapshot. This PR makes Get() respect sequence-number-based snapshot. Solution is simple. Besides using to search the index for the key, we also verify that the candidate result's seq is smaller than or equal to seq. This requires passing a seq via `GetContext`, which results in the majority of code change caused by this PR. Also added a few unit tests to demonstrate standard visibility during point lookup and range scan when timestamp and snapshot are both present. Test plan (devserver): ``` make check $./db_bench --benchmarks=fillseq,readrandom -cache_size=$[64*1024*1024] ``` Result this PR: readrandom : 4.827 micros/op 207180 ops/sec; 22.9 MB/s (1000000 of 1000000 found) master: readrandom : 4.936 micros/op 202610 ops/sec; 22.4 MB/s (1000000 of 1000000 found) Pull Request resolved: https://github.com/facebook/rocksdb/pull/7227 Reviewed By: ltamasi Differential Revision: D23015242 Pulled By: riversand963 fbshipit-source-id: ea7b85a728654553ba357d2e6a207b5e40f7376a --- db/db_impl/db_impl.cc | 32 ++- db/db_with_timestamp_basic_test.cc | 318 +++++++++++++++++++++++++++++ db/read_callback.h | 2 +- 3 files changed, 343 insertions(+), 9 deletions(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 1fdd2bb82..97ce58a3e 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1583,19 +1583,32 @@ Status DBImpl::Get(const ReadOptions& read_options, return s; } +namespace { +class GetWithTimestampReadCallback : public ReadCallback { + public: + explicit GetWithTimestampReadCallback(SequenceNumber seq) + : ReadCallback(seq) {} + bool IsVisibleFullCheck(SequenceNumber seq) override { + return seq <= max_visible_seq_; + } +}; +} // namespace + Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, GetImplOptions& get_impl_options) { assert(get_impl_options.value != nullptr || get_impl_options.merge_operands != nullptr); -#ifndef NDEBUG assert(get_impl_options.column_family); - ColumnFamilyHandle* cf = get_impl_options.column_family; - const Comparator* const ucmp = cf->GetComparator(); + const Comparator* ucmp = get_impl_options.column_family->GetComparator(); assert(ucmp); - if (ucmp->timestamp_size() > 0) { + size_t ts_sz = ucmp->timestamp_size(); + GetWithTimestampReadCallback read_cb(0); // Will call Refresh + +#ifndef NDEBUG + if (ts_sz > 0) { assert(read_options.timestamp); - assert(read_options.timestamp->size() == ucmp->timestamp_size()); + assert(read_options.timestamp->size() == ts_sz); } else { assert(!read_options.timestamp); } @@ -1661,6 +1674,12 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, snapshot = get_impl_options.callback->max_visible_seq(); } } + // If timestamp is used, we use read callback to ensure is returned + // only if t <= read_opts.timestamp and s <= snapshot. + if (ts_sz > 0 && !get_impl_options.callback) { + read_cb.Refresh(snapshot); + get_impl_options.callback = &read_cb; + } TEST_SYNC_POINT("DBImpl::GetImpl:3"); TEST_SYNC_POINT("DBImpl::GetImpl:4"); @@ -1678,9 +1697,6 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, bool skip_memtable = (read_options.read_tier == kPersistedTier && has_unpersisted_data_.load(std::memory_order_relaxed)); bool done = false; - const Comparator* comparator = - get_impl_options.column_family->GetComparator(); - size_t ts_sz = comparator->timestamp_size(); std::string* timestamp = ts_sz > 0 ? get_impl_options.timestamp : nullptr; if (!skip_memtable) { // Get value associated with key diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc index a9a06ac40..feec40b47 100644 --- a/db/db_with_timestamp_basic_test.cc +++ b/db/db_with_timestamp_basic_test.cc @@ -588,6 +588,324 @@ TEST_F(DBBasicTestWithTimestamp, CompactDeletionWithTimestampMarkerToBottom) { Close(); } +class DataVisibilityTest : public DBBasicTestWithTimestampBase { + public: + DataVisibilityTest() : DBBasicTestWithTimestampBase("data_visibility_test") {} +}; + +// Application specifies timestamp but not snapshot. +// reader writer +// ts'=90 +// ts=100 +// seq=10 +// seq'=11 +// write finishes +// GetImpl(ts,seq) +// It is OK to return if ts>=t1 AND seq>=s1. If ts>=1t1 but seqDisableProcessing(); + SyncPoint::GetInstance()->LoadDependency({ + {"DBImpl::GetImpl:3", + "DataVisibilityTest::PointLookupWithoutSnapshot1:BeforePut"}, + {"DataVisibilityTest::PointLookupWithoutSnapshot1:AfterPut", + "DBImpl::GetImpl:4"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + port::Thread writer_thread([this]() { + std::string write_ts_str = Timestamp(1, 0); + Slice write_ts = write_ts_str; + WriteOptions write_opts; + write_opts.timestamp = &write_ts; + TEST_SYNC_POINT( + "DataVisibilityTest::PointLookupWithoutSnapshot1:BeforePut"); + Status s = db_->Put(write_opts, "foo", "value"); + ASSERT_OK(s); + TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithoutSnapshot1:AfterPut"); + }); + ReadOptions read_opts; + std::string read_ts_str = Timestamp(3, 0); + Slice read_ts = read_ts_str; + read_opts.timestamp = &read_ts; + std::string value; + Status s = db_->Get(read_opts, "foo", &value); + + writer_thread.join(); + ASSERT_TRUE(s.IsNotFound()); + Close(); +} + +// Application specifies timestamp but not snapshot. +// reader writer +// ts'=90 +// ts=100 +// seq=10 +// seq'=11 +// write finishes +// Flush +// GetImpl(ts,seq) +// It is OK to return if ts>=t1 AND seq>=s1. If ts>=t1 but seqDisableProcessing(); + SyncPoint::GetInstance()->LoadDependency({ + {"DBImpl::GetImpl:3", + "DataVisibilityTest::PointLookupWithoutSnapshot2:BeforePut"}, + {"DataVisibilityTest::PointLookupWithoutSnapshot2:AfterPut", + "DBImpl::GetImpl:4"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + port::Thread writer_thread([this]() { + std::string write_ts_str = Timestamp(1, 0); + Slice write_ts = write_ts_str; + WriteOptions write_opts; + write_opts.timestamp = &write_ts; + TEST_SYNC_POINT( + "DataVisibilityTest::PointLookupWithoutSnapshot2:BeforePut"); + Status s = db_->Put(write_opts, "foo", "value"); + ASSERT_OK(s); + ASSERT_OK(Flush()); + + write_ts_str = Timestamp(2, 0); + write_ts = write_ts_str; + write_opts.timestamp = &write_ts; + s = db_->Put(write_opts, "bar", "value"); + ASSERT_OK(s); + TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithoutSnapshot2:AfterPut"); + }); + ReadOptions read_opts; + std::string read_ts_str = Timestamp(3, 0); + Slice read_ts = read_ts_str; + read_opts.timestamp = &read_ts; + std::string value; + Status s = db_->Get(read_opts, "foo", &value); + writer_thread.join(); + ASSERT_TRUE(s.IsNotFound()); + Close(); +} + +// Application specifies both timestamp and snapshot. +// reader writer +// seq=10 +// ts'=90 +// ts=100 +// seq'=11 +// write finishes +// GetImpl(ts,seq) +// Since application specifies both timestamp and snapshot, application expects +// to see data that visible in BOTH timestamp and sequence number. Therefore, +// can be returned only if t1<=ts AND s1<=seq. +TEST_F(DataVisibilityTest, PointLookupWithSnapshot1) { + Options options = CurrentOptions(); + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + DestroyAndReopen(options); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->LoadDependency({ + {"DataVisibilityTest::PointLookupWithSnapshot1:AfterTakingSnap", + "DataVisibilityTest::PointLookupWithSnapshot1:BeforePut"}, + {"DataVisibilityTest::PointLookupWithSnapshot1:AfterPut", + "DBImpl::GetImpl:1"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + port::Thread writer_thread([this]() { + std::string write_ts_str = Timestamp(1, 0); + Slice write_ts = write_ts_str; + WriteOptions write_opts; + write_opts.timestamp = &write_ts; + TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithSnapshot1:BeforePut"); + Status s = db_->Put(write_opts, "foo", "value"); + TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithSnapshot1:AfterPut"); + ASSERT_OK(s); + }); + ReadOptions read_opts; + const Snapshot* snap = db_->GetSnapshot(); + TEST_SYNC_POINT( + "DataVisibilityTest::PointLookupWithSnapshot1:AfterTakingSnap"); + read_opts.snapshot = snap; + std::string read_ts_str = Timestamp(3, 0); + Slice read_ts = read_ts_str; + read_opts.timestamp = &read_ts; + std::string value; + Status s = db_->Get(read_opts, "foo", &value); + writer_thread.join(); + + ASSERT_TRUE(s.IsNotFound()); + + db_->ReleaseSnapshot(snap); + Close(); +} + +// Application specifies both timestamp and snapshot. +// reader writer +// seq=10 +// ts'=90 +// ts=100 +// seq'=11 +// write finishes +// Flush +// GetImpl(ts,seq) +// Since application specifies both timestamp and snapshot, application expects +// to see data that visible in BOTH timestamp and sequence number. Therefore, +// can be returned only if t1<=ts AND s1<=seq. +TEST_F(DataVisibilityTest, PointLookupWithSnapshot2) { + Options options = CurrentOptions(); + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + DestroyAndReopen(options); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->LoadDependency({ + {"DataVisibilityTest::PointLookupWithSnapshot2:AfterTakingSnap", + "DataVisibilityTest::PointLookupWithSnapshot2:BeforePut"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + port::Thread writer_thread([this]() { + std::string write_ts_str = Timestamp(1, 0); + Slice write_ts = write_ts_str; + WriteOptions write_opts; + write_opts.timestamp = &write_ts; + TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithSnapshot2:BeforePut"); + Status s = db_->Put(write_opts, "foo", "value1"); + ASSERT_OK(s); + ASSERT_OK(Flush()); + + write_ts_str = Timestamp(2, 0); + write_ts = write_ts_str; + write_opts.timestamp = &write_ts; + s = db_->Put(write_opts, "bar", "value2"); + ASSERT_OK(s); + }); + const Snapshot* snap = db_->GetSnapshot(); + TEST_SYNC_POINT( + "DataVisibilityTest::PointLookupWithSnapshot2:AfterTakingSnap"); + writer_thread.join(); + std::string read_ts_str = Timestamp(3, 0); + Slice read_ts = read_ts_str; + ReadOptions read_opts; + read_opts.snapshot = snap; + read_opts.timestamp = &read_ts; + std::string value; + Status s = db_->Get(read_opts, "foo", &value); + ASSERT_TRUE(s.IsNotFound()); + db_->ReleaseSnapshot(snap); + Close(); +} + +// Application specifies timestamp but not snapshot. +// reader writer +// ts'=90 +// ts=100 +// seq=10 +// seq'=11 +// write finishes +// scan(ts,seq) +// can be seen in scan as long as ts>=t1 AND seq>=s1. If ts>=t1 but +// seqDisableProcessing(); + SyncPoint::GetInstance()->LoadDependency({ + {"DBImpl::NewIterator:3", + "DataVisibilityTest::RangeScanWithoutSnapshot:BeforePut"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + port::Thread writer_thread([this]() { + WriteOptions write_opts; + TEST_SYNC_POINT("DataVisibilityTest::RangeScanWithoutSnapshot:BeforePut"); + for (int i = 0; i < 3; ++i) { + std::string write_ts_str = Timestamp(i + 1, 0); + Slice write_ts = write_ts_str; + write_opts.timestamp = &write_ts; + Status s = db_->Put(write_opts, "key" + std::to_string(i), + "value" + std::to_string(i)); + ASSERT_OK(s); + } + }); + std::string read_ts_str = Timestamp(10, 0); + Slice read_ts = read_ts_str; + ReadOptions read_opts; + read_opts.total_order_seek = true; + read_opts.timestamp = &read_ts; + Iterator* it = db_->NewIterator(read_opts); + ASSERT_NE(nullptr, it); + writer_thread.join(); + it->SeekToFirst(); + ASSERT_FALSE(it->Valid()); + delete it; + Close(); +} + +// Application specifies both timestamp and snapshot. +// reader writer +// seq=10 +// ts'=90 +// ts=100 seq'=11 +// write finishes +// scan(ts,seq) +// can be seen by the scan only if t1<=ts AND s1<=seq. If t1<=ts +// but s1>seq, then the key should not be returned. +TEST_F(DataVisibilityTest, RangeScanWithSnapshot) { + Options options = CurrentOptions(); + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + DestroyAndReopen(options); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->LoadDependency({ + {"DataVisibilityTest::RangeScanWithSnapshot:AfterTakingSnapshot", + "DataVisibilityTest::RangeScanWithSnapshot:BeforePut"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + port::Thread writer_thread([this]() { + WriteOptions write_opts; + TEST_SYNC_POINT("DataVisibilityTest::RangeScanWithSnapshot:BeforePut"); + for (int i = 0; i < 3; ++i) { + std::string write_ts_str = Timestamp(i + 1, 0); + Slice write_ts = write_ts_str; + write_opts.timestamp = &write_ts; + Status s = db_->Put(write_opts, "key" + std::to_string(i), + "value" + std::to_string(i)); + ASSERT_OK(s); + } + }); + const Snapshot* snap = db_->GetSnapshot(); + TEST_SYNC_POINT( + "DataVisibilityTest::RangeScanWithSnapshot:AfterTakingSnapshot"); + + writer_thread.join(); + + std::string read_ts_str = Timestamp(10, 0); + Slice read_ts = read_ts_str; + ReadOptions read_opts; + read_opts.snapshot = snap; + read_opts.total_order_seek = true; + read_opts.timestamp = &read_ts; + Iterator* it = db_->NewIterator(read_opts); + ASSERT_NE(nullptr, it); + it->Seek("key0"); + ASSERT_FALSE(it->Valid()); + + delete it; + db_->ReleaseSnapshot(snap); + Close(); +} + class DBBasicTestWithTimestampCompressionSettings : public DBBasicTestWithTimestampBase, public testing::WithParamInterface< diff --git a/db/read_callback.h b/db/read_callback.h index fbef1dd0d..8989c755a 100644 --- a/db/read_callback.h +++ b/db/read_callback.h @@ -11,7 +11,7 @@ namespace ROCKSDB_NAMESPACE { class ReadCallback { public: - ReadCallback(SequenceNumber last_visible_seq) + explicit ReadCallback(SequenceNumber last_visible_seq) : max_visible_seq_(last_visible_seq) {} ReadCallback(SequenceNumber last_visible_seq, SequenceNumber min_uncommitted) : max_visible_seq_(last_visible_seq), min_uncommitted_(min_uncommitted) {}