added probe of broker

Niko PLP 10 months ago
parent e76b104a5d
commit 4045ef3d14
  1. 2
      Cargo.lock
  2. 23
      ng-sdk-js/src/lib.rs
  3. 26
      p2p-broker/src/server_ws.rs
  4. 1
      p2p-client-ws/Cargo.toml
  5. 84
      p2p-client-ws/src/remote_ws.rs
  6. 41
      p2p-client-ws/src/remote_ws_wasm.rs
  7. 3
      p2p-net/Cargo.toml
  8. 4
      p2p-net/src/actor.rs
  9. 3
      p2p-net/src/actors/mod.rs
  10. 70
      p2p-net/src/actors/probe.rs
  11. 57
      p2p-net/src/broker.rs
  12. 287
      p2p-net/src/connection.rs
  13. 1
      p2p-net/src/errors.rs
  14. 95
      p2p-net/src/types.rs
  15. 7
      p2p-net/src/utils.rs

2
Cargo.lock generated

@ -3011,6 +3011,7 @@ dependencies = [
"async-trait",
"async-tungstenite",
"chacha20",
"either",
"futures",
"getrandom 0.2.10",
"p2p-net",
@ -3035,6 +3036,7 @@ dependencies = [
"blake3",
"default-net",
"ed25519-dalek",
"either",
"futures",
"getrandom 0.2.10",
"noise-protocol",

@ -23,6 +23,7 @@ use p2p_net::broker::*;
use p2p_net::connection::{ClientConfig, StartConfig};
use p2p_net::types::{DirectPeerId, IP};
use p2p_net::utils::{gen_ed_keys, spawn_and_log_error, Receiver, ResultSend, Sender};
use p2p_net::WS_PORT;
use p2p_repo::log::*;
use p2p_repo::types::*;
use p2p_repo::utils::generate_keypair;
@ -198,6 +199,23 @@ pub async fn doc_sync_branch(anuri: String, callback: &js_sys::Function) -> JsVa
return ret;
}
#[cfg(target_arch = "wasm32")]
#[wasm_bindgen]
pub async fn probe() {
let res = BROKER
.write()
.await
.probe(
Box::new(ConnectionWebSocket {}),
IP::try_from(&IpAddr::from_str("127.0.0.1").unwrap()).unwrap(),
WS_PORT,
)
.await;
log_info!("broker.probe : {:?}", res);
Broker::join_shutdown_with_timeout(std::time::Duration::from_secs(5)).await;
}
#[cfg(target_arch = "wasm32")]
#[wasm_bindgen]
pub async fn start() {
@ -207,7 +225,7 @@ pub async fn start() {
// getrandom::getrandom(&mut random_buf).unwrap();
async fn inner_task() -> ResultSend<()> {
let server_key: PubKey = "KWdmwr4_oO62IFGfKzuyotQOixqXGNWv59CRAGvPTjM".try_into()?;
let server_key: PubKey = "X0nh-gOTGKSx0yL0LYJviOWRNacyqIzjQW_LKdK6opU".try_into()?;
log_debug!("server_key:{}", server_key);
//let keys = p2p_net::utils::gen_dh_keys();
@ -227,6 +245,7 @@ pub async fn start() {
.connect(
Box::new(ConnectionWebSocket {}),
IP::try_from(&IpAddr::from_str("127.0.0.1").unwrap()).unwrap(),
WS_PORT,
None,
keys.0,
keys.1,
@ -290,10 +309,12 @@ pub fn change(name: &str) -> JsValue {
mod test {
use wasm_bindgen_test::*;
wasm_bindgen_test_configure!(run_in_browser);
use crate::probe;
use crate::start;
#[wasm_bindgen_test]
pub async fn test_connection() {
//probe().await;
start().await;
}
}

@ -80,6 +80,13 @@ fn make_error(code: StatusCode) -> ErrorResponse {
Response::builder().status(code).body(None).unwrap()
}
fn check_no_origin(origin: Option<&HeaderValue>) -> Result<(), ErrorResponse> {
match origin {
Some(_) => Err(make_error(StatusCode::FORBIDDEN)),
None => Ok(()),
}
}
fn check_origin_is_url(
origin: Option<&HeaderValue>,
domains: Vec<String>,
@ -271,7 +278,7 @@ fn upgrade_ws_or_serve_app(
const LOCAL_HOSTS: [&str; 3] = ["localhost", "127.0.0.1", "[::1]"];
const LOCAL_URLS: [&str; 3] = ["http://localhost", "http://127.0.0.1", "http://[::1]"];
const APP_NG_ONE_URL: &str = "https://app.nextgraph.one";
//const APP_NG_ONE_URL: &str = "https://app.nextgraph.one";
impl Callback for SecurityCallback {
fn on_request(self, request: &Request) -> Result<(), ErrorResponse> {
@ -335,11 +342,12 @@ impl Callback for SecurityCallback {
return Err(make_error(StatusCode::FORBIDDEN));
}
check_no_xff(xff)?;
let mut urls_str = vec![];
if !listener.config.refuse_clients {
urls_str.push(APP_NG_ONE_URL.to_string());
}
check_origin_is_url(origin, urls_str)?;
check_no_origin(origin)?;
// let mut urls_str = vec![];
// if !listener.config.refuse_clients {
// urls_str.push(APP_NG_ONE_URL.to_string());
// }
// check_origin_is_url(origin, urls_str)?;
check_host_in_addrs(host, &listener.addrs)?;
log_debug!(
@ -428,9 +436,9 @@ impl Callback for SecurityCallback {
.accept_forward_for
.get_public_bind_addresses();
let mut urls_str = vec![];
if !listener.config.refuse_clients {
urls_str.push(APP_NG_ONE_URL.to_string());
}
// if !listener.config.refuse_clients {
// urls_str.push(APP_NG_ONE_URL.to_string());
// }
if listener.config.accept_direct {
addrs.extend(&listener.addrs);
urls_str = [

@ -22,6 +22,7 @@ async-oneshot = "0.5.0"
ws_stream_wasm = "0.7"
pharos = "0.5"
wasm-bindgen = "0.2"
either = "1.8.1"
[dev-dependencies]
wasm-bindgen-test = "^0.3"

@ -19,7 +19,7 @@ use async_tungstenite::tungstenite::protocol::CloseFrame;
use async_tungstenite::WebSocketStream;
use async_std::sync::Mutex;
use futures::future::Either;
use either::Either;
use futures::io::Close;
use futures::{future, pin_mut, select, stream, StreamExt};
use futures::{FutureExt, SinkExt};
@ -44,6 +44,7 @@ impl IConnect for ConnectionWebSocket {
async fn open(
&self,
ip: IP,
port: u16,
peer_privk: Sensitive<[u8; 32]>,
peer_pubk: PubKey,
remote_peer: DirectPeerId,
@ -51,17 +52,17 @@ impl IConnect for ConnectionWebSocket {
) -> Result<ConnectionBase, NetError> {
let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS);
let url = format!("ws://{}:{}", ip, WS_PORT);
let url = format!("ws://{}:{}", ip, port);
let res = connect_async(url).await;
match (res) {
match res {
Err(e) => {
log_debug!("Cannot connect: {:?}", e);
Err(NetError::ConnectionError)
}
Ok((mut websocket, _)) => {
cnx.start_read_loop(None, peer_privk, Some(remote_peer));
Ok((websocket, _)) => {
cnx.start_read_loop(None, Some(peer_privk), Some(remote_peer));
let s = cnx.take_sender();
let r = cnx.take_receiver();
let mut shutdown = cnx.set_shutdown();
@ -73,6 +74,8 @@ impl IConnect for ConnectionWebSocket {
if res.is_err() {
let _ = shutdown.send(Either::Left(res.err().unwrap())).await;
} else {
let _ = shutdown.send(Either::Left(NetError::Closing)).await;
}
log_debug!("END of WS loop");
});
@ -83,6 +86,41 @@ impl IConnect for ConnectionWebSocket {
}
}
}
async fn probe(&self, ip: IP, port: u16) -> Result<Option<PubKey>, ProtocolError> {
let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS);
let url = format!("ws://{}:{}", ip, port);
let res = connect_async(url).await;
match res {
Err(e) => {
log_debug!("Cannot connect: {:?}", e);
Err(ProtocolError::ConnectionError)
}
Ok((websocket, _)) => {
cnx.start_read_loop(None, None, None);
let s = cnx.take_sender();
let r = cnx.take_receiver();
let mut shutdown = cnx.set_shutdown();
let join = task::spawn(async move {
log_debug!("START of WS loop");
let res = ws_loop(websocket, s, r).await;
if res.is_err() {
let _ = shutdown.send(Either::Left(res.err().unwrap())).await;
} else {
let _ = shutdown.send(Either::Left(NetError::Closing)).await;
}
log_debug!("END of WS loop");
});
cnx.probe().await
}
}
}
}
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
@ -100,7 +138,7 @@ impl IAccept for ConnectionWebSocket {
cnx.start_read_loop(
Some((local_bind_address, remote_bind_address)),
peer_privk,
Some(peer_privk),
None,
);
let s = cnx.take_sender();
@ -114,6 +152,8 @@ impl IAccept for ConnectionWebSocket {
if res.is_err() {
let _ = shutdown.send(Either::Left(res.err().unwrap())).await;
} else {
let _ = shutdown.send(Either::Left(NetError::Closing)).await;
}
log_debug!("END of WS loop");
});
@ -275,7 +315,7 @@ mod test {
// let mut random_buf = [0u8; 32];
// getrandom::getrandom(&mut random_buf).unwrap();
let server_key: PubKey = "NvMf86FnhcSJ4s9zryguepgqtNCImUM4qUoW6p_wRdA".try_into()?;
let server_key: PubKey = "X0nh-gOTGKSx0yL0LYJviOWRNacyqIzjQW_LKdK6opU".try_into()?;
log_debug!("server_key:{}", server_key);
//let keys = p2p_net::utils::gen_dh_keys();
@ -295,15 +335,12 @@ mod test {
.connect(
Box::new(ConnectionWebSocket {}),
IP::try_from(&IpAddr::from_str("127.0.0.1").unwrap()).unwrap(),
WS_PORT,
None,
keys.0,
keys.1,
server_key,
StartConfig::Client(ClientConfig {
user: user_pub_key,
client: client_pub_key,
client_priv: client_priv_key,
}),
StartConfig::Probe,
)
.await;
log_info!("broker.connect : {:?}", res);
@ -332,4 +369,27 @@ mod test {
Broker::join_shutdown_with_timeout(std::time::Duration::from_secs(5)).await;
Ok(())
}
#[async_std::test]
pub async fn probe() -> Result<(), NgError> {
log_info!("start probe");
{
let res = BROKER
.write()
.await
.probe(
Box::new(ConnectionWebSocket {}),
IP::try_from(&IpAddr::from_str("127.0.0.1").unwrap()).unwrap(),
WS_PORT,
)
.await;
log_info!("broker.probe : {:?}", res);
res.expect("assume the probe succeeds");
}
//Broker::graceful_shutdown().await;
Broker::join_shutdown_with_timeout(std::time::Duration::from_secs(10)).await;
Ok(())
}
}

@ -11,6 +11,7 @@
//! WebSocket for Wasm Remote Connection to a Broker
use either::Either;
use futures::FutureExt;
use futures::{future, pin_mut, select, stream, SinkExt, StreamExt};
use p2p_net::connection::*;
@ -37,6 +38,7 @@ impl IConnect for ConnectionWebSocket {
async fn open(
&self,
ip: IP,
port: u16,
peer_privk: Sensitive<[u8; 32]>,
peer_pubk: PubKey,
remote_peer: DirectPeerId,
@ -44,14 +46,14 @@ impl IConnect for ConnectionWebSocket {
) -> Result<ConnectionBase, NetError> {
let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS);
let url = format!("ws://{}:{}", ip, WS_PORT);
let url = format!("ws://{}:{}", ip, port);
let (mut ws, wsio) = WsMeta::connect(url, None).await.map_err(|e| {
//log_info!("{:?}", e);
NetError::ConnectionError
})?;
cnx.start_read_loop(None, peer_privk, Some(remote_peer));
cnx.start_read_loop(None, Some(peer_privk), Some(remote_peer));
let mut shutdown = cnx.set_shutdown();
spawn_and_log_error(ws_loop(
@ -66,6 +68,28 @@ impl IConnect for ConnectionWebSocket {
Ok(cnx)
}
async fn probe(&self, ip: IP, port: u16) -> Result<Option<PubKey>, ProtocolError> {
let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS);
let url = format!("ws://{}:{}", ip, port);
let (mut ws, wsio) = WsMeta::connect(url, None).await.map_err(|e| {
//log_info!("{:?}", e);
ProtocolError::ConnectionError
})?;
cnx.start_read_loop(None, None, None);
let mut shutdown = cnx.set_shutdown();
spawn_and_log_error(ws_loop(
ws,
wsio,
cnx.take_sender(),
cnx.take_receiver(),
shutdown,
));
cnx.probe().await
}
}
async fn ws_loop(
@ -73,7 +97,7 @@ async fn ws_loop(
mut stream: WsStream,
sender: Receiver<ConnectionCommand>,
mut receiver: Sender<ConnectionCommand>,
mut shutdown: Sender<NetError>,
mut shutdown: Sender<Either<NetError, PubKey>>,
) -> ResultSend<()> {
async fn inner_loop(
stream: &mut WsStream,
@ -175,10 +199,13 @@ async fn ws_loop(
Some(WsEvent::WsErr(_e)) => ConnectionCommand::Error(NetError::WsError),
};
if let ConnectionCommand::Error(err) = last_command.clone() {
let _ = shutdown.send(err).await;
} else if let ConnectionCommand::ProtocolError(err) = last_command.clone() {
//let _ = shutdown.send(NetError::ProtocolError).await;
} // otherwise, shutdown gracefully (with None). it is done automatically during destroy of shutdown
let _ = shutdown.send(Either::Left(err)).await;
} else {
let _ = shutdown.send(Either::Left(NetError::Closing)).await;
}
// if let ConnectionCommand::ProtocolError(err) = last_command.clone() {
//let _ = shutdown.send(Either::Left(NetError::ProtocolError)).await;
// otherwise, shutdown gracefully (with None). it is done automatically during destroy of shutdown
receiver
.send(last_command)

@ -24,6 +24,7 @@ once_cell = "1.17.1"
noise-protocol = "0.2.0-rc1"
noise-rust-crypto = "0.6.0-rc.1"
ed25519-dalek = "1.0.1"
either = "1.8.1"
[target.'cfg(target_arch = "wasm32")'.dependencies.getrandom]
version = "0.2.7"
@ -31,4 +32,4 @@ features = ["js"]
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
getrandom = "0.2.7"
default-net = "0.15"
default-net = "0.15"

@ -107,6 +107,10 @@ impl<
// || !self.initiator && msg.type_id() == TypeId::of::<A>()
// }
pub fn detach_receiver(&mut self) -> Receiver<ConnectionCommand> {
self.receiver.take().unwrap()
}
pub async fn request(
&mut self,
msg: ProtocolMessage,

@ -3,3 +3,6 @@ pub use noise::*;
pub mod start;
pub use start::*;
pub mod probe;
pub use probe::*;

@ -0,0 +1,70 @@
/*
* Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
* All rights reserved.
* Licensed under the Apache License, Version 2.0
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
* at your option. All files in the project carrying such
* notice may not be copied, modified, or distributed except
* according to those terms.
*/
use crate::connection::NoiseFSM;
use crate::types::{ProbeResponse, MAGIC_NG_REQUEST};
use crate::{actor::*, errors::ProtocolError, types::ProtocolMessage};
use async_std::sync::Mutex;
use serde::{Deserialize, Serialize};
use std::any::{Any, TypeId};
use std::sync::Arc;
/// Send to probe if the server is a NextGraph broker.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Probe {}
impl TryFrom<ProtocolMessage> for ProbeResponse {
type Error = ProtocolError;
fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
if let ProtocolMessage::ProbeResponse(res) = msg {
Ok(res)
} else {
Err(ProtocolError::InvalidValue)
}
}
}
impl TryFrom<ProtocolMessage> for Probe {
type Error = ProtocolError;
fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
if let ProtocolMessage::Probe(magic) = msg {
if magic == MAGIC_NG_REQUEST {
Ok(Probe {})
} else {
Err(ProtocolError::InvalidValue)
}
} else {
Err(ProtocolError::InvalidValue)
}
}
}
impl From<Probe> for ProtocolMessage {
fn from(msg: Probe) -> ProtocolMessage {
ProtocolMessage::Probe(MAGIC_NG_REQUEST)
}
}
impl Actor<'_, Probe, ProbeResponse> {}
#[async_trait::async_trait]
impl EActor for Actor<'_, Probe, ProbeResponse> {
async fn respond(
&mut self,
msg: ProtocolMessage,
fsm: Arc<Mutex<NoiseFSM>>,
) -> Result<(), ProtocolError> {
let req = Probe::try_from(msg)?;
//let res = ProbeResponse()
//fsm.lock().await.send(res.into()).await?;
Ok(())
}
}

@ -17,8 +17,8 @@ use crate::utils::spawn_and_log_error;
use crate::utils::{Receiver, ResultSend, Sender};
use async_std::stream::StreamExt;
use async_std::sync::{Arc, RwLock};
use either::Either;
use futures::channel::mpsc;
use futures::future::Either;
use futures::SinkExt;
use noise_protocol::U8Array;
use noise_rust_crypto::sensitive::Sensitive;
@ -119,6 +119,38 @@ impl Broker {
(copy_listeners, copy_bind_addresses)
}
#[cfg(not(target_arch = "wasm32"))]
pub fn authorize(
&self,
remote_bind_address: &BindAddress,
auth: Authorization,
) -> Result<(), ProtocolError> {
match auth {
Authorization::Discover => {
let listener_id = self
.bind_addresses
.get(remote_bind_address)
.ok_or(ProtocolError::BrokerError)?;
let listener = self
.listeners
.get(listener_id)
.ok_or(ProtocolError::BrokerError)?;
if listener.config.discoverable
&& remote_bind_address.ip.is_private()
&& listener.config.accept_forward_for.is_no()
{
Ok(())
} else {
Err(ProtocolError::AccessDenied)
}
}
Authorization::ExtMessage => Err(ProtocolError::AccessDenied),
Authorization::Client => Err(ProtocolError::AccessDenied),
Authorization::Core => Err(ProtocolError::AccessDenied),
Authorization::Admin => Err(ProtocolError::AccessDenied),
}
}
pub fn set_overlays_configs(&mut self, overlays_configs: Vec<BrokerOverlayConfigV0>) {
self.overlays_configs.extend(overlays_configs)
}
@ -363,7 +395,7 @@ impl Broker {
return Err(NetError::Closing);
}
let join = connection.take_shutdown();
let join: mpsc::UnboundedReceiver<Either<NetError, PubKey>> = connection.take_shutdown();
if self
.anonymous_connections
.insert((local_bind_address, remote_bind_address), connection)
@ -419,7 +451,7 @@ impl Broker {
core: Option<String>,
) -> Result<(), NetError> {
log_debug!("ATTACH PEER_ID {}", remote_peer_id);
let connection = self
let mut connection = self
.anonymous_connections
.remove(&(local_bind_address, remote_bind_address))
.ok_or(NetError::InternalError)?;
@ -448,10 +480,23 @@ impl Broker {
Ok(())
}
pub async fn probe(
&mut self,
cnx: Box<dyn IConnect>,
ip: IP,
port: u16,
) -> Result<Option<PubKey>, ProtocolError> {
if self.closing {
return Err(ProtocolError::Closing);
}
cnx.probe(ip, port).await
}
pub async fn connect(
&mut self,
cnx: Box<dyn IConnect>,
ip: IP,
port: u16,
core: Option<String>, // the interface used as egress for this connection
peer_privk: Sensitive<[u8; 32]>,
peer_pubk: PubKey,
@ -469,6 +514,7 @@ impl Broker {
let mut connection = cnx
.open(
ip,
port,
Sensitive::<[u8; 32]>::from_slice(peer_privk.deref()),
peer_pubk,
remote_peer_id,
@ -509,7 +555,10 @@ impl Broker {
async move {
let res = join.next().await;
log_info!("SOCKET IS CLOSED {:?} {:?}", res, &remote_peer_id);
if res.is_some() {
if res.is_some()
&& res.as_ref().unwrap().is_left()
&& res.unwrap().unwrap_left() != NetError::Closing
{
// we intend to reconnect
let mut broker = BROKER.write().await;
broker.reconnecting(&remote_peer_id);

@ -22,9 +22,10 @@ use crate::errors::NetError;
use crate::errors::ProtocolError;
use crate::types::*;
use crate::utils::*;
use async_std::future::TimeoutError;
use async_std::stream::StreamExt;
use async_std::sync::Mutex;
use futures::future::Either;
use either::Either;
use futures::{channel::mpsc, select, Future, FutureExt, SinkExt};
use noise_protocol::U8Array;
use noise_protocol::{patterns::noise_xk, CipherState, HandshakeState};
@ -52,11 +53,14 @@ pub trait IConnect: Send + Sync {
async fn open(
&self,
ip: IP,
port: u16,
peer_privk: Sensitive<[u8; 32]>,
peer_pubk: PubKey,
remote_peer: DirectPeerId,
config: StartConfig,
) -> Result<ConnectionBase, NetError>;
async fn probe(&self, ip: IP, port: u16) -> Result<Option<PubKey>, ProtocolError>;
}
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
@ -87,6 +91,9 @@ impl ConnectionDir {
#[derive(Debug, PartialEq)]
pub enum FSMstate {
Local0,
Start,
Probe,
Relay,
Noise0,
Noise1,
Noise2,
@ -97,6 +104,7 @@ pub enum FSMstate {
ServerHello,
ClientAuth,
AuthResult,
Closing,
}
pub struct NoiseFSM {
@ -104,6 +112,7 @@ pub struct NoiseFSM {
dir: ConnectionDir,
sender: Sender<ConnectionCommand>,
/// first is local, second is remote
bind_addresses: Option<(BindAddress, BindAddress)>,
actors: Arc<Mutex<HashMap<i64, Sender<ConnectionCommand>>>>,
@ -112,8 +121,8 @@ pub struct NoiseFSM {
noise_cipher_state_enc: Option<CipherState<ChaCha20Poly1305>>,
noise_cipher_state_dec: Option<CipherState<ChaCha20Poly1305>>,
from: Option<Sensitive<[u8; 32]>>,
to: Option<PubKey>,
local: Option<Sensitive<[u8; 32]>>,
remote: Option<PubKey>,
nonce_for_hello: Vec<u8>,
config: Option<StartConfig>,
@ -132,6 +141,7 @@ pub enum StepReply {
Responder(ProtocolMessage),
Response(ProtocolMessage),
NONE,
CloseNow,
}
pub struct ClientConfig {
@ -147,6 +157,8 @@ pub struct CoreConfig {}
pub struct AdminConfig {}
pub enum StartConfig {
Probe,
Relay(BindAddress),
Client(ClientConfig),
Ext(ExtConfig),
Core(CoreConfig),
@ -160,14 +172,14 @@ impl NoiseFSM {
dir: ConnectionDir,
actors: Arc<Mutex<HashMap<i64, Sender<ConnectionCommand>>>>,
sender: Sender<ConnectionCommand>,
from: Sensitive<[u8; 32]>,
to: Option<PubKey>,
local: Option<Sensitive<[u8; 32]>>,
remote: Option<PubKey>,
) -> Self {
Self {
state: if tp == TransportProtocol::Local {
FSMstate::Local0
} else {
FSMstate::Noise0
FSMstate::Start
},
dir,
bind_addresses,
@ -176,8 +188,8 @@ impl NoiseFSM {
noise_handshake_state: None,
noise_cipher_state_enc: None,
noise_cipher_state_dec: None,
from: Some(from),
to,
local,
remote,
nonce_for_hello: vec![],
config: None,
}
@ -211,7 +223,7 @@ impl NoiseFSM {
}
pub async fn send(&mut self, msg: ProtocolMessage) -> Result<(), ProtocolError> {
log_trace!("SENDING: {:?}", msg);
log_debug!("SENDING: {:?}", msg);
if self.noise_cipher_state_enc.is_some() {
let cipher = self.encrypt(msg)?;
self.sender
@ -244,6 +256,37 @@ impl NoiseFSM {
// }
// }
async fn process_server_noise0(&mut self, noise: &Noise) -> Result<StepReply, ProtocolError> {
let mut handshake = HandshakeState::<X25519, ChaCha20Poly1305, Blake2b>::new(
noise_xk(),
false,
&[],
Some(from_ed_priv_to_dh_priv(self.local.take().unwrap())),
None,
None,
None,
);
let mut payload = handshake.read_message_vec(noise.data()).map_err(|e| {
log_debug!("{:?}", e);
ProtocolError::NoiseHandshakeFailed
})?;
payload = handshake.write_message_vec(&payload).map_err(|e| {
log_debug!("{:?}", e);
ProtocolError::NoiseHandshakeFailed
})?;
let noise = Noise::V0(NoiseV0 { data: payload });
self.send(noise.into()).await?;
self.noise_handshake_state = Some(handshake);
self.state = FSMstate::Noise2;
return Ok(StepReply::NONE);
}
pub async fn step(
&mut self,
mut msg_opt: Option<ProtocolMessage>,
@ -257,9 +300,10 @@ impl NoiseFSM {
}
}
if msg_opt.is_some() {
log_trace!("RECEIVED: {:?}", msg_opt.as_ref().unwrap());
log_debug!("RECEIVED: {:?}", msg_opt.as_ref().unwrap());
}
match self.state {
FSMstate::Closing => {}
// TODO verify that ID is zero
FSMstate::Local0 => {
// CLIENT LOCAL
@ -277,68 +321,125 @@ impl NoiseFSM {
}
}
}
FSMstate::Noise0 => {
// CLIENT INITIALIZE NOISE
FSMstate::Start => {
if !self.dir.is_server() && msg_opt.is_none() {
let mut handshake = HandshakeState::<X25519, ChaCha20Poly1305, Blake2b>::new(
noise_xk(),
true,
&[],
Some(from_ed_priv_to_dh_priv(self.from.take().unwrap())),
None,
Some(*self.to.unwrap().to_dh_from_ed().slice()),
None,
);
let payload = handshake
.write_message_vec(&[])
.map_err(|e| ProtocolError::NoiseHandshakeFailed)?;
let noise = Noise::V0(NoiseV0 { data: payload });
self.send(noise.into()).await?;
self.noise_handshake_state = Some(handshake);
self.state = FSMstate::Noise1;
return Ok(StepReply::NONE);
}
// SERVER INITIALIZE NOISE
else if let Some(msg) = msg_opt.as_ref() {
if self.dir.is_server() {
if let ProtocolMessage::Noise(noise) = msg {
// CLIENT START
match self.config.as_ref().unwrap() {
StartConfig::Probe => {
// PROBE REQUEST
let request = ProtocolMessage::Probe(MAGIC_NG_REQUEST);
self.send(request).await?;
self.state = FSMstate::Probe;
return Ok(StepReply::NONE);
}
StartConfig::Relay(relay_to) => {
// RELAY REQUEST
//self.state
todo!();
}
_ => {
// CLIENT INITIALIZE NOISE
let mut handshake =
HandshakeState::<X25519, ChaCha20Poly1305, Blake2b>::new(
noise_xk(),
false,
true,
&[],
Some(from_ed_priv_to_dh_priv(self.from.take().unwrap())),
None,
Some(from_ed_priv_to_dh_priv(self.local.take().unwrap())),
None,
Some(*self.remote.unwrap().to_dh_from_ed().slice()),
None,
);
let mut payload =
handshake.read_message_vec(noise.data()).map_err(|e| {
log_debug!("{:?}", e);
ProtocolError::NoiseHandshakeFailed
})?;
payload = handshake.write_message_vec(&payload).map_err(|e| {
log_debug!("{:?}", e);
ProtocolError::NoiseHandshakeFailed
})?;
let payload = handshake
.write_message_vec(&[])
.map_err(|e| ProtocolError::NoiseHandshakeFailed)?;
let noise = Noise::V0(NoiseV0 { data: payload });
self.send(noise.into()).await?;
self.noise_handshake_state = Some(handshake);
self.state = FSMstate::Noise2;
self.state = FSMstate::Noise1;
return Ok(StepReply::NONE);
}
}
} else {
#[cfg(not(target_arch = "wasm32"))]
if let Some(msg) = msg_opt.as_ref() {
if self.dir.is_server() {
// SERVER START
match msg {
ProtocolMessage::Probe(magic) => {
// PROBE REQUEST
if *magic != MAGIC_NG_REQUEST {
return Err(ProtocolError::WhereIsTheMagic);
}
let mut probe_response = ProbeResponse {
magic: MAGIC_NG_RESPONSE.to_vec(),
peer_id: None,
};
if BROKER
.read()
.await
.authorize(
&self
.bind_addresses
.ok_or(ProtocolError::BrokerError)?
.1,
Authorization::Discover,
)
.is_ok()
{
probe_response.peer_id =
Some(ed_sensitive_privkey_to_pubkey(
self.local
.as_ref()
.ok_or(ProtocolError::BrokerError)?,
));
}
self.send(ProtocolMessage::ProbeResponse(probe_response))
.await?;
self.state = FSMstate::Closing;
sleep!(std::time::Duration::from_secs(2));
return Ok(StepReply::CloseNow);
}
ProtocolMessage::Relay(_) => {
todo!();
}
ProtocolMessage::Tunnel(_) => {
self.state = FSMstate::Noise1;
todo!();
}
ProtocolMessage::Noise(noise) => {
// SERVER INITIALIZE NOISE
return self.process_server_noise0(noise).await;
}
_ => return Err(ProtocolError::InvalidState),
}
}
}
}
}
FSMstate::Probe => {
// CLIENT side receiving probe response
if let Some(msg) = msg_opt {
let id = msg.id();
if id != 0 {
return Err(ProtocolError::InvalidState);
}
if let ProtocolMessage::ProbeResponse(probe_res) = &msg {
return Ok(StepReply::Response(msg));
}
}
}
FSMstate::Relay => {}
FSMstate::Noise0 => {
if let Some(ProtocolMessage::Noise(noise)) = msg_opt.as_ref() {
if self.dir.is_server() {
return self.process_server_noise0(noise).await;
}
}
}
FSMstate::Noise1 => {
@ -379,6 +480,7 @@ impl NoiseFSM {
StartConfig::Admin(admin_config) => {
todo!();
}
_ => return Err(ProtocolError::InvalidState),
}
self.noise_cipher_state_enc = Some(ciphers.0);
@ -409,7 +511,7 @@ impl NoiseFSM {
return Err(ProtocolError::NoiseHandshakeFailed);
}
let peer_id = PubKey::Ed25519PubKey(handshake.get_rs().unwrap());
self.to = Some(peer_id);
self.remote = Some(peer_id);
let (local_bind_address, remote_bind_address) =
self.bind_addresses.ok_or(ProtocolError::BrokerError)?;
BROKER
@ -601,10 +703,14 @@ impl ConnectionBase {
}
}
pub async fn reset_shutdown(&self, remote_peer_id: PubKey) {
pub async fn release_shutdown(&mut self) {
self.shutdown_sender = None;
}
pub async fn reset_shutdown(&mut self, remote_peer_id: PubKey) {
let _ = self
.shutdown_sender
.as_ref()
.take()
.unwrap()
.send(Either::Right(remote_peer_id))
.await;
@ -663,6 +769,10 @@ impl ConnectionBase {
break; //TODO test that sending a ProtocolError effectively closes the connection (with ConnectionCommand::Close)
}
}
Ok(StepReply::CloseNow) => {
let _ = sender.send(ConnectionCommand::Close).await;
break;
}
Ok(StepReply::NONE) => {}
Ok(StepReply::Responder(responder)) => {
let r = responder
@ -756,6 +866,63 @@ impl ConnectionBase {
self.send(ConnectionCommand::Close).await;
}
pub async fn probe(&mut self) -> Result<Option<PubKey>, ProtocolError> {
if !self.dir.is_server() {
let config = StartConfig::Probe;
let mut actor = Box::new(Actor::<Probe, ProbeResponse>::new(0, true));
self.actors.lock().await.insert(0, actor.get_receiver_tx());
let res;
{
let mut fsm = self.fsm.as_ref().unwrap().lock().await;
fsm.config = Some(config);
res = fsm.step(None).await;
}
if let Err(err) = res {
self.send(ConnectionCommand::ProtocolError(err.clone()))
.await;
return Err(err);
}
let mut receiver = actor.detach_receiver();
let mut shutdown = self.take_shutdown();
select! {
res = async_std::future::timeout(std::time::Duration::from_secs(2),receiver.next()).fuse() => {
self.fsm
.as_mut()
.unwrap()
.lock()
.await
.remove_actor(0)
.await;
match res {
Ok(Some(ConnectionCommand::Msg(ProtocolMessage::ProbeResponse(res)))) => {
if res.magic == MAGIC_NG_RESPONSE {
self.close().await;
return Ok(res.peer_id);
}
}
Err(_) => {}
_ => {}
}
self.close().await;
return Err(ProtocolError::WhereIsTheMagic);
},
r = shutdown.next().fuse() => {
self.fsm
.as_mut()
.unwrap()
.lock()
.await
.remove_actor(0)
.await;
return Err(ProtocolError::Closing);
}
}
} else {
panic!("cannot call probe on a server-side connection");
}
}
pub async fn start(&mut self, config: StartConfig) {
// BOOTSTRAP the protocol from client-side
if !self.dir.is_server() {
@ -776,8 +943,8 @@ impl ConnectionBase {
pub fn start_read_loop(
&mut self,
bind_addresses: Option<(BindAddress, BindAddress)>,
from: Sensitive<[u8; 32]>,
to: Option<PubKey>,
local: Option<Sensitive<[u8; 32]>>,
remote: Option<PubKey>,
) {
let (sender_tx, sender_rx) = mpsc::unbounded();
let (receiver_tx, receiver_rx) = mpsc::unbounded();
@ -792,8 +959,8 @@ impl ConnectionBase {
self.dir.clone(),
Arc::clone(&self.actors),
sender_tx.clone(),
from,
to,
local,
remote,
)));
self.fsm = Some(Arc::clone(&fsm));

@ -79,6 +79,7 @@ pub enum ProtocolError {
NoiseHandshakeFailed,
DecryptionError,
EncryptionError,
WhereIsTheMagic,
InvalidNonce,
} //MAX 949 ProtocolErrors

@ -219,6 +219,12 @@ impl AcceptForwardForV0 {
_ => false,
}
}
pub fn is_no(&self) -> bool {
match self {
AcceptForwardForV0::No => true,
_ => false,
}
}
pub fn is_public_dyn(&self) -> bool {
match self {
AcceptForwardForV0::PublicDyn(_) => true,
@ -2121,8 +2127,79 @@ impl TryFrom<ProtocolMessage> for ExtResponse {
///
/// PROTOCOL MESSAGES
///
pub static MAGIC_NG_REQUEST: [u8; 2] = [78u8, 71u8];
pub static MAGIC_NG_RESPONSE: [u8; 4] = [89u8, 88u8, 78u8, 75u8];
#[derive(Clone, Debug)]
pub enum Authorization {
Discover,
ExtMessage,
Client,
Core,
Admin,
}
/// ProbeResponse
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProbeResponse {
/// Response Magic number
#[serde(with = "serde_bytes")]
pub magic: Vec<u8>,
/// Used for discovery of broker on private LAN
/// see ListenerV0.discoverable
pub peer_id: Option<PubKey>,
}
/// RelayRequest
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RelayRequest {
/// The BindAddress of the broker to relay to should be of the same IP family than the TunnelRequest.remote_addr
pub address: BindAddress,
}
/// RelayResponse
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RelayResponse {
/// Response Magic number
#[serde(with = "serde_bytes")]
pub magic: Vec<u8>,
/// result to the relay request (accept, refuse)
pub result: u16,
}
/// Tunnel Request
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TunnelRequest {
/// Request Magic number
#[serde(with = "serde_bytes")]
pub magic: Vec<u8>,
// Bind address of client as connected to the relaying broker.
pub remote_addr: BindAddress,
}
/// Tunnel Response
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TunnelResponse {
/// Response Magic number
#[serde(with = "serde_bytes")]
pub magic: Vec<u8>,
/// result to the tunnel request (accept, refuse)
pub result: u16,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ProtocolMessage {
Probe([u8; 2]),
ProbeResponse(ProbeResponse),
Relay(RelayRequest),
RelayResponse(RelayResponse),
Tunnel(TunnelRequest),
TunnelResponse(TunnelResponse),
Noise(Noise),
Start(StartProtocol),
ServerHello(ServerHello),
@ -2136,26 +2213,18 @@ pub enum ProtocolMessage {
impl ProtocolMessage {
pub fn id(&self) -> i64 {
match self {
ProtocolMessage::Noise(_) => 0,
ProtocolMessage::Start(_) => 0,
ProtocolMessage::ServerHello(_) => 0,
ProtocolMessage::ClientAuth(_) => 0,
ProtocolMessage::AuthResult(_) => 0,
ProtocolMessage::ExtRequest(ext_req) => ext_req.id(),
ProtocolMessage::ExtResponse(ext_res) => ext_res.id(),
ProtocolMessage::BrokerMessage(broker_msg) => broker_msg.id(),
_ => 0,
}
}
pub fn set_id(&mut self, id: i64) {
match self {
ProtocolMessage::Noise(_) => panic!("cannot set ID"),
ProtocolMessage::Start(_) => panic!("cannot set ID"),
ProtocolMessage::ServerHello(_) => panic!("cannot set ID"),
ProtocolMessage::ClientAuth(_) => panic!("cannot set ID"),
ProtocolMessage::AuthResult(_) => panic!("cannot set ID"),
ProtocolMessage::ExtRequest(ext_req) => ext_req.set_id(id),
ProtocolMessage::ExtResponse(ext_res) => ext_res.set_id(id),
ProtocolMessage::BrokerMessage(broker_msg) => broker_msg.set_id(id),
_ => panic!("cannot set ID"),
}
}
pub fn type_id(&self) -> TypeId {
@ -2168,6 +2237,12 @@ impl ProtocolMessage {
ProtocolMessage::ExtRequest(a) => a.type_id(),
ProtocolMessage::ExtResponse(a) => a.type_id(),
ProtocolMessage::BrokerMessage(a) => a.type_id(),
ProtocolMessage::Probe(a) => a.type_id(),
ProtocolMessage::ProbeResponse(a) => a.type_id(),
ProtocolMessage::Relay(a) => a.type_id(),
ProtocolMessage::RelayResponse(a) => a.type_id(),
ProtocolMessage::Tunnel(a) => a.type_id(),
ProtocolMessage::TunnelResponse(a) => a.type_id(),
}
}

@ -60,6 +60,13 @@ pub fn keypair_from_ed(secret: SecretKey, public: PublicKey) -> (Sensitive<[u8;
(priv_key, pub_key)
}
pub fn ed_sensitive_privkey_to_pubkey(privkey: &Sensitive<[u8; 32]>) -> PubKey {
//TODO FIXME do not create a SecretKey or call into() on it, as this is not using Sensitive<>
let sk = SecretKey::from_bytes(privkey.as_slice()).unwrap();
let pk: PublicKey = (&sk).into();
PubKey::Ed25519PubKey(pk.to_bytes())
}
pub fn keys_from_bytes(secret_key: [u8; 32]) -> (Sensitive<[u8; 32]>, PubKey) {
let sk = SecretKey::from_bytes(&secret_key).unwrap();
let pk: PublicKey = (&sk).into();

Loading…
Cancel
Save