Update JobContext. (#3949)

Summary:
In the past, we assume that a job modifies a single column family. Therefore, a job can create at most one superversion since each superversion corresponds to one column family. This assumption leads to the fact that a `JobContext` has only one member variable called `superversion_context`.
Now we want to support group flush of column families, indicating that each job can create multiple superversions. Therefore, we need to make the following change to accommodate this new feature.

Add a vector of `SuperVersionContext` to `JobContext` to support installing
superversions for multiple column families in one job context.

This PR is a subset of [PR 3752](https://github.com/facebook/rocksdb/pull/3752).
Pull Request resolved: https://github.com/facebook/rocksdb/pull/3949

Differential Revision: D8864895

Pulled By: riversand963

fbshipit-source-id: 5937a48817276370d3c8172db9c8aafc826d97ca
main
Yanqin Jin 6 years ago committed by Facebook Github Bot
parent 22368965a0
commit 1f802773bc
  1. 12
      db/db_impl.cc
  2. 12
      db/db_impl_compaction_flush.cc
  3. 6
      db/db_impl_experimental.cc
  4. 29
      db/job_context.h

@ -2236,9 +2236,9 @@ Status DBImpl::DeleteFile(std::string name) {
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, &mutex_, directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &job_context.superversion_context,
*cfd->GetLatestMutableCFOptions(),
FlushReason::kDeleteFiles);
InstallSuperVersionAndScheduleWork(
cfd, &job_context.superversion_contexts[0],
*cfd->GetLatestMutableCFOptions(), FlushReason::kDeleteFiles);
}
FindObsoleteFiles(&job_context, false);
} // lock released here
@ -2321,9 +2321,9 @@ Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, &mutex_, directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &job_context.superversion_context,
*cfd->GetLatestMutableCFOptions(),
FlushReason::kDeleteFiles);
InstallSuperVersionAndScheduleWork(
cfd, &job_context.superversion_contexts[0],
*cfd->GetLatestMutableCFOptions(), FlushReason::kDeleteFiles);
}
for (auto* deleted_file : deleted_files) {
deleted_file->being_compacted = false;

@ -160,8 +160,8 @@ Status DBImpl::FlushMemTableToOutputFile(
}
if (s.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &job_context->superversion_context,
mutable_cf_options);
InstallSuperVersionAndScheduleWork(
cfd, &job_context->superversion_contexts[0], mutable_cf_options);
if (made_progress) {
*made_progress = 1;
}
@ -691,7 +691,7 @@ Status DBImpl::CompactFilesImpl(
Status status = compaction_job.Install(*c->mutable_cf_options());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(
c->column_family_data(), &job_context->superversion_context,
c->column_family_data(), &job_context->superversion_contexts[0],
*c->mutable_cf_options(), FlushReason::kManualCompaction);
}
c->ReleaseCompactionFiles(s);
@ -1775,7 +1775,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
*c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir());
InstallSuperVersionAndScheduleWork(
c->column_family_data(), &job_context->superversion_context,
c->column_family_data(), &job_context->superversion_contexts[0],
*c->mutable_cf_options(), FlushReason::kAutoCompaction);
ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n",
c->column_family_data()->GetName().c_str(),
@ -1822,7 +1822,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
&mutex_, directories_.GetDbDir());
// Use latest MutableCFOptions
InstallSuperVersionAndScheduleWork(
c->column_family_data(), &job_context->superversion_context,
c->column_family_data(), &job_context->superversion_contexts[0],
*c->mutable_cf_options(), FlushReason::kAutoCompaction);
VersionStorageInfo::LevelSummaryStorage tmp;
@ -1899,7 +1899,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
status = compaction_job.Install(*c->mutable_cf_options());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(
c->column_family_data(), &job_context->superversion_context,
c->column_family_data(), &job_context->superversion_contexts[0],
*c->mutable_cf_options(), FlushReason::kAutoCompaction);
}
*made_progress = true;

@ -138,9 +138,9 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, &mutex_, directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(
cfd, &job_context.superversion_context,
*cfd->GetLatestMutableCFOptions());
InstallSuperVersionAndScheduleWork(cfd,
&job_context.superversion_contexts[0],
*cfd->GetLatestMutableCFOptions());
}
} // lock released here
LogFlush(immutable_db_options_.info_log);

@ -35,6 +35,14 @@ struct SuperVersionContext {
explicit SuperVersionContext(bool create_superversion = false)
: new_superversion(create_superversion ? new SuperVersion() : nullptr) {}
explicit SuperVersionContext(SuperVersionContext&& other)
: superversions_to_free(std::move(other.superversions_to_free)),
#ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION
write_stall_notifications(std::move(other.write_stall_notifications)),
#endif
new_superversion(std::move(other.new_superversion)) {
}
void NewSuperVersion() {
new_superversion = unique_ptr<SuperVersion>(new SuperVersion());
}
@ -98,8 +106,15 @@ struct JobContext {
}
inline bool HaveSomethingToClean() const {
bool sv_have_sth = false;
for (const auto& sv_ctx : superversion_contexts) {
if (sv_ctx.HaveSomethingToDelete()) {
sv_have_sth = true;
break;
}
}
return memtables_to_free.size() > 0 || logs_to_free.size() > 0 ||
superversion_context.HaveSomethingToDelete();
sv_have_sth;
}
// Structure to store information for candidate files to delete.
@ -142,7 +157,8 @@ struct JobContext {
// a list of memtables to be free
autovector<MemTable*> memtables_to_free;
SuperVersionContext superversion_context;
// contexts for installing superversions for multiple column families
std::vector<SuperVersionContext> superversion_contexts;
autovector<log::Writer*> logs_to_free;
@ -158,13 +174,14 @@ struct JobContext {
size_t num_alive_log_files = 0;
uint64_t size_log_to_delete = 0;
explicit JobContext(int _job_id, bool create_superversion = false)
: superversion_context(create_superversion) {
explicit JobContext(int _job_id, bool create_superversion = false) {
job_id = _job_id;
manifest_file_number = 0;
pending_manifest_file_number = 0;
log_number = 0;
prev_log_number = 0;
superversion_contexts.emplace_back(
SuperVersionContext(create_superversion));
}
// For non-empty JobContext Clean() has to be called at least once before
@ -173,7 +190,9 @@ struct JobContext {
// doing potentially slow Clean() with locked DB mutex.
void Clean() {
// free superversions
superversion_context.Clean();
for (auto& sv_context : superversion_contexts) {
sv_context.Clean();
}
// free pending memtables
for (auto m : memtables_to_free) {
delete m;

Loading…
Cancel
Save