From 644802d0901c5d29f17486cd5d24d92f629bd5ee Mon Sep 17 00:00:00 2001 From: Hallvard Furuseth Date: Mon, 21 Jul 2014 08:12:27 +0200 Subject: [PATCH] Clean up MDB mutex code, catch errors. Catch mutex_lock errors. Prepare for next commit (robust mutexes), no change in behavior: - Replace LOCK_MUTEX_R,... -> mdb_mutex_t, MDB_MUTEX, LOCK_MUTEX0,... - Rename mti_mutex -> mti_rmutex, so MDB_MUTEX() can get at it. --- libraries/liblmdb/mdb.c | 92 ++++++++++++++++++++++------------------- 1 file changed, 49 insertions(+), 43 deletions(-) diff --git a/libraries/liblmdb/mdb.c b/libraries/liblmdb/mdb.c index 3430ec1..6f6eece 100644 --- a/libraries/liblmdb/mdb.c +++ b/libraries/liblmdb/mdb.c @@ -194,6 +194,7 @@ #define pthread_t HANDLE #define pthread_mutex_t HANDLE #define pthread_cond_t HANDLE +typedef HANDLE mdb_mutex_t; #define pthread_key_t DWORD #define pthread_self() GetCurrentThreadId() #define pthread_key_create(x,y) \ @@ -207,10 +208,9 @@ #define pthread_cond_wait(cond,mutex) do{SignalObjectAndWait(*mutex, *cond, INFINITE, FALSE); WaitForSingleObject(*mutex, INFINITE);}while(0) #define THREAD_CREATE(thr,start,arg) thr=CreateThread(NULL,0,start,arg,0,NULL) #define THREAD_FINISH(thr) WaitForSingleObject(thr, INFINITE) -#define LOCK_MUTEX_R(env) pthread_mutex_lock(&(env)->me_rmutex) -#define UNLOCK_MUTEX_R(env) pthread_mutex_unlock(&(env)->me_rmutex) -#define LOCK_MUTEX_W(env) pthread_mutex_lock(&(env)->me_wmutex) -#define UNLOCK_MUTEX_W(env) pthread_mutex_unlock(&(env)->me_wmutex) +#define MDB_MUTEX(env, rw) ((env)->me_##rw##mutex) +#define LOCK_MUTEX0(mutex) WaitForSingleObject(mutex, INFINITE) +#define UNLOCK_MUTEX(mutex) ReleaseMutex(mutex) #define getpid() GetCurrentProcessId() #define MDB_FDATASYNC(fd) (!FlushFileBuffers(fd)) #define MDB_MSYNC(addr,len,flags) (!FlushViewOfFile(addr,len)) @@ -235,10 +235,10 @@ #ifdef MDB_USE_POSIX_SEM -#define LOCK_MUTEX_R(env) mdb_sem_wait((env)->me_rmutex) -#define UNLOCK_MUTEX_R(env) sem_post((env)->me_rmutex) -#define LOCK_MUTEX_W(env) mdb_sem_wait((env)->me_wmutex) -#define UNLOCK_MUTEX_W(env) sem_post((env)->me_wmutex) +typedef sem_t *mdb_mutex_t; +#define MDB_MUTEX(env, rw) ((env)->me_##rw##mutex) +#define LOCK_MUTEX0(mutex) mdb_sem_wait(mutex) +#define UNLOCK_MUTEX(mutex) sem_post(mutex) static int mdb_sem_wait(sem_t *sem) @@ -249,21 +249,18 @@ mdb_sem_wait(sem_t *sem) } #else - /** Lock the reader mutex. + /** Pointer/HANDLE type of shared mutex/semaphore. */ -#define LOCK_MUTEX_R(env) pthread_mutex_lock(&(env)->me_txns->mti_mutex) - /** Unlock the reader mutex. +typedef pthread_mutex_t *mdb_mutex_t; + /** Mutex for the reader table (rw = r) or write transaction (rw = w). */ -#define UNLOCK_MUTEX_R(env) pthread_mutex_unlock(&(env)->me_txns->mti_mutex) - - /** Lock the writer mutex. - * Only a single write transaction is allowed at a time. Other writers - * will block waiting for this mutex. +#define MDB_MUTEX(env, rw) (&(env)->me_txns->mti_##rw##mutex) + /** Lock the reader or writer mutex. */ -#define LOCK_MUTEX_W(env) pthread_mutex_lock(&(env)->me_txns->mti_wmutex) - /** Unlock the writer mutex. +#define LOCK_MUTEX0(mutex) pthread_mutex_lock(mutex) + /** Unlock the reader or writer mutex. */ -#define UNLOCK_MUTEX_W(env) pthread_mutex_unlock(&(env)->me_txns->mti_wmutex) +#define UNLOCK_MUTEX(mutex) pthread_mutex_unlock(mutex) #endif /* MDB_USE_POSIX_SEM */ /** Get the error code for the last failed system function. @@ -297,6 +294,8 @@ mdb_sem_wait(sem_t *sem) /** @} */ +#define LOCK_MUTEX(rc, env, mutex) ((rc) = LOCK_MUTEX0(mutex)) + #ifndef _WIN32 /** A flag for opening a file and requesting synchronous data writes. * This is only used when writing a meta page. It's not strictly needed; @@ -608,9 +607,9 @@ typedef struct MDB_txbody { char mtb_rmname[MNAME_LEN]; #else /** Mutex protecting access to this table. - * This is the reader lock that #LOCK_MUTEX_R acquires. + * This is the #MDB_MUTEX(env,r) reader table lock. */ - pthread_mutex_t mtb_mutex; + pthread_mutex_t mtb_rmutex; #endif /** The ID of the last transaction committed to the database. * This is recorded here only for convenience; the value can always @@ -630,7 +629,7 @@ typedef struct MDB_txninfo { MDB_txbody mtb; #define mti_magic mt1.mtb.mtb_magic #define mti_format mt1.mtb.mtb_format -#define mti_mutex mt1.mtb.mtb_mutex +#define mti_rmutex mt1.mtb.mtb_rmutex #define mti_rmname mt1.mtb.mtb_rmname #define mti_txnid mt1.mtb.mtb_txnid #define mti_numreaders mt1.mtb.mtb_numreaders @@ -1119,11 +1118,11 @@ struct MDB_env { int me_live_reader; /**< have liveness lock in reader table */ #ifdef _WIN32 int me_pidquery; /**< Used in OpenProcess */ - HANDLE me_rmutex; /* Windows mutexes don't reside in shared mem */ - HANDLE me_wmutex; -#elif defined(MDB_USE_POSIX_SEM) - sem_t *me_rmutex; /* Shared mutexes are not supported */ - sem_t *me_wmutex; +#endif +#if defined(_WIN32) || defined(MDB_USE_POSIX_SEM) + /* Windows mutexes/POSIX semaphores do not reside in shared mem */ + mdb_mutex_t me_rmutex; + mdb_mutex_t me_wmutex; #endif void *me_userctx; /**< User-settable context */ MDB_assert_func *me_assert_func; /**< Callback for assertion failures */ @@ -2463,6 +2462,7 @@ mdb_txn_renew0(MDB_txn *txn) } else { MDB_PID_T pid = env->me_pid; MDB_THR_T tid = pthread_self(); + mdb_mutex_t rmutex = MDB_MUTEX(env, r); if (!env->me_live_reader) { rc = mdb_reader_pid(env, Pidset, pid); @@ -2471,13 +2471,14 @@ mdb_txn_renew0(MDB_txn *txn) env->me_live_reader = 1; } - LOCK_MUTEX_R(env); + if (LOCK_MUTEX(rc, env, rmutex)) + return rc; nr = ti->mti_numreaders; for (i=0; imti_readers[i].mr_pid == 0) break; if (i == env->me_maxreaders) { - UNLOCK_MUTEX_R(env); + UNLOCK_MUTEX(rmutex); return MDB_READERS_FULL; } ti->mti_readers[i].mr_pid = pid; @@ -2486,7 +2487,7 @@ mdb_txn_renew0(MDB_txn *txn) ti->mti_numreaders = ++nr; /* Save numreaders for un-mutexed mdb_env_close() */ env->me_numreaders = nr; - UNLOCK_MUTEX_R(env); + UNLOCK_MUTEX(rmutex); r = &ti->mti_readers[i]; new_notls = (env->me_flags & MDB_NOTLS); @@ -2501,7 +2502,8 @@ mdb_txn_renew0(MDB_txn *txn) } } else { if (ti) { - LOCK_MUTEX_W(env); + if (LOCK_MUTEX(rc, env, MDB_MUTEX(env, w))) + return rc; txn->mt_txnid = ti->mti_txnid; meta = env->me_metas[txn->mt_txnid & 1]; @@ -2766,7 +2768,7 @@ mdb_txn_reset0(MDB_txn *txn, const char *act) env->me_txn = NULL; /* The writer mutex was locked in mdb_txn_begin. */ if (env->me_txns) - UNLOCK_MUTEX_W(env); + UNLOCK_MUTEX(MDB_MUTEX(env, w)); } } @@ -3333,7 +3335,7 @@ done: mdb_dbis_update(txn, 1); if (env->me_txns) - UNLOCK_MUTEX_W(env); + UNLOCK_MUTEX(MDB_MUTEX(env, w)); free(txn); return MDB_SUCCESS; @@ -4302,7 +4304,7 @@ mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl) if ((rc = pthread_mutexattr_init(&mattr)) || (rc = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED)) - || (rc = pthread_mutex_init(&env->me_txns->mti_mutex, &mattr)) + || (rc = pthread_mutex_init(&env->me_txns->mti_rmutex, &mattr)) || (rc = pthread_mutex_init(&env->me_txns->mti_wmutex, &mattr))) goto fail; pthread_mutexattr_destroy(&mattr); @@ -4585,7 +4587,6 @@ mdb_env_close0(MDB_env *env, int excl) env->me_flags &= ~(MDB_ENV_ACTIVE|MDB_ENV_TXKEY); } - void ESECT mdb_env_close(MDB_env *env) { @@ -8533,6 +8534,7 @@ static int ESECT mdb_env_copyfd0(MDB_env *env, HANDLE fd) { MDB_txn *txn = NULL; + mdb_mutex_t wmutex = NULL; int rc; size_t wsize; char *ptr; @@ -8557,11 +8559,13 @@ mdb_env_copyfd0(MDB_env *env, HANDLE fd) mdb_txn_reset0(txn, "reset-stage1"); /* Temporarily block writers until we snapshot the meta pages */ - LOCK_MUTEX_W(env); + wmutex = MDB_MUTEX(env, w); + if (LOCK_MUTEX(rc, env, wmutex)) + goto leave; rc = mdb_txn_renew0(txn); if (rc) { - UNLOCK_MUTEX_W(env); + UNLOCK_MUTEX(wmutex); goto leave; } } @@ -8585,8 +8589,8 @@ mdb_env_copyfd0(MDB_env *env, HANDLE fd) break; } } - if (env->me_txns) - UNLOCK_MUTEX_W(env); + if (wmutex) + UNLOCK_MUTEX(wmutex); if (rc) goto leave; @@ -9272,7 +9276,7 @@ mdb_reader_check(MDB_env *env, int *dead) unsigned int i, j, rdrs; MDB_reader *mr; MDB_PID_T *pids, pid; - int count = 0; + int rc = MDB_SUCCESS, count = 0; if (!env) return EINVAL; @@ -9291,7 +9295,9 @@ mdb_reader_check(MDB_env *env, int *dead) pid = mr[i].mr_pid; if (mdb_pid_insert(pids, pid) == 0) { if (!mdb_reader_pid(env, Pidcheck, pid)) { - LOCK_MUTEX_R(env); + mdb_mutex_t rmutex = MDB_MUTEX(env, r); + if (LOCK_MUTEX(rc, env, rmutex)) + break; /* Recheck, a new process may have reused pid */ if (!mdb_reader_pid(env, Pidcheck, pid)) { for (j=i; j