Add OpenAndTrimHistory API to support trimming data with specified timestamp (#9410)

Summary:
As disscussed in (https://github.com/facebook/rocksdb/issues/9223), Here added a new API  named DB::OpenAndTrimHistory, this API will open DB and trim data to the timestamp specofied by **trim_ts** (The data with newer timestamp than specified trim bound will be removed). This API should only be used at a timestamp-enabled db instance recovery.

And this PR implemented a new iterator named HistoryTrimmingIterator to support trimming history with a new API named DB::OpenAndTrimHistory. HistoryTrimmingIterator wrapped around the underlying InternalITerator such that keys whose timestamps newer than **trim_ts** should not be returned to the compaction iterator while **trim_ts** is not null.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9410

Reviewed By: ltamasi

Differential Revision: D34410207

Pulled By: riversand963

fbshipit-source-id: e54049dc234eccd673244c566b15df58df5a6236
main
slk 3 years ago committed by Facebook GitHub Bot
parent e4c87773e1
commit 95305c44a1
  1. 3
      HISTORY.md
  2. 4
      db/column_family.cc
  3. 3
      db/column_family.h
  4. 5
      db/compaction/compaction.cc
  5. 9
      db/compaction/compaction.h
  6. 22
      db/compaction/compaction_job.cc
  7. 3
      db/compaction/compaction_job.h
  8. 7
      db/compaction/compaction_picker.cc
  9. 5
      db/compaction/compaction_picker.h
  10. 12
      db/compaction/compaction_picker_fifo.cc
  11. 2
      db/compaction/compaction_picker_fifo.h
  12. 3
      db/compaction/compaction_picker_level.cc
  13. 7
      db/compaction/compaction_picker_test.cc
  14. 11
      db/compaction/compaction_picker_universal.cc
  15. 6
      db/db_impl/db_impl.h
  16. 24
      db/db_impl/db_impl_compaction_flush.cc
  17. 3
      db/db_impl/db_impl_debug.cc
  18. 66
      db/db_impl/db_impl_open.cc
  19. 2
      db/db_range_del_test.cc
  20. 71
      db/db_with_timestamp_basic_test.cc
  21. 7
      db/dbformat.h
  22. 91
      db/history_trimming_iterator.h
  23. 13
      include/rocksdb/db.h

@ -1,5 +1,8 @@
# Rocksdb Change Log
## Unreleased
### Public API changes
* Add DB::OpenAndTrimHistory API. This API will open DB and trim data to the timestamp specified by trim_ts (The data with timestamp larger than specified trim bound will be removed). This API should only be used at a timestamp-enabled column families recovery. If the column family doesn't have timestamp enabled, this API won't trim any data on that column family. This API is not compatible with avoid_flush_during_recovery option.
### New Features
* Allow WriteBatchWithIndex to index a WriteBatch that includes keys with user-defined timestamps. The index itself does not have timestamp.
* Add support for user-defined timestamps to write-committed transaction without API change. The `TransactionDB` layer APIs do not allow timestamps because we require that all user-defined-timestamps-aware operations go through the `Transaction` APIs.

@ -1164,12 +1164,12 @@ Compaction* ColumnFamilyData::CompactRange(
int output_level, const CompactRangeOptions& compact_range_options,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end, bool* conflict,
uint64_t max_file_num_to_ignore) {
uint64_t max_file_num_to_ignore, const std::string& trim_ts) {
auto* result = compaction_picker_->CompactRange(
GetName(), mutable_cf_options, mutable_db_options,
current_->storage_info(), input_level, output_level,
compact_range_options, begin, end, compaction_end, conflict,
max_file_num_to_ignore);
max_file_num_to_ignore, trim_ts);
if (result != nullptr) {
result->SetInputVersion(current_);
}

@ -413,7 +413,8 @@ class ColumnFamilyData {
const CompactRangeOptions& compact_range_options,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end, bool* manual_conflict,
uint64_t max_file_num_to_ignore);
uint64_t max_file_num_to_ignore,
const std::string& trim_ts);
CompactionPicker* compaction_picker() { return compaction_picker_.get(); }
// thread-safe

@ -213,8 +213,8 @@ Compaction::Compaction(
uint32_t _output_path_id, CompressionType _compression,
CompressionOptions _compression_opts, Temperature _output_temperature,
uint32_t _max_subcompactions, std::vector<FileMetaData*> _grandparents,
bool _manual_compaction, double _score, bool _deletion_compaction,
CompactionReason _compaction_reason)
bool _manual_compaction, const std::string& _trim_ts, double _score,
bool _deletion_compaction, CompactionReason _compaction_reason)
: input_vstorage_(vstorage),
start_level_(_inputs[0].level),
output_level_(_output_level),
@ -237,6 +237,7 @@ Compaction::Compaction(
bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)),
is_full_compaction_(IsFullCompaction(vstorage, inputs_)),
is_manual_compaction_(_manual_compaction),
trim_ts_(_trim_ts),
is_trivial_move_(false),
compaction_reason_(_compaction_reason),
notify_on_compaction_completion_(false) {

@ -79,8 +79,8 @@ class Compaction {
CompressionOptions compression_opts,
Temperature output_temperature, uint32_t max_subcompactions,
std::vector<FileMetaData*> grandparents,
bool manual_compaction = false, double score = -1,
bool deletion_compaction = false,
bool manual_compaction = false, const std::string& trim_ts = "",
double score = -1, bool deletion_compaction = false,
CompactionReason compaction_reason = CompactionReason::kUnknown);
// No copying allowed
@ -208,6 +208,8 @@ class Compaction {
// Was this compaction triggered manually by the client?
bool is_manual_compaction() const { return is_manual_compaction_; }
std::string trim_ts() const { return trim_ts_; }
// Used when allow_trivial_move option is set in
// Universal compaction. If all the input files are
// non overlapping, then is_trivial_move_ variable
@ -385,6 +387,9 @@ class Compaction {
// Is this compaction requested by the client?
const bool is_manual_compaction_;
// The data with timestamp > trim_ts_ will be removed
const std::string trim_ts_;
// True if we can do trivial move in Universal multi level
// compaction
bool is_trivial_move_;

@ -31,6 +31,7 @@
#include "db/dbformat.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/history_trimming_iterator.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
@ -429,7 +430,8 @@ CompactionJob::CompactionJob(
const std::atomic<int>* manual_compaction_paused,
const std::atomic<bool>* manual_compaction_canceled,
const std::string& db_id, const std::string& db_session_id,
std::string full_history_ts_low, BlobFileCompletionCallback* blob_callback)
std::string full_history_ts_low, std::string trim_ts,
BlobFileCompletionCallback* blob_callback)
: compact_(new CompactionState(compaction)),
compaction_stats_(compaction->compaction_reason(), 1),
db_options_(db_options),
@ -468,6 +470,7 @@ CompactionJob::CompactionJob(
measure_io_stats_(measure_io_stats),
thread_pri_(thread_pri),
full_history_ts_low_(std::move(full_history_ts_low)),
trim_ts_(std::move(trim_ts)),
blob_callback_(blob_callback) {
assert(compaction_job_stats_ != nullptr);
assert(log_buffer_ != nullptr);
@ -1380,21 +1383,28 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
std::unique_ptr<InternalIterator> clip;
if (start || end) {
clip.reset(new ClippingIterator(
clip = std::make_unique<ClippingIterator>(
raw_input.get(), start ? &start_slice : nullptr,
end ? &end_slice : nullptr, &cfd->internal_comparator()));
end ? &end_slice : nullptr, &cfd->internal_comparator());
input = clip.get();
}
std::unique_ptr<InternalIterator> blob_counter;
if (sub_compact->compaction->DoesInputReferenceBlobFiles()) {
sub_compact->blob_garbage_meter.reset(new BlobGarbageMeter);
blob_counter.reset(
new BlobCountingIterator(input, sub_compact->blob_garbage_meter.get()));
sub_compact->blob_garbage_meter = std::make_unique<BlobGarbageMeter>();
blob_counter = std::make_unique<BlobCountingIterator>(
input, sub_compact->blob_garbage_meter.get());
input = blob_counter.get();
}
std::unique_ptr<InternalIterator> trim_history_iter;
if (cfd->user_comparator()->timestamp_size() > 0 && !trim_ts_.empty()) {
trim_history_iter = std::make_unique<HistoryTrimmingIterator>(
input, cfd->user_comparator(), trim_ts_);
input = trim_history_iter.get();
}
input->SeekToFirst();
AutoThreadOperationStageUpdater stage_updater(

@ -82,7 +82,7 @@ class CompactionJob {
const std::atomic<int>* manual_compaction_paused = nullptr,
const std::atomic<bool>* manual_compaction_canceled = nullptr,
const std::string& db_id = "", const std::string& db_session_id = "",
std::string full_history_ts_low = "",
std::string full_history_ts_low = "", std::string trim_ts = "",
BlobFileCompletionCallback* blob_callback = nullptr);
virtual ~CompactionJob();
@ -226,6 +226,7 @@ class CompactionJob {
std::vector<uint64_t> sizes_;
Env::Priority thread_pri_;
std::string full_history_ts_low_;
std::string trim_ts_;
BlobFileCompletionCallback* blob_callback_;
uint64_t GetCompactionId(SubcompactionState* sub_compact);

@ -571,7 +571,7 @@ Compaction* CompactionPicker::CompactRange(
int input_level, int output_level,
const CompactRangeOptions& compact_range_options, const InternalKey* begin,
const InternalKey* end, InternalKey** compaction_end, bool* manual_conflict,
uint64_t max_file_num_to_ignore) {
uint64_t max_file_num_to_ignore, const std::string& trim_ts) {
// CompactionPickerFIFO has its own implementation of compact range
assert(ioptions_.compaction_style != kCompactionStyleFIFO);
@ -640,8 +640,7 @@ Compaction* CompactionPicker::CompactRange(
GetCompressionType(vstorage, mutable_cf_options, output_level, 1),
GetCompressionOptions(mutable_cf_options, vstorage, output_level),
Temperature::kUnknown, compact_range_options.max_subcompactions,
/* grandparents */ {},
/* is manual */ true);
/* grandparents */ {}, /* is manual */ true, trim_ts);
RegisterCompaction(c);
vstorage->ComputeCompactionScore(ioptions_, mutable_cf_options);
return c;
@ -820,7 +819,7 @@ Compaction* CompactionPicker::CompactRange(
GetCompressionOptions(mutable_cf_options, vstorage, output_level),
Temperature::kUnknown, compact_range_options.max_subcompactions,
std::move(grandparents),
/* is manual compaction */ true);
/* is manual compaction */ true, trim_ts);
TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction);
RegisterCompaction(compaction);

@ -78,7 +78,7 @@ class CompactionPicker {
const CompactRangeOptions& compact_range_options,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end, bool* manual_conflict,
uint64_t max_file_num_to_ignore);
uint64_t max_file_num_to_ignore, const std::string& trim_ts);
// The maximum allowed output level. Default value is NumberLevels() - 1.
virtual int MaxOutputLevel() const { return NumberLevels() - 1; }
@ -270,7 +270,8 @@ class NullCompactionPicker : public CompactionPicker {
const InternalKey* /*end*/,
InternalKey** /*compaction_end*/,
bool* /*manual_conflict*/,
uint64_t /*max_file_num_to_ignore*/) override {
uint64_t /*max_file_num_to_ignore*/,
const std::string& /*trim_ts*/) override {
return nullptr;
}

@ -115,7 +115,7 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction(
std::move(inputs), 0, 0, 0, 0, kNoCompression,
mutable_cf_options.compression_opts, Temperature::kUnknown,
/* max_subcompactions */ 0, {}, /* is manual */ false,
vstorage->CompactionScore(0),
/* trim_ts */ "", vstorage->CompactionScore(0),
/* is deletion compaction */ true, CompactionReason::kFIFOTtl);
return c;
}
@ -157,8 +157,8 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
0 /* max compaction bytes, not applicable */,
0 /* output path ID */, mutable_cf_options.compression,
mutable_cf_options.compression_opts, Temperature::kUnknown,
0 /* max_subcompactions */, {},
/* is manual */ false, vstorage->CompactionScore(0),
0 /* max_subcompactions */, {}, /* is manual */ false,
/* trim_ts */ "", vstorage->CompactionScore(0),
/* is deletion compaction */ false,
CompactionReason::kFIFOReduceNumFiles);
return c;
@ -208,7 +208,7 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
std::move(inputs), 0, 0, 0, 0, kNoCompression,
mutable_cf_options.compression_opts, Temperature::kUnknown,
/* max_subcompactions */ 0, {}, /* is manual */ false,
vstorage->CompactionScore(0),
/* trim_ts */ "", vstorage->CompactionScore(0),
/* is deletion compaction */ true, CompactionReason::kFIFOMaxSize);
return c;
}
@ -313,7 +313,7 @@ Compaction* FIFOCompactionPicker::PickCompactionToWarm(
0 /* max compaction bytes, not applicable */, 0 /* output path ID */,
mutable_cf_options.compression, mutable_cf_options.compression_opts,
Temperature::kWarm,
/* max_subcompactions */ 0, {}, /* is manual */ false,
/* max_subcompactions */ 0, {}, /* is manual */ false, /* trim_ts */ "",
vstorage->CompactionScore(0),
/* is deletion compaction */ false, CompactionReason::kChangeTemperature);
return c;
@ -349,7 +349,7 @@ Compaction* FIFOCompactionPicker::CompactRange(
const CompactRangeOptions& /*compact_range_options*/,
const InternalKey* /*begin*/, const InternalKey* /*end*/,
InternalKey** compaction_end, bool* /*manual_conflict*/,
uint64_t /*max_file_num_to_ignore*/) {
uint64_t /*max_file_num_to_ignore*/, const std::string& /*trim_ts*/) {
#ifdef NDEBUG
(void)input_level;
(void)output_level;

@ -32,7 +32,7 @@ class FIFOCompactionPicker : public CompactionPicker {
const CompactRangeOptions& compact_range_options,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end, bool* manual_conflict,
uint64_t max_file_num_to_ignore) override;
uint64_t max_file_num_to_ignore, const std::string& trim_ts) override;
// The maximum allowed output level. Always returns 0.
virtual int MaxOutputLevel() const override { return 0; }

