diff --git a/libraries/libmdb/mdb.c b/libraries/libmdb/mdb.c index e265104..2e914d8 100644 --- a/libraries/libmdb/mdb.c +++ b/libraries/libmdb/mdb.c @@ -21,6 +21,7 @@ #include #include #include +#include #include "mdb.h" @@ -272,7 +273,6 @@ struct MDB_env { MDB_dbx *me_dbxs; /* array */ MDB_db *me_dbs[2]; MDB_oldpages *me_pghead; - MDB_oldpages *me_pgtail; pthread_key_t me_txkey; /* thread-key for readers */ }; @@ -309,6 +309,8 @@ static int mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, indx_t indx, MDB_val *key, MDB_val *data, pgno_t pgno, uint8_t flags); static void mdb_del_node(MDB_page *mp, indx_t indx); +static int mdb_del0(MDB_txn *txn, MDB_dbi dbi, unsigned int ki, + MDB_pageparent *mpp, MDB_node *leaf); static int mdb_read_data(MDB_env *env, MDB_node *leaf, MDB_val *data); static int mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mp); @@ -401,9 +403,45 @@ mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num) { MDB_dpage *dp; pgno_t pgno = P_INVALID; + ULONG oldest = txn->mt_txnid - 2; + if (!txn->mt_env->me_pghead && txn->mt_dbs[FREE_DBI].md_root != P_INVALID) { + /* See if there's anything in the free DB */ + MDB_pageparent mpp; + MDB_node *leaf; + ULONG *kptr; + + mpp.mp_parent = NULL; + mpp.mp_pi = 0; + mdb_search_page(txn, FREE_DBI, NULL, NULL, 0, &mpp); + leaf = NODEPTR(mpp.mp_page, 0); + kptr = (ULONG *)NODEKEY(leaf); + + /* It's potentially usable, unless there are still + * older readers outstanding. Grab it. 
+ */ + if (oldest > *kptr) { + MDB_oldpages *mop; + MDB_val data; + pgno_t *idl; + + mdb_read_data(txn->mt_env, leaf, &data); + idl = (ULONG *)data.mv_data; + mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(idl) - sizeof(pgno_t)); + mop->mo_next = txn->mt_env->me_pghead; + mop->mo_txnid = *kptr; + txn->mt_env->me_pghead = mop; + memcpy(mop->mo_pages, idl, MDB_IDL_SIZEOF(idl)); + + /* drop this IDL from the DB */ + mpp.mp_parent = NULL; + mpp.mp_pi = 0; + mdb_search_page(txn, FREE_DBI, NULL, NULL, 1, &mpp); + leaf = NODEPTR(mpp.mp_page, 0); + mdb_del0(txn, FREE_DBI, 0, &mpp, leaf); + } + } if (txn->mt_env->me_pghead) { - ULONG oldest = txn->mt_txnid - 2; unsigned int i; for (i=0; i<txn->mt_env->me_txns->mt_numreaders; i++) { ULONG mr = txn->mt_env->me_txns->mt_readers[i].mr_txnid; @@ -432,8 +470,6 @@ mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num) } if (MDB_IDL_IS_ZERO(mop->mo_pages)) { txn->mt_env->me_pghead = mop->mo_next; - if (!txn->mt_env->me_pghead) - txn->mt_env->me_pgtail = NULL; free(mop); } } @@ -604,31 +640,12 @@ mdb_txn_abort(MDB_txn *txn) if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) { txn->mt_u.reader->mr_txnid = 0; } else { - /* Discard all dirty pages. Return any re-used pages - * to the free list. - */ - MDB_IDL_ZERO(txn->mt_free_pgs); + /* Discard all dirty pages. 
*/ while (!STAILQ_EMPTY(txn->mt_u.dirty_queue)) { dp = STAILQ_FIRST(txn->mt_u.dirty_queue); STAILQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next); - if (dp->p.mp_pgno <= env->me_meta->mm_last_pg) - mdb_idl_insert(txn->mt_free_pgs, dp->p.mp_pgno); free(dp); } - /* put back to head of free list */ - if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) { - MDB_oldpages *mop; - - mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(txn->mt_free_pgs) - sizeof(pgno_t)); - mop->mo_next = env->me_pghead; - mop->mo_txnid = txn->mt_oldest - 1; - if (!env->me_pghead) { - env->me_pgtail = mop; - } - env->me_pghead = mop; - memcpy(mop->mo_pages, txn->mt_free_pgs, MDB_IDL_SIZEOF(txn->mt_free_pgs)); - } - free(txn->mt_free_pgs); free(txn->mt_u.dirty_queue); env->me_txn = NULL; @@ -680,6 +697,40 @@ mdb_txn_commit(MDB_txn *txn) DPRINTF("committing transaction %lu on mdbenv %p, root page %lu", txn->mt_txnid, (void *) env, txn->mt_dbs[MAIN_DBI].md_root); + /* should only be one record now */ + if (env->me_pghead) { + MDB_val key, data; + MDB_oldpages *mop; + + mop = env->me_pghead; + key.mv_size = sizeof(pgno_t); + key.mv_data = (char *)&mop->mo_txnid; + data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages); + data.mv_data = mop->mo_pages; + mdb_put(txn, FREE_DBI, &key, &data, 0); + free(env->me_pghead); + env->me_pghead = NULL; + } + /* save to free list */ + if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) { + MDB_val key, data; + MDB_pageparent mpp; + + /* make sure last page of freeDB is touched and on freelist */ + key.mv_size = MAXKEYSIZE+1; + key.mv_data = NULL; + mpp.mp_parent = NULL; + mpp.mp_pi = 0; + mdb_search_page(txn, FREE_DBI, &key, NULL, 1, &mpp); + + /* write to last page of freeDB */ + key.mv_size = sizeof(pgno_t); + key.mv_data = (char *)&txn->mt_txnid; + data.mv_size = MDB_IDL_SIZEOF(txn->mt_free_pgs); + data.mv_data = txn->mt_free_pgs; + mdb_put(txn, FREE_DBI, &key, &data, 0); + } + /* Update DB root pointers. Their pages have already been * touched so this is all in-place and cannot fail. 
*/ @@ -790,22 +841,6 @@ mdb_txn_commit(MDB_txn *txn) free(txn->mt_dbs); } - /* add to tail of free list */ - if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) { - MDB_oldpages *mop; - - mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(txn->mt_free_pgs) - sizeof(pgno_t)); - mop->mo_next = NULL; - if (env->me_pghead) { - env->me_pgtail->mo_next = mop; - } else { - env->me_pghead = mop; - } - env->me_pgtail = mop; - memcpy(mop->mo_pages, txn->mt_free_pgs, MDB_IDL_SIZEOF(txn->mt_free_pgs)); - mop->mo_txnid = txn->mt_txnid; - } - pthread_mutex_unlock(&env->me_txns->mt_wmutex); free(txn->mt_free_pgs); free(txn->mt_u.dirty_queue); @@ -879,6 +914,10 @@ mdbenv_init_meta(MDB_env *env, MDB_meta *meta) meta->mm_psize = psize; meta->mm_last_pg = 1; meta->mm_flags = env->me_flags & 0xffff; +#if __BYTE_ORDER == __LITTLE_ENDIAN + /* freeDB keys are pgno_t's, must compare in int order */ + meta->mm_flags |= MDB_REVERSEKEY; +#endif meta->mm_dbs[0].md_root = P_INVALID; meta->mm_dbs[1].md_root = P_INVALID; @@ -2291,6 +2330,32 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp) } } +static int +mdb_del0(MDB_txn *txn, MDB_dbi dbi, unsigned int ki, MDB_pageparent *mpp, MDB_node *leaf) +{ + int rc; + + mdb_del_node(mpp->mp_page, ki); + /* add overflow pages to free list */ + if (F_ISSET(leaf->mn_flags, F_BIGDATA)) { + int i, ovpages; + pgno_t pg; + + memcpy(&pg, NODEDATA(leaf), sizeof(pg)); + ovpages = OVPAGES(NODEDSZ(leaf), txn->mt_env->me_psize); + for (i=0; i<ovpages; i++) { + mdb_idl_insert(txn->mt_free_pgs, pg); + pg++; + } + } + txn->mt_dbs[dbi].md_entries--; + rc = mdb_rebalance(txn, dbi, mpp); + if (rc != MDB_SUCCESS) + txn->mt_flags |= MDB_TXN_ERROR; + + return rc; +} + int mdb_del(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data) @@ -2315,6 +2380,8 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, return EINVAL; } + mpp.mp_parent = NULL; + mpp.mp_pi = 0; if ((rc = mdb_search_page(txn, dbi, key, NULL, 1, &mpp)) != MDB_SUCCESS) return rc; @@ -2326,25 +2393,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, if (data && (rc = 
mdb_read_data(txn->mt_env, leaf, data)) != MDB_SUCCESS) return rc; - mdb_del_node(mpp.mp_page, ki); - /* add overflow pages to free list */ - if (F_ISSET(leaf->mn_flags, F_BIGDATA)) { - int i, ovpages; - pgno_t pg; - - memcpy(&pg, NODEDATA(leaf), sizeof(pg)); - ovpages = OVPAGES(NODEDSZ(leaf), txn->mt_env->me_psize); - for (i=0; i<ovpages; i++) { - mdb_idl_insert(txn->mt_free_pgs, pg); - pg++; - } - } - txn->mt_dbs[dbi].md_entries--; - rc = mdb_rebalance(txn, dbi, &mpp); - if (rc != MDB_SUCCESS) - txn->mt_flags |= MDB_TXN_ERROR; - - return rc; + return mdb_del0(txn, dbi, ki, &mpp, leaf); } /* Split page <*mpp>, and insert in either left or @@ -2534,6 +2583,8 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi, DPRINTF("==> put key %.*s, size %zu, data size %zu", (int)key->mv_size, (char *)key->mv_data, key->mv_size, data->mv_size); + mpp.mp_parent = NULL; + mpp.mp_pi = 0; rc = mdb_search_page(txn, dbi, key, NULL, 1, &mpp); if (rc == MDB_SUCCESS) { leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki);