refactor client to server connection start

pull/19/head
Niko PLP 1 year ago
parent e80c87d925
commit dfdfbedee8
  1. 165
      Cargo.lock
  2. 11
      ng-app/src/routes/WalletCreate.svelte
  3. 15
      ng-sdk-js/src/lib.rs
  4. 4
      p2p-broker/src/server_ws.rs
  5. 22
      p2p-client-ws/src/remote_ws.rs
  6. 5
      p2p-client-ws/src/remote_ws_wasm.rs
  7. 2
      p2p-net/Cargo.toml
  8. 6
      p2p-net/src/actor.rs
  9. 47
      p2p-net/src/broker.rs
  10. 28
      p2p-net/src/connection.rs
  11. 182
      p2p-net/src/types.rs
  12. 7
      p2p-repo/src/types.rs
  13. 6
      p2p-repo/src/utils.rs

165
Cargo.lock generated

@ -153,7 +153,7 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b" checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b"
dependencies = [ dependencies = [
"windows-sys", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
@ -163,7 +163,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188" checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188"
dependencies = [ dependencies = [
"anstyle", "anstyle",
"windows-sys", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
@ -324,7 +324,7 @@ dependencies = [
"futures-lite", "futures-lite",
"rustix", "rustix",
"signal-hook", "signal-hook",
"windows-sys", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
@ -1335,7 +1335,7 @@ checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a"
dependencies = [ dependencies = [
"errno-dragonfly", "errno-dragonfly",
"libc", "libc",
"windows-sys", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
@ -2131,6 +2131,19 @@ dependencies = [
"want", "want",
] ]
[[package]]
name = "hyper-tls"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes",
"hyper",
"native-tls",
"tokio",
"tokio-native-tls",
]
[[package]] [[package]]
name = "iana-time-zone" name = "iana-time-zone"
version = "0.1.57" version = "0.1.57"
@ -2251,7 +2264,7 @@ checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
dependencies = [ dependencies = [
"hermit-abi 0.3.1", "hermit-abi 0.3.1",
"libc", "libc",
"windows-sys", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
@ -2269,7 +2282,7 @@ dependencies = [
"hermit-abi 0.3.1", "hermit-abi 0.3.1",
"io-lifetimes", "io-lifetimes",
"rustix", "rustix",
"windows-sys", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
@ -2580,7 +2593,7 @@ checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2"
dependencies = [ dependencies = [
"libc", "libc",
"wasi 0.11.0+wasi-snapshot-preview1", "wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
@ -2610,6 +2623,24 @@ dependencies = [
"getrandom 0.2.10", "getrandom 0.2.10",
] ]
[[package]]
name = "native-tls"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e"
dependencies = [
"lazy_static",
"libc",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework",
"security-framework-sys",
"tempfile",
]
[[package]] [[package]]
name = "ndk" name = "ndk"
version = "0.6.0" version = "0.6.0"
@ -2959,6 +2990,50 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "openssl"
version = "0.10.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "345df152bc43501c5eb9e4654ff05f794effb78d4efe3d53abc158baddc0703d"
dependencies = [
"bitflags",
"cfg-if",
"foreign-types",
"libc",
"once_cell",
"openssl-macros",
"openssl-sys",
]
[[package]]
name = "openssl-macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.18",
]
[[package]]
name = "openssl-probe"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
version = "0.9.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "374533b0e45f3a7ced10fcaeccca020e66656bc03dac384f852e4e5a7a8104a6"
dependencies = [
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]] [[package]]
name = "ordered-float" name = "ordered-float"
version = "3.7.0" version = "3.7.0"
@ -3044,10 +3119,12 @@ dependencies = [
"num_enum", "num_enum",
"once_cell", "once_cell",
"p2p-repo", "p2p-repo",
"reqwest",
"serde", "serde",
"serde_bare", "serde_bare",
"serde_bytes", "serde_bytes",
"unique_id", "unique_id",
"url",
"wasm-bindgen", "wasm-bindgen",
] ]
@ -3368,7 +3445,7 @@ dependencies = [
"libc", "libc",
"log", "log",
"pin-project-lite", "pin-project-lite",
"windows-sys", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
@ -3710,10 +3787,12 @@ dependencies = [
"http", "http",
"http-body", "http-body",
"hyper", "hyper",
"hyper-tls",
"ipnet", "ipnet",
"js-sys", "js-sys",
"log", "log",
"mime", "mime",
"native-tls",
"once_cell", "once_cell",
"percent-encoding", "percent-encoding",
"pin-project-lite", "pin-project-lite",
@ -3721,6 +3800,7 @@ dependencies = [
"serde_json", "serde_json",
"serde_urlencoded", "serde_urlencoded",
"tokio", "tokio",
"tokio-native-tls",
"tokio-util", "tokio-util",
"tower-service", "tower-service",
"url", "url",
@ -3808,7 +3888,7 @@ dependencies = [
"io-lifetimes", "io-lifetimes",
"libc", "libc",
"linux-raw-sys", "linux-raw-sys",
"windows-sys", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
@ -3853,6 +3933,15 @@ dependencies = [
"winapi-util", "winapi-util",
] ]
[[package]]
name = "schannel"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "713cfb06c7059f3588fb8044c0fad1d09e3c01d225e25b9220dbfdcf16dbb1b3"
dependencies = [
"windows-sys 0.42.0",
]
[[package]] [[package]]
name = "scoped-tls" name = "scoped-tls"
version = "1.0.1" version = "1.0.1"
@ -3865,6 +3954,29 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "security-framework"
version = "2.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fc758eb7bffce5b308734e9b0c1468893cae9ff70ebf13e7090be8dcbcc83a8"
dependencies = [
"bitflags",
"core-foundation",
"core-foundation-sys",
"libc",
"security-framework-sys",
]
[[package]]
name = "security-framework-sys"
version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f51d0c0d83bec45f16480d0ce0058397a69e48fcdc52d1dc8855fb68acbd31a7"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]] [[package]]
name = "selectors" name = "selectors"
version = "0.22.0" version = "0.22.0"
@ -4605,7 +4717,7 @@ dependencies = [
"fastrand", "fastrand",
"redox_syscall 0.3.5", "redox_syscall 0.3.5",
"rustix", "rustix",
"windows-sys", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
@ -4739,7 +4851,7 @@ dependencies = [
"signal-hook-registry", "signal-hook-registry",
"socket2", "socket2",
"tokio-macros", "tokio-macros",
"windows-sys", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
@ -4753,6 +4865,16 @@ dependencies = [
"syn 2.0.18", "syn 2.0.18",
] ]
[[package]]
name = "tokio-native-tls"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2"
dependencies = [
"native-tls",
"tokio",
]
[[package]] [[package]]
name = "tokio-stream" name = "tokio-stream"
version = "0.1.14" version = "0.1.14"
@ -5059,6 +5181,12 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4d330786735ea358f3bc09eea4caa098569c1c93f342d9aca0514915022fe7e" checksum = "a4d330786735ea358f3bc09eea4caa098569c1c93f342d9aca0514915022fe7e"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]] [[package]]
name = "version-compare" name = "version-compare"
version = "0.1.1" version = "0.1.1"
@ -5480,6 +5608,21 @@ version = "0.44.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee78911e3f4ce32c1ad9d3c7b0bd95389662ad8d8f1a3155688fed70bd96e2b6" checksum = "ee78911e3f4ce32c1ad9d3c7b0bd95389662ad8d8f1a3155688fed70bd96e2b6"
[[package]]
name = "windows-sys"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7"
dependencies = [
"windows_aarch64_gnullvm 0.42.2",
"windows_aarch64_msvc 0.42.2",
"windows_i686_gnu 0.42.2",
"windows_i686_msvc 0.42.2",
"windows_x86_64_gnu 0.42.2",
"windows_x86_64_gnullvm 0.42.2",
"windows_x86_64_msvc 0.42.2",
]
[[package]] [[package]]
name = "windows-sys" name = "windows-sys"
version = "0.48.0" version = "0.48.0"

@ -701,12 +701,13 @@
<p class="max-w-xl md:mx-auto lg:max-w-2xl text-left"> <p class="max-w-xl md:mx-auto lg:max-w-2xl text-left">
<span class="text-xl">Do you trust this device? </span> <br /> <span class="text-xl">Do you trust this device? </span> <br />
If you do, if this device is yours or is used by few trusted persons of your If you do, if this device is yours or is used by few trusted persons of your
family or workplace, then you can save your wallet in this device. To the family or workplace, and you would like to login again from this device in
contrary, if this device is public and shared by strangers, do not save your the future, then you can save your wallet on this device. To the contrary,
wallet here. {#if !import.meta.env.TAURI_PLATFORM}By selecting this if this device is public and shared by strangers, do not save your wallet
option, you agree to save some cookies on your browser.{/if}<br /> here. {#if !import.meta.env.TAURI_PLATFORM}By selecting this option, you
agree to save some cookies on your browser.{/if}<br />
<Toggle class="mt-3" bind:checked={options.trusted} <Toggle class="mt-3" bind:checked={options.trusted}
>Save your wallet here?</Toggle >Save your wallet on this device?</Toggle
> >
</p> </p>
<p class="max-w-xl md:mx-auto mt-10 lg:max-w-2xl text-left"> <p class="max-w-xl md:mx-auto mt-10 lg:max-w-2xl text-left">

@ -234,8 +234,8 @@ pub async fn start() {
let x_from_ed = keys.1.to_dh_from_ed(); let x_from_ed = keys.1.to_dh_from_ed();
log_info!("Pub from X {}", x_from_ed); log_info!("Pub from X {}", x_from_ed);
let (client_priv_key, client_pub_key) = generate_keypair(); let (client_priv, client) = generate_keypair();
let (user_priv_key, user_pub_key) = generate_keypair(); let (user_priv, user) = generate_keypair();
log_info!("start connecting"); log_info!("start connecting");
@ -244,16 +244,15 @@ pub async fn start() {
.await .await
.connect( .connect(
Box::new(ConnectionWebSocket {}), Box::new(ConnectionWebSocket {}),
IP::try_from(&IpAddr::from_str("127.0.0.1").unwrap()).unwrap(),
WS_PORT,
None,
keys.0, keys.0,
keys.1, keys.1,
server_key, server_key,
StartConfig::Client(ClientConfig { StartConfig::Client(ClientConfig {
user: user_pub_key, url: format!("ws://127.0.0.1:{}", WS_PORT),
client: client_pub_key, user,
client_priv: client_priv_key, user_priv,
client,
client_priv,
}), }),
) )
.await; .await;

@ -276,10 +276,6 @@ fn upgrade_ws_or_serve_app(
Err(make_error(StatusCode::FORBIDDEN)) Err(make_error(StatusCode::FORBIDDEN))
} }
const LOCAL_HOSTS: [&str; 3] = ["localhost", "127.0.0.1", "[::1]"];
const LOCAL_URLS: [&str; 3] = ["http://localhost", "http://127.0.0.1", "http://[::1]"];
//const APP_NG_ONE_URL: &str = "https://app.nextgraph.one";
impl Callback for SecurityCallback { impl Callback for SecurityCallback {
fn on_request(self, request: &Request) -> Result<(), ErrorResponse> { fn on_request(self, request: &Request) -> Result<(), ErrorResponse> {
let local_urls = LOCAL_URLS let local_urls = LOCAL_URLS

@ -43,8 +43,7 @@ pub struct ConnectionWebSocket {}
impl IConnect for ConnectionWebSocket { impl IConnect for ConnectionWebSocket {
async fn open( async fn open(
&self, &self,
ip: IP, url: String,
port: u16,
peer_privk: Sensitive<[u8; 32]>, peer_privk: Sensitive<[u8; 32]>,
peer_pubk: PubKey, peer_pubk: PubKey,
remote_peer: DirectPeerId, remote_peer: DirectPeerId,
@ -52,8 +51,6 @@ impl IConnect for ConnectionWebSocket {
) -> Result<ConnectionBase, NetError> { ) -> Result<ConnectionBase, NetError> {
let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS); let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS);
let url = format!("ws://{}:{}", ip, port);
let res = connect_async(url).await; let res = connect_async(url).await;
match res { match res {
@ -66,6 +63,7 @@ impl IConnect for ConnectionWebSocket {
let s = cnx.take_sender(); let s = cnx.take_sender();
let r = cnx.take_receiver(); let r = cnx.take_receiver();
let mut shutdown = cnx.set_shutdown(); let mut shutdown = cnx.set_shutdown();
cnx.release_shutdown();
let join = task::spawn(async move { let join = task::spawn(async move {
log_debug!("START of WS loop"); log_debug!("START of WS loop");
@ -103,6 +101,7 @@ impl IConnect for ConnectionWebSocket {
let s = cnx.take_sender(); let s = cnx.take_sender();
let r = cnx.take_receiver(); let r = cnx.take_receiver();
let mut shutdown = cnx.set_shutdown(); let mut shutdown = cnx.set_shutdown();
cnx.release_shutdown();
let join = task::spawn(async move { let join = task::spawn(async move {
log_debug!("START of WS loop"); log_debug!("START of WS loop");
@ -324,8 +323,8 @@ mod test {
let x_from_ed = keys.1.to_dh_from_ed(); let x_from_ed = keys.1.to_dh_from_ed();
log_info!("Pub from X {}", x_from_ed); log_info!("Pub from X {}", x_from_ed);
let (client_priv_key, client_pub_key) = generate_keypair(); let (client_priv, client) = generate_keypair();
let (user_priv_key, user_pub_key) = generate_keypair(); let (user_priv, user) = generate_keypair();
log_info!("start connecting"); log_info!("start connecting");
{ {
@ -334,13 +333,16 @@ mod test {
.await .await
.connect( .connect(
Box::new(ConnectionWebSocket {}), Box::new(ConnectionWebSocket {}),
IP::try_from(&IpAddr::from_str("127.0.0.1").unwrap()).unwrap(),
WS_PORT,
None,
keys.0, keys.0,
keys.1, keys.1,
server_key, server_key,
StartConfig::Probe, StartConfig::Client(ClientConfig {
url: format!("ws://localhost:{}", WS_PORT),
user,
user_priv,
client,
client_priv,
}),
) )
.await; .await;
log_info!("broker.connect : {:?}", res); log_info!("broker.connect : {:?}", res);

@ -37,8 +37,7 @@ pub struct ConnectionWebSocket {}
impl IConnect for ConnectionWebSocket { impl IConnect for ConnectionWebSocket {
async fn open( async fn open(
&self, &self,
ip: IP, url: String,
port: u16,
peer_privk: Sensitive<[u8; 32]>, peer_privk: Sensitive<[u8; 32]>,
peer_pubk: PubKey, peer_pubk: PubKey,
remote_peer: DirectPeerId, remote_peer: DirectPeerId,
@ -46,8 +45,6 @@ impl IConnect for ConnectionWebSocket {
) -> Result<ConnectionBase, NetError> { ) -> Result<ConnectionBase, NetError> {
let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS); let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS);
let url = format!("ws://{}:{}", ip, port);
let (mut ws, wsio) = WsMeta::connect(url, None).await.map_err(|e| { let (mut ws, wsio) = WsMeta::connect(url, None).await.map_err(|e| {
//log_info!("{:?}", e); //log_info!("{:?}", e);
NetError::ConnectionError NetError::ConnectionError

@ -25,6 +25,8 @@ noise-protocol = "0.2.0-rc1"
noise-rust-crypto = "0.6.0-rc.1" noise-rust-crypto = "0.6.0-rc.1"
ed25519-dalek = "1.0.1" ed25519-dalek = "1.0.1"
either = "1.8.1" either = "1.8.1"
reqwest = { version = "0.11.18", features = ["json"] }
url = "2.4.0"
[target.'cfg(target_arch = "wasm32")'.dependencies.getrandom] [target.'cfg(target_arch = "wasm32")'.dependencies.getrandom]
version = "0.2.7" version = "0.2.7"

@ -121,7 +121,7 @@ impl<
match receiver.next().await { match receiver.next().await {
Some(ConnectionCommand::Msg(msg)) => { Some(ConnectionCommand::Msg(msg)) => {
if let ProtocolMessage::BrokerMessage(ref bm) = msg { if let ProtocolMessage::BrokerMessage(ref bm) = msg {
if bm.result() == ProtocolError::PartialContent.into() if bm.result() == Into::<u16>::into(ProtocolError::PartialContent)
&& TypeId::of::<B>() != TypeId::of::<()>() && TypeId::of::<B>() != TypeId::of::<()>()
{ {
let (mut b_sender, b_receiver) = mpsc::unbounded::<B>(); let (mut b_sender, b_receiver) = mpsc::unbounded::<B>();
@ -141,7 +141,9 @@ impl<
actor_receiver.next().await actor_receiver.next().await
{ {
if let ProtocolMessage::BrokerMessage(ref bm) = msg { if let ProtocolMessage::BrokerMessage(ref bm) = msg {
if bm.result() == ProtocolError::EndOfStream.into() { if bm.result()
== Into::<u16>::into(ProtocolError::EndOfStream)
{
break; break;
} }
let response = msg.try_into(); let response = msg.try_into();

@ -63,7 +63,7 @@ pub static BROKER: Lazy<Arc<RwLock<Broker>>> = Lazy::new(|| Arc::new(RwLock::new
pub struct Broker { pub struct Broker {
direct_connections: HashMap<IP, DirectConnection>, direct_connections: HashMap<IP, DirectConnection>,
peers: HashMap<DirectPeerId, BrokerPeerInfo>, peers: HashMap<X25519PubKey, BrokerPeerInfo>,
/// (local,remote) -> ConnectionBase /// (local,remote) -> ConnectionBase
anonymous_connections: HashMap<(BindAddress, BindAddress), ConnectionBase>, anonymous_connections: HashMap<(BindAddress, BindAddress), ConnectionBase>,
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
@ -248,7 +248,7 @@ impl Broker {
} }
pub fn reconnecting(&mut self, peer_id: &DirectPeerId) { pub fn reconnecting(&mut self, peer_id: &DirectPeerId) {
let peerinfo = self.peers.get_mut(peer_id); let peerinfo = self.peers.get_mut(&peer_id.to_dh_slice());
match peerinfo { match peerinfo {
Some(info) => match &info.connected { Some(info) => match &info.connected {
PeerConnection::NONE => {} PeerConnection::NONE => {}
@ -264,7 +264,7 @@ impl Broker {
} }
} }
pub fn remove_peer_id(&mut self, peer_id: &DirectPeerId) { pub fn remove_peer_id(&mut self, peer_id: &DirectPeerId) {
let removed = self.peers.remove(peer_id); let removed = self.peers.remove(&peer_id.to_dh_slice());
match removed { match removed {
Some(info) => match info.connected { Some(info) => match info.connected {
PeerConnection::NONE => {} PeerConnection::NONE => {}
@ -285,6 +285,9 @@ impl Broker {
let removed = self let removed = self
.anonymous_connections .anonymous_connections
.remove(&(local_bind_address, remote_bind_address)); .remove(&(local_bind_address, remote_bind_address));
if removed.is_some() {
removed.unwrap().release_shutdown();
}
} }
pub fn test(&self) -> u32 { pub fn test(&self) -> u32 {
@ -363,7 +366,7 @@ impl Broker {
anonymous = Vec::from_iter(broker.anonymous_connections.keys().cloned()); anonymous = Vec::from_iter(broker.anonymous_connections.keys().cloned());
} }
for peer_id in peer_ids { for peer_id in peer_ids {
BROKER.write().await.close_peer_connection(&peer_id).await; BROKER.write().await.close_peer_connection_x(&peer_id).await;
} }
for anon in anonymous { for anon in anonymous {
BROKER.write().await.close_anonymous(anon.1, anon.0).await; BROKER.write().await.close_anonymous(anon.1, anon.0).await;
@ -475,7 +478,7 @@ impl Broker {
lastPeerAdvert: None, lastPeerAdvert: None,
connected, connected,
}; };
self.peers.insert(remote_peer_id, bpi); self.peers.insert(remote_peer_id.to_dh_slice(), bpi);
Ok(()) Ok(())
} }
@ -495,9 +498,6 @@ impl Broker {
pub async fn connect( pub async fn connect(
&mut self, &mut self,
cnx: Box<dyn IConnect>, cnx: Box<dyn IConnect>,
ip: IP,
port: u16,
core: Option<String>, // the interface used as egress for this connection
peer_privk: Sensitive<[u8; 32]>, peer_privk: Sensitive<[u8; 32]>,
peer_pubk: PubKey, peer_pubk: PubKey,
remote_peer_id: DirectPeerId, remote_peer_id: DirectPeerId,
@ -513,44 +513,46 @@ impl Broker {
log_info!("CONNECTING"); log_info!("CONNECTING");
let mut connection = cnx let mut connection = cnx
.open( .open(
ip, config.get_url(),
port,
Sensitive::<[u8; 32]>::from_slice(peer_privk.deref()), Sensitive::<[u8; 32]>::from_slice(peer_privk.deref()),
peer_pubk, peer_pubk,
remote_peer_id, remote_peer_id,
config, config.clone(),
) )
.await?; .await?;
let join = connection.take_shutdown(); let join = connection.take_shutdown();
let connected = if core.is_some() { let connected = match &config {
StartConfig::Core(config) => {
let ip = config.addr.ip.clone();
let dc = DirectConnection { let dc = DirectConnection {
ip, ip,
interface: core.clone().unwrap(), interface: config.interface.clone(),
remote_peer_id, remote_peer_id,
tp: connection.transport_protocol(), tp: connection.transport_protocol(),
cnx: connection, cnx: connection,
}; };
self.direct_connections.insert(ip, dc); self.direct_connections.insert(ip, dc);
PeerConnection::Core(ip) PeerConnection::Core(ip)
} else { }
PeerConnection::Client(connection) StartConfig::Client(config) => PeerConnection::Client(connection),
_ => unimplemented!(),
}; };
let bpi = BrokerPeerInfo { let bpi = BrokerPeerInfo {
lastPeerAdvert: None, lastPeerAdvert: None,
connected, connected,
}; };
self.peers.insert(remote_peer_id, bpi); self.peers.insert(remote_peer_id.to_dh_slice(), bpi);
async fn watch_close( async fn watch_close(
mut join: Receiver<Either<NetError, PubKey>>, mut join: Receiver<Either<NetError, PubKey>>,
cnx: Box<dyn IConnect>, cnx: Box<dyn IConnect>,
ip: IP,
core: Option<String>, // the interface used as egress for this connection
peer_privk: Sensitive<[u8; 32]>, peer_privk: Sensitive<[u8; 32]>,
peer_pubkey: PubKey, peer_pubkey: PubKey,
remote_peer_id: DirectPeerId, remote_peer_id: DirectPeerId,
config: StartConfig,
) -> ResultSend<()> { ) -> ResultSend<()> {
async move { async move {
let res = join.next().await; let res = join.next().await;
@ -579,16 +581,15 @@ impl Broker {
spawn_and_log_error(watch_close( spawn_and_log_error(watch_close(
join, join,
cnx, cnx,
ip,
core,
peer_privk, peer_privk,
peer_pubk, peer_pubk,
remote_peer_id, remote_peer_id,
config,
)); ));
Ok(()) Ok(())
} }
pub async fn close_peer_connection(&mut self, peer_id: &DirectPeerId) { pub async fn close_peer_connection_x(&mut self, peer_id: &X25519PubKey) {
if let Some(peer) = self.peers.get_mut(peer_id) { if let Some(peer) = self.peers.get_mut(peer_id) {
match &mut peer.connected { match &mut peer.connected {
PeerConnection::Core(_) => { PeerConnection::Core(_) => {
@ -604,6 +605,10 @@ impl Broker {
} }
} }
pub async fn close_peer_connection(&mut self, peer_id: &DirectPeerId) {
self.close_peer_connection_x(&peer_id.to_dh_slice()).await
}
pub async fn close_anonymous( pub async fn close_anonymous(
&mut self, &mut self,
remote_bind_address: BindAddress, remote_bind_address: BindAddress,

@ -52,8 +52,7 @@ pub enum ConnectionCommand {
pub trait IConnect: Send + Sync { pub trait IConnect: Send + Sync {
async fn open( async fn open(
&self, &self,
ip: IP, url: String,
port: u16,
peer_privk: Sensitive<[u8; 32]>, peer_privk: Sensitive<[u8; 32]>,
peer_pubk: PubKey, peer_pubk: PubKey,
remote_peer: DirectPeerId, remote_peer: DirectPeerId,
@ -144,18 +143,28 @@ pub enum StepReply {
CloseNow, CloseNow,
} }
#[derive(PartialEq, Debug, Clone)]
pub struct ClientConfig { pub struct ClientConfig {
pub url: String,
pub user: PubKey, pub user: PubKey,
pub user_priv: PrivKey,
pub client: PubKey, pub client: PubKey,
pub client_priv: PrivKey, pub client_priv: PrivKey,
} }
#[derive(PartialEq, Debug, Clone)]
pub struct ExtConfig {} pub struct ExtConfig {}
pub struct CoreConfig {} #[derive(PartialEq, Debug, Clone)]
pub struct CoreConfig {
pub addr: BindAddress,
pub interface: String,
}
#[derive(PartialEq, Debug, Clone)]
pub struct AdminConfig {} pub struct AdminConfig {}
#[derive(PartialEq, Debug, Clone)]
pub enum StartConfig { pub enum StartConfig {
Probe, Probe,
Relay(BindAddress), Relay(BindAddress),
@ -165,6 +174,16 @@ pub enum StartConfig {
Admin(AdminConfig), Admin(AdminConfig),
} }
impl StartConfig {
pub fn get_url(&self) -> String {
match self {
Self::Client(config) => config.url.clone(),
Self::Core(config) => format!("ws://{}:{}", config.addr.ip, config.addr.port),
_ => unimplemented!(),
}
}
}
impl NoiseFSM { impl NoiseFSM {
pub fn new( pub fn new(
bind_addresses: Option<(BindAddress, BindAddress)>, bind_addresses: Option<(BindAddress, BindAddress)>,
@ -703,10 +722,11 @@ impl ConnectionBase {
} }
} }
pub async fn release_shutdown(&mut self) { pub fn release_shutdown(&mut self) {
self.shutdown_sender = None; self.shutdown_sender = None;
} }
// only used by accept
pub async fn reset_shutdown(&mut self, remote_peer_id: PubKey) { pub async fn reset_shutdown(&mut self, remote_peer_id: PubKey) {
let _ = self let _ = self
.shutdown_sender .shutdown_sender

@ -88,6 +88,16 @@ pub struct BindAddress {
pub ip: IP, pub ip: IP,
} }
impl BindAddress {
pub fn to_ws_url(&self) -> String {
format!(
"ws://{}:{}",
self.ip,
if self.port == 0 { 80 } else { self.port }
)
}
}
impl From<&SocketAddr> for BindAddress { impl From<&SocketAddr> for BindAddress {
#[inline] #[inline]
fn from(addr: &SocketAddr) -> BindAddress { fn from(addr: &SocketAddr) -> BindAddress {
@ -118,6 +128,162 @@ pub struct BrokerServerV0 {
pub peer_id: PubKey, pub peer_id: PubKey,
} }
pub const APP_NG_ONE_URL: &str = "https://app.nextgraph.one";
pub const APP_NG_ONE_WS_URL: &str = "wss://app.nextgraph.one";
fn api_dyn_peer_url(peer_id: &PubKey) -> String {
format!("https://nextgraph.one/api/v1/dynpeer/{}", peer_id)
}
pub const LOCAL_HOSTS: [&str; 3] = ["localhost", "127.0.0.1", "[::1]"];
fn local_ws_url(port: &u16) -> String {
format!("ws://localhost:{}", if *port == 0 { 80 } else { *port })
}
pub const LOCAL_URLS: [&str; 3] = ["http://localhost", "http://127.0.0.1", "http://[::1]"];
use url::{Host, Url};
impl BrokerServerTypeV0 {
pub fn find_first_ipv4(&self) -> Option<&BindAddress> {
match self {
Self::BoxPrivate(addrs) => {
for addr in addrs {
if addr.ip.is_v4() {
return Some(addr);
}
}
return None;
}
_ => None,
}
}
pub fn find_first_ipv6(&self) -> Option<&BindAddress> {
match self {
Self::BoxPrivate(addrs) => {
for addr in addrs {
if addr.ip.is_v6() {
return Some(addr);
}
}
return None;
}
_ => None,
}
}
}
impl BrokerServerV0 {
fn first_ipv4(&self) -> Option<(String, Vec<BindAddress>)> {
self.server_type.find_first_ipv4().map_or(None, |bindaddr| {
Some((format!("ws://{}:{}", bindaddr.ip, bindaddr.port), vec![]))
})
}
fn first_ipv6(&self) -> Option<(String, Vec<BindAddress>)> {
self.server_type.find_first_ipv6().map_or(None, |bindaddr| {
Some((format!("ws://{}:{}", bindaddr.ip, bindaddr.port), vec![]))
})
}
/// on web browser, returns the connection URL and an optional list of BindAddress if a relay is needed
/// filtered by the current location url of the webpage
/// on native apps, returns or the connection URL without optional BindAddress or an empty string with
/// several BindAddresses to try to connect to with .to_ws_url()
pub async fn get_url(&self, location: Option<String>) -> Option<(String, Vec<BindAddress>)> {
if location.is_some() {
let location = location.unwrap();
if location.starts_with(APP_NG_ONE_URL) {
match &self.server_type {
BrokerServerTypeV0::BoxPublic(addrs) => {
Some((APP_NG_ONE_WS_URL.to_string(), addrs.clone()))
}
BrokerServerTypeV0::BoxPublicDyn(addrs) => {
let resp = reqwest::get(api_dyn_peer_url(&self.peer_id)).await;
if resp.is_ok() {
let resp = resp.unwrap().json::<Vec<BindAddress>>().await;
if resp.is_ok() {
return Some((APP_NG_ONE_WS_URL.to_string(), resp.unwrap()));
}
}
if addrs.len() > 0 {
Some((APP_NG_ONE_WS_URL.to_string(), addrs.clone()))
} else {
None
}
}
_ => None,
}
} else if let BrokerServerTypeV0::Domain(domain) = &self.server_type {
let url = format!("https://{}", domain);
if location.starts_with(&url) {
let wss_url = format!("wss://{}", domain);
Some((wss_url, vec![]))
} else {
None
}
} else {
// localhost
if location.starts_with(LOCAL_URLS[0])
|| location.starts_with(LOCAL_URLS[1])
|| location.starts_with(LOCAL_URLS[2])
{
if let BrokerServerTypeV0::Localhost(port) = self.server_type {
Some((local_ws_url(&port), vec![]))
} else {
None
}
}
// a private address
else if location.starts_with("http://") {
let url = Url::parse(&location).unwrap();
match url.host() {
Some(Host::Ipv4(ip)) => {
if is_ipv4_private(&ip) {
self.first_ipv4()
} else {
None
}
}
Some(Host::Ipv6(ip)) => {
if is_ipv6_private(&ip) {
self.first_ipv6()
} else {
None
}
}
_ => None,
}
} else {
None
}
}
} else {
// From native / tauri app
match &self.server_type {
BrokerServerTypeV0::Localhost(port) => Some((local_ws_url(port), vec![])),
BrokerServerTypeV0::BoxPrivate(addrs) => Some((String::new(), addrs.clone())),
BrokerServerTypeV0::BoxPublic(addrs) => Some((String::new(), addrs.clone())),
BrokerServerTypeV0::BoxPublicDyn(addrs) => {
let resp = reqwest::get(api_dyn_peer_url(&self.peer_id)).await;
if resp.is_ok() {
let resp = resp.unwrap().json::<Vec<BindAddress>>().await;
if resp.is_ok() {
return Some((String::new(), resp.unwrap()));
}
}
if addrs.len() > 0 {
Some((String::new(), addrs.clone()))
} else {
None
}
}
BrokerServerTypeV0::Domain(domain) => Some((format!("wss://{}", domain), vec![])),
}
}
}
}
/// Bootstrap content Version 0 /// Bootstrap content Version 0
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BootstrapContentV0 { pub struct BootstrapContentV0 {
@ -358,9 +524,11 @@ impl ListenerV0 {
} }
} }
AcceptForwardForV0::PublicDomain(_) | AcceptForwardForV0::PublicDomainPeer(_) => { AcceptForwardForV0::PublicDomain(_) | AcceptForwardForV0::PublicDomainPeer(_) => {
if !self.refuse_clients {
res.push(BrokerServerTypeV0::Domain( res.push(BrokerServerTypeV0::Domain(
self.accept_forward_for.get_domain().to_string(), self.accept_forward_for.get_domain().to_string(),
)); ));
}
if self.accept_direct { if self.accept_direct {
if self.if_type == InterfaceType::Private { if self.if_type == InterfaceType::Private {
res.push(BrokerServerTypeV0::BoxPrivate(addrs)); res.push(BrokerServerTypeV0::BoxPrivate(addrs));
@ -500,6 +668,20 @@ impl IP {
let t: &IpAddr = &self.into(); let t: &IpAddr = &self.into();
t.is_loopback() t.is_loopback()
} }
pub fn is_v6(&self) -> bool {
if let Self::IPv6(_) = self {
true
} else {
false
}
}
pub fn is_v4(&self) -> bool {
if let Self::IPv4(_) = self {
true
} else {
false
}
}
} }
impl fmt::Display for IP { impl fmt::Display for IP {

@ -14,7 +14,9 @@
//! Corresponds to the BARE schema //! Corresponds to the BARE schema
use crate::errors::NgError; use crate::errors::NgError;
use crate::utils::{decode_key, dh_pubkey_from_ed_slice, ed_privkey_to_pubkey}; use crate::utils::{
decode_key, dh_pubkey_from_ed_slice, dh_slice_from_ed_slice, ed_privkey_to_pubkey,
};
use core::fmt; use core::fmt;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_bare::to_vec; use serde_bare::to_vec;
@ -92,6 +94,9 @@ impl PubKey {
pub fn dh_from_ed_slice(slice: &[u8]) -> PubKey { pub fn dh_from_ed_slice(slice: &[u8]) -> PubKey {
dh_pubkey_from_ed_slice(slice) dh_pubkey_from_ed_slice(slice)
} }
pub fn to_dh_slice(&self) -> [u8; 32] {
dh_slice_from_ed_slice(self.slice())
}
} }
impl fmt::Display for PubKey { impl fmt::Display for PubKey {

@ -60,12 +60,16 @@ pub fn generate_null_keypair() -> (PrivKey, PubKey) {
} }
pub fn dh_pubkey_from_ed_slice(public: &[u8]) -> PubKey { pub fn dh_pubkey_from_ed_slice(public: &[u8]) -> PubKey {
PubKey::X25519PubKey(dh_slice_from_ed_slice(public))
}
pub fn dh_slice_from_ed_slice(public: &[u8]) -> X25519PubKey {
let mut bits: [u8; 32] = [0u8; 32]; let mut bits: [u8; 32] = [0u8; 32];
bits.copy_from_slice(public); bits.copy_from_slice(public);
let compressed = CompressedEdwardsY(bits); let compressed = CompressedEdwardsY(bits);
let ed_point: EdwardsPoint = compressed.decompress().unwrap(); let ed_point: EdwardsPoint = compressed.decompress().unwrap();
let mon_point = ed_point.to_montgomery(); let mon_point = ed_point.to_montgomery();
PubKey::X25519PubKey(mon_point.to_bytes()) mon_point.to_bytes()
} }
pub fn sign( pub fn sign(

Loading…
Cancel
Save