|
|
@ -13,10 +13,14 @@ use crate::types::*; |
|
|
|
use crate::utils::*; |
|
|
|
use crate::utils::*; |
|
|
|
use async_std::stream::StreamExt; |
|
|
|
use async_std::stream::StreamExt; |
|
|
|
use async_std::sync::Mutex; |
|
|
|
use async_std::sync::Mutex; |
|
|
|
|
|
|
|
use debug_print::debug_println; |
|
|
|
use futures::{channel::mpsc, select, Future, FutureExt, SinkExt}; |
|
|
|
use futures::{channel::mpsc, select, Future, FutureExt, SinkExt}; |
|
|
|
use noise_protocol::{CipherState, HandshakeState}; |
|
|
|
use noise_protocol::U8Array; |
|
|
|
|
|
|
|
use noise_protocol::{patterns::noise_xk, CipherState, HandshakeState}; |
|
|
|
|
|
|
|
use noise_rust_crypto::sensitive::Sensitive; |
|
|
|
use noise_rust_crypto::*; |
|
|
|
use noise_rust_crypto::*; |
|
|
|
use p2p_repo::types::{PrivKey, PubKey}; |
|
|
|
use p2p_repo::types::{PrivKey, PubKey}; |
|
|
|
|
|
|
|
use serde_bare::from_slice; |
|
|
|
use unique_id::sequence::SequenceGenerator; |
|
|
|
use unique_id::sequence::SequenceGenerator; |
|
|
|
use unique_id::Generator; |
|
|
|
use unique_id::Generator; |
|
|
|
use unique_id::GeneratorFromSeed; |
|
|
|
use unique_id::GeneratorFromSeed; |
|
|
@ -31,15 +35,26 @@ pub enum ConnectionCommand { |
|
|
|
|
|
|
|
|
|
|
|
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] |
|
|
|
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] |
|
|
|
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] |
|
|
|
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] |
|
|
|
pub trait IConnection: Send + Sync { |
|
|
|
pub trait IConnect: Send + Sync { |
|
|
|
async fn open( |
|
|
|
async fn open( |
|
|
|
&self, |
|
|
|
&self, |
|
|
|
ip: IP, |
|
|
|
ip: IP, |
|
|
|
peer_pubk: PrivKey, |
|
|
|
peer_privk: PrivKey, |
|
|
|
peer_privk: PubKey, |
|
|
|
peer_pubk: PubKey, |
|
|
|
remote_peer: DirectPeerId, |
|
|
|
remote_peer: DirectPeerId, |
|
|
|
) -> Result<ConnectionBase, NetError>; |
|
|
|
) -> Result<ConnectionBase, NetError>; |
|
|
|
async fn accept(&self) -> Result<ConnectionBase, NetError>; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] |
|
|
|
|
|
|
|
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] |
|
|
|
|
|
|
|
pub trait IAccept: Send + Sync { |
|
|
|
|
|
|
|
type Socket; |
|
|
|
|
|
|
|
async fn accept( |
|
|
|
|
|
|
|
&self, |
|
|
|
|
|
|
|
peer_privk: PrivKey, |
|
|
|
|
|
|
|
peer_pubk: PubKey, |
|
|
|
|
|
|
|
socket: Self::Socket, |
|
|
|
|
|
|
|
) -> Result<ConnectionBase, NetError>; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
#[derive(PartialEq, Debug, Clone)] |
|
|
|
#[derive(PartialEq, Debug, Clone)] |
|
|
@ -77,7 +92,11 @@ pub struct NoiseFSM { |
|
|
|
actors: Arc<Mutex<HashMap<i64, Sender<ConnectionCommand>>>>, |
|
|
|
actors: Arc<Mutex<HashMap<i64, Sender<ConnectionCommand>>>>, |
|
|
|
|
|
|
|
|
|
|
|
noise_handshake_state: Option<HandshakeState<X25519, ChaCha20Poly1305, Blake2b>>, |
|
|
|
noise_handshake_state: Option<HandshakeState<X25519, ChaCha20Poly1305, Blake2b>>, |
|
|
|
noise_cipher_state: Option<CipherState<ChaCha20Poly1305>>, |
|
|
|
noise_cipher_state_enc: Option<CipherState<ChaCha20Poly1305>>, |
|
|
|
|
|
|
|
noise_cipher_state_dec: Option<CipherState<ChaCha20Poly1305>>, |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from: PrivKey, |
|
|
|
|
|
|
|
to: Option<PubKey>, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
impl fmt::Debug for NoiseFSM { |
|
|
|
impl fmt::Debug for NoiseFSM { |
|
|
@ -101,6 +120,8 @@ impl NoiseFSM { |
|
|
|
dir: ConnectionDir, |
|
|
|
dir: ConnectionDir, |
|
|
|
actors: Arc<Mutex<HashMap<i64, Sender<ConnectionCommand>>>>, |
|
|
|
actors: Arc<Mutex<HashMap<i64, Sender<ConnectionCommand>>>>, |
|
|
|
sender: Sender<ConnectionCommand>, |
|
|
|
sender: Sender<ConnectionCommand>, |
|
|
|
|
|
|
|
from: PrivKey, |
|
|
|
|
|
|
|
to: Option<PubKey>, |
|
|
|
) -> Self { |
|
|
|
) -> Self { |
|
|
|
Self { |
|
|
|
Self { |
|
|
|
state: if tp == TransportProtocol::Local { |
|
|
|
state: if tp == TransportProtocol::Local { |
|
|
@ -112,16 +133,34 @@ impl NoiseFSM { |
|
|
|
actors, |
|
|
|
actors, |
|
|
|
sender, |
|
|
|
sender, |
|
|
|
noise_handshake_state: None, |
|
|
|
noise_handshake_state: None, |
|
|
|
noise_cipher_state: None, |
|
|
|
noise_cipher_state_enc: None, |
|
|
|
|
|
|
|
noise_cipher_state_dec: None, |
|
|
|
|
|
|
|
from, |
|
|
|
|
|
|
|
to, |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
fn decrypt(&mut self, ciphertext: &Noise) -> ProtocolMessage { |
|
|
|
fn decrypt(&mut self, ciphertext: &Noise) -> Result<ProtocolMessage, ProtocolError> { |
|
|
|
unimplemented!(); |
|
|
|
let ser = self |
|
|
|
|
|
|
|
.noise_cipher_state_dec |
|
|
|
|
|
|
|
.as_mut() |
|
|
|
|
|
|
|
.unwrap() |
|
|
|
|
|
|
|
.decrypt_vec(ciphertext.data()) |
|
|
|
|
|
|
|
.map_err(|e| ProtocolError::DecryptionError)?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Ok(from_slice::<ProtocolMessage>(&ser)?) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
fn encrypt(&mut self, plaintext: ProtocolMessage) -> Noise { |
|
|
|
fn encrypt(&mut self, plaintext: ProtocolMessage) -> Result<Noise, ProtocolError> { |
|
|
|
unimplemented!(); |
|
|
|
let ser = serde_bare::to_vec(&plaintext)?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let cipher = self |
|
|
|
|
|
|
|
.noise_cipher_state_enc |
|
|
|
|
|
|
|
.as_mut() |
|
|
|
|
|
|
|
.unwrap() |
|
|
|
|
|
|
|
.encrypt_vec(&ser); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Ok(Noise::V0(NoiseV0 { data: cipher })) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pub async fn remove_actor(&self, id: i64) { |
|
|
|
pub async fn remove_actor(&self, id: i64) { |
|
|
@ -129,8 +168,8 @@ impl NoiseFSM { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pub async fn send(&mut self, msg: ProtocolMessage) -> Result<(), ProtocolError> { |
|
|
|
pub async fn send(&mut self, msg: ProtocolMessage) -> Result<(), ProtocolError> { |
|
|
|
if self.state == FSMstate::AuthResult && self.noise_cipher_state.is_some() { |
|
|
|
if self.state == FSMstate::AuthResult && self.noise_cipher_state_enc.is_some() { |
|
|
|
let cipher = self.encrypt(msg); |
|
|
|
let cipher = self.encrypt(msg)?; |
|
|
|
self.sender |
|
|
|
self.sender |
|
|
|
.send(ConnectionCommand::Msg(ProtocolMessage::Noise(cipher))) |
|
|
|
.send(ConnectionCommand::Msg(ProtocolMessage::Noise(cipher))) |
|
|
|
.await; |
|
|
|
.await; |
|
|
@ -156,13 +195,13 @@ impl NoiseFSM { |
|
|
|
// }
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
|
|
pub fn step( |
|
|
|
pub async fn step( |
|
|
|
&mut self, |
|
|
|
&mut self, |
|
|
|
mut msg_opt: Option<ProtocolMessage>, |
|
|
|
mut msg_opt: Option<ProtocolMessage>, |
|
|
|
) -> Result<StepReply, ProtocolError> { |
|
|
|
) -> Result<StepReply, ProtocolError> { |
|
|
|
if self.noise_cipher_state.is_some() { |
|
|
|
if self.noise_cipher_state_dec.is_some() { |
|
|
|
if let Some(ProtocolMessage::Noise(noise)) = msg_opt.as_ref() { |
|
|
|
if let Some(ProtocolMessage::Noise(noise)) = msg_opt.as_ref() { |
|
|
|
let new = self.decrypt(noise); |
|
|
|
let new = self.decrypt(noise)?; |
|
|
|
msg_opt.replace(new); |
|
|
|
msg_opt.replace(new); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
return Err(ProtocolError::MustBeEncrypted); |
|
|
|
return Err(ProtocolError::MustBeEncrypted); |
|
|
@ -171,22 +210,157 @@ impl NoiseFSM { |
|
|
|
match self.state { |
|
|
|
match self.state { |
|
|
|
// TODO verify that ID is zero
|
|
|
|
// TODO verify that ID is zero
|
|
|
|
FSMstate::Local0 => { |
|
|
|
FSMstate::Local0 => { |
|
|
|
if let Some(msg) = msg_opt.as_ref() { |
|
|
|
// CLIENT LOCAL
|
|
|
|
|
|
|
|
if !self.dir.is_server() && msg_opt.is_none() { |
|
|
|
|
|
|
|
self.state = FSMstate::ClientHello; |
|
|
|
|
|
|
|
Box::new(Actor::<ClientHello, ServerHello>::new(0, true)); |
|
|
|
|
|
|
|
return Ok(StepReply::NONE); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// SERVER LOCAL
|
|
|
|
|
|
|
|
else if let Some(msg) = msg_opt.as_ref() { |
|
|
|
if self.dir.is_server() && msg.type_id() == ClientHello::Local.type_id() { |
|
|
|
if self.dir.is_server() && msg.type_id() == ClientHello::Local.type_id() { |
|
|
|
self.state = FSMstate::ServerHello; |
|
|
|
self.state = FSMstate::ServerHello; |
|
|
|
Box::new(Actor::<ClientHello, ServerHello>::new(msg.id(), false)); |
|
|
|
Box::new(Actor::<ClientHello, ServerHello>::new(msg.id(), false)); |
|
|
|
return Ok(StepReply::NONE); |
|
|
|
return Ok(StepReply::NONE); |
|
|
|
} |
|
|
|
} |
|
|
|
} else if !self.dir.is_server() && msg_opt.is_none() { |
|
|
|
} |
|
|
|
self.state = FSMstate::ClientHello; |
|
|
|
} |
|
|
|
Box::new(Actor::<ClientHello, ServerHello>::new(0, true)); |
|
|
|
FSMstate::Noise0 => { |
|
|
|
|
|
|
|
// CLIENT INITIALIZE NOISE
|
|
|
|
|
|
|
|
if !self.dir.is_server() && msg_opt.is_none() { |
|
|
|
|
|
|
|
let mut handshake = HandshakeState::<X25519, ChaCha20Poly1305, Blake2b>::new( |
|
|
|
|
|
|
|
noise_xk(), |
|
|
|
|
|
|
|
true, |
|
|
|
|
|
|
|
&[], |
|
|
|
|
|
|
|
Some(Sensitive::from_slice(self.from.slice())), |
|
|
|
|
|
|
|
None, |
|
|
|
|
|
|
|
Some(*self.to.unwrap().slice()), |
|
|
|
|
|
|
|
None, |
|
|
|
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let payload = handshake |
|
|
|
|
|
|
|
.write_message_vec(&[]) |
|
|
|
|
|
|
|
.map_err(|e| ProtocolError::NoiseHandshakeFailed)?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let noise = Noise::V0(NoiseV0 { data: payload }); |
|
|
|
|
|
|
|
self.sender.send(ConnectionCommand::Msg(noise.into())).await; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.noise_handshake_state = Some(handshake); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.state = FSMstate::Noise1; |
|
|
|
|
|
|
|
|
|
|
|
return Ok(StepReply::NONE); |
|
|
|
return Ok(StepReply::NONE); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// SERVER INITIALIZE NOISE
|
|
|
|
|
|
|
|
else if let Some(msg) = msg_opt.as_ref() { |
|
|
|
|
|
|
|
if self.dir.is_server() { |
|
|
|
|
|
|
|
if let ProtocolMessage::Noise(noise) = msg { |
|
|
|
|
|
|
|
let mut handshake = |
|
|
|
|
|
|
|
HandshakeState::<X25519, ChaCha20Poly1305, Blake2b>::new( |
|
|
|
|
|
|
|
noise_xk(), |
|
|
|
|
|
|
|
false, |
|
|
|
|
|
|
|
&[], |
|
|
|
|
|
|
|
Some(Sensitive::from_slice(self.from.slice())), |
|
|
|
|
|
|
|
None, |
|
|
|
|
|
|
|
None, |
|
|
|
|
|
|
|
None, |
|
|
|
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let payload = |
|
|
|
|
|
|
|
handshake.read_message_vec(noise.data()).map_err(|e| { |
|
|
|
|
|
|
|
debug_println!("{:?}", e); |
|
|
|
|
|
|
|
ProtocolError::NoiseHandshakeFailed |
|
|
|
|
|
|
|
})?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let noise = Noise::V0(NoiseV0 { data: payload }); |
|
|
|
|
|
|
|
self.sender.send(ConnectionCommand::Msg(noise.into())).await; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.noise_handshake_state = Some(handshake); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.state = FSMstate::Noise2; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return Ok(StepReply::NONE); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
FSMstate::Noise1 => { |
|
|
|
|
|
|
|
// CLIENT second round NOISE
|
|
|
|
|
|
|
|
if let Some(msg) = msg_opt.as_ref() { |
|
|
|
|
|
|
|
if !self.dir.is_server() { |
|
|
|
|
|
|
|
if let ProtocolMessage::Noise(noise) = msg { |
|
|
|
|
|
|
|
let handshake = self.noise_handshake_state.as_mut().unwrap(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let payload = handshake |
|
|
|
|
|
|
|
.read_message_vec(noise.data()) |
|
|
|
|
|
|
|
.map_err(|e| ProtocolError::NoiseHandshakeFailed)?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if !handshake.completed() { |
|
|
|
|
|
|
|
return Err(ProtocolError::NoiseHandshakeFailed); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let noise3 = ClientHello::Noise3(Noise::V0(NoiseV0 { data: payload })); |
|
|
|
|
|
|
|
self.sender |
|
|
|
|
|
|
|
.send(ConnectionCommand::Msg(noise3.into())) |
|
|
|
|
|
|
|
.await; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let ciphers = handshake.get_ciphers(); |
|
|
|
|
|
|
|
self.noise_cipher_state_enc = Some(ciphers.0); |
|
|
|
|
|
|
|
self.noise_cipher_state_dec = Some(ciphers.1); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.noise_handshake_state = None; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.state = FSMstate::ClientHello; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return Ok(StepReply::NONE); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
FSMstate::Noise2 => { |
|
|
|
|
|
|
|
// SERVER second round NOISE
|
|
|
|
|
|
|
|
if let Some(msg) = msg_opt.as_ref() { |
|
|
|
|
|
|
|
if self.dir.is_server() { |
|
|
|
|
|
|
|
if let ProtocolMessage::Start(StartProtocol::Client(ClientHello::Noise3( |
|
|
|
|
|
|
|
noise, |
|
|
|
|
|
|
|
))) = msg |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
let handshake = self.noise_handshake_state.as_mut().unwrap(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let _ = handshake |
|
|
|
|
|
|
|
.read_message_vec(noise.data()) |
|
|
|
|
|
|
|
.map_err(|e| ProtocolError::NoiseHandshakeFailed)?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if !handshake.completed() { |
|
|
|
|
|
|
|
return Err(ProtocolError::NoiseHandshakeFailed); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.to = Some(PubKey::Ed25519PubKey(handshake.get_rs().unwrap())); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let ciphers = handshake.get_ciphers(); |
|
|
|
|
|
|
|
self.noise_cipher_state_enc = Some(ciphers.1); |
|
|
|
|
|
|
|
self.noise_cipher_state_dec = Some(ciphers.0); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.noise_handshake_state = None; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let mut nonce_buf = [0u8; 32]; |
|
|
|
|
|
|
|
getrandom::getrandom(&mut nonce_buf).unwrap(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let server_hello = ServerHello::V0(ServerHelloV0 { |
|
|
|
|
|
|
|
nonce: nonce_buf.to_vec(), |
|
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
self.sender |
|
|
|
|
|
|
|
.send(ConnectionCommand::Msg(server_hello.into())) |
|
|
|
|
|
|
|
.await; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.state = FSMstate::ServerHello; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return Ok(StepReply::NONE); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
FSMstate::Noise0 => {} |
|
|
|
FSMstate::Noise3 => {} |
|
|
|
FSMstate::Noise1 => {} |
|
|
|
|
|
|
|
FSMstate::Noise2 => {} |
|
|
|
|
|
|
|
FSMstate::Noise3 => {} //set noise_handshake_state to none
|
|
|
|
|
|
|
|
FSMstate::ExtRequest => {} |
|
|
|
FSMstate::ExtRequest => {} |
|
|
|
FSMstate::ExtResponse => {} |
|
|
|
FSMstate::ExtResponse => {} |
|
|
|
FSMstate::ClientHello => {} |
|
|
|
FSMstate::ClientHello => {} |
|
|
@ -284,64 +458,68 @@ impl ConnectionBase { |
|
|
|
) -> ResultSend<()> { |
|
|
|
) -> ResultSend<()> { |
|
|
|
while let Some(msg) = receiver.next().await { |
|
|
|
while let Some(msg) = receiver.next().await { |
|
|
|
log!("RECEIVED: {:?}", msg); |
|
|
|
log!("RECEIVED: {:?}", msg); |
|
|
|
|
|
|
|
match msg { |
|
|
|
if let ConnectionCommand::Close = msg { |
|
|
|
ConnectionCommand::Close |
|
|
|
log!("EXIT READ LOOP"); |
|
|
|
| ConnectionCommand::Error(_) |
|
|
|
break; |
|
|
|
| ConnectionCommand::ProtocolError(_) => { |
|
|
|
} else if let ConnectionCommand::Msg(proto_msg) = msg { |
|
|
|
log!("EXIT READ LOOP"); |
|
|
|
let res; |
|
|
|
break; |
|
|
|
{ |
|
|
|
|
|
|
|
let mut locked_fsm = fsm.lock().await; |
|
|
|
|
|
|
|
res = locked_fsm.step(Some(proto_msg)); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
match res { |
|
|
|
ConnectionCommand::Msg(proto_msg) => { |
|
|
|
Err(e) => { |
|
|
|
let res; |
|
|
|
if sender |
|
|
|
{ |
|
|
|
.send(ConnectionCommand::ProtocolError(e)) |
|
|
|
let mut locked_fsm = fsm.lock().await; |
|
|
|
.await |
|
|
|
res = locked_fsm.step(Some(proto_msg)).await; |
|
|
|
.is_err() |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
break; //TODO test that sending a ProtocolError effectively closes the connection (with ConnectionCommand::Close)
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
Ok(StepReply::NONE) => {} |
|
|
|
match res { |
|
|
|
Ok(StepReply::Responder(responder)) => { |
|
|
|
Err(e) => { |
|
|
|
let r = responder |
|
|
|
|
|
|
|
.get_actor() |
|
|
|
|
|
|
|
.respond(responder, Arc::clone(&fsm)) |
|
|
|
|
|
|
|
.await; |
|
|
|
|
|
|
|
if r.is_err() { |
|
|
|
|
|
|
|
if sender |
|
|
|
if sender |
|
|
|
.send(ConnectionCommand::ProtocolError(r.unwrap_err())) |
|
|
|
.send(ConnectionCommand::ProtocolError(e)) |
|
|
|
.await |
|
|
|
.await |
|
|
|
.is_err() |
|
|
|
.is_err() |
|
|
|
{ |
|
|
|
{ |
|
|
|
break; |
|
|
|
break; //TODO test that sending a ProtocolError effectively closes the connection (with ConnectionCommand::Close)
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
Ok(StepReply::NONE) => {} |
|
|
|
Ok(StepReply::Response(response)) => { |
|
|
|
Ok(StepReply::Responder(responder)) => { |
|
|
|
let mut lock = actors.lock().await; |
|
|
|
let r = responder |
|
|
|
let exists = lock.get_mut(&response.id()); |
|
|
|
.get_actor() |
|
|
|
match exists { |
|
|
|
.respond(responder, Arc::clone(&fsm)) |
|
|
|
Some(actor_sender) => { |
|
|
|
.await; |
|
|
|
if actor_sender |
|
|
|
if r.is_err() { |
|
|
|
.send(ConnectionCommand::Msg(response)) |
|
|
|
if sender |
|
|
|
|
|
|
|
.send(ConnectionCommand::ProtocolError(r.unwrap_err())) |
|
|
|
.await |
|
|
|
.await |
|
|
|
.is_err() |
|
|
|
.is_err() |
|
|
|
{ |
|
|
|
{ |
|
|
|
break; |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
None => { |
|
|
|
} |
|
|
|
if sender |
|
|
|
Ok(StepReply::Response(response)) => { |
|
|
|
.send(ConnectionCommand::ProtocolError( |
|
|
|
let mut lock = actors.lock().await; |
|
|
|
ProtocolError::ActorError, |
|
|
|
let exists = lock.get_mut(&response.id()); |
|
|
|
)) |
|
|
|
match exists { |
|
|
|
.await |
|
|
|
Some(actor_sender) => { |
|
|
|
.is_err() |
|
|
|
if actor_sender |
|
|
|
{ |
|
|
|
.send(ConnectionCommand::Msg(response)) |
|
|
|
break; |
|
|
|
.await |
|
|
|
|
|
|
|
.is_err() |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
break; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
None => { |
|
|
|
|
|
|
|
if sender |
|
|
|
|
|
|
|
.send(ConnectionCommand::ProtocolError( |
|
|
|
|
|
|
|
ProtocolError::ActorError, |
|
|
|
|
|
|
|
)) |
|
|
|
|
|
|
|
.await |
|
|
|
|
|
|
|
.is_err() |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
break; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -396,7 +574,19 @@ impl ConnectionBase { |
|
|
|
self.send(ConnectionCommand::Close).await; |
|
|
|
self.send(ConnectionCommand::Close).await; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pub fn start_read_loop(&mut self) { |
|
|
|
pub async fn start(&mut self) { |
|
|
|
|
|
|
|
// BOOTSTRAP the protocol
|
|
|
|
|
|
|
|
if !self.dir.is_server() { |
|
|
|
|
|
|
|
let res; |
|
|
|
|
|
|
|
let fsm = self.fsm.as_ref().unwrap(); |
|
|
|
|
|
|
|
res = fsm.lock().await.step(None).await; |
|
|
|
|
|
|
|
if let Err(err) = res { |
|
|
|
|
|
|
|
self.send(ConnectionCommand::ProtocolError(err)).await; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pub fn start_read_loop(&mut self, from: PrivKey, to: Option<PubKey>) { |
|
|
|
let (sender_tx, sender_rx) = mpsc::unbounded(); |
|
|
|
let (sender_tx, sender_rx) = mpsc::unbounded(); |
|
|
|
let (receiver_tx, receiver_rx) = mpsc::unbounded(); |
|
|
|
let (receiver_tx, receiver_rx) = mpsc::unbounded(); |
|
|
|
self.sender = Some(sender_rx); |
|
|
|
self.sender = Some(sender_rx); |
|
|
@ -409,6 +599,8 @@ impl ConnectionBase { |
|
|
|
self.dir.clone(), |
|
|
|
self.dir.clone(), |
|
|
|
Arc::clone(&self.actors), |
|
|
|
Arc::clone(&self.actors), |
|
|
|
sender_tx.clone(), |
|
|
|
sender_tx.clone(), |
|
|
|
|
|
|
|
from, |
|
|
|
|
|
|
|
to, |
|
|
|
))); |
|
|
|
))); |
|
|
|
self.fsm = Some(Arc::clone(&fsm)); |
|
|
|
self.fsm = Some(Arc::clone(&fsm)); |
|
|
|
|
|
|
|
|
|
|
|