New sorted-dup subpage support

Rather than converting directly to a subDB when the first duplicate
item is seen for a key, convert to a subpage. Allow the
subpage to grow up to the overflow limit, then convert to a subDB.
This saves a significant amount of space in a typical slapd index
database.

Currently we don't convert back to the smaller form if items are
later deleted. Probably could do that with some hysteresis, e.g.,
convert back from subDB to subpage when the size drops below
(overflow limit/2). Maybe later.
vmware
Howard Chu 13 years ago
parent 0a28548131
commit b7057eb61e
  1. 287
      libraries/libmdb/mdb.c

@ -512,6 +512,7 @@ typedef struct MDB_page {
pgno_t p_pgno; /**< page number */ pgno_t p_pgno; /**< page number */
void * p_next; /**< for in-memory list of freed structs */ void * p_next; /**< for in-memory list of freed structs */
} mp_p; } mp_p;
uint16_t mp_pad;
/** @defgroup mdb_page Page Flags /** @defgroup mdb_page Page Flags
* @ingroup internal * @ingroup internal
* Flags for the page headers. * Flags for the page headers.
@ -523,8 +524,9 @@ typedef struct MDB_page {
#define P_META 0x08 /**< meta page */ #define P_META 0x08 /**< meta page */
#define P_DIRTY 0x10 /**< dirty page */ #define P_DIRTY 0x10 /**< dirty page */
#define P_LEAF2 0x20 /**< for #MDB_DUPFIXED records */ #define P_LEAF2 0x20 /**< for #MDB_DUPFIXED records */
#define P_SUBP 0x40 /**< for #MDB_DUPSORT sub-pages */
/** @} */ /** @} */
uint32_t mp_flags; /**< @ref mdb_page */ uint16_t mp_flags; /**< @ref mdb_page */
#define mp_lower mp_pb.pb.pb_lower #define mp_lower mp_pb.pb.pb_lower
#define mp_upper mp_pb.pb.pb_upper #define mp_upper mp_pb.pb.pb_upper
#define mp_pages mp_pb.pb_pages #define mp_pages mp_pb.pb_pages
@ -566,6 +568,8 @@ typedef struct MDB_page {
#define IS_BRANCH(p) F_ISSET((p)->mp_flags, P_BRANCH) #define IS_BRANCH(p) F_ISSET((p)->mp_flags, P_BRANCH)
/** Test if a page is an overflow page */ /** Test if a page is an overflow page */
#define IS_OVERFLOW(p) F_ISSET((p)->mp_flags, P_OVERFLOW) #define IS_OVERFLOW(p) F_ISSET((p)->mp_flags, P_OVERFLOW)
/** Test if a page is a sub page */
#define IS_SUBP(p) F_ISSET((p)->mp_flags, P_SUBP)
/** The number of overflow pages needed to store the given size. */ /** The number of overflow pages needed to store the given size. */
#define OVPAGES(size, psize) ((PAGEHDRSZ-1 + (size)) / (psize) + 1) #define OVPAGES(size, psize) ((PAGEHDRSZ-1 + (size)) / (psize) + 1)
@ -666,9 +670,6 @@ typedef struct MDB_db {
/** Handle for the default DB. */ /** Handle for the default DB. */
#define MAIN_DBI 1 #define MAIN_DBI 1
/** Identify a data item as a valid sub-DB record */
#define MDB_SUBDATA 0x8200
/** Meta page content. */ /** Meta page content. */
typedef struct MDB_meta { typedef struct MDB_meta {
/** Stamp identifying this as an MDB data file. It must be set /** Stamp identifying this as an MDB data file. It must be set
@ -873,6 +874,7 @@ static MDB_node *mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp);
static int mdb_node_add(MDB_cursor *mc, indx_t indx, static int mdb_node_add(MDB_cursor *mc, indx_t indx,
MDB_val *key, MDB_val *data, pgno_t pgno, uint8_t flags); MDB_val *key, MDB_val *data, pgno_t pgno, uint8_t flags);
static void mdb_node_del(MDB_page *mp, indx_t indx, int ksize); static void mdb_node_del(MDB_page *mp, indx_t indx, int ksize);
static void mdb_node_shrink(MDB_page *mp, indx_t indx);
static int mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst); static int mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst);
static int mdb_node_read(MDB_txn *txn, MDB_node *leaf, MDB_val *data); static int mdb_node_read(MDB_txn *txn, MDB_node *leaf, MDB_val *data);
static size_t mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data); static size_t mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data);
@ -2561,8 +2563,8 @@ mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp)
nkeys = NUMKEYS(mp); nkeys = NUMKEYS(mp);
DPRINTF("searching %u keys in %s page %zu", DPRINTF("searching %u keys in %s %spage %zu",
nkeys, IS_LEAF(mp) ? "leaf" : "branch", nkeys, IS_LEAF(mp) ? "leaf" : "branch", IS_SUBP(mp) ? "sub-" : "",
mp->mp_pgno); mp->mp_pgno);
assert(nkeys > 0); assert(nkeys > 0);
@ -2984,6 +2986,7 @@ mdb_cursor_next(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op)
DPUTS("=====> move to next sibling page"); DPUTS("=====> move to next sibling page");
if (mdb_cursor_sibling(mc, 1) != MDB_SUCCESS) { if (mdb_cursor_sibling(mc, 1) != MDB_SUCCESS) {
mc->mc_flags |= C_EOF; mc->mc_flags |= C_EOF;
mc->mc_flags &= ~C_INITIALIZED;
return MDB_NOTFOUND; return MDB_NOTFOUND;
} }
mp = mc->mc_pg[mc->mc_top]; mp = mc->mc_pg[mc->mc_top];
@ -3113,6 +3116,10 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data,
MDB_val nodekey; MDB_val nodekey;
mp = mc->mc_pg[mc->mc_top]; mp = mc->mc_pg[mc->mc_top];
if (!NUMKEYS(mp)) {
mc->mc_ki[mc->mc_top] = 0;
return MDB_NOTFOUND;
}
if (mp->mp_flags & P_LEAF2) { if (mp->mp_flags & P_LEAF2) {
nodekey.mv_size = mc->mc_db->md_pad; nodekey.mv_size = mc->mc_db->md_pad;
nodekey.mv_data = LEAF2KEY(mp, 0, nodekey.mv_size); nodekey.mv_data = LEAF2KEY(mp, 0, nodekey.mv_size);
@ -3170,6 +3177,11 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data,
return MDB_NOTFOUND; return MDB_NOTFOUND;
} }
} }
if (!mc->mc_top) {
/* There are no other pages */
mc->mc_ki[mc->mc_top] = 0;
return MDB_NOTFOUND;
}
} }
rc = mdb_page_search(mc, key, 0); rc = mdb_page_search(mc, key, 0);
@ -3257,9 +3269,11 @@ mdb_cursor_first(MDB_cursor *mc, MDB_val *key, MDB_val *data)
int rc; int rc;
MDB_node *leaf; MDB_node *leaf;
if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) {
rc = mdb_page_search(mc, NULL, 0); rc = mdb_page_search(mc, NULL, 0);
if (rc != MDB_SUCCESS) if (rc != MDB_SUCCESS)
return rc; return rc;
}
assert(IS_LEAF(mc->mc_pg[mc->mc_top])); assert(IS_LEAF(mc->mc_pg[mc->mc_top]));
leaf = NODEPTR(mc->mc_pg[mc->mc_top], 0); leaf = NODEPTR(mc->mc_pg[mc->mc_top], 0);
@ -3302,9 +3316,11 @@ mdb_cursor_last(MDB_cursor *mc, MDB_val *key, MDB_val *data)
lkey.mv_size = MAXKEYSIZE+1; lkey.mv_size = MAXKEYSIZE+1;
lkey.mv_data = NULL; lkey.mv_data = NULL;
if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) {
rc = mdb_page_search(mc, &lkey, 0); rc = mdb_page_search(mc, &lkey, 0);
if (rc != MDB_SUCCESS) if (rc != MDB_SUCCESS)
return rc; return rc;
}
assert(IS_LEAF(mc->mc_pg[mc->mc_top])); assert(IS_LEAF(mc->mc_pg[mc->mc_top]));
leaf = NODEPTR(mc->mc_pg[mc->mc_top], NUMKEYS(mc->mc_pg[mc->mc_top])-1); leaf = NODEPTR(mc->mc_pg[mc->mc_top], NUMKEYS(mc->mc_pg[mc->mc_top])-1);
@ -3488,12 +3504,14 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
{ {
MDB_node *leaf; MDB_node *leaf;
MDB_val xdata, *rdata, dkey; MDB_val xdata, *rdata, dkey;
MDB_page *fp;
MDB_db dummy; MDB_db dummy;
char dbuf[PAGESIZE];
int do_sub = 0; int do_sub = 0;
size_t nsize; size_t nsize;
DKBUF;
int rc, rc2; int rc, rc2;
char pbuf[PAGESIZE];
char dbuf[MAXKEYSIZE+1];
DKBUF;
if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_RDONLY)) if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_RDONLY))
return EACCES; return EACCES;
@ -3564,37 +3582,111 @@ top:
if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT)) { if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT)) {
/* Was a single item before, must convert now */ /* Was a single item before, must convert now */
if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) { if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
/* Just overwrite the current item */
if (flags == MDB_CURRENT)
goto current;
/* create a fake page for the dup items */
dkey.mv_size = NODEDSZ(leaf); dkey.mv_size = NODEDSZ(leaf);
dkey.mv_data = dbuf; dkey.mv_data = NODEDATA(leaf);
memcpy(dbuf, NODEDATA(leaf), dkey.mv_size);
/* data matches, ignore it */ /* data matches, ignore it */
if (!mc->mc_dbx->md_dcmp(data, &dkey)) if (!mc->mc_dbx->md_dcmp(data, &dkey))
return (flags == MDB_NODUPDATA) ? MDB_KEYEXIST : MDB_SUCCESS; return (flags == MDB_NODUPDATA) ? MDB_KEYEXIST : MDB_SUCCESS;
memset(&dummy, 0, sizeof(dummy)); memcpy(dbuf, dkey.mv_data, dkey.mv_size);
dkey.mv_data = dbuf;
fp = (MDB_page *)pbuf;
fp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno;
fp->mp_flags = P_LEAF|P_DIRTY|P_SUBP;
fp->mp_lower = PAGEHDRSZ;
fp->mp_upper = PAGEHDRSZ + dkey.mv_size + data->mv_size;
if (mc->mc_db->md_flags & MDB_DUPFIXED) {
fp->mp_flags |= P_LEAF2;
fp->mp_pad = data->mv_size;
} else {
fp->mp_upper += 2 * sizeof(indx_t) + 2 * NODESIZE +
(dkey.mv_size & 1) + (data->mv_size & 1);
}
mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
do_sub = 1;
rdata = &xdata;
xdata.mv_size = fp->mp_upper;
xdata.mv_data = pbuf;
flags |= F_DUPDATA;
goto new_sub;
}
if (!F_ISSET(leaf->mn_flags, F_SUBDATA)) {
/* See if we need to convert from fake page to subDB */
MDB_page *mp;
unsigned int offset;
unsigned int i;
fp = NODEDATA(leaf);
if (flags == MDB_CURRENT) {
fp->mp_flags |= P_DIRTY;
fp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno;
mc->mc_xcursor->mx_cursor.mc_pg[0] = fp;
flags |= F_DUPDATA;
goto put_sub;
}
if (mc->mc_db->md_flags & MDB_DUPFIXED) {
offset = fp->mp_pad;
} else {
offset = NODESIZE + sizeof(indx_t) + data->mv_size;
}
offset += offset & 1;
if (NODEDSZ(leaf) + offset >= mc->mc_txn->mt_env->me_psize / MDB_MINKEYS) {
/* yes, convert it */
dummy.md_flags = 0;
if (mc->mc_db->md_flags & MDB_DUPFIXED) { if (mc->mc_db->md_flags & MDB_DUPFIXED) {
dummy.md_pad = data->mv_size; dummy.md_pad = fp->mp_pad;
dummy.md_flags = MDB_DUPFIXED; dummy.md_flags = MDB_DUPFIXED;
if (mc->mc_db->md_flags & MDB_INTEGERDUP) if (mc->mc_db->md_flags & MDB_INTEGERDUP)
dummy.md_flags |= MDB_INTEGERKEY; dummy.md_flags |= MDB_INTEGERKEY;
} }
dummy.md_flags |= MDB_SUBDATA; dummy.md_depth = 1;
dummy.md_root = P_INVALID; dummy.md_branch_pages = 0;
if (dkey.mv_size == sizeof(MDB_db)) { dummy.md_leaf_pages = 1;
memcpy(NODEDATA(leaf), &dummy, sizeof(dummy)); dummy.md_overflow_pages = 0;
goto put_sub; dummy.md_entries = NUMKEYS(fp);
}
mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
do_sub = 1;
rdata = &xdata; rdata = &xdata;
xdata.mv_size = sizeof(MDB_db); xdata.mv_size = sizeof(MDB_db);
xdata.mv_data = &dummy; xdata.mv_data = &dummy;
/* new sub-DB, must fully init xcursor */ mp = mdb_page_alloc(mc, 1);
if (flags == MDB_CURRENT) if (!mp)
flags = 0; return ENOMEM;
offset = mc->mc_txn->mt_env->me_psize - NODEDSZ(leaf);
flags |= F_DUPDATA|F_SUBDATA;
dummy.md_root = mp->mp_pgno;
} else {
/* no, just grow it */
rdata = &xdata;
xdata.mv_size = NODEDSZ(leaf) + offset;
xdata.mv_data = pbuf;
mp = (MDB_page *)pbuf;
mp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno;
flags |= F_DUPDATA;
}
mp->mp_flags = fp->mp_flags | P_DIRTY;
mp->mp_pad = fp->mp_pad;
mp->mp_lower = fp->mp_lower;
mp->mp_upper = fp->mp_upper + offset;
if (IS_LEAF2(fp)) {
memcpy(METADATA(mp), METADATA(fp), NUMKEYS(fp) * fp->mp_pad);
} else {
nsize = NODEDSZ(leaf) - fp->mp_upper;
memcpy((char *)mp + mp->mp_upper, (char *)fp + fp->mp_upper, nsize);
for (i=0; i<NUMKEYS(fp); i++)
mp->mp_ptrs[i] = fp->mp_ptrs[i] + offset;
}
mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
do_sub = 1;
goto new_sub; goto new_sub;
} }
/* data is on sub-DB, just store it */
flags |= F_DUPDATA|F_SUBDATA;
goto put_sub; goto put_sub;
} }
current:
/* same size, just replace it */ /* same size, just replace it */
if (!F_ISSET(leaf->mn_flags, F_BIGDATA) && if (!F_ISSET(leaf->mn_flags, F_BIGDATA) &&
NODEDSZ(leaf) == data->mv_size) { NODEDSZ(leaf) == data->mv_size) {
@ -3621,9 +3713,9 @@ new_sub:
mc->mc_txn->mt_flags |= MDB_TXN_ERROR; mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
else { else {
/* Remember if we just added a subdatabase */ /* Remember if we just added a subdatabase */
if (flags & F_SUBDATA) { if (flags & (F_SUBDATA|F_DUPDATA)) {
leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
leaf->mn_flags |= F_SUBDATA; leaf->mn_flags |= (flags & (F_SUBDATA|F_DUPDATA));
} }
/* Now store the actual data in the child DB. Note that we're /* Now store the actual data in the child DB. Note that we're
@ -3633,26 +3725,32 @@ new_sub:
*/ */
if (do_sub) { if (do_sub) {
MDB_db *db; MDB_db *db;
leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); int xflags;
put_sub: put_sub:
if (flags != MDB_CURRENT)
mdb_xcursor_init1(mc, leaf);
xdata.mv_size = 0; xdata.mv_size = 0;
xdata.mv_data = ""; xdata.mv_data = "";
if (flags == MDB_NODUPDATA) if (flags & MDB_CURRENT) {
flags = MDB_NOOVERWRITE; xflags = MDB_CURRENT;
} else {
mdb_xcursor_init1(mc, leaf);
xflags = (flags & MDB_NODUPDATA) ? MDB_NOOVERWRITE : 0;
}
/* converted, write the original data first */ /* converted, write the original data first */
if (dkey.mv_size) { if (dkey.mv_size) {
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, flags); rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, xflags);
if (rc) if (rc)
return rc; return rc;
leaf->mn_flags |= F_DUPDATA;
} }
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, flags); rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags);
if (flags & F_SUBDATA) {
db = NODEDATA(leaf); db = NODEDATA(leaf);
assert((db->md_flags & MDB_SUBDATA) == MDB_SUBDATA);
memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db)); memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
} }
}
/* sub-writes might have failed so check rc again.
* Don't increment count if we just replaced an existing item.
*/
if (!rc && !(flags & MDB_CURRENT))
mc->mc_db->md_entries++; mc->mc_db->md_entries++;
} }
done: done:
@ -3679,18 +3777,27 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
if (!IS_LEAF2(mc->mc_pg[mc->mc_top]) && F_ISSET(leaf->mn_flags, F_DUPDATA)) { if (!IS_LEAF2(mc->mc_pg[mc->mc_top]) && F_ISSET(leaf->mn_flags, F_DUPDATA)) {
if (flags != MDB_NODUPDATA) { if (flags != MDB_NODUPDATA) {
if (!F_ISSET(leaf->mn_flags, F_SUBDATA)) {
mc->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(leaf);
}
rc = mdb_cursor_del(&mc->mc_xcursor->mx_cursor, 0); rc = mdb_cursor_del(&mc->mc_xcursor->mx_cursor, 0);
/* If sub-DB still has entries, we're done */ /* If sub-DB still has entries, we're done */
if (mc->mc_xcursor->mx_db.md_root != P_INVALID) { if (mc->mc_xcursor->mx_db.md_root != P_INVALID) {
if (leaf->mn_flags & F_SUBDATA) {
/* update subDB info */
MDB_db *db = NODEDATA(leaf); MDB_db *db = NODEDATA(leaf);
assert((db->md_flags & MDB_SUBDATA) == MDB_SUBDATA);
memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db)); memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
} else {
/* shrink fake page */
mdb_node_shrink(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
}
mc->mc_db->md_entries--; mc->mc_db->md_entries--;
return rc; return rc;
} }
/* otherwise fall thru and delete the sub-DB */ /* otherwise fall thru and delete the sub-DB */
} }
if (leaf->mn_flags & F_SUBDATA) {
/* add all the child DB's pages to the free list */ /* add all the child DB's pages to the free list */
rc = mdb_page_search(&mc->mc_xcursor->mx_cursor, NULL, 0); rc = mdb_page_search(&mc->mc_xcursor->mx_cursor, NULL, 0);
if (rc == MDB_SUCCESS) { if (rc == MDB_SUCCESS) {
@ -3703,26 +3810,37 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
mx->mc_db->md_entries; mx->mc_db->md_entries;
mdb_cursor_pop(mx); mdb_cursor_pop(mx);
while (mx->mc_snum > 1) { while (mx->mc_snum > 0) {
for (i=0; i<NUMKEYS(mx->mc_pg[mx->mc_top]); i++) { for (i=0; i<NUMKEYS(mx->mc_pg[mx->mc_top]); i++) {
MDB_page *mp;
pgno_t pg; pgno_t pg;
ni = NODEPTR(mx->mc_pg[mx->mc_top], i); ni = NODEPTR(mx->mc_pg[mx->mc_top], i);
pg = NODEPGNO(ni); pg = NODEPGNO(ni);
if ((rc = mdb_page_get(mc->mc_txn, pg, &mp)))
return rc;
/* free it */ /* free it */
mdb_midl_append(mc->mc_txn->mt_free_pgs, pg); mdb_midl_append(mc->mc_txn->mt_free_pgs, pg);
} }
if (!mx->mc_top)
break;
rc = mdb_cursor_sibling(mx, 1); rc = mdb_cursor_sibling(mx, 1);
if (rc) if (rc) {
/* no more siblings, go back to beginning
* of previous level. (stack was already popped
* by mdb_cursor_sibling)
*/
for (i=1; i<mx->mc_top; i++) {
pgno_t pg;
ni = NODEPTR(mx->mc_pg[i-1],0);
pg = NODEPGNO(ni);
if ((rc = mdb_page_get(mc->mc_txn, pg, &mx->mc_pg[i])))
break; break;
} }
}
}
/* free it */ /* free it */
mdb_midl_append(mc->mc_txn->mt_free_pgs, mdb_midl_append(mc->mc_txn->mt_free_pgs,
mx->mc_db->md_root); mx->mc_db->md_root);
} }
} }
}
return mdb_cursor_del0(mc, leaf); return mdb_cursor_del0(mc, leaf);
} }
@ -3839,8 +3957,9 @@ mdb_node_add(MDB_cursor *mc, indx_t indx,
assert(mp->mp_upper >= mp->mp_lower); assert(mp->mp_upper >= mp->mp_lower);
DPRINTF("add to %s page %zu index %i, data size %zu key size %zu [%s]", DPRINTF("add to %s %spage %zu index %i, data size %zu key size %zu [%s]",
IS_LEAF(mp) ? "leaf" : "branch", IS_LEAF(mp) ? "leaf" : "branch",
IS_SUBP(mp) ? "sub-" : "",
mp->mp_pgno, indx, data ? data->mv_size : 0, mp->mp_pgno, indx, data ? data->mv_size : 0,
key ? key->mv_size : 0, key ? DKEY(key) : NULL); key ? key->mv_size : 0, key ? DKEY(key) : NULL);
@ -3991,6 +4110,58 @@ mdb_node_del(MDB_page *mp, indx_t indx, int ksize)
mp->mp_upper += sz; mp->mp_upper += sz;
} }
/** Compact the main page after deleting a node on a subpage.
* The subpage is stored as the data of node @p indx on the main page.
* Its unused gap (mp_upper - mp_lower) is squeezed out: the node's recorded
* data size is reduced by that amount, the subpage header (and its pointer
* array or fixed-size items) is rewritten that many bytes higher, and the
* main page's contents from mp_upper through this node's header and key are
* moved up by the same amount, growing the main page's free space.
* @param[in] mp The main page to operate on.
* @param[in] indx The index of the subpage on the main page.
*/
static void
mdb_node_shrink(MDB_page *mp, indx_t indx)
{
MDB_node *node;
MDB_page *sp, *xp;
char *base;
int osize, nsize;
int delta;
indx_t i, numkeys, ptr;
/* sp is the subpage as it currently sits inside the node's data area */
node = NODEPTR(mp, indx);
sp = (MDB_page *)NODEDATA(node);
osize = NODEDSZ(node);
/* delta is the gap between the subpage's lower and upper offsets;
 * the node's data size shrinks by exactly that much.
 */
delta = sp->mp_upper - sp->mp_lower;
SETDSZ(node, osize - delta);
/* xp is where the subpage header will live: delta bytes above sp.
 * NOTE(review): xp and sp overlap whenever delta is smaller than the
 * header, so the order of the copies below matters.
 */
xp = (MDB_page *)((char *)sp + delta);
/* shift subpage upward */
if (IS_LEAF2(sp)) {
/* LEAF2 subpage: move NUMKEYS * mp_pad bytes of items; memmove is used
 * because source and destination may overlap.
 */
nsize = NUMKEYS(sp) * sp->mp_pad;
memmove(METADATA(xp), METADATA(sp), nsize);
} else {
/* NOTE(review): this signed i shadows the outer indx_t i; a signed
 * index is what lets the descending loop terminate at i < 0.
 */
int i;
/* NOTE(review): nsize is assigned here but not read again in this function. */
nsize = osize - sp->mp_upper;
numkeys = NUMKEYS(sp);
/* Copy the pointer array from the last slot down, since the destination
 * sits at a higher, possibly overlapping address. Each offset is reduced
 * by delta - presumably because the subpage start moves up by delta
 * while the node bodies it points at stay where they are.
 */
for (i=numkeys-1; i>=0; i--)
xp->mp_ptrs[i] = sp->mp_ptrs[i] - delta;
}
/* Rewrite the header at its new location. Upper is set equal to lower, so
 * the shrunken subpage has no gap left. NOTE(review): the field order looks
 * deliberate given the sp/xp overlap - do not reorder without checking the
 * MDB_page layout.
 */
xp->mp_upper = sp->mp_lower;
xp->mp_lower = sp->mp_lower;
xp->mp_flags = sp->mp_flags;
xp->mp_pad = sp->mp_pad;
xp->mp_pgno = mp->mp_pgno;
/* shift lower nodes upward */
/* Every main-page node whose offset is at or below this node's offset
 * (this node included) is about to move up by delta, so adjust its pointer.
 */
ptr = mp->mp_ptrs[indx];
numkeys = NUMKEYS(mp);
for (i = 0; i < numkeys; i++) {
if (mp->mp_ptrs[i] <= ptr)
mp->mp_ptrs[i] += delta;
}
/* Move everything from the main page's mp_upper through this node's
 * NODESIZE + key bytes (presumably the node header plus its key) up by
 * delta; the ranges overlap, hence memmove. ptr still holds the node's
 * offset from before the adjustment above.
 */
base = (char *)mp + mp->mp_upper;
memmove(base + delta, base, ptr - mp->mp_upper + NODESIZE + NODEKSZ(node));
/* The reclaimed bytes become free space on the main page */
mp->mp_upper += delta;
}
/** Initial setup of a sorted-dups cursor. /** Initial setup of a sorted-dups cursor.
* Sorted duplicates are implemented as a sub-database for the given key. * Sorted duplicates are implemented as a sub-database for the given key.
* The duplicate data items are actually keys of the sub-database. * The duplicate data items are actually keys of the sub-database.
@ -4026,18 +4197,41 @@ mdb_xcursor_init0(MDB_cursor *mc)
static void static void
mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node) mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node)
{ {
MDB_db *db = NODEDATA(node);
MDB_xcursor *mx = mc->mc_xcursor; MDB_xcursor *mx = mc->mc_xcursor;
assert((db->md_flags & MDB_SUBDATA) == MDB_SUBDATA);
if (node->mn_flags & F_SUBDATA) {
MDB_db *db = NODEDATA(node);
mx->mx_db = *db; mx->mx_db = *db;
mx->mx_cursor.mc_snum = 0;
mx->mx_cursor.mc_flags = 0;
} else {
MDB_page *fp = NODEDATA(node);
mx->mx_db.md_pad = mc->mc_pg[mc->mc_top]->mp_pad;
mx->mx_db.md_flags = 0;
mx->mx_db.md_depth = 1;
mx->mx_db.md_branch_pages = 0;
mx->mx_db.md_leaf_pages = 1;
mx->mx_db.md_overflow_pages = 0;
mx->mx_db.md_entries = NUMKEYS(fp);
mx->mx_db.md_root = fp->mp_pgno;
mx->mx_cursor.mc_snum = 1;
mx->mx_cursor.mc_flags = C_INITIALIZED;
mx->mx_cursor.mc_top = 0;
mx->mx_cursor.mc_pg[0] = fp;
mx->mx_cursor.mc_ki[0] = 0;
if (mc->mc_db->md_flags & MDB_DUPFIXED) {
mx->mx_db.md_flags = MDB_DUPFIXED;
mx->mx_db.md_pad = fp->mp_pad;
if (mc->mc_db->md_flags & MDB_INTEGERDUP)
mx->mx_db.md_flags |= MDB_INTEGERKEY;
}
}
DPRINTF("Sub-db %u for db %u root page %zu", mx->mx_cursor.mc_dbi, mc->mc_dbi, DPRINTF("Sub-db %u for db %u root page %zu", mx->mx_cursor.mc_dbi, mc->mc_dbi,
db->md_root); mx->mx_db.md_root);
if (F_ISSET(mc->mc_pg[mc->mc_top]->mp_flags, P_DIRTY)) if (F_ISSET(mc->mc_pg[mc->mc_top]->mp_flags, P_DIRTY))
mx->mx_dbx.md_dirty = 1; mx->mx_dbx.md_dirty = 1;
mx->mx_dbx.md_name.mv_data = NODEKEY(node); mx->mx_dbx.md_name.mv_data = NODEKEY(node);
mx->mx_dbx.md_name.mv_size = node->mn_ksize; mx->mx_dbx.md_name.mv_size = node->mn_ksize;
mx->mx_cursor.mc_snum = 0;
mx->mx_cursor.mc_flags = 0;
if (mx->mx_dbx.md_cmp == mdb_cmp_int && mx->mx_db.md_pad == sizeof(size_t)) if (mx->mx_dbx.md_cmp == mdb_cmp_int && mx->mx_db.md_pad == sizeof(size_t))
mx->mx_dbx.md_cmp = mdb_cmp_long; mx->mx_dbx.md_cmp = mdb_cmp_long;
} }
@ -4468,6 +4662,7 @@ mdb_rebalance(MDB_cursor *mc)
if (PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) >= FILL_THRESHOLD && NUMKEYS(mn.mc_pg[mn.mc_top]) >= 2) if (PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) >= FILL_THRESHOLD && NUMKEYS(mn.mc_pg[mn.mc_top]) >= 2)
return mdb_node_move(&mn, mc); return mdb_node_move(&mn, mc);
else { /* FIXME: if (has_enough_room()) */ else { /* FIXME: if (has_enough_room()) */
mc->mc_flags &= ~C_INITIALIZED;
if (mc->mc_ki[ptop] == 0) if (mc->mc_ki[ptop] == 0)
return mdb_page_merge(&mn, mc); return mdb_page_merge(&mn, mc);
else else

Loading…
Cancel
Save