diff --git a/HISTORY.md b/HISTORY.md index 44fc66bcb..e9f06b532 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -14,8 +14,12 @@ * Merging iterator to avoid child iterator reseek for some cases * Reduce iterator key comparision for upper/lower bound check. +### General Improvements +* Added new status code kColumnFamilyDropped to distinguish between Column Family Dropped and DB Shutdown in progress. + ### Bug Fixes + ## 6.2.0 (4/30/2019) ### New Features * Add an option `strict_bytes_per_sync` that causes a file-writing thread to block rather than exceed the limit on bytes pending writeback specified by `bytes_per_sync` or `wal_bytes_per_sync`. diff --git a/db/compaction_job.cc b/db/compaction_job.cc index fb77431fd..d1ae19327 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -1004,10 +1004,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats); RecordCompactionIOStats(); - if (status.ok() && - (shutting_down_->load(std::memory_order_relaxed) || cfd->IsDropped())) { - status = Status::ShutdownInProgress( - "Database shutdown or Column family drop during compaction"); + if (status.ok() && cfd->IsDropped()) { + status = + Status::ColumnFamilyDropped("Column family dropped during compaction"); + } + if ((status.ok() || status.IsColumnFamilyDropped()) && + shutting_down_->load(std::memory_order_relaxed)) { + status = Status::ShutdownInProgress("Database shutdown"); } if (status.ok()) { status = input->status(); diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index b5033b66f..91a04205e 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -3890,11 +3890,17 @@ TEST_F(DBCompactionTest, CompactRangeShutdownWhileDelayed) { } Flush(1); } - auto manual_compaction_thread = port::Thread([this]() { + auto manual_compaction_thread = port::Thread([this, i]() { CompactRangeOptions cro; cro.allow_write_stall = false; - ASSERT_TRUE(db_->CompactRange(cro, handles_[1], nullptr, nullptr) - .IsShutdownInProgress()); + Status s = db_->CompactRange(cro, handles_[1], nullptr, nullptr); + if (i == 0) { + ASSERT_TRUE(db_->CompactRange(cro, handles_[1], nullptr, nullptr) + .IsColumnFamilyDropped()); + } else { + ASSERT_TRUE(db_->CompactRange(cro, handles_[1], nullptr, nullptr) + .IsShutdownInProgress()); + } }); TEST_SYNC_POINT( diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index c603f60b4..876605b2e 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -431,7 +431,7 @@ TEST_P(DBAtomicFlushTest, FlushMultipleCFs_DropSomeBeforeRequestFlush) { cf_ids.push_back(cf_id); } ASSERT_OK(dbfull()->DropColumnFamily(handles_[1])); - ASSERT_TRUE(Flush(cf_ids).IsShutdownInProgress()); + ASSERT_TRUE(Flush(cf_ids).IsColumnFamilyDropped()); Destroy(options); } diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 900ea4acd..38c69dfc1 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -201,7 +201,7 @@ Status DBImpl::FlushMemTableToOutputFile( cfd->current()->storage_info()->LevelSummary(&tmp)); } - if (!s.ok() && !s.IsShutdownInProgress()) { + if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) { Status new_bg_error = s; error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); } @@ -254,7 +254,7 @@ Status DBImpl::FlushMemTablesToOutputFiles( snapshot_checker, log_buffer, thread_pri); if (!s.ok()) { status = s; - if (!s.IsShutdownInProgress()) { + if (!s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) { // At this point, DB is not shutting down, nor is cfd dropped. // Something is wrong, thus we break out of the loop. break; @@ -385,7 +385,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( for (const auto& e : exec_status) { if (!e.second.ok()) { s = e.second; - if (!e.second.IsShutdownInProgress()) { + if (!e.second.IsShutdownInProgress() && + !e.second.IsColumnFamilyDropped()) { // If a flush job did not return OK, and the CF is not dropped, and // the DB is not shutting down, then we have to return this result to // caller later. @@ -397,15 +398,11 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( s = error_status.ok() ? s : error_status; } - // If db is NOT shutting down, and one or more column families have been - // dropped. - // TODO: use separate status code for db shutdown and column family dropped. - if (s.IsShutdownInProgress() && - !shutting_down_.load(std::memory_order_acquire)) { + if (s.IsColumnFamilyDropped()) { s = Status::OK(); } - if (s.ok() || s.IsShutdownInProgress()) { + if (s.ok() || s.IsShutdownInProgress() || s.IsColumnFamilyDropped()) { // Sync on all distinct output directories. for (auto dir : distinct_output_dirs) { if (dir != nullptr) { @@ -523,7 +520,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( // Need to undo atomic flush if something went wrong, i.e. s is not OK and // it is not because of CF drop. - if (!s.ok() && !s.IsShutdownInProgress()) { + if (!s.ok() && !s.IsColumnFamilyDropped()) { // Have to cancel the flush jobs that have NOT executed because we need to // unref the versions. for (int i = 0; i != num_cfs; ++i) { @@ -1052,7 +1049,7 @@ Status DBImpl::CompactFilesImpl( if (status.ok()) { // Done - } else if (status.IsShutdownInProgress()) { + } else if (status.IsColumnFamilyDropped()) { // Ignore compaction errors found during shutting down } else { ROCKS_LOG_WARN(immutable_db_options_.info_log, @@ -1697,7 +1694,10 @@ Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd, cfd->GetName().c_str()); bg_cv_.Wait(); } - if (cfd->IsDropped() || shutting_down_.load(std::memory_order_acquire)) { + if (cfd->IsDropped()) { + return Status::ColumnFamilyDropped(); + } + if (shutting_down_.load(std::memory_order_acquire)) { return Status::ShutdownInProgress(); } @@ -2159,7 +2159,7 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) { Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer, &reason, thread_pri); - if (!s.ok() && !s.IsShutdownInProgress() && + if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped() && reason != FlushReason::kErrorRecovery) { // Wait a little bit before retrying background flush in // case this is an environmental problem and we do not want to @@ -2184,7 +2184,8 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) { // If flush failed, we want to delete all temporary files that we might have // created. Thus, we force full scan in FindObsoleteFiles() - FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress()); + FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() && + !s.IsColumnFamilyDropped()); // delete unnecessary files if any, this is done outside the mutex if (job_context.HaveSomethingToClean() || job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { @@ -2248,7 +2249,8 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, mutex_.Unlock(); env_->SleepForMicroseconds(10000); // prevent hot loop mutex_.Lock(); - } else if (!s.ok() && !s.IsShutdownInProgress()) { + } else if (!s.ok() && !s.IsShutdownInProgress() && + !s.IsColumnFamilyDropped()) { // Wait a little bit before retrying background compaction in // case this is an environmental problem and we do not want to // chew up resources for failed compactions for the duration of @@ -2272,7 +2274,8 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, // If compaction failed, we want to delete all temporary files that we might // have created (they might not be all recorded in job_context in case of a // failure). Thus, we force full scan in FindObsoleteFiles() - FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress()); + FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() && + !s.IsColumnFamilyDropped()); TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"); // delete unnecessary files if any, this is done outside the mutex @@ -2710,7 +2713,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, if (status.ok() || status.IsCompactionTooLarge()) { // Done - } else if (status.IsShutdownInProgress()) { + } else if (status.IsColumnFamilyDropped()) { // Ignore compaction errors found during shutting down } else { ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s", diff --git a/db/flush_job.cc b/db/flush_job.cc index 4226589e7..21c1ff3a7 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -229,10 +229,12 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, // This will release and re-acquire the mutex. Status s = WriteLevel0Table(); - if (s.ok() && - (shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) { - s = Status::ShutdownInProgress( - "Database shutdown or Column family drop during flush"); + if (s.ok() && cfd_->IsDropped()) { + s = Status::ColumnFamilyDropped("Column family dropped during compaction"); + } + if ((s.ok() || s.IsColumnFamilyDropped()) && + shutting_down_->load(std::memory_order_acquire)) { + s = Status::ShutdownInProgress("Database shutdown"); } if (!s.ok()) { diff --git a/db/memtable_list.cc b/db/memtable_list.cc index d81b1d4d2..bdcbd2186 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -598,7 +598,7 @@ Status InstallMemtableAtomicFlushResults( imm->InstallNewVersion(); } - if (s.ok() || s.IsShutdownInProgress()) { + if (s.ok() || s.IsColumnFamilyDropped()) { for (size_t i = 0; i != cfds.size(); ++i) { if (cfds[i]->IsDropped()) { continue; diff --git a/db/version_set.cc b/db/version_set.cc index 03c590272..15b9d01fe 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3842,8 +3842,6 @@ Status VersionSet::LogAndApply( } } if (0 == num_undropped_cfds) { - // TODO (yanqin) maybe use a different status code to denote column family - // drop other than OK and ShutdownInProgress for (int i = 0; i != num_cfds; ++i) { manifest_writers_.pop_front(); } @@ -3851,7 +3849,7 @@ Status VersionSet::LogAndApply( if (!manifest_writers_.empty()) { manifest_writers_.front()->cv.Signal(); } - return Status::ShutdownInProgress(); + return Status::ColumnFamilyDropped(); } return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log, diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index 12e8070d1..ac97ce442 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -58,7 +58,9 @@ class Status { kBusy = 11, kExpired = 12, kTryAgain = 13, - kCompactionTooLarge = 14 + kCompactionTooLarge = 14, + kColumnFamilyDropped = 15, + kMaxCode }; Code code() const { return code_; } @@ -184,6 +186,15 @@ class Status { return Status(kCompactionTooLarge, msg, msg2); } + static Status ColumnFamilyDropped(SubCode msg = kNone) { + return Status(kColumnFamilyDropped, msg); + } + + static Status ColumnFamilyDropped(const Slice& msg, + const Slice& msg2 = Slice()) { + return Status(kColumnFamilyDropped, msg, msg2); + } + static Status NoSpace() { return Status(kIOError, kNoSpace); } static Status NoSpace(const Slice& msg, const Slice& msg2 = Slice()) { return Status(kIOError, kNoSpace, msg, msg2); @@ -256,6 +267,9 @@ class Status { // Returns true iff the status indicates the proposed compaction is too large bool IsCompactionTooLarge() const { return code() == kCompactionTooLarge; } + // Returns true iff the status indicates Column Family Dropped + bool IsColumnFamilyDropped() const { return code() == kColumnFamilyDropped; } + // Returns true iff the status indicates a NoSpace error // This is caused by an I/O error returning the specific "out of space" // error condition. Stricto sensu, an NoSpace error is an I/O error diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h index 193804ac3..d1585fcfa 100644 --- a/java/rocksjni/portal.h +++ b/java/rocksjni/portal.h @@ -467,6 +467,8 @@ class StatusJni : public RocksDBNativeClass { return 0xC; case rocksdb::Status::Code::kTryAgain: return 0xD; + case rocksdb::Status::Code::kColumnFamilyDropped: + return 0xE; default: return 0x7F; // undefined } @@ -584,6 +586,12 @@ class StatusJni : public RocksDBNativeClass { new rocksdb::Status(rocksdb::Status::TryAgain( rocksdb::SubCodeJni::toCppSubCode(jsub_code_value)))); break; + case 0xE: + // ColumnFamilyDropped + status = std::unique_ptr( + new rocksdb::Status(rocksdb::Status::ColumnFamilyDropped( + rocksdb::SubCodeJni::toCppSubCode(jsub_code_value)))); + break; case 0x7F: default: return nullptr; diff --git a/util/status.cc b/util/status.cc index c66bf6f8e..940594480 100644 --- a/util/status.cc +++ b/util/status.cc @@ -109,6 +109,9 @@ std::string Status::ToString() const { case kTryAgain: type = "Operation failed. Try again.: "; break; + case kColumnFamilyDropped: + type = "Column family dropped: "; + break; default: snprintf(tmp, sizeof(tmp), "Unknown code(%d): ", static_cast(code()));