diff --git a/libraries/liblmdb/mdb.c b/libraries/liblmdb/mdb.c index 575e494..061d752 100644 --- a/libraries/liblmdb/mdb.c +++ b/libraries/liblmdb/mdb.c @@ -1009,7 +1009,7 @@ static size_t mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data); static size_t mdb_branch_size(MDB_env *env, MDB_val *key); static int mdb_rebalance(MDB_cursor *mc); -static int mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key); +static int mdb_update_key(MDB_cursor *mc, MDB_val *key); static void mdb_cursor_pop(MDB_cursor *mc); static int mdb_cursor_push(MDB_cursor *mc, MDB_page *mp); @@ -1120,17 +1120,22 @@ mdb_page_list(MDB_page *mp) DKBUF; nkeys = NUMKEYS(mp); - fprintf(stderr, "numkeys %d\n", nkeys); + fprintf(stderr, "Page %zu numkeys %d\n", mp->mp_pgno, nkeys); for (i=0; imn_ksize; key.mv_data = node->mn_data; nsize = NODESIZE + NODEKSZ(node) + sizeof(indx_t); - if (F_ISSET(node->mn_flags, F_BIGDATA)) - nsize += sizeof(pgno_t); - else - nsize += NODEDSZ(node); - fprintf(stderr, "key %d: nsize %d, %s\n", i, nsize, DKEY(&key)); + if (IS_BRANCH(mp)) { + fprintf(stderr, "key %d: page %zu, %s\n", i, NODEPGNO(node), + DKEY(&key)); + } else { + if (F_ISSET(node->mn_flags, F_BIGDATA)) + nsize += sizeof(pgno_t); + else + nsize += NODEDSZ(node); + fprintf(stderr, "key %d: nsize %d, %s\n", i, nsize, DKEY(&key)); + } } } @@ -5782,15 +5787,18 @@ mdb_cursor_dbi(MDB_cursor *mc) * @return 0 on success, non-zero on failure. */ static int -mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key) +mdb_update_key(MDB_cursor *mc, MDB_val *key) { + MDB_page *mp; MDB_node *node; char *base; size_t len; int delta, delta0; - indx_t ptr, i, numkeys; + indx_t ptr, i, numkeys, indx; DKBUF; + indx = mc->mc_ki[mc->mc_top]; + mp = mc->mc_pg[mc->mc_top]; node = NODEPTR(mp, indx); ptr = mp->mp_ptrs[indx]; #if MDB_DEBUG @@ -5815,8 +5823,12 @@ mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key) delta += (delta & 1); if (delta) { if (delta > 0 && SIZELEFT(mp) < delta) { - DPRINTF("OUCH! Not enough room, delta = %d", delta); - return MDB_PAGE_FULL; + pgno_t pgno; + /* not enough space left, do a delete and split */ + DPRINTF("Not enough room, delta = %d, splitting...", delta); + pgno = NODEPGNO(node); + mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0); + return mdb_page_split(mc, key, NULL, pgno, MDB_SPLIT_REPLACE); } numkeys = NUMKEYS(mp); @@ -5843,15 +5855,19 @@ mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key) return MDB_SUCCESS; } +static void +mdb_cursor_copy(const MDB_cursor *csrc, MDB_cursor *cdst); + /** Move a node from csrc to cdst. */ static int mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst) { - int rc; MDB_node *srcnode; MDB_val key, data; pgno_t srcpg; + MDB_cursor mn; + int rc; unsigned short flags; DKBUF; @@ -5912,7 +5928,11 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst) } cdst->mc_snum = snum--; cdst->mc_top = snum; - rc = mdb_update_key(cdst->mc_pg[cdst->mc_top], 0, &bkey); + mdb_cursor_copy(cdst, &mn); + mn.mc_ki[snum] = 0; + rc = mdb_update_key(&mn, &bkey); + if (rc) + return rc; } DPRINTF("moving %s node %u [%s] on page %zu to node %u on page %zu", @@ -5968,14 +5988,19 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst) } DPRINTF("update separator for source page %zu to [%s]", csrc->mc_pg[csrc->mc_top]->mp_pgno, DKEY(&key)); - if ((rc = mdb_update_key(csrc->mc_pg[csrc->mc_top-1], csrc->mc_ki[csrc->mc_top-1], - &key)) != MDB_SUCCESS) + mdb_cursor_copy(csrc, &mn); + mn.mc_snum--; + mn.mc_top--; + if ((rc = mdb_update_key(&mn, &key)) != MDB_SUCCESS) return rc; } if (IS_BRANCH(csrc->mc_pg[csrc->mc_top])) { MDB_val nullkey; + indx_t ix = csrc->mc_ki[csrc->mc_top]; nullkey.mv_size = 0; - rc = mdb_update_key(csrc->mc_pg[csrc->mc_top], 0, &nullkey); + csrc->mc_ki[csrc->mc_top] = 0; + rc = mdb_update_key(csrc, &nullkey); + csrc->mc_ki[csrc->mc_top] = ix; assert(rc == MDB_SUCCESS); } } @@ -5991,14 +6016,19 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst) } DPRINTF("update separator for destination page %zu to [%s]", cdst->mc_pg[cdst->mc_top]->mp_pgno, DKEY(&key)); - if ((rc = mdb_update_key(cdst->mc_pg[cdst->mc_top-1], cdst->mc_ki[cdst->mc_top-1], - &key)) != MDB_SUCCESS) + mdb_cursor_copy(cdst, &mn); + mn.mc_snum--; + mn.mc_top--; + if ((rc = mdb_update_key(&mn, &key)) != MDB_SUCCESS) return rc; } if (IS_BRANCH(cdst->mc_pg[cdst->mc_top])) { MDB_val nullkey; + indx_t ix = cdst->mc_ki[cdst->mc_top]; nullkey.mv_size = 0; - rc = mdb_update_key(cdst->mc_pg[cdst->mc_top], 0, &nullkey); + cdst->mc_ki[cdst->mc_top] = 0; + rc = mdb_update_key(cdst, &nullkey); + cdst->mc_ki[cdst->mc_top] = ix; assert(rc == MDB_SUCCESS); } } @@ -6083,7 +6113,10 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst) mdb_node_del(csrc->mc_pg[csrc->mc_top-1], csrc->mc_ki[csrc->mc_top-1], 0); if (csrc->mc_ki[csrc->mc_top-1] == 0) { key.mv_size = 0; - if ((rc = mdb_update_key(csrc->mc_pg[csrc->mc_top-1], 0, &key)) != MDB_SUCCESS) + csrc->mc_top--; + rc = mdb_update_key(csrc, &key); + csrc->mc_top++; + if (rc) return rc; } @@ -6287,14 +6320,14 @@ mdb_rebalance(MDB_cursor *mc) * Otherwise we should try to merge them. */ if (PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) >= FILL_THRESHOLD && NUMKEYS(mn.mc_pg[mn.mc_top]) >= 2) - return mdb_node_move(&mn, mc); - else { /* FIXME: if (has_enough_room()) */ - mc->mc_flags &= ~C_INITIALIZED; + rc = mdb_node_move(&mn, mc); + else { if (mc->mc_ki[ptop] == 0) - return mdb_page_merge(&mn, mc); + rc = mdb_page_merge(&mn, mc); else - return mdb_page_merge(mc, &mn); + rc = mdb_page_merge(mc, &mn); } + return rc; } /** Complete a delete operation started by #mdb_cursor_del(). */ @@ -6320,6 +6353,7 @@ mdb_cursor_del0(MDB_cursor *mc, MDB_node *leaf) mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], mc->mc_db->md_pad); mc->mc_db->md_entries--; rc = mdb_rebalance(mc); + mc->mc_flags &= ~C_INITIALIZED; if (rc != MDB_SUCCESS) mc->mc_txn->mt_flags |= MDB_TXN_ERROR; @@ -6364,8 +6398,20 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, xdata = NULL; } rc = mdb_cursor_set(&mc, key, xdata, op, &exact); - if (rc == 0) + if (rc == 0) { + /* let mdb_page_split know about this cursor if needed: + * delete will trigger a rebalance; if it needs to move + * a node from one page to another, it will have to + * update the parent's separator key(s). If the new sepkey + * is larger than the current one, the parent page may + * run out of space, triggering a split. We need this + * cursor to be consistent until the end of the rebalance. + */ + mc.mc_next = txn->mt_cursors[dbi]; + txn->mt_cursors[dbi] = &mc; rc = mdb_cursor_del(&mc, data ? 0 : MDB_NODUPDATA); + txn->mt_cursors[dbi] = mc.mc_next; + } return rc; }