diff --git a/db/column_family.cc b/db/column_family.cc
index 7ad3d408f..185ec729c 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -1376,6 +1376,33 @@ Status ColumnFamilyData::ValidateOptions(
     }
   }

+  const auto* ucmp = cf_options.comparator;
+  assert(ucmp);
+  if (ucmp->timestamp_size() > 0 &&
+      !cf_options.persist_user_defined_timestamps) {
+    if (db_options.atomic_flush) {
+      return Status::NotSupported(
+          "Not persisting user-defined timestamps feature is not supported"
+          " in combination with atomic flush.");
+    }
+    if (db_options.allow_concurrent_memtable_write) {
+      return Status::NotSupported(
+          "Not persisting user-defined timestamps feature is not supported"
+          " in combination with concurrent memtable write.");
+    }
+    const char* comparator_name = cf_options.comparator->Name();
+    size_t name_size = strlen(comparator_name);
+    const char* suffix = ".u64ts";
+    size_t suffix_size = strlen(suffix);
+    if (name_size <= suffix_size ||
+        strcmp(comparator_name + name_size - suffix_size, suffix) != 0) {
+      return Status::NotSupported(
+          "Not persisting user-defined timestamps feature only supports "
+          "user-defined timestamps formatted as uint64_t.");
+    }
+  }
+
   if (cf_options.enable_blob_garbage_collection) {
     if (cf_options.blob_garbage_collection_age_cutoff < 0.0 ||
         cf_options.blob_garbage_collection_age_cutoff > 1.0) {
@@ -1515,6 +1542,43 @@ FSDirectory* ColumnFamilyData::GetDataDir(size_t path_id) const {
   return data_dirs_[path_id].get();
 }

+bool ColumnFamilyData::ShouldPostponeFlushToRetainUDT(
+    uint64_t max_memtable_id) {
+  const Comparator* ucmp = user_comparator();
+  const size_t ts_sz = ucmp->timestamp_size();
+  if (ts_sz == 0 || ioptions_.persist_user_defined_timestamps) {
+    return false;
+  }
+  // If users set the `persist_user_defined_timestamps` flag to false, they
+  // should also set `full_history_ts_low` to indicate the range of
+  // user-defined timestamps to retain in memory. Otherwise, we do not
+  // explicitly postpone flush to retain UDTs.
+  const std::string& full_history_ts_low = GetFullHistoryTsLow();
+  if (full_history_ts_low.empty()) {
+    return false;
+  }
+#ifndef NDEBUG
+  Slice last_table_newest_udt;
+#endif /* !NDEBUG */
+  for (const Slice& table_newest_udt :
+       imm()->GetTablesNewestUDT(max_memtable_id)) {
+    assert(table_newest_udt.size() == full_history_ts_low.size());
+    assert(last_table_newest_udt.empty() ||
+           ucmp->CompareTimestamp(table_newest_udt, last_table_newest_udt) >=
+               0);
+    // Check the newest UDT contained in each MemTable, in ascending ID order
+    // up to `max_memtable_id`. A MemTable with a bigger ID has newer UDTs, so
+    // return immediately upon finding the first MemTable that needs
+    // postponing.
+    if (ucmp->CompareTimestamp(table_newest_udt, full_history_ts_low) >= 0) {
+      return true;
+    }
+#ifndef NDEBUG
+    last_table_newest_udt = table_newest_udt;
+#endif /* !NDEBUG */
+  }
+  return false;
+}
+
 void ColumnFamilyData::RecoverEpochNumbers() {
   assert(current_);
   auto* vstorage = current_->storage_info();
diff --git a/db/column_family.h b/db/column_family.h
index 05f126ae6..f976c24cc 100644
--- a/db/column_family.h
+++ b/db/column_family.h
@@ -506,6 +506,12 @@ class ColumnFamilyData {
     return full_history_ts_low_;
   }

+  // REQUIRES: DB mutex held.
+  // Return true if flushing up to MemTables with ID `max_memtable_id`
+  // should be postponed to retain user-defined timestamps according to the
+  // user's setting. Called by the background flush job.
+  bool ShouldPostponeFlushToRetainUDT(uint64_t max_memtable_id);
+
   ThreadLocalPtr* TEST_GetLocalSV() { return local_sv_.get(); }
   WriteBufferManager* write_buffer_mgr() { return write_buffer_manager_; }
   std::shared_ptr<CacheReservationManager>
diff --git a/db/column_family_test.cc b/db/column_family_test.cc
index 08393c350..c0574ee55 100644
--- a/db/column_family_test.cc
+++ b/db/column_family_test.cc
@@ -17,6 +17,7 @@
 #include "options/options_parser.h"
 #include "port/port.h"
 #include "port/stack_trace.h"
+#include "rocksdb/comparator.h"
 #include "rocksdb/convenience.h"
 #include "rocksdb/db.h"
 #include "rocksdb/env.h"
@@ -63,6 +64,9 @@ class ColumnFamilyTestBase : public testing::Test {
     db_options_.create_if_missing = true;
     db_options_.fail_if_options_file_error = true;
     db_options_.env = env_;
+  }
+
+  void SetUp() override {
     EXPECT_OK(
         DestroyDB(dbname_, Options(db_options_, column_family_options_)));
   }

@@ -3380,6 +3384,205 @@ TEST(ColumnFamilyTest, ValidateMemtableKVChecksumOption) {
   ASSERT_OK(ColumnFamilyData::ValidateOptions(db_options, cf_options));
 }

+// Tests the flushing behavior of a column family to retain user-defined
+// timestamps when `persist_user_defined_timestamps` is false.
+class ColumnFamilyRetainUDTTest : public ColumnFamilyTestBase {
+ public:
+  ColumnFamilyRetainUDTTest() : ColumnFamilyTestBase(kLatestFormatVersion) {}
+
+  void SetUp() override {
+    db_options_.allow_concurrent_memtable_write = false;
+    column_family_options_.comparator =
+        test::BytewiseComparatorWithU64TsWrapper();
+    column_family_options_.persist_user_defined_timestamps = false;
+    ColumnFamilyTestBase::SetUp();
+  }
+
+  Status Put(int cf, const std::string& key, const std::string& ts,
+             const std::string& value) {
+    return db_->Put(WriteOptions(), handles_[cf], Slice(key), Slice(ts),
+                    Slice(value));
+  }
+};
+
+class TestTsComparator : public Comparator {
+ public:
+  TestTsComparator() : Comparator(8 /*ts_sz*/) {}
+
+  int Compare(const ROCKSDB_NAMESPACE::Slice& /*a*/,
+              const ROCKSDB_NAMESPACE::Slice& /*b*/) const override {
+    return 0;
+  }
+  const char* Name() const override { return "TestTs"; }
+  void FindShortestSeparator(
+      std::string* /*start*/,
+      const ROCKSDB_NAMESPACE::Slice& /*limit*/) const override {}
+  void FindShortSuccessor(std::string* /*key*/) const override {}
+};
+
+TEST_F(ColumnFamilyRetainUDTTest, SanityCheck) {
+  Open();
+  ColumnFamilyOptions cf_options;
+  cf_options.persist_user_defined_timestamps = false;
+  TestTsComparator test_comparator;
+  cf_options.comparator = &test_comparator;
+  ColumnFamilyHandle* handle;
+  // Not persisting user-defined timestamps feature only supports user-defined
+  // timestamps formatted as uint64_t.
+  ASSERT_TRUE(
+      db_->CreateColumnFamily(cf_options, "pikachu", &handle).IsNotSupported());
+
+  Destroy();
+  // Not persisting user-defined timestamps feature doesn't work in combination
+  // with atomic flush.
+  db_options_.atomic_flush = true;
+  ASSERT_TRUE(TryOpen({"default"}).IsNotSupported());
+
+  // Not persisting user-defined timestamps feature doesn't work in combination
+  // with concurrent memtable write.
+  db_options_.atomic_flush = false;
+  db_options_.allow_concurrent_memtable_write = true;
+  ASSERT_TRUE(TryOpen({"default"}).IsNotSupported());
+  Close();
+}
+
+TEST_F(ColumnFamilyRetainUDTTest, FullHistoryTsLowNotSet) {
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
+        ASSERT_NE(nullptr, arg);
+        auto reschedule_count = *static_cast<int*>(arg);
+        ASSERT_EQ(1, reschedule_count);
+      });
+
+  SyncPoint::GetInstance()->EnableProcessing();
+  Open();
+  std::string write_ts;
+  PutFixed64(&write_ts, 1);
+  ASSERT_OK(Put(0, "foo", write_ts, "v1"));
+  // No `full_history_ts_low` is explicitly set by the user, so the flush
+  // continues without checking whether its UDTs have expired.
+  ASSERT_OK(Flush(0));
+
+  // After flush, `full_history_ts_low` should be automatically advanced to
+  // the effective cutoff timestamp: write_ts + 1
+  std::string cutoff_ts;
+  PutFixed64(&cutoff_ts, 2);
+  std::string effective_full_history_ts_low;
+  ASSERT_OK(
+      db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
+  ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
+  Close();
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_F(ColumnFamilyRetainUDTTest, AllKeysExpired) {
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
+        ASSERT_NE(nullptr, arg);
+        auto reschedule_count = *static_cast<int*>(arg);
+        ASSERT_EQ(1, reschedule_count);
+      });
+
+  SyncPoint::GetInstance()->EnableProcessing();
+  Open();
+  std::string write_ts;
+  PutFixed64(&write_ts, 1);
+  ASSERT_OK(Put(0, "foo", write_ts, "v1"));
+  std::string cutoff_ts;
+  PutFixed64(&cutoff_ts, 3);
+  ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
+  // All keys have expired w.r.t. the configured `full_history_ts_low`, so the
+  // flush continues without the need for a re-schedule.
+  ASSERT_OK(Flush(0));
+
+  // `full_history_ts_low` stays unchanged after flush.
+  std::string effective_full_history_ts_low;
+  ASSERT_OK(
+      db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
+  ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
+  Close();
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushToAvoidWriteStall) {
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
+        ASSERT_NE(nullptr, arg);
+        auto reschedule_count = *static_cast<int*>(arg);
+        ASSERT_EQ(1, reschedule_count);
+      });
+
+  SyncPoint::GetInstance()->EnableProcessing();
+  Open();
+  std::string cutoff_ts;
+  std::string write_ts;
+  PutFixed64(&write_ts, 1);
+  ASSERT_OK(Put(0, "foo", write_ts, "v1"));
+  PutFixed64(&cutoff_ts, 1);
+  ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
+  ASSERT_OK(db_->SetOptions(handles_[0], {{"max_write_buffer_number", "1"}}));
+  // Not all keys have expired, but the flush continues without a re-schedule
+  // because of the risk of a write stall.
+  ASSERT_OK(Flush(0));
+
+  // After flush, `full_history_ts_low` should be automatically advanced to
+  // the effective cutoff timestamp: write_ts + 1
+  std::string effective_full_history_ts_low;
+  ASSERT_OK(
+      db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
+
+  cutoff_ts.clear();
+  PutFixed64(&cutoff_ts, 2);
+  ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
+  Close();
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
+  std::string cutoff_ts;
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::AfterRetainUDTReschedule:cb", [&](void* /*arg*/) {
+        // Increase full_history_ts_low so that all keys have expired after
+        // the initial FlushRequest is rescheduled.
+        cutoff_ts.clear();
+        PutFixed64(&cutoff_ts, 3);
+        ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
+      });
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
+        ASSERT_NE(nullptr, arg);
+        auto reschedule_count = *static_cast<int*>(arg);
+        ASSERT_EQ(2, reschedule_count);
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  Open();
+  std::string write_ts;
+  PutFixed64(&write_ts, 1);
+  ASSERT_OK(Put(0, "foo", write_ts, "v1"));
+  PutFixed64(&cutoff_ts, 1);
+  ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
+  // Not all keys have expired, and there is no risk of a write stall, so the
+  // flush is rescheduled. The actual flush happens after
+  // `full_history_ts_low` is increased to mark all keys expired.
+  ASSERT_OK(Flush(0));
+
+  std::string effective_full_history_ts_low;
+  ASSERT_OK(
+      db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
+  // `full_history_ts_low` stays unchanged.
+  ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
+  Close();
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
 }  // namespace ROCKSDB_NAMESPACE

 int main(int argc, char** argv) {
diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc
index a1b4035e0..00a33669b 100644
--- a/db/db_compaction_test.cc
+++ b/db/db_compaction_test.cc
@@ -5231,6 +5231,12 @@ TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) {
   }

   auto manual_compaction_thread = port::Thread([this]() {
+    // Write something to make the current Memtable non-empty, so an extra
+    // immutable Memtable is created by the manual flush that CompactRange
+    // requests. Write stall mode is then entered because of the accumulated
+    // write buffers from the manual flush.
+    Random compact_rnd(301);
+    ASSERT_OK(Put(Key(0), compact_rnd.RandomString(1024)));
     CompactRangeOptions cro;
     cro.allow_write_stall = false;
     ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 57c43b517..27f539182 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -2063,6 +2063,10 @@ class DBImpl : public DB {
     // flush is considered complete.
    std::unordered_map<ColumnFamilyData*, uint64_t>
        cfd_to_max_mem_id_to_persist;
+
+#ifndef NDEBUG
+    int reschedule_count = 1;
+#endif /* !NDEBUG */
  };

  void GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
@@ -2091,6 +2095,7 @@ class DBImpl : public DB {
                         Env::Priority thread_pri);
  Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
                         LogBuffer* log_buffer, FlushReason* reason,
+                        bool* flush_rescheduled_to_retain_udt,
                         Env::Priority thread_pri);

  bool EnoughRoomForCompaction(ColumnFamilyData* cfd,
@@ -2103,6 +2108,12 @@ class DBImpl : public DB {
                               std::unique_ptr<TaskLimiterToken>* token,
                               LogBuffer* log_buffer);

+  // Return true if the `FlushRequest` can be rescheduled to retain the UDTs.
+  // Only true if the involved MemTables contain user-defined timestamps newer
+  // than the cutoff timestamp `full_history_ts_low`, and postponing the flush
+  // will not cause write stall mode to be entered.
+  bool ShouldRescheduleFlushRequestToRetainUDT(const FlushRequest& flush_req);
+
  // Schedule background tasks
  Status StartPeriodicTaskScheduler();

diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index c64d4ecdb..4e0372e69 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -21,6 +21,7 @@
 #include "monitoring/thread_status_util.h"
 #include "test_util/sync_point.h"
 #include "util/cast_util.h"
+#include "util/coding.h"
 #include "util/concurrent_task_limiter_impl.h"

 namespace ROCKSDB_NAMESPACE {
@@ -76,6 +77,40 @@ bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,
   return false;
 }

+bool DBImpl::ShouldRescheduleFlushRequestToRetainUDT(
+    const FlushRequest& flush_req) {
+  mutex_.AssertHeld();
+  assert(flush_req.cfd_to_max_mem_id_to_persist.size() == 1);
+  ColumnFamilyData* cfd =
+      flush_req.cfd_to_max_mem_id_to_persist.begin()->first;
+  uint64_t max_memtable_id =
+      flush_req.cfd_to_max_mem_id_to_persist.begin()->second;
+  if (cfd->IsDropped() ||
+      !cfd->ShouldPostponeFlushToRetainUDT(max_memtable_id)) {
+    return false;
+  }
+  // Check if holding off the flush will cause write stall mode to be entered.
+  // A write stall entered because of the accumulation of write buffers can be
+  // alleviated if we continue with the flush instead of postponing it.
+  const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
+
+  // Take the state of the active Memtable into consideration so that we are
+  // not just checking whether the DB is currently already in write stall mode.
+  int mem_to_flush = cfd->mem()->ApproximateMemoryUsageFast() >=
+                             cfd->mem()->write_buffer_size() / 2
+                         ? 1
+                         : 0;
+  WriteStallCondition write_stall =
+      ColumnFamilyData::GetWriteStallConditionAndCause(
+          cfd->imm()->NumNotFlushed() + mem_to_flush, /*num_l0_files=*/0,
+          /*num_compaction_needed_bytes=*/0, mutable_cf_options,
+          *cfd->ioptions())
+          .first;
+  if (write_stall != WriteStallCondition::kNormal) {
+    return false;
+  }
+  return true;
+}
+
 IOStatus DBImpl::SyncClosedLogs(JobContext* job_context,
                                 VersionEdit* synced_wals) {
   TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
@@ -2506,8 +2541,11 @@ Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
     // check whether one extra immutable memtable or an extra L0 file would
     // cause write stalling mode to be entered. It could still enter stall
     // mode due to pending compaction bytes, but that's less common
+    // No extra immutable Memtable will be created if the current Memtable is
+    // empty.
+    int mem_to_flush = cfd->mem()->IsEmpty() ? 0 : 1;
     write_stall_condition = ColumnFamilyData::GetWriteStallConditionAndCause(
-                                cfd->imm()->NumNotFlushed() + 1,
+                                cfd->imm()->NumNotFlushed() + mem_to_flush,
                                 vstorage->l0_delay_trigger_count() + 1,
                                 vstorage->estimated_compaction_needed_bytes(),
                                 mutable_cf_options, *cfd->ioptions())
                                 .first;
@@ -2945,6 +2983,7 @@ void DBImpl::UnscheduleFlushCallback(void* arg) {

 Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
                                LogBuffer* log_buffer, FlushReason* reason,
+                               bool* flush_rescheduled_to_retain_udt,
                                Env::Priority thread_pri) {
   mutex_.AssertHeld();
@@ -2970,12 +3009,43 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
   autovector<ColumnFamilyData*> column_families_not_to_flush;
   while (!flush_queue_.empty()) {
     // This cfd is already referenced
-    auto [flush_reason, cfd_to_max_mem_id_to_persist] =
-        PopFirstFromFlushQueue();
+    FlushRequest flush_req = PopFirstFromFlushQueue();
+    FlushReason flush_reason = flush_req.flush_reason;
+    if (!immutable_db_options_.atomic_flush &&
+        ShouldRescheduleFlushRequestToRetainUDT(flush_req)) {
+      assert(flush_req.cfd_to_max_mem_id_to_persist.size() == 1);
+      ColumnFamilyData* cfd =
+          flush_req.cfd_to_max_mem_id_to_persist.begin()->first;
+      if (cfd->UnrefAndTryDelete()) {
+        return Status::OK();
+      }
+      ROCKS_LOG_BUFFER(log_buffer,
+                       "FlushRequest for column family %s is re-scheduled to "
+                       "retain user-defined timestamps.",
+                       cfd->GetName().c_str());
+      // Reschedule the `FlushRequest` as is without checking dropped column
+      // family etc. The follow-up job will do the check anyways, so save the
+      // duplication. Column families are deduplicated by
+      // `SchedulePendingFlush` at flush request enqueueing time and by
+      // `PopFirstFromFlushQueue` at dequeueing time.
+      // This flush request is rescheduled right after it's popped from the
+      // queue while the db mutex is held, so there should be no other
+      // FlushRequest for the same column family with a higher
+      // `max_memtable_id` in the queue to block the reschedule from
+      // succeeding.
+#ifndef NDEBUG
+      flush_req.reschedule_count += 1;
+#endif /* !NDEBUG */
+      SchedulePendingFlush(flush_req);
+      *reason = flush_reason;
+      *flush_rescheduled_to_retain_udt = true;
+      return Status::TryAgain();
+    }
     superversion_contexts.clear();
-    superversion_contexts.reserve(cfd_to_max_mem_id_to_persist.size());
+    superversion_contexts.reserve(
+        flush_req.cfd_to_max_mem_id_to_persist.size());

-    for (const auto& [cfd, max_memtable_id] : cfd_to_max_mem_id_to_persist) {
+    for (const auto& [cfd, max_memtable_id] :
+         flush_req.cfd_to_max_mem_id_to_persist) {
       if (cfd->GetMempurgeUsed()) {
         // If imm() contains silent memtables (e.g.: because
         // MemPurge was activated), requesting a flush will
@@ -2992,7 +3062,13 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
       bg_flush_args.emplace_back(cfd, max_memtable_id,
                                  &(superversion_contexts.back()), flush_reason);
     }
-    if (!bg_flush_args.empty()) {
+    // `MaybeScheduleFlushOrCompaction` schedules as many `BackgroundCallFlush`
+    // jobs as there are `FlushRequest`s in `flush_queue_` (tracked by
+    // `unscheduled_flushes_`), so it's sufficient for each `BackgroundFlush`
+    // call to handle one `FlushRequest` and return its own Status.
+    if (!bg_flush_args.empty() || !column_families_not_to_flush.empty()) {
+      TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundFlush:CheckFlushRequest:cb",
+                               const_cast<int*>(&flush_req.reschedule_count));
       break;
     }
   }
@@ -3054,11 +3130,20 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
       pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
           CaptureCurrentFileNumberInPendingOutputs()));
   FlushReason reason;
-
-  Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer,
-                             &reason, thread_pri);
-  if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped() &&
-      reason != FlushReason::kErrorRecovery) {
+  bool flush_rescheduled_to_retain_udt = false;
+  Status s =
+      BackgroundFlush(&made_progress, &job_context, &log_buffer, &reason,
+                      &flush_rescheduled_to_retain_udt, thread_pri);
+  if (s.IsTryAgain() && flush_rescheduled_to_retain_udt) {
+    bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
+    mutex_.Unlock();
+    TEST_SYNC_POINT_CALLBACK("DBImpl::AfterRetainUDTReschedule:cb", nullptr);
+    immutable_db_options_.clock->SleepForMicroseconds(
+        100000);  // prevent hot loop
+    mutex_.Lock();
+  } else if (!s.ok() && !s.IsShutdownInProgress() &&
+             !s.IsColumnFamilyDropped() &&
+             reason != FlushReason::kErrorRecovery) {
     // Wait a little bit before retrying background flush in
     // case this is an environmental problem and we do not want to
     // chew up resources for failed flushes for the duration of
@@ -3079,29 +3164,33 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
     TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FlushFinish:0");
     ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
-
-    // If flush failed, we want to delete all temporary files that we might have
-    // created. Thus, we force full scan in FindObsoleteFiles()
-    FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
-                                        !s.IsColumnFamilyDropped());
-    // delete unnecessary files if any, this is done outside the mutex
-    if (job_context.HaveSomethingToClean() ||
-        job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
-      mutex_.Unlock();
-      TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FilesFound");
-      // Have to flush the info logs before bg_flush_scheduled_--
-      // because if bg_flush_scheduled_ becomes 0 and the lock is
-      // released, the deconstructor of DB can kick in and destroy all the
-      // states of DB so info_log might not be available after that point.
-      // It also applies to access other states that DB owns.
-      log_buffer.FlushBufferToLog();
-      if (job_context.HaveSomethingToDelete()) {
-        PurgeObsoleteFiles(job_context);
+    // There is no need to do this cleanup if the flush job is rescheduled
+    // to retain user-defined timestamps, because the job doesn't get to the
+    // stage of actually flushing the MemTables.
+    if (!flush_rescheduled_to_retain_udt) {
+      // If flush failed, we want to delete all temporary files that we might
+      // have created. Thus, we force full scan in FindObsoleteFiles()
+      FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
+                                          !s.IsColumnFamilyDropped());
+      // delete unnecessary files if any, this is done outside the mutex
+      if (job_context.HaveSomethingToClean() ||
+          job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
+        mutex_.Unlock();
+        TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FilesFound");
+        // Have to flush the info logs before bg_flush_scheduled_--
+        // because if bg_flush_scheduled_ becomes 0 and the lock is
+        // released, the destructor of DB can kick in and destroy all the
+        // states of DB so info_log might not be available after that point.
+        // It also applies to access other states that DB owns.
+        log_buffer.FlushBufferToLog();
+        if (job_context.HaveSomethingToDelete()) {
+          PurgeObsoleteFiles(job_context);
+        }
+        job_context.Clean();
+        mutex_.Lock();
       }
-      job_context.Clean();
-      mutex_.Lock();
+      TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:ContextCleanedUp");
     }
-    TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:ContextCleanedUp");

     assert(num_running_flushes_ > 0);
     num_running_flushes_--;
diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc
index 232d32972..915ebe0b0 100644
--- a/db/db_wal_test.cc
+++ b/db/db_wal_test.cc
@@ -324,10 +324,14 @@ class DBWALTestWithTimestamp
   }

   Status CreateAndReopenWithCFWithTs(const std::vector<std::string>& cfs,
-                                     const Options& options,
+                                     Options& ts_options,
                                      bool avoid_flush_during_recovery = false) {
-    CreateColumnFamilies(cfs, options);
-    return ReopenColumnFamiliesWithTs(cfs, options,
+    Options default_options = CurrentOptions();
+    default_options.allow_concurrent_memtable_write =
+        persist_udt_ ? true : false;
+    DestroyAndReopen(default_options);
+    CreateColumnFamilies(cfs, ts_options);
+    return ReopenColumnFamiliesWithTs(cfs, ts_options,
                                       avoid_flush_during_recovery);
   }

@@ -336,6 +340,8 @@ class DBWALTestWithTimestamp
                                     bool avoid_flush_during_recovery = false) {
     Options default_options = CurrentOptions();
     default_options.create_if_missing = false;
+    default_options.allow_concurrent_memtable_write =
+        persist_udt_ ? true : false;
     default_options.avoid_flush_during_recovery = avoid_flush_during_recovery;
     ts_options.create_if_missing = false;

@@ -370,12 +376,11 @@ class DBWALTestWithTimestamp
 TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) {
   // Set up the option that enables user defined timestmp size.
-  std::string ts1 = Timestamp(1, 0);
-  const size_t kTimestampSize = ts1.size();
-  TestComparator test_cmp(kTimestampSize);
+  std::string ts1;
+  PutFixed64(&ts1, 1);
   Options ts_options;
   ts_options.create_if_missing = true;
-  ts_options.comparator = &test_cmp;
+  ts_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
   // Test that user-defined timestamps are recovered from WAL regardless of
   // the value of this flag because UDTs are saved in WAL nonetheless.
   // We however need to explicitly disable flush during recovery by setting
@@ -405,14 +410,16 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) {

   // Write more value versions for key "foo" and "bar" before and after second
   // reopen.
-  std::string ts2 = Timestamp(2, 0);
+  std::string ts2;
+  PutFixed64(&ts2, 2);
   ASSERT_OK(Put(1, "bar", ts2, "v2"));
   ASSERT_OK(Put(1, "foo", ts2, "v3"));

   ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options,
                                        avoid_flush_during_recovery));
   ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
-  std::string ts3 = Timestamp(3, 0);
+  std::string ts3;
+  PutFixed64(&ts3, 3);
   ASSERT_OK(Put(1, "foo", ts3, "v4"));

   // Do a timestamped read with ts1 after third reopen.
@@ -435,11 +442,26 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) {
   } while (ChangeWalOptions());
 }

+class TestTsSzComparator : public Comparator {
+ public:
+  explicit TestTsSzComparator(size_t ts_sz) : Comparator(ts_sz) {}
+
+  int Compare(const ROCKSDB_NAMESPACE::Slice& /*a*/,
+              const ROCKSDB_NAMESPACE::Slice& /*b*/) const override {
+    return 0;
+  }
+  const char* Name() const override { return "TestTsSzComparator.u64ts"; }
+  void FindShortestSeparator(
+      std::string* /*start*/,
+      const ROCKSDB_NAMESPACE::Slice& /*limit*/) const override {}
+  void FindShortSuccessor(std::string* /*key*/) const override {}
+};
+
 TEST_P(DBWALTestWithTimestamp, RecoverInconsistentTimestamp) {
   // Set up the option that enables user defined timestmp size.
-  std::string ts = Timestamp(1, 0);
-  const size_t kTimestampSize = ts.size();
-  TestComparator test_cmp(kTimestampSize);
+  std::string ts;
+  PutFixed16(&ts, 1);
+  TestTsSzComparator test_cmp(2);
   Options ts_options;
   ts_options.create_if_missing = true;
   ts_options.comparator = &test_cmp;
@@ -452,11 +474,11 @@ TEST_P(DBWALTestWithTimestamp, RecoverInconsistentTimestamp) {
   // In real use cases, switching to a different user comparator is prohibited
   // by a sanity check during DB open that does a user comparator name
   // comparison. This test mocked and bypassed that sanity check because the
-  // before and after user comparator are both named "TestComparator". This is
-  // to test the user-defined timestamp recovery logic for WAL files have
-  // the intended consistency check.
+  // before and after user comparators are both named
+  // "TestTsSzComparator.u64ts". This is to test that the user-defined
+  // timestamp recovery logic for WAL files has the intended consistency check.
   // `HandleWriteBatchTimestampSizeDifference` in udt_util.h has more details.
-  TestComparator diff_test_cmp(kTimestampSize + 1);
+  TestTsSzComparator diff_test_cmp(3);
   ts_options.comparator = &diff_test_cmp;
   ASSERT_TRUE(
       ReopenColumnFamiliesWithTs({"pikachu"}, ts_options).IsInvalidArgument());
@@ -464,13 +486,13 @@ TEST_P(DBWALTestWithTimestamp, RecoverInconsistentTimestamp) {

 TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) {
   // Set up the option that enables user defined timestamp size.
-  std::string min_ts = Timestamp(0, 0);
-  std::string write_ts = Timestamp(1, 0);
-  const size_t kTimestampSize = write_ts.size();
-  TestComparator test_cmp(kTimestampSize);
+  std::string min_ts;
+  std::string write_ts;
+  PutFixed64(&min_ts, 0);
+  PutFixed64(&write_ts, 1);
   Options ts_options;
   ts_options.create_if_missing = true;
-  ts_options.comparator = &test_cmp;
+  ts_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
   ts_options.persist_user_defined_timestamps = persist_udt_;

   std::string smallest_ukey_without_ts = "baz";
diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc
index bd82e49e0..9b8ca31d3 100644
--- a/db/db_with_timestamp_basic_test.cc
+++ b/db/db_with_timestamp_basic_test.cc
@@ -3289,15 +3289,18 @@ TEST_P(HandleFileBoundariesTest, ConfigurePersistUdt) {
   options.env = env_;
   // Write a timestamp that is not the min timestamp to help test the behavior
   // of flag `persist_user_defined_timestamps`.
-  std::string write_ts = Timestamp(1, 0);
-  std::string min_ts = Timestamp(0, 0);
+  std::string write_ts;
+  std::string min_ts;
+  PutFixed64(&write_ts, 1);
+  PutFixed64(&min_ts, 0);
   std::string smallest_ukey_without_ts = "bar";
   std::string largest_ukey_without_ts = "foo";
-  const size_t kTimestampSize = write_ts.size();
-  TestComparator test_cmp(kTimestampSize);
-  options.comparator = &test_cmp;
+  options.comparator = test::BytewiseComparatorWithU64TsWrapper();
   bool persist_udt = test::ShouldPersistUDT(GetParam());
   options.persist_user_defined_timestamps = persist_udt;
+  if (!persist_udt) {
+    options.allow_concurrent_memtable_write = false;
+  }
   DestroyAndReopen(options);

   ASSERT_OK(
diff --git a/db/flush_job.cc b/db/flush_job.cc
index 3854e967a..bfdd9a059 100644
--- a/db/flush_job.cc
+++ b/db/flush_job.cc
@@ -189,6 +189,10 @@ void FlushJob::PickMemTable() {
     return;
   }

+  // Track the effective cutoff user-defined timestamp during flush if
+  // user-defined timestamps can be stripped.
+  GetEffectiveCutoffUDTForPickedMemTables();
+
   ReportFlushInputSize(mems_);

   // entries mems are (implicitly) sorted in ascending order by their created
@@ -294,6 +298,10 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
     s = Status::ShutdownInProgress("Database shutdown");
   }

+  if (s.ok()) {
+    s = MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT();
+  }
+
   if (!s.ok()) {
     cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
   } else if (write_manifest_) {
@@ -1097,4 +1105,57 @@ std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
   return info;
 }

+void FlushJob::GetEffectiveCutoffUDTForPickedMemTables() {
+  db_mutex_->AssertHeld();
+  assert(pick_memtable_called);
+  const auto* ucmp = cfd_->internal_comparator().user_comparator();
+  assert(ucmp);
+  const size_t ts_sz = ucmp->timestamp_size();
+  if (db_options_.atomic_flush || ts_sz == 0 ||
+      cfd_->ioptions()->persist_user_defined_timestamps) {
+    return;
+  }
+  for (MemTable* m : mems_) {
+    Slice table_newest_udt = m->GetNewestUDT();
+    // The picked MemTables have ascending IDs, so they should have
+    // non-decreasing newest user-defined timestamps.
+    if (!cutoff_udt_.empty()) {
+      assert(table_newest_udt.size() == cutoff_udt_.size());
+      assert(ucmp->CompareTimestamp(table_newest_udt, cutoff_udt_) >= 0);
+      cutoff_udt_.clear();
+    }
+    cutoff_udt_.assign(table_newest_udt.data(), table_newest_udt.size());
+  }
+}
+
+Status FlushJob::MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT() {
+  db_mutex_->AssertHeld();
+  const auto* ucmp = cfd_->user_comparator();
+  assert(ucmp);
+  const std::string& full_history_ts_low = cfd_->GetFullHistoryTsLow();
+  // Update full_history_ts_low to right above the cutoff UDT only if that
+  // would increase it.
+  if (cutoff_udt_.empty() ||
+      (!full_history_ts_low.empty() &&
+       ucmp->CompareTimestamp(cutoff_udt_, full_history_ts_low) < 0)) {
+    return Status::OK();
+  }
+  Slice cutoff_udt_slice = cutoff_udt_;
+  uint64_t cutoff_udt_ts = 0;
+  bool format_res = GetFixed64(&cutoff_udt_slice, &cutoff_udt_ts);
+  assert(format_res);
+  (void)format_res;
+  std::string new_full_history_ts_low;
+  // TODO(yuzhangyu): Add a member to AdvancedColumnFamilyOptions for an
+  // operation to get the next immediately larger user-defined timestamp to
+  // expand this feature to other user-defined timestamp formats.
+  PutFixed64(&new_full_history_ts_low, cutoff_udt_ts + 1);
+  VersionEdit edit;
+  edit.SetColumnFamily(cfd_->GetID());
+  edit.SetFullHistoryTsLow(new_full_history_ts_low);
+  return versions_->LogAndApply(cfd_, *cfd_->GetLatestMutableCFOptions(),
+                                ReadOptions(), &edit, db_mutex_,
+                                output_file_directory_);
+}
+
 }  // namespace ROCKSDB_NAMESPACE
diff --git a/db/flush_job.h b/db/flush_job.h
index d3902f0bd..43d10ffe9 100644
--- a/db/flush_job.h
+++ b/db/flush_job.h
@@ -127,6 +127,20 @@ class FlushJob {
   Env::IOPriority GetRateLimiterPriorityForWrite();
   std::unique_ptr<FlushJobInfo> GetFlushJobInfo() const;

+  // REQUIRES: db mutex held.
+  // Called only when the user-defined timestamps (UDT) feature is enabled and
+  // the `persist_user_defined_timestamps` flag is false, because we refrain
+  // from flushing as long as a memtable still contains UDTs that haven't
+  // expired w.r.t `full_history_ts_low`. However, the flush proceeds anyway
+  // if there is a risk of entering write stall mode. In that case, we need
+  // to track the effective cutoff timestamp below which all the UDTs are
+  // removed because of the flush, and use it to increase
+  // `full_history_ts_low` if the effective cutoff timestamp is newer. See
+  // `MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT` for details.
+  void GetEffectiveCutoffUDTForPickedMemTables();
+
+  Status MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT();
+
   const std::string& dbname_;
   const std::string db_id_;
   const std::string db_session_id_;
@@ -195,6 +209,10 @@ class FlushJob {
   // db mutex
   const SeqnoToTimeMapping& db_impl_seqno_time_mapping_;
   SeqnoToTimeMapping seqno_to_time_mapping_;
+
+  // Keeps track of the newest user-defined timestamp seen by this flush job,
+  // used as the cutoff, if the `persist_user_defined_timestamps` flag is
+  // false.
+  std::string cutoff_udt_;
 };

 }  // namespace ROCKSDB_NAMESPACE
diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc
index f2915ed39..2e6c4d426 100644
--- a/db/flush_job_test.cc
+++ b/db/flush_job_test.cc
@@ -654,6 +654,10 @@ class FlushJobTimestampTest
                 installed_file_meta->smallest.Encode());
     ASSERT_EQ(expected_largest.Encode(), installed_file_meta->largest.Encode());
   }
+  void CheckFullHistoryTsLow(ColumnFamilyData* cfd,
+                             const std::string& expected_full_history_ts_low) {
+    ASSERT_EQ(expected_full_history_ts_low, cfd->GetFullHistoryTsLow());
+  }
 };

 TEST_P(FlushJobTimestampTest, AllKeysExpired) {
@@ -684,6 +688,7 @@ TEST_P(FlushJobTimestampTest, AllKeysExpired) {
   EventLogger event_logger(db_options_.info_log.get());
   std::string full_history_ts_low;
   PutFixed64(&full_history_ts_low, std::numeric_limits<uint64_t>::max());
+  cfd->SetFullHistoryTsLow(full_history_ts_low);
   FlushJob flush_job(
       dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
       std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_,
@@ -714,6 +719,7 @@ TEST_P(FlushJobTimestampTest, AllKeysExpired) {
   }
   InternalKey ikey(key, curr_seq_ - 1, ValueType::kTypeDeletionWithTimestamp);
   CheckFileMetaData(cfd, ikey, ikey, &fmeta);
+  CheckFullHistoryTsLow(cfd, full_history_ts_low);
 }

 job_context.Clean();
@@ -744,6 +750,7 @@ TEST_P(FlushJobTimestampTest, NoKeyExpired) {
   EventLogger event_logger(db_options_.info_log.get());
   std::string full_history_ts_low;
   PutFixed64(&full_history_ts_low, 0);
+  cfd->SetFullHistoryTsLow(full_history_ts_low);
   FlushJob flush_job(
       dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
       std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_,
@@ -765,6 +772,7 @@ TEST_P(FlushJobTimestampTest, NoKeyExpired) {
   std::string ukey = test::EncodeInt(0);
   std::string smallest_key;
   std::string largest_key;
+  std::string expected_full_history_ts_low;
   if (!persist_udt_) {
     // When `AdvancedColumnFamilyOptions.persist_user_defined_timestamps` flag
     // is set to false, the user-defined timestamp is stripped from user key
     // during flush and replaced by the minimum
     // timestamp, which is hardcoded to be all zeros for now.
     smallest_key = ukey + test::EncodeInt(0);
     largest_key = ukey + test::EncodeInt(0);
+    // When not all keys have expired and `persist_user_defined_timestamps` is
+    // false, UDTs are removed during flush, so `full_history_ts_low` should
+    // be automatically increased to above the effective cutoff UDT of the
+    // flush.
+ PutFixed64(&expected_full_history_ts_low, curr_ts_.fetch_add(1)); } else { smallest_key = ukey + test::EncodeInt(curr_ts_.load(std::memory_order_relaxed) - 1); largest_key = ukey + test::EncodeInt(kStartTs); + expected_full_history_ts_low = full_history_ts_low; } InternalKey smallest(smallest_key, curr_seq_ - 1, ValueType::kTypeValue); InternalKey largest(largest_key, kStartSeq, ValueType::kTypeValue); CheckFileMetaData(cfd, smallest, largest, &fmeta); + CheckFullHistoryTsLow(cfd, expected_full_history_ts_low); } job_context.Clean(); ASSERT_TRUE(to_delete.empty()); diff --git a/db/memtable.cc b/db/memtable.cc index dfef13a15..216bb8d6e 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -143,6 +143,10 @@ MemTable::MemTable(const InternalKeyComparator& cmp, new_cache.get()), std::memory_order_relaxed); } + const Comparator* ucmp = cmp.user_comparator(); + assert(ucmp); + ts_sz_ = ucmp->timestamp_size(); + persist_user_defined_timestamps_ = ioptions.persist_user_defined_timestamps; } MemTable::~MemTable() { @@ -357,7 +361,8 @@ class MemTableIterator : public InternalIterator { !mem.GetImmutableMemTableOptions()->inplace_update_support), protection_bytes_per_key_(mem.moptions_.protection_bytes_per_key), status_(Status::OK()), - logger_(mem.moptions_.info_log) { + logger_(mem.moptions_.info_log), + ts_sz_(mem.ts_sz_) { if (use_range_del_table) { iter_ = mem.range_del_table_->GetIterator(arena); } else if (prefix_extractor_ != nullptr && !read_options.total_order_seek && @@ -400,8 +405,7 @@ class MemTableIterator : public InternalIterator { PERF_COUNTER_ADD(seek_on_memtable_count, 1); if (bloom_) { // iterator should only use prefix bloom filter - auto ts_sz = comparator_.comparator.user_comparator()->timestamp_size(); - Slice user_k_without_ts(ExtractUserKeyAndStripTimestamp(k, ts_sz)); + Slice user_k_without_ts(ExtractUserKeyAndStripTimestamp(k, ts_sz_)); if (prefix_extractor_->InDomain(user_k_without_ts)) { if (!bloom_->MayContain( prefix_extractor_->Transform(user_k_without_ts))) { @@ -421,8 +425,7 @@ class MemTableIterator : public InternalIterator { PERF_TIMER_GUARD(seek_on_memtable_time); PERF_COUNTER_ADD(seek_on_memtable_count, 1); if (bloom_) { - auto ts_sz = comparator_.comparator.user_comparator()->timestamp_size(); - Slice user_k_without_ts(ExtractUserKeyAndStripTimestamp(k, ts_sz)); + Slice user_k_without_ts(ExtractUserKeyAndStripTimestamp(k, ts_sz_)); if (prefix_extractor_->InDomain(user_k_without_ts)) { if (!bloom_->MayContain( prefix_extractor_->Transform(user_k_without_ts))) { @@ -512,6 +515,7 @@ class MemTableIterator : public InternalIterator { uint32_t protection_bytes_per_key_; Status status_; Logger* logger_; + size_t ts_sz_; void VerifyEntryChecksum() { if (protection_bytes_per_key_ > 0 && Valid()) { @@ -625,8 +629,7 @@ Status MemTable::VerifyEncodedEntry(Slice encoded, if (!GetVarint32(&encoded, &ikey_len)) { return Status::Corruption("Unable to parse internal key length"); } - size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size(); - if (ikey_len < 8 + ts_sz) { + if (ikey_len < 8 + ts_sz_) { return Status::Corruption("Internal key length too short"); } if (ikey_len > encoded.size()) { @@ -725,8 +728,7 @@ Status MemTable::Add(SequenceNumber s, ValueType type, } } - size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size(); - Slice key_without_ts = StripTimestampFromUserKey(key, ts_sz); + Slice key_without_ts = StripTimestampFromUserKey(key, ts_sz_); if (!allow_concurrent) { // Extract prefix for insert with hint. 
@@ -776,6 +778,9 @@ Status MemTable::Add(SequenceNumber s, ValueType type,
       assert(first_seqno_.load() >= earliest_seqno_.load());
     }
     assert(post_process_info == nullptr);
+    // TODO(yuzhangyu): support updating the newest UDT for when
+    // `allow_concurrent` is true.
+    MaybeUpdateNewestUDT(key_slice);
     UpdateFlushState();
   } else {
     bool res = (hint == nullptr)
@@ -1286,8 +1291,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value,
   bool found_final_value = false;
   bool merge_in_progress = s->IsMergeInProgress();
   bool may_contain = true;
-  size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size();
-  Slice user_key_without_ts = StripTimestampFromUserKey(key.user_key(), ts_sz);
+  Slice user_key_without_ts = StripTimestampFromUserKey(key.user_key(), ts_sz_);
   bool bloom_checked = false;
   if (bloom_filter_) {
     // when both memtable_whole_key_filtering and prefix_extractor_ are set,
@@ -1672,4 +1676,22 @@ uint64_t MemTable::GetMinLogContainingPrepSection() {
   return min_prep_log_referenced_.load();
 }

+void MemTable::MaybeUpdateNewestUDT(const Slice& user_key) {
+  if (ts_sz_ == 0 || persist_user_defined_timestamps_) {
+    return;
+  }
+  const Comparator* ucmp = GetInternalKeyComparator().user_comparator();
+  Slice udt = ExtractTimestampFromUserKey(user_key, ts_sz_);
+  if (newest_udt_.empty() || ucmp->CompareTimestamp(udt, newest_udt_) > 0) {
+    newest_udt_ = udt;
+  }
+}
+
+const Slice& MemTable::GetNewestUDT() const {
+  // This path should not be invoked for MemTables that do not enable the
+  // UDT-in-Memtable-only feature.
+  assert(ts_sz_ > 0 && !persist_user_defined_timestamps_);
+  return newest_udt_;
+}
+
 }  // namespace ROCKSDB_NAMESPACE
diff --git a/db/memtable.h b/db/memtable.h
index a461d908b..bfe882b0e 100644
--- a/db/memtable.h
+++ b/db/memtable.h
@@ -353,6 +353,10 @@ class MemTable {
     return data_size_.load(std::memory_order_relaxed);
   }

+  size_t write_buffer_size() const {
+    return write_buffer_size_.load(std::memory_order_relaxed);
+  }
+
   // Dynamically change the memtable's capacity. If set below the current usage,
   // the next key added will trigger a flush. Can only increase size when
   // memtable prefix bloom is disabled, since we can't easily allocate more
@@ -527,6 +531,14 @@ class MemTable {
     }
   }

+  // Get the newest user-defined timestamp contained in this MemTable. See the
+  // comment on `newest_udt_` for what "newer" means. This method should only
+  // be invoked for a MemTable that has the user-defined timestamps feature
+  // enabled and `persist_user_defined_timestamps` set to false. The tracked
+  // newest UDT will be used by the background flush job to check the
+  // MemTable's eligibility for flush.
+  const Slice& GetNewestUDT() const;
+
   // Returns Corruption status if verification fails.
   static Status VerifyEntryChecksum(const char* entry,
                                     uint32_t protection_bytes_per_key,
@@ -617,6 +629,19 @@ class MemTable {
   // Flush job info of the current memtable.
   std::unique_ptr<FlushJobInfo> flush_job_info_;

+  // Size in bytes of the user-defined timestamps.
+  size_t ts_sz_;
+
+  // Whether to persist user-defined timestamps.
+  bool persist_user_defined_timestamps_;
+
+  // Newest user-defined timestamp contained in this MemTable. For ts1 and
+  // ts2, if Comparator::CompareTimestamp(ts1, ts2) > 0, ts1 is considered
+  // newer than ts2. We track this field for a MemTable if its column family
+  // has the UDT feature enabled and the `persist_user_defined_timestamps`
+  // flag is false. Otherwise, this field just contains an empty Slice.
+  Slice newest_udt_;
+
   // Updates flush_state_ using ShouldFlushNow()
   void UpdateFlushState();

@@ -653,6 +678,8 @@ class MemTable {
   void UpdateEntryChecksum(const ProtectionInfoKVOS64* kv_prot_info,
                            const Slice& key, const Slice& value, ValueType type,
                            SequenceNumber s, char* checksum_ptr);
+
+  void MaybeUpdateNewestUDT(const Slice& user_key);
 };

 extern const char* EncodeKey(std::string* scratch, const Slice& target);
diff --git a/db/memtable_list.h b/db/memtable_list.h
index 1ad28a59e..e95493b6f 100644
--- a/db/memtable_list.h
+++ b/db/memtable_list.h
@@ -382,6 +382,25 @@ class MemTableList {
     return memlist.front()->GetID();
   }

+  // REQUIRES: DB mutex held.
+  // Gets the newest user-defined timestamps of the MemTables in ascending ID
+  // order, up to the `max_memtable_id`. Used by the background flush job
+  // to check the MemTables' eligibility for flush w.r.t. retaining UDTs.
+  std::vector<Slice> GetTablesNewestUDT(uint64_t max_memtable_id) {
+    std::vector<Slice> newest_udts;
+    auto& memlist = current_->memlist_;
+    // Iterating through the memlist starting at the end, `newest_udts` is
+    // filled with the newest UDTs of MemTables already sorted in increasing
+    // MemTable ID order.
+    for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
+      MemTable* m = *it;
+      if (m->GetID() > max_memtable_id) {
+        break;
+      }
+      newest_udts.push_back(m->GetNewestUDT());
+    }
+    return newest_udts;
+  }
+
   void AssignAtomicFlushSeq(const SequenceNumber& seq) {
     const auto& memlist = current_->memlist_;
     // Scan the memtable list from new to old
diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc
index c63952b12..dfa1dbfc7 100644
--- a/db/memtable_list_test.cc
+++ b/db/memtable_list_test.cc
@@ -43,6 +43,9 @@ class MemTableListTest : public testing::Test {
     // Open DB only with default column family
     ColumnFamilyOptions cf_options;
     std::vector<ColumnFamilyDescriptor> cf_descs;
+    if (udt_enabled_) {
+      cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
+    }
     cf_descs.emplace_back(kDefaultColumnFamilyName, cf_options);
     Status s = DB::Open(options, dbname, cf_descs, &handles, &db);
     EXPECT_OK(s);
@@ -200,6 +203,9 @@ class MemTableListTest : public testing::Test {
                           nullptr /* prep_tracker */, &mutex, file_meta_ptrs,
                           committed_flush_jobs_info, to_delete, nullptr,
                           &log_buffer);
   }
+
+ protected:
+  bool udt_enabled_ = false;
 };

 TEST_F(MemTableListTest, Empty) {
@@ -868,7 +874,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   to_delete.clear();
 }

-TEST_F(MemTableListTest, EmptyAtomicFlusTest) {
+TEST_F(MemTableListTest, EmptyAtomicFlushTest) {
   autovector<MemTableList*> lists;
   autovector<uint32_t> cf_ids;
   autovector<const MutableCFOptions*> options_list;
@@ -880,7 +886,7 @@ TEST_F(MemTableListTest, EmptyAtomicFlusTest) {
   ASSERT_TRUE(to_delete.empty());
 }

-TEST_F(MemTableListTest, AtomicFlusTest) {
+TEST_F(MemTableListTest, AtomicFlushTest) {
   const int num_cfs = 3;
   const int num_tables_per_cf = 2;
   SequenceNumber seq = 1;
@@ -1028,6 +1034,86 @@ TEST_F(MemTableListTest, AtomicFlusTest) {
   }
 }

+class MemTableListWithTimestampTest : public MemTableListTest {
+ public:
+  MemTableListWithTimestampTest() : MemTableListTest() {}
+
+  void SetUp() override { udt_enabled_ = true; }
+};
+
+TEST_F(MemTableListWithTimestampTest, GetTableNewestUDT) {
+  const int num_tables = 3;
+  const int num_entries = 5;
+  SequenceNumber seq = 1;
+
+  auto factory = std::make_shared<SkipListFactory>();
+  options.memtable_factory = factory;
+  options.persist_user_defined_timestamps = false;
+  ImmutableOptions ioptions(options);
+  const Comparator* ucmp = test::BytewiseComparatorWithU64TsWrapper();
+  InternalKeyComparator cmp(ucmp);
+  WriteBufferManager wb(options.db_write_buffer_size);
+
+  // Create MemTableList
+  int min_write_buffer_number_to_merge = 1;
+  int max_write_buffer_number_to_maintain = 4;
+  int64_t max_write_buffer_size_to_maintain =
+      4 * static_cast<int64_t>(options.write_buffer_size);
+  MemTableList list(min_write_buffer_number_to_merge,
+                    max_write_buffer_number_to_maintain,
+                    max_write_buffer_size_to_maintain);
+
+  // Create some MemTables
+  uint64_t memtable_id = 0;
+  std::vector<MemTable*> tables;
+  MutableCFOptions mutable_cf_options(options);
+  uint64_t current_ts = 0;
+  autovector<MemTable*> to_delete;
+  std::vector<std::string> newest_udts;
+
+  std::string key;
+  std::string write_ts;
+  for (int i = 0; i < num_tables; i++) {
+    MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb,
+                                 kMaxSequenceNumber, 0 /* column_family_id */);
+    mem->SetID(memtable_id++);
+    mem->Ref();
+
+    std::string value;
+    MergeContext merge_context;
+
+    for (int j = 0; j < num_entries; j++) {
+      key = "key1";
+      write_ts.clear();
+      PutFixed64(&write_ts, current_ts);
+      key.append(write_ts);
+      ASSERT_OK(mem->Add(++seq, kTypeValue, key, std::to_string(i),
+                         nullptr /* kv_prot_info */));
+      current_ts++;
+    }
+
+    tables.push_back(mem);
+    list.Add(tables.back(), &to_delete);
+    newest_udts.push_back(write_ts);
+  }
+
+  ASSERT_EQ(num_tables, list.NumNotFlushed());
+  ASSERT_TRUE(list.IsFlushPending());
+  std::vector<Slice> tables_newest_udts = list.GetTablesNewestUDT(num_tables);
+  ASSERT_EQ(newest_udts.size(), tables_newest_udts.size());
+  for (size_t i = 0; i < tables_newest_udts.size(); i++) {
+    const Slice& table_newest_udt = tables_newest_udts[i];
+    const Slice expected_newest_udt = newest_udts[i];
+    ASSERT_EQ(expected_newest_udt, table_newest_udt);
+  }
+
+  list.current()->Unref(&to_delete);
+  for (MemTable* m : to_delete) {
+    delete m;
+  }
+  to_delete.clear();
+}
+
 }  // namespace ROCKSDB_NAMESPACE

 int main(int argc, char** argv) {
diff --git a/db/repair_test.cc b/db/repair_test.cc
index 8cca48424..e8cc40aab 100644
--- a/db/repair_test.cc
+++ b/db/repair_test.cc
@@ -365,12 +365,15 @@ TEST_P(RepairTestWithTimestamp, UnflushedSst) {
   Options options = CurrentOptions();
   options.env = env_;
   options.create_if_missing = true;
-  std::string min_ts = Timestamp(0, 0);
-  std::string write_ts = Timestamp(1, 0);
-  const size_t kTimestampSize = write_ts.size();
-  TestComparator test_cmp(kTimestampSize);
-  options.comparator = &test_cmp;
+  std::string min_ts;
+  std::string write_ts;
+  PutFixed64(&min_ts, 0);
+  PutFixed64(&write_ts, 1);
+  options.comparator = test::BytewiseComparatorWithU64TsWrapper();
   options.persist_user_defined_timestamps = persist_udt;
+  if (!persist_udt) {
+    options.allow_concurrent_memtable_write = false;
+  }
   options.paranoid_file_checks = paranoid_file_checks;

   ColumnFamilyOptions cf_options(options);
diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h
index 31bea00f3..5be134a9d 100644
--- a/include/rocksdb/advanced_options.h
+++ b/include/rocksdb/advanced_options.h
@@ -1155,10 +1155,20 @@ struct AdvancedColumnFamilyOptions {
   // while set this flag to be `false`: user keys in the newly generated SST
   // files are of the same format as the existing SST files.
   //
+  // Currently, only user comparators that format user-defined timestamps as
+  // uint64_t by using the RocksDB-provided comparator template
+  // `ComparatorWithU64TsImpl` are supported.
+  //
   // When setting this flag to `false`, users should also call
   // `DB::IncreaseFullHistoryTsLow` to set a cutoff timestamp for flush. RocksDB
  // refrains from flushing a memtable with data still above
-  // the cutoff timestamp with best effort. Users can do user-defined
+  // the cutoff timestamp, on a best-effort basis. If this cutoff timestamp is
+  // not set, flushing continues normally.
+  // NOTE: in order for the cutoff timestamp to work properly, users of this
+  // feature need to ensure that writes to a column family use globally
+  // non-decreasing user-defined timestamps.
+  //
+  // Users can do user-defined
  // multi-versioned read above the cutoff timestamp. When users try to read
  // below the cutoff timestamp, an error will be returned.
  //
@@ -1169,6 +1179,10 @@ struct AdvancedColumnFamilyOptions {
  // downgrade or toggling on / off the user-defined timestamp feature on a
  // column family.
  //
+  // Note that setting this flag to false is not supported in combination
+  // with atomic flush, or with concurrent memtable writes enabled by
+  // `allow_concurrent_memtable_write`.
+  //
  // Default: true (user-defined timestamps are persisted)
  // Not dynamically changeable, change it requires db restart and
  // only compatible changes are allowed.
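
A minimal usage sketch to accompany this patch (not part of the diff): opening a DB with the UDT-in-memtable-only configuration that the new `ValidateOptions` checks accept, and setting the flush cutoff. It relies only on public APIs that exist independently of this change (`BytewiseComparatorWithU64Ts()`, the timestamp-aware `Put` overload, `DB::IncreaseFullHistoryTsLow`); the `EncodeU64Ts` helper and the DB path are illustrative stand-ins, since the `PutFixed64` used throughout the patch is an internal utility.

    #include <cassert>
    #include <cstdint>
    #include <string>

    #include "rocksdb/comparator.h"
    #include "rocksdb/db.h"

    using namespace ROCKSDB_NAMESPACE;

    // Stand-in for the little-endian fixed64 encoding that PutFixed64
    // produces internally.
    std::string EncodeU64Ts(uint64_t ts) {
      std::string out(sizeof(ts), '\0');
      for (size_t i = 0; i < sizeof(ts); ++i) {
        out[i] = static_cast<char>((ts >> (8 * i)) & 0xff);
      }
      return out;
    }

    int main() {
      Options options;
      options.create_if_missing = true;
      // Required by the new sanity checks in ColumnFamilyData::ValidateOptions().
      options.atomic_flush = false;
      options.allow_concurrent_memtable_write = false;
      // Only comparators whose name ends in ".u64ts" are accepted.
      options.comparator = BytewiseComparatorWithU64Ts();
      options.persist_user_defined_timestamps = false;

      DB* db = nullptr;
      Status s = DB::Open(options, "/tmp/udt_demo_db", &db);
      if (!s.ok()) {
        return 1;
      }

      // Writes carry a uint64 timestamp alongside the key.
      s = db->Put(WriteOptions(), db->DefaultColumnFamily(), "foo",
                  EncodeU64Ts(1), "v1");
      assert(s.ok());

      // Without a cutoff, flushes proceed normally; once `full_history_ts_low`
      // is set, flushes of memtables holding UDTs at or above it are postponed
      // on a best-effort basis, as implemented by
      // ShouldPostponeFlushToRetainUDT() above.
      s = db->IncreaseFullHistoryTsLow(db->DefaultColumnFamily(),
                                       EncodeU64Ts(1));
      assert(s.ok());

      delete db;
      return 0;
    }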
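Reads relate to the same cutoff. Below is a sketch under the same assumptions as the snippet above (an open `db` and the `EncodeU64Ts` helper): `ReadOptions::timestamp` is the existing public field that selects the newest version visible at a given timestamp, and reads below `full_history_ts_low` are rejected. The exact error status returned in that case is worth verifying against the release in use.

    // Timestamped point lookup; assumes `db` and EncodeU64Ts from the sketch
    // above.
    Status ReadAtTimestamp(DB* db, uint64_t ts, std::string* value) {
      std::string read_ts = EncodeU64Ts(ts);
      Slice read_ts_slice = read_ts;
      ReadOptions read_options;
      // Read the newest version visible at `ts`.
      read_options.timestamp = &read_ts_slice;
      return db->Get(read_options, "foo", value);
    }

    // ReadAtTimestamp(db, 2, &v) sees "v1" (written at ts 1).
    // ReadAtTimestamp(db, 0, &v) reads below the cutoff (1) and is expected
    // to return a non-OK status.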