From dc5810489ffae7177ecc7027215957dd9c9b9b1a Mon Sep 17 00:00:00 2001 From: Niko PLP Date: Thu, 12 Oct 2023 01:25:59 +0300 Subject: [PATCH] connection to broker --- Cargo.lock | 5 +- ng-app/package.json | 2 + ng-app/src-tauri/Cargo.toml | 1 + ng-app/src-tauri/src/lib.rs | 93 ++++++++++++++++++- ng-app/src/App.svelte | 14 ++- ng-app/src/api.ts | 24 ++++- ng-app/src/lib/Home.svelte | 11 ++- ng-app/src/lib/Login.svelte | 2 +- ng-app/src/routes/User.svelte | 64 ++++++++++++- ng-app/src/routes/WalletLogin.svelte | 10 +- ng-app/src/store.ts | 79 +++++++++++++++- ng-app/src/wallet_emojis.ts | 2 +- ng-sdk-js/Cargo.toml | 2 +- ng-sdk-js/src/lib.rs | 134 +++++++++++++++++++++++---- ng-wallet/src/emojis.rs | 2 +- ng-wallet/src/lib.rs | 129 +++++++++++++++++++++++++- ng-wallet/src/types.rs | 15 ++- ngcli/src/main.rs | 2 - p2p-client-ws/src/remote_ws.rs | 9 +- p2p-client-ws/src/remote_ws_wasm.rs | 6 +- p2p-net/src/actors/connecting.rs | 42 +++++++++ p2p-net/src/actors/mod.rs | 3 + p2p-net/src/broker.rs | 61 +++++++++--- p2p-net/src/connection.rs | 44 +++++++-- p2p-net/src/types.rs | 7 +- pnpm-lock.yaml | 14 +++ 26 files changed, 691 insertions(+), 86 deletions(-) create mode 100644 p2p-net/src/actors/connecting.rs diff --git a/Cargo.lock b/Cargo.lock index dd93798..1568866 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3043,6 +3043,7 @@ version = "0.1.0" dependencies = [ "async-std", "ng-wallet", + "p2p-client-ws", "p2p-net", "p2p-repo", "serde", @@ -4490,9 +4491,9 @@ dependencies = [ [[package]] name = "serde-wasm-bindgen" -version = "0.5.0" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3b143e2833c57ab9ad3ea280d21fd34e285a42837aeb0ee301f4f41890fa00e" +checksum = "30c9933e5689bd420dc6c87b7a1835701810cbc10cd86a26e4da45b73e6b1d78" dependencies = [ "js-sys", "serde", diff --git a/ng-app/package.json b/ng-app/package.json index a5db204..25fb668 100644 --- a/ng-app/package.json +++ b/ng-app/package.json @@ -34,6 +34,7 @@ "@types/node": "^18.7.10", "autoprefixer": "^10.4.14", "cross-env": "^7.0.3", + "dayjs": "^1.11.10", "internal-ip": "^7.0.0", "node-gzip": "^1.1.2", "postcss": "^8.4.23", @@ -43,6 +44,7 @@ "svelte-check": "^3.0.0", "svelte-heros-v2": "^0.10.12", "svelte-preprocess": "^5.0.3", + "svelte-time": "^0.8.0", "tailwindcss": "^3.3.1", "tslib": "^2.4.1", "typescript": "^4.9.5", diff --git a/ng-app/src-tauri/Cargo.toml b/ng-app/src-tauri/Cargo.toml index 87b0728..412c048 100644 --- a/ng-app/src-tauri/Cargo.toml +++ b/ng-app/src-tauri/Cargo.toml @@ -27,6 +27,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" p2p-repo = { path = "../../p2p-repo" } p2p-net = { path = "../../p2p-net" } +p2p-client-ws = { path = "../../p2p-client-ws" } ng-wallet = { path = "../../ng-wallet" } async-std = { version = "1.12.0", features = ["attributes", "unstable"] } # tauri-plugin-window = { git = "https://git.nextgraph.org/NextGraph/plugins-workspace.git", branch="window-alpha.1-nextgraph" } diff --git a/ng-app/src-tauri/src/lib.rs b/ng-app/src-tauri/src/lib.rs index 458acc8..42c6ecc 100644 --- a/ng-app/src-tauri/src/lib.rs +++ b/ng-app/src-tauri/src/lib.rs @@ -9,12 +9,14 @@ use async_std::stream::StreamExt; use ng_wallet::types::*; use ng_wallet::*; +use p2p_client_ws::remote_ws::ConnectionWebSocket; use p2p_net::broker::*; -use p2p_net::types::{CreateAccountBSP, Invitation}; +use p2p_net::types::{ClientInfo, CreateAccountBSP, Invitation}; use p2p_net::utils::{decode_invitation_string, spawn_and_log_error, Receiver, ResultSend}; use p2p_repo::errors::NgError; use p2p_repo::log::*; use p2p_repo::types::*; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fs::{read, write}; use tauri::scope::ipc::RemoteDomainAccessScope; @@ -164,7 +166,7 @@ async fn wallet_import( previous_wallet: Wallet, opened_wallet: EncryptedWallet, app: tauri::AppHandle, -) -> Result<(), String> { +) -> Result<(String, ClientV0), String> { let path = app .path() .resolve("wallets", BaseDirectory::AppLocalData) @@ -185,10 +187,10 @@ async fn wallet_import( app, ) .map_err(|_| "Cannot create new session".to_string())?; - let (wallet, client) = opened_wallet_v0 + let (wallet, client_id, client) = opened_wallet_v0 .import(previous_wallet, session) .map_err(|e| e.to_string())?; - let lws = LocalWalletStorageV0::new(wallet, client); + let lws = LocalWalletStorageV0::new(wallet, &client); wallets.insert(wallet_name, lws); let lws_ser = LocalWalletStorage::v0_to_vec(wallets); @@ -197,7 +199,7 @@ async fn wallet_import( log_debug!("write {:?} {}", path, r.unwrap_err()); Err("Write error".to_string()) } else { - Ok(()) + Ok((client_id, client)) } } else { Err("Already present on this device".to_string()) @@ -351,6 +353,32 @@ async fn cancel_doc_sync_branch(stream_id: &str) -> Result<(), ()> { Ok(()) } +#[tauri::command(rename_all = "snake_case")] +async fn disconnections_subscribe(app: tauri::AppHandle) -> Result<(), ()> { + let main_window = app.get_window("main").unwrap(); + + let reader = BROKER + .write() + .await + .take_disconnections_receiver() + .ok_or(())?; + + async fn inner_task( + mut reader: Receiver, + main_window: tauri::Window, + ) -> ResultSend<()> { + while let Some(user_id) = reader.next().await { + main_window.emit("disconnections", user_id).unwrap(); + } + log_debug!("END OF disconnections listener"); + Ok(()) + } + + spawn_and_log_error(inner_task(reader, main_window)); + + Ok(()) +} + #[tauri::command(rename_all = "snake_case")] async fn doc_get_file_from_store_with_object_ref( nuri: &str, @@ -377,6 +405,58 @@ async fn doc_get_file_from_store_with_object_ref( Ok(obj_content) } +#[tauri::command(rename_all = "snake_case")] +async fn broker_disconnect() { + Broker::close_all_connections().await; +} + +#[derive(Serialize, Deserialize)] +struct ConnectionInfo { + pub server_id: String, + pub server_ip: String, + pub error: Option, + pub since: u64, +} + +#[tauri::command(rename_all = "snake_case")] +async fn broker_connect( + client: PubKey, + info: ClientInfo, + session: HashMap, + opened_wallet: EncryptedWallet, + location: Option, +) -> Result, String> { + let mut opened_connections: HashMap = HashMap::new(); + + let results = connect_wallet( + client, + info, + session, + opened_wallet, + location, + Box::new(ConnectionWebSocket {}), + ) + .await?; + + log_debug!("{:?}", results); + + for result in results { + opened_connections.insert( + result.0, + ConnectionInfo { + server_id: result.1, + server_ip: result.2, + error: result.3, + since: result.4 as u64, + }, + ); + } + + BROKER.read().await.print_status(); + + Ok(opened_connections) +} + #[derive(Default)] pub struct AppBuilder { setup: Option, @@ -435,6 +515,9 @@ impl AppBuilder { get_wallets_from_localstorage, open_window, decode_invitation, + disconnections_subscribe, + broker_connect, + broker_disconnect, ]) .run(tauri::generate_context!()) .expect("error while running tauri application"); diff --git a/ng-app/src/App.svelte b/ng-app/src/App.svelte index 6ba0cdc..dfc5dc7 100644 --- a/ng-app/src/App.svelte +++ b/ng-app/src/App.svelte @@ -17,6 +17,8 @@ active_wallet, opened_wallets, active_session, + close_active_session, + disconnections_subscribe, } from "./store"; import Home from "./routes/Home.svelte"; @@ -49,6 +51,11 @@ let unsub_main_close; onMount(async () => { + try { + await disconnections_subscribe(); + } catch (e) { + console.log("called disconnections_subscribe twice"); + } let tauri_platform = import.meta.env.TAURI_PLATFORM; if (tauri_platform) { //console.log(await ng.test()); @@ -58,6 +65,7 @@ unsubscribe = active_wallet.subscribe((value) => { if (value && !value.wallet) { active_wallet.set(undefined); + push("#/wallet/login"); } }); @@ -86,7 +94,7 @@ wallets.set(await ng.get_wallets_from_localstorage()); wallet_channel = new BroadcastChannel("ng_wallet"); wallet_channel.postMessage({ cmd: "is_opened" }, location.href); - wallet_channel.onmessage = (event) => { + wallet_channel.onmessage = async (event) => { console.log(event.data.cmd, event.data); if (!location.href.startsWith(event.origin)) return; switch (event.data.cmd) { @@ -121,7 +129,7 @@ return w; }); if ($active_wallet && $active_wallet.id == event.data.walletid) { - active_session.set(undefined); + await close_active_session(); active_wallet.set(undefined); push("#/wallet/login"); } @@ -141,7 +149,7 @@ location.href ); active_wallet.set(undefined); - active_session.set(undefined); + //active_session.set(undefined); opened_wallets.update((w) => { delete w[value.id]; return w; diff --git a/ng-app/src/api.ts b/ng-app/src/api.ts index 365ada3..69de723 100644 --- a/ng-app/src/api.ts +++ b/ng-app/src/api.ts @@ -25,6 +25,8 @@ const mapping = { "get_wallets_from_localstorage": [], "open_window": ["url","label","title"], "decode_invitation": ["invite"], + "broker_disconnect": [], + "broker_connect": ["client","info","session","opened_wallet","location"], "test": [ ] } @@ -37,7 +39,7 @@ const handler = { let sdk = await import("ng-sdk-js") if (path[0] === "client_info") { let client_info = await Reflect.apply(sdk[path], caller, args); - client_info.version=version; + client_info.V0.version=version; //console.log(client_info); return client_info; } else if (path[0] === "get_wallets_from_localstorage") { @@ -89,7 +91,25 @@ const handler = { }; //console.log(res); return res; - } else if (path[0] === "doc_sync_branch") { + } else if (path[0] === "disconnections_subscribe") { + let { getCurrent } = await import("@tauri-apps/plugin-window"); + let callback = args[0]; + let unlisten = await getCurrent().listen("disconnections", (event) => { + callback(event.payload).then(()=> {}) + }) + await tauri.invoke(path[0],{}); + return () => { + unlisten(); + } + } else if (path[0] === "broker_connect") { + let arg = {}; + args.map((el,ix) => arg[mapping[path[0]][ix]]=el) + let ret = await tauri.invoke(path[0],arg); + for (let e of Object.entries(ret)) { + e[1].since = new Date(e[1].since); + } + return ret; + }else if (path[0] === "doc_sync_branch") { let stream_id = (lastStreamId += 1).toString(); console.log("stream_id",stream_id); let { getCurrent } = await import("@tauri-apps/plugin-window"); diff --git a/ng-app/src/lib/Home.svelte b/ng-app/src/lib/Home.svelte index 9c2b286..7a6d570 100644 --- a/ng-app/src/lib/Home.svelte +++ b/ng-app/src/lib/Home.svelte @@ -10,16 +10,19 @@ --> @@ -161,9 +173,50 @@
  • - Personal + { + if (personal_site_status.error) { + $connections[personal_site].connecting = true; + await reconnect(); + } else { + $connections[personal_site].error = "Stopped"; + personal_site_status.since = new Date(); + await ng.broker_disconnect(); + } + }} + checked={personal_site_status && + (personal_site_status.connecting || + !personal_site_status.error)}>Personal
  • + {#if personal_site_status} +
  • + {#if personal_site_status.connecting} + Connecting... + {:else} + {#if !personal_site_status.error} + Connected + {:else} + {personal_site_status.error} + {/if} +
  • + {/if} diff --git a/ng-app/src/routes/WalletLogin.svelte b/ng-app/src/routes/WalletLogin.svelte index 6220f02..d5731d3 100644 --- a/ng-app/src/routes/WalletLogin.svelte +++ b/ng-app/src/routes/WalletLogin.svelte @@ -89,10 +89,10 @@ console.error(event.detail); } async function gotWallet(event) { - //console.log(event.detail); if (importing) { try { - await ng.wallet_import(wallet, event.detail.wallet); + let new_client = await ng.wallet_import(wallet, event.detail.wallet); + event.detail.wallet.V0.clients[new_client[0]] = new_client[1]; let walls = await ng.get_wallets_from_localstorage(); wallets.set(walls); } catch (e) { @@ -321,8 +321,10 @@ - {:else if step == "security"}{:else if step == "qrcode"}{:else if step == "cloud"}{:else if step == "loggedin"}you - are logged in{/if} + {:else if step == "security"}{:else if step == "qrcode"}{:else if step == "cloud"} + + {:else if step == "loggedin"} + You are logged in.
    please wait while the app is loading...{/if}
    diff --git a/ng-app/src/store.ts b/ng-app/src/store.ts index a9f3f5d..21ce9a4 100644 --- a/ng-app/src/store.ts +++ b/ng-app/src/store.ts @@ -7,7 +7,7 @@ // notice may not be copied, modified, or distributed except // according to those terms. -import { writable, readonly, derived } from "svelte/store"; +import { writable, readonly, derived, get } from "svelte/store"; import ng from "./api"; let all_branches = {}; @@ -19,7 +19,30 @@ export const active_wallet = writable(undefined); export const wallets = writable({}); -export const online = writable(false); +export const connections = writable({}); + +let next_reconnect = 0; + +export const online = derived(connections,($connections) => { + for (const cnx of Object.keys($connections)) { + if (!$connections[cnx].error) return true; + else if ($connections[cnx].error=="PeerAlreadyConnected") { + connections.update((c) => { + c[cnx].error = undefined; + return c; + }); + return true; } + else if ($connections[cnx].error=="ConnectionError" && !$connections[cnx].connecting && next_reconnect==0) { + console.log("will try reconnect in 1 min"); + next_reconnect = 60; + setTimeout(async ()=> { + next_reconnect = 0; + await reconnect(); + },60000); + } + } + return false; +}); export const has_wallets = derived(wallets,($wallets) => Object.keys($wallets).length); @@ -31,16 +54,62 @@ export const set_active_session = function(session) { export { writable, readonly, derived }; -export const close_active_wallet = function() { +export const close_active_wallet = async function() { - active_session.set(undefined); + await close_active_session(); active_wallet.update((w) => { delete w.wallet; return w; }); - } +export const close_active_session = async function() { + + active_session.set(undefined); + await ng.broker_disconnect(); + +} + +const can_connect = derived([active_wallet, active_session], ([$s1, $s2]) => [ + $s1, + $s2, + ] +); + +export const reconnect = async function() { + try { + let client = get(wallets)[get(active_wallet).id].client; + let info = await ng.client_info() + //console.log("Connecting with",client,info); + connections.set(await ng.broker_connect( + client, + info, + get(active_session), + get(active_wallet).wallet, + location.href + )); + }catch (e) { + console.error(e) + } +} +export const disconnections_subscribe = async function() { + let disconnections_unsub = await ng.disconnections_subscribe(async (user_id) => { + console.log("DISCONNECTION FOR USER", user_id); + connections.update((c) => { + c[user_id].error = "ConnectionError"; + c[user_id].since = new Date(); + return c; + }); + }); +} + +can_connect.subscribe(async (value) => { + if (value[0] && value[0].wallet && value[1]) { + + await reconnect(); + } + }); + const branch_commits = (nura, sub) => { // console.log("branch_commits") // const { subscribe, set, update } = writable([]); // create the underlying writable store diff --git a/ng-app/src/wallet_emojis.ts b/ng-app/src/wallet_emojis.ts index 6e29464..6c160d8 100644 --- a/ng-app/src/wallet_emojis.ts +++ b/ng-app/src/wallet_emojis.ts @@ -734,7 +734,7 @@ let face = [ { hexcode: "1fab4", shortcode: "potted_plant", - code: "plant", + code: "potted_plant", }, { diff --git a/ng-sdk-js/Cargo.toml b/ng-sdk-js/Cargo.toml index dd28866..713bfc2 100644 --- a/ng-sdk-js/Cargo.toml +++ b/ng-sdk-js/Cargo.toml @@ -38,7 +38,7 @@ base64-url = "2.0.0" [target.'cfg(target_arch = "wasm32")'.dependencies] js-sys = "0.3.61" -serde-wasm-bindgen = "0.5" +serde-wasm-bindgen = "0.6" wasm-bindgen-futures = "0.4.34" # web-sys = { version = "0.3.61", features = ["Window"] } gloo-timers = "0.2.6" diff --git a/ng-sdk-js/src/lib.rs b/ng-sdk-js/src/lib.rs index 1b482f5..b031bd4 100644 --- a/ng-sdk-js/src/lib.rs +++ b/ng-sdk-js/src/lib.rs @@ -23,7 +23,7 @@ use p2p_net::broker::*; use p2p_net::connection::{ClientConfig, StartConfig}; use p2p_net::types::{ BootstrapContent, BootstrapContentV0, ClientId, ClientInfo, ClientInfoV0, ClientType, - CreateAccountBSP, DirectPeerId, UserId, IP, + CreateAccountBSP, DirectPeerId, Identity, UserId, IP, }; use p2p_net::utils::{decode_invitation_string, spawn_and_log_error, Receiver, ResultSend, Sender}; #[cfg(target_arch = "wasm32")] @@ -125,7 +125,9 @@ pub fn wallet_open_wallet_with_pazzle( .map_err(|_| "Deserialization error of pin")?; let res = open_wallet_with_pazzle(wallet, pazzle, pin); match res { - Ok(r) => Ok(serde_wasm_bindgen::to_value(&r).unwrap()), + Ok(r) => Ok(r + .serialize(&serde_wasm_bindgen::Serializer::new().serialize_maps_as_objects(true)) + .unwrap()), Err(e) => Err(e.to_string().into()), } } @@ -294,7 +296,7 @@ pub async fn wallet_open_file(js_file: JsValue) -> Result { pub async fn wallet_import( js_previous_wallet: JsValue, //Wallet, js_opened_wallet: JsValue, //EncryptedWallet -) -> Result<(), String> { +) -> Result { let previous_wallet = serde_wasm_bindgen::from_value::(js_previous_wallet) .map_err(|_| "Deserialization error of Wallet".to_string())?; let mut opened_wallet = serde_wasm_bindgen::from_value::(js_opened_wallet) @@ -312,10 +314,10 @@ pub async fn wallet_import( opened_wallet_v0.personal_site, ) .map_err(|e| format!("Cannot create new session: {e}"))?; - let (wallet, client) = opened_wallet_v0 + let (wallet, client_id, client) = opened_wallet_v0 .import(previous_wallet, session) .map_err(|e| e.to_string())?; - let lws = LocalWalletStorageV0::new(wallet, client); + let lws = LocalWalletStorageV0::new(wallet, &client); wallets.insert(wallet_name, lws); @@ -325,7 +327,7 @@ pub async fn wallet_import( if r.is_some() { return Err(r.unwrap()); } - Ok(()) + Ok(serde_wasm_bindgen::to_value(&(client_id, client)).unwrap()) } else { Err("Wallet already present on this device".to_string()) } @@ -364,13 +366,13 @@ extern "C" { #[cfg(wasmpack_target = "nodejs")] #[wasm_bindgen] pub fn client_info() -> JsValue { - let res = ClientInfoV0 { + let res = ClientInfo::V0(ClientInfoV0 { client_type: ClientType::NodeService, details: client_details(), version: version(), timestamp_install: 0, timestamp_updated: 0, - }; + }); //res serde_wasm_bindgen::to_value(&res).unwrap() } @@ -429,7 +431,7 @@ pub fn client_info_() -> ClientInfoV0 { #[cfg(all(not(wasmpack_target = "nodejs"), target_arch = "wasm32"))] #[wasm_bindgen] pub fn client_info() -> JsValue { - let res = client_info_(); + let res = ClientInfo::V0(client_info_()); serde_wasm_bindgen::to_value(&res).unwrap() } @@ -523,6 +525,46 @@ pub async fn doc_sync_branch(anuri: String, callback: &js_sys::Function) -> JsVa return ret; } +#[cfg(target_arch = "wasm32")] +#[wasm_bindgen] +pub async fn disconnections_subscribe(callback: &js_sys::Function) -> Result { + let vec: Vec = vec![2; 10]; + let view = unsafe { Uint8Array::view(&vec) }; + let x = JsValue::from(Uint8Array::new(view.as_ref())); + + let mut reader; + { + reader = BROKER + .write() + .await + .take_disconnections_receiver() + .ok_or(false)?; + } + + async fn inner_task( + mut reader: Receiver, + callback: js_sys::Function, + ) -> ResultSend<()> { + while let Some(user_id) = reader.next().await { + let this = JsValue::null(); + let xx = serde_wasm_bindgen::to_value(&user_id).unwrap(); + let jsval: JsValue = callback.call1(&this, &xx).unwrap(); + let promise_res: Result = jsval.dyn_into(); + match promise_res { + Ok(promise) => { + JsFuture::from(promise).await; + } + Err(_) => {} + } + } + log_debug!("END OF disconnections reader"); + Ok(()) + } + + spawn_and_log_error(inner_task(reader, callback.clone())); + Ok(true.into()) +} + #[cfg(target_arch = "wasm32")] #[wasm_bindgen] pub async fn probe() { @@ -562,7 +604,7 @@ pub async fn start() { .write() .await .connect( - Box::new(ConnectionWebSocket {}), + Arc::new(Box::new(ConnectionWebSocket {})), keys.0, keys.1, server_key, @@ -571,7 +613,6 @@ pub async fn start() { user, user_priv, client, - client_priv, info: ClientInfo::V0(client_info_()), registration: None, }), @@ -610,18 +651,73 @@ pub async fn start() { spawn_and_log_error(inner_task()).await; } -#[cfg(not(target_arch = "wasm32"))] +#[cfg(target_arch = "wasm32")] #[wasm_bindgen] -pub fn start() { - //alert(&format!("I say: {}", name)); - task::spawn(async move {}); +pub async fn broker_disconnect() { + Broker::close_all_connections().await; } +#[cfg(target_arch = "wasm32")] #[wasm_bindgen] -pub fn change(name: &str) -> JsValue { - let mut random_buf = [0u8; 32]; - getrandom::getrandom(&mut random_buf).unwrap(); - JsValue::from_str(&format!("Hellooo, {}!", name)) +pub async fn broker_connect( + client_pub_key_js: JsValue, + client_info_js: JsValue, + session_js: JsValue, + opened_wallet_js: JsValue, + location: Option, +) -> Result { + let client = serde_wasm_bindgen::from_value::(client_pub_key_js) + .map_err(|_| "serde error on client")?; + let info = serde_wasm_bindgen::from_value::(client_info_js) + .map_err(|_| "serde error on info")?; + let session = + serde_wasm_bindgen::from_value::>(session_js) + .map_err(|_| "serde error on session")?; + let opened_wallet = serde_wasm_bindgen::from_value::(opened_wallet_js) + .map_err(|_| "serde error on opened_wallet")?; + + #[derive(Serialize, Deserialize)] + struct ConnectionInfo { + pub server_id: String, + pub server_ip: String, + pub error: Option, + #[serde(with = "serde_wasm_bindgen::preserve")] + pub since: js_sys::Date, + } + + let mut opened_connections: HashMap = HashMap::new(); + + let results = connect_wallet( + client, + info, + session, + opened_wallet, + location, + Box::new(ConnectionWebSocket {}), + ) + .await?; + + log_debug!("{:?}", results); + + for result in results { + let mut date = js_sys::Date::new_0(); + date.set_time(result.4); + opened_connections.insert( + result.0, + ConnectionInfo { + server_id: result.1, + server_ip: result.2, + error: result.3, + since: date, + }, + ); + } + + BROKER.read().await.print_status(); + + Ok(opened_connections + .serialize(&serde_wasm_bindgen::Serializer::new().serialize_maps_as_objects(true)) + .unwrap()) } #[cfg(target_arch = "wasm32")] diff --git a/ng-wallet/src/emojis.rs b/ng-wallet/src/emojis.rs index 18c4991..4b131b1 100644 --- a/ng-wallet/src/emojis.rs +++ b/ng-wallet/src/emojis.rs @@ -698,7 +698,7 @@ const plants: [EmojiDef<'static>; 15] = [ EmojiDef { hexcode: "1fab4", shortcode: "potted_plant", - code: "plant", + code: "potted_plant", }, EmojiDef { hexcode: "1f490", diff --git a/ng-wallet/src/lib.rs b/ng-wallet/src/lib.rs index d55aaa9..759f73f 100644 --- a/ng-wallet/src/lib.rs +++ b/ng-wallet/src/lib.rs @@ -19,7 +19,7 @@ pub mod bip39; pub mod emojis; -use std::{collections::HashMap, io::Cursor}; +use std::{collections::HashMap, io::Cursor, sync::Arc}; use crate::bip39::bip39_wordlist; use crate::types::*; @@ -34,7 +34,14 @@ use zeroize::{Zeroize, ZeroizeOnDrop}; use image::{imageops::FilterType, io::Reader as ImageReader, ImageOutputFormat}; use safe_transmute::transmute_to_bytes; -use p2p_net::types::{SiteType, SiteV0}; +use p2p_net::{ + broker::BROKER, + connection::{ClientConfig, StartConfig}, +}; +use p2p_net::{ + connection::IConnect, + types::{ClientInfo, Identity, SiteType, SiteV0}, +}; use p2p_repo::types::{PubKey, Sig, Timestamp}; use p2p_repo::utils::{generate_keypair, now_timestamp, sign, verify}; use p2p_repo::{log::*, types::PrivKey}; @@ -430,6 +437,124 @@ pub fn gen_shuffle_for_pazzle_opening(pazzle_length: u8) -> ShuffledPazzle { emoji_indices, } } +use web_time::SystemTime; +fn get_unix_time() -> f64 { + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() as f64 +} + +// Result is a list of (user_id, server_id, server_ip, error, since_date) +pub async fn connect_wallet( + client: PubKey, + info: ClientInfo, + session: HashMap, + opened_wallet: EncryptedWallet, + location: Option, + cnx: Box, +) -> Result, f64)>, String> { + let mut result: Vec<(String, String, String, Option, f64)> = Vec::new(); + let arc_cnx = Arc::new(cnx); + match opened_wallet { + EncryptedWallet::V0(wallet) => { + log_debug!("XXXX {} {:?}", client.to_string(), wallet); + let auto_open = &wallet + .clients + .get(&client.to_string()) + .ok_or("Client is invalid")? + .auto_open; + for site in auto_open { + match site { + Identity::OrgSite(user) | Identity::IndividualSite(user) => { + let user_id = user.to_string(); + let peer_key = session + .get(&user_id) + .ok_or("Session is missing")? + .peer_key + .clone(); + let peer_id = peer_key.to_pub(); + let site = wallet.sites.get(&user_id); + if site.is_none() { + result.push(( + user_id, + "".into(), + "".into(), + Some("Site is missing".into()), + get_unix_time(), + )); + continue; + } + let site = site.unwrap(); + let user_priv = site.site_key.clone(); + let core = site.cores[0]; //TODO: cycle the other cores if failure to connect (failover) + let server_key = core.0; + let broker = wallet.brokers.get(&core.0.to_string()); + if broker.is_none() { + result.push(( + user_id, + core.0.to_string(), + "".into(), + Some("Broker is missing".into()), + get_unix_time(), + )); + continue; + } + let broker = broker.unwrap(); + for broker_info in broker { + match broker_info { + BrokerInfoV0::ServerV0(server) => { + let url = server.get_ws_url(&location).await; + //Option<(String, Vec)> + if url.is_some() { + let url = url.unwrap(); + if url.1.len() == 0 { + // TODO deal with BoxPublic and on tauri all Box... + let res = BROKER + .write() + .await + .connect( + arc_cnx.clone(), + peer_key, + peer_id, + server_key, + StartConfig::Client(ClientConfig { + url: url.0.clone(), + user: *user, + user_priv, + client, + info: info.clone(), + registration: Some(core.1), + }), + ) + .await; + log_debug!("broker.connect : {:?}", res); + + result.push(( + user_id, + core.0.to_string(), + url.0.into(), + match res { + Ok(_) => None, + Err(e) => Some(e.to_string()), + }, + get_unix_time(), + )); + } + break; // TODO implement failover + } + } + _ => {} + } + } + } + _ => unimplemented!(), + } + } + } + } + Ok(result) +} pub fn gen_shuffle_for_pin() -> Vec { let mut rng = rand::thread_rng(); diff --git a/ng-wallet/src/types.rs b/ng-wallet/src/types.rs index d7a52b4..0c1233c 100644 --- a/ng-wallet/src/types.rs +++ b/ng-wallet/src/types.rs @@ -121,7 +121,7 @@ impl From<&CreateWalletResultV0> for LocalWalletStorageV0 { } impl LocalWalletStorageV0 { - pub fn new(wallet: Wallet, client: ClientV0) -> Self { + pub fn new(wallet: Wallet, client: &ClientV0) -> Self { LocalWalletStorageV0 { bootstrap: BootstrapContent::V0(BootstrapContentV0::new()), wallet, @@ -157,6 +157,9 @@ pub struct ClientV0 { } impl ClientV0 { + pub fn id(&self) -> String { + self.priv_key.to_pub().to_string() + } #[deprecated(note = "**Don't use dummy method**")] pub fn dummy() -> Self { ClientV0 { @@ -212,6 +215,9 @@ pub struct EncryptedWalletV0 { #[zeroize(skip)] pub personal_site: PubKey, + #[zeroize(skip)] + pub personal_site_id: String, + #[zeroize(skip)] pub sites: HashMap, @@ -242,7 +248,7 @@ impl EncryptedWalletV0 { &mut self, previous_wallet: Wallet, session: SessionWalletStorageV0, - ) -> Result<(Wallet, ClientV0), NgWalletError> { + ) -> Result<(Wallet, String, ClientV0), NgWalletError> { if self.log.is_none() { return Err(NgWalletError::InternalError); } @@ -260,6 +266,7 @@ impl EncryptedWalletV0 { nonce, self.wallet_privkey.clone(), )?, + client.id(), client, )) } @@ -711,6 +718,7 @@ pub struct WalletOpCreateV0 { impl From<&WalletOpCreateV0> for EncryptedWalletV0 { fn from(op: &WalletOpCreateV0) -> Self { + let personal_site = op.personal_site.site_key.to_pub(); let mut wallet = EncryptedWalletV0 { wallet_privkey: op.wallet_privkey.clone(), wallet_id: op.wallet_privkey.to_pub().to_string(), @@ -718,7 +726,8 @@ impl From<&WalletOpCreateV0> for EncryptedWalletV0 { mnemonic: op.mnemonic.clone(), pin: op.pin.clone(), save_to_ng_one: op.save_to_ng_one.clone(), - personal_site: op.personal_site.site_key.to_pub(), + personal_site, + personal_site_id: personal_site.to_string(), sites: HashMap::new(), brokers: HashMap::new(), clients: HashMap::new(), diff --git a/ngcli/src/main.rs b/ngcli/src/main.rs index 1d4c9bb..2ab6407 100644 --- a/ngcli/src/main.rs +++ b/ngcli/src/main.rs @@ -1,7 +1,5 @@ // Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers // All rights reserved. -// This code is partly derived from work written by TG x Thoth from P2Pcollab. -// Copyright 2022 TG x Thoth // Licensed under the Apache License, Version 2.0 // // or the MIT license , diff --git a/p2p-client-ws/src/remote_ws.rs b/p2p-client-ws/src/remote_ws.rs index 1311b99..2b1bc01 100644 --- a/p2p-client-ws/src/remote_ws.rs +++ b/p2p-client-ws/src/remote_ws.rs @@ -48,7 +48,7 @@ impl IConnect for ConnectionWebSocket { peer_pubk: PubKey, remote_peer: DirectPeerId, config: StartConfig, - ) -> Result { + ) -> Result { let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS); let res = connect_async(url).await; @@ -56,7 +56,7 @@ impl IConnect for ConnectionWebSocket { match res { Err(e) => { log_debug!("Cannot connect: {:?}", e); - Err(NetError::ConnectionError) + Err(ProtocolError::ConnectionError) } Ok((websocket, _)) => { cnx.start_read_loop(None, Some(peer_privk), Some(remote_peer)); @@ -78,7 +78,7 @@ impl IConnect for ConnectionWebSocket { log_debug!("END of WS loop"); }); - cnx.start(config).await; + cnx.start(config).await?; Ok(cnx) } @@ -330,7 +330,7 @@ mod test { .write() .await .connect( - Box::new(ConnectionWebSocket {}), + Arc::new(Box::new(ConnectionWebSocket {})), keys.0, keys.1, server_key, @@ -339,7 +339,6 @@ mod test { user, user_priv, client, - client_priv, info: ClientInfo::new(ClientType::Cli, "".into(), "".into()), registration: None, }), diff --git a/p2p-client-ws/src/remote_ws_wasm.rs b/p2p-client-ws/src/remote_ws_wasm.rs index ac5fc8a..86e0b8e 100644 --- a/p2p-client-ws/src/remote_ws_wasm.rs +++ b/p2p-client-ws/src/remote_ws_wasm.rs @@ -42,13 +42,13 @@ impl IConnect for ConnectionWebSocket { peer_pubk: PubKey, remote_peer: DirectPeerId, config: StartConfig, - ) -> Result { + ) -> Result { log_debug!("url {}", url); let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS); let (mut ws, wsio) = WsMeta::connect(url, None).await.map_err(|e| { //log_debug!("{:?}", e); - NetError::ConnectionError + ProtocolError::ConnectionError })?; cnx.start_read_loop(None, Some(peer_privk), Some(remote_peer)); @@ -62,7 +62,7 @@ impl IConnect for ConnectionWebSocket { shutdown, )); - cnx.start(config).await; + cnx.start(config).await?; Ok(cnx) } diff --git a/p2p-net/src/actors/connecting.rs b/p2p-net/src/actors/connecting.rs new file mode 100644 index 0000000..3885f85 --- /dev/null +++ b/p2p-net/src/actors/connecting.rs @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers + * All rights reserved. + * Licensed under the Apache License, Version 2.0 + * + * or the MIT license , + * at your option. All files in the project carrying such + * notice may not be copied, modified, or distributed except + * according to those terms. +*/ +use crate::broker::{ServerConfig, BROKER}; +use crate::connection::NoiseFSM; +use crate::types::*; +use crate::{actor::*, errors::ProtocolError, types::ProtocolMessage}; + +use async_std::sync::Mutex; +use p2p_repo::log::*; +use p2p_repo::types::PubKey; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +pub type Connecting = (); + +impl From for ProtocolMessage { + fn from(msg: Connecting) -> ProtocolMessage { + unimplemented!(); + } +} + +impl Actor<'_, Connecting, ()> {} + +#[async_trait::async_trait] +impl EActor for Actor<'_, Connecting, ()> { + async fn respond( + &mut self, + msg: ProtocolMessage, + fsm: Arc>, + ) -> Result<(), ProtocolError> { + fsm.lock().await.remove_actor(0).await; + Ok(()) + } +} diff --git a/p2p-net/src/actors/mod.rs b/p2p-net/src/actors/mod.rs index fe3b8af..2896297 100644 --- a/p2p-net/src/actors/mod.rs +++ b/p2p-net/src/actors/mod.rs @@ -21,3 +21,6 @@ pub use add_invitation::*; pub mod list_invitations; pub use list_invitations::*; + +pub mod connecting; +pub use connecting::*; diff --git a/p2p-net/src/broker.rs b/p2p-net/src/broker.rs index 9fd6a04..725e7d1 100644 --- a/p2p-net/src/broker.rs +++ b/p2p-net/src/broker.rs @@ -90,6 +90,8 @@ pub struct Broker<'a> { test: u32, tauri_streams: HashMap>, + disconnections_sender: Sender, + disconnections_receiver: Option>, } impl<'a> Broker<'a> { @@ -441,6 +443,7 @@ impl<'a> Broker<'a> { let (shutdown_sender, shutdown_receiver) = mpsc::unbounded::(); let mut random_buf = [0u8; 4]; getrandom::getrandom(&mut random_buf).unwrap(); + let (disconnections_sender, disconnections_receiver) = mpsc::unbounded::(); Broker { anonymous_connections: HashMap::new(), #[cfg(not(target_arch = "wasm32"))] @@ -455,6 +458,8 @@ impl<'a> Broker<'a> { closing: false, test: u32::from_be_bytes(random_buf), server_storage: None, + disconnections_sender, + disconnections_receiver: Some(disconnections_receiver), } } @@ -526,6 +531,29 @@ impl<'a> Broker<'a> { .await; } + pub async fn close_all_connections() { + let peer_ids; + let anonymous; + { + let broker = BROKER.write().await; + if broker.closing { + return; + } + peer_ids = Vec::from_iter(broker.peers.keys().cloned()); + anonymous = Vec::from_iter(broker.anonymous_connections.keys().cloned()); + } + for peer_id in peer_ids { + BROKER + .write() + .await + .close_peer_connection_x(peer_id.1, peer_id.0) + .await; + } + for anon in anonymous { + BROKER.write().await.close_anonymous(anon.1, anon.0).await; + } + } + pub async fn shutdown(&mut self) { if self.closing { return; @@ -728,14 +756,14 @@ impl<'a> Broker<'a> { pub async fn connect( &mut self, - cnx: Box, + cnx: Arc>, peer_privk: PrivKey, peer_pubk: PubKey, remote_peer_id: DirectPeerId, config: StartConfig, - ) -> Result<(), NetError> { + ) -> Result<(), ProtocolError> { if self.closing { - return Err(NetError::Closing); + return Err(ProtocolError::Closing); } log_debug!("CONNECTING"); @@ -750,7 +778,7 @@ impl<'a> Broker<'a> { match already.unwrap().connected { PeerConnection::NONE => {} _ => { - return Err(NetError::PeerAlreadyConnected); + return Err(ProtocolError::PeerAlreadyConnected); } }; } @@ -784,7 +812,7 @@ impl<'a> Broker<'a> { self.direct_connections.insert(ip, dc); PeerConnection::Core(ip) } - StartConfig::Client(config) => PeerConnection::Client(connection), + StartConfig::Client(_config) => PeerConnection::Client(connection), _ => unimplemented!(), }; @@ -798,15 +826,16 @@ impl<'a> Broker<'a> { async fn watch_close( mut join: Receiver>, - cnx: Box, + cnx: Arc>, peer_privk: PrivKey, peer_pubkey: PubKey, remote_peer_id: [u8; 32], config: StartConfig, + mut disconnections_sender: Sender, ) -> ResultSend<()> { async move { let res = join.next().await; - log_debug!("SOCKET IS CLOSED {:?} {:?}", res, remote_peer_id); + log_info!("SOCKET IS CLOSED {:?} {:?}", res, remote_peer_id); if res.is_some() && res.as_ref().unwrap().is_left() && res.unwrap().unwrap_left() != NetError::Closing @@ -820,8 +849,13 @@ impl<'a> Broker<'a> { // .await; // log_debug!("SOCKET RECONNECTION {:?} {:?}", result, &remote_peer_id); // TODO: deal with error and incremental backoff + + // if all attempts fail : + if let Some(user) = config.get_user() { + disconnections_sender.send(user.to_string()).await; + } } else { - log_debug!("REMOVED"); + log_info!("REMOVED"); BROKER .write() .await @@ -838,10 +872,15 @@ impl<'a> Broker<'a> { peer_pubk, *remote_peer_id_dh.slice(), config, + self.disconnections_sender.clone(), )); Ok(()) } + pub fn take_disconnections_receiver(&mut self) -> Option> { + self.disconnections_receiver.take() + } + pub async fn close_peer_connection_x(&mut self, peer_id: X25519PubKey, user: Option) { if let Some(peer) = self.peers.get_mut(&(user, peer_id)) { match &mut peer.connected { @@ -878,13 +917,13 @@ impl<'a> Broker<'a> { pub fn print_status(&self) { self.peers.iter().for_each(|(peerId, peerInfo)| { - log_debug!("PEER in BROKER {:?} {:?}", peerId, peerInfo); + log_info!("PEER in BROKER {:?} {:?}", peerId, peerInfo); }); self.direct_connections.iter().for_each(|(ip, directCnx)| { - log_debug!("direct_connection in BROKER {:?} {:?}", ip, directCnx); + log_info!("direct_connection in BROKER {:?} {:?}", ip, directCnx); }); self.anonymous_connections.iter().for_each(|(binds, cb)| { - log_debug!( + log_info!( "ANONYMOUS remote {:?} local {:?} {:?}", binds.1, binds.0, diff --git a/p2p-net/src/connection.rs b/p2p-net/src/connection.rs index 751d092..9f5ef70 100644 --- a/p2p-net/src/connection.rs +++ b/p2p-net/src/connection.rs @@ -68,7 +68,7 @@ pub trait IConnect: Send + Sync { peer_pubk: PubKey, remote_peer: DirectPeerId, config: StartConfig, - ) -> Result; + ) -> Result; async fn probe(&self, ip: IP, port: u16) -> Result, ProtocolError>; } @@ -162,7 +162,6 @@ pub struct ClientConfig { pub user: PubKey, pub user_priv: PrivKey, pub client: PubKey, - pub client_priv: PrivKey, pub info: ClientInfo, pub registration: Option>, } @@ -215,6 +214,12 @@ impl StartConfig { _ => false, } } + pub fn is_admin(&self) -> bool { + match self { + StartConfig::Admin(_) => true, + _ => false, + } + } } impl NoiseFSM { @@ -786,6 +791,16 @@ impl NoiseFSM { log_debug!("AUTHENTICATION SUCCESSFUL ! waiting for requests on the client side"); + // we notify the actor "Connecting" that the connection is ready + let mut lock = self.actors.lock().await; + let exists = lock.remove(&0); + match exists { + Some(mut actor_sender) => { + let _ = actor_sender.send(ConnectionCommand::ReEnter).await; + } + _ => {} + } + return Ok(StepReply::NONE); } } @@ -930,7 +945,7 @@ impl ConnectionBase { res = locked_fsm.step(None).await; } } else { - panic!("shouldn't be here. ConnectionCommand is read_loop can only have 5 different variants") + panic!("shouldn't be here. ConnectionCommand in read_loop can only have 5 different variants") } match res { @@ -1084,7 +1099,7 @@ impl ConnectionBase { _ => Err(ProtocolError::ActorError), } } else { - panic!("cannot call probe on a server-side connection"); + panic!("cannot call admin on a server-side connection"); } } @@ -1145,9 +1160,10 @@ impl ConnectionBase { } } - pub async fn start(&mut self, config: StartConfig) { + pub async fn start(&mut self, config: StartConfig) -> Result<(), ProtocolError> { // BOOTSTRAP the protocol from client-side if !self.dir.is_server() { + let is_admin = config.is_admin(); let res; { let mut fsm = self.fsm.as_ref().unwrap().lock().await; @@ -1155,7 +1171,23 @@ impl ConnectionBase { res = fsm.step(None).await; } if let Err(err) = res { - self.send(ConnectionCommand::ProtocolError(err)).await; + self.send(ConnectionCommand::ProtocolError(err.clone())) + .await; + Err(err) + } else if !is_admin { + let mut actor = Box::new(Actor::::new(0, true)); + self.actors.lock().await.insert(0, actor.get_receiver_tx()); + + let mut receiver = actor.detach_receiver(); + match receiver.next().await { + Some(ConnectionCommand::ReEnter) => Ok(()), + Some(ConnectionCommand::ProtocolError(e)) => Err(e), + Some(ConnectionCommand::Error(e)) => Err(e.into()), + Some(ConnectionCommand::Close) => Err(ProtocolError::Closing), + _ => Err(ProtocolError::ActorError), + } + } else { + Ok(()) } } else { panic!("cannot call start on a server-side connection"); diff --git a/p2p-net/src/types.rs b/p2p-net/src/types.rs index 56d6bb9..cb4cf19 100644 --- a/p2p-net/src/types.rs +++ b/p2p-net/src/types.rs @@ -427,9 +427,12 @@ impl BrokerServerV0 { /// filtered by the current location url of the webpage /// on native apps (do not pass a location), returns or the connection URL without optional BindAddress or an empty string with /// several BindAddresses to try to connect to with .to_ws_url() - pub async fn get_ws_url(&self, location: Option) -> Option<(String, Vec)> { + pub async fn get_ws_url( + &self, + location: &Option, + ) -> Option<(String, Vec)> { if location.is_some() { - let location = location.unwrap(); + let location = location.as_ref().unwrap(); if location.starts_with(APP_NG_ONE_URL) { match &self.server_type { BrokerServerTypeV0::BoxPublic(addrs) => { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0d3957b..3d8f157 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -18,6 +18,7 @@ importers: autoprefixer: ^10.4.14 classnames: ^2.3.2 cross-env: ^7.0.3 + dayjs: ^1.11.10 flowbite: ^1.6.5 flowbite-svelte: ^0.43.3 internal-ip: ^7.0.0 @@ -31,6 +32,7 @@ importers: svelte-heros-v2: ^0.10.12 svelte-preprocess: ^5.0.3 svelte-spa-router: ^3.3.0 + svelte-time: ^0.8.0 tailwindcss: ^3.3.1 tslib: ^2.4.1 typescript: ^4.9.5 @@ -57,6 +59,7 @@ importers: '@types/node': 18.16.16 autoprefixer: 10.4.14_postcss@8.4.24 cross-env: 7.0.3 + dayjs: 1.11.10 internal-ip: 7.0.0 node-gzip: 1.1.2 postcss: 8.4.24 @@ -66,6 +69,7 @@ importers: svelte-check: 3.4.3_sxhny56dlbcmwov4vk7qwrzshi svelte-heros-v2: 0.10.12_svelte@3.59.1 svelte-preprocess: 5.0.4_vmz4xia4c7tzh4ii3qac2x3tom + svelte-time: 0.8.0 tailwindcss: 3.3.2 tslib: 2.5.3 typescript: 4.9.5 @@ -850,6 +854,10 @@ packages: css-tree: 2.2.1 dev: true + /dayjs/1.11.10: + resolution: {integrity: sha512-vjAczensTgRcqDERK0SR2XMwsF/tSvnvlv6VcF2GIhg6Sx4yOIt/irsr1RDJsKiIyBzJDpCoXiWWq28MqH2cnQ==} + dev: true + /debug/4.3.4: resolution: {integrity: sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==} engines: {node: '>=6.0'} @@ -1861,6 +1869,12 @@ packages: regexparam: 2.0.1 dev: false + /svelte-time/0.8.0: + resolution: {integrity: sha512-V0LBpJhYV2Q+jqiJ94ITAo51P6RIhrHQpDt3LCSk8PXfL2UMvSMlDzkqHq8mdKqmBCRZURnXhpynN03GQa/G/A==} + dependencies: + dayjs: 1.11.10 + dev: true + /svelte/3.59.1: resolution: {integrity: sha512-pKj8fEBmqf6mq3/NfrB9SLtcJcUvjYSWyePlfCqN9gujLB25RitWK8PvFzlwim6hD/We35KbPlRteuA6rnPGcQ==} engines: {node: '>= 8'}