diff --git a/Cargo.lock b/Cargo.lock index f212f58..1e8b298 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -359,16 +359,16 @@ dependencies = [ [[package]] name = "async-tungstenite" -version = "0.17.2" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1b71b31561643aa8e7df3effe284fa83ab1a840e52294c5f4bd7bfd8b2becbb" +checksum = "ce01ac37fdc85f10a43c43bc582cbd566720357011578a935761075f898baf58" dependencies = [ "async-std", "futures-io", "futures-util", "log", "pin-project-lite", - "tungstenite 0.17.3", + "tungstenite 0.19.0", ] [[package]] @@ -1105,6 +1105,12 @@ dependencies = [ "syn 2.0.16", ] +[[package]] +name = "data-encoding" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" + [[package]] name = "debug_print" version = "1.0.0" @@ -2974,10 +2980,10 @@ dependencies = [ "futures", "getrandom 0.2.9", "hex", + "once_cell", "p2p-client-ws", "p2p-net", "p2p-repo", - "rust-fsm", "serde", "serde_bare", "serde_bytes", @@ -3017,6 +3023,7 @@ dependencies = [ "async-std", "async-trait", "blake3", + "default-net", "ed25519-dalek", "futures", "getrandom 0.2.9", @@ -3769,25 +3776,6 @@ dependencies = [ "walkdir", ] -[[package]] -name = "rust-fsm" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "021d7de715253e45ad24a2fbb0725a0f7f271fd8d3163b130bd65ce2816a860d" -dependencies = [ - "rust-fsm-dsl", -] - -[[package]] -name = "rust-fsm-dsl" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a66b1273014079e4cf2b04aad1f3a2849e26e9a106f0411be2b1c15c23a791a" -dependencies = [ - "quote", - "syn 1.0.109", -] - [[package]] name = "rustc_version" version = "0.4.0" @@ -4061,17 +4049,6 @@ dependencies = [ "stable_deref_trait", ] -[[package]] -name = "sha-1" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" -dependencies = [ - "cfg-if", - "cpufeatures", - "digest 0.10.7", -] - [[package]] name = "sha1" version = "0.10.5" @@ -4919,9 +4896,9 @@ checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" [[package]] name = "tungstenite" -version = "0.17.3" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e27992fd6a8c29ee7eef28fc78349aa244134e10ad447ce3b9f0ac0ed0fa4ce0" +checksum = "30ee6ab729cd4cf0fd55218530c4522ed30b7b6081752839b68fcec8d0960788" dependencies = [ "base64 0.13.1", "byteorder", @@ -4930,7 +4907,7 @@ dependencies = [ "httparse", "log", "rand 0.8.5", - "sha-1", + "sha1", "thiserror", "url", "utf-8", @@ -4938,13 +4915,13 @@ dependencies = [ [[package]] name = "tungstenite" -version = "0.18.0" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30ee6ab729cd4cf0fd55218530c4522ed30b7b6081752839b68fcec8d0960788" +checksum = "15fba1a6d6bb030745759a9a2a588bfe8490fc8b4751a277db3a0be1c9ebbf67" dependencies = [ - "base64 0.13.1", "byteorder", "bytes", + "data-encoding", "http", "httparse", "log", diff --git a/README.md b/README.md index 3d06ac3..62e5faa 100644 --- a/README.md +++ b/README.md @@ -71,7 +71,7 @@ cargo run --bin ngd cargo run --bin ngcli ``` -For the web apps, see the [README](ng-app-js/README.md) +For the web apps, see the [README](ng-app/README.md) ### Test @@ -96,7 +96,7 @@ cargo test --package ngcli -- --nocapture Test WASM websocket ``` -cd ng-app-js +cd ng-sdk-js wasm-pack test --chrome --headless ``` diff --git a/ng-sdk-js/README.md b/ng-sdk-js/README.md index 97926a1..050d6e4 100644 --- a/ng-sdk-js/README.md +++ b/ng-sdk-js/README.md @@ -14,7 +14,7 @@ JS/WASM crate containing the SDK of NextGraph This crate is composed of -- the npm package `ng-app-js` which is the SDK +- the npm package `ng-sdk-js` which is the SDK - an example of web app using the ESmodule and webpack as bundler `app-web` - an example of React web app `app-react` - an example of node-js app `app-node` diff --git a/ng-sdk-js/app-node/package-lock.json b/ng-sdk-js/app-node/package-lock.json index a0e1fbc..e5b3c7e 100644 --- a/ng-sdk-js/app-node/package-lock.json +++ b/ng-sdk-js/app-node/package-lock.json @@ -9,12 +9,12 @@ "version": "0.1.0", "license": "(MIT OR Apache-2.0)", "dependencies": { - "ng-app-node-sdk": "^0.1.0", + "ng-sdk-node": "^0.1.0", "ws": "^8.13.0" } }, "../pkg-node": { - "name": "ng-app-js-sdk", + "name": "ng-sdk-node", "version": "0.1.0", "license": "MIT/Apache-2.0" }, @@ -32,10 +32,6 @@ "node": ">=6.14.2" } }, - "node_modules/ng-app-node-sdk": { - "resolved": "../pkg-node", - "link": true - }, "node_modules/node-gyp-build": { "version": "4.6.0", "resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.6.0.tgz", @@ -94,9 +90,6 @@ "node-gyp-build": "^4.3.0" } }, - "ng-app-node-sdk": { - "version": "file:../pkg-node" - }, "node-gyp-build": { "version": "4.6.0", "resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.6.0.tgz", diff --git a/ngd/src/cli.rs b/ngd/src/cli.rs index f788023..bf76e60 100644 --- a/ngd/src/cli.rs +++ b/ngd/src/cli.rs @@ -43,6 +43,10 @@ pub(crate) struct Cli { #[arg(short, long, value_name("INTERFACE:PORT"), default_missing_value("default"), num_args(0..=1))] pub core: Option, + /// When --core is used, this option will allow clients to connect to the public interface too. Otherwise, by default, they cannot. + #[arg(long, requires("core"))] + pub core_with_clients: bool, + /// Quick config to forward all requests to another BROKER. format is "[DOMAIN/IP:PORT]@PEERID". An IPv6 should be encased in square brackets [IPv6] and the whole option should be between double quotes. Port defaults to 80 for IPs and 443 for domains #[arg( short, @@ -67,6 +71,10 @@ pub(crate) struct Cli { )] pub public: Option, + /// When --public is used, this option will disallow clients to connect to the public interface too. Otherwise, by default, they can. Should be used in combination with a --domain option + #[arg(long, requires("public"), conflicts_with("private"))] + pub public_without_clients: bool, + /// Quick config to listen for clients and core brokers on PRIVATE_INTERFACE, behind a DMZ or port forwarding of a public dynamic IP. PORTs defaults to 80 #[arg(short('y'), long, value_name("PRIVATE_INTERFACE:PORT,PUBLIC_PORT"), default_missing_value("default"), num_args(0..=1), conflicts_with("public"), conflicts_with("core"))] pub dynamic: Option, diff --git a/ngd/src/main.rs b/ngd/src/main.rs index 686df41..09887f2 100644 --- a/ngd/src/main.rs +++ b/ngd/src/main.rs @@ -21,6 +21,10 @@ use p2p_broker::server_ws::run_server_v0; use p2p_broker::types::*; use p2p_broker::utils::*; use p2p_net::types::*; +use p2p_net::utils::is_private_ip; +use p2p_net::utils::is_public_ip; +use p2p_net::utils::is_public_ipv4; +use p2p_net::utils::is_public_ipv6; use p2p_net::utils::{ gen_keys, is_ipv4_global, is_ipv4_private, is_ipv6_global, is_ipv6_private, keys_from_bytes, Dual25519Keys, Sensitive, U8Array, @@ -501,8 +505,7 @@ async fn main_inner() -> Result<(), ()> { } Some(loopback) => { overlays_config.server = BrokerOverlayPermission::AllRegisteredUser; - let mut listener = - ListenerV0::new_direct(loopback.name, !args.no_ipv6, local_port); + let mut listener = ListenerV0::new_direct(loopback, !args.no_ipv6, local_port); listener.accept_direct = false; let res = prepare_accept_forward_for_domain(domain, &args).map_err(|_| { log_err!("The --domain-peer option has an invalid key. it must be a base64_url encoded serialization of a PrivKey. cannot start") @@ -535,7 +538,7 @@ async fn main_inner() -> Result<(), ()> { r.ipv6 = !args.no_ipv6; } else { listeners.push(ListenerV0::new_direct( - loopback.name, + loopback, !args.no_ipv6, args.local.unwrap(), )); @@ -568,12 +571,13 @@ async fn main_inner() -> Result<(), ()> { } Some(public) => { overlays_config.core = BrokerOverlayPermission::AllRegisteredUser; - overlays_config.server = BrokerOverlayPermission::AllRegisteredUser; - listeners.push(ListenerV0::new_direct( - public.name, - !args.no_ipv6, - arg_value.1, - )); + let mut listener = ListenerV0::new_direct(public, !args.no_ipv6, arg_value.1); + if args.core_with_clients { + overlays_config.server = BrokerOverlayPermission::AllRegisteredUser; + } else { + listener.refuse_clients = true; + } + listeners.push(listener); } } } @@ -624,7 +628,9 @@ async fn main_inner() -> Result<(), ()> { } overlays_config.core = BrokerOverlayPermission::AllRegisteredUser; - overlays_config.server = BrokerOverlayPermission::AllRegisteredUser; + if !args.public_without_clients { + overlays_config.server = BrokerOverlayPermission::AllRegisteredUser; + } let ipv6 = public_part.0.map(|ipv6| BindAddress { port: public_part.1 .1, @@ -633,10 +639,13 @@ async fn main_inner() -> Result<(), ()> { listeners.push(ListenerV0 { interface_name: private_interface.name, + if_type: private_interface.if_type, ipv6: public_part.0.is_some(), interface_refresh: 0, port: private_part.1, discoverable: false, + refuse_clients: args.public_without_clients, + serve_app: true, accept_direct: false, accept_forward_for: AcceptForwardForV0::PublicStatic(( BindAddress { @@ -700,16 +709,16 @@ async fn main_inner() -> Result<(), ()> { && listeners.last().unwrap().port == arg_value.1 { let r = listeners.last_mut().unwrap(); - r.ipv6 = !args.no_ipv6; if r.accept_forward_for != AcceptForwardForV0::No { log_err!("The same private interface is already forwarding with a different setting, probably because of a --public option conflicting with a --dynamic option. Changing the port on one of the interfaces can help. cannot start"); return Err(()); } - r.accept_forward_for = - AcceptForwardForV0::PublicDyn((public_port, 60, "".to_string())); + panic!("this should never happen. --dynamic created after a --private"); + //r.ipv6 = !args.no_ipv6; + //r.accept_forward_for = AcceptForwardForV0::PublicDyn((public_port, 60, "".to_string())); } else { let mut listener = - ListenerV0::new_direct(inter.name, !args.no_ipv6, arg_value.1); + ListenerV0::new_direct(inter, !args.no_ipv6, arg_value.1); listener.accept_direct = false; listener.accept_forward_for = AcceptForwardForV0::PublicDyn((public_port, 60, "".to_string())); @@ -764,15 +773,18 @@ async fn main_inner() -> Result<(), ()> { && listeners.last().unwrap().port == arg_value.1 { let r = listeners.last_mut().unwrap(); - r.ipv6 = !args.no_ipv6; if r.accept_forward_for != AcceptForwardForV0::No { log_err!("The same private interface is already forwarding with a different setting, probably because of a --public or --dynamic option conflicting with the --domain-private option. Changing the port on one of the interfaces can help. cannot start"); return Err(()); } - r.accept_forward_for = res; + panic!( + "this should never happen. --domain-private created after a --private" + ); + //r.ipv6 = !args.no_ipv6; + //r.accept_forward_for = res; } else { let mut listener = - ListenerV0::new_direct(inter.name, !args.no_ipv6, arg_value.1); + ListenerV0::new_direct(inter, !args.no_ipv6, arg_value.1); listener.accept_direct = false; listener.accept_forward_for = res; @@ -819,11 +831,7 @@ async fn main_inner() -> Result<(), ()> { r.accept_direct = true; r.ipv6 = !args.no_ipv6; } else { - listeners.push(ListenerV0::new_direct( - inter.name, - !args.no_ipv6, - arg_value.1, - )); + listeners.push(ListenerV0::new_direct(inter, !args.no_ipv6, arg_value.1)); } } } @@ -954,7 +962,7 @@ async fn main_inner() -> Result<(), ()> { log_info!("PeerId of node: {}", pubkey); debug_println!("Private key of peer: {}", prix_key_encoded); - match (config.unwrap()) { + match config.unwrap() { DaemonConfig::V0(v0) => { run_server_v0( privkey, diff --git a/ngd/src/types.rs b/ngd/src/types.rs index 23f351d..053a45a 100644 --- a/ngd/src/types.rs +++ b/ngd/src/types.rs @@ -6,7 +6,3 @@ // at your option. All files in the project carrying such // notice may not be copied, modified, or distributed except // according to those terms. -use p2p_broker::types::BrokerOverlayConfigV0; -use p2p_broker::types::ListenerV0; -use p2p_repo::types::PrivKey; -use serde::{Deserialize, Serialize}; diff --git a/p2p-broker/Cargo.toml b/p2p-broker/Cargo.toml index 2213783..f58c3dc 100644 --- a/p2p-broker/Cargo.toml +++ b/p2p-broker/Cargo.toml @@ -18,18 +18,18 @@ serde_bare = "0.5.0" serde_bytes = "0.11.7" async-std = { version = "1.12.0", features = ["attributes"] } futures = "0.3.24" -rust-fsm = "0.6.0" async-channel = "1.7.1" tempfile = "3" hex = "0.4.3" async-trait = "0.1.64" -async-tungstenite = { version = "0.17.2", features = ["async-std-runtime"] } +async-tungstenite = { version = "0.22.2", features = ["async-std-runtime"] } blake3 = "1.3.1" -default-net = "0.15" +once_cell = "1.17.1" [target.'cfg(target_arch = "wasm32")'.dependencies.getrandom] version = "0.2.7" features = ["js"] [target.'cfg(not(target_arch = "wasm32"))'.dependencies] -getrandom = "0.2.7" \ No newline at end of file +getrandom = "0.2.7" +default-net = "0.15" \ No newline at end of file diff --git a/p2p-broker/src/interfaces.rs b/p2p-broker/src/interfaces.rs index d898a4e..4823115 100644 --- a/p2p-broker/src/interfaces.rs +++ b/p2p-broker/src/interfaces.rs @@ -8,57 +8,18 @@ * notice may not be copied, modified, or distributed except * according to those terms. */ -use p2p_net::utils::{is_ipv4_global, is_ipv4_private, is_ipv6_global, is_ipv6_private}; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; - -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum InterfaceType { - Loopback, - Private, - Public, - Invalid, -} - -impl InterfaceType { - pub fn is_ipv4_valid_for_type(&self, ip: &Ipv4Addr) -> bool { - match self { - InterfaceType::Loopback => ip.is_loopback(), - InterfaceType::Public => is_public_ipv4(ip), - // we allow to bind to link-local for IPv4 - InterfaceType::Private => is_ipv4_private(ip), - _ => false, - } - } - pub fn is_ipv6_valid_for_type(&self, ip: &Ipv6Addr) -> bool { - match self { - InterfaceType::Loopback => ip.is_loopback(), - InterfaceType::Public => is_public_ipv6(ip), - // we do NOT allow to bind to link-local for IPv6 - InterfaceType::Private => is_ipv6_private(ip), - _ => false, - } - } -} +use p2p_net::types::{Interface, InterfaceType}; +use p2p_net::utils::{is_ipv4_private, is_public_ipv4}; +#[cfg(not(target_arch = "wasm32"))] pub fn print_ipv4(ip: &default_net::ip::Ipv4Net) -> String { format!("{}/{}", ip.addr, ip.prefix_len) } - +#[cfg(not(target_arch = "wasm32"))] pub fn print_ipv6(ip: &default_net::ip::Ipv6Net) -> String { format!("{}/{}", ip.addr, ip.prefix_len) } -#[derive(Clone, Debug)] -pub struct Interface { - pub if_type: InterfaceType, - pub name: String, - pub mac_addr: Option, - /// List of Ipv4Net for the network interface - pub ipv4: Vec, - /// List of Ipv6Net for the network interface - pub ipv6: Vec, -} - pub fn find_first(list: &Vec, iftype: InterfaceType) -> Option { for inf in list { if inf.if_type == iftype { @@ -90,30 +51,7 @@ pub fn find_name(list: &Vec, name: &String) -> Option { None } -pub fn is_public_ipv4(ip: &Ipv4Addr) -> bool { - // TODO, use core::net::Ipv6Addr.is_global when it will be stable - return is_ipv4_global(ip); -} - -pub fn is_public_ipv6(ip: &Ipv6Addr) -> bool { - // TODO, use core::net::Ipv6Addr.is_global when it will be stable - return is_ipv6_global(ip); -} - -pub fn is_public_ip(ip: &IpAddr) -> bool { - match ip { - IpAddr::V4(v4) => is_public_ipv4(v4), - IpAddr::V6(v6) => is_public_ipv6(v6), - } -} - -pub fn is_private_ip(ip: &IpAddr) -> bool { - match ip { - IpAddr::V4(v4) => is_ipv4_private(v4), - IpAddr::V6(v6) => is_ipv6_private(v6), - } -} - +#[cfg(not(target_arch = "wasm32"))] pub fn get_interface() -> Vec { let mut res: Vec = vec![]; let interfaces = default_net::get_interfaces(); diff --git a/p2p-broker/src/server_ws.rs b/p2p-broker/src/server_ws.rs index 73ff380..c55e013 100644 --- a/p2p-broker/src/server_ws.rs +++ b/p2p-broker/src/server_ws.rs @@ -13,20 +13,31 @@ use crate::interfaces::*; use crate::types::*; +use async_std::io::ReadExt; use async_std::net::{TcpListener, TcpStream}; use async_std::sync::Mutex; use async_std::task; -use async_tungstenite::accept_async; +use async_tungstenite::accept_hdr_async; +use async_tungstenite::tungstenite::handshake::server::{ + Callback, ErrorResponse, Request, Response, +}; +use async_tungstenite::tungstenite::http::header::{CONNECTION, HOST, ORIGIN, UPGRADE}; +use async_tungstenite::tungstenite::http::HeaderValue; +use async_tungstenite::tungstenite::http::StatusCode; use async_tungstenite::tungstenite::protocol::Message; use futures::{SinkExt, StreamExt}; +use once_cell::sync::OnceCell; use p2p_client_ws::remote_ws::ConnectionWebSocket; use p2p_net::broker::*; use p2p_net::connection::IAccept; -use p2p_net::types::IP; -use p2p_net::utils::Sensitive; +use p2p_net::types::*; +use p2p_net::utils::is_private_ip; +use p2p_net::utils::is_public_ip; +use p2p_net::utils::{get_domain_without_port, Sensitive, U8Array}; use p2p_repo::log::*; use p2p_repo::types::{PrivKey, PubKey}; use p2p_repo::utils::generate_keypair; +use std::collections::HashMap; use std::collections::HashSet; use std::fs; use std::net::SocketAddr; @@ -39,22 +50,382 @@ use stores_lmdb::kcv_store::LmdbKCVStore; use stores_lmdb::repo_store::LmdbRepoStore; use tempfile::Builder; -pub async fn accept(tcp: TcpStream, peer_priv_key: Sensitive<[u8; 32]>, peer_pub_key: PubKey) { - let sock_addr = tcp.peer_addr().unwrap(); - let ip = sock_addr.ip(); - let mut ws = accept_async(tcp).await.unwrap(); +static LISTENERS_INFO: OnceCell<(HashMap, HashMap)> = + OnceCell::new(); +struct SecurityCallback { + remote_bind_address: BindAddress, + local_bind_address: BindAddress, +} - let cws = ConnectionWebSocket {}; - let base = cws.accept(peer_priv_key, peer_pub_key, ws).await.unwrap(); +impl SecurityCallback { + fn new(remote_bind_address: BindAddress, local_bind_address: BindAddress) -> Self { + Self { + remote_bind_address, + local_bind_address, + } + } +} + +fn make_error(code: StatusCode) -> ErrorResponse { + Response::builder().status(code).body(None).unwrap() +} + +fn check_origin_is_url( + origin: Option<&HeaderValue>, + domains: Vec, +) -> Result<(), ErrorResponse> { + match origin { + None => Ok(()), + Some(val) => { + for domain in domains { + if val.to_str().unwrap().starts_with(domain.as_str()) { + return Ok(()); + } + } + Err(make_error(StatusCode::FORBIDDEN)) + } + } +} + +fn check_xff_is_public_or_private( + xff: Option<&HeaderValue>, + none_is_ok: bool, + public: bool, +) -> Result<(), ErrorResponse> { + match xff { + None => { + if none_is_ok { + Ok(()) + } else { + Err(make_error(StatusCode::FORBIDDEN)) + } + } + Some(val) => { + let ip_str = val + .to_str() + .map_err(|_| make_error(StatusCode::FORBIDDEN))?; + let ip: IpAddr = ip_str + .parse() + .map_err(|_| make_error(StatusCode::FORBIDDEN))?; + if public && !is_public_ip(&ip) || !public && !is_private_ip(&ip) { + Err(make_error(StatusCode::FORBIDDEN)) + } else { + Ok(()) + } + } + } +} + +fn check_no_xff(xff: Option<&HeaderValue>) -> Result<(), ErrorResponse> { + match xff { + None => Ok(()), + Some(_) => Err(make_error(StatusCode::FORBIDDEN)), + } +} + +fn check_host(host: Option<&HeaderValue>, hosts: Vec) -> Result<(), ErrorResponse> { + match host { + None => Err(make_error(StatusCode::FORBIDDEN)), + Some(val) => { + for hos in hosts { + if val.to_str().unwrap().starts_with(&hos) { + return Ok(()); + } + } + Err(make_error(StatusCode::FORBIDDEN)) + } + } +} + +fn check_host_in_addrs( + host: Option<&HeaderValue>, + addrs: &Vec, +) -> Result<(), ErrorResponse> { + match host { + None => Err(make_error(StatusCode::FORBIDDEN)), + Some(val) => { + for ba in addrs { + if val.to_str().unwrap().starts_with(&ba.ip.to_string()) { + return Ok(()); + } + } + Err(make_error(StatusCode::FORBIDDEN)) + } + } +} + +fn prepare_domain_url_and_host( + accept_forward_for: &AcceptForwardForV0, +) -> (Vec, Vec) { + let domain_str = accept_forward_for.get_domain(); + let url = ["https://", domain_str].concat(); + let hosts_str = vec![domain_str.to_string()]; + let urls_str = vec![url]; + (hosts_str, urls_str) +} + +fn prepare_urls_from_private_addrs(addrs: &Vec, port: u16) -> Vec { + let port_str = if port != 80 { + [":", &port.to_string()].concat() + } else { + "".to_string() + }; + let mut res: Vec = vec![]; + for addr in addrs { + let url = ["http://", &addr.ip.to_string(), &port_str].concat(); + res.push(url); + } + res +} + +fn upgrade_ws_or_serve_app( + upgrade: Option<&HeaderValue>, + remote: IP, + serve_app: bool, + response: Response, +) -> Result { + if upgrade.is_some() + && upgrade + .unwrap() + .to_str() + .unwrap() + .split(|c| c == ' ' || c == ',') + .any(|p| p.eq_ignore_ascii_case("Upgrade")) + { + return Ok(response); + } + + if serve_app && (remote.is_private() || remote.is_loopback()) { + return Err(make_error(StatusCode::OK)); + } + + Err(make_error(StatusCode::FORBIDDEN)) +} + +const LOCAL_HOSTS: [&str; 3] = ["localhost", "127.0.0.1", "::1"]; +const LOCAL_URLS: [&str; 3] = ["http://localhost", "http://127.0.0.1", "http://::1"]; +const APP_NG_ONE_URL: &str = "https://app.nextgraph.one"; + +impl Callback for SecurityCallback { + fn on_request(self, request: &Request, response: Response) -> Result { + let local_urls = LOCAL_URLS + .to_vec() + .iter() + .map(ToString::to_string) + .collect(); + + let local_hosts = LOCAL_HOSTS + .to_vec() + .iter() + .map(ToString::to_string) + .collect(); + + let (listeners, bind_addresses) = LISTENERS_INFO.get().ok_or( + Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(None) + .unwrap(), + )?; + + // check that the remote address is allowed to connect on the listener + + let listener_id = bind_addresses + .get(&self.local_bind_address) + .ok_or(make_error(StatusCode::FORBIDDEN))?; + let listener = listeners + .get(listener_id) + .ok_or(make_error(StatusCode::FORBIDDEN))?; + + let xff = request.headers().get("X-Forwarded-For"); + let upgrade = request.headers().get(CONNECTION); + let host = request.headers().get(HOST); + let origin = request.headers().get(ORIGIN); + let remote = self.remote_bind_address.ip; + let xff = request.headers().get("X-Forwarded-For"); + + log_debug!( + "upgrade:{:?} origin:{:?} host:{:?} xff:{:?} remote:{:?} local:{:?}", + upgrade, + origin, + host, + xff, + remote, + self.local_bind_address + ); + + match listener.config.if_type { + InterfaceType::Public => { + if !remote.is_public() { + return Err(make_error(StatusCode::FORBIDDEN)); + } + check_no_xff(xff)?; + let mut urls_str = vec![]; + if !listener.config.refuse_clients { + urls_str.push(APP_NG_ONE_URL.to_string()); + } + check_origin_is_url(origin, urls_str)?; + + check_host_in_addrs(host, &listener.addrs)?; + log_debug!( + "accepted core with refuse_clients {}", + listener.config.refuse_clients + ); + return Ok(response); + } + InterfaceType::Loopback => { + if !remote.is_loopback() { + return Err(make_error(StatusCode::FORBIDDEN)); + } + + if listener.config.accept_forward_for.is_public_domain() { + let (mut hosts_str, mut urls_str) = + prepare_domain_url_and_host(&listener.config.accept_forward_for); + if listener.config.accept_direct { + hosts_str = [hosts_str, local_hosts].concat(); + // TODO local_urls might need a trailing :port, but it is ok for now as we do starts_with + urls_str = [urls_str, local_urls].concat(); + } + check_origin_is_url(origin, urls_str)?; + check_host(host, hosts_str)?; + check_xff_is_public_or_private(xff, listener.config.accept_direct, true)?; + log_debug!( + "accepted loopback PUBLIC_DOMAIN with direct {}", + listener.config.accept_direct + ); + return upgrade_ws_or_serve_app( + upgrade, + remote, + listener.config.serve_app, + response, + ); + } else if listener.config.accept_forward_for.is_private_domain() { + let (hosts_str, urls_str) = + prepare_domain_url_and_host(&listener.config.accept_forward_for); + check_origin_is_url(origin, urls_str)?; + check_host(host, hosts_str)?; + check_xff_is_public_or_private(xff, false, false)?; + log_debug!("accepted loopback PRIVATE_DOMAIN"); + return Ok(response); + } else if listener.config.accept_forward_for == AcceptForwardForV0::No { + check_host(host, local_hosts)?; + check_no_xff(xff)?; + // TODO local_urls might need a trailing :port, but it is ok for now as we do starts_with + check_origin_is_url(origin, local_urls)?; + log_debug!("accepted loopback DIRECT"); + return Ok(response); + } + } + InterfaceType::Private => { + if listener.config.accept_forward_for.is_public_static() + || listener.config.accept_forward_for.is_public_dyn() + { + if !listener.config.accept_direct && !remote.is_public() + || listener.config.accept_direct + && !remote.is_private() + && !remote.is_public() + { + return Err(make_error(StatusCode::FORBIDDEN)); + } + check_no_xff(xff)?; - //TODO FIXME get remote_peer_id from ConnectionBase (once it is available) - let (priv_key, pub_key) = generate_keypair(); - let remote_peer_id = pub_key; + let mut addrs = listener + .config + .accept_forward_for + .get_public_bind_addresses(); + let mut urls_str = vec![]; + if !listener.config.refuse_clients { + urls_str.push(APP_NG_ONE_URL.to_string()); + } + if listener.config.accept_direct { + addrs.extend(&listener.addrs); + urls_str = [ + urls_str, + prepare_urls_from_private_addrs(&listener.addrs, listener.config.port), + ] + .concat(); + } + check_origin_is_url(origin, urls_str)?; + check_host_in_addrs(host, &addrs)?; + log_debug!("accepted private PUBLIC_STATIC or PUBLIC_DYN with direct {} with refuse_clients {}",listener.config.accept_direct, listener.config.refuse_clients); + return Ok(response); + } else if listener.config.accept_forward_for.is_public_domain() { + if !remote.is_private() { + return Err(make_error(StatusCode::FORBIDDEN)); + } + check_xff_is_public_or_private(xff, listener.config.accept_direct, true)?; + + let (mut hosts_str, mut urls_str) = + prepare_domain_url_and_host(&listener.config.accept_forward_for); + if listener.config.accept_direct { + for addr in listener.addrs.iter() { + let str = addr.ip.to_string(); + hosts_str.push(str); + } + urls_str = [ + urls_str, + prepare_urls_from_private_addrs(&listener.addrs, listener.config.port), + ] + .concat(); + } + check_origin_is_url(origin, urls_str)?; + check_host(host, hosts_str)?; + log_debug!( + "accepted private PUBLIC_DOMAIN with direct {}", + listener.config.accept_direct + ); + return Ok(response); + } else if listener.config.accept_forward_for == AcceptForwardForV0::No { + if !remote.is_private() { + return Err(make_error(StatusCode::FORBIDDEN)); + } + + check_no_xff(xff)?; + + check_host_in_addrs(host, &listener.addrs)?; + check_origin_is_url( + origin, + prepare_urls_from_private_addrs(&listener.addrs, listener.config.port), + )?; + log_debug!("accepted private DIRECT"); + return Ok(response); + } + } + _ => {} + } + + Err(make_error(StatusCode::FORBIDDEN)) + } +} + +pub async fn accept(tcp: TcpStream, peer_priv_key: Sensitive<[u8; 32]>) { + let remote_addr = tcp.peer_addr().unwrap(); + let remote_bind_address: BindAddress = (&remote_addr).into(); + + let local_addr = tcp.local_addr().unwrap(); + let local_bind_address: BindAddress = (&local_addr).into(); + + let ws = accept_hdr_async( + tcp, + SecurityCallback::new(remote_bind_address, local_bind_address), + ) + .await; + if ws.is_err() { + log_debug!("websocket rejected {:?}", ws.err()); + //let mut buffer = Vec::new(); + //tcp.read_to_end(&mut buffer).await; + //log_debug!("{:?}", buffer); + return; + } + + log_debug!("websocket accepted"); + + let cws = ConnectionWebSocket {}; + let base = cws.accept(peer_priv_key, ws.unwrap()).await.unwrap(); let res = BROKER .write() .await - .accept(base, IP::try_from(&ip).unwrap(), None, remote_peer_id) + .accept(base, remote_bind_address, local_bind_address) .await; } @@ -77,11 +448,15 @@ pub async fn run_server_accept_one( let tcp = connections.next().await.unwrap()?; - accept(tcp, peer_priv_key, peer_pub_key).await; + { + BROKER.write().await.set_my_peer_id(peer_pub_key); + } + + accept(tcp, peer_priv_key).await; Ok(()) } -use p2p_net::utils::U8Array; + pub async fn run_server_v0( peer_priv_key: Sensitive<[u8; 32]>, peer_pub_key: PubKey, @@ -91,26 +466,29 @@ pub async fn run_server_v0( ) -> Result<(), ()> { // check config - let mut should_run = false; - for overlay_conf in config.overlays_configs { - if overlay_conf.core != BrokerOverlayPermission::Nobody - || overlay_conf.server != BrokerOverlayPermission::Nobody - { - should_run = true; - break; + let mut run_core = false; + let mut run_server = false; + for overlay_conf in config.overlays_configs.iter() { + if overlay_conf.core != BrokerOverlayPermission::Nobody { + run_core = true; + } + if overlay_conf.server != BrokerOverlayPermission::Nobody { + run_server = true; } } - if !should_run { + if !run_core && !run_server { log_err!("There isn't any overlay_config that should run as core or server. Check your config. cannot start"); return Err(()); } - let listeners: HashSet = HashSet::new(); + if run_core && !run_server { + log_warn!("There isn't any overlay_config that should run as server. This is a misconfiguration as a core server that cannot receive client connections is useless"); + } + + let mut listeners: HashSet = HashSet::new(); for listener in &config.listeners { - let mut id = listener.interface_name.clone(); - id.push('@'); - id.push_str(&listener.port.to_string()); - if listeners.contains(&id) { + let id: String = listener.to_string(); + if !listeners.insert(id.clone()) { log_err!( "The listener {} is defined twice. Check your config file. cannot start", id @@ -130,8 +508,23 @@ pub async fn run_server_v0( let store = LmdbKCVStore::open(&path, master_key); let interfaces = get_interface(); + let mut listener_infos: HashMap = HashMap::new(); + let mut listeners_addrs: Vec<(Vec, String)> = vec![]; let mut listeners: Vec = vec![]; + let mut accept_clients = false; + + // TODO: check that there is only one PublicDyn or one PublicStatic or one Core + + // Preparing the listeners addrs and infos for listener in config.listeners { + if !listener.accept_direct && listener.accept_forward_for == AcceptForwardForV0::No { + log_warn!( + "The interface {} does not accept direct connections nor is configured to forward. it is therefor disabled", + listener.interface_name + ); + continue; + } + match find_name(&interfaces, &listener.interface_name) { None => { log_err!( @@ -141,7 +534,7 @@ pub async fn run_server_v0( return Err(()); } Some(interface) => { - let mut ips: Vec = interface + let mut addrs: Vec = interface .ipv4 .iter() .filter_map(|ip| { @@ -152,7 +545,7 @@ pub async fn run_server_v0( } }) .collect(); - if ips.len() == 0 { + if addrs.len() == 0 { log_err!( "The interface {} does not have any IPv4 address. cannot start", listener.interface_name @@ -171,28 +564,72 @@ pub async fn run_server_v0( } }) .collect(); - ips.append(&mut ipv6s); + addrs.append(&mut ipv6s); } - let ips_string = ips - .iter() - .map(|ip| ip.to_string()) - .collect::>() - .join(", "); - let listener = TcpListener::bind(ips.as_slice()).await.map_err(|e| { - log_err!( - "cannot bind to {} with addresses {} : {}", - interface.name, - ips_string, - e.to_string() - ) - })?; - log_info!("Listening on {} {}", interface.name, ips_string); - listeners.push(listener); + if !listener.refuse_clients { + accept_clients = true; + } + if listener.refuse_clients && listener.accept_forward_for.is_public_domain() { + log_warn!( + "You have disabled accepting connections from clients on {}. This is unusual as --domain and --domain-private listeners are meant to answer to clients only. This will activate the relay_websocket on this listener. Is it really intended?", + listener.interface_name + ); + } + + let listener_id: String = listener.to_string(); + let listener_info = ListenerInfo { + config: listener, + addrs: addrs.iter().map(|addr| addr.into()).collect(), + }; + + listener_infos.insert(listener_id, listener_info); + listeners_addrs.push((addrs, interface.name)); } } } + if listeners_addrs.len() == 0 { + log_err!("No listener configured. cannot start",); + return Err(()); + } + + if !accept_clients { + log_warn!("There isn't any listener that accept clients. This is a misconfiguration as a core server that cannot receive client connections is useless"); + } + + // saving the infos in the broker. This needs to happen before we start listening, as new incoming connections can happen anytime after that. + // and we need those infos for permission checking. + { + let mut broker = BROKER.write().await; + broker.set_my_peer_id(peer_pub_key); + LISTENERS_INFO + .set(broker.set_listeners(listener_infos)) + .unwrap(); + broker.set_overlays_configs(config.overlays_configs); + } + + // Actually starting the listeners + for addrs in listeners_addrs { + let addrs_string = addrs + .0 + .iter() + .map(SocketAddr::to_string) + .collect::>() + .join(", "); + let tcp_listener = TcpListener::bind(addrs.0.as_slice()).await.map_err(|e| { + log_err!( + "cannot bind to {} with addresses {} : {}", + addrs.1, + addrs_string, + e.to_string() + ) + })?; + log_info!("Listening on {} {}", addrs.1, addrs_string); + + listeners.push(tcp_listener); + } + // select on all listeners let mut incoming = futures::stream::select_all( listeners @@ -200,12 +637,14 @@ pub async fn run_server_v0( .map(TcpListener::into_incoming) .map(Box::pin), ); + // Iterate over all incoming connections + + // TODO : select on the shutdown stream too while let Some(tcp) = incoming.next().await { accept( tcp.unwrap(), Sensitive::<[u8; 32]>::from_slice(peer_priv_key.deref()), - peer_pub_key, ) .await; } diff --git a/p2p-broker/src/types.rs b/p2p-broker/src/types.rs index 35ac3ef..b216294 100644 --- a/p2p-broker/src/types.rs +++ b/p2p-broker/src/types.rs @@ -6,130 +6,10 @@ // at your option. All files in the project carrying such // notice may not be copied, modified, or distributed except // according to those terms. -use p2p_net::types::{BindAddress, BrokerServerV0, OverlayId, UserId, IP}; +use p2p_net::types::{BrokerOverlayConfigV0, ListenerV0}; use p2p_repo::types::PrivKey; use serde::{Deserialize, Serialize}; -/// AcceptForwardForV0 type -/// allow answers to connection requests originating from a client behind a reverse proxy -/// Format of last param in the tuple is a list of comma separated hosts or CIDR subnetworks IPv4 and/or IPv6 addresses accepted as X-Forwarded-For -/// Empty string means all addresses are accepted -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub enum AcceptForwardForV0 { - /// X-Forwarded-For not allowed - No, - - /// X-Forwarded-For accepted only for clients with private LAN addresses. First param is the domain of the proxy server - Private((String, String)), - - /// X-Forwarded-For accepted only for clients with public addresses. First param is the domain of the proxy server - /// domain can take an option port (trailing `:port`) - PublicDomain((String, String)), - - /// X-Forwarded-For accepted only for clients with public addresses. First param is the domain of the proxy server - /// domain can take an option port (trailing `:port`) - /// second param is the privKey of the PeerId of the proxy server, useful when the proxy server is load balancing to several daemons - /// that should all use the same PeerId to answer requests - PublicDomainPeer((String, PrivKey, String)), - - /// accepts only clients with public addresses that arrive on a LAN address binding. This is used for DMZ and port forwarding configs - /// first param is the port, second param in tuple is the interval for periodic probe of the external IP - PublicDyn((u16, u32, String)), - - /// accepts only clients with public addresses that arrive on a LAN address binding. This is used for DMZ and port forwarding configs - /// First param is the IPv4 bind address of the reverse NAT server (DMZ, port forwarding) - /// Second param is ab optional IPv6 bind address of the reverse NAT server (DMZ, port forwarding) - PublicStatic((BindAddress, Option, String)), -} - -/// DaemonConfig Listener Version 0 -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ListenerV0 { - /// local interface name to bind to - /// names of interfaces can be retrieved with the --list-interfaces option - pub interface_name: String, - - /// optional number of seconds for an interval of periodic refresh - /// of the actual IP(s) of the interface. Used for dynamic IP interfaces (DHCP) - pub interface_refresh: u32, - - // if to bind to the ipv6 address of the interface - pub ipv6: bool, - - /// local port to listen on - pub port: u16, - - // will answer a probe coming from private LAN and if is_private, with its own peerId, so that guests on the network will be able to connect. - pub discoverable: bool, - - /// Answers to connection requests originating from a direct client, without X-Forwarded-For headers - /// Can be used in combination with a accept_forward_for config, when a local daemon is behind a proxy, and also serves as broker for local apps/webbrowsers - pub accept_direct: bool, - - /// X-Forwarded-For config. only valid if IP/interface is localhost or private - pub accept_forward_for: AcceptForwardForV0, - // impl fn is_private() - // returns false if public IP in interface, or if PublicDyn, PublicStatic - // if the ip is local or private, and the forwarding is not PublicDyn nor PublicStatic, (if is_private) then the app is served on HTTP get of / - - // an interface with no accept_forward_for and no accept_direct, is de facto, disabled -} - -impl ListenerV0 { - pub fn new_direct(name: String, ipv6: bool, port: u16) -> Self { - Self { - interface_name: name, - interface_refresh: 0, - ipv6, - port, - discoverable: false, - accept_direct: true, - accept_forward_for: AcceptForwardForV0::No, - } - } -} - -/// Broker Overlay Permission -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] -pub enum BrokerOverlayPermission { - Nobody, - Anybody, - AllRegisteredUser, - UsersList(Vec), -} - -/// Broker Overlay Config -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] -pub struct BrokerOverlayConfigV0 { - // list of overlays this config applies to. empty array means applying to all - pub overlays: Vec, - // Who can ask to join an overlay on the core - pub core: BrokerOverlayPermission, - // Who can connect as a client to this server - pub server: BrokerOverlayPermission, - // if core == Nobody and server == Nobody then the listeners will not be started - - // are ExtRequest allowed on the server? this requires the core to be ON. - pub allow_read: bool, - - /// an empty list means to forward to the peer known for each overlay. - /// forward and core are mutually exclusive. forward becomes the default when core is disabled (set to Nobody). - /// core always takes precedence. - pub forward: Vec, -} - -impl BrokerOverlayConfigV0 { - pub fn new() -> Self { - BrokerOverlayConfigV0 { - overlays: vec![], - core: BrokerOverlayPermission::Nobody, - server: BrokerOverlayPermission::Nobody, - allow_read: false, - forward: vec![], - } - } -} - /// DaemonConfig Version 0 #[derive(Clone, Debug, Serialize, Deserialize)] pub struct DaemonConfigV0 { diff --git a/p2p-client-ws/Cargo.toml b/p2p-client-ws/Cargo.toml index 7da8970..6ecb205 100644 --- a/p2p-client-ws/Cargo.toml +++ b/p2p-client-ws/Cargo.toml @@ -33,4 +33,4 @@ features = ["js"] [target.'cfg(not(target_arch = "wasm32"))'.dependencies] getrandom = "0.2.7" xactor = "0.7.11" -async-tungstenite = { version = "0.17.2", features = ["async-std-runtime"] } +async-tungstenite = { version = "0.22.2", features = ["async-std-runtime"] } diff --git a/p2p-client-ws/src/remote_ws.rs b/p2p-client-ws/src/remote_ws.rs index de726ba..d33ccf9 100644 --- a/p2p-client-ws/src/remote_ws.rs +++ b/p2p-client-ws/src/remote_ws.rs @@ -91,7 +91,6 @@ impl IAccept for ConnectionWebSocket { async fn accept( &self, peer_privk: Sensitive<[u8; 32]>, - peer_pubk: PubKey, socket: Self::Socket, ) -> Result { let mut cnx = ConnectionBase::new(ConnectionDir::Server, TransportProtocol::WS); diff --git a/p2p-net/Cargo.toml b/p2p-net/Cargo.toml index 55ff5c4..7903d7b 100644 --- a/p2p-net/Cargo.toml +++ b/p2p-net/Cargo.toml @@ -30,4 +30,5 @@ version = "0.2.7" features = ["js"] [target.'cfg(not(target_arch = "wasm32"))'.dependencies] -getrandom = "0.2.7" \ No newline at end of file +getrandom = "0.2.7" +default-net = "0.15" \ No newline at end of file diff --git a/p2p-net/src/broker.rs b/p2p-net/src/broker.rs index 5d47b43..03bb2ef 100644 --- a/p2p-net/src/broker.rs +++ b/p2p-net/src/broker.rs @@ -63,9 +63,15 @@ pub static BROKER: Lazy>> = Lazy::new(|| Arc::new(RwLock::new pub struct Broker { direct_connections: HashMap, peers: HashMap, + incoming_anonymous_connections: HashMap<(BindAddress, BindAddress), ConnectionBase>, + #[cfg(not(target_arch = "wasm32"))] + listeners: HashMap, + bind_addresses: HashMap, + overlays_configs: Vec, shutdown: Option>, shutdown_sender: Sender, closing: bool, + my_peer_id: Option, test: u32, tauri_streams: HashMap>, @@ -79,7 +85,7 @@ impl Broker { } /// helper function to cancel a tauri stream - /// /// only used in Tauri, not used in the JS SDK + /// only used in Tauri, not used in the JS SDK pub fn tauri_stream_cancel(&mut self, stream_id: String) { let s = self.tauri_streams.remove(&stream_id); if let Some(sender) = s { @@ -87,6 +93,34 @@ impl Broker { } } + pub fn set_my_peer_id(&mut self, id: PubKey) { + if self.my_peer_id.is_none() { + self.my_peer_id = Some(id) + } + } + + #[cfg(not(target_arch = "wasm32"))] + pub fn set_listeners( + &mut self, + listeners: HashMap, + ) -> (HashMap, HashMap) { + for entry in listeners.iter() { + for ba in entry.1.addrs.iter() { + self.bind_addresses.insert(ba.clone(), entry.0.clone()); + } + } + self.listeners.extend(listeners); + let mut copy_listeners: HashMap = HashMap::new(); + let mut copy_bind_addresses: HashMap = HashMap::new(); + copy_listeners.clone_from(&self.listeners); + copy_bind_addresses.clone_from(&self.bind_addresses); + (copy_listeners, copy_bind_addresses) + } + + pub fn set_overlays_configs(&mut self, overlays_configs: Vec) { + self.overlays_configs.extend(overlays_configs) + } + pub async fn get_block_from_store_with_block_id( &mut self, nuri: String, @@ -218,6 +252,11 @@ impl Broker { let mut random_buf = [0u8; 4]; getrandom::getrandom(&mut random_buf).unwrap(); Broker { + incoming_anonymous_connections: HashMap::new(), + #[cfg(not(target_arch = "wasm32"))] + listeners: HashMap::new(), + bind_addresses: HashMap::new(), + overlays_configs: vec![], shutdown: Some(shutdown_receiver), shutdown_sender, direct_connections: HashMap::new(), @@ -225,6 +264,7 @@ impl Broker { tauri_streams: HashMap::new(), closing: false, test: u32::from_be_bytes(random_buf), + my_peer_id: None, } } @@ -299,49 +339,50 @@ impl Broker { pub async fn accept( &mut self, mut connection: ConnectionBase, - ip: IP, - core: Option, - remote_peer_id: DirectPeerId, + remote_bind_address: BindAddress, + local_bind_address: BindAddress, ) -> Result<(), NetError> { if self.closing { return Err(NetError::Closing); } - let join = connection.take_shutdown(); - - let connected = if core.is_some() { - let dc = DirectConnection { - ip, - interface: core.clone().unwrap(), - remote_peer_id, - tp: connection.transport_protocol(), - cnx: connection, - }; - self.direct_connections.insert(ip, dc); - PeerConnection::Core(ip) - } else { - PeerConnection::Client(connection) - }; - let bpi = BrokerPeerInfo { - lastPeerAdvert: None, - connected, - }; - self.peers.insert(remote_peer_id, bpi); - - async fn watch_close( - mut join: Receiver, - remote_peer_id: DirectPeerId, - ) -> ResultSend<()> { - async move { - let res = join.next().await; - log_info!("SOCKET IS CLOSED {:?} {:?}", res, &remote_peer_id); - log_info!("REMOVED"); - BROKER.write().await.remove(&remote_peer_id); - } - .await; - Ok(()) - } - spawn_and_log_error(watch_close(join, remote_peer_id)); + //self.incoming_anonymous_connections + + // let join = connection.take_shutdown(); + + // let connected = if core.is_some() { + // let dc = DirectConnection { + // ip, + // interface: core.clone().unwrap(), + // remote_peer_id, + // tp: connection.transport_protocol(), + // cnx: connection, + // }; + // self.direct_connections.insert(ip, dc); + // PeerConnection::Core(ip) + // } else { + // PeerConnection::Client(connection) + // }; + // let bpi = BrokerPeerInfo { + // lastPeerAdvert: None, + // connected, + // }; + // self.peers.insert(remote_peer_id, bpi); + + // async fn watch_close( + // mut join: Receiver, + // remote_peer_id: DirectPeerId, + // ) -> ResultSend<()> { + // async move { + // let res = join.next().await; + // log_info!("SOCKET IS CLOSED {:?} {:?}", res, &remote_peer_id); + // log_info!("REMOVED"); + // BROKER.write().await.remove(&remote_peer_id); + // } + // .await; + // Ok(()) + // } + // spawn_and_log_error(watch_close(join, remote_peer_id)); Ok(()) } diff --git a/p2p-net/src/connection.rs b/p2p-net/src/connection.rs index 17aa340..9a8af4a 100644 --- a/p2p-net/src/connection.rs +++ b/p2p-net/src/connection.rs @@ -64,7 +64,6 @@ pub trait IAccept: Send + Sync { async fn accept( &self, peer_privk: Sensitive<[u8; 32]>, - peer_pubk: PubKey, socket: Self::Socket, ) -> Result; } @@ -204,7 +203,7 @@ impl NoiseFSM { } pub async fn send(&mut self, msg: ProtocolMessage) -> Result<(), ProtocolError> { - log_info!("SENDING: {:?}", msg); + log_trace!("SENDING: {:?}", msg); if self.noise_cipher_state_enc.is_some() { let cipher = self.encrypt(msg)?; self.sender @@ -250,7 +249,7 @@ impl NoiseFSM { } } if msg_opt.is_some() { - log_info!("RECEIVED: {:?}", msg_opt.as_ref().unwrap()); + log_trace!("RECEIVED: {:?}", msg_opt.as_ref().unwrap()); } match self.state { // TODO verify that ID is zero diff --git a/p2p-net/src/errors.rs b/p2p-net/src/errors.rs index 49bc714..b147b9b 100644 --- a/p2p-net/src/errors.rs +++ b/p2p-net/src/errors.rs @@ -29,6 +29,7 @@ pub enum NetError { ConnectionError, SerializationError, ProtocolError, + ConnectionDenied, Closing, } //MAX 50 NetErrors diff --git a/p2p-net/src/types.rs b/p2p-net/src/types.rs index 3501f06..3060a29 100644 --- a/p2p-net/src/types.rs +++ b/p2p-net/src/types.rs @@ -13,27 +13,91 @@ //! //! Corresponds to the BARE schema -use core::fmt; -use std::{ - any::{Any, TypeId}, - net::IpAddr, +use crate::utils::{ + get_domain_without_port, get_domain_without_port_443, is_ipv4_private, is_ipv6_private, + is_private_ip, is_public_ip, is_public_ipv4, is_public_ipv6, }; - use crate::{actor::EActor, actors::*, errors::ProtocolError}; +use core::fmt; use p2p_repo::types::*; use serde::{Deserialize, Serialize}; +use std::{ + any::{Any, TypeId}, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, +}; // // Broker common types // +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum InterfaceType { + Loopback, + Private, + Public, + Invalid, +} + +impl InterfaceType { + pub fn is_ip_valid_for_type(&self, ip: &IP) -> bool { + self.is_ipaddr_valid_for_type(&ip.into()) + } + pub fn is_ipaddr_valid_for_type(&self, ip: &IpAddr) -> bool { + match ip { + IpAddr::V4(v4) => self.is_ipv4_valid_for_type(v4), + IpAddr::V6(v6) => self.is_ipv6_valid_for_type(v6), + } + } + + pub fn is_ipv4_valid_for_type(&self, ip: &Ipv4Addr) -> bool { + match self { + InterfaceType::Loopback => ip.is_loopback(), + InterfaceType::Public => is_public_ipv4(ip), + // we allow to bind to link-local for IPv4 + InterfaceType::Private => is_ipv4_private(ip), + _ => false, + } + } + pub fn is_ipv6_valid_for_type(&self, ip: &Ipv6Addr) -> bool { + match self { + InterfaceType::Loopback => ip.is_loopback(), + InterfaceType::Public => is_public_ipv6(ip), + // we do NOT allow to bind to link-local for IPv6 + InterfaceType::Private => is_ipv6_private(ip), + _ => false, + } + } +} + +#[cfg(not(target_arch = "wasm32"))] +#[derive(Clone, Debug)] +pub struct Interface { + pub if_type: InterfaceType, + pub name: String, + pub mac_addr: Option, + /// List of Ipv4Net for the network interface + pub ipv4: Vec, + /// List of Ipv6Net for the network interface + pub ipv6: Vec, +} + /// Bind address -#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct BindAddress { pub port: u16, pub ip: IP, } +impl From<&SocketAddr> for BindAddress { + #[inline] + fn from(addr: &SocketAddr) -> BindAddress { + let ip_addr = addr.ip(); + let ip = IP::try_from(&ip_addr).unwrap(); + let port = addr.port(); + BindAddress { ip, port } + } +} + /// BrokerServerTypeV0 type #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub enum BrokerServerTypeV0 { @@ -54,6 +118,214 @@ pub struct BrokerServerV0 { pub peer_id: PubKey, } +/// ListenerInfo +#[cfg(not(target_arch = "wasm32"))] +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ListenerInfo { + pub config: ListenerV0, + + /// list of BindAddresses + pub addrs: Vec, +} + +/// AcceptForwardForV0 type +/// allow answers to connection requests originating from a client behind a reverse proxy +/// Format of last param in the tuple is a list of comma separated hosts or CIDR subnetworks IPv4 and/or IPv6 addresses accepted as X-Forwarded-For +/// Empty string means all addresses are accepted +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub enum AcceptForwardForV0 { + /// X-Forwarded-For not allowed + No, + + /// X-Forwarded-For accepted only for clients with private LAN addresses. First param is the domain of the proxy server + PrivateDomain((String, String)), + + /// X-Forwarded-For accepted only for clients with public addresses. First param is the domain of the proxy server + /// domain can take an option port (trailing `:port`) + PublicDomain((String, String)), + + /// X-Forwarded-For accepted only for clients with public addresses. First param is the domain of the proxy server + /// domain can take an option port (trailing `:port`) + /// second param is the privKey of the PeerId of the proxy server, useful when the proxy server is load balancing to several daemons + /// that should all use the same PeerId to answer requests + PublicDomainPeer((String, PrivKey, String)), + + /// accepts only clients with public addresses that arrive on a LAN address binding. This is used for DMZ and port forwarding configs + /// first param is the port, second param in tuple is the interval for periodic probe of the external IP + PublicDyn((u16, u32, String)), + + /// accepts only clients with public addresses that arrive on a LAN address binding. This is used for DMZ and port forwarding configs + /// First param is the IPv4 bind address of the reverse NAT server (DMZ, port forwarding) + /// Second param is an optional IPv6 bind address of the reverse NAT server (DMZ, port forwarding) + PublicStatic((BindAddress, Option, String)), +} + +impl AcceptForwardForV0 { + pub fn get_public_bind_addresses(&self) -> Vec { + match self { + AcceptForwardForV0::PublicStatic((ipv4, ipv6, _)) => { + let mut res = vec![ipv4.clone()]; + if ipv6.is_some() { + res.push(ipv6.unwrap().clone()) + } + res + } + AcceptForwardForV0::PublicDyn(_) => { + todo!(); + } + _ => panic!("cannot call get_public_bind_addresses"), + } + } + + pub fn is_public_domain(&self) -> bool { + match self { + AcceptForwardForV0::PublicDomainPeer(_) => true, + AcceptForwardForV0::PublicDomain(_) => true, + _ => false, + } + } + pub fn is_public_static(&self) -> bool { + match self { + AcceptForwardForV0::PublicStatic(_) => true, + _ => false, + } + } + pub fn is_public_dyn(&self) -> bool { + match self { + AcceptForwardForV0::PublicDyn(_) => true, + _ => false, + } + } + pub fn is_private_domain(&self) -> bool { + match self { + AcceptForwardForV0::PrivateDomain(_) => true, + _ => false, + } + } + pub fn get_domain(&self) -> &str { + let domain = get_domain_without_port_443(match self { + AcceptForwardForV0::PrivateDomain((d, _)) => d, + AcceptForwardForV0::PublicDomain((d, _)) => d, + AcceptForwardForV0::PublicDomainPeer((d, _, _)) => d, + _ => panic!("cannot call get_domain if AcceptForwardForV0 is not a domain"), + }); + //let mut url = "https://".to_string(); + //url.push_str(domain); + domain + } +} + +#[cfg(not(target_arch = "wasm32"))] +/// DaemonConfig Listener Version 0 +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ListenerV0 { + /// local interface name to bind to + /// names of interfaces can be retrieved with the --list-interfaces option + pub interface_name: String, + + pub if_type: InterfaceType, + + /// optional number of seconds for an interval of periodic refresh + /// of the actual IP(s) of the interface. Used for dynamic IP interfaces (DHCP) + pub interface_refresh: u32, + + // if to bind to the ipv6 address of the interface + pub ipv6: bool, + + /// local port to listen on + pub port: u16, + + /// should the server serve the app files in HTTP mode (not WS). this setting will be discarded and app will not be served anyway if remote IP is public or listener is public + pub serve_app: bool, + + /// default to false. Set to true by --core (use --core-and-clients to override to false). only useful for a public IP listener, if the clients should use another listener like --domain or --domain-private. + /// do not set it on a --domain or --domain-private, as this will enable the relay_websocket feature, which should not be used except by app.nextgraph.one + pub refuse_clients: bool, + + // will answer a probe coming from private LAN and if is_private, with its own peerId, so that guests on the network will be able to connect. + pub discoverable: bool, + + /// Answers to connection requests originating from a direct client, without X-Forwarded-For headers + /// Can be used in combination with a accept_forward_for config, when a local daemon is behind a proxy, and also serves as broker for local apps/webbrowsers + pub accept_direct: bool, + + /// X-Forwarded-For config. only valid if IP/interface is localhost or private + pub accept_forward_for: AcceptForwardForV0, + // impl fn is_private() + // returns false if public IP in interface, or if PublicDyn, PublicStatic + // if the ip is local or private, and the forwarding is not PublicDyn nor PublicStatic, (if is_private) then the app is served on HTTP get of / + + // an interface with no accept_forward_for and no accept_direct, is de facto, disabled +} + +#[cfg(not(target_arch = "wasm32"))] +impl ListenerV0 { + pub fn new_direct(interface: Interface, ipv6: bool, port: u16) -> Self { + Self { + interface_name: interface.name, + if_type: interface.if_type, + interface_refresh: 0, + ipv6, + port, + discoverable: false, + accept_direct: true, + refuse_clients: false, + serve_app: true, + accept_forward_for: AcceptForwardForV0::No, + } + } +} +#[cfg(not(target_arch = "wasm32"))] +impl fmt::Display for ListenerV0 { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut id = self.interface_name.clone(); + id.push('@'); + id.push_str(&self.port.to_string()); + write!(f, "{}", id) + } +} + +/// Broker Overlay Permission +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub enum BrokerOverlayPermission { + Nobody, + Anybody, + AllRegisteredUser, + UsersList(Vec), +} + +/// Broker Overlay Config +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct BrokerOverlayConfigV0 { + // list of overlays this config applies to. empty array means applying to all + pub overlays: Vec, + // Who can ask to join an overlay on the core + pub core: BrokerOverlayPermission, + // Who can connect as a client to this server + pub server: BrokerOverlayPermission, + // if core == Nobody and server == Nobody then the listeners will not be started + + // are ExtRequest allowed on the server? this requires the core to be ON. + pub allow_read: bool, + + /// an empty list means to forward to the peer known for each overlay. + /// forward and core are mutually exclusive. forward becomes the default when core is disabled (set to Nobody). + /// core always takes precedence. + pub forward: Vec, +} + +impl BrokerOverlayConfigV0 { + pub fn new() -> Self { + BrokerOverlayConfigV0 { + overlays: vec![], + core: BrokerOverlayPermission::Nobody, + server: BrokerOverlayPermission::Nobody, + allow_read: false, + forward: vec![], + } + } +} + // // COMMON TYPES FOR MESSAGES // @@ -104,6 +376,19 @@ pub enum IP { IPv6(IPv6), } +impl IP { + pub fn is_public(&self) -> bool { + is_public_ip(&self.into()) + } + pub fn is_private(&self) -> bool { + is_private_ip(&self.into()) + } + pub fn is_loopback(&self) -> bool { + let t: &IpAddr = &self.into(); + t.is_loopback() + } +} + impl fmt::Display for IP { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let t: IpAddr = self.try_into().unwrap(); diff --git a/p2p-net/src/utils.rs b/p2p-net/src/utils.rs index d5bacbc..cde9c4f 100644 --- a/p2p-net/src/utils.rs +++ b/p2p-net/src/utils.rs @@ -17,6 +17,7 @@ use noise_protocol::DH; pub use noise_rust_crypto::sensitive::Sensitive; use p2p_repo::log::*; use p2p_repo::types::PubKey; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; #[cfg(target_arch = "wasm32")] pub fn spawn_and_log_error(fut: F) -> task::JoinHandle<()> @@ -95,7 +96,43 @@ impl Dual25519Keys { } } } -use std::net::{Ipv4Addr, Ipv6Addr}; + +pub fn get_domain_without_port(domain: &String) -> String { + let parts: Vec<&str> = domain.split(':').collect(); + parts[0].to_string() +} + +pub fn get_domain_without_port_443(domain: &str) -> &str { + let parts: Vec<&str> = domain.split(':').collect(); + if parts.len() > 1 && parts[1] == "443" { + return parts[0]; + } + domain +} + +pub fn is_public_ipv4(ip: &Ipv4Addr) -> bool { + // TODO, use core::net::Ipv6Addr.is_global when it will be stable + return is_ipv4_global(ip); +} + +pub fn is_public_ipv6(ip: &Ipv6Addr) -> bool { + // TODO, use core::net::Ipv6Addr.is_global when it will be stable + return is_ipv6_global(ip); +} + +pub fn is_public_ip(ip: &IpAddr) -> bool { + match ip { + IpAddr::V4(v4) => is_public_ipv4(v4), + IpAddr::V6(v6) => is_public_ipv6(v6), + } +} + +pub fn is_private_ip(ip: &IpAddr) -> bool { + match ip { + IpAddr::V4(v4) => is_ipv4_private(v4), + IpAddr::V6(v6) => is_ipv6_private(v6), + } +} #[must_use] #[inline] diff --git a/p2p-repo/src/lib.rs b/p2p-repo/src/lib.rs index 1a2cdff..ddc6f6d 100644 --- a/p2p-repo/src/lib.rs +++ b/p2p-repo/src/lib.rs @@ -138,13 +138,13 @@ pub mod log { #[cfg(all(debug_assertions, target_arch = "wasm32"))] #[macro_export] macro_rules! log_debug { - ($($t:tt)*) => (log(format!("DEBUG:{}",&format_args!($($t)*).to_string()))) + ($($t:tt)*) => (log(&format!("DEBUG:{}",&format_args!($($t)*).to_string()).to_string())) } #[cfg(all(debug_assertions, target_arch = "wasm32"))] #[macro_export] macro_rules! log_trace { - ($($t:tt)*) => (log(format!("TRACE:{}",&format_args!($($t)*).to_string()))) + ($($t:tt)*) => (log(&format!("TRACE:{}",&format_args!($($t)*).to_string()).to_string())) } #[cfg(all(not(debug_assertions), target_arch = "wasm32"))]