From eee0af9af179063f189680e2bd79900de2f9fc6f Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Sat, 5 Dec 2020 14:17:11 -0800 Subject: [PATCH] Add full_history_ts_low to column family (#7740) Summary: Following https://github.com/facebook/rocksdb/issues/7655 and https://github.com/facebook/rocksdb/issues/7657, this PR adds `full_history_ts_low_` to `ColumnFamilyData`. `ColumnFamilyData::full_history_ts_low_` will be used to create `FlushJob` and `CompactionJob`. `ColumnFamilyData::full_history_ts_low` is persisted to the MANIFEST file. An application can only increase its value. Consider the following case: > > The database has a key at ts=950. `full_history_ts_low` is first set to 1000, and then a GC is triggered > and cleans up all data older than 1000. If the application sets `full_history_ts_low` to 900 afterwards, > and tries to read at ts=960, the key at 950 is not seen. From the perspective of the read, the result > is hard to reason. For simplicity, we just do now allow decreasing full_history_ts_low for now. > During recovery, the value of `full_history_ts_low` is restored for each column family if applicable. Note that version edits in the MANIFEST file for the same column family may have `full_history_ts_low` unsorted due to the potential interleaving of `LogAndApply` calls. Only the max will be used to restore the state of the column family. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7740 Test Plan: make check Reviewed By: ltamasi Differential Revision: D25296217 Pulled By: riversand963 fbshipit-source-id: 24acda1df8262cd7cfdc6ce7b0ec56438abe242a --- db/column_family.h | 17 ++++ db/db_impl/db_impl_compaction_flush.cc | 10 ++- db/version_edit.cc | 24 +++++ db/version_edit.h | 13 +++ db/version_edit_handler.cc | 4 + db/version_edit_test.cc | 10 +++ db/version_set.cc | 26 +++++- db/version_set.h | 5 ++ db/version_set_test.cc | 120 +++++++++++++++++++++++++ 9 files changed, 221 insertions(+), 8 deletions(-) diff --git a/db/column_family.h b/db/column_family.h index f622783ba..cc41c5b2b 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -509,6 +509,21 @@ class ColumnFamilyData { FSDirectory* GetDataDir(size_t path_id) const; + // full_history_ts_low_ can only increase. + void SetFullHistoryTsLow(std::string ts_low) { + assert(!ts_low.empty()); + const Comparator* ucmp = user_comparator(); + assert(ucmp); + if (full_history_ts_low_.empty() || + ucmp->CompareTimestamp(ts_low, full_history_ts_low_) > 0) { + full_history_ts_low_ = std::move(ts_low); + } + } + + const std::string& GetFullHistoryTsLow() const { + return full_history_ts_low_; + } + ThreadLocalPtr* TEST_GetLocalSV() { return local_sv_.get(); } private: @@ -605,6 +620,8 @@ class ColumnFamilyData { std::vector> data_dirs_; bool db_paths_registered_; + + std::string full_history_ts_low_; }; // ColumnFamilySet has interesting thread-safety requirements diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 04186a220..ee5756cf5 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -166,7 +166,7 @@ Status DBImpl::FlushMemTableToOutputFile( GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, true /* sync_output_directory */, true /* write_manifest */, thread_pri, - io_tracer_, db_id_, db_session_id_); + io_tracer_, db_id_, db_session_id_, cfd->GetFullHistoryTsLow()); FileMetaData file_meta; TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables"); @@ -407,7 +407,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, false /* sync_output_directory */, false /* write_manifest */, - thread_pri, io_tracer_, db_id_, db_session_id_)); + thread_pri, io_tracer_, db_id_, db_session_id_, + cfd->GetFullHistoryTsLow())); jobs.back()->PickMemTable(); } @@ -1200,7 +1201,8 @@ Status DBImpl::CompactFilesImpl( c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, &compaction_job_stats, Env::Priority::USER, io_tracer_, - &manual_compaction_paused_, db_id_, db_session_id_); + &manual_compaction_paused_, db_id_, db_session_id_, + c->column_family_data()->GetFullHistoryTsLow()); // Creating a compaction influences the compaction score because the score // takes running compactions into account (by skipping files that are already @@ -3054,7 +3056,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, c->mutable_cf_options()->report_bg_io_stats, dbname_, &compaction_job_stats, thread_pri, io_tracer_, is_manual ? &manual_compaction_paused_ : nullptr, db_id_, - db_session_id_); + db_session_id_, c->column_family_data()->GetFullHistoryTsLow()); compaction_job.Prepare(); NotifyOnCompactionBegin(c->column_family_data(), c.get(), status, diff --git a/db/version_edit.cc b/db/version_edit.cc index 8f4fb5766..ddaadc58d 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -96,6 +96,7 @@ void VersionEdit::Clear() { column_family_name_.clear(); is_in_atomic_group_ = false; remaining_entries_ = 0; + full_history_ts_low_.clear(); } bool VersionEdit::EncodeTo(std::string* dst) const { @@ -252,6 +253,11 @@ bool VersionEdit::EncodeTo(std::string* dst) const { PutVarint32(dst, kInAtomicGroup); PutVarint32(dst, remaining_entries_); } + + if (HasFullHistoryTsLow()) { + PutVarint32(dst, kFullHistoryTsLow); + PutLengthPrefixedSlice(dst, full_history_ts_low_); + } return true; } @@ -612,6 +618,16 @@ Status VersionEdit::DecodeFrom(const Slice& src) { } break; + case kFullHistoryTsLow: + if (!GetLengthPrefixedSlice(&input, &str)) { + msg = "full_history_ts_low"; + } else if (str.empty()) { + msg = "full_history_ts_low: empty"; + } else { + full_history_ts_low_.assign(str.data(), str.size()); + } + break; + default: if (tag & kTagSafeIgnoreMask) { // Tag from future which can be safely ignored. @@ -744,6 +760,10 @@ std::string VersionEdit::DebugString(bool hex_key) const { AppendNumberTo(&r, remaining_entries_); r.append(" entries remains"); } + if (HasFullHistoryTsLow()) { + r.append("\n FullHistoryTsLow: "); + r.append(Slice(full_history_ts_low_).ToString(hex_key)); + } r.append("\n}\n"); return r; } @@ -873,6 +893,10 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const { jw << "AtomicGroup" << remaining_entries_; } + if (HasFullHistoryTsLow()) { + jw << "FullHistoryTsLow" << Slice(full_history_ts_low_).ToString(hex_key); + } + jw.EndObject(); return jw.Get(); diff --git a/db/version_edit.h b/db/version_edit.h index fed9613e3..6b045878b 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -61,6 +61,7 @@ enum Tag : uint32_t { kBlobFileGarbage, kWalAddition, kWalDeletion, + kFullHistoryTsLow, }; enum NewFileCustomTag : uint32_t { @@ -524,6 +525,16 @@ class VersionEdit { bool IsInAtomicGroup() const { return is_in_atomic_group_; } uint32_t GetRemainingEntries() const { return remaining_entries_; } + bool HasFullHistoryTsLow() const { return !full_history_ts_low_.empty(); } + const std::string& GetFullHistoryTsLow() const { + assert(HasFullHistoryTsLow()); + return full_history_ts_low_; + } + void SetFullHistoryTsLow(std::string full_history_ts_low) { + assert(!full_history_ts_low.empty()); + full_history_ts_low_ = std::move(full_history_ts_low); + } + // return true on success. bool EncodeTo(std::string* dst) const; Status DecodeFrom(const Slice& src); @@ -586,6 +597,8 @@ class VersionEdit { bool is_in_atomic_group_ = false; uint32_t remaining_entries_ = 0; + + std::string full_history_ts_low_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index 44ec792ae..98cb0a831 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -520,6 +520,10 @@ Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, cfd->user_comparator()->Name(), "does not match existing comparator " + edit.comparator_); } + if (edit.HasFullHistoryTsLow()) { + const std::string& new_ts = edit.GetFullHistoryTsLow(); + cfd->SetFullHistoryTsLow(new_ts); + } } if (s.ok()) { diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index 88e98606a..a0869b3c7 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -8,8 +8,10 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/version_edit.h" + #include "test_util/sync_point.h" #include "test_util/testharness.h" +#include "test_util/testutil.h" #include "util/coding.h" #include "util/string_util.h" @@ -505,6 +507,14 @@ TEST_F(VersionEditTest, DeleteWalDebug) { ASSERT_EQ(edit.DebugJSON(4, true), expected_json); } +TEST_F(VersionEditTest, FullHistoryTsLow) { + VersionEdit edit; + ASSERT_FALSE(edit.HasFullHistoryTsLow()); + std::string ts = test::EncodeInt(0); + edit.SetFullHistoryTsLow(ts); + TestEncodeDecode(edit); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/version_set.cc b/db/version_set.cc index 1d75b8322..2f5348cfb 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -4044,7 +4044,9 @@ Status VersionSet::ProcessManifestWrites( } for (const auto* cfd : *column_family_set_) { assert(curr_state.find(cfd->GetID()) == curr_state.end()); - curr_state[cfd->GetID()] = {cfd->GetLogNumber()}; + curr_state.emplace(std::make_pair( + cfd->GetID(), + MutableCFState(cfd->GetLogNumber(), cfd->GetFullHistoryTsLow()))); } for (const auto& wal : wals_.GetWals()) { @@ -4227,13 +4229,21 @@ Status VersionSet::ProcessManifestWrites( // Each version in versions corresponds to a column family. // For each column family, update its log number indicating that logs // with number smaller than this should be ignored. + // TODO (yanqin): remove the nested loop if possible. for (const auto version : versions) { uint64_t max_log_number_in_batch = 0; + assert(version->cfd_); uint32_t cf_id = version->cfd_->GetID(); + std::string full_history_ts_low; for (const auto& e : batch_edits) { - if (e->has_log_number_ && e->column_family_ == cf_id) { - max_log_number_in_batch = - std::max(max_log_number_in_batch, e->log_number_); + if (e->column_family_ == cf_id) { + if (e->has_log_number_) { + max_log_number_in_batch = + std::max(max_log_number_in_batch, e->log_number_); + } + if (e->HasFullHistoryTsLow()) { + version->cfd_->SetFullHistoryTsLow(e->GetFullHistoryTsLow()); + } } } if (max_log_number_in_batch != 0) { @@ -4589,6 +4599,10 @@ Status VersionSet::ExtractInfoFromVersionEdit( cfd->user_comparator()->Name(), "does not match existing comparator " + from_edit.comparator_); } + if (from_edit.HasFullHistoryTsLow()) { + const std::string& new_ts = from_edit.GetFullHistoryTsLow(); + cfd->SetFullHistoryTsLow(new_ts); + } } if (from_edit.has_prev_log_number_) { @@ -5279,6 +5293,10 @@ Status VersionSet::WriteCurrentStateToManifest( assert(iter != curr_state.end()); uint64_t log_number = iter->second.log_number; edit.SetLogNumber(log_number); + const std::string& full_history_ts_low = iter->second.full_history_ts_low; + if (!full_history_ts_low.empty()) { + edit.SetFullHistoryTsLow(full_history_ts_low); + } std::string record; if (!edit.EncodeTo(&record)) { return Status::Corruption( diff --git a/db/version_set.h b/db/version_set.h index 9323eb57b..685d4348e 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1278,6 +1278,11 @@ class VersionSet { struct MutableCFState { uint64_t log_number; + std::string full_history_ts_low; + + explicit MutableCFState() = default; + explicit MutableCFState(uint64_t _log_number, std::string ts_low) + : log_number(_log_number), full_history_ts_low(std::move(ts_low)) {} }; // Save current contents to *log diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 540885720..b480deb3e 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -869,6 +869,28 @@ class VersionSetTestBase { mutex_.Unlock(); } + ColumnFamilyData* CreateColumnFamily(const std::string& cf_name, + const ColumnFamilyOptions& cf_options) { + VersionEdit new_cf; + new_cf.AddColumnFamily(cf_name); + uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID(); + new_cf.SetColumnFamily(new_id); + new_cf.SetLogNumber(0); + new_cf.SetComparatorName(cf_options.comparator->Name()); + Status s; + mutex_.Lock(); + s = versions_->LogAndApply(/*column_family_data=*/nullptr, + MutableCFOptions(cf_options), &new_cf, &mutex_, + /*db_directory=*/nullptr, + /*new_descriptor_log=*/false, &cf_options); + mutex_.Unlock(); + EXPECT_OK(s); + ColumnFamilyData* cfd = + versions_->GetColumnFamilySet()->GetColumnFamily(cf_name); + EXPECT_NE(nullptr, cfd); + return cfd; + } + Env* mem_env_; Env* env_; std::shared_ptr env_guard_; @@ -1667,6 +1689,104 @@ TEST_F(VersionSetTest, AtomicGroupWithWalEdits) { } } +class VersionSetWithTimestampTest : public VersionSetTest { + public: + static const std::string kNewCfName; + + explicit VersionSetWithTimestampTest() : VersionSetTest() {} + + void SetUp() override { + NewDB(); + Options options; + options.comparator = test::ComparatorWithU64Ts(); + cfd_ = CreateColumnFamily(kNewCfName, options); + EXPECT_NE(nullptr, cfd_); + EXPECT_NE(nullptr, cfd_->GetLatestMutableCFOptions()); + column_families_.emplace_back(kNewCfName, options); + } + + void TearDown() override { + for (auto* e : edits_) { + delete e; + } + edits_.clear(); + } + + void GenVersionEditsToSetFullHistoryTsLow( + const std::vector& ts_lbs) { + for (const auto ts_lb : ts_lbs) { + VersionEdit* edit = new VersionEdit; + edit->SetColumnFamily(cfd_->GetID()); + std::string ts_str = test::EncodeInt(ts_lb); + edit->SetFullHistoryTsLow(ts_str); + edits_.emplace_back(edit); + } + } + + void VerifyFullHistoryTsLow(uint64_t expected_ts_low) { + std::unique_ptr vset( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)); + ASSERT_OK(vset->Recover(column_families_, /*read_only=*/false, + /*db_id=*/nullptr)); + for (auto* cfd : *(vset->GetColumnFamilySet())) { + ASSERT_NE(nullptr, cfd); + if (cfd->GetName() == kNewCfName) { + ASSERT_EQ(test::EncodeInt(expected_ts_low), cfd->GetFullHistoryTsLow()); + } else { + ASSERT_TRUE(cfd->GetFullHistoryTsLow().empty()); + } + } + } + + void DoTest(const std::vector& ts_lbs) { + if (ts_lbs.empty()) { + return; + } + + GenVersionEditsToSetFullHistoryTsLow(ts_lbs); + + Status s; + mutex_.Lock(); + s = versions_->LogAndApply(cfd_, *(cfd_->GetLatestMutableCFOptions()), + edits_, &mutex_); + mutex_.Unlock(); + ASSERT_OK(s); + VerifyFullHistoryTsLow(*std::max_element(ts_lbs.begin(), ts_lbs.end())); + } + + protected: + ColumnFamilyData* cfd_{nullptr}; + // edits_ must contain and own pointers to heap-alloc VersionEdit objects. + autovector edits_; +}; + +const std::string VersionSetWithTimestampTest::kNewCfName("new_cf"); + +TEST_F(VersionSetWithTimestampTest, SetFullHistoryTsLbOnce) { + constexpr uint64_t kTsLow = 100; + DoTest({kTsLow}); +} + +// Simulate the application increasing full_history_ts_low. +TEST_F(VersionSetWithTimestampTest, IncreaseFullHistoryTsLb) { + const std::vector ts_lbs = {100, 101, 102, 103}; + DoTest(ts_lbs); +} + +// Simulate the application trying to decrease full_history_ts_low +// unsuccessfully. If the application calls public API sequentially to +// decrease the lower bound ts, RocksDB will return an InvalidArgument +// status before involving VersionSet. Only when multiple threads trying +// to decrease the lower bound concurrently will this case ever happen. Even +// so, the lower bound cannot be decreased. The application will be notified +// via return value of the API. +TEST_F(VersionSetWithTimestampTest, TryDecreaseFullHistoryTsLb) { + const std::vector ts_lbs = {103, 102, 101, 100}; + DoTest(ts_lbs); +} + class VersionSetAtomicGroupTest : public VersionSetTestBase, public testing::Test { public: