// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). // // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/db_with_timestamp_test_util.h" #include "port/stack_trace.h" #include "rocksdb/perf_context.h" #include "rocksdb/utilities/debug.h" #include "table/block_based/block_based_table_reader.h" #include "table/block_based/block_builder.h" #if !defined(ROCKSDB_LITE) #include "test_util/sync_point.h" #endif #include "test_util/testutil.h" #include "utilities/fault_injection_env.h" #include "utilities/merge_operators/string_append/stringappend2.h" namespace ROCKSDB_NAMESPACE { class DBBasicTestWithTimestamp : public DBBasicTestWithTimestampBase { public: DBBasicTestWithTimestamp() : DBBasicTestWithTimestampBase("db_basic_test_with_timestamp") {} }; TEST_F(DBBasicTestWithTimestamp, SanityChecks) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; options.avoid_flush_during_shutdown = true; options.merge_operator = MergeOperators::CreateStringAppendTESTOperator(); DestroyAndReopen(options); Options options1 = CurrentOptions(); options1.env = env_; options1.comparator = test::BytewiseComparatorWithU64TsWrapper(); options1.merge_operator = MergeOperators::CreateStringAppendTESTOperator(); assert(options1.comparator && options1.comparator->timestamp_size() == sizeof(uint64_t)); ColumnFamilyHandle* handle = nullptr; Status s = db_->CreateColumnFamily(options1, "data", &handle); ASSERT_OK(s); std::string dummy_ts(sizeof(uint64_t), '\0'); // Perform timestamp operations on default cf. ASSERT_TRUE( db_->Put(WriteOptions(), "key", dummy_ts, "value").IsInvalidArgument()); ASSERT_TRUE(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), "key", dummy_ts, "value") .IsInvalidArgument()); ASSERT_TRUE(db_->Delete(WriteOptions(), "key", dummy_ts).IsInvalidArgument()); ASSERT_TRUE( db_->SingleDelete(WriteOptions(), "key", dummy_ts).IsInvalidArgument()); ASSERT_TRUE(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "begin_key", "end_key", dummy_ts) .IsInvalidArgument()); // Perform non-timestamp operations on "data" cf. ASSERT_TRUE( db_->Put(WriteOptions(), handle, "key", "value").IsInvalidArgument()); ASSERT_TRUE(db_->Delete(WriteOptions(), handle, "key").IsInvalidArgument()); ASSERT_TRUE( db_->SingleDelete(WriteOptions(), handle, "key").IsInvalidArgument()); ASSERT_TRUE( db_->Merge(WriteOptions(), handle, "key", "value").IsInvalidArgument()); ASSERT_TRUE(db_->DeleteRange(WriteOptions(), handle, "begin_key", "end_key") .IsInvalidArgument()); { WriteBatch wb; ASSERT_OK(wb.Put(handle, "key", "value")); ASSERT_TRUE(db_->Write(WriteOptions(), &wb).IsInvalidArgument()); } { WriteBatch wb; ASSERT_OK(wb.Delete(handle, "key")); ASSERT_TRUE(db_->Write(WriteOptions(), &wb).IsInvalidArgument()); } { WriteBatch wb; ASSERT_OK(wb.SingleDelete(handle, "key")); ASSERT_TRUE(db_->Write(WriteOptions(), &wb).IsInvalidArgument()); } { WriteBatch wb; ASSERT_OK(wb.DeleteRange(handle, "begin_key", "end_key")); ASSERT_TRUE(db_->Write(WriteOptions(), &wb).IsInvalidArgument()); } // Perform timestamp operations with timestamps of incorrect size. const std::string wrong_ts(sizeof(uint32_t), '\0'); ASSERT_TRUE(db_->Put(WriteOptions(), handle, "key", wrong_ts, "value") .IsInvalidArgument()); ASSERT_TRUE(db_->Merge(WriteOptions(), handle, "key", wrong_ts, "value") .IsInvalidArgument()); ASSERT_TRUE( db_->Delete(WriteOptions(), handle, "key", wrong_ts).IsInvalidArgument()); ASSERT_TRUE(db_->SingleDelete(WriteOptions(), handle, "key", wrong_ts) .IsInvalidArgument()); ASSERT_TRUE( db_->DeleteRange(WriteOptions(), handle, "begin_key", "end_key", wrong_ts) .IsInvalidArgument()); delete handle; } TEST_F(DBBasicTestWithTimestamp, MixedCfs) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; options.avoid_flush_during_shutdown = true; DestroyAndReopen(options); Options options1 = CurrentOptions(); options1.env = env_; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options1.comparator = &test_cmp; ColumnFamilyHandle* handle = nullptr; Status s = db_->CreateColumnFamily(options1, "data", &handle); ASSERT_OK(s); WriteBatch wb; ASSERT_OK(wb.Put("a", "value")); ASSERT_OK(wb.Put(handle, "a", "value")); { std::string ts = Timestamp(1, 0); const auto ts_sz_func = [kTimestampSize, handle](uint32_t cf_id) { assert(handle); if (cf_id == 0) { return static_cast(0); } else if (cf_id == handle->GetID()) { return kTimestampSize; } else { assert(false); return std::numeric_limits::max(); } }; ASSERT_OK(wb.UpdateTimestamps(ts, ts_sz_func)); ASSERT_OK(db_->Write(WriteOptions(), &wb)); } const auto verify_db = [this](ColumnFamilyHandle* h, const std::string& key, const std::string& ts, const std::string& expected_value) { ASSERT_EQ(expected_value, Get(key)); Slice read_ts_slice(ts); ReadOptions read_opts; read_opts.timestamp = &read_ts_slice; std::string value; ASSERT_OK(db_->Get(read_opts, h, key, &value)); ASSERT_EQ(expected_value, value); }; verify_db(handle, "a", Timestamp(1, 0), "value"); delete handle; Close(); std::vector cf_descs; cf_descs.emplace_back(kDefaultColumnFamilyName, options); cf_descs.emplace_back("data", options1); options.create_if_missing = false; s = DB::Open(options, dbname_, cf_descs, &handles_, &db_); ASSERT_OK(s); verify_db(handles_[1], "a", Timestamp(1, 0), "value"); Close(); } TEST_F(DBBasicTestWithTimestamp, CompactRangeWithSpecifiedRange) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); WriteOptions write_opts; std::string ts = Timestamp(1, 0); ASSERT_OK(db_->Put(write_opts, "foo1", ts, "bar")); ASSERT_OK(Flush()); ASSERT_OK(db_->Put(write_opts, "foo2", ts, "bar")); ASSERT_OK(Flush()); std::string start_str = "foo"; std::string end_str = "foo2"; Slice start(start_str), end(end_str); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end)); Close(); } TEST_F(DBBasicTestWithTimestamp, GcPreserveLatestVersionBelowFullHistoryLow) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); std::string ts_str = Timestamp(1, 0); WriteOptions wopts; ASSERT_OK(db_->Put(wopts, "k1", ts_str, "v1")); ASSERT_OK(db_->Put(wopts, "k2", ts_str, "v2")); ASSERT_OK(db_->Put(wopts, "k3", ts_str, "v3")); ts_str = Timestamp(2, 0); ASSERT_OK(db_->Delete(wopts, "k3", ts_str)); ts_str = Timestamp(4, 0); ASSERT_OK(db_->Put(wopts, "k1", ts_str, "v5")); ts_str = Timestamp(5, 0); ASSERT_OK( db_->DeleteRange(wopts, db_->DefaultColumnFamily(), "k0", "k9", ts_str)); ts_str = Timestamp(3, 0); Slice ts = ts_str; CompactRangeOptions cro; cro.full_history_ts_low = &ts; ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); ASSERT_OK(Flush()); ReadOptions ropts; ropts.timestamp = &ts; std::string value; Status s = db_->Get(ropts, "k1", &value); ASSERT_OK(s); ASSERT_EQ("v1", value); std::string key_ts; ASSERT_TRUE(db_->Get(ropts, "k3", &value, &key_ts).IsNotFound()); ASSERT_EQ(Timestamp(2, 0), key_ts); ts_str = Timestamp(5, 0); ts = ts_str; ropts.timestamp = &ts; ASSERT_TRUE(db_->Get(ropts, "k2", &value, &key_ts).IsNotFound()); ASSERT_EQ(Timestamp(5, 0), key_ts); ASSERT_TRUE(db_->Get(ropts, "k2", &value).IsNotFound()); Close(); } TEST_F(DBBasicTestWithTimestamp, UpdateFullHistoryTsLow) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); const std::string kKey = "test kKey"; // Test set ts_low first and flush() int current_ts_low = 5; std::string ts_low_str = Timestamp(current_ts_low, 0); Slice ts_low = ts_low_str; CompactRangeOptions comp_opts; comp_opts.full_history_ts_low = &ts_low; comp_opts.bottommost_level_compaction = BottommostLevelCompaction::kForce; ASSERT_OK(db_->CompactRange(comp_opts, nullptr, nullptr)); auto* cfd = static_cast_with_check(db_->DefaultColumnFamily()) ->cfd(); auto result_ts_low = cfd->GetFullHistoryTsLow(); ASSERT_TRUE(test_cmp.CompareTimestamp(ts_low, result_ts_low) == 0); for (int i = 0; i < 10; i++) { WriteOptions write_opts; std::string ts = Timestamp(i, 0); ASSERT_OK(db_->Put(write_opts, kKey, ts, Key(i))); } ASSERT_OK(Flush()); for (int i = 0; i < 10; i++) { ReadOptions read_opts; std::string ts_str = Timestamp(i, 0); Slice ts = ts_str; read_opts.timestamp = &ts; std::string value; Status status = db_->Get(read_opts, kKey, &value); if (i < current_ts_low) { ASSERT_TRUE(status.IsInvalidArgument()); } else { ASSERT_OK(status); ASSERT_TRUE(value.compare(Key(i)) == 0); } } // Test set ts_low and then trigger compaction for (int i = 10; i < 20; i++) { WriteOptions write_opts; std::string ts = Timestamp(i, 0); ASSERT_OK(db_->Put(write_opts, kKey, ts, Key(i))); } ASSERT_OK(Flush()); current_ts_low = 15; ts_low_str = Timestamp(current_ts_low, 0); ts_low = ts_low_str; comp_opts.full_history_ts_low = &ts_low; ASSERT_OK(db_->CompactRange(comp_opts, nullptr, nullptr)); result_ts_low = cfd->GetFullHistoryTsLow(); ASSERT_TRUE(test_cmp.CompareTimestamp(ts_low, result_ts_low) == 0); for (int i = current_ts_low; i < 20; i++) { ReadOptions read_opts; std::string ts_str = Timestamp(i, 0); Slice ts = ts_str; read_opts.timestamp = &ts; std::string value; Status status = db_->Get(read_opts, kKey, &value); ASSERT_OK(status); ASSERT_TRUE(value.compare(Key(i)) == 0); } // Test invalid compaction with range Slice start(kKey), end(kKey); Status s = db_->CompactRange(comp_opts, &start, &end); ASSERT_TRUE(s.IsInvalidArgument()); s = db_->CompactRange(comp_opts, &start, nullptr); ASSERT_TRUE(s.IsInvalidArgument()); s = db_->CompactRange(comp_opts, nullptr, &end); ASSERT_TRUE(s.IsInvalidArgument()); // Test invalid compaction with the decreasing ts_low ts_low_str = Timestamp(current_ts_low - 1, 0); ts_low = ts_low_str; comp_opts.full_history_ts_low = &ts_low; s = db_->CompactRange(comp_opts, nullptr, nullptr); ASSERT_TRUE(s.IsInvalidArgument()); Close(); } TEST_F(DBBasicTestWithTimestamp, UpdateFullHistoryTsLowWithPublicAPI) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); std::string ts_low_str = Timestamp(9, 0); ASSERT_OK( db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(), ts_low_str)); std::string result_ts_low; ASSERT_OK(db_->GetFullHistoryTsLow(nullptr, &result_ts_low)); ASSERT_TRUE(test_cmp.CompareTimestamp(ts_low_str, result_ts_low) == 0); // test increase full_history_low backward std::string ts_low_str_back = Timestamp(8, 0); auto s = db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(), ts_low_str_back); ASSERT_EQ(s, Status::InvalidArgument()); // test IncreaseFullHistoryTsLow with a timestamp whose length is longger // than the cf's timestamp size std::string ts_low_str_long(Timestamp(0, 0).size() + 1, 'a'); s = db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(), ts_low_str_long); ASSERT_EQ(s, Status::InvalidArgument()); // test IncreaseFullHistoryTsLow with a timestamp which is null std::string ts_low_str_null = ""; s = db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(), ts_low_str_null); ASSERT_EQ(s, Status::InvalidArgument()); // test IncreaseFullHistoryTsLow for a column family that does not enable // timestamp options.comparator = BytewiseComparator(); DestroyAndReopen(options); ts_low_str = Timestamp(10, 0); s = db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(), ts_low_str); ASSERT_EQ(s, Status::InvalidArgument()); // test GetFullHistoryTsLow for a column family that does not enable // timestamp std::string current_ts_low; s = db_->GetFullHistoryTsLow(db_->DefaultColumnFamily(), ¤t_ts_low); ASSERT_EQ(s, Status::InvalidArgument()); Close(); } TEST_F(DBBasicTestWithTimestamp, GetApproximateSizes) { Options options = CurrentOptions(); options.write_buffer_size = 100000000; // Large write buffer options.compression = kNoCompression; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); auto default_cf = db_->DefaultColumnFamily(); WriteOptions write_opts; std::string ts = Timestamp(1, 0); const int N = 128; Random rnd(301); for (int i = 0; i < N; i++) { ASSERT_OK(db_->Put(write_opts, Key(i), ts, rnd.RandomString(1024))); } uint64_t size; std::string start = Key(50); std::string end = Key(60); Range r(start, end); SizeApproximationOptions size_approx_options; size_approx_options.include_memtables = true; size_approx_options.include_files = true; ASSERT_OK( db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size)); ASSERT_GT(size, 6000); ASSERT_LT(size, 204800); // test multiple ranges std::vector ranges; std::string start_tmp = Key(10); std::string end_tmp = Key(20); ranges.emplace_back(Range(start_tmp, end_tmp)); ranges.emplace_back(Range(start, end)); uint64_t range_sizes[2]; ASSERT_OK(db_->GetApproximateSizes(size_approx_options, default_cf, ranges.data(), 2, range_sizes)); ASSERT_EQ(range_sizes[1], size); // Zero if not including mem table ASSERT_OK(db_->GetApproximateSizes(&r, 1, &size)); ASSERT_EQ(size, 0); start = Key(500); end = Key(600); r = Range(start, end); ASSERT_OK( db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size)); ASSERT_EQ(size, 0); // Test range boundaries ASSERT_OK(db_->Put(write_opts, Key(1000), ts, rnd.RandomString(1024))); // Should include start key start = Key(1000); end = Key(1100); r = Range(start, end); ASSERT_OK( db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size)); ASSERT_GT(size, 0); // Should exclude end key start = Key(900); end = Key(1000); r = Range(start, end); ASSERT_OK( db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size)); ASSERT_EQ(size, 0); Close(); } TEST_F(DBBasicTestWithTimestamp, SimpleIterate) { const int kNumKeysPerFile = 128; const uint64_t kMaxKey = 1024; Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; options.memtable_factory.reset( test::NewSpecialSkipListFactory(kNumKeysPerFile)); DestroyAndReopen(options); const std::vector start_keys = {1, 0}; const std::vector write_timestamps = {Timestamp(1, 0), Timestamp(3, 0)}; const std::vector read_timestamps = {Timestamp(2, 0), Timestamp(4, 0)}; for (size_t i = 0; i < write_timestamps.size(); ++i) { WriteOptions write_opts; for (uint64_t key = start_keys[i]; key <= kMaxKey; ++key) { Status s = db_->Put(write_opts, Key1(key), write_timestamps[i], "value" + std::to_string(i)); ASSERT_OK(s); } } for (size_t i = 0; i < read_timestamps.size(); ++i) { ReadOptions read_opts; Slice read_ts = read_timestamps[i]; read_opts.timestamp = &read_ts; std::unique_ptr it(db_->NewIterator(read_opts)); int count = 0; uint64_t key = 0; // Forward iterate. for (it->Seek(Key1(0)), key = start_keys[i]; it->Valid(); it->Next(), ++count, ++key) { CheckIterUserEntry(it.get(), Key1(key), kTypeValue, "value" + std::to_string(i), write_timestamps[i]); } size_t expected_count = kMaxKey - start_keys[i] + 1; ASSERT_EQ(expected_count, count); // Backward iterate. count = 0; for (it->SeekForPrev(Key1(kMaxKey)), key = kMaxKey; it->Valid(); it->Prev(), ++count, --key) { CheckIterUserEntry(it.get(), Key1(key), kTypeValue, "value" + std::to_string(i), write_timestamps[i]); } ASSERT_EQ(static_cast(kMaxKey) - start_keys[i] + 1, count); // SeekToFirst()/SeekToLast() with lower/upper bounds. // Then iter with lower and upper bounds. uint64_t l = 0; uint64_t r = kMaxKey + 1; while (l < r) { std::string lb_str = Key1(l); Slice lb = lb_str; std::string ub_str = Key1(r); Slice ub = ub_str; read_opts.iterate_lower_bound = &lb; read_opts.iterate_upper_bound = &ub; it.reset(db_->NewIterator(read_opts)); for (it->SeekToFirst(), key = std::max(l, start_keys[i]), count = 0; it->Valid(); it->Next(), ++key, ++count) { CheckIterUserEntry(it.get(), Key1(key), kTypeValue, "value" + std::to_string(i), write_timestamps[i]); } ASSERT_EQ(r - std::max(l, start_keys[i]), count); for (it->SeekToLast(), key = std::min(r, kMaxKey + 1), count = 0; it->Valid(); it->Prev(), --key, ++count) { CheckIterUserEntry(it.get(), Key1(key - 1), kTypeValue, "value" + std::to_string(i), write_timestamps[i]); } l += (kMaxKey / 100); r -= (kMaxKey / 100); } } Close(); } TEST_F(DBBasicTestWithTimestamp, TrimHistoryTest) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); auto check_value_by_ts = [](DB* db, Slice key, std::string readTs, Status status, std::string checkValue, std::string expected_ts) { ReadOptions ropts; Slice ts = readTs; ropts.timestamp = &ts; std::string value; std::string key_ts; Status s = db->Get(ropts, key, &value, &key_ts); ASSERT_TRUE(s == status); if (s.ok()) { ASSERT_EQ(checkValue, value); } if (s.ok() || s.IsNotFound()) { ASSERT_EQ(expected_ts, key_ts); } }; // Construct data of different versions with different ts ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(2, 0), "v1")); ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(4, 0), "v2")); ASSERT_OK(db_->Delete(WriteOptions(), "k1", Timestamp(5, 0))); ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(6, 0), "v3")); check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v3", Timestamp(6, 0)); ASSERT_OK(Flush()); Close(); ColumnFamilyOptions cf_options(options); std::vector column_families; column_families.push_back( ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); DBOptions db_options(options); // Trim data whose version > Timestamp(5, 0), read(k1, ts(7)) <- NOT_FOUND. ASSERT_OK(DB::OpenAndTrimHistory(db_options, dbname_, column_families, &handles_, &db_, Timestamp(5, 0))); check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::NotFound(), "", Timestamp(5, 0)); Close(); // Trim data whose timestamp > Timestamp(4, 0), read(k1, ts(7)) <- v2 ASSERT_OK(DB::OpenAndTrimHistory(db_options, dbname_, column_families, &handles_, &db_, Timestamp(4, 0))); check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v2", Timestamp(4, 0)); Close(); Reopen(options); ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "k1", "k3", Timestamp(7, 0))); check_value_by_ts(db_, "k1", Timestamp(8, 0), Status::NotFound(), "", Timestamp(7, 0)); Close(); // Trim data whose timestamp > Timestamp(6, 0), read(k1, ts(8)) <- v2 ASSERT_OK(DB::OpenAndTrimHistory(db_options, dbname_, column_families, &handles_, &db_, Timestamp(6, 0))); check_value_by_ts(db_, "k1", Timestamp(8, 0), Status::OK(), "v2", Timestamp(4, 0)); Close(); } TEST_F(DBBasicTestWithTimestamp, OpenAndTrimHistoryInvalidOptionTest) { Destroy(last_options_); Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; ColumnFamilyOptions cf_options(options); std::vector column_families; column_families.push_back( ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); DBOptions db_options(options); // OpenAndTrimHistory should not work with avoid_flush_during_recovery db_options.avoid_flush_during_recovery = true; ASSERT_TRUE(DB::OpenAndTrimHistory(db_options, dbname_, column_families, &handles_, &db_, Timestamp(0, 0)) .IsInvalidArgument()); } #ifndef ROCKSDB_LITE TEST_F(DBBasicTestWithTimestamp, GetTimestampTableProperties) { Options options = CurrentOptions(); const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); // Create 2 tables for (int table = 0; table < 2; ++table) { for (int i = 0; i < 10; i++) { std::string ts = Timestamp(i, 0); ASSERT_OK(db_->Put(WriteOptions(), "key", ts, Key(i))); } ASSERT_OK(Flush()); } TablePropertiesCollection props; ASSERT_OK(db_->GetPropertiesOfAllTables(&props)); ASSERT_EQ(2U, props.size()); for (const auto& item : props) { auto& user_collected = item.second->user_collected_properties; ASSERT_TRUE(user_collected.find("rocksdb.timestamp_min") != user_collected.end()); ASSERT_TRUE(user_collected.find("rocksdb.timestamp_max") != user_collected.end()); ASSERT_EQ(user_collected.at("rocksdb.timestamp_min"), Timestamp(0, 0)); ASSERT_EQ(user_collected.at("rocksdb.timestamp_max"), Timestamp(9, 0)); } Close(); } #endif // !ROCKSDB_LITE class DBBasicTestWithTimestampTableOptions : public DBBasicTestWithTimestampBase, public testing::WithParamInterface { public: explicit DBBasicTestWithTimestampTableOptions() : DBBasicTestWithTimestampBase( "db_basic_test_with_timestamp_table_options") {} }; INSTANTIATE_TEST_CASE_P( Timestamp, DBBasicTestWithTimestampTableOptions, testing::Values( BlockBasedTableOptions::IndexType::kBinarySearch, BlockBasedTableOptions::IndexType::kHashSearch, BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch, BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey)); TEST_P(DBBasicTestWithTimestampTableOptions, GetAndMultiGet) { Options options = GetDefaultOptions(); options.create_if_missing = true; options.prefix_extractor.reset(NewFixedPrefixTransform(3)); options.compression = kNoCompression; BlockBasedTableOptions bbto; bbto.index_type = GetParam(); bbto.block_size = 100; options.table_factory.reset(NewBlockBasedTableFactory(bbto)); const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator cmp(kTimestampSize); options.comparator = &cmp; DestroyAndReopen(options); constexpr uint64_t kNumKeys = 1024; for (uint64_t k = 0; k < kNumKeys; ++k) { WriteOptions write_opts; ASSERT_OK(db_->Put(write_opts, Key1(k), Timestamp(1, 0), "value" + std::to_string(k))); } ASSERT_OK(Flush()); { ReadOptions read_opts; read_opts.total_order_seek = true; std::string ts_str = Timestamp(2, 0); Slice ts = ts_str; read_opts.timestamp = &ts; std::unique_ptr it(db_->NewIterator(read_opts)); // verify Get() for (it->SeekToFirst(); it->Valid(); it->Next()) { std::string value_from_get; std::string key_str(it->key().data(), it->key().size()); std::string timestamp; ASSERT_OK(db_->Get(read_opts, key_str, &value_from_get, ×tamp)); ASSERT_EQ(it->value(), value_from_get); ASSERT_EQ(Timestamp(1, 0), timestamp); } // verify MultiGet() constexpr uint64_t step = 2; static_assert(0 == (kNumKeys % step), "kNumKeys must be a multiple of step"); for (uint64_t k = 0; k < kNumKeys; k += 2) { std::vector key_strs; std::vector keys; for (size_t i = 0; i < step; ++i) { key_strs.push_back(Key1(k + i)); } for (size_t i = 0; i < step; ++i) { keys.emplace_back(key_strs[i]); } std::vector values; std::vector timestamps; std::vector statuses = db_->MultiGet(read_opts, keys, &values, ×tamps); ASSERT_EQ(step, statuses.size()); ASSERT_EQ(step, values.size()); ASSERT_EQ(step, timestamps.size()); for (uint64_t i = 0; i < step; ++i) { ASSERT_OK(statuses[i]); ASSERT_EQ("value" + std::to_string(k + i), values[i]); ASSERT_EQ(Timestamp(1, 0), timestamps[i]); } } } Close(); } TEST_P(DBBasicTestWithTimestampTableOptions, SeekWithPrefixLessThanKey) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; options.prefix_extractor.reset(NewFixedPrefixTransform(3)); options.memtable_whole_key_filtering = true; options.memtable_prefix_bloom_size_ratio = 0.1; BlockBasedTableOptions bbto; bbto.filter_policy.reset(NewBloomFilterPolicy(10, false)); bbto.cache_index_and_filter_blocks = true; bbto.whole_key_filtering = true; bbto.index_type = GetParam(); options.table_factory.reset(NewBlockBasedTableFactory(bbto)); const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); WriteOptions write_opts; std::string ts = Timestamp(1, 0); ASSERT_OK(db_->Put(write_opts, "foo1", ts, "bar")); ASSERT_OK(Flush()); ASSERT_OK(db_->Put(write_opts, "foo2", ts, "bar")); ASSERT_OK(Flush()); // Move sst file to next level ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); ASSERT_OK(db_->Put(write_opts, "foo3", ts, "bar")); ASSERT_OK(Flush()); ReadOptions read_opts; Slice read_ts = ts; read_opts.timestamp = &read_ts; { std::unique_ptr iter(db_->NewIterator(read_opts)); iter->Seek("foo"); ASSERT_TRUE(iter->Valid()); ASSERT_OK(iter->status()); iter->Next(); ASSERT_TRUE(iter->Valid()); ASSERT_OK(iter->status()); iter->Seek("bbb"); ASSERT_FALSE(iter->Valid()); ASSERT_OK(iter->status()); } Close(); } TEST_P(DBBasicTestWithTimestampTableOptions, SeekWithCappedPrefix) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; // All of the keys or this test must be longer than 3 characters constexpr int kMinKeyLen = 3; options.prefix_extractor.reset(NewCappedPrefixTransform(kMinKeyLen)); options.memtable_whole_key_filtering = true; options.memtable_prefix_bloom_size_ratio = 0.1; BlockBasedTableOptions bbto; bbto.filter_policy.reset(NewBloomFilterPolicy(10, false)); bbto.cache_index_and_filter_blocks = true; bbto.whole_key_filtering = true; bbto.index_type = GetParam(); options.table_factory.reset(NewBlockBasedTableFactory(bbto)); const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); WriteOptions write_opts; std::string ts = Timestamp(1, 0); ASSERT_OK(db_->Put(write_opts, "foo1", ts, "bar")); ASSERT_OK(Flush()); ASSERT_OK(db_->Put(write_opts, "foo2", ts, "bar")); ASSERT_OK(Flush()); // Move sst file to next level ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); ASSERT_OK(db_->Put(write_opts, "foo3", ts, "bar")); ASSERT_OK(Flush()); ReadOptions read_opts; ts = Timestamp(2, 0); Slice read_ts = ts; read_opts.timestamp = &read_ts; { std::unique_ptr iter(db_->NewIterator(read_opts)); // Make sure the prefix extractor doesn't include timestamp, otherwise it // may return invalid result. iter->Seek("foo"); ASSERT_TRUE(iter->Valid()); ASSERT_OK(iter->status()); iter->Next(); ASSERT_TRUE(iter->Valid()); ASSERT_OK(iter->status()); } Close(); } TEST_P(DBBasicTestWithTimestampTableOptions, SeekWithBound) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; options.prefix_extractor.reset(NewFixedPrefixTransform(2)); BlockBasedTableOptions bbto; bbto.filter_policy.reset(NewBloomFilterPolicy(10, false)); bbto.cache_index_and_filter_blocks = true; bbto.whole_key_filtering = true; bbto.index_type = GetParam(); options.table_factory.reset(NewBlockBasedTableFactory(bbto)); const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); WriteOptions write_opts; std::string ts = Timestamp(1, 0); ASSERT_OK(db_->Put(write_opts, "foo1", ts, "bar1")); ASSERT_OK(Flush()); ASSERT_OK(db_->Put(write_opts, "foo2", ts, "bar2")); ASSERT_OK(Flush()); // Move sst file to next level ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); for (int i = 3; i < 9; ++i) { ASSERT_OK(db_->Put(write_opts, "foo" + std::to_string(i), ts, "bar" + std::to_string(i))); } ASSERT_OK(Flush()); ReadOptions read_opts; ts = Timestamp(2, 0); Slice read_ts = ts; read_opts.timestamp = &read_ts; std::string up_bound = "foo5"; // exclusive Slice up_bound_slice = up_bound; std::string lo_bound = "foo2"; // inclusive Slice lo_bound_slice = lo_bound; read_opts.iterate_upper_bound = &up_bound_slice; read_opts.iterate_lower_bound = &lo_bound_slice; read_opts.auto_prefix_mode = true; { std::unique_ptr iter(db_->NewIterator(read_opts)); // Make sure the prefix extractor doesn't include timestamp, otherwise it // may return invalid result. iter->Seek("foo"); CheckIterUserEntry(iter.get(), lo_bound, kTypeValue, "bar2", Timestamp(1, 0)); iter->SeekToFirst(); CheckIterUserEntry(iter.get(), lo_bound, kTypeValue, "bar2", Timestamp(1, 0)); iter->SeekForPrev("g"); CheckIterUserEntry(iter.get(), "foo4", kTypeValue, "bar4", Timestamp(1, 0)); iter->SeekToLast(); CheckIterUserEntry(iter.get(), "foo4", kTypeValue, "bar4", Timestamp(1, 0)); } Close(); } TEST_F(DBBasicTestWithTimestamp, ChangeIterationDirection) { Options options = GetDefaultOptions(); options.create_if_missing = true; options.env = env_; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; options.prefix_extractor.reset(NewFixedPrefixTransform(1)); options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); DestroyAndReopen(options); const std::vector timestamps = {Timestamp(1, 1), Timestamp(0, 2), Timestamp(4, 3)}; const std::vector> kvs = { std::make_tuple("aa", "value1"), std::make_tuple("ab", "value2")}; for (const auto& ts : timestamps) { WriteBatch wb(0, 0, 0, kTimestampSize); for (const auto& kv : kvs) { const std::string& key = std::get<0>(kv); const std::string& value = std::get<1>(kv); ASSERT_OK(wb.Put(key, value)); } ASSERT_OK(wb.UpdateTimestamps( ts, [kTimestampSize](uint32_t) { return kTimestampSize; })); ASSERT_OK(db_->Write(WriteOptions(), &wb)); } std::string read_ts_str = Timestamp(5, 3); Slice read_ts = read_ts_str; ReadOptions read_opts; read_opts.timestamp = &read_ts; std::unique_ptr it(db_->NewIterator(read_opts)); it->SeekToFirst(); ASSERT_TRUE(it->Valid()); it->Prev(); ASSERT_FALSE(it->Valid()); it->SeekToLast(); ASSERT_TRUE(it->Valid()); uint64_t prev_reseek_count = options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION); ASSERT_EQ(0, prev_reseek_count); it->Next(); ASSERT_FALSE(it->Valid()); ASSERT_EQ(1 + prev_reseek_count, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION)); it->Seek(std::get<0>(kvs[0])); CheckIterUserEntry(it.get(), std::get<0>(kvs[0]), kTypeValue, std::get<1>(kvs[0]), Timestamp(4, 3)); it->Next(); CheckIterUserEntry(it.get(), std::get<0>(kvs[1]), kTypeValue, std::get<1>(kvs[1]), Timestamp(4, 3)); it->Prev(); CheckIterUserEntry(it.get(), std::get<0>(kvs[0]), kTypeValue, std::get<1>(kvs[0]), Timestamp(4, 3)); prev_reseek_count = options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION); ASSERT_EQ(1, prev_reseek_count); it->Next(); CheckIterUserEntry(it.get(), std::get<0>(kvs[1]), kTypeValue, std::get<1>(kvs[1]), Timestamp(4, 3)); ASSERT_EQ(1 + prev_reseek_count, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION)); it->SeekForPrev(std::get<0>(kvs[1])); CheckIterUserEntry(it.get(), std::get<0>(kvs[1]), kTypeValue, std::get<1>(kvs[1]), Timestamp(4, 3)); it->Prev(); CheckIterUserEntry(it.get(), std::get<0>(kvs[0]), kTypeValue, std::get<1>(kvs[0]), Timestamp(4, 3)); prev_reseek_count = options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION); it->Next(); CheckIterUserEntry(it.get(), std::get<0>(kvs[1]), kTypeValue, std::get<1>(kvs[1]), Timestamp(4, 3)); ASSERT_EQ(1 + prev_reseek_count, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION)); it.reset(); Close(); } TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) { constexpr int kNumKeysPerFile = 128; constexpr uint64_t kMaxKey = 1024; Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; options.memtable_factory.reset( test::NewSpecialSkipListFactory(kNumKeysPerFile)); DestroyAndReopen(options); const std::vector write_timestamps = {Timestamp(1, 0), Timestamp(3, 0)}; const std::vector read_timestamps = {Timestamp(2, 0), Timestamp(4, 0)}; const std::vector read_timestamps_lb = {Timestamp(1, 0), Timestamp(1, 0)}; for (size_t i = 0; i < write_timestamps.size(); ++i) { WriteOptions write_opts; for (uint64_t key = 0; key <= kMaxKey; ++key) { Status s = db_->Put(write_opts, Key1(key), write_timestamps[i], "value" + std::to_string(i)); ASSERT_OK(s); } } for (size_t i = 0; i < read_timestamps.size(); ++i) { ReadOptions read_opts; Slice read_ts = read_timestamps[i]; Slice read_ts_lb = read_timestamps_lb[i]; read_opts.timestamp = &read_ts; read_opts.iter_start_ts = &read_ts_lb; std::unique_ptr it(db_->NewIterator(read_opts)); int count = 0; uint64_t key = 0; for (it->Seek(Key1(0)), key = 0; it->Valid(); it->Next(), ++count, ++key) { CheckIterEntry(it.get(), Key1(key), kTypeValue, "value" + std::to_string(i), write_timestamps[i]); if (i > 0) { it->Next(); CheckIterEntry(it.get(), Key1(key), kTypeValue, "value" + std::to_string(i - 1), write_timestamps[i - 1]); } } size_t expected_count = kMaxKey + 1; ASSERT_EQ(expected_count, count); } // Delete all keys@ts=5 and check iteration result with start ts set { std::string write_timestamp = Timestamp(5, 0); WriteOptions write_opts; for (uint64_t key = 0; key < kMaxKey + 1; ++key) { Status s = db_->Delete(write_opts, Key1(key), write_timestamp); ASSERT_OK(s); } std::string read_timestamp = Timestamp(6, 0); ReadOptions read_opts; Slice read_ts = read_timestamp; read_opts.timestamp = &read_ts; std::string read_timestamp_lb = Timestamp(2, 0); Slice read_ts_lb = read_timestamp_lb; read_opts.iter_start_ts = &read_ts_lb; std::unique_ptr it(db_->NewIterator(read_opts)); int count = 0; uint64_t key = 0; for (it->Seek(Key1(0)), key = 0; it->Valid(); it->Next(), ++count, ++key) { CheckIterEntry(it.get(), Key1(key), kTypeDeletionWithTimestamp, Slice(), write_timestamp); // Skip key@ts=3 and land on tombstone key@ts=5 it->Next(); } ASSERT_EQ(kMaxKey + 1, count); } Close(); } TEST_F(DBBasicTestWithTimestamp, BackwardIterateLowerTsBound) { constexpr int kNumKeysPerFile = 128; constexpr uint64_t kMaxKey = 1024; Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; options.memtable_factory.reset( test::NewSpecialSkipListFactory(kNumKeysPerFile)); DestroyAndReopen(options); const std::vector write_timestamps = {Timestamp(1, 0), Timestamp(3, 0)}; const std::vector read_timestamps = {Timestamp(2, 0), Timestamp(4, 0)}; const std::vector read_timestamps_lb = {Timestamp(1, 0), Timestamp(1, 0)}; for (size_t i = 0; i < write_timestamps.size(); ++i) { WriteOptions write_opts; for (uint64_t key = 0; key <= kMaxKey; ++key) { Status s = db_->Put(write_opts, Key1(key), write_timestamps[i], "value" + std::to_string(i)); ASSERT_OK(s); } } for (size_t i = 0; i < read_timestamps.size(); ++i) { ReadOptions read_opts; Slice read_ts = read_timestamps[i]; Slice read_ts_lb = read_timestamps_lb[i]; read_opts.timestamp = &read_ts; read_opts.iter_start_ts = &read_ts_lb; std::unique_ptr it(db_->NewIterator(read_opts)); int count = 0; uint64_t key = 0; for (it->SeekForPrev(Key1(kMaxKey)), key = kMaxKey; it->Valid(); it->Prev(), ++count, --key) { CheckIterEntry(it.get(), Key1(key), kTypeValue, "value0", write_timestamps[0]); if (i > 0) { it->Prev(); CheckIterEntry(it.get(), Key1(key), kTypeValue, "value1", write_timestamps[1]); } } size_t expected_count = kMaxKey + 1; ASSERT_EQ(expected_count, count); } // Delete all keys@ts=5 and check iteration result with start ts set { std::string write_timestamp = Timestamp(5, 0); WriteOptions write_opts; for (uint64_t key = 0; key < kMaxKey + 1; ++key) { Status s = db_->Delete(write_opts, Key1(key), write_timestamp); ASSERT_OK(s); } std::string read_timestamp = Timestamp(6, 0); ReadOptions read_opts; Slice read_ts = read_timestamp; read_opts.timestamp = &read_ts; std::string read_timestamp_lb = Timestamp(2, 0); Slice read_ts_lb = read_timestamp_lb; read_opts.iter_start_ts = &read_ts_lb; std::unique_ptr it(db_->NewIterator(read_opts)); int count = 0; uint64_t key = kMaxKey; for (it->SeekForPrev(Key1(key)), key = kMaxKey; it->Valid(); it->Prev(), ++count, --key) { CheckIterEntry(it.get(), Key1(key), kTypeValue, "value1", Timestamp(3, 0)); it->Prev(); CheckIterEntry(it.get(), Key1(key), kTypeDeletionWithTimestamp, Slice(), write_timestamp); } ASSERT_EQ(kMaxKey + 1, count); } Close(); } TEST_F(DBBasicTestWithTimestamp, SimpleBackwardIterateLowerTsBound) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); std::string ts_ub_buf = Timestamp(5, 0); Slice ts_ub = ts_ub_buf; std::string ts_lb_buf = Timestamp(1, 0); Slice ts_lb = ts_lb_buf; { ReadOptions read_opts; read_opts.timestamp = &ts_ub; read_opts.iter_start_ts = &ts_lb; std::unique_ptr it(db_->NewIterator(read_opts)); it->SeekToLast(); ASSERT_FALSE(it->Valid()); ASSERT_OK(it->status()); it->SeekForPrev("foo"); ASSERT_FALSE(it->Valid()); ASSERT_OK(it->status()); } // Test iterate_upper_bound ASSERT_OK(db_->Put(WriteOptions(), "a", Timestamp(0, 0), "v0")); ASSERT_OK(db_->SingleDelete(WriteOptions(), "a", Timestamp(1, 0))); for (int i = 0; i < 5; ++i) { ASSERT_OK(db_->Put(WriteOptions(), "b", Timestamp(i, 0), "v" + std::to_string(i))); } { ReadOptions read_opts; read_opts.timestamp = &ts_ub; read_opts.iter_start_ts = &ts_lb; std::string key_ub_str = "b"; // exclusive Slice key_ub = key_ub_str; read_opts.iterate_upper_bound = &key_ub; std::unique_ptr it(db_->NewIterator(read_opts)); it->SeekToLast(); CheckIterEntry(it.get(), "a", kTypeSingleDeletion, Slice(), Timestamp(1, 0)); key_ub_str = "a"; // exclusive key_ub = key_ub_str; read_opts.iterate_upper_bound = &key_ub; it.reset(db_->NewIterator(read_opts)); it->SeekToLast(); ASSERT_FALSE(it->Valid()); ASSERT_OK(it->status()); } Close(); } TEST_F(DBBasicTestWithTimestamp, BackwardIterateLowerTsBound_Reseek) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; options.max_sequential_skip_in_iterations = 2; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); for (int i = 0; i < 10; ++i) { ASSERT_OK(db_->Put(WriteOptions(), "a", Timestamp(i, 0), "v" + std::to_string(i))); } for (int i = 0; i < 10; ++i) { ASSERT_OK(db_->Put(WriteOptions(), "b", Timestamp(i, 0), "v" + std::to_string(i))); } { std::string ts_ub_buf = Timestamp(6, 0); Slice ts_ub = ts_ub_buf; std::string ts_lb_buf = Timestamp(4, 0); Slice ts_lb = ts_lb_buf; ReadOptions read_opts; read_opts.timestamp = &ts_ub; read_opts.iter_start_ts = &ts_lb; std::unique_ptr it(db_->NewIterator(read_opts)); it->SeekToLast(); for (int i = 0; i < 3 && it->Valid(); it->Prev(), ++i) { CheckIterEntry(it.get(), "b", kTypeValue, "v" + std::to_string(4 + i), Timestamp(4 + i, 0)); } for (int i = 0; i < 3 && it->Valid(); it->Prev(), ++i) { CheckIterEntry(it.get(), "a", kTypeValue, "v" + std::to_string(4 + i), Timestamp(4 + i, 0)); } } Close(); } TEST_F(DBBasicTestWithTimestamp, ReseekToTargetTimestamp) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; constexpr size_t kNumKeys = 16; options.max_sequential_skip_in_iterations = kNumKeys / 2; options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); // Insert kNumKeys WriteOptions write_opts; Status s; for (size_t i = 0; i != kNumKeys; ++i) { std::string ts = Timestamp(static_cast(i + 1), 0); s = db_->Put(write_opts, "foo", ts, "value" + std::to_string(i)); ASSERT_OK(s); } { ReadOptions read_opts; std::string ts_str = Timestamp(1, 0); Slice ts = ts_str; read_opts.timestamp = &ts; std::unique_ptr iter(db_->NewIterator(read_opts)); iter->SeekToFirst(); CheckIterUserEntry(iter.get(), "foo", kTypeValue, "value0", ts_str); ASSERT_EQ( 1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION)); ts_str = Timestamp(kNumKeys, 0); ts = ts_str; read_opts.timestamp = &ts; iter.reset(db_->NewIterator(read_opts)); iter->SeekToLast(); CheckIterUserEntry(iter.get(), "foo", kTypeValue, "value" + std::to_string(kNumKeys - 1), ts_str); ASSERT_EQ( 2, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION)); } Close(); } TEST_F(DBBasicTestWithTimestamp, ReseekToNextUserKey) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; constexpr size_t kNumKeys = 16; options.max_sequential_skip_in_iterations = kNumKeys / 2; options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); // Write kNumKeys + 1 keys WriteOptions write_opts; Status s; for (size_t i = 0; i != kNumKeys; ++i) { std::string ts = Timestamp(static_cast(i + 1), 0); s = db_->Put(write_opts, "a", ts, "value" + std::to_string(i)); ASSERT_OK(s); } { std::string ts_str = Timestamp(static_cast(kNumKeys + 1), 0); WriteBatch batch(0, 0, 0, kTimestampSize); { ASSERT_OK(batch.Put("a", "new_value")); } { ASSERT_OK(batch.Put("b", "new_value")); } s = batch.UpdateTimestamps( ts_str, [kTimestampSize](uint32_t) { return kTimestampSize; }); ASSERT_OK(s); s = db_->Write(write_opts, &batch); ASSERT_OK(s); } { ReadOptions read_opts; std::string ts_str = Timestamp(static_cast(kNumKeys + 1), 0); Slice ts = ts_str; read_opts.timestamp = &ts; std::unique_ptr iter(db_->NewIterator(read_opts)); iter->Seek("a"); iter->Next(); CheckIterUserEntry(iter.get(), "b", kTypeValue, "new_value", ts_str); ASSERT_EQ( 1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION)); } Close(); } TEST_F(DBBasicTestWithTimestamp, ReseekToUserKeyBeforeSavedKey) { Options options = GetDefaultOptions(); options.env = env_; options.create_if_missing = true; constexpr size_t kNumKeys = 16; options.max_sequential_skip_in_iterations = kNumKeys / 2; options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); for (size_t i = 0; i < kNumKeys; ++i) { std::string ts = Timestamp(static_cast(i + 1), 0); WriteOptions write_opts; Status s = db_->Put(write_opts, "b", ts, "value" + std::to_string(i)); ASSERT_OK(s); } { std::string ts = Timestamp(1, 0); WriteOptions write_opts; ASSERT_OK(db_->Put(write_opts, "a", ts, "value")); } { ReadOptions read_opts; std::string ts_str = Timestamp(1, 0); Slice ts = ts_str; read_opts.timestamp = &ts; std::unique_ptr iter(db_->NewIterator(read_opts)); iter->SeekToLast(); iter->Prev(); CheckIterUserEntry(iter.get(), "a", kTypeValue, "value", ts_str); ASSERT_EQ( 1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION)); } Close(); } TEST_F(DBBasicTestWithTimestamp, MultiGetWithFastLocalBloom) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; BlockBasedTableOptions bbto; bbto.filter_policy.reset(NewBloomFilterPolicy(10, false)); bbto.cache_index_and_filter_blocks = true; bbto.whole_key_filtering = true; options.table_factory.reset(NewBlockBasedTableFactory(bbto)); const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); // Write any value WriteOptions write_opts; std::string ts = Timestamp(1, 0); ASSERT_OK(db_->Put(write_opts, "foo", ts, "bar")); ASSERT_OK(Flush()); // Read with MultiGet ReadOptions read_opts; Slice read_ts = ts; read_opts.timestamp = &read_ts; size_t batch_size = 1; std::vector keys(batch_size); std::vector values(batch_size); std::vector statuses(batch_size); std::vector timestamps(batch_size); keys[0] = "foo"; ColumnFamilyHandle* cfh = db_->DefaultColumnFamily(); db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(), timestamps.data(), statuses.data(), true); ASSERT_OK(statuses[0]); ASSERT_EQ(Timestamp(1, 0), timestamps[0]); for (auto& elem : values) { elem.Reset(); } ASSERT_OK(db_->SingleDelete(WriteOptions(), "foo", Timestamp(2, 0))); ts = Timestamp(3, 0); read_ts = ts; read_opts.timestamp = &read_ts; db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(), timestamps.data(), statuses.data(), true); ASSERT_TRUE(statuses[0].IsNotFound()); ASSERT_EQ(Timestamp(2, 0), timestamps[0]); Close(); } TEST_P(DBBasicTestWithTimestampTableOptions, MultiGetWithPrefix) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; options.prefix_extractor.reset(NewCappedPrefixTransform(5)); BlockBasedTableOptions bbto; bbto.filter_policy.reset(NewBloomFilterPolicy(10, false)); bbto.cache_index_and_filter_blocks = true; bbto.whole_key_filtering = false; bbto.index_type = GetParam(); options.table_factory.reset(NewBlockBasedTableFactory(bbto)); const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); // Write any value WriteOptions write_opts; std::string ts = Timestamp(1, 0); ASSERT_OK(db_->Put(write_opts, "foo", ts, "bar")); ASSERT_OK(Flush()); // Read with MultiGet ReadOptions read_opts; Slice read_ts = ts; read_opts.timestamp = &read_ts; size_t batch_size = 1; std::vector keys(batch_size); std::vector values(batch_size); std::vector statuses(batch_size); std::vector timestamps(batch_size); keys[0] = "foo"; ColumnFamilyHandle* cfh = db_->DefaultColumnFamily(); db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(), timestamps.data(), statuses.data(), true); ASSERT_OK(statuses[0]); ASSERT_EQ(Timestamp(1, 0), timestamps[0]); for (auto& elem : values) { elem.Reset(); } ASSERT_OK(db_->SingleDelete(WriteOptions(), "foo", Timestamp(2, 0))); // TODO re-enable after fixing a bug of kHashSearch if (GetParam() != BlockBasedTableOptions::IndexType::kHashSearch) { ASSERT_OK(Flush()); } ts = Timestamp(3, 0); read_ts = ts; db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(), timestamps.data(), statuses.data(), true); ASSERT_TRUE(statuses[0].IsNotFound()); ASSERT_EQ(Timestamp(2, 0), timestamps[0]); Close(); } TEST_P(DBBasicTestWithTimestampTableOptions, MultiGetWithMemBloomFilter) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; options.prefix_extractor.reset(NewCappedPrefixTransform(5)); BlockBasedTableOptions bbto; bbto.filter_policy.reset(NewBloomFilterPolicy(10, false)); bbto.cache_index_and_filter_blocks = true; bbto.whole_key_filtering = false; bbto.index_type = GetParam(); options.memtable_prefix_bloom_size_ratio = 0.1; options.table_factory.reset(NewBlockBasedTableFactory(bbto)); const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); // Write any value WriteOptions write_opts; std::string ts = Timestamp(1, 0); ASSERT_OK(db_->Put(write_opts, "foo", ts, "bar")); // Read with MultiGet ts = Timestamp(2, 0); Slice read_ts = ts; ReadOptions read_opts; read_opts.timestamp = &read_ts; size_t batch_size = 1; std::vector keys(batch_size); std::vector values(batch_size); std::vector statuses(batch_size); keys[0] = "foo"; ColumnFamilyHandle* cfh = db_->DefaultColumnFamily(); db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(), statuses.data()); ASSERT_OK(statuses[0]); Close(); } TEST_F(DBBasicTestWithTimestamp, MultiGetRangeFiltering) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; BlockBasedTableOptions bbto; bbto.filter_policy.reset(NewBloomFilterPolicy(10, false)); bbto.cache_index_and_filter_blocks = true; bbto.whole_key_filtering = false; options.memtable_prefix_bloom_size_ratio = 0.1; options.table_factory.reset(NewBlockBasedTableFactory(bbto)); const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); // Write any value WriteOptions write_opts; std::string ts = Timestamp(1, 0); // random data for (int i = 0; i < 3; i++) { auto key = std::to_string(i * 10); auto value = std::to_string(i * 10); Slice key_slice = key; Slice value_slice = value; ASSERT_OK(db_->Put(write_opts, key_slice, ts, value_slice)); ASSERT_OK(Flush()); } // Make num_levels to 2 to do key range filtering of sst files ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); ASSERT_OK(db_->Put(write_opts, "foo", ts, "bar")); ASSERT_OK(Flush()); // Read with MultiGet ts = Timestamp(2, 0); Slice read_ts = ts; ReadOptions read_opts; read_opts.timestamp = &read_ts; size_t batch_size = 1; std::vector keys(batch_size); std::vector values(batch_size); std::vector statuses(batch_size); keys[0] = "foo"; ColumnFamilyHandle* cfh = db_->DefaultColumnFamily(); db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(), statuses.data()); ASSERT_OK(statuses[0]); Close(); } TEST_P(DBBasicTestWithTimestampTableOptions, MultiGetPrefixFilter) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; options.prefix_extractor.reset(NewCappedPrefixTransform(3)); BlockBasedTableOptions bbto; bbto.filter_policy.reset(NewBloomFilterPolicy(10, false)); bbto.cache_index_and_filter_blocks = true; bbto.whole_key_filtering = false; bbto.index_type = GetParam(); options.memtable_prefix_bloom_size_ratio = 0.1; options.table_factory.reset(NewBlockBasedTableFactory(bbto)); const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); WriteOptions write_opts; std::string ts = Timestamp(1, 0); ASSERT_OK(db_->Put(write_opts, "foo", ts, "bar")); ASSERT_OK(Flush()); // Read with MultiGet ts = Timestamp(2, 0); Slice read_ts = ts; ReadOptions read_opts; read_opts.timestamp = &read_ts; size_t batch_size = 1; std::vector keys(batch_size); std::vector values(batch_size); std::vector timestamps(batch_size); keys[0] = "foo"; ColumnFamilyHandle* cfh = db_->DefaultColumnFamily(); std::vector cfhs(keys.size(), cfh); std::vector statuses = db_->MultiGet(read_opts, cfhs, keys, &values, ×tamps); ASSERT_OK(statuses[0]); Close(); } TEST_F(DBBasicTestWithTimestamp, MaxKeysSkippedDuringNext) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); constexpr size_t max_skippable_internal_keys = 2; const size_t kNumKeys = max_skippable_internal_keys + 2; WriteOptions write_opts; Status s; { std::string ts = Timestamp(1, 0); ASSERT_OK(db_->Put(write_opts, "a", ts, "value")); } for (size_t i = 0; i < kNumKeys; ++i) { std::string ts = Timestamp(static_cast(i + 1), 0); s = db_->Put(write_opts, "b", ts, "value" + std::to_string(i)); ASSERT_OK(s); } { ReadOptions read_opts; read_opts.max_skippable_internal_keys = max_skippable_internal_keys; std::string ts_str = Timestamp(1, 0); Slice ts = ts_str; read_opts.timestamp = &ts; std::unique_ptr iter(db_->NewIterator(read_opts)); iter->SeekToFirst(); iter->Next(); ASSERT_TRUE(iter->status().IsIncomplete()); } Close(); } TEST_F(DBBasicTestWithTimestamp, MaxKeysSkippedDuringPrev) { Options options = GetDefaultOptions(); options.env = env_; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); constexpr size_t max_skippable_internal_keys = 2; const size_t kNumKeys = max_skippable_internal_keys + 2; WriteOptions write_opts; Status s; { std::string ts = Timestamp(1, 0); ASSERT_OK(db_->Put(write_opts, "b", ts, "value")); } for (size_t i = 0; i < kNumKeys; ++i) { std::string ts = Timestamp(static_cast(i + 1), 0); s = db_->Put(write_opts, "a", ts, "value" + std::to_string(i)); ASSERT_OK(s); } { ReadOptions read_opts; read_opts.max_skippable_internal_keys = max_skippable_internal_keys; std::string ts_str = Timestamp(1, 0); Slice ts = ts_str; read_opts.timestamp = &ts; std::unique_ptr iter(db_->NewIterator(read_opts)); iter->SeekToLast(); iter->Prev(); ASSERT_TRUE(iter->status().IsIncomplete()); } Close(); } // Create two L0, and compact them to a new L1. In this test, L1 is L_bottom. // Two L0s: // f1 f2 // ... // Since f2.smallest < f1.largest < f2.largest // f1 and f2 will be the inputs of a real compaction instead of trivial move. TEST_F(DBBasicTestWithTimestamp, CompactDeletionWithTimestampMarkerToBottom) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; options.num_levels = 2; options.level0_file_num_compaction_trigger = 2; DestroyAndReopen(options); WriteOptions write_opts; std::string ts = Timestamp(1, 0); ASSERT_OK(db_->Put(write_opts, "a", ts, "value0")); ASSERT_OK(Flush()); ts = Timestamp(2, 0); ASSERT_OK(db_->Put(write_opts, "b", ts, "value0")); ts = Timestamp(3, 0); ASSERT_OK(db_->Delete(write_opts, "a", ts)); ASSERT_OK(Flush()); ASSERT_OK(dbfull()->TEST_WaitForCompact()); ReadOptions read_opts; ts = Timestamp(1, 0); Slice read_ts = ts; read_opts.timestamp = &read_ts; std::string value; Status s = db_->Get(read_opts, "a", &value); ASSERT_OK(s); ASSERT_EQ("value0", value); ts = Timestamp(3, 0); read_ts = ts; read_opts.timestamp = &read_ts; std::string key_ts; s = db_->Get(read_opts, "a", &value, &key_ts); ASSERT_TRUE(s.IsNotFound()); ASSERT_EQ(Timestamp(3, 0), key_ts); // Time-travel to the past before deletion ts = Timestamp(2, 0); read_ts = ts; read_opts.timestamp = &read_ts; s = db_->Get(read_opts, "a", &value); ASSERT_OK(s); ASSERT_EQ("value0", value); Close(); } #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) class DBBasicTestWithTimestampFilterPrefixSettings : public DBBasicTestWithTimestampBase, public testing::WithParamInterface< std::tuple, bool, bool, std::shared_ptr, bool, double, BlockBasedTableOptions::IndexType>> { public: DBBasicTestWithTimestampFilterPrefixSettings() : DBBasicTestWithTimestampBase( "db_basic_test_with_timestamp_filter_prefix") {} }; TEST_P(DBBasicTestWithTimestampFilterPrefixSettings, GetAndMultiGet) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; BlockBasedTableOptions bbto; bbto.filter_policy = std::get<0>(GetParam()); bbto.whole_key_filtering = std::get<1>(GetParam()); bbto.cache_index_and_filter_blocks = std::get<2>(GetParam()); bbto.index_type = std::get<6>(GetParam()); options.table_factory.reset(NewBlockBasedTableFactory(bbto)); options.prefix_extractor = std::get<3>(GetParam()); options.memtable_whole_key_filtering = std::get<4>(GetParam()); options.memtable_prefix_bloom_size_ratio = std::get<5>(GetParam()); const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); const int kMaxKey = 1000; // Write any value WriteOptions write_opts; std::string ts = Timestamp(1, 0); int idx = 0; for (; idx < kMaxKey / 4; idx++) { ASSERT_OK(db_->Put(write_opts, Key1(idx), ts, "bar")); ASSERT_OK(db_->Put(write_opts, KeyWithPrefix("foo", idx), ts, "bar")); } ASSERT_OK(Flush()); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); for (; idx < kMaxKey / 2; idx++) { ASSERT_OK(db_->Put(write_opts, Key1(idx), ts, "bar")); ASSERT_OK(db_->Put(write_opts, KeyWithPrefix("foo", idx), ts, "bar")); } ASSERT_OK(Flush()); for (; idx < kMaxKey; idx++) { ASSERT_OK(db_->Put(write_opts, Key1(idx), ts, "bar")); ASSERT_OK(db_->Put(write_opts, KeyWithPrefix("foo", idx), ts, "bar")); } // Read with MultiGet ReadOptions read_opts; Slice read_ts = ts; read_opts.timestamp = &read_ts; for (idx = 0; idx < kMaxKey; idx++) { size_t batch_size = 4; std::vector keys_str(batch_size); std::vector values(batch_size); std::vector statuses(batch_size); ColumnFamilyHandle* cfh = db_->DefaultColumnFamily(); keys_str[0] = Key1(idx); keys_str[1] = KeyWithPrefix("foo", idx); keys_str[2] = Key1(kMaxKey + idx); keys_str[3] = KeyWithPrefix("foo", kMaxKey + idx); auto keys = ConvertStrToSlice(keys_str); db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(), statuses.data()); for (int i = 0; i < 2; i++) { ASSERT_OK(statuses[i]); } for (int i = 2; i < 4; i++) { ASSERT_TRUE(statuses[i].IsNotFound()); } for (int i = 0; i < 2; i++) { std::string value; ASSERT_OK(db_->Get(read_opts, keys[i], &value)); std::unique_ptr it1(db_->NewIterator(read_opts)); ASSERT_NE(nullptr, it1); ASSERT_OK(it1->status()); it1->Seek(keys[i]); ASSERT_TRUE(it1->Valid()); } for (int i = 2; i < 4; i++) { std::string value; Status s = db_->Get(read_opts, keys[i], &value); ASSERT_TRUE(s.IsNotFound()); } } Close(); } INSTANTIATE_TEST_CASE_P( Timestamp, DBBasicTestWithTimestampFilterPrefixSettings, ::testing::Combine( ::testing::Values( std::shared_ptr(nullptr), std::shared_ptr(NewBloomFilterPolicy(10, true)), std::shared_ptr(NewBloomFilterPolicy(10, false))), ::testing::Bool(), ::testing::Bool(), ::testing::Values( std::shared_ptr(NewFixedPrefixTransform(1)), std::shared_ptr(NewFixedPrefixTransform(4)), std::shared_ptr(NewFixedPrefixTransform(7)), std::shared_ptr(NewFixedPrefixTransform(8))), ::testing::Bool(), ::testing::Values(0, 0.1), ::testing::Values( BlockBasedTableOptions::IndexType::kBinarySearch, BlockBasedTableOptions::IndexType::kHashSearch, BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch, BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey))); #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) class DataVisibilityTest : public DBBasicTestWithTimestampBase { public: DataVisibilityTest() : DBBasicTestWithTimestampBase("data_visibility_test") { // Initialize test data for (int i = 0; i < kTestDataSize; i++) { test_data_[i].key = "key" + std::to_string(i); test_data_[i].value = "value" + std::to_string(i); test_data_[i].timestamp = Timestamp(i, 0); test_data_[i].ts = i; test_data_[i].seq_num = kMaxSequenceNumber; } } protected: struct TestData { std::string key; std::string value; int ts; std::string timestamp; SequenceNumber seq_num; }; constexpr static int kTestDataSize = 3; TestData test_data_[kTestDataSize]; void PutTestData(int index, ColumnFamilyHandle* cfh = nullptr) { ASSERT_LE(index, kTestDataSize); WriteOptions write_opts; if (cfh == nullptr) { ASSERT_OK(db_->Put(write_opts, test_data_[index].key, test_data_[index].timestamp, test_data_[index].value)); const Snapshot* snap = db_->GetSnapshot(); test_data_[index].seq_num = snap->GetSequenceNumber(); if (index > 0) { ASSERT_GT(test_data_[index].seq_num, test_data_[index - 1].seq_num); } db_->ReleaseSnapshot(snap); } else { ASSERT_OK(db_->Put(write_opts, cfh, test_data_[index].key, test_data_[index].timestamp, test_data_[index].value)); } } void AssertVisibility(int ts, SequenceNumber seq, std::vector statuses) { ASSERT_EQ(kTestDataSize, statuses.size()); for (int i = 0; i < kTestDataSize; i++) { if (test_data_[i].seq_num <= seq && test_data_[i].ts <= ts) { ASSERT_OK(statuses[i]); } else { ASSERT_TRUE(statuses[i].IsNotFound()); } } } std::vector GetKeys() { std::vector ret(kTestDataSize); for (int i = 0; i < kTestDataSize; i++) { ret[i] = test_data_[i].key; } return ret; } void VerifyDefaultCF(int ts, const Snapshot* snap = nullptr) { ReadOptions read_opts; std::string read_ts = Timestamp(ts, 0); Slice read_ts_slice = read_ts; read_opts.timestamp = &read_ts_slice; read_opts.snapshot = snap; ColumnFamilyHandle* cfh = db_->DefaultColumnFamily(); std::vector cfs(kTestDataSize, cfh); SequenceNumber seq = snap ? snap->GetSequenceNumber() : kMaxSequenceNumber - 1; // There're several MultiGet interfaces with not exactly the same // implementations, query data with all of them. auto keys = GetKeys(); std::vector values; auto s1 = db_->MultiGet(read_opts, cfs, keys, &values); AssertVisibility(ts, seq, s1); auto s2 = db_->MultiGet(read_opts, keys, &values); AssertVisibility(ts, seq, s2); std::vector timestamps; auto s3 = db_->MultiGet(read_opts, cfs, keys, &values, ×tamps); AssertVisibility(ts, seq, s3); auto s4 = db_->MultiGet(read_opts, keys, &values, ×tamps); AssertVisibility(ts, seq, s4); std::vector values_ps5(kTestDataSize); std::vector s5(kTestDataSize); db_->MultiGet(read_opts, cfh, kTestDataSize, keys.data(), values_ps5.data(), s5.data()); AssertVisibility(ts, seq, s5); std::vector values_ps6(kTestDataSize); std::vector s6(kTestDataSize); std::vector timestamps_array(kTestDataSize); db_->MultiGet(read_opts, cfh, kTestDataSize, keys.data(), values_ps6.data(), timestamps_array.data(), s6.data()); AssertVisibility(ts, seq, s6); std::vector values_ps7(kTestDataSize); std::vector s7(kTestDataSize); db_->MultiGet(read_opts, kTestDataSize, cfs.data(), keys.data(), values_ps7.data(), s7.data()); AssertVisibility(ts, seq, s7); std::vector values_ps8(kTestDataSize); std::vector s8(kTestDataSize); db_->MultiGet(read_opts, kTestDataSize, cfs.data(), keys.data(), values_ps8.data(), timestamps_array.data(), s8.data()); AssertVisibility(ts, seq, s8); } void VerifyDefaultCF(const Snapshot* snap = nullptr) { for (int i = 0; i <= kTestDataSize; i++) { VerifyDefaultCF(i, snap); } } }; constexpr int DataVisibilityTest::kTestDataSize; // Application specifies timestamp but not snapshot. // reader writer // ts'=90 // ts=100 // seq=10 // seq'=11 // write finishes // GetImpl(ts,seq) // It is OK to return if ts>=t1 AND seq>=s1. If ts>=t1 but seqDisableProcessing(); SyncPoint::GetInstance()->LoadDependency({ {"DBImpl::GetImpl:3", "DataVisibilityTest::PointLookupWithoutSnapshot1:BeforePut"}, {"DataVisibilityTest::PointLookupWithoutSnapshot1:AfterPut", "DBImpl::GetImpl:4"}, }); SyncPoint::GetInstance()->EnableProcessing(); port::Thread writer_thread([this]() { std::string write_ts = Timestamp(1, 0); WriteOptions write_opts; TEST_SYNC_POINT( "DataVisibilityTest::PointLookupWithoutSnapshot1:BeforePut"); Status s = db_->Put(write_opts, "foo", write_ts, "value"); ASSERT_OK(s); TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithoutSnapshot1:AfterPut"); }); ReadOptions read_opts; std::string read_ts_str = Timestamp(3, 0); Slice read_ts = read_ts_str; read_opts.timestamp = &read_ts; std::string value; Status s = db_->Get(read_opts, "foo", &value); writer_thread.join(); ASSERT_TRUE(s.IsNotFound()); Close(); } // Application specifies timestamp but not snapshot. // reader writer // ts'=90 // ts=100 // seq=10 // seq'=11 // write finishes // Flush // GetImpl(ts,seq) // It is OK to return if ts>=t1 AND seq>=s1. If ts>=t1 but seqDisableProcessing(); SyncPoint::GetInstance()->LoadDependency({ {"DBImpl::GetImpl:3", "DataVisibilityTest::PointLookupWithoutSnapshot2:BeforePut"}, {"DataVisibilityTest::PointLookupWithoutSnapshot2:AfterPut", "DBImpl::GetImpl:4"}, }); SyncPoint::GetInstance()->EnableProcessing(); port::Thread writer_thread([this]() { std::string write_ts = Timestamp(1, 0); WriteOptions write_opts; TEST_SYNC_POINT( "DataVisibilityTest::PointLookupWithoutSnapshot2:BeforePut"); Status s = db_->Put(write_opts, "foo", write_ts, "value"); ASSERT_OK(s); ASSERT_OK(Flush()); write_ts = Timestamp(2, 0); s = db_->Put(write_opts, "bar", write_ts, "value"); ASSERT_OK(s); TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithoutSnapshot2:AfterPut"); }); ReadOptions read_opts; std::string read_ts_str = Timestamp(3, 0); Slice read_ts = read_ts_str; read_opts.timestamp = &read_ts; std::string value; Status s = db_->Get(read_opts, "foo", &value); writer_thread.join(); ASSERT_TRUE(s.IsNotFound()); Close(); } // Application specifies both timestamp and snapshot. // reader writer // seq=10 // ts'=90 // ts=100 // seq'=11 // write finishes // GetImpl(ts,seq) // Since application specifies both timestamp and snapshot, application expects // to see data that visible in BOTH timestamp and sequence number. Therefore, // can be returned only if t1<=ts AND s1<=seq. TEST_F(DataVisibilityTest, PointLookupWithSnapshot1) { Options options = CurrentOptions(); const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->LoadDependency({ {"DataVisibilityTest::PointLookupWithSnapshot1:AfterTakingSnap", "DataVisibilityTest::PointLookupWithSnapshot1:BeforePut"}, {"DataVisibilityTest::PointLookupWithSnapshot1:AfterPut", "DBImpl::GetImpl:1"}, }); SyncPoint::GetInstance()->EnableProcessing(); port::Thread writer_thread([this]() { std::string write_ts = Timestamp(1, 0); WriteOptions write_opts; TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithSnapshot1:BeforePut"); Status s = db_->Put(write_opts, "foo", write_ts, "value"); TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithSnapshot1:AfterPut"); ASSERT_OK(s); }); ReadOptions read_opts; const Snapshot* snap = db_->GetSnapshot(); TEST_SYNC_POINT( "DataVisibilityTest::PointLookupWithSnapshot1:AfterTakingSnap"); read_opts.snapshot = snap; std::string read_ts_str = Timestamp(3, 0); Slice read_ts = read_ts_str; read_opts.timestamp = &read_ts; std::string value; Status s = db_->Get(read_opts, "foo", &value); writer_thread.join(); ASSERT_TRUE(s.IsNotFound()); db_->ReleaseSnapshot(snap); Close(); } // Application specifies both timestamp and snapshot. // reader writer // seq=10 // ts'=90 // ts=100 // seq'=11 // write finishes // Flush // GetImpl(ts,seq) // Since application specifies both timestamp and snapshot, application expects // to see data that visible in BOTH timestamp and sequence number. Therefore, // can be returned only if t1<=ts AND s1<=seq. TEST_F(DataVisibilityTest, PointLookupWithSnapshot2) { Options options = CurrentOptions(); const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->LoadDependency({ {"DataVisibilityTest::PointLookupWithSnapshot2:AfterTakingSnap", "DataVisibilityTest::PointLookupWithSnapshot2:BeforePut"}, }); SyncPoint::GetInstance()->EnableProcessing(); port::Thread writer_thread([this]() { std::string write_ts = Timestamp(1, 0); WriteOptions write_opts; TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithSnapshot2:BeforePut"); Status s = db_->Put(write_opts, "foo", write_ts, "value1"); ASSERT_OK(s); ASSERT_OK(Flush()); write_ts = Timestamp(2, 0); s = db_->Put(write_opts, "bar", write_ts, "value2"); ASSERT_OK(s); }); const Snapshot* snap = db_->GetSnapshot(); TEST_SYNC_POINT( "DataVisibilityTest::PointLookupWithSnapshot2:AfterTakingSnap"); writer_thread.join(); std::string read_ts_str = Timestamp(3, 0); Slice read_ts = read_ts_str; ReadOptions read_opts; read_opts.snapshot = snap; read_opts.timestamp = &read_ts; std::string value; Status s = db_->Get(read_opts, "foo", &value); ASSERT_TRUE(s.IsNotFound()); db_->ReleaseSnapshot(snap); Close(); } // Application specifies timestamp but not snapshot. // reader writer // ts'=90 // ts=100 // seq=10 // seq'=11 // write finishes // scan(ts,seq) // can be seen in scan as long as ts>=t1 AND seq>=s1. If ts>=t1 but // seqDisableProcessing(); SyncPoint::GetInstance()->LoadDependency({ {"DBImpl::NewIterator:3", "DataVisibilityTest::RangeScanWithoutSnapshot:BeforePut"}, }); SyncPoint::GetInstance()->EnableProcessing(); port::Thread writer_thread([this]() { WriteOptions write_opts; TEST_SYNC_POINT("DataVisibilityTest::RangeScanWithoutSnapshot:BeforePut"); for (int i = 0; i < 3; ++i) { std::string write_ts = Timestamp(i + 1, 0); Status s = db_->Put(write_opts, "key" + std::to_string(i), write_ts, "value" + std::to_string(i)); ASSERT_OK(s); } }); std::string read_ts_str = Timestamp(10, 0); Slice read_ts = read_ts_str; ReadOptions read_opts; read_opts.total_order_seek = true; read_opts.timestamp = &read_ts; Iterator* it = db_->NewIterator(read_opts); ASSERT_NE(nullptr, it); writer_thread.join(); it->SeekToFirst(); ASSERT_FALSE(it->Valid()); delete it; Close(); } // Application specifies both timestamp and snapshot. // reader writer // seq=10 // ts'=90 // ts=100 seq'=11 // write finishes // scan(ts,seq) // can be seen by the scan only if t1<=ts AND s1<=seq. If t1<=ts // but s1>seq, then the key should not be returned. TEST_F(DataVisibilityTest, RangeScanWithSnapshot) { Options options = CurrentOptions(); const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->LoadDependency({ {"DataVisibilityTest::RangeScanWithSnapshot:AfterTakingSnapshot", "DataVisibilityTest::RangeScanWithSnapshot:BeforePut"}, }); SyncPoint::GetInstance()->EnableProcessing(); port::Thread writer_thread([this]() { WriteOptions write_opts; TEST_SYNC_POINT("DataVisibilityTest::RangeScanWithSnapshot:BeforePut"); for (int i = 0; i < 3; ++i) { std::string write_ts = Timestamp(i + 1, 0); Status s = db_->Put(write_opts, "key" + std::to_string(i), write_ts, "value" + std::to_string(i)); ASSERT_OK(s); } }); const Snapshot* snap = db_->GetSnapshot(); TEST_SYNC_POINT( "DataVisibilityTest::RangeScanWithSnapshot:AfterTakingSnapshot"); writer_thread.join(); std::string read_ts_str = Timestamp(10, 0); Slice read_ts = read_ts_str; ReadOptions read_opts; read_opts.snapshot = snap; read_opts.total_order_seek = true; read_opts.timestamp = &read_ts; Iterator* it = db_->NewIterator(read_opts); ASSERT_NE(nullptr, it); it->Seek("key0"); ASSERT_FALSE(it->Valid()); delete it; db_->ReleaseSnapshot(snap); Close(); } // Application specifies both timestamp and snapshot. // Query each combination and make sure for MultiGet key , only // return keys that ts>=t1 AND seq>=s1. TEST_F(DataVisibilityTest, MultiGetWithTimestamp) { Options options = CurrentOptions(); const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); const Snapshot* snap0 = db_->GetSnapshot(); PutTestData(0); VerifyDefaultCF(); VerifyDefaultCF(snap0); const Snapshot* snap1 = db_->GetSnapshot(); PutTestData(1); VerifyDefaultCF(); VerifyDefaultCF(snap0); VerifyDefaultCF(snap1); ASSERT_OK(Flush()); const Snapshot* snap2 = db_->GetSnapshot(); PutTestData(2); VerifyDefaultCF(); VerifyDefaultCF(snap0); VerifyDefaultCF(snap1); VerifyDefaultCF(snap2); db_->ReleaseSnapshot(snap0); db_->ReleaseSnapshot(snap1); db_->ReleaseSnapshot(snap2); Close(); } // Application specifies timestamp but not snapshot. // reader writer // ts'=0, 1 // ts=3 // seq=10 // seq'=11, 12 // write finishes // MultiGet(ts,seq) // For MultiGet , only return keys that ts>=t1 AND seq>=s1. TEST_F(DataVisibilityTest, MultiGetWithoutSnapshot) { Options options = CurrentOptions(); const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->LoadDependency({ {"DBImpl::MultiGet:AfterGetSeqNum1", "DataVisibilityTest::MultiGetWithoutSnapshot:BeforePut"}, {"DataVisibilityTest::MultiGetWithoutSnapshot:AfterPut", "DBImpl::MultiGet:AfterGetSeqNum2"}, }); SyncPoint::GetInstance()->EnableProcessing(); port::Thread writer_thread([this]() { TEST_SYNC_POINT("DataVisibilityTest::MultiGetWithoutSnapshot:BeforePut"); PutTestData(0); PutTestData(1); TEST_SYNC_POINT("DataVisibilityTest::MultiGetWithoutSnapshot:AfterPut"); }); ReadOptions read_opts; std::string read_ts = Timestamp(kTestDataSize, 0); Slice read_ts_slice = read_ts; read_opts.timestamp = &read_ts_slice; auto keys = GetKeys(); std::vector values; auto ss = db_->MultiGet(read_opts, keys, &values); writer_thread.join(); for (auto s : ss) { ASSERT_TRUE(s.IsNotFound()); } VerifyDefaultCF(); Close(); } TEST_F(DataVisibilityTest, MultiGetCrossCF) { Options options = CurrentOptions(); const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); CreateAndReopenWithCF({"second"}, options); ColumnFamilyHandle* second_cf = handles_[1]; const Snapshot* snap0 = db_->GetSnapshot(); PutTestData(0); PutTestData(0, second_cf); VerifyDefaultCF(); VerifyDefaultCF(snap0); const Snapshot* snap1 = db_->GetSnapshot(); PutTestData(1); PutTestData(1, second_cf); VerifyDefaultCF(); VerifyDefaultCF(snap0); VerifyDefaultCF(snap1); ASSERT_OK(Flush()); const Snapshot* snap2 = db_->GetSnapshot(); PutTestData(2); PutTestData(2, second_cf); VerifyDefaultCF(); VerifyDefaultCF(snap0); VerifyDefaultCF(snap1); VerifyDefaultCF(snap2); ReadOptions read_opts; std::string read_ts = Timestamp(kTestDataSize, 0); Slice read_ts_slice = read_ts; read_opts.timestamp = &read_ts_slice; read_opts.snapshot = snap1; auto keys = GetKeys(); auto keys2 = GetKeys(); keys.insert(keys.end(), keys2.begin(), keys2.end()); std::vector cfs(kTestDataSize, db_->DefaultColumnFamily()); std::vector cfs2(kTestDataSize, second_cf); cfs.insert(cfs.end(), cfs2.begin(), cfs2.end()); std::vector values; auto ss = db_->MultiGet(read_opts, cfs, keys, &values); for (int i = 0; i < 2 * kTestDataSize; i++) { if (i % 3 == 0) { // only the first key for each column family should be returned ASSERT_OK(ss[i]); } else { ASSERT_TRUE(ss[i].IsNotFound()); } } db_->ReleaseSnapshot(snap0); db_->ReleaseSnapshot(snap1); db_->ReleaseSnapshot(snap2); Close(); } #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) class DBBasicTestWithTimestampCompressionSettings : public DBBasicTestWithTimestampBase, public testing::WithParamInterface< std::tuple, CompressionType, uint32_t, uint32_t>> { public: DBBasicTestWithTimestampCompressionSettings() : DBBasicTestWithTimestampBase( "db_basic_test_with_timestamp_compression") {} }; TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGet) { const int kNumKeysPerFile = 1024; const size_t kNumTimestamps = 4; Options options = CurrentOptions(); options.create_if_missing = true; options.env = env_; options.memtable_factory.reset( test::NewSpecialSkipListFactory(kNumKeysPerFile)); size_t ts_sz = Timestamp(0, 0).size(); TestComparator test_cmp(ts_sz); options.comparator = &test_cmp; BlockBasedTableOptions bbto; bbto.filter_policy = std::get<0>(GetParam()); bbto.whole_key_filtering = true; options.table_factory.reset(NewBlockBasedTableFactory(bbto)); const CompressionType comp_type = std::get<1>(GetParam()); #if LZ4_VERSION_NUMBER < 10400 // r124+ if (comp_type == kLZ4Compression || comp_type == kLZ4HCCompression) { return; } #endif // LZ4_VERSION_NUMBER >= 10400 if (!ZSTD_Supported() && comp_type == kZSTD) { return; } if (!Zlib_Supported() && comp_type == kZlibCompression) { return; } options.compression = comp_type; options.compression_opts.max_dict_bytes = std::get<2>(GetParam()); if (comp_type == kZSTD) { options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam()); } options.compression_opts.parallel_threads = std::get<3>(GetParam()); options.target_file_size_base = 1 << 26; // 64MB DestroyAndReopen(options); CreateAndReopenWithCF({"pikachu"}, options); size_t num_cfs = handles_.size(); ASSERT_EQ(2, num_cfs); std::vector write_ts_list; std::vector read_ts_list; for (size_t i = 0; i != kNumTimestamps; ++i) { write_ts_list.push_back(Timestamp(i * 2, 0)); read_ts_list.push_back(Timestamp(1 + i * 2, 0)); const Slice write_ts = write_ts_list.back(); WriteOptions wopts; for (int cf = 0; cf != static_cast(num_cfs); ++cf) { for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps; ++j) { ASSERT_OK( db_->Put(wopts, handles_[cf], Key1(j), write_ts, "value_" + std::to_string(j) + "_" + std::to_string(i))); } } } const auto& verify_db_func = [&]() { for (size_t i = 0; i != kNumTimestamps; ++i) { ReadOptions ropts; const Slice read_ts = read_ts_list[i]; ropts.timestamp = &read_ts; for (int cf = 0; cf != static_cast(num_cfs); ++cf) { ColumnFamilyHandle* cfh = handles_[cf]; for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps; ++j) { std::string value; ASSERT_OK(db_->Get(ropts, cfh, Key1(j), &value)); ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i), value); } } } }; verify_db_func(); Close(); } TEST_P(DBBasicTestWithTimestampCompressionSettings, PutDeleteGet) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; const int kNumKeysPerFile = 1024; options.memtable_factory.reset( test::NewSpecialSkipListFactory(kNumKeysPerFile)); BlockBasedTableOptions bbto; bbto.filter_policy = std::get<0>(GetParam()); bbto.whole_key_filtering = true; options.table_factory.reset(NewBlockBasedTableFactory(bbto)); const CompressionType comp_type = std::get<1>(GetParam()); #if LZ4_VERSION_NUMBER < 10400 // r124+ if (comp_type == kLZ4Compression || comp_type == kLZ4HCCompression) { return; } #endif // LZ4_VERSION_NUMBER >= 10400 if (!ZSTD_Supported() && comp_type == kZSTD) { return; } if (!Zlib_Supported() && comp_type == kZlibCompression) { return; } options.compression = comp_type; options.compression_opts.max_dict_bytes = std::get<2>(GetParam()); if (comp_type == kZSTD) { options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam()); } options.compression_opts.parallel_threads = std::get<3>(GetParam()); options.target_file_size_base = 1 << 26; // 64MB DestroyAndReopen(options); const size_t kNumL0Files = static_cast(Options().level0_file_num_compaction_trigger); { // Half of the keys will go through Deletion and remaining half with // SingleDeletion. Generate enough L0 files with ts=1 to trigger compaction // to L1 std::string ts = Timestamp(1, 0); WriteOptions wopts; for (size_t i = 0; i < kNumL0Files; ++i) { for (int j = 0; j < kNumKeysPerFile; ++j) { ASSERT_OK(db_->Put(wopts, Key1(j), ts, "value" + std::to_string(i))); } ASSERT_OK(db_->Flush(FlushOptions())); } ASSERT_OK(dbfull()->TEST_WaitForCompact()); // Generate another L0 at ts=3 ts = Timestamp(3, 0); for (int i = 0; i < kNumKeysPerFile; ++i) { std::string key_str = Key1(i); Slice key(key_str); if ((i % 3) == 0) { if (i < kNumKeysPerFile / 2) { ASSERT_OK(db_->Delete(wopts, key, ts)); } else { ASSERT_OK(db_->SingleDelete(wopts, key, ts)); } } else { ASSERT_OK(db_->Put(wopts, key, ts, "new_value")); } } ASSERT_OK(db_->Flush(FlushOptions())); // Populate memtable at ts=5 ts = Timestamp(5, 0); for (int i = 0; i != kNumKeysPerFile; ++i) { std::string key_str = Key1(i); Slice key(key_str); if ((i % 3) == 1) { if (i < kNumKeysPerFile / 2) { ASSERT_OK(db_->Delete(wopts, key, ts)); } else { ASSERT_OK(db_->SingleDelete(wopts, key, ts)); } } else if ((i % 3) == 2) { ASSERT_OK(db_->Put(wopts, key, ts, "new_value_2")); } } } { std::string ts_str = Timestamp(6, 0); Slice ts = ts_str; ReadOptions ropts; ropts.timestamp = &ts; for (uint64_t i = 0; i != static_cast(kNumKeysPerFile); ++i) { std::string value; std::string key_ts; Status s = db_->Get(ropts, Key1(i), &value, &key_ts); if ((i % 3) == 2) { ASSERT_OK(s); ASSERT_EQ("new_value_2", value); ASSERT_EQ(Timestamp(5, 0), key_ts); } else if ((i % 3) == 1) { ASSERT_TRUE(s.IsNotFound()); ASSERT_EQ(Timestamp(5, 0), key_ts); } else { ASSERT_TRUE(s.IsNotFound()); ASSERT_EQ(Timestamp(3, 0), key_ts); } } } } #ifndef ROCKSDB_LITE // A class which remembers the name of each flushed file. class FlushedFileCollector : public EventListener { public: FlushedFileCollector() {} ~FlushedFileCollector() override {} void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override { InstrumentedMutexLock lock(&mutex_); flushed_files_.push_back(info.file_path); } std::vector GetFlushedFiles() { std::vector result; { InstrumentedMutexLock lock(&mutex_); result = flushed_files_; } return result; } void ClearFlushedFiles() { InstrumentedMutexLock lock(&mutex_); flushed_files_.clear(); } private: std::vector flushed_files_; InstrumentedMutex mutex_; }; TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) { const int kNumKeysPerFile = 1024; const size_t kNumTimestamps = 2; const size_t kNumKeysPerTimestamp = (kNumKeysPerFile - 1) / kNumTimestamps; const size_t kSplitPosBase = kNumKeysPerTimestamp / 2; Options options = CurrentOptions(); options.create_if_missing = true; options.env = env_; options.memtable_factory.reset( test::NewSpecialSkipListFactory(kNumKeysPerFile)); FlushedFileCollector* collector = new FlushedFileCollector(); options.listeners.emplace_back(collector); size_t ts_sz = Timestamp(0, 0).size(); TestComparator test_cmp(ts_sz); options.comparator = &test_cmp; BlockBasedTableOptions bbto; bbto.filter_policy = std::get<0>(GetParam()); bbto.whole_key_filtering = true; options.table_factory.reset(NewBlockBasedTableFactory(bbto)); const CompressionType comp_type = std::get<1>(GetParam()); #if LZ4_VERSION_NUMBER < 10400 // r124+ if (comp_type == kLZ4Compression || comp_type == kLZ4HCCompression) { return; } #endif // LZ4_VERSION_NUMBER >= 10400 if (!ZSTD_Supported() && comp_type == kZSTD) { return; } if (!Zlib_Supported() && comp_type == kZlibCompression) { return; } options.compression = comp_type; options.compression_opts.max_dict_bytes = std::get<2>(GetParam()); if (comp_type == kZSTD) { options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam()); } options.compression_opts.parallel_threads = std::get<3>(GetParam()); DestroyAndReopen(options); CreateAndReopenWithCF({"pikachu"}, options); size_t num_cfs = handles_.size(); ASSERT_EQ(2, num_cfs); std::vector write_ts_list; std::vector read_ts_list; const auto& verify_records_func = [&](size_t i, size_t begin, size_t end, ColumnFamilyHandle* cfh) { std::string value; std::string timestamp; ReadOptions ropts; const Slice read_ts = read_ts_list[i]; ropts.timestamp = &read_ts; std::string expected_timestamp = std::string(write_ts_list[i].data(), write_ts_list[i].size()); for (size_t j = begin; j <= end; ++j) { ASSERT_OK(db_->Get(ropts, cfh, Key1(j), &value, ×tamp)); ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i), value); ASSERT_EQ(expected_timestamp, timestamp); } }; for (size_t i = 0; i != kNumTimestamps; ++i) { write_ts_list.push_back(Timestamp(i * 2, 0)); read_ts_list.push_back(Timestamp(1 + i * 2, 0)); const Slice write_ts = write_ts_list.back(); WriteOptions wopts; for (int cf = 0; cf != static_cast(num_cfs); ++cf) { size_t memtable_get_start = 0; for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) { ASSERT_OK( db_->Put(wopts, handles_[cf], Key1(j), write_ts, "value_" + std::to_string(j) + "_" + std::to_string(i))); if (j == kSplitPosBase + i || j == kNumKeysPerTimestamp - 1) { verify_records_func(i, memtable_get_start, j, handles_[cf]); memtable_get_start = j + 1; // flush all keys with the same timestamp to two sst files, split at // incremental positions such that lowerlevel[1].smallest.userkey == // higherlevel[0].largest.userkey ASSERT_OK(Flush(cf)); ASSERT_OK(dbfull()->TEST_WaitForCompact()); // wait for flush (which // is also a compaction) // compact files (2 at each level) to a lower level such that all // keys with the same timestamp is at one level, with newer versions // at higher levels. CompactionOptions compact_opt; compact_opt.compression = kNoCompression; ASSERT_OK(db_->CompactFiles(compact_opt, handles_[cf], collector->GetFlushedFiles(), static_cast(kNumTimestamps - i))); collector->ClearFlushedFiles(); } } } } const auto& verify_db_func = [&]() { for (size_t i = 0; i != kNumTimestamps; ++i) { ReadOptions ropts; const Slice read_ts = read_ts_list[i]; ropts.timestamp = &read_ts; std::string expected_timestamp(write_ts_list[i].data(), write_ts_list[i].size()); for (int cf = 0; cf != static_cast(num_cfs); ++cf) { ColumnFamilyHandle* cfh = handles_[cf]; verify_records_func(i, 0, kNumKeysPerTimestamp - 1, cfh); } } }; verify_db_func(); Close(); } TEST_F(DBBasicTestWithTimestamp, BatchWriteAndMultiGet) { const int kNumKeysPerFile = 8192; const size_t kNumTimestamps = 2; const size_t kNumKeysPerTimestamp = (kNumKeysPerFile - 1) / kNumTimestamps; Options options = CurrentOptions(); options.create_if_missing = true; options.env = env_; options.memtable_factory.reset( test::NewSpecialSkipListFactory(kNumKeysPerFile)); options.memtable_prefix_bloom_size_ratio = 0.1; options.memtable_whole_key_filtering = true; size_t ts_sz = Timestamp(0, 0).size(); TestComparator test_cmp(ts_sz); options.comparator = &test_cmp; BlockBasedTableOptions bbto; bbto.filter_policy.reset(NewBloomFilterPolicy( 10 /*bits_per_key*/, false /*use_block_based_builder*/)); bbto.whole_key_filtering = true; options.table_factory.reset(NewBlockBasedTableFactory(bbto)); DestroyAndReopen(options); CreateAndReopenWithCF({"pikachu"}, options); size_t num_cfs = handles_.size(); ASSERT_EQ(2, num_cfs); std::vector write_ts_list; std::vector read_ts_list; const auto& verify_records_func = [&](size_t i, ColumnFamilyHandle* cfh) { std::vector keys; std::vector key_vals; std::vector values; std::vector timestamps; for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) { key_vals.push_back(Key1(j)); } for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) { keys.push_back(key_vals[j]); } ReadOptions ropts; const Slice read_ts = read_ts_list[i]; ropts.timestamp = &read_ts; std::string expected_timestamp(write_ts_list[i].data(), write_ts_list[i].size()); std::vector cfhs(keys.size(), cfh); std::vector statuses = db_->MultiGet(ropts, cfhs, keys, &values, ×tamps); for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) { ASSERT_OK(statuses[j]); ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i), values[j]); ASSERT_EQ(expected_timestamp, timestamps[j]); } }; const std::string dummy_ts(ts_sz, '\0'); for (size_t i = 0; i != kNumTimestamps; ++i) { write_ts_list.push_back(Timestamp(i * 2, 0)); read_ts_list.push_back(Timestamp(1 + i * 2, 0)); const Slice& write_ts = write_ts_list.back(); for (int cf = 0; cf != static_cast(num_cfs); ++cf) { WriteOptions wopts; WriteBatch batch(0, 0, 0, ts_sz); for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) { const std::string key = Key1(j); const std::string value = "value_" + std::to_string(j) + "_" + std::to_string(i); ASSERT_OK(batch.Put(handles_[cf], key, value)); } ASSERT_OK(batch.UpdateTimestamps(write_ts, [ts_sz](uint32_t) { return ts_sz; })); ASSERT_OK(db_->Write(wopts, &batch)); verify_records_func(i, handles_[cf]); ASSERT_OK(Flush(cf)); } } const auto& verify_db_func = [&]() { for (size_t i = 0; i != kNumTimestamps; ++i) { ReadOptions ropts; const Slice read_ts = read_ts_list[i]; ropts.timestamp = &read_ts; for (int cf = 0; cf != static_cast(num_cfs); ++cf) { ColumnFamilyHandle* cfh = handles_[cf]; verify_records_func(i, cfh); } } }; verify_db_func(); Close(); } TEST_F(DBBasicTestWithTimestamp, MultiGetNoReturnTs) { Options options = CurrentOptions(); options.env = env_; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); WriteOptions write_opts; std::string ts = Timestamp(1, 0); ASSERT_OK(db_->Put(write_opts, "foo", ts, "value")); ASSERT_OK(db_->Put(write_opts, "bar", ts, "value")); ASSERT_OK(db_->Put(write_opts, "fooxxxxxxxxxxxxxxxx", ts, "value")); ASSERT_OK(db_->Put(write_opts, "barxxxxxxxxxxxxxxxx", ts, "value")); ColumnFamilyHandle* cfh = dbfull()->DefaultColumnFamily(); ts = Timestamp(2, 0); Slice read_ts = ts; ReadOptions read_opts; read_opts.timestamp = &read_ts; { ColumnFamilyHandle* column_families[] = {cfh, cfh}; Slice keys[] = {"foo", "bar"}; PinnableSlice values[] = {PinnableSlice(), PinnableSlice()}; Status statuses[] = {Status::OK(), Status::OK()}; dbfull()->MultiGet(read_opts, /*num_keys=*/2, &column_families[0], &keys[0], &values[0], &statuses[0], /*sorted_input=*/false); for (const auto& s : statuses) { ASSERT_OK(s); } } { ColumnFamilyHandle* column_families[] = {cfh, cfh, cfh, cfh}; // Make user keys longer than configured timestamp size (16 bytes) to // verify RocksDB does not use the trailing bytes 'x' as timestamp. Slice keys[] = {"fooxxxxxxxxxxxxxxxx", "barxxxxxxxxxxxxxxxx", "foo", "bar"}; PinnableSlice values[] = {PinnableSlice(), PinnableSlice(), PinnableSlice(), PinnableSlice()}; Status statuses[] = {Status::OK(), Status::OK(), Status::OK(), Status::OK()}; dbfull()->MultiGet(read_opts, /*num_keys=*/4, &column_families[0], &keys[0], &values[0], &statuses[0], /*sorted_input=*/false); for (const auto& s : statuses) { ASSERT_OK(s); } } Close(); } #endif // !ROCKSDB_LITE INSTANTIATE_TEST_CASE_P( Timestamp, DBBasicTestWithTimestampCompressionSettings, ::testing::Combine( ::testing::Values(std::shared_ptr(nullptr), std::shared_ptr( NewBloomFilterPolicy(10, false))), ::testing::Values(kNoCompression, kZlibCompression, kLZ4Compression, kLZ4HCCompression, kZSTD), ::testing::Values(0, 1 << 14), ::testing::Values(1, 4))); class DBBasicTestWithTimestampPrefixSeek : public DBBasicTestWithTimestampBase, public testing::WithParamInterface< std::tuple, std::shared_ptr, bool, BlockBasedTableOptions::IndexType>> { public: DBBasicTestWithTimestampPrefixSeek() : DBBasicTestWithTimestampBase( "/db_basic_test_with_timestamp_prefix_seek") {} }; TEST_P(DBBasicTestWithTimestampPrefixSeek, IterateWithPrefix) { const size_t kNumKeysPerFile = 128; Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; options.prefix_extractor = std::get<0>(GetParam()); options.memtable_factory.reset( test::NewSpecialSkipListFactory(kNumKeysPerFile)); BlockBasedTableOptions bbto; bbto.filter_policy = std::get<1>(GetParam()); bbto.index_type = std::get<3>(GetParam()); options.table_factory.reset(NewBlockBasedTableFactory(bbto)); DestroyAndReopen(options); const uint64_t kMaxKey = 0xffffffffffffffff; const uint64_t kMinKey = 0xfffffffffffff000; const std::vector write_ts_list = {Timestamp(3, 0xffffffff), Timestamp(6, 0xffffffff)}; WriteOptions write_opts; { for (size_t i = 0; i != write_ts_list.size(); ++i) { for (uint64_t key = kMaxKey; key >= kMinKey; --key) { Status s = db_->Put(write_opts, Key1(key), write_ts_list[i], "value" + std::to_string(i)); ASSERT_OK(s); } } } const std::vector read_ts_list = {Timestamp(5, 0xffffffff), Timestamp(9, 0xffffffff)}; { ReadOptions read_opts; read_opts.total_order_seek = false; read_opts.prefix_same_as_start = std::get<2>(GetParam()); fprintf(stdout, "%s %s %d\n", options.prefix_extractor->Name(), bbto.filter_policy ? bbto.filter_policy->Name() : "null", static_cast(read_opts.prefix_same_as_start)); for (size_t i = 0; i != read_ts_list.size(); ++i) { Slice read_ts = read_ts_list[i]; read_opts.timestamp = &read_ts; std::unique_ptr iter(db_->NewIterator(read_opts)); // Seek to kMaxKey iter->Seek(Key1(kMaxKey)); CheckIterUserEntry(iter.get(), Key1(kMaxKey), kTypeValue, "value" + std::to_string(i), write_ts_list[i]); iter->Next(); ASSERT_FALSE(iter->Valid()); // Seek to kMinKey iter->Seek(Key1(kMinKey)); CheckIterUserEntry(iter.get(), Key1(kMinKey), kTypeValue, "value" + std::to_string(i), write_ts_list[i]); iter->Prev(); ASSERT_FALSE(iter->Valid()); } const std::vector targets = {kMinKey, kMinKey + 0x10, kMinKey + 0x100, kMaxKey}; const SliceTransform* const pe = options.prefix_extractor.get(); ASSERT_NE(nullptr, pe); const size_t kPrefixShift = 8 * (Key1(0).size() - pe->Transform(Key1(0)).size()); const uint64_t kPrefixMask = ~((static_cast(1) << kPrefixShift) - 1); const uint64_t kNumKeysWithinPrefix = (static_cast(1) << kPrefixShift); for (size_t i = 0; i != read_ts_list.size(); ++i) { Slice read_ts = read_ts_list[i]; read_opts.timestamp = &read_ts; std::unique_ptr it(db_->NewIterator(read_opts)); // Forward and backward iterate. for (size_t j = 0; j != targets.size(); ++j) { std::string start_key = Key1(targets[j]); uint64_t expected_ub = (targets[j] & kPrefixMask) - 1 + kNumKeysWithinPrefix; uint64_t expected_key = targets[j]; size_t count = 0; it->Seek(Key1(targets[j])); while (it->Valid()) { std::string saved_prev_key; saved_prev_key.assign(it->key().data(), it->key().size()); // Out of prefix if (!read_opts.prefix_same_as_start && pe->Transform(saved_prev_key) != pe->Transform(start_key)) { break; } CheckIterUserEntry(it.get(), Key1(expected_key), kTypeValue, "value" + std::to_string(i), write_ts_list[i]); ++count; ++expected_key; it->Next(); } ASSERT_EQ(expected_ub - targets[j] + 1, count); count = 0; expected_key = targets[j]; it->SeekForPrev(start_key); uint64_t expected_lb = (targets[j] & kPrefixMask); while (it->Valid()) { // Out of prefix if (!read_opts.prefix_same_as_start && pe->Transform(it->key()) != pe->Transform(start_key)) { break; } CheckIterUserEntry(it.get(), Key1(expected_key), kTypeValue, "value" + std::to_string(i), write_ts_list[i]); ++count; --expected_key; it->Prev(); } ASSERT_EQ(targets[j] - std::max(expected_lb, kMinKey) + 1, count); } } } Close(); } // TODO(yanqin): consider handling non-fixed-length prefix extractors, e.g. // NoopTransform. INSTANTIATE_TEST_CASE_P( Timestamp, DBBasicTestWithTimestampPrefixSeek, ::testing::Combine( ::testing::Values( std::shared_ptr(NewFixedPrefixTransform(1)), std::shared_ptr(NewFixedPrefixTransform(4)), std::shared_ptr(NewFixedPrefixTransform(7)), std::shared_ptr(NewFixedPrefixTransform(8))), ::testing::Values(std::shared_ptr(nullptr), std::shared_ptr( NewBloomFilterPolicy(10 /*bits_per_key*/, false)), std::shared_ptr( NewBloomFilterPolicy(20 /*bits_per_key*/, false))), ::testing::Bool(), ::testing::Values( BlockBasedTableOptions::IndexType::kBinarySearch, BlockBasedTableOptions::IndexType::kHashSearch, BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch, BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey))); class DBBasicTestWithTsIterTombstones : public DBBasicTestWithTimestampBase, public testing::WithParamInterface< std::tuple, std::shared_ptr, int, BlockBasedTableOptions::IndexType>> { public: DBBasicTestWithTsIterTombstones() : DBBasicTestWithTimestampBase("/db_basic_ts_iter_tombstones") {} }; TEST_P(DBBasicTestWithTsIterTombstones, IterWithDelete) { constexpr size_t kNumKeysPerFile = 128; Options options = CurrentOptions(); options.env = env_; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; options.prefix_extractor = std::get<0>(GetParam()); options.memtable_factory.reset( test::NewSpecialSkipListFactory(kNumKeysPerFile)); BlockBasedTableOptions bbto; bbto.filter_policy = std::get<1>(GetParam()); bbto.index_type = std::get<3>(GetParam()); options.table_factory.reset(NewBlockBasedTableFactory(bbto)); options.num_levels = std::get<2>(GetParam()); DestroyAndReopen(options); std::vector write_ts_strs = {Timestamp(2, 0), Timestamp(4, 0)}; constexpr uint64_t kMaxKey = 0xffffffffffffffff; constexpr uint64_t kMinKey = 0xfffffffffffff000; // Insert kMinKey...kMaxKey uint64_t key = kMinKey; WriteOptions write_opts; Slice ts = write_ts_strs[0]; do { Status s = db_->Put(write_opts, Key1(key), write_ts_strs[0], "value" + std::to_string(key)); ASSERT_OK(s); if (kMaxKey == key) { break; } ++key; } while (true); for (key = kMaxKey; key >= kMinKey; --key) { Status s; if (0 != (key % 2)) { s = db_->Put(write_opts, Key1(key), write_ts_strs[1], "value1" + std::to_string(key)); } else { s = db_->Delete(write_opts, Key1(key), write_ts_strs[1]); } ASSERT_OK(s); } ASSERT_OK(dbfull()->TEST_WaitForCompact()); { std::string read_ts = Timestamp(4, 0); ts = read_ts; ReadOptions read_opts; read_opts.total_order_seek = true; read_opts.timestamp = &ts; std::unique_ptr iter(db_->NewIterator(read_opts)); size_t count = 0; key = kMinKey + 1; for (iter->SeekToFirst(); iter->Valid(); iter->Next(), ++count, key += 2) { ASSERT_EQ(Key1(key), iter->key()); ASSERT_EQ("value1" + std::to_string(key), iter->value()); } ASSERT_EQ((kMaxKey - kMinKey + 1) / 2, count); for (iter->SeekToLast(), count = 0, key = kMaxKey; iter->Valid(); key -= 2, ++count, iter->Prev()) { ASSERT_EQ(Key1(key), iter->key()); ASSERT_EQ("value1" + std::to_string(key), iter->value()); } ASSERT_EQ((kMaxKey - kMinKey + 1) / 2, count); } Close(); } INSTANTIATE_TEST_CASE_P( Timestamp, DBBasicTestWithTsIterTombstones, ::testing::Combine( ::testing::Values( std::shared_ptr(NewFixedPrefixTransform(7)), std::shared_ptr(NewFixedPrefixTransform(8))), ::testing::Values(std::shared_ptr(nullptr), std::shared_ptr( NewBloomFilterPolicy(10, false)), std::shared_ptr( NewBloomFilterPolicy(20, false))), ::testing::Values(2, 6), ::testing::Values( BlockBasedTableOptions::IndexType::kBinarySearch, BlockBasedTableOptions::IndexType::kHashSearch, BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch, BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey))); #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) class UpdateFullHistoryTsLowTest : public DBBasicTestWithTimestampBase { public: UpdateFullHistoryTsLowTest() : DBBasicTestWithTimestampBase("/update_full_history_ts_low_test") {} }; TEST_F(UpdateFullHistoryTsLowTest, ConcurrentUpdate) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; std::string lower_ts_low = Timestamp(10, 0); std::string higher_ts_low = Timestamp(25, 0); const size_t kTimestampSize = lower_ts_low.size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); // This workaround swaps `lower_ts_low` originally used for update by the // caller to `higher_ts_low` after its writer is queued to make sure // the caller will always get a TryAgain error. // It mimics cases where two threads update full_history_ts_low concurrently // with one thread writing a higher ts_low and one thread writing a lower // ts_low. VersionEdit* version_edit; SyncPoint::GetInstance()->SetCallBack( "DBImpl::IncreaseFullHistoryTsLowImpl:BeforeEdit", [&](void* arg) { version_edit = reinterpret_cast(arg); }); SyncPoint::GetInstance()->SetCallBack( "VersionSet::LogAndApply:BeforeWriterWaiting", [&](void* /*arg*/) { version_edit->SetFullHistoryTsLow(higher_ts_low); }); SyncPoint::GetInstance()->EnableProcessing(); ASSERT_TRUE( db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(), lower_ts_low) .IsTryAgain()); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); Close(); } TEST_F(DBBasicTestWithTimestamp, GCPreserveRangeTombstoneWhenNoOrSmallFullHistoryLow) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); std::string ts_str = Timestamp(1, 0); WriteOptions wopts; ASSERT_OK(db_->Put(wopts, "k1", ts_str, "v1")); ASSERT_OK(db_->Put(wopts, "k2", ts_str, "v2")); ASSERT_OK(db_->Put(wopts, "k3", ts_str, "v3")); ts_str = Timestamp(2, 0); ASSERT_OK( db_->DeleteRange(wopts, db_->DefaultColumnFamily(), "k1", "k3", ts_str)); ts_str = Timestamp(3, 0); Slice ts = ts_str; ReadOptions ropts; ropts.timestamp = &ts; CompactRangeOptions cro; cro.full_history_ts_low = nullptr; std::string value, key_ts; Status s; auto verify = [&] { s = db_->Get(ropts, "k1", &value); ASSERT_TRUE(s.IsNotFound()); s = db_->Get(ropts, "k2", &value, &key_ts); ASSERT_TRUE(s.IsNotFound()); ASSERT_EQ(key_ts, Timestamp(2, 0)); ASSERT_OK(db_->Get(ropts, "k3", &value, &key_ts)); ASSERT_EQ(value, "v3"); ASSERT_EQ(Timestamp(1, 0), key_ts); size_t batch_size = 3; std::vector key_strs = {"k1", "k2", "k3"}; std::vector keys{key_strs.begin(), key_strs.end()}; std::vector values(batch_size); std::vector statuses(batch_size); db_->MultiGet(ropts, db_->DefaultColumnFamily(), batch_size, keys.data(), values.data(), statuses.data(), true /* sorted_input */); ASSERT_TRUE(statuses[0].IsNotFound()); ASSERT_TRUE(statuses[1].IsNotFound()); ASSERT_OK(statuses[2]); ; ASSERT_EQ(values[2], "v3"); }; verify(); ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); verify(); std::string lb = Timestamp(0, 0); Slice lb_slice = lb; cro.full_history_ts_low = &lb_slice; ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); verify(); Close(); } TEST_F(DBBasicTestWithTimestamp, GCRangeTombstonesAndCoveredKeysRespectingTslow) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; BlockBasedTableOptions bbto; bbto.filter_policy.reset(NewBloomFilterPolicy(10, false)); bbto.cache_index_and_filter_blocks = true; bbto.whole_key_filtering = true; options.table_factory.reset(NewBlockBasedTableFactory(bbto)); const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; options.num_levels = 2; DestroyAndReopen(options); WriteOptions wopts; ASSERT_OK(db_->Put(wopts, "k1", Timestamp(1, 0), "v1")); ASSERT_OK(db_->Delete(wopts, "k2", Timestamp(2, 0))); ASSERT_OK(db_->DeleteRange(wopts, db_->DefaultColumnFamily(), "k1", "k3", Timestamp(3, 0))); ASSERT_OK(db_->Put(wopts, "k3", Timestamp(4, 0), "v3")); ReadOptions ropts; std::string read_ts = Timestamp(5, 0); Slice read_ts_slice = read_ts; ropts.timestamp = &read_ts_slice; size_t batch_size = 3; std::vector key_strs = {"k1", "k2", "k3"}; std::vector keys = {key_strs.begin(), key_strs.end()}; std::vector values(batch_size); std::vector statuses(batch_size); std::vector timestamps(batch_size); db_->MultiGet(ropts, db_->DefaultColumnFamily(), batch_size, keys.data(), values.data(), timestamps.data(), statuses.data(), true /* sorted_input */); ASSERT_TRUE(statuses[0].IsNotFound()); ASSERT_EQ(timestamps[0], Timestamp(3, 0)); ASSERT_TRUE(statuses[1].IsNotFound()); // DeleteRange has a higher timestamp than Delete for "k2" ASSERT_EQ(timestamps[1], Timestamp(3, 0)); ASSERT_OK(statuses[2]); ASSERT_EQ(values[2], "v3"); ASSERT_EQ(timestamps[2], Timestamp(4, 0)); CompactRangeOptions cro; // Range tombstone has timestamp >= full_history_ts_low, covered keys // are not dropped. std::string compaction_ts_str = Timestamp(2, 0); Slice compaction_ts = compaction_ts_str; cro.full_history_ts_low = &compaction_ts; cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); ropts.timestamp = &compaction_ts; std::string value, ts; ASSERT_OK(db_->Get(ropts, "k1", &value, &ts)); ASSERT_EQ(value, "v1"); // timestamp is below full_history_ts_low, zeroed out as the key goes into // bottommost level ASSERT_EQ(ts, Timestamp(0, 0)); ASSERT_TRUE(db_->Get(ropts, "k2", &value, &ts).IsNotFound()); ASSERT_EQ(ts, Timestamp(2, 0)); compaction_ts_str = Timestamp(4, 0); compaction_ts = compaction_ts_str; cro.full_history_ts_low = &compaction_ts; ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); ropts.timestamp = &read_ts_slice; // k1, k2 and the range tombstone should be dropped // k3 should still exist db_->MultiGet(ropts, db_->DefaultColumnFamily(), batch_size, keys.data(), values.data(), timestamps.data(), statuses.data(), true /* sorted_input */); ASSERT_TRUE(statuses[0].IsNotFound()); ASSERT_TRUE(timestamps[0].empty()); ASSERT_TRUE(statuses[1].IsNotFound()); ASSERT_TRUE(timestamps[1].empty()); ASSERT_OK(statuses[2]); ASSERT_EQ(values[2], "v3"); ASSERT_EQ(timestamps[2], Timestamp(4, 0)); Close(); } TEST_P(DBBasicTestWithTimestampTableOptions, DeleteRangeBaiscReadAndIterate) { const int kNum = 200, kRangeBegin = 50, kRangeEnd = 150, kNumPerFile = 25; Options options = CurrentOptions(); options.prefix_extractor.reset(NewFixedPrefixTransform(3)); options.compression = kNoCompression; BlockBasedTableOptions bbto; bbto.index_type = GetParam(); bbto.block_size = 100; options.table_factory.reset(NewBlockBasedTableFactory(bbto)); options.env = env_; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; options.memtable_factory.reset(test::NewSpecialSkipListFactory(kNumPerFile)); DestroyAndReopen(options); // Write half of the keys before the tombstone and half after the tombstone. // Only covered keys (i.e., within the range and older than the tombstone) // should be deleted. for (int i = 0; i < kNum; ++i) { if (i == kNum / 2) { ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key1(kRangeBegin), Key1(kRangeEnd), Timestamp(i, 0))); } ASSERT_OK(db_->Put(WriteOptions(), Key1(i), Timestamp(i, 0), "val" + std::to_string(i))); if (i == kNum - kNumPerFile) { ASSERT_OK(Flush()); } } ReadOptions read_opts; read_opts.total_order_seek = true; std::string read_ts = Timestamp(kNum, 0); Slice read_ts_slice = read_ts; read_opts.timestamp = &read_ts_slice; { std::unique_ptr iter(db_->NewIterator(read_opts)); ASSERT_OK(iter->status()); int expected = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { ASSERT_EQ(Key1(expected), iter->key()); if (expected == kRangeBegin - 1) { expected = kNum / 2; } else { ++expected; } } ASSERT_EQ(kNum, expected); expected = kNum / 2; for (iter->Seek(Key1(kNum / 2)); iter->Valid(); iter->Next()) { ASSERT_EQ(Key1(expected), iter->key()); ++expected; } ASSERT_EQ(kNum, expected); expected = kRangeBegin - 1; for (iter->SeekForPrev(Key1(kNum / 2 - 1)); iter->Valid(); iter->Prev()) { ASSERT_EQ(Key1(expected), iter->key()); --expected; } ASSERT_EQ(-1, expected); read_ts = Timestamp(0, 0); read_ts_slice = read_ts; read_opts.timestamp = &read_ts_slice; iter.reset(db_->NewIterator(read_opts)); iter->SeekToFirst(); ASSERT_TRUE(iter->Valid()); ASSERT_EQ(iter->key(), Key1(0)); iter->Next(); ASSERT_FALSE(iter->Valid()); ASSERT_OK(iter->status()); } read_ts = Timestamp(kNum, 0); read_ts_slice = read_ts; read_opts.timestamp = &read_ts_slice; std::string value, timestamp; Status s; for (int i = 0; i < kNum; ++i) { s = db_->Get(read_opts, Key1(i), &value, ×tamp); if (i >= kRangeBegin && i < kNum / 2) { ASSERT_TRUE(s.IsNotFound()); ASSERT_EQ(timestamp, Timestamp(kNum / 2, 0)); } else { ASSERT_OK(s); ASSERT_EQ(value, "val" + std::to_string(i)); ASSERT_EQ(timestamp, Timestamp(i, 0)); } } size_t batch_size = kNum; std::vector key_strs(batch_size); std::vector keys(batch_size); std::vector values(batch_size); std::vector statuses(batch_size); std::vector timestamps(batch_size); for (int i = 0; i < kNum; ++i) { key_strs[i] = Key1(i); keys[i] = key_strs[i]; } db_->MultiGet(read_opts, db_->DefaultColumnFamily(), batch_size, keys.data(), values.data(), timestamps.data(), statuses.data(), true /* sorted_input */); for (int i = 0; i < kNum; ++i) { if (i >= kRangeBegin && i < kNum / 2) { ASSERT_TRUE(statuses[i].IsNotFound()); ASSERT_EQ(timestamps[i], Timestamp(kNum / 2, 0)); } else { ASSERT_OK(statuses[i]); ASSERT_EQ(values[i], "val" + std::to_string(i)); ASSERT_EQ(timestamps[i], Timestamp(i, 0)); } } Close(); } TEST_F(DBBasicTestWithTimestamp, DeleteRangeGetIteratorWithSnapshot) { // 4 keys 0, 1, 2, 3 at timestamps 0, 1, 2, 3 respectively. // A range tombstone [1, 3) at timestamp 1 and has a sequence number between // key 1 and 2. Options options = CurrentOptions(); const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); WriteOptions write_opts; std::string put_ts = Timestamp(0, 0); const int kNum = 4, kNumPerFile = 1, kRangeBegin = 1, kRangeEnd = 3; options.memtable_factory.reset(test::NewSpecialSkipListFactory(kNumPerFile)); const Snapshot* before_tombstone = nullptr; const Snapshot* after_tombstone = nullptr; for (int i = 0; i < kNum; ++i) { ASSERT_OK(db_->Put(WriteOptions(), Key1(i), Timestamp(i, 0), "val" + std::to_string(i))); if (i == kRangeBegin) { before_tombstone = db_->GetSnapshot(); ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key1(kRangeBegin), Key1(kRangeEnd), Timestamp(kRangeBegin, 0))); } if (i == kNum / 2) { ASSERT_OK(Flush()); } } assert(before_tombstone); after_tombstone = db_->GetSnapshot(); // snapshot and ts before tombstone std::string read_ts_str = Timestamp(kRangeBegin - 1, 0); // (0, 0) Slice read_ts = read_ts_str; ReadOptions read_opts; read_opts.timestamp = &read_ts; read_opts.snapshot = before_tombstone; std::vector expected_status = { Status::OK(), Status::NotFound(), Status::NotFound(), Status::NotFound()}; std::vector expected_values(kNum); expected_values[0] = "val" + std::to_string(0); std::vector expected_timestamps(kNum); expected_timestamps[0] = Timestamp(0, 0); size_t batch_size = kNum; std::vector key_strs(batch_size); std::vector keys(batch_size); std::vector values(batch_size); std::vector statuses(batch_size); std::vector timestamps(batch_size); for (int i = 0; i < kNum; ++i) { key_strs[i] = Key1(i); keys[i] = key_strs[i]; } auto verify = [&] { db_->MultiGet(read_opts, db_->DefaultColumnFamily(), batch_size, keys.data(), values.data(), timestamps.data(), statuses.data(), true /* sorted_input */); std::string value, timestamp; Status s; for (int i = 0; i < kNum; ++i) { s = db_->Get(read_opts, Key1(i), &value, ×tamp); ASSERT_EQ(s, expected_status[i]); ASSERT_EQ(statuses[i], expected_status[i]); if (s.ok()) { ASSERT_EQ(value, expected_values[i]); ASSERT_EQ(values[i], expected_values[i]); } if (!timestamp.empty()) { ASSERT_EQ(timestamp, expected_timestamps[i]); ASSERT_EQ(timestamps[i], expected_timestamps[i]); } else { ASSERT_TRUE(timestamps[i].empty()); } } std::unique_ptr iter(db_->NewIterator(read_opts)); std::unique_ptr iter_for_seek(db_->NewIterator(read_opts)); iter->SeekToFirst(); for (int i = 0; i < kNum; ++i) { if (expected_status[i].ok()) { auto verify_iter = [&](Iterator* iter_ptr) { ASSERT_TRUE(iter_ptr->Valid()); ASSERT_EQ(iter_ptr->key(), keys[i]); ASSERT_EQ(iter_ptr->value(), expected_values[i]); ASSERT_EQ(iter_ptr->timestamp(), expected_timestamps[i]); }; verify_iter(iter.get()); iter->Next(); iter_for_seek->Seek(keys[i]); verify_iter(iter_for_seek.get()); iter_for_seek->SeekForPrev(keys[i]); verify_iter(iter_for_seek.get()); } } ASSERT_FALSE(iter->Valid()); ASSERT_OK(iter->status()); }; verify(); // snapshot before tombstone and ts after tombstone read_ts_str = Timestamp(kNum, 0); // (4, 0) read_ts = read_ts_str; read_opts.timestamp = &read_ts; read_opts.snapshot = before_tombstone; expected_status[1] = Status::OK(); expected_timestamps[1] = Timestamp(1, 0); expected_values[1] = "val" + std::to_string(1); verify(); // snapshot after tombstone and ts before tombstone read_ts_str = Timestamp(kRangeBegin - 1, 0); // (0, 0) read_ts = read_ts_str; read_opts.timestamp = &read_ts; read_opts.snapshot = after_tombstone; expected_status[1] = Status::NotFound(); expected_timestamps[1].clear(); expected_values[1].clear(); verify(); // snapshot and ts after tombstone read_ts_str = Timestamp(kNum, 0); // (4, 0) read_ts = read_ts_str; read_opts.timestamp = &read_ts; read_opts.snapshot = after_tombstone; for (int i = 0; i < kNum; ++i) { if (i == kRangeBegin) { expected_status[i] = Status::NotFound(); expected_values[i].clear(); } else { expected_status[i] = Status::OK(); expected_values[i] = "val" + std::to_string(i); } expected_timestamps[i] = Timestamp(i, 0); } verify(); db_->ReleaseSnapshot(before_tombstone); db_->ReleaseSnapshot(after_tombstone); Close(); } TEST_F(DBBasicTestWithTimestamp, MergeBasic) { Options options = GetDefaultOptions(); options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; options.merge_operator = std::make_shared('.'); DestroyAndReopen(options); const std::array write_ts_strs = { Timestamp(100, 0), Timestamp(200, 0), Timestamp(300, 0)}; constexpr size_t kNumOfUniqKeys = 100; ColumnFamilyHandle* default_cf = db_->DefaultColumnFamily(); for (size_t i = 0; i < write_ts_strs.size(); ++i) { for (size_t j = 0; j < kNumOfUniqKeys; ++j) { Status s; if (i == 0) { const std::string val = "v" + std::to_string(j) + "_0"; s = db_->Put(WriteOptions(), Key1(j), write_ts_strs[i], val); } else { const std::string merge_op = std::to_string(i); s = db_->Merge(WriteOptions(), default_cf, Key1(j), write_ts_strs[i], merge_op); } ASSERT_OK(s); } } std::array read_ts_strs = { Timestamp(150, 0), Timestamp(250, 0), Timestamp(350, 0)}; const auto verify_db_with_get = [&]() { for (size_t i = 0; i < kNumOfUniqKeys; ++i) { const std::string base_val = "v" + std::to_string(i) + "_0"; const std::array expected_values = { base_val, base_val + ".1", base_val + ".1.2"}; const std::array& expected_ts = write_ts_strs; ReadOptions read_opts; for (size_t j = 0; j < read_ts_strs.size(); ++j) { Slice read_ts = read_ts_strs[j]; read_opts.timestamp = &read_ts; std::string value; std::string ts; const Status s = db_->Get(read_opts, Key1(i), &value, &ts); ASSERT_OK(s); ASSERT_EQ(expected_values[j], value); ASSERT_EQ(expected_ts[j], ts); // Do Seek/SeekForPrev std::unique_ptr it(db_->NewIterator(read_opts)); it->Seek(Key1(i)); ASSERT_TRUE(it->Valid()); ASSERT_EQ(expected_values[j], it->value()); ASSERT_EQ(expected_ts[j], it->timestamp()); it->SeekForPrev(Key1(i)); ASSERT_TRUE(it->Valid()); ASSERT_EQ(expected_values[j], it->value()); ASSERT_EQ(expected_ts[j], it->timestamp()); } } }; const auto verify_db_with_iterator = [&]() { std::string value_suffix; for (size_t i = 0; i < read_ts_strs.size(); ++i) { ReadOptions read_opts; Slice read_ts = read_ts_strs[i]; read_opts.timestamp = &read_ts; std::unique_ptr it(db_->NewIterator(read_opts)); size_t key_int_val = 0; for (it->SeekToFirst(); it->Valid(); it->Next(), ++key_int_val) { const std::string key = Key1(key_int_val); const std::string value = "v" + std::to_string(key_int_val) + "_0" + value_suffix; ASSERT_EQ(key, it->key()); ASSERT_EQ(value, it->value()); ASSERT_EQ(write_ts_strs[i], it->timestamp()); } ASSERT_EQ(kNumOfUniqKeys, key_int_val); key_int_val = kNumOfUniqKeys - 1; for (it->SeekToLast(); it->Valid(); it->Prev(), --key_int_val) { const std::string key = Key1(key_int_val); const std::string value = "v" + std::to_string(key_int_val) + "_0" + value_suffix; ASSERT_EQ(key, it->key()); ASSERT_EQ(value, it->value()); ASSERT_EQ(write_ts_strs[i], it->timestamp()); } ASSERT_EQ(std::numeric_limits::max(), key_int_val); value_suffix = value_suffix + "." + std::to_string(i + 1); } }; verify_db_with_get(); verify_db_with_iterator(); ASSERT_OK(db_->Flush(FlushOptions())); verify_db_with_get(); verify_db_with_iterator(); Close(); } TEST_F(DBBasicTestWithTimestamp, MergeAfterDeletion) { Options options = GetDefaultOptions(); options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; options.merge_operator = std::make_shared('.'); DestroyAndReopen(options); ColumnFamilyHandle* const column_family = db_->DefaultColumnFamily(); const size_t num_keys_per_file = 10; const size_t num_merges_per_key = 2; for (size_t i = 0; i < num_keys_per_file; ++i) { std::string ts = Timestamp(i + 10000, 0); Status s = db_->Delete(WriteOptions(), Key1(i), ts); ASSERT_OK(s); for (size_t j = 1; j <= num_merges_per_key; ++j) { ts = Timestamp(i + 10000 + j, 0); s = db_->Merge(WriteOptions(), column_family, Key1(i), ts, std::to_string(j)); ASSERT_OK(s); } } const auto verify_db = [&]() { ReadOptions read_opts; std::string read_ts_str = Timestamp(20000, 0); Slice ts = read_ts_str; read_opts.timestamp = &ts; std::unique_ptr it(db_->NewIterator(read_opts)); size_t count = 0; for (it->SeekToFirst(); it->Valid(); it->Next(), ++count) { std::string key = Key1(count); ASSERT_EQ(key, it->key()); std::string value; for (size_t j = 1; j <= num_merges_per_key; ++j) { value.append(std::to_string(j)); if (j < num_merges_per_key) { value.push_back('.'); } } ASSERT_EQ(value, it->value()); std::string ts1 = Timestamp(count + 10000 + num_merges_per_key, 0); ASSERT_EQ(ts1, it->timestamp()); } ASSERT_OK(it->status()); ASSERT_EQ(num_keys_per_file, count); for (it->SeekToLast(); it->Valid(); it->Prev(), --count) { std::string key = Key1(count - 1); ASSERT_EQ(key, it->key()); std::string value; for (size_t j = 1; j <= num_merges_per_key; ++j) { value.append(std::to_string(j)); if (j < num_merges_per_key) { value.push_back('.'); } } ASSERT_EQ(value, it->value()); std::string ts1 = Timestamp(count - 1 + 10000 + num_merges_per_key, 0); ASSERT_EQ(ts1, it->timestamp()); } ASSERT_OK(it->status()); ASSERT_EQ(0, count); }; verify_db(); Close(); } TEST_F(DBBasicTestWithTimestamp, RangeTombstoneApproximateSize) { // Test code path for calculating range tombstone compensated size // during flush and compaction. Options options = CurrentOptions(); const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); // So that the compaction below is non-bottommost and will calcualte // compensated range tombstone size. ASSERT_OK(db_->Put(WriteOptions(), Key(1), Timestamp(1, 0), "val")); ASSERT_OK(Flush()); MoveFilesToLevel(5); ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0), Key(1), Timestamp(1, 0))); ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(1), Key(2), Timestamp(2, 0))); ASSERT_OK(Flush()); ASSERT_OK(dbfull()->RunManualCompaction( static_cast_with_check(db_->DefaultColumnFamily()) ->cfd(), 0 /* input_level */, 1 /* output_level */, CompactRangeOptions(), nullptr /* begin */, nullptr /* end */, true /* exclusive */, true /* disallow_trivial_move */, std::numeric_limits::max() /* max_file_num_to_ignore */, "" /*trim_ts*/)); } } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); ::testing::InitGoogleTest(&argc, argv); RegisterCustomObjects(argc, argv); return RUN_ALL_TESTS(); }