Scaled back on overflow page work

Still keeping the page header at the top of the first page of each overflow record for now
mdb.master3
Howard Chu 8 years ago
parent f0f985fa93
commit 757378fc1d
  1. libraries/liblmdb/mdb.c (75 changed lines)

@ -942,9 +942,9 @@ enum {
* sorted #mp_ptrs[] entries referring to them. Exception: #P_LEAF2 pages
* omit mp_ptrs and pack sorted #MDB_DUPFIXED values after the page header.
*
* #P_OVERFLOW records occupy one or more contiguous pages that contain
* pure data with no page header. They hold the real data of #F_BIGDATA nodes,
* and the node stores what would have gone in a page header.
* #P_OVERFLOW records occupy one or more contiguous pages where only the
* first has a page header. They hold the real data of #F_BIGDATA nodes,
* and the node stores the pgno and number of pages used by the record.
*
* #P_SUBP sub-pages are small leaf "pages" with duplicate data.
* A node with flag #F_DUPDATA but not #F_SUBDATA contains a sub-page.
@ -1004,7 +1004,7 @@ typedef struct MDB_page {
} MDB_page;
/** Size of the page header, excluding dynamic data at the end */
#define PAGEHDRSZ sizeof(MDB_page_header)
#define PAGEHDRSZ ((unsigned)sizeof(MDB_page_header))
/** Address of first usable data byte in a page, after the header */
#define METADATA(p) ((void *)((char *)(p) + PAGEHDRSZ))
@ -1300,8 +1300,10 @@ struct MDB_txn {
/** For read txns: This thread/txn's reader table slot, or NULL. */
MDB_reader *reader;
} mt_u;
#if OVERFLOW_NOTYET
/** The sorted list of dirty overflow pages. */
MDB_ID2L mt_dirty_ovs;
#endif
/** Array of records for each DB known in the environment. */
MDB_dbx *mt_dbxs;
/** Array of MDB_db records for each known DB */
@ -1579,7 +1581,7 @@ typedef struct MDB_ntxn {
#define TXN_DBI_CHANGED(txn, dbi) \
((txn)->mt_dbiseqs[dbi] != (txn)->mt_env->me_dbiseqs[dbi])
static int mdb_page_alloc(MDB_cursor *mc, int num, int ov, MDB_page **mp);
static int mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp);
static int mdb_page_new(MDB_cursor *mc, uint32_t flags, int num, MDB_page **mp);
static int mdb_page_touch(MDB_cursor *mc);
@ -2435,7 +2437,7 @@ mdb_find_oldest(MDB_txn *txn)
/** Add a page to the txn's dirty list */
static void
mdb_page_dirty(MDB_txn *txn, MDB_page *mp, int ov)
mdb_page_dirty(MDB_txn *txn, MDB_page *mp)
{
MDB_ID2 mid;
int rc, (*insert)(MDB_ID2L, MDB_ID2 *);
@ -2447,13 +2449,9 @@ mdb_page_dirty(MDB_txn *txn, MDB_page *mp, int ov)
}
mid.mid = mp->mp_pgno;
mid.mptr = mp;
if (ov) {
rc = mdb_mid2l_insert(txn->mt_dirty_ovs, &mid);
} else {
rc = insert(txn->mt_u.dirty_list, &mid);
txn->mt_dirty_room--;
}
rc = insert(txn->mt_u.dirty_list, &mid);
mdb_tassert(txn, rc == 0);
txn->mt_dirty_room--;
}
/** Allocate page numbers and memory for writing. Maintain me_pglast,
@ -2474,7 +2472,7 @@ mdb_page_dirty(MDB_txn *txn, MDB_page *mp, int ov)
* @return 0 on success, non-zero on failure.
*/
static int
mdb_page_alloc(MDB_cursor *mc, int num, int ov, MDB_page **mp)
mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp)
{
#ifdef MDB_PARANOID /* Seems like we can ignore this now */
/* Get at most <Max_retries> more freeDB records once me_pghead
@ -2497,6 +2495,8 @@ mdb_page_alloc(MDB_cursor *mc, int num, int ov, MDB_page **mp)
MDB_cursor_op op;
MDB_cursor m2;
int found_old = 0;
#if OVERFLOW_NOTYET
MDB_dovpage *dph = NULL;
if (ov) {
@ -2508,6 +2508,7 @@ mdb_page_alloc(MDB_cursor *mc, int num, int ov, MDB_page **mp)
return ENOMEM;
dph = malloc(sizeof(MDB_dovpage));
}
#endif
/* If there are any loose pages, just use them */
if (num == 1 && txn->mt_loose_pgs) {
@ -2515,11 +2516,13 @@ mdb_page_alloc(MDB_cursor *mc, int num, int ov, MDB_page **mp)
txn->mt_loose_pgs = NEXT_LOOSE_PAGE(np);
txn->mt_loose_count--;
DPRINTF(("db %d use loose page %"Yu, DDBI(mc), np->mp_pgno));
#if OVERFLOW_NOTYET
if (ov) {
dph->mp_hdr = np->mp_hdr;
dph->mp_ptr = np;
np = (MDB_page *)dph;
}
#endif
*mp = np;
return MDB_SUCCESS;
}
@ -2673,19 +2676,25 @@ search_done:
}
np->mp_pgno = pgno;
np->mp_txnid = txn->mt_txnid;
#if OVERFLOW_NOTYET
if (ov) {
dph->mp_hdr = np->mp_hdr;
dph->mp_ptr = np;
np = (MDB_page *)dph;
}
mdb_page_dirty(txn, np, ov);
#else
mdb_page_dirty(txn, np);
#endif
*mp = np;
return MDB_SUCCESS;
fail:
#if OVERFLOW_NOTYET
if (dph)
free(dph);
#endif
txn->mt_flags |= MDB_TXN_ERROR;
return rc;
}
@ -2723,7 +2732,7 @@ mdb_page_copy(MDB_page *dst, MDB_page *src, unsigned int psize)
* mp wasn't spilled.
*/
static int
mdb_page_unspill(MDB_txn *txn, MDB_page *mp, int num, int ov, MDB_page **ret)
mdb_page_unspill(MDB_txn *txn, MDB_page *mp, MDB_page **ret)
{
MDB_env *env = txn->mt_env;
const MDB_txn *tx2;
@ -2736,15 +2745,20 @@ mdb_page_unspill(MDB_txn *txn, MDB_page *mp, int num, int ov, MDB_page **ret)
x = mdb_midl_search(tx2->mt_spill_pgs, pn);
if (x <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[x] == pn) {
MDB_page *np;
int num;
if (txn->mt_dirty_room == 0)
return MDB_TXN_FULL;
if (IS_OVERFLOW(mp))
num = mp->mp_pages;
else
num = 1;
if (env->me_flags & MDB_WRITEMAP) {
np = mp;
} else {
np = mdb_page_malloc(txn, num, 1);
if (!np)
return ENOMEM;
if (ov)
if (num > 1)
memcpy(np, mp, num * env->me_psize);
else
mdb_page_copy(np, mp, env->me_psize);
@ -2762,9 +2776,8 @@ mdb_page_unspill(MDB_txn *txn, MDB_page *mp, int num, int ov, MDB_page **ret)
* page remains spilled until child commits
*/
mdb_page_dirty(txn, np, ov);
if (!ov)
np->mp_flags |= P_DIRTY;
mdb_page_dirty(txn, np);
np->mp_flags |= P_DIRTY;
*ret = np;
break;
}
@ -2789,14 +2802,14 @@ mdb_page_touch(MDB_cursor *mc)
if (!F_ISSET(mp->mp_flags, P_DIRTY)) {
if (txn->mt_flags & MDB_TXN_SPILLS) {
np = NULL;
rc = mdb_page_unspill(txn, mp, 1, 0, &np);
rc = mdb_page_unspill(txn, mp, &np);
if (rc)
goto fail;
if (np)
goto done;
}
if ((rc = mdb_midl_need(&txn->mt_free_pgs, 1)) ||
(rc = mdb_page_alloc(mc, 1, 0, &np)))
(rc = mdb_page_alloc(mc, 1, &np)))
goto fail;
pgno = np->mp_pgno;
DPRINTF(("touched db %d page %"Yu" -> %"Yu, DDBI(mc),
@ -3148,7 +3161,9 @@ mdb_txn_renew0(MDB_txn *txn)
txn->mt_free_pgs = env->me_free_pgs;
txn->mt_free_pgs[0] = 0;
txn->mt_spill_pgs = NULL;
#if OVERFLOW_NOTYET
txn->mt_dirty_ovs = NULL;
#endif
env->me_txn = txn;
memcpy(txn->mt_dbiseqs, env->me_dbiseqs, env->me_maxdbs * sizeof(unsigned int));
}
@ -3273,7 +3288,9 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
txn->mt_dirty_room = parent->mt_dirty_room;
txn->mt_u.dirty_list[0].mid = 0;
txn->mt_spill_pgs = NULL;
#if OVERFLOW_NOTYET
txn->mt_dirty_ovs = NULL;
#endif
txn->mt_next_pgno = parent->mt_next_pgno;
parent->mt_flags |= MDB_TXN_HAS_CHILD;
parent->mt_child = txn;
@ -3434,7 +3451,9 @@ mdb_txn_end(MDB_txn *txn, unsigned mode)
env->me_pgstate = ((MDB_ntxn *)txn)->mnt_pgstate;
mdb_midl_free(txn->mt_free_pgs);
mdb_midl_free(txn->mt_spill_pgs);
#if OVERFLOW_NOTYET
mdb_mid2l_free(txn->mt_dirty_ovs);
#endif
free(txn->mt_u.dirty_list);
}
@ -7981,7 +8000,7 @@ prep_subDB:
dummy.md_entries = NUMKEYS(fp);
xdata.mv_size = sizeof(MDB_db);
xdata.mv_data = &dummy;
if ((rc = mdb_page_alloc(mc, 1, 0, &mp)))
if ((rc = mdb_page_alloc(mc, 1, &mp)))
return rc;
offset = env->me_psize - olddata.mv_size;
flags |= F_DUPDATA|F_SUBDATA;
@ -8031,13 +8050,13 @@ current:
if (!(omp->mp_flags & P_DIRTY) &&
(level || (env->me_flags & MDB_WRITEMAP)))
{
rc = mdb_page_unspill(mc->mc_txn, omp, ovpages, 1, &omp);
rc = mdb_page_unspill(mc->mc_txn, omp, &omp);
if (rc)
return rc;
level = 0; /* dirty in this txn or clean */
}
/* Is it dirty? */
if (ovp.op_txnid == mc->mc_txn->mt_txnid) {
if (omp->mp_flags & P_DIRTY) {
/* yes, overwrite it. Note in this case we don't
* bother to try shrinking the page if the new data
* is smaller than the overflow threshold.
@ -8063,10 +8082,12 @@ current:
* Copy end of page, adjusting alignment so
* compiler may copy words instead of bytes.
*/
off = data->mv_size & -sizeof(size_t);
off = (PAGEHDRSZ + data->mv_size) & -sizeof(size_t);
memcpy((size_t *)((char *)np + off),
(size_t *)((char *)omp + off), sz - off);
sz = PAGEHDRSZ;
}
memcpy(np, omp, sz); /* Copy beginning of page */
omp = np;
}
SETDSZ(leaf, data->mv_size);
@ -8351,7 +8372,7 @@ mdb_page_new(MDB_cursor *mc, uint32_t flags, int num, MDB_page **mp)
MDB_page *np;
int rc;
if ((rc = mdb_page_alloc(mc, num, flags & P_OVERFLOW, &np)))
if ((rc = mdb_page_alloc(mc, num, &np)))
return rc;
DPRINTF(("allocated new mpage %"Yu", page size %u",
np->mp_pgno, mc->mc_txn->mt_env->me_psize));
@ -8540,7 +8561,9 @@ update:
else
memcpy(ndata, data->mv_data, data->mv_size);
} else {
ndata = ((MDB_dovpage *)ofp)->mp_ptr;
MDB_ovpage ovp = {ofp->mp_pgno, mc->mc_txn->mt_txnid, ofp->mp_pages};
memcpy(ndata, &ovp, sizeof(MDB_ovpage));
ndata = METADATA(ofp);
if (F_ISSET(flags, MDB_RESERVE))
data->mv_data = ndata;
else

Loading…
Cancel
Save