Add incremental dump/load

incre
Howard Chu 11 years ago
parent 27d69186c1
commit a7fc32c0b3
  1. 49
      libraries/liblmdb/lmdb.h
  2. 241
      libraries/liblmdb/mdb.c
  3. 8
      libraries/liblmdb/mdb_dump.1
  4. 34
      libraries/liblmdb/mdb_dump.c
  5. 5
      libraries/liblmdb/mdb_load.1
  6. 40
      libraries/liblmdb/mdb_load.c

@ -674,6 +674,55 @@ int mdb_env_copy2(MDB_env *env, const char *path, unsigned int flags);
*/
int mdb_env_copyfd2(MDB_env *env, mdb_filehandle_t fd, unsigned int flags);
/** @brief Perform an incremental dump of an LMDB environment to the
* specified file descriptor.
*
* This function may be used to make an incremental backup of an
* existing environment. Only pages whose transaction ID is greater
* than \b txnid are written. The output is a raw sequence of database
* pages, not a complete environment.
* @note This call can trigger significant file size growth if run in
* parallel with write transactions, because it employs a read-only
* transaction. See long-lived transactions under @ref caveats_sec.
* @param[in] env An environment handle returned by #mdb_env_create(). It
* must have already been opened successfully.
* @param[in] fd The file descriptor to write the dump to. It must
* have already been opened for Write access.
* @param[in] txnid The transaction ID of a previous backup. It must
* be greater than zero.
* @return A non-zero error value on failure and 0 on success.
*/
int mdb_env_incr_dumpfd(MDB_env *env, mdb_filehandle_t fd, size_t txnid);
/** @brief Perform an incremental dump of an LMDB environment to the
* specified file.
*
* This function may be used to make an incremental backup of an
* existing environment. Only pages whose transaction ID is greater
* than \b txnid are written. The output is a raw sequence of database
* pages, not a complete environment.
* @note This call can trigger significant file size growth if run in
* parallel with write transactions, because it employs a read-only
* transaction. See long-lived transactions under @ref caveats_sec.
* @param[in] env An environment handle returned by #mdb_env_create(). It
* must have already been opened successfully.
* @param[in] path The name of the file to write the dump to. It is
* always treated as a plain file name, never as an environment
* directory, and it must not already exist.
* @param[in] txnid The transaction ID of a previous backup. It must
* be greater than zero.
* @return A non-zero error value on failure and 0 on success.
*/
int mdb_env_incr_dump(MDB_env *env, const char *path, size_t txnid);
/** @brief Reload an incremental dump of an LMDB environment from the
* specified file descriptor.
*
* This function may be used to load an incremental backup of an
* existing environment. Each page read from the dump is copied
* directly into the environment's memory map at the position given
* by the page number stored in the page header.
* @note No other tasks may access the environment while this runs.
* @param[in] env An environment handle returned by #mdb_env_create(). It
* must have already been opened successfully with #MDB_WRITEMAP.
* @param[in] fd The file descriptor to read the backup from.
* @return A non-zero error value on failure and 0 on success. EINVAL is
* returned if the environment was not opened with #MDB_WRITEMAP.
*/
int mdb_env_incr_loadfd(MDB_env *env, mdb_filehandle_t fd);
/** @brief Return statistics about the LMDB environment.
*
* @param[in] env An environment handle returned by #mdb_env_create()

@ -215,11 +215,13 @@
#define MDB_PROCESS_QUERY_LIMITED_INFORMATION 0x1000
#endif
#define Z "I"
#define ALIGNED_FREE(x) _aligned_free(x)
#else
#define THREAD_RET void *
#define THREAD_CREATE(thr,start,arg) pthread_create(&thr,NULL,start,arg)
#define THREAD_FINISH(thr) pthread_join(thr,NULL)
#define Z "z" /**< printf format modifier for size_t */
#define ALIGNED_FREE(x) free(x)
/** For MDB_LOCK_FORMAT: True if readers take a pid lock in the lockfile */
#define MDB_PIDLOCK 1
@ -8072,6 +8074,12 @@ typedef struct mdb_copy {
} mdb_copy;
/** Write \b w2 bytes starting at \b ptr to \b fd.
 * On return \b rc is non-zero if the OS call succeeded and zero if it
 * failed, and \b len holds the byte count reported by the OS call.
 * The body is wrapped in do/while(0) so that the macro always behaves
 * as a single statement, even as the body of an unbraced if/else.
 */
