Fix subDB/ovpage copying

incre
Howard Chu 11 years ago
parent af3c340758
commit 8836b78e94
  1. 47
      libraries/liblmdb/mdb.c

@ -8018,7 +8018,6 @@ typedef struct mdb_copy {
pthread_mutex_t mc_mutex[2]; pthread_mutex_t mc_mutex[2];
char *mc_wbuf[2]; char *mc_wbuf[2];
char *mc_over[2]; char *mc_over[2];
void *mc_obuf[2];
void *mc_free; void *mc_free;
MDB_env *mc_env; MDB_env *mc_env;
MDB_txn *mc_txn; MDB_txn *mc_txn;
@ -8043,8 +8042,10 @@ mdb_env_copythr(void *arg)
#define DO_WRITE(rc, fd, ptr, w2, len) len = write(fd, ptr, w2); rc = (len >= 0) #define DO_WRITE(rc, fd, ptr, w2, len) len = write(fd, ptr, w2); rc = (len >= 0)
#endif #endif
pthread_mutex_lock(&my->mc_mutex[toggle^1]);
for(;;) { for(;;) {
pthread_mutex_lock(&my->mc_mutex[toggle]); pthread_mutex_lock(&my->mc_mutex[toggle]);
pthread_mutex_unlock(&my->mc_mutex[toggle^1]);
if (!my->mc_wlen[toggle]) { if (!my->mc_wlen[toggle]) {
pthread_mutex_unlock(&my->mc_mutex[toggle]); pthread_mutex_unlock(&my->mc_mutex[toggle]);
break; break;
@ -8067,6 +8068,7 @@ again:
break; break;
} }
} }
my->mc_wlen[toggle] = wsize;
if (rc) { if (rc) {
my->mc_status = rc; my->mc_status = rc;
pthread_mutex_unlock(&my->mc_mutex[toggle]); pthread_mutex_unlock(&my->mc_mutex[toggle]);
@ -8079,7 +8081,6 @@ again:
my->mc_olen[toggle] = 0; my->mc_olen[toggle] = 0;
goto again; goto again;
} }
pthread_mutex_unlock(&my->mc_mutex[toggle]);
toggle ^= 1; toggle ^= 1;
} }
return NULL; return NULL;
@ -8104,12 +8105,12 @@ mdb_env_cthr_toggle(mdb_copy *my)
} }
static int static int
mdb_env_cwalk(mdb_copy *my, pgno_t pg) mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags)
{ {
MDB_cursor mc; MDB_cursor mc;
MDB_txn *txn = my->mc_txn; MDB_txn *txn = my->mc_txn;
MDB_node *ni; MDB_node *ni;
MDB_page *mo, *mp; MDB_page *mo, *mp, *leaf;
char *buf, *ptr; char *buf, *ptr;
int rc, toggle; int rc, toggle;
unsigned int i; unsigned int i;
@ -8118,7 +8119,7 @@ mdb_env_cwalk(mdb_copy *my, pgno_t pg)
mc.mc_top = 0; mc.mc_top = 0;
mc.mc_txn = txn; mc.mc_txn = txn;
rc = mdb_page_get(my->mc_txn, pg, &mc.mc_pg[0], NULL); rc = mdb_page_get(my->mc_txn, *pg, &mc.mc_pg[0], NULL);
if (rc) if (rc)
return rc; return rc;
rc = mdb_page_search_root(&mc, NULL, MDB_PS_FIRST); rc = mdb_page_search_root(&mc, NULL, MDB_PS_FIRST);
@ -8126,7 +8127,7 @@ mdb_env_cwalk(mdb_copy *my, pgno_t pg)
return rc; return rc;
/* Make cursor pages writable */ /* Make cursor pages writable */
buf = ptr = malloc(my->mc_env->me_psize * mc.mc_top); buf = ptr = malloc(my->mc_env->me_psize * mc.mc_snum);
if (buf == NULL) if (buf == NULL)
return ENOMEM; return ENOMEM;
@ -8136,6 +8137,9 @@ mdb_env_cwalk(mdb_copy *my, pgno_t pg)
ptr += my->mc_env->me_psize; ptr += my->mc_env->me_psize;
} }
/* This is writable space for a leaf page. Usually not needed. */
leaf = (MDB_page *)ptr;
toggle = my->mc_toggle; toggle = my->mc_toggle;
while (mc.mc_snum > 0) { while (mc.mc_snum > 0) {
unsigned n; unsigned n;
@ -8143,12 +8147,21 @@ mdb_env_cwalk(mdb_copy *my, pgno_t pg)
n = NUMKEYS(mp); n = NUMKEYS(mp);
if (IS_LEAF(mp)) { if (IS_LEAF(mp)) {
if (!IS_LEAF2(mp)) { if (!IS_LEAF2(mp) && !(flags & F_DUPDATA)) {
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
ni = NODEPTR(mp, i); ni = NODEPTR(mp, i);
if (ni->mn_flags & F_BIGDATA) { if (ni->mn_flags & F_BIGDATA) {
MDB_page *omp; MDB_page *omp;
pgno_t pg; pgno_t pg;
/* Need writable leaf */
if (mp != leaf) {
mc.mc_pg[mc.mc_top] = leaf;
mdb_page_copy(leaf, mp, my->mc_env->me_psize);
mp = leaf;
ni = NODEPTR(mp, i);
}
memcpy(&pg, NODEDATA(ni), sizeof(pg)); memcpy(&pg, NODEDATA(ni), sizeof(pg));
rc = mdb_page_get(txn, pg, &omp, NULL); rc = mdb_page_get(txn, pg, &omp, NULL);
if (rc) if (rc)
@ -8164,20 +8177,33 @@ mdb_env_cwalk(mdb_copy *my, pgno_t pg)
mo->mp_pgno = my->mc_next_pgno; mo->mp_pgno = my->mc_next_pgno;
my->mc_next_pgno += omp->mp_pages; my->mc_next_pgno += omp->mp_pages;
my->mc_wlen[toggle] += my->mc_env->me_psize; my->mc_wlen[toggle] += my->mc_env->me_psize;
if (omp->mp_pages > 1) {
my->mc_olen[toggle] = my->mc_env->me_psize * (omp->mp_pages - 1); my->mc_olen[toggle] = my->mc_env->me_psize * (omp->mp_pages - 1);
my->mc_obuf[toggle] = (char *)omp + my->mc_env->me_psize; my->mc_over[toggle] = (char *)omp + my->mc_env->me_psize;
rc = mdb_env_cthr_toggle(my); rc = mdb_env_cthr_toggle(my);
if (rc) if (rc)
goto done; goto done;
toggle ^= 1; toggle ^= 1;
}
memcpy(NODEDATA(ni), &mo->mp_pgno, sizeof(pgno_t));
} else if (ni->mn_flags & F_SUBDATA) { } else if (ni->mn_flags & F_SUBDATA) {
MDB_db db; MDB_db db;
/* Need writable leaf */
if (mp != leaf) {
mc.mc_pg[mc.mc_top] = leaf;
mdb_page_copy(leaf, mp, my->mc_env->me_psize);
mp = leaf;
ni = NODEPTR(mp, i);
}
memcpy(&db, NODEDATA(ni), sizeof(db)); memcpy(&db, NODEDATA(ni), sizeof(db));
my->mc_toggle = toggle; my->mc_toggle = toggle;
rc = mdb_env_cwalk(my, db.md_root); rc = mdb_env_cwalk(my, &db.md_root, ni->mn_flags & F_DUPDATA);
if (rc) if (rc)
goto done; goto done;
toggle = my->mc_toggle; toggle = my->mc_toggle;
memcpy(NODEDATA(ni), &db, sizeof(db));
} }
} }
} }
@ -8218,6 +8244,7 @@ again:
my->mc_wlen[toggle] += my->mc_env->me_psize; my->mc_wlen[toggle] += my->mc_env->me_psize;
mdb_cursor_pop(&mc); mdb_cursor_pop(&mc);
} }
*pg = mo->mp_pgno;
done: done:
free(buf); free(buf);
return rc; return rc;
@ -8309,7 +8336,7 @@ mdb_env_copyfd2(MDB_env *env, HANDLE fd)
my.mc_wlen[0] = env->me_psize * 2; my.mc_wlen[0] = env->me_psize * 2;
my.mc_txn = txn; my.mc_txn = txn;
pthread_create(&thr, NULL, mdb_env_copythr, &my); pthread_create(&thr, NULL, mdb_env_copythr, &my);
rc = mdb_env_cwalk(&my, txn->mt_dbs[1].md_root); rc = mdb_env_cwalk(&my, &txn->mt_dbs[1].md_root, 0);
if (rc == MDB_SUCCESS && my.mc_wlen[my.mc_toggle]) if (rc == MDB_SUCCESS && my.mc_wlen[my.mc_toggle])
rc = mdb_env_cthr_toggle(&my); rc = mdb_env_cthr_toggle(&my);
my.mc_wlen[my.mc_toggle] = 0; my.mc_wlen[my.mc_toggle] = 0;

Loading…
Cancel
Save