Add MDB_ROBUST

vl32b
Hallvard Furuseth 11 years ago committed by Howard Chu
parent a7639a66a4
commit a2ac10107e
  1. 16
      libraries/liblmdb/lmdb.h
  2. 226
      libraries/liblmdb/mdb.c

@ -49,7 +49,9 @@
* stale locks can block further operation. * stale locks can block further operation.
* *
* Fix: Check for stale readers periodically, using the * Fix: Check for stale readers periodically, using the
* #mdb_reader_check function or the \ref mdb_stat_1 "mdb_stat" tool. Or just * #mdb_reader_check function or the \ref mdb_stat_1 "mdb_stat" tool.
* Catch stale
* locks with option MDB_ROBUST if supported (non-BSD). Or just
* make all programs using the database close it; the lockfile * make all programs using the database close it; the lockfile
* is always reset on first open of the environment. * is always reset on first open of the environment.
* *
@ -105,6 +107,7 @@
* The transaction becomes "long-lived" as above until a check * The transaction becomes "long-lived" as above until a check
* for stale readers is performed or the lockfile is reset, * for stale readers is performed or the lockfile is reset,
* since the process may not remove it from the lockfile. * since the process may not remove it from the lockfile.
* Except write-transactions on Unix with MDB_ROBUST or on Windows.
* *
* - If you do that anyway, do a periodic check for stale readers. Or * - If you do that anyway, do a periodic check for stale readers. Or
* close the environment once in a while, so the lockfile can get reset. * close the environment once in a while, so the lockfile can get reset.
@ -287,6 +290,8 @@ typedef void (MDB_rel_func)(MDB_val *item, void *oldptr, void *newptr, void *rel
#define MDB_NORDAHEAD 0x800000 #define MDB_NORDAHEAD 0x800000
/** don't initialize malloc'd memory before writing to datafile */ /** don't initialize malloc'd memory before writing to datafile */
#define MDB_NOMEMINIT 0x1000000 #define MDB_NOMEMINIT 0x1000000
/** catch stale locks if supported (not on BSD, needs robust mutexes) */
#define MDB_ROBUST 0x2000000
/** @} */ /** @} */
/** @defgroup mdb_dbi_open Database Flags /** @defgroup mdb_dbi_open Database Flags
@ -391,7 +396,7 @@ typedef enum MDB_cursor_op {
#define MDB_PAGE_NOTFOUND (-30797) #define MDB_PAGE_NOTFOUND (-30797)
/** Located page was wrong type */ /** Located page was wrong type */
#define MDB_CORRUPTED (-30796) #define MDB_CORRUPTED (-30796)
/** Update of meta page failed, probably I/O error */ /** Update of meta page failed or environment had fatal error */
#define MDB_PANIC (-30795) #define MDB_PANIC (-30795)
/** Environment version mismatch */ /** Environment version mismatch */
#define MDB_VERSION_MISMATCH (-30794) #define MDB_VERSION_MISMATCH (-30794)
@ -511,6 +516,12 @@ int mdb_env_create(MDB_env **env);
* Open the environment in read-only mode. No write operations will be * Open the environment in read-only mode. No write operations will be
* allowed. LMDB will still modify the lock file - except on read-only * allowed. LMDB will still modify the lock file - except on read-only
* filesystems, where LMDB does not use locks. * filesystems, where LMDB does not use locks.
* <li>#MDB_ROBUST
* Initialize the lockfile to catch stale locks if robust mutexes
* are supported, so aborted processes will not block others.
* Ignored when another process has the environment open. Unsupported
* by liblmdb built with MDB_USE_POSIX_SEM (such as BSD systems).
* Enabled by default on Windows. Some locking slowdown on Unix.
* <li>#MDB_WRITEMAP * <li>#MDB_WRITEMAP
* Use a writeable memory map unless MDB_RDONLY is set. This is faster * Use a writeable memory map unless MDB_RDONLY is set. This is faster
* and uses fewer mallocs, but loses protection from application bugs * and uses fewer mallocs, but loses protection from application bugs
@ -727,6 +738,7 @@ void mdb_env_close(MDB_env *env);
* This may be used to set some flags in addition to those from * This may be used to set some flags in addition to those from
* #mdb_env_open(), or to unset these flags. If several threads * #mdb_env_open(), or to unset these flags. If several threads
* change the flags at the same time, the result is undefined. * change the flags at the same time, the result is undefined.
* Most flags cannot be changed after #mdb_env_open().
* @param[in] env An environment handle returned by #mdb_env_create() * @param[in] env An environment handle returned by #mdb_env_create()
* @param[in] flags The flags to change, bitwise OR'ed together * @param[in] flags The flags to change, bitwise OR'ed together
* @param[in] onoff A non-zero value sets the flags, zero clears them. * @param[in] onoff A non-zero value sets the flags, zero clears them.

@ -191,6 +191,10 @@ extern int cacheflush(char *addr, int nbytes, int cache);
/** Features under development */ /** Features under development */
#ifndef MDB_DEVEL #ifndef MDB_DEVEL
#define MDB_DEVEL 0 #define MDB_DEVEL 0
#endif
#if MDB_DEVEL && (defined(_WIN32) || (defined(EOWNERDEAD) && !defined(MDB_USE_POSIX_SEM)))
#define MDB_ROBUST_SUPPORTED 1
#endif #endif
/** Wrapper around __func__, which is a C99 feature */ /** Wrapper around __func__, which is a C99 feature */
@ -210,6 +214,7 @@ extern int cacheflush(char *addr, int nbytes, int cache);
#define pthread_t HANDLE #define pthread_t HANDLE
#define pthread_mutex_t HANDLE #define pthread_mutex_t HANDLE
#define pthread_cond_t HANDLE #define pthread_cond_t HANDLE
typedef HANDLE mdb_mutex_t;
#define pthread_key_t DWORD #define pthread_key_t DWORD
#define pthread_self() GetCurrentThreadId() #define pthread_self() GetCurrentThreadId()
#define pthread_key_create(x,y) \ #define pthread_key_create(x,y) \
@ -217,16 +222,16 @@ extern int cacheflush(char *addr, int nbytes, int cache);
#define pthread_key_delete(x) TlsFree(x) #define pthread_key_delete(x) TlsFree(x)
#define pthread_getspecific(x) TlsGetValue(x) #define pthread_getspecific(x) TlsGetValue(x)
#define pthread_setspecific(x,y) (TlsSetValue(x,y) ? 0 : ErrCode()) #define pthread_setspecific(x,y) (TlsSetValue(x,y) ? 0 : ErrCode())
#define pthread_mutex_consistent(mutex) 0
#define pthread_mutex_unlock(x) ReleaseMutex(*x) #define pthread_mutex_unlock(x) ReleaseMutex(*x)
#define pthread_mutex_lock(x) WaitForSingleObject(*x, INFINITE) #define pthread_mutex_lock(x) WaitForSingleObject(*x, INFINITE)
#define pthread_cond_signal(x) SetEvent(*x) #define pthread_cond_signal(x) SetEvent(*x)
#define pthread_cond_wait(cond,mutex) do{SignalObjectAndWait(*mutex, *cond, INFINITE, FALSE); WaitForSingleObject(*mutex, INFINITE);}while(0) #define pthread_cond_wait(cond,mutex) do{SignalObjectAndWait(*mutex, *cond, INFINITE, FALSE); WaitForSingleObject(*mutex, INFINITE);}while(0)
#define THREAD_CREATE(thr,start,arg) thr=CreateThread(NULL,0,start,arg,0,NULL) #define THREAD_CREATE(thr,start,arg) thr=CreateThread(NULL,0,start,arg,0,NULL)
#define THREAD_FINISH(thr) WaitForSingleObject(thr, INFINITE) #define THREAD_FINISH(thr) WaitForSingleObject(thr, INFINITE)
#define LOCK_MUTEX_R(env) pthread_mutex_lock(&(env)->me_rmutex) #define MDB_MUTEX(env, rw) ((env)->me_##rw##mutex)
#define UNLOCK_MUTEX_R(env) pthread_mutex_unlock(&(env)->me_rmutex) #define LOCK_MUTEX0(mutex) WaitForSingleObject(mutex, INFINITE)
#define LOCK_MUTEX_W(env) pthread_mutex_lock(&(env)->me_wmutex) #define UNLOCK_MUTEX(mutex) ReleaseMutex(mutex)
#define UNLOCK_MUTEX_W(env) pthread_mutex_unlock(&(env)->me_wmutex)
#define getpid() GetCurrentProcessId() #define getpid() GetCurrentProcessId()
#define MDB_FDATASYNC(fd) (!FlushFileBuffers(fd)) #define MDB_FDATASYNC(fd) (!FlushFileBuffers(fd))
#define MDB_MSYNC(addr,len,flags) (!FlushViewOfFile(addr,len)) #define MDB_MSYNC(addr,len,flags) (!FlushViewOfFile(addr,len))
@ -251,10 +256,10 @@ extern int cacheflush(char *addr, int nbytes, int cache);
#ifdef MDB_USE_POSIX_SEM #ifdef MDB_USE_POSIX_SEM
#define LOCK_MUTEX_R(env) mdb_sem_wait((env)->me_rmutex) typedef sem_t *mdb_mutex_t;
#define UNLOCK_MUTEX_R(env) sem_post((env)->me_rmutex) #define MDB_MUTEX(env, rw) ((env)->me_##rw##mutex)
#define LOCK_MUTEX_W(env) mdb_sem_wait((env)->me_wmutex) #define LOCK_MUTEX0(mutex) mdb_sem_wait(mutex)
#define UNLOCK_MUTEX_W(env) sem_post((env)->me_wmutex) #define UNLOCK_MUTEX(mutex) sem_post(mutex)
static int static int
mdb_sem_wait(sem_t *sem) mdb_sem_wait(sem_t *sem)
@ -265,21 +270,19 @@ mdb_sem_wait(sem_t *sem)
} }
#else #else
/** Lock the reader mutex. /** Pointer/HANDLE type of shared mutex/semaphore.
*/ */
#define LOCK_MUTEX_R(env) pthread_mutex_lock(&(env)->me_txns->mti_mutex) typedef pthread_mutex_t *mdb_mutex_t;
/** Unlock the reader mutex. /** Mutex for the reader table (rw = r) or write transaction (rw = w).
*/ */
#define UNLOCK_MUTEX_R(env) pthread_mutex_unlock(&(env)->me_txns->mti_mutex) #define MDB_MUTEX(env, rw) (&(env)->me_txns->mti_##rw##mutex)
/** Lock the reader or writer mutex.
/** Lock the writer mutex. * Returns 0 or a code to give #mdb_mutex_failed(), as in #LOCK_MUTEX().
* Only a single write transaction is allowed at a time. Other writers
* will block waiting for this mutex.
*/ */
#define LOCK_MUTEX_W(env) pthread_mutex_lock(&(env)->me_txns->mti_wmutex) #define LOCK_MUTEX0(mutex) pthread_mutex_lock(mutex)
/** Unlock the writer mutex. /** Unlock the reader or writer mutex.
*/ */
#define UNLOCK_MUTEX_W(env) pthread_mutex_unlock(&(env)->me_txns->mti_wmutex) #define UNLOCK_MUTEX(mutex) pthread_mutex_unlock(mutex)
#endif /* MDB_USE_POSIX_SEM */ #endif /* MDB_USE_POSIX_SEM */
/** Get the error code for the last failed system function. /** Get the error code for the last failed system function.
@ -313,6 +316,19 @@ mdb_sem_wait(sem_t *sem)
/** @} */ /** @} */
#ifdef MDB_ROBUST_SUPPORTED
/** Lock mutex, handle any error, set rc = result.
* Return 0 on success, nonzero (not rc) on error.
*/
#define LOCK_MUTEX(rc, env, mutex) \
(((rc) = LOCK_MUTEX0(mutex)) && \
((rc) = mdb_mutex_failed(env, mutex, rc)))
static int mdb_mutex_failed(MDB_env *env, mdb_mutex_t mutex, int rc);
#else
#define LOCK_MUTEX(rc, env, mutex) ((rc) = LOCK_MUTEX0(mutex))
#define mdb_mutex_failed(env, mutex, rc) (rc)
#endif
#ifndef _WIN32 #ifndef _WIN32
/** A flag for opening a file and requesting synchronous data writes. /** A flag for opening a file and requesting synchronous data writes.
* This is only used when writing a meta page. It's not strictly needed; * This is only used when writing a meta page. It's not strictly needed;
@ -436,7 +452,7 @@ static txnid_t mdb_debug_start;
/** The version number for a database's datafile format. */ /** The version number for a database's datafile format. */
#define MDB_DATA_VERSION ((MDB_DEVEL) ? 999 : 1) #define MDB_DATA_VERSION ((MDB_DEVEL) ? 999 : 1)
/** The version number for a database's lockfile format. */ /** The version number for a database's lockfile format. */
#define MDB_LOCK_VERSION 1 #define MDB_LOCK_VERSION ((MDB_DEVEL) ? 999 : 1)
/** @brief The max size of a key we can write, or 0 for dynamic max. /** @brief The max size of a key we can write, or 0 for dynamic max.
* *
@ -624,9 +640,9 @@ typedef struct MDB_txbody {
char mtb_rmname[MNAME_LEN]; char mtb_rmname[MNAME_LEN];
#else #else
/** Mutex protecting access to this table. /** Mutex protecting access to this table.
* This is the reader lock that #LOCK_MUTEX_R acquires. * This is the #MDB_MUTEX(env,r) reader table lock.
*/ */
pthread_mutex_t mtb_mutex; pthread_mutex_t mtb_rmutex;
#endif #endif
/** The ID of the last transaction committed to the database. /** The ID of the last transaction committed to the database.
* This is recorded here only for convenience; the value can always * This is recorded here only for convenience; the value can always
@ -638,6 +654,8 @@ typedef struct MDB_txbody {
* when readers release their slots. * when readers release their slots.
*/ */
unsigned mtb_numreaders; unsigned mtb_numreaders;
/** Flags which the lock file was initialized with. */
unsigned mtb_flags;
} MDB_txbody; } MDB_txbody;
/** The actual reader table definition. */ /** The actual reader table definition. */
@ -646,10 +664,11 @@ typedef struct MDB_txninfo {
MDB_txbody mtb; MDB_txbody mtb;
#define mti_magic mt1.mtb.mtb_magic #define mti_magic mt1.mtb.mtb_magic
#define mti_format mt1.mtb.mtb_format #define mti_format mt1.mtb.mtb_format
#define mti_mutex mt1.mtb.mtb_mutex #define mti_rmutex mt1.mtb.mtb_rmutex
#define mti_rmname mt1.mtb.mtb_rmname #define mti_rmname mt1.mtb.mtb_rmname
#define mti_txnid mt1.mtb.mtb_txnid #define mti_txnid mt1.mtb.mtb_txnid
#define mti_numreaders mt1.mtb.mtb_numreaders #define mti_numreaders mt1.mtb.mtb_numreaders
#define mti_flags mt1.mtb.mtb_flags
char pad[(sizeof(MDB_txbody)+CACHELINE-1) & ~(CACHELINE-1)]; char pad[(sizeof(MDB_txbody)+CACHELINE-1) & ~(CACHELINE-1)];
} mt1; } mt1;
union { union {
@ -1137,11 +1156,11 @@ struct MDB_env {
int me_live_reader; /**< have liveness lock in reader table */ int me_live_reader; /**< have liveness lock in reader table */
#ifdef _WIN32 #ifdef _WIN32
int me_pidquery; /**< Used in OpenProcess */ int me_pidquery; /**< Used in OpenProcess */
HANDLE me_rmutex; /* Windows mutexes don't reside in shared mem */ #endif
HANDLE me_wmutex; #if defined(_WIN32) || defined(MDB_USE_POSIX_SEM)
#elif defined(MDB_USE_POSIX_SEM) /* Windows mutexes/POSIX semaphores do not reside in shared mem */
sem_t *me_rmutex; /* Shared mutexes are not supported */ mdb_mutex_t me_rmutex;
sem_t *me_wmutex; mdb_mutex_t me_wmutex;
#endif #endif
void *me_userctx; /**< User-settable context */ void *me_userctx; /**< User-settable context */
MDB_assert_func *me_assert_func; /**< Callback for assertion failures */ MDB_assert_func *me_assert_func; /**< Callback for assertion failures */
@ -1230,6 +1249,7 @@ static void mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node);
static int mdb_drop0(MDB_cursor *mc, int subs); static int mdb_drop0(MDB_cursor *mc, int subs);
static void mdb_default_cmp(MDB_txn *txn, MDB_dbi dbi); static void mdb_default_cmp(MDB_txn *txn, MDB_dbi dbi);
static int mdb_reader_check0(MDB_env *env, int rlocked, int *dead);
/** @cond */ /** @cond */
static MDB_cmp_func mdb_cmp_memn, mdb_cmp_memnr, mdb_cmp_int, mdb_cmp_cint, mdb_cmp_long; static MDB_cmp_func mdb_cmp_memn, mdb_cmp_memnr, mdb_cmp_int, mdb_cmp_cint, mdb_cmp_long;
@ -1257,7 +1277,7 @@ static char *const mdb_errstr[] = {
"MDB_NOTFOUND: No matching key/data pair found", "MDB_NOTFOUND: No matching key/data pair found",
"MDB_PAGE_NOTFOUND: Requested page not found", "MDB_PAGE_NOTFOUND: Requested page not found",
"MDB_CORRUPTED: Located page was wrong type", "MDB_CORRUPTED: Located page was wrong type",
"MDB_PANIC: Update of meta page failed", "MDB_PANIC: Update of meta page failed or environment had fatal error",
"MDB_VERSION_MISMATCH: Database environment version mismatch", "MDB_VERSION_MISMATCH: Database environment version mismatch",
"MDB_INVALID: File is not an LMDB file", "MDB_INVALID: File is not an LMDB file",
"MDB_MAP_FULL: Environment mapsize limit reached", "MDB_MAP_FULL: Environment mapsize limit reached",
@ -2490,6 +2510,7 @@ mdb_txn_renew0(MDB_txn *txn)
} else { } else {
MDB_PID_T pid = env->me_pid; MDB_PID_T pid = env->me_pid;
MDB_THR_T tid = pthread_self(); MDB_THR_T tid = pthread_self();
mdb_mutex_t rmutex = MDB_MUTEX(env, r);
if (!env->me_live_reader) { if (!env->me_live_reader) {
rc = mdb_reader_pid(env, Pidset, pid); rc = mdb_reader_pid(env, Pidset, pid);
@ -2498,13 +2519,14 @@ mdb_txn_renew0(MDB_txn *txn)
env->me_live_reader = 1; env->me_live_reader = 1;
} }
LOCK_MUTEX_R(env); if (LOCK_MUTEX(rc, env, rmutex))
return rc;
nr = ti->mti_numreaders; nr = ti->mti_numreaders;
for (i=0; i<nr; i++) for (i=0; i<nr; i++)
if (ti->mti_readers[i].mr_pid == 0) if (ti->mti_readers[i].mr_pid == 0)
break; break;
if (i == env->me_maxreaders) { if (i == env->me_maxreaders) {
UNLOCK_MUTEX_R(env); UNLOCK_MUTEX(rmutex);
return MDB_READERS_FULL; return MDB_READERS_FULL;
} }
ti->mti_readers[i].mr_pid = pid; ti->mti_readers[i].mr_pid = pid;
@ -2513,7 +2535,7 @@ mdb_txn_renew0(MDB_txn *txn)
ti->mti_numreaders = ++nr; ti->mti_numreaders = ++nr;
/* Save numreaders for un-mutexed mdb_env_close() */ /* Save numreaders for un-mutexed mdb_env_close() */
env->me_numreaders = nr; env->me_numreaders = nr;
UNLOCK_MUTEX_R(env); UNLOCK_MUTEX(rmutex);
r = &ti->mti_readers[i]; r = &ti->mti_readers[i];
new_notls = (env->me_flags & MDB_NOTLS); new_notls = (env->me_flags & MDB_NOTLS);
@ -2528,7 +2550,9 @@ mdb_txn_renew0(MDB_txn *txn)
} }
} else { } else {
if (ti) { if (ti) {
LOCK_MUTEX_W(env); mdb_mutex_t wmutex = MDB_MUTEX(env, w);
if (LOCK_MUTEX(rc, env, wmutex))
return rc;
txn->mt_txnid = ti->mti_txnid; txn->mt_txnid = ti->mti_txnid;
meta = env->me_metas[txn->mt_txnid & 1]; meta = env->me_metas[txn->mt_txnid & 1];
@ -2798,7 +2822,7 @@ mdb_txn_reset0(MDB_txn *txn, const char *act)
env->me_txn = NULL; env->me_txn = NULL;
/* The writer mutex was locked in mdb_txn_begin. */ /* The writer mutex was locked in mdb_txn_begin. */
if (env->me_txns) if (env->me_txns)
UNLOCK_MUTEX_W(env); UNLOCK_MUTEX(MDB_MUTEX(env, w));
} }
} }
@ -3382,7 +3406,7 @@ done:
mdb_dbis_update(txn, 1); mdb_dbis_update(txn, 1);
if (env->me_txns) if (env->me_txns)
UNLOCK_MUTEX_W(env); UNLOCK_MUTEX(MDB_MUTEX(env, w));
if (txn != env->me_txn0) if (txn != env->me_txn0)
free(txn); free(txn);
@ -4293,6 +4317,7 @@ mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
if (!env->me_rmutex) goto fail_errno; if (!env->me_rmutex) goto fail_errno;
env->me_wmutex = CreateMutex(&mdb_all_sa, FALSE, env->me_txns->mti_wmname); env->me_wmutex = CreateMutex(&mdb_all_sa, FALSE, env->me_txns->mti_wmname);
if (!env->me_wmutex) goto fail_errno; if (!env->me_wmutex) goto fail_errno;
env->me_flags |= MDB_ROBUST;
#elif defined(MDB_USE_POSIX_SEM) #elif defined(MDB_USE_POSIX_SEM)
struct stat stbuf; struct stat stbuf;
struct { struct {
@ -4332,16 +4357,24 @@ mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
if ((rc = pthread_mutexattr_init(&mattr)) if ((rc = pthread_mutexattr_init(&mattr))
|| (rc = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED)) || (rc = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED))
|| (rc = pthread_mutex_init(&env->me_txns->mti_mutex, &mattr)) #ifdef MDB_ROBUST_SUPPORTED
|| ((env->me_flags & MDB_ROBUST) &&
(rc = pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST)))
#endif
|| (rc = pthread_mutex_init(&env->me_txns->mti_rmutex, &mattr))
|| (rc = pthread_mutex_init(&env->me_txns->mti_wmutex, &mattr))) || (rc = pthread_mutex_init(&env->me_txns->mti_wmutex, &mattr)))
goto fail; goto fail;
pthread_mutexattr_destroy(&mattr); pthread_mutexattr_destroy(&mattr);
#endif /* _WIN32 || MDB_USE_POSIX_SEM */ #endif /* _WIN32 || MDB_USE_POSIX_SEM */
#ifndef MDB_ROBUST_SUPPORTED
env->me_flags &= ~MDB_ROBUST;
#endif
env->me_txns->mti_magic = MDB_MAGIC; env->me_txns->mti_magic = MDB_MAGIC;
env->me_txns->mti_format = MDB_LOCK_FORMAT; env->me_txns->mti_format = MDB_LOCK_FORMAT;
env->me_txns->mti_txnid = 0; env->me_txns->mti_txnid = 0;
env->me_txns->mti_numreaders = 0; env->me_txns->mti_numreaders = 0;
env->me_txns->mti_flags = env->me_flags;
} else { } else {
if (env->me_txns->mti_magic != MDB_MAGIC) { if (env->me_txns->mti_magic != MDB_MAGIC) {
@ -4359,6 +4392,8 @@ mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
if (rc && rc != EACCES && rc != EAGAIN) { if (rc && rc != EACCES && rc != EAGAIN) {
goto fail; goto fail;
} }
env->me_flags = (env->me_flags & ~MDB_ROBUST) |
(env->me_txns->mti_flags & MDB_ROBUST);
#ifdef _WIN32 #ifdef _WIN32
env->me_rmutex = OpenMutex(SYNCHRONIZE, FALSE, env->me_txns->mti_rmname); env->me_rmutex = OpenMutex(SYNCHRONIZE, FALSE, env->me_txns->mti_rmname);
if (!env->me_rmutex) goto fail_errno; if (!env->me_rmutex) goto fail_errno;
@ -4390,8 +4425,13 @@ fail:
* environment and re-opening it with the new flags. * environment and re-opening it with the new flags.
*/ */
#define CHANGEABLE (MDB_NOSYNC|MDB_NOMETASYNC|MDB_MAPASYNC|MDB_NOMEMINIT) #define CHANGEABLE (MDB_NOSYNC|MDB_NOMETASYNC|MDB_MAPASYNC|MDB_NOMEMINIT)
#define CHANGELESS (MDB_FIXEDMAP|MDB_NOSUBDIR|MDB_RDONLY|MDB_WRITEMAP| \ #define CHANGELESS (MDB_FIXEDMAP|MDB_NOSUBDIR|MDB_RDONLY|ROBUST_FLAG| \
MDB_NOTLS|MDB_NOLOCK|MDB_NORDAHEAD) MDB_WRITEMAP|MDB_NOTLS|MDB_NOLOCK|MDB_NORDAHEAD)
#ifdef MDB_ROBUST_SUPPORTED
#define ROBUST_FLAG MDB_ROBUST
#else
#define ROBUST_FLAG 0
#endif
#if VALID_FLAGS & PERSISTENT_FLAGS & (CHANGEABLE|CHANGELESS) #if VALID_FLAGS & PERSISTENT_FLAGS & (CHANGEABLE|CHANGELESS)
# error "Persistent DB flags & env flags overlap, but both go in mm_flags" # error "Persistent DB flags & env flags overlap, but both go in mm_flags"
@ -4632,7 +4672,6 @@ mdb_env_close0(MDB_env *env, int excl)
env->me_flags &= ~(MDB_ENV_ACTIVE|MDB_ENV_TXKEY); env->me_flags &= ~(MDB_ENV_ACTIVE|MDB_ENV_TXKEY);
} }
void ESECT void ESECT
mdb_env_close(MDB_env *env) mdb_env_close(MDB_env *env)
{ {
@ -8622,6 +8661,7 @@ static int ESECT
mdb_env_copyfd0(MDB_env *env, HANDLE fd) mdb_env_copyfd0(MDB_env *env, HANDLE fd)
{ {
MDB_txn *txn = NULL; MDB_txn *txn = NULL;
mdb_mutex_t wmutex = NULL;
int rc; int rc;
size_t wsize; size_t wsize;
char *ptr; char *ptr;
@ -8646,11 +8686,13 @@ mdb_env_copyfd0(MDB_env *env, HANDLE fd)
mdb_txn_reset0(txn, "reset-stage1"); mdb_txn_reset0(txn, "reset-stage1");
/* Temporarily block writers until we snapshot the meta pages */ /* Temporarily block writers until we snapshot the meta pages */
LOCK_MUTEX_W(env); wmutex = MDB_MUTEX(env, w);
if (LOCK_MUTEX(rc, env, wmutex))
goto leave;
rc = mdb_txn_renew0(txn); rc = mdb_txn_renew0(txn);
if (rc) { if (rc) {
UNLOCK_MUTEX_W(env); UNLOCK_MUTEX(wmutex);
goto leave; goto leave;
} }
} }
@ -8674,8 +8716,8 @@ mdb_env_copyfd0(MDB_env *env, HANDLE fd)
break; break;
} }
} }
if (env->me_txns) if (wmutex)
UNLOCK_MUTEX_W(env); UNLOCK_MUTEX(wmutex);
if (rc) if (rc)
goto leave; goto leave;
@ -8806,7 +8848,7 @@ mdb_env_copy(MDB_env *env, const char *path)
int ESECT int ESECT
mdb_env_set_flags(MDB_env *env, unsigned int flag, int onoff) mdb_env_set_flags(MDB_env *env, unsigned int flag, int onoff)
{ {
if ((flag & CHANGEABLE) != flag) if (flag & (env->me_map ? ~CHANGEABLE : ~(CHANGEABLE|CHANGELESS)))
return EINVAL; return EINVAL;
if (onoff) if (onoff)
env->me_flags |= flag; env->me_flags |= flag;
@ -9360,17 +9402,22 @@ mdb_pid_insert(MDB_PID_T *ids, MDB_PID_T pid)
int ESECT int ESECT
mdb_reader_check(MDB_env *env, int *dead) mdb_reader_check(MDB_env *env, int *dead)
{ {
unsigned int i, j, rdrs;
MDB_reader *mr;
MDB_PID_T *pids, pid;
int count = 0;
if (!env) if (!env)
return EINVAL; return EINVAL;
if (dead) if (dead)
*dead = 0; *dead = 0;
if (!env->me_txns) return env->me_txns ? mdb_reader_check0(env, 0, dead) : MDB_SUCCESS;
return MDB_SUCCESS; }
/** As #mdb_reader_check(). rlocked = <caller locked the reader mutex>. */
static int mdb_reader_check0(MDB_env *env, int rlocked, int *dead)
{
mdb_mutex_t rmutex = rlocked ? NULL : MDB_MUTEX(env, r);
unsigned int i, j, rdrs;
MDB_reader *mr;
pid_t *pids, pid;
int rc = MDB_SUCCESS, count = 0;
rdrs = env->me_txns->mti_numreaders; rdrs = env->me_txns->mti_numreaders;
pids = malloc((rdrs+1) * sizeof(MDB_PID_T)); pids = malloc((rdrs+1) * sizeof(MDB_PID_T));
if (!pids) if (!pids)
@ -9378,22 +9425,32 @@ mdb_reader_check(MDB_env *env, int *dead)
pids[0] = 0; pids[0] = 0;
mr = env->me_txns->mti_readers; mr = env->me_txns->mti_readers;
for (i=0; i<rdrs; i++) { for (i=0; i<rdrs; i++) {
if (mr[i].mr_pid && mr[i].mr_pid != env->me_pid) {
pid = mr[i].mr_pid; pid = mr[i].mr_pid;
if (pid && pid != env->me_pid) {
if (mdb_pid_insert(pids, pid) == 0) { if (mdb_pid_insert(pids, pid) == 0) {
if (!mdb_reader_pid(env, Pidcheck, pid)) { if (!mdb_reader_pid(env, Pidcheck, pid)) {
LOCK_MUTEX_R(env); /* Stale reader found */
j = i;
if (rmutex) {
if ((rc = LOCK_MUTEX0(rmutex)) != 0) {
if ((rc = mdb_mutex_failed(env, rmutex, rc)))
break;
rdrs = 0; /* the above checked all readers */
} else {
/* Recheck, a new process may have reused pid */ /* Recheck, a new process may have reused pid */
if (!mdb_reader_pid(env, Pidcheck, pid)) { if (mdb_reader_pid(env, Pidcheck, pid))
for (j=i; j<rdrs; j++) j = rdrs;
}
}
for (; j<rdrs; j++)
if (mr[j].mr_pid == pid) { if (mr[j].mr_pid == pid) {
DPRINTF(("clear stale reader pid %u txn %"Z"d", DPRINTF(("clear stale reader pid %u txn %"Z"d",
(unsigned) pid, mr[j].mr_txnid)); (unsigned) pid, mr[j].mr_txnid));
mr[j].mr_pid = 0; mr[j].mr_pid = 0;
count++; count++;
} }
} if (rmutex)
UNLOCK_MUTEX_R(env); UNLOCK_MUTEX(rmutex);
} }
} }
} }
@ -9401,6 +9458,59 @@ mdb_reader_check(MDB_env *env, int *dead)
free(pids); free(pids);
if (dead) if (dead)
*dead = count; *dead = count;
return MDB_SUCCESS; return rc;
}
#ifdef MDB_ROBUST_SUPPORTED
/** Handle #LOCK_MUTEX0() failure.
* With #MDB_ROBUST, try to repair the lock file if the mutex owner died.
* @param[in] env the environment handle
* @param[in] mutex LOCK_MUTEX0() mutex
* @param[in] rc LOCK_MUTEX0() error (nonzero)
* @return 0 on success with the mutex locked, or an error code on failure.
*/
static int mdb_mutex_failed(MDB_env *env, mdb_mutex_t mutex, int rc)
{
int toggle, rlocked, rc2;
#ifndef _WIN32
enum { WAIT_ABANDONED = EOWNERDEAD };
#endif
if (rc == (int) WAIT_ABANDONED) {
/* We own the mutex. Clean up after dead previous owner. */
rc = MDB_SUCCESS;
rlocked = (mutex == MDB_MUTEX(env, r));
if (!rlocked) {
/* Keep mti_txnid updated, otherwise next writer can
* overwrite data which latest meta page refers to.
* TODO: Instead revert any aborted commit and sync?
*/
toggle = mdb_env_pick_meta(env);
env->me_txns->mti_txnid = env->me_metas[toggle]->mm_txnid;
/* env is hosed if the dead thread was ours */
if (env->me_txn) {
env->me_flags |= MDB_FATAL_ERROR;
env->me_txn = NULL;
rc = MDB_PANIC;
}
}
DPRINTF(("%cmutex owner died, %s", (rlocked ? 'r' : 'w'),
(rc ? "this process' env is hosed" : "recovering")));
rc2 = mdb_reader_check0(env, rlocked, NULL);
if (rc2 == 0)
rc2 = pthread_mutex_consistent(mutex);
if (rc || (rc = rc2)) {
DPRINTF(("LOCK_MUTEX recovery failed, %s", mdb_strerror(rc)));
UNLOCK_MUTEX(mutex);
}
} else {
#ifdef _WIN32
rc = ErrCode();
#endif
DPRINTF(("LOCK_MUTEX failed, %s", mdb_strerror(rc)));
}
return rc;
} }
#endif /* MDB_ROBUST_SUPPORTED */
/** @} */ /** @} */

Loading…
Cancel
Save