Tweak mdb_page_malloc(),mdb_page_get() semantics.

mdb_page_malloc(): Add "number of pages" parameter.
mdb_page_get(): Add output param for how page was found.
Do not set return params on error.
mdb_cursor_put(): Catch mdb_page_get() error.

Prepares for next commit, no change in caller behavior other
than on mdb_page_get error.
vmware
Hallvard Furuseth 12 years ago
parent 2eb50b1d2e
commit 5bdf2aae6e
  1. 85
      libraries/liblmdb/mdb.c

@ -999,7 +999,7 @@ static int mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp);
static int mdb_page_new(MDB_cursor *mc, uint32_t flags, int num, MDB_page **mp); static int mdb_page_new(MDB_cursor *mc, uint32_t flags, int num, MDB_page **mp);
static int mdb_page_touch(MDB_cursor *mc); static int mdb_page_touch(MDB_cursor *mc);
static int mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **mp); static int mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **mp, int *lvl);
static int mdb_page_search_root(MDB_cursor *mc, static int mdb_page_search_root(MDB_cursor *mc,
MDB_val *key, int modify); MDB_val *key, int modify);
#define MDB_PS_MODIFY 1 #define MDB_PS_MODIFY 1
@ -1252,19 +1252,27 @@ mdb_dcmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b)
return txn->mt_dbxs[dbi].md_dcmp(a, b); return txn->mt_dbxs[dbi].md_dcmp(a, b);
} }
/** Allocate a single page. /** Allocate a page.
* Re-use old malloc'd pages first, otherwise just malloc. * Re-use old malloc'd pages first for singletons, otherwise just malloc.
*/ */
static MDB_page * static MDB_page *
mdb_page_malloc(MDB_cursor *mc) { mdb_page_malloc(MDB_cursor *mc, unsigned num)
MDB_page *ret; {
size_t sz = mc->mc_txn->mt_env->me_psize; MDB_env *env = mc->mc_txn->mt_env;
if ((ret = mc->mc_txn->mt_env->me_dpages) != NULL) { MDB_page *ret = env->me_dpages;
VGMEMP_ALLOC(mc->mc_txn->mt_env, ret, sz); size_t sz = env->me_psize;
if (num == 1) {
if (ret) {
VGMEMP_ALLOC(env, ret, sz);
VGMEMP_DEFINED(ret, sizeof(ret->mp_next)); VGMEMP_DEFINED(ret, sizeof(ret->mp_next));
mc->mc_txn->mt_env->me_dpages = ret->mp_next; env->me_dpages = ret->mp_next;
} else if ((ret = malloc(sz)) != NULL) { return ret;
VGMEMP_ALLOC(mc->mc_txn->mt_env, ret, sz); }
} else {
sz *= num;
}
if ((ret = malloc(sz)) != NULL) {
VGMEMP_ALLOC(env, ret, sz);
} }
return ret; return ret;
} }
@ -1520,17 +1528,8 @@ none:
np = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno); np = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno);
np->mp_pgno = pgno; np->mp_pgno = pgno;
} else { } else {
if (txn->mt_env->me_dpages && num == 1) { if (!(np = mdb_page_malloc(mc, num)))
np = txn->mt_env->me_dpages;
VGMEMP_ALLOC(txn->mt_env, np, txn->mt_env->me_psize);
VGMEMP_DEFINED(np, sizeof(np->mp_next));
txn->mt_env->me_dpages = np->mp_next;
} else {
size_t sz = txn->mt_env->me_psize * num;
if ((np = malloc(sz)) == NULL)
return ENOMEM; return ENOMEM;
VGMEMP_ALLOC(txn->mt_env, np, sz);
}
if (pgno == P_INVALID) { if (pgno == P_INVALID) {
np->mp_pgno = txn->mt_next_pgno; np->mp_pgno = txn->mt_next_pgno;
txn->mt_next_pgno += num; txn->mt_next_pgno += num;
@ -1653,7 +1652,7 @@ finish:
} }
assert(mc->mc_txn->mt_u.dirty_list[0].mid < MDB_IDL_UM_MAX); assert(mc->mc_txn->mt_u.dirty_list[0].mid < MDB_IDL_UM_MAX);
/* No - copy it */ /* No - copy it */
np = mdb_page_malloc(mc); np = mdb_page_malloc(mc, 1);
if (!np) if (!np)
return ENOMEM; return ENOMEM;
memcpy(np, mp, mc->mc_txn->mt_env->me_psize); memcpy(np, mp, mc->mc_txn->mt_env->me_psize);
@ -4032,17 +4031,20 @@ mdb_cursor_push(MDB_cursor *mc, MDB_page *mp)
* @param[in] txn the transaction for this access. * @param[in] txn the transaction for this access.
* @param[in] pgno the page number for the page to retrieve. * @param[in] pgno the page number for the page to retrieve.
* @param[out] ret address of a pointer where the page's address will be stored. * @param[out] ret address of a pointer where the page's address will be stored.
* @param[out] lvl dirty_list inheritance level of found page. 1=current txn, 0=mapped page.
* @return 0 on success, non-zero on failure. * @return 0 on success, non-zero on failure.
*/ */
static int static int
mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **ret) mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **ret, int *lvl)
{ {
MDB_page *p = NULL; MDB_page *p = NULL;
int level;
if (!((txn->mt_flags & MDB_TXN_RDONLY) | if (!((txn->mt_flags & MDB_TXN_RDONLY) |
(txn->mt_env->me_flags & MDB_WRITEMAP))) (txn->mt_env->me_flags & MDB_WRITEMAP)))
{ {
MDB_txn *tx2 = txn; MDB_txn *tx2 = txn;
level = 1;
do { do {
MDB_ID2L dl = tx2->mt_u.dirty_list; MDB_ID2L dl = tx2->mt_u.dirty_list;
if (dl[0].mid) { if (dl[0].mid) {
@ -4052,19 +4054,24 @@ mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **ret)
goto done; goto done;
} }
} }
level++;
} while ((tx2 = tx2->mt_parent) != NULL); } while ((tx2 = tx2->mt_parent) != NULL);
} }
if (pgno < txn->mt_next_pgno) { if (pgno < txn->mt_next_pgno) {
level = 0;
p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno); p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno);
} else { } else {
DPRINTF("page %zu not found", pgno); DPRINTF("page %zu not found", pgno);
assert(p != NULL); assert(p != NULL);
return MDB_PAGE_NOTFOUND;
} }
done: done:
*ret = p; *ret = p;
return (p != NULL) ? MDB_SUCCESS : MDB_PAGE_NOTFOUND; if (lvl)
*lvl = level;
return MDB_SUCCESS;
} }
/** Search for the page a given key should be in. /** Search for the page a given key should be in.
@ -4118,7 +4125,7 @@ mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify)
assert(i < NUMKEYS(mp)); assert(i < NUMKEYS(mp));
node = NODEPTR(mp, i); node = NODEPTR(mp, i);
if ((rc = mdb_page_get(mc->mc_txn, NODEPGNO(node), &mp))) if ((rc = mdb_page_get(mc->mc_txn, NODEPGNO(node), &mp, NULL)) != 0)
return rc; return rc;
mc->mc_ki[mc->mc_top] = i; mc->mc_ki[mc->mc_top] = i;
@ -4157,7 +4164,7 @@ mdb_page_search_lowest(MDB_cursor *mc)
MDB_node *node = NODEPTR(mp, 0); MDB_node *node = NODEPTR(mp, 0);
int rc; int rc;
if ((rc = mdb_page_get(mc->mc_txn, NODEPGNO(node), &mp))) if ((rc = mdb_page_get(mc->mc_txn, NODEPGNO(node), &mp, NULL)) != 0)
return rc; return rc;
mc->mc_ki[mc->mc_top] = 0; mc->mc_ki[mc->mc_top] = 0;
@ -4235,7 +4242,7 @@ mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags)
assert(root > 1); assert(root > 1);
if (!mc->mc_pg[0] || mc->mc_pg[0]->mp_pgno != root) if (!mc->mc_pg[0] || mc->mc_pg[0]->mp_pgno != root)
if ((rc = mdb_page_get(mc->mc_txn, root, &mc->mc_pg[0]))) if ((rc = mdb_page_get(mc->mc_txn, root, &mc->mc_pg[0], NULL)) != 0)
return rc; return rc;
mc->mc_snum = 1; mc->mc_snum = 1;
@ -4278,7 +4285,7 @@ mdb_node_read(MDB_txn *txn, MDB_node *leaf, MDB_val *data)
*/ */
data->mv_size = NODEDSZ(leaf); data->mv_size = NODEDSZ(leaf);
memcpy(&pgno, NODEDATA(leaf), sizeof(pgno)); memcpy(&pgno, NODEDATA(leaf), sizeof(pgno));
if ((rc = mdb_page_get(txn, pgno, &omp))) { if ((rc = mdb_page_get(txn, pgno, &omp, NULL)) != 0) {
DPRINTF("read overflow page %zu failed", pgno); DPRINTF("read overflow page %zu failed", pgno);
return rc; return rc;
} }
@ -4355,7 +4362,7 @@ mdb_cursor_sibling(MDB_cursor *mc, int move_right)
assert(IS_BRANCH(mc->mc_pg[mc->mc_top])); assert(IS_BRANCH(mc->mc_pg[mc->mc_top]));
indx = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); indx = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
if ((rc = mdb_page_get(mc->mc_txn, NODEPGNO(indx), &mp))) if ((rc = mdb_page_get(mc->mc_txn, NODEPGNO(indx), &mp, NULL) != 0))
return rc; return rc;
mdb_cursor_push(mc, mp); mdb_cursor_push(mc, mp);
@ -5194,7 +5201,8 @@ current:
dpages = OVPAGES(data->mv_size, mc->mc_txn->mt_env->me_psize); dpages = OVPAGES(data->mv_size, mc->mc_txn->mt_env->me_psize);
memcpy(&pg, NODEDATA(leaf), sizeof(pg)); memcpy(&pg, NODEDATA(leaf), sizeof(pg));
mdb_page_get(mc->mc_txn, pg, &omp); if ((rc2 = mdb_page_get(mc->mc_txn, pg, &omp, NULL)) != 0)
return rc2;
ovpages = omp->mp_pages; ovpages = omp->mp_pages;
/* Is the ov page writable and large enough? */ /* Is the ov page writable and large enough? */
@ -6415,8 +6423,8 @@ mdb_rebalance(MDB_cursor *mc)
DPUTS("collapsing root page!"); DPUTS("collapsing root page!");
mdb_midl_append(&mc->mc_txn->mt_free_pgs, mp->mp_pgno); mdb_midl_append(&mc->mc_txn->mt_free_pgs, mp->mp_pgno);
mc->mc_db->md_root = NODEPGNO(NODEPTR(mp, 0)); mc->mc_db->md_root = NODEPGNO(NODEPTR(mp, 0));
if ((rc = mdb_page_get(mc->mc_txn, mc->mc_db->md_root, rc = mdb_page_get(mc->mc_txn,mc->mc_db->md_root,&mc->mc_pg[0],NULL);
&mc->mc_pg[0]))) if (rc)
return rc; return rc;
mc->mc_db->md_depth--; mc->mc_db->md_depth--;
mc->mc_db->md_branch_pages--; mc->mc_db->md_branch_pages--;
@ -6469,7 +6477,8 @@ mdb_rebalance(MDB_cursor *mc)
DPUTS("reading right neighbor"); DPUTS("reading right neighbor");
mn.mc_ki[ptop]++; mn.mc_ki[ptop]++;
node = NODEPTR(mc->mc_pg[ptop], mn.mc_ki[ptop]); node = NODEPTR(mc->mc_pg[ptop], mn.mc_ki[ptop]);
if ((rc = mdb_page_get(mc->mc_txn, NODEPGNO(node), &mn.mc_pg[mn.mc_top]))) rc = mdb_page_get(mc->mc_txn,NODEPGNO(node),&mn.mc_pg[mn.mc_top],NULL);
if (rc)
return rc; return rc;
mn.mc_ki[mn.mc_top] = 0; mn.mc_ki[mn.mc_top] = 0;
mc->mc_ki[mc->mc_top] = NUMKEYS(mc->mc_pg[mc->mc_top]); mc->mc_ki[mc->mc_top] = NUMKEYS(mc->mc_pg[mc->mc_top]);
@ -6479,7 +6488,8 @@ mdb_rebalance(MDB_cursor *mc)
DPUTS("reading left neighbor"); DPUTS("reading left neighbor");
mn.mc_ki[ptop]--; mn.mc_ki[ptop]--;
node = NODEPTR(mc->mc_pg[ptop], mn.mc_ki[ptop]); node = NODEPTR(mc->mc_pg[ptop], mn.mc_ki[ptop]);
if ((rc = mdb_page_get(mc->mc_txn, NODEPGNO(node), &mn.mc_pg[mn.mc_top]))) rc = mdb_page_get(mc->mc_txn,NODEPGNO(node),&mn.mc_pg[mn.mc_top],NULL);
if (rc)
return rc; return rc;
mn.mc_ki[mn.mc_top] = NUMKEYS(mn.mc_pg[mn.mc_top]) - 1; mn.mc_ki[mn.mc_top] = NUMKEYS(mn.mc_pg[mn.mc_top]) - 1;
mc->mc_ki[mc->mc_top] = 0; mc->mc_ki[mc->mc_top] = 0;
@ -6518,7 +6528,7 @@ mdb_cursor_del0(MDB_cursor *mc, MDB_node *leaf)
pgno_t pg; pgno_t pg;
memcpy(&pg, NODEDATA(leaf), sizeof(pg)); memcpy(&pg, NODEDATA(leaf), sizeof(pg));
if ((rc = mdb_page_get(mc->mc_txn, pg, &omp)) != 0) if ((rc = mdb_page_get(mc->mc_txn, pg, &omp, NULL)) != 0)
return rc; return rc;
assert(IS_OVERFLOW(omp)); assert(IS_OVERFLOW(omp));
ovpages = omp->mp_pages; ovpages = omp->mp_pages;
@ -6863,7 +6873,7 @@ newsep:
/* Move half of the keys to the right sibling. */ /* Move half of the keys to the right sibling. */
/* grab a page to hold a temporary copy */ /* grab a page to hold a temporary copy */
copy = mdb_page_malloc(mc); copy = mdb_page_malloc(mc, 1);
if (copy == NULL) if (copy == NULL)
return ENOMEM; return ENOMEM;
@ -7287,7 +7297,8 @@ mdb_drop0(MDB_cursor *mc, int subs)
MDB_page *omp; MDB_page *omp;
pgno_t pg; pgno_t pg;
memcpy(&pg, NODEDATA(ni), sizeof(pg)); memcpy(&pg, NODEDATA(ni), sizeof(pg));
if ((rc = mdb_page_get(mc->mc_txn, pg, &omp)) != 0) rc = mdb_page_get(mc->mc_txn, pg, &omp, NULL);
if (rc != 0)
return rc; return rc;
assert(IS_OVERFLOW(omp)); assert(IS_OVERFLOW(omp));
ovpages = omp->mp_pages; ovpages = omp->mp_pages;

Loading…
Cancel
Save