diff --git a/HISTORY.md b/HISTORY.md index b35ef0127..bd7bf579f 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -20,6 +20,7 @@ * Added new option -- verify_checksums_in_compaction * Chagned Options.prefix_extractor from raw pointer to shared_ptr (take ownership) Changed HashSkipListRepFactory and HashLinkListRepFactory constructor to not take SliceTransform object (use Options.prefix_extractor implicitly) +* Added Env::GetThreadPoolQueueLen(), which returns the waiting queue length of thread pools ### New Features * If we find one truncated record at the end of the MANIFEST or WAL files, diff --git a/db/db_impl.cc b/db/db_impl.cc index 437496cd0..d91466b88 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -226,6 +226,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) logfile_number_(0), default_cf_handle_(nullptr), tmp_batch_(), + bg_schedule_needed_(false), bg_compaction_scheduled_(0), bg_manual_only_(0), bg_flush_scheduled_(0), @@ -1732,6 +1733,7 @@ Status DBImpl::TEST_WaitForCompact() { void DBImpl::MaybeScheduleFlushOrCompaction() { mutex_.AssertHeld(); + bg_schedule_needed_ = false; if (bg_work_gate_closed_) { // gate closed for backgrond work } else if (shutting_down_.Acquire_Load()) { @@ -1752,6 +1754,8 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { if (bg_flush_scheduled_ < options_.max_background_flushes) { bg_flush_scheduled_++; env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH); + } else { + bg_schedule_needed_ = true; } } bool is_compaction_needed = false; @@ -1767,11 +1771,13 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { // Do it only if max_background_compactions hasn't been reached and, in case // bg_manual_only_ > 0, if it's a manual compaction. 
if ((manual_compaction_ || is_compaction_needed) && - bg_compaction_scheduled_ < options_.max_background_compactions && (!bg_manual_only_ || manual_compaction_)) { - - bg_compaction_scheduled_++; - env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW); + if (bg_compaction_scheduled_ < options_.max_background_compactions) { + bg_compaction_scheduled_++; + env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW); + } else { + bg_schedule_needed_ = true; + } } } } @@ -1850,20 +1856,34 @@ void DBImpl::BackgroundCallFlush() { // to delete all obsolete files and we force FindObsoleteFiles() FindObsoleteFiles(deletion_state, !s.ok()); // delete unnecessary files if any, this is done outside the mutex - if (deletion_state.HaveSomethingToDelete()) { + if (deletion_state.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { mutex_.Unlock(); + // Have to flush the info logs before bg_flush_scheduled_-- + // because if bg_flush_scheduled_ becomes 0 and the lock is + // released, the destructor of DB can kick in and destroy all the + // states of DB so info_log might not be available after that point. + // The same applies to accessing other states that DB owns. log_buffer.FlushBufferToLog(); - PurgeObsoleteFiles(deletion_state); + if (deletion_state.HaveSomethingToDelete()) { + PurgeObsoleteFiles(deletion_state); + } mutex_.Lock(); } bg_flush_scheduled_--; - if (madeProgress) { + // Any time the mutex is released after finding the work to do, another + // thread might execute MaybeScheduleFlushOrCompaction(). It is possible + // that there is a pending job but it is not scheduled because of the + // max thread limit. + if (madeProgress || bg_schedule_needed_) { MaybeScheduleFlushOrCompaction(); } bg_cv_.SignalAll(); + // IMPORTANT: there should be no code after calling SignalAll. This call may + // signal the DB destructor that it's OK to proceed with destruction.
In + // that case, all DB variables will be deallocated and referencing them + // will cause trouble. } - log_buffer.FlushBufferToLog(); } @@ -1913,10 +1933,17 @@ void DBImpl::BackgroundCallCompaction() { FindObsoleteFiles(deletion_state, !s.ok()); // delete unnecessary files if any, this is done outside the mutex - if (deletion_state.HaveSomethingToDelete()) { + if (deletion_state.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { mutex_.Unlock(); + // Have to flush the info logs before bg_compaction_scheduled_-- + // because if bg_compaction_scheduled_ becomes 0 and the lock is + // released, the destructor of DB can kick in and destroy all the + // states of DB so info_log might not be available after that point. + // The same applies to accessing other states that DB owns. log_buffer.FlushBufferToLog(); - PurgeObsoleteFiles(deletion_state); + if (deletion_state.HaveSomethingToDelete()) { + PurgeObsoleteFiles(deletion_state); + } mutex_.Lock(); } @@ -1927,12 +1954,20 @@ void DBImpl::BackgroundCallCompaction() { // Previous compaction may have produced too many files in a level, // So reschedule another compaction if we made progress in the // last compaction. - if (madeProgress) { + // + // Also, any time the mutex is released after finding the work to do, + // another thread might execute MaybeScheduleFlushOrCompaction(). It is + // possible that there is a pending job but it is not scheduled because of + // the max thread limit. + if (madeProgress || bg_schedule_needed_) { MaybeScheduleFlushOrCompaction(); } bg_cv_.SignalAll(); + // IMPORTANT: there should be no code after calling SignalAll. This call may + // signal the DB destructor that it's OK to proceed with destruction. In + // that case, all DB variables will be deallocated and referencing them + // will cause trouble.
} - log_buffer.FlushBufferToLog(); } Status DBImpl::BackgroundCompaction(bool* madeProgress, diff --git a/db/db_impl.h b/db/db_impl.h index 60836bac0..daa467010 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -410,6 +410,10 @@ class DBImpl : public DB { // part of ongoing compactions. std::set pending_outputs_; + // At least one compaction or flush job is pending but not yet scheduled + // because of the max background thread limit. + bool bg_schedule_needed_; + // count how many background compactions are running or have been scheduled int bg_compaction_scheduled_; diff --git a/hdfs/env_hdfs.h b/hdfs/env_hdfs.h index 17d8fcb2b..303cd81cf 100644 --- a/hdfs/env_hdfs.h +++ b/hdfs/env_hdfs.h @@ -110,6 +110,11 @@ class HdfsEnv : public Env { virtual void WaitForJoin() { posixEnv->WaitForJoin(); } + virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const + override { + return posixEnv->GetThreadPoolQueueLen(pri); + } + virtual Status GetTestDirectory(std::string* path) { return posixEnv->GetTestDirectory(path); } @@ -292,6 +297,10 @@ class HdfsEnv : public Env { virtual void WaitForJoin() {} + virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const { + return 0; + } + virtual Status GetTestDirectory(std::string* path) {return notsup;} virtual uint64_t NowMicros() {return 0;} diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 4d983619e..e2df8d93f 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -210,6 +210,11 @@ class Env { // Wait for all threads started by StartThread to terminate. virtual void WaitForJoin() = 0; + // Get thread pool queue length for a specific thread pool. + virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const { + return 0; + } + // *path is set to a temporary directory that can be used for testing. It may // or many not have just been created.
The directory may or may not differ // between runs of the same process, but subsequent calls will return the @@ -702,6 +707,9 @@ class EnvWrapper : public Env { return target_->StartThread(f, a); } void WaitForJoin() { return target_->WaitForJoin(); } + virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const { + return target_->GetThreadPoolQueueLen(pri); + } virtual Status GetTestDirectory(std::string* path) { return target_->GetTestDirectory(path); } diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index b3d08f888..16603cfd6 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -93,6 +93,8 @@ def main(argv): --max_background_compactions=20 --max_bytes_for_level_base=10485760 --filter_deletes=%s + --memtablerep=prefix_hash + --prefix_size=7 """ % (ops_per_thread, threads, write_buf_size, @@ -108,16 +110,23 @@ def main(argv): print("Running db_stress with pid=%d: %s\n\n" % (child.pid, cmd)) + stop_early = False while time.time() < killtime: - time.sleep(10) + if child.poll() is not None: + print("WARNING: db_stress ended before kill: exitcode=%d\n" + % child.returncode) + stop_early = True + break + time.sleep(1) - if child.poll() is not None: - print("WARNING: db_stress ended before kill: exitcode=%d\n" - % child.returncode) - else: - child.kill() - print("KILLED %d\n" % child.pid) - time.sleep(1) # time to stabilize after a kill + if not stop_early: + if child.poll() is not None: + print("WARNING: db_stress ended before kill: exitcode=%d\n" + % child.returncode) + else: + child.kill() + print("KILLED %d\n" % child.pid) + time.sleep(1) # time to stabilize after a kill while True: line = child.stderr.readline().strip() diff --git a/tools/db_crashtest2.py b/tools/db_crashtest2.py index 8b7ce969d..6a28a0ba4 100644 --- a/tools/db_crashtest2.py +++ b/tools/db_crashtest2.py @@ -107,6 +107,8 @@ def main(argv): --max_background_compactions=20 --max_bytes_for_level_base=10485760 --filter_deletes=%s + --memtablerep=prefix_hash + 
--prefix_size=7 %s """ % (random.randint(0, 1), threads, diff --git a/tools/db_stress.cc b/tools/db_stress.cc index db4cec480..e6af188d5 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -334,19 +334,19 @@ enum RepFactory StringToRepFactory(const char* ctype) { return kSkipList; } static enum RepFactory FLAGS_rep_factory; -DEFINE_string(memtablerep, "skip_list", ""); +DEFINE_string(memtablerep, "prefix_hash", ""); static bool ValidatePrefixSize(const char* flagname, int32_t value) { - if (value < 0 || value>=2000000000) { - fprintf(stderr, "Invalid value for --%s: %d. 0<= PrefixSize <=2000000000\n", + if (value < 0 || value > 8) { + fprintf(stderr, "Invalid value for --%s: %d. 0 <= PrefixSize <= 8\n", flagname, value); return false; } return true; } -DEFINE_int32(prefix_size, 0, "Control the prefix size for HashSkipListRep"); -static const bool FLAGS_prefix_size_dummy __attribute__((unused)) = - google::RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize); +DEFINE_int32(prefix_size, 7, "Control the prefix size for HashSkipListRep"); +static const bool FLAGS_prefix_size_dummy = + google::RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize); DEFINE_bool(use_merge, false, "On true, replaces all writes with a Merge " "that behaves like a Put"); @@ -951,15 +951,15 @@ class StressTest { return s; } - // Given a prefix P, this does prefix scans for "0"+P, "1"+P,..."9"+P - // in the same snapshot. Each of these 10 scans returns a series of - // values; each series should be the same length, and it is verified - // for each index i that all the i'th values are of the form "0"+V, - // "1"+V,..."9"+V. + // Given a key, this does prefix scans for "0"+P, "1"+P,..."9"+P + // in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes + // of the key. 
Each of these 10 scans returns a series of values; + // each series should be the same length, and it is verified for each + // index i that all the i'th values are of the form "0"+V, "1"+V,..."9"+V. // ASSUMES that MultiPut was used to put (K, V) Status MultiPrefixScan(ThreadState* thread, const ReadOptions& readoptions, ColumnFamilyHandle* column_family, - const Slice& prefix) { + const Slice& key) { std::string prefixes[10] = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}; Slice prefix_slices[10]; @@ -968,8 +968,9 @@ class StressTest { Iterator* iters[10]; Status s = Status::OK(); for (int i = 0; i < 10; i++) { - prefixes[i] += prefix.ToString(); - prefix_slices[i] = prefixes[i]; + prefixes[i] += key.ToString(); + prefixes[i].resize(FLAGS_prefix_size); + prefix_slices[i] = Slice(prefixes[i]); readoptionscopy[i] = readoptions; readoptionscopy[i].prefix = &prefix_slices[i]; readoptionscopy[i].snapshot = snapshot; @@ -1000,7 +1001,7 @@ class StressTest { for (int i = 0; i < 10; i++) { if (values[i] != values[0]) { fprintf(stderr, "error : inconsistent values for prefix %s: %s, %s\n", - prefix.ToString().c_str(), values[0].c_str(), + prefixes[i].c_str(), values[0].c_str(), values[i].c_str()); // we continue after error rather than exiting so that we can // find more errors if any @@ -1035,6 +1036,7 @@ class StressTest { const Snapshot* snapshot = db_->GetSnapshot(); ReadOptions readoptionscopy = readoptions; readoptionscopy.snapshot = snapshot; + readoptionscopy.prefix_seek = FLAGS_prefix_size > 0; unique_ptr iter(db_->NewIterator(readoptionscopy, column_family)); iter->Seek(key); @@ -1149,19 +1151,21 @@ class StressTest { } } else if ((int)FLAGS_readpercent <= prob_op && prob_op < prefixBound) { // OPERATION prefix scan - // keys are longs (e.g., 8 bytes), so we let prefixes be - // everything except the last byte. So there will be 2^8=256 - // keys per prefix. 
- Slice prefix = Slice(key.data(), key.size() - 1); + // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are + // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will + // be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same + // prefix if (!FLAGS_test_batches_snapshots) { + Slice prefix = Slice(key.data(), FLAGS_prefix_size); read_opts.prefix = &prefix; Iterator* iter = db_->NewIterator(read_opts, column_family); - int count = 0; + int64_t count = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { assert(iter->key().starts_with(prefix)); - count++; + ++count; } - assert(count <= 256); + assert(count <= + (static_cast(1) << ((8 - FLAGS_prefix_size) * 8))); if (iter->status().ok()) { thread->stats.AddPrefixes(1, count); } else { @@ -1169,7 +1173,7 @@ class StressTest { } delete iter; } else { - MultiPrefixScan(thread, read_opts, column_family, prefix); + MultiPrefixScan(thread, read_opts, column_family, key); } read_opts.prefix = nullptr; } else if (prefixBound <= prob_op && prob_op < writeBound) { @@ -1617,6 +1621,18 @@ int main(int argc, char** argv) { // max number of concurrent compactions. 
FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions); + if (FLAGS_prefixpercent > 0 && FLAGS_prefix_size <= 0) { + fprintf(stderr, + "Error: prefixpercent is non-zero while prefix_size is " + "not positive!\n"); + exit(1); + } + if (FLAGS_test_batches_snapshots && FLAGS_prefix_size <= 0) { + fprintf(stderr, + "Error: please specify prefix_size for " + "test_batches_snapshots test!\n"); + exit(1); + } if ((FLAGS_readpercent + FLAGS_prefixpercent + FLAGS_writepercent + FLAGS_delpercent + FLAGS_iterpercent) != 100) { fprintf(stderr, diff --git a/util/env_posix.cc b/util/env_posix.cc index e019d6af0..89d8df68d 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -1206,6 +1206,8 @@ class PosixEnv : public Env { virtual void WaitForJoin(); + virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override; + virtual Status GetTestDirectory(std::string* result) { const char* env = getenv("TEST_TMPDIR"); if (env && env[0] != '\0') { @@ -1370,12 +1372,12 @@ class PosixEnv : public Env { class ThreadPool { public: - - ThreadPool() : - total_threads_limit_(1), - bgthreads_(0), - queue_(), - exit_all_threads_(false) { + ThreadPool() + : total_threads_limit_(1), + bgthreads_(0), + queue_(), + queue_len_(0), + exit_all_threads_(false) { PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr)); } @@ -1405,6 +1407,7 @@ class PosixEnv : public Env { void (*function)(void*) = queue_.front().function; void* arg = queue_.front().arg; queue_.pop_front(); + queue_len_.store(queue_.size(), std::memory_order_relaxed); PthreadCall("unlock", pthread_mutex_unlock(&mu_)); (*function)(arg); @@ -1459,6 +1462,7 @@ class PosixEnv : public Env { queue_.push_back(BGItem()); queue_.back().function = function; queue_.back().arg = arg; + queue_len_.store(queue_.size(), std::memory_order_relaxed); // always wake up at least one waiting thread. 
PthreadCall("signal", pthread_cond_signal(&bgsignal_)); @@ -1466,6 +1470,10 @@ class PosixEnv : public Env { PthreadCall("unlock", pthread_mutex_unlock(&mu_)); } + unsigned int GetQueueLen() const { + return queue_len_.load(std::memory_order_relaxed); + } + private: // Entry per Schedule() call struct BGItem { void* arg; void (*function)(void*); }; @@ -1476,6 +1484,7 @@ class PosixEnv : public Env { int total_threads_limit_; std::vector bgthreads_; BGQueue queue_; + std::atomic_uint queue_len_; // Queue length. Used for stats reporting bool exit_all_threads_; }; @@ -1498,6 +1507,11 @@ void PosixEnv::Schedule(void (*function)(void*), void* arg, Priority pri) { thread_pools_[pri].Schedule(function, arg); } +unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const { + assert(pri >= Priority::LOW && pri <= Priority::HIGH); + return thread_pools_[pri].GetQueueLen(); +} + namespace { struct StartThreadState { void (*user_function)(void*); diff --git a/util/env_test.cc b/util/env_test.cc index a442e3a5c..e17027a39 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -172,17 +172,30 @@ TEST(EnvPosixTest, TwoPools) { env_->SetBackgroundThreads(kLowPoolSize); env_->SetBackgroundThreads(kHighPoolSize, Env::Priority::HIGH); + ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + // schedule same number of jobs in each pool for (int i = 0; i < kJobs; i++) { env_->Schedule(&CB::Run, &low_pool_job); env_->Schedule(&CB::Run, &high_pool_job, Env::Priority::HIGH); } + // Wait a short while for the jobs to be dispatched. 
+ Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(kJobs - kLowPoolSize, env_->GetThreadPoolQueueLen()); + ASSERT_EQ(kJobs - kLowPoolSize, + env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + ASSERT_EQ(kJobs - kHighPoolSize, + env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); // wait for all jobs to finish while (low_pool_job.NumFinished() < kJobs || high_pool_job.NumFinished() < kJobs) { env_->SleepForMicroseconds(kDelayMicros); } + + ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); } bool IsSingleVarint(const std::string& s) { diff --git a/util/log_buffer.h b/util/log_buffer.h index 76503a084..8ebe92e03 100644 --- a/util/log_buffer.h +++ b/util/log_buffer.h @@ -23,6 +23,8 @@ class LogBuffer { // Add a log entry to the buffer. void AddLogToBuffer(const char* format, va_list ap); + size_t IsEmpty() const { return logs_.empty(); } + // Flush all buffered log to the info log. void FlushBufferToLog();