Optimization for mdb_rpage_get()

The caller already knows if it's using an overflow page, so
pass the number of expected pages in instead of having to
map the page first and check the count there.
mdb.master3
Howard Chu 8 years ago
parent 53799e51da
commit ad7933ba0c
  1. 89
      libraries/liblmdb/mdb.c

@ -1567,7 +1567,14 @@ enum {
#define MDB_END_SLOT MDB_NOTLS /**< release any reader slot if #MDB_NOTLS */
static void mdb_txn_end(MDB_txn *txn, unsigned mode);
#if MDB_RPAGE_CACHE
static int mdb_page_get(MDB_cursor *mc, pgno_t pgno, int numpgs, MDB_page **mp, int *lvl);
#define MDB_PAGE_GET(mc, pg, numpgs, mp, lvl) mdb_page_get(mc, pg, numpgs, mp, lvl)
#else
static int mdb_page_get(MDB_cursor *mc, pgno_t pgno, MDB_page **mp, int *lvl);
#define MDB_PAGE_GET(mc, pg, numpgs, mp, lvl) mdb_page_get(mc, pg, mp, lvl)
#endif
static int mdb_page_search_root(MDB_cursor *mc,
MDB_val *key, int modify);
#define MDB_PS_MODIFY 1
@ -2215,7 +2222,7 @@ mark_done:
pgno_t pgno = txn->mt_dbs[i].md_root;
if (pgno == P_INVALID)
continue;
if ((rc = mdb_page_get(m0, pgno, &dp, &level)) != MDB_SUCCESS)
if ((rc = MDB_PAGE_GET(m0, pgno, 1, &dp, &level)) != MDB_SUCCESS)
break;
if ((dp->mp_flags & Mask) == pflags && level <= 1)
dp->mp_flags ^= P_KEEP;
@ -5952,7 +5959,7 @@ mdb_cursor_push(MDB_cursor *mc, MDB_page *mp)
* @return 0 on success, non-zero on failure.
*/
static int
mdb_rpage_get(MDB_txn *txn, pgno_t pg0, MDB_page **ret)
mdb_rpage_get(MDB_txn *txn, pgno_t pg0, int numpgs, MDB_page **ret)
{
MDB_env *env = txn->mt_env;
MDB_page *p;
@ -5992,9 +5999,8 @@ mdb_rpage_get(MDB_txn *txn, pgno_t pg0, MDB_page **ret)
if (x != tl[0].mid && tl[x+1].mid == pg0)
x++;
/* check for overflow size */
p = (MDB_page *)((char *)tl[x].mptr + rem * env->me_psize);
if (IS_OVERFLOW(p) && p->mp_pages + rem > tl[x].mcnt) {
id3.mcnt = p->mp_pages + rem;
if (numpgs + rem > tl[x].mcnt) {
id3.mcnt = numpgs + rem;
len = id3.mcnt * env->me_psize;
SET_OFF(off, pgno * env->me_psize);
MAP(rc, env, id3.mptr, len, off);
@ -6088,7 +6094,7 @@ retry:
if ((env->me_flags & MDB_RDONLY) && pgno + MDB_RPAGE_CHUNK-1 > txn->mt_last_pgno)
id3.mcnt = txn->mt_last_pgno + 1 - pgno;
else
id3.mcnt = MDB_RPAGE_CHUNK;
id3.mcnt = numpgs + rem > MDB_RPAGE_CHUNK ? numpgs + rem : MDB_RPAGE_CHUNK;
len = id3.mcnt * env->me_psize;
id3.mid = pgno;
@ -6097,12 +6103,8 @@ retry:
x = mdb_mid3l_search(el, pgno);
if (x <= el[0].mid && el[x].mid == pgno) {
id3.mptr = el[x].mptr;
id3.mcnt = el[x].mcnt;
/* check for overflow size */
p = (MDB_page *)((char *)id3.mptr + rem * env->me_psize);
if (IS_OVERFLOW(p) && p->mp_pages + rem > id3.mcnt) {
id3.mcnt = p->mp_pages + rem;
len = id3.mcnt * env->me_psize;
if (id3.mcnt > el[x].mcnt) {
SET_OFF(off, pgno * env->me_psize);
MAP(rc, env, id3.mptr, len, off);
if (rc)
@ -6160,16 +6162,6 @@ fail:
pthread_mutex_unlock(&env->me_rpmutex);
return rc;
}
/* check for overflow size */
p = (MDB_page *)((char *)id3.mptr + rem * env->me_psize);
if (IS_OVERFLOW(p) && p->mp_pages + rem > id3.mcnt) {
id3.mcnt = p->mp_pages + rem;
munmap(id3.mptr, len);
len = id3.mcnt * env->me_psize;
MAP(rc, env, id3.mptr, len, off);
if (rc)
goto fail;
}
mdb_mid3l_insert(el, &id3);
pthread_mutex_unlock(&env->me_rpmutex);
found:
@ -6198,7 +6190,11 @@ ok:
* @return 0 on success, non-zero on failure.
*/
static int
mdb_page_get(MDB_cursor *mc, pgno_t pgno, MDB_page **ret, int *lvl)
mdb_page_get(MDB_cursor *mc, pgno_t pgno,
#if MDB_RPAGE_CACHE
int numpgs,
#endif
MDB_page **ret, int *lvl)
{
MDB_txn *txn = mc->mc_txn;
MDB_page *p = NULL;
@ -6244,7 +6240,7 @@ mdb_page_get(MDB_cursor *mc, pgno_t pgno, MDB_page **ret, int *lvl)
mapped:
#if MDB_RPAGE_CACHE
if (MDB_REMAPPING(txn->mt_env->me_flags)) {
int rc = mdb_rpage_get(txn, pgno, &p);
int rc = mdb_rpage_get(txn, pgno, numpgs, &p);
if (rc) {
txn->mt_flags |= MDB_TXN_ERROR;
return rc;
@ -6316,7 +6312,7 @@ mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int flags)
mdb_cassert(mc, i < NUMKEYS(mp));
node = NODEPTR(mp, i);
if ((rc = mdb_page_get(mc, NODEPGNO(node), &mp, NULL)) != 0)
if ((rc = MDB_PAGE_GET(mc, NODEPGNO(node), 1, &mp, NULL)) != 0)
return rc;
mc->mc_ki[mc->mc_top] = i;
@ -6359,7 +6355,7 @@ mdb_page_search_lowest(MDB_cursor *mc)
MDB_node *node = NODEPTR(mp, 0);
int rc;
if ((rc = mdb_page_get(mc, NODEPGNO(node), &mp, NULL)) != 0)
if ((rc = MDB_PAGE_GET(mc, NODEPGNO(node), 1, &mp, NULL)) != 0)
return rc;
mc->mc_ki[mc->mc_top] = 0;
@ -6439,7 +6435,7 @@ mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags)
if (mc->mc_pg[0])
MDB_PAGE_UNREF(mc->mc_txn, mc->mc_pg[0]);
#endif
if ((rc = mdb_page_get(mc, root, &mc->mc_pg[0], NULL)) != 0)
if ((rc = MDB_PAGE_GET(mc, root, 1, &mc->mc_pg[0], NULL)) != 0)
return rc;
}
@ -6571,10 +6567,15 @@ mdb_node_read(MDB_cursor *mc, MDB_node *leaf, MDB_val *data)
*/
data->mv_size = NODEDSZ(leaf);
memcpy(&pgno, NODEDATA(leaf), sizeof(pgno));
if ((rc = mdb_page_get(mc, pgno, &omp, NULL)) != 0) {
{
#if MDB_RPAGE_CACHE
int dpages = OVPAGES(data->mv_size, mc->mc_txn->mt_env->me_psize);
#endif
if ((rc = MDB_PAGE_GET(mc, pgno, dpages, &omp, NULL)) != 0) {
DPRINTF(("read overflow page %"Yu" failed", pgno));
return rc;
}
}
data->mv_data = METADATA(omp);
MC_SET_OVPG(mc, omp);
@ -6659,7 +6660,7 @@ mdb_cursor_sibling(MDB_cursor *mc, int move_right)
MDB_PAGE_UNREF(mc->mc_txn, op);
indx = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
if ((rc = mdb_page_get(mc, NODEPGNO(indx), &mp, NULL)) != 0) {
if ((rc = MDB_PAGE_GET(mc, NODEPGNO(indx), 1, &mp, NULL)) != 0) {
/* mc will be inconsistent if caller does mc_snum++ as above */
mc->mc_flags &= ~(C_INITIALIZED|C_EOF);
return rc;
@ -7678,7 +7679,7 @@ current:
int level, ovpages, dpages = OVPAGES(data->mv_size, env->me_psize);
memcpy(&pg, olddata.mv_data, sizeof(pg));
if ((rc2 = mdb_page_get(mc, pg, &omp, &level)) != 0)
if ((rc2 = MDB_PAGE_GET(mc, pg, dpages, &omp, &level)) != 0)
return rc2;
ovpages = omp->mp_pages;
@ -7978,7 +7979,10 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
pgno_t pg;
memcpy(&pg, NODEDATA(leaf), sizeof(pg));
if ((rc = mdb_page_get(mc, pg, &omp, NULL)) ||
/* note we don't care about page count here since
* we're just adding pgno to the freelist anyway
*/
if ((rc = MDB_PAGE_GET(mc, pg, 1, &omp, NULL)) ||
(rc = mdb_ovpage_free(mc, omp)))
goto fail;
}
@ -9141,7 +9145,7 @@ mdb_rebalance(MDB_cursor *mc)
if (rc)
return rc;
mc->mc_db->md_root = NODEPGNO(NODEPTR(mp, 0));
rc = mdb_page_get(mc, mc->mc_db->md_root, &mc->mc_pg[0], NULL);
rc = MDB_PAGE_GET(mc, mc->mc_db->md_root, 1, &mc->mc_pg[0], NULL);
if (rc)
return rc;
mc->mc_db->md_depth--;
@ -9202,7 +9206,7 @@ mdb_rebalance(MDB_cursor *mc)
DPUTS("reading right neighbor");
mn.mc_ki[ptop]++;
node = NODEPTR(mc->mc_pg[ptop], mn.mc_ki[ptop]);
rc = mdb_page_get(mc, NODEPGNO(node), &mn.mc_pg[mn.mc_top], NULL);
rc = MDB_PAGE_GET(mc, NODEPGNO(node), 1, &mn.mc_pg[mn.mc_top], NULL);
if (rc)
return rc;
mn.mc_ki[mn.mc_top] = 0;
@ -9214,7 +9218,7 @@ mdb_rebalance(MDB_cursor *mc)
DPUTS("reading left neighbor");
mn.mc_ki[ptop]--;
node = NODEPTR(mc->mc_pg[ptop], mn.mc_ki[ptop]);
rc = mdb_page_get(mc, NODEPGNO(node), &mn.mc_pg[mn.mc_top], NULL);
rc = MDB_PAGE_GET(mc, NODEPGNO(node), 1, &mn.mc_pg[mn.mc_top], NULL);
if (rc)
return rc;
mn.mc_ki[mn.mc_top] = NUMKEYS(mn.mc_pg[mn.mc_top]) - 1;
@ -10008,7 +10012,7 @@ mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags)
mc.mc_txn = my->mc_txn;
mc.mc_flags = my->mc_txn->mt_flags & (C_ORIG_RDONLY|C_WRITEMAP);
rc = mdb_page_get(&mc, *pg, &mc.mc_pg[0], NULL);
rc = MDB_PAGE_GET(&mc, *pg, 1, &mc.mc_pg[0], NULL);
if (rc)
return rc;
rc = mdb_page_search_root(&mc, NULL, MDB_PS_FIRST);
@ -10042,6 +10046,8 @@ mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags)
if (ni->mn_flags & F_BIGDATA) {
MDB_page *omp;
pgno_t pg;
size_t dsize;
int dpages;
/* Need writable leaf */
if (mp != leaf) {
@ -10050,10 +10056,12 @@ mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags)
mp = leaf;
ni = NODEPTR(mp, i);
}
dsize = NODEDSZ(ni);
dpages = OVPAGES(dsize, my->mc_env->me_psize);
memcpy(&pg, NODEDATA(ni), sizeof(pg));
memcpy(NODEDATA(ni), &my->mc_next_pgno, sizeof(pgno_t));
rc = mdb_page_get(&mc, pg, &omp, NULL);
rc = MDB_PAGE_GET(&mc, pg, dpages, &omp, NULL);
if (rc)
goto done;
if (my->mc_wlen[toggle] >= MDB_WBUF) {
@ -10067,8 +10075,8 @@ mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags)
mo->mp_pgno = my->mc_next_pgno;
my->mc_next_pgno += omp->mp_pages;
my->mc_wlen[toggle] += my->mc_env->me_psize;
if (omp->mp_pages > 1) {
my->mc_olen[toggle] = my->mc_env->me_psize * (omp->mp_pages - 1);
if (dpages > 1) {
my->mc_olen[toggle] = my->mc_env->me_psize * (dpages - 1);
my->mc_over[toggle] = (char *)omp + my->mc_env->me_psize;
rc = mdb_env_cthr_toggle(my, 1);
if (rc)
@ -10103,7 +10111,7 @@ mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags)
again:
ni = NODEPTR(mp, mc.mc_ki[mc.mc_top]);
pg = NODEPGNO(ni);
rc = mdb_page_get(&mc, pg, &mp, NULL);
rc = MDB_PAGE_GET(&mc, pg, 1, &mp, NULL);
if (rc)
goto done;
mc.mc_top++;
@ -10749,7 +10757,7 @@ mdb_drop0(MDB_cursor *mc, int subs)
if (MDB_REMAPPING(mc->mc_txn->mt_env->me_flags)) {
/* bump refcount for mx's pages */
for (i=0; i<mc->mc_snum; i++)
mdb_page_get(&mx, mc->mc_pg[i]->mp_pgno, &mx.mc_pg[i], NULL);
MDB_PAGE_GET(&mx, mc->mc_pg[i]->mp_pgno, 1, &mx.mc_pg[i], NULL);
}
while (mc->mc_snum > 0) {
@ -10762,7 +10770,8 @@ mdb_drop0(MDB_cursor *mc, int subs)
MDB_page *omp;
pgno_t pg;
memcpy(&pg, NODEDATA(ni), sizeof(pg));
rc = mdb_page_get(mc, pg, &omp, NULL);
/* page count is irrelevant here */
rc = MDB_PAGE_GET(mc, pg, 1, &omp, NULL);
if (rc != 0)
goto done;
mdb_cassert(mc, IS_OVERFLOW(omp));

Loading…
Cancel
Save