more tests and fix commit header

pull/19/head
Niko PLP 6 months ago
parent 5dac9c797a
commit e8fc8c477c
  1. 2
      p2p-net/src/broker.rs
  2. 2
      p2p-net/src/errors.rs
  3. 61
      p2p-repo/src/block.rs
  4. 1
      p2p-repo/src/branch.rs
  5. 85
      p2p-repo/src/commit.rs
  6. 22
      p2p-repo/src/errors.rs
  7. 60
      p2p-repo/src/file.rs
  8. 95
      p2p-repo/src/object.rs

@ -21,9 +21,9 @@ use either::Either;
use futures::channel::mpsc;
use futures::SinkExt;
use once_cell::sync::Lazy;
use p2p_repo::errors::ObjectParseError;
use p2p_repo::log::*;
use p2p_repo::object::Object;
use p2p_repo::object::ObjectParseError;
use p2p_repo::store::HashMapRepoStore;
use p2p_repo::types::*;
use p2p_repo::utils::generate_keypair;

@ -10,7 +10,7 @@
use core::fmt;
use num_enum::IntoPrimitive;
use num_enum::TryFromPrimitive;
use p2p_repo::object::ObjectParseError;
use p2p_repo::errors::ObjectParseError;
use p2p_repo::store::StorageError;
use std::convert::From;
use std::error::Error;

@ -11,8 +11,13 @@
//! Immutable Block
use crate::errors::*;
use crate::log::*;
use crate::types::*;
use chacha20::cipher::{KeyIvInit, StreamCipher};
use chacha20::ChaCha20;
impl BlockV0 {
pub fn new(
children: Vec<BlockId>,
@ -120,6 +125,10 @@ impl Block {
Block::V0(BlockV0::new_random_access(children, content, key))
}
pub fn new_with_encrypted_content(content: Vec<u8>, key: Option<SymKey>) -> Block {
Block::V0(BlockV0::new(vec![], None, content, key))
}
pub fn size(&self) -> usize {
serde_bare::to_vec(&self).unwrap().len()
}
@ -208,4 +217,56 @@ impl Block {
Block::V0(b) => b.key = key,
}
}
pub fn read(
&self,
key: &SymKey,
) -> Result<(Vec<(BlockId, BlockKey)>, Vec<u8>), ObjectParseError> {
match self {
Block::V0(b) => {
// decrypt content in place (this is why we have to clone first)
let mut content_dec = b.content.encrypted_content().clone();
match key {
SymKey::ChaCha20Key(key) => {
let nonce = [0u8; 12];
let mut cipher = ChaCha20::new(key.into(), &nonce.into());
let mut content_dec_slice = &mut content_dec.as_mut_slice();
cipher.apply_keystream(&mut content_dec_slice);
}
}
// deserialize content
let content: ChunkContentV0;
match serde_bare::from_slice(content_dec.as_slice()) {
Ok(c) => content = c,
Err(e) => {
log_debug!("Block deserialize error: {}", e);
return Err(ObjectParseError::BlockDeserializeError);
}
}
// parse content
match content {
ChunkContentV0::InternalNode(keys) => {
let b_children = b.children();
if keys.len() != b_children.len() {
log_debug!(
"Invalid keys length: got {}, expected {}",
keys.len(),
b_children.len()
);
log_debug!("!!! children: {:?}", b_children);
log_debug!("!!! keys: {:?}", keys);
return Err(ObjectParseError::InvalidKeys);
}
let mut children = Vec::with_capacity(b_children.len());
for (id, key) in b_children.iter().zip(keys.iter()) {
children.push((id.clone(), key.clone()));
}
Ok((children, vec![]))
}
ChunkContentV0::DataChunk(chunk) => Ok((vec![], chunk)),
}
}
}
}
}

@ -13,6 +13,7 @@ use std::collections::HashSet;
// use fastbloom_rs::{BloomFilter as Filter, Membership};
use crate::errors::*;
use crate::object::*;
use crate::store::*;
use crate::types::*;

