refactor shutdown of anonymous connection

Niko PLP 11 months ago
parent c1603844e2
commit e76b104a5d
  1. 5
      p2p-client-ws/src/remote_ws.rs
  2. 112
      p2p-net/src/broker.rs
  3. 24
      p2p-net/src/connection.rs

@ -19,6 +19,7 @@ use async_tungstenite::tungstenite::protocol::CloseFrame;
use async_tungstenite::WebSocketStream; use async_tungstenite::WebSocketStream;
use async_std::sync::Mutex; use async_std::sync::Mutex;
use futures::future::Either;
use futures::io::Close; use futures::io::Close;
use futures::{future, pin_mut, select, stream, StreamExt}; use futures::{future, pin_mut, select, stream, StreamExt};
use futures::{FutureExt, SinkExt}; use futures::{FutureExt, SinkExt};
@ -71,7 +72,7 @@ impl IConnect for ConnectionWebSocket {
let res = ws_loop(websocket, s, r).await; let res = ws_loop(websocket, s, r).await;
if res.is_err() { if res.is_err() {
let _ = shutdown.send(res.err().unwrap()).await; let _ = shutdown.send(Either::Left(res.err().unwrap())).await;
} }
log_debug!("END of WS loop"); log_debug!("END of WS loop");
}); });
@ -112,7 +113,7 @@ impl IAccept for ConnectionWebSocket {
let res = ws_loop(socket, s, r).await; let res = ws_loop(socket, s, r).await;
if res.is_err() { if res.is_err() {
let _ = shutdown.send(res.err().unwrap()).await; let _ = shutdown.send(Either::Left(res.err().unwrap())).await;
} }
log_debug!("END of WS loop"); log_debug!("END of WS loop");
}); });

