diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 1fdd2bb82..97ce58a3e 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1583,19 +1583,32 @@ Status DBImpl::Get(const ReadOptions& read_options, return s; } +namespace { +class GetWithTimestampReadCallback : public ReadCallback { /* visibility check installed by GetImpl for timestamp-aware reads: a seq is visible iff seq <= max_visible_seq_ */ + public: + explicit GetWithTimestampReadCallback(SequenceNumber seq) + : ReadCallback(seq) {} + bool IsVisibleFullCheck(SequenceNumber seq) override { + return seq <= max_visible_seq_; + } +}; +} // namespace + Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, GetImplOptions& get_impl_options) { assert(get_impl_options.value != nullptr || get_impl_options.merge_operands != nullptr); -#ifndef NDEBUG assert(get_impl_options.column_family); - ColumnFamilyHandle* cf = get_impl_options.column_family; - const Comparator* const ucmp = cf->GetComparator(); + const Comparator* ucmp = get_impl_options.column_family->GetComparator(); assert(ucmp); - if (ucmp->timestamp_size() > 0) { + size_t ts_sz = ucmp->timestamp_size(); + GetWithTimestampReadCallback read_cb(0); // Will call Refresh + +#ifndef NDEBUG + if (ts_sz > 0) { assert(read_options.timestamp); - assert(read_options.timestamp->size() == ucmp->timestamp_size()); + assert(read_options.timestamp->size() == ts_sz); } else { assert(!read_options.timestamp); } @@ -1661,6 +1674,12 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, snapshot = get_impl_options.callback->max_visible_seq(); } } + // If timestamp is used, we use read callback to ensure <key, t, s> is returned + // only if t <= read_opts.timestamp and s <= snapshot. 
+ if (ts_sz > 0 && !get_impl_options.callback) { + read_cb.Refresh(snapshot); + get_impl_options.callback = &read_cb; /* NOTE(review): read_cb is a local of GetImpl but get_impl_options is the caller's struct (passed by reference); confirm this field is restored before return so no dangling pointer is left behind */ + } TEST_SYNC_POINT("DBImpl::GetImpl:3"); TEST_SYNC_POINT("DBImpl::GetImpl:4"); @@ -1678,9 +1697,6 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, bool skip_memtable = (read_options.read_tier == kPersistedTier && has_unpersisted_data_.load(std::memory_order_relaxed)); bool done = false; - const Comparator* comparator = - get_impl_options.column_family->GetComparator(); - size_t ts_sz = comparator->timestamp_size(); std::string* timestamp = ts_sz > 0 ? get_impl_options.timestamp : nullptr; if (!skip_memtable) { // Get value associated with key diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc index a9a06ac40..feec40b47 100644 --- a/db/db_with_timestamp_basic_test.cc +++ b/db/db_with_timestamp_basic_test.cc @@ -588,6 +588,324 @@ TEST_F(DBBasicTestWithTimestamp, CompactDeletionWithTimestampMarkerToBottom) { Close(); } +class DataVisibilityTest : public DBBasicTestWithTimestampBase { + public: + DataVisibilityTest() : DBBasicTestWithTimestampBase("data_visibility_test") {} +}; + +// Application specifies timestamp but not snapshot. +// reader writer +// ts'=90 +// ts=100 +// seq=10 +// seq'=11 +// write finishes +// GetImpl(ts,seq) +// It is OK to return <k, t1, s1> if ts>=t1 AND seq>=s1. 
If ts>=t1 but seq<s1, +// the key should not be returned. +TEST_F(DataVisibilityTest, PointLookupWithoutSnapshot1) { + Options options = CurrentOptions(); + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + DestroyAndReopen(options); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->LoadDependency({ + {"DBImpl::GetImpl:3", + "DataVisibilityTest::PointLookupWithoutSnapshot1:BeforePut"}, + {"DataVisibilityTest::PointLookupWithoutSnapshot1:AfterPut", + "DBImpl::GetImpl:4"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + port::Thread writer_thread([this]() { + std::string write_ts_str = Timestamp(1, 0); + Slice write_ts = write_ts_str; + WriteOptions write_opts; + write_opts.timestamp = &write_ts; + TEST_SYNC_POINT( + "DataVisibilityTest::PointLookupWithoutSnapshot1:BeforePut"); + Status s = db_->Put(write_opts, "foo", "value"); + ASSERT_OK(s); + TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithoutSnapshot1:AfterPut"); + }); + ReadOptions read_opts; + std::string read_ts_str = Timestamp(3, 0); + Slice read_ts = read_ts_str; + read_opts.timestamp = &read_ts; + std::string value; + Status s = db_->Get(read_opts, "foo", &value); + + writer_thread.join(); + ASSERT_TRUE(s.IsNotFound()); + Close(); +} + +// Application specifies timestamp but not snapshot. +// reader writer +// ts'=90 +// ts=100 +// seq=10 +// seq'=11 +// write finishes +// Flush +// GetImpl(ts,seq) +// It is OK to return <k, t1, s1> if ts>=t1 AND seq>=s1. 
If ts>=t1 but seq<s1, +// the key should not be returned. +TEST_F(DataVisibilityTest, PointLookupWithoutSnapshot2) { + Options options = CurrentOptions(); + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + DestroyAndReopen(options); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->LoadDependency({ + {"DBImpl::GetImpl:3", + "DataVisibilityTest::PointLookupWithoutSnapshot2:BeforePut"}, + {"DataVisibilityTest::PointLookupWithoutSnapshot2:AfterPut", + "DBImpl::GetImpl:4"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + port::Thread writer_thread([this]() { + std::string write_ts_str = Timestamp(1, 0); + Slice write_ts = write_ts_str; + WriteOptions write_opts; + write_opts.timestamp = &write_ts; + TEST_SYNC_POINT( + "DataVisibilityTest::PointLookupWithoutSnapshot2:BeforePut"); + Status s = db_->Put(write_opts, "foo", "value"); + ASSERT_OK(s); + ASSERT_OK(Flush()); + + write_ts_str = Timestamp(2, 0); + write_ts = write_ts_str; + write_opts.timestamp = &write_ts; + s = db_->Put(write_opts, "bar", "value"); + ASSERT_OK(s); + TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithoutSnapshot2:AfterPut"); + }); + ReadOptions read_opts; + std::string read_ts_str = Timestamp(3, 0); + Slice read_ts = read_ts_str; + read_opts.timestamp = &read_ts; + std::string value; + Status s = db_->Get(read_opts, "foo", &value); + writer_thread.join(); + ASSERT_TRUE(s.IsNotFound()); + Close(); +} + +// Application specifies both timestamp and snapshot. +// reader writer +// seq=10 +// ts'=90 +// ts=100 +// seq'=11 +// write finishes +// GetImpl(ts,seq) +// Since application specifies both timestamp and snapshot, application expects +// to see data that is visible in BOTH timestamp and sequence number. Therefore, +// <k, t1, s1> can be returned only if t1<=ts AND s1<=seq. 
+TEST_F(DataVisibilityTest, PointLookupWithSnapshot1) { + Options options = CurrentOptions(); + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + DestroyAndReopen(options); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->LoadDependency({ + {"DataVisibilityTest::PointLookupWithSnapshot1:AfterTakingSnap", + "DataVisibilityTest::PointLookupWithSnapshot1:BeforePut"}, + {"DataVisibilityTest::PointLookupWithSnapshot1:AfterPut", + "DBImpl::GetImpl:1"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + port::Thread writer_thread([this]() { + std::string write_ts_str = Timestamp(1, 0); + Slice write_ts = write_ts_str; + WriteOptions write_opts; + write_opts.timestamp = &write_ts; + TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithSnapshot1:BeforePut"); + Status s = db_->Put(write_opts, "foo", "value"); + TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithSnapshot1:AfterPut"); + ASSERT_OK(s); + }); + ReadOptions read_opts; + const Snapshot* snap = db_->GetSnapshot(); + TEST_SYNC_POINT( + "DataVisibilityTest::PointLookupWithSnapshot1:AfterTakingSnap"); + read_opts.snapshot = snap; + std::string read_ts_str = Timestamp(3, 0); + Slice read_ts = read_ts_str; + read_opts.timestamp = &read_ts; + std::string value; + Status s = db_->Get(read_opts, "foo", &value); + writer_thread.join(); + + ASSERT_TRUE(s.IsNotFound()); /* the Put is ordered after GetSnapshot(), so the snapshot read must not see it */ + + db_->ReleaseSnapshot(snap); + Close(); +} + +// Application specifies both timestamp and snapshot. +// reader writer +// seq=10 +// ts'=90 +// ts=100 +// seq'=11 +// write finishes +// Flush +// GetImpl(ts,seq) +// Since application specifies both timestamp and snapshot, application expects +// to see data that is visible in BOTH timestamp and sequence number. Therefore, +// <k, t1, s1> can be returned only if t1<=ts AND s1<=seq. 
+TEST_F(DataVisibilityTest, PointLookupWithSnapshot2) { + Options options = CurrentOptions(); + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + DestroyAndReopen(options); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->LoadDependency({ + {"DataVisibilityTest::PointLookupWithSnapshot2:AfterTakingSnap", + "DataVisibilityTest::PointLookupWithSnapshot2:BeforePut"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + port::Thread writer_thread([this]() { + std::string write_ts_str = Timestamp(1, 0); + Slice write_ts = write_ts_str; + WriteOptions write_opts; + write_opts.timestamp = &write_ts; + TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithSnapshot2:BeforePut"); + Status s = db_->Put(write_opts, "foo", "value1"); + ASSERT_OK(s); + ASSERT_OK(Flush()); + + write_ts_str = Timestamp(2, 0); + write_ts = write_ts_str; + write_opts.timestamp = &write_ts; + s = db_->Put(write_opts, "bar", "value2"); + ASSERT_OK(s); + }); + const Snapshot* snap = db_->GetSnapshot(); + TEST_SYNC_POINT( + "DataVisibilityTest::PointLookupWithSnapshot2:AfterTakingSnap"); + writer_thread.join(); + std::string read_ts_str = Timestamp(3, 0); + Slice read_ts = read_ts_str; + ReadOptions read_opts; + read_opts.snapshot = snap; + read_opts.timestamp = &read_ts; + std::string value; + Status s = db_->Get(read_opts, "foo", &value); + ASSERT_TRUE(s.IsNotFound()); + db_->ReleaseSnapshot(snap); + Close(); +} + +// Application specifies timestamp but not snapshot. +// reader writer +// ts'=90 +// ts=100 +// seq=10 +// seq'=11 +// write finishes +// scan(ts,seq) +// <k, t1, s1> can be seen in scan as long as ts>=t1 AND seq>=s1. 
If ts>=t1 but +// seq<s1, then the key should not be returned. +TEST_F(DataVisibilityTest, RangeScanWithoutSnapshot) { + Options options = CurrentOptions(); + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + DestroyAndReopen(options); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->LoadDependency({ + {"DBImpl::NewIterator:3", + "DataVisibilityTest::RangeScanWithoutSnapshot:BeforePut"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + port::Thread writer_thread([this]() { + WriteOptions write_opts; + TEST_SYNC_POINT("DataVisibilityTest::RangeScanWithoutSnapshot:BeforePut"); + for (int i = 0; i < 3; ++i) { + std::string write_ts_str = Timestamp(i + 1, 0); + Slice write_ts = write_ts_str; + write_opts.timestamp = &write_ts; + Status s = db_->Put(write_opts, "key" + std::to_string(i), + "value" + std::to_string(i)); + ASSERT_OK(s); + } + }); + std::string read_ts_str = Timestamp(10, 0); + Slice read_ts = read_ts_str; + ReadOptions read_opts; + read_opts.total_order_seek = true; + read_opts.timestamp = &read_ts; + Iterator* it = db_->NewIterator(read_opts); + ASSERT_NE(nullptr, it); + writer_thread.join(); + it->SeekToFirst(); + ASSERT_FALSE(it->Valid()); + delete it; + Close(); +} + +// Application specifies both timestamp and snapshot. +// reader writer +// seq=10 +// ts'=90 +// ts=100 seq'=11 +// write finishes +// scan(ts,seq) +// <k, t1, s1> can be seen by the scan only if t1<=ts AND s1<=seq. If t1<=ts +// but s1>seq, then the key should not be returned. 
+TEST_F(DataVisibilityTest, RangeScanWithSnapshot) { + Options options = CurrentOptions(); + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + DestroyAndReopen(options); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->LoadDependency({ + {"DataVisibilityTest::RangeScanWithSnapshot:AfterTakingSnapshot", + "DataVisibilityTest::RangeScanWithSnapshot:BeforePut"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + port::Thread writer_thread([this]() { + WriteOptions write_opts; + TEST_SYNC_POINT("DataVisibilityTest::RangeScanWithSnapshot:BeforePut"); + for (int i = 0; i < 3; ++i) { + std::string write_ts_str = Timestamp(i + 1, 0); + Slice write_ts = write_ts_str; + write_opts.timestamp = &write_ts; + Status s = db_->Put(write_opts, "key" + std::to_string(i), + "value" + std::to_string(i)); + ASSERT_OK(s); + } + }); + const Snapshot* snap = db_->GetSnapshot(); + TEST_SYNC_POINT( + "DataVisibilityTest::RangeScanWithSnapshot:AfterTakingSnapshot"); + + writer_thread.join(); + + std::string read_ts_str = Timestamp(10, 0); + Slice read_ts = read_ts_str; + ReadOptions read_opts; + read_opts.snapshot = snap; + read_opts.total_order_seek = true; + read_opts.timestamp = &read_ts; + Iterator* it = db_->NewIterator(read_opts); + ASSERT_NE(nullptr, it); + it->Seek("key0"); + ASSERT_FALSE(it->Valid()); /* all three Puts are ordered after GetSnapshot(), so the snapshot scan must see nothing */ + + delete it; + db_->ReleaseSnapshot(snap); + Close(); +} + class DBBasicTestWithTimestampCompressionSettings : public DBBasicTestWithTimestampBase, public testing::WithParamInterface< diff --git a/db/read_callback.h b/db/read_callback.h index fbef1dd0d..8989c755a 100644 --- a/db/read_callback.h +++ b/db/read_callback.h @@ -11,7 +11,7 @@ namespace ROCKSDB_NAMESPACE { class ReadCallback { public: - ReadCallback(SequenceNumber last_visible_seq) + explicit ReadCallback(SequenceNumber last_visible_seq) : max_visible_seq_(last_visible_seq) {} ReadCallback(SequenceNumber 
last_visible_seq, SequenceNumber min_uncommitted) : max_visible_seq_(last_visible_seq), min_uncommitted_(min_uncommitted) {}