refactor object/block DAG, CommitHeader, sync_req, test, rocksdb integration

Niko PLP 1 month ago
parent 8fafe0cb33
commit 8254dff83b
  1. 102
      Cargo.lock
  2. 1
      Cargo.toml
  3. 9
      README.md
  4. 2
      ng-wallet/src/lib.rs
  5. 2
      ng-wallet/src/types.rs
  6. 1
      ngaccount/Cargo.toml
  7. 1
      ngaccount/src/main.rs
  8. 4
      ngd/src/main.rs
  9. 2
      p2p-broker/src/server.rs
  10. 2
      p2p-net/src/errors.rs
  11. 16
      p2p-net/src/types.rs
  12. 2
      p2p-net/src/utils.rs
  13. 4
      p2p-repo/src/block.rs
  14. 173
      p2p-repo/src/branch.rs
  15. 422
      p2p-repo/src/commit.rs
  16. 2
      p2p-repo/src/errors.rs
  17. 1350
      p2p-repo/src/object.rs
  18. 27
      p2p-repo/src/store.rs
  19. 325
      p2p-repo/src/types.rs
  20. 4
      stores-rocksdb/src/lib.rs
  21. 162
      stores-rocksdb/src/repo_store.rs

102
Cargo.lock generated

@ -538,15 +538,6 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
[[package]]
name = "bincode"
version = "1.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad"
dependencies = [
"serde",
]
[[package]]
name = "bindgen"
version = "0.65.1"
@ -610,7 +601,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83fc15853171b33280f5614e77f5fa4debd33f51a86c44daa4ba3d759674c561"
dependencies = [
"base64 0.13.1",
"uuid 1.3.4",
"uuid",
]
[[package]]
@ -790,7 +781,7 @@ checksum = "d38f2da7a0a2c4ccf0065be06397cc26a81f4e528be095826eee9d4adbb8c60f"
dependencies = [
"byteorder",
"fnv",
"uuid 1.3.4",
"uuid",
]
[[package]]
@ -2474,12 +2465,6 @@ dependencies = [
"png",
]
[[package]]
name = "id-arena"
version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25a2bc672d1148e28034f176e01fffebb08b35768468cc954630da77a1449005"
[[package]]
name = "ident_case"
version = "1.0.1"
@ -2825,27 +2810,6 @@ version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519"
[[package]]
name = "lmdb-crypto-rs"
version = "0.14.0"
source = "git+https://git.nextgraph.org/NextGraph/lmdb-rs.git?branch=master#e70c3cae74e0d71b6a4e5c81b0c4909210a6d808"
dependencies = [
"bitflags",
"byteorder",
"libc",
"lmdb-crypto-sys",
]
[[package]]
name = "lmdb-crypto-sys"
version = "0.11.2"
source = "git+https://git.nextgraph.org/NextGraph/lmdb-rs.git?branch=master#e70c3cae74e0d71b6a4e5c81b0c4909210a6d808"
dependencies = [
"cc",
"libc",
"pkg-config",
]
[[package]]
name = "lock_api"
version = "0.4.10"
@ -3233,7 +3197,6 @@ dependencies = [
"serde_bare",
"serde_bytes",
"serde_json",
"stores-lmdb",
"tokio",
"warp",
"warp-embed",
@ -3551,15 +3514,6 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "ordered-float"
version = "3.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fc2dbde8f8a79f2102cc474ceb0ad68e3b80b85289ea62389b60e66777e4213"
dependencies = [
"num-traits",
]
[[package]]
name = "ordered-stream"
version = "0.2.0"
@ -4390,28 +4344,6 @@ dependencies = [
"winreg 0.10.1",
]
[[package]]
name = "rkv"
version = "0.18.0"
source = "git+https://git.nextgraph.org/NextGraph/rkv.git?rev=c746abb443b7bb4541ebbef2b71e8d0f9eb39f6a#c746abb443b7bb4541ebbef2b71e8d0f9eb39f6a"
dependencies = [
"arrayref",
"bincode",
"bitflags",
"byteorder",
"id-arena",
"lazy_static",
"lmdb-crypto-rs",
"log",
"ordered-float",
"paste",
"serde",
"serde_derive",
"thiserror",
"url",
"uuid 0.8.2",
]
[[package]]
name = "rocksdb"
version = "0.21.0"
@ -4965,18 +4897,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "stores-lmdb"
version = "0.1.0"
dependencies = [
"hex",
"p2p-repo",
"rkv",
"serde",
"serde_bare",
"tempfile",
]
[[package]]
name = "stores-rocksdb"
version = "0.1.0"
@ -5148,7 +5068,7 @@ dependencies = [
"tao-macros",
"unicode-segmentation",
"url",
"uuid 1.3.4",
"uuid",
"windows",
"windows-implement",
"x11-dl",
@ -5215,7 +5135,7 @@ dependencies = [
"tokio",
"tray-icon",
"url",
"uuid 1.3.4",
"uuid",
"webkit2gtk",
"webview2-com",
"windows",
@ -5263,7 +5183,7 @@ dependencies = [
"thiserror",
"time 0.3.23",
"url",
"uuid 1.3.4",
"uuid",
"walkdir",
]
@ -5308,7 +5228,7 @@ dependencies = [
"tauri-utils",
"thiserror",
"url",
"uuid 1.3.4",
"uuid",
"windows",
]
@ -5327,7 +5247,7 @@ dependencies = [
"raw-window-handle",
"tauri-runtime",
"tauri-utils",
"uuid 1.3.4",
"uuid",
"webkit2gtk",
"webview2-com",
"windows",
@ -5860,7 +5780,7 @@ checksum = "ae605c39dfbdec433798d4a8b03ffbac711dc51cdeb1ba5c725bdcaf24e464cc"
dependencies = [
"blob-uuid",
"lazy_static",
"uuid 1.3.4",
"uuid",
]
[[package]]
@ -5897,12 +5817,6 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]]
name = "uuid"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
[[package]]
name = "uuid"
version = "1.3.4"

@ -5,7 +5,6 @@ members = [
"p2p-broker",
"p2p-client-ws",
"p2p-verifier",
"stores-lmdb",
"stores-rocksdb",
"ngcli",
"ngd",

@ -53,7 +53,6 @@ The crates are organized as follow :
- p2p-broker : the broker code (as server and core node)
- p2p-client-ws : the client connecting to a broker with WebSocket, used by the apps and verifier
- p2p-verifier : the code of the verifier
- stores-lmdb : lmdb backed stores (not used anymore)
- stores-rocksdb : RocksDB backed stores. see [repo here](https://git.nextgraph.org/NextGraph/rust-rocksdb)
- ngcli : CLI tool to manipulate the repos and administrate the server
- ngd : binary executable of the daemon (that can run a broker, verifier and/or Rust services)
@ -82,19 +81,19 @@ For the web apps, see the [README](ng-app/README.md)
Test all:
```
cargo test --all --verbose -- --nocapture
cargo test --all --verbose -- --show-output --nocapture
```
Test a single module:
```
cargo test --package p2p-repo --lib -- branch::test --nocapture
cargo test --package p2p-repo --lib -- branch::test --show-output --nocapture
```
Test end-to-end client and server:
```
cargo test --package ngcli -- --nocapture
cargo test --package ngcli -- --show-output --nocapture
```
Test WASM websocket
@ -107,7 +106,7 @@ wasm-pack test --chrome --headless
Test Rust websocket
```
cargo test --package p2p-client-ws --lib -- remote_ws::test::test_ws --nocapture
cargo test --package p2p-client-ws --lib -- remote_ws::test::test_ws --show-output --nocapture
```
### Build release binaries

@ -499,7 +499,7 @@ pub async fn connect_wallet(
}
let brokers = broker.unwrap();
let mut tried: Option<(String, String, String, Option<String>, f64)> = None;
//TODO: on tauri (or forward in local broker, or CLI), prefer a BoxPublic to a Domain. Domain always comes first though, so we need to reorder the list
//TODO: on tauri (or forward in local broker, or CLI), prefer a Public to a Domain. Domain always comes first though, so we need to reorder the list
//TODO: use site.bootstraps to order the list of brokerInfo.
for broker_info in brokers {
match broker_info {

@ -255,7 +255,7 @@ impl EncryptedWalletV0 {
// Creating a new client
let client = ClientV0::new_with_auto_open(self.personal_site);
self.add_client(client.clone());
let mut log = self.log.as_mut().unwrap();
let log = self.log.as_mut().unwrap();
log.add(WalletOperation::SetClientV0(client.clone()));
let (peer_id, nonce) = session.get_first_user_peer_nonce()?;
Ok((

@ -14,7 +14,6 @@ warp-embed = "0.4"
rust-embed = "6"
log = "0.4"
env_logger = "0.10"
stores-lmdb = { path = "../stores-lmdb" }
p2p-repo = { path = "../p2p-repo", features = ["server_log_output"] }
p2p-net = { path = "../p2p-net" }
p2p-client-ws = { path = "../p2p-client-ws" }

@ -15,7 +15,6 @@ use duration_str::parse;
use p2p_client_ws::remote_ws::ConnectionWebSocket;
use p2p_net::actors::add_invitation::*;
use p2p_net::broker::BROKER;
use p2p_repo::store::StorageError;
use serde::{Deserialize, Serialize};
use warp::http::header::{HeaderMap, HeaderValue};
use warp::reply::Response;

@ -592,7 +592,7 @@ async fn main_inner() -> Result<(), ()> {
}
// --core
// core listeners always come after the domain ones, which is good as the first bootstrap in the list should be the domain (if there is also a core_with_clients that generates a BoxPublic bootstrap)
// core listeners always come after the domain ones, which is good as the first bootstrap in the list should be the domain (if there is also a core_with_clients that generates a Public bootstrap)
if args.core.is_some() {
let arg_value =
parse_interface_and_port_for(args.core.as_ref().unwrap(), "--core", DEFAULT_PORT)?;
@ -924,7 +924,7 @@ async fn main_inner() -> Result<(), ()> {
if is_private_ip(&bind.0) {
BrokerServerTypeV0::BoxPrivate(vec![bind_addr])
} else if is_public_ip(&bind.0) {
BrokerServerTypeV0::BoxPublic(vec![bind_addr])
BrokerServerTypeV0::Public(vec![bind_addr])
} else {
log_err!("Invalid IP address given for --forward option. cannot start");
return Err(());

@ -36,8 +36,6 @@ use p2p_repo::store::RepoStore;
use p2p_repo::store::StorageError;
use p2p_repo::types::*;
use p2p_repo::utils::*;
use stores_lmdb::broker_store::LmdbKCVStore;
use stores_lmdb::repo_store::LmdbRepoStore;
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum BrokerError {

@ -1,7 +1,5 @@
// Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// This code is partly derived from work written by TG x Thoth from P2Pcollab.
// Copyright 2022 TG x Thoth
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,

@ -1,7 +1,5 @@
// Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// This code is partly derived from work written by TG x Thoth from P2Pcollab.
// Copyright 2022 TG x Thoth
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
@ -135,7 +133,7 @@ pub enum BrokerCore {
pub enum BrokerServerTypeV0 {
Localhost(u16), // optional port number
BoxPrivate(Vec<BindAddress>),
BoxPublic(Vec<BindAddress>),
Public(Vec<BindAddress>),
BoxPublicDyn(Vec<BindAddress>), // can be empty
Domain(String), // accepts an optional trailing ":port" number
//Core(Vec<BindAddress>),
@ -279,7 +277,7 @@ impl BrokerServerV0 {
/// set ipv6 only if the browser connected with a remote IPV6. always set ipv4 as a fallback (for now).
pub async fn get_url_for_ngone(&self, ipv4: bool, ipv6: bool) -> Option<String> {
match &self.server_type {
BrokerServerTypeV0::BoxPublic(addrs) => {
BrokerServerTypeV0::Public(addrs) => {
Self::app_ng_one_bootstrap_url_with_first_ipv6_or_ipv4(
ipv4,
ipv6,
@ -338,7 +336,7 @@ impl BrokerServerV0 {
match &self.server_type {
BrokerServerTypeV0::Localhost(_) => false,
BrokerServerTypeV0::BoxPrivate(_) => false,
BrokerServerTypeV0::BoxPublic(_) => true,
BrokerServerTypeV0::Public(_) => true,
BrokerServerTypeV0::BoxPublicDyn(_) => true,
BrokerServerTypeV0::Domain(_) => true,
}
@ -356,7 +354,7 @@ impl BrokerServerV0 {
let location = location.as_ref().unwrap();
if location.starts_with(APP_NG_ONE_URL) {
match &self.server_type {
BrokerServerTypeV0::BoxPublic(addrs) => {
BrokerServerTypeV0::Public(addrs) => {
Some((APP_NG_ONE_WS_URL.to_string(), addrs.clone()))
}
BrokerServerTypeV0::BoxPublicDyn(addrs) => {
@ -425,7 +423,7 @@ impl BrokerServerV0 {
//BrokerServerTypeV0::Core(_) => None,
BrokerServerTypeV0::Localhost(port) => Some((local_ws_url(port), vec![])),
BrokerServerTypeV0::BoxPrivate(addrs) => Some((String::new(), addrs.clone())),
BrokerServerTypeV0::BoxPublic(addrs) => Some((String::new(), addrs.clone())),
BrokerServerTypeV0::Public(addrs) => Some((String::new(), addrs.clone())),
BrokerServerTypeV0::BoxPublicDyn(addrs) => {
// let resp = reqwest::get(api_dyn_peer_url(&self.peer_id)).await;
// if resp.is_ok() {
@ -1041,7 +1039,7 @@ impl ListenerV0 {
let pub_addrs = self.accept_forward_for.get_public_bind_addresses();
//res.push(BrokerServerTypeV0::Core(pub_addrs.clone()));
if !self.refuse_clients {
res.push(BrokerServerTypeV0::BoxPublic(pub_addrs));
res.push(BrokerServerTypeV0::Public(pub_addrs));
}
if self.accept_direct {
res.push(BrokerServerTypeV0::BoxPrivate(addrs));
@ -1083,7 +1081,7 @@ impl ListenerV0 {
} else if self.if_type == InterfaceType::Public {
//res.push(BrokerServerTypeV0::Core(addrs.clone()));
if !self.refuse_clients {
res.push(BrokerServerTypeV0::BoxPublic(addrs));
res.push(BrokerServerTypeV0::Public(addrs));
}
} else if self.if_type == InterfaceType::Private {
res.push(BrokerServerTypeV0::BoxPrivate(addrs));

@ -67,7 +67,7 @@ pub fn decode_invitation_string(string: String) -> Option<Invitation> {
pub fn check_is_local_url(bootstrap: &BrokerServerV0, location: &String) -> Option<String> {
if location.starts_with(APP_NG_ONE_URL) {
match &bootstrap.server_type {
BrokerServerTypeV0::BoxPublic(_) | BrokerServerTypeV0::BoxPublicDyn(_) => {
BrokerServerTypeV0::Public(_) | BrokerServerTypeV0::BoxPublicDyn(_) => {
return Some(APP_NG_ONE_WS_URL.to_string());
}
_ => {}

@ -92,6 +92,10 @@ impl Block {
Block::V0(BlockV0::new(children, header_ref, content, key))
}
pub fn size(&self) -> usize {
serde_bare::to_vec(&self).unwrap().len()
}
/// Compute the ID
pub fn compute_id(&self) -> BlockId {
match self {

@ -1,7 +1,5 @@
// Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// This code is partly derived from work written by TG x Thoth from P2Pcollab.
// Copyright 2022 TG x Thoth
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
@ -60,114 +58,97 @@ impl Branch {
}
/// Branch sync request from another peer
/// `target_heads` represents the list of heads the requester would like to reach. this list should not be empty.
/// if the requester doesn't know what to reach, the responder should fill this list with their own current local head.
/// `known_heads` represents the list of current heads at the requester replica at the moment of request.
/// an empty list means the requester has an empty branch locally
///
/// Return ObjectIds to send
pub fn sync_req(
our_heads: &[ObjectId],
their_heads: &[ObjectId],
their_filter: &BloomFilter,
target_heads: &[ObjectId],
known_heads: &[ObjectId],
//their_filter: &BloomFilter,
store: &Box<impl RepoStore + ?Sized>,
) -> Result<Vec<ObjectId>, ObjectParseError> {
//log_debug!(">> sync_req");
//log_debug!(" our_heads: {:?}", our_heads);
//log_debug!(" their_heads: {:?}", their_heads);
//log_debug!(" target_heads: {:?}", target_heads);
//log_debug!(" known_heads: {:?}", known_heads);
/// Load `Commit` `Object`s of a `Branch` from the `RepoStore` starting from the given `Object`,
/// and collect `ObjectId`s starting from `our_heads` towards `their_heads`
fn load_branch(
/// Load causal past of a Commit `cobj` in a `Branch` from the `RepoStore`,
/// and collect in `visited` the ObjectIds encountered on the way, stopping at any commit already belonging to `theirs` or the root of DAG.
/// optionally collecting the missing objects/blocks that couldn't be found locally on the way
fn load_causal_past(
cobj: &Object,
store: &Box<impl RepoStore + ?Sized>,
their_heads: &[ObjectId],
theirs: &HashSet<ObjectId>,
visited: &mut HashSet<ObjectId>,
missing: &mut HashSet<ObjectId>,
) -> Result<bool, ObjectParseError> {
missing: &mut Option<&mut HashSet<ObjectId>>,
) -> Result<(), ObjectParseError> {
let id = cobj.id();
//log_debug!(">>> load_branch: {}", id);
// root has no acks
let is_root = cobj.is_root();
//log_debug!(" acks: {:?}", cobj.acks());
// check if this commit object is present in their_heads
let mut their_head_found = their_heads.contains(&id);
// load deps, stop at the root or if this is a commit object from their_heads
if !is_root && !their_head_found {
// check if this commit object is present in theirs or has already been visited in the current walk
// load deps, stop at the root(including it in visited) or if this is a commit object from known_heads
if !theirs.contains(&id) && !visited.contains(&id) {
visited.insert(id);
for id in cobj.deps() {
for id in cobj.acks_and_nacks() {
match Object::load(id, None, store) {
Ok(o) => {
if !visited.contains(&id) {
if load_branch(&o, store, their_heads, visited, missing)? {
their_head_found = true;
}
}
load_causal_past(&o, store, theirs, visited, missing)?;
}
Err(ObjectParseError::MissingBlocks(m)) => {
missing.extend(m);
Err(ObjectParseError::MissingBlocks(blocks)) => {
missing.as_mut().map(|m| m.extend(blocks));
}
Err(e) => return Err(e),
}
}
}
Ok(their_head_found)
Ok(())
}
// missing commits from our branch
let mut missing = HashSet::new();
// our commits
let mut ours = HashSet::new();
// their commits
let mut theirs = HashSet::new();
// collect all commits reachable from our_heads
// up to the root or until encountering a commit from their_heads
for id in our_heads {
let cobj = Object::load(*id, None, store)?;
let mut visited = HashSet::new();
let their_head_found =
load_branch(&cobj, store, their_heads, &mut visited, &mut missing)?;
//log_debug!("<<< load_branch: {}", their_head_found);
ours.extend(visited); // add if one of their_heads found
// collect causal past of known_heads
for id in known_heads {
if let Ok(cobj) = Object::load(*id, None, store) {
load_causal_past(&cobj, store, &HashSet::new(), &mut theirs, &mut None)?;
}
// we silently discard any load error on the known_heads as the responder might not know them (yet).
}
// collect all commits reachable from their_heads
for id in their_heads {
let cobj = Object::load(*id, None, store)?;
let mut visited = HashSet::new();
let their_head_found = load_branch(&cobj, store, &[], &mut visited, &mut missing)?;
//log_debug!("<<< load_branch: {}", their_head_found);
theirs.extend(visited); // add if one of their_heads found
let mut visited = HashSet::new();
// collect all commits reachable from target_heads
// up to the root or until encountering a commit from theirs
for id in target_heads {
if let Ok(cobj) = Object::load(*id, None, store) {
load_causal_past(&cobj, store, &theirs, &mut visited, &mut None)?;
}
// we silently discard any load error on the target_heads as they can be wrong if the requester is confused about what the responder has locally.
}
let mut result = &ours - &theirs;
//log_debug!("!! ours: {:?}", ours);
//log_debug!("!! theirs: {:?}", theirs);
//log_debug!("!! result: {:?}", result);
// remove their_commits from result
let filter = Filter::from_u8_array(their_filter.f.as_slice(), their_filter.k.into());
for id in result.clone() {
match id {
Digest::Blake3Digest32(d) => {
if filter.contains(&d) {
result.remove(&id);
}
}
}
}
// let filter = Filter::from_u8_array(their_filter.f.as_slice(), their_filter.k.into());
// for id in result.clone() {
// match id {
// Digest::Blake3Digest32(d) => {
// if filter.contains(&d) {
// result.remove(&id);
// }
// }
// }
// }
//log_debug!("!! result filtered: {:?}", result);
Ok(Vec::from_iter(result))
Ok(Vec::from_iter(visited))
}
}
mod test {
use std::collections::HashMap;
use ed25519_dalek::*;
use fastbloom_rs::{BloomFilter as Filter, FilterBuilder, Membership};
use rand::rngs::OsRng;
//use fastbloom_rs::{BloomFilter as Filter, FilterBuilder, Membership};
use crate::branch::*;
use crate::commit::*;
@ -175,6 +156,7 @@ mod test {
use crate::repo;
use crate::repo::Repo;
use crate::store::*;
use crate::utils::*;
#[test]
pub fn test_branch() {
@ -286,36 +268,17 @@ mod test {
}
let store = Box::new(HashMapRepoStore::new());
let mut rng = OsRng {};
// repo
let repo_keypair: Keypair = Keypair::generate(&mut rng);
log_debug!(
"repo private key: ({}) {:?}",
repo_keypair.secret.as_bytes().len(),
repo_keypair.secret.as_bytes()
);
log_debug!(
"repo public key: ({}) {:?}",
repo_keypair.public.as_bytes().len(),
repo_keypair.public.as_bytes()
);
let repo_privkey = PrivKey::Ed25519PrivKey(repo_keypair.secret.to_bytes());
let repo_pubkey = PubKey::Ed25519PubKey(repo_keypair.public.to_bytes());
let repo_secret = SymKey::ChaCha20Key([9; 32]);
let store_repo = StoreRepo::V0(StoreRepoV0::PublicStore(repo_pubkey));
let (repo_privkey, repo_pubkey) = generate_keypair();
let (store_repo, repo_secret) = StoreRepo::dummy_public_v0();
// branch
let branch_keypair: Keypair = Keypair::generate(&mut rng);
log_debug!("branch public key: {:?}", branch_keypair.public.as_bytes());
let branch_pubkey = PubKey::Ed25519PubKey(branch_keypair.public.to_bytes());
let (branch_privkey, branch_pubkey) = generate_keypair();
let member_keypair: Keypair = Keypair::generate(&mut rng);
log_debug!("member public key: {:?}", member_keypair.public.as_bytes());
let member_privkey = PrivKey::Ed25519PrivKey(member_keypair.secret.to_bytes());
let member_pubkey = PubKey::Ed25519PubKey(member_keypair.public.to_bytes());
let (member_privkey, member_pubkey) = generate_keypair();
let metadata = [66u8; 64].to_vec();
@ -481,28 +444,28 @@ mod test {
let mut c7 = Commit::load(a7.clone(), repo.get_store(), true).unwrap();
c7.verify(&repo).unwrap();
let mut filter = Filter::new(FilterBuilder::new(10, 0.01));
for commit_ref in [br, t1, t2, t5.clone(), a6.clone()] {
match commit_ref.id {
ObjectId::Blake3Digest32(d) => filter.add(&d),
}
}
let cfg = filter.config();
let their_commits = BloomFilter {
k: cfg.hashes,
f: filter.get_u8_array().to_vec(),
};
// let mut filter = Filter::new(FilterBuilder::new(10, 0.01));
// for commit_ref in [br, t1, t2, t5.clone(), a6.clone()] {
// match commit_ref.id {
// ObjectId::Blake3Digest32(d) => filter.add(&d),
// }
// }
// let cfg = filter.config();
// let their_commits = BloomFilter {
// k: cfg.hashes,
// f: filter.get_u8_array().to_vec(),
// };
print_branch();
log_debug!(">> sync_req");
log_debug!(" our_heads: [a3, t5, a6, a7]");
log_debug!(" their_heads: [a3, t5]");
log_debug!(" known_heads: [a3, t5]");
log_debug!(" their_commits: [br, t1, t2, a3, t5, a6]");
let ids = Branch::sync_req(
&[t5.id, a6.id, a7.id],
&[t5.id],
&their_commits,
//&their_commits,
repo.get_store(),
)
.unwrap();

@ -1,7 +1,5 @@
// Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// This code is partly derived from work written by TG x Thoth from P2Pcollab.
// Copyright 2022 TG x Thoth
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
@ -11,7 +9,8 @@
//! Commit
use ed25519_dalek::*;
use core::fmt;
//use ed25519_dalek::*;
use once_cell::sync::OnceCell;
use crate::errors::NgError;
@ -20,6 +19,7 @@ use crate::object::*;
use crate::repo::Repo;
use crate::store::*;
use crate::types::*;
use crate::utils::*;
use std::collections::HashSet;
use std::iter::FromIterator;
@ -60,7 +60,7 @@ impl CommitV0 {
nrefs: Vec<ObjectRef>,
metadata: Vec<u8>,
body: ObjectRef,
) -> Result<CommitV0, SignatureError> {
) -> Result<CommitV0, NgError> {
let headers = CommitHeader::new_with(deps, ndeps, acks, nacks, refs, nrefs);
let content = CommitContentV0 {
perms: vec![],
@ -75,17 +75,7 @@ impl CommitV0 {
let content_ser = serde_bare::to_vec(&content).unwrap();
// sign commit
let kp = match (author_privkey, author_pubkey) {
(PrivKey::Ed25519PrivKey(sk), PubKey::Ed25519PubKey(pk)) => [sk, pk].concat(),
(_, _) => panic!("cannot sign with Montgomery key"),
};
let keypair = Keypair::from_bytes(kp.as_slice())?;
let sig_bytes = keypair.sign(content_ser.as_slice()).to_bytes();
let mut it = sig_bytes.chunks_exact(32);
let mut ss: Ed25519Sig = [[0; 32], [0; 32]];
ss[0].copy_from_slice(it.next().unwrap());
ss[1].copy_from_slice(it.next().unwrap());
let sig = Sig::Ed25519Sig(ss);
let sig = sign(&author_privkey, &author_pubkey, &content_ser)?;
Ok(CommitV0 {
content: CommitContent::V0(content),
sig,
@ -113,7 +103,7 @@ impl Commit {
nrefs: Vec<ObjectRef>,
metadata: Vec<u8>,
body: ObjectRef,
) -> Result<Commit, SignatureError> {
) -> Result<Commit, NgError> {
CommitV0::new(
author_privkey,
author_pubkey,
@ -297,6 +287,13 @@ impl Commit {
}
}
/// Get commit content
pub fn content(&self) -> &CommitContent {
match self {
Commit::V0(CommitV0 { content: c, .. }) => c,
}
}
pub fn body(&self) -> Option<&CommitBody> {
match self {
Commit::V0(c) => c.body.get(),
@ -432,7 +429,7 @@ impl Commit {
}
/// Verify commit signature
pub fn verify_sig(&self) -> Result<(), SignatureError> {
pub fn verify_sig(&self) -> Result<(), NgError> {
let c = match self {
Commit::V0(c) => c,
};
@ -767,40 +764,377 @@ impl CommitBody {
}
}
impl fmt::Display for CommitHeader {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
CommitHeader::V0(v0) => {
writeln!(
f,
"v0 - compact:{} id:{}",
v0.compact,
v0.id.map_or("None".to_string(), |i| format!("{}", i))
)?;
writeln!(f, "==== acks : {}", v0.acks.len())?;
for ack in &v0.acks {
writeln!(f, "============== {}", ack)?;
}
writeln!(f, "==== nacks : {}", v0.nacks.len())?;
for nack in &v0.nacks {
writeln!(f, "============== {}", nack)?;
}
writeln!(f, "==== deps : {}", v0.deps.len())?;
for dep in &v0.deps {
writeln!(f, "============== {}", dep)?;
}
writeln!(f, "==== ndeps : {}", v0.ndeps.len())?;
for ndep in &v0.ndeps {
writeln!(f, "============== {}", ndep)?;
}
writeln!(f, "==== refs : {}", v0.refs.len())?;
for rref in &v0.refs {
writeln!(f, "============== {}", rref)?;
}
writeln!(f, "==== nrefs : {}", v0.nrefs.len())?;
for nref in &v0.nrefs {
writeln!(f, "============== {}", nref)?;
}
Ok(())
}
}
}
}
impl CommitHeader {
pub fn is_root(&self) -> bool {
match self {
CommitHeader::V0(v0) => v0.is_root(),
}
}
pub fn deps(&self) -> Vec<ObjectId> {
match self {
CommitHeader::V0(v0) => v0.deps.clone(),
}
}
pub fn acks(&self) -> Vec<ObjectId> {
match self {
CommitHeader::V0(v0) => v0.acks.clone(),
}
}
pub fn acks_and_nacks(&self) -> Vec<ObjectId> {
match self {
CommitHeader::V0(v0) => {
let mut res = v0.acks.clone();
res.extend_from_slice(&v0.nacks);
res
}
}
}
pub fn id(&self) -> &Option<ObjectId> {
match self {
CommitHeader::V0(v0) => &v0.id,
}
}
pub fn set_id(&mut self, id: Digest) {
match self {
CommitHeader::V0(v0) => v0.id = Some(id),
}
}
pub fn set_compact(&mut self) {
match self {
CommitHeader::V0(v0) => v0.set_compact(),
}
}
pub fn new_with(
deps: Vec<ObjectRef>,
ndeps: Vec<ObjectRef>,
acks: Vec<ObjectRef>,
nacks: Vec<ObjectRef>,
refs: Vec<ObjectRef>,
nrefs: Vec<ObjectRef>,
) -> (Option<Self>, Option<CommitHeaderKeys>) {
let res = CommitHeaderV0::new_with(deps, ndeps, acks, nacks, refs, nrefs);
(
res.0.map(|h| CommitHeader::V0(h)),
res.1.map(|h| CommitHeaderKeys::V0(h)),
)
}
pub fn new_with_deps(deps: Vec<ObjectId>) -> Option<Self> {
CommitHeaderV0::new_with_deps(deps).map(|ch| CommitHeader::V0(ch))
}
pub fn new_with_deps_and_acks(deps: Vec<ObjectId>, acks: Vec<ObjectId>) -> Option<Self> {
CommitHeaderV0::new_with_deps_and_acks(deps, acks).map(|ch| CommitHeader::V0(ch))
}
pub fn new_with_acks(acks: Vec<ObjectId>) -> Option<Self> {
CommitHeaderV0::new_with_acks(acks).map(|ch| CommitHeader::V0(ch))
}
}
impl CommitHeaderV0 {
fn new_empty() -> Self {
Self {
id: None,
compact: false,
deps: vec![],
ndeps: vec![],
acks: vec![],
nacks: vec![],
refs: vec![],
nrefs: vec![],
}
}
pub fn set_compact(&mut self) {
self.compact = true;
}
pub fn new_with(
deps: Vec<ObjectRef>,
ndeps: Vec<ObjectRef>,
acks: Vec<ObjectRef>,
nacks: Vec<ObjectRef>,
refs: Vec<ObjectRef>,
nrefs: Vec<ObjectRef>,
) -> (Option<Self>, Option<CommitHeaderKeysV0>) {
if deps.is_empty()
&& ndeps.is_empty()
&& acks.is_empty()
&& nacks.is_empty()
&& refs.is_empty()
&& nrefs.is_empty()
{
(None, None)
} else {
let mut ideps: Vec<ObjectId> = vec![];
let mut indeps: Vec<ObjectId> = vec![];
let mut iacks: Vec<ObjectId> = vec![];
let mut inacks: Vec<ObjectId> = vec![];
let mut irefs: Vec<ObjectId> = vec![];
let mut inrefs: Vec<ObjectId> = vec![];
let mut kdeps: Vec<ObjectKey> = vec![];
let mut kacks: Vec<ObjectKey> = vec![];
let mut knacks: Vec<ObjectKey> = vec![];
for d in deps {
ideps.push(d.id);
kdeps.push(d.key);
}
for d in ndeps {
indeps.push(d.id);
}
for d in acks {
iacks.push(d.id);
kacks.push(d.key);
}
for d in nacks {
inacks.push(d.id);
knacks.push(d.key);
}
for d in refs.clone() {
irefs.push(d.id);
}
for d in nrefs {
inrefs.push(d.id);
}
(
Some(Self {
id: None,
compact: false,
deps: ideps,
ndeps: indeps,
acks: iacks,
nacks: inacks,
refs: irefs,
nrefs: inrefs,
}),
Some(CommitHeaderKeysV0 {
deps: kdeps,
acks: kacks,
nacks: knacks,
refs,
}),
)
}
}
pub fn new_with_deps(deps: Vec<ObjectId>) -> Option<Self> {
assert!(!deps.is_empty());
let mut n = Self::new_empty();
n.deps = deps;
Some(n)
}
pub fn new_with_deps_and_acks(deps: Vec<ObjectId>, acks: Vec<ObjectId>) -> Option<Self> {
assert!(!deps.is_empty() || !acks.is_empty());
let mut n = Self::new_empty();
n.deps = deps;
n.acks = acks;
Some(n)
}
pub fn new_with_acks(acks: Vec<ObjectId>) -> Option<Self> {
assert!(!acks.is_empty());
let mut n = Self::new_empty();
n.acks = acks;
Some(n)
}
/// we do not check the deps because in a forked branch, they point to previous branch heads.
pub fn is_root(&self) -> bool {
//self.deps.is_empty()
// && self.ndeps.is_empty()
self.acks.is_empty() && self.nacks.is_empty()
}
}
impl fmt::Display for Commit {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::V0(v0) => {
writeln!(f, "====== Commit V0 ======")?;
if v0.id.is_some() {
writeln!(f, "== ID: {}", v0.id.as_ref().unwrap())?;
}
if v0.key.is_some() {
writeln!(f, "== Key: {}", v0.key.as_ref().unwrap())?;
}
if v0.header.is_some() {
write!(f, "== Header: {}", v0.header.as_ref().unwrap())?;
}
writeln!(f, "== Sig: {}", v0.sig)?;
write!(f, "{}", v0.content)?;
if v0.body.get().is_some() {
writeln!(f, "== Body: {}", v0.body.get().unwrap())?;
}
}
}
Ok(())
}
}
impl fmt::Display for CommitBody {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::V0(v0) => {
write!(f, "V0 ")?;
match v0 {
//
// for root branch:
//
CommitBodyV0::Repository(b) => writeln!(f, "Repository {}", b),
CommitBodyV0::RootBranch(b) => writeln!(f, "RootBranch {}", b),
_ => unimplemented!(),
/*UpdateRootBranch(RootBranch), // total order enforced with total_order_quorum
AddMember(AddMember), // total order enforced with total_order_quorum
RemoveMember(RemoveMember), // total order enforced with total_order_quorum
AddPermission(AddPermission),
RemovePermission(RemovePermission),
AddBranch(AddBranch),
ChangeMainBranch(ChangeMainBranch),
RemoveBranch(RemoveBranch),
AddName(AddName),
RemoveName(RemoveName),
// TODO? Quorum(Quorum), // changes the quorum without changing the RootBranch
//
// For transactional branches:
//
Branch(Branch), // singleton and should be first in branch
UpdateBranch(Branch), // total order enforced with total_order_quorum
Snapshot(Snapshot), // a soft snapshot
AsyncTransaction(Transaction), // partial_order
SyncTransaction(Transaction), // total_order
AddFile(AddFile),
RemoveFile(RemoveFile),
Compact(Compact), // a hard snapshot. total order enforced with total_order_quorum
//Merge(Merge),
//Revert(Revert), // only possible on partial order commit
AsyncSignature(AsyncSignature),
//
// For both
//
RefreshReadCap(RefreshReadCap),
RefreshWriteCap(RefreshWriteCap),
SyncSignature(SyncSignature),*/
}
}
}
}
}
impl fmt::Display for CommitContent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::V0(v0) => {
writeln!(f, "=== CommitContent V0 ===")?;
writeln!(f, "====== author: {}", v0.author)?;
writeln!(f, "====== seq: {}", v0.seq)?;
writeln!(f, "====== BranchID: {}", v0.branch)?;
writeln!(f, "====== quorum: {:?}", v0.quorum)?;
writeln!(f, "====== Ref body: {}", v0.body)?;
if v0.header_keys.is_none() {
writeln!(f, "====== header keys: None")?;
} else {
write!(f, "{}", v0.header_keys.as_ref().unwrap())?;
}
writeln!(f, "====== Perms commits: {}", v0.perms.len())?;
let mut i = 0;
for block in &v0.perms {
writeln!(f, "========== {:03}: {}", i, block)?;
i += 1;
}
}
}
Ok(())
}
}
impl fmt::Display for CommitHeaderKeys {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::V0(v0) => {
writeln!(f, "=== CommitHeaderKeys V0 ===")?;
writeln!(f, "==== acks : {}", v0.acks.len())?;
for ack in &v0.acks {
writeln!(f, "============== {}", ack)?;
}
writeln!(f, "==== nacks : {}", v0.nacks.len())?;
for nack in &v0.nacks {
writeln!(f, "============== {}", nack)?;
}
writeln!(f, "==== deps : {}", v0.deps.len())?;
for dep in &v0.deps {
writeln!(f, "============== {}", dep)?;
}
writeln!(f, "==== refs : {}", v0.refs.len())?;
for rref in &v0.refs {
writeln!(f, "============== {}", rref)?;
}
}
}
Ok(())
}
}
mod test {
use std::collections::HashMap;
use ed25519_dalek::*;
use rand::rngs::OsRng;
use crate::branch::*;
use crate::commit::*;
use crate::store::*;
use crate::types::*;
use crate::utils::*;
#[test]
pub fn test_commit() {
let mut csprng = OsRng {};
let keypair: Keypair = Keypair::generate(&mut csprng);
log_debug!(
"private key: ({}) {:?}",
keypair.secret.as_bytes().len(),
keypair.secret.as_bytes()
);
log_debug!(
"public key: ({}) {:?}",
keypair.public.as_bytes().len(),
keypair.public.as_bytes()
);
let ed_priv_key = keypair.secret.to_bytes();
let ed_pub_key = keypair.public.to_bytes();
let priv_key = PrivKey::Ed25519PrivKey(ed_priv_key);
let pub_key = PubKey::Ed25519PubKey(ed_pub_key);
let (priv_key, pub_key) = generate_keypair();
let seq = 3;
let obj_ref = ObjectRef {
id: ObjectId::Blake3Digest32([1; 32]),
key: SymKey::ChaCha20Key([2; 32]),
};
let obj_ref = ObjectRef::dummy();
let obj_refs = vec![obj_ref.clone()];
let branch = pub_key;
let deps = obj_refs.clone();
@ -825,15 +1159,12 @@ mod test {
body_ref,
)
.unwrap();
log_debug!("commit: {:?}", commit);
log_debug!("{}", commit);
let store = Box::new(HashMapRepoStore::new());
let repo = Repo::new_with_member(&pub_key, &pub_key, &[PermissionV0::WriteAsync], store);
//let body = CommitBody::Ack(Ack::V0());
//log_debug!("body: {:?}", body);
match commit.load_body(repo.get_store()) {
Ok(_b) => panic!("Body should not exist"),
Err(CommitLoadError::MissingBlocks(missing)) => {
@ -842,9 +1173,6 @@ mod test {
Err(e) => panic!("Commit verify error: {:?}", e),
}
let content = commit.content_v0();
log_debug!("content: {:?}", content);
commit.verify_sig().expect("Invalid signature");
commit.verify_perm(&repo).expect("Permission denied");

@ -1,7 +1,5 @@
// Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// This code is partly derived from work written by TG x Thoth from P2Pcollab.
// Copyright 2022 TG x Thoth
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,

File diff suppressed because it is too large Load Diff

@ -18,7 +18,7 @@ use crate::utils::Receiver;
use std::sync::{Arc, RwLock};
use std::{
cmp::min,
cmp::{max, min},
collections::{hash_map::Iter, HashMap},
mem::size_of_val,
};
@ -57,6 +57,8 @@ impl From<serde_bare::error::Error> for StorageError {
}
}
/* LMDB values:
const MIN_SIZE: usize = 4072;
const PAGE_SIZE: usize = 4096;
const HEADER: usize = PAGE_SIZE - MIN_SIZE;
@ -75,6 +77,29 @@ pub fn store_valid_value_size(size: usize) -> usize {
pub const fn store_max_value_size() -> usize {
MAX_FACTOR * PAGE_SIZE - HEADER
}
*/
// ROCKSDB values:
const ONE_MEGA_BYTE: usize = 1024 * 1024;
const DISK_BLOCK_SIZE: usize = 4096;
// HDD block size at 4096, SSD page size at 4096, on openbsd FFS default is 16384
// see Rocksdb integrated BlobDB https://rocksdb.org/blog/2021/05/26/integrated-blob-db.html
// blob values should be multiple of 4096 because of the BlobCache of RocksDB that is in heap memory (so must align on mem page).
const MAX_FACTOR: usize = 256;
/// Returns a valid/optimal value size for the entries of the storage backend.
pub fn store_valid_value_size(size: usize) -> usize {
min(
max(1, (size as f32 / DISK_BLOCK_SIZE as f32).ceil() as usize),
MAX_FACTOR,
) * DISK_BLOCK_SIZE
}
/// Returns the maximum value size for the entries of the storage backend.
pub const fn store_max_value_size() -> usize {
ONE_MEGA_BYTE
}
/// Store with a HashMap backend
pub struct HashMapRepoStore {

@ -1,7 +1,5 @@
// Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// This code is partly derived from work written by TG x Thoth from P2Pcollab.
// Copyright 2022 TG x Thoth
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
@ -88,6 +86,10 @@ impl SymKey {
pub fn from_array(array: [u8; 32]) -> Self {
SymKey::ChaCha20Key(array)
}
#[deprecated(note = "**Don't use dummy method**")]
pub fn dummy() -> Self {
SymKey::ChaCha20Key([0; 32])
}
}
impl fmt::Display for SymKey {
@ -235,8 +237,8 @@ impl fmt::Display for PrivKey {
match self {
Self::Ed25519PrivKey(ed) => {
//let priv_key_ser = serde_bare::to_vec(ed).unwrap();
let prix_key_encoded = base64_url::encode(ed);
write!(f, "{}", prix_key_encoded)
let priv_key_encoded = base64_url::encode(ed);
write!(f, "{}", priv_key_encoded)
}
_ => {
unimplemented!();
@ -254,6 +256,24 @@ pub enum Sig {
Ed25519Sig(Ed25519Sig),
}
impl fmt::Display for Sig {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Ed25519Sig(ed) => {
write!(
f,
"{} {}",
base64_url::encode(&ed[0]),
base64_url::encode(&ed[1])
)
}
_ => {
unimplemented!();
}
}
}
}
/// Timestamp: absolute time in minutes since 2022-02-22 22:22 UTC
pub type Timestamp = u32;
@ -268,6 +288,17 @@ pub enum RelTime {
Days(u8),
}
impl fmt::Display for RelTime {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Seconds(s) => writeln!(f, "{} sec.", s),
Self::Minutes(s) => writeln!(f, "{} min.", s),
Self::Hours(s) => writeln!(f, "{} h.", s),
Self::Days(s) => writeln!(f, "{} d.", s),
}
}
}
/// Bloom filter (variable size)
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct BloomFilter {
@ -330,6 +361,13 @@ pub struct BlockRef {
pub key: BlockKey,
}
impl BlockId {
#[deprecated(note = "**Don't use dummy method**")]
pub fn dummy() -> Self {
Digest::Blake3Digest32([0u8; 32])
}
}
impl BlockRef {
#[deprecated(note = "**Don't use dummy method**")]
pub fn dummy() -> Self {
@ -343,6 +381,12 @@ impl BlockRef {
}
}
impl From<BlockRef> for (BlockId, BlockKey) {
fn from(blockref: BlockRef) -> (BlockId, BlockKey) {
(blockref.id.clone(), blockref.key.clone())
}
}
impl From<(&BlockId, &BlockKey)> for BlockRef {
fn from(id_key: (&BlockId, &BlockKey)) -> Self {
BlockRef {
@ -352,6 +396,12 @@ impl From<(&BlockId, &BlockKey)> for BlockRef {
}
}
impl fmt::Display for BlockRef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{} {}", self.id, self.key)
}
}
/// Object ID
pub type ObjectId = BlockId;
@ -414,6 +464,23 @@ pub enum StoreOverlay {
Own(BranchId), // The repo is a store, so the overlay can be derived from its own ID. In this case, the branchId of the `overlay` branch is entered here.
}
impl fmt::Display for StoreOverlay {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::V0(v0) => {
writeln!(f, "StoreOverlay V0")?;
match v0 {
StoreOverlayV0::PublicStore(k) => writeln!(f, "PublicStore: {}", k),
StoreOverlayV0::ProtectedStore(k) => writeln!(f, "ProtectedStore: {}", k),
StoreOverlayV0::Group(k) => writeln!(f, "Group: {}", k),
StoreOverlayV0::Dialog(k) => writeln!(f, "Dialog: {}", k),
}
}
Self::Own(b) => writeln!(f, "Own: {}", b),
}
}
}
/// List of Store Root Repo types
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum StoreRepoV0 {
@ -441,6 +508,14 @@ impl StoreRepo {
},
}
}
pub fn dummy_public_v0() -> (Self, SymKey) {
let readcap = SymKey::dummy();
let store_pubkey = PubKey::nil();
(
StoreRepo::V0(StoreRepoV0::PublicStore(store_pubkey)),
readcap,
)
}
}
/// Site type
@ -560,172 +635,6 @@ pub enum CommitHeader {
V0(CommitHeaderV0),
}
impl CommitHeader {
pub fn is_root(&self) -> bool {
match self {
CommitHeader::V0(v0) => v0.is_root(),
}
}
pub fn deps(&self) -> Vec<ObjectId> {
match self {
CommitHeader::V0(v0) => v0.deps.clone(),
}
}
pub fn acks(&self) -> Vec<ObjectId> {
match self {
CommitHeader::V0(v0) => v0.acks.clone(),
}
}
pub fn id(&self) -> &Option<ObjectId> {
match self {
CommitHeader::V0(v0) => &v0.id,
}
}
pub fn set_id(&mut self, id: Digest) {
match self {
CommitHeader::V0(v0) => v0.id = Some(id),
}
}
pub fn set_compact(&mut self) {
match self {
CommitHeader::V0(v0) => v0.set_compact(),
}
}
pub fn new_with(
deps: Vec<ObjectRef>,
ndeps: Vec<ObjectRef>,
acks: Vec<ObjectRef>,
nacks: Vec<ObjectRef>,
refs: Vec<ObjectRef>,
nrefs: Vec<ObjectRef>,
) -> (Option<Self>, Option<CommitHeaderKeys>) {
let res = CommitHeaderV0::new_with(deps, ndeps, acks, nacks, refs, nrefs);
(
res.0.map(|h| CommitHeader::V0(h)),
res.1.map(|h| CommitHeaderKeys::V0(h)),
)
}
pub fn new_with_deps(deps: Vec<ObjectId>) -> Option<Self> {
CommitHeaderV0::new_with_deps(deps).map(|ch| CommitHeader::V0(ch))
}
pub fn new_with_deps_and_acks(deps: Vec<ObjectId>, acks: Vec<ObjectId>) -> Option<Self> {
CommitHeaderV0::new_with_deps_and_acks(deps, acks).map(|ch| CommitHeader::V0(ch))
}
}
impl CommitHeaderV0 {
fn new_empty() -> Self {
Self {
id: None,
compact: false,
deps: vec![],
ndeps: vec![],
acks: vec![],
nacks: vec![],
refs: vec![],
nrefs: vec![],
}
}
pub fn set_compact(&mut self) {
self.compact = true;
}
pub fn new_with(
deps: Vec<ObjectRef>,
ndeps: Vec<ObjectRef>,
acks: Vec<ObjectRef>,
nacks: Vec<ObjectRef>,
refs: Vec<ObjectRef>,
nrefs: Vec<ObjectRef>,
) -> (Option<Self>, Option<CommitHeaderKeysV0>) {
if deps.is_empty()
&& ndeps.is_empty()
&& acks.is_empty()
&& nacks.is_empty()
&& refs.is_empty()
&& nrefs.is_empty()
{
(None, None)
} else {
let mut ideps: Vec<ObjectId> = vec![];
let mut indeps: Vec<ObjectId> = vec![];
let mut iacks: Vec<ObjectId> = vec![];
let mut inacks: Vec<ObjectId> = vec![];
let mut irefs: Vec<ObjectId> = vec![];
let mut inrefs: Vec<ObjectId> = vec![];
let mut kdeps: Vec<ObjectKey> = vec![];
let mut kacks: Vec<ObjectKey> = vec![];
let mut knacks: Vec<ObjectKey> = vec![];
for d in deps {
ideps.push(d.id);
kdeps.push(d.key);
}
for d in ndeps {
indeps.push(d.id);
}
for d in acks {
iacks.push(d.id);
kacks.push(d.key);
}
for d in nacks {
inacks.push(d.id);
knacks.push(d.key);
}
for d in refs.clone() {
irefs.push(d.id);
}
for d in nrefs {
inrefs.push(d.id);
}
(
Some(Self {
id: None,
compact: false,
deps: ideps,
ndeps: indeps,
acks: iacks,
nacks: inacks,
refs: irefs,
nrefs: inrefs,
}),
Some(CommitHeaderKeysV0 {
deps: kdeps,
acks: kacks,
nacks: knacks,
refs,
}),
)
}
}
pub fn new_with_deps(deps: Vec<ObjectId>) -> Option<Self> {
assert!(!deps.is_empty());
let mut n = Self::new_empty();
n.deps = deps;
Some(n)
}
pub fn new_with_deps_and_acks(deps: Vec<ObjectId>, acks: Vec<ObjectId>) -> Option<Self> {
assert!(!deps.is_empty() || !acks.is_empty());
let mut n = Self::new_empty();
n.deps = deps;
n.acks = acks;
Some(n)
}
pub fn is_root(&self) -> bool {
//self.deps.is_empty()
// && self.ndeps.is_empty()
self.acks.is_empty() && self.nacks.is_empty()
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CommitHeaderKeysV0 {
/// Other objects this commit strongly depends on (ex: ADD for a REMOVE, refs for an nrefs)
@ -758,6 +667,7 @@ pub enum CommitHeaderObject {
None,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CommitHeaderRef {
pub obj: CommitHeaderObject,
pub key: ObjectKey,
@ -770,6 +680,18 @@ impl CommitHeaderRef {
key,
}
}
pub fn from_content_key(content: Vec<u8>, key: ObjectKey) -> Self {
CommitHeaderRef {
obj: CommitHeaderObject::EncryptedContent(content),
key,
}
}
pub fn encrypted_content_len(&self) -> usize {
match &self.obj {
CommitHeaderObject::EncryptedContent(ec) => ec.len(),
_ => 0,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
@ -780,10 +702,12 @@ pub struct BlockContentV0 {
/// It is an easy way to know if the Block is a commit (but be careful because some root commits can be without a header).
pub commit_header: CommitHeaderObject,
/// Block IDs for child nodes in the Merkle tree, can be empty if ObjectContent fits in one block
/// Block IDs for child nodes in the Merkle tree,
/// is empty if ObjectContent fits in one block or this block is a leaf. in both cases, encrypted_content is then not empty
pub children: Vec<BlockId>,
/// Encrypted ChunkContentV0 (entirety or chunks of ObjectContentV0)
/// contains encrypted ChunkContentV0 (entirety, when fitting, or chunks of ObjectContentV0, in DataChunk) used for leafs of the Merkle tree,
/// or to store the keys of children (in InternalNode)
///
/// Encrypted using convergent encryption with ChaCha20:
/// - convergence_key: BLAKE3 derive_key ("NextGraph Data BLAKE3 key",
@ -862,8 +786,8 @@ pub struct RepositoryV0 {
pub creator: Option<UserId>,
// TODO: discrete doc type
// TODO: order (store, partial order, total order, partial sign all commits, fsm, smart contract )
// TODO: immutable conditions (allow_change_quorum, min_quorum, allow_inherit_perms, etc...)
// TODO: order (store, partial order, partial sign all commits,(conflict resolution strategy), total order, fsm, smart contract )
// TODO: immutable conditions (allow_change_owners, allow_change_quorum, min_quorum, allow_inherit_perms, signers_can_be_editors, all_editors_are_signers, etc...)
/// Immutable App-specific metadata
#[serde(with = "serde_bytes")]
pub metadata: Vec<u8>,
@ -875,6 +799,23 @@ pub enum Repository {
V0(RepositoryV0),
}
impl fmt::Display for Repository {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::V0(v0) => {
writeln!(f, "V0")?;
writeln!(f, "repo_id: {}", v0.id)?;
writeln!(
f,
"creator: {}",
v0.creator.map_or("None".to_string(), |c| format!("{}", c))
)?;
Ok(())
}
}
}
}
/// Root Branch definition V0
///
/// Second commit in the root branch, signed by repository key
@ -947,6 +888,42 @@ impl RootBranch {
}
}
impl fmt::Display for RootBranch {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::V0(v0) => {
writeln!(f, "V0")?;
writeln!(f, "repo_id: {}", v0.id)?;
writeln!(f, "repo_ref: {}", v0.repo)?;
write!(f, "store: {}", v0.store)?;
writeln!(
f,
"store_sig: {}",
v0.store_sig
.map_or("None".to_string(), |c| format!("{}", c))
)?;
writeln!(f, "topic: {}", v0.repo)?;
writeln!(
f,
"inherit_perms: {}",
v0.inherit_perms_users_and_quorum_from_store
.as_ref()
.map_or("None".to_string(), |c| format!("{}", c))
)?;
writeln!(
f,
"quorum: {}",
v0.quorum
.as_ref()
.map_or("None".to_string(), |c| format!("{}", c))
)?;
writeln!(f, "reconciliation_interval: {}", v0.reconciliation_interval)?;
Ok(())
}
}
}
}
/// Quorum definition V0
///
/// Changed when the signers need to be updated. Signers are not necessarily editors of the repo, and they do not need to be members either, as they will be notified of RefreshReadCaps anyway.

@ -1,5 +1,5 @@
// #[cfg(not(target_arch = "wasm32"))]
// pub mod repo_store;
#[cfg(not(target_arch = "wasm32"))]
pub mod repo_store;
#[cfg(not(target_arch = "wasm32"))]
pub mod kcv_store;

@ -17,7 +17,7 @@ use std::sync::{Arc, RwLock};
use serde::{Deserialize, Serialize};
use serde_bare::error::Error;
/*
#[derive(Debug)]
pub struct LmdbRepoStore {
/// the main store where all the repo blocks are stored
@ -483,112 +483,111 @@ impl LmdbRepoStore {
}
}
}
*/
#[cfg(test)]
mod test {
use crate::repo_store::LmdbRepoStore;
use p2p_repo::log::*;
use p2p_repo::store::*;
use p2p_repo::types::*;
use p2p_repo::utils::*;
use rkv::backend::{BackendInfo, BackendStat, Lmdb, LmdbEnvironment};
use rkv::{Manager, Rkv, StoreOptions, Value};
#[allow(unused_imports)]
use std::time::Duration;
#[allow(unused_imports)]
use std::{fs, thread};
use tempfile::Builder;
/*
#[test]
pub fn test_remove_least_used() {
let path_str = "test-env";
let root = Builder::new().prefix(path_str).tempdir().unwrap();
let key: [u8; 32] = [0; 32];
fs::create_dir_all(root.path()).unwrap();
log_debug!("{}", root.path().to_str().unwrap());
let mut store = LmdbRepoStore::open(root.path(), key).unwrap();
let mut now = now_timestamp();
now -= 200;
// TODO: fix the LMDB bug that is triggered with x max set to 86 !!!
for x in 1..85 {
let block = Block::new(
Vec::new(),
ObjectDeps::ObjectIdList(Vec::new()),
None,
vec![x; 10],
None,
);
let block_id = store.put(&block).unwrap();
log_debug!("#{} -> objId {:?}", x, block_id);
store
.has_been_synced(&block_id, Some(now + x as u32))
.unwrap();
}
#[test]
pub fn test_remove_least_used() {
let path_str = "test-env";
let root = Builder::new().prefix(path_str).tempdir().unwrap();
let key: [u8; 32] = [0; 32];
fs::create_dir_all(root.path()).unwrap();
log_debug!("{}", root.path().to_str().unwrap());
let mut store = LmdbRepoStore::open(root.path(), key).unwrap();
let mut now = now_timestamp();
now -= 200;
// TODO: fix the LMDB bug that is triggered with x max set to 86 !!!
for x in 1..85 {
let block = Block::new(
Vec::new(),
ObjectDeps::ObjectIdList(Vec::new()),
None,
vec![x; 10],
None,
);
let block_id = store.put(&block).unwrap();
log_debug!("#{} -> objId {:?}", x, block_id);
store
.has_been_synced(&block_id, Some(now + x as u32))
.unwrap();
}
let ret = store.remove_least_used(200);
log_debug!("removed {}", ret);
assert_eq!(ret, 208)
//store.list_all();
}
let ret = store.remove_least_used(200);
log_debug!("removed {}", ret);
assert_eq!(ret, 208)
#[test]
pub fn test_set_pin() {
let path_str = "test-env";
let root = Builder::new().prefix(path_str).tempdir().unwrap();
let key: [u8; 32] = [0; 32];
fs::create_dir_all(root.path()).unwrap();
log_debug!("{}", root.path().to_str().unwrap());
let mut store = LmdbRepoStore::open(root.path(), key).unwrap();
let mut now = now_timestamp();
now -= 200;
// TODO: fix the LMDB bug that is triggered with x max set to 86 !!!
for x in 1..100 {
let block = Block::new(
Vec::new(),
ObjectDeps::ObjectIdList(Vec::new()),
None,
vec![x; 10],
None,
);
let obj_id = store.put(&block).unwrap();
log_debug!("#{} -> objId {:?}", x, obj_id);
store.set_pin(&obj_id, true).unwrap();
store
.has_been_synced(&obj_id, Some(now + x as u32))
.unwrap();
//store.list_all();
}
let ret = store.remove_least_used(200);
log_debug!("removed {}", ret);
assert_eq!(ret, 0);
#[test]
pub fn test_set_pin() {
let path_str = "test-env";
let root = Builder::new().prefix(path_str).tempdir().unwrap();
let key: [u8; 32] = [0; 32];
fs::create_dir_all(root.path()).unwrap();
log_debug!("{}", root.path().to_str().unwrap());
let mut store = LmdbRepoStore::open(root.path(), key).unwrap();
let mut now = now_timestamp();
now -= 200;
// TODO: fix the LMDB bug that is triggered with x max set to 86 !!!
for x in 1..100 {
let block = Block::new(
Vec::new(),
ObjectDeps::ObjectIdList(Vec::new()),
None,
vec![x; 10],
None,
);
let obj_id = store.put(&block).unwrap();
log_debug!("#{} -> objId {:?}", x, obj_id);
store.set_pin(&obj_id, true).unwrap();
store
.has_been_synced(&obj_id, Some(now + x as u32))
.unwrap();
}
store.list_all();
}
let ret = store.remove_least_used(200);
log_debug!("removed {}", ret);
assert_eq!(ret, 0);
store.list_all();
}
*/
#[test]
pub fn test_get_valid_value_size() {
assert_eq!(store_valid_value_size(0), 4072);
assert_eq!(store_valid_value_size(2), 4072);
assert_eq!(store_valid_value_size(4072), 4072);
assert_eq!(store_valid_value_size(4072 + 1), 4072 + 4096);
assert_eq!(store_valid_value_size(4072 + 4096), 4072 + 4096);
assert_eq!(store_valid_value_size(4072 + 4096 + 1), 4072 + 4096 + 4096);
assert_eq!(store_valid_value_size(0), 4096);
assert_eq!(store_valid_value_size(2), 4096);
assert_eq!(store_valid_value_size(4096 - 1), 4096);
assert_eq!(store_valid_value_size(4096), 4096);
assert_eq!(store_valid_value_size(4096 + 1), 4096 + 4096);
assert_eq!(store_valid_value_size(4096 + 4096), 4096 + 4096);
assert_eq!(store_valid_value_size(4096 + 4096 + 1), 4096 + 4096 + 4096);
assert_eq!(
store_valid_value_size(4072 + 4096 + 4096),
4072 + 4096 + 4096
store_valid_value_size(4096 + 4096 + 4096),
4096 + 4096 + 4096
);
assert_eq!(
store_valid_value_size(4072 + 4096 + 4096 + 1),
4072 + 4096 + 4096 + 4096
store_valid_value_size(4096 + 4096 + 4096 + 1),
4096 + 4096 + 4096 + 4096
);
assert_eq!(store_valid_value_size(4072 + 4096 * 511), 4072 + 4096 * 511);
assert_eq!(store_valid_value_size(4096 + 4096 * 255), 4096 + 4096 * 255);
assert_eq!(
store_valid_value_size(4072 + 4096 * 511 + 1),
4072 + 4096 * 511
store_valid_value_size(4096 + 4096 * 255 + 1),
4096 + 4096 * 255
);
}
/*
#[test]
pub fn test_remove_expired() {
let path_str = "test-env";
@ -740,7 +739,7 @@ mod test {
}
#[test]
pub fn test_lmdb() {
pub fn test_rocksdb() {
let path_str = "test-env";
let root = Builder::new().prefix(path_str).tempdir().unwrap();
@ -989,4 +988,5 @@ mod test {
// uncomment this if you need time to copy them somewhere for analysis, before the temp folder get destroyed
//thread::sleep(Duration::from_millis(20000));
}
*/
}

Loading…
Cancel
Save