From 460b723f1b9e7f71d93cd148cb62872073598e44 Mon Sep 17 00:00:00 2001 From: Niko Date: Wed, 21 Jun 2023 00:24:29 +0300 Subject: [PATCH] listening on all listeners defined in config --- Cargo.lock | 2 +- ngd/Cargo.toml | 1 - ngd/src/main.rs | 170 ++++------------------------------ ngd/src/types.rs | 15 --- p2p-broker/Cargo.toml | 1 + p2p-broker/src/interfaces.rs | 172 +++++++++++++++++++++++++++++++++++ p2p-broker/src/lib.rs | 2 + p2p-broker/src/server_ws.rs | 132 ++++++++++++++++++++++----- p2p-broker/src/types.rs | 15 +++ 9 files changed, 318 insertions(+), 192 deletions(-) create mode 100644 p2p-broker/src/interfaces.rs diff --git a/Cargo.lock b/Cargo.lock index 01e4a01..f212f58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2766,7 +2766,6 @@ dependencies = [ "async-std", "base64-url", "clap", - "default-net", "env_logger", "lazy_static", "log", @@ -2971,6 +2970,7 @@ dependencies = [ "async-tungstenite", "blake3", "chacha20", + "default-net", "futures", "getrandom 0.2.9", "hex", diff --git a/ngd/Cargo.toml b/ngd/Cargo.toml index d41c602..52fc6a3 100644 --- a/ngd/Cargo.toml +++ b/ngd/Cargo.toml @@ -15,7 +15,6 @@ p2p-broker = { path = "../p2p-broker" } p2p-net = { path = "../p2p-net" } p2p-repo = { path = "../p2p-repo", features = ["server_log_output"] } async-std = { version = "1.12.0", features = ["attributes"] } -default-net = "0.15" log = "0.4" env_logger = "0.10" clap = { version = "4.3.4", features = ["derive","env","string"] } diff --git a/ngd/src/main.rs b/ngd/src/main.rs index 4246b97..ca31fde 100644 --- a/ngd/src/main.rs +++ b/ngd/src/main.rs @@ -16,7 +16,8 @@ mod cli; use crate::cli::*; use crate::types::*; use clap::Parser; -use p2p_broker::server_ws::run_server; +use p2p_broker::interfaces::*; +use p2p_broker::server_ws::run_server_v0; use p2p_broker::types::*; use p2p_broker::utils::*; use p2p_net::types::*; @@ -41,157 +42,6 @@ use addr::psl::List; use lazy_static::lazy_static; use regex::Regex; -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum InterfaceType { - Loopback, - Private, - Public, - Invalid, -} - -impl InterfaceType { - pub fn is_ipv4_valid_for_type(&self, ip: &Ipv4Addr) -> bool { - match self { - InterfaceType::Loopback => ip.is_loopback(), - InterfaceType::Public => is_public_ipv4(ip), - // we allow to bind to link-local for IPv4 - InterfaceType::Private => is_ipv4_private(ip), - _ => false, - } - } - pub fn is_ipv6_valid_for_type(&self, ip: &Ipv6Addr) -> bool { - match self { - InterfaceType::Loopback => ip.is_loopback(), - InterfaceType::Public => is_public_ipv6(ip), - // we do NOT allow to bind to link-local for IPv6 - InterfaceType::Private => is_ipv6_private(ip), - _ => false, - } - } -} - -pub fn print_ipv4(ip: &default_net::ip::Ipv4Net) -> String { - format!("{}/{}", ip.addr, ip.prefix_len) -} - -pub fn print_ipv6(ip: &default_net::ip::Ipv6Net) -> String { - format!("{}/{}", ip.addr, ip.prefix_len) -} - -#[derive(Clone, Debug)] -pub struct Interface { - pub if_type: InterfaceType, - pub name: String, - pub mac_addr: Option, - /// List of Ipv4Net for the network interface - pub ipv4: Vec, - /// List of Ipv6Net for the network interface - pub ipv6: Vec, -} - -fn find_first(list: &Vec, iftype: InterfaceType) -> Option { - for inf in list { - if inf.if_type == iftype { - return Some(inf.clone()); - } - } - None -} - -fn find_first_or_name( - list: &Vec, - iftype: InterfaceType, - name: &String, -) -> Option { - for inf in list { - if (name == "default" || *name == inf.name) && inf.if_type == iftype { - return Some(inf.clone()); - } - } - None -} - -fn is_public_ipv4(ip: &Ipv4Addr) -> bool { - // TODO, use core::net::Ipv6Addr.is_global when it will be stable - return is_ipv4_global(ip); -} - -fn is_public_ipv6(ip: &Ipv6Addr) -> bool { - // TODO, use core::net::Ipv6Addr.is_global when it will be stable - return is_ipv6_global(ip); -} - -fn is_public_ip(ip: &IpAddr) -> bool { - match ip { - IpAddr::V4(v4) => is_public_ipv4(v4), - IpAddr::V6(v6) => is_public_ipv6(v6), - } -} - -fn is_private_ip(ip: &IpAddr) -> bool { - match ip { - IpAddr::V4(v4) => is_ipv4_private(v4), - IpAddr::V6(v6) => is_ipv6_private(v6), - } -} - -pub fn get_interface() -> Vec { - let mut res: Vec = vec![]; - let interfaces = default_net::get_interfaces(); - for interface in interfaces { - if interface.ipv4.len() > 0 { - let first_v4 = interface.ipv4[0].addr; - let if_type = if first_v4.is_loopback() { - InterfaceType::Loopback - } else if is_ipv4_private(&first_v4) { - InterfaceType::Private - } else if is_public_ipv4(&first_v4) { - InterfaceType::Public - } else { - continue; - }; - let interf = Interface { - if_type, - name: interface.name, - mac_addr: interface.mac_addr, - ipv4: interface.ipv4, - ipv6: interface.ipv6, - }; - res.push(interf); - } - } - res -} - -pub fn print_interfaces() { - let interfaces = get_interface(); - for interface in interfaces { - println!("{} \t{:?}", interface.name, interface.if_type); - - println!( - "\tIPv4: {}", - interface - .ipv4 - .iter() - .map(|ip| print_ipv4(ip)) - .collect::>() - .join(" ") - ); - println!( - "\tIPv6: {}", - interface - .ipv6 - .iter() - .map(|ip| print_ipv6(ip)) - .collect::>() - .join(" ") - ); - if let Some(mac_addr) = interface.mac_addr { - println!("\tMAC: {}", mac_addr); - } - } -} - fn decode_key(key_string: &String) -> Result<[u8; 32], ()> { let vec = base64_url::decode(key_string).map_err(|_| log_err!("key has invalid content"))?; Ok(*slice_as_array!(&vec, [u8; 32]) @@ -1062,7 +912,7 @@ async fn main_inner() -> Result<(), ()> { config_path.to_str().unwrap() ); log_info!( - "You cannot use Quick config options anymore on the command line in your next start of the server. But you can go to modify the config file directly, or delete it.", + "You not be able to use any Quick config options anymore on the command line at the next command-line start of the server. But you can go to modify the config file directly, or delete it.", ); } } else { @@ -1115,7 +965,19 @@ async fn main_inner() -> Result<(), ()> { log_info!("PeerId of node: {}", pubkey); debug_println!("Private key of peer: {}", prix_key_encoded); - run_server("127.0.0.1", WS_PORT, privkey, pubkey, path).await?; + + match (config.unwrap()) { + DaemonConfig::V0(v0) => { + run_server_v0( + privkey, + pubkey, + Sensitive::<[u8; 32]>::from_slice(&keys[2]), + v0, + path, + ) + .await? + } + } Ok(()) } diff --git a/ngd/src/types.rs b/ngd/src/types.rs index 1a82fa3..23f351d 100644 --- a/ngd/src/types.rs +++ b/ngd/src/types.rs @@ -10,18 +10,3 @@ use p2p_broker::types::BrokerOverlayConfigV0; use p2p_broker::types::ListenerV0; use p2p_repo::types::PrivKey; use serde::{Deserialize, Serialize}; - -/// DaemonConfig Version 0 -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct DaemonConfigV0 { - /// List of listeners for TCP (HTTP) incoming connections - pub listeners: Vec, - - pub overlays_configs: Vec, -} - -/// Daemon config -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum DaemonConfig { - V0(DaemonConfigV0), -} diff --git a/p2p-broker/Cargo.toml b/p2p-broker/Cargo.toml index d12b29d..2213783 100644 --- a/p2p-broker/Cargo.toml +++ b/p2p-broker/Cargo.toml @@ -25,6 +25,7 @@ hex = "0.4.3" async-trait = "0.1.64" async-tungstenite = { version = "0.17.2", features = ["async-std-runtime"] } blake3 = "1.3.1" +default-net = "0.15" [target.'cfg(target_arch = "wasm32")'.dependencies.getrandom] version = "0.2.7" diff --git a/p2p-broker/src/interfaces.rs b/p2p-broker/src/interfaces.rs new file mode 100644 index 0000000..d898a4e --- /dev/null +++ b/p2p-broker/src/interfaces.rs @@ -0,0 +1,172 @@ +/* + * Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers + * All rights reserved. + * Licensed under the Apache License, Version 2.0 + * + * or the MIT license , + * at your option. All files in the project carrying such + * notice may not be copied, modified, or distributed except + * according to those terms. + */ +use p2p_net::utils::{is_ipv4_global, is_ipv4_private, is_ipv6_global, is_ipv6_private}; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum InterfaceType { + Loopback, + Private, + Public, + Invalid, +} + +impl InterfaceType { + pub fn is_ipv4_valid_for_type(&self, ip: &Ipv4Addr) -> bool { + match self { + InterfaceType::Loopback => ip.is_loopback(), + InterfaceType::Public => is_public_ipv4(ip), + // we allow to bind to link-local for IPv4 + InterfaceType::Private => is_ipv4_private(ip), + _ => false, + } + } + pub fn is_ipv6_valid_for_type(&self, ip: &Ipv6Addr) -> bool { + match self { + InterfaceType::Loopback => ip.is_loopback(), + InterfaceType::Public => is_public_ipv6(ip), + // we do NOT allow to bind to link-local for IPv6 + InterfaceType::Private => is_ipv6_private(ip), + _ => false, + } + } +} + +pub fn print_ipv4(ip: &default_net::ip::Ipv4Net) -> String { + format!("{}/{}", ip.addr, ip.prefix_len) +} + +pub fn print_ipv6(ip: &default_net::ip::Ipv6Net) -> String { + format!("{}/{}", ip.addr, ip.prefix_len) +} + +#[derive(Clone, Debug)] +pub struct Interface { + pub if_type: InterfaceType, + pub name: String, + pub mac_addr: Option, + /// List of Ipv4Net for the network interface + pub ipv4: Vec, + /// List of Ipv6Net for the network interface + pub ipv6: Vec, +} + +pub fn find_first(list: &Vec, iftype: InterfaceType) -> Option { + for inf in list { + if inf.if_type == iftype { + return Some(inf.clone()); + } + } + None +} + +pub fn find_first_or_name( + list: &Vec, + iftype: InterfaceType, + name: &String, +) -> Option { + for inf in list { + if (name == "default" || *name == inf.name) && inf.if_type == iftype { + return Some(inf.clone()); + } + } + None +} + +pub fn find_name(list: &Vec, name: &String) -> Option { + for inf in list { + if *name == inf.name { + return Some(inf.clone()); + } + } + None +} + +pub fn is_public_ipv4(ip: &Ipv4Addr) -> bool { + // TODO, use core::net::Ipv6Addr.is_global when it will be stable + return is_ipv4_global(ip); +} + +pub fn is_public_ipv6(ip: &Ipv6Addr) -> bool { + // TODO, use core::net::Ipv6Addr.is_global when it will be stable + return is_ipv6_global(ip); +} + +pub fn is_public_ip(ip: &IpAddr) -> bool { + match ip { + IpAddr::V4(v4) => is_public_ipv4(v4), + IpAddr::V6(v6) => is_public_ipv6(v6), + } +} + +pub fn is_private_ip(ip: &IpAddr) -> bool { + match ip { + IpAddr::V4(v4) => is_ipv4_private(v4), + IpAddr::V6(v6) => is_ipv6_private(v6), + } +} + +pub fn get_interface() -> Vec { + let mut res: Vec = vec![]; + let interfaces = default_net::get_interfaces(); + for interface in interfaces { + if interface.ipv4.len() > 0 { + let first_v4 = interface.ipv4[0].addr; + let if_type = if first_v4.is_loopback() { + InterfaceType::Loopback + } else if is_ipv4_private(&first_v4) { + InterfaceType::Private + } else if is_public_ipv4(&first_v4) { + InterfaceType::Public + } else { + continue; + }; + let interf = Interface { + if_type, + name: interface.name, + mac_addr: interface.mac_addr, + ipv4: interface.ipv4, + ipv6: interface.ipv6, + }; + res.push(interf); + } + } + res +} + +pub fn print_interfaces() { + let interfaces = get_interface(); + for interface in interfaces { + println!("{} \t{:?}", interface.name, interface.if_type); + + println!( + "\tIPv4: {}", + interface + .ipv4 + .iter() + .map(|ip| print_ipv4(ip)) + .collect::>() + .join(" ") + ); + println!( + "\tIPv6: {}", + interface + .ipv6 + .iter() + .map(|ip| print_ipv6(ip)) + .collect::>() + .join(" ") + ); + if let Some(mac_addr) = interface.mac_addr { + println!("\tMAC: {}", mac_addr); + } + } +} diff --git a/p2p-broker/src/lib.rs b/p2p-broker/src/lib.rs index 999ee42..e1eb639 100644 --- a/p2p-broker/src/lib.rs +++ b/p2p-broker/src/lib.rs @@ -5,3 +5,5 @@ pub mod server_ws; pub mod types; pub mod utils; + +pub mod interfaces; diff --git a/p2p-broker/src/server_ws.rs b/p2p-broker/src/server_ws.rs index ab3b34a..73ff380 100644 --- a/p2p-broker/src/server_ws.rs +++ b/p2p-broker/src/server_ws.rs @@ -11,7 +11,8 @@ //! WebSocket implementation of the Broker -use crate::broker_store::config::ConfigMode; +use crate::interfaces::*; +use crate::types::*; use async_std::net::{TcpListener, TcpStream}; use async_std::sync::Mutex; use async_std::task; @@ -26,7 +27,10 @@ use p2p_net::utils::Sensitive; use p2p_repo::log::*; use p2p_repo::types::{PrivKey, PubKey}; use p2p_repo::utils::generate_keypair; +use std::collections::HashSet; use std::fs; +use std::net::SocketAddr; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::ops::Deref; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -67,11 +71,6 @@ pub async fn run_server_accept_one( log_debug!("data directory: {}", root.path().to_str().unwrap()); let store = LmdbKCVStore::open(root.path(), master_key); - // TODO: remove this part - // let server: BrokerServer = - // BrokerServer::new(store, ConfigMode::Local).expect("starting broker"); - // let server_arc = Arc::new(server); - let socket = TcpListener::bind(addrs.as_str()).await?; log_debug!("Listening on {}", addrs.as_str()); let mut connections = socket.incoming(); @@ -83,36 +82,126 @@ pub async fn run_server_accept_one( Ok(()) } use p2p_net::utils::U8Array; -pub async fn run_server( - addr: &str, - port: u16, +pub async fn run_server_v0( peer_priv_key: Sensitive<[u8; 32]>, peer_pub_key: PubKey, + wallet_master_key: Sensitive<[u8; 32]>, + config: DaemonConfigV0, mut path: PathBuf, ) -> Result<(), ()> { - let addrs = format!("{}:{}", addr, port); + // check config + + let mut should_run = false; + for overlay_conf in config.overlays_configs { + if overlay_conf.core != BrokerOverlayPermission::Nobody + || overlay_conf.server != BrokerOverlayPermission::Nobody + { + should_run = true; + break; + } + } + if !should_run { + log_err!("There isn't any overlay_config that should run as core or server. Check your config. cannot start"); + return Err(()); + } + + let listeners: HashSet = HashSet::new(); + for listener in &config.listeners { + let mut id = listener.interface_name.clone(); + id.push('@'); + id.push_str(&listener.port.to_string()); + if listeners.contains(&id) { + log_err!( + "The listener {} is defined twice. Check your config file. cannot start", + id + ); + return Err(()); + } + } //let root = tempfile::Builder::new().prefix("ngd").tempdir().unwrap(); path.push("storage"); std::fs::create_dir_all(path.clone()).unwrap(); //log::info!("Home directory is {}"); + // TODO: open wallet let master_key: [u8; 32] = [0; 32]; let store = LmdbKCVStore::open(&path, master_key); - // TODO: remove this part - // let server: BrokerServer = - // BrokerServer::new(store, ConfigMode::Local).expect("starting broker"); - // let server_arc = Arc::new(server); - - let socket = TcpListener::bind(addrs.as_str()) - .await - .map_err(|e| log_err!("bind error: {}", e.to_string()))?; - log_debug!("Listening on {}", addrs.as_str()); - let mut connections = socket.incoming(); + let interfaces = get_interface(); + let mut listeners: Vec = vec![]; + for listener in config.listeners { + match find_name(&interfaces, &listener.interface_name) { + None => { + log_err!( + "The interface {} does not exist on your host. Check your config file. cannot start", + listener.interface_name + ); + return Err(()); + } + Some(interface) => { + let mut ips: Vec = interface + .ipv4 + .iter() + .filter_map(|ip| { + if interface.if_type.is_ipv4_valid_for_type(&ip.addr) { + Some(SocketAddr::new(IpAddr::V4(ip.addr), listener.port)) + } else { + None + } + }) + .collect(); + if ips.len() == 0 { + log_err!( + "The interface {} does not have any IPv4 address. cannot start", + listener.interface_name + ); + return Err(()); + } + if listener.ipv6 { + let mut ipv6s: Vec = interface + .ipv6 + .iter() + .filter_map(|ip| { + if interface.if_type.is_ipv6_valid_for_type(&ip.addr) { + Some(SocketAddr::new(IpAddr::V6(ip.addr), listener.port)) + } else { + None + } + }) + .collect(); + ips.append(&mut ipv6s); + } + + let ips_string = ips + .iter() + .map(|ip| ip.to_string()) + .collect::>() + .join(", "); + let listener = TcpListener::bind(ips.as_slice()).await.map_err(|e| { + log_err!( + "cannot bind to {} with addresses {} : {}", + interface.name, + ips_string, + e.to_string() + ) + })?; + log_info!("Listening on {} {}", interface.name, ips_string); + listeners.push(listener); + } + } + } - while let Some(tcp) = connections.next().await { + // select on all listeners + let mut incoming = futures::stream::select_all( + listeners + .into_iter() + .map(TcpListener::into_incoming) + .map(Box::pin), + ); + // Iterate over all incoming connections + while let Some(tcp) = incoming.next().await { accept( tcp.unwrap(), Sensitive::<[u8; 32]>::from_slice(peer_priv_key.deref()), @@ -120,5 +209,6 @@ pub async fn run_server( ) .await; } + Ok(()) } diff --git a/p2p-broker/src/types.rs b/p2p-broker/src/types.rs index 44353c7..35ac3ef 100644 --- a/p2p-broker/src/types.rs +++ b/p2p-broker/src/types.rs @@ -129,3 +129,18 @@ impl BrokerOverlayConfigV0 { } } } + +/// DaemonConfig Version 0 +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct DaemonConfigV0 { + /// List of listeners for TCP (HTTP) incoming connections + pub listeners: Vec, + + pub overlays_configs: Vec, +} + +/// Daemon config +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum DaemonConfig { + V0(DaemonConfigV0), +}