diff --git a/CMakeLists.txt b/CMakeLists.txt index f98bfcaec..7a9bc71f8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -988,6 +988,7 @@ if(WITH_TESTS) db/db_logical_block_size_cache_test.cc db/db_universal_compaction_test.cc db/db_wal_test.cc + db/db_with_timestamp_compaction_test.cc db/db_write_test.cc db/dbformat_test.cc db/deletefile_test.cc diff --git a/Makefile b/Makefile index 5ef3cb96f..b7feaa017 100644 --- a/Makefile +++ b/Makefile @@ -609,6 +609,7 @@ TESTS = \ blob_file_addition_test \ blob_file_garbage_test \ timer_test \ + db_with_timestamp_compaction_test \ ifeq ($(USE_FOLLY_DISTRIBUTED_MUTEX),1) TESTS += folly_synchronization_distributed_mutex_test @@ -1324,6 +1325,9 @@ db_basic_test: db/db_basic_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) db_with_timestamp_basic_test: db/db_with_timestamp_basic_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +db_with_timestamp_compaction_test: db/db_with_timestamp_compaction_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + db_encryption_test: db/db_encryption_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 8e2f876f1..3943ad313 100644 --- a/TARGETS +++ b/TARGETS @@ -931,6 +931,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "db_with_timestamp_compaction_test", + "db/db_with_timestamp_compaction_test.cc", + "serial", + [], + [], + ], [ "db_write_test", "db/db_write_test.cc", diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index 5c34fdcaa..04a09365c 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -23,7 +23,7 @@ const uint64_t kRangeTombstoneSentinel = int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a, const InternalKey& b) { - auto c = user_cmp->Compare(a.user_key(), b.user_key()); + auto c = user_cmp->CompareWithoutTimestamp(a.user_key(), b.user_key()); if (c != 0) { return c; } diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 
af2e7cdc2..d7738efa2 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -157,21 +157,21 @@ Options DeletionTriggerOptions(Options options) { bool HaveOverlappingKeyRanges( const Comparator* c, const SstFileMetaData& a, const SstFileMetaData& b) { - if (c->Compare(a.smallestkey, b.smallestkey) >= 0) { - if (c->Compare(a.smallestkey, b.largestkey) <= 0) { + if (c->CompareWithoutTimestamp(a.smallestkey, b.smallestkey) >= 0) { + if (c->CompareWithoutTimestamp(a.smallestkey, b.largestkey) <= 0) { // b.smallestkey <= a.smallestkey <= b.largestkey return true; } - } else if (c->Compare(a.largestkey, b.smallestkey) >= 0) { + } else if (c->CompareWithoutTimestamp(a.largestkey, b.smallestkey) >= 0) { // a.smallestkey < b.smallestkey <= a.largestkey return true; } - if (c->Compare(a.largestkey, b.largestkey) <= 0) { - if (c->Compare(a.largestkey, b.smallestkey) >= 0) { + if (c->CompareWithoutTimestamp(a.largestkey, b.largestkey) <= 0) { + if (c->CompareWithoutTimestamp(a.largestkey, b.smallestkey) >= 0) { // b.smallestkey <= a.largestkey <= b.largestkey return true; } - } else if (c->Compare(a.smallestkey, b.largestkey) <= 0) { + } else if (c->CompareWithoutTimestamp(a.smallestkey, b.largestkey) <= 0) { // a.smallestkey <= b.largestkey < a.largestkey return true; } diff --git a/db/db_iter.cc b/db/db_iter.cc index cd0073a04..64388e797 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -256,7 +256,11 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, // possibly be skipped. This condition can potentially be relaxed to // prev_key.seq <= ikey_.sequence. We are cautious because it will be more // prone to bugs causing the same user key with the same sequence number. - if (!is_prev_key_seqnum_zero && skipping_saved_key && + // Note that with current timestamp implementation, the same user key can + // have different timestamps and zero sequence number on the bottommost + // level. This will change in the future. 
+ if ((!is_prev_key_seqnum_zero || timestamp_size_ > 0) && + skipping_saved_key && CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) <= 0) { num_skipped++; // skip this entry PERF_COUNTER_ADD(internal_key_skipped_count, 1); diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc index f5bc19a4e..46687a5bc 100644 --- a/db/db_with_timestamp_basic_test.cc +++ b/db/db_with_timestamp_basic_test.cc @@ -173,8 +173,6 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) { const uint64_t kMaxKey = 1024; Options options = CurrentOptions(); options.env = env_; - // TODO(yanqin) re-enable auto compaction - options.disable_auto_compactions = true; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); @@ -240,7 +238,6 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) { const uint64_t kMaxKey = 1024; Options options = CurrentOptions(); options.env = env_; - options.disable_auto_compactions = true; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); @@ -293,7 +290,9 @@ TEST_F(DBBasicTestWithTimestamp, ForwardIterateStartSeqnum) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; - // TODO(yanqin) re-enable auto compaction + // Need to disable compaction to bottommost level when sequence number will be + // zeroed out, causing the verification of sequence number to fail in this + // test. 
options.disable_auto_compactions = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); @@ -351,8 +350,6 @@ TEST_F(DBBasicTestWithTimestamp, ReseekToTargetTimestamp) { constexpr size_t kNumKeys = 16; options.max_sequential_skip_in_iterations = kNumKeys / 2; options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); - // TODO(yanqin) re-enable auto compaction - options.disable_auto_compactions = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; @@ -388,8 +385,6 @@ TEST_F(DBBasicTestWithTimestamp, ReseekToNextUserKey) { constexpr size_t kNumKeys = 16; options.max_sequential_skip_in_iterations = kNumKeys / 2; options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); - // TODO(yanqin) re-enable auto compaction - options.disable_auto_compactions = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; @@ -821,8 +816,6 @@ TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; - // TODO(yanqin): re-enable auto compactions - options.disable_auto_compactions = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; diff --git a/db/db_with_timestamp_compaction_test.cc b/db/db_with_timestamp_compaction_test.cc new file mode 100644 index 000000000..f5cc54df1 --- /dev/null +++ b/db/db_with_timestamp_compaction_test.cc @@ -0,0 +1,119 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. 
+// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/compaction/compaction.h" +#include "db/db_test_util.h" +#include "port/stack_trace.h" + +namespace ROCKSDB_NAMESPACE { + +namespace { +std::string Key1(uint64_t key) { + std::string ret; + PutFixed64(&ret, key); + std::reverse(ret.begin(), ret.end()); + return ret; +} + +std::string Timestamp(uint64_t ts) { + std::string ret; + PutFixed64(&ret, ts); + return ret; +} +} // anonymous namespace + +class TimestampCompatibleCompactionTest : public DBTestBase { + public: + TimestampCompatibleCompactionTest() + : DBTestBase("/ts_compatible_compaction_test") {} + + std::string Get(const std::string& key, uint64_t ts) { + ReadOptions read_opts; + std::string ts_str = Timestamp(ts); + Slice ts_slice = ts_str; + read_opts.timestamp = &ts_slice; + std::string value; + Status s = db_->Get(read_opts, key, &value); + if (s.IsNotFound()) { + value.assign("NOT_FOUND"); + } else if (!s.ok()) { + value.assign(s.ToString()); + } + return value; + } +}; + +TEST_F(TimestampCompatibleCompactionTest, UserKeyCrossFileBoundary) { + Options options = CurrentOptions(); + options.env = env_; + options.compaction_style = kCompactionStyleLevel; + options.comparator = test::ComparatorWithU64Ts(); + options.level0_file_num_compaction_trigger = 3; + constexpr size_t kNumKeysPerFile = 101; + options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); + DestroyAndReopen(options); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) { + const auto* compaction = reinterpret_cast<Compaction*>(arg); + ASSERT_NE(nullptr, compaction); + ASSERT_EQ(0, compaction->start_level()); + ASSERT_EQ(1, compaction->num_input_levels()); + // Check that all 3 L0 ssts are picked for level compaction. 
+ ASSERT_EQ(3, compaction->num_input_files(0)); + }); + SyncPoint::GetInstance()->EnableProcessing(); + // Write an L0 with keys 0, 1, ..., 99 with ts from 100 to 199. + uint64_t ts = 100; + uint64_t key = 0; + WriteOptions write_opts; + for (; key < kNumKeysPerFile - 1; ++key, ++ts) { + std::string ts_str = Timestamp(ts); + Slice ts_slice = ts_str; + write_opts.timestamp = &ts_slice; + ASSERT_OK(db_->Put(write_opts, Key1(key), "foo_" + std::to_string(key))); + } + // Write another L0 with key 99 with newer ts. + ASSERT_OK(Flush()); + uint64_t saved_read_ts1 = ts++; + key = 99; + for (int i = 0; i < 4; ++i, ++ts) { + std::string ts_str = Timestamp(ts); + Slice ts_slice = ts_str; + write_opts.timestamp = &ts_slice; + ASSERT_OK(db_->Put(write_opts, Key1(key), "bar_" + std::to_string(key))); + } + ASSERT_OK(Flush()); + uint64_t saved_read_ts2 = ts++; + // Write another L0 with keys 99, 100, 101, ..., 150 + for (; key <= 150; ++key, ++ts) { + std::string ts_str = Timestamp(ts); + Slice ts_slice = ts_str; + write_opts.timestamp = &ts_slice; + ASSERT_OK(db_->Put(write_opts, Key1(key), "foo1_" + std::to_string(key))); + } + ASSERT_OK(Flush()); + // Wait for compaction to finish + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + uint64_t read_ts = ts; + ASSERT_EQ("foo_99", Get(Key1(99), saved_read_ts1)); + ASSERT_EQ("bar_99", Get(Key1(99), saved_read_ts2)); + ASSERT_EQ("foo1_99", Get(Key1(99), read_ts)); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/version_set.cc b/db/version_set.cc index 4500d1262..4712f0694 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -990,9 +990,9 @@ class LevelIterator final : public InternalIterator { if (read_options_.iterate_lower_bound != nullptr && file_index_ < 
flevel_->num_files) { may_be_out_of_lower_bound_ = - user_comparator_.Compare( - ExtractUserKey(file_smallest_key(file_index_)), - *read_options_.iterate_lower_bound) < 0; + user_comparator_.CompareWithoutTimestamp( + ExtractUserKey(file_smallest_key(file_index_)), /*a_has_ts=*/true, + *read_options_.iterate_lower_bound, /*b_has_ts=*/false) < 0; } } diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 1c73cd713..a89d497cd 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -96,7 +96,7 @@ Options GetOptionsWithNumLevels(int num_levels, return opt; } -class VersionStorageInfoTest : public testing::Test { +class VersionStorageInfoTestBase : public testing::Test { public: const Comparator* ucmp_; InternalKeyComparator icmp_; @@ -111,17 +111,19 @@ class VersionStorageInfoTest : public testing::Test { return InternalKey(ukey, smallest_seq, kTypeValue); } - VersionStorageInfoTest() - : ucmp_(BytewiseComparator()), + explicit VersionStorageInfoTestBase(const Comparator* ucmp) + : ucmp_(ucmp), icmp_(ucmp_), logger_(new CountingLogger()), options_(GetOptionsWithNumLevels(6, logger_)), ioptions_(options_), mutable_cf_options_(options_), - vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel, nullptr, false) {} + vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel, + /*src_vstorage=*/nullptr, + /*_force_consistency_checks=*/false) {} - ~VersionStorageInfoTest() override { - for (int i = 0; i < vstorage_.num_levels(); i++) { + ~VersionStorageInfoTestBase() override { + for (int i = 0; i < vstorage_.num_levels(); ++i) { for (auto* f : vstorage_.LevelFiles(i)) { if (--f->refs == 0) { delete f; @@ -172,6 +174,13 @@ class VersionStorageInfoTest : public testing::Test { } }; +class VersionStorageInfoTest : public VersionStorageInfoTestBase { + public: + VersionStorageInfoTest() : VersionStorageInfoTestBase(BytewiseComparator()) {} + + ~VersionStorageInfoTest() override {} +}; + TEST_F(VersionStorageInfoTest, MaxBytesForLevelStatic) { 
ioptions_.level_compaction_dynamic_level_bytes = false; mutable_cf_options_.max_bytes_for_level_base = 10; @@ -412,6 +421,54 @@ TEST_F(VersionStorageInfoTest, GetOverlappingInputs) { 1, {"i", 0, kTypeValue}, {"j", 0, kTypeValue})); } +class VersionStorageInfoTimestampTest : public VersionStorageInfoTestBase { + public: + VersionStorageInfoTimestampTest() + : VersionStorageInfoTestBase(test::ComparatorWithU64Ts()) {} + ~VersionStorageInfoTimestampTest() override {} + std::string Timestamp(uint64_t ts) const { + std::string ret; + PutFixed64(&ret, ts); + return ret; + } + std::string PackUserKeyAndTimestamp(const Slice& ukey, uint64_t ts) const { + std::string ret; + ret.assign(ukey.data(), ukey.size()); + PutFixed64(&ret, ts); + return ret; + } +}; + +TEST_F(VersionStorageInfoTimestampTest, GetOverlappingInputs) { + Add(/*level=*/1, /*file_number=*/1, /*smallest=*/ + {PackUserKeyAndTimestamp("a", /*ts=*/9), /*s=*/0, kTypeValue}, + /*largest=*/ + {PackUserKeyAndTimestamp("a", /*ts=*/8), /*s=*/0, kTypeValue}, + /*file_size=*/100); + Add(/*level=*/1, /*file_number=*/2, /*smallest=*/ + {PackUserKeyAndTimestamp("a", /*ts=*/5), /*s=*/0, kTypeValue}, + /*largest=*/ + {PackUserKeyAndTimestamp("b", /*ts=*/10), /*s=*/0, kTypeValue}, + /*file_size=*/100); + Add(/*level=*/1, /*file_number=*/3, /*smallest=*/ + {PackUserKeyAndTimestamp("c", /*ts=*/12), /*s=*/0, kTypeValue}, + /*largest=*/ + {PackUserKeyAndTimestamp("d", /*ts=*/1), /*s=*/0, kTypeValue}, + /*file_size=*/100); + vstorage_.UpdateNumNonEmptyLevels(); + vstorage_.GenerateLevelFilesBrief(); + ASSERT_EQ( + "1,2", + GetOverlappingFiles( + /*level=*/1, + {PackUserKeyAndTimestamp("a", /*ts=*/12), /*s=*/0, kTypeValue}, + {PackUserKeyAndTimestamp("a", /*ts=*/11), /*s=*/0, kTypeValue})); + ASSERT_EQ("3", + GetOverlappingFiles( + /*level=*/1, + {PackUserKeyAndTimestamp("c", /*ts=*/15), /*s=*/0, kTypeValue}, + {PackUserKeyAndTimestamp("c", /*ts=*/2), /*s=*/0, kTypeValue})); +} class FindLevelFileTest : public testing::Test { 
public: diff --git a/src.mk b/src.mk index 1bcc22042..825499146 100644 --- a/src.mk +++ b/src.mk @@ -352,6 +352,7 @@ MAIN_SOURCES = \ db/db_logical_block_size_cache_test.cc \ db/db_universal_compaction_test.cc \ db/db_wal_test.cc \ + db/db_with_timestamp_compaction_test.cc \ db/db_write_test.cc \ db/dbformat_test.cc \ db/deletefile_test.cc \ diff --git a/test_util/testutil.cc b/test_util/testutil.cc index 2969a140a..f9f5468e6 100644 --- a/test_util/testutil.cc +++ b/test_util/testutil.cc @@ -118,6 +118,61 @@ class Uint64ComparatorImpl : public Comparator { void FindShortSuccessor(std::string* /*key*/) const override { return; } }; + +// A test implementation of comparator with 64-bit integer timestamp. +class ComparatorWithU64TsImpl : public Comparator { + public: + ComparatorWithU64TsImpl() + : Comparator(/*ts_sz=*/sizeof(uint64_t)), + cmp_without_ts_(BytewiseComparator()) { + assert(cmp_without_ts_); + assert(cmp_without_ts_->timestamp_size() == 0); + } + const char* Name() const override { return "ComparatorWithU64Ts"; } + void FindShortSuccessor(std::string*) const override {} + void FindShortestSeparator(std::string*, const Slice&) const override {} + int Compare(const Slice& a, const Slice& b) const override { + int ret = CompareWithoutTimestamp(a, b); + size_t ts_sz = timestamp_size(); + if (ret != 0) { + return ret; + } + // Compare timestamp. + // For the same user key with different timestamps, larger (newer) timestamp + // comes first. + return -CompareTimestamp(ExtractTimestampFromUserKey(a, ts_sz), + ExtractTimestampFromUserKey(b, ts_sz)); + } + using Comparator::CompareWithoutTimestamp; + int CompareWithoutTimestamp(const Slice& a, bool a_has_ts, const Slice& b, + bool b_has_ts) const override { + const size_t ts_sz = timestamp_size(); + assert(!a_has_ts || a.size() >= ts_sz); + assert(!b_has_ts || b.size() >= ts_sz); + Slice lhs = a_has_ts ? StripTimestampFromUserKey(a, ts_sz) : a; + Slice rhs = b_has_ts ? 
StripTimestampFromUserKey(b, ts_sz) : b; + return cmp_without_ts_->Compare(lhs, rhs); + } + int CompareTimestamp(const Slice& ts1, const Slice& ts2) const override { + size_t ts_sz = timestamp_size(); + assert(ts1.size() == ts_sz); + assert(ts2.size() == ts_sz); + assert(ts_sz == sizeof(uint64_t)); + uint64_t lhs = DecodeFixed64(ts1.data()); + uint64_t rhs = DecodeFixed64(ts2.data()); + if (lhs < rhs) { + return -1; + } else if (lhs > rhs) { + return 1; + } else { + return 0; + } + } + + private: + const Comparator* cmp_without_ts_{nullptr}; +}; + } // namespace const Comparator* Uint64Comparator() { @@ -125,6 +180,11 @@ const Comparator* Uint64Comparator() { return &uint64comp; } +const Comparator* ComparatorWithU64Ts() { + static ComparatorWithU64TsImpl comp_with_u64_ts; + return &comp_with_u64_ts; +} + WritableFileWriter* GetWritableFileWriter(WritableFile* wf, const std::string& fname) { std::unique_ptr<WritableFile> file(wf); diff --git a/test_util/testutil.h b/test_util/testutil.h index 3f6b47929..965d65664 100644 --- a/test_util/testutil.h +++ b/test_util/testutil.h @@ -777,6 +777,8 @@ class ChanglingCompactionFilterFactory : public CompactionFilterFactory { std::string name_; }; +extern const Comparator* ComparatorWithU64Ts(); + CompressionType RandomCompressionType(Random* rnd); void RandomCompressionTypeVector(const size_t count,