ITS#7505 Fix mdb_update_key when key is too big

vmware
Howard Chu 12 years ago
parent cfedb365b4
commit 0b8ac92b7a
  1. 98
      libraries/liblmdb/mdb.c

@ -1009,7 +1009,7 @@ static size_t mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data);
static size_t mdb_branch_size(MDB_env *env, MDB_val *key); static size_t mdb_branch_size(MDB_env *env, MDB_val *key);
static int mdb_rebalance(MDB_cursor *mc); static int mdb_rebalance(MDB_cursor *mc);
static int mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key); static int mdb_update_key(MDB_cursor *mc, MDB_val *key);
static void mdb_cursor_pop(MDB_cursor *mc); static void mdb_cursor_pop(MDB_cursor *mc);
static int mdb_cursor_push(MDB_cursor *mc, MDB_page *mp); static int mdb_cursor_push(MDB_cursor *mc, MDB_page *mp);
@ -1120,17 +1120,22 @@ mdb_page_list(MDB_page *mp)
DKBUF; DKBUF;
nkeys = NUMKEYS(mp); nkeys = NUMKEYS(mp);
fprintf(stderr, "numkeys %d\n", nkeys); fprintf(stderr, "Page %zu numkeys %d\n", mp->mp_pgno, nkeys);
for (i=0; i<nkeys; i++) { for (i=0; i<nkeys; i++) {
node = NODEPTR(mp, i); node = NODEPTR(mp, i);
key.mv_size = node->mn_ksize; key.mv_size = node->mn_ksize;
key.mv_data = node->mn_data; key.mv_data = node->mn_data;
nsize = NODESIZE + NODEKSZ(node) + sizeof(indx_t); nsize = NODESIZE + NODEKSZ(node) + sizeof(indx_t);
if (F_ISSET(node->mn_flags, F_BIGDATA)) if (IS_BRANCH(mp)) {
nsize += sizeof(pgno_t); fprintf(stderr, "key %d: page %zu, %s\n", i, NODEPGNO(node),
else DKEY(&key));
nsize += NODEDSZ(node); } else {
fprintf(stderr, "key %d: nsize %d, %s\n", i, nsize, DKEY(&key)); if (F_ISSET(node->mn_flags, F_BIGDATA))
nsize += sizeof(pgno_t);
else
nsize += NODEDSZ(node);
fprintf(stderr, "key %d: nsize %d, %s\n", i, nsize, DKEY(&key));
}
} }
} }
@ -5782,15 +5787,18 @@ mdb_cursor_dbi(MDB_cursor *mc)
* @return 0 on success, non-zero on failure. * @return 0 on success, non-zero on failure.
*/ */
static int static int
mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key) mdb_update_key(MDB_cursor *mc, MDB_val *key)
{ {
MDB_page *mp;
MDB_node *node; MDB_node *node;
char *base; char *base;
size_t len; size_t len;
int delta, delta0; int delta, delta0;
indx_t ptr, i, numkeys; indx_t ptr, i, numkeys, indx;
DKBUF; DKBUF;
indx = mc->mc_ki[mc->mc_top];
mp = mc->mc_pg[mc->mc_top];
node = NODEPTR(mp, indx); node = NODEPTR(mp, indx);
ptr = mp->mp_ptrs[indx]; ptr = mp->mp_ptrs[indx];
#if MDB_DEBUG #if MDB_DEBUG
@ -5815,8 +5823,12 @@ mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key)
delta += (delta & 1); delta += (delta & 1);
if (delta) { if (delta) {
if (delta > 0 && SIZELEFT(mp) < delta) { if (delta > 0 && SIZELEFT(mp) < delta) {
DPRINTF("OUCH! Not enough room, delta = %d", delta); pgno_t pgno;
return MDB_PAGE_FULL; /* not enough space left, do a delete and split */
DPRINTF("Not enough room, delta = %d, splitting...", delta);
pgno = NODEPGNO(node);
mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
return mdb_page_split(mc, key, NULL, pgno, MDB_SPLIT_REPLACE);
} }
numkeys = NUMKEYS(mp); numkeys = NUMKEYS(mp);
@ -5843,15 +5855,19 @@ mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key)
return MDB_SUCCESS; return MDB_SUCCESS;
} }
static void
mdb_cursor_copy(const MDB_cursor *csrc, MDB_cursor *cdst);
/** Move a node from csrc to cdst. /** Move a node from csrc to cdst.
*/ */
static int static int
mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst) mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst)
{ {
int rc;
MDB_node *srcnode; MDB_node *srcnode;
MDB_val key, data; MDB_val key, data;
pgno_t srcpg; pgno_t srcpg;
MDB_cursor mn;
int rc;
unsigned short flags; unsigned short flags;
DKBUF; DKBUF;
@ -5912,7 +5928,11 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst)
} }
cdst->mc_snum = snum--; cdst->mc_snum = snum--;
cdst->mc_top = snum; cdst->mc_top = snum;
rc = mdb_update_key(cdst->mc_pg[cdst->mc_top], 0, &bkey); mdb_cursor_copy(cdst, &mn);
mn.mc_ki[snum] = 0;
rc = mdb_update_key(&mn, &bkey);
if (rc)
return rc;
} }
DPRINTF("moving %s node %u [%s] on page %zu to node %u on page %zu", DPRINTF("moving %s node %u [%s] on page %zu to node %u on page %zu",
@ -5968,14 +5988,19 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst)
} }
DPRINTF("update separator for source page %zu to [%s]", DPRINTF("update separator for source page %zu to [%s]",
csrc->mc_pg[csrc->mc_top]->mp_pgno, DKEY(&key)); csrc->mc_pg[csrc->mc_top]->mp_pgno, DKEY(&key));
if ((rc = mdb_update_key(csrc->mc_pg[csrc->mc_top-1], csrc->mc_ki[csrc->mc_top-1], mdb_cursor_copy(csrc, &mn);
&key)) != MDB_SUCCESS) mn.mc_snum--;
mn.mc_top--;
if ((rc = mdb_update_key(&mn, &key)) != MDB_SUCCESS)
return rc; return rc;
} }
if (IS_BRANCH(csrc->mc_pg[csrc->mc_top])) { if (IS_BRANCH(csrc->mc_pg[csrc->mc_top])) {
MDB_val nullkey; MDB_val nullkey;
indx_t ix = csrc->mc_ki[csrc->mc_top];
nullkey.mv_size = 0; nullkey.mv_size = 0;
rc = mdb_update_key(csrc->mc_pg[csrc->mc_top], 0, &nullkey); csrc->mc_ki[csrc->mc_top] = 0;
rc = mdb_update_key(csrc, &nullkey);
csrc->mc_ki[csrc->mc_top] = ix;
assert(rc == MDB_SUCCESS); assert(rc == MDB_SUCCESS);
} }
} }
@ -5991,14 +6016,19 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst)
} }
DPRINTF("update separator for destination page %zu to [%s]", DPRINTF("update separator for destination page %zu to [%s]",
cdst->mc_pg[cdst->mc_top]->mp_pgno, DKEY(&key)); cdst->mc_pg[cdst->mc_top]->mp_pgno, DKEY(&key));
if ((rc = mdb_update_key(cdst->mc_pg[cdst->mc_top-1], cdst->mc_ki[cdst->mc_top-1], mdb_cursor_copy(cdst, &mn);
&key)) != MDB_SUCCESS) mn.mc_snum--;
mn.mc_top--;
if ((rc = mdb_update_key(&mn, &key)) != MDB_SUCCESS)
return rc; return rc;
} }
if (IS_BRANCH(cdst->mc_pg[cdst->mc_top])) { if (IS_BRANCH(cdst->mc_pg[cdst->mc_top])) {
MDB_val nullkey; MDB_val nullkey;
indx_t ix = cdst->mc_ki[cdst->mc_top];
nullkey.mv_size = 0; nullkey.mv_size = 0;
rc = mdb_update_key(cdst->mc_pg[cdst->mc_top], 0, &nullkey); cdst->mc_ki[cdst->mc_top] = 0;
rc = mdb_update_key(cdst, &nullkey);
cdst->mc_ki[cdst->mc_top] = ix;
assert(rc == MDB_SUCCESS); assert(rc == MDB_SUCCESS);
} }
} }
@ -6083,7 +6113,10 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst)
mdb_node_del(csrc->mc_pg[csrc->mc_top-1], csrc->mc_ki[csrc->mc_top-1], 0); mdb_node_del(csrc->mc_pg[csrc->mc_top-1], csrc->mc_ki[csrc->mc_top-1], 0);
if (csrc->mc_ki[csrc->mc_top-1] == 0) { if (csrc->mc_ki[csrc->mc_top-1] == 0) {
key.mv_size = 0; key.mv_size = 0;
if ((rc = mdb_update_key(csrc->mc_pg[csrc->mc_top-1], 0, &key)) != MDB_SUCCESS) csrc->mc_top--;
rc = mdb_update_key(csrc, &key);
csrc->mc_top++;
if (rc)
return rc; return rc;
} }
@ -6287,14 +6320,14 @@ mdb_rebalance(MDB_cursor *mc)
* Otherwise we should try to merge them. * Otherwise we should try to merge them.
*/ */
if (PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) >= FILL_THRESHOLD && NUMKEYS(mn.mc_pg[mn.mc_top]) >= 2) if (PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) >= FILL_THRESHOLD && NUMKEYS(mn.mc_pg[mn.mc_top]) >= 2)
return mdb_node_move(&mn, mc); rc = mdb_node_move(&mn, mc);
else { /* FIXME: if (has_enough_room()) */ else {
mc->mc_flags &= ~C_INITIALIZED;
if (mc->mc_ki[ptop] == 0) if (mc->mc_ki[ptop] == 0)
return mdb_page_merge(&mn, mc); rc = mdb_page_merge(&mn, mc);
else else
return mdb_page_merge(mc, &mn); rc = mdb_page_merge(mc, &mn);
} }
return rc;
} }
/** Complete a delete operation started by #mdb_cursor_del(). */ /** Complete a delete operation started by #mdb_cursor_del(). */
@ -6320,6 +6353,7 @@ mdb_cursor_del0(MDB_cursor *mc, MDB_node *leaf)
mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], mc->mc_db->md_pad); mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], mc->mc_db->md_pad);
mc->mc_db->md_entries--; mc->mc_db->md_entries--;
rc = mdb_rebalance(mc); rc = mdb_rebalance(mc);
mc->mc_flags &= ~C_INITIALIZED;
if (rc != MDB_SUCCESS) if (rc != MDB_SUCCESS)
mc->mc_txn->mt_flags |= MDB_TXN_ERROR; mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
@ -6364,8 +6398,20 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
xdata = NULL; xdata = NULL;
} }
rc = mdb_cursor_set(&mc, key, xdata, op, &exact); rc = mdb_cursor_set(&mc, key, xdata, op, &exact);
if (rc == 0) if (rc == 0) {
/* let mdb_page_split know about this cursor if needed:
* delete will trigger a rebalance; if it needs to move
* a node from one page to another, it will have to
* update the parent's separator key(s). If the new sepkey
* is larger than the current one, the parent page may
* run out of space, triggering a split. We need this
* cursor to be consistent until the end of the rebalance.
*/
mc.mc_next = txn->mt_cursors[dbi];
txn->mt_cursors[dbi] = &mc;
rc = mdb_cursor_del(&mc, data ? 0 : MDB_NODUPDATA); rc = mdb_cursor_del(&mc, data ? 0 : MDB_NODUPDATA);
txn->mt_cursors[dbi] = mc.mc_next;
}
return rc; return rc;
} }

Loading…
Cancel
Save