diff --git a/libraries/libmdb/mdb.c b/libraries/libmdb/mdb.c index 291bd22..9c6af1e 100644 --- a/libraries/libmdb/mdb.c +++ b/libraries/libmdb/mdb.c @@ -252,12 +252,13 @@ typedef struct MDB_ppage { /* ordered list of pages */ #define CURSOR_TOP(c) (&(c)->mc_stack[(c)->mc_snum-1]) #define CURSOR_PARENT(c) (&(c)->mc_stack[(c)->mc_snum-2]) +#define CURSOR_STACK 32 struct MDB_xcursor; struct MDB_cursor { MDB_txn *mc_txn; - MDB_ppage mc_stack[32]; /* stack of parent pages */ + MDB_ppage mc_stack[CURSOR_STACK]; /* stack of parent pages */ unsigned int mc_snum; /* number of pushed pages */ MDB_dbi mc_dbi; short mc_initialized; /* 1 if initialized */ @@ -377,7 +378,7 @@ static int mdb_search_page(MDB_txn *txn, static int mdb_env_read_header(MDB_env *env, MDB_meta *meta); static int mdb_env_read_meta(MDB_env *env, int *which); static int mdb_env_write_meta(MDB_txn *txn); -static MDB_page *mdb_get_page(MDB_txn *txn, pgno_t pgno); +static int mdb_get_page(MDB_txn *txn, pgno_t pgno, MDB_page **mp); static MDB_node *mdb_search_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, MDB_val *key, int *exactp, unsigned int *kip); @@ -677,6 +678,7 @@ mdb_touch(MDB_txn *txn, MDB_pageparent *pp) if ((dp = mdb_alloc_page(txn, pp->mp_parent, pp->mp_pi, 1)) == NULL) return ENOMEM; DPRINTF("touched page %lu -> %lu", mp->mp_pgno, dp->p.mp_pgno); + assert(mp->mp_pgno != dp->p.mp_pgno); mdb_midl_insert(txn->mt_free_pgs, mp->mp_pgno); pgno = dp->p.mp_pgno; memcpy(&dp->p, mp, txn->mt_env->me_psize); @@ -718,18 +720,6 @@ mdb_txn_renew0(MDB_txn *txn) return MDB_PANIC; } - if (!(txn->mt_flags & MDB_TXN_RDONLY)) { - txn->mt_u.dirty_list = env->me_dirty_list; - txn->mt_u.dirty_list[0].mid = 0; - txn->mt_free_pgs = env->me_free_pgs; - txn->mt_free_pgs[0] = 0; - - pthread_mutex_lock(&env->me_txns->mti_wmutex); - env->me_txns->mti_txnid++; - } - - txn->mt_txnid = env->me_txns->mti_txnid; - if (txn->mt_flags & MDB_TXN_RDONLY) { MDB_reader *r = pthread_getspecific(env->me_txkey); if (!r) { @@ -750,9 +740,17 @@ mdb_txn_renew0(MDB_txn *txn) env->me_txns->mti_numreaders = i+1; pthread_mutex_unlock(&env->me_txns->mti_mutex); } + txn->mt_txnid = env->me_txns->mti_txnid; r->mr_txnid = txn->mt_txnid; txn->mt_u.reader = r; } else { + pthread_mutex_lock(&env->me_txns->mti_wmutex); + + txn->mt_txnid = env->me_txns->mti_txnid+1; + txn->mt_u.dirty_list = env->me_dirty_list; + txn->mt_u.dirty_list[0].mid = 0; + txn->mt_free_pgs = env->me_free_pgs; + txn->mt_free_pgs[0] = 0; env->me_txn = txn; } @@ -776,8 +774,9 @@ mdb_txn_renew0(MDB_txn *txn) txn->mt_next_pgno = env->me_meta->mm_last_pg+1; } - DPRINTF("begin transaction %lu on mdbenv %p, root page %lu", - txn->mt_txnid, (void *) env, txn->mt_dbs[MAIN_DBI].md_root); + DPRINTF("begin txn %p %lu%c on mdbenv %p, root page %lu", txn, + txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', + (void *) env, txn->mt_dbs[MAIN_DBI].md_root); return MDB_SUCCESS; } @@ -862,7 +861,6 @@ mdb_txn_reset0(MDB_txn *txn) } env->me_txn = NULL; - env->me_txns->mti_txnid--; for (i=2; ime_numdbs; i++) env->me_dbxs[i].md_dirty = 0; pthread_mutex_unlock(&env->me_txns->mti_wmutex); @@ -933,7 +931,7 @@ mdb_txn_commit(MDB_txn *txn) if (!txn->mt_u.dirty_list[0].mid) goto done; - DPRINTF("committing transaction %lu on mdbenv %p, root page %lu", + DPRINTF("committing txn %p %lu on mdbenv %p, root page %lu", txn, txn->mt_txnid, (void *) env, txn->mt_dbs[MAIN_DBI].md_root); /* should only be one record now */ @@ -1034,6 +1032,7 @@ mdb_txn_commit(MDB_txn *txn) dp->p.mp_flags &= ~P_DIRTY; if (++n >= MDB_COMMIT_PAGES) { done = 0; + i++; break; } } @@ -1067,6 +1066,7 @@ mdb_txn_commit(MDB_txn *txn) } txn->mt_u.dirty_list[i].mid = 0; } + txn->mt_u.dirty_list[0].mid = 0; if ((n = mdb_env_sync(env, 0)) != 0 || (n = mdb_env_write_meta(txn)) != MDB_SUCCESS) { @@ -1074,6 +1074,8 @@ mdb_txn_commit(MDB_txn *txn) return n; } + env->me_txns->mti_txnid = txn->mt_txnid; + done: env->me_txn = NULL; /* update the DB tables */ @@ -1694,17 +1696,18 @@ cursor_push_page(MDB_cursor *cursor, MDB_page *mp) DPRINTF("pushing page %lu on db %u cursor %p", mp->mp_pgno, cursor->mc_dbi, (void *) cursor); + assert(cursor->mc_snum < CURSOR_STACK); + ppage = &cursor->mc_stack[cursor->mc_snum++]; ppage->mp_page = mp; ppage->mp_ki = 0; return ppage; } -static MDB_page * -mdb_get_page(MDB_txn *txn, pgno_t pgno) +static int +mdb_get_page(MDB_txn *txn, pgno_t pgno, MDB_page **ret) { MDB_page *p = NULL; - int found = 0; if (!F_ISSET(txn->mt_flags, MDB_TXN_RDONLY) && txn->mt_u.dirty_list[0].mid) { MDB_dpage *dp; @@ -1715,15 +1718,18 @@ mdb_get_page(MDB_txn *txn, pgno_t pgno) if (x <= txn->mt_u.dirty_list[0].mid && txn->mt_u.dirty_list[x].mid == pgno) { dp = txn->mt_u.dirty_list[x].mptr; p = &dp->p; - found = 1; } } - if (!found) { - if (pgno > txn->mt_env->me_meta->mm_last_pg) - return NULL; - p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno); + if (!p) { + if (pgno <= txn->mt_env->me_meta->mm_last_pg) + p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno); + } + *ret = p; + if (!p) { + DPRINTF("page %lu not found", pgno); + assert(p != NULL); } - return p; + return (p != NULL) ? MDB_SUCCESS : MDB_PAGE_NOTFOUND; } static int @@ -1771,8 +1777,8 @@ mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, CURSOR_TOP(cursor)->mp_ki = i; mpp->mp_parent = mp; - if ((mp = mdb_get_page(txn, NODEPGNO(node))) == NULL) - return MDB_PAGE_NOTFOUND; + if ((rc = mdb_get_page(txn, NODEPGNO(node), &mp))) + return rc; mpp->mp_pi = i; mpp->mp_page = mp; @@ -1831,10 +1837,11 @@ mdb_search_page(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, return MDB_NOTFOUND; } - if ((mpp->mp_page = mdb_get_page(txn, root)) == NULL) - return MDB_PAGE_NOTFOUND; + if (rc = mdb_get_page(txn, root, &mpp->mp_page)) + return rc; - DPRINTF("root page has flags 0x%X", mpp->mp_page->mp_flags); + DPRINTF("db %u root page %lu has flags 0x%X", + dbi, root, mpp->mp_page->mp_flags); if (modify) { /* For sub-databases, update main root first */ @@ -1863,6 +1870,7 @@ mdb_read_data(MDB_txn *txn, MDB_node *leaf, MDB_val *data) { MDB_page *omp; /* overflow mpage */ pgno_t pgno; + int rc; if (!F_ISSET(leaf->mn_flags, F_BIGDATA)) { data->mv_size = leaf->mn_dsize; @@ -1874,9 +1882,9 @@ mdb_read_data(MDB_txn *txn, MDB_node *leaf, MDB_val *data) */ data->mv_size = leaf->mn_dsize; memcpy(&pgno, NODEDATA(leaf), sizeof(pgno)); - if ((omp = mdb_get_page(txn, pgno)) == NULL) { + if (rc = mdb_get_page(txn, pgno, &omp)) { DPRINTF("read overflow page %lu failed", pgno); - return MDB_PAGE_NOTFOUND; + return rc; } data->mv_data = METADATA(omp); @@ -1970,8 +1978,8 @@ mdb_sibling(MDB_cursor *cursor, int move_right) assert(IS_BRANCH(parent->mp_page)); indx = NODEPTR(parent->mp_page, parent->mp_ki); - if ((mp = mdb_get_page(cursor->mc_txn, NODEPGNO(indx))) == NULL) - return MDB_PAGE_NOTFOUND; + if (rc = mdb_get_page(cursor->mc_txn, NODEPGNO(indx), &mp)) + return rc;; #if 0 mp->parent = parent->mp_page; mp->parent_index = parent->mp_ki; @@ -2681,6 +2689,7 @@ mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_node *node) } else { dbn = 2; } + DPRINTF("Sub-db %u for db %u root page %lu", dbn, dbi, db->md_root); mx->mx_dbs[dbn] = *db; mx->mx_dbxs[dbn].md_name.mv_data = NODEKEY(node); mx->mx_dbxs[dbn].md_name.mv_size = node->mn_ksize; @@ -2987,6 +2996,7 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp) MDB_page *root; MDB_pageparent npp; indx_t si = 0, di = 0; + int rc; assert(txn != NULL); assert(mpp != NULL); @@ -3010,8 +3020,8 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp) } else if (IS_BRANCH(mpp->mp_page) && NUMKEYS(mpp->mp_page) == 1) { DPUTS("collapsing root page!"); txn->mt_dbs[dbi].md_root = NODEPGNO(NODEPTR(mpp->mp_page, 0)); - if ((root = mdb_get_page(txn, txn->mt_dbs[dbi].md_root)) == NULL) - return MDB_PAGE_NOTFOUND; + if (rc = mdb_get_page(txn, txn->mt_dbs[dbi].md_root, &root)) + return rc; txn->mt_dbs[dbi].md_depth--; txn->mt_dbs[dbi].md_branch_pages--; } else @@ -3036,8 +3046,8 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp) */ DPUTS("reading right neighbor"); node = NODEPTR(mpp->mp_parent, mpp->mp_pi + 1); - if ((npp.mp_page = mdb_get_page(txn, NODEPGNO(node))) == NULL) - return MDB_PAGE_NOTFOUND; + if (rc = mdb_get_page(txn, NODEPGNO(node), &npp.mp_page)) + return rc; npp.mp_pi = mpp->mp_pi + 1; si = 0; di = NUMKEYS(mpp->mp_page); @@ -3046,8 +3056,8 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp) */ DPUTS("reading left neighbor"); node = NODEPTR(mpp->mp_parent, mpp->mp_pi - 1); - if ((npp.mp_page = mdb_get_page(txn, NODEPGNO(node))) == NULL) - return MDB_PAGE_NOTFOUND; + if (rc = mdb_get_page(txn, NODEPGNO(node), &npp.mp_page)) + return rc; npp.mp_pi = mpp->mp_pi - 1; si = NUMKEYS(npp.mp_page) - 1; di = 0; @@ -3178,7 +3188,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, top = parent; } else { ni = NODEPTR(parent->mp_page, parent->mp_ki); - top->mp_page = mdb_get_page(&mx.mx_txn, NODEPGNO(ni)); + rc = mdb_get_page(&mx.mx_txn, NODEPGNO(ni), &top->mp_page); } } }