|
|
@ -22,9 +22,10 @@ use crate::errors::NetError; |
|
|
|
use crate::errors::ProtocolError; |
|
|
|
use crate::errors::ProtocolError; |
|
|
|
use crate::types::*; |
|
|
|
use crate::types::*; |
|
|
|
use crate::utils::*; |
|
|
|
use crate::utils::*; |
|
|
|
|
|
|
|
use async_std::future::TimeoutError; |
|
|
|
use async_std::stream::StreamExt; |
|
|
|
use async_std::stream::StreamExt; |
|
|
|
use async_std::sync::Mutex; |
|
|
|
use async_std::sync::Mutex; |
|
|
|
use futures::future::Either; |
|
|
|
use either::Either; |
|
|
|
use futures::{channel::mpsc, select, Future, FutureExt, SinkExt}; |
|
|
|
use futures::{channel::mpsc, select, Future, FutureExt, SinkExt}; |
|
|
|
use noise_protocol::U8Array; |
|
|
|
use noise_protocol::U8Array; |
|
|
|
use noise_protocol::{patterns::noise_xk, CipherState, HandshakeState}; |
|
|
|
use noise_protocol::{patterns::noise_xk, CipherState, HandshakeState}; |
|
|
@ -52,11 +53,14 @@ pub trait IConnect: Send + Sync { |
|
|
|
async fn open( |
|
|
|
async fn open( |
|
|
|
&self, |
|
|
|
&self, |
|
|
|
ip: IP, |
|
|
|
ip: IP, |
|
|
|
|
|
|
|
port: u16, |
|
|
|
peer_privk: Sensitive<[u8; 32]>, |
|
|
|
peer_privk: Sensitive<[u8; 32]>, |
|
|
|
peer_pubk: PubKey, |
|
|
|
peer_pubk: PubKey, |
|
|
|
remote_peer: DirectPeerId, |
|
|
|
remote_peer: DirectPeerId, |
|
|
|
config: StartConfig, |
|
|
|
config: StartConfig, |
|
|
|
) -> Result<ConnectionBase, NetError>; |
|
|
|
) -> Result<ConnectionBase, NetError>; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async fn probe(&self, ip: IP, port: u16) -> Result<Option<PubKey>, ProtocolError>; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] |
|
|
|
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] |
|
|
@ -87,6 +91,9 @@ impl ConnectionDir { |
|
|
|
#[derive(Debug, PartialEq)] |
|
|
|
#[derive(Debug, PartialEq)] |
|
|
|
pub enum FSMstate { |
|
|
|
pub enum FSMstate { |
|
|
|
Local0, |
|
|
|
Local0, |
|
|
|
|
|
|
|
Start, |
|
|
|
|
|
|
|
Probe, |
|
|
|
|
|
|
|
Relay, |
|
|
|
Noise0, |
|
|
|
Noise0, |
|
|
|
Noise1, |
|
|
|
Noise1, |
|
|
|
Noise2, |
|
|
|
Noise2, |
|
|
@ -97,6 +104,7 @@ pub enum FSMstate { |
|
|
|
ServerHello, |
|
|
|
ServerHello, |
|
|
|
ClientAuth, |
|
|
|
ClientAuth, |
|
|
|
AuthResult, |
|
|
|
AuthResult, |
|
|
|
|
|
|
|
Closing, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pub struct NoiseFSM { |
|
|
|
pub struct NoiseFSM { |
|
|
@ -104,6 +112,7 @@ pub struct NoiseFSM { |
|
|
|
dir: ConnectionDir, |
|
|
|
dir: ConnectionDir, |
|
|
|
sender: Sender<ConnectionCommand>, |
|
|
|
sender: Sender<ConnectionCommand>, |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// first is local, second is remote
|
|
|
|
bind_addresses: Option<(BindAddress, BindAddress)>, |
|
|
|
bind_addresses: Option<(BindAddress, BindAddress)>, |
|
|
|
|
|
|
|
|
|
|
|
actors: Arc<Mutex<HashMap<i64, Sender<ConnectionCommand>>>>, |
|
|
|
actors: Arc<Mutex<HashMap<i64, Sender<ConnectionCommand>>>>, |
|
|
@ -112,8 +121,8 @@ pub struct NoiseFSM { |
|
|
|
noise_cipher_state_enc: Option<CipherState<ChaCha20Poly1305>>, |
|
|
|
noise_cipher_state_enc: Option<CipherState<ChaCha20Poly1305>>, |
|
|
|
noise_cipher_state_dec: Option<CipherState<ChaCha20Poly1305>>, |
|
|
|
noise_cipher_state_dec: Option<CipherState<ChaCha20Poly1305>>, |
|
|
|
|
|
|
|
|
|
|
|
from: Option<Sensitive<[u8; 32]>>, |
|
|
|
local: Option<Sensitive<[u8; 32]>>, |
|
|
|
to: Option<PubKey>, |
|
|
|
remote: Option<PubKey>, |
|
|
|
|
|
|
|
|
|
|
|
nonce_for_hello: Vec<u8>, |
|
|
|
nonce_for_hello: Vec<u8>, |
|
|
|
config: Option<StartConfig>, |
|
|
|
config: Option<StartConfig>, |
|
|
@ -132,6 +141,7 @@ pub enum StepReply { |
|
|
|
Responder(ProtocolMessage), |
|
|
|
Responder(ProtocolMessage), |
|
|
|
Response(ProtocolMessage), |
|
|
|
Response(ProtocolMessage), |
|
|
|
NONE, |
|
|
|
NONE, |
|
|
|
|
|
|
|
CloseNow, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pub struct ClientConfig { |
|
|
|
pub struct ClientConfig { |
|
|
@ -147,6 +157,8 @@ pub struct CoreConfig {} |
|
|
|
pub struct AdminConfig {} |
|
|
|
pub struct AdminConfig {} |
|
|
|
|
|
|
|
|
|
|
|
pub enum StartConfig { |
|
|
|
pub enum StartConfig { |
|
|
|
|
|
|
|
Probe, |
|
|
|
|
|
|
|
Relay(BindAddress), |
|
|
|
Client(ClientConfig), |
|
|
|
Client(ClientConfig), |
|
|
|
Ext(ExtConfig), |
|
|
|
Ext(ExtConfig), |
|
|
|
Core(CoreConfig), |
|
|
|
Core(CoreConfig), |
|
|
@ -160,14 +172,14 @@ impl NoiseFSM { |
|
|
|
dir: ConnectionDir, |
|
|
|
dir: ConnectionDir, |
|
|
|
actors: Arc<Mutex<HashMap<i64, Sender<ConnectionCommand>>>>, |
|
|
|
actors: Arc<Mutex<HashMap<i64, Sender<ConnectionCommand>>>>, |
|
|
|
sender: Sender<ConnectionCommand>, |
|
|
|
sender: Sender<ConnectionCommand>, |
|
|
|
from: Sensitive<[u8; 32]>, |
|
|
|
local: Option<Sensitive<[u8; 32]>>, |
|
|
|
to: Option<PubKey>, |
|
|
|
remote: Option<PubKey>, |
|
|
|
) -> Self { |
|
|
|
) -> Self { |
|
|
|
Self { |
|
|
|
Self { |
|
|
|
state: if tp == TransportProtocol::Local { |
|
|
|
state: if tp == TransportProtocol::Local { |
|
|
|
FSMstate::Local0 |
|
|
|
FSMstate::Local0 |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
FSMstate::Noise0 |
|
|
|
FSMstate::Start |
|
|
|
}, |
|
|
|
}, |
|
|
|
dir, |
|
|
|
dir, |
|
|
|
bind_addresses, |
|
|
|
bind_addresses, |
|
|
@ -176,8 +188,8 @@ impl NoiseFSM { |
|
|
|
noise_handshake_state: None, |
|
|
|
noise_handshake_state: None, |
|
|
|
noise_cipher_state_enc: None, |
|
|
|
noise_cipher_state_enc: None, |
|
|
|
noise_cipher_state_dec: None, |
|
|
|
noise_cipher_state_dec: None, |
|
|
|
from: Some(from), |
|
|
|
local, |
|
|
|
to, |
|
|
|
remote, |
|
|
|
nonce_for_hello: vec![], |
|
|
|
nonce_for_hello: vec![], |
|
|
|
config: None, |
|
|
|
config: None, |
|
|
|
} |
|
|
|
} |
|
|
@ -211,7 +223,7 @@ impl NoiseFSM { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pub async fn send(&mut self, msg: ProtocolMessage) -> Result<(), ProtocolError> { |
|
|
|
pub async fn send(&mut self, msg: ProtocolMessage) -> Result<(), ProtocolError> { |
|
|
|
log_trace!("SENDING: {:?}", msg); |
|
|
|
log_debug!("SENDING: {:?}", msg); |
|
|
|
if self.noise_cipher_state_enc.is_some() { |
|
|
|
if self.noise_cipher_state_enc.is_some() { |
|
|
|
let cipher = self.encrypt(msg)?; |
|
|
|
let cipher = self.encrypt(msg)?; |
|
|
|
self.sender |
|
|
|
self.sender |
|
|
@ -244,6 +256,37 @@ impl NoiseFSM { |
|
|
|
// }
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async fn process_server_noise0(&mut self, noise: &Noise) -> Result<StepReply, ProtocolError> { |
|
|
|
|
|
|
|
let mut handshake = HandshakeState::<X25519, ChaCha20Poly1305, Blake2b>::new( |
|
|
|
|
|
|
|
noise_xk(), |
|
|
|
|
|
|
|
false, |
|
|
|
|
|
|
|
&[], |
|
|
|
|
|
|
|
Some(from_ed_priv_to_dh_priv(self.local.take().unwrap())), |
|
|
|
|
|
|
|
None, |
|
|
|
|
|
|
|
None, |
|
|
|
|
|
|
|
None, |
|
|
|
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let mut payload = handshake.read_message_vec(noise.data()).map_err(|e| { |
|
|
|
|
|
|
|
log_debug!("{:?}", e); |
|
|
|
|
|
|
|
ProtocolError::NoiseHandshakeFailed |
|
|
|
|
|
|
|
})?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
payload = handshake.write_message_vec(&payload).map_err(|e| { |
|
|
|
|
|
|
|
log_debug!("{:?}", e); |
|
|
|
|
|
|
|
ProtocolError::NoiseHandshakeFailed |
|
|
|
|
|
|
|
})?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let noise = Noise::V0(NoiseV0 { data: payload }); |
|
|
|
|
|
|
|
self.send(noise.into()).await?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.noise_handshake_state = Some(handshake); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.state = FSMstate::Noise2; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return Ok(StepReply::NONE); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pub async fn step( |
|
|
|
pub async fn step( |
|
|
|
&mut self, |
|
|
|
&mut self, |
|
|
|
mut msg_opt: Option<ProtocolMessage>, |
|
|
|
mut msg_opt: Option<ProtocolMessage>, |
|
|
@ -257,9 +300,10 @@ impl NoiseFSM { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
if msg_opt.is_some() { |
|
|
|
if msg_opt.is_some() { |
|
|
|
log_trace!("RECEIVED: {:?}", msg_opt.as_ref().unwrap()); |
|
|
|
log_debug!("RECEIVED: {:?}", msg_opt.as_ref().unwrap()); |
|
|
|
} |
|
|
|
} |
|
|
|
match self.state { |
|
|
|
match self.state { |
|
|
|
|
|
|
|
FSMstate::Closing => {} |
|
|
|
// TODO verify that ID is zero
|
|
|
|
// TODO verify that ID is zero
|
|
|
|
FSMstate::Local0 => { |
|
|
|
FSMstate::Local0 => { |
|
|
|
// CLIENT LOCAL
|
|
|
|
// CLIENT LOCAL
|
|
|
@ -277,68 +321,125 @@ impl NoiseFSM { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
FSMstate::Noise0 => { |
|
|
|
FSMstate::Start => { |
|
|
|
// CLIENT INITIALIZE NOISE
|
|
|
|
|
|
|
|
if !self.dir.is_server() && msg_opt.is_none() { |
|
|
|
if !self.dir.is_server() && msg_opt.is_none() { |
|
|
|
let mut handshake = HandshakeState::<X25519, ChaCha20Poly1305, Blake2b>::new( |
|
|
|
// CLIENT START
|
|
|
|
noise_xk(), |
|
|
|
match self.config.as_ref().unwrap() { |
|
|
|
true, |
|
|
|
StartConfig::Probe => { |
|
|
|
&[], |
|
|
|
// PROBE REQUEST
|
|
|
|
Some(from_ed_priv_to_dh_priv(self.from.take().unwrap())), |
|
|
|
let request = ProtocolMessage::Probe(MAGIC_NG_REQUEST); |
|
|
|
None, |
|
|
|
self.send(request).await?; |
|
|
|
Some(*self.to.unwrap().to_dh_from_ed().slice()), |
|
|
|
self.state = FSMstate::Probe; |
|
|
|
None, |
|
|
|
return Ok(StepReply::NONE); |
|
|
|
); |
|
|
|
} |
|
|
|
|
|
|
|
StartConfig::Relay(relay_to) => { |
|
|
|
let payload = handshake |
|
|
|
// RELAY REQUEST
|
|
|
|
.write_message_vec(&[]) |
|
|
|
//self.state
|
|
|
|
.map_err(|e| ProtocolError::NoiseHandshakeFailed)?; |
|
|
|
todo!(); |
|
|
|
|
|
|
|
} |
|
|
|
let noise = Noise::V0(NoiseV0 { data: payload }); |
|
|
|
_ => { |
|
|
|
self.send(noise.into()).await?; |
|
|
|
// CLIENT INITIALIZE NOISE
|
|
|
|
|
|
|
|
|
|
|
|
self.noise_handshake_state = Some(handshake); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.state = FSMstate::Noise1; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return Ok(StepReply::NONE); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// SERVER INITIALIZE NOISE
|
|
|
|
|
|
|
|
else if let Some(msg) = msg_opt.as_ref() { |
|
|
|
|
|
|
|
if self.dir.is_server() { |
|
|
|
|
|
|
|
if let ProtocolMessage::Noise(noise) = msg { |
|
|
|
|
|
|
|
let mut handshake = |
|
|
|
let mut handshake = |
|
|
|
HandshakeState::<X25519, ChaCha20Poly1305, Blake2b>::new( |
|
|
|
HandshakeState::<X25519, ChaCha20Poly1305, Blake2b>::new( |
|
|
|
noise_xk(), |
|
|
|
noise_xk(), |
|
|
|
false, |
|
|
|
true, |
|
|
|
&[], |
|
|
|
&[], |
|
|
|
Some(from_ed_priv_to_dh_priv(self.from.take().unwrap())), |
|
|
|
Some(from_ed_priv_to_dh_priv(self.local.take().unwrap())), |
|
|
|
None, |
|
|
|
|
|
|
|
None, |
|
|
|
None, |
|
|
|
|
|
|
|
Some(*self.remote.unwrap().to_dh_from_ed().slice()), |
|
|
|
None, |
|
|
|
None, |
|
|
|
); |
|
|
|
); |
|
|
|
|
|
|
|
|
|
|
|
let mut payload = |
|
|
|
let payload = handshake |
|
|
|
handshake.read_message_vec(noise.data()).map_err(|e| { |
|
|
|
.write_message_vec(&[]) |
|
|
|
log_debug!("{:?}", e); |
|
|
|
.map_err(|e| ProtocolError::NoiseHandshakeFailed)?; |
|
|
|
ProtocolError::NoiseHandshakeFailed |
|
|
|
|
|
|
|
})?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
payload = handshake.write_message_vec(&payload).map_err(|e| { |
|
|
|
|
|
|
|
log_debug!("{:?}", e); |
|
|
|
|
|
|
|
ProtocolError::NoiseHandshakeFailed |
|
|
|
|
|
|
|
})?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let noise = Noise::V0(NoiseV0 { data: payload }); |
|
|
|
let noise = Noise::V0(NoiseV0 { data: payload }); |
|
|
|
self.send(noise.into()).await?; |
|
|
|
self.send(noise.into()).await?; |
|
|
|
|
|
|
|
|
|
|
|
self.noise_handshake_state = Some(handshake); |
|
|
|
self.noise_handshake_state = Some(handshake); |
|
|
|
|
|
|
|
|
|
|
|
self.state = FSMstate::Noise2; |
|
|
|
self.state = FSMstate::Noise1; |
|
|
|
|
|
|
|
|
|
|
|
return Ok(StepReply::NONE); |
|
|
|
return Ok(StepReply::NONE); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
#[cfg(not(target_arch = "wasm32"))] |
|
|
|
|
|
|
|
if let Some(msg) = msg_opt.as_ref() { |
|
|
|
|
|
|
|
if self.dir.is_server() { |
|
|
|
|
|
|
|
// SERVER START
|
|
|
|
|
|
|
|
match msg { |
|
|
|
|
|
|
|
ProtocolMessage::Probe(magic) => { |
|
|
|
|
|
|
|
// PROBE REQUEST
|
|
|
|
|
|
|
|
if *magic != MAGIC_NG_REQUEST { |
|
|
|
|
|
|
|
return Err(ProtocolError::WhereIsTheMagic); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
let mut probe_response = ProbeResponse { |
|
|
|
|
|
|
|
magic: MAGIC_NG_RESPONSE.to_vec(), |
|
|
|
|
|
|
|
peer_id: None, |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
if BROKER |
|
|
|
|
|
|
|
.read() |
|
|
|
|
|
|
|
.await |
|
|
|
|
|
|
|
.authorize( |
|
|
|
|
|
|
|
&self |
|
|
|
|
|
|
|
.bind_addresses |
|
|
|
|
|
|
|
.ok_or(ProtocolError::BrokerError)? |
|
|
|
|
|
|
|
.1, |
|
|
|
|
|
|
|
Authorization::Discover, |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
.is_ok() |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
probe_response.peer_id = |
|
|
|
|
|
|
|
Some(ed_sensitive_privkey_to_pubkey( |
|
|
|
|
|
|
|
self.local |
|
|
|
|
|
|
|
.as_ref() |
|
|
|
|
|
|
|
.ok_or(ProtocolError::BrokerError)?, |
|
|
|
|
|
|
|
)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
self.send(ProtocolMessage::ProbeResponse(probe_response)) |
|
|
|
|
|
|
|
.await?; |
|
|
|
|
|
|
|
self.state = FSMstate::Closing; |
|
|
|
|
|
|
|
sleep!(std::time::Duration::from_secs(2)); |
|
|
|
|
|
|
|
return Ok(StepReply::CloseNow); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
ProtocolMessage::Relay(_) => { |
|
|
|
|
|
|
|
todo!(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
ProtocolMessage::Tunnel(_) => { |
|
|
|
|
|
|
|
self.state = FSMstate::Noise1; |
|
|
|
|
|
|
|
todo!(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
ProtocolMessage::Noise(noise) => { |
|
|
|
|
|
|
|
// SERVER INITIALIZE NOISE
|
|
|
|
|
|
|
|
return self.process_server_noise0(noise).await; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
_ => return Err(ProtocolError::InvalidState), |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
FSMstate::Probe => { |
|
|
|
|
|
|
|
// CLIENT side receiving probe response
|
|
|
|
|
|
|
|
if let Some(msg) = msg_opt { |
|
|
|
|
|
|
|
let id = msg.id(); |
|
|
|
|
|
|
|
if id != 0 { |
|
|
|
|
|
|
|
return Err(ProtocolError::InvalidState); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if let ProtocolMessage::ProbeResponse(probe_res) = &msg { |
|
|
|
|
|
|
|
return Ok(StepReply::Response(msg)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
FSMstate::Relay => {} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
FSMstate::Noise0 => { |
|
|
|
|
|
|
|
if let Some(ProtocolMessage::Noise(noise)) = msg_opt.as_ref() { |
|
|
|
|
|
|
|
if self.dir.is_server() { |
|
|
|
|
|
|
|
return self.process_server_noise0(noise).await; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
FSMstate::Noise1 => { |
|
|
|
FSMstate::Noise1 => { |
|
|
@ -379,6 +480,7 @@ impl NoiseFSM { |
|
|
|
StartConfig::Admin(admin_config) => { |
|
|
|
StartConfig::Admin(admin_config) => { |
|
|
|
todo!(); |
|
|
|
todo!(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
_ => return Err(ProtocolError::InvalidState), |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
self.noise_cipher_state_enc = Some(ciphers.0); |
|
|
|
self.noise_cipher_state_enc = Some(ciphers.0); |
|
|
@ -409,7 +511,7 @@ impl NoiseFSM { |
|
|
|
return Err(ProtocolError::NoiseHandshakeFailed); |
|
|
|
return Err(ProtocolError::NoiseHandshakeFailed); |
|
|
|
} |
|
|
|
} |
|
|
|
let peer_id = PubKey::Ed25519PubKey(handshake.get_rs().unwrap()); |
|
|
|
let peer_id = PubKey::Ed25519PubKey(handshake.get_rs().unwrap()); |
|
|
|
self.to = Some(peer_id); |
|
|
|
self.remote = Some(peer_id); |
|
|
|
let (local_bind_address, remote_bind_address) = |
|
|
|
let (local_bind_address, remote_bind_address) = |
|
|
|
self.bind_addresses.ok_or(ProtocolError::BrokerError)?; |
|
|
|
self.bind_addresses.ok_or(ProtocolError::BrokerError)?; |
|
|
|
BROKER |
|
|
|
BROKER |
|
|
@ -601,10 +703,14 @@ impl ConnectionBase { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pub async fn reset_shutdown(&self, remote_peer_id: PubKey) { |
|
|
|
pub async fn release_shutdown(&mut self) { |
|
|
|
|
|
|
|
self.shutdown_sender = None; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pub async fn reset_shutdown(&mut self, remote_peer_id: PubKey) { |
|
|
|
let _ = self |
|
|
|
let _ = self |
|
|
|
.shutdown_sender |
|
|
|
.shutdown_sender |
|
|
|
.as_ref() |
|
|
|
.take() |
|
|
|
.unwrap() |
|
|
|
.unwrap() |
|
|
|
.send(Either::Right(remote_peer_id)) |
|
|
|
.send(Either::Right(remote_peer_id)) |
|
|
|
.await; |
|
|
|
.await; |
|
|
@ -663,6 +769,10 @@ impl ConnectionBase { |
|
|
|
break; //TODO test that sending a ProtocolError effectively closes the connection (with ConnectionCommand::Close)
|
|
|
|
break; //TODO test that sending a ProtocolError effectively closes the connection (with ConnectionCommand::Close)
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
Ok(StepReply::CloseNow) => { |
|
|
|
|
|
|
|
let _ = sender.send(ConnectionCommand::Close).await; |
|
|
|
|
|
|
|
break; |
|
|
|
|
|
|
|
} |
|
|
|
Ok(StepReply::NONE) => {} |
|
|
|
Ok(StepReply::NONE) => {} |
|
|
|
Ok(StepReply::Responder(responder)) => { |
|
|
|
Ok(StepReply::Responder(responder)) => { |
|
|
|
let r = responder |
|
|
|
let r = responder |
|
|
@ -756,6 +866,63 @@ impl ConnectionBase { |
|
|
|
self.send(ConnectionCommand::Close).await; |
|
|
|
self.send(ConnectionCommand::Close).await; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pub async fn probe(&mut self) -> Result<Option<PubKey>, ProtocolError> { |
|
|
|
|
|
|
|
if !self.dir.is_server() { |
|
|
|
|
|
|
|
let config = StartConfig::Probe; |
|
|
|
|
|
|
|
let mut actor = Box::new(Actor::<Probe, ProbeResponse>::new(0, true)); |
|
|
|
|
|
|
|
self.actors.lock().await.insert(0, actor.get_receiver_tx()); |
|
|
|
|
|
|
|
let res; |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
let mut fsm = self.fsm.as_ref().unwrap().lock().await; |
|
|
|
|
|
|
|
fsm.config = Some(config); |
|
|
|
|
|
|
|
res = fsm.step(None).await; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if let Err(err) = res { |
|
|
|
|
|
|
|
self.send(ConnectionCommand::ProtocolError(err.clone())) |
|
|
|
|
|
|
|
.await; |
|
|
|
|
|
|
|
return Err(err); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
let mut receiver = actor.detach_receiver(); |
|
|
|
|
|
|
|
let mut shutdown = self.take_shutdown(); |
|
|
|
|
|
|
|
select! { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
res = async_std::future::timeout(std::time::Duration::from_secs(2),receiver.next()).fuse() => { |
|
|
|
|
|
|
|
self.fsm |
|
|
|
|
|
|
|
.as_mut() |
|
|
|
|
|
|
|
.unwrap() |
|
|
|
|
|
|
|
.lock() |
|
|
|
|
|
|
|
.await |
|
|
|
|
|
|
|
.remove_actor(0) |
|
|
|
|
|
|
|
.await; |
|
|
|
|
|
|
|
match res { |
|
|
|
|
|
|
|
Ok(Some(ConnectionCommand::Msg(ProtocolMessage::ProbeResponse(res)))) => { |
|
|
|
|
|
|
|
if res.magic == MAGIC_NG_RESPONSE { |
|
|
|
|
|
|
|
self.close().await; |
|
|
|
|
|
|
|
return Ok(res.peer_id); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
Err(_) => {} |
|
|
|
|
|
|
|
_ => {} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
self.close().await; |
|
|
|
|
|
|
|
return Err(ProtocolError::WhereIsTheMagic); |
|
|
|
|
|
|
|
}, |
|
|
|
|
|
|
|
r = shutdown.next().fuse() => { |
|
|
|
|
|
|
|
self.fsm |
|
|
|
|
|
|
|
.as_mut() |
|
|
|
|
|
|
|
.unwrap() |
|
|
|
|
|
|
|
.lock() |
|
|
|
|
|
|
|
.await |
|
|
|
|
|
|
|
.remove_actor(0) |
|
|
|
|
|
|
|
.await; |
|
|
|
|
|
|
|
return Err(ProtocolError::Closing); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
panic!("cannot call probe on a server-side connection"); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pub async fn start(&mut self, config: StartConfig) { |
|
|
|
pub async fn start(&mut self, config: StartConfig) { |
|
|
|
// BOOTSTRAP the protocol from client-side
|
|
|
|
// BOOTSTRAP the protocol from client-side
|
|
|
|
if !self.dir.is_server() { |
|
|
|
if !self.dir.is_server() { |
|
|
@ -776,8 +943,8 @@ impl ConnectionBase { |
|
|
|
pub fn start_read_loop( |
|
|
|
pub fn start_read_loop( |
|
|
|
&mut self, |
|
|
|
&mut self, |
|
|
|
bind_addresses: Option<(BindAddress, BindAddress)>, |
|
|
|
bind_addresses: Option<(BindAddress, BindAddress)>, |
|
|
|
from: Sensitive<[u8; 32]>, |
|
|
|
local: Option<Sensitive<[u8; 32]>>, |
|
|
|
to: Option<PubKey>, |
|
|
|
remote: Option<PubKey>, |
|
|
|
) { |
|
|
|
) { |
|
|
|
let (sender_tx, sender_rx) = mpsc::unbounded(); |
|
|
|
let (sender_tx, sender_rx) = mpsc::unbounded(); |
|
|
|
let (receiver_tx, receiver_rx) = mpsc::unbounded(); |
|
|
|
let (receiver_tx, receiver_rx) = mpsc::unbounded(); |
|
|
@ -792,8 +959,8 @@ impl ConnectionBase { |
|
|
|
self.dir.clone(), |
|
|
|
self.dir.clone(), |
|
|
|
Arc::clone(&self.actors), |
|
|
|
Arc::clone(&self.actors), |
|
|
|
sender_tx.clone(), |
|
|
|
sender_tx.clone(), |
|
|
|
from, |
|
|
|
local, |
|
|
|
to, |
|
|
|
remote, |
|
|
|
))); |
|
|
|
))); |
|
|
|
self.fsm = Some(Arc::clone(&fsm)); |
|
|
|
self.fsm = Some(Arc::clone(&fsm)); |
|
|
|
|
|
|
|
|
|
|
|