Use separate status code for column family drop and db shutdown in progress (#5275)

Summary:
Currently RocksDB uses Status::ShutdownInProgress to report both a database shutdown and a column family drop. This change adds a separate status code, Status::ColumnFamilyDropped (kColumnFamilyDropped), for the column family drop event.
https://github.com/facebook/rocksdb/blob/master/include/rocksdb/status.h#L55
Related comment on this:
db/version_set.cc at revision abc4202e47, lines 2742-2743
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5275

Differential Revision: D15204583

Pulled By: vjnadimpalli

fbshipit-source-id: 95e99e34b27bc165b554ecb8a48a7f8e60f21e2a
main
Vijay Nadimpalli 6 years ago committed by Facebook Github Bot
parent 5c0e304170
commit 931c9df886
  1. 4
      HISTORY.md
  2. 11
      db/compaction_job.cc
  3. 8
      db/db_compaction_test.cc
  4. 2
      db/db_flush_test.cc
  5. 37
      db/db_impl_compaction_flush.cc
  6. 10
      db/flush_job.cc
  7. 2
      db/memtable_list.cc
  8. 4
      db/version_set.cc
  9. 16
      include/rocksdb/status.h
  10. 8
      java/rocksjni/portal.h
  11. 3
      util/status.cc

@ -14,8 +14,12 @@
* Merging iterator to avoid child iterator reseek for some cases * Merging iterator to avoid child iterator reseek for some cases
* Reduce iterator key comparison for upper/lower bound check. * Reduce iterator key comparison for upper/lower bound check.
### General Improvements
* Added new status code kColumnFamilyDropped to distinguish between Column Family Dropped and DB Shutdown in progress.
### Bug Fixes ### Bug Fixes
## 6.2.0 (4/30/2019) ## 6.2.0 (4/30/2019)
### New Features ### New Features
* Add an option `strict_bytes_per_sync` that causes a file-writing thread to block rather than exceed the limit on bytes pending writeback specified by `bytes_per_sync` or `wal_bytes_per_sync`. * Add an option `strict_bytes_per_sync` that causes a file-writing thread to block rather than exceed the limit on bytes pending writeback specified by `bytes_per_sync` or `wal_bytes_per_sync`.

@ -1004,10 +1004,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats); RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
RecordCompactionIOStats(); RecordCompactionIOStats();
if (status.ok() && if (status.ok() && cfd->IsDropped()) {
(shutting_down_->load(std::memory_order_relaxed) || cfd->IsDropped())) { status =
status = Status::ShutdownInProgress( Status::ColumnFamilyDropped("Column family dropped during compaction");
"Database shutdown or Column family drop during compaction"); }
if ((status.ok() || status.IsColumnFamilyDropped()) &&
shutting_down_->load(std::memory_order_relaxed)) {
status = Status::ShutdownInProgress("Database shutdown");
} }
if (status.ok()) { if (status.ok()) {
status = input->status(); status = input->status();

@ -3890,11 +3890,17 @@ TEST_F(DBCompactionTest, CompactRangeShutdownWhileDelayed) {
} }
Flush(1); Flush(1);
} }
auto manual_compaction_thread = port::Thread([this]() { auto manual_compaction_thread = port::Thread([this, i]() {
CompactRangeOptions cro; CompactRangeOptions cro;
cro.allow_write_stall = false; cro.allow_write_stall = false;
Status s = db_->CompactRange(cro, handles_[1], nullptr, nullptr);
if (i == 0) {
ASSERT_TRUE(db_->CompactRange(cro, handles_[1], nullptr, nullptr)
.IsColumnFamilyDropped());
} else {
ASSERT_TRUE(db_->CompactRange(cro, handles_[1], nullptr, nullptr) ASSERT_TRUE(db_->CompactRange(cro, handles_[1], nullptr, nullptr)
.IsShutdownInProgress()); .IsShutdownInProgress());
}
}); });
TEST_SYNC_POINT( TEST_SYNC_POINT(

@ -431,7 +431,7 @@ TEST_P(DBAtomicFlushTest, FlushMultipleCFs_DropSomeBeforeRequestFlush) {
cf_ids.push_back(cf_id); cf_ids.push_back(cf_id);
} }
ASSERT_OK(dbfull()->DropColumnFamily(handles_[1])); ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
ASSERT_TRUE(Flush(cf_ids).IsShutdownInProgress()); ASSERT_TRUE(Flush(cf_ids).IsColumnFamilyDropped());
Destroy(options); Destroy(options);
} }

