diff --git a/ng-net/src/actors/ext/get.rs b/ng-net/src/actors/ext/get.rs new file mode 100644 index 0000000..459bbd4 --- /dev/null +++ b/ng-net/src/actors/ext/get.rs @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers + * All rights reserved. + * Licensed under the Apache License, Version 2.0 + * + * or the MIT license , + * at your option. All files in the project carrying such + * notice may not be copied, modified, or distributed except + * according to those terms. +*/ + +use std::sync::Arc; + +use async_std::sync::Mutex; + +use ng_repo::errors::*; +use ng_repo::log::*; +use ng_repo::object::Object; +use ng_repo::store::Store; +use ng_repo::types::Block; + +use super::super::StartProtocol; + +use crate::broker::BROKER; +use crate::connection::NoiseFSM; +use crate::types::*; +use crate::{actor::*, types::ProtocolMessage}; + +impl ExtObjectGetV0 { + pub fn get_actor(&self) -> Box { + Actor::>::new_responder(0) + } +} + +impl TryFrom for ExtObjectGetV0 { + type Error = ProtocolError; + fn try_from(msg: ProtocolMessage) -> Result { + if let ProtocolMessage::Start(StartProtocol::Ext(ExtRequest::V0(ExtRequestV0 { + content: ExtRequestContentV0::ExtObjectGet(a), + .. + }))) = msg + { + Ok(a) + } else { + log_debug!("INVALID {:?}", msg); + Err(ProtocolError::InvalidValue) + } + } +} + +impl From for ProtocolMessage { + fn from(_msg: ExtObjectGetV0) -> ProtocolMessage { + unimplemented!(); + } +} + +impl From for ExtRequestContentV0 { + fn from(msg: ExtObjectGetV0) -> ExtRequestContentV0 { + ExtRequestContentV0::ExtObjectGet(msg) + } +} + +impl TryFrom for Vec { + type Error = ProtocolError; + fn try_from(msg: ProtocolMessage) -> Result, Self::Error> { + let content: ExtResponseContentV0 = msg.try_into()?; + if let ExtResponseContentV0::Blocks(res) = content { + Ok(res) + } else { + Err(ProtocolError::InvalidValue) + } + } +} + +impl Actor<'_, ExtObjectGetV0, Vec> {} + +#[async_trait::async_trait] +impl EActor for Actor<'_, ExtObjectGetV0, Vec> { + async fn respond( + &mut self, + msg: ProtocolMessage, + fsm: Arc>, + ) -> Result<(), ProtocolError> { + let req = ExtObjectGetV0::try_from(msg)?; + let sb = { + let broker = BROKER.read().await; + broker.get_server_broker()? + }; + let lock = sb.read().await; + let store = Store::new_from_overlay_id(&req.overlay, lock.get_block_storage()); + let mut blocks = Vec::new(); + for obj_id in req.ids { + // TODO: deal with RandomAccessFiles (or is it just working?) + if let Ok(obj) = Object::load_without_header(obj_id, None, &store) { + blocks.append(&mut obj.into_blocks()); + //TODO: load the obj.files too (if req.include_files) + } + } + let response: ExtResponseV0 = Ok(ExtResponseContentV0::Blocks(blocks)).into(); + fsm.lock().await.send(response.into()).await?; + Ok(()) + } +} diff --git a/ng-net/src/actors/ext/mod.rs b/ng-net/src/actors/ext/mod.rs index 86e5bfa..8f6fcb3 100644 --- a/ng-net/src/actors/ext/mod.rs +++ b/ng-net/src/actors/ext/mod.rs @@ -1 +1,3 @@ pub mod wallet_get_export; + +pub mod get; diff --git a/ng-net/src/app_protocol.rs b/ng-net/src/app_protocol.rs index c7e10fc..89ca27d 100644 --- a/ng-net/src/app_protocol.rs +++ b/ng-net/src/app_protocol.rs @@ -39,6 +39,13 @@ lazy_static! { #[doc(hidden)] static ref RE_NAMED_BRANCH_OR_COMMIT: Regex = Regex::new(r"^did:ng:o:([A-Za-z0-9-_]*):v:([A-Za-z0-9-_]*):a:([A-Za-z0-9-_%]*)$").unwrap(); //TODO: allow international chars. disallow digit as first char + #[doc(hidden)] + static ref RE_OBJECTS: Regex = + Regex::new(r"^did:ng(?::o:([A-Za-z0-9-_]{44}))?:v:([A-Za-z0-9-_]{44})((?::c:[A-Za-z0-9-_]{44}:k:[A-Za-z0-9-_]{44})+)(?::s:([A-Za-z0-9-_]{44}):k:([A-Za-z0-9-_]{44}))?:l:([A-Za-z0-9-_]*)$").unwrap(); + #[doc(hidden)] + static ref RE_OBJECT_READ_CAPS: Regex = + Regex::new(r":[cj]:([A-Za-z0-9-_]{44}):k:([A-Za-z0-9-_]{44})").unwrap(); + } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -193,13 +200,15 @@ pub struct NuriV0 { pub target: NuriTargetV0, pub entire_store: bool, // If it is a store, will include all the docs belonging to the store - pub object: Option, // used only for FileGet. // cannot be used for queries. only to download an object (file,commit..) + pub objects: Vec, // used only for FileGet. // cannot be used for queries. only to download an object (file,commit..) + pub signature: Option, + pub branch: Option, // if None, the main branch is chosen pub overlay: Option, pub access: Vec, pub topic: Option, - pub locator: Vec, + pub locator: Option, } impl NuriV0 { @@ -208,12 +217,13 @@ impl NuriV0 { identity: None, target: NuriTargetV0::None, entire_store: false, - object: None, + objects: vec![], + signature: None, branch: None, overlay: None, access: vec![], topic: None, - locator: vec![], + locator: None, } } pub fn copy_target_from(&mut self, nuri: &NuriV0) { @@ -232,12 +242,13 @@ impl NuriV0 { identity: None, target: NuriTargetV0::Repo(store_repo.repo_id().clone()), entire_store: false, - object: None, + objects: vec![], + signature: None, branch: None, overlay: None, access: vec![], topic: None, - locator: vec![], + locator: None, } } @@ -303,7 +314,7 @@ impl NuriV0 { } pub fn is_branch_identifier(&self) -> bool { - self.locator.is_empty() + self.locator.is_none() && self.topic.is_none() && self.access.is_empty() && self.overlay.as_ref().map_or(false, |o| o.is_outer()) @@ -311,13 +322,15 @@ impl NuriV0 { .branch .as_ref() .map_or(true, |b| b.is_valid_for_sparql_update()) - && self.object.is_none() + && self.objects.is_empty() + && self.signature.is_none() && !self.entire_store && self.target.is_repo_id() } pub fn is_valid_for_sparql_update(&self) -> bool { - self.object.is_none() + self.objects.is_empty() + && self.signature.is_none() && self.entire_store == false && self.target.is_valid_for_sparql_update() && self @@ -326,7 +339,8 @@ impl NuriV0 { .map_or(true, |b| b.is_valid_for_sparql_update()) } pub fn is_valid_for_discrete_update(&self) -> bool { - self.object.is_none() + self.objects.is_empty() + && self.signature.is_none() && self.entire_store == false && self.target.is_valid_for_discrete_update() && self @@ -340,12 +354,13 @@ impl NuriV0 { identity: None, target: NuriTargetV0::Repo(repo_id), entire_store: false, - object: None, + objects: vec![], + signature: None, branch: None, overlay: None, access: vec![], topic: None, - locator: vec![], + locator: None, }) } @@ -354,12 +369,13 @@ impl NuriV0 { identity: None, target: NuriTargetV0::None, entire_store: false, - object: Some(obj_ref.id), + objects: vec![obj_ref.clone()], + signature: None, branch: None, overlay: None, - access: vec![NgAccessV0::Key(obj_ref.key.clone())], + access: vec![], topic: None, - locator: vec![], + locator: None, } } @@ -368,12 +384,13 @@ impl NuriV0 { identity: None, target: NuriTargetV0::PrivateStore, entire_store: false, - object: None, + objects: vec![], + signature: None, branch: None, overlay: None, access: vec![], topic: None, - locator: vec![], + locator: None, } } pub fn new_entire_user_site() -> Self { @@ -381,14 +398,71 @@ impl NuriV0 { identity: None, target: NuriTargetV0::UserSite, entire_store: false, - object: None, + objects: vec![], + signature: None, branch: None, overlay: None, access: vec![], topic: None, - locator: vec![], + locator: None, } } + pub fn new_for_readcaps(from: &str) -> Result { + let c = RE_OBJECTS.captures(from); + if let Some(c) = c { + let target = c.get(1).map_or(NuriTargetV0::None, |repo_match| { + if let Ok(id) = decode_key(repo_match.as_str()) { + NuriTargetV0::Repo(id) + } else { + NuriTargetV0::None + } + }); + let overlay_id = decode_overlayid(c.get(2).ok_or(NgError::InvalidNuri)?.as_str())?; + let read_caps = c.get(3).ok_or(NgError::InvalidNuri)?.as_str(); + let sign_obj_id = c.get(4).map(|c| decode_digest(c.as_str())); + let sign_obj_key = c.get(5).map(|c| decode_sym_key(c.as_str())); + let locator = + TryInto::::try_into(c.get(6).ok_or(NgError::InvalidNuri)?.as_str())?; + let signature = if sign_obj_id.is_some() && sign_obj_key.is_some() { + Some(ObjectRef::from_id_key( + sign_obj_id.unwrap()?, + sign_obj_key.unwrap()?, + )) + } else { + None + }; + + let objects = RE_OBJECT_READ_CAPS + .captures_iter(read_caps) + .map(|c| { + Ok(ObjectRef::from_id_key( + decode_digest(c.get(1).ok_or(NgError::InvalidNuri)?.as_str())?, + decode_sym_key(c.get(2).ok_or(NgError::InvalidNuri)?.as_str())?, + )) + }) + .collect::, NgError>>()?; + + if objects.len() < 1 { + return Err(NgError::InvalidNuri); + } + + Ok(Self { + identity: None, + target, + entire_store: false, + objects, + signature, + branch: None, + overlay: Some(overlay_id.into()), + access: vec![], + topic: None, + locator: Some(locator), + }) + } else { + Err(NgError::InvalidNuri) + } + } + pub fn new_from(from: &String) -> Result { let c = RE_REPO_O.captures(from); @@ -401,12 +475,13 @@ impl NuriV0 { identity: None, target: NuriTargetV0::Repo(repo_id), entire_store: false, - object: None, + objects: vec![], + signature: None, branch: None, overlay: None, access: vec![], topic: None, - locator: vec![], + locator: None, }) } else { let c = RE_FILE_READ_CAP.captures(from); @@ -423,12 +498,13 @@ impl NuriV0 { identity: None, target: NuriTargetV0::None, entire_store: false, - object: Some(id), + objects: vec![ObjectRef::from_id_key(id, key)], + signature: None, branch: None, overlay: None, - access: vec![NgAccessV0::Key(key)], + access: vec![], topic: None, - locator: vec![], + locator: None, }) } else { let c = RE_REPO.captures(from); @@ -447,12 +523,13 @@ impl NuriV0 { identity: None, target: NuriTargetV0::Repo(repo_id), entire_store: false, - object: None, + objects: vec![], + signature: None, branch: None, overlay: Some(overlay_id.into()), access: vec![], topic: None, - locator: vec![], + locator: None, }) } else { let c = RE_BRANCH.captures(from); @@ -473,12 +550,13 @@ impl NuriV0 { identity: None, target: NuriTargetV0::Repo(repo_id), entire_store: false, - object: None, + objects: vec![], + signature: None, branch: Some(TargetBranchV0::BranchId(branch_id)), overlay: Some(overlay_id.into()), access: vec![], topic: None, - locator: vec![], + locator: None, }) } else { Err(NgError::InvalidNuri) diff --git a/ng-net/src/connection.rs b/ng-net/src/connection.rs index a7dbb8b..5e4be10 100644 --- a/ng-net/src/connection.rs +++ b/ng-net/src/connection.rs @@ -789,7 +789,6 @@ impl NoiseFSM { let ext_req = ExtRequestV0 { content: ext_config.request.clone(), id: 0, - overlay: None, }; let protocol_start = StartProtocol::Ext(ExtRequest::V0(ext_req)); diff --git a/ng-net/src/types.rs b/ng-net/src/types.rs index 7d86089..2c01f14 100644 --- a/ng-net/src/types.rs +++ b/ng-net/src/types.rs @@ -197,8 +197,21 @@ pub struct BrokerServerV0 { } #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] -pub enum BrokerServer { - V0(BrokerServerV0), +pub struct BrokerServerContentV0 { + pub servers: Vec, + + pub version: u32, +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub struct BrokerServer { + pub content: BrokerServerContentV0, + + /// peerId of the server + pub peer_id: PubKey, + + /// optional signature over content by peer_id + pub sig: Option, } pub type LocatorV0 = Vec; @@ -219,11 +232,64 @@ impl Locator { pub fn empty() -> Self { Self::V0(vec![]) } + pub fn first_broker_server(&self) -> Result { + match self { + Self::V0(v0) => { + let bs = v0.get(0).ok_or(NgError::BrokerNotFound)?; + Ok(BrokerServerV0 { + server_type: bs + .content + .servers + .get(0) + .ok_or(NgError::BrokerNotFound)? + .clone(), + can_verify: false, + can_forward: false, + peer_id: bs.peer_id, + }) + } + } + } + pub fn add(&mut self, bs: BrokerServerV0) { + match self { + Self::V0(v0) => { + for b in v0.iter_mut() { + if b.peer_id == bs.peer_id { + b.content.servers.push(bs.server_type); + return; + } + } + v0.push(BrokerServer { + peer_id: bs.peer_id, + sig: None, + content: BrokerServerContentV0 { + version: 0, + servers: vec![bs.server_type], + }, + }); + } + } + } +} + +impl TryFrom<&str> for Locator { + type Error = NgError; + fn try_from(string: &str) -> Result { + let vec = base64_url::decode(string).map_err(|_| NgError::InvalidKey)?; + Ok(serde_bare::from_slice(&vec).map_err(|_| NgError::InvalidKey)?) + } } impl From for Locator { fn from(bs: BrokerServerV0) -> Self { - Locator::V0(vec![BrokerServer::V0(bs)]) + Locator::V0(vec![BrokerServer { + peer_id: bs.peer_id, + content: BrokerServerContentV0 { + version: 0, + servers: vec![bs.server_type], + }, + sig: None, + }]) } } @@ -1388,6 +1454,18 @@ impl OverlayLink { } } +impl TryFrom for OverlayId { + type Error = NgError; + fn try_from(link: OverlayLink) -> Result { + Ok(match link { + OverlayLink::Inner(Digest::Blake3Digest32(i)) => OverlayId::Inner(i), + OverlayLink::Outer(Digest::Blake3Digest32(i)) => OverlayId::Outer(i), + OverlayLink::Global => OverlayId::Global, + _ => return Err(NgError::InvalidArgument), + }) + } +} + impl From for OverlayLink { fn from(id: OverlayId) -> Self { match id { @@ -3962,29 +4040,23 @@ impl ClientMessage { // EXTERNAL REQUESTS // -/// Request object(s) by ID from a repository by non-members -/// -/// The request is sent by a non-member to an overlay member node, -/// which has a replica of the repository. +/// Request object(s) by ID by non-members to a broker /// /// The response includes the requested objects and all their children recursively, -/// and optionally all object dependencies recursively. +/// and optionally all file object dependencies and their children recursively. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ExtObjectGetV0 { - /// Repository to request the objects from - pub repo: PubKey, + /// outer overlayId + pub overlay: OverlayId, /// List of Object IDs to request, including their children pub ids: Vec, - /// Whether or not to include all children recursively - pub include_children: bool, - - /// Expiry time after which the link becomes invalid - pub expiry: Option, + /// Whether or not to include all files objects + pub include_files: bool, } -/// Request object(s) by ID from a repository by non-members +/// Request object(s) by ID by non-members #[derive(Clone, Debug, Serialize, Deserialize)] pub enum ExtObjectGet { V0(ExtObjectGetV0), @@ -4003,7 +4075,7 @@ pub type ExtTopicSyncReq = TopicSyncReq; #[derive(Clone, Debug, Serialize, Deserialize)] pub enum ExtRequestContentV0 { WalletGetExport(ExtWalletGetExportV0), - ExtObjectGet(ExtObjectGet), + ExtObjectGet(ExtObjectGetV0), ExtTopicSyncReq(ExtTopicSyncReq), // TODO inbox requests // TODO subreq ? @@ -4013,9 +4085,8 @@ impl ExtRequestContentV0 { pub fn get_actor(&self) -> Box { match self { Self::WalletGetExport(a) => a.get_actor(), - _ => unimplemented!() - // Self::ExtObjectGet(a) => a.get_actor(), - // Self::ExtTopicSyncReq(a) => a.get_actor(), + Self::ExtObjectGet(a) => a.get_actor(), + _ => unimplemented!(), // Self::ExtTopicSyncReq(a) => a.get_actor(), } } } @@ -4023,9 +4094,6 @@ impl ExtRequestContentV0 { /// External request with its request ID #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ExtRequestV0 { - /// outer overlayId - pub overlay: Option, - /// Request ID pub id: i64, @@ -4070,6 +4138,7 @@ pub struct ExportedWallet(pub serde_bytes::ByteBuf); pub enum ExtResponseContentV0 { EmptyResponse, Block(Block), + Blocks(Vec), Wallet(ExportedWallet), // TODO inbox related replies // TODO event ? diff --git a/ng-repo/src/block.rs b/ng-repo/src/block.rs index d730c36..2dc68d4 100644 --- a/ng-repo/src/block.rs +++ b/ng-repo/src/block.rs @@ -193,6 +193,12 @@ impl Block { } } + pub fn destroy_header(&mut self) { + match self { + Block::V0(b) => b.commit_header_key = None, + } + } + /// Get the header reference pub fn header_ref(&self) -> Option { match self { diff --git a/ng-repo/src/commit.rs b/ng-repo/src/commit.rs index eaac55d..2bcb85f 100644 --- a/ng-repo/src/commit.rs +++ b/ng-repo/src/commit.rs @@ -492,6 +492,10 @@ impl Commit { } } + pub fn body_ref(&self) -> &ObjectRef { + &self.content_v0().body + } + /// Get commit content V0 pub fn content_v0(&self) -> &CommitContentV0 { match self { diff --git a/ng-repo/src/object.rs b/ng-repo/src/object.rs index 093ed88..b411480 100644 --- a/ng-repo/src/object.rs +++ b/ng-repo/src/object.rs @@ -489,6 +489,23 @@ impl Object { id: ObjectId, key: Option, store: &Store, + ) -> Result { + Self::load_(id, key, store, true) + } + + pub fn load_without_header( + id: ObjectId, + key: Option, + store: &Store, + ) -> Result { + Self::load_(id, key, store, false) + } + + fn load_( + id: ObjectId, + key: Option, + store: &Store, + with_header: bool, ) -> Result { fn load_tree( parents: Vec, @@ -540,22 +557,27 @@ impl Object { root.set_key(key); } - let header = match Self::load_header_(root, store) { - Err(ObjectParseError::MissingBlocks(m)) => { - return Err(ObjectParseError::MissingHeaderBlocks(( - Object { - blocks, - block_contents, - header: None, - header_blocks: vec![], - #[cfg(test)] - already_saved: false, - }, - m, - ))); + let header = if with_header { + match Self::load_header_(root, store) { + Err(ObjectParseError::MissingBlocks(m)) => { + return Err(ObjectParseError::MissingHeaderBlocks(( + Object { + blocks, + block_contents, + header: None, + header_blocks: vec![], + #[cfg(test)] + already_saved: false, + }, + m, + ))); + } + Err(e) => return Err(e), + Ok(h) => h, } - Err(e) => return Err(e), - Ok(h) => h, + } else { + root.destroy_header(); + (None, vec![]) }; Ok(Object { diff --git a/ng-repo/src/store.rs b/ng-repo/src/store.rs index 5f37969..ab4663b 100644 --- a/ng-repo/src/store.rs +++ b/ng-repo/src/store.rs @@ -324,6 +324,8 @@ impl Store { &self, )?; + let repository_commit_body_ref = repository_commit.body_ref().clone(); + //log_debug!("REPOSITORY COMMIT {}", repository_commit); let repository_commit_ref = repository_commit.reference().unwrap(); @@ -492,7 +494,7 @@ impl Store { // creating root certificate of the repo let cert_content = CertificateContentV0 { - previous: repository_commit_ref, + previous: repository_commit_body_ref, readcap_id: root_branch_readcap_id, owners_pk_set: pk_set.public_key(), orders_pk_sets: OrdersPublicKeySetsV0::None, diff --git a/ng-repo/src/types.rs b/ng-repo/src/types.rs index 4d64126..de6793f 100644 --- a/ng-repo/src/types.rs +++ b/ng-repo/src/types.rs @@ -27,7 +27,7 @@ use crate::errors::NgError; use crate::utils::{ decode_key, decode_priv_key, dh_pubkey_array_from_ed_pubkey_slice, dh_pubkey_from_ed_pubkey_slice, ed_privkey_to_ed_pubkey, from_ed_privkey_to_dh_privkey, - random_key, + random_key, verify, }; // @@ -2345,6 +2345,14 @@ pub enum SignatureContent { V0(SignatureContentV0), } +impl SignatureContent { + pub fn commits(&self) -> &[ObjectId] { + match self { + Self::V0(v0) => &v0.commits, + } + } +} + impl fmt::Display for SignatureContent { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { @@ -2398,6 +2406,21 @@ pub struct SignatureV0 { pub certificate_ref: ObjectRef, } +impl SignatureV0 { + pub fn verify(&self, cert: &CertificateV0) -> Result<(), NgError> { + let ser = serde_bare::to_vec(&self.content).unwrap(); + match &self.threshold_sig { + ThresholdSignatureV0::Owners(sig) => { + if !cert.get_owners_pub_key().verify(sig, &ser) { + return Err(NgError::InvalidSignature); + } + return Ok(()); + } + _ => unimplemented!(), + } + } +} + impl fmt::Display for Signature { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { @@ -2454,7 +2477,7 @@ pub enum OrdersPublicKeySetsV0 { /// A Certificate content, that will be signed by the previous certificate signers. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct CertificateContentV0 { - /// the previous certificate in the chain of trust. Can be another Certificate or the Repository commit when we are at the root of the chain of trust. + /// the previous certificate in the chain of trust. Can be another Certificate or the Repository commit's body when we are at the root of the chain of trust. pub previous: ObjectRef, /// The Commit Id of the latest RootBranch definition (= the ReadCap ID) in order to keep in sync with the options for signing. @@ -2497,6 +2520,19 @@ pub struct CertificateV0 { pub sig: CertificateSignatureV0, } +impl CertificateV0 { + pub fn verify_with_repo_id(&self, repo_id: &RepoId) -> Result<(), NgError> { + let ser = serde_bare::to_vec(&self.content).unwrap(); + match self.sig { + CertificateSignatureV0::Repo(sig) => verify(&ser, sig, repo_id.clone()), + _ => Err(NgError::InvalidArgument), + } + } + pub fn get_owners_pub_key(&self) -> &threshold_crypto::PublicKey { + &self.content.owners_pk_set + } +} + /// A certificate object #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub enum Certificate { diff --git a/ng-verifier/src/request_processor.rs b/ng-verifier/src/request_processor.rs index 49150d6..e85da60 100644 --- a/ng-verifier/src/request_processor.rs +++ b/ng-verifier/src/request_processor.rs @@ -55,71 +55,66 @@ impl Verifier { _ => unimplemented!(), }, AppRequestCommandV0::FileGet => { - if nuri.access.len() < 1 || nuri.object.is_none() { + if nuri.objects.len() < 1 { return Err(NgError::InvalidArgument); } let (repo_id, _, store_repo) = self.resolve_target(&nuri.target)?; - let access = nuri.access.get(0).unwrap(); - if let NgAccessV0::Key(key) = access { - let repo = self.get_repo(&repo_id, &store_repo)?; - let obj_id = nuri.object.unwrap(); - if let Some(mut stream) = self - .fetch_blocks_if_needed(&obj_id, &repo_id, &store_repo) - .await? - { - // TODO: start opening the file and running the sending_loop after we received 10 (3 mandatory and 7 depths max) blocks. - // for files below 10MB we wont see a difference, but for big files, we can start sending out some AppResponse earlier. - while let Some(block) = stream.next().await { - repo.store.put(&block)?; - } + let obj = nuri.objects.get(0).unwrap(); + let repo = self.get_repo(&repo_id, &store_repo)?; + if let Some(mut stream) = self + .fetch_blocks_if_needed(&obj.id, &repo_id, &store_repo) + .await? + { + // TODO: start opening the file and running the sending_loop after we received 10 (3 mandatory and 7 depths max) blocks. + // for files below 10MB we wont see a difference, but for big files, we can start sending out some AppResponse earlier. + while let Some(block) = stream.next().await { + repo.store.put(&block)?; } - let file = - RandomAccessFile::open(obj_id, key.clone(), Arc::clone(&repo.store))?; - - let (mut tx, rx) = mpsc::unbounded::(); - tx.send(AppResponse::V0(AppResponseV0::FileMeta(FileMetaV0 { - content_type: file.meta().content_type().clone(), - size: file.meta().total_size(), - }))) - .await - .map_err(|_| NgError::InternalError)?; - - async fn sending_loop( - file: Arc, - mut tx: Sender, - ) -> ResultSend<()> { - let mut pos = 0; - loop { - let res = file.read(pos, 1048564); - - if res.is_err() { - //log_info!("ERR={:?}", res.unwrap_err()); - let _ = tx.send(AppResponse::V0(AppResponseV0::EndOfStream)).await; - tx.close_channel(); - break; - } - let res = res.unwrap(); - //log_info!("reading={} {}", pos, res.len()); - pos += res.len(); - if let Err(_) = tx - .send(AppResponse::V0(AppResponseV0::FileBinary(res))) - .await - { - break; - } + } + let file = + RandomAccessFile::open(obj.id, obj.key.clone(), Arc::clone(&repo.store))?; + + let (mut tx, rx) = mpsc::unbounded::(); + tx.send(AppResponse::V0(AppResponseV0::FileMeta(FileMetaV0 { + content_type: file.meta().content_type().clone(), + size: file.meta().total_size(), + }))) + .await + .map_err(|_| NgError::InternalError)?; + + async fn sending_loop( + file: Arc, + mut tx: Sender, + ) -> ResultSend<()> { + let mut pos = 0; + loop { + let res = file.read(pos, 1048564); + + if res.is_err() { + //log_info!("ERR={:?}", res.unwrap_err()); + let _ = tx.send(AppResponse::V0(AppResponseV0::EndOfStream)).await; + tx.close_channel(); + break; + } + let res = res.unwrap(); + //log_info!("reading={} {}", pos, res.len()); + pos += res.len(); + if let Err(_) = tx + .send(AppResponse::V0(AppResponseV0::FileBinary(res))) + .await + { + break; } - Ok(()) } - - spawn_and_log_error(sending_loop(Arc::new(file), tx.clone())); - let fnonce = Box::new(move || { - //log_debug!("FileGet cancelled"); - tx.close_channel(); - }); - Ok((rx, fnonce)) - } else { - return Err(NgError::InvalidArgument); + Ok(()) } + + spawn_and_log_error(sending_loop(Arc::new(file), tx.clone())); + let fnonce = Box::new(move || { + //log_debug!("FileGet cancelled"); + tx.close_channel(); + }); + Ok((rx, fnonce)) } _ => unimplemented!(), } diff --git a/ng-wallet/src/types.rs b/ng-wallet/src/types.rs index fc3fccf..fb389fe 100644 --- a/ng-wallet/src/types.rs +++ b/ng-wallet/src/types.rs @@ -1134,14 +1134,12 @@ impl BrokerInfoV0 { } } pub fn vec_into_locator(list: Vec) -> Locator { - Locator::V0( - list.into_iter() - .filter_map(|info| match info { - Self::CoreV0(_) => None, - Self::ServerV0(bs) => Some(BrokerServer::V0(bs)), - }) - .collect(), - ) + let mut loc = Locator::empty(); + list.into_iter().for_each(|info| match info { + Self::CoreV0(_) => {} + Self::ServerV0(bs) => loc.add(bs), + }); + loc } } diff --git a/ngcli/src/get.rs b/ngcli/src/get.rs new file mode 100644 index 0000000..84d46b8 --- /dev/null +++ b/ngcli/src/get.rs @@ -0,0 +1,8 @@ +// Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers +// All rights reserved. +// Licensed under the Apache License, Version 2.0 +// +// or the MIT license , +// at your option. All files in the project carrying such +// notice may not be copied, modified, or distributed except +// according to those terms. diff --git a/ngcli/src/main.rs b/ngcli/src/main.rs index 1ce7c31..815d6f4 100644 --- a/ngcli/src/main.rs +++ b/ngcli/src/main.rs @@ -10,6 +10,7 @@ #![doc(hidden)] use core::fmt; +use std::collections::HashSet; use std::error::Error; use std::fs::{read_to_string, write}; use std::net::IpAddr; @@ -24,15 +25,20 @@ use zeroize::Zeroize; use ng_repo::errors::*; use ng_repo::log::*; +use ng_repo::object::Object; +use ng_repo::store::Store; use ng_repo::types::*; use ng_repo::utils::{decode_priv_key, display_timestamp, generate_keypair, timestamp_after}; use ng_net::actors::admin::*; -use ng_net::broker::BROKER; +use ng_net::app_protocol::{NuriTargetV0, NuriV0}; +use ng_net::broker::{Broker, BROKER}; use ng_net::types::*; use ng_client_ws::remote_ws::ConnectionWebSocket; +mod get; + /// CliConfig Version 0 #[derive(Clone, Debug, Serialize, Deserialize)] pub struct CliConfigV0 { @@ -227,7 +233,11 @@ async fn main_inner() -> Result<(), NgcliError> { .subcommand( Command::new("gen-key") .about("Generates a new key pair () public key and private key ) to be used by example for user authentication.") - ) + ).subcommand( + Command::new("get") + .about("fetches one or several commits, or a binary object, with an optional signature, from a broker, using the Ext Protocol, and connecting on the Outer Overlay. The request is anonymous and doesn't need any authentication") + .arg(arg!([NURI] "NextGraph URI of the commit(s) or object, containing the ReadCap in the form :c:k :j:k and optionally :s:k and the usual :o:v:l").required(true)) + ) .get_matches(); if std::env::var("RUST_LOG").is_err() { @@ -329,6 +339,183 @@ async fn main_inner() -> Result<(), NgcliError> { None::<()> }); + match matches.subcommand() { + Some(("get", sub_matches)) => { + log_debug!("processing get command"); + + let nuri = NuriV0::new_for_readcaps(sub_matches.get_one::("NURI").unwrap())?; + let overlay_id = nuri.overlay.unwrap().try_into()?; + + let peer_privk = PrivKey::Ed25519PrivKey(keys[1]); + let peer_pubk = peer_privk.to_pub(); + + let broker_server = nuri.locator.unwrap().first_broker_server()?; + + let mut ids = nuri.objects.iter().map(|o| o.id).collect::>(); + let in_nuri: HashSet = HashSet::from_iter(ids.iter().cloned()); + nuri.signature.as_ref().map(|s| ids.push(s.id.clone())); + + let mut store = Store::new_temp_in_mem(); + store.overlay_id = overlay_id; + + let blocks: Vec = Broker::ext( + Box::new(ConnectionWebSocket {}), + peer_privk.clone(), + peer_pubk, + broker_server.peer_id, + broker_server.get_ws_url(&None).await.unwrap().0, // for now we are only connecting to NextGraph SaaS cloud (nextgraph.eu) so it is safe. + ExtObjectGetV0 { + overlay: overlay_id, + ids, + include_files: true, + }, + ) + .await?; + blocks.into_iter().for_each(|b| _ = store.put(&b)); + + let mut next_round = Vec::with_capacity(2); + + let mut signature = None; + + if let Some(sign_ref) = nuri.signature { + if let Ok(o) = Object::load(sign_ref.id, Some(sign_ref.key), &store) { + match o.content_v0() { + Ok(ObjectContentV0::Signature(Signature::V0(v0))) => { + let in_sig = HashSet::from_iter(v0.content.commits().iter().cloned()); + if in_nuri.is_subset(&in_sig) { + next_round.push((v0.certificate_ref.clone(), None)); + signature = Some(v0); + } else { + println!("Signature is invalid"); + log_err!("Signature is invalid"); + } + } + _ => println!("Error: invalid signature object"), + } + } + } + + nuri.objects.into_iter().for_each(|obj_ref| { + match Object::load(obj_ref.id, Some(obj_ref.key), &store) { + Err(e) => println!("Error: {:?}", e), + Ok(o) => match o.content_v0() { + Err(e) => println!("Error: {:?}", e), + Ok(ObjectContentV0::Commit(c)) => { + next_round.push((c.body_ref().clone(), Some(obj_ref.id))); + } + _ => println!("unsupported format"), + }, + } + }); + + let blocks: Vec = Broker::ext( + Box::new(ConnectionWebSocket {}), + peer_privk.clone(), + peer_pubk, + broker_server.peer_id, + broker_server.get_ws_url(&None).await.unwrap().0, // for now we are only connecting to NextGraph SaaS cloud (nextgraph.eu) so it is safe. + ExtObjectGetV0 { + overlay: overlay_id, + ids: next_round + .iter() + .map(|(o, _)| o.id) + .collect::>(), + include_files: true, + }, + ) + .await?; + blocks.into_iter().for_each(|b| _ = store.put(&b)); + + let mut third_round = Vec::with_capacity(2); + let mut certificate = None; + for (body_ref, commit_id) in next_round.into_iter() { + match Object::load(body_ref.id, Some(body_ref.key), &store) { + Err(e) => println!("Error: {:?}", e), + Ok(o) => match o.content_v0() { + Err(e) => println!("Error: {:?}", e), + Ok(ObjectContentV0::CommitBody(CommitBody::V0( + CommitBodyV0::Snapshot(Snapshot::V0(snap)), + ))) => { + println!("Snapshot: {}", commit_id.unwrap()); + third_round.push(snap.content); + } + Ok(ObjectContentV0::CommitBody(CommitBody::V0( + CommitBodyV0::AsyncTransaction(_t), + ))) => { + println!("Transaction: {}", commit_id.unwrap()); + } + Ok(ObjectContentV0::Certificate(Certificate::V0(certif))) => { + if commit_id.is_none() && signature.is_some() { + third_round.push(certif.content.previous.clone()); + certificate = Some(certif); + } + } + _ => println!("unsupported object"), + }, + } + } + + let blocks: Vec = Broker::ext( + Box::new(ConnectionWebSocket {}), + peer_privk, + peer_pubk, + broker_server.peer_id, + broker_server.get_ws_url(&None).await.unwrap().0, // for now we are only connecting to NextGraph SaaS cloud (nextgraph.eu) so it is safe. + ExtObjectGetV0 { + overlay: overlay_id, + ids: third_round.iter().map(|o| o.id).collect::>(), + include_files: true, + }, + ) + .await?; + blocks.into_iter().for_each(|b| _ = store.put(&b)); + + for third_ref in third_round.into_iter() { + match Object::load(third_ref.id, Some(third_ref.key), &store) { + Err(e) => println!("Error: {:?}", e), + Ok(o) => match o.content_v0() { + Err(e) => println!("Error: {:?}", e), + Ok(ObjectContentV0::Snapshot(snap)) => match String::from_utf8(snap) { + Err(e) => println!("Error: {:?}", e), + Ok(s) => println!("Here is the snapshot data :\n\r{}", s), + }, + Ok(ObjectContentV0::CommitBody(CommitBody::V0( + CommitBodyV0::Repository(repo), + ))) => { + // DO THE SIGNATURE VERIFICATION + if signature.is_some() && certificate.is_some() { + if let NuriTargetV0::Repo(repo_id) = nuri.target { + if repo.id() != &repo_id { + println!( + "Repo in Nuri and Certificate differ : {}", + repo_id + ) + } + } + println!("Repo that signed : {}", repo.id()); + + certificate + .as_ref() + .unwrap() + .verify_with_repo_id(&repo.id())?; + signature + .as_ref() + .unwrap() + .verify(certificate.as_ref().unwrap())?; + + println!("Signature is valid!"); + } + // TODO deal with more rounds needed if root certificate is not the second one in chain + } + _ => println!("unsupported object"), + }, + } + } + return Ok(()); + } + _ => {} + } + // reading config from file, if any let mut config_path = path.clone(); config_path.push("config.json");