ITS#8209 MDB_CP_COMPACT: Threading/error handling

Handle errors.  Fix cond_wait condition so mc_new
is the sole control var.  Drop specious cond_waits.
Do not look at 'mo' while copythr writes it.

Don't know if posix_memalign() pointer is defined after failure.
Some _aligned_free() doc seems to say arg NULL = user error.
mdb.RE/0.9
Hallvard Furuseth 9 years ago
parent f24d7d2c83
commit 1d86235047
  1. 155
      libraries/liblmdb/mdb.c

@ -291,8 +291,10 @@ typedef HANDLE mdb_mutex_t, mdb_mutexref_t;
#define pthread_mutex_lock(x) WaitForSingleObject(*x, INFINITE) #define pthread_mutex_lock(x) WaitForSingleObject(*x, INFINITE)
#define pthread_cond_signal(x) SetEvent(*x) #define pthread_cond_signal(x) SetEvent(*x)
#define pthread_cond_wait(cond,mutex) do{SignalObjectAndWait(*mutex, *cond, INFINITE, FALSE); WaitForSingleObject(*mutex, INFINITE);}while(0) #define pthread_cond_wait(cond,mutex) do{SignalObjectAndWait(*mutex, *cond, INFINITE, FALSE); WaitForSingleObject(*mutex, INFINITE);}while(0)
#define THREAD_CREATE(thr,start,arg) thr=CreateThread(NULL,0,start,arg,0,NULL) #define THREAD_CREATE(thr,start,arg) \
#define THREAD_FINISH(thr) WaitForSingleObject(thr, INFINITE) (((thr) = CreateThread(NULL, 0, start, arg, 0, NULL)) ? 0 : ErrCode())
#define THREAD_FINISH(thr) \
(WaitForSingleObject(thr, INFINITE) ? ErrCode() : 0)
#define LOCK_MUTEX0(mutex) WaitForSingleObject(mutex, INFINITE) #define LOCK_MUTEX0(mutex) WaitForSingleObject(mutex, INFINITE)
#define UNLOCK_MUTEX(mutex) ReleaseMutex(mutex) #define UNLOCK_MUTEX(mutex) ReleaseMutex(mutex)
#define mdb_mutex_consistent(mutex) 0 #define mdb_mutex_consistent(mutex) 0
@ -8818,11 +8820,12 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
#ifndef MDB_WBUF #ifndef MDB_WBUF
#define MDB_WBUF (1024*1024) #define MDB_WBUF (1024*1024)
#endif #endif
#define MDB_EOF 0x10 /**< #mdb_env_copyfd1() is done reading */
/** State needed for a compacting copy. */ /** State needed for a double-buffering compacting copy. */
typedef struct mdb_copy { typedef struct mdb_copy {
pthread_mutex_t mc_mutex; pthread_mutex_t mc_mutex;
pthread_cond_t mc_cond; pthread_cond_t mc_cond; /**< Condition variable for #mc_new */
char *mc_wbuf[2]; char *mc_wbuf[2];
char *mc_over[2]; char *mc_over[2];
MDB_env *mc_env; MDB_env *mc_env;
@ -8831,10 +8834,12 @@ typedef struct mdb_copy {
int mc_olen[2]; int mc_olen[2];
pgno_t mc_next_pgno; pgno_t mc_next_pgno;
HANDLE mc_fd; HANDLE mc_fd;
int mc_status; int mc_toggle; /**< Buffer number in provider */
volatile int mc_new; int mc_new; /**< (0-2 buffers to write) | (#MDB_EOF at end) */
int mc_toggle; /** Error code. Never cleared if set. Both threads can set nonzero
* to fail the copy. Not mutex-protected, LMDB expects atomic int.
*/
volatile int mc_error;
} mdb_copy; } mdb_copy;
/** Dedicated writer thread for compacting copy. */ /** Dedicated writer thread for compacting copy. */
@ -8853,20 +8858,16 @@ mdb_env_copythr(void *arg)
#endif #endif
pthread_mutex_lock(&my->mc_mutex); pthread_mutex_lock(&my->mc_mutex);
my->mc_new = 0;
pthread_cond_signal(&my->mc_cond);
for(;;) { for(;;) {
while (!my->mc_new) while (!my->mc_new)
pthread_cond_wait(&my->mc_cond, &my->mc_mutex); pthread_cond_wait(&my->mc_cond, &my->mc_mutex);
if (my->mc_new < 0) { if (my->mc_new == 0 + MDB_EOF) /* 0 buffers, just EOF */
my->mc_new = 0;
break; break;
}
my->mc_new = 0;
wsize = my->mc_wlen[toggle]; wsize = my->mc_wlen[toggle];
ptr = my->mc_wbuf[toggle]; ptr = my->mc_wbuf[toggle];
again: again:
while (wsize > 0) { rc = MDB_SUCCESS;
while (wsize > 0 && !my->mc_error) {
DO_WRITE(rc, my->mc_fd, ptr, wsize, len); DO_WRITE(rc, my->mc_fd, ptr, wsize, len);
if (!rc) { if (!rc) {
rc = ErrCode(); rc = ErrCode();
@ -8882,8 +8883,7 @@ again:
} }
} }
if (rc) { if (rc) {
my->mc_status = rc; my->mc_error = rc;
break;
} }
/* If there's an overflow page tail, write it too */ /* If there's an overflow page tail, write it too */
if (my->mc_olen[toggle]) { if (my->mc_olen[toggle]) {
@ -8894,34 +8894,41 @@ again:
} }
my->mc_wlen[toggle] = 0; my->mc_wlen[toggle] = 0;
toggle ^= 1; toggle ^= 1;
/* Return the empty buffer to provider */
my->mc_new--;
pthread_cond_signal(&my->mc_cond); pthread_cond_signal(&my->mc_cond);
} }
pthread_cond_signal(&my->mc_cond);
pthread_mutex_unlock(&my->mc_mutex); pthread_mutex_unlock(&my->mc_mutex);
return (THREAD_RET)0; return (THREAD_RET)0;
#undef DO_WRITE #undef DO_WRITE
} }
/** Tell the writer thread there's a buffer ready to write */ /** Give buffer and/or #MDB_EOF to writer thread, await unused buffer.
*
* @param[in] my control structure.
* @param[in] adjust (1 to hand off 1 buffer) | (MDB_EOF when ending).
*/
static int ESECT static int ESECT
mdb_env_cthr_toggle(mdb_copy *my, int st) mdb_env_cthr_toggle(mdb_copy *my, int adjust)
{ {
int toggle = my->mc_toggle ^ 1;
pthread_mutex_lock(&my->mc_mutex); pthread_mutex_lock(&my->mc_mutex);
if (my->mc_status) { my->mc_new += adjust;
pthread_mutex_unlock(&my->mc_mutex);
return my->mc_status;
}
while (my->mc_new == 1)
pthread_cond_wait(&my->mc_cond, &my->mc_mutex);
my->mc_new = st;
my->mc_toggle = toggle;
pthread_cond_signal(&my->mc_cond); pthread_cond_signal(&my->mc_cond);
while (my->mc_new & 2) /* both buffers in use */
pthread_cond_wait(&my->mc_cond, &my->mc_mutex);
pthread_mutex_unlock(&my->mc_mutex); pthread_mutex_unlock(&my->mc_mutex);
return 0;
my->mc_toggle ^= (adjust & 1);
/* Both threads reset mc_wlen, to be safe from threading errors */
my->mc_wlen[my->mc_toggle] = 0;
return my->mc_error;
} }
/** Depth-first tree traversal for compacting copy. */ /** Depth-first tree traversal for compacting copy.
* @param[in] my control structure.
* @param[in,out] pg database root.
* @param[in] flags includes #F_DUPDATA if it is a sorted-duplicate sub-DB.
*/
static int ESECT static int ESECT
mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags) mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags)
{ {
@ -8983,6 +8990,7 @@ mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags)
} }
memcpy(&pg, NODEDATA(ni), sizeof(pg)); memcpy(&pg, NODEDATA(ni), sizeof(pg));
memcpy(NODEDATA(ni), &my->mc_next_pgno, sizeof(pgno_t));
rc = mdb_page_get(&mc, pg, &omp, NULL); rc = mdb_page_get(&mc, pg, &omp, NULL);
if (rc) if (rc)
goto done; goto done;
@ -9005,7 +9013,6 @@ mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags)
goto done; goto done;
toggle = my->mc_toggle; toggle = my->mc_toggle;
} }
memcpy(NODEDATA(ni), &mo->mp_pgno, sizeof(pgno_t));
} else if (ni->mn_flags & F_SUBDATA) { } else if (ni->mn_flags & F_SUBDATA) {
MDB_db db; MDB_db db;
@ -9083,47 +9090,55 @@ mdb_env_copyfd1(MDB_env *env, HANDLE fd)
{ {
MDB_meta *mm; MDB_meta *mm;
MDB_page *mp; MDB_page *mp;
mdb_copy my; mdb_copy my = {0};
MDB_txn *txn = NULL; MDB_txn *txn = NULL;
pthread_t thr; pthread_t thr;
int rc; int rc = MDB_SUCCESS;
#ifdef _WIN32 #ifdef _WIN32
my.mc_mutex = CreateMutex(NULL, FALSE, NULL); if (!(my.mc_mutex = CreateMutex(NULL, FALSE, NULL)) ||
my.mc_cond = CreateEvent(NULL, FALSE, FALSE, NULL); !(my.mc_cond = CreateEvent(NULL, FALSE, FALSE, NULL))) {
rc = ErrCode();
goto done;
}
my.mc_wbuf[0] = _aligned_malloc(MDB_WBUF*2, env->me_os_psize); my.mc_wbuf[0] = _aligned_malloc(MDB_WBUF*2, env->me_os_psize);
if (my.mc_wbuf[0] == NULL) if (my.mc_wbuf[0] == NULL) {
return errno; /* _aligned_malloc() sets errno, but we use Windows error codes */
rc = ERROR_NOT_ENOUGH_MEMORY;
goto done;
}
#else #else
pthread_mutex_init(&my.mc_mutex, NULL); if ((rc = pthread_mutex_init(&my.mc_mutex, NULL)) != 0)
pthread_cond_init(&my.mc_cond, NULL); return rc;
if ((rc = pthread_cond_init(&my.mc_cond, NULL)) != 0)
goto done2;
#ifdef HAVE_MEMALIGN #ifdef HAVE_MEMALIGN
my.mc_wbuf[0] = memalign(env->me_os_psize, MDB_WBUF*2); my.mc_wbuf[0] = memalign(env->me_os_psize, MDB_WBUF*2);
if (my.mc_wbuf[0] == NULL) if (my.mc_wbuf[0] == NULL) {
return errno; rc = errno;
goto done;
}
#else #else
rc = posix_memalign((void **)&my.mc_wbuf[0], env->me_os_psize, MDB_WBUF*2); {
if (rc) void *p;
return rc; if ((rc = posix_memalign(&p, env->me_os_psize, MDB_WBUF*2)) != 0)
goto done;
my.mc_wbuf[0] = p;
}
#endif #endif
#endif #endif
memset(my.mc_wbuf[0], 0, MDB_WBUF*2); memset(my.mc_wbuf[0], 0, MDB_WBUF*2);
my.mc_wbuf[1] = my.mc_wbuf[0] + MDB_WBUF; my.mc_wbuf[1] = my.mc_wbuf[0] + MDB_WBUF;
my.mc_wlen[0] = 0;
my.mc_wlen[1] = 0;
my.mc_olen[0] = 0;
my.mc_olen[1] = 0;
my.mc_next_pgno = NUM_METAS; my.mc_next_pgno = NUM_METAS;
my.mc_status = 0;
my.mc_new = 1;
my.mc_toggle = 0;
my.mc_env = env; my.mc_env = env;
my.mc_fd = fd; my.mc_fd = fd;
THREAD_CREATE(thr, mdb_env_copythr, &my); rc = THREAD_CREATE(thr, mdb_env_copythr, &my);
if (rc)
goto done;
rc = mdb_txn_begin(env, NULL, MDB_RDONLY, &txn); rc = mdb_txn_begin(env, NULL, MDB_RDONLY, &txn);
if (rc) if (rc)
return rc; goto finish;
mp = (MDB_page *)my.mc_wbuf[0]; mp = (MDB_page *)my.mc_wbuf[0];
memset(mp, 0, NUM_METAS * env->me_psize); memset(mp, 0, NUM_METAS * env->me_psize);
@ -9149,6 +9164,8 @@ mdb_env_copyfd1(MDB_env *env, HANDLE fd)
mdb_cursor_init(&mc, txn, FREE_DBI, NULL); mdb_cursor_init(&mc, txn, FREE_DBI, NULL);
while ((rc = mdb_cursor_get(&mc, &key, &data, MDB_NEXT)) == 0) while ((rc = mdb_cursor_get(&mc, &key, &data, MDB_NEXT)) == 0)
freecount += *(MDB_ID *)data.mv_data; freecount += *(MDB_ID *)data.mv_data;
if (rc != MDB_NOTFOUND)
goto finish;
freecount += txn->mt_dbs[FREE_DBI].md_branch_pages + freecount += txn->mt_dbs[FREE_DBI].md_branch_pages +
txn->mt_dbs[FREE_DBI].md_leaf_pages + txn->mt_dbs[FREE_DBI].md_leaf_pages +
txn->mt_dbs[FREE_DBI].md_overflow_pages; txn->mt_dbs[FREE_DBI].md_overflow_pages;
@ -9165,31 +9182,27 @@ mdb_env_copyfd1(MDB_env *env, HANDLE fd)
} }
my.mc_wlen[0] = env->me_psize * NUM_METAS; my.mc_wlen[0] = env->me_psize * NUM_METAS;
my.mc_txn = txn; my.mc_txn = txn;
pthread_mutex_lock(&my.mc_mutex);
while(my.mc_new)
pthread_cond_wait(&my.mc_cond, &my.mc_mutex);
pthread_mutex_unlock(&my.mc_mutex);
rc = mdb_env_cwalk(&my, &txn->mt_dbs[MAIN_DBI].md_root, 0); rc = mdb_env_cwalk(&my, &txn->mt_dbs[MAIN_DBI].md_root, 0);
if (rc == MDB_SUCCESS && my.mc_wlen[my.mc_toggle])
rc = mdb_env_cthr_toggle(&my, 1);
mdb_env_cthr_toggle(&my, -1);
pthread_mutex_lock(&my.mc_mutex);
while(my.mc_new)
pthread_cond_wait(&my.mc_cond, &my.mc_mutex);
pthread_mutex_unlock(&my.mc_mutex);
THREAD_FINISH(thr);
finish:
if (rc)
my.mc_error = rc;
mdb_env_cthr_toggle(&my, 1 | MDB_EOF);
rc = THREAD_FINISH(thr);
mdb_txn_abort(txn); mdb_txn_abort(txn);
done:
#ifdef _WIN32 #ifdef _WIN32
CloseHandle(my.mc_cond); if (my.mc_wbuf[0]) _aligned_free(my.mc_wbuf[0]);
CloseHandle(my.mc_mutex); if (my.mc_cond) CloseHandle(my.mc_cond);
_aligned_free(my.mc_wbuf[0]); if (my.mc_mutex) CloseHandle(my.mc_mutex);
#else #else
free(my.mc_wbuf[0]);
pthread_cond_destroy(&my.mc_cond); pthread_cond_destroy(&my.mc_cond);
done2:
pthread_mutex_destroy(&my.mc_mutex); pthread_mutex_destroy(&my.mc_mutex);
free(my.mc_wbuf[0]);
#endif #endif
return rc; return rc ? rc : my.mc_error;
} }
/** Copy environment as-is. */ /** Copy environment as-is. */

Loading…
Cancel
Save