listening on all listeners defined in config

master
Niko 11 months ago
parent 5453b661be
commit fdcb241ef3
  1. 2
      Cargo.lock
  2. 1
      ngd/Cargo.toml
  3. 170
      ngd/src/main.rs
  4. 15
      ngd/src/types.rs
  5. 1
      p2p-broker/Cargo.toml
  6. 172
      p2p-broker/src/interfaces.rs
  7. 2
      p2p-broker/src/lib.rs
  8. 132
      p2p-broker/src/server_ws.rs
  9. 15
      p2p-broker/src/types.rs

2
Cargo.lock generated

@ -2766,7 +2766,6 @@ dependencies = [
"async-std",
"base64-url",
"clap",
"default-net",
"env_logger",
"lazy_static",
"log",
@ -2971,6 +2970,7 @@ dependencies = [
"async-tungstenite",
"blake3",
"chacha20",
"default-net",
"futures",
"getrandom 0.2.9",
"hex",

@ -15,7 +15,6 @@ p2p-broker = { path = "../p2p-broker" }
p2p-net = { path = "../p2p-net" }
p2p-repo = { path = "../p2p-repo", features = ["server_log_output"] }
async-std = { version = "1.12.0", features = ["attributes"] }
default-net = "0.15"
log = "0.4"
env_logger = "0.10"
clap = { version = "4.3.4", features = ["derive","env","string"] }

@ -16,7 +16,8 @@ mod cli;
use crate::cli::*;
use crate::types::*;
use clap::Parser;
use p2p_broker::server_ws::run_server;
use p2p_broker::interfaces::*;
use p2p_broker::server_ws::run_server_v0;
use p2p_broker::types::*;
use p2p_broker::utils::*;
use p2p_net::types::*;
@ -41,157 +42,6 @@ use addr::psl::List;
use lazy_static::lazy_static;
use regex::Regex;
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum InterfaceType {
Loopback,
Private,
Public,
Invalid,
}
impl InterfaceType {
pub fn is_ipv4_valid_for_type(&self, ip: &Ipv4Addr) -> bool {
match self {
InterfaceType::Loopback => ip.is_loopback(),
InterfaceType::Public => is_public_ipv4(ip),
// we allow to bind to link-local for IPv4
InterfaceType::Private => is_ipv4_private(ip),
_ => false,
}
}
pub fn is_ipv6_valid_for_type(&self, ip: &Ipv6Addr) -> bool {
match self {
InterfaceType::Loopback => ip.is_loopback(),
InterfaceType::Public => is_public_ipv6(ip),
// we do NOT allow to bind to link-local for IPv6
InterfaceType::Private => is_ipv6_private(ip),
_ => false,
}
}
}
pub fn print_ipv4(ip: &default_net::ip::Ipv4Net) -> String {
format!("{}/{}", ip.addr, ip.prefix_len)
}
pub fn print_ipv6(ip: &default_net::ip::Ipv6Net) -> String {
format!("{}/{}", ip.addr, ip.prefix_len)
}
#[derive(Clone, Debug)]
pub struct Interface {
pub if_type: InterfaceType,
pub name: String,
pub mac_addr: Option<default_net::interface::MacAddr>,
/// List of Ipv4Net for the network interface
pub ipv4: Vec<default_net::ip::Ipv4Net>,
/// List of Ipv6Net for the network interface
pub ipv6: Vec<default_net::ip::Ipv6Net>,
}
fn find_first(list: &Vec<Interface>, iftype: InterfaceType) -> Option<Interface> {
for inf in list {
if inf.if_type == iftype {
return Some(inf.clone());
}
}
None
}
fn find_first_or_name(
list: &Vec<Interface>,
iftype: InterfaceType,
name: &String,
) -> Option<Interface> {
for inf in list {
if (name == "default" || *name == inf.name) && inf.if_type == iftype {
return Some(inf.clone());
}
}
None
}
fn is_public_ipv4(ip: &Ipv4Addr) -> bool {
// TODO, use core::net::Ipv6Addr.is_global when it will be stable
return is_ipv4_global(ip);
}
fn is_public_ipv6(ip: &Ipv6Addr) -> bool {
// TODO, use core::net::Ipv6Addr.is_global when it will be stable
return is_ipv6_global(ip);
}
fn is_public_ip(ip: &IpAddr) -> bool {
match ip {
IpAddr::V4(v4) => is_public_ipv4(v4),
IpAddr::V6(v6) => is_public_ipv6(v6),
}
}
fn is_private_ip(ip: &IpAddr) -> bool {
match ip {
IpAddr::V4(v4) => is_ipv4_private(v4),
IpAddr::V6(v6) => is_ipv6_private(v6),
}
}
pub fn get_interface() -> Vec<Interface> {
let mut res: Vec<Interface> = vec![];
let interfaces = default_net::get_interfaces();
for interface in interfaces {
if interface.ipv4.len() > 0 {
let first_v4 = interface.ipv4[0].addr;
let if_type = if first_v4.is_loopback() {
InterfaceType::Loopback
} else if is_ipv4_private(&first_v4) {
InterfaceType::Private
} else if is_public_ipv4(&first_v4) {
InterfaceType::Public
} else {
continue;
};
let interf = Interface {
if_type,
name: interface.name,
mac_addr: interface.mac_addr,
ipv4: interface.ipv4,
ipv6: interface.ipv6,
};
res.push(interf);
}
}
res
}
pub fn print_interfaces() {
let interfaces = get_interface();
for interface in interfaces {
println!("{} \t{:?}", interface.name, interface.if_type);
println!(
"\tIPv4: {}",
interface
.ipv4
.iter()
.map(|ip| print_ipv4(ip))
.collect::<Vec<String>>()
.join(" ")
);
println!(
"\tIPv6: {}",
interface
.ipv6
.iter()
.map(|ip| print_ipv6(ip))
.collect::<Vec<String>>()
.join(" ")
);
if let Some(mac_addr) = interface.mac_addr {
println!("\tMAC: {}", mac_addr);
}
}
}
fn decode_key(key_string: &String) -> Result<[u8; 32], ()> {
let vec = base64_url::decode(key_string).map_err(|_| log_err!("key has invalid content"))?;
Ok(*slice_as_array!(&vec, [u8; 32])
@ -1062,7 +912,7 @@ async fn main_inner() -> Result<(), ()> {
config_path.to_str().unwrap()
);
log_info!(
"You cannot use Quick config options anymore on the command line in your next start of the server. But you can go to modify the config file directly, or delete it.",
"You not be able to use any Quick config options anymore on the command line at the next command-line start of the server. But you can go to modify the config file directly, or delete it.",
);
}
} else {
@ -1115,7 +965,19 @@ async fn main_inner() -> Result<(), ()> {
log_info!("PeerId of node: {}", pubkey);
debug_println!("Private key of peer: {}", prix_key_encoded);
run_server("127.0.0.1", WS_PORT, privkey, pubkey, path).await?;
match (config.unwrap()) {
DaemonConfig::V0(v0) => {
run_server_v0(
privkey,
pubkey,
Sensitive::<[u8; 32]>::from_slice(&keys[2]),
v0,
path,
)
.await?
}
}
Ok(())
}

@ -10,18 +10,3 @@ use p2p_broker::types::BrokerOverlayConfigV0;
use p2p_broker::types::ListenerV0;
use p2p_repo::types::PrivKey;
use serde::{Deserialize, Serialize};
/// DaemonConfig Version 0
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DaemonConfigV0 {
/// List of listeners for TCP (HTTP) incoming connections
pub listeners: Vec<ListenerV0>,
pub overlays_configs: Vec<BrokerOverlayConfigV0>,
}
/// Daemon config
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum DaemonConfig {
V0(DaemonConfigV0),
}

@ -25,6 +25,7 @@ hex = "0.4.3"
async-trait = "0.1.64"
async-tungstenite = { version = "0.17.2", features = ["async-std-runtime"] }
blake3 = "1.3.1"
default-net = "0.15"
[target.'cfg(target_arch = "wasm32")'.dependencies.getrandom]
version = "0.2.7"

@ -0,0 +1,172 @@
/*
* Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
* All rights reserved.
* Licensed under the Apache License, Version 2.0
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
* at your option. All files in the project carrying such
* notice may not be copied, modified, or distributed except
* according to those terms.
*/
use p2p_net::utils::{is_ipv4_global, is_ipv4_private, is_ipv6_global, is_ipv6_private};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum InterfaceType {
Loopback,
Private,
Public,
Invalid,
}
impl InterfaceType {
pub fn is_ipv4_valid_for_type(&self, ip: &Ipv4Addr) -> bool {
match self {
InterfaceType::Loopback => ip.is_loopback(),
InterfaceType::Public => is_public_ipv4(ip),
// we allow to bind to link-local for IPv4
InterfaceType::Private => is_ipv4_private(ip),
_ => false,
}
}
pub fn is_ipv6_valid_for_type(&self, ip: &Ipv6Addr) -> bool {
match self {
InterfaceType::Loopback => ip.is_loopback(),
InterfaceType::Public => is_public_ipv6(ip),
// we do NOT allow to bind to link-local for IPv6
InterfaceType::Private => is_ipv6_private(ip),
_ => false,
}
}
}
pub fn print_ipv4(ip: &default_net::ip::Ipv4Net) -> String {
format!("{}/{}", ip.addr, ip.prefix_len)
}
pub fn print_ipv6(ip: &default_net::ip::Ipv6Net) -> String {
format!("{}/{}", ip.addr, ip.prefix_len)
}
#[derive(Clone, Debug)]
pub struct Interface {
pub if_type: InterfaceType,
pub name: String,
pub mac_addr: Option<default_net::interface::MacAddr>,
/// List of Ipv4Net for the network interface
pub ipv4: Vec<default_net::ip::Ipv4Net>,
/// List of Ipv6Net for the network interface
pub ipv6: Vec<default_net::ip::Ipv6Net>,
}
pub fn find_first(list: &Vec<Interface>, iftype: InterfaceType) -> Option<Interface> {
for inf in list {
if inf.if_type == iftype {
return Some(inf.clone());
}
}
None
}
pub fn find_first_or_name(
list: &Vec<Interface>,
iftype: InterfaceType,
name: &String,
) -> Option<Interface> {
for inf in list {
if (name == "default" || *name == inf.name) && inf.if_type == iftype {
return Some(inf.clone());
}
}
None
}
pub fn find_name(list: &Vec<Interface>, name: &String) -> Option<Interface> {
for inf in list {
if *name == inf.name {
return Some(inf.clone());
}
}
None
}
pub fn is_public_ipv4(ip: &Ipv4Addr) -> bool {
// TODO, use core::net::Ipv6Addr.is_global when it will be stable
return is_ipv4_global(ip);
}
pub fn is_public_ipv6(ip: &Ipv6Addr) -> bool {
// TODO, use core::net::Ipv6Addr.is_global when it will be stable
return is_ipv6_global(ip);
}
pub fn is_public_ip(ip: &IpAddr) -> bool {
match ip {
IpAddr::V4(v4) => is_public_ipv4(v4),
IpAddr::V6(v6) => is_public_ipv6(v6),
}
}
pub fn is_private_ip(ip: &IpAddr) -> bool {
match ip {
IpAddr::V4(v4) => is_ipv4_private(v4),
IpAddr::V6(v6) => is_ipv6_private(v6),
}
}
pub fn get_interface() -> Vec<Interface> {
let mut res: Vec<Interface> = vec![];
let interfaces = default_net::get_interfaces();
for interface in interfaces {
if interface.ipv4.len() > 0 {
let first_v4 = interface.ipv4[0].addr;
let if_type = if first_v4.is_loopback() {
InterfaceType::Loopback
} else if is_ipv4_private(&first_v4) {
InterfaceType::Private
} else if is_public_ipv4(&first_v4) {
InterfaceType::Public
} else {
continue;
};
let interf = Interface {
if_type,
name: interface.name,
mac_addr: interface.mac_addr,
ipv4: interface.ipv4,
ipv6: interface.ipv6,
};
res.push(interf);
}
}
res
}
pub fn print_interfaces() {
let interfaces = get_interface();
for interface in interfaces {
println!("{} \t{:?}", interface.name, interface.if_type);
println!(
"\tIPv4: {}",
interface
.ipv4
.iter()
.map(|ip| print_ipv4(ip))
.collect::<Vec<String>>()
.join(" ")
);
println!(
"\tIPv6: {}",
interface
.ipv6
.iter()
.map(|ip| print_ipv6(ip))
.collect::<Vec<String>>()
.join(" ")
);
if let Some(mac_addr) = interface.mac_addr {
println!("\tMAC: {}", mac_addr);
}
}
}

@ -5,3 +5,5 @@ pub mod server_ws;
pub mod types;
pub mod utils;
pub mod interfaces;

@ -11,7 +11,8 @@
//! WebSocket implementation of the Broker
use crate::broker_store::config::ConfigMode;
use crate::interfaces::*;
use crate::types::*;
use async_std::net::{TcpListener, TcpStream};
use async_std::sync::Mutex;
use async_std::task;
@ -26,7 +27,10 @@ use p2p_net::utils::Sensitive;
use p2p_repo::log::*;
use p2p_repo::types::{PrivKey, PubKey};
use p2p_repo::utils::generate_keypair;
use std::collections::HashSet;
use std::fs;
use std::net::SocketAddr;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::Arc;
@ -67,11 +71,6 @@ pub async fn run_server_accept_one(
log_debug!("data directory: {}", root.path().to_str().unwrap());
let store = LmdbKCVStore::open(root.path(), master_key);
// TODO: remove this part
// let server: BrokerServer =
// BrokerServer::new(store, ConfigMode::Local).expect("starting broker");
// let server_arc = Arc::new(server);
let socket = TcpListener::bind(addrs.as_str()).await?;
log_debug!("Listening on {}", addrs.as_str());
let mut connections = socket.incoming();
@ -83,36 +82,126 @@ pub async fn run_server_accept_one(
Ok(())
}
use p2p_net::utils::U8Array;
pub async fn run_server(
addr: &str,
port: u16,
pub async fn run_server_v0(
peer_priv_key: Sensitive<[u8; 32]>,
peer_pub_key: PubKey,
wallet_master_key: Sensitive<[u8; 32]>,
config: DaemonConfigV0,
mut path: PathBuf,
) -> Result<(), ()> {
let addrs = format!("{}:{}", addr, port);
// check config
let mut should_run = false;
for overlay_conf in config.overlays_configs {
if overlay_conf.core != BrokerOverlayPermission::Nobody
|| overlay_conf.server != BrokerOverlayPermission::Nobody
{
should_run = true;
break;
}
}
if !should_run {
log_err!("There isn't any overlay_config that should run as core or server. Check your config. cannot start");
return Err(());
}
let listeners: HashSet<String> = HashSet::new();
for listener in &config.listeners {
let mut id = listener.interface_name.clone();
id.push('@');
id.push_str(&listener.port.to_string());
if listeners.contains(&id) {
log_err!(
"The listener {} is defined twice. Check your config file. cannot start",
id
);
return Err(());
}
}
//let root = tempfile::Builder::new().prefix("ngd").tempdir().unwrap();
path.push("storage");
std::fs::create_dir_all(path.clone()).unwrap();
//log::info!("Home directory is {}");
// TODO: open wallet
let master_key: [u8; 32] = [0; 32];
let store = LmdbKCVStore::open(&path, master_key);
// TODO: remove this part
// let server: BrokerServer =
// BrokerServer::new(store, ConfigMode::Local).expect("starting broker");
// let server_arc = Arc::new(server);
let socket = TcpListener::bind(addrs.as_str())
.await
.map_err(|e| log_err!("bind error: {}", e.to_string()))?;
log_debug!("Listening on {}", addrs.as_str());
let mut connections = socket.incoming();
let interfaces = get_interface();
let mut listeners: Vec<TcpListener> = vec![];
for listener in config.listeners {
match find_name(&interfaces, &listener.interface_name) {
None => {
log_err!(
"The interface {} does not exist on your host. Check your config file. cannot start",
listener.interface_name
);
return Err(());
}
Some(interface) => {
let mut ips: Vec<SocketAddr> = interface
.ipv4
.iter()
.filter_map(|ip| {
if interface.if_type.is_ipv4_valid_for_type(&ip.addr) {
Some(SocketAddr::new(IpAddr::V4(ip.addr), listener.port))
} else {
None
}
})
.collect();
if ips.len() == 0 {
log_err!(
"The interface {} does not have any IPv4 address. cannot start",
listener.interface_name
);
return Err(());
}
if listener.ipv6 {
let mut ipv6s: Vec<SocketAddr> = interface
.ipv6
.iter()
.filter_map(|ip| {
if interface.if_type.is_ipv6_valid_for_type(&ip.addr) {
Some(SocketAddr::new(IpAddr::V6(ip.addr), listener.port))
} else {
None
}
})
.collect();
ips.append(&mut ipv6s);
}
let ips_string = ips
.iter()
.map(|ip| ip.to_string())
.collect::<Vec<String>>()
.join(", ");
let listener = TcpListener::bind(ips.as_slice()).await.map_err(|e| {
log_err!(
"cannot bind to {} with addresses {} : {}",
interface.name,
ips_string,
e.to_string()
)
})?;
log_info!("Listening on {} {}", interface.name, ips_string);
listeners.push(listener);
}
}
}
while let Some(tcp) = connections.next().await {
// select on all listeners
let mut incoming = futures::stream::select_all(
listeners
.into_iter()
.map(TcpListener::into_incoming)
.map(Box::pin),
);
// Iterate over all incoming connections
while let Some(tcp) = incoming.next().await {
accept(
tcp.unwrap(),
Sensitive::<[u8; 32]>::from_slice(peer_priv_key.deref()),
@ -120,5 +209,6 @@ pub async fn run_server(
)
.await;
}
Ok(())
}

@ -129,3 +129,18 @@ impl BrokerOverlayConfigV0 {
}
}
}
/// DaemonConfig Version 0
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DaemonConfigV0 {
/// List of listeners for TCP (HTTP) incoming connections
pub listeners: Vec<ListenerV0>,
pub overlays_configs: Vec<BrokerOverlayConfigV0>,
}
/// Daemon config
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum DaemonConfig {
V0(DaemonConfigV0),
}

Loading…
Cancel
Save