Compaction with timestamp: input boundaries (#6645)

Summary:
Towards making compaction logic compatible with user timestamp.
When computing boundaries and overlapping ranges for inputs of compaction, We need to compare SSTs by user key without timestamp.

Test plan (devserver):
```
make check
```
Several individual tests:
```
./version_set_test --gtest_filter=VersionStorageInfoTimestampTest.GetOverlappingInputs
./db_with_timestamp_compaction_test
./db_with_timestamp_basic_test
```
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6645

Reviewed By: ltamasi

Differential Revision: D20960012

Pulled By: riversand963

fbshipit-source-id: ad377fa9eb481bf7a8a3e1824aaade48cdc653a4
main
Yanqin Jin 4 years ago committed by Facebook GitHub Bot
parent a0faff126d
commit 0c05624d50
  1. 1
      CMakeLists.txt
  2. 4
      Makefile
  3. 7
      TARGETS
  4. 2
      db/compaction/compaction.cc
  5. 12
      db/db_compaction_test.cc
  6. 6
      db/db_iter.cc
  7. 13
      db/db_with_timestamp_basic_test.cc
  8. 119
      db/db_with_timestamp_compaction_test.cc
  9. 6
      db/version_set.cc
  10. 69
      db/version_set_test.cc
  11. 1
      src.mk
  12. 60
      test_util/testutil.cc
  13. 2
      test_util/testutil.h

@ -988,6 +988,7 @@ if(WITH_TESTS)
db/db_logical_block_size_cache_test.cc
db/db_universal_compaction_test.cc
db/db_wal_test.cc
db/db_with_timestamp_compaction_test.cc
db/db_write_test.cc
db/dbformat_test.cc
db/deletefile_test.cc

@ -609,6 +609,7 @@ TESTS = \
blob_file_addition_test \
blob_file_garbage_test \
timer_test \
db_with_timestamp_compaction_test \
ifeq ($(USE_FOLLY_DISTRIBUTED_MUTEX),1)
TESTS += folly_synchronization_distributed_mutex_test
@ -1324,6 +1325,9 @@ db_basic_test: db/db_basic_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
db_with_timestamp_basic_test: db/db_with_timestamp_basic_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
db_with_timestamp_compaction_test: db/db_with_timestamp_compaction_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
db_encryption_test: db/db_encryption_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

@ -931,6 +931,13 @@ ROCKS_TESTS = [
[],
[],
],
[
"db_with_timestamp_compaction_test",
"db/db_with_timestamp_compaction_test.cc",
"serial",
[],
[],
],
[
"db_write_test",
"db/db_write_test.cc",

@ -23,7 +23,7 @@ const uint64_t kRangeTombstoneSentinel =
int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
const InternalKey& b) {
auto c = user_cmp->Compare(a.user_key(), b.user_key());
auto c = user_cmp->CompareWithoutTimestamp(a.user_key(), b.user_key());
if (c != 0) {
return c;
}

@ -157,21 +157,21 @@ Options DeletionTriggerOptions(Options options) {
bool HaveOverlappingKeyRanges(
const Comparator* c,
const SstFileMetaData& a, const SstFileMetaData& b) {
if (c->Compare(a.smallestkey, b.smallestkey) >= 0) {
if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
if (c->CompareWithoutTimestamp(a.smallestkey, b.smallestkey) >= 0) {
if (c->CompareWithoutTimestamp(a.smallestkey, b.largestkey) <= 0) {
// b.smallestkey <= a.smallestkey <= b.largestkey
return true;
}
} else if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
} else if (c->CompareWithoutTimestamp(a.largestkey, b.smallestkey) >= 0) {
// a.smallestkey < b.smallestkey <= a.largestkey
return true;
}
if (c->Compare(a.largestkey, b.largestkey) <= 0) {
if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
if (c->CompareWithoutTimestamp(a.largestkey, b.largestkey) <= 0) {
if (c->CompareWithoutTimestamp(a.largestkey, b.smallestkey) >= 0) {
// b.smallestkey <= a.largestkey <= b.largestkey
return true;
}
} else if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
} else if (c->CompareWithoutTimestamp(a.smallestkey, b.largestkey) <= 0) {
// a.smallestkey <= b.largestkey < a.largestkey
return true;
}

@ -256,7 +256,11 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
// possibly be skipped. This condition can potentially be relaxed to
// prev_key.seq <= ikey_.sequence. We are cautious because it will be more
// prone to bugs causing the same user key with the same sequence number.
if (!is_prev_key_seqnum_zero && skipping_saved_key &&
// Note that with current timestamp implementation, the same user key can
// have different timestamps and zero sequence number on the bottommost
// level. This will change in the future.
if ((!is_prev_key_seqnum_zero || timestamp_size_ > 0) &&
skipping_saved_key &&
CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) <= 0) {
num_skipped++; // skip this entry
PERF_COUNTER_ADD(internal_key_skipped_count, 1);

@ -173,8 +173,6 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) {
const uint64_t kMaxKey = 1024;
Options options = CurrentOptions();
options.env = env_;
// TODO(yanqin) re-enable auto compaction
options.disable_auto_compactions = true;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
@ -240,7 +238,6 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) {
const uint64_t kMaxKey = 1024;
Options options = CurrentOptions();
options.env = env_;
options.disable_auto_compactions = true;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
@ -293,7 +290,9 @@ TEST_F(DBBasicTestWithTimestamp, ForwardIterateStartSeqnum) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
// TODO(yanqin) re-enable auto compaction
// Need to disable compaction to bottommost level when sequence number will be
// zeroed out, causing the verification of sequence number to fail in this
// test.
options.disable_auto_compactions = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
@ -351,8 +350,6 @@ TEST_F(DBBasicTestWithTimestamp, ReseekToTargetTimestamp) {
constexpr size_t kNumKeys = 16;
options.max_sequential_skip_in_iterations = kNumKeys / 2;
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
// TODO(yanqin) re-enable auto compaction
options.disable_auto_compactions = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
@ -388,8 +385,6 @@ TEST_F(DBBasicTestWithTimestamp, ReseekToNextUserKey) {
constexpr size_t kNumKeys = 16;
options.max_sequential_skip_in_iterations = kNumKeys / 2;
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
// TODO(yanqin) re-enable auto compaction
options.disable_auto_compactions = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
@ -821,8 +816,6 @@ TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
// TODO(yanqin): re-enable auto compactions
options.disable_auto_compactions = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;

@ -0,0 +1,119 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/compaction/compaction.h"
#include "db/db_test_util.h"
#include "port/stack_trace.h"
namespace ROCKSDB_NAMESPACE {
namespace {
std::string Key1(uint64_t key) {
std::string ret;
PutFixed64(&ret, key);
std::reverse(ret.begin(), ret.end());
return ret;
}
std::string Timestamp(uint64_t ts) {
std::string ret;
PutFixed64(&ret, ts);
return ret;
}
} // anonymous namespace
class TimestampCompatibleCompactionTest : public DBTestBase {
public:
TimestampCompatibleCompactionTest()
: DBTestBase("/ts_compatible_compaction_test") {}
std::string Get(const std::string& key, uint64_t ts) {
ReadOptions read_opts;
std::string ts_str = Timestamp(ts);
Slice ts_slice = ts_str;
read_opts.timestamp = &ts_slice;
std::string value;
Status s = db_->Get(read_opts, key, &value);
if (s.IsNotFound()) {
value.assign("NOT_FOUND");
} else if (!s.ok()) {
value.assign(s.ToString());
}
return value;
}
};
TEST_F(TimestampCompatibleCompactionTest, UserKeyCrossFileBoundary) {
Options options = CurrentOptions();
options.env = env_;
options.compaction_style = kCompactionStyleLevel;
options.comparator = test::ComparatorWithU64Ts();
options.level0_file_num_compaction_trigger = 3;
constexpr size_t kNumKeysPerFile = 101;
options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
DestroyAndReopen(options);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
const auto* compaction = reinterpret_cast<Compaction*>(arg);
ASSERT_NE(nullptr, compaction);
ASSERT_EQ(0, compaction->start_level());
ASSERT_EQ(1, compaction->num_input_levels());
// Check that all 3 L0 ssts are picked for level compaction.
ASSERT_EQ(3, compaction->num_input_files(0));
});
SyncPoint::GetInstance()->EnableProcessing();
// Write a L0 with keys 0, 1, ..., 99 with ts from 100 to 199.
uint64_t ts = 100;
uint64_t key = 0;
WriteOptions write_opts;
for (; key < kNumKeysPerFile - 1; ++key, ++ts) {
std::string ts_str = Timestamp(ts);
Slice ts_slice = ts_str;
write_opts.timestamp = &ts_slice;
ASSERT_OK(db_->Put(write_opts, Key1(key), "foo_" + std::to_string(key)));
}
// Write another L0 with keys 99 with newer ts.
ASSERT_OK(Flush());
uint64_t saved_read_ts1 = ts++;
key = 99;
for (int i = 0; i < 4; ++i, ++ts) {
std::string ts_str = Timestamp(ts);
Slice ts_slice = ts_str;
write_opts.timestamp = &ts_slice;
ASSERT_OK(db_->Put(write_opts, Key1(key), "bar_" + std::to_string(key)));
}
ASSERT_OK(Flush());
uint64_t saved_read_ts2 = ts++;
// Write another L0 with keys 99, 100, 101, ..., 150
for (; key <= 150; ++key, ++ts) {
std::string ts_str = Timestamp(ts);
Slice ts_slice = ts_str;
write_opts.timestamp = &ts_slice;
ASSERT_OK(db_->Put(write_opts, Key1(key), "foo1_" + std::to_string(key)));
}
ASSERT_OK(Flush());
// Wait for compaction to finish
ASSERT_OK(dbfull()->TEST_WaitForCompact());
uint64_t read_ts = ts;
ASSERT_EQ("foo_99", Get(Key1(99), saved_read_ts1));
ASSERT_EQ("bar_99", Get(Key1(99), saved_read_ts2));
ASSERT_EQ("foo1_99", Get(Key1(99), read_ts));
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -990,9 +990,9 @@ class LevelIterator final : public InternalIterator {
if (read_options_.iterate_lower_bound != nullptr &&
file_index_ < flevel_->num_files) {
may_be_out_of_lower_bound_ =
user_comparator_.Compare(
ExtractUserKey(file_smallest_key(file_index_)),
*read_options_.iterate_lower_bound) < 0;
user_comparator_.CompareWithoutTimestamp(
ExtractUserKey(file_smallest_key(file_index_)), /*a_has_ts=*/true,
*read_options_.iterate_lower_bound, /*b_has_ts=*/false) < 0;
}
}

@ -96,7 +96,7 @@ Options GetOptionsWithNumLevels(int num_levels,
return opt;
}
class VersionStorageInfoTest : public testing::Test {
class VersionStorageInfoTestBase : public testing::Test {
public:
const Comparator* ucmp_;
InternalKeyComparator icmp_;
@ -111,17 +111,19 @@ class VersionStorageInfoTest : public testing::Test {
return InternalKey(ukey, smallest_seq, kTypeValue);
}
VersionStorageInfoTest()
: ucmp_(BytewiseComparator()),
explicit VersionStorageInfoTestBase(const Comparator* ucmp)
: ucmp_(ucmp),
icmp_(ucmp_),
logger_(new CountingLogger()),
options_(GetOptionsWithNumLevels(6, logger_)),
ioptions_(options_),
mutable_cf_options_(options_),
vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel, nullptr, false) {}
vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel,
/*src_vstorage=*/nullptr,
/*_force_consistency_checks=*/false) {}
~VersionStorageInfoTest() override {
for (int i = 0; i < vstorage_.num_levels(); i++) {
~VersionStorageInfoTestBase() override {
for (int i = 0; i < vstorage_.num_levels(); ++i) {
for (auto* f : vstorage_.LevelFiles(i)) {
if (--f->refs == 0) {
delete f;
@ -172,6 +174,13 @@ class VersionStorageInfoTest : public testing::Test {
}
};
class VersionStorageInfoTest : public VersionStorageInfoTestBase {
public:
VersionStorageInfoTest() : VersionStorageInfoTestBase(BytewiseComparator()) {}
~VersionStorageInfoTest() override {}
};
TEST_F(VersionStorageInfoTest, MaxBytesForLevelStatic) {
ioptions_.level_compaction_dynamic_level_bytes = false;
mutable_cf_options_.max_bytes_for_level_base = 10;
@ -412,6 +421,54 @@ TEST_F(VersionStorageInfoTest, GetOverlappingInputs) {
1, {"i", 0, kTypeValue}, {"j", 0, kTypeValue}));
}
class VersionStorageInfoTimestampTest : public VersionStorageInfoTestBase {
public:
VersionStorageInfoTimestampTest()
: VersionStorageInfoTestBase(test::ComparatorWithU64Ts()) {}
~VersionStorageInfoTimestampTest() override {}
std::string Timestamp(uint64_t ts) const {
std::string ret;
PutFixed64(&ret, ts);
return ret;
}
std::string PackUserKeyAndTimestamp(const Slice& ukey, uint64_t ts) const {
std::string ret;
ret.assign(ukey.data(), ukey.size());
PutFixed64(&ret, ts);
return ret;
}
};
TEST_F(VersionStorageInfoTimestampTest, GetOverlappingInputs) {
Add(/*level=*/1, /*file_number=*/1, /*smallest=*/
{PackUserKeyAndTimestamp("a", /*ts=*/9), /*s=*/0, kTypeValue},
/*largest=*/
{PackUserKeyAndTimestamp("a", /*ts=*/8), /*s=*/0, kTypeValue},
/*file_size=*/100);
Add(/*level=*/1, /*file_number=*/2, /*smallest=*/
{PackUserKeyAndTimestamp("a", /*ts=*/5), /*s=*/0, kTypeValue},
/*largest=*/
{PackUserKeyAndTimestamp("b", /*ts=*/10), /*s=*/0, kTypeValue},
/*file_size=*/100);
Add(/*level=*/1, /*file_number=*/3, /*smallest=*/
{PackUserKeyAndTimestamp("c", /*ts=*/12), /*s=*/0, kTypeValue},
/*largest=*/
{PackUserKeyAndTimestamp("d", /*ts=*/1), /*s=*/0, kTypeValue},
/*file_size=*/100);
vstorage_.UpdateNumNonEmptyLevels();
vstorage_.GenerateLevelFilesBrief();
ASSERT_EQ(
"1,2",
GetOverlappingFiles(
/*level=*/1,
{PackUserKeyAndTimestamp("a", /*ts=*/12), /*s=*/0, kTypeValue},
{PackUserKeyAndTimestamp("a", /*ts=*/11), /*s=*/0, kTypeValue}));
ASSERT_EQ("3",
GetOverlappingFiles(
/*level=*/1,
{PackUserKeyAndTimestamp("c", /*ts=*/15), /*s=*/0, kTypeValue},
{PackUserKeyAndTimestamp("c", /*ts=*/2), /*s=*/0, kTypeValue}));
}
class FindLevelFileTest : public testing::Test {
public:

@ -352,6 +352,7 @@ MAIN_SOURCES = \
db/db_logical_block_size_cache_test.cc \
db/db_universal_compaction_test.cc \
db/db_wal_test.cc \
db/db_with_timestamp_compaction_test.cc \
db/db_write_test.cc \
db/dbformat_test.cc \
db/deletefile_test.cc \

@ -118,6 +118,61 @@ class Uint64ComparatorImpl : public Comparator {
void FindShortSuccessor(std::string* /*key*/) const override { return; }
};
// A test implementation of comparator with 64-bit integer timestamp.
class ComparatorWithU64TsImpl : public Comparator {
public:
ComparatorWithU64TsImpl()
: Comparator(/*ts_sz=*/sizeof(uint64_t)),
cmp_without_ts_(BytewiseComparator()) {
assert(cmp_without_ts_);
assert(cmp_without_ts_->timestamp_size() == 0);
}
const char* Name() const override { return "ComparatorWithU64Ts"; }
void FindShortSuccessor(std::string*) const override {}
void FindShortestSeparator(std::string*, const Slice&) const override {}
int Compare(const Slice& a, const Slice& b) const override {
int ret = CompareWithoutTimestamp(a, b);
size_t ts_sz = timestamp_size();
if (ret != 0) {
return ret;
}
// Compare timestamp.
// For the same user key with different timestamps, larger (newer) timestamp
// comes first.
return -CompareTimestamp(ExtractTimestampFromUserKey(a, ts_sz),
ExtractTimestampFromUserKey(b, ts_sz));
}
using Comparator::CompareWithoutTimestamp;
int CompareWithoutTimestamp(const Slice& a, bool a_has_ts, const Slice& b,
bool b_has_ts) const override {
const size_t ts_sz = timestamp_size();
assert(!a_has_ts || a.size() >= ts_sz);
assert(!b_has_ts || b.size() >= ts_sz);
Slice lhs = a_has_ts ? StripTimestampFromUserKey(a, ts_sz) : a;
Slice rhs = b_has_ts ? StripTimestampFromUserKey(b, ts_sz) : b;
return cmp_without_ts_->Compare(lhs, rhs);
}
int CompareTimestamp(const Slice& ts1, const Slice& ts2) const override {
size_t ts_sz = timestamp_size();
assert(ts1.size() == ts_sz);
assert(ts2.size() == ts_sz);
assert(ts_sz == sizeof(uint64_t));
uint64_t lhs = DecodeFixed64(ts1.data());
uint64_t rhs = DecodeFixed64(ts2.data());
if (lhs < rhs) {
return -1;
} else if (lhs > rhs) {
return 1;
} else {
return 0;
}
}
private:
const Comparator* cmp_without_ts_{nullptr};
};
} // namespace
const Comparator* Uint64Comparator() {
@ -125,6 +180,11 @@ const Comparator* Uint64Comparator() {
return &uint64comp;
}
const Comparator* ComparatorWithU64Ts() {
static ComparatorWithU64TsImpl comp_with_u64_ts;
return &comp_with_u64_ts;
}
WritableFileWriter* GetWritableFileWriter(WritableFile* wf,
const std::string& fname) {
std::unique_ptr<WritableFile> file(wf);

@ -777,6 +777,8 @@ class ChanglingCompactionFilterFactory : public CompactionFilterFactory {
std::string name_;
};
extern const Comparator* ComparatorWithU64Ts();
CompressionType RandomCompressionType(Random* rnd);
void RandomCompressionTypeVector(const size_t count,

Loading…
Cancel
Save