Add full_history_ts_low option to compaction (#7884)

Summary:
The full_history_ts_low is used for user-defined timestamp GC
compaction, which is introduced in https://github.com/facebook/rocksdb/issues/7740, https://github.com/facebook/rocksdb/issues/7657 and https://github.com/facebook/rocksdb/issues/7655.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7884

Reviewed By: ltamasi

Differential Revision: D25982553

Pulled By: jay-zhuang

fbshipit-source-id: 36303d412d65b5d8166b6da24fa21ad85adbabee
main
Jay Zhuang 4 years ago committed by Facebook GitHub Bot
parent 974458891c
commit cf160b98e1
  1. 1
      HISTORY.md
  2. 2
      db/db_impl/db_impl.h
  3. 35
      db/db_impl/db_impl_compaction_flush.cc
  4. 105
      db/db_with_timestamp_basic_test.cc
  5. 3
      include/rocksdb/options.h

@ -5,6 +5,7 @@
### New Features
* Add support for key-value integrity protection in live updates from the user buffers provided to `WriteBatch` through the write to RocksDB's in-memory update buffer (memtable). This is intended to detect some cases of in-memory data corruption, due to either software or hardware errors. Users can enable protection by constructing their `WriteBatch` with `protection_bytes_per_key == 8`.
* Add support for updating `full_history_ts_low` option in manual compaction, which is for old timestamp data GC.
### Bug Fixes
* Since 6.15.0, `TransactionDB` returns error `Status`es from calls to `DeleteRange()` and calls to `Write()` where the `WriteBatch` contains a range deletion. Previously such operations may have succeeded while not providing the expected transactional guarantees. There are certain cases where range deletion can still be used on such DBs; see the API doc on `TransactionDB::DeleteRange()` for details.

@ -1871,6 +1871,8 @@ class DBImpl : public DB {
Status DisableFileDeletionsWithLock();
Status IncreaseFullHistoryTsLow(ColumnFamilyData* cfd, std::string ts_low);
// table_cache_ provides its own synchronization
std::shared_ptr<Cache> table_cache_;

@ -806,6 +806,25 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
end_with_ts);
}
Status DBImpl::IncreaseFullHistoryTsLow(ColumnFamilyData* cfd,
std::string ts_low) {
VersionEdit edit;
edit.SetColumnFamily(cfd->GetID());
edit.SetFullHistoryTsLow(ts_low);
InstrumentedMutexLock l(&mutex_);
std::string current_ts_low = cfd->GetFullHistoryTsLow();
const Comparator* ucmp = cfd->user_comparator();
if (!current_ts_low.empty() &&
ucmp->CompareTimestamp(ts_low, current_ts_low) < 0) {
return Status::InvalidArgument(
"Cannot decrease full_history_timestamp_low");
}
return versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit,
&mutex_);
}
Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) {
@ -817,6 +836,22 @@ Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
}
bool flush_needed = true;
// Update full_history_ts_low if it's set
if (options.full_history_ts_low != nullptr &&
!options.full_history_ts_low->empty()) {
std::string ts_low = options.full_history_ts_low->ToString();
if (begin != nullptr || end != nullptr) {
return Status::InvalidArgument(
"Cannot specify compaction range with full_history_ts_low");
}
Status s = IncreaseFullHistoryTsLow(cfd, ts_low);
if (!s.ok()) {
LogFlush(immutable_db_options_.info_log);
return s;
}
}
Status s;
if (begin != nullptr && end != nullptr) {
// TODO(ajkr): We could also optimize away the flush in certain cases where

@ -223,6 +223,111 @@ TEST_F(DBBasicTestWithTimestamp, CompactRangeWithSpecifiedRange) {
Close();
}
TEST_F(DBBasicTestWithTimestamp, UpdateFullHistoryTsLow) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);
const std::string kKey = "test kKey";
// Test set ts_low first and flush()
int current_ts_low = 5;
std::string ts_low_str = Timestamp(current_ts_low, 0);
Slice ts_low = ts_low_str;
CompactRangeOptions comp_opts;
comp_opts.full_history_ts_low = &ts_low;
comp_opts.bottommost_level_compaction = BottommostLevelCompaction::kForce;
ASSERT_OK(db_->CompactRange(comp_opts, nullptr, nullptr));
auto* cfd =
static_cast_with_check<ColumnFamilyHandleImpl>(db_->DefaultColumnFamily())
->cfd();
auto result_ts_low = cfd->GetFullHistoryTsLow();
ASSERT_TRUE(test_cmp.CompareTimestamp(ts_low, result_ts_low) == 0);
for (int i = 0; i < 10; i++) {
WriteOptions write_opts;
std::string ts_str = Timestamp(i, 0);
Slice ts = ts_str;
write_opts.timestamp = &ts;
ASSERT_OK(db_->Put(write_opts, kKey, Key(i)));
}
ASSERT_OK(Flush());
for (int i = 0; i < 10; i++) {
ReadOptions read_opts;
std::string ts_str = Timestamp(i, 0);
Slice ts = ts_str;
read_opts.timestamp = &ts;
std::string value;
Status status = db_->Get(read_opts, kKey, &value);
if (i < current_ts_low) {
ASSERT_TRUE(status.IsNotFound());
} else {
ASSERT_OK(status);
ASSERT_TRUE(value.compare(Key(i)) == 0);
}
}
// Test set ts_low and then trigger compaction
for (int i = 10; i < 20; i++) {
WriteOptions write_opts;
std::string ts_str = Timestamp(i, 0);
Slice ts = ts_str;
write_opts.timestamp = &ts;
ASSERT_OK(db_->Put(write_opts, kKey, Key(i)));
}
ASSERT_OK(Flush());
current_ts_low = 15;
ts_low_str = Timestamp(current_ts_low, 0);
ts_low = ts_low_str;
comp_opts.full_history_ts_low = &ts_low;
ASSERT_OK(db_->CompactRange(comp_opts, nullptr, nullptr));
result_ts_low = cfd->GetFullHistoryTsLow();
ASSERT_TRUE(test_cmp.CompareTimestamp(ts_low, result_ts_low) == 0);
for (int i = 0; i < 20; i++) {
ReadOptions read_opts;
std::string ts_str = Timestamp(i, 0);
Slice ts = ts_str;
read_opts.timestamp = &ts;
std::string value;
Status status = db_->Get(read_opts, kKey, &value);
if (i < current_ts_low) {
ASSERT_TRUE(status.IsNotFound());
} else {
ASSERT_OK(status);
ASSERT_TRUE(value.compare(Key(i)) == 0);
}
}
// Test invalid compaction with range
Slice start(kKey), end(kKey);
Status s = db_->CompactRange(comp_opts, &start, &end);
ASSERT_TRUE(s.IsInvalidArgument());
s = db_->CompactRange(comp_opts, &start, nullptr);
ASSERT_TRUE(s.IsInvalidArgument());
s = db_->CompactRange(comp_opts, nullptr, &end);
ASSERT_TRUE(s.IsInvalidArgument());
// Test invalid compaction with the decreasing ts_low
ts_low_str = Timestamp(current_ts_low - 1, 0);
ts_low = ts_low_str;
comp_opts.full_history_ts_low = &ts_low;
s = db_->CompactRange(comp_opts, nullptr, nullptr);
ASSERT_TRUE(s.IsInvalidArgument());
Close();
}
TEST_F(DBBasicTestWithTimestamp, GetApproximateSizes) {
Options options = CurrentOptions();
options.write_buffer_size = 100000000; // Large write buffer

@ -1580,6 +1580,9 @@ struct CompactRangeOptions {
bool allow_write_stall = false;
// If > 0, it will replace the option in the DBOptions for this compaction.
uint32_t max_subcompactions = 0;
// Set user-defined timestamp low bound, the data with older timestamp than
// low bound maybe GCed by compaction. Default: nullptr
Slice* full_history_ts_low = nullptr;
};
// IngestExternalFileOptions is used by IngestExternalFile()

Loading…
Cancel
Save