allowing stream responses to broker request

pull/19/head
Niko 2 years ago
parent 9b1df4c33d
commit 4699ffd84b
  1. 9
      p2p-broker/src/server.rs
  2. 2
      p2p-client-ws/src/remote_ws.rs
  3. 86
      p2p-net/src/actor.rs
  4. 22
      p2p-net/src/actors/noise.rs
  5. 34
      p2p-net/src/actors/start.rs
  6. 2
      p2p-net/src/broker.rs
  7. 27
      p2p-net/src/connection.rs
  8. 9
      p2p-net/src/errors.rs
  9. 30
      p2p-net/src/types.rs
  10. 3
      p2p-net/src/utils.rs

@ -245,6 +245,7 @@ impl BrokerProtocolHandler {
content: BrokerMessageContentV0::BrokerResponse(BrokerResponse::V0(BrokerResponseV0 { content: BrokerMessageContentV0::BrokerResponse(BrokerResponse::V0(BrokerResponseV0 {
id, id,
result, result,
content: BrokerResponseContentV0::EmptyResponse(()),
})), })),
}); });
msg msg
@ -262,8 +263,8 @@ impl BrokerProtocolHandler {
Err(e) => e.into(), Err(e) => e.into(),
}; };
let content = match block { let content = match block {
Some(b) => Some(BrokerOverlayResponseContentV0::Block(b)), Some(b) => BrokerOverlayResponseContentV0::Block(b),
None => None, None => BrokerOverlayResponseContentV0::EmptyResponse(()),
}; };
let msg = BrokerMessage::V0(BrokerMessageV0 { let msg = BrokerMessage::V0(BrokerMessageV0 {
padding: vec![0; padding_size], padding: vec![0; padding_size],
@ -294,8 +295,8 @@ impl BrokerProtocolHandler {
Err(e) => (*e).clone().into(), Err(e) => (*e).clone().into(),
}; };
let content = match res { let content = match res {
Ok(r) => Some(BrokerOverlayResponseContentV0::Block(r)), Ok(r) => BrokerOverlayResponseContentV0::Block(r),
Err(_) => None, Err(_) => BrokerOverlayResponseContentV0::EmptyResponse(()),
}; };
let msg = BrokerMessage::V0(BrokerMessageV0 { let msg = BrokerMessage::V0(BrokerMessageV0 {
padding: vec![0; padding_size], padding: vec![0; padding_size],

@ -28,7 +28,7 @@ use async_std::task;
use p2p_net::errors::*; use p2p_net::errors::*;
use p2p_net::log; use p2p_net::log;
use p2p_net::types::*; use p2p_net::types::*;
use p2p_net::utils::{spawn_and_log_error, ResultSend}; use p2p_net::utils::{spawn_and_log_error, Receiver, ResultSend, Sender};
use p2p_net::{connection::*, WS_PORT}; use p2p_net::{connection::*, WS_PORT};
use p2p_repo::types::*; use p2p_repo::types::*;
use p2p_repo::utils::{generate_keypair, now_timestamp}; use p2p_repo::utils::{generate_keypair, now_timestamp};

@ -6,12 +6,13 @@ use std::any::{Any, TypeId};
use std::convert::From; use std::convert::From;
use std::sync::Arc; use std::sync::Arc;
use crate::utils::{spawn_and_log_error, Receiver, ResultSend, Sender};
use crate::{connection::*, errors::ProtocolError, log, types::ProtocolMessage}; use crate::{connection::*, errors::ProtocolError, log, types::ProtocolMessage};
use std::marker::PhantomData; use std::marker::PhantomData;
pub trait BrokerRequest: std::fmt::Debug { // pub trait BrokerRequest: std::fmt::Debug {
fn send(&self) -> ProtocolMessage; // fn send(&self) -> ProtocolMessage;
} // }
//pub trait BrokerResponse: TryFrom<ProtocolMessage> + std::fmt::Debug {} //pub trait BrokerResponse: TryFrom<ProtocolMessage> + std::fmt::Debug {}
@ -22,10 +23,6 @@ impl TryFrom<ProtocolMessage> for () {
} }
} }
pub trait IActor: EActor {
//fn process_request(&self, req: Box<dyn BrokerRequest>) -> Box<dyn BrokerResponse> {}
}
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait EActor: Send + Sync + std::fmt::Debug { pub trait EActor: Send + Sync + std::fmt::Debug {
//type T: TryFrom<ProtocolMessage, Error = ProtocolError> + std::fmt::Debug; //type T: TryFrom<ProtocolMessage, Error = ProtocolError> + std::fmt::Debug;
@ -41,13 +38,13 @@ pub trait EActor: Send + Sync + std::fmt::Debug {
#[derive(Debug)] #[derive(Debug)]
pub struct Actor< pub struct Actor<
'a, 'a,
A: BrokerRequest, A: Into<ProtocolMessage> + std::fmt::Debug,
B: TryFrom<ProtocolMessage, Error = ProtocolError> + std::fmt::Debug + Sync, B: TryFrom<ProtocolMessage, Error = ProtocolError> + std::fmt::Debug + Sync,
> { > {
id: i64, id: i64,
phantom_a: PhantomData<&'a A>, phantom_a: PhantomData<&'a A>,
phantom_b: PhantomData<&'a B>, phantom_b: PhantomData<&'a B>,
receiver: Receiver<ConnectionCommand>, receiver: Option<Receiver<ConnectionCommand>>,
receiver_tx: Sender<ConnectionCommand>, receiver_tx: Sender<ConnectionCommand>,
initiator: bool, initiator: bool,
} }
@ -83,19 +80,21 @@ pub struct Actor<
// // } // // }
// } // }
pub enum SoS<B> {
Single(B),
Stream(mpsc::UnboundedReceiver<B>),
}
impl< impl<
A: BrokerRequest + 'static, A: Into<ProtocolMessage> + std::fmt::Debug + 'static,
B: TryFrom<ProtocolMessage, Error = ProtocolError> B: TryFrom<ProtocolMessage, Error = ProtocolError> + Sync + Send + std::fmt::Debug + 'static,
+ std::marker::Sync
+ std::fmt::Debug
+ 'static,
> Actor<'_, A, B> > Actor<'_, A, B>
{ {
pub fn new(id: i64, initiator: bool) -> Self { pub fn new(id: i64, initiator: bool) -> Self {
let (mut receiver_tx, receiver) = mpsc::unbounded::<ConnectionCommand>(); let (mut receiver_tx, receiver) = mpsc::unbounded::<ConnectionCommand>();
Self { Self {
id, id,
receiver, receiver: Some(receiver),
receiver_tx, receiver_tx,
phantom_a: PhantomData, phantom_a: PhantomData,
phantom_b: PhantomData, phantom_b: PhantomData,
@ -111,13 +110,62 @@ impl<
pub async fn request( pub async fn request(
&mut self, &mut self,
msg: ProtocolMessage, msg: ProtocolMessage,
stream: Option<impl BrokerRequest + std::marker::Send>, //stream: Option<impl BrokerRequest + std::marker::Send>,
fsm: Arc<Mutex<NoiseFSM>>, fsm: Arc<Mutex<NoiseFSM>>,
) -> Result<B, ProtocolError> { ) -> Result<SoS<B>, ProtocolError> {
//sender.send(ConnectionCommand::Msg(msg.send())).await; //sender.send(ConnectionCommand::Msg(msg.send())).await;
fsm.lock().await.send(msg).await?; fsm.lock().await.send(msg).await?;
match self.receiver.next().await { let mut receiver = self.receiver.take().unwrap();
Some(ConnectionCommand::Msg(msg)) => msg.try_into(), match receiver.next().await {
Some(ConnectionCommand::Msg(msg)) => {
if let ProtocolMessage::BrokerMessage(ref bm) = msg {
if bm.result() == ProtocolError::PartialContent.into()
&& TypeId::of::<B>() != TypeId::of::<()>()
{
let (b_sender, b_receiver) = mpsc::unbounded::<B>();
async fn pump_stream<C: TryFrom<ProtocolMessage, Error = ProtocolError>>(
mut actor_receiver: Receiver<ConnectionCommand>,
mut sos_sender: Sender<C>,
fsm: Arc<Mutex<NoiseFSM>>,
id: i64,
) -> ResultSend<()> {
async move {
while let Some(ConnectionCommand::Msg(msg)) =
actor_receiver.next().await
{
if let ProtocolMessage::BrokerMessage(ref bm) = msg {
if bm.result() == ProtocolError::EndOfStream.into() {
break;
}
let response = msg.try_into();
if response.is_err() {
// TODO deal with errors.
break;
}
sos_sender.send(response.unwrap()).await;
} else {
// todo deal with error (not a brokermessage)
break;
}
}
fsm.lock().await.remove_actor(id).await;
}
.await;
Ok(())
}
spawn_and_log_error(pump_stream::<B>(
receiver,
b_sender,
Arc::clone(&fsm),
self.id,
));
return Ok(SoS::<B>::Stream(b_receiver));
}
}
fsm.lock().await.remove_actor(self.id).await;
let response: B = msg.try_into()?;
Ok(SoS::<B>::Single(response))
}
_ => Err(ProtocolError::ActorError), _ => Err(ProtocolError::ActorError),
} }
} }

@ -15,9 +15,15 @@ pub enum Noise {
V0(NoiseV0), V0(NoiseV0),
} }
impl BrokerRequest for Noise { // impl BrokerRequest for Noise {
fn send(&self) -> ProtocolMessage { // fn send(&self) -> ProtocolMessage {
ProtocolMessage::Noise(self.clone()) // ProtocolMessage::Noise(self.clone())
// }
// }
impl From<Noise> for ProtocolMessage {
fn from(msg: Noise) -> ProtocolMessage {
ProtocolMessage::Noise(msg)
} }
} }
@ -34,18 +40,8 @@ impl TryFrom<ProtocolMessage> for Noise {
impl Actor<'_, Noise, Noise> {} impl Actor<'_, Noise, Noise> {}
// impl IActor for Actor<'_, Noise, Noise> {
// // fn process_request(&self, req: Box<dyn BrokerRequest>) -> Box<dyn BrokerResponse> {
// // //let r = req as Noise;
// // }
// }
#[async_trait::async_trait] #[async_trait::async_trait]
impl EActor for Actor<'_, Noise, Noise> { impl EActor for Actor<'_, Noise, Noise> {
// fn process_request(&self, req: Box<dyn BrokerRequest>) -> Box<dyn BrokerResponse> {
// //let r = req as Noise;
// }
async fn respond( async fn respond(
&mut self, &mut self,
msg: ProtocolMessage, msg: ProtocolMessage,

@ -48,9 +48,15 @@ impl ExtHello {
} }
} }
impl BrokerRequest for ExtHello { // impl BrokerRequest for ExtHello {
fn send(&self) -> ProtocolMessage { // fn send(&self) -> ProtocolMessage {
ProtocolMessage::Start(StartProtocol::Ext(self.clone())) // ProtocolMessage::Start(StartProtocol::Ext(self.clone()))
// }
// }
impl From<ExtHello> for ProtocolMessage {
fn from(msg: ExtHello) -> ProtocolMessage {
ProtocolMessage::Start(StartProtocol::Ext(msg))
} }
} }
@ -96,9 +102,15 @@ impl ServerHello {
} }
} }
impl BrokerRequest for ClientHello { // impl BrokerRequest for ClientHello {
fn send(&self) -> ProtocolMessage { // fn send(&self) -> ProtocolMessage {
ProtocolMessage::Start(StartProtocol::Client(ClientHello::Local)) // ProtocolMessage::Start(StartProtocol::Client(ClientHello::Local))
// }
// }
impl From<ClientHello> for ProtocolMessage {
fn from(msg: ClientHello) -> ProtocolMessage {
ProtocolMessage::Start(StartProtocol::Client(msg))
} }
} }
@ -124,11 +136,11 @@ impl TryFrom<ProtocolMessage> for ServerHello {
} }
} }
impl BrokerRequest for ServerHello { // impl BrokerRequest for ServerHello {
fn send(&self) -> ProtocolMessage { // fn send(&self) -> ProtocolMessage {
ProtocolMessage::ServerHello(self.clone()) // ProtocolMessage::ServerHello(self.clone())
} // }
} // }
impl From<ServerHello> for ProtocolMessage { impl From<ServerHello> for ProtocolMessage {
fn from(msg: ServerHello) -> ProtocolMessage { fn from(msg: ServerHello) -> ProtocolMessage {

@ -3,7 +3,7 @@ use crate::connection::*;
use crate::errors::*; use crate::errors::*;
use crate::types::*; use crate::types::*;
use crate::utils::spawn_and_log_error; use crate::utils::spawn_and_log_error;
use crate::utils::ResultSend; use crate::utils::{Receiver, ResultSend, Sender};
use crate::{log, sleep}; use crate::{log, sleep};
use async_std::stream::StreamExt; use async_std::stream::StreamExt;
use async_std::sync::{Arc, RwLock}; use async_std::sync::{Arc, RwLock};

@ -4,7 +4,7 @@ use std::collections::HashMap;
use std::fmt; use std::fmt;
use std::sync::Arc; use std::sync::Arc;
use crate::actor::{Actor, EActor}; use crate::actor::{Actor, SoS};
use crate::actors::*; use crate::actors::*;
use crate::errors::NetError; use crate::errors::NetError;
use crate::errors::ProtocolError; use crate::errors::ProtocolError;
@ -21,9 +21,6 @@ use unique_id::sequence::SequenceGenerator;
use unique_id::Generator; use unique_id::Generator;
use unique_id::GeneratorFromSeed; use unique_id::GeneratorFromSeed;
pub type Sender<T> = mpsc::UnboundedSender<T>;
pub type Receiver<T> = mpsc::UnboundedReceiver<T>;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum ConnectionCommand { pub enum ConnectionCommand {
Msg(ProtocolMessage), Msg(ProtocolMessage),
@ -77,6 +74,8 @@ pub struct NoiseFSM {
dir: ConnectionDir, dir: ConnectionDir,
sender: Sender<ConnectionCommand>, sender: Sender<ConnectionCommand>,
actors: Arc<Mutex<HashMap<i64, Sender<ConnectionCommand>>>>,
noise_handshake_state: Option<HandshakeState<X25519, ChaCha20Poly1305, Blake2b>>, noise_handshake_state: Option<HandshakeState<X25519, ChaCha20Poly1305, Blake2b>>,
noise_cipher_state: Option<CipherState<ChaCha20Poly1305>>, noise_cipher_state: Option<CipherState<ChaCha20Poly1305>>,
} }
@ -100,6 +99,7 @@ impl NoiseFSM {
pub fn new( pub fn new(
tp: TransportProtocol, tp: TransportProtocol,
dir: ConnectionDir, dir: ConnectionDir,
actors: Arc<Mutex<HashMap<i64, Sender<ConnectionCommand>>>>,
sender: Sender<ConnectionCommand>, sender: Sender<ConnectionCommand>,
) -> Self { ) -> Self {
Self { Self {
@ -109,6 +109,7 @@ impl NoiseFSM {
FSMstate::Noise0 FSMstate::Noise0
}, },
dir, dir,
actors,
sender, sender,
noise_handshake_state: None, noise_handshake_state: None,
noise_cipher_state: None, noise_cipher_state: None,
@ -123,6 +124,10 @@ impl NoiseFSM {
unimplemented!(); unimplemented!();
} }
pub async fn remove_actor(&self, id: i64) {
self.actors.lock().await.remove(&id);
}
pub async fn send(&mut self, msg: ProtocolMessage) -> Result<(), ProtocolError> { pub async fn send(&mut self, msg: ProtocolMessage) -> Result<(), ProtocolError> {
if self.state == FSMstate::AuthResult && self.noise_cipher_state.is_some() { if self.state == FSMstate::AuthResult && self.noise_cipher_state.is_some() {
let cipher = self.encrypt(msg); let cipher = self.encrypt(msg);
@ -348,13 +353,13 @@ impl ConnectionBase {
} }
pub async fn request< pub async fn request<
A: crate::actor::BrokerRequest + Sync + Send + 'static, A: Into<ProtocolMessage> + std::fmt::Debug + Sync + Send + 'static,
B: TryFrom<ProtocolMessage, Error = ProtocolError> + std::fmt::Debug + Sync + 'static, B: TryFrom<ProtocolMessage, Error = ProtocolError> + std::fmt::Debug + Sync + Send + 'static,
>( >(
&self, &self,
msg: A, msg: A,
stream: Option<A>, //stream: Option<A>,
) -> Result<B, ProtocolError> { ) -> Result<SoS<B>, ProtocolError> {
if self.fsm.is_none() { if self.fsm.is_none() {
return Err(ProtocolError::FsmNotReady); return Err(ProtocolError::FsmNotReady);
} }
@ -365,12 +370,11 @@ impl ConnectionBase {
} }
let mut actor = Box::new(Actor::<A, B>::new(id, true)); let mut actor = Box::new(Actor::<A, B>::new(id, true));
self.actors.lock().await.insert(id, actor.get_receiver_tx()); self.actors.lock().await.insert(id, actor.get_receiver_tx());
let mut proto_msg = msg.send(); let mut proto_msg: ProtocolMessage = msg.into();
proto_msg.set_id(id); proto_msg.set_id(id);
let res = actor let res = actor
.request(proto_msg, stream, Arc::clone(self.fsm.as_ref().unwrap())) .request(proto_msg, Arc::clone(self.fsm.as_ref().unwrap()))
.await; .await;
self.actors.lock().await.remove(&id);
res res
} }
@ -403,6 +407,7 @@ impl ConnectionBase {
let fsm = Arc::new(Mutex::new(NoiseFSM::new( let fsm = Arc::new(Mutex::new(NoiseFSM::new(
self.tp, self.tp,
self.dir.clone(), self.dir.clone(),
Arc::clone(&self.actors),
sender_tx.clone(), sender_tx.clone(),
))); )));
self.fsm = Some(Arc::clone(&fsm)); self.fsm = Some(Arc::clone(&fsm));

@ -43,20 +43,22 @@ impl fmt::Display for NetError {
#[derive(Debug, Eq, PartialEq, TryFromPrimitive, IntoPrimitive, Clone)] #[derive(Debug, Eq, PartialEq, TryFromPrimitive, IntoPrimitive, Clone)]
#[repr(u16)] #[repr(u16)]
pub enum ProtocolError { pub enum ProtocolError {
WriteError = 1, NoError = 0,
PartialContent,
EndOfStream,
WriteError,
WsError, WsError,
ActorError, ActorError,
InvalidState, InvalidState,
SignatureError, SignatureError,
InvalidSignature, InvalidSignature,
SerializationError, SerializationError,
PartialContent,
AccessDenied, AccessDenied,
OverlayNotJoined, OverlayNotJoined,
OverlayNotFound, OverlayNotFound,
BrokerError, BrokerError,
NotFound, NotFound,
EndOfStream,
StoreError, StoreError,
MissingBlocks, MissingBlocks,
ObjectParseError, ObjectParseError,
@ -68,7 +70,6 @@ pub enum ProtocolError {
Timeout, Timeout,
PeerAlreadyConnected, PeerAlreadyConnected,
NoError,
OtherError, OtherError,
Closing, Closing,
FsmNotReady, FsmNotReady,

@ -501,6 +501,7 @@ pub enum OverlayRequest {
/// Content of OverlayResponseV0 /// Content of OverlayResponseV0
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub enum OverlayResponseContentV0 { pub enum OverlayResponseContentV0 {
EmptyResponse(()),
Block(Block), Block(Block),
EventResp(EventResp), EventResp(EventResp),
Event(Event), Event(Event),
@ -513,10 +514,10 @@ pub struct OverlayResponseV0 {
pub id: i64, pub id: i64,
/// Result /// Result
pub result: u8, pub result: u16,
/// Response content /// Response content
pub content: Option<OverlayResponseContentV0>, pub content: OverlayResponseContentV0,
} }
/// Request sent to an OverlayRequest /// Request sent to an OverlayRequest
@ -861,6 +862,12 @@ impl BrokerRequest {
} }
} }
/// Content of `BrokerResponseV0`
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BrokerResponseContentV0 {
EmptyResponse(()),
}
/// Response to a `BrokerRequest` /// Response to a `BrokerRequest`
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BrokerResponseV0 { pub struct BrokerResponseV0 {
@ -869,6 +876,8 @@ pub struct BrokerResponseV0 {
/// Result (including but not limited to Result) /// Result (including but not limited to Result)
pub result: u16, pub result: u16,
pub content: BrokerResponseContentV0,
} }
/// Response to a `BrokerRequest` /// Response to a `BrokerRequest`
@ -1225,6 +1234,7 @@ impl BrokerOverlayRequest {
/// Content of `BrokerOverlayResponseV0` /// Content of `BrokerOverlayResponseV0`
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BrokerOverlayResponseContentV0 { pub enum BrokerOverlayResponseContentV0 {
EmptyResponse(()),
Block(Block), Block(Block),
ObjectId(ObjectId), ObjectId(ObjectId),
OverlayStatusResp(OverlayStatusResp), OverlayStatusResp(OverlayStatusResp),
@ -1240,7 +1250,7 @@ pub struct BrokerOverlayResponseV0 {
pub result: u16, pub result: u16,
/// Response content /// Response content
pub content: Option<BrokerOverlayResponseContentV0>, pub content: BrokerOverlayResponseContentV0,
} }
/// Response to a `BrokerOverlayRequest` /// Response to a `BrokerOverlayRequest`
@ -1270,22 +1280,16 @@ impl BrokerOverlayResponse {
pub fn block(&self) -> Option<&Block> { pub fn block(&self) -> Option<&Block> {
match self { match self {
BrokerOverlayResponse::V0(o) => match &o.content { BrokerOverlayResponse::V0(o) => match &o.content {
Some(contentv0) => match contentv0 { BrokerOverlayResponseContentV0::Block(b) => Some(b),
BrokerOverlayResponseContentV0::Block(b) => Some(b), _ => panic!("this not a block response"),
_ => panic!("this not a block reponse"),
},
None => None,
}, },
} }
} }
pub fn object_id(&self) -> ObjectId { pub fn object_id(&self) -> ObjectId {
match self { match self {
BrokerOverlayResponse::V0(o) => match &o.content { BrokerOverlayResponse::V0(o) => match &o.content {
Some(contentv0) => match contentv0 { BrokerOverlayResponseContentV0::ObjectId(id) => id.clone(),
BrokerOverlayResponseContentV0::ObjectId(id) => id.clone(), _ => panic!("this not an objectId reponse"),
_ => panic!("this not an objectId reponse"),
},
None => panic!("this not an objectId reponse (doesnt have content)"),
}, },
} }
} }

@ -30,3 +30,6 @@ where
} }
}) })
} }
pub type Sender<T> = mpsc::UnboundedSender<T>;
pub type Receiver<T> = mpsc::UnboundedReceiver<T>;

Loading…
Cancel
Save