connection to broker

pull/19/head
Niko PLP 1 year ago
parent 65d929c423
commit dc5810489f
  1. 5
      Cargo.lock
  2. 2
      ng-app/package.json
  3. 1
      ng-app/src-tauri/Cargo.toml
  4. 93
      ng-app/src-tauri/src/lib.rs
  5. 14
      ng-app/src/App.svelte
  6. 24
      ng-app/src/api.ts
  7. 11
      ng-app/src/lib/Home.svelte
  8. 2
      ng-app/src/lib/Login.svelte
  9. 64
      ng-app/src/routes/User.svelte
  10. 10
      ng-app/src/routes/WalletLogin.svelte
  11. 79
      ng-app/src/store.ts
  12. 2
      ng-app/src/wallet_emojis.ts
  13. 2
      ng-sdk-js/Cargo.toml
  14. 134
      ng-sdk-js/src/lib.rs
  15. 2
      ng-wallet/src/emojis.rs
  16. 129
      ng-wallet/src/lib.rs
  17. 15
      ng-wallet/src/types.rs
  18. 2
      ngcli/src/main.rs
  19. 9
      p2p-client-ws/src/remote_ws.rs
  20. 6
      p2p-client-ws/src/remote_ws_wasm.rs
  21. 42
      p2p-net/src/actors/connecting.rs
  22. 3
      p2p-net/src/actors/mod.rs
  23. 61
      p2p-net/src/broker.rs
  24. 44
      p2p-net/src/connection.rs
  25. 7
      p2p-net/src/types.rs
  26. 14
      pnpm-lock.yaml

5
Cargo.lock generated

