diff --git a/HISTORY.md b/HISTORY.md index cacf30fd1..d00c7aad2 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,9 @@ ## Unreleased ### Public API Change * `BackupableDBOptions::max_valid_backups_to_open == 0` now means no backups will be opened during BackupEngine initialization. Previously this condition disabled limiting backups opened. +* `DBOptions::preserve_deletes` is a new option that allows one to specify that DB should not drop tombstones for regular deletes if they have sequence number larger than what was set by the new API call `DB::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum)`. Disabled by default. +* API call `DB::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum)` was added, users who wish to preserve deletes are expected to periodically call this function to advance the cutoff seqnum (all deletes made before this seqnum can be dropped by DB). It's user responsibility to figure out how to advance the seqnum in the way so the tombstones are kept for the desired period of time, yet are eventually processed in time and don't eat up too much space. +* `ReadOptions::iter_start_seqnum` was added; if set to something > 0 user will see 2 changes in iterators behavior 1) only keys written with sequence larger than this parameter would be returned and 2) the `Slice` returned by iter->key() now points to the the memory that keep User-oriented representation of the internal key, rather than user key. New struct `FullKey` was added to represent internal keys, along with a new helper function `ParseFullKey(const Slice& internal_key, FullKey* result);`. * Deprecate trash_dir param in NewSstFileManager, right now we will rename deleted files to .trash instead of moving them to trash directory * Return an error on write if write_options.sync = true and write_options.disableWAL = true to warn user of inconsistent options. Previously we will not write to WAL and not respecting the sync options in this case. @@ -14,6 +17,7 @@ * Add a new db property "rocksdb.estimate-oldest-key-time" to return oldest data timestamp. The property is available only for FIFO compaction with compaction_options_fifo.allow_compaction = false. * Upon snapshot release, recompact bottommost files containing deleted/overwritten keys that previously could not be dropped due to the snapshot. This alleviates space-amp caused by long-held snapshots. * Support lower bound on iterators specified via `ReadOptions::iterate_lower_bound`. +* Support for differential snapshots (via iterator emitting the sequence of key-values representing the difference between DB state at two different sequence numbers). Supports preserving and emitting puts and regular deletes, doesn't support SingleDeletes, MergeOperator, Blobs and Range Deletes. ### Bug Fixes * Fix a potential data inconsistency issue during point-in-time recovery. `DB:Open()` will abort if column family inconsistency is found during PIT recovery. diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index 312df2c61..be0931673 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -45,14 +45,16 @@ CompactionIterator::CompactionIterator( bool expect_valid_internal_key, RangeDelAggregator* range_del_agg, const Compaction* compaction, const CompactionFilter* compaction_filter, CompactionEventListener* compaction_listener, - const std::atomic* shutting_down) + const std::atomic* shutting_down, + const SequenceNumber preserve_deletes_seqnum) : CompactionIterator( input, cmp, merge_helper, last_sequence, snapshots, earliest_write_conflict_snapshot, snapshot_checker, env, expect_valid_internal_key, range_del_agg, std::unique_ptr( compaction ? new CompactionProxy(compaction) : nullptr), - compaction_filter, compaction_listener, shutting_down) {} + compaction_filter, compaction_listener, shutting_down, + preserve_deletes_seqnum) {} CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, @@ -63,7 +65,9 @@ CompactionIterator::CompactionIterator( std::unique_ptr compaction, const CompactionFilter* compaction_filter, CompactionEventListener* compaction_listener, - const std::atomic* shutting_down) + const std::atomic* shutting_down, + const SequenceNumber preserve_deletes_seqnum + ) : input_(input), cmp_(cmp), merge_helper_(merge_helper), @@ -79,6 +83,7 @@ CompactionIterator::CompactionIterator( compaction_listener_(compaction_listener), #endif // ROCKSDB_LITE shutting_down_(shutting_down), + preserve_deletes_seqnum_(preserve_deletes_seqnum), ignore_snapshots_(false), current_user_key_sequence_(0), current_user_key_snapshot_(0), @@ -496,6 +501,7 @@ void CompactionIterator::NextFromInput() { input_->Next(); } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion && ikey_.sequence <= earliest_snapshot_ && + ikeyNotNeededForIncrementalSnapshot() && compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key, &level_ptrs_)) { // TODO(noetzli): This is the only place where we use compaction_ @@ -595,11 +601,12 @@ void CompactionIterator::PrepareOutput() { // This is safe for TransactionDB write-conflict checking since transactions // only care about sequence number larger than any active snapshots. - if ((compaction_ != nullptr && !compaction_->allow_ingest_behind()) && + if ((compaction_ != nullptr && + !compaction_->allow_ingest_behind()) && + ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ && valid_ && ikey_.sequence <= earliest_snapshot_ && - (snapshot_checker_ == nullptr || - LIKELY(snapshot_checker_->IsInSnapshot(ikey_.sequence, - earliest_snapshot_))) && + (snapshot_checker_ == nullptr || LIKELY(snapshot_checker_->IsInSnapshot( + ikey_.sequence, earliest_snapshot_))) && ikey_.type != kTypeMerge && !cmp_->Equal(compaction_->GetLargestUserKey(), ikey_.user_key)) { assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion); @@ -626,4 +633,11 @@ inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot( return kMaxSequenceNumber; } +// used in 2 places - prevents deletion markers to be dropped if they may be +// needed and disables seqnum zero-out in PrepareOutput for recent keys. +inline bool CompactionIterator::ikeyNotNeededForIncrementalSnapshot() { + return (!compaction_->preserve_deletes()) || + (ikey_.sequence < preserve_deletes_seqnum_); +} + } // namespace rocksdb diff --git a/db/compaction_iterator.h b/db/compaction_iterator.h index 2556543f3..8222f6d54 100644 --- a/db/compaction_iterator.h +++ b/db/compaction_iterator.h @@ -49,6 +49,9 @@ class CompactionIterator { virtual bool allow_ingest_behind() const { return compaction_->immutable_cf_options()->allow_ingest_behind; } + virtual bool preserve_deletes() const { + return compaction_->immutable_cf_options()->preserve_deletes; + } protected: CompactionProxy() = default; @@ -67,7 +70,8 @@ class CompactionIterator { const Compaction* compaction = nullptr, const CompactionFilter* compaction_filter = nullptr, CompactionEventListener* compaction_listener = nullptr, - const std::atomic* shutting_down = nullptr); + const std::atomic* shutting_down = nullptr, + const SequenceNumber preserve_deletes_seqnum = 0); // Constructor with custom CompactionProxy, used for tests. CompactionIterator(InternalIterator* input, const Comparator* cmp, @@ -80,7 +84,8 @@ class CompactionIterator { std::unique_ptr compaction, const CompactionFilter* compaction_filter = nullptr, CompactionEventListener* compaction_listener = nullptr, - const std::atomic* shutting_down = nullptr); + const std::atomic* shutting_down = nullptr, + const SequenceNumber preserve_deletes_seqnum = 0); ~CompactionIterator(); @@ -126,6 +131,11 @@ class CompactionIterator { inline SequenceNumber findEarliestVisibleSnapshot( SequenceNumber in, SequenceNumber* prev_snapshot); + // Checks whether the currently seen ikey_ is needed for + // incremental (differential) snapshot and hence can't be dropped + // or seqnum be zero-ed out even if all other conditions for it are met. + inline bool ikeyNotNeededForIncrementalSnapshot(); + InternalIterator* input_; const Comparator* cmp_; MergeHelper* merge_helper_; @@ -141,6 +151,7 @@ class CompactionIterator { CompactionEventListener* compaction_listener_; #endif // !ROCKSDB_LITE const std::atomic* shutting_down_; + const SequenceNumber preserve_deletes_seqnum_; bool bottommost_level_; bool valid_ = false; bool visible_at_tip_; diff --git a/db/compaction_iterator_test.cc b/db/compaction_iterator_test.cc index 1cca022eb..7c8919023 100644 --- a/db/compaction_iterator_test.cc +++ b/db/compaction_iterator_test.cc @@ -156,6 +156,8 @@ class FakeCompaction : public CompactionIterator::CompactionProxy { } virtual bool allow_ingest_behind() const { return false; } + virtual bool preserve_deletes() const {return false; } + bool key_not_exists_beyond_output_level = false; }; diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 4754b6be4..4027fafc7 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -264,7 +264,9 @@ void CompactionJob::AggregateStatistics() { CompactionJob::CompactionJob( int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, const EnvOptions env_options, VersionSet* versions, - const std::atomic* shutting_down, LogBuffer* log_buffer, + const std::atomic* shutting_down, + const SequenceNumber preserve_deletes_seqnum, + LogBuffer* log_buffer, Directory* db_directory, Directory* output_directory, Statistics* stats, InstrumentedMutex* db_mutex, Status* db_bg_error, std::vector existing_snapshots, @@ -282,6 +284,7 @@ CompactionJob::CompactionJob( env_(db_options.env), versions_(versions), shutting_down_(shutting_down), + preserve_deletes_seqnum_(preserve_deletes_seqnum), log_buffer_(log_buffer), db_directory_(db_directory), output_directory_(output_directory), @@ -764,7 +767,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { &existing_snapshots_, earliest_write_conflict_snapshot_, snapshot_checker_, env_, false, range_del_agg.get(), sub_compact->compaction, compaction_filter, comp_event_listener, - shutting_down_)); + shutting_down_, preserve_deletes_seqnum_)); auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); if (c_iter->Valid() && diff --git a/db/compaction_job.h b/db/compaction_job.h index 0bb45b03a..498655a08 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -58,7 +58,9 @@ class CompactionJob { CompactionJob(int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, const EnvOptions env_options, VersionSet* versions, - const std::atomic* shutting_down, LogBuffer* log_buffer, + const std::atomic* shutting_down, + const SequenceNumber preserve_deletes_seqnum, + LogBuffer* log_buffer, Directory* db_directory, Directory* output_directory, Statistics* stats, InstrumentedMutex* db_mutex, Status* db_bg_error, @@ -134,6 +136,7 @@ class CompactionJob { Env* env_; VersionSet* versions_; const std::atomic* shutting_down_; + const SequenceNumber preserve_deletes_seqnum_; LogBuffer* log_buffer_; Directory* db_directory_; Directory* output_directory_; diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index 4012b183e..9e0da91c5 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -76,6 +76,7 @@ class CompactionJobTest : public testing::Test { table_cache_.get(), &write_buffer_manager_, &write_controller_)), shutting_down_(false), + preserve_deletes_seqnum_(0), mock_table_factory_(new mock::MockTableFactory()) { EXPECT_OK(env_->CreateDirIfMissing(dbname_)); db_options_.db_paths.emplace_back(dbname_, @@ -253,12 +254,12 @@ class CompactionJobTest : public testing::Test { // TODO(yiwu) add a mock snapshot checker and add test for it. SnapshotChecker* snapshot_checker = nullptr; CompactionJob compaction_job(0, &compaction, db_options_, env_options_, - versions_.get(), &shutting_down_, &log_buffer, + versions_.get(), &shutting_down_, + preserve_deletes_seqnum_, &log_buffer, nullptr, nullptr, nullptr, &mutex_, &bg_error_, snapshots, earliest_write_conflict_snapshot, snapshot_checker, table_cache_, &event_logger, false, false, dbname_, &compaction_job_stats_); - VerifyInitializationOfCompactionJobStats(compaction_job_stats_); compaction_job.Prepare(); @@ -294,6 +295,7 @@ class CompactionJobTest : public testing::Test { std::unique_ptr versions_; InstrumentedMutex mutex_; std::atomic shutting_down_; + SequenceNumber preserve_deletes_seqnum_; std::shared_ptr mock_table_factory_; CompactionJobStats compaction_job_stats_; ColumnFamilyData* cfd_; diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 7b826704e..757d10694 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -218,6 +218,84 @@ TEST_P(DBCompactionTestWithParam, CompactionDeletionTrigger) { } } +TEST_P(DBCompactionTestWithParam, CompactionsPreserveDeletes) { + // For each options type we test following + // - Enable preserve_deletes + // - write bunch of keys and deletes + // - Set start_seqnum to the beginning; compact; check that keys are present + // - rewind start_seqnum way forward; compact; check that keys are gone + + for (int tid = 0; tid < 3; ++tid) { + Options options = DeletionTriggerOptions(CurrentOptions()); + options.max_subcompactions = max_subcompactions_; + options.preserve_deletes=true; + options.num_levels = 2; + + if (tid == 1) { + options.skip_stats_update_on_db_open = true; + } else if (tid == 2) { + // third pass with universal compaction + options.compaction_style = kCompactionStyleUniversal; + } + + DestroyAndReopen(options); + Random rnd(301); + // highlight the default; all deletes should be preserved + SetPreserveDeletesSequenceNumber(0); + + const int kTestSize = kCDTKeysPerBuffer; + std::vector values; + for (int k = 0; k < kTestSize; ++k) { + values.push_back(RandomString(&rnd, kCDTValueSize)); + ASSERT_OK(Put(Key(k), values[k])); + } + + for (int k = 0; k < kTestSize; ++k) { + ASSERT_OK(Delete(Key(k))); + } + // to ensure we tackle all tombstones + CompactRangeOptions cro; + cro.change_level = true; + cro.target_level = 2; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->CompactRange(cro, nullptr, nullptr); + + // check that normal user iterator doesn't see anything + Iterator* db_iter = dbfull()->NewIterator(ReadOptions()); + int i = 0; + for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) { + i++; + } + ASSERT_EQ(i, 0); + delete db_iter; + + // check that iterator that sees internal keys sees tombstones + ReadOptions ro; + ro.iter_start_seqnum=1; + db_iter = dbfull()->NewIterator(ro); + i = 0; + for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) { + i++; + } + ASSERT_EQ(i, 4); + delete db_iter; + + // now all deletes should be gone + SetPreserveDeletesSequenceNumber(100000000); + dbfull()->CompactRange(cro, nullptr, nullptr); + + db_iter = dbfull()->NewIterator(ro); + i = 0; + for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) { + i++; + } + ASSERT_EQ(i, 0); + delete db_iter; + } +} + TEST_F(DBCompactionTest, SkipStatsUpdateTest) { // This test verify UpdateAccumulatedStats is not on // if options.skip_stats_update_on_db_open = true diff --git a/db/db_impl.cc b/db/db_impl.cc index 465ca7306..088860eaf 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -196,7 +196,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) manual_wal_flush_(options.manual_wal_flush), seq_per_batch_(options.seq_per_batch), // TODO(myabandeh): revise this when we change options.seq_per_batch - use_custom_gc_(options.seq_per_batch) { + use_custom_gc_(options.seq_per_batch), + preserve_deletes_(options.preserve_deletes) { env_->GetAbsolutePath(dbname, &db_absolute_path_); // Reserve ten files or so for other uses and give the rest to TableCache. @@ -218,6 +219,11 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) immutable_db_options_.Dump(immutable_db_options_.info_log.get()); mutable_db_options_.Dump(immutable_db_options_.info_log.get()); DumpSupportInfo(immutable_db_options_.info_log.get()); + + // always open the DB with 0 here, which means if preserve_deletes_==true + // we won't drop any deletion markers until SetPreserveDeletesSequenceNumber() + // is called by client and this seqnum is advanced. + preserve_deletes_seqnum_.store(0); } // Will lock the mutex_, will wait for completion if wait is true @@ -748,6 +754,15 @@ SequenceNumber DBImpl::IncAndFetchSequenceNumber() { return versions_->FetchAddLastToBeWrittenSequence(1ull) + 1ull; } +bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) { + if (seqnum > preserve_deletes_seqnum_.load()) { + preserve_deletes_seqnum_.store(seqnum); + return true; + } else { + return false; + } +} + InternalIterator* DBImpl::NewInternalIterator( Arena* arena, RangeDelAggregator* range_del_agg, ColumnFamilyHandle* column_family) { @@ -1421,6 +1436,15 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, return NewErrorIterator(Status::NotSupported( "ReadTier::kPersistedData is not yet supported in iterators.")); } + // if iterator wants internal keys, we can only proceed if + // we can guarantee the deletes haven't been processed yet + if (immutable_db_options_.preserve_deletes && + read_options.iter_start_seqnum > 0 && + read_options.iter_start_seqnum < preserve_deletes_seqnum_.load()) { + return NewErrorIterator(Status::InvalidArgument( + "Iterator requested internal keys which are too old and are not" + " guaranteed to be preserved, try larger iter_start_seqnum opt.")); + } auto cfh = reinterpret_cast(column_family); auto cfd = cfh->cfd(); ReadCallback* read_callback = nullptr; // No read callback provided. diff --git a/db/db_impl.h b/db/db_impl.h index 24d4f8c86..c89e1fc5b 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -225,6 +225,8 @@ class DBImpl : public DB { // also on data written to the WAL but not to the memtable. SequenceNumber TEST_GetLatestVisibleSequenceNumber() const; + virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) override; + bool HasActiveSnapshotLaterThanSN(SequenceNumber sn); #ifndef ROCKSDB_LITE @@ -1319,6 +1321,13 @@ class DBImpl : public DB { const bool manual_wal_flush_; const bool seq_per_batch_; const bool use_custom_gc_; + + // Clients must periodically call SetPreserveDeletesSequenceNumber() + // to advance this seqnum. Default value is 0 which means ALL deletes are + // preserved. Note that this has no effect if DBOptions.preserve_deletes + // is set to false. + std::atomic preserve_deletes_seqnum_; + const bool preserve_deletes_; }; extern Options SanitizeOptions(const std::string& db, diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 6dc28f969..6706826c0 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -542,8 +542,9 @@ Status DBImpl::CompactFilesImpl( assert(is_snapshot_supported_ || snapshots_.empty()); CompactionJob compaction_job( job_context->job_id, c.get(), immutable_db_options_, - env_options_for_compaction_, versions_.get(), &shutting_down_, log_buffer, - directories_.GetDbDir(), directories_.GetDataDir(c->output_path_id()), + env_options_for_compaction_, versions_.get(), &shutting_down_, + preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(), + directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, &bg_error_, snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, table_cache_, &event_logger_, c->mutable_cf_options()->paranoid_file_checks, @@ -1694,7 +1695,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, CompactionJob compaction_job( job_context->job_id, c.get(), immutable_db_options_, env_options_for_compaction_, versions_.get(), &shutting_down_, - log_buffer, directories_.GetDbDir(), + preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(), directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, &bg_error_, snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, table_cache_, &event_logger_, diff --git a/db/db_iter.cc b/db/db_iter.cc index 256cad7fe..a7d02175d 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -9,6 +9,7 @@ #include "db/db_iter.h" #include +#include #include #include "db/dbformat.h" @@ -125,7 +126,8 @@ class DBIter final: public Iterator { true /* collapse_deletions */), read_callback_(read_callback), allow_blob_(allow_blob), - is_blob_(false) { + is_blob_(false), + start_seqnum_(read_options.iter_start_seqnum) { RecordTick(statistics_, NO_ITERATORS); prefix_extractor_ = cf_options.prefix_extractor; max_skip_ = max_sequential_skip_in_iterations; @@ -164,7 +166,12 @@ class DBIter final: public Iterator { virtual bool Valid() const override { return valid_; } virtual Slice key() const override { assert(valid_); - return saved_key_.GetUserKey(); + if(start_seqnum_ > 0) { + return saved_key_.GetInternalKey(); + } else { + return saved_key_.GetUserKey(); + } + } virtual Slice value() const override { assert(valid_); @@ -305,6 +312,9 @@ class DBIter final: public Iterator { ReadCallback* read_callback_; bool allow_blob_; bool is_blob_; + // for diff snapshots we want the lower bound on the seqnum; + // if this value > 0 iterator will return internal keys + SequenceNumber start_seqnum_; // No copying allowed DBIter(const DBIter&); @@ -430,40 +440,70 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { case kTypeSingleDeletion: // Arrange to skip all upcoming entries for this key since // they are hidden by this deletion. - saved_key_.SetUserKey( + // if iterartor specified start_seqnum we + // 1) return internal key, including the type + // 2) return ikey only if ikey.seqnum >= start_seqnum_ + // not that if deletion seqnum is < start_seqnum_ we + // just skip it like in normal iterator. + if (start_seqnum_ > 0 && ikey_.sequence >= start_seqnum_) { + saved_key_.SetInternalKey(ikey_); + valid_=true; + return; + } else { + saved_key_.SetUserKey( ikey_.user_key, !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */); - skipping = true; - PERF_COUNTER_ADD(internal_delete_skipped_count, 1); + skipping = true; + PERF_COUNTER_ADD(internal_delete_skipped_count, 1); + } break; case kTypeValue: case kTypeBlobIndex: - saved_key_.SetUserKey( - ikey_.user_key, - !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */); - if (range_del_agg_.ShouldDelete( - ikey_, RangeDelAggregator::RangePositioningMode:: - kForwardTraversal)) { - // Arrange to skip all upcoming entries for this key since - // they are hidden by this deletion. - skipping = true; - num_skipped = 0; - PERF_COUNTER_ADD(internal_delete_skipped_count, 1); - } else if (ikey_.type == kTypeBlobIndex) { - if (!allow_blob_) { - ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); - status_ = Status::NotSupported( - "Encounter unexpected blob index. Please open DB with " - "rocksdb::blob_db::BlobDB instead."); - valid_ = false; - } else { - is_blob_ = true; + if (start_seqnum_ > 0) { + // we are taking incremental snapshot here + // incremental snapshots aren't supported on DB with range deletes + assert(!( + (ikey_.type == kTypeBlobIndex) && (start_seqnum_ > 0) + )); + if (ikey_.sequence >= start_seqnum_) { + saved_key_.SetInternalKey(ikey_); valid_ = true; + return; + } else { + // this key and all previous versions shouldn't be included, + // skipping + saved_key_.SetUserKey(ikey_.user_key, + !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */); + skipping = true; } - return; } else { - valid_ = true; - return; + saved_key_.SetUserKey( + ikey_.user_key, + !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */); + if (range_del_agg_.ShouldDelete( + ikey_, RangeDelAggregator::RangePositioningMode:: + kForwardTraversal)) { + // Arrange to skip all upcoming entries for this key since + // they are hidden by this deletion. + skipping = true; + num_skipped = 0; + PERF_COUNTER_ADD(internal_delete_skipped_count, 1); + } else if (ikey_.type == kTypeBlobIndex) { + if (!allow_blob_) { + ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); + status_ = Status::NotSupported( + "Encounter unexpected blob index. Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + valid_ = false; + } else { + is_blob_ = true; + valid_ = true; + } + return; + } else { + valid_ = true; + return; + } } break; case kTypeMerge: diff --git a/db/db_iter_test.cc b/db/db_iter_test.cc index eb19a8f0d..b4a5f3a58 100644 --- a/db/db_iter_test.cc +++ b/db/db_iter_test.cc @@ -2364,6 +2364,80 @@ TEST_F(DBIteratorTest, DBIterator14) { ASSERT_EQ(db_iter->value().ToString(), "4"); } +TEST_F(DBIteratorTest, DBIteratorTestDifferentialSnapshots) { + { // test that KVs earlier that iter_start_seqnum are filtered out + ReadOptions ro; + ro.iter_start_seqnum=5; + Options options; + options.statistics = rocksdb::CreateDBStatistics(); + + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + for (size_t i = 0; i < 10; ++i) { + internal_iter->AddPut(std::to_string(i), std::to_string(i) + "a"); + internal_iter->AddPut(std::to_string(i), std::to_string(i) + "b"); + internal_iter->AddPut(std::to_string(i), std::to_string(i) + "c"); + } + internal_iter->Finish(); + + std::unique_ptr db_iter( + NewDBIterator(env_, ro, ImmutableCFOptions(options), BytewiseComparator(), + internal_iter, 13, + options.max_sequential_skip_in_iterations, nullptr)); + // Expecting InternalKeys in [5,8] range with correct type + int seqnums[4] = {5,8,11,13}; + std::string user_keys[4] = {"1","2","3","4"}; + std::string values[4] = {"1c", "2c", "3c", "4b"}; + int i = 0; + for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) { + FullKey fkey; + ParseFullKey(db_iter->key(), &fkey); + ASSERT_EQ(user_keys[i], fkey.user_key.ToString()); + ASSERT_EQ(EntryType::kEntryPut, fkey.type); + ASSERT_EQ(seqnums[i], fkey.sequence); + ASSERT_EQ(values[i], db_iter->value().ToString()); + i++; + } + ASSERT_EQ(i, 4); + } + + { // Test that deletes are returned correctly as internal KVs + ReadOptions ro; + ro.iter_start_seqnum=5; + Options options; + options.statistics = rocksdb::CreateDBStatistics(); + + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + for (size_t i = 0; i < 10; ++i) { + internal_iter->AddPut(std::to_string(i), std::to_string(i) + "a"); + internal_iter->AddPut(std::to_string(i), std::to_string(i) + "b"); + internal_iter->AddDeletion(std::to_string(i)); + } + internal_iter->Finish(); + + std::unique_ptr db_iter( + NewDBIterator(env_, ro, ImmutableCFOptions(options), BytewiseComparator(), + internal_iter, 13, + options.max_sequential_skip_in_iterations, nullptr)); + // Expecting InternalKeys in [5,8] range with correct type + int seqnums[4] = {5,8,11,13}; + EntryType key_types[4] = {EntryType::kEntryDelete,EntryType::kEntryDelete, + EntryType::kEntryDelete,EntryType::kEntryPut}; + std::string user_keys[4] = {"1","2","3","4"}; + std::string values[4] = {"", "", "", "4b"}; + int i = 0; + for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) { + FullKey fkey; + ParseFullKey(db_iter->key(), &fkey); + ASSERT_EQ(user_keys[i], fkey.user_key.ToString()); + ASSERT_EQ(key_types[i], fkey.type); + ASSERT_EQ(seqnums[i], fkey.sequence); + ASSERT_EQ(values[i], db_iter->value().ToString()); + i++; + } + ASSERT_EQ(i, 4); + } +} + class DBIterWithMergeIterTest : public testing::Test { public: DBIterWithMergeIterTest() diff --git a/db/db_test.cc b/db/db_test.cc index 94a637928..6a19b7146 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2462,6 +2462,10 @@ class ModelDB : public DB { virtual SequenceNumber GetLatestSequenceNumber() const override { return 0; } + virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) override { + return true; + } + virtual ColumnFamilyHandle* DefaultColumnFamily() const override { return nullptr; } diff --git a/db/db_test_util.cc b/db/db_test_util.cc index ba30ced40..7060568c4 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -667,6 +667,10 @@ Status DBTestBase::SingleDelete(int cf, const std::string& k) { return db_->SingleDelete(WriteOptions(), handles_[cf], k); } +bool DBTestBase::SetPreserveDeletesSequenceNumber(SequenceNumber sn) { + return db_->SetPreserveDeletesSequenceNumber(sn); +} + std::string DBTestBase::Get(const std::string& k, const Snapshot* snapshot) { ReadOptions options; options.verify_checksums = true; diff --git a/db/db_test_util.h b/db/db_test_util.h index f2caa46ca..89083b501 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -829,6 +829,8 @@ class DBTestBase : public testing::Test { Status SingleDelete(int cf, const std::string& k); + bool SetPreserveDeletesSequenceNumber(SequenceNumber sn); + std::string Get(const std::string& k, const Snapshot* snapshot = nullptr); std::string Get(int cf, const std::string& k, diff --git a/db/dbformat.cc b/db/dbformat.cc index 19e45f800..e21612878 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -36,6 +36,32 @@ uint64_t PackSequenceAndType(uint64_t seq, ValueType t) { return (seq << 8) | t; } +EntryType GetEntryType(ValueType value_type) { + switch (value_type) { + case kTypeValue: + return kEntryPut; + case kTypeDeletion: + return kEntryDelete; + case kTypeSingleDeletion: + return kEntrySingleDelete; + case kTypeMerge: + return kEntryMerge; + default: + return kEntryOther; + } +} + +bool ParseFullKey(const Slice& internal_key, FullKey* fkey) { + ParsedInternalKey ikey; + if (!ParseInternalKey(internal_key, &ikey)) { + return false; + } + fkey->user_key = ikey.user_key; + fkey->sequence = ikey.sequence; + fkey->type = GetEntryType(ikey.type); + return true; +} + void UnPackSequenceAndType(uint64_t packed, uint64_t* seq, ValueType* t) { *seq = packed >> 8; *t = static_cast(packed & 0xff); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index e91815877..71ad14362 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -53,6 +53,7 @@ struct ExternalSstFileInfo; class WriteBatch; class Env; class EventListener; +enum EntryType; using std::unique_ptr; @@ -874,6 +875,14 @@ class DB { // The sequence number of the most recent transaction. virtual SequenceNumber GetLatestSequenceNumber() const = 0; + // Instructs DB to preserve deletes with sequence numbers >= passed seqnum. + // Has no effect if DBOptions.preserve_deletes is set to false. + // This function assumes that user calls this function with monotonically + // increasing seqnums (otherwise we can't guarantee that a particular delete + // hasn't been already processed); returns true if the value was successfully + // updated, false if user attempted to call if with seqnum <= current value. + virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) = 0; + #ifndef ROCKSDB_LITE // Prevent file deletions. Compactions will continue to occur, diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index dfc960bd3..09cd15321 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -888,6 +888,18 @@ struct DBOptions { // Immutable. bool allow_ingest_behind = false; + // Needed to support differential snapshots. + // If set to true then DB will only process deletes with sequence number + // less than what was set by SetPreserveDeletesSequenceNumber(uint64_t ts). + // Clients are responsible to periodically call this method to advance + // the cutoff time. If this method is never called and preserve_deletes + // is set to true NO deletes will ever be processed. + // At the moment this only keeps normal deletes, SingleDeletes will + // not be preserved. + // DEFAULT: false + // Immutable (TODO: make it dynamically changeable) + bool preserve_deletes = false; + // If enabled it uses two queues for writes, one for the ones with // disable_memtable and one for the ones that also write to memtable. This // allows the memtable writes not to lag behind other writes. It can be used @@ -1081,6 +1093,13 @@ struct ReadOptions { // Default: empty (every table will be scanned) std::function table_filter; + // Needed to support differential snapshots. Has 2 effects: + // 1) Iterator will skip all internal keys with seqnum < iter_start_seqnum + // 2) if this param > 0 iterator will return INTERNAL keys instead of + // user keys; e.g. return tombstones as well. + // Default: 0 (don't filter by seqnum, return user keys) + SequenceNumber iter_start_seqnum; + ReadOptions(); ReadOptions(bool cksum, bool cache); }; diff --git a/include/rocksdb/table_properties.h b/include/rocksdb/table_properties.h index 2605fadd2..0ab4d5311 100644 --- a/include/rocksdb/table_properties.h +++ b/include/rocksdb/table_properties.h @@ -56,14 +56,6 @@ extern const std::string kPropertiesBlock; extern const std::string kCompressionDictBlock; extern const std::string kRangeDelBlock; -enum EntryType { - kEntryPut, - kEntryDelete, - kEntrySingleDelete, - kEntryMerge, - kEntryOther, -}; - // `TablePropertiesCollector` provides the mechanism for users to collect // their own properties that they are interested in. This class is essentially // a collection of callback functions that will be invoked during table diff --git a/include/rocksdb/types.h b/include/rocksdb/types.h index 106ac2f76..45a59a038 100644 --- a/include/rocksdb/types.h +++ b/include/rocksdb/types.h @@ -7,6 +7,7 @@ #define STORAGE_ROCKSDB_INCLUDE_TYPES_H_ #include +#include "rocksdb/slice.h" namespace rocksdb { @@ -15,6 +16,40 @@ namespace rocksdb { // Represents a sequence number in a WAL file. typedef uint64_t SequenceNumber; +// User-oriented representation of internal key types. +enum EntryType { + kEntryPut, + kEntryDelete, + kEntrySingleDelete, + kEntryMerge, + kEntryOther, +}; + +// tuple. +struct FullKey { + Slice user_key; + SequenceNumber sequence; + EntryType type; + + FullKey() + : sequence(0) + {} // Intentionally left uninitialized (for speed) + FullKey(const Slice& u, const SequenceNumber& seq, EntryType t) + : user_key(u), sequence(seq), type(t) { } + std::string DebugString(bool hex = false) const; + + void clear() { + user_key.clear(); + sequence = 0; + type = EntryType::kEntryPut; + } +}; + +// Parse slice representing internal key to FullKey +// Parsed FullKey is valid for as long as the memory pointed to by +// internal_key is alive. +bool ParseFullKey(const Slice& internal_key, FullKey* result); + } // namespace rocksdb #endif // STORAGE_ROCKSDB_INCLUDE_TYPES_H_ diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 74e74b3fa..1035e6f5d 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -304,6 +304,10 @@ class StackableDB : public DB { return db_->GetLatestSequenceNumber(); } + virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) override { + return db_->SetPreserveDeletesSequenceNumber(seqnum); + } + virtual Status GetSortedWalFiles(VectorLogPtr& files) override { return db_->GetSortedWalFiles(files); } diff --git a/options/cf_options.cc b/options/cf_options.cc index c2d7290f5..049199d2f 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -70,6 +70,7 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options, optimize_filters_for_hits(cf_options.optimize_filters_for_hits), force_consistency_checks(cf_options.force_consistency_checks), allow_ingest_behind(db_options.allow_ingest_behind), + preserve_deletes(db_options.preserve_deletes), listeners(db_options.listeners), row_cache(db_options.row_cache), max_subcompactions(db_options.max_subcompactions), diff --git a/options/cf_options.h b/options/cf_options.h index 35edf7c9c..682e5bf7f 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -111,6 +111,8 @@ struct ImmutableCFOptions { bool allow_ingest_behind; + bool preserve_deletes; + // A vector of EventListeners which call-back functions will be called // when specific RocksDB event happens. std::vector> listeners; diff --git a/options/db_options.cc b/options/db_options.cc index 53b0acece..723327288 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -84,6 +84,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) dump_malloc_stats(options.dump_malloc_stats), avoid_flush_during_recovery(options.avoid_flush_during_recovery), allow_ingest_behind(options.allow_ingest_behind), + preserve_deletes(options.preserve_deletes), concurrent_prepare(options.concurrent_prepare), manual_wal_flush(options.manual_wal_flush), seq_per_batch(options.seq_per_batch) { @@ -214,6 +215,8 @@ void ImmutableDBOptions::Dump(Logger* log) const { avoid_flush_during_recovery); ROCKS_LOG_HEADER(log, " Options.allow_ingest_behind: %d", allow_ingest_behind); + ROCKS_LOG_HEADER(log, " Options.preserve_deletes: %d", + preserve_deletes); ROCKS_LOG_HEADER(log, " Options.concurrent_prepare: %d", concurrent_prepare); ROCKS_LOG_HEADER(log, " Options.manual_wal_flush: %d", diff --git a/options/db_options.h b/options/db_options.h index 653448c44..8027ad04a 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -76,6 +76,7 @@ struct ImmutableDBOptions { bool dump_malloc_stats; bool avoid_flush_during_recovery; bool allow_ingest_behind; + bool preserve_deletes; bool concurrent_prepare; bool manual_wal_flush; bool seq_per_batch; diff --git a/options/options.cc b/options/options.cc index 6761a7b9f..2fba2f749 100644 --- a/options/options.cc +++ b/options/options.cc @@ -527,7 +527,8 @@ ReadOptions::ReadOptions() prefix_same_as_start(false), pin_data(false), background_purge_on_iterator_cleanup(false), - ignore_range_deletions(false) {} + ignore_range_deletions(false), + iter_start_seqnum(0) {} ReadOptions::ReadOptions(bool cksum, bool cache) : snapshot(nullptr), @@ -544,6 +545,7 @@ ReadOptions::ReadOptions(bool cksum, bool cache) prefix_same_as_start(false), pin_data(false), background_purge_on_iterator_cleanup(false), - ignore_range_deletions(false) {} + ignore_range_deletions(false), + iter_start_seqnum(0) {} } // namespace rocksdb diff --git a/options/options_helper.cc b/options/options_helper.cc index 6d95dbc6a..cbb389153 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -121,6 +121,8 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, mutable_db_options.avoid_flush_during_shutdown; options.allow_ingest_behind = immutable_db_options.allow_ingest_behind; + options.preserve_deletes = + immutable_db_options.preserve_deletes; return options; } diff --git a/options/options_helper.h b/options/options_helper.h index c97cd320d..0ee64daee 100644 --- a/options/options_helper.h +++ b/options/options_helper.h @@ -356,6 +356,10 @@ static std::unordered_map db_options_type_info = { {offsetof(struct DBOptions, allow_ingest_behind), OptionType::kBoolean, OptionVerificationType::kNormal, false, offsetof(struct ImmutableDBOptions, allow_ingest_behind)}}, + {"preserve_deletes", + {offsetof(struct DBOptions, preserve_deletes), OptionType::kBoolean, + OptionVerificationType::kNormal, false, + offsetof(struct ImmutableDBOptions, preserve_deletes)}}, {"concurrent_prepare", {offsetof(struct DBOptions, concurrent_prepare), OptionType::kBoolean, OptionVerificationType::kNormal, false, diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 58d0d8a4e..8c3872240 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -282,6 +282,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "avoid_flush_during_recovery=false;" "avoid_flush_during_shutdown=false;" "allow_ingest_behind=false;" + "preserve_deletes=false;" "concurrent_prepare=false;" "manual_wal_flush=false;" "seq_per_batch=false;",