#ifdef _WIN32
#define DO_WRITE(rc, fd, ptr, w2, len)	do { (rc) = WriteFile(fd, ptr, w2, &(len), NULL); } while (0)
#else
#define DO_WRITE(rc, fd, ptr, w2, len)	do { (len) = write(fd, ptr, w2); (rc) = ((len) >= 0); } while (0)
#endif
/** Dedicated writer thread for compacting copy. */
static THREAD_RET ESECT
mdb_env_copythr(void *arg)
@ -8081,10 +8089,8 @@ mdb_env_copythr(void *arg)
int toggle = 0, wsize, rc;
#ifdef _WIN32
DWORD len;
#define DO_WRITE(rc, fd, ptr, w2, len) rc = WriteFile(fd, ptr, w2, &len, NULL)
#else
int len;
#define DO_WRITE(rc, fd, ptr, w2, len) len = write(fd, ptr, w2); rc = (len >= 0)
#endif
pthread_mutex_lock(&my->mc_mutex);
@ -8134,7 +8140,6 @@ again:
pthread_cond_signal(&my->mc_cond);
pthread_mutex_unlock(&my->mc_mutex);
return (THREAD_RET)0;
#undef DO_WRITE
}
/** Tell the writer thread there's a buffer ready to write */
@ -8350,29 +8355,12 @@ mdb_env_copyfd1(MDB_env *env, HANDLE fd)
my.mc_toggle = 0;
my.mc_env = env;
my.mc_fd = fd;
THREAD_CREATE(thr, mdb_env_copythr, &my);
/* Do the lock/unlock of the reader mutex before starting the
* write txn. Otherwise other read txns could block writers.
*/
rc = mdb_txn_begin(env, NULL, MDB_RDONLY, &txn);
if (rc)
return rc;
if (env->me_txns) {
/* We must start the actual read txn after blocking writers */
mdb_txn_reset0(txn, "reset-stage1");
/* Temporarily block writers until we snapshot the meta pages */
LOCK_MUTEX_W(env);
rc = mdb_txn_renew0(txn);
if (rc) {
UNLOCK_MUTEX_W(env);
goto leave;
}
}
THREAD_CREATE(thr, mdb_env_copythr, &my);
mp = (MDB_page *)my.mc_wbuf[0];
memset(mp, 0, 2*env->me_psize);
mp->mp_pgno = 0;
@ -8422,17 +8410,16 @@ mdb_env_copyfd1(MDB_env *env, HANDLE fd)
pthread_cond_wait(&my.mc_cond, &my.mc_mutex);
pthread_mutex_unlock(&my.mc_mutex);
THREAD_FINISH(thr);
leave:
mdb_txn_abort(txn);
#ifdef _WIN32
CloseHandle(my.mc_cond);
CloseHandle(my.mc_mutex);
_aligned_free(my.mc_wbuf[0]);
#else
pthread_cond_destroy(&my.mc_cond);
pthread_mutex_destroy(&my.mc_mutex);
free(my.mc_wbuf[0]);
#endif
ALIGNED_FREE(my.mc_wbuf[0]);
return rc;
}
@ -8446,11 +8433,9 @@ mdb_env_copyfd0(MDB_env *env, HANDLE fd)
char *ptr;
#ifdef _WIN32
DWORD len, w2;
#define DO_WRITE(rc, fd, ptr, w2, len) rc = WriteFile(fd, ptr, w2, &len, NULL)
#else
ssize_t len;
size_t w2;
#define DO_WRITE(rc, fd, ptr, w2, len) len = write(fd, ptr, w2); rc = (len >= 0)
#endif
/* Do the lock/unlock of the reader mutex before starting the
@ -8556,8 +8541,8 @@ mdb_env_copyfd(MDB_env *env, HANDLE fd)
return mdb_env_copyfd2(env, fd, 0);
}
int ESECT
mdb_env_copy2(MDB_env *env, const char *path, unsigned int flags)
static int ESECT
mdb_env_copy_open(MDB_env *env, const char *path, HANDLE *retfd)
{
int rc, len;
char *lpath;
@ -8584,10 +8569,10 @@ mdb_env_copy2(MDB_env *env, const char *path, unsigned int flags)
#else
newfd = open(lpath, O_WRONLY|O_CREAT|O_EXCL, 0666);
#endif
if (newfd == INVALID_HANDLE_VALUE) {
rc = ErrCode();
goto leave;
}
if (!(env->me_flags & MDB_NOSUBDIR))
free(lpath);
if (newfd == INVALID_HANDLE_VALUE)
return ErrCode();
#ifdef O_DIRECT
/* Set O_DIRECT if the file system supports it */
@ -8595,29 +8580,205 @@ mdb_env_copy2(MDB_env *env, const char *path, unsigned int flags)
(void) fcntl(newfd, F_SETFL, rc | O_DIRECT);
#endif
#ifdef F_NOCACHE /* __APPLE__ */
rc = fcntl(newfd, F_NOCACHE, 1);
(void) fcntl(newfd, F_NOCACHE, 1);
#endif
*retfd = newfd;
return MDB_SUCCESS;
}
/** Copy the environment to a newly created file derived from \b path.
 * The destination is opened by mdb_env_copy_open(), filled by
 * mdb_env_copyfd2() using \b flags, and then closed.
 * @return 0 on success, otherwise the first error encountered. A close
 * failure is only reported when the copy itself succeeded.
 */
