Inform caller when rocksdb is stalling writes

Summary:
Add a new function in Listener to let the caller know when rocksdb
is stalling writes.
Closes https://github.com/facebook/rocksdb/pull/2897

Differential Revision: D5860124

Pulled By: schischi

fbshipit-source-id: ee791606169aa64f772c86f817cebf02624e05e1
main
Adrien Schildknecht 7 years ago committed by Facebook Github Bot
parent cc20ec3689
commit 01542400a8
  1. 41
      db/column_family.cc
  2. 8
      db/column_family.h
  3. 4
      db/compacted_db_impl.cc
  4. 26
      db/db_impl.cc
  5. 19
      db/db_impl.h
  6. 54
      db/db_impl_compaction_flush.cc
  7. 5
      db/db_impl_experimental.cc
  8. 6
      db/db_impl_open.cc
  9. 5
      db/db_impl_readonly.cc
  10. 7
      db/db_impl_write.cc
  11. 21
      db/db_test.cc
  12. 85
      db/job_context.h
  13. 24
      include/rocksdb/listener.h

@ -613,8 +613,9 @@ int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
}
} // namespace
void ColumnFamilyData::RecalculateWriteStallConditions(
WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options) {
auto write_stall_condition = WriteStallCondition::kNormal;
if (current_ != nullptr) {
auto* vstorage = current_->storage_info();
auto write_controller = column_family_set_->write_controller_;
@ -627,6 +628,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
if (imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
write_stall_condition = WriteStallCondition::kStopped;
ROCKS_LOG_WARN(
ioptions_.info_log,
"[%s] Stopping writes because we have %d immutable memtables "
@ -638,6 +640,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
mutable_cf_options.level0_stop_writes_trigger) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
write_stall_condition = WriteStallCondition::kStopped;
if (compaction_picker_->IsLevel0CompactionInProgress()) {
internal_stats_->AddCFStats(
InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1);
@ -652,6 +655,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(
InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
write_stall_condition = WriteStallCondition::kStopped;
ROCKS_LOG_WARN(
ioptions_.info_log,
"[%s] Stopping writes because of estimated pending compaction "
@ -665,6 +669,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
prev_compaction_needed_bytes_, was_stopped,
mutable_cf_options.disable_auto_compactions);
internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1);
write_stall_condition = WriteStallCondition::kDelayed;
ROCKS_LOG_WARN(
ioptions_.info_log,
"[%s] Stalling writes because we have %d immutable memtables "
@ -686,6 +691,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
mutable_cf_options.disable_auto_compactions);
internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS,
1);
write_stall_condition = WriteStallCondition::kDelayed;
if (compaction_picker_->IsLevel0CompactionInProgress()) {
internal_stats_->AddCFStats(
InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1);
@ -716,6 +722,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
mutable_cf_options.disable_auto_compactions);
internal_stats_->AddCFStats(
InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1);
write_stall_condition = WriteStallCondition::kDelayed;
ROCKS_LOG_WARN(
ioptions_.info_log,
"[%s] Stalling writes because of estimated pending compaction "
@ -770,6 +777,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
}
prev_compaction_needed_bytes_ = compaction_needed_bytes;
}
return write_stall_condition;
}
const EnvOptions* ColumnFamilyData::soptions() const {
@ -915,15 +923,16 @@ bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
return false;
}
SuperVersion* ColumnFamilyData::InstallSuperVersion(
SuperVersion* new_superversion, InstrumentedMutex* db_mutex) {
void ColumnFamilyData::InstallSuperVersion(
SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) {
db_mutex->AssertHeld();
return InstallSuperVersion(new_superversion, db_mutex, mutable_cf_options_);
return InstallSuperVersion(sv_context, db_mutex, mutable_cf_options_);
}
SuperVersion* ColumnFamilyData::InstallSuperVersion(
SuperVersion* new_superversion, InstrumentedMutex* db_mutex,
void ColumnFamilyData::InstallSuperVersion(
SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
const MutableCFOptions& mutable_cf_options) {
SuperVersion* new_superversion = sv_context->new_superversion.release();
new_superversion->db_mutex = db_mutex;
new_superversion->mutable_cf_options = mutable_cf_options;
new_superversion->Init(mem_, imm_.current(), current_);
@ -931,16 +940,24 @@ SuperVersion* ColumnFamilyData::InstallSuperVersion(
super_version_ = new_superversion;
++super_version_number_;
super_version_->version_number = super_version_number_;
// Reset SuperVersions cached in thread local storage
ResetThreadLocalSuperVersions();
super_version_->write_stall_condition =
RecalculateWriteStallConditions(mutable_cf_options);
if (old_superversion != nullptr && old_superversion->Unref()) {
if (old_superversion != nullptr) {
if (old_superversion->write_stall_condition !=
new_superversion->write_stall_condition) {
sv_context->PushWriteStallNotification(
old_superversion->write_stall_condition,
new_superversion->write_stall_condition, GetName(), ioptions());
}
if (old_superversion->Unref()) {
old_superversion->Cleanup();
return old_superversion; // will let caller delete outside of mutex
sv_context->superversions_to_free.push_back(old_superversion);
}
return nullptr;
}
// Reset SuperVersions cached in thread local storage
ResetThreadLocalSuperVersions();
}
void ColumnFamilyData::ResetThreadLocalSuperVersions() {

@ -41,6 +41,7 @@ class DBImpl;
class LogBuffer;
class InstrumentedMutex;
class InstrumentedMutexLock;
struct SuperVersionContext;
extern const double kIncSlowdownRatio;
@ -95,6 +96,7 @@ struct SuperVersion {
MutableCFOptions mutable_cf_options;
// Version number of the current SuperVersion
uint64_t version_number;
WriteStallCondition write_stall_condition;
InstrumentedMutex* db_mutex;
@ -311,10 +313,10 @@ class ColumnFamilyData {
// As argument takes a pointer to allocated SuperVersion to enable
// the clients to allocate SuperVersion outside of mutex.
// IMPORTANT: Only call this from DBImpl::InstallSuperVersion()
SuperVersion* InstallSuperVersion(SuperVersion* new_superversion,
void InstallSuperVersion(SuperVersionContext* sv_context,
InstrumentedMutex* db_mutex,
const MutableCFOptions& mutable_cf_options);
SuperVersion* InstallSuperVersion(SuperVersion* new_superversion,
void InstallSuperVersion(SuperVersionContext* sv_context,
InstrumentedMutex* db_mutex);
void ResetThreadLocalSuperVersions();
@ -330,7 +332,7 @@ class ColumnFamilyData {
// recalculation of compaction score. These values are used in
// DBImpl::MakeRoomForWrite function to decide, if it need to make
// a write stall
void RecalculateWriteStallConditions(
WriteStallCondition RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options);
void set_initialized() { initialized_.store(true); }

@ -93,6 +93,7 @@ std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
}
Status CompactedDBImpl::Init(const Options& options) {
SuperVersionContext sv_context(/* create_superversion */ true);
mutex_.Lock();
ColumnFamilyDescriptor cf(kDefaultColumnFamilyName,
ColumnFamilyOptions(options));
@ -100,9 +101,10 @@ Status CompactedDBImpl::Init(const Options& options) {
if (s.ok()) {
cfd_ = reinterpret_cast<ColumnFamilyHandleImpl*>(
DefaultColumnFamily())->cfd();
delete cfd_->InstallSuperVersion(new SuperVersion(), &mutex_);
cfd_->InstallSuperVersion(&sv_context, &mutex_);
}
mutex_.Unlock();
sv_context.Clean();
if (!s.ok()) {
return s;
}

@ -484,6 +484,7 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
Status s;
Status persist_options_status;
WriteThread::Writer w;
SuperVersionContext sv_context(/* create_superversion */ true);
{
InstrumentedMutexLock l(&mutex_);
s = cfd->SetOptions(options_map);
@ -496,14 +497,13 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
// Trigger possible flush/compactions. This has to be before we persist
// options to file, otherwise there will be a deadlock with writer
// thread.
auto* old_sv =
InstallSuperVersionAndScheduleWork(cfd, nullptr, new_options);
delete old_sv;
InstallSuperVersionAndScheduleWork(cfd, &sv_context, new_options);
persist_options_status = WriteOptionsFile(
false /*need_mutex_lock*/, true /*need_enter_write_thread*/);
}
}
sv_context.Clean();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"SetOptions() on column family [%s], inputs:",
@ -1229,6 +1229,7 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
return s;
}
SuperVersionContext sv_context(/* create_superversion */ true);
{
InstrumentedMutexLock l(&mutex_);
@ -1260,8 +1261,8 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
auto* cfd =
versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
assert(cfd != nullptr);
delete InstallSuperVersionAndScheduleWork(
cfd, nullptr, *cfd->GetLatestMutableCFOptions());
InstallSuperVersionAndScheduleWork(
cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
if (!cfd->mem()->IsSnapshotSupported()) {
is_snapshot_supported_ = false;
@ -1280,6 +1281,7 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
}
} // InstrumentedMutexLock l(&mutex_)
sv_context.Clean();
// this is outside the mutex
if (s.ok()) {
NewThreadStatusCfInfo(
@ -2035,8 +2037,9 @@ Status DBImpl::DeleteFile(std::string name) {
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, &mutex_, directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWorkWrapper(
cfd, &job_context, *cfd->GetLatestMutableCFOptions());
InstallSuperVersionAndScheduleWork(
cfd, &job_context.superversion_context,
*cfd->GetLatestMutableCFOptions());
}
FindObsoleteFiles(&job_context, false);
} // lock released here
@ -2113,8 +2116,9 @@ Status DBImpl::DeleteFilesInRange(ColumnFamilyHandle* column_family,
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, &mutex_, directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWorkWrapper(
cfd, &job_context, *cfd->GetLatestMutableCFOptions());
InstallSuperVersionAndScheduleWork(
cfd, &job_context.superversion_context,
*cfd->GetLatestMutableCFOptions());
}
for (auto* deleted_file : deleted_files) {
deleted_file->being_compacted = false;
@ -2678,6 +2682,7 @@ Status DBImpl::IngestExternalFile(
return status;
}
SuperVersionContext sv_context(/* create_superversion */ true);
TEST_SYNC_POINT("DBImpl::AddFile:Start");
{
// Lock db mutex
@ -2726,7 +2731,7 @@ Status DBImpl::IngestExternalFile(
&mutex_, directories_.GetDbDir());
}
if (status.ok()) {
delete InstallSuperVersionAndScheduleWork(cfd, nullptr,
InstallSuperVersionAndScheduleWork(cfd, &sv_context,
*mutable_cf_options);
}
@ -2753,6 +2758,7 @@ Status DBImpl::IngestExternalFile(
// mutex_ is unlocked here
// Cleanup
sv_context.Clean();
ingestion_job.Cleanup(status);
if (status.ok()) {

@ -666,13 +666,14 @@ class DBImpl : public DB {
struct CompactionState;
struct WriteContext {
autovector<SuperVersion*> superversions_to_free_;
SuperVersionContext superversion_context;
autovector<MemTable*> memtables_to_free_;
explicit WriteContext(bool create_superversion = false)
: superversion_context(create_superversion) {}
~WriteContext() {
for (auto& sv : superversions_to_free_) {
delete sv;
}
superversion_context.Clean();
for (auto& m : memtables_to_free_) {
delete m;
}
@ -1236,17 +1237,13 @@ class DBImpl : public DB {
// Background threads call this function, which is just a wrapper around
// the InstallSuperVersion() function. Background threads carry
// job_context which can have new_superversion already
// sv_context which can have new_superversion already
// allocated.
void InstallSuperVersionAndScheduleWorkWrapper(
ColumnFamilyData* cfd, JobContext* job_context,
const MutableCFOptions& mutable_cf_options);
// All ColumnFamily state changes go through this function. Here we analyze
// the new state and we schedule background work if we detect that the new
// state needs flush or compaction.
SuperVersion* InstallSuperVersionAndScheduleWork(
ColumnFamilyData* cfd, SuperVersion* new_sv,
void InstallSuperVersionAndScheduleWork(
ColumnFamilyData* cfd, SuperVersionContext* sv_context,
const MutableCFOptions& mutable_cf_options);
#ifndef ROCKSDB_LITE

@ -130,7 +130,7 @@ Status DBImpl::FlushMemTableToOutputFile(
}
if (s.ok()) {
InstallSuperVersionAndScheduleWorkWrapper(cfd, job_context,
InstallSuperVersionAndScheduleWork(cfd, &job_context->superversion_context,
mutable_cf_options);
if (made_progress) {
*made_progress = 1;
@ -573,8 +573,9 @@ Status DBImpl::CompactFilesImpl(
Status status = compaction_job.Install(*c->mutable_cf_options());
if (status.ok()) {
InstallSuperVersionAndScheduleWorkWrapper(
c->column_family_data(), job_context, *c->mutable_cf_options());
InstallSuperVersionAndScheduleWork(
c->column_family_data(), &job_context->superversion_context,
*c->mutable_cf_options());
}
c->ReleaseCompactionFiles(s);
@ -707,8 +708,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
return Status::InvalidArgument("Target level exceeds number of levels");
}
std::unique_ptr<SuperVersion> superversion_to_free;
std::unique_ptr<SuperVersion> new_superversion(new SuperVersion());
SuperVersionContext sv_context(/* create_superversion */ true);
Status status;
@ -763,8 +763,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
directories_.GetDbDir());
superversion_to_free.reset(InstallSuperVersionAndScheduleWork(
cfd, new_superversion.release(), mutable_cf_options));
InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options);
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n",
cfd->GetName().c_str(), status.ToString().data());
@ -776,6 +775,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
}
}
sv_context.Clean();
refitting_level_ = false;
return status;
@ -1576,8 +1576,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir());
InstallSuperVersionAndScheduleWorkWrapper(
c->column_family_data(), job_context, *c->mutable_cf_options());
InstallSuperVersionAndScheduleWork(
c->column_family_data(), &job_context->superversion_context,
*c->mutable_cf_options());
ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n",
c->column_family_data()->GetName().c_str(),
c->num_input_files(0));
@ -1622,8 +1623,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
*c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir());
// Use latest MutableCFOptions
InstallSuperVersionAndScheduleWorkWrapper(
c->column_family_data(), job_context, *c->mutable_cf_options());
InstallSuperVersionAndScheduleWork(
c->column_family_data(), &job_context->superversion_context,
*c->mutable_cf_options());
VersionStorageInfo::LevelSummaryStorage tmp;
c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(),
@ -1696,8 +1698,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
status = compaction_job.Install(*c->mutable_cf_options());
if (status.ok()) {
InstallSuperVersionAndScheduleWorkWrapper(
c->column_family_data(), job_context, *c->mutable_cf_options());
InstallSuperVersionAndScheduleWork(
c->column_family_data(), &job_context->superversion_context,
*c->mutable_cf_options());
}
*made_progress = true;
}
@ -1863,30 +1866,21 @@ bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
return true;
}
// JobContext gets created and destructed outside of the lock --
// SuperVersionContext gets created and destructed outside of the lock --
// we
// use this convinently to:
// * malloc one SuperVersion() outside of the lock -- new_superversion
// * delete SuperVersion()s outside of the lock -- superversions_to_free
//
// However, if InstallSuperVersionAndScheduleWork() gets called twice with the
// same job_context, we can't reuse the SuperVersion() that got
// same sv_context, we can't reuse the SuperVersion() that got
// malloced because
// first call already used it. In that rare case, we take a hit and create a
// new SuperVersion() inside of the mutex. We do similar thing
// for superversion_to_free
void DBImpl::InstallSuperVersionAndScheduleWorkWrapper(
ColumnFamilyData* cfd, JobContext* job_context,
const MutableCFOptions& mutable_cf_options) {
mutex_.AssertHeld();
SuperVersion* old_superversion = InstallSuperVersionAndScheduleWork(
cfd, job_context->new_superversion, mutable_cf_options);
job_context->new_superversion = nullptr;
job_context->superversions_to_free.push_back(old_superversion);
}
SuperVersion* DBImpl::InstallSuperVersionAndScheduleWork(
ColumnFamilyData* cfd, SuperVersion* new_sv,
void DBImpl::InstallSuperVersionAndScheduleWork(
ColumnFamilyData* cfd, SuperVersionContext* sv_context,
const MutableCFOptions& mutable_cf_options) {
mutex_.AssertHeld();
@ -1898,8 +1892,11 @@ SuperVersion* DBImpl::InstallSuperVersionAndScheduleWork(
old_sv->mutable_cf_options.max_write_buffer_number;
}
auto* old = cfd->InstallSuperVersion(
new_sv ? new_sv : new SuperVersion(), &mutex_, mutable_cf_options);
if (sv_context->new_superversion == nullptr) {
sv_context->NewSuperVersion();
}
cfd->InstallSuperVersion(sv_context, &mutex_,
mutable_cf_options);
// Whenever we install new SuperVersion, we might need to issue new flushes or
// compactions.
@ -1912,6 +1909,5 @@ SuperVersion* DBImpl::InstallSuperVersionAndScheduleWork(
max_total_in_memory_state_ - old_memtable_size +
mutable_cf_options.write_buffer_size *
mutable_cf_options.max_write_buffer_number;
return old;
}
} // namespace rocksdb

@ -138,8 +138,9 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, &mutex_, directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWorkWrapper(
cfd, &job_context, *cfd->GetLatestMutableCFOptions());
InstallSuperVersionAndScheduleWork(
cfd, &job_context.superversion_context,
*cfd->GetLatestMutableCFOptions());
}
} // lock released here
LogFlush(immutable_db_options_.info_log);

@ -1046,10 +1046,12 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
}
}
if (s.ok()) {
SuperVersionContext sv_context(/* create_superversion */ true);
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
delete impl->InstallSuperVersionAndScheduleWork(
cfd, nullptr, *cfd->GetLatestMutableCFOptions());
impl->InstallSuperVersionAndScheduleWork(
cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
}
sv_context.Clean();
if (impl->concurrent_prepare_) {
impl->log_write_mutex_.Lock();
}

@ -140,6 +140,7 @@ Status DB::OpenForReadOnly(
*dbptr = nullptr;
handles->clear();
SuperVersionContext sv_context(/* create_superversion */ true);
DBImplReadOnly* impl = new DBImplReadOnly(db_options, dbname);
impl->mutex_.Lock();
Status s = impl->Recover(column_families, true /* read only */,
@ -158,10 +159,12 @@ Status DB::OpenForReadOnly(
}
if (s.ok()) {
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
delete cfd->InstallSuperVersion(new SuperVersion(), &impl->mutex_);
sv_context.NewSuperVersion();
cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
}
}
impl->mutex_.Unlock();
sv_context.Clean();
if (s.ok()) {
*dbptr = impl;
for (auto* h : *handles) {

@ -1090,7 +1090,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
}
uint64_t new_log_number =
creating_new_log ? versions_->NewFileNumber() : logfile_number_;
SuperVersion* new_superversion = nullptr;
const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
// Set memtable_info for memtable sealed callback
@ -1146,7 +1145,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
if (s.ok()) {
SequenceNumber seq = versions_->LastSequence();
new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
new_superversion = new SuperVersion();
context->superversion_context.NewSuperVersion();
}
#ifndef ROCKSDB_LITE
@ -1204,8 +1203,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
new_mem->Ref();
cfd->SetMemtable(new_mem);
context->superversions_to_free_.push_back(InstallSuperVersionAndScheduleWork(
cfd, new_superversion, mutable_cf_options));
InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
mutable_cf_options);
if (concurrent_prepare_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}

@ -5160,6 +5160,19 @@ TEST_F(DBTest, HardLimit) {
}
#ifndef ROCKSDB_LITE
class WriteStallListener : public EventListener {
public:
WriteStallListener() : condition_(WriteStallCondition::kNormal) {}
void OnStallConditionsChanged(const WriteStallInfo& info) override {
condition_ = info.condition.cur;
}
bool CheckCondition(WriteStallCondition expected) {
return expected == condition_;
}
private:
WriteStallCondition condition_;
};
TEST_F(DBTest, SoftLimit) {
Options options = CurrentOptions();
options.env = env_;
@ -5175,6 +5188,8 @@ TEST_F(DBTest, SoftLimit) {
options.max_bytes_for_level_multiplier = 10;
options.max_background_compactions = 1;
options.compression = kNoCompression;
WriteStallListener* listener = new WriteStallListener();
options.listeners.emplace_back(listener);
Reopen(options);
@ -5214,6 +5229,7 @@ TEST_F(DBTest, SoftLimit) {
Flush();
}
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed));
sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilDone();
@ -5224,6 +5240,7 @@ TEST_F(DBTest, SoftLimit) {
// The L1 file size is around 30KB.
ASSERT_EQ(NumTableFilesAtLevel(1), 1);
ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kNormal));
// Only allow one compactin going through.
rocksdb::SyncPoint::GetInstance()->SetCallBack(
@ -5258,6 +5275,7 @@ TEST_F(DBTest, SoftLimit) {
// doesn't trigger soft_pending_compaction_bytes_limit
ASSERT_EQ(NumTableFilesAtLevel(1), 1);
ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kNormal));
// Create 3 L0 files, making score of L0 to be 3, higher than L0.
for (int i = 0; i < 3; i++) {
@ -5278,11 +5296,13 @@ TEST_F(DBTest, SoftLimit) {
// triggerring soft_pending_compaction_bytes_limit
ASSERT_EQ(NumTableFilesAtLevel(1), 1);
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed));
sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilSleeping();
ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kNormal));
// shrink level base so L2 will hit soft limit easier.
ASSERT_OK(dbfull()->SetOptions({
@ -5292,6 +5312,7 @@ TEST_F(DBTest, SoftLimit) {
Put("", "");
Flush();
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed));
sleeping_task_low.WaitUntilSleeping();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();

@ -13,17 +13,77 @@
#include <vector>
#include "db/log_writer.h"
#include "db/column_family.h"
namespace rocksdb {
class MemTable;
struct SuperVersion;
struct SuperVersionContext {
struct WriteStallNotification {
WriteStallInfo write_stall_info;
const ImmutableCFOptions* immutable_cf_options;
};
autovector<SuperVersion*> superversions_to_free;
autovector<WriteStallNotification> write_stall_notifications;
unique_ptr<SuperVersion> new_superversion; // if nullptr no new superversion
explicit SuperVersionContext(bool create_superversion = false)
: new_superversion(create_superversion ? new SuperVersion() : nullptr) {}
void NewSuperVersion() {
new_superversion = unique_ptr<SuperVersion>(new SuperVersion());
}
inline bool HaveSomethingToDelete() const {
return superversions_to_free.size() > 0 ||
write_stall_notifications.size() > 0;
}
void PushWriteStallNotification(
WriteStallCondition old_cond, WriteStallCondition new_cond,
const std::string& name, const ImmutableCFOptions* ioptions) {
#ifndef ROCKSDB_LITE
WriteStallNotification notif;
notif.write_stall_info.cf_name = name;
notif.write_stall_info.condition.prev = old_cond;
notif.write_stall_info.condition.cur = new_cond;
notif.immutable_cf_options = ioptions;
write_stall_notifications.push_back(notif);
#endif // !ROCKSDB_LITE
}
void Clean() {
#ifndef ROCKSDB_LITE
// notify listeners on changed write stall conditions
for (auto& notif : write_stall_notifications) {
for (auto listener : notif.immutable_cf_options->listeners) {
listener->OnStallConditionsChanged(notif.write_stall_info);
}
}
write_stall_notifications.clear();
#endif // !ROCKSDB_LITE
// free superversions
for (auto s : superversions_to_free) {
delete s;
}
superversions_to_free.clear();
}
~SuperVersionContext() {
assert(write_stall_notifications.size() == 0);
assert(superversions_to_free.size() == 0);
}
};
struct JobContext {
inline bool HaveSomethingToDelete() const {
return full_scan_candidate_files.size() || sst_delete_files.size() ||
log_delete_files.size() || manifest_delete_files.size() ||
new_superversion != nullptr || superversions_to_free.size() > 0 ||
memtables_to_free.size() > 0 || logs_to_free.size() > 0;
memtables_to_free.size() > 0 || logs_to_free.size() > 0 ||
superversion_context.HaveSomethingToDelete();
}
// Structure to store information for candidate files to delete.
@ -65,12 +125,10 @@ struct JobContext {
// a list of memtables to be free
autovector<MemTable*> memtables_to_free;
autovector<SuperVersion*> superversions_to_free;
SuperVersionContext superversion_context;
autovector<log::Writer*> logs_to_free;
SuperVersion* new_superversion; // if nullptr no new superversion
// the current manifest_file_number, log_number and prev_log_number
// that corresponds to the set of files in 'live'.
uint64_t manifest_file_number;
@ -83,13 +141,13 @@ struct JobContext {
size_t num_alive_log_files = 0;
uint64_t size_log_to_delete = 0;
explicit JobContext(int _job_id, bool create_superversion = false) {
explicit JobContext(int _job_id, bool create_superversion = false)
: superversion_context(create_superversion) {
job_id = _job_id;
manifest_file_number = 0;
pending_manifest_file_number = 0;
log_number = 0;
prev_log_number = 0;
new_superversion = create_superversion ? new SuperVersion() : nullptr;
}
// For non-empty JobContext Clean() has to be called at least once before
@ -97,31 +155,22 @@ struct JobContext {
// unlocked DB mutex. Destructor doesn't call Clean() to avoid accidentally
// doing potentially slow Clean() with locked DB mutex.
void Clean() {
// free superversions
superversion_context.Clean();
// free pending memtables
for (auto m : memtables_to_free) {
delete m;
}
// free superversions
for (auto s : superversions_to_free) {
delete s;
}
for (auto l : logs_to_free) {
delete l;
}
// if new_superversion was not used, it will be non-nullptr and needs
// to be freed here
delete new_superversion;
memtables_to_free.clear();
superversions_to_free.clear();
logs_to_free.clear();
new_superversion = nullptr;
}
~JobContext() {
assert(memtables_to_free.size() == 0);
assert(superversions_to_free.size() == 0);
assert(new_superversion == nullptr);
assert(logs_to_free.size() == 0);
}
};

@ -86,6 +86,22 @@ enum class BackgroundErrorReason {
kMemTable,
};
enum class WriteStallCondition {
kNormal,
kDelayed,
kStopped,
};
struct WriteStallInfo {
// the name of the column family
std::string cf_name;
// state of the write controller
struct {
WriteStallCondition cur;
WriteStallCondition prev;
} condition;
};
#ifndef ROCKSDB_LITE
struct TableFileDeletionInfo {
@ -372,6 +388,14 @@ class EventListener {
virtual void OnBackgroundError(BackgroundErrorReason /* reason */,
Status* /* bg_error */) {}
// A call-back function for RocksDB which will be called whenever a change
// of superversion triggers a change of the stall conditions.
//
// Note that the this function must be implemented in a way such that
// it should not run for an extended period of time before the function
// returns. Otherwise, RocksDB may be blocked.
virtual void OnStallConditionsChanged(const WriteStallInfo& /*info*/) {}
// Factory method to return CompactionEventListener. If multiple listeners
// provides CompactionEventListner, only the first one will be used.
virtual CompactionEventListener* GetCompactionEventListener() {

Loading…
Cancel
Save