From e80c87d925f4277f77d55314cff366ed901c198f Mon Sep 17 00:00:00 2001 From: Niko PLP Date: Tue, 27 Jun 2023 03:42:04 +0300 Subject: [PATCH] added probe of broker --- Cargo.lock | 2 + ng-sdk-js/src/lib.rs | 23 ++- p2p-broker/src/server_ws.rs | 26 ++- p2p-client-ws/Cargo.toml | 1 + p2p-client-ws/src/remote_ws.rs | 84 ++++++-- p2p-client-ws/src/remote_ws_wasm.rs | 41 +++- p2p-net/Cargo.toml | 3 +- p2p-net/src/actor.rs | 4 + p2p-net/src/actors/mod.rs | 3 + p2p-net/src/actors/probe.rs | 70 +++++++ p2p-net/src/broker.rs | 57 +++++- p2p-net/src/connection.rs | 287 ++++++++++++++++++++++------ p2p-net/src/errors.rs | 1 + p2p-net/src/types.rs | 95 ++++++++- p2p-net/src/utils.rs | 7 + 15 files changed, 600 insertions(+), 104 deletions(-) create mode 100644 p2p-net/src/actors/probe.rs diff --git a/Cargo.lock b/Cargo.lock index e7aa4a2..a437247 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3011,6 +3011,7 @@ dependencies = [ "async-trait", "async-tungstenite", "chacha20", + "either", "futures", "getrandom 0.2.10", "p2p-net", @@ -3035,6 +3036,7 @@ dependencies = [ "blake3", "default-net", "ed25519-dalek", + "either", "futures", "getrandom 0.2.10", "noise-protocol", diff --git a/ng-sdk-js/src/lib.rs b/ng-sdk-js/src/lib.rs index 1dedfab..c6cad78 100644 --- a/ng-sdk-js/src/lib.rs +++ b/ng-sdk-js/src/lib.rs @@ -23,6 +23,7 @@ use p2p_net::broker::*; use p2p_net::connection::{ClientConfig, StartConfig}; use p2p_net::types::{DirectPeerId, IP}; use p2p_net::utils::{gen_ed_keys, spawn_and_log_error, Receiver, ResultSend, Sender}; +use p2p_net::WS_PORT; use p2p_repo::log::*; use p2p_repo::types::*; use p2p_repo::utils::generate_keypair; @@ -198,6 +199,23 @@ pub async fn doc_sync_branch(anuri: String, callback: &js_sys::Function) -> JsVa return ret; } +#[cfg(target_arch = "wasm32")] +#[wasm_bindgen] +pub async fn probe() { + let res = BROKER + .write() + .await + .probe( + Box::new(ConnectionWebSocket {}), + IP::try_from(&IpAddr::from_str("127.0.0.1").unwrap()).unwrap(), + WS_PORT, + ) + .await; + log_info!("broker.probe : {:?}", res); + + Broker::join_shutdown_with_timeout(std::time::Duration::from_secs(5)).await; +} + #[cfg(target_arch = "wasm32")] #[wasm_bindgen] pub async fn start() { @@ -207,7 +225,7 @@ pub async fn start() { // getrandom::getrandom(&mut random_buf).unwrap(); async fn inner_task() -> ResultSend<()> { - let server_key: PubKey = "KWdmwr4_oO62IFGfKzuyotQOixqXGNWv59CRAGvPTjM".try_into()?; + let server_key: PubKey = "X0nh-gOTGKSx0yL0LYJviOWRNacyqIzjQW_LKdK6opU".try_into()?; log_debug!("server_key:{}", server_key); //let keys = p2p_net::utils::gen_dh_keys(); @@ -227,6 +245,7 @@ pub async fn start() { .connect( Box::new(ConnectionWebSocket {}), IP::try_from(&IpAddr::from_str("127.0.0.1").unwrap()).unwrap(), + WS_PORT, None, keys.0, keys.1, @@ -290,10 +309,12 @@ pub fn change(name: &str) -> JsValue { mod test { use wasm_bindgen_test::*; wasm_bindgen_test_configure!(run_in_browser); + use crate::probe; use crate::start; #[wasm_bindgen_test] pub async fn test_connection() { + //probe().await; start().await; } } diff --git a/p2p-broker/src/server_ws.rs b/p2p-broker/src/server_ws.rs index 43c4fe2..8525664 100644 --- a/p2p-broker/src/server_ws.rs +++ b/p2p-broker/src/server_ws.rs @@ -80,6 +80,13 @@ fn make_error(code: StatusCode) -> ErrorResponse { Response::builder().status(code).body(None).unwrap() } +fn check_no_origin(origin: Option<&HeaderValue>) -> Result<(), ErrorResponse> { + match origin { + Some(_) => Err(make_error(StatusCode::FORBIDDEN)), + None => Ok(()), + } +} + fn check_origin_is_url( origin: Option<&HeaderValue>, domains: Vec, @@ -271,7 +278,7 @@ fn upgrade_ws_or_serve_app( const LOCAL_HOSTS: [&str; 3] = ["localhost", "127.0.0.1", "[::1]"]; const LOCAL_URLS: [&str; 3] = ["http://localhost", "http://127.0.0.1", "http://[::1]"]; -const APP_NG_ONE_URL: &str = "https://app.nextgraph.one"; +//const APP_NG_ONE_URL: &str = "https://app.nextgraph.one"; impl Callback for SecurityCallback { fn on_request(self, request: &Request) -> Result<(), ErrorResponse> { @@ -335,11 +342,12 @@ impl Callback for SecurityCallback { return Err(make_error(StatusCode::FORBIDDEN)); } check_no_xff(xff)?; - let mut urls_str = vec![]; - if !listener.config.refuse_clients { - urls_str.push(APP_NG_ONE_URL.to_string()); - } - check_origin_is_url(origin, urls_str)?; + check_no_origin(origin)?; + // let mut urls_str = vec![]; + // if !listener.config.refuse_clients { + // urls_str.push(APP_NG_ONE_URL.to_string()); + // } + // check_origin_is_url(origin, urls_str)?; check_host_in_addrs(host, &listener.addrs)?; log_debug!( @@ -428,9 +436,9 @@ impl Callback for SecurityCallback { .accept_forward_for .get_public_bind_addresses(); let mut urls_str = vec![]; - if !listener.config.refuse_clients { - urls_str.push(APP_NG_ONE_URL.to_string()); - } + // if !listener.config.refuse_clients { + // urls_str.push(APP_NG_ONE_URL.to_string()); + // } if listener.config.accept_direct { addrs.extend(&listener.addrs); urls_str = [ diff --git a/p2p-client-ws/Cargo.toml b/p2p-client-ws/Cargo.toml index f589725..565178e 100644 --- a/p2p-client-ws/Cargo.toml +++ b/p2p-client-ws/Cargo.toml @@ -22,6 +22,7 @@ async-oneshot = "0.5.0" ws_stream_wasm = "0.7" pharos = "0.5" wasm-bindgen = "0.2" +either = "1.8.1" [dev-dependencies] wasm-bindgen-test = "^0.3" diff --git a/p2p-client-ws/src/remote_ws.rs b/p2p-client-ws/src/remote_ws.rs index d87780e..aea4f34 100644 --- a/p2p-client-ws/src/remote_ws.rs +++ b/p2p-client-ws/src/remote_ws.rs @@ -19,7 +19,7 @@ use async_tungstenite::tungstenite::protocol::CloseFrame; use async_tungstenite::WebSocketStream; use async_std::sync::Mutex; -use futures::future::Either; +use either::Either; use futures::io::Close; use futures::{future, pin_mut, select, stream, StreamExt}; use futures::{FutureExt, SinkExt}; @@ -44,6 +44,7 @@ impl IConnect for ConnectionWebSocket { async fn open( &self, ip: IP, + port: u16, peer_privk: Sensitive<[u8; 32]>, peer_pubk: PubKey, remote_peer: DirectPeerId, @@ -51,17 +52,17 @@ impl IConnect for ConnectionWebSocket { ) -> Result { let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS); - let url = format!("ws://{}:{}", ip, WS_PORT); + let url = format!("ws://{}:{}", ip, port); let res = connect_async(url).await; - match (res) { + match res { Err(e) => { log_debug!("Cannot connect: {:?}", e); Err(NetError::ConnectionError) } - Ok((mut websocket, _)) => { - cnx.start_read_loop(None, peer_privk, Some(remote_peer)); + Ok((websocket, _)) => { + cnx.start_read_loop(None, Some(peer_privk), Some(remote_peer)); let s = cnx.take_sender(); let r = cnx.take_receiver(); let mut shutdown = cnx.set_shutdown(); @@ -73,6 +74,8 @@ impl IConnect for ConnectionWebSocket { if res.is_err() { let _ = shutdown.send(Either::Left(res.err().unwrap())).await; + } else { + let _ = shutdown.send(Either::Left(NetError::Closing)).await; } log_debug!("END of WS loop"); }); @@ -83,6 +86,41 @@ impl IConnect for ConnectionWebSocket { } } } + + async fn probe(&self, ip: IP, port: u16) -> Result, ProtocolError> { + let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS); + let url = format!("ws://{}:{}", ip, port); + + let res = connect_async(url).await; + + match res { + Err(e) => { + log_debug!("Cannot connect: {:?}", e); + Err(ProtocolError::ConnectionError) + } + Ok((websocket, _)) => { + cnx.start_read_loop(None, None, None); + let s = cnx.take_sender(); + let r = cnx.take_receiver(); + let mut shutdown = cnx.set_shutdown(); + + let join = task::spawn(async move { + log_debug!("START of WS loop"); + + let res = ws_loop(websocket, s, r).await; + + if res.is_err() { + let _ = shutdown.send(Either::Left(res.err().unwrap())).await; + } else { + let _ = shutdown.send(Either::Left(NetError::Closing)).await; + } + log_debug!("END of WS loop"); + }); + + cnx.probe().await + } + } + } } #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] @@ -100,7 +138,7 @@ impl IAccept for ConnectionWebSocket { cnx.start_read_loop( Some((local_bind_address, remote_bind_address)), - peer_privk, + Some(peer_privk), None, ); let s = cnx.take_sender(); @@ -114,6 +152,8 @@ impl IAccept for ConnectionWebSocket { if res.is_err() { let _ = shutdown.send(Either::Left(res.err().unwrap())).await; + } else { + let _ = shutdown.send(Either::Left(NetError::Closing)).await; } log_debug!("END of WS loop"); }); @@ -275,7 +315,7 @@ mod test { // let mut random_buf = [0u8; 32]; // getrandom::getrandom(&mut random_buf).unwrap(); - let server_key: PubKey = "NvMf86FnhcSJ4s9zryguepgqtNCImUM4qUoW6p_wRdA".try_into()?; + let server_key: PubKey = "X0nh-gOTGKSx0yL0LYJviOWRNacyqIzjQW_LKdK6opU".try_into()?; log_debug!("server_key:{}", server_key); //let keys = p2p_net::utils::gen_dh_keys(); @@ -295,15 +335,12 @@ mod test { .connect( Box::new(ConnectionWebSocket {}), IP::try_from(&IpAddr::from_str("127.0.0.1").unwrap()).unwrap(), + WS_PORT, None, keys.0, keys.1, server_key, - StartConfig::Client(ClientConfig { - user: user_pub_key, - client: client_pub_key, - client_priv: client_priv_key, - }), + StartConfig::Probe, ) .await; log_info!("broker.connect : {:?}", res); @@ -332,4 +369,27 @@ mod test { Broker::join_shutdown_with_timeout(std::time::Duration::from_secs(5)).await; Ok(()) } + + #[async_std::test] + pub async fn probe() -> Result<(), NgError> { + log_info!("start probe"); + { + let res = BROKER + .write() + .await + .probe( + Box::new(ConnectionWebSocket {}), + IP::try_from(&IpAddr::from_str("127.0.0.1").unwrap()).unwrap(), + WS_PORT, + ) + .await; + log_info!("broker.probe : {:?}", res); + res.expect("assume the probe succeeds"); + } + + //Broker::graceful_shutdown().await; + + Broker::join_shutdown_with_timeout(std::time::Duration::from_secs(10)).await; + Ok(()) + } } diff --git a/p2p-client-ws/src/remote_ws_wasm.rs b/p2p-client-ws/src/remote_ws_wasm.rs index c3a5a49..337faa7 100644 --- a/p2p-client-ws/src/remote_ws_wasm.rs +++ b/p2p-client-ws/src/remote_ws_wasm.rs @@ -11,6 +11,7 @@ //! WebSocket for Wasm Remote Connection to a Broker +use either::Either; use futures::FutureExt; use futures::{future, pin_mut, select, stream, SinkExt, StreamExt}; use p2p_net::connection::*; @@ -37,6 +38,7 @@ impl IConnect for ConnectionWebSocket { async fn open( &self, ip: IP, + port: u16, peer_privk: Sensitive<[u8; 32]>, peer_pubk: PubKey, remote_peer: DirectPeerId, @@ -44,14 +46,14 @@ impl IConnect for ConnectionWebSocket { ) -> Result { let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS); - let url = format!("ws://{}:{}", ip, WS_PORT); + let url = format!("ws://{}:{}", ip, port); let (mut ws, wsio) = WsMeta::connect(url, None).await.map_err(|e| { //log_info!("{:?}", e); NetError::ConnectionError })?; - cnx.start_read_loop(None, peer_privk, Some(remote_peer)); + cnx.start_read_loop(None, Some(peer_privk), Some(remote_peer)); let mut shutdown = cnx.set_shutdown(); spawn_and_log_error(ws_loop( @@ -66,6 +68,28 @@ impl IConnect for ConnectionWebSocket { Ok(cnx) } + async fn probe(&self, ip: IP, port: u16) -> Result, ProtocolError> { + let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS); + let url = format!("ws://{}:{}", ip, port); + + let (mut ws, wsio) = WsMeta::connect(url, None).await.map_err(|e| { + //log_info!("{:?}", e); + ProtocolError::ConnectionError + })?; + + cnx.start_read_loop(None, None, None); + let mut shutdown = cnx.set_shutdown(); + + spawn_and_log_error(ws_loop( + ws, + wsio, + cnx.take_sender(), + cnx.take_receiver(), + shutdown, + )); + + cnx.probe().await + } } async fn ws_loop( @@ -73,7 +97,7 @@ async fn ws_loop( mut stream: WsStream, sender: Receiver, mut receiver: Sender, - mut shutdown: Sender, + mut shutdown: Sender>, ) -> ResultSend<()> { async fn inner_loop( stream: &mut WsStream, @@ -175,10 +199,13 @@ async fn ws_loop( Some(WsEvent::WsErr(_e)) => ConnectionCommand::Error(NetError::WsError), }; if let ConnectionCommand::Error(err) = last_command.clone() { - let _ = shutdown.send(err).await; - } else if let ConnectionCommand::ProtocolError(err) = last_command.clone() { - //let _ = shutdown.send(NetError::ProtocolError).await; - } // otherwise, shutdown gracefully (with None). it is done automatically during destroy of shutdown + let _ = shutdown.send(Either::Left(err)).await; + } else { + let _ = shutdown.send(Either::Left(NetError::Closing)).await; + } + // if let ConnectionCommand::ProtocolError(err) = last_command.clone() { + //let _ = shutdown.send(Either::Left(NetError::ProtocolError)).await; + // otherwise, shutdown gracefully (with None). it is done automatically during destroy of shutdown receiver .send(last_command) diff --git a/p2p-net/Cargo.toml b/p2p-net/Cargo.toml index 7903d7b..b57c1ba 100644 --- a/p2p-net/Cargo.toml +++ b/p2p-net/Cargo.toml @@ -24,6 +24,7 @@ once_cell = "1.17.1" noise-protocol = "0.2.0-rc1" noise-rust-crypto = "0.6.0-rc.1" ed25519-dalek = "1.0.1" +either = "1.8.1" [target.'cfg(target_arch = "wasm32")'.dependencies.getrandom] version = "0.2.7" @@ -31,4 +32,4 @@ features = ["js"] [target.'cfg(not(target_arch = "wasm32"))'.dependencies] getrandom = "0.2.7" -default-net = "0.15" \ No newline at end of file +default-net = "0.15" diff --git a/p2p-net/src/actor.rs b/p2p-net/src/actor.rs index cb0094d..3df7978 100644 --- a/p2p-net/src/actor.rs +++ b/p2p-net/src/actor.rs @@ -107,6 +107,10 @@ impl< // || !self.initiator && msg.type_id() == TypeId::of::() // } + pub fn detach_receiver(&mut self) -> Receiver { + self.receiver.take().unwrap() + } + pub async fn request( &mut self, msg: ProtocolMessage, diff --git a/p2p-net/src/actors/mod.rs b/p2p-net/src/actors/mod.rs index 339b7ba..4a5268c 100644 --- a/p2p-net/src/actors/mod.rs +++ b/p2p-net/src/actors/mod.rs @@ -3,3 +3,6 @@ pub use noise::*; pub mod start; pub use start::*; + +pub mod probe; +pub use probe::*; diff --git a/p2p-net/src/actors/probe.rs b/p2p-net/src/actors/probe.rs new file mode 100644 index 0000000..123dfab --- /dev/null +++ b/p2p-net/src/actors/probe.rs @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers + * All rights reserved. + * Licensed under the Apache License, Version 2.0 + * + * or the MIT license , + * at your option. All files in the project carrying such + * notice may not be copied, modified, or distributed except + * according to those terms. +*/ + +use crate::connection::NoiseFSM; +use crate::types::{ProbeResponse, MAGIC_NG_REQUEST}; +use crate::{actor::*, errors::ProtocolError, types::ProtocolMessage}; +use async_std::sync::Mutex; +use serde::{Deserialize, Serialize}; +use std::any::{Any, TypeId}; +use std::sync::Arc; + +/// Send to probe if the server is a NextGraph broker. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Probe {} + +impl TryFrom for ProbeResponse { + type Error = ProtocolError; + fn try_from(msg: ProtocolMessage) -> Result { + if let ProtocolMessage::ProbeResponse(res) = msg { + Ok(res) + } else { + Err(ProtocolError::InvalidValue) + } + } +} + +impl TryFrom for Probe { + type Error = ProtocolError; + fn try_from(msg: ProtocolMessage) -> Result { + if let ProtocolMessage::Probe(magic) = msg { + if magic == MAGIC_NG_REQUEST { + Ok(Probe {}) + } else { + Err(ProtocolError::InvalidValue) + } + } else { + Err(ProtocolError::InvalidValue) + } + } +} + +impl From for ProtocolMessage { + fn from(msg: Probe) -> ProtocolMessage { + ProtocolMessage::Probe(MAGIC_NG_REQUEST) + } +} + +impl Actor<'_, Probe, ProbeResponse> {} + +#[async_trait::async_trait] +impl EActor for Actor<'_, Probe, ProbeResponse> { + async fn respond( + &mut self, + msg: ProtocolMessage, + fsm: Arc>, + ) -> Result<(), ProtocolError> { + let req = Probe::try_from(msg)?; + //let res = ProbeResponse() + //fsm.lock().await.send(res.into()).await?; + Ok(()) + } +} diff --git a/p2p-net/src/broker.rs b/p2p-net/src/broker.rs index aa44949..bd21132 100644 --- a/p2p-net/src/broker.rs +++ b/p2p-net/src/broker.rs @@ -17,8 +17,8 @@ use crate::utils::spawn_and_log_error; use crate::utils::{Receiver, ResultSend, Sender}; use async_std::stream::StreamExt; use async_std::sync::{Arc, RwLock}; +use either::Either; use futures::channel::mpsc; -use futures::future::Either; use futures::SinkExt; use noise_protocol::U8Array; use noise_rust_crypto::sensitive::Sensitive; @@ -119,6 +119,38 @@ impl Broker { (copy_listeners, copy_bind_addresses) } + #[cfg(not(target_arch = "wasm32"))] + pub fn authorize( + &self, + remote_bind_address: &BindAddress, + auth: Authorization, + ) -> Result<(), ProtocolError> { + match auth { + Authorization::Discover => { + let listener_id = self + .bind_addresses + .get(remote_bind_address) + .ok_or(ProtocolError::BrokerError)?; + let listener = self + .listeners + .get(listener_id) + .ok_or(ProtocolError::BrokerError)?; + if listener.config.discoverable + && remote_bind_address.ip.is_private() + && listener.config.accept_forward_for.is_no() + { + Ok(()) + } else { + Err(ProtocolError::AccessDenied) + } + } + Authorization::ExtMessage => Err(ProtocolError::AccessDenied), + Authorization::Client => Err(ProtocolError::AccessDenied), + Authorization::Core => Err(ProtocolError::AccessDenied), + Authorization::Admin => Err(ProtocolError::AccessDenied), + } + } + pub fn set_overlays_configs(&mut self, overlays_configs: Vec) { self.overlays_configs.extend(overlays_configs) } @@ -363,7 +395,7 @@ impl Broker { return Err(NetError::Closing); } - let join = connection.take_shutdown(); + let join: mpsc::UnboundedReceiver> = connection.take_shutdown(); if self .anonymous_connections .insert((local_bind_address, remote_bind_address), connection) @@ -419,7 +451,7 @@ impl Broker { core: Option, ) -> Result<(), NetError> { log_debug!("ATTACH PEER_ID {}", remote_peer_id); - let connection = self + let mut connection = self .anonymous_connections .remove(&(local_bind_address, remote_bind_address)) .ok_or(NetError::InternalError)?; @@ -448,10 +480,23 @@ impl Broker { Ok(()) } + pub async fn probe( + &mut self, + cnx: Box, + ip: IP, + port: u16, + ) -> Result, ProtocolError> { + if self.closing { + return Err(ProtocolError::Closing); + } + cnx.probe(ip, port).await + } + pub async fn connect( &mut self, cnx: Box, ip: IP, + port: u16, core: Option, // the interface used as egress for this connection peer_privk: Sensitive<[u8; 32]>, peer_pubk: PubKey, @@ -469,6 +514,7 @@ impl Broker { let mut connection = cnx .open( ip, + port, Sensitive::<[u8; 32]>::from_slice(peer_privk.deref()), peer_pubk, remote_peer_id, @@ -509,7 +555,10 @@ impl Broker { async move { let res = join.next().await; log_info!("SOCKET IS CLOSED {:?} {:?}", res, &remote_peer_id); - if res.is_some() { + if res.is_some() + && res.as_ref().unwrap().is_left() + && res.unwrap().unwrap_left() != NetError::Closing + { // we intend to reconnect let mut broker = BROKER.write().await; broker.reconnecting(&remote_peer_id); diff --git a/p2p-net/src/connection.rs b/p2p-net/src/connection.rs index a009e41..bb464dd 100644 --- a/p2p-net/src/connection.rs +++ b/p2p-net/src/connection.rs @@ -22,9 +22,10 @@ use crate::errors::NetError; use crate::errors::ProtocolError; use crate::types::*; use crate::utils::*; +use async_std::future::TimeoutError; use async_std::stream::StreamExt; use async_std::sync::Mutex; -use futures::future::Either; +use either::Either; use futures::{channel::mpsc, select, Future, FutureExt, SinkExt}; use noise_protocol::U8Array; use noise_protocol::{patterns::noise_xk, CipherState, HandshakeState}; @@ -52,11 +53,14 @@ pub trait IConnect: Send + Sync { async fn open( &self, ip: IP, + port: u16, peer_privk: Sensitive<[u8; 32]>, peer_pubk: PubKey, remote_peer: DirectPeerId, config: StartConfig, ) -> Result; + + async fn probe(&self, ip: IP, port: u16) -> Result, ProtocolError>; } #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] @@ -87,6 +91,9 @@ impl ConnectionDir { #[derive(Debug, PartialEq)] pub enum FSMstate { Local0, + Start, + Probe, + Relay, Noise0, Noise1, Noise2, @@ -97,6 +104,7 @@ pub enum FSMstate { ServerHello, ClientAuth, AuthResult, + Closing, } pub struct NoiseFSM { @@ -104,6 +112,7 @@ pub struct NoiseFSM { dir: ConnectionDir, sender: Sender, + /// first is local, second is remote bind_addresses: Option<(BindAddress, BindAddress)>, actors: Arc>>>, @@ -112,8 +121,8 @@ pub struct NoiseFSM { noise_cipher_state_enc: Option>, noise_cipher_state_dec: Option>, - from: Option>, - to: Option, + local: Option>, + remote: Option, nonce_for_hello: Vec, config: Option, @@ -132,6 +141,7 @@ pub enum StepReply { Responder(ProtocolMessage), Response(ProtocolMessage), NONE, + CloseNow, } pub struct ClientConfig { @@ -147,6 +157,8 @@ pub struct CoreConfig {} pub struct AdminConfig {} pub enum StartConfig { + Probe, + Relay(BindAddress), Client(ClientConfig), Ext(ExtConfig), Core(CoreConfig), @@ -160,14 +172,14 @@ impl NoiseFSM { dir: ConnectionDir, actors: Arc>>>, sender: Sender, - from: Sensitive<[u8; 32]>, - to: Option, + local: Option>, + remote: Option, ) -> Self { Self { state: if tp == TransportProtocol::Local { FSMstate::Local0 } else { - FSMstate::Noise0 + FSMstate::Start }, dir, bind_addresses, @@ -176,8 +188,8 @@ impl NoiseFSM { noise_handshake_state: None, noise_cipher_state_enc: None, noise_cipher_state_dec: None, - from: Some(from), - to, + local, + remote, nonce_for_hello: vec![], config: None, } @@ -211,7 +223,7 @@ impl NoiseFSM { } pub async fn send(&mut self, msg: ProtocolMessage) -> Result<(), ProtocolError> { - log_trace!("SENDING: {:?}", msg); + log_debug!("SENDING: {:?}", msg); if self.noise_cipher_state_enc.is_some() { let cipher = self.encrypt(msg)?; self.sender @@ -244,6 +256,37 @@ impl NoiseFSM { // } // } + async fn process_server_noise0(&mut self, noise: &Noise) -> Result { + let mut handshake = HandshakeState::::new( + noise_xk(), + false, + &[], + Some(from_ed_priv_to_dh_priv(self.local.take().unwrap())), + None, + None, + None, + ); + + let mut payload = handshake.read_message_vec(noise.data()).map_err(|e| { + log_debug!("{:?}", e); + ProtocolError::NoiseHandshakeFailed + })?; + + payload = handshake.write_message_vec(&payload).map_err(|e| { + log_debug!("{:?}", e); + ProtocolError::NoiseHandshakeFailed + })?; + + let noise = Noise::V0(NoiseV0 { data: payload }); + self.send(noise.into()).await?; + + self.noise_handshake_state = Some(handshake); + + self.state = FSMstate::Noise2; + + return Ok(StepReply::NONE); + } + pub async fn step( &mut self, mut msg_opt: Option, @@ -257,9 +300,10 @@ impl NoiseFSM { } } if msg_opt.is_some() { - log_trace!("RECEIVED: {:?}", msg_opt.as_ref().unwrap()); + log_debug!("RECEIVED: {:?}", msg_opt.as_ref().unwrap()); } match self.state { + FSMstate::Closing => {} // TODO verify that ID is zero FSMstate::Local0 => { // CLIENT LOCAL @@ -277,68 +321,125 @@ impl NoiseFSM { } } } - FSMstate::Noise0 => { - // CLIENT INITIALIZE NOISE + FSMstate::Start => { if !self.dir.is_server() && msg_opt.is_none() { - let mut handshake = HandshakeState::::new( - noise_xk(), - true, - &[], - Some(from_ed_priv_to_dh_priv(self.from.take().unwrap())), - None, - Some(*self.to.unwrap().to_dh_from_ed().slice()), - None, - ); - - let payload = handshake - .write_message_vec(&[]) - .map_err(|e| ProtocolError::NoiseHandshakeFailed)?; - - let noise = Noise::V0(NoiseV0 { data: payload }); - self.send(noise.into()).await?; - - self.noise_handshake_state = Some(handshake); - - self.state = FSMstate::Noise1; - - return Ok(StepReply::NONE); - } - // SERVER INITIALIZE NOISE - else if let Some(msg) = msg_opt.as_ref() { - if self.dir.is_server() { - if let ProtocolMessage::Noise(noise) = msg { + // CLIENT START + match self.config.as_ref().unwrap() { + StartConfig::Probe => { + // PROBE REQUEST + let request = ProtocolMessage::Probe(MAGIC_NG_REQUEST); + self.send(request).await?; + self.state = FSMstate::Probe; + return Ok(StepReply::NONE); + } + StartConfig::Relay(relay_to) => { + // RELAY REQUEST + //self.state + todo!(); + } + _ => { + // CLIENT INITIALIZE NOISE let mut handshake = HandshakeState::::new( noise_xk(), - false, + true, &[], - Some(from_ed_priv_to_dh_priv(self.from.take().unwrap())), - None, + Some(from_ed_priv_to_dh_priv(self.local.take().unwrap())), None, + Some(*self.remote.unwrap().to_dh_from_ed().slice()), None, ); - let mut payload = - handshake.read_message_vec(noise.data()).map_err(|e| { - log_debug!("{:?}", e); - ProtocolError::NoiseHandshakeFailed - })?; - - payload = handshake.write_message_vec(&payload).map_err(|e| { - log_debug!("{:?}", e); - ProtocolError::NoiseHandshakeFailed - })?; + let payload = handshake + .write_message_vec(&[]) + .map_err(|e| ProtocolError::NoiseHandshakeFailed)?; let noise = Noise::V0(NoiseV0 { data: payload }); self.send(noise.into()).await?; self.noise_handshake_state = Some(handshake); - self.state = FSMstate::Noise2; + self.state = FSMstate::Noise1; return Ok(StepReply::NONE); } } + } else { + #[cfg(not(target_arch = "wasm32"))] + if let Some(msg) = msg_opt.as_ref() { + if self.dir.is_server() { + // SERVER START + match msg { + ProtocolMessage::Probe(magic) => { + // PROBE REQUEST + if *magic != MAGIC_NG_REQUEST { + return Err(ProtocolError::WhereIsTheMagic); + } + let mut probe_response = ProbeResponse { + magic: MAGIC_NG_RESPONSE.to_vec(), + peer_id: None, + }; + if BROKER + .read() + .await + .authorize( + &self + .bind_addresses + .ok_or(ProtocolError::BrokerError)? + .1, + Authorization::Discover, + ) + .is_ok() + { + probe_response.peer_id = + Some(ed_sensitive_privkey_to_pubkey( + self.local + .as_ref() + .ok_or(ProtocolError::BrokerError)?, + )); + } + self.send(ProtocolMessage::ProbeResponse(probe_response)) + .await?; + self.state = FSMstate::Closing; + sleep!(std::time::Duration::from_secs(2)); + return Ok(StepReply::CloseNow); + } + ProtocolMessage::Relay(_) => { + todo!(); + } + ProtocolMessage::Tunnel(_) => { + self.state = FSMstate::Noise1; + todo!(); + } + ProtocolMessage::Noise(noise) => { + // SERVER INITIALIZE NOISE + return self.process_server_noise0(noise).await; + } + _ => return Err(ProtocolError::InvalidState), + } + } + } + } + } + FSMstate::Probe => { + // CLIENT side receiving probe response + if let Some(msg) = msg_opt { + let id = msg.id(); + if id != 0 { + return Err(ProtocolError::InvalidState); + } + if let ProtocolMessage::ProbeResponse(probe_res) = &msg { + return Ok(StepReply::Response(msg)); + } + } + } + FSMstate::Relay => {} + + FSMstate::Noise0 => { + if let Some(ProtocolMessage::Noise(noise)) = msg_opt.as_ref() { + if self.dir.is_server() { + return self.process_server_noise0(noise).await; + } } } FSMstate::Noise1 => { @@ -379,6 +480,7 @@ impl NoiseFSM { StartConfig::Admin(admin_config) => { todo!(); } + _ => return Err(ProtocolError::InvalidState), } self.noise_cipher_state_enc = Some(ciphers.0); @@ -409,7 +511,7 @@ impl NoiseFSM { return Err(ProtocolError::NoiseHandshakeFailed); } let peer_id = PubKey::Ed25519PubKey(handshake.get_rs().unwrap()); - self.to = Some(peer_id); + self.remote = Some(peer_id); let (local_bind_address, remote_bind_address) = self.bind_addresses.ok_or(ProtocolError::BrokerError)?; BROKER @@ -601,10 +703,14 @@ impl ConnectionBase { } } - pub async fn reset_shutdown(&self, remote_peer_id: PubKey) { + pub async fn release_shutdown(&mut self) { + self.shutdown_sender = None; + } + + pub async fn reset_shutdown(&mut self, remote_peer_id: PubKey) { let _ = self .shutdown_sender - .as_ref() + .take() .unwrap() .send(Either::Right(remote_peer_id)) .await; @@ -663,6 +769,10 @@ impl ConnectionBase { break; //TODO test that sending a ProtocolError effectively closes the connection (with ConnectionCommand::Close) } } + Ok(StepReply::CloseNow) => { + let _ = sender.send(ConnectionCommand::Close).await; + break; + } Ok(StepReply::NONE) => {} Ok(StepReply::Responder(responder)) => { let r = responder @@ -756,6 +866,63 @@ impl ConnectionBase { self.send(ConnectionCommand::Close).await; } + pub async fn probe(&mut self) -> Result, ProtocolError> { + if !self.dir.is_server() { + let config = StartConfig::Probe; + let mut actor = Box::new(Actor::::new(0, true)); + self.actors.lock().await.insert(0, actor.get_receiver_tx()); + let res; + { + let mut fsm = self.fsm.as_ref().unwrap().lock().await; + fsm.config = Some(config); + res = fsm.step(None).await; + } + if let Err(err) = res { + self.send(ConnectionCommand::ProtocolError(err.clone())) + .await; + return Err(err); + } + let mut receiver = actor.detach_receiver(); + let mut shutdown = self.take_shutdown(); + select! { + + res = async_std::future::timeout(std::time::Duration::from_secs(2),receiver.next()).fuse() => { + self.fsm + .as_mut() + .unwrap() + .lock() + .await + .remove_actor(0) + .await; + match res { + Ok(Some(ConnectionCommand::Msg(ProtocolMessage::ProbeResponse(res)))) => { + if res.magic == MAGIC_NG_RESPONSE { + self.close().await; + return Ok(res.peer_id); + } + } + Err(_) => {} + _ => {} + } + self.close().await; + return Err(ProtocolError::WhereIsTheMagic); + }, + r = shutdown.next().fuse() => { + self.fsm + .as_mut() + .unwrap() + .lock() + .await + .remove_actor(0) + .await; + return Err(ProtocolError::Closing); + } + } + } else { + panic!("cannot call probe on a server-side connection"); + } + } + pub async fn start(&mut self, config: StartConfig) { // BOOTSTRAP the protocol from client-side if !self.dir.is_server() { @@ -776,8 +943,8 @@ impl ConnectionBase { pub fn start_read_loop( &mut self, bind_addresses: Option<(BindAddress, BindAddress)>, - from: Sensitive<[u8; 32]>, - to: Option, + local: Option>, + remote: Option, ) { let (sender_tx, sender_rx) = mpsc::unbounded(); let (receiver_tx, receiver_rx) = mpsc::unbounded(); @@ -792,8 +959,8 @@ impl ConnectionBase { self.dir.clone(), Arc::clone(&self.actors), sender_tx.clone(), - from, - to, + local, + remote, ))); self.fsm = Some(Arc::clone(&fsm)); diff --git a/p2p-net/src/errors.rs b/p2p-net/src/errors.rs index 6798205..f3656b8 100644 --- a/p2p-net/src/errors.rs +++ b/p2p-net/src/errors.rs @@ -79,6 +79,7 @@ pub enum ProtocolError { NoiseHandshakeFailed, DecryptionError, EncryptionError, + WhereIsTheMagic, InvalidNonce, } //MAX 949 ProtocolErrors diff --git a/p2p-net/src/types.rs b/p2p-net/src/types.rs index 0a2db6a..635fa96 100644 --- a/p2p-net/src/types.rs +++ b/p2p-net/src/types.rs @@ -219,6 +219,12 @@ impl AcceptForwardForV0 { _ => false, } } + pub fn is_no(&self) -> bool { + match self { + AcceptForwardForV0::No => true, + _ => false, + } + } pub fn is_public_dyn(&self) -> bool { match self { AcceptForwardForV0::PublicDyn(_) => true, @@ -2121,8 +2127,79 @@ impl TryFrom for ExtResponse { /// /// PROTOCOL MESSAGES /// + +pub static MAGIC_NG_REQUEST: [u8; 2] = [78u8, 71u8]; +pub static MAGIC_NG_RESPONSE: [u8; 4] = [89u8, 88u8, 78u8, 75u8]; + +#[derive(Clone, Debug)] +pub enum Authorization { + Discover, + ExtMessage, + Client, + Core, + Admin, +} + +/// ProbeResponse +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ProbeResponse { + /// Response Magic number + #[serde(with = "serde_bytes")] + pub magic: Vec, + + /// Used for discovery of broker on private LAN + /// see ListenerV0.discoverable + pub peer_id: Option, +} + +/// RelayRequest +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct RelayRequest { + /// The BindAddress of the broker to relay to should be of the same IP family than the TunnelRequest.remote_addr + pub address: BindAddress, +} + +/// RelayResponse +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct RelayResponse { + /// Response Magic number + #[serde(with = "serde_bytes")] + pub magic: Vec, + + /// result to the relay request (accept, refuse) + pub result: u16, +} + +/// Tunnel Request +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct TunnelRequest { + /// Request Magic number + #[serde(with = "serde_bytes")] + pub magic: Vec, + + // Bind address of client as connected to the relaying broker. + pub remote_addr: BindAddress, +} + +/// Tunnel Response +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct TunnelResponse { + /// Response Magic number + #[serde(with = "serde_bytes")] + pub magic: Vec, + + /// result to the tunnel request (accept, refuse) + pub result: u16, +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub enum ProtocolMessage { + Probe([u8; 2]), + ProbeResponse(ProbeResponse), + Relay(RelayRequest), + RelayResponse(RelayResponse), + Tunnel(TunnelRequest), + TunnelResponse(TunnelResponse), Noise(Noise), Start(StartProtocol), ServerHello(ServerHello), @@ -2136,26 +2213,18 @@ pub enum ProtocolMessage { impl ProtocolMessage { pub fn id(&self) -> i64 { match self { - ProtocolMessage::Noise(_) => 0, - ProtocolMessage::Start(_) => 0, - ProtocolMessage::ServerHello(_) => 0, - ProtocolMessage::ClientAuth(_) => 0, - ProtocolMessage::AuthResult(_) => 0, ProtocolMessage::ExtRequest(ext_req) => ext_req.id(), ProtocolMessage::ExtResponse(ext_res) => ext_res.id(), ProtocolMessage::BrokerMessage(broker_msg) => broker_msg.id(), + _ => 0, } } pub fn set_id(&mut self, id: i64) { match self { - ProtocolMessage::Noise(_) => panic!("cannot set ID"), - ProtocolMessage::Start(_) => panic!("cannot set ID"), - ProtocolMessage::ServerHello(_) => panic!("cannot set ID"), - ProtocolMessage::ClientAuth(_) => panic!("cannot set ID"), - ProtocolMessage::AuthResult(_) => panic!("cannot set ID"), ProtocolMessage::ExtRequest(ext_req) => ext_req.set_id(id), ProtocolMessage::ExtResponse(ext_res) => ext_res.set_id(id), ProtocolMessage::BrokerMessage(broker_msg) => broker_msg.set_id(id), + _ => panic!("cannot set ID"), } } pub fn type_id(&self) -> TypeId { @@ -2168,6 +2237,12 @@ impl ProtocolMessage { ProtocolMessage::ExtRequest(a) => a.type_id(), ProtocolMessage::ExtResponse(a) => a.type_id(), ProtocolMessage::BrokerMessage(a) => a.type_id(), + ProtocolMessage::Probe(a) => a.type_id(), + ProtocolMessage::ProbeResponse(a) => a.type_id(), + ProtocolMessage::Relay(a) => a.type_id(), + ProtocolMessage::RelayResponse(a) => a.type_id(), + ProtocolMessage::Tunnel(a) => a.type_id(), + ProtocolMessage::TunnelResponse(a) => a.type_id(), } } diff --git a/p2p-net/src/utils.rs b/p2p-net/src/utils.rs index 87e47c2..70c2efc 100644 --- a/p2p-net/src/utils.rs +++ b/p2p-net/src/utils.rs @@ -60,6 +60,13 @@ pub fn keypair_from_ed(secret: SecretKey, public: PublicKey) -> (Sensitive<[u8; (priv_key, pub_key) } +pub fn ed_sensitive_privkey_to_pubkey(privkey: &Sensitive<[u8; 32]>) -> PubKey { + //TODO FIXME do not create a SecretKey or call into() on it, as this is not using Sensitive<> + let sk = SecretKey::from_bytes(privkey.as_slice()).unwrap(); + let pk: PublicKey = (&sk).into(); + PubKey::Ed25519PubKey(pk.to_bytes()) +} + pub fn keys_from_bytes(secret_key: [u8; 32]) -> (Sensitive<[u8; 32]>, PubKey) { let sk = SecretKey::from_bytes(&secret_key).unwrap(); let pk: PublicKey = (&sk).into();