Clean up MDB mutex code, catch errors.

Catch mutex_lock errors.
Prepare for next commit (robust mutexes), no change in behavior:
- Replace LOCK_MUTEX_R,... -> mdb_mutex_t, MDB_MUTEX, LOCK_MUTEX0,...
- Rename mti_mutex -> mti_rmutex, so MDB_MUTEX() can get at it.
robust
Hallvard Furuseth 10 years ago committed by Hallvard Furuseth
parent 90d9d096bb
commit 644802d090
  1. 92
      libraries/liblmdb/mdb.c

@ -194,6 +194,7 @@
#define pthread_t HANDLE #define pthread_t HANDLE
#define pthread_mutex_t HANDLE #define pthread_mutex_t HANDLE
#define pthread_cond_t HANDLE #define pthread_cond_t HANDLE
typedef HANDLE mdb_mutex_t;
#define pthread_key_t DWORD #define pthread_key_t DWORD
#define pthread_self() GetCurrentThreadId() #define pthread_self() GetCurrentThreadId()
#define pthread_key_create(x,y) \ #define pthread_key_create(x,y) \
@ -207,10 +208,9 @@
#define pthread_cond_wait(cond,mutex) do{SignalObjectAndWait(*mutex, *cond, INFINITE, FALSE); WaitForSingleObject(*mutex, INFINITE);}while(0) #define pthread_cond_wait(cond,mutex) do{SignalObjectAndWait(*mutex, *cond, INFINITE, FALSE); WaitForSingleObject(*mutex, INFINITE);}while(0)
#define THREAD_CREATE(thr,start,arg) thr=CreateThread(NULL,0,start,arg,0,NULL) #define THREAD_CREATE(thr,start,arg) thr=CreateThread(NULL,0,start,arg,0,NULL)
#define THREAD_FINISH(thr) WaitForSingleObject(thr, INFINITE) #define THREAD_FINISH(thr) WaitForSingleObject(thr, INFINITE)
#define LOCK_MUTEX_R(env) pthread_mutex_lock(&(env)->me_rmutex) #define MDB_MUTEX(env, rw) ((env)->me_##rw##mutex)
#define UNLOCK_MUTEX_R(env) pthread_mutex_unlock(&(env)->me_rmutex) #define LOCK_MUTEX0(mutex) WaitForSingleObject(mutex, INFINITE)
#define LOCK_MUTEX_W(env) pthread_mutex_lock(&(env)->me_wmutex) #define UNLOCK_MUTEX(mutex) ReleaseMutex(mutex)
#define UNLOCK_MUTEX_W(env) pthread_mutex_unlock(&(env)->me_wmutex)
#define getpid() GetCurrentProcessId() #define getpid() GetCurrentProcessId()
#define MDB_FDATASYNC(fd) (!FlushFileBuffers(fd)) #define MDB_FDATASYNC(fd) (!FlushFileBuffers(fd))
#define MDB_MSYNC(addr,len,flags) (!FlushViewOfFile(addr,len)) #define MDB_MSYNC(addr,len,flags) (!FlushViewOfFile(addr,len))
@ -235,10 +235,10 @@
#ifdef MDB_USE_POSIX_SEM #ifdef MDB_USE_POSIX_SEM
#define LOCK_MUTEX_R(env) mdb_sem_wait((env)->me_rmutex) typedef sem_t *mdb_mutex_t;
#define UNLOCK_MUTEX_R(env) sem_post((env)->me_rmutex) #define MDB_MUTEX(env, rw) ((env)->me_##rw##mutex)
#define LOCK_MUTEX_W(env) mdb_sem_wait((env)->me_wmutex) #define LOCK_MUTEX0(mutex) mdb_sem_wait(mutex)
#define UNLOCK_MUTEX_W(env) sem_post((env)->me_wmutex) #define UNLOCK_MUTEX(mutex) sem_post(mutex)
static int static int
mdb_sem_wait(sem_t *sem) mdb_sem_wait(sem_t *sem)
@ -249,21 +249,18 @@ mdb_sem_wait(sem_t *sem)
} }
#else #else
/** Lock the reader mutex. /** Pointer/HANDLE type of shared mutex/semaphore.
*/ */
#define LOCK_MUTEX_R(env) pthread_mutex_lock(&(env)->me_txns->mti_mutex) typedef pthread_mutex_t *mdb_mutex_t;
/** Unlock the reader mutex. /** Mutex for the reader table (rw = r) or write transaction (rw = w).
*/ */
#define UNLOCK_MUTEX_R(env) pthread_mutex_unlock(&(env)->me_txns->mti_mutex) #define MDB_MUTEX(env, rw) (&(env)->me_txns->mti_##rw##mutex)
/** Lock the reader or writer mutex.
/** Lock the writer mutex.
* Only a single write transaction is allowed at a time. Other writers
* will block waiting for this mutex.
*/ */
#define LOCK_MUTEX_W(env) pthread_mutex_lock(&(env)->me_txns->mti_wmutex) #define LOCK_MUTEX0(mutex) pthread_mutex_lock(mutex)
/** Unlock the writer mutex. /** Unlock the reader or writer mutex.
*/ */
#define UNLOCK_MUTEX_W(env) pthread_mutex_unlock(&(env)->me_txns->mti_wmutex) #define UNLOCK_MUTEX(mutex) pthread_mutex_unlock(mutex)
#endif /* MDB_USE_POSIX_SEM */ #endif /* MDB_USE_POSIX_SEM */
/** Get the error code for the last failed system function. /** Get the error code for the last failed system function.
@ -297,6 +294,8 @@ mdb_sem_wait(sem_t *sem)
/** @} */ /** @} */
#define LOCK_MUTEX(rc, env, mutex) ((rc) = LOCK_MUTEX0(mutex))
#ifndef _WIN32 #ifndef _WIN32
/** A flag for opening a file and requesting synchronous data writes. /** A flag for opening a file and requesting synchronous data writes.
* This is only used when writing a meta page. It's not strictly needed; * This is only used when writing a meta page. It's not strictly needed;
@ -608,9 +607,9 @@ typedef struct MDB_txbody {
char mtb_rmname[MNAME_LEN]; char mtb_rmname[MNAME_LEN];
#else #else
/** Mutex protecting access to this table. /** Mutex protecting access to this table.
* This is the reader lock that #LOCK_MUTEX_R acquires. * This is the #MDB_MUTEX(env,r) reader table lock.
*/ */
pthread_mutex_t mtb_mutex; pthread_mutex_t mtb_rmutex;
#endif #endif
/** The ID of the last transaction committed to the database. /** The ID of the last transaction committed to the database.
* This is recorded here only for convenience; the value can always * This is recorded here only for convenience; the value can always
@ -630,7 +629,7 @@ typedef struct MDB_txninfo {
MDB_txbody mtb; MDB_txbody mtb;
#define mti_magic mt1.mtb.mtb_magic #define mti_magic mt1.mtb.mtb_magic
#define mti_format mt1.mtb.mtb_format #define mti_format mt1.mtb.mtb_format
#define mti_mutex mt1.mtb.mtb_mutex #define mti_rmutex mt1.mtb.mtb_rmutex
#define mti_rmname mt1.mtb.mtb_rmname #define mti_rmname mt1.mtb.mtb_rmname
#define mti_txnid mt1.mtb.mtb_txnid #define mti_txnid mt1.mtb.mtb_txnid
#define mti_numreaders mt1.mtb.mtb_numreaders #define mti_numreaders mt1.mtb.mtb_numreaders
@ -1119,11 +1118,11 @@ struct MDB_env {
int me_live_reader; /**< have liveness lock in reader table */ int me_live_reader; /**< have liveness lock in reader table */
#ifdef _WIN32 #ifdef _WIN32
int me_pidquery; /**< Used in OpenProcess */ int me_pidquery; /**< Used in OpenProcess */
HANDLE me_rmutex; /* Windows mutexes don't reside in shared mem */ #endif
HANDLE me_wmutex; #if defined(_WIN32) || defined(MDB_USE_POSIX_SEM)
#elif defined(MDB_USE_POSIX_SEM) /* Windows mutexes/POSIX semaphores do not reside in shared mem */
sem_t *me_rmutex; /* Shared mutexes are not supported */ mdb_mutex_t me_rmutex;
sem_t *me_wmutex; mdb_mutex_t me_wmutex;
#endif #endif
void *me_userctx; /**< User-settable context */ void *me_userctx; /**< User-settable context */
MDB_assert_func *me_assert_func; /**< Callback for assertion failures */ MDB_assert_func *me_assert_func; /**< Callback for assertion failures */
@ -2463,6 +2462,7 @@ mdb_txn_renew0(MDB_txn *txn)
} else { } else {
MDB_PID_T pid = env->me_pid; MDB_PID_T pid = env->me_pid;
MDB_THR_T tid = pthread_self(); MDB_THR_T tid = pthread_self();
mdb_mutex_t rmutex = MDB_MUTEX(env, r);
if (!env->me_live_reader) { if (!env->me_live_reader) {
rc = mdb_reader_pid(env, Pidset, pid); rc = mdb_reader_pid(env, Pidset, pid);
@ -2471,13 +2471,14 @@ mdb_txn_renew0(MDB_txn *txn)
env->me_live_reader = 1; env->me_live_reader = 1;
} }
LOCK_MUTEX_R(env); if (LOCK_MUTEX(rc, env, rmutex))
return rc;
nr = ti->mti_numreaders; nr = ti->mti_numreaders;
for (i=0; i<nr; i++) for (i=0; i<nr; i++)
if (ti->mti_readers[i].mr_pid == 0) if (ti->mti_readers[i].mr_pid == 0)
break; break;
if (i == env->me_maxreaders) { if (i == env->me_maxreaders) {
UNLOCK_MUTEX_R(env); UNLOCK_MUTEX(rmutex);
return MDB_READERS_FULL; return MDB_READERS_FULL;
} }
ti->mti_readers[i].mr_pid = pid; ti->mti_readers[i].mr_pid = pid;
@ -2486,7 +2487,7 @@ mdb_txn_renew0(MDB_txn *txn)
ti->mti_numreaders = ++nr; ti->mti_numreaders = ++nr;
/* Save numreaders for un-mutexed mdb_env_close() */ /* Save numreaders for un-mutexed mdb_env_close() */
env->me_numreaders = nr; env->me_numreaders = nr;
UNLOCK_MUTEX_R(env); UNLOCK_MUTEX(rmutex);
r = &ti->mti_readers[i]; r = &ti->mti_readers[i];
new_notls = (env->me_flags & MDB_NOTLS); new_notls = (env->me_flags & MDB_NOTLS);
@ -2501,7 +2502,8 @@ mdb_txn_renew0(MDB_txn *txn)
} }
} else { } else {
if (ti) { if (ti) {
LOCK_MUTEX_W(env); if (LOCK_MUTEX(rc, env, MDB_MUTEX(env, w)))
return rc;
txn->mt_txnid = ti->mti_txnid; txn->mt_txnid = ti->mti_txnid;
meta = env->me_metas[txn->mt_txnid & 1]; meta = env->me_metas[txn->mt_txnid & 1];
@ -2766,7 +2768,7 @@ mdb_txn_reset0(MDB_txn *txn, const char *act)
env->me_txn = NULL; env->me_txn = NULL;
/* The writer mutex was locked in mdb_txn_begin. */ /* The writer mutex was locked in mdb_txn_begin. */
if (env->me_txns) if (env->me_txns)
UNLOCK_MUTEX_W(env); UNLOCK_MUTEX(MDB_MUTEX(env, w));
} }
} }
@ -3333,7 +3335,7 @@ done:
mdb_dbis_update(txn, 1); mdb_dbis_update(txn, 1);
if (env->me_txns) if (env->me_txns)
UNLOCK_MUTEX_W(env); UNLOCK_MUTEX(MDB_MUTEX(env, w));
free(txn); free(txn);
return MDB_SUCCESS; return MDB_SUCCESS;
@ -4302,7 +4304,7 @@ mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
if ((rc = pthread_mutexattr_init(&mattr)) if ((rc = pthread_mutexattr_init(&mattr))
|| (rc = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED)) || (rc = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED))
|| (rc = pthread_mutex_init(&env->me_txns->mti_mutex, &mattr)) || (rc = pthread_mutex_init(&env->me_txns->mti_rmutex, &mattr))
|| (rc = pthread_mutex_init(&env->me_txns->mti_wmutex, &mattr))) || (rc = pthread_mutex_init(&env->me_txns->mti_wmutex, &mattr)))
goto fail; goto fail;
pthread_mutexattr_destroy(&mattr); pthread_mutexattr_destroy(&mattr);
@ -4585,7 +4587,6 @@ mdb_env_close0(MDB_env *env, int excl)
env->me_flags &= ~(MDB_ENV_ACTIVE|MDB_ENV_TXKEY); env->me_flags &= ~(MDB_ENV_ACTIVE|MDB_ENV_TXKEY);
} }
void ESECT void ESECT
mdb_env_close(MDB_env *env) mdb_env_close(MDB_env *env)
{ {
@ -8533,6 +8534,7 @@ static int ESECT
mdb_env_copyfd0(MDB_env *env, HANDLE fd) mdb_env_copyfd0(MDB_env *env, HANDLE fd)
{ {
MDB_txn *txn = NULL; MDB_txn *txn = NULL;
mdb_mutex_t wmutex = NULL;
int rc; int rc;
size_t wsize; size_t wsize;
char *ptr; char *ptr;
@ -8557,11 +8559,13 @@ mdb_env_copyfd0(MDB_env *env, HANDLE fd)
mdb_txn_reset0(txn, "reset-stage1"); mdb_txn_reset0(txn, "reset-stage1");
/* Temporarily block writers until we snapshot the meta pages */ /* Temporarily block writers until we snapshot the meta pages */
LOCK_MUTEX_W(env); wmutex = MDB_MUTEX(env, w);
if (LOCK_MUTEX(rc, env, wmutex))
goto leave;
rc = mdb_txn_renew0(txn); rc = mdb_txn_renew0(txn);
if (rc) { if (rc) {
UNLOCK_MUTEX_W(env); UNLOCK_MUTEX(wmutex);
goto leave; goto leave;
} }
} }
@ -8585,8 +8589,8 @@ mdb_env_copyfd0(MDB_env *env, HANDLE fd)
break; break;
} }
} }
if (env->me_txns) if (wmutex)
UNLOCK_MUTEX_W(env); UNLOCK_MUTEX(wmutex);
if (rc) if (rc)
goto leave; goto leave;
@ -9272,7 +9276,7 @@ mdb_reader_check(MDB_env *env, int *dead)
unsigned int i, j, rdrs; unsigned int i, j, rdrs;
MDB_reader *mr; MDB_reader *mr;
MDB_PID_T *pids, pid; MDB_PID_T *pids, pid;
int count = 0; int rc = MDB_SUCCESS, count = 0;
if (!env) if (!env)
return EINVAL; return EINVAL;
@ -9291,7 +9295,9 @@ mdb_reader_check(MDB_env *env, int *dead)
pid = mr[i].mr_pid; pid = mr[i].mr_pid;
if (mdb_pid_insert(pids, pid) == 0) { if (mdb_pid_insert(pids, pid) == 0) {
if (!mdb_reader_pid(env, Pidcheck, pid)) { if (!mdb_reader_pid(env, Pidcheck, pid)) {
LOCK_MUTEX_R(env); mdb_mutex_t rmutex = MDB_MUTEX(env, r);
if (LOCK_MUTEX(rc, env, rmutex))
break;
/* Recheck, a new process may have reused pid */ /* Recheck, a new process may have reused pid */
if (!mdb_reader_pid(env, Pidcheck, pid)) { if (!mdb_reader_pid(env, Pidcheck, pid)) {
for (j=i; j<rdrs; j++) for (j=i; j<rdrs; j++)
@ -9302,7 +9308,7 @@ mdb_reader_check(MDB_env *env, int *dead)
count++; count++;
} }
} }
UNLOCK_MUTEX_R(env); UNLOCK_MUTEX(rmutex);
} }
} }
} }
@ -9310,6 +9316,6 @@ mdb_reader_check(MDB_env *env, int *dead)
free(pids); free(pids);
if (dead) if (dead)
*dead = count; *dead = count;
return MDB_SUCCESS; return rc;
} }
/** @} */ /** @} */

Loading…
Cancel
Save