diff --git a/libraries/liblmdb/mdb.c b/libraries/liblmdb/mdb.c index 91a84f0..3963511 100644 --- a/libraries/liblmdb/mdb.c +++ b/libraries/liblmdb/mdb.c @@ -7376,10 +7376,11 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno unsigned int nflags) { unsigned int flags; - int rc = MDB_SUCCESS, ins_new = 0, new_root = 0, newpos = 1, did_split = 0; + int rc = MDB_SUCCESS, new_root = 0, did_split = 0; indx_t newindx; pgno_t pgno = 0; unsigned int i, j, split_indx, nkeys, pmax; + MDB_env *env = mc->mc_txn->mt_env; MDB_node *node; MDB_val sepkey, rkey, xdata, *rdata = &xdata; MDB_page *copy; @@ -7390,10 +7391,11 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno mp = mc->mc_pg[mc->mc_top]; newindx = mc->mc_ki[mc->mc_top]; + nkeys = NUMKEYS(mp); - DPRINTF(("-----> splitting %s page %"Z"u and adding [%s] at index %i", + DPRINTF(("-----> splitting %s page %"Z"u and adding [%s] at index %i/%i", IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno, - DKEY(newkey), mc->mc_ki[mc->mc_top])); + DKEY(newkey), mc->mc_ki[mc->mc_top], nkeys)); /* Create a right sibling. */ if ((rc = mdb_page_new(mc, mp->mp_flags, 1, &rp))) @@ -7440,141 +7442,148 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno sepkey = *newkey; split_indx = newindx; nkeys = 0; - goto newsep; - } + } else { - nkeys = NUMKEYS(mp); - split_indx = nkeys / 2; - if (newindx < split_indx) - newpos = 0; - - if (IS_LEAF2(rp)) { - char *split, *ins; - int x; - unsigned int lsize, rsize, ksize; - /* Move half of the keys to the right sibling */ - copy = NULL; - x = mc->mc_ki[mc->mc_top] - split_indx; - ksize = mc->mc_db->md_pad; - split = LEAF2KEY(mp, split_indx, ksize); - rsize = (nkeys - split_indx) * ksize; - lsize = (nkeys - split_indx) * sizeof(indx_t); - mp->mp_lower -= lsize; - rp->mp_lower += lsize; - mp->mp_upper += rsize - lsize; - rp->mp_upper -= rsize - lsize; - sepkey.mv_size = ksize; - if (newindx == split_indx) { - sepkey.mv_data = newkey->mv_data; - } else { - sepkey.mv_data = split; - } - if (x<0) { - ins = LEAF2KEY(mp, mc->mc_ki[mc->mc_top], ksize); - memcpy(rp->mp_ptrs, split, rsize); - sepkey.mv_data = rp->mp_ptrs; - memmove(ins+ksize, ins, (split_indx - mc->mc_ki[mc->mc_top]) * ksize); - memcpy(ins, newkey->mv_data, ksize); - mp->mp_lower += sizeof(indx_t); - mp->mp_upper -= ksize - sizeof(indx_t); + split_indx = (nkeys+1) / 2; + + if (IS_LEAF2(rp)) { + char *split, *ins; + int x; + unsigned int lsize, rsize, ksize; + /* Move half of the keys to the right sibling */ + copy = NULL; + x = mc->mc_ki[mc->mc_top] - split_indx; + ksize = mc->mc_db->md_pad; + split = LEAF2KEY(mp, split_indx, ksize); + rsize = (nkeys - split_indx) * ksize; + lsize = (nkeys - split_indx) * sizeof(indx_t); + mp->mp_lower -= lsize; + rp->mp_lower += lsize; + mp->mp_upper += rsize - lsize; + rp->mp_upper -= rsize - lsize; + sepkey.mv_size = ksize; + if (newindx == split_indx) { + sepkey.mv_data = newkey->mv_data; + } else { + sepkey.mv_data = split; + } + if (x<0) { + ins = LEAF2KEY(mp, mc->mc_ki[mc->mc_top], ksize); + memcpy(rp->mp_ptrs, split, rsize); + sepkey.mv_data = rp->mp_ptrs; + memmove(ins+ksize, ins, (split_indx - mc->mc_ki[mc->mc_top]) * ksize); + memcpy(ins, newkey->mv_data, ksize); + mp->mp_lower += sizeof(indx_t); + mp->mp_upper -= ksize - sizeof(indx_t); + } else { + if (x) + memcpy(rp->mp_ptrs, split, x * ksize); + ins = LEAF2KEY(rp, x, ksize); + memcpy(ins, newkey->mv_data, ksize); + memcpy(ins+ksize, split + x * ksize, rsize - x * ksize); + rp->mp_lower += sizeof(indx_t); + rp->mp_upper -= ksize - sizeof(indx_t); + mc->mc_ki[mc->mc_top] = x; + mc->mc_pg[mc->mc_top] = rp; + } } else { - if (x) - memcpy(rp->mp_ptrs, split, x * ksize); - ins = LEAF2KEY(rp, x, ksize); - memcpy(ins, newkey->mv_data, ksize); - memcpy(ins+ksize, split + x * ksize, rsize - x * ksize); - rp->mp_lower += sizeof(indx_t); - rp->mp_upper -= ksize - sizeof(indx_t); - mc->mc_ki[mc->mc_top] = x; - mc->mc_pg[mc->mc_top] = rp; - } - goto newsep; - } + unsigned int psize, nsize, tsize; + int k; + /* Maximum free space in an empty page */ + pmax = env->me_psize - PAGEHDRSZ; + if (IS_LEAF(mp)) + nsize = mdb_leaf_size(env, newkey, newdata); + else + nsize = mdb_branch_size(env, newkey); + nsize += nsize & 1; - /* For leaf pages, check the split point based on what - * fits where, since otherwise mdb_node_add can fail. - * - * This check is only needed when the data items are - * relatively large, such that being off by one will - * make the difference between success or failure. - * - * It's also relevant if a page happens to be laid out - * such that one half of its nodes are all "small" and - * the other half of its nodes are "large." If the new - * item is also "large" and falls on the half with - * "large" nodes, it also may not fit. - */ - if (IS_LEAF(mp)) { - unsigned int psize, nsize; - /* Maximum free space in an empty page */ - pmax = mc->mc_txn->mt_env->me_psize - PAGEHDRSZ; - nsize = mdb_leaf_size(mc->mc_txn->mt_env, newkey, newdata); - if ((nkeys < 20) || (nsize > pmax/16)) { - if (newindx <= split_indx) { - psize = nsize; - newpos = 0; - for (i=0; imn_flags, F_BIGDATA)) - psize += sizeof(pgno_t); - else - psize += NODEDSZ(node); - psize += psize & 1; - if (psize > pmax) { - if (i <= newindx) { - split_indx = newindx; - if (i < newindx) - newpos = 1; + /* grab a page to hold a temporary copy */ + copy = mdb_page_malloc(mc->mc_txn, 1); + if (copy == NULL) + return ENOMEM; + copy->mp_pgno = mp->mp_pgno; + copy->mp_flags = mp->mp_flags; + copy->mp_lower = PAGEHDRSZ; + copy->mp_upper = env->me_psize; + + /* prepare to insert */ + for (i=0, j=0; imp_ptrs[j++] = 0; + } + copy->mp_ptrs[j++] = mp->mp_ptrs[i]; + } + + /* When items are relatively large the split point needs + * to be checked, because being off-by-one will make the + * difference between success or failure in mdb_node_add. + * + * It's also relevant if a page happens to be laid out + * such that one half of its nodes are all "small" and + * the other half of its nodes are "large." If the new + * item is also "large" and falls on the half with + * "large" nodes, it also may not fit. + * + * As a final tweak, if the new item goes on the last + * spot on the page (and thus, onto the new page), bias + * the split so the new page is emptier than the old page. + * This yields better packing during sequential inserts. + */ + if (nkeys < 20 || nsize > pmax/16 || newindx >= nkeys) { + /* Find split point */ + psize = 0; + if (newindx <= split_indx || newindx >= nkeys) { + i = 0; j = 1; + k = newindx >= nkeys ? nkeys : split_indx+1; + } else { + i = nkeys; j = -1; + k = split_indx-1; + } + for (; i!=k; i+=j) { + if (i == newindx) { + tsize = nsize; + node = NULL; + } else { + node = (MDB_node *)((char *)mp + copy->mp_ptrs[i]); + tsize = NODESIZE + NODEKSZ(node) + sizeof(indx_t); + if (IS_LEAF(mp)) { + if (F_ISSET(node->mn_flags, F_BIGDATA)) + tsize += sizeof(pgno_t); + else + tsize += NODEDSZ(node); } - else - split_indx = i; - break; + tsize += tsize & 1; } - } - } else { - psize = nsize; - for (i=nkeys-1; i>=split_indx; i--) { - node = NODEPTR(mp, i); - psize += NODESIZE + NODEKSZ(node) + sizeof(indx_t); - if (F_ISSET(node->mn_flags, F_BIGDATA)) - psize += sizeof(pgno_t); - else - psize += NODEDSZ(node); - psize += psize & 1; - if (psize > pmax) { - if (i >= newindx) { - split_indx = newindx; - newpos = 0; - } else - split_indx = i+1; + if (psize + tsize > pmax) { + split_indx = i + (j<0); break; } + psize += tsize; } + /* special case: when the new node was on the last + * slot we may not have tripped the break inside the loop. + * In all other cases we either hit the break condition, + * or the original split_indx was already safe. + */ + if (newindx >= nkeys && i == k) + split_indx = nkeys-1; + } + if (split_indx == newindx) { + sepkey.mv_size = newkey->mv_size; + sepkey.mv_data = newkey->mv_data; + } else { + node = (MDB_node *)((char *)mp + copy->mp_ptrs[split_indx]); + sepkey.mv_size = node->mn_ksize; + sepkey.mv_data = NODEKEY(node); } } } - /* First find the separating key between the split pages. - * The case where newindx == split_indx is ambiguous; the - * new item could go to the new page or stay on the original - * page. If newpos == 1 it goes to the new page. - */ - if (newindx == split_indx && newpos) { - sepkey.mv_size = newkey->mv_size; - sepkey.mv_data = newkey->mv_data; - } else { - node = NODEPTR(mp, split_indx); - sepkey.mv_size = node->mn_ksize; - sepkey.mv_data = NODEKEY(node); - } - -newsep: - DPRINTF(("separator is [%s]", DKEY(&sepkey))); + DPRINTF(("separator is %d [%s]", split_indx, DKEY(&sepkey))); /* Copy separator key to the parent. */ - if (SIZELEFT(mn.mc_pg[ptop]) < mdb_branch_size(mc->mc_txn->mt_env, &sepkey)) { + if (SIZELEFT(mn.mc_pg[ptop]) < mdb_branch_size(env, &sepkey)) { mn.mc_snum--; mn.mc_top--; did_split = 1; @@ -7619,108 +7628,91 @@ newsep: return rc; for (i=0; imc_top; i++) mc->mc_ki[i] = mn.mc_ki[i]; - goto done; - } - if (IS_LEAF2(rp)) { - goto done; - } - - /* Move half of the keys to the right sibling. */ + } else if (!IS_LEAF2(mp)) { + /* Move nodes */ + mc->mc_pg[mc->mc_top] = rp; + i = split_indx; + j = 0; + do { + if (i == newindx) { + rkey.mv_data = newkey->mv_data; + rkey.mv_size = newkey->mv_size; + if (IS_LEAF(mp)) { + rdata = newdata; + } else + pgno = newpgno; + flags = nflags; + /* Update index for the new key. */ + mc->mc_ki[mc->mc_top] = j; + } else { + node = (MDB_node *)((char *)mp + copy->mp_ptrs[i]); + rkey.mv_data = NODEKEY(node); + rkey.mv_size = node->mn_ksize; + if (IS_LEAF(mp)) { + xdata.mv_data = NODEDATA(node); + xdata.mv_size = NODEDSZ(node); + rdata = &xdata; + } else + pgno = NODEPGNO(node); + flags = node->mn_flags; + } - /* grab a page to hold a temporary copy */ - copy = mdb_page_malloc(mc->mc_txn, 1); - if (copy == NULL) - return ENOMEM; + if (!IS_LEAF(mp) && j == 0) { + /* First branch index doesn't need key data. */ + rkey.mv_size = 0; + } - copy->mp_pgno = mp->mp_pgno; - copy->mp_flags = mp->mp_flags; - copy->mp_lower = PAGEHDRSZ; - copy->mp_upper = mc->mc_txn->mt_env->me_psize; - mc->mc_pg[mc->mc_top] = copy; - for (i = j = 0; i <= nkeys; j++) { - if (i == split_indx) { - /* Insert in right sibling. */ - /* Reset insert index for right sibling. */ - if (i != newindx || (newpos ^ ins_new)) { + rc = mdb_node_add(mc, j, &rkey, rdata, pgno, flags); + if (rc) { + /* return tmp page to freelist */ + mdb_page_free(env, copy); + return rc; + } + if (i == nkeys) { + i = 0; j = 0; - mc->mc_pg[mc->mc_top] = rp; + mc->mc_pg[mc->mc_top] = copy; + } else { + i++; + j++; + } + } while (i != split_indx); + + nkeys = NUMKEYS(copy); + for (i=0; imp_ptrs[i] = copy->mp_ptrs[i]; + mp->mp_lower = copy->mp_lower; + mp->mp_upper = copy->mp_upper; + memcpy(NODEPTR(mp, nkeys-1), NODEPTR(copy, nkeys-1), + env->me_psize - copy->mp_upper); + + /* reset back to original page */ + if (newindx < split_indx) { + mc->mc_pg[mc->mc_top] = mp; + if (nflags & MDB_RESERVE) { + node = NODEPTR(mp, mc->mc_ki[mc->mc_top]); + if (!(node->mn_flags & F_BIGDATA)) + newdata->mv_data = NODEDATA(node); } - } - - if (i == newindx && !ins_new) { - /* Insert the original entry that caused the split. */ - rkey.mv_data = newkey->mv_data; - rkey.mv_size = newkey->mv_size; - if (IS_LEAF(mp)) { - rdata = newdata; - } else - pgno = newpgno; - flags = nflags; - - ins_new = 1; - - /* Update index for the new key. */ - mc->mc_ki[mc->mc_top] = j; - } else if (i == nkeys) { - break; } else { - node = NODEPTR(mp, i); - rkey.mv_data = NODEKEY(node); - rkey.mv_size = node->mn_ksize; - if (IS_LEAF(mp)) { - xdata.mv_data = NODEDATA(node); - xdata.mv_size = NODEDSZ(node); - rdata = &xdata; - } else - pgno = NODEPGNO(node); - flags = node->mn_flags; - - i++; - } - - if (!IS_LEAF(mp) && j == 0) { - /* First branch index doesn't need key data. */ - rkey.mv_size = 0; - } - - rc = mdb_node_add(mc, j, &rkey, rdata, pgno, flags); - if (rc) break; - } - - nkeys = NUMKEYS(copy); - for (i=0; imp_ptrs[i] = copy->mp_ptrs[i]; - mp->mp_lower = copy->mp_lower; - mp->mp_upper = copy->mp_upper; - memcpy(NODEPTR(mp, nkeys-1), NODEPTR(copy, nkeys-1), - mc->mc_txn->mt_env->me_psize - copy->mp_upper); - - /* reset back to original page */ - if (newindx < split_indx || (!newpos && newindx == split_indx)) { - mc->mc_pg[mc->mc_top] = mp; - if (nflags & MDB_RESERVE) { - node = NODEPTR(mp, mc->mc_ki[mc->mc_top]); - if (!(node->mn_flags & F_BIGDATA)) - newdata->mv_data = NODEDATA(node); - } - } else { - mc->mc_ki[ptop]++; - /* Make sure mc_ki is still valid. - */ - if (mn.mc_pg[ptop] != mc->mc_pg[ptop] && - mc->mc_ki[ptop] >= NUMKEYS(mc->mc_pg[ptop])) { - for (i=0; imc_pg[i] = mn.mc_pg[i]; - mc->mc_ki[i] = mn.mc_ki[i]; + mc->mc_pg[mc->mc_top] = rp; + mc->mc_ki[ptop]++; + /* Make sure mc_ki is still valid. + */ + if (mn.mc_pg[ptop] != mc->mc_pg[ptop] && + mc->mc_ki[ptop] >= NUMKEYS(mc->mc_pg[ptop])) { + for (i=0; imc_pg[i] = mn.mc_pg[i]; + mc->mc_ki[i] = mn.mc_ki[i]; + } + mc->mc_pg[ptop] = mn.mc_pg[ptop]; + mc->mc_ki[ptop] = mn.mc_ki[ptop] - 1; } - mc->mc_pg[ptop] = mn.mc_pg[ptop]; - mc->mc_ki[ptop] = mn.mc_ki[ptop] - 1; } + /* return tmp page to freelist */ + mdb_page_free(env, copy); } - /* return tmp page to freelist */ - mdb_page_free(mc->mc_txn->mt_env, copy); -done: { /* Adjust other cursors pointing to mp */ MDB_cursor *m2, *m3; @@ -7768,6 +7760,7 @@ done: } } } + DPRINTF(("mp left: %d, rp left: %d", SIZELEFT(mp), SIZELEFT(rp))); return rc; }