Update VersionSet last seqno after LogAndApply (#10051)

Summary:
This PR fixes the issue of unstable snapshot during external SST file ingestion. Credit ajkr for the following walk through:  consider these relevant steps for of IngestExternalFile():

(1) increase seqno while holding mutex -- 677d2b4a8f/db/db_impl/db_impl.cc (L4768)
(2) LogAndApply() -- 677d2b4a8f/db/db_impl/db_impl.cc (L4797-L4798)
  (a) write to MANIFEST with mutex released a96a4a2f7b/db/version_set.cc (L4407)
  (b) apply to in-memory state with mutex held

A snapshot taken during (2a) will be unstable. In particular, queries against that snapshot will not include data from the ingested file before (2b), and will include data from the ingested file after (2b).

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10051

Test Plan:
Added a new unit test: `ExternalSSTFileBasicTest.WriteAfterReopenStableSnapshotWhileLoggingToManifest`.
```
make external_sst_file_basic_test
./external_sst_file_basic_test
```

Reviewed By: ajkr

Differential Revision: D36654033

Pulled By: cbi42

fbshipit-source-id: bf720cca313e0cf211585960f3aff04853a31b96
main
Changyu Bi 2 years ago committed by Facebook GitHub Bot
parent b71466e982
commit b0e190604b
  1. 34
      db/db_impl/db_impl.cc
  2. 57
      db/external_sst_file_basic_test.cc

@ -4809,19 +4809,6 @@ Status DBImpl::IngestExternalFiles(
}
}
if (status.ok()) {
int consumed_seqno_count =
ingestion_jobs[0].ConsumedSequenceNumbersCount();
for (size_t i = 1; i != num_cfs; ++i) {
consumed_seqno_count =
std::max(consumed_seqno_count,
ingestion_jobs[i].ConsumedSequenceNumbersCount());
}
if (consumed_seqno_count > 0) {
const SequenceNumber last_seqno = versions_->LastSequence();
versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count);
versions_->SetLastPublishedSequence(last_seqno + consumed_seqno_count);
versions_->SetLastSequence(last_seqno + consumed_seqno_count);
}
autovector<ColumnFamilyData*> cfds_to_commit;
autovector<const MutableCFOptions*> mutable_cf_options_list;
autovector<autovector<VersionEdit*>> edit_lists;
@ -4851,6 +4838,27 @@ Status DBImpl::IngestExternalFiles(
status =
versions_->LogAndApply(cfds_to_commit, mutable_cf_options_list,
edit_lists, &mutex_, directories_.GetDbDir());
// It is safe to update VersionSet last seqno here after LogAndApply since
// LogAndApply persists last sequence number from VersionEdits,
// which are from file's largest seqno and not from VersionSet.
//
// It is necessary to update last seqno here since LogAndApply releases
// mutex when persisting MANIFEST file, and the snapshots taken during
// that period will not be stable if VersionSet last seqno is updated
// before LogAndApply.
int consumed_seqno_count =
ingestion_jobs[0].ConsumedSequenceNumbersCount();
for (size_t i = 1; i != num_cfs; ++i) {
consumed_seqno_count =
std::max(consumed_seqno_count,
ingestion_jobs[i].ConsumedSequenceNumbersCount());
}
if (consumed_seqno_count > 0) {
const SequenceNumber last_seqno = versions_->LastSequence();
versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count);
versions_->SetLastPublishedSequence(last_seqno + consumed_seqno_count);
versions_->SetLastSequence(last_seqno + consumed_seqno_count);
}
}
if (status.ok()) {

@ -1923,6 +1923,63 @@ TEST_F(ExternalSSTFileBasicTest, VerifySstUniqueId) {
ASSERT_EQ(skipped, 1);
}
TEST_F(ExternalSSTFileBasicTest, StableSnapshotWhileLoggingToManifest) {
const std::string kPutVal = "put_val";
const std::string kIngestedVal = "ingested_val";
ASSERT_OK(Put("k", kPutVal, WriteOptions()));
ASSERT_OK(Flush());
std::string external_file = sst_files_dir_ + "/file_to_ingest.sst";
{
SstFileWriter sst_file_writer{EnvOptions(), CurrentOptions()};
ASSERT_OK(sst_file_writer.Open(external_file));
ASSERT_OK(sst_file_writer.Put("k", kIngestedVal));
ASSERT_OK(sst_file_writer.Finish());
}
const Snapshot* snapshot = nullptr;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"VersionSet::LogAndApply:WriteManifest", [&](void* /* arg */) {
// Prevent memory leak: this callback may be called multiple times
// and previous snapshot need to be freed
db_->ReleaseSnapshot(snapshot);
snapshot = db_->GetSnapshot();
ReadOptions read_opts;
read_opts.snapshot = snapshot;
std::string value;
ASSERT_OK(db_->Get(read_opts, "k", &value));
ASSERT_EQ(kPutVal, value);
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(db_->IngestExternalFile(db_->DefaultColumnFamily(), {external_file},
IngestExternalFileOptions()));
auto ingested_file_seqno = db_->GetLatestSequenceNumber();
ASSERT_NE(nullptr, snapshot);
// snapshot is taken before SST ingestion is done
ASSERT_EQ(ingested_file_seqno, snapshot->GetSequenceNumber() + 1);
ReadOptions read_opts;
read_opts.snapshot = snapshot;
std::string value;
ASSERT_OK(db_->Get(read_opts, "k", &value));
ASSERT_EQ(kPutVal, value);
db_->ReleaseSnapshot(snapshot);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
// After reopen, sequence number should be up current such that
// ingested value is read
Reopen(CurrentOptions());
ASSERT_OK(db_->Get(ReadOptions(), "k", &value));
ASSERT_EQ(kIngestedVal, value);
// New write should get higher seqno compared to ingested file
ASSERT_OK(Put("k", kPutVal, WriteOptions()));
ASSERT_EQ(db_->GetLatestSequenceNumber(), ingested_file_seqno + 1);
}
INSTANTIATE_TEST_CASE_P(ExternalSSTFileBasicTest, ExternalSSTFileBasicTest,
testing::Values(std::make_tuple(true, true),
std::make_tuple(true, false),

Loading…
Cancel
Save