Fix crash when background task fails (#5879)

Summary:
Fix a crash that occurs when a background task fails. Full story in issue: https://github.com/facebook/rocksdb/issues/5878
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5879

Differential Revision: D17812299

Pulled By: anand1976

fbshipit-source-id: 14e5a4fc502ade974583da9692d0ed6e5014613a
main
Tomas Kolda 5 years ago committed by Facebook Github Bot
parent 46ca51d430
commit e3a93c9ee1
  1. 1
      HISTORY.md
  2. 22
      db/db_impl/db_impl.cc
  3. 5
      db/db_impl/db_impl.h
  4. 15
      db/db_impl/db_impl_compaction_flush.cc
  5. 5
      db/db_impl/db_impl_open.cc

@ -7,6 +7,7 @@
* Fix a bug in file ingestion caused by incorrect file number allocation when the number of column families involved in the ingestion exceeds 2.
* Fix a bug when format_version=3, partitioned filters, and prefix search are used in conjunction. The bug could result in Seek::(prefix) returning NotFound for an existing prefix.
* Revert the feature "Merging iterator to avoid child iterator reseek for some cases (#5286)" since it might cause wrong results when reseek happens with a different iterator upper bound.
* Fix a bug causing a crash during external file ingestion when a background compaction causes a severe error (file not found).
### New Features
* Introduced DBOptions::max_write_batch_group_size_bytes to configure maximum limit on number of bytes that are written in a single batch of WAL or memtable write. It is followed when the leader write size is larger than 1/8 of this limit.
* VerifyChecksum() by default will issue readahead. Allow ReadOptions to be passed in to those functions to override the readahead size. For checksum verifying before external SST file ingestion, a new option, IngestExternalFileOptions.verify_checksums_readahead_size, is added for this readahead setting.

@ -2904,8 +2904,11 @@ DBImpl::CaptureCurrentFileNumberInPendingOutputs() {
}
void DBImpl::ReleaseFileNumberFromPendingOutputs(
std::list<uint64_t>::iterator v) {
pending_outputs_.erase(v);
std::unique_ptr<std::list<uint64_t>::iterator>& v) {
if (v.get() != nullptr) {
pending_outputs_.erase(*v.get());
v.reset();
}
}
#ifndef ROCKSDB_LITE
@ -3744,7 +3747,7 @@ Status DBImpl::IngestExternalFiles(
// TODO (yanqin) maybe handle the case in which column_families have
// duplicates
std::list<uint64_t>::iterator pending_output_elem;
std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
size_t total = 0;
for (const auto& arg : args) {
total += arg.external_files.size();
@ -3752,7 +3755,7 @@ Status DBImpl::IngestExternalFiles(
uint64_t next_file_number = 0;
Status status = ReserveFileNumbersBeforeIngestion(
static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd(), total,
&pending_output_elem, &next_file_number);
pending_output_elem, &next_file_number);
if (!status.ok()) {
InstrumentedMutexLock l(&mutex_);
ReleaseFileNumberFromPendingOutputs(pending_output_elem);
@ -4026,7 +4029,7 @@ Status DBImpl::CreateColumnFamilyWithImport(
SuperVersionContext dummy_sv_ctx(/* create_superversion */ true);
VersionEdit dummy_edit;
uint64_t next_file_number = 0;
std::list<uint64_t>::iterator pending_output_elem;
std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
{
// Lock db mutex
InstrumentedMutexLock l(&mutex_);
@ -4036,7 +4039,8 @@ Status DBImpl::CreateColumnFamilyWithImport(
}
// Make sure that bg cleanup wont delete the files that we are importing
pending_output_elem = CaptureCurrentFileNumberInPendingOutputs();
pending_output_elem.reset(new std::list<uint64_t>::iterator(
CaptureCurrentFileNumberInPendingOutputs()));
if (status.ok()) {
// If crash happen after a hard link established, Recover function may
@ -4254,18 +4258,18 @@ Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id,
Status DBImpl::ReserveFileNumbersBeforeIngestion(
ColumnFamilyData* cfd, uint64_t num,
std::list<uint64_t>::iterator* pending_output_elem,
std::unique_ptr<std::list<uint64_t>::iterator>& pending_output_elem,
uint64_t* next_file_number) {
Status s;
SuperVersionContext dummy_sv_ctx(true /* create_superversion */);
assert(nullptr != pending_output_elem);
assert(nullptr != next_file_number);
InstrumentedMutexLock l(&mutex_);
if (error_handler_.IsDBStopped()) {
// Do not ingest files when there is a bg_error
return error_handler_.GetBGError();
}
*pending_output_elem = CaptureCurrentFileNumberInPendingOutputs();
pending_output_elem.reset(new std::list<uint64_t>::iterator(
CaptureCurrentFileNumberInPendingOutputs()));
*next_file_number = versions_->FetchAddFileNumber(static_cast<uint64_t>(num));
auto cf_options = cfd->GetLatestMutableCFOptions();
VersionEdit dummy_edit;

@ -1313,7 +1313,8 @@ class DBImpl : public DB {
// created between the calls CaptureCurrentFileNumberInPendingOutputs() and
// ReleaseFileNumberFromPendingOutputs() can now be deleted (if it's not live
// and blocked by any other pending_outputs_ calls)
void ReleaseFileNumberFromPendingOutputs(std::list<uint64_t>::iterator v);
void ReleaseFileNumberFromPendingOutputs(
std::unique_ptr<std::list<uint64_t>::iterator>& v);
Status SyncClosedLogs(JobContext* job_context);
@ -1605,7 +1606,7 @@ class DBImpl : public DB {
// Write a version edit to the MANIFEST.
Status ReserveFileNumbersBeforeIngestion(
ColumnFamilyData* cfd, uint64_t num,
std::list<uint64_t>::iterator* pending_output_elem,
std::unique_ptr<std::list<uint64_t>::iterator>& pending_output_elem,
uint64_t* next_file_number);
#endif //! ROCKSDB_LITE

@ -969,8 +969,9 @@ Status DBImpl::CompactFilesImpl(
GetSnapshotContext(job_context, &snapshot_seqs,
&earliest_write_conflict_snapshot, &snapshot_checker);
auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs();
std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
new std::list<uint64_t>::iterator(
CaptureCurrentFileNumberInPendingOutputs()));
assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJobStats compaction_job_stats;
@ -2216,8 +2217,9 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
assert(bg_flush_scheduled_);
num_running_flushes_++;
auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs();
std::unique_ptr<std::list<uint64_t>::iterator>
pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
CaptureCurrentFileNumberInPendingOutputs()));
FlushReason reason;
Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer,
@ -2298,8 +2300,9 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
num_running_compactions_++;
auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs();
std::unique_ptr<std::list<uint64_t>::iterator>
pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
CaptureCurrentFileNumberInPendingOutputs()));
assert((bg_thread_pri == Env::Priority::BOTTOM &&
bg_bottom_compaction_scheduled_) ||

@ -1136,8 +1136,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
mutex_.AssertHeld();
const uint64_t start_micros = env_->NowMicros();
FileMetaData meta;
auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs();
std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
new std::list<uint64_t>::iterator(
CaptureCurrentFileNumberInPendingOutputs()));
meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
ReadOptions ro;
ro.total_order_seek = true;

Loading…
Cancel
Save