@ -15,6 +15,8 @@ use once_cell::sync::OnceCell;
use crate::errors::NgError;
use crate::errors::*;
use crate::log::*;
use crate::object::*;
use crate::repo::Repo;
use crate::store::*;
@ -131,6 +133,7 @@ impl Commit {
) -> Result<ObjectRef, StorageError> {
match self {
Commit::V0(v0) => {
log_debug!("{:?}", v0.header);
let mut obj = Object::new(
ObjectContent::V0(ObjectContentV0::Commit(Commit::V0(v0.clone()))),
v0.header.clone(),
@ -140,7 +143,9 @@ impl Commit {
);
obj.save(store)?;
if let Some(h) = &mut v0.header {
h.set_id(obj.header().as_ref().unwrap().id().unwrap());
if let Some(id) = obj.header().as_ref().unwrap().id() {
h.set_id(*id);
}
}
self.set_id(obj.get_and_save_id());
self.set_key(obj.key().unwrap());
@ -1122,6 +1127,84 @@ mod test {
use crate::commit::*;
use crate::log::*;
fn test_commit_header_ref_content_fits(
obj_refs: Vec<BlockRef>,
metadata_size: usize,
expect_blocks_len: usize,
) {
let (priv_key, pub_key) = generate_keypair();
let seq = 3;
let obj_ref = ObjectRef::dummy();
let branch = pub_key;
let deps = obj_refs.clone();
let acks = obj_refs.clone();
let refs = obj_refs.clone();
let body_ref = obj_ref.clone();
let metadata = vec![66; metadata_size];
let mut commit = Commit::new(
priv_key,
pub_key,
seq,
branch,
QuorumType::NoSigning,
deps,
vec![],
acks.clone(),
vec![],
refs,
vec![],
metadata,
body_ref,
)
.unwrap();
log_debug!("{}", commit);
let max_object_size = 0;
let (store_repo, store_secret) = StoreRepo::dummy_public_v0();
let hashmap_storage = HashMapRepoStore::new();
let storage = Box::new(hashmap_storage);
let commit_ref = commit
.save(max_object_size, &store_repo, &store_secret, &storage)
.expect("save commit");
let commit_object = Object::load(commit_ref.id, Some(commit_ref.key), &storage)
.expect("load object from storage");
assert_eq!(
commit_object.acks(),
acks.iter().map(|a| a.id).collect::<Vec<ObjectId>>()
);
log_debug!("{}", commit_object);
log_debug!("object size: {}", commit_object.size());
assert_eq!(commit_object.all_blocks_len(), expect_blocks_len);
}
#[test]
pub fn test_commit_header_ref_content_fits_or_not() {
let obj_ref = ObjectRef::dummy();
let obj_refs2 = vec![obj_ref.clone(), obj_ref.clone()];
let obj_refs = vec![obj_ref.clone()];
// with 1 refs in header
test_commit_header_ref_content_fits(obj_refs.clone(), 3733, 2);
test_commit_header_ref_content_fits(obj_refs.clone(), 3734, 3);
test_commit_header_ref_content_fits(obj_refs.clone(), 3584, 1);
test_commit_header_ref_content_fits(obj_refs.clone(), 3585, 2);
// with 2 refs in header
test_commit_header_ref_content_fits(obj_refs2.clone(), 3352, 1);
test_commit_header_ref_content_fits(obj_refs2.clone(), 3353, 2);
test_commit_header_ref_content_fits(obj_refs2.clone(), 3601, 2);
test_commit_header_ref_content_fits(obj_refs2.clone(), 3602, 3);
}
#[test]
pub fn test_commit() {
let (priv_key, pub_key) = generate_keypair();

@ -10,6 +10,7 @@
//! Errors
use crate::commit::CommitLoadError;
use crate::types::BlockId;
use core::fmt;
use std::error::Error;
@ -52,3 +53,24 @@ impl From<CommitLoadError> for NgError {
NgError::RepoLoadError
}
}
/// Object parsing errors
#[derive(Debug)]
pub enum ObjectParseError {
/// Missing blocks
MissingBlocks(Vec<BlockId>),
/// Missing root key
MissingRootKey,
/// Invalid BlockId encountered in the tree
InvalidBlockId,
/// Too many or too few children of a block
InvalidChildren,
/// Number of keys does not match number of children of a block
InvalidKeys,
/// Invalid CommitHeader object content
InvalidHeader,
/// Error deserializing content of a block
BlockDeserializeError,
/// Error deserializing content of the object
ObjectDeserializeError,
}

@ -140,58 +140,6 @@ impl<'a> RandomAccessFile<'a> {
Ok((id, key))
}
fn read_block(
block: Block,
key: &SymKey,
) -> Result<(Vec<(BlockId, BlockKey)>, Vec<u8>), ObjectParseError> {
match block {
Block::V0(b) => {
// decrypt content in place (this is why we have to clone first)
let mut content_dec = b.content.encrypted_content().clone();
match key {
SymKey::ChaCha20Key(key) => {
let nonce = [0u8; 12];
let mut cipher = ChaCha20::new(key.into(), &nonce.into());
let mut content_dec_slice = &mut content_dec.as_mut_slice();
cipher.apply_keystream(&mut content_dec_slice);
}
}
// deserialize content
let content: ChunkContentV0;
match serde_bare::from_slice(content_dec.as_slice()) {
Ok(c) => content = c,
Err(e) => {
log_debug!("Block deserialize error: {}", e);
return Err(ObjectParseError::BlockDeserializeError);
}
}
// parse content
match content {
ChunkContentV0::InternalNode(keys) => {
let b_children = b.children();
if keys.len() != b_children.len() {
log_debug!(
"Invalid keys length: got {}, expected {}",
keys.len(),
b_children.len()
);
log_debug!("!!! children: {:?}", b_children);
log_debug!("!!! keys: {:?}", keys);
return Err(ObjectParseError::InvalidKeys);
}
let mut children = Vec::with_capacity(b_children.len());
for (id, key) in b_children.iter().zip(keys.iter()) {
children.push((id.clone(), key.clone()));
}
Ok((children, vec![]))
}
ChunkContentV0::DataChunk(chunk) => Ok((vec![], chunk)),
}
}
}
}
fn make_parent_block(
conv_key: &[u8; blake3::OUT_LEN],
children: Vec<(BlockId, BlockKey)>,
@ -525,7 +473,7 @@ impl<'a> RandomAccessFile<'a> {
return Err(FileError::BlockDeserializeError);
}
let (root_sub_blocks, _) = Self::read_block(root_block, &key)?;
let (root_sub_blocks, _) = root_block.read(&key)?;
// load meta object (first one in root block)
let meta_object = Object::load(
@ -571,7 +519,7 @@ impl<'a> RandomAccessFile<'a> {
let mut level_pos = pos;
for level in 0..depth {
let tree_block = self.storage.get(&current_block_id_key.0)?;
let (children, content) = Self::read_block(tree_block, &current_block_id_key.1)?;
let (children, content) = tree_block.read(&current_block_id_key.1)?;
if children.len() == 0 || content.len() > 0 {
return Err(FileError::BlockDeserializeError);
}
@ -588,7 +536,7 @@ impl<'a> RandomAccessFile<'a> {
let content_block = self.storage.get(&current_block_id_key.0)?;
//log_debug!("CONTENT BLOCK SIZE {}", content_block.size());
let (children, content) = Self::read_block(content_block, &current_block_id_key.1)?;
let (children, content) = content_block.read(&current_block_id_key.1)?;
if children.len() == 0 && content.len() > 0 {
//log_debug!("CONTENT SIZE {}", content.len());
@ -622,7 +570,7 @@ impl<'a> RandomAccessFile<'a> {
}
let block = &self.blocks[index];
let content_block = self.storage.get(&block.0)?;
let (children, content) = Self::read_block(content_block, &block.1)?;
let (children, content) = content_block.read(&block.1)?;
if children.len() == 0 && content.len() > 0 {
//log_debug!("CONTENT SIZE {}", content.len());

@ -12,11 +12,13 @@
//! Merkle hash tree of Objects
use core::fmt;
use std::cmp::max;
use std::collections::{HashMap, HashSet};
use chacha20::cipher::{KeyIvInit, StreamCipher};
use chacha20::ChaCha20;
use crate::errors::*;
use crate::log::*;
use crate::store::*;
use crate::types::*;
@ -56,34 +58,6 @@ pub struct Object {
already_saved: bool,
}
/// Object parsing errors
#[derive(Debug)]
pub enum ObjectParseError {
/// Missing blocks
MissingBlocks(Vec<BlockId>),
/// Missing root key
MissingRootKey,
/// Invalid BlockId encountered in the tree
InvalidBlockId,
/// Too many or too few children of a block
InvalidChildren,
/// Number of keys does not match number of children of a block
InvalidKeys,
/// Invalid CommitHeader object content
InvalidHeader,
/// Error deserializing content of a block
BlockDeserializeError,
/// Error deserializing content of the object
ObjectDeserializeError,
}
/// Object copy error
#[derive(Debug)]
pub enum ObjectCopyError {
NotFound,
ParseError,
}
impl Object {
pub(crate) fn convergence_key(
store_pubkey: &StoreRepo,
@ -313,7 +287,7 @@ impl Object {
"cannot make a new Object with header if ObjectContent type different from Commit"
);
}
log_debug!("header {:?}", header);
// create blocks by chunking + encrypting content
let valid_block_size = store_valid_value_size(block_size);
log_debug!("valid_block_size {}", valid_block_size);
@ -344,6 +318,7 @@ impl Object {
}
}
};
log_debug!("{:?} {:?}", header, header_prepare);
let content_ser = serde_bare::to_vec(&content).unwrap();
let content_len = content_ser.len();
@ -386,7 +361,7 @@ impl Object {
} else {
// chunk content and create leaf nodes
let mut i = 0;
let total = content_len / (valid_block_size - BLOCK_EXTRA);
let total = max(1, content_len / (valid_block_size - BLOCK_EXTRA));
for chunk in content_ser.chunks(valid_block_size - BLOCK_EXTRA) {
let data_chunk = ChunkContentV0::DataChunk(chunk.to_vec());
let chunk_ser = serde_bare::to_vec(&data_chunk).unwrap();
@ -419,6 +394,11 @@ impl Object {
};
if header_blocks.len() > 0 {
log_debug!(
"header_blocks.len() {} {}",
header_blocks.len(),
header_blocks.last().unwrap().id()
);
header
.as_mut()
.unwrap()
@ -508,7 +488,9 @@ impl Object {
}
}
CommitHeaderObject::EncryptedContent(content) => {
match serde_bare::from_slice(content.as_slice()) {
let (_, decrypted_content) =
Block::new_with_encrypted_content(content, None).read(&header_ref.key)?;
match serde_bare::from_slice(&decrypted_content) {
Ok(ObjectContent::V0(ObjectContentV0::CommitHeader(commit_header))) => {
(Some(commit_header), None)
}
@ -635,10 +617,22 @@ impl Object {
.map(|key| self.block_contents.get(key).unwrap())
}
pub fn all_blocks_len(&self) -> usize {
self.blocks.len() + self.header_blocks.len()
}
pub fn size(&self) -> usize {
let mut total = 0;
self.blocks().for_each(|b| total += b.size());
self.header_blocks.iter().for_each(|b| total += b.size());
self.blocks().for_each(|b| {
let s = b.size();
log_debug!("@@@@ {}", s);
total += s;
});
self.header_blocks.iter().for_each(|b| {
let s = b.size();
log_debug!("@@@@ {}", s);
total += s;
});
total
}
@ -863,6 +857,14 @@ impl fmt::Display for Object {
for block in &self.header_blocks {
writeln!(f, "========== {:03}: {}", i, block.id())?;
}
write!(
f,
"{}",
self.content().map_or_else(
|e| format!("Error on content: {:?}", e),
|c| format!("{}", c)
)
)?;
Ok(())
}
}
@ -888,26 +890,25 @@ impl ObjectContent {
impl fmt::Display for ObjectContent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let (version, content_type) = match self {
let (version, content) = match self {
Self::V0(v0) => (
"v0",
match v0 {
ObjectContentV0::Commit(_) => "Commit",
ObjectContentV0::CommitBody(_) => "CommitBody",
ObjectContentV0::CommitHeader(_) => "CommitHeader",
ObjectContentV0::Quorum(_) => "Quorum",
ObjectContentV0::Signature(_) => "Signature",
ObjectContentV0::Certificate(_) => "Certificate",
ObjectContentV0::File(_) => "File",
ObjectContentV0::RandomAccessFileMeta(_) => "RandomAccessFileMeta",
ObjectContentV0::Commit(c) => ("Commit", format!("{}", c)),
ObjectContentV0::CommitBody(c) => ("CommitBody", format!("{}", c)),
ObjectContentV0::CommitHeader(c) => ("CommitHeader", format!("{}", c)),
ObjectContentV0::Quorum(c) => ("Quorum", format!("{}", "")),
ObjectContentV0::Signature(c) => ("Signature", format!("{}", "")),
ObjectContentV0::Certificate(c) => ("Certificate", format!("{}", "")),
ObjectContentV0::File(c) => ("File", format!("{}", "")),
ObjectContentV0::RandomAccessFileMeta(c) => {
("RandomAccessFileMeta", format!("{}", ""))
}
},
),
};
writeln!(
f,
"====== ObjectContent {} {} ======",
version, content_type
)?;
writeln!(f, "====== ObjectContent {} {} ======", version, content.0)?;
write!(f, "{}", content.1)?;
Ok(())
}
}

Loading…
Cancel
Save