diff --git a/libraries/libmdb/mdb.c b/libraries/libmdb/mdb.c index 112936a..8a877c7 100644 --- a/libraries/libmdb/mdb.c +++ b/libraries/libmdb/mdb.c @@ -512,6 +512,7 @@ typedef struct MDB_page { pgno_t p_pgno; /**< page number */ void * p_next; /**< for in-memory list of freed structs */ } mp_p; + uint16_t mp_pad; /** @defgroup mdb_page Page Flags * @ingroup internal * Flags for the page headers. @@ -523,8 +524,9 @@ typedef struct MDB_page { #define P_META 0x08 /**< meta page */ #define P_DIRTY 0x10 /**< dirty page */ #define P_LEAF2 0x20 /**< for #MDB_DUPFIXED records */ +#define P_SUBP 0x40 /**< for #MDB_DUPSORT sub-pages */ /** @} */ - uint32_t mp_flags; /**< @ref mdb_page */ + uint16_t mp_flags; /**< @ref mdb_page */ #define mp_lower mp_pb.pb.pb_lower #define mp_upper mp_pb.pb.pb_upper #define mp_pages mp_pb.pb_pages @@ -566,6 +568,8 @@ typedef struct MDB_page { #define IS_BRANCH(p) F_ISSET((p)->mp_flags, P_BRANCH) /** Test if a page is an overflow page */ #define IS_OVERFLOW(p) F_ISSET((p)->mp_flags, P_OVERFLOW) + /** Test if a page is a sub page */ +#define IS_SUBP(p) F_ISSET((p)->mp_flags, P_SUBP) /** The number of overflow pages needed to store the given size. */ #define OVPAGES(size, psize) ((PAGEHDRSZ-1 + (size)) / (psize) + 1) @@ -666,9 +670,6 @@ typedef struct MDB_db { /** Handle for the default DB. */ #define MAIN_DBI 1 - /** Identify a data item as a valid sub-DB record */ -#define MDB_SUBDATA 0x8200 - /** Meta page content. */ typedef struct MDB_meta { /** Stamp identifying this as an MDB data file. It must be set @@ -873,6 +874,7 @@ static MDB_node *mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp); static int mdb_node_add(MDB_cursor *mc, indx_t indx, MDB_val *key, MDB_val *data, pgno_t pgno, uint8_t flags); static void mdb_node_del(MDB_page *mp, indx_t indx, int ksize); +static void mdb_node_shrink(MDB_page *mp, indx_t indx); static int mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst); static int mdb_node_read(MDB_txn *txn, MDB_node *leaf, MDB_val *data); static size_t mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data); @@ -2561,8 +2563,8 @@ mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp) nkeys = NUMKEYS(mp); - DPRINTF("searching %u keys in %s page %zu", - nkeys, IS_LEAF(mp) ? "leaf" : "branch", + DPRINTF("searching %u keys in %s %spage %zu", + nkeys, IS_LEAF(mp) ? "leaf" : "branch", IS_SUBP(mp) ? "sub-" : "", mp->mp_pgno); assert(nkeys > 0); @@ -2984,6 +2986,7 @@ mdb_cursor_next(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op) DPUTS("=====> move to next sibling page"); if (mdb_cursor_sibling(mc, 1) != MDB_SUCCESS) { mc->mc_flags |= C_EOF; + mc->mc_flags &= ~C_INITIALIZED; return MDB_NOTFOUND; } mp = mc->mc_pg[mc->mc_top]; @@ -3113,6 +3116,10 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_val nodekey; mp = mc->mc_pg[mc->mc_top]; + if (!NUMKEYS(mp)) { + mc->mc_ki[mc->mc_top] = 0; + return MDB_NOTFOUND; + } if (mp->mp_flags & P_LEAF2) { nodekey.mv_size = mc->mc_db->md_pad; nodekey.mv_data = LEAF2KEY(mp, 0, nodekey.mv_size); @@ -3170,6 +3177,11 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data, return MDB_NOTFOUND; } } + if (!mc->mc_top) { + /* There are no other pages */ + mc->mc_ki[mc->mc_top] = 0; + return MDB_NOTFOUND; + } } rc = mdb_page_search(mc, key, 0); @@ -3257,9 +3269,11 @@ mdb_cursor_first(MDB_cursor *mc, MDB_val *key, MDB_val *data) int rc; MDB_node *leaf; - rc = mdb_page_search(mc, NULL, 0); - if (rc != MDB_SUCCESS) - return rc; + if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) { + rc = mdb_page_search(mc, NULL, 0); + if (rc != MDB_SUCCESS) + return rc; + } assert(IS_LEAF(mc->mc_pg[mc->mc_top])); leaf = NODEPTR(mc->mc_pg[mc->mc_top], 0); @@ -3302,9 +3316,11 @@ mdb_cursor_last(MDB_cursor *mc, MDB_val *key, MDB_val *data) lkey.mv_size = MAXKEYSIZE+1; lkey.mv_data = NULL; - rc = mdb_page_search(mc, &lkey, 0); - if (rc != MDB_SUCCESS) - return rc; + if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) { + rc = mdb_page_search(mc, &lkey, 0); + if (rc != MDB_SUCCESS) + return rc; + } assert(IS_LEAF(mc->mc_pg[mc->mc_top])); leaf = NODEPTR(mc->mc_pg[mc->mc_top], NUMKEYS(mc->mc_pg[mc->mc_top])-1); @@ -3488,12 +3504,14 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, { MDB_node *leaf; MDB_val xdata, *rdata, dkey; + MDB_page *fp; MDB_db dummy; - char dbuf[PAGESIZE]; int do_sub = 0; size_t nsize; - DKBUF; int rc, rc2; + char pbuf[PAGESIZE]; + char dbuf[MAXKEYSIZE+1]; + DKBUF; if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_RDONLY)) return EACCES; @@ -3564,37 +3582,111 @@ top: if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT)) { /* Was a single item before, must convert now */ if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) { + /* Just overwrite the current item */ + if (flags == MDB_CURRENT) + goto current; + + /* create a fake page for the dup items */ dkey.mv_size = NODEDSZ(leaf); - dkey.mv_data = dbuf; - memcpy(dbuf, NODEDATA(leaf), dkey.mv_size); + dkey.mv_data = NODEDATA(leaf); /* data matches, ignore it */ if (!mc->mc_dbx->md_dcmp(data, &dkey)) return (flags == MDB_NODUPDATA) ? MDB_KEYEXIST : MDB_SUCCESS; - memset(&dummy, 0, sizeof(dummy)); + memcpy(dbuf, dkey.mv_data, dkey.mv_size); + dkey.mv_data = dbuf; + fp = (MDB_page *)pbuf; + fp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno; + fp->mp_flags = P_LEAF|P_DIRTY|P_SUBP; + fp->mp_lower = PAGEHDRSZ; + fp->mp_upper = PAGEHDRSZ + dkey.mv_size + data->mv_size; if (mc->mc_db->md_flags & MDB_DUPFIXED) { - dummy.md_pad = data->mv_size; - dummy.md_flags = MDB_DUPFIXED; - if (mc->mc_db->md_flags & MDB_INTEGERDUP) - dummy.md_flags |= MDB_INTEGERKEY; + fp->mp_flags |= P_LEAF2; + fp->mp_pad = data->mv_size; + } else { + fp->mp_upper += 2 * sizeof(indx_t) + 2 * NODESIZE + + (dkey.mv_size & 1) + (data->mv_size & 1); } - dummy.md_flags |= MDB_SUBDATA; - dummy.md_root = P_INVALID; - if (dkey.mv_size == sizeof(MDB_db)) { - memcpy(NODEDATA(leaf), &dummy, sizeof(dummy)); + mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0); + do_sub = 1; + rdata = &xdata; + xdata.mv_size = fp->mp_upper; + xdata.mv_data = pbuf; + flags |= F_DUPDATA; + goto new_sub; + } + if (!F_ISSET(leaf->mn_flags, F_SUBDATA)) { + /* See if we need to convert from fake page to subDB */ + MDB_page *mp; + unsigned int offset; + unsigned int i; + + fp = NODEDATA(leaf); + if (flags == MDB_CURRENT) { + fp->mp_flags |= P_DIRTY; + fp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno; + mc->mc_xcursor->mx_cursor.mc_pg[0] = fp; + flags |= F_DUPDATA; goto put_sub; } + if (mc->mc_db->md_flags & MDB_DUPFIXED) { + offset = fp->mp_pad; + } else { + offset = NODESIZE + sizeof(indx_t) + data->mv_size; + } + offset += offset & 1; + if (NODEDSZ(leaf) + offset >= mc->mc_txn->mt_env->me_psize / MDB_MINKEYS) { + /* yes, convert it */ + dummy.md_flags = 0; + if (mc->mc_db->md_flags & MDB_DUPFIXED) { + dummy.md_pad = fp->mp_pad; + dummy.md_flags = MDB_DUPFIXED; + if (mc->mc_db->md_flags & MDB_INTEGERDUP) + dummy.md_flags |= MDB_INTEGERKEY; + } + dummy.md_depth = 1; + dummy.md_branch_pages = 0; + dummy.md_leaf_pages = 1; + dummy.md_overflow_pages = 0; + dummy.md_entries = NUMKEYS(fp); + rdata = &xdata; + xdata.mv_size = sizeof(MDB_db); + xdata.mv_data = &dummy; + mp = mdb_page_alloc(mc, 1); + if (!mp) + return ENOMEM; + offset = mc->mc_txn->mt_env->me_psize - NODEDSZ(leaf); + flags |= F_DUPDATA|F_SUBDATA; + dummy.md_root = mp->mp_pgno; + } else { + /* no, just grow it */ + rdata = &xdata; + xdata.mv_size = NODEDSZ(leaf) + offset; + xdata.mv_data = pbuf; + mp = (MDB_page *)pbuf; + mp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno; + flags |= F_DUPDATA; + } + mp->mp_flags = fp->mp_flags | P_DIRTY; + mp->mp_pad = fp->mp_pad; + mp->mp_lower = fp->mp_lower; + mp->mp_upper = fp->mp_upper + offset; + if (IS_LEAF2(fp)) { + memcpy(METADATA(mp), METADATA(fp), NUMKEYS(fp) * fp->mp_pad); + } else { + nsize = NODEDSZ(leaf) - fp->mp_upper; + memcpy((char *)mp + mp->mp_upper, (char *)fp + fp->mp_upper, nsize); + for (i=0; imp_ptrs[i] = fp->mp_ptrs[i] + offset; + } mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0); do_sub = 1; - rdata = &xdata; - xdata.mv_size = sizeof(MDB_db); - xdata.mv_data = &dummy; - /* new sub-DB, must fully init xcursor */ - if (flags == MDB_CURRENT) - flags = 0; goto new_sub; } + /* data is on sub-DB, just store it */ + flags |= F_DUPDATA|F_SUBDATA; goto put_sub; } +current: /* same size, just replace it */ if (!F_ISSET(leaf->mn_flags, F_BIGDATA) && NODEDSZ(leaf) == data->mv_size) { @@ -3621,9 +3713,9 @@ new_sub: mc->mc_txn->mt_flags |= MDB_TXN_ERROR; else { /* Remember if we just added a subdatabase */ - if (flags & F_SUBDATA) { + if (flags & (F_SUBDATA|F_DUPDATA)) { leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); - leaf->mn_flags |= F_SUBDATA; + leaf->mn_flags |= (flags & (F_SUBDATA|F_DUPDATA)); } /* Now store the actual data in the child DB. Note that we're @@ -3633,27 +3725,33 @@ new_sub: */ if (do_sub) { MDB_db *db; - leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); + int xflags; put_sub: - if (flags != MDB_CURRENT) - mdb_xcursor_init1(mc, leaf); xdata.mv_size = 0; xdata.mv_data = ""; - if (flags == MDB_NODUPDATA) - flags = MDB_NOOVERWRITE; + if (flags & MDB_CURRENT) { + xflags = MDB_CURRENT; + } else { + mdb_xcursor_init1(mc, leaf); + xflags = (flags & MDB_NODUPDATA) ? MDB_NOOVERWRITE : 0; + } /* converted, write the original data first */ if (dkey.mv_size) { - rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, flags); + rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, xflags); if (rc) return rc; - leaf->mn_flags |= F_DUPDATA; } - rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, flags); - db = NODEDATA(leaf); - assert((db->md_flags & MDB_SUBDATA) == MDB_SUBDATA); - memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db)); + rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags); + if (flags & F_SUBDATA) { + db = NODEDATA(leaf); + memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db)); + } } - mc->mc_db->md_entries++; + /* sub-writes might have failed so check rc again. + * Don't increment count if we just replaced an existing item. + */ + if (!rc && !(flags & MDB_CURRENT)) + mc->mc_db->md_entries++; } done: return rc; @@ -3679,48 +3777,68 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags) if (!IS_LEAF2(mc->mc_pg[mc->mc_top]) && F_ISSET(leaf->mn_flags, F_DUPDATA)) { if (flags != MDB_NODUPDATA) { + if (!F_ISSET(leaf->mn_flags, F_SUBDATA)) { + mc->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(leaf); + } rc = mdb_cursor_del(&mc->mc_xcursor->mx_cursor, 0); /* If sub-DB still has entries, we're done */ if (mc->mc_xcursor->mx_db.md_root != P_INVALID) { - MDB_db *db = NODEDATA(leaf); - assert((db->md_flags & MDB_SUBDATA) == MDB_SUBDATA); - memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db)); + if (leaf->mn_flags & F_SUBDATA) { + /* update subDB info */ + MDB_db *db = NODEDATA(leaf); + memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db)); + } else { + /* shrink fake page */ + mdb_node_shrink(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); + } mc->mc_db->md_entries--; return rc; } /* otherwise fall thru and delete the sub-DB */ } - /* add all the child DB's pages to the free list */ - rc = mdb_page_search(&mc->mc_xcursor->mx_cursor, NULL, 0); - if (rc == MDB_SUCCESS) { - MDB_node *ni; - MDB_cursor *mx; - unsigned int i; + if (leaf->mn_flags & F_SUBDATA) { + /* add all the child DB's pages to the free list */ + rc = mdb_page_search(&mc->mc_xcursor->mx_cursor, NULL, 0); + if (rc == MDB_SUCCESS) { + MDB_node *ni; + MDB_cursor *mx; + unsigned int i; - mx = &mc->mc_xcursor->mx_cursor; - mc->mc_db->md_entries -= - mx->mc_db->md_entries; - - mdb_cursor_pop(mx); - while (mx->mc_snum > 1) { - for (i=0; imc_pg[mx->mc_top]); i++) { - MDB_page *mp; - pgno_t pg; - ni = NODEPTR(mx->mc_pg[mx->mc_top], i); - pg = NODEPGNO(ni); - if ((rc = mdb_page_get(mc->mc_txn, pg, &mp))) - return rc; - /* free it */ - mdb_midl_append(mc->mc_txn->mt_free_pgs, pg); + mx = &mc->mc_xcursor->mx_cursor; + mc->mc_db->md_entries -= + mx->mc_db->md_entries; + + mdb_cursor_pop(mx); + while (mx->mc_snum > 0) { + for (i=0; imc_pg[mx->mc_top]); i++) { + pgno_t pg; + ni = NODEPTR(mx->mc_pg[mx->mc_top], i); + pg = NODEPGNO(ni); + /* free it */ + mdb_midl_append(mc->mc_txn->mt_free_pgs, pg); + } + if (!mx->mc_top) + break; + rc = mdb_cursor_sibling(mx, 1); + if (rc) { + /* no more siblings, go back to beginning + * of previous level. (stack was already popped + * by mdb_cursor_sibling) + */ + for (i=1; imc_top; i++) { + pgno_t pg; + ni = NODEPTR(mx->mc_pg[i-1],0); + pg = NODEPGNO(ni); + if ((rc = mdb_page_get(mc->mc_txn, pg, &mx->mc_pg[i]))) + break; + } + } } - rc = mdb_cursor_sibling(mx, 1); - if (rc) - break; + /* free it */ + mdb_midl_append(mc->mc_txn->mt_free_pgs, + mx->mc_db->md_root); } - /* free it */ - mdb_midl_append(mc->mc_txn->mt_free_pgs, - mx->mc_db->md_root); } } @@ -3839,8 +3957,9 @@ mdb_node_add(MDB_cursor *mc, indx_t indx, assert(mp->mp_upper >= mp->mp_lower); - DPRINTF("add to %s page %zu index %i, data size %zu key size %zu [%s]", + DPRINTF("add to %s %spage %zu index %i, data size %zu key size %zu [%s]", IS_LEAF(mp) ? "leaf" : "branch", + IS_SUBP(mp) ? "sub-" : "", mp->mp_pgno, indx, data ? data->mv_size : 0, key ? key->mv_size : 0, key ? DKEY(key) : NULL); @@ -3991,6 +4110,58 @@ mdb_node_del(MDB_page *mp, indx_t indx, int ksize) mp->mp_upper += sz; } +/** Compact the main page after deleting a node on a subpage. + * @param[in] mp The main page to operate on. + * @param[in] indx The index of the subpage on the main page. + */ +static void +mdb_node_shrink(MDB_page *mp, indx_t indx) +{ + MDB_node *node; + MDB_page *sp, *xp; + char *base; + int osize, nsize; + int delta; + indx_t i, numkeys, ptr; + + node = NODEPTR(mp, indx); + sp = (MDB_page *)NODEDATA(node); + osize = NODEDSZ(node); + + delta = sp->mp_upper - sp->mp_lower; + SETDSZ(node, osize - delta); + xp = (MDB_page *)((char *)sp + delta); + + /* shift subpage upward */ + if (IS_LEAF2(sp)) { + nsize = NUMKEYS(sp) * sp->mp_pad; + memmove(METADATA(xp), METADATA(sp), nsize); + } else { + int i; + nsize = osize - sp->mp_upper; + numkeys = NUMKEYS(sp); + for (i=numkeys-1; i>=0; i--) + xp->mp_ptrs[i] = sp->mp_ptrs[i] - delta; + } + xp->mp_upper = sp->mp_lower; + xp->mp_lower = sp->mp_lower; + xp->mp_flags = sp->mp_flags; + xp->mp_pad = sp->mp_pad; + xp->mp_pgno = mp->mp_pgno; + + /* shift lower nodes upward */ + ptr = mp->mp_ptrs[indx]; + numkeys = NUMKEYS(mp); + for (i = 0; i < numkeys; i++) { + if (mp->mp_ptrs[i] <= ptr) + mp->mp_ptrs[i] += delta; + } + + base = (char *)mp + mp->mp_upper; + memmove(base + delta, base, ptr - mp->mp_upper + NODESIZE + NODEKSZ(node)); + mp->mp_upper += delta; +} + /** Initial setup of a sorted-dups cursor. * Sorted duplicates are implemented as a sub-database for the given key. * The duplicate data items are actually keys of the sub-database. @@ -4026,18 +4197,41 @@ mdb_xcursor_init0(MDB_cursor *mc) static void mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node) { - MDB_db *db = NODEDATA(node); MDB_xcursor *mx = mc->mc_xcursor; - assert((db->md_flags & MDB_SUBDATA) == MDB_SUBDATA); - mx->mx_db = *db; + + if (node->mn_flags & F_SUBDATA) { + MDB_db *db = NODEDATA(node); + mx->mx_db = *db; + mx->mx_cursor.mc_snum = 0; + mx->mx_cursor.mc_flags = 0; + } else { + MDB_page *fp = NODEDATA(node); + mx->mx_db.md_pad = mc->mc_pg[mc->mc_top]->mp_pad; + mx->mx_db.md_flags = 0; + mx->mx_db.md_depth = 1; + mx->mx_db.md_branch_pages = 0; + mx->mx_db.md_leaf_pages = 1; + mx->mx_db.md_overflow_pages = 0; + mx->mx_db.md_entries = NUMKEYS(fp); + mx->mx_db.md_root = fp->mp_pgno; + mx->mx_cursor.mc_snum = 1; + mx->mx_cursor.mc_flags = C_INITIALIZED; + mx->mx_cursor.mc_top = 0; + mx->mx_cursor.mc_pg[0] = fp; + mx->mx_cursor.mc_ki[0] = 0; + if (mc->mc_db->md_flags & MDB_DUPFIXED) { + mx->mx_db.md_flags = MDB_DUPFIXED; + mx->mx_db.md_pad = fp->mp_pad; + if (mc->mc_db->md_flags & MDB_INTEGERDUP) + mx->mx_db.md_flags |= MDB_INTEGERKEY; + } + } DPRINTF("Sub-db %u for db %u root page %zu", mx->mx_cursor.mc_dbi, mc->mc_dbi, - db->md_root); + mx->mx_db.md_root); if (F_ISSET(mc->mc_pg[mc->mc_top]->mp_flags, P_DIRTY)) mx->mx_dbx.md_dirty = 1; mx->mx_dbx.md_name.mv_data = NODEKEY(node); mx->mx_dbx.md_name.mv_size = node->mn_ksize; - mx->mx_cursor.mc_snum = 0; - mx->mx_cursor.mc_flags = 0; if (mx->mx_dbx.md_cmp == mdb_cmp_int && mx->mx_db.md_pad == sizeof(size_t)) mx->mx_dbx.md_cmp = mdb_cmp_long; } @@ -4468,6 +4662,7 @@ mdb_rebalance(MDB_cursor *mc) if (PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) >= FILL_THRESHOLD && NUMKEYS(mn.mc_pg[mn.mc_top]) >= 2) return mdb_node_move(&mn, mc); else { /* FIXME: if (has_enough_room()) */ + mc->mc_flags &= ~C_INITIALIZED; if (mc->mc_ki[ptop] == 0) return mdb_page_merge(&mn, mc); else