From 0f459dd77307b296078d3ce507522322bb6549ab Mon Sep 17 00:00:00 2001 From: Howard Chu Date: Mon, 29 Aug 2011 16:55:41 -0700 Subject: [PATCH] Cleanup, fix freelist alloc Don't allow new pages for free list to come from the free list. Otherwise a nasty data self-reference occurs that is too much trouble to unwind. --- libraries/libmdb/mdb.c | 208 ++++++++++++++++++++++------------------- 1 file changed, 110 insertions(+), 98 deletions(-) diff --git a/libraries/libmdb/mdb.c b/libraries/libmdb/mdb.c index e537322..adfe542 100644 --- a/libraries/libmdb/mdb.c +++ b/libraries/libmdb/mdb.c @@ -268,8 +268,8 @@ typedef struct MDB_pageparent { unsigned mp_pi; } MDB_pageparent; -static MDB_dpage *mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num); -static int mdb_touch(MDB_txn *txn, MDB_pageparent *mp); +static MDB_dpage *mdb_alloc_page(MDB_txn *txn, MDB_dbi dbi, MDB_page *parent, unsigned int parent_idx, int num); +static int mdb_touch(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mp); typedef struct MDB_ppage { /* ordered list of pages */ MDB_page *mp_page; @@ -321,7 +321,6 @@ typedef struct MDB_dbx { struct MDB_txn { pgno_t mt_next_pgno; /* next unallocated page */ ULONG mt_txnid; - ULONG mt_oldest; MDB_env *mt_env; pgno_t *mt_free_pgs; /* this is an IDL */ union { @@ -570,73 +569,72 @@ mdb_dcmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b) /* Allocate new page(s) for writing */ static MDB_dpage * -mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num) +mdb_alloc_page(MDB_txn *txn, MDB_dbi dbi, MDB_page *parent, unsigned int parent_idx, int num) { MDB_dpage *dp; pgno_t pgno = P_INVALID; - ULONG oldest; MIDL2 mid; if (txn->mt_txnid > 2) { - oldest = txn->mt_txnid - 1; - if (!txn->mt_env->me_pghead && txn->mt_dbs[FREE_DBI].md_root != P_INVALID) { - /* See if there's anything in the free DB */ - MDB_pageparent mpp; - MDB_node *leaf; - ULONG *kptr; - - mpp.mp_parent = NULL; - mpp.mp_pi = 0; - mdb_search_page(txn, FREE_DBI, NULL, NULL, 0, &mpp); - leaf = NODEPTR(mpp.mp_page, 0); - kptr = (ULONG *)NODEKEY(leaf); + if (!txn->mt_env->me_pghead && dbi != FREE_DBI && + txn->mt_dbs[FREE_DBI].md_root != P_INVALID) { + /* See if there's anything in the free DB */ + MDB_pageparent mpp; + MDB_node *leaf; + ULONG *kptr, oldest; - /* It's potentially usable, unless there are still - * older readers outstanding. Grab it. - */ - if (oldest > *kptr) { - MDB_oldpages *mop; - MDB_val data; - pgno_t *idl; - - mdb_read_data(txn, leaf, &data); - idl = (ULONG *)data.mv_data; - mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(idl) - sizeof(pgno_t)); - mop->mo_next = txn->mt_env->me_pghead; - mop->mo_txnid = *kptr; - txn->mt_env->me_pghead = mop; - memcpy(mop->mo_pages, idl, MDB_IDL_SIZEOF(idl)); + mpp.mp_parent = NULL; + mpp.mp_pi = 0; + mdb_search_page(txn, FREE_DBI, NULL, NULL, 0, &mpp); + leaf = NODEPTR(mpp.mp_page, 0); + kptr = (ULONG *)NODEKEY(leaf); -#if DEBUG > 1 { unsigned int i; - DPRINTF("IDL read txn %lu root %lu num %lu", - mop->mo_txnid, txn->mt_dbs[FREE_DBI].md_root, idl[0]); - for (i=0; imt_txnid - 1; + for (i=0; imt_env->me_txns->mti_numreaders; i++) { + ULONG mr = txn->mt_env->me_txns->mti_readers[i].mr_txnid; + if (mr && mr < oldest) + oldest = mr; } } + + if (oldest > *kptr) { + /* It's usable, grab it. + */ + MDB_oldpages *mop; + MDB_val data; + pgno_t *idl; + + mdb_read_data(txn, leaf, &data); + idl = (ULONG *)data.mv_data; + mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(idl) - sizeof(pgno_t)); + mop->mo_next = txn->mt_env->me_pghead; + mop->mo_txnid = *kptr; + txn->mt_env->me_pghead = mop; + memcpy(mop->mo_pages, idl, MDB_IDL_SIZEOF(idl)); + +#if DEBUG > 1 + { + unsigned int i; + DPRINTF("IDL read txn %lu root %lu num %lu", + mop->mo_txnid, txn->mt_dbs[FREE_DBI].md_root, idl[0]); + for (i=0; imt_env->me_pghead) { - unsigned int i; - for (i=0; imt_env->me_txns->mti_numreaders; i++) { - ULONG mr = txn->mt_env->me_txns->mti_readers[i].mr_txnid; - if (!mr) continue; - if (mr < oldest) - oldest = txn->mt_env->me_txns->mti_readers[i].mr_txnid; + /* drop this IDL from the DB */ + mpp.mp_parent = NULL; + mpp.mp_pi = 0; + mdb_search_page(txn, FREE_DBI, NULL, NULL, 1, &mpp); + leaf = NODEPTR(mpp.mp_page, 0); + mdb_del0(txn, FREE_DBI, 0, &mpp, leaf); + } } - if (oldest > txn->mt_env->me_pghead->mo_txnid) { + if (txn->mt_env->me_pghead) { MDB_oldpages *mop = txn->mt_env->me_pghead; - txn->mt_oldest = oldest; if (num > 1) { /* FIXME: For now, always use fresh pages. We * really ought to search the free list for a @@ -660,7 +658,6 @@ mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num) } } } - } if (pgno == P_INVALID) { /* DB size is maxed out */ @@ -693,7 +690,7 @@ mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num) /* Touch a page: make it dirty and re-insert into tree with updated pgno. */ static int -mdb_touch(MDB_txn *txn, MDB_pageparent *pp) +mdb_touch(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *pp) { MDB_page *mp = pp->mp_page; pgno_t pgno; @@ -702,9 +699,9 @@ mdb_touch(MDB_txn *txn, MDB_pageparent *pp) if (!F_ISSET(mp->mp_flags, P_DIRTY)) { MDB_dpage *dp; - if ((dp = mdb_alloc_page(txn, pp->mp_parent, pp->mp_pi, 1)) == NULL) + if ((dp = mdb_alloc_page(txn, dbi, pp->mp_parent, pp->mp_pi, 1)) == NULL) return ENOMEM; - DPRINTF("touched page %lu -> %lu", mp->mp_pgno, dp->p.mp_pgno); + DPRINTF("touched db %u page %lu -> %lu", dbi, mp->mp_pgno, dp->p.mp_pgno); assert(mp->mp_pgno != dp->p.mp_pgno); mdb_midl_insert(txn->mt_free_pgs, mp->mp_pgno); pgno = dp->p.mp_pgno; @@ -740,11 +737,6 @@ mdb_txn_renew0(MDB_txn *txn) { MDB_env *env = txn->mt_env; - if (env->me_flags & MDB_FATAL_ERROR) { - DPUTS("mdb_txn_begin: environment had fatal error, must shutdown!"); - return MDB_PANIC; - } - if (txn->mt_flags & MDB_TXN_RDONLY) { MDB_reader *r = pthread_getspecific(env->me_txkey); if (!r) { @@ -795,10 +787,6 @@ mdb_txn_renew0(MDB_txn *txn) (txn->mt_numdbs - 2) * sizeof(MDB_db)); LAZY_RWLOCK_UNLOCK(&env->me_dblock); - DPRINTF("begin txn %p %lu%c on mdbenv %p, root page %lu", txn, - txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', - (void *) env, txn->mt_dbs[MAIN_DBI].md_root); - return MDB_SUCCESS; } @@ -810,10 +798,15 @@ mdb_txn_renew(MDB_txn *txn) if (!txn) return EINVAL; + if (txn->mt_env->me_flags & MDB_FATAL_ERROR) { + DPUTS("environment had fatal error, must shutdown!"); + return MDB_PANIC; + } + rc = mdb_txn_renew0(txn); if (rc == MDB_SUCCESS) { - DPRINTF("reset txn %p %lu%c on mdbenv %p, root page %lu", txn, - txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', + DPRINTF("renew txn %lu%c %p on mdbenv %p, root page %lu", + txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', txn, (void *)txn->mt_env, txn->mt_dbs[MAIN_DBI].md_root); } return rc; @@ -826,7 +819,7 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret) int rc; if (env->me_flags & MDB_FATAL_ERROR) { - DPUTS("mdb_txn_begin: environment had fatal error, must shutdown!"); + DPUTS("environment had fatal error, must shutdown!"); return MDB_PANIC; } if ((txn = calloc(1, sizeof(MDB_txn) + env->me_maxdbs * sizeof(MDB_db))) == NULL) { @@ -844,8 +837,8 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret) free(txn); else { *ret = txn; - DPRINTF("begin txn %p %lu%c on mdbenv %p, root page %lu", txn, - txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', + DPRINTF("begin txn %lu%c %p on mdbenv %p, root page %lu", + txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', txn, (void *) env, txn->mt_dbs[MAIN_DBI].md_root); } @@ -894,8 +887,8 @@ mdb_txn_reset(MDB_txn *txn) if (txn == NULL) return; - DPRINTF("reset txn %p %lu%c on mdbenv %p, root page %lu", txn, - txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', + DPRINTF("reset txn %lu%c %p on mdbenv %p, root page %lu", + txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', txn, (void *)txn->mt_env, txn->mt_dbs[MAIN_DBI].md_root); mdb_txn_reset0(txn); @@ -907,8 +900,8 @@ mdb_txn_abort(MDB_txn *txn) if (txn == NULL) return; - DPRINTF("abort txn %p %lu%c on mdbenv %p, root page %lu", txn, - txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', + DPRINTF("abort txn %lu%c %p on mdbenv %p, root page %lu", + txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', txn, (void *)txn->mt_env, txn->mt_dbs[MAIN_DBI].md_root); mdb_txn_reset0(txn); @@ -952,27 +945,23 @@ mdb_txn_commit(MDB_txn *txn) if (!txn->mt_u.dirty_list[0].mid) goto done; - DPRINTF("committing txn %p %lu on mdbenv %p, root page %lu", txn, - txn->mt_txnid, (void *) env, txn->mt_dbs[MAIN_DBI].md_root); + DPRINTF("committing txn %lu %p on mdbenv %p, root page %lu", + txn->mt_txnid, txn, (void *)env, txn->mt_dbs[MAIN_DBI].md_root); /* should only be one record now */ if (env->me_pghead) { - MDB_val key, data; - MDB_oldpages *mop; + MDB_pageparent mpp; - mop = env->me_pghead; - key.mv_size = sizeof(pgno_t); - key.mv_data = (char *)&mop->mo_txnid; - data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages); - data.mv_data = mop->mo_pages; - mdb_put0(txn, FREE_DBI, &key, &data, 0); - free(env->me_pghead); - env->me_pghead = NULL; + /* make sure first page of freeDB is touched and on freelist */ + mpp.mp_parent = NULL; + mpp.mp_pi = 0; + mdb_search_page(txn, FREE_DBI, NULL, NULL, 1, &mpp); } /* save to free list */ if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) { MDB_val key, data; MDB_pageparent mpp; + ULONG i; /* make sure last page of freeDB is touched and on freelist */ key.mv_size = MAXKEYSIZE+1; @@ -995,9 +984,34 @@ mdb_txn_commit(MDB_txn *txn) /* write to last page of freeDB */ key.mv_size = sizeof(pgno_t); key.mv_data = (char *)&txn->mt_txnid; - data.mv_size = MDB_IDL_SIZEOF(txn->mt_free_pgs); data.mv_data = txn->mt_free_pgs; + /* The free list can still grow during this call, + * despite the pre-emptive touches above. So check + * and make sure the entire thing got written. + */ + do { + i = txn->mt_free_pgs[0]; + data.mv_size = MDB_IDL_SIZEOF(txn->mt_free_pgs); + rc = mdb_put0(txn, FREE_DBI, &key, &data, 0); + if (rc) { + mdb_txn_abort(txn); + return rc; + } + } while (i != txn->mt_free_pgs[0]); + } + /* should only be one record now */ + if (env->me_pghead) { + MDB_val key, data; + MDB_oldpages *mop; + + mop = env->me_pghead; + key.mv_size = sizeof(pgno_t); + key.mv_data = (char *)&mop->mo_txnid; + data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages); + data.mv_data = mop->mo_pages; mdb_put0(txn, FREE_DBI, &key, &data, 0); + free(env->me_pghead); + env->me_pghead = NULL; } /* Update DB root pointers. Their pages have already been @@ -1819,7 +1833,7 @@ mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, if (modify) { MDB_dhead *dh = ((MDB_dhead *)mp)-1; - if ((rc = mdb_touch(txn, mpp)) != 0) + if ((rc = mdb_touch(txn, dbi, mpp)) != 0) return rc; dh = ((MDB_dhead *)mpp->mp_page)-1; dh->md_parent = mpp->mp_parent; @@ -1888,7 +1902,7 @@ mdb_search_page(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, if (!F_ISSET(mpp->mp_page->mp_flags, P_DIRTY)) { mpp->mp_parent = NULL; mpp->mp_pi = 0; - if ((rc = mdb_touch(txn, mpp))) + if ((rc = mdb_touch(txn, dbi, mpp))) return rc; txn->mt_dbs[dbi].md_root = mpp->mp_page->mp_pgno; } @@ -2475,7 +2489,7 @@ mdb_new_page(MDB_txn *txn, MDB_dbi dbi, uint32_t flags, int num) { MDB_dpage *dp; - if ((dp = mdb_alloc_page(txn, NULL, 0, num)) == NULL) + if ((dp = mdb_alloc_page(txn, dbi, NULL, 0, num)) == NULL) return NULL; DPRINTF("allocated new mpage %lu, page size %u", dp->p.mp_pgno, txn->mt_env->me_psize); @@ -2728,7 +2742,6 @@ mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_page *mp, MDB_ mx->mx_dbxs[dbn].md_name.mv_data = NODEKEY(node); mx->mx_dbxs[dbn].md_name.mv_size = node->mn_ksize; mx->mx_txn.mt_next_pgno = txn->mt_next_pgno; - mx->mx_txn.mt_oldest = txn->mt_oldest; mx->mx_txn.mt_u = txn->mt_u; mx->mx_cursor.mc_initialized = 0; mx->mx_cursor.mc_eof = 0; @@ -2738,7 +2751,6 @@ static void mdb_xcursor_fini(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx) { txn->mt_next_pgno = mx->mx_txn.mt_next_pgno; - txn->mt_oldest = mx->mx_txn.mt_oldest; txn->mt_u = mx->mx_txn.mt_u; txn->mt_dbs[0] = mx->mx_dbs[0]; txn->mt_dbs[1] = mx->mx_dbs[1]; @@ -2870,8 +2882,8 @@ mdb_move_node(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *src, indx_t srcindx, DKBUF; /* Mark src and dst as dirty. */ - if ((rc = mdb_touch(txn, src)) || - (rc = mdb_touch(txn, dst))) + if ((rc = mdb_touch(txn, dbi, src)) || + (rc = mdb_touch(txn, dbi, dst))) return rc;; if (IS_LEAF2(src->mp_page)) { @@ -2964,8 +2976,8 @@ mdb_merge(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *src, MDB_pageparent *dst) assert(dst->mp_parent); /* Mark src and dst as dirty. */ - if ((rc = mdb_touch(txn, src)) || - (rc = mdb_touch(txn, dst))) + if ((rc = mdb_touch(txn, dbi, src)) || + (rc = mdb_touch(txn, dbi, dst))) return rc; /* Move all nodes from src to dst.