@ -348,7 +348,8 @@ Compaction* LevelCompactionBuilder::GetCompaction() {
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level_),
Temperature::kUnknown,
/* max_subcompactions */ 0, std::move(grandparents_), is_manual_,
start_level_score_, false /* deletion_compaction */, compaction_reason_);
/* trim_ts */ "", start_level_score_, false /* deletion_compaction */,
compaction_reason_);
// If it's level 0 compaction, make sure we don't execute any other level 0
// compactions in parallel

@ -2648,12 +2648,13 @@ TEST_F(CompactionPickerTest, UniversalMarkedManualCompaction) {
ASSERT_EQ(3U, vstorage_->FilesMarkedForCompaction().size());
bool manual_conflict = false;
InternalKey* manual_end = NULL;
InternalKey* manual_end = nullptr;
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.CompactRange(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
ColumnFamilyData::kCompactAllLevels, 6, CompactRangeOptions(), NULL,
NULL, &manual_end, &manual_conflict, port::kMaxUint64));
ColumnFamilyData::kCompactAllLevels, 6, CompactRangeOptions(),
nullptr, nullptr, &manual_end, &manual_conflict, port::kMaxUint64,
""));
ASSERT_TRUE(compaction);

@ -757,7 +757,7 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns(
start_level, enable_compression),
Temperature::kUnknown,
/* max_subcompactions */ 0, grandparents,
/* is manual */ false, score_,
/* is manual */ false, /* trim_ts */ "", score_,
false /* deletion_compaction */, compaction_reason);
}
@ -1082,7 +1082,7 @@ Compaction* UniversalCompactionBuilder::PickIncrementalForReduceSizeAmp(
true /* enable_compression */),
Temperature::kUnknown,
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
score_, false /* deletion_compaction */,
/* trim_ts */ "", score_, false /* deletion_compaction */,
CompactionReason::kUniversalSizeAmplification);
}
@ -1223,8 +1223,8 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
GetCompressionType(vstorage_, mutable_cf_options_, output_level, 1),
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level),
Temperature::kUnknown,
/* max_subcompactions */ 0, grandparents, /* is manual */ false, score_,
false /* deletion_compaction */,
/* max_subcompactions */ 0, grandparents, /* is manual */ false,
/* trim_ts */ "", score_, false /* deletion_compaction */,
CompactionReason::kFilesMarkedForCompaction);
}
@ -1299,7 +1299,8 @@ Compaction* UniversalCompactionBuilder::PickCompactionToOldest(
true /* enable_compression */),
Temperature::kUnknown,
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
score_, false /* deletion_compaction */, compaction_reason);
/* trim_ts */ "", score_, false /* deletion_compaction */,
compaction_reason);
}
Compaction* UniversalCompactionBuilder::PickPeriodicCompaction() {

@ -663,7 +663,8 @@ class DBImpl : public DB {
const CompactRangeOptions& compact_range_options,
const Slice* begin, const Slice* end,
bool exclusive, bool disallow_trivial_move,
uint64_t max_file_num_to_ignore);
uint64_t max_file_num_to_ignore,
const std::string& trim_ts);
// Return an internal iterator over the current state of the database.
// The keys of this iterator are internal keys (see format.h).
@ -1247,7 +1248,8 @@ class DBImpl : public DB {
Status CompactRangeInternal(const CompactRangeOptions& options,
ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end);
const Slice* begin, const Slice* end,
const std::string& trim_ts);
// The following two functions can only be called when:
// 1. WriteThread::Writer::EnterUnbatched() is used.

