diff --git a/libraries/libmdb/mdb.c b/libraries/libmdb/mdb.c index 0ca8ffd..69da48f 100644 --- a/libraries/libmdb/mdb.c +++ b/libraries/libmdb/mdb.c @@ -262,6 +262,7 @@ typedef struct MDB_node { unsigned int mn_ksize:12; /* key size */ #define F_BIGDATA 0x01 /* data put on overflow page */ #define F_SUBDATA 0x02 /* data is a sub-database */ +#define F_DUPDATA 0x04 /* data has duplicates */ char mn_data[1]; } MDB_node; @@ -1763,7 +1764,7 @@ mdb_get(MDB_txn *txn, MDB_dbi dbi, leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, NULL); if (leaf && exact) { /* Return first duplicate data item */ - if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { MDB_xcursor mx; mdb_xcursor_init0(txn, dbi, &mx); @@ -1854,17 +1855,18 @@ mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o assert(cursor->mc_initialized); + top = CURSOR_TOP(cursor); + mp = top->mp_page; + if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { - if (op == MDB_NEXT || op == MDB_NEXT_DUP) { + leaf = NODEPTR(mp, top->mp_ki); + if ((op == MDB_NEXT || op == MDB_NEXT_DUP) && F_ISSET(leaf->mn_flags, F_DUPDATA)) { rc = mdb_cursor_next(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_NEXT); if (op != MDB_NEXT || rc == MDB_SUCCESS) return rc; } } - top = CURSOR_TOP(cursor); - mp = top->mp_page; - DPRINTF("cursor_next: top page is %lu in cursor %p", mp->mp_pgno, (void *) cursor); if (top->mp_ki + 1 >= NUMKEYS(mp)) { @@ -1885,12 +1887,14 @@ mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o assert(IS_LEAF(mp)); leaf = NODEPTR(mp, top->mp_ki); + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { + mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); + } if (data) { if ((rc = mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS)) return rc; - if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { - mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL); if (rc != MDB_SUCCESS) return rc; @@ -1910,17 +1914,18 @@ mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o assert(cursor->mc_initialized); + top = CURSOR_TOP(cursor); + mp = top->mp_page; + if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { - if (op == MDB_PREV || op == MDB_PREV_DUP) { + leaf = NODEPTR(mp, top->mp_ki); + if ((op == MDB_PREV || op == MDB_PREV_DUP) && F_ISSET(leaf->mn_flags, F_DUPDATA)) { rc = mdb_cursor_prev(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_PREV); if (op != MDB_PREV || rc == MDB_SUCCESS) return rc; } } - top = CURSOR_TOP(cursor); - mp = top->mp_page; - DPRINTF("cursor_prev: top page is %lu in cursor %p", mp->mp_pgno, (void *) cursor); if (top->mp_ki == 0) { @@ -1943,12 +1948,14 @@ mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o assert(IS_LEAF(mp)); leaf = NODEPTR(mp, top->mp_ki); + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { + mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); + } if (data) { if ((rc = mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS)) return rc; - if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { - mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL); if (rc != MDB_SUCCESS) return rc; @@ -1999,13 +2006,11 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data, cursor->mc_initialized = 1; cursor->mc_eof = 0; + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { + mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); + } if (data) { - MDB_val d2; - if ((rc = mdb_read_data(cursor->mc_txn, leaf, &d2)) != MDB_SUCCESS) - return rc; - - if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { - mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { if (op == MDB_SET || op == MDB_SET_RANGE) { rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL); } else { @@ -2023,9 +2028,9 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data, return rc; } } else { - *data = d2; + if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS) + return rc; } - } rc = mdb_set_key(leaf, key); @@ -2057,14 +2062,14 @@ mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data) cursor->mc_eof = 0; if (data) { - if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS) - return rc; - - if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL); if (rc) return rc; + } else { + if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS) + return rc; } } return mdb_set_key(leaf, key); @@ -2097,14 +2102,14 @@ mdb_cursor_last(MDB_cursor *cursor, MDB_val *key, MDB_val *data) top->mp_ki = NUMKEYS(top->mp_page) - 1; if (data) { - if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS) - return rc; - - if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL); if (rc) return rc; + } else { + if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS) + return rc; } } @@ -2451,16 +2456,25 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret) int mdb_cursor_count(MDB_cursor *mc, unsigned long *countp) { + MDB_ppage *top; + MDB_node *leaf; + if (mc == NULL || countp == NULL) return EINVAL; if (!(mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPSORT)) return EINVAL; - if (!mc->mc_xcursor->mx_cursor.mc_initialized) - return EINVAL; + top = CURSOR_TOP(mc); + leaf = NODEPTR(top->mp_page, top->mp_ki); + if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) { + *countp = 1; + } else { + if (!mc->mc_xcursor->mx_cursor.mc_initialized) + return EINVAL; - *countp = mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi].md_entries; + *countp = mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi].md_entries; + } return MDB_SUCCESS; } @@ -2805,7 +2819,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, return MDB_NOTFOUND; } - if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { MDB_xcursor mx; MDB_pageparent mp2; @@ -3027,26 +3041,46 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi, unsigned int ki; MDB_node *leaf; MDB_pageparent mpp; - MDB_val xdata, *rdata; + MDB_val xdata, *rdata, dkey; MDB_db dummy; + char dbuf[PAGESIZE]; + int do_sub = 0; DPRINTF("==> put key %.*s, size %zu, data size %zu", (int)key->mv_size, (char *)key->mv_data, key->mv_size, data->mv_size); + dkey.mv_size = 0; mpp.mp_parent = NULL; mpp.mp_pi = 0; rc = mdb_search_page(txn, dbi, key, NULL, 1, &mpp); if (rc == MDB_SUCCESS) { leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki); if (leaf && exact) { - if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { - goto put_sub; - } if (flags == MDB_NOOVERWRITE) { DPRINTF("duplicate key %.*s", (int)key->mv_size, (char *)key->mv_data); return MDB_KEYEXIST; } + if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { + /* Was a single item before, must convert now */ + if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) { + dkey.mv_size = NODEDSZ(leaf); + memcpy(dbuf, NODEDATA(leaf), dkey.mv_size); + memset(&dummy, 0, sizeof(dummy)); + dummy.md_root = P_INVALID; + if (dkey.mv_size == sizeof(MDB_db)) { + memcpy(NODEDATA(leaf), &dummy, sizeof(dummy)); + goto put_sub; + } + mdb_del_node(mpp.mp_page, ki); + do_sub = 1; + rdata = &xdata; + xdata.mv_size = sizeof(MDB_db); + xdata.mv_data = &dummy; + goto new_sub; + } + goto put_sub; + } /* same size, just replace it */ if (NODEDSZ(leaf) == data->mv_size) { memcpy(NODEDATA(leaf), data->mv_data, data->mv_size); @@ -3078,20 +3112,9 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi, DPRINTF("there are %u keys, should insert new key at index %i", NUMKEYS(mpp.mp_page), ki); - /* For sorted dups, the data item at this level is a DB record - * for a child DB; the actual data elements are stored as keys - * in the child DB. - */ - if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { - rdata = &xdata; - xdata.mv_size = sizeof(MDB_db); - xdata.mv_data = &dummy; - memset(&dummy, 0, sizeof(dummy)); - dummy.md_root = P_INVALID; - } else { - rdata = data; - } + rdata = data; +new_sub: if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(txn->mt_env, key, rdata)) { rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, rdata, P_INVALID); } else { @@ -3115,7 +3138,7 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi, * size limits on dupdata. The actual data fields of the child * DB are all zero size. */ - if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { + if (do_sub) { MDB_xcursor mx; leaf = NODEPTR(mpp.mp_page, ki); @@ -3126,6 +3149,13 @@ put_sub: xdata.mv_data = ""; if (flags == MDB_NODUPDATA) flags = MDB_NOOVERWRITE; + /* converted, write the original data first */ + if (dkey.mv_size) { + dkey.mv_data = dbuf; + rc = mdb_put0(&mx.mx_txn, mx.mx_cursor.mc_dbi, &dkey, &xdata, flags); + if (rc) return rc; + leaf->mn_flags |= F_DUPDATA; + } rc = mdb_put0(&mx.mx_txn, mx.mx_cursor.mc_dbi, data, &xdata, flags); mdb_xcursor_fini(txn, dbi, &mx); memcpy(NODEDATA(leaf), &mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi],