@@ -727,7 +727,8 @@ Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
                             ColumnFamilyHandle* column_family,
                             const std::vector<std::string>& input_file_names,
                             const int output_level, const int output_path_id,
-                            std::vector<std::string>* const output_file_names) {
+                            std::vector<std::string>* const output_file_names,
+                            CompactionJobInfo* compaction_job_info) {
 #ifdef ROCKSDB_LITE
   (void)compact_options;
   (void)column_family;
@@ -735,6 +736,7 @@ Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
   (void)output_level;
   (void)output_path_id;
   (void)output_file_names;
+  (void)compaction_job_info;
   // not supported in lite version
   return Status::NotSupported("Not supported in ROCKSDB LITE");
 #else
@@ -766,7 +768,7 @@ Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
 
     s = CompactFilesImpl(compact_options, cfd, current, input_file_names,
                          output_file_names, output_level, output_path_id,
-                         &job_context, &log_buffer);
+                         &job_context, &log_buffer, compaction_job_info);
 
     current->Unref();
   }
@@ -806,7 +808,8 @@ Status DBImpl::CompactFilesImpl(
     const CompactionOptions& compact_options, ColumnFamilyData* cfd,
     Version* version, const std::vector<std::string>& input_file_names,
     std::vector<std::string>* const output_file_names, const int output_level,
-    int output_path_id, JobContext* job_context, LogBuffer* log_buffer) {
+    int output_path_id, JobContext* job_context, LogBuffer* log_buffer,
+    CompactionJobInfo* compaction_job_info) {
   mutex_.AssertHeld();
 
   if (shutting_down_.load(std::memory_order_acquire)) {
@@ -892,6 +895,7 @@ Status DBImpl::CompactFilesImpl(
     snapshot_checker = DisableGCSnapshotChecker::Instance();
   }
   assert(is_snapshot_supported_ || snapshots_.empty());
+  CompactionJobStats compaction_job_stats;
   CompactionJob compaction_job(
       job_context->job_id, c.get(), immutable_db_options_,
       env_options_for_compaction_, versions_.get(), &shutting_down_,
@@ -901,19 +905,7 @@ Status DBImpl::CompactFilesImpl(
       snapshot_checker, table_cache_, &event_logger_,
       c->mutable_cf_options()->paranoid_file_checks,
       c->mutable_cf_options()->report_bg_io_stats, dbname_,
-      nullptr);  // Here we pass a nullptr for CompactionJobStats because
-                 // CompactFiles does not trigger OnCompactionCompleted(),
-                 // which is the only place where CompactionJobStats is
-                 // returned. The idea of not triggering OnCompationCompleted()
-                 // is that CompactFiles runs in the caller thread, so the user
-                 // should always know when it completes. As a result, it makes
-                 // less sense to notify the users something they should already
-                 // know.
-                 //
-                 // In the future, if we would like to add CompactionJobStats
-                 // support for CompactFiles, we should have CompactFiles API
-                 // pass a pointer of CompactionJobStats as the out-value
-                 // instead of using EventListener.
+      &compaction_job_stats);
 
   // Creating a compaction influences the compaction score because the score
   // takes running compactions into account (by skipping files that are already
@@ -950,6 +942,11 @@ Status DBImpl::CompactFilesImpl(
 
   ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
 
+  if (compaction_job_info != nullptr) {
+    BuildCompactionJobInfo(cfd, c.get(), s, compaction_job_stats,
+                           job_context->job_id, version, compaction_job_info);
+  }
+
   if (status.ok()) {
     // Done
   } else if (status.IsShutdownInProgress()) {
@@ -1092,36 +1089,8 @@ void DBImpl::NotifyOnCompactionCompleted(
   TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");
   {
     CompactionJobInfo info;
-    info.cf_id = cfd->GetID();
-    info.cf_name = cfd->GetName();
-    info.status = st;
-    info.thread_id = env_->GetThreadID();
-    info.job_id = job_id;
-    info.base_input_level = c->start_level();
-    info.output_level = c->output_level();
-    info.stats = compaction_job_stats;
-    info.table_properties = c->GetOutputTableProperties();
-    info.compaction_reason = c->compaction_reason();
-    info.compression = c->output_compression();
-    for (size_t i = 0; i < c->num_input_levels(); ++i) {
-      for (const auto fmd : *c->inputs(i)) {
-        auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
-                                fmd->fd.GetNumber(), fmd->fd.GetPathId());
-        info.input_files.push_back(fn);
-        if (info.table_properties.count(fn) == 0) {
-          std::shared_ptr<const TableProperties> tp;
-          auto s = current->GetTableProperties(&tp, fmd, &fn);
-          if (s.ok()) {
-            info.table_properties[fn] = tp;
-          }
-        }
-      }
-    }
-    for (const auto newf : c->edit()->GetNewFiles()) {
-      info.output_files.push_back(TableFileName(
-          c->immutable_cf_options()->cf_paths, newf.second.fd.GetNumber(),
-          newf.second.fd.GetPathId()));
-    }
+    BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, current,
+                           &info);
     for (auto listener : immutable_db_options_.listeners) {
       listener->OnCompactionCompleted(this, info);
     }
@@ -2762,6 +2731,45 @@ bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
   return true;
 }
 
+#ifndef ROCKSDB_LITE
+void DBImpl::BuildCompactionJobInfo(
+    const ColumnFamilyData* cfd, Compaction* c, const Status& st,
+    const CompactionJobStats& compaction_job_stats, const int job_id,
+    const Version* current, CompactionJobInfo* compaction_job_info) const {
+  assert(compaction_job_info != nullptr);
+  compaction_job_info->cf_id = cfd->GetID();
+  compaction_job_info->cf_name = cfd->GetName();
+  compaction_job_info->status = st;
+  compaction_job_info->thread_id = env_->GetThreadID();
+  compaction_job_info->job_id = job_id;
+  compaction_job_info->base_input_level = c->start_level();
+  compaction_job_info->output_level = c->output_level();
+  compaction_job_info->stats = compaction_job_stats;
+  compaction_job_info->table_properties = c->GetOutputTableProperties();
+  compaction_job_info->compaction_reason = c->compaction_reason();
+  compaction_job_info->compression = c->output_compression();
+  for (size_t i = 0; i < c->num_input_levels(); ++i) {
+    for (const auto fmd : *c->inputs(i)) {
+      auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
+                              fmd->fd.GetNumber(), fmd->fd.GetPathId());
+      compaction_job_info->input_files.push_back(fn);
+      if (compaction_job_info->table_properties.count(fn) == 0) {
+        shared_ptr<const TableProperties> tp;
+        auto s = current->GetTableProperties(&tp, fmd, &fn);
+        if (s.ok()) {
+          compaction_job_info->table_properties[fn] = tp;
+        }
+      }
+    }
+  }
+  for (const auto& newf : c->edit()->GetNewFiles()) {
+    compaction_job_info->output_files.push_back(
+        TableFileName(c->immutable_cf_options()->cf_paths,
+                      newf.second.fd.GetNumber(), newf.second.fd.GetPathId()));
+  }
+}
+#endif
+
 // SuperVersionContext gets created and destructed outside of the lock --
 // we use this conveniently to:
 //  * malloc one SuperVersion() outside of the lock -- new_superversion
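
For reference, a minimal sketch of how a caller could consume the new
out-parameter after this patch. It assumes the public DB::CompactFiles
declaration gained the same trailing CompactionJobInfo* parameter as
DBImpl::CompactFiles above; the file names, output level, and column family
below are illustrative, not taken from the patch:

  #include "rocksdb/db.h"
  #include "rocksdb/listener.h"  // CompactionJobInfo

  // Sketch only: assumes an open rocksdb::DB* db. Real input file names
  // would come from db->GetColumnFamilyMetaData(); these are hypothetical.
  rocksdb::CompactionOptions compact_options;
  std::vector<std::string> input_files = {"000012.sst", "000013.sst"};
  std::vector<std::string> output_files;
  rocksdb::CompactionJobInfo job_info;  // filled via BuildCompactionJobInfo()
  rocksdb::Status s = db->CompactFiles(
      compact_options, db->DefaultColumnFamily(), input_files,
      /*output_level=*/1, /*output_path_id=*/-1, &output_files, &job_info);
  if (s.ok()) {
    // job_info.stats, job_info.input_files, and job_info.output_files are
    // populated directly on return, without registering an EventListener
    // for OnCompactionCompleted().
  }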