Delay verify compaction output table (#3979)

Summary:
Verifying a table loads the SST file into the `TableCache`,
which occupies memory and consumes the `TableCache`'s capacity,
but no logic actually uses the cached entry during compaction,
so verifying each file immediately is unnecessary.

Therefore, we verify the output tables only after all sub-compactions have finished.
Closes https://github.com/facebook/rocksdb/pull/3979

Differential Revision: D8389946

Pulled By: ajkr

fbshipit-source-id: 54bd4f474f9e7b3accf39c3068b1f36a27ec4c49
main
奏之章 7 years ago committed by Facebook Github Bot
parent 4faaab70a6
commit f23fed19a1
  1. 108
      db/compaction_job.cc
  2. 18
      db/version_builder.cc

@ -607,6 +607,69 @@ Status CompactionJob::Run() {
} }
} }
if (status.ok()) {
// Reuse the worker container for verification threads.
// NOTE(review): `thread_pool` is presumably declared earlier in Run() and
// already joined by this point — confirm against the full function body.
thread_pool.clear();
// Gather the metadata of every output file produced by all sub-compactions
// so they can be verified as one flat work list.
std::vector<const FileMetaData*> files_meta;
for (const auto& state : compact_->sub_compact_states) {
for (const auto& output : state.outputs) {
files_meta.emplace_back(&output.meta);
}
}
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
auto prefix_extractor =
compact_->compaction->mutable_cf_options()->prefix_extractor.get();
// Shared work-queue cursor: each worker atomically claims the next file
// index via fetch_add, so files are distributed dynamically across threads.
std::atomic<size_t> next_file_meta_idx(0);
// Worker: repeatedly claim a file and open an iterator over it to verify it
// is readable. The first failure is recorded into `output_status` (one
// Status per sub-compaction) and stops that worker.
auto verify_table = [&](Status& output_status) {
while (true) {
size_t file_idx = next_file_meta_idx.fetch_add(1);
if (file_idx >= files_meta.size()) {
break;
}
// Verify that the table is usable
// We set for_compaction to false and don't OptimizeForCompactionTableRead
// here because this is a special case after we finish the table building
// No matter whether use_direct_io_for_flush_and_compaction is true,
// we will regard this verification as user reads since the goal is
// to cache it here for further user reads
InternalIterator* iter = cfd->table_cache()->NewIterator(
ReadOptions(), env_options_, cfd->internal_comparator(),
files_meta[file_idx]->fd, nullptr /* range_del_agg */,
prefix_extractor, nullptr,
cfd->internal_stats()->GetFileReadHist(
compact_->compaction->output_level()),
false, nullptr /* arena */, false /* skip_filters */,
compact_->compaction->output_level());
auto s = iter->status();
// With paranoid checks enabled, additionally scan every key in the file
// to force a full read of all blocks.
if (s.ok() && paranoid_file_checks_) {
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {}
s = iter->status();
}
delete iter;
if (!s.ok()) {
output_status = s;
break;
}
}
};
// Spawn one verifier thread per additional sub-compaction; the current
// thread participates as the worker for sub-compaction 0 below.
for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
thread_pool.emplace_back(verify_table,
std::ref(compact_->sub_compact_states[i].status));
}
verify_table(compact_->sub_compact_states[0].status);
for (auto& thread : thread_pool) {
thread.join();
}
// Propagate the first sub-compaction failure (if any) into the overall
// compaction status.
for (const auto& state : compact_->sub_compact_states) {
if (!state.status.ok()) {
status = state.status;
break;
}
}
}
TablePropertiesCollection tp; TablePropertiesCollection tp;
for (const auto& state : compact_->sub_compact_states) { for (const auto& state : compact_->sub_compact_states) {
for (const auto& output : state.outputs) { for (const auto& output : state.outputs) {
@ -1175,43 +1238,16 @@ Status CompactionJob::FinishCompactionOutputFile(
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
TableProperties tp; TableProperties tp;
if (s.ok() && current_entries > 0) { if (s.ok() && current_entries > 0) {
// Verify that the table is usable
// We set for_compaction to false and don't OptimizeForCompactionTableRead
// here because this is a special case after we finish the table building
// No matter whether use_direct_io_for_flush_and_compaction is true,
// we will regard this verification as user reads since the goal is
// to cache it here for further user reads
InternalIterator* iter = cfd->table_cache()->NewIterator(
ReadOptions(), env_options_, cfd->internal_comparator(), meta->fd,
nullptr /* range_del_agg */,
sub_compact->compaction->mutable_cf_options()->prefix_extractor.get(),
nullptr,
cfd->internal_stats()->GetFileReadHist(
compact_->compaction->output_level()),
false, nullptr /* arena */, false /* skip_filters */,
compact_->compaction->output_level());
s = iter->status();
if (s.ok() && paranoid_file_checks_) {
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
}
s = iter->status();
}
delete iter;
// Output to event logger and fire events. // Output to event logger and fire events.
if (s.ok()) { tp = sub_compact->builder->GetTableProperties();
tp = sub_compact->builder->GetTableProperties(); sub_compact->current_output()->table_properties =
sub_compact->current_output()->table_properties = std::make_shared<TableProperties>(tp);
std::make_shared<TableProperties>(tp); ROCKS_LOG_INFO(db_options_.info_log,
ROCKS_LOG_INFO(db_options_.info_log, "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
"[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64 " keys, %" PRIu64 " bytes%s",
" keys, %" PRIu64 " bytes%s", cfd->GetName().c_str(), job_id_, output_number,
cfd->GetName().c_str(), job_id_, output_number, current_entries, current_bytes,
current_entries, current_bytes, meta->marked_for_compaction ? " (need compaction)" : "");
meta->marked_for_compaction ? " (need compaction)" : "");
}
} }
std::string fname; std::string fname;
FileDescriptor output_fd; FileDescriptor output_fd;

@ -405,17 +405,13 @@ class VersionBuilder::Rep {
} }
}; };
if (max_threads <= 1) { std::vector<port::Thread> threads;
load_handlers_func(); for (int i = 1; i < max_threads; i++) {
} else { threads.emplace_back(load_handlers_func);
std::vector<port::Thread> threads; }
for (int i = 0; i < max_threads; i++) { load_handlers_func();
threads.emplace_back(load_handlers_func); for (auto& t : threads) {
} t.join();
for (auto& t : threads) {
t.join();
}
} }
} }

Loading…
Cancel
Save