/* * Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers * All rights reserved. * Licensed under the Apache License, Version 2.0 * * or the MIT license , * at your option. All files in the project carrying such * notice may not be copied, modified, or distributed except * according to those terms. */ //! WebSocket implementation of the Broker use crate::interfaces::*; use crate::server_storage::RocksdbServerStorage; use crate::types::*; use async_std::io::ReadExt; use async_std::net::{TcpListener, TcpStream}; use async_std::sync::Mutex; use async_std::task; use async_tungstenite::accept_hdr_async; use async_tungstenite::tungstenite::handshake::server::{ Callback, ErrorResponse, Request, Response, }; use async_tungstenite::tungstenite::http::{ header::{CONNECTION, HOST, ORIGIN, UPGRADE}, HeaderValue, Method, StatusCode, Uri, Version, }; use async_tungstenite::tungstenite::protocol::Message; use futures::{SinkExt, StreamExt}; use once_cell::sync::Lazy; use once_cell::sync::OnceCell; use p2p_client_ws::remote_ws::ConnectionWebSocket; use p2p_net::broker::*; use p2p_net::connection::IAccept; use p2p_net::types::*; use p2p_net::utils::get_domain_without_port; use p2p_net::utils::is_private_ip; use p2p_net::utils::is_public_ip; use p2p_net::NG_BOOTSTRAP_LOCAL_PATH; use p2p_repo::log::*; use p2p_repo::types::SymKey; use p2p_repo::types::{PrivKey, PubKey}; use p2p_repo::utils::generate_keypair; use rust_embed::RustEmbed; use serde_json::json; use std::collections::HashMap; use std::collections::HashSet; use std::fs; use std::net::SocketAddr; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::num::NonZeroU8; use std::ops::Deref; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::{thread, time}; use tempfile::Builder; static LISTENERS_INFO: OnceCell<(HashMap, HashMap)> = OnceCell::new(); static BOOTSTRAP_STRING: OnceCell = OnceCell::new(); struct SecurityCallback { remote_bind_address: BindAddress, local_bind_address: BindAddress, } impl SecurityCallback { fn new(remote_bind_address: BindAddress, local_bind_address: BindAddress) -> Self { Self { remote_bind_address, local_bind_address, } } } fn make_error(code: StatusCode) -> ErrorResponse { Response::builder().status(code).body(None).unwrap() } fn check_no_origin(origin: Option<&HeaderValue>) -> Result<(), ErrorResponse> { match origin { Some(_) => Err(make_error(StatusCode::FORBIDDEN)), None => Ok(()), } } fn check_origin_is_url( origin: Option<&HeaderValue>, domains: &Vec, ) -> Result<(), ErrorResponse> { match origin { None => Ok(()), Some(val) => { for domain in domains { if val.to_str().unwrap().starts_with(domain.as_str()) { return Ok(()); } } Err(make_error(StatusCode::FORBIDDEN)) } } } fn check_xff_is_public_or_private( xff: Option<&HeaderValue>, none_is_ok: bool, public: bool, ) -> Result<(), ErrorResponse> { match xff { None => { if none_is_ok { Ok(()) } else { Err(make_error(StatusCode::FORBIDDEN)) } } Some(val) => { let mut ip_str = val .to_str() .map_err(|_| make_error(StatusCode::FORBIDDEN))?; if ip_str.starts_with("::ffff:") { ip_str = ip_str.strip_prefix("::ffff:").unwrap(); } let ip: IpAddr = ip_str .parse() .map_err(|_| make_error(StatusCode::FORBIDDEN))?; if public && !is_public_ip(&ip) || !public && !is_private_ip(&ip) { Err(make_error(StatusCode::FORBIDDEN)) } else { Ok(()) } } } } fn check_no_xff(xff: Option<&HeaderValue>) -> Result<(), ErrorResponse> { match xff { None => Ok(()), Some(_) => Err(make_error(StatusCode::FORBIDDEN)), } } fn check_host(host: Option<&HeaderValue>, hosts: Vec) -> Result<(), ErrorResponse> { match host { None => Err(make_error(StatusCode::FORBIDDEN)), Some(val) => { for hos in hosts { if val.to_str().unwrap().starts_with(&hos) { return Ok(()); } } Err(make_error(StatusCode::FORBIDDEN)) } } } fn check_host_in_addrs( host: Option<&HeaderValue>, addrs: &Vec, ) -> Result<(), ErrorResponse> { match host { None => Err(make_error(StatusCode::FORBIDDEN)), Some(val) => { for ba in addrs { if val.to_str().unwrap().starts_with(&ba.ip.to_string()) { return Ok(()); } } Err(make_error(StatusCode::FORBIDDEN)) } } } fn prepare_domain_url_and_host( accept_forward_for: &AcceptForwardForV0, ) -> (Vec, Vec) { let domain_str = accept_forward_for.get_domain(); let url = ["https://", domain_str].concat(); let hosts_str = vec![domain_str.to_string()]; let urls_str = vec![url]; (hosts_str, urls_str) } fn prepare_urls_from_private_addrs(addrs: &Vec, port: u16) -> Vec { let port_str = if port != 80 { [":", &port.to_string()].concat() } else { "".to_string() }; let mut res: Vec = vec![]; for addr in addrs { let url = ["http://", &addr.ip.to_string(), &port_str].concat(); res.push(url); } res } #[derive(RustEmbed)] #[folder = "../ng-app/dist-file/"] #[include = "*.sha256"] #[include = "*.gzip"] struct App; static ROBOTS: &str = "User-agent: *\r\nDisallow: /"; fn upgrade_ws_or_serve_app( connection: Option<&HeaderValue>, remote: IP, serve_app: bool, uri: &Uri, last_etag: Option<&HeaderValue>, cors: Option<&str>, ) -> Result<(), ErrorResponse> { if connection.is_some() && connection .unwrap() .to_str() .unwrap() .split(|c| c == ' ' || c == ',') .any(|p| p.eq_ignore_ascii_case("Upgrade")) { return Ok(()); } if serve_app && (remote.is_private() || remote.is_loopback()) { if uri == "/" { log_debug!("Serving the app"); let sha_file = App::get("index.sha256").unwrap(); let sha = format!( "\"{}\"", std::str::from_utf8(sha_file.data.as_ref()).unwrap() ); if last_etag.is_some() && last_etag.unwrap().to_str().unwrap() == sha { // return 304 let res = Response::builder() .status(StatusCode::NOT_MODIFIED) .header("Cache-Control", "max-age=31536000, must-revalidate") .header("ETag", sha) .body(None) .unwrap(); return Err(res); } let file = App::get("index.gzip").unwrap(); let res = Response::builder() .status(StatusCode::OK) .header("Content-Type", "text/html") .header("Cache-Control", "max-age=31536000, must-revalidate") .header("Content-Encoding", "gzip") .header("ETag", sha) .body(Some(file.data.to_vec())) .unwrap(); return Err(res); } else if uri == NG_BOOTSTRAP_LOCAL_PATH { log_debug!("Serving bootstrap"); let mut builder = Response::builder().status(StatusCode::OK); if cors.is_some() { builder = builder.header("Access-Control-Allow-Origin", cors.unwrap()); } let res = builder .header("Content-Type", "text/json") .header("Cache-Control", "max-age=0, must-revalidate") .body(Some(BOOTSTRAP_STRING.get().unwrap().as_bytes().to_vec())) .unwrap(); return Err(res); } else if uri == "/robots.txt" { let res = Response::builder() .status(StatusCode::OK) .header("Content-Type", "text/plain") .header("Cache-Control", "max-age=3600, must-revalidate") .body(Some(ROBOTS.as_bytes().to_vec())) .unwrap(); return Err(res); } } Err(make_error(StatusCode::FORBIDDEN)) } impl Callback for SecurityCallback { fn on_request(self, request: &Request) -> Result<(), ErrorResponse> { let local_urls = LOCAL_URLS .to_vec() .iter() .map(ToString::to_string) .collect(); let local_hosts = LOCAL_HOSTS .to_vec() .iter() .map(ToString::to_string) .collect(); let (listeners, bind_addresses) = LISTENERS_INFO.get().ok_or( Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body(None) .unwrap(), )?; // check that the remote address is allowed to connect on the listener let listener_id = bind_addresses .get(&self.local_bind_address) .ok_or(make_error(StatusCode::FORBIDDEN))?; let listener = listeners .get(listener_id) .ok_or(make_error(StatusCode::FORBIDDEN))?; if request.method() != Method::GET { return Err(make_error(StatusCode::METHOD_NOT_ALLOWED)); } if request.version() != Version::HTTP_11 { return Err(make_error(StatusCode::HTTP_VERSION_NOT_SUPPORTED)); } let xff = request.headers().get("X-Forwarded-For"); let connection = request.headers().get(CONNECTION); let host = request.headers().get(HOST); let origin = request.headers().get(ORIGIN); let remote = self.remote_bind_address.ip; let last_etag = request.headers().get("If-None-Match"); let uri = request.uri(); log_debug!( "connection:{:?} origin:{:?} host:{:?} xff:{:?} remote:{:?} local:{:?} uri:{:?}", connection, origin, host, xff, remote, self.local_bind_address, uri ); match listener.config.if_type { InterfaceType::Public => { if !remote.is_public() { return Err(make_error(StatusCode::FORBIDDEN)); } check_no_xff(xff)?; check_no_origin(origin)?; // let mut urls_str = vec![]; // if !listener.config.refuse_clients { // urls_str.push(APP_NG_ONE_URL.to_string()); // } // check_origin_is_url(origin, urls_str)?; check_host_in_addrs(host, &listener.addrs)?; log_debug!( "accepted core with refuse_clients {}", listener.config.refuse_clients ); return upgrade_ws_or_serve_app( connection, remote, listener.config.serve_app && !listener.config.refuse_clients, uri, last_etag, None, ); } InterfaceType::Loopback => { if !remote.is_loopback() { return Err(make_error(StatusCode::FORBIDDEN)); } if listener.config.accept_forward_for.is_public_domain() { let (mut hosts_str, mut urls_str) = prepare_domain_url_and_host(&listener.config.accept_forward_for); if listener.config.accept_direct { hosts_str = [hosts_str, local_hosts].concat(); // TODO local_urls might need a trailing :port, but it is ok for now as we do starts_with urls_str = [urls_str, local_urls].concat(); } check_origin_is_url(origin, &urls_str)?; check_host(host, hosts_str)?; check_xff_is_public_or_private(xff, listener.config.accept_direct, true)?; log_debug!( "accepted loopback PUBLIC_DOMAIN with direct {}", listener.config.accept_direct ); return upgrade_ws_or_serve_app( connection, remote, listener.config.serve_app, uri, last_etag, origin.map(|or| or.to_str().unwrap()).and_then(|val| { if listener.config.refuse_clients { None } else { Some(val) } }), ); } else if listener.config.accept_forward_for.is_private_domain() { let (hosts_str, urls_str) = prepare_domain_url_and_host(&listener.config.accept_forward_for); check_origin_is_url(origin, &urls_str)?; check_host(host, hosts_str)?; check_xff_is_public_or_private(xff, false, false)?; log_debug!("accepted loopback PRIVATE_DOMAIN"); return upgrade_ws_or_serve_app( connection, remote, listener.config.serve_app, uri, last_etag, origin.map(|or| or.to_str().unwrap()), ); } else if listener.config.accept_forward_for == AcceptForwardForV0::No { check_host(host, local_hosts)?; check_no_xff(xff)?; // TODO local_urls might need a trailing :port, but it is ok for now as we do starts_with check_origin_is_url(origin, &local_urls)?; log_debug!("accepted loopback DIRECT"); return upgrade_ws_or_serve_app( connection, remote, listener.config.serve_app, uri, last_etag, origin.map(|or| or.to_str().unwrap()), ); } } InterfaceType::Private => { if listener.config.accept_forward_for.is_public_static() || listener.config.accept_forward_for.is_public_dyn() { if !listener.config.accept_direct && !remote.is_public() || listener.config.accept_direct && !remote.is_private() && !remote.is_public() { return Err(make_error(StatusCode::FORBIDDEN)); } check_no_xff(xff)?; let mut addrs = listener .config .accept_forward_for .get_public_bind_addresses(); let mut urls_str = vec![]; // if !listener.config.refuse_clients { // urls_str.push(APP_NG_ONE_URL.to_string()); // } if listener.config.accept_direct { addrs.extend(&listener.addrs); urls_str = [ urls_str, prepare_urls_from_private_addrs(&listener.addrs, listener.config.port), ] .concat(); } check_origin_is_url(origin, &urls_str)?; check_host_in_addrs(host, &addrs)?; log_debug!("accepted private PUBLIC_STATIC or PUBLIC_DYN with direct {} with refuse_clients {}",listener.config.accept_direct, listener.config.refuse_clients); return upgrade_ws_or_serve_app( connection, remote, listener.config.serve_app, uri, last_etag, origin.map(|or| or.to_str().unwrap()), ); } else if listener.config.accept_forward_for.is_public_domain() { if !remote.is_private() { return Err(make_error(StatusCode::FORBIDDEN)); } check_xff_is_public_or_private(xff, listener.config.accept_direct, true)?; let (mut hosts_str, mut urls_str) = prepare_domain_url_and_host(&listener.config.accept_forward_for); if listener.config.accept_direct { for addr in listener.addrs.iter() { let str = addr.ip.to_string(); hosts_str.push(str); } urls_str = [ urls_str, prepare_urls_from_private_addrs(&listener.addrs, listener.config.port), ] .concat(); } check_origin_is_url(origin, &urls_str)?; check_host(host, hosts_str)?; log_debug!( "accepted private PUBLIC_DOMAIN with direct {}", listener.config.accept_direct ); return upgrade_ws_or_serve_app( connection, remote, listener.config.serve_app, uri, last_etag, origin.map(|or| or.to_str().unwrap()).and_then(|val| { if listener.config.refuse_clients { None } else { Some(val) } }), ); } else if listener.config.accept_forward_for == AcceptForwardForV0::No { if !remote.is_private() { return Err(make_error(StatusCode::FORBIDDEN)); } check_no_xff(xff)?; check_host_in_addrs(host, &listener.addrs)?; let urls_str = prepare_urls_from_private_addrs(&listener.addrs, listener.config.port); check_origin_is_url(origin, &urls_str)?; log_debug!("accepted private DIRECT"); return upgrade_ws_or_serve_app( connection, remote, listener.config.serve_app, uri, last_etag, origin.map(|or| or.to_str().unwrap()), ); } } _ => {} } Err(make_error(StatusCode::FORBIDDEN)) } } pub async fn accept(tcp: TcpStream, peer_priv_key: PrivKey) { let remote_addr = tcp.peer_addr().unwrap(); let remote_bind_address: BindAddress = (&remote_addr).into(); let local_addr = tcp.local_addr().unwrap(); let local_bind_address: BindAddress = (&local_addr).into(); let ws = accept_hdr_async( tcp, SecurityCallback::new(remote_bind_address, local_bind_address), ) .await; if ws.is_err() { log_debug!("websocket rejected"); return; } log_debug!("websocket accepted"); let cws = ConnectionWebSocket {}; let base = cws .accept( remote_bind_address, local_bind_address, peer_priv_key, ws.unwrap(), ) .await .unwrap(); let res = BROKER .write() .await .accept(base, remote_bind_address, local_bind_address) .await; } pub async fn run_server_accept_one( addr: &str, port: u16, peer_priv_key: PrivKey, peer_pub_key: PubKey, ) -> std::io::Result<()> { let addrs = format!("{}:{}", addr, port); let root = tempfile::Builder::new().prefix("ngd").tempdir().unwrap(); // let master_key: [u8; 32] = [0; 32]; // std::fs::create_dir_all(root.path()).unwrap(); // log_debug!("data directory: {}", root.path().to_str().unwrap()); // let store = RocksdbKCVStore::open(root.path(), master_key); let socket = TcpListener::bind(addrs.as_str()).await?; log_debug!("Listening on {}", addrs.as_str()); let mut connections = socket.incoming(); let tcp = connections.next().await.unwrap()?; { //BROKER.write().await.set_my_peer_id(peer_pub_key); } accept(tcp, peer_priv_key).await; Ok(()) } pub async fn run_server_v0( peer_priv_key: PrivKey, peer_id: PubKey, wallet_master_key: SymKey, config: DaemonConfigV0, mut path: PathBuf, admin_invite: bool, ) -> Result<(), ()> { // check config let mut run_core = false; let mut run_server = false; for overlay_conf in config.overlays_configs.iter() { if overlay_conf.core != BrokerOverlayPermission::Nobody { run_core = true; } if overlay_conf.server != BrokerOverlayPermission::Nobody { run_server = true; } } if !run_core && !run_server { log_err!("There isn't any overlay_config that should run as core or server. Check your config. cannot start"); return Err(()); } if run_core && !run_server { log_warn!("There isn't any overlay_config that should run as server. This is a misconfiguration as a core server that cannot receive client connections is useless"); } let mut listeners: HashSet = HashSet::new(); for listener in &config.listeners { let id: String = listener.to_string(); if !listeners.insert(id.clone()) { log_err!( "The listener {} is defined twice. Check your config file. cannot start", id ); return Err(()); } } let interfaces = get_interface(); log_debug!("interfaces {:?}", interfaces); let mut listener_infos: HashMap = HashMap::new(); let mut listeners_addrs: Vec<(Vec, String)> = vec![]; let mut listeners: Vec = vec![]; let mut accept_clients = false; //let mut serve_app = false; // TODO: check that there is only one PublicDyn or one PublicStatic or one Core let mut servers: Vec = vec![]; // Preparing the listeners addrs and infos for listener in config.listeners { if !listener.accept_direct && listener.accept_forward_for == AcceptForwardForV0::No { log_warn!( "The interface {} does not accept direct connections nor is configured to forward. it is therefor disabled", listener.interface_name ); continue; } match find_name(&interfaces, &listener.interface_name) { None => { log_err!( "The interface {} does not exist on your host. Check your config file. cannot start", listener.interface_name ); return Err(()); } Some(interface) => { let mut addrs: Vec = interface .ipv4 .iter() .filter_map(|ip| { if interface.if_type.is_ipv4_valid_for_type(&ip.addr) { Some(SocketAddr::new(IpAddr::V4(ip.addr), listener.port)) } else { None } }) .collect(); if addrs.len() == 0 { log_err!( "The interface {} does not have any IPv4 address. cannot start", listener.interface_name ); return Err(()); } if listener.ipv6 { let mut ipv6s: Vec = interface .ipv6 .iter() .filter_map(|ip| { if interface.if_type.is_ipv6_valid_for_type(&ip.addr) || listener.should_bind_public_ipv6_to_private_interface(ip.addr) { Some(SocketAddr::new(IpAddr::V6(ip.addr), listener.port)) } else { None } }) .collect(); addrs.append(&mut ipv6s); } if !listener.refuse_clients { accept_clients = true; } if listener.refuse_clients && listener.accept_forward_for.is_public_domain() { log_warn!( "You have disabled accepting connections from clients on {}. This is unusual as --domain and --domain-private listeners are meant to answer to clients only. This will activate the relay_websocket on this listener. Is it really intended?", listener.interface_name ); } // if listener.serve_app { // serve_app = true; // } let bind_addresses: Vec = addrs.iter().map(|addr| addr.into()).collect(); let server_types = listener.get_bootstraps(bind_addresses.clone()); let common_peer_id = listener.accept_forward_for.domain_with_common_peer_id(); for server_type in server_types { servers.push(BrokerServerV0 { peer_id: common_peer_id.unwrap_or(peer_id), server_type, }) } let listener_id: String = listener.to_string(); let listener_info = ListenerInfo { config: listener, addrs: bind_addresses, }; listener_infos.insert(listener_id, listener_info); listeners_addrs.push((addrs, interface.name)); } } } if listeners_addrs.len() == 0 { log_err!("No listener configured. cannot start",); return Err(()); } if !accept_clients { log_warn!("There isn't any listener that accept clients. This is a misconfiguration as a core server that cannot receive client connections is useless"); } let bootstrap_v0 = BootstrapContentV0 { servers }; let bootstrap = BootstrapContent::V0(bootstrap_v0.clone()); BOOTSTRAP_STRING.set(json!(bootstrap).to_string()).unwrap(); // saving the infos in the broker. This needs to happen before we start listening, as new incoming connections can happen anytime after that. // and we need those infos for permission checking. { //let root = tempfile::Builder::new().prefix("ngd").tempdir().unwrap(); path.push("storage"); std::fs::create_dir_all(path.clone()).unwrap(); // opening the server storage (that contains the encryption keys for each store/overlay ) let broker_storage = RocksdbServerStorage::open( &mut path, wallet_master_key, if admin_invite { Some(bootstrap_v0) } else { None }, ) .map_err(|e| log_err!("Error while opening server storage: {:?}", e))?; let mut broker = BROKER.write().await; broker.set_server_storage(broker_storage); LISTENERS_INFO .set(broker.set_listeners(listener_infos)) .unwrap(); let server_config = ServerConfig { overlays_configs: config.overlays_configs, registration: config.registration, admin_user: config.admin_user, registration_url: config.registration_url, peer_id, bootstrap, }; broker.set_server_config(server_config); } // Actually starting the listeners for addrs in listeners_addrs { let addrs_string = addrs .0 .iter() .map(SocketAddr::to_string) .collect::>() .join(", "); for addr in addrs.0 { let tcp_listener = TcpListener::bind(addr).await.map_err(|e| { log_err!( "cannot bind to {} with addresses {} : {}", addrs.1, addrs_string, e.to_string() ) })?; listeners.push(tcp_listener); } log_info!("Listening on {} {}", addrs.1, addrs_string); } // select on all listeners let mut incoming = futures::stream::select_all( listeners .into_iter() .map(TcpListener::into_incoming) .map(Box::pin), ); // Iterate over all incoming connections // TODO : select on the shutdown stream too while let Some(tcp) = incoming.next().await { // TODO select peer_priv_ket according to config. if --domain-peer present and the connection is for that listener (PublicDomainPeer) then use the peer configured there accept(tcp.unwrap(), peer_priv_key.clone()).await; } Ok(()) }