From fa73850873337d90f0f5dcc959d045875e9528b3 Mon Sep 17 00:00:00 2001 From: Niko Date: Wed, 3 May 2023 21:07:40 +0300 Subject: [PATCH] allowing stream responses to broker request --- p2p-broker/src/server.rs | 9 ++-- p2p-client-ws/src/remote_ws.rs | 2 +- p2p-net/src/actor.rs | 86 ++++++++++++++++++++++++++-------- p2p-net/src/actors/noise.rs | 22 ++++----- p2p-net/src/actors/start.rs | 34 +++++++++----- p2p-net/src/broker.rs | 2 +- p2p-net/src/connection.rs | 27 ++++++----- p2p-net/src/errors.rs | 9 ++-- p2p-net/src/types.rs | 30 +++++++----- p2p-net/src/utils.rs | 3 ++ 10 files changed, 147 insertions(+), 77 deletions(-) diff --git a/p2p-broker/src/server.rs b/p2p-broker/src/server.rs index 86a1537..15d6d67 100644 --- a/p2p-broker/src/server.rs +++ b/p2p-broker/src/server.rs @@ -245,6 +245,7 @@ impl BrokerProtocolHandler { content: BrokerMessageContentV0::BrokerResponse(BrokerResponse::V0(BrokerResponseV0 { id, result, + content: BrokerResponseContentV0::EmptyResponse(()), })), }); msg @@ -262,8 +263,8 @@ impl BrokerProtocolHandler { Err(e) => e.into(), }; let content = match block { - Some(b) => Some(BrokerOverlayResponseContentV0::Block(b)), - None => None, + Some(b) => BrokerOverlayResponseContentV0::Block(b), + None => BrokerOverlayResponseContentV0::EmptyResponse(()), }; let msg = BrokerMessage::V0(BrokerMessageV0 { padding: vec![0; padding_size], @@ -294,8 +295,8 @@ impl BrokerProtocolHandler { Err(e) => (*e).clone().into(), }; let content = match res { - Ok(r) => Some(BrokerOverlayResponseContentV0::Block(r)), - Err(_) => None, + Ok(r) => BrokerOverlayResponseContentV0::Block(r), + Err(_) => BrokerOverlayResponseContentV0::EmptyResponse(()), }; let msg = BrokerMessage::V0(BrokerMessageV0 { padding: vec![0; padding_size], diff --git a/p2p-client-ws/src/remote_ws.rs b/p2p-client-ws/src/remote_ws.rs index f987471..e03fd81 100644 --- a/p2p-client-ws/src/remote_ws.rs +++ b/p2p-client-ws/src/remote_ws.rs @@ -28,7 +28,7 @@ use async_std::task; use p2p_net::errors::*; use p2p_net::log; use p2p_net::types::*; -use p2p_net::utils::{spawn_and_log_error, ResultSend}; +use p2p_net::utils::{spawn_and_log_error, Receiver, ResultSend, Sender}; use p2p_net::{connection::*, WS_PORT}; use p2p_repo::types::*; use p2p_repo::utils::{generate_keypair, now_timestamp}; diff --git a/p2p-net/src/actor.rs b/p2p-net/src/actor.rs index af180da..a604a3d 100644 --- a/p2p-net/src/actor.rs +++ b/p2p-net/src/actor.rs @@ -6,12 +6,13 @@ use std::any::{Any, TypeId}; use std::convert::From; use std::sync::Arc; +use crate::utils::{spawn_and_log_error, Receiver, ResultSend, Sender}; use crate::{connection::*, errors::ProtocolError, log, types::ProtocolMessage}; use std::marker::PhantomData; -pub trait BrokerRequest: std::fmt::Debug { - fn send(&self) -> ProtocolMessage; -} +// pub trait BrokerRequest: std::fmt::Debug { +// fn send(&self) -> ProtocolMessage; +// } //pub trait BrokerResponse: TryFrom + std::fmt::Debug {} @@ -22,10 +23,6 @@ impl TryFrom for () { } } -pub trait IActor: EActor { - //fn process_request(&self, req: Box) -> Box {} -} - #[async_trait::async_trait] pub trait EActor: Send + Sync + std::fmt::Debug { //type T: TryFrom + std::fmt::Debug; @@ -41,13 +38,13 @@ pub trait EActor: Send + Sync + std::fmt::Debug { #[derive(Debug)] pub struct Actor< 'a, - A: BrokerRequest, + A: Into + std::fmt::Debug, B: TryFrom + std::fmt::Debug + Sync, > { id: i64, phantom_a: PhantomData<&'a A>, phantom_b: PhantomData<&'a B>, - receiver: Receiver, + receiver: Option>, receiver_tx: Sender, initiator: bool, } @@ -83,19 +80,21 @@ pub struct Actor< // // } // } +pub enum SoS { + Single(B), + Stream(mpsc::UnboundedReceiver), +} + impl< - A: BrokerRequest + 'static, - B: TryFrom - + std::marker::Sync - + std::fmt::Debug - + 'static, + A: Into + std::fmt::Debug + 'static, + B: TryFrom + Sync + Send + std::fmt::Debug + 'static, > Actor<'_, A, B> { pub fn new(id: i64, initiator: bool) -> Self { let (mut receiver_tx, receiver) = mpsc::unbounded::(); Self { id, - receiver, + receiver: Some(receiver), receiver_tx, phantom_a: PhantomData, phantom_b: PhantomData, @@ -111,13 +110,62 @@ impl< pub async fn request( &mut self, msg: ProtocolMessage, - stream: Option, + //stream: Option, fsm: Arc>, - ) -> Result { + ) -> Result, ProtocolError> { //sender.send(ConnectionCommand::Msg(msg.send())).await; fsm.lock().await.send(msg).await?; - match self.receiver.next().await { - Some(ConnectionCommand::Msg(msg)) => msg.try_into(), + let mut receiver = self.receiver.take().unwrap(); + match receiver.next().await { + Some(ConnectionCommand::Msg(msg)) => { + if let ProtocolMessage::BrokerMessage(ref bm) = msg { + if bm.result() == ProtocolError::PartialContent.into() + && TypeId::of::() != TypeId::of::<()>() + { + let (b_sender, b_receiver) = mpsc::unbounded::(); + async fn pump_stream>( + mut actor_receiver: Receiver, + mut sos_sender: Sender, + fsm: Arc>, + id: i64, + ) -> ResultSend<()> { + async move { + while let Some(ConnectionCommand::Msg(msg)) = + actor_receiver.next().await + { + if let ProtocolMessage::BrokerMessage(ref bm) = msg { + if bm.result() == ProtocolError::EndOfStream.into() { + break; + } + let response = msg.try_into(); + if response.is_err() { + // TODO deal with errors. + break; + } + sos_sender.send(response.unwrap()).await; + } else { + // todo deal with error (not a brokermessage) + break; + } + } + fsm.lock().await.remove_actor(id).await; + } + .await; + Ok(()) + } + spawn_and_log_error(pump_stream::( + receiver, + b_sender, + Arc::clone(&fsm), + self.id, + )); + return Ok(SoS::::Stream(b_receiver)); + } + } + fsm.lock().await.remove_actor(self.id).await; + let response: B = msg.try_into()?; + Ok(SoS::::Single(response)) + } _ => Err(ProtocolError::ActorError), } } diff --git a/p2p-net/src/actors/noise.rs b/p2p-net/src/actors/noise.rs index e7dcd79..205204c 100644 --- a/p2p-net/src/actors/noise.rs +++ b/p2p-net/src/actors/noise.rs @@ -15,9 +15,15 @@ pub enum Noise { V0(NoiseV0), } -impl BrokerRequest for Noise { - fn send(&self) -> ProtocolMessage { - ProtocolMessage::Noise(self.clone()) +// impl BrokerRequest for Noise { +// fn send(&self) -> ProtocolMessage { +// ProtocolMessage::Noise(self.clone()) +// } +// } + +impl From for ProtocolMessage { + fn from(msg: Noise) -> ProtocolMessage { + ProtocolMessage::Noise(msg) } } @@ -34,18 +40,8 @@ impl TryFrom for Noise { impl Actor<'_, Noise, Noise> {} -// impl IActor for Actor<'_, Noise, Noise> { -// // fn process_request(&self, req: Box) -> Box { -// // //let r = req as Noise; -// // } -// } - #[async_trait::async_trait] impl EActor for Actor<'_, Noise, Noise> { - // fn process_request(&self, req: Box) -> Box { - // //let r = req as Noise; - // } - async fn respond( &mut self, msg: ProtocolMessage, diff --git a/p2p-net/src/actors/start.rs b/p2p-net/src/actors/start.rs index 2bfc631..e120621 100644 --- a/p2p-net/src/actors/start.rs +++ b/p2p-net/src/actors/start.rs @@ -48,9 +48,15 @@ impl ExtHello { } } -impl BrokerRequest for ExtHello { - fn send(&self) -> ProtocolMessage { - ProtocolMessage::Start(StartProtocol::Ext(self.clone())) +// impl BrokerRequest for ExtHello { +// fn send(&self) -> ProtocolMessage { +// ProtocolMessage::Start(StartProtocol::Ext(self.clone())) +// } +// } + +impl From for ProtocolMessage { + fn from(msg: ExtHello) -> ProtocolMessage { + ProtocolMessage::Start(StartProtocol::Ext(msg)) } } @@ -96,9 +102,15 @@ impl ServerHello { } } -impl BrokerRequest for ClientHello { - fn send(&self) -> ProtocolMessage { - ProtocolMessage::Start(StartProtocol::Client(ClientHello::Local)) +// impl BrokerRequest for ClientHello { +// fn send(&self) -> ProtocolMessage { +// ProtocolMessage::Start(StartProtocol::Client(ClientHello::Local)) +// } +// } + +impl From for ProtocolMessage { + fn from(msg: ClientHello) -> ProtocolMessage { + ProtocolMessage::Start(StartProtocol::Client(msg)) } } @@ -124,11 +136,11 @@ impl TryFrom for ServerHello { } } -impl BrokerRequest for ServerHello { - fn send(&self) -> ProtocolMessage { - ProtocolMessage::ServerHello(self.clone()) - } -} +// impl BrokerRequest for ServerHello { +// fn send(&self) -> ProtocolMessage { +// ProtocolMessage::ServerHello(self.clone()) +// } +// } impl From for ProtocolMessage { fn from(msg: ServerHello) -> ProtocolMessage { diff --git a/p2p-net/src/broker.rs b/p2p-net/src/broker.rs index 074151c..f5cb582 100644 --- a/p2p-net/src/broker.rs +++ b/p2p-net/src/broker.rs @@ -3,7 +3,7 @@ use crate::connection::*; use crate::errors::*; use crate::types::*; use crate::utils::spawn_and_log_error; -use crate::utils::ResultSend; +use crate::utils::{Receiver, ResultSend, Sender}; use crate::{log, sleep}; use async_std::stream::StreamExt; use async_std::sync::{Arc, RwLock}; diff --git a/p2p-net/src/connection.rs b/p2p-net/src/connection.rs index 6ea53f2..27ce8f1 100644 --- a/p2p-net/src/connection.rs +++ b/p2p-net/src/connection.rs @@ -4,7 +4,7 @@ use std::collections::HashMap; use std::fmt; use std::sync::Arc; -use crate::actor::{Actor, EActor}; +use crate::actor::{Actor, SoS}; use crate::actors::*; use crate::errors::NetError; use crate::errors::ProtocolError; @@ -21,9 +21,6 @@ use unique_id::sequence::SequenceGenerator; use unique_id::Generator; use unique_id::GeneratorFromSeed; -pub type Sender = mpsc::UnboundedSender; -pub type Receiver = mpsc::UnboundedReceiver; - #[derive(Debug, Clone)] pub enum ConnectionCommand { Msg(ProtocolMessage), @@ -77,6 +74,8 @@ pub struct NoiseFSM { dir: ConnectionDir, sender: Sender, + actors: Arc>>>, + noise_handshake_state: Option>, noise_cipher_state: Option>, } @@ -100,6 +99,7 @@ impl NoiseFSM { pub fn new( tp: TransportProtocol, dir: ConnectionDir, + actors: Arc>>>, sender: Sender, ) -> Self { Self { @@ -109,6 +109,7 @@ impl NoiseFSM { FSMstate::Noise0 }, dir, + actors, sender, noise_handshake_state: None, noise_cipher_state: None, @@ -123,6 +124,10 @@ impl NoiseFSM { unimplemented!(); } + pub async fn remove_actor(&self, id: i64) { + self.actors.lock().await.remove(&id); + } + pub async fn send(&mut self, msg: ProtocolMessage) -> Result<(), ProtocolError> { if self.state == FSMstate::AuthResult && self.noise_cipher_state.is_some() { let cipher = self.encrypt(msg); @@ -348,13 +353,13 @@ impl ConnectionBase { } pub async fn request< - A: crate::actor::BrokerRequest + Sync + Send + 'static, - B: TryFrom + std::fmt::Debug + Sync + 'static, + A: Into + std::fmt::Debug + Sync + Send + 'static, + B: TryFrom + std::fmt::Debug + Sync + Send + 'static, >( &self, msg: A, - stream: Option, - ) -> Result { + //stream: Option, + ) -> Result, ProtocolError> { if self.fsm.is_none() { return Err(ProtocolError::FsmNotReady); } @@ -365,12 +370,11 @@ impl ConnectionBase { } let mut actor = Box::new(Actor::::new(id, true)); self.actors.lock().await.insert(id, actor.get_receiver_tx()); - let mut proto_msg = msg.send(); + let mut proto_msg: ProtocolMessage = msg.into(); proto_msg.set_id(id); let res = actor - .request(proto_msg, stream, Arc::clone(self.fsm.as_ref().unwrap())) + .request(proto_msg, Arc::clone(self.fsm.as_ref().unwrap())) .await; - self.actors.lock().await.remove(&id); res } @@ -403,6 +407,7 @@ impl ConnectionBase { let fsm = Arc::new(Mutex::new(NoiseFSM::new( self.tp, self.dir.clone(), + Arc::clone(&self.actors), sender_tx.clone(), ))); self.fsm = Some(Arc::clone(&fsm)); diff --git a/p2p-net/src/errors.rs b/p2p-net/src/errors.rs index b546668..aa41374 100644 --- a/p2p-net/src/errors.rs +++ b/p2p-net/src/errors.rs @@ -43,20 +43,22 @@ impl fmt::Display for NetError { #[derive(Debug, Eq, PartialEq, TryFromPrimitive, IntoPrimitive, Clone)] #[repr(u16)] pub enum ProtocolError { - WriteError = 1, + NoError = 0, + PartialContent, + EndOfStream, + + WriteError, WsError, ActorError, InvalidState, SignatureError, InvalidSignature, SerializationError, - PartialContent, AccessDenied, OverlayNotJoined, OverlayNotFound, BrokerError, NotFound, - EndOfStream, StoreError, MissingBlocks, ObjectParseError, @@ -68,7 +70,6 @@ pub enum ProtocolError { Timeout, PeerAlreadyConnected, - NoError, OtherError, Closing, FsmNotReady, diff --git a/p2p-net/src/types.rs b/p2p-net/src/types.rs index df32454..30fab32 100644 --- a/p2p-net/src/types.rs +++ b/p2p-net/src/types.rs @@ -501,6 +501,7 @@ pub enum OverlayRequest { /// Content of OverlayResponseV0 #[derive(Clone, Debug, Serialize, Deserialize)] pub enum OverlayResponseContentV0 { + EmptyResponse(()), Block(Block), EventResp(EventResp), Event(Event), @@ -513,10 +514,10 @@ pub struct OverlayResponseV0 { pub id: i64, /// Result - pub result: u8, + pub result: u16, /// Response content - pub content: Option, + pub content: OverlayResponseContentV0, } /// Request sent to an OverlayRequest @@ -861,6 +862,12 @@ impl BrokerRequest { } } +/// Content of `BrokerResponseV0` +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum BrokerResponseContentV0 { + EmptyResponse(()), +} + /// Response to a `BrokerRequest` #[derive(Clone, Debug, Serialize, Deserialize)] pub struct BrokerResponseV0 { @@ -869,6 +876,8 @@ pub struct BrokerResponseV0 { /// Result (including but not limited to Result) pub result: u16, + + pub content: BrokerResponseContentV0, } /// Response to a `BrokerRequest` @@ -1225,6 +1234,7 @@ impl BrokerOverlayRequest { /// Content of `BrokerOverlayResponseV0` #[derive(Clone, Debug, Serialize, Deserialize)] pub enum BrokerOverlayResponseContentV0 { + EmptyResponse(()), Block(Block), ObjectId(ObjectId), OverlayStatusResp(OverlayStatusResp), @@ -1240,7 +1250,7 @@ pub struct BrokerOverlayResponseV0 { pub result: u16, /// Response content - pub content: Option, + pub content: BrokerOverlayResponseContentV0, } /// Response to a `BrokerOverlayRequest` @@ -1270,22 +1280,16 @@ impl BrokerOverlayResponse { pub fn block(&self) -> Option<&Block> { match self { BrokerOverlayResponse::V0(o) => match &o.content { - Some(contentv0) => match contentv0 { - BrokerOverlayResponseContentV0::Block(b) => Some(b), - _ => panic!("this not a block reponse"), - }, - None => None, + BrokerOverlayResponseContentV0::Block(b) => Some(b), + _ => panic!("this not a block response"), }, } } pub fn object_id(&self) -> ObjectId { match self { BrokerOverlayResponse::V0(o) => match &o.content { - Some(contentv0) => match contentv0 { - BrokerOverlayResponseContentV0::ObjectId(id) => id.clone(), - _ => panic!("this not an objectId reponse"), - }, - None => panic!("this not an objectId reponse (doesnt have content)"), + BrokerOverlayResponseContentV0::ObjectId(id) => id.clone(), + _ => panic!("this not an objectId reponse"), }, } } diff --git a/p2p-net/src/utils.rs b/p2p-net/src/utils.rs index 0dee752..19043c7 100644 --- a/p2p-net/src/utils.rs +++ b/p2p-net/src/utils.rs @@ -30,3 +30,6 @@ where } }) } + +pub type Sender = mpsc::UnboundedSender; +pub type Receiver = mpsc::UnboundedReceiver;