Make SysV semaphores robust. Cleanup MDB_ROBUST.

vl32b
Hallvard Furuseth 10 years ago
parent 9441012435
commit 631970e485
  1. 4
      libraries/liblmdb/lmdb.h
  2. 108
      libraries/liblmdb/mdb.c

@ -109,7 +109,9 @@
* The transaction becomes "long-lived" as above until a check * The transaction becomes "long-lived" as above until a check
* for stale readers is performed or the lockfile is reset, * for stale readers is performed or the lockfile is reset,
* since the process may not remove it from the lockfile. * since the process may not remove it from the lockfile.
* Except write-transactions on Unix with MDB_ROBUST or on Windows. *
* This does not apply to write transactions if the system clears
* stale writers, see above.
* *
* - If you do that anyway, do a periodic check for stale readers. Or * - If you do that anyway, do a periodic check for stale readers. Or
* close the environment once in a while, so the lockfile can get reset. * close the environment once in a while, so the lockfile can get reset.

@ -208,7 +208,7 @@ union semun {
#define MDB_DEVEL 0 #define MDB_DEVEL 0
#endif #endif
#if defined(_WIN32) || (defined(EOWNERDEAD) && !defined(MDB_USE_SYSV_SEM)) #if defined(_WIN32) || defined(MDB_USE_SYSV_SEM) || defined(EOWNERDEAD)
#define MDB_ROBUST_SUPPORTED 1 #define MDB_ROBUST_SUPPORTED 1
#endif #endif
@ -222,6 +222,16 @@ union semun {
# define mdb_func_ "<mdb_unknown>" # define mdb_func_ "<mdb_unknown>"
#endif #endif
/* Internal error codes, not exposed outside liblmdb */
#define MDB_NO_ROOT (MDB_LAST_ERRCODE + 10)
#ifdef _WIN32
#define MDB_OWNERDEAD ((int) WAIT_ABANDONED)
#elif defined MDB_USE_SYSV_SEM
#define MDB_OWNERDEAD (MDB_LAST_ERRCODE + 11)
#else
#define MDB_OWNERDEAD EOWNERDEAD
#endif
#ifdef _WIN32 #ifdef _WIN32
#define MDB_USE_HASH 1 #define MDB_USE_HASH 1
#define MDB_PIDLOCK 0 #define MDB_PIDLOCK 0
@ -237,7 +247,6 @@ typedef HANDLE mdb_mutex_t;
#define pthread_key_delete(x) TlsFree(x) #define pthread_key_delete(x) TlsFree(x)
#define pthread_getspecific(x) TlsGetValue(x) #define pthread_getspecific(x) TlsGetValue(x)
#define pthread_setspecific(x,y) (TlsSetValue(x,y) ? 0 : ErrCode()) #define pthread_setspecific(x,y) (TlsSetValue(x,y) ? 0 : ErrCode())
#define pthread_mutex_consistent(mutex) 0
#define pthread_mutex_unlock(x) ReleaseMutex(*x) #define pthread_mutex_unlock(x) ReleaseMutex(*x)
#define pthread_mutex_lock(x) WaitForSingleObject(*x, INFINITE) #define pthread_mutex_lock(x) WaitForSingleObject(*x, INFINITE)
#define pthread_cond_signal(x) SetEvent(*x) #define pthread_cond_signal(x) SetEvent(*x)
@ -247,6 +256,7 @@ typedef HANDLE mdb_mutex_t;
#define MDB_MUTEX(env, rw) ((env)->me_##rw##mutex) #define MDB_MUTEX(env, rw) ((env)->me_##rw##mutex)
#define LOCK_MUTEX0(mutex) WaitForSingleObject(mutex, INFINITE) #define LOCK_MUTEX0(mutex) WaitForSingleObject(mutex, INFINITE)
#define UNLOCK_MUTEX(mutex) ReleaseMutex(mutex) #define UNLOCK_MUTEX(mutex) ReleaseMutex(mutex)
#define mdb_mutex_consistent(mutex) 0
#define getpid() GetCurrentProcessId() #define getpid() GetCurrentProcessId()
#define MDB_FDATASYNC(fd) (!FlushFileBuffers(fd)) #define MDB_FDATASYNC(fd) (!FlushFileBuffers(fd))
#define MDB_MSYNC(addr,len,flags) (!FlushViewOfFile(addr,len)) #define MDB_MSYNC(addr,len,flags) (!FlushViewOfFile(addr,len))
@ -274,6 +284,7 @@ typedef HANDLE mdb_mutex_t;
typedef struct mdb_mutex { typedef struct mdb_mutex {
int semid; int semid;
int semnum; int semnum;
int *locked;
} mdb_mutex_t; } mdb_mutex_t;
#define MDB_MUTEX(env, rw) (&(env)->me_##rw##mutex) #define MDB_MUTEX(env, rw) (&(env)->me_##rw##mutex)
@ -281,19 +292,28 @@ typedef struct mdb_mutex {
#define UNLOCK_MUTEX(mutex) do { \ #define UNLOCK_MUTEX(mutex) do { \
struct sembuf sb = { 0, 1, SEM_UNDO }; \ struct sembuf sb = { 0, 1, SEM_UNDO }; \
sb.sem_num = (mutex)->semnum; \ sb.sem_num = (mutex)->semnum; \
*(mutex)->locked = 0; \
semop((mutex)->semid, &sb, 1); \ semop((mutex)->semid, &sb, 1); \
} while(0) } while(0)
static int static int
mdb_sem_wait(mdb_mutex_t *sem) mdb_sem_wait(mdb_mutex_t *sem)
{ {
int rc; int rc, *locked = sem->locked;
struct sembuf sb = { 0, -1, SEM_UNDO }; struct sembuf sb = { 0, -1, SEM_UNDO };
sb.sem_num = sem->semnum; sb.sem_num = sem->semnum;
while ((rc = semop(sem->semid, &sb, 1)) && (rc = errno) == EINTR) ; do {
return rc; if (!semop(sem->semid, &sb, 1)) {
rc = *locked ? MDB_OWNERDEAD : MDB_SUCCESS;
*locked = 1;
break;
}
} while ((rc = errno) == EINTR);
return rc;
} }
#define mdb_mutex_consistent(mutex) 0
#else #else
/** Pointer/HANDLE type of shared mutex/semaphore. /** Pointer/HANDLE type of shared mutex/semaphore.
*/ */
@ -308,6 +328,9 @@ typedef pthread_mutex_t mdb_mutex_t;
/** Unlock the reader or writer mutex. /** Unlock the reader or writer mutex.
*/ */
#define UNLOCK_MUTEX(mutex) pthread_mutex_unlock(mutex) #define UNLOCK_MUTEX(mutex) pthread_mutex_unlock(mutex)
/** Mark mutex-protected data as repaired, after death of previous owner.
*/
#define mdb_mutex_consistent(mutex) pthread_mutex_consistent(mutex)
#endif /* MDB_USE_SYSV_SEM */ #endif /* MDB_USE_SYSV_SEM */
/** Get the error code for the last failed system function. /** Get the error code for the last failed system function.
@ -336,11 +359,17 @@ typedef pthread_mutex_t mdb_mutex_t;
#if defined(_WIN32) #if defined(_WIN32)
#define MNAME_LEN 32 #define MNAME_LEN 32
#elif defined(MDB_USE_SYSV_SEM) #elif defined(MDB_USE_SYSV_SEM)
#define MNAME_LEN 0 #define MNAME_LEN (sizeof(int))
#else #else
#define MNAME_LEN (sizeof(pthread_mutex_t)) #define MNAME_LEN (sizeof(pthread_mutex_t))
#endif #endif
#ifdef MDB_USE_SYSV_SEM
#define SYSV_SEM_FLAG 1 /**< SysV sems in lockfile format */
#else
#define SYSV_SEM_FLAG 0
#endif
/** @} */ /** @} */
#ifdef MDB_ROBUST_SUPPORTED #ifdef MDB_ROBUST_SUPPORTED
@ -667,6 +696,7 @@ typedef struct MDB_txbody {
char mtb_rmname[MNAME_LEN]; char mtb_rmname[MNAME_LEN];
#elif defined(MDB_USE_SYSV_SEM) #elif defined(MDB_USE_SYSV_SEM)
int mtb_semid; int mtb_semid;
int mtb_rlocked;
#else #else
/** Mutex protecting access to this table. /** Mutex protecting access to this table.
* This is the #MDB_MUTEX(env,r) reader table lock. * This is the #MDB_MUTEX(env,r) reader table lock.
@ -695,22 +725,25 @@ typedef struct MDB_txninfo {
#define mti_rmname mt1.mtb.mtb_rmname #define mti_rmname mt1.mtb.mtb_rmname
#define mti_txnid mt1.mtb.mtb_txnid #define mti_txnid mt1.mtb.mtb_txnid
#define mti_numreaders mt1.mtb.mtb_numreaders #define mti_numreaders mt1.mtb.mtb_numreaders
char pad[(sizeof(MDB_txbody)+CACHELINE-1) & ~(CACHELINE-1)];
} mt1;
#ifdef MDB_USE_SYSV_SEM #ifdef MDB_USE_SYSV_SEM
#define mti_semid mt1.mtb.mtb_semid #define mti_semid mt1.mtb.mtb_semid
#else #define mti_rlocked mt1.mtb.mtb_rlocked
#endif
char pad[(sizeof(MDB_txbody)+CACHELINE-1) & ~(CACHELINE-1)];
} mt1;
union { union {
#if defined(_WIN32) #if defined(_WIN32)
char mt2_wmname[MNAME_LEN]; char mt2_wmname[MNAME_LEN];
#define mti_wmname mt2.mt2_wmname #define mti_wmname mt2.mt2_wmname
#elif defined MDB_USE_SYSV_SEM
int mt2_wlocked;
#define mti_wlocked mt2.mt2_wlocked
#else #else
pthread_mutex_t mt2_wmutex; pthread_mutex_t mt2_wmutex;
#define mti_wmutex mt2.mt2_wmutex #define mti_wmutex mt2.mt2_wmutex
#endif #endif
char pad[(MNAME_LEN+CACHELINE-1) & ~(CACHELINE-1)]; char pad[(MNAME_LEN+CACHELINE-1) & ~(CACHELINE-1)];
} mt2; } mt2;
#endif
MDB_reader mti_readers[1]; MDB_reader mti_readers[1];
} MDB_txninfo; } MDB_txninfo;
@ -719,7 +752,7 @@ typedef struct MDB_txninfo {
((uint32_t) \ ((uint32_t) \
((MDB_LOCK_VERSION) \ ((MDB_LOCK_VERSION) \
/* Flags which describe functionality */ \ /* Flags which describe functionality */ \
+ (((MNAME_LEN) == 0) << 18) /* MDB_USE_SYSV_SEM */ \ + (SYSV_SEM_FLAG << 18) \
+ (((MDB_PIDLOCK) != 0) << 16))) + (((MDB_PIDLOCK) != 0) << 16)))
/** @} */ /** @} */
@ -2596,17 +2629,8 @@ mdb_txn_renew0(MDB_txn *txn)
if (ti) { if (ti) {
if (LOCK_MUTEX(rc, env, MDB_MUTEX(env, w))) if (LOCK_MUTEX(rc, env, MDB_MUTEX(env, w)))
return rc; return rc;
#ifdef MDB_USE_SYSV_SEM
meta = env->me_metas[ mdb_env_pick_meta(env) ];
txn->mt_txnid = meta->mm_txnid;
/* Update mti_txnid like mdb_mutex_failed() would,
* in case last writer crashed before updating it.
*/
ti->mti_txnid = txn->mt_txnid;
#else
txn->mt_txnid = ti->mti_txnid; txn->mt_txnid = ti->mti_txnid;
meta = env->me_metas[txn->mt_txnid & 1]; meta = env->me_metas[txn->mt_txnid & 1];
#endif
} else { } else {
meta = env->me_metas[ mdb_env_pick_meta(env) ]; meta = env->me_metas[ mdb_env_pick_meta(env) ];
txn->mt_txnid = meta->mm_txnid; txn->mt_txnid = meta->mm_txnid;
@ -4358,6 +4382,10 @@ mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
int fdflags; int fdflags;
# define MDB_CLOEXEC 0 # define MDB_CLOEXEC 0
#endif #endif
#endif
#ifdef MDB_USE_SYSV_SEM
int semid;
union semun semu;
#endif #endif
int rc; int rc;
off_t size, rsize; off_t size, rsize;
@ -4472,17 +4500,10 @@ mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
env->me_wmutex = CreateMutex(&mdb_all_sa, FALSE, env->me_txns->mti_wmname); env->me_wmutex = CreateMutex(&mdb_all_sa, FALSE, env->me_txns->mti_wmname);
if (!env->me_wmutex) goto fail_errno; if (!env->me_wmutex) goto fail_errno;
#elif defined(MDB_USE_SYSV_SEM) #elif defined(MDB_USE_SYSV_SEM)
union semun semu;
unsigned short vals[2] = {1, 1}; unsigned short vals[2] = {1, 1};
int semid = semget(IPC_PRIVATE, 2, mode); semid = semget(IPC_PRIVATE, 2, mode);
if (semid < 0) if (semid < 0)
goto fail_errno; goto fail_errno;
env->me_rmutex.semid = semid;
env->me_wmutex.semid = semid;
env->me_rmutex.semnum = 0;
env->me_wmutex.semnum = 1;
semu.array = vals; semu.array = vals;
if (semctl(semid, 0, SETALL, semu) < 0) if (semctl(semid, 0, SETALL, semu) < 0)
goto fail_errno; goto fail_errno;
@ -4509,8 +4530,6 @@ mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
} else { } else {
#ifdef MDB_USE_SYSV_SEM #ifdef MDB_USE_SYSV_SEM
struct semid_ds buf; struct semid_ds buf;
union semun semu;
int semid;
#endif #endif
if (env->me_txns->mti_magic != MDB_MAGIC) { if (env->me_txns->mti_magic != MDB_MAGIC) {
DPUTS("lock region has invalid magic"); DPUTS("lock region has invalid magic");
@ -4535,20 +4554,23 @@ mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
#elif defined(MDB_USE_SYSV_SEM) #elif defined(MDB_USE_SYSV_SEM)
semid = env->me_txns->mti_semid; semid = env->me_txns->mti_semid;
semu.buf = &buf; semu.buf = &buf;
/* check for read access */ /* check for read access */
if (semctl(semid, 0, IPC_STAT, semu) < 0) if (semctl(semid, 0, IPC_STAT, semu) < 0)
goto fail_errno; goto fail_errno;
/* check for write access */ /* check for write access */
if (semctl(semid, 0, IPC_SET, semu) < 0) if (semctl(semid, 0, IPC_SET, semu) < 0)
goto fail_errno; goto fail_errno;
env->me_rmutex.semid = semid;
env->me_wmutex.semid = semid;
env->me_rmutex.semnum = 0;
env->me_wmutex.semnum = 1;
#endif #endif
} }
#ifdef MDB_USE_SYSV_SEM
env->me_rmutex.semid = semid;
env->me_wmutex.semid = semid;
env->me_rmutex.semnum = 0;
env->me_wmutex.semnum = 1;
env->me_rmutex.locked = &env->me_txns->mti_rlocked;
env->me_wmutex.locked = &env->me_txns->mti_wlocked;
#endif
return MDB_SUCCESS; return MDB_SUCCESS;
fail_errno: fail_errno:
@ -6140,7 +6162,6 @@ int
mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
unsigned int flags) unsigned int flags)
{ {
enum { MDB_NO_ROOT = MDB_LAST_ERRCODE+10 }; /* internal code */
MDB_env *env; MDB_env *env;
MDB_node *leaf = NULL; MDB_node *leaf = NULL;
MDB_page *fp, *mp; MDB_page *fp, *mp;
@ -9593,7 +9614,7 @@ static int mdb_reader_check0(MDB_env *env, int rlocked, int *dead)
#ifdef MDB_ROBUST_SUPPORTED #ifdef MDB_ROBUST_SUPPORTED
/** Handle #LOCK_MUTEX0() failure. /** Handle #LOCK_MUTEX0() failure.
* With #MDB_ROBUST, try to repair the lock file if the mutex owner died. * Try to repair the lock file if the mutex owner died.
* @param[in] env the environment handle * @param[in] env the environment handle
* @param[in] mutex LOCK_MUTEX0() mutex * @param[in] mutex LOCK_MUTEX0() mutex
* @param[in] rc LOCK_MUTEX0() error (nonzero) * @param[in] rc LOCK_MUTEX0() error (nonzero)
@ -9602,11 +9623,8 @@ static int mdb_reader_check0(MDB_env *env, int rlocked, int *dead)
static int mdb_mutex_failed(MDB_env *env, mdb_mutex_t *mutex, int rc) static int mdb_mutex_failed(MDB_env *env, mdb_mutex_t *mutex, int rc)
{ {
int toggle, rlocked, rc2; int toggle, rlocked, rc2;
#ifndef _WIN32
enum { WAIT_ABANDONED = EOWNERDEAD };
#endif
if (rc == (int) WAIT_ABANDONED) { if (rc == MDB_OWNERDEAD) {
/* We own the mutex. Clean up after dead previous owner. */ /* We own the mutex. Clean up after dead previous owner. */
rc = MDB_SUCCESS; rc = MDB_SUCCESS;
rlocked = (mutex == MDB_MUTEX(env, r)); rlocked = (mutex == MDB_MUTEX(env, r));
@ -9627,7 +9645,7 @@ static int mdb_mutex_failed(MDB_env *env, mdb_mutex_t *mutex, int rc)
(rc ? "this process' env is hosed" : "recovering"))); (rc ? "this process' env is hosed" : "recovering")));
rc2 = mdb_reader_check0(env, rlocked, NULL); rc2 = mdb_reader_check0(env, rlocked, NULL);
if (rc2 == 0) if (rc2 == 0)
rc2 = pthread_mutex_consistent(mutex); rc2 = mdb_mutex_consistent(mutex);
if (rc || (rc = rc2)) { if (rc || (rc = rc2)) {
DPRINTF(("LOCK_MUTEX recovery failed, %s", mdb_strerror(rc))); DPRINTF(("LOCK_MUTEX recovery failed, %s", mdb_strerror(rc)));
UNLOCK_MUTEX(mutex); UNLOCK_MUTEX(mutex);

Loading…
Cancel
Save