From 852ac0e3bc46d497b67f7b5cacc46cc1bfa5d425 Mon Sep 17 00:00:00 2001 From: Howard Chu Date: Sun, 7 Aug 2011 22:37:45 -0700 Subject: [PATCH] More sub-db stuff --- libraries/libmdb/mdb.c | 417 +++++++++++++++++++++-------------------- libraries/libmdb/mdb.h | 9 +- 2 files changed, 223 insertions(+), 203 deletions(-) diff --git a/libraries/libmdb/mdb.c b/libraries/libmdb/mdb.c index 8ee48a4..e265104 100644 --- a/libraries/libmdb/mdb.c +++ b/libraries/libmdb/mdb.c @@ -33,7 +33,7 @@ typedef ULONG pgno_t; #define DEBUG 1 #endif -#if (DEBUG +0) && defined(__GNUC__) +#if DEBUG && defined(__GNUC__) # define DPRINTF(fmt, ...) \ fprintf(stderr, "%s:%d: " fmt "\n", __func__, __LINE__, ##__VA_ARGS__) #else @@ -97,7 +97,11 @@ typedef struct MDB_txninfo { * headers on any page after the first. */ typedef struct MDB_page { /* represents a page of storage */ - pgno_t mp_pgno; /* page number */ +#define mp_pgno mp_p.p_pgno + union padded { + pgno_t p_pgno; /* page number */ + void * p_pad; + } mp_p; #define P_BRANCH 0x01 /* branch page */ #define P_LEAF 0x02 /* leaf page */ #define P_OVERFLOW 0x04 /* overflow page */ @@ -121,29 +125,38 @@ typedef struct MDB_page { /* represents a page of storage */ #define NUMKEYS(p) (((p)->mp_lower - PAGEHDRSZ) >> 1) #define SIZELEFT(p) (indx_t)((p)->mp_upper - (p)->mp_lower) -#define PAGEFILL(env, p) (1000L * ((env)->me_meta.mm_psize - PAGEHDRSZ - SIZELEFT(p)) / \ - ((env)->me_meta.mm_psize - PAGEHDRSZ)) +#define PAGEFILL(env, p) (1000L * ((env)->me_psize - PAGEHDRSZ - SIZELEFT(p)) / \ + ((env)->me_psize - PAGEHDRSZ)) #define IS_LEAF(p) F_ISSET((p)->mp_flags, P_LEAF) #define IS_BRANCH(p) F_ISSET((p)->mp_flags, P_BRANCH) #define IS_OVERFLOW(p) F_ISSET((p)->mp_flags, P_OVERFLOW) #define OVPAGES(size, psize) (PAGEHDRSZ + size + psize - 1) / psize; +typedef struct MDB_db { + uint32_t md_pad; + uint16_t md_flags; + uint16_t md_depth; + ULONG md_branch_pages; + ULONG md_leaf_pages; + ULONG md_overflow_pages; + ULONG md_entries; + pgno_t md_root; +} MDB_db; + +#define FREE_DBI 0 +#define MAIN_DBI 1 + typedef struct MDB_meta { /* meta (footer) page content */ uint32_t mm_magic; uint32_t mm_version; void *mm_address; /* address for fixed mapping */ size_t mm_mapsize; /* size of mmap region */ + MDB_db mm_dbs[2]; /* first is free space, 2nd is main db */ +#define mm_psize mm_dbs[0].md_pad +#define mm_flags mm_dbs[0].md_flags pgno_t mm_last_pg; /* last used page in file */ ULONG mm_txnid; /* txnid that committed this page */ - uint32_t mm_psize; - uint16_t mm_flags; - uint16_t mm_depth; - ULONG mm_branch_pages; - ULONG mm_leaf_pages; - ULONG mm_overflow_pages; - ULONG mm_entries; - pgno_t mm_root; } MDB_meta; typedef struct MDB_dhead { /* a dirty page */ @@ -211,24 +224,12 @@ typedef struct MDB_node { char mn_data[1]; } MDB_node; -typedef struct MDB_db { - uint32_t md_pad; - uint16_t md_flags; - uint16_t md_depth; - ULONG md_branch_pages; - ULONG md_leaf_pages; - ULONG md_overflow_pages; - ULONG md_entries; - pgno_t md_root; -} MDB_db; typedef struct MDB_dbx { MDB_val md_name; MDB_cmp_func *md_cmp; /* user compare function */ MDB_cmp_func *md_dcmp; /* user dupsort function */ MDB_rel_func *md_rel; /* user relocate function */ - MDB_db *md_db; - MDB_page *md_page; } MDB_dbx; struct MDB_txn { @@ -242,7 +243,7 @@ struct MDB_txn { MDB_reader *reader; } mt_u; MDB_dbx *mt_dbxs; /* array */ - MDB_db mt_db0; /* just for write txns */ + MDB_db *mt_dbs; unsigned int mt_numdbs; #define MDB_TXN_RDONLY 0x01 /* read-only transaction */ @@ -255,19 +256,24 @@ struct MDB_env { int me_fd; int me_lfd; uint32_t me_flags; - unsigned int me_maxreaders; + unsigned int me_maxreaders; + unsigned int me_numdbs; + unsigned int me_maxdbs; char *me_path; char *me_map; MDB_txninfo *me_txns; - MDB_meta me_meta; + MDB_meta *me_metas[2]; + MDB_meta *me_meta; MDB_txn *me_txn; /* current write transaction */ size_t me_mapsize; off_t me_size; /* current file size */ - pthread_key_t me_txkey; /* thread-key for readers */ + unsigned int me_psize; + int me_db_toggle; + MDB_dbx *me_dbxs; /* array */ + MDB_db *me_dbs[2]; MDB_oldpages *me_pghead; MDB_oldpages *me_pgtail; - MDB_dbx *me_dbxs; /* array */ - unsigned int me_numdbs; + pthread_key_t me_txkey; /* thread-key for readers */ }; #define NODESIZE offsetof(MDB_node, mn_data) @@ -292,8 +298,7 @@ static int mdb_search_page(MDB_txn *txn, MDB_cursor *cursor, int modify, MDB_pageparent *mpp); -static int mdbenv_read_header(MDB_env *env); -static int mdb_check_meta_page(MDB_page *p); +static int mdbenv_read_header(MDB_env *env, MDB_meta *meta); static int mdbenv_read_meta(MDB_env *env, int *which); static int mdbenv_write_meta(MDB_txn *txn); static MDB_page *mdbenv_get_page(MDB_env *env, pgno_t pgno); @@ -384,7 +389,7 @@ mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b) static int _mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *key1, const MDB_val *key2) { - if (F_ISSET(txn->mt_dbxs[dbi].md_db->md_flags, MDB_REVERSEKEY)) + if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_REVERSEKEY)) return memnrcmp(key1->mv_data, key1->mv_size, key2->mv_data, key2->mv_size); else return memncmp((char *)key1->mv_data, key1->mv_size, key2->mv_data, key2->mv_size); @@ -435,7 +440,7 @@ mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num) } } - if ((dp = malloc(txn->mt_env->me_meta.mm_psize * num + sizeof(MDB_dhead))) == NULL) + if ((dp = malloc(txn->mt_env->me_psize * num + sizeof(MDB_dhead))) == NULL) return NULL; dp->h.md_num = num; dp->h.md_parent = parent; @@ -468,7 +473,7 @@ mdb_touch(MDB_txn *txn, MDB_pageparent *pp) DPRINTF("touched page %lu -> %lu", mp->mp_pgno, dp->p.mp_pgno); mdb_idl_insert(txn->mt_free_pgs, mp->mp_pgno); pgno = dp->p.mp_pgno; - memcpy(&dp->p, mp, txn->mt_env->me_meta.mm_psize); + memcpy(&dp->p, mp, txn->mt_env->me_psize); mp = &dp->p; mp->mp_pgno = pgno; mp->mp_flags |= P_DIRTY; @@ -492,8 +497,6 @@ mdbenv_sync(MDB_env *env) return rc; } -#define DBX_CHUNK 16 /* space for 16 DBs at a time */ - int mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret) { @@ -563,20 +566,21 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret) /* Copy the DB arrays */ txn->mt_numdbs = env->me_numdbs; - rc = (txn->mt_numdbs % DBX_CHUNK) + 1; - txn->mt_dbxs = malloc(rc * DBX_CHUNK * sizeof(MDB_dbx)); - memcpy(txn->mt_dbxs, env->me_dbxs, txn->mt_numdbs * sizeof(MDB_dbx)); + txn->mt_dbxs = env->me_dbxs; /* mostly static anyway */ + txn->mt_dbs = malloc(env->me_maxdbs * sizeof(MDB_db)); + memcpy(txn->mt_dbs, env->me_meta->mm_dbs, 2 * sizeof(MDB_db)); + if (txn->mt_numdbs > 2) + memcpy(txn->mt_dbs+2, env->me_dbs[env->me_db_toggle]+2, + (txn->mt_numdbs - 2) * sizeof(MDB_db)); if (!rdonly) { - memcpy(&txn->mt_db0, txn->mt_dbxs[0].md_db, sizeof(txn->mt_db0)); - txn->mt_dbxs[0].md_db = &txn->mt_db0; if (toggle) txn->mt_flags |= MDB_TXN_METOGGLE; - txn->mt_next_pgno = env->me_meta.mm_last_pg+1; + txn->mt_next_pgno = env->me_meta->mm_last_pg+1; } DPRINTF("begin transaction %lu on mdbenv %p, root page %lu", - txn->mt_txnid, (void *) env, txn->mt_dbxs[0].md_db->md_root); + txn->mt_txnid, (void *) env, txn->mt_dbs[MAIN_DBI].md_root); *ret = txn; return MDB_SUCCESS; @@ -593,9 +597,9 @@ mdb_txn_abort(MDB_txn *txn) env = txn->mt_env; DPRINTF("abort transaction %lu on mdbenv %p, root page %lu", - txn->mt_txnid, (void *) env, txn->mt_dbxs[0].md_db->md_root); + txn->mt_txnid, (void *) env, txn->mt_dbs[MAIN_DBI].md_root); - free(txn->mt_dbxs); + free(txn->mt_dbs); if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) { txn->mt_u.reader->mr_txnid = 0; @@ -607,7 +611,7 @@ mdb_txn_abort(MDB_txn *txn) while (!STAILQ_EMPTY(txn->mt_u.dirty_queue)) { dp = STAILQ_FIRST(txn->mt_u.dirty_queue); STAILQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next); - if (dp->p.mp_pgno <= env->me_meta.mm_last_pg) + if (dp->p.mp_pgno <= env->me_meta->mm_last_pg) mdb_idl_insert(txn->mt_free_pgs, dp->p.mp_pgno); free(dp); } @@ -639,6 +643,7 @@ int mdb_txn_commit(MDB_txn *txn) { int n, done; + unsigned int i; ssize_t rc; off_t size; MDB_dpage *dp; @@ -673,7 +678,26 @@ mdb_txn_commit(MDB_txn *txn) goto done; DPRINTF("committing transaction %lu on mdbenv %p, root page %lu", - txn->mt_txnid, (void *) env, txn->mt_dbxs[0].md_db->md_root); + txn->mt_txnid, (void *) env, txn->mt_dbs[MAIN_DBI].md_root); + + /* Update DB root pointers. Their pages have already been + * touched so this is all in-place and cannot fail. + */ + { + MDB_val data; + data.mv_size = sizeof(MDB_db); + + for (i = 2; i < env->me_numdbs; i++) { + if (env->me_dbs[env->me_db_toggle][i].md_root != txn->mt_dbs[i].md_root) { + data.mv_data = &txn->mt_dbs[i]; + mdb_put(txn, i, &txn->mt_dbxs[i].md_name, &data, 0); + } + } + for (i = env->me_numdbs; i < txn->mt_numdbs; i++) { + data.mv_data = &txn->mt_dbs[i]; + mdb_put(txn, i, &txn->mt_dbxs[i].md_name, &data, 0); + } + } /* Commit up to MDB_COMMIT_PAGES dirty pages to disk until done. */ @@ -699,11 +723,11 @@ mdb_txn_commit(MDB_txn *txn) n = 0; size = 0; } - lseek(env->me_fd, dp->p.mp_pgno * env->me_meta.mm_psize, SEEK_SET); + lseek(env->me_fd, dp->p.mp_pgno * env->me_psize, SEEK_SET); next = dp->p.mp_pgno; } DPRINTF("committing page %lu", dp->p.mp_pgno); - iov[n].iov_len = env->me_meta.mm_psize * dp->h.md_num; + iov[n].iov_len = env->me_psize * dp->h.md_num; iov[n].iov_base = &dp->p; size += iov[n].iov_len; next = dp->p.mp_pgno + dp->h.md_num; @@ -748,14 +772,22 @@ mdb_txn_commit(MDB_txn *txn) } env->me_txn = NULL; + /* update the DB tables */ { - MDB_dbx *p1 = env->me_dbxs; + int toggle = !env->me_db_toggle; - txn->mt_dbxs[0].md_db = env->me_dbxs[0].md_db; - env->me_dbxs = txn->mt_dbxs; + for (i = 0; i < env->me_numdbs; i++) { + if (env->me_dbs[toggle][i].md_root != txn->mt_dbs[i].md_root) + env->me_dbs[toggle][i] = txn->mt_dbs[i]; + } + for (i = env->me_numdbs; i < txn->mt_numdbs; i++) { + env->me_dbxs[i] = txn->mt_dbxs[i]; + env->me_dbs[toggle][i] = txn->mt_dbs[i]; + } + env->me_db_toggle = toggle; env->me_numdbs = txn->mt_numdbs; - free(p1); + free(txn->mt_dbs); } /* add to tail of free list */ @@ -787,7 +819,7 @@ done: } static int -mdbenv_read_header(MDB_env *env) +mdbenv_read_header(MDB_env *env, MDB_meta *meta) { char page[PAGESIZE]; MDB_page *p; @@ -827,42 +859,43 @@ mdbenv_read_header(MDB_env *env) return EINVAL; } - memcpy(&env->me_meta, m, sizeof(*m)); + memcpy(meta, m, sizeof(*m)); return 0; } static int -mdbenv_init_meta(MDB_env *env) +mdbenv_init_meta(MDB_env *env, MDB_meta *meta) { MDB_page *p, *q; - MDB_meta *meta; + MDB_meta *m; int rc; unsigned int psize; DPRINTF("writing new meta page"); psize = sysconf(_SC_PAGE_SIZE); - env->me_meta.mm_magic = MDB_MAGIC; - env->me_meta.mm_version = MDB_VERSION; - env->me_meta.mm_psize = psize; - env->me_meta.mm_flags = env->me_flags & 0xffff; - env->me_meta.mm_root = P_INVALID; - env->me_meta.mm_last_pg = 1; + meta->mm_magic = MDB_MAGIC; + meta->mm_version = MDB_VERSION; + meta->mm_psize = psize; + meta->mm_last_pg = 1; + meta->mm_flags = env->me_flags & 0xffff; + meta->mm_dbs[0].md_root = P_INVALID; + meta->mm_dbs[1].md_root = P_INVALID; p = calloc(2, psize); p->mp_pgno = 0; p->mp_flags = P_META; - meta = METADATA(p); - memcpy(meta, &env->me_meta, sizeof(*meta)); + m = METADATA(p); + memcpy(m, meta, sizeof(*meta)); q = (MDB_page *)((char *)p + psize); q->mp_pgno = 1; q->mp_flags = P_META; - meta = METADATA(q); - memcpy(meta, &env->me_meta, sizeof(*meta)); + m = METADATA(q); + memcpy(m, meta, sizeof(*meta)); rc = write(env->me_fd, p, psize * 2); free(p); @@ -881,28 +914,22 @@ mdbenv_write_meta(MDB_txn *txn) assert(txn != NULL); assert(txn->mt_env != NULL); - DPRINTF("writing meta page for root page %lu", txn->mt_dbxs[0].md_db->md_root); + DPRINTF("writing meta page for root page %lu", txn->mt_dbs[MAIN_DBI].md_root); env = txn->mt_env; ptr = (char *)&meta; - off = offsetof(MDB_meta, mm_last_pg); + off = offsetof(MDB_meta, mm_dbs[0].md_depth); len = sizeof(MDB_meta) - off; ptr += off; + meta.mm_dbs[0] = txn->mt_dbs[0]; + meta.mm_dbs[1] = txn->mt_dbs[1]; meta.mm_last_pg = txn->mt_next_pgno - 1; meta.mm_txnid = txn->mt_txnid; - meta.mm_psize = env->me_meta.mm_psize; - meta.mm_flags = env->me_meta.mm_flags; - meta.mm_depth = txn->mt_dbxs[0].md_db->md_depth; - meta.mm_branch_pages = txn->mt_dbxs[0].md_db->md_branch_pages; - meta.mm_leaf_pages = txn->mt_dbxs[0].md_db->md_leaf_pages; - meta.mm_overflow_pages = txn->mt_dbxs[0].md_db->md_overflow_pages; - meta.mm_entries = txn->mt_dbxs[0].md_db->md_entries; - meta.mm_root = txn->mt_dbxs[0].md_db->md_root; if (!F_ISSET(txn->mt_flags, MDB_TXN_METOGGLE)) - off += env->me_meta.mm_psize; + off += env->me_psize; off += PAGEHDRSZ; lseek(env->me_fd, off, SEEK_SET); @@ -915,49 +942,20 @@ mdbenv_write_meta(MDB_txn *txn) return MDB_SUCCESS; } -/* Returns true if page p is a valid meta page, false otherwise. - */ -static int -mdb_check_meta_page(MDB_page *p) -{ - if (!F_ISSET(p->mp_flags, P_META)) { - DPRINTF("page %lu not a meta page", p->mp_pgno); - return EINVAL; - } - - return 0; -} - static int mdbenv_read_meta(MDB_env *env, int *which) { - MDB_page *mp0, *mp1; - MDB_meta *meta[2]; - int toggle = 0, rc; + int toggle = 0; assert(env != NULL); - if ((mp0 = mdbenv_get_page(env, 0)) == NULL || - (mp1 = mdbenv_get_page(env, 1)) == NULL) - return EIO; - - rc = mdb_check_meta_page(mp0); - if (rc) return rc; - - rc = mdb_check_meta_page(mp1); - if (rc) return rc; - - meta[0] = METADATA(mp0); - meta[1] = METADATA(mp1); - - if (meta[0]->mm_txnid < meta[1]->mm_txnid) + if (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid) toggle = 1; - if (meta[toggle]->mm_txnid > env->me_meta.mm_txnid) { - memcpy(&env->me_meta, meta[toggle], sizeof(env->me_meta)); - if (which) - *which = toggle; - } + if (env->me_meta != env->me_metas[toggle]) + env->me_meta = env->me_metas[toggle]; + if (which) + *which = toggle; DPRINTF("Using meta page %d", toggle); @@ -972,8 +970,8 @@ mdbenv_create(MDB_env **env) e = calloc(1, sizeof(*e)); if (!e) return ENOMEM; - e->me_meta.mm_mapsize = DEFAULT_MAPSIZE; e->me_maxreaders = DEFAULT_READERS; + e->me_maxdbs = 2; e->me_fd = -1; e->me_lfd = -1; *env = e; @@ -985,7 +983,16 @@ mdbenv_set_mapsize(MDB_env *env, size_t size) { if (env->me_map) return EINVAL; - env->me_mapsize = env->me_meta.mm_mapsize = size; + env->me_mapsize = size; + return MDB_SUCCESS; +} + +int +mdbenv_set_maxdbs(MDB_env *env, int dbs) +{ + if (env->me_map) + return EINVAL; + env->me_maxdbs = dbs; return MDB_SUCCESS; } @@ -1009,49 +1016,59 @@ int mdbenv_open2(MDB_env *env, unsigned int flags) { int i, newenv = 0; + MDB_meta meta; + MDB_page *p; env->me_flags = flags; - if ((i = mdbenv_read_header(env)) != 0) { + memset(&meta, 0, sizeof(meta)); + + if ((i = mdbenv_read_header(env, &meta)) != 0) { if (i != ENOENT) return i; DPRINTF("new mdbenv"); newenv = 1; } - if (!env->me_mapsize) - env->me_mapsize = env->me_meta.mm_mapsize; + if (!env->me_mapsize) { + env->me_mapsize = newenv ? DEFAULT_MAPSIZE : meta.mm_mapsize; + } i = MAP_SHARED; - if (env->me_meta.mm_address && (flags & MDB_FIXEDMAP)) + if (meta.mm_address && (flags & MDB_FIXEDMAP)) i |= MAP_FIXED; - env->me_map = mmap(env->me_meta.mm_address, env->me_mapsize, PROT_READ, i, + env->me_map = mmap(meta.mm_address, env->me_mapsize, PROT_READ, i, env->me_fd, 0); if (env->me_map == MAP_FAILED) return errno; if (newenv) { - env->me_meta.mm_mapsize = env->me_mapsize; + meta.mm_mapsize = env->me_mapsize; if (flags & MDB_FIXEDMAP) - env->me_meta.mm_address = env->me_map; - i = mdbenv_init_meta(env); + meta.mm_address = env->me_map; + i = mdbenv_init_meta(env, &meta); if (i != MDB_SUCCESS) { munmap(env->me_map, env->me_mapsize); return i; } } + env->me_psize = meta.mm_psize; + + p = (MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)(MDB_page *)env->me_map; + env->me_metas[0] = METADATA(p); + env->me_metas[1] = (MDB_meta *)((char *)env->me_metas[0] + meta.mm_psize); if ((i = mdbenv_read_meta(env, NULL)) != 0) return i; DPRINTF("opened database version %u, pagesize %u", - env->me_meta.mm_version, env->me_meta.mm_psize); - DPRINTF("depth: %u", env->me_meta.mm_depth); - DPRINTF("entries: %lu", env->me_meta.mm_entries); - DPRINTF("branch pages: %lu", env->me_meta.mm_branch_pages); - DPRINTF("leaf pages: %lu", env->me_meta.mm_leaf_pages); - DPRINTF("overflow pages: %lu", env->me_meta.mm_overflow_pages); - DPRINTF("root: %lu", env->me_meta.mm_root); + env->me_meta->mm_version, env->me_psize); + DPRINTF("depth: %u", env->me_meta->mm_dbs[MAIN_DBI].md_depth); + DPRINTF("entries: %lu", env->me_meta->mm_dbs[MAIN_DBI].md_entries); + DPRINTF("branch pages: %lu", env->me_meta->mm_dbs[MAIN_DBI].md_branch_pages); + DPRINTF("leaf pages: %lu", env->me_meta->mm_dbs[MAIN_DBI].md_leaf_pages); + DPRINTF("overflow pages: %lu", env->me_meta->mm_dbs[MAIN_DBI].md_overflow_pages); + DPRINTF("root: %lu", env->me_meta->mm_dbs[MAIN_DBI].md_root); return MDB_SUCCESS; } @@ -1066,12 +1083,13 @@ mdbenv_reader_dest(void *ptr) reader->mr_tid = 0; } +/* downgrade the exclusive lock on the region back to shared */ static void mdbenv_share_locks(MDB_env *env) { struct flock lock_info; - env->me_txns->mt_txnid = env->me_meta.mm_txnid; + env->me_txns->mt_txnid = env->me_meta->mm_txnid; memset((void *)&lock_info, 0, sizeof(lock_info)); lock_info.l_type = F_RDLCK; @@ -1201,12 +1219,12 @@ mdbenv_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode) pthread_key_create(&env->me_txkey, mdbenv_reader_dest); if (excl) mdbenv_share_locks(env); - env->me_dbxs = calloc(DBX_CHUNK, sizeof(MDB_dbx)); - env->me_numdbs = 1; - env->me_dbxs[0].md_db = (MDB_db *)&env->me_meta.mm_psize; + env->me_dbxs = calloc(env->me_maxdbs, sizeof(MDB_dbx)); + env->me_dbs[0] = calloc(env->me_maxdbs, sizeof(MDB_db)); + env->me_dbs[1] = calloc(env->me_maxdbs, sizeof(MDB_db)); + env->me_numdbs = 2; } - leave: free(lpath); return rc; @@ -1348,7 +1366,7 @@ mdbenv_get_page(MDB_env *env, pgno_t pgno) } } if (!found) { - p = (MDB_page *)(env->me_map + env->me_meta.mm_psize * pgno); + p = (MDB_page *)(env->me_map + env->me_psize * pgno); } return p; } @@ -1450,7 +1468,7 @@ mdb_search_page(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, DPRINTF("transaction has failed, must abort"); return EINVAL; } else - root = txn->mt_dbxs[dbi].md_db->md_root; + root = txn->mt_dbs[dbi].md_root; if (root == P_INVALID) { /* Tree is empty. */ DPRINTF("tree is empty"); @@ -1464,7 +1482,8 @@ mdb_search_page(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, if (modify) { /* For sub-databases, update main root first */ - if (dbi && !F_ISSET(txn->mt_dbxs[dbi].md_page->mp_flags, P_DIRTY)) { + if (dbi > MAIN_DBI && txn->mt_dbs[dbi].md_root == + txn->mt_env->me_dbs[txn->mt_env->me_db_toggle][dbi].md_root) { MDB_pageparent mp2; rc = mdb_search_page(txn, 0, &txn->mt_dbxs[dbi].md_name, NULL, 1, &mp2); @@ -1476,7 +1495,7 @@ mdb_search_page(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, mpp->mp_pi = 0; if ((rc = mdb_touch(txn, mpp))) return rc; - txn->mt_dbxs[dbi].md_db->md_root = mpp->mp_page->mp_pgno; + txn->mt_dbs[dbi].md_root = mpp->mp_page->mp_pgno; } } @@ -1794,17 +1813,17 @@ mdb_new_page(MDB_txn *txn, MDB_dbi dbi, uint32_t flags, int num) if ((dp = mdb_alloc_page(txn, NULL, 0, num)) == NULL) return NULL; DPRINTF("allocated new mpage %lu, page size %u", - dp->p.mp_pgno, txn->mt_env->me_meta.mm_psize); + dp->p.mp_pgno, txn->mt_env->me_psize); dp->p.mp_flags = flags | P_DIRTY; dp->p.mp_lower = PAGEHDRSZ; - dp->p.mp_upper = txn->mt_env->me_meta.mm_psize; + dp->p.mp_upper = txn->mt_env->me_psize; if (IS_BRANCH(&dp->p)) - txn->mt_dbxs[dbi].md_db->md_branch_pages++; + txn->mt_dbs[dbi].md_branch_pages++; else if (IS_LEAF(&dp->p)) - txn->mt_dbxs[dbi].md_db->md_leaf_pages++; + txn->mt_dbs[dbi].md_leaf_pages++; else if (IS_OVERFLOW(&dp->p)) { - txn->mt_dbxs[dbi].md_db->md_overflow_pages += num; + txn->mt_dbs[dbi].md_overflow_pages += num; dp->p.mp_pages = num; } @@ -1817,7 +1836,7 @@ mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data) size_t sz; sz = LEAFSIZE(key, data); - if (data->mv_size >= env->me_meta.mm_psize / MDB_MINKEYS) { + if (data->mv_size >= env->me_psize / MDB_MINKEYS) { /* put on overflow page */ sz -= data->mv_size - sizeof(pgno_t); } @@ -1831,7 +1850,7 @@ mdb_branch_size(MDB_env *env, MDB_val *key) size_t sz; sz = INDXSIZE(key); - if (sz >= env->me_meta.mm_psize / MDB_MINKEYS) { + if (sz >= env->me_psize / MDB_MINKEYS) { /* put on overflow page */ /* not implemented */ /* sz -= key->size - sizeof(pgno_t); */ @@ -1865,8 +1884,8 @@ mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, indx_t indx, if (F_ISSET(flags, F_BIGDATA)) { /* Data already on overflow page. */ node_size += sizeof(pgno_t); - } else if (data->mv_size >= txn->mt_env->me_meta.mm_psize / MDB_MINKEYS) { - int ovpages = OVPAGES(data->mv_size, txn->mt_env->me_meta.mm_psize); + } else if (data->mv_size >= txn->mt_env->me_psize / MDB_MINKEYS) { + int ovpages = OVPAGES(data->mv_size, txn->mt_env->me_psize); /* Put data on overflow page. */ DPRINTF("data size is %zu, put on overflow page", data->mv_size); @@ -2165,9 +2184,9 @@ mdb_merge(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *src, MDB_pageparent *dst) } if (IS_LEAF(src->mp_page)) - txn->mt_dbxs[dbi].md_db->md_leaf_pages--; + txn->mt_dbs[dbi].md_leaf_pages--; else - txn->mt_dbxs[dbi].md_db->md_branch_pages--; + txn->mt_dbs[dbi].md_branch_pages--; mpp.mp_page = src->mp_parent; dh = (MDB_dhead *)src->mp_parent; @@ -2204,16 +2223,16 @@ mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp) if (mpp->mp_parent == NULL) { if (NUMKEYS(mpp->mp_page) == 0) { DPRINTF("tree is completely empty"); - txn->mt_dbxs[dbi].md_db->md_root = P_INVALID; - txn->mt_dbxs[dbi].md_db->md_depth--; - txn->mt_dbxs[dbi].md_db->md_leaf_pages--; + txn->mt_dbs[dbi].md_root = P_INVALID; + txn->mt_dbs[dbi].md_depth--; + txn->mt_dbs[dbi].md_leaf_pages--; } else if (IS_BRANCH(mpp->mp_page) && NUMKEYS(mpp->mp_page) == 1) { DPRINTF("collapsing root page!"); - txn->mt_dbxs[dbi].md_db->md_root = NODEPGNO(NODEPTR(mpp->mp_page, 0)); - if ((root = mdbenv_get_page(txn->mt_env, txn->mt_dbxs[dbi].md_db->md_root)) == NULL) + txn->mt_dbs[dbi].md_root = NODEPGNO(NODEPTR(mpp->mp_page, 0)); + if ((root = mdbenv_get_page(txn->mt_env, txn->mt_dbs[dbi].md_root)) == NULL) return MDB_FAIL; - txn->mt_dbxs[dbi].md_db->md_depth--; - txn->mt_dbxs[dbi].md_db->md_branch_pages--; + txn->mt_dbs[dbi].md_depth--; + txn->mt_dbs[dbi].md_branch_pages--; } else DPRINTF("root page doesn't need rebalancing"); return MDB_SUCCESS; @@ -2314,13 +2333,13 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, pgno_t pg; memcpy(&pg, NODEDATA(leaf), sizeof(pg)); - ovpages = OVPAGES(NODEDSZ(leaf), txn->mt_env->me_meta.mm_psize); + ovpages = OVPAGES(NODEDSZ(leaf), txn->mt_env->me_psize); for (i=0; imt_free_pgs, pg); pg++; } } - txn->mt_dbxs[dbi].md_db->md_entries--; + txn->mt_dbs[dbi].md_entries--; rc = mdb_rebalance(txn, dbi, &mpp); if (rc != MDB_SUCCESS) txn->mt_flags |= MDB_TXN_ERROR; @@ -2363,9 +2382,9 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp, return MDB_FAIL; mdp->h.md_pi = 0; mdp->h.md_parent = &pdp->p; - txn->mt_dbxs[dbi].md_db->md_root = pdp->p.mp_pgno; + txn->mt_dbs[dbi].md_root = pdp->p.mp_pgno; DPRINTF("root split! new root = %lu", pdp->p.mp_pgno); - txn->mt_dbxs[dbi].md_db->md_depth++; + txn->mt_dbs[dbi].md_depth++; /* Add left (implicit) pointer. */ if (mdb_add_node(txn, dbi, &pdp->p, 0, NULL, NULL, @@ -2383,12 +2402,12 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp, DPRINTF("new right sibling: page %lu", rdp->p.mp_pgno); /* Move half of the keys to the right sibling. */ - if ((copy = malloc(txn->mt_env->me_meta.mm_psize)) == NULL) + if ((copy = malloc(txn->mt_env->me_psize)) == NULL) return MDB_FAIL; - memcpy(copy, &mdp->p, txn->mt_env->me_meta.mm_psize); - memset(&mdp->p.mp_ptrs, 0, txn->mt_env->me_meta.mm_psize - PAGEHDRSZ); + memcpy(copy, &mdp->p, txn->mt_env->me_psize); + memset(&mdp->p.mp_ptrs, 0, txn->mt_env->me_psize - PAGEHDRSZ); mdp->p.mp_lower = PAGEHDRSZ; - mdp->p.mp_upper = txn->mt_env->me_meta.mm_psize; + mdp->p.mp_upper = txn->mt_env->me_psize; split_indx = NUMKEYS(copy) / 2 + 1; @@ -2524,6 +2543,11 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi, (int)key->mv_size, (char *)key->mv_data); return EEXIST; } + /* same size, just replace it */ + if (NODEDSZ(leaf) == data->mv_size) { + memcpy(NODEDATA(leaf), data->mv_data, data->mv_size); + goto done; + } mdb_del_node(mpp.mp_page, ki); } if (leaf == NULL) { /* append if not found */ @@ -2538,8 +2562,8 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi, return ENOMEM; } mpp.mp_page = &dp->p; - txn->mt_dbxs[dbi].md_db->md_root = mpp.mp_page->mp_pgno; - txn->mt_dbxs[dbi].md_db->md_depth++; + txn->mt_dbs[dbi].md_root = mpp.mp_page->mp_pgno; + txn->mt_dbs[dbi].md_depth++; ki = 0; } else @@ -2559,7 +2583,7 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi, if (rc != MDB_SUCCESS) txn->mt_flags |= MDB_TXN_ERROR; else - txn->mt_dbxs[dbi].md_db->md_entries++; + txn->mt_dbs[dbi].md_entries++; done: return rc; @@ -2591,12 +2615,12 @@ mdbenv_stat(MDB_env *env, MDB_stat *arg) if (env == NULL || arg == NULL) return EINVAL; - arg->ms_psize = env->me_meta.mm_psize; - arg->ms_depth = env->me_meta.mm_depth; - arg->ms_branch_pages = env->me_meta.mm_branch_pages; - arg->ms_leaf_pages = env->me_meta.mm_leaf_pages; - arg->ms_overflow_pages = env->me_meta.mm_overflow_pages; - arg->ms_entries = env->me_meta.mm_entries; + arg->ms_psize = env->me_psize; + arg->ms_depth = env->me_meta->mm_dbs[MAIN_DBI].md_depth; + arg->ms_branch_pages = env->me_meta->mm_dbs[MAIN_DBI].md_branch_pages; + arg->ms_leaf_pages = env->me_meta->mm_dbs[MAIN_DBI].md_leaf_pages; + arg->ms_overflow_pages = env->me_meta->mm_dbs[MAIN_DBI].md_overflow_pages; + arg->ms_entries = env->me_meta->mm_dbs[MAIN_DBI].md_entries; return MDB_SUCCESS; } @@ -2610,24 +2634,27 @@ int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi) /* main DB? */ if (!name) { - *dbi = 0; + *dbi = MAIN_DBI; return MDB_SUCCESS; } /* Is the DB already open? */ len = strlen(name); - for (i=0; imt_numdbs; i++) { + for (i=2; imt_numdbs; i++) { if (len == txn->mt_dbxs[i].md_name.mv_size && - !strcmp(name, txn->mt_dbxs[i].md_name.mv_data)) { + !strncmp(name, txn->mt_dbxs[i].md_name.mv_data, len)) { *dbi = i; return MDB_SUCCESS; } } + if (txn->mt_numdbs >= txn->mt_env->me_maxdbs - 1) + return ENFILE; + /* Find the DB info */ key.mv_size = len; key.mv_data = (void *)name; - rc = mdb_get(txn, 0, &key, &data); + rc = mdb_get(txn, MAIN_DBI, &key, &data); /* Create if requested */ if (rc == ENOENT && (flags & MDB_CREATE)) { @@ -2638,28 +2665,16 @@ int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi) dummy.md_root = P_INVALID; dummy.md_flags = flags & 0xffff; rc = mdb_put(txn, 0, &key, &data, 0); - if (rc == MDB_SUCCESS) - rc = mdb_get(txn, 0, &key, &data); } /* OK, got info, add to table */ if (rc == MDB_SUCCESS) { - /* Is there a free slot? */ - if ((txn->mt_numdbs & (DBX_CHUNK-1)) == 0) { - MDB_dbx *p1; - int i; - i = txn->mt_numdbs + DBX_CHUNK; - p1 = realloc(txn->mt_dbxs, i * sizeof(MDB_dbx)); - if (p1 == NULL) - return ENOMEM; - txn->mt_dbxs = p1; - } txn->mt_dbxs[txn->mt_numdbs].md_name.mv_data = strdup(name); txn->mt_dbxs[txn->mt_numdbs].md_name.mv_size = len; txn->mt_dbxs[txn->mt_numdbs].md_cmp = NULL; txn->mt_dbxs[txn->mt_numdbs].md_dcmp = NULL; txn->mt_dbxs[txn->mt_numdbs].md_rel = NULL; - txn->mt_dbxs[txn->mt_numdbs].md_db = data.mv_data; + memcpy(&txn->mt_dbs[txn->mt_numdbs], data.mv_data, sizeof(MDB_db)); *dbi = txn->mt_numdbs; txn->mt_numdbs++; } @@ -2672,21 +2687,23 @@ int mdb_stat(MDB_txn *txn, MDB_dbi dbi, MDB_stat *arg) if (txn == NULL || arg == NULL) return EINVAL; - arg->ms_psize = txn->mt_env->me_meta.mm_psize; - arg->ms_depth = txn->mt_dbxs[dbi].md_db->md_depth; - arg->ms_branch_pages = txn->mt_dbxs[dbi].md_db->md_branch_pages; - arg->ms_leaf_pages = txn->mt_dbxs[dbi].md_db->md_leaf_pages; - arg->ms_overflow_pages = txn->mt_dbxs[dbi].md_db->md_overflow_pages; - arg->ms_entries = txn->mt_dbxs[dbi].md_db->md_entries; + arg->ms_psize = txn->mt_env->me_psize; + arg->ms_depth = txn->mt_dbs[dbi].md_depth; + arg->ms_branch_pages = txn->mt_dbs[dbi].md_branch_pages; + arg->ms_leaf_pages = txn->mt_dbs[dbi].md_leaf_pages; + arg->ms_overflow_pages = txn->mt_dbs[dbi].md_overflow_pages; + arg->ms_entries = txn->mt_dbs[dbi].md_entries; return MDB_SUCCESS; } void mdb_close(MDB_txn *txn, MDB_dbi dbi) { - if (dbi >= txn->mt_numdbs) + char *ptr; + if (dbi <= MAIN_DBI || dbi >= txn->mt_numdbs) return; - free(txn->mt_dbxs[dbi].md_name.mv_data); + ptr = txn->mt_dbxs[dbi].md_name.mv_data; txn->mt_dbxs[dbi].md_name.mv_data = NULL; txn->mt_dbxs[dbi].md_name.mv_size = 0; + free(ptr); } diff --git a/libraries/libmdb/mdb.h b/libraries/libmdb/mdb.h index 613311a..2a303fa 100644 --- a/libraries/libmdb/mdb.h +++ b/libraries/libmdb/mdb.h @@ -39,12 +39,14 @@ typedef enum MDB_cursor_op { /* cursor operations */ /* DB flags */ #define MDB_REVERSEKEY 0x02 /* use reverse string keys */ #define MDB_DUPSORT 0x04 /* use sorted duplicates */ -#define MDB_NOSYNC 0x10000 /* don't fsync after commit */ -#define MDB_RDONLY 0x20000 /* read only */ -#define MDB_CREATE 0x40000 /* create if not present */ /* environment flags */ #define MDB_FIXEDMAP 0x01 /* mmap at a fixed address */ +#define MDB_NOSYNC 0x10000 /* don't fsync after commit */ +#define MDB_RDONLY 0x20000 /* read only */ + +/* DB or env flags */ +#define MDB_CREATE 0x40000 /* create if not present */ typedef struct MDB_stat { unsigned int ms_psize; @@ -64,6 +66,7 @@ int mdbenv_get_path(MDB_env *env, const char **path); int mdbenv_set_mapsize(MDB_env *env, size_t size); int mdbenv_set_maxreaders(MDB_env *env, int readers); int mdbenv_get_maxreaders(MDB_env *env, int *readers); +int mdbenv_set_maxdbs(MDB_env *env, int dbs); int mdbenv_sync(MDB_env *env); int mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **txn);