Remove mdb data races. Use (txnid_t)-1 as "no ID".

Avoid race between numreaders++ and reading numreaders at cleanup. Make
the un-mutexed reset of reader table entry, atomic: Reset mr_pid only.

Instead check mr_pid != 0 in mdb_page_alloc()'s scan for readers.
(txnid_t)-1 as "no ID"-mark avoids a check for mr_txnid != 0.
The scan can stop when seeing an old reader.
vmware
Hallvard Furuseth 12 years ago
parent 38cc1e96b4
commit a35f9b2a53
  1. 31
      libraries/libmdb/mdb.c

@ -433,7 +433,7 @@ typedef uint16_t indx_t;
* lock file.
*/
typedef struct MDB_rxbody {
/** The current Transaction ID when this transaction began.
/** Current Transaction ID when this transaction began, or (txnid_t)-1.
* Multiple readers that start at the same time will probably have the
* same ID here. Again, it's not important to exclude them from
* anything; all we need to know is which version of the DB they
@ -904,6 +904,7 @@ struct MDB_env {
uint32_t me_flags; /**< @ref mdb_env */
unsigned int me_psize; /**< size of a page, from #GET_PAGESIZE */
unsigned int me_maxreaders; /**< size of the reader table */
unsigned int me_numreaders; /**< max numreaders set by this env */
MDB_dbi me_numdbs; /**< number of DBs opened */
MDB_dbi me_maxdbs; /**< size of the DB table */
pid_t me_pid; /**< process ID of this env */
@ -1233,10 +1234,12 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp)
if (!txn->mt_env->me_pghead &&
txn->mt_dbs[FREE_DBI].md_root != P_INVALID) {
/* See if there's anything in the free DB */
int j;
MDB_reader *r;
MDB_cursor m2;
MDB_node *leaf;
MDB_val data;
txnid_t *kptr, oldest, last;
txnid_t *kptr, last;
mdb_cursor_init(&m2, txn, FREE_DBI, NULL);
if (!txn->mt_env->me_pgfirst) {
@ -1259,17 +1262,15 @@ again:
last = *(txnid_t *)key.mv_data;
}
{
unsigned int i;
oldest = txn->mt_txnid - 1;
for (i=0; i<txn->mt_env->me_txns->mti_numreaders; i++) {
txnid_t mr = txn->mt_env->me_txns->mti_readers[i].mr_txnid;
if (mr && mr < oldest)
oldest = mr;
}
/* Unusable if referred by a meta page or reader... */
j = 1;
if (last < txn->mt_txnid-1) {
j = txn->mt_env->me_txns->mti_numreaders;
r = txn->mt_env->me_txns->mti_readers + j;
for (j = -j; j && (last<r[j].mr_txnid || !r[j].mr_pid); j++) ;
}
if (oldest > last) {
if (!j) {
/* It's usable, grab it.
*/
MDB_oldpages *mop;
@ -1641,6 +1642,8 @@ mdb_txn_renew0(MDB_txn *txn)
env->me_txns->mti_readers[i].mr_tid = tid;
if (i >= env->me_txns->mti_numreaders)
env->me_txns->mti_numreaders = i+1;
/* Save numreaders for un-mutexed mdb_env_close() */
env->me_numreaders = env->me_txns->mti_numreaders;
UNLOCK_MUTEX_R(env);
r = &env->me_txns->mti_readers[i];
pthread_setspecific(env->me_txkey, r);
@ -1788,7 +1791,7 @@ mdb_txn_reset0(MDB_txn *txn)
MDB_env *env = txn->mt_env;
if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
txn->mt_u.reader->mr_txnid = 0;
txn->mt_u.reader->mr_txnid = (txnid_t)-1;
} else {
MDB_oldpages *mop;
MDB_page *dp;
@ -2668,9 +2671,7 @@ mdb_env_reader_dest(void *ptr)
{
MDB_reader *reader = ptr;
reader->mr_txnid = 0;
reader->mr_pid = 0;
reader->mr_tid = 0;
}
#ifdef _WIN32
@ -3259,7 +3260,7 @@ mdb_env_close(MDB_env *env)
if (env->me_txns) {
pid_t pid = env->me_pid;
unsigned int i;
for (i=0; i<env->me_txns->mti_numreaders; i++)
for (i=0; i<env->me_numreaders; i++)
if (env->me_txns->mti_readers[i].mr_pid == pid)
env->me_txns->mti_readers[i].mr_pid = 0;
#ifdef _WIN32

Loading…
Cancel
Save