From 905d7eb613c94a4b3dfe6ccc5347251389a84fe4 Mon Sep 17 00:00:00 2001 From: Niko Date: Wed, 3 May 2023 02:29:39 +0300 Subject: [PATCH] connection mechanism with FSM and Actors --- Cargo.lock | 38 ++++ p2p-broker/src/auth.rs | 99 +++++----- p2p-broker/src/server.rs | 118 +++++------ p2p-client-ws/src/remote_ws.rs | 4 +- p2p-client-ws/src/remote_ws_wasm.rs | 6 +- p2p-client/src/lib.rs | 9 +- p2p-net/Cargo.toml | 2 + p2p-net/src/actor.rs | 140 ++++++++++--- p2p-net/src/actors/mod.rs | 3 + p2p-net/src/actors/noise.rs | 48 ++++- p2p-net/src/actors/start.rs | 166 ++++++++++++++++ p2p-net/src/broker.rs | 1 - p2p-net/src/connection.rs | 292 +++++++++++++++++++++++++++- p2p-net/src/errors.rs | 2 + p2p-net/src/types.rs | 184 ++++++++++++------ 15 files changed, 893 insertions(+), 219 deletions(-) create mode 100644 p2p-net/src/actors/start.rs diff --git a/Cargo.lock b/Cargo.lock index 027416c..abbafaf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1123,6 +1123,31 @@ dependencies = [ "p2p-net", ] +[[package]] +name = "noise-protocol" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fb474d36dfe51bb4d7e733fee2b0dfd92ee1b95c716030a70e92737dea1a52b" +dependencies = [ + "arrayvec", +] + +[[package]] +name = "noise-rust-crypto" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82e7cfeb8e6a63b4a5ccef34ed7a22d084a129b1e53a000c080bbc54c0da6f8c" +dependencies = [ + "aes-gcm", + "blake2", + "chacha20poly1305", + "getrandom 0.2.8", + "noise-protocol", + "sha2 0.10.6", + "x25519-dalek", + "zeroize", +] + [[package]] name = "num-traits" version = "0.2.15" @@ -1254,6 +1279,8 @@ dependencies = [ "debug_print", "futures", "gloo-timers", + "noise-protocol", + "noise-rust-crypto", "num_enum", "once_cell", "p2p-repo", @@ -2291,6 +2318,17 @@ dependencies = [ "web-sys", ] +[[package]] +name = "x25519-dalek" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2392b6b94a576b4e2bf3c5b2757d63f10ada8020a2e4d08ac849ebcf6ea8e077" +dependencies = [ + "curve25519-dalek 3.2.1", + "rand_core 0.5.1", + "zeroize", +] + [[package]] name = "xactor" version = "0.7.11" diff --git a/p2p-broker/src/auth.rs b/p2p-broker/src/auth.rs index 6358182..fbb1678 100644 --- a/p2p-broker/src/auth.rs +++ b/p2p-broker/src/auth.rs @@ -1,7 +1,7 @@ // Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers // All rights reserved. // Licensed under the Apache License, Version 2.0 -// +// // or the MIT license , // at your option. All files in the project carrying such // notice may not be copied, modified, or distributed except @@ -13,10 +13,11 @@ use debug_print::*; use futures::future::BoxFuture; use futures::future::OptionFuture; use futures::FutureExt; -use p2p_repo::types::*; -use p2p_repo::utils::*; +use p2p_net::actors::*; use p2p_net::errors::*; use p2p_net::types::*; +use p2p_repo::types::*; +use p2p_repo::utils::*; use rust_fsm::*; // state_machine! { @@ -113,52 +114,52 @@ impl AuthProtocolHandler { ) -> Result, ProtocolError> { // match handler.machine.state() { // &AuthProtocolServerState::ServerHelloSent => { - let message = serde_bare::from_slice::(&frame)?; - // let _ = handler - // .machine - // .consume(&AuthProtocolServerInput::ClientAuthReceived) - // .map_err(|_e| ProtocolError::InvalidState)?; - - // verifying client auth - - debug_println!("verifying client auth"); - - let _ = verify( - &serde_bare::to_vec(&message.content_v0()).unwrap(), - message.sig(), - message.user(), - ) - .map_err(|_e| ProtocolError::AccessDenied)?; - - // debug_println!( - // "matching nonce : {:?} {:?}", - // message.nonce(), - // handler.nonce.as_ref().unwrap() - // ); - - if message.nonce() != handler.nonce.as_ref().unwrap() { - // let _ = handler - // .machine - // .consume(&AuthProtocolServerInput::Error) - // .map_err(|_e| ProtocolError::InvalidState); - - return Err(ProtocolError::AccessDenied); - } - - // TODO check that the device has been registered for this user. if not, return AccessDenied - - // all is good, we advance the FSM and send back response - // let _ = handler - // .machine - // .consume(&AuthProtocolServerInput::Ok) - // .map_err(|_e| ProtocolError::InvalidState)?; - - handler.user = Some(message.user()); - - Ok(vec![]) // without any metadata - //} - //_ => Err(ProtocolError::InvalidState), - //} + let message = serde_bare::from_slice::(&frame)?; + // let _ = handler + // .machine + // .consume(&AuthProtocolServerInput::ClientAuthReceived) + // .map_err(|_e| ProtocolError::InvalidState)?; + + // verifying client auth + + debug_println!("verifying client auth"); + + let _ = verify( + &serde_bare::to_vec(&message.content_v0()).unwrap(), + message.sig(), + message.user(), + ) + .map_err(|_e| ProtocolError::AccessDenied)?; + + // debug_println!( + // "matching nonce : {:?} {:?}", + // message.nonce(), + // handler.nonce.as_ref().unwrap() + // ); + + if message.nonce() != handler.nonce.as_ref().unwrap() { + // let _ = handler + // .machine + // .consume(&AuthProtocolServerInput::Error) + // .map_err(|_e| ProtocolError::InvalidState); + + return Err(ProtocolError::AccessDenied); + } + + // TODO check that the device has been registered for this user. if not, return AccessDenied + + // all is good, we advance the FSM and send back response + // let _ = handler + // .machine + // .consume(&AuthProtocolServerInput::Ok) + // .map_err(|_e| ProtocolError::InvalidState)?; + + handler.user = Some(message.user()); + + Ok(vec![]) // without any metadata + //} + //_ => Err(ProtocolError::InvalidState), + //} } let res = process_state(self, frame); diff --git a/p2p-broker/src/server.rs b/p2p-broker/src/server.rs index 2416a17..86a1537 100644 --- a/p2p-broker/src/server.rs +++ b/p2p-broker/src/server.rs @@ -1,7 +1,7 @@ // Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers // All rights reserved. // Licensed under the Apache License, Version 2.0 -// +// // or the MIT license , // at your option. All files in the project carrying such // notice may not be copied, modified, or distributed except @@ -16,27 +16,28 @@ use std::pin::Pin; use std::sync::Arc; use std::sync::RwLock; -use crate::broker_store::account::Account; use crate::auth::*; +use crate::broker_store::account::Account; use crate::broker_store::config::Config; use crate::broker_store::config::ConfigMode; -use crate::connection_local::BrokerConnectionLocal; use crate::broker_store::overlay::Overlay; use crate::broker_store::peer::Peer; use crate::broker_store::repostoreinfo::RepoStoreInfo; +use crate::connection_local::BrokerConnectionLocal; use async_std::task; use debug_print::*; use futures::future::BoxFuture; use futures::future::OptionFuture; use futures::FutureExt; use futures::Stream; +use p2p_net::actors::*; +use p2p_net::errors::*; +use p2p_net::types::*; use p2p_repo::object::Object; use p2p_repo::store::RepoStore; use p2p_repo::store::StorageError; use p2p_repo::types::*; use p2p_repo::utils::*; -use p2p_net::errors::*; -use p2p_net::types::*; use p2p_stores_lmdb::broker_store::LmdbBrokerStore; use p2p_stores_lmdb::repo_store::LmdbRepoStore; @@ -87,7 +88,6 @@ pub struct ProtocolHandler { } impl ProtocolHandler { - pub fn register(&mut self, addr: SocketAddr) { self.addr = Some(addr); } @@ -97,8 +97,12 @@ impl ProtocolHandler { ProtocolType::Start => (), ProtocolType::Auth => (), ProtocolType::Broker => { - let _ = self.broker_protocol.as_ref().unwrap().deregister(self.addr.unwrap()); - }, + let _ = self + .broker_protocol + .as_ref() + .unwrap() + .deregister(self.addr.unwrap()); + } ProtocolType::Ext => (), ProtocolType::Core => (), } @@ -122,7 +126,7 @@ impl ProtocolHandler { ProtocolType::Start => { let message = serde_bare::from_slice::(&frame); match message { - Ok(StartProtocol::Auth(b)) => { + Ok(StartProtocol::Client(b)) => { self.protocol = ProtocolType::Auth; self.auth_protocol = Some(AuthProtocolHandler::new()); return ( @@ -140,7 +144,10 @@ impl ProtocolHandler { ); } Err(e) => { - return (Err(ProtocolError::SerializationError),OptionFuture::from(None)) + return ( + Err(ProtocolError::SerializationError), + OptionFuture::from(None), + ) } } } @@ -160,12 +167,18 @@ impl ProtocolHandler { self.protocol = ProtocolType::Broker; self.broker_protocol = Some(Arc::clone(&bp)); self.auth_protocol = None; - return (res.0, OptionFuture::from(None)) - }, + return (res.0, OptionFuture::from(None)); + } Err(e) => { let val = e.clone() as u16; - let reply = AuthResult::V0(AuthResultV0 { result:val, metadata:vec![] }); - return (Ok(serde_bare::to_vec(&reply).unwrap()), OptionFuture::from(Some(async move { val }.boxed()))) + let reply = AuthResult::V0(AuthResultV0 { + result: val, + metadata: vec![], + }); + return ( + Ok(serde_bare::to_vec(&reply).unwrap()), + OptionFuture::from(Some(async move { val }.boxed())), + ); } } } @@ -184,9 +197,10 @@ impl ProtocolHandler { .await; (Ok(serde_bare::to_vec(&reply.0).unwrap()), reply.1) } - Err(e_) => { - (Err(ProtocolError::SerializationError),OptionFuture::from(None)) - } + Err(e_) => ( + Err(ProtocolError::SerializationError), + OptionFuture::from(None), + ), } } ProtocolType::Ext => { @@ -204,7 +218,7 @@ impl ProtocolHandler { pub struct ExtProtocolHandler {} impl ExtProtocolHandler { - pub fn handle_incoming(&self, msg: ExtRequest) -> ExtResponse { + pub fn handle_incoming(&self, msg: ExtHello) -> ExtResponse { unimplemented!() } } @@ -219,7 +233,7 @@ use std::{thread, time}; impl BrokerProtocolHandler { fn prepare_reply_broker_message( res: Result<(), ProtocolError>, - id: u64, + id: i64, padding_size: usize, ) -> BrokerMessage { let result = match res { @@ -238,7 +252,7 @@ impl BrokerProtocolHandler { fn prepare_reply_broker_overlay_message( res: Result<(), ProtocolError>, - id: u64, + id: i64, overlay: OverlayId, block: Option, padding_size: usize, @@ -271,7 +285,7 @@ impl BrokerProtocolHandler { fn prepare_reply_broker_overlay_message_stream( res: Result, - id: u64, + id: i64, overlay: OverlayId, padding_size: usize, ) -> BrokerMessage { @@ -304,7 +318,7 @@ impl BrokerProtocolHandler { async fn send_block_stream_response_to_client( &self, res: Result, ProtocolError>, - id: u64, + id: i64, overlay: OverlayId, padding_size: usize, ) -> (BrokerMessage, OptionFuture>) { @@ -364,23 +378,23 @@ impl BrokerProtocolHandler { ); } - pub fn register(self: Arc, addr: SocketAddr) -> Result<(),ProtocolError> { + pub fn register(self: Arc, addr: SocketAddr) -> Result<(), ProtocolError> { //FIXME: peer_id must be real one - - self.broker.add_client_peer(PubKey::Ed25519PubKey([0;32]), Arc::clone(&self)) + + self.broker + .add_client_peer(PubKey::Ed25519PubKey([0; 32]), Arc::clone(&self)) } - pub fn deregister(&self, addr: SocketAddr) -> Result<(),ProtocolError> { - - self.broker.remove_client_peer(PubKey::Ed25519PubKey([0;32])); + pub fn deregister(&self, addr: SocketAddr) -> Result<(), ProtocolError> { + self.broker + .remove_client_peer(PubKey::Ed25519PubKey([0; 32])); Ok(()) - } + } pub async fn handle_incoming( &self, msg: BrokerMessage, ) -> (BrokerMessage, OptionFuture>) { - let padding_size = 20; // TODO randomize, if config of server contains padding_max let id = msg.id(); @@ -508,7 +522,6 @@ pub enum PeerConnection { pub struct BrokerPeerInfo { lastPeerAdvert: Option, connected: PeerConnection, - } const REPO_STORES_SUBDIR: &str = "repos"; @@ -522,32 +535,30 @@ pub struct BrokerServer { //overlayid_to_repostore: HashMap, //overlayid_to_repostore: Arc>>, peers: RwLock>, - - //local_connections: + //local_connections: } impl BrokerServer { - - pub fn add_client_peer(&self, peer_id: DirectPeerId, bph: Arc) -> Result<(),ProtocolError> { - + pub fn add_client_peer( + &self, + peer_id: DirectPeerId, + bph: Arc, + ) -> Result<(), ProtocolError> { let mut writer = self.peers.write().expect("write peers hashmap"); let bpi = BrokerPeerInfo { lastPeerAdvert: None, //TODO: load from store - connected: PeerConnection::CLIENT(bph) + connected: PeerConnection::CLIENT(bph), }; - if !writer.get(&peer_id).is_none() { - return Err(ProtocolError::PeerAlreadyConnected); + if !writer.get(&peer_id).is_none() { + return Err(ProtocolError::PeerAlreadyConnected); } writer.insert(peer_id.clone(), bpi); Ok(()) - } pub fn remove_client_peer(&self, peer_id: DirectPeerId) { - let mut writer = self.peers.write().expect("write peers hashmap"); writer.remove(&peer_id); - } pub fn new(store: LmdbBrokerStore, mode: ConfigMode) -> Result { @@ -561,15 +572,11 @@ impl BrokerServer { mode: configmode, repo_stores: Arc::new(RwLock::new(HashMap::new())), //overlayid_to_repostore: Arc::new(RwLock::new(HashMap::new())), - peers: RwLock::new(HashMap::new()) + peers: RwLock::new(HashMap::new()), }) } - fn open_or_create_repostore( - &self, - repo_hash: RepoHash, - f: F, - ) -> Result + fn open_or_create_repostore(&self, repo_hash: RepoHash, f: F) -> Result where F: FnOnce(&LmdbRepoStore) -> Result, { @@ -580,7 +587,7 @@ impl BrokerServer { let mut path = self.store.path(); path.push(REPO_STORES_SUBDIR); path.push::(repo_hash.clone().into()); - std::fs::create_dir_all(path.clone()).map_err(|_e| ProtocolError::WriteError )?; + std::fs::create_dir_all(path.clone()).map_err(|_e| ProtocolError::WriteError)?; println!("path for repo store: {}", path.to_str().unwrap()); let repo = LmdbRepoStore::open(&path, *key.slice()); let mut writer = self.repo_stores.write().expect("write repo_store hashmap"); @@ -602,14 +609,13 @@ impl BrokerServer { let reader = self.repo_stores.read().expect("read repo_store hashmap"); let rep = reader.get(&repostore_id); if let Some(repo) = rep { - return f(repo) + return f(repo); } } // we need to open/create it // TODO: last_access return self.open_or_create_repostore(repostore_id, |repo| f(repo)); - // } else { // // it is ConfigMode::Local // { @@ -654,7 +660,7 @@ impl BrokerServer { pub fn protocol_handler(self: Arc) -> ProtocolHandler { let (s, r) = async_channel::unbounded::>(); return ProtocolHandler { - addr:None, + addr: None, broker: Arc::clone(&self), protocol: ProtocolType::Start, auth_protocol: None, @@ -943,12 +949,8 @@ impl BrokerServer { getrandom::getrandom(&mut random_buf).unwrap(); let key = SymKey::ChaCha20Key(random_buf); - let _ = RepoStoreInfo::create( - &overlay_id, - &key, - &self.store, - )?; // TODO in case of error, delete the previously created Overlay - //debug_println!("KEY ADDED"); + let _ = RepoStoreInfo::create(&overlay_id, &key, &self.store)?; // TODO in case of error, delete the previously created Overlay + //debug_println!("KEY ADDED"); over } Err(e) => return Err(e.into()), diff --git a/p2p-client-ws/src/remote_ws.rs b/p2p-client-ws/src/remote_ws.rs index 6f7199c..7e1095c 100644 --- a/p2p-client-ws/src/remote_ws.rs +++ b/p2p-client-ws/src/remote_ws.rs @@ -127,8 +127,6 @@ impl IConnection for ConnectionWebSocket { //spawn_and_log_error(ws_loop(ws, cnx.take_sender(), cnx.take_receiver())); - log!("sending..."); - // //cnx.close().await; @@ -288,7 +286,7 @@ async fn ws_loop( code = ProtocolError::OtherError as u16; } close_ws(&mut ws, &mut receiver, code + 4000, &proto_err.to_string()).await?; - return Err(NetError::ProtocolError); + //return Err(NetError::ProtocolError); } } Err(e) => { diff --git a/p2p-client-ws/src/remote_ws_wasm.rs b/p2p-client-ws/src/remote_ws_wasm.rs index f9039ad..01a80ce 100644 --- a/p2p-client-ws/src/remote_ws_wasm.rs +++ b/p2p-client-ws/src/remote_ws_wasm.rs @@ -83,9 +83,9 @@ impl IConnection for ConnectionWebSocket { // StartProtocol::Auth(ClientHello::V0()), // ))) // .await; - log!("waiting..."); + //log!("waiting..."); //let res = join.next().await; - log!("finished..."); + //log!("finished..."); //log!("JOIN SHUTDOWN {:?}", res); // Note that since WsMeta::connect resolves to an opened connection, we don't see // any Open events here. @@ -214,7 +214,7 @@ async fn ws_loop( if let ConnectionCommand::Error(err) = last_command.clone() { let _ = shutdown.send(err).await; } else if let ConnectionCommand::ProtocolError(err) = last_command.clone() { - let _ = shutdown.send(NetError::ProtocolError).await; + //let _ = shutdown.send(NetError::ProtocolError).await; } // otherwise, shutdown gracefully (with None). it is done automatically during destroy of shutdown receiver diff --git a/p2p-client/src/lib.rs b/p2p-client/src/lib.rs index 63e24e7..321870a 100644 --- a/p2p-client/src/lib.rs +++ b/p2p-client/src/lib.rs @@ -1,6 +1,6 @@ // All rights reserved. // Licensed under the Apache License, Version 2.0 -// +// // or the MIT license , // at your option. All files in the project carrying such // notice may not be copied, modified, or distributed except @@ -32,7 +32,9 @@ macro_rules! after { $addr.wait_for_stop().await; // TODO add timeout and close connection if there's no reply let r = $receiver.await; - if r.is_err() { return Err(ProtocolError::Closing);} + if r.is_err() { + return Err(ProtocolError::Closing); + } let $reply = r.unwrap(); //debug_println!("reply arrived {:?}", $reply); { @@ -42,5 +44,4 @@ macro_rules! after { }; } -pub mod connection_remote; - +//pub mod connection_remote; diff --git a/p2p-net/Cargo.toml b/p2p-net/Cargo.toml index 1ae762f..efdc469 100644 --- a/p2p-net/Cargo.toml +++ b/p2p-net/Cargo.toml @@ -22,6 +22,8 @@ async-std = { version = "1.12.0", features = ["attributes","unstable"] } wasm-bindgen = "0.2" unique_id = "0.1.5" once_cell = "1.17.1" +noise-protocol = "0.1.4" +noise-rust-crypto = "0.5.0" [target.'cfg(target_arch = "wasm32")'.dependencies] gloo-timers = "0.2.6" \ No newline at end of file diff --git a/p2p-net/src/actor.rs b/p2p-net/src/actor.rs index c095464..1d8dec6 100644 --- a/p2p-net/src/actor.rs +++ b/p2p-net/src/actor.rs @@ -1,47 +1,97 @@ +use async_std::stream::StreamExt; +use async_std::sync::{Mutex, MutexGuard}; use futures::{channel::mpsc, SinkExt}; use serde::de::DeserializeOwned; +use std::any::{Any, TypeId}; +use std::convert::From; +use std::sync::Arc; -use crate::{connection::*, errors::ProtocolError}; +use crate::{connection::*, errors::ProtocolError, log, types::ProtocolMessage}; use std::marker::PhantomData; -pub trait BrokerRequest: DeserializeOwned {} - -pub trait BrokerResponse: DeserializeOwned { - fn test(&self); +pub trait BrokerRequest: std::fmt::Debug { + fn send(&self) -> ProtocolMessage; } -impl BrokerResponse for () { - fn test(&self) {} +//pub trait BrokerResponse: TryFrom + std::fmt::Debug {} + +impl TryFrom for () { + type Error = ProtocolError; + fn try_from(msg: ProtocolMessage) -> Result { + Ok(()) + } } pub trait IActor: EActor { - fn process_request(&self) {} + //fn process_request(&self, req: Box) -> Box {} } #[async_trait::async_trait] -pub trait EActor { - async fn handle(&mut self, cmd: ConnectionCommand); +pub trait EActor: Send + Sync + std::fmt::Debug { + //type T: TryFrom + std::fmt::Debug; + //async fn handle(&mut self, msg: ProtocolMessage); + async fn respond( + &mut self, + msg: ProtocolMessage, + //stream: Option, + fsm: Arc>, + ) -> Result<(), ProtocolError>; } -pub struct Actor<'a, A: BrokerRequest, B: BrokerResponse> { +#[derive(Debug)] +pub struct Actor< + 'a, + A: BrokerRequest, + B: TryFrom + std::fmt::Debug + std::marker::Sync, +> { id: i64, phantom_a: PhantomData<&'a A>, phantom_b: PhantomData<&'a B>, receiver: Receiver, receiver_tx: Sender, + initiator: bool, } -#[async_trait::async_trait] -impl EActor - for Actor<'_, A, B> -{ - async fn handle(&mut self, cmd: ConnectionCommand) { - let _ = self.receiver_tx.send(cmd).await; - } -} +// #[async_trait::async_trait] +// impl< +// A: BrokerRequest + std::marker::Sync + 'static, +// B: TryFrom +// + std::fmt::Debug +// + std::marker::Sync +// + 'static, +// > EActor for Actor<'_, A, B> +// { +// //type T = B; + +// // async fn handle(&mut self, msg: ProtocolMessage) { +// // if self.initiator && msg.type_id() == TypeId::of::() +// // || !self.initiator && msg.type_id() == TypeId::of::() +// // { +// // let _ = self.receiver_tx.send(ConnectionCommand::Msg(msg)).await; +// // } else { +// // log!("NOT OK"); +// // } +// // } + +// // async fn respond(id: i64, msg: A) -> Result { +// // let mut actor = Box::new(Actor::::new(id, false)); +// // //actor.process_request +// // match self.receiver.next().await { +// // Some(msg) => B::receive(msg), +// // _ => Err(ProtocolError::ActorError), +// // } +// // } +// } -impl Actor<'_, A, B> { - pub fn new(id: i64) -> Self { +impl< + A: BrokerRequest + 'static, + B: TryFrom + + std::marker::Sync + + std::fmt::Debug + + 'static, + > Actor<'_, A, B> +{ + pub fn new(id: i64, initiator: bool) -> Self { let (mut receiver_tx, receiver) = mpsc::unbounded::(); Self { id, @@ -49,12 +99,52 @@ impl Actor<'_, A, B> { receiver_tx, phantom_a: PhantomData, phantom_b: PhantomData, + initiator, + } + } + + pub fn verify(&self, msg: ProtocolMessage) -> bool { + self.initiator && msg.type_id() == TypeId::of::() + || !self.initiator && msg.type_id() == TypeId::of::() + } + + pub async fn request( + &mut self, + msg: impl BrokerRequest + std::marker::Sync + std::marker::Send, + stream: Option, + fsm: Arc>, + ) -> Result { + //sender.send(ConnectionCommand::Msg(msg.send())).await; + fsm.lock().await.send(msg.send()).await?; + match self.receiver.next().await { + Some(ConnectionCommand::Msg(msg)) => msg.try_into(), + _ => Err(ProtocolError::ActorError), } } - pub fn request(&self, msg: A, stream: Option) -> Result { - let b: Vec = vec![]; - let a = serde_bare::from_slice::(&b).unwrap(); - Ok(a) + pub fn new_responder() -> Box { + Box::new(Self::new(0, false)) + } + + pub fn get_receiver_tx(&self) -> Sender { + self.receiver_tx.clone() + } +} + +mod test { + + use crate::actor::*; + use crate::actors::*; + use crate::types::*; + + #[async_std::test] + pub async fn test_actor() { + let mut a = Actor::::new(1, true); + // a.handle(ProtocolMessage::Start(StartProtocol::Client( + // ClientHello::Noise3(Noise::V0(NoiseV0 { data: vec![] })), + // ))) + // .await; + // a.handle(ProtocolMessage::Noise(Noise::V0(NoiseV0 { data: vec![] }))) + // .await; } } diff --git a/p2p-net/src/actors/mod.rs b/p2p-net/src/actors/mod.rs index 53c7fe9..339b7ba 100644 --- a/p2p-net/src/actors/mod.rs +++ b/p2p-net/src/actors/mod.rs @@ -1,2 +1,5 @@ pub mod noise; pub use noise::*; + +pub mod start; +pub use start::*; diff --git a/p2p-net/src/actors/noise.rs b/p2p-net/src/actors/noise.rs index 994eb58..e7dcd79 100644 --- a/p2p-net/src/actors/noise.rs +++ b/p2p-net/src/actors/noise.rs @@ -1,9 +1,13 @@ -use crate::{actor::*, errors::ProtocolError}; +use std::sync::Arc; + +use crate::{actor::*, connection::NoiseFSM, errors::ProtocolError, types::ProtocolMessage}; +use async_std::sync::Mutex; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct NoiseV0 { - data: Vec, + // contains the handshake messages or the encrypted content of a ProtocolMessage + pub data: Vec, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -11,10 +15,42 @@ pub enum Noise { V0(NoiseV0), } -impl BrokerRequest for Noise {} +impl BrokerRequest for Noise { + fn send(&self) -> ProtocolMessage { + ProtocolMessage::Noise(self.clone()) + } +} + +impl TryFrom for Noise { + type Error = ProtocolError; + fn try_from(msg: ProtocolMessage) -> Result { + if let ProtocolMessage::Noise(n) = msg { + Ok(n) + } else { + Err(ProtocolError::InvalidValue) + } + } +} + +impl Actor<'_, Noise, Noise> {} + +// impl IActor for Actor<'_, Noise, Noise> { +// // fn process_request(&self, req: Box) -> Box { +// // //let r = req as Noise; +// // } +// } -impl Actor<'_, Noise, ()> {} +#[async_trait::async_trait] +impl EActor for Actor<'_, Noise, Noise> { + // fn process_request(&self, req: Box) -> Box { + // //let r = req as Noise; + // } -impl IActor for Actor<'_, Noise, ()> { - //fn process_request(&self) {} + async fn respond( + &mut self, + msg: ProtocolMessage, + fsm: Arc>, + ) -> Result<(), ProtocolError> { + Ok(()) + } } diff --git a/p2p-net/src/actors/start.rs b/p2p-net/src/actors/start.rs new file mode 100644 index 0000000..2bfc631 --- /dev/null +++ b/p2p-net/src/actors/start.rs @@ -0,0 +1,166 @@ +use crate::actors::noise::Noise; +use crate::connection::NoiseFSM; +use crate::types::ExtResponse; +use crate::{actor::*, errors::ProtocolError, types::ProtocolMessage}; +use async_std::sync::Mutex; +use serde::{Deserialize, Serialize}; +use std::any::{Any, TypeId}; +use std::sync::Arc; + +pub struct Noise3(Noise); + +/// Start chosen protocol +/// First message sent by the client +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum StartProtocol { + Client(ClientHello), + Ext(ExtHello), +} + +impl StartProtocol { + pub fn type_id(&self) -> TypeId { + match self { + StartProtocol::Client(a) => a.type_id(), + StartProtocol::Ext(a) => a.type_id(), + } + } + pub fn get_actor(&self) -> Box { + match self { + StartProtocol::Client(a) => a.get_actor(), + StartProtocol::Ext(a) => a.get_actor(), + } + } +} + +/// External Hello (finalizes the Noise handshake and send first ExtRequest) +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ExtHello { + // contains the 3rd Noise handshake message "s,se" + pub noise: Noise, + + /// Noise encrypted payload (an ExtRequest) + pub payload: Vec, +} + +impl ExtHello { + pub fn get_actor(&self) -> Box { + Actor::::new_responder() + } +} + +impl BrokerRequest for ExtHello { + fn send(&self) -> ProtocolMessage { + ProtocolMessage::Start(StartProtocol::Ext(self.clone())) + } +} + +/// Client Hello +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum ClientHello { + // contains the 3rd Noise handshake message "s,se" + Noise3(Noise), + Local, +} + +impl ClientHello { + pub fn type_id(&self) -> TypeId { + match self { + ClientHello::Noise3(a) => a.type_id(), + ClientHello::Local => TypeId::of::(), + } + } + pub fn get_actor(&self) -> Box { + Actor::::new_responder() + } +} + +/// Server hello sent upon a client connection +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ServerHelloV0 { + /// Nonce for ClientAuth + #[serde(with = "serde_bytes")] + pub nonce: Vec, +} + +/// Server hello sent upon a client connection +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum ServerHello { + V0(ServerHelloV0), +} + +impl ServerHello { + pub fn nonce(&self) -> &Vec { + match self { + ServerHello::V0(o) => &o.nonce, + } + } +} + +impl BrokerRequest for ClientHello { + fn send(&self) -> ProtocolMessage { + ProtocolMessage::Start(StartProtocol::Client(ClientHello::Local)) + } +} + +impl TryFrom for ClientHello { + type Error = ProtocolError; + fn try_from(msg: ProtocolMessage) -> Result { + if let ProtocolMessage::Start(StartProtocol::Client(a)) = msg { + Ok(a) + } else { + Err(ProtocolError::InvalidValue) + } + } +} + +impl TryFrom for ServerHello { + type Error = ProtocolError; + fn try_from(msg: ProtocolMessage) -> Result { + if let ProtocolMessage::ServerHello(server_hello) = msg { + Ok(server_hello) + } else { + Err(ProtocolError::InvalidValue) + } + } +} + +impl BrokerRequest for ServerHello { + fn send(&self) -> ProtocolMessage { + ProtocolMessage::ServerHello(self.clone()) + } +} + +impl From for ProtocolMessage { + fn from(msg: ServerHello) -> ProtocolMessage { + ProtocolMessage::ServerHello(msg) + } +} + +impl Actor<'_, ClientHello, ServerHello> {} + +#[async_trait::async_trait] +impl EActor for Actor<'_, ClientHello, ServerHello> { + async fn respond( + &mut self, + msg: ProtocolMessage, + fsm: Arc>, + ) -> Result<(), ProtocolError> { + let req = ClientHello::try_from(msg)?; + let res = ServerHello::V0(ServerHelloV0 { nonce: vec![] }); + fsm.lock().await.send(res.into()).await?; + Ok(()) + } +} + +impl Actor<'_, ExtHello, ExtResponse> {} + +#[async_trait::async_trait] +impl EActor for Actor<'_, ExtHello, ExtResponse> { + async fn respond( + &mut self, + msg: ProtocolMessage, + fsm: Arc>, + ) -> Result<(), ProtocolError> { + Ok(()) + } +} diff --git a/p2p-net/src/broker.rs b/p2p-net/src/broker.rs index a5a19f6..518c076 100644 --- a/p2p-net/src/broker.rs +++ b/p2p-net/src/broker.rs @@ -41,7 +41,6 @@ pub struct DirectConnection { pub static BROKER: Lazy>> = Lazy::new(|| Arc::new(RwLock::new(Broker::new()))); pub struct Broker { - //actors: Arc>>>, direct_connections: HashMap, peers: HashMap, shutdown: Option>, diff --git a/p2p-net/src/connection.rs b/p2p-net/src/connection.rs index 2349944..d46a821 100644 --- a/p2p-net/src/connection.rs +++ b/p2p-net/src/connection.rs @@ -1,7 +1,10 @@ static NOISE_CONFIG: &'static str = "Noise_XK_25519_ChaChaPoly_BLAKE2b"; +use std::collections::HashMap; +use std::fmt; use std::sync::Arc; +use crate::actor::{Actor, EActor}; use crate::actors::*; use crate::errors::NetError; use crate::errors::ProtocolError; @@ -9,7 +12,10 @@ use crate::log; use crate::types::*; use crate::utils::*; use async_std::stream::StreamExt; +use async_std::sync::Mutex; use futures::{channel::mpsc, select, Future, FutureExt, SinkExt}; +use noise_protocol::{CipherState, HandshakeState}; +use noise_rust_crypto::*; use p2p_repo::types::{PrivKey, PubKey}; use unique_id::sequence::SequenceGenerator; use unique_id::Generator; @@ -39,14 +45,167 @@ pub trait IConnection: Send + Sync { async fn accept(&self) -> Result; } -#[derive(PartialEq, Debug)] +#[derive(PartialEq, Debug, Clone)] pub enum ConnectionDir { Server, Client, } +impl ConnectionDir { + pub fn is_server(&self) -> bool { + *self == ConnectionDir::Server + } +} + +#[derive(Debug, PartialEq)] +pub enum FSMstate { + Local0, + Noise0, + Noise1, + Noise2, + Noise3, + ExtRequest, + ExtResponse, + ClientHello, + ServerHello, + ClientAuth, + AuthResult, +} + +pub struct NoiseFSM { + state: FSMstate, + dir: ConnectionDir, + sender: Sender, + + noise_handshake_state: Option>, + noise_cipher_state: Option>, +} + +impl fmt::Debug for NoiseFSM { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("NoiseFSM") + .field("state", &self.state) + .field("dir", &self.dir) + .finish() + } +} + +pub enum StepReply { + Responder(ProtocolMessage), + Response(ProtocolMessage), + NONE, +} + +impl NoiseFSM { + pub fn new( + tp: TransportProtocol, + dir: ConnectionDir, + sender: Sender, + ) -> Self { + Self { + state: if tp == TransportProtocol::Local { + FSMstate::Local0 + } else { + FSMstate::Noise0 + }, + dir, + sender, + noise_handshake_state: None, + noise_cipher_state: None, + } + } + + fn decrypt(&mut self, ciphertext: &Noise) -> ProtocolMessage { + unimplemented!(); + } + + fn encrypt(&mut self, plaintext: ProtocolMessage) -> Noise { + unimplemented!(); + } + + pub async fn send(&mut self, msg: ProtocolMessage) -> Result<(), ProtocolError> { + if self.state == FSMstate::AuthResult && self.noise_cipher_state.is_some() { + let cipher = self.encrypt(msg); + self.sender + .send(ConnectionCommand::Msg(ProtocolMessage::Noise(cipher))) + .await; + return Ok(()); + } else { + return Err(ProtocolError::InvalidState); + } + } + + pub async fn receive( + &mut self, + msg: ProtocolMessage, + ) -> Result { + if self.state == FSMstate::AuthResult && self.noise_cipher_state.is_some() { + if let ProtocolMessage::Noise(noise) = msg { + let new = self.decrypt(&noise); + Ok(new) + } else { + Err(ProtocolError::MustBeEncrypted) + } + } else { + Err(ProtocolError::InvalidState) + } + } + + pub fn step( + &mut self, + mut msg_opt: Option, + ) -> Result { + if self.noise_cipher_state.is_some() { + if let Some(ProtocolMessage::Noise(noise)) = msg_opt.as_ref() { + let new = self.decrypt(noise); + msg_opt.replace(new); + } else { + return Err(ProtocolError::MustBeEncrypted); + } + } + match self.state { + // TODO verify that ID is zero + FSMstate::Local0 => { + if let Some(msg) = msg_opt.as_ref() { + if self.dir.is_server() && msg.type_id() == ClientHello::Local.type_id() { + self.state = FSMstate::ServerHello; + Box::new(Actor::::new(msg.id(), false)); + return Ok(StepReply::NONE); + } + } else if !self.dir.is_server() && msg_opt.is_none() { + self.state = FSMstate::ClientHello; + Box::new(Actor::::new(0, true)); + return Ok(StepReply::NONE); + } + } + FSMstate::Noise0 => {} + FSMstate::Noise1 => {} + FSMstate::Noise2 => {} + FSMstate::Noise3 => {} + FSMstate::ExtRequest => {} + FSMstate::ExtResponse => {} + FSMstate::ClientHello => {} + FSMstate::ServerHello => {} + FSMstate::ClientAuth => {} + FSMstate::AuthResult => { + if let Some(msg) = msg_opt { + let id = msg.id(); + if self.dir.is_server() && id > 0 || !self.dir.is_server() && id < 0 { + return Ok(StepReply::Responder(msg)); + } else if id != 0 { + return Ok(StepReply::Response(msg)); + } + } + } + } + Err(ProtocolError::InvalidState) + } +} + #[derive(Debug)] pub struct ConnectionBase { + fsm: Option>>, + sender: Option>, receiver: Option>, sender_tx: Option>, @@ -55,11 +214,14 @@ pub struct ConnectionBase { dir: ConnectionDir, next_request_id: SequenceGenerator, tp: TransportProtocol, + + actors: Arc>>>, } impl ConnectionBase { pub fn new(dir: ConnectionDir, tp: TransportProtocol) -> Self { Self { + fsm: None, receiver: None, sender: None, sender_tx: None, @@ -68,6 +230,7 @@ impl ConnectionBase { next_request_id: SequenceGenerator::new(1), dir, tp, + actors: Arc::new(Mutex::new(HashMap::new())), } } @@ -111,29 +274,103 @@ impl ConnectionBase { async fn read_loop( mut receiver: Receiver, mut sender: Sender, + actors: Arc>>>, + fsm: Arc>, ) -> ResultSend<()> { while let Some(msg) = receiver.next().await { log!("RECEIVED: {:?}", msg); - // sender - // .send(ConnectionCommand::Close) - // .await - // .map_err(|e| "channel send error")? - if let ConnectionCommand::Close = msg { log!("EXIT READ LOOP"); break; + } else if let ConnectionCommand::Msg(proto_msg) = msg { + let res; + { + let mut locked_fsm = fsm.lock().await; + res = locked_fsm.step(Some(proto_msg)); + } + match res { + Err(e) => { + if sender + .send(ConnectionCommand::ProtocolError(e)) + .await + .is_err() + { + break; //TODO test that sending a ProtocolError effectively closes the connection (with ConnectionCommand::Close) + } + } + Ok(StepReply::NONE) => {} + Ok(StepReply::Responder(responder)) => { + let r = responder + .get_actor() + .respond(responder, Arc::clone(&fsm)) + .await; + if r.is_err() { + if sender + .send(ConnectionCommand::ProtocolError(r.unwrap_err())) + .await + .is_err() + { + break; + } + } + } + Ok(StepReply::Response(response)) => { + let mut lock = actors.lock().await; + let exists = lock.get_mut(&response.id()); + match exists { + Some(actor_sender) => { + if actor_sender + .send(ConnectionCommand::Msg(response)) + .await + .is_err() + { + break; + } + } + None => { + if sender + .send(ConnectionCommand::ProtocolError( + ProtocolError::ActorError, + )) + .await + .is_err() + { + break; + } + } + } + } + } } } Ok(()) } - pub async fn request(&mut self) { + pub async fn request< + A: crate::actor::BrokerRequest + std::marker::Sync + std::marker::Send + 'static, + B: TryFrom + + std::fmt::Debug + + std::marker::Sync + + 'static, + >( + &self, + msg: A, + stream: Option, + ) -> Result { + if self.fsm.is_none() { + return Err(ProtocolError::FsmNotReady); + } + let mut id = self.next_request_id.next_id(); if self.dir == ConnectionDir::Server { id = !id + 1; } - // id + let mut actor = Box::new(Actor::::new(id, true)); + self.actors.lock().await.insert(id, actor.get_receiver_tx()); + actor + .request(msg, stream, Arc::clone(self.fsm.as_ref().unwrap())) + .await } pub async fn send(&mut self, cmd: ConnectionCommand) { @@ -162,6 +399,43 @@ impl ConnectionBase { self.sender_tx = Some(sender_tx.clone()); self.receiver_tx = Some(receiver_tx); - spawn_and_log_error(Self::read_loop(receiver_rx, sender_tx)); + let fsm = Arc::new(Mutex::new(NoiseFSM::new( + self.tp, + self.dir.clone(), + sender_tx.clone(), + ))); + self.fsm = Some(Arc::clone(&fsm)); + + spawn_and_log_error(Self::read_loop( + receiver_rx, + sender_tx, + Arc::clone(&self.actors), + fsm, + )); + } +} + +mod test { + + use crate::actor::*; + use crate::actors::*; + use crate::log; + use crate::types::*; + use std::any::{Any, TypeId}; + + #[async_std::test] + pub async fn test_connection() {} + + #[async_std::test] + pub async fn test_typeid() { + log!( + "{:?}", + ClientHello::Noise3(Noise::V0(NoiseV0 { data: vec![] })).type_id() + ); + let a = Noise::V0(NoiseV0 { data: [].to_vec() }); + log!("{:?}", a.type_id()); + log!("{:?}", TypeId::of::()); + log!("{:?}", ClientHello::Local.type_id()); + log!("{:?}", TypeId::of::()); } } diff --git a/p2p-net/src/errors.rs b/p2p-net/src/errors.rs index 011318f..b546668 100644 --- a/p2p-net/src/errors.rs +++ b/p2p-net/src/errors.rs @@ -71,6 +71,8 @@ pub enum ProtocolError { NoError, OtherError, Closing, + FsmNotReady, + MustBeEncrypted, } //MAX 949 ProtocolErrors impl ProtocolError { diff --git a/p2p-net/src/types.rs b/p2p-net/src/types.rs index 9d56588..45b4d50 100644 --- a/p2p-net/src/types.rs +++ b/p2p-net/src/types.rs @@ -14,9 +14,12 @@ //! Corresponds to the BARE schema use core::fmt; -use std::net::IpAddr; +use std::{ + any::{Any, TypeId}, + net::IpAddr, +}; -use crate::actors::*; +use crate::{actor::EActor, actors::*, errors::ProtocolError}; use p2p_repo::types::*; use serde::{Deserialize, Serialize}; @@ -102,6 +105,7 @@ impl From<&IP> for IpAddr { pub enum TransportProtocol { WS, QUIC, + Local, } /// IP transport address @@ -482,7 +486,7 @@ pub enum OverlayRequestContentV0 { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct OverlayRequestV0 { /// Request ID - pub id: u64, + pub id: i64, /// Request content pub content: OverlayRequestContentV0, @@ -506,7 +510,7 @@ pub enum OverlayResponseContentV0 { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct OverlayResponseV0 { /// Request ID - pub id: u64, + pub id: i64, /// Result pub result: u8, @@ -805,12 +809,22 @@ pub enum BrokerRequestContentV0 { AddClient(AddClient), DelClient(DelClient), } +impl BrokerRequestContentV0 { + pub fn type_id(&self) -> TypeId { + match self { + BrokerRequestContentV0::AddUser(a) => a.type_id(), + BrokerRequestContentV0::DelUser(a) => a.type_id(), + BrokerRequestContentV0::AddClient(a) => a.type_id(), + BrokerRequestContentV0::DelClient(a) => a.type_id(), + } + } +} /// Broker request #[derive(Clone, Debug, Serialize, Deserialize)] pub struct BrokerRequestV0 { /// Request ID - pub id: u64, + pub id: i64, /// Request content pub content: BrokerRequestContentV0, @@ -823,11 +837,16 @@ pub enum BrokerRequest { } impl BrokerRequest { - pub fn id(&self) -> u64 { + pub fn id(&self) -> i64 { match self { BrokerRequest::V0(o) => o.id, } } + pub fn type_id(&self) -> TypeId { + match self { + BrokerRequest::V0(o) => o.content.type_id(), + } + } pub fn content_v0(&self) -> BrokerRequestContentV0 { match self { BrokerRequest::V0(o) => o.content.clone(), @@ -839,7 +858,7 @@ impl BrokerRequest { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct BrokerResponseV0 { /// Request ID - pub id: u64, + pub id: i64, /// Result (including but not limited to Result) pub result: u16, @@ -852,7 +871,7 @@ pub enum BrokerResponse { } impl BrokerResponse { - pub fn id(&self) -> u64 { + pub fn id(&self) -> i64 { match self { BrokerResponse::V0(o) => o.id, } @@ -1157,7 +1176,7 @@ pub enum BrokerOverlayRequestContentV0 { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct BrokerOverlayRequestV0 { /// Request ID - pub id: u64, + pub id: i64, /// Request content pub content: BrokerOverlayRequestContentV0, @@ -1170,7 +1189,7 @@ pub enum BrokerOverlayRequest { } impl BrokerOverlayRequest { - pub fn id(&self) -> u64 { + pub fn id(&self) -> i64 { match self { BrokerOverlayRequest::V0(o) => o.id, } @@ -1194,7 +1213,7 @@ pub enum BrokerOverlayResponseContentV0 { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct BrokerOverlayResponseV0 { /// Request ID - pub id: u64, + pub id: i64, /// Result (including but not limited to Result) pub result: u16, @@ -1210,7 +1229,7 @@ pub enum BrokerOverlayResponse { } impl BrokerOverlayResponse { - pub fn id(&self) -> u64 { + pub fn id(&self) -> i64 { match self { BrokerOverlayResponse::V0(o) => o.id, } @@ -1299,7 +1318,7 @@ impl BrokerOverlayMessage { ), } } - pub fn id(&self) -> u64 { + pub fn id(&self) -> i64 { match self { BrokerOverlayMessage::V0(o) => match &o.content { BrokerOverlayMessageContentV0::BrokerOverlayResponse(r) => r.id(), @@ -1374,10 +1393,20 @@ pub struct BrokerMessageV0 { #[derive(Clone, Debug, Serialize, Deserialize)] pub enum BrokerMessage { V0(BrokerMessageV0), - Close, + Close, //TODO: remove Close. } impl BrokerMessage { + pub fn type_id(&self) -> TypeId { + match self { + BrokerMessage::V0(a) => match &a.content { + BrokerMessageContentV0::BrokerOverlayMessage(p) => p.type_id(), + BrokerMessageContentV0::BrokerResponse(p) => p.type_id(), + BrokerMessageContentV0::BrokerRequest(p) => p.type_id(), + }, + BrokerMessage::Close => TypeId::of::(), + } + } pub fn is_close(&self) -> bool { match self { BrokerMessage::V0(o) => false, @@ -1411,7 +1440,7 @@ impl BrokerMessage { BrokerMessage::Close => panic!("Close not implemented"), } } - pub fn id(&self) -> u64 { + pub fn id(&self) -> i64 { match self { BrokerMessage::V0(o) => match &o.content { BrokerMessageContentV0::BrokerOverlayMessage(p) => p.id(), @@ -1520,28 +1549,36 @@ pub enum ExtRequestContentV0 { ExtBranchSyncReq(ExtBranchSyncReq), } -/// External request authenticated by a MAC +/// External Request Payload V0 +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ExtRequestPayload { + content: ExtRequestContentV0, + // ... +} + +/// External request with its request ID #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ExtRequestV0 { /// Request ID - pub id: u64, + pub id: i64, - /// Request content - pub content: ExtRequestContentV0, - - /// BLAKE3 MAC over content - /// BLAKE3 keyed hash: - /// - key: BLAKE3 derive_key ("NextGraph ExtRequest BLAKE3 key", - /// repo_pubkey + repo_secret) - pub mac: Digest, + /// Request payload + pub payload: ExtRequestPayload, } -/// External request authenticated by a MAC #[derive(Clone, Debug, Serialize, Deserialize)] pub enum ExtRequest { V0(ExtRequestV0), } +impl ExtRequest { + pub fn id(&self) -> i64 { + match self { + ExtRequest::V0(v0) => v0.id, + } + } +} + /// Content of ExtResponseV0 #[derive(Clone, Debug, Serialize, Deserialize)] pub enum ExtResponseContentV0 { @@ -1554,7 +1591,7 @@ pub enum ExtResponseContentV0 { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ExtResponseV0 { /// Request ID - pub id: u64, + pub id: i64, /// Result code pub result: u16, @@ -1569,6 +1606,25 @@ pub enum ExtResponse { V0(ExtResponseV0), } +impl ExtResponse { + pub fn id(&self) -> i64 { + match self { + ExtResponse::V0(v0) => v0.id, + } + } +} + +impl TryFrom for ExtResponse { + type Error = ProtocolError; + fn try_from(msg: ProtocolMessage) -> Result { + if let ProtocolMessage::ExtResponse(ext_res) = msg { + Ok(ext_res) + } else { + Err(ProtocolError::InvalidValue) + } + } +} + /// /// PROTOCOL MESSAGES /// @@ -1579,50 +1635,56 @@ pub enum ProtocolMessage { ServerHello(ServerHello), ClientAuth(ClientAuth), AuthResult(AuthResult), + ExtRequest(ExtRequest), ExtResponse(ExtResponse), BrokerMessage(BrokerMessage), } -/// -/// AUTHENTICATION MESSAGES -/// - -/// Client Hello -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum ClientHello { - V0(), -} - -/// Start chosen protocol -/// First message sent by the client -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum StartProtocol { - Auth(ClientHello), - Ext(ExtRequest), -} - -/// Server hello sent upon a client connection -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ServerHelloV0 { - /// Nonce for ClientAuth - #[serde(with = "serde_bytes")] - pub nonce: Vec, -} - -/// Server hello sent upon a client connection -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum ServerHello { - V0(ServerHelloV0), -} +impl ProtocolMessage { + pub fn id(&self) -> i64 { + match self { + ProtocolMessage::Noise(_) => 0, + ProtocolMessage::Start(_) => 0, + ProtocolMessage::ServerHello(_) => 0, + ProtocolMessage::ClientAuth(_) => 0, + ProtocolMessage::AuthResult(_) => 0, + ProtocolMessage::ExtRequest(ext_req) => ext_req.id(), + ProtocolMessage::ExtResponse(ext_res) => ext_res.id(), + ProtocolMessage::BrokerMessage(broker_msg) => broker_msg.id(), + } + } + pub fn type_id(&self) -> TypeId { + match self { + ProtocolMessage::Noise(a) => a.type_id(), + ProtocolMessage::Start(a) => a.type_id(), + ProtocolMessage::ServerHello(a) => a.type_id(), + ProtocolMessage::ClientAuth(a) => a.type_id(), + ProtocolMessage::AuthResult(a) => a.type_id(), + ProtocolMessage::ExtRequest(a) => a.type_id(), + ProtocolMessage::ExtResponse(a) => a.type_id(), + ProtocolMessage::BrokerMessage(a) => a.type_id(), + } + } -impl ServerHello { - pub fn nonce(&self) -> &Vec { + pub fn get_actor(&self) -> Box { match self { - ServerHello::V0(o) => &o.nonce, + //ProtocolMessage::Noise(a) => a.get_actor(), + ProtocolMessage::Start(a) => a.get_actor(), + // ProtocolMessage::ServerHello(a) => a.get_actor(), + // ProtocolMessage::ClientAuth(a) => a.get_actor(), + // ProtocolMessage::AuthResult(a) => a.get_actor(), + // ProtocolMessage::ExtRequest(a) => a.get_actor(), + // ProtocolMessage::ExtResponse(a) => a.get_actor(), + // ProtocolMessage::BrokerMessage(a) => a.get_actor(), + _ => unimplemented!(), } } } +/// +/// AUTHENTICATION MESSAGES +/// + /// Content of ClientAuthV0 #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ClientAuthContentV0 {