int ESECT
mdb_env_copy2(MDB_env *env, const char *path, unsigned int flags)
{
	HANDLE outfd = INVALID_HANDLE_VALUE;
	int status = mdb_env_copy_open(env, path, &outfd);

	if (status)
		return status;

	status = mdb_env_copyfd2(env, outfd, flags);

	if (outfd == INVALID_HANDLE_VALUE)
		return status;

	/* Do not let a close error mask an earlier copy error */
	if (close(outfd) < 0 && status == MDB_SUCCESS)
		status = ErrCode();

	return status;
}
/** Copy the environment to \b path using the default copy behavior,
 * i.e. mdb_env_copy2() with no flags set.
 * @return Whatever mdb_env_copy2() returns.
 */
int ESECT
mdb_env_copy(MDB_env *env, const char *path)
{
	const unsigned int no_flags = 0;

	return mdb_env_copy2(env, path, no_flags);
}
int ESECT
mdb_env_incr_dumpfd(MDB_env *env, HANDLE fd, size_t txnid)
{
int rc;
MDB_page *mp, *mend;
MDB_txn *txn;
size_t wsize;
char *buf = NULL;
#ifdef _WIN32
DWORD len, w2;
#else
ssize_t len;
size_t w2;
#endif
#ifdef _WIN32
buf = _aligned_malloc(2*env->me_psize, env->me_psize);
if (buf == NULL)
return errno;
#else
rc = posix_memalign((void **)&buf, env->me_psize, 2*env->me_psize);
if (rc)
return rc;
#endif
/* Do the lock/unlock of the reader mutex before starting the
* write txn. Otherwise other read txns could block writers.
*/
rc = mdb_txn_begin(env, NULL, MDB_RDONLY, &txn);
if (rc) {
ALIGNED_FREE(buf);
return rc;
}
if (env->me_txns) {
/* We must start the actual read txn after blocking writers */
mdb_txn_reset0(txn, "reset-stage1");
/* Temporarily block writers until we snapshot the meta pages */
LOCK_MUTEX_W(env);
rc = mdb_txn_renew0(txn);
if (rc) {
UNLOCK_MUTEX_W(env);
goto leave;
}
}
memcpy(buf, env->me_map, env->me_psize*2);
if (env->me_txns)
UNLOCK_MUTEX_W(env);
mp = (MDB_page *)buf;
if (mp->mp_txnid > txnid) {
DO_WRITE(rc, fd, mp, env->me_psize, len);
if (!rc) {
rc = ErrCode();
goto leave;
}
#endif
}
mp = (MDB_page *)((char *)mp + env->me_psize);
if (mp->mp_txnid > txnid) {
DO_WRITE(rc, fd, mp, env->me_psize, len);
if (!rc) {
rc = ErrCode();
goto leave;
}
}
ALIGNED_FREE(buf);
buf = NULL;
rc = mdb_env_copyfd2(env, newfd, flags);
mp = (MDB_page *)((char *)env->me_map + 2*env->me_psize);
mend = (MDB_page *)((char *)env->me_map + txn->mt_next_pgno * env->me_psize);
while (mp < mend) {
wsize = env->me_psize;
if (IS_OVERFLOW(mp))
wsize *= mp->mp_pages;
if (mp->mp_txnid > txnid) {
char *ptr = (char *)mp;
w2 = wsize;
while (w2 > 0) {
DO_WRITE(rc, fd, ptr, w2, len);
if (!rc) {
rc = ErrCode();
goto leave;
} else if (len > 0) {
rc = MDB_SUCCESS;
ptr += len;
w2 -= len;
continue;
} else {
rc = EIO;
goto leave;
}
}
}
mp = (MDB_page *)((char *)mp + wsize);
}
leave:
if (!(env->me_flags & MDB_NOSUBDIR))
free(lpath);
mdb_txn_abort(txn);
if (buf != NULL)
ALIGNED_FREE(buf);
return rc;
}
/** Perform an incremental dump of the environment to a new file at \b path.
 * The output is a plain file, so the destination is opened as if
 * #MDB_NOSUBDIR were set. The environment's own flag setting is restored
 * once the file has been opened, so later calls on \b env are unaffected.
 * @param[in] env An opened environment.
 * @param[in] path Name of the file to create.
 * @param[in] txnid Only pages with a larger transaction ID are written.
 * @return 0 on success, non-zero error code on failure. A close failure
 * is only reported when the dump itself succeeded.
 */
