From 590c728044348a2c0d0fa0a46105b4c2c63fe326 Mon Sep 17 00:00:00 2001 From: Howard Chu Date: Sun, 21 Aug 2011 17:49:54 -0700 Subject: [PATCH] First cut at DUPFIXED support Also in cursor_set, check the current page (if already set from before) before starting over again from the root. --- libraries/libmdb/Makefile | 1 + libraries/libmdb/mdb.c | 549 ++++++++++++++++++++++++++++---------- libraries/libmdb/mdb.h | 4 + libraries/libmdb/mtest4.c | 161 +++++++++++ 4 files changed, 574 insertions(+), 141 deletions(-) create mode 100644 libraries/libmdb/mtest4.c diff --git a/libraries/libmdb/Makefile b/libraries/libmdb/Makefile index 92e1fa7..425296e 100644 --- a/libraries/libmdb/Makefile +++ b/libraries/libmdb/Makefile @@ -24,6 +24,7 @@ mdb_stat: mdb_stat.o libmdb.a mtest: mtest.o libmdb.a mtest2: mtest2.o libmdb.a mtest3: mtest3.o libmdb.a +mtest4: mtest4.o libmdb.a mdb.o: mdb.c mdb.h midl.h $(CC) $(CFLAGS) -fPIC $(CPPFLAGS) -c mdb.c diff --git a/libraries/libmdb/mdb.c b/libraries/libmdb/mdb.c index bca938a..bd39a20 100644 --- a/libraries/libmdb/mdb.c +++ b/libraries/libmdb/mdb.c @@ -82,6 +82,14 @@ typedef ULONG pgno_t; #define MDB_MAGIC 0xBEEFC0DE #define MDB_VERSION 1 #define MAXKEYSIZE 511 +#if DEBUG +#define KBUF (MAXKEYSIZE*2+1) +#define DKBUF char kbuf[KBUF] +#define DKEY(x) mdb_dkey(x, kbuf) +#else +#define DKBUF +#define DKEY(x) +#endif #define P_INVALID (~0UL) @@ -168,10 +176,6 @@ typedef struct MDB_page { /* represents a page of storage */ indx_t pb_upper; /* upper bound of free space */ } pb; uint32_t pb_pages; /* number of overflow pages */ - struct { - indx_t pb_ksize; /* on a LEAF2 page */ - indx_t pb_numkeys; - } pb2; } mp_pb; indx_t mp_ptrs[1]; /* dynamic size */ } MDB_page; @@ -183,13 +187,14 @@ typedef struct MDB_page { /* represents a page of storage */ #define PAGEFILL(env, p) (1000L * ((env)->me_psize - PAGEHDRSZ - SIZELEFT(p)) / \ ((env)->me_psize - PAGEHDRSZ)) #define IS_LEAF(p) F_ISSET((p)->mp_flags, P_LEAF) +#define IS_LEAF2(p) F_ISSET((p)->mp_flags, P_LEAF2) #define IS_BRANCH(p) F_ISSET((p)->mp_flags, P_BRANCH) #define IS_OVERFLOW(p) F_ISSET((p)->mp_flags, P_OVERFLOW) #define OVPAGES(size, psize) ((PAGEHDRSZ-1 + (size)) / (psize) + 1) typedef struct MDB_db { - uint32_t md_pad; + uint32_t md_pad; /* also ksize for LEAF2 pages */ uint16_t md_flags; uint16_t md_depth; ULONG md_branch_pages; @@ -353,6 +358,10 @@ struct MDB_env { #define NODEDATA(node) (void *)((char *)(node)->mn_data + (node)->mn_ksize) #define NODEPGNO(node) ((node)->mn_pgno) #define NODEDSZ(node) ((node)->mn_dsize) +#define NODEKSZ(node) ((node)->mn_ksize) +#define LEAF2KEY(p, i, ks) ((char *)(p) + PAGEHDRSZ + ((i)*(ks))) + +#define MDB_SET_KEY(node, key) if (key!=NULL) {(key)->mv_size = NODEKSZ(node); (key)->mv_data = NODEKEY(node);} #define MDB_COMMIT_PAGES 64 /* max number of pages to write in one commit */ @@ -375,7 +384,7 @@ static MDB_node *mdb_search_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, static int mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, indx_t indx, MDB_val *key, MDB_val *data, pgno_t pgno, uint8_t flags); -static void mdb_del_node(MDB_page *mp, indx_t indx); +static void mdb_del_node(MDB_page *mp, indx_t indx, int ksize); static int mdb_del0(MDB_txn *txn, MDB_dbi dbi, unsigned int ki, MDB_pageparent *mpp, MDB_node *leaf); static int mdb_put0(MDB_txn *txn, MDB_dbi dbi, @@ -398,7 +407,6 @@ static void cursor_pop_page(MDB_cursor *cursor); static MDB_ppage *cursor_push_page(MDB_cursor *cursor, MDB_page *mp); -static int mdb_set_key(MDB_node *node, MDB_val *key); static int mdb_sibling(MDB_cursor *cursor, int move_right); static int mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op op); @@ -487,6 +495,19 @@ mdb_strerror(int err) return strerror(err); } +static char * +mdb_dkey(MDB_val *key, char *buf) +{ + char *ptr = buf; + unsigned char *c = key->mv_data; + unsigned int i; + if (key->mv_size > MAXKEYSIZE) + return "MAXKEYSIZE"; + for (i=0; imv_size; i++) + ptr += sprintf(ptr, "%02x", *c++); + return buf; +} + int mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b) { @@ -509,7 +530,14 @@ mdb_dcmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b) if (txn->mt_dbxs[dbi].md_dcmp) return txn->mt_dbxs[dbi].md_dcmp(a, b); - return memncmp((char *)a->mv_data, a->mv_size, b->mv_data, b->mv_size); + if (txn->mt_dbs[dbi].md_flags & (0 +#if __BYTE_ORDER == __LITTLE_ENDIAN + |MDB_INTEGERDUP +#endif + )) + return memnrcmp(a->mv_data, a->mv_size, b->mv_data, b->mv_size); + else + return memncmp((char *)a->mv_data, a->mv_size, b->mv_data, b->mv_size); } /* Allocate new page(s) for writing */ @@ -1137,6 +1165,8 @@ mdb_env_write_meta(MDB_txn *txn) /* Write to the SYNC fd */ rc = pwrite(env->me_mfd, ptr, len, off); if (rc != len) { + int r2; + rc = errno; DPUTS("write failed, disk error?"); /* On a failure, the pagecache still contains the new data. * Write some old data back, to prevent it from being used. @@ -1144,9 +1174,9 @@ mdb_env_write_meta(MDB_txn *txn) */ meta.mm_last_pg = metab.mm_last_pg; meta.mm_txnid = metab.mm_txnid; - rc = pwrite(env->me_fd, ptr, len, off); + r2 = pwrite(env->me_fd, ptr, len, off); env->me_flags |= MDB_FATAL_ERROR; - return errno; + return rc; } txn->mt_env->me_txns->mti_me_toggle = toggle; @@ -1523,8 +1553,9 @@ mdb_search_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, MDB_val *key, unsigned int i = 0; int low, high; int rc = 0; - MDB_node *node; + MDB_node *node = NULL; MDB_val nodekey; + DKBUF; DPRINTF("searching %u keys in %s page %lu", NUMKEYS(mp), @@ -1539,20 +1570,25 @@ mdb_search_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, MDB_val *key, high = NUMKEYS(mp) - 1; while (low <= high) { i = (low + high) >> 1; - node = NODEPTR(mp, i); - nodekey.mv_size = node->mn_ksize; - nodekey.mv_data = NODEKEY(node); + if (IS_LEAF2(mp)) { + nodekey.mv_size = txn->mt_dbs[dbi].md_pad; + nodekey.mv_data = LEAF2KEY(mp, i, nodekey.mv_size); + } else { + node = NODEPTR(mp, i); + + nodekey.mv_size = node->mn_ksize; + nodekey.mv_data = NODEKEY(node); + } rc = mdb_cmp(txn, dbi, key, &nodekey); if (IS_LEAF(mp)) - DPRINTF("found leaf index %u [%.*s], rc = %i", - i, (int)nodekey.mv_size, (char *)nodekey.mv_data, rc); + DPRINTF("found leaf index %u [%s], rc = %i", + i, DKEY(&nodekey), rc); else - DPRINTF("found branch index %u [%.*s -> %lu], rc = %i", - i, (int)node->mn_ksize, (char *)NODEKEY(node), - node->mn_pgno, rc); + DPRINTF("found branch index %u [%s -> %lu], rc = %i", + i, DKEY(&nodekey), NODEPGNO(node), rc); if (rc == 0) break; @@ -1573,7 +1609,8 @@ mdb_search_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, MDB_val *key, if (kip) /* Store the key index if requested. */ *kip = i; - return NODEPTR(mp, i); + /* nodeptr is fake for LEAF2 */ + return IS_LEAF2(mp) ? NODEPTR(mp, 0) : NODEPTR(mp, i); } static void @@ -1585,7 +1622,8 @@ cursor_pop_page(MDB_cursor *cursor) top = CURSOR_TOP(cursor); cursor->mc_snum--; - DPRINTF("popped page %lu off cursor %p", top->mp_page->mp_pgno, (void *) cursor); + DPRINTF("popped page %lu off db %u cursor %p", top->mp_page->mp_pgno, + cursor->mc_dbi, (void *) cursor); } } @@ -1594,7 +1632,8 @@ cursor_push_page(MDB_cursor *cursor, MDB_page *mp) { MDB_ppage *ppage; - DPRINTF("pushing page %lu on cursor %p", mp->mp_pgno, (void *) cursor); + DPRINTF("pushing page %lu on db %u cursor %p", mp->mp_pgno, + cursor->mc_dbi, (void *) cursor); ppage = &cursor->mc_stack[cursor->mc_snum++]; ppage->mp_page = mp; @@ -1633,6 +1672,7 @@ mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_cursor *cursor, int modify, MDB_pageparent *mpp) { MDB_page *mp = mpp->mp_page; + DKBUF; int rc; if (cursor && cursor_push_page(cursor, mp) == NULL) @@ -1663,8 +1703,8 @@ mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, } if (key) - DPRINTF("following index %u for key %.*s", - i, (int)key->mv_size, (char *)key->mv_data); + DPRINTF("following index %u for key [%s]", + i, DKEY(key)); assert(i < NUMKEYS(mp)); node = NODEPTR(mp, i); @@ -1698,8 +1738,8 @@ mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, return MDB_CORRUPTED; } - DPRINTF("found leaf page %lu for key %.*s", mp->mp_pgno, - key ? (int)key->mv_size : 0, key ? (char *)key->mv_data : NULL); + DPRINTF("found leaf page %lu for key [%s]", mp->mp_pgno, + key ? DKEY(key) : NULL); return MDB_SUCCESS; } @@ -1791,10 +1831,11 @@ mdb_get(MDB_txn *txn, MDB_dbi dbi, int rc, exact; MDB_node *leaf; MDB_pageparent mpp; + DKBUF; assert(key); assert(data); - DPRINTF("===> get key [%.*s]", (int)key->mv_size, (char *)key->mv_data); + DPRINTF("===> get db %u key [%s]", dbi, DKEY(key)); if (txn == NULL || !dbi || dbi >= txn->mt_numdbs) return EINVAL; @@ -1817,9 +1858,17 @@ mdb_get(MDB_txn *txn, MDB_dbi dbi, rc = mdb_search_page(&mx.mx_txn, mx.mx_cursor.mc_dbi, NULL, NULL, 0, &mpp); if (rc != MDB_SUCCESS) return rc; - leaf = NODEPTR(mpp.mp_page, 0); + if (IS_LEAF2(mpp.mp_page)) { + data->mv_size = txn->mt_dbs[dbi].md_pad; + data->mv_data = LEAF2KEY(mpp.mp_page, 0, data->mv_size); + } else { + leaf = NODEPTR(mpp.mp_page, 0); + data->mv_size = NODEKSZ(leaf); + data->mv_data = NODEKEY(leaf); + } + } else { + rc = mdb_read_data(txn, leaf, data); } - rc = mdb_read_data(txn, leaf, data); } else { rc = MDB_NOTFOUND; } @@ -1862,7 +1911,7 @@ mdb_sibling(MDB_cursor *cursor, int move_right) assert(IS_BRANCH(parent->mp_page)); indx = NODEPTR(parent->mp_page, parent->mp_ki); - if ((mp = mdb_get_page(cursor->mc_txn, indx->mn_pgno)) == NULL) + if ((mp = mdb_get_page(cursor->mc_txn, NODEPGNO(indx))) == NULL) return MDB_PAGE_NOTFOUND; #if 0 mp->parent = parent->mp_page; @@ -1874,18 +1923,6 @@ mdb_sibling(MDB_cursor *cursor, int move_right) return MDB_SUCCESS; } -static int -mdb_set_key(MDB_node *node, MDB_val *key) -{ - if (key == NULL) - return 0; - - key->mv_size = node->mn_ksize; - key->mv_data = NODEKEY(node); - - return 0; -} - static int mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op op) { @@ -1905,10 +1942,16 @@ mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { leaf = NODEPTR(mp, top->mp_ki); - if ((op == MDB_NEXT || op == MDB_NEXT_DUP) && F_ISSET(leaf->mn_flags, F_DUPDATA)) { - rc = mdb_cursor_next(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_NEXT); - if (op != MDB_NEXT || rc == MDB_SUCCESS) - return rc; + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { + if (op == MDB_NEXT || op == MDB_NEXT_DUP) { + rc = mdb_cursor_next(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_NEXT); + if (op != MDB_NEXT || rc == MDB_SUCCESS) + return rc; + } + } else { + cursor->mc_xcursor->mx_cursor.mc_initialized = 0; + if (op == MDB_NEXT_DUP) + return MDB_NOTFOUND; } } @@ -1929,6 +1972,12 @@ mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o DPRINTF("==> cursor points to page %lu with %u keys, key index %u", mp->mp_pgno, NUMKEYS(mp), top->mp_ki); + if (IS_LEAF2(mp)) { + key->mv_size = cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_pad; + key->mv_data = LEAF2KEY(mp, top->mp_ki, key->mv_size); + return MDB_SUCCESS; + } + assert(IS_LEAF(mp)); leaf = NODEPTR(mp, top->mp_ki); @@ -1946,7 +1995,8 @@ mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o } } - return mdb_set_key(leaf, key); + MDB_SET_KEY(leaf, key); + return MDB_SUCCESS; } static int @@ -1964,10 +2014,16 @@ mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) { leaf = NODEPTR(mp, top->mp_ki); - if ((op == MDB_PREV || op == MDB_PREV_DUP) && F_ISSET(leaf->mn_flags, F_DUPDATA)) { - rc = mdb_cursor_prev(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_PREV); - if (op != MDB_PREV || rc == MDB_SUCCESS) - return rc; + if (op == MDB_PREV || op == MDB_PREV_DUP) { + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { + rc = mdb_cursor_prev(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_PREV); + if (op != MDB_PREV || rc == MDB_SUCCESS) + return rc; + } else { + cursor->mc_xcursor->mx_cursor.mc_initialized = 0; + if (op == MDB_PREV_DUP) + return MDB_NOTFOUND; + } } } @@ -1976,6 +2032,7 @@ mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o if (top->mp_ki == 0) { DPUTS("=====> move to prev sibling page"); if (mdb_sibling(cursor, 0) != MDB_SUCCESS) { + cursor->mc_initialized = 0; return MDB_NOTFOUND; } top = CURSOR_TOP(cursor); @@ -1990,6 +2047,12 @@ mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o DPRINTF("==> cursor points to page %lu with %u keys, key index %u", mp->mp_pgno, NUMKEYS(mp), top->mp_ki); + if (IS_LEAF2(mp)) { + key->mv_size = cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_pad; + key->mv_data = LEAF2KEY(mp, top->mp_ki, key->mv_size); + return MDB_SUCCESS; + } + assert(IS_LEAF(mp)); leaf = NODEPTR(mp, top->mp_ki); @@ -2007,7 +2070,8 @@ mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op o } } - return mdb_set_key(leaf, key); + MDB_SET_KEY(leaf, key); + return MDB_SUCCESS; } static int @@ -2018,19 +2082,44 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_node *leaf; MDB_ppage *top; MDB_pageparent mpp; + DKBUF; assert(cursor); assert(key); assert(key->mv_size > 0); + /* See if we're already on the right page */ + if (cursor->mc_initialized) { + MDB_val nodekey; + top = CURSOR_TOP(cursor); + /* Don't try this for LEAF2 pages. Maybe support that later. */ + if ((top->mp_page->mp_flags & (P_LEAF|P_LEAF2)) == P_LEAF) { + leaf = NODEPTR(top->mp_page, 0); + MDB_SET_KEY(leaf, &nodekey); + rc = mdb_cmp(cursor->mc_txn, cursor->mc_dbi, key, &nodekey); + if (rc >= 0) { + leaf = NODEPTR(top->mp_page, NUMKEYS(top->mp_page)-1); + MDB_SET_KEY(leaf, &nodekey); + rc = mdb_cmp(cursor->mc_txn, cursor->mc_dbi, key, &nodekey); + if (rc <= 0) { + /* we're already on the right page */ + mpp.mp_page = top->mp_page; + rc = 0; + goto set2; + } + } + } + } cursor->mc_snum = 0; rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, key, cursor, 0, &mpp); if (rc != MDB_SUCCESS) return rc; + assert(IS_LEAF(mpp.mp_page)); top = CURSOR_TOP(cursor); +set2: leaf = mdb_search_node(cursor->mc_txn, cursor->mc_dbi, mpp.mp_page, key, exactp, &top->mp_ki); if (exactp != NULL && !*exactp) { /* MDB_SET specified and not an exact match. */ @@ -2051,6 +2140,12 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data, cursor->mc_initialized = 1; cursor->mc_eof = 0; + if (IS_LEAF2(mpp.mp_page)) { + key->mv_size = cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_pad; + key->mv_data = LEAF2KEY(mpp.mp_page, top->mp_ki, key->mv_size); + return MDB_SUCCESS; + } + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); } @@ -2060,15 +2155,12 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data, rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL); } else { int ex2, *ex2p; - MDB_cursor_op op2; if (op == MDB_GET_BOTH) { ex2p = &ex2; - op2 = MDB_SET; } else { ex2p = NULL; - op2 = MDB_SET_RANGE; } - rc = mdb_cursor_set(&cursor->mc_xcursor->mx_cursor, data, NULL, op2, ex2p); + rc = mdb_cursor_set(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_SET_RANGE, ex2p); if (rc != MDB_SUCCESS) return rc; } @@ -2088,12 +2180,10 @@ mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data, } } - rc = mdb_set_key(leaf, key); - if (rc == MDB_SUCCESS) { - DPRINTF("==> cursor placed on key %.*s", - (int)key->mv_size, (char *)key->mv_data); - ; - } + /* The key already matches in all other cases */ + if (op == MDB_SET_RANGE) + MDB_SET_KEY(leaf, key); + DPRINTF("==> cursor placed on key [%s]", DKEY(key)); return rc; } @@ -2116,6 +2206,12 @@ mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data) cursor->mc_initialized = 1; cursor->mc_eof = 0; + if (IS_LEAF2(mpp.mp_page)) { + key->mv_size = cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_pad; + key->mv_data = LEAF2KEY(mpp.mp_page, 0, key->mv_size); + return MDB_SUCCESS; + } + if (data) { if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); @@ -2123,11 +2219,14 @@ mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data) if (rc) return rc; } else { + if (cursor->mc_xcursor) + cursor->mc_xcursor->mx_cursor.mc_initialized = 0; if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS) return rc; } } - return mdb_set_key(leaf, key); + MDB_SET_KEY(leaf, key); + return MDB_SUCCESS; } static int @@ -2156,6 +2255,12 @@ mdb_cursor_last(MDB_cursor *cursor, MDB_val *key, MDB_val *data) top = CURSOR_TOP(cursor); top->mp_ki = NUMKEYS(top->mp_page) - 1; + if (IS_LEAF2(mpp.mp_page)) { + key->mv_size = cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_pad; + key->mv_data = LEAF2KEY(mpp.mp_page, top->mp_ki, key->mv_size); + return MDB_SUCCESS; + } + if (data) { if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf); @@ -2168,7 +2273,8 @@ mdb_cursor_last(MDB_cursor *cursor, MDB_val *key, MDB_val *data) } } - return mdb_set_key(leaf, key); + MDB_SET_KEY(leaf, key); + return MDB_SUCCESS; } int @@ -2197,6 +2303,41 @@ mdb_cursor_get(MDB_cursor *cursor, MDB_val *key, MDB_val *data, else rc = mdb_cursor_set(cursor, key, data, op, &exact); break; + case MDB_GET_MULTIPLE: + if (data == NULL || + !(cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPFIXED) || + !cursor->mc_initialized) { + rc = EINVAL; + break; + } + rc = MDB_SUCCESS; + if (!cursor->mc_xcursor->mx_cursor.mc_initialized || cursor->mc_xcursor->mx_cursor.mc_eof) + break; + goto fetchm; + case MDB_NEXT_MULTIPLE: + if (data == NULL || + !(cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPFIXED)) { + rc = EINVAL; + break; + } + if (!cursor->mc_initialized) + rc = mdb_cursor_first(cursor, key, data); + else + rc = mdb_cursor_next(cursor, key, data, MDB_NEXT_DUP); + if (rc == MDB_SUCCESS) { + if (cursor->mc_xcursor->mx_cursor.mc_initialized) { + MDB_ppage *top; +fetchm: + top = CURSOR_TOP(&cursor->mc_xcursor->mx_cursor); + data->mv_size = NUMKEYS(top->mp_page) * + cursor->mc_xcursor->mx_txn.mt_dbs[cursor->mc_xcursor->mx_cursor.mc_dbi].md_pad; + data->mv_data = METADATA(top->mp_page); + top->mp_ki = NUMKEYS(top->mp_page)-1; + } else { + rc = MDB_NOTFOUND; + } + } + break; case MDB_NEXT: case MDB_NEXT_DUP: case MDB_NEXT_NODUP: @@ -2293,14 +2434,31 @@ mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, indx_t indx, indx_t ofs; MDB_node *node; MDB_dpage *ofp = NULL; /* overflow page */ + DKBUF; assert(mp->mp_upper >= mp->mp_lower); - DPRINTF("add node [%.*s] to %s page %lu at index %i, key size %zu", - key ? (int)key->mv_size : 0, key ? (char *)key->mv_data : NULL, + DPRINTF("add node [%s] to %s page %lu at index %i, key size %zu", + key ? DKEY(key) : NULL, IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno, indx, key ? key->mv_size : 0); + if (IS_LEAF2(mp)) { + /* Move higher keys up one slot. */ + int ksize = txn->mt_dbs[dbi].md_pad, dif; + char *ptr = LEAF2KEY(mp, indx, ksize); + dif = NUMKEYS(mp) - indx; + if (dif > 0) + memmove(ptr+ksize, ptr, dif*ksize); + /* insert new key */ + memcpy(ptr, key->mv_data, ksize); + + /* Just using these for counting */ + mp->mp_lower += sizeof(indx_t); + mp->mp_upper -= ksize - sizeof(indx_t); + return MDB_SUCCESS; + } + if (key != NULL) node_size += key->mv_size; @@ -2351,7 +2509,7 @@ mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, indx_t indx, if (IS_LEAF(mp)) node->mn_dsize = data->mv_size; else - node->mn_pgno = pgno; + NODEPGNO(node) = pgno; if (key) memcpy(NODEKEY(node), key->mv_data, key->mv_size); @@ -2376,7 +2534,7 @@ mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, indx_t indx, } static void -mdb_del_node(MDB_page *mp, indx_t indx) +mdb_del_node(MDB_page *mp, indx_t indx, int ksize) { unsigned int sz; indx_t i, j, numkeys, ptr; @@ -2387,6 +2545,16 @@ mdb_del_node(MDB_page *mp, indx_t indx) IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno); assert(indx < NUMKEYS(mp)); + if (IS_LEAF2(mp)) { + int x = NUMKEYS(mp) - 1 - indx; + base = LEAF2KEY(mp, indx, ksize); + if (x) + memmove(base, base + ksize, x * ksize); + mp->mp_lower -= sizeof(indx_t); + mp->mp_upper += ksize - sizeof(indx_t); + return; + } + node = NODEPTR(mp, indx); sz = NODESIZE + node->mn_ksize; if (IS_LEAF(mp)) { @@ -2460,6 +2628,8 @@ mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_node *node) mx->mx_txn.mt_next_pgno = txn->mt_next_pgno; mx->mx_txn.mt_oldest = txn->mt_oldest; mx->mx_txn.mt_u = txn->mt_u; + mx->mx_cursor.mc_initialized = 0; + mx->mx_cursor.mc_eof = 0; } static void @@ -2549,13 +2719,14 @@ mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key) size_t len; MDB_node *node; char *base; + DKBUF; node = NODEPTR(mp, indx); ptr = mp->mp_ptrs[indx]; - DPRINTF("update key %u (ofs %u) [%.*s] to [%.*s] on page %lu", + DPRINTF("update key %u (ofs %u) [%.*s] to [%s] on page %lu", indx, ptr, (int)node->mn_ksize, (char *)NODEKEY(node), - (int)key->mv_size, (char *)key->mv_data, + DKEY(key), mp->mp_pgno); delta = key->mv_size - node->mn_ksize; @@ -2594,26 +2765,35 @@ mdb_move_node(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *src, indx_t srcindx, int rc; MDB_node *srcnode; MDB_val key, data; - - srcnode = NODEPTR(src->mp_page, srcindx); - DPRINTF("moving %s node %u [%.*s] on page %lu to node %u on page %lu", - IS_LEAF(src->mp_page) ? "leaf" : "branch", - srcindx, - (int)srcnode->mn_ksize, (char *)NODEKEY(srcnode), - src->mp_page->mp_pgno, - dstindx, dst->mp_page->mp_pgno); + DKBUF; /* Mark src and dst as dirty. */ if ((rc = mdb_touch(txn, src)) || (rc = mdb_touch(txn, dst))) return rc;; + if (IS_LEAF2(src->mp_page)) { + srcnode = NODEPTR(src->mp_page, 0); /* fake */ + key.mv_size = txn->mt_dbs[dbi].md_pad; + key.mv_data = LEAF2KEY(src->mp_page, srcindx, key.mv_size); + data.mv_size = 0; + data.mv_data = NULL; + } else { + srcnode = NODEPTR(src->mp_page, srcindx); + key.mv_size = NODEKSZ(srcnode); + key.mv_data = NODEKEY(srcnode); + data.mv_size = NODEDSZ(srcnode); + data.mv_data = NODEDATA(srcnode); + } + DPRINTF("moving %s node %u [%s] on page %lu to node %u on page %lu", + IS_LEAF(src->mp_page) ? "leaf" : "branch", + srcindx, + DKEY(&key), + src->mp_page->mp_pgno, + dstindx, dst->mp_page->mp_pgno); + /* Add the node to the destination page. */ - key.mv_size = srcnode->mn_ksize; - key.mv_data = NODEKEY(srcnode); - data.mv_size = NODEDSZ(srcnode); - data.mv_data = NODEDATA(srcnode); rc = mdb_add_node(txn, dbi, dst->mp_page, dstindx, &key, &data, NODEPGNO(srcnode), srcnode->mn_flags); if (rc != MDB_SUCCESS) @@ -2621,36 +2801,45 @@ mdb_move_node(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *src, indx_t srcindx, /* Delete the node from the source page. */ - mdb_del_node(src->mp_page, srcindx); + mdb_del_node(src->mp_page, srcindx, key.mv_size); - /* Update the parent separators. + /* The key value just changed due to del_node, find it again. */ - if (srcindx == 0 && src->mp_pi != 0) { - DPRINTF("update separator for source page %lu to [%.*s]", - src->mp_page->mp_pgno, (int)key.mv_size, (char *)key.mv_data); - if ((rc = mdb_update_key(src->mp_parent, src->mp_pi, - &key)) != MDB_SUCCESS) - return rc; - } - - if (srcindx == 0 && IS_BRANCH(src->mp_page)) { - MDB_val nullkey; - nullkey.mv_size = 0; - assert(mdb_update_key(src->mp_page, 0, &nullkey) == MDB_SUCCESS); + if (!IS_LEAF2(src->mp_page)) { + srcnode = NODEPTR(src->mp_page, srcindx); + key.mv_data = NODEKEY(srcnode); } - if (dstindx == 0 && dst->mp_pi != 0) { - DPRINTF("update separator for destination page %lu to [%.*s]", - dst->mp_page->mp_pgno, (int)key.mv_size, (char *)key.mv_data); - if ((rc = mdb_update_key(dst->mp_parent, dst->mp_pi, - &key)) != MDB_SUCCESS) - return rc; + /* Update the parent separators. + */ + if (srcindx == 0) { + if (src->mp_pi != 0) { + DPRINTF("update separator for source page %lu to [%s]", + src->mp_page->mp_pgno, DKEY(&key)); + if ((rc = mdb_update_key(src->mp_parent, src->mp_pi, + &key)) != MDB_SUCCESS) + return rc; + } + if (IS_BRANCH(src->mp_page)) { + MDB_val nullkey; + nullkey.mv_size = 0; + assert(mdb_update_key(src->mp_page, 0, &nullkey) == MDB_SUCCESS); + } } - if (dstindx == 0 && IS_BRANCH(dst->mp_page)) { - MDB_val nullkey; - nullkey.mv_size = 0; - assert(mdb_update_key(dst->mp_page, 0, &nullkey) == MDB_SUCCESS); + if (dstindx == 0) { + if (dst->mp_pi != 0) { + DPRINTF("update separator for destination page %lu to [%s]", + dst->mp_page->mp_pgno, DKEY(&key)); + if ((rc = mdb_update_key(dst->mp_parent, dst->mp_pi, + &key)) != MDB_SUCCESS) + return rc; + } + if (IS_BRANCH(dst->mp_page)) { + MDB_val nullkey; + nullkey.mv_size = 0; + assert(mdb_update_key(dst->mp_page, 0, &nullkey) == MDB_SUCCESS); + } } return MDB_SUCCESS; @@ -2679,17 +2868,29 @@ mdb_merge(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *src, MDB_pageparent *dst) /* Move all nodes from src to dst. */ - for (i = 0; i < NUMKEYS(src->mp_page); i++) { - srcnode = NODEPTR(src->mp_page, i); - - key.mv_size = srcnode->mn_ksize; - key.mv_data = NODEKEY(srcnode); - data.mv_size = NODEDSZ(srcnode); - data.mv_data = NODEDATA(srcnode); - rc = mdb_add_node(txn, dbi, dst->mp_page, NUMKEYS(dst->mp_page), &key, - &data, NODEPGNO(srcnode), srcnode->mn_flags); - if (rc != MDB_SUCCESS) - return rc; + if (IS_LEAF2(src->mp_page)) { + key.mv_size = txn->mt_dbs[dbi].md_pad; + key.mv_data = METADATA(src->mp_page); + for (i = 0; i < NUMKEYS(src->mp_page); i++) { + rc = mdb_add_node(txn, dbi, dst->mp_page, NUMKEYS(dst->mp_page), &key, + NULL, 0, 0); + if (rc != MDB_SUCCESS) + return rc; + key.mv_data = (char *)key.mv_data + key.mv_size; + } + } else { + for (i = 0; i < NUMKEYS(src->mp_page); i++) { + srcnode = NODEPTR(src->mp_page, i); + + key.mv_size = srcnode->mn_ksize; + key.mv_data = NODEKEY(srcnode); + data.mv_size = NODEDSZ(srcnode); + data.mv_data = NODEDATA(srcnode); + rc = mdb_add_node(txn, dbi, dst->mp_page, NUMKEYS(dst->mp_page), &key, + &data, NODEPGNO(srcnode), srcnode->mn_flags); + if (rc != MDB_SUCCESS) + return rc; + } } DPRINTF("dst page %lu now has %u keys (%.1f%% filled)", @@ -2697,7 +2898,7 @@ mdb_merge(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *src, MDB_pageparent *dst) /* Unlink the src page from parent. */ - mdb_del_node(src->mp_parent, src->mp_pi); + mdb_del_node(src->mp_parent, src->mp_pi, 0); if (src->mp_pi == 0) { key.mv_size = 0; if ((rc = mdb_update_key(src->mp_parent, 0, &key)) != MDB_SUCCESS) @@ -2745,8 +2946,8 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp) if (NUMKEYS(mpp->mp_page) == 0) { DPUTS("tree is completely empty"); txn->mt_dbs[dbi].md_root = P_INVALID; - txn->mt_dbs[dbi].md_depth--; - txn->mt_dbs[dbi].md_leaf_pages--; + txn->mt_dbs[dbi].md_depth = 0; + txn->mt_dbs[dbi].md_leaf_pages = 0; } else if (IS_BRANCH(mpp->mp_page) && NUMKEYS(mpp->mp_page) == 1) { DPUTS("collapsing root page!"); txn->mt_dbs[dbi].md_root = NODEPGNO(NODEPTR(mpp->mp_page, 0)); @@ -2818,7 +3019,7 @@ mdb_del0(MDB_txn *txn, MDB_dbi dbi, unsigned int ki, MDB_pageparent *mpp, MDB_no int rc; /* add overflow pages to free list */ - if (F_ISSET(leaf->mn_flags, F_BIGDATA)) { + if (!IS_LEAF2(mpp->mp_page) && F_ISSET(leaf->mn_flags, F_BIGDATA)) { int i, ovpages; pgno_t pg; @@ -2830,7 +3031,7 @@ mdb_del0(MDB_txn *txn, MDB_dbi dbi, unsigned int ki, MDB_pageparent *mpp, MDB_no pg++; } } - mdb_del_node(mpp->mp_page, ki); + mdb_del_node(mpp->mp_page, ki, txn->mt_dbs[dbi].md_pad); txn->mt_dbs[dbi].md_entries--; rc = mdb_rebalance(txn, dbi, mpp); if (rc != MDB_SUCCESS) @@ -2848,11 +3049,12 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, unsigned int ki; MDB_node *leaf; MDB_pageparent mpp; - - DPRINTF("========> delete key %.*s", (int)key->mv_size, (char *)key->mv_data); + DKBUF; assert(key != NULL); + DPRINTF("====> delete db %u key [%s]", dbi, DKEY(key)); + if (txn == NULL || !dbi || dbi >= txn->mt_numdbs) return EINVAL; @@ -2874,7 +3076,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, return MDB_NOTFOUND; } - if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { + if (!IS_LEAF2(mpp.mp_page) && F_ISSET(leaf->mn_flags, F_DUPDATA)) { MDB_xcursor mx; MDB_pageparent mp2; @@ -2909,7 +3111,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, parent = CURSOR_PARENT(&mx.mx_cursor); for (i=0; imp_page); i++) { ni = NODEPTR(top->mp_page, i); - mdb_midl_insert(txn->mt_free_pgs, ni->mn_pgno); + mdb_midl_insert(txn->mt_free_pgs, NODEPGNO(ni)); } parent->mp_ki++; if (parent->mp_ki >= NUMKEYS(parent->mp_page)) { @@ -2917,7 +3119,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, top = parent; } else { ni = NODEPTR(parent->mp_page, parent->mp_ki); - top->mp_page = mdb_get_page(&mx.mx_txn, ni->mn_pgno); + top->mp_page = mdb_get_page(&mx.mx_txn, NODEPGNO(ni)); } } } @@ -2951,6 +3153,7 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp, MDB_page *copy; MDB_dpage *mdp, *rdp, *pdp; MDB_dhead *dh; + DKBUF; assert(txn != NULL); @@ -2958,9 +3161,9 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp, mdp = (MDB_dpage *)dh; newindx = *newindxp; - DPRINTF("-----> splitting %s page %lu and adding [%.*s] at index %i", + DPRINTF("-----> splitting %s page %lu and adding [%s] at index %i", IS_LEAF(&mdp->p) ? "leaf" : "branch", mdp->p.mp_pgno, - (int)newkey->mv_size, (char *)newkey->mv_data, *newindxp); + DKEY(newkey), *newindxp); if (mdp->h.md_parent == NULL) { if ((pdp = mdb_new_page(txn, dbi, P_BRANCH, 1)) == NULL) @@ -2986,6 +3189,51 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp, rdp->h.md_pi = mdp->h.md_pi + 1; DPRINTF("new right sibling: page %lu", rdp->p.mp_pgno); + split_indx = NUMKEYS(&mdp->p) / 2 + 1; + + if (IS_LEAF2(&rdp->p)) { + char *split, *ins; + int x; + unsigned int nkeys = NUMKEYS(&mdp->p), lsize, rsize, ksize; + /* Move half of the keys to the right sibling */ + copy = NULL; + x = *newindxp - split_indx; + ksize = txn->mt_dbs[dbi].md_pad; + split = LEAF2KEY(&mdp->p, split_indx, ksize); + rsize = (nkeys - split_indx) * ksize; + lsize = (nkeys - split_indx) * sizeof(indx_t); + mdp->p.mp_lower -= lsize; + rdp->p.mp_lower += lsize; + mdp->p.mp_upper += rsize - lsize; + rdp->p.mp_upper -= rsize - lsize; + sepkey.mv_size = ksize; + if (newindx == split_indx) { + sepkey.mv_data = newkey->mv_data; + } else { + sepkey.mv_data = split; + } + if (x<0) { + ins = LEAF2KEY(&mdp->p, *newindxp, ksize); + memcpy(&rdp->p.mp_ptrs, split, rsize); + sepkey.mv_data = &rdp->p.mp_ptrs; + memmove(ins+ksize, ins, (split_indx - *newindxp) * ksize); + memcpy(ins, newkey->mv_data, ksize); + mdp->p.mp_lower += sizeof(indx_t); + mdp->p.mp_upper -= ksize - sizeof(indx_t); + } else { + if (x) + memcpy(&rdp->p.mp_ptrs, split, x * ksize); + ins = LEAF2KEY(&rdp->p, x, ksize); + memcpy(ins, newkey->mv_data, ksize); + memcpy(ins+ksize, split + x * ksize, rsize - x * ksize); + rdp->p.mp_lower += sizeof(indx_t); + rdp->p.mp_upper -= ksize - sizeof(indx_t); + *newindxp = x; + *mpp = &rdp->p; + } + goto newsep; + } + /* Move half of the keys to the right sibling. */ if ((copy = malloc(txn->mt_env->me_psize)) == NULL) return ENOMEM; @@ -2994,8 +3242,6 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp, mdp->p.mp_lower = PAGEHDRSZ; mdp->p.mp_upper = txn->mt_env->me_psize; - split_indx = NUMKEYS(copy) / 2 + 1; - /* First find the separating key between the split pages. */ memset(&sepkey, 0, sizeof(sepkey)); @@ -3008,7 +3254,8 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp, sepkey.mv_data = NODEKEY(node); } - DPRINTF("separator is [%.*s]", (int)sepkey.mv_size, (char *)sepkey.mv_data); +newsep: + DPRINTF("separator is [%s]", DKEY(&sepkey)); /* Copy separator key to the parent. */ @@ -3028,6 +3275,9 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp, rc = mdb_add_node(txn, dbi, rdp->h.md_parent, rdp->h.md_pi, &sepkey, NULL, rdp->p.mp_pgno, 0); } + if (IS_LEAF2(&rdp->p)) { + return rc; + } if (rc != MDB_SUCCESS) { free(copy); return rc; @@ -3071,7 +3321,7 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp, rdata.mv_data = NODEDATA(node); rdata.mv_size = node->mn_dsize; } else - pgno = node->mn_pgno; + pgno = NODEPGNO(node); flags = node->mn_flags; i++; @@ -3101,9 +3351,11 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi, MDB_db dummy; char dbuf[PAGESIZE]; int do_sub = 0; + size_t nsize; + DKBUF; - DPRINTF("==> put key %.*s, size %zu, data size %zu", - (int)key->mv_size, (char *)key->mv_data, key->mv_size, data->mv_size); + DPRINTF("==> put db %u key [%s], size %zu, data size %zu", + dbi, DKEY(key), key->mv_size, data->mv_size); dkey.mv_size = 0; mpp.mp_parent = NULL; @@ -3113,22 +3365,35 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi, leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki); if (leaf && exact) { if (flags == MDB_NOOVERWRITE) { - DPRINTF("duplicate key %.*s", - (int)key->mv_size, (char *)key->mv_data); + DPRINTF("duplicate key [%s]", DKEY(key)); return MDB_KEYEXIST; } + /* there's only a key anyway, so this is a no-op */ + if (IS_LEAF2(mpp.mp_page)) + return MDB_SUCCESS; + if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { /* Was a single item before, must convert now */ if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) { dkey.mv_size = NODEDSZ(leaf); + dkey.mv_data = dbuf; memcpy(dbuf, NODEDATA(leaf), dkey.mv_size); + /* data matches, ignore it */ + if (!mdb_dcmp(txn, dbi, data, &dkey)) + return (flags == MDB_NODUPDATA) ? MDB_KEYEXIST : MDB_SUCCESS; memset(&dummy, 0, sizeof(dummy)); + if (txn->mt_dbs[dbi].md_flags & MDB_DUPFIXED) { + dummy.md_pad = data->mv_size; + dummy.md_flags = MDB_DUPFIXED; + if (txn->mt_dbs[dbi].md_flags & MDB_INTEGERDUP) + dummy.md_flags |= MDB_INTEGERKEY; + } dummy.md_root = P_INVALID; if (dkey.mv_size == sizeof(MDB_db)) { memcpy(NODEDATA(leaf), &dummy, sizeof(dummy)); goto put_sub; } - mdb_del_node(mpp.mp_page, ki); + mdb_del_node(mpp.mp_page, ki, 0); do_sub = 1; rdata = &xdata; xdata.mv_size = sizeof(MDB_db); @@ -3142,7 +3407,7 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi, memcpy(NODEDATA(leaf), data->mv_data, data->mv_size); goto done; } - mdb_del_node(mpp.mp_page, ki); + mdb_del_node(mpp.mp_page, ki, 0); } if (leaf == NULL) { /* append if not found */ ki = NUMKEYS(mpp.mp_page); @@ -3159,6 +3424,8 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi, txn->mt_dbs[dbi].md_root = mpp.mp_page->mp_pgno; txn->mt_dbs[dbi].md_depth++; txn->mt_dbxs[dbi].md_dirty = 1; + if ((txn->mt_dbs[dbi].md_flags & (MDB_DUPSORT|MDB_DUPFIXED)) == MDB_DUPFIXED) + mpp.mp_page->mp_flags |= P_LEAF2; ki = 0; } else @@ -3171,7 +3438,8 @@ mdb_put0(MDB_txn *txn, MDB_dbi dbi, rdata = data; new_sub: - if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(txn->mt_env, key, rdata)) { + nsize = IS_LEAF2(mpp.mp_page) ? key->mv_size : mdb_leaf_size(txn->mt_env, key, rdata); + if (SIZELEFT(mpp.mp_page) < nsize) { rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, rdata, P_INVALID); } else { /* There is room already in this leaf page. */ @@ -3205,7 +3473,6 @@ put_sub: flags = MDB_NOOVERWRITE; /* converted, write the original data first */ if (dkey.mv_size) { - dkey.mv_data = dbuf; rc = mdb_put0(&mx.mx_txn, mx.mx_cursor.mc_dbi, &dkey, &xdata, flags); if (rc) return rc; leaf->mn_flags |= F_DUPDATA; diff --git a/libraries/libmdb/mdb.h b/libraries/libmdb/mdb.h index 9d6ebf4..5e679c5 100644 --- a/libraries/libmdb/mdb.h +++ b/libraries/libmdb/mdb.h @@ -70,9 +70,11 @@ typedef enum MDB_cursor_op { /* cursor operations */ MDB_FIRST, MDB_GET_BOTH, /* position at key/data */ MDB_GET_BOTH_RANGE, /* position at key, nearest data */ + MDB_GET_MULTIPLE, /* only for MDB_DUPFIXED */ MDB_LAST, MDB_NEXT, MDB_NEXT_DUP, + MDB_NEXT_MULTIPLE, /* only for MDB_DUPFIXED */ MDB_NEXT_NODUP, MDB_PREV, MDB_PREV_DUP, @@ -95,6 +97,8 @@ typedef enum MDB_cursor_op { /* cursor operations */ #define MDB_REVERSEKEY 0x02 /* use reverse string keys */ #define MDB_DUPSORT 0x04 /* use sorted duplicates */ #define MDB_INTEGERKEY 0x08 /* numeric keys in native byte order */ +#define MDB_DUPFIXED 0x10 /* sorted dup items have fixed size */ +#define MDB_INTEGERDUP 0x20 /* numeric dups in native byte order */ /* environment flags */ #define MDB_FIXEDMAP 0x01 /* mmap at a fixed address */ diff --git a/libraries/libmdb/mtest4.c b/libraries/libmdb/mtest4.c new file mode 100644 index 0000000..92fe184 --- /dev/null +++ b/libraries/libmdb/mtest4.c @@ -0,0 +1,161 @@ +/* mtest4.c - memory-mapped database tester/toy */ +/* + * Copyright 2011 Howard Chu, Symas Corp. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted only as authorized by the OpenLDAP + * Public License. + * + * A copy of this license is available in the file LICENSE in the + * top-level directory of the distribution or, alternatively, at + * . + */ + +/* Tests for sorted duplicate DBs with fixed-size keys */ +#define _XOPEN_SOURCE 500 /* srandom(), random() */ +#include +#include +#include +#include +#include "mdb.h" + +int main(int argc,char * argv[]) +{ + int i = 0, j = 0, rc; + MDB_env *env; + MDB_dbi dbi; + MDB_val key, data; + MDB_txn *txn; + MDB_stat mst; + MDB_cursor *cursor; + int count; + int *values; + char sval[8]; + char kval[sizeof(int)]; + + memset(sval, 0, sizeof(sval)); + + count = 510; + values = (int *)malloc(count*sizeof(int)); + + for(i = 0;i -1; i-= (random()%3)) { + j++; + txn=NULL; + rc = mdb_txn_begin(env, 0, &txn); + sprintf(sval, "%07x", values[i]); + key.mv_size = sizeof(int); + key.mv_data = kval; + data.mv_size = sizeof(sval); + data.mv_data = sval; + rc = mdb_del(txn, dbi, &key, &data, MDB_DEL_DUP); + if (rc) { + j--; + mdb_txn_abort(txn); + } else { + rc = mdb_txn_commit(txn); + } + } + free(values); + printf("Deleted %d values\n", j); + + rc = mdb_env_stat(env, &mst); + rc = mdb_txn_begin(env, 1, &txn); + rc = mdb_cursor_open(txn, dbi, &cursor); + printf("Cursor next\n"); + while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) { + printf("key: %.*s, data: %.*s\n", + (int) key.mv_size, (char *) key.mv_data, + (int) data.mv_size, (char *) data.mv_data); + } + printf("Cursor prev\n"); + while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_PREV)) == 0) { + printf("key: %.*s, data: %.*s\n", + (int) key.mv_size, (char *) key.mv_data, + (int) data.mv_size, (char *) data.mv_data); + } + mdb_cursor_close(cursor); + mdb_close(txn, dbi); + + mdb_txn_abort(txn); + mdb_env_close(env); + + return 0; +}