Raise safe max MDB_MAXKEYSIZE.

Use a sub-DB for DUPSORT item #1/#2 per key if needed: Not a sub-
page too big for a node, nor an overflow page (which not all DUPSORT
code checks for). Move "insert" code, to avoid non-loop goto upwards.
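
To make the "too big for a node" decision concrete, here is a minimal standalone sketch of the size check, assuming a 4096-byte page and a per-node ceiling of roughly half a page; PAGE_SIZE, NODE_MAX and leaf_size() are illustrative stand-ins for LMDB's env->me_psize, env->me_nodemax and the LEAFSIZE() macro, not the library's API:

    #include <stdio.h>
    #include <stddef.h>

    /* Illustrative stand-ins; the real values come from the LMDB
     * environment (env->me_psize, env->me_nodemax) and LEAFSIZE() in mdb.c. */
    #define PAGE_SIZE 4096u
    #define NODE_MAX  (PAGE_SIZE / 2)

    static size_t leaf_size(size_t key_size, size_t data_size)
    {
        return 8 /* node header */ + key_size + data_size;
    }

    int main(void)
    {
        size_t key_size = 200, data_size = 2000;

        if (leaf_size(key_size, data_size) > NODE_MAX)
            printf("too big for a node: store the duplicates in a sub-DB\n");
        else
            printf("fits in a node: keep the duplicates in an inline sub-page\n");
        return 0;
    }

Overflow pages are deliberately not used for this case, since (as the message notes) not all DUPSORT code paths check for them.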

(This is the commit which needs the change to xdata.mv_size in
commit 9d6e4a9163 "page sizes".)
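
As a worked example of the new ceiling in the doc comment below, assuming a 4096-byte page and LMDB's MDB_MINKEYS of 2: the limit becomes 4096 / 2 - 66 = 1982 bytes on 64-bit builds (4096 / 2 - 44 = 2004 on 32-bit), versus "just under" 4096 / 2 / 3 = 682 bytes with the old wording.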
Hallvard Furuseth 11 years ago
parent 734bc7e6ad
commit d69d2ce230
1 changed file: libraries/liblmdb/mdb.c (60 lines changed)

@@ -385,8 +385,7 @@ static txnid_t mdb_debug_start;
 /** @brief The maximum size of a key we can write to a database.
  *
  *  We require that keys all fit onto a regular page. This limit
- *  could be raised a bit further if needed; to something just
- *  under (page size / #MDB_MINKEYS / 3).
+ *  can be raised to page size / #MDB_MINKEYS - (64-bit ? 66 : 44).
  *
  *  Note that data items in an #MDB_DUPSORT database are actually keys
  *  of a subDB, so they're also limited to this size.
@@ -5666,9 +5665,11 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
     enum { MDB_NO_ROOT = MDB_LAST_ERRCODE+10 }; /* internal code */
     MDB_env *env = mc->mc_txn->mt_env;
     MDB_node *leaf = NULL;
-    MDB_val xdata, *rdata, dkey;
+    MDB_page *fp, *mp;
+    uint16_t fp_flags;
+    MDB_val xdata, *rdata, dkey, olddata;
     MDB_db dummy;
-    int do_sub = 0, insert = 0;
+    int do_sub = 0, insert;
     unsigned int mcount = 0, dcount = 0, nospill;
     size_t nsize;
     int rc, rc2;
@@ -5782,11 +5783,21 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
             return rc2;
     }
-    /* The key already exists */
-    if (rc == MDB_SUCCESS) {
-        MDB_page *fp, *mp;
-        MDB_val olddata;
+    insert = rc;
+    if (insert) {
+        /* The key does not exist */
+        DPRINTF(("inserting key at index %i", mc->mc_ki[mc->mc_top]));
+        if ((mc->mc_db->md_flags & MDB_DUPSORT) &&
+            LEAFSIZE(key, data) > env->me_nodemax)
+        {
+            /* Too big for a node, insert in sub-DB */
+            fp_flags = P_LEAF|P_DIRTY;
+            fp = env->me_pbuf;
+            fp->mp_pad = data->mv_size; /* used if MDB_DUPFIXED */
+            fp->mp_lower = fp->mp_upper = olddata.mv_size = PAGEHDRSZ;
+            goto prep_subDB;
+        }
+    } else {
         /* there's only a key anyway, so this is a no-op */
         if (IS_LEAF2(mc->mc_pg[mc->mc_top])) {
             unsigned int ksize = mc->mc_db->md_pad;
@@ -5806,6 +5817,12 @@ more:
     /* DB has dups? */
     if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT)) {
+        /* Prepare (sub-)page/sub-DB to accept the new item,
+         * if needed. fp: old sub-page or a header faking
+         * it. mp: new (sub-)page. offset: growth in page
+         * size. xdata: node data with new page or DB.
+         */
+        unsigned i, offset = 0;
         mp = fp = xdata.mv_data = env->me_pbuf;
         mp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno;
@@ -5851,16 +5868,13 @@ more:
                 (dkey.mv_size & 1) + (data->mv_size & 1);
             }
             fp->mp_upper = xdata.mv_size;
+            olddata.mv_size = fp->mp_upper; /* pretend olddata is fp */
         } else if (leaf->mn_flags & F_SUBDATA) {
             /* Data is on sub-DB, just store it */
             flags |= F_DUPDATA|F_SUBDATA;
             goto put_sub;
         } else {
-            /* See if we need to convert from fake page to subDB */
-            unsigned int offset;
-            unsigned int i;
-            uint16_t fp_flags;
+            /* Data is on sub-page */
             fp = olddata.mv_data;
             switch (flags) {
             default:
@@ -5882,11 +5896,16 @@ more:
                 flags |= F_DUPDATA;
                 goto put_sub;
             }
             xdata.mv_size = olddata.mv_size + offset;
-            if (NODESIZE+NODEKSZ(leaf)+xdata.mv_size > env->me_nodemax) {
-                /* yes, convert it */
-                fp_flags = fp->mp_flags;
+        }
+        fp_flags = fp->mp_flags;
+        if (NODESIZE + NODEKSZ(leaf) + xdata.mv_size > env->me_nodemax) {
+            /* Too big for a sub-page, convert to sub-DB */
+            fp_flags &= ~P_SUBP;
+prep_subDB:
             if (mc->mc_db->md_flags & MDB_DUPFIXED) {
+                fp_flags |= P_LEAF2;
                 dummy.md_pad = fp->mp_pad;
                 dummy.md_flags = MDB_DUPFIXED;
                 if (mc->mc_db->md_flags & MDB_INTEGERDUP)
@@ -5907,13 +5926,13 @@ more:
                 offset = env->me_psize - olddata.mv_size;
                 flags |= F_DUPDATA|F_SUBDATA;
                 dummy.md_root = mp->mp_pgno;
-                fp_flags &= ~P_SUBP;
             }
+            if (mp != fp) {
                 mp->mp_flags = fp_flags | P_DIRTY;
                 mp->mp_pad = fp->mp_pad;
                 mp->mp_lower = fp->mp_lower;
                 mp->mp_upper = fp->mp_upper + offset;
-                if (IS_LEAF2(fp)) {
+                if (fp_flags & P_LEAF2) {
                     memcpy(METADATA(mp), METADATA(fp), NUMKEYS(fp) * fp->mp_pad);
                 } else {
                     memcpy((char *)mp + mp->mp_upper, (char *)fp + fp->mp_upper,
@@ -6004,9 +6023,6 @@ current:
         }
         mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
         mc->mc_db->md_entries--;
-    } else {
-        DPRINTF(("inserting key at index %i", mc->mc_ki[mc->mc_top]));
-        insert = 1;
     }
     rdata = data;
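
Below is a hedged usage sketch of the user-visible effect, assuming a build where MDB_MAXKEYSIZE has been raised (the point of this commit), a 4096-byte page, and an existing ./testdb directory; error handling is omitted and the buffer sizes are chosen only so that key plus data exceed a single node:

    #include <string.h>
    #include "lmdb.h"

    int main(void)
    {
        MDB_env *env;
        MDB_txn *txn;
        MDB_dbi dbi;
        MDB_val key, data;
        char kbuf[500], dbuf[1900];  /* sizes assume a raised MDB_MAXKEYSIZE */

        memset(kbuf, 'k', sizeof(kbuf));
        memset(dbuf, 'd', sizeof(dbuf));

        mdb_env_create(&env);
        mdb_env_open(env, "./testdb", 0, 0664);   /* directory must already exist */
        mdb_txn_begin(env, NULL, 0, &txn);
        mdb_dbi_open(txn, NULL, MDB_DUPSORT, &dbi);

        key.mv_size  = sizeof(kbuf);  key.mv_data  = kbuf;
        data.mv_size = sizeof(dbuf);  data.mv_data = dbuf;

        /* Key + data exceed the per-node ceiling, so with this commit the
         * duplicates for this key are kept in a sub-DB, not a sub-page. */
        mdb_put(txn, dbi, &key, &data, 0);
        mdb_txn_commit(txn);
        mdb_env_close(env);
        return 0;
    }

Before this change, a pair this large in a DUPSORT database would have needed a sub-page too big for a node or an overflow page; with it, the duplicates for the key land in a sub-DB.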
