Add MDB_RESERVE mode

When putting a record, only reserve space for the data instead of copying
it; a pointer to the reserved space is returned to the caller.
(Not compatible with MDB_DUPSORT, since the actual data is needed
to determine the insert location.)
vmware
Howard Chu 13 years ago
parent fe11433619
commit 6e05ca17e9
  1. 51
      libraries/libmdb/mdb.c
  2. 4
      libraries/libmdb/mdb.h

@ -598,6 +598,10 @@ typedef struct MDB_node {
#define F_BIGDATA 0x01 /**< data put on overflow page */ #define F_BIGDATA 0x01 /**< data put on overflow page */
#define F_SUBDATA 0x02 /**< data is a sub-database */ #define F_SUBDATA 0x02 /**< data is a sub-database */
#define F_DUPDATA 0x04 /**< data has duplicates */ #define F_DUPDATA 0x04 /**< data has duplicates */
/** valid flags for #mdb_node_add() */
#define NODE_ADD_FLAGS (F_DUPDATA|F_SUBDATA|MDB_RESERVE)
/** @} */ /** @} */
unsigned short mn_flags; /**< @ref mdb_node */ unsigned short mn_flags; /**< @ref mdb_node */
unsigned short mn_ksize; /**< key size */ unsigned short mn_ksize; /**< key size */
@ -886,7 +890,7 @@ static int mdb_page_search(MDB_cursor *mc,
MDB_val *key, int modify); MDB_val *key, int modify);
static int mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst); static int mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst);
static int mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, static int mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata,
pgno_t newpgno); pgno_t newpgno, unsigned int nflags);
static int mdb_env_read_header(MDB_env *env, MDB_meta *meta); static int mdb_env_read_header(MDB_env *env, MDB_meta *meta);
static int mdb_env_read_meta(MDB_env *env, int *which); static int mdb_env_read_meta(MDB_env *env, int *which);
@ -894,7 +898,7 @@ static int mdb_env_write_meta(MDB_txn *txn);
static MDB_node *mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp); static MDB_node *mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp);
static int mdb_node_add(MDB_cursor *mc, indx_t indx, static int mdb_node_add(MDB_cursor *mc, indx_t indx,
MDB_val *key, MDB_val *data, pgno_t pgno, uint8_t flags); MDB_val *key, MDB_val *data, pgno_t pgno, unsigned int flags);
static void mdb_node_del(MDB_page *mp, indx_t indx, int ksize); static void mdb_node_del(MDB_page *mp, indx_t indx, int ksize);
static void mdb_node_shrink(MDB_page *mp, indx_t indx); static void mdb_node_shrink(MDB_page *mp, indx_t indx);
static int mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst); static int mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst);
@ -3875,6 +3879,7 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
int rc, rc2; int rc, rc2;
char pbuf[PAGESIZE]; char pbuf[PAGESIZE];
char dbuf[MAXKEYSIZE+1]; char dbuf[MAXKEYSIZE+1];
unsigned int nflags;
DKBUF; DKBUF;
if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_RDONLY)) if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_RDONLY))
@ -4068,12 +4073,13 @@ current:
rdata = data; rdata = data;
new_sub: new_sub:
nflags = flags & NODE_ADD_FLAGS;
nsize = IS_LEAF2(mc->mc_pg[mc->mc_top]) ? key->mv_size : mdb_leaf_size(mc->mc_txn->mt_env, key, rdata); nsize = IS_LEAF2(mc->mc_pg[mc->mc_top]) ? key->mv_size : mdb_leaf_size(mc->mc_txn->mt_env, key, rdata);
if (SIZELEFT(mc->mc_pg[mc->mc_top]) < nsize) { if (SIZELEFT(mc->mc_pg[mc->mc_top]) < nsize) {
rc = mdb_page_split(mc, key, rdata, P_INVALID); rc = mdb_page_split(mc, key, rdata, P_INVALID, nflags);
} else { } else {
/* There is room already in this leaf page. */ /* There is room already in this leaf page. */
rc = mdb_node_add(mc, mc->mc_ki[mc->mc_top], key, rdata, 0, 0); rc = mdb_node_add(mc, mc->mc_ki[mc->mc_top], key, rdata, 0, nflags);
if (rc == 0 && !do_sub) { if (rc == 0 && !do_sub) {
/* Adjust other cursors pointing to mp */ /* Adjust other cursors pointing to mp */
MDB_cursor *m2, *m3; MDB_cursor *m2, *m3;
@ -4100,12 +4106,6 @@ new_sub:
if (rc != MDB_SUCCESS) if (rc != MDB_SUCCESS)
mc->mc_txn->mt_flags |= MDB_TXN_ERROR; mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
else { else {
/* Remember if we just added a subdatabase */
if (flags & (F_SUBDATA|F_DUPDATA)) {
leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
leaf->mn_flags |= (flags & (F_SUBDATA|F_DUPDATA));
}
/* Now store the actual data in the child DB. Note that we're /* Now store the actual data in the child DB. Note that we're
* storing the user data in the keys field, so there are strict * storing the user data in the keys field, so there are strict
* size limits on dupdata. The actual data fields of the child * size limits on dupdata. The actual data fields of the child
@ -4117,6 +4117,7 @@ new_sub:
put_sub: put_sub:
xdata.mv_size = 0; xdata.mv_size = 0;
xdata.mv_data = ""; xdata.mv_data = "";
leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
if (flags & MDB_CURRENT) { if (flags & MDB_CURRENT) {
xflags = MDB_CURRENT; xflags = MDB_CURRENT;
} else { } else {
@ -4311,7 +4312,7 @@ mdb_branch_size(MDB_env *env, MDB_val *key)
*/ */
static int static int
mdb_node_add(MDB_cursor *mc, indx_t indx, mdb_node_add(MDB_cursor *mc, indx_t indx,
MDB_val *key, MDB_val *data, pgno_t pgno, uint8_t flags) MDB_val *key, MDB_val *data, pgno_t pgno, unsigned int flags)
{ {
unsigned int i; unsigned int i;
size_t node_size = NODESIZE; size_t node_size = NODESIZE;
@ -4407,12 +4408,17 @@ mdb_node_add(MDB_cursor *mc, indx_t indx,
if (F_ISSET(flags, F_BIGDATA)) if (F_ISSET(flags, F_BIGDATA))
memcpy(node->mn_data + key->mv_size, data->mv_data, memcpy(node->mn_data + key->mv_size, data->mv_data,
sizeof(pgno_t)); sizeof(pgno_t));
else if (F_ISSET(flags, MDB_RESERVE))
data->mv_data = node->mn_data + key->mv_size;
else else
memcpy(node->mn_data + key->mv_size, data->mv_data, memcpy(node->mn_data + key->mv_size, data->mv_data,
data->mv_size); data->mv_size);
} else { } else {
memcpy(node->mn_data + key->mv_size, &ofp->mp_pgno, memcpy(node->mn_data + key->mv_size, &ofp->mp_pgno,
sizeof(pgno_t)); sizeof(pgno_t));
if (F_ISSET(flags, MDB_RESERVE))
data->mv_data = METADATA(ofp);
else
memcpy(METADATA(ofp), data->mv_data, data->mv_size); memcpy(METADATA(ofp), data->mv_data, data->mv_size);
} }
} }
@ -5219,15 +5225,16 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
* @return 0 on success, non-zero on failure. * @return 0 on success, non-zero on failure.
*/ */
static int static int
mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno) mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno,
unsigned int nflags)
{ {
uint8_t flags; unsigned int flags;
int rc = MDB_SUCCESS, ins_new = 0, new_root = 0; int rc = MDB_SUCCESS, ins_new = 0, new_root = 0;
indx_t newindx; indx_t newindx;
pgno_t pgno = 0; pgno_t pgno = 0;
unsigned int i, j, split_indx, nkeys, pmax; unsigned int i, j, split_indx, nkeys, pmax;
MDB_node *node; MDB_node *node;
MDB_val sepkey, rkey, rdata; MDB_val sepkey, rkey, xdata, *rdata = &xdata;
MDB_page *copy; MDB_page *copy;
MDB_page *mp, *rp, *pp; MDB_page *mp, *rp, *pp;
unsigned int ptop; unsigned int ptop;
@ -5385,7 +5392,7 @@ newsep:
if (SIZELEFT(mn.mc_pg[ptop]) < mdb_branch_size(mc->mc_txn->mt_env, &sepkey)) { if (SIZELEFT(mn.mc_pg[ptop]) < mdb_branch_size(mc->mc_txn->mt_env, &sepkey)) {
mn.mc_snum--; mn.mc_snum--;
mn.mc_top--; mn.mc_top--;
rc = mdb_page_split(&mn, &sepkey, NULL, rp->mp_pgno); rc = mdb_page_split(&mn, &sepkey, NULL, rp->mp_pgno, 0);
/* Right page might now have changed parent. /* Right page might now have changed parent.
* Check if left page also changed parent. * Check if left page also changed parent.
@ -5432,11 +5439,10 @@ newsep:
rkey.mv_data = newkey->mv_data; rkey.mv_data = newkey->mv_data;
rkey.mv_size = newkey->mv_size; rkey.mv_size = newkey->mv_size;
if (IS_LEAF(mp)) { if (IS_LEAF(mp)) {
rdata.mv_data = newdata->mv_data; rdata = newdata;
rdata.mv_size = newdata->mv_size;
} else } else
pgno = newpgno; pgno = newpgno;
flags = 0; flags = nflags;
ins_new = 1; ins_new = 1;
@ -5449,8 +5455,9 @@ newsep:
rkey.mv_data = NODEKEY(node); rkey.mv_data = NODEKEY(node);
rkey.mv_size = node->mn_ksize; rkey.mv_size = node->mn_ksize;
if (IS_LEAF(mp)) { if (IS_LEAF(mp)) {
rdata.mv_data = NODEDATA(node); xdata.mv_data = NODEDATA(node);
rdata.mv_size = NODEDSZ(node); xdata.mv_size = NODEDSZ(node);
rdata = &xdata;
} else } else
pgno = NODEPGNO(node); pgno = NODEPGNO(node);
flags = node->mn_flags; flags = node->mn_flags;
@ -5463,7 +5470,7 @@ newsep:
rkey.mv_size = 0; rkey.mv_size = 0;
} }
rc = mdb_node_add(mc, j, &rkey, &rdata, pgno, flags); rc = mdb_node_add(mc, j, &rkey, rdata, pgno, flags);
} }
/* reset back to original page */ /* reset back to original page */
@ -5539,7 +5546,7 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
return EINVAL; return EINVAL;
} }
if ((flags & (MDB_NOOVERWRITE|MDB_NODUPDATA)) != flags) if ((flags & (MDB_NOOVERWRITE|MDB_NODUPDATA|MDB_RESERVE)) != flags)
return EINVAL; return EINVAL;
mdb_cursor_init(&mc, txn, dbi, &mx); mdb_cursor_init(&mc, txn, dbi, &mx);

@ -189,6 +189,10 @@ typedef void (MDB_rel_func)(MDB_val *item, void *oldptr, void *newptr, void *rel
#define MDB_NODUPDATA 0x20 #define MDB_NODUPDATA 0x20
/** For mdb_cursor_put: overwrite the current key/data pair */ /** For mdb_cursor_put: overwrite the current key/data pair */
#define MDB_CURRENT 0x40 #define MDB_CURRENT 0x40
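/** For put: Only reserve space for the data; do not copy it. A pointer
* to the reserved space is returned in the data item's mv_data.
* Not compatible with #MDB_DUPSORT, since the actual data is needed
* to determine the insert location.
*/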
#define MDB_RESERVE 0x10000
/* @} */ /* @} */
/** @brief Cursor Get operations. /** @brief Cursor Get operations.

Loading…
Cancel
Save