Handle more errors. Invalidate txn if needed.

incre
Hallvard Furuseth 11 years ago
parent 937b5eff07
commit 68e97b2184
  1. 112
      libraries/liblmdb/mdb.c

@ -936,7 +936,7 @@ struct MDB_txn {
* @{
*/
#define MDB_TXN_RDONLY 0x01 /**< read-only transaction */
#define MDB_TXN_ERROR 0x02 /**< an error has occurred */
#define MDB_TXN_ERROR 0x02 /**< txn is unusable after an error */
#define MDB_TXN_DIRTY 0x04 /**< must write, even if dirty list is empty */
#define MDB_TXN_SPILLS 0x08 /**< txn or a parent has spilled pages */
/** @} */
@ -1397,6 +1397,7 @@ static void mdb_audit(MDB_txn *txn)
mdb_cursor_init(&mc, txn, FREE_DBI, NULL);
while ((rc = mdb_cursor_get(&mc, &key, &data, MDB_NEXT)) == 0)
freecount += *(MDB_ID *)data.mv_data;
mdb_tassert(txn, rc == MDB_NOTFOUND);
count = 0;
for (i = 0; i<txn->mt_numdbs; i++) {
@ -1410,8 +1411,8 @@ static void mdb_audit(MDB_txn *txn)
txn->mt_dbs[i].md_leaf_pages +
txn->mt_dbs[i].md_overflow_pages;
if (txn->mt_dbs[i].md_flags & MDB_DUPSORT) {
mdb_page_search(&mc, NULL, MDB_PS_FIRST);
do {
rc = mdb_page_search(&mc, NULL, MDB_PS_FIRST);
for (; rc == MDB_SUCCESS; rc = mdb_cursor_sibling(&mc, 1)) {
unsigned j;
MDB_page *mp;
mp = mc.mc_pg[mc.mc_top];
@ -1425,7 +1426,7 @@ static void mdb_audit(MDB_txn *txn)
}
}
}
while (mdb_cursor_sibling(&mc, 1) == 0);
mdb_tassert(txn, rc == MDB_NOTFOUND);
}
}
if (freecount + count + 2 /* metapages */ != txn->mt_next_pgno) {
@ -6228,8 +6229,10 @@ put_sub:
/* converted, write the original data first */
if (dkey.mv_size) {
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, xflags);
if (rc)
return rc;
if (rc) {
mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
return rc == MDB_KEYEXIST ? MDB_CORRUPTED : rc;
}
{
/* Adjust other cursors pointing to mp */
MDB_cursor *m2;
@ -6316,6 +6319,8 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
mc->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(leaf);
}
rc = mdb_cursor_del(&mc->mc_xcursor->mx_cursor, MDB_NOSPILL);
if (rc)
return rc;
/* If sub-DB still has entries, we're done */
if (mc->mc_xcursor->mx_db.md_entries) {
if (leaf->mn_flags & F_SUBDATA) {
@ -6346,10 +6351,9 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
if (leaf->mn_flags & F_SUBDATA) {
/* add all the child DB's pages to the free list */
rc = mdb_drop0(&mc->mc_xcursor->mx_cursor, 0);
if (rc == MDB_SUCCESS) {
mc->mc_db->md_entries -=
mc->mc_xcursor->mx_db.md_entries;
}
if (rc)
goto fail;
mc->mc_db->md_entries -= mc->mc_xcursor->mx_db.md_entries;
}
}
@ -6361,11 +6365,15 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
memcpy(&pg, NODEDATA(leaf), sizeof(pg));
if ((rc = mdb_page_get(mc->mc_txn, pg, &omp, NULL)) ||
(rc = mdb_ovpage_free(mc, omp)))
return rc;
goto fail;
}
del_key:
return mdb_cursor_del0(mc);
fail:
mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
return rc;
}
/** Allocate and initialize new pages for a database.
@ -7027,7 +7035,9 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst)
unsigned int snum = csrc->mc_snum;
MDB_node *s2;
/* must find the lowest key below src */
mdb_page_search_lowest(csrc);
rc = mdb_page_search_lowest(csrc);
if (rc)
return rc;
if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
key.mv_size = csrc->mc_db->md_pad;
key.mv_data = LEAF2KEY(csrc->mc_pg[csrc->mc_top], 0, key.mv_size);
@ -7050,7 +7060,9 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst)
MDB_node *s2;
MDB_val bkey;
/* must find the lowest key below dst */
mdb_page_search_lowest(cdst);
rc = mdb_page_search_lowest(cdst);
if (rc)
return rc;
if (IS_LEAF2(cdst->mc_pg[cdst->mc_top])) {
bkey.mv_size = cdst->mc_db->md_pad;
bkey.mv_data = LEAF2KEY(cdst->mc_pg[cdst->mc_top], 0, bkey.mv_size);
@ -7212,7 +7224,9 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst)
unsigned int snum = csrc->mc_snum;
MDB_node *s2;
/* must find the lowest key below src */
mdb_page_search_lowest(csrc);
rc = mdb_page_search_lowest(csrc);
if (rc)
return rc;
if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
key.mv_size = csrc->mc_db->md_pad;
key.mv_data = LEAF2KEY(csrc->mc_pg[csrc->mc_top], 0, key.mv_size);
@ -7486,9 +7500,8 @@ mdb_cursor_del0(MDB_cursor *mc)
mdb_node_del(mc, mc->mc_db->md_pad);
mc->mc_db->md_entries--;
rc = mdb_rebalance(mc);
if (rc != MDB_SUCCESS)
mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
else {
if (rc == MDB_SUCCESS) {
MDB_cursor *m2, *m3;
MDB_dbi dbi = mc->mc_dbi;
@ -7496,11 +7509,14 @@ mdb_cursor_del0(MDB_cursor *mc)
nkeys = NUMKEYS(mp);
/* if mc points past last node in page, find next sibling */
if (mc->mc_ki[mc->mc_top] >= nkeys)
mdb_cursor_sibling(mc, 1);
if (mc->mc_ki[mc->mc_top] >= nkeys) {
rc = mdb_cursor_sibling(mc, 1);
if (rc == MDB_NOTFOUND)
rc = MDB_SUCCESS;
}
/* Adjust other cursors pointing to mp */
for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
for (m2 = mc->mc_txn->mt_cursors[dbi]; !rc && m2; m2=m2->mc_next) {
m3 = (mc->mc_flags & C_SUB) ? &m2->mc_xcursor->mx_cursor : m2;
if (! (m2->mc_flags & m3->mc_flags & C_INITIALIZED))
continue;
@ -7512,13 +7528,18 @@ mdb_cursor_del0(MDB_cursor *mc)
if (m3->mc_ki[mc->mc_top] > ki)
m3->mc_ki[mc->mc_top]--;
}
if (m3->mc_ki[mc->mc_top] >= nkeys)
mdb_cursor_sibling(m3, 1);
if (m3->mc_ki[mc->mc_top] >= nkeys) {
rc = mdb_cursor_sibling(m3, 1);
if (rc == MDB_NOTFOUND)
rc = MDB_SUCCESS;
}
}
}
mc->mc_flags |= C_DEL;
}
if (rc)
mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
return rc;
}
@ -7626,7 +7647,7 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
if (mc->mc_snum < 2) {
if ((rc = mdb_page_new(mc, P_BRANCH, 1, &pp)))
return rc;
goto done;
/* shift current top to make room for new parent */
mc->mc_pg[1] = mc->mc_pg[0];
mc->mc_ki[1] = mc->mc_ki[0];
@ -7644,7 +7665,7 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
mc->mc_ki[0] = mc->mc_ki[1];
mc->mc_db->md_root = mp->mp_pgno;
mc->mc_db->md_depth--;
return rc;
goto done;
}
mc->mc_snum = 2;
mc->mc_top = 1;
@ -7673,7 +7694,6 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
int x;
unsigned int lsize, rsize, ksize;
/* Move half of the keys to the right sibling */
copy = NULL;
x = mc->mc_ki[mc->mc_top] - split_indx;
ksize = mc->mc_db->md_pad;
split = LEAF2KEY(mp, split_indx, ksize);
@ -7720,8 +7740,10 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
/* grab a page to hold a temporary copy */
copy = mdb_page_malloc(mc->mc_txn, 1);
if (copy == NULL)
return ENOMEM;
if (copy == NULL) {
rc = ENOMEM;
goto done;
}
copy->mp_pgno = mp->mp_pgno;
copy->mp_flags = mp->mp_flags;
copy->mp_lower = PAGEHDRSZ;
@ -7801,6 +7823,8 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
mn.mc_top--;
did_split = 1;
rc = mdb_page_split(&mn, &sepkey, NULL, rp->mp_pgno, 0);
if (rc)
goto done;
/* root split? */
if (mn.mc_snum == mc->mc_snum) {
@ -7837,14 +7861,14 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
}
mc->mc_flags ^= C_SPLITTING;
if (rc != MDB_SUCCESS) {
return rc;
goto done;
}
if (nflags & MDB_APPEND) {
mc->mc_pg[mc->mc_top] = rp;
mc->mc_ki[mc->mc_top] = 0;
rc = mdb_node_add(mc, 0, newkey, newdata, newpgno, nflags);
if (rc)
return rc;
goto done;
for (i=0; i<mc->mc_top; i++)
mc->mc_ki[i] = mn.mc_ki[i];
} else if (!IS_LEAF2(mp)) {
@ -7882,11 +7906,8 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
}
rc = mdb_node_add(mc, j, &rkey, rdata, pgno, flags);
if (rc) {
/* return tmp page to freelist */
mdb_page_free(env, copy);
return rc;
}
if (rc)
goto done;
if (i == nkeys) {
i = 0;
j = 0;
@ -7926,8 +7947,6 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
}
}
}
/* return tmp page to freelist */
mdb_page_free(env, copy);
}
{
@ -7978,6 +7997,12 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
}
}
DPRINTF(("mp left: %d, rp left: %d", SIZELEFT(mp), SIZELEFT(rp)));
done:
if (copy) /* tmp page */
mdb_page_free(env, copy);
if (rc)
mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
return rc;
}
@ -8317,22 +8342,22 @@ mdb_drop0(MDB_cursor *mc, int subs)
memcpy(&pg, NODEDATA(ni), sizeof(pg));
rc = mdb_page_get(txn, pg, &omp, NULL);
if (rc != 0)
return rc;
goto done;
mdb_cassert(mc, IS_OVERFLOW(omp));
rc = mdb_midl_append_range(&txn->mt_free_pgs,
pg, omp->mp_pages);
if (rc)
return rc;
goto done;
} else if (subs && (ni->mn_flags & F_SUBDATA)) {
mdb_xcursor_init1(mc, ni);
rc = mdb_drop0(&mc->mc_xcursor->mx_cursor, 0);
if (rc)
return rc;
goto done;
}
}
} else {
if ((rc = mdb_midl_need(&txn->mt_free_pgs, n)) != 0)
return rc;
goto done;
for (i=0; i<n; i++) {
pgno_t pg;
ni = NODEPTR(mp, i);
@ -8346,6 +8371,8 @@ mdb_drop0(MDB_cursor *mc, int subs)
mc->mc_ki[mc->mc_top] = i;
rc = mdb_cursor_sibling(mc, 1);
if (rc) {
if (rc != MDB_NOTFOUND)
goto done;
/* no more siblings, go back to beginning
* of previous level.
*/
@ -8359,6 +8386,9 @@ mdb_drop0(MDB_cursor *mc, int subs)
}
/* free it */
rc = mdb_midl_append(&txn->mt_free_pgs, mc->mc_db->md_root);
done:
if (rc)
txn->mt_flags |= MDB_TXN_ERROR;
} else if (rc == MDB_NOTFOUND) {
rc = MDB_SUCCESS;
}
@ -8393,6 +8423,8 @@ int mdb_drop(MDB_txn *txn, MDB_dbi dbi, int del)
if (!rc) {
txn->mt_dbflags[dbi] = DB_STALE;
mdb_dbi_close(txn->mt_env, dbi);
} else {
txn->mt_flags |= MDB_TXN_ERROR;
}
} else {
/* reset the DB record, mark it dirty */

Loading…
Cancel
Save