Make CompactRange and GetApproximateSizes work with timestamp (#7684)

Summary:
Add timestamp to the `CompactRange()` and `GetApproximateSizes` range keys if needed.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7684

Test Plan: make check

Reviewed By: riversand963

Differential Revision: D25015421

Pulled By: jay-zhuang

fbshipit-source-id: 51ca0756087eb053a3b11801e5c7ce1c6e2d38a9
main
Jay Zhuang 4 years ago committed by Facebook GitHub Bot
parent e062a719cc
commit 7fec715db4
  1. 3
      HISTORY.md
  2. 22
      db/db_impl/db_impl.cc
  3. 9
      db/db_impl/db_impl.h
  4. 36
      db/db_impl/db_impl_compaction_flush.cc
  5. 119
      db/db_with_timestamp_basic_test.cc
  6. 16
      db/dbformat.cc
  7. 8
      db/dbformat.h
  8. 3
      include/rocksdb/db.h

@ -10,6 +10,9 @@
* Fixed the logic of populating native data structure for `read_amp_bytes_per_bit` during OPTIONS file parsing on big-endian architecture. Without this fix, original code introduced in PR7659, when running on big-endian machine, can mistakenly store read_amp_bytes_per_bit (an uint32) in little endian format. Future access to `read_amp_bytes_per_bit` will give wrong values. Little endian architecture is not affected.
* Fixed prefix extractor with timestamp issues.
### New Features
* User defined timestamp feature supports `CompactRange` and `GetApproximateSizes`.
## 6.15.0 (11/13/2020)
### Bug Fixes
* Fixed a bug in the following combination of features: indexes with user keys (`format_version >= 3`), indexes are partitioned (`index_type == kTwoLevelIndexSearch`), and some index partitions are pinned in memory (`BlockBasedTableOptions::pin_l0_filter_and_index_blocks_in_cache`). The bug could cause keys to be truncated when read from the index leading to wrong read results or other unexpected behavior.

@ -3401,6 +3401,10 @@ Status DBImpl::GetApproximateSizes(const SizeApproximationOptions& options,
return Status::InvalidArgument("Invalid options");
}
const Comparator* const ucmp = column_family->GetComparator();
assert(ucmp);
size_t ts_sz = ucmp->timestamp_size();
Version* v;
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
auto cfd = cfh->cfd();
@ -3408,9 +3412,23 @@ Status DBImpl::GetApproximateSizes(const SizeApproximationOptions& options,
v = sv->current;
for (int i = 0; i < n; i++) {
Slice start = range[i].start;
Slice limit = range[i].limit;
// Add timestamp if needed
std::string start_with_ts, limit_with_ts;
if (ts_sz > 0) {
// Maximum timestamp means including all key with any timestamp
AppendKeyWithMaxTimestamp(&start_with_ts, start, ts_sz);
// Append a maximum timestamp as the range limit is exclusive:
// [start, limit)
AppendKeyWithMaxTimestamp(&limit_with_ts, limit, ts_sz);
start = start_with_ts;
limit = limit_with_ts;
}
// Convert user_key into a corresponding internal key.
InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
InternalKey k1(start, kMaxSequenceNumber, kValueTypeForSeek);
InternalKey k2(limit, kMaxSequenceNumber, kValueTypeForSeek);
sizes[i] = 0;
if (options.include_files) {
sizes[i] += versions_->ApproximateSize(

@ -1110,6 +1110,15 @@ class DBImpl : public DB {
// If need_enter_write_thread = false, the method will enter write thread.
Status WriteOptionsFile(bool need_mutex_lock, bool need_enter_write_thread);
Status CompactRangeInternal(const CompactRangeOptions& options,
ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end);
Status GetApproximateSizesInternal(const SizeApproximationOptions& options,
ColumnFamilyHandle* column_family,
const Range* range, int n,
uint64_t* sizes);
// The following two functions can only be called when:
// 1. WriteThread::Writer::EnterUnbatched() is used.
// 2. db_mutex is NOT held

@ -773,7 +773,41 @@ void DBImpl::NotifyOnFlushCompleted(
Status DBImpl::CompactRange(const CompactRangeOptions& options,
ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) {
const Slice* begin_without_ts,
const Slice* end_without_ts) {
const Comparator* const ucmp = column_family->GetComparator();
assert(ucmp);
size_t ts_sz = ucmp->timestamp_size();
if (ts_sz == 0) {
return CompactRangeInternal(options, column_family, begin_without_ts,
end_without_ts);
}
std::string begin_str;
std::string end_str;
// CompactRange compact all keys: [begin, end] inclusively. Add maximum
// timestamp to include all `begin` keys, and add minimal timestamp to include
// all `end` keys.
if (begin_without_ts != nullptr) {
AppendKeyWithMaxTimestamp(&begin_str, *begin_without_ts, ts_sz);
}
if (end_without_ts != nullptr) {
AppendKeyWithMinTimestamp(&end_str, *end_without_ts, ts_sz);
}
Slice begin(begin_str);
Slice end(end_str);
Slice* begin_with_ts = begin_without_ts ? &begin : nullptr;
Slice* end_with_ts = end_without_ts ? &end : nullptr;
return CompactRangeInternal(options, column_family, begin_with_ts,
end_with_ts);
}
Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) {
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
auto cfd = cfh->cfd();

@ -195,6 +195,113 @@ class DBBasicTestWithTimestamp : public DBBasicTestWithTimestampBase {
: DBBasicTestWithTimestampBase("db_basic_test_with_timestamp") {}
};
TEST_F(DBBasicTestWithTimestamp, CompactRangeWithSpecifiedRange) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);
WriteOptions write_opts;
std::string ts_str = Timestamp(1, 0);
Slice ts = ts_str;
write_opts.timestamp = &ts;
ASSERT_OK(db_->Put(write_opts, "foo1", "bar"));
ASSERT_OK(Flush());
ASSERT_OK(db_->Put(write_opts, "foo2", "bar"));
ASSERT_OK(Flush());
std::string start_str = "foo";
std::string end_str = "foo2";
Slice start(start_str), end(end_str);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
Close();
}
TEST_F(DBBasicTestWithTimestamp, GetApproximateSizes) {
Options options = CurrentOptions();
options.write_buffer_size = 100000000; // Large write buffer
options.compression = kNoCompression;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);
auto default_cf = db_->DefaultColumnFamily();
WriteOptions write_opts;
std::string ts_str = Timestamp(1, 0);
Slice ts = ts_str;
write_opts.timestamp = &ts;
const int N = 128;
Random rnd(301);
for (int i = 0; i < N; i++) {
ASSERT_OK(db_->Put(write_opts, Key(i), rnd.RandomString(1024)));
}
uint64_t size;
std::string start = Key(50);
std::string end = Key(60);
Range r(start, end);
SizeApproximationOptions size_approx_options;
size_approx_options.include_memtabtles = true;
size_approx_options.include_files = true;
ASSERT_OK(
db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size));
ASSERT_GT(size, 6000);
ASSERT_LT(size, 204800);
// test multiple ranges
std::vector<Range> ranges;
std::string start_tmp = Key(10);
std::string end_tmp = Key(20);
ranges.emplace_back(Range(start_tmp, end_tmp));
ranges.emplace_back(Range(start, end));
uint64_t range_sizes[2];
ASSERT_OK(db_->GetApproximateSizes(size_approx_options, default_cf,
ranges.data(), 2, range_sizes));
ASSERT_EQ(range_sizes[1], size);
// Zero if not including mem table
db_->GetApproximateSizes(&r, 1, &size);
ASSERT_EQ(size, 0);
start = Key(500);
end = Key(600);
r = Range(start, end);
ASSERT_OK(
db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size));
ASSERT_EQ(size, 0);
// Test range boundaries
ASSERT_OK(db_->Put(write_opts, Key(1000), rnd.RandomString(1024)));
// Should include start key
start = Key(1000);
end = Key(1100);
r = Range(start, end);
ASSERT_OK(
db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size));
ASSERT_GT(size, 0);
// Should exclude end key
start = Key(900);
end = Key(1000);
r = Range(start, end);
ASSERT_OK(
db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size));
ASSERT_EQ(size, 0);
std::cout << size << std::endl;
Close();
}
TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) {
const int kNumKeysPerFile = 128;
const uint64_t kMaxKey = 1024;
@ -283,16 +390,16 @@ TEST_F(DBBasicTestWithTimestamp, SeekWithPrefixLessThanKey) {
write_opts.timestamp = &ts;
ASSERT_OK(db_->Put(write_opts, "foo1", "bar"));
Flush();
ASSERT_OK(Flush());
ASSERT_OK(db_->Put(write_opts, "foo2", "bar"));
Flush();
ASSERT_OK(Flush());
// Move sst file to next level
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_OK(db_->Put(write_opts, "foo3", "bar"));
Flush();
ASSERT_OK(Flush());
ReadOptions read_opts;
std::string read_ts = Timestamp(1, 0);
@ -338,16 +445,16 @@ TEST_F(DBBasicTestWithTimestamp, SeekWithPrefixLargerThanKey) {
write_opts.timestamp = &ts;
ASSERT_OK(db_->Put(write_opts, "foo1", "bar"));
Flush();
ASSERT_OK(Flush());
ASSERT_OK(db_->Put(write_opts, "foo2", "bar"));
Flush();
ASSERT_OK(Flush());
// Move sst file to next level
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_OK(db_->Put(write_opts, "foo3", "bar"));
Flush();
ASSERT_OK(Flush());
ReadOptions read_opts;
std::string read_ts = Timestamp(2, 0);

@ -78,6 +78,22 @@ void AppendInternalKeyFooter(std::string* result, SequenceNumber s,
PutFixed64(result, PackSequenceAndType(s, t));
}
void AppendKeyWithMinTimestamp(std::string* result, const Slice& key,
size_t ts_sz) {
assert(ts_sz > 0);
const std::string kTsMin(ts_sz, static_cast<unsigned char>(0));
result->append(key.data(), key.size());
result->append(kTsMin.data(), ts_sz);
}
void AppendKeyWithMaxTimestamp(std::string* result, const Slice& key,
size_t ts_sz) {
assert(ts_sz > 0);
const std::string kTsMax(ts_sz, static_cast<unsigned char>(0xff));
result->append(key.data(), key.size());
result->append(kTsMax.data(), ts_sz);
}
std::string ParsedInternalKey::DebugString(bool log_err_key, bool hex) const {
std::string result = "'";
if (log_err_key) {

@ -167,6 +167,14 @@ extern void AppendInternalKeyWithDifferentTimestamp(
extern void AppendInternalKeyFooter(std::string* result, SequenceNumber s,
ValueType t);
// Append the key and a minimal timestamp to *result
extern void AppendKeyWithMinTimestamp(std::string* result, const Slice& key,
size_t ts_sz);
// Append the key and a maximal timestamp to *result
extern void AppendKeyWithMaxTimestamp(std::string* result, const Slice& key,
size_t ts_sz);
// Attempt to parse an internal key from "internal_key". On success,
// stores the parsed data in "*result", and returns true.
//

@ -1032,7 +1032,8 @@ class DB {
(include_flags & SizeApproximationFlags::INCLUDE_MEMTABLES) != 0;
options.include_files =
(include_flags & SizeApproximationFlags::INCLUDE_FILES) != 0;
GetApproximateSizes(options, column_family, ranges, n, sizes);
Status s = GetApproximateSizes(options, column_family, ranges, n, sizes);
s.PermitUncheckedError();
}
virtual void GetApproximateSizes(const Range* ranges, int n, uint64_t* sizes,
uint8_t include_flags = INCLUDE_FILES) {

Loading…
Cancel
Save