@ -21,11 +21,15 @@ use async_tungstenite::accept_hdr_async;
use async_tungstenite ::tungstenite ::handshake ::server ::{
Callback , ErrorResponse , Request , Response ,
} ;
use async_tungstenite ::tungstenite ::http ::header ::{ CONNECTION , HOST , ORIGIN , UPGRADE } ;
use async_tungstenite ::tungstenite ::http ::HeaderValue ;
use async_tungstenite ::tungstenite ::http ::StatusCode ;
use async_tungstenite ::tungstenite ::http ::{
header ::{ CONNECTION , HOST , ORIGIN , UPGRADE } ,
HeaderValue , Method , StatusCode , Uri , Version ,
} ;
use async_tungstenite ::tungstenite ::protocol ::Message ;
use futures ::{ SinkExt , StreamExt } ;
use once_cell ::sync ::Lazy ;
use once_cell ::sync ::OnceCell ;
use p2p_client_ws ::remote_ws ::ConnectionWebSocket ;
use p2p_net ::broker ::* ;
@ -37,11 +41,14 @@ use p2p_net::utils::{get_domain_without_port, Sensitive, U8Array};
use p2p_repo ::log ::* ;
use p2p_repo ::types ::{ PrivKey , PubKey } ;
use p2p_repo ::utils ::generate_keypair ;
use rust_embed ::RustEmbed ;
use serde_json ::json ;
use std ::collections ::HashMap ;
use std ::collections ::HashSet ;
use std ::fs ;
use std ::net ::SocketAddr ;
use std ::net ::{ IpAddr , Ipv4Addr , Ipv6Addr } ;
use std ::num ::NonZeroU8 ;
use std ::ops ::Deref ;
use std ::path ::{ Path , PathBuf } ;
use std ::sync ::Arc ;
@ -52,6 +59,9 @@ use tempfile::Builder;
static LISTENERS_INFO : OnceCell < ( HashMap < String , ListenerInfo > , HashMap < BindAddress , String > ) > =
OnceCell ::new ( ) ;
static BOOTSTRAP_STRING : OnceCell < String > = OnceCell ::new ( ) ;
struct SecurityCallback {
remote_bind_address : BindAddress ,
local_bind_address : BindAddress ,
@ -178,25 +188,69 @@ fn prepare_urls_from_private_addrs(addrs: &Vec<BindAddress>, port: u16) -> Vec<S
res
}
#[ derive(RustEmbed) ]
#[ folder = " ../ng-app/dist-file/ " ]
#[ include = " *.sha256 " ]
#[ include = " *.gzip " ]
struct App ;
fn upgrade_ws_or_serve_app (
upgrade : Option < & HeaderValue > ,
connection : Option < & HeaderValue > ,
remote : IP ,
serve_app : bool ,
response : Response ,
) -> Result < Response , ErrorResponse > {
if upgrade . is_some ( )
& & upgrade
uri : & Uri ,
last_etag : Option < & HeaderValue > ,
) -> Result < ( ) , ErrorResponse > {
if connection . is_some ( )
& & connection
. unwrap ( )
. to_str ( )
. unwrap ( )
. split ( | c | c = = ' ' | | c = = ',' )
. any ( | p | p . eq_ignore_ascii_case ( "Upgrade" ) )
{
return Ok ( response ) ;
return Ok ( ( ) ) ;
}
if serve_app & & ( remote . is_private ( ) | | remote . is_loopback ( ) ) {
return Err ( make_error ( StatusCode ::OK ) ) ;
if uri = = "/" {
log_debug ! ( "Serving the app" ) ;
let sha_file = App ::get ( "index.sha256" ) . unwrap ( ) ;
let sha = format! (
"\"{}\"" ,
std ::str ::from_utf8 ( sha_file . data . as_ref ( ) ) . unwrap ( )
) ;
if last_etag . is_some ( ) & & last_etag . unwrap ( ) . to_str ( ) . unwrap ( ) = = sha {
// return 304
let res = Response ::builder ( )
. status ( StatusCode ::NOT_MODIFIED )
. header ( "Cache-Control" , "max-age=31536000, must-revalidate" )
. header ( "ETag" , sha )
. body ( None )
. unwrap ( ) ;
return Err ( res ) ;
}
let file = App ::get ( "index.gzip" ) . unwrap ( ) ;
let res = Response ::builder ( )
. status ( StatusCode ::OK )
. header ( "Content-Type" , "text/html" )
. header ( "Cache-Control" , "max-age=31536000, must-revalidate" )
. header ( "Content-Encoding" , "gzip" )
. header ( "ETag" , sha )
. body ( Some ( file . data . to_vec ( ) ) )
. unwrap ( ) ;
return Err ( res ) ;
} else if uri = = "/.ng_bootstrap" {
log_debug ! ( "Serving bootstrap" ) ;
let res = Response ::builder ( )
. status ( StatusCode ::OK )
. header ( "Content-Type" , "text/json" )
. header ( "Cache-Control" , "max-age=3600, must-revalidate" )
. body ( Some ( BOOTSTRAP_STRING . get ( ) . unwrap ( ) . as_bytes ( ) . to_vec ( ) ) )
. unwrap ( ) ;
return Err ( res ) ;
}
}
Err ( make_error ( StatusCode ::FORBIDDEN ) )
@ -207,7 +261,7 @@ const LOCAL_URLS: [&str; 3] = ["http://localhost", "http://127.0.0.1", "http://:
const APP_NG_ONE_URL : & str = "https://app.nextgraph.one" ;
impl Callback for SecurityCallback {
fn on_request ( self , request : & Request , response : Response ) -> Result < Response , ErrorResponse > {
fn on_request ( self , request : & Request ) -> Result < ( ) , ErrorResponse > {
let local_urls = LOCAL_URLS
. to_vec ( )
. iter ( )
@ -236,21 +290,30 @@ impl Callback for SecurityCallback {
. get ( listener_id )
. ok_or ( make_error ( StatusCode ::FORBIDDEN ) ) ? ;
if request . method ( ) ! = Method ::GET {
return Err ( make_error ( StatusCode ::METHOD_NOT_ALLOWED ) ) ;
}
if request . version ( ) ! = Version ::HTTP_11 {
return Err ( make_error ( StatusCode ::HTTP_VERSION_NOT_SUPPORTED ) ) ;
}
let xff = request . headers ( ) . get ( "X-Forwarded-For" ) ;
let upgrade = request . headers ( ) . get ( CONNECTION ) ;
let connection = request . headers ( ) . get ( CONNECTION ) ;
let host = request . headers ( ) . get ( HOST ) ;
let origin = request . headers ( ) . get ( ORIGIN ) ;
let remote = self . remote_bind_address . ip ;
let xff = request . headers ( ) . get ( "X-Forwarded-For" ) ;
let last_etag = request . headers ( ) . get ( "If-None-Match" ) ;
let uri = request . uri ( ) ;
log_debug ! (
"upgrade :{:?} origin:{:?} host:{:?} xff:{:?} remote:{:?} local:{:?}" ,
upgrade ,
"connection :{:?} origin:{:?} host:{:?} xff:{:?} remote:{:?} local:{:?} uri :{:?}" ,
connection ,
origin ,
host ,
xff ,
remote ,
self . local_bind_address
self . local_bind_address ,
uri
) ;
match listener . config . if_type {
@ -270,7 +333,13 @@ impl Callback for SecurityCallback {
"accepted core with refuse_clients {}" ,
listener . config . refuse_clients
) ;
return Ok ( response ) ;
return upgrade_ws_or_serve_app (
connection ,
remote ,
listener . config . serve_app ,
uri ,
last_etag ,
) ;
}
InterfaceType ::Loopback = > {
if ! remote . is_loopback ( ) {
@ -293,10 +362,11 @@ impl Callback for SecurityCallback {
listener . config . accept_direct
) ;
return upgrade_ws_or_serve_app (
upgrade ,
connection ,
remote ,
listener . config . serve_app ,
response ,
uri ,
last_etag ,
) ;
} else if listener . config . accept_forward_for . is_private_domain ( ) {
let ( hosts_str , urls_str ) =
@ -305,14 +375,26 @@ impl Callback for SecurityCallback {
check_host ( host , hosts_str ) ? ;
check_xff_is_public_or_private ( xff , false , false ) ? ;
log_debug ! ( "accepted loopback PRIVATE_DOMAIN" ) ;
return Ok ( response ) ;
return upgrade_ws_or_serve_app (
connection ,
remote ,
listener . config . serve_app ,
uri ,
last_etag ,
) ;
} else if listener . config . accept_forward_for = = AcceptForwardForV0 ::No {
check_host ( host , local_hosts ) ? ;
check_no_xff ( xff ) ? ;
// TODO local_urls might need a trailing :port, but it is ok for now as we do starts_with
check_origin_is_url ( origin , local_urls ) ? ;
log_debug ! ( "accepted loopback DIRECT" ) ;
return Ok ( response ) ;
return upgrade_ws_or_serve_app (
connection ,
remote ,
listener . config . serve_app ,
uri ,
last_etag ,
) ;
}
}
InterfaceType ::Private = > {
@ -347,7 +429,13 @@ impl Callback for SecurityCallback {
check_origin_is_url ( origin , urls_str ) ? ;
check_host_in_addrs ( host , & addrs ) ? ;
log_debug ! ( "accepted private PUBLIC_STATIC or PUBLIC_DYN with direct {} with refuse_clients {}" , listener . config . accept_direct , listener . config . refuse_clients ) ;
return Ok ( response ) ;
return upgrade_ws_or_serve_app (
connection ,
remote ,
listener . config . serve_app ,
uri ,
last_etag ,
) ;
} else if listener . config . accept_forward_for . is_public_domain ( ) {
if ! remote . is_private ( ) {
return Err ( make_error ( StatusCode ::FORBIDDEN ) ) ;
@ -373,7 +461,13 @@ impl Callback for SecurityCallback {
"accepted private PUBLIC_DOMAIN with direct {}" ,
listener . config . accept_direct
) ;
return Ok ( response ) ;
return upgrade_ws_or_serve_app (
connection ,
remote ,
listener . config . serve_app ,
uri ,
last_etag ,
) ;
} else if listener . config . accept_forward_for = = AcceptForwardForV0 ::No {
if ! remote . is_private ( ) {
return Err ( make_error ( StatusCode ::FORBIDDEN ) ) ;
@ -387,7 +481,13 @@ impl Callback for SecurityCallback {
prepare_urls_from_private_addrs ( & listener . addrs , listener . config . port ) ,
) ? ;
log_debug ! ( "accepted private DIRECT" ) ;
return Ok ( response ) ;
return upgrade_ws_or_serve_app (
connection ,
remote ,
listener . config . serve_app ,
uri ,
last_etag ,
) ;
}
}
_ = > { }
@ -410,10 +510,7 @@ pub async fn accept(tcp: TcpStream, peer_priv_key: Sensitive<[u8; 32]>) {
)
. await ;
if ws . is_err ( ) {
log_debug ! ( "websocket rejected {:?}" , ws . err ( ) ) ;
//let mut buffer = Vec::new();
//tcp.read_to_end(&mut buffer).await;
//log_debug!("{:?}", buffer);
log_debug ! ( "websocket rejected" ) ;
return ;
}
@ -459,7 +556,7 @@ pub async fn run_server_accept_one(
pub async fn run_server_v0 (
peer_priv_key : Sensitive < [ u8 ; 32 ] > ,
peer_pub_key : PubKey ,
peer_id : PubKey ,
wallet_master_key : Sensitive < [ u8 ; 32 ] > ,
config : DaemonConfigV0 ,
mut path : PathBuf ,
@ -512,9 +609,12 @@ pub async fn run_server_v0(
let mut listeners_addrs : Vec < ( Vec < SocketAddr > , String ) > = vec! [ ] ;
let mut listeners : Vec < TcpListener > = vec! [ ] ;
let mut accept_clients = false ;
//let mut serve_app = false;
// TODO: check that there is only one PublicDyn or one PublicStatic or one Core
let mut servers : Vec < BrokerServerV0 > = vec! [ ] ;
// Preparing the listeners addrs and infos
for listener in config . listeners {
if ! listener . accept_direct & & listener . accept_forward_for = = AcceptForwardForV0 ::No {
@ -576,11 +676,26 @@ pub async fn run_server_v0(
listener . interface_name
) ;
}
// if listener.serve_app {
// serve_app = true;
// }
let bind_addresses : Vec < BindAddress > =
addrs . iter ( ) . map ( | addr | addr . into ( ) ) . collect ( ) ;
let server_types = listener . get_bootstraps ( bind_addresses . clone ( ) ) ;
for server_type in server_types {
servers . push ( BrokerServerV0 {
peer_id ,
server_type ,
} )
}
let listener_id : String = listener . to_string ( ) ;
let listener_info = ListenerInfo {
config : listener ,
addrs : addrs . iter ( ) . map ( | addr | addr . into ( ) ) . collect ( ) ,
addrs : bind_addresses ,
} ;
listener_infos . insert ( listener_id , listener_info ) ;
@ -598,11 +713,14 @@ pub async fn run_server_v0(
log_warn ! ( "There isn't any listener that accept clients. This is a misconfiguration as a core server that cannot receive client connections is useless" ) ;
}
let bootstrap = BootstrapContent ::V0 ( BootstrapContentV0 { servers } ) ;
BOOTSTRAP_STRING . set ( json ! ( bootstrap ) . to_string ( ) ) . unwrap ( ) ;
// saving the infos in the broker. This needs to happen before we start listening, as new incoming connections can happen anytime after that.
// and we need those infos for permission checking.
{
let mut broker = BROKER . write ( ) . await ;
broker . set_my_peer_id ( peer_pub_key ) ;
broker . set_my_peer_id ( peer_id ) ;
LISTENERS_INFO
. set ( broker . set_listeners ( listener_infos ) )
. unwrap ( ) ;