@ -3043,6 +3043,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"async-std", "async-std",
"ng-wallet", "ng-wallet",
"p2p-client-ws",
"p2p-net", "p2p-net",
"p2p-repo", "p2p-repo",
"serde", "serde",
@ -4490,9 +4491,9 @@ dependencies = [
[[package]] [[package]]
name = "serde-wasm-bindgen" name = "serde-wasm-bindgen"
version = "0.5.0" version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3b143e2833c57ab9ad3ea280d21fd34e285a42837aeb0ee301f4f41890fa00e" checksum = "30c9933e5689bd420dc6c87b7a1835701810cbc10cd86a26e4da45b73e6b1d78"
dependencies = [ dependencies = [
"js-sys", "js-sys",
"serde", "serde",

@ -34,6 +34,7 @@
"@types/node": "^18.7.10", "@types/node": "^18.7.10",
"autoprefixer": "^10.4.14", "autoprefixer": "^10.4.14",
"cross-env": "^7.0.3", "cross-env": "^7.0.3",
"dayjs": "^1.11.10",
"internal-ip": "^7.0.0", "internal-ip": "^7.0.0",
"node-gzip": "^1.1.2", "node-gzip": "^1.1.2",
"postcss": "^8.4.23", "postcss": "^8.4.23",
@ -43,6 +44,7 @@
"svelte-check": "^3.0.0", "svelte-check": "^3.0.0",
"svelte-heros-v2": "^0.10.12", "svelte-heros-v2": "^0.10.12",
"svelte-preprocess": "^5.0.3", "svelte-preprocess": "^5.0.3",
"svelte-time": "^0.8.0",
"tailwindcss": "^3.3.1", "tailwindcss": "^3.3.1",
"tslib": "^2.4.1", "tslib": "^2.4.1",
"typescript": "^4.9.5", "typescript": "^4.9.5",

@ -27,6 +27,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
p2p-repo = { path = "../../p2p-repo" } p2p-repo = { path = "../../p2p-repo" }
p2p-net = { path = "../../p2p-net" } p2p-net = { path = "../../p2p-net" }
p2p-client-ws = { path = "../../p2p-client-ws" }
ng-wallet = { path = "../../ng-wallet" } ng-wallet = { path = "../../ng-wallet" }
async-std = { version = "1.12.0", features = ["attributes", "unstable"] } async-std = { version = "1.12.0", features = ["attributes", "unstable"] }
# tauri-plugin-window = { git = "https://git.nextgraph.org/NextGraph/plugins-workspace.git", branch="window-alpha.1-nextgraph" } # tauri-plugin-window = { git = "https://git.nextgraph.org/NextGraph/plugins-workspace.git", branch="window-alpha.1-nextgraph" }

@ -9,12 +9,14 @@
use async_std::stream::StreamExt; use async_std::stream::StreamExt;
use ng_wallet::types::*; use ng_wallet::types::*;
use ng_wallet::*; use ng_wallet::*;
use p2p_client_ws::remote_ws::ConnectionWebSocket;
use p2p_net::broker::*; use p2p_net::broker::*;
use p2p_net::types::{CreateAccountBSP, Invitation}; use p2p_net::types::{ClientInfo, CreateAccountBSP, Invitation};
use p2p_net::utils::{decode_invitation_string, spawn_and_log_error, Receiver, ResultSend}; use p2p_net::utils::{decode_invitation_string, spawn_and_log_error, Receiver, ResultSend};
use p2p_repo::errors::NgError; use p2p_repo::errors::NgError;
use p2p_repo::log::*; use p2p_repo::log::*;
use p2p_repo::types::*; use p2p_repo::types::*;
use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::fs::{read, write}; use std::fs::{read, write};
use tauri::scope::ipc::RemoteDomainAccessScope; use tauri::scope::ipc::RemoteDomainAccessScope;
@ -164,7 +166,7 @@ async fn wallet_import(
previous_wallet: Wallet, previous_wallet: Wallet,
opened_wallet: EncryptedWallet, opened_wallet: EncryptedWallet,
app: tauri::AppHandle, app: tauri::AppHandle,
) -> Result<(), String> { ) -> Result<(String, ClientV0), String> {
let path = app let path = app
.path() .path()
.resolve("wallets", BaseDirectory::AppLocalData) .resolve("wallets", BaseDirectory::AppLocalData)
@ -185,10 +187,10 @@ async fn wallet_import(
app, app,
) )
.map_err(|_| "Cannot create new session".to_string())?; .map_err(|_| "Cannot create new session".to_string())?;
let (wallet, client) = opened_wallet_v0 let (wallet, client_id, client) = opened_wallet_v0
.import(previous_wallet, session) .import(previous_wallet, session)
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
let lws = LocalWalletStorageV0::new(wallet, client); let lws = LocalWalletStorageV0::new(wallet, &client);
wallets.insert(wallet_name, lws); wallets.insert(wallet_name, lws);
let lws_ser = LocalWalletStorage::v0_to_vec(wallets); let lws_ser = LocalWalletStorage::v0_to_vec(wallets);
@ -197,7 +199,7 @@ async fn wallet_import(
log_debug!("write {:?} {}", path, r.unwrap_err()); log_debug!("write {:?} {}", path, r.unwrap_err());
Err("Write error".to_string()) Err("Write error".to_string())
} else { } else {
Ok(()) Ok((client_id, client))
} }
} else { } else {
Err("Already present on this device".to_string()) Err("Already present on this device".to_string())
@ -351,6 +353,32 @@ async fn cancel_doc_sync_branch(stream_id: &str) -> Result<(), ()> {
Ok(()) Ok(())
} }
#[tauri::command(rename_all = "snake_case")]
async fn disconnections_subscribe(app: tauri::AppHandle) -> Result<(), ()> {
let main_window = app.get_window("main").unwrap();
let reader = BROKER
.write()
.await
.take_disconnections_receiver()
.ok_or(())?;
async fn inner_task(
mut reader: Receiver<String>,
main_window: tauri::Window,
) -> ResultSend<()> {
while let Some(user_id) = reader.next().await {
main_window.emit("disconnections", user_id).unwrap();
}
log_debug!("END OF disconnections listener");
Ok(())
}
spawn_and_log_error(inner_task(reader, main_window));
Ok(())
}
#[tauri::command(rename_all = "snake_case")] #[tauri::command(rename_all = "snake_case")]
async fn doc_get_file_from_store_with_object_ref( async fn doc_get_file_from_store_with_object_ref(
nuri: &str, nuri: &str,
@ -377,6 +405,58 @@ async fn doc_get_file_from_store_with_object_ref(
Ok(obj_content) Ok(obj_content)
} }
#[tauri::command(rename_all = "snake_case")]
async fn broker_disconnect() {
Broker::close_all_connections().await;
}
#[derive(Serialize, Deserialize)]
struct ConnectionInfo {
pub server_id: String,
pub server_ip: String,
pub error: Option<String>,
pub since: u64,
}
#[tauri::command(rename_all = "snake_case")]
async fn broker_connect(
client: PubKey,
info: ClientInfo,
session: HashMap<String, SessionPeerStorageV0>,
opened_wallet: EncryptedWallet,
location: Option<String>,
) -> Result<HashMap<String, ConnectionInfo>, String> {
let mut opened_connections: HashMap<String, ConnectionInfo> = HashMap::new();
let results = connect_wallet(
client,
info,
session,
opened_wallet,
location,
Box::new(ConnectionWebSocket {}),
)
.await?;
log_debug!("{:?}", results);
for result in results {
opened_connections.insert(
result.0,
ConnectionInfo {
server_id: result.1,
server_ip: result.2,
error: result.3,
since: result.4 as u64,
},
);
}
BROKER.read().await.print_status();
Ok(opened_connections)
}
#[derive(Default)] #[derive(Default)]
pub struct AppBuilder { pub struct AppBuilder {
setup: Option<SetupHook>, setup: Option<SetupHook>,
@ -435,6 +515,9 @@ impl AppBuilder {
get_wallets_from_localstorage, get_wallets_from_localstorage,
open_window, open_window,
decode_invitation, decode_invitation,
disconnections_subscribe,
broker_connect,
broker_disconnect,
]) ])
.run(tauri::generate_context!()) .run(tauri::generate_context!())
.expect("error while running tauri application"); .expect("error while running tauri application");

@ -17,6 +17,8 @@
active_wallet, active_wallet,
opened_wallets, opened_wallets,
active_session, active_session,
close_active_session,
disconnections_subscribe,
} from "./store"; } from "./store";
import Home from "./routes/Home.svelte"; import Home from "./routes/Home.svelte";
@ -49,6 +51,11 @@
let unsub_main_close; let unsub_main_close;
onMount(async () => { onMount(async () => {
try {
await disconnections_subscribe();
} catch (e) {
console.log("called disconnections_subscribe twice");
}
let tauri_platform = import.meta.env.TAURI_PLATFORM; let tauri_platform = import.meta.env.TAURI_PLATFORM;
if (tauri_platform) { if (tauri_platform) {
//console.log(await ng.test()); //console.log(await ng.test());
@ -58,6 +65,7 @@
unsubscribe = active_wallet.subscribe((value) => { unsubscribe = active_wallet.subscribe((value) => {
if (value && !value.wallet) { if (value && !value.wallet) {
active_wallet.set(undefined); active_wallet.set(undefined);
push("#/wallet/login");
} }
}); });
@ -86,7 +94,7 @@
wallets.set(await ng.get_wallets_from_localstorage()); wallets.set(await ng.get_wallets_from_localstorage());
wallet_channel = new BroadcastChannel("ng_wallet"); wallet_channel = new BroadcastChannel("ng_wallet");
wallet_channel.postMessage({ cmd: "is_opened" }, location.href); wallet_channel.postMessage({ cmd: "is_opened" }, location.href);
wallet_channel.onmessage = (event) => { wallet_channel.onmessage = async (event) => {
console.log(event.data.cmd, event.data); console.log(event.data.cmd, event.data);
if (!location.href.startsWith(event.origin)) return; if (!location.href.startsWith(event.origin)) return;
switch (event.data.cmd) { switch (event.data.cmd) {
@ -121,7 +129,7 @@
return w; return w;
}); });
if ($active_wallet && $active_wallet.id == event.data.walletid) { if ($active_wallet && $active_wallet.id == event.data.walletid) {
active_session.set(undefined); await close_active_session();
active_wallet.set(undefined); active_wallet.set(undefined);
push("#/wallet/login"); push("#/wallet/login");
} }
@ -141,7 +149,7 @@
location.href location.href
); );
active_wallet.set(undefined); active_wallet.set(undefined);
active_session.set(undefined); //active_session.set(undefined);
opened_wallets.update((w) => { opened_wallets.update((w) => {
delete w[value.id]; delete w[value.id];
return w; return w;

@ -25,6 +25,8 @@ const mapping = {
"get_wallets_from_localstorage": [], "get_wallets_from_localstorage": [],
"open_window": ["url","label","title"], "open_window": ["url","label","title"],
"decode_invitation": ["invite"], "decode_invitation": ["invite"],
"broker_disconnect": [],
"broker_connect": ["client","info","session","opened_wallet","location"],
"test": [ ] "test": [ ]
} }
@ -37,7 +39,7 @@ const handler = {
let sdk = await import("ng-sdk-js") let sdk = await import("ng-sdk-js")
if (path[0] === "client_info") { if (path[0] === "client_info") {
let client_info = await Reflect.apply(sdk[path], caller, args); let client_info = await Reflect.apply(sdk[path], caller, args);
client_info.version=version; client_info.V0.version=version;
//console.log(client_info); //console.log(client_info);
return client_info; return client_info;
} else if (path[0] === "get_wallets_from_localstorage") { } else if (path[0] === "get_wallets_from_localstorage") {
@ -89,7 +91,25 @@ const handler = {
}; };
//console.log(res); //console.log(res);
return res; return res;
} else if (path[0] === "doc_sync_branch") { } else if (path[0] === "disconnections_subscribe") {
let { getCurrent } = await import("@tauri-apps/plugin-window");
let callback = args[0];
let unlisten = await getCurrent().listen("disconnections", (event) => {
callback(event.payload).then(()=> {})
})
await tauri.invoke(path[0],{});
return () => {
unlisten();
}
} else if (path[0] === "broker_connect") {
let arg = {};
args.map((el,ix) => arg[mapping[path[0]][ix]]=el)
let ret = await tauri.invoke(path[0],arg);
for (let e of Object.entries(ret)) {
e[1].since = new Date(e[1].since);
}
return ret;
}else if (path[0] === "doc_sync_branch") {
let stream_id = (lastStreamId += 1).toString(); let stream_id = (lastStreamId += 1).toString();
console.log("stream_id",stream_id); console.log("stream_id",stream_id);
let { getCurrent } = await import("@tauri-apps/plugin-window"); let { getCurrent } = await import("@tauri-apps/plugin-window");

@ -10,16 +10,19 @@
--> -->
<script lang="ts"> <script lang="ts">
import { close_active_wallet, online } from "../store"; import {
online,
wallets,
active_wallet,
active_session,
connections,
} from "../store";
import FullLayout from "./FullLayout.svelte"; import FullLayout from "./FullLayout.svelte";
import { PaperAirplane, Bell, ArrowRightOnRectangle } from "svelte-heros-v2"; import { PaperAirplane, Bell, ArrowRightOnRectangle } from "svelte-heros-v2";
// @ts-ignore // @ts-ignore
import Logo from "../assets/nextgraph.svg?component"; import Logo from "../assets/nextgraph.svg?component";
// @ts-ignore // @ts-ignore
import LogoGray from "../assets/nextgraph-gray.svg?component"; import LogoGray from "../assets/nextgraph-gray.svg?component";
function logout() {
close_active_wallet();
}
let width: number; let width: number;
let breakPoint: number = 662; let breakPoint: number = 662;

@ -373,7 +373,7 @@
</div> </div>
{:else} {:else}
<div class=" max-w-6xl lg:px-8 mx-auto px-4 text-green-800"> <div class=" max-w-6xl lg:px-8 mx-auto px-4 text-green-800">
Your wallet is opened! Your wallet is opened! <br />Please wait while the app is loading...
<svg <svg
class="my-10 h-16 w-16 mx-auto" class="my-10 h-16 w-16 mx-auto"
fill="none" fill="none"

@ -13,6 +13,7 @@
import { link, push } from "svelte-spa-router"; import { link, push } from "svelte-spa-router";
import CenteredLayout from "../lib/CenteredLayout.svelte"; import CenteredLayout from "../lib/CenteredLayout.svelte";
import { version } from "../../package.json"; import { version } from "../../package.json";
import Time from "svelte-time";
// @ts-ignore // @ts-ignore
import Logo from "../assets/nextgraph.svg?component"; import Logo from "../assets/nextgraph.svg?component";
import { import {
@ -35,7 +36,14 @@
SidebarWrapper, SidebarWrapper,
} from "flowbite-svelte"; } from "flowbite-svelte";
import { online, close_active_wallet, active_session } from "../store"; import {
online,
close_active_wallet,
active_session,
active_wallet,
connections,
reconnect,
} from "../store";
import { import {
NG_EU_BSP, NG_EU_BSP,
@ -63,9 +71,13 @@
} }
}); });
function logout() { async function logout() {
close_active_wallet(); await close_active_wallet();
} }
$: personal_site = $active_wallet?.wallet?.V0.personal_site_id;
$: personal_site_status = $connections[personal_site];
</script> </script>
<CenteredLayout> <CenteredLayout>
@ -161,9 +173,50 @@
</SidebarItem> </SidebarItem>
<li <li
class="flex items-center p-2 text-base font-normal text-gray-900" class="flex items-center p-2 text-base font-normal text-gray-900"
title={(personal_site_status &&
personal_site_status.server_ip +
" " +
personal_site_status.server_id) ||
"offline"}
> >
<Toggle checked={true}>Personal</Toggle> <Toggle
on:change={async () => {
if (personal_site_status.error) {
$connections[personal_site].connecting = true;
await reconnect();
} else {
$connections[personal_site].error = "Stopped";
personal_site_status.since = new Date();
await ng.broker_disconnect();
}
}}
checked={personal_site_status &&
(personal_site_status.connecting ||
!personal_site_status.error)}>Personal</Toggle
>
</li> </li>
{#if personal_site_status}
<li
class="site-cnx-details flex items-center px-2 text-sm text-left font-normal text-gray-600"
>
{#if personal_site_status.connecting}
Connecting...
{:else}
{#if !personal_site_status.error}
Connected
{:else}
{personal_site_status.error}
{/if}
<Time
style="display:contents;"
live={5 * 1_000}
relative
format="dddd @ h:mm A · MMMM D, YYYY"
timestamp={personal_site_status.since}
/>
{/if}
</li>
{/if}
</SidebarGroup> </SidebarGroup>
<SidebarGroup border> <SidebarGroup border>
<SidebarItem <SidebarItem
@ -253,4 +306,7 @@
li.clickable { li.clickable {
cursor: pointer; cursor: pointer;
} }
.site-cnx-details {
@apply mt-0 !important;
}
</style> </style>

@ -89,10 +89,10 @@
console.error(event.detail); console.error(event.detail);
} }
async function gotWallet(event) { async function gotWallet(event) {
//console.log(event.detail);
if (importing) { if (importing) {
try { try {
await ng.wallet_import(wallet, event.detail.wallet); let new_client = await ng.wallet_import(wallet, event.detail.wallet);
event.detail.wallet.V0.clients[new_client[0]] = new_client[1];
let walls = await ng.get_wallets_from_localstorage(); let walls = await ng.get_wallets_from_localstorage();
wallets.set(walls); wallets.set(walls);
} catch (e) { } catch (e) {
@ -321,8 +321,10 @@
</a> </a>
</div> </div>
</div> </div>
{:else if step == "security"}{:else if step == "qrcode"}{:else if step == "cloud"}{:else if step == "loggedin"}you {:else if step == "security"}{:else if step == "qrcode"}{:else if step == "cloud"}
are logged in{/if}
{:else if step == "loggedin"}
You are logged in.<br /> please wait while the app is loading...{/if}
</CenteredLayout> </CenteredLayout>
</div> </div>

@ -7,7 +7,7 @@
// notice may not be copied, modified, or distributed except // notice may not be copied, modified, or distributed except
// according to those terms. // according to those terms.
import { writable, readonly, derived } from "svelte/store"; import { writable, readonly, derived, get } from "svelte/store";
import ng from "./api"; import ng from "./api";
let all_branches = {}; let all_branches = {};
@ -19,7 +19,30 @@ export const active_wallet = writable(undefined);
export const wallets = writable({}); export const wallets = writable({});
export const online = writable(false); export const connections = writable({});
let next_reconnect = 0;
export const online = derived(connections,($connections) => {
for (const cnx of Object.keys($connections)) {
if (!$connections[cnx].error) return true;
else if ($connections[cnx].error=="PeerAlreadyConnected") {
connections.update((c) => {
c[cnx].error = undefined;
return c;
});
return true; }
else if ($connections[cnx].error=="ConnectionError" && !$connections[cnx].connecting && next_reconnect==0) {
console.log("will try reconnect in 1 min");
next_reconnect = 60;
setTimeout(async ()=> {
next_reconnect = 0;
await reconnect();
},60000);
}
}
return false;
});
export const has_wallets = derived(wallets,($wallets) => Object.keys($wallets).length); export const has_wallets = derived(wallets,($wallets) => Object.keys($wallets).length);
@ -31,16 +54,62 @@ export const set_active_session = function(session) {
export { writable, readonly, derived }; export { writable, readonly, derived };
export const close_active_wallet = function() { export const close_active_wallet = async function() {
active_session.set(undefined); await close_active_session();
active_wallet.update((w) => { active_wallet.update((w) => {
delete w.wallet; delete w.wallet;
return w; return w;
}); });
} }
export const close_active_session = async function() {
active_session.set(undefined);
await ng.broker_disconnect();
}
const can_connect = derived([active_wallet, active_session], ([$s1, $s2]) => [
$s1,
$s2,
]
);
export const reconnect = async function() {
try {
let client = get(wallets)[get(active_wallet).id].client;
let info = await ng.client_info()
//console.log("Connecting with",client,info);
connections.set(await ng.broker_connect(
client,
info,
get(active_session),
get(active_wallet).wallet,
location.href
));
}catch (e) {
console.error(e)
}
}
export const disconnections_subscribe = async function() {
let disconnections_unsub = await ng.disconnections_subscribe(async (user_id) => {
console.log("DISCONNECTION FOR USER", user_id);
connections.update((c) => {
c[user_id].error = "ConnectionError";
c[user_id].since = new Date();
return c;
});
});
}
can_connect.subscribe(async (value) => {
if (value[0] && value[0].wallet && value[1]) {
await reconnect();
}
});
const branch_commits = (nura, sub) => { const branch_commits = (nura, sub) => {
// console.log("branch_commits") // console.log("branch_commits")
// const { subscribe, set, update } = writable([]); // create the underlying writable store // const { subscribe, set, update } = writable([]); // create the underlying writable store

@ -734,7 +734,7 @@ let face = [
{ {
hexcode: "1fab4", hexcode: "1fab4",
shortcode: "potted_plant", shortcode: "potted_plant",
code: "plant", code: "potted_plant",
}, },
{ {

@ -38,7 +38,7 @@ base64-url = "2.0.0"
[target.'cfg(target_arch = "wasm32")'.dependencies] [target.'cfg(target_arch = "wasm32")'.dependencies]
js-sys = "0.3.61" js-sys = "0.3.61"
serde-wasm-bindgen = "0.5" serde-wasm-bindgen = "0.6"
wasm-bindgen-futures = "0.4.34" wasm-bindgen-futures = "0.4.34"
# web-sys = { version = "0.3.61", features = ["Window"] } # web-sys = { version = "0.3.61", features = ["Window"] }
gloo-timers = "0.2.6" gloo-timers = "0.2.6"

@ -23,7 +23,7 @@ use p2p_net::broker::*;
use p2p_net::connection::{ClientConfig, StartConfig}; use p2p_net::connection::{ClientConfig, StartConfig};
use p2p_net::types::{ use p2p_net::types::{
BootstrapContent, BootstrapContentV0, ClientId, ClientInfo, ClientInfoV0, ClientType, BootstrapContent, BootstrapContentV0, ClientId, ClientInfo, ClientInfoV0, ClientType,
CreateAccountBSP, DirectPeerId, UserId, IP, CreateAccountBSP, DirectPeerId, Identity, UserId, IP,
}; };
use p2p_net::utils::{decode_invitation_string, spawn_and_log_error, Receiver, ResultSend, Sender}; use p2p_net::utils::{decode_invitation_string, spawn_and_log_error, Receiver, ResultSend, Sender};
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
@ -125,7 +125,9 @@ pub fn wallet_open_wallet_with_pazzle(
.map_err(|_| "Deserialization error of pin")?; .map_err(|_| "Deserialization error of pin")?;
let res = open_wallet_with_pazzle(wallet, pazzle, pin); let res = open_wallet_with_pazzle(wallet, pazzle, pin);
match res { match res {
Ok(r) => Ok(serde_wasm_bindgen::to_value(&r).unwrap()), Ok(r) => Ok(r
.serialize(&serde_wasm_bindgen::Serializer::new().serialize_maps_as_objects(true))
.unwrap()),
Err(e) => Err(e.to_string().into()), Err(e) => Err(e.to_string().into()),
} }
} }
@ -294,7 +296,7 @@ pub async fn wallet_open_file(js_file: JsValue) -> Result<JsValue, String> {
pub async fn wallet_import( pub async fn wallet_import(
js_previous_wallet: JsValue, //Wallet, js_previous_wallet: JsValue, //Wallet,
js_opened_wallet: JsValue, //EncryptedWallet js_opened_wallet: JsValue, //EncryptedWallet
) -> Result<(), String> { ) -> Result<JsValue, String> {
let previous_wallet = serde_wasm_bindgen::from_value::<Wallet>(js_previous_wallet) let previous_wallet = serde_wasm_bindgen::from_value::<Wallet>(js_previous_wallet)
.map_err(|_| "Deserialization error of Wallet".to_string())?; .map_err(|_| "Deserialization error of Wallet".to_string())?;
let mut opened_wallet = serde_wasm_bindgen::from_value::<EncryptedWallet>(js_opened_wallet) let mut opened_wallet = serde_wasm_bindgen::from_value::<EncryptedWallet>(js_opened_wallet)
@ -312,10 +314,10 @@ pub async fn wallet_import(
opened_wallet_v0.personal_site, opened_wallet_v0.personal_site,
) )
.map_err(|e| format!("Cannot create new session: {e}"))?; .map_err(|e| format!("Cannot create new session: {e}"))?;
let (wallet, client) = opened_wallet_v0 let (wallet, client_id, client) = opened_wallet_v0
.import(previous_wallet, session) .import(previous_wallet, session)
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
let lws = LocalWalletStorageV0::new(wallet, client); let lws = LocalWalletStorageV0::new(wallet, &client);
wallets.insert(wallet_name, lws); wallets.insert(wallet_name, lws);
@ -325,7 +327,7 @@ pub async fn wallet_import(
if r.is_some() { if r.is_some() {
return Err(r.unwrap()); return Err(r.unwrap());
} }
Ok(()) Ok(serde_wasm_bindgen::to_value(&(client_id, client)).unwrap())
} else { } else {
Err("Wallet already present on this device".to_string()) Err("Wallet already present on this device".to_string())
} }
@ -364,13 +366,13 @@ extern "C" {
#[cfg(wasmpack_target = "nodejs")] #[cfg(wasmpack_target = "nodejs")]
#[wasm_bindgen] #[wasm_bindgen]
pub fn client_info() -> JsValue { pub fn client_info() -> JsValue {
let res = ClientInfoV0 { let res = ClientInfo::V0(ClientInfoV0 {
client_type: ClientType::NodeService, client_type: ClientType::NodeService,
details: client_details(), details: client_details(),
version: version(), version: version(),
timestamp_install: 0, timestamp_install: 0,
timestamp_updated: 0, timestamp_updated: 0,
}; });
//res //res
serde_wasm_bindgen::to_value(&res).unwrap() serde_wasm_bindgen::to_value(&res).unwrap()
} }
@ -429,7 +431,7 @@ pub fn client_info_() -> ClientInfoV0 {
#[cfg(all(not(wasmpack_target = "nodejs"), target_arch = "wasm32"))] #[cfg(all(not(wasmpack_target = "nodejs"), target_arch = "wasm32"))]
#[wasm_bindgen] #[wasm_bindgen]
pub fn client_info() -> JsValue { pub fn client_info() -> JsValue {
let res = client_info_(); let res = ClientInfo::V0(client_info_());
serde_wasm_bindgen::to_value(&res).unwrap() serde_wasm_bindgen::to_value(&res).unwrap()
} }
@ -523,6 +525,46 @@ pub async fn doc_sync_branch(anuri: String, callback: &js_sys::Function) -> JsVa
return ret; return ret;
} }
#[cfg(target_arch = "wasm32")]
#[wasm_bindgen]
pub async fn disconnections_subscribe(callback: &js_sys::Function) -> Result<JsValue, JsValue> {
let vec: Vec<u8> = vec![2; 10];
let view = unsafe { Uint8Array::view(&vec) };
let x = JsValue::from(Uint8Array::new(view.as_ref()));
let mut reader;
{
reader = BROKER
.write()
.await
.take_disconnections_receiver()
.ok_or(false)?;
}
async fn inner_task(
mut reader: Receiver<String>,
callback: js_sys::Function,
) -> ResultSend<()> {
while let Some(user_id) = reader.next().await {
let this = JsValue::null();
let xx = serde_wasm_bindgen::to_value(&user_id).unwrap();
let jsval: JsValue = callback.call1(&this, &xx).unwrap();
let promise_res: Result<js_sys::Promise, JsValue> = jsval.dyn_into();
match promise_res {
Ok(promise) => {
JsFuture::from(promise).await;
}
Err(_) => {}
}
}
log_debug!("END OF disconnections reader");
Ok(())
}
spawn_and_log_error(inner_task(reader, callback.clone()));
Ok(true.into())
}
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
#[wasm_bindgen] #[wasm_bindgen]
pub async fn probe() { pub async fn probe() {
@ -562,7 +604,7 @@ pub async fn start() {
.write() .write()
.await .await
.connect( .connect(
Box::new(ConnectionWebSocket {}), Arc::new(Box::new(ConnectionWebSocket {})),
keys.0, keys.0,
keys.1, keys.1,
server_key, server_key,
@ -571,7 +613,6 @@ pub async fn start() {
user, user,
user_priv, user_priv,
client, client,
client_priv,
info: ClientInfo::V0(client_info_()), info: ClientInfo::V0(client_info_()),
registration: None, registration: None,
}), }),
@ -610,18 +651,73 @@ pub async fn start() {
spawn_and_log_error(inner_task()).await; spawn_and_log_error(inner_task()).await;
} }
#[cfg(not(target_arch = "wasm32"))] #[cfg(target_arch = "wasm32")]
#[wasm_bindgen] #[wasm_bindgen]
pub fn start() { pub async fn broker_disconnect() {
//alert(&format!("I say: {}", name)); Broker::close_all_connections().await;
task::spawn(async move {});
} }
#[cfg(target_arch = "wasm32")]
#[wasm_bindgen] #[wasm_bindgen]
pub fn change(name: &str) -> JsValue { pub async fn broker_connect(
let mut random_buf = [0u8; 32]; client_pub_key_js: JsValue,
getrandom::getrandom(&mut random_buf).unwrap(); client_info_js: JsValue,
JsValue::from_str(&format!("Hellooo, {}!", name)) session_js: JsValue,
opened_wallet_js: JsValue,
location: Option<String>,
) -> Result<JsValue, String> {
let client = serde_wasm_bindgen::from_value::<PubKey>(client_pub_key_js)
.map_err(|_| "serde error on client")?;
let info = serde_wasm_bindgen::from_value::<ClientInfo>(client_info_js)
.map_err(|_| "serde error on info")?;
let session =
serde_wasm_bindgen::from_value::<HashMap<String, SessionPeerStorageV0>>(session_js)
.map_err(|_| "serde error on session")?;
let opened_wallet = serde_wasm_bindgen::from_value::<EncryptedWallet>(opened_wallet_js)
.map_err(|_| "serde error on opened_wallet")?;
#[derive(Serialize, Deserialize)]
struct ConnectionInfo {
pub server_id: String,
pub server_ip: String,
pub error: Option<String>,
#[serde(with = "serde_wasm_bindgen::preserve")]
pub since: js_sys::Date,
}
let mut opened_connections: HashMap<String, ConnectionInfo> = HashMap::new();
let results = connect_wallet(
client,
info,
session,
opened_wallet,
location,
Box::new(ConnectionWebSocket {}),
)
.await?;
log_debug!("{:?}", results);
for result in results {
let mut date = js_sys::Date::new_0();
date.set_time(result.4);
opened_connections.insert(
result.0,
ConnectionInfo {
server_id: result.1,
server_ip: result.2,
error: result.3,
since: date,
},
);
}
BROKER.read().await.print_status();
Ok(opened_connections
.serialize(&serde_wasm_bindgen::Serializer::new().serialize_maps_as_objects(true))
.unwrap())
} }
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]

@ -698,7 +698,7 @@ const plants: [EmojiDef<'static>; 15] = [
EmojiDef { EmojiDef {
hexcode: "1fab4", hexcode: "1fab4",
shortcode: "potted_plant", shortcode: "potted_plant",
code: "plant", code: "potted_plant",
}, },
EmojiDef { EmojiDef {
hexcode: "1f490", hexcode: "1f490",

@ -19,7 +19,7 @@ pub mod bip39;
pub mod emojis; pub mod emojis;
use std::{collections::HashMap, io::Cursor}; use std::{collections::HashMap, io::Cursor, sync::Arc};
use crate::bip39::bip39_wordlist; use crate::bip39::bip39_wordlist;
use crate::types::*; use crate::types::*;
@ -34,7 +34,14 @@ use zeroize::{Zeroize, ZeroizeOnDrop};
use image::{imageops::FilterType, io::Reader as ImageReader, ImageOutputFormat}; use image::{imageops::FilterType, io::Reader as ImageReader, ImageOutputFormat};
use safe_transmute::transmute_to_bytes; use safe_transmute::transmute_to_bytes;
use p2p_net::types::{SiteType, SiteV0}; use p2p_net::{
broker::BROKER,
connection::{ClientConfig, StartConfig},
};
use p2p_net::{
connection::IConnect,
types::{ClientInfo, Identity, SiteType, SiteV0},
};
use p2p_repo::types::{PubKey, Sig, Timestamp}; use p2p_repo::types::{PubKey, Sig, Timestamp};
use p2p_repo::utils::{generate_keypair, now_timestamp, sign, verify}; use p2p_repo::utils::{generate_keypair, now_timestamp, sign, verify};
use p2p_repo::{log::*, types::PrivKey}; use p2p_repo::{log::*, types::PrivKey};
@ -430,6 +437,124 @@ pub fn gen_shuffle_for_pazzle_opening(pazzle_length: u8) -> ShuffledPazzle {
emoji_indices, emoji_indices,
} }
} }
use web_time::SystemTime;
fn get_unix_time() -> f64 {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as f64
}
// Result is a list of (user_id, server_id, server_ip, error, since_date)
pub async fn connect_wallet(
client: PubKey,
info: ClientInfo,
session: HashMap<String, SessionPeerStorageV0>,
opened_wallet: EncryptedWallet,
location: Option<String>,
cnx: Box<dyn IConnect>,
) -> Result<Vec<(String, String, String, Option<String>, f64)>, String> {
let mut result: Vec<(String, String, String, Option<String>, f64)> = Vec::new();
let arc_cnx = Arc::new(cnx);
match opened_wallet {
EncryptedWallet::V0(wallet) => {
log_debug!("XXXX {} {:?}", client.to_string(), wallet);
let auto_open = &wallet
.clients
.get(&client.to_string())
.ok_or("Client is invalid")?
.auto_open;
for site in auto_open {
match site {
Identity::OrgSite(user) | Identity::IndividualSite(user) => {
let user_id = user.to_string();
let peer_key = session
.get(&user_id)
.ok_or("Session is missing")?
.peer_key
.clone();
let peer_id = peer_key.to_pub();
let site = wallet.sites.get(&user_id);
if site.is_none() {
result.push((
user_id,
"".into(),
"".into(),
Some("Site is missing".into()),
get_unix_time(),
));
continue;
}
let site = site.unwrap();
let user_priv = site.site_key.clone();
let core = site.cores[0]; //TODO: cycle the other cores if failure to connect (failover)
let server_key = core.0;
let broker = wallet.brokers.get(&core.0.to_string());
if broker.is_none() {
result.push((
user_id,
core.0.to_string(),
"".into(),
Some("Broker is missing".into()),
get_unix_time(),
));
continue;
}
let broker = broker.unwrap();
for broker_info in broker {
match broker_info {
BrokerInfoV0::ServerV0(server) => {
let url = server.get_ws_url(&location).await;
//Option<(String, Vec<BindAddress>)>
if url.is_some() {
let url = url.unwrap();
if url.1.len() == 0 {
// TODO deal with BoxPublic and on tauri all Box...
let res = BROKER
.write()
.await
.connect(
arc_cnx.clone(),
peer_key,
peer_id,
server_key,
StartConfig::Client(ClientConfig {
url: url.0.clone(),
user: *user,
user_priv,
client,
info: info.clone(),
registration: Some(core.1),
}),
)
.await;
log_debug!("broker.connect : {:?}", res);
result.push((
user_id,
core.0.to_string(),
url.0.into(),
match res {
Ok(_) => None,
Err(e) => Some(e.to_string()),
},
get_unix_time(),
));
}
break; // TODO implement failover
}
}
_ => {}
}
}
}
_ => unimplemented!(),
}
}
}
}
Ok(result)
}
pub fn gen_shuffle_for_pin() -> Vec<u8> { pub fn gen_shuffle_for_pin() -> Vec<u8> {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();

@ -121,7 +121,7 @@ impl From<&CreateWalletResultV0> for LocalWalletStorageV0 {
} }
impl LocalWalletStorageV0 { impl LocalWalletStorageV0 {
pub fn new(wallet: Wallet, client: ClientV0) -> Self { pub fn new(wallet: Wallet, client: &ClientV0) -> Self {
LocalWalletStorageV0 { LocalWalletStorageV0 {
bootstrap: BootstrapContent::V0(BootstrapContentV0::new()), bootstrap: BootstrapContent::V0(BootstrapContentV0::new()),
wallet, wallet,
@ -157,6 +157,9 @@ pub struct ClientV0 {
} }
impl ClientV0 { impl ClientV0 {
pub fn id(&self) -> String {
self.priv_key.to_pub().to_string()
}
#[deprecated(note = "**Don't use dummy method**")] #[deprecated(note = "**Don't use dummy method**")]
pub fn dummy() -> Self { pub fn dummy() -> Self {
ClientV0 { ClientV0 {
@ -212,6 +215,9 @@ pub struct EncryptedWalletV0 {
#[zeroize(skip)] #[zeroize(skip)]
pub personal_site: PubKey, pub personal_site: PubKey,
#[zeroize(skip)]
pub personal_site_id: String,
#[zeroize(skip)] #[zeroize(skip)]
pub sites: HashMap<String, SiteV0>, pub sites: HashMap<String, SiteV0>,
@ -242,7 +248,7 @@ impl EncryptedWalletV0 {
&mut self, &mut self,
previous_wallet: Wallet, previous_wallet: Wallet,
session: SessionWalletStorageV0, session: SessionWalletStorageV0,
) -> Result<(Wallet, ClientV0), NgWalletError> { ) -> Result<(Wallet, String, ClientV0), NgWalletError> {
if self.log.is_none() { if self.log.is_none() {
return Err(NgWalletError::InternalError); return Err(NgWalletError::InternalError);
} }
@ -260,6 +266,7 @@ impl EncryptedWalletV0 {
nonce, nonce,
self.wallet_privkey.clone(), self.wallet_privkey.clone(),
)?, )?,
client.id(),
client, client,
)) ))
} }
@ -711,6 +718,7 @@ pub struct WalletOpCreateV0 {
impl From<&WalletOpCreateV0> for EncryptedWalletV0 { impl From<&WalletOpCreateV0> for EncryptedWalletV0 {
fn from(op: &WalletOpCreateV0) -> Self { fn from(op: &WalletOpCreateV0) -> Self {
let personal_site = op.personal_site.site_key.to_pub();
let mut wallet = EncryptedWalletV0 { let mut wallet = EncryptedWalletV0 {
wallet_privkey: op.wallet_privkey.clone(), wallet_privkey: op.wallet_privkey.clone(),
wallet_id: op.wallet_privkey.to_pub().to_string(), wallet_id: op.wallet_privkey.to_pub().to_string(),
@ -718,7 +726,8 @@ impl From<&WalletOpCreateV0> for EncryptedWalletV0 {
mnemonic: op.mnemonic.clone(), mnemonic: op.mnemonic.clone(),
pin: op.pin.clone(), pin: op.pin.clone(),
save_to_ng_one: op.save_to_ng_one.clone(), save_to_ng_one: op.save_to_ng_one.clone(),
personal_site: op.personal_site.site_key.to_pub(), personal_site,
personal_site_id: personal_site.to_string(),
sites: HashMap::new(), sites: HashMap::new(),
brokers: HashMap::new(), brokers: HashMap::new(),
clients: HashMap::new(), clients: HashMap::new(),

@ -1,7 +1,5 @@
// Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers // Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved. // All rights reserved.
// This code is partly derived from work written by TG x Thoth from P2Pcollab.
// Copyright 2022 TG x Thoth
// Licensed under the Apache License, Version 2.0 // Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0> // <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>, // or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,

@ -48,7 +48,7 @@ impl IConnect for ConnectionWebSocket {
peer_pubk: PubKey, peer_pubk: PubKey,
remote_peer: DirectPeerId, remote_peer: DirectPeerId,
config: StartConfig, config: StartConfig,
) -> Result<ConnectionBase, NetError> { ) -> Result<ConnectionBase, ProtocolError> {
let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS); let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS);
let res = connect_async(url).await; let res = connect_async(url).await;
@ -56,7 +56,7 @@ impl IConnect for ConnectionWebSocket {
match res { match res {
Err(e) => { Err(e) => {
log_debug!("Cannot connect: {:?}", e); log_debug!("Cannot connect: {:?}", e);
Err(NetError::ConnectionError) Err(ProtocolError::ConnectionError)
} }
Ok((websocket, _)) => { Ok((websocket, _)) => {
cnx.start_read_loop(None, Some(peer_privk), Some(remote_peer)); cnx.start_read_loop(None, Some(peer_privk), Some(remote_peer));
@ -78,7 +78,7 @@ impl IConnect for ConnectionWebSocket {
log_debug!("END of WS loop"); log_debug!("END of WS loop");
}); });
cnx.start(config).await; cnx.start(config).await?;
Ok(cnx) Ok(cnx)
} }
@ -330,7 +330,7 @@ mod test {
.write() .write()
.await .await
.connect( .connect(
Box::new(ConnectionWebSocket {}), Arc::new(Box::new(ConnectionWebSocket {})),
keys.0, keys.0,
keys.1, keys.1,
server_key, server_key,
@ -339,7 +339,6 @@ mod test {
user, user,
user_priv, user_priv,
client, client,
client_priv,
info: ClientInfo::new(ClientType::Cli, "".into(), "".into()), info: ClientInfo::new(ClientType::Cli, "".into(), "".into()),
registration: None, registration: None,
}), }),

@ -42,13 +42,13 @@ impl IConnect for ConnectionWebSocket {
peer_pubk: PubKey, peer_pubk: PubKey,
remote_peer: DirectPeerId, remote_peer: DirectPeerId,
config: StartConfig, config: StartConfig,
) -> Result<ConnectionBase, NetError> { ) -> Result<ConnectionBase, ProtocolError> {
log_debug!("url {}", url); log_debug!("url {}", url);
let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS); let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS);
let (mut ws, wsio) = WsMeta::connect(url, None).await.map_err(|e| { let (mut ws, wsio) = WsMeta::connect(url, None).await.map_err(|e| {
//log_debug!("{:?}", e); //log_debug!("{:?}", e);
NetError::ConnectionError ProtocolError::ConnectionError
})?; })?;
cnx.start_read_loop(None, Some(peer_privk), Some(remote_peer)); cnx.start_read_loop(None, Some(peer_privk), Some(remote_peer));
@ -62,7 +62,7 @@ impl IConnect for ConnectionWebSocket {
shutdown, shutdown,
)); ));
cnx.start(config).await; cnx.start(config).await?;
Ok(cnx) Ok(cnx)
} }

@ -0,0 +1,42 @@
/*
* Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
* All rights reserved.
* Licensed under the Apache License, Version 2.0
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
* at your option. All files in the project carrying such
* notice may not be copied, modified, or distributed except
* according to those terms.
*/
use crate::broker::{ServerConfig, BROKER};
use crate::connection::NoiseFSM;
use crate::types::*;
use crate::{actor::*, errors::ProtocolError, types::ProtocolMessage};
use async_std::sync::Mutex;
use p2p_repo::log::*;
use p2p_repo::types::PubKey;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
pub type Connecting = ();
impl From<Connecting> for ProtocolMessage {
fn from(msg: Connecting) -> ProtocolMessage {
unimplemented!();
}
}
impl Actor<'_, Connecting, ()> {}
#[async_trait::async_trait]
impl EActor for Actor<'_, Connecting, ()> {
async fn respond(
&mut self,
msg: ProtocolMessage,
fsm: Arc<Mutex<NoiseFSM>>,
) -> Result<(), ProtocolError> {
fsm.lock().await.remove_actor(0).await;
Ok(())
}
}

@ -21,3 +21,6 @@ pub use add_invitation::*;
pub mod list_invitations; pub mod list_invitations;
pub use list_invitations::*; pub use list_invitations::*;
pub mod connecting;
pub use connecting::*;

@ -90,6 +90,8 @@ pub struct Broker<'a> {
test: u32, test: u32,
tauri_streams: HashMap<String, Sender<Commit>>, tauri_streams: HashMap<String, Sender<Commit>>,
disconnections_sender: Sender<String>,
disconnections_receiver: Option<Receiver<String>>,
} }
impl<'a> Broker<'a> { impl<'a> Broker<'a> {
@ -441,6 +443,7 @@ impl<'a> Broker<'a> {
let (shutdown_sender, shutdown_receiver) = mpsc::unbounded::<ProtocolError>(); let (shutdown_sender, shutdown_receiver) = mpsc::unbounded::<ProtocolError>();
let mut random_buf = [0u8; 4]; let mut random_buf = [0u8; 4];
getrandom::getrandom(&mut random_buf).unwrap(); getrandom::getrandom(&mut random_buf).unwrap();
let (disconnections_sender, disconnections_receiver) = mpsc::unbounded::<String>();
Broker { Broker {
anonymous_connections: HashMap::new(), anonymous_connections: HashMap::new(),
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
@ -455,6 +458,8 @@ impl<'a> Broker<'a> {
closing: false, closing: false,
test: u32::from_be_bytes(random_buf), test: u32::from_be_bytes(random_buf),
server_storage: None, server_storage: None,
disconnections_sender,
disconnections_receiver: Some(disconnections_receiver),
} }
} }
@ -526,6 +531,29 @@ impl<'a> Broker<'a> {
.await; .await;
} }
pub async fn close_all_connections() {
let peer_ids;
let anonymous;
{
let broker = BROKER.write().await;
if broker.closing {
return;
}
peer_ids = Vec::from_iter(broker.peers.keys().cloned());
anonymous = Vec::from_iter(broker.anonymous_connections.keys().cloned());
}
for peer_id in peer_ids {
BROKER
.write()
.await
.close_peer_connection_x(peer_id.1, peer_id.0)
.await;
}
for anon in anonymous {
BROKER.write().await.close_anonymous(anon.1, anon.0).await;
}
}
pub async fn shutdown(&mut self) { pub async fn shutdown(&mut self) {
if self.closing { if self.closing {
return; return;
@ -728,14 +756,14 @@ impl<'a> Broker<'a> {
pub async fn connect( pub async fn connect(
&mut self, &mut self,
cnx: Box<dyn IConnect>, cnx: Arc<Box<dyn IConnect>>,
peer_privk: PrivKey, peer_privk: PrivKey,
peer_pubk: PubKey, peer_pubk: PubKey,
remote_peer_id: DirectPeerId, remote_peer_id: DirectPeerId,
config: StartConfig, config: StartConfig,
) -> Result<(), NetError> { ) -> Result<(), ProtocolError> {
if self.closing { if self.closing {
return Err(NetError::Closing); return Err(ProtocolError::Closing);
} }
log_debug!("CONNECTING"); log_debug!("CONNECTING");
@ -750,7 +778,7 @@ impl<'a> Broker<'a> {
match already.unwrap().connected { match already.unwrap().connected {
PeerConnection::NONE => {} PeerConnection::NONE => {}
_ => { _ => {
return Err(NetError::PeerAlreadyConnected); return Err(ProtocolError::PeerAlreadyConnected);
} }
}; };
} }
@ -784,7 +812,7 @@ impl<'a> Broker<'a> {
self.direct_connections.insert(ip, dc); self.direct_connections.insert(ip, dc);
PeerConnection::Core(ip) PeerConnection::Core(ip)
} }
StartConfig::Client(config) => PeerConnection::Client(connection), StartConfig::Client(_config) => PeerConnection::Client(connection),
_ => unimplemented!(), _ => unimplemented!(),
}; };
@ -798,15 +826,16 @@ impl<'a> Broker<'a> {
async fn watch_close( async fn watch_close(
mut join: Receiver<Either<NetError, X25519PrivKey>>, mut join: Receiver<Either<NetError, X25519PrivKey>>,
cnx: Box<dyn IConnect>, cnx: Arc<Box<dyn IConnect>>,
peer_privk: PrivKey, peer_privk: PrivKey,
peer_pubkey: PubKey, peer_pubkey: PubKey,
remote_peer_id: [u8; 32], remote_peer_id: [u8; 32],
config: StartConfig, config: StartConfig,
mut disconnections_sender: Sender<String>,
) -> ResultSend<()> { ) -> ResultSend<()> {
async move { async move {
let res = join.next().await; let res = join.next().await;
log_debug!("SOCKET IS CLOSED {:?} {:?}", res, remote_peer_id); log_info!("SOCKET IS CLOSED {:?} {:?}", res, remote_peer_id);
if res.is_some() if res.is_some()
&& res.as_ref().unwrap().is_left() && res.as_ref().unwrap().is_left()
&& res.unwrap().unwrap_left() != NetError::Closing && res.unwrap().unwrap_left() != NetError::Closing
@ -820,8 +849,13 @@ impl<'a> Broker<'a> {
// .await; // .await;
// log_debug!("SOCKET RECONNECTION {:?} {:?}", result, &remote_peer_id); // log_debug!("SOCKET RECONNECTION {:?} {:?}", result, &remote_peer_id);
// TODO: deal with error and incremental backoff // TODO: deal with error and incremental backoff
// if all attempts fail :
if let Some(user) = config.get_user() {
disconnections_sender.send(user.to_string()).await;
}
} else { } else {
log_debug!("REMOVED"); log_info!("REMOVED");
BROKER BROKER
.write() .write()
.await .await
@ -838,10 +872,15 @@ impl<'a> Broker<'a> {
peer_pubk, peer_pubk,
*remote_peer_id_dh.slice(), *remote_peer_id_dh.slice(),
config, config,
self.disconnections_sender.clone(),
)); ));
Ok(()) Ok(())
} }
pub fn take_disconnections_receiver(&mut self) -> Option<Receiver<String>> {
self.disconnections_receiver.take()
}
pub async fn close_peer_connection_x(&mut self, peer_id: X25519PubKey, user: Option<PubKey>) { pub async fn close_peer_connection_x(&mut self, peer_id: X25519PubKey, user: Option<PubKey>) {
if let Some(peer) = self.peers.get_mut(&(user, peer_id)) { if let Some(peer) = self.peers.get_mut(&(user, peer_id)) {
match &mut peer.connected { match &mut peer.connected {
@ -878,13 +917,13 @@ impl<'a> Broker<'a> {
pub fn print_status(&self) { pub fn print_status(&self) {
self.peers.iter().for_each(|(peerId, peerInfo)| { self.peers.iter().for_each(|(peerId, peerInfo)| {
log_debug!("PEER in BROKER {:?} {:?}", peerId, peerInfo); log_info!("PEER in BROKER {:?} {:?}", peerId, peerInfo);
}); });
self.direct_connections.iter().for_each(|(ip, directCnx)| { self.direct_connections.iter().for_each(|(ip, directCnx)| {
log_debug!("direct_connection in BROKER {:?} {:?}", ip, directCnx); log_info!("direct_connection in BROKER {:?} {:?}", ip, directCnx);
}); });
self.anonymous_connections.iter().for_each(|(binds, cb)| { self.anonymous_connections.iter().for_each(|(binds, cb)| {
log_debug!( log_info!(
"ANONYMOUS remote {:?} local {:?} {:?}", "ANONYMOUS remote {:?} local {:?} {:?}",
binds.1, binds.1,
binds.0, binds.0,

@ -68,7 +68,7 @@ pub trait IConnect: Send + Sync {
peer_pubk: PubKey, peer_pubk: PubKey,
remote_peer: DirectPeerId, remote_peer: DirectPeerId,
config: StartConfig, config: StartConfig,
) -> Result<ConnectionBase, NetError>; ) -> Result<ConnectionBase, ProtocolError>;
async fn probe(&self, ip: IP, port: u16) -> Result<Option<PubKey>, ProtocolError>; async fn probe(&self, ip: IP, port: u16) -> Result<Option<PubKey>, ProtocolError>;
} }
@ -162,7 +162,6 @@ pub struct ClientConfig {
pub user: PubKey, pub user: PubKey,
pub user_priv: PrivKey, pub user_priv: PrivKey,
pub client: PubKey, pub client: PubKey,
pub client_priv: PrivKey,
pub info: ClientInfo, pub info: ClientInfo,
pub registration: Option<Option<[u8; 32]>>, pub registration: Option<Option<[u8; 32]>>,
} }
@ -215,6 +214,12 @@ impl StartConfig {
_ => false, _ => false,
} }
} }
pub fn is_admin(&self) -> bool {
match self {
StartConfig::Admin(_) => true,
_ => false,
}
}
} }
impl NoiseFSM { impl NoiseFSM {
@ -786,6 +791,16 @@ impl NoiseFSM {
log_debug!("AUTHENTICATION SUCCESSFUL ! waiting for requests on the client side"); log_debug!("AUTHENTICATION SUCCESSFUL ! waiting for requests on the client side");
// we notify the actor "Connecting" that the connection is ready
let mut lock = self.actors.lock().await;
let exists = lock.remove(&0);
match exists {
Some(mut actor_sender) => {
let _ = actor_sender.send(ConnectionCommand::ReEnter).await;
}
_ => {}
}
return Ok(StepReply::NONE); return Ok(StepReply::NONE);
} }
} }
@ -930,7 +945,7 @@ impl ConnectionBase {
res = locked_fsm.step(None).await; res = locked_fsm.step(None).await;
} }
} else { } else {
panic!("shouldn't be here. ConnectionCommand is read_loop can only have 5 different variants") panic!("shouldn't be here. ConnectionCommand in read_loop can only have 5 different variants")
} }
match res { match res {
@ -1084,7 +1099,7 @@ impl ConnectionBase {
_ => Err(ProtocolError::ActorError), _ => Err(ProtocolError::ActorError),
} }
} else { } else {
panic!("cannot call probe on a server-side connection"); panic!("cannot call admin on a server-side connection");
} }
} }
@ -1145,9 +1160,10 @@ impl ConnectionBase {
} }
} }
pub async fn start(&mut self, config: StartConfig) { pub async fn start(&mut self, config: StartConfig) -> Result<(), ProtocolError> {
// BOOTSTRAP the protocol from client-side // BOOTSTRAP the protocol from client-side
if !self.dir.is_server() { if !self.dir.is_server() {
let is_admin = config.is_admin();
let res; let res;
{ {
let mut fsm = self.fsm.as_ref().unwrap().lock().await; let mut fsm = self.fsm.as_ref().unwrap().lock().await;
@ -1155,7 +1171,23 @@ impl ConnectionBase {
res = fsm.step(None).await; res = fsm.step(None).await;
} }
if let Err(err) = res { if let Err(err) = res {
self.send(ConnectionCommand::ProtocolError(err)).await; self.send(ConnectionCommand::ProtocolError(err.clone()))
.await;
Err(err)
} else if !is_admin {
let mut actor = Box::new(Actor::<Connecting, ()>::new(0, true));
self.actors.lock().await.insert(0, actor.get_receiver_tx());
let mut receiver = actor.detach_receiver();
match receiver.next().await {
Some(ConnectionCommand::ReEnter) => Ok(()),
Some(ConnectionCommand::ProtocolError(e)) => Err(e),
Some(ConnectionCommand::Error(e)) => Err(e.into()),
Some(ConnectionCommand::Close) => Err(ProtocolError::Closing),
_ => Err(ProtocolError::ActorError),
}
} else {
Ok(())
} }
} else { } else {
panic!("cannot call start on a server-side connection"); panic!("cannot call start on a server-side connection");

@ -427,9 +427,12 @@ impl BrokerServerV0 {
/// filtered by the current location url of the webpage /// filtered by the current location url of the webpage
/// on native apps (do not pass a location), returns or the connection URL without optional BindAddress or an empty string with /// on native apps (do not pass a location), returns or the connection URL without optional BindAddress or an empty string with
/// several BindAddresses to try to connect to with .to_ws_url() /// several BindAddresses to try to connect to with .to_ws_url()
pub async fn get_ws_url(&self, location: Option<String>) -> Option<(String, Vec<BindAddress>)> { pub async fn get_ws_url(
&self,
location: &Option<String>,
) -> Option<(String, Vec<BindAddress>)> {
if location.is_some() { if location.is_some() {
let location = location.unwrap(); let location = location.as_ref().unwrap();
if location.starts_with(APP_NG_ONE_URL) { if location.starts_with(APP_NG_ONE_URL) {
match &self.server_type { match &self.server_type {
BrokerServerTypeV0::BoxPublic(addrs) => { BrokerServerTypeV0::BoxPublic(addrs) => {

@ -18,6 +18,7 @@ importers:
autoprefixer: ^10.4.14 autoprefixer: ^10.4.14
classnames: ^2.3.2 classnames: ^2.3.2
cross-env: ^7.0.3 cross-env: ^7.0.3
dayjs: ^1.11.10
flowbite: ^1.6.5 flowbite: ^1.6.5
flowbite-svelte: ^0.43.3 flowbite-svelte: ^0.43.3
internal-ip: ^7.0.0 internal-ip: ^7.0.0
@ -31,6 +32,7 @@ importers:
svelte-heros-v2: ^0.10.12 svelte-heros-v2: ^0.10.12
svelte-preprocess: ^5.0.3 svelte-preprocess: ^5.0.3
svelte-spa-router: ^3.3.0 svelte-spa-router: ^3.3.0
svelte-time: ^0.8.0
tailwindcss: ^3.3.1 tailwindcss: ^3.3.1
tslib: ^2.4.1 tslib: ^2.4.1
typescript: ^4.9.5 typescript: ^4.9.5
@ -57,6 +59,7 @@ importers:
'@types/node': 18.16.16 '@types/node': 18.16.16
autoprefixer: 10.4.14_postcss@8.4.24 autoprefixer: 10.4.14_postcss@8.4.24
cross-env: 7.0.3 cross-env: 7.0.3
dayjs: 1.11.10
internal-ip: 7.0.0 internal-ip: 7.0.0
node-gzip: 1.1.2 node-gzip: 1.1.2
postcss: 8.4.24 postcss: 8.4.24
@ -66,6 +69,7 @@ importers:
svelte-check: 3.4.3_sxhny56dlbcmwov4vk7qwrzshi svelte-check: 3.4.3_sxhny56dlbcmwov4vk7qwrzshi
svelte-heros-v2: 0.10.12_svelte@3.59.1 svelte-heros-v2: 0.10.12_svelte@3.59.1
svelte-preprocess: 5.0.4_vmz4xia4c7tzh4ii3qac2x3tom svelte-preprocess: 5.0.4_vmz4xia4c7tzh4ii3qac2x3tom
svelte-time: 0.8.0
tailwindcss: 3.3.2 tailwindcss: 3.3.2
tslib: 2.5.3 tslib: 2.5.3
typescript: 4.9.5 typescript: 4.9.5
@ -850,6 +854,10 @@ packages:
css-tree: 2.2.1 css-tree: 2.2.1
dev: true dev: true
/dayjs/1.11.10:
resolution: {integrity: sha512-vjAczensTgRcqDERK0SR2XMwsF/tSvnvlv6VcF2GIhg6Sx4yOIt/irsr1RDJsKiIyBzJDpCoXiWWq28MqH2cnQ==}
dev: true
/debug/4.3.4: /debug/4.3.4:
resolution: {integrity: sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==} resolution: {integrity: sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==}
engines: {node: '>=6.0'} engines: {node: '>=6.0'}
@ -1861,6 +1869,12 @@ packages:
regexparam: 2.0.1 regexparam: 2.0.1
dev: false dev: false
/svelte-time/0.8.0:
resolution: {integrity: sha512-V0LBpJhYV2Q+jqiJ94ITAo51P6RIhrHQpDt3LCSk8PXfL2UMvSMlDzkqHq8mdKqmBCRZURnXhpynN03GQa/G/A==}
dependencies:
dayjs: 1.11.10
dev: true
/svelte/3.59.1: /svelte/3.59.1:
resolution: {integrity: sha512-pKj8fEBmqf6mq3/NfrB9SLtcJcUvjYSWyePlfCqN9gujLB25RitWK8PvFzlwim6hD/We35KbPlRteuA6rnPGcQ==} resolution: {integrity: sha512-pKj8fEBmqf6mq3/NfrB9SLtcJcUvjYSWyePlfCqN9gujLB25RitWK8PvFzlwim6hD/We35KbPlRteuA6rnPGcQ==}
engines: {node: '>= 8'} engines: {node: '>= 8'}

Loading…
Cancel
Save