diff --git a/db/column_family.cc b/db/column_family.cc index a3f936e91..4a605f6f7 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -17,9 +17,7 @@ namespace rocksdb { -SuperVersion::SuperVersion(const int num_memtables) { - to_delete.resize(num_memtables); -} +SuperVersion::SuperVersion() {} SuperVersion::~SuperVersion() { for (auto td : to_delete) { @@ -71,7 +69,8 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name, imm_(options.min_write_buffer_number_to_merge), super_version_(nullptr), super_version_number_(0), - log_number_(0) {} + log_number_(0), + need_slowdown_for_num_level0_files_(false) {} ColumnFamilyData::~ColumnFamilyData() { if (super_version_ != nullptr) { @@ -95,6 +94,13 @@ ColumnFamilyData::~ColumnFamilyData() { } } +void ColumnFamilyData::SetCurrent(Version* current) { + current_ = current; + need_slowdown_for_num_level0_files_ = + (options_.level0_slowdown_writes_trigger >= 0 && + current_->NumLevelFiles(0) >= options_.level0_slowdown_writes_trigger); +} + void ColumnFamilyData::CreateNewMemtable() { assert(current_ != nullptr); if (mem_ != nullptr) { diff --git a/db/column_family.h b/db/column_family.h index 6f20daf17..1eb39b487 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -36,7 +36,7 @@ struct SuperVersion { std::vector to_delete; // should be called outside the mutex - explicit SuperVersion(const int num_memtables = 0); + SuperVersion(); ~SuperVersion(); SuperVersion* Ref(); // Returns true if this was the last reference and caller should @@ -72,7 +72,7 @@ class ColumnFamilyData { Version* current() { return current_; } Version* dummy_versions() { return dummy_versions_; } void SetMemtable(MemTable* new_mem) { mem_ = new_mem; } - void SetCurrent(Version* current) { current_ = current; } + void SetCurrent(Version* current); void CreateNewMemtable(); SuperVersion* GetSuperVersion() const { return super_version_; } @@ -85,6 +85,12 @@ class ColumnFamilyData { // the clients to allocate SuperVersion 
outside of mutex. SuperVersion* InstallSuperVersion(SuperVersion* new_superversion); + // A flag indicating whether writes need to slow down because there are + // too many level0 files. + bool NeedSlowdownForNumLevel0Files() const { + return need_slowdown_for_num_level0_files_; + } + private: uint32_t id_; const std::string name_; @@ -105,6 +111,10 @@ class ColumnFamilyData { // Column Family. All earlier log files must be ignored and not // recovered from uint64_t log_number_; + + // A flag indicating whether we should delay writes because + // we have too many level 0 files + bool need_slowdown_for_num_level0_files_; }; // Thread safe only for reading without a writer. All access should be diff --git a/db/db_impl.cc b/db/db_impl.cc index f2c77ce36..13dbaec9d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1298,8 +1298,7 @@ Status DBImpl::ReFitLevel(int level, int target_level) { assert(level < NumberLevels()); SuperVersion* superversion_to_free = nullptr; - SuperVersion* new_superversion = - new SuperVersion(options_.max_write_buffer_number); + SuperVersion* new_superversion = new SuperVersion(); mutex_.Lock(); @@ -2949,6 +2948,13 @@ std::vector DBImpl::MultiGet( return statList; } +// TODO(icanadi) creating column family while writing will cause a data race. +// In write code path, we iterate through column families and call +// MakeRoomForWrite() for each. MakeRoomForWrite() can unlock the mutex +// and wait (delay the write). If we create or drop a column family when +// that mutex is unlocked for delay, that's bad. +// Solution TODO: enable iteration by chaining column families in +// circular linked lists Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, const std::string& column_family_name, ColumnFamilyHandle* handle) { @@ -3106,9 +3112,14 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { RecordTick(options_.statistics.get(), WRITE_DONE_BY_SELF, 1); } - // May temporarily unlock and wait.
- SuperVersion* superversion_to_free = nullptr; - Status status = MakeRoomForWrite(my_batch == nullptr, &superversion_to_free); + Status status; + for (auto cfd : *versions_->GetColumnFamilySet()) { + // May temporarily unlock and wait. + status = MakeRoomForWrite(cfd, my_batch == nullptr); + if (!status.ok()) { + break; + } + } uint64_t last_sequence = versions_->LastSequence(); Writer* last_writer = &w; if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions @@ -3209,7 +3220,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { writers_.front()->cv.Signal(); } mutex_.Unlock(); - delete superversion_to_free; return status; } @@ -3295,8 +3305,7 @@ uint64_t DBImpl::SlowdownAmount(int n, double bottom, double top) { // REQUIRES: mutex_ is held // REQUIRES: this thread is currently at the front of the writer queue -Status DBImpl::MakeRoomForWrite(bool force, - SuperVersion** superversion_to_free) { +Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) { mutex_.AssertHeld(); assert(!writers_.empty()); bool allow_delay = !force; @@ -3305,14 +3314,13 @@ Status DBImpl::MakeRoomForWrite(bool force, uint64_t rate_limit_delay_millis = 0; Status s; double score; - *superversion_to_free = nullptr; while (true) { if (!bg_error_.ok()) { // Yield previous error s = bg_error_; break; - } else if (allow_delay && versions_->NeedSlowdownForNumLevel0Files()) { + } else if (allow_delay && cfd->NeedSlowdownForNumLevel0Files()) { // We are getting close to hitting a hard limit on the number of // L0 files. Rather than delaying a single write by several // seconds when we hit the hard limit, start delaying each @@ -3320,9 +3328,9 @@ Status DBImpl::MakeRoomForWrite(bool force, // this delay hands over some CPU to the compaction thread in // case it is sharing the same core as the writer. 
uint64_t slowdown = - SlowdownAmount(default_cfd_->current()->NumLevelFiles(0), - options_.level0_slowdown_writes_trigger, - options_.level0_stop_writes_trigger); + SlowdownAmount(cfd->current()->NumLevelFiles(0), + cfd->options()->level0_slowdown_writes_trigger, + cfd->options()->level0_stop_writes_trigger); mutex_.Unlock(); uint64_t delayed; { @@ -3335,15 +3343,15 @@ Status DBImpl::MakeRoomForWrite(bool force, allow_delay = false; // Do not delay a single write more than once mutex_.Lock(); delayed_writes_++; - } else if (!force && (default_cfd_->mem()->ApproximateMemoryUsage() <= - options_.write_buffer_size)) { + } else if (!force && (cfd->mem()->ApproximateMemoryUsage() <= + cfd->options()->write_buffer_size)) { // There is room in current memtable if (allow_delay) { DelayLoggingAndReset(); } break; - } else if (default_cfd_->imm()->size() == - options_.max_write_buffer_number - 1) { + } else if (cfd->imm()->size() == + cfd->options()->max_write_buffer_number - 1) { // We have filled up the current memtable, but the previous // ones are still being compacted, so we wait. DelayLoggingAndReset(); @@ -3351,7 +3359,7 @@ Status DBImpl::MakeRoomForWrite(bool force, uint64_t stall; { StopWatch sw(env_, options_.statistics.get(), - STALL_MEMTABLE_COMPACTION_COUNT); + STALL_MEMTABLE_COMPACTION_COUNT); bg_cv_.Wait(); stall = sw.ElapsedMicros(); } @@ -3359,8 +3367,8 @@ Status DBImpl::MakeRoomForWrite(bool force, STALL_MEMTABLE_COMPACTION_MICROS, stall); internal_stats_.RecordWriteStall(InternalStats::MEMTABLE_COMPACTION, stall); - } else if (default_cfd_->current()->NumLevelFiles(0) >= - options_.level0_stop_writes_trigger) { + } else if (cfd->current()->NumLevelFiles(0) >= + cfd->options()->level0_stop_writes_trigger) { // There are too many level-0 files. 
DelayLoggingAndReset(); Log(options_.info_log, "wait for fewer level0 files...\n"); @@ -3374,10 +3382,10 @@ Status DBImpl::MakeRoomForWrite(bool force, RecordTick(options_.statistics.get(), STALL_L0_NUM_FILES_MICROS, stall); internal_stats_.RecordWriteStall(InternalStats::LEVEL0_NUM_FILES, stall); } else if (allow_hard_rate_limit_delay && options_.hard_rate_limit > 1.0 && - (score = default_cfd_->current()->MaxCompactionScore()) > - options_.hard_rate_limit) { + (score = cfd->current()->MaxCompactionScore()) > + cfd->options()->hard_rate_limit) { // Delay a write when the compaction score for any level is too large. - int max_level = default_cfd_->current()->MaxCompactionScoreLevel(); + int max_level = cfd->current()->MaxCompactionScoreLevel(); mutex_.Unlock(); uint64_t delayed; { @@ -3392,26 +3400,25 @@ Status DBImpl::MakeRoomForWrite(bool force, rate_limit_delay_millis += rate_limit; RecordTick(options_.statistics.get(), RATE_LIMIT_DELAY_MILLIS, rate_limit); - if (options_.rate_limit_delay_max_milliseconds > 0 && + if (cfd->options()->rate_limit_delay_max_milliseconds > 0 && rate_limit_delay_millis >= - (unsigned)options_.rate_limit_delay_max_milliseconds) { + (unsigned)cfd->options()->rate_limit_delay_max_milliseconds) { allow_hard_rate_limit_delay = false; } mutex_.Lock(); - } else if (allow_soft_rate_limit_delay && options_.soft_rate_limit > 0.0 && - (score = default_cfd_->current()->MaxCompactionScore()) > - options_.soft_rate_limit) { + } else if (allow_soft_rate_limit_delay && + cfd->options()->soft_rate_limit > 0.0 && + (score = cfd->current()->MaxCompactionScore()) > + cfd->options()->soft_rate_limit) { // Delay a write when the compaction score for any level is too large. 
// TODO: add statistics mutex_.Unlock(); { StopWatch sw(env_, options_.statistics.get(), SOFT_RATE_LIMIT_DELAY_COUNT); - env_->SleepForMicroseconds(SlowdownAmount( - score, - options_.soft_rate_limit, - options_.hard_rate_limit) - ); + env_->SleepForMicroseconds( + SlowdownAmount(score, cfd->options()->soft_rate_limit, + cfd->options()->hard_rate_limit)); rate_limit_delay_millis += sw.ElapsedMicros(); } allow_soft_rate_limit_delay = false; @@ -3436,9 +3443,10 @@ Status DBImpl::MakeRoomForWrite(bool force, if (s.ok()) { // Our final size should be less than write_buffer_size // (compression, etc) but err on the side of caution. - lfile->SetPreallocationBlockSize(1.1 * options_.write_buffer_size); - memtmp = new MemTable(internal_comparator_, options_); - new_superversion = new SuperVersion(options_.max_write_buffer_number); + lfile->SetPreallocationBlockSize(1.1 * + cfd->options()->write_buffer_size); + memtmp = new MemTable(internal_comparator_, *cfd->options()); + new_superversion = new SuperVersion(); } } mutex_.Lock(); @@ -3450,20 +3458,19 @@ Status DBImpl::MakeRoomForWrite(bool force, } logfile_number_ = new_log_number; log_.reset(new log::Writer(std::move(lfile))); - default_cfd_->mem()->SetNextLogNumber(logfile_number_); - default_cfd_->imm()->Add(default_cfd_->mem()); + cfd->mem()->SetNextLogNumber(logfile_number_); + cfd->imm()->Add(cfd->mem()); if (force) { - default_cfd_->imm()->FlushRequested(); + cfd->imm()->FlushRequested(); } memtmp->Ref(); memtmp->SetLogNumber(logfile_number_); - default_cfd_->SetMemtable(memtmp); + cfd->SetMemtable(memtmp); Log(options_.info_log, "New memtable created with log file: #%lu\n", (unsigned long)logfile_number_); force = false; // Do not force another compaction if have room MaybeScheduleFlushOrCompaction(); - *superversion_to_free = - default_cfd_->InstallSuperVersion(new_superversion); + delete cfd->InstallSuperVersion(new_superversion); } } return s; diff --git a/db/db_impl.h b/db/db_impl.h index 68bfa84ea..e144582e7 
100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -201,9 +201,9 @@ class DBImpl : public DB { // a list of memtables to be free std::vector memtables_to_free; - SuperVersion* superversion_to_free; // if nullptr nothing to free + SuperVersion* superversion_to_free; // if nullptr nothing to free - SuperVersion* new_superversion; // if nullptr no new superversion + SuperVersion* new_superversion; // if nullptr no new superversion // the current manifest_file_number, log_number and prev_log_number // that corresponds to the set of files in 'live'. @@ -216,8 +216,7 @@ class DBImpl : public DB { prev_log_number = 0; memtables_to_free.reserve(num_memtables); superversion_to_free = nullptr; - new_superversion = - create_superversion ? new SuperVersion(num_memtables) : nullptr; + new_superversion = create_superversion ? new SuperVersion() : nullptr; } ~DeletionState() { @@ -303,11 +302,8 @@ class DBImpl : public DB { uint64_t* filenumber); uint64_t SlowdownAmount(int n, double bottom, double top); - // MakeRoomForWrite will return superversion_to_free through an arugment, - // which the caller needs to delete. We do it because caller can delete - // the superversion outside of mutex - Status MakeRoomForWrite(bool force /* compact even if there is room? */, - SuperVersion** superversion_to_free); + Status MakeRoomForWrite(ColumnFamilyData* cfd, + bool force /* flush even if there is room? 
*/); void BuildBatchGroup(Writer** last_writer, autovector* write_batch_group); diff --git a/db/version_set.cc b/db/version_set.cc index 63c08f777..fd03e9982 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1377,7 +1377,6 @@ VersionSet::VersionSet(const std::string& dbname, const Options* options, log_number_(0), prev_log_number_(0), num_levels_(options_->num_levels), - need_slowdown_for_num_level0_files_(false), current_version_number_(0), manifest_file_size_(0), storage_options_(storage_options), @@ -1413,9 +1412,6 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, current->Unref(); } column_family_data->SetCurrent(v); - need_slowdown_for_num_level0_files_ = - (options_->level0_slowdown_writes_trigger >= 0 && - v->NumLevelFiles(0) >= options_->level0_slowdown_writes_trigger); v->Ref(); // Append to linked list diff --git a/db/version_set.h b/db/version_set.h index 38f4639d5..8512f0835 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -315,12 +315,6 @@ class VersionSet { const EnvOptions& storage_options, int new_levels); - // A Flag indicating whether write needs to slowdown because of there are - // too many number of level0 files. - bool NeedSlowdownForNumLevel0Files() const { - return need_slowdown_for_num_level0_files_; - } - // Return the current manifest file number uint64_t ManifestFileNumber() const { return manifest_file_number_; } @@ -482,10 +476,6 @@ class VersionSet { // Opened lazily unique_ptr descriptor_log_; - // A flag indicating whether we should delay writes because - // we have too many level 0 files - bool need_slowdown_for_num_level0_files_; - // An object that keeps all the compaction stats // and picks the next compaction std::unique_ptr compaction_picker_;