@ -927,7 +927,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
size_t ts_sz = ucmp->timestamp_size();
if (ts_sz == 0) {
return CompactRangeInternal(options, column_family, begin_without_ts,
end_without_ts);
end_without_ts, "" /*trim_ts*/);
}
std::string begin_str;
@ -949,7 +949,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
Slice* end_with_ts = end_without_ts ? &end : nullptr;
return CompactRangeInternal(options, column_family, begin_with_ts,
end_with_ts);
end_with_ts, "" /*trim_ts*/);
}
Status DBImpl::IncreaseFullHistoryTsLow(ColumnFamilyHandle* column_family,
@ -995,7 +995,8 @@ Status DBImpl::IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd,
Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) {
const Slice* begin, const Slice* end,
const std::string& trim_ts) {
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
auto cfd = cfh->cfd();
@ -1066,7 +1067,7 @@ Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
}
s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
final_output_level, options, begin, end, exclusive,
false, port::kMaxUint64);
false, port::kMaxUint64, trim_ts);
} else {
int first_overlapped_level = kInvalidLevel;
int max_overlapped_level = kInvalidLevel;
@ -1152,9 +1153,13 @@ Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
disallow_trivial_move = true;
}
}
// trim_ts need real compaction to remove latest record
if (!trim_ts.empty()) {
disallow_trivial_move = true;
}
s = RunManualCompaction(cfd, level, output_level, options, begin, end,
exclusive, disallow_trivial_move,
max_file_num_to_ignore);
max_file_num_to_ignore, trim_ts);
if (!s.ok()) {
break;
}
@ -1393,7 +1398,8 @@ Status DBImpl::CompactFilesImpl(
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, Env::Priority::USER, io_tracer_,
&manual_compaction_paused_, nullptr, db_id_, db_session_id_,
c->column_family_data()->GetFullHistoryTsLow(), &blob_callback_);
c->column_family_data()->GetFullHistoryTsLow(), c->trim_ts(),
&blob_callback_);
// Creating a compaction influences the compaction score because the score
// takes running compactions into account (by skipping files that are already
@ -1782,7 +1788,7 @@ Status DBImpl::RunManualCompaction(
ColumnFamilyData* cfd, int input_level, int output_level,
const CompactRangeOptions& compact_range_options, const Slice* begin,
const Slice* end, bool exclusive, bool disallow_trivial_move,
uint64_t max_file_num_to_ignore) {
uint64_t max_file_num_to_ignore, const std::string& trim_ts) {
assert(input_level == ColumnFamilyData::kCompactAllLevels ||
input_level >= 0);
@ -1900,7 +1906,7 @@ Status DBImpl::RunManualCompaction(
*manual.cfd->GetLatestMutableCFOptions(), mutable_db_options_,
manual.input_level, manual.output_level, compact_range_options,
manual.begin, manual.end, &manual.manual_end, &manual_conflict,
max_file_num_to_ignore)) == nullptr &&
max_file_num_to_ignore, trim_ts)) == nullptr &&
manual_conflict))) {
// exclusive manual compactions should not see a conflict during
// CompactRange
@ -3382,7 +3388,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
is_manual ? &manual_compaction_paused_ : nullptr,
is_manual ? manual_compaction->canceled : nullptr, db_id_,
db_session_id_, c->column_family_data()->GetFullHistoryTsLow(),
&blob_callback_);
c->trim_ts(), &blob_callback_);
compaction_job.Prepare();
NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,

