File common API for RandomAccessFile and SmallFile

Niko PLP 1 month ago
parent 68a4f1281e
commit ae5b5b81ec
  1. 2
      ng-app/src/lib/Test.svelte
  2. 6
      p2p-net/src/types.rs
  3. 296
      p2p-repo/src/commit.rs
  4. 311
      p2p-repo/src/file.rs
  5. 47
      p2p-repo/src/object.rs
  6. 28
      p2p-repo/src/types.rs

@ -118,7 +118,7 @@
{:then}
{#each $commits as commit}
<p>
{#await get_img(commit.V0.content.refs[0]) then url}
{#await get_img(commit.V0.header.V0.files[0]) then url}
{#if url}
<img src={url} />
{/if}

@ -1540,7 +1540,7 @@ pub enum UnsubReq {
}
/// Content of EventV0
/// Contains the objects of newly published Commit, its optional blocks, and optional refs and their blocks.
/// Contains the objects of newly published Commit, its optional blocks, and optional FILES and their blocks.
/// If a block is not present in the Event, its ID should be present in block_ids and the block should be put on the emitting broker beforehand with BlocksPut.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EventContentV0 {
@ -1555,7 +1555,7 @@ pub struct EventContentV0 {
pub seq: u64,
/// Blocks with encrypted content. First in the list is always the commit block followed by its children, then its optional header and body blocks (and eventual children),
/// blocks of the REFS are optional (only sent here if user specifically want to push them to the pub/sub).
/// blocks of the FILES are optional (only sent here if the user specifically wants to push them to the pub/sub).
/// the first in the list MUST contain a commit_header_key
/// When saved locally (the broker keeps the associated event, until the topic is refreshed(the last heads retain their events) ),
/// so, this `blocks` list is emptied (as the blocks are saved in the overlay storage anyway) and their IDs are kept on the side.
@ -1563,7 +1563,7 @@ pub struct EventContentV0 {
/// so that a valid EventContent can be sent (and so that its signature can be verified successfully)
pub blocks: Vec<Block>,
/// Ids of additional Blocks (REFS) with encrypted content that are not to be pushed in the pub/sub
/// Ids of additional Blocks (FILES) with encrypted content that are not to be pushed in the pub/sub
/// they will be retrieved later by interested users
pub block_ids: Vec<BlockId>,

@ -42,6 +42,7 @@ pub enum CommitLoadError {
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum CommitVerifyError {
InvalidSignature,
InvalidHeader,
PermissionDenied,
DepLoadError(CommitLoadError),
}
@ -58,12 +59,48 @@ impl CommitV0 {
ndeps: Vec<ObjectRef>,
acks: Vec<ObjectRef>,
nacks: Vec<ObjectRef>,
refs: Vec<ObjectRef>,
nrefs: Vec<ObjectRef>,
files: Vec<ObjectRef>,
nfiles: Vec<ObjectRef>,
metadata: Vec<u8>,
body: ObjectRef,
) -> Result<CommitV0, NgError> {
let headers = CommitHeader::new_with(deps, ndeps, acks, nacks, refs, nrefs);
let headers = CommitHeader::new_with(deps, ndeps, acks, nacks, files, nfiles);
let content = CommitContent::V0(CommitContentV0 {
perms: vec![],
author: (&author_pubkey).into(),
seq,
branch,
header_keys: headers.1,
quorum,
metadata,
body,
});
let content_ser = serde_bare::to_vec(&content).unwrap();
// sign commit
let sig = sign(&author_privkey, &author_pubkey, &content_ser)?;
Ok(CommitV0 {
content: content,
sig,
id: None,
key: None,
header: headers.0,
body: OnceCell::new(),
})
}
#[cfg(test)]
/// New commit with invalid header, only for test purposes
pub fn new_with_invalid_header(
author_privkey: PrivKey,
author_pubkey: PubKey,
seq: u64,
branch: BranchId,
quorum: QuorumType,
metadata: Vec<u8>,
body: ObjectRef,
) -> Result<CommitV0, NgError> {
let headers = CommitHeader::new_invalid();
let content = CommitContent::V0(CommitContentV0 {
perms: vec![],
author: (&author_pubkey).into(),
@ -101,8 +138,8 @@ impl Commit {
ndeps: Vec<ObjectRef>,
acks: Vec<ObjectRef>,
nacks: Vec<ObjectRef>,
refs: Vec<ObjectRef>,
nrefs: Vec<ObjectRef>,
files: Vec<ObjectRef>,
nfiles: Vec<ObjectRef>,
metadata: Vec<u8>,
body: ObjectRef,
) -> Result<Commit, NgError> {
@ -116,8 +153,8 @@ impl Commit {
ndeps,
acks,
nacks,
refs,
nrefs,
files,
nfiles,
metadata,
body,
)
@ -135,8 +172,8 @@ impl Commit {
ndeps: Vec<ObjectRef>,
acks: Vec<ObjectRef>,
nacks: Vec<ObjectRef>,
refs: Vec<ObjectRef>,
nrefs: Vec<ObjectRef>,
files: Vec<ObjectRef>,
nfiles: Vec<ObjectRef>,
metadata: Vec<u8>,
body: CommitBody,
block_size: usize,
@ -158,8 +195,8 @@ impl Commit {
ndeps,
acks,
nacks,
refs,
nrefs,
files,
nfiles,
metadata,
body_ref,
)
@ -463,7 +500,7 @@ impl Commit {
}
/// Get all commits that are in the direct causal past of the commit (`deps`, `acks`, `nacks`)
/// only returns objectRefs that have both an ID from header and a KEY from header_keys (it couldn't be otherwise)
/// only returns objectRefs that have both an ID from header and a KEY from header_keys (they all have a key)
pub fn direct_causal_past(&self) -> Vec<ObjectRef> {
let mut res: Vec<ObjectRef> = vec![];
match self {
@ -476,8 +513,10 @@ impl Commit {
res.push(nack.into());
}
for dep in header_v0.deps.iter().zip(hk_v0.deps.iter()) {
res.push(dep.into());
//TODO deal with deps that are also in acks. They should not be added twice
let obj_ref: ObjectRef = dep.into();
if !res.contains(&obj_ref) {
res.push(obj_ref);
}
}
}
_ => {}
@ -636,6 +675,9 @@ impl Commit {
/// Verify signature, permissions, and full causal past
pub fn verify(&self, repo: &Repo) -> Result<(), NgError> {
if !self.header().as_ref().map_or(true, |h| h.verify()) {
return Err(NgError::CommitVerifyError(CommitVerifyError::InvalidHeader));
}
self.verify_sig(repo)?;
self.verify_perm(repo)?;
self.verify_full_object_refs_of_branch_at_commit(repo.get_store())?;
@ -883,29 +925,29 @@ impl fmt::Display for CommitHeader {
v0.compact,
v0.id.map_or("None".to_string(), |i| format!("{}", i))
)?;
writeln!(f, "==== acks : {}", v0.acks.len())?;
writeln!(f, "==== acks : {}", v0.acks.len())?;
for ack in &v0.acks {
writeln!(f, "============== {}", ack)?;
}
writeln!(f, "==== nacks : {}", v0.nacks.len())?;
writeln!(f, "==== nacks : {}", v0.nacks.len())?;
for nack in &v0.nacks {
writeln!(f, "============== {}", nack)?;
}
writeln!(f, "==== deps : {}", v0.deps.len())?;
writeln!(f, "==== deps : {}", v0.deps.len())?;
for dep in &v0.deps {
writeln!(f, "============== {}", dep)?;
}
writeln!(f, "==== ndeps : {}", v0.ndeps.len())?;
writeln!(f, "==== ndeps : {}", v0.ndeps.len())?;
for ndep in &v0.ndeps {
writeln!(f, "============== {}", ndep)?;
}
writeln!(f, "==== refs : {}", v0.refs.len())?;
for rref in &v0.refs {
writeln!(f, "============== {}", rref)?;
writeln!(f, "==== files : {}", v0.files.len())?;
for file in &v0.files {
writeln!(f, "============== {}", file)?;
}
writeln!(f, "==== nrefs : {}", v0.nrefs.len())?;
for nref in &v0.nrefs {
writeln!(f, "============== {}", nref)?;
writeln!(f, "==== nfiles : {}", v0.nfiles.len())?;
for nfile in &v0.nfiles {
writeln!(f, "============== {}", nfile)?;
}
Ok(())
}
@ -956,29 +998,47 @@ impl CommitHeader {
}
}
pub fn verify(&self) -> bool {
match self {
CommitHeader::V0(v0) => v0.verify(),
}
}
pub fn new_with(
deps: Vec<ObjectRef>,
ndeps: Vec<ObjectRef>,
acks: Vec<ObjectRef>,
nacks: Vec<ObjectRef>,
refs: Vec<ObjectRef>,
nrefs: Vec<ObjectRef>,
files: Vec<ObjectRef>,
nfiles: Vec<ObjectRef>,
) -> (Option<Self>, Option<CommitHeaderKeys>) {
let res = CommitHeaderV0::new_with(deps, ndeps, acks, nacks, refs, nrefs);
let res = CommitHeaderV0::new_with(deps, ndeps, acks, nacks, files, nfiles);
(
res.0.map(|h| CommitHeader::V0(h)),
res.1.map(|h| CommitHeaderKeys::V0(h)),
)
}
/// Test-only constructor: takes the header/keys pair produced by
/// `CommitHeaderV0::new_invalid()` and wraps each half in its `V0` enum variant.
#[cfg(test)]
pub fn new_invalid() -> (Option<Self>, Option<CommitHeaderKeys>) {
    let (header_v0, keys_v0) = CommitHeaderV0::new_invalid();
    (
        header_v0.map(CommitHeader::V0),
        keys_v0.map(CommitHeaderKeys::V0),
    )
}
/// Test-only constructor: builds a `CommitHeaderV0` from `deps` via
/// `CommitHeaderV0::new_with_deps` and wraps it in `CommitHeader::V0`.
#[cfg(test)]
pub fn new_with_deps(deps: Vec<ObjectId>) -> Option<Self> {
    let header_v0 = CommitHeaderV0::new_with_deps(deps);
    header_v0.map(CommitHeader::V0)
}
/// Test-only constructor: builds a `CommitHeaderV0` from `deps` and `acks` via
/// `CommitHeaderV0::new_with_deps_and_acks` and wraps it in `CommitHeader::V0`.
#[cfg(test)]
pub fn new_with_deps_and_acks(deps: Vec<ObjectId>, acks: Vec<ObjectId>) -> Option<Self> {
    let header_v0 = CommitHeaderV0::new_with_deps_and_acks(deps, acks);
    header_v0.map(CommitHeader::V0)
}
/// Test-only constructor: builds a `CommitHeaderV0` from `acks` via
/// `CommitHeaderV0::new_with_acks` and wraps it in `CommitHeader::V0`.
#[cfg(test)]
pub fn new_with_acks(acks: Vec<ObjectId>) -> Option<Self> {
    let header_v0 = CommitHeaderV0::new_with_acks(acks);
    header_v0.map(CommitHeader::V0)
}
@ -993,11 +1053,62 @@ impl CommitHeaderV0 {
ndeps: vec![],
acks: vec![],
nacks: vec![],
refs: vec![],
nrefs: vec![],
files: vec![],
nfiles: vec![],
}
}
/// Test-only constructor for a header that `verify()` rejects: the same dummy
/// object id is placed in both `deps` and `ndeps`.
///
/// Returns the header together with a matching `CommitHeaderKeysV0` that
/// carries one dummy key for the single dep.
#[cfg(test)]
fn new_invalid() -> (Option<Self>, Option<CommitHeaderKeysV0>) {
    let conflicting_ids: Vec<ObjectId> = vec![ObjectId::dummy()];
    let dep_keys: Vec<ObjectKey> = vec![ObjectKey::dummy()];

    let header = Self {
        id: None,
        compact: false,
        // the same id on both sides is what makes the header invalid
        deps: conflicting_ids.clone(),
        ndeps: conflicting_ids,
        acks: vec![],
        nacks: vec![],
        files: vec![],
        nfiles: vec![],
    };
    let keys = CommitHeaderKeysV0 {
        deps: dep_keys,
        acks: vec![],
        nacks: vec![],
        files: vec![],
    };

    (Some(header), Some(keys))
}
/// Checks that the header does not contradict itself.
///
/// Returns `false` when an id is present in both lists of any of the pairs
/// `deps`/`ndeps`, `acks`/`nacks` or `files`/`nfiles`; returns `true` otherwise
/// (including when any of the lists is empty).
pub fn verify(&self) -> bool {
    // `any` on an empty iterator is false and `contains` on an empty list is
    // false, so no explicit emptiness guards are needed.
    let has_conflict = self.ndeps.iter().any(|id| self.deps.contains(id))
        || self.nacks.iter().any(|id| self.acks.contains(id))
        || self.nfiles.iter().any(|id| self.files.contains(id));
    !has_conflict
}
/// Sets the `compact` flag of this header to `true`.
pub fn set_compact(&mut self) {
self.compact = true;
}
@ -1007,15 +1118,15 @@ impl CommitHeaderV0 {
ndeps: Vec<ObjectRef>,
acks: Vec<ObjectRef>,
nacks: Vec<ObjectRef>,
refs: Vec<ObjectRef>,
nrefs: Vec<ObjectRef>,
files: Vec<ObjectRef>,
nfiles: Vec<ObjectRef>,
) -> (Option<Self>, Option<CommitHeaderKeysV0>) {
if deps.is_empty()
&& ndeps.is_empty()
&& acks.is_empty()
&& nacks.is_empty()
&& refs.is_empty()
&& nrefs.is_empty()
&& files.is_empty()
&& nfiles.is_empty()
{
(None, None)
} else {
@ -1023,8 +1134,8 @@ impl CommitHeaderV0 {
let mut indeps: Vec<ObjectId> = vec![];
let mut iacks: Vec<ObjectId> = vec![];
let mut inacks: Vec<ObjectId> = vec![];
let mut irefs: Vec<ObjectId> = vec![];
let mut inrefs: Vec<ObjectId> = vec![];
let mut ifiles: Vec<ObjectId> = vec![];
let mut infiles: Vec<ObjectId> = vec![];
let mut kdeps: Vec<ObjectKey> = vec![];
let mut kacks: Vec<ObjectKey> = vec![];
@ -1044,32 +1155,38 @@ impl CommitHeaderV0 {
inacks.push(d.id);
knacks.push(d.key);
}
for d in refs.clone() {
irefs.push(d.id);
for d in files.clone() {
ifiles.push(d.id);
}
for d in nfiles {
infiles.push(d.id);
}
for d in nrefs {
inrefs.push(d.id);
let res = Self {
id: None,
compact: false,
deps: ideps,
ndeps: indeps,
acks: iacks,
nacks: inacks,
files: ifiles,
nfiles: infiles,
};
if !res.verify() {
panic!("cannot create a header with conflicting references");
}
(
Some(Self {
id: None,
compact: false,
deps: ideps,
ndeps: indeps,
acks: iacks,
nacks: inacks,
refs: irefs,
nrefs: inrefs,
}),
Some(res),
Some(CommitHeaderKeysV0 {
deps: kdeps,
acks: kacks,
nacks: knacks,
refs,
files,
}),
)
}
}
#[cfg(test)]
pub fn new_with_deps(deps: Vec<ObjectId>) -> Option<Self> {
assert!(!deps.is_empty());
let mut n = Self::new_empty();
@ -1077,6 +1194,7 @@ impl CommitHeaderV0 {
Some(n)
}
#[cfg(test)]
pub fn new_with_deps_and_acks(deps: Vec<ObjectId>, acks: Vec<ObjectId>) -> Option<Self> {
assert!(!deps.is_empty() || !acks.is_empty());
let mut n = Self::new_empty();
@ -1085,6 +1203,7 @@ impl CommitHeaderV0 {
Some(n)
}
#[cfg(test)]
pub fn new_with_acks(acks: Vec<ObjectId>) -> Option<Self> {
assert!(!acks.is_empty());
let mut n = Self::new_empty();
@ -1208,21 +1327,21 @@ impl fmt::Display for CommitHeaderKeys {
match self {
Self::V0(v0) => {
writeln!(f, "=== CommitHeaderKeys V0 ===")?;
writeln!(f, "==== acks : {}", v0.acks.len())?;
writeln!(f, "==== acks : {}", v0.acks.len())?;
for ack in &v0.acks {
writeln!(f, "============== {}", ack)?;
}
writeln!(f, "==== nacks : {}", v0.nacks.len())?;
writeln!(f, "==== nacks : {}", v0.nacks.len())?;
for nack in &v0.nacks {
writeln!(f, "============== {}", nack)?;
}
writeln!(f, "==== deps : {}", v0.deps.len())?;
writeln!(f, "==== deps : {}", v0.deps.len())?;
for dep in &v0.deps {
writeln!(f, "============== {}", dep)?;
}
writeln!(f, "==== refs : {}", v0.refs.len())?;
for rref in &v0.refs {
writeln!(f, "============== {}", rref)?;
writeln!(f, "==== files : {}", v0.files.len())?;
for file in &v0.files {
writeln!(f, "============== {}", file)?;
}
}
}
@ -1262,7 +1381,7 @@ mod test {
let branch = pub_key;
let deps = obj_refs.clone();
let acks = obj_refs.clone();
let refs = obj_refs.clone();
let files = obj_refs.clone();
let body_ref = obj_ref.clone();
let metadata = vec![66; metadata_size];
@ -1277,7 +1396,7 @@ mod test {
vec![],
acks.clone(),
vec![],
refs,
files,
vec![],
metadata,
body_ref,
@ -1338,12 +1457,12 @@ mod test {
#[test]
pub fn test_load_commit_fails_on_non_commit_object() {
let file = File::V0(FileV0 {
let file = SmallFile::V0(SmallFileV0 {
content_type: "file/test".into(),
metadata: Vec::from("some meta data here"),
content: [(0..255).collect::<Vec<u8>>().as_slice(); 320].concat(),
});
let content = ObjectContent::V0(ObjectContentV0::File(file));
let content = ObjectContent::V0(ObjectContentV0::SmallFile(file));
let max_object_size = 0;
@ -1377,7 +1496,7 @@ mod test {
let obj_refs = vec![obj_ref.clone()];
let deps = obj_refs.clone();
let acks = obj_refs.clone();
let refs = obj_refs.clone();
let files = obj_refs.clone();
let metadata = Vec::from("some metadata");
@ -1404,7 +1523,7 @@ mod test {
vec![],
acks.clone(),
vec![],
refs,
files,
vec![],
metadata,
body,
@ -1434,7 +1553,7 @@ mod test {
let branch = pub_key;
let deps = obj_refs.clone();
let acks = obj_refs.clone();
let refs = obj_refs.clone();
let files = obj_refs.clone();
let metadata = vec![1, 2, 3];
let body_ref = obj_ref.clone();
@ -1448,7 +1567,7 @@ mod test {
vec![],
acks,
vec![],
refs,
files,
vec![],
metadata,
body_ref,
@ -1502,10 +1621,6 @@ mod test {
let obj_ref = ObjectRef::dummy();
let branch = pub_key;
let obj_refs = vec![obj_ref.clone()];
let deps = obj_refs.clone();
let acks = obj_refs.clone();
let refs = obj_refs.clone();
let metadata = Vec::from("some metadata");
@ -1520,7 +1635,7 @@ mod test {
let (store_repo, store_secret) = StoreRepo::dummy_public_v0();
let hashmap_storage = HashMapRepoStore::new();
let storage = Box::new(hashmap_storage);
let t = Test::storage(hashmap_storage);
let commit = Commit::new_with_body_and_save(
priv_key,
@ -1539,15 +1654,12 @@ mod test {
max_object_size,
&store_repo,
&store_secret,
&storage,
t.s(),
)
.expect("commit::new_with_body_and_save");
log_debug!("{}", commit);
let hashmap_storage = HashMapRepoStore::new();
let t = Test::storage(hashmap_storage);
let repo = Repo::new_with_member(&pub_key, &pub_key, &[PermissionV0::Create], t.s());
commit.load_body(repo.get_store()).expect("load body");
@ -1564,4 +1676,42 @@ mod test {
commit.verify(&repo).expect("verify");
}
/// A commit built with `CommitV0::new_with_invalid_header` must be rejected by
/// `Commit::verify` with `CommitVerifyError::InvalidHeader`.
#[test]
pub fn test_load_commit_with_invalid_header() {
    let (priv_key, pub_key) = generate_keypair();
    let seq = 3;
    let body_ref = ObjectRef::dummy();
    let branch = pub_key;
    let metadata = Vec::from("some metadata");

    let commit_v0 = CommitV0::new_with_invalid_header(
        priv_key,
        pub_key,
        seq,
        branch,
        QuorumType::NoSigning,
        metadata,
        body_ref,
    )
    .expect("commit::new_with_invalid_header");
    let commit = Commit::V0(commit_v0);
    log_debug!("{}", commit);

    let t = Test::storage(HashMapRepoStore::new());
    let repo = Repo::new_with_member(&pub_key, &pub_key, &[PermissionV0::Create], t.s());

    let expected: Result<(), NgError> =
        Err(NgError::CommitVerifyError(CommitVerifyError::InvalidHeader));
    assert_eq!(commit.verify(&repo), expected);
}
}

@ -49,6 +49,7 @@ pub enum FileError {
StorageError,
EndOfFile,
InvalidArgument,
NotAFile,
}
impl From<StorageError> for FileError {
@ -68,6 +69,67 @@ impl From<ObjectParseError> for FileError {
}
}
/// Common read API shared by every kind of file (`File`, `SmallFile`,
/// `SmallFileV0` and `RandomAccessFile` all implement it).
///
/// The trait is public so that `read` stays callable from outside this module:
/// the reading logic of `RandomAccessFile` lives in this trait, and `File`
/// exposes no other way to get at its content.
pub trait ReadFile {
    /// Reads up to `size` bytes starting at byte offset `pos`.
    ///
    /// Implementations differ in how much they return: `SmallFileV0` returns
    /// exactly `size` bytes or fails, whereas `RandomAccessFile` returns at most
    /// one block, so the result may be shorter than `size`. Callers should
    /// always check the length of the returned vector.
    ///
    /// # Errors
    /// The visible implementations return `FileError::InvalidArgument` when
    /// `size` is 0 and `FileError::EndOfFile` when the requested range lies
    /// beyond the end of the file.
    fn read(&self, pos: usize, size: usize) -> Result<Vec<u8>, FileError>;
}
/// A File in memory (read access only).
///
/// This is a thin wrapper that hides which concrete kind of file is being read:
/// `File::open` stores either a `RandomAccessFile` or a `SmallFile` in `internal`,
/// and `read` is forwarded to it.
pub struct File<'a> {
// the concrete reader selected by `File::open`
internal: Box<dyn ReadFile + 'a>,
}
impl<'a> File<'a> {
    /// Opens the file identified by `id` from `storage`, using `key` to read it.
    ///
    /// The root block is fetched first to decide which kind of file this is:
    /// * if it has exactly two children and its header object is
    ///   `CommitHeaderObject::RandomAccess`, the file is opened as a
    ///   `RandomAccessFile`;
    /// * otherwise it is loaded as an `Object`, which must contain a `SmallFile`.
    ///
    /// # Errors
    /// Returns `FileError::NotAFile` when the object holds anything other than
    /// a `SmallFile`. Errors from the storage, from `RandomAccessFile::open`,
    /// from `Object::load` and from `content_v0` are propagated.
    pub fn open(
        id: ObjectId,
        key: SymKey,
        storage: &'a Box<dyn RepoStore + Send + Sync + 'a>,
    ) -> Result<File<'a>, FileError> {
        let root_block = storage.get(&id)?;
        let is_random_access = root_block.children().len() == 2
            && *root_block.content().commit_header_obj() == CommitHeaderObject::RandomAccess;

        if is_random_access {
            let random_access_file = RandomAccessFile::open(id, key, storage)?;
            return Ok(File {
                internal: Box::new(random_access_file),
            });
        }

        let object = Object::load(id, Some(key), storage)?;
        match object.content_v0()? {
            ObjectContentV0::SmallFile(small_file) => Ok(File {
                internal: Box::new(small_file),
            }),
            _ => Err(FileError::NotAFile),
        }
    }
}
/// `File` implements `ReadFile` by forwarding to the concrete reader it wraps.
impl<'a> ReadFile for File<'a> {
/// Forwards the call unchanged to the wrapped reader (`internal`); the result,
/// including any error, is whatever that reader returns.
fn read(&self, pos: usize, size: usize) -> Result<Vec<u8>, FileError> {
self.internal.read(pos, size)
}
}
impl ReadFile for SmallFile {
    /// Dispatches on the version of the small file and forwards the read to
    /// the version-specific implementation.
    fn read(&self, pos: usize, size: usize) -> Result<Vec<u8>, FileError> {
        match self {
            Self::V0(small_file_v0) => small_file_v0.read(pos, size),
        }
    }
}
impl ReadFile for SmallFileV0 {
    /// Returns exactly `size` bytes of the file content, starting at `pos`.
    ///
    /// # Errors
    /// * `FileError::InvalidArgument` if `size` is 0.
    /// * `FileError::EndOfFile` if `pos + size` reaches beyond the end of the
    ///   content, including when `pos + size` does not fit in a `usize`.
    fn read(&self, pos: usize, size: usize) -> Result<Vec<u8>, FileError> {
        if size == 0 {
            return Err(FileError::InvalidArgument);
        }
        // `pos + size` may overflow for huge arguments; a range that cannot even
        // be represented is necessarily past the end of the content.
        let end = pos.checked_add(size).ok_or(FileError::EndOfFile)?;
        // `get` returns None instead of panicking when the range is out of bounds.
        self.content
            .get(pos..end)
            .map(|bytes| bytes.to_vec())
            .ok_or(FileError::EndOfFile)
    }
}
/// A RandomAccessFile in memory. This is not used to serialize data
pub struct RandomAccessFile<'a> {
//storage: Arc<&'a dyn RepoStore>,
@ -95,6 +157,92 @@ pub struct RandomAccessFile<'a> {
size: usize,
}
impl<'a> ReadFile for RandomAccessFile<'a> {
    /// Reads at most one block from the file. The returned vector should be tested for size:
    /// it might be smaller than what you asked for.
    /// `pos`ition can be anywhere in the file.
    ///
    /// Two cases are handled:
    /// * the file has been saved (`self.id` is set): the block tree is walked from
    ///   `content_block` down to the content block that holds `pos`;
    /// * the file has not been saved yet: `self.blocks` is used as a flat array of
    ///   content blocks, followed by the in-memory `remainder`.
    ///
    /// # Errors
    /// * `FileError::InvalidArgument` if `size` is 0.
    /// * `FileError::EndOfFile` if the requested range lies beyond the end of the file.
    /// * `FileError::BlockDeserializeError` if a block does not have the expected shape
    ///   (a tree block without children or with content, a content block with children
    ///   or without content).
    /// * Errors from the storage and from reading a block are propagated.
    fn read(&self, pos: usize, size: usize) -> Result<Vec<u8>, FileError> {
        if size == 0 {
            return Err(FileError::InvalidArgument);
        }
        if self.id.is_some() {
            // checked_add: a range whose end overflows usize is necessarily past the end
            let requested_end = pos.checked_add(size).ok_or(FileError::EndOfFile)?;
            if requested_end > self.meta.total_size() as usize {
                return Err(FileError::EndOfFile);
            }

            let mut current_block_id_key = self.content_block.as_ref().unwrap().clone();

            let depth = self.meta.depth();
            let arity = self.meta.arity();

            // offset of `pos` relative to the start of the subtree currently visited
            let mut level_pos = pos;
            for level in 0..depth {
                let tree_block = self.storage.get(&current_block_id_key.0)?;
                let (children, content) = tree_block.read(&current_block_id_key.1)?;
                if children.len() == 0 || content.len() > 0 {
                    return Err(FileError::BlockDeserializeError);
                }
                // number of bytes covered by each child of a tree block at this level
                let factor = (arity as usize).pow(depth as u32 - level as u32 - 1)
                    * self.meta.chunk_size() as usize;
                // The child index must be computed from the offset inside the current
                // subtree, not from the absolute position: below the first level the
                // absolute position would yield an index relative to the whole level
                // and wrongly report EndOfFile.
                let level_index = level_pos / factor;
                if level_index >= children.len() {
                    return Err(FileError::EndOfFile);
                }
                current_block_id_key = (children[level_index]).clone();
                level_pos %= factor;
            }

            let content_block = self.storage.get(&current_block_id_key.0)?;
            //log_debug!("CONTENT BLOCK SIZE {}", content_block.size());

            let (children, content) = content_block.read(&current_block_id_key.1)?;

            if children.len() == 0 && content.len() > 0 {
                //log_debug!("CONTENT SIZE {}", content.len());
                if level_pos >= content.len() {
                    return Err(FileError::EndOfFile);
                }
                // never read across the end of this content block
                let end = min(content.len(), level_pos.saturating_add(size));
                return Ok(content[level_pos..end].to_vec());
            } else {
                return Err(FileError::BlockDeserializeError);
            }
        } else {
            // hasn't been saved yet, we can use the self.blocks as a flat array and the remainder too
            let factor = self.meta.chunk_size() as usize;
            let index = pos / factor;
            let level_pos = pos % factor;
            // position at which the not-yet-chunked remainder starts
            let remainder_pos = self.blocks.len() * factor;
            if pos >= remainder_pos {
                let pos_in_remainder = pos - remainder_pos;
                if self.remainder.len() > 0 && pos_in_remainder < self.remainder.len() {
                    // saturating_add: `size` is caller-supplied and unchecked in this branch
                    let end = min(self.remainder.len(), pos_in_remainder.saturating_add(size));
                    return Ok(self.remainder[pos_in_remainder..end].to_vec());
                } else {
                    return Err(FileError::EndOfFile);
                }
            }
            //log_debug!("{} {} {} {}", index, self.blocks.len(), factor, level_pos);
            if index >= self.blocks.len() {
                return Err(FileError::EndOfFile);
            }
            let block = &self.blocks[index];
            let content_block = self.storage.get(&block.0)?;
            let (children, content) = content_block.read(&block.1)?;
            if children.len() == 0 && content.len() > 0 {
                //log_debug!("CONTENT SIZE {}", content.len());
                if level_pos >= content.len() {
                    return Err(FileError::EndOfFile);
                }
                // saturating_add: `size` is caller-supplied and unchecked in this branch
                let end = min(content.len(), level_pos.saturating_add(size));
                return Ok(content[level_pos..end].to_vec());
            } else {
                return Err(FileError::BlockDeserializeError);
            }
        }
    }
}
impl<'a> RandomAccessFile<'a> {
pub fn meta(&self) -> &RandomAccessFileMeta {
&self.meta
@ -501,90 +649,6 @@ impl<'a> RandomAccessFile<'a> {
})
}
/// reads at most one block from the file. the returned vector should be tested for size. it might be smaller than what you asked for.
/// `pos`ition can be anywhere in the file.
pub fn read(&self, pos: usize, size: usize) -> Result<Vec<u8>, FileError> {
if size == 0 {
return Err(FileError::InvalidArgument);
}
if self.id.is_some() {
if pos + size > self.meta.total_size() as usize {
return Err(FileError::EndOfFile);
}
let mut current_block_id_key = self.content_block.as_ref().unwrap().clone();
let depth = self.meta.depth();
let arity = self.meta.arity();
let mut level_pos = pos;
for level in 0..depth {
let tree_block = self.storage.get(&current_block_id_key.0)?;
let (children, content) = tree_block.read(&current_block_id_key.1)?;
if children.len() == 0 || content.len() > 0 {
return Err(FileError::BlockDeserializeError);
}
let factor = (arity as usize).pow(depth as u32 - level as u32 - 1)
* self.meta.chunk_size() as usize;
let level_index = pos / factor;
if level_index >= children.len() {
return Err(FileError::EndOfFile);
}
current_block_id_key = (children[level_index]).clone();
level_pos = pos as usize % factor;
}
let content_block = self.storage.get(&current_block_id_key.0)?;
//log_debug!("CONTENT BLOCK SIZE {}", content_block.size());
let (children, content) = content_block.read(&current_block_id_key.1)?;
if children.len() == 0 && content.len() > 0 {
//log_debug!("CONTENT SIZE {}", content.len());
if level_pos >= content.len() {
return Err(FileError::EndOfFile);
}
let end = min(content.len(), level_pos + size);
return Ok(content[level_pos..end].to_vec());
} else {
return Err(FileError::BlockDeserializeError);
}
} else {
// hasn't been saved yet, we can use the self.blocks as a flat array and the remainder too
let factor = self.meta.chunk_size() as usize;
let index = pos / factor as usize;
let level_pos = pos % factor as usize;
let remainder_pos = self.blocks.len() * factor;
if pos >= remainder_pos {
let pos_in_remainder = pos - remainder_pos;
if self.remainder.len() > 0 && pos_in_remainder < self.remainder.len() {
let end = min(self.remainder.len(), pos_in_remainder + size);
return Ok(self.remainder[pos_in_remainder..end].to_vec());
} else {
return Err(FileError::EndOfFile);
}
}
//log_debug!("{} {} {} {}", index, self.blocks.len(), factor, level_pos);
if index >= self.blocks.len() {
return Err(FileError::EndOfFile);
}
let block = &self.blocks[index];
let content_block = self.storage.get(&block.0)?;
let (children, content) = content_block.read(&block.1)?;
if children.len() == 0 && content.len() > 0 {
//log_debug!("CONTENT SIZE {}", content.len());
if level_pos >= content.len() {
return Err(FileError::EndOfFile);
}
let end = min(content.len(), level_pos + size);
return Ok(content[level_pos..end].to_vec());
} else {
return Err(FileError::BlockDeserializeError);
}
}
}
pub fn blocks(&self) -> impl Iterator<Item = Block> + '_ {
self.blocks
.iter()
@ -1375,6 +1439,85 @@ mod test {
assert_eq!(file2.read(29454, 0), Err(FileError::InvalidArgument));
}
/// Test read JPEG file small
#[test]
pub fn test_read_small_file() {
let f = std::fs::File::open("tests/test.jpg").expect("open of tests/test.jpg");
let mut reader = BufReader::new(f);
let mut img_buffer: Vec<u8> = Vec::new();
reader
.read_to_end(&mut img_buffer)
.expect("read of test.jpg");
let len = img_buffer.len();
let content = ObjectContent::new_file_v0_with_content(img_buffer.clone(), "image/jpeg");
let max_object_size = store_max_value_size();
let (store_repo, store_secret) = StoreRepo::dummy_public_v0();
let mut obj = Object::new(content, None, max_object_size, &store_repo, &store_secret);
log_debug!("{}", obj);
let hashmap_storage = HashMapRepoStore::new();
let t = Test::storage(hashmap_storage);
let _ = obj.save_in_test(t.s()).expect("save");
let file = File::open(obj.id(), obj.key().unwrap(), t.s()).expect("open");
let res = file.read(0, len).expect("read all");
assert_eq!(res, img_buffer);
}
/// Test read JPEG file random access
#[test]
pub fn test_read_random_access_file() {
let f = std::fs::File::open("tests/test.jpg").expect("open of tests/test.jpg");
let mut reader = BufReader::new(f);
let mut img_buffer: Vec<u8> = Vec::new();
reader
.read_to_end(&mut img_buffer)
.expect("read of test.jpg");
let len = img_buffer.len();
let max_object_size = store_max_value_size();
let (store_repo, store_secret) = StoreRepo::dummy_public_v0();
let hashmap_storage = HashMapRepoStore::new();
let t = Test::storage(hashmap_storage);
log_debug!("creating empty file");
let mut file: RandomAccessFile = RandomAccessFile::new_empty(
max_object_size,
"image/jpeg".to_string(),
vec![],
&store_repo,
&store_secret,
t.s(),
);
file.write(&img_buffer).expect("write all");
log_debug!("{}", file);
file.save().expect("save");
log_debug!("{}", file);
let file = File::open(
file.id().unwrap(),
file.key().as_ref().unwrap().clone(),
t.s(),
)
.expect("open");
// this only works because we chose a big block size (1MB) so the small JPG file files in one block.
// if not, we would have to call read repeatedly and append the results into a buffer, in order to get the full file
let res = file.read(0, len).expect("read all");
assert_eq!(res, img_buffer);
}
/// Test depth 4, but using write in increments, so the memory burden on the system will be minimal
#[test]
pub fn test_depth_4_big_write_small() {

@ -880,7 +880,7 @@ impl ObjectContent {
}
pub fn new_file_v0_with_content(content: Vec<u8>, content_type: &str) -> Self {
ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 {
ObjectContent::V0(ObjectContentV0::SmallFile(SmallFile::V0(SmallFileV0 {
content_type: content_type.into(),
metadata: vec![],
content,
@ -900,7 +900,7 @@ impl fmt::Display for ObjectContent {
ObjectContentV0::Quorum(c) => ("Quorum", format!("{}", "")),
ObjectContentV0::Signature(c) => ("Signature", format!("{}", "")),
ObjectContentV0::Certificate(c) => ("Certificate", format!("{}", "")),
ObjectContentV0::File(c) => ("File", format!("{}", "")),
ObjectContentV0::SmallFile(c) => ("SmallFile", format!("{}", "")),
ObjectContentV0::RandomAccessFileMeta(c) => {
("RandomAccessFileMeta", format!("{}", ""))
}
@ -941,12 +941,12 @@ mod test {
#[test]
#[should_panic]
pub fn test_no_header() {
let file = File::V0(FileV0 {
let file = SmallFile::V0(SmallFileV0 {
content_type: "image/jpeg".into(),
metadata: vec![],
content: vec![],
});
let content = ObjectContent::V0(ObjectContentV0::File(file));
let content = ObjectContent::V0(ObjectContentV0::SmallFile(file));
let (store_repo, store_secret) = StoreRepo::dummy_public_v0();
let header = CommitHeader::new_with_acks([ObjectId::dummy()].to_vec());
let _obj = Object::new(
@ -990,12 +990,12 @@ mod test {
/// Test tree API
#[test]
pub fn test_object() {
let file = File::V0(FileV0 {
let file = SmallFile::V0(SmallFileV0 {
content_type: "file/test".into(),
metadata: Vec::from("some meta data here"),
content: [(0..255).collect::<Vec<u8>>().as_slice(); 320].concat(),
});
let content = ObjectContent::V0(ObjectContentV0::File(file));
let content = ObjectContent::V0(ObjectContentV0::SmallFile(file));
let acks = vec![];
//let header = CommitHeader::new_with_acks(acks.clone());
@ -1058,15 +1058,16 @@ mod test {
pub fn test_depth_0() {
let (store_repo, store_secret) = StoreRepo::dummy_public_v0();
let empty_file = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 {
content_type: "".into(),
metadata: vec![],
content: vec![],
})));
let empty_file =
ObjectContent::V0(ObjectContentV0::SmallFile(SmallFile::V0(SmallFileV0 {
content_type: "".into(),
metadata: vec![],
content: vec![],
})));
let content_ser = serde_bare::to_vec(&empty_file).unwrap();
log_debug!("content len for empty : {}", content_ser.len());
// let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 {
// let content = ObjectContent::V0(ObjectContentV0::SmallFile(SmallFile::V0(SmallFileV0 {
// content_type: "".into(),
// metadata: vec![],
// content: vec![99; 1000],
@ -1074,7 +1075,7 @@ mod test {
// let content_ser = serde_bare::to_vec(&content).unwrap();
// log_debug!("content len for 1000 : {}", content_ser.len());
// let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 {
// let content = ObjectContent::V0(ObjectContentV0::SmallFile(SmallFile::V0(SmallFileV0 {
// content_type: "".into(),
// metadata: vec![],
// content: vec![99; 1048554],
@ -1082,7 +1083,7 @@ mod test {
// let content_ser = serde_bare::to_vec(&content).unwrap();
// log_debug!("content len for 1048554 : {}", content_ser.len());
// let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 {
// let content = ObjectContent::V0(ObjectContentV0::SmallFile(SmallFile::V0(SmallFileV0 {
// content_type: "".into(),
// metadata: vec![],
// content: vec![99; 1550000],
@ -1090,7 +1091,7 @@ mod test {
// let content_ser = serde_bare::to_vec(&content).unwrap();
// log_debug!("content len for 1550000 : {}", content_ser.len());
// let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 {
// let content = ObjectContent::V0(ObjectContentV0::SmallFile(SmallFile::V0(SmallFileV0 {
// content_type: "".into(),
// metadata: vec![],
// content: vec![99; 1550000000],
@ -1098,7 +1099,7 @@ mod test {
// let content_ser = serde_bare::to_vec(&content).unwrap();
// log_debug!("content len for 1550000000 : {}", content_ser.len());
// let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 {
// let content = ObjectContent::V0(ObjectContentV0::SmallFile(SmallFile::V0(SmallFileV0 {
// content_type: "".into(),
// metadata: vec![99; 1000],
// content: vec![99; 1000],
@ -1106,7 +1107,7 @@ mod test {
// let content_ser = serde_bare::to_vec(&content).unwrap();
// log_debug!("content len for 1000+1000: {}", content_ser.len());
// let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 {
// let content = ObjectContent::V0(ObjectContentV0::SmallFile(SmallFile::V0(SmallFileV0 {
// content_type: "".into(),
// metadata: vec![99; 1000],
// content: vec![99; 524277],
@ -1114,7 +1115,7 @@ mod test {
// let content_ser = serde_bare::to_vec(&content).unwrap();
// log_debug!("content len for 1000+524277: {}", content_ser.len());
// let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 {
// let content = ObjectContent::V0(ObjectContentV0::SmallFile(SmallFile::V0(SmallFileV0 {
// content_type: "".into(),
// metadata: vec![99; 524277],
// content: vec![99; 524277],
@ -1137,7 +1138,7 @@ mod test {
store_max_value_size() - empty_file_size - BLOCK_MAX_DATA_EXTRA - BIG_VARINT_EXTRA;
log_debug!("full file content size: {}", size);
let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 {
let content = ObjectContent::V0(ObjectContentV0::SmallFile(SmallFile::V0(SmallFileV0 {
content_type: "".into(),
metadata: vec![],
content: vec![99; size],
@ -1173,7 +1174,7 @@ mod test {
let (store_repo, store_secret) = StoreRepo::dummy_public_v0();
log_debug!("creating 16GB of data");
let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 {
let content = ObjectContent::V0(ObjectContentV0::SmallFile(SmallFile::V0(SmallFileV0 {
content_type: "".into(),
metadata: vec![],
content: vec![99; data_size],
@ -1216,7 +1217,7 @@ mod test {
let (store_repo, store_secret) = StoreRepo::dummy_public_v0();
log_debug!("creating 16GB of data");
let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 {
let content = ObjectContent::V0(ObjectContentV0::SmallFile(SmallFile::V0(SmallFileV0 {
content_type: "".into(),
metadata: vec![],
content: vec![99; data_size],
@ -1260,7 +1261,7 @@ mod test {
let (store_repo, store_secret) = StoreRepo::dummy_public_v0();
log_debug!("creating 900MB of data");
let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 {
let content = ObjectContent::V0(ObjectContentV0::SmallFile(SmallFile::V0(SmallFileV0 {
content_type: "".into(),
metadata: vec![],
content: vec![99; data_size],
@ -1318,7 +1319,7 @@ mod test {
let (store_repo, store_secret) = StoreRepo::dummy_public_v0();
log_debug!("creating 52GB of data");
let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 {
let content = ObjectContent::V0(ObjectContentV0::SmallFile(SmallFile::V0(SmallFileV0 {
content_type: "".into(),
metadata: vec![],
content: vec![99; data_size],

@ -627,7 +627,7 @@ pub struct CommitHeaderV0 {
#[serde(skip)]
pub id: Option<ObjectId>,
/// Other objects this commit strongly depends on (ex: ADD for a REMOVE, refs for an nrefs)
/// Other objects this commit strongly depends on (ex: ADD for a REMOVE, files for an nfiles)
pub deps: Vec<ObjectId>,
/// dependency that is removed after this commit. used for reverts
@ -645,11 +645,11 @@ pub struct CommitHeaderV0 {
pub nacks: Vec<ObjectId>,
/// list of Files that are referenced in this commit
pub refs: Vec<ObjectId>,
pub files: Vec<ObjectId>,
/// list of Files that are not referenced anymore after this commit
/// the commit(s) that created the refs should be in deps
pub nrefs: Vec<ObjectId>,
/// the commit(s) that created the files should be in deps
pub nfiles: Vec<ObjectId>,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
@ -659,7 +659,7 @@ pub enum CommitHeader {
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CommitHeaderKeysV0 {
/// Other objects this commit strongly depends on (ex: ADD for a REMOVE, refs for an nrefs)
/// Other objects this commit strongly depends on (ex: ADD for a REMOVE, files for an nfiles)
pub deps: Vec<ObjectKey>,
// ndeps keys are not included because we don't need the keys to access the commits we will not need anymore
@ -672,8 +672,8 @@ pub struct CommitHeaderKeysV0 {
/// list of Files that are referenced in this commit. Exceptionally this is an ObjectRef, because
/// even if the CommitHeader is omitted, we want the Files to be openable.
pub refs: Vec<ObjectRef>,
// nrefs keys are not included because we don't need the keys to access the files we will not need anymore
pub files: Vec<ObjectRef>,
// nfiles keys are not included because we don't need the keys to access the files we will not need anymore
// the keys are in the deps of the respective commits that added them anyway
}
@ -1268,7 +1268,7 @@ pub enum Transaction {
}
/// Add a new binary file in a branch
/// REFS: the file ObjectRef
/// FILES: the file ObjectRef
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct AddFileV0 {
/// an optional name. does not conflict (not unique across the branch nor repo)
@ -1286,7 +1286,7 @@ pub enum AddFile {
/// Remove a file from the branch, using ORset CRDT logic
/// (removes the ref counting. not necessarily the file itself)
/// NREFS: the file ObjectRef
/// NFILES: the file ObjectRef
/// DEPS: all the visible AddFile commits in the branch (ORset)
pub type RemoveFileV0 = ();
@ -1602,7 +1602,7 @@ pub struct CommitContentV0 {
/// optional list of dependencies on some commits in the root branch that contain the write permission needed for this commit
pub perms: Vec<ObjectId>,
/// Keys to be able to open all the references (deps, acks, refs, etc...)
/// Keys to be able to open all the references (deps, acks, files, etc...)
pub header_keys: Option<CommitHeaderKeys>,
/// This commit can only be accepted if signed by this quorum
@ -1671,7 +1671,7 @@ pub enum Commit {
/// File Object
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct FileV0 {
pub struct SmallFileV0 {
pub content_type: String,
#[serde(with = "serde_bytes")]
@ -1683,8 +1683,8 @@ pub struct FileV0 {
/// A file stored in an Object
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum File {
V0(FileV0),
pub enum SmallFile {
V0(SmallFileV0),
}
/// Random Access File Object
@ -1773,7 +1773,7 @@ pub enum ObjectContentV0 {
Quorum(Quorum),
Signature(Signature),
Certificate(Certificate),
File(File),
SmallFile(SmallFile),
RandomAccessFileMeta(RandomAccessFileMeta),
}

Loading…
Cancel
Save