Track changes to all cursors.

For any change to a page or node, update all other cursors pointing
at the same page (or node). Cursors are now stored in a linked list
off their owning transaction. Cursors are all closed when the transaction
ends. Cursors in parent transactions are updated when their child
transaction commits.
vmware
Howard Chu 13 years ago
parent c4d5e6e786
commit b9d13a29e6
  1. 361
      libraries/libmdb/mdb.c
  2. 3
      libraries/libmdb/mdb.h

@ -731,6 +731,8 @@ struct MDB_txn {
#define DB_DIRTY 0x01 /**< DB was written in this txn */ #define DB_DIRTY 0x01 /**< DB was written in this txn */
#define DB_STALE 0x02 /**< DB record is older than txnID */ #define DB_STALE 0x02 /**< DB record is older than txnID */
/** @} */ /** @} */
/** Array of cursors for each DB */
MDB_cursor **mt_cursors;
/** Array of flags for each DB */ /** Array of flags for each DB */
unsigned char *mt_dbflags; unsigned char *mt_dbflags;
/** Number of DB records in use. This number only ever increments; /** Number of DB records in use. This number only ever increments;
@ -762,6 +764,10 @@ struct MDB_xcursor;
/** Cursors are used for all DB operations */ /** Cursors are used for all DB operations */
struct MDB_cursor { struct MDB_cursor {
/** Next cursor on this DB in this txn */
MDB_cursor *mc_next;
/** Original cursor if this is a shadow */
MDB_cursor *mc_orig;
/** Context used for databases with #MDB_DUPSORT, otherwise NULL */ /** Context used for databases with #MDB_DUPSORT, otherwise NULL */
struct MDB_xcursor *mc_xcursor; struct MDB_xcursor *mc_xcursor;
/** The transaction that owns this cursor */ /** The transaction that owns this cursor */
@ -783,6 +789,8 @@ struct MDB_cursor {
*/ */
#define C_INITIALIZED 0x01 /**< cursor has been initialized and is valid */ #define C_INITIALIZED 0x01 /**< cursor has been initialized and is valid */
#define C_EOF 0x02 /**< No more data */ #define C_EOF 0x02 /**< No more data */
#define C_SUB 0x04 /**< Cursor is a sub-cursor */
#define C_SHADOW 0x08 /**< Cursor is a dup from a parent txn */
/** @} */ /** @} */
unsigned int mc_flags; /**< @ref mdb_cursor */ unsigned int mc_flags; /**< @ref mdb_cursor */
MDB_page *mc_pg[CURSOR_STACK]; /**< stack of pushed pages */ MDB_page *mc_pg[CURSOR_STACK]; /**< stack of pushed pages */
@ -1160,6 +1168,26 @@ mdb_page_touch(MDB_cursor *mc)
mp->mp_flags |= P_DIRTY; mp->mp_flags |= P_DIRTY;
finish: finish:
/* Adjust other cursors pointing to mp */
if (mc->mc_flags & C_SUB) {
MDB_cursor *m2, *m3;
MDB_dbi dbi = mc->mc_dbi-1;
for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
m3 = &m2->mc_xcursor->mx_cursor;
if (m3->mc_pg[mc->mc_top] == mc->mc_pg[mc->mc_top]) {
m3->mc_pg[mc->mc_top] = mp;
}
}
} else {
MDB_cursor *m2;
for (m2 = mc->mc_txn->mt_cursors[mc->mc_dbi]; m2; m2=m2->mc_next) {
if (m2->mc_pg[mc->mc_top] == mc->mc_pg[mc->mc_top]) {
m2->mc_pg[mc->mc_top] = mp;
}
}
}
mc->mc_pg[mc->mc_top] = mp; mc->mc_pg[mc->mc_top] = mp;
/** If this page has a parent, update the parent to point to /** If this page has a parent, update the parent to point to
* this new page. * this new page.
@ -1208,6 +1236,92 @@ mdb_env_sync(MDB_env *env, int force)
return rc; return rc;
} }
/** Make shadow copies of all of parent txn's cursors */
static int
mdb_cursor_shadow(MDB_txn *src, MDB_txn *dst)
{
MDB_cursor *mc, *m2;
unsigned int i, j, size;
for (i=0;i<src->mt_numdbs; i++) {
if (src->mt_cursors[i]) {
size = sizeof(MDB_cursor);
if (src->mt_cursors[i]->mc_xcursor)
size += sizeof(MDB_xcursor);
for (m2 = src->mt_cursors[i]; m2; m2=m2->mc_next) {
mc = malloc(size);
if (!mc)
return ENOMEM;
mc->mc_orig = m2;
mc->mc_txn = dst;
mc->mc_dbi = i;
mc->mc_db = &dst->mt_dbs[i];
mc->mc_dbx = m2->mc_dbx;
mc->mc_dbflag = &dst->mt_dbflags[i];
mc->mc_snum = m2->mc_snum;
mc->mc_top = m2->mc_top;
mc->mc_flags = m2->mc_flags | C_SHADOW;
for (j=0; j<mc->mc_snum; j++) {
mc->mc_pg[j] = m2->mc_pg[j];
mc->mc_ki[j] = m2->mc_ki[j];
}
if (m2->mc_xcursor) {
MDB_xcursor *mx, *mx2;
mx = (MDB_xcursor *)(mc+1);
mc->mc_xcursor = mx;
mx2 = m2->mc_xcursor;
mx->mx_db = mx2->mx_db;
mx->mx_dbx = mx2->mx_dbx;
mx->mx_dbflag = mx2->mx_dbflag;
mx->mx_cursor.mc_txn = dst;
mx->mx_cursor.mc_dbi = mx2->mx_cursor.mc_dbi;
mx->mx_cursor.mc_db = &mx->mx_db;
mx->mx_cursor.mc_dbx = &mx->mx_dbx;
mx->mx_cursor.mc_dbflag = &mx->mx_dbflag;
mx->mx_cursor.mc_snum = mx2->mx_cursor.mc_snum;
mx->mx_cursor.mc_top = mx2->mx_cursor.mc_top;
mx->mx_cursor.mc_flags = mx2->mx_cursor.mc_flags | C_SHADOW;
for (j=0; j<mx2->mx_cursor.mc_snum; j++) {
mx->mx_cursor.mc_pg[j] = mx2->mx_cursor.mc_pg[j];
mx->mx_cursor.mc_ki[j] = mx2->mx_cursor.mc_ki[j];
}
} else {
mc->mc_xcursor = NULL;
}
mc->mc_next = dst->mt_cursors[i];
dst->mt_cursors[i] = mc;
}
}
}
return MDB_SUCCESS;
}
/** Merge shadow cursors back into parent's */
static void
mdb_cursor_merge(MDB_txn *txn)
{
MDB_dbi i;
for (i=0; i<txn->mt_numdbs; i++) {
if (txn->mt_cursors[i]) {
MDB_cursor *mc;
while ((mc = txn->mt_cursors[i])) {
txn->mt_cursors[i] = mc->mc_next;
if (mc->mc_flags & C_SHADOW) {
MDB_cursor *m2 = mc->mc_orig;
unsigned int j;
m2->mc_snum = mc->mc_snum;
m2->mc_top = mc->mc_top;
for (j=0; j<mc->mc_snum; j++) {
m2->mc_pg[j] = mc->mc_pg[j];
m2->mc_ki[j] = mc->mc_ki[j];
}
}
free(mc);
}
}
}
}
static void static void
mdb_txn_reset0(MDB_txn *txn); mdb_txn_reset0(MDB_txn *txn);
@ -1312,7 +1426,7 @@ int
mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret) mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
{ {
MDB_txn *txn; MDB_txn *txn;
int rc; int rc, size;
if (env->me_flags & MDB_FATAL_ERROR) { if (env->me_flags & MDB_FATAL_ERROR) {
DPUTS("environment had fatal error, must shutdown!"); DPUTS("environment had fatal error, must shutdown!");
@ -1324,16 +1438,22 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
return EINVAL; return EINVAL;
} }
} }
if ((txn = calloc(1, sizeof(MDB_txn) + size = sizeof(MDB_txn) + env->me_maxdbs * (sizeof(MDB_db)+1);
env->me_maxdbs * (sizeof(MDB_db)+1))) == NULL) { if (!(flags & MDB_RDONLY))
size += env->me_maxdbs * sizeof(MDB_cursor *);
if ((txn = calloc(1, size)) == NULL) {
DPRINTF("calloc: %s", strerror(ErrCode())); DPRINTF("calloc: %s", strerror(ErrCode()));
return ENOMEM; return ENOMEM;
} }
txn->mt_dbs = (MDB_db *)(txn+1); txn->mt_dbs = (MDB_db *)(txn+1);
if (flags & MDB_RDONLY) { if (flags & MDB_RDONLY) {
txn->mt_flags |= MDB_TXN_RDONLY; txn->mt_flags |= MDB_TXN_RDONLY;
txn->mt_dbflags = (unsigned char *)(txn->mt_dbs + env->me_maxdbs);
} else {
txn->mt_cursors = (MDB_cursor **)(txn->mt_dbs + env->me_maxdbs);
txn->mt_dbflags = (unsigned char *)(txn->mt_cursors + env->me_maxdbs);
} }
txn->mt_dbflags = (unsigned char *)(txn->mt_dbs + env->me_maxdbs);
txn->mt_env = env; txn->mt_env = env;
if (parent) { if (parent) {
@ -1359,6 +1479,7 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
txn->mt_dbxs = parent->mt_dbxs; txn->mt_dbxs = parent->mt_dbxs;
memcpy(txn->mt_dbs, parent->mt_dbs, txn->mt_numdbs * sizeof(MDB_db)); memcpy(txn->mt_dbs, parent->mt_dbs, txn->mt_numdbs * sizeof(MDB_db));
memcpy(txn->mt_dbflags, parent->mt_dbflags, txn->mt_numdbs); memcpy(txn->mt_dbflags, parent->mt_dbflags, txn->mt_numdbs);
mdb_cursor_shadow(parent, txn);
rc = 0; rc = 0;
} else { } else {
rc = mdb_txn_renew0(txn); rc = mdb_txn_renew0(txn);
@ -1390,6 +1511,17 @@ mdb_txn_reset0(MDB_txn *txn)
MDB_page *dp; MDB_page *dp;
unsigned int i; unsigned int i;
/* close(free) all cursors */
for (i=0; i<txn->mt_numdbs; i++) {
if (txn->mt_cursors[i]) {
MDB_cursor *mc;
while ((mc = txn->mt_cursors[i])) {
txn->mt_cursors[i] = mc->mc_next;
free(mc);
}
}
}
/* return all dirty pages to dpage list */ /* return all dirty pages to dpage list */
for (i=1; i<=txn->mt_u.dirty_list[0].mid; i++) { for (i=1; i<=txn->mt_u.dirty_list[0].mid; i++) {
dp = txn->mt_u.dirty_list[i].mptr; dp = txn->mt_u.dirty_list[i].mptr;
@ -1497,7 +1629,6 @@ mdb_txn_commit(MDB_txn *txn)
return MDB_SUCCESS; return MDB_SUCCESS;
} }
if (F_ISSET(txn->mt_flags, MDB_TXN_ERROR)) { if (F_ISSET(txn->mt_flags, MDB_TXN_ERROR)) {
DPUTS("error flag is set, can't commit"); DPUTS("error flag is set, can't commit");
if (txn->mt_parent) if (txn->mt_parent)
@ -1506,6 +1637,9 @@ mdb_txn_commit(MDB_txn *txn)
return EINVAL; return EINVAL;
} }
/* Merge (and close) our cursors with parent's */
mdb_cursor_merge(txn);
if (txn->mt_parent) { if (txn->mt_parent) {
MDB_db *ip, *jp; MDB_db *ip, *jp;
MDB_dbi i; MDB_dbi i;
@ -1532,21 +1666,11 @@ mdb_txn_commit(MDB_txn *txn)
src = txn->mt_u.dirty_list; src = txn->mt_u.dirty_list;
x = mdb_mid2l_search(dst, src[1].mid); x = mdb_mid2l_search(dst, src[1].mid);
for (y=1; y<=src[0].mid; y++) { for (y=1; y<=src[0].mid; y++) {
MDB_page *mp;
while (x <= dst[0].mid && dst[x].mid != src[y].mid) x++; while (x <= dst[0].mid && dst[x].mid != src[y].mid) x++;
if (x > dst[0].mid) if (x > dst[0].mid)
break; break;
mp = dst[x].mptr; free(dst[x].mptr);
dp = src[y].mptr; dst[x].mptr = src[y].mptr;
mp->mp_lower = dp->mp_lower;
mp->mp_upper = dp->mp_upper;
if (IS_LEAF2(mp)) {
memcpy(METADATA(mp), METADATA(dp), NUMKEYS(mp) * mp->mp_pad);
} else {
memcpy((char *)mp+mp->mp_upper, (char *)dp+dp->mp_upper,
txn->mt_env->me_psize - mp->mp_upper);
}
free(src[y].mptr);
} }
x = dst[0].mid; x = dst[0].mid;
for (; y<=src[0].mid; y++) { for (; y<=src[0].mid; y++) {
@ -2838,6 +2962,20 @@ mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp)
return node; return node;
} }
#if 0
static void
mdb_cursor_adjust(MDB_cursor *mc, func)
{
MDB_cursor *m2;
for (m2 = mc->mc_txn->mt_cursors[mc->mc_dbi]; m2; m2=m2->mc_next) {
if (m2->mc_pg[m2->mc_top] == mc->mc_pg[mc->mc_top]) {
func(mc, m2);
}
}
}
#endif
/** Pop a page off the top of the cursor's stack. */ /** Pop a page off the top of the cursor's stack. */
static void static void
mdb_cursor_pop(MDB_cursor *mc) mdb_cursor_pop(MDB_cursor *mc)
@ -3790,12 +3928,13 @@ top:
if (flags == MDB_CURRENT) if (flags == MDB_CURRENT)
goto current; goto current;
/* create a fake page for the dup items */
dkey.mv_size = NODEDSZ(leaf); dkey.mv_size = NODEDSZ(leaf);
dkey.mv_data = NODEDATA(leaf); dkey.mv_data = NODEDATA(leaf);
/* data matches, ignore it */ /* data matches, ignore it */
if (!mc->mc_dbx->md_dcmp(data, &dkey)) if (!mc->mc_dbx->md_dcmp(data, &dkey))
return (flags == MDB_NODUPDATA) ? MDB_KEYEXIST : MDB_SUCCESS; return (flags == MDB_NODUPDATA) ? MDB_KEYEXIST : MDB_SUCCESS;
/* create a fake page for the dup items */
memcpy(dbuf, dkey.mv_data, dkey.mv_size); memcpy(dbuf, dkey.mv_data, dkey.mv_size);
dkey.mv_data = dbuf; dkey.mv_data = dbuf;
fp = (MDB_page *)pbuf; fp = (MDB_page *)pbuf;
@ -3913,6 +4052,26 @@ new_sub:
} else { } else {
/* There is room already in this leaf page. */ /* There is room already in this leaf page. */
rc = mdb_node_add(mc, mc->mc_ki[mc->mc_top], key, rdata, 0, 0); rc = mdb_node_add(mc, mc->mc_ki[mc->mc_top], key, rdata, 0, 0);
if (rc == 0 && !do_sub) {
/* Adjust other cursors pointing to mp */
MDB_cursor *m2, *m3;
MDB_dbi dbi = mc->mc_dbi;
unsigned i = mc->mc_top;
MDB_page *mp = mc->mc_pg[i];
if (mc->mc_flags & C_SUB)
dbi--;
for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
if (mc->mc_flags & C_SUB)
m3 = &m2->mc_xcursor->mx_cursor;
else
m3 = m2;
if (m3->mc_pg[i] == mp && m3->mc_ki[i] >= mc->mc_ki[i]) {
m3->mc_ki[i]++;
}
}
}
} }
if (rc != MDB_SUCCESS) if (rc != MDB_SUCCESS)
@ -3946,6 +4105,18 @@ put_sub:
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, xflags); rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, xflags);
if (rc) if (rc)
return rc; return rc;
{
/* Adjust other cursors pointing to mp */
MDB_cursor *m2;
unsigned i = mc->mc_top;
MDB_page *mp = mc->mc_pg[i];
for (m2 = mc->mc_txn->mt_cursors[mc->mc_dbi]; m2; m2=m2->mc_next) {
if (m2->mc_pg[i] == mp && m2->mc_ki[i] == mc->mc_ki[i]) {
mdb_xcursor_init1(m2, leaf);
}
}
}
} }
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags); rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags);
if (flags & F_SUBDATA) { if (flags & F_SUBDATA) {
@ -4373,7 +4544,7 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node)
MDB_db *db = NODEDATA(node); MDB_db *db = NODEDATA(node);
mx->mx_db = *db; mx->mx_db = *db;
mx->mx_cursor.mc_snum = 0; mx->mx_cursor.mc_snum = 0;
mx->mx_cursor.mc_flags = 0; mx->mx_cursor.mc_flags = C_SUB;
} else { } else {
MDB_page *fp = NODEDATA(node); MDB_page *fp = NODEDATA(node);
mx->mx_db.md_pad = mc->mc_pg[mc->mc_top]->mp_pad; mx->mx_db.md_pad = mc->mc_pg[mc->mc_top]->mp_pad;
@ -4385,7 +4556,7 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node)
mx->mx_db.md_entries = NUMKEYS(fp); mx->mx_db.md_entries = NUMKEYS(fp);
mx->mx_db.md_root = fp->mp_pgno; mx->mx_db.md_root = fp->mp_pgno;
mx->mx_cursor.mc_snum = 1; mx->mx_cursor.mc_snum = 1;
mx->mx_cursor.mc_flags = C_INITIALIZED; mx->mx_cursor.mc_flags = C_INITIALIZED|C_SUB;
mx->mx_cursor.mc_top = 0; mx->mx_cursor.mc_top = 0;
mx->mx_cursor.mc_pg[0] = fp; mx->mx_cursor.mc_pg[0] = fp;
mx->mx_cursor.mc_ki[0] = 0; mx->mx_cursor.mc_ki[0] = 0;
@ -4410,6 +4581,7 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node)
static void static void
mdb_cursor_init(MDB_cursor *mc, MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx) mdb_cursor_init(MDB_cursor *mc, MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
{ {
mc->mc_orig = NULL;
mc->mc_dbi = dbi; mc->mc_dbi = dbi;
mc->mc_txn = txn; mc->mc_txn = txn;
mc->mc_db = &txn->mt_dbs[dbi]; mc->mc_db = &txn->mt_dbs[dbi];
@ -4444,6 +4616,10 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
mx = (MDB_xcursor *)(mc + 1); mx = (MDB_xcursor *)(mc + 1);
} }
mdb_cursor_init(mc, txn, dbi, mx); mdb_cursor_init(mc, txn, dbi, mx);
if (txn->mt_cursors) {
mc->mc_next = txn->mt_cursors[dbi];
txn->mt_cursors[dbi] = mc;
}
} else { } else {
return ENOMEM; return ENOMEM;
} }
@ -4481,6 +4657,13 @@ void
mdb_cursor_close(MDB_cursor *mc) mdb_cursor_close(MDB_cursor *mc)
{ {
if (mc != NULL) { if (mc != NULL) {
/* remove from txn, if tracked */
if (mc->mc_txn->mt_cursors) {
MDB_cursor **prev = &mc->mc_txn->mt_cursors[mc->mc_dbi];
while (*prev && *prev != mc) prev = &(*prev)->mc_next;
if (*prev == mc)
*prev = mc->mc_next;
}
free(mc); free(mc);
} }
} }
@ -4594,6 +4777,28 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst)
*/ */
mdb_node_del(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top], key.mv_size); mdb_node_del(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top], key.mv_size);
{
/* Adjust other cursors pointing to mp */
MDB_cursor *m2, *m3;
MDB_dbi dbi = csrc->mc_dbi;
MDB_page *mp = csrc->mc_pg[csrc->mc_top];
if (csrc->mc_flags & C_SUB)
dbi--;
for (m2 = csrc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
if (csrc->mc_flags & C_SUB)
m3 = &m2->mc_xcursor->mx_cursor;
else
m3 = m2;
if (m3->mc_pg[csrc->mc_top] == mp && m3->mc_ki[csrc->mc_top] ==
csrc->mc_ki[csrc->mc_top]) {
m3->mc_pg[csrc->mc_top] = cdst->mc_pg[cdst->mc_top];
m3->mc_ki[csrc->mc_top] = cdst->mc_ki[cdst->mc_top];
}
}
}
/* Update the parent separators. /* Update the parent separators.
*/ */
if (csrc->mc_ki[csrc->mc_top] == 0) { if (csrc->mc_ki[csrc->mc_top] == 0) {
@ -4657,6 +4862,7 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst)
indx_t i, j; indx_t i, j;
MDB_node *srcnode; MDB_node *srcnode;
MDB_val key, data; MDB_val key, data;
unsigned nkeys;
DPRINTF("merging page %zu into %zu", csrc->mc_pg[csrc->mc_top]->mp_pgno, DPRINTF("merging page %zu into %zu", csrc->mc_pg[csrc->mc_top]->mp_pgno,
cdst->mc_pg[cdst->mc_top]->mp_pgno); cdst->mc_pg[cdst->mc_top]->mp_pgno);
@ -4670,7 +4876,7 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst)
/* Move all nodes from src to dst. /* Move all nodes from src to dst.
*/ */
j = NUMKEYS(cdst->mc_pg[cdst->mc_top]); j = nkeys = NUMKEYS(cdst->mc_pg[cdst->mc_top]);
if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) { if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
key.mv_size = csrc->mc_db->md_pad; key.mv_size = csrc->mc_db->md_pad;
key.mv_data = METADATA(csrc->mc_pg[csrc->mc_top]); key.mv_data = METADATA(csrc->mc_pg[csrc->mc_top]);
@ -4711,6 +4917,26 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst)
csrc->mc_db->md_leaf_pages--; csrc->mc_db->md_leaf_pages--;
else else
csrc->mc_db->md_branch_pages--; csrc->mc_db->md_branch_pages--;
{
/* Adjust other cursors pointing to mp */
MDB_cursor *m2, *m3;
MDB_dbi dbi = csrc->mc_dbi;
MDB_page *mp = cdst->mc_pg[cdst->mc_top];
if (csrc->mc_flags & C_SUB)
dbi--;
for (m2 = csrc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
if (csrc->mc_flags & C_SUB)
m3 = &m2->mc_xcursor->mx_cursor;
else
m3 = m2;
if (m3->mc_pg[csrc->mc_top] == csrc->mc_pg[csrc->mc_top]) {
m3->mc_pg[csrc->mc_top] = mp;
m3->mc_ki[csrc->mc_top] += nkeys;
}
}
}
mdb_cursor_pop(csrc); mdb_cursor_pop(csrc);
return mdb_rebalance(csrc); return mdb_rebalance(csrc);
@ -4763,22 +4989,61 @@ mdb_rebalance(MDB_cursor *mc)
} }
if (mc->mc_snum < 2) { if (mc->mc_snum < 2) {
if (NUMKEYS(mc->mc_pg[mc->mc_top]) == 0) { MDB_page *mp = mc->mc_pg[0];
if (NUMKEYS(mp) == 0) {
DPUTS("tree is completely empty"); DPUTS("tree is completely empty");
mc->mc_db->md_root = P_INVALID; mc->mc_db->md_root = P_INVALID;
mc->mc_db->md_depth = 0; mc->mc_db->md_depth = 0;
mc->mc_db->md_leaf_pages = 0; mc->mc_db->md_leaf_pages = 0;
mdb_midl_append(&mc->mc_txn->mt_free_pgs, mc->mc_pg[mc->mc_top]->mp_pgno); mdb_midl_append(&mc->mc_txn->mt_free_pgs, mp->mp_pgno);
mc->mc_snum = 0; mc->mc_snum = 0;
} else if (IS_BRANCH(mc->mc_pg[mc->mc_top]) && NUMKEYS(mc->mc_pg[mc->mc_top]) == 1) { mc->mc_top = 0;
{
/* Adjust other cursors pointing to mp */
MDB_cursor *m2, *m3;
MDB_dbi dbi = mc->mc_dbi;
if (mc->mc_flags & C_SUB)
dbi--;
for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
if (mc->mc_flags & C_SUB)
m3 = &m2->mc_xcursor->mx_cursor;
else
m3 = m2;
if (m3->mc_pg[0] == mp) {
m3->mc_snum = 0;
m3->mc_top = 0;
}
}
}
} else if (IS_BRANCH(mp) && NUMKEYS(mp) == 1) {
DPUTS("collapsing root page!"); DPUTS("collapsing root page!");
mdb_midl_append(&mc->mc_txn->mt_free_pgs, mc->mc_pg[mc->mc_top]->mp_pgno); mdb_midl_append(&mc->mc_txn->mt_free_pgs, mp->mp_pgno);
mc->mc_db->md_root = NODEPGNO(NODEPTR(mc->mc_pg[mc->mc_top], 0)); mc->mc_db->md_root = NODEPGNO(NODEPTR(mp, 0));
if ((rc = mdb_page_get(mc->mc_txn, mc->mc_db->md_root, if ((rc = mdb_page_get(mc->mc_txn, mc->mc_db->md_root,
&mc->mc_pg[mc->mc_top]))) &mc->mc_pg[0])))
return rc; return rc;
mc->mc_db->md_depth--; mc->mc_db->md_depth--;
mc->mc_db->md_branch_pages--; mc->mc_db->md_branch_pages--;
{
/* Adjust other cursors pointing to mp */
MDB_cursor *m2, *m3;
MDB_dbi dbi = mc->mc_dbi;
if (mc->mc_flags & C_SUB)
dbi--;
for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
if (mc->mc_flags & C_SUB)
m3 = &m2->mc_xcursor->mx_cursor;
else
m3 = m2;
if (m3->mc_pg[0] == mp) {
m3->mc_pg[0] = mc->mc_pg[0];
}
}
}
} else } else
DPUTS("root page doesn't need rebalancing"); DPUTS("root page doesn't need rebalancing");
return MDB_SUCCESS; return MDB_SUCCESS;
@ -4925,7 +5190,7 @@ static int
mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno) mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno)
{ {
uint8_t flags; uint8_t flags;
int rc = MDB_SUCCESS, ins_new = 0; int rc = MDB_SUCCESS, ins_new = 0, new_root = 0;
indx_t newindx; indx_t newindx;
pgno_t pgno = 0; pgno_t pgno = 0;
unsigned int i, j, split_indx, nkeys, pmax; unsigned int i, j, split_indx, nkeys, pmax;
@ -4955,6 +5220,7 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
mc->mc_db->md_root = pp->mp_pgno; mc->mc_db->md_root = pp->mp_pgno;
DPRINTF("root split! new root = %zu", pp->mp_pgno); DPRINTF("root split! new root = %zu", pp->mp_pgno);
mc->mc_db->md_depth++; mc->mc_db->md_depth++;
new_root = 1;
/* Add left (implicit) pointer. */ /* Add left (implicit) pointer. */
if ((rc = mdb_node_add(mc, 0, NULL, NULL, mp->mp_pgno, 0)) != MDB_SUCCESS) { if ((rc = mdb_node_add(mc, 0, NULL, NULL, mp->mp_pgno, 0)) != MDB_SUCCESS) {
@ -5102,12 +5368,12 @@ newsep:
rc = mdb_node_add(&mn, mn.mc_ki[ptop], &sepkey, NULL, rp->mp_pgno, 0); rc = mdb_node_add(&mn, mn.mc_ki[ptop], &sepkey, NULL, rp->mp_pgno, 0);
mn.mc_top++; mn.mc_top++;
} }
if (IS_LEAF2(rp)) {
return rc;
}
if (rc != MDB_SUCCESS) { if (rc != MDB_SUCCESS) {
return rc; return rc;
} }
if (IS_LEAF2(rp)) {
goto done;
}
/* Move half of the keys to the right sibling. */ /* Move half of the keys to the right sibling. */
@ -5183,6 +5449,39 @@ newsep:
/* return tmp page to freelist */ /* return tmp page to freelist */
copy->mp_next = mc->mc_txn->mt_env->me_dpages; copy->mp_next = mc->mc_txn->mt_env->me_dpages;
mc->mc_txn->mt_env->me_dpages = copy; mc->mc_txn->mt_env->me_dpages = copy;
done:
{
/* Adjust other cursors pointing to mp */
MDB_cursor *m2, *m3;
MDB_dbi dbi = mc->mc_dbi;
if (mc->mc_flags & C_SUB)
dbi--;
for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
if (mc->mc_flags & C_SUB)
m3 = &m2->mc_xcursor->mx_cursor;
else
m3 = m2;
if (new_root) {
/* root split */
for (i=m3->mc_top; i>0; i--) {
m3->mc_ki[i+1] = m3->mc_ki[i];
m3->mc_pg[i+1] = m3->mc_pg[i];
}
m3->mc_ki[0] = mc->mc_ki[0];
m3->mc_pg[0] = mc->mc_pg[0];
m3->mc_snum++;
m3->mc_top++;
}
if (m3->mc_pg[mc->mc_top] == mp) {
if (m3->mc_ki[m3->mc_top] >= split_indx) {
m3->mc_pg[m3->mc_top] = rp;
m3->mc_ki[m3->mc_top] -= split_indx;
}
}
}
}
return rc; return rc;
} }

@ -843,9 +843,6 @@ void mdb_cursor_close(MDB_cursor *cursor);
* case of the #MDB_SET option, in which the \b key object is unchanged), and * case of the #MDB_SET option, in which the \b key object is unchanged), and
* the address and length of the data are returned in the object to which \b data * the address and length of the data are returned in the object to which \b data
* refers. * refers.
* @bug Cursors are not coordinated with write operations. If a cursor in a
* write transaction is performing a sequential scan while records are being
* inserted or deleted in the same transaction, the cursor will be corrupted.
* @param[in] cursor A cursor handle returned by #mdb_cursor_open() * @param[in] cursor A cursor handle returned by #mdb_cursor_open()
* @param[in,out] key The key for a retrieved item * @param[in,out] key The key for a retrieved item
* @param[in,out] data The data of a retrieved item * @param[in,out] data The data of a retrieved item

Loading…
Cancel
Save