@ -120,7 +120,8 @@ Status DBImpl::TEST_CompactRange(int level, const Slice* begin,
: level + 1;
return RunManualCompaction(cfd, level, output_level, CompactRangeOptions(),
begin, end, true, disallow_trivial_move,
port::kMaxUint64 /*max_file_num_to_ignore*/);
port::kMaxUint64 /*max_file_num_to_ignore*/,
"" /*trim_ts*/);
}
Status DBImpl::TEST_SwitchMemtable(ColumnFamilyData* cfd) {

@ -1546,6 +1546,72 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
!kSeqPerBatch, kBatchPerTxn);
}
// TODO: Implement the trimming in flush code path.
// TODO: Perform trimming before inserting into memtable during recovery.
// TODO: Pick files with max_timestamp > trim_ts by each file's timestamp meta
// info, and handle only these files to reduce io.
Status DB::OpenAndTrimHistory(
const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
std::string trim_ts) {
assert(dbptr != nullptr);
assert(handles != nullptr);
auto validate_options = [&db_options] {
if (db_options.avoid_flush_during_recovery) {
return Status::InvalidArgument(
"avoid_flush_during_recovery incompatible with "
"OpenAndTrimHistory");
}
return Status::OK();
};
auto s = validate_options();
if (!s.ok()) {
return s;
}
DB* db = nullptr;
s = DB::Open(db_options, dbname, column_families, handles, &db);
if (!s.ok()) {
return s;
}
assert(db);
CompactRangeOptions options;
options.bottommost_level_compaction =
BottommostLevelCompaction::kForceOptimized;
auto db_impl = static_cast_with_check<DBImpl>(db);
for (auto handle : *handles) {
assert(handle != nullptr);
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(handle);
auto cfd = cfh->cfd();
assert(cfd != nullptr);
// Only compact column families with timestamp enabled
if (cfd->user_comparator() != nullptr &&
cfd->user_comparator()->timestamp_size() > 0) {
s = db_impl->CompactRangeInternal(options, handle, nullptr, nullptr,
trim_ts);
if (!s.ok()) {
break;
}
}
}
auto clean_op = [&handles, &db] {
for (auto handle : *handles) {
auto temp_s = db->DestroyColumnFamilyHandle(handle);
assert(temp_s.ok());
}
handles->clear();
delete db;
};
if (!s.ok()) {
clean_op();
return s;
}
*dbptr = db;
return s;
}
IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
size_t preallocate_block_size,
log::Writer** new_log) {

@ -500,7 +500,7 @@ TEST_F(DBRangeDelTest, ValidUniversalSubcompactionBoundaries) {
1 /* input_level */, 2 /* output_level */, CompactRangeOptions(),
nullptr /* begin */, nullptr /* end */, true /* exclusive */,
true /* disallow_trivial_move */,
port::kMaxUint64 /* max_file_num_to_ignore */));
port::kMaxUint64 /* max_file_num_to_ignore */, "" /*trim_ts*/));
}
#endif // ROCKSDB_LITE