@ -18,6 +18,7 @@ use crate::utils::{Receiver, ResultSend, Sender};
use async_std::stream::StreamExt; use async_std::stream::StreamExt;
use async_std::sync::{Arc, RwLock}; use async_std::sync::{Arc, RwLock};
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::future::Either;
use futures::SinkExt; use futures::SinkExt;
use noise_protocol::U8Array; use noise_protocol::U8Array;
use noise_rust_crypto::sensitive::Sensitive; use noise_rust_crypto::sensitive::Sensitive;
@ -64,7 +65,7 @@ pub struct Broker {
direct_connections: HashMap<IP, DirectConnection>, direct_connections: HashMap<IP, DirectConnection>,
peers: HashMap<DirectPeerId, BrokerPeerInfo>, peers: HashMap<DirectPeerId, BrokerPeerInfo>,
/// (local,remote) -> ConnectionBase /// (local,remote) -> ConnectionBase
incoming_anonymous_connections: HashMap<(BindAddress, BindAddress), ConnectionBase>, anonymous_connections: HashMap<(BindAddress, BindAddress), ConnectionBase>,
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
listeners: HashMap<String, ListenerInfo>, listeners: HashMap<String, ListenerInfo>,
bind_addresses: HashMap<BindAddress, String>, bind_addresses: HashMap<BindAddress, String>,
@ -230,7 +231,7 @@ impl Broker {
None => {} None => {}
} }
} }
pub fn remove(&mut self, peer_id: &DirectPeerId) { pub fn remove_peer_id(&mut self, peer_id: &DirectPeerId) {
let removed = self.peers.remove(peer_id); let removed = self.peers.remove(peer_id);
match removed { match removed {
Some(info) => match info.connected { Some(info) => match info.connected {
@ -244,6 +245,16 @@ impl Broker {
} }
} }
pub fn remove_anonymous(
&mut self,
remote_bind_address: BindAddress,
local_bind_address: BindAddress,
) {
let removed = self
.anonymous_connections
.remove(&(local_bind_address, remote_bind_address));
}
pub fn test(&self) -> u32 { pub fn test(&self) -> u32 {
self.test self.test
} }
@ -253,7 +264,7 @@ impl Broker {
let mut random_buf = [0u8; 4]; let mut random_buf = [0u8; 4];
getrandom::getrandom(&mut random_buf).unwrap(); getrandom::getrandom(&mut random_buf).unwrap();
Broker { Broker {
incoming_anonymous_connections: HashMap::new(), anonymous_connections: HashMap::new(),
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
listeners: HashMap::new(), listeners: HashMap::new(),
bind_addresses: HashMap::new(), bind_addresses: HashMap::new(),
@ -308,18 +319,23 @@ impl Broker {
} }
pub async fn graceful_shutdown() { pub async fn graceful_shutdown() {
let keys; let peer_ids;
let anonymous;
{ {
let mut broker = BROKER.write().await; let mut broker = BROKER.write().await;
if broker.closing { if broker.closing {
return; return;
} }
broker.closing = true; broker.closing = true;
keys = Vec::from_iter(broker.peers.keys().cloned()); peer_ids = Vec::from_iter(broker.peers.keys().cloned());
anonymous = Vec::from_iter(broker.anonymous_connections.keys().cloned());
} }
for peer_id in keys { for peer_id in peer_ids {
BROKER.write().await.close_peer_connection(&peer_id).await; BROKER.write().await.close_peer_connection(&peer_id).await;
} }
for anon in anonymous {
BROKER.write().await.close_anonymous(anon.1, anon.0).await;
}
let _ = BROKER let _ = BROKER
.write() .write()
.await .await
@ -339,7 +355,7 @@ impl Broker {
pub async fn accept( pub async fn accept(
&mut self, &mut self,
connection: ConnectionBase, mut connection: ConnectionBase,
remote_bind_address: BindAddress, remote_bind_address: BindAddress,
local_bind_address: BindAddress, local_bind_address: BindAddress,
) -> Result<(), NetError> { ) -> Result<(), NetError> {
@ -347,8 +363,9 @@ impl Broker {
return Err(NetError::Closing); return Err(NetError::Closing);
} }
let join = connection.take_shutdown();
if self if self
.incoming_anonymous_connections .anonymous_connections
.insert((local_bind_address, remote_bind_address), connection) .insert((local_bind_address, remote_bind_address), connection)
.is_some() .is_some()
{ {
@ -358,6 +375,39 @@ impl Broker {
remote_bind_address remote_bind_address
); );
} }
async fn watch_close(
mut join: Receiver<Either<NetError, PubKey>>,
remote_bind_address: BindAddress,
local_bind_address: BindAddress,
) -> ResultSend<()> {
async move {
let res = join.next().await;
match res {
Some(Either::Right(remote_peer_id)) => {
let res = join.next().await;
log_info!("SOCKET IS CLOSED {:?} peer_id: {:?}", res, remote_peer_id);
BROKER.write().await.remove_peer_id(&remote_peer_id);
}
_ => {
log_info!(
"SOCKET IS CLOSED {:?} remote: {:?} local: {:?}",
res,
remote_bind_address,
local_bind_address
);
BROKER
.write()
.await
.remove_anonymous(remote_bind_address, local_bind_address);
}
}
}
.await;
Ok(())
}
spawn_and_log_error(watch_close(join, remote_bind_address, local_bind_address));
Ok(()) Ok(())
} }
@ -369,11 +419,12 @@ impl Broker {
core: Option<String>, core: Option<String>,
) -> Result<(), NetError> { ) -> Result<(), NetError> {
log_debug!("ATTACH PEER_ID {}", remote_peer_id); log_debug!("ATTACH PEER_ID {}", remote_peer_id);
let mut connection = self let connection = self
.incoming_anonymous_connections .anonymous_connections
.remove(&(local_bind_address, remote_bind_address)) .remove(&(local_bind_address, remote_bind_address))
.ok_or(NetError::InternalError)?; .ok_or(NetError::InternalError)?;
let join = connection.take_shutdown();
connection.reset_shutdown(remote_peer_id).await;
let ip = remote_bind_address.ip; let ip = remote_bind_address.ip;
let connected = if core.is_some() { let connected = if core.is_some() {
let dc = DirectConnection { let dc = DirectConnection {
@ -394,20 +445,6 @@ impl Broker {
}; };
self.peers.insert(remote_peer_id, bpi); self.peers.insert(remote_peer_id, bpi);
async fn watch_close(
mut join: Receiver<NetError>,
remote_peer_id: DirectPeerId,
) -> ResultSend<()> {
async move {
let res = join.next().await;
log_info!("SOCKET IS CLOSED {:?} {:?}", res, &remote_peer_id);
log_info!("REMOVED");
BROKER.write().await.remove(&remote_peer_id);
}
.await;
Ok(())
}
spawn_and_log_error(watch_close(join, remote_peer_id));
Ok(()) Ok(())
} }
@ -461,7 +498,7 @@ impl Broker {
self.peers.insert(remote_peer_id, bpi); self.peers.insert(remote_peer_id, bpi);
async fn watch_close( async fn watch_close(
mut join: Receiver<NetError>, mut join: Receiver<Either<NetError, PubKey>>,
cnx: Box<dyn IConnect>, cnx: Box<dyn IConnect>,
ip: IP, ip: IP,
core: Option<String>, // the interface used as egress for this connection core: Option<String>, // the interface used as egress for this connection
@ -484,7 +521,7 @@ impl Broker {
// TODO: deal with error and incremental backoff // TODO: deal with error and incremental backoff
} else { } else {
log_info!("REMOVED"); log_info!("REMOVED");
BROKER.write().await.remove(&remote_peer_id); BROKER.write().await.remove_peer_id(&remote_peer_id);
} }
} }
.await; .await;
@ -518,6 +555,19 @@ impl Broker {
} }
} }
pub async fn close_anonymous(
&mut self,
remote_bind_address: BindAddress,
local_bind_address: BindAddress,
) {
if let Some(cb) = self
.anonymous_connections
.get_mut(&(local_bind_address, remote_bind_address))
{
cb.close().await;
}
}
pub fn print_status(&self) { pub fn print_status(&self) {
self.peers.iter().for_each(|(peerId, peerInfo)| { self.peers.iter().for_each(|(peerId, peerInfo)| {
log_info!("PEER in BROKER {:?} {:?}", peerId, peerInfo); log_info!("PEER in BROKER {:?} {:?}", peerId, peerInfo);
@ -525,5 +575,13 @@ impl Broker {
self.direct_connections.iter().for_each(|(ip, directCnx)| { self.direct_connections.iter().for_each(|(ip, directCnx)| {
log_info!("direct_connection in BROKER {:?} {:?}", ip, directCnx) log_info!("direct_connection in BROKER {:?} {:?}", ip, directCnx)
}); });
self.anonymous_connections.iter().for_each(|(binds, cb)| {
log_info!(
"ANONYMOUS remote {:?} local {:?} {:?}",
binds.1,
binds.0,
cb
);
});
} }
} }

@ -24,6 +24,7 @@ use crate::types::*;
use crate::utils::*; use crate::utils::*;
use async_std::stream::StreamExt; use async_std::stream::StreamExt;
use async_std::sync::Mutex; use async_std::sync::Mutex;
use futures::future::Either;
use futures::{channel::mpsc, select, Future, FutureExt, SinkExt}; use futures::{channel::mpsc, select, Future, FutureExt, SinkExt};
use noise_protocol::U8Array; use noise_protocol::U8Array;
use noise_protocol::{patterns::noise_xk, CipherState, HandshakeState}; use noise_protocol::{patterns::noise_xk, CipherState, HandshakeState};
@ -558,7 +559,8 @@ pub struct ConnectionBase {
receiver: Option<Sender<ConnectionCommand>>, receiver: Option<Sender<ConnectionCommand>>,
sender_tx: Option<Sender<ConnectionCommand>>, sender_tx: Option<Sender<ConnectionCommand>>,
receiver_tx: Option<Sender<ConnectionCommand>>, receiver_tx: Option<Sender<ConnectionCommand>>,
shutdown: Option<Receiver<NetError>>, shutdown: Option<Receiver<Either<NetError, PubKey>>>,
shutdown_sender: Option<Sender<Either<NetError, PubKey>>>,
dir: ConnectionDir, dir: ConnectionDir,
next_request_id: SequenceGenerator, next_request_id: SequenceGenerator,
tp: TransportProtocol, tp: TransportProtocol,
@ -575,6 +577,7 @@ impl ConnectionBase {
sender_tx: None, sender_tx: None,
receiver_tx: None, receiver_tx: None,
shutdown: None, shutdown: None,
shutdown_sender: None,
next_request_id: SequenceGenerator::new(1), next_request_id: SequenceGenerator::new(1),
dir, dir,
tp, tp,
@ -586,20 +589,31 @@ impl ConnectionBase {
self.tp self.tp
} }
pub fn take_shutdown(&mut self) -> Receiver<NetError> { pub fn take_shutdown(&mut self) -> Receiver<Either<NetError, PubKey>> {
self.shutdown.take().unwrap() self.shutdown.take().unwrap()
} }
pub async fn join_shutdown(&mut self) -> Result<(), NetError> { pub async fn join_shutdown(&mut self) -> Result<(), NetError> {
match self.take_shutdown().next().await { match self.take_shutdown().next().await {
Some(error) => Err(error), Some(Either::Left(error)) => Err(error),
Some(Either::Right(_)) => Ok(()),
None => Ok(()), None => Ok(()),
} }
} }
pub fn set_shutdown(&mut self) -> Sender<NetError> { pub async fn reset_shutdown(&self, remote_peer_id: PubKey) {
let (shutdown_sender, shutdown_receiver) = mpsc::unbounded::<NetError>(); let _ = self
.shutdown_sender
.as_ref()
.unwrap()
.send(Either::Right(remote_peer_id))
.await;
}
pub fn set_shutdown(&mut self) -> Sender<Either<NetError, PubKey>> {
let (shutdown_sender, shutdown_receiver) = mpsc::unbounded::<Either<NetError, PubKey>>();
self.shutdown = Some(shutdown_receiver); self.shutdown = Some(shutdown_receiver);
self.shutdown_sender = Some(shutdown_sender.clone());
shutdown_sender shutdown_sender
} }

Loading…
Cancel
Save