diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index c84a745a6..16d8c8f35 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -952,6 +952,8 @@ Status DBImpl::IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd, VersionEdit edit; edit.SetColumnFamily(cfd->GetID()); edit.SetFullHistoryTsLow(ts_low); + TEST_SYNC_POINT_CALLBACK("DBImpl::IncreaseFullHistoryTsLowImpl:BeforeEdit", + &edit); InstrumentedMutexLock l(&mutex_); std::string current_ts_low = cfd->GetFullHistoryTsLow(); @@ -959,12 +961,25 @@ Status DBImpl::IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd, assert(ucmp->timestamp_size() == ts_low.size() && !ts_low.empty()); if (!current_ts_low.empty() && ucmp->CompareTimestamp(ts_low, current_ts_low) < 0) { - return Status::InvalidArgument( - "Cannot decrease full_history_timestamp_low"); + return Status::InvalidArgument("Cannot decrease full_history_ts_low"); } - return versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit, - &mutex_); + Status s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), + &edit, &mutex_); + if (!s.ok()) { + return s; + } + current_ts_low = cfd->GetFullHistoryTsLow(); + if (!current_ts_low.empty() && + ucmp->CompareTimestamp(current_ts_low, ts_low) > 0) { + std::stringstream oss; + oss << "full_history_ts_low: " << Slice(current_ts_low).ToString(true) + << " is set to be higher than the requested " + "timestamp: " + << Slice(ts_low).ToString(true) << std::endl; + return Status::TryAgain(oss.str()); + } + return Status::OK(); } Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options, diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc index 57ce76f79..84fae1f10 100644 --- a/db/db_with_timestamp_basic_test.cc +++ b/db/db_with_timestamp_basic_test.cc @@ -3022,6 +3022,48 @@ INSTANTIATE_TEST_CASE_P( BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey))); #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) +class UpdateFullHistoryTsLowTest : public DBBasicTestWithTimestampBase { + public: + UpdateFullHistoryTsLowTest() + : DBBasicTestWithTimestampBase("/update_full_history_ts_low_test") {} +}; + +TEST_F(UpdateFullHistoryTsLowTest, ConcurrentUpdate) { + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + std::string lower_ts_low = Timestamp(10, 0); + std::string higher_ts_low = Timestamp(25, 0); + const size_t kTimestampSize = lower_ts_low.size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + + DestroyAndReopen(options); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + // This workaround swaps `lower_ts_low` originally used for update by the + // caller to `higher_ts_low` after its writer is queued to make sure + // the caller will always get a TryAgain error. + // It mimics cases where two threads update full_history_ts_low concurrently + // with one thread writing a higher ts_low and one thread writing a lower + // ts_low. + VersionEdit* version_edit; + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::IncreaseFullHistoryTsLowImpl:BeforeEdit", + [&](void* arg) { version_edit = reinterpret_cast(arg); }); + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::LogAndApply:BeforeWriterWaiting", + [&](void* /*arg*/) { version_edit->SetFullHistoryTsLow(higher_ts_low); }); + SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_TRUE( + db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(), lower_ts_low) + .IsTryAgain()); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + Close(); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 2ea4ffbd5..378cd873a 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -1421,6 +1421,8 @@ class DB { // Increase the full_history_ts of column family. The new ts_low value should // be newer than current full_history_ts value. + // If another thread updates full_history_ts_low concurrently to a higher + // timestamp than the requested ts_low, a try again error will be returned. virtual Status IncreaseFullHistoryTsLow(ColumnFamilyHandle* column_family, std::string ts_low) = 0;