Return try again when full_history_ts_low is higher than requested ts (#10126)

Summary:
This PR helps handle the race condition mentioned in this comment thread: https://github.com/facebook/rocksdb/pull/7884#discussion_r572402281 In case where actual full_history_ts_low is higher than the user's requested ts, return a try again message so they don't have the misconception that data between [ts, full_history_ts_low) is kept.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10126

Test Plan:
```
$COMPILE_WITH_ASAN=1 make -j24 all
$./db_with_timestamp_basic_test --gtest_filter=UpdateFullHistoryTsLowTest.ConcurrentUpdate
$ make -j24 check
```

Reviewed By: riversand963

Differential Revision: D37055368

Pulled By: jowlyzhang

fbshipit-source-id: 787fd0984a246540fa03ac227b1d232590d27828
main
Yu Zhang 3 years ago committed by Facebook GitHub Bot
parent 5fa6ef7f18
commit 693dffd8e8
  1. 23
      db/db_impl/db_impl_compaction_flush.cc
  2. 42
      db/db_with_timestamp_basic_test.cc
  3. 2
      include/rocksdb/db.h

@ -952,6 +952,8 @@ Status DBImpl::IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd,
VersionEdit edit; VersionEdit edit;
edit.SetColumnFamily(cfd->GetID()); edit.SetColumnFamily(cfd->GetID());
edit.SetFullHistoryTsLow(ts_low); edit.SetFullHistoryTsLow(ts_low);
TEST_SYNC_POINT_CALLBACK("DBImpl::IncreaseFullHistoryTsLowImpl:BeforeEdit",
&edit);
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
std::string current_ts_low = cfd->GetFullHistoryTsLow(); std::string current_ts_low = cfd->GetFullHistoryTsLow();
@ -959,12 +961,25 @@ Status DBImpl::IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd,
assert(ucmp->timestamp_size() == ts_low.size() && !ts_low.empty()); assert(ucmp->timestamp_size() == ts_low.size() && !ts_low.empty());
if (!current_ts_low.empty() && if (!current_ts_low.empty() &&
ucmp->CompareTimestamp(ts_low, current_ts_low) < 0) { ucmp->CompareTimestamp(ts_low, current_ts_low) < 0) {
return Status::InvalidArgument( return Status::InvalidArgument("Cannot decrease full_history_ts_low");
"Cannot decrease full_history_timestamp_low");
} }
return versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit, Status s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&mutex_); &edit, &mutex_);
if (!s.ok()) {
return s;
}
current_ts_low = cfd->GetFullHistoryTsLow();
if (!current_ts_low.empty() &&
ucmp->CompareTimestamp(current_ts_low, ts_low) > 0) {
std::stringstream oss;
oss << "full_history_ts_low: " << Slice(current_ts_low).ToString(true)
<< " is set to be higher than the requested "
"timestamp: "
<< Slice(ts_low).ToString(true) << std::endl;
return Status::TryAgain(oss.str());
}
return Status::OK();
} }
Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options, Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,

@ -3022,6 +3022,48 @@ INSTANTIATE_TEST_CASE_P(
BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey))); BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey)));
#endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
class UpdateFullHistoryTsLowTest : public DBBasicTestWithTimestampBase {
public:
UpdateFullHistoryTsLowTest()
: DBBasicTestWithTimestampBase("/update_full_history_ts_low_test") {}
};
TEST_F(UpdateFullHistoryTsLowTest, ConcurrentUpdate) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
std::string lower_ts_low = Timestamp(10, 0);
std::string higher_ts_low = Timestamp(25, 0);
const size_t kTimestampSize = lower_ts_low.size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
// This workaround swaps `lower_ts_low` originally used for update by the
// caller to `higher_ts_low` after its writer is queued to make sure
// the caller will always get a TryAgain error.
// It mimics cases where two threads update full_history_ts_low concurrently
// with one thread writing a higher ts_low and one thread writing a lower
// ts_low.
VersionEdit* version_edit;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::IncreaseFullHistoryTsLowImpl:BeforeEdit",
[&](void* arg) { version_edit = reinterpret_cast<VersionEdit*>(arg); });
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::LogAndApply:BeforeWriterWaiting",
[&](void* /*arg*/) { version_edit->SetFullHistoryTsLow(higher_ts_low); });
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_TRUE(
db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(), lower_ts_low)
.IsTryAgain());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
Close();
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -1421,6 +1421,8 @@ class DB {
// Increase the full_history_ts of column family. The new ts_low value should // Increase the full_history_ts of column family. The new ts_low value should
// be newer than current full_history_ts value. // be newer than current full_history_ts value.
// If another thread updates full_history_ts_low concurrently to a higher
// timestamp than the requested ts_low, a try again error will be returned.
virtual Status IncreaseFullHistoryTsLow(ColumnFamilyHandle* column_family, virtual Status IncreaseFullHistoryTsLow(ColumnFamilyHandle* column_family,
std::string ts_low) = 0; std::string ts_low) = 0;

Loading…
Cancel
Save