@ -201,7 +201,7 @@ Status DBImpl::FlushMemTableToOutputFile(
cfd->current()->storage_info()->LevelSummary(&tmp)); cfd->current()->storage_info()->LevelSummary(&tmp));
} }
if (!s.ok() && !s.IsShutdownInProgress()) { if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
Status new_bg_error = s; Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
} }
@ -254,7 +254,7 @@ Status DBImpl::FlushMemTablesToOutputFiles(
snapshot_checker, log_buffer, thread_pri); snapshot_checker, log_buffer, thread_pri);
if (!s.ok()) { if (!s.ok()) {
status = s; status = s;
if (!s.IsShutdownInProgress()) { if (!s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
// At this point, DB is not shutting down, nor is cfd dropped. // At this point, DB is not shutting down, nor is cfd dropped.
// Something is wrong, thus we break out of the loop. // Something is wrong, thus we break out of the loop.
break; break;
@ -385,7 +385,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
for (const auto& e : exec_status) { for (const auto& e : exec_status) {
if (!e.second.ok()) { if (!e.second.ok()) {
s = e.second; s = e.second;
if (!e.second.IsShutdownInProgress()) { if (!e.second.IsShutdownInProgress() &&
!e.second.IsColumnFamilyDropped()) {
// If a flush job did not return OK, and the CF is not dropped, and // If a flush job did not return OK, and the CF is not dropped, and
// the DB is not shutting down, then we have to return this result to // the DB is not shutting down, then we have to return this result to
// caller later. // caller later.
@ -397,15 +398,11 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
s = error_status.ok() ? s : error_status; s = error_status.ok() ? s : error_status;
} }
// If db is NOT shutting down, and one or more column families have been if (s.IsColumnFamilyDropped()) {
// dropped.
// TODO: use separate status code for db shutdown and column family dropped.
if (s.IsShutdownInProgress() &&
!shutting_down_.load(std::memory_order_acquire)) {
s = Status::OK(); s = Status::OK();
} }
if (s.ok() || s.IsShutdownInProgress()) { if (s.ok() || s.IsShutdownInProgress() || s.IsColumnFamilyDropped()) {
// Sync on all distinct output directories. // Sync on all distinct output directories.
for (auto dir : distinct_output_dirs) { for (auto dir : distinct_output_dirs) {
if (dir != nullptr) { if (dir != nullptr) {
@ -523,7 +520,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
// Need to undo atomic flush if something went wrong, i.e. s is not OK and // Need to undo atomic flush if something went wrong, i.e. s is not OK and
// it is not because of CF drop. // it is not because of CF drop.
if (!s.ok() && !s.IsShutdownInProgress()) { if (!s.ok() && !s.IsColumnFamilyDropped()) {
// Have to cancel the flush jobs that have NOT executed because we need to // Have to cancel the flush jobs that have NOT executed because we need to
// unref the versions. // unref the versions.
for (int i = 0; i != num_cfs; ++i) { for (int i = 0; i != num_cfs; ++i) {
@ -1052,7 +1049,7 @@ Status DBImpl::CompactFilesImpl(
if (status.ok()) { if (status.ok()) {
// Done // Done
} else if (status.IsShutdownInProgress()) { } else if (status.IsColumnFamilyDropped()) {
// Ignore compaction errors found during shutting down // Ignore compaction errors found during shutting down
} else { } else {
ROCKS_LOG_WARN(immutable_db_options_.info_log, ROCKS_LOG_WARN(immutable_db_options_.info_log,
@ -1697,7 +1694,10 @@ Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
cfd->GetName().c_str()); cfd->GetName().c_str());
bg_cv_.Wait(); bg_cv_.Wait();
} }
if (cfd->IsDropped() || shutting_down_.load(std::memory_order_acquire)) { if (cfd->IsDropped()) {
return Status::ColumnFamilyDropped();
}
if (shutting_down_.load(std::memory_order_acquire)) {
return Status::ShutdownInProgress(); return Status::ShutdownInProgress();
} }
@ -2159,7 +2159,7 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer, Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer,
&reason, thread_pri); &reason, thread_pri);
if (!s.ok() && !s.IsShutdownInProgress() && if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped() &&
reason != FlushReason::kErrorRecovery) { reason != FlushReason::kErrorRecovery) {
// Wait a little bit before retrying background flush in // Wait a little bit before retrying background flush in
// case this is an environmental problem and we do not want to // case this is an environmental problem and we do not want to
@ -2184,7 +2184,8 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
// If flush failed, we want to delete all temporary files that we might have // If flush failed, we want to delete all temporary files that we might have
// created. Thus, we force full scan in FindObsoleteFiles() // created. Thus, we force full scan in FindObsoleteFiles()
FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress()); FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
!s.IsColumnFamilyDropped());
// delete unnecessary files if any, this is done outside the mutex // delete unnecessary files if any, this is done outside the mutex
if (job_context.HaveSomethingToClean() || if (job_context.HaveSomethingToClean() ||
job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
@ -2248,7 +2249,8 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
mutex_.Unlock(); mutex_.Unlock();
env_->SleepForMicroseconds(10000); // prevent hot loop env_->SleepForMicroseconds(10000); // prevent hot loop
mutex_.Lock(); mutex_.Lock();
} else if (!s.ok() && !s.IsShutdownInProgress()) { } else if (!s.ok() && !s.IsShutdownInProgress() &&
!s.IsColumnFamilyDropped()) {
// Wait a little bit before retrying background compaction in // Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to // case this is an environmental problem and we do not want to
// chew up resources for failed compactions for the duration of // chew up resources for failed compactions for the duration of
@ -2272,7 +2274,8 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
// If compaction failed, we want to delete all temporary files that we might // If compaction failed, we want to delete all temporary files that we might
// have created (they might not be all recorded in job_context in case of a // have created (they might not be all recorded in job_context in case of a
// failure). Thus, we force full scan in FindObsoleteFiles() // failure). Thus, we force full scan in FindObsoleteFiles()
FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress()); FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
!s.IsColumnFamilyDropped());
TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"); TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles");
// delete unnecessary files if any, this is done outside the mutex // delete unnecessary files if any, this is done outside the mutex
@ -2710,7 +2713,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
if (status.ok() || status.IsCompactionTooLarge()) { if (status.ok() || status.IsCompactionTooLarge()) {
// Done // Done
} else if (status.IsShutdownInProgress()) { } else if (status.IsColumnFamilyDropped()) {
// Ignore compaction errors found during shutting down // Ignore compaction errors found during shutting down
} else { } else {
ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s", ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",

@ -229,10 +229,12 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
// This will release and re-acquire the mutex. // This will release and re-acquire the mutex.
Status s = WriteLevel0Table(); Status s = WriteLevel0Table();
if (s.ok() && if (s.ok() && cfd_->IsDropped()) {
(shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) { s = Status::ColumnFamilyDropped("Column family dropped during compaction");
s = Status::ShutdownInProgress( }
"Database shutdown or Column family drop during flush"); if ((s.ok() || s.IsColumnFamilyDropped()) &&
shutting_down_->load(std::memory_order_acquire)) {
s = Status::ShutdownInProgress("Database shutdown");
} }
if (!s.ok()) { if (!s.ok()) {

@ -598,7 +598,7 @@ Status InstallMemtableAtomicFlushResults(
imm->InstallNewVersion(); imm->InstallNewVersion();
} }
if (s.ok() || s.IsShutdownInProgress()) { if (s.ok() || s.IsColumnFamilyDropped()) {
for (size_t i = 0; i != cfds.size(); ++i) { for (size_t i = 0; i != cfds.size(); ++i) {
if (cfds[i]->IsDropped()) { if (cfds[i]->IsDropped()) {
continue; continue;

@ -3842,8 +3842,6 @@ Status VersionSet::LogAndApply(
} }
} }
if (0 == num_undropped_cfds) { if (0 == num_undropped_cfds) {
// TODO (yanqin) maybe use a different status code to denote column family
// drop other than OK and ShutdownInProgress
for (int i = 0; i != num_cfds; ++i) { for (int i = 0; i != num_cfds; ++i) {
manifest_writers_.pop_front(); manifest_writers_.pop_front();
} }
@ -3851,7 +3849,7 @@ Status VersionSet::LogAndApply(
if (!manifest_writers_.empty()) { if (!manifest_writers_.empty()) {
manifest_writers_.front()->cv.Signal(); manifest_writers_.front()->cv.Signal();
} }
return Status::ShutdownInProgress(); return Status::ColumnFamilyDropped();
} }
return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log, return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log,

@ -58,7 +58,9 @@ class Status {
kBusy = 11, kBusy = 11,
kExpired = 12, kExpired = 12,
kTryAgain = 13, kTryAgain = 13,
kCompactionTooLarge = 14 kCompactionTooLarge = 14,
kColumnFamilyDropped = 15,
kMaxCode
}; };
Code code() const { return code_; } Code code() const { return code_; }
@ -184,6 +186,15 @@ class Status {
return Status(kCompactionTooLarge, msg, msg2); return Status(kCompactionTooLarge, msg, msg2);
} }
static Status ColumnFamilyDropped(SubCode msg = kNone) {
return Status(kColumnFamilyDropped, msg);
}
static Status ColumnFamilyDropped(const Slice& msg,
const Slice& msg2 = Slice()) {
return Status(kColumnFamilyDropped, msg, msg2);
}
static Status NoSpace() { return Status(kIOError, kNoSpace); } static Status NoSpace() { return Status(kIOError, kNoSpace); }
static Status NoSpace(const Slice& msg, const Slice& msg2 = Slice()) { static Status NoSpace(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kIOError, kNoSpace, msg, msg2); return Status(kIOError, kNoSpace, msg, msg2);
@ -256,6 +267,9 @@ class Status {
// Returns true iff the status indicates the proposed compaction is too large // Returns true iff the status indicates the proposed compaction is too large
bool IsCompactionTooLarge() const { return code() == kCompactionTooLarge; } bool IsCompactionTooLarge() const { return code() == kCompactionTooLarge; }
// Returns true iff the status indicates Column Family Dropped
bool IsColumnFamilyDropped() const { return code() == kColumnFamilyDropped; }
// Returns true iff the status indicates a NoSpace error // Returns true iff the status indicates a NoSpace error
// This is caused by an I/O error returning the specific "out of space" // This is caused by an I/O error returning the specific "out of space"
// error condition. Stricto sensu, an NoSpace error is an I/O error // error condition. Stricto sensu, an NoSpace error is an I/O error

@ -467,6 +467,8 @@ class StatusJni : public RocksDBNativeClass<rocksdb::Status*, StatusJni> {
return 0xC; return 0xC;
case rocksdb::Status::Code::kTryAgain: case rocksdb::Status::Code::kTryAgain:
return 0xD; return 0xD;
case rocksdb::Status::Code::kColumnFamilyDropped:
return 0xE;
default: default:
return 0x7F; // undefined return 0x7F; // undefined
} }
@ -584,6 +586,12 @@ class StatusJni : public RocksDBNativeClass<rocksdb::Status*, StatusJni> {
new rocksdb::Status(rocksdb::Status::TryAgain( new rocksdb::Status(rocksdb::Status::TryAgain(
rocksdb::SubCodeJni::toCppSubCode(jsub_code_value)))); rocksdb::SubCodeJni::toCppSubCode(jsub_code_value))));
break; break;
case 0xE:
// ColumnFamilyDropped
status = std::unique_ptr<rocksdb::Status>(
new rocksdb::Status(rocksdb::Status::ColumnFamilyDropped(
rocksdb::SubCodeJni::toCppSubCode(jsub_code_value))));
break;
case 0x7F: case 0x7F:
default: default:
return nullptr; return nullptr;

@ -109,6 +109,9 @@ std::string Status::ToString() const {
case kTryAgain: case kTryAgain:
type = "Operation failed. Try again.: "; type = "Operation failed. Try again.: ";
break; break;
case kColumnFamilyDropped:
type = "Column family dropped: ";
break;
default: default:
snprintf(tmp, sizeof(tmp), "Unknown code(%d): ", snprintf(tmp, sizeof(tmp), "Unknown code(%d): ",
static_cast<int>(code())); static_cast<int>(code()));

Loading…
Cancel
Save