@ -704,6 +704,77 @@ TEST_F(DBBasicTestWithTimestamp, SimpleIterate) {
Close();
}
TEST_F(DBBasicTestWithTimestamp, TrimHistoryTest) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);
auto check_value_by_ts = [](DB* db, Slice key, std::string readTs,
Status status, std::string checkValue) {
ReadOptions ropts;
Slice ts = readTs;
ropts.timestamp = &ts;
std::string value;
Status s = db->Get(ropts, key, &value);
ASSERT_TRUE(s == status);
if (s.ok()) {
ASSERT_EQ(checkValue, value);
}
};
// Construct data of different versions with different ts
ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(2, 0), "v1"));
ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(4, 0), "v2"));
ASSERT_OK(db_->Delete(WriteOptions(), "k1", Timestamp(5, 0)));
ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(6, 0), "v3"));
check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v3");
ASSERT_OK(Flush());
Close();
ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(
ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
DBOptions db_options(options);
// Trim data whose version > Timestamp(5, 0), read(k1, ts(7)) <- NOT_FOUND.
ASSERT_OK(DB::OpenAndTrimHistory(db_options, dbname_, column_families,
&handles_, &db_, Timestamp(5, 0)));
check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::NotFound(), "");
Close();
// Trim data whose timestamp > Timestamp(4, 0), read(k1, ts(7)) <- v2
ASSERT_OK(DB::OpenAndTrimHistory(db_options, dbname_, column_families,
&handles_, &db_, Timestamp(4, 0)));
check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v2");
Close();
}
TEST_F(DBBasicTestWithTimestamp, OpenAndTrimHistoryInvalidOptionTest) {
Destroy(last_options_);
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(
ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
DBOptions db_options(options);
// OpenAndTrimHistory should not work with avoid_flush_during_recovery
db_options.avoid_flush_during_recovery = true;
ASSERT_TRUE(DB::OpenAndTrimHistory(db_options, dbname_, column_families,
&handles_, &db_, Timestamp(0, 0))
.IsInvalidArgument());
}
#ifndef ROCKSDB_LITE
TEST_F(DBBasicTestWithTimestamp, GetTimestampTableProperties) {
Options options = CurrentOptions();

@ -207,6 +207,13 @@ inline Slice ExtractTimestampFromUserKey(const Slice& user_key, size_t ts_sz) {
return Slice(user_key.data() + user_key.size() - ts_sz, ts_sz);
}
inline Slice ExtractTimestampFromKey(const Slice& internal_key, size_t ts_sz) {
const size_t key_size = internal_key.size();
assert(key_size >= kNumInternalBytes + ts_sz);
return Slice(internal_key.data() + key_size - ts_sz - kNumInternalBytes,
ts_sz);
}
inline uint64_t ExtractInternalKeyFooter(const Slice& internal_key) {
assert(internal_key.size() >= kNumInternalBytes);
const size_t n = internal_key.size();

@ -0,0 +1,91 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <string>
#include <vector>
#include "db/dbformat.h"
#include "rocksdb/iterator.h"
#include "rocksdb/slice.h"
#include "table/internal_iterator.h"
namespace ROCKSDB_NAMESPACE {
class HistoryTrimmingIterator : public InternalIterator {
public:
explicit HistoryTrimmingIterator(InternalIterator* input,
const Comparator* cmp, const std::string& ts)
: input_(input), filter_ts_(ts), cmp_(cmp) {
assert(cmp_->timestamp_size() > 0 && !ts.empty());
}
bool filter() const {
if (!input_->Valid()) {
return true;
}
Slice current_ts = ExtractTimestampFromKey(key(), cmp_->timestamp_size());
return cmp_->CompareTimestamp(current_ts, Slice(filter_ts_)) <= 0;
}
bool Valid() const override { return input_->Valid(); }
void SeekToFirst() override {
input_->SeekToFirst();
while (!filter()) {
input_->Next();
}
}
void SeekToLast() override {
input_->SeekToLast();
while (!filter()) {
input_->Prev();
}
}
void Seek(const Slice& target) override {
input_->Seek(target);
while (!filter()) {
input_->Next();
}
}
void SeekForPrev(const Slice& target) override {
input_->SeekForPrev(target);
while (!filter()) {
input_->Prev();
}
}
void Next() override {
do {
input_->Next();
} while (!filter());
}
void Prev() override {
do {
input_->Prev();
} while (!filter());
}
Slice key() const override { return input_->key(); }
Slice value() const override { return input_->value(); }
Status status() const override { return input_->status(); }
bool IsKeyPinned() const override { return input_->IsKeyPinned(); }
bool IsValuePinned() const override { return input_->IsValuePinned(); }
private:
InternalIterator* input_;
const std::string filter_ts_;
const Comparator* const cmp_;
};
} // namespace ROCKSDB_NAMESPACE

@ -274,6 +274,19 @@ class DB {
const std::string& input, std::string* output,
const CompactionServiceOptionsOverride& override_options);
// Experimental and subject to change
// Open DB and trim data newer than specified timestamp.
// The trim_ts specified the user-defined timestamp trim bound.
// This API should only be used at timestamp enabled column families recovery.
// If some input column families do not support timestamp, nothing will
// be happened to them. The data with timestamp > trim_ts
// will be removed after this API returns successfully.
static Status OpenAndTrimHistory(
const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
std::string trim_ts);
virtual Status Resume() { return Status::NotSupported(); }
// Close the DB by releasing resources, closing files etc. This should be

Loading…
Cancel
Save