int ESECT
mdb_env_incr_dump(MDB_env *env, const char *path, size_t txnid)
{
	HANDLE newfd = INVALID_HANDLE_VALUE;
	unsigned int had_nosubdir = env->me_flags & MDB_NOSUBDIR;
	int rc;

	/* Output is just a plain file, not an environment */
	env->me_flags |= MDB_NOSUBDIR;
	rc = mdb_env_copy_open(env, path, &newfd);
	/* Undo the temporary override so the env keeps its original layout flag */
	if (!had_nosubdir)
		env->me_flags &= ~(unsigned int)MDB_NOSUBDIR;
	if (rc)
		return rc;

	rc = mdb_env_incr_dumpfd(env, newfd, txnid);

	if (newfd != INVALID_HANDLE_VALUE)
		if (close(newfd) < 0 && rc == MDB_SUCCESS)
			rc = ErrCode();

	return rc;
}
int ESECT
mdb_env_copy(MDB_env *env, const char *path)
mdb_env_incr_loadfd(MDB_env *env, HANDLE fd)
{
return mdb_env_copy2(env, path, 0);
size_t rsize;
ssize_t rlen;
char buf[PAGEHDRSZ], *ptr;
MDB_page *rp = (MDB_page *)buf, *mp;
if (!(env->me_flags & MDB_WRITEMAP))
return EINVAL;
for (;;) {
#ifdef _WIN32
int rc = ReadFile(fd, buf, sizeof(buf), &rlen, NULL) ? (int)rlen : -1;
if (rc == -1 && ErrCode() == ERROR_HANDLE_EOF)
rc = 0;
#else
rlen = read(fd, buf, sizeof(buf));
#endif
if (rlen != sizeof(buf))
break;
rsize = env->me_psize;
if (IS_OVERFLOW(rp))
rsize *= rp->mp_pages;
rsize -= rlen;
mp = (MDB_page *)(env->me_map + rp->mp_pgno * env->me_psize);
ptr = METADATA(mp);
memcpy(mp, rp, sizeof(buf));
while (rsize > 0) {
#ifdef _WIN32
rc = ReadFile(fd, ptr, rsize, &rlen, NULL) ? (int)rlen : -1;
if (rc == -1)
rlen = -1;
#else
rlen = read(fd, ptr, rsize);
#endif
if (rlen == -1)
return ErrCode();
ptr += rlen;
rsize -= rlen;
}
}
return MDB_SUCCESS;
}
int ESECT

