From fe56d2c8dad30a28744d02e5e239a57612a7b5c4 Mon Sep 17 00:00:00 2001 From: Niko PLP Date: Mon, 26 Jun 2023 01:41:24 +0300 Subject: [PATCH] refactor shutdown of anonymous connection --- p2p-client-ws/src/remote_ws.rs | 5 +- p2p-net/src/broker.rs | 112 +++++++++++++++++++++++++-------- p2p-net/src/connection.rs | 24 +++++-- 3 files changed, 107 insertions(+), 34 deletions(-) diff --git a/p2p-client-ws/src/remote_ws.rs b/p2p-client-ws/src/remote_ws.rs index eed010e..d87780e 100644 --- a/p2p-client-ws/src/remote_ws.rs +++ b/p2p-client-ws/src/remote_ws.rs @@ -19,6 +19,7 @@ use async_tungstenite::tungstenite::protocol::CloseFrame; use async_tungstenite::WebSocketStream; use async_std::sync::Mutex; +use futures::future::Either; use futures::io::Close; use futures::{future, pin_mut, select, stream, StreamExt}; use futures::{FutureExt, SinkExt}; @@ -71,7 +72,7 @@ impl IConnect for ConnectionWebSocket { let res = ws_loop(websocket, s, r).await; if res.is_err() { - let _ = shutdown.send(res.err().unwrap()).await; + let _ = shutdown.send(Either::Left(res.err().unwrap())).await; } log_debug!("END of WS loop"); }); @@ -112,7 +113,7 @@ impl IAccept for ConnectionWebSocket { let res = ws_loop(socket, s, r).await; if res.is_err() { - let _ = shutdown.send(res.err().unwrap()).await; + let _ = shutdown.send(Either::Left(res.err().unwrap())).await; } log_debug!("END of WS loop"); }); diff --git a/p2p-net/src/broker.rs b/p2p-net/src/broker.rs index 2ea6857..aa44949 100644 --- a/p2p-net/src/broker.rs +++ b/p2p-net/src/broker.rs @@ -18,6 +18,7 @@ use crate::utils::{Receiver, ResultSend, Sender}; use async_std::stream::StreamExt; use async_std::sync::{Arc, RwLock}; use futures::channel::mpsc; +use futures::future::Either; use futures::SinkExt; use noise_protocol::U8Array; use noise_rust_crypto::sensitive::Sensitive; @@ -64,7 +65,7 @@ pub struct Broker { direct_connections: HashMap, peers: HashMap, /// (local,remote) -> ConnectionBase - incoming_anonymous_connections: HashMap<(BindAddress, BindAddress), ConnectionBase>, + anonymous_connections: HashMap<(BindAddress, BindAddress), ConnectionBase>, #[cfg(not(target_arch = "wasm32"))] listeners: HashMap, bind_addresses: HashMap, @@ -230,7 +231,7 @@ impl Broker { None => {} } } - pub fn remove(&mut self, peer_id: &DirectPeerId) { + pub fn remove_peer_id(&mut self, peer_id: &DirectPeerId) { let removed = self.peers.remove(peer_id); match removed { Some(info) => match info.connected { @@ -244,6 +245,16 @@ impl Broker { } } + pub fn remove_anonymous( + &mut self, + remote_bind_address: BindAddress, + local_bind_address: BindAddress, + ) { + let removed = self + .anonymous_connections + .remove(&(local_bind_address, remote_bind_address)); + } + pub fn test(&self) -> u32 { self.test } @@ -253,7 +264,7 @@ impl Broker { let mut random_buf = [0u8; 4]; getrandom::getrandom(&mut random_buf).unwrap(); Broker { - incoming_anonymous_connections: HashMap::new(), + anonymous_connections: HashMap::new(), #[cfg(not(target_arch = "wasm32"))] listeners: HashMap::new(), bind_addresses: HashMap::new(), @@ -308,18 +319,23 @@ impl Broker { } pub async fn graceful_shutdown() { - let keys; + let peer_ids; + let anonymous; { let mut broker = BROKER.write().await; if broker.closing { return; } broker.closing = true; - keys = Vec::from_iter(broker.peers.keys().cloned()); + peer_ids = Vec::from_iter(broker.peers.keys().cloned()); + anonymous = Vec::from_iter(broker.anonymous_connections.keys().cloned()); } - for peer_id in keys { + for peer_id in peer_ids { BROKER.write().await.close_peer_connection(&peer_id).await; } + for anon in anonymous { + BROKER.write().await.close_anonymous(anon.1, anon.0).await; + } let _ = BROKER .write() .await @@ -339,7 +355,7 @@ impl Broker { pub async fn accept( &mut self, - connection: ConnectionBase, + mut connection: ConnectionBase, remote_bind_address: BindAddress, local_bind_address: BindAddress, ) -> Result<(), NetError> { @@ -347,8 +363,9 @@ impl Broker { return Err(NetError::Closing); } + let join = connection.take_shutdown(); if self - .incoming_anonymous_connections + .anonymous_connections .insert((local_bind_address, remote_bind_address), connection) .is_some() { @@ -358,6 +375,39 @@ impl Broker { remote_bind_address ); } + + async fn watch_close( + mut join: Receiver>, + remote_bind_address: BindAddress, + local_bind_address: BindAddress, + ) -> ResultSend<()> { + async move { + let res = join.next().await; + match res { + Some(Either::Right(remote_peer_id)) => { + let res = join.next().await; + log_info!("SOCKET IS CLOSED {:?} peer_id: {:?}", res, remote_peer_id); + BROKER.write().await.remove_peer_id(&remote_peer_id); + } + _ => { + log_info!( + "SOCKET IS CLOSED {:?} remote: {:?} local: {:?}", + res, + remote_bind_address, + local_bind_address + ); + BROKER + .write() + .await + .remove_anonymous(remote_bind_address, local_bind_address); + } + } + } + .await; + Ok(()) + } + spawn_and_log_error(watch_close(join, remote_bind_address, local_bind_address)); + Ok(()) } @@ -369,11 +419,12 @@ impl Broker { core: Option, ) -> Result<(), NetError> { log_debug!("ATTACH PEER_ID {}", remote_peer_id); - let mut connection = self - .incoming_anonymous_connections + let connection = self + .anonymous_connections .remove(&(local_bind_address, remote_bind_address)) .ok_or(NetError::InternalError)?; - let join = connection.take_shutdown(); + + connection.reset_shutdown(remote_peer_id).await; let ip = remote_bind_address.ip; let connected = if core.is_some() { let dc = DirectConnection { @@ -394,20 +445,6 @@ impl Broker { }; self.peers.insert(remote_peer_id, bpi); - async fn watch_close( - mut join: Receiver, - remote_peer_id: DirectPeerId, - ) -> ResultSend<()> { - async move { - let res = join.next().await; - log_info!("SOCKET IS CLOSED {:?} {:?}", res, &remote_peer_id); - log_info!("REMOVED"); - BROKER.write().await.remove(&remote_peer_id); - } - .await; - Ok(()) - } - spawn_and_log_error(watch_close(join, remote_peer_id)); Ok(()) } @@ -461,7 +498,7 @@ impl Broker { self.peers.insert(remote_peer_id, bpi); async fn watch_close( - mut join: Receiver, + mut join: Receiver>, cnx: Box, ip: IP, core: Option, // the interface used as egress for this connection @@ -484,7 +521,7 @@ impl Broker { // TODO: deal with error and incremental backoff } else { log_info!("REMOVED"); - BROKER.write().await.remove(&remote_peer_id); + BROKER.write().await.remove_peer_id(&remote_peer_id); } } .await; @@ -518,6 +555,19 @@ impl Broker { } } + pub async fn close_anonymous( + &mut self, + remote_bind_address: BindAddress, + local_bind_address: BindAddress, + ) { + if let Some(cb) = self + .anonymous_connections + .get_mut(&(local_bind_address, remote_bind_address)) + { + cb.close().await; + } + } + pub fn print_status(&self) { self.peers.iter().for_each(|(peerId, peerInfo)| { log_info!("PEER in BROKER {:?} {:?}", peerId, peerInfo); @@ -525,5 +575,13 @@ impl Broker { self.direct_connections.iter().for_each(|(ip, directCnx)| { log_info!("direct_connection in BROKER {:?} {:?}", ip, directCnx) }); + self.anonymous_connections.iter().for_each(|(binds, cb)| { + log_info!( + "ANONYMOUS remote {:?} local {:?} {:?}", + binds.1, + binds.0, + cb + ); + }); } } diff --git a/p2p-net/src/connection.rs b/p2p-net/src/connection.rs index 4cffe06..a009e41 100644 --- a/p2p-net/src/connection.rs +++ b/p2p-net/src/connection.rs @@ -24,6 +24,7 @@ use crate::types::*; use crate::utils::*; use async_std::stream::StreamExt; use async_std::sync::Mutex; +use futures::future::Either; use futures::{channel::mpsc, select, Future, FutureExt, SinkExt}; use noise_protocol::U8Array; use noise_protocol::{patterns::noise_xk, CipherState, HandshakeState}; @@ -558,7 +559,8 @@ pub struct ConnectionBase { receiver: Option>, sender_tx: Option>, receiver_tx: Option>, - shutdown: Option>, + shutdown: Option>>, + shutdown_sender: Option>>, dir: ConnectionDir, next_request_id: SequenceGenerator, tp: TransportProtocol, @@ -575,6 +577,7 @@ impl ConnectionBase { sender_tx: None, receiver_tx: None, shutdown: None, + shutdown_sender: None, next_request_id: SequenceGenerator::new(1), dir, tp, @@ -586,20 +589,31 @@ impl ConnectionBase { self.tp } - pub fn take_shutdown(&mut self) -> Receiver { + pub fn take_shutdown(&mut self) -> Receiver> { self.shutdown.take().unwrap() } pub async fn join_shutdown(&mut self) -> Result<(), NetError> { match self.take_shutdown().next().await { - Some(error) => Err(error), + Some(Either::Left(error)) => Err(error), + Some(Either::Right(_)) => Ok(()), None => Ok(()), } } - pub fn set_shutdown(&mut self) -> Sender { - let (shutdown_sender, shutdown_receiver) = mpsc::unbounded::(); + pub async fn reset_shutdown(&self, remote_peer_id: PubKey) { + let _ = self + .shutdown_sender + .as_ref() + .unwrap() + .send(Either::Right(remote_peer_id)) + .await; + } + + pub fn set_shutdown(&mut self) -> Sender> { + let (shutdown_sender, shutdown_receiver) = mpsc::unbounded::>(); self.shutdown = Some(shutdown_receiver); + self.shutdown_sender = Some(shutdown_sender.clone()); shutdown_sender }