allowing stream responses to broker request

Niko 2 years ago
parent fd392f2ac9
commit fa73850873
  1. 9
      p2p-broker/src/server.rs
  2. 2
      p2p-client-ws/src/remote_ws.rs
  3. 86
      p2p-net/src/actor.rs
  4. 22
      p2p-net/src/actors/noise.rs
  5. 34
      p2p-net/src/actors/start.rs
  6. 2
      p2p-net/src/broker.rs
  7. 27
      p2p-net/src/connection.rs
  8. 9
      p2p-net/src/errors.rs
  9. 24
      p2p-net/src/types.rs
  10. 3
      p2p-net/src/utils.rs

@ -245,6 +245,7 @@ impl BrokerProtocolHandler {
content: BrokerMessageContentV0::BrokerResponse(BrokerResponse::V0(BrokerResponseV0 {
id,
result,
content: BrokerResponseContentV0::EmptyResponse(()),
})),
});
msg
@ -262,8 +263,8 @@ impl BrokerProtocolHandler {
Err(e) => e.into(),
};
let content = match block {
Some(b) => Some(BrokerOverlayResponseContentV0::Block(b)),
None => None,
Some(b) => BrokerOverlayResponseContentV0::Block(b),
None => BrokerOverlayResponseContentV0::EmptyResponse(()),
};
let msg = BrokerMessage::V0(BrokerMessageV0 {
padding: vec![0; padding_size],
@ -294,8 +295,8 @@ impl BrokerProtocolHandler {
Err(e) => (*e).clone().into(),
};
let content = match res {
Ok(r) => Some(BrokerOverlayResponseContentV0::Block(r)),
Err(_) => None,
Ok(r) => BrokerOverlayResponseContentV0::Block(r),
Err(_) => BrokerOverlayResponseContentV0::EmptyResponse(()),
};
let msg = BrokerMessage::V0(BrokerMessageV0 {
padding: vec![0; padding_size],

@ -28,7 +28,7 @@ use async_std::task;
use p2p_net::errors::*;
use p2p_net::log;
use p2p_net::types::*;
use p2p_net::utils::{spawn_and_log_error, ResultSend};
use p2p_net::utils::{spawn_and_log_error, Receiver, ResultSend, Sender};
use p2p_net::{connection::*, WS_PORT};
use p2p_repo::types::*;
use p2p_repo::utils::{generate_keypair, now_timestamp};

@ -6,12 +6,13 @@ use std::any::{Any, TypeId};
use std::convert::From;
use std::sync::Arc;
use crate::utils::{spawn_and_log_error, Receiver, ResultSend, Sender};
use crate::{connection::*, errors::ProtocolError, log, types::ProtocolMessage};
use std::marker::PhantomData;
pub trait BrokerRequest: std::fmt::Debug {
fn send(&self) -> ProtocolMessage;
}
// pub trait BrokerRequest: std::fmt::Debug {
// fn send(&self) -> ProtocolMessage;
// }
//pub trait BrokerResponse: TryFrom<ProtocolMessage> + std::fmt::Debug {}
@ -22,10 +23,6 @@ impl TryFrom<ProtocolMessage> for () {
}
}
pub trait IActor: EActor {
//fn process_request(&self, req: Box<dyn BrokerRequest>) -> Box<dyn BrokerResponse> {}
}
#[async_trait::async_trait]
pub trait EActor: Send + Sync + std::fmt::Debug {
//type T: TryFrom<ProtocolMessage, Error = ProtocolError> + std::fmt::Debug;
@ -41,13 +38,13 @@ pub trait EActor: Send + Sync + std::fmt::Debug {
#[derive(Debug)]
pub struct Actor<
'a,
A: BrokerRequest,
A: Into<ProtocolMessage> + std::fmt::Debug,
B: TryFrom<ProtocolMessage, Error = ProtocolError> + std::fmt::Debug + Sync,
> {
id: i64,
phantom_a: PhantomData<&'a A>,
phantom_b: PhantomData<&'a B>,
receiver: Receiver<ConnectionCommand>,
receiver: Option<Receiver<ConnectionCommand>>,
receiver_tx: Sender<ConnectionCommand>,
initiator: bool,
}
@ -83,19 +80,21 @@ pub struct Actor<
// // }
// }
pub enum SoS<B> {
Single(B),
Stream(mpsc::UnboundedReceiver<B>),
}
impl<
A: BrokerRequest + 'static,
B: TryFrom<ProtocolMessage, Error = ProtocolError>
+ std::marker::Sync
+ std::fmt::Debug
+ 'static,
A: Into<ProtocolMessage> + std::fmt::Debug + 'static,
B: TryFrom<ProtocolMessage, Error = ProtocolError> + Sync + Send + std::fmt::Debug + 'static,
> Actor<'_, A, B>
{
pub fn new(id: i64, initiator: bool) -> Self {
let (mut receiver_tx, receiver) = mpsc::unbounded::<ConnectionCommand>();
Self {
id,
receiver,
receiver: Some(receiver),
receiver_tx,
phantom_a: PhantomData,
phantom_b: PhantomData,
@ -111,13 +110,62 @@ impl<
pub async fn request(
&mut self,
msg: ProtocolMessage,
stream: Option<impl BrokerRequest + std::marker::Send>,
//stream: Option<impl BrokerRequest + std::marker::Send>,
fsm: Arc<Mutex<NoiseFSM>>,
) -> Result<B, ProtocolError> {
) -> Result<SoS<B>, ProtocolError> {
//sender.send(ConnectionCommand::Msg(msg.send())).await;
fsm.lock().await.send(msg).await?;
match self.receiver.next().await {
Some(ConnectionCommand::Msg(msg)) => msg.try_into(),
let mut receiver = self.receiver.take().unwrap();
match receiver.next().await {
Some(ConnectionCommand::Msg(msg)) => {
if let ProtocolMessage::BrokerMessage(ref bm) = msg {
if bm.result() == ProtocolError::PartialContent.into()
&& TypeId::of::<B>() != TypeId::of::<()>()
{
let (b_sender, b_receiver) = mpsc::unbounded::<B>();
async fn pump_stream<C: TryFrom<ProtocolMessage, Error = ProtocolError>>(
mut actor_receiver: Receiver<ConnectionCommand>,
mut sos_sender: Sender<C>,
fsm: Arc<Mutex<NoiseFSM>>,
id: i64,
) -> ResultSend<()> {
async move {
while let Some(ConnectionCommand::Msg(msg)) =
actor_receiver.next().await
{
if let ProtocolMessage::BrokerMessage(ref bm) = msg {
if bm.result() == ProtocolError::EndOfStream.into() {
break;
}
let response = msg.try_into();
if response.is_err() {
// TODO deal with errors.
break;
}
sos_sender.send(response.unwrap()).await;
} else {
// todo deal with error (not a brokermessage)
break;
}
}
fsm.lock().await.remove_actor(id).await;
}
.await;
Ok(())
}
spawn_and_log_error(pump_stream::<B>(
receiver,
b_sender,
Arc::clone(&fsm),
self.id,
));
return Ok(SoS::<B>::Stream(b_receiver));
}
}
fsm.lock().await.remove_actor(self.id).await;
let response: B = msg.try_into()?;
Ok(SoS::<B>::Single(response))
}
_ => Err(ProtocolError::ActorError),
}
}

@ -15,9 +15,15 @@ pub enum Noise {
V0(NoiseV0),
}
impl BrokerRequest for Noise {
fn send(&self) -> ProtocolMessage {
ProtocolMessage::Noise(self.clone())
// impl BrokerRequest for Noise {
// fn send(&self) -> ProtocolMessage {
// ProtocolMessage::Noise(self.clone())
// }
// }
impl From<Noise> for ProtocolMessage {
fn from(msg: Noise) -> ProtocolMessage {
ProtocolMessage::Noise(msg)
}
}
@ -34,18 +40,8 @@ impl TryFrom<ProtocolMessage> for Noise {
impl Actor<'_, Noise, Noise> {}
// impl IActor for Actor<'_, Noise, Noise> {
// // fn process_request(&self, req: Box<dyn BrokerRequest>) -> Box<dyn BrokerResponse> {
// // //let r = req as Noise;
// // }
// }
#[async_trait::async_trait]
impl EActor for Actor<'_, Noise, Noise> {
// fn process_request(&self, req: Box<dyn BrokerRequest>) -> Box<dyn BrokerResponse> {
// //let r = req as Noise;
// }
async fn respond(
&mut self,
msg: ProtocolMessage,

@ -48,9 +48,15 @@ impl ExtHello {
}
}
impl BrokerRequest for ExtHello {
fn send(&self) -> ProtocolMessage {
ProtocolMessage::Start(StartProtocol::Ext(self.clone()))
// impl BrokerRequest for ExtHello {
// fn send(&self) -> ProtocolMessage {
// ProtocolMessage::Start(StartProtocol::Ext(self.clone()))
// }
// }
impl From<ExtHello> for ProtocolMessage {
fn from(msg: ExtHello) -> ProtocolMessage {
ProtocolMessage::Start(StartProtocol::Ext(msg))
}
}
@ -96,9 +102,15 @@ impl ServerHello {
}
}
impl BrokerRequest for ClientHello {
fn send(&self) -> ProtocolMessage {
ProtocolMessage::Start(StartProtocol::Client(ClientHello::Local))
// impl BrokerRequest for ClientHello {
// fn send(&self) -> ProtocolMessage {
// ProtocolMessage::Start(StartProtocol::Client(ClientHello::Local))
// }
// }
impl From<ClientHello> for ProtocolMessage {
fn from(msg: ClientHello) -> ProtocolMessage {
ProtocolMessage::Start(StartProtocol::Client(msg))
}
}
@ -124,11 +136,11 @@ impl TryFrom<ProtocolMessage> for ServerHello {
}
}
impl BrokerRequest for ServerHello {
fn send(&self) -> ProtocolMessage {
ProtocolMessage::ServerHello(self.clone())
}
}
// impl BrokerRequest for ServerHello {
// fn send(&self) -> ProtocolMessage {
// ProtocolMessage::ServerHello(self.clone())
// }
// }
impl From<ServerHello> for ProtocolMessage {
fn from(msg: ServerHello) -> ProtocolMessage {

@ -3,7 +3,7 @@ use crate::connection::*;
use crate::errors::*;
use crate::types::*;
use crate::utils::spawn_and_log_error;
use crate::utils::ResultSend;
use crate::utils::{Receiver, ResultSend, Sender};
use crate::{log, sleep};
use async_std::stream::StreamExt;
use async_std::sync::{Arc, RwLock};

@ -4,7 +4,7 @@ use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use crate::actor::{Actor, EActor};
use crate::actor::{Actor, SoS};
use crate::actors::*;
use crate::errors::NetError;
use crate::errors::ProtocolError;
@ -21,9 +21,6 @@ use unique_id::sequence::SequenceGenerator;
use unique_id::Generator;
use unique_id::GeneratorFromSeed;
pub type Sender<T> = mpsc::UnboundedSender<T>;
pub type Receiver<T> = mpsc::UnboundedReceiver<T>;
#[derive(Debug, Clone)]
pub enum ConnectionCommand {
Msg(ProtocolMessage),
@ -77,6 +74,8 @@ pub struct NoiseFSM {
dir: ConnectionDir,
sender: Sender<ConnectionCommand>,
actors: Arc<Mutex<HashMap<i64, Sender<ConnectionCommand>>>>,
noise_handshake_state: Option<HandshakeState<X25519, ChaCha20Poly1305, Blake2b>>,
noise_cipher_state: Option<CipherState<ChaCha20Poly1305>>,
}
@ -100,6 +99,7 @@ impl NoiseFSM {
pub fn new(
tp: TransportProtocol,
dir: ConnectionDir,
actors: Arc<Mutex<HashMap<i64, Sender<ConnectionCommand>>>>,
sender: Sender<ConnectionCommand>,
) -> Self {
Self {
@ -109,6 +109,7 @@ impl NoiseFSM {
FSMstate::Noise0
},
dir,
actors,
sender,
noise_handshake_state: None,
noise_cipher_state: None,
@ -123,6 +124,10 @@ impl NoiseFSM {
unimplemented!();
}
pub async fn remove_actor(&self, id: i64) {
self.actors.lock().await.remove(&id);
}
pub async fn send(&mut self, msg: ProtocolMessage) -> Result<(), ProtocolError> {
if self.state == FSMstate::AuthResult && self.noise_cipher_state.is_some() {
let cipher = self.encrypt(msg);
@ -348,13 +353,13 @@ impl ConnectionBase {
}
pub async fn request<
A: crate::actor::BrokerRequest + Sync + Send + 'static,
B: TryFrom<ProtocolMessage, Error = ProtocolError> + std::fmt::Debug + Sync + 'static,
A: Into<ProtocolMessage> + std::fmt::Debug + Sync + Send + 'static,
B: TryFrom<ProtocolMessage, Error = ProtocolError> + std::fmt::Debug + Sync + Send + 'static,
>(
&self,
msg: A,
stream: Option<A>,
) -> Result<B, ProtocolError> {
//stream: Option<A>,
) -> Result<SoS<B>, ProtocolError> {
if self.fsm.is_none() {
return Err(ProtocolError::FsmNotReady);
}
@ -365,12 +370,11 @@ impl ConnectionBase {
}
let mut actor = Box::new(Actor::<A, B>::new(id, true));
self.actors.lock().await.insert(id, actor.get_receiver_tx());
let mut proto_msg = msg.send();
let mut proto_msg: ProtocolMessage = msg.into();
proto_msg.set_id(id);
let res = actor
.request(proto_msg, stream, Arc::clone(self.fsm.as_ref().unwrap()))
.request(proto_msg, Arc::clone(self.fsm.as_ref().unwrap()))
.await;
self.actors.lock().await.remove(&id);
res
}
@ -403,6 +407,7 @@ impl ConnectionBase {
let fsm = Arc::new(Mutex::new(NoiseFSM::new(
self.tp,
self.dir.clone(),
Arc::clone(&self.actors),
sender_tx.clone(),
)));
self.fsm = Some(Arc::clone(&fsm));

@ -43,20 +43,22 @@ impl fmt::Display for NetError {
#[derive(Debug, Eq, PartialEq, TryFromPrimitive, IntoPrimitive, Clone)]
#[repr(u16)]
pub enum ProtocolError {
WriteError = 1,
NoError = 0,
PartialContent,
EndOfStream,
WriteError,
WsError,
ActorError,
InvalidState,
SignatureError,
InvalidSignature,
SerializationError,
PartialContent,
AccessDenied,
OverlayNotJoined,
OverlayNotFound,
BrokerError,
NotFound,
EndOfStream,
StoreError,
MissingBlocks,
ObjectParseError,
@ -68,7 +70,6 @@ pub enum ProtocolError {
Timeout,
PeerAlreadyConnected,
NoError,
OtherError,
Closing,
FsmNotReady,

@ -501,6 +501,7 @@ pub enum OverlayRequest {
/// Content of OverlayResponseV0
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum OverlayResponseContentV0 {
EmptyResponse(()),
Block(Block),
EventResp(EventResp),
Event(Event),
@ -513,10 +514,10 @@ pub struct OverlayResponseV0 {
pub id: i64,
/// Result
pub result: u8,
pub result: u16,
/// Response content
pub content: Option<OverlayResponseContentV0>,
pub content: OverlayResponseContentV0,
}
/// Request sent to an OverlayRequest
@ -861,6 +862,12 @@ impl BrokerRequest {
}
}
/// Content of `BrokerResponseV0`
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BrokerResponseContentV0 {
EmptyResponse(()),
}
/// Response to a `BrokerRequest`
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BrokerResponseV0 {
@ -869,6 +876,8 @@ pub struct BrokerResponseV0 {
/// Result (including but not limited to Result)
pub result: u16,
pub content: BrokerResponseContentV0,
}
/// Response to a `BrokerRequest`
@ -1225,6 +1234,7 @@ impl BrokerOverlayRequest {
/// Content of `BrokerOverlayResponseV0`
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BrokerOverlayResponseContentV0 {
EmptyResponse(()),
Block(Block),
ObjectId(ObjectId),
OverlayStatusResp(OverlayStatusResp),
@ -1240,7 +1250,7 @@ pub struct BrokerOverlayResponseV0 {
pub result: u16,
/// Response content
pub content: Option<BrokerOverlayResponseContentV0>,
pub content: BrokerOverlayResponseContentV0,
}
/// Response to a `BrokerOverlayRequest`
@ -1270,23 +1280,17 @@ impl BrokerOverlayResponse {
pub fn block(&self) -> Option<&Block> {
match self {
BrokerOverlayResponse::V0(o) => match &o.content {
Some(contentv0) => match contentv0 {
BrokerOverlayResponseContentV0::Block(b) => Some(b),
_ => panic!("this not a block reponse"),
},
None => None,
_ => panic!("this not a block response"),
},
}
}
pub fn object_id(&self) -> ObjectId {
match self {
BrokerOverlayResponse::V0(o) => match &o.content {
Some(contentv0) => match contentv0 {
BrokerOverlayResponseContentV0::ObjectId(id) => id.clone(),
_ => panic!("this not an objectId reponse"),
},
None => panic!("this not an objectId reponse (doesnt have content)"),
},
}
}
}

@ -30,3 +30,6 @@ where
}
})
}
pub type Sender<T> = mpsc::UnboundedSender<T>;
pub type Receiver<T> = mpsc::UnboundedReceiver<T>;

Loading…
Cancel
Save