@ -11,6 +11,8 @@ mdb_dump \- LMDB environment export tool
[\c
.BI \-f \ file\fR]
[\c
.BI \-i \ txnid\fR]
[\c
.BR \-l ]
[\c
.BR \-n ]
@ -35,6 +37,12 @@ Write the library version number to the standard output, and exit.
.BR \-f \ file
Write to the specified file instead of to the standard output.
.TP
.BR \-i \ txnid
Perform an incremental backup. Only pages that have been modified
after the given transaction ID will be written in the dump.
The transaction ID must be greater than zero.
Note: This is a raw binary dump of the database pages, not the portable output format.
.TP
.BR \-l
List the databases stored in the environment. Just the
names will be listed, no data will be output.

@ -21,9 +21,12 @@
#include "lmdb.h"
#ifdef _WIN32
#include <windows.h>
#define Z "I"
#define MDB_STDOUT GetStdHandle(STD_OUTPUT_HANDLE)
#else
#define Z "z"
#define MDB_STDOUT 1
#endif
#define PRINT 1
@ -155,7 +158,7 @@ static int dumpit(MDB_txn *txn, MDB_dbi dbi, char *name)
/** Print the command-line synopsis for \b prog to stderr and exit
 * with a failure status.
 */
static void usage(char *prog)
{
	fprintf(stderr, "usage: %s dbpath [-V] [-f output] [-i txnid] [-l] [-n] [-p] [-a|-s subdb]\n", prog);
	exit(EXIT_FAILURE);
}
@ -166,8 +169,9 @@ int main(int argc, char *argv[])
MDB_txn *txn;
MDB_dbi dbi;
char *prog = argv[0];
char *envname;
char *envname, *outname = NULL;
char *subname = NULL;
size_t txnid = 0;
int alldbs = 0, envflags = 0, list = 0;
if (argc < 2) {
@ -179,10 +183,11 @@ int main(int argc, char *argv[])
* -n: use NOSUBDIR flag on env_open
* -p: use printable characters
* -f: write to file instead of stdout
* -i: do incremental dump from txnid
* -V: print version and exit
* (default) dump only the main DB
*/
while ((i = getopt(argc, argv, "af:lnps:V")) != EOF) {
while ((i = getopt(argc, argv, "af:i:lnps:V")) != EOF) {
switch(i) {
case 'V':
printf("%s\n", MDB_VERSION_STRING);
@ -197,8 +202,11 @@ int main(int argc, char *argv[])
alldbs++;
break;
case 'f':
if (freopen(optarg, "w", stdout) == NULL) {
fprintf(stderr, "%s: %s: reopen: %s\n",
outname = optarg;
break;
case 'i':
if (sscanf(optarg, "%" Z "i", &txnid) != 1 || !txnid) {
fprintf(stderr, "%s: %s: invalid txnid: %s\n",
prog, optarg, strerror(errno));
exit(EXIT_FAILURE);
}
@ -244,6 +252,22 @@ int main(int argc, char *argv[])
goto env_close;
}
if (txnid) {
if (outname)
rc = mdb_env_incr_dump(env, outname, txnid);
else
rc = mdb_env_incr_dumpfd(env, MDB_STDOUT, txnid);
if (rc)
fprintf(stderr, "mdb_env_incr_dump failed, error %d %s\n", rc, mdb_strerror(rc));
goto env_close;
}
if (outname && freopen(outname, "w", stdout) == NULL) {
fprintf(stderr, "%s: %s: reopen: %s\n",
prog, outname, strerror(errno));
exit(EXIT_FAILURE);
}
rc = mdb_txn_begin(env, NULL, MDB_RDONLY, &txn);
if (rc) {
fprintf(stderr, "mdb_txn_begin failed, error %d %s\n", rc, mdb_strerror(rc));

@ -11,6 +11,8 @@ mdb_load \- LMDB environment import tool
[\c
.BI \-f \ file\fR]
[\c
.BR \-i ]
[\c
.BR \-n ]
[\c
.BI \-s \ subdb\fR]
@ -40,6 +42,9 @@ Write the library version number to the standard output, and exit.
.BR \-f \ file
Read from the specified file instead of from the standard input.
.TP
.BR \-i
Load an incremental backup.
.TP
.BR \-n
Load an LMDB database which does not use subdirectories.
.TP

@ -19,6 +19,15 @@
#include <unistd.h>
#include "lmdb.h"
#ifdef _WIN32
#include <windows.h>
#define Z "I"
#define MDB_STDIN GetStdHandle(STD_INPUT_HANDLE)
#else
#define Z "z"
#define MDB_STDIN 0
#endif
#define PRINT 1
#define NOHDR 2
static int mode;
@ -38,12 +47,6 @@ static MDB_envinfo info;
static MDB_val kbuf, dbuf;
#ifdef _WIN32
#define Z "I"
#else
#define Z "z"
#endif
#define STRLENOF(s) (sizeof(s)-1)
typedef struct flagbit {
@ -276,7 +279,7 @@ badend:
/** Print the command-line synopsis to stderr, using the file-scope
 * program name, and exit with a failure status.
 */
static void usage()
{
	fprintf(stderr, "usage: %s dbpath [-V] [-f input] [-i] [-n] [-s name] [-N] [-T]\n", prog);
	exit(EXIT_FAILURE);
}
@ -290,6 +293,7 @@ int main(int argc, char *argv[])
char *envname;
int envflags = 0, putflags = 0;
int dohdr = 0;
int incr = 0;
prog = argv[0];
@ -298,13 +302,14 @@ int main(int argc, char *argv[])
}
/* -f: load file instead of stdin
* -i: load an incremental dump
* -n: use NOSUBDIR flag on env_open
* -s: load into named subDB
* -N: use NOOVERWRITE on puts
* -T: read plaintext
* -V: print version and exit
*/
while ((i = getopt(argc, argv, "f:ns:NTV")) != EOF) {
while ((i = getopt(argc, argv, "f:ins:NTV")) != EOF) {
switch(i) {
case 'V':
printf("%s\n", MDB_VERSION_STRING);
@ -317,6 +322,9 @@ int main(int argc, char *argv[])
exit(EXIT_FAILURE);
}
break;
case 'i':
incr = 1;
break;
case 'n':
envflags |= MDB_NOSUBDIR;
break;
@ -336,6 +344,21 @@ int main(int argc, char *argv[])
if (optind != argc - 1)
usage(prog);
envname = argv[optind];
if (incr) {
rc = mdb_env_create(&env);
envflags |= MDB_WRITEMAP;
rc = mdb_env_open(env, envname, envflags, 0664);
if (rc) {
fprintf(stderr, "mdb_env_open failed, error %d %s\n", rc, mdb_strerror(rc));
goto env_close;
}
rc = mdb_env_incr_loadfd(env, MDB_STDIN);
if (rc)
fprintf(stderr, "mdb_env_incr_load failed, error %d %s\n", rc, mdb_strerror(rc));
goto env_close;
}
dbuf.mv_size = 4096;
dbuf.mv_data = malloc(dbuf.mv_size);
@ -343,7 +366,6 @@ int main(int argc, char *argv[])
if (!(mode & NOHDR))
readhdr();
envname = argv[optind];
rc = mdb_env_create(&env);
mdb_env_set_maxdbs(env, 2);

Loading…
Cancel
Save