From 0c05624d501aaab577bea821a6124e711b5ccdf0 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Fri, 10 Apr 2020 16:03:33 -0700 Subject: [PATCH] Compaction with timestamp: input boundaries (#6645) Summary: Towards making compaction logic compatible with user timestamp. When computing boundaries and overlapping ranges for inputs of compaction, We need to compare SSTs by user key without timestamp. Test plan (devserver): ``` make check ``` Several individual tests: ``` ./version_set_test --gtest_filter=VersionStorageInfoTimestampTest.GetOverlappingInputs ./db_with_timestamp_compaction_test ./db_with_timestamp_basic_test ``` Pull Request resolved: https://github.com/facebook/rocksdb/pull/6645 Reviewed By: ltamasi Differential Revision: D20960012 Pulled By: riversand963 fbshipit-source-id: ad377fa9eb481bf7a8a3e1824aaade48cdc653a4 --- CMakeLists.txt | 1 + Makefile | 4 + TARGETS | 7 ++ db/compaction/compaction.cc | 2 +- db/db_compaction_test.cc | 12 +-- db/db_iter.cc | 6 +- db/db_with_timestamp_basic_test.cc | 13 +-- db/db_with_timestamp_compaction_test.cc | 119 ++++++++++++++++++++++++ db/version_set.cc | 6 +- db/version_set_test.cc | 69 ++++++++++++-- src.mk | 1 + test_util/testutil.cc | 60 ++++++++++++ test_util/testutil.h | 2 + 13 files changed, 275 insertions(+), 27 deletions(-) create mode 100644 db/db_with_timestamp_compaction_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index f98bfcaec..7a9bc71f8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -988,6 +988,7 @@ if(WITH_TESTS) db/db_logical_block_size_cache_test.cc db/db_universal_compaction_test.cc db/db_wal_test.cc + db/db_with_timestamp_compaction_test.cc db/db_write_test.cc db/dbformat_test.cc db/deletefile_test.cc diff --git a/Makefile b/Makefile index 5ef3cb96f..b7feaa017 100644 --- a/Makefile +++ b/Makefile @@ -609,6 +609,7 @@ TESTS = \ blob_file_addition_test \ blob_file_garbage_test \ timer_test \ + db_with_timestamp_compaction_test \ ifeq ($(USE_FOLLY_DISTRIBUTED_MUTEX),1) TESTS += 
folly_synchronization_distributed_mutex_test @@ -1324,6 +1325,9 @@ db_basic_test: db/db_basic_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) db_with_timestamp_basic_test: db/db_with_timestamp_basic_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +db_with_timestamp_compaction_test: db/db_with_timestamp_compaction_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + db_encryption_test: db/db_encryption_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 8e2f876f1..3943ad313 100644 --- a/TARGETS +++ b/TARGETS @@ -931,6 +931,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "db_with_timestamp_compaction_test", + "db/db_with_timestamp_compaction_test.cc", + "serial", + [], + [], + ], [ "db_write_test", "db/db_write_test.cc", diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index 5c34fdcaa..04a09365c 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -23,7 +23,7 @@ const uint64_t kRangeTombstoneSentinel = int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a, const InternalKey& b) { - auto c = user_cmp->Compare(a.user_key(), b.user_key()); + auto c = user_cmp->CompareWithoutTimestamp(a.user_key(), b.user_key()); if (c != 0) { return c; } diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index af2e7cdc2..d7738efa2 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -157,21 +157,21 @@ Options DeletionTriggerOptions(Options options) { bool HaveOverlappingKeyRanges( const Comparator* c, const SstFileMetaData& a, const SstFileMetaData& b) { - if (c->Compare(a.smallestkey, b.smallestkey) >= 0) { - if (c->Compare(a.smallestkey, b.largestkey) <= 0) { + if (c->CompareWithoutTimestamp(a.smallestkey, b.smallestkey) >= 0) { + if (c->CompareWithoutTimestamp(a.smallestkey, b.largestkey) <= 0) { // b.smallestkey <= a.smallestkey <= b.largestkey return true; } - } else if (c->Compare(a.largestkey, 
b.smallestkey) >= 0) { + } else if (c->CompareWithoutTimestamp(a.largestkey, b.smallestkey) >= 0) { // a.smallestkey < b.smallestkey <= a.largestkey return true; } - if (c->Compare(a.largestkey, b.largestkey) <= 0) { - if (c->Compare(a.largestkey, b.smallestkey) >= 0) { + if (c->CompareWithoutTimestamp(a.largestkey, b.largestkey) <= 0) { + if (c->CompareWithoutTimestamp(a.largestkey, b.smallestkey) >= 0) { // b.smallestkey <= a.largestkey <= b.largestkey return true; } - } else if (c->Compare(a.smallestkey, b.largestkey) <= 0) { + } else if (c->CompareWithoutTimestamp(a.smallestkey, b.largestkey) <= 0) { // a.smallestkey <= b.largestkey < a.largestkey return true; } diff --git a/db/db_iter.cc b/db/db_iter.cc index cd0073a04..64388e797 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -256,7 +256,11 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, // possibly be skipped. This condition can potentially be relaxed to // prev_key.seq <= ikey_.sequence. We are cautious because it will be more // prone to bugs causing the same user key with the same sequence number. - if (!is_prev_key_seqnum_zero && skipping_saved_key && + // Note that with current timestamp implementation, the same user key can + // have different timestamps and zero sequence number on the bottommost + // level. This will change in the future. 
+ if ((!is_prev_key_seqnum_zero || timestamp_size_ > 0) && + skipping_saved_key && CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) <= 0) { num_skipped++; // skip this entry PERF_COUNTER_ADD(internal_key_skipped_count, 1); diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc index f5bc19a4e..46687a5bc 100644 --- a/db/db_with_timestamp_basic_test.cc +++ b/db/db_with_timestamp_basic_test.cc @@ -173,8 +173,6 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) { const uint64_t kMaxKey = 1024; Options options = CurrentOptions(); options.env = env_; - // TODO(yanqin) re-enable auto compaction - options.disable_auto_compactions = true; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); @@ -240,7 +238,6 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) { const uint64_t kMaxKey = 1024; Options options = CurrentOptions(); options.env = env_; - options.disable_auto_compactions = true; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); @@ -293,7 +290,9 @@ TEST_F(DBBasicTestWithTimestamp, ForwardIterateStartSeqnum) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; - // TODO(yanqin) re-enable auto compaction + // Need to disable compaction to bottommost level when sequence number will be + // zeroed out, causing the verification of sequence number to fail in this + // test. 
options.disable_auto_compactions = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); @@ -351,8 +350,6 @@ TEST_F(DBBasicTestWithTimestamp, ReseekToTargetTimestamp) { constexpr size_t kNumKeys = 16; options.max_sequential_skip_in_iterations = kNumKeys / 2; options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); - // TODO(yanqin) re-enable auto compaction - options.disable_auto_compactions = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; @@ -388,8 +385,6 @@ TEST_F(DBBasicTestWithTimestamp, ReseekToNextUserKey) { constexpr size_t kNumKeys = 16; options.max_sequential_skip_in_iterations = kNumKeys / 2; options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); - // TODO(yanqin) re-enable auto compaction - options.disable_auto_compactions = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; @@ -821,8 +816,6 @@ TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; - // TODO(yanqin): re-enable auto compactions - options.disable_auto_compactions = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; diff --git a/db/db_with_timestamp_compaction_test.cc b/db/db_with_timestamp_compaction_test.cc new file mode 100644 index 000000000..f5cc54df1 --- /dev/null +++ b/db/db_with_timestamp_compaction_test.cc @@ -0,0 +1,119 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. 
+// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/compaction/compaction.h" +#include "db/db_test_util.h" +#include "port/stack_trace.h" + +namespace ROCKSDB_NAMESPACE { + +namespace { +std::string Key1(uint64_t key) { + std::string ret; + PutFixed64(&ret, key); + std::reverse(ret.begin(), ret.end()); + return ret; +} + +std::string Timestamp(uint64_t ts) { + std::string ret; + PutFixed64(&ret, ts); + return ret; +} +} // anonymous namespace + +class TimestampCompatibleCompactionTest : public DBTestBase { + public: + TimestampCompatibleCompactionTest() + : DBTestBase("/ts_compatible_compaction_test") {} + + std::string Get(const std::string& key, uint64_t ts) { + ReadOptions read_opts; + std::string ts_str = Timestamp(ts); + Slice ts_slice = ts_str; + read_opts.timestamp = &ts_slice; + std::string value; + Status s = db_->Get(read_opts, key, &value); + if (s.IsNotFound()) { + value.assign("NOT_FOUND"); + } else if (!s.ok()) { + value.assign(s.ToString()); + } + return value; + } +}; + +TEST_F(TimestampCompatibleCompactionTest, UserKeyCrossFileBoundary) { + Options options = CurrentOptions(); + options.env = env_; + options.compaction_style = kCompactionStyleLevel; + options.comparator = test::ComparatorWithU64Ts(); + options.level0_file_num_compaction_trigger = 3; + constexpr size_t kNumKeysPerFile = 101; + options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); + DestroyAndReopen(options); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) { + const auto* compaction = reinterpret_cast<Compaction*>(arg); + ASSERT_NE(nullptr, compaction); + ASSERT_EQ(0, compaction->start_level()); + ASSERT_EQ(1, compaction->num_input_levels()); + // Check that all 3 L0 ssts are picked for level compaction. 
+ ASSERT_EQ(3, compaction->num_input_files(0)); + }); + SyncPoint::GetInstance()->EnableProcessing(); + // Write a L0 with keys 0, 1, ..., 99 with ts from 100 to 199. + uint64_t ts = 100; + uint64_t key = 0; + WriteOptions write_opts; + for (; key < kNumKeysPerFile - 1; ++key, ++ts) { + std::string ts_str = Timestamp(ts); + Slice ts_slice = ts_str; + write_opts.timestamp = &ts_slice; + ASSERT_OK(db_->Put(write_opts, Key1(key), "foo_" + std::to_string(key))); + } + // Write another L0 with keys 99 with newer ts. + ASSERT_OK(Flush()); + uint64_t saved_read_ts1 = ts++; + key = 99; + for (int i = 0; i < 4; ++i, ++ts) { + std::string ts_str = Timestamp(ts); + Slice ts_slice = ts_str; + write_opts.timestamp = &ts_slice; + ASSERT_OK(db_->Put(write_opts, Key1(key), "bar_" + std::to_string(key))); + } + ASSERT_OK(Flush()); + uint64_t saved_read_ts2 = ts++; + // Write another L0 with keys 99, 100, 101, ..., 150 + for (; key <= 150; ++key, ++ts) { + std::string ts_str = Timestamp(ts); + Slice ts_slice = ts_str; + write_opts.timestamp = &ts_slice; + ASSERT_OK(db_->Put(write_opts, Key1(key), "foo1_" + std::to_string(key))); + } + ASSERT_OK(Flush()); + // Wait for compaction to finish + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + uint64_t read_ts = ts; + ASSERT_EQ("foo_99", Get(Key1(99), saved_read_ts1)); + ASSERT_EQ("bar_99", Get(Key1(99), saved_read_ts2)); + ASSERT_EQ("foo1_99", Get(Key1(99), read_ts)); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/version_set.cc b/db/version_set.cc index 4500d1262..4712f0694 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -990,9 +990,9 @@ class LevelIterator final : public InternalIterator { if (read_options_.iterate_lower_bound != nullptr && file_index_ < 
flevel_->num_files) { may_be_out_of_lower_bound_ = - user_comparator_.Compare( - ExtractUserKey(file_smallest_key(file_index_)), - *read_options_.iterate_lower_bound) < 0; + user_comparator_.CompareWithoutTimestamp( + ExtractUserKey(file_smallest_key(file_index_)), /*a_has_ts=*/true, + *read_options_.iterate_lower_bound, /*b_has_ts=*/false) < 0; } } diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 1c73cd713..a89d497cd 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -96,7 +96,7 @@ Options GetOptionsWithNumLevels(int num_levels, return opt; } -class VersionStorageInfoTest : public testing::Test { +class VersionStorageInfoTestBase : public testing::Test { public: const Comparator* ucmp_; InternalKeyComparator icmp_; @@ -111,17 +111,19 @@ class VersionStorageInfoTest : public testing::Test { return InternalKey(ukey, smallest_seq, kTypeValue); } - VersionStorageInfoTest() - : ucmp_(BytewiseComparator()), + explicit VersionStorageInfoTestBase(const Comparator* ucmp) + : ucmp_(ucmp), icmp_(ucmp_), logger_(new CountingLogger()), options_(GetOptionsWithNumLevels(6, logger_)), ioptions_(options_), mutable_cf_options_(options_), - vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel, nullptr, false) {} + vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel, + /*src_vstorage=*/nullptr, + /*_force_consistency_checks=*/false) {} - ~VersionStorageInfoTest() override { - for (int i = 0; i < vstorage_.num_levels(); i++) { + ~VersionStorageInfoTestBase() override { + for (int i = 0; i < vstorage_.num_levels(); ++i) { for (auto* f : vstorage_.LevelFiles(i)) { if (--f->refs == 0) { delete f; @@ -172,6 +174,13 @@ class VersionStorageInfoTest : public testing::Test { } }; +class VersionStorageInfoTest : public VersionStorageInfoTestBase { + public: + VersionStorageInfoTest() : VersionStorageInfoTestBase(BytewiseComparator()) {} + + ~VersionStorageInfoTest() override {} +}; + TEST_F(VersionStorageInfoTest, MaxBytesForLevelStatic) { 
ioptions_.level_compaction_dynamic_level_bytes = false; mutable_cf_options_.max_bytes_for_level_base = 10; @@ -412,6 +421,54 @@ TEST_F(VersionStorageInfoTest, GetOverlappingInputs) { 1, {"i", 0, kTypeValue}, {"j", 0, kTypeValue})); } +class VersionStorageInfoTimestampTest : public VersionStorageInfoTestBase { + public: + VersionStorageInfoTimestampTest() + : VersionStorageInfoTestBase(test::ComparatorWithU64Ts()) {} + ~VersionStorageInfoTimestampTest() override {} + std::string Timestamp(uint64_t ts) const { + std::string ret; + PutFixed64(&ret, ts); + return ret; + } + std::string PackUserKeyAndTimestamp(const Slice& ukey, uint64_t ts) const { + std::string ret; + ret.assign(ukey.data(), ukey.size()); + PutFixed64(&ret, ts); + return ret; + } +}; + +TEST_F(VersionStorageInfoTimestampTest, GetOverlappingInputs) { + Add(/*level=*/1, /*file_number=*/1, /*smallest=*/ + {PackUserKeyAndTimestamp("a", /*ts=*/9), /*s=*/0, kTypeValue}, + /*largest=*/ + {PackUserKeyAndTimestamp("a", /*ts=*/8), /*s=*/0, kTypeValue}, + /*file_size=*/100); + Add(/*level=*/1, /*file_number=*/2, /*smallest=*/ + {PackUserKeyAndTimestamp("a", /*ts=*/5), /*s=*/0, kTypeValue}, + /*largest=*/ + {PackUserKeyAndTimestamp("b", /*ts=*/10), /*s=*/0, kTypeValue}, + /*file_size=*/100); + Add(/*level=*/1, /*file_number=*/3, /*smallest=*/ + {PackUserKeyAndTimestamp("c", /*ts=*/12), /*s=*/0, kTypeValue}, + /*largest=*/ + {PackUserKeyAndTimestamp("d", /*ts=*/1), /*s=*/0, kTypeValue}, + /*file_size=*/100); + vstorage_.UpdateNumNonEmptyLevels(); + vstorage_.GenerateLevelFilesBrief(); + ASSERT_EQ( + "1,2", + GetOverlappingFiles( + /*level=*/1, + {PackUserKeyAndTimestamp("a", /*ts=*/12), /*s=*/0, kTypeValue}, + {PackUserKeyAndTimestamp("a", /*ts=*/11), /*s=*/0, kTypeValue})); + ASSERT_EQ("3", + GetOverlappingFiles( + /*level=*/1, + {PackUserKeyAndTimestamp("c", /*ts=*/15), /*s=*/0, kTypeValue}, + {PackUserKeyAndTimestamp("c", /*ts=*/2), /*s=*/0, kTypeValue})); +} class FindLevelFileTest : public testing::Test { 
public: diff --git a/src.mk b/src.mk index 1bcc22042..825499146 100644 --- a/src.mk +++ b/src.mk @@ -352,6 +352,7 @@ MAIN_SOURCES = \ db/db_logical_block_size_cache_test.cc \ db/db_universal_compaction_test.cc \ db/db_wal_test.cc \ + db/db_with_timestamp_compaction_test.cc \ db/db_write_test.cc \ db/dbformat_test.cc \ db/deletefile_test.cc \ diff --git a/test_util/testutil.cc b/test_util/testutil.cc index 2969a140a..f9f5468e6 100644 --- a/test_util/testutil.cc +++ b/test_util/testutil.cc @@ -118,6 +118,61 @@ class Uint64ComparatorImpl : public Comparator { void FindShortSuccessor(std::string* /*key*/) const override { return; } }; + +// A test implementation of comparator with 64-bit integer timestamp. +class ComparatorWithU64TsImpl : public Comparator { + public: + ComparatorWithU64TsImpl() + : Comparator(/*ts_sz=*/sizeof(uint64_t)), + cmp_without_ts_(BytewiseComparator()) { + assert(cmp_without_ts_); + assert(cmp_without_ts_->timestamp_size() == 0); + } + const char* Name() const override { return "ComparatorWithU64Ts"; } + void FindShortSuccessor(std::string*) const override {} + void FindShortestSeparator(std::string*, const Slice&) const override {} + int Compare(const Slice& a, const Slice& b) const override { + int ret = CompareWithoutTimestamp(a, b); + size_t ts_sz = timestamp_size(); + if (ret != 0) { + return ret; + } + // Compare timestamp. + // For the same user key with different timestamps, larger (newer) timestamp + // comes first. + return -CompareTimestamp(ExtractTimestampFromUserKey(a, ts_sz), + ExtractTimestampFromUserKey(b, ts_sz)); + } + using Comparator::CompareWithoutTimestamp; + int CompareWithoutTimestamp(const Slice& a, bool a_has_ts, const Slice& b, + bool b_has_ts) const override { + const size_t ts_sz = timestamp_size(); + assert(!a_has_ts || a.size() >= ts_sz); + assert(!b_has_ts || b.size() >= ts_sz); + Slice lhs = a_has_ts ? StripTimestampFromUserKey(a, ts_sz) : a; + Slice rhs = b_has_ts ? 
StripTimestampFromUserKey(b, ts_sz) : b; + return cmp_without_ts_->Compare(lhs, rhs); + } + int CompareTimestamp(const Slice& ts1, const Slice& ts2) const override { + size_t ts_sz = timestamp_size(); + assert(ts1.size() == ts_sz); + assert(ts2.size() == ts_sz); + assert(ts_sz == sizeof(uint64_t)); + uint64_t lhs = DecodeFixed64(ts1.data()); + uint64_t rhs = DecodeFixed64(ts2.data()); + if (lhs < rhs) { + return -1; + } else if (lhs > rhs) { + return 1; + } else { + return 0; + } + } + + private: + const Comparator* cmp_without_ts_{nullptr}; +}; + } // namespace const Comparator* Uint64Comparator() { @@ -125,6 +180,11 @@ const Comparator* Uint64Comparator() { return &uint64comp; } +const Comparator* ComparatorWithU64Ts() { + static ComparatorWithU64TsImpl comp_with_u64_ts; + return &comp_with_u64_ts; +} + WritableFileWriter* GetWritableFileWriter(WritableFile* wf, const std::string& fname) { std::unique_ptr<WritableFile> file(wf); diff --git a/test_util/testutil.h b/test_util/testutil.h index 3f6b47929..965d65664 100644 --- a/test_util/testutil.h +++ b/test_util/testutil.h @@ -777,6 +777,8 @@ class ChanglingCompactionFilterFactory : public CompactionFilterFactory { std::string name_; }; +extern const Comparator* ComparatorWithU64Ts(); + CompressionType RandomCompressionType(Random* rnd); void RandomCompressionTypeVector(const size_t count,