diff --git a/HISTORY.md b/HISTORY.md index ecc8bf1cb..780eaac52 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,8 @@ # Rocksdb Change Log ## Unreleased +### Public API changes +* Add DB::OpenAndTrimHistory API. This API will open DB and trim data to the timestamp specified by trim_ts (The data with timestamp larger than specified trim bound will be removed). This API should only be used at a timestamp-enabled column families recovery. If the column family doesn't have timestamp enabled, this API won't trim any data on that column family. This API is not compatible with avoid_flush_during_recovery option. + ### New Features * Allow WriteBatchWithIndex to index a WriteBatch that includes keys with user-defined timestamps. The index itself does not have timestamp. * Add support for user-defined timestamps to write-committed transaction without API change. The `TransactionDB` layer APIs do not allow timestamps because we require that all user-defined-timestamps-aware operations go through the `Transaction` APIs. diff --git a/db/column_family.cc b/db/column_family.cc index a2885515c..e4fb0a77d 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -1164,12 +1164,12 @@ Compaction* ColumnFamilyData::CompactRange( int output_level, const CompactRangeOptions& compact_range_options, const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end, bool* conflict, - uint64_t max_file_num_to_ignore) { + uint64_t max_file_num_to_ignore, const std::string& trim_ts) { auto* result = compaction_picker_->CompactRange( GetName(), mutable_cf_options, mutable_db_options, current_->storage_info(), input_level, output_level, compact_range_options, begin, end, compaction_end, conflict, - max_file_num_to_ignore); + max_file_num_to_ignore, trim_ts); if (result != nullptr) { result->SetInputVersion(current_); } diff --git a/db/column_family.h b/db/column_family.h index a28f5d7e9..81020afcf 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -413,7 +413,8 @@ class ColumnFamilyData { const CompactRangeOptions& compact_range_options, const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end, bool* manual_conflict, - uint64_t max_file_num_to_ignore); + uint64_t max_file_num_to_ignore, + const std::string& trim_ts); CompactionPicker* compaction_picker() { return compaction_picker_.get(); } // thread-safe diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index 01137c4f3..edda2fe71 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -213,8 +213,8 @@ Compaction::Compaction( uint32_t _output_path_id, CompressionType _compression, CompressionOptions _compression_opts, Temperature _output_temperature, uint32_t _max_subcompactions, std::vector _grandparents, - bool _manual_compaction, double _score, bool _deletion_compaction, - CompactionReason _compaction_reason) + bool _manual_compaction, const std::string& _trim_ts, double _score, + bool _deletion_compaction, CompactionReason _compaction_reason) : input_vstorage_(vstorage), start_level_(_inputs[0].level), output_level_(_output_level), @@ -237,6 +237,7 @@ Compaction::Compaction( bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)), is_full_compaction_(IsFullCompaction(vstorage, inputs_)), is_manual_compaction_(_manual_compaction), + trim_ts_(_trim_ts), is_trivial_move_(false), compaction_reason_(_compaction_reason), notify_on_compaction_completion_(false) { diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h index 1d345f4d8..fc473e293 100644 --- a/db/compaction/compaction.h +++ b/db/compaction/compaction.h @@ -79,8 +79,8 @@ class Compaction { CompressionOptions compression_opts, Temperature output_temperature, uint32_t max_subcompactions, std::vector grandparents, - bool manual_compaction = false, double score = -1, - bool deletion_compaction = false, + bool manual_compaction = false, const std::string& trim_ts = "", + double score = -1, bool deletion_compaction = false, CompactionReason compaction_reason = CompactionReason::kUnknown); // No copying allowed @@ -208,6 +208,8 @@ class Compaction { // Was this compaction triggered manually by the client? bool is_manual_compaction() const { return is_manual_compaction_; } + std::string trim_ts() const { return trim_ts_; } + // Used when allow_trivial_move option is set in // Universal compaction. If all the input files are // non overlapping, then is_trivial_move_ variable @@ -385,6 +387,9 @@ class Compaction { // Is this compaction requested by the client? const bool is_manual_compaction_; + // The data with timestamp > trim_ts_ will be removed + const std::string trim_ts_; + // True if we can do trivial move in Universal multi level // compaction bool is_trivial_move_; diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 35ad06bac..524968b16 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -31,6 +31,7 @@ #include "db/dbformat.h" #include "db/error_handler.h" #include "db/event_helpers.h" +#include "db/history_trimming_iterator.h" #include "db/log_reader.h" #include "db/log_writer.h" #include "db/memtable.h" @@ -429,7 +430,8 @@ CompactionJob::CompactionJob( const std::atomic* manual_compaction_paused, const std::atomic* manual_compaction_canceled, const std::string& db_id, const std::string& db_session_id, - std::string full_history_ts_low, BlobFileCompletionCallback* blob_callback) + std::string full_history_ts_low, std::string trim_ts, + BlobFileCompletionCallback* blob_callback) : compact_(new CompactionState(compaction)), compaction_stats_(compaction->compaction_reason(), 1), db_options_(db_options), @@ -468,6 +470,7 @@ CompactionJob::CompactionJob( measure_io_stats_(measure_io_stats), thread_pri_(thread_pri), full_history_ts_low_(std::move(full_history_ts_low)), + trim_ts_(std::move(trim_ts)), blob_callback_(blob_callback) { assert(compaction_job_stats_ != nullptr); assert(log_buffer_ != nullptr); @@ -1380,21 +1383,28 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { std::unique_ptr clip; if (start || end) { - clip.reset(new ClippingIterator( + clip = std::make_unique( raw_input.get(), start ? &start_slice : nullptr, - end ? &end_slice : nullptr, &cfd->internal_comparator())); + end ? &end_slice : nullptr, &cfd->internal_comparator()); input = clip.get(); } std::unique_ptr blob_counter; if (sub_compact->compaction->DoesInputReferenceBlobFiles()) { - sub_compact->blob_garbage_meter.reset(new BlobGarbageMeter); - blob_counter.reset( - new BlobCountingIterator(input, sub_compact->blob_garbage_meter.get())); + sub_compact->blob_garbage_meter = std::make_unique(); + blob_counter = std::make_unique( + input, sub_compact->blob_garbage_meter.get()); input = blob_counter.get(); } + std::unique_ptr trim_history_iter; + if (cfd->user_comparator()->timestamp_size() > 0 && !trim_ts_.empty()) { + trim_history_iter = std::make_unique( + input, cfd->user_comparator(), trim_ts_); + input = trim_history_iter.get(); + } + input->SeekToFirst(); AutoThreadOperationStageUpdater stage_updater( diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 0f16bb40c..2b6b1c2bc 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -82,7 +82,7 @@ class CompactionJob { const std::atomic* manual_compaction_paused = nullptr, const std::atomic* manual_compaction_canceled = nullptr, const std::string& db_id = "", const std::string& db_session_id = "", - std::string full_history_ts_low = "", + std::string full_history_ts_low = "", std::string trim_ts = "", BlobFileCompletionCallback* blob_callback = nullptr); virtual ~CompactionJob(); @@ -226,6 +226,7 @@ class CompactionJob { std::vector sizes_; Env::Priority thread_pri_; std::string full_history_ts_low_; + std::string trim_ts_; BlobFileCompletionCallback* blob_callback_; uint64_t GetCompactionId(SubcompactionState* sub_compact); diff --git a/db/compaction/compaction_picker.cc b/db/compaction/compaction_picker.cc index 2d662873b..ae86d7894 100644 --- a/db/compaction/compaction_picker.cc +++ b/db/compaction/compaction_picker.cc @@ -571,7 +571,7 @@ Compaction* CompactionPicker::CompactRange( int input_level, int output_level, const CompactRangeOptions& compact_range_options, const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end, bool* manual_conflict, - uint64_t max_file_num_to_ignore) { + uint64_t max_file_num_to_ignore, const std::string& trim_ts) { // CompactionPickerFIFO has its own implementation of compact range assert(ioptions_.compaction_style != kCompactionStyleFIFO); @@ -640,8 +640,7 @@ Compaction* CompactionPicker::CompactRange( GetCompressionType(vstorage, mutable_cf_options, output_level, 1), GetCompressionOptions(mutable_cf_options, vstorage, output_level), Temperature::kUnknown, compact_range_options.max_subcompactions, - /* grandparents */ {}, - /* is manual */ true); + /* grandparents */ {}, /* is manual */ true, trim_ts); RegisterCompaction(c); vstorage->ComputeCompactionScore(ioptions_, mutable_cf_options); return c; @@ -820,7 +819,7 @@ Compaction* CompactionPicker::CompactRange( GetCompressionOptions(mutable_cf_options, vstorage, output_level), Temperature::kUnknown, compact_range_options.max_subcompactions, std::move(grandparents), - /* is manual compaction */ true); + /* is manual compaction */ true, trim_ts); TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction); RegisterCompaction(compaction); diff --git a/db/compaction/compaction_picker.h b/db/compaction/compaction_picker.h index a653aed43..5b0b1b7fb 100644 --- a/db/compaction/compaction_picker.h +++ b/db/compaction/compaction_picker.h @@ -78,7 +78,7 @@ class CompactionPicker { const CompactRangeOptions& compact_range_options, const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end, bool* manual_conflict, - uint64_t max_file_num_to_ignore); + uint64_t max_file_num_to_ignore, const std::string& trim_ts); // The maximum allowed output level. Default value is NumberLevels() - 1. virtual int MaxOutputLevel() const { return NumberLevels() - 1; } @@ -270,7 +270,8 @@ class NullCompactionPicker : public CompactionPicker { const InternalKey* /*end*/, InternalKey** /*compaction_end*/, bool* /*manual_conflict*/, - uint64_t /*max_file_num_to_ignore*/) override { + uint64_t /*max_file_num_to_ignore*/, + const std::string& /*trim_ts*/) override { return nullptr; } diff --git a/db/compaction/compaction_picker_fifo.cc b/db/compaction/compaction_picker_fifo.cc index 5f880fc47..6ebdf674c 100644 --- a/db/compaction/compaction_picker_fifo.cc +++ b/db/compaction/compaction_picker_fifo.cc @@ -115,7 +115,7 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction( std::move(inputs), 0, 0, 0, 0, kNoCompression, mutable_cf_options.compression_opts, Temperature::kUnknown, /* max_subcompactions */ 0, {}, /* is manual */ false, - vstorage->CompactionScore(0), + /* trim_ts */ "", vstorage->CompactionScore(0), /* is deletion compaction */ true, CompactionReason::kFIFOTtl); return c; } @@ -157,8 +157,8 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction( 0 /* max compaction bytes, not applicable */, 0 /* output path ID */, mutable_cf_options.compression, mutable_cf_options.compression_opts, Temperature::kUnknown, - 0 /* max_subcompactions */, {}, - /* is manual */ false, vstorage->CompactionScore(0), + 0 /* max_subcompactions */, {}, /* is manual */ false, + /* trim_ts */ "", vstorage->CompactionScore(0), /* is deletion compaction */ false, CompactionReason::kFIFOReduceNumFiles); return c; @@ -208,7 +208,7 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction( std::move(inputs), 0, 0, 0, 0, kNoCompression, mutable_cf_options.compression_opts, Temperature::kUnknown, /* max_subcompactions */ 0, {}, /* is manual */ false, - vstorage->CompactionScore(0), + /* trim_ts */ "", vstorage->CompactionScore(0), /* is deletion compaction */ true, CompactionReason::kFIFOMaxSize); return c; } @@ -313,7 +313,7 @@ Compaction* FIFOCompactionPicker::PickCompactionToWarm( 0 /* max compaction bytes, not applicable */, 0 /* output path ID */, mutable_cf_options.compression, mutable_cf_options.compression_opts, Temperature::kWarm, - /* max_subcompactions */ 0, {}, /* is manual */ false, + /* max_subcompactions */ 0, {}, /* is manual */ false, /* trim_ts */ "", vstorage->CompactionScore(0), /* is deletion compaction */ false, CompactionReason::kChangeTemperature); return c; @@ -349,7 +349,7 @@ Compaction* FIFOCompactionPicker::CompactRange( const CompactRangeOptions& /*compact_range_options*/, const InternalKey* /*begin*/, const InternalKey* /*end*/, InternalKey** compaction_end, bool* /*manual_conflict*/, - uint64_t /*max_file_num_to_ignore*/) { + uint64_t /*max_file_num_to_ignore*/, const std::string& /*trim_ts*/) { #ifdef NDEBUG (void)input_level; (void)output_level; diff --git a/db/compaction/compaction_picker_fifo.h b/db/compaction/compaction_picker_fifo.h index b0d58aa9d..544259f38 100644 --- a/db/compaction/compaction_picker_fifo.h +++ b/db/compaction/compaction_picker_fifo.h @@ -32,7 +32,7 @@ class FIFOCompactionPicker : public CompactionPicker { const CompactRangeOptions& compact_range_options, const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end, bool* manual_conflict, - uint64_t max_file_num_to_ignore) override; + uint64_t max_file_num_to_ignore, const std::string& trim_ts) override; // The maximum allowed output level. Always returns 0. virtual int MaxOutputLevel() const override { return 0; } diff --git a/db/compaction/compaction_picker_level.cc b/db/compaction/compaction_picker_level.cc index a32d9b46d..31b76fb69 100644 --- a/db/compaction/compaction_picker_level.cc +++ b/db/compaction/compaction_picker_level.cc @@ -348,7 +348,8 @@ Compaction* LevelCompactionBuilder::GetCompaction() { GetCompressionOptions(mutable_cf_options_, vstorage_, output_level_), Temperature::kUnknown, /* max_subcompactions */ 0, std::move(grandparents_), is_manual_, - start_level_score_, false /* deletion_compaction */, compaction_reason_); + /* trim_ts */ "", start_level_score_, false /* deletion_compaction */, + compaction_reason_); // If it's level 0 compaction, make sure we don't execute any other level 0 // compactions in parallel diff --git a/db/compaction/compaction_picker_test.cc b/db/compaction/compaction_picker_test.cc index 1bbf7bc27..695d730fb 100644 --- a/db/compaction/compaction_picker_test.cc +++ b/db/compaction/compaction_picker_test.cc @@ -2648,12 +2648,13 @@ TEST_F(CompactionPickerTest, UniversalMarkedManualCompaction) { ASSERT_EQ(3U, vstorage_->FilesMarkedForCompaction().size()); bool manual_conflict = false; - InternalKey* manual_end = NULL; + InternalKey* manual_end = nullptr; std::unique_ptr compaction( universal_compaction_picker.CompactRange( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - ColumnFamilyData::kCompactAllLevels, 6, CompactRangeOptions(), NULL, - NULL, &manual_end, &manual_conflict, port::kMaxUint64)); + ColumnFamilyData::kCompactAllLevels, 6, CompactRangeOptions(), + nullptr, nullptr, &manual_end, &manual_conflict, port::kMaxUint64, + "")); ASSERT_TRUE(compaction); diff --git a/db/compaction/compaction_picker_universal.cc b/db/compaction/compaction_picker_universal.cc index fd6f7e590..5ca2c41ea 100644 --- a/db/compaction/compaction_picker_universal.cc +++ b/db/compaction/compaction_picker_universal.cc @@ -757,7 +757,7 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns( start_level, enable_compression), Temperature::kUnknown, /* max_subcompactions */ 0, grandparents, - /* is manual */ false, score_, + /* is manual */ false, /* trim_ts */ "", score_, false /* deletion_compaction */, compaction_reason); } @@ -1082,7 +1082,7 @@ Compaction* UniversalCompactionBuilder::PickIncrementalForReduceSizeAmp( true /* enable_compression */), Temperature::kUnknown, /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false, - score_, false /* deletion_compaction */, + /* trim_ts */ "", score_, false /* deletion_compaction */, CompactionReason::kUniversalSizeAmplification); } @@ -1223,8 +1223,8 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() { GetCompressionType(vstorage_, mutable_cf_options_, output_level, 1), GetCompressionOptions(mutable_cf_options_, vstorage_, output_level), Temperature::kUnknown, - /* max_subcompactions */ 0, grandparents, /* is manual */ false, score_, - false /* deletion_compaction */, + /* max_subcompactions */ 0, grandparents, /* is manual */ false, + /* trim_ts */ "", score_, false /* deletion_compaction */, CompactionReason::kFilesMarkedForCompaction); } @@ -1299,7 +1299,8 @@ Compaction* UniversalCompactionBuilder::PickCompactionToOldest( true /* enable_compression */), Temperature::kUnknown, /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false, - score_, false /* deletion_compaction */, compaction_reason); + /* trim_ts */ "", score_, false /* deletion_compaction */, + compaction_reason); } Compaction* UniversalCompactionBuilder::PickPeriodicCompaction() { diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index c28248708..d08652d06 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -663,7 +663,8 @@ class DBImpl : public DB { const CompactRangeOptions& compact_range_options, const Slice* begin, const Slice* end, bool exclusive, bool disallow_trivial_move, - uint64_t max_file_num_to_ignore); + uint64_t max_file_num_to_ignore, + const std::string& trim_ts); // Return an internal iterator over the current state of the database. // The keys of this iterator are internal keys (see format.h). @@ -1247,7 +1248,8 @@ class DBImpl : public DB { Status CompactRangeInternal(const CompactRangeOptions& options, ColumnFamilyHandle* column_family, - const Slice* begin, const Slice* end); + const Slice* begin, const Slice* end, + const std::string& trim_ts); // The following two functions can only be called when: // 1. WriteThread::Writer::EnterUnbatched() is used. diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 93de350c2..c55448077 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -927,7 +927,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, size_t ts_sz = ucmp->timestamp_size(); if (ts_sz == 0) { return CompactRangeInternal(options, column_family, begin_without_ts, - end_without_ts); + end_without_ts, "" /*trim_ts*/); } std::string begin_str; @@ -949,7 +949,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, Slice* end_with_ts = end_without_ts ? &end : nullptr; return CompactRangeInternal(options, column_family, begin_with_ts, - end_with_ts); + end_with_ts, "" /*trim_ts*/); } Status DBImpl::IncreaseFullHistoryTsLow(ColumnFamilyHandle* column_family, @@ -995,7 +995,8 @@ Status DBImpl::IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd, Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options, ColumnFamilyHandle* column_family, - const Slice* begin, const Slice* end) { + const Slice* begin, const Slice* end, + const std::string& trim_ts) { auto cfh = static_cast_with_check(column_family); auto cfd = cfh->cfd(); @@ -1066,7 +1067,7 @@ Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options, } s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels, final_output_level, options, begin, end, exclusive, - false, port::kMaxUint64); + false, port::kMaxUint64, trim_ts); } else { int first_overlapped_level = kInvalidLevel; int max_overlapped_level = kInvalidLevel; @@ -1152,9 +1153,13 @@ Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options, disallow_trivial_move = true; } } + // trim_ts need real compaction to remove latest record + if (!trim_ts.empty()) { + disallow_trivial_move = true; + } s = RunManualCompaction(cfd, level, output_level, options, begin, end, exclusive, disallow_trivial_move, - max_file_num_to_ignore); + max_file_num_to_ignore, trim_ts); if (!s.ok()) { break; } @@ -1393,7 +1398,8 @@ Status DBImpl::CompactFilesImpl( c->mutable_cf_options()->report_bg_io_stats, dbname_, &compaction_job_stats, Env::Priority::USER, io_tracer_, &manual_compaction_paused_, nullptr, db_id_, db_session_id_, - c->column_family_data()->GetFullHistoryTsLow(), &blob_callback_); + c->column_family_data()->GetFullHistoryTsLow(), c->trim_ts(), + &blob_callback_); // Creating a compaction influences the compaction score because the score // takes running compactions into account (by skipping files that are already @@ -1782,7 +1788,7 @@ Status DBImpl::RunManualCompaction( ColumnFamilyData* cfd, int input_level, int output_level, const CompactRangeOptions& compact_range_options, const Slice* begin, const Slice* end, bool exclusive, bool disallow_trivial_move, - uint64_t max_file_num_to_ignore) { + uint64_t max_file_num_to_ignore, const std::string& trim_ts) { assert(input_level == ColumnFamilyData::kCompactAllLevels || input_level >= 0); @@ -1900,7 +1906,7 @@ Status DBImpl::RunManualCompaction( *manual.cfd->GetLatestMutableCFOptions(), mutable_db_options_, manual.input_level, manual.output_level, compact_range_options, manual.begin, manual.end, &manual.manual_end, &manual_conflict, - max_file_num_to_ignore)) == nullptr && + max_file_num_to_ignore, trim_ts)) == nullptr && manual_conflict))) { // exclusive manual compactions should not see a conflict during // CompactRange @@ -3382,7 +3388,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, is_manual ? &manual_compaction_paused_ : nullptr, is_manual ? manual_compaction->canceled : nullptr, db_id_, db_session_id_, c->column_family_data()->GetFullHistoryTsLow(), - &blob_callback_); + c->trim_ts(), &blob_callback_); compaction_job.Prepare(); NotifyOnCompactionBegin(c->column_family_data(), c.get(), status, diff --git a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc index 783037b4e..7bbd207d9 100644 --- a/db/db_impl/db_impl_debug.cc +++ b/db/db_impl/db_impl_debug.cc @@ -120,7 +120,8 @@ Status DBImpl::TEST_CompactRange(int level, const Slice* begin, : level + 1; return RunManualCompaction(cfd, level, output_level, CompactRangeOptions(), begin, end, true, disallow_trivial_move, - port::kMaxUint64 /*max_file_num_to_ignore*/); + port::kMaxUint64 /*max_file_num_to_ignore*/, + "" /*trim_ts*/); } Status DBImpl::TEST_SwitchMemtable(ColumnFamilyData* cfd) { diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 5dd8be655..dbbedf049 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1546,6 +1546,72 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, !kSeqPerBatch, kBatchPerTxn); } +// TODO: Implement the trimming in flush code path. +// TODO: Perform trimming before inserting into memtable during recovery. +// TODO: Pick files with max_timestamp > trim_ts by each file's timestamp meta +// info, and handle only these files to reduce io. +Status DB::OpenAndTrimHistory( + const DBOptions& db_options, const std::string& dbname, + const std::vector& column_families, + std::vector* handles, DB** dbptr, + std::string trim_ts) { + assert(dbptr != nullptr); + assert(handles != nullptr); + auto validate_options = [&db_options] { + if (db_options.avoid_flush_during_recovery) { + return Status::InvalidArgument( + "avoid_flush_during_recovery incompatible with " + "OpenAndTrimHistory"); + } + return Status::OK(); + }; + auto s = validate_options(); + if (!s.ok()) { + return s; + } + + DB* db = nullptr; + s = DB::Open(db_options, dbname, column_families, handles, &db); + if (!s.ok()) { + return s; + } + assert(db); + CompactRangeOptions options; + options.bottommost_level_compaction = + BottommostLevelCompaction::kForceOptimized; + auto db_impl = static_cast_with_check(db); + for (auto handle : *handles) { + assert(handle != nullptr); + auto cfh = static_cast_with_check(handle); + auto cfd = cfh->cfd(); + assert(cfd != nullptr); + // Only compact column families with timestamp enabled + if (cfd->user_comparator() != nullptr && + cfd->user_comparator()->timestamp_size() > 0) { + s = db_impl->CompactRangeInternal(options, handle, nullptr, nullptr, + trim_ts); + if (!s.ok()) { + break; + } + } + } + auto clean_op = [&handles, &db] { + for (auto handle : *handles) { + auto temp_s = db->DestroyColumnFamilyHandle(handle); + assert(temp_s.ok()); + } + handles->clear(); + delete db; + }; + if (!s.ok()) { + clean_op(); + return s; + } + + *dbptr = db; + return s; +} + IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number, size_t preallocate_block_size, log::Writer** new_log) { diff --git a/db/db_range_del_test.cc b/db/db_range_del_test.cc index 8967ddef5..683b701f0 100644 --- a/db/db_range_del_test.cc +++ b/db/db_range_del_test.cc @@ -500,7 +500,7 @@ TEST_F(DBRangeDelTest, ValidUniversalSubcompactionBoundaries) { 1 /* input_level */, 2 /* output_level */, CompactRangeOptions(), nullptr /* begin */, nullptr /* end */, true /* exclusive */, true /* disallow_trivial_move */, - port::kMaxUint64 /* max_file_num_to_ignore */)); + port::kMaxUint64 /* max_file_num_to_ignore */, "" /*trim_ts*/)); } #endif // ROCKSDB_LITE diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc index a8526cbd4..0c14e4333 100644 --- a/db/db_with_timestamp_basic_test.cc +++ b/db/db_with_timestamp_basic_test.cc @@ -704,6 +704,77 @@ TEST_F(DBBasicTestWithTimestamp, SimpleIterate) { Close(); } +TEST_F(DBBasicTestWithTimestamp, TrimHistoryTest) { + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + DestroyAndReopen(options); + auto check_value_by_ts = [](DB* db, Slice key, std::string readTs, + Status status, std::string checkValue) { + ReadOptions ropts; + Slice ts = readTs; + ropts.timestamp = &ts; + std::string value; + Status s = db->Get(ropts, key, &value); + ASSERT_TRUE(s == status); + if (s.ok()) { + ASSERT_EQ(checkValue, value); + } + }; + // Construct data of different versions with different ts + ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(2, 0), "v1")); + ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(4, 0), "v2")); + ASSERT_OK(db_->Delete(WriteOptions(), "k1", Timestamp(5, 0))); + ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(6, 0), "v3")); + check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v3"); + ASSERT_OK(Flush()); + Close(); + + ColumnFamilyOptions cf_options(options); + std::vector column_families; + column_families.push_back( + ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); + DBOptions db_options(options); + + // Trim data whose version > Timestamp(5, 0), read(k1, ts(7)) <- NOT_FOUND. + ASSERT_OK(DB::OpenAndTrimHistory(db_options, dbname_, column_families, + &handles_, &db_, Timestamp(5, 0))); + check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::NotFound(), ""); + Close(); + + // Trim data whose timestamp > Timestamp(4, 0), read(k1, ts(7)) <- v2 + ASSERT_OK(DB::OpenAndTrimHistory(db_options, dbname_, column_families, + &handles_, &db_, Timestamp(4, 0))); + check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v2"); + Close(); +} + +TEST_F(DBBasicTestWithTimestamp, OpenAndTrimHistoryInvalidOptionTest) { + Destroy(last_options_); + + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + + ColumnFamilyOptions cf_options(options); + std::vector column_families; + column_families.push_back( + ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); + DBOptions db_options(options); + + // OpenAndTrimHistory should not work with avoid_flush_during_recovery + db_options.avoid_flush_during_recovery = true; + ASSERT_TRUE(DB::OpenAndTrimHistory(db_options, dbname_, column_families, + &handles_, &db_, Timestamp(0, 0)) + .IsInvalidArgument()); +} + #ifndef ROCKSDB_LITE TEST_F(DBBasicTestWithTimestamp, GetTimestampTableProperties) { Options options = CurrentOptions(); diff --git a/db/dbformat.h b/db/dbformat.h index 137f23aa6..ee9c27e76 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -207,6 +207,13 @@ inline Slice ExtractTimestampFromUserKey(const Slice& user_key, size_t ts_sz) { return Slice(user_key.data() + user_key.size() - ts_sz, ts_sz); } +inline Slice ExtractTimestampFromKey(const Slice& internal_key, size_t ts_sz) { + const size_t key_size = internal_key.size(); + assert(key_size >= kNumInternalBytes + ts_sz); + return Slice(internal_key.data() + key_size - ts_sz - kNumInternalBytes, + ts_sz); +} + inline uint64_t ExtractInternalKeyFooter(const Slice& internal_key) { assert(internal_key.size() >= kNumInternalBytes); const size_t n = internal_key.size(); diff --git a/db/history_trimming_iterator.h b/db/history_trimming_iterator.h new file mode 100644 index 000000000..b445ced33 --- /dev/null +++ b/db/history_trimming_iterator.h @@ -0,0 +1,91 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#pragma once + +#include +#include + +#include "db/dbformat.h" +#include "rocksdb/iterator.h" +#include "rocksdb/slice.h" +#include "table/internal_iterator.h" + +namespace ROCKSDB_NAMESPACE { + +class HistoryTrimmingIterator : public InternalIterator { + public: + explicit HistoryTrimmingIterator(InternalIterator* input, + const Comparator* cmp, const std::string& ts) + : input_(input), filter_ts_(ts), cmp_(cmp) { + assert(cmp_->timestamp_size() > 0 && !ts.empty()); + } + + bool filter() const { + if (!input_->Valid()) { + return true; + } + Slice current_ts = ExtractTimestampFromKey(key(), cmp_->timestamp_size()); + return cmp_->CompareTimestamp(current_ts, Slice(filter_ts_)) <= 0; + } + + bool Valid() const override { return input_->Valid(); } + + void SeekToFirst() override { + input_->SeekToFirst(); + while (!filter()) { + input_->Next(); + } + } + + void SeekToLast() override { + input_->SeekToLast(); + while (!filter()) { + input_->Prev(); + } + } + + void Seek(const Slice& target) override { + input_->Seek(target); + while (!filter()) { + input_->Next(); + } + } + + void SeekForPrev(const Slice& target) override { + input_->SeekForPrev(target); + while (!filter()) { + input_->Prev(); + } + } + + void Next() override { + do { + input_->Next(); + } while (!filter()); + } + + void Prev() override { + do { + input_->Prev(); + } while (!filter()); + } + + Slice key() const override { return input_->key(); } + + Slice value() const override { return input_->value(); } + + Status status() const override { return input_->status(); } + + bool IsKeyPinned() const override { return input_->IsKeyPinned(); } + + bool IsValuePinned() const override { return input_->IsValuePinned(); } + + private: + InternalIterator* input_; + const std::string filter_ts_; + const Comparator* const cmp_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index abd625746..47c53d5f8 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -274,6 +274,19 @@ class DB { const std::string& input, std::string* output, const CompactionServiceOptionsOverride& override_options); + // Experimental and subject to change + // Open DB and trim data newer than specified timestamp. + // The trim_ts specified the user-defined timestamp trim bound. + // This API should only be used at timestamp enabled column families recovery. + // If some input column families do not support timestamp, nothing will + // be happened to them. The data with timestamp > trim_ts + // will be removed after this API returns successfully. + static Status OpenAndTrimHistory( + const DBOptions& db_options, const std::string& dbname, + const std::vector& column_families, + std::vector* handles, DB** dbptr, + std::string trim_ts); + virtual Status Resume() { return Status::NotSupported(); } // Close the DB by releasing resources, closing files etc. This should be