From fd392f2ac9dcf11ee213624984714f7bae688d10 Mon Sep 17 00:00:00 2001 From: Niko Date: Wed, 3 May 2023 12:15:49 +0300 Subject: [PATCH] sending Ids of ProtocolMessage --- ng-app-js/src/lib.rs | 3 ++ p2p-client-ws/src/remote_ws.rs | 2 +- p2p-net/src/actor.rs | 14 +++---- p2p-net/src/broker.rs | 5 +-- p2p-net/src/connection.rs | 49 +++++++++++----------- p2p-net/src/types.rs | 75 ++++++++++++++++++++++++++++++++++ 6 files changed, 113 insertions(+), 35 deletions(-) diff --git a/ng-app-js/src/lib.rs b/ng-app-js/src/lib.rs index cce70f4..f7d085f 100644 --- a/ng-app-js/src/lib.rs +++ b/ng-app-js/src/lib.rs @@ -41,6 +41,9 @@ pub async fn greet(name: &str) { ) .await; log!("broker.connect : {:?}", res); + if res.is_err() { + panic!("Cannot connect"); + } BROKER.read().await.print_status(); //res.expect_throw("assume the connection succeeds"); diff --git a/p2p-client-ws/src/remote_ws.rs b/p2p-client-ws/src/remote_ws.rs index 7e1095c..f987471 100644 --- a/p2p-client-ws/src/remote_ws.rs +++ b/p2p-client-ws/src/remote_ws.rs @@ -340,7 +340,7 @@ mod test { ) .await; log!("broker.connect : {:?}", res); - //res.expect_throw("assume the connection succeeds"); + res.expect("assume the connection succeeds"); } async fn timer_close(remote_peer_id: DirectPeerId) -> ResultSend<()> { diff --git a/p2p-net/src/actor.rs b/p2p-net/src/actor.rs index 1d8dec6..af180da 100644 --- a/p2p-net/src/actor.rs +++ b/p2p-net/src/actor.rs @@ -42,7 +42,7 @@ pub trait EActor: Send + Sync + std::fmt::Debug { pub struct Actor< 'a, A: BrokerRequest, - B: TryFrom + std::fmt::Debug + std::marker::Sync, + B: TryFrom + std::fmt::Debug + Sync, > { id: i64, phantom_a: PhantomData<&'a A>, @@ -103,19 +103,19 @@ impl< } } - pub fn verify(&self, msg: ProtocolMessage) -> bool { - self.initiator && msg.type_id() == TypeId::of::() - || !self.initiator && msg.type_id() == TypeId::of::() - } + // pub fn verify(&self, msg: ProtocolMessage) -> bool { + // self.initiator && msg.type_id() == TypeId::of::() + // || !self.initiator && msg.type_id() == TypeId::of::() + // } pub async fn request( &mut self, - msg: impl BrokerRequest + std::marker::Sync + std::marker::Send, + msg: ProtocolMessage, stream: Option, fsm: Arc>, ) -> Result { //sender.send(ConnectionCommand::Msg(msg.send())).await; - fsm.lock().await.send(msg.send()).await?; + fsm.lock().await.send(msg).await?; match self.receiver.next().await { Some(ConnectionCommand::Msg(msg)) => msg.try_into(), _ => Err(ProtocolError::ActorError), diff --git a/p2p-net/src/broker.rs b/p2p-net/src/broker.rs index 518c076..074151c 100644 --- a/p2p-net/src/broker.rs +++ b/p2p-net/src/broker.rs @@ -177,9 +177,8 @@ impl Broker { //let cnx = Arc::new(); let (priv_key, pub_key) = generate_keypair(); log!("CONNECTING"); - let connection_res = cnx.open(ip, priv_key, pub_key, remote_peer_id).await; - log!("CONNECTED {:?}", connection_res); - let mut connection = connection_res.unwrap(); + let mut connection = cnx.open(ip, priv_key, pub_key, remote_peer_id).await?; + let join = connection.take_shutdown(); let connected = if core.is_some() { diff --git a/p2p-net/src/connection.rs b/p2p-net/src/connection.rs index d46a821..6ea53f2 100644 --- a/p2p-net/src/connection.rs +++ b/p2p-net/src/connection.rs @@ -135,21 +135,21 @@ impl NoiseFSM { } } - pub async fn receive( - &mut self, - msg: ProtocolMessage, - ) -> Result { - if self.state == FSMstate::AuthResult && self.noise_cipher_state.is_some() { - if let ProtocolMessage::Noise(noise) = msg { - let new = self.decrypt(&noise); - Ok(new) - } else { - Err(ProtocolError::MustBeEncrypted) - } - } else { - Err(ProtocolError::InvalidState) - } - } + // pub async fn receive( + // &mut self, + // msg: ProtocolMessage, + // ) -> Result { + // if self.state == FSMstate::AuthResult && self.noise_cipher_state.is_some() { + // if let ProtocolMessage::Noise(noise) = msg { + // let new = self.decrypt(&noise); + // Ok(new) + // } else { + // Err(ProtocolError::MustBeEncrypted) + // } + // } else { + // Err(ProtocolError::InvalidState) + // } + // } pub fn step( &mut self, @@ -181,7 +181,7 @@ impl NoiseFSM { FSMstate::Noise0 => {} FSMstate::Noise1 => {} FSMstate::Noise2 => {} - FSMstate::Noise3 => {} + FSMstate::Noise3 => {} //set noise_handshake_state to none FSMstate::ExtRequest => {} FSMstate::ExtResponse => {} FSMstate::ClientHello => {} @@ -348,11 +348,8 @@ impl ConnectionBase { } pub async fn request< - A: crate::actor::BrokerRequest + std::marker::Sync + std::marker::Send + 'static, - B: TryFrom - + std::fmt::Debug - + std::marker::Sync - + 'static, + A: crate::actor::BrokerRequest + Sync + Send + 'static, + B: TryFrom + std::fmt::Debug + Sync + 'static, >( &self, msg: A, @@ -368,9 +365,13 @@ impl ConnectionBase { } let mut actor = Box::new(Actor::::new(id, true)); self.actors.lock().await.insert(id, actor.get_receiver_tx()); - actor - .request(msg, stream, Arc::clone(self.fsm.as_ref().unwrap())) - .await + let mut proto_msg = msg.send(); + proto_msg.set_id(id); + let res = actor + .request(proto_msg, stream, Arc::clone(self.fsm.as_ref().unwrap())) + .await; + self.actors.lock().await.remove(&id); + res } pub async fn send(&mut self, cmd: ConnectionCommand) { diff --git a/p2p-net/src/types.rs b/p2p-net/src/types.rs index 45b4d50..df32454 100644 --- a/p2p-net/src/types.rs +++ b/p2p-net/src/types.rs @@ -842,6 +842,13 @@ impl BrokerRequest { BrokerRequest::V0(o) => o.id, } } + pub fn set_id(&mut self, id: i64) { + match self { + BrokerRequest::V0(v0) => { + v0.id = id; + } + } + } pub fn type_id(&self) -> TypeId { match self { BrokerRequest::V0(o) => o.content.type_id(), @@ -876,6 +883,13 @@ impl BrokerResponse { BrokerResponse::V0(o) => o.id, } } + pub fn set_id(&mut self, id: i64) { + match self { + BrokerResponse::V0(v0) => { + v0.id = id; + } + } + } pub fn result(&self) -> u16 { match self { BrokerResponse::V0(o) => o.result, @@ -1194,6 +1208,13 @@ impl BrokerOverlayRequest { BrokerOverlayRequest::V0(o) => o.id, } } + pub fn set_id(&mut self, id: i64) { + match self { + BrokerOverlayRequest::V0(v0) => { + v0.id = id; + } + } + } pub fn content_v0(&self) -> &BrokerOverlayRequestContentV0 { match self { BrokerOverlayRequest::V0(o) => &o.content, @@ -1234,6 +1255,13 @@ impl BrokerOverlayResponse { BrokerOverlayResponse::V0(o) => o.id, } } + pub fn set_id(&mut self, id: i64) { + match self { + BrokerOverlayResponse::V0(v0) => { + v0.id = id; + } + } + } pub fn result(&self) -> u16 { match self { BrokerOverlayResponse::V0(o) => o.result, @@ -1329,6 +1357,17 @@ impl BrokerOverlayMessage { }, } } + pub fn set_id(&mut self, id: i64) { + match self { + BrokerOverlayMessage::V0(o) => match &mut o.content { + BrokerOverlayMessageContentV0::BrokerOverlayResponse(ref mut r) => r.set_id(id), + BrokerOverlayMessageContentV0::BrokerOverlayRequest(ref mut r) => r.set_id(id), + BrokerOverlayMessageContentV0::Event(_) => { + panic!("it is an event") + } + }, + } + } pub fn result(&self) -> u16 { match self { BrokerOverlayMessage::V0(o) => match &o.content { @@ -1450,6 +1489,16 @@ impl BrokerMessage { BrokerMessage::Close => panic!("Close not implemented"), } } + pub fn set_id(&mut self, id: i64) { + match self { + BrokerMessage::V0(o) => match &mut o.content { + BrokerMessageContentV0::BrokerOverlayMessage(ref mut p) => p.set_id(id), + BrokerMessageContentV0::BrokerResponse(ref mut r) => r.set_id(id), + BrokerMessageContentV0::BrokerRequest(ref mut r) => r.set_id(id), + }, + BrokerMessage::Close => panic!("Close not implemented"), + } + } pub fn result(&self) -> u16 { match self { BrokerMessage::V0(o) => match &o.content { @@ -1577,6 +1626,13 @@ impl ExtRequest { ExtRequest::V0(v0) => v0.id, } } + pub fn set_id(&mut self, id: i64) { + match self { + ExtRequest::V0(v0) => { + v0.id = id; + } + } + } } /// Content of ExtResponseV0 @@ -1612,6 +1668,13 @@ impl ExtResponse { ExtResponse::V0(v0) => v0.id, } } + pub fn set_id(&mut self, id: i64) { + match self { + ExtResponse::V0(v0) => { + v0.id = id; + } + } + } } impl TryFrom for ExtResponse { @@ -1653,6 +1716,18 @@ impl ProtocolMessage { ProtocolMessage::BrokerMessage(broker_msg) => broker_msg.id(), } } + pub fn set_id(&mut self, id: i64) { + match self { + ProtocolMessage::Noise(_) => panic!("cannot set ID"), + ProtocolMessage::Start(_) => panic!("cannot set ID"), + ProtocolMessage::ServerHello(_) => panic!("cannot set ID"), + ProtocolMessage::ClientAuth(_) => panic!("cannot set ID"), + ProtocolMessage::AuthResult(_) => panic!("cannot set ID"), + ProtocolMessage::ExtRequest(ext_req) => ext_req.set_id(id), + ProtocolMessage::ExtResponse(ext_res) => ext_res.set_id(id), + ProtocolMessage::BrokerMessage(broker_msg) => broker_msg.set_id(id), + } + } pub fn type_id(&self) -> TypeId { match self { ProtocolMessage::Noise(a) => a.type_id(),