From bb6a034e282def9c201153a5b4e3a4d139ac54ab Mon Sep 17 00:00:00 2001 From: Howard Chu Date: Wed, 10 Aug 2011 19:26:06 -0700 Subject: [PATCH] Add mdb_put for sorted dups --- libraries/libmdb/mdb.c | 108 +++++++++++++++++++++++++++++++++++------ 1 file changed, 94 insertions(+), 14 deletions(-) diff --git a/libraries/libmdb/mdb.c b/libraries/libmdb/mdb.c index 545c763..319c1fc 100644 --- a/libraries/libmdb/mdb.c +++ b/libraries/libmdb/mdb.c @@ -74,7 +74,7 @@ typedef ULONG pgno_t; #define MDB_MINKEYS 4 #define MDB_MAGIC 0xBEEFC0DE #define MDB_VERSION 1 -#define MAXKEYSIZE 255 +#define MAXKEYSIZE 511 #define P_INVALID (~0UL) @@ -2147,6 +2147,56 @@ mdb_del_node(MDB_page *mp, indx_t indx) mp->mp_upper += sz; } +static void +mdb_xcursor_init0(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx) +{ + MDB_dbi dbn; + + mx->mx_cursor.mc_txn = &mx->mx_txn; + mx->mx_txn = *txn; + mx->mx_txn.mt_dbxs = mx->mx_dbxs; + mx->mx_txn.mt_dbs = mx->mx_dbs; + mx->mx_dbxs[0] = txn->mt_dbxs[0]; + mx->mx_dbxs[1] = txn->mt_dbxs[1]; + if (dbi > 1) { + mx->mx_dbxs[2] = txn->mt_dbxs[dbi]; + dbn = 2; + } else { + dbn = 1; + } + mx->mx_dbxs[dbn+1].md_parent = dbn; + mx->mx_dbxs[dbn+1].md_cmp = mx->mx_dbxs[dbn].md_dcmp; + mx->mx_dbxs[dbn+1].md_rel = mx->mx_dbxs[dbn].md_rel; + mx->mx_dbxs[dbn+1].md_dirty = 0; + mx->mx_txn.mt_numdbs = dbn+2; +} + +static void +mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_db *db) +{ + mx->mx_dbs[0] = txn->mt_dbs[0]; + mx->mx_dbs[1] = txn->mt_dbs[1]; + if (dbi > 1) { + mx->mx_dbs[2] = txn->mt_dbs[dbi]; + mx->mx_dbs[3] = *db; + } else { + mx->mx_dbs[2] = *db; + } +} + +static void +mdb_xcursor_fini(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx) +{ + txn->mt_dbs[0] = mx->mx_dbs[0]; + txn->mt_dbs[1] = mx->mx_dbs[1]; + txn->mt_dbxs[0].md_dirty = mx->mx_dbxs[0].md_dirty; + txn->mt_dbxs[1].md_dirty = mx->mx_dbxs[1].md_dirty; + if (dbi > 1) { + txn->mt_dbs[dbi] = mx->mx_dbs[2]; + txn->mt_dbxs[2].md_dirty = mx->mx_dbxs[2].md_dirty; + } +} + int mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret) { @@ -2166,18 +2216,7 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret) if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) { MDB_xcursor *mx = (MDB_xcursor *)(cursor + 1); cursor->mc_xcursor = mx; - mx->mx_cursor.mc_txn = &mx->mx_txn; - mx->mx_txn = *txn; - mx->mx_txn.mt_dbxs = mx->mx_dbxs; - mx->mx_txn.mt_dbs = mx->mx_dbs; - mx->mx_dbxs[0] = txn->mt_dbxs[0]; - mx->mx_dbxs[1] = txn->mt_dbxs[1]; - if (dbi > 1) { - mx->mx_dbxs[2] = txn->mt_dbxs[dbi]; - mx->mx_txn.mt_numdbs = 4; - } else { - mx->mx_txn.mt_numdbs = 3; - } + mdb_xcursor_init0(txn, dbi, mx); } } else { return ENOMEM; @@ -2535,6 +2574,10 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, if (data && (rc = mdb_read_data(txn, leaf, data)) != MDB_SUCCESS) return rc; + if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { + /* add all the child DB's pages to the free list */ + } + return mdb_del0(txn, dbi, ki, &mpp, leaf); } @@ -2703,6 +2746,8 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi, unsigned int ki; MDB_node *leaf; MDB_pageparent mpp; + MDB_val xdata, *rdata; + MDB_db dummy; assert(key != NULL); assert(data != NULL); @@ -2731,6 +2776,9 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi, if (rc == MDB_SUCCESS) { leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki); if (leaf && exact) { + if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { + goto put_sub; + } if (F_ISSET(flags, MDB_NOOVERWRITE)) { DPRINTF("duplicate key %.*s", (int)key->mv_size, (char *)key->mv_data); @@ -2766,6 +2814,20 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi, DPRINTF("there are %u keys, should insert new key at index %i", NUMKEYS(mpp.mp_page), ki); + /* For sorted dups, the data item at this level is a DB record + * for a child DB; the actual data elements are stored as keys + * in the child DB. + */ + if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { + rdata = &xdata; + xdata.mv_size = sizeof(MDB_db); + xdata.mv_data = &dummy; + memset(&dummy, 0, sizeof(dummy)); + dummy.md_root = P_INVALID; + } else { + rdata = data; + } + if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(txn->mt_env, key, data)) { rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, data, P_INVALID); } else { @@ -2775,8 +2837,26 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi, if (rc != MDB_SUCCESS) txn->mt_flags |= MDB_TXN_ERROR; - else + else { txn->mt_dbs[dbi].md_entries++; + /* Now store the actual data in the child DB. Note that we're + * storing the user data in the keys field, so there are strict + * size limits on dupdata. The actual data fields of the child + * DB are all zero size. + */ + if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { + MDB_xcursor mx; + + leaf = NODEPTR(mpp.mp_page, ki); +put_sub: + mdb_xcursor_init0(txn, dbi, &mx); + mdb_xcursor_init1(txn, dbi, &mx, NODEDATA(leaf)); + xdata.mv_size = 0; + xdata.mv_data = ""; + rc = mdb_put(&mx.mx_txn, mx.mx_txn.mt_numdbs-1, data, &xdata, flags); + mdb_xcursor_fini(txn, dbi, &mx); + } + } done: return rc;