Exit and Join the background compaction threads while running rocksdb tests

Summary:
The background compaction threads are never exitted and therefore caused
memory-leaks while running rpcksdb tests. Have changed the PosixEnv destructor to exit and join them and changed the tests likewise
The memory leaked has reduced from 320 bytes to 64 bytes in all the tests. The 64
bytes is relating to
pthread_exit, but still have to figure out why. The stack-trace right now with
table_test.cc = 64 bytes in 1 blocks are possibly lost in loss record 4 of 5
   at 0x475D8C: malloc (jemalloc.c:914)
   by 0x400D69E: _dl_map_object_deps (dl-deps.c:505)
   by 0x4013393: dl_open_worker (dl-open.c:263)
   by 0x400F015: _dl_catch_error (dl-error.c:178)
   by 0x4013B2B: _dl_open (dl-open.c:569)
   by 0x5D3E913: do_dlopen (dl-libc.c:86)
   by 0x400F015: _dl_catch_error (dl-error.c:178)
   by 0x5D3E9D6: __libc_dlopen_mode (dl-libc.c:47)
   by 0x5048BF3: pthread_cancel_init (unwind-forcedunwind.c:53)
   by 0x5048DC9: _Unwind_ForcedUnwind (unwind-forcedunwind.c:126)
   by 0x5046D9F: __pthread_unwind (unwind.c:130)
   by 0x50413A4: pthread_exit (pthreadP.h:289)

Test Plan: make all check

Reviewers: dhruba, sheki, haobo

Reviewed By: dhruba

CC: leveldb, chip

Differential Revision: https://reviews.facebook.net/D9573
main
Mayank Agarwal 12 years ago
parent e21ba94a69
commit 6594fef7ef
  1. 47
      util/env_posix.cc

@ -600,9 +600,9 @@ class PosixFileLock : public FileLock {
class PosixEnv : public Env { class PosixEnv : public Env {
public: public:
PosixEnv(); PosixEnv();
virtual ~PosixEnv() {
fprintf(stderr, "Destroying Env::Default()\n"); virtual ~PosixEnv(){
exit(1); WaitForBGThreads();
} }
void SetFD_CLOEXEC(int fd, const EnvOptions* options) { void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
@ -804,6 +804,8 @@ class PosixEnv : public Env {
virtual void Schedule(void (*function)(void*), void* arg); virtual void Schedule(void (*function)(void*), void* arg);
virtual void WaitForBGThreads();
virtual void StartThread(void (*function)(void* arg), void* arg); virtual void StartThread(void (*function)(void* arg), void* arg);
virtual Status GetTestDirectory(std::string* result) { virtual Status GetTestDirectory(std::string* result) {
@ -973,22 +975,43 @@ class PosixEnv : public Env {
// Entry per Schedule() call // Entry per Schedule() call
struct BGItem { void* arg; void (*function)(void*); }; struct BGItem { void* arg; void (*function)(void*); };
typedef std::deque<BGItem> BGQueue; typedef std::deque<BGItem> BGQueue;
int queue_size_; // number of items in BGQueue
bool exit_all_threads_;
BGQueue queue_; BGQueue queue_;
std::vector<pthread_t> threads_to_join_;
}; };
PosixEnv::PosixEnv() : checkedDiskForMmap_(false), PosixEnv::PosixEnv() : checkedDiskForMmap_(false),
forceMmapOff(false), forceMmapOff(false),
page_size_(getpagesize()), page_size_(getpagesize()),
started_bgthread_(0), started_bgthread_(0),
num_threads_(1) { num_threads_(1),
queue_size_(0),
exit_all_threads_(false) {
PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr)); PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr));
bgthread_.resize(num_threads_); bgthread_.resize(num_threads_);
} }
// Signal and Join all background threads started by calls to Schedule
void PosixEnv::WaitForBGThreads() {
PthreadCall("lock", pthread_mutex_lock(&mu_));
assert(! exit_all_threads_);
exit_all_threads_ = true;
PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_));
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
for (unsigned int i = 0; i < threads_to_join_.size(); i++) {
pthread_join(threads_to_join_[i], nullptr);
}
}
void PosixEnv::Schedule(void (*function)(void*), void* arg) { void PosixEnv::Schedule(void (*function)(void*), void* arg) {
PthreadCall("lock", pthread_mutex_lock(&mu_)); PthreadCall("lock", pthread_mutex_lock(&mu_));
if (exit_all_threads_) {
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
return;
}
// Start background thread if necessary // Start background thread if necessary
for (; started_bgthread_ < num_threads_; started_bgthread_++) { for (; started_bgthread_ < num_threads_; started_bgthread_++) {
PthreadCall( PthreadCall(
@ -997,6 +1020,7 @@ void PosixEnv::Schedule(void (*function)(void*), void* arg) {
nullptr, nullptr,
&PosixEnv::BGThreadWrapper, &PosixEnv::BGThreadWrapper,
this)); this));
threads_to_join_.push_back(bgthread_[started_bgthread_]);
fprintf(stdout, "Created bg thread 0x%lx\n", bgthread_[started_bgthread_]); fprintf(stdout, "Created bg thread 0x%lx\n", bgthread_[started_bgthread_]);
} }
@ -1015,10 +1039,13 @@ void PosixEnv::BGThread() {
while (true) { while (true) {
// Wait until there is an item that is ready to run // Wait until there is an item that is ready to run
PthreadCall("lock", pthread_mutex_lock(&mu_)); PthreadCall("lock", pthread_mutex_lock(&mu_));
while (queue_.empty()) { while (queue_.empty() && !exit_all_threads_) {
PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_)); PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
} }
if (exit_all_threads_) { // mechanism to let BG threads exit safely
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
break;
}
void (*function)(void*) = queue_.front().function; void (*function)(void*) = queue_.front().function;
void* arg = queue_.front().arg; void* arg = queue_.front().arg;
queue_.pop_front(); queue_.pop_front();
@ -1048,17 +1075,15 @@ void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
state->arg = arg; state->arg = arg;
PthreadCall("start thread", PthreadCall("start thread",
pthread_create(&t, nullptr, &StartThreadWrapper, state)); pthread_create(&t, nullptr, &StartThreadWrapper, state));
threads_to_join_.push_back(t);
} }
} // namespace } // namespace
static pthread_once_t once = PTHREAD_ONCE_INIT; static PosixEnv default_env;
static Env* default_env;
static void InitDefaultEnv() { default_env = new PosixEnv; }
Env* Env::Default() { Env* Env::Default() {
pthread_once(&once, InitDefaultEnv); return &default_env;
return default_env;
} }
} // namespace leveldb } // namespace leveldb

Loading…
Cancel
Save