From 9af14b47c099d1635d1a5fa55967fd892aa49e1f Mon Sep 17 00:00:00 2001 From: Niko PLP Date: Tue, 19 Mar 2024 08:39:12 +0200 Subject: [PATCH] refactor object/block DAG, CommitHeader, sync_req, test, rocksdb integration --- Cargo.lock | 102 +-- Cargo.toml | 1 - README.md | 9 +- ng-wallet/src/lib.rs | 2 +- ng-wallet/src/types.rs | 2 +- ngaccount/Cargo.toml | 1 - ngaccount/src/main.rs | 1 - ngd/src/main.rs | 4 +- p2p-broker/src/server.rs | 2 - p2p-net/src/errors.rs | 2 - p2p-net/src/types.rs | 16 +- p2p-net/src/utils.rs | 2 +- p2p-repo/src/block.rs | 4 + p2p-repo/src/branch.rs | 173 ++-- p2p-repo/src/commit.rs | 422 ++++++++-- p2p-repo/src/errors.rs | 2 - p2p-repo/src/object.rs | 1350 +++++++++++++++++++++++------- p2p-repo/src/store.rs | 27 +- p2p-repo/src/types.rs | 325 ++++--- stores-rocksdb/src/lib.rs | 4 +- stores-rocksdb/src/repo_store.rs | 162 ++-- 21 files changed, 1788 insertions(+), 825 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ee0e6a1..b921ab0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -538,15 +538,6 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" -[[package]] -name = "bincode" -version = "1.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" -dependencies = [ - "serde", -] - [[package]] name = "bindgen" version = "0.65.1" @@ -610,7 +601,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83fc15853171b33280f5614e77f5fa4debd33f51a86c44daa4ba3d759674c561" dependencies = [ "base64 0.13.1", - "uuid 1.3.4", + "uuid", ] [[package]] @@ -790,7 +781,7 @@ checksum = "d38f2da7a0a2c4ccf0065be06397cc26a81f4e528be095826eee9d4adbb8c60f" dependencies = [ "byteorder", "fnv", - "uuid 1.3.4", + "uuid", ] [[package]] @@ -2474,12 +2465,6 @@ dependencies = [ "png", ] -[[package]] -name = "id-arena" -version = "2.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25a2bc672d1148e28034f176e01fffebb08b35768468cc954630da77a1449005" - [[package]] name = "ident_case" version = "1.0.1" @@ -2825,27 +2810,6 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" -[[package]] -name = "lmdb-crypto-rs" -version = "0.14.0" -source = "git+https://git.nextgraph.org/NextGraph/lmdb-rs.git?branch=master#e70c3cae74e0d71b6a4e5c81b0c4909210a6d808" -dependencies = [ - "bitflags", - "byteorder", - "libc", - "lmdb-crypto-sys", -] - -[[package]] -name = "lmdb-crypto-sys" -version = "0.11.2" -source = "git+https://git.nextgraph.org/NextGraph/lmdb-rs.git?branch=master#e70c3cae74e0d71b6a4e5c81b0c4909210a6d808" -dependencies = [ - "cc", - "libc", - "pkg-config", -] - [[package]] name = "lock_api" version = "0.4.10" @@ -3233,7 +3197,6 @@ dependencies = [ "serde_bare", "serde_bytes", "serde_json", - "stores-lmdb", "tokio", "warp", "warp-embed", @@ -3551,15 +3514,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "ordered-float" -version = "3.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fc2dbde8f8a79f2102cc474ceb0ad68e3b80b85289ea62389b60e66777e4213" -dependencies = [ - "num-traits", -] - [[package]] name = "ordered-stream" version = "0.2.0" @@ -4390,28 +4344,6 @@ dependencies = [ "winreg 0.10.1", ] -[[package]] -name = "rkv" -version = "0.18.0" -source = 
"git+https://git.nextgraph.org/NextGraph/rkv.git?rev=c746abb443b7bb4541ebbef2b71e8d0f9eb39f6a#c746abb443b7bb4541ebbef2b71e8d0f9eb39f6a" -dependencies = [ - "arrayref", - "bincode", - "bitflags", - "byteorder", - "id-arena", - "lazy_static", - "lmdb-crypto-rs", - "log", - "ordered-float", - "paste", - "serde", - "serde_derive", - "thiserror", - "url", - "uuid 0.8.2", -] - [[package]] name = "rocksdb" version = "0.21.0" @@ -4965,18 +4897,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" -[[package]] -name = "stores-lmdb" -version = "0.1.0" -dependencies = [ - "hex", - "p2p-repo", - "rkv", - "serde", - "serde_bare", - "tempfile", -] - [[package]] name = "stores-rocksdb" version = "0.1.0" @@ -5148,7 +5068,7 @@ dependencies = [ "tao-macros", "unicode-segmentation", "url", - "uuid 1.3.4", + "uuid", "windows", "windows-implement", "x11-dl", @@ -5215,7 +5135,7 @@ dependencies = [ "tokio", "tray-icon", "url", - "uuid 1.3.4", + "uuid", "webkit2gtk", "webview2-com", "windows", @@ -5263,7 +5183,7 @@ dependencies = [ "thiserror", "time 0.3.23", "url", - "uuid 1.3.4", + "uuid", "walkdir", ] @@ -5308,7 +5228,7 @@ dependencies = [ "tauri-utils", "thiserror", "url", - "uuid 1.3.4", + "uuid", "windows", ] @@ -5327,7 +5247,7 @@ dependencies = [ "raw-window-handle", "tauri-runtime", "tauri-utils", - "uuid 1.3.4", + "uuid", "webkit2gtk", "webview2-com", "windows", @@ -5860,7 +5780,7 @@ checksum = "ae605c39dfbdec433798d4a8b03ffbac711dc51cdeb1ba5c725bdcaf24e464cc" dependencies = [ "blob-uuid", "lazy_static", - "uuid 1.3.4", + "uuid", ] [[package]] @@ -5897,12 +5817,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" -[[package]] -name = "uuid" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" - [[package]] name = "uuid" version = "1.3.4" diff --git a/Cargo.toml b/Cargo.toml index 24a4373..3b95229 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,6 @@ members = [ "p2p-broker", "p2p-client-ws", "p2p-verifier", - "stores-lmdb", "stores-rocksdb", "ngcli", "ngd", diff --git a/README.md b/README.md index aa1ea32..5554500 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,6 @@ The crates are organized as follow : - p2p-broker : the broker code (as server and core node) - p2p-client-ws : the client connecting to a broker with WebSocket, used by the apps and verifier - p2p-verifier : the code of the verifier -- stores-lmdb : lmdb backed stores (not used anymore) - stores-rocksdb : RocksDB backed stores. 
see [repo here](https://git.nextgraph.org/NextGraph/rust-rocksdb) - ngcli : CLI tool to manipulate the repos and administrate the server - ngd : binary executable of the daemon (that can run a broker, verifier and/or Rust services) @@ -82,19 +81,19 @@ For the web apps, see the [README](ng-app/README.md) Test all: ``` -cargo test --all --verbose -- --nocapture +cargo test --all --verbose -- --show-output --nocapture ``` Test a single module: ``` -cargo test --package p2p-repo --lib -- branch::test --nocapture +cargo test --package p2p-repo --lib -- branch::test --show-output --nocapture ``` Test end-to-end client and server: ``` -cargo test --package ngcli -- --nocapture +cargo test --package ngcli -- --show-output --nocapture ``` Test WASM websocket @@ -107,7 +106,7 @@ wasm-pack test --chrome --headless Test Rust websocket ``` -cargo test --package p2p-client-ws --lib -- remote_ws::test::test_ws --nocapture +cargo test --package p2p-client-ws --lib -- remote_ws::test::test_ws --show-output --nocapture ``` ### Build release binaries diff --git a/ng-wallet/src/lib.rs b/ng-wallet/src/lib.rs index 3679788..0d1c025 100644 --- a/ng-wallet/src/lib.rs +++ b/ng-wallet/src/lib.rs @@ -499,7 +499,7 @@ pub async fn connect_wallet( } let brokers = broker.unwrap(); let mut tried: Option<(String, String, String, Option, f64)> = None; - //TODO: on tauri (or forward in local broker, or CLI), prefer a BoxPublic to a Domain. Domain always comes first though, so we need to reorder the list + //TODO: on tauri (or forward in local broker, or CLI), prefer a Public to a Domain. Domain always comes first though, so we need to reorder the list //TODO: use site.bootstraps to order the list of brokerInfo. for broker_info in brokers { match broker_info { diff --git a/ng-wallet/src/types.rs b/ng-wallet/src/types.rs index 803f9e8..801e873 100644 --- a/ng-wallet/src/types.rs +++ b/ng-wallet/src/types.rs @@ -255,7 +255,7 @@ impl EncryptedWalletV0 { // Creating a new client let client = ClientV0::new_with_auto_open(self.personal_site); self.add_client(client.clone()); - let mut log = self.log.as_mut().unwrap(); + let log = self.log.as_mut().unwrap(); log.add(WalletOperation::SetClientV0(client.clone())); let (peer_id, nonce) = session.get_first_user_peer_nonce()?; Ok(( diff --git a/ngaccount/Cargo.toml b/ngaccount/Cargo.toml index 8115819..8f97b3b 100644 --- a/ngaccount/Cargo.toml +++ b/ngaccount/Cargo.toml @@ -14,7 +14,6 @@ warp-embed = "0.4" rust-embed = "6" log = "0.4" env_logger = "0.10" -stores-lmdb = { path = "../stores-lmdb" } p2p-repo = { path = "../p2p-repo", features = ["server_log_output"] } p2p-net = { path = "../p2p-net" } p2p-client-ws = { path = "../p2p-client-ws" } diff --git a/ngaccount/src/main.rs b/ngaccount/src/main.rs index be0ee36..d41020c 100644 --- a/ngaccount/src/main.rs +++ b/ngaccount/src/main.rs @@ -15,7 +15,6 @@ use duration_str::parse; use p2p_client_ws::remote_ws::ConnectionWebSocket; use p2p_net::actors::add_invitation::*; use p2p_net::broker::BROKER; -use p2p_repo::store::StorageError; use serde::{Deserialize, Serialize}; use warp::http::header::{HeaderMap, HeaderValue}; use warp::reply::Response; diff --git a/ngd/src/main.rs b/ngd/src/main.rs index ba043aa..9f85472 100644 --- a/ngd/src/main.rs +++ b/ngd/src/main.rs @@ -592,7 +592,7 @@ async fn main_inner() -> Result<(), ()> { } // --core - // core listeners always come after the domain ones, which is good as the first bootstrap in the list should be the domain (if there is also a core_with_clients that generates a BoxPublic bootstrap) + // 
core listeners always come after the domain ones, which is good as the first bootstrap in the list should be the domain (if there is also a core_with_clients that generates a Public bootstrap) if args.core.is_some() { let arg_value = parse_interface_and_port_for(args.core.as_ref().unwrap(), "--core", DEFAULT_PORT)?; @@ -924,7 +924,7 @@ async fn main_inner() -> Result<(), ()> { if is_private_ip(&bind.0) { BrokerServerTypeV0::BoxPrivate(vec![bind_addr]) } else if is_public_ip(&bind.0) { - BrokerServerTypeV0::BoxPublic(vec![bind_addr]) + BrokerServerTypeV0::Public(vec![bind_addr]) } else { log_err!("Invalid IP address given for --forward option. cannot start"); return Err(()); diff --git a/p2p-broker/src/server.rs b/p2p-broker/src/server.rs index b551f37..b926d40 100644 --- a/p2p-broker/src/server.rs +++ b/p2p-broker/src/server.rs @@ -36,8 +36,6 @@ use p2p_repo::store::RepoStore; use p2p_repo::store::StorageError; use p2p_repo::types::*; use p2p_repo::utils::*; -use stores_lmdb::broker_store::LmdbKCVStore; -use stores_lmdb::repo_store::LmdbRepoStore; #[derive(Debug, Eq, PartialEq, Clone)] pub enum BrokerError { diff --git a/p2p-net/src/errors.rs b/p2p-net/src/errors.rs index 4826d75..069fd60 100644 --- a/p2p-net/src/errors.rs +++ b/p2p-net/src/errors.rs @@ -1,7 +1,5 @@ // Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers // All rights reserved. -// This code is partly derived from work written by TG x Thoth from P2Pcollab. -// Copyright 2022 TG x Thoth // Licensed under the Apache License, Version 2.0 // // or the MIT license , diff --git a/p2p-net/src/types.rs b/p2p-net/src/types.rs index c1c64c4..7ad11b6 100644 --- a/p2p-net/src/types.rs +++ b/p2p-net/src/types.rs @@ -1,7 +1,5 @@ // Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers // All rights reserved. -// This code is partly derived from work written by TG x Thoth from P2Pcollab. -// Copyright 2022 TG x Thoth // Licensed under the Apache License, Version 2.0 // // or the MIT license , @@ -135,7 +133,7 @@ pub enum BrokerCore { pub enum BrokerServerTypeV0 { Localhost(u16), // optional port number BoxPrivate(Vec), - BoxPublic(Vec), + Public(Vec), BoxPublicDyn(Vec), // can be empty Domain(String), // accepts an optional trailing ":port" number //Core(Vec), @@ -279,7 +277,7 @@ impl BrokerServerV0 { /// set ipv6 only if the browser connected with a remote IPV6. always set ipv4 as a fallback (for now). 
pub async fn get_url_for_ngone(&self, ipv4: bool, ipv6: bool) -> Option { match &self.server_type { - BrokerServerTypeV0::BoxPublic(addrs) => { + BrokerServerTypeV0::Public(addrs) => { Self::app_ng_one_bootstrap_url_with_first_ipv6_or_ipv4( ipv4, ipv6, @@ -338,7 +336,7 @@ impl BrokerServerV0 { match &self.server_type { BrokerServerTypeV0::Localhost(_) => false, BrokerServerTypeV0::BoxPrivate(_) => false, - BrokerServerTypeV0::BoxPublic(_) => true, + BrokerServerTypeV0::Public(_) => true, BrokerServerTypeV0::BoxPublicDyn(_) => true, BrokerServerTypeV0::Domain(_) => true, } @@ -356,7 +354,7 @@ impl BrokerServerV0 { let location = location.as_ref().unwrap(); if location.starts_with(APP_NG_ONE_URL) { match &self.server_type { - BrokerServerTypeV0::BoxPublic(addrs) => { + BrokerServerTypeV0::Public(addrs) => { Some((APP_NG_ONE_WS_URL.to_string(), addrs.clone())) } BrokerServerTypeV0::BoxPublicDyn(addrs) => { @@ -425,7 +423,7 @@ impl BrokerServerV0 { //BrokerServerTypeV0::Core(_) => None, BrokerServerTypeV0::Localhost(port) => Some((local_ws_url(port), vec![])), BrokerServerTypeV0::BoxPrivate(addrs) => Some((String::new(), addrs.clone())), - BrokerServerTypeV0::BoxPublic(addrs) => Some((String::new(), addrs.clone())), + BrokerServerTypeV0::Public(addrs) => Some((String::new(), addrs.clone())), BrokerServerTypeV0::BoxPublicDyn(addrs) => { // let resp = reqwest::get(api_dyn_peer_url(&self.peer_id)).await; // if resp.is_ok() { @@ -1041,7 +1039,7 @@ impl ListenerV0 { let pub_addrs = self.accept_forward_for.get_public_bind_addresses(); //res.push(BrokerServerTypeV0::Core(pub_addrs.clone())); if !self.refuse_clients { - res.push(BrokerServerTypeV0::BoxPublic(pub_addrs)); + res.push(BrokerServerTypeV0::Public(pub_addrs)); } if self.accept_direct { res.push(BrokerServerTypeV0::BoxPrivate(addrs)); @@ -1083,7 +1081,7 @@ impl ListenerV0 { } else if self.if_type == InterfaceType::Public { //res.push(BrokerServerTypeV0::Core(addrs.clone())); if !self.refuse_clients { - res.push(BrokerServerTypeV0::BoxPublic(addrs)); + res.push(BrokerServerTypeV0::Public(addrs)); } } else if self.if_type == InterfaceType::Private { res.push(BrokerServerTypeV0::BoxPrivate(addrs)); diff --git a/p2p-net/src/utils.rs b/p2p-net/src/utils.rs index 4a5a511..0862f1c 100644 --- a/p2p-net/src/utils.rs +++ b/p2p-net/src/utils.rs @@ -67,7 +67,7 @@ pub fn decode_invitation_string(string: String) -> Option { pub fn check_is_local_url(bootstrap: &BrokerServerV0, location: &String) -> Option { if location.starts_with(APP_NG_ONE_URL) { match &bootstrap.server_type { - BrokerServerTypeV0::BoxPublic(_) | BrokerServerTypeV0::BoxPublicDyn(_) => { + BrokerServerTypeV0::Public(_) | BrokerServerTypeV0::BoxPublicDyn(_) => { return Some(APP_NG_ONE_WS_URL.to_string()); } _ => {} diff --git a/p2p-repo/src/block.rs b/p2p-repo/src/block.rs index 43dca77..10d1861 100644 --- a/p2p-repo/src/block.rs +++ b/p2p-repo/src/block.rs @@ -92,6 +92,10 @@ impl Block { Block::V0(BlockV0::new(children, header_ref, content, key)) } + pub fn size(&self) -> usize { + serde_bare::to_vec(&self).unwrap().len() + } + /// Compute the ID pub fn compute_id(&self) -> BlockId { match self { diff --git a/p2p-repo/src/branch.rs b/p2p-repo/src/branch.rs index 465e779..798973e 100644 --- a/p2p-repo/src/branch.rs +++ b/p2p-repo/src/branch.rs @@ -1,7 +1,5 @@ // Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers // All rights reserved. -// This code is partly derived from work written by TG x Thoth from P2Pcollab. 
-// Copyright 2022 TG x Thoth // Licensed under the Apache License, Version 2.0 // // or the MIT license , @@ -60,114 +58,97 @@ impl Branch { } /// Branch sync request from another peer + /// `target_heads` represents the list of heads the requester would like to reach. this list should not be empty. + /// if the requester doesn't know what to reach, the responder should fill this list with their own current local head. + /// `known_heads` represents the list of current heads at the requester replica at the moment of request. + /// an empty list means the requester has an empty branch locally /// /// Return ObjectIds to send pub fn sync_req( - our_heads: &[ObjectId], - their_heads: &[ObjectId], - their_filter: &BloomFilter, + target_heads: &[ObjectId], + known_heads: &[ObjectId], + //their_filter: &BloomFilter, store: &Box, ) -> Result, ObjectParseError> { //log_debug!(">> sync_req"); - //log_debug!(" our_heads: {:?}", our_heads); - //log_debug!(" their_heads: {:?}", their_heads); + //log_debug!(" target_heads: {:?}", target_heads); + //log_debug!(" known_heads: {:?}", known_heads); - /// Load `Commit` `Object`s of a `Branch` from the `RepoStore` starting from the given `Object`, - /// and collect `ObjectId`s starting from `our_heads` towards `their_heads` - fn load_branch( + /// Load causal past of a Commit `cobj` in a `Branch` from the `RepoStore`, + /// and collect in `visited` the ObjectIds encountered on the way, stopping at any commit already belonging to `theirs` or the root of DAG. + /// optionally collecting the missing objects/blocks that couldn't be found locally on the way + fn load_causal_past( cobj: &Object, store: &Box, - their_heads: &[ObjectId], + theirs: &HashSet, visited: &mut HashSet, - missing: &mut HashSet, - ) -> Result { + missing: &mut Option<&mut HashSet>, + ) -> Result<(), ObjectParseError> { let id = cobj.id(); - //log_debug!(">>> load_branch: {}", id); - // root has no acks - let is_root = cobj.is_root(); - //log_debug!(" acks: {:?}", cobj.acks()); - - // check if this commit object is present in their_heads - let mut their_head_found = their_heads.contains(&id); - - // load deps, stop at the root or if this is a commit object from their_heads - if !is_root && !their_head_found { + // check if this commit object is present in theirs or has already been visited in the current walk + // load deps, stop at the root(including it in visited) or if this is a commit object from known_heads + if !theirs.contains(&id) && !visited.contains(&id) { visited.insert(id); - for id in cobj.deps() { + for id in cobj.acks_and_nacks() { match Object::load(id, None, store) { Ok(o) => { - if !visited.contains(&id) { - if load_branch(&o, store, their_heads, visited, missing)? 
{ - their_head_found = true; - } - } + load_causal_past(&o, store, theirs, visited, missing)?; } - Err(ObjectParseError::MissingBlocks(m)) => { - missing.extend(m); + Err(ObjectParseError::MissingBlocks(blocks)) => { + missing.as_mut().map(|m| m.extend(blocks)); } Err(e) => return Err(e), } } } - Ok(their_head_found) + Ok(()) } - // missing commits from our branch - let mut missing = HashSet::new(); - // our commits - let mut ours = HashSet::new(); // their commits let mut theirs = HashSet::new(); - // collect all commits reachable from our_heads - // up to the root or until encountering a commit from their_heads - for id in our_heads { - let cobj = Object::load(*id, None, store)?; - let mut visited = HashSet::new(); - let their_head_found = - load_branch(&cobj, store, their_heads, &mut visited, &mut missing)?; - //log_debug!("<<< load_branch: {}", their_head_found); - ours.extend(visited); // add if one of their_heads found + // collect causal past of known_heads + for id in known_heads { + if let Ok(cobj) = Object::load(*id, None, store) { + load_causal_past(&cobj, store, &HashSet::new(), &mut theirs, &mut None)?; + } + // we silently discard any load error on the known_heads as the responder might not know them (yet). } - // collect all commits reachable from their_heads - for id in their_heads { - let cobj = Object::load(*id, None, store)?; - let mut visited = HashSet::new(); - let their_head_found = load_branch(&cobj, store, &[], &mut visited, &mut missing)?; - //log_debug!("<<< load_branch: {}", their_head_found); - theirs.extend(visited); // add if one of their_heads found + let mut visited = HashSet::new(); + // collect all commits reachable from target_heads + // up to the root or until encountering a commit from theirs + for id in target_heads { + if let Ok(cobj) = Object::load(*id, None, store) { + load_causal_past(&cobj, store, &theirs, &mut visited, &mut None)?; + } + // we silently discard any load error on the target_heads as they can be wrong if the requester is confused about what the responder has locally. } - let mut result = &ours - &theirs; - //log_debug!("!! ours: {:?}", ours); //log_debug!("!! theirs: {:?}", theirs); - //log_debug!("!! result: {:?}", result); // remove their_commits from result - let filter = Filter::from_u8_array(their_filter.f.as_slice(), their_filter.k.into()); - for id in result.clone() { - match id { - Digest::Blake3Digest32(d) => { - if filter.contains(&d) { - result.remove(&id); - } - } - } - } + // let filter = Filter::from_u8_array(their_filter.f.as_slice(), their_filter.k.into()); + // for id in result.clone() { + // match id { + // Digest::Blake3Digest32(d) => { + // if filter.contains(&d) { + // result.remove(&id); + // } + // } + // } + // } //log_debug!("!! 
result filtered: {:?}", result); - Ok(Vec::from_iter(result)) + Ok(Vec::from_iter(visited)) } } mod test { use std::collections::HashMap; - use ed25519_dalek::*; - use fastbloom_rs::{BloomFilter as Filter, FilterBuilder, Membership}; - use rand::rngs::OsRng; + //use fastbloom_rs::{BloomFilter as Filter, FilterBuilder, Membership}; use crate::branch::*; use crate::commit::*; @@ -175,6 +156,7 @@ mod test { use crate::repo; use crate::repo::Repo; use crate::store::*; + use crate::utils::*; #[test] pub fn test_branch() { @@ -286,36 +268,17 @@ mod test { } let store = Box::new(HashMapRepoStore::new()); - let mut rng = OsRng {}; // repo - let repo_keypair: Keypair = Keypair::generate(&mut rng); - log_debug!( - "repo private key: ({}) {:?}", - repo_keypair.secret.as_bytes().len(), - repo_keypair.secret.as_bytes() - ); - log_debug!( - "repo public key: ({}) {:?}", - repo_keypair.public.as_bytes().len(), - repo_keypair.public.as_bytes() - ); - let repo_privkey = PrivKey::Ed25519PrivKey(repo_keypair.secret.to_bytes()); - let repo_pubkey = PubKey::Ed25519PubKey(repo_keypair.public.to_bytes()); - let repo_secret = SymKey::ChaCha20Key([9; 32]); - let store_repo = StoreRepo::V0(StoreRepoV0::PublicStore(repo_pubkey)); + let (repo_privkey, repo_pubkey) = generate_keypair(); + let (store_repo, repo_secret) = StoreRepo::dummy_public_v0(); // branch - let branch_keypair: Keypair = Keypair::generate(&mut rng); - log_debug!("branch public key: {:?}", branch_keypair.public.as_bytes()); - let branch_pubkey = PubKey::Ed25519PubKey(branch_keypair.public.to_bytes()); + let (branch_privkey, branch_pubkey) = generate_keypair(); - let member_keypair: Keypair = Keypair::generate(&mut rng); - log_debug!("member public key: {:?}", member_keypair.public.as_bytes()); - let member_privkey = PrivKey::Ed25519PrivKey(member_keypair.secret.to_bytes()); - let member_pubkey = PubKey::Ed25519PubKey(member_keypair.public.to_bytes()); + let (member_privkey, member_pubkey) = generate_keypair(); let metadata = [66u8; 64].to_vec(); @@ -481,28 +444,28 @@ mod test { let mut c7 = Commit::load(a7.clone(), repo.get_store(), true).unwrap(); c7.verify(&repo).unwrap(); - let mut filter = Filter::new(FilterBuilder::new(10, 0.01)); - for commit_ref in [br, t1, t2, t5.clone(), a6.clone()] { - match commit_ref.id { - ObjectId::Blake3Digest32(d) => filter.add(&d), - } - } - let cfg = filter.config(); - let their_commits = BloomFilter { - k: cfg.hashes, - f: filter.get_u8_array().to_vec(), - }; + // let mut filter = Filter::new(FilterBuilder::new(10, 0.01)); + // for commit_ref in [br, t1, t2, t5.clone(), a6.clone()] { + // match commit_ref.id { + // ObjectId::Blake3Digest32(d) => filter.add(&d), + // } + // } + // let cfg = filter.config(); + // let their_commits = BloomFilter { + // k: cfg.hashes, + // f: filter.get_u8_array().to_vec(), + // }; print_branch(); log_debug!(">> sync_req"); log_debug!(" our_heads: [a3, t5, a6, a7]"); - log_debug!(" their_heads: [a3, t5]"); + log_debug!(" known_heads: [a3, t5]"); log_debug!(" their_commits: [br, t1, t2, a3, t5, a6]"); let ids = Branch::sync_req( &[t5.id, a6.id, a7.id], &[t5.id], - &their_commits, + //&their_commits, repo.get_store(), ) .unwrap(); diff --git a/p2p-repo/src/commit.rs b/p2p-repo/src/commit.rs index 864a8c1..4a05782 100644 --- a/p2p-repo/src/commit.rs +++ b/p2p-repo/src/commit.rs @@ -1,7 +1,5 @@ // Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers // All rights reserved. -// This code is partly derived from work written by TG x Thoth from P2Pcollab. 
-// Copyright 2022 TG x Thoth // Licensed under the Apache License, Version 2.0 // // or the MIT license , @@ -11,7 +9,8 @@ //! Commit -use ed25519_dalek::*; +use core::fmt; +//use ed25519_dalek::*; use once_cell::sync::OnceCell; use crate::errors::NgError; @@ -20,6 +19,7 @@ use crate::object::*; use crate::repo::Repo; use crate::store::*; use crate::types::*; +use crate::utils::*; use std::collections::HashSet; use std::iter::FromIterator; @@ -60,7 +60,7 @@ impl CommitV0 { nrefs: Vec, metadata: Vec, body: ObjectRef, - ) -> Result { + ) -> Result { let headers = CommitHeader::new_with(deps, ndeps, acks, nacks, refs, nrefs); let content = CommitContentV0 { perms: vec![], @@ -75,17 +75,7 @@ impl CommitV0 { let content_ser = serde_bare::to_vec(&content).unwrap(); // sign commit - let kp = match (author_privkey, author_pubkey) { - (PrivKey::Ed25519PrivKey(sk), PubKey::Ed25519PubKey(pk)) => [sk, pk].concat(), - (_, _) => panic!("cannot sign with Montgomery key"), - }; - let keypair = Keypair::from_bytes(kp.as_slice())?; - let sig_bytes = keypair.sign(content_ser.as_slice()).to_bytes(); - let mut it = sig_bytes.chunks_exact(32); - let mut ss: Ed25519Sig = [[0; 32], [0; 32]]; - ss[0].copy_from_slice(it.next().unwrap()); - ss[1].copy_from_slice(it.next().unwrap()); - let sig = Sig::Ed25519Sig(ss); + let sig = sign(&author_privkey, &author_pubkey, &content_ser)?; Ok(CommitV0 { content: CommitContent::V0(content), sig, @@ -113,7 +103,7 @@ impl Commit { nrefs: Vec, metadata: Vec, body: ObjectRef, - ) -> Result { + ) -> Result { CommitV0::new( author_privkey, author_pubkey, @@ -297,6 +287,13 @@ impl Commit { } } + /// Get commit content + pub fn content(&self) -> &CommitContent { + match self { + Commit::V0(CommitV0 { content: c, .. }) => c, + } + } + pub fn body(&self) -> Option<&CommitBody> { match self { Commit::V0(c) => c.body.get(), @@ -432,7 +429,7 @@ impl Commit { } /// Verify commit signature - pub fn verify_sig(&self) -> Result<(), SignatureError> { + pub fn verify_sig(&self) -> Result<(), NgError> { let c = match self { Commit::V0(c) => c, }; @@ -767,40 +764,377 @@ impl CommitBody { } } +impl fmt::Display for CommitHeader { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + CommitHeader::V0(v0) => { + writeln!( + f, + "v0 - compact:{} id:{}", + v0.compact, + v0.id.map_or("None".to_string(), |i| format!("{}", i)) + )?; + writeln!(f, "==== acks : {}", v0.acks.len())?; + for ack in &v0.acks { + writeln!(f, "============== {}", ack)?; + } + writeln!(f, "==== nacks : {}", v0.nacks.len())?; + for nack in &v0.nacks { + writeln!(f, "============== {}", nack)?; + } + writeln!(f, "==== deps : {}", v0.deps.len())?; + for dep in &v0.deps { + writeln!(f, "============== {}", dep)?; + } + writeln!(f, "==== ndeps : {}", v0.ndeps.len())?; + for ndep in &v0.ndeps { + writeln!(f, "============== {}", ndep)?; + } + writeln!(f, "==== refs : {}", v0.refs.len())?; + for rref in &v0.refs { + writeln!(f, "============== {}", rref)?; + } + writeln!(f, "==== nrefs : {}", v0.nrefs.len())?; + for nref in &v0.nrefs { + writeln!(f, "============== {}", nref)?; + } + Ok(()) + } + } + } +} + +impl CommitHeader { + pub fn is_root(&self) -> bool { + match self { + CommitHeader::V0(v0) => v0.is_root(), + } + } + pub fn deps(&self) -> Vec { + match self { + CommitHeader::V0(v0) => v0.deps.clone(), + } + } + pub fn acks(&self) -> Vec { + match self { + CommitHeader::V0(v0) => v0.acks.clone(), + } + } + pub fn acks_and_nacks(&self) -> Vec { + match self { + CommitHeader::V0(v0) => { + let mut res = 
v0.acks.clone(); + res.extend_from_slice(&v0.nacks); + res + } + } + } + pub fn id(&self) -> &Option { + match self { + CommitHeader::V0(v0) => &v0.id, + } + } + + pub fn set_id(&mut self, id: Digest) { + match self { + CommitHeader::V0(v0) => v0.id = Some(id), + } + } + + pub fn set_compact(&mut self) { + match self { + CommitHeader::V0(v0) => v0.set_compact(), + } + } + + pub fn new_with( + deps: Vec, + ndeps: Vec, + acks: Vec, + nacks: Vec, + refs: Vec, + nrefs: Vec, + ) -> (Option, Option) { + let res = CommitHeaderV0::new_with(deps, ndeps, acks, nacks, refs, nrefs); + ( + res.0.map(|h| CommitHeader::V0(h)), + res.1.map(|h| CommitHeaderKeys::V0(h)), + ) + } + + pub fn new_with_deps(deps: Vec) -> Option { + CommitHeaderV0::new_with_deps(deps).map(|ch| CommitHeader::V0(ch)) + } + + pub fn new_with_deps_and_acks(deps: Vec, acks: Vec) -> Option { + CommitHeaderV0::new_with_deps_and_acks(deps, acks).map(|ch| CommitHeader::V0(ch)) + } + + pub fn new_with_acks(acks: Vec) -> Option { + CommitHeaderV0::new_with_acks(acks).map(|ch| CommitHeader::V0(ch)) + } +} + +impl CommitHeaderV0 { + fn new_empty() -> Self { + Self { + id: None, + compact: false, + deps: vec![], + ndeps: vec![], + acks: vec![], + nacks: vec![], + refs: vec![], + nrefs: vec![], + } + } + + pub fn set_compact(&mut self) { + self.compact = true; + } + + pub fn new_with( + deps: Vec, + ndeps: Vec, + acks: Vec, + nacks: Vec, + refs: Vec, + nrefs: Vec, + ) -> (Option, Option) { + if deps.is_empty() + && ndeps.is_empty() + && acks.is_empty() + && nacks.is_empty() + && refs.is_empty() + && nrefs.is_empty() + { + (None, None) + } else { + let mut ideps: Vec = vec![]; + let mut indeps: Vec = vec![]; + let mut iacks: Vec = vec![]; + let mut inacks: Vec = vec![]; + let mut irefs: Vec = vec![]; + let mut inrefs: Vec = vec![]; + + let mut kdeps: Vec = vec![]; + let mut kacks: Vec = vec![]; + let mut knacks: Vec = vec![]; + for d in deps { + ideps.push(d.id); + kdeps.push(d.key); + } + for d in ndeps { + indeps.push(d.id); + } + for d in acks { + iacks.push(d.id); + kacks.push(d.key); + } + for d in nacks { + inacks.push(d.id); + knacks.push(d.key); + } + for d in refs.clone() { + irefs.push(d.id); + } + for d in nrefs { + inrefs.push(d.id); + } + ( + Some(Self { + id: None, + compact: false, + deps: ideps, + ndeps: indeps, + acks: iacks, + nacks: inacks, + refs: irefs, + nrefs: inrefs, + }), + Some(CommitHeaderKeysV0 { + deps: kdeps, + acks: kacks, + nacks: knacks, + refs, + }), + ) + } + } + pub fn new_with_deps(deps: Vec) -> Option { + assert!(!deps.is_empty()); + let mut n = Self::new_empty(); + n.deps = deps; + Some(n) + } + + pub fn new_with_deps_and_acks(deps: Vec, acks: Vec) -> Option { + assert!(!deps.is_empty() || !acks.is_empty()); + let mut n = Self::new_empty(); + n.deps = deps; + n.acks = acks; + Some(n) + } + + pub fn new_with_acks(acks: Vec) -> Option { + assert!(!acks.is_empty()); + let mut n = Self::new_empty(); + n.acks = acks; + Some(n) + } + + /// we do not check the deps because in a forked branch, they point to previous branch heads. 
+ pub fn is_root(&self) -> bool { + //self.deps.is_empty() + // && self.ndeps.is_empty() + self.acks.is_empty() && self.nacks.is_empty() + } +} + +impl fmt::Display for Commit { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::V0(v0) => { + writeln!(f, "====== Commit V0 ======")?; + if v0.id.is_some() { + writeln!(f, "== ID: {}", v0.id.as_ref().unwrap())?; + } + if v0.key.is_some() { + writeln!(f, "== Key: {}", v0.key.as_ref().unwrap())?; + } + if v0.header.is_some() { + write!(f, "== Header: {}", v0.header.as_ref().unwrap())?; + } + writeln!(f, "== Sig: {}", v0.sig)?; + write!(f, "{}", v0.content)?; + if v0.body.get().is_some() { + writeln!(f, "== Body: {}", v0.body.get().unwrap())?; + } + } + } + Ok(()) + } +} + +impl fmt::Display for CommitBody { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::V0(v0) => { + write!(f, "V0 ")?; + match v0 { + // + // for root branch: + // + CommitBodyV0::Repository(b) => writeln!(f, "Repository {}", b), + CommitBodyV0::RootBranch(b) => writeln!(f, "RootBranch {}", b), + _ => unimplemented!(), + /*UpdateRootBranch(RootBranch), // total order enforced with total_order_quorum + AddMember(AddMember), // total order enforced with total_order_quorum + RemoveMember(RemoveMember), // total order enforced with total_order_quorum + AddPermission(AddPermission), + RemovePermission(RemovePermission), + AddBranch(AddBranch), + ChangeMainBranch(ChangeMainBranch), + RemoveBranch(RemoveBranch), + AddName(AddName), + RemoveName(RemoveName), + // TODO? Quorum(Quorum), // changes the quorum without changing the RootBranch + + // + // For transactional branches: + // + Branch(Branch), // singleton and should be first in branch + UpdateBranch(Branch), // total order enforced with total_order_quorum + Snapshot(Snapshot), // a soft snapshot + AsyncTransaction(Transaction), // partial_order + SyncTransaction(Transaction), // total_order + AddFile(AddFile), + RemoveFile(RemoveFile), + Compact(Compact), // a hard snapshot. 
total order enforced with total_order_quorum + //Merge(Merge), + //Revert(Revert), // only possible on partial order commit + AsyncSignature(AsyncSignature), + + // + // For both + // + RefreshReadCap(RefreshReadCap), + RefreshWriteCap(RefreshWriteCap), + SyncSignature(SyncSignature),*/ + } + } + } + } +} + +impl fmt::Display for CommitContent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::V0(v0) => { + writeln!(f, "=== CommitContent V0 ===")?; + writeln!(f, "====== author: {}", v0.author)?; + writeln!(f, "====== seq: {}", v0.seq)?; + writeln!(f, "====== BranchID: {}", v0.branch)?; + writeln!(f, "====== quorum: {:?}", v0.quorum)?; + writeln!(f, "====== Ref body: {}", v0.body)?; + if v0.header_keys.is_none() { + writeln!(f, "====== header keys: None")?; + } else { + write!(f, "{}", v0.header_keys.as_ref().unwrap())?; + } + writeln!(f, "====== Perms commits: {}", v0.perms.len())?; + let mut i = 0; + for block in &v0.perms { + writeln!(f, "========== {:03}: {}", i, block)?; + i += 1; + } + } + } + Ok(()) + } +} + +impl fmt::Display for CommitHeaderKeys { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::V0(v0) => { + writeln!(f, "=== CommitHeaderKeys V0 ===")?; + writeln!(f, "==== acks : {}", v0.acks.len())?; + for ack in &v0.acks { + writeln!(f, "============== {}", ack)?; + } + writeln!(f, "==== nacks : {}", v0.nacks.len())?; + for nack in &v0.nacks { + writeln!(f, "============== {}", nack)?; + } + writeln!(f, "==== deps : {}", v0.deps.len())?; + for dep in &v0.deps { + writeln!(f, "============== {}", dep)?; + } + writeln!(f, "==== refs : {}", v0.refs.len())?; + for rref in &v0.refs { + writeln!(f, "============== {}", rref)?; + } + } + } + Ok(()) + } +} + mod test { use std::collections::HashMap; - use ed25519_dalek::*; - use rand::rngs::OsRng; - use crate::branch::*; use crate::commit::*; use crate::store::*; use crate::types::*; + use crate::utils::*; #[test] pub fn test_commit() { - let mut csprng = OsRng {}; - let keypair: Keypair = Keypair::generate(&mut csprng); - log_debug!( - "private key: ({}) {:?}", - keypair.secret.as_bytes().len(), - keypair.secret.as_bytes() - ); - log_debug!( - "public key: ({}) {:?}", - keypair.public.as_bytes().len(), - keypair.public.as_bytes() - ); - let ed_priv_key = keypair.secret.to_bytes(); - let ed_pub_key = keypair.public.to_bytes(); - let priv_key = PrivKey::Ed25519PrivKey(ed_priv_key); - let pub_key = PubKey::Ed25519PubKey(ed_pub_key); + let (priv_key, pub_key) = generate_keypair(); let seq = 3; - let obj_ref = ObjectRef { - id: ObjectId::Blake3Digest32([1; 32]), - key: SymKey::ChaCha20Key([2; 32]), - }; + let obj_ref = ObjectRef::dummy(); let obj_refs = vec![obj_ref.clone()]; let branch = pub_key; let deps = obj_refs.clone(); @@ -825,15 +1159,12 @@ mod test { body_ref, ) .unwrap(); - log_debug!("commit: {:?}", commit); + log_debug!("{}", commit); let store = Box::new(HashMapRepoStore::new()); let repo = Repo::new_with_member(&pub_key, &pub_key, &[PermissionV0::WriteAsync], store); - //let body = CommitBody::Ack(Ack::V0()); - //log_debug!("body: {:?}", body); - match commit.load_body(repo.get_store()) { Ok(_b) => panic!("Body should not exist"), Err(CommitLoadError::MissingBlocks(missing)) => { @@ -842,9 +1173,6 @@ mod test { Err(e) => panic!("Commit verify error: {:?}", e), } - let content = commit.content_v0(); - log_debug!("content: {:?}", content); - commit.verify_sig().expect("Invalid signature"); commit.verify_perm(&repo).expect("Permission denied"); diff --git 
a/p2p-repo/src/errors.rs b/p2p-repo/src/errors.rs index efe97ad..df071e9 100644 --- a/p2p-repo/src/errors.rs +++ b/p2p-repo/src/errors.rs @@ -1,7 +1,5 @@ // Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers // All rights reserved. -// This code is partly derived from work written by TG x Thoth from P2Pcollab. -// Copyright 2022 TG x Thoth // Licensed under the Apache License, Version 2.0 // // or the MIT license , diff --git a/p2p-repo/src/object.rs b/p2p-repo/src/object.rs index e2e0af6..a8928b9 100644 --- a/p2p-repo/src/object.rs +++ b/p2p-repo/src/object.rs @@ -11,6 +11,7 @@ //! Merkle hash tree of Objects +use core::fmt; use std::collections::{HashMap, HashSet}; use chacha20::cipher::{KeyIvInit, StreamCipher}; @@ -20,34 +21,30 @@ use crate::log::*; use crate::store::*; use crate::types::*; -// TODO: review all those constants. they were done for LMDB but we now use RocksDB. -// Also, the format of Blocks have changed so all should be checked. +const BLOCK_EXTRA: usize = 12; // 8 is the smallest extra + BLOCK_MAX_DATA_EXTRA +const HEADER_REF_EXTRA: usize = 66; +const HEADER_EMBED_EXTRA: usize = 34; +const CHILD_SIZE: usize = 66; -/// Size of a serialized empty Block -const EMPTY_BLOCK_SIZE: usize = 12 + 1; -/// Max size of an embedded CommitHeader -const MAX_EMBEDDED_COMMIT_HEADER_SIZE: usize = 100; -/// Size of a serialized BlockId const BLOCK_ID_SIZE: usize = 33; /// Size of serialized SymKey const BLOCK_KEY_SIZE: usize = 33; /// Size of serialized Object with deps reference. -const EMPTY_ROOT_SIZE_DEPSREF: usize = 77; -/// Extra size needed if depsRef used instead of deps list. -const DEPSREF_OVERLOAD: usize = EMPTY_ROOT_SIZE_DEPSREF - EMPTY_BLOCK_SIZE; -/// Varint extra bytes when reaching the maximum value we will ever use -const BIG_VARINT_EXTRA: usize = 3; +/// Varint extra bytes when reaching the maximum value we will ever use in one block +const BIG_VARINT_EXTRA: usize = 2; /// Varint extra bytes when reaching the maximum size of data byte arrays. const DATA_VARINT_EXTRA: usize = 4; -// Max extra space used by the deps list -//const MAX_DEPS_SIZE: usize = 8 * BLOCK_ID_SIZE; -const MAX_HEADER_SIZE: usize = BLOCK_ID_SIZE; + +const BLOCK_MAX_DATA_EXTRA: usize = 4; #[derive(Debug)] /// An Object in memory. 
This is not used to serialize data pub struct Object { + /// keeps the deduplicated blocks of the Object + block_contents: HashMap, + /// Blocks of the Object (nodes of the tree) - blocks: Vec, + blocks: Vec, /// Header header: Option, @@ -102,24 +99,30 @@ impl Object { } fn make_block( - content: &[u8], + mut content: Vec, conv_key: &[u8; blake3::OUT_LEN], children: Vec, header_ref: Option, - ) -> Block { - let key_hash = blake3::keyed_hash(conv_key, content); + already_existing: &mut HashMap, + ) -> Result { + let key_hash = blake3::keyed_hash(conv_key, &content); + + let key_slice = key_hash.as_bytes(); + let key = SymKey::ChaCha20Key(key_slice.clone()); + let it = already_existing.get(&key); + if it.is_some() { + return Err(*it.unwrap()); + } let nonce = [0u8; 12]; - let key = key_hash.as_bytes(); - let mut cipher = ChaCha20::new(key.into(), &nonce.into()); - let mut content_enc = Vec::from(content); - let mut content_enc_slice = &mut content_enc.as_mut_slice(); + let mut cipher = ChaCha20::new(key_slice.into(), &nonce.into()); + //let mut content_enc = Vec::from(content); + let mut content_enc_slice = &mut content.as_mut_slice(); cipher.apply_keystream(&mut content_enc_slice); - let key = SymKey::ChaCha20Key(key.clone()); - let block = Block::new(children, header_ref, content_enc, Some(key)); - //log_debug!(">>> make_block:"); - //log_debug!("!! id: {:?}", obj.id()); + + let block = Block::new(children, header_ref, content, Some(key)); + //log_debug!(">>> make_block: {}", block.id()); //log_debug!("!! children: ({}) {:?}", children.len(), children); - block + Ok(block) } fn make_header_v0( @@ -139,7 +142,7 @@ impl Object { id: header_obj.id(), key: header_obj.key().unwrap(), }; - (header_ref, header_obj.blocks) + (header_ref, header_obj.blocks().cloned().collect()) } fn make_header( @@ -153,42 +156,131 @@ impl Object { } } - /// Build tree from leaves, returns parent nodes + /// Build tree from leaves, returns parent nodes and optional header blocks fn make_tree( - leaves: &[Block], + block_contents: &mut HashMap, + already_existing: &mut HashMap, + leaves: &[BlockId], conv_key: &ChaCha20Key, - mut header_ref: Option, + header_prepare_size: usize, + mut header_prepare_block_ref: Option, + mut header_prepare_blocks: Vec, + valid_block_size: usize, arity: usize, - ) -> Vec { - let mut parents = vec![]; + ) -> (Vec, Vec) { + let mut parents: Vec = vec![]; + let mut header_blocks = vec![]; let chunks = leaves.chunks(arity); let mut it = chunks.peekable(); while let Some(nodes) = it.next() { - let keys = nodes.iter().map(|block| block.key().unwrap()).collect(); - let children = nodes.iter().map(|block| block.id()).collect(); + let children = nodes.to_vec(); + let keys: Vec = nodes + .iter() + .map(|block_id| block_contents.get(block_id).unwrap().key().unwrap()) + .collect(); let content = ChunkContentV0::InternalNode(keys); let content_ser = serde_bare::to_vec(&content).unwrap(); //let child_header = None; let header = if parents.is_empty() && it.peek().is_none() { - header_ref.take() + let mut header_prepare_blocks_taken = vec![]; + header_prepare_blocks_taken.append(&mut header_prepare_blocks); + match ( + header_prepare_size, + header_prepare_block_ref.take(), + header_prepare_blocks_taken, + ) { + (0, None, _) => None, + (header_size, Some(block_ref), blocks) => { + let is_embeddable = header_size > 0 + && ((valid_block_size + - BLOCK_EXTRA + - HEADER_EMBED_EXTRA + - header_size) + / CHILD_SIZE) + >= children.len(); + let (header_r, mut h_blocks) = + 
Self::make_header_ref(is_embeddable, block_ref, blocks); + header_blocks.append(&mut h_blocks); + header_r + } + (_, None, _) => unimplemented!(), + } + //header_ref.take() } else { None }; - parents.push(Self::make_block( - content_ser.as_slice(), - conv_key, - children, - header, - )); + Self::add_block( + Self::make_block(content_ser, conv_key, children, header, already_existing), + &mut parents, + block_contents, + already_existing, + ); } //log_debug!("parents += {}", parents.len()); if 1 < parents.len() { - let mut great_parents = - Self::make_tree(parents.as_slice(), conv_key, header_ref, arity); - parents.append(&mut great_parents); + let mut great_parents = Self::make_tree( + block_contents, + already_existing, + parents.as_slice(), + conv_key, + header_prepare_size, + header_prepare_block_ref, + header_prepare_blocks, + valid_block_size, + arity, + ); + parents.append(&mut great_parents.0); + header_blocks.append(&mut great_parents.1); + } + (parents, header_blocks) + } + + fn make_header_ref( + embedded: bool, + header_ref: BlockRef, + blocks: Vec, + ) -> (Option, Vec) { + if embedded { + ( + Some(CommitHeaderRef { + obj: CommitHeaderObject::EncryptedContent( + blocks[0].encrypted_content().to_vec(), + ), + key: header_ref.key, + }), + vec![], + ) + } else { + ( + Some(CommitHeaderRef { + obj: CommitHeaderObject::Id(header_ref.id), + key: header_ref.key, + }), + blocks, + ) + } + } + + fn add_block( + block_result: Result, + blocks: &mut Vec, + block_contents: &mut HashMap, + already_existing: &mut HashMap, + ) { + match block_result { + Ok(mut block) => { + let id = block.get_and_save_id(); + blocks.push(id); + if !block_contents.contains_key(&id) { + already_existing.insert(block.key().unwrap(), id); + block_contents.insert(id, block); + } + } + Err(id) => { + blocks.push(id); + } } - parents } /// Create new Object from given content @@ -199,9 +291,9 @@ impl Object { /// Arguments: /// * `content`: Object content /// * `header`: CommitHeaderV0 : All references of the object - /// * `block_size`: Desired block size for chunking content, rounded up to nearest valid block size - /// * `store`: store public key - /// * `store_secret`: store's read capability secret + /// * `block_size`: Desired block size for chunking content, will be rounded up to nearest valid block size + /// * `store`: store public key, needed to generate the convergence key + /// * `store_secret`: store's read capability secret, needed to generate the convergence key pub fn new( content: ObjectContent, mut header: Option, @@ -209,85 +301,127 @@ impl Object { store: &StoreRepo, store_secret: &ReadCapSecret, ) -> Object { + if header.is_some() && !content.can_have_header() { + panic!( + "cannot make a new Object with header if ObjectContent type different from Commit" + ); + } + // create blocks by chunking + encrypting content let valid_block_size = store_valid_value_size(block_size); log_debug!("valid_block_size {}", valid_block_size); - let data_chunk_size = valid_block_size - EMPTY_BLOCK_SIZE - DATA_VARINT_EXTRA; - let mut blocks: Vec = vec![]; + // let max_arity_leaves: usize = (valid_block_size - BLOCK_EXTRA) / CHILD_SIZE; + // let max_arity_root: usize = + // (valid_block_size - BLOCK_EXTRA - HEADER_REF_EXTRA) / CHILD_SIZE; + let max_data_payload_size = + valid_block_size - BLOCK_EXTRA - HEADER_REF_EXTRA * header.as_ref().map_or(0, |_| 1); + let max_arity: usize = max_data_payload_size / CHILD_SIZE; + + let mut blocks: Vec = vec![]; + let mut block_contents: HashMap = HashMap::new(); + let mut 
already_existing: HashMap = HashMap::new(); let conv_key = Self::convergence_key(store, store_secret); - let (header_ref, header_blocks) = match &header { - None => (None, vec![]), + let header_prepare = match &header { + None => (0 as usize, None, vec![]), Some(h) => { - let res = Self::make_header(h.clone(), valid_block_size, store, store_secret); - if res.1.len() == 1 - && res.1[0].encrypted_content().len() < MAX_EMBEDDED_COMMIT_HEADER_SIZE - { + let block_info = + Self::make_header(h.clone(), valid_block_size, store, store_secret); + if block_info.1.len() == 1 { ( - Some(CommitHeaderRef { - obj: CommitHeaderObject::EncryptedContent( - res.1[0].encrypted_content().to_vec(), - ), - key: res.0.key, - }), - vec![], + block_info.1[0].encrypted_content().len(), + Some(block_info.0), + block_info.1, ) } else { - header.as_mut().unwrap().set_id(res.0.id); - ( - Some(CommitHeaderRef { - obj: CommitHeaderObject::Id(res.0.id), - key: res.0.key, - }), - res.1, - ) + (0 as usize, Some(block_info.0), block_info.1) } } }; let content_ser = serde_bare::to_vec(&content).unwrap(); + let content_len = content_ser.len(); - if EMPTY_BLOCK_SIZE - + DATA_VARINT_EXTRA - + MAX_EMBEDDED_COMMIT_HEADER_SIZE - + BLOCK_ID_SIZE * header_ref.as_ref().map_or(0, |_| 1) - + content_ser.len() - <= valid_block_size - { + log_debug!( + "only one block? {} {} {}", + content_len <= max_data_payload_size, + content_len, + max_data_payload_size + ); + let header_blocks = if content_len <= max_data_payload_size { // content fits in root node let data_chunk = ChunkContentV0::DataChunk(content_ser.clone()); let content_ser = serde_bare::to_vec(&data_chunk).unwrap(); - blocks.push(Self::make_block( - content_ser.as_slice(), - &conv_key, - vec![], - header_ref, - )); + + let (header_ref, h_blocks) = match header_prepare { + (0, None, _) => (None, vec![]), + (header_size, Some(block_ref), blocks) => { + let is_embeddable = header_size > 0 + && valid_block_size - BLOCK_EXTRA - HEADER_EMBED_EXTRA - content_ser.len() + > header_size; + Self::make_header_ref(is_embeddable, block_ref, blocks) + } + (_, None, _) => unimplemented!(), + }; + Self::add_block( + Self::make_block( + content_ser, + &conv_key, + vec![], + header_ref, + &mut already_existing, + ), + &mut blocks, + &mut block_contents, + &mut already_existing, + ); + + h_blocks } else { // chunk content and create leaf nodes - for chunk in content_ser.chunks(data_chunk_size) { + let mut i = 0; + let total = content_len / (valid_block_size - BLOCK_EXTRA); + for chunk in content_ser.chunks(valid_block_size - BLOCK_EXTRA) { let data_chunk = ChunkContentV0::DataChunk(chunk.to_vec()); - let content_ser = serde_bare::to_vec(&data_chunk).unwrap(); - blocks.push(Self::make_block( - content_ser.as_slice(), - &conv_key, - vec![], - None, - )); + let chunk_ser = serde_bare::to_vec(&data_chunk).unwrap(); + Self::add_block( + Self::make_block(chunk_ser, &conv_key, vec![], None, &mut already_existing), + &mut blocks, + &mut block_contents, + &mut already_existing, + ); + log_debug!("make_block {} of {} - {}%", i, total, i * 100 / total); + i = i + 1; } // internal nodes - // arity: max number of ObjectRefs that fit inside an InternalNode Object within the object_size limit - let arity: usize = - (valid_block_size - EMPTY_BLOCK_SIZE - BIG_VARINT_EXTRA * 2 - MAX_HEADER_SIZE) - / (BLOCK_ID_SIZE + BLOCK_KEY_SIZE); - let mut parents = Self::make_tree(blocks.as_slice(), &conv_key, header_ref, arity); - blocks.append(&mut parents); - } + // max_arity: max number of ObjectRefs that fit inside an 
InternalNode Object within the max_data_payload_size limit + let mut parents = Self::make_tree( + &mut block_contents, + &mut already_existing, + blocks.as_slice(), + &conv_key, + header_prepare.0, + header_prepare.1, + header_prepare.2, + valid_block_size, + max_arity, + ); + + blocks.append(&mut parents.0); + parents.1 + }; + if header_blocks.len() > 0 { + header + .as_mut() + .unwrap() + .set_id(header_blocks.last().unwrap().id()); + } Object { blocks, + block_contents, header, header_blocks, #[cfg(test)] @@ -306,8 +440,9 @@ impl Object { fn load_tree( parents: Vec, store: &Box, - blocks: &mut Vec, + blocks: &mut Vec, missing: &mut Vec, + block_contents: &mut HashMap, ) { let mut children: Vec = vec![]; for id in parents { @@ -318,26 +453,36 @@ impl Object { children.extend(o.children().iter().rev()); } } - blocks.insert(0, block); + blocks.insert(0, id); + if !block_contents.contains_key(&id) { + block_contents.insert(id, block); + } } Err(_) => missing.push(id.clone()), } } if !children.is_empty() { - load_tree(children, store, blocks, missing); + load_tree(children, store, blocks, missing, block_contents); } } - let mut blocks: Vec = vec![]; + let mut blocks: Vec = vec![]; + let mut block_contents: HashMap = HashMap::new(); let mut missing: Vec = vec![]; - load_tree(vec![id], store, &mut blocks, &mut missing); + load_tree( + vec![id], + store, + &mut blocks, + &mut missing, + &mut block_contents, + ); if !missing.is_empty() { return Err(ObjectParseError::MissingBlocks(missing)); } - let root = blocks.last_mut().unwrap(); + let root = block_contents.get_mut(blocks.last().unwrap()).unwrap(); if key.is_some() { root.set_key(key); } @@ -350,7 +495,7 @@ impl Object { match obj.content()? { ObjectContent::V0(ObjectContentV0::CommitHeader(mut commit_header)) => { commit_header.set_id(id); - (Some(commit_header), Some(obj.blocks)) + (Some(commit_header), Some(obj.blocks().cloned().collect())) } _ => return Err(ObjectParseError::InvalidHeader), } @@ -370,6 +515,7 @@ impl Object { Ok(Object { blocks, + block_contents, header: header.0, header_blocks: header.1.unwrap_or(vec![]), #[cfg(test)] @@ -380,11 +526,15 @@ impl Object { /// Save blocks of the object and the blocks of the header object in the store pub fn save(&self, store: &Box) -> Result<(), StorageError> { let mut deduplicated: HashSet = HashSet::new(); - for block in self.blocks.iter().chain(self.header_blocks.iter()) { + //.chain(self.header_blocks.iter()) + for block_id in self.blocks.iter() { + store.put(self.block_contents.get(block_id).unwrap())?; + } + for block in &self.header_blocks { let id = block.id(); if deduplicated.get(&id).is_none() { - store.put(block)?; deduplicated.insert(id); + store.put(block)?; } } Ok(()) @@ -407,7 +557,10 @@ impl Object { /// Get the ID of the Object and saves it pub fn get_and_save_id(&mut self) -> ObjectId { - self.blocks.last_mut().unwrap().get_and_save_id() + self.block_contents + .get_mut(self.blocks.last().unwrap()) + .unwrap() + .get_and_save_id() } /// Get the key for the Object @@ -431,7 +584,7 @@ impl Object { self.header.as_ref().map_or(true, |h| h.is_root()) } - /// Get deps (that have an ID in the header, without checking if there is a key for it in the header_keys) + /// Get deps (that have an ID in the header, without checking if there is a key for them in the header_keys) /// if there is no header, returns an empty vec pub fn deps(&self) -> Vec { match &self.header { @@ -440,7 +593,16 @@ impl Object { } } - /// Get acks (that have an ID in the header, without checking if there is 
a key for it in the header_keys) + /// Get acks and nacks (that have an ID in the header, without checking if there is a key for them in the header_keys) + /// if there is no header, returns an empty vec + pub fn acks_and_nacks(&self) -> Vec { + match &self.header { + Some(h) => h.acks_and_nacks(), + None => vec![], + } + } + + /// Get acks (that have an ID in the header, without checking if there is a key for them in the header_keys) /// if there is no header, returns an empty vec pub fn acks(&self) -> Vec { match &self.header { @@ -450,54 +612,78 @@ impl Object { } pub fn root_block(&self) -> &Block { - self.blocks.last().unwrap() + self.block_contents + .get(self.blocks.last().unwrap()) + .unwrap() } pub fn header(&self) -> &Option { &self.header } - pub fn blocks(&self) -> &Vec { - &self.blocks + pub fn blocks(&self) -> impl Iterator + '_ { + self.blocks + .iter() + .map(|key| self.block_contents.get(key).unwrap()) } - pub fn to_hashmap(&self) -> HashMap { - let mut map: HashMap = HashMap::new(); - for block in &self.blocks { - map.insert(block.id(), block.clone()); - } - map + pub fn size(&self) -> usize { + let mut total = 0; + self.blocks().for_each(|b| total += b.size()); + self.header_blocks.iter().for_each(|b| total += b.size()); + total + } + + pub fn dedup_size(&self) -> usize { + let mut total = 0; + self.block_contents.values().for_each(|b| total += b.size()); + self.header_blocks.iter().for_each(|b| total += b.size()); + total + } + + pub fn hashmap(&self) -> &HashMap { + &self.block_contents } /// Collect leaves from the tree fn collect_leaves( - blocks: &Vec, + blocks: &Vec, parents: &Vec<(ObjectId, SymKey)>, parent_index: usize, leaves: &mut Option<&mut Vec>, obj_content: &mut Option<&mut Vec>, - ) -> Result<(), ObjectParseError> { - /*log_debug!( - ">>> collect_leaves: #{}..{}", - parent_index, - parent_index + parents.len() - 1 - );*/ + block_contents: &HashMap, + ) -> Result { + // log_debug!( + // ">>> collect_leaves: #{}..{}", + // parent_index, + // parent_index + parents.len() - 1 + // ); let mut children: Vec<(ObjectId, SymKey)> = vec![]; let mut i = parent_index; for (id, key) in parents { //log_debug!("!!! parent: #{}", i); - let block = &blocks[i]; + let block = block_contents.get(&blocks[i]).unwrap(); i += 1; // verify object ID - if *id != block.id() { - log_debug!("Invalid ObjectId.\nExp: {:?}\nGot: {:?}", *id, block.id()); + let block_id = block.id(); + if *id != block_id { + log_debug!("Invalid ObjectId.\nExp: {:?}\nGot: {:?}", *id, block_id); return Err(ObjectParseError::InvalidBlockId); } match block { Block::V0(b) => { + let b_children = b.children(); + if leaves.is_none() && obj_content.is_none() { + // we just want to calculate the depth. 
no need to decrypt + for id in b_children { + children.push((id.clone(), ObjectKey::dummy())); + } + continue; + } // decrypt content in place (this is why we have to clone first) let mut content_dec = b.content.encrypted_content().clone(); match key { @@ -518,7 +704,6 @@ impl Object { return Err(ObjectParseError::BlockDeserializeError); } } - let b_children = b.children(); // parse content match content { ChunkContentV0::InternalNode(keys) => { @@ -555,22 +740,21 @@ impl Object { } } } - if !children.is_empty() { + Ok(if !children.is_empty() { if parent_index < children.len() { return Err(ObjectParseError::InvalidChildren); } - match Self::collect_leaves( + Self::collect_leaves( blocks, &children, parent_index - children.len(), leaves, obj_content, - ) { - Ok(_) => (), - Err(e) => return Err(e), - } - } - Ok(()) + block_contents, + )? + 1 + } else { + 0 + }) } // /// Parse the Object and return the leaf Blocks with decryption key set @@ -591,6 +775,7 @@ impl Object { /// Parse the Object and return the decrypted content assembled from Blocks pub fn content(&self) -> Result { + // TODO: keep a local cache of content (with oncecell) if self.key().is_none() { return Err(ObjectParseError::MissingRootKey); } @@ -602,6 +787,7 @@ impl Object { self.blocks.len() - 1, &mut None, &mut Some(&mut obj_content), + &self.block_contents, ) { Ok(_) => match serde_bare::from_slice(obj_content.as_slice()) { Ok(c) => Ok(c), @@ -614,6 +800,22 @@ impl Object { } } + /// Parse the Object returns the depth of the tree + pub fn depth(&self) -> Result { + if self.key().is_none() { + return Err(ObjectParseError::MissingRootKey); + } + let parents = vec![(self.id(), self.key().unwrap())]; + Self::collect_leaves( + &self.blocks, + &parents, + self.blocks.len() - 1, + &mut None, + &mut None, + &self.block_contents, + ) + } + pub fn content_v0(&self) -> Result { match self.content() { Ok(ObjectContent::V0(v0)) => Ok(v0), @@ -623,6 +825,84 @@ impl Object { } } +impl fmt::Display for Object { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + writeln!(f, "====== Object ID {}", self.id())?; + writeln!( + f, + "== Key: {}", + self.key().map_or("None".to_string(), |k| format!("{}", k)) + )?; + #[cfg(test)] + writeln!(f, "== saved: {}", self.already_saved)?; + writeln!( + f, + "== Header: {}", + self.header + .as_ref() + .map_or("None".to_string(), |k| format!("{}", k)) + )?; + writeln!(f, "== Blocks: {}", self.blocks.len())?; + let mut i = 0; + for block_id in &self.blocks { + writeln!(f, "========== {:03}: {}", i, block_id)?; + i += 1; + } + writeln!(f, "== Depth: {:?}", self.depth().unwrap_or(0))?; + + writeln!(f, "== Header Blocks: {}", self.header_blocks.len())?; + i = 0; + for block in &self.header_blocks { + writeln!(f, "========== {:03}: {}", i, block.id())?; + } + Ok(()) + } +} + +impl ObjectContent { + pub fn can_have_header(&self) -> bool { + match self { + Self::V0(v0) => match v0 { + ObjectContentV0::Commit(_) => true, + _ => false, + }, + } + } + + pub fn new_file_V0_with_content(content: Vec, content_type: &str) -> Self { + ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 { + content_type: content_type.into(), + metadata: vec![], + content, + }))) + } +} + +impl fmt::Display for ObjectContent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let (version, content_type) = match self { + Self::V0(v0) => ( + "v0", + match v0 { + ObjectContentV0::Commit(_) => "Commit", + ObjectContentV0::CommitBody(_) => "CommitBody", + ObjectContentV0::CommitHeader(_) => "CommitHeader", + 
ObjectContentV0::Quorum(_) => "Quorum", + ObjectContentV0::Signature(_) => "Signature", + ObjectContentV0::Certificate(_) => "Certificate", + ObjectContentV0::File(_) => "File", + }, + ), + }; + writeln!( + f, + "====== ObjectContent {} {} ======", + version, content_type + )?; + Ok(()) + } +} + #[cfg(test)] mod test { @@ -634,13 +914,12 @@ mod test { use std::io::Write; // Those constants are calculated with RepoStore::get_max_value_size - /// Maximum arity of branch containing max number of leaves - const MAX_ARITY_LEAVES: usize = 31774; - /// Maximum arity of root branch - const MAX_ARITY_ROOT: usize = 31770; - /// Maximum data that can fit in object.content - const MAX_DATA_PAYLOAD_SIZE: usize = 2097112; + // const MAX_ARITY_LEAVES: usize = 15887; + // /// Maximum arity of root branch + // const MAX_ARITY_ROOT: usize = 15886; + // /// Maximum data that can fit in object.content + // const MAX_DATA_PAYLOAD_SIZE: usize = 1048564; #[test] pub fn test_pubkey_from_str() { @@ -650,6 +929,27 @@ mod test { assert_eq!(server_key, pubkey); } + /// Test no header needed if not a commit + #[test] + #[should_panic] + pub fn test_no_header() { + let file = File::V0(FileV0 { + content_type: "image/jpeg".into(), + metadata: vec![], + content: vec![], + }); + let content = ObjectContent::V0(ObjectContentV0::File(file)); + let (store_repo, store_secret) = StoreRepo::dummy_public_v0(); + let header = CommitHeader::new_with_acks([ObjectId::dummy()].to_vec()); + let _obj = Object::new( + content, + header, + store_max_value_size(), + &store_repo, + &store_secret, + ); + } + /// Test JPEG file #[test] pub fn test_jpg() { @@ -659,36 +959,22 @@ mod test { reader .read_to_end(&mut img_buffer) .expect("read of test.jpg"); + let content = ObjectContent::new_file_V0_with_content(img_buffer, "image/jpeg"); - let file = File::V0(FileV0 { - content_type: "image/jpeg".into(), - metadata: vec![], - content: img_buffer, - }); - let content = ObjectContent::V0(ObjectContentV0::File(file)); - - let deps: Vec = vec![Digest::Blake3Digest32([9; 32])]; let max_object_size = store_max_value_size(); - - let store_secret = SymKey::ChaCha20Key([0; 32]); - let store_pubkey = PubKey::Ed25519PubKey([1; 32]); - let store_repo = StoreRepo::V0(StoreRepoV0::PublicStore(store_pubkey)); - + let (store_repo, store_secret) = StoreRepo::dummy_public_v0(); let obj = Object::new(content, None, max_object_size, &store_repo, &store_secret); - log_debug!("obj.id: {:?}", obj.id()); - log_debug!("obj.key: {:?}", obj.key()); - log_debug!("obj.blocks.len: {:?}", obj.blocks().len()); + log_debug!("{}", obj); let mut i = 0; for node in obj.blocks() { - log_debug!("#{}: {:?}", i, node.id()); + log_debug!("#{}: {}", i, node.id()); let mut file = std::fs::File::create(format!("tests/{}.ng", node.id())) .expect("open block write file"); let ser_file = serde_bare::to_vec(node).unwrap(); - file.write_all(&ser_file); - log_debug!("{:?}", ser_file); - + file.write_all(&ser_file) + .expect(&format!("write of block #{}", i)); i += 1; } } @@ -703,39 +989,27 @@ mod test { }); let content = ObjectContent::V0(ObjectContentV0::File(file)); - let deps = vec![Digest::Blake3Digest32([9; 32])]; - let header = CommitHeader::new_with_deps(deps.clone()); - let exp = Some(2u32.pow(31)); + let acks = vec![]; + //let header = CommitHeader::new_with_acks(acks.clone()); let max_object_size = 0; - let store_secret = SymKey::ChaCha20Key([0; 32]); - let store_pubkey = PubKey::Ed25519PubKey([1; 32]); - let store_repo = StoreRepo::V0(StoreRepoV0::PublicStore(store_pubkey)); + let 
(store_repo, store_secret) = StoreRepo::dummy_public_v0(); let mut obj = Object::new( content.clone(), - header, + None, max_object_size, &store_repo, &store_secret, ); - log_debug!("obj.id: {:?}", obj.id()); - log_debug!("obj.key: {:?}", obj.key()); - log_debug!("obj.acks: {:?}", obj.acks()); - log_debug!("obj.deps: {:?}", obj.deps()); - log_debug!("obj.blocks.len: {:?}", obj.blocks().len()); - - let mut i = 0; - for node in obj.blocks() { - log_debug!("#{}: {:?}", i, node.id()); - i += 1; - } + log_debug!("{}", obj); - assert_eq!(*obj.deps(), deps); + assert_eq!(*obj.acks(), acks); match obj.content() { Ok(cnt) => { + log_debug!("{}", cnt); assert_eq!(content, cnt); } Err(e) => panic!("Object parse error: {:?}", e), @@ -746,21 +1020,13 @@ mod test { let obj2 = Object::load(obj.id(), obj.key(), &store).unwrap(); - log_debug!("obj2.id: {:?}", obj2.id()); - log_debug!("obj2.key: {:?}", obj2.key()); - log_debug!("obj2.acks: {:?}", obj2.acks()); - log_debug!("obj2.deps: {:?}", obj2.deps()); - log_debug!("obj2.blocks.len: {:?}", obj2.blocks().len()); - let mut i = 0; - for node in obj2.blocks() { - log_debug!("#{}: {:?}", i, node.id()); - i += 1; - } + log_debug!("{}", obj2); - assert_eq!(*obj2.deps(), deps); + assert_eq!(*obj2.acks(), acks); match obj2.content() { Ok(cnt) => { + log_debug!("{}", cnt); assert_eq!(content, cnt); } Err(e) => panic!("Object2 parse error: {:?}", e), @@ -768,17 +1034,9 @@ mod test { let obj3 = Object::load(obj.id(), None, &store).unwrap(); - log_debug!("obj3.id: {:?}", obj3.id()); - log_debug!("obj3.key: {:?}", obj3.key()); - log_debug!("obj3.deps: {:?}", obj3.deps()); - log_debug!("obj3.blocks.len: {:?}", obj3.blocks().len()); - let mut i = 0; - for node in obj3.blocks() { - log_debug!("#{}: {:?}", i, node.id()); - i += 1; - } + log_debug!("{}", obj3); - assert_eq!(*obj3.deps(), deps); + assert_eq!(*obj3.acks(), acks); match obj3.content() { Err(ObjectParseError::MissingRootKey) => (), @@ -789,24 +1047,87 @@ mod test { /// Checks that a content that fits the root node, will not be chunked into children nodes #[test] - pub fn test_depth_1() { - let deps: Vec = vec![Digest::Blake3Digest32([9; 32])]; + pub fn test_depth_0() { + let (store_repo, store_secret) = StoreRepo::dummy_public_v0(); let empty_file = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 { content_type: "".into(), metadata: vec![], content: vec![], }))); - let empty_file_ser = serde_bare::to_vec(&empty_file).unwrap(); - log_debug!("empty file size: {}", empty_file_ser.len()); + let content_ser = serde_bare::to_vec(&empty_file).unwrap(); + log_debug!("content len for empty : {}", content_ser.len()); + + // let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 { + // content_type: "".into(), + // metadata: vec![], + // content: vec![99; 1000], + // }))); + // let content_ser = serde_bare::to_vec(&content).unwrap(); + // log_debug!("content len for 1000 : {}", content_ser.len()); + + // let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 { + // content_type: "".into(), + // metadata: vec![], + // content: vec![99; 1048554], + // }))); + // let content_ser = serde_bare::to_vec(&content).unwrap(); + // log_debug!("content len for 1048554 : {}", content_ser.len()); + + // let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 { + // content_type: "".into(), + // metadata: vec![], + // content: vec![99; 1550000], + // }))); + // let content_ser = serde_bare::to_vec(&content).unwrap(); + // log_debug!("content len for 1550000 : {}", content_ser.len()); 
+ + // let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 { + // content_type: "".into(), + // metadata: vec![], + // content: vec![99; 1550000000], + // }))); + // let content_ser = serde_bare::to_vec(&content).unwrap(); + // log_debug!("content len for 1550000000 : {}", content_ser.len()); + + // let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 { + // content_type: "".into(), + // metadata: vec![99; 1000], + // content: vec![99; 1000], + // }))); + // let content_ser = serde_bare::to_vec(&content).unwrap(); + // log_debug!("content len for 1000+1000: {}", content_ser.len()); + + // let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 { + // content_type: "".into(), + // metadata: vec![99; 1000], + // content: vec![99; 524277], + // }))); + // let content_ser = serde_bare::to_vec(&content).unwrap(); + // log_debug!("content len for 1000+524277: {}", content_ser.len()); + + // let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 { + // content_type: "".into(), + // metadata: vec![99; 524277], + // content: vec![99; 524277], + // }))); + // let content_ser = serde_bare::to_vec(&content).unwrap(); + // log_debug!("content len for 2*524277: {}", content_ser.len()); + + let empty_obj = Object::new( + empty_file, + None, + store_max_value_size(), + &store_repo, + &store_secret, + ); + + let empty_file_size = empty_obj.size(); + log_debug!("empty file size: {}", empty_file_size); - let size = store_max_value_size() - - EMPTY_BLOCK_SIZE - - DATA_VARINT_EXTRA - - BLOCK_ID_SIZE - - empty_file_ser.len() - - DATA_VARINT_EXTRA; - log_debug!("file size: {}", size); + let size = + store_max_value_size() - empty_file_size - BLOCK_MAX_DATA_EXTRA - BIG_VARINT_EXTRA; + log_debug!("full file content size: {}", size); let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 { content_type: "".into(), @@ -814,127 +1135,570 @@ mod test { content: vec![99; size], }))); let content_ser = serde_bare::to_vec(&content).unwrap(); - log_debug!("content len: {}", content_ser.len()); - - let expiry = Some(2u32.pow(31)); - let max_object_size = store_max_value_size(); - - let store_secret = SymKey::ChaCha20Key([0; 32]); - let store_pubkey = PubKey::Ed25519PubKey([1; 32]); - let store_repo = StoreRepo::V0(StoreRepoV0::PublicStore(store_pubkey)); + log_debug!("content len: {}", content_ser.len()); let object = Object::new( content, - CommitHeader::new_with_deps(deps), - max_object_size, + None, + store_max_value_size(), &store_repo, &store_secret, ); + log_debug!("{}", object); + + log_debug!("object size: {}", object.size()); - log_debug!("root_id: {:?}", object.id()); - log_debug!("root_key: {:?}", object.key().unwrap()); - log_debug!("nodes.len: {:?}", object.blocks().len()); - //log_debug!("root: {:?}", tree.root_block()); - //log_debug!("nodes: {:?}", object.blocks); assert_eq!(object.blocks.len(), 1); } + /// Checks that a content that doesn't fit in all the children of first level in tree #[test] - pub fn test_block_size() { - let max_block_size = store_max_value_size(); - log_debug!("max_object_size: {}", max_block_size); - - let id = Digest::Blake3Digest32([0u8; 32]); - let key = SymKey::ChaCha20Key([0u8; 32]); + pub fn test_depth_1() { + const MAX_ARITY_LEAVES: usize = 15887; + // /// Maximum arity of root branch + // const MAX_ARITY_ROOT: usize = 15886; + // /// Maximum data that can fit in object.content + const MAX_DATA_PAYLOAD_SIZE: usize = 1048564; - let one_key = ChunkContentV0::InternalNode(vec![key.clone()]); - let 
one_key_ser = serde_bare::to_vec(&one_key).unwrap(); + ////// 16 GB of data! + let data_size = MAX_ARITY_LEAVES * MAX_DATA_PAYLOAD_SIZE - 10; - let two_keys = ChunkContentV0::InternalNode(vec![key.clone(), key.clone()]); - let two_keys_ser = serde_bare::to_vec(&two_keys).unwrap(); + let (store_repo, store_secret) = StoreRepo::dummy_public_v0(); + log_debug!("creating 16GB of data"); + let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 { + content_type: "".into(), + metadata: vec![], + content: vec![99; data_size], + }))); + //let content_ser = serde_bare::to_vec(&content).unwrap(); + //log_debug!("content len: {}", content_ser.len()); + log_debug!("creating object with that data"); + let object = Object::new( + content, + None, + store_max_value_size(), + &store_repo, + &store_secret, + ); + log_debug!("{}", object); - let max_keys = ChunkContentV0::InternalNode(vec![key.clone(); MAX_ARITY_LEAVES]); - let max_keys_ser = serde_bare::to_vec(&max_keys).unwrap(); + let obj_size = object.size(); + log_debug!("object size: {}", obj_size); - let data = ChunkContentV0::DataChunk(vec![]); - let data_ser = serde_bare::to_vec(&data).unwrap(); + log_debug!("data size: {}", data_size); + log_debug!( + "overhead: {} - {}%", + obj_size - data_size, + ((obj_size - data_size) * 100) as f32 / data_size as f32 + ); - let data_full = ChunkContentV0::DataChunk(vec![0; MAX_DATA_PAYLOAD_SIZE]); - let data_full_ser = serde_bare::to_vec(&data_full).unwrap(); + log_debug!("number of blocks : {}", object.blocks.len()); + assert_eq!(object.blocks.len(), MAX_ARITY_LEAVES + 1); + assert_eq!(object.depth().unwrap(), 1); + } - let leaf_empty = Block::new(vec![], None, data_ser.clone(), None); - let leaf_empty_ser = serde_bare::to_vec(&leaf_empty).unwrap(); + /// Checks that a content that doesn't fit in all the children of first level in tree + #[test] + pub fn test_depth_2() { + const MAX_ARITY_LEAVES: usize = 15887; + const MAX_DATA_PAYLOAD_SIZE: usize = 1048564; - let leaf_full_data = Block::new(vec![], None, data_full_ser.clone(), None); - let leaf_full_data_ser = serde_bare::to_vec(&leaf_full_data).unwrap(); + ////// 16 GB of data! 
+ let data_size = MAX_ARITY_LEAVES * MAX_DATA_PAYLOAD_SIZE; - let root_depsref = Block::new( - vec![], - Some(CommitHeaderRef::from_id_key(id, key.clone())), - data_ser.clone(), + let (store_repo, store_secret) = StoreRepo::dummy_public_v0(); + log_debug!("creating 16GB of data"); + let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 { + content_type: "".into(), + metadata: vec![], + content: vec![99; data_size], + }))); + //let content_ser = serde_bare::to_vec(&content).unwrap(); + //log_debug!("content len: {}", content_ser.len()); + log_debug!("creating object with that data"); + let object = Object::new( + content, None, + store_max_value_size(), + &store_repo, + &store_secret, ); + log_debug!("{}", object); - let root_depsref_ser = serde_bare::to_vec(&root_depsref).unwrap(); + let obj_size = object.size(); + log_debug!("object size: {}", obj_size); - let internal_max = Block::new(vec![id; MAX_ARITY_LEAVES], None, max_keys_ser.clone(), None); - let internal_max_ser = serde_bare::to_vec(&internal_max).unwrap(); + log_debug!("data size: {}", data_size); + log_debug!( + "overhead: {} - {}%", + obj_size - data_size, + ((obj_size - data_size) * 100) as f32 / data_size as f32 + ); - let internal_one = Block::new(vec![id; 1], None, one_key_ser.clone(), None); - let internal_one_ser = serde_bare::to_vec(&internal_one).unwrap(); + log_debug!("number of blocks : {}", object.blocks.len()); + assert_eq!(object.blocks.len(), MAX_ARITY_LEAVES + 4); + assert_eq!(object.depth().unwrap(), 2); + } - let internal_two = Block::new(vec![id; 2], None, two_keys_ser.clone(), None); - let internal_two_ser = serde_bare::to_vec(&internal_two).unwrap(); + /// Checks that a content that doesn't fit in all the children of first level in tree + #[test] + pub fn test_depth_3() { + const MAX_ARITY_LEAVES: usize = 61; + const MAX_DATA_PAYLOAD_SIZE: usize = 4084; - let root_one = Block::new( - vec![id; 1], - Some(CommitHeaderRef::from_id_key(id, key.clone())), - one_key_ser.clone(), - None, - ); - let root_one_ser = serde_bare::to_vec(&root_one).unwrap(); + ////// 900 MB of data! 
+ let data_size = + MAX_ARITY_LEAVES * MAX_ARITY_LEAVES * MAX_ARITY_LEAVES * MAX_DATA_PAYLOAD_SIZE - 10; - let root_two = Block::new( - vec![id; 2], - Some(CommitHeaderRef::from_id_key(id, key)), - two_keys_ser.clone(), + let (store_repo, store_secret) = StoreRepo::dummy_public_v0(); + log_debug!("creating 900MB of data"); + let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 { + content_type: "".into(), + metadata: vec![], + content: vec![99; data_size], + }))); + //let content_ser = serde_bare::to_vec(&content).unwrap(); + //log_debug!("content len: {}", content_ser.len()); + log_debug!("creating object with that data"); + let object = Object::new( + content, None, + store_valid_value_size(0), + &store_repo, + &store_secret, ); - let root_two_ser = serde_bare::to_vec(&root_two).unwrap(); + log_debug!("{}", object); + + let obj_size = object.size(); + log_debug!("object size: {}", obj_size); + log_debug!("data size: {}", data_size); log_debug!( - "range of valid value sizes {} {}", - store_valid_value_size(0), - store_max_value_size() + "overhead: {} - {}%", + obj_size - data_size, + ((obj_size - data_size) * 100) as f32 / data_size as f32 ); + let dedup_size = object.dedup_size(); log_debug!( - "max_data_payload_of_object: {}", - max_block_size - EMPTY_BLOCK_SIZE - DATA_VARINT_EXTRA + "dedup compression: {} - {}%", + data_size - dedup_size, + ((data_size - dedup_size) * 100) as f32 / data_size as f32 ); - log_debug!( - "max_data_payload_depth_1: {}", - max_block_size - EMPTY_BLOCK_SIZE - DATA_VARINT_EXTRA - MAX_HEADER_SIZE + log_debug!("number of blocks : {}", object.blocks.len()); + assert_eq!( + object.blocks.len(), + MAX_ARITY_LEAVES * (MAX_ARITY_LEAVES + 1) * MAX_ARITY_LEAVES + MAX_ARITY_LEAVES + 1 ); + assert_eq!(object.depth().unwrap(), 3); + } - log_debug!( - "max_data_payload_depth_2: {}", - MAX_ARITY_ROOT * MAX_DATA_PAYLOAD_SIZE + /// Checks that a content that doesn't fit in all the children of first level in tree + #[test] + pub fn test_depth_4() { + const MAX_ARITY_LEAVES: usize = 61; + const MAX_DATA_PAYLOAD_SIZE: usize = 4084; + + ////// 55GB of data! 
+ let data_size = MAX_ARITY_LEAVES + * MAX_ARITY_LEAVES + * MAX_ARITY_LEAVES + * MAX_ARITY_LEAVES + * MAX_DATA_PAYLOAD_SIZE + - 12; + + let (store_repo, store_secret) = StoreRepo::dummy_public_v0(); + log_debug!("creating 55GB of data"); + let content = ObjectContent::V0(ObjectContentV0::File(File::V0(FileV0 { + content_type: "".into(), + metadata: vec![], + content: vec![99; data_size], + }))); + //let content_ser = serde_bare::to_vec(&content).unwrap(); + //log_debug!("content len: {}", content_ser.len()); + log_debug!("creating object with that data"); + let object = Object::new( + content, + None, + store_valid_value_size(0), + &store_repo, + &store_secret, ); + log_debug!("{}", object); + + let obj_size = object.size(); + log_debug!("object size: {}", obj_size); + log_debug!("data size: {}", data_size); log_debug!( - "max_data_payload_depth_3: {}", - MAX_ARITY_ROOT * MAX_ARITY_LEAVES * MAX_DATA_PAYLOAD_SIZE + "overhead: {} - {}%", + obj_size - data_size, + ((obj_size - data_size) * 100) as f32 / data_size as f32 ); + log_debug!("number of blocks : {}", object.blocks.len()); + assert_eq!( + object.blocks.len(), + MAX_ARITY_LEAVES + * (MAX_ARITY_LEAVES * (MAX_ARITY_LEAVES + 1) * MAX_ARITY_LEAVES + + MAX_ARITY_LEAVES + + 1) + + 1 + ); + assert_eq!(object.depth().unwrap(), 4); + } + + #[test] + pub fn test_block_size() { + //let max_block_size = store_max_value_size(); + + fn test_block(max_block_size: usize) { + let max_arity_leaves: usize = (max_block_size - BLOCK_EXTRA) / CHILD_SIZE; + let max_arity_root: usize = + (max_block_size - BLOCK_EXTRA - HEADER_REF_EXTRA) / CHILD_SIZE; + + let max_data_payload_size = max_block_size - BLOCK_EXTRA; + + log_debug!("max_block_size: {}", max_block_size); + log_debug!("max_arity_leaves: {}", max_arity_leaves); + log_debug!("max_arity_root: {}", max_arity_root); + log_debug!("max_data_payload_size: {}", max_data_payload_size); + + let (id, key) = ObjectRef::dummy().into(); + + // this should never happen + let zero_key = ChunkContentV0::InternalNode(vec![]); + let zero_key_ser = serde_bare::to_vec(&zero_key).unwrap(); + + let one_key = ChunkContentV0::InternalNode(vec![key.clone()]); + let one_key_ser = serde_bare::to_vec(&one_key).unwrap(); + + let two_keys = ChunkContentV0::InternalNode(vec![key.clone(), key.clone()]); + let two_keys_ser = serde_bare::to_vec(&two_keys).unwrap(); + + let max_keys = ChunkContentV0::InternalNode(vec![key.clone(); max_arity_leaves]); + let max_keys_ser = serde_bare::to_vec(&max_keys).unwrap(); + + let max_keys_root = ChunkContentV0::InternalNode(vec![key.clone(); max_arity_root]); + let max_keys_root_ser = serde_bare::to_vec(&max_keys_root).unwrap(); + + // this should never happen + let data_empty = ChunkContentV0::DataChunk(vec![]); + let data_empty_ser = serde_bare::to_vec(&data_empty).unwrap(); + + let data_full = ChunkContentV0::DataChunk(vec![0; max_data_payload_size]); + let data_full_ser = serde_bare::to_vec(&data_full).unwrap(); + + // this should never happen: an empty block with no children and no data and no header + let leaf_empty = Block::new(vec![], None, data_empty_ser.clone(), None); + let leaf_empty_ser = serde_bare::to_vec(&leaf_empty).unwrap(); + + log_debug!( + "block size of empty leaf without header: {}", + leaf_empty_ser.len() + ); + + let leaf_full_data = Block::new(vec![], None, data_full_ser.clone(), None); + let leaf_full_data_ser = serde_bare::to_vec(&leaf_full_data).unwrap(); + + log_debug!( + "block size of full leaf block without header: {}", + leaf_full_data_ser.len() + ); + + // this 
should never happen: an empty block with no children and no keys + let internal_zero = Block::new(vec![], None, zero_key_ser.clone(), None); + let internal_zero_ser = serde_bare::to_vec(&internal_zero).unwrap(); + + log_debug!( + "block size of empty internal block without header: {}", + internal_zero_ser.len() + ); + + assert!(leaf_full_data_ser.len() <= max_block_size); + + // let root_zero = Block::new( + // vec![], + // None, + // zero_key_ser.clone(), + // None, + // ); + // let root_zero_ser = serde_bare::to_vec(&root_zero).unwrap(); + + let header_ref = CommitHeaderRef::from_id_key(id, key.clone()); + + // this should never happen. an embedded header never has an empty content + let header_embed = CommitHeaderRef::from_content_key(vec![], key.clone()); + + // this should never happen: an empty block with no children and no data and header ref + let root_zero_header_ref = Block::new( + vec![], + Some(header_ref.clone()), + data_empty_ser.clone(), + None, + ); + let root_zero_header_ref_ser = serde_bare::to_vec(&root_zero_header_ref).unwrap(); + + // this should never happen: an empty block with no children and no data and header embed + let root_zero_header_embed = Block::new( + vec![], + Some(header_embed.clone()), + data_empty_ser.clone(), + None, + ); + let root_zero_header_embed_ser = serde_bare::to_vec(&root_zero_header_embed).unwrap(); + + // log_debug!( + // "block size of empty root block without header: {}", + // root_zero_ser.len() + // ); + + log_debug!( + "block size of empty root block with header ref: {}", + root_zero_header_ref_ser.len() + ); + + log_debug!( + "block size of empty root block with header embedded: {}", + root_zero_header_embed_ser.len() + ); + + let internal_max = + Block::new(vec![id; max_arity_leaves], None, max_keys_ser.clone(), None); + let internal_max_ser = serde_bare::to_vec(&internal_max).unwrap(); + + let internal_one = Block::new(vec![id; 1], None, one_key_ser.clone(), None); + let internal_one_ser = serde_bare::to_vec(&internal_one).unwrap(); + + let internal_two = Block::new(vec![id; 2], None, two_keys_ser.clone(), None); + let internal_two_ser = serde_bare::to_vec(&internal_two).unwrap(); + + log_debug!( + "block size of internal block with 1 child, without header: {}", + internal_one_ser.len() + ); + + log_debug!( + "block size of internal block with 2 children, without header: {}", + internal_two_ser.len() + ); + + log_debug!( + "block size of internal block with max arity children, without header: {}", + internal_max_ser.len() + ); + + assert!(internal_max_ser.len() <= max_block_size); + + let root_one = Block::new( + vec![id; 1], + Some(header_ref.clone()), + one_key_ser.clone(), + None, + ); + let root_one_ser = serde_bare::to_vec(&root_one).unwrap(); + + let root_two = Block::new( + vec![id; 2], + Some(header_ref.clone()), + two_keys_ser.clone(), + None, + ); + let root_two_ser = serde_bare::to_vec(&root_two).unwrap(); + + let root_max = Block::new( + vec![id; max_arity_root], + Some(header_ref.clone()), + max_keys_root_ser.clone(), + None, + ); + let root_max_ser = serde_bare::to_vec(&root_max).unwrap(); + + let data_full_when_header_ref = + ChunkContentV0::DataChunk(vec![0; max_data_payload_size - HEADER_REF_EXTRA]); + let data_full_when_header_ref_ser = + serde_bare::to_vec(&data_full_when_header_ref).unwrap(); + + let root_full = Block::new( + vec![], + Some(header_ref.clone()), + data_full_when_header_ref_ser.clone(), + None, + ); + let root_full_ser = serde_bare::to_vec(&root_full).unwrap(); + + log_debug!( + "block size of root 
block with header ref with 1 child: {}", + root_one_ser.len() + ); + + log_debug!( + "block size of root block with header ref with 2 children: {}", + root_two_ser.len() + ); + + log_debug!( + "block size of root block with header ref with max arity children: {}", + root_max_ser.len() + ); + + log_debug!( + "block size of root block with header ref with full DataChunk (fitting ObjectContent): {}", + root_full_ser.len() + ); + + assert!(root_full_ser.len() <= max_block_size); + + let root_embed_one = Block::new( + vec![id; 1], + Some(header_embed.clone()), + one_key_ser.clone(), + None, + ); + let root_embed_one_ser = serde_bare::to_vec(&root_embed_one).unwrap(); + + let root_embed_two = Block::new( + vec![id; 2], + Some(header_embed.clone()), + two_keys_ser.clone(), + None, + ); + let root_embed_two_ser = serde_bare::to_vec(&root_embed_two).unwrap(); + + let root_embed_max = Block::new( + vec![id; max_arity_root], + Some(header_embed.clone()), + max_keys_root_ser.clone(), + None, + ); + let root_embed_max_ser = serde_bare::to_vec(&root_embed_max).unwrap(); + + let data_full_when_header_embed = + ChunkContentV0::DataChunk(vec![0; max_data_payload_size - HEADER_EMBED_EXTRA]); + let data_full_when_header_embed_ser = + serde_bare::to_vec(&data_full_when_header_embed).unwrap(); + + let root_embed_full = Block::new( + vec![], + Some(header_embed.clone()), + data_full_when_header_embed_ser.clone(), + None, + ); + let root_embed_full_ser = serde_bare::to_vec(&root_embed_full).unwrap(); + + log_debug!( + "block size of root block with header embed with 1 child: {}", + root_embed_one_ser.len() + ); + + log_debug!( + "block size of root block with header embed with 2 children: {}", + root_embed_two_ser.len() + ); + + log_debug!( + "block size of root block with header embed with max arity children: {}", + root_embed_max_ser.len() + ); + + log_debug!( + "block size of root block with header embed with full DataChunk (fitting ObjectContent): {}", + root_embed_full_ser.len() + ); + + assert!(root_embed_full_ser.len() <= max_block_size); + + let header_acks_1 = CommitHeader::new_with_acks(vec![id]); + let header_acks_2 = CommitHeader::new_with_acks(vec![id, id]); + let header_acks_60 = CommitHeader::new_with_acks(vec![id; 60]); + let header_acks_60_deps_60 = + CommitHeader::new_with_deps_and_acks(vec![id; 60], vec![id; 60]); + + fn make_header_block(header: Option) -> CommitHeaderRef { + let content_ser = serde_bare::to_vec(&ObjectContent::V0( + ObjectContentV0::CommitHeader(header.unwrap()), + )) + .unwrap(); + let data_chunk = ChunkContentV0::DataChunk(content_ser.clone()); + let encrypted_content = serde_bare::to_vec(&data_chunk).unwrap(); + CommitHeaderRef::from_content_key(encrypted_content, SymKey::dummy()) + } + + let header_embed_acks_1 = make_header_block(header_acks_1); + let header_embed_acks_2 = make_header_block(header_acks_2); + let header_embed_acks_60 = make_header_block(header_acks_60); + let header_embed_acks_60_deps_60 = make_header_block(header_acks_60_deps_60); + + fn test_header_embed(name: &str, header: CommitHeaderRef, max_block_size: usize) { + let (id, key) = BlockRef::dummy().into(); + + log_debug!("header content size : {}", header.encrypted_content_len()); + + let max_arity = (max_block_size + - header.encrypted_content_len() + - BLOCK_EXTRA + - HEADER_EMBED_EXTRA) + / CHILD_SIZE; + + log_debug!("max arity for header {} : {}", name, max_arity); + + let max_keys_when_real_header = + ChunkContentV0::InternalNode(vec![key.clone(); max_arity]); + let max_keys_when_real_header_ser 
= + serde_bare::to_vec(&max_keys_when_real_header).unwrap(); + + let root_embed_max = Block::new( + vec![id; max_arity], + Some(header), + max_keys_when_real_header_ser.clone(), + None, + ); + let root_embed_max_ser = serde_bare::to_vec(&root_embed_max).unwrap(); + + log_debug!( + "block size of root block with header {} with max possible arity children : {}", + name, + root_embed_max_ser.len() + ); + + assert!(root_embed_max_ser.len() <= max_block_size); + } + + test_header_embed( + "embed acks 60 deps 60", + header_embed_acks_60_deps_60, + max_block_size, + ); + + test_header_embed("embed acks 60", header_embed_acks_60, max_block_size); + + test_header_embed("embed acks 2", header_embed_acks_2, max_block_size); + + test_header_embed("embed acks 1", header_embed_acks_1, max_block_size); + } + + let max_block_size = store_max_value_size(); + let min_block_size = store_valid_value_size(0); + + test_block(max_block_size); + test_block(min_block_size); + test_block(store_valid_value_size(10000)); + test_block(store_valid_value_size(100000)); + test_block(store_valid_value_size(1000000)); + test_block(store_valid_value_size(5000)); + + /////////////////// + /* + + let max_arity_leaves = (max_block_size - EMPTY_BLOCK_SIZE - BIG_VARINT_EXTRA * 2) / (BLOCK_ID_SIZE + BLOCK_KEY_SIZE); log_debug!("max_arity_leaves: {}", max_arity_leaves); assert_eq!(max_arity_leaves, MAX_ARITY_LEAVES); assert_eq!( max_block_size - EMPTY_BLOCK_SIZE - DATA_VARINT_EXTRA, - MAX_DATA_PAYLOAD_SIZE + max_data_payload_size ); let max_arity_root = (max_block_size - EMPTY_BLOCK_SIZE - MAX_HEADER_SIZE - BIG_VARINT_EXTRA * 2) @@ -945,8 +1709,8 @@ mod test { assert_eq!(leaf_full_data_ser.len(), max_block_size); log_debug!("leaf_empty: {}", leaf_empty_ser.len()); assert_eq!(leaf_empty_ser.len(), EMPTY_BLOCK_SIZE); - log_debug!("root_depsref: {}", root_depsref_ser.len()); - assert_eq!(root_depsref_ser.len(), EMPTY_ROOT_SIZE_DEPSREF); + // log_debug!("root_depsref: {}", root_depsref_ser.len()); + // assert_eq!(root_depsref_ser.len(), EMPTY_ROOT_SIZE_DEPSREF); log_debug!("internal_max: {}", internal_max_ser.len()); assert_eq!( internal_max_ser.len(), @@ -974,7 +1738,7 @@ mod test { assert_eq!( root_two_ser.len(), EMPTY_BLOCK_SIZE + 8 * BLOCK_ID_SIZE + 2 * BLOCK_ID_SIZE + 2 * BLOCK_KEY_SIZE - ); + );*/ // let object_size_1 = 4096 * 1 - VALUE_HEADER_SIZE; // let object_size_512 = 4096 * MAX_PAGES_PER_VALUE - VALUE_HEADER_SIZE; diff --git a/p2p-repo/src/store.rs b/p2p-repo/src/store.rs index d732392..9f17b15 100644 --- a/p2p-repo/src/store.rs +++ b/p2p-repo/src/store.rs @@ -18,7 +18,7 @@ use crate::utils::Receiver; use std::sync::{Arc, RwLock}; use std::{ - cmp::min, + cmp::{max, min}, collections::{hash_map::Iter, HashMap}, mem::size_of_val, }; @@ -57,6 +57,8 @@ impl From for StorageError { } } +/* LMDB values: + const MIN_SIZE: usize = 4072; const PAGE_SIZE: usize = 4096; const HEADER: usize = PAGE_SIZE - MIN_SIZE; @@ -75,6 +77,29 @@ pub fn store_valid_value_size(size: usize) -> usize { pub const fn store_max_value_size() -> usize { MAX_FACTOR * PAGE_SIZE - HEADER } +*/ + +// ROCKSDB values: + +const ONE_MEGA_BYTE: usize = 1024 * 1024; +const DISK_BLOCK_SIZE: usize = 4096; +// HDD block size at 4096, SSD page size at 4096, on openbsd FFS default is 16384 +// see Rocksdb integrated BlobDB https://rocksdb.org/blog/2021/05/26/integrated-blob-db.html +// blob values should be multiple of 4096 because of the BlobCache of RocksDB that is in heap memory (so must align on mem page). 
+const MAX_FACTOR: usize = 256; + +/// Returns a valid/optimal value size for the entries of the storage backend. +pub fn store_valid_value_size(size: usize) -> usize { + min( + max(1, (size as f32 / DISK_BLOCK_SIZE as f32).ceil() as usize), + MAX_FACTOR, + ) * DISK_BLOCK_SIZE +} + +/// Returns the maximum value size for the entries of the storage backend. +pub const fn store_max_value_size() -> usize { + ONE_MEGA_BYTE +} /// Store with a HashMap backend pub struct HashMapRepoStore { diff --git a/p2p-repo/src/types.rs b/p2p-repo/src/types.rs index e1aef73..a6b899a 100644 --- a/p2p-repo/src/types.rs +++ b/p2p-repo/src/types.rs @@ -1,7 +1,5 @@ // Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers // All rights reserved. -// This code is partly derived from work written by TG x Thoth from P2Pcollab. -// Copyright 2022 TG x Thoth // Licensed under the Apache License, Version 2.0 // // or the MIT license , @@ -88,6 +86,10 @@ impl SymKey { pub fn from_array(array: [u8; 32]) -> Self { SymKey::ChaCha20Key(array) } + #[deprecated(note = "**Don't use dummy method**")] + pub fn dummy() -> Self { + SymKey::ChaCha20Key([0; 32]) + } } impl fmt::Display for SymKey { @@ -235,8 +237,8 @@ impl fmt::Display for PrivKey { match self { Self::Ed25519PrivKey(ed) => { //let priv_key_ser = serde_bare::to_vec(ed).unwrap(); - let prix_key_encoded = base64_url::encode(ed); - write!(f, "{}", prix_key_encoded) + let priv_key_encoded = base64_url::encode(ed); + write!(f, "{}", priv_key_encoded) } _ => { unimplemented!(); @@ -254,6 +256,24 @@ pub enum Sig { Ed25519Sig(Ed25519Sig), } +impl fmt::Display for Sig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Ed25519Sig(ed) => { + write!( + f, + "{} {}", + base64_url::encode(&ed[0]), + base64_url::encode(&ed[1]) + ) + } + _ => { + unimplemented!(); + } + } + } +} + /// Timestamp: absolute time in minutes since 2022-02-22 22:22 UTC pub type Timestamp = u32; @@ -268,6 +288,17 @@ pub enum RelTime { Days(u8), } +impl fmt::Display for RelTime { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Seconds(s) => writeln!(f, "{} sec.", s), + Self::Minutes(s) => writeln!(f, "{} min.", s), + Self::Hours(s) => writeln!(f, "{} h.", s), + Self::Days(s) => writeln!(f, "{} d.", s), + } + } +} + /// Bloom filter (variable size) #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct BloomFilter { @@ -330,6 +361,13 @@ pub struct BlockRef { pub key: BlockKey, } +impl BlockId { + #[deprecated(note = "**Don't use dummy method**")] + pub fn dummy() -> Self { + Digest::Blake3Digest32([0u8; 32]) + } +} + impl BlockRef { #[deprecated(note = "**Don't use dummy method**")] pub fn dummy() -> Self { @@ -343,6 +381,12 @@ impl BlockRef { } } +impl From for (BlockId, BlockKey) { + fn from(blockref: BlockRef) -> (BlockId, BlockKey) { + (blockref.id.clone(), blockref.key.clone()) + } +} + impl From<(&BlockId, &BlockKey)> for BlockRef { fn from(id_key: (&BlockId, &BlockKey)) -> Self { BlockRef { @@ -352,6 +396,12 @@ impl From<(&BlockId, &BlockKey)> for BlockRef { } } +impl fmt::Display for BlockRef { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{} {}", self.id, self.key) + } +} + /// Object ID pub type ObjectId = BlockId; @@ -414,6 +464,23 @@ pub enum StoreOverlay { Own(BranchId), // The repo is a store, so the overlay can be derived from its own ID. In this case, the branchId of the `overlay` branch is entered here. 
} +impl fmt::Display for StoreOverlay { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::V0(v0) => { + writeln!(f, "StoreOverlay V0")?; + match v0 { + StoreOverlayV0::PublicStore(k) => writeln!(f, "PublicStore: {}", k), + StoreOverlayV0::ProtectedStore(k) => writeln!(f, "ProtectedStore: {}", k), + StoreOverlayV0::Group(k) => writeln!(f, "Group: {}", k), + StoreOverlayV0::Dialog(k) => writeln!(f, "Dialog: {}", k), + } + } + Self::Own(b) => writeln!(f, "Own: {}", b), + } + } +} + /// List of Store Root Repo types #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] pub enum StoreRepoV0 { @@ -441,6 +508,14 @@ impl StoreRepo { }, } } + pub fn dummy_public_v0() -> (Self, SymKey) { + let readcap = SymKey::dummy(); + let store_pubkey = PubKey::nil(); + ( + StoreRepo::V0(StoreRepoV0::PublicStore(store_pubkey)), + readcap, + ) + } } /// Site type @@ -560,172 +635,6 @@ pub enum CommitHeader { V0(CommitHeaderV0), } -impl CommitHeader { - pub fn is_root(&self) -> bool { - match self { - CommitHeader::V0(v0) => v0.is_root(), - } - } - pub fn deps(&self) -> Vec { - match self { - CommitHeader::V0(v0) => v0.deps.clone(), - } - } - pub fn acks(&self) -> Vec { - match self { - CommitHeader::V0(v0) => v0.acks.clone(), - } - } - pub fn id(&self) -> &Option { - match self { - CommitHeader::V0(v0) => &v0.id, - } - } - - pub fn set_id(&mut self, id: Digest) { - match self { - CommitHeader::V0(v0) => v0.id = Some(id), - } - } - - pub fn set_compact(&mut self) { - match self { - CommitHeader::V0(v0) => v0.set_compact(), - } - } - - pub fn new_with( - deps: Vec, - ndeps: Vec, - acks: Vec, - nacks: Vec, - refs: Vec, - nrefs: Vec, - ) -> (Option, Option) { - let res = CommitHeaderV0::new_with(deps, ndeps, acks, nacks, refs, nrefs); - ( - res.0.map(|h| CommitHeader::V0(h)), - res.1.map(|h| CommitHeaderKeys::V0(h)), - ) - } - - pub fn new_with_deps(deps: Vec) -> Option { - CommitHeaderV0::new_with_deps(deps).map(|ch| CommitHeader::V0(ch)) - } - - pub fn new_with_deps_and_acks(deps: Vec, acks: Vec) -> Option { - CommitHeaderV0::new_with_deps_and_acks(deps, acks).map(|ch| CommitHeader::V0(ch)) - } -} - -impl CommitHeaderV0 { - fn new_empty() -> Self { - Self { - id: None, - compact: false, - deps: vec![], - ndeps: vec![], - acks: vec![], - nacks: vec![], - refs: vec![], - nrefs: vec![], - } - } - - pub fn set_compact(&mut self) { - self.compact = true; - } - - pub fn new_with( - deps: Vec, - ndeps: Vec, - acks: Vec, - nacks: Vec, - refs: Vec, - nrefs: Vec, - ) -> (Option, Option) { - if deps.is_empty() - && ndeps.is_empty() - && acks.is_empty() - && nacks.is_empty() - && refs.is_empty() - && nrefs.is_empty() - { - (None, None) - } else { - let mut ideps: Vec = vec![]; - let mut indeps: Vec = vec![]; - let mut iacks: Vec = vec![]; - let mut inacks: Vec = vec![]; - let mut irefs: Vec = vec![]; - let mut inrefs: Vec = vec![]; - - let mut kdeps: Vec = vec![]; - let mut kacks: Vec = vec![]; - let mut knacks: Vec = vec![]; - for d in deps { - ideps.push(d.id); - kdeps.push(d.key); - } - for d in ndeps { - indeps.push(d.id); - } - for d in acks { - iacks.push(d.id); - kacks.push(d.key); - } - for d in nacks { - inacks.push(d.id); - knacks.push(d.key); - } - for d in refs.clone() { - irefs.push(d.id); - } - for d in nrefs { - inrefs.push(d.id); - } - ( - Some(Self { - id: None, - compact: false, - deps: ideps, - ndeps: indeps, - acks: iacks, - nacks: inacks, - refs: irefs, - nrefs: inrefs, - }), - Some(CommitHeaderKeysV0 { - deps: kdeps, - acks: kacks, - nacks: knacks, - 
refs, - }), - ) - } - } - pub fn new_with_deps(deps: Vec) -> Option { - assert!(!deps.is_empty()); - let mut n = Self::new_empty(); - n.deps = deps; - Some(n) - } - - pub fn new_with_deps_and_acks(deps: Vec, acks: Vec) -> Option { - assert!(!deps.is_empty() || !acks.is_empty()); - let mut n = Self::new_empty(); - n.deps = deps; - n.acks = acks; - Some(n) - } - - pub fn is_root(&self) -> bool { - //self.deps.is_empty() - // && self.ndeps.is_empty() - self.acks.is_empty() && self.nacks.is_empty() - } -} - #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct CommitHeaderKeysV0 { /// Other objects this commit strongly depends on (ex: ADD for a REMOVE, refs for an nrefs) @@ -758,6 +667,7 @@ pub enum CommitHeaderObject { None, } +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct CommitHeaderRef { pub obj: CommitHeaderObject, pub key: ObjectKey, @@ -770,6 +680,18 @@ impl CommitHeaderRef { key, } } + pub fn from_content_key(content: Vec, key: ObjectKey) -> Self { + CommitHeaderRef { + obj: CommitHeaderObject::EncryptedContent(content), + key, + } + } + pub fn encrypted_content_len(&self) -> usize { + match &self.obj { + CommitHeaderObject::EncryptedContent(ec) => ec.len(), + _ => 0, + } + } } #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] @@ -780,10 +702,12 @@ pub struct BlockContentV0 { /// It is an easy way to know if the Block is a commit (but be careful because some root commits can be without a header). pub commit_header: CommitHeaderObject, - /// Block IDs for child nodes in the Merkle tree, can be empty if ObjectContent fits in one block + /// Block IDs for child nodes in the Merkle tree, + /// is empty if ObjectContent fits in one block or this block is a leaf. in both cases, encrypted_content is then not empty pub children: Vec, - /// Encrypted ChunkContentV0 (entirety or chunks of ObjectContentV0) + /// contains encrypted ChunkContentV0 (entirety, when fitting, or chunks of ObjectContentV0, in DataChunk) used for leafs of the Merkle tree, + /// or to store the keys of children (in InternalNode) /// /// Encrypted using convergent encryption with ChaCha20: /// - convergence_key: BLAKE3 derive_key ("NextGraph Data BLAKE3 key", @@ -862,8 +786,8 @@ pub struct RepositoryV0 { pub creator: Option, // TODO: discrete doc type - // TODO: order (store, partial order, total order, partial sign all commits, fsm, smart contract ) - // TODO: immutable conditions (allow_change_quorum, min_quorum, allow_inherit_perms, etc...) + // TODO: order (store, partial order, partial sign all commits,(conflict resolution strategy), total order, fsm, smart contract ) + // TODO: immutable conditions (allow_change_owners, allow_change_quorum, min_quorum, allow_inherit_perms, signers_can_be_editors, all_editors_are_signers, etc...) 
/// Immutable App-specific metadata #[serde(with = "serde_bytes")] pub metadata: Vec, @@ -875,6 +799,23 @@ pub enum Repository { V0(RepositoryV0), } +impl fmt::Display for Repository { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::V0(v0) => { + writeln!(f, "V0")?; + writeln!(f, "repo_id: {}", v0.id)?; + writeln!( + f, + "creator: {}", + v0.creator.map_or("None".to_string(), |c| format!("{}", c)) + )?; + Ok(()) + } + } + } +} + /// Root Branch definition V0 /// /// Second commit in the root branch, signed by repository key @@ -947,6 +888,42 @@ impl RootBranch { } } +impl fmt::Display for RootBranch { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::V0(v0) => { + writeln!(f, "V0")?; + writeln!(f, "repo_id: {}", v0.id)?; + writeln!(f, "repo_ref: {}", v0.repo)?; + write!(f, "store: {}", v0.store)?; + writeln!( + f, + "store_sig: {}", + v0.store_sig + .map_or("None".to_string(), |c| format!("{}", c)) + )?; + writeln!(f, "topic: {}", v0.repo)?; + writeln!( + f, + "inherit_perms: {}", + v0.inherit_perms_users_and_quorum_from_store + .as_ref() + .map_or("None".to_string(), |c| format!("{}", c)) + )?; + writeln!( + f, + "quorum: {}", + v0.quorum + .as_ref() + .map_or("None".to_string(), |c| format!("{}", c)) + )?; + writeln!(f, "reconciliation_interval: {}", v0.reconciliation_interval)?; + Ok(()) + } + } + } +} + /// Quorum definition V0 /// /// Changed when the signers need to be updated. Signers are not necessarily editors of the repo, and they do not need to be members either, as they will be notified of RefreshReadCaps anyway. diff --git a/stores-rocksdb/src/lib.rs b/stores-rocksdb/src/lib.rs index 1214708..7a62c33 100644 --- a/stores-rocksdb/src/lib.rs +++ b/stores-rocksdb/src/lib.rs @@ -1,5 +1,5 @@ -// #[cfg(not(target_arch = "wasm32"))] -// pub mod repo_store; +#[cfg(not(target_arch = "wasm32"))] +pub mod repo_store; #[cfg(not(target_arch = "wasm32"))] pub mod kcv_store; diff --git a/stores-rocksdb/src/repo_store.rs b/stores-rocksdb/src/repo_store.rs index 463fc0f..19c0a0e 100644 --- a/stores-rocksdb/src/repo_store.rs +++ b/stores-rocksdb/src/repo_store.rs @@ -17,7 +17,7 @@ use std::sync::{Arc, RwLock}; use serde::{Deserialize, Serialize}; use serde_bare::error::Error; - +/* #[derive(Debug)] pub struct LmdbRepoStore { /// the main store where all the repo blocks are stored @@ -483,112 +483,111 @@ impl LmdbRepoStore { } } } +*/ #[cfg(test)] mod test { - use crate::repo_store::LmdbRepoStore; use p2p_repo::log::*; use p2p_repo::store::*; use p2p_repo::types::*; use p2p_repo::utils::*; - use rkv::backend::{BackendInfo, BackendStat, Lmdb, LmdbEnvironment}; - use rkv::{Manager, Rkv, StoreOptions, Value}; #[allow(unused_imports)] use std::time::Duration; #[allow(unused_imports)] use std::{fs, thread}; use tempfile::Builder; + /* + #[test] + pub fn test_remove_least_used() { + let path_str = "test-env"; + let root = Builder::new().prefix(path_str).tempdir().unwrap(); + let key: [u8; 32] = [0; 32]; + fs::create_dir_all(root.path()).unwrap(); + log_debug!("{}", root.path().to_str().unwrap()); + let mut store = LmdbRepoStore::open(root.path(), key).unwrap(); + let mut now = now_timestamp(); + now -= 200; + // TODO: fix the LMDB bug that is triggered with x max set to 86 !!! 
+ for x in 1..85 { + let block = Block::new( + Vec::new(), + ObjectDeps::ObjectIdList(Vec::new()), + None, + vec![x; 10], + None, + ); + let block_id = store.put(&block).unwrap(); + log_debug!("#{} -> objId {:?}", x, block_id); + store + .has_been_synced(&block_id, Some(now + x as u32)) + .unwrap(); + } - #[test] - pub fn test_remove_least_used() { - let path_str = "test-env"; - let root = Builder::new().prefix(path_str).tempdir().unwrap(); - let key: [u8; 32] = [0; 32]; - fs::create_dir_all(root.path()).unwrap(); - log_debug!("{}", root.path().to_str().unwrap()); - let mut store = LmdbRepoStore::open(root.path(), key).unwrap(); - let mut now = now_timestamp(); - now -= 200; - // TODO: fix the LMDB bug that is triggered with x max set to 86 !!! - for x in 1..85 { - let block = Block::new( - Vec::new(), - ObjectDeps::ObjectIdList(Vec::new()), - None, - vec![x; 10], - None, - ); - let block_id = store.put(&block).unwrap(); - log_debug!("#{} -> objId {:?}", x, block_id); - store - .has_been_synced(&block_id, Some(now + x as u32)) - .unwrap(); - } - - let ret = store.remove_least_used(200); - log_debug!("removed {}", ret); - assert_eq!(ret, 208) - - //store.list_all(); - } + let ret = store.remove_least_used(200); + log_debug!("removed {}", ret); + assert_eq!(ret, 208) - #[test] - pub fn test_set_pin() { - let path_str = "test-env"; - let root = Builder::new().prefix(path_str).tempdir().unwrap(); - let key: [u8; 32] = [0; 32]; - fs::create_dir_all(root.path()).unwrap(); - log_debug!("{}", root.path().to_str().unwrap()); - let mut store = LmdbRepoStore::open(root.path(), key).unwrap(); - let mut now = now_timestamp(); - now -= 200; - // TODO: fix the LMDB bug that is triggered with x max set to 86 !!! - for x in 1..100 { - let block = Block::new( - Vec::new(), - ObjectDeps::ObjectIdList(Vec::new()), - None, - vec![x; 10], - None, - ); - let obj_id = store.put(&block).unwrap(); - log_debug!("#{} -> objId {:?}", x, obj_id); - store.set_pin(&obj_id, true).unwrap(); - store - .has_been_synced(&obj_id, Some(now + x as u32)) - .unwrap(); + //store.list_all(); } - let ret = store.remove_least_used(200); - log_debug!("removed {}", ret); - assert_eq!(ret, 0); + #[test] + pub fn test_set_pin() { + let path_str = "test-env"; + let root = Builder::new().prefix(path_str).tempdir().unwrap(); + let key: [u8; 32] = [0; 32]; + fs::create_dir_all(root.path()).unwrap(); + log_debug!("{}", root.path().to_str().unwrap()); + let mut store = LmdbRepoStore::open(root.path(), key).unwrap(); + let mut now = now_timestamp(); + now -= 200; + // TODO: fix the LMDB bug that is triggered with x max set to 86 !!! 
+ for x in 1..100 { + let block = Block::new( + Vec::new(), + ObjectDeps::ObjectIdList(Vec::new()), + None, + vec![x; 10], + None, + ); + let obj_id = store.put(&block).unwrap(); + log_debug!("#{} -> objId {:?}", x, obj_id); + store.set_pin(&obj_id, true).unwrap(); + store + .has_been_synced(&obj_id, Some(now + x as u32)) + .unwrap(); + } - store.list_all(); - } + let ret = store.remove_least_used(200); + log_debug!("removed {}", ret); + assert_eq!(ret, 0); + store.list_all(); + } + */ #[test] pub fn test_get_valid_value_size() { - assert_eq!(store_valid_value_size(0), 4072); - assert_eq!(store_valid_value_size(2), 4072); - assert_eq!(store_valid_value_size(4072), 4072); - assert_eq!(store_valid_value_size(4072 + 1), 4072 + 4096); - assert_eq!(store_valid_value_size(4072 + 4096), 4072 + 4096); - assert_eq!(store_valid_value_size(4072 + 4096 + 1), 4072 + 4096 + 4096); + assert_eq!(store_valid_value_size(0), 4096); + assert_eq!(store_valid_value_size(2), 4096); + assert_eq!(store_valid_value_size(4096 - 1), 4096); + assert_eq!(store_valid_value_size(4096), 4096); + assert_eq!(store_valid_value_size(4096 + 1), 4096 + 4096); + assert_eq!(store_valid_value_size(4096 + 4096), 4096 + 4096); + assert_eq!(store_valid_value_size(4096 + 4096 + 1), 4096 + 4096 + 4096); assert_eq!( - store_valid_value_size(4072 + 4096 + 4096), - 4072 + 4096 + 4096 + store_valid_value_size(4096 + 4096 + 4096), + 4096 + 4096 + 4096 ); assert_eq!( - store_valid_value_size(4072 + 4096 + 4096 + 1), - 4072 + 4096 + 4096 + 4096 + store_valid_value_size(4096 + 4096 + 4096 + 1), + 4096 + 4096 + 4096 + 4096 ); - assert_eq!(store_valid_value_size(4072 + 4096 * 511), 4072 + 4096 * 511); + assert_eq!(store_valid_value_size(4096 + 4096 * 255), 4096 + 4096 * 255); assert_eq!( - store_valid_value_size(4072 + 4096 * 511 + 1), - 4072 + 4096 * 511 + store_valid_value_size(4096 + 4096 * 255 + 1), + 4096 + 4096 * 255 ); } - + /* #[test] pub fn test_remove_expired() { let path_str = "test-env"; @@ -740,7 +739,7 @@ mod test { } #[test] - pub fn test_lmdb() { + pub fn test_rocksdb() { let path_str = "test-env"; let root = Builder::new().prefix(path_str).tempdir().unwrap(); @@ -989,4 +988,5 @@ mod test { // uncomment this if you need time to copy them somewhere for analysis, before the temp folder get destroyed //thread::sleep(Duration::from_millis(20000)); } + */ }
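
Note (not part of the patch): a minimal standalone sketch of the RocksDB-oriented value sizing introduced above in p2p-repo/src/store.rs. It reproduces store_valid_value_size and store_max_value_size with the constants from the diff (DISK_BLOCK_SIZE = 4096, MAX_FACTOR = 256, ONE_MEGA_BYTE); the extra checks in main (4097, 10_000_000) are illustrative values, not taken from test_get_valid_value_size.

use std::cmp::{max, min};

const ONE_MEGA_BYTE: usize = 1024 * 1024;
const DISK_BLOCK_SIZE: usize = 4096; // HDD/SSD block size; RocksDB blob values are kept aligned on it
const MAX_FACTOR: usize = 256; // cap: 256 blocks of 4096 bytes = 1 MiB

/// Rounds a requested size up to a whole number of 4 KiB disk blocks, capped at MAX_FACTOR blocks.
fn store_valid_value_size(size: usize) -> usize {
    min(
        max(1, (size as f32 / DISK_BLOCK_SIZE as f32).ceil() as usize),
        MAX_FACTOR,
    ) * DISK_BLOCK_SIZE
}

/// Maximum value size accepted by the storage backend (1 MiB).
const fn store_max_value_size() -> usize {
    ONE_MEGA_BYTE
}

fn main() {
    assert_eq!(store_valid_value_size(0), 4096); // an empty value still occupies one block
    assert_eq!(store_valid_value_size(4096), 4096); // an exact multiple is returned unchanged
    assert_eq!(store_valid_value_size(4097), 8192); // one byte over rounds up to the next block
    assert_eq!(store_valid_value_size(10_000_000), store_max_value_size()); // capped at 256 blocks
    println!("all sizing checks passed");
}

This keeps every stored value block-aligned (as the BlobCache comment in the store.rs hunk requires) while bounding any single value at store_max_value_size(); the updated test_get_valid_value_size assertions in the patch follow directly from this rounding.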