// Copyright 2018-2019 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.

//! A utility for migrating data from one LMDB environment to another.
//! Notably, this tool can migrate data from an environment created with
//! a different bit-depth than the current rkv consumer, which enables
//! the consumer to retrieve data from an environment that can't be read
//! directly using the rkv APIs.
//!
//! The utility supports both 32-bit and 64-bit LMDB source environments,
//! and it automatically migrates data in both the default database
//! and any named (sub) databases. It also migrates the source environment's
//! "map size" and "max DBs" configuration options to the destination
//! environment.
//!
//! The destination environment must be at the rkv consumer's bit depth
//! and should be empty of data. It can be an empty directory, in which case
//! the utility will create a new LMDB environment within the directory.
//!
//! The tool currently has these limitations:
//!
//! 1. It doesn't support migration from environments created with
//!    `EnvironmentFlags::NO_SUB_DIR`. To migrate such an environment,
//!    create a temporary directory, copy the environment's data file
//!    to a file called data.mdb in the temporary directory, then migrate
//!    the temporary directory as the source environment (see the sketch
//!    below).
//! 2. It doesn't support migration from databases created with
//!    `DatabaseFlags::DUP_SORT` (with or without `DatabaseFlags::DUP_FIXED`).
//! 3. It doesn't account for existing data in the destination environment,
//!    which means that it can overwrite data (causing data loss) or fail
//!    to migrate data if the destination environment contains existing data.
//!
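//! As a sketch of the workaround for limitation 1 (the no_sub_dir.mdb
//! source path is hypothetical):
//!
//! ```no_run
//! use rkv::migrate::Migrator;
//! use std::fs;
//! use std::path::Path;
//! use tempfile::tempdir;
//! // Copy the lone data file into a temporary directory as data.mdb...
//! let src_dir = tempdir().unwrap();
//! fs::copy(Path::new("no_sub_dir.mdb"), src_dir.path().join("data.mdb")).unwrap();
//! // ...then migrate the temporary directory as the source environment.
//! let mut migrator = Migrator::new(src_dir.path()).unwrap();
//! migrator.migrate(&tempdir().unwrap().path()).unwrap();
//! ```
//!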
//! ## Basic Usage
//!
//! Call `Migrator::new()` with the path to the source environment to create
//! a `Migrator` instance; then call the instance's `migrate()` method
//! with the path to the destination environment to migrate data from the source
//! to the destination environment. For example, this snippet migrates data
//! from the tests/envs/ref_env_32 environment to a new environment
//! in a temporary directory:
//!
//! ```
//! use rkv::migrate::Migrator;
//! use std::path::Path;
//! use tempfile::tempdir;
//! let mut migrator = Migrator::new(Path::new("tests/envs/ref_env_32")).unwrap();
//! migrator.migrate(&tempdir().unwrap().path()).unwrap();
//! ```
//!
//! Both `Migrator::new()` and `migrate()` return a `MigrateResult` that is
//! either an `Ok()` result or an `Err(MigrateError)`, where `MigrateError`
//! is an enum whose variants identify specific kinds of migration failures.
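//!
//! For example, a sketch of reacting to one specific failure (the match
//! arms here are illustrative):
//!
//! ```no_run
//! use rkv::migrate::{
//!     MigrateError,
//!     Migrator,
//! };
//! use std::path::Path;
//! match Migrator::new(Path::new("tests/envs/ref_env_32")) {
//!     Ok(mut migrator) => migrator.migrate(Path::new("/tmp/dest")).unwrap(),
//!     Err(MigrateError::IndeterminateBitDepth) => eprintln!("not an LMDB data file?"),
//!     Err(err) => eprintln!("migration failed: {:?}", err),
//! }
//! ```
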
pub use crate::error::MigrateError;
use bitflags::bitflags;
use byteorder::{
    LittleEndian,
    ReadBytesExt,
};
use lmdb::{
    DatabaseFlags,
    Environment,
    Transaction,
    WriteFlags,
};
use std::{
    collections::{
        BTreeMap,
        HashMap,
    },
    convert::TryFrom,
    fs::File,
    io::{
        Cursor,
        Read,
        Seek,
        SeekFrom,
        Write,
    },
    path::{
        Path,
        PathBuf,
    },
    rc::Rc,
    str,
};

const PAGESIZE: u16 = 4096;
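// Offsets computed throughout this module assume the common 4096-byte
// page size; a source environment created with a different page size
// wouldn't be parsed correctly.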

// The magic number is 0xBEEFC0DE, which is 0xDEC0EFBE in little-endian.
// It appears at offset 12 on 32-bit systems and 16 on 64-bit systems.
// We don't support big-endian migration, but presumably we could do so
// by detecting the order of the bytes.
const MAGIC: [u8; 4] = [0xDE, 0xC0, 0xEF, 0xBE];

pub type MigrateResult<T> = Result<T, MigrateError>;

bitflags! {
    #[derive(Default)]
    struct PageFlags: u16 {
        const BRANCH = 0x01;
        const LEAF = 0x02;
        const OVERFLOW = 0x04;
        const META = 0x08;
        const DIRTY = 0x10;
        const LEAF2 = 0x20;
        const SUBP = 0x40;
        const LOOSE = 0x4000;
        const KEEP = 0x8000;
    }
}

bitflags! {
    #[derive(Default)]
    struct NodeFlags: u16 {
        const BIGDATA = 0x01;
        const SUBDATA = 0x02;
        const DUPDATA = 0x04;
    }
}

// The bit depth of the executable that created an LMDB environment.
// The Migrator determines this automatically based on the location of
// the magic number in the data.mdb file.
#[derive(Clone, Copy, PartialEq)]
enum Bits {
    U32,
    U64,
}

impl Bits {
    // The size of usize for the bit-depth represented by the enum variant.
    fn size(self) -> usize {
        match self {
            Bits::U32 => 4,
            Bits::U64 => 8,
        }
    }
}

// The equivalent of PAGEHDRSZ in LMDB, except that this one varies by bits.
fn page_header_size(bits: Bits) -> u64 {
    match bits {
        Bits::U32 => 12,
        Bits::U64 => 16,
    }
}
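// Those sizes follow from LMDB's page header layout: the page number
// (4 or 8 bytes by bit depth), a 2-byte pad, 2 bytes of flags, and
// 4 bytes of page bounds, for 12 bytes on 32-bit and 16 on 64-bit.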

// The equivalent of P_INVALID in LMDB, except that this one varies by bits.
fn validate_page_num(page_num: u64, bits: Bits) -> MigrateResult<()> {
    let invalid_page_num = match bits {
        Bits::U32 => u64::from(!0u32),
        Bits::U64 => !0u64,
    };

    if page_num == invalid_page_num {
        return Err(MigrateError::InvalidPageNum);
    }

    Ok(())
}

#[derive(Clone, Debug, Default)]
struct Database {
    md_pad: u32,
    md_flags: DatabaseFlags,
    md_depth: u16,
    md_branch_pages: u64,
    md_leaf_pages: u64,
    md_overflow_pages: u64,
    md_entries: u64,
    md_root: u64,
}

impl Database {
    fn new(cursor: &mut Cursor<&[u8]>, bits: Bits) -> MigrateResult<Database> {
        Ok(Database {
            md_pad: cursor.read_u32::<LittleEndian>()?,
            md_flags: DatabaseFlags::from_bits(cursor.read_u16::<LittleEndian>()?.into())
                .ok_or(MigrateError::InvalidDatabaseBits)?,
            md_depth: cursor.read_u16::<LittleEndian>()?,
            md_branch_pages: cursor.read_uint::<LittleEndian>(bits.size())?,
            md_leaf_pages: cursor.read_uint::<LittleEndian>(bits.size())?,
            md_overflow_pages: cursor.read_uint::<LittleEndian>(bits.size())?,
            md_entries: cursor.read_uint::<LittleEndian>(bits.size())?,
            md_root: cursor.read_uint::<LittleEndian>(bits.size())?,
        })
    }
}

#[derive(Debug, Default)]
struct Databases {
    free: Database,
    main: Database,
}

#[derive(Debug, Default)]
struct MetaData {
    mm_magic: u32,
    mm_version: u32,
    mm_address: u64,
    mm_mapsize: u64,
    mm_dbs: Databases,
    mm_last_pg: u64,
    mm_txnid: u64,
}

#[derive(Debug)]
enum LeafNode {
    Regular {
        mn_lo: u16,
        mn_hi: u16,
        mn_flags: NodeFlags,
        mn_ksize: u16,
        mv_size: u32,
        key: Vec<u8>,
        value: Vec<u8>,
    },
    BigData {
        mn_lo: u16,
        mn_hi: u16,
        mn_flags: NodeFlags,
        mn_ksize: u16,
        mv_size: u32,
        key: Vec<u8>,
        overflow_pgno: u64,
    },
    SubData {
        mn_lo: u16,
        mn_hi: u16,
        mn_flags: NodeFlags,
        mn_ksize: u16,
        mv_size: u32,
        key: Vec<u8>,
        value: Vec<u8>,
        db: Database,
    },
}

#[derive(Debug, Default)]
struct BranchNode {
    mp_pgno: u64,
    mn_ksize: u16,
    mn_data: Vec<u8>,
}

#[derive(Debug)]
enum PageHeader {
    Regular {
        mp_pgno: u64,
        mp_flags: PageFlags,
        pb_lower: u16,
        pb_upper: u16,
    },
    Overflow {
        mp_pgno: u64,
        mp_flags: PageFlags,
        pb_pages: u32,
    },
}

#[derive(Debug)]
enum Page {
    META(MetaData),
    LEAF(Vec<LeafNode>),
    BRANCH(Vec<BranchNode>),
}

impl Page {
    fn new(buf: Vec<u8>, bits: Bits) -> MigrateResult<Page> {
        let mut cursor = std::io::Cursor::new(&buf[..]);

        match Self::parse_page_header(&mut cursor, bits)? {
            PageHeader::Regular {
                mp_flags,
                pb_lower,
                ..
            } => {
                if mp_flags.contains(PageFlags::LEAF2) || mp_flags.contains(PageFlags::SUBP) {
                    // We don't yet support DUPFIXED and DUPSORT databases.
                    return Err(MigrateError::UnsupportedPageHeaderVariant);
                }

                if mp_flags.contains(PageFlags::META) {
                    let meta_data = Self::parse_meta_data(&mut cursor, bits)?;
                    Ok(Page::META(meta_data))
                } else if mp_flags.contains(PageFlags::LEAF) {
                    let nodes = Self::parse_leaf_nodes(&mut cursor, pb_lower, bits)?;
                    Ok(Page::LEAF(nodes))
                } else if mp_flags.contains(PageFlags::BRANCH) {
                    let nodes = Self::parse_branch_nodes(&mut cursor, pb_lower, bits)?;
                    Ok(Page::BRANCH(nodes))
                } else {
                    Err(MigrateError::UnexpectedPageHeaderVariant)
                }
            },
            PageHeader::Overflow {
                ..
            } => {
                // There isn't anything to do, nor should we try to instantiate
                // a page of this type, as we only access them when reading
                // a value that is too large to fit into a leaf node.
                Err(MigrateError::UnexpectedPageHeaderVariant)
            },
        }
    }

    fn parse_page_header(cursor: &mut Cursor<&[u8]>, bits: Bits) -> MigrateResult<PageHeader> {
        let mp_pgno = cursor.read_uint::<LittleEndian>(bits.size())?;
        let _mp_pad = cursor.read_u16::<LittleEndian>()?;
        let mp_flags = PageFlags::from_bits(cursor.read_u16::<LittleEndian>()?).ok_or(MigrateError::InvalidPageBits)?;

        if mp_flags.contains(PageFlags::OVERFLOW) {
            let pb_pages = cursor.read_u32::<LittleEndian>()?;
            Ok(PageHeader::Overflow {
                mp_pgno,
                mp_flags,
                pb_pages,
            })
        } else {
            let pb_lower = cursor.read_u16::<LittleEndian>()?;
            let pb_upper = cursor.read_u16::<LittleEndian>()?;
            Ok(PageHeader::Regular {
                mp_pgno,
                mp_flags,
                pb_lower,
                pb_upper,
            })
        }
    }

    fn parse_meta_data(mut cursor: &mut Cursor<&[u8]>, bits: Bits) -> MigrateResult<MetaData> {
        cursor.seek(SeekFrom::Start(page_header_size(bits)))?;

        Ok(MetaData {
            mm_magic: cursor.read_u32::<LittleEndian>()?,
            mm_version: cursor.read_u32::<LittleEndian>()?,
            mm_address: cursor.read_uint::<LittleEndian>(bits.size())?,
            mm_mapsize: cursor.read_uint::<LittleEndian>(bits.size())?,
            mm_dbs: Databases {
                free: Database::new(&mut cursor, bits)?,
                main: Database::new(&mut cursor, bits)?,
            },
            mm_last_pg: cursor.read_uint::<LittleEndian>(bits.size())?,
            mm_txnid: cursor.read_uint::<LittleEndian>(bits.size())?,
        })
    }

    fn parse_leaf_nodes(cursor: &mut Cursor<&[u8]>, pb_lower: u16, bits: Bits) -> MigrateResult<Vec<LeafNode>> {
        cursor.set_position(page_header_size(bits));
        let num_keys = Self::num_keys(pb_lower, bits);
        let mp_ptrs = Self::parse_mp_ptrs(cursor, num_keys)?;

        let mut leaf_nodes = Vec::with_capacity(num_keys as usize);

        for mp_ptr in mp_ptrs {
            cursor.set_position(u64::from(mp_ptr));
            leaf_nodes.push(Self::parse_leaf_node(cursor, bits)?);
        }

        Ok(leaf_nodes)
    }

    fn parse_leaf_node(cursor: &mut Cursor<&[u8]>, bits: Bits) -> MigrateResult<LeafNode> {
        // The order of the mn_lo and mn_hi fields is endian-dependent and would
        // be reversed in an LMDB environment created on a big-endian system.
        let mn_lo = cursor.read_u16::<LittleEndian>()?;
        let mn_hi = cursor.read_u16::<LittleEndian>()?;

        let mn_flags = NodeFlags::from_bits(cursor.read_u16::<LittleEndian>()?).ok_or(MigrateError::InvalidNodeBits)?;
        let mn_ksize = cursor.read_u16::<LittleEndian>()?;

        let start = usize::try_from(cursor.position())?;
        let end = usize::try_from(cursor.position() + u64::from(mn_ksize))?;
        let key = cursor.get_ref()[start..end].to_vec();
        cursor.set_position(end as u64);

        let mv_size = Self::leaf_node_size(mn_lo, mn_hi);
        if mn_flags.contains(NodeFlags::BIGDATA) {
            let overflow_pgno = cursor.read_uint::<LittleEndian>(bits.size())?;

            Ok(LeafNode::BigData {
                mn_lo,
                mn_hi,
                mn_flags,
                mn_ksize,
                mv_size,
                key,
                overflow_pgno,
            })
        } else if mn_flags.contains(NodeFlags::SUBDATA) {
            let start = usize::try_from(cursor.position())?;
            let end = usize::try_from(cursor.position() + u64::from(mv_size))?;
            let value = cursor.get_ref()[start..end].to_vec();
            let mut cursor = std::io::Cursor::new(&value[..]);
            let db = Database::new(&mut cursor, bits)?;
            validate_page_num(db.md_root, bits)?;

            Ok(LeafNode::SubData {
                mn_lo,
                mn_hi,
                mn_flags,
                mn_ksize,
                mv_size,
                key,
                value,
                db,
            })
        } else {
            let start = usize::try_from(cursor.position())?;
            let end = usize::try_from(cursor.position() + u64::from(mv_size))?;
            let value = cursor.get_ref()[start..end].to_vec();

            Ok(LeafNode::Regular {
                mn_lo,
                mn_hi,
                mn_flags,
                mn_ksize,
                mv_size,
                key,
                value,
            })
        }
    }

    fn leaf_node_size(mn_lo: u16, mn_hi: u16) -> u32 {
        u32::from(mn_lo) + ((u32::from(mn_hi)) << 16)
    }
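    // For example, mn_lo = 0x5678 and mn_hi = 0x1234 combine into
    // mv_size = 0x1234_5678; the value's 32-bit size is split across
    // the two 16-bit fields of the node header.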

    fn parse_branch_nodes(cursor: &mut Cursor<&[u8]>, pb_lower: u16, bits: Bits) -> MigrateResult<Vec<BranchNode>> {
        let num_keys = Self::num_keys(pb_lower, bits);
        let mp_ptrs = Self::parse_mp_ptrs(cursor, num_keys)?;

        let mut branch_nodes = Vec::with_capacity(num_keys as usize);

        for mp_ptr in mp_ptrs {
            cursor.set_position(u64::from(mp_ptr));
            branch_nodes.push(Self::parse_branch_node(cursor, bits)?)
        }

        Ok(branch_nodes)
    }

    fn parse_branch_node(cursor: &mut Cursor<&[u8]>, bits: Bits) -> MigrateResult<BranchNode> {
        // The order of the mn_lo and mn_hi fields is endian-dependent and would
        // be reversed in an LMDB environment created on a big-endian system.
        let mn_lo = cursor.read_u16::<LittleEndian>()?;
        let mn_hi = cursor.read_u16::<LittleEndian>()?;

        let mn_flags = cursor.read_u16::<LittleEndian>()?;

        // Branch nodes overload the mn_lo, mn_hi, and mn_flags fields
        // to store the page number, so we derive the number from those fields.
        let mp_pgno = Self::branch_node_page_num(mn_lo, mn_hi, mn_flags, bits);

        let mn_ksize = cursor.read_u16::<LittleEndian>()?;

        let position = cursor.position();
        let start = usize::try_from(position)?;
        let end = usize::try_from(position + u64::from(mn_ksize))?;
        let mn_data = cursor.get_ref()[start..end].to_vec();
        cursor.set_position(end as u64);

        Ok(BranchNode {
            mp_pgno,
            mn_ksize,
            mn_data,
        })
    }

    fn branch_node_page_num(mn_lo: u16, mn_hi: u16, mn_flags: u16, bits: Bits) -> u64 {
        let mut page_num = u64::from(u32::from(mn_lo) + (u32::from(mn_hi) << 16));
        if bits == Bits::U64 {
            page_num += u64::from(mn_flags) << 32;
        }
        page_num
    }
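    // For example, on a 64-bit source, mn_lo = 0x5678, mn_hi = 0x1234,
    // and mn_flags = 0x9ABC combine into page number 0x9ABC_1234_5678,
    // i.e. up to 48 bits of page number.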

    fn parse_mp_ptrs(cursor: &mut Cursor<&[u8]>, num_keys: u64) -> MigrateResult<Vec<u16>> {
        let mut mp_ptrs = Vec::with_capacity(num_keys as usize);
        for _ in 0..num_keys {
            mp_ptrs.push(cursor.read_u16::<LittleEndian>()?);
        }
        Ok(mp_ptrs)
    }

    fn num_keys(pb_lower: u16, bits: Bits) -> u64 {
        (u64::from(pb_lower) - page_header_size(bits)) >> 1
    }
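    // (pb_lower is the offset of the end of the mp_ptrs array from the
    // start of the page, so subtracting the page header size leaves only
    // the 2-byte pointers, and the shift halves that byte count into a
    // key count.)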
}

pub struct Migrator {
    file: File,
    bits: Bits,
}

impl Migrator {
    /// Create a new Migrator for the LMDB environment at the given path.
    /// This tries to open the data.mdb file in the environment and determine
    /// the bit depth of the executable that created it, so it can fail
    /// and return an Err if the file can't be opened or the depth determined.
    pub fn new(path: &Path) -> MigrateResult<Migrator> {
        let mut path = PathBuf::from(path);
        path.push("data.mdb");
        let mut file = File::open(&path)?;

        file.seek(SeekFrom::Start(page_header_size(Bits::U32)))?;
        let mut buf = [0; 4];
        file.read_exact(&mut buf)?;

        let bits = if buf == MAGIC {
            Bits::U32
        } else {
            file.seek(SeekFrom::Start(page_header_size(Bits::U64)))?;
            file.read_exact(&mut buf)?;
            if buf == MAGIC {
                Bits::U64
            } else {
                return Err(MigrateError::IndeterminateBitDepth);
            }
        };

        Ok(Migrator {
            file,
            bits,
        })
    }

    /// Dump the data in one of the databases in the LMDB environment.
    /// If the `database` parameter is None, then we dump the data in the main
    /// database. If it's the name of a subdatabase, then we dump the data
    /// in that subdatabase.
    ///
    /// Note that the output isn't identical to that of the mdb_dump utility,
    /// since mdb_dump includes subdatabase key/value pairs when dumping
    /// the main database, and those values are architecture-dependent, since
    /// they contain pointer-sized data.
    ///
    /// If we wanted to support identical output, we could parameterize
    /// inclusion of subdatabase pairs in get_pairs() and include them
    /// when dumping data, while continuing to exclude them when migrating
    /// data.
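    ///
    /// # Example
    ///
    /// A sketch that dumps the main database of the 32-bit reference
    /// environment bundled with this crate's tests to stdout:
    ///
    /// ```no_run
    /// use rkv::migrate::Migrator;
    /// use std::io::stdout;
    /// use std::path::Path;
    /// let mut migrator = Migrator::new(Path::new("tests/envs/ref_env_32")).unwrap();
    /// migrator.dump(None, stdout()).unwrap();
    /// ```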
    pub fn dump<T: Write>(&mut self, database: Option<&str>, mut out: T) -> MigrateResult<()> {
        let meta_data = self.get_meta_data()?;
        let root_page_num = meta_data.mm_dbs.main.md_root;
        let root_page = Rc::new(self.get_page(root_page_num)?);

        let pairs;
        if let Some(database) = database {
            let subdbs = self.get_subdbs(root_page)?;
            let database =
                subdbs.get(database.as_bytes()).ok_or_else(|| MigrateError::DatabaseNotFound(database.to_string()))?;
            let root_page_num = database.md_root;
            let root_page = Rc::new(self.get_page(root_page_num)?);
            pairs = self.get_pairs(root_page)?;
        } else {
            pairs = self.get_pairs(root_page)?;
        }

        out.write_all(b"VERSION=3\n")?;
        out.write_all(b"format=bytevalue\n")?;
        if let Some(database) = database {
            writeln!(out, "database={}", database)?;
        }
        out.write_all(b"type=btree\n")?;
        writeln!(out, "mapsize={}", meta_data.mm_mapsize)?;
        out.write_all(b"maxreaders=126\n")?;
        out.write_all(b"db_pagesize=4096\n")?;
        out.write_all(b"HEADER=END\n")?;

        for (key, value) in pairs {
            out.write_all(b" ")?;
            for byte in key {
                write!(out, "{:02x}", byte)?;
            }
            out.write_all(b"\n")?;
            out.write_all(b" ")?;
            for byte in value {
                write!(out, "{:02x}", byte)?;
            }
            out.write_all(b"\n")?;
        }

        out.write_all(b"DATA=END\n")?;

        Ok(())
    }

    /// Migrate all data in all of the databases in the existing LMDB
    /// environment to a new environment. This includes all key/value pairs
    /// in the main database that aren't metadata about subdatabases and all
    /// key/value pairs in all subdatabases.
    ///
    /// We also set the map size and maximum databases of the new environment
    /// to their values for the existing environment. But we don't set
    /// other metadata, and we don't check that the new environment is empty
    /// before migrating data.
    ///
    /// Thus it's possible for this to overwrite existing data or fail
    /// to migrate data if the new environment isn't empty. It's the consumer's
    /// responsibility to ensure that data can be safely migrated to the new
    /// environment. In general, this means the new environment should be
    /// empty.
    pub fn migrate(&mut self, dest: &Path) -> MigrateResult<()> {
        let meta_data = self.get_meta_data()?;
        let root_page_num = meta_data.mm_dbs.main.md_root;
        validate_page_num(root_page_num, self.bits)?;
        let root_page = Rc::new(self.get_page(root_page_num)?);
        let subdbs = self.get_subdbs(Rc::clone(&root_page))?;

        let env = Environment::new()
            .set_map_size(meta_data.mm_mapsize as usize)
            .set_max_dbs(subdbs.len() as u32)
            .open(dest)?;

        // Create the databases before we open a read-write transaction,
        // since database creation requires its own read-write transaction,
        // which would hang while awaiting completion of an existing one.
        env.create_db(None, meta_data.mm_dbs.main.md_flags)?;
        for (subdb_name, subdb_info) in &subdbs {
            env.create_db(Some(str::from_utf8(&subdb_name)?), subdb_info.md_flags)?;
        }

        // Now open the read-write transaction that we'll use to migrate
        // all the data.
        let mut txn = env.begin_rw_txn()?;

        // Migrate the main database.
        let pairs = self.get_pairs(root_page)?;
        let db = env.open_db(None)?;
        for (key, value) in pairs {
            // If we knew that the target database was empty, we could
            // specify WriteFlags::APPEND to speed up the migration.
            txn.put(db, &key, &value, WriteFlags::empty())?;
        }

        // Migrate subdatabases.
        for (subdb_name, subdb_info) in &subdbs {
            let root_page = Rc::new(self.get_page(subdb_info.md_root)?);
            let pairs = self.get_pairs(root_page)?;
            let db = env.open_db(Some(str::from_utf8(&subdb_name)?))?;
            for (key, value) in pairs {
                // If we knew that the target database was empty, we could
                // specify WriteFlags::APPEND to speed up the migration.
                txn.put(db, &key, &value, WriteFlags::empty())?;
            }
        }

        txn.commit()?;

        Ok(())
    }

    fn get_subdbs(&mut self, root_page: Rc<Page>) -> MigrateResult<HashMap<Vec<u8>, Database>> {
        let mut subdbs = HashMap::new();
        let mut pages = vec![root_page];

        while let Some(page) = pages.pop() {
            match &*page {
                Page::BRANCH(nodes) => {
                    for branch in nodes {
                        pages.push(Rc::new(self.get_page(branch.mp_pgno)?));
                    }
                },
                Page::LEAF(nodes) => {
                    for leaf in nodes {
                        if let LeafNode::SubData {
                            key,
                            db,
                            ..
                        } = leaf
                        {
                            subdbs.insert(key.to_vec(), db.clone());
                        };
                    }
                },
                _ => {
                    return Err(MigrateError::UnexpectedPageVariant);
                },
            }
        }

        Ok(subdbs)
    }

    fn get_pairs(&mut self, root_page: Rc<Page>) -> MigrateResult<BTreeMap<Vec<u8>, Vec<u8>>> {
        let mut pairs = BTreeMap::new();
        let mut pages = vec![root_page];

        while let Some(page) = pages.pop() {
            match &*page {
                Page::BRANCH(nodes) => {
                    for branch in nodes {
                        pages.push(Rc::new(self.get_page(branch.mp_pgno)?));
                    }
                },
                Page::LEAF(nodes) => {
                    for leaf in nodes {
                        match leaf {
                            LeafNode::Regular {
                                key,
                                value,
                                ..
                            } => {
                                pairs.insert(key.to_vec(), value.to_vec());
                            },
                            LeafNode::BigData {
                                mv_size,
                                key,
                                overflow_pgno,
                                ..
                            } => {
                                // XXX perhaps we could reduce memory consumption
                                // during a migration by waiting to read big data
                                // until it's time to write it to the new database.
                                let value = self.read_data(
                                    *overflow_pgno * u64::from(PAGESIZE) + page_header_size(self.bits),
                                    *mv_size as usize,
                                )?;
                                pairs.insert(key.to_vec(), value);
                            },
                            LeafNode::SubData {
                                ..
                            } => {
                                // We don't include subdatabase leaves in pairs,
                                // since there's no architecture-neutral
                                // representation of them, and in any case they're
                                // meta-data that should get recreated when we
                                // migrate the subdatabases themselves.
                                //
                                // If we wanted to create identical dumps to those
                                // produced by mdb_dump, however, we could allow
                                // consumers to specify that they'd like to include
                                // these records.
                            },
                        };
                    }
                },
                _ => {
                    return Err(MigrateError::UnexpectedPageVariant);
                },
            }
        }

        Ok(pairs)
    }

    fn read_data(&mut self, offset: u64, size: usize) -> MigrateResult<Vec<u8>> {
        self.file.seek(SeekFrom::Start(offset))?;
        let mut buf: Vec<u8> = vec![0; size];
        self.file.read_exact(&mut buf[0..size])?;
        Ok(buf)
    }

    fn get_page(&mut self, page_no: u64) -> MigrateResult<Page> {
        Page::new(self.read_data(page_no * u64::from(PAGESIZE), usize::from(PAGESIZE))?, self.bits)
    }

    fn get_meta_data(&mut self) -> MigrateResult<MetaData> {
        let (page0, page1) = (self.get_page(0)?, self.get_page(1)?);

        match (page0, page1) {
            (Page::META(meta0), Page::META(meta1)) => {
                // LMDB alternates between two meta pages; the one with
                // the larger transaction ID is the current one.
                let meta = if meta1.mm_txnid > meta0.mm_txnid {
                    meta1
                } else {
                    meta0
                };
                if meta.mm_magic != 0xBE_EF_C0_DE {
                    return Err(MigrateError::InvalidMagicNum);
                }
                if meta.mm_version != 1 && meta.mm_version != 999 {
                    return Err(MigrateError::InvalidDataVersion);
                }
                Ok(meta)
            },
            _ => Err(MigrateError::UnexpectedPageVariant),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::MigrateResult;
    use super::Migrator;
    use crate::error::MigrateError;
    use lmdb::{
        Environment,
        Error as LmdbError,
    };
    use std::{
        env,
        fs::{
            self,
            File,
        },
        io::{
            Read,
            Seek,
            SeekFrom,
        },
        mem::size_of,
        path::PathBuf,
    };
    use tempfile::{
        tempdir,
        tempfile,
    };

    fn compare_files(ref_file: &mut File, new_file: &mut File) -> MigrateResult<()> {
        ref_file.seek(SeekFrom::Start(0))?;
        new_file.seek(SeekFrom::Start(0))?;

        let ref_buf = &mut [0; 1024];
        let new_buf = &mut [0; 1024];

        loop {
            match ref_file.read(ref_buf) {
                Err(err) => panic!("{:?}", err),
                Ok(ref_len) => match new_file.read(new_buf) {
                    Err(err) => panic!("{:?}", err),
                    Ok(new_len) => {
                        assert_eq!(ref_len, new_len);
                        if ref_len == 0 {
                            break;
                        };
                        assert_eq!(ref_buf[0..ref_len], new_buf[0..new_len]);
                    },
                },
            }
        }

        Ok(())
    }

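    // A few added sanity checks of the node/page arithmetic above;
    // the constants are illustrative values, not bytes read from a real
    // data file.
    #[test]
    fn test_node_arithmetic() {
        use super::{
            validate_page_num,
            Bits,
            Page,
        };
        // The 32-bit value size is split across two 16-bit node fields.
        assert_eq!(Page::leaf_node_size(0x5678, 0x1234), 0x1234_5678);
        // On 64-bit sources, branch nodes also borrow mn_flags for the
        // high 16 bits of the page number; 32-bit sources don't.
        assert_eq!(Page::branch_node_page_num(0x5678, 0x1234, 0x9ABC, Bits::U64), 0x9ABC_1234_5678);
        assert_eq!(Page::branch_node_page_num(0x5678, 0x1234, 0x9ABC, Bits::U32), 0x1234_5678);
        // 24 = 16-byte (64-bit) page header plus four 2-byte mp_ptrs.
        assert_eq!(Page::num_keys(24, Bits::U64), 4);
        // P_INVALID depends on the bit depth of the source environment.
        assert!(validate_page_num(u64::from(!0u32), Bits::U32).is_err());
        assert!(validate_page_num(u64::from(!0u32), Bits::U64).is_ok());
    }
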
    #[test]
    fn test_dump_32() -> MigrateResult<()> {
        let cwd = env::current_dir()?;
        let cwd = cwd.to_str().ok_or(MigrateError::StringConversionError)?;
        let test_env_path: PathBuf = [cwd, "tests", "envs", "ref_env_32"].iter().collect();

        // Dump data from the test env to a new dump file.
        let mut migrator = Migrator::new(&test_env_path)?;
        let mut new_dump_file = tempfile()?;
        migrator.dump(None, &new_dump_file)?;

        // Open the reference dump file.
        let ref_dump_file_path: PathBuf = [cwd, "tests", "envs", "ref_dump.txt"].iter().collect();
        let mut ref_dump_file = File::open(ref_dump_file_path)?;

        // Compare the new dump file to the reference dump file.
        compare_files(&mut ref_dump_file, &mut new_dump_file)?;

        Ok(())
    }

    #[test]
    fn test_dump_32_subdb() -> MigrateResult<()> {
        let cwd = env::current_dir()?;
        let cwd = cwd.to_str().ok_or(MigrateError::StringConversionError)?;
        let test_env_path: PathBuf = [cwd, "tests", "envs", "ref_env_32"].iter().collect();

        // Dump data from the test env to a new dump file.
        let mut migrator = Migrator::new(&test_env_path)?;
        let mut new_dump_file = tempfile()?;
        migrator.dump(Some("subdb"), &new_dump_file)?;

        // Open the reference dump file.
        let ref_dump_file_path: PathBuf = [cwd, "tests", "envs", "ref_dump_subdb.txt"].iter().collect();
        let mut ref_dump_file = File::open(ref_dump_file_path)?;

        // Compare the new dump file to the reference dump file.
        compare_files(&mut ref_dump_file, &mut new_dump_file)?;

        Ok(())
    }

    #[test]
    fn test_dump_64() -> MigrateResult<()> {
        let cwd = env::current_dir()?;
        let cwd = cwd.to_str().ok_or(MigrateError::StringConversionError)?;
        let test_env_path: PathBuf = [cwd, "tests", "envs", "ref_env_64"].iter().collect();

        // Dump data from the test env to a new dump file.
        let mut migrator = Migrator::new(&test_env_path)?;
        let mut new_dump_file = tempfile()?;
        migrator.dump(None, &new_dump_file)?;

        // Open the reference dump file.
        let ref_dump_file_path: PathBuf = [cwd, "tests", "envs", "ref_dump.txt"].iter().collect();
        let mut ref_dump_file = File::open(ref_dump_file_path)?;

        // Compare the new dump file to the reference dump file.
        compare_files(&mut ref_dump_file, &mut new_dump_file)?;

        Ok(())
    }

    #[test]
    fn test_dump_64_subdb() -> MigrateResult<()> {
        let cwd = env::current_dir()?;
        let cwd = cwd.to_str().ok_or(MigrateError::StringConversionError)?;
        let test_env_path: PathBuf = [cwd, "tests", "envs", "ref_env_64"].iter().collect();

        // Dump data from the test env to a new dump file.
        let mut migrator = Migrator::new(&test_env_path)?;
        let mut new_dump_file = tempfile()?;
        migrator.dump(Some("subdb"), &new_dump_file)?;

        // Open the reference dump file.
        let ref_dump_file_path: PathBuf = [cwd, "tests", "envs", "ref_dump_subdb.txt"].iter().collect();
        let mut ref_dump_file = File::open(ref_dump_file_path)?;

        // Compare the new dump file to the reference dump file.
        compare_files(&mut ref_dump_file, &mut new_dump_file)?;

        Ok(())
    }

    #[test]
    fn test_migrate_64() -> MigrateResult<()> {
        let cwd = env::current_dir()?;
        let cwd = cwd.to_str().ok_or(MigrateError::StringConversionError)?;
        let test_env_path: PathBuf = [cwd, "tests", "envs", "ref_env_64"].iter().collect();

        // Migrate data from the old env to a new one.
        let new_env = tempdir()?;
        let mut migrator = Migrator::new(&test_env_path)?;
        migrator.migrate(new_env.path())?;

        // Dump data from the new env to a new dump file.
        let mut migrator = Migrator::new(&new_env.path())?;
        let mut new_dump_file = tempfile()?;
        migrator.dump(Some("subdb"), &new_dump_file)?;

        // Open the reference dump file.
        let ref_dump_file_path: PathBuf = [cwd, "tests", "envs", "ref_dump_subdb.txt"].iter().collect();
        let mut ref_dump_file = File::open(ref_dump_file_path)?;

        // Compare the new dump file to the reference dump file.
        compare_files(&mut ref_dump_file, &mut new_dump_file)?;

        Ok(())
    }

    #[test]
    fn test_migrate_32() -> MigrateResult<()> {
        let cwd = env::current_dir()?;
        let cwd = cwd.to_str().ok_or(MigrateError::StringConversionError)?;
        let test_env_path: PathBuf = [cwd, "tests", "envs", "ref_env_32"].iter().collect();

        // Migrate data from the old env to a new one.
        let new_env = tempdir()?;
        let mut migrator = Migrator::new(&test_env_path)?;
        migrator.migrate(new_env.path())?;

        // Dump data from the new env to a new dump file.
        let mut migrator = Migrator::new(&new_env.path())?;
        let mut new_dump_file = tempfile()?;
        migrator.dump(Some("subdb"), &new_dump_file)?;

        // Open the reference dump file.
        let ref_dump_file_path: PathBuf = [cwd, "tests", "envs", "ref_dump_subdb.txt"].iter().collect();
        let mut ref_dump_file = File::open(ref_dump_file_path)?;

        // Compare the new dump file to the reference dump file.
        compare_files(&mut ref_dump_file, &mut new_dump_file)?;

        Ok(())
    }

    #[test]
    fn test_migrate_and_replace() -> MigrateResult<()> {
        // Pick the reference environment whose bit depth differs from the
        // current executable's, so LMDB can't open it directly and the
        // data has to be migrated to become readable.
        let test_env_name = match size_of::<usize>() {
            4 => "ref_env_64",
            8 => "ref_env_32",
            _ => panic!("only 32- and 64-bit depths are supported"),
        };

        let cwd = env::current_dir()?;
        let cwd = cwd.to_str().ok_or(MigrateError::StringConversionError)?;
        let test_env_path: PathBuf = [cwd, "tests", "envs", test_env_name].iter().collect();

        let old_env = tempdir()?;
        fs::copy(test_env_path.join("data.mdb"), old_env.path().join("data.mdb"))?;
        fs::copy(test_env_path.join("lock.mdb"), old_env.path().join("lock.mdb"))?;

        // Confirm that it isn't possible to open the old environment with LMDB.
        assert_eq!(
            match Environment::new().open(&old_env.path()) {
                Err(err) => err,
                _ => panic!("opening the environment should have failed"),
            },
            LmdbError::Invalid
        );

        // Migrate data from the old env to a new one.
        let new_env = tempdir()?;
        let mut migrator = Migrator::new(&old_env.path())?;
        migrator.migrate(new_env.path())?;

        // Dump data from the new env to a new dump file.
        let mut migrator = Migrator::new(&new_env.path())?;
        let mut new_dump_file = tempfile()?;
        migrator.dump(Some("subdb"), &new_dump_file)?;

        // Open the reference dump file.
        let ref_dump_file_path: PathBuf = [cwd, "tests", "envs", "ref_dump_subdb.txt"].iter().collect();
        let mut ref_dump_file = File::open(ref_dump_file_path)?;

        // Compare the new dump file to the reference dump file.
        compare_files(&mut ref_dump_file, &mut new_dump_file)?;

        // Overwrite the old env's files with the new env's files and confirm
        // that it's now possible to open the old env with LMDB.
        fs::copy(new_env.path().join("data.mdb"), old_env.path().join("data.mdb"))?;
        fs::copy(new_env.path().join("lock.mdb"), old_env.path().join("lock.mdb"))?;
        assert!(Environment::new().open(&old_env.path()).is_ok());

